1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16 package io.netty.channel.epoll;
17
18 import io.netty.channel.Channel;
19 import io.netty.channel.EventLoop;
20 import io.netty.channel.EventLoopGroup;
21 import io.netty.channel.EventLoopTaskQueueFactory;
22 import io.netty.channel.SelectStrategy;
23 import io.netty.channel.SingleThreadEventLoop;
24 import io.netty.channel.epoll.AbstractEpollChannel.AbstractEpollUnsafe;
25 import io.netty.channel.unix.FileDescriptor;
26 import io.netty.channel.unix.IovArray;
27 import io.netty.util.IntSupplier;
28 import io.netty.util.collection.IntObjectHashMap;
29 import io.netty.util.collection.IntObjectMap;
30 import io.netty.util.concurrent.RejectedExecutionHandler;
31 import io.netty.util.internal.ObjectUtil;
32 import io.netty.util.internal.PlatformDependent;
33 import io.netty.util.internal.SystemPropertyUtil;
34 import io.netty.util.internal.UnstableApi;
35 import io.netty.util.internal.logging.InternalLogger;
36 import io.netty.util.internal.logging.InternalLoggerFactory;
37
38 import java.io.IOException;
39 import java.util.Iterator;
40 import java.util.Queue;
41 import java.util.concurrent.Executor;
42 import java.util.concurrent.atomic.AtomicLong;
43
44 import static java.lang.Math.min;
45
46
47
48
49 public class EpollEventLoop extends SingleThreadEventLoop {
/** Logger shared by all instances of this event loop. */
private static final InternalLogger logger = InternalLoggerFactory.getInstance(EpollEventLoop.class);

/**
 * Value of the {@code io.netty.channel.epoll.epollWaitThreshold} system property (default 10);
 * handed to {@code Native.epollWait} as its last argument by {@link #epollWait(long)}.
 */
private static final long EPOLL_WAIT_MILLIS_THRESHOLD =
        SystemPropertyUtil.getLong("io.netty.channel.epoll.epollWaitThreshold", 10);

/** Value of {@link #nextWakeupNanos} while the loop is not blocked waiting for events. */
private static final long AWAKE = -1L;

/** Value of {@link #nextWakeupNanos} while the loop waits and no scheduled task exists. */
private static final long NONE = Long.MAX_VALUE;

/** Upper clamp for the nanosecond part of a delay passed to {@code Native.epollWait}. */
private static final long MAX_SCHEDULED_TIMERFD_NS = 999999999;

static {
    // Runs once at class initialization.
    // NOTE(review): presumably throws when the native epoll transport is unavailable - confirm.
    Epoll.ensureAvailability();
}

// File descriptors (re)assigned by openFileDescriptors() and closed by closeFileDescriptors().
private FileDescriptor epollFd;
private FileDescriptor eventFd;
private FileDescriptor timerFd;

/** Channels registered with this loop, keyed by the int value of their socket descriptor. */
private final IntObjectMap<AbstractEpollChannel> channels = new IntObjectHashMap<AbstractEpollChannel>(4096);

/** {@code true} when the loop was created with {@code maxEvents == 0}; lets {@link #events} grow. */
private final boolean allowGrowing;

/** Receives the events reported by each epoll wait call. */
private final EpollEventArray events;

// Scratch arrays, created on first use and released in cleanup().
private IovArray iovArray;
private NativeDatagramPacketArray datagramPacketArray;

/** Decides per loop iteration whether to wait, busy-wait, or go around again. */
private final SelectStrategy selectStrategy;

/** Supplies the result of {@link #epollWaitNow()} to the select strategy. */
private final IntSupplier selectNowSupplier = new IntSupplier() {
    @Override
    public int get() throws Exception {
        return epollWaitNow();
    }
};

/**
 * Holds {@link #AWAKE} while the loop is awake, {@link #NONE} while it waits without a scheduled
 * task, and otherwise the deadline of the next scheduled task it is waiting for.
 */
private final AtomicLong nextWakeupNanos = new AtomicLong(AWAKE);

/**
 * Set by {@link #run()} when another caller had already switched {@link #nextWakeupNanos} to
 * {@link #AWAKE} by the time a wait ended; cleared once the event fd shows up among the ready events.
 */
private boolean pendingWakeup;

/** I/O ratio, settable via {@link #setIoRatio(int)}; starts at 50. */
private volatile int ioRatio = 50;
92
93 EpollEventLoop(EventLoopGroup parent, Executor executor, int maxEvents,
94 SelectStrategy strategy, RejectedExecutionHandler rejectedExecutionHandler,
95 EventLoopTaskQueueFactory taskQueueFactory, EventLoopTaskQueueFactory tailTaskQueueFactory) {
96 super(parent, executor, false, newTaskQueue(taskQueueFactory), newTaskQueue(tailTaskQueueFactory),
97 rejectedExecutionHandler);
98 selectStrategy = ObjectUtil.checkNotNull(strategy, "strategy");
99 if (maxEvents == 0) {
100 allowGrowing = true;
101 events = new EpollEventArray(4096);
102 } else {
103 allowGrowing = false;
104 events = new EpollEventArray(maxEvents);
105 }
106 openFileDescriptors();
107 }
108
109
110
111
112
113 @UnstableApi
114 public void openFileDescriptors() {
115 boolean success = false;
116 FileDescriptor epollFd = null;
117 FileDescriptor eventFd = null;
118 FileDescriptor timerFd = null;
119 try {
120 this.epollFd = epollFd = Native.newEpollCreate();
121 this.eventFd = eventFd = Native.newEventFd();
122 try {
123
124
125 Native.epollCtlAdd(epollFd.intValue(), eventFd.intValue(), Native.EPOLLIN | Native.EPOLLET);
126 } catch (IOException e) {
127 throw new IllegalStateException("Unable to add eventFd filedescriptor to epoll", e);
128 }
129 this.timerFd = timerFd = Native.newTimerFd();
130 try {
131
132
133 Native.epollCtlAdd(epollFd.intValue(), timerFd.intValue(), Native.EPOLLIN | Native.EPOLLET);
134 } catch (IOException e) {
135 throw new IllegalStateException("Unable to add timerFd filedescriptor to epoll", e);
136 }
137 success = true;
138 } finally {
139 if (!success) {
140 closeFileDescriptor(epollFd);
141 closeFileDescriptor(eventFd);
142 closeFileDescriptor(timerFd);
143 }
144 }
145 }
146
147 private static void closeFileDescriptor(FileDescriptor fd) {
148 if (fd != null) {
149 try {
150 fd.close();
151 } catch (Exception e) {
152
153 }
154 }
155 }
156
157 private static Queue<Runnable> newTaskQueue(
158 EventLoopTaskQueueFactory queueFactory) {
159 if (queueFactory == null) {
160 return newTaskQueue0(DEFAULT_MAX_PENDING_TASKS);
161 }
162 return queueFactory.newTaskQueue(DEFAULT_MAX_PENDING_TASKS);
163 }
164
165
166
167
168 IovArray cleanIovArray() {
169 if (iovArray == null) {
170 iovArray = new IovArray();
171 } else {
172 iovArray.clear();
173 }
174 return iovArray;
175 }
176
177
178
179
180 NativeDatagramPacketArray cleanDatagramPacketArray() {
181 if (datagramPacketArray == null) {
182 datagramPacketArray = new NativeDatagramPacketArray();
183 } else {
184 datagramPacketArray.clear();
185 }
186 return datagramPacketArray;
187 }
188
189 @Override
190 protected void wakeup(boolean inEventLoop) {
191 if (!inEventLoop && nextWakeupNanos.getAndSet(AWAKE) != AWAKE) {
192
193 Native.eventFdWrite(eventFd.intValue(), 1L);
194 }
195 }
196
197 @Override
198 protected boolean beforeScheduledTaskSubmitted(long deadlineNanos) {
199
200 return deadlineNanos < nextWakeupNanos.get();
201 }
202
203 @Override
204 protected boolean afterScheduledTaskSubmitted(long deadlineNanos) {
205
206 return deadlineNanos < nextWakeupNanos.get();
207 }
208
209
210
211
212 void add(AbstractEpollChannel ch) throws IOException {
213 assert inEventLoop();
214 int fd = ch.socket.intValue();
215 Native.epollCtlAdd(epollFd.intValue(), fd, ch.flags);
216 AbstractEpollChannel old = channels.put(fd, ch);
217
218
219
220 assert old == null || !old.isOpen();
221 }
222
223
224
225
226 void modify(AbstractEpollChannel ch) throws IOException {
227 assert inEventLoop();
228 Native.epollCtlMod(epollFd.intValue(), ch.socket.intValue(), ch.flags);
229 }
230
231
232
233
234 void remove(AbstractEpollChannel ch) throws IOException {
235 assert inEventLoop();
236 int fd = ch.socket.intValue();
237
238 AbstractEpollChannel old = channels.remove(fd);
239 if (old != null && old != ch) {
240
241 channels.put(fd, old);
242
243
244 assert !ch.isOpen();
245 } else if (ch.isOpen()) {
246
247
248 Native.epollCtlDel(epollFd.intValue(), fd);
249 }
250 }
251
252 @Override
253 protected Queue<Runnable> newTaskQueue(int maxPendingTasks) {
254 return newTaskQueue0(maxPendingTasks);
255 }
256
257 private static Queue<Runnable> newTaskQueue0(int maxPendingTasks) {
258
259 return maxPendingTasks == Integer.MAX_VALUE ? PlatformDependent.<Runnable>newMpscQueue()
260 : PlatformDependent.<Runnable>newMpscQueue(maxPendingTasks);
261 }
262
263
264
265
266 public int getIoRatio() {
267 return ioRatio;
268 }
269
270
271
272
273
274 public void setIoRatio(int ioRatio) {
275 if (ioRatio <= 0 || ioRatio > 100) {
276 throw new IllegalArgumentException("ioRatio: " + ioRatio + " (expected: 0 < ioRatio <= 100)");
277 }
278 this.ioRatio = ioRatio;
279 }
280
281 @Override
282 public int registeredChannels() {
283 return channels.size();
284 }
285
286 @Override
287 public Iterator<Channel> registeredChannelsIterator() {
288 assert inEventLoop();
289 IntObjectMap<AbstractEpollChannel> ch = channels;
290 if (ch.isEmpty()) {
291 return ChannelsReadOnlyIterator.empty();
292 }
293 return new ChannelsReadOnlyIterator<AbstractEpollChannel>(ch.values());
294 }
295
296 private long epollWait(long deadlineNanos) throws IOException {
297 if (deadlineNanos == NONE) {
298 return Native.epollWait(epollFd, events, timerFd,
299 Integer.MAX_VALUE, 0, EPOLL_WAIT_MILLIS_THRESHOLD);
300 }
301 long totalDelay = deadlineToDelayNanos(deadlineNanos);
302 int delaySeconds = (int) min(totalDelay / 1000000000L, Integer.MAX_VALUE);
303 int delayNanos = (int) min(totalDelay - delaySeconds * 1000000000L, MAX_SCHEDULED_TIMERFD_NS);
304 return Native.epollWait(epollFd, events, timerFd, delaySeconds, delayNanos, EPOLL_WAIT_MILLIS_THRESHOLD);
305 }
306
307 private int epollWaitNoTimerChange() throws IOException {
308 return Native.epollWait(epollFd, events, false);
309 }
310
311 private int epollWaitNow() throws IOException {
312 return Native.epollWait(epollFd, events, true);
313 }
314
315 private int epollBusyWait() throws IOException {
316 return Native.epollBusyWait(epollFd, events);
317 }
318
319 private int epollWaitTimeboxed() throws IOException {
320
321 return Native.epollWait(epollFd, events, 1000);
322 }
323
324 @Override
325 protected void run() {
326 long prevDeadlineNanos = NONE;
327 for (;;) {
328 try {
329 int strategy = selectStrategy.calculateStrategy(selectNowSupplier, hasTasks());
330 switch (strategy) {
331 case SelectStrategy.CONTINUE:
332 continue;
333
334 case SelectStrategy.BUSY_WAIT:
335 strategy = epollBusyWait();
336 break;
337
338 case SelectStrategy.SELECT:
339 if (pendingWakeup) {
340
341
342 strategy = epollWaitTimeboxed();
343 if (strategy != 0) {
344 break;
345 }
346
347
348 logger.warn("Missed eventfd write (not seen after > 1 second)");
349 pendingWakeup = false;
350 if (hasTasks()) {
351 break;
352 }
353
354 }
355
356 long curDeadlineNanos = nextScheduledTaskDeadlineNanos();
357 if (curDeadlineNanos == -1L) {
358 curDeadlineNanos = NONE;
359 }
360 nextWakeupNanos.set(curDeadlineNanos);
361 try {
362 if (!hasTasks()) {
363 if (curDeadlineNanos == prevDeadlineNanos) {
364
365 strategy = epollWaitNoTimerChange();
366 } else {
367
368 long result = epollWait(curDeadlineNanos);
369
370
371 strategy = Native.epollReady(result);
372 prevDeadlineNanos = Native.epollTimerWasUsed(result) ? curDeadlineNanos : NONE;
373 }
374 }
375 } finally {
376
377
378 if (nextWakeupNanos.get() == AWAKE || nextWakeupNanos.getAndSet(AWAKE) == AWAKE) {
379 pendingWakeup = true;
380 }
381 }
382
383 default:
384 }
385
386 final int ioRatio = this.ioRatio;
387 if (ioRatio == 100) {
388 try {
389 if (strategy > 0 && processReady(events, strategy)) {
390 prevDeadlineNanos = NONE;
391 }
392 } finally {
393
394 runAllTasks();
395 }
396 } else if (strategy > 0) {
397 final long ioStartTime = System.nanoTime();
398 try {
399 if (processReady(events, strategy)) {
400 prevDeadlineNanos = NONE;
401 }
402 } finally {
403
404 final long ioTime = System.nanoTime() - ioStartTime;
405 runAllTasks(ioTime * (100 - ioRatio) / ioRatio);
406 }
407 } else {
408 runAllTasks(0);
409 }
410 if (allowGrowing && strategy == events.length()) {
411
412 events.increase();
413 }
414 } catch (Error e) {
415 throw e;
416 } catch (Throwable t) {
417 handleLoopException(t);
418 } finally {
419
420 try {
421 if (isShuttingDown()) {
422 closeAll();
423 if (confirmShutdown()) {
424 break;
425 }
426 }
427 } catch (Error e) {
428 throw e;
429 } catch (Throwable t) {
430 handleLoopException(t);
431 }
432 }
433 }
434 }
435
436
437
438
439 void handleLoopException(Throwable t) {
440 logger.warn("Unexpected exception in the selector loop.", t);
441
442
443
444 try {
445 Thread.sleep(1000);
446 } catch (InterruptedException e) {
447
448 }
449 }
450
451 private void closeAll() {
452
453
454 AbstractEpollChannel[] localChannels = channels.values().toArray(new AbstractEpollChannel[0]);
455
456 for (AbstractEpollChannel ch: localChannels) {
457 ch.unsafe().close(ch.unsafe().voidPromise());
458 }
459 }
460
461
462 private boolean processReady(EpollEventArray events, int ready) {
463 boolean timerFired = false;
464 for (int i = 0; i < ready; i ++) {
465 final int fd = events.fd(i);
466 if (fd == eventFd.intValue()) {
467 pendingWakeup = false;
468 } else if (fd == timerFd.intValue()) {
469 timerFired = true;
470 } else {
471 final long ev = events.events(i);
472
473 AbstractEpollChannel ch = channels.get(fd);
474 if (ch != null) {
475
476
477
478
479 AbstractEpollUnsafe unsafe = (AbstractEpollUnsafe) ch.unsafe();
480
481
482
483
484
485
486
487
488
489 if ((ev & (Native.EPOLLERR | Native.EPOLLOUT)) != 0) {
490
491 unsafe.epollOutReady();
492 }
493
494
495
496
497
498
499 if ((ev & (Native.EPOLLERR | Native.EPOLLIN)) != 0) {
500
501 unsafe.epollInReady();
502 }
503
504
505
506
507 if ((ev & Native.EPOLLRDHUP) != 0) {
508 unsafe.epollRdHupReady();
509 }
510 } else {
511
512 try {
513 Native.epollCtlDel(epollFd.intValue(), fd);
514 } catch (IOException ignore) {
515
516
517
518
519 }
520 }
521 }
522 }
523 return timerFired;
524 }
525
526 @Override
527 protected void cleanup() {
528 try {
529 closeFileDescriptors();
530 } finally {
531
532 if (iovArray != null) {
533 iovArray.release();
534 iovArray = null;
535 }
536 if (datagramPacketArray != null) {
537 datagramPacketArray.release();
538 datagramPacketArray = null;
539 }
540 events.free();
541 }
542 }
543
544
545
546
547
548
549
550 @UnstableApi
551 public void closeFileDescriptors() {
552
553 while (pendingWakeup) {
554 try {
555 int count = epollWaitTimeboxed();
556 if (count == 0) {
557
558 break;
559 }
560 for (int i = 0; i < count; i++) {
561 if (events.fd(i) == eventFd.intValue()) {
562 pendingWakeup = false;
563 break;
564 }
565 }
566 } catch (IOException ignore) {
567
568 }
569 }
570 try {
571 eventFd.close();
572 } catch (IOException e) {
573 logger.warn("Failed to close the event fd.", e);
574 }
575 try {
576 timerFd.close();
577 } catch (IOException e) {
578 logger.warn("Failed to close the timer fd.", e);
579 }
580
581 try {
582 epollFd.close();
583 } catch (IOException e) {
584 logger.warn("Failed to close the epoll fd.", e);
585 }
586 }
587 }