查看本类的 API文档回源码主页即时通讯网 - 即时通讯开发者社区!
1   /*
2    * Copyright 2012 The Netty Project
3    *
4    * The Netty Project licenses this file to you under the Apache License,
5    * version 2.0 (the "License"); you may not use this file except in compliance
6    * with the License. You may obtain a copy of the License at:
7    *
8    *   https://www.apache.org/licenses/LICENSE-2.0
9    *
10   * Unless required by applicable law or agreed to in writing, software
11   * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12   * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13   * License for the specific language governing permissions and limitations
14   * under the License.
15   */
16  package io.netty.testsuite.transport.socket;
17  
18  import io.netty.bootstrap.Bootstrap;
19  import io.netty.bootstrap.ServerBootstrap;
20  import io.netty.channel.Channel;
21  import io.netty.channel.ChannelHandlerContext;
22  import io.netty.channel.ChannelInitializer;
23  import io.netty.channel.ChannelOption;
24  import io.netty.channel.SimpleChannelInboundHandler;
25  import io.netty.handler.codec.DelimiterBasedFrameDecoder;
26  import io.netty.handler.codec.Delimiters;
27  import io.netty.handler.codec.string.StringDecoder;
28  import io.netty.handler.codec.string.StringEncoder;
29  import io.netty.util.CharsetUtil;
30  import io.netty.util.concurrent.ImmediateEventExecutor;
31  import io.netty.util.concurrent.Promise;
32  import org.junit.jupiter.api.Test;
33  import org.junit.jupiter.api.TestInfo;
34  import org.junit.jupiter.api.Timeout;
35  
36  import java.io.IOException;
37  import java.util.Random;
38  import java.util.concurrent.TimeUnit;
39  import java.util.concurrent.atomic.AtomicReference;
40  
41  public class SocketStringEchoTest extends AbstractSocketTest {
42  
43      static final Random random = new Random();
44      static final String[] data = new String[1024];
45  
46      static {
47          for (int i = 0; i < data.length; i ++) {
48              int eLen = random.nextInt(512);
49              char[] e = new char[eLen];
50              for (int j = 0; j < eLen; j ++) {
51                  e[j] = (char) ('a' + random.nextInt(26));
52              }
53  
54              data[i] = new String(e);
55          }
56      }
57  
58      @Test
59      @Timeout(value = 60000, unit = TimeUnit.MILLISECONDS)
60      public void testStringEcho(TestInfo testInfo) throws Throwable {
61          run(testInfo, new Runner<ServerBootstrap, Bootstrap>() {
62              @Override
63              public void run(ServerBootstrap serverBootstrap, Bootstrap bootstrap) throws Throwable {
64                  testStringEcho(serverBootstrap, bootstrap);
65              }
66          });
67      }
68  
69      public void testStringEcho(ServerBootstrap sb, Bootstrap cb) throws Throwable {
70          testStringEcho(sb, cb, true);
71      }
72  
73      @Test
74      @Timeout(value = 60000, unit = TimeUnit.MILLISECONDS)
75      public void testStringEchoNotAutoRead(TestInfo testInfo) throws Throwable {
76          run(testInfo, new Runner<ServerBootstrap, Bootstrap>() {
77              @Override
78              public void run(ServerBootstrap serverBootstrap, Bootstrap bootstrap) throws Throwable {
79                  testStringEchoNotAutoRead(serverBootstrap, bootstrap);
80              }
81          });
82      }
83  
84      public void testStringEchoNotAutoRead(ServerBootstrap sb, Bootstrap cb) throws Throwable {
85          testStringEcho(sb, cb, false);
86      }
87  
88      private static void testStringEcho(ServerBootstrap sb, Bootstrap cb, boolean autoRead) throws Throwable {
89          sb.childOption(ChannelOption.AUTO_READ, autoRead);
90          cb.option(ChannelOption.AUTO_READ, autoRead);
91  
92          Promise<Void> serverDonePromise = ImmediateEventExecutor.INSTANCE.newPromise();
93          Promise<Void> clientDonePromise = ImmediateEventExecutor.INSTANCE.newPromise();
94          final StringEchoHandler sh = new StringEchoHandler(autoRead, serverDonePromise);
95          final StringEchoHandler ch = new StringEchoHandler(autoRead, clientDonePromise);
96  
97          sb.childHandler(new ChannelInitializer<Channel>() {
98              @Override
99              public void initChannel(Channel sch) throws Exception {
100                 sch.pipeline().addLast("framer", new DelimiterBasedFrameDecoder(512, Delimiters.lineDelimiter()));
101                 sch.pipeline().addLast("decoder", new StringDecoder(CharsetUtil.ISO_8859_1));
102                 sch.pipeline().addBefore("decoder", "encoder", new StringEncoder(CharsetUtil.ISO_8859_1));
103                 sch.pipeline().addAfter("decoder", "handler", sh);
104             }
105         });
106 
107         cb.handler(new ChannelInitializer<Channel>() {
108             @Override
109             public void initChannel(Channel sch) throws Exception {
110                 sch.pipeline().addLast("framer", new DelimiterBasedFrameDecoder(512, Delimiters.lineDelimiter()));
111                 sch.pipeline().addLast("decoder", new StringDecoder(CharsetUtil.ISO_8859_1));
112                 sch.pipeline().addBefore("decoder", "encoder", new StringEncoder(CharsetUtil.ISO_8859_1));
113                 sch.pipeline().addAfter("decoder", "handler", ch);
114             }
115         });
116 
117         Channel sc = sb.bind().sync().channel();
118         Channel cc = cb.connect(sc.localAddress()).sync().channel();
119         for (String element : data) {
120             String delimiter = random.nextBoolean() ? "\r\n" : "\n";
121             cc.writeAndFlush(element + delimiter);
122         }
123 
124         ch.donePromise.sync();
125         sh.donePromise.sync();
126         sh.channel.close().sync();
127         ch.channel.close().sync();
128         sc.close().sync();
129 
130         if (sh.exception.get() != null && !(sh.exception.get() instanceof IOException)) {
131             throw sh.exception.get();
132         }
133         if (ch.exception.get() != null && !(ch.exception.get() instanceof IOException)) {
134             throw ch.exception.get();
135         }
136         if (sh.exception.get() != null) {
137             throw sh.exception.get();
138         }
139         if (ch.exception.get() != null) {
140             throw ch.exception.get();
141         }
142     }
143 
144     static class StringEchoHandler extends SimpleChannelInboundHandler<String> {
145         private final boolean autoRead;
146         private final Promise<Void> donePromise;
147         private int dataIndex;
148         volatile Channel channel;
149         final AtomicReference<Throwable> exception = new AtomicReference<Throwable>();
150 
151         StringEchoHandler(boolean autoRead, Promise<Void> donePromise) {
152             this.autoRead = autoRead;
153             this.donePromise = donePromise;
154         }
155 
156         @Override
157         public void channelActive(ChannelHandlerContext ctx) throws Exception {
158             channel = ctx.channel();
159             if (!autoRead) {
160                 ctx.read();
161             }
162         }
163 
164         @Override
165         public void channelRead0(ChannelHandlerContext ctx, String msg) throws Exception {
166             if (!data[dataIndex].equals(msg)) {
167                 donePromise.tryFailure(new IllegalStateException("index: " + dataIndex + " didn't match!"));
168                 ctx.close();
169                 return;
170             }
171 
172             if (channel.parent() != null) {
173                 String delimiter = random.nextBoolean() ? "\r\n" : "\n";
174                 channel.write(msg + delimiter);
175             }
176 
177             if (++dataIndex >= data.length) {
178                 donePromise.setSuccess(null);
179             }
180         }
181 
182         @Override
183         public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
184             try {
185                 ctx.flush();
186             } finally {
187                 if (!autoRead) {
188                     ctx.read();
189                 }
190             }
191         }
192 
193         @Override
194         public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
195             if (exception.compareAndSet(null, cause)) {
196                 donePromise.tryFailure(new IllegalStateException("exceptionCaught: " + ctx.channel(), cause));
197                 ctx.close();
198             }
199         }
200 
201         @Override
202         public void channelInactive(ChannelHandlerContext ctx) throws Exception {
203             donePromise.tryFailure(new IllegalStateException("channelInactive: " + ctx.channel()));
204         }
205     }
206 }