查看本类的 API文档回源码主页即时通讯网 - 即时通讯开发者社区!
1   /*
2    *  Licensed to the Apache Software Foundation (ASF) under one
3    *  or more contributor license agreements.  See the NOTICE file
4    *  distributed with this work for additional information
5    *  regarding copyright ownership.  The ASF licenses this file
6    *  to you under the Apache License, Version 2.0 (the
7    *  "License"); you may not use this file except in compliance
8    *  with the License.  You may obtain a copy of the License at
9    *
10   *    http://www.apache.org/licenses/LICENSE-2.0
11   *
12   *  Unless required by applicable law or agreed to in writing,
13   *  software distributed under the License is distributed on an
14   *  "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15   *  KIND, either express or implied.  See the License for the
16   *  specific language governing permissions and limitations
17   *  under the License.
18   *
19   */
20  package org.apache.mina.transport.socket.nio;
21  
22  import java.io.IOException;
23  import java.nio.channels.SelectionKey;
24  import java.nio.channels.Selector;
25  import java.nio.channels.SocketChannel;
26  import java.util.Queue;
27  import java.util.Set;
28  import java.util.concurrent.ConcurrentLinkedQueue;
29  import java.util.concurrent.Executor;
30  
31  import org.apache.mina.common.ByteBuffer;
32  import org.apache.mina.common.ExceptionMonitor;
33  import org.apache.mina.common.IdleStatus;
34  import org.apache.mina.common.IoSession;
35  import org.apache.mina.common.WriteTimeoutException;
36  import org.apache.mina.common.IoFilter.WriteRequest;
37  import org.apache.mina.util.NamePreservingRunnable;
38  
39  /**
40   * Performs all I/O operations for sockets which is connected or bound. This class is used by MINA internally.
41   *
42   * @author The Apache Directory Project (mina-dev@directory.apache.org)
43   * @version $Rev: 592224 $, $Date: 2007-11-06 11:11:40 +0900 (Tue, 06 Nov 2007) $,
44   */
45  class SocketIoProcessor {
46  
47      /**
48       * The maximum loop count for a write operation until
49       * {@link #write(IoSession, IoBuffer)} returns non-zero value.
50       * It is similar to what a spin lock is for in concurrency programming.
51       * It improves memory utilization and write throughput significantly.
52       */
53      private static final int WRITE_SPIN_COUNT = 256;
54      
55      private final Object lock = new Object();
56  
57      private final String threadName;
58  
59      private final Executor executor;
60  
61      private volatile Selector selector;
62  
63      private final Queue<SocketSessionImpl> newSessions = new ConcurrentLinkedQueue<SocketSessionImpl>();
64  
65      private final Queue<SocketSessionImpl> removingSessions = new ConcurrentLinkedQueue<SocketSessionImpl>();
66  
67      private final Queue<SocketSessionImpl> flushingSessions = new ConcurrentLinkedQueue<SocketSessionImpl>();
68  
69      private final Queue<SocketSessionImpl> trafficControllingSessions = new ConcurrentLinkedQueue<SocketSessionImpl>();
70  
71      private Worker worker;
72  
73      private long lastIdleCheckTime = System.currentTimeMillis();
74  
75      SocketIoProcessor(String threadName, Executor executor) {
76          this.threadName = threadName;
77          this.executor = executor;
78      }
79  
80      void addNew(SocketSessionImpl session) throws IOException {
81          newSessions.add(session);
82          startupWorker();
83      }
84  
85      void remove(SocketSessionImpl session) throws IOException {
86          scheduleRemove(session);
87          startupWorker();
88      }
89  
90      private void startupWorker() throws IOException {
91          synchronized (lock) {
92              if (worker == null) {
93                  selector = Selector.open();
94                  worker = new Worker();
95                  executor.execute(new NamePreservingRunnable(worker, threadName));
96              }
97              selector.wakeup();
98          }
99      }
100 
101     void flush(SocketSessionImpl session) {
102         if ( scheduleFlush(session) ) {
103             Selector selector = this.selector;
104             if (selector != null) {
105                 selector.wakeup();
106             }
107         }
108     }
109 
110     void updateTrafficMask(SocketSessionImpl session) {
111         scheduleTrafficControl(session);
112         Selector selector = this.selector;
113         if (selector != null) {
114             selector.wakeup();
115         }
116     }
117 
118     private void scheduleRemove(SocketSessionImpl session) {
119         removingSessions.add(session);
120     }
121 
122     private boolean scheduleFlush(SocketSessionImpl session) {
123         if (session.setScheduledForFlush(true)) {
124             flushingSessions.add(session);
125 
126             return true;
127         }
128 
129         return false;
130     }
131 
132     private void scheduleTrafficControl(SocketSessionImpl session) {
133         trafficControllingSessions.add(session);
134     }
135 
136     private void doAddNew() {
137         Selector selector = this.selector;
138         for (;;) {
139             SocketSessionImpl session = newSessions.poll();
140 
141             if (session == null)
142                 break;
143 
144             SocketChannel ch = session.getChannel();
145             try {
146                 ch.configureBlocking(false);
147                 session.setSelectionKey(ch.register(selector,
148                         SelectionKey.OP_READ, session));
149 
150                 // AbstractIoFilterChain.CONNECT_FUTURE is cleared inside here
151                 // in AbstractIoFilterChain.fireSessionOpened().
152                 session.getServiceListeners().fireSessionCreated(session);
153             } catch (IOException e) {
154                 // Clear the AbstractIoFilterChain.CONNECT_FUTURE attribute
155                 // and call ConnectFuture.setException().
156                 session.getFilterChain().fireExceptionCaught(session, e);
157             }
158         }
159     }
160 
161     private void doRemove() {
162         for (;;) {
163             SocketSessionImpl session = removingSessions.poll();
164 
165             if (session == null)
166                 break;
167 
168             SocketChannel ch = session.getChannel();
169             SelectionKey key = session.getSelectionKey();
170             // Retry later if session is not yet fully initialized.
171             // (In case that Session.close() is called before addSession() is processed)
172             if (key == null) {
173                 scheduleRemove(session);
174                 break;
175             }
176             // skip if channel is already closed
177             if (!key.isValid()) {
178                 continue;
179             }
180 
181             try {
182                 key.cancel();
183                 ch.close();
184             } catch (IOException e) {
185                 session.getFilterChain().fireExceptionCaught(session, e);
186             } finally {
187                 releaseWriteBuffers(session);
188                 session.getServiceListeners().fireSessionDestroyed(session);
189             }
190         }
191     }
192 
193     private void process(Set<SelectionKey> selectedKeys) {
194         for (SelectionKey key : selectedKeys) {
195             SocketSessionImpl session = (SocketSessionImpl) key.attachment();
196 
197             if (key.isReadable() && session.getTrafficMask().isReadable()) {
198                 read(session);
199             }
200 
201             if (key.isWritable() && session.getTrafficMask().isWritable()) {
202                 scheduleFlush(session);
203             }
204         }
205 
206         selectedKeys.clear();
207     }
208 
209     private void read(SocketSessionImpl session) {
210         ByteBuffer buf = ByteBuffer.allocate(session.getReadBufferSize());
211         SocketChannel ch = session.getChannel();
212 
213         try {
214             int readBytes = 0;
215             int ret;
216 
217             try {
218                 while ((ret = ch.read(buf.buf())) > 0) {
219                     readBytes += ret;
220                 }
221             } finally {
222                 buf.flip();
223             }
224 
225             session.increaseReadBytes(readBytes);
226 
227             if (readBytes > 0) {
228                 session.getFilterChain().fireMessageReceived(session, buf);
229                 buf = null;
230 
231                 if (readBytes * 2 < session.getReadBufferSize()) {
232                     session.decreaseReadBufferSize();
233                 } else if (readBytes == session.getReadBufferSize()) {
234                     session.increaseReadBufferSize();
235                 }
236             }
237             if (ret < 0) {
238                 scheduleRemove(session);
239             }
240         } catch (Throwable e) {
241             if (e instanceof IOException)
242                 scheduleRemove(session);
243             session.getFilterChain().fireExceptionCaught(session, e);
244         } finally {
245             if (buf != null)
246                 buf.release();
247         }
248     }
249 
250     private void notifyIdleness() {
251         // process idle sessions
252         long currentTime = System.currentTimeMillis();
253         if ((currentTime - lastIdleCheckTime) >= 1000) {
254             lastIdleCheckTime = currentTime;
255             Set<SelectionKey> keys = selector.keys();
256             if (keys != null) {
257                 for (SelectionKey key : keys) {
258                     SocketSessionImpl session = (SocketSessionImpl) key
259                             .attachment();
260                     notifyIdleness(session, currentTime);
261                 }
262             }
263         }
264     }
265 
266     private void notifyIdleness(SocketSessionImpl session, long currentTime) {
267         notifyIdleness0(session, currentTime, session
268                 .getIdleTimeInMillis(IdleStatus.BOTH_IDLE),
269                 IdleStatus.BOTH_IDLE, Math.max(session.getLastIoTime(), session
270                         .getLastIdleTime(IdleStatus.BOTH_IDLE)));
271         notifyIdleness0(session, currentTime, session
272                 .getIdleTimeInMillis(IdleStatus.READER_IDLE),
273                 IdleStatus.READER_IDLE, Math.max(session.getLastReadTime(),
274                         session.getLastIdleTime(IdleStatus.READER_IDLE)));
275         notifyIdleness0(session, currentTime, session
276                 .getIdleTimeInMillis(IdleStatus.WRITER_IDLE),
277                 IdleStatus.WRITER_IDLE, Math.max(session.getLastWriteTime(),
278                         session.getLastIdleTime(IdleStatus.WRITER_IDLE)));
279 
280         notifyWriteTimeout(session, currentTime, session
281                 .getWriteTimeoutInMillis(), session.getLastWriteTime());
282     }
283 
284     private void notifyIdleness0(SocketSessionImpl session, long currentTime,
285             long idleTime, IdleStatus status, long lastIoTime) {
286         if (idleTime > 0 && lastIoTime != 0
287                 && (currentTime - lastIoTime) >= idleTime) {
288             session.increaseIdleCount(status);
289             session.getFilterChain().fireSessionIdle(session, status);
290         }
291     }
292 
293     private void notifyWriteTimeout(SocketSessionImpl session,
294             long currentTime, long writeTimeout, long lastIoTime) {
295         SelectionKey key = session.getSelectionKey();
296         if (writeTimeout > 0 && (currentTime - lastIoTime) >= writeTimeout
297                 && key != null && key.isValid()
298                 && (key.interestOps() & SelectionKey.OP_WRITE) != 0) {
299             session.getFilterChain().fireExceptionCaught(session,
300                     new WriteTimeoutException());
301         }
302     }
303 
304     private void doFlush() {
305         for (;;) {
306             SocketSessionImpl session = flushingSessions.poll();
307 
308             if (session == null)
309                 break;
310 
311             session.setScheduledForFlush(false);
312 
313             if (!session.isConnected()) {
314                 releaseWriteBuffers(session);
315                 continue;
316             }
317 
318             SelectionKey key = session.getSelectionKey();
319             // Retry later if session is not yet fully initialized.
320             // (In case that Session.write() is called before addSession() is processed)
321             if (key == null) {
322                 scheduleFlush(session);
323                 break;
324             }
325 
326             // Skip if the channel is already closed.
327             if (!key.isValid()) {
328                 continue;
329             }
330 
331             try {
332                 boolean flushedAll = doFlush(session);
333                 if( flushedAll && !session.getWriteRequestQueue().isEmpty() && !session.isScheduledForFlush()) {
334                     scheduleFlush( session );
335                 }
336             } catch (IOException e) {
337                 scheduleRemove(session);
338                 session.getFilterChain().fireExceptionCaught(session, e);
339             }
340         }
341     }
342 
343     private void releaseWriteBuffers(SocketSessionImpl session) {
344         Queue<WriteRequest> writeRequestQueue = session.getWriteRequestQueue();
345         WriteRequest req;
346 
347         if ((req = writeRequestQueue.poll()) != null) {
348             ByteBuffer buf = (ByteBuffer) req.getMessage();
349             try {
350                 buf.release();
351             } catch (IllegalStateException e) {
352                 session.getFilterChain().fireExceptionCaught(session, e);
353             } finally {
354                 // The first unwritten empty buffer must be
355                 // forwarded to the filter chain.
356                 if (buf.hasRemaining()) {
357                     req.getFuture().setWritten(false);
358                 } else {
359                     session.getFilterChain().fireMessageSent(session, req);
360                 }
361             }
362 
363             // Discard others.
364             while ((req = writeRequestQueue.poll()) != null) {
365                 try {
366                     ((ByteBuffer) req.getMessage()).release();
367                 } catch (IllegalStateException e) {
368                     session.getFilterChain().fireExceptionCaught(session, e);
369                 } finally {
370                     req.getFuture().setWritten(false);
371                 }
372             }
373         }
374     }
375 
376     private boolean doFlush(SocketSessionImpl session) throws IOException {
377         SocketChannel ch = session.getChannel();
378         if (!ch.isConnected()) {
379             scheduleRemove(session);
380             return false;
381         }
382         
383         // Clear OP_WRITE
384         SelectionKey key = session.getSelectionKey();
385         key.interestOps(key.interestOps() & (~SelectionKey.OP_WRITE));
386 
387         Queue<WriteRequest> writeRequestQueue = session.getWriteRequestQueue();
388 
389         int writtenBytes = 0;
390         int maxWrittenBytes = ((SocketSessionConfig) session.getConfig()).getSendBufferSize() << 1;
391         try {
392             for (;;) {
393                 WriteRequest req = writeRequestQueue.peek();
394 
395                 if (req == null)
396                     break;
397 
398                 ByteBuffer buf = (ByteBuffer) req.getMessage();
399                 if (buf.remaining() == 0) {
400                     writeRequestQueue.poll();
401 
402                     buf.reset();
403                     
404                     if (!buf.hasRemaining()) {
405                         session.increaseWrittenMessages();
406                     }
407                     
408                     session.getFilterChain().fireMessageSent(session, req);
409                     continue;
410                 }
411 
412                 int localWrittenBytes = 0;
413                 for (int i = WRITE_SPIN_COUNT; i > 0; i --) {
414                     localWrittenBytes = ch.write(buf.buf());
415                     if (localWrittenBytes != 0 || !buf.hasRemaining()) {
416                         break;
417                     }
418                 }
419 
420                 writtenBytes += localWrittenBytes;
421 
422                 if (localWrittenBytes == 0 || writtenBytes >= maxWrittenBytes) {
423                     // Kernel buffer is full or wrote too much.
424                     key.interestOps(key.interestOps() | SelectionKey.OP_WRITE);
425                     return false;
426                 }
427             }
428         } finally {
429             session.increaseWrittenBytes(writtenBytes);
430         }
431 
432         return true;
433     }
434 
435     private void doUpdateTrafficMask() {
436         if (trafficControllingSessions.isEmpty())
437             return;
438 
439         for (;;) {
440             SocketSessionImpl session = trafficControllingSessions.poll();
441 
442             if (session == null)
443                 break;
444 
445             SelectionKey key = session.getSelectionKey();
446             // Retry later if session is not yet fully initialized.
447             // (In case that Session.suspend??() or session.resume??() is
448             // called before addSession() is processed)
449             if (key == null) {
450                 scheduleTrafficControl(session);
451                 break;
452             }
453             // skip if channel is already closed
454             if (!key.isValid()) {
455                 continue;
456             }
457 
458             // The normal is OP_READ and, if there are write requests in the
459             // session's write queue, set OP_WRITE to trigger flushing.
460             int ops = SelectionKey.OP_READ;
461             Queue<WriteRequest> writeRequestQueue = session
462                     .getWriteRequestQueue();
463             synchronized (writeRequestQueue) {
464                 if (!writeRequestQueue.isEmpty()) {
465                     ops |= SelectionKey.OP_WRITE;
466                 }
467             }
468 
469             // Now mask the preferred ops with the mask of the current session
470             int mask = session.getTrafficMask().getInterestOps();
471             key.interestOps(ops & mask);
472         }
473     }
474 
475     private class Worker implements Runnable {
476         public void run() {
477             Selector selector = SocketIoProcessor.this.selector;
478             for (;;) {
479                 try {
480                     int nKeys = selector.select(1000);
481                     doAddNew();
482                     doUpdateTrafficMask();
483 
484                     if (nKeys > 0) {
485                         process(selector.selectedKeys());
486                     }
487 
488                     doFlush();
489                     doRemove();
490                     notifyIdleness();
491 
492                     if (selector.keys().isEmpty()) {
493                         synchronized (lock) {
494                             if (selector.keys().isEmpty()
495                                     && newSessions.isEmpty()) {
496                                 worker = null;
497 
498                                 try {
499                                     selector.close();
500                                 } catch (IOException e) {
501                                     ExceptionMonitor.getInstance()
502                                             .exceptionCaught(e);
503                                 } finally {
504                                     selector = null;
505                                 }
506 
507                                 break;
508                             }
509                         }
510                     }
511                 } catch (Throwable t) {
512                     ExceptionMonitor.getInstance().exceptionCaught(t);
513 
514                     try {
515                         Thread.sleep(1000);
516                     } catch (InterruptedException e1) {
517                         ExceptionMonitor.getInstance().exceptionCaught(e1);
518                     }
519                 }
520             }
521         }
522     }
523 }