1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16 package io.netty.testsuite.transport.socket;
17
18 import io.netty.bootstrap.Bootstrap;
19 import io.netty.bootstrap.ServerBootstrap;
20 import io.netty.buffer.ByteBuf;
21 import io.netty.buffer.Unpooled;
22 import io.netty.channel.Channel;
23 import io.netty.channel.ChannelHandlerContext;
24 import io.netty.channel.ChannelInitializer;
25 import io.netty.channel.ChannelOption;
26 import io.netty.channel.SimpleChannelInboundHandler;
27 import io.netty.handler.codec.FixedLengthFrameDecoder;
28 import org.junit.jupiter.api.Test;
29 import org.junit.jupiter.api.TestInfo;
30
31 import java.io.IOException;
32 import java.util.Random;
33 import java.util.concurrent.atomic.AtomicReference;
34
35 import static org.junit.jupiter.api.Assertions.assertEquals;
36
37 public class SocketFixedLengthEchoTest extends AbstractSocketTest {
38
39 private static final Random random = new Random();
40 static final byte[] data = new byte[1048576];
41
42 static {
43 random.nextBytes(data);
44 }
45
46 @Test
47 public void testFixedLengthEcho(TestInfo testInfo) throws Throwable {
48 run(testInfo, new Runner<ServerBootstrap, Bootstrap>() {
49 @Override
50 public void run(ServerBootstrap serverBootstrap, Bootstrap bootstrap) throws Throwable {
51 testFixedLengthEcho(serverBootstrap, bootstrap);
52 }
53 });
54 }
55
56 @Test
57 public void testFixedLengthEchoNotAutoRead(TestInfo testInfo) throws Throwable {
58 run(testInfo, new Runner<ServerBootstrap, Bootstrap>() {
59 @Override
60 public void run(ServerBootstrap serverBootstrap, Bootstrap bootstrap) throws Throwable {
61 testFixedLengthEchoNotAutoRead(serverBootstrap, bootstrap);
62 }
63 });
64 }
65
66 public void testFixedLengthEcho(ServerBootstrap sb, Bootstrap cb) throws Throwable {
67 testFixedLengthEcho(sb, cb, true);
68 }
69
70 public void testFixedLengthEchoNotAutoRead(ServerBootstrap sb, Bootstrap cb) throws Throwable {
71 testFixedLengthEcho(sb, cb, false);
72 }
73
74 private static void testFixedLengthEcho(ServerBootstrap sb, Bootstrap cb, boolean autoRead) throws Throwable {
75 final EchoHandler sh = new EchoHandler(autoRead);
76 final EchoHandler ch = new EchoHandler(autoRead);
77
78 sb.childOption(ChannelOption.AUTO_READ, autoRead);
79 sb.childHandler(new ChannelInitializer<Channel>() {
80 @Override
81 public void initChannel(Channel sch) throws Exception {
82 sch.pipeline().addLast("decoder", new FixedLengthFrameDecoder(1024));
83 sch.pipeline().addAfter("decoder", "handler", sh);
84 }
85 });
86
87 cb.option(ChannelOption.AUTO_READ, autoRead);
88 cb.handler(new ChannelInitializer<Channel>() {
89 @Override
90 public void initChannel(Channel sch) throws Exception {
91 sch.pipeline().addLast("decoder", new FixedLengthFrameDecoder(1024));
92 sch.pipeline().addAfter("decoder", "handler", ch);
93 }
94 });
95
96 Channel sc = sb.bind().sync().channel();
97 Channel cc = cb.connect(sc.localAddress()).sync().channel();
98 for (int i = 0; i < data.length;) {
99 int length = Math.min(random.nextInt(1024 * 3), data.length - i);
100 cc.writeAndFlush(Unpooled.wrappedBuffer(data, i, length));
101 i += length;
102 }
103
104 while (ch.counter < data.length) {
105 if (sh.exception.get() != null) {
106 break;
107 }
108 if (ch.exception.get() != null) {
109 break;
110 }
111
112 Thread.sleep(50);
113 }
114
115 while (sh.counter < data.length) {
116 if (sh.exception.get() != null) {
117 break;
118 }
119 if (ch.exception.get() != null) {
120 break;
121 }
122
123 Thread.sleep(50);
124 }
125
126 sh.channel.close().sync();
127 ch.channel.close().sync();
128 sc.close().sync();
129
130 if (sh.exception.get() != null && !(sh.exception.get() instanceof IOException)) {
131 throw sh.exception.get();
132 }
133 if (ch.exception.get() != null && !(ch.exception.get() instanceof IOException)) {
134 throw ch.exception.get();
135 }
136 if (sh.exception.get() != null) {
137 throw sh.exception.get();
138 }
139 if (ch.exception.get() != null) {
140 throw ch.exception.get();
141 }
142 }
143
144 private static class EchoHandler extends SimpleChannelInboundHandler<ByteBuf> {
145 private final boolean autoRead;
146 volatile Channel channel;
147 final AtomicReference<Throwable> exception = new AtomicReference<Throwable>();
148 volatile int counter;
149
150 EchoHandler(boolean autoRead) {
151 this.autoRead = autoRead;
152 }
153
154 @Override
155 public void channelActive(ChannelHandlerContext ctx) throws Exception {
156 channel = ctx.channel();
157 if (!autoRead) {
158 ctx.read();
159 }
160 }
161
162 @Override
163 public void channelRead0(ChannelHandlerContext ctx, ByteBuf msg) throws Exception {
164 assertEquals(1024, msg.readableBytes());
165
166 byte[] actual = new byte[msg.readableBytes()];
167 msg.getBytes(0, actual);
168
169 int lastIdx = counter;
170 for (int i = 0; i < actual.length; i ++) {
171 assertEquals(data[i + lastIdx], actual[i]);
172 }
173
174 if (channel.parent() != null) {
175 channel.write(msg.retain());
176 }
177
178 counter += actual.length;
179 }
180
181 @Override
182 public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
183 try {
184 ctx.flush();
185 } finally {
186 if (!autoRead) {
187 ctx.read();
188 }
189 }
190 }
191
192 @Override
193 public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
194 if (exception.compareAndSet(null, cause)) {
195 ctx.close();
196 }
197 }
198 }
199 }