1 /*
2 * Copyright 2015 The Netty Project
3 *
4 * The Netty Project licenses this file to you under the Apache License,
5 * version 2.0 (the "License"); you may not use this file except in compliance
6 * with the License. You may obtain a copy of the License at:
7 *
8 * https://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12 * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13 * License for the specific language governing permissions and limitations
14 * under the License.
15 */
16 package io.netty.channel.epoll;
17
18 import io.netty.buffer.ByteBuf;
19 import io.netty.buffer.ByteBufAllocator;
20 import io.netty.channel.Channel;
21 import io.netty.channel.ChannelConfig;
22 import io.netty.channel.ChannelFuture;
23 import io.netty.channel.ChannelFutureListener;
24 import io.netty.channel.ChannelMetadata;
25 import io.netty.channel.ChannelOutboundBuffer;
26 import io.netty.channel.ChannelPipeline;
27 import io.netty.channel.ChannelPromise;
28 import io.netty.channel.DefaultFileRegion;
29 import io.netty.channel.EventLoop;
30 import io.netty.channel.FileRegion;
31 import io.netty.channel.RecvByteBufAllocator;
32 import io.netty.channel.internal.ChannelUtils;
33 import io.netty.channel.socket.DuplexChannel;
34 import io.netty.channel.unix.FileDescriptor;
35 import io.netty.channel.unix.IovArray;
36 import io.netty.channel.unix.SocketWritableByteChannel;
37 import io.netty.channel.unix.UnixChannelUtil;
38 import io.netty.util.internal.PlatformDependent;
39 import io.netty.util.internal.StringUtil;
40 import io.netty.util.internal.UnstableApi;
41 import io.netty.util.internal.logging.InternalLogger;
42 import io.netty.util.internal.logging.InternalLoggerFactory;
43
44 import java.io.IOException;
45 import java.net.SocketAddress;
46 import java.nio.ByteBuffer;
47 import java.nio.channels.ClosedChannelException;
48 import java.nio.channels.WritableByteChannel;
49 import java.util.Queue;
50 import java.util.concurrent.Executor;
51
52 import static io.netty.channel.internal.ChannelUtils.MAX_BYTES_PER_GATHERING_WRITE_ATTEMPTED_LOW_THRESHOLD;
53 import static io.netty.channel.internal.ChannelUtils.WRITE_STATUS_SNDBUF_FULL;
54 import static io.netty.channel.unix.FileDescriptor.pipe;
55 import static io.netty.util.internal.ObjectUtil.checkNotNull;
56 import static io.netty.util.internal.ObjectUtil.checkPositiveOrZero;
57
58 public abstract class AbstractEpollStreamChannel extends AbstractEpollChannel implements DuplexChannel {
// NOTE(review): the arguments are presumably (hasDisconnect = false, defaultMaxMessagesPerRead = 16) -- confirm
// against ChannelMetadata.
private static final ChannelMetadata METADATA = new ChannelMetadata(false, 16);
// Suffix appended to the UnsupportedOperationException message raised by filterOutboundMessage(...).
private static final String EXPECTED_TYPES =
        " (expected: " + StringUtil.simpleClassName(ByteBuf.class) + ", " +
                StringUtil.simpleClassName(DefaultFileRegion.class) + ')';
private static final InternalLogger logger = InternalLoggerFactory.getInstance(AbstractEpollStreamChannel.class);

// Scheduled by doWrite(...) when the write-spin quantum is used up, so writing resumes later.
private final Runnable flushTask = new Runnable() {
    @Override
    public void run() {
        // Calling flush0 directly ensures we do not try to flush messages that were added via write(...) in the
        // meantime.
        ((AbstractEpollUnsafe) unsafe()).flush0();
    }
};

// Lazily initialized only if splice(...) is used.
// Pending splice-in tasks. Created on first use by addToSpliceQueue(...). It is volatile because that method
// reads it outside the synchronized block.
private volatile Queue<SpliceInTask> spliceQueue;
// Pipe ends used when another channel splices into this one. SpliceInChannelTask creates them on the
// target channel, and doClose() closes them.
private FileDescriptor pipeIn;
private FileDescriptor pipeOut;

// Adapter created lazily by writeFileRegion(...) to transfer non-default FileRegion implementations.
private WritableByteChannel byteChannel;
80
/**
 * Wraps the raw file descriptor {@code fd} in a {@link LinuxSocket} owned by {@code parent}.
 */
protected AbstractEpollStreamChannel(Channel parent, int fd) {
    this(parent, new LinuxSocket(fd));
}

/**
 * Wraps the raw file descriptor {@code fd} in a parentless {@link LinuxSocket}.
 */
protected AbstractEpollStreamChannel(int fd) {
    this(new LinuxSocket(fd));
}

/**
 * Creates a parentless channel whose active state is derived from the socket's pending error status.
 */
AbstractEpollStreamChannel(LinuxSocket fd) {
    this(fd, isSoErrorZero(fd));
}

/**
 * Creates a channel for an already-active socket owned by {@code parent}.
 */
AbstractEpollStreamChannel(Channel parent, LinuxSocket fd) {
    super(parent, fd, true);
    // Register interest in EPOLLRDHUP to learn when the remote peer closes its side of the connection.
    flags |= Native.EPOLLRDHUP;
}

/**
 * Creates a channel owned by {@code parent} that is connected to {@code remote}.
 */
protected AbstractEpollStreamChannel(Channel parent, LinuxSocket fd, SocketAddress remote) {
    super(parent, fd, remote);
    // Register interest in EPOLLRDHUP to learn when the remote peer closes its side of the connection.
    flags |= Native.EPOLLRDHUP;
}

/**
 * Creates a parentless channel with an explicitly supplied active state.
 */
protected AbstractEpollStreamChannel(LinuxSocket fd, boolean active) {
    super(null, fd, active);
    // Register interest in EPOLLRDHUP to learn when the remote peer closes its side of the connection.
    flags |= Native.EPOLLRDHUP;
}
110
111 @Override
112 protected AbstractEpollUnsafe newUnsafe() {
113 return new EpollStreamUnsafe();
114 }
115
116 @Override
117 public ChannelMetadata metadata() {
118 return METADATA;
119 }
120
121 /**
122 * Splice from this {@link AbstractEpollStreamChannel} to another {@link AbstractEpollStreamChannel}.
123 * The {@code len} is the number of bytes to splice. If using {@link Integer#MAX_VALUE} it will
124 * splice until the {@link ChannelFuture} was canceled or it was failed.
125 *
126 * Please note:
127 * <ul>
128 * <li>both channels need to be registered to the same {@link EventLoop}, otherwise an
129 * {@link IllegalArgumentException} is thrown. </li>
130 * <li>{@link EpollChannelConfig#getEpollMode()} must be {@link EpollMode#LEVEL_TRIGGERED} for this and the
131 * target {@link AbstractEpollStreamChannel}</li>
132 * </ul>
133 *
134 */
135 public final ChannelFuture spliceTo(final AbstractEpollStreamChannel ch, final int len) {
136 return spliceTo(ch, len, newPromise());
137 }
138
139 /**
140 * Splice from this {@link AbstractEpollStreamChannel} to another {@link AbstractEpollStreamChannel}.
141 * The {@code len} is the number of bytes to splice. If using {@link Integer#MAX_VALUE} it will
142 * splice until the {@link ChannelFuture} was canceled or it was failed.
143 *
144 * Please note:
145 * <ul>
146 * <li>both channels need to be registered to the same {@link EventLoop}, otherwise an
147 * {@link IllegalArgumentException} is thrown. </li>
148 * <li>{@link EpollChannelConfig#getEpollMode()} must be {@link EpollMode#LEVEL_TRIGGERED} for this and the
149 * target {@link AbstractEpollStreamChannel}</li>
150 * </ul>
151 *
152 */
153 public final ChannelFuture spliceTo(final AbstractEpollStreamChannel ch, final int len,
154 final ChannelPromise promise) {
155 if (ch.eventLoop() != eventLoop()) {
156 throw new IllegalArgumentException("EventLoops are not the same.");
157 }
158 checkPositiveOrZero(len, "len");
159 if (ch.config().getEpollMode() != EpollMode.LEVEL_TRIGGERED
160 || config().getEpollMode() != EpollMode.LEVEL_TRIGGERED) {
161 throw new IllegalStateException("spliceTo() supported only when using " + EpollMode.LEVEL_TRIGGERED);
162 }
163 checkNotNull(promise, "promise");
164 if (!isOpen()) {
165 promise.tryFailure(new ClosedChannelException());
166 } else {
167 addToSpliceQueue(new SpliceInChannelTask(ch, len, promise));
168 failSpliceIfClosed(promise);
169 }
170 return promise;
171 }
172
173 /**
174 * Splice from this {@link AbstractEpollStreamChannel} to another {@link FileDescriptor}.
175 * The {@code offset} is the offset for the {@link FileDescriptor} and {@code len} is the
176 * number of bytes to splice. If using {@link Integer#MAX_VALUE} it will splice until the
177 * {@link ChannelFuture} was canceled or it was failed.
178 *
179 * Please note:
180 * <ul>
181 * <li>{@link EpollChannelConfig#getEpollMode()} must be {@link EpollMode#LEVEL_TRIGGERED} for this
182 * {@link AbstractEpollStreamChannel}</li>
183 * <li>the {@link FileDescriptor} will not be closed after the {@link ChannelFuture} is notified</li>
184 * <li>this channel must be registered to an event loop or {@link IllegalStateException} will be thrown.</li>
185 * </ul>
186 */
187 public final ChannelFuture spliceTo(final FileDescriptor ch, final int offset, final int len) {
188 return spliceTo(ch, offset, len, newPromise());
189 }
190
191 /**
192 * Splice from this {@link AbstractEpollStreamChannel} to another {@link FileDescriptor}.
193 * The {@code offset} is the offset for the {@link FileDescriptor} and {@code len} is the
194 * number of bytes to splice. If using {@link Integer#MAX_VALUE} it will splice until the
195 * {@link ChannelFuture} was canceled or it was failed.
196 *
197 * Please note:
198 * <ul>
199 * <li>{@link EpollChannelConfig#getEpollMode()} must be {@link EpollMode#LEVEL_TRIGGERED} for this
200 * {@link AbstractEpollStreamChannel}</li>
201 * <li>the {@link FileDescriptor} will not be closed after the {@link ChannelPromise} is notified</li>
202 * <li>this channel must be registered to an event loop or {@link IllegalStateException} will be thrown.</li>
203 * </ul>
204 */
205 public final ChannelFuture spliceTo(final FileDescriptor ch, final int offset, final int len,
206 final ChannelPromise promise) {
207 checkPositiveOrZero(len, "len");
208 checkPositiveOrZero(offset, "offset");
209 if (config().getEpollMode() != EpollMode.LEVEL_TRIGGERED) {
210 throw new IllegalStateException("spliceTo() supported only when using " + EpollMode.LEVEL_TRIGGERED);
211 }
212 checkNotNull(promise, "promise");
213 if (!isOpen()) {
214 promise.tryFailure(new ClosedChannelException());
215 } else {
216 addToSpliceQueue(new SpliceFdTask(ch, offset, len, promise));
217 failSpliceIfClosed(promise);
218 }
219 return promise;
220 }
221
222 private void failSpliceIfClosed(ChannelPromise promise) {
223 if (!isOpen()) {
224 // Seems like the Channel was closed in the meantime try to fail the promise to prevent any
225 // cases where a future may not be notified otherwise.
226 if (!promise.isDone()) {
227 final ClosedChannelException ex = new ClosedChannelException();
228 if (promise.tryFailure(ex)) {
229 eventLoop().execute(new Runnable() {
230 @Override
231 public void run() {
232 // Call this via the EventLoop as it is a MPSC queue.
233 clearSpliceQueue(ex);
234 }
235 });
236 }
237 }
238 }
239 }
240
241 /**
242 * Write bytes form the given {@link ByteBuf} to the underlying {@link java.nio.channels.Channel}.
243 * @param in the collection which contains objects to write.
244 * @param buf the {@link ByteBuf} from which the bytes should be written
245 * @return The value that should be decremented from the write quantum which starts at
246 * {@link ChannelConfig#getWriteSpinCount()}. The typical use cases are as follows:
247 * <ul>
248 * <li>0 - if no write was attempted. This is appropriate if an empty {@link ByteBuf} (or other empty content)
249 * is encountered</li>
250 * <li>1 - if a single call to write data was made to the OS</li>
251 * <li>{@link ChannelUtils#WRITE_STATUS_SNDBUF_FULL} - if an attempt to write data was made to the OS, but
252 * no data was accepted</li>
253 * </ul>
254 */
255 private int writeBytes(ChannelOutboundBuffer in, ByteBuf buf) throws Exception {
256 int readableBytes = buf.readableBytes();
257 if (readableBytes == 0) {
258 in.remove();
259 return 0;
260 }
261
262 if (buf.hasMemoryAddress() || buf.nioBufferCount() == 1) {
263 return doWriteBytes(in, buf);
264 } else {
265 ByteBuffer[] nioBuffers = buf.nioBuffers();
266 return writeBytesMultiple(in, nioBuffers, nioBuffers.length, readableBytes,
267 config().getMaxBytesPerGatheringWrite());
268 }
269 }
270
271 private void adjustMaxBytesPerGatheringWrite(long attempted, long written, long oldMaxBytesPerGatheringWrite) {
272 // By default we track the SO_SNDBUF when ever it is explicitly set. However some OSes may dynamically change
273 // SO_SNDBUF (and other characteristics that determine how much data can be written at once) so we should try
274 // make a best effort to adjust as OS behavior changes.
275 if (attempted == written) {
276 if (attempted << 1 > oldMaxBytesPerGatheringWrite) {
277 config().setMaxBytesPerGatheringWrite(attempted << 1);
278 }
279 } else if (attempted > MAX_BYTES_PER_GATHERING_WRITE_ATTEMPTED_LOW_THRESHOLD && written < attempted >>> 1) {
280 config().setMaxBytesPerGatheringWrite(attempted >>> 1);
281 }
282 }
283
284 /**
285 * Write multiple bytes via {@link IovArray}.
286 * @param in the collection which contains objects to write.
287 * @param array The array which contains the content to write.
288 * @return The value that should be decremented from the write quantum which starts at
289 * {@link ChannelConfig#getWriteSpinCount()}. The typical use cases are as follows:
290 * <ul>
291 * <li>0 - if no write was attempted. This is appropriate if an empty {@link ByteBuf} (or other empty content)
292 * is encountered</li>
293 * <li>1 - if a single call to write data was made to the OS</li>
294 * <li>{@link ChannelUtils#WRITE_STATUS_SNDBUF_FULL} - if an attempt to write data was made to the OS, but
295 * no data was accepted</li>
296 * </ul>
297 * @throws IOException If an I/O exception occurs during write.
298 */
299 private int writeBytesMultiple(ChannelOutboundBuffer in, IovArray array) throws IOException {
300 final long expectedWrittenBytes = array.size();
301 assert expectedWrittenBytes != 0;
302 final int cnt = array.count();
303 assert cnt != 0;
304
305 final long localWrittenBytes = socket.writevAddresses(array.memoryAddress(0), cnt);
306 if (localWrittenBytes > 0) {
307 adjustMaxBytesPerGatheringWrite(expectedWrittenBytes, localWrittenBytes, array.maxBytes());
308 in.removeBytes(localWrittenBytes);
309 return 1;
310 }
311 return WRITE_STATUS_SNDBUF_FULL;
312 }
313
314 /**
315 * Write multiple bytes via {@link ByteBuffer} array.
316 * @param in the collection which contains objects to write.
317 * @param nioBuffers The buffers to write.
318 * @param nioBufferCnt The number of buffers to write.
319 * @param expectedWrittenBytes The number of bytes we expect to write.
320 * @param maxBytesPerGatheringWrite The maximum number of bytes we should attempt to write.
321 * @return The value that should be decremented from the write quantum which starts at
322 * {@link ChannelConfig#getWriteSpinCount()}. The typical use cases are as follows:
323 * <ul>
324 * <li>0 - if no write was attempted. This is appropriate if an empty {@link ByteBuf} (or other empty content)
325 * is encountered</li>
326 * <li>1 - if a single call to write data was made to the OS</li>
327 * <li>{@link ChannelUtils#WRITE_STATUS_SNDBUF_FULL} - if an attempt to write data was made to the OS, but
328 * no data was accepted</li>
329 * </ul>
330 * @throws IOException If an I/O exception occurs during write.
331 */
332 private int writeBytesMultiple(
333 ChannelOutboundBuffer in, ByteBuffer[] nioBuffers, int nioBufferCnt, long expectedWrittenBytes,
334 long maxBytesPerGatheringWrite) throws IOException {
335 assert expectedWrittenBytes != 0;
336 if (expectedWrittenBytes > maxBytesPerGatheringWrite) {
337 expectedWrittenBytes = maxBytesPerGatheringWrite;
338 }
339
340 final long localWrittenBytes = socket.writev(nioBuffers, 0, nioBufferCnt, expectedWrittenBytes);
341 if (localWrittenBytes > 0) {
342 adjustMaxBytesPerGatheringWrite(expectedWrittenBytes, localWrittenBytes, maxBytesPerGatheringWrite);
343 in.removeBytes(localWrittenBytes);
344 return 1;
345 }
346 return WRITE_STATUS_SNDBUF_FULL;
347 }
348
349 /**
350 * Write a {@link DefaultFileRegion}
351 * @param in the collection which contains objects to write.
352 * @param region the {@link DefaultFileRegion} from which the bytes should be written
353 * @return The value that should be decremented from the write quantum which starts at
354 * {@link ChannelConfig#getWriteSpinCount()}. The typical use cases are as follows:
355 * <ul>
356 * <li>0 - if no write was attempted. This is appropriate if an empty {@link ByteBuf} (or other empty content)
357 * is encountered</li>
358 * <li>1 - if a single call to write data was made to the OS</li>
359 * <li>{@link ChannelUtils#WRITE_STATUS_SNDBUF_FULL} - if an attempt to write data was made to the OS, but
360 * no data was accepted</li>
361 * </ul>
362 */
363 private int writeDefaultFileRegion(ChannelOutboundBuffer in, DefaultFileRegion region) throws Exception {
364 final long offset = region.transferred();
365 final long regionCount = region.count();
366 if (offset >= regionCount) {
367 in.remove();
368 return 0;
369 }
370
371 final long flushedAmount = socket.sendFile(region, region.position(), offset, regionCount - offset);
372 if (flushedAmount > 0) {
373 in.progress(flushedAmount);
374 if (region.transferred() >= regionCount) {
375 in.remove();
376 }
377 return 1;
378 } else if (flushedAmount == 0) {
379 validateFileRegion(region, offset);
380 }
381 return WRITE_STATUS_SNDBUF_FULL;
382 }
383
384 /**
385 * Write a {@link FileRegion}
386 * @param in the collection which contains objects to write.
387 * @param region the {@link FileRegion} from which the bytes should be written
388 * @return The value that should be decremented from the write quantum which starts at
389 * {@link ChannelConfig#getWriteSpinCount()}. The typical use cases are as follows:
390 * <ul>
391 * <li>0 - if no write was attempted. This is appropriate if an empty {@link ByteBuf} (or other empty content)
392 * is encountered</li>
393 * <li>1 - if a single call to write data was made to the OS</li>
394 * <li>{@link ChannelUtils#WRITE_STATUS_SNDBUF_FULL} - if an attempt to write data was made to the OS, but
395 * no data was accepted</li>
396 * </ul>
397 */
398 private int writeFileRegion(ChannelOutboundBuffer in, FileRegion region) throws Exception {
399 if (region.transferred() >= region.count()) {
400 in.remove();
401 return 0;
402 }
403
404 if (byteChannel == null) {
405 byteChannel = new EpollSocketWritableByteChannel();
406 }
407 final long flushedAmount = region.transferTo(byteChannel, region.transferred());
408 if (flushedAmount > 0) {
409 in.progress(flushedAmount);
410 if (region.transferred() >= region.count()) {
411 in.remove();
412 }
413 return 1;
414 }
415 return WRITE_STATUS_SNDBUF_FULL;
416 }
417
418 @Override
419 protected void doWrite(ChannelOutboundBuffer in) throws Exception {
420 int writeSpinCount = config().getWriteSpinCount();
421 do {
422 final int msgCount = in.size();
423 // Do gathering write if the outbound buffer entries start with more than one ByteBuf.
424 if (msgCount > 1 && in.current() instanceof ByteBuf) {
425 writeSpinCount -= doWriteMultiple(in);
426 } else if (msgCount == 0) {
427 // Wrote all messages.
428 clearFlag(Native.EPOLLOUT);
429 // Return here so we not set the EPOLLOUT flag.
430 return;
431 } else { // msgCount == 1
432 writeSpinCount -= doWriteSingle(in);
433 }
434
435 // We do not break the loop here even if the outbound buffer was flushed completely,
436 // because a user might have triggered another write and flush when we notify his or her
437 // listeners.
438 } while (writeSpinCount > 0);
439
440 if (writeSpinCount == 0) {
441 // It is possible that we have set EPOLLOUT, woken up by EPOLL because the socket is writable, and then use
442 // our write quantum. In this case we no longer want to set the EPOLLOUT flag because the socket is still
443 // writable (as far as we know). We will find out next time we attempt to write if the socket is writable
444 // and set the EPOLLOUT if necessary.
445 clearFlag(Native.EPOLLOUT);
446
447 // We used our writeSpin quantum, and should try to write again later.
448 eventLoop().execute(flushTask);
449 } else {
450 // Underlying descriptor can not accept all data currently, so set the EPOLLOUT flag to be woken up
451 // when it can accept more data.
452 setFlag(Native.EPOLLOUT);
453 }
454 }
455
456 /**
457 * Attempt to write a single object.
458 * @param in the collection which contains objects to write.
459 * @return The value that should be decremented from the write quantum which starts at
460 * {@link ChannelConfig#getWriteSpinCount()}. The typical use cases are as follows:
461 * <ul>
462 * <li>0 - if no write was attempted. This is appropriate if an empty {@link ByteBuf} (or other empty content)
463 * is encountered</li>
464 * <li>1 - if a single call to write data was made to the OS</li>
465 * <li>{@link ChannelUtils#WRITE_STATUS_SNDBUF_FULL} - if an attempt to write data was made to the OS, but
466 * no data was accepted</li>
467 * </ul>
468 * @throws Exception If an I/O error occurs.
469 */
470 protected int doWriteSingle(ChannelOutboundBuffer in) throws Exception {
471 // The outbound buffer contains only one message or it contains a file region.
472 Object msg = in.current();
473 if (msg instanceof ByteBuf) {
474 return writeBytes(in, (ByteBuf) msg);
475 } else if (msg instanceof DefaultFileRegion) {
476 return writeDefaultFileRegion(in, (DefaultFileRegion) msg);
477 } else if (msg instanceof FileRegion) {
478 return writeFileRegion(in, (FileRegion) msg);
479 } else if (msg instanceof SpliceOutTask) {
480 if (!((SpliceOutTask) msg).spliceOut()) {
481 return WRITE_STATUS_SNDBUF_FULL;
482 }
483 in.remove();
484 return 1;
485 } else {
486 // Should never reach here.
487 throw new Error();
488 }
489 }
490
491 /**
492 * Attempt to write multiple {@link ByteBuf} objects.
493 * @param in the collection which contains objects to write.
494 * @return The value that should be decremented from the write quantum which starts at
495 * {@link ChannelConfig#getWriteSpinCount()}. The typical use cases are as follows:
496 * <ul>
497 * <li>0 - if no write was attempted. This is appropriate if an empty {@link ByteBuf} (or other empty content)
498 * is encountered</li>
499 * <li>1 - if a single call to write data was made to the OS</li>
500 * <li>{@link ChannelUtils#WRITE_STATUS_SNDBUF_FULL} - if an attempt to write data was made to the OS, but
501 * no data was accepted</li>
502 * </ul>
503 * @throws Exception If an I/O error occurs.
504 */
505 private int doWriteMultiple(ChannelOutboundBuffer in) throws Exception {
506 final long maxBytesPerGatheringWrite = config().getMaxBytesPerGatheringWrite();
507 IovArray array = ((EpollEventLoop) eventLoop()).cleanIovArray();
508 array.maxBytes(maxBytesPerGatheringWrite);
509 in.forEachFlushedMessage(array);
510
511 if (array.count() >= 1) {
512 // TODO: Handle the case where cnt == 1 specially.
513 return writeBytesMultiple(in, array);
514 }
515 // cnt == 0, which means the outbound buffer contained empty buffers only.
516 in.removeBytes(0);
517 return 0;
518 }
519
520 @Override
521 protected Object filterOutboundMessage(Object msg) {
522 if (msg instanceof ByteBuf) {
523 ByteBuf buf = (ByteBuf) msg;
524 return UnixChannelUtil.isBufferCopyNeededForWrite(buf)? newDirectBuffer(buf): buf;
525 }
526
527 if (msg instanceof FileRegion || msg instanceof SpliceOutTask) {
528 return msg;
529 }
530
531 throw new UnsupportedOperationException(
532 "unsupported message type: " + StringUtil.simpleClassName(msg) + EXPECTED_TYPES);
533 }
534
535 @UnstableApi
536 @Override
537 protected final void doShutdownOutput() throws Exception {
538 socket.shutdown(false, true);
539 }
540
541 private void shutdownInput0(final ChannelPromise promise) {
542 try {
543 socket.shutdown(true, false);
544 promise.setSuccess();
545 } catch (Throwable cause) {
546 promise.setFailure(cause);
547 }
548 }
549
550 @Override
551 public boolean isOutputShutdown() {
552 return socket.isOutputShutdown();
553 }
554
555 @Override
556 public boolean isInputShutdown() {
557 return socket.isInputShutdown();
558 }
559
560 @Override
561 public boolean isShutdown() {
562 return socket.isShutdown();
563 }
564
565 @Override
566 public ChannelFuture shutdownOutput() {
567 return shutdownOutput(newPromise());
568 }
569
570 @Override
571 public ChannelFuture shutdownOutput(final ChannelPromise promise) {
572 EventLoop loop = eventLoop();
573 if (loop.inEventLoop()) {
574 ((AbstractUnsafe) unsafe()).shutdownOutput(promise);
575 } else {
576 loop.execute(new Runnable() {
577 @Override
578 public void run() {
579 ((AbstractUnsafe) unsafe()).shutdownOutput(promise);
580 }
581 });
582 }
583
584 return promise;
585 }
586
587 @Override
588 public ChannelFuture shutdownInput() {
589 return shutdownInput(newPromise());
590 }
591
592 @Override
593 public ChannelFuture shutdownInput(final ChannelPromise promise) {
594 Executor closeExecutor = ((EpollStreamUnsafe) unsafe()).prepareToClose();
595 if (closeExecutor != null) {
596 closeExecutor.execute(new Runnable() {
597 @Override
598 public void run() {
599 shutdownInput0(promise);
600 }
601 });
602 } else {
603 EventLoop loop = eventLoop();
604 if (loop.inEventLoop()) {
605 shutdownInput0(promise);
606 } else {
607 loop.execute(new Runnable() {
608 @Override
609 public void run() {
610 shutdownInput0(promise);
611 }
612 });
613 }
614 }
615 return promise;
616 }
617
618 @Override
619 public ChannelFuture shutdown() {
620 return shutdown(newPromise());
621 }
622
623 @Override
624 public ChannelFuture shutdown(final ChannelPromise promise) {
625 ChannelFuture shutdownOutputFuture = shutdownOutput();
626 if (shutdownOutputFuture.isDone()) {
627 shutdownOutputDone(shutdownOutputFuture, promise);
628 } else {
629 shutdownOutputFuture.addListener(new ChannelFutureListener() {
630 @Override
631 public void operationComplete(final ChannelFuture shutdownOutputFuture) throws Exception {
632 shutdownOutputDone(shutdownOutputFuture, promise);
633 }
634 });
635 }
636 return promise;
637 }
638
639 private void shutdownOutputDone(final ChannelFuture shutdownOutputFuture, final ChannelPromise promise) {
640 ChannelFuture shutdownInputFuture = shutdownInput();
641 if (shutdownInputFuture.isDone()) {
642 shutdownDone(shutdownOutputFuture, shutdownInputFuture, promise);
643 } else {
644 shutdownInputFuture.addListener(new ChannelFutureListener() {
645 @Override
646 public void operationComplete(ChannelFuture shutdownInputFuture) throws Exception {
647 shutdownDone(shutdownOutputFuture, shutdownInputFuture, promise);
648 }
649 });
650 }
651 }
652
653 private static void shutdownDone(ChannelFuture shutdownOutputFuture,
654 ChannelFuture shutdownInputFuture,
655 ChannelPromise promise) {
656 Throwable shutdownOutputCause = shutdownOutputFuture.cause();
657 Throwable shutdownInputCause = shutdownInputFuture.cause();
658 if (shutdownOutputCause != null) {
659 if (shutdownInputCause != null) {
660 logger.debug("Exception suppressed because a previous exception occurred.",
661 shutdownInputCause);
662 }
663 promise.setFailure(shutdownOutputCause);
664 } else if (shutdownInputCause != null) {
665 promise.setFailure(shutdownInputCause);
666 } else {
667 promise.setSuccess();
668 }
669 }
670
671 @Override
672 protected void doClose() throws Exception {
673 try {
674 // Calling super.doClose() first so spliceTo(...) will fail on next call.
675 super.doClose();
676 } finally {
677 safeClosePipe(pipeIn);
678 safeClosePipe(pipeOut);
679 clearSpliceQueue(null);
680 }
681 }
682
683 private void clearSpliceQueue(ClosedChannelException exception) {
684 Queue<SpliceInTask> sQueue = spliceQueue;
685 if (sQueue == null) {
686 return;
687 }
688 for (;;) {
689 SpliceInTask task = sQueue.poll();
690 if (task == null) {
691 break;
692 }
693 if (exception == null) {
694 exception = new ClosedChannelException();
695 }
696 task.promise.tryFailure(exception);
697 }
698 }
699
700 private static void safeClosePipe(FileDescriptor fd) {
701 if (fd != null) {
702 try {
703 fd.close();
704 } catch (IOException e) {
705 logger.warn("Error while closing a pipe", e);
706 }
707 }
708 }
709
/**
 * Unsafe for stream channels.
 * <p>
 * When the descriptor becomes readable, it either services the head of the splice-in queue or reads bytes into
 * {@link ByteBuf}s and fires them through the pipeline.
 */
class EpollStreamUnsafe extends AbstractEpollUnsafe {
    // Overridden here just to be able to access this method from AbstractEpollStreamChannel
    // (it is called from shutdownInput(ChannelPromise)).
    @Override
    protected Executor prepareToClose() {
        return super.prepareToClose();
    }

    /**
     * Handles a throwable caught by {@link #epollInReady()}.
     * <p>
     * A partially filled {@code byteBuf} is still fired through the pipeline if it has readable bytes, and
     * released otherwise. The read is then completed and {@code cause} is fired. Finally the input side is
     * shut down when {@code close} is set or the cause is an {@link OutOfMemoryError} or {@link IOException}.
     *
     * @param pipeline    the pipeline to notify
     * @param byteBuf     the buffer in use when the failure happened, or {@code null}
     * @param cause       the caught throwable
     * @param close       {@code true} if a negative read result (EOF) had already been observed
     * @param allocHandle the allocator handle of the current read loop
     */
    private void handleReadException(ChannelPipeline pipeline, ByteBuf byteBuf, Throwable cause, boolean close,
            EpollRecvByteAllocatorHandle allocHandle) {
        if (byteBuf != null) {
            if (byteBuf.isReadable()) {
                // Deliver whatever was read before the failure.
                readPending = false;
                pipeline.fireChannelRead(byteBuf);
            } else {
                byteBuf.release();
            }
        }
        allocHandle.readComplete();
        pipeline.fireChannelReadComplete();
        pipeline.fireExceptionCaught(cause);

        // If oom will close the read event, release connection.
        // See https://github.com/netty/netty/issues/10434
        if (close || cause instanceof OutOfMemoryError || cause instanceof IOException) {
            shutdownInput(false);
        }
    }

    // Stream channels use the streaming variant of the epoll receive-buffer allocator handle.
    @Override
    EpollRecvByteAllocatorHandle newEpollHandle(RecvByteBufAllocator.ExtendedHandle handle) {
        return new EpollRecvByteAllocatorStreamingHandle(handle);
    }

    /**
     * Called when the descriptor is readable.
     * <p>
     * Loops while {@code allocHandle.continueReading()} allows. In each iteration it either services the head
     * splice-in task, if one is queued, or reads one buffer and fires it.
     */
    @Override
    void epollInReady() {
        final ChannelConfig config = config();
        if (shouldBreakEpollInReady(config)) {
            clearEpollIn0();
            return;
        }
        final EpollRecvByteAllocatorHandle allocHandle = recvBufAllocHandle();
        allocHandle.edgeTriggered(isFlagSet(Native.EPOLLET));

        final ChannelPipeline pipeline = pipeline();
        final ByteBufAllocator allocator = config.getAllocator();
        allocHandle.reset(config);
        epollInBefore();

        ByteBuf byteBuf = null;
        boolean close = false;
        // Non-null once a splice queue has been observed; this also changes the cleanup in the finally block.
        Queue<SpliceInTask> sQueue = null;
        try {
            do {
                if (sQueue != null || (sQueue = spliceQueue) != null) {
                    SpliceInTask spliceTask = sQueue.peek();
                    if (spliceTask != null) {
                        // A pending splice takes priority over reading into a ByteBuf.
                        boolean spliceInResult = spliceTask.spliceIn(allocHandle);

                        if (allocHandle.isReceivedRdHup()) {
                            shutdownInput(true);
                        }
                        if (spliceInResult) {
                            // We need to check if it is still active as if not we removed all SpliceTasks in
                            // doClose(...)
                            if (isActive()) {
                                sQueue.remove();
                            }
                            // Jumps to the loop condition (allocHandle.continueReading()).
                            continue;
                        } else {
                            break;
                        }
                    }
                }

                // we use a direct buffer here as the native implementations only be able
                // to handle direct buffers.
                byteBuf = allocHandle.allocate(allocator);
                allocHandle.lastBytesRead(doReadBytes(byteBuf));
                if (allocHandle.lastBytesRead() <= 0) {
                    // nothing was read, release the buffer.
                    byteBuf.release();
                    byteBuf = null;
                    close = allocHandle.lastBytesRead() < 0;
                    if (close) {
                        // There is nothing left to read as we received an EOF.
                        readPending = false;
                    }
                    break;
                }
                allocHandle.incMessagesRead(1);
                readPending = false;
                pipeline.fireChannelRead(byteBuf);
                // Ownership passed to the pipeline; do not touch or release it in handleReadException.
                byteBuf = null;

                if (shouldBreakEpollInReady(config)) {
                    // We need to do this for two reasons:
                    //
                    // - If the input was shutdown in between (which may be the case when the user did it in the
                    // fireChannelRead(...) method we should not try to read again to not produce any
                    // miss-leading exceptions.
                    //
                    // - If the user closes the channel we need to ensure we not try to read from it again as
                    // the filedescriptor may be re-used already by the OS if the system is handling a lot of
                    // concurrent connections and so needs a lot of filedescriptors. If not do this we risk
                    // reading data from a filedescriptor that belongs to another socket then the socket that
                    // was "wrapped" by this Channel implementation.
                    break;
                }
            } while (allocHandle.continueReading());

            allocHandle.readComplete();
            pipeline.fireChannelReadComplete();

            if (close) {
                shutdownInput(false);
            }
        } catch (Throwable t) {
            handleReadException(pipeline, byteBuf, t, close, allocHandle);
        } finally {
            if (sQueue == null) {
                epollInFinally(config);
            } else {
                // A splice queue exists: only drop EPOLLIN interest when auto-read is disabled.
                if (!config.isAutoRead()) {
                    clearEpollIn();
                }
            }
        }
    }
}
839
840 private void addToSpliceQueue(final SpliceInTask task) {
841 Queue<SpliceInTask> sQueue = spliceQueue;
842 if (sQueue == null) {
843 synchronized (this) {
844 sQueue = spliceQueue;
845 if (sQueue == null) {
846 spliceQueue = sQueue = PlatformDependent.newMpscQueue();
847 }
848 }
849 }
850 sQueue.add(task);
851 }
852
853 protected abstract class SpliceInTask {
854 final ChannelPromise promise;
855 int len;
856
857 protected SpliceInTask(int len, ChannelPromise promise) {
858 this.promise = promise;
859 this.len = len;
860 }
861
862 abstract boolean spliceIn(RecvByteBufAllocator.Handle handle);
863
864 protected final int spliceIn(FileDescriptor pipeOut, RecvByteBufAllocator.Handle handle) throws IOException {
865 // calculate the maximum amount of data we are allowed to splice
866 int length = Math.min(handle.guess(), len);
867 int splicedIn = 0;
868 for (;;) {
869 // Splicing until there is nothing left to splice.
870 int localSplicedIn = Native.splice(socket.intValue(), -1, pipeOut.intValue(), -1, length);
871 handle.lastBytesRead(localSplicedIn);
872 if (localSplicedIn == 0) {
873 break;
874 }
875 splicedIn += localSplicedIn;
876 length -= localSplicedIn;
877 }
878
879 return splicedIn;
880 }
881 }
882
// Implements ChannelFutureListener directly (instead of allocating a separate listener) to reduce object creation.
884 private final class SpliceInChannelTask extends SpliceInTask implements ChannelFutureListener {
885 private final AbstractEpollStreamChannel ch;
886
887 SpliceInChannelTask(AbstractEpollStreamChannel ch, int len, ChannelPromise promise) {
888 super(len, promise);
889 this.ch = ch;
890 }
891
892 @Override
893 public void operationComplete(ChannelFuture future) throws Exception {
894 if (!future.isSuccess()) {
895 // Use tryFailure(...) as the promise might already be closed by spliceTo(...)
896 promise.tryFailure(future.cause());
897 }
898 }
899
900 @Override
901 public boolean spliceIn(RecvByteBufAllocator.Handle handle) {
902 assert ch.eventLoop().inEventLoop();
903 if (len == 0) {
904 // Use trySuccess() as the promise might already be closed by spliceTo(...)
905 promise.trySuccess();
906 return true;
907 }
908 try {
909 // We create the pipe on the target channel as this will allow us to just handle pending writes
910 // later in a correct fashion without get into any ordering issues when spliceTo(...) is called
911 // on multiple Channels pointing to one target Channel.
912 FileDescriptor pipeOut = ch.pipeOut;
913 if (pipeOut == null) {
914 // Create a new pipe as non was created before.
915 FileDescriptor[] pipe = pipe();
916 ch.pipeIn = pipe[0];
917 pipeOut = ch.pipeOut = pipe[1];
918 }
919
920 int splicedIn = spliceIn(pipeOut, handle);
921 if (splicedIn > 0) {
922 // Integer.MAX_VALUE is a special value which will result in splice forever.
923 if (len != Integer.MAX_VALUE) {
924 len -= splicedIn;
925 }
926
927 // Depending on if we are done with splicing inbound data we set the right promise for the
928 // outbound splicing.
929 final ChannelPromise splicePromise;
930 if (len == 0) {
931 splicePromise = promise;
932 } else {
933 splicePromise = ch.newPromise().addListener(this);
934 }
935
936 boolean autoRead = config().isAutoRead();
937
938 // Just call unsafe().write(...) and flush() as we not want to traverse the whole pipeline for this
939 // case.
940 ch.unsafe().write(new SpliceOutTask(ch, splicedIn, autoRead), splicePromise);
941 ch.unsafe().flush();
942 if (autoRead && !splicePromise.isDone()) {
943 // Write was not done which means the target channel was not writable. In this case we need to
944 // disable reading until we are done with splicing to the target channel because:
945 //
946 // - The user may want to to trigger another splice operation once the splicing was complete.
947 config().setAutoRead(false);
948 }
949 }
950
951 return len == 0;
952 } catch (Throwable cause) {
953 // Use tryFailure(...) as the promise might already be closed by spliceTo(...)
954 promise.tryFailure(cause);
955 return true;
956 }
957 }
958 }
959
960 private final class SpliceOutTask {
961 private final AbstractEpollStreamChannel ch;
962 private final boolean autoRead;
963 private int len;
964
965 SpliceOutTask(AbstractEpollStreamChannel ch, int len, boolean autoRead) {
966 this.ch = ch;
967 this.len = len;
968 this.autoRead = autoRead;
969 }
970
971 public boolean spliceOut() throws Exception {
972 assert ch.eventLoop().inEventLoop();
973 try {
974 int splicedOut = Native.splice(ch.pipeIn.intValue(), -1, ch.socket.intValue(), -1, len);
975 len -= splicedOut;
976 if (len == 0) {
977 if (autoRead) {
978 // AutoRead was used and we spliced everything so start reading again
979 config().setAutoRead(true);
980 }
981 return true;
982 }
983 return false;
984 } catch (IOException e) {
985 if (autoRead) {
986 // AutoRead was used and we spliced everything so start reading again
987 config().setAutoRead(true);
988 }
989 throw e;
990 }
991 }
992 }
993
994 private final class SpliceFdTask extends SpliceInTask {
995 private final FileDescriptor fd;
996 private final ChannelPromise promise;
997 private int offset;
998
999 SpliceFdTask(FileDescriptor fd, int offset, int len, ChannelPromise promise) {
1000 super(len, promise);
1001 this.fd = fd;
1002 this.promise = promise;
1003 this.offset = offset;
1004 }
1005
1006 @Override
1007 public boolean spliceIn(RecvByteBufAllocator.Handle handle) {
1008 assert eventLoop().inEventLoop();
1009 if (len == 0) {
1010 // Use trySuccess() as the promise might already be failed by spliceTo(...)
1011 promise.trySuccess();
1012 return true;
1013 }
1014
1015 try {
1016 FileDescriptor[] pipe = pipe();
1017 FileDescriptor pipeIn = pipe[0];
1018 FileDescriptor pipeOut = pipe[1];
1019 try {
1020 int splicedIn = spliceIn(pipeOut, handle);
1021 if (splicedIn > 0) {
1022 // Integer.MAX_VALUE is a special value which will result in splice forever.
1023 if (len != Integer.MAX_VALUE) {
1024 len -= splicedIn;
1025 }
1026 do {
1027 int splicedOut = Native.splice(pipeIn.intValue(), -1, fd.intValue(), offset, splicedIn);
1028 offset += splicedOut;
1029 splicedIn -= splicedOut;
1030 } while (splicedIn > 0);
1031 if (len == 0) {
1032 // Use trySuccess() as the promise might already be failed by spliceTo(...)
1033 promise.trySuccess();
1034 return true;
1035 }
1036 }
1037 return false;
1038 } finally {
1039 safeClosePipe(pipeIn);
1040 safeClosePipe(pipeOut);
1041 }
1042 } catch (Throwable cause) {
1043 // Use tryFailure(...) as the promise might already be failed by spliceTo(...)
1044 promise.tryFailure(cause);
1045 return true;
1046 }
1047 }
1048 }
1049
1050 private final class EpollSocketWritableByteChannel extends SocketWritableByteChannel {
1051 EpollSocketWritableByteChannel() {
1052 super(socket);
1053 assert fd == socket;
1054 }
1055
1056 @Override
1057 protected int write(final ByteBuffer buf, final int pos, final int limit) throws IOException {
1058 return socket.send(buf, pos, limit);
1059 }
1060
1061 @Override
1062 protected ByteBufAllocator alloc() {
1063 return AbstractEpollStreamChannel.this.alloc();
1064 }
1065 }
1066 }