1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16 package io.netty.testsuite.transport.socket;
17
18 import io.netty.bootstrap.Bootstrap;
19 import io.netty.bootstrap.ServerBootstrap;
20 import io.netty.buffer.ByteBuf;
21 import io.netty.buffer.ByteBufAllocator;
22 import io.netty.channel.Channel;
23 import io.netty.channel.ChannelConfig;
24 import io.netty.channel.ChannelFuture;
25 import io.netty.channel.ChannelFutureListener;
26 import io.netty.channel.ChannelHandlerContext;
27 import io.netty.channel.ChannelInboundHandlerAdapter;
28 import io.netty.channel.ChannelInitializer;
29 import io.netty.channel.ChannelOption;
30 import io.netty.channel.RecvByteBufAllocator;
31 import io.netty.channel.SimpleChannelInboundHandler;
32 import io.netty.channel.socket.ChannelInputShutdownEvent;
33 import io.netty.channel.socket.ChannelInputShutdownReadComplete;
34 import io.netty.channel.socket.ChannelOutputShutdownEvent;
35 import io.netty.channel.socket.DuplexChannel;
36 import io.netty.channel.socket.SocketChannel;
37 import io.netty.util.ReferenceCountUtil;
38 import io.netty.util.UncheckedBooleanSupplier;
39 import io.netty.util.internal.PlatformDependent;
40 import org.junit.jupiter.api.Test;
41 import org.junit.jupiter.api.TestInfo;
42 import org.junit.jupiter.api.Timeout;
43
44 import java.util.concurrent.CountDownLatch;
45 import java.util.concurrent.TimeUnit;
46 import java.util.concurrent.atomic.AtomicInteger;
47 import java.util.concurrent.atomic.AtomicReference;
48
49 import static java.util.concurrent.TimeUnit.MILLISECONDS;
50 import static org.junit.jupiter.api.Assertions.assertEquals;
51 import static org.junit.jupiter.api.Assertions.assertNull;
52 import static org.junit.jupiter.api.Assertions.assertTrue;
53 import static org.junit.jupiter.api.Assumptions.assumeFalse;
54
55 public class SocketHalfClosedTest extends AbstractSocketTest {
56
57 protected int maxReadCompleteWithNoDataAfterInputShutdown() {
58 return 2;
59 }
60
61 @Test
62 @Timeout(value = 5000, unit = MILLISECONDS)
63 public void testHalfClosureReceiveDataOnFinalWait2StateWhenSoLingerSet(TestInfo testInfo) throws Throwable {
64 run(testInfo, new Runner<ServerBootstrap, Bootstrap>() {
65 @Override
66 public void run(ServerBootstrap serverBootstrap, Bootstrap bootstrap) throws Throwable {
67 testHalfClosureReceiveDataOnFinalWait2StateWhenSoLingerSet(serverBootstrap, bootstrap);
68 }
69 });
70 }
71
72 private void testHalfClosureReceiveDataOnFinalWait2StateWhenSoLingerSet(ServerBootstrap sb, Bootstrap cb)
73 throws Throwable {
74 Channel serverChannel = null;
75 Channel clientChannel = null;
76
77 final CountDownLatch waitHalfClosureDone = new CountDownLatch(1);
78 try {
79 sb.childOption(ChannelOption.SO_LINGER, 1)
80 .childHandler(new ChannelInitializer<Channel>() {
81
82 @Override
83 protected void initChannel(Channel ch) throws Exception {
84 ch.pipeline().addLast(new ChannelInboundHandlerAdapter() {
85
86 @Override
87 public void channelActive(final ChannelHandlerContext ctx) {
88 SocketChannel channel = (SocketChannel) ctx.channel();
89 channel.shutdownOutput();
90 }
91
92 @Override
93 public void channelRead(ChannelHandlerContext ctx, Object msg) {
94 ReferenceCountUtil.release(msg);
95 waitHalfClosureDone.countDown();
96 }
97 });
98 }
99 });
100
101 cb.option(ChannelOption.ALLOW_HALF_CLOSURE, true)
102 .handler(new ChannelInitializer<Channel>() {
103 @Override
104 protected void initChannel(Channel ch) throws Exception {
105 ch.pipeline().addLast(new ChannelInboundHandlerAdapter() {
106
107 @Override
108 public void userEventTriggered(ChannelHandlerContext ctx, Object evt) {
109 if (ChannelInputShutdownEvent.INSTANCE == evt) {
110 ctx.writeAndFlush(ctx.alloc().buffer().writeZero(16));
111 }
112
113 if (ChannelInputShutdownReadComplete.INSTANCE == evt) {
114 ctx.close();
115 }
116 }
117 });
118 }
119 });
120
121 serverChannel = sb.bind().sync().channel();
122 clientChannel = cb.connect(serverChannel.localAddress()).sync().channel();
123 waitHalfClosureDone.await();
124 } finally {
125 if (clientChannel != null) {
126 clientChannel.close().sync();
127 }
128
129 if (serverChannel != null) {
130 serverChannel.close().sync();
131 }
132 }
133 }
134
135 @Test
136 @Timeout(value = 10000, unit = MILLISECONDS)
137 public void testHalfClosureOnlyOneEventWhenAutoRead(TestInfo testInfo) throws Throwable {
138 run(testInfo, new Runner<ServerBootstrap, Bootstrap>() {
139 @Override
140 public void run(ServerBootstrap serverBootstrap, Bootstrap bootstrap) throws Throwable {
141 testHalfClosureOnlyOneEventWhenAutoRead(serverBootstrap, bootstrap);
142 }
143 });
144 }
145
146 public void testHalfClosureOnlyOneEventWhenAutoRead(ServerBootstrap sb, Bootstrap cb) throws Throwable {
147 Channel serverChannel = null;
148 try {
149 cb.option(ChannelOption.ALLOW_HALF_CLOSURE, true)
150 .option(ChannelOption.AUTO_READ, true);
151 sb.childHandler(new ChannelInitializer<Channel>() {
152 @Override
153 protected void initChannel(Channel ch) {
154 ch.pipeline().addLast(new ChannelInboundHandlerAdapter() {
155 @Override
156 public void channelActive(ChannelHandlerContext ctx) {
157 ((DuplexChannel) ctx).shutdownOutput();
158 }
159
160 @Override
161 public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
162 ctx.close();
163 }
164 });
165 }
166 });
167
168 final AtomicInteger shutdownEventReceivedCounter = new AtomicInteger();
169 final AtomicInteger shutdownReadCompleteEventReceivedCounter = new AtomicInteger();
170
171 cb.handler(new ChannelInitializer<Channel>() {
172 @Override
173 protected void initChannel(Channel ch) {
174 ch.pipeline().addLast(new ChannelInboundHandlerAdapter() {
175
176 @Override
177 public void userEventTriggered(final ChannelHandlerContext ctx, Object evt) {
178 if (evt == ChannelInputShutdownEvent.INSTANCE) {
179 shutdownEventReceivedCounter.incrementAndGet();
180 } else if (evt == ChannelInputShutdownReadComplete.INSTANCE) {
181 shutdownReadCompleteEventReceivedCounter.incrementAndGet();
182 ctx.executor().schedule(new Runnable() {
183 @Override
184 public void run() {
185 ctx.close();
186 }
187 }, 100, MILLISECONDS);
188 }
189 }
190
191 @Override
192 public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
193 ctx.close();
194 }
195 });
196 }
197 });
198
199 serverChannel = sb.bind().sync().channel();
200 Channel clientChannel = cb.connect(serverChannel.localAddress()).sync().channel();
201 clientChannel.closeFuture().await();
202 assertEquals(1, shutdownEventReceivedCounter.get());
203 assertEquals(1, shutdownReadCompleteEventReceivedCounter.get());
204 } finally {
205 if (serverChannel != null) {
206 serverChannel.close().sync();
207 }
208 }
209 }
210
211 @Test
212 public void testAllDataReadAfterHalfClosure(TestInfo testInfo) throws Throwable {
213 run(testInfo, new Runner<ServerBootstrap, Bootstrap>() {
214 @Override
215 public void run(ServerBootstrap serverBootstrap, Bootstrap bootstrap) throws Throwable {
216 testAllDataReadAfterHalfClosure(serverBootstrap, bootstrap);
217 }
218 });
219 }
220
221 public void testAllDataReadAfterHalfClosure(ServerBootstrap sb, Bootstrap cb) throws Throwable {
222 testAllDataReadAfterHalfClosure(true, sb, cb);
223 testAllDataReadAfterHalfClosure(false, sb, cb);
224 }
225
226 private void testAllDataReadAfterHalfClosure(final boolean autoRead,
227 ServerBootstrap sb, Bootstrap cb) throws Throwable {
228 final int totalServerBytesWritten = 1024 * 16;
229 final int numReadsPerReadLoop = 2;
230 final CountDownLatch serverInitializedLatch = new CountDownLatch(1);
231 final CountDownLatch clientReadAllDataLatch = new CountDownLatch(1);
232 final CountDownLatch clientHalfClosedLatch = new CountDownLatch(1);
233 final AtomicInteger clientReadCompletes = new AtomicInteger();
234 final AtomicInteger clientZeroDataReadCompletes = new AtomicInteger();
235 Channel serverChannel = null;
236 Channel clientChannel = null;
237 try {
238 cb.option(ChannelOption.ALLOW_HALF_CLOSURE, true)
239 .option(ChannelOption.AUTO_READ, autoRead)
240 .option(ChannelOption.RCVBUF_ALLOCATOR, new TestNumReadsRecvByteBufAllocator(numReadsPerReadLoop));
241
242 sb.childHandler(new ChannelInitializer<Channel>() {
243 @Override
244 protected void initChannel(Channel ch) throws Exception {
245 ch.pipeline().addLast(new ChannelInboundHandlerAdapter() {
246 @Override
247 public void channelActive(ChannelHandlerContext ctx) throws Exception {
248 ByteBuf buf = ctx.alloc().buffer(totalServerBytesWritten);
249 buf.writerIndex(buf.capacity());
250 ctx.writeAndFlush(buf).addListener(new ChannelFutureListener() {
251 @Override
252 public void operationComplete(ChannelFuture future) throws Exception {
253 ((DuplexChannel) future.channel()).shutdownOutput();
254 }
255 });
256 serverInitializedLatch.countDown();
257 }
258
259 @Override
260 public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
261 ctx.close();
262 }
263 });
264 }
265 });
266
267 cb.handler(new ChannelInitializer<Channel>() {
268 @Override
269 protected void initChannel(Channel ch) throws Exception {
270 ch.pipeline().addLast(new ChannelInboundHandlerAdapter() {
271 private int bytesRead;
272 private int bytesSinceReadComplete;
273
274 @Override
275 public void channelRead(ChannelHandlerContext ctx, Object msg) {
276 ByteBuf buf = (ByteBuf) msg;
277 bytesRead += buf.readableBytes();
278 bytesSinceReadComplete += buf.readableBytes();
279 buf.release();
280 }
281
282 @Override
283 public void userEventTriggered(ChannelHandlerContext ctx, Object evt) {
284 if (evt == ChannelInputShutdownEvent.INSTANCE) {
285 clientHalfClosedLatch.countDown();
286 } else if (evt == ChannelInputShutdownReadComplete.INSTANCE) {
287 ctx.close();
288 }
289 }
290
291 @Override
292 public void channelReadComplete(ChannelHandlerContext ctx) {
293 if (bytesSinceReadComplete == 0) {
294 clientZeroDataReadCompletes.incrementAndGet();
295 } else {
296 bytesSinceReadComplete = 0;
297 }
298 clientReadCompletes.incrementAndGet();
299 if (bytesRead == totalServerBytesWritten) {
300 clientReadAllDataLatch.countDown();
301 }
302 if (!autoRead) {
303 ctx.read();
304 }
305 }
306
307 @Override
308 public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
309 ctx.close();
310 }
311 });
312 }
313 });
314
315 serverChannel = sb.bind().sync().channel();
316 clientChannel = cb.connect(serverChannel.localAddress()).sync().channel();
317 clientChannel.read();
318
319 serverInitializedLatch.await();
320 clientReadAllDataLatch.await();
321 clientHalfClosedLatch.await();
322
323
324
325 assertTrue(totalServerBytesWritten > clientReadCompletes.get(),
326 "too many read complete events: " + clientReadCompletes.get());
327 assertTrue(clientZeroDataReadCompletes.get() <= maxReadCompleteWithNoDataAfterInputShutdown(),
328 "too many readComplete with no data: " + clientZeroDataReadCompletes.get() + " readComplete: " +
329 clientReadCompletes.get());
330 } finally {
331 if (clientChannel != null) {
332 clientChannel.close().sync();
333 }
334 if (serverChannel != null) {
335 serverChannel.close().sync();
336 }
337 }
338 }
339
340 @Test
341 public void testAutoCloseFalseDoesShutdownOutput(TestInfo testInfo) throws Throwable {
342
343 assumeFalse(PlatformDependent.isWindows());
344 run(testInfo, new Runner<ServerBootstrap, Bootstrap>() {
345 @Override
346 public void run(ServerBootstrap serverBootstrap, Bootstrap bootstrap) throws Throwable {
347 testAutoCloseFalseDoesShutdownOutput(serverBootstrap, bootstrap);
348 }
349 });
350 }
351
352 public void testAutoCloseFalseDoesShutdownOutput(ServerBootstrap sb, Bootstrap cb) throws Throwable {
353 testAutoCloseFalseDoesShutdownOutput(false, false, sb, cb);
354 testAutoCloseFalseDoesShutdownOutput(false, true, sb, cb);
355 testAutoCloseFalseDoesShutdownOutput(true, false, sb, cb);
356 testAutoCloseFalseDoesShutdownOutput(true, true, sb, cb);
357 }
358
359 private static void testAutoCloseFalseDoesShutdownOutput(boolean allowHalfClosed,
360 final boolean clientIsLeader,
361 ServerBootstrap sb,
362 Bootstrap cb) throws InterruptedException {
363 final int expectedBytes = 100;
364 final CountDownLatch serverReadExpectedLatch = new CountDownLatch(1);
365 final CountDownLatch doneLatch = new CountDownLatch(2);
366 final AtomicReference<Throwable> causeRef = new AtomicReference<Throwable>();
367 Channel serverChannel = null;
368 Channel clientChannel = null;
369 try {
370 cb.option(ChannelOption.ALLOW_HALF_CLOSURE, allowHalfClosed)
371 .option(ChannelOption.AUTO_CLOSE, false)
372 .option(ChannelOption.SO_LINGER, 0);
373 sb.childOption(ChannelOption.ALLOW_HALF_CLOSURE, allowHalfClosed)
374 .childOption(ChannelOption.AUTO_CLOSE, false)
375 .childOption(ChannelOption.SO_LINGER, 0);
376
377 final AutoCloseFalseLeader leaderHandler = new AutoCloseFalseLeader(expectedBytes,
378 serverReadExpectedLatch, doneLatch, causeRef);
379 final AutoCloseFalseFollower followerHandler = new AutoCloseFalseFollower(expectedBytes,
380 serverReadExpectedLatch, doneLatch, causeRef);
381 sb.childHandler(new ChannelInitializer<Channel>() {
382 @Override
383 protected void initChannel(Channel ch) throws Exception {
384 ch.pipeline().addLast(clientIsLeader ? followerHandler :leaderHandler);
385 }
386 });
387
388 cb.handler(new ChannelInitializer<Channel>() {
389 @Override
390 protected void initChannel(Channel ch) throws Exception {
391 ch.pipeline().addLast(clientIsLeader ? leaderHandler : followerHandler);
392 }
393 });
394
395 serverChannel = sb.bind().sync().channel();
396 clientChannel = cb.connect(serverChannel.localAddress()).sync().channel();
397
398 doneLatch.await();
399 assertNull(causeRef.get());
400 assertTrue(leaderHandler.seenOutputShutdown);
401 } finally {
402 if (clientChannel != null) {
403 clientChannel.close().sync();
404 }
405 if (serverChannel != null) {
406 serverChannel.close().sync();
407 }
408 }
409 }
410
411 private static final class AutoCloseFalseFollower extends SimpleChannelInboundHandler<ByteBuf> {
412 private final int expectedBytes;
413 private final CountDownLatch followerCloseLatch;
414 private final CountDownLatch doneLatch;
415 private final AtomicReference<Throwable> causeRef;
416 private int bytesRead;
417
418 AutoCloseFalseFollower(int expectedBytes, CountDownLatch followerCloseLatch, CountDownLatch doneLatch,
419 AtomicReference<Throwable> causeRef) {
420 this.expectedBytes = expectedBytes;
421 this.followerCloseLatch = followerCloseLatch;
422 this.doneLatch = doneLatch;
423 this.causeRef = causeRef;
424 }
425
426 @Override
427 public void channelInactive(ChannelHandlerContext ctx) {
428 checkPrematureClose();
429 }
430
431 @Override
432 public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
433 ctx.close();
434 checkPrematureClose();
435 }
436
437 @Override
438 protected void channelRead0(ChannelHandlerContext ctx, ByteBuf msg) throws Exception {
439 bytesRead += msg.readableBytes();
440 if (bytesRead >= expectedBytes) {
441
442 ByteBuf buf = ctx.alloc().buffer(expectedBytes);
443 buf.writerIndex(buf.writerIndex() + expectedBytes);
444 ctx.writeAndFlush(buf).addListener(new ChannelFutureListener() {
445 @Override
446 public void operationComplete(ChannelFuture future) throws Exception {
447 future.channel().close().addListener(new ChannelFutureListener() {
448 @Override
449 public void operationComplete(final ChannelFuture future) throws Exception {
450
451
452
453
454
455 future.channel().eventLoop().schedule(new Runnable() {
456 @Override
457 public void run() {
458 followerCloseLatch.countDown();
459 }
460 }, 200, TimeUnit.MILLISECONDS);
461 }
462 });
463 }
464 });
465 }
466 }
467
468 private void checkPrematureClose() {
469 if (bytesRead < expectedBytes) {
470 causeRef.set(new IllegalStateException("follower premature close"));
471 doneLatch.countDown();
472 }
473 }
474 }
475
476 private static final class AutoCloseFalseLeader extends SimpleChannelInboundHandler<ByteBuf> {
477 private final int expectedBytes;
478 private final CountDownLatch followerCloseLatch;
479 private final CountDownLatch doneLatch;
480 private final AtomicReference<Throwable> causeRef;
481 private int bytesRead;
482 boolean seenOutputShutdown;
483
484 AutoCloseFalseLeader(int expectedBytes, CountDownLatch followerCloseLatch, CountDownLatch doneLatch,
485 AtomicReference<Throwable> causeRef) {
486 this.expectedBytes = expectedBytes;
487 this.followerCloseLatch = followerCloseLatch;
488 this.doneLatch = doneLatch;
489 this.causeRef = causeRef;
490 }
491
492 @Override
493 public void channelActive(ChannelHandlerContext ctx) throws Exception {
494 ByteBuf buf = ctx.alloc().buffer(expectedBytes);
495 buf.writerIndex(buf.writerIndex() + expectedBytes);
496 ctx.writeAndFlush(buf.retainedDuplicate());
497
498
499
500 followerCloseLatch.await();
501
502
503 ctx.writeAndFlush(buf).addListener(new ChannelFutureListener() {
504 @Override
505 public void operationComplete(ChannelFuture future) throws Exception {
506 if (future.cause() == null) {
507 causeRef.set(new IllegalStateException("second write should have failed!"));
508 doneLatch.countDown();
509 }
510 }
511 });
512 }
513
514 @Override
515 protected void channelRead0(ChannelHandlerContext ctx, ByteBuf msg) throws Exception {
516 bytesRead += msg.readableBytes();
517 if (bytesRead >= expectedBytes) {
518 doneLatch.countDown();
519 }
520 }
521
522 @Override
523 public void userEventTriggered(ChannelHandlerContext ctx, Object evt) {
524 if (evt instanceof ChannelOutputShutdownEvent) {
525 seenOutputShutdown = true;
526 doneLatch.countDown();
527 }
528 }
529
530 @Override
531 public void channelInactive(ChannelHandlerContext ctx) {
532 checkPrematureClose();
533 }
534
535 @Override
536 public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
537 ctx.close();
538 checkPrematureClose();
539 }
540
541 private void checkPrematureClose() {
542 if (bytesRead < expectedBytes || !seenOutputShutdown) {
543 causeRef.set(new IllegalStateException("leader premature close"));
544 doneLatch.countDown();
545 }
546 }
547 }
548
549 @Test
550 public void testAllDataReadClosure(TestInfo testInfo) throws Throwable {
551 run(testInfo, new Runner<ServerBootstrap, Bootstrap>() {
552 @Override
553 public void run(ServerBootstrap serverBootstrap, Bootstrap bootstrap) throws Throwable {
554 testAllDataReadClosure(serverBootstrap, bootstrap);
555 }
556 });
557 }
558
559 public void testAllDataReadClosure(ServerBootstrap sb, Bootstrap cb) throws Throwable {
560 testAllDataReadClosure(true, false, sb, cb);
561 testAllDataReadClosure(true, true, sb, cb);
562 testAllDataReadClosure(false, false, sb, cb);
563 testAllDataReadClosure(false, true, sb, cb);
564 }
565
566 private static void testAllDataReadClosure(final boolean autoRead, final boolean allowHalfClosed,
567 ServerBootstrap sb, Bootstrap cb) throws Throwable {
568 final int totalServerBytesWritten = 1024 * 16;
569 final int numReadsPerReadLoop = 2;
570 final CountDownLatch serverInitializedLatch = new CountDownLatch(1);
571 final CountDownLatch clientReadAllDataLatch = new CountDownLatch(1);
572 final CountDownLatch clientHalfClosedLatch = new CountDownLatch(1);
573 final AtomicInteger clientReadCompletes = new AtomicInteger();
574 Channel serverChannel = null;
575 Channel clientChannel = null;
576 try {
577 cb.option(ChannelOption.ALLOW_HALF_CLOSURE, allowHalfClosed)
578 .option(ChannelOption.AUTO_READ, autoRead)
579 .option(ChannelOption.RCVBUF_ALLOCATOR, new TestNumReadsRecvByteBufAllocator(numReadsPerReadLoop));
580
581 sb.childHandler(new ChannelInitializer<Channel>() {
582 @Override
583 protected void initChannel(Channel ch) throws Exception {
584 ch.pipeline().addLast(new ChannelInboundHandlerAdapter() {
585 @Override
586 public void channelActive(ChannelHandlerContext ctx) throws Exception {
587 ByteBuf buf = ctx.alloc().buffer(totalServerBytesWritten);
588 buf.writerIndex(buf.capacity());
589 ctx.writeAndFlush(buf).addListener(ChannelFutureListener.CLOSE);
590 serverInitializedLatch.countDown();
591 }
592
593 @Override
594 public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
595 ctx.close();
596 }
597 });
598 }
599 });
600
601 cb.handler(new ChannelInitializer<Channel>() {
602 @Override
603 protected void initChannel(Channel ch) throws Exception {
604 ch.pipeline().addLast(new ChannelInboundHandlerAdapter() {
605 private int bytesRead;
606
607 @Override
608 public void channelRead(ChannelHandlerContext ctx, Object msg) {
609 ByteBuf buf = (ByteBuf) msg;
610 bytesRead += buf.readableBytes();
611 buf.release();
612 }
613
614 @Override
615 public void userEventTriggered(ChannelHandlerContext ctx, Object evt) {
616 if (evt == ChannelInputShutdownEvent.INSTANCE && allowHalfClosed) {
617 clientHalfClosedLatch.countDown();
618 } else if (evt == ChannelInputShutdownReadComplete.INSTANCE) {
619 ctx.close();
620 }
621 }
622
623 @Override
624 public void channelInactive(ChannelHandlerContext ctx) {
625 if (!allowHalfClosed) {
626 clientHalfClosedLatch.countDown();
627 }
628 }
629
630 @Override
631 public void channelReadComplete(ChannelHandlerContext ctx) {
632 clientReadCompletes.incrementAndGet();
633 if (bytesRead == totalServerBytesWritten) {
634 clientReadAllDataLatch.countDown();
635 }
636 if (!autoRead) {
637 ctx.read();
638 }
639 }
640
641 @Override
642 public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
643 ctx.close();
644 }
645 });
646 }
647 });
648
649 serverChannel = sb.bind().sync().channel();
650 clientChannel = cb.connect(serverChannel.localAddress()).sync().channel();
651 clientChannel.read();
652
653 serverInitializedLatch.await();
654 clientReadAllDataLatch.await();
655 clientHalfClosedLatch.await();
656 assertTrue(totalServerBytesWritten / numReadsPerReadLoop + 10 > clientReadCompletes.get(),
657 "too many read complete events: " + clientReadCompletes.get());
658 } finally {
659 if (clientChannel != null) {
660 clientChannel.close().sync();
661 }
662 if (serverChannel != null) {
663 serverChannel.close().sync();
664 }
665 }
666 }
667
668
669
670
671 private static final class TestNumReadsRecvByteBufAllocator implements RecvByteBufAllocator {
672 private final int numReads;
673 TestNumReadsRecvByteBufAllocator(int numReads) {
674 this.numReads = numReads;
675 }
676
677 @Override
678 public ExtendedHandle newHandle() {
679 return new ExtendedHandle() {
680 private int attemptedBytesRead;
681 private int lastBytesRead;
682 private int numMessagesRead;
683 @Override
684 public ByteBuf allocate(ByteBufAllocator alloc) {
685 return alloc.ioBuffer(guess(), guess());
686 }
687
688 @Override
689 public int guess() {
690 return 1;
691 }
692
693 @Override
694 public void reset(ChannelConfig config) {
695 numMessagesRead = 0;
696 }
697
698 @Override
699 public void incMessagesRead(int numMessages) {
700 numMessagesRead += numMessages;
701 }
702
703 @Override
704 public void lastBytesRead(int bytes) {
705 lastBytesRead = bytes;
706 }
707
708 @Override
709 public int lastBytesRead() {
710 return lastBytesRead;
711 }
712
713 @Override
714 public void attemptedBytesRead(int bytes) {
715 attemptedBytesRead = bytes;
716 }
717
718 @Override
719 public int attemptedBytesRead() {
720 return attemptedBytesRead;
721 }
722
723 @Override
724 public boolean continueReading() {
725 return numMessagesRead < numReads;
726 }
727
728 @Override
729 public boolean continueReading(UncheckedBooleanSupplier maybeMoreDataSupplier) {
730 return continueReading() && maybeMoreDataSupplier.get();
731 }
732
733 @Override
734 public void readComplete() {
735
736 }
737 };
738 }
739 }
740 }