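// View this class's API documentation | Back to the source home page | Instant Messaging Network - Instant Messaging Developer Community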
1   /*
2    * Copyright 2017 The Netty Project
3    *
4    * The Netty Project licenses this file to you under the Apache License,
5    * version 2.0 (the "License"); you may not use this file except in compliance
6    * with the License. You may obtain a copy of the License at:
7    *
8    *   https://www.apache.org/licenses/LICENSE-2.0
9    *
10   * Unless required by applicable law or agreed to in writing, software
11   * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12   * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13   * License for the specific language governing permissions and limitations
14   * under the License.
15   */
16  package io.netty.channel.unix.tests;
17  
18  import io.netty.bootstrap.Bootstrap;
19  import io.netty.bootstrap.ServerBootstrap;
20  import io.netty.buffer.ByteBuf;
21  import io.netty.channel.Channel;
22  import io.netty.channel.ChannelFutureListener;
23  import io.netty.channel.ChannelHandlerContext;
24  import io.netty.channel.ChannelInboundHandlerAdapter;
25  import io.netty.channel.ChannelInitializer;
26  import io.netty.channel.ChannelOption;
27  import io.netty.channel.EventLoopGroup;
28  import io.netty.channel.FixedRecvByteBufAllocator;
29  import io.netty.channel.ServerChannel;
30  import io.netty.channel.SimpleChannelInboundHandler;
31  import org.junit.jupiter.api.Test;
32  import org.junit.jupiter.api.Timeout;
33  
34  import java.net.InetSocketAddress;
35  import java.util.concurrent.CountDownLatch;
36  import java.util.concurrent.TimeUnit;
37  import java.util.concurrent.atomic.AtomicInteger;
38  
39  import static org.junit.jupiter.api.Assertions.assertEquals;
40  
41  public abstract class DetectPeerCloseWithoutReadTest {
42      protected abstract EventLoopGroup newGroup();
43      protected abstract Class<? extends ServerChannel> serverChannel();
44      protected abstract Class<? extends Channel> clientChannel();
45  
46      @Test
47      @Timeout(value = 10000, unit = TimeUnit.MILLISECONDS)
48      public void clientCloseWithoutServerReadIsDetectedNoExtraReadRequested() throws InterruptedException {
49          clientCloseWithoutServerReadIsDetected0(false);
50      }
51  
52      @Test
53      @Timeout(value = 10000, unit = TimeUnit.MILLISECONDS)
54      public void clientCloseWithoutServerReadIsDetectedExtraReadRequested() throws InterruptedException {
55          clientCloseWithoutServerReadIsDetected0(true);
56      }
57  
58      private void clientCloseWithoutServerReadIsDetected0(final boolean extraReadRequested)
59              throws InterruptedException {
60          EventLoopGroup serverGroup = null;
61          EventLoopGroup clientGroup = null;
62          Channel serverChannel = null;
63          try {
64              final CountDownLatch latch = new CountDownLatch(1);
65              final AtomicInteger bytesRead = new AtomicInteger();
66              final int expectedBytes = 100;
67              serverGroup = newGroup();
68              clientGroup = newGroup();
69              ServerBootstrap sb = new ServerBootstrap();
70              sb.group(serverGroup);
71              sb.channel(serverChannel());
72              // Ensure we read only one message per read() call and that we need multiple read()
73              // calls to consume everything.
74              sb.childOption(ChannelOption.AUTO_READ, false);
75              sb.childOption(ChannelOption.MAX_MESSAGES_PER_READ, 1);
76              sb.childOption(ChannelOption.RCVBUF_ALLOCATOR, new FixedRecvByteBufAllocator(expectedBytes / 10));
77              sb.childHandler(new ChannelInitializer<Channel>() {
78                  @Override
79                  protected void initChannel(Channel ch) {
80                      ch.pipeline().addLast(new TestHandler(bytesRead, extraReadRequested, latch));
81                  }
82              });
83  
84              serverChannel = sb.bind(new InetSocketAddress(0)).syncUninterruptibly().channel();
85  
86              Bootstrap cb = new Bootstrap();
87              cb.group(serverGroup);
88              cb.channel(clientChannel());
89              cb.handler(new ChannelInboundHandlerAdapter());
90              Channel clientChannel = cb.connect(serverChannel.localAddress()).syncUninterruptibly().channel();
91              ByteBuf buf = clientChannel.alloc().buffer(expectedBytes);
92              buf.writerIndex(buf.writerIndex() + expectedBytes);
93              clientChannel.writeAndFlush(buf).addListener(ChannelFutureListener.CLOSE);
94  
95              latch.await();
96              assertEquals(expectedBytes, bytesRead.get());
97          } finally {
98              if (serverChannel != null) {
99                  serverChannel.close().syncUninterruptibly();
100             }
101             if (serverGroup != null) {
102                 serverGroup.shutdownGracefully();
103             }
104             if (clientGroup != null) {
105                 clientGroup.shutdownGracefully();
106             }
107         }
108     }
109 
110     @Test
111     @Timeout(value = 10000, unit = TimeUnit.MILLISECONDS)
112     public void serverCloseWithoutClientReadIsDetectedNoExtraReadRequested() throws InterruptedException {
113         serverCloseWithoutClientReadIsDetected0(false);
114     }
115 
116     @Test
117     @Timeout(value = 10000, unit = TimeUnit.MILLISECONDS)
118     public void serverCloseWithoutClientReadIsDetectedExtraReadRequested() throws InterruptedException {
119         serverCloseWithoutClientReadIsDetected0(true);
120     }
121 
122     private void serverCloseWithoutClientReadIsDetected0(final boolean extraReadRequested) throws InterruptedException {
123         EventLoopGroup serverGroup = null;
124         EventLoopGroup clientGroup = null;
125         Channel serverChannel = null;
126         Channel clientChannel = null;
127         try {
128             final CountDownLatch latch = new CountDownLatch(1);
129             final AtomicInteger bytesRead = new AtomicInteger();
130             final int expectedBytes = 100;
131             serverGroup = newGroup();
132             clientGroup = newGroup();
133             ServerBootstrap sb = new ServerBootstrap();
134             sb.group(serverGroup);
135             sb.channel(serverChannel());
136             sb.childHandler(new ChannelInitializer<Channel>() {
137                 @Override
138                 protected void initChannel(Channel ch) {
139                     ch.pipeline().addLast(new ChannelInboundHandlerAdapter() {
140                         @Override
141                         public void channelActive(ChannelHandlerContext ctx) {
142                             ByteBuf buf = ctx.alloc().buffer(expectedBytes);
143                             buf.writerIndex(buf.writerIndex() + expectedBytes);
144                             ctx.writeAndFlush(buf).addListener(ChannelFutureListener.CLOSE);
145                             ctx.fireChannelActive();
146                         }
147                     });
148                 }
149             });
150 
151             serverChannel = sb.bind(new InetSocketAddress(0)).syncUninterruptibly().channel();
152 
153             Bootstrap cb = new Bootstrap();
154             cb.group(serverGroup);
155             cb.channel(clientChannel());
156             // Ensure we read only one message per read() call and that we need multiple read()
157             // calls to consume everything.
158             cb.option(ChannelOption.AUTO_READ, false);
159             cb.option(ChannelOption.MAX_MESSAGES_PER_READ, 1);
160             cb.option(ChannelOption.RCVBUF_ALLOCATOR, new FixedRecvByteBufAllocator(expectedBytes / 10));
161             cb.handler(new ChannelInitializer<Channel>() {
162                 @Override
163                 protected void initChannel(Channel ch) throws Exception {
164                     ch.pipeline().addLast(new TestHandler(bytesRead, extraReadRequested, latch));
165                 }
166             });
167             clientChannel = cb.connect(serverChannel.localAddress()).syncUninterruptibly().channel();
168 
169             latch.await();
170             assertEquals(expectedBytes, bytesRead.get());
171         } finally {
172             if (serverChannel != null) {
173                 serverChannel.close().syncUninterruptibly();
174             }
175             if (clientChannel != null) {
176                 clientChannel.close().syncUninterruptibly();
177             }
178             if (serverGroup != null) {
179                 serverGroup.shutdownGracefully();
180             }
181             if (clientGroup != null) {
182                 clientGroup.shutdownGracefully();
183             }
184         }
185     }
186 
187     private static final class TestHandler extends SimpleChannelInboundHandler<ByteBuf> {
188         private final AtomicInteger bytesRead;
189         private final boolean extraReadRequested;
190         private final CountDownLatch latch;
191 
192         TestHandler(AtomicInteger bytesRead, boolean extraReadRequested, CountDownLatch latch) {
193             this.bytesRead = bytesRead;
194             this.extraReadRequested = extraReadRequested;
195             this.latch = latch;
196         }
197 
198         @Override
199         protected void channelRead0(ChannelHandlerContext ctx, ByteBuf msg) {
200             bytesRead.addAndGet(msg.readableBytes());
201 
202             if (extraReadRequested) {
203                 // Because autoread is off, we call read to consume all data until we detect the close.
204                 ctx.read();
205             }
206         }
207 
208         @Override
209         public void channelInactive(ChannelHandlerContext ctx) {
210             latch.countDown();
211             ctx.fireChannelInactive();
212         }
213     }
214 }