查看本类的 API文档回源码主页即时通讯网 - 即时通讯开发者社区!
1   /*
2    * Copyright 2012 The Netty Project
3    *
4    * The Netty Project licenses this file to you under the Apache License,
5    * version 2.0 (the "License"); you may not use this file except in compliance
6    * with the License. You may obtain a copy of the License at:
7    *
8    *   http://www.apache.org/licenses/LICENSE-2.0
9    *
10   * Unless required by applicable law or agreed to in writing, software
11   * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12   * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13   * License for the specific language governing permissions and limitations
14   * under the License.
15   */
16  package io.netty.util.concurrent;
17  
18  import java.util.Collections;
19  import java.util.Iterator;
20  import java.util.LinkedHashMap;
21  import java.util.Set;
22  import java.util.concurrent.ThreadFactory;
23  import java.util.concurrent.TimeUnit;
24  import java.util.concurrent.atomic.AtomicInteger;
25  
26  /**
27   * Abstract base class for {@link EventExecutorGroup} implementations that handles their tasks with multiple threads at
28   * the same time.
29   */
30  public abstract class MultithreadEventExecutorGroup extends AbstractEventExecutorGroup {
31  
32      private final EventExecutor[] children;
33      private final AtomicInteger childIndex = new AtomicInteger();
34      private final AtomicInteger terminatedChildren = new AtomicInteger();
35      private final Promise<?> terminationFuture = new DefaultPromise(GlobalEventExecutor.INSTANCE);
36      private final EventExecutorChooser chooser;
37  
38      /**
39       * Create a new instance.
40       *
41       * @param nThreads          the number of threads that will be used by this instance.
42       * @param threadFactory     the ThreadFactory to use, or {@code null} if the default should be used.
43       * @param args              arguments which will passed to each {@link #newChild(ThreadFactory, Object...)} call
44       */
45      protected MultithreadEventExecutorGroup(int nThreads, ThreadFactory threadFactory, Object... args) {
46          if (nThreads <= 0) {
47              throw new IllegalArgumentException(String.format("nThreads: %d (expected: > 0)", nThreads));
48          }
49  
50          if (threadFactory == null) {
51              threadFactory = newDefaultThreadFactory();
52          }
53  
54          children = new SingleThreadEventExecutor[nThreads];
55          if (isPowerOfTwo(children.length)) {
56              chooser = new PowerOfTwoEventExecutorChooser();
57          } else {
58              chooser = new GenericEventExecutorChooser();
59          }
60  
61          for (int i = 0; i < nThreads; i ++) {
62              boolean success = false;
63              try {
64                  children[i] = newChild(threadFactory, args);
65                  success = true;
66              } catch (Exception e) {
67                  // TODO: Think about if this is a good exception type
68                  throw new IllegalStateException("failed to create a child event loop", e);
69              } finally {
70                  if (!success) {
71                      for (int j = 0; j < i; j ++) {
72                          children[j].shutdownGracefully();
73                      }
74  
75                      for (int j = 0; j < i; j ++) {
76                          EventExecutor e = children[j];
77                          try {
78                              while (!e.isTerminated()) {
79                                  e.awaitTermination(Integer.MAX_VALUE, TimeUnit.SECONDS);
80                              }
81                          } catch (InterruptedException interrupted) {
82                              Thread.currentThread().interrupt();
83                              break;
84                          }
85                      }
86                  }
87              }
88          }
89  
90          final FutureListener<Object> terminationListener = new FutureListener<Object>() {
91              @Override
92              public void operationComplete(Future<Object> future) throws Exception {
93                  if (terminatedChildren.incrementAndGet() == children.length) {
94                      terminationFuture.setSuccess(null);
95                  }
96              }
97          };
98  
99          for (EventExecutor e: children) {
100             e.terminationFuture().addListener(terminationListener);
101         }
102     }
103 
104     protected ThreadFactory newDefaultThreadFactory() {
105         return new DefaultThreadFactory(getClass());
106     }
107 
108     @Override
109     public EventExecutor next() {
110         return chooser.next();
111     }
112 
113     @Override
114     public Iterator<EventExecutor> iterator() {
115         return children().iterator();
116     }
117 
118     /**
119      * Return the number of {@link EventExecutor} this implementation uses. This number is the maps
120      * 1:1 to the threads it use.
121      */
122     public final int executorCount() {
123         return children.length;
124     }
125 
126     /**
127      * Return a safe-copy of all of the children of this group.
128      */
129     protected Set<EventExecutor> children() {
130         Set<EventExecutor> children = Collections.newSetFromMap(new LinkedHashMap<EventExecutor, Boolean>());
131         Collections.addAll(children, this.children);
132         return children;
133     }
134 
135     /**
136      * Create a new EventExecutor which will later then accessible via the {@link #next()}  method. This method will be
137      * called for each thread that will serve this {@link MultithreadEventExecutorGroup}.
138      *
139      */
140     protected abstract EventExecutor newChild(
141             ThreadFactory threadFactory, Object... args) throws Exception;
142 
143     @Override
144     public Future<?> shutdownGracefully(long quietPeriod, long timeout, TimeUnit unit) {
145         for (EventExecutor l: children) {
146             l.shutdownGracefully(quietPeriod, timeout, unit);
147         }
148         return terminationFuture();
149     }
150 
151     @Override
152     public Future<?> terminationFuture() {
153         return terminationFuture;
154     }
155 
156     @Override
157     @Deprecated
158     public void shutdown() {
159         for (EventExecutor l: children) {
160             l.shutdown();
161         }
162     }
163 
164     @Override
165     public boolean isShuttingDown() {
166         for (EventExecutor l: children) {
167             if (!l.isShuttingDown()) {
168                 return false;
169             }
170         }
171         return true;
172     }
173 
174     @Override
175     public boolean isShutdown() {
176         for (EventExecutor l: children) {
177             if (!l.isShutdown()) {
178                 return false;
179             }
180         }
181         return true;
182     }
183 
184     @Override
185     public boolean isTerminated() {
186         for (EventExecutor l: children) {
187             if (!l.isTerminated()) {
188                 return false;
189             }
190         }
191         return true;
192     }
193 
194     @Override
195     public boolean awaitTermination(long timeout, TimeUnit unit)
196             throws InterruptedException {
197         long deadline = System.nanoTime() + unit.toNanos(timeout);
198         loop: for (EventExecutor l: children) {
199             for (;;) {
200                 long timeLeft = deadline - System.nanoTime();
201                 if (timeLeft <= 0) {
202                     break loop;
203                 }
204                 if (l.awaitTermination(timeLeft, TimeUnit.NANOSECONDS)) {
205                     break;
206                 }
207             }
208         }
209         return isTerminated();
210     }
211 
212     private static boolean isPowerOfTwo(int val) {
213         return (val & -val) == val;
214     }
215 
216     private interface EventExecutorChooser {
217         EventExecutor next();
218     }
219 
220     private final class PowerOfTwoEventExecutorChooser implements EventExecutorChooser {
221         @Override
222         public EventExecutor next() {
223             return children[childIndex.getAndIncrement() & children.length - 1];
224         }
225     }
226 
227     private final class GenericEventExecutorChooser implements EventExecutorChooser {
228         @Override
229         public EventExecutor next() {
230             return children[Math.abs(childIndex.getAndIncrement() % children.length)];
231         }
232     }
233 }