查看本类的 API文档回源码主页即时通讯网 - 即时通讯开发者社区!
1   /*
2    * Copyright 2015 The Netty Project
3    *
4    * The Netty Project licenses this file to you under the Apache License,
5    * version 2.0 (the "License"); you may not use this file except in compliance
6    * with the License. You may obtain a copy of the License at:
7    *
8    *   https://www.apache.org/licenses/LICENSE-2.0
9    *
10   * Unless required by applicable law or agreed to in writing, software
11   * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12   * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13   * License for the specific language governing permissions and limitations
14   * under the License.
15   */
16  package io.netty.channel.epoll;
17  
18  import io.netty.buffer.ByteBuf;
19  import io.netty.buffer.ByteBufAllocator;
20  import io.netty.channel.Channel;
21  import io.netty.channel.ChannelConfig;
22  import io.netty.channel.ChannelFuture;
23  import io.netty.channel.ChannelFutureListener;
24  import io.netty.channel.ChannelMetadata;
25  import io.netty.channel.ChannelOutboundBuffer;
26  import io.netty.channel.ChannelPipeline;
27  import io.netty.channel.ChannelPromise;
28  import io.netty.channel.DefaultFileRegion;
29  import io.netty.channel.EventLoop;
30  import io.netty.channel.FileRegion;
31  import io.netty.channel.RecvByteBufAllocator;
32  import io.netty.channel.internal.ChannelUtils;
33  import io.netty.channel.socket.DuplexChannel;
34  import io.netty.channel.unix.FileDescriptor;
35  import io.netty.channel.unix.IovArray;
36  import io.netty.channel.unix.SocketWritableByteChannel;
37  import io.netty.channel.unix.UnixChannelUtil;
38  import io.netty.util.internal.PlatformDependent;
39  import io.netty.util.internal.StringUtil;
40  import io.netty.util.internal.UnstableApi;
41  import io.netty.util.internal.logging.InternalLogger;
42  import io.netty.util.internal.logging.InternalLoggerFactory;
43  
44  import java.io.IOException;
45  import java.net.SocketAddress;
46  import java.nio.ByteBuffer;
47  import java.nio.channels.ClosedChannelException;
48  import java.nio.channels.WritableByteChannel;
49  import java.util.Queue;
50  import java.util.concurrent.Executor;
51  
52  import static io.netty.channel.internal.ChannelUtils.MAX_BYTES_PER_GATHERING_WRITE_ATTEMPTED_LOW_THRESHOLD;
53  import static io.netty.channel.internal.ChannelUtils.WRITE_STATUS_SNDBUF_FULL;
54  import static io.netty.channel.unix.FileDescriptor.pipe;
55  import static io.netty.util.internal.ObjectUtil.checkNotNull;
56  import static io.netty.util.internal.ObjectUtil.checkPositiveOrZero;
57  
58  public abstract class AbstractEpollStreamChannel extends AbstractEpollChannel implements DuplexChannel {
59      private static final ChannelMetadata METADATA = new ChannelMetadata(false, 16);
60      private static final String EXPECTED_TYPES =
61              " (expected: " + StringUtil.simpleClassName(ByteBuf.class) + ", " +
62                      StringUtil.simpleClassName(DefaultFileRegion.class) + ')';
63      private static final InternalLogger logger = InternalLoggerFactory.getInstance(AbstractEpollStreamChannel.class);
64  
65      private final Runnable flushTask = new Runnable() {
66          @Override
67          public void run() {
68              // Calling flush0 directly to ensure we not try to flush messages that were added via write(...) in the
69              // meantime.
70              ((AbstractEpollUnsafe) unsafe()).flush0();
71          }
72      };
73  
74      // Lazy init these if we need to splice(...)
75      private volatile Queue<SpliceInTask> spliceQueue;
76      private FileDescriptor pipeIn;
77      private FileDescriptor pipeOut;
78  
79      private WritableByteChannel byteChannel;
80  
81      protected AbstractEpollStreamChannel(Channel parent, int fd) {
82          this(parent, new LinuxSocket(fd));
83      }
84  
85      protected AbstractEpollStreamChannel(int fd) {
86          this(new LinuxSocket(fd));
87      }
88  
89      AbstractEpollStreamChannel(LinuxSocket fd) {
90          this(fd, isSoErrorZero(fd));
91      }
92  
93      AbstractEpollStreamChannel(Channel parent, LinuxSocket fd) {
94          super(parent, fd, true);
95          // Add EPOLLRDHUP so we are notified once the remote peer close the connection.
96          flags |= Native.EPOLLRDHUP;
97      }
98  
99      protected AbstractEpollStreamChannel(Channel parent, LinuxSocket fd, SocketAddress remote) {
100         super(parent, fd, remote);
101         // Add EPOLLRDHUP so we are notified once the remote peer close the connection.
102         flags |= Native.EPOLLRDHUP;
103     }
104 
105     protected AbstractEpollStreamChannel(LinuxSocket fd, boolean active) {
106         super(null, fd, active);
107         // Add EPOLLRDHUP so we are notified once the remote peer close the connection.
108         flags |= Native.EPOLLRDHUP;
109     }
110 
111     @Override
112     protected AbstractEpollUnsafe newUnsafe() {
113         return new EpollStreamUnsafe();
114     }
115 
116     @Override
117     public ChannelMetadata metadata() {
118         return METADATA;
119     }
120 
121     /**
122      * Splice from this {@link AbstractEpollStreamChannel} to another {@link AbstractEpollStreamChannel}.
123      * The {@code len} is the number of bytes to splice. If using {@link Integer#MAX_VALUE} it will
124      * splice until the {@link ChannelFuture} was canceled or it was failed.
125      *
126      * Please note:
127      * <ul>
128      *   <li>both channels need to be registered to the same {@link EventLoop}, otherwise an
129      *   {@link IllegalArgumentException} is thrown. </li>
130      *   <li>{@link EpollChannelConfig#getEpollMode()} must be {@link EpollMode#LEVEL_TRIGGERED} for this and the
131      *   target {@link AbstractEpollStreamChannel}</li>
132      * </ul>
133      *
134      */
135     public final ChannelFuture spliceTo(final AbstractEpollStreamChannel ch, final int len) {
136         return spliceTo(ch, len, newPromise());
137     }
138 
139     /**
140      * Splice from this {@link AbstractEpollStreamChannel} to another {@link AbstractEpollStreamChannel}.
141      * The {@code len} is the number of bytes to splice. If using {@link Integer#MAX_VALUE} it will
142      * splice until the {@link ChannelFuture} was canceled or it was failed.
143      *
144      * Please note:
145      * <ul>
146      *   <li>both channels need to be registered to the same {@link EventLoop}, otherwise an
147      *   {@link IllegalArgumentException} is thrown. </li>
148      *   <li>{@link EpollChannelConfig#getEpollMode()} must be {@link EpollMode#LEVEL_TRIGGERED} for this and the
149      *   target {@link AbstractEpollStreamChannel}</li>
150      * </ul>
151      *
152      */
153     public final ChannelFuture spliceTo(final AbstractEpollStreamChannel ch, final int len,
154                                         final ChannelPromise promise) {
155         if (ch.eventLoop() != eventLoop()) {
156             throw new IllegalArgumentException("EventLoops are not the same.");
157         }
158         checkPositiveOrZero(len, "len");
159         if (ch.config().getEpollMode() != EpollMode.LEVEL_TRIGGERED
160                 || config().getEpollMode() != EpollMode.LEVEL_TRIGGERED) {
161             throw new IllegalStateException("spliceTo() supported only when using " + EpollMode.LEVEL_TRIGGERED);
162         }
163         checkNotNull(promise, "promise");
164         if (!isOpen()) {
165             promise.tryFailure(new ClosedChannelException());
166         } else {
167             addToSpliceQueue(new SpliceInChannelTask(ch, len, promise));
168             failSpliceIfClosed(promise);
169         }
170         return promise;
171     }
172 
173     /**
174      * Splice from this {@link AbstractEpollStreamChannel} to another {@link FileDescriptor}.
175      * The {@code offset} is the offset for the {@link FileDescriptor} and {@code len} is the
176      * number of bytes to splice. If using {@link Integer#MAX_VALUE} it will splice until the
177      * {@link ChannelFuture} was canceled or it was failed.
178      *
179      * Please note:
180      * <ul>
181      *   <li>{@link EpollChannelConfig#getEpollMode()} must be {@link EpollMode#LEVEL_TRIGGERED} for this
182      *   {@link AbstractEpollStreamChannel}</li>
183      *   <li>the {@link FileDescriptor} will not be closed after the {@link ChannelFuture} is notified</li>
184      *   <li>this channel must be registered to an event loop or {@link IllegalStateException} will be thrown.</li>
185      * </ul>
186      */
187     public final ChannelFuture spliceTo(final FileDescriptor ch, final int offset, final int len) {
188         return spliceTo(ch, offset, len, newPromise());
189     }
190 
191     /**
192      * Splice from this {@link AbstractEpollStreamChannel} to another {@link FileDescriptor}.
193      * The {@code offset} is the offset for the {@link FileDescriptor} and {@code len} is the
194      * number of bytes to splice. If using {@link Integer#MAX_VALUE} it will splice until the
195      * {@link ChannelFuture} was canceled or it was failed.
196      *
197      * Please note:
198      * <ul>
199      *   <li>{@link EpollChannelConfig#getEpollMode()} must be {@link EpollMode#LEVEL_TRIGGERED} for this
200      *   {@link AbstractEpollStreamChannel}</li>
201      *   <li>the {@link FileDescriptor} will not be closed after the {@link ChannelPromise} is notified</li>
202      *   <li>this channel must be registered to an event loop or {@link IllegalStateException} will be thrown.</li>
203      * </ul>
204      */
205     public final ChannelFuture spliceTo(final FileDescriptor ch, final int offset, final int len,
206                                         final ChannelPromise promise) {
207         checkPositiveOrZero(len, "len");
208         checkPositiveOrZero(offset, "offset");
209         if (config().getEpollMode() != EpollMode.LEVEL_TRIGGERED) {
210             throw new IllegalStateException("spliceTo() supported only when using " + EpollMode.LEVEL_TRIGGERED);
211         }
212         checkNotNull(promise, "promise");
213         if (!isOpen()) {
214             promise.tryFailure(new ClosedChannelException());
215         } else {
216             addToSpliceQueue(new SpliceFdTask(ch, offset, len, promise));
217             failSpliceIfClosed(promise);
218         }
219         return promise;
220     }
221 
222     private void failSpliceIfClosed(ChannelPromise promise) {
223         if (!isOpen()) {
224             // Seems like the Channel was closed in the meantime try to fail the promise to prevent any
225             // cases where a future may not be notified otherwise.
226             if (!promise.isDone()) {
227                 final ClosedChannelException ex = new ClosedChannelException();
228                 if (promise.tryFailure(ex)) {
229                     eventLoop().execute(new Runnable() {
230                         @Override
231                         public void run() {
232                             // Call this via the EventLoop as it is a MPSC queue.
233                             clearSpliceQueue(ex);
234                         }
235                     });
236                 }
237             }
238         }
239     }
240 
241     /**
242      * Write bytes form the given {@link ByteBuf} to the underlying {@link java.nio.channels.Channel}.
243      * @param in the collection which contains objects to write.
244      * @param buf the {@link ByteBuf} from which the bytes should be written
245      * @return The value that should be decremented from the write quantum which starts at
246      * {@link ChannelConfig#getWriteSpinCount()}. The typical use cases are as follows:
247      * <ul>
248      *     <li>0 - if no write was attempted. This is appropriate if an empty {@link ByteBuf} (or other empty content)
249      *     is encountered</li>
250      *     <li>1 - if a single call to write data was made to the OS</li>
251      *     <li>{@link ChannelUtils#WRITE_STATUS_SNDBUF_FULL} - if an attempt to write data was made to the OS, but
252      *     no data was accepted</li>
253      * </ul>
254      */
255     private int writeBytes(ChannelOutboundBuffer in, ByteBuf buf) throws Exception {
256         int readableBytes = buf.readableBytes();
257         if (readableBytes == 0) {
258             in.remove();
259             return 0;
260         }
261 
262         if (buf.hasMemoryAddress() || buf.nioBufferCount() == 1) {
263             return doWriteBytes(in, buf);
264         } else {
265             ByteBuffer[] nioBuffers = buf.nioBuffers();
266             return writeBytesMultiple(in, nioBuffers, nioBuffers.length, readableBytes,
267                     config().getMaxBytesPerGatheringWrite());
268         }
269     }
270 
271     private void adjustMaxBytesPerGatheringWrite(long attempted, long written, long oldMaxBytesPerGatheringWrite) {
272         // By default we track the SO_SNDBUF when ever it is explicitly set. However some OSes may dynamically change
273         // SO_SNDBUF (and other characteristics that determine how much data can be written at once) so we should try
274         // make a best effort to adjust as OS behavior changes.
275         if (attempted == written) {
276             if (attempted << 1 > oldMaxBytesPerGatheringWrite) {
277                 config().setMaxBytesPerGatheringWrite(attempted << 1);
278             }
279         } else if (attempted > MAX_BYTES_PER_GATHERING_WRITE_ATTEMPTED_LOW_THRESHOLD && written < attempted >>> 1) {
280             config().setMaxBytesPerGatheringWrite(attempted >>> 1);
281         }
282     }
283 
284     /**
285      * Write multiple bytes via {@link IovArray}.
286      * @param in the collection which contains objects to write.
287      * @param array The array which contains the content to write.
288      * @return The value that should be decremented from the write quantum which starts at
289      * {@link ChannelConfig#getWriteSpinCount()}. The typical use cases are as follows:
290      * <ul>
291      *     <li>0 - if no write was attempted. This is appropriate if an empty {@link ByteBuf} (or other empty content)
292      *     is encountered</li>
293      *     <li>1 - if a single call to write data was made to the OS</li>
294      *     <li>{@link ChannelUtils#WRITE_STATUS_SNDBUF_FULL} - if an attempt to write data was made to the OS, but
295      *     no data was accepted</li>
296      * </ul>
297      * @throws IOException If an I/O exception occurs during write.
298      */
299     private int writeBytesMultiple(ChannelOutboundBuffer in, IovArray array) throws IOException {
300         final long expectedWrittenBytes = array.size();
301         assert expectedWrittenBytes != 0;
302         final int cnt = array.count();
303         assert cnt != 0;
304 
305         final long localWrittenBytes = socket.writevAddresses(array.memoryAddress(0), cnt);
306         if (localWrittenBytes > 0) {
307             adjustMaxBytesPerGatheringWrite(expectedWrittenBytes, localWrittenBytes, array.maxBytes());
308             in.removeBytes(localWrittenBytes);
309             return 1;
310         }
311         return WRITE_STATUS_SNDBUF_FULL;
312     }
313 
314     /**
315      * Write multiple bytes via {@link ByteBuffer} array.
316      * @param in the collection which contains objects to write.
317      * @param nioBuffers The buffers to write.
318      * @param nioBufferCnt The number of buffers to write.
319      * @param expectedWrittenBytes The number of bytes we expect to write.
320      * @param maxBytesPerGatheringWrite The maximum number of bytes we should attempt to write.
321      * @return The value that should be decremented from the write quantum which starts at
322      * {@link ChannelConfig#getWriteSpinCount()}. The typical use cases are as follows:
323      * <ul>
324      *     <li>0 - if no write was attempted. This is appropriate if an empty {@link ByteBuf} (or other empty content)
325      *     is encountered</li>
326      *     <li>1 - if a single call to write data was made to the OS</li>
327      *     <li>{@link ChannelUtils#WRITE_STATUS_SNDBUF_FULL} - if an attempt to write data was made to the OS, but
328      *     no data was accepted</li>
329      * </ul>
330      * @throws IOException If an I/O exception occurs during write.
331      */
332     private int writeBytesMultiple(
333             ChannelOutboundBuffer in, ByteBuffer[] nioBuffers, int nioBufferCnt, long expectedWrittenBytes,
334             long maxBytesPerGatheringWrite) throws IOException {
335         assert expectedWrittenBytes != 0;
336         if (expectedWrittenBytes > maxBytesPerGatheringWrite) {
337             expectedWrittenBytes = maxBytesPerGatheringWrite;
338         }
339 
340         final long localWrittenBytes = socket.writev(nioBuffers, 0, nioBufferCnt, expectedWrittenBytes);
341         if (localWrittenBytes > 0) {
342             adjustMaxBytesPerGatheringWrite(expectedWrittenBytes, localWrittenBytes, maxBytesPerGatheringWrite);
343             in.removeBytes(localWrittenBytes);
344             return 1;
345         }
346         return WRITE_STATUS_SNDBUF_FULL;
347     }
348 
349     /**
350      * Write a {@link DefaultFileRegion}
351      * @param in the collection which contains objects to write.
352      * @param region the {@link DefaultFileRegion} from which the bytes should be written
353      * @return The value that should be decremented from the write quantum which starts at
354      * {@link ChannelConfig#getWriteSpinCount()}. The typical use cases are as follows:
355      * <ul>
356      *     <li>0 - if no write was attempted. This is appropriate if an empty {@link ByteBuf} (or other empty content)
357      *     is encountered</li>
358      *     <li>1 - if a single call to write data was made to the OS</li>
359      *     <li>{@link ChannelUtils#WRITE_STATUS_SNDBUF_FULL} - if an attempt to write data was made to the OS, but
360      *     no data was accepted</li>
361      * </ul>
362      */
363     private int writeDefaultFileRegion(ChannelOutboundBuffer in, DefaultFileRegion region) throws Exception {
364         final long offset = region.transferred();
365         final long regionCount = region.count();
366         if (offset >= regionCount) {
367             in.remove();
368             return 0;
369         }
370 
371         final long flushedAmount = socket.sendFile(region, region.position(), offset, regionCount - offset);
372         if (flushedAmount > 0) {
373             in.progress(flushedAmount);
374             if (region.transferred() >= regionCount) {
375                 in.remove();
376             }
377             return 1;
378         } else if (flushedAmount == 0) {
379             validateFileRegion(region, offset);
380         }
381         return WRITE_STATUS_SNDBUF_FULL;
382     }
383 
384     /**
385      * Write a {@link FileRegion}
386      * @param in the collection which contains objects to write.
387      * @param region the {@link FileRegion} from which the bytes should be written
388      * @return The value that should be decremented from the write quantum which starts at
389      * {@link ChannelConfig#getWriteSpinCount()}. The typical use cases are as follows:
390      * <ul>
391      *     <li>0 - if no write was attempted. This is appropriate if an empty {@link ByteBuf} (or other empty content)
392      *     is encountered</li>
393      *     <li>1 - if a single call to write data was made to the OS</li>
394      *     <li>{@link ChannelUtils#WRITE_STATUS_SNDBUF_FULL} - if an attempt to write data was made to the OS, but
395      *     no data was accepted</li>
396      * </ul>
397      */
398     private int writeFileRegion(ChannelOutboundBuffer in, FileRegion region) throws Exception {
399         if (region.transferred() >= region.count()) {
400             in.remove();
401             return 0;
402         }
403 
404         if (byteChannel == null) {
405             byteChannel = new EpollSocketWritableByteChannel();
406         }
407         final long flushedAmount = region.transferTo(byteChannel, region.transferred());
408         if (flushedAmount > 0) {
409             in.progress(flushedAmount);
410             if (region.transferred() >= region.count()) {
411                 in.remove();
412             }
413             return 1;
414         }
415         return WRITE_STATUS_SNDBUF_FULL;
416     }
417 
418     @Override
419     protected void doWrite(ChannelOutboundBuffer in) throws Exception {
420         int writeSpinCount = config().getWriteSpinCount();
421         do {
422             final int msgCount = in.size();
423             // Do gathering write if the outbound buffer entries start with more than one ByteBuf.
424             if (msgCount > 1 && in.current() instanceof ByteBuf) {
425                 writeSpinCount -= doWriteMultiple(in);
426             } else if (msgCount == 0) {
427                 // Wrote all messages.
428                 clearFlag(Native.EPOLLOUT);
429                 // Return here so we not set the EPOLLOUT flag.
430                 return;
431             } else {  // msgCount == 1
432                 writeSpinCount -= doWriteSingle(in);
433             }
434 
435             // We do not break the loop here even if the outbound buffer was flushed completely,
436             // because a user might have triggered another write and flush when we notify his or her
437             // listeners.
438         } while (writeSpinCount > 0);
439 
440         if (writeSpinCount == 0) {
441             // It is possible that we have set EPOLLOUT, woken up by EPOLL because the socket is writable, and then use
442             // our write quantum. In this case we no longer want to set the EPOLLOUT flag because the socket is still
443             // writable (as far as we know). We will find out next time we attempt to write if the socket is writable
444             // and set the EPOLLOUT if necessary.
445             clearFlag(Native.EPOLLOUT);
446 
447             // We used our writeSpin quantum, and should try to write again later.
448             eventLoop().execute(flushTask);
449         } else {
450             // Underlying descriptor can not accept all data currently, so set the EPOLLOUT flag to be woken up
451             // when it can accept more data.
452             setFlag(Native.EPOLLOUT);
453         }
454     }
455 
456     /**
457      * Attempt to write a single object.
458      * @param in the collection which contains objects to write.
459      * @return The value that should be decremented from the write quantum which starts at
460      * {@link ChannelConfig#getWriteSpinCount()}. The typical use cases are as follows:
461      * <ul>
462      *     <li>0 - if no write was attempted. This is appropriate if an empty {@link ByteBuf} (or other empty content)
463      *     is encountered</li>
464      *     <li>1 - if a single call to write data was made to the OS</li>
465      *     <li>{@link ChannelUtils#WRITE_STATUS_SNDBUF_FULL} - if an attempt to write data was made to the OS, but
466      *     no data was accepted</li>
467      * </ul>
468      * @throws Exception If an I/O error occurs.
469      */
470     protected int doWriteSingle(ChannelOutboundBuffer in) throws Exception {
471         // The outbound buffer contains only one message or it contains a file region.
472         Object msg = in.current();
473         if (msg instanceof ByteBuf) {
474             return writeBytes(in, (ByteBuf) msg);
475         } else if (msg instanceof DefaultFileRegion) {
476             return writeDefaultFileRegion(in, (DefaultFileRegion) msg);
477         } else if (msg instanceof FileRegion) {
478             return writeFileRegion(in, (FileRegion) msg);
479         } else if (msg instanceof SpliceOutTask) {
480             if (!((SpliceOutTask) msg).spliceOut()) {
481                 return WRITE_STATUS_SNDBUF_FULL;
482             }
483             in.remove();
484             return 1;
485         } else {
486             // Should never reach here.
487             throw new Error();
488         }
489     }
490 
491     /**
492      * Attempt to write multiple {@link ByteBuf} objects.
493      * @param in the collection which contains objects to write.
494      * @return The value that should be decremented from the write quantum which starts at
495      * {@link ChannelConfig#getWriteSpinCount()}. The typical use cases are as follows:
496      * <ul>
497      *     <li>0 - if no write was attempted. This is appropriate if an empty {@link ByteBuf} (or other empty content)
498      *     is encountered</li>
499      *     <li>1 - if a single call to write data was made to the OS</li>
500      *     <li>{@link ChannelUtils#WRITE_STATUS_SNDBUF_FULL} - if an attempt to write data was made to the OS, but
501      *     no data was accepted</li>
502      * </ul>
503      * @throws Exception If an I/O error occurs.
504      */
505     private int doWriteMultiple(ChannelOutboundBuffer in) throws Exception {
506         final long maxBytesPerGatheringWrite = config().getMaxBytesPerGatheringWrite();
507         IovArray array = ((EpollEventLoop) eventLoop()).cleanIovArray();
508         array.maxBytes(maxBytesPerGatheringWrite);
509         in.forEachFlushedMessage(array);
510 
511         if (array.count() >= 1) {
512             // TODO: Handle the case where cnt == 1 specially.
513             return writeBytesMultiple(in, array);
514         }
515         // cnt == 0, which means the outbound buffer contained empty buffers only.
516         in.removeBytes(0);
517         return 0;
518     }
519 
520     @Override
521     protected Object filterOutboundMessage(Object msg) {
522         if (msg instanceof ByteBuf) {
523             ByteBuf buf = (ByteBuf) msg;
524             return UnixChannelUtil.isBufferCopyNeededForWrite(buf)? newDirectBuffer(buf): buf;
525         }
526 
527         if (msg instanceof FileRegion || msg instanceof SpliceOutTask) {
528             return msg;
529         }
530 
531         throw new UnsupportedOperationException(
532                 "unsupported message type: " + StringUtil.simpleClassName(msg) + EXPECTED_TYPES);
533     }
534 
535     @UnstableApi
536     @Override
537     protected final void doShutdownOutput() throws Exception {
538         socket.shutdown(false, true);
539     }
540 
541     private void shutdownInput0(final ChannelPromise promise) {
542         try {
543             socket.shutdown(true, false);
544             promise.setSuccess();
545         } catch (Throwable cause) {
546             promise.setFailure(cause);
547         }
548     }
549 
550     @Override
551     public boolean isOutputShutdown() {
552         return socket.isOutputShutdown();
553     }
554 
555     @Override
556     public boolean isInputShutdown() {
557         return socket.isInputShutdown();
558     }
559 
560     @Override
561     public boolean isShutdown() {
562         return socket.isShutdown();
563     }
564 
565     @Override
566     public ChannelFuture shutdownOutput() {
567         return shutdownOutput(newPromise());
568     }
569 
570     @Override
571     public ChannelFuture shutdownOutput(final ChannelPromise promise) {
572         EventLoop loop = eventLoop();
573         if (loop.inEventLoop()) {
574             ((AbstractUnsafe) unsafe()).shutdownOutput(promise);
575         } else {
576             loop.execute(new Runnable() {
577                 @Override
578                 public void run() {
579                     ((AbstractUnsafe) unsafe()).shutdownOutput(promise);
580                 }
581             });
582         }
583 
584         return promise;
585     }
586 
587     @Override
588     public ChannelFuture shutdownInput() {
589         return shutdownInput(newPromise());
590     }
591 
592     @Override
593     public ChannelFuture shutdownInput(final ChannelPromise promise) {
594         Executor closeExecutor = ((EpollStreamUnsafe) unsafe()).prepareToClose();
595         if (closeExecutor != null) {
596             closeExecutor.execute(new Runnable() {
597                 @Override
598                 public void run() {
599                     shutdownInput0(promise);
600                 }
601             });
602         } else {
603             EventLoop loop = eventLoop();
604             if (loop.inEventLoop()) {
605                 shutdownInput0(promise);
606             } else {
607                 loop.execute(new Runnable() {
608                     @Override
609                     public void run() {
610                         shutdownInput0(promise);
611                     }
612                 });
613             }
614         }
615         return promise;
616     }
617 
618     @Override
619     public ChannelFuture shutdown() {
620         return shutdown(newPromise());
621     }
622 
623     @Override
624     public ChannelFuture shutdown(final ChannelPromise promise) {
625         ChannelFuture shutdownOutputFuture = shutdownOutput();
626         if (shutdownOutputFuture.isDone()) {
627             shutdownOutputDone(shutdownOutputFuture, promise);
628         } else {
629             shutdownOutputFuture.addListener(new ChannelFutureListener() {
630                 @Override
631                 public void operationComplete(final ChannelFuture shutdownOutputFuture) throws Exception {
632                     shutdownOutputDone(shutdownOutputFuture, promise);
633                 }
634             });
635         }
636         return promise;
637     }
638 
639     private void shutdownOutputDone(final ChannelFuture shutdownOutputFuture, final ChannelPromise promise) {
640         ChannelFuture shutdownInputFuture = shutdownInput();
641         if (shutdownInputFuture.isDone()) {
642             shutdownDone(shutdownOutputFuture, shutdownInputFuture, promise);
643         } else {
644             shutdownInputFuture.addListener(new ChannelFutureListener() {
645                 @Override
646                 public void operationComplete(ChannelFuture shutdownInputFuture) throws Exception {
647                     shutdownDone(shutdownOutputFuture, shutdownInputFuture, promise);
648                 }
649             });
650         }
651     }
652 
653     private static void shutdownDone(ChannelFuture shutdownOutputFuture,
654                               ChannelFuture shutdownInputFuture,
655                               ChannelPromise promise) {
656         Throwable shutdownOutputCause = shutdownOutputFuture.cause();
657         Throwable shutdownInputCause = shutdownInputFuture.cause();
658         if (shutdownOutputCause != null) {
659             if (shutdownInputCause != null) {
660                 logger.debug("Exception suppressed because a previous exception occurred.",
661                         shutdownInputCause);
662             }
663             promise.setFailure(shutdownOutputCause);
664         } else if (shutdownInputCause != null) {
665             promise.setFailure(shutdownInputCause);
666         } else {
667             promise.setSuccess();
668         }
669     }
670 
671     @Override
672     protected void doClose() throws Exception {
673         try {
674             // Calling super.doClose() first so spliceTo(...) will fail on next call.
675             super.doClose();
676         } finally {
677             safeClosePipe(pipeIn);
678             safeClosePipe(pipeOut);
679             clearSpliceQueue(null);
680         }
681     }
682 
683     private void clearSpliceQueue(ClosedChannelException exception) {
684         Queue<SpliceInTask> sQueue = spliceQueue;
685         if (sQueue == null) {
686             return;
687         }
688         for (;;) {
689             SpliceInTask task = sQueue.poll();
690             if (task == null) {
691                 break;
692             }
693             if (exception == null) {
694                 exception = new ClosedChannelException();
695             }
696             task.promise.tryFailure(exception);
697         }
698     }
699 
700     private static void safeClosePipe(FileDescriptor fd) {
701         if (fd != null) {
702             try {
703                 fd.close();
704             } catch (IOException e) {
705                 logger.warn("Error while closing a pipe", e);
706             }
707         }
708     }
709 
710     class EpollStreamUnsafe extends AbstractEpollUnsafe {
711         // Overridden here just to be able to access this method from AbstractEpollStreamChannel
712         @Override
713         protected Executor prepareToClose() {
714             return super.prepareToClose();
715         }
716 
717         private void handleReadException(ChannelPipeline pipeline, ByteBuf byteBuf, Throwable cause, boolean close,
718                 EpollRecvByteAllocatorHandle allocHandle) {
719             if (byteBuf != null) {
720                 if (byteBuf.isReadable()) {
721                     readPending = false;
722                     pipeline.fireChannelRead(byteBuf);
723                 } else {
724                     byteBuf.release();
725                 }
726             }
727             allocHandle.readComplete();
728             pipeline.fireChannelReadComplete();
729             pipeline.fireExceptionCaught(cause);
730 
731             // If oom will close the read event, release connection.
732             // See https://github.com/netty/netty/issues/10434
733             if (close || cause instanceof OutOfMemoryError || cause instanceof IOException) {
734                 shutdownInput(false);
735             }
736         }
737 
738         @Override
739         EpollRecvByteAllocatorHandle newEpollHandle(RecvByteBufAllocator.ExtendedHandle handle) {
740             return new EpollRecvByteAllocatorStreamingHandle(handle);
741         }
742 
743         @Override
744         void epollInReady() {
745             final ChannelConfig config = config();
746             if (shouldBreakEpollInReady(config)) {
747                 clearEpollIn0();
748                 return;
749             }
750             final EpollRecvByteAllocatorHandle allocHandle = recvBufAllocHandle();
751             allocHandle.edgeTriggered(isFlagSet(Native.EPOLLET));
752 
753             final ChannelPipeline pipeline = pipeline();
754             final ByteBufAllocator allocator = config.getAllocator();
755             allocHandle.reset(config);
756             epollInBefore();
757 
758             ByteBuf byteBuf = null;
759             boolean close = false;
760             Queue<SpliceInTask> sQueue = null;
761             try {
762                 do {
763                     if (sQueue != null || (sQueue = spliceQueue) != null) {
764                         SpliceInTask spliceTask = sQueue.peek();
765                         if (spliceTask != null) {
766                             boolean spliceInResult = spliceTask.spliceIn(allocHandle);
767 
768                             if (allocHandle.isReceivedRdHup()) {
769                                 shutdownInput(true);
770                             }
771                             if (spliceInResult) {
772                                 // We need to check if it is still active as if not we removed all SpliceTasks in
773                                 // doClose(...)
774                                 if (isActive()) {
775                                     sQueue.remove();
776                                 }
777                                 continue;
778                             } else {
779                                 break;
780                             }
781                         }
782                     }
783 
784                     // we use a direct buffer here as the native implementations only be able
785                     // to handle direct buffers.
786                     byteBuf = allocHandle.allocate(allocator);
787                     allocHandle.lastBytesRead(doReadBytes(byteBuf));
788                     if (allocHandle.lastBytesRead() <= 0) {
789                         // nothing was read, release the buffer.
790                         byteBuf.release();
791                         byteBuf = null;
792                         close = allocHandle.lastBytesRead() < 0;
793                         if (close) {
794                             // There is nothing left to read as we received an EOF.
795                             readPending = false;
796                         }
797                         break;
798                     }
799                     allocHandle.incMessagesRead(1);
800                     readPending = false;
801                     pipeline.fireChannelRead(byteBuf);
802                     byteBuf = null;
803 
804                     if (shouldBreakEpollInReady(config)) {
805                         // We need to do this for two reasons:
806                         //
807                         // - If the input was shutdown in between (which may be the case when the user did it in the
808                         //   fireChannelRead(...) method we should not try to read again to not produce any
809                         //   miss-leading exceptions.
810                         //
811                         // - If the user closes the channel we need to ensure we not try to read from it again as
812                         //   the filedescriptor may be re-used already by the OS if the system is handling a lot of
813                         //   concurrent connections and so needs a lot of filedescriptors. If not do this we risk
814                         //   reading data from a filedescriptor that belongs to another socket then the socket that
815                         //   was "wrapped" by this Channel implementation.
816                         break;
817                     }
818                 } while (allocHandle.continueReading());
819 
820                 allocHandle.readComplete();
821                 pipeline.fireChannelReadComplete();
822 
823                 if (close) {
824                     shutdownInput(false);
825                 }
826             } catch (Throwable t) {
827                 handleReadException(pipeline, byteBuf, t, close, allocHandle);
828             } finally {
829                 if (sQueue == null) {
830                     epollInFinally(config);
831                 } else {
832                     if (!config.isAutoRead()) {
833                         clearEpollIn();
834                     }
835                 }
836             }
837         }
838     }
839 
840     private void addToSpliceQueue(final SpliceInTask task) {
841         Queue<SpliceInTask> sQueue = spliceQueue;
842         if (sQueue == null) {
843             synchronized (this) {
844                 sQueue = spliceQueue;
845                 if (sQueue == null) {
846                     spliceQueue = sQueue = PlatformDependent.newMpscQueue();
847                 }
848             }
849         }
850         sQueue.add(task);
851     }
852 
853     protected abstract class SpliceInTask {
854         final ChannelPromise promise;
855         int len;
856 
857         protected SpliceInTask(int len, ChannelPromise promise) {
858             this.promise = promise;
859             this.len = len;
860         }
861 
862         abstract boolean spliceIn(RecvByteBufAllocator.Handle handle);
863 
864         protected final int spliceIn(FileDescriptor pipeOut, RecvByteBufAllocator.Handle handle) throws IOException {
865             // calculate the maximum amount of data we are allowed to splice
866             int length = Math.min(handle.guess(), len);
867             int splicedIn = 0;
868             for (;;) {
869                 // Splicing until there is nothing left to splice.
870                 int localSplicedIn = Native.splice(socket.intValue(), -1, pipeOut.intValue(), -1, length);
871                 handle.lastBytesRead(localSplicedIn);
872                 if (localSplicedIn == 0) {
873                     break;
874                 }
875                 splicedIn += localSplicedIn;
876                 length -= localSplicedIn;
877             }
878 
879             return splicedIn;
880         }
881     }
882 
883     // Let it directly implement channelFutureListener as well to reduce object creation.
884     private final class SpliceInChannelTask extends SpliceInTask implements ChannelFutureListener {
885         private final AbstractEpollStreamChannel ch;
886 
887         SpliceInChannelTask(AbstractEpollStreamChannel ch, int len, ChannelPromise promise) {
888             super(len, promise);
889             this.ch = ch;
890         }
891 
892         @Override
893         public void operationComplete(ChannelFuture future) throws Exception {
894             if (!future.isSuccess()) {
895                 // Use tryFailure(...) as the promise might already be closed by spliceTo(...)
896                 promise.tryFailure(future.cause());
897             }
898         }
899 
900         @Override
901         public boolean spliceIn(RecvByteBufAllocator.Handle handle) {
902             assert ch.eventLoop().inEventLoop();
903             if (len == 0) {
904                 // Use trySuccess() as the promise might already be closed by spliceTo(...)
905                 promise.trySuccess();
906                 return true;
907             }
908             try {
909                 // We create the pipe on the target channel as this will allow us to just handle pending writes
910                 // later in a correct fashion without get into any ordering issues when spliceTo(...) is called
911                 // on multiple Channels pointing to one target Channel.
912                 FileDescriptor pipeOut = ch.pipeOut;
913                 if (pipeOut == null) {
914                     // Create a new pipe as non was created before.
915                     FileDescriptor[] pipe = pipe();
916                     ch.pipeIn = pipe[0];
917                     pipeOut = ch.pipeOut = pipe[1];
918                 }
919 
920                 int splicedIn = spliceIn(pipeOut, handle);
921                 if (splicedIn > 0) {
922                     // Integer.MAX_VALUE is a special value which will result in splice forever.
923                     if (len != Integer.MAX_VALUE) {
924                         len -= splicedIn;
925                     }
926 
927                     // Depending on if we are done with splicing inbound data we set the right promise for the
928                     // outbound splicing.
929                     final ChannelPromise splicePromise;
930                     if (len == 0) {
931                         splicePromise = promise;
932                     } else {
933                         splicePromise = ch.newPromise().addListener(this);
934                     }
935 
936                     boolean autoRead = config().isAutoRead();
937 
938                     // Just call unsafe().write(...) and flush() as we not want to traverse the whole pipeline for this
939                     // case.
940                     ch.unsafe().write(new SpliceOutTask(ch, splicedIn, autoRead), splicePromise);
941                     ch.unsafe().flush();
942                     if (autoRead && !splicePromise.isDone()) {
943                         // Write was not done which means the target channel was not writable. In this case we need to
944                         // disable reading until we are done with splicing to the target channel because:
945                         //
946                         // - The user may want to to trigger another splice operation once the splicing was complete.
947                         config().setAutoRead(false);
948                     }
949                 }
950 
951                 return len == 0;
952             } catch (Throwable cause) {
953                 // Use tryFailure(...) as the promise might already be closed by spliceTo(...)
954                 promise.tryFailure(cause);
955                 return true;
956             }
957         }
958     }
959 
960     private final class SpliceOutTask {
961         private final AbstractEpollStreamChannel ch;
962         private final boolean autoRead;
963         private int len;
964 
965         SpliceOutTask(AbstractEpollStreamChannel ch, int len, boolean autoRead) {
966             this.ch = ch;
967             this.len = len;
968             this.autoRead = autoRead;
969         }
970 
971         public boolean spliceOut() throws Exception {
972             assert ch.eventLoop().inEventLoop();
973             try {
974                 int splicedOut = Native.splice(ch.pipeIn.intValue(), -1, ch.socket.intValue(), -1, len);
975                 len -= splicedOut;
976                 if (len == 0) {
977                     if (autoRead) {
978                         // AutoRead was used and we spliced everything so start reading again
979                         config().setAutoRead(true);
980                     }
981                     return true;
982                 }
983                 return false;
984             } catch (IOException e) {
985                 if (autoRead) {
986                     // AutoRead was used and we spliced everything so start reading again
987                     config().setAutoRead(true);
988                 }
989                 throw e;
990             }
991         }
992     }
993 
994     private final class SpliceFdTask extends SpliceInTask {
995         private final FileDescriptor fd;
996         private final ChannelPromise promise;
997         private int offset;
998 
999         SpliceFdTask(FileDescriptor fd, int offset, int len, ChannelPromise promise) {
1000             super(len, promise);
1001             this.fd = fd;
1002             this.promise = promise;
1003             this.offset = offset;
1004         }
1005 
1006         @Override
1007         public boolean spliceIn(RecvByteBufAllocator.Handle handle) {
1008             assert eventLoop().inEventLoop();
1009             if (len == 0) {
1010                 // Use trySuccess() as the promise might already be failed by spliceTo(...)
1011                 promise.trySuccess();
1012                 return true;
1013             }
1014 
1015             try {
1016                 FileDescriptor[] pipe = pipe();
1017                 FileDescriptor pipeIn = pipe[0];
1018                 FileDescriptor pipeOut = pipe[1];
1019                 try {
1020                     int splicedIn = spliceIn(pipeOut, handle);
1021                     if (splicedIn > 0) {
1022                         // Integer.MAX_VALUE is a special value which will result in splice forever.
1023                         if (len != Integer.MAX_VALUE) {
1024                             len -= splicedIn;
1025                         }
1026                         do {
1027                             int splicedOut = Native.splice(pipeIn.intValue(), -1, fd.intValue(), offset, splicedIn);
1028                             offset += splicedOut;
1029                             splicedIn -= splicedOut;
1030                         } while (splicedIn > 0);
1031                         if (len == 0) {
1032                             // Use trySuccess() as the promise might already be failed by spliceTo(...)
1033                             promise.trySuccess();
1034                             return true;
1035                         }
1036                     }
1037                     return false;
1038                 } finally {
1039                     safeClosePipe(pipeIn);
1040                     safeClosePipe(pipeOut);
1041                 }
1042             } catch (Throwable cause) {
1043                 // Use tryFailure(...) as the promise might already be failed by spliceTo(...)
1044                 promise.tryFailure(cause);
1045                 return true;
1046             }
1047         }
1048     }
1049 
1050     private final class EpollSocketWritableByteChannel extends SocketWritableByteChannel {
1051         EpollSocketWritableByteChannel() {
1052             super(socket);
1053             assert fd == socket;
1054         }
1055 
1056         @Override
1057         protected int write(final ByteBuffer buf, final int pos, final int limit) throws IOException {
1058             return socket.send(buf, pos, limit);
1059         }
1060 
1061         @Override
1062         protected ByteBufAllocator alloc() {
1063             return AbstractEpollStreamChannel.this.alloc();
1064         }
1065     }
1066 }