1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16 package io.netty.channel.kqueue;
17
18 import io.netty.buffer.ByteBuf;
19 import io.netty.buffer.ByteBufAllocator;
20 import io.netty.channel.Channel;
21 import io.netty.channel.ChannelConfig;
22 import io.netty.channel.ChannelFuture;
23 import io.netty.channel.ChannelFutureListener;
24 import io.netty.channel.ChannelMetadata;
25 import io.netty.channel.ChannelOutboundBuffer;
26 import io.netty.channel.ChannelPipeline;
27 import io.netty.channel.ChannelPromise;
28 import io.netty.channel.DefaultFileRegion;
29 import io.netty.channel.EventLoop;
30 import io.netty.channel.FileRegion;
31 import io.netty.channel.internal.ChannelUtils;
32 import io.netty.channel.socket.DuplexChannel;
33 import io.netty.channel.unix.IovArray;
34 import io.netty.channel.unix.SocketWritableByteChannel;
35 import io.netty.channel.unix.UnixChannelUtil;
36 import io.netty.util.internal.StringUtil;
37 import io.netty.util.internal.UnstableApi;
38 import io.netty.util.internal.logging.InternalLogger;
39 import io.netty.util.internal.logging.InternalLoggerFactory;
40
41 import java.io.IOException;
42 import java.net.SocketAddress;
43 import java.nio.ByteBuffer;
44 import java.nio.channels.WritableByteChannel;
45 import java.util.concurrent.Executor;
46
47 import static io.netty.channel.internal.ChannelUtils.MAX_BYTES_PER_GATHERING_WRITE_ATTEMPTED_LOW_THRESHOLD;
48 import static io.netty.channel.internal.ChannelUtils.WRITE_STATUS_SNDBUF_FULL;
49
50 @UnstableApi
51 public abstract class AbstractKQueueStreamChannel extends AbstractKQueueChannel implements DuplexChannel {
52 private static final InternalLogger logger = InternalLoggerFactory.getInstance(AbstractKQueueStreamChannel.class);
53 private static final ChannelMetadata METADATA = new ChannelMetadata(false, 16);
54 private static final String EXPECTED_TYPES =
55 " (expected: " + StringUtil.simpleClassName(ByteBuf.class) + ", " +
56 StringUtil.simpleClassName(DefaultFileRegion.class) + ')';
57 private WritableByteChannel byteChannel;
58 private final Runnable flushTask = new Runnable() {
59 @Override
60 public void run() {
61
62
63 ((AbstractKQueueUnsafe) unsafe()).flush0();
64 }
65 };
66
/**
 * Creates a stream channel by forwarding every argument unchanged to the superclass constructor.
 *
 * @param parent the parent channel handed to the superclass (the single-argument constructor passes {@code null})
 * @param fd     the socket handed to the superclass
 * @param active flag handed to the superclass; presumably the initial "active" state -- confirm against the superclass
 */
AbstractKQueueStreamChannel(Channel parent, BsdSocket fd, boolean active) {
    super(parent, fd, active);
}

/**
 * Creates a stream channel by forwarding every argument unchanged to the superclass constructor.
 *
 * @param parent the parent channel handed to the superclass
 * @param fd     the socket handed to the superclass
 * @param remote the remote address handed to the superclass
 */
AbstractKQueueStreamChannel(Channel parent, BsdSocket fd, SocketAddress remote) {
    super(parent, fd, remote);
}

/**
 * Creates a parent-less stream channel whose {@code active} flag is the result of {@code isSoErrorZero(fd)}.
 *
 * @param fd the socket backing this channel
 */
AbstractKQueueStreamChannel(BsdSocket fd) {
    // The cast only spells out which three-argument overload is meant; the boolean argument already selects it.
    this((Channel) null, fd, isSoErrorZero(fd));
}
78
79 @Override
80 protected AbstractKQueueUnsafe newUnsafe() {
81 return new KQueueStreamUnsafe();
82 }
83
84 @Override
85 public ChannelMetadata metadata() {
86 return METADATA;
87 }
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103 private int writeBytes(ChannelOutboundBuffer in, ByteBuf buf) throws Exception {
104 int readableBytes = buf.readableBytes();
105 if (readableBytes == 0) {
106 in.remove();
107 return 0;
108 }
109
110 if (buf.hasMemoryAddress() || buf.nioBufferCount() == 1) {
111 return doWriteBytes(in, buf);
112 } else {
113 ByteBuffer[] nioBuffers = buf.nioBuffers();
114 return writeBytesMultiple(in, nioBuffers, nioBuffers.length, readableBytes,
115 config().getMaxBytesPerGatheringWrite());
116 }
117 }
118
119 private void adjustMaxBytesPerGatheringWrite(long attempted, long written, long oldMaxBytesPerGatheringWrite) {
120
121
122
123 if (attempted == written) {
124 if (attempted << 1 > oldMaxBytesPerGatheringWrite) {
125 config().setMaxBytesPerGatheringWrite(attempted << 1);
126 }
127 } else if (attempted > MAX_BYTES_PER_GATHERING_WRITE_ATTEMPTED_LOW_THRESHOLD && written < attempted >>> 1) {
128 config().setMaxBytesPerGatheringWrite(attempted >>> 1);
129 }
130 }
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147 private int writeBytesMultiple(ChannelOutboundBuffer in, IovArray array) throws IOException {
148 final long expectedWrittenBytes = array.size();
149 assert expectedWrittenBytes != 0;
150 final int cnt = array.count();
151 assert cnt != 0;
152
153 final long localWrittenBytes = socket.writevAddresses(array.memoryAddress(0), cnt);
154 if (localWrittenBytes > 0) {
155 adjustMaxBytesPerGatheringWrite(expectedWrittenBytes, localWrittenBytes, array.maxBytes());
156 in.removeBytes(localWrittenBytes);
157 return 1;
158 }
159 return WRITE_STATUS_SNDBUF_FULL;
160 }
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180 private int writeBytesMultiple(
181 ChannelOutboundBuffer in, ByteBuffer[] nioBuffers, int nioBufferCnt, long expectedWrittenBytes,
182 long maxBytesPerGatheringWrite) throws IOException {
183 assert expectedWrittenBytes != 0;
184 if (expectedWrittenBytes > maxBytesPerGatheringWrite) {
185 expectedWrittenBytes = maxBytesPerGatheringWrite;
186 }
187
188 final long localWrittenBytes = socket.writev(nioBuffers, 0, nioBufferCnt, expectedWrittenBytes);
189 if (localWrittenBytes > 0) {
190 adjustMaxBytesPerGatheringWrite(expectedWrittenBytes, localWrittenBytes, maxBytesPerGatheringWrite);
191 in.removeBytes(localWrittenBytes);
192 return 1;
193 }
194 return WRITE_STATUS_SNDBUF_FULL;
195 }
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211 private int writeDefaultFileRegion(ChannelOutboundBuffer in, DefaultFileRegion region) throws Exception {
212 final long regionCount = region.count();
213 final long offset = region.transferred();
214
215 if (offset >= regionCount) {
216 in.remove();
217 return 0;
218 }
219
220 final long flushedAmount = socket.sendFile(region, region.position(), offset, regionCount - offset);
221 if (flushedAmount > 0) {
222 in.progress(flushedAmount);
223 if (region.transferred() >= regionCount) {
224 in.remove();
225 }
226 return 1;
227 } else if (flushedAmount == 0) {
228 validateFileRegion(region, offset);
229 }
230 return WRITE_STATUS_SNDBUF_FULL;
231 }
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247 private int writeFileRegion(ChannelOutboundBuffer in, FileRegion region) throws Exception {
248 if (region.transferred() >= region.count()) {
249 in.remove();
250 return 0;
251 }
252
253 if (byteChannel == null) {
254 byteChannel = new KQueueSocketWritableByteChannel();
255 }
256 final long flushedAmount = region.transferTo(byteChannel, region.transferred());
257 if (flushedAmount > 0) {
258 in.progress(flushedAmount);
259 if (region.transferred() >= region.count()) {
260 in.remove();
261 }
262 return 1;
263 }
264 return WRITE_STATUS_SNDBUF_FULL;
265 }
266
267 @Override
268 protected void doWrite(ChannelOutboundBuffer in) throws Exception {
269 int writeSpinCount = config().getWriteSpinCount();
270 do {
271 final int msgCount = in.size();
272
273 if (msgCount > 1 && in.current() instanceof ByteBuf) {
274 writeSpinCount -= doWriteMultiple(in);
275 } else if (msgCount == 0) {
276
277 writeFilter(false);
278
279 return;
280 } else {
281 writeSpinCount -= doWriteSingle(in);
282 }
283
284
285
286
287 } while (writeSpinCount > 0);
288
289 if (writeSpinCount == 0) {
290
291
292
293
294 writeFilter(false);
295
296
297 eventLoop().execute(flushTask);
298 } else {
299
300
301 writeFilter(true);
302 }
303 }
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319 protected int doWriteSingle(ChannelOutboundBuffer in) throws Exception {
320
321 Object msg = in.current();
322 if (msg instanceof ByteBuf) {
323 return writeBytes(in, (ByteBuf) msg);
324 } else if (msg instanceof DefaultFileRegion) {
325 return writeDefaultFileRegion(in, (DefaultFileRegion) msg);
326 } else if (msg instanceof FileRegion) {
327 return writeFileRegion(in, (FileRegion) msg);
328 } else {
329
330 throw new Error();
331 }
332 }
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348 private int doWriteMultiple(ChannelOutboundBuffer in) throws Exception {
349 final long maxBytesPerGatheringWrite = config().getMaxBytesPerGatheringWrite();
350 IovArray array = ((KQueueEventLoop) eventLoop()).cleanArray();
351 array.maxBytes(maxBytesPerGatheringWrite);
352 in.forEachFlushedMessage(array);
353
354 if (array.count() >= 1) {
355
356 return writeBytesMultiple(in, array);
357 }
358
359 in.removeBytes(0);
360 return 0;
361 }
362
363 @Override
364 protected Object filterOutboundMessage(Object msg) {
365 if (msg instanceof ByteBuf) {
366 ByteBuf buf = (ByteBuf) msg;
367 return UnixChannelUtil.isBufferCopyNeededForWrite(buf)? newDirectBuffer(buf) : buf;
368 }
369
370 if (msg instanceof FileRegion) {
371 return msg;
372 }
373
374 throw new UnsupportedOperationException(
375 "unsupported message type: " + StringUtil.simpleClassName(msg) + EXPECTED_TYPES);
376 }
377
378 @UnstableApi
379 @Override
380 protected final void doShutdownOutput() throws Exception {
381 socket.shutdown(false, true);
382 }
383
384 @Override
385 public boolean isOutputShutdown() {
386 return socket.isOutputShutdown();
387 }
388
389 @Override
390 public boolean isInputShutdown() {
391 return socket.isInputShutdown();
392 }
393
394 @Override
395 public boolean isShutdown() {
396 return socket.isShutdown();
397 }
398
399 @Override
400 public ChannelFuture shutdownOutput() {
401 return shutdownOutput(newPromise());
402 }
403
404 @Override
405 public ChannelFuture shutdownOutput(final ChannelPromise promise) {
406 EventLoop loop = eventLoop();
407 if (loop.inEventLoop()) {
408 ((AbstractUnsafe) unsafe()).shutdownOutput(promise);
409 } else {
410 loop.execute(new Runnable() {
411 @Override
412 public void run() {
413 ((AbstractUnsafe) unsafe()).shutdownOutput(promise);
414 }
415 });
416 }
417 return promise;
418 }
419
420 @Override
421 public ChannelFuture shutdownInput() {
422 return shutdownInput(newPromise());
423 }
424
425 @Override
426 public ChannelFuture shutdownInput(final ChannelPromise promise) {
427 EventLoop loop = eventLoop();
428 if (loop.inEventLoop()) {
429 shutdownInput0(promise);
430 } else {
431 loop.execute(new Runnable() {
432 @Override
433 public void run() {
434 shutdownInput0(promise);
435 }
436 });
437 }
438 return promise;
439 }
440
441 private void shutdownInput0(ChannelPromise promise) {
442 try {
443 socket.shutdown(true, false);
444 } catch (Throwable cause) {
445 promise.setFailure(cause);
446 return;
447 }
448 promise.setSuccess();
449 }
450
451 @Override
452 public ChannelFuture shutdown() {
453 return shutdown(newPromise());
454 }
455
456 @Override
457 public ChannelFuture shutdown(final ChannelPromise promise) {
458 ChannelFuture shutdownOutputFuture = shutdownOutput();
459 if (shutdownOutputFuture.isDone()) {
460 shutdownOutputDone(shutdownOutputFuture, promise);
461 } else {
462 shutdownOutputFuture.addListener(new ChannelFutureListener() {
463 @Override
464 public void operationComplete(final ChannelFuture shutdownOutputFuture) throws Exception {
465 shutdownOutputDone(shutdownOutputFuture, promise);
466 }
467 });
468 }
469 return promise;
470 }
471
472 private void shutdownOutputDone(final ChannelFuture shutdownOutputFuture, final ChannelPromise promise) {
473 ChannelFuture shutdownInputFuture = shutdownInput();
474 if (shutdownInputFuture.isDone()) {
475 shutdownDone(shutdownOutputFuture, shutdownInputFuture, promise);
476 } else {
477 shutdownInputFuture.addListener(new ChannelFutureListener() {
478 @Override
479 public void operationComplete(ChannelFuture shutdownInputFuture) throws Exception {
480 shutdownDone(shutdownOutputFuture, shutdownInputFuture, promise);
481 }
482 });
483 }
484 }
485
486 private static void shutdownDone(ChannelFuture shutdownOutputFuture,
487 ChannelFuture shutdownInputFuture,
488 ChannelPromise promise) {
489 Throwable shutdownOutputCause = shutdownOutputFuture.cause();
490 Throwable shutdownInputCause = shutdownInputFuture.cause();
491 if (shutdownOutputCause != null) {
492 if (shutdownInputCause != null) {
493 logger.debug("Exception suppressed because a previous exception occurred.",
494 shutdownInputCause);
495 }
496 promise.setFailure(shutdownOutputCause);
497 } else if (shutdownInputCause != null) {
498 promise.setFailure(shutdownInputCause);
499 } else {
500 promise.setSuccess();
501 }
502 }
503
504 class KQueueStreamUnsafe extends AbstractKQueueUnsafe {
505
506 @Override
507 protected Executor prepareToClose() {
508 return super.prepareToClose();
509 }
510
511 @Override
512 void readReady(final KQueueRecvByteAllocatorHandle allocHandle) {
513 final ChannelConfig config = config();
514 if (shouldBreakReadReady(config)) {
515 clearReadFilter0();
516 return;
517 }
518 final ChannelPipeline pipeline = pipeline();
519 final ByteBufAllocator allocator = config.getAllocator();
520 allocHandle.reset(config);
521 readReadyBefore();
522
523 ByteBuf byteBuf = null;
524 boolean close = false;
525 try {
526 do {
527
528
529 byteBuf = allocHandle.allocate(allocator);
530 allocHandle.lastBytesRead(doReadBytes(byteBuf));
531 if (allocHandle.lastBytesRead() <= 0) {
532
533 byteBuf.release();
534 byteBuf = null;
535 close = allocHandle.lastBytesRead() < 0;
536 if (close) {
537
538 readPending = false;
539 }
540 break;
541 }
542 allocHandle.incMessagesRead(1);
543 readPending = false;
544 pipeline.fireChannelRead(byteBuf);
545 byteBuf = null;
546
547 if (shouldBreakReadReady(config)) {
548
549
550
551
552
553
554
555
556
557
558
559 break;
560 }
561 } while (allocHandle.continueReading());
562
563 allocHandle.readComplete();
564 pipeline.fireChannelReadComplete();
565
566 if (close) {
567 shutdownInput(false);
568 }
569 } catch (Throwable t) {
570 handleReadException(pipeline, byteBuf, t, close, allocHandle);
571 } finally {
572 readReadyFinally(config);
573 }
574 }
575
576 private void handleReadException(ChannelPipeline pipeline, ByteBuf byteBuf, Throwable cause, boolean close,
577 KQueueRecvByteAllocatorHandle allocHandle) {
578 if (byteBuf != null) {
579 if (byteBuf.isReadable()) {
580 readPending = false;
581 pipeline.fireChannelRead(byteBuf);
582 } else {
583 byteBuf.release();
584 }
585 }
586 if (!failConnectPromise(cause)) {
587 allocHandle.readComplete();
588 pipeline.fireChannelReadComplete();
589 pipeline.fireExceptionCaught(cause);
590
591
592
593 if (close || cause instanceof OutOfMemoryError || cause instanceof IOException) {
594 shutdownInput(false);
595 }
596 }
597 }
598 }
599
600 private final class KQueueSocketWritableByteChannel extends SocketWritableByteChannel {
601 KQueueSocketWritableByteChannel() {
602 super(socket);
603 }
604
605 @Override
606 protected ByteBufAllocator alloc() {
607 return AbstractKQueueStreamChannel.this.alloc();
608 }
609 }
610 }