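// Source-viewer page header (translated): View the API documentation of this class | Back to the source home page | Instant Messaging Network - the instant messaging developer community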
1   /*
2    * Copyright 2012 The Netty Project
3    *
4    * The Netty Project licenses this file to you under the Apache License,
5    * version 2.0 (the "License"); you may not use this file except in compliance
6    * with the License. You may obtain a copy of the License at:
7    *
8    *   https://www.apache.org/licenses/LICENSE-2.0
9    *
10   * Unless required by applicable law or agreed to in writing, software
11   * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12   * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13   * License for the specific language governing permissions and limitations
14   * under the License.
15   */
16  package io.netty.channel.nio;
17  
18  import io.netty.channel.Channel;
19  import io.netty.channel.ChannelException;
20  import io.netty.channel.EventLoop;
21  import io.netty.channel.EventLoopException;
22  import io.netty.channel.EventLoopTaskQueueFactory;
23  import io.netty.channel.SelectStrategy;
24  import io.netty.channel.SingleThreadEventLoop;
25  import io.netty.util.IntSupplier;
26  import io.netty.util.concurrent.RejectedExecutionHandler;
27  import io.netty.util.internal.ObjectUtil;
28  import io.netty.util.internal.PlatformDependent;
29  import io.netty.util.internal.ReflectionUtil;
30  import io.netty.util.internal.SystemPropertyUtil;
31  import io.netty.util.internal.logging.InternalLogger;
32  import io.netty.util.internal.logging.InternalLoggerFactory;
33  
34  import java.io.IOException;
35  import java.lang.reflect.Field;
36  import java.nio.channels.CancelledKeyException;
37  import java.nio.channels.SelectableChannel;
38  import java.nio.channels.Selector;
39  import java.nio.channels.SelectionKey;
40  
41  import java.nio.channels.spi.SelectorProvider;
42  import java.security.AccessController;
43  import java.security.PrivilegedAction;
44  import java.util.ArrayList;
45  import java.util.Collection;
46  import java.util.Iterator;
47  import java.util.NoSuchElementException;
48  import java.util.Queue;
49  import java.util.Set;
50  import java.util.concurrent.Executor;
51  import java.util.concurrent.atomic.AtomicLong;
52  
/**
 * {@link SingleThreadEventLoop} implementation which registers {@link Channel}s to a
 * {@link Selector} and so performs the multiplexing of these in the event loop.
 */
58  public final class NioEventLoop extends SingleThreadEventLoop {
59  
60      private static final InternalLogger logger = InternalLoggerFactory.getInstance(NioEventLoop.class);
61  
62      private static final int CLEANUP_INTERVAL = 256; // XXX Hard-coded value, but won't need customization.
63  
64      private static final boolean DISABLE_KEY_SET_OPTIMIZATION =
65              SystemPropertyUtil.getBoolean("io.netty.noKeySetOptimization", false);
66  
67      private static final int MIN_PREMATURE_SELECTOR_RETURNS = 3;
68      private static final int SELECTOR_AUTO_REBUILD_THRESHOLD;
69  
70      private final IntSupplier selectNowSupplier = new IntSupplier() {
71          @Override
72          public int get() throws Exception {
73              return selectNow();
74          }
75      };
76  
// Class initialization: applies a JDK NIO workaround on old runtimes and resolves the
// selector auto-rebuild threshold from system properties.
//
// Background for the "sun.nio.ch.bugLevel" workaround:
// - https://bugs.openjdk.java.net/browse/JDK-6427854 for first few dev (unreleased) builds of JDK 7
// - https://bugs.openjdk.java.net/browse/JDK-6527572 for JDK prior to 5.0u15-rev and 6u10
// - https://github.com/netty/netty/issues/203
static {
    final String bugLevelKey = "sun.nio.ch.bugLevel";
    // Only runtimes older than Java 7 need the property, and an existing value is never overwritten.
    if (PlatformDependent.javaVersion() < 7 && SystemPropertyUtil.get(bugLevelKey) == null) {
        try {
            AccessController.doPrivileged(new PrivilegedAction<Void>() {
                @Override
                public Void run() {
                    System.setProperty(bugLevelKey, "");
                    return null;
                }
            });
        } catch (final SecurityException e) {
            logger.debug("Unable to get/set System Property: " + bugLevelKey, e);
        }
    }

    // Values below the minimum are treated as "auto-rebuild disabled" (0).
    final int configuredThreshold =
            SystemPropertyUtil.getInt("io.netty.selectorAutoRebuildThreshold", 512);
    SELECTOR_AUTO_REBUILD_THRESHOLD =
            configuredThreshold < MIN_PREMATURE_SELECTOR_RETURNS ? 0 : configuredThreshold;

    if (logger.isDebugEnabled()) {
        logger.debug("-Dio.netty.noKeySetOptimization: {}", DISABLE_KEY_SET_OPTIMIZATION);
        logger.debug("-Dio.netty.selectorAutoRebuildThreshold: {}", SELECTOR_AUTO_REBUILD_THRESHOLD);
    }
}
114 
115     /**
116      * The NIO {@link Selector}.
117      */
118     private Selector selector;
119     private Selector unwrappedSelector;
120     private SelectedSelectionKeySet selectedKeys;
121 
122     private final SelectorProvider provider;
123 
124     private static final long AWAKE = -1L;
125     private static final long NONE = Long.MAX_VALUE;
126 
127     // nextWakeupNanos is:
128     //    AWAKE            when EL is awake
129     //    NONE             when EL is waiting with no wakeup scheduled
130     //    other value T    when EL is waiting with wakeup scheduled at time T
131     private final AtomicLong nextWakeupNanos = new AtomicLong(AWAKE);
132 
133     private final SelectStrategy selectStrategy;
134 
135     private volatile int ioRatio = 50;
136     private int cancelledKeys;
137     private boolean needsToSelectAgain;
138 
/**
 * Creates the event loop and opens its initial {@link Selector}.
 *
 * @param parent                   the group this loop belongs to
 * @param executor                 passed through to the super constructor
 * @param selectorProvider         provider used to open selectors; must not be {@code null}
 * @param strategy                 select strategy consulted by {@code run()}; must not be {@code null}
 * @param rejectedExecutionHandler passed through to the super constructor
 * @param taskQueueFactory         factory for the task queue, or {@code null} for the default queue
 * @param tailTaskQueueFactory     factory for the tail task queue, or {@code null} for the default queue
 */
NioEventLoop(NioEventLoopGroup parent, Executor executor, SelectorProvider selectorProvider,
             SelectStrategy strategy, RejectedExecutionHandler rejectedExecutionHandler,
             EventLoopTaskQueueFactory taskQueueFactory, EventLoopTaskQueueFactory tailTaskQueueFactory) {
    super(parent, executor, false, newTaskQueue(taskQueueFactory), newTaskQueue(tailTaskQueueFactory),
            rejectedExecutionHandler);
    provider = ObjectUtil.checkNotNull(selectorProvider, "selectorProvider");
    selectStrategy = ObjectUtil.checkNotNull(strategy, "selectStrategy");
    // openSelector() reads the provider, so it has to run after the assignment above.
    final SelectorTuple opened = openSelector();
    selector = opened.selector;
    unwrappedSelector = opened.unwrappedSelector;
}
150 
151     private static Queue<Runnable> newTaskQueue(
152             EventLoopTaskQueueFactory queueFactory) {
153         if (queueFactory == null) {
154             return newTaskQueue0(DEFAULT_MAX_PENDING_TASKS);
155         }
156         return queueFactory.newTaskQueue(DEFAULT_MAX_PENDING_TASKS);
157     }
158 
159     private static final class SelectorTuple {
160         final Selector unwrappedSelector;
161         final Selector selector;
162 
163         SelectorTuple(Selector unwrappedSelector) {
164             this.unwrappedSelector = unwrappedSelector;
165             this.selector = unwrappedSelector;
166         }
167 
168         SelectorTuple(Selector unwrappedSelector, Selector selector) {
169             this.unwrappedSelector = unwrappedSelector;
170             this.selector = selector;
171         }
172     }
173 
174     private SelectorTuple openSelector() {
175         final Selector unwrappedSelector;
176         try {
177             unwrappedSelector = provider.openSelector();
178         } catch (IOException e) {
179             throw new ChannelException("failed to open a new selector", e);
180         }
181 
182         if (DISABLE_KEY_SET_OPTIMIZATION) {
183             return new SelectorTuple(unwrappedSelector);
184         }
185 
186         Object maybeSelectorImplClass = AccessController.doPrivileged(new PrivilegedAction<Object>() {
187             @Override
188             public Object run() {
189                 try {
190                     return Class.forName(
191                             "sun.nio.ch.SelectorImpl",
192                             false,
193                             PlatformDependent.getSystemClassLoader());
194                 } catch (Throwable cause) {
195                     return cause;
196                 }
197             }
198         });
199 
200         if (!(maybeSelectorImplClass instanceof Class) ||
201             // ensure the current selector implementation is what we can instrument.
202             !((Class<?>) maybeSelectorImplClass).isAssignableFrom(unwrappedSelector.getClass())) {
203             if (maybeSelectorImplClass instanceof Throwable) {
204                 Throwable t = (Throwable) maybeSelectorImplClass;
205                 logger.trace("failed to instrument a special java.util.Set into: {}", unwrappedSelector, t);
206             }
207             return new SelectorTuple(unwrappedSelector);
208         }
209 
210         final Class<?> selectorImplClass = (Class<?>) maybeSelectorImplClass;
211         final SelectedSelectionKeySet selectedKeySet = new SelectedSelectionKeySet();
212 
213         Object maybeException = AccessController.doPrivileged(new PrivilegedAction<Object>() {
214             @Override
215             public Object run() {
216                 try {
217                     Field selectedKeysField = selectorImplClass.getDeclaredField("selectedKeys");
218                     Field publicSelectedKeysField = selectorImplClass.getDeclaredField("publicSelectedKeys");
219 
220                     if (PlatformDependent.javaVersion() >= 9 && PlatformDependent.hasUnsafe()) {
221                         // Let us try to use sun.misc.Unsafe to replace the SelectionKeySet.
222                         // This allows us to also do this in Java9+ without any extra flags.
223                         long selectedKeysFieldOffset = PlatformDependent.objectFieldOffset(selectedKeysField);
224                         long publicSelectedKeysFieldOffset =
225                                 PlatformDependent.objectFieldOffset(publicSelectedKeysField);
226 
227                         if (selectedKeysFieldOffset != -1 && publicSelectedKeysFieldOffset != -1) {
228                             PlatformDependent.putObject(
229                                     unwrappedSelector, selectedKeysFieldOffset, selectedKeySet);
230                             PlatformDependent.putObject(
231                                     unwrappedSelector, publicSelectedKeysFieldOffset, selectedKeySet);
232                             return null;
233                         }
234                         // We could not retrieve the offset, lets try reflection as last-resort.
235                     }
236 
237                     Throwable cause = ReflectionUtil.trySetAccessible(selectedKeysField, true);
238                     if (cause != null) {
239                         return cause;
240                     }
241                     cause = ReflectionUtil.trySetAccessible(publicSelectedKeysField, true);
242                     if (cause != null) {
243                         return cause;
244                     }
245 
246                     selectedKeysField.set(unwrappedSelector, selectedKeySet);
247                     publicSelectedKeysField.set(unwrappedSelector, selectedKeySet);
248                     return null;
249                 } catch (NoSuchFieldException e) {
250                     return e;
251                 } catch (IllegalAccessException e) {
252                     return e;
253                 }
254             }
255         });
256 
257         if (maybeException instanceof Exception) {
258             selectedKeys = null;
259             Exception e = (Exception) maybeException;
260             logger.trace("failed to instrument a special java.util.Set into: {}", unwrappedSelector, e);
261             return new SelectorTuple(unwrappedSelector);
262         }
263         selectedKeys = selectedKeySet;
264         logger.trace("instrumented a special java.util.Set into: {}", unwrappedSelector);
265         return new SelectorTuple(unwrappedSelector,
266                                  new SelectedSelectionKeySetSelector(unwrappedSelector, selectedKeySet));
267     }
268 
269     /**
270      * Returns the {@link SelectorProvider} used by this {@link NioEventLoop} to obtain the {@link Selector}.
271      */
272     public SelectorProvider selectorProvider() {
273         return provider;
274     }
275 
276     @Override
277     protected Queue<Runnable> newTaskQueue(int maxPendingTasks) {
278         return newTaskQueue0(maxPendingTasks);
279     }
280 
281     private static Queue<Runnable> newTaskQueue0(int maxPendingTasks) {
282         // This event loop never calls takeTask()
283         return maxPendingTasks == Integer.MAX_VALUE ? PlatformDependent.<Runnable>newMpscQueue()
284                 : PlatformDependent.<Runnable>newMpscQueue(maxPendingTasks);
285     }
286 
287     /**
288      * Registers an arbitrary {@link SelectableChannel}, not necessarily created by Netty, to the {@link Selector}
289      * of this event loop.  Once the specified {@link SelectableChannel} is registered, the specified {@code task} will
290      * be executed by this event loop when the {@link SelectableChannel} is ready.
291      */
292     public void register(final SelectableChannel ch, final int interestOps, final NioTask<?> task) {
293         ObjectUtil.checkNotNull(ch, "ch");
294         if (interestOps == 0) {
295             throw new IllegalArgumentException("interestOps must be non-zero.");
296         }
297         if ((interestOps & ~ch.validOps()) != 0) {
298             throw new IllegalArgumentException(
299                     "invalid interestOps: " + interestOps + "(validOps: " + ch.validOps() + ')');
300         }
301         ObjectUtil.checkNotNull(task, "task");
302 
303         if (isShutdown()) {
304             throw new IllegalStateException("event loop shut down");
305         }
306 
307         if (inEventLoop()) {
308             register0(ch, interestOps, task);
309         } else {
310             try {
311                 // Offload to the EventLoop as otherwise java.nio.channels.spi.AbstractSelectableChannel.register
312                 // may block for a long time while trying to obtain an internal lock that may be hold while selecting.
313                 submit(new Runnable() {
314                     @Override
315                     public void run() {
316                         register0(ch, interestOps, task);
317                     }
318                 }).sync();
319             } catch (InterruptedException ignore) {
320                 // Even if interrupted we did schedule it so just mark the Thread as interrupted.
321                 Thread.currentThread().interrupt();
322             }
323         }
324     }
325 
326     private void register0(SelectableChannel ch, int interestOps, NioTask<?> task) {
327         try {
328             ch.register(unwrappedSelector, interestOps, task);
329         } catch (Exception e) {
330             throw new EventLoopException("failed to register a channel", e);
331         }
332     }
333 
334     /**
335      * Returns the percentage of the desired amount of time spent for I/O in the event loop.
336      */
337     public int getIoRatio() {
338         return ioRatio;
339     }
340 
341     /**
342      * Sets the percentage of the desired amount of time spent for I/O in the event loop. Value range from 1-100.
343      * The default value is {@code 50}, which means the event loop will try to spend the same amount of time for I/O
344      * as for non-I/O tasks. The lower the number the more time can be spent on non-I/O tasks. If value set to
345      * {@code 100}, this feature will be disabled and event loop will not attempt to balance I/O and non-I/O tasks.
346      */
347     public void setIoRatio(int ioRatio) {
348         if (ioRatio <= 0 || ioRatio > 100) {
349             throw new IllegalArgumentException("ioRatio: " + ioRatio + " (expected: 0 < ioRatio <= 100)");
350         }
351         this.ioRatio = ioRatio;
352     }
353 
354     /**
355      * Replaces the current {@link Selector} of this event loop with newly created {@link Selector}s to work
356      * around the infamous epoll 100% CPU bug.
357      */
358     public void rebuildSelector() {
359         if (!inEventLoop()) {
360             execute(new Runnable() {
361                 @Override
362                 public void run() {
363                     rebuildSelector0();
364                 }
365             });
366             return;
367         }
368         rebuildSelector0();
369     }
370 
371     @Override
372     public int registeredChannels() {
373         return selector.keys().size() - cancelledKeys;
374     }
375 
376     @Override
377     public Iterator<Channel> registeredChannelsIterator() {
378         assert inEventLoop();
379         final Set<SelectionKey> keys = selector.keys();
380         if (keys.isEmpty()) {
381             return ChannelsReadOnlyIterator.empty();
382         }
383         return new Iterator<Channel>() {
384             final Iterator<SelectionKey> selectionKeyIterator =
385                     ObjectUtil.checkNotNull(keys, "selectionKeys")
386                             .iterator();
387             Channel next;
388             boolean isDone;
389 
390             @Override
391             public boolean hasNext() {
392                 if (isDone) {
393                     return false;
394                 }
395                 Channel cur = next;
396                 if (cur == null) {
397                     cur = next = nextOrDone();
398                     return cur != null;
399                 }
400                 return true;
401             }
402 
403             @Override
404             public Channel next() {
405                 if (isDone) {
406                     throw new NoSuchElementException();
407                 }
408                 Channel cur = next;
409                 if (cur == null) {
410                     cur = nextOrDone();
411                     if (cur == null) {
412                         throw new NoSuchElementException();
413                     }
414                 }
415                 next = nextOrDone();
416                 return cur;
417             }
418 
419             @Override
420             public void remove() {
421                 throw new UnsupportedOperationException("remove");
422             }
423 
424             private Channel nextOrDone() {
425                 Iterator<SelectionKey> it = selectionKeyIterator;
426                 while (it.hasNext()) {
427                     SelectionKey key = it.next();
428                     if (key.isValid()) {
429                         Object attachment = key.attachment();
430                         if (attachment instanceof AbstractNioChannel) {
431                             return (AbstractNioChannel) attachment;
432                         }
433                     }
434                 }
435                 isDone = true;
436                 return null;
437             }
438         };
439     }
440 
441     private void rebuildSelector0() {
442         final Selector oldSelector = selector;
443         final SelectorTuple newSelectorTuple;
444 
445         if (oldSelector == null) {
446             return;
447         }
448 
449         try {
450             newSelectorTuple = openSelector();
451         } catch (Exception e) {
452             logger.warn("Failed to create a new Selector.", e);
453             return;
454         }
455 
456         // Register all channels to the new Selector.
457         int nChannels = 0;
458         for (SelectionKey key: oldSelector.keys()) {
459             Object a = key.attachment();
460             try {
461                 if (!key.isValid() || key.channel().keyFor(newSelectorTuple.unwrappedSelector) != null) {
462                     continue;
463                 }
464 
465                 int interestOps = key.interestOps();
466                 key.cancel();
467                 SelectionKey newKey = key.channel().register(newSelectorTuple.unwrappedSelector, interestOps, a);
468                 if (a instanceof AbstractNioChannel) {
469                     // Update SelectionKey
470                     ((AbstractNioChannel) a).selectionKey = newKey;
471                 }
472                 nChannels ++;
473             } catch (Exception e) {
474                 logger.warn("Failed to re-register a Channel to the new Selector.", e);
475                 if (a instanceof AbstractNioChannel) {
476                     AbstractNioChannel ch = (AbstractNioChannel) a;
477                     ch.unsafe().close(ch.unsafe().voidPromise());
478                 } else {
479                     @SuppressWarnings("unchecked")
480                     NioTask<SelectableChannel> task = (NioTask<SelectableChannel>) a;
481                     invokeChannelUnregistered(task, key, e);
482                 }
483             }
484         }
485 
486         selector = newSelectorTuple.selector;
487         unwrappedSelector = newSelectorTuple.unwrappedSelector;
488 
489         try {
490             // time to close the old selector as everything else is registered to the new one
491             oldSelector.close();
492         } catch (Throwable t) {
493             if (logger.isWarnEnabled()) {
494                 logger.warn("Failed to close the old Selector.", t);
495             }
496         }
497 
498         if (logger.isInfoEnabled()) {
499             logger.info("Migrated " + nChannels + " channel(s) to the new Selector.");
500         }
501     }
502 
503     @Override
504     protected void run() {
505         int selectCnt = 0;
506         for (;;) {
507             try {
508                 int strategy;
509                 try {
510                     strategy = selectStrategy.calculateStrategy(selectNowSupplier, hasTasks());
511                     switch (strategy) {
512                     case SelectStrategy.CONTINUE:
513                         continue;
514 
515                     case SelectStrategy.BUSY_WAIT:
516                         // fall-through to SELECT since the busy-wait is not supported with NIO
517 
518                     case SelectStrategy.SELECT:
519                         long curDeadlineNanos = nextScheduledTaskDeadlineNanos();
520                         if (curDeadlineNanos == -1L) {
521                             curDeadlineNanos = NONE; // nothing on the calendar
522                         }
523                         nextWakeupNanos.set(curDeadlineNanos);
524                         try {
525                             if (!hasTasks()) {
526                                 strategy = select(curDeadlineNanos);
527                             }
528                         } finally {
529                             // This update is just to help block unnecessary selector wakeups
530                             // so use of lazySet is ok (no race condition)
531                             nextWakeupNanos.lazySet(AWAKE);
532                         }
533                         // fall through
534                     default:
535                     }
536                 } catch (IOException e) {
537                     // If we receive an IOException here its because the Selector is messed up. Let's rebuild
538                     // the selector and retry. https://github.com/netty/netty/issues/8566
539                     rebuildSelector0();
540                     selectCnt = 0;
541                     handleLoopException(e);
542                     continue;
543                 }
544 
545                 selectCnt++;
546                 cancelledKeys = 0;
547                 needsToSelectAgain = false;
548                 final int ioRatio = this.ioRatio;
549                 boolean ranTasks;
550                 if (ioRatio == 100) {
551                     try {
552                         if (strategy > 0) {
553                             processSelectedKeys();
554                         }
555                     } finally {
556                         // Ensure we always run tasks.
557                         ranTasks = runAllTasks();
558                     }
559                 } else if (strategy > 0) {
560                     final long ioStartTime = System.nanoTime();
561                     try {
562                         processSelectedKeys();
563                     } finally {
564                         // Ensure we always run tasks.
565                         final long ioTime = System.nanoTime() - ioStartTime;
566                         ranTasks = runAllTasks(ioTime * (100 - ioRatio) / ioRatio);
567                     }
568                 } else {
569                     ranTasks = runAllTasks(0); // This will run the minimum number of tasks
570                 }
571 
572                 if (ranTasks || strategy > 0) {
573                     if (selectCnt > MIN_PREMATURE_SELECTOR_RETURNS && logger.isDebugEnabled()) {
574                         logger.debug("Selector.select() returned prematurely {} times in a row for Selector {}.",
575                                 selectCnt - 1, selector);
576                     }
577                     selectCnt = 0;
578                 } else if (unexpectedSelectorWakeup(selectCnt)) { // Unexpected wakeup (unusual case)
579                     selectCnt = 0;
580                 }
581             } catch (CancelledKeyException e) {
582                 // Harmless exception - log anyway
583                 if (logger.isDebugEnabled()) {
584                     logger.debug(CancelledKeyException.class.getSimpleName() + " raised by a Selector {} - JDK bug?",
585                             selector, e);
586                 }
587             } catch (Error e) {
588                 throw e;
589             } catch (Throwable t) {
590                 handleLoopException(t);
591             } finally {
592                 // Always handle shutdown even if the loop processing threw an exception.
593                 try {
594                     if (isShuttingDown()) {
595                         closeAll();
596                         if (confirmShutdown()) {
597                             return;
598                         }
599                     }
600                 } catch (Error e) {
601                     throw e;
602                 } catch (Throwable t) {
603                     handleLoopException(t);
604                 }
605             }
606         }
607     }
608 
609     // returns true if selectCnt should be reset
610     private boolean unexpectedSelectorWakeup(int selectCnt) {
611         if (Thread.interrupted()) {
612             // Thread was interrupted so reset selected keys and break so we not run into a busy loop.
613             // As this is most likely a bug in the handler of the user or it's client library we will
614             // also log it.
615             //
616             // See https://github.com/netty/netty/issues/2426
617             if (logger.isDebugEnabled()) {
618                 logger.debug("Selector.select() returned prematurely because " +
619                         "Thread.currentThread().interrupt() was called. Use " +
620                         "NioEventLoop.shutdownGracefully() to shutdown the NioEventLoop.");
621             }
622             return true;
623         }
624         if (SELECTOR_AUTO_REBUILD_THRESHOLD > 0 &&
625                 selectCnt >= SELECTOR_AUTO_REBUILD_THRESHOLD) {
626             // The selector returned prematurely many times in a row.
627             // Rebuild the selector to work around the problem.
628             logger.warn("Selector.select() returned prematurely {} times in a row; rebuilding Selector {}.",
629                     selectCnt, selector);
630             rebuildSelector();
631             return true;
632         }
633         return false;
634     }
635 
636     private static void handleLoopException(Throwable t) {
637         logger.warn("Unexpected exception in the selector loop.", t);
638 
639         // Prevent possible consecutive immediate failures that lead to
640         // excessive CPU consumption.
641         try {
642             Thread.sleep(1000);
643         } catch (InterruptedException e) {
644             // Ignore.
645         }
646     }
647 
648     private void processSelectedKeys() {
649         if (selectedKeys != null) {
650             processSelectedKeysOptimized();
651         } else {
652             processSelectedKeysPlain(selector.selectedKeys());
653         }
654     }
655 
656     @Override
657     protected void cleanup() {
658         try {
659             selector.close();
660         } catch (IOException e) {
661             logger.warn("Failed to close a selector.", e);
662         }
663     }
664 
665     void cancel(SelectionKey key) {
666         key.cancel();
667         cancelledKeys ++;
668         if (cancelledKeys >= CLEANUP_INTERVAL) {
669             cancelledKeys = 0;
670             needsToSelectAgain = true;
671         }
672     }
673 
674     private void processSelectedKeysPlain(Set<SelectionKey> selectedKeys) {
675         // check if the set is empty and if so just return to not create garbage by
676         // creating a new Iterator every time even if there is nothing to process.
677         // See https://github.com/netty/netty/issues/597
678         if (selectedKeys.isEmpty()) {
679             return;
680         }
681 
682         Iterator<SelectionKey> i = selectedKeys.iterator();
683         for (;;) {
684             final SelectionKey k = i.next();
685             final Object a = k.attachment();
686             i.remove();
687 
688             if (a instanceof AbstractNioChannel) {
689                 processSelectedKey(k, (AbstractNioChannel) a);
690             } else {
691                 @SuppressWarnings("unchecked")
692                 NioTask<SelectableChannel> task = (NioTask<SelectableChannel>) a;
693                 processSelectedKey(k, task);
694             }
695 
696             if (!i.hasNext()) {
697                 break;
698             }
699 
700             if (needsToSelectAgain) {
701                 selectAgain();
702                 selectedKeys = selector.selectedKeys();
703 
704                 // Create the iterator again to avoid ConcurrentModificationException
705                 if (selectedKeys.isEmpty()) {
706                     break;
707                 } else {
708                     i = selectedKeys.iterator();
709                 }
710             }
711         }
712     }
713 
714     private void processSelectedKeysOptimized() {
715         for (int i = 0; i < selectedKeys.size; ++i) {
716             final SelectionKey k = selectedKeys.keys[i];
717             // null out entry in the array to allow to have it GC'ed once the Channel close
718             // See https://github.com/netty/netty/issues/2363
719             selectedKeys.keys[i] = null;
720 
721             final Object a = k.attachment();
722 
723             if (a instanceof AbstractNioChannel) {
724                 processSelectedKey(k, (AbstractNioChannel) a);
725             } else {
726                 @SuppressWarnings("unchecked")
727                 NioTask<SelectableChannel> task = (NioTask<SelectableChannel>) a;
728                 processSelectedKey(k, task);
729             }
730 
731             if (needsToSelectAgain) {
732                 // null out entries in the array to allow to have it GC'ed once the Channel close
733                 // See https://github.com/netty/netty/issues/2363
734                 selectedKeys.reset(i + 1);
735 
736                 selectAgain();
737                 i = -1;
738             }
739         }
740     }
741 
742     private void processSelectedKey(SelectionKey k, AbstractNioChannel ch) {
743         final AbstractNioChannel.NioUnsafe unsafe = ch.unsafe();
744         if (!k.isValid()) {
745             final EventLoop eventLoop;
746             try {
747                 eventLoop = ch.eventLoop();
748             } catch (Throwable ignored) {
749                 // If the channel implementation throws an exception because there is no event loop, we ignore this
750                 // because we are only trying to determine if ch is registered to this event loop and thus has authority
751                 // to close ch.
752                 return;
753             }
754             // Only close ch if ch is still registered to this EventLoop. ch could have deregistered from the event loop
755             // and thus the SelectionKey could be cancelled as part of the deregistration process, but the channel is
756             // still healthy and should not be closed.
757             // See https://github.com/netty/netty/issues/5125
758             if (eventLoop == this) {
759                 // close the channel if the key is not valid anymore
760                 unsafe.close(unsafe.voidPromise());
761             }
762             return;
763         }
764 
765         try {
766             int readyOps = k.readyOps();
767             // We first need to call finishConnect() before try to trigger a read(...) or write(...) as otherwise
768             // the NIO JDK channel implementation may throw a NotYetConnectedException.
769             if ((readyOps & SelectionKey.OP_CONNECT) != 0) {
770                 // remove OP_CONNECT as otherwise Selector.select(..) will always return without blocking
771                 // See https://github.com/netty/netty/issues/924
772                 int ops = k.interestOps();
773                 ops &= ~SelectionKey.OP_CONNECT;
774                 k.interestOps(ops);
775 
776                 unsafe.finishConnect();
777             }
778 
779             // Process OP_WRITE first as we may be able to write some queued buffers and so free memory.
780             if ((readyOps & SelectionKey.OP_WRITE) != 0) {
781                 // Call forceFlush which will also take care of clear the OP_WRITE once there is nothing left to write
782                unsafe.forceFlush();
783             }
784 
785             // Also check for readOps of 0 to workaround possible JDK bug which may otherwise lead
786             // to a spin loop
787             if ((readyOps & (SelectionKey.OP_READ | SelectionKey.OP_ACCEPT)) != 0 || readyOps == 0) {
788                 unsafe.read();
789             }
790         } catch (CancelledKeyException ignored) {
791             unsafe.close(unsafe.voidPromise());
792         }
793     }
794 
795     private static void processSelectedKey(SelectionKey k, NioTask<SelectableChannel> task) {
796         int state = 0;
797         try {
798             task.channelReady(k.channel(), k);
799             state = 1;
800         } catch (Exception e) {
801             k.cancel();
802             invokeChannelUnregistered(task, k, e);
803             state = 2;
804         } finally {
805             switch (state) {
806             case 0:
807                 k.cancel();
808                 invokeChannelUnregistered(task, k, null);
809                 break;
810             case 1:
811                 if (!k.isValid()) { // Cancelled by channelReady()
812                     invokeChannelUnregistered(task, k, null);
813                 }
814                 break;
815             default:
816                  break;
817             }
818         }
819     }
820 
821     private void closeAll() {
822         selectAgain();
823         Set<SelectionKey> keys = selector.keys();
824         Collection<AbstractNioChannel> channels = new ArrayList<AbstractNioChannel>(keys.size());
825         for (SelectionKey k: keys) {
826             Object a = k.attachment();
827             if (a instanceof AbstractNioChannel) {
828                 channels.add((AbstractNioChannel) a);
829             } else {
830                 k.cancel();
831                 @SuppressWarnings("unchecked")
832                 NioTask<SelectableChannel> task = (NioTask<SelectableChannel>) a;
833                 invokeChannelUnregistered(task, k, null);
834             }
835         }
836 
837         for (AbstractNioChannel ch: channels) {
838             ch.unsafe().close(ch.unsafe().voidPromise());
839         }
840     }
841 
842     private static void invokeChannelUnregistered(NioTask<SelectableChannel> task, SelectionKey k, Throwable cause) {
843         try {
844             task.channelUnregistered(k.channel(), cause);
845         } catch (Exception e) {
846             logger.warn("Unexpected exception while running NioTask.channelUnregistered()", e);
847         }
848     }
849 
850     @Override
851     protected void wakeup(boolean inEventLoop) {
852         if (!inEventLoop && nextWakeupNanos.getAndSet(AWAKE) != AWAKE) {
853             selector.wakeup();
854         }
855     }
856 
857     @Override
858     protected boolean beforeScheduledTaskSubmitted(long deadlineNanos) {
859         // Note this is also correct for the nextWakeupNanos == -1 (AWAKE) case
860         return deadlineNanos < nextWakeupNanos.get();
861     }
862 
863     @Override
864     protected boolean afterScheduledTaskSubmitted(long deadlineNanos) {
865         // Note this is also correct for the nextWakeupNanos == -1 (AWAKE) case
866         return deadlineNanos < nextWakeupNanos.get();
867     }
868 
869     Selector unwrappedSelector() {
870         return unwrappedSelector;
871     }
872 
873     int selectNow() throws IOException {
874         return selector.selectNow();
875     }
876 
877     private int select(long deadlineNanos) throws IOException {
878         if (deadlineNanos == NONE) {
879             return selector.select();
880         }
881         // Timeout will only be 0 if deadline is within 5 microsecs
882         long timeoutMillis = deadlineToDelayNanos(deadlineNanos + 995000L) / 1000000L;
883         return timeoutMillis <= 0 ? selector.selectNow() : selector.select(timeoutMillis);
884     }
885 
886     private void selectAgain() {
887         needsToSelectAgain = false;
888         try {
889             selector.selectNow();
890         } catch (Throwable t) {
891             logger.warn("Failed to update SelectionKeys.", t);
892         }
893     }
894 }