查看本类的 API文档回源码主页即时通讯网 - 即时通讯开发者社区!
1   /*
2    * Copyright 2014 The Netty Project
3    *
4    * The Netty Project licenses this file to you under the Apache License,
5    * version 2.0 (the "License"); you may not use this file except in compliance
6    * with the License. You may obtain a copy of the License at:
7    *
8    *   https://www.apache.org/licenses/LICENSE-2.0
9    *
10   * Unless required by applicable law or agreed to in writing, software
11   * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12   * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13   * License for the specific language governing permissions and limitations
14   * under the License.
15   */
16  package io.netty.channel.epoll;
17  
18  import io.netty.channel.Channel;
19  import io.netty.channel.EventLoop;
20  import io.netty.channel.EventLoopGroup;
21  import io.netty.channel.EventLoopTaskQueueFactory;
22  import io.netty.channel.SelectStrategy;
23  import io.netty.channel.SingleThreadEventLoop;
24  import io.netty.channel.epoll.AbstractEpollChannel.AbstractEpollUnsafe;
25  import io.netty.channel.unix.FileDescriptor;
26  import io.netty.channel.unix.IovArray;
27  import io.netty.util.IntSupplier;
28  import io.netty.util.collection.IntObjectHashMap;
29  import io.netty.util.collection.IntObjectMap;
30  import io.netty.util.concurrent.RejectedExecutionHandler;
31  import io.netty.util.internal.ObjectUtil;
32  import io.netty.util.internal.PlatformDependent;
33  import io.netty.util.internal.SystemPropertyUtil;
34  import io.netty.util.internal.UnstableApi;
35  import io.netty.util.internal.logging.InternalLogger;
36  import io.netty.util.internal.logging.InternalLoggerFactory;
37  
38  import java.io.IOException;
39  import java.util.Iterator;
40  import java.util.Queue;
41  import java.util.concurrent.Executor;
42  import java.util.concurrent.atomic.AtomicLong;
43  
44  import static java.lang.Math.min;
45  
46  /**
47   * {@link EventLoop} which uses epoll under the covers. Only works on Linux!
48   */
49  public class EpollEventLoop extends SingleThreadEventLoop {
50      private static final InternalLogger logger = InternalLoggerFactory.getInstance(EpollEventLoop.class);
51      private static final long EPOLL_WAIT_MILLIS_THRESHOLD =
52              SystemPropertyUtil.getLong("io.netty.channel.epoll.epollWaitThreshold", 10);
53  
54      static {
55          // Ensure JNI is initialized by the time this class is loaded by this time!
56          // We use unix-common methods in this class which are backed by JNI methods.
57          Epoll.ensureAvailability();
58      }
59  
60      private FileDescriptor epollFd;
61      private FileDescriptor eventFd;
62      private FileDescriptor timerFd;
63      private final IntObjectMap<AbstractEpollChannel> channels = new IntObjectHashMap<AbstractEpollChannel>(4096);
64      private final boolean allowGrowing;
65      private final EpollEventArray events;
66  
67      // These are initialized on first use
68      private IovArray iovArray;
69      private NativeDatagramPacketArray datagramPacketArray;
70  
71      private final SelectStrategy selectStrategy;
72      private final IntSupplier selectNowSupplier = new IntSupplier() {
73          @Override
74          public int get() throws Exception {
75              return epollWaitNow();
76          }
77      };
78  
79      private static final long AWAKE = -1L;
80      private static final long NONE = Long.MAX_VALUE;
81  
82      // nextWakeupNanos is:
83      //    AWAKE            when EL is awake
84      //    NONE             when EL is waiting with no wakeup scheduled
85      //    other value T    when EL is waiting with wakeup scheduled at time T
86      private final AtomicLong nextWakeupNanos = new AtomicLong(AWAKE);
87      private boolean pendingWakeup;
88      private volatile int ioRatio = 50;
89  
90      // See https://man7.org/linux/man-pages/man2/timerfd_create.2.html.
91      private static final long MAX_SCHEDULED_TIMERFD_NS = 999999999;
92  
93      EpollEventLoop(EventLoopGroup parent, Executor executor, int maxEvents,
94                     SelectStrategy strategy, RejectedExecutionHandler rejectedExecutionHandler,
95                     EventLoopTaskQueueFactory taskQueueFactory, EventLoopTaskQueueFactory tailTaskQueueFactory) {
96          super(parent, executor, false, newTaskQueue(taskQueueFactory), newTaskQueue(tailTaskQueueFactory),
97                  rejectedExecutionHandler);
98          selectStrategy = ObjectUtil.checkNotNull(strategy, "strategy");
99          if (maxEvents == 0) {
100             allowGrowing = true;
101             events = new EpollEventArray(4096);
102         } else {
103             allowGrowing = false;
104             events = new EpollEventArray(maxEvents);
105         }
106         openFileDescriptors();
107     }
108 
109     /**
110      * This method is intended for use by a process checkpoint/restore
111      * integration, such as OpenJDK CRaC.
112      */
113     @UnstableApi
114     public void openFileDescriptors() {
115         boolean success = false;
116         FileDescriptor epollFd = null;
117         FileDescriptor eventFd = null;
118         FileDescriptor timerFd = null;
119         try {
120             this.epollFd = epollFd = Native.newEpollCreate();
121             this.eventFd = eventFd = Native.newEventFd();
122             try {
123                 // It is important to use EPOLLET here as we only want to get the notification once per
124                 // wakeup and don't call eventfd_read(...).
125                 Native.epollCtlAdd(epollFd.intValue(), eventFd.intValue(), Native.EPOLLIN | Native.EPOLLET);
126             } catch (IOException e) {
127                 throw new IllegalStateException("Unable to add eventFd filedescriptor to epoll", e);
128             }
129             this.timerFd = timerFd = Native.newTimerFd();
130             try {
131                 // It is important to use EPOLLET here as we only want to get the notification once per
132                 // wakeup and don't call read(...).
133                 Native.epollCtlAdd(epollFd.intValue(), timerFd.intValue(), Native.EPOLLIN | Native.EPOLLET);
134             } catch (IOException e) {
135                 throw new IllegalStateException("Unable to add timerFd filedescriptor to epoll", e);
136             }
137             success = true;
138         } finally {
139             if (!success) {
140                 closeFileDescriptor(epollFd);
141                 closeFileDescriptor(eventFd);
142                 closeFileDescriptor(timerFd);
143             }
144         }
145     }
146 
147     private static void closeFileDescriptor(FileDescriptor fd) {
148         if (fd != null) {
149             try {
150                 fd.close();
151             } catch (Exception e) {
152                 // ignore
153             }
154         }
155     }
156 
157     private static Queue<Runnable> newTaskQueue(
158             EventLoopTaskQueueFactory queueFactory) {
159         if (queueFactory == null) {
160             return newTaskQueue0(DEFAULT_MAX_PENDING_TASKS);
161         }
162         return queueFactory.newTaskQueue(DEFAULT_MAX_PENDING_TASKS);
163     }
164 
165     /**
166      * Return a cleared {@link IovArray} that can be used for writes in this {@link EventLoop}.
167      */
168     IovArray cleanIovArray() {
169         if (iovArray == null) {
170             iovArray = new IovArray();
171         } else {
172             iovArray.clear();
173         }
174         return iovArray;
175     }
176 
177     /**
178      * Return a cleared {@link NativeDatagramPacketArray} that can be used for writes in this {@link EventLoop}.
179      */
180     NativeDatagramPacketArray cleanDatagramPacketArray() {
181         if (datagramPacketArray == null) {
182             datagramPacketArray = new NativeDatagramPacketArray();
183         } else {
184             datagramPacketArray.clear();
185         }
186         return datagramPacketArray;
187     }
188 
189     @Override
190     protected void wakeup(boolean inEventLoop) {
191         if (!inEventLoop && nextWakeupNanos.getAndSet(AWAKE) != AWAKE) {
192             // write to the evfd which will then wake-up epoll_wait(...)
193             Native.eventFdWrite(eventFd.intValue(), 1L);
194         }
195     }
196 
197     @Override
198     protected boolean beforeScheduledTaskSubmitted(long deadlineNanos) {
199         // Note this is also correct for the nextWakeupNanos == -1 (AWAKE) case
200         return deadlineNanos < nextWakeupNanos.get();
201     }
202 
203     @Override
204     protected boolean afterScheduledTaskSubmitted(long deadlineNanos) {
205         // Note this is also correct for the nextWakeupNanos == -1 (AWAKE) case
206         return deadlineNanos < nextWakeupNanos.get();
207     }
208 
209     /**
210      * Register the given epoll with this {@link EventLoop}.
211      */
212     void add(AbstractEpollChannel ch) throws IOException {
213         assert inEventLoop();
214         int fd = ch.socket.intValue();
215         Native.epollCtlAdd(epollFd.intValue(), fd, ch.flags);
216         AbstractEpollChannel old = channels.put(fd, ch);
217 
218         // We either expect to have no Channel in the map with the same FD or that the FD of the old Channel is already
219         // closed.
220         assert old == null || !old.isOpen();
221     }
222 
223     /**
224      * The flags of the given epoll was modified so update the registration
225      */
226     void modify(AbstractEpollChannel ch) throws IOException {
227         assert inEventLoop();
228         Native.epollCtlMod(epollFd.intValue(), ch.socket.intValue(), ch.flags);
229     }
230 
231     /**
232      * Deregister the given epoll from this {@link EventLoop}.
233      */
234     void remove(AbstractEpollChannel ch) throws IOException {
235         assert inEventLoop();
236         int fd = ch.socket.intValue();
237 
238         AbstractEpollChannel old = channels.remove(fd);
239         if (old != null && old != ch) {
240             // The Channel mapping was already replaced due FD reuse, put back the stored Channel.
241             channels.put(fd, old);
242 
243             // If we found another Channel in the map that is mapped to the same FD the given Channel MUST be closed.
244             assert !ch.isOpen();
245         } else if (ch.isOpen()) {
246             // Remove the epoll. This is only needed if it's still open as otherwise it will be automatically
247             // removed once the file-descriptor is closed.
248             Native.epollCtlDel(epollFd.intValue(), fd);
249         }
250     }
251 
252     @Override
253     protected Queue<Runnable> newTaskQueue(int maxPendingTasks) {
254         return newTaskQueue0(maxPendingTasks);
255     }
256 
257     private static Queue<Runnable> newTaskQueue0(int maxPendingTasks) {
258         // This event loop never calls takeTask()
259         return maxPendingTasks == Integer.MAX_VALUE ? PlatformDependent.<Runnable>newMpscQueue()
260                 : PlatformDependent.<Runnable>newMpscQueue(maxPendingTasks);
261     }
262 
263     /**
264      * Returns the percentage of the desired amount of time spent for I/O in the event loop.
265      */
266     public int getIoRatio() {
267         return ioRatio;
268     }
269 
270     /**
271      * Sets the percentage of the desired amount of time spent for I/O in the event loop.  The default value is
272      * {@code 50}, which means the event loop will try to spend the same amount of time for I/O as for non-I/O tasks.
273      */
274     public void setIoRatio(int ioRatio) {
275         if (ioRatio <= 0 || ioRatio > 100) {
276             throw new IllegalArgumentException("ioRatio: " + ioRatio + " (expected: 0 < ioRatio <= 100)");
277         }
278         this.ioRatio = ioRatio;
279     }
280 
281     @Override
282     public int registeredChannels() {
283         return channels.size();
284     }
285 
286     @Override
287     public Iterator<Channel> registeredChannelsIterator() {
288         assert inEventLoop();
289         IntObjectMap<AbstractEpollChannel> ch = channels;
290         if (ch.isEmpty()) {
291             return ChannelsReadOnlyIterator.empty();
292         }
293         return new ChannelsReadOnlyIterator<AbstractEpollChannel>(ch.values());
294     }
295 
296     private long epollWait(long deadlineNanos) throws IOException {
297         if (deadlineNanos == NONE) {
298             return Native.epollWait(epollFd, events, timerFd,
299                     Integer.MAX_VALUE, 0, EPOLL_WAIT_MILLIS_THRESHOLD); // disarm timer
300         }
301         long totalDelay = deadlineToDelayNanos(deadlineNanos);
302         int delaySeconds = (int) min(totalDelay / 1000000000L, Integer.MAX_VALUE);
303         int delayNanos = (int) min(totalDelay - delaySeconds * 1000000000L, MAX_SCHEDULED_TIMERFD_NS);
304         return Native.epollWait(epollFd, events, timerFd, delaySeconds, delayNanos, EPOLL_WAIT_MILLIS_THRESHOLD);
305     }
306 
307     private int epollWaitNoTimerChange() throws IOException {
308         return Native.epollWait(epollFd, events, false);
309     }
310 
311     private int epollWaitNow() throws IOException {
312         return Native.epollWait(epollFd, events, true);
313     }
314 
315     private int epollBusyWait() throws IOException {
316         return Native.epollBusyWait(epollFd, events);
317     }
318 
319     private int epollWaitTimeboxed() throws IOException {
320         // Wait with 1 second "safeguard" timeout
321         return Native.epollWait(epollFd, events, 1000);
322     }
323 
324     @Override
325     protected void run() {
326         long prevDeadlineNanos = NONE;
327         for (;;) {
328             try {
329                 int strategy = selectStrategy.calculateStrategy(selectNowSupplier, hasTasks());
330                 switch (strategy) {
331                     case SelectStrategy.CONTINUE:
332                         continue;
333 
334                     case SelectStrategy.BUSY_WAIT:
335                         strategy = epollBusyWait();
336                         break;
337 
338                     case SelectStrategy.SELECT:
339                         if (pendingWakeup) {
340                             // We are going to be immediately woken so no need to reset wakenUp
341                             // or check for timerfd adjustment.
342                             strategy = epollWaitTimeboxed();
343                             if (strategy != 0) {
344                                 break;
345                             }
346                             // We timed out so assume that we missed the write event due to an
347                             // abnormally failed syscall (the write itself or a prior epoll_wait)
348                             logger.warn("Missed eventfd write (not seen after > 1 second)");
349                             pendingWakeup = false;
350                             if (hasTasks()) {
351                                 break;
352                             }
353                             // fall-through
354                         }
355 
356                         long curDeadlineNanos = nextScheduledTaskDeadlineNanos();
357                         if (curDeadlineNanos == -1L) {
358                             curDeadlineNanos = NONE; // nothing on the calendar
359                         }
360                         nextWakeupNanos.set(curDeadlineNanos);
361                         try {
362                             if (!hasTasks()) {
363                                 if (curDeadlineNanos == prevDeadlineNanos) {
364                                     // No timer activity needed
365                                     strategy = epollWaitNoTimerChange();
366                                 } else {
367                                     // Timerfd needs to be re-armed or disarmed
368                                     long result = epollWait(curDeadlineNanos);
369                                     // The result contains the actual return value and if a timer was used or not.
370                                     // We need to "unpack" using the helper methods exposed in Native.
371                                     strategy = Native.epollReady(result);
372                                     prevDeadlineNanos = Native.epollTimerWasUsed(result) ? curDeadlineNanos : NONE;
373                                 }
374                             }
375                         } finally {
376                             // Try get() first to avoid much more expensive CAS in the case we
377                             // were woken via the wakeup() method (submitted task)
378                             if (nextWakeupNanos.get() == AWAKE || nextWakeupNanos.getAndSet(AWAKE) == AWAKE) {
379                                 pendingWakeup = true;
380                             }
381                         }
382                         // fallthrough
383                     default:
384                 }
385 
386                 final int ioRatio = this.ioRatio;
387                 if (ioRatio == 100) {
388                     try {
389                         if (strategy > 0 && processReady(events, strategy)) {
390                             prevDeadlineNanos = NONE;
391                         }
392                     } finally {
393                         // Ensure we always run tasks.
394                         runAllTasks();
395                     }
396                 } else if (strategy > 0) {
397                     final long ioStartTime = System.nanoTime();
398                     try {
399                         if (processReady(events, strategy)) {
400                             prevDeadlineNanos = NONE;
401                         }
402                     } finally {
403                         // Ensure we always run tasks.
404                         final long ioTime = System.nanoTime() - ioStartTime;
405                         runAllTasks(ioTime * (100 - ioRatio) / ioRatio);
406                     }
407                 } else {
408                     runAllTasks(0); // This will run the minimum number of tasks
409                 }
410                 if (allowGrowing && strategy == events.length()) {
411                     //increase the size of the array as we needed the whole space for the events
412                     events.increase();
413                 }
414             } catch (Error e) {
415                 throw e;
416             } catch (Throwable t) {
417                 handleLoopException(t);
418             } finally {
419                 // Always handle shutdown even if the loop processing threw an exception.
420                 try {
421                     if (isShuttingDown()) {
422                         closeAll();
423                         if (confirmShutdown()) {
424                             break;
425                         }
426                     }
427                 } catch (Error e) {
428                     throw e;
429                 } catch (Throwable t) {
430                     handleLoopException(t);
431                 }
432             }
433         }
434     }
435 
436     /**
437      * Visible only for testing!
438      */
439     void handleLoopException(Throwable t) {
440         logger.warn("Unexpected exception in the selector loop.", t);
441 
442         // Prevent possible consecutive immediate failures that lead to
443         // excessive CPU consumption.
444         try {
445             Thread.sleep(1000);
446         } catch (InterruptedException e) {
447             // Ignore.
448         }
449     }
450 
451     private void closeAll() {
452         // Using the intermediate collection to prevent ConcurrentModificationException.
453         // In the `close()` method, the channel is deleted from `channels` map.
454         AbstractEpollChannel[] localChannels = channels.values().toArray(new AbstractEpollChannel[0]);
455 
456         for (AbstractEpollChannel ch: localChannels) {
457             ch.unsafe().close(ch.unsafe().voidPromise());
458         }
459     }
460 
461     // Returns true if a timerFd event was encountered
462     private boolean processReady(EpollEventArray events, int ready) {
463         boolean timerFired = false;
464         for (int i = 0; i < ready; i ++) {
465             final int fd = events.fd(i);
466             if (fd == eventFd.intValue()) {
467                 pendingWakeup = false;
468             } else if (fd == timerFd.intValue()) {
469                 timerFired = true;
470             } else {
471                 final long ev = events.events(i);
472 
473                 AbstractEpollChannel ch = channels.get(fd);
474                 if (ch != null) {
475                     // Don't change the ordering of processing EPOLLOUT | EPOLLRDHUP / EPOLLIN if you're not 100%
476                     // sure about it!
477                     // Re-ordering can easily introduce bugs and bad side-effects, as we found out painfully in the
478                     // past.
479                     AbstractEpollUnsafe unsafe = (AbstractEpollUnsafe) ch.unsafe();
480 
481                     // First check for EPOLLOUT as we may need to fail the connect ChannelPromise before try
482                     // to read from the file descriptor.
483                     // See https://github.com/netty/netty/issues/3785
484                     //
485                     // It is possible for an EPOLLOUT or EPOLLERR to be generated when a connection is refused.
486                     // In either case epollOutReady() will do the correct thing (finish connecting, or fail
487                     // the connection).
488                     // See https://github.com/netty/netty/issues/3848
489                     if ((ev & (Native.EPOLLERR | Native.EPOLLOUT)) != 0) {
490                         // Force flush of data as the epoll is writable again
491                         unsafe.epollOutReady();
492                     }
493 
494                     // Check EPOLLIN before EPOLLRDHUP to ensure all data is read before shutting down the input.
495                     // See https://github.com/netty/netty/issues/4317.
496                     //
497                     // If EPOLLIN or EPOLLERR was received and the channel is still open call epollInReady(). This will
498                     // try to read from the underlying file descriptor and so notify the user about the error.
499                     if ((ev & (Native.EPOLLERR | Native.EPOLLIN)) != 0) {
500                         // The Channel is still open and there is something to read. Do it now.
501                         unsafe.epollInReady();
502                     }
503 
504                     // Check if EPOLLRDHUP was set, this will notify us for connection-reset in which case
505                     // we may close the channel directly or try to read more data depending on the state of the
506                     // Channel and als depending on the AbstractEpollChannel subtype.
507                     if ((ev & Native.EPOLLRDHUP) != 0) {
508                         unsafe.epollRdHupReady();
509                     }
510                 } else {
511                     // We received an event for an fd which we not use anymore. Remove it from the epoll_event set.
512                     try {
513                         Native.epollCtlDel(epollFd.intValue(), fd);
514                     } catch (IOException ignore) {
515                         // This can happen but is nothing we need to worry about as we only try to delete
516                         // the fd from the epoll set as we not found it in our mappings. So this call to
517                         // epollCtlDel(...) is just to ensure we cleanup stuff and so may fail if it was
518                         // deleted before or the file descriptor was closed before.
519                     }
520                 }
521             }
522         }
523         return timerFired;
524     }
525 
526     @Override
527     protected void cleanup() {
528         try {
529             closeFileDescriptors();
530         } finally {
531             // release native memory
532             if (iovArray != null) {
533                 iovArray.release();
534                 iovArray = null;
535             }
536             if (datagramPacketArray != null) {
537                 datagramPacketArray.release();
538                 datagramPacketArray = null;
539             }
540             events.free();
541         }
542     }
543 
544     /**
545      * This method is intended for use by process checkpoint/restore
546      * integration, such as OpenJDK CRaC.
547      * It's up to the caller to ensure that there is no concurrent use
548      * of the FDs while these are closed, e.g. by blocking the executor.
549      */
550     @UnstableApi
551     public void closeFileDescriptors() {
552         // Ensure any in-flight wakeup writes have been performed prior to closing eventFd.
553         while (pendingWakeup) {
554             try {
555                 int count = epollWaitTimeboxed();
556                 if (count == 0) {
557                     // We timed-out so assume that the write we're expecting isn't coming
558                     break;
559                 }
560                 for (int i = 0; i < count; i++) {
561                     if (events.fd(i) == eventFd.intValue()) {
562                         pendingWakeup = false;
563                         break;
564                     }
565                 }
566             } catch (IOException ignore) {
567                 // ignore
568             }
569         }
570         try {
571             eventFd.close();
572         } catch (IOException e) {
573             logger.warn("Failed to close the event fd.", e);
574         }
575         try {
576             timerFd.close();
577         } catch (IOException e) {
578             logger.warn("Failed to close the timer fd.", e);
579         }
580 
581         try {
582             epollFd.close();
583         } catch (IOException e) {
584             logger.warn("Failed to close the epoll fd.", e);
585         }
586     }
587 }