1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16 package io.netty.testsuite.transport.socket;
17
18 import io.netty.bootstrap.Bootstrap;
19 import io.netty.bootstrap.ServerBootstrap;
20 import io.netty.buffer.ByteBuf;
21 import io.netty.buffer.Unpooled;
22 import io.netty.channel.Channel;
23 import io.netty.channel.ChannelHandlerContext;
24 import io.netty.channel.ChannelInboundHandlerAdapter;
25 import io.netty.channel.ChannelInitializer;
26 import io.netty.channel.ChannelOption;
27 import io.netty.channel.SimpleChannelInboundHandler;
28 import io.netty.util.concurrent.DefaultEventExecutorGroup;
29 import io.netty.util.concurrent.EventExecutorGroup;
30 import java.util.concurrent.TimeUnit;
31
32 import org.junit.jupiter.api.AfterAll;
33 import org.junit.jupiter.api.BeforeAll;
34 import org.junit.jupiter.api.Test;
35
36 import java.io.IOException;
37 import java.util.Random;
38 import java.util.concurrent.atomic.AtomicReference;
39 import org.junit.jupiter.api.TestInfo;
40 import org.junit.jupiter.api.Timeout;
41
42 import static org.junit.jupiter.api.Assertions.assertEquals;
43 import static org.junit.jupiter.api.Assertions.assertNotEquals;
44
45 public class SocketEchoTest extends AbstractSocketTest {
46
47 private static final Random random = new Random();
48 static final byte[] data = new byte[1048576];
49
50 private static EventExecutorGroup group;
51
52 static {
53 random.nextBytes(data);
54 }
55
56 @BeforeAll
57 public static void createGroup() {
58 group = new DefaultEventExecutorGroup(2);
59 }
60
61 @AfterAll
62 public static void destroyGroup() throws Exception {
63 group.shutdownGracefully().sync();
64 }
65
66 @Test
67 @Timeout(value = 30000, unit = TimeUnit.MILLISECONDS)
68 public void testSimpleEcho(TestInfo testInfo) throws Throwable {
69 run(testInfo, new Runner<ServerBootstrap, Bootstrap>() {
70 @Override
71 public void run(ServerBootstrap serverBootstrap, Bootstrap bootstrap) throws Throwable {
72 testSimpleEcho(serverBootstrap, bootstrap);
73 }
74 });
75 }
76
77 public void testSimpleEcho(ServerBootstrap sb, Bootstrap cb) throws Throwable {
78 testSimpleEcho0(sb, cb, false, false, true);
79 }
80
81 @Test
82 @Timeout(value = 30000, unit = TimeUnit.MILLISECONDS)
83 public void testSimpleEchoNotAutoRead(TestInfo testInfo) throws Throwable {
84 run(testInfo, new Runner<ServerBootstrap, Bootstrap>() {
85 @Override
86 public void run(ServerBootstrap sb1, Bootstrap cb1) throws Throwable {
87 testSimpleEchoNotAutoRead(sb1, cb1);
88 }
89 });
90 }
91
92 public void testSimpleEchoNotAutoRead(ServerBootstrap sb, Bootstrap cb) throws Throwable {
93 testSimpleEcho0(sb, cb, false, false, false);
94 }
95
96 @Test
97 public void testSimpleEchoWithAdditionalExecutor(TestInfo testInfo) throws Throwable {
98 run(testInfo, new Runner<ServerBootstrap, Bootstrap>() {
99 @Override
100 public void run(ServerBootstrap sb1, Bootstrap cb1) throws Throwable {
101 testSimpleEchoWithAdditionalExecutor(sb1, cb1);
102 }
103 });
104 }
105
106 public void testSimpleEchoWithAdditionalExecutor(ServerBootstrap sb, Bootstrap cb) throws Throwable {
107 testSimpleEcho0(sb, cb, true, false, true);
108 }
109
110 @Test
111 public void testSimpleEchoWithAdditionalExecutorNotAutoRead(TestInfo testInfo) throws Throwable {
112 run(testInfo, new Runner<ServerBootstrap, Bootstrap>() {
113 @Override
114 public void run(ServerBootstrap sb1, Bootstrap cb1) throws Throwable {
115 testSimpleEchoWithAdditionalExecutorNotAutoRead(sb1, cb1);
116 }
117 });
118 }
119
120 public void testSimpleEchoWithAdditionalExecutorNotAutoRead(ServerBootstrap sb, Bootstrap cb) throws Throwable {
121 testSimpleEcho0(sb, cb, true, false, false);
122 }
123
124 @Test
125 public void testSimpleEchoWithVoidPromise(TestInfo testInfo) throws Throwable {
126 run(testInfo, new Runner<ServerBootstrap, Bootstrap>() {
127 @Override
128 public void run(ServerBootstrap sb1, Bootstrap cb1) throws Throwable {
129 testSimpleEchoWithVoidPromise(sb1, cb1);
130 }
131 });
132 }
133
134 public void testSimpleEchoWithVoidPromise(ServerBootstrap sb, Bootstrap cb) throws Throwable {
135 testSimpleEcho0(sb, cb, false, true, true);
136 }
137
138 @Test
139 public void testSimpleEchoWithVoidPromiseNotAutoRead(TestInfo testInfo) throws Throwable {
140 run(testInfo, new Runner<ServerBootstrap, Bootstrap>() {
141 @Override
142 public void run(ServerBootstrap sb1, Bootstrap cb1) throws Throwable {
143 testSimpleEchoWithVoidPromiseNotAutoRead(sb1, cb1);
144 }
145 });
146 }
147
148 public void testSimpleEchoWithVoidPromiseNotAutoRead(ServerBootstrap sb, Bootstrap cb) throws Throwable {
149 testSimpleEcho0(sb, cb, false, true, false);
150 }
151
152 @Test
153 @Timeout(value = 30000, unit = TimeUnit.MILLISECONDS)
154 public void testSimpleEchoWithAdditionalExecutorAndVoidPromise(TestInfo testInfo) throws Throwable {
155 run(testInfo, new Runner<ServerBootstrap, Bootstrap>() {
156 @Override
157 public void run(ServerBootstrap sb1, Bootstrap cb1) throws Throwable {
158 testSimpleEchoWithAdditionalExecutorAndVoidPromise(sb1, cb1);
159 }
160 });
161 }
162
163 public void testSimpleEchoWithAdditionalExecutorAndVoidPromise(ServerBootstrap sb, Bootstrap cb) throws Throwable {
164 testSimpleEcho0(sb, cb, true, true, true);
165 }
166
167 private static void testSimpleEcho0(
168 ServerBootstrap sb, Bootstrap cb, boolean additionalExecutor, boolean voidPromise, boolean autoRead)
169 throws Throwable {
170
171 final EchoHandler sh = new EchoHandler(autoRead);
172 final EchoHandler ch = new EchoHandler(autoRead);
173
174 if (additionalExecutor) {
175 sb.childHandler(new ChannelInitializer<Channel>() {
176 @Override
177 protected void initChannel(Channel c) throws Exception {
178 c.pipeline().addLast(group, sh);
179 }
180 });
181 cb.handler(new ChannelInitializer<Channel>() {
182 @Override
183 protected void initChannel(Channel c) throws Exception {
184 c.pipeline().addLast(group, ch);
185 }
186 });
187 } else {
188 sb.childHandler(sh);
189 sb.handler(new ChannelInboundHandlerAdapter() {
190 @Override
191 public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
192 cause.printStackTrace();
193 }
194 });
195 cb.handler(ch);
196 }
197 sb.childOption(ChannelOption.AUTO_READ, autoRead);
198 cb.option(ChannelOption.AUTO_READ, autoRead);
199
200 Channel sc = sb.bind().sync().channel();
201 Channel cc = cb.connect(sc.localAddress()).sync().channel();
202
203 for (int i = 0; i < data.length;) {
204 int length = Math.min(random.nextInt(1024 * 64), data.length - i);
205 ByteBuf buf = Unpooled.wrappedBuffer(data, i, length);
206 if (voidPromise) {
207 assertEquals(cc.voidPromise(), cc.writeAndFlush(buf, cc.voidPromise()));
208 } else {
209 assertNotEquals(cc.voidPromise(), cc.writeAndFlush(buf));
210 }
211 i += length;
212 }
213
214 while (ch.counter < data.length) {
215 if (sh.exception.get() != null) {
216 break;
217 }
218 if (ch.exception.get() != null) {
219 break;
220 }
221
222 Thread.sleep(50);
223 }
224
225 while (sh.counter < data.length) {
226 if (sh.exception.get() != null) {
227 break;
228 }
229 if (ch.exception.get() != null) {
230 break;
231 }
232
233 Thread.sleep(50);
234 }
235
236 sh.channel.close().sync();
237 ch.channel.close().sync();
238 sc.close().sync();
239
240 if (sh.exception.get() != null && !(sh.exception.get() instanceof IOException)) {
241 throw sh.exception.get();
242 }
243 if (ch.exception.get() != null && !(ch.exception.get() instanceof IOException)) {
244 throw ch.exception.get();
245 }
246 if (sh.exception.get() != null) {
247 throw sh.exception.get();
248 }
249 if (ch.exception.get() != null) {
250 throw ch.exception.get();
251 }
252 }
253
254 private static class EchoHandler extends SimpleChannelInboundHandler<ByteBuf> {
255 private final boolean autoRead;
256 volatile Channel channel;
257 final AtomicReference<Throwable> exception = new AtomicReference<Throwable>();
258 volatile int counter;
259
260 EchoHandler(boolean autoRead) {
261 this.autoRead = autoRead;
262 }
263
264 @Override
265 public void channelActive(ChannelHandlerContext ctx)
266 throws Exception {
267 channel = ctx.channel();
268 if (!autoRead) {
269 ctx.read();
270 }
271 }
272
273 @Override
274 public void channelRead0(ChannelHandlerContext ctx, ByteBuf in) throws Exception {
275 byte[] actual = new byte[in.readableBytes()];
276 in.readBytes(actual);
277
278 int lastIdx = counter;
279 for (int i = 0; i < actual.length; i ++) {
280 assertEquals(data[i + lastIdx], actual[i]);
281 }
282
283 if (channel.parent() != null) {
284 channel.write(Unpooled.wrappedBuffer(actual));
285 }
286
287 counter += actual.length;
288 }
289
290 @Override
291 public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
292 try {
293 ctx.flush();
294 } finally {
295 if (!autoRead) {
296 ctx.read();
297 }
298 }
299 }
300
301 @Override
302 public void exceptionCaught(ChannelHandlerContext ctx,
303 Throwable cause) throws Exception {
304 if (exception.compareAndSet(null, cause)) {
305 cause.printStackTrace();
306 ctx.close();
307 }
308 }
309 }
310 }