查看本类的 API文档回源码主页即时通讯网 - 即时通讯开发者社区!
1   /*
2    * Copyright 2019 The Netty Project
3    *
4    * The Netty Project licenses this file to you under the Apache License,
5    * version 2.0 (the "License"); you may not use this file except in compliance
6    * with the License. You may obtain a copy of the License at:
7    *
8    *   https://www.apache.org/licenses/LICENSE-2.0
9    *
10   * Unless required by applicable law or agreed to in writing, software
11   * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12   * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13   * License for the specific language governing permissions and limitations
14   * under the License.
15   */
16  package io.netty.testsuite.transport.socket;
17  
18  import io.netty.bootstrap.Bootstrap;
19  import io.netty.bootstrap.ServerBootstrap;
20  import io.netty.buffer.ByteBuf;
21  import io.netty.buffer.Unpooled;
22  import io.netty.channel.Channel;
23  import io.netty.channel.ChannelFuture;
24  import io.netty.channel.ChannelFutureListener;
25  import io.netty.channel.ChannelHandlerContext;
26  import io.netty.channel.ChannelInboundHandlerAdapter;
27  import io.netty.channel.ChannelInitializer;
28  import io.netty.channel.ChannelOption;
29  import io.netty.testsuite.transport.TestsuitePermutation;
30  import io.netty.util.CharsetUtil;
31  import io.netty.util.concurrent.ImmediateEventExecutor;
32  import io.netty.util.concurrent.Promise;
33  import org.junit.jupiter.api.Test;
34  import org.junit.jupiter.api.TestInfo;
35  import org.junit.jupiter.api.Timeout;
36  
37  import java.io.IOException;
38  import java.net.SocketAddress;
39  import java.util.List;
40  import java.util.concurrent.TimeUnit;
41  import java.util.concurrent.atomic.AtomicInteger;
42  import java.util.concurrent.atomic.AtomicReference;
43  
44  public abstract class AbstractSocketReuseFdTest extends AbstractSocketTest {
45      @Override
46      protected abstract SocketAddress newSocketAddress();
47  
48      @Override
49      protected abstract List<TestsuitePermutation.BootstrapComboFactory<ServerBootstrap, Bootstrap>> newFactories();
50  
51      @Test
52      @Timeout(value = 60000, unit = TimeUnit.MILLISECONDS)
53      public void testReuseFd(TestInfo testInfo) throws Throwable {
54          run(testInfo, new Runner<ServerBootstrap, Bootstrap>() {
55              @Override
56              public void run(ServerBootstrap serverBootstrap, Bootstrap bootstrap) throws Throwable {
57                  testReuseFd(serverBootstrap, bootstrap);
58              }
59          });
60      }
61  
62      public void testReuseFd(ServerBootstrap sb, Bootstrap cb) throws Throwable {
63          sb.childOption(ChannelOption.AUTO_READ, true);
64          cb.option(ChannelOption.AUTO_READ, true);
65  
66          // Use a number which will typically not exceed /proc/sys/net/core/somaxconn (which is 128 on linux by default
67          // often).
68          int numChannels = 100;
69          final AtomicReference<Throwable> globalException = new AtomicReference<Throwable>();
70          final AtomicInteger serverRemaining = new AtomicInteger(numChannels);
71          final AtomicInteger clientRemaining = new AtomicInteger(numChannels);
72          final Promise<Void> serverDonePromise = ImmediateEventExecutor.INSTANCE.newPromise();
73          final Promise<Void> clientDonePromise = ImmediateEventExecutor.INSTANCE.newPromise();
74  
75          sb.childHandler(new ChannelInitializer<Channel>() {
76              @Override
77              public void initChannel(Channel sch) {
78                  ReuseFdHandler sh = new ReuseFdHandler(
79                      false,
80                      globalException,
81                      serverRemaining,
82                      serverDonePromise);
83                  sch.pipeline().addLast("handler", sh);
84              }
85          });
86  
87          cb.handler(new ChannelInitializer<Channel>() {
88              @Override
89              public void initChannel(Channel sch) {
90                  ReuseFdHandler ch = new ReuseFdHandler(
91                      true,
92                      globalException,
93                      clientRemaining,
94                      clientDonePromise);
95                  sch.pipeline().addLast("handler", ch);
96              }
97          });
98  
99          ChannelFutureListener listener = new ChannelFutureListener() {
100             @Override
101             public void operationComplete(ChannelFuture future) {
102                 if (!future.isSuccess()) {
103                     clientDonePromise.tryFailure(future.cause());
104                 }
105             }
106         };
107 
108         Channel sc = sb.bind().sync().channel();
109         for (int i = 0; i < numChannels; i++) {
110             cb.connect(sc.localAddress()).addListener(listener);
111         }
112 
113         clientDonePromise.sync();
114         serverDonePromise.sync();
115         sc.close().sync();
116 
117         if (globalException.get() != null && !(globalException.get() instanceof IOException)) {
118             throw globalException.get();
119         }
120     }
121 
122     static class ReuseFdHandler extends ChannelInboundHandlerAdapter {
123         private static final String EXPECTED_PAYLOAD = "payload";
124 
125         private final Promise<Void> donePromise;
126         private final AtomicInteger remaining;
127         private final boolean client;
128         volatile Channel channel;
129         final AtomicReference<Throwable> globalException;
130         final AtomicReference<Throwable> exception = new AtomicReference<Throwable>();
131         final StringBuilder received = new StringBuilder();
132 
133         ReuseFdHandler(
134             boolean client,
135             AtomicReference<Throwable> globalException,
136             AtomicInteger remaining,
137             Promise<Void> donePromise) {
138             this.client = client;
139             this.globalException = globalException;
140             this.remaining = remaining;
141             this.donePromise = donePromise;
142         }
143 
144         @Override
145         public void channelActive(ChannelHandlerContext ctx) {
146             channel = ctx.channel();
147             if (client) {
148                 ctx.writeAndFlush(Unpooled.copiedBuffer(EXPECTED_PAYLOAD, CharsetUtil.US_ASCII));
149             }
150         }
151 
152         @Override
153         public void channelRead(ChannelHandlerContext ctx, Object msg) {
154             if (msg instanceof ByteBuf) {
155                 ByteBuf buf = (ByteBuf) msg;
156                 received.append(buf.toString(CharsetUtil.US_ASCII));
157                 buf.release();
158 
159                 if (received.toString().equals(EXPECTED_PAYLOAD)) {
160                     if (client) {
161                         ctx.close();
162                     } else {
163                         ctx.writeAndFlush(Unpooled.copiedBuffer(EXPECTED_PAYLOAD, CharsetUtil.US_ASCII));
164                     }
165                 }
166             }
167         }
168 
169         @Override
170         public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
171             if (exception.compareAndSet(null, cause)) {
172                 donePromise.tryFailure(new IllegalStateException("exceptionCaught: " + ctx.channel(), cause));
173                 ctx.close();
174             }
175             globalException.compareAndSet(null, cause);
176         }
177 
178         @Override
179         public void channelInactive(ChannelHandlerContext ctx) {
180             if (remaining.decrementAndGet() == 0) {
181                 if (received.toString().equals(EXPECTED_PAYLOAD)) {
182                     donePromise.setSuccess(null);
183                 } else {
184                     donePromise.tryFailure(new Exception("Unexpected payload:" + received));
185                 }
186             }
187         }
188     }
189 }