1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16 package io.netty.testsuite.transport;
17
18 import io.netty.bootstrap.ServerBootstrap;
19 import io.netty.channel.Channel;
20 import io.netty.channel.ChannelFuture;
21 import io.netty.channel.ChannelInboundHandlerAdapter;
22 import io.netty.channel.EventLoop;
23 import io.netty.channel.EventLoopGroup;
24 import io.netty.channel.ServerChannel;
25 import io.netty.channel.SingleThreadEventLoop;
26 import io.netty.channel.local.LocalAddress;
27 import io.netty.channel.local.LocalServerChannel;
28 import io.netty.util.concurrent.EventExecutor;
29 import io.netty.util.concurrent.Future;
30 import io.netty.util.concurrent.Promise;
31 import org.junit.jupiter.api.Test;
32 import org.junit.jupiter.api.Timeout;
33 import org.junit.jupiter.api.function.Executable;
34
35 import java.util.HashSet;
36 import java.util.Iterator;
37 import java.util.NoSuchElementException;
38 import java.util.Set;
39 import java.util.concurrent.Callable;
40 import java.util.concurrent.CountDownLatch;
41 import java.util.concurrent.RejectedExecutionException;
42 import java.util.concurrent.TimeUnit;
43
44 import static org.junit.jupiter.api.Assertions.assertEquals;
45 import static org.junit.jupiter.api.Assertions.assertFalse;
46 import static org.junit.jupiter.api.Assertions.assertNotNull;
47 import static org.junit.jupiter.api.Assertions.assertThrows;
48 import static org.junit.jupiter.api.Assertions.assertTrue;
49 import static org.junit.jupiter.api.Assertions.fail;
50 import static org.junit.jupiter.api.Assumptions.assumeTrue;
51
52 public abstract class AbstractSingleThreadEventLoopTest {
53 @Test
54 @Timeout(value = 5000, unit = TimeUnit.MILLISECONDS)
55 public void testChannelsRegistered() throws Exception {
56 EventLoopGroup group = newEventLoopGroup();
57 final SingleThreadEventLoop loop = (SingleThreadEventLoop) group.next();
58
59 try {
60 final Channel ch1 = newChannel();
61 final Channel ch2 = newChannel();
62
63 int rc = registeredChannels(loop);
64 boolean channelCountSupported = rc != -1;
65
66 if (channelCountSupported) {
67 assertEquals(0, registeredChannels(loop));
68 }
69
70 assertTrue(loop.register(ch1).syncUninterruptibly().isSuccess());
71 assertTrue(loop.register(ch2).syncUninterruptibly().isSuccess());
72 if (channelCountSupported) {
73 checkNumRegisteredChannels(loop, 2);
74 }
75
76 assertTrue(ch1.deregister().syncUninterruptibly().isSuccess());
77 if (channelCountSupported) {
78 checkNumRegisteredChannels(loop, 1);
79 }
80 } finally {
81 group.shutdownGracefully();
82 }
83 }
84
85 private static void checkNumRegisteredChannels(SingleThreadEventLoop loop, int numChannels) throws Exception {
86
87 while (registeredChannels(loop) != numChannels) {
88 Thread.sleep(50);
89 }
90 }
91
92
93 private static int registeredChannels(final SingleThreadEventLoop loop) throws Exception {
94 return loop.submit(new Callable<Integer>() {
95 @Override
96 public Integer call() {
97 return loop.registeredChannels();
98 }
99 }).get(1, TimeUnit.SECONDS);
100 }
101
102 @Test
103 @SuppressWarnings("deprecation")
104 public void shutdownBeforeStart() throws Exception {
105 EventLoopGroup group = newEventLoopGroup();
106 assertFalse(group.awaitTermination(2, TimeUnit.MILLISECONDS));
107 group.shutdown();
108 assertTrue(group.awaitTermination(200, TimeUnit.MILLISECONDS));
109 }
110
111 @Test
112 public void shutdownGracefullyZeroQuietBeforeStart() throws Exception {
113 EventLoopGroup group = newEventLoopGroup();
114 assertTrue(group.shutdownGracefully(0L, 2L, TimeUnit.SECONDS).await(200L));
115 }
116
117
118 @Test
119 @Timeout(value = 5000, unit = TimeUnit.MILLISECONDS)
120 public void testShutdownGracefullyNoQuietPeriod() throws Exception {
121 EventLoopGroup loop = newEventLoopGroup();
122 ServerBootstrap b = new ServerBootstrap();
123 b.group(loop)
124 .channel(serverChannelClass())
125 .childHandler(new ChannelInboundHandlerAdapter());
126
127
128 ChannelFuture cf = serverChannelClass() == LocalServerChannel.class
129 ? b.bind(new LocalAddress("local")) : b.bind(0);
130 cf.sync().channel();
131
132 Future<?> f = loop.shutdownGracefully(0, 1, TimeUnit.MINUTES);
133 assertTrue(loop.awaitTermination(600, TimeUnit.MILLISECONDS));
134 assertTrue(f.syncUninterruptibly().isSuccess());
135 assertTrue(loop.isShutdown());
136 assertTrue(loop.isTerminated());
137 }
138
139 @Test
140 public void shutdownGracefullyBeforeStart() throws Exception {
141 EventLoopGroup group = newEventLoopGroup();
142 assertTrue(group.shutdownGracefully(200L, 1000L, TimeUnit.MILLISECONDS).await(500L));
143 }
144
145 @Test
146 public void gracefulShutdownAfterStart() throws Exception {
147 EventLoop loop = newEventLoopGroup().next();
148 final CountDownLatch latch = new CountDownLatch(1);
149 loop.execute(new Runnable() {
150 @Override
151 public void run() {
152 latch.countDown();
153 }
154 });
155
156
157 latch.await();
158
159
160 loop.shutdownGracefully(200L, 3000L, TimeUnit.MILLISECONDS);
161
162
163 assertTrue(loop.awaitTermination(500L, TimeUnit.MILLISECONDS));
164
165 assertRejection(loop);
166 }
167
168 @Test
169 @Timeout(value = 3000, unit = TimeUnit.MILLISECONDS)
170 public void testChannelsIteratorEmpty() throws Exception {
171 assumeTrue(supportsChannelIteration());
172 EventLoopGroup group = newEventLoopGroup();
173 final SingleThreadEventLoop loop = (SingleThreadEventLoop) group.next();
174 try {
175 runBlockingOn(loop, new Runnable() {
176 @Override
177 public void run() {
178 final Iterator<Channel> iterator = loop.registeredChannelsIterator();
179
180 assertFalse(iterator.hasNext());
181 assertThrows(NoSuchElementException.class, new Executable() {
182 @Override
183 public void execute() {
184 iterator.next();
185 }
186 });
187 }
188 });
189 } finally {
190 group.shutdownGracefully();
191 }
192 }
193
194 @Test
195 @Timeout(value = 3000, unit = TimeUnit.MILLISECONDS)
196 public void testChannelsIterator() throws Exception {
197 assumeTrue(supportsChannelIteration());
198 EventLoopGroup group = newEventLoopGroup();
199 final SingleThreadEventLoop loop = (SingleThreadEventLoop) group.next();
200 try {
201 final Channel ch1 = newChannel();
202 final Channel ch2 = newChannel();
203 loop.register(ch1).syncUninterruptibly();
204 loop.register(ch2).syncUninterruptibly();
205 assertEquals(2, registeredChannels(loop));
206
207 runBlockingOn(loop, new Runnable() {
208 @Override
209 public void run() {
210 final Iterator<Channel> iterator = loop.registeredChannelsIterator();
211
212 assertTrue(iterator.hasNext());
213 Channel actualCh1 = iterator.next();
214 assertNotNull(actualCh1);
215
216 assertTrue(iterator.hasNext());
217 Channel actualCh2 = iterator.next();
218 assertNotNull(actualCh2);
219
220 Set<Channel> expected = new HashSet<Channel>(4);
221 expected.add(ch1);
222 expected.add(ch2);
223 expected.remove(actualCh1);
224 expected.remove(actualCh2);
225 assertTrue(expected.isEmpty());
226
227 assertFalse(iterator.hasNext());
228 assertThrows(NoSuchElementException.class, new Executable() {
229 @Override
230 public void execute() {
231 iterator.next();
232 }
233 });
234 }
235 });
236 } finally {
237 group.shutdownGracefully();
238 }
239 }
240
241 @Test
242 @Timeout(value = 3000, unit = TimeUnit.MILLISECONDS)
243 public void testChannelsIteratorRemoveThrows() throws Exception {
244 assumeTrue(supportsChannelIteration());
245 EventLoopGroup group = newEventLoopGroup();
246 final SingleThreadEventLoop loop = (SingleThreadEventLoop) group.next();
247
248 try {
249 final Channel ch = newChannel();
250 loop.register(ch).syncUninterruptibly();
251 assertEquals(1, registeredChannels(loop));
252
253 runBlockingOn(loop, new Runnable() {
254 @Override
255 public void run() {
256 assertThrows(UnsupportedOperationException.class, new Executable() {
257 @Override
258 public void execute() {
259 loop.registeredChannelsIterator().remove();
260 }
261 });
262 }
263 });
264 } finally {
265 group.shutdownGracefully();
266 }
267 }
268
269 private static void runBlockingOn(EventLoop eventLoop, final Runnable action) {
270 final Promise<Void> promise = eventLoop.newPromise();
271 eventLoop.execute(new Runnable() {
272 @Override
273 public void run() {
274 try {
275 action.run();
276 promise.setSuccess(null);
277 } catch (Throwable t) {
278 promise.tryFailure(t);
279 }
280 }
281 });
282 try {
283 promise.await();
284 } catch (InterruptedException e) {
285 throw new RuntimeException(e);
286 }
287 Throwable cause = promise.cause();
288 if (cause != null) {
289 if (cause instanceof RuntimeException) {
290 throw (RuntimeException) cause;
291 }
292 throw new RuntimeException(cause);
293 }
294 }
295
296 private static final Runnable NOOP = new Runnable() {
297 @Override
298 public void run() { }
299 };
300
301 private static void assertRejection(EventExecutor loop) {
302 try {
303 loop.execute(NOOP);
304 fail("A task must be rejected after shutdown() is called.");
305 } catch (RejectedExecutionException e) {
306
307 }
308 }
309
310 protected boolean supportsChannelIteration() {
311 return false;
312 }
313 protected abstract EventLoopGroup newEventLoopGroup();
314 protected abstract Channel newChannel();
315 protected abstract Class<? extends ServerChannel> serverChannelClass();
316 }