1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16 package io.netty.testsuite.transport.socket;
17
18 import io.netty.bootstrap.Bootstrap;
19 import io.netty.bootstrap.ServerBootstrap;
20 import io.netty.buffer.ByteBuf;
21 import io.netty.buffer.ByteBufAllocator;
22 import io.netty.buffer.Unpooled;
23 import io.netty.channel.Channel;
24 import io.netty.channel.ChannelConfig;
25 import io.netty.channel.ChannelHandlerContext;
26 import io.netty.channel.ChannelInboundHandlerAdapter;
27 import io.netty.channel.ChannelInitializer;
28 import io.netty.channel.ChannelOption;
29 import io.netty.channel.RecvByteBufAllocator;
30 import io.netty.util.ReferenceCountUtil;
31 import io.netty.util.UncheckedBooleanSupplier;
32 import org.junit.jupiter.api.Test;
33 import org.junit.jupiter.api.TestInfo;
34
35 import java.util.concurrent.CountDownLatch;
36 import java.util.concurrent.TimeUnit;
37 import java.util.concurrent.atomic.AtomicInteger;
38
39 import static org.junit.jupiter.api.Assertions.assertEquals;
40 import static org.junit.jupiter.api.Assertions.assertTrue;
41
42 public class SocketAutoReadTest extends AbstractSocketTest {
43 @Test
44 public void testAutoReadOffDuringReadOnlyReadsOneTime(TestInfo testInfo) throws Throwable {
45 run(testInfo, new Runner<ServerBootstrap, Bootstrap>() {
46 @Override
47 public void run(ServerBootstrap serverBootstrap, Bootstrap bootstrap) throws Throwable {
48 testAutoReadOffDuringReadOnlyReadsOneTime(serverBootstrap, bootstrap);
49 }
50 });
51 }
52
53 public void testAutoReadOffDuringReadOnlyReadsOneTime(ServerBootstrap sb, Bootstrap cb) throws Throwable {
54 testAutoReadOffDuringReadOnlyReadsOneTime(true, sb, cb);
55 testAutoReadOffDuringReadOnlyReadsOneTime(false, sb, cb);
56 }
57
58 private static void testAutoReadOffDuringReadOnlyReadsOneTime(boolean readOutsideEventLoopThread,
59 ServerBootstrap sb, Bootstrap cb) throws Throwable {
60 Channel serverChannel = null;
61 Channel clientChannel = null;
62 try {
63 AutoReadInitializer serverInitializer = new AutoReadInitializer(!readOutsideEventLoopThread);
64 AutoReadInitializer clientInitializer = new AutoReadInitializer(!readOutsideEventLoopThread);
65 sb.option(ChannelOption.SO_BACKLOG, 1024)
66 .option(ChannelOption.AUTO_READ, true)
67 .childOption(ChannelOption.AUTO_READ, true)
68
69
70 .childOption(ChannelOption.RCVBUF_ALLOCATOR, new TestRecvByteBufAllocator())
71 .childHandler(serverInitializer);
72
73 serverChannel = sb.bind().syncUninterruptibly().channel();
74
75 cb.option(ChannelOption.AUTO_READ, true)
76
77
78 .option(ChannelOption.RCVBUF_ALLOCATOR, new TestRecvByteBufAllocator())
79 .handler(clientInitializer);
80
81 clientChannel = cb.connect(serverChannel.localAddress()).syncUninterruptibly().channel();
82
83
84 clientChannel.writeAndFlush(Unpooled.wrappedBuffer(new byte[3]));
85 serverInitializer.autoReadHandler.assertSingleRead();
86
87
88 serverInitializer.channel.writeAndFlush(Unpooled.wrappedBuffer(new byte[3]));
89 clientInitializer.autoReadHandler.assertSingleRead();
90
91 if (readOutsideEventLoopThread) {
92 serverInitializer.channel.read();
93 }
94 serverInitializer.autoReadHandler.assertSingleReadSecondTry();
95
96 if (readOutsideEventLoopThread) {
97 clientChannel.read();
98 }
99 clientInitializer.autoReadHandler.assertSingleReadSecondTry();
100 } finally {
101 if (clientChannel != null) {
102 clientChannel.close().sync();
103 }
104 if (serverChannel != null) {
105 serverChannel.close().sync();
106 }
107 }
108 }
109
110 private static class AutoReadInitializer extends ChannelInitializer<Channel> {
111 final AutoReadHandler autoReadHandler;
112 volatile Channel channel;
113
114 AutoReadInitializer(boolean readInEventLoop) {
115 autoReadHandler = new AutoReadHandler(readInEventLoop);
116 }
117
118 @Override
119 protected void initChannel(Channel ch) throws Exception {
120 channel = ch;
121 ch.pipeline().addLast(autoReadHandler);
122 }
123 }
124
125 private static final class AutoReadHandler extends ChannelInboundHandlerAdapter {
126 private final AtomicInteger count = new AtomicInteger();
127 private final CountDownLatch latch = new CountDownLatch(1);
128 private final CountDownLatch latch2;
129 private final boolean callRead;
130
131 AutoReadHandler(boolean callRead) {
132 this.callRead = callRead;
133 latch2 = new CountDownLatch(callRead ? 3 : 2);
134 }
135
136 @Override
137 public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
138 ReferenceCountUtil.release(msg);
139 if (count.incrementAndGet() == 1) {
140 ctx.channel().config().setAutoRead(false);
141 }
142 if (callRead) {
143
144 ctx.read();
145 }
146 }
147
148 @Override
149 public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
150 latch.countDown();
151 latch2.countDown();
152 }
153
154 void assertSingleRead() throws InterruptedException {
155 assertTrue(latch.await(5, TimeUnit.SECONDS));
156 assertTrue(count.get() > 0);
157 }
158
159 void assertSingleReadSecondTry() throws InterruptedException {
160 assertTrue(latch2.await(5, TimeUnit.SECONDS));
161 assertEquals(callRead ? 3 : 2, count.get());
162 }
163 }
164
165
166
167
168 private static final class TestRecvByteBufAllocator implements RecvByteBufAllocator {
169 @Override
170 public ExtendedHandle newHandle() {
171 return new ExtendedHandle() {
172 private ChannelConfig config;
173 private int attemptedBytesRead;
174 private int lastBytesRead;
175 @Override
176 public ByteBuf allocate(ByteBufAllocator alloc) {
177 return alloc.ioBuffer(guess(), guess());
178 }
179
180 @Override
181 public int guess() {
182 return 1;
183 }
184
185 @Override
186 public void reset(ChannelConfig config) {
187 this.config = config;
188 }
189
190 @Override
191 public void incMessagesRead(int numMessages) {
192
193 }
194
195 @Override
196 public void lastBytesRead(int bytes) {
197 lastBytesRead = bytes;
198 }
199
200 @Override
201 public int lastBytesRead() {
202 return lastBytesRead;
203 }
204
205 @Override
206 public void attemptedBytesRead(int bytes) {
207 attemptedBytesRead = bytes;
208 }
209
210 @Override
211 public int attemptedBytesRead() {
212 return attemptedBytesRead;
213 }
214
215 @Override
216 public boolean continueReading() {
217 return config.isAutoRead();
218 }
219
220 @Override
221 public boolean continueReading(UncheckedBooleanSupplier maybeMoreDataSupplier) {
222 return config.isAutoRead();
223 }
224
225 @Override
226 public void readComplete() {
227
228 }
229 };
230 }
231 }
232 }