1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16 package io.netty.util.concurrent;
17
18 import io.netty.util.internal.ObjectUtil;
19 import io.netty.util.internal.PlatformDependent;
20 import io.netty.util.internal.UnstableApi;
21
22 import java.util.Collection;
23 import java.util.Iterator;
24 import java.util.List;
25 import java.util.Queue;
26 import java.util.concurrent.Callable;
27 import java.util.concurrent.ExecutionException;
28 import java.util.concurrent.RejectedExecutionException;
29 import java.util.concurrent.TimeUnit;
30 import java.util.concurrent.TimeoutException;
31 import java.util.concurrent.atomic.AtomicInteger;
32 import java.util.concurrent.atomic.AtomicReference;
33
34
35
36
37
38
39
40
41 @UnstableApi
42 public final class NonStickyEventExecutorGroup implements EventExecutorGroup {
43 private final EventExecutorGroup group;
44 private final int maxTaskExecutePerRun;
45
46
47
48
49
50 public NonStickyEventExecutorGroup(EventExecutorGroup group) {
51 this(group, 1024);
52 }
53
54
55
56
57
58 public NonStickyEventExecutorGroup(EventExecutorGroup group, int maxTaskExecutePerRun) {
59 this.group = verify(group);
60 this.maxTaskExecutePerRun = ObjectUtil.checkPositive(maxTaskExecutePerRun, "maxTaskExecutePerRun");
61 }
62
63 private static EventExecutorGroup verify(EventExecutorGroup group) {
64 Iterator<EventExecutor> executors = ObjectUtil.checkNotNull(group, "group").iterator();
65 while (executors.hasNext()) {
66 EventExecutor executor = executors.next();
67 if (executor instanceof OrderedEventExecutor) {
68 throw new IllegalArgumentException("EventExecutorGroup " + group
69 + " contains OrderedEventExecutors: " + executor);
70 }
71 }
72 return group;
73 }
74
75 private NonStickyOrderedEventExecutor newExecutor(EventExecutor executor) {
76 return new NonStickyOrderedEventExecutor(executor, maxTaskExecutePerRun);
77 }
78
79 @Override
80 public boolean isShuttingDown() {
81 return group.isShuttingDown();
82 }
83
84 @Override
85 public Future<?> shutdownGracefully() {
86 return group.shutdownGracefully();
87 }
88
89 @Override
90 public Future<?> shutdownGracefully(long quietPeriod, long timeout, TimeUnit unit) {
91 return group.shutdownGracefully(quietPeriod, timeout, unit);
92 }
93
94 @Override
95 public Future<?> terminationFuture() {
96 return group.terminationFuture();
97 }
98
99 @SuppressWarnings("deprecation")
100 @Override
101 public void shutdown() {
102 group.shutdown();
103 }
104
105 @SuppressWarnings("deprecation")
106 @Override
107 public List<Runnable> shutdownNow() {
108 return group.shutdownNow();
109 }
110
111 @Override
112 public EventExecutor next() {
113 return newExecutor(group.next());
114 }
115
116 @Override
117 public Iterator<EventExecutor> iterator() {
118 final Iterator<EventExecutor> itr = group.iterator();
119 return new Iterator<EventExecutor>() {
120 @Override
121 public boolean hasNext() {
122 return itr.hasNext();
123 }
124
125 @Override
126 public EventExecutor next() {
127 return newExecutor(itr.next());
128 }
129
130 @Override
131 public void remove() {
132 itr.remove();
133 }
134 };
135 }
136
137 @Override
138 public Future<?> submit(Runnable task) {
139 return group.submit(task);
140 }
141
142 @Override
143 public <T> Future<T> submit(Runnable task, T result) {
144 return group.submit(task, result);
145 }
146
147 @Override
148 public <T> Future<T> submit(Callable<T> task) {
149 return group.submit(task);
150 }
151
152 @Override
153 public ScheduledFuture<?> schedule(Runnable command, long delay, TimeUnit unit) {
154 return group.schedule(command, delay, unit);
155 }
156
157 @Override
158 public <V> ScheduledFuture<V> schedule(Callable<V> callable, long delay, TimeUnit unit) {
159 return group.schedule(callable, delay, unit);
160 }
161
162 @Override
163 public ScheduledFuture<?> scheduleAtFixedRate(Runnable command, long initialDelay, long period, TimeUnit unit) {
164 return group.scheduleAtFixedRate(command, initialDelay, period, unit);
165 }
166
167 @Override
168 public ScheduledFuture<?> scheduleWithFixedDelay(Runnable command, long initialDelay, long delay, TimeUnit unit) {
169 return group.scheduleWithFixedDelay(command, initialDelay, delay, unit);
170 }
171
172 @Override
173 public boolean isShutdown() {
174 return group.isShutdown();
175 }
176
177 @Override
178 public boolean isTerminated() {
179 return group.isTerminated();
180 }
181
182 @Override
183 public boolean awaitTermination(long timeout, TimeUnit unit) throws InterruptedException {
184 return group.awaitTermination(timeout, unit);
185 }
186
187 @Override
188 public <T> List<java.util.concurrent.Future<T>> invokeAll(
189 Collection<? extends Callable<T>> tasks) throws InterruptedException {
190 return group.invokeAll(tasks);
191 }
192
193 @Override
194 public <T> List<java.util.concurrent.Future<T>> invokeAll(
195 Collection<? extends Callable<T>> tasks, long timeout, TimeUnit unit) throws InterruptedException {
196 return group.invokeAll(tasks, timeout, unit);
197 }
198
199 @Override
200 public <T> T invokeAny(Collection<? extends Callable<T>> tasks) throws InterruptedException, ExecutionException {
201 return group.invokeAny(tasks);
202 }
203
204 @Override
205 public <T> T invokeAny(Collection<? extends Callable<T>> tasks, long timeout, TimeUnit unit)
206 throws InterruptedException, ExecutionException, TimeoutException {
207 return group.invokeAny(tasks, timeout, unit);
208 }
209
210 @Override
211 public void execute(Runnable command) {
212 group.execute(command);
213 }
214
215 private static final class NonStickyOrderedEventExecutor extends AbstractEventExecutor
216 implements Runnable, OrderedEventExecutor {
217 private final EventExecutor executor;
218 private final Queue<Runnable> tasks = PlatformDependent.newMpscQueue();
219
220 private static final int NONE = 0;
221 private static final int SUBMITTED = 1;
222 private static final int RUNNING = 2;
223
224 private final AtomicInteger state = new AtomicInteger();
225 private final int maxTaskExecutePerRun;
226
227 private final AtomicReference<Thread> executingThread = new AtomicReference<Thread>();
228
229 NonStickyOrderedEventExecutor(EventExecutor executor, int maxTaskExecutePerRun) {
230 super(executor);
231 this.executor = executor;
232 this.maxTaskExecutePerRun = maxTaskExecutePerRun;
233 }
234
235 @Override
236 public void run() {
237 if (!state.compareAndSet(SUBMITTED, RUNNING)) {
238 return;
239 }
240 Thread current = Thread.currentThread();
241 executingThread.set(current);
242 for (;;) {
243 int i = 0;
244 try {
245 for (; i < maxTaskExecutePerRun; i++) {
246 Runnable task = tasks.poll();
247 if (task == null) {
248 break;
249 }
250 safeExecute(task);
251 }
252 } finally {
253 if (i == maxTaskExecutePerRun) {
254 try {
255 state.set(SUBMITTED);
256
257 executingThread.compareAndSet(current, null);
258 executor.execute(this);
259 return;
260 } catch (Throwable ignore) {
261
262 state.set(RUNNING);
263
264
265
266 }
267 } else {
268 state.set(NONE);
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284 if (tasks.isEmpty() || !state.compareAndSet(NONE, RUNNING)) {
285
286 executingThread.compareAndSet(current, null);
287 return;
288 }
289 }
290 }
291 }
292 }
293
294 @Override
295 public boolean inEventLoop(Thread thread) {
296 return executingThread.get() == thread;
297 }
298
299 @Override
300 public boolean isShuttingDown() {
301 return executor.isShutdown();
302 }
303
304 @Override
305 public Future<?> shutdownGracefully(long quietPeriod, long timeout, TimeUnit unit) {
306 return executor.shutdownGracefully(quietPeriod, timeout, unit);
307 }
308
309 @Override
310 public Future<?> terminationFuture() {
311 return executor.terminationFuture();
312 }
313
314 @Override
315 public void shutdown() {
316 executor.shutdown();
317 }
318
319 @Override
320 public boolean isShutdown() {
321 return executor.isShutdown();
322 }
323
324 @Override
325 public boolean isTerminated() {
326 return executor.isTerminated();
327 }
328
329 @Override
330 public boolean awaitTermination(long timeout, TimeUnit unit) throws InterruptedException {
331 return executor.awaitTermination(timeout, unit);
332 }
333
334 @Override
335 public void execute(Runnable command) {
336 if (!tasks.offer(command)) {
337 throw new RejectedExecutionException();
338 }
339 if (state.compareAndSet(NONE, SUBMITTED)) {
340
341
342 executor.execute(this);
343 }
344 }
345 }
346 }