1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16 package io.netty.channel.kqueue;
17
18 import io.netty.channel.Channel;
19 import io.netty.channel.EventLoop;
20 import io.netty.channel.EventLoopGroup;
21 import io.netty.channel.EventLoopTaskQueueFactory;
22 import io.netty.channel.SelectStrategy;
23 import io.netty.channel.SingleThreadEventLoop;
24 import io.netty.channel.kqueue.AbstractKQueueChannel.AbstractKQueueUnsafe;
25 import io.netty.channel.unix.FileDescriptor;
26 import io.netty.channel.unix.IovArray;
27 import io.netty.util.IntSupplier;
28 import io.netty.util.collection.IntObjectHashMap;
29 import io.netty.util.collection.IntObjectMap;
30 import io.netty.util.concurrent.RejectedExecutionHandler;
31 import io.netty.util.internal.ObjectUtil;
32 import io.netty.util.internal.PlatformDependent;
33 import io.netty.util.internal.logging.InternalLogger;
34 import io.netty.util.internal.logging.InternalLoggerFactory;
35
36 import java.io.IOException;
37 import java.util.Iterator;
38 import java.util.Queue;
39 import java.util.concurrent.Executor;
40 import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
41
42 import static java.lang.Math.min;
43
44
45
46
47 final class KQueueEventLoop extends SingleThreadEventLoop {
48 private static final InternalLogger logger = InternalLoggerFactory.getInstance(KQueueEventLoop.class);
49 private static final AtomicIntegerFieldUpdater<KQueueEventLoop> WAKEN_UP_UPDATER =
50 AtomicIntegerFieldUpdater.newUpdater(KQueueEventLoop.class, "wakenUp");
51 private static final int KQUEUE_WAKE_UP_IDENT = 0;
52
53
54
55 private static final int KQUEUE_MAX_TIMEOUT_SECONDS = 86399;
56
57 static {
58
59
60 KQueue.ensureAvailability();
61 }
62
63 private final boolean allowGrowing;
64 private final FileDescriptor kqueueFd;
65 private final KQueueEventArray changeList;
66 private final KQueueEventArray eventList;
67 private final SelectStrategy selectStrategy;
68 private final IovArray iovArray = new IovArray();
69 private final IntSupplier selectNowSupplier = new IntSupplier() {
70 @Override
71 public int get() throws Exception {
72 return kqueueWaitNow();
73 }
74 };
75 private final IntObjectMap<AbstractKQueueChannel> channels = new IntObjectHashMap<AbstractKQueueChannel>(4096);
76
77 private volatile int wakenUp;
78 private volatile int ioRatio = 50;
79
80 KQueueEventLoop(EventLoopGroup parent, Executor executor, int maxEvents,
81 SelectStrategy strategy, RejectedExecutionHandler rejectedExecutionHandler,
82 EventLoopTaskQueueFactory taskQueueFactory, EventLoopTaskQueueFactory tailTaskQueueFactory) {
83 super(parent, executor, false, newTaskQueue(taskQueueFactory), newTaskQueue(tailTaskQueueFactory),
84 rejectedExecutionHandler);
85 this.selectStrategy = ObjectUtil.checkNotNull(strategy, "strategy");
86 this.kqueueFd = Native.newKQueue();
87 if (maxEvents == 0) {
88 allowGrowing = true;
89 maxEvents = 4096;
90 } else {
91 allowGrowing = false;
92 }
93 this.changeList = new KQueueEventArray(maxEvents);
94 this.eventList = new KQueueEventArray(maxEvents);
95 int result = Native.keventAddUserEvent(kqueueFd.intValue(), KQUEUE_WAKE_UP_IDENT);
96 if (result < 0) {
97 cleanup();
98 throw new IllegalStateException("kevent failed to add user event with errno: " + (-result));
99 }
100 }
101
102 private static Queue<Runnable> newTaskQueue(
103 EventLoopTaskQueueFactory queueFactory) {
104 if (queueFactory == null) {
105 return newTaskQueue0(DEFAULT_MAX_PENDING_TASKS);
106 }
107 return queueFactory.newTaskQueue(DEFAULT_MAX_PENDING_TASKS);
108 }
109
110 void add(AbstractKQueueChannel ch) {
111 assert inEventLoop();
112 AbstractKQueueChannel old = channels.put(ch.fd().intValue(), ch);
113
114
115 assert old == null || !old.isOpen();
116 }
117
118 void evSet(AbstractKQueueChannel ch, short filter, short flags, int fflags) {
119 assert inEventLoop();
120 changeList.evSet(ch, filter, flags, fflags);
121 }
122
123 void remove(AbstractKQueueChannel ch) throws Exception {
124 assert inEventLoop();
125 int fd = ch.fd().intValue();
126
127 AbstractKQueueChannel old = channels.remove(fd);
128 if (old != null && old != ch) {
129
130 channels.put(fd, old);
131
132
133 assert !ch.isOpen();
134 } else if (ch.isOpen()) {
135
136
137
138
139 ch.unregisterFilters();
140 }
141 }
142
143
144
145
146 IovArray cleanArray() {
147 iovArray.clear();
148 return iovArray;
149 }
150
151 @Override
152 protected void wakeup(boolean inEventLoop) {
153 if (!inEventLoop && WAKEN_UP_UPDATER.compareAndSet(this, 0, 1)) {
154 wakeup();
155 }
156 }
157
158 private void wakeup() {
159 Native.keventTriggerUserEvent(kqueueFd.intValue(), KQUEUE_WAKE_UP_IDENT);
160
161
162 }
163
164 private int kqueueWait(boolean oldWakeup) throws IOException {
165
166
167
168
169 if (oldWakeup && hasTasks()) {
170 return kqueueWaitNow();
171 }
172
173 long totalDelay = delayNanos(System.nanoTime());
174 int delaySeconds = (int) min(totalDelay / 1000000000L, KQUEUE_MAX_TIMEOUT_SECONDS);
175 int delayNanos = (int) (totalDelay % 1000000000L);
176 return kqueueWait(delaySeconds, delayNanos);
177 }
178
179 private int kqueueWaitNow() throws IOException {
180 return kqueueWait(0, 0);
181 }
182
183 private int kqueueWait(int timeoutSec, int timeoutNs) throws IOException {
184 int numEvents = Native.keventWait(kqueueFd.intValue(), changeList, eventList, timeoutSec, timeoutNs);
185 changeList.clear();
186 return numEvents;
187 }
188
189 private void processReady(int ready) {
190 for (int i = 0; i < ready; ++i) {
191 final short filter = eventList.filter(i);
192 final short flags = eventList.flags(i);
193 final int fd = eventList.fd(i);
194 if (filter == Native.EVFILT_USER || (flags & Native.EV_ERROR) != 0) {
195
196
197 assert filter != Native.EVFILT_USER ||
198 (filter == Native.EVFILT_USER && fd == KQUEUE_WAKE_UP_IDENT);
199 continue;
200 }
201
202 AbstractKQueueChannel channel = channels.get(fd);
203 if (channel == null) {
204
205
206
207 logger.warn("events[{}]=[{}, {}] had no channel!", i, eventList.fd(i), filter);
208 continue;
209 }
210
211 AbstractKQueueUnsafe unsafe = (AbstractKQueueUnsafe) channel.unsafe();
212
213
214 if (filter == Native.EVFILT_WRITE) {
215 unsafe.writeReady();
216 } else if (filter == Native.EVFILT_READ) {
217
218 unsafe.readReady(eventList.data(i));
219 } else if (filter == Native.EVFILT_SOCK && (eventList.fflags(i) & Native.NOTE_RDHUP) != 0) {
220 unsafe.readEOF();
221 }
222
223
224
225
226 if ((flags & Native.EV_EOF) != 0) {
227 unsafe.readEOF();
228 }
229 }
230 }
231
232 @Override
233 protected void run() {
234 for (;;) {
235 try {
236 int strategy = selectStrategy.calculateStrategy(selectNowSupplier, hasTasks());
237 switch (strategy) {
238 case SelectStrategy.CONTINUE:
239 continue;
240
241 case SelectStrategy.BUSY_WAIT:
242
243
244 case SelectStrategy.SELECT:
245 strategy = kqueueWait(WAKEN_UP_UPDATER.getAndSet(this, 0) == 1);
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275 if (wakenUp == 1) {
276 wakeup();
277 }
278
279 default:
280 }
281
282 final int ioRatio = this.ioRatio;
283 if (ioRatio == 100) {
284 try {
285 if (strategy > 0) {
286 processReady(strategy);
287 }
288 } finally {
289 runAllTasks();
290 }
291 } else {
292 final long ioStartTime = System.nanoTime();
293
294 try {
295 if (strategy > 0) {
296 processReady(strategy);
297 }
298 } finally {
299 final long ioTime = System.nanoTime() - ioStartTime;
300 runAllTasks(ioTime * (100 - ioRatio) / ioRatio);
301 }
302 }
303 if (allowGrowing && strategy == eventList.capacity()) {
304
305 eventList.realloc(false);
306 }
307 } catch (Error e) {
308 throw e;
309 } catch (Throwable t) {
310 handleLoopException(t);
311 } finally {
312
313 try {
314 if (isShuttingDown()) {
315 closeAll();
316 if (confirmShutdown()) {
317 break;
318 }
319 }
320 } catch (Error e) {
321 throw e;
322 } catch (Throwable t) {
323 handleLoopException(t);
324 }
325 }
326 }
327 }
328
329 @Override
330 protected Queue<Runnable> newTaskQueue(int maxPendingTasks) {
331 return newTaskQueue0(maxPendingTasks);
332 }
333
334 private static Queue<Runnable> newTaskQueue0(int maxPendingTasks) {
335
336 return maxPendingTasks == Integer.MAX_VALUE ? PlatformDependent.<Runnable>newMpscQueue()
337 : PlatformDependent.<Runnable>newMpscQueue(maxPendingTasks);
338 }
339
340
341
342
343 public int getIoRatio() {
344 return ioRatio;
345 }
346
347
348
349
350
351 public void setIoRatio(int ioRatio) {
352 if (ioRatio <= 0 || ioRatio > 100) {
353 throw new IllegalArgumentException("ioRatio: " + ioRatio + " (expected: 0 < ioRatio <= 100)");
354 }
355 this.ioRatio = ioRatio;
356 }
357
358 @Override
359 public int registeredChannels() {
360 return channels.size();
361 }
362
363 @Override
364 public Iterator<Channel> registeredChannelsIterator() {
365 assert inEventLoop();
366 IntObjectMap<AbstractKQueueChannel> ch = channels;
367 if (ch.isEmpty()) {
368 return ChannelsReadOnlyIterator.empty();
369 }
370 return new ChannelsReadOnlyIterator<AbstractKQueueChannel>(ch.values());
371 }
372
373 @Override
374 protected void cleanup() {
375 try {
376 try {
377 kqueueFd.close();
378 } catch (IOException e) {
379 logger.warn("Failed to close the kqueue fd.", e);
380 }
381 } finally {
382
383 changeList.free();
384 eventList.free();
385 }
386 }
387
388 private void closeAll() {
389 try {
390 kqueueWaitNow();
391 } catch (IOException e) {
392
393 }
394
395
396
397 AbstractKQueueChannel[] localChannels = channels.values().toArray(new AbstractKQueueChannel[0]);
398
399 for (AbstractKQueueChannel ch: localChannels) {
400 ch.unsafe().close(ch.unsafe().voidPromise());
401 }
402 }
403
404 private static void handleLoopException(Throwable t) {
405 logger.warn("Unexpected exception in the selector loop.", t);
406
407
408
409 try {
410 Thread.sleep(1000);
411 } catch (InterruptedException e) {
412
413 }
414 }
415 }