查看本类的 API 文档 | 回源码主页 | 即时通讯网 - 即时通讯开发者社区!
1   /*
2    * Copyright 2012 The Netty Project
3    *
4    * The Netty Project licenses this file to you under the Apache License,
5    * version 2.0 (the "License"); you may not use this file except in compliance
6    * with the License. You may obtain a copy of the License at:
7    *
8    *   https://www.apache.org/licenses/LICENSE-2.0
9    *
10   * Unless required by applicable law or agreed to in writing, software
11   * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12   * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13   * License for the specific language governing permissions and limitations
14   * under the License.
15   */
16  package io.netty.testsuite.transport.socket;
17  
18  import io.netty.bootstrap.Bootstrap;
19  import io.netty.bootstrap.ServerBootstrap;
20  import io.netty.channel.Channel;
21  import io.netty.channel.ChannelHandlerContext;
22  import io.netty.channel.ChannelInboundHandlerAdapter;
23  import io.netty.channel.ChannelInitializer;
24  import io.netty.channel.ChannelOption;
25  import io.netty.handler.codec.serialization.ClassResolvers;
26  import io.netty.handler.codec.serialization.ObjectDecoder;
27  import io.netty.handler.codec.serialization.ObjectEncoder;
28  import org.junit.jupiter.api.Test;
29  import org.junit.jupiter.api.TestInfo;
30  
31  import java.io.IOException;
32  import java.util.Random;
33  import java.util.concurrent.atomic.AtomicReference;
34  
35  import static org.junit.jupiter.api.Assertions.assertEquals;
36  
37  public class SocketObjectEchoTest extends AbstractSocketTest {
38  
39      static final Random random = new Random();
40      static final String[] data = new String[1024];
41  
42      static {
43          for (int i = 0; i < data.length; i ++) {
44              int eLen = random.nextInt(512);
45              char[] e = new char[eLen];
46              for (int j = 0; j < eLen; j ++) {
47                  e[j] = (char) ('a' + random.nextInt(26));
48              }
49  
50              data[i] = new String(e);
51          }
52      }
53  
54      @Test
55      public void testObjectEcho(TestInfo testInfo) throws Throwable {
56          run(testInfo, new Runner<ServerBootstrap, Bootstrap>() {
57              @Override
58              public void run(ServerBootstrap serverBootstrap, Bootstrap bootstrap) throws Throwable {
59                  testObjectEcho(serverBootstrap, bootstrap);
60              }
61          });
62      }
63  
64      public void testObjectEcho(ServerBootstrap sb, Bootstrap cb) throws Throwable {
65          testObjectEcho(sb, cb, true);
66      }
67  
68      @Test
69      public void testObjectEchoNotAutoRead(TestInfo testInfo) throws Throwable {
70          run(testInfo, new Runner<ServerBootstrap, Bootstrap>() {
71              @Override
72              public void run(ServerBootstrap serverBootstrap, Bootstrap bootstrap) throws Throwable {
73                  testObjectEchoNotAutoRead(serverBootstrap, bootstrap);
74              }
75          });
76      }
77  
78      public void testObjectEchoNotAutoRead(ServerBootstrap sb, Bootstrap cb) throws Throwable {
79          testObjectEcho(sb, cb, false);
80      }
81  
82      private static void testObjectEcho(ServerBootstrap sb, Bootstrap cb, boolean autoRead) throws Throwable {
83          sb.childOption(ChannelOption.AUTO_READ, autoRead);
84          cb.option(ChannelOption.AUTO_READ, autoRead);
85  
86          final EchoHandler sh = new EchoHandler(autoRead);
87          final EchoHandler ch = new EchoHandler(autoRead);
88  
89          sb.childHandler(new ChannelInitializer<Channel>() {
90              @Override
91              public void initChannel(Channel sch) throws Exception {
92                  sch.pipeline().addLast(
93                          new ObjectDecoder(ClassResolvers.cacheDisabled(getClass().getClassLoader())),
94                          new ObjectEncoder(),
95                          sh);
96              }
97          });
98  
99          cb.handler(new ChannelInitializer<Channel>() {
100             @Override
101             public void initChannel(Channel sch) throws Exception {
102                 sch.pipeline().addLast(
103                         new ObjectDecoder(ClassResolvers.cacheDisabled(getClass().getClassLoader())),
104                         new ObjectEncoder(),
105                         ch);
106             }
107         });
108 
109         Channel sc = sb.bind().sync().channel();
110         Channel cc = cb.connect(sc.localAddress()).sync().channel();
111         for (String element : data) {
112             cc.writeAndFlush(element);
113         }
114 
115         while (ch.counter < data.length) {
116             if (sh.exception.get() != null) {
117                 break;
118             }
119             if (ch.exception.get() != null) {
120                 break;
121             }
122 
123             Thread.sleep(50);
124         }
125 
126         while (sh.counter < data.length) {
127             if (sh.exception.get() != null) {
128                 break;
129             }
130             if (ch.exception.get() != null) {
131                 break;
132             }
133 
134             Thread.sleep(50);
135         }
136 
137         sh.channel.close().sync();
138         ch.channel.close().sync();
139         sc.close().sync();
140 
141         if (sh.exception.get() != null && !(sh.exception.get() instanceof IOException)) {
142             throw sh.exception.get();
143         }
144         if (ch.exception.get() != null && !(ch.exception.get() instanceof IOException)) {
145             throw ch.exception.get();
146         }
147         if (sh.exception.get() != null) {
148             throw sh.exception.get();
149         }
150         if (ch.exception.get() != null) {
151             throw ch.exception.get();
152         }
153     }
154 
155     private static class EchoHandler extends ChannelInboundHandlerAdapter {
156         private final boolean autoRead;
157         volatile Channel channel;
158         final AtomicReference<Throwable> exception = new AtomicReference<Throwable>();
159         volatile int counter;
160 
161         EchoHandler(boolean autoRead) {
162             this.autoRead = autoRead;
163         }
164 
165         @Override
166         public void channelActive(ChannelHandlerContext ctx)
167                 throws Exception {
168             channel = ctx.channel();
169             if (!autoRead) {
170                 ctx.read();
171             }
172         }
173 
174         @Override
175         public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
176             assertEquals(data[counter], msg);
177 
178             if (channel.parent() != null) {
179                 channel.write(msg);
180             }
181 
182             counter ++;
183         }
184 
185         @Override
186         public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
187             try {
188                 ctx.flush();
189             } finally {
190                 if (!autoRead) {
191                     ctx.read();
192                 }
193             }
194         }
195 
196         @Override
197         public void exceptionCaught(ChannelHandlerContext ctx,
198                 Throwable cause) throws Exception {
199             if (exception.compareAndSet(null, cause)) {
200                 ctx.close();
201             }
202         }
203     }
204 }