查看本类的 API文档回源码主页即时通讯网 - 即时通讯开发者社区!
1   /*
2    * Copyright 2012 The Netty Project
3    *
4    * The Netty Project licenses this file to you under the Apache License,
5    * version 2.0 (the "License"); you may not use this file except in compliance
6    * with the License. You may obtain a copy of the License at:
7    *
8    *   http://www.apache.org/licenses/LICENSE-2.0
9    *
10   * Unless required by applicable law or agreed to in writing, software
11   * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12   * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13   * License for the specific language governing permissions and limitations
14   * under the License.
15   */
16  package io.netty.util.concurrent;
17  
18  import io.netty.util.internal.ObjectUtil;
19  import io.netty.util.internal.SystemPropertyUtil;
20  import io.netty.util.internal.logging.InternalLogger;
21  import io.netty.util.internal.logging.InternalLoggerFactory;
22  
23  import java.lang.Thread.State;
24  import java.util.ArrayList;
25  import java.util.Collection;
26  import java.util.LinkedHashSet;
27  import java.util.List;
28  import java.util.Queue;
29  import java.util.Set;
30  import java.util.concurrent.BlockingQueue;
31  import java.util.concurrent.Callable;
32  import java.util.concurrent.ExecutionException;
33  import java.util.concurrent.LinkedBlockingQueue;
34  import java.util.concurrent.RejectedExecutionException;
35  import java.util.concurrent.Semaphore;
36  import java.util.concurrent.ThreadFactory;
37  import java.util.concurrent.TimeUnit;
38  import java.util.concurrent.TimeoutException;
39  import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
40  
41  /**
42   * Abstract base class for {@link OrderedEventExecutor}'s that execute all its submitted tasks in a single thread.
43   *
44   */
45  public abstract class SingleThreadEventExecutor extends AbstractScheduledEventExecutor implements OrderedEventExecutor {
46  
47      static final int DEFAULT_MAX_PENDING_EXECUTOR_TASKS = Math.max(16,
48              SystemPropertyUtil.getInt("io.netty.eventexecutor.maxPendingTasks", Integer.MAX_VALUE));
49  
50      private static final InternalLogger logger =
51              InternalLoggerFactory.getInstance(SingleThreadEventExecutor.class);
52  
53      private static final int ST_NOT_STARTED = 1;
54      private static final int ST_STARTED = 2;
55      private static final int ST_SHUTTING_DOWN = 3;
56      private static final int ST_SHUTDOWN = 4;
57      private static final int ST_TERMINATED = 5;
58  
59      private static final Runnable WAKEUP_TASK = new Runnable() {
60          @Override
61          public void run() {
62              // Do nothing.
63          }
64      };
65  
66      private static final AtomicIntegerFieldUpdater<SingleThreadEventExecutor> STATE_UPDATER =
67              AtomicIntegerFieldUpdater.newUpdater(SingleThreadEventExecutor.class, "state");
68  
69      private final EventExecutorGroup parent;
70      private final Queue<Runnable> taskQueue;
71      private final Thread thread;
72      private final ThreadProperties threadProperties;
73      private final Semaphore threadLock = new Semaphore(0);
74      private final Set<Runnable> shutdownHooks = new LinkedHashSet<Runnable>();
75      private final boolean addTaskWakesUp;
76      private final int maxPendingTasks;
77      private final RejectedExecutionHandler rejectedExecutionHandler;
78  
79      private long lastExecutionTime;
80  
81      @SuppressWarnings({ "FieldMayBeFinal", "unused" })
82      private volatile int state = ST_NOT_STARTED;
83  
84      private volatile long gracefulShutdownQuietPeriod;
85      private volatile long gracefulShutdownTimeout;
86      private long gracefulShutdownStartTime;
87  
88      private final Promise<?> terminationFuture = new DefaultPromise<Void>(GlobalEventExecutor.INSTANCE);
89  
90      /**
91       * Create a new instance
92       *
93       * @param parent            the {@link EventExecutorGroup} which is the parent of this instance and belongs to it
94       * @param threadFactory     the {@link ThreadFactory} which will be used for the used {@link Thread}
95       * @param addTaskWakesUp    {@code true} if and only if invocation of {@link #addTask(Runnable)} will wake up the
96       *                          executor thread
97       */
98      protected SingleThreadEventExecutor(
99              EventExecutorGroup parent, ThreadFactory threadFactory, boolean addTaskWakesUp) {
100         this(parent, threadFactory, addTaskWakesUp, DEFAULT_MAX_PENDING_EXECUTOR_TASKS,
101                 RejectedExecutionHandlers.reject());
102     }
103 
104     /**
105      * Create a new instance
106      *
107      * @param parent            the {@link EventExecutorGroup} which is the parent of this instance and belongs to it
108      * @param threadFactory     the {@link ThreadFactory} which will be used for the used {@link Thread}
109      * @param addTaskWakesUp    {@code true} if and only if invocation of {@link #addTask(Runnable)} will wake up the
110      *                          executor thread
111      * @param maxPendingTasks   the maximum number of pending tasks before new tasks will be rejected.
112      * @param rejectedHandler   the {@link RejectedExecutionHandler} to use.
113      */
114     @SuppressWarnings("deprecation")
115     protected SingleThreadEventExecutor(
116             EventExecutorGroup parent, ThreadFactory threadFactory, boolean addTaskWakesUp, int maxPendingTasks,
117             RejectedExecutionHandler rejectedHandler) {
118         if (threadFactory == null) {
119             throw new NullPointerException("threadFactory");
120         }
121 
122         this.parent = parent;
123         this.addTaskWakesUp = addTaskWakesUp;
124 
125         thread = threadFactory.newThread(new Runnable() {
126             @Override
127             public void run() {
128                 boolean success = false;
129                 updateLastExecutionTime();
130                 try {
131                     SingleThreadEventExecutor.this.run();
132                     success = true;
133                 } catch (Throwable t) {
134                     logger.warn("Unexpected exception from an event executor: ", t);
135                 } finally {
136                     for (;;) {
137                         int oldState = STATE_UPDATER.get(SingleThreadEventExecutor.this);
138                         if (oldState >= ST_SHUTTING_DOWN || STATE_UPDATER.compareAndSet(
139                                 SingleThreadEventExecutor.this, oldState, ST_SHUTTING_DOWN)) {
140                             break;
141                         }
142                     }
143                     // Check if confirmShutdown() was called at the end of the loop.
144                     if (success && gracefulShutdownStartTime == 0) {
145                         logger.error(
146                                 "Buggy " + EventExecutor.class.getSimpleName() + " implementation; " +
147                                 SingleThreadEventExecutor.class.getSimpleName() + ".confirmShutdown() must be called " +
148                                 "before run() implementation terminates.");
149                     }
150 
151                     try {
152                         // Run all remaining tasks and shutdown hooks.
153                         for (;;) {
154                             if (confirmShutdown()) {
155                                 break;
156                             }
157                         }
158                     } finally {
159                         try {
160                             cleanup();
161                         } finally {
162                             STATE_UPDATER.set(SingleThreadEventExecutor.this, ST_TERMINATED);
163                             threadLock.release();
164                             if (!taskQueue.isEmpty()) {
165                                 logger.warn(
166                                         "An event executor terminated with " +
167                                         "non-empty task queue (" + taskQueue.size() + ')');
168                             }
169 
170                             terminationFuture.setSuccess(null);
171                         }
172                     }
173                 }
174             }
175         });
176         threadProperties = new DefaultThreadProperties(thread);
177         this.maxPendingTasks = Math.max(16, maxPendingTasks);
178         taskQueue = newTaskQueue();
179         rejectedExecutionHandler = ObjectUtil.checkNotNull(rejectedHandler, "rejectedHandler");
180     }
181 
182     /**
183      * @deprecated Please use and override {@link #newTaskQueue(int)}.
184      */
185     @Deprecated
186     protected Queue<Runnable> newTaskQueue() {
187         return newTaskQueue(maxPendingTasks);
188     }
189 
190     /**
191      * Create a new {@link Queue} which will holds the tasks to execute. This default implementation will return a
192      * {@link LinkedBlockingQueue} but if your sub-class of {@link SingleThreadEventExecutor} will not do any blocking
193      * calls on the this {@link Queue} it may make sense to {@code @Override} this and return some more performant
194      * implementation that does not support blocking operations at all.
195      */
196     protected Queue<Runnable> newTaskQueue(int maxPendingTasks) {
197         return new LinkedBlockingQueue<Runnable>(maxPendingTasks);
198     }
199 
200     @Override
201     public EventExecutorGroup parent() {
202         return parent;
203     }
204 
205     /**
206      * Interrupt the current running {@link Thread}.
207      */
208     protected void interruptThread() {
209         thread.interrupt();
210     }
211 
212     /**
213      * @see Queue#poll()
214      */
215     protected Runnable pollTask() {
216         assert inEventLoop();
217         for (;;) {
218             Runnable task = taskQueue.poll();
219             if (task == WAKEUP_TASK) {
220                 continue;
221             }
222             return task;
223         }
224     }
225 
226     /**
227      * Take the next {@link Runnable} from the task queue and so will block if no task is currently present.
228      * <p>
229      * Be aware that this method will throw an {@link UnsupportedOperationException} if the task queue, which was
230      * created via {@link #newTaskQueue()}, does not implement {@link BlockingQueue}.
231      * </p>
232      *
233      * @return {@code null} if the executor thread has been interrupted or waken up.
234      */
235     protected Runnable takeTask() {
236         assert inEventLoop();
237         if (!(taskQueue instanceof BlockingQueue)) {
238             throw new UnsupportedOperationException();
239         }
240 
241         BlockingQueue<Runnable> taskQueue = (BlockingQueue<Runnable>) this.taskQueue;
242         for (;;) {
243             ScheduledFutureTask<?> scheduledTask = peekScheduledTask();
244             if (scheduledTask == null) {
245                 Runnable task = null;
246                 try {
247                     task = taskQueue.take();
248                     if (task == WAKEUP_TASK) {
249                         task = null;
250                     }
251                 } catch (InterruptedException e) {
252                     // Ignore
253                 }
254                 return task;
255             } else {
256                 long delayNanos = scheduledTask.delayNanos();
257                 Runnable task = null;
258                 if (delayNanos > 0) {
259                     try {
260                         task = taskQueue.poll(delayNanos, TimeUnit.NANOSECONDS);
261                     } catch (InterruptedException e) {
262                         return null;
263                     }
264                 }
265                 if (task == null) {
266                     // We need to fetch the scheduled tasks now as otherwise there may be a chance that
267                     // scheduled tasks are never executed if there is always one task in the taskQueue.
268                     // This is for example true for the read task of OIO Transport
269                     // See https://github.com/netty/netty/issues/1614
270                     fetchFromScheduledTaskQueue();
271                     task = taskQueue.poll();
272                 }
273 
274                 if (task != null) {
275                     return task;
276                 }
277             }
278         }
279     }
280 
281     private boolean fetchFromScheduledTaskQueue() {
282         long nanoTime = AbstractScheduledEventExecutor.nanoTime();
283         Runnable scheduledTask  = pollScheduledTask(nanoTime);
284         while (scheduledTask != null) {
285             if (!taskQueue.offer(scheduledTask)) {
286                 // No space left in the task queue add it back to the scheduledTaskQueue so we pick it up again.
287                 scheduledTaskQueue().add((ScheduledFutureTask<?>) scheduledTask);
288                 return false;
289             }
290             scheduledTask  = pollScheduledTask(nanoTime);
291         }
292         return true;
293     }
294 
295     /**
296      * @see Queue#peek()
297      */
298     protected Runnable peekTask() {
299         assert inEventLoop();
300         return taskQueue.peek();
301     }
302 
303     /**
304      * @see Queue#isEmpty()
305      */
306     protected boolean hasTasks() {
307         assert inEventLoop();
308         return !taskQueue.isEmpty();
309     }
310 
311     /**
312      * Return the number of tasks that are pending for processing.
313      *
314      * <strong>Be aware that this operation may be expensive as it depends on the internal implementation of the
315      * SingleThreadEventExecutor. So use it was care!</strong>
316      */
317     public int pendingTasks() {
318         return taskQueue.size();
319     }
320 
321     /**
322      * Add a task to the task queue, or throws a {@link RejectedExecutionException} if this instance was shutdown
323      * before.
324      */
325     protected void addTask(Runnable task) {
326         if (task == null) {
327             throw new NullPointerException("task");
328         }
329         if (!offerTask(task)) {
330             rejectedExecutionHandler.rejected(task, this);
331         }
332     }
333 
334     final boolean offerTask(Runnable task) {
335         if (isShutdown()) {
336             reject();
337         }
338         return taskQueue.offer(task);
339     }
340 
341     /**
342      * @see Queue#remove(Object)
343      */
344     protected boolean removeTask(Runnable task) {
345         if (task == null) {
346             throw new NullPointerException("task");
347         }
348         return taskQueue.remove(task);
349     }
350 
351     /**
352      * Poll all tasks from the task queue and run them via {@link Runnable#run()} method.
353      *
354      * @return {@code true} if and only if at least one task was run
355      */
356     protected boolean runAllTasks() {
357         boolean fetchedAll;
358         do {
359             fetchedAll = fetchFromScheduledTaskQueue();
360             Runnable task = pollTask();
361             if (task == null) {
362                 return false;
363             }
364 
365             for (;;) {
366                 try {
367                     task.run();
368                 } catch (Throwable t) {
369                     logger.warn("A task raised an exception.", t);
370                 }
371 
372                 task = pollTask();
373                 if (task == null) {
374                     break;
375                 }
376             }
377         } while (!fetchedAll); // keep on processing until we fetched all scheduled tasks.
378 
379         lastExecutionTime = ScheduledFutureTask.nanoTime();
380         return true;
381     }
382 
383     /**
384      * Poll all tasks from the task queue and run them via {@link Runnable#run()} method.  This method stops running
385      * the tasks in the task queue and returns if it ran longer than {@code timeoutNanos}.
386      */
387     protected boolean runAllTasks(long timeoutNanos) {
388         fetchFromScheduledTaskQueue();
389         Runnable task = pollTask();
390         if (task == null) {
391             return false;
392         }
393 
394         final long deadline = ScheduledFutureTask.nanoTime() + timeoutNanos;
395         long runTasks = 0;
396         long lastExecutionTime;
397         for (;;) {
398             try {
399                 task.run();
400             } catch (Throwable t) {
401                 logger.warn("A task raised an exception.", t);
402             }
403 
404             runTasks ++;
405 
406             // Check timeout every 64 tasks because nanoTime() is relatively expensive.
407             // XXX: Hard-coded value - will make it configurable if it is really a problem.
408             if ((runTasks & 0x3F) == 0) {
409                 lastExecutionTime = ScheduledFutureTask.nanoTime();
410                 if (lastExecutionTime >= deadline) {
411                     break;
412                 }
413             }
414 
415             task = pollTask();
416             if (task == null) {
417                 lastExecutionTime = ScheduledFutureTask.nanoTime();
418                 break;
419             }
420         }
421 
422         this.lastExecutionTime = lastExecutionTime;
423         return true;
424     }
425 
426     /**
427      * Returns the amount of time left until the scheduled task with the closest dead line is executed.
428      */
429     protected long delayNanos(long currentTimeNanos) {
430         ScheduledFutureTask<?> scheduledTask = peekScheduledTask();
431         if (scheduledTask == null) {
432             return SCHEDULE_PURGE_INTERVAL;
433         }
434 
435         return scheduledTask.delayNanos(currentTimeNanos);
436     }
437 
438     /**
439      * Updates the internal timestamp that tells when a submitted task was executed most recently.
440      * {@link #runAllTasks()} and {@link #runAllTasks(long)} updates this timestamp automatically, and thus there's
441      * usually no need to call this method.  However, if you take the tasks manually using {@link #takeTask()} or
442      * {@link #pollTask()}, you have to call this method at the end of task execution loop for accurate quiet period
443      * checks.
444      */
445     protected void updateLastExecutionTime() {
446         lastExecutionTime = ScheduledFutureTask.nanoTime();
447     }
448 
449     /**
450      *
451      */
452     protected abstract void run();
453 
454     /**
455      * Do nothing, sub-classes may override
456      */
457     protected void cleanup() {
458         // NOOP
459     }
460 
461     protected void wakeup(boolean inEventLoop) {
462         if (!inEventLoop || state == ST_SHUTTING_DOWN) {
463             // Use offer as we actually only need this to unblock the thread and if offer fails we do not care as there
464             // is already something in the queue.
465             taskQueue.offer(WAKEUP_TASK);
466         }
467     }
468 
469     @Override
470     public boolean inEventLoop(Thread thread) {
471         return thread == this.thread;
472     }
473 
474     /**
475      * Add a {@link Runnable} which will be executed on shutdown of this instance
476      */
477     public void addShutdownHook(final Runnable task) {
478         if (inEventLoop()) {
479             shutdownHooks.add(task);
480         } else {
481             execute(new Runnable() {
482                 @Override
483                 public void run() {
484                     shutdownHooks.add(task);
485                 }
486             });
487         }
488     }
489 
490     /**
491      * Remove a previous added {@link Runnable} as a shutdown hook
492      */
493     public void removeShutdownHook(final Runnable task) {
494         if (inEventLoop()) {
495             shutdownHooks.remove(task);
496         } else {
497             execute(new Runnable() {
498                 @Override
499                 public void run() {
500                     shutdownHooks.remove(task);
501                 }
502             });
503         }
504     }
505 
506     private boolean runShutdownHooks() {
507         boolean ran = false;
508         // Note shutdown hooks can add / remove shutdown hooks.
509         while (!shutdownHooks.isEmpty()) {
510             List<Runnable> copy = new ArrayList<Runnable>(shutdownHooks);
511             shutdownHooks.clear();
512             for (Runnable task: copy) {
513                 try {
514                     task.run();
515                 } catch (Throwable t) {
516                     logger.warn("Shutdown hook raised an exception.", t);
517                 } finally {
518                     ran = true;
519                 }
520             }
521         }
522 
523         if (ran) {
524             lastExecutionTime = ScheduledFutureTask.nanoTime();
525         }
526 
527         return ran;
528     }
529 
530     @Override
531     public Future<?> shutdownGracefully(long quietPeriod, long timeout, TimeUnit unit) {
532         if (quietPeriod < 0) {
533             throw new IllegalArgumentException("quietPeriod: " + quietPeriod + " (expected >= 0)");
534         }
535         if (timeout < quietPeriod) {
536             throw new IllegalArgumentException(
537                     "timeout: " + timeout + " (expected >= quietPeriod (" + quietPeriod + "))");
538         }
539         if (unit == null) {
540             throw new NullPointerException("unit");
541         }
542 
543         if (isShuttingDown()) {
544             return terminationFuture();
545         }
546 
547         boolean inEventLoop = inEventLoop();
548         boolean wakeup;
549         int oldState;
550         for (;;) {
551             if (isShuttingDown()) {
552                 return terminationFuture();
553             }
554             int newState;
555             wakeup = true;
556             oldState = state;
557             if (inEventLoop) {
558                 newState = ST_SHUTTING_DOWN;
559             } else {
560                 switch (oldState) {
561                     case ST_NOT_STARTED:
562                     case ST_STARTED:
563                         newState = ST_SHUTTING_DOWN;
564                         break;
565                     default:
566                         newState = oldState;
567                         wakeup = false;
568                 }
569             }
570             if (STATE_UPDATER.compareAndSet(this, oldState, newState)) {
571                 break;
572             }
573         }
574         gracefulShutdownQuietPeriod = unit.toNanos(quietPeriod);
575         gracefulShutdownTimeout = unit.toNanos(timeout);
576 
577         if (oldState == ST_NOT_STARTED) {
578             thread.start();
579         }
580 
581         if (wakeup) {
582             wakeup(inEventLoop);
583         }
584 
585         return terminationFuture();
586     }
587 
588     @Override
589     public Future<?> terminationFuture() {
590         return terminationFuture;
591     }
592 
593     @Override
594     @Deprecated
595     public void shutdown() {
596         if (isShutdown()) {
597             return;
598         }
599 
600         boolean inEventLoop = inEventLoop();
601         boolean wakeup;
602         int oldState;
603         for (;;) {
604             if (isShuttingDown()) {
605                 return;
606             }
607             int newState;
608             wakeup = true;
609             oldState = state;
610             if (inEventLoop) {
611                 newState = ST_SHUTDOWN;
612             } else {
613                 switch (oldState) {
614                     case ST_NOT_STARTED:
615                     case ST_STARTED:
616                     case ST_SHUTTING_DOWN:
617                         newState = ST_SHUTDOWN;
618                         break;
619                     default:
620                         newState = oldState;
621                         wakeup = false;
622                 }
623             }
624             if (STATE_UPDATER.compareAndSet(this, oldState, newState)) {
625                 break;
626             }
627         }
628 
629         if (oldState == ST_NOT_STARTED) {
630             thread.start();
631         }
632 
633         if (wakeup) {
634             wakeup(inEventLoop);
635         }
636     }
637 
638     @Override
639     public boolean isShuttingDown() {
640         return state >= ST_SHUTTING_DOWN;
641     }
642 
643     @Override
644     public boolean isShutdown() {
645         return state >= ST_SHUTDOWN;
646     }
647 
648     @Override
649     public boolean isTerminated() {
650         return state == ST_TERMINATED;
651     }
652 
653     /**
654      * Confirm that the shutdown if the instance should be done now!
655      */
656     protected boolean confirmShutdown() {
657         if (!isShuttingDown()) {
658             return false;
659         }
660 
661         if (!inEventLoop()) {
662             throw new IllegalStateException("must be invoked from an event loop");
663         }
664 
665         cancelScheduledTasks();
666 
667         if (gracefulShutdownStartTime == 0) {
668             gracefulShutdownStartTime = ScheduledFutureTask.nanoTime();
669         }
670 
671         if (runAllTasks() || runShutdownHooks()) {
672             if (isShutdown()) {
673                 // Executor shut down - no new tasks anymore.
674                 return true;
675             }
676 
677             // There were tasks in the queue. Wait a little bit more until no tasks are queued for the quiet period or
678             // terminate if the quiet period is 0.
679             // See https://github.com/netty/netty/issues/4241
680             if (gracefulShutdownQuietPeriod == 0) {
681                 return true;
682             }
683             wakeup(true);
684             return false;
685         }
686 
687         final long nanoTime = ScheduledFutureTask.nanoTime();
688 
689         if (isShutdown() || nanoTime - gracefulShutdownStartTime > gracefulShutdownTimeout) {
690             return true;
691         }
692 
693         if (nanoTime - lastExecutionTime <= gracefulShutdownQuietPeriod) {
694             // Check if any tasks were added to the queue every 100ms.
695             // TODO: Change the behavior of takeTask() so that it returns on timeout.
696             wakeup(true);
697             try {
698                 Thread.sleep(100);
699             } catch (InterruptedException e) {
700                 // Ignore
701             }
702 
703             return false;
704         }
705 
706         // No tasks were added for last quiet period - hopefully safe to shut down.
707         // (Hopefully because we really cannot make a guarantee that there will be no execute() calls by a user.)
708         return true;
709     }
710 
711     @Override
712     public boolean awaitTermination(long timeout, TimeUnit unit) throws InterruptedException {
713         if (unit == null) {
714             throw new NullPointerException("unit");
715         }
716 
717         if (inEventLoop()) {
718             throw new IllegalStateException("cannot await termination of the current thread");
719         }
720 
721         if (threadLock.tryAcquire(timeout, unit)) {
722             threadLock.release();
723         }
724 
725         return isTerminated();
726     }
727 
728     @Override
729     public void execute(Runnable task) {
730         if (task == null) {
731             throw new NullPointerException("task");
732         }
733 
734         boolean inEventLoop = inEventLoop();
735         if (inEventLoop) {
736             addTask(task);
737         } else {
738             startThread();
739             addTask(task);
740             if (isShutdown() && removeTask(task)) {
741                 reject();
742             }
743         }
744 
745         if (!addTaskWakesUp && wakesUpForTask(task)) {
746             wakeup(inEventLoop);
747         }
748     }
749 
750     @Override
751     public <T> T invokeAny(Collection<? extends Callable<T>> tasks) throws InterruptedException, ExecutionException {
752         throwIfInEventLoop("invokeAny");
753         return super.invokeAny(tasks);
754     }
755 
756     @Override
757     public <T> T invokeAny(Collection<? extends Callable<T>> tasks, long timeout, TimeUnit unit)
758             throws InterruptedException, ExecutionException, TimeoutException {
759         throwIfInEventLoop("invokeAny");
760         return super.invokeAny(tasks, timeout, unit);
761     }
762 
763     @Override
764     public <T> List<java.util.concurrent.Future<T>> invokeAll(Collection<? extends Callable<T>> tasks)
765             throws InterruptedException {
766         throwIfInEventLoop("invokeAll");
767         return super.invokeAll(tasks);
768     }
769 
770     @Override
771     public <T> List<java.util.concurrent.Future<T>> invokeAll(
772             Collection<? extends Callable<T>> tasks, long timeout, TimeUnit unit) throws InterruptedException {
773         throwIfInEventLoop("invokeAll");
774         return super.invokeAll(tasks, timeout, unit);
775     }
776 
777     private void throwIfInEventLoop(String method) {
778         if (inEventLoop()) {
779             throw new RejectedExecutionException("Calling " + method + " from within the EventLoop is not allowed");
780         }
781     }
782 
783     /**
784      * Returns the {@link ThreadProperties} of the {@link Thread} that powers the {@link SingleThreadEventExecutor}.
785      */
786     public final ThreadProperties threadProperties() {
787         return threadProperties;
788     }
789 
790     @SuppressWarnings("unused")
791     protected boolean wakesUpForTask(Runnable task) {
792         return true;
793     }
794 
795     protected static void reject() {
796         throw new RejectedExecutionException("event executor terminated");
797     }
798 
799     // ScheduledExecutorService implementation
800 
801     private static final long SCHEDULE_PURGE_INTERVAL = TimeUnit.SECONDS.toNanos(1);
802 
803     private void startThread() {
804         if (state == ST_NOT_STARTED) {
805             if (STATE_UPDATER.compareAndSet(this, ST_NOT_STARTED, ST_STARTED)) {
806                 thread.start();
807             }
808         }
809     }
810 
811     private static final class DefaultThreadProperties implements ThreadProperties {
812         private final Thread t;
813 
814         DefaultThreadProperties(Thread t) {
815             this.t = t;
816         }
817 
818         @Override
819         public State state() {
820             return t.getState();
821         }
822 
823         @Override
824         public int priority() {
825             return t.getPriority();
826         }
827 
828         @Override
829         public boolean isInterrupted() {
830             return t.isInterrupted();
831         }
832 
833         @Override
834         public boolean isDaemon() {
835             return t.isDaemon();
836         }
837 
838         @Override
839         public String name() {
840             return t.getName();
841         }
842 
843         @Override
844         public long id() {
845             return t.getId();
846         }
847 
848         @Override
849         public StackTraceElement[] stackTrace() {
850             return t.getStackTrace();
851         }
852 
853         @Override
854         public boolean isAlive() {
855             return t.isAlive();
856         }
857     }
858 }