1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16 package org.jboss.netty.channel.socket.nio;
17
18 import org.jboss.netty.channel.Channel;
19 import org.jboss.netty.channel.ChannelFuture;
20 import org.jboss.netty.channel.MessageEvent;
21 import org.jboss.netty.channel.socket.Worker;
22 import org.jboss.netty.channel.socket.nio.SocketSendBufferPool.SendBuffer;
23 import org.jboss.netty.util.ThreadNameDeterminer;
24 import org.jboss.netty.util.ThreadRenamingRunnable;
25
26 import java.io.IOException;
27 import java.nio.channels.AsynchronousCloseException;
28 import java.nio.channels.CancelledKeyException;
29 import java.nio.channels.ClosedChannelException;
30 import java.nio.channels.NotYetConnectedException;
31 import java.nio.channels.SelectionKey;
32 import java.nio.channels.Selector;
33 import java.nio.channels.WritableByteChannel;
34 import java.util.ArrayList;
35 import java.util.Iterator;
36 import java.util.List;
37 import java.util.Queue;
38 import java.util.Set;
39 import java.util.concurrent.Executor;
40
41 import static org.jboss.netty.channel.Channels.*;
42
43 abstract class AbstractNioWorker extends AbstractNioSelector implements Worker {
44
45 protected final SocketSendBufferPool sendBufferPool = new SocketSendBufferPool();
46
/**
 * Creates a worker by passing {@code executor} straight through to the
 * superclass constructor.
 *
 * @param executor the executor handed to the superclass
 */
AbstractNioWorker(Executor executor) {
    super(executor);
}
50
/**
 * Creates a worker by passing both arguments straight through to the
 * superclass constructor.
 *
 * @param executor   the executor handed to the superclass
 * @param determiner the thread-name determiner handed to the superclass
 */
AbstractNioWorker(Executor executor, ThreadNameDeterminer determiner) {
    super(executor, determiner);
}
54
55 public void executeInIoThread(Runnable task) {
56 executeInIoThread(task, false);
57 }
58
59
60
61
62
63
64
65
66
67
68 public void executeInIoThread(Runnable task, boolean alwaysAsync) {
69 if (!alwaysAsync && isIoThread()) {
70 task.run();
71 } else {
72 registerTask(task);
73 }
74 }
75
76 @Override
77 protected void close(SelectionKey k) {
78 AbstractNioChannel<?> ch = (AbstractNioChannel<?>) k.attachment();
79 close(ch, succeededFuture(ch));
80 }
81
82 @Override
83 protected ThreadRenamingRunnable newThreadRenamingRunnable(int id, ThreadNameDeterminer determiner) {
84 return new ThreadRenamingRunnable(this, "New I/O worker #" + id, determiner);
85 }
86
87 @Override
88 public void run() {
89 super.run();
90 sendBufferPool.releaseExternalResources();
91 }
92
93 @Override
94 protected void process(Selector selector) throws IOException {
95 Set<SelectionKey> selectedKeys = selector.selectedKeys();
96
97
98
99 if (selectedKeys.isEmpty()) {
100 return;
101 }
102 for (Iterator<SelectionKey> i = selectedKeys.iterator(); i.hasNext();) {
103 SelectionKey k = i.next();
104 i.remove();
105 try {
106 int readyOps = k.readyOps();
107 if ((readyOps & SelectionKey.OP_READ) != 0 || readyOps == 0) {
108 if (!read(k)) {
109
110 continue;
111 }
112 }
113 if ((readyOps & SelectionKey.OP_WRITE) != 0) {
114 writeFromSelectorLoop(k);
115 }
116 } catch (CancelledKeyException e) {
117 close(k);
118 }
119
120 if (cleanUpCancelledKeys()) {
121 break;
122 }
123 }
124 }
125
126 void writeFromUserCode(final AbstractNioChannel<?> channel) {
127 if (!channel.isConnected()) {
128 cleanUpWriteBuffer(channel);
129 return;
130 }
131
132 if (scheduleWriteIfNecessary(channel)) {
133 return;
134 }
135
136
137
138 if (channel.writeSuspended) {
139 return;
140 }
141
142 if (channel.inWriteNowLoop) {
143 return;
144 }
145
146 write0(channel);
147 }
148
149 void writeFromTaskLoop(AbstractNioChannel<?> ch) {
150 if (!ch.writeSuspended) {
151 write0(ch);
152 }
153 }
154
155 void writeFromSelectorLoop(final SelectionKey k) {
156 AbstractNioChannel<?> ch = (AbstractNioChannel<?>) k.attachment();
157 ch.writeSuspended = false;
158 write0(ch);
159 }
160
161 protected abstract boolean scheduleWriteIfNecessary(AbstractNioChannel<?> channel);
162
163 protected void write0(AbstractNioChannel<?> channel) {
164 boolean open = true;
165 boolean addOpWrite = false;
166 boolean removeOpWrite = false;
167 boolean iothread = isIoThread(channel);
168
169 long writtenBytes = 0;
170
171 final SocketSendBufferPool sendBufferPool = this.sendBufferPool;
172 final WritableByteChannel ch = channel.channel;
173 final Queue<MessageEvent> writeBuffer = channel.writeBufferQueue;
174 final int writeSpinCount = channel.getConfig().getWriteSpinCount();
175 List<Throwable> causes = null;
176
177 synchronized (channel.writeLock) {
178 channel.inWriteNowLoop = true;
179 for (;;) {
180
181 MessageEvent evt = channel.currentWriteEvent;
182 SendBuffer buf = null;
183 ChannelFuture future = null;
184 try {
185 if (evt == null) {
186 if ((channel.currentWriteEvent = evt = writeBuffer.poll()) == null) {
187 removeOpWrite = true;
188 channel.writeSuspended = false;
189 break;
190 }
191 future = evt.getFuture();
192
193 channel.currentWriteBuffer = buf = sendBufferPool.acquire(evt.getMessage());
194 } else {
195 future = evt.getFuture();
196 buf = channel.currentWriteBuffer;
197 }
198
199 long localWrittenBytes = 0;
200 for (int i = writeSpinCount; i > 0; i --) {
201 localWrittenBytes = buf.transferTo(ch);
202 if (localWrittenBytes != 0) {
203 writtenBytes += localWrittenBytes;
204 break;
205 }
206 if (buf.finished()) {
207 break;
208 }
209 }
210
211 if (buf.finished()) {
212
213 buf.release();
214 channel.currentWriteEvent = null;
215 channel.currentWriteBuffer = null;
216
217
218 evt = null;
219 buf = null;
220 future.setSuccess();
221 } else {
222
223 addOpWrite = true;
224 channel.writeSuspended = true;
225
226 if (writtenBytes > 0) {
227
228 future.setProgress(
229 localWrittenBytes,
230 buf.writtenBytes(), buf.totalBytes());
231 }
232 break;
233 }
234 } catch (AsynchronousCloseException e) {
235
236 } catch (Throwable t) {
237 if (buf != null) {
238 buf.release();
239 }
240 channel.currentWriteEvent = null;
241 channel.currentWriteBuffer = null;
242
243
244 buf = null;
245
246 evt = null;
247 if (future != null) {
248 future.setFailure(t);
249 }
250 if (iothread) {
251
252
253
254
255 if (causes == null) {
256 causes = new ArrayList<Throwable>(1);
257 }
258 causes.add(t);
259 } else {
260 fireExceptionCaughtLater(channel, t);
261 }
262 if (t instanceof IOException) {
263
264
265
266
267
268 open = false;
269 }
270 }
271 }
272 channel.inWriteNowLoop = false;
273
274
275
276
277
278
279
280 if (open) {
281 if (addOpWrite) {
282 setOpWrite(channel);
283 } else if (removeOpWrite) {
284 clearOpWrite(channel);
285 }
286 }
287 }
288 if (causes != null) {
289 for (Throwable cause: causes) {
290
291 fireExceptionCaught(channel, cause);
292 }
293 }
294 if (!open) {
295
296 close(channel, succeededFuture(channel));
297 }
298 if (iothread) {
299 fireWriteComplete(channel, writtenBytes);
300 } else {
301 fireWriteCompleteLater(channel, writtenBytes);
302 }
303 }
304
305 static boolean isIoThread(AbstractNioChannel<?> channel) {
306 return Thread.currentThread() == channel.worker.thread;
307 }
308
309 protected void setOpWrite(AbstractNioChannel<?> channel) {
310 Selector selector = this.selector;
311 SelectionKey key = channel.channel.keyFor(selector);
312 if (key == null) {
313 return;
314 }
315 if (!key.isValid()) {
316 close(key);
317 return;
318 }
319
320 int interestOps = channel.getInternalInterestOps();
321 if ((interestOps & SelectionKey.OP_WRITE) == 0) {
322 interestOps |= SelectionKey.OP_WRITE;
323 key.interestOps(interestOps);
324 channel.setInternalInterestOps(interestOps);
325 }
326 }
327
328 protected void clearOpWrite(AbstractNioChannel<?> channel) {
329 Selector selector = this.selector;
330 SelectionKey key = channel.channel.keyFor(selector);
331 if (key == null) {
332 return;
333 }
334 if (!key.isValid()) {
335 close(key);
336 return;
337 }
338
339 int interestOps = channel.getInternalInterestOps();
340 if ((interestOps & SelectionKey.OP_WRITE) != 0) {
341 interestOps &= ~SelectionKey.OP_WRITE;
342 key.interestOps(interestOps);
343 channel.setInternalInterestOps(interestOps);
344 }
345 }
346
347 protected void close(AbstractNioChannel<?> channel, ChannelFuture future) {
348 boolean connected = channel.isConnected();
349 boolean bound = channel.isBound();
350 boolean iothread = isIoThread(channel);
351
352 try {
353 channel.channel.close();
354 increaseCancelledKeys();
355
356 if (channel.setClosed()) {
357 future.setSuccess();
358 if (connected) {
359 if (iothread) {
360 fireChannelDisconnected(channel);
361 } else {
362 fireChannelDisconnectedLater(channel);
363 }
364 }
365 if (bound) {
366 if (iothread) {
367 fireChannelUnbound(channel);
368 } else {
369 fireChannelUnboundLater(channel);
370 }
371 }
372
373 cleanUpWriteBuffer(channel);
374 if (iothread) {
375 fireChannelClosed(channel);
376 } else {
377 fireChannelClosedLater(channel);
378 }
379 } else {
380 future.setSuccess();
381 }
382 } catch (Throwable t) {
383 future.setFailure(t);
384 if (iothread) {
385 fireExceptionCaught(channel, t);
386 } else {
387 fireExceptionCaughtLater(channel, t);
388 }
389 }
390 }
391
392 protected static void cleanUpWriteBuffer(AbstractNioChannel<?> channel) {
393 Exception cause = null;
394 boolean fireExceptionCaught = false;
395
396
397 synchronized (channel.writeLock) {
398 MessageEvent evt = channel.currentWriteEvent;
399 if (evt != null) {
400
401
402 if (channel.isOpen()) {
403 cause = new NotYetConnectedException();
404 } else {
405 cause = new ClosedChannelException();
406 }
407
408 ChannelFuture future = evt.getFuture();
409 if (channel.currentWriteBuffer != null) {
410 channel.currentWriteBuffer.release();
411 channel.currentWriteBuffer = null;
412 }
413 channel.currentWriteEvent = null;
414
415
416 evt = null;
417 future.setFailure(cause);
418 fireExceptionCaught = true;
419 }
420
421 Queue<MessageEvent> writeBuffer = channel.writeBufferQueue;
422 for (;;) {
423 evt = writeBuffer.poll();
424 if (evt == null) {
425 break;
426 }
427
428
429 if (cause == null) {
430 if (channel.isOpen()) {
431 cause = new NotYetConnectedException();
432 } else {
433 cause = new ClosedChannelException();
434 }
435 fireExceptionCaught = true;
436 }
437 evt.getFuture().setFailure(cause);
438 }
439 }
440
441 if (fireExceptionCaught) {
442 if (isIoThread(channel)) {
443 fireExceptionCaught(channel, cause);
444 } else {
445 fireExceptionCaughtLater(channel, cause);
446 }
447 }
448 }
449
450 void setInterestOps(final AbstractNioChannel<?> channel, final ChannelFuture future, final int interestOps) {
451 boolean iothread = isIoThread(channel);
452 if (!iothread) {
453 channel.getPipeline().execute(new Runnable() {
454 public void run() {
455 setInterestOps(channel, future, interestOps);
456 }
457 });
458 return;
459 }
460
461 boolean changed = false;
462 try {
463 Selector selector = this.selector;
464 SelectionKey key = channel.channel.keyFor(selector);
465
466
467 int newInterestOps = interestOps & ~Channel.OP_WRITE | channel.getInternalInterestOps() & Channel.OP_WRITE;
468
469 if (key == null || selector == null) {
470 if (channel.getInternalInterestOps() != newInterestOps) {
471 changed = true;
472 }
473
474
475
476 channel.setInternalInterestOps(newInterestOps);
477
478 future.setSuccess();
479 if (changed) {
480 if (iothread) {
481 fireChannelInterestChanged(channel);
482 } else {
483 fireChannelInterestChangedLater(channel);
484 }
485 }
486
487 return;
488 }
489
490 if (channel.getInternalInterestOps() != newInterestOps) {
491 changed = true;
492 key.interestOps(newInterestOps);
493 if (Thread.currentThread() != thread &&
494 wakenUp.compareAndSet(false, true)) {
495 selector.wakeup();
496 }
497 channel.setInternalInterestOps(newInterestOps);
498 }
499
500 future.setSuccess();
501 if (changed) {
502 fireChannelInterestChanged(channel);
503 }
504 } catch (CancelledKeyException e) {
505
506 ClosedChannelException cce = new ClosedChannelException();
507 future.setFailure(cce);
508 fireExceptionCaught(channel, cce);
509 } catch (Throwable t) {
510 future.setFailure(t);
511 fireExceptionCaught(channel, t);
512 }
513 }
514
515
516
517
518
519
520
521
522 protected abstract boolean read(SelectionKey k);
523
524 }