查看本类的 API文档回源码主页即时通讯网 - 即时通讯开发者社区!
1   /*
2    * Copyright 2012 The Netty Project
3    *
4    * The Netty Project licenses this file to you under the Apache License,
5    * version 2.0 (the "License"); you may not use this file except in compliance
6    * with the License. You may obtain a copy of the License at:
7    *
8    *   http://www.apache.org/licenses/LICENSE-2.0
9    *
10   * Unless required by applicable law or agreed to in writing, software
11   * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12   * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13   * License for the specific language governing permissions and limitations
14   * under the License.
15   */
16  package io.netty.channel.nio;
17  
18  import io.netty.channel.Channel;
19  import io.netty.channel.ChannelException;
20  import io.netty.channel.EventLoop;
21  import io.netty.channel.EventLoopException;
22  import io.netty.channel.SelectStrategy;
23  import io.netty.channel.SingleThreadEventLoop;
24  import io.netty.channel.nio.AbstractNioChannel.NioUnsafe;
25  import io.netty.util.IntSupplier;
26  import io.netty.util.concurrent.RejectedExecutionHandler;
27  import io.netty.util.internal.PlatformDependent;
28  import io.netty.util.internal.ReflectionUtil;
29  import io.netty.util.internal.SystemPropertyUtil;
30  import io.netty.util.internal.logging.InternalLogger;
31  import io.netty.util.internal.logging.InternalLoggerFactory;
32  
33  import java.io.IOException;
34  import java.lang.reflect.Field;
35  import java.nio.channels.CancelledKeyException;
36  import java.nio.channels.SelectableChannel;
37  import java.nio.channels.SelectionKey;
38  import java.nio.channels.Selector;
39  import java.nio.channels.spi.SelectorProvider;
40  import java.security.AccessController;
41  import java.security.PrivilegedAction;
42  import java.util.ArrayList;
43  import java.util.Collection;
44  import java.util.Iterator;
45  import java.util.Queue;
46  import java.util.Set;
47  import java.util.concurrent.ThreadFactory;
48  import java.util.concurrent.Callable;
49  import java.util.concurrent.TimeUnit;
50  import java.util.concurrent.atomic.AtomicBoolean;
51  
/**
 * {@link SingleThreadEventLoop} implementation which registers {@link Channel}s to a
 * {@link Selector} and multiplexes them in the event loop.
 */
57  public final class NioEventLoop extends SingleThreadEventLoop {
58  
59      private static final InternalLogger logger = InternalLoggerFactory.getInstance(NioEventLoop.class);
60  
61      private static final int CLEANUP_INTERVAL = 256; // XXX Hard-coded value, but won't need customization.
62  
63      private static final boolean DISABLE_KEYSET_OPTIMIZATION =
64              SystemPropertyUtil.getBoolean("io.netty.noKeySetOptimization", false);
65  
66      private static final int MIN_PREMATURE_SELECTOR_RETURNS = 3;
67      private static final int SELECTOR_AUTO_REBUILD_THRESHOLD;
68  
69      private final IntSupplier selectNowSupplier = new IntSupplier() {
70          @Override
71          public int get() throws Exception {
72              return selectNow();
73          }
74      };
75      private final Callable<Integer> pendingTasksCallable = new Callable<Integer>() {
76          @Override
77          public Integer call() throws Exception {
78              return NioEventLoop.super.pendingTasks();
79          }
80      };
81  
82      // Workaround for JDK NIO bug.
83      //
84      // See:
85      // - http://bugs.sun.com/view_bug.do?bug_id=6427854
86      // - https://github.com/netty/netty/issues/203
87      static {
88          final String key = "sun.nio.ch.bugLevel";
89          final String buglevel = SystemPropertyUtil.get(key);
90          if (buglevel == null) {
91              try {
92                  AccessController.doPrivileged(new PrivilegedAction<Void>() {
93                      @Override
94                      public Void run() {
95                          System.setProperty(key, "");
96                          return null;
97                      }
98                  });
99              } catch (final SecurityException e) {
100                 logger.debug("Unable to get/set System Property: " + key, e);
101             }
102         }
103 
104         int selectorAutoRebuildThreshold = SystemPropertyUtil.getInt("io.netty.selectorAutoRebuildThreshold", 512);
105         if (selectorAutoRebuildThreshold < MIN_PREMATURE_SELECTOR_RETURNS) {
106             selectorAutoRebuildThreshold = 0;
107         }
108 
109         SELECTOR_AUTO_REBUILD_THRESHOLD = selectorAutoRebuildThreshold;
110 
111         if (logger.isDebugEnabled()) {
112             logger.debug("-Dio.netty.noKeySetOptimization: {}", DISABLE_KEYSET_OPTIMIZATION);
113             logger.debug("-Dio.netty.selectorAutoRebuildThreshold: {}", SELECTOR_AUTO_REBUILD_THRESHOLD);
114         }
115     }
116 
117     /**
118      * The NIO {@link Selector}.
119      */
120     private Selector selector;
121     private Selector unwrappedSelector;
122     private SelectedSelectionKeySet selectedKeys;
123 
124     private final SelectorProvider provider;
125 
126     /**
127      * Boolean that controls determines if a blocked Selector.select should
128      * break out of its selection process. In our case we use a timeout for
129      * the select method and the select method will block for that time unless
130      * waken up.
131      */
132     private final AtomicBoolean wakenUp = new AtomicBoolean();
133 
134     private final SelectStrategy selectStrategy;
135 
136     private volatile int ioRatio = 50;
137     private int cancelledKeys;
138     private boolean needsToSelectAgain;
139 
140     NioEventLoop(NioEventLoopGroup parent, ThreadFactory threadFactory, SelectorProvider selectorProvider,
141                  SelectStrategy strategy, RejectedExecutionHandler rejectedExecutionHandler) {
142         super(parent, threadFactory, false, DEFAULT_MAX_PENDING_TASKS, rejectedExecutionHandler);
143         if (selectorProvider == null) {
144             throw new NullPointerException("selectorProvider");
145         }
146         if (strategy == null) {
147             throw new NullPointerException("selectStrategy");
148         }
149         provider = selectorProvider;
150         final SelectorTuple selectorTuple = openSelector();
151         selector = selectorTuple.selector;
152         unwrappedSelector = selectorTuple.unwrappedSelector;
153         selectStrategy = strategy;
154     }
155 
156     private static final class SelectorTuple {
157         final Selector unwrappedSelector;
158         final Selector selector;
159 
160         SelectorTuple(Selector unwrappedSelector) {
161             this.unwrappedSelector = unwrappedSelector;
162             this.selector = unwrappedSelector;
163         }
164 
165         SelectorTuple(Selector unwrappedSelector, Selector selector) {
166             this.unwrappedSelector = unwrappedSelector;
167             this.selector = selector;
168         }
169     }
170 
171     private SelectorTuple openSelector() {
172         final Selector unwrappedSelector;
173         try {
174             unwrappedSelector = provider.openSelector();
175         } catch (IOException e) {
176             throw new ChannelException("failed to open a new selector", e);
177         }
178 
179         if (DISABLE_KEYSET_OPTIMIZATION) {
180             return new SelectorTuple(unwrappedSelector);
181         }
182 
183         final SelectedSelectionKeySet selectedKeySet = new SelectedSelectionKeySet();
184 
185         Object maybeSelectorImplClass = AccessController.doPrivileged(new PrivilegedAction<Object>() {
186             @Override
187             public Object run() {
188                 try {
189                     return Class.forName(
190                             "sun.nio.ch.SelectorImpl",
191                             false,
192                             PlatformDependent.getSystemClassLoader());
193                 } catch (Throwable cause) {
194                     return cause;
195                 }
196             }
197         });
198 
199         if (!(maybeSelectorImplClass instanceof Class) ||
200                 // ensure the current selector implementation is what we can instrument.
201                 !((Class<?>) maybeSelectorImplClass).isAssignableFrom(unwrappedSelector.getClass())) {
202             if (maybeSelectorImplClass instanceof Throwable) {
203                 Throwable t = (Throwable) maybeSelectorImplClass;
204                 logger.trace("failed to instrument a special java.util.Set into: {}", unwrappedSelector, t);
205             }
206             return new SelectorTuple(unwrappedSelector);
207         }
208 
209         final Class<?> selectorImplClass = (Class<?>) maybeSelectorImplClass;
210 
211         Object maybeException = AccessController.doPrivileged(new PrivilegedAction<Object>() {
212             @Override
213             public Object run() {
214                 try {
215                     Field selectedKeysField = selectorImplClass.getDeclaredField("selectedKeys");
216                     Field publicSelectedKeysField = selectorImplClass.getDeclaredField("publicSelectedKeys");
217 
218                     Throwable cause = ReflectionUtil.trySetAccessible(selectedKeysField, true);
219                     if (cause != null) {
220                         return cause;
221                     }
222                     cause = ReflectionUtil.trySetAccessible(publicSelectedKeysField, true);
223                     if (cause != null) {
224                         return cause;
225                     }
226 
227                     selectedKeysField.set(unwrappedSelector, selectedKeySet);
228                     publicSelectedKeysField.set(unwrappedSelector, selectedKeySet);
229                     return null;
230                 } catch (NoSuchFieldException e) {
231                     return e;
232                 } catch (IllegalAccessException e) {
233                     return e;
234                 }
235             }
236         });
237 
238         if (maybeException instanceof Exception) {
239             selectedKeys = null;
240             Exception e = (Exception) maybeException;
241             logger.trace("failed to instrument a special java.util.Set into: {}", unwrappedSelector, e);
242             return new SelectorTuple(unwrappedSelector);
243         }
244         selectedKeys = selectedKeySet;
245         logger.trace("instrumented a special java.util.Set into: {}", unwrappedSelector);
246         return new SelectorTuple(unwrappedSelector,
247                                  new SelectedSelectionKeySetSelector(unwrappedSelector, selectedKeySet));
248     }
249 
250     /**
251      * Returns the {@link SelectorProvider} used by this {@link NioEventLoop} to obtain the {@link Selector}.
252      */
253     public SelectorProvider selectorProvider() {
254         return provider;
255     }
256 
257     @Override
258     protected Queue<Runnable> newTaskQueue(int maxPendingTasks) {
259         // This event loop never calls takeTask()
260         return maxPendingTasks == Integer.MAX_VALUE ? PlatformDependent.<Runnable>newMpscQueue()
261                                                     : PlatformDependent.<Runnable>newMpscQueue(maxPendingTasks);
262     }
263 
264     @Override
265     public int pendingTasks() {
266         // As we use a MpscQueue we need to ensure pendingTasks() is only executed from within the EventLoop as
267         // otherwise we may see unexpected behavior (as size() is only allowed to be called by a single consumer).
268         // See https://github.com/netty/netty/issues/5297
269         if (inEventLoop()) {
270             return super.pendingTasks();
271         } else {
272             return submit(pendingTasksCallable).syncUninterruptibly().getNow();
273         }
274     }
275 
276     /**
277      * Registers an arbitrary {@link SelectableChannel}, not necessarily created by Netty, to the {@link Selector}
278      * of this event loop.  Once the specified {@link SelectableChannel} is registered, the specified {@code task} will
279      * be executed by this event loop when the {@link SelectableChannel} is ready.
280      */
281     public void register(final SelectableChannel ch, final int interestOps, final NioTask<?> task) {
282         if (ch == null) {
283             throw new NullPointerException("ch");
284         }
285         if (interestOps == 0) {
286             throw new IllegalArgumentException("interestOps must be non-zero.");
287         }
288         if ((interestOps & ~ch.validOps()) != 0) {
289             throw new IllegalArgumentException(
290                     "invalid interestOps: " + interestOps + "(validOps: " + ch.validOps() + ')');
291         }
292         if (task == null) {
293             throw new NullPointerException("task");
294         }
295 
296         if (isShutdown()) {
297             throw new IllegalStateException("event loop shut down");
298         }
299 
300         try {
301             ch.register(selector, interestOps, task);
302         } catch (Exception e) {
303             throw new EventLoopException("failed to register a channel", e);
304         }
305     }
306 
307     /**
308      * Returns the percentage of the desired amount of time spent for I/O in the event loop.
309      */
310     public int getIoRatio() {
311         return ioRatio;
312     }
313 
314     /**
315      * Sets the percentage of the desired amount of time spent for I/O in the event loop.  The default value is
316      * {@code 50}, which means the event loop will try to spend the same amount of time for I/O as for non-I/O tasks.
317      */
318     public void setIoRatio(int ioRatio) {
319         if (ioRatio <= 0 || ioRatio > 100) {
320             throw new IllegalArgumentException("ioRatio: " + ioRatio + " (expected: 0 < ioRatio <= 100)");
321         }
322         this.ioRatio = ioRatio;
323     }
324 
325     /**
326      * Replaces the current {@link Selector} of this event loop with newly created {@link Selector}s to work
327      * around the infamous epoll 100% CPU bug.
328      */
329     public void rebuildSelector() {
330         if (!inEventLoop()) {
331             execute(new Runnable() {
332                 @Override
333                 public void run() {
334                     rebuildSelector0();
335                 }
336             });
337             return;
338         }
339         rebuildSelector0();
340     }
341 
342     private void rebuildSelector0() {
343         final Selector oldSelector = selector;
344         final SelectorTuple newSelectorTuple;
345 
346         if (oldSelector == null) {
347             return;
348         }
349 
350         try {
351             newSelectorTuple = openSelector();
352         } catch (Exception e) {
353             logger.warn("Failed to create a new Selector.", e);
354             return;
355         }
356 
357         // Register all channels to the new Selector.
358         int nChannels = 0;
359         for (SelectionKey key: oldSelector.keys()) {
360             Object a = key.attachment();
361             try {
362                 if (!key.isValid() || key.channel().keyFor(newSelectorTuple.unwrappedSelector) != null) {
363                     continue;
364                 }
365 
366                 int interestOps = key.interestOps();
367                 key.cancel();
368                 SelectionKey newKey = key.channel().register(newSelectorTuple.unwrappedSelector, interestOps, a);
369                 if (a instanceof AbstractNioChannel) {
370                     // Update SelectionKey
371                     ((AbstractNioChannel) a).selectionKey = newKey;
372                 }
373                 nChannels ++;
374             } catch (Exception e) {
375                 logger.warn("Failed to re-register a Channel to the new Selector.", e);
376                 if (a instanceof AbstractNioChannel) {
377                     AbstractNioChannel ch = (AbstractNioChannel) a;
378                     ch.unsafe().close(ch.unsafe().voidPromise());
379                 } else {
380                     @SuppressWarnings("unchecked")
381                     NioTask<SelectableChannel> task = (NioTask<SelectableChannel>) a;
382                     invokeChannelUnregistered(task, key, e);
383                 }
384             }
385         }
386 
387         selector = newSelectorTuple.selector;
388         unwrappedSelector = newSelectorTuple.unwrappedSelector;
389 
390         try {
391             // time to close the old selector as everything else is registered to the new one
392             oldSelector.close();
393         } catch (Throwable t) {
394             if (logger.isWarnEnabled()) {
395                 logger.warn("Failed to close the old Selector.", t);
396             }
397         }
398 
399         logger.info("Migrated " + nChannels + " channel(s) to the new Selector.");
400     }
401 
402     @Override
403     protected void run() {
404         for (;;) {
405             try {
406                 switch (selectStrategy.calculateStrategy(selectNowSupplier, hasTasks())) {
407                     case SelectStrategy.CONTINUE:
408                         continue;
409                     case SelectStrategy.SELECT:
410                         select(wakenUp.getAndSet(false));
411 
412                         // 'wakenUp.compareAndSet(false, true)' is always evaluated
413                         // before calling 'selector.wakeup()' to reduce the wake-up
414                         // overhead. (Selector.wakeup() is an expensive operation.)
415                         //
416                         // However, there is a race condition in this approach.
417                         // The race condition is triggered when 'wakenUp' is set to
418                         // true too early.
419                         //
420                         // 'wakenUp' is set to true too early if:
421                         // 1) Selector is waken up between 'wakenUp.set(false)' and
422                         //    'selector.select(...)'. (BAD)
423                         // 2) Selector is waken up between 'selector.select(...)' and
424                         //    'if (wakenUp.get()) { ... }'. (OK)
425                         //
426                         // In the first case, 'wakenUp' is set to true and the
427                         // following 'selector.select(...)' will wake up immediately.
428                         // Until 'wakenUp' is set to false again in the next round,
429                         // 'wakenUp.compareAndSet(false, true)' will fail, and therefore
430                         // any attempt to wake up the Selector will fail, too, causing
431                         // the following 'selector.select(...)' call to block
432                         // unnecessarily.
433                         //
434                         // To fix this problem, we wake up the selector again if wakenUp
435                         // is true immediately after selector.select(...).
436                         // It is inefficient in that it wakes up the selector for both
437                         // the first case (BAD - wake-up required) and the second case
438                         // (OK - no wake-up required).
439 
440                         if (wakenUp.get()) {
441                             selector.wakeup();
442                         }
443                         // fall through
444                     default:
445                 }
446 
447                 cancelledKeys = 0;
448                 needsToSelectAgain = false;
449                 final int ioRatio = this.ioRatio;
450                 if (ioRatio == 100) {
451                     try {
452                         processSelectedKeys();
453                     } finally {
454                         // Ensure we always run tasks.
455                         runAllTasks();
456                     }
457                 } else {
458                     final long ioStartTime = System.nanoTime();
459                     try {
460                         processSelectedKeys();
461                     } finally {
462                         // Ensure we always run tasks.
463                         final long ioTime = System.nanoTime() - ioStartTime;
464                         runAllTasks(ioTime * (100 - ioRatio) / ioRatio);
465                     }
466                 }
467             } catch (Throwable t) {
468                 handleLoopException(t);
469             }
470             // Always handle shutdown even if the loop processing threw an exception.
471             try {
472                 if (isShuttingDown()) {
473                     closeAll();
474                     if (confirmShutdown()) {
475                         return;
476                     }
477                 }
478             } catch (Throwable t) {
479                 handleLoopException(t);
480             }
481         }
482     }
483 
484     private static void handleLoopException(Throwable t) {
485         logger.warn("Unexpected exception in the selector loop.", t);
486 
487         // Prevent possible consecutive immediate failures that lead to
488         // excessive CPU consumption.
489         try {
490             Thread.sleep(1000);
491         } catch (InterruptedException e) {
492             // Ignore.
493         }
494     }
495 
496     private void processSelectedKeys() {
497         if (selectedKeys != null) {
498             processSelectedKeysOptimized();
499         } else {
500             processSelectedKeysPlain(selector.selectedKeys());
501         }
502     }
503 
504     @Override
505     protected void cleanup() {
506         try {
507             selector.close();
508         } catch (IOException e) {
509             logger.warn("Failed to close a selector.", e);
510         }
511     }
512 
513     void cancel(SelectionKey key) {
514         key.cancel();
515         cancelledKeys ++;
516         if (cancelledKeys >= CLEANUP_INTERVAL) {
517             cancelledKeys = 0;
518             needsToSelectAgain = true;
519         }
520     }
521 
522     @Override
523     protected Runnable pollTask() {
524         Runnable task = super.pollTask();
525         if (needsToSelectAgain) {
526             selectAgain();
527         }
528         return task;
529     }
530 
531     private void processSelectedKeysPlain(Set<SelectionKey> selectedKeys) {
532         // check if the set is empty and if so just return to not create garbage by
533         // creating a new Iterator every time even if there is nothing to process.
534         // See https://github.com/netty/netty/issues/597
535         if (selectedKeys.isEmpty()) {
536             return;
537         }
538 
539         Iterator<SelectionKey> i = selectedKeys.iterator();
540         for (;;) {
541             final SelectionKey k = i.next();
542             final Object a = k.attachment();
543             i.remove();
544 
545             if (a instanceof AbstractNioChannel) {
546                 processSelectedKey(k, (AbstractNioChannel) a);
547             } else {
548                 @SuppressWarnings("unchecked")
549                 NioTask<SelectableChannel> task = (NioTask<SelectableChannel>) a;
550                 processSelectedKey(k, task);
551             }
552 
553             if (!i.hasNext()) {
554                 break;
555             }
556 
557             if (needsToSelectAgain) {
558                 selectAgain();
559                 selectedKeys = selector.selectedKeys();
560 
561                 // Create the iterator again to avoid ConcurrentModificationException
562                 if (selectedKeys.isEmpty()) {
563                     break;
564                 } else {
565                     i = selectedKeys.iterator();
566                 }
567             }
568         }
569     }
570 
571     private void processSelectedKeysOptimized() {
572         for (int i = 0; i < selectedKeys.size; ++i) {
573             final SelectionKey k = selectedKeys.keys[i];
574             // null out entry in the array to allow to have it GC'ed once the Channel close
575             // See https://github.com/netty/netty/issues/2363
576             selectedKeys.keys[i] = null;
577 
578             final Object a = k.attachment();
579 
580             if (a instanceof AbstractNioChannel) {
581                 processSelectedKey(k, (AbstractNioChannel) a);
582             } else {
583                 @SuppressWarnings("unchecked")
584                 NioTask<SelectableChannel> task = (NioTask<SelectableChannel>) a;
585                 processSelectedKey(k, task);
586             }
587 
588             if (needsToSelectAgain) {
589                 // null out entries in the array to allow to have it GC'ed once the Channel close
590                 // See https://github.com/netty/netty/issues/2363
591                 selectedKeys.reset(i + 1);
592 
593                 selectAgain();
594                 i = -1;
595             }
596         }
597     }
598 
599     private void processSelectedKey(SelectionKey k, AbstractNioChannel ch) {
600         final NioUnsafe unsafe = ch.unsafe();
601         if (!k.isValid()) {
602             final EventLoop eventLoop;
603             try {
604                 eventLoop = ch.eventLoop();
605             } catch (Throwable ignored) {
606                 // If the channel implementation throws an exception because there is no event loop, we ignore this
607                 // because we are only trying to determine if ch is registered to this event loop and thus has authority
608                 // to close ch.
609                 return;
610             }
611             // Only close ch if ch is still registered to this EventLoop. ch could have deregistered from the event loop
612             // and thus the SelectionKey could be cancelled as part of the deregistration process, but the channel is
613             // still healthy and should not be closed.
614             // See https://github.com/netty/netty/issues/5125
615             if (eventLoop != this || eventLoop == null) {
616                 return;
617             }
618             // close the channel if the key is not valid anymore
619             unsafe.close(unsafe.voidPromise());
620             return;
621         }
622 
623         try {
624             int readyOps = k.readyOps();
625             // We first need to call finishConnect() before try to trigger a read(...) or write(...) as otherwise
626             // the NIO JDK channel implementation may throw a NotYetConnectedException.
627             if ((readyOps & SelectionKey.OP_CONNECT) != 0) {
628                 // remove OP_CONNECT as otherwise Selector.select(..) will always return without blocking
629                 // See https://github.com/netty/netty/issues/924
630                 int ops = k.interestOps();
631                 ops &= ~SelectionKey.OP_CONNECT;
632                 k.interestOps(ops);
633 
634                 unsafe.finishConnect();
635             }
636 
637             // Process OP_WRITE first as we may be able to write some queued buffers and so free memory.
638             if ((readyOps & SelectionKey.OP_WRITE) != 0) {
639                 // Call forceFlush which will also take care of clear the OP_WRITE once there is nothing left to write
640                 ch.unsafe().forceFlush();
641             }
642 
643             // Also check for readOps of 0 to workaround possible JDK bug which may otherwise lead
644             // to a spin loop
645             if ((readyOps & (SelectionKey.OP_READ | SelectionKey.OP_ACCEPT)) != 0 || readyOps == 0) {
646                 unsafe.read();
647             }
648         } catch (CancelledKeyException ignored) {
649             unsafe.close(unsafe.voidPromise());
650         }
651     }
652 
653     private static void processSelectedKey(SelectionKey k, NioTask<SelectableChannel> task) {
654         int state = 0;
655         try {
656             task.channelReady(k.channel(), k);
657             state = 1;
658         } catch (Exception e) {
659             k.cancel();
660             invokeChannelUnregistered(task, k, e);
661             state = 2;
662         } finally {
663             switch (state) {
664             case 0:
665                 k.cancel();
666                 invokeChannelUnregistered(task, k, null);
667                 break;
668             case 1:
669                 if (!k.isValid()) { // Cancelled by channelReady()
670                     invokeChannelUnregistered(task, k, null);
671                 }
672                 break;
673             }
674         }
675     }
676 
677     private void closeAll() {
678         selectAgain();
679         Set<SelectionKey> keys = selector.keys();
680         Collection<AbstractNioChannel> channels = new ArrayList<AbstractNioChannel>(keys.size());
681         for (SelectionKey k: keys) {
682             Object a = k.attachment();
683             if (a instanceof AbstractNioChannel) {
684                 channels.add((AbstractNioChannel) a);
685             } else {
686                 k.cancel();
687                 @SuppressWarnings("unchecked")
688                 NioTask<SelectableChannel> task = (NioTask<SelectableChannel>) a;
689                 invokeChannelUnregistered(task, k, null);
690             }
691         }
692 
693         for (AbstractNioChannel ch: channels) {
694             ch.unsafe().close(ch.unsafe().voidPromise());
695         }
696     }
697 
698     private static void invokeChannelUnregistered(NioTask<SelectableChannel> task, SelectionKey k, Throwable cause) {
699         try {
700             task.channelUnregistered(k.channel(), cause);
701         } catch (Exception e) {
702             logger.warn("Unexpected exception while running NioTask.channelUnregistered()", e);
703         }
704     }
705 
706     @Override
707     protected void wakeup(boolean inEventLoop) {
708         if (!inEventLoop && wakenUp.compareAndSet(false, true)) {
709             selector.wakeup();
710         }
711     }
712 
713     Selector unwrappedSelector() {
714         return unwrappedSelector;
715     }
716 
717     int selectNow() throws IOException {
718         try {
719             return selector.selectNow();
720         } finally {
721             // restore wakeup state if needed
722             if (wakenUp.get()) {
723                 selector.wakeup();
724             }
725         }
726     }
727 
728     private void select(boolean oldWakenUp) throws IOException {
729         Selector selector = this.selector;
730         try {
731             int selectCnt = 0;
732             long currentTimeNanos = System.nanoTime();
733             long selectDeadLineNanos = currentTimeNanos + delayNanos(currentTimeNanos);
734             for (;;) {
735                 long timeoutMillis = (selectDeadLineNanos - currentTimeNanos + 500000L) / 1000000L;
736                 if (timeoutMillis <= 0) {
737                     if (selectCnt == 0) {
738                         selector.selectNow();
739                         selectCnt = 1;
740                     }
741                     break;
742                 }
743 
744                 // If a task was submitted when wakenUp value was true, the task didn't get a chance to call
745                 // Selector#wakeup. So we need to check task queue again before executing select operation.
746                 // If we don't, the task might be pended until select operation was timed out.
747                 // It might be pended until idle timeout if IdleStateHandler existed in pipeline.
748                 if (hasTasks() && wakenUp.compareAndSet(false, true)) {
749                     selector.selectNow();
750                     selectCnt = 1;
751                     break;
752                 }
753 
754                 int selectedKeys = selector.select(timeoutMillis);
755                 selectCnt ++;
756 
757                 if (selectedKeys != 0 || oldWakenUp || wakenUp.get() || hasTasks() || hasScheduledTasks()) {
758                     // - Selected something,
759                     // - waken up by user, or
760                     // - the task queue has a pending task.
761                     // - a scheduled task is ready for processing
762                     break;
763                 }
764                 if (Thread.interrupted()) {
765                     // Thread was interrupted so reset selected keys and break so we not run into a busy loop.
766                     // As this is most likely a bug in the handler of the user or it's client library we will
767                     // also log it.
768                     //
769                     // See https://github.com/netty/netty/issues/2426
770                     if (logger.isDebugEnabled()) {
771                         logger.debug("Selector.select() returned prematurely because " +
772                                 "Thread.currentThread().interrupt() was called. Use " +
773                                 "NioEventLoop.shutdownGracefully() to shutdown the NioEventLoop.");
774                     }
775                     selectCnt = 1;
776                     break;
777                 }
778 
779                 long time = System.nanoTime();
780                 if (time - TimeUnit.MILLISECONDS.toNanos(timeoutMillis) >= currentTimeNanos) {
781                     // timeoutMillis elapsed without anything selected.
782                     selectCnt = 1;
783                 } else if (SELECTOR_AUTO_REBUILD_THRESHOLD > 0 &&
784                         selectCnt >= SELECTOR_AUTO_REBUILD_THRESHOLD) {
785                     // The selector returned prematurely many times in a row.
786                     // Rebuild the selector to work around the problem.
787                     logger.warn(
788                             "Selector.select() returned prematurely {} times in a row; rebuilding Selector {}.",
789                             selectCnt, selector);
790 
791                     rebuildSelector();
792                     selector = this.selector;
793 
794                     // Select again to populate selectedKeys.
795                     selector.selectNow();
796                     selectCnt = 1;
797                     break;
798                 }
799 
800                 currentTimeNanos = time;
801             }
802 
803             if (selectCnt > MIN_PREMATURE_SELECTOR_RETURNS) {
804                 if (logger.isDebugEnabled()) {
805                     logger.debug("Selector.select() returned prematurely {} times in a row for Selector {}.",
806                             selectCnt - 1, selector);
807                 }
808             }
809         } catch (CancelledKeyException e) {
810             if (logger.isDebugEnabled()) {
811                 logger.debug(CancelledKeyException.class.getSimpleName() + " raised by a Selector {} - JDK bug?",
812                         selector, e);
813             }
814             // Harmless exception - log anyway
815         }
816     }
817 
818     private void selectAgain() {
819         needsToSelectAgain = false;
820         try {
821             selector.selectNow();
822         } catch (Throwable t) {
823             logger.warn("Failed to update SelectionKeys.", t);
824         }
825     }
826 }