1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16 package io.netty.util.concurrent;
17
18 import io.netty.util.internal.ObjectUtil;
19 import io.netty.util.internal.PlatformDependent;
20 import io.netty.util.internal.SystemPropertyUtil;
21 import io.netty.util.internal.ThreadExecutorMap;
22 import io.netty.util.internal.UnstableApi;
23 import io.netty.util.internal.logging.InternalLogger;
24 import io.netty.util.internal.logging.InternalLoggerFactory;
25 import org.jetbrains.annotations.Async.Schedule;
26
27 import java.lang.Thread.State;
28 import java.util.ArrayList;
29 import java.util.Collection;
30 import java.util.LinkedHashSet;
31 import java.util.List;
32 import java.util.Queue;
33 import java.util.Set;
34 import java.util.concurrent.BlockingQueue;
35 import java.util.concurrent.Callable;
36 import java.util.concurrent.CountDownLatch;
37 import java.util.concurrent.ExecutionException;
38 import java.util.concurrent.Executor;
39 import java.util.concurrent.LinkedBlockingQueue;
40 import java.util.concurrent.RejectedExecutionException;
41 import java.util.concurrent.ThreadFactory;
42 import java.util.concurrent.TimeUnit;
43 import java.util.concurrent.TimeoutException;
44 import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
45 import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
46
47
48
49
50
51 public abstract class SingleThreadEventExecutor extends AbstractScheduledEventExecutor implements OrderedEventExecutor {
52
53 static final int DEFAULT_MAX_PENDING_EXECUTOR_TASKS = Math.max(16,
54 SystemPropertyUtil.getInt("io.netty.eventexecutor.maxPendingTasks", Integer.MAX_VALUE));
55
56 private static final InternalLogger logger =
57 InternalLoggerFactory.getInstance(SingleThreadEventExecutor.class);
58
59 private static final int ST_NOT_STARTED = 1;
60 private static final int ST_STARTED = 2;
61 private static final int ST_SHUTTING_DOWN = 3;
62 private static final int ST_SHUTDOWN = 4;
63 private static final int ST_TERMINATED = 5;
64
65 private static final Runnable NOOP_TASK = new Runnable() {
66 @Override
67 public void run() {
68
69 }
70 };
71
72 private static final AtomicIntegerFieldUpdater<SingleThreadEventExecutor> STATE_UPDATER =
73 AtomicIntegerFieldUpdater.newUpdater(SingleThreadEventExecutor.class, "state");
74 private static final AtomicReferenceFieldUpdater<SingleThreadEventExecutor, ThreadProperties> PROPERTIES_UPDATER =
75 AtomicReferenceFieldUpdater.newUpdater(
76 SingleThreadEventExecutor.class, ThreadProperties.class, "threadProperties");
77
78 private final Queue<Runnable> taskQueue;
79
80 private volatile Thread thread;
81 @SuppressWarnings("unused")
82 private volatile ThreadProperties threadProperties;
83 private final Executor executor;
84 private volatile boolean interrupted;
85
86 private final CountDownLatch threadLock = new CountDownLatch(1);
87 private final Set<Runnable> shutdownHooks = new LinkedHashSet<Runnable>();
88 private final boolean addTaskWakesUp;
89 private final int maxPendingTasks;
90 private final RejectedExecutionHandler rejectedExecutionHandler;
91
92 private long lastExecutionTime;
93
94 @SuppressWarnings({ "FieldMayBeFinal", "unused" })
95 private volatile int state = ST_NOT_STARTED;
96
97 private volatile long gracefulShutdownQuietPeriod;
98 private volatile long gracefulShutdownTimeout;
99 private long gracefulShutdownStartTime;
100
101 private final Promise<?> terminationFuture = new DefaultPromise<Void>(GlobalEventExecutor.INSTANCE);
102
103
104
105
106
107
108
109
110
111 protected SingleThreadEventExecutor(
112 EventExecutorGroup parent, ThreadFactory threadFactory, boolean addTaskWakesUp) {
113 this(parent, new ThreadPerTaskExecutor(threadFactory), addTaskWakesUp);
114 }
115
116
117
118
119
120
121
122
123
124
125
126 protected SingleThreadEventExecutor(
127 EventExecutorGroup parent, ThreadFactory threadFactory,
128 boolean addTaskWakesUp, int maxPendingTasks, RejectedExecutionHandler rejectedHandler) {
129 this(parent, new ThreadPerTaskExecutor(threadFactory), addTaskWakesUp, maxPendingTasks, rejectedHandler);
130 }
131
132
133
134
135
136
137
138
139
140 protected SingleThreadEventExecutor(EventExecutorGroup parent, Executor executor, boolean addTaskWakesUp) {
141 this(parent, executor, addTaskWakesUp, DEFAULT_MAX_PENDING_EXECUTOR_TASKS, RejectedExecutionHandlers.reject());
142 }
143
144
145
146
147
148
149
150
151
152
153
154 protected SingleThreadEventExecutor(EventExecutorGroup parent, Executor executor,
155 boolean addTaskWakesUp, int maxPendingTasks,
156 RejectedExecutionHandler rejectedHandler) {
157 super(parent);
158 this.addTaskWakesUp = addTaskWakesUp;
159 this.maxPendingTasks = Math.max(16, maxPendingTasks);
160 this.executor = ThreadExecutorMap.apply(executor, this);
161 taskQueue = newTaskQueue(this.maxPendingTasks);
162 rejectedExecutionHandler = ObjectUtil.checkNotNull(rejectedHandler, "rejectedHandler");
163 }
164
165 protected SingleThreadEventExecutor(EventExecutorGroup parent, Executor executor,
166 boolean addTaskWakesUp, Queue<Runnable> taskQueue,
167 RejectedExecutionHandler rejectedHandler) {
168 super(parent);
169 this.addTaskWakesUp = addTaskWakesUp;
170 this.maxPendingTasks = DEFAULT_MAX_PENDING_EXECUTOR_TASKS;
171 this.executor = ThreadExecutorMap.apply(executor, this);
172 this.taskQueue = ObjectUtil.checkNotNull(taskQueue, "taskQueue");
173 this.rejectedExecutionHandler = ObjectUtil.checkNotNull(rejectedHandler, "rejectedHandler");
174 }
175
176
177
178
179 @Deprecated
180 protected Queue<Runnable> newTaskQueue() {
181 return newTaskQueue(maxPendingTasks);
182 }
183
184
185
186
187
188
189
190 protected Queue<Runnable> newTaskQueue(int maxPendingTasks) {
191 return new LinkedBlockingQueue<Runnable>(maxPendingTasks);
192 }
193
194
195
196
197 protected void interruptThread() {
198 Thread currentThread = thread;
199 if (currentThread == null) {
200 interrupted = true;
201 } else {
202 currentThread.interrupt();
203 }
204 }
205
206
207
208
209 protected Runnable pollTask() {
210 assert inEventLoop();
211 return pollTaskFrom(taskQueue);
212 }
213
214 protected static Runnable pollTaskFrom(Queue<Runnable> taskQueue) {
215 for (;;) {
216 Runnable task = taskQueue.poll();
217 if (task != WAKEUP_TASK) {
218 return task;
219 }
220 }
221 }
222
223
224
225
226
227
228
229
230
231
232 protected Runnable takeTask() {
233 assert inEventLoop();
234 if (!(taskQueue instanceof BlockingQueue)) {
235 throw new UnsupportedOperationException();
236 }
237
238 BlockingQueue<Runnable> taskQueue = (BlockingQueue<Runnable>) this.taskQueue;
239 for (;;) {
240 ScheduledFutureTask<?> scheduledTask = peekScheduledTask();
241 if (scheduledTask == null) {
242 Runnable task = null;
243 try {
244 task = taskQueue.take();
245 if (task == WAKEUP_TASK) {
246 task = null;
247 }
248 } catch (InterruptedException e) {
249
250 }
251 return task;
252 } else {
253 long delayNanos = scheduledTask.delayNanos();
254 Runnable task = null;
255 if (delayNanos > 0) {
256 try {
257 task = taskQueue.poll(delayNanos, TimeUnit.NANOSECONDS);
258 } catch (InterruptedException e) {
259
260 return null;
261 }
262 }
263 if (task == null) {
264
265
266
267
268 fetchFromScheduledTaskQueue();
269 task = taskQueue.poll();
270 }
271
272 if (task != null) {
273 return task;
274 }
275 }
276 }
277 }
278
279 private boolean fetchFromScheduledTaskQueue() {
280 if (scheduledTaskQueue == null || scheduledTaskQueue.isEmpty()) {
281 return true;
282 }
283 long nanoTime = getCurrentTimeNanos();
284 for (;;) {
285 Runnable scheduledTask = pollScheduledTask(nanoTime);
286 if (scheduledTask == null) {
287 return true;
288 }
289 if (!taskQueue.offer(scheduledTask)) {
290
291 scheduledTaskQueue.add((ScheduledFutureTask<?>) scheduledTask);
292 return false;
293 }
294 }
295 }
296
297
298
299
300 private boolean executeExpiredScheduledTasks() {
301 if (scheduledTaskQueue == null || scheduledTaskQueue.isEmpty()) {
302 return false;
303 }
304 long nanoTime = getCurrentTimeNanos();
305 Runnable scheduledTask = pollScheduledTask(nanoTime);
306 if (scheduledTask == null) {
307 return false;
308 }
309 do {
310 safeExecute(scheduledTask);
311 } while ((scheduledTask = pollScheduledTask(nanoTime)) != null);
312 return true;
313 }
314
315
316
317
318 protected Runnable peekTask() {
319 assert inEventLoop();
320 return taskQueue.peek();
321 }
322
323
324
325
326 protected boolean hasTasks() {
327 assert inEventLoop();
328 return !taskQueue.isEmpty();
329 }
330
331
332
333
334 public int pendingTasks() {
335 return taskQueue.size();
336 }
337
338
339
340
341
342 protected void addTask(Runnable task) {
343 ObjectUtil.checkNotNull(task, "task");
344 if (!offerTask(task)) {
345 reject(task);
346 }
347 }
348
349 final boolean offerTask(Runnable task) {
350 if (isShutdown()) {
351 reject();
352 }
353 return taskQueue.offer(task);
354 }
355
356
357
358
359 protected boolean removeTask(Runnable task) {
360 return taskQueue.remove(ObjectUtil.checkNotNull(task, "task"));
361 }
362
363
364
365
366
367
368 protected boolean runAllTasks() {
369 assert inEventLoop();
370 boolean fetchedAll;
371 boolean ranAtLeastOne = false;
372
373 do {
374 fetchedAll = fetchFromScheduledTaskQueue();
375 if (runAllTasksFrom(taskQueue)) {
376 ranAtLeastOne = true;
377 }
378 } while (!fetchedAll);
379
380 if (ranAtLeastOne) {
381 lastExecutionTime = getCurrentTimeNanos();
382 }
383 afterRunningAllTasks();
384 return ranAtLeastOne;
385 }
386
387
388
389
390
391
392
393
394
395 protected final boolean runScheduledAndExecutorTasks(final int maxDrainAttempts) {
396 assert inEventLoop();
397 boolean ranAtLeastOneTask;
398 int drainAttempt = 0;
399 do {
400
401
402 ranAtLeastOneTask = runExistingTasksFrom(taskQueue) | executeExpiredScheduledTasks();
403 } while (ranAtLeastOneTask && ++drainAttempt < maxDrainAttempts);
404
405 if (drainAttempt > 0) {
406 lastExecutionTime = getCurrentTimeNanos();
407 }
408 afterRunningAllTasks();
409
410 return drainAttempt > 0;
411 }
412
413
414
415
416
417
418
419
420 protected final boolean runAllTasksFrom(Queue<Runnable> taskQueue) {
421 Runnable task = pollTaskFrom(taskQueue);
422 if (task == null) {
423 return false;
424 }
425 for (;;) {
426 safeExecute(task);
427 task = pollTaskFrom(taskQueue);
428 if (task == null) {
429 return true;
430 }
431 }
432 }
433
434
435
436
437
438
439 private boolean runExistingTasksFrom(Queue<Runnable> taskQueue) {
440 Runnable task = pollTaskFrom(taskQueue);
441 if (task == null) {
442 return false;
443 }
444 int remaining = Math.min(maxPendingTasks, taskQueue.size());
445 safeExecute(task);
446
447
448 while (remaining-- > 0 && (task = taskQueue.poll()) != null) {
449 safeExecute(task);
450 }
451 return true;
452 }
453
454
455
456
457
458 protected boolean runAllTasks(long timeoutNanos) {
459 fetchFromScheduledTaskQueue();
460 Runnable task = pollTask();
461 if (task == null) {
462 afterRunningAllTasks();
463 return false;
464 }
465
466 final long deadline = timeoutNanos > 0 ? getCurrentTimeNanos() + timeoutNanos : 0;
467 long runTasks = 0;
468 long lastExecutionTime;
469 for (;;) {
470 safeExecute(task);
471
472 runTasks ++;
473
474
475
476 if ((runTasks & 0x3F) == 0) {
477 lastExecutionTime = getCurrentTimeNanos();
478 if (lastExecutionTime >= deadline) {
479 break;
480 }
481 }
482
483 task = pollTask();
484 if (task == null) {
485 lastExecutionTime = getCurrentTimeNanos();
486 break;
487 }
488 }
489
490 afterRunningAllTasks();
491 this.lastExecutionTime = lastExecutionTime;
492 return true;
493 }
494
495
496
497
498 @UnstableApi
499 protected void afterRunningAllTasks() { }
500
501
502
503
504 protected long delayNanos(long currentTimeNanos) {
505 currentTimeNanos -= initialNanoTime();
506
507 ScheduledFutureTask<?> scheduledTask = peekScheduledTask();
508 if (scheduledTask == null) {
509 return SCHEDULE_PURGE_INTERVAL;
510 }
511
512 return scheduledTask.delayNanos(currentTimeNanos);
513 }
514
515
516
517
518
519 @UnstableApi
520 protected long deadlineNanos() {
521 ScheduledFutureTask<?> scheduledTask = peekScheduledTask();
522 if (scheduledTask == null) {
523 return getCurrentTimeNanos() + SCHEDULE_PURGE_INTERVAL;
524 }
525 return scheduledTask.deadlineNanos();
526 }
527
528
529
530
531
532
533
534
535 protected void updateLastExecutionTime() {
536 lastExecutionTime = getCurrentTimeNanos();
537 }
538
539
540
541
542 protected abstract void run();
543
544
545
546
547 protected void cleanup() {
548
549 }
550
551 protected void wakeup(boolean inEventLoop) {
552 if (!inEventLoop) {
553
554
555 taskQueue.offer(WAKEUP_TASK);
556 }
557 }
558
559 @Override
560 public boolean inEventLoop(Thread thread) {
561 return thread == this.thread;
562 }
563
564
565
566
567 public void addShutdownHook(final Runnable task) {
568 if (inEventLoop()) {
569 shutdownHooks.add(task);
570 } else {
571 execute(new Runnable() {
572 @Override
573 public void run() {
574 shutdownHooks.add(task);
575 }
576 });
577 }
578 }
579
580
581
582
583 public void removeShutdownHook(final Runnable task) {
584 if (inEventLoop()) {
585 shutdownHooks.remove(task);
586 } else {
587 execute(new Runnable() {
588 @Override
589 public void run() {
590 shutdownHooks.remove(task);
591 }
592 });
593 }
594 }
595
596 private boolean runShutdownHooks() {
597 boolean ran = false;
598
599 while (!shutdownHooks.isEmpty()) {
600 List<Runnable> copy = new ArrayList<Runnable>(shutdownHooks);
601 shutdownHooks.clear();
602 for (Runnable task: copy) {
603 try {
604 runTask(task);
605 } catch (Throwable t) {
606 logger.warn("Shutdown hook raised an exception.", t);
607 } finally {
608 ran = true;
609 }
610 }
611 }
612
613 if (ran) {
614 lastExecutionTime = getCurrentTimeNanos();
615 }
616
617 return ran;
618 }
619
620 @Override
621 public Future<?> shutdownGracefully(long quietPeriod, long timeout, TimeUnit unit) {
622 ObjectUtil.checkPositiveOrZero(quietPeriod, "quietPeriod");
623 if (timeout < quietPeriod) {
624 throw new IllegalArgumentException(
625 "timeout: " + timeout + " (expected >= quietPeriod (" + quietPeriod + "))");
626 }
627 ObjectUtil.checkNotNull(unit, "unit");
628
629 if (isShuttingDown()) {
630 return terminationFuture();
631 }
632
633 boolean inEventLoop = inEventLoop();
634 boolean wakeup;
635 int oldState;
636 for (;;) {
637 if (isShuttingDown()) {
638 return terminationFuture();
639 }
640 int newState;
641 wakeup = true;
642 oldState = state;
643 if (inEventLoop) {
644 newState = ST_SHUTTING_DOWN;
645 } else {
646 switch (oldState) {
647 case ST_NOT_STARTED:
648 case ST_STARTED:
649 newState = ST_SHUTTING_DOWN;
650 break;
651 default:
652 newState = oldState;
653 wakeup = false;
654 }
655 }
656 if (STATE_UPDATER.compareAndSet(this, oldState, newState)) {
657 break;
658 }
659 }
660 gracefulShutdownQuietPeriod = unit.toNanos(quietPeriod);
661 gracefulShutdownTimeout = unit.toNanos(timeout);
662
663 if (ensureThreadStarted(oldState)) {
664 return terminationFuture;
665 }
666
667 if (wakeup) {
668 taskQueue.offer(WAKEUP_TASK);
669 if (!addTaskWakesUp) {
670 wakeup(inEventLoop);
671 }
672 }
673
674 return terminationFuture();
675 }
676
677 @Override
678 public Future<?> terminationFuture() {
679 return terminationFuture;
680 }
681
682 @Override
683 @Deprecated
684 public void shutdown() {
685 if (isShutdown()) {
686 return;
687 }
688
689 boolean inEventLoop = inEventLoop();
690 boolean wakeup;
691 int oldState;
692 for (;;) {
693 if (isShuttingDown()) {
694 return;
695 }
696 int newState;
697 wakeup = true;
698 oldState = state;
699 if (inEventLoop) {
700 newState = ST_SHUTDOWN;
701 } else {
702 switch (oldState) {
703 case ST_NOT_STARTED:
704 case ST_STARTED:
705 case ST_SHUTTING_DOWN:
706 newState = ST_SHUTDOWN;
707 break;
708 default:
709 newState = oldState;
710 wakeup = false;
711 }
712 }
713 if (STATE_UPDATER.compareAndSet(this, oldState, newState)) {
714 break;
715 }
716 }
717
718 if (ensureThreadStarted(oldState)) {
719 return;
720 }
721
722 if (wakeup) {
723 taskQueue.offer(WAKEUP_TASK);
724 if (!addTaskWakesUp) {
725 wakeup(inEventLoop);
726 }
727 }
728 }
729
730 @Override
731 public boolean isShuttingDown() {
732 return state >= ST_SHUTTING_DOWN;
733 }
734
735 @Override
736 public boolean isShutdown() {
737 return state >= ST_SHUTDOWN;
738 }
739
740 @Override
741 public boolean isTerminated() {
742 return state == ST_TERMINATED;
743 }
744
745
746
747
748 protected boolean confirmShutdown() {
749 if (!isShuttingDown()) {
750 return false;
751 }
752
753 if (!inEventLoop()) {
754 throw new IllegalStateException("must be invoked from an event loop");
755 }
756
757 cancelScheduledTasks();
758
759 if (gracefulShutdownStartTime == 0) {
760 gracefulShutdownStartTime = getCurrentTimeNanos();
761 }
762
763 if (runAllTasks() || runShutdownHooks()) {
764 if (isShutdown()) {
765
766 return true;
767 }
768
769
770
771
772 if (gracefulShutdownQuietPeriod == 0) {
773 return true;
774 }
775 taskQueue.offer(WAKEUP_TASK);
776 return false;
777 }
778
779 final long nanoTime = getCurrentTimeNanos();
780
781 if (isShutdown() || nanoTime - gracefulShutdownStartTime > gracefulShutdownTimeout) {
782 return true;
783 }
784
785 if (nanoTime - lastExecutionTime <= gracefulShutdownQuietPeriod) {
786
787
788 taskQueue.offer(WAKEUP_TASK);
789 try {
790 Thread.sleep(100);
791 } catch (InterruptedException e) {
792
793 }
794
795 return false;
796 }
797
798
799
800 return true;
801 }
802
803 @Override
804 public boolean awaitTermination(long timeout, TimeUnit unit) throws InterruptedException {
805 ObjectUtil.checkNotNull(unit, "unit");
806 if (inEventLoop()) {
807 throw new IllegalStateException("cannot await termination of the current thread");
808 }
809
810 threadLock.await(timeout, unit);
811
812 return isTerminated();
813 }
814
815 @Override
816 public void execute(Runnable task) {
817 execute0(task);
818 }
819
820 @Override
821 public void lazyExecute(Runnable task) {
822 lazyExecute0(task);
823 }
824
825 private void execute0(@Schedule Runnable task) {
826 ObjectUtil.checkNotNull(task, "task");
827 execute(task, wakesUpForTask(task));
828 }
829
830 private void lazyExecute0(@Schedule Runnable task) {
831 execute(ObjectUtil.checkNotNull(task, "task"), false);
832 }
833
834 private void execute(Runnable task, boolean immediate) {
835 boolean inEventLoop = inEventLoop();
836 addTask(task);
837 if (!inEventLoop) {
838 startThread();
839 if (isShutdown()) {
840 boolean reject = false;
841 try {
842 if (removeTask(task)) {
843 reject = true;
844 }
845 } catch (UnsupportedOperationException e) {
846
847
848
849 }
850 if (reject) {
851 reject();
852 }
853 }
854 }
855
856 if (!addTaskWakesUp && immediate) {
857 wakeup(inEventLoop);
858 }
859 }
860
861 @Override
862 public <T> T invokeAny(Collection<? extends Callable<T>> tasks) throws InterruptedException, ExecutionException {
863 throwIfInEventLoop("invokeAny");
864 return super.invokeAny(tasks);
865 }
866
867 @Override
868 public <T> T invokeAny(Collection<? extends Callable<T>> tasks, long timeout, TimeUnit unit)
869 throws InterruptedException, ExecutionException, TimeoutException {
870 throwIfInEventLoop("invokeAny");
871 return super.invokeAny(tasks, timeout, unit);
872 }
873
874 @Override
875 public <T> List<java.util.concurrent.Future<T>> invokeAll(Collection<? extends Callable<T>> tasks)
876 throws InterruptedException {
877 throwIfInEventLoop("invokeAll");
878 return super.invokeAll(tasks);
879 }
880
881 @Override
882 public <T> List<java.util.concurrent.Future<T>> invokeAll(
883 Collection<? extends Callable<T>> tasks, long timeout, TimeUnit unit) throws InterruptedException {
884 throwIfInEventLoop("invokeAll");
885 return super.invokeAll(tasks, timeout, unit);
886 }
887
888 private void throwIfInEventLoop(String method) {
889 if (inEventLoop()) {
890 throw new RejectedExecutionException("Calling " + method + " from within the EventLoop is not allowed");
891 }
892 }
893
894
895
896
897
898
899 public final ThreadProperties threadProperties() {
900 ThreadProperties threadProperties = this.threadProperties;
901 if (threadProperties == null) {
902 Thread thread = this.thread;
903 if (thread == null) {
904 assert !inEventLoop();
905 submit(NOOP_TASK).syncUninterruptibly();
906 thread = this.thread;
907 assert thread != null;
908 }
909
910 threadProperties = new DefaultThreadProperties(thread);
911 if (!PROPERTIES_UPDATER.compareAndSet(this, null, threadProperties)) {
912 threadProperties = this.threadProperties;
913 }
914 }
915
916 return threadProperties;
917 }
918
919
920
921
922 @Deprecated
923 protected interface NonWakeupRunnable extends LazyRunnable { }
924
925
926
927
928
929 protected boolean wakesUpForTask(Runnable task) {
930 return true;
931 }
932
933 protected static void reject() {
934 throw new RejectedExecutionException("event executor terminated");
935 }
936
937
938
939
940
941
942 protected final void reject(Runnable task) {
943 rejectedExecutionHandler.rejected(task, this);
944 }
945
946
947
948 private static final long SCHEDULE_PURGE_INTERVAL = TimeUnit.SECONDS.toNanos(1);
949
950 private void startThread() {
951 if (state == ST_NOT_STARTED) {
952 if (STATE_UPDATER.compareAndSet(this, ST_NOT_STARTED, ST_STARTED)) {
953 boolean success = false;
954 try {
955 doStartThread();
956 success = true;
957 } finally {
958 if (!success) {
959 STATE_UPDATER.compareAndSet(this, ST_STARTED, ST_NOT_STARTED);
960 }
961 }
962 }
963 }
964 }
965
966 private boolean ensureThreadStarted(int oldState) {
967 if (oldState == ST_NOT_STARTED) {
968 try {
969 doStartThread();
970 } catch (Throwable cause) {
971 STATE_UPDATER.set(this, ST_TERMINATED);
972 terminationFuture.tryFailure(cause);
973
974 if (!(cause instanceof Exception)) {
975
976 PlatformDependent.throwException(cause);
977 }
978 return true;
979 }
980 }
981 return false;
982 }
983
984 private void doStartThread() {
985 assert thread == null;
986 executor.execute(new Runnable() {
987 @Override
988 public void run() {
989 thread = Thread.currentThread();
990 if (interrupted) {
991 thread.interrupt();
992 }
993
994 boolean success = false;
995 updateLastExecutionTime();
996 try {
997 SingleThreadEventExecutor.this.run();
998 success = true;
999 } catch (Throwable t) {
1000 logger.warn("Unexpected exception from an event executor: ", t);
1001 } finally {
1002 for (;;) {
1003 int oldState = state;
1004 if (oldState >= ST_SHUTTING_DOWN || STATE_UPDATER.compareAndSet(
1005 SingleThreadEventExecutor.this, oldState, ST_SHUTTING_DOWN)) {
1006 break;
1007 }
1008 }
1009
1010
1011 if (success && gracefulShutdownStartTime == 0) {
1012 if (logger.isErrorEnabled()) {
1013 logger.error("Buggy " + EventExecutor.class.getSimpleName() + " implementation; " +
1014 SingleThreadEventExecutor.class.getSimpleName() + ".confirmShutdown() must " +
1015 "be called before run() implementation terminates.");
1016 }
1017 }
1018
1019 try {
1020
1021
1022
1023 for (;;) {
1024 if (confirmShutdown()) {
1025 break;
1026 }
1027 }
1028
1029
1030
1031 for (;;) {
1032 int oldState = state;
1033 if (oldState >= ST_SHUTDOWN || STATE_UPDATER.compareAndSet(
1034 SingleThreadEventExecutor.this, oldState, ST_SHUTDOWN)) {
1035 break;
1036 }
1037 }
1038
1039
1040
1041 confirmShutdown();
1042 } finally {
1043 try {
1044 cleanup();
1045 } finally {
1046
1047
1048
1049
1050 FastThreadLocal.removeAll();
1051
1052 STATE_UPDATER.set(SingleThreadEventExecutor.this, ST_TERMINATED);
1053 threadLock.countDown();
1054 int numUserTasks = drainTasks();
1055 if (numUserTasks > 0 && logger.isWarnEnabled()) {
1056 logger.warn("An event executor terminated with " +
1057 "non-empty task queue (" + numUserTasks + ')');
1058 }
1059 terminationFuture.setSuccess(null);
1060 }
1061 }
1062 }
1063 }
1064 });
1065 }
1066
1067 final int drainTasks() {
1068 int numTasks = 0;
1069 for (;;) {
1070 Runnable runnable = taskQueue.poll();
1071 if (runnable == null) {
1072 break;
1073 }
1074
1075
1076 if (WAKEUP_TASK != runnable) {
1077 numTasks++;
1078 }
1079 }
1080 return numTasks;
1081 }
1082
1083 private static final class DefaultThreadProperties implements ThreadProperties {
1084 private final Thread t;
1085
1086 DefaultThreadProperties(Thread t) {
1087 this.t = t;
1088 }
1089
1090 @Override
1091 public State state() {
1092 return t.getState();
1093 }
1094
1095 @Override
1096 public int priority() {
1097 return t.getPriority();
1098 }
1099
1100 @Override
1101 public boolean isInterrupted() {
1102 return t.isInterrupted();
1103 }
1104
1105 @Override
1106 public boolean isDaemon() {
1107 return t.isDaemon();
1108 }
1109
1110 @Override
1111 public String name() {
1112 return t.getName();
1113 }
1114
1115 @Override
1116 public long id() {
1117 return t.getId();
1118 }
1119
1120 @Override
1121 public StackTraceElement[] stackTrace() {
1122 return t.getStackTrace();
1123 }
1124
1125 @Override
1126 public boolean isAlive() {
1127 return t.isAlive();
1128 }
1129 }
1130 }