1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16 package io.netty.channel.kqueue;
17
18 import io.netty.buffer.ByteBuf;
19 import io.netty.buffer.ByteBufAllocator;
20 import io.netty.buffer.ByteBufUtil;
21 import io.netty.buffer.Unpooled;
22 import io.netty.channel.AbstractChannel;
23 import io.netty.channel.Channel;
24 import io.netty.channel.ChannelConfig;
25 import io.netty.channel.ChannelException;
26 import io.netty.channel.ChannelFuture;
27 import io.netty.channel.ChannelFutureListener;
28 import io.netty.channel.ChannelMetadata;
29 import io.netty.channel.ChannelOutboundBuffer;
30 import io.netty.channel.ChannelPromise;
31 import io.netty.channel.ConnectTimeoutException;
32 import io.netty.channel.EventLoop;
33 import io.netty.channel.RecvByteBufAllocator;
34 import io.netty.channel.socket.ChannelInputShutdownEvent;
35 import io.netty.channel.socket.ChannelInputShutdownReadComplete;
36 import io.netty.channel.socket.SocketChannelConfig;
37 import io.netty.channel.unix.FileDescriptor;
38 import io.netty.channel.unix.UnixChannel;
39 import io.netty.util.ReferenceCountUtil;
40 import io.netty.util.concurrent.Future;
41 import io.netty.util.internal.UnstableApi;
42
43 import java.io.IOException;
44 import java.net.ConnectException;
45 import java.net.InetSocketAddress;
46 import java.net.SocketAddress;
47 import java.nio.ByteBuffer;
48 import java.nio.channels.AlreadyConnectedException;
49 import java.nio.channels.ConnectionPendingException;
50 import java.nio.channels.NotYetConnectedException;
51 import java.nio.channels.UnresolvedAddressException;
52 import java.util.concurrent.TimeUnit;
53
54 import static io.netty.channel.internal.ChannelUtils.WRITE_STATUS_SNDBUF_FULL;
55 import static io.netty.channel.unix.UnixChannelUtil.computeRemoteAddr;
56 import static io.netty.util.internal.ObjectUtil.checkNotNull;
57
58 abstract class AbstractKQueueChannel extends AbstractChannel implements UnixChannel {
    /** Metadata instance returned by {@link #metadata()}; shared by all channels of this type. */
    private static final ChannelMetadata METADATA = new ChannelMetadata(false);

    /**
     * Promise of the connection attempt currently in flight. While it is non-null a further
     * connect call fails with a {@link ConnectionPendingException}.
     */
    private ChannelPromise connectPromise;
    /** Scheduled connect-timeout task; only created when the configured connect timeout is positive. */
    private Future<?> connectTimeoutFuture;
    /** Remote address handed to the pending connect; reset to {@code null} once the connect finishes. */
    private SocketAddress requestedRemoteAddress;

    /** The socket this channel operates on; also exposed through {@link #fd()}. */
    final BsdSocket socket;
    /** Requested state of the EVFILT_READ filter; re-applied by {@link #doRegister()}. */
    private boolean readFilterEnabled;
    /** Requested state of the EVFILT_WRITE filter; re-applied by {@link #doRegister()}. */
    private boolean writeFilterEnabled;
    /** {@code true} while the unsafe's read-ready task is queued on the event loop and has not run yet. */
    boolean readReadyRunnablePending;
    /** Set on close and when a read completes on an already shut down input; stops further read passes. */
    boolean inputClosedSeenErrorOnRead;
    /** Backing flag for {@link #isActive()}. */
    protected volatile boolean active;
    /** Cached local address returned by {@link #localAddress0()}. */
    private volatile SocketAddress local;
    /** Cached remote address returned by {@link #remoteAddress0()}; non-null means "already connected". */
    private volatile SocketAddress remote;
76
77 AbstractKQueueChannel(Channel parent, BsdSocket fd, boolean active) {
78 super(parent);
79 socket = checkNotNull(fd, "fd");
80 this.active = active;
81 if (active) {
82
83
84 local = fd.localAddress();
85 remote = fd.remoteAddress();
86 }
87 }
88
89 AbstractKQueueChannel(Channel parent, BsdSocket fd, SocketAddress remote) {
90 super(parent);
91 socket = checkNotNull(fd, "fd");
92 active = true;
93
94
95 this.remote = remote;
96 local = fd.localAddress();
97 }
98
99 static boolean isSoErrorZero(BsdSocket fd) {
100 try {
101 return fd.getSoError() == 0;
102 } catch (IOException e) {
103 throw new ChannelException(e);
104 }
105 }
106
107 @Override
108 public final FileDescriptor fd() {
109 return socket;
110 }
111
112 @Override
113 public boolean isActive() {
114 return active;
115 }
116
117 @Override
118 public ChannelMetadata metadata() {
119 return METADATA;
120 }
121
122 @Override
123 protected void doClose() throws Exception {
124 active = false;
125
126
127 inputClosedSeenErrorOnRead = true;
128 socket.close();
129 }
130
131 @Override
132 protected void doDisconnect() throws Exception {
133 doClose();
134 }
135
136 void resetCachedAddresses() {
137 local = socket.localAddress();
138 remote = socket.remoteAddress();
139 }
140
141 @Override
142 protected boolean isCompatible(EventLoop loop) {
143 return loop instanceof KQueueEventLoop;
144 }
145
146 @Override
147 public boolean isOpen() {
148 return socket.isOpen();
149 }
150
151 @Override
152 protected void doDeregister() throws Exception {
153 ((KQueueEventLoop) eventLoop()).remove(this);
154
155
156
157 readFilterEnabled = false;
158 writeFilterEnabled = false;
159 }
160
161 void unregisterFilters() throws Exception {
162
163 readFilter(false);
164 writeFilter(false);
165 clearRdHup0();
166 }
167
168 private void clearRdHup0() {
169 evSet0(Native.EVFILT_SOCK, Native.EV_DELETE_DISABLE, Native.NOTE_RDHUP);
170 }
171
172 @Override
173 protected final void doBeginRead() throws Exception {
174
175 final AbstractKQueueUnsafe unsafe = (AbstractKQueueUnsafe) unsafe();
176 unsafe.readPending = true;
177
178
179
180
181 readFilter(true);
182
183
184
185 if (unsafe.maybeMoreDataToRead) {
186 unsafe.executeReadReadyRunnable(config());
187 }
188 }
189
190 @Override
191 protected void doRegister() throws Exception {
192
193
194
195 readReadyRunnablePending = false;
196
197 ((KQueueEventLoop) eventLoop()).add(this);
198
199
200 if (writeFilterEnabled) {
201 evSet0(Native.EVFILT_WRITE, Native.EV_ADD_CLEAR_ENABLE);
202 }
203 if (readFilterEnabled) {
204 evSet0(Native.EVFILT_READ, Native.EV_ADD_CLEAR_ENABLE);
205 }
206 evSet0(Native.EVFILT_SOCK, Native.EV_ADD, Native.NOTE_RDHUP);
207 }
208
    /**
     * Creates the unsafe for this channel. The return type is narrowed to
     * {@link AbstractKQueueUnsafe} because this class casts {@code unsafe()} to that type.
     */
    @Override
    protected abstract AbstractKQueueUnsafe newUnsafe();
211
    /**
     * Returns the channel configuration, narrowed to {@link KQueueChannelConfig}.
     */
    @Override
    public abstract KQueueChannelConfig config();
214
215
216
217
218 protected final ByteBuf newDirectBuffer(ByteBuf buf) {
219 return newDirectBuffer(buf, buf);
220 }
221
222
223
224
225
226
227 protected final ByteBuf newDirectBuffer(Object holder, ByteBuf buf) {
228 final int readableBytes = buf.readableBytes();
229 if (readableBytes == 0) {
230 ReferenceCountUtil.release(holder);
231 return Unpooled.EMPTY_BUFFER;
232 }
233
234 final ByteBufAllocator alloc = alloc();
235 if (alloc.isDirectBufferPooled()) {
236 return newDirectBuffer0(holder, buf, alloc, readableBytes);
237 }
238
239 final ByteBuf directBuf = ByteBufUtil.threadLocalDirectBuffer();
240 if (directBuf == null) {
241 return newDirectBuffer0(holder, buf, alloc, readableBytes);
242 }
243
244 directBuf.writeBytes(buf, buf.readerIndex(), readableBytes);
245 ReferenceCountUtil.safeRelease(holder);
246 return directBuf;
247 }
248
249 private static ByteBuf newDirectBuffer0(Object holder, ByteBuf buf, ByteBufAllocator alloc, int capacity) {
250 final ByteBuf directBuf = alloc.directBuffer(capacity);
251 directBuf.writeBytes(buf, buf.readerIndex(), capacity);
252 ReferenceCountUtil.safeRelease(holder);
253 return directBuf;
254 }
255
256 protected static void checkResolvable(InetSocketAddress addr) {
257 if (addr.isUnresolved()) {
258 throw new UnresolvedAddressException();
259 }
260 }
261
262
263
264
265 protected final int doReadBytes(ByteBuf byteBuf) throws Exception {
266 int writerIndex = byteBuf.writerIndex();
267 int localReadAmount;
268 unsafe().recvBufAllocHandle().attemptedBytesRead(byteBuf.writableBytes());
269 if (byteBuf.hasMemoryAddress()) {
270 localReadAmount = socket.readAddress(byteBuf.memoryAddress(), writerIndex, byteBuf.capacity());
271 } else {
272 ByteBuffer buf = byteBuf.internalNioBuffer(writerIndex, byteBuf.writableBytes());
273 localReadAmount = socket.read(buf, buf.position(), buf.limit());
274 }
275 if (localReadAmount > 0) {
276 byteBuf.writerIndex(writerIndex + localReadAmount);
277 }
278 return localReadAmount;
279 }
280
281 protected final int doWriteBytes(ChannelOutboundBuffer in, ByteBuf buf) throws Exception {
282 if (buf.hasMemoryAddress()) {
283 int localFlushedAmount = socket.writeAddress(buf.memoryAddress(), buf.readerIndex(), buf.writerIndex());
284 if (localFlushedAmount > 0) {
285 in.removeBytes(localFlushedAmount);
286 return 1;
287 }
288 } else {
289 final ByteBuffer nioBuf = buf.nioBufferCount() == 1?
290 buf.internalNioBuffer(buf.readerIndex(), buf.readableBytes()) : buf.nioBuffer();
291 int localFlushedAmount = socket.write(nioBuf, nioBuf.position(), nioBuf.limit());
292 if (localFlushedAmount > 0) {
293 nioBuf.position(nioBuf.position() + localFlushedAmount);
294 in.removeBytes(localFlushedAmount);
295 return 1;
296 }
297 }
298 return WRITE_STATUS_SNDBUF_FULL;
299 }
300
301 final boolean shouldBreakReadReady(ChannelConfig config) {
302 return socket.isInputShutdown() && (inputClosedSeenErrorOnRead || !isAllowHalfClosure(config));
303 }
304
305 private static boolean isAllowHalfClosure(ChannelConfig config) {
306 if (config instanceof KQueueDomainSocketChannelConfig) {
307 return ((KQueueDomainSocketChannelConfig) config).isAllowHalfClosure();
308 }
309
310 return config instanceof SocketChannelConfig &&
311 ((SocketChannelConfig) config).isAllowHalfClosure();
312 }
313
314 final void clearReadFilter() {
315
316 if (isRegistered()) {
317 final EventLoop loop = eventLoop();
318 final AbstractKQueueUnsafe unsafe = (AbstractKQueueUnsafe) unsafe();
319 if (loop.inEventLoop()) {
320 unsafe.clearReadFilter0();
321 } else {
322
323 loop.execute(new Runnable() {
324 @Override
325 public void run() {
326 if (!unsafe.readPending && !config().isAutoRead()) {
327
328 unsafe.clearReadFilter0();
329 }
330 }
331 });
332 }
333 } else {
334
335
336 readFilterEnabled = false;
337 }
338 }
339
340 void readFilter(boolean readFilterEnabled) throws IOException {
341 if (this.readFilterEnabled != readFilterEnabled) {
342 this.readFilterEnabled = readFilterEnabled;
343 evSet(Native.EVFILT_READ, readFilterEnabled ? Native.EV_ADD_CLEAR_ENABLE : Native.EV_DELETE_DISABLE);
344 }
345 }
346
347 void writeFilter(boolean writeFilterEnabled) throws IOException {
348 if (this.writeFilterEnabled != writeFilterEnabled) {
349 this.writeFilterEnabled = writeFilterEnabled;
350 evSet(Native.EVFILT_WRITE, writeFilterEnabled ? Native.EV_ADD_CLEAR_ENABLE : Native.EV_DELETE_DISABLE);
351 }
352 }
353
354 private void evSet(short filter, short flags) {
355 if (isRegistered()) {
356 evSet0(filter, flags);
357 }
358 }
359
360 private void evSet0(short filter, short flags) {
361 evSet0(filter, flags, 0);
362 }
363
364 private void evSet0(short filter, short flags, int fflags) {
365
366 if (isOpen()) {
367 ((KQueueEventLoop) eventLoop()).evSet(this, filter, flags, fflags);
368 }
369 }
370
371 @UnstableApi
372 public abstract class AbstractKQueueUnsafe extends AbstractUnsafe {
        /** Set by {@code doBeginRead()} when a read is requested; cleared by {@code clearReadFilter0()}. */
        boolean readPending;
        /**
         * Whether the allocation handle reported at the end of the last read pass that more data
         * may be available; reset by {@code readReadyBefore()}.
         */
        boolean maybeMoreDataToRead;
        /** Created lazily by {@link #recvBufAllocHandle()}. */
        private KQueueRecvByteAllocatorHandle allocHandle;
        /**
         * Task queued by {@code executeReadReadyRunnable}: clears the channel's pending flag and
         * then runs a read pass with the current allocation handle.
         */
        private final Runnable readReadyRunnable = new Runnable() {
            @Override
            public void run() {
                readReadyRunnablePending = false;
                readReady(recvBufAllocHandle());
            }
        };
383
384 final void readReady(long numberBytesPending) {
385 KQueueRecvByteAllocatorHandle allocHandle = recvBufAllocHandle();
386 allocHandle.numberBytesPending(numberBytesPending);
387 readReady(allocHandle);
388 }
389
        /**
         * Performs a read pass using the given allocation handle. Invoked from
         * {@code readReady(long)}, {@code readEOF()} and the queued read-ready task.
         *
         * @param allocHandle the handle returned by {@link #recvBufAllocHandle()}
         */
        abstract void readReady(KQueueRecvByteAllocatorHandle allocHandle);
391
392 final void readReadyBefore() {
393 maybeMoreDataToRead = false;
394 }
395
        /**
         * Bookkeeping for the end of a read pass: stores whether the allocation handle thinks more
         * data may be available, then either queues another read pass or disables the read filter.
         *
         * <p>NOTE(review): this reads the {@code allocHandle} field directly rather than going through
         * {@code recvBufAllocHandle()}, so it relies on the handle having been created earlier —
         * confirm against callers.
         *
         * @param config the channel configuration consulted for auto-read
         */
        final void readReadyFinally(ChannelConfig config) {
            maybeMoreDataToRead = allocHandle.maybeMoreDataToRead();

            if (allocHandle.isReadEOF() || readPending && maybeMoreDataToRead) {
                // Either EOF was flagged on the handle, or a read is still requested and the handle
                // reports there may be more data: queue another read pass on the event loop instead
                // of waiting for a new event. executeReadReadyRunnable() itself refuses to queue when
                // the channel is inactive or reading should stop.
                executeReadReadyRunnable(config);
            } else if (!readPending && !config.isAutoRead()) {
                // No read is outstanding and auto-read is off, so stop listening for read events
                // until the next explicit read request.
                clearReadFilter0();
            }
        }
418
419 final boolean failConnectPromise(Throwable cause) {
420 if (connectPromise != null) {
421
422
423
424 ChannelPromise connectPromise = AbstractKQueueChannel.this.connectPromise;
425 AbstractKQueueChannel.this.connectPromise = null;
426 if (connectPromise.tryFailure((cause instanceof ConnectException) ? cause
427 : new ConnectException("failed to connect").initCause(cause))) {
428 closeIfClosed();
429 return true;
430 }
431 }
432 return false;
433 }
434
435 final void writeReady() {
436 if (connectPromise != null) {
437
438 finishConnect();
439 } else if (!socket.isOutputShutdown()) {
440
441 super.flush0();
442 }
443 }
444
445
446
447
        /**
         * Shuts down the input side of the channel, or closes the channel when half-closure is not
         * allowed.
         *
         * @param readEOF {@code true} when called because EOF was seen; in that case a still-pending
         *                connect is finished first
         */
        void shutdownInput(boolean readEOF) {
            // A connect may still be pending when EOF arrives; resolve the connect promise before
            // dealing with the input side.
            if (readEOF && connectPromise != null) {
                finishConnect();
            }
            if (!socket.isInputShutdown()) {
                if (isAllowHalfClosure(config())) {
                    try {
                        socket.shutdown(true, false);
                    } catch (IOException ignored) {
                        // The shutdown attempt failed: still announce the input shutdown, then close
                        // the whole channel.
                        fireEventAndClose(ChannelInputShutdownEvent.INSTANCE);
                        return;
                    } catch (NotYetConnectedException ignore) {
                        // Deliberately ignored: fall through and treat the input as shut down.
                    }
                    clearReadFilter0();
                    pipeline().fireUserEventTriggered(ChannelInputShutdownEvent.INSTANCE);
                } else {
                    close(voidPromise());
                }
            } else if (!readEOF && !inputClosedSeenErrorOnRead) {
                // Input was already shut down and this is the first non-EOF call since then: record it
                // and signal that reading is complete. The flag ensures the event fires only once.
                inputClosedSeenErrorOnRead = true;
                pipeline().fireUserEventTriggered(ChannelInputShutdownReadComplete.INSTANCE);
            }
        }
480
481 final void readEOF() {
482
483 final KQueueRecvByteAllocatorHandle allocHandle = recvBufAllocHandle();
484 allocHandle.readEOF();
485
486 if (isActive()) {
487
488
489
490 readReady(allocHandle);
491 } else {
492
493 shutdownInput(true);
494 }
495
496
497 clearRdHup0();
498 }
499
500 @Override
501 public KQueueRecvByteAllocatorHandle recvBufAllocHandle() {
502 if (allocHandle == null) {
503 allocHandle = new KQueueRecvByteAllocatorHandle(
504 (RecvByteBufAllocator.ExtendedHandle) super.recvBufAllocHandle());
505 }
506 return allocHandle;
507 }
508
509 @Override
510 protected final void flush0() {
511
512
513
514 if (!writeFilterEnabled) {
515 super.flush0();
516 }
517 }
518
519 final void executeReadReadyRunnable(ChannelConfig config) {
520 if (readReadyRunnablePending || !isActive() || shouldBreakReadReady(config)) {
521 return;
522 }
523 readReadyRunnablePending = true;
524 eventLoop().execute(readReadyRunnable);
525 }
526
527 protected final void clearReadFilter0() {
528 assert eventLoop().inEventLoop();
529 try {
530 readPending = false;
531 readFilter(false);
532 } catch (IOException e) {
533
534
535 pipeline().fireExceptionCaught(e);
536 unsafe().close(unsafe().voidPromise());
537 }
538 }
539
540 private void fireEventAndClose(Object evt) {
541 pipeline().fireUserEventTriggered(evt);
542 close(voidPromise());
543 }
544
        /**
         * Starts a connection attempt.
         *
         * <p>If {@code doConnect} reports an immediate connect, the promise is completed right away.
         * Otherwise the promise is stored as the pending attempt, a timeout task is scheduled when
         * the configured connect timeout is positive, and a listener is installed that closes the
         * channel if the promise gets cancelled.
         *
         * @param remoteAddress the address to connect to
         * @param localAddress  the local address handed to {@code doConnect}
         * @param promise       completed with the outcome of the attempt
         */
        @Override
        public void connect(
                final SocketAddress remoteAddress, final SocketAddress localAddress, final ChannelPromise promise) {
            // The promise is left cancellable on purpose: the listener installed below reacts to
            // cancellation.
            if (promise.isDone() || !ensureOpen(promise)) {
                return;
            }

            try {
                if (connectPromise != null) {
                    // Only one connection attempt may be in flight at a time.
                    throw new ConnectionPendingException();
                }

                boolean wasActive = isActive();
                if (doConnect(remoteAddress, localAddress)) {
                    fulfillConnectPromise(promise, wasActive);
                } else {
                    // Connect is in progress; remember what is needed to finish it later.
                    connectPromise = promise;
                    requestedRemoteAddress = remoteAddress;

                    // Schedule the connect timeout, if one is configured.
                    final int connectTimeoutMillis = config().getConnectTimeoutMillis();
                    if (connectTimeoutMillis > 0) {
                        connectTimeoutFuture = eventLoop().schedule(new Runnable() {
                            @Override
                            public void run() {
                                // Fail and close only if the same attempt is still pending and undecided.
                                ChannelPromise connectPromise = AbstractKQueueChannel.this.connectPromise;
                                if (connectPromise != null && !connectPromise.isDone()
                                        && connectPromise.tryFailure(new ConnectTimeoutException(
                                                "connection timed out after " + connectTimeoutMillis + " ms: " +
                                                        remoteAddress))) {
                                    close(voidPromise());
                                }
                            }
                        }, connectTimeoutMillis, TimeUnit.MILLISECONDS);
                    }

                    promise.addListener(new ChannelFutureListener() {
                        @Override
                        public void operationComplete(ChannelFuture future) {
                            // On cancellation: cancel the timeout task, forget the pending attempt and
                            // close the channel.
                            if (future.isCancelled()) {
                                if (connectTimeoutFuture != null) {
                                    connectTimeoutFuture.cancel(false);
                                }
                                connectPromise = null;
                                close(voidPromise());
                            }
                        }
                    });
                }
            } catch (Throwable t) {
                closeIfClosed();
                promise.tryFailure(annotateConnectException(t, remoteAddress));
            }
        }
603
        /**
         * Completes a connect promise successfully, marks the channel active and fires
         * {@code channelActive} when the channel was not active before.
         *
         * @param promise   the promise to complete; {@code null} is ignored
         * @param wasActive whether the channel was active before the connect attempt
         */
        private void fulfillConnectPromise(ChannelPromise promise, boolean wasActive) {
            if (promise == null) {
                // No promise to complete (the cancellation listener in connect() clears the field).
                return;
            }
            AbstractKQueueChannel.this.active = true;

            // Sample the state BEFORE completing the promise: listeners notified by trySuccess() run
            // arbitrary code (presumably they could even close the channel), and channelActive must
            // still be fired in that case. Note that this local shadows the field of the same name.
            boolean active = isActive();

            // false means the promise was already completed (e.g. cancelled by the user).
            boolean promiseSet = promise.trySuccess();

            // Fire channelActive regardless of whether the promise could be completed.
            if (!wasActive && active) {
                pipeline().fireChannelActive();
            }

            // The promise was already decided elsewhere, so the connection is not wanted: close it.
            if (!promiseSet) {
                close(voidPromise());
            }
        }
629
630 private void fulfillConnectPromise(ChannelPromise promise, Throwable cause) {
631 if (promise == null) {
632
633 return;
634 }
635
636
637 promise.tryFailure(cause);
638 closeIfClosed();
639 }
640
        /**
         * Tries to complete the pending connection attempt. Must run on the event loop.
         *
         * <p>When the connect is still in progress nothing is cleaned up; otherwise (success or
         * failure) the connect promise is completed, the timeout task is cancelled and the pending
         * promise reference is cleared.
         */
        private void finishConnect() {
            assert eventLoop().inEventLoop();

            boolean connectStillInProgress = false;
            try {
                boolean wasActive = isActive();
                if (!doFinishConnect()) {
                    // Not connected yet; keep the promise and the timeout in place.
                    connectStillInProgress = true;
                    return;
                }
                fulfillConnectPromise(connectPromise, wasActive);
            } catch (Throwable t) {
                fulfillConnectPromise(connectPromise, annotateConnectException(t, requestedRemoteAddress));
            } finally {
                if (!connectStillInProgress) {
                    // The timeout future only exists when a positive connect timeout was configured,
                    // hence the null check.
                    if (connectTimeoutFuture != null) {
                        connectTimeoutFuture.cancel(false);
                    }
                    connectPromise = null;
                }
            }
        }
668
669 private boolean doFinishConnect() throws Exception {
670 if (socket.finishConnect()) {
671 writeFilter(false);
672 if (requestedRemoteAddress instanceof InetSocketAddress) {
673 remote = computeRemoteAddr((InetSocketAddress) requestedRemoteAddress, socket.remoteAddress());
674 }
675 requestedRemoteAddress = null;
676 return true;
677 }
678 writeFilter(true);
679 return false;
680 }
681 }
682
683 @Override
684 protected void doBind(SocketAddress local) throws Exception {
685 if (local instanceof InetSocketAddress) {
686 checkResolvable((InetSocketAddress) local);
687 }
688 socket.bind(local);
689 this.local = socket.localAddress();
690 }
691
692
693
694
695 protected boolean doConnect(SocketAddress remoteAddress, SocketAddress localAddress) throws Exception {
696 if (localAddress instanceof InetSocketAddress) {
697 checkResolvable((InetSocketAddress) localAddress);
698 }
699
700 InetSocketAddress remoteSocketAddr = remoteAddress instanceof InetSocketAddress
701 ? (InetSocketAddress) remoteAddress : null;
702 if (remoteSocketAddr != null) {
703 checkResolvable(remoteSocketAddr);
704 }
705
706 if (remote != null) {
707
708
709
710 throw new AlreadyConnectedException();
711 }
712
713 if (localAddress != null) {
714 socket.bind(localAddress);
715 }
716
717 boolean connected = doConnect0(remoteAddress, localAddress);
718 if (connected) {
719 remote = remoteSocketAddr == null?
720 remoteAddress : computeRemoteAddr(remoteSocketAddr, socket.remoteAddress());
721 }
722
723
724
725 local = socket.localAddress();
726 return connected;
727 }
728
729 protected boolean doConnect0(SocketAddress remoteAddress, SocketAddress localAddress) throws Exception {
730 boolean success = false;
731 try {
732 boolean connected = socket.connect(remoteAddress);
733 if (!connected) {
734 writeFilter(true);
735 }
736 success = true;
737 return connected;
738 } finally {
739 if (!success) {
740 doClose();
741 }
742 }
743 }
744
745 @Override
746 protected SocketAddress localAddress0() {
747 return local;
748 }
749
750 @Override
751 protected SocketAddress remoteAddress0() {
752 return remote;
753 }
754 }