1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16 package io.netty.channel.epoll;
17
18 import io.netty.buffer.ByteBuf;
19 import io.netty.buffer.ByteBufAllocator;
20 import io.netty.buffer.ByteBufUtil;
21 import io.netty.buffer.Unpooled;
22 import io.netty.channel.AbstractChannel;
23 import io.netty.channel.Channel;
24 import io.netty.channel.ChannelConfig;
25 import io.netty.channel.ChannelException;
26 import io.netty.channel.ChannelFuture;
27 import io.netty.channel.ChannelFutureListener;
28 import io.netty.channel.ChannelMetadata;
29 import io.netty.channel.ChannelOutboundBuffer;
30 import io.netty.channel.ChannelPromise;
31 import io.netty.channel.ConnectTimeoutException;
32 import io.netty.channel.EventLoop;
33 import io.netty.channel.RecvByteBufAllocator;
34 import io.netty.channel.socket.ChannelInputShutdownEvent;
35 import io.netty.channel.socket.ChannelInputShutdownReadComplete;
36 import io.netty.channel.socket.SocketChannelConfig;
37 import io.netty.channel.unix.FileDescriptor;
38 import io.netty.channel.unix.IovArray;
39 import io.netty.channel.unix.Socket;
40 import io.netty.channel.unix.UnixChannel;
41 import io.netty.util.ReferenceCountUtil;
42 import io.netty.util.concurrent.Future;
43
44 import java.io.IOException;
45 import java.net.InetSocketAddress;
46 import java.net.SocketAddress;
47 import java.nio.ByteBuffer;
48 import java.nio.channels.AlreadyConnectedException;
49 import java.nio.channels.ClosedChannelException;
50 import java.nio.channels.ConnectionPendingException;
51 import java.nio.channels.NotYetConnectedException;
52 import java.nio.channels.UnresolvedAddressException;
53 import java.util.concurrent.TimeUnit;
54
55 import static io.netty.channel.internal.ChannelUtils.WRITE_STATUS_SNDBUF_FULL;
56 import static io.netty.channel.unix.UnixChannelUtil.computeRemoteAddr;
57 import static io.netty.util.internal.ObjectUtil.checkNotNull;
58
59 abstract class AbstractEpollChannel extends AbstractChannel implements UnixChannel {
60 private static final ChannelMetadata METADATA = new ChannelMetadata(false);
61 protected final LinuxSocket socket;
62
63
64
65
66 private ChannelPromise connectPromise;
67 private Future<?> connectTimeoutFuture;
68 private SocketAddress requestedRemoteAddress;
69
70 private volatile SocketAddress local;
71 private volatile SocketAddress remote;
72
73 protected int flags = Native.EPOLLET;
74 boolean inputClosedSeenErrorOnRead;
75 boolean epollInReadyRunnablePending;
76
77 protected volatile boolean active;
78
79 AbstractEpollChannel(LinuxSocket fd) {
80 this(null, fd, false);
81 }
82
83 AbstractEpollChannel(Channel parent, LinuxSocket fd, boolean active) {
84 super(parent);
85 this.socket = checkNotNull(fd, "fd");
86 this.active = active;
87 if (active) {
88
89
90 this.local = fd.localAddress();
91 this.remote = fd.remoteAddress();
92 }
93 }
94
95 AbstractEpollChannel(Channel parent, LinuxSocket fd, SocketAddress remote) {
96 super(parent);
97 this.socket = checkNotNull(fd, "fd");
98 this.active = true;
99
100
101 this.remote = remote;
102 this.local = fd.localAddress();
103 }
104
105 static boolean isSoErrorZero(Socket fd) {
106 try {
107 return fd.getSoError() == 0;
108 } catch (IOException e) {
109 throw new ChannelException(e);
110 }
111 }
112
113 protected void setFlag(int flag) throws IOException {
114 if (!isFlagSet(flag)) {
115 flags |= flag;
116 modifyEvents();
117 }
118 }
119
120 void clearFlag(int flag) throws IOException {
121 if (isFlagSet(flag)) {
122 flags &= ~flag;
123 modifyEvents();
124 }
125 }
126
127 boolean isFlagSet(int flag) {
128 return (flags & flag) != 0;
129 }
130
131 @Override
132 public final FileDescriptor fd() {
133 return socket;
134 }
135
136 @Override
137 public abstract EpollChannelConfig config();
138
139 @Override
140 public boolean isActive() {
141 return active;
142 }
143
144 @Override
145 public ChannelMetadata metadata() {
146 return METADATA;
147 }
148
149 @Override
150 protected void doClose() throws Exception {
151 active = false;
152
153
154 inputClosedSeenErrorOnRead = true;
155 try {
156 ChannelPromise promise = connectPromise;
157 if (promise != null) {
158
159 promise.tryFailure(new ClosedChannelException());
160 connectPromise = null;
161 }
162
163 Future<?> future = connectTimeoutFuture;
164 if (future != null) {
165 future.cancel(false);
166 connectTimeoutFuture = null;
167 }
168
169 if (isRegistered()) {
170
171
172
173
174 EventLoop loop = eventLoop();
175 if (loop.inEventLoop()) {
176 doDeregister();
177 } else {
178 loop.execute(new Runnable() {
179 @Override
180 public void run() {
181 try {
182 doDeregister();
183 } catch (Throwable cause) {
184 pipeline().fireExceptionCaught(cause);
185 }
186 }
187 });
188 }
189 }
190 } finally {
191 socket.close();
192 }
193 }
194
195 void resetCachedAddresses() {
196 local = socket.localAddress();
197 remote = socket.remoteAddress();
198 }
199
200 @Override
201 protected void doDisconnect() throws Exception {
202 doClose();
203 }
204
205 @Override
206 protected boolean isCompatible(EventLoop loop) {
207 return loop instanceof EpollEventLoop;
208 }
209
210 @Override
211 public boolean isOpen() {
212 return socket.isOpen();
213 }
214
215 @Override
216 protected void doDeregister() throws Exception {
217 ((EpollEventLoop) eventLoop()).remove(this);
218 }
219
220 @Override
221 protected final void doBeginRead() throws Exception {
222
223 final AbstractEpollUnsafe unsafe = (AbstractEpollUnsafe) unsafe();
224 unsafe.readPending = true;
225
226
227
228
229 setFlag(Native.EPOLLIN);
230
231
232
233 if (unsafe.maybeMoreDataToRead) {
234 unsafe.executeEpollInReadyRunnable(config());
235 }
236 }
237
238 final boolean shouldBreakEpollInReady(ChannelConfig config) {
239 return socket.isInputShutdown() && (inputClosedSeenErrorOnRead || !isAllowHalfClosure(config));
240 }
241
242 private static boolean isAllowHalfClosure(ChannelConfig config) {
243 if (config instanceof EpollDomainSocketChannelConfig) {
244 return ((EpollDomainSocketChannelConfig) config).isAllowHalfClosure();
245 }
246 return config instanceof SocketChannelConfig &&
247 ((SocketChannelConfig) config).isAllowHalfClosure();
248 }
249
250 final void clearEpollIn() {
251
252 if (isRegistered()) {
253 final EventLoop loop = eventLoop();
254 final AbstractEpollUnsafe unsafe = (AbstractEpollUnsafe) unsafe();
255 if (loop.inEventLoop()) {
256 unsafe.clearEpollIn0();
257 } else {
258
259 loop.execute(new Runnable() {
260 @Override
261 public void run() {
262 if (!unsafe.readPending && !config().isAutoRead()) {
263
264 unsafe.clearEpollIn0();
265 }
266 }
267 });
268 }
269 } else {
270
271
272 flags &= ~Native.EPOLLIN;
273 }
274 }
275
276 private void modifyEvents() throws IOException {
277 if (isOpen() && isRegistered()) {
278 ((EpollEventLoop) eventLoop()).modify(this);
279 }
280 }
281
282 @Override
283 protected void doRegister() throws Exception {
284
285
286
287 epollInReadyRunnablePending = false;
288 ((EpollEventLoop) eventLoop()).add(this);
289 }
290
291 @Override
292 protected abstract AbstractEpollUnsafe newUnsafe();
293
294
295
296
297 protected final ByteBuf newDirectBuffer(ByteBuf buf) {
298 return newDirectBuffer(buf, buf);
299 }
300
301
302
303
304
305
306 protected final ByteBuf newDirectBuffer(Object holder, ByteBuf buf) {
307 final int readableBytes = buf.readableBytes();
308 if (readableBytes == 0) {
309 ReferenceCountUtil.release(holder);
310 return Unpooled.EMPTY_BUFFER;
311 }
312
313 final ByteBufAllocator alloc = alloc();
314 if (alloc.isDirectBufferPooled()) {
315 return newDirectBuffer0(holder, buf, alloc, readableBytes);
316 }
317
318 final ByteBuf directBuf = ByteBufUtil.threadLocalDirectBuffer();
319 if (directBuf == null) {
320 return newDirectBuffer0(holder, buf, alloc, readableBytes);
321 }
322
323 directBuf.writeBytes(buf, buf.readerIndex(), readableBytes);
324 ReferenceCountUtil.safeRelease(holder);
325 return directBuf;
326 }
327
328 private static ByteBuf newDirectBuffer0(Object holder, ByteBuf buf, ByteBufAllocator alloc, int capacity) {
329 final ByteBuf directBuf = alloc.directBuffer(capacity);
330 directBuf.writeBytes(buf, buf.readerIndex(), capacity);
331 ReferenceCountUtil.safeRelease(holder);
332 return directBuf;
333 }
334
335 protected static void checkResolvable(InetSocketAddress addr) {
336 if (addr.isUnresolved()) {
337 throw new UnresolvedAddressException();
338 }
339 }
340
341
342
343
344 protected final int doReadBytes(ByteBuf byteBuf) throws Exception {
345 int writerIndex = byteBuf.writerIndex();
346 int localReadAmount;
347 unsafe().recvBufAllocHandle().attemptedBytesRead(byteBuf.writableBytes());
348 if (byteBuf.hasMemoryAddress()) {
349 localReadAmount = socket.recvAddress(byteBuf.memoryAddress(), writerIndex, byteBuf.capacity());
350 } else {
351 ByteBuffer buf = byteBuf.internalNioBuffer(writerIndex, byteBuf.writableBytes());
352 localReadAmount = socket.recv(buf, buf.position(), buf.limit());
353 }
354 if (localReadAmount > 0) {
355 byteBuf.writerIndex(writerIndex + localReadAmount);
356 }
357 return localReadAmount;
358 }
359
360 protected final int doWriteBytes(ChannelOutboundBuffer in, ByteBuf buf) throws Exception {
361 if (buf.hasMemoryAddress()) {
362 int localFlushedAmount = socket.sendAddress(buf.memoryAddress(), buf.readerIndex(), buf.writerIndex());
363 if (localFlushedAmount > 0) {
364 in.removeBytes(localFlushedAmount);
365 return 1;
366 }
367 } else {
368 final ByteBuffer nioBuf = buf.nioBufferCount() == 1 ?
369 buf.internalNioBuffer(buf.readerIndex(), buf.readableBytes()) : buf.nioBuffer();
370 int localFlushedAmount = socket.send(nioBuf, nioBuf.position(), nioBuf.limit());
371 if (localFlushedAmount > 0) {
372 nioBuf.position(nioBuf.position() + localFlushedAmount);
373 in.removeBytes(localFlushedAmount);
374 return 1;
375 }
376 }
377 return WRITE_STATUS_SNDBUF_FULL;
378 }
379
380
381
382
383
384 final long doWriteOrSendBytes(ByteBuf data, InetSocketAddress remoteAddress, boolean fastOpen)
385 throws IOException {
386 assert !(fastOpen && remoteAddress == null) : "fastOpen requires a remote address";
387 if (data.hasMemoryAddress()) {
388 long memoryAddress = data.memoryAddress();
389 if (remoteAddress == null) {
390 return socket.sendAddress(memoryAddress, data.readerIndex(), data.writerIndex());
391 }
392 return socket.sendToAddress(memoryAddress, data.readerIndex(), data.writerIndex(),
393 remoteAddress.getAddress(), remoteAddress.getPort(), fastOpen);
394 }
395
396 if (data.nioBufferCount() > 1) {
397 IovArray array = ((EpollEventLoop) eventLoop()).cleanIovArray();
398 array.add(data, data.readerIndex(), data.readableBytes());
399 int cnt = array.count();
400 assert cnt != 0;
401
402 if (remoteAddress == null) {
403 return socket.writevAddresses(array.memoryAddress(0), cnt);
404 }
405 return socket.sendToAddresses(array.memoryAddress(0), cnt,
406 remoteAddress.getAddress(), remoteAddress.getPort(), fastOpen);
407 }
408
409 ByteBuffer nioData = data.internalNioBuffer(data.readerIndex(), data.readableBytes());
410 if (remoteAddress == null) {
411 return socket.send(nioData, nioData.position(), nioData.limit());
412 }
413 return socket.sendTo(nioData, nioData.position(), nioData.limit(),
414 remoteAddress.getAddress(), remoteAddress.getPort(), fastOpen);
415 }
416
417 protected abstract class AbstractEpollUnsafe extends AbstractUnsafe {
418 boolean readPending;
419 boolean maybeMoreDataToRead;
420 private EpollRecvByteAllocatorHandle allocHandle;
421 private final Runnable epollInReadyRunnable = new Runnable() {
422 @Override
423 public void run() {
424 epollInReadyRunnablePending = false;
425 epollInReady();
426 }
427 };
428
429
430
431
432 abstract void epollInReady();
433
434 final void epollInBefore() {
435 maybeMoreDataToRead = false;
436 }
437
438 final void epollInFinally(ChannelConfig config) {
439 maybeMoreDataToRead = allocHandle.maybeMoreDataToRead();
440
441 if (allocHandle.isReceivedRdHup() || (readPending && maybeMoreDataToRead)) {
442
443
444
445
446
447
448
449 executeEpollInReadyRunnable(config);
450 } else if (!readPending && !config.isAutoRead()) {
451
452
453
454
455
456
457 clearEpollIn();
458 }
459 }
460
461 final void executeEpollInReadyRunnable(ChannelConfig config) {
462 if (epollInReadyRunnablePending || !isActive() || shouldBreakEpollInReady(config)) {
463 return;
464 }
465 epollInReadyRunnablePending = true;
466 eventLoop().execute(epollInReadyRunnable);
467 }
468
469
470
471
472 final void epollRdHupReady() {
473
474 recvBufAllocHandle().receivedRdHup();
475
476 if (isActive()) {
477
478
479
480 epollInReady();
481 } else {
482
483 shutdownInput(true);
484 }
485
486
487 clearEpollRdHup();
488 }
489
490
491
492
493 private void clearEpollRdHup() {
494 try {
495 clearFlag(Native.EPOLLRDHUP);
496 } catch (IOException e) {
497 pipeline().fireExceptionCaught(e);
498 close(voidPromise());
499 }
500 }
501
502
503
504
505 void shutdownInput(boolean rdHup) {
506 if (!socket.isInputShutdown()) {
507 if (isAllowHalfClosure(config())) {
508 try {
509 socket.shutdown(true, false);
510 } catch (IOException ignored) {
511
512
513 fireEventAndClose(ChannelInputShutdownEvent.INSTANCE);
514 return;
515 } catch (NotYetConnectedException ignore) {
516
517
518 }
519 clearEpollIn0();
520 pipeline().fireUserEventTriggered(ChannelInputShutdownEvent.INSTANCE);
521 } else {
522 close(voidPromise());
523 }
524 } else if (!rdHup && !inputClosedSeenErrorOnRead) {
525 inputClosedSeenErrorOnRead = true;
526 pipeline().fireUserEventTriggered(ChannelInputShutdownReadComplete.INSTANCE);
527 }
528 }
529
530 private void fireEventAndClose(Object evt) {
531 pipeline().fireUserEventTriggered(evt);
532 close(voidPromise());
533 }
534
535 @Override
536 public EpollRecvByteAllocatorHandle recvBufAllocHandle() {
537 if (allocHandle == null) {
538 allocHandle = newEpollHandle((RecvByteBufAllocator.ExtendedHandle) super.recvBufAllocHandle());
539 }
540 return allocHandle;
541 }
542
543
544
545
546
547 EpollRecvByteAllocatorHandle newEpollHandle(RecvByteBufAllocator.ExtendedHandle handle) {
548 return new EpollRecvByteAllocatorHandle(handle);
549 }
550
551 @Override
552 protected final void flush0() {
553
554
555
556 if (!isFlagSet(Native.EPOLLOUT)) {
557 super.flush0();
558 }
559 }
560
561
562
563
564 final void epollOutReady() {
565 if (connectPromise != null) {
566
567 finishConnect();
568 } else if (!socket.isOutputShutdown()) {
569
570 super.flush0();
571 }
572 }
573
574 protected final void clearEpollIn0() {
575 assert eventLoop().inEventLoop();
576 try {
577 readPending = false;
578 clearFlag(Native.EPOLLIN);
579 } catch (IOException e) {
580
581
582 pipeline().fireExceptionCaught(e);
583 unsafe().close(unsafe().voidPromise());
584 }
585 }
586
587 @Override
588 public void connect(
589 final SocketAddress remoteAddress, final SocketAddress localAddress, final ChannelPromise promise) {
590
591
592 if (promise.isDone() || !ensureOpen(promise)) {
593 return;
594 }
595
596 try {
597 if (connectPromise != null) {
598 throw new ConnectionPendingException();
599 }
600
601 boolean wasActive = isActive();
602 if (doConnect(remoteAddress, localAddress)) {
603 fulfillConnectPromise(promise, wasActive);
604 } else {
605 connectPromise = promise;
606 requestedRemoteAddress = remoteAddress;
607
608
609 final int connectTimeoutMillis = config().getConnectTimeoutMillis();
610 if (connectTimeoutMillis > 0) {
611 connectTimeoutFuture = eventLoop().schedule(new Runnable() {
612 @Override
613 public void run() {
614 ChannelPromise connectPromise = AbstractEpollChannel.this.connectPromise;
615 if (connectPromise != null && !connectPromise.isDone()
616 && connectPromise.tryFailure(new ConnectTimeoutException(
617 "connection timed out after " + connectTimeoutMillis + " ms: " +
618 remoteAddress))) {
619 close(voidPromise());
620 }
621 }
622 }, connectTimeoutMillis, TimeUnit.MILLISECONDS);
623 }
624
625 promise.addListener(new ChannelFutureListener() {
626 @Override
627 public void operationComplete(ChannelFuture future) {
628
629
630 if (future.isCancelled()) {
631 if (connectTimeoutFuture != null) {
632 connectTimeoutFuture.cancel(false);
633 }
634 connectPromise = null;
635 close(voidPromise());
636 }
637 }
638 });
639 }
640 } catch (Throwable t) {
641 closeIfClosed();
642 promise.tryFailure(annotateConnectException(t, remoteAddress));
643 }
644 }
645
646 private void fulfillConnectPromise(ChannelPromise promise, boolean wasActive) {
647 if (promise == null) {
648
649 return;
650 }
651 active = true;
652
653
654
655 boolean active = isActive();
656
657
658 boolean promiseSet = promise.trySuccess();
659
660
661
662 if (!wasActive && active) {
663 pipeline().fireChannelActive();
664 }
665
666
667 if (!promiseSet) {
668 close(voidPromise());
669 }
670 }
671
672 private void fulfillConnectPromise(ChannelPromise promise, Throwable cause) {
673 if (promise == null) {
674
675 return;
676 }
677
678
679 promise.tryFailure(cause);
680 closeIfClosed();
681 }
682
683 private void finishConnect() {
684
685
686
687 assert eventLoop().inEventLoop();
688
689 boolean connectStillInProgress = false;
690 try {
691 boolean wasActive = isActive();
692 if (!doFinishConnect()) {
693 connectStillInProgress = true;
694 return;
695 }
696 fulfillConnectPromise(connectPromise, wasActive);
697 } catch (Throwable t) {
698 fulfillConnectPromise(connectPromise, annotateConnectException(t, requestedRemoteAddress));
699 } finally {
700 if (!connectStillInProgress) {
701
702
703 if (connectTimeoutFuture != null) {
704 connectTimeoutFuture.cancel(false);
705 }
706 connectPromise = null;
707 }
708 }
709 }
710
711
712
713
714 private boolean doFinishConnect() throws Exception {
715 if (socket.finishConnect()) {
716 clearFlag(Native.EPOLLOUT);
717 if (requestedRemoteAddress instanceof InetSocketAddress) {
718 remote = computeRemoteAddr((InetSocketAddress) requestedRemoteAddress, socket.remoteAddress());
719 }
720 requestedRemoteAddress = null;
721
722 return true;
723 }
724 setFlag(Native.EPOLLOUT);
725 return false;
726 }
727 }
728
729 @Override
730 protected void doBind(SocketAddress local) throws Exception {
731 if (local instanceof InetSocketAddress) {
732 checkResolvable((InetSocketAddress) local);
733 }
734 socket.bind(local);
735 this.local = socket.localAddress();
736 }
737
738
739
740
741 protected boolean doConnect(SocketAddress remoteAddress, SocketAddress localAddress) throws Exception {
742 if (localAddress instanceof InetSocketAddress) {
743 checkResolvable((InetSocketAddress) localAddress);
744 }
745
746 InetSocketAddress remoteSocketAddr = remoteAddress instanceof InetSocketAddress
747 ? (InetSocketAddress) remoteAddress : null;
748 if (remoteSocketAddr != null) {
749 checkResolvable(remoteSocketAddr);
750 }
751
752 if (remote != null) {
753
754
755
756 throw new AlreadyConnectedException();
757 }
758
759 if (localAddress != null) {
760 socket.bind(localAddress);
761 }
762
763 boolean connected = doConnect0(remoteAddress);
764 if (connected) {
765 remote = remoteSocketAddr == null ?
766 remoteAddress : computeRemoteAddr(remoteSocketAddr, socket.remoteAddress());
767 }
768
769
770
771 local = socket.localAddress();
772 return connected;
773 }
774
775 boolean doConnect0(SocketAddress remote) throws Exception {
776 boolean success = false;
777 try {
778 boolean connected = socket.connect(remote);
779 if (!connected) {
780 setFlag(Native.EPOLLOUT);
781 }
782 success = true;
783 return connected;
784 } finally {
785 if (!success) {
786 doClose();
787 }
788 }
789 }
790
791 @Override
792 protected SocketAddress localAddress0() {
793 return local;
794 }
795
796 @Override
797 protected SocketAddress remoteAddress0() {
798 return remote;
799 }
800 }