1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16 package io.netty.channel.epoll;
17
18 import io.netty.buffer.ByteBuf;
19 import io.netty.buffer.ByteBufAllocator;
20 import io.netty.channel.Channel;
21 import io.netty.channel.ChannelConfig;
22 import io.netty.channel.ChannelFuture;
23 import io.netty.channel.ChannelFutureListener;
24 import io.netty.channel.ChannelMetadata;
25 import io.netty.channel.ChannelOutboundBuffer;
26 import io.netty.channel.ChannelPipeline;
27 import io.netty.channel.ChannelPromise;
28 import io.netty.channel.DefaultFileRegion;
29 import io.netty.channel.EventLoop;
30 import io.netty.channel.FileRegion;
31 import io.netty.channel.RecvByteBufAllocator;
32 import io.netty.channel.internal.ChannelUtils;
33 import io.netty.channel.socket.DuplexChannel;
34 import io.netty.channel.unix.FileDescriptor;
35 import io.netty.channel.unix.IovArray;
36 import io.netty.channel.unix.SocketWritableByteChannel;
37 import io.netty.channel.unix.UnixChannelUtil;
38 import io.netty.util.internal.PlatformDependent;
39 import io.netty.util.internal.StringUtil;
40 import io.netty.util.internal.UnstableApi;
41 import io.netty.util.internal.logging.InternalLogger;
42 import io.netty.util.internal.logging.InternalLoggerFactory;
43
44 import java.io.IOException;
45 import java.net.SocketAddress;
46 import java.nio.ByteBuffer;
47 import java.nio.channels.ClosedChannelException;
48 import java.nio.channels.WritableByteChannel;
49 import java.util.Queue;
50 import java.util.concurrent.Executor;
51
52 import static io.netty.channel.internal.ChannelUtils.MAX_BYTES_PER_GATHERING_WRITE_ATTEMPTED_LOW_THRESHOLD;
53 import static io.netty.channel.internal.ChannelUtils.WRITE_STATUS_SNDBUF_FULL;
54 import static io.netty.channel.unix.FileDescriptor.pipe;
55 import static io.netty.util.internal.ObjectUtil.checkNotNull;
56 import static io.netty.util.internal.ObjectUtil.checkPositiveOrZero;
57
58 public abstract class AbstractEpollStreamChannel extends AbstractEpollChannel implements DuplexChannel {
59 private static final ChannelMetadata METADATA = new ChannelMetadata(false, 16);
60 private static final String EXPECTED_TYPES =
61 " (expected: " + StringUtil.simpleClassName(ByteBuf.class) + ", " +
62 StringUtil.simpleClassName(DefaultFileRegion.class) + ')';
63 private static final InternalLogger logger = InternalLoggerFactory.getInstance(AbstractEpollStreamChannel.class);
64
65 private final Runnable flushTask = new Runnable() {
66 @Override
67 public void run() {
68
69
70 ((AbstractEpollUnsafe) unsafe()).flush0();
71 }
72 };
73
74
75 private volatile Queue<SpliceInTask> spliceQueue;
76 private FileDescriptor pipeIn;
77 private FileDescriptor pipeOut;
78
79 private WritableByteChannel byteChannel;
80
/**
 * Creates a child channel from a raw file descriptor.
 *
 * @param parent the parent channel
 * @param fd     the raw socket file descriptor, wrapped in a {@link LinuxSocket}
 */
protected AbstractEpollStreamChannel(Channel parent, int fd) {
    this(parent, new LinuxSocket(fd));
}

/**
 * Creates a channel from a raw file descriptor.
 *
 * @param fd the raw socket file descriptor, wrapped in a {@link LinuxSocket}
 */
protected AbstractEpollStreamChannel(int fd) {
    this(new LinuxSocket(fd));
}

/**
 * Creates a channel whose initial {@code active} state is the result of {@code isSoErrorZero(fd)}.
 *
 * @param fd the socket backing this channel
 */
AbstractEpollStreamChannel(LinuxSocket fd) {
    this(fd, isSoErrorZero(fd));
}

/**
 * Creates a child channel that is passed to the super constructor as active.
 *
 * @param parent the parent channel
 * @param fd     the socket backing this channel
 */
AbstractEpollStreamChannel(Channel parent, LinuxSocket fd) {
    super(parent, fd, true);

    // Every stream channel also registers interest in EPOLLRDHUP.
    flags |= Native.EPOLLRDHUP;
}

/**
 * Creates a child channel with a known remote address.
 *
 * @param parent the parent channel
 * @param fd     the socket backing this channel
 * @param remote the remote address handed to the super constructor
 */
protected AbstractEpollStreamChannel(Channel parent, LinuxSocket fd, SocketAddress remote) {
    super(parent, fd, remote);

    // Every stream channel also registers interest in EPOLLRDHUP.
    flags |= Native.EPOLLRDHUP;
}

/**
 * Creates a parent-less channel.
 *
 * @param fd     the socket backing this channel
 * @param active the initial active state handed to the super constructor
 */
protected AbstractEpollStreamChannel(LinuxSocket fd, boolean active) {
    super(null, fd, active);

    // Every stream channel also registers interest in EPOLLRDHUP.
    flags |= Native.EPOLLRDHUP;
}
110
111 @Override
112 protected AbstractEpollUnsafe newUnsafe() {
113 return new EpollStreamUnsafe();
114 }
115
116 @Override
117 public ChannelMetadata metadata() {
118 return METADATA;
119 }
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135 public final ChannelFuture spliceTo(final AbstractEpollStreamChannel ch, final int len) {
136 return spliceTo(ch, len, newPromise());
137 }
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153 public final ChannelFuture spliceTo(final AbstractEpollStreamChannel ch, final int len,
154 final ChannelPromise promise) {
155 if (ch.eventLoop() != eventLoop()) {
156 throw new IllegalArgumentException("EventLoops are not the same.");
157 }
158 checkPositiveOrZero(len, "len");
159 if (ch.config().getEpollMode() != EpollMode.LEVEL_TRIGGERED
160 || config().getEpollMode() != EpollMode.LEVEL_TRIGGERED) {
161 throw new IllegalStateException("spliceTo() supported only when using " + EpollMode.LEVEL_TRIGGERED);
162 }
163 checkNotNull(promise, "promise");
164 if (!isOpen()) {
165 promise.tryFailure(new ClosedChannelException());
166 } else {
167 addToSpliceQueue(new SpliceInChannelTask(ch, len, promise));
168 failSpliceIfClosed(promise);
169 }
170 return promise;
171 }
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187 public final ChannelFuture spliceTo(final FileDescriptor ch, final int offset, final int len) {
188 return spliceTo(ch, offset, len, newPromise());
189 }
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205 public final ChannelFuture spliceTo(final FileDescriptor ch, final int offset, final int len,
206 final ChannelPromise promise) {
207 checkPositiveOrZero(len, "len");
208 checkPositiveOrZero(offset, "offset");
209 if (config().getEpollMode() != EpollMode.LEVEL_TRIGGERED) {
210 throw new IllegalStateException("spliceTo() supported only when using " + EpollMode.LEVEL_TRIGGERED);
211 }
212 checkNotNull(promise, "promise");
213 if (!isOpen()) {
214 promise.tryFailure(new ClosedChannelException());
215 } else {
216 addToSpliceQueue(new SpliceFdTask(ch, offset, len, promise));
217 failSpliceIfClosed(promise);
218 }
219 return promise;
220 }
221
222 private void failSpliceIfClosed(ChannelPromise promise) {
223 if (!isOpen()) {
224
225
226 if (!promise.isDone()) {
227 final ClosedChannelException ex = new ClosedChannelException();
228 if (promise.tryFailure(ex)) {
229 eventLoop().execute(new Runnable() {
230 @Override
231 public void run() {
232
233 clearSpliceQueue(ex);
234 }
235 });
236 }
237 }
238 }
239 }
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255 private int writeBytes(ChannelOutboundBuffer in, ByteBuf buf) throws Exception {
256 int readableBytes = buf.readableBytes();
257 if (readableBytes == 0) {
258 in.remove();
259 return 0;
260 }
261
262 if (buf.hasMemoryAddress() || buf.nioBufferCount() == 1) {
263 return doWriteBytes(in, buf);
264 } else {
265 ByteBuffer[] nioBuffers = buf.nioBuffers();
266 return writeBytesMultiple(in, nioBuffers, nioBuffers.length, readableBytes,
267 config().getMaxBytesPerGatheringWrite());
268 }
269 }
270
271 private void adjustMaxBytesPerGatheringWrite(long attempted, long written, long oldMaxBytesPerGatheringWrite) {
272
273
274
275 if (attempted == written) {
276 if (attempted << 1 > oldMaxBytesPerGatheringWrite) {
277 config().setMaxBytesPerGatheringWrite(attempted << 1);
278 }
279 } else if (attempted > MAX_BYTES_PER_GATHERING_WRITE_ATTEMPTED_LOW_THRESHOLD && written < attempted >>> 1) {
280 config().setMaxBytesPerGatheringWrite(attempted >>> 1);
281 }
282 }
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299 private int writeBytesMultiple(ChannelOutboundBuffer in, IovArray array) throws IOException {
300 final long expectedWrittenBytes = array.size();
301 assert expectedWrittenBytes != 0;
302 final int cnt = array.count();
303 assert cnt != 0;
304
305 final long localWrittenBytes = socket.writevAddresses(array.memoryAddress(0), cnt);
306 if (localWrittenBytes > 0) {
307 adjustMaxBytesPerGatheringWrite(expectedWrittenBytes, localWrittenBytes, array.maxBytes());
308 in.removeBytes(localWrittenBytes);
309 return 1;
310 }
311 return WRITE_STATUS_SNDBUF_FULL;
312 }
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332 private int writeBytesMultiple(
333 ChannelOutboundBuffer in, ByteBuffer[] nioBuffers, int nioBufferCnt, long expectedWrittenBytes,
334 long maxBytesPerGatheringWrite) throws IOException {
335 assert expectedWrittenBytes != 0;
336 if (expectedWrittenBytes > maxBytesPerGatheringWrite) {
337 expectedWrittenBytes = maxBytesPerGatheringWrite;
338 }
339
340 final long localWrittenBytes = socket.writev(nioBuffers, 0, nioBufferCnt, expectedWrittenBytes);
341 if (localWrittenBytes > 0) {
342 adjustMaxBytesPerGatheringWrite(expectedWrittenBytes, localWrittenBytes, maxBytesPerGatheringWrite);
343 in.removeBytes(localWrittenBytes);
344 return 1;
345 }
346 return WRITE_STATUS_SNDBUF_FULL;
347 }
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363 private int writeDefaultFileRegion(ChannelOutboundBuffer in, DefaultFileRegion region) throws Exception {
364 final long offset = region.transferred();
365 final long regionCount = region.count();
366 if (offset >= regionCount) {
367 in.remove();
368 return 0;
369 }
370
371 final long flushedAmount = socket.sendFile(region, region.position(), offset, regionCount - offset);
372 if (flushedAmount > 0) {
373 in.progress(flushedAmount);
374 if (region.transferred() >= regionCount) {
375 in.remove();
376 }
377 return 1;
378 } else if (flushedAmount == 0) {
379 validateFileRegion(region, offset);
380 }
381 return WRITE_STATUS_SNDBUF_FULL;
382 }
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398 private int writeFileRegion(ChannelOutboundBuffer in, FileRegion region) throws Exception {
399 if (region.transferred() >= region.count()) {
400 in.remove();
401 return 0;
402 }
403
404 if (byteChannel == null) {
405 byteChannel = new EpollSocketWritableByteChannel();
406 }
407 final long flushedAmount = region.transferTo(byteChannel, region.transferred());
408 if (flushedAmount > 0) {
409 in.progress(flushedAmount);
410 if (region.transferred() >= region.count()) {
411 in.remove();
412 }
413 return 1;
414 }
415 return WRITE_STATUS_SNDBUF_FULL;
416 }
417
418 @Override
419 protected void doWrite(ChannelOutboundBuffer in) throws Exception {
420 int writeSpinCount = config().getWriteSpinCount();
421 do {
422 final int msgCount = in.size();
423
424 if (msgCount > 1 && in.current() instanceof ByteBuf) {
425 writeSpinCount -= doWriteMultiple(in);
426 } else if (msgCount == 0) {
427
428 clearFlag(Native.EPOLLOUT);
429
430 return;
431 } else {
432 writeSpinCount -= doWriteSingle(in);
433 }
434
435
436
437
438 } while (writeSpinCount > 0);
439
440 if (writeSpinCount == 0) {
441
442
443
444
445 clearFlag(Native.EPOLLOUT);
446
447
448 eventLoop().execute(flushTask);
449 } else {
450
451
452 setFlag(Native.EPOLLOUT);
453 }
454 }
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470 protected int doWriteSingle(ChannelOutboundBuffer in) throws Exception {
471
472 Object msg = in.current();
473 if (msg instanceof ByteBuf) {
474 return writeBytes(in, (ByteBuf) msg);
475 } else if (msg instanceof DefaultFileRegion) {
476 return writeDefaultFileRegion(in, (DefaultFileRegion) msg);
477 } else if (msg instanceof FileRegion) {
478 return writeFileRegion(in, (FileRegion) msg);
479 } else if (msg instanceof SpliceOutTask) {
480 if (!((SpliceOutTask) msg).spliceOut()) {
481 return WRITE_STATUS_SNDBUF_FULL;
482 }
483 in.remove();
484 return 1;
485 } else {
486
487 throw new Error();
488 }
489 }
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505 private int doWriteMultiple(ChannelOutboundBuffer in) throws Exception {
506 final long maxBytesPerGatheringWrite = config().getMaxBytesPerGatheringWrite();
507 IovArray array = ((EpollEventLoop) eventLoop()).cleanIovArray();
508 array.maxBytes(maxBytesPerGatheringWrite);
509 in.forEachFlushedMessage(array);
510
511 if (array.count() >= 1) {
512
513 return writeBytesMultiple(in, array);
514 }
515
516 in.removeBytes(0);
517 return 0;
518 }
519
520 @Override
521 protected Object filterOutboundMessage(Object msg) {
522 if (msg instanceof ByteBuf) {
523 ByteBuf buf = (ByteBuf) msg;
524 return UnixChannelUtil.isBufferCopyNeededForWrite(buf)? newDirectBuffer(buf): buf;
525 }
526
527 if (msg instanceof FileRegion || msg instanceof SpliceOutTask) {
528 return msg;
529 }
530
531 throw new UnsupportedOperationException(
532 "unsupported message type: " + StringUtil.simpleClassName(msg) + EXPECTED_TYPES);
533 }
534
535 @UnstableApi
536 @Override
537 protected final void doShutdownOutput() throws Exception {
538 socket.shutdown(false, true);
539 }
540
541 private void shutdownInput0(final ChannelPromise promise) {
542 try {
543 socket.shutdown(true, false);
544 promise.setSuccess();
545 } catch (Throwable cause) {
546 promise.setFailure(cause);
547 }
548 }
549
550 @Override
551 public boolean isOutputShutdown() {
552 return socket.isOutputShutdown();
553 }
554
555 @Override
556 public boolean isInputShutdown() {
557 return socket.isInputShutdown();
558 }
559
560 @Override
561 public boolean isShutdown() {
562 return socket.isShutdown();
563 }
564
565 @Override
566 public ChannelFuture shutdownOutput() {
567 return shutdownOutput(newPromise());
568 }
569
570 @Override
571 public ChannelFuture shutdownOutput(final ChannelPromise promise) {
572 EventLoop loop = eventLoop();
573 if (loop.inEventLoop()) {
574 ((AbstractUnsafe) unsafe()).shutdownOutput(promise);
575 } else {
576 loop.execute(new Runnable() {
577 @Override
578 public void run() {
579 ((AbstractUnsafe) unsafe()).shutdownOutput(promise);
580 }
581 });
582 }
583
584 return promise;
585 }
586
587 @Override
588 public ChannelFuture shutdownInput() {
589 return shutdownInput(newPromise());
590 }
591
592 @Override
593 public ChannelFuture shutdownInput(final ChannelPromise promise) {
594 Executor closeExecutor = ((EpollStreamUnsafe) unsafe()).prepareToClose();
595 if (closeExecutor != null) {
596 closeExecutor.execute(new Runnable() {
597 @Override
598 public void run() {
599 shutdownInput0(promise);
600 }
601 });
602 } else {
603 EventLoop loop = eventLoop();
604 if (loop.inEventLoop()) {
605 shutdownInput0(promise);
606 } else {
607 loop.execute(new Runnable() {
608 @Override
609 public void run() {
610 shutdownInput0(promise);
611 }
612 });
613 }
614 }
615 return promise;
616 }
617
618 @Override
619 public ChannelFuture shutdown() {
620 return shutdown(newPromise());
621 }
622
623 @Override
624 public ChannelFuture shutdown(final ChannelPromise promise) {
625 ChannelFuture shutdownOutputFuture = shutdownOutput();
626 if (shutdownOutputFuture.isDone()) {
627 shutdownOutputDone(shutdownOutputFuture, promise);
628 } else {
629 shutdownOutputFuture.addListener(new ChannelFutureListener() {
630 @Override
631 public void operationComplete(final ChannelFuture shutdownOutputFuture) throws Exception {
632 shutdownOutputDone(shutdownOutputFuture, promise);
633 }
634 });
635 }
636 return promise;
637 }
638
639 private void shutdownOutputDone(final ChannelFuture shutdownOutputFuture, final ChannelPromise promise) {
640 ChannelFuture shutdownInputFuture = shutdownInput();
641 if (shutdownInputFuture.isDone()) {
642 shutdownDone(shutdownOutputFuture, shutdownInputFuture, promise);
643 } else {
644 shutdownInputFuture.addListener(new ChannelFutureListener() {
645 @Override
646 public void operationComplete(ChannelFuture shutdownInputFuture) throws Exception {
647 shutdownDone(shutdownOutputFuture, shutdownInputFuture, promise);
648 }
649 });
650 }
651 }
652
653 private static void shutdownDone(ChannelFuture shutdownOutputFuture,
654 ChannelFuture shutdownInputFuture,
655 ChannelPromise promise) {
656 Throwable shutdownOutputCause = shutdownOutputFuture.cause();
657 Throwable shutdownInputCause = shutdownInputFuture.cause();
658 if (shutdownOutputCause != null) {
659 if (shutdownInputCause != null) {
660 logger.debug("Exception suppressed because a previous exception occurred.",
661 shutdownInputCause);
662 }
663 promise.setFailure(shutdownOutputCause);
664 } else if (shutdownInputCause != null) {
665 promise.setFailure(shutdownInputCause);
666 } else {
667 promise.setSuccess();
668 }
669 }
670
671 @Override
672 protected void doClose() throws Exception {
673 try {
674
675 super.doClose();
676 } finally {
677 safeClosePipe(pipeIn);
678 safeClosePipe(pipeOut);
679 clearSpliceQueue(null);
680 }
681 }
682
683 private void clearSpliceQueue(ClosedChannelException exception) {
684 Queue<SpliceInTask> sQueue = spliceQueue;
685 if (sQueue == null) {
686 return;
687 }
688 for (;;) {
689 SpliceInTask task = sQueue.poll();
690 if (task == null) {
691 break;
692 }
693 if (exception == null) {
694 exception = new ClosedChannelException();
695 }
696 task.promise.tryFailure(exception);
697 }
698 }
699
700 private static void safeClosePipe(FileDescriptor fd) {
701 if (fd != null) {
702 try {
703 fd.close();
704 } catch (IOException e) {
705 logger.warn("Error while closing a pipe", e);
706 }
707 }
708 }
709
710 class EpollStreamUnsafe extends AbstractEpollUnsafe {
711
712 @Override
713 protected Executor prepareToClose() {
714 return super.prepareToClose();
715 }
716
717 private void handleReadException(ChannelPipeline pipeline, ByteBuf byteBuf, Throwable cause, boolean close,
718 EpollRecvByteAllocatorHandle allocHandle) {
719 if (byteBuf != null) {
720 if (byteBuf.isReadable()) {
721 readPending = false;
722 pipeline.fireChannelRead(byteBuf);
723 } else {
724 byteBuf.release();
725 }
726 }
727 allocHandle.readComplete();
728 pipeline.fireChannelReadComplete();
729 pipeline.fireExceptionCaught(cause);
730
731
732
733 if (close || cause instanceof OutOfMemoryError || cause instanceof IOException) {
734 shutdownInput(false);
735 }
736 }
737
738 @Override
739 EpollRecvByteAllocatorHandle newEpollHandle(RecvByteBufAllocator.ExtendedHandle handle) {
740 return new EpollRecvByteAllocatorStreamingHandle(handle);
741 }
742
743 @Override
744 void epollInReady() {
745 final ChannelConfig config = config();
746 if (shouldBreakEpollInReady(config)) {
747 clearEpollIn0();
748 return;
749 }
750 final EpollRecvByteAllocatorHandle allocHandle = recvBufAllocHandle();
751 allocHandle.edgeTriggered(isFlagSet(Native.EPOLLET));
752
753 final ChannelPipeline pipeline = pipeline();
754 final ByteBufAllocator allocator = config.getAllocator();
755 allocHandle.reset(config);
756 epollInBefore();
757
758 ByteBuf byteBuf = null;
759 boolean close = false;
760 Queue<SpliceInTask> sQueue = null;
761 try {
762 do {
763 if (sQueue != null || (sQueue = spliceQueue) != null) {
764 SpliceInTask spliceTask = sQueue.peek();
765 if (spliceTask != null) {
766 boolean spliceInResult = spliceTask.spliceIn(allocHandle);
767
768 if (allocHandle.isReceivedRdHup()) {
769 shutdownInput(true);
770 }
771 if (spliceInResult) {
772
773
774 if (isActive()) {
775 sQueue.remove();
776 }
777 continue;
778 } else {
779 break;
780 }
781 }
782 }
783
784
785
786 byteBuf = allocHandle.allocate(allocator);
787 allocHandle.lastBytesRead(doReadBytes(byteBuf));
788 if (allocHandle.lastBytesRead() <= 0) {
789
790 byteBuf.release();
791 byteBuf = null;
792 close = allocHandle.lastBytesRead() < 0;
793 if (close) {
794
795 readPending = false;
796 }
797 break;
798 }
799 allocHandle.incMessagesRead(1);
800 readPending = false;
801 pipeline.fireChannelRead(byteBuf);
802 byteBuf = null;
803
804 if (shouldBreakEpollInReady(config)) {
805
806
807
808
809
810
811
812
813
814
815
816 break;
817 }
818 } while (allocHandle.continueReading());
819
820 allocHandle.readComplete();
821 pipeline.fireChannelReadComplete();
822
823 if (close) {
824 shutdownInput(false);
825 }
826 } catch (Throwable t) {
827 handleReadException(pipeline, byteBuf, t, close, allocHandle);
828 } finally {
829 if (sQueue == null) {
830 epollInFinally(config);
831 } else {
832 if (!config.isAutoRead()) {
833 clearEpollIn();
834 }
835 }
836 }
837 }
838 }
839
840 private void addToSpliceQueue(final SpliceInTask task) {
841 Queue<SpliceInTask> sQueue = spliceQueue;
842 if (sQueue == null) {
843 synchronized (this) {
844 sQueue = spliceQueue;
845 if (sQueue == null) {
846 spliceQueue = sQueue = PlatformDependent.newMpscQueue();
847 }
848 }
849 }
850 sQueue.add(task);
851 }
852
853 protected abstract class SpliceInTask {
854 final ChannelPromise promise;
855 int len;
856
857 protected SpliceInTask(int len, ChannelPromise promise) {
858 this.promise = promise;
859 this.len = len;
860 }
861
862 abstract boolean spliceIn(RecvByteBufAllocator.Handle handle);
863
864 protected final int spliceIn(FileDescriptor pipeOut, RecvByteBufAllocator.Handle handle) throws IOException {
865
866 int length = Math.min(handle.guess(), len);
867 int splicedIn = 0;
868 for (;;) {
869
870 int localSplicedIn = Native.splice(socket.intValue(), -1, pipeOut.intValue(), -1, length);
871 handle.lastBytesRead(localSplicedIn);
872 if (localSplicedIn == 0) {
873 break;
874 }
875 splicedIn += localSplicedIn;
876 length -= localSplicedIn;
877 }
878
879 return splicedIn;
880 }
881 }
882
883
884 private final class SpliceInChannelTask extends SpliceInTask implements ChannelFutureListener {
885 private final AbstractEpollStreamChannel ch;
886
887 SpliceInChannelTask(AbstractEpollStreamChannel ch, int len, ChannelPromise promise) {
888 super(len, promise);
889 this.ch = ch;
890 }
891
892 @Override
893 public void operationComplete(ChannelFuture future) throws Exception {
894 if (!future.isSuccess()) {
895
896 promise.tryFailure(future.cause());
897 }
898 }
899
900 @Override
901 public boolean spliceIn(RecvByteBufAllocator.Handle handle) {
902 assert ch.eventLoop().inEventLoop();
903 if (len == 0) {
904
905 promise.trySuccess();
906 return true;
907 }
908 try {
909
910
911
912 FileDescriptor pipeOut = ch.pipeOut;
913 if (pipeOut == null) {
914
915 FileDescriptor[] pipe = pipe();
916 ch.pipeIn = pipe[0];
917 pipeOut = ch.pipeOut = pipe[1];
918 }
919
920 int splicedIn = spliceIn(pipeOut, handle);
921 if (splicedIn > 0) {
922
923 if (len != Integer.MAX_VALUE) {
924 len -= splicedIn;
925 }
926
927
928
929 final ChannelPromise splicePromise;
930 if (len == 0) {
931 splicePromise = promise;
932 } else {
933 splicePromise = ch.newPromise().addListener(this);
934 }
935
936 boolean autoRead = config().isAutoRead();
937
938
939
940 ch.unsafe().write(new SpliceOutTask(ch, splicedIn, autoRead), splicePromise);
941 ch.unsafe().flush();
942 if (autoRead && !splicePromise.isDone()) {
943
944
945
946
947 config().setAutoRead(false);
948 }
949 }
950
951 return len == 0;
952 } catch (Throwable cause) {
953
954 promise.tryFailure(cause);
955 return true;
956 }
957 }
958 }
959
960 private final class SpliceOutTask {
961 private final AbstractEpollStreamChannel ch;
962 private final boolean autoRead;
963 private int len;
964
965 SpliceOutTask(AbstractEpollStreamChannel ch, int len, boolean autoRead) {
966 this.ch = ch;
967 this.len = len;
968 this.autoRead = autoRead;
969 }
970
971 public boolean spliceOut() throws Exception {
972 assert ch.eventLoop().inEventLoop();
973 try {
974 int splicedOut = Native.splice(ch.pipeIn.intValue(), -1, ch.socket.intValue(), -1, len);
975 len -= splicedOut;
976 if (len == 0) {
977 if (autoRead) {
978
979 config().setAutoRead(true);
980 }
981 return true;
982 }
983 return false;
984 } catch (IOException e) {
985 if (autoRead) {
986
987 config().setAutoRead(true);
988 }
989 throw e;
990 }
991 }
992 }
993
994 private final class SpliceFdTask extends SpliceInTask {
995 private final FileDescriptor fd;
996 private final ChannelPromise promise;
997 private int offset;
998
999 SpliceFdTask(FileDescriptor fd, int offset, int len, ChannelPromise promise) {
1000 super(len, promise);
1001 this.fd = fd;
1002 this.promise = promise;
1003 this.offset = offset;
1004 }
1005
1006 @Override
1007 public boolean spliceIn(RecvByteBufAllocator.Handle handle) {
1008 assert eventLoop().inEventLoop();
1009 if (len == 0) {
1010
1011 promise.trySuccess();
1012 return true;
1013 }
1014
1015 try {
1016 FileDescriptor[] pipe = pipe();
1017 FileDescriptor pipeIn = pipe[0];
1018 FileDescriptor pipeOut = pipe[1];
1019 try {
1020 int splicedIn = spliceIn(pipeOut, handle);
1021 if (splicedIn > 0) {
1022
1023 if (len != Integer.MAX_VALUE) {
1024 len -= splicedIn;
1025 }
1026 do {
1027 int splicedOut = Native.splice(pipeIn.intValue(), -1, fd.intValue(), offset, splicedIn);
1028 offset += splicedOut;
1029 splicedIn -= splicedOut;
1030 } while (splicedIn > 0);
1031 if (len == 0) {
1032
1033 promise.trySuccess();
1034 return true;
1035 }
1036 }
1037 return false;
1038 } finally {
1039 safeClosePipe(pipeIn);
1040 safeClosePipe(pipeOut);
1041 }
1042 } catch (Throwable cause) {
1043
1044 promise.tryFailure(cause);
1045 return true;
1046 }
1047 }
1048 }
1049
1050 private final class EpollSocketWritableByteChannel extends SocketWritableByteChannel {
1051 EpollSocketWritableByteChannel() {
1052 super(socket);
1053 assert fd == socket;
1054 }
1055
1056 @Override
1057 protected int write(final ByteBuffer buf, final int pos, final int limit) throws IOException {
1058 return socket.send(buf, pos, limit);
1059 }
1060
1061 @Override
1062 protected ByteBufAllocator alloc() {
1063 return AbstractEpollStreamChannel.this.alloc();
1064 }
1065 }
1066 }