1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16 package io.netty.channel;
17
18
19 import io.netty.util.concurrent.AbstractEventExecutorGroup;
20 import io.netty.util.concurrent.DefaultPromise;
21 import io.netty.util.concurrent.DefaultThreadFactory;
22 import io.netty.util.concurrent.EventExecutor;
23 import io.netty.util.concurrent.Future;
24 import io.netty.util.concurrent.FutureListener;
25 import io.netty.util.concurrent.GlobalEventExecutor;
26 import io.netty.util.concurrent.Promise;
27 import io.netty.util.concurrent.ThreadPerTaskExecutor;
28 import io.netty.util.internal.EmptyArrays;
29 import io.netty.util.internal.ObjectUtil;
30 import io.netty.util.internal.PlatformDependent;
31 import io.netty.util.internal.ReadOnlyIterator;
32
33 import java.util.Collections;
34 import java.util.Iterator;
35 import java.util.Queue;
36 import java.util.Set;
37 import java.util.concurrent.ConcurrentLinkedQueue;
38 import java.util.concurrent.Executor;
39 import java.util.concurrent.RejectedExecutionException;
40 import java.util.concurrent.ThreadFactory;
41 import java.util.concurrent.TimeUnit;
42
43
44
45
46
47
48 @Deprecated
49 public class ThreadPerChannelEventLoopGroup extends AbstractEventExecutorGroup implements EventLoopGroup {
50
51 private final Object[] childArgs;
52 private final int maxChannels;
53 final Executor executor;
54 final Set<EventLoop> activeChildren =
55 Collections.newSetFromMap(PlatformDependent.<EventLoop, Boolean>newConcurrentHashMap());
56 final Queue<EventLoop> idleChildren = new ConcurrentLinkedQueue<EventLoop>();
57 private final ChannelException tooManyChannels;
58
59 private volatile boolean shuttingDown;
60 private final Promise<?> terminationFuture = new DefaultPromise<Void>(GlobalEventExecutor.INSTANCE);
61 private final FutureListener<Object> childTerminationListener = new FutureListener<Object>() {
62 @Override
63 public void operationComplete(Future<Object> future) throws Exception {
64
65 if (isTerminated()) {
66 terminationFuture.trySuccess(null);
67 }
68 }
69 };
70
71
72
73
74 protected ThreadPerChannelEventLoopGroup() {
75 this(0);
76 }
77
78
79
80
81
82
83
84
85
86
87 protected ThreadPerChannelEventLoopGroup(int maxChannels) {
88 this(maxChannels, (ThreadFactory) null);
89 }
90
91
92
93
94
95
96
97
98
99
100
101
102
103 protected ThreadPerChannelEventLoopGroup(int maxChannels, ThreadFactory threadFactory, Object... args) {
104 this(maxChannels, threadFactory == null ? null : new ThreadPerTaskExecutor(threadFactory), args);
105 }
106
107
108
109
110
111
112
113
114
115
116
117
118
119 protected ThreadPerChannelEventLoopGroup(int maxChannels, Executor executor, Object... args) {
120 ObjectUtil.checkPositiveOrZero(maxChannels, "maxChannels");
121 if (executor == null) {
122 executor = new ThreadPerTaskExecutor(new DefaultThreadFactory(getClass()));
123 }
124
125 if (args == null) {
126 childArgs = EmptyArrays.EMPTY_OBJECTS;
127 } else {
128 childArgs = args.clone();
129 }
130
131 this.maxChannels = maxChannels;
132 this.executor = executor;
133
134 tooManyChannels =
135 ChannelException.newStatic("too many channels (max: " + maxChannels + ')',
136 ThreadPerChannelEventLoopGroup.class, "nextChild()");
137 }
138
139
140
141
142 protected EventLoop newChild(@SuppressWarnings("UnusedParameters") Object... args) throws Exception {
143 return new ThreadPerChannelEventLoop(this);
144 }
145
146 @Override
147 public Iterator<EventExecutor> iterator() {
148 return new ReadOnlyIterator<EventExecutor>(activeChildren.iterator());
149 }
150
151 @Override
152 public EventLoop next() {
153 throw new UnsupportedOperationException();
154 }
155
156 @Override
157 public Future<?> shutdownGracefully(long quietPeriod, long timeout, TimeUnit unit) {
158 shuttingDown = true;
159
160 for (EventLoop l: activeChildren) {
161 l.shutdownGracefully(quietPeriod, timeout, unit);
162 }
163 for (EventLoop l: idleChildren) {
164 l.shutdownGracefully(quietPeriod, timeout, unit);
165 }
166
167
168 if (isTerminated()) {
169 terminationFuture.trySuccess(null);
170 }
171
172 return terminationFuture();
173 }
174
175 @Override
176 public Future<?> terminationFuture() {
177 return terminationFuture;
178 }
179
180 @Override
181 @Deprecated
182 public void shutdown() {
183 shuttingDown = true;
184
185 for (EventLoop l: activeChildren) {
186 l.shutdown();
187 }
188 for (EventLoop l: idleChildren) {
189 l.shutdown();
190 }
191
192
193 if (isTerminated()) {
194 terminationFuture.trySuccess(null);
195 }
196 }
197
198 @Override
199 public boolean isShuttingDown() {
200 for (EventLoop l: activeChildren) {
201 if (!l.isShuttingDown()) {
202 return false;
203 }
204 }
205 for (EventLoop l: idleChildren) {
206 if (!l.isShuttingDown()) {
207 return false;
208 }
209 }
210 return true;
211 }
212
213 @Override
214 public boolean isShutdown() {
215 for (EventLoop l: activeChildren) {
216 if (!l.isShutdown()) {
217 return false;
218 }
219 }
220 for (EventLoop l: idleChildren) {
221 if (!l.isShutdown()) {
222 return false;
223 }
224 }
225 return true;
226 }
227
228 @Override
229 public boolean isTerminated() {
230 for (EventLoop l: activeChildren) {
231 if (!l.isTerminated()) {
232 return false;
233 }
234 }
235 for (EventLoop l: idleChildren) {
236 if (!l.isTerminated()) {
237 return false;
238 }
239 }
240 return true;
241 }
242
243 @Override
244 public boolean awaitTermination(long timeout, TimeUnit unit)
245 throws InterruptedException {
246 long deadline = System.nanoTime() + unit.toNanos(timeout);
247 for (EventLoop l: activeChildren) {
248 for (;;) {
249 long timeLeft = deadline - System.nanoTime();
250 if (timeLeft <= 0) {
251 return isTerminated();
252 }
253 if (l.awaitTermination(timeLeft, TimeUnit.NANOSECONDS)) {
254 break;
255 }
256 }
257 }
258 for (EventLoop l: idleChildren) {
259 for (;;) {
260 long timeLeft = deadline - System.nanoTime();
261 if (timeLeft <= 0) {
262 return isTerminated();
263 }
264 if (l.awaitTermination(timeLeft, TimeUnit.NANOSECONDS)) {
265 break;
266 }
267 }
268 }
269 return isTerminated();
270 }
271
272 @Override
273 public ChannelFuture register(Channel channel) {
274 ObjectUtil.checkNotNull(channel, "channel");
275 try {
276 EventLoop l = nextChild();
277 return l.register(new DefaultChannelPromise(channel, l));
278 } catch (Throwable t) {
279 return new FailedChannelFuture(channel, GlobalEventExecutor.INSTANCE, t);
280 }
281 }
282
283 @Override
284 public ChannelFuture register(ChannelPromise promise) {
285 try {
286 return nextChild().register(promise);
287 } catch (Throwable t) {
288 promise.setFailure(t);
289 return promise;
290 }
291 }
292
293 @Deprecated
294 @Override
295 public ChannelFuture register(Channel channel, ChannelPromise promise) {
296 ObjectUtil.checkNotNull(channel, "channel");
297 try {
298 return nextChild().register(channel, promise);
299 } catch (Throwable t) {
300 promise.setFailure(t);
301 return promise;
302 }
303 }
304
305 private EventLoop nextChild() throws Exception {
306 if (shuttingDown) {
307 throw new RejectedExecutionException("shutting down");
308 }
309
310 EventLoop loop = idleChildren.poll();
311 if (loop == null) {
312 if (maxChannels > 0 && activeChildren.size() >= maxChannels) {
313 throw tooManyChannels;
314 }
315 loop = newChild(childArgs);
316 loop.terminationFuture().addListener(childTerminationListener);
317 }
318 activeChildren.add(loop);
319 return loop;
320 }
321 }