查看本类的 API文档回源码主页即时通讯网 - 即时通讯开发者社区!
/*
 * Copyright 2014 The Netty Project
 *
 * The Netty Project licenses this file to you under the Apache License,
 * version 2.0 (the "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at:
 *
 *   https://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
 * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
 * License for the specific language governing permissions and limitations
 * under the License.
 */
16  package io.netty.testsuite.transport.socket;
17  
18  import io.netty.bootstrap.Bootstrap;
19  import io.netty.bootstrap.ServerBootstrap;
20  import io.netty.buffer.ByteBuf;
21  import io.netty.buffer.ByteBufAllocator;
22  import io.netty.buffer.Unpooled;
23  import io.netty.channel.Channel;
24  import io.netty.channel.ChannelConfig;
25  import io.netty.channel.ChannelHandlerContext;
26  import io.netty.channel.ChannelInboundHandlerAdapter;
27  import io.netty.channel.ChannelInitializer;
28  import io.netty.channel.ChannelOption;
29  import io.netty.channel.RecvByteBufAllocator;
30  import io.netty.util.ReferenceCountUtil;
31  import io.netty.util.UncheckedBooleanSupplier;
32  import org.junit.jupiter.api.Test;
33  import org.junit.jupiter.api.TestInfo;
34  
35  import java.util.concurrent.CountDownLatch;
36  import java.util.concurrent.TimeUnit;
37  import java.util.concurrent.atomic.AtomicInteger;
38  
39  import static org.junit.jupiter.api.Assertions.assertEquals;
40  import static org.junit.jupiter.api.Assertions.assertTrue;
41  
42  public class SocketAutoReadTest extends AbstractSocketTest {
43      @Test
44      public void testAutoReadOffDuringReadOnlyReadsOneTime(TestInfo testInfo) throws Throwable {
45          run(testInfo, new Runner<ServerBootstrap, Bootstrap>() {
46              @Override
47              public void run(ServerBootstrap serverBootstrap, Bootstrap bootstrap) throws Throwable {
48                  testAutoReadOffDuringReadOnlyReadsOneTime(serverBootstrap, bootstrap);
49              }
50          });
51      }
52  
53      public void testAutoReadOffDuringReadOnlyReadsOneTime(ServerBootstrap sb, Bootstrap cb) throws Throwable {
54          testAutoReadOffDuringReadOnlyReadsOneTime(true, sb, cb);
55          testAutoReadOffDuringReadOnlyReadsOneTime(false, sb, cb);
56      }
57  
58      private static void testAutoReadOffDuringReadOnlyReadsOneTime(boolean readOutsideEventLoopThread,
59                                                             ServerBootstrap sb, Bootstrap cb) throws Throwable {
60          Channel serverChannel = null;
61          Channel clientChannel = null;
62          try {
63              AutoReadInitializer serverInitializer = new AutoReadInitializer(!readOutsideEventLoopThread);
64              AutoReadInitializer clientInitializer = new AutoReadInitializer(!readOutsideEventLoopThread);
65              sb.option(ChannelOption.SO_BACKLOG, 1024)
66                      .option(ChannelOption.AUTO_READ, true)
67                      .childOption(ChannelOption.AUTO_READ, true)
68                      // We want to ensure that we attempt multiple individual read operations per read loop so we can
69                      // test the auto read feature being turned off when data is first read.
70                      .childOption(ChannelOption.RCVBUF_ALLOCATOR, new TestRecvByteBufAllocator())
71                      .childHandler(serverInitializer);
72  
73              serverChannel = sb.bind().syncUninterruptibly().channel();
74  
75              cb.option(ChannelOption.AUTO_READ, true)
76                      // We want to ensure that we attempt multiple individual read operations per read loop so we can
77                      // test the auto read feature being turned off when data is first read.
78                      .option(ChannelOption.RCVBUF_ALLOCATOR, new TestRecvByteBufAllocator())
79                      .handler(clientInitializer);
80  
81              clientChannel = cb.connect(serverChannel.localAddress()).syncUninterruptibly().channel();
82  
83              // 3 bytes means 3 independent reads for TestRecvByteBufAllocator
84              clientChannel.writeAndFlush(Unpooled.wrappedBuffer(new byte[3]));
85              serverInitializer.autoReadHandler.assertSingleRead();
86  
87              // 3 bytes means 3 independent reads for TestRecvByteBufAllocator
88              serverInitializer.channel.writeAndFlush(Unpooled.wrappedBuffer(new byte[3]));
89              clientInitializer.autoReadHandler.assertSingleRead();
90  
91              if (readOutsideEventLoopThread) {
92                  serverInitializer.channel.read();
93              }
94              serverInitializer.autoReadHandler.assertSingleReadSecondTry();
95  
96              if (readOutsideEventLoopThread) {
97                  clientChannel.read();
98              }
99              clientInitializer.autoReadHandler.assertSingleReadSecondTry();
100         } finally {
101             if (clientChannel != null) {
102                 clientChannel.close().sync();
103             }
104             if (serverChannel != null) {
105                 serverChannel.close().sync();
106             }
107         }
108     }
109 
110     private static class AutoReadInitializer extends ChannelInitializer<Channel> {
111         final AutoReadHandler autoReadHandler;
112         volatile Channel channel;
113 
114         AutoReadInitializer(boolean readInEventLoop) {
115             autoReadHandler = new AutoReadHandler(readInEventLoop);
116         }
117 
118         @Override
119         protected void initChannel(Channel ch) throws Exception {
120             channel = ch;
121             ch.pipeline().addLast(autoReadHandler);
122         }
123     }
124 
125     private static final class AutoReadHandler extends ChannelInboundHandlerAdapter {
126         private final AtomicInteger count = new AtomicInteger();
127         private final CountDownLatch latch = new CountDownLatch(1);
128         private final CountDownLatch latch2;
129         private final boolean callRead;
130 
131         AutoReadHandler(boolean callRead) {
132             this.callRead = callRead;
133             latch2 = new CountDownLatch(callRead ? 3 : 2);
134         }
135 
136         @Override
137         public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
138             ReferenceCountUtil.release(msg);
139             if (count.incrementAndGet() == 1) {
140                 ctx.channel().config().setAutoRead(false);
141             }
142             if (callRead) {
143                 // Test calling read in the EventLoop thread to ensure a read is eventually done.
144                 ctx.read();
145             }
146         }
147 
148         @Override
149         public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
150             latch.countDown();
151             latch2.countDown();
152         }
153 
154         void assertSingleRead() throws InterruptedException {
155             assertTrue(latch.await(5, TimeUnit.SECONDS));
156             assertTrue(count.get() > 0);
157         }
158 
159         void assertSingleReadSecondTry() throws InterruptedException {
160             assertTrue(latch2.await(5, TimeUnit.SECONDS));
161             assertEquals(callRead ? 3 : 2, count.get());
162         }
163     }
164 
165     /**
166      * Designed to keep reading as long as autoread is enabled.
167      */
168     private static final class TestRecvByteBufAllocator implements RecvByteBufAllocator {
169         @Override
170         public ExtendedHandle newHandle() {
171             return new ExtendedHandle() {
172                 private ChannelConfig config;
173                 private int attemptedBytesRead;
174                 private int lastBytesRead;
175                 @Override
176                 public ByteBuf allocate(ByteBufAllocator alloc) {
177                     return alloc.ioBuffer(guess(), guess());
178                 }
179 
180                 @Override
181                 public int guess() {
182                     return 1; // only ever allocate buffers of size 1 to ensure the number of reads is controlled.
183                 }
184 
185                 @Override
186                 public void reset(ChannelConfig config) {
187                     this.config = config;
188                 }
189 
190                 @Override
191                 public void incMessagesRead(int numMessages) {
192                     // No need to track the number of messages read because it is not used.
193                 }
194 
195                 @Override
196                 public void lastBytesRead(int bytes) {
197                     lastBytesRead = bytes;
198                 }
199 
200                 @Override
201                 public int lastBytesRead() {
202                     return lastBytesRead;
203                 }
204 
205                 @Override
206                 public void attemptedBytesRead(int bytes) {
207                     attemptedBytesRead = bytes;
208                 }
209 
210                 @Override
211                 public int attemptedBytesRead() {
212                     return attemptedBytesRead;
213                 }
214 
215                 @Override
216                 public boolean continueReading() {
217                     return config.isAutoRead();
218                 }
219 
220                 @Override
221                 public boolean continueReading(UncheckedBooleanSupplier maybeMoreDataSupplier) {
222                     return config.isAutoRead();
223                 }
224 
225                 @Override
226                 public void readComplete() {
227                     // Nothing needs to be done or adjusted after each read cycle is completed.
228                 }
229             };
230         }
231     }
232 }