1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16 package io.netty.util.concurrent;
17
18 import io.netty.util.internal.logging.InternalLogger;
19 import io.netty.util.internal.logging.InternalLoggerFactory;
20
21 import java.util.Collections;
22 import java.util.Iterator;
23 import java.util.List;
24 import java.util.Set;
25 import java.util.concurrent.Callable;
26 import java.util.concurrent.Delayed;
27 import java.util.concurrent.ExecutionException;
28 import java.util.concurrent.RejectedExecutionHandler;
29 import java.util.concurrent.RunnableScheduledFuture;
30 import java.util.concurrent.ScheduledThreadPoolExecutor;
31 import java.util.concurrent.ThreadFactory;
32 import java.util.concurrent.TimeUnit;
33
34 import static java.util.concurrent.TimeUnit.NANOSECONDS;
35
36
37
38
39
40
41
42
43 public final class UnorderedThreadPoolEventExecutor extends ScheduledThreadPoolExecutor implements EventExecutor {
44 private static final InternalLogger logger = InternalLoggerFactory.getInstance(
45 UnorderedThreadPoolEventExecutor.class);
46
47 private final Promise<?> terminationFuture = GlobalEventExecutor.INSTANCE.newPromise();
48 private final Set<EventExecutor> executorSet = Collections.singleton((EventExecutor) this);
49
50
51
52
53
54 public UnorderedThreadPoolEventExecutor(int corePoolSize) {
55 this(corePoolSize, new DefaultThreadFactory(UnorderedThreadPoolEventExecutor.class));
56 }
57
58
59
60
61 public UnorderedThreadPoolEventExecutor(int corePoolSize, ThreadFactory threadFactory) {
62 super(corePoolSize, threadFactory);
63 }
64
65
66
67
68
69 public UnorderedThreadPoolEventExecutor(int corePoolSize, RejectedExecutionHandler handler) {
70 this(corePoolSize, new DefaultThreadFactory(UnorderedThreadPoolEventExecutor.class), handler);
71 }
72
73
74
75
76 public UnorderedThreadPoolEventExecutor(int corePoolSize, ThreadFactory threadFactory,
77 RejectedExecutionHandler handler) {
78 super(corePoolSize, threadFactory, handler);
79 }
80
81 @Override
82 public EventExecutor next() {
83 return this;
84 }
85
86 @Override
87 public EventExecutorGroup parent() {
88 return this;
89 }
90
91 @Override
92 public boolean inEventLoop() {
93 return false;
94 }
95
96 @Override
97 public boolean inEventLoop(Thread thread) {
98 return false;
99 }
100
101 @Override
102 public <V> Promise<V> newPromise() {
103 return new DefaultPromise<V>(this);
104 }
105
106 @Override
107 public <V> ProgressivePromise<V> newProgressivePromise() {
108 return new DefaultProgressivePromise<V>(this);
109 }
110
111 @Override
112 public <V> Future<V> newSucceededFuture(V result) {
113 return new SucceededFuture<V>(this, result);
114 }
115
116 @Override
117 public <V> Future<V> newFailedFuture(Throwable cause) {
118 return new FailedFuture<V>(this, cause);
119 }
120
121 @Override
122 public boolean isShuttingDown() {
123 return isShutdown();
124 }
125
126 @Override
127 public List<Runnable> shutdownNow() {
128 List<Runnable> tasks = super.shutdownNow();
129 terminationFuture.trySuccess(null);
130 return tasks;
131 }
132
133 @Override
134 public void shutdown() {
135 super.shutdown();
136 terminationFuture.trySuccess(null);
137 }
138
139 @Override
140 public Future<?> shutdownGracefully() {
141 return shutdownGracefully(2, 15, TimeUnit.SECONDS);
142 }
143
144 @Override
145 public Future<?> shutdownGracefully(long quietPeriod, long timeout, TimeUnit unit) {
146
147
148 shutdown();
149 return terminationFuture();
150 }
151
152 @Override
153 public Future<?> terminationFuture() {
154 return terminationFuture;
155 }
156
157 @Override
158 public Iterator<EventExecutor> iterator() {
159 return executorSet.iterator();
160 }
161
162 @Override
163 protected <V> RunnableScheduledFuture<V> decorateTask(Runnable runnable, RunnableScheduledFuture<V> task) {
164 return runnable instanceof NonNotifyRunnable ?
165 task : new RunnableScheduledFutureTask<V>(this, task, false);
166 }
167
168 @Override
169 protected <V> RunnableScheduledFuture<V> decorateTask(Callable<V> callable, RunnableScheduledFuture<V> task) {
170 return new RunnableScheduledFutureTask<V>(this, task, true);
171 }
172
173 @Override
174 public ScheduledFuture<?> schedule(Runnable command, long delay, TimeUnit unit) {
175 return (ScheduledFuture<?>) super.schedule(command, delay, unit);
176 }
177
178 @Override
179 public <V> ScheduledFuture<V> schedule(Callable<V> callable, long delay, TimeUnit unit) {
180 return (ScheduledFuture<V>) super.schedule(callable, delay, unit);
181 }
182
183 @Override
184 public ScheduledFuture<?> scheduleAtFixedRate(Runnable command, long initialDelay, long period, TimeUnit unit) {
185 return (ScheduledFuture<?>) super.scheduleAtFixedRate(command, initialDelay, period, unit);
186 }
187
188 @Override
189 public ScheduledFuture<?> scheduleWithFixedDelay(Runnable command, long initialDelay, long delay, TimeUnit unit) {
190 return (ScheduledFuture<?>) super.scheduleWithFixedDelay(command, initialDelay, delay, unit);
191 }
192
193 @Override
194 public Future<?> submit(Runnable task) {
195 return (Future<?>) super.submit(task);
196 }
197
198 @Override
199 public <T> Future<T> submit(Runnable task, T result) {
200 return (Future<T>) super.submit(task, result);
201 }
202
203 @Override
204 public <T> Future<T> submit(Callable<T> task) {
205 return (Future<T>) super.submit(task);
206 }
207
208 @Override
209 public void execute(Runnable command) {
210 super.schedule(new NonNotifyRunnable(command), 0, NANOSECONDS);
211 }
212
213 private static final class RunnableScheduledFutureTask<V> extends PromiseTask<V>
214 implements RunnableScheduledFuture<V>, ScheduledFuture<V> {
215 private final RunnableScheduledFuture<V> future;
216 private final boolean wasCallable;
217
218 RunnableScheduledFutureTask(EventExecutor executor, RunnableScheduledFuture<V> future, boolean wasCallable) {
219 super(executor, future);
220 this.future = future;
221 this.wasCallable = wasCallable;
222 }
223
224 @Override
225 V runTask() throws Throwable {
226 V result = super.runTask();
227 if (result == null && wasCallable) {
228
229
230
231
232 assert future.isDone();
233 try {
234 return future.get();
235 } catch (ExecutionException e) {
236
237 throw e.getCause();
238 }
239 }
240 return result;
241 }
242
243 @Override
244 public void run() {
245 if (!isPeriodic()) {
246 super.run();
247 } else if (!isDone()) {
248 try {
249
250 runTask();
251 } catch (Throwable cause) {
252 if (!tryFailureInternal(cause)) {
253 logger.warn("Failure during execution of task", cause);
254 }
255 }
256 }
257 }
258
259 @Override
260 public boolean isPeriodic() {
261 return future.isPeriodic();
262 }
263
264 @Override
265 public long getDelay(TimeUnit unit) {
266 return future.getDelay(unit);
267 }
268
269 @Override
270 public int compareTo(Delayed o) {
271 return future.compareTo(o);
272 }
273 }
274
275
276
277
278
279
280
281
282 private static final class NonNotifyRunnable implements Runnable {
283
284 private final Runnable task;
285
286 NonNotifyRunnable(Runnable task) {
287 this.task = task;
288 }
289
290 @Override
291 public void run() {
292 task.run();
293 }
294 }
295 }