// View this class's API documentation | Back to source home | Instant Messaging Network - Instant Messaging Developer Community
1   /*
2    * Copyright 2019 The Netty Project
3    *
4    * The Netty Project licenses this file to you under the Apache License,
5    * version 2.0 (the "License"); you may not use this file except in compliance
6    * with the License. You may obtain a copy of the License at:
7    *
8    *   https://www.apache.org/licenses/LICENSE-2.0
9    *
10   * Unless required by applicable law or agreed to in writing, software
11   * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12   * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13   * License for the specific language governing permissions and limitations
14   * under the License.
15   */
16  package io.netty.testsuite.transport;
17  
18  import io.netty.bootstrap.ServerBootstrap;
19  import io.netty.channel.Channel;
20  import io.netty.channel.ChannelFuture;
21  import io.netty.channel.ChannelInboundHandlerAdapter;
22  import io.netty.channel.EventLoop;
23  import io.netty.channel.EventLoopGroup;
24  import io.netty.channel.ServerChannel;
25  import io.netty.channel.SingleThreadEventLoop;
26  import io.netty.channel.local.LocalAddress;
27  import io.netty.channel.local.LocalServerChannel;
28  import io.netty.util.concurrent.EventExecutor;
29  import io.netty.util.concurrent.Future;
30  import io.netty.util.concurrent.Promise;
31  import org.junit.jupiter.api.Test;
32  import org.junit.jupiter.api.Timeout;
33  import org.junit.jupiter.api.function.Executable;
34  
35  import java.util.HashSet;
36  import java.util.Iterator;
37  import java.util.NoSuchElementException;
38  import java.util.Set;
39  import java.util.concurrent.Callable;
40  import java.util.concurrent.CountDownLatch;
41  import java.util.concurrent.RejectedExecutionException;
42  import java.util.concurrent.TimeUnit;
43  
44  import static org.junit.jupiter.api.Assertions.assertEquals;
45  import static org.junit.jupiter.api.Assertions.assertFalse;
46  import static org.junit.jupiter.api.Assertions.assertNotNull;
47  import static org.junit.jupiter.api.Assertions.assertThrows;
48  import static org.junit.jupiter.api.Assertions.assertTrue;
49  import static org.junit.jupiter.api.Assertions.fail;
50  import static org.junit.jupiter.api.Assumptions.assumeTrue;
51  
52  public abstract class AbstractSingleThreadEventLoopTest {
53      @Test
54      @Timeout(value = 5000, unit = TimeUnit.MILLISECONDS)
55      public void testChannelsRegistered() throws Exception {
56          EventLoopGroup group = newEventLoopGroup();
57          final SingleThreadEventLoop loop = (SingleThreadEventLoop) group.next();
58  
59          try {
60              final Channel ch1 = newChannel();
61              final Channel ch2 = newChannel();
62  
63              int rc = registeredChannels(loop);
64              boolean channelCountSupported = rc != -1;
65  
66              if (channelCountSupported) {
67                  assertEquals(0, registeredChannels(loop));
68              }
69  
70              assertTrue(loop.register(ch1).syncUninterruptibly().isSuccess());
71              assertTrue(loop.register(ch2).syncUninterruptibly().isSuccess());
72              if (channelCountSupported) {
73                  checkNumRegisteredChannels(loop, 2);
74              }
75  
76              assertTrue(ch1.deregister().syncUninterruptibly().isSuccess());
77              if (channelCountSupported) {
78                  checkNumRegisteredChannels(loop, 1);
79              }
80          } finally {
81              group.shutdownGracefully();
82          }
83      }
84  
85      private static void checkNumRegisteredChannels(SingleThreadEventLoop loop, int numChannels) throws Exception {
86          // We need to loop as some EventLoop implementations may need some time to update the counter correctly.
87          while (registeredChannels(loop) != numChannels) {
88              Thread.sleep(50);
89          }
90      }
91  
92      // Only reliable if run from event loop
93      private static int registeredChannels(final SingleThreadEventLoop loop) throws Exception {
94          return loop.submit(new Callable<Integer>() {
95              @Override
96              public Integer call() {
97                  return loop.registeredChannels();
98              }
99          }).get(1, TimeUnit.SECONDS);
100     }
101 
102     @Test
103     @SuppressWarnings("deprecation")
104     public void shutdownBeforeStart() throws Exception {
105         EventLoopGroup group = newEventLoopGroup();
106         assertFalse(group.awaitTermination(2, TimeUnit.MILLISECONDS));
107         group.shutdown();
108         assertTrue(group.awaitTermination(200, TimeUnit.MILLISECONDS));
109     }
110 
111     @Test
112     public void shutdownGracefullyZeroQuietBeforeStart() throws Exception {
113         EventLoopGroup group = newEventLoopGroup();
114         assertTrue(group.shutdownGracefully(0L, 2L, TimeUnit.SECONDS).await(200L));
115     }
116 
117     // Copied from AbstractEventLoopTest
118     @Test
119     @Timeout(value = 5000, unit = TimeUnit.MILLISECONDS)
120     public void testShutdownGracefullyNoQuietPeriod() throws Exception {
121         EventLoopGroup loop = newEventLoopGroup();
122         ServerBootstrap b = new ServerBootstrap();
123         b.group(loop)
124         .channel(serverChannelClass())
125         .childHandler(new ChannelInboundHandlerAdapter());
126 
127         // Not close the Channel to ensure the EventLoop is still shutdown in time.
128         ChannelFuture cf = serverChannelClass() == LocalServerChannel.class
129                 ? b.bind(new LocalAddress("local")) : b.bind(0);
130         cf.sync().channel();
131 
132         Future<?> f = loop.shutdownGracefully(0, 1, TimeUnit.MINUTES);
133         assertTrue(loop.awaitTermination(600, TimeUnit.MILLISECONDS));
134         assertTrue(f.syncUninterruptibly().isSuccess());
135         assertTrue(loop.isShutdown());
136         assertTrue(loop.isTerminated());
137     }
138 
139     @Test
140     public void shutdownGracefullyBeforeStart() throws Exception {
141         EventLoopGroup group = newEventLoopGroup();
142         assertTrue(group.shutdownGracefully(200L, 1000L, TimeUnit.MILLISECONDS).await(500L));
143     }
144 
145     @Test
146     public void gracefulShutdownAfterStart() throws Exception {
147         EventLoop loop = newEventLoopGroup().next();
148         final CountDownLatch latch = new CountDownLatch(1);
149         loop.execute(new Runnable() {
150             @Override
151             public void run() {
152                 latch.countDown();
153             }
154         });
155 
156         // Wait for the event loop thread to start.
157         latch.await();
158 
159         // Request the event loop thread to stop.
160         loop.shutdownGracefully(200L, 3000L, TimeUnit.MILLISECONDS);
161 
162         // Wait until the event loop is terminated.
163         assertTrue(loop.awaitTermination(500L, TimeUnit.MILLISECONDS));
164 
165         assertRejection(loop);
166     }
167 
168     @Test
169     @Timeout(value = 3000, unit = TimeUnit.MILLISECONDS)
170     public void testChannelsIteratorEmpty() throws Exception {
171         assumeTrue(supportsChannelIteration());
172         EventLoopGroup group = newEventLoopGroup();
173         final SingleThreadEventLoop loop = (SingleThreadEventLoop) group.next();
174         try {
175             runBlockingOn(loop, new Runnable() {
176                 @Override
177                 public void run() {
178                     final Iterator<Channel> iterator = loop.registeredChannelsIterator();
179 
180                     assertFalse(iterator.hasNext());
181                     assertThrows(NoSuchElementException.class, new Executable() {
182                         @Override
183                         public void execute() {
184                             iterator.next();
185                         }
186                     });
187                 }
188             });
189         } finally {
190             group.shutdownGracefully();
191         }
192     }
193 
194     @Test
195     @Timeout(value = 3000, unit = TimeUnit.MILLISECONDS)
196     public void testChannelsIterator() throws Exception {
197         assumeTrue(supportsChannelIteration());
198         EventLoopGroup group = newEventLoopGroup();
199         final SingleThreadEventLoop loop = (SingleThreadEventLoop) group.next();
200         try {
201             final Channel ch1 = newChannel();
202             final Channel ch2 = newChannel();
203             loop.register(ch1).syncUninterruptibly();
204             loop.register(ch2).syncUninterruptibly();
205             assertEquals(2, registeredChannels(loop));
206 
207             runBlockingOn(loop, new Runnable() {
208                 @Override
209                 public void run() {
210                     final Iterator<Channel> iterator = loop.registeredChannelsIterator();
211 
212                     assertTrue(iterator.hasNext());
213                     Channel actualCh1 = iterator.next();
214                     assertNotNull(actualCh1);
215 
216                     assertTrue(iterator.hasNext());
217                     Channel actualCh2 = iterator.next();
218                     assertNotNull(actualCh2);
219 
220                     Set<Channel> expected = new HashSet<Channel>(4);
221                     expected.add(ch1);
222                     expected.add(ch2);
223                     expected.remove(actualCh1);
224                     expected.remove(actualCh2);
225                     assertTrue(expected.isEmpty());
226 
227                     assertFalse(iterator.hasNext());
228                     assertThrows(NoSuchElementException.class, new Executable() {
229                         @Override
230                         public void execute() {
231                             iterator.next();
232                         }
233                     });
234                 }
235             });
236         } finally {
237             group.shutdownGracefully();
238         }
239     }
240 
241     @Test
242     @Timeout(value = 3000, unit = TimeUnit.MILLISECONDS)
243     public void testChannelsIteratorRemoveThrows() throws Exception {
244         assumeTrue(supportsChannelIteration());
245         EventLoopGroup group = newEventLoopGroup();
246         final SingleThreadEventLoop loop = (SingleThreadEventLoop) group.next();
247 
248         try {
249             final Channel ch = newChannel();
250             loop.register(ch).syncUninterruptibly();
251             assertEquals(1, registeredChannels(loop));
252 
253             runBlockingOn(loop, new Runnable() {
254                 @Override
255                 public void run() {
256                     assertThrows(UnsupportedOperationException.class, new Executable() {
257                         @Override
258                         public void execute() {
259                             loop.registeredChannelsIterator().remove();
260                         }
261                     });
262                 }
263             });
264         } finally {
265             group.shutdownGracefully();
266         }
267     }
268 
269     private static void runBlockingOn(EventLoop eventLoop, final Runnable action) {
270         final Promise<Void> promise = eventLoop.newPromise();
271             eventLoop.execute(new Runnable() {
272                 @Override
273                 public void run() {
274                     try {
275                         action.run();
276                         promise.setSuccess(null);
277                     } catch (Throwable t) {
278                         promise.tryFailure(t);
279                     }
280                 }
281             });
282         try {
283             promise.await();
284         } catch (InterruptedException e) {
285             throw new RuntimeException(e);
286         }
287         Throwable cause = promise.cause();
288         if (cause != null) {
289             if (cause instanceof RuntimeException) {
290                 throw (RuntimeException) cause;
291             }
292             throw new RuntimeException(cause);
293         }
294     }
295 
296     private static final Runnable NOOP = new Runnable() {
297         @Override
298         public void run() { }
299     };
300 
301     private static void assertRejection(EventExecutor loop) {
302         try {
303             loop.execute(NOOP);
304             fail("A task must be rejected after shutdown() is called.");
305         } catch (RejectedExecutionException e) {
306             // Expected
307         }
308     }
309 
310     protected boolean supportsChannelIteration() {
311         return false;
312     }
313     protected abstract EventLoopGroup newEventLoopGroup();
314     protected abstract Channel newChannel();
315     protected abstract Class<? extends ServerChannel> serverChannelClass();
316 }