查看本类的 API文档回源码主页即时通讯网 - 即时通讯开发者社区!
1   /*
2    *  Licensed to the Apache Software Foundation (ASF) under one
3    *  or more contributor license agreements.  See the NOTICE file
4    *  distributed with this work for additional information
5    *  regarding copyright ownership.  The ASF licenses this file
6    *  to you under the Apache License, Version 2.0 (the
7    *  "License"); you may not use this file except in compliance
8    *  with the License.  You may obtain a copy of the License at
9    *
10   *    http://www.apache.org/licenses/LICENSE-2.0
11   *
12   *  Unless required by applicable law or agreed to in writing,
13   *  software distributed under the License is distributed on an
14   *  "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15   *  KIND, either express or implied.  See the License for the
16   *  specific language governing permissions and limitations
17   *  under the License.
18   *
19   */
20  package org.apache.mina.core.session;
21  
22  import java.io.File;
23  import java.io.FileInputStream;
24  import java.io.IOException;
25  import java.net.SocketAddress;
26  import java.nio.channels.FileChannel;
27  import java.util.Iterator;
28  import java.util.Queue;
29  import java.util.Set;
30  import java.util.concurrent.ConcurrentLinkedQueue;
31  import java.util.concurrent.atomic.AtomicBoolean;
32  import java.util.concurrent.atomic.AtomicInteger;
33  import java.util.concurrent.atomic.AtomicLong;
34  
35  import org.apache.mina.core.buffer.IoBuffer;
36  import org.apache.mina.core.file.DefaultFileRegion;
37  import org.apache.mina.core.file.FilenameFileRegion;
38  import org.apache.mina.core.filterchain.IoFilterChain;
39  import org.apache.mina.core.future.CloseFuture;
40  import org.apache.mina.core.future.DefaultCloseFuture;
41  import org.apache.mina.core.future.DefaultReadFuture;
42  import org.apache.mina.core.future.DefaultWriteFuture;
43  import org.apache.mina.core.future.IoFutureListener;
44  import org.apache.mina.core.future.ReadFuture;
45  import org.apache.mina.core.future.WriteFuture;
46  import org.apache.mina.core.service.AbstractIoService;
47  import org.apache.mina.core.service.IoAcceptor;
48  import org.apache.mina.core.service.IoHandler;
49  import org.apache.mina.core.service.IoProcessor;
50  import org.apache.mina.core.service.IoService;
51  import org.apache.mina.core.service.TransportMetadata;
52  import org.apache.mina.core.write.DefaultWriteRequest;
53  import org.apache.mina.core.write.WriteException;
54  import org.apache.mina.core.write.WriteRequest;
55  import org.apache.mina.core.write.WriteRequestQueue;
56  import org.apache.mina.core.write.WriteTimeoutException;
57  import org.apache.mina.core.write.WriteToClosedSessionException;
58  import org.apache.mina.util.ExceptionMonitor;
59  
60  /**
61   * Base implementation of {@link IoSession}.
62   * 
63   * @author <a href="http://mina.apache.org">Apache MINA Project</a>
64   */
65  public abstract class AbstractIoSession implements IoSession {
66      /** The associated handler */
67      private final IoHandler handler;
68  
69      /** The session config */
70      protected IoSessionConfig config;
71  
72      /** The service which will manage this session */
73      private final IoService service;
74  
75      private static final AttributeKey READY_READ_FUTURES_KEY = new AttributeKey(AbstractIoSession.class,
76              "readyReadFutures");
77  
78      private static final AttributeKey WAITING_READ_FUTURES_KEY = new AttributeKey(AbstractIoSession.class,
79              "waitingReadFutures");
80  
81      private static final IoFutureListener<CloseFuture> SCHEDULED_COUNTER_RESETTER = new IoFutureListener<CloseFuture>() {
82          public void operationComplete(CloseFuture future) {
83              AbstractIoSession session = (AbstractIoSession) future.getSession();
84              session.scheduledWriteBytes.set(0);
85              session.scheduledWriteMessages.set(0);
86              session.readBytesThroughput = 0;
87              session.readMessagesThroughput = 0;
88              session.writtenBytesThroughput = 0;
89              session.writtenMessagesThroughput = 0;
90          }
91      };
92  
93      /**
94       * An internal write request object that triggers session close.
95       * 
96       * @see #writeRequestQueue
97       */
98      private static final WriteRequest CLOSE_REQUEST = new DefaultWriteRequest(new Object());
99  
100     private final Object lock = new Object();
101 
102     private IoSessionAttributeMap attributes;
103 
104     private WriteRequestQueue writeRequestQueue;
105 
106     private WriteRequest currentWriteRequest;
107 
108     /** The Session creation's time */
109     private final long creationTime;
110 
111     /** An id generator guaranteed to generate unique IDs for the session */
112     private static AtomicLong idGenerator = new AtomicLong(0);
113 
114     /** The session ID */
115     private long sessionId;
116 
117     /**
118      * A future that will be set 'closed' when the connection is closed.
119      */
120     private final CloseFuture closeFuture = new DefaultCloseFuture(this);
121 
122     private volatile boolean closing;
123 
124     // traffic control
125     private boolean readSuspended = false;
126 
127     private boolean writeSuspended = false;
128 
129     // Status variables
130     private final AtomicBoolean scheduledForFlush = new AtomicBoolean();
131 
132     private final AtomicInteger scheduledWriteBytes = new AtomicInteger();
133 
134     private final AtomicInteger scheduledWriteMessages = new AtomicInteger();
135 
136     private long readBytes;
137 
138     private long writtenBytes;
139 
140     private long readMessages;
141 
142     private long writtenMessages;
143 
144     private long lastReadTime;
145 
146     private long lastWriteTime;
147 
148     private long lastThroughputCalculationTime;
149 
150     private long lastReadBytes;
151 
152     private long lastWrittenBytes;
153 
154     private long lastReadMessages;
155 
156     private long lastWrittenMessages;
157 
158     private double readBytesThroughput;
159 
160     private double writtenBytesThroughput;
161 
162     private double readMessagesThroughput;
163 
164     private double writtenMessagesThroughput;
165 
166     private AtomicInteger idleCountForBoth = new AtomicInteger();
167 
168     private AtomicInteger idleCountForRead = new AtomicInteger();
169 
170     private AtomicInteger idleCountForWrite = new AtomicInteger();
171 
172     private long lastIdleTimeForBoth;
173 
174     private long lastIdleTimeForRead;
175 
176     private long lastIdleTimeForWrite;
177 
178     private boolean deferDecreaseReadBuffer = true;
179 
180     /**
181      * Create a Session for a service
182      * 
183      * @param service the Service for this session
184      */
185     protected AbstractIoSession(IoService service) {
186         this.service = service;
187         this.handler = service.getHandler();
188 
189         // Initialize all the Session counters to the current time
190         long currentTime = System.currentTimeMillis();
191         creationTime = currentTime;
192         lastThroughputCalculationTime = currentTime;
193         lastReadTime = currentTime;
194         lastWriteTime = currentTime;
195         lastIdleTimeForBoth = currentTime;
196         lastIdleTimeForRead = currentTime;
197         lastIdleTimeForWrite = currentTime;
198 
199         // TODO add documentation
200         closeFuture.addListener(SCHEDULED_COUNTER_RESETTER);
201 
202         // Set a new ID for this session
203         sessionId = idGenerator.incrementAndGet();
204     }
205 
206     /**
207      * {@inheritDoc}
208      * 
209      * We use an AtomicLong to guarantee that the session ID are unique.
210      */
211     public final long getId() {
212         return sessionId;
213     }
214 
215     /**
216      * @return The associated IoProcessor for this session
217      */
218     public abstract IoProcessor getProcessor();
219 
220     /**
221      * {@inheritDoc}
222      */
223     public final boolean isConnected() {
224         return !closeFuture.isClosed();
225     }
226 
227     /**
228      * {@inheritDoc}
229      */
230     public boolean isActive() {
231         // Return true by default
232         return true;
233     }
234 
235     /**
236      * {@inheritDoc}
237      */
238     public final boolean isClosing() {
239         return closing || closeFuture.isClosed();
240     }
241 
242     /**
243      * {@inheritDoc}
244      */
245     public boolean isSecured() {
246         // Always false...
247         return false;
248     }
249 
250     /**
251      * {@inheritDoc}
252      */
253     public final CloseFuture getCloseFuture() {
254         return closeFuture;
255     }
256 
257     /**
258      * Tells if the session is scheduled for flushed
259      * 
260      * @return true if the session is scheduled for flush
261      */
262     public final boolean isScheduledForFlush() {
263         return scheduledForFlush.get();
264     }
265 
266     /**
267      * Schedule the session for flushed
268      */
269     public final void scheduledForFlush() {
270         scheduledForFlush.set(true);
271     }
272 
273     /**
274      * Change the session's status : it's not anymore scheduled for flush
275      */
276     public final void unscheduledForFlush() {
277         scheduledForFlush.set(false);
278     }
279 
280     /**
281      * Set the scheduledForFLush flag. As we may have concurrent access to this
282      * flag, we compare and set it in one call.
283      * 
284      * @param schedule
285      *            the new value to set if not already set.
286      * @return true if the session flag has been set, and if it wasn't set
287      *         already.
288      */
289     public final boolean setScheduledForFlush(boolean schedule) {
290         if (schedule) {
291             // If the current tag is set to false, switch it to true,
292             // otherwise, we do nothing but return false : the session
293             // is already scheduled for flush
294             return scheduledForFlush.compareAndSet(false, schedule);
295         }
296 
297         scheduledForFlush.set(schedule);
298         return true;
299     }
300 
301     /**
302      * {@inheritDoc}
303      */
304     public final CloseFuture close(boolean rightNow) {
305         if (rightNow) {
306             return closeNow();
307         } else {
308             return closeOnFlush();
309         }
310     }
311 
312     /**
313      * {@inheritDoc}
314      */
315     public final CloseFuture close() {
316         try {
317             closeNow();
318         } finally {
319             return closeFuture;
320         }
321     }
322 
323     /**
324      * {@inheritDoc}
325      */
326     public final CloseFuture closeOnFlush() {
327         if (!isClosing()) {
328             getWriteRequestQueue().offer(this, CLOSE_REQUEST);
329             getProcessor().flush(this);
330             return closeFuture;
331         } else {
332             return closeFuture;
333         }
334     }
335 
336     /**
337      * {@inheritDoc}
338      */
339     public final CloseFuture closeNow() {
340         synchronized (lock) {
341             if (isClosing()) {
342                 return closeFuture;
343             }
344 
345             closing = true;
346         }
347 
348         getFilterChain().fireFilterClose();
349 
350         return closeFuture;
351     }
352 
353     /**
354      * {@inheritDoc}
355      */
356     public IoHandler getHandler() {
357         return handler;
358     }
359 
360     /**
361      * {@inheritDoc}
362      */
363     public IoSessionConfig getConfig() {
364         return config;
365     }
366 
367     /**
368      * {@inheritDoc}
369      */
370     public final ReadFuture read() {
371         if (!getConfig().isUseReadOperation()) {
372             throw new IllegalStateException("useReadOperation is not enabled.");
373         }
374 
375         Queue<ReadFuture> readyReadFutures = getReadyReadFutures();
376         ReadFuture future;
377         synchronized (readyReadFutures) {
378             future = readyReadFutures.poll();
379             if (future != null) {
380                 if (future.isClosed()) {
381                     // Let other readers get notified.
382                     readyReadFutures.offer(future);
383                 }
384             } else {
385                 future = new DefaultReadFuture(this);
386                 getWaitingReadFutures().offer(future);
387             }
388         }
389 
390         return future;
391     }
392 
393     /**
394      * Associates a message to a ReadFuture
395      * 
396      * @param message the message to associate to the ReadFuture
397      * 
398      */
399     public final void offerReadFuture(Object message) {
400         newReadFuture().setRead(message);
401     }
402 
403     /**
404      * Associates a failure to a ReadFuture
405      * 
406      * @param exception the exception to associate to the ReadFuture
407      */
408     public final void offerFailedReadFuture(Throwable exception) {
409         newReadFuture().setException(exception);
410     }
411 
412     /**
413      * Inform the ReadFuture that the session has been closed
414      */
415     public final void offerClosedReadFuture() {
416         Queue<ReadFuture> readyReadFutures = getReadyReadFutures();
417         
418         synchronized (readyReadFutures) {
419             newReadFuture().setClosed();
420         }
421     }
422 
423     /**
424      * @return a readFuture get from the waiting ReadFuture
425      */
426     private ReadFuture newReadFuture() {
427         Queue<ReadFuture> readyReadFutures = getReadyReadFutures();
428         Queue<ReadFuture> waitingReadFutures = getWaitingReadFutures();
429         ReadFuture future;
430         
431         synchronized (readyReadFutures) {
432             future = waitingReadFutures.poll();
433             
434             if (future == null) {
435                 future = new DefaultReadFuture(this);
436                 readyReadFutures.offer(future);
437             }
438         }
439         
440         return future;
441     }
442 
443     /**
444      * @return a queue of ReadFuture
445      */
446     private Queue<ReadFuture> getReadyReadFutures() {
447         Queue<ReadFuture> readyReadFutures = (Queue<ReadFuture>) getAttribute(READY_READ_FUTURES_KEY);
448         
449         if (readyReadFutures == null) {
450             readyReadFutures = new ConcurrentLinkedQueue<ReadFuture>();
451 
452             Queue<ReadFuture> oldReadyReadFutures = (Queue<ReadFuture>) setAttributeIfAbsent(READY_READ_FUTURES_KEY,
453                     readyReadFutures);
454             
455             if (oldReadyReadFutures != null) {
456                 readyReadFutures = oldReadyReadFutures;
457             }
458         }
459         
460         return readyReadFutures;
461     }
462 
463     /**
464      * @return the queue of waiting ReadFuture
465      */
466     private Queue<ReadFuture> getWaitingReadFutures() {
467         Queue<ReadFuture> waitingReadyReadFutures = (Queue<ReadFuture>) getAttribute(WAITING_READ_FUTURES_KEY);
468         
469         if (waitingReadyReadFutures == null) {
470             waitingReadyReadFutures = new ConcurrentLinkedQueue<ReadFuture>();
471 
472             Queue<ReadFuture> oldWaitingReadyReadFutures = (Queue<ReadFuture>) setAttributeIfAbsent(
473                     WAITING_READ_FUTURES_KEY, waitingReadyReadFutures);
474             
475             if (oldWaitingReadyReadFutures != null) {
476                 waitingReadyReadFutures = oldWaitingReadyReadFutures;
477             }
478         }
479         
480         return waitingReadyReadFutures;
481     }
482 
483     /**
484      * {@inheritDoc}
485      */
486     public WriteFuture write(Object message) {
487         return write(message, null);
488     }
489 
490     /**
491      * {@inheritDoc}
492      */
493     public WriteFuture write(Object message, SocketAddress remoteAddress) {
494         if (message == null) {
495             throw new IllegalArgumentException("Trying to write a null message : not allowed");
496         }
497 
498         // We can't send a message to a connected session if we don't have
499         // the remote address
500         if (!getTransportMetadata().isConnectionless() && (remoteAddress != null)) {
501             throw new UnsupportedOperationException();
502         }
503 
504         // If the session has been closed or is closing, we can't either
505         // send a message to the remote side. We generate a future
506         // containing an exception.
507         if (isClosing() || !isConnected()) {
508             WriteFuture future = new DefaultWriteFuture(this);
509             WriteRequest request = new DefaultWriteRequest(message, future, remoteAddress);
510             WriteException writeException = new WriteToClosedSessionException(request);
511             future.setException(writeException);
512             return future;
513         }
514 
515         FileChannel openedFileChannel = null;
516 
517         // TODO: remove this code as soon as we use InputStream
518         // instead of Object for the message.
519         try {
520             if ((message instanceof IoBuffer) && !((IoBuffer) message).hasRemaining()) {
521                 // Nothing to write : probably an error in the user code
522                 throw new IllegalArgumentException("message is empty. Forgot to call flip()?");
523             } else if (message instanceof FileChannel) {
524                 FileChannel fileChannel = (FileChannel) message;
525                 message = new DefaultFileRegion(fileChannel, 0, fileChannel.size());
526             } else if (message instanceof File) {
527                 File file = (File) message;
528                 openedFileChannel = new FileInputStream(file).getChannel();
529                 message = new FilenameFileRegion(file, openedFileChannel, 0, openedFileChannel.size());
530             }
531         } catch (IOException e) {
532             ExceptionMonitor.getInstance().exceptionCaught(e);
533             return DefaultWriteFuture.newNotWrittenFuture(this, e);
534         }
535 
536         // Now, we can write the message. First, create a future
537         WriteFuture writeFuture = new DefaultWriteFuture(this);
538         WriteRequest writeRequest = new DefaultWriteRequest(message, writeFuture, remoteAddress);
539 
540         // Then, get the chain and inject the WriteRequest into it
541         IoFilterChain filterChain = getFilterChain();
542         filterChain.fireFilterWrite(writeRequest);
543 
544         // TODO : This is not our business ! The caller has created a
545         // FileChannel,
546         // he has to close it !
547         if (openedFileChannel != null) {
548             // If we opened a FileChannel, it needs to be closed when the write
549             // has completed
550             final FileChannel finalChannel = openedFileChannel;
551             writeFuture.addListener(new IoFutureListener<WriteFuture>() {
552                 public void operationComplete(WriteFuture future) {
553                     try {
554                         finalChannel.close();
555                     } catch (IOException e) {
556                         ExceptionMonitor.getInstance().exceptionCaught(e);
557                     }
558                 }
559             });
560         }
561 
562         // Return the WriteFuture.
563         return writeFuture;
564     }
565 
566     /**
567      * {@inheritDoc}
568      */
569     public final Object getAttachment() {
570         return getAttribute("");
571     }
572 
573     /**
574      * {@inheritDoc}
575      */
576     public final Object setAttachment(Object attachment) {
577         return setAttribute("", attachment);
578     }
579 
580     /**
581      * {@inheritDoc}
582      */
583     public final Object getAttribute(Object key) {
584         return getAttribute(key, null);
585     }
586 
587     /**
588      * {@inheritDoc}
589      */
590     public final Object getAttribute(Object key, Object defaultValue) {
591         return attributes.getAttribute(this, key, defaultValue);
592     }
593 
594     /**
595      * {@inheritDoc}
596      */
597     public final Object setAttribute(Object key, Object value) {
598         return attributes.setAttribute(this, key, value);
599     }
600 
601     /**
602      * {@inheritDoc}
603      */
604     public final Object setAttribute(Object key) {
605         return setAttribute(key, Boolean.TRUE);
606     }
607 
608     /**
609      * {@inheritDoc}
610      */
611     public final Object setAttributeIfAbsent(Object key, Object value) {
612         return attributes.setAttributeIfAbsent(this, key, value);
613     }
614 
615     /**
616      * {@inheritDoc}
617      */
618     public final Object setAttributeIfAbsent(Object key) {
619         return setAttributeIfAbsent(key, Boolean.TRUE);
620     }
621 
622     /**
623      * {@inheritDoc}
624      */
625     public final Object removeAttribute(Object key) {
626         return attributes.removeAttribute(this, key);
627     }
628 
629     /**
630      * {@inheritDoc}
631      */
632     public final boolean removeAttribute(Object key, Object value) {
633         return attributes.removeAttribute(this, key, value);
634     }
635 
636     /**
637      * {@inheritDoc}
638      */
639     public final boolean replaceAttribute(Object key, Object oldValue, Object newValue) {
640         return attributes.replaceAttribute(this, key, oldValue, newValue);
641     }
642 
643     /**
644      * {@inheritDoc}
645      */
646     public final boolean containsAttribute(Object key) {
647         return attributes.containsAttribute(this, key);
648     }
649 
650     /**
651      * {@inheritDoc}
652      */
653     public final Set<Object> getAttributeKeys() {
654         return attributes.getAttributeKeys(this);
655     }
656 
657     /**
658      * @return The map of attributes associated with the session
659      */
660     public final IoSessionAttributeMap getAttributeMap() {
661         return attributes;
662     }
663 
664     /**
665      * Set the map of attributes associated with the session
666      * 
667      * @param attributes The Map of attributes
668      */
669     public final void setAttributeMap(IoSessionAttributeMap attributes) {
670         this.attributes = attributes;
671     }
672 
673     /**
674      * Create a new close aware write queue, based on the given write queue.
675      * 
676      * @param writeRequestQueue The write request queue
677      */
678     public final void setWriteRequestQueue(WriteRequestQueue writeRequestQueue) {
679         this.writeRequestQueue = new CloseAwareWriteQueue(writeRequestQueue);
680     }
681 
682     /**
683      * {@inheritDoc}
684      */
685     public final void suspendRead() {
686         readSuspended = true;
687         if (isClosing() || !isConnected()) {
688             return;
689         }
690         getProcessor().updateTrafficControl(this);
691     }
692 
693     /**
694      * {@inheritDoc}
695      */
696     public final void suspendWrite() {
697         writeSuspended = true;
698         if (isClosing() || !isConnected()) {
699             return;
700         }
701         getProcessor().updateTrafficControl(this);
702     }
703 
704     /**
705      * {@inheritDoc}
706      */
707     @SuppressWarnings("unchecked")
708     public final void resumeRead() {
709         readSuspended = false;
710         if (isClosing() || !isConnected()) {
711             return;
712         }
713         getProcessor().updateTrafficControl(this);
714     }
715 
716     /**
717      * {@inheritDoc}
718      */
719     @SuppressWarnings("unchecked")
720     public final void resumeWrite() {
721         writeSuspended = false;
722         if (isClosing() || !isConnected()) {
723             return;
724         }
725         getProcessor().updateTrafficControl(this);
726     }
727 
728     /**
729      * {@inheritDoc}
730      */
731     public boolean isReadSuspended() {
732         return readSuspended;
733     }
734 
735     /**
736      * {@inheritDoc}
737      */
738     public boolean isWriteSuspended() {
739         return writeSuspended;
740     }
741 
742     /**
743      * {@inheritDoc}
744      */
745     public final long getReadBytes() {
746         return readBytes;
747     }
748 
749     /**
750      * {@inheritDoc}
751      */
752     public final long getWrittenBytes() {
753         return writtenBytes;
754     }
755 
756     /**
757      * {@inheritDoc}
758      */
759     public final long getReadMessages() {
760         return readMessages;
761     }
762 
763     /**
764      * {@inheritDoc}
765      */
766     public final long getWrittenMessages() {
767         return writtenMessages;
768     }
769 
770     /**
771      * {@inheritDoc}
772      */
773     public final double getReadBytesThroughput() {
774         return readBytesThroughput;
775     }
776 
777     /**
778      * {@inheritDoc}
779      */
780     public final double getWrittenBytesThroughput() {
781         return writtenBytesThroughput;
782     }
783 
784     /**
785      * {@inheritDoc}
786      */
787     public final double getReadMessagesThroughput() {
788         return readMessagesThroughput;
789     }
790 
791     /**
792      * {@inheritDoc}
793      */
794     public final double getWrittenMessagesThroughput() {
795         return writtenMessagesThroughput;
796     }
797 
798     /**
799      * {@inheritDoc}
800      */
801     public final void updateThroughput(long currentTime, boolean force) {
802         int interval = (int) (currentTime - lastThroughputCalculationTime);
803 
804         long minInterval = getConfig().getThroughputCalculationIntervalInMillis();
805 
806         if (((minInterval == 0) || (interval < minInterval)) && !force) {
807             return;
808         }
809 
810         readBytesThroughput = (readBytes - lastReadBytes) * 1000.0 / interval;
811         writtenBytesThroughput = (writtenBytes - lastWrittenBytes) * 1000.0 / interval;
812         readMessagesThroughput = (readMessages - lastReadMessages) * 1000.0 / interval;
813         writtenMessagesThroughput = (writtenMessages - lastWrittenMessages) * 1000.0 / interval;
814 
815         lastReadBytes = readBytes;
816         lastWrittenBytes = writtenBytes;
817         lastReadMessages = readMessages;
818         lastWrittenMessages = writtenMessages;
819 
820         lastThroughputCalculationTime = currentTime;
821     }
822 
823     /**
824      * {@inheritDoc}
825      */
826     public final long getScheduledWriteBytes() {
827         return scheduledWriteBytes.get();
828     }
829 
830     /**
831      * {@inheritDoc}
832      */
833     public final int getScheduledWriteMessages() {
834         return scheduledWriteMessages.get();
835     }
836 
837     /**
838      * Set the number of scheduled write bytes
839      * 
840      * @param byteCount The number of scheduled bytes for write
841      */
842     protected void setScheduledWriteBytes(int byteCount) {
843         scheduledWriteBytes.set(byteCount);
844     }
845 
846     /**
847      * Set the number of scheduled write messages
848      * 
849      * @param messages The number of scheduled messages for write
850      */
851     protected void setScheduledWriteMessages(int messages) {
852         scheduledWriteMessages.set(messages);
853     }
854 
855     /**
856      * Increase the number of read bytes
857      * 
858      * @param increment The number of read bytes
859      * @param currentTime The current time
860      */
861     public final void increaseReadBytes(long increment, long currentTime) {
862         if (increment <= 0) {
863             return;
864         }
865 
866         readBytes += increment;
867         lastReadTime = currentTime;
868         idleCountForBoth.set(0);
869         idleCountForRead.set(0);
870 
871         if (getService() instanceof AbstractIoService) {
872             ((AbstractIoService) getService()).getStatistics().increaseReadBytes(increment, currentTime);
873         }
874     }
875 
876     /**
877      * Increase the number of read messages
878      * 
879      * @param currentTime The current time
880      */
881     public final void increaseReadMessages(long currentTime) {
882         readMessages++;
883         lastReadTime = currentTime;
884         idleCountForBoth.set(0);
885         idleCountForRead.set(0);
886 
887         if (getService() instanceof AbstractIoService) {
888             ((AbstractIoService) getService()).getStatistics().increaseReadMessages(currentTime);
889         }
890     }
891 
892     /**
893      * Increase the number of written bytes
894      * 
895      * @param increment The number of written bytes
896      * @param currentTime The current time
897      */
898     public final void increaseWrittenBytes(int increment, long currentTime) {
899         if (increment <= 0) {
900             return;
901         }
902 
903         writtenBytes += increment;
904         lastWriteTime = currentTime;
905         idleCountForBoth.set(0);
906         idleCountForWrite.set(0);
907 
908         if (getService() instanceof AbstractIoService) {
909             ((AbstractIoService) getService()).getStatistics().increaseWrittenBytes(increment, currentTime);
910         }
911 
912         increaseScheduledWriteBytes(-increment);
913     }
914 
915     /**
916      * Increase the number of written messages
917      * 
918      * @param request The written message
919      * @param currentTime The current tile
920      */
921     public final void increaseWrittenMessages(WriteRequest request, long currentTime) {
922         Object message = request.getMessage();
923 
924         if (message instanceof IoBuffer) {
925             IoBuffer b = (IoBuffer) message;
926 
927             if (b.hasRemaining()) {
928                 return;
929             }
930         }
931 
932         writtenMessages++;
933         lastWriteTime = currentTime;
934 
935         if (getService() instanceof AbstractIoService) {
936             ((AbstractIoService) getService()).getStatistics().increaseWrittenMessages(currentTime);
937         }
938 
939         decreaseScheduledWriteMessages();
940     }
941 
942     /**
943      * Increase the number of scheduled write bytes for the session
944      * 
945      * @param increment The number of newly added bytes to write
946      */
947     public final void increaseScheduledWriteBytes(int increment) {
948         scheduledWriteBytes.addAndGet(increment);
949         if (getService() instanceof AbstractIoService) {
950             ((AbstractIoService) getService()).getStatistics().increaseScheduledWriteBytes(increment);
951         }
952     }
953 
954     /**
955      * Increase the number of scheduled message to write
956      */
957     public final void increaseScheduledWriteMessages() {
958         scheduledWriteMessages.incrementAndGet();
959         
960         if (getService() instanceof AbstractIoService) {
961             ((AbstractIoService) getService()).getStatistics().increaseScheduledWriteMessages();
962         }
963     }
964 
965     /**
966      * Decrease the number of scheduled message written
967      */
968     private void decreaseScheduledWriteMessages() {
969         scheduledWriteMessages.decrementAndGet();
970         if (getService() instanceof AbstractIoService) {
971             ((AbstractIoService) getService()).getStatistics().decreaseScheduledWriteMessages();
972         }
973     }
974 
975     /**
976      * Decrease the counters of written messages and written bytes when a message has been written
977      * 
978      * @param request The written message
979      */
980     public final void decreaseScheduledBytesAndMessages(WriteRequest request) {
981         Object message = request.getMessage();
982         
983         if (message instanceof IoBuffer) {
984             IoBuffer b = (IoBuffer) message;
985             
986             if (b.hasRemaining()) {
987                 increaseScheduledWriteBytes(-((IoBuffer) message).remaining());
988             } else {
989                 decreaseScheduledWriteMessages();
990             }
991         } else {
992             decreaseScheduledWriteMessages();
993         }
994     }
995 
996     /**
997      * {@inheritDoc}
998      */
999     public final WriteRequestQueue getWriteRequestQueue() {
1000         if (writeRequestQueue == null) {
1001             throw new IllegalStateException();
1002         }
1003         
1004         return writeRequestQueue;
1005     }
1006 
1007     /**
1008      * {@inheritDoc}
1009      */
1010     public final WriteRequest getCurrentWriteRequest() {
1011         return currentWriteRequest;
1012     }
1013 
1014     /**
1015      * {@inheritDoc}
1016      */
1017     public final Object getCurrentWriteMessage() {
1018         WriteRequest req = getCurrentWriteRequest();
1019         
1020         if (req == null) {
1021             return null;
1022         }
1023         return req.getMessage();
1024     }
1025 
1026     /**
1027      * {@inheritDoc}
1028      */
1029     public final void setCurrentWriteRequest(WriteRequest currentWriteRequest) {
1030         this.currentWriteRequest = currentWriteRequest;
1031     }
1032 
1033     /**
1034      * Increase the ReadBuffer size (it will double)
1035      */
1036     public final void increaseReadBufferSize() {
1037         int newReadBufferSize = getConfig().getReadBufferSize() << 1;
1038         if (newReadBufferSize <= getConfig().getMaxReadBufferSize()) {
1039             getConfig().setReadBufferSize(newReadBufferSize);
1040         } else {
1041             getConfig().setReadBufferSize(getConfig().getMaxReadBufferSize());
1042         }
1043 
1044         deferDecreaseReadBuffer = true;
1045     }
1046 
1047     /**
1048      * Decrease the ReadBuffer size (it will be divided by a factor 2)
1049      */
1050     public final void decreaseReadBufferSize() {
1051         if (deferDecreaseReadBuffer) {
1052             deferDecreaseReadBuffer = false;
1053             return;
1054         }
1055 
1056         if (getConfig().getReadBufferSize() > getConfig().getMinReadBufferSize()) {
1057             getConfig().setReadBufferSize(getConfig().getReadBufferSize() >>> 1);
1058         }
1059 
1060         deferDecreaseReadBuffer = true;
1061     }
1062 
1063     /**
1064      * {@inheritDoc}
1065      */
1066     public final long getCreationTime() {
1067         return creationTime;
1068     }
1069 
1070     /**
1071      * {@inheritDoc}
1072      */
1073     public final long getLastIoTime() {
1074         return Math.max(lastReadTime, lastWriteTime);
1075     }
1076 
1077     /**
1078      * {@inheritDoc}
1079      */
1080     public final long getLastReadTime() {
1081         return lastReadTime;
1082     }
1083 
1084     /**
1085      * {@inheritDoc}
1086      */
1087     public final long getLastWriteTime() {
1088         return lastWriteTime;
1089     }
1090 
1091     /**
1092      * {@inheritDoc}
1093      */
1094     public final boolean isIdle(IdleStatus status) {
1095         if (status == IdleStatus.BOTH_IDLE) {
1096             return idleCountForBoth.get() > 0;
1097         }
1098 
1099         if (status == IdleStatus.READER_IDLE) {
1100             return idleCountForRead.get() > 0;
1101         }
1102 
1103         if (status == IdleStatus.WRITER_IDLE) {
1104             return idleCountForWrite.get() > 0;
1105         }
1106 
1107         throw new IllegalArgumentException("Unknown idle status: " + status);
1108     }
1109 
1110     /**
1111      * {@inheritDoc}
1112      */
1113     public final boolean isBothIdle() {
1114         return isIdle(IdleStatus.BOTH_IDLE);
1115     }
1116 
1117     /**
1118      * {@inheritDoc}
1119      */
1120     public final boolean isReaderIdle() {
1121         return isIdle(IdleStatus.READER_IDLE);
1122     }
1123 
1124     /**
1125      * {@inheritDoc}
1126      */
1127     public final boolean isWriterIdle() {
1128         return isIdle(IdleStatus.WRITER_IDLE);
1129     }
1130 
1131     /**
1132      * {@inheritDoc}
1133      */
1134     public final int getIdleCount(IdleStatus status) {
1135         if (getConfig().getIdleTime(status) == 0) {
1136             if (status == IdleStatus.BOTH_IDLE) {
1137                 idleCountForBoth.set(0);
1138             }
1139 
1140             if (status == IdleStatus.READER_IDLE) {
1141                 idleCountForRead.set(0);
1142             }
1143 
1144             if (status == IdleStatus.WRITER_IDLE) {
1145                 idleCountForWrite.set(0);
1146             }
1147         }
1148 
1149         if (status == IdleStatus.BOTH_IDLE) {
1150             return idleCountForBoth.get();
1151         }
1152 
1153         if (status == IdleStatus.READER_IDLE) {
1154             return idleCountForRead.get();
1155         }
1156 
1157         if (status == IdleStatus.WRITER_IDLE) {
1158             return idleCountForWrite.get();
1159         }
1160 
1161         throw new IllegalArgumentException("Unknown idle status: " + status);
1162     }
1163 
1164     /**
1165      * {@inheritDoc}
1166      */
1167     public final long getLastIdleTime(IdleStatus status) {
1168         if (status == IdleStatus.BOTH_IDLE) {
1169             return lastIdleTimeForBoth;
1170         }
1171 
1172         if (status == IdleStatus.READER_IDLE) {
1173             return lastIdleTimeForRead;
1174         }
1175 
1176         if (status == IdleStatus.WRITER_IDLE) {
1177             return lastIdleTimeForWrite;
1178         }
1179 
1180         throw new IllegalArgumentException("Unknown idle status: " + status);
1181     }
1182 
1183     /**
1184      * Increase the count of the various Idle counter
1185      * 
1186      * @param status The current status
1187      * @param currentTime The current time
1188      */
1189     public final void increaseIdleCount(IdleStatus status, long currentTime) {
1190         if (status == IdleStatus.BOTH_IDLE) {
1191             idleCountForBoth.incrementAndGet();
1192             lastIdleTimeForBoth = currentTime;
1193         } else if (status == IdleStatus.READER_IDLE) {
1194             idleCountForRead.incrementAndGet();
1195             lastIdleTimeForRead = currentTime;
1196         } else if (status == IdleStatus.WRITER_IDLE) {
1197             idleCountForWrite.incrementAndGet();
1198             lastIdleTimeForWrite = currentTime;
1199         } else {
1200             throw new IllegalArgumentException("Unknown idle status: " + status);
1201         }
1202     }
1203 
1204     /**
1205      * {@inheritDoc}
1206      */
1207     public final int getBothIdleCount() {
1208         return getIdleCount(IdleStatus.BOTH_IDLE);
1209     }
1210 
1211     /**
1212      * {@inheritDoc}
1213      */
1214     public final long getLastBothIdleTime() {
1215         return getLastIdleTime(IdleStatus.BOTH_IDLE);
1216     }
1217 
1218     /**
1219      * {@inheritDoc}
1220      */
1221     public final long getLastReaderIdleTime() {
1222         return getLastIdleTime(IdleStatus.READER_IDLE);
1223     }
1224 
1225     /**
1226      * {@inheritDoc}
1227      */
1228     public final long getLastWriterIdleTime() {
1229         return getLastIdleTime(IdleStatus.WRITER_IDLE);
1230     }
1231 
1232     /**
1233      * {@inheritDoc}
1234      */
1235     public final int getReaderIdleCount() {
1236         return getIdleCount(IdleStatus.READER_IDLE);
1237     }
1238 
1239     /**
1240      * {@inheritDoc}
1241      */
1242     public final int getWriterIdleCount() {
1243         return getIdleCount(IdleStatus.WRITER_IDLE);
1244     }
1245 
1246     /**
1247      * {@inheritDoc}
1248      */
1249     public SocketAddress getServiceAddress() {
1250         IoService service = getService();
1251         if (service instanceof IoAcceptor) {
1252             return ((IoAcceptor) service).getLocalAddress();
1253         }
1254 
1255         return getRemoteAddress();
1256     }
1257 
1258     /**
1259      * {@inheritDoc}
1260      */
1261     @Override
1262     public final int hashCode() {
1263         return super.hashCode();
1264     }
1265 
1266     /**
1267      * {@inheritDoc} TODO This is a ridiculous implementation. Need to be
1268      * replaced.
1269      */
1270     @Override
1271     public final boolean equals(Object o) {
1272         return super.equals(o);
1273     }
1274 
1275     /**
1276      * {@inheritDoc}
1277      */
1278     @Override
1279     public String toString() {
1280         if (isConnected() || isClosing()) {
1281             String remote = null;
1282             String local = null;
1283 
1284             try {
1285                 remote = String.valueOf(getRemoteAddress());
1286             } catch (Exception e) {
1287                 remote = "Cannot get the remote address informations: " + e.getMessage();
1288             }
1289 
1290             try {
1291                 local = String.valueOf(getLocalAddress());
1292             } catch (Exception e) {
1293             }
1294 
1295             if (getService() instanceof IoAcceptor) {
1296                 return "(" + getIdAsString() + ": " + getServiceName() + ", server, " + remote + " => " + local + ')';
1297             }
1298 
1299             return "(" + getIdAsString() + ": " + getServiceName() + ", client, " + local + " => " + remote + ')';
1300         }
1301 
1302         return "(" + getIdAsString() + ") Session disconnected ...";
1303     }
1304 
1305     /**
1306      * Get the Id as a String
1307      */
1308     private String getIdAsString() {
1309         String id = Long.toHexString(getId()).toUpperCase();
1310         
1311         if (id.length() <= 8) {
1312             return "0x00000000".substring(0, 10 - id.length()) + id;
1313         } else {
1314             return "0x" + id;
1315         }
1316     }
1317 
1318     /**
1319      * TGet the Service name
1320      */
1321     private String getServiceName() {
1322         TransportMetadata tm = getTransportMetadata();
1323         if (tm == null) {
1324             return "null";
1325         }
1326 
1327         return tm.getProviderName() + ' ' + tm.getName();
1328     }
1329 
1330     /**
1331      * {@inheritDoc}
1332      */
1333     public IoService getService() {
1334         return service;
1335     }
1336 
1337     /**
1338      * Fires a {@link IoEventType#SESSION_IDLE} event to any applicable sessions
1339      * in the specified collection.
1340      * 
1341      * @param sessions The sessions that are notified
1342      * @param currentTime the current time (i.e. {@link System#currentTimeMillis()})
1343      */
1344     public static void notifyIdleness(Iterator<? extends IoSession> sessions, long currentTime) {
1345         while (sessions.hasNext()) {
1346             IoSession session = sessions.next();
1347             
1348             if (!session.getCloseFuture().isClosed()) {
1349                 notifyIdleSession(session, currentTime);
1350             }
1351         }
1352     }
1353 
1354     /**
1355      * Fires a {@link IoEventType#SESSION_IDLE} event if applicable for the
1356      * specified {@code session}.
1357      * 
1358      * @param session The session that is notified
1359      * @param currentTime the current time (i.e. {@link System#currentTimeMillis()})
1360      */
1361     public static void notifyIdleSession(IoSession session, long currentTime) {
1362         notifyIdleSession0(session, currentTime, session.getConfig().getIdleTimeInMillis(IdleStatus.BOTH_IDLE),
1363                 IdleStatus.BOTH_IDLE, Math.max(session.getLastIoTime(), session.getLastIdleTime(IdleStatus.BOTH_IDLE)));
1364 
1365         notifyIdleSession0(session, currentTime, session.getConfig().getIdleTimeInMillis(IdleStatus.READER_IDLE),
1366                 IdleStatus.READER_IDLE,
1367                 Math.max(session.getLastReadTime(), session.getLastIdleTime(IdleStatus.READER_IDLE)));
1368 
1369         notifyIdleSession0(session, currentTime, session.getConfig().getIdleTimeInMillis(IdleStatus.WRITER_IDLE),
1370                 IdleStatus.WRITER_IDLE,
1371                 Math.max(session.getLastWriteTime(), session.getLastIdleTime(IdleStatus.WRITER_IDLE)));
1372 
1373         notifyWriteTimeout(session, currentTime);
1374     }
1375 
1376     private static void notifyIdleSession0(IoSession session, long currentTime, long idleTime, IdleStatus status,
1377             long lastIoTime) {
1378         if ((idleTime > 0) && (lastIoTime != 0) && (currentTime - lastIoTime >= idleTime)) {
1379             session.getFilterChain().fireSessionIdle(status);
1380         }
1381     }
1382 
1383     private static void notifyWriteTimeout(IoSession session, long currentTime) {
1384 
1385         long writeTimeout = session.getConfig().getWriteTimeoutInMillis();
1386         if ((writeTimeout > 0) && (currentTime - session.getLastWriteTime() >= writeTimeout)
1387                 && !session.getWriteRequestQueue().isEmpty(session)) {
1388             WriteRequest request = session.getCurrentWriteRequest();
1389             if (request != null) {
1390                 session.setCurrentWriteRequest(null);
1391                 WriteTimeoutException cause = new WriteTimeoutException(request);
1392                 request.getFuture().setException(cause);
1393                 session.getFilterChain().fireExceptionCaught(cause);
1394                 // WriteException is an IOException, so we close the session.
1395                 session.close(true);
1396             }
1397         }
1398     }
1399 
1400     /**
1401      * A queue which handles the CLOSE request.
1402      * 
1403      * TODO : Check that when closing a session, all the pending requests are
1404      * correctly sent.
1405      */
1406     private class CloseAwareWriteQueue implements WriteRequestQueue {
1407 
1408         private final WriteRequestQueue queue;
1409 
1410         /**
1411          * {@inheritDoc}
1412          */
1413         public CloseAwareWriteQueue(WriteRequestQueue queue) {
1414             this.queue = queue;
1415         }
1416 
1417         /**
1418          * {@inheritDoc}
1419          */
1420         public synchronized WriteRequest poll(IoSession session) {
1421             WriteRequest answer = queue.poll(session);
1422 
1423             if (answer == CLOSE_REQUEST) {
1424                 AbstractIoSession.this.close( true );
1425                 dispose(session);
1426                 answer = null;
1427             }
1428 
1429             return answer;
1430         }
1431 
1432         /**
1433          * {@inheritDoc}
1434          */
1435         public void offer(IoSession session, WriteRequest e) {
1436             queue.offer(session, e);
1437         }
1438 
1439         /**
1440          * {@inheritDoc}
1441          */
1442         public boolean isEmpty(IoSession session) {
1443             return queue.isEmpty(session);
1444         }
1445 
1446         /**
1447          * {@inheritDoc}
1448          */
1449         public void clear(IoSession session) {
1450             queue.clear(session);
1451         }
1452 
1453         /**
1454          * {@inheritDoc}
1455          */
1456         public void dispose(IoSession session) {
1457             queue.dispose(session);
1458         }
1459 
1460         /**
1461          * {@inheritDoc}
1462          */
1463         public int size() {
1464             return queue.size();
1465         }
1466     }
1467 }