1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20 package org.apache.mina.transport.socket.nio;
21
22 import java.io.IOException;
23 import java.nio.channels.SelectionKey;
24 import java.nio.channels.Selector;
25 import java.nio.channels.SocketChannel;
26 import java.util.Queue;
27 import java.util.Set;
28 import java.util.concurrent.ConcurrentLinkedQueue;
29 import java.util.concurrent.Executor;
30
31 import org.apache.mina.common.ByteBuffer;
32 import org.apache.mina.common.ExceptionMonitor;
33 import org.apache.mina.common.IdleStatus;
34 import org.apache.mina.common.IoSession;
35 import org.apache.mina.common.WriteTimeoutException;
36 import org.apache.mina.common.IoFilter.WriteRequest;
37 import org.apache.mina.util.NamePreservingRunnable;
38
39
40
41
42
43
44
45 class SocketIoProcessor {
46
47
48
49
50
51
52
53 private static final int WRITE_SPIN_COUNT = 256;
54
55 private final Object lock = new Object();
56
57 private final String threadName;
58
59 private final Executor executor;
60
61 private volatile Selector selector;
62
63 private final Queue<SocketSessionImpl> newSessions = new ConcurrentLinkedQueue<SocketSessionImpl>();
64
65 private final Queue<SocketSessionImpl> removingSessions = new ConcurrentLinkedQueue<SocketSessionImpl>();
66
67 private final Queue<SocketSessionImpl> flushingSessions = new ConcurrentLinkedQueue<SocketSessionImpl>();
68
69 private final Queue<SocketSessionImpl> trafficControllingSessions = new ConcurrentLinkedQueue<SocketSessionImpl>();
70
71 private Worker worker;
72
73 private long lastIdleCheckTime = System.currentTimeMillis();
74
/**
 * Creates a processor. No selector is opened and no worker is started
 * here; that happens on the first call that reaches startupWorker().
 *
 * @param threadName name given to the worker when it is submitted
 * @param executor   executor that will run the worker loop
 */
SocketIoProcessor(String threadName, Executor executor) {
    this.executor = executor;
    this.threadName = threadName;
}
79
80 void addNew(SocketSessionImpl session) throws IOException {
81 newSessions.add(session);
82 startupWorker();
83 }
84
85 void remove(SocketSessionImpl session) throws IOException {
86 scheduleRemove(session);
87 startupWorker();
88 }
89
90 private void startupWorker() throws IOException {
91 synchronized (lock) {
92 if (worker == null) {
93 selector = Selector.open();
94 worker = new Worker();
95 executor.execute(new NamePreservingRunnable(worker, threadName));
96 }
97 selector.wakeup();
98 }
99 }
100
101 void flush(SocketSessionImpl session) {
102 if ( scheduleFlush(session) ) {
103 Selector selector = this.selector;
104 if (selector != null) {
105 selector.wakeup();
106 }
107 }
108 }
109
110 void updateTrafficMask(SocketSessionImpl session) {
111 scheduleTrafficControl(session);
112 Selector selector = this.selector;
113 if (selector != null) {
114 selector.wakeup();
115 }
116 }
117
118 private void scheduleRemove(SocketSessionImpl session) {
119 removingSessions.add(session);
120 }
121
122 private boolean scheduleFlush(SocketSessionImpl session) {
123 if (session.setScheduledForFlush(true)) {
124 flushingSessions.add(session);
125
126 return true;
127 }
128
129 return false;
130 }
131
132 private void scheduleTrafficControl(SocketSessionImpl session) {
133 trafficControllingSessions.add(session);
134 }
135
136 private void doAddNew() {
137 Selector selector = this.selector;
138 for (;;) {
139 SocketSessionImpl session = newSessions.poll();
140
141 if (session == null)
142 break;
143
144 SocketChannel ch = session.getChannel();
145 try {
146 ch.configureBlocking(false);
147 session.setSelectionKey(ch.register(selector,
148 SelectionKey.OP_READ, session));
149
150
151
152 session.getServiceListeners().fireSessionCreated(session);
153 } catch (IOException e) {
154
155
156 session.getFilterChain().fireExceptionCaught(session, e);
157 }
158 }
159 }
160
161 private void doRemove() {
162 for (;;) {
163 SocketSessionImpl session = removingSessions.poll();
164
165 if (session == null)
166 break;
167
168 SocketChannel ch = session.getChannel();
169 SelectionKey key = session.getSelectionKey();
170
171
172 if (key == null) {
173 scheduleRemove(session);
174 break;
175 }
176
177 if (!key.isValid()) {
178 continue;
179 }
180
181 try {
182 key.cancel();
183 ch.close();
184 } catch (IOException e) {
185 session.getFilterChain().fireExceptionCaught(session, e);
186 } finally {
187 releaseWriteBuffers(session);
188 session.getServiceListeners().fireSessionDestroyed(session);
189 }
190 }
191 }
192
193 private void process(Set<SelectionKey> selectedKeys) {
194 for (SelectionKey key : selectedKeys) {
195 SocketSessionImpl session = (SocketSessionImpl) key.attachment();
196
197 if (key.isReadable() && session.getTrafficMask().isReadable()) {
198 read(session);
199 }
200
201 if (key.isWritable() && session.getTrafficMask().isWritable()) {
202 scheduleFlush(session);
203 }
204 }
205
206 selectedKeys.clear();
207 }
208
209 private void read(SocketSessionImpl session) {
210 ByteBuffer buf = ByteBuffer.allocate(session.getReadBufferSize());
211 SocketChannel ch = session.getChannel();
212
213 try {
214 int readBytes = 0;
215 int ret;
216
217 try {
218 while ((ret = ch.read(buf.buf())) > 0) {
219 readBytes += ret;
220 }
221 } finally {
222 buf.flip();
223 }
224
225 session.increaseReadBytes(readBytes);
226
227 if (readBytes > 0) {
228 session.getFilterChain().fireMessageReceived(session, buf);
229 buf = null;
230
231 if (readBytes * 2 < session.getReadBufferSize()) {
232 session.decreaseReadBufferSize();
233 } else if (readBytes == session.getReadBufferSize()) {
234 session.increaseReadBufferSize();
235 }
236 }
237 if (ret < 0) {
238 scheduleRemove(session);
239 }
240 } catch (Throwable e) {
241 if (e instanceof IOException)
242 scheduleRemove(session);
243 session.getFilterChain().fireExceptionCaught(session, e);
244 } finally {
245 if (buf != null)
246 buf.release();
247 }
248 }
249
250 private void notifyIdleness() {
251
252 long currentTime = System.currentTimeMillis();
253 if ((currentTime - lastIdleCheckTime) >= 1000) {
254 lastIdleCheckTime = currentTime;
255 Set<SelectionKey> keys = selector.keys();
256 if (keys != null) {
257 for (SelectionKey key : keys) {
258 SocketSessionImpl session = (SocketSessionImpl) key
259 .attachment();
260 notifyIdleness(session, currentTime);
261 }
262 }
263 }
264 }
265
266 private void notifyIdleness(SocketSessionImpl session, long currentTime) {
267 notifyIdleness0(session, currentTime, session
268 .getIdleTimeInMillis(IdleStatus.BOTH_IDLE),
269 IdleStatus.BOTH_IDLE, Math.max(session.getLastIoTime(), session
270 .getLastIdleTime(IdleStatus.BOTH_IDLE)));
271 notifyIdleness0(session, currentTime, session
272 .getIdleTimeInMillis(IdleStatus.READER_IDLE),
273 IdleStatus.READER_IDLE, Math.max(session.getLastReadTime(),
274 session.getLastIdleTime(IdleStatus.READER_IDLE)));
275 notifyIdleness0(session, currentTime, session
276 .getIdleTimeInMillis(IdleStatus.WRITER_IDLE),
277 IdleStatus.WRITER_IDLE, Math.max(session.getLastWriteTime(),
278 session.getLastIdleTime(IdleStatus.WRITER_IDLE)));
279
280 notifyWriteTimeout(session, currentTime, session
281 .getWriteTimeoutInMillis(), session.getLastWriteTime());
282 }
283
284 private void notifyIdleness0(SocketSessionImpl session, long currentTime,
285 long idleTime, IdleStatus status, long lastIoTime) {
286 if (idleTime > 0 && lastIoTime != 0
287 && (currentTime - lastIoTime) >= idleTime) {
288 session.increaseIdleCount(status);
289 session.getFilterChain().fireSessionIdle(session, status);
290 }
291 }
292
293 private void notifyWriteTimeout(SocketSessionImpl session,
294 long currentTime, long writeTimeout, long lastIoTime) {
295 SelectionKey key = session.getSelectionKey();
296 if (writeTimeout > 0 && (currentTime - lastIoTime) >= writeTimeout
297 && key != null && key.isValid()
298 && (key.interestOps() & SelectionKey.OP_WRITE) != 0) {
299 session.getFilterChain().fireExceptionCaught(session,
300 new WriteTimeoutException());
301 }
302 }
303
304 private void doFlush() {
305 for (;;) {
306 SocketSessionImpl session = flushingSessions.poll();
307
308 if (session == null)
309 break;
310
311 session.setScheduledForFlush(false);
312
313 if (!session.isConnected()) {
314 releaseWriteBuffers(session);
315 continue;
316 }
317
318 SelectionKey key = session.getSelectionKey();
319
320
321 if (key == null) {
322 scheduleFlush(session);
323 break;
324 }
325
326
327 if (!key.isValid()) {
328 continue;
329 }
330
331 try {
332 boolean flushedAll = doFlush(session);
333 if( flushedAll && !session.getWriteRequestQueue().isEmpty() && !session.isScheduledForFlush()) {
334 scheduleFlush( session );
335 }
336 } catch (IOException e) {
337 scheduleRemove(session);
338 session.getFilterChain().fireExceptionCaught(session, e);
339 }
340 }
341 }
342
343 private void releaseWriteBuffers(SocketSessionImpl session) {
344 Queue<WriteRequest> writeRequestQueue = session.getWriteRequestQueue();
345 WriteRequest req;
346
347 if ((req = writeRequestQueue.poll()) != null) {
348 ByteBuffer buf = (ByteBuffer) req.getMessage();
349 try {
350 buf.release();
351 } catch (IllegalStateException e) {
352 session.getFilterChain().fireExceptionCaught(session, e);
353 } finally {
354
355
356 if (buf.hasRemaining()) {
357 req.getFuture().setWritten(false);
358 } else {
359 session.getFilterChain().fireMessageSent(session, req);
360 }
361 }
362
363
364 while ((req = writeRequestQueue.poll()) != null) {
365 try {
366 ((ByteBuffer) req.getMessage()).release();
367 } catch (IllegalStateException e) {
368 session.getFilterChain().fireExceptionCaught(session, e);
369 } finally {
370 req.getFuture().setWritten(false);
371 }
372 }
373 }
374 }
375
376 private boolean doFlush(SocketSessionImpl session) throws IOException {
377 SocketChannel ch = session.getChannel();
378 if (!ch.isConnected()) {
379 scheduleRemove(session);
380 return false;
381 }
382
383
384 SelectionKey key = session.getSelectionKey();
385 key.interestOps(key.interestOps() & (~SelectionKey.OP_WRITE));
386
387 Queue<WriteRequest> writeRequestQueue = session.getWriteRequestQueue();
388
389 int writtenBytes = 0;
390 int maxWrittenBytes = ((SocketSessionConfig) session.getConfig()).getSendBufferSize() << 1;
391 try {
392 for (;;) {
393 WriteRequest req = writeRequestQueue.peek();
394
395 if (req == null)
396 break;
397
398 ByteBuffer buf = (ByteBuffer) req.getMessage();
399 if (buf.remaining() == 0) {
400 writeRequestQueue.poll();
401
402 buf.reset();
403
404 if (!buf.hasRemaining()) {
405 session.increaseWrittenMessages();
406 }
407
408 session.getFilterChain().fireMessageSent(session, req);
409 continue;
410 }
411
412 int localWrittenBytes = 0;
413 for (int i = WRITE_SPIN_COUNT; i > 0; i --) {
414 localWrittenBytes = ch.write(buf.buf());
415 if (localWrittenBytes != 0 || !buf.hasRemaining()) {
416 break;
417 }
418 }
419
420 writtenBytes += localWrittenBytes;
421
422 if (localWrittenBytes == 0 || writtenBytes >= maxWrittenBytes) {
423
424 key.interestOps(key.interestOps() | SelectionKey.OP_WRITE);
425 return false;
426 }
427 }
428 } finally {
429 session.increaseWrittenBytes(writtenBytes);
430 }
431
432 return true;
433 }
434
435 private void doUpdateTrafficMask() {
436 if (trafficControllingSessions.isEmpty())
437 return;
438
439 for (;;) {
440 SocketSessionImpl session = trafficControllingSessions.poll();
441
442 if (session == null)
443 break;
444
445 SelectionKey key = session.getSelectionKey();
446
447
448
449 if (key == null) {
450 scheduleTrafficControl(session);
451 break;
452 }
453
454 if (!key.isValid()) {
455 continue;
456 }
457
458
459
460 int ops = SelectionKey.OP_READ;
461 Queue<WriteRequest> writeRequestQueue = session
462 .getWriteRequestQueue();
463 synchronized (writeRequestQueue) {
464 if (!writeRequestQueue.isEmpty()) {
465 ops |= SelectionKey.OP_WRITE;
466 }
467 }
468
469
470 int mask = session.getTrafficMask().getInterestOps();
471 key.interestOps(ops & mask);
472 }
473 }
474
475 private class Worker implements Runnable {
476 public void run() {
477 Selector selector = SocketIoProcessor.this.selector;
478 for (;;) {
479 try {
480 int nKeys = selector.select(1000);
481 doAddNew();
482 doUpdateTrafficMask();
483
484 if (nKeys > 0) {
485 process(selector.selectedKeys());
486 }
487
488 doFlush();
489 doRemove();
490 notifyIdleness();
491
492 if (selector.keys().isEmpty()) {
493 synchronized (lock) {
494 if (selector.keys().isEmpty()
495 && newSessions.isEmpty()) {
496 worker = null;
497
498 try {
499 selector.close();
500 } catch (IOException e) {
501 ExceptionMonitor.getInstance()
502 .exceptionCaught(e);
503 } finally {
504 selector = null;
505 }
506
507 break;
508 }
509 }
510 }
511 } catch (Throwable t) {
512 ExceptionMonitor.getInstance().exceptionCaught(t);
513
514 try {
515 Thread.sleep(1000);
516 } catch (InterruptedException e1) {
517 ExceptionMonitor.getInstance().exceptionCaught(e1);
518 }
519 }
520 }
521 }
522 }
523 }