1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16 package io.netty.channel.unix.tests;
17
18 import io.netty.bootstrap.Bootstrap;
19 import io.netty.bootstrap.ServerBootstrap;
20 import io.netty.buffer.ByteBuf;
21 import io.netty.channel.Channel;
22 import io.netty.channel.ChannelFutureListener;
23 import io.netty.channel.ChannelHandlerContext;
24 import io.netty.channel.ChannelInboundHandlerAdapter;
25 import io.netty.channel.ChannelInitializer;
26 import io.netty.channel.ChannelOption;
27 import io.netty.channel.EventLoopGroup;
28 import io.netty.channel.FixedRecvByteBufAllocator;
29 import io.netty.channel.ServerChannel;
30 import io.netty.channel.SimpleChannelInboundHandler;
31 import org.junit.jupiter.api.Test;
32 import org.junit.jupiter.api.Timeout;
33
34 import java.net.InetSocketAddress;
35 import java.util.concurrent.CountDownLatch;
36 import java.util.concurrent.TimeUnit;
37 import java.util.concurrent.atomic.AtomicInteger;
38
39 import static org.junit.jupiter.api.Assertions.assertEquals;
40
41 public abstract class DetectPeerCloseWithoutReadTest {
42 protected abstract EventLoopGroup newGroup();
43 protected abstract Class<? extends ServerChannel> serverChannel();
44 protected abstract Class<? extends Channel> clientChannel();
45
46 @Test
47 @Timeout(value = 10000, unit = TimeUnit.MILLISECONDS)
48 public void clientCloseWithoutServerReadIsDetectedNoExtraReadRequested() throws InterruptedException {
49 clientCloseWithoutServerReadIsDetected0(false);
50 }
51
52 @Test
53 @Timeout(value = 10000, unit = TimeUnit.MILLISECONDS)
54 public void clientCloseWithoutServerReadIsDetectedExtraReadRequested() throws InterruptedException {
55 clientCloseWithoutServerReadIsDetected0(true);
56 }
57
58 private void clientCloseWithoutServerReadIsDetected0(final boolean extraReadRequested)
59 throws InterruptedException {
60 EventLoopGroup serverGroup = null;
61 EventLoopGroup clientGroup = null;
62 Channel serverChannel = null;
63 try {
64 final CountDownLatch latch = new CountDownLatch(1);
65 final AtomicInteger bytesRead = new AtomicInteger();
66 final int expectedBytes = 100;
67 serverGroup = newGroup();
68 clientGroup = newGroup();
69 ServerBootstrap sb = new ServerBootstrap();
70 sb.group(serverGroup);
71 sb.channel(serverChannel());
72
73
74 sb.childOption(ChannelOption.AUTO_READ, false);
75 sb.childOption(ChannelOption.MAX_MESSAGES_PER_READ, 1);
76 sb.childOption(ChannelOption.RCVBUF_ALLOCATOR, new FixedRecvByteBufAllocator(expectedBytes / 10));
77 sb.childHandler(new ChannelInitializer<Channel>() {
78 @Override
79 protected void initChannel(Channel ch) {
80 ch.pipeline().addLast(new TestHandler(bytesRead, extraReadRequested, latch));
81 }
82 });
83
84 serverChannel = sb.bind(new InetSocketAddress(0)).syncUninterruptibly().channel();
85
86 Bootstrap cb = new Bootstrap();
87 cb.group(serverGroup);
88 cb.channel(clientChannel());
89 cb.handler(new ChannelInboundHandlerAdapter());
90 Channel clientChannel = cb.connect(serverChannel.localAddress()).syncUninterruptibly().channel();
91 ByteBuf buf = clientChannel.alloc().buffer(expectedBytes);
92 buf.writerIndex(buf.writerIndex() + expectedBytes);
93 clientChannel.writeAndFlush(buf).addListener(ChannelFutureListener.CLOSE);
94
95 latch.await();
96 assertEquals(expectedBytes, bytesRead.get());
97 } finally {
98 if (serverChannel != null) {
99 serverChannel.close().syncUninterruptibly();
100 }
101 if (serverGroup != null) {
102 serverGroup.shutdownGracefully();
103 }
104 if (clientGroup != null) {
105 clientGroup.shutdownGracefully();
106 }
107 }
108 }
109
110 @Test
111 @Timeout(value = 10000, unit = TimeUnit.MILLISECONDS)
112 public void serverCloseWithoutClientReadIsDetectedNoExtraReadRequested() throws InterruptedException {
113 serverCloseWithoutClientReadIsDetected0(false);
114 }
115
116 @Test
117 @Timeout(value = 10000, unit = TimeUnit.MILLISECONDS)
118 public void serverCloseWithoutClientReadIsDetectedExtraReadRequested() throws InterruptedException {
119 serverCloseWithoutClientReadIsDetected0(true);
120 }
121
122 private void serverCloseWithoutClientReadIsDetected0(final boolean extraReadRequested) throws InterruptedException {
123 EventLoopGroup serverGroup = null;
124 EventLoopGroup clientGroup = null;
125 Channel serverChannel = null;
126 Channel clientChannel = null;
127 try {
128 final CountDownLatch latch = new CountDownLatch(1);
129 final AtomicInteger bytesRead = new AtomicInteger();
130 final int expectedBytes = 100;
131 serverGroup = newGroup();
132 clientGroup = newGroup();
133 ServerBootstrap sb = new ServerBootstrap();
134 sb.group(serverGroup);
135 sb.channel(serverChannel());
136 sb.childHandler(new ChannelInitializer<Channel>() {
137 @Override
138 protected void initChannel(Channel ch) {
139 ch.pipeline().addLast(new ChannelInboundHandlerAdapter() {
140 @Override
141 public void channelActive(ChannelHandlerContext ctx) {
142 ByteBuf buf = ctx.alloc().buffer(expectedBytes);
143 buf.writerIndex(buf.writerIndex() + expectedBytes);
144 ctx.writeAndFlush(buf).addListener(ChannelFutureListener.CLOSE);
145 ctx.fireChannelActive();
146 }
147 });
148 }
149 });
150
151 serverChannel = sb.bind(new InetSocketAddress(0)).syncUninterruptibly().channel();
152
153 Bootstrap cb = new Bootstrap();
154 cb.group(serverGroup);
155 cb.channel(clientChannel());
156
157
158 cb.option(ChannelOption.AUTO_READ, false);
159 cb.option(ChannelOption.MAX_MESSAGES_PER_READ, 1);
160 cb.option(ChannelOption.RCVBUF_ALLOCATOR, new FixedRecvByteBufAllocator(expectedBytes / 10));
161 cb.handler(new ChannelInitializer<Channel>() {
162 @Override
163 protected void initChannel(Channel ch) throws Exception {
164 ch.pipeline().addLast(new TestHandler(bytesRead, extraReadRequested, latch));
165 }
166 });
167 clientChannel = cb.connect(serverChannel.localAddress()).syncUninterruptibly().channel();
168
169 latch.await();
170 assertEquals(expectedBytes, bytesRead.get());
171 } finally {
172 if (serverChannel != null) {
173 serverChannel.close().syncUninterruptibly();
174 }
175 if (clientChannel != null) {
176 clientChannel.close().syncUninterruptibly();
177 }
178 if (serverGroup != null) {
179 serverGroup.shutdownGracefully();
180 }
181 if (clientGroup != null) {
182 clientGroup.shutdownGracefully();
183 }
184 }
185 }
186
187 private static final class TestHandler extends SimpleChannelInboundHandler<ByteBuf> {
188 private final AtomicInteger bytesRead;
189 private final boolean extraReadRequested;
190 private final CountDownLatch latch;
191
192 TestHandler(AtomicInteger bytesRead, boolean extraReadRequested, CountDownLatch latch) {
193 this.bytesRead = bytesRead;
194 this.extraReadRequested = extraReadRequested;
195 this.latch = latch;
196 }
197
198 @Override
199 protected void channelRead0(ChannelHandlerContext ctx, ByteBuf msg) {
200 bytesRead.addAndGet(msg.readableBytes());
201
202 if (extraReadRequested) {
203
204 ctx.read();
205 }
206 }
207
208 @Override
209 public void channelInactive(ChannelHandlerContext ctx) {
210 latch.countDown();
211 ctx.fireChannelInactive();
212 }
213 }
214 }