1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16 package io.netty.testsuite.transport.sctp;
17
18 import io.netty.bootstrap.Bootstrap;
19 import io.netty.bootstrap.ServerBootstrap;
20 import io.netty.buffer.ByteBuf;
21 import io.netty.buffer.Unpooled;
22 import io.netty.channel.Channel;
23 import io.netty.channel.ChannelHandlerContext;
24 import io.netty.channel.ChannelInitializer;
25 import io.netty.channel.SimpleChannelInboundHandler;
26 import io.netty.channel.sctp.SctpChannel;
27 import io.netty.handler.codec.sctp.SctpInboundByteStreamHandler;
28 import io.netty.handler.codec.sctp.SctpMessageCompletionHandler;
29 import io.netty.handler.codec.sctp.SctpOutboundByteStreamHandler;
30 import io.netty.testsuite.util.TestUtils;
31 import org.junit.jupiter.api.Test;
32
33 import java.io.IOException;
34 import java.util.Random;
35 import java.util.concurrent.atomic.AtomicReference;
36 import org.junit.jupiter.api.TestInfo;
37
38 import static org.junit.jupiter.api.Assertions.assertEquals;
39 import static org.junit.jupiter.api.Assumptions.assumeTrue;
40
41 public class SctpEchoTest extends AbstractSctpTest {
42
43 private static final Random random = new Random();
44 static final byte[] data = new byte[4096];
45
46 static {
47 random.nextBytes(data);
48 }
49
50 @Test
51 public void testSimpleEcho(TestInfo testInfo) throws Throwable {
52 assumeTrue(TestUtils.isSctpSupported());
53 run(testInfo, new Runner<ServerBootstrap, Bootstrap>() {
54 @Override
55 public void run(ServerBootstrap serverBootstrap, Bootstrap bootstrap) throws Throwable {
56 testSimpleEcho(serverBootstrap, bootstrap);
57 }
58 });
59 }
60
61 public void testSimpleEcho(ServerBootstrap sb, Bootstrap cb) throws Throwable {
62 testSimpleEcho0(sb, cb, false);
63 }
64
65 @Test
66 public void testSimpleEchoUnordered(TestInfo testInfo) throws Throwable {
67 assumeTrue(TestUtils.isSctpSupported());
68 run(testInfo, new Runner<ServerBootstrap, Bootstrap>() {
69 @Override
70 public void run(ServerBootstrap serverBootstrap, Bootstrap bootstrap) throws Throwable {
71 testSimpleEchoUnordered(serverBootstrap, bootstrap);
72 }
73 });
74 }
75
76 public void testSimpleEchoUnordered(ServerBootstrap sb, Bootstrap cb) throws Throwable {
77 testSimpleEcho0(sb, cb, true);
78 }
79
80 private static void testSimpleEcho0(ServerBootstrap sb, Bootstrap cb, final boolean unordered) throws Throwable {
81 final EchoHandler sh = new EchoHandler();
82 final EchoHandler ch = new EchoHandler();
83
84 sb.childHandler(new ChannelInitializer<SctpChannel>() {
85 @Override
86 public void initChannel(SctpChannel c) throws Exception {
87 c.pipeline().addLast(
88 new SctpMessageCompletionHandler(),
89 new SctpInboundByteStreamHandler(0, 0),
90 new SctpOutboundByteStreamHandler(0, 0, unordered),
91 sh);
92 }
93 });
94 cb.handler(new ChannelInitializer<SctpChannel>() {
95 @Override
96 public void initChannel(SctpChannel c) throws Exception {
97 c.pipeline().addLast(
98 new SctpMessageCompletionHandler(),
99 new SctpInboundByteStreamHandler(0, 0),
100 new SctpOutboundByteStreamHandler(0, 0, unordered),
101 ch);
102 }
103 });
104
105 Channel sc = sb.bind().sync().channel();
106 Channel cc = cb.connect(sc.localAddress()).sync().channel();
107
108 for (int i = 0; i < data.length;) {
109 int length = Math.min(random.nextInt(1024 * 64), data.length - i);
110 cc.writeAndFlush(Unpooled.wrappedBuffer(data, i, length));
111 i += length;
112 }
113
114 while (ch.counter < data.length) {
115 if (sh.exception.get() != null) {
116 break;
117 }
118 if (ch.exception.get() != null) {
119 break;
120 }
121
122 Thread.sleep(50);
123 }
124
125 while (sh.counter < data.length) {
126 if (sh.exception.get() != null) {
127 break;
128 }
129 if (ch.exception.get() != null) {
130 break;
131 }
132
133 Thread.sleep(50);
134 }
135
136 sh.channel.close().sync();
137 ch.channel.close().sync();
138 sc.close().sync();
139
140 if (sh.exception.get() != null && !(sh.exception.get() instanceof IOException)) {
141 throw sh.exception.get();
142 }
143 if (ch.exception.get() != null && !(ch.exception.get() instanceof IOException)) {
144 throw ch.exception.get();
145 }
146 if (sh.exception.get() != null) {
147 throw sh.exception.get();
148 }
149 if (ch.exception.get() != null) {
150 throw ch.exception.get();
151 }
152 }
153
154 private static class EchoHandler extends SimpleChannelInboundHandler<ByteBuf> {
155 volatile Channel channel;
156 final AtomicReference<Throwable> exception = new AtomicReference<Throwable>();
157 volatile int counter;
158
159 @Override
160 public void channelActive(ChannelHandlerContext ctx) throws Exception {
161 channel = ctx.channel();
162 }
163
164 @Override
165 public void channelRead0(ChannelHandlerContext ctx, ByteBuf in) throws Exception {
166 byte[] actual = new byte[in.readableBytes()];
167 in.readBytes(actual);
168
169 int lastIdx = counter;
170 for (int i = 0; i < actual.length; i++) {
171 assertEquals(data[i + lastIdx], actual[i]);
172 }
173
174 if (channel.parent() != null) {
175 channel.writeAndFlush(Unpooled.wrappedBuffer(actual));
176 }
177
178 counter += actual.length;
179 }
180
181 @Override
182 public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
183 if (exception.compareAndSet(null, cause)) {
184 ctx.close();
185 }
186 }
187 }
188 }