1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16 package io.netty.testsuite.transport.socket;
17
18 import io.netty.bootstrap.Bootstrap;
19 import io.netty.bootstrap.ServerBootstrap;
20 import io.netty.channel.Channel;
21 import io.netty.channel.ChannelHandlerContext;
22 import io.netty.channel.ChannelInboundHandlerAdapter;
23 import io.netty.channel.ChannelInitializer;
24 import io.netty.channel.ChannelOption;
25 import io.netty.handler.codec.serialization.ClassResolvers;
26 import io.netty.handler.codec.serialization.ObjectDecoder;
27 import io.netty.handler.codec.serialization.ObjectEncoder;
28 import org.junit.jupiter.api.Test;
29 import org.junit.jupiter.api.TestInfo;
30
31 import java.io.IOException;
32 import java.util.Random;
33 import java.util.concurrent.atomic.AtomicReference;
34
35 import static org.junit.jupiter.api.Assertions.assertEquals;
36
37 public class SocketObjectEchoTest extends AbstractSocketTest {
38
39 static final Random random = new Random();
40 static final String[] data = new String[1024];
41
42 static {
43 for (int i = 0; i < data.length; i ++) {
44 int eLen = random.nextInt(512);
45 char[] e = new char[eLen];
46 for (int j = 0; j < eLen; j ++) {
47 e[j] = (char) ('a' + random.nextInt(26));
48 }
49
50 data[i] = new String(e);
51 }
52 }
53
54 @Test
55 public void testObjectEcho(TestInfo testInfo) throws Throwable {
56 run(testInfo, new Runner<ServerBootstrap, Bootstrap>() {
57 @Override
58 public void run(ServerBootstrap serverBootstrap, Bootstrap bootstrap) throws Throwable {
59 testObjectEcho(serverBootstrap, bootstrap);
60 }
61 });
62 }
63
64 public void testObjectEcho(ServerBootstrap sb, Bootstrap cb) throws Throwable {
65 testObjectEcho(sb, cb, true);
66 }
67
68 @Test
69 public void testObjectEchoNotAutoRead(TestInfo testInfo) throws Throwable {
70 run(testInfo, new Runner<ServerBootstrap, Bootstrap>() {
71 @Override
72 public void run(ServerBootstrap serverBootstrap, Bootstrap bootstrap) throws Throwable {
73 testObjectEchoNotAutoRead(serverBootstrap, bootstrap);
74 }
75 });
76 }
77
78 public void testObjectEchoNotAutoRead(ServerBootstrap sb, Bootstrap cb) throws Throwable {
79 testObjectEcho(sb, cb, false);
80 }
81
82 private static void testObjectEcho(ServerBootstrap sb, Bootstrap cb, boolean autoRead) throws Throwable {
83 sb.childOption(ChannelOption.AUTO_READ, autoRead);
84 cb.option(ChannelOption.AUTO_READ, autoRead);
85
86 final EchoHandler sh = new EchoHandler(autoRead);
87 final EchoHandler ch = new EchoHandler(autoRead);
88
89 sb.childHandler(new ChannelInitializer<Channel>() {
90 @Override
91 public void initChannel(Channel sch) throws Exception {
92 sch.pipeline().addLast(
93 new ObjectDecoder(ClassResolvers.cacheDisabled(getClass().getClassLoader())),
94 new ObjectEncoder(),
95 sh);
96 }
97 });
98
99 cb.handler(new ChannelInitializer<Channel>() {
100 @Override
101 public void initChannel(Channel sch) throws Exception {
102 sch.pipeline().addLast(
103 new ObjectDecoder(ClassResolvers.cacheDisabled(getClass().getClassLoader())),
104 new ObjectEncoder(),
105 ch);
106 }
107 });
108
109 Channel sc = sb.bind().sync().channel();
110 Channel cc = cb.connect(sc.localAddress()).sync().channel();
111 for (String element : data) {
112 cc.writeAndFlush(element);
113 }
114
115 while (ch.counter < data.length) {
116 if (sh.exception.get() != null) {
117 break;
118 }
119 if (ch.exception.get() != null) {
120 break;
121 }
122
123 Thread.sleep(50);
124 }
125
126 while (sh.counter < data.length) {
127 if (sh.exception.get() != null) {
128 break;
129 }
130 if (ch.exception.get() != null) {
131 break;
132 }
133
134 Thread.sleep(50);
135 }
136
137 sh.channel.close().sync();
138 ch.channel.close().sync();
139 sc.close().sync();
140
141 if (sh.exception.get() != null && !(sh.exception.get() instanceof IOException)) {
142 throw sh.exception.get();
143 }
144 if (ch.exception.get() != null && !(ch.exception.get() instanceof IOException)) {
145 throw ch.exception.get();
146 }
147 if (sh.exception.get() != null) {
148 throw sh.exception.get();
149 }
150 if (ch.exception.get() != null) {
151 throw ch.exception.get();
152 }
153 }
154
155 private static class EchoHandler extends ChannelInboundHandlerAdapter {
156 private final boolean autoRead;
157 volatile Channel channel;
158 final AtomicReference<Throwable> exception = new AtomicReference<Throwable>();
159 volatile int counter;
160
161 EchoHandler(boolean autoRead) {
162 this.autoRead = autoRead;
163 }
164
165 @Override
166 public void channelActive(ChannelHandlerContext ctx)
167 throws Exception {
168 channel = ctx.channel();
169 if (!autoRead) {
170 ctx.read();
171 }
172 }
173
174 @Override
175 public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
176 assertEquals(data[counter], msg);
177
178 if (channel.parent() != null) {
179 channel.write(msg);
180 }
181
182 counter ++;
183 }
184
185 @Override
186 public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
187 try {
188 ctx.flush();
189 } finally {
190 if (!autoRead) {
191 ctx.read();
192 }
193 }
194 }
195
196 @Override
197 public void exceptionCaught(ChannelHandlerContext ctx,
198 Throwable cause) throws Exception {
199 if (exception.compareAndSet(null, cause)) {
200 ctx.close();
201 }
202 }
203 }
204 }