1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16 package io.netty.testsuite.transport.udt;
17
18 import io.netty.bootstrap.Bootstrap;
19 import io.netty.bootstrap.ServerBootstrap;
20 import io.netty.channel.Channel;
21 import io.netty.channel.ChannelHandlerContext;
22 import io.netty.channel.ChannelInitializer;
23 import io.netty.channel.ChannelPipeline;
24 import io.netty.channel.SimpleChannelInboundHandler;
25 import io.netty.channel.group.ChannelGroup;
26 import io.netty.channel.group.DefaultChannelGroup;
27 import io.netty.channel.nio.NioEventLoopGroup;
28 import io.netty.channel.udt.UdtChannel;
29 import io.netty.channel.udt.nio.NioUdtProvider;
30 import io.netty.handler.codec.DelimiterBasedFrameDecoder;
31 import io.netty.handler.codec.Delimiters;
32 import io.netty.handler.codec.string.StringDecoder;
33 import io.netty.handler.codec.string.StringEncoder;
34 import io.netty.util.CharsetUtil;
35 import io.netty.util.NetUtil;
36 import io.netty.util.concurrent.DefaultThreadFactory;
37 import io.netty.util.concurrent.GlobalEventExecutor;
38 import io.netty.util.internal.PlatformDependent;
39 import org.junit.jupiter.api.BeforeAll;
40 import org.junit.jupiter.api.Test;
41 import org.slf4j.Logger;
42 import org.slf4j.LoggerFactory;
43
44 import java.net.InetSocketAddress;
45 import java.util.concurrent.ThreadFactory;
46
47 import static org.junit.jupiter.api.Assertions.assertEquals;
48 import static org.junit.jupiter.api.Assertions.assertTrue;
49 import static org.junit.jupiter.api.Assumptions.assumeFalse;
50 import static org.junit.jupiter.api.Assumptions.assumeTrue;
51
52
53
54
55 public class UDTClientServerConnectionTest {
56
57 static class Client implements Runnable {
58
59 static final Logger log = LoggerFactory.getLogger(Client.class);
60
61 private final InetSocketAddress address;
62
63 volatile Channel channel;
64 volatile boolean isRunning;
65 volatile boolean isShutdown;
66
67 Client(InetSocketAddress address) {
68 this.address = address;
69 }
70
71 @Override
72 public void run() {
73 final Bootstrap boot = new Bootstrap();
74 final ThreadFactory clientFactory = new DefaultThreadFactory("client");
75 final NioEventLoopGroup connectGroup = new NioEventLoopGroup(1,
76 clientFactory, NioUdtProvider.BYTE_PROVIDER);
77 try {
78 boot.group(connectGroup)
79 .channelFactory(NioUdtProvider.BYTE_CONNECTOR)
80 .handler(new ChannelInitializer<UdtChannel>() {
81
82 @Override
83 protected void initChannel(final UdtChannel ch)
84 throws Exception {
85 final ChannelPipeline pipeline = ch.pipeline();
86 pipeline.addLast("framer",
87 new DelimiterBasedFrameDecoder(8192,
88 Delimiters.lineDelimiter()));
89 pipeline.addLast("decoder", new StringDecoder(
90 CharsetUtil.UTF_8));
91 pipeline.addLast("encoder", new StringEncoder(
92 CharsetUtil.UTF_8));
93 pipeline.addLast("handler", new ClientHandler());
94 }
95 });
96 channel = boot.connect(address).sync().channel();
97 isRunning = true;
98 log.info("Client ready.");
99 waitForRunning(false);
100 log.info("Client closing...");
101 channel.close().sync();
102 isShutdown = true;
103 log.info("Client is done.");
104 } catch (final Throwable e) {
105 log.error("Client failed.", e);
106 } finally {
107 connectGroup.shutdownGracefully().syncUninterruptibly();
108 }
109 }
110
111 void shutdown() {
112 isRunning = false;
113 }
114
115 void waitForActive(final boolean isActive) throws Exception {
116 for (int k = 0; k < WAIT_COUNT; k++) {
117 Thread.sleep(WAIT_SLEEP);
118 final ClientHandler handler = channel.pipeline().get(
119 ClientHandler.class);
120 if (handler != null && isActive == handler.isActive) {
121 return;
122 }
123 }
124 }
125
126 void waitForRunning(final boolean isRunning) throws Exception {
127 for (int k = 0; k < WAIT_COUNT; k++) {
128 if (isRunning == this.isRunning) {
129 return;
130 }
131 Thread.sleep(WAIT_SLEEP);
132 }
133 }
134
135 private void waitForShutdown() throws Exception {
136 for (int k = 0; k < WAIT_COUNT; k++) {
137 if (isShutdown) {
138 return;
139 }
140 Thread.sleep(WAIT_SLEEP);
141 }
142 }
143 }
144
145 static class ClientHandler extends SimpleChannelInboundHandler<Object> {
146
147 static final Logger log = LoggerFactory.getLogger(ClientHandler.class);
148
149 volatile boolean isActive;
150
151 @Override
152 public void channelActive(final ChannelHandlerContext ctx)
153 throws Exception {
154 isActive = true;
155 log.info("Client active {}", ctx.channel());
156 super.channelActive(ctx);
157 }
158
159 @Override
160 public void channelInactive(final ChannelHandlerContext ctx)
161 throws Exception {
162 isActive = false;
163 log.info("Client inactive {}", ctx.channel());
164 super.channelInactive(ctx);
165 }
166
167 @Override
168 public void exceptionCaught(final ChannelHandlerContext ctx,
169 final Throwable cause) throws Exception {
170 log.warn("Client unexpected exception from downstream.", cause);
171 ctx.close();
172 }
173
174 @Override
175 public void channelRead0(ChannelHandlerContext ctx, Object msg) throws Exception {
176 log.info("Client received: " + msg);
177 }
178 }
179
180 static class Server implements Runnable {
181
182 static final Logger log = LoggerFactory.getLogger(Server.class);
183
184 final ChannelGroup group = new DefaultChannelGroup("server group", GlobalEventExecutor.INSTANCE);
185
186 private final InetSocketAddress address;
187
188 volatile Channel channel;
189 volatile boolean isRunning;
190 volatile boolean isShutdown;
191
192 Server(InetSocketAddress address) {
193 this.address = address;
194 }
195
196 @Override
197 public void run() {
198 final ServerBootstrap boot = new ServerBootstrap();
199 final ThreadFactory acceptFactory = new DefaultThreadFactory("accept");
200 final ThreadFactory serverFactory = new DefaultThreadFactory("server");
201 final NioEventLoopGroup acceptGroup = new NioEventLoopGroup(1,
202 acceptFactory, NioUdtProvider.BYTE_PROVIDER);
203 final NioEventLoopGroup connectGroup = new NioEventLoopGroup(1,
204 serverFactory, NioUdtProvider.BYTE_PROVIDER);
205 try {
206 boot.group(acceptGroup, connectGroup)
207 .channelFactory(NioUdtProvider.BYTE_ACCEPTOR)
208 .childHandler(new ChannelInitializer<UdtChannel>() {
209 @Override
210 protected void initChannel(final UdtChannel ch)
211 throws Exception {
212 final ChannelPipeline pipeline = ch.pipeline();
213 pipeline.addLast("framer",
214 new DelimiterBasedFrameDecoder(8192,
215 Delimiters.lineDelimiter()));
216 pipeline.addLast("decoder", new StringDecoder(
217 CharsetUtil.UTF_8));
218 pipeline.addLast("encoder", new StringEncoder(
219 CharsetUtil.UTF_8));
220 pipeline.addLast("handler", new ServerHandler(
221 group));
222 }
223 });
224 channel = boot.bind(address).sync().channel();
225 isRunning = true;
226 log.info("Server ready.");
227 waitForRunning(false);
228 log.info("Server closing acceptor...");
229 channel.close().sync();
230 log.info("Server closing connectors...");
231 group.close().sync();
232 isShutdown = true;
233 log.info("Server is done.");
234 } catch (final Throwable e) {
235 log.error("Server failure.", e);
236 } finally {
237 acceptGroup.shutdownGracefully();
238 connectGroup.shutdownGracefully();
239
240 acceptGroup.terminationFuture().syncUninterruptibly();
241 connectGroup.terminationFuture().syncUninterruptibly();
242 }
243 }
244
245 void shutdown() {
246 isRunning = false;
247 }
248
249 void waitForActive(final boolean isActive) throws Exception {
250 for (int k = 0; k < WAIT_COUNT; k++) {
251 Thread.sleep(WAIT_SLEEP);
252 if (isActive) {
253 for (final Channel channel : group) {
254 final ServerHandler handler = channel.pipeline().get(
255 ServerHandler.class);
256 if (handler != null && handler.isActive) {
257 return;
258 }
259 }
260 } else {
261 if (group.isEmpty()) {
262 return;
263 }
264 }
265 }
266 }
267
268 void waitForRunning(final boolean isRunning) throws Exception {
269 for (int k = 0; k < WAIT_COUNT; k++) {
270 if (isRunning == this.isRunning) {
271 return;
272 }
273 Thread.sleep(WAIT_SLEEP);
274 }
275 }
276
277 void waitForShutdown() throws Exception {
278 for (int k = 0; k < WAIT_COUNT; k++) {
279 if (isShutdown) {
280 return;
281 }
282 Thread.sleep(WAIT_SLEEP);
283 }
284 }
285 }
286
287 static class ServerHandler extends
288 SimpleChannelInboundHandler<Object> {
289
290 static final Logger log = LoggerFactory.getLogger(ServerHandler.class);
291
292 final ChannelGroup group;
293
294 volatile boolean isActive;
295
296 ServerHandler(final ChannelGroup group) {
297 this.group = group;
298 }
299
300 @Override
301 public void channelActive(final ChannelHandlerContext ctx)
302 throws Exception {
303 group.add(ctx.channel());
304 isActive = true;
305 log.info("Server active : {}", ctx.channel());
306 super.channelActive(ctx);
307 }
308
309 @Override
310 public void channelInactive(final ChannelHandlerContext ctx)
311 throws Exception {
312 group.remove(ctx.channel());
313 isActive = false;
314 log.info("Server inactive: {}", ctx.channel());
315 super.channelInactive(ctx);
316 }
317
318 @Override
319 public void exceptionCaught(final ChannelHandlerContext ctx,
320 final Throwable cause) {
321 log.warn("Server close on exception.", cause);
322 ctx.close();
323 }
324
325 @Override
326 public void channelRead0(ChannelHandlerContext ctx, Object msg) throws Exception {
327 log.info("Server received: " + msg);
328 }
329 }
330 static final Logger log = LoggerFactory
331 .getLogger(UDTClientServerConnectionTest.class);
332
333
334
335
336
337
338 static final int WAIT_COUNT = 50;
339 static final int WAIT_SLEEP = 100;
340
341 @BeforeAll
342 public static void assumeUdt() {
343 assumeTrue(canLoadAndInit(), "com.barchart.udt.SocketUDT can not be loaded and initialized");
344 assumeFalse(PlatformDependent.isJ9Jvm(), "Not supported on J9 JVM");
345 }
346
347 private static boolean canLoadAndInit() {
348 try {
349 Class.forName("com.barchart.udt.SocketUDT", true,
350 UDTClientServerConnectionTest.class.getClassLoader());
351 return true;
352 } catch (Throwable e) {
353 return false;
354 }
355 }
356
357
358
359
360 @Test
361 public void connection() throws Exception {
362 log.info("Starting server.");
363
364 final Server server = new Server(new InetSocketAddress(NetUtil.LOCALHOST4, 0));
365 final Thread serverTread = new Thread(server, "server-*");
366 serverTread.start();
367 server.waitForRunning(true);
368 assertTrue(server.isRunning);
369
370 log.info("Starting client.");
371 final Client client = new Client((InetSocketAddress) server.channel.localAddress());
372 final Thread clientThread = new Thread(client, "client-*");
373 clientThread.start();
374 client.waitForRunning(true);
375 assertTrue(client.isRunning);
376
377 log.info("Wait till connection is active.");
378 client.waitForActive(true);
379 server.waitForActive(true);
380
381 log.info("Verify connection is active.");
382 assertEquals(1, server.group.size(), "group must have one");
383
384 log.info("Stopping client.");
385 client.shutdown();
386 client.waitForShutdown();
387 assertTrue(client.isShutdown);
388
389 log.info("Wait till connection is inactive.");
390 client.waitForActive(false);
391 server.waitForActive(false);
392
393 log.info("Verify connection is inactive.");
394 assertEquals(0, server.group.size(), "group must be empty");
395
396 log.info("Stopping server.");
397 server.shutdown();
398 server.waitForShutdown();
399 assertTrue(server.isShutdown);
400
401 log.info("Finished server.");
402 }
403
404 }