1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16 package org.jboss.netty.channel.socket.nio;
17
18 import org.jboss.netty.channel.Channel;
19 import org.jboss.netty.channel.ChannelException;
20 import org.jboss.netty.channel.ChannelFuture;
21 import org.jboss.netty.logging.InternalLogger;
22 import org.jboss.netty.logging.InternalLoggerFactory;
23 import org.jboss.netty.util.ThreadNameDeterminer;
24 import org.jboss.netty.util.ThreadRenamingRunnable;
25 import org.jboss.netty.util.internal.DeadLockProofWorker;
26
27 import java.io.IOException;
28 import java.nio.channels.CancelledKeyException;
29 import java.nio.channels.DatagramChannel;
30 import java.nio.channels.SelectableChannel;
31 import java.nio.channels.SelectionKey;
32 import java.nio.channels.Selector;
33 import java.nio.channels.SocketChannel;
34 import java.util.ConcurrentModificationException;
35 import java.util.Queue;
36 import java.util.concurrent.ConcurrentLinkedQueue;
37 import java.util.concurrent.CountDownLatch;
38 import java.util.concurrent.Executor;
39 import java.util.concurrent.RejectedExecutionException;
40 import java.util.concurrent.atomic.AtomicBoolean;
41 import java.util.concurrent.atomic.AtomicInteger;
42
43 abstract class AbstractNioSelector implements NioSelector {
44
45 private static final AtomicInteger nextId = new AtomicInteger();
46
47 private final int id = nextId.incrementAndGet();
48
49
50
51
52 protected static final InternalLogger logger = InternalLoggerFactory
53 .getInstance(AbstractNioSelector.class);
54
55 private static final int CLEANUP_INTERVAL = 256;
56
57
58
59
60
61 private final Executor executor;
62
63
64
65
66
67 protected volatile Thread thread;
68
69
70
71
72 final CountDownLatch startupLatch = new CountDownLatch(1);
73
74
75
76
77 protected volatile Selector selector;
78
79
80
81
82
83
84
85 protected final AtomicBoolean wakenUp = new AtomicBoolean();
86
87 private final Queue<Runnable> taskQueue = new ConcurrentLinkedQueue<Runnable>();
88
89 private volatile int cancelledKeys;
90
91 private final CountDownLatch shutdownLatch = new CountDownLatch(1);
92 private volatile boolean shutdown;
93
94 AbstractNioSelector(Executor executor) {
95 this(executor, null);
96 }
97
98 AbstractNioSelector(Executor executor, ThreadNameDeterminer determiner) {
99 this.executor = executor;
100 openSelector(determiner);
101 }
102
103 public void register(Channel channel, ChannelFuture future) {
104 Runnable task = createRegisterTask(channel, future);
105 registerTask(task);
106 }
107
108 protected final void registerTask(Runnable task) {
109 taskQueue.add(task);
110
111 Selector selector = this.selector;
112
113 if (selector != null) {
114 if (wakenUp.compareAndSet(false, true)) {
115 selector.wakeup();
116 }
117 } else {
118 if (taskQueue.remove(task)) {
119
120 throw new RejectedExecutionException("Worker has already been shutdown");
121 }
122 }
123 }
124
125 protected final boolean isIoThread() {
126 return Thread.currentThread() == thread;
127 }
128
129 public void rebuildSelector() {
130 if (!isIoThread()) {
131 taskQueue.add(new Runnable() {
132 public void run() {
133 rebuildSelector();
134 }
135 });
136 return;
137 }
138
139 final Selector oldSelector = selector;
140 final Selector newSelector;
141
142 if (oldSelector == null) {
143 return;
144 }
145
146 try {
147 newSelector = SelectorUtil.open();
148 } catch (Exception e) {
149 logger.warn("Failed to create a new Selector.", e);
150 return;
151 }
152
153
154 int nChannels = 0;
155 for (;;) {
156 try {
157 for (SelectionKey key: oldSelector.keys()) {
158 try {
159 if (key.channel().keyFor(newSelector) != null) {
160 continue;
161 }
162
163 int interestOps = key.interestOps();
164 key.cancel();
165 key.channel().register(newSelector, interestOps, key.attachment());
166 nChannels ++;
167 } catch (Exception e) {
168 logger.warn("Failed to re-register a Channel to the new Selector,", e);
169 close(key);
170 }
171 }
172 } catch (ConcurrentModificationException e) {
173
174 continue;
175 }
176
177 break;
178 }
179
180 selector = newSelector;
181
182 try {
183
184 oldSelector.close();
185 } catch (Throwable t) {
186 if (logger.isWarnEnabled()) {
187 logger.warn("Failed to close the old Selector.", t);
188 }
189 }
190
191 logger.info("Migrated " + nChannels + " channel(s) to the new Selector,");
192 }
193
194 public void run() {
195 thread = Thread.currentThread();
196 startupLatch.countDown();
197
198 int selectReturnsImmediately = 0;
199 Selector selector = this.selector;
200
201 if (selector == null) {
202 return;
203 }
204
205 final long minSelectTimeout = SelectorUtil.SELECT_TIMEOUT_NANOS * 80 / 100;
206 boolean wakenupFromLoop = false;
207 for (;;) {
208 wakenUp.set(false);
209
210 try {
211 long beforeSelect = System.nanoTime();
212 int selected = select(selector);
213 if (selected == 0 && !wakenupFromLoop && !wakenUp.get()) {
214 long timeBlocked = System.nanoTime() - beforeSelect;
215 if (timeBlocked < minSelectTimeout) {
216 boolean notConnected = false;
217
218 for (SelectionKey key: selector.keys()) {
219 SelectableChannel ch = key.channel();
220 try {
221 if (ch instanceof DatagramChannel && !ch.isOpen() ||
222 ch instanceof SocketChannel && !((SocketChannel) ch).isConnected() &&
223
224
225 !((SocketChannel) ch).isConnectionPending()) {
226 notConnected = true;
227
228 key.cancel();
229 }
230 } catch (CancelledKeyException e) {
231
232 }
233 }
234 if (notConnected) {
235 selectReturnsImmediately = 0;
236 } else {
237 if (Thread.interrupted() && !shutdown) {
238
239
240
241
242
243 if (logger.isDebugEnabled()) {
244 logger.debug("Selector.select() returned prematurely because the I/O thread " +
245 "has been interrupted. Use shutdown() to shut the NioSelector down.");
246 }
247 selectReturnsImmediately = 0;
248 } else {
249
250
251
252 selectReturnsImmediately ++;
253 }
254 }
255 } else {
256 selectReturnsImmediately = 0;
257 }
258 } else {
259 selectReturnsImmediately = 0;
260 }
261
262 if (SelectorUtil.EPOLL_BUG_WORKAROUND) {
263 if (selectReturnsImmediately == 1024) {
264
265
266
267 rebuildSelector();
268 selector = this.selector;
269 selectReturnsImmediately = 0;
270 wakenupFromLoop = false;
271
272 continue;
273 }
274 } else {
275
276 selectReturnsImmediately = 0;
277 }
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307 if (wakenUp.get()) {
308 wakenupFromLoop = true;
309 selector.wakeup();
310 } else {
311 wakenupFromLoop = false;
312 }
313
314 cancelledKeys = 0;
315 processTaskQueue();
316 selector = this.selector;
317
318 if (shutdown) {
319 this.selector = null;
320
321
322 processTaskQueue();
323
324 for (SelectionKey k: selector.keys()) {
325 close(k);
326 }
327
328 try {
329 selector.close();
330 } catch (IOException e) {
331 logger.warn(
332 "Failed to close a selector.", e);
333 }
334 shutdownLatch.countDown();
335 break;
336 } else {
337 process(selector);
338 }
339 } catch (Throwable t) {
340 logger.warn(
341 "Unexpected exception in the selector loop.", t);
342
343
344
345 try {
346 Thread.sleep(1000);
347 } catch (InterruptedException e) {
348
349 }
350 }
351 }
352 }
353
354
355
356
357
358 private void openSelector(ThreadNameDeterminer determiner) {
359 try {
360 selector = SelectorUtil.open();
361 } catch (Throwable t) {
362 throw new ChannelException("Failed to create a selector.", t);
363 }
364
365
366 boolean success = false;
367 try {
368 DeadLockProofWorker.start(executor, newThreadRenamingRunnable(id, determiner));
369 success = true;
370 } finally {
371 if (!success) {
372
373 try {
374 selector.close();
375 } catch (Throwable t) {
376 logger.warn("Failed to close a selector.", t);
377 }
378 selector = null;
379
380 }
381 }
382 assert selector != null && selector.isOpen();
383 }
384
385 private void processTaskQueue() {
386 for (;;) {
387 final Runnable task = taskQueue.poll();
388 if (task == null) {
389 break;
390 }
391 task.run();
392 try {
393 cleanUpCancelledKeys();
394 } catch (IOException e) {
395
396 }
397 }
398 }
399
400 protected final void increaseCancelledKeys() {
401 cancelledKeys ++;
402 }
403
404 protected final boolean cleanUpCancelledKeys() throws IOException {
405 if (cancelledKeys >= CLEANUP_INTERVAL) {
406 cancelledKeys = 0;
407 selector.selectNow();
408 return true;
409 }
410 return false;
411 }
412
413 public void shutdown() {
414 if (isIoThread()) {
415 throw new IllegalStateException("Must not be called from a I/O-Thread to prevent deadlocks!");
416 }
417
418 Selector selector = this.selector;
419 shutdown = true;
420 if (selector != null) {
421 selector.wakeup();
422 }
423 try {
424 shutdownLatch.await();
425 } catch (InterruptedException e) {
426 logger.error("Interrupted while wait for resources to be released #" + id);
427 Thread.currentThread().interrupt();
428 }
429 }
430
431 protected abstract void process(Selector selector) throws IOException;
432
433 protected int select(Selector selector) throws IOException {
434 return SelectorUtil.select(selector);
435 }
436
437 protected abstract void close(SelectionKey k);
438
439 protected abstract ThreadRenamingRunnable newThreadRenamingRunnable(int id, ThreadNameDeterminer determiner);
440
441 protected abstract Runnable createRegisterTask(Channel channel, ChannelFuture future);
442 }