1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20 package org.apache.mina.core.session;
21
22 import java.io.File;
23 import java.io.FileInputStream;
24 import java.io.IOException;
25 import java.net.SocketAddress;
26 import java.nio.channels.FileChannel;
27 import java.util.Iterator;
28 import java.util.Queue;
29 import java.util.Set;
30 import java.util.concurrent.ConcurrentLinkedQueue;
31 import java.util.concurrent.atomic.AtomicBoolean;
32 import java.util.concurrent.atomic.AtomicInteger;
33 import java.util.concurrent.atomic.AtomicLong;
34
35 import org.apache.mina.core.buffer.IoBuffer;
36 import org.apache.mina.core.file.DefaultFileRegion;
37 import org.apache.mina.core.file.FilenameFileRegion;
38 import org.apache.mina.core.filterchain.IoFilterChain;
39 import org.apache.mina.core.future.CloseFuture;
40 import org.apache.mina.core.future.DefaultCloseFuture;
41 import org.apache.mina.core.future.DefaultReadFuture;
42 import org.apache.mina.core.future.DefaultWriteFuture;
43 import org.apache.mina.core.future.IoFutureListener;
44 import org.apache.mina.core.future.ReadFuture;
45 import org.apache.mina.core.future.WriteFuture;
46 import org.apache.mina.core.service.AbstractIoService;
47 import org.apache.mina.core.service.IoAcceptor;
48 import org.apache.mina.core.service.IoHandler;
49 import org.apache.mina.core.service.IoProcessor;
50 import org.apache.mina.core.service.IoService;
51 import org.apache.mina.core.service.TransportMetadata;
52 import org.apache.mina.core.write.DefaultWriteRequest;
53 import org.apache.mina.core.write.WriteException;
54 import org.apache.mina.core.write.WriteRequest;
55 import org.apache.mina.core.write.WriteRequestQueue;
56 import org.apache.mina.core.write.WriteTimeoutException;
57 import org.apache.mina.core.write.WriteToClosedSessionException;
58 import org.apache.mina.util.ExceptionMonitor;
59
60
61
62
63
64
65 public abstract class AbstractIoSession implements IoSession {
66
67 private final IoHandler handler;
68
69
70 protected IoSessionConfig config;
71
72
73 private final IoService service;
74
75 private static final AttributeKey READY_READ_FUTURES_KEY = new AttributeKey(AbstractIoSession.class,
76 "readyReadFutures");
77
78 private static final AttributeKey WAITING_READ_FUTURES_KEY = new AttributeKey(AbstractIoSession.class,
79 "waitingReadFutures");
80
81 private static final IoFutureListener<CloseFuture> SCHEDULED_COUNTER_RESETTER = new IoFutureListener<CloseFuture>() {
82 public void operationComplete(CloseFuture future) {
83 AbstractIoSession session = (AbstractIoSession) future.getSession();
84 session.scheduledWriteBytes.set(0);
85 session.scheduledWriteMessages.set(0);
86 session.readBytesThroughput = 0;
87 session.readMessagesThroughput = 0;
88 session.writtenBytesThroughput = 0;
89 session.writtenMessagesThroughput = 0;
90 }
91 };
92
93
94
95
96
97
98 private static final WriteRequest CLOSE_REQUEST = new DefaultWriteRequest(new Object());
99
100 private final Object lock = new Object();
101
102 private IoSessionAttributeMap attributes;
103
104 private WriteRequestQueue writeRequestQueue;
105
106 private WriteRequest currentWriteRequest;
107
108
109 private final long creationTime;
110
111
112 private static AtomicLong idGenerator = new AtomicLong(0);
113
114
115 private long sessionId;
116
117
118
119
120 private final CloseFuture closeFuture = new DefaultCloseFuture(this);
121
122 private volatile boolean closing;
123
124
125 private boolean readSuspended = false;
126
127 private boolean writeSuspended = false;
128
129
130 private final AtomicBoolean scheduledForFlush = new AtomicBoolean();
131
132 private final AtomicInteger scheduledWriteBytes = new AtomicInteger();
133
134 private final AtomicInteger scheduledWriteMessages = new AtomicInteger();
135
136 private long readBytes;
137
138 private long writtenBytes;
139
140 private long readMessages;
141
142 private long writtenMessages;
143
144 private long lastReadTime;
145
146 private long lastWriteTime;
147
148 private long lastThroughputCalculationTime;
149
150 private long lastReadBytes;
151
152 private long lastWrittenBytes;
153
154 private long lastReadMessages;
155
156 private long lastWrittenMessages;
157
158 private double readBytesThroughput;
159
160 private double writtenBytesThroughput;
161
162 private double readMessagesThroughput;
163
164 private double writtenMessagesThroughput;
165
166 private AtomicInteger idleCountForBoth = new AtomicInteger();
167
168 private AtomicInteger idleCountForRead = new AtomicInteger();
169
170 private AtomicInteger idleCountForWrite = new AtomicInteger();
171
172 private long lastIdleTimeForBoth;
173
174 private long lastIdleTimeForRead;
175
176 private long lastIdleTimeForWrite;
177
178 private boolean deferDecreaseReadBuffer = true;
179
180
181
182
183
184
185 protected AbstractIoSession(IoService service) {
186 this.service = service;
187 this.handler = service.getHandler();
188
189
190 long currentTime = System.currentTimeMillis();
191 creationTime = currentTime;
192 lastThroughputCalculationTime = currentTime;
193 lastReadTime = currentTime;
194 lastWriteTime = currentTime;
195 lastIdleTimeForBoth = currentTime;
196 lastIdleTimeForRead = currentTime;
197 lastIdleTimeForWrite = currentTime;
198
199
200 closeFuture.addListener(SCHEDULED_COUNTER_RESETTER);
201
202
203 sessionId = idGenerator.incrementAndGet();
204 }
205
206
207
208
209
210
211 public final long getId() {
212 return sessionId;
213 }
214
215
216
217
218 public abstract IoProcessor getProcessor();
219
220
221
222
223 public final boolean isConnected() {
224 return !closeFuture.isClosed();
225 }
226
227
228
229
230 public boolean isActive() {
231
232 return true;
233 }
234
235
236
237
238 public final boolean isClosing() {
239 return closing || closeFuture.isClosed();
240 }
241
242
243
244
245 public boolean isSecured() {
246
247 return false;
248 }
249
250
251
252
253 public final CloseFuture getCloseFuture() {
254 return closeFuture;
255 }
256
257
258
259
260
261
262 public final boolean isScheduledForFlush() {
263 return scheduledForFlush.get();
264 }
265
266
267
268
269 public final void scheduledForFlush() {
270 scheduledForFlush.set(true);
271 }
272
273
274
275
276 public final void unscheduledForFlush() {
277 scheduledForFlush.set(false);
278 }
279
280
281
282
283
284
285
286
287
288
289 public final boolean setScheduledForFlush(boolean schedule) {
290 if (schedule) {
291
292
293
294 return scheduledForFlush.compareAndSet(false, schedule);
295 }
296
297 scheduledForFlush.set(schedule);
298 return true;
299 }
300
301
302
303
304 public final CloseFuture close(boolean rightNow) {
305 if (rightNow) {
306 return closeNow();
307 } else {
308 return closeOnFlush();
309 }
310 }
311
312
313
314
315 public final CloseFuture close() {
316 try {
317 closeNow();
318 } finally {
319 return closeFuture;
320 }
321 }
322
323
324
325
326 public final CloseFuture closeOnFlush() {
327 if (!isClosing()) {
328 getWriteRequestQueue().offer(this, CLOSE_REQUEST);
329 getProcessor().flush(this);
330 return closeFuture;
331 } else {
332 return closeFuture;
333 }
334 }
335
336
337
338
339 public final CloseFuture closeNow() {
340 synchronized (lock) {
341 if (isClosing()) {
342 return closeFuture;
343 }
344
345 closing = true;
346 }
347
348 getFilterChain().fireFilterClose();
349
350 return closeFuture;
351 }
352
353
354
355
356 public IoHandler getHandler() {
357 return handler;
358 }
359
360
361
362
363 public IoSessionConfig getConfig() {
364 return config;
365 }
366
367
368
369
370 public final ReadFuture read() {
371 if (!getConfig().isUseReadOperation()) {
372 throw new IllegalStateException("useReadOperation is not enabled.");
373 }
374
375 Queue<ReadFuture> readyReadFutures = getReadyReadFutures();
376 ReadFuture future;
377 synchronized (readyReadFutures) {
378 future = readyReadFutures.poll();
379 if (future != null) {
380 if (future.isClosed()) {
381
382 readyReadFutures.offer(future);
383 }
384 } else {
385 future = new DefaultReadFuture(this);
386 getWaitingReadFutures().offer(future);
387 }
388 }
389
390 return future;
391 }
392
393
394
395
396
397
398
399 public final void offerReadFuture(Object message) {
400 newReadFuture().setRead(message);
401 }
402
403
404
405
406
407
408 public final void offerFailedReadFuture(Throwable exception) {
409 newReadFuture().setException(exception);
410 }
411
412
413
414
415 public final void offerClosedReadFuture() {
416 Queue<ReadFuture> readyReadFutures = getReadyReadFutures();
417
418 synchronized (readyReadFutures) {
419 newReadFuture().setClosed();
420 }
421 }
422
423
424
425
426 private ReadFuture newReadFuture() {
427 Queue<ReadFuture> readyReadFutures = getReadyReadFutures();
428 Queue<ReadFuture> waitingReadFutures = getWaitingReadFutures();
429 ReadFuture future;
430
431 synchronized (readyReadFutures) {
432 future = waitingReadFutures.poll();
433
434 if (future == null) {
435 future = new DefaultReadFuture(this);
436 readyReadFutures.offer(future);
437 }
438 }
439
440 return future;
441 }
442
443
444
445
446 private Queue<ReadFuture> getReadyReadFutures() {
447 Queue<ReadFuture> readyReadFutures = (Queue<ReadFuture>) getAttribute(READY_READ_FUTURES_KEY);
448
449 if (readyReadFutures == null) {
450 readyReadFutures = new ConcurrentLinkedQueue<ReadFuture>();
451
452 Queue<ReadFuture> oldReadyReadFutures = (Queue<ReadFuture>) setAttributeIfAbsent(READY_READ_FUTURES_KEY,
453 readyReadFutures);
454
455 if (oldReadyReadFutures != null) {
456 readyReadFutures = oldReadyReadFutures;
457 }
458 }
459
460 return readyReadFutures;
461 }
462
463
464
465
466 private Queue<ReadFuture> getWaitingReadFutures() {
467 Queue<ReadFuture> waitingReadyReadFutures = (Queue<ReadFuture>) getAttribute(WAITING_READ_FUTURES_KEY);
468
469 if (waitingReadyReadFutures == null) {
470 waitingReadyReadFutures = new ConcurrentLinkedQueue<ReadFuture>();
471
472 Queue<ReadFuture> oldWaitingReadyReadFutures = (Queue<ReadFuture>) setAttributeIfAbsent(
473 WAITING_READ_FUTURES_KEY, waitingReadyReadFutures);
474
475 if (oldWaitingReadyReadFutures != null) {
476 waitingReadyReadFutures = oldWaitingReadyReadFutures;
477 }
478 }
479
480 return waitingReadyReadFutures;
481 }
482
483
484
485
486 public WriteFuture write(Object message) {
487 return write(message, null);
488 }
489
490
491
492
493 public WriteFuture write(Object message, SocketAddress remoteAddress) {
494 if (message == null) {
495 throw new IllegalArgumentException("Trying to write a null message : not allowed");
496 }
497
498
499
500 if (!getTransportMetadata().isConnectionless() && (remoteAddress != null)) {
501 throw new UnsupportedOperationException();
502 }
503
504
505
506
507 if (isClosing() || !isConnected()) {
508 WriteFuture future = new DefaultWriteFuture(this);
509 WriteRequest request = new DefaultWriteRequest(message, future, remoteAddress);
510 WriteException writeException = new WriteToClosedSessionException(request);
511 future.setException(writeException);
512 return future;
513 }
514
515 FileChannel openedFileChannel = null;
516
517
518
519 try {
520 if ((message instanceof IoBuffer) && !((IoBuffer) message).hasRemaining()) {
521
522 throw new IllegalArgumentException("message is empty. Forgot to call flip()?");
523 } else if (message instanceof FileChannel) {
524 FileChannel fileChannel = (FileChannel) message;
525 message = new DefaultFileRegion(fileChannel, 0, fileChannel.size());
526 } else if (message instanceof File) {
527 File file = (File) message;
528 openedFileChannel = new FileInputStream(file).getChannel();
529 message = new FilenameFileRegion(file, openedFileChannel, 0, openedFileChannel.size());
530 }
531 } catch (IOException e) {
532 ExceptionMonitor.getInstance().exceptionCaught(e);
533 return DefaultWriteFuture.newNotWrittenFuture(this, e);
534 }
535
536
537 WriteFuture writeFuture = new DefaultWriteFuture(this);
538 WriteRequest writeRequest = new DefaultWriteRequest(message, writeFuture, remoteAddress);
539
540
541 IoFilterChain filterChain = getFilterChain();
542 filterChain.fireFilterWrite(writeRequest);
543
544
545
546
547 if (openedFileChannel != null) {
548
549
550 final FileChannel finalChannel = openedFileChannel;
551 writeFuture.addListener(new IoFutureListener<WriteFuture>() {
552 public void operationComplete(WriteFuture future) {
553 try {
554 finalChannel.close();
555 } catch (IOException e) {
556 ExceptionMonitor.getInstance().exceptionCaught(e);
557 }
558 }
559 });
560 }
561
562
563 return writeFuture;
564 }
565
566
567
568
569 public final Object getAttachment() {
570 return getAttribute("");
571 }
572
573
574
575
576 public final Object setAttachment(Object attachment) {
577 return setAttribute("", attachment);
578 }
579
580
581
582
583 public final Object getAttribute(Object key) {
584 return getAttribute(key, null);
585 }
586
587
588
589
590 public final Object getAttribute(Object key, Object defaultValue) {
591 return attributes.getAttribute(this, key, defaultValue);
592 }
593
594
595
596
597 public final Object setAttribute(Object key, Object value) {
598 return attributes.setAttribute(this, key, value);
599 }
600
601
602
603
604 public final Object setAttribute(Object key) {
605 return setAttribute(key, Boolean.TRUE);
606 }
607
608
609
610
611 public final Object setAttributeIfAbsent(Object key, Object value) {
612 return attributes.setAttributeIfAbsent(this, key, value);
613 }
614
615
616
617
618 public final Object setAttributeIfAbsent(Object key) {
619 return setAttributeIfAbsent(key, Boolean.TRUE);
620 }
621
622
623
624
625 public final Object removeAttribute(Object key) {
626 return attributes.removeAttribute(this, key);
627 }
628
629
630
631
632 public final boolean removeAttribute(Object key, Object value) {
633 return attributes.removeAttribute(this, key, value);
634 }
635
636
637
638
639 public final boolean replaceAttribute(Object key, Object oldValue, Object newValue) {
640 return attributes.replaceAttribute(this, key, oldValue, newValue);
641 }
642
643
644
645
646 public final boolean containsAttribute(Object key) {
647 return attributes.containsAttribute(this, key);
648 }
649
650
651
652
653 public final Set<Object> getAttributeKeys() {
654 return attributes.getAttributeKeys(this);
655 }
656
657
658
659
660 public final IoSessionAttributeMap getAttributeMap() {
661 return attributes;
662 }
663
664
665
666
667
668
669 public final void setAttributeMap(IoSessionAttributeMap attributes) {
670 this.attributes = attributes;
671 }
672
673
674
675
676
677
678 public final void setWriteRequestQueue(WriteRequestQueue writeRequestQueue) {
679 this.writeRequestQueue = new CloseAwareWriteQueue(writeRequestQueue);
680 }
681
682
683
684
685 public final void suspendRead() {
686 readSuspended = true;
687 if (isClosing() || !isConnected()) {
688 return;
689 }
690 getProcessor().updateTrafficControl(this);
691 }
692
693
694
695
696 public final void suspendWrite() {
697 writeSuspended = true;
698 if (isClosing() || !isConnected()) {
699 return;
700 }
701 getProcessor().updateTrafficControl(this);
702 }
703
704
705
706
707 @SuppressWarnings("unchecked")
708 public final void resumeRead() {
709 readSuspended = false;
710 if (isClosing() || !isConnected()) {
711 return;
712 }
713 getProcessor().updateTrafficControl(this);
714 }
715
716
717
718
719 @SuppressWarnings("unchecked")
720 public final void resumeWrite() {
721 writeSuspended = false;
722 if (isClosing() || !isConnected()) {
723 return;
724 }
725 getProcessor().updateTrafficControl(this);
726 }
727
728
729
730
731 public boolean isReadSuspended() {
732 return readSuspended;
733 }
734
735
736
737
738 public boolean isWriteSuspended() {
739 return writeSuspended;
740 }
741
742
743
744
745 public final long getReadBytes() {
746 return readBytes;
747 }
748
749
750
751
752 public final long getWrittenBytes() {
753 return writtenBytes;
754 }
755
756
757
758
759 public final long getReadMessages() {
760 return readMessages;
761 }
762
763
764
765
766 public final long getWrittenMessages() {
767 return writtenMessages;
768 }
769
770
771
772
773 public final double getReadBytesThroughput() {
774 return readBytesThroughput;
775 }
776
777
778
779
780 public final double getWrittenBytesThroughput() {
781 return writtenBytesThroughput;
782 }
783
784
785
786
787 public final double getReadMessagesThroughput() {
788 return readMessagesThroughput;
789 }
790
791
792
793
794 public final double getWrittenMessagesThroughput() {
795 return writtenMessagesThroughput;
796 }
797
798
799
800
801 public final void updateThroughput(long currentTime, boolean force) {
802 int interval = (int) (currentTime - lastThroughputCalculationTime);
803
804 long minInterval = getConfig().getThroughputCalculationIntervalInMillis();
805
806 if (((minInterval == 0) || (interval < minInterval)) && !force) {
807 return;
808 }
809
810 readBytesThroughput = (readBytes - lastReadBytes) * 1000.0 / interval;
811 writtenBytesThroughput = (writtenBytes - lastWrittenBytes) * 1000.0 / interval;
812 readMessagesThroughput = (readMessages - lastReadMessages) * 1000.0 / interval;
813 writtenMessagesThroughput = (writtenMessages - lastWrittenMessages) * 1000.0 / interval;
814
815 lastReadBytes = readBytes;
816 lastWrittenBytes = writtenBytes;
817 lastReadMessages = readMessages;
818 lastWrittenMessages = writtenMessages;
819
820 lastThroughputCalculationTime = currentTime;
821 }
822
823
824
825
826 public final long getScheduledWriteBytes() {
827 return scheduledWriteBytes.get();
828 }
829
830
831
832
833 public final int getScheduledWriteMessages() {
834 return scheduledWriteMessages.get();
835 }
836
837
838
839
840
841
842 protected void setScheduledWriteBytes(int byteCount) {
843 scheduledWriteBytes.set(byteCount);
844 }
845
846
847
848
849
850
851 protected void setScheduledWriteMessages(int messages) {
852 scheduledWriteMessages.set(messages);
853 }
854
855
856
857
858
859
860
861 public final void increaseReadBytes(long increment, long currentTime) {
862 if (increment <= 0) {
863 return;
864 }
865
866 readBytes += increment;
867 lastReadTime = currentTime;
868 idleCountForBoth.set(0);
869 idleCountForRead.set(0);
870
871 if (getService() instanceof AbstractIoService) {
872 ((AbstractIoService) getService()).getStatistics().increaseReadBytes(increment, currentTime);
873 }
874 }
875
876
877
878
879
880
881 public final void increaseReadMessages(long currentTime) {
882 readMessages++;
883 lastReadTime = currentTime;
884 idleCountForBoth.set(0);
885 idleCountForRead.set(0);
886
887 if (getService() instanceof AbstractIoService) {
888 ((AbstractIoService) getService()).getStatistics().increaseReadMessages(currentTime);
889 }
890 }
891
892
893
894
895
896
897
898 public final void increaseWrittenBytes(int increment, long currentTime) {
899 if (increment <= 0) {
900 return;
901 }
902
903 writtenBytes += increment;
904 lastWriteTime = currentTime;
905 idleCountForBoth.set(0);
906 idleCountForWrite.set(0);
907
908 if (getService() instanceof AbstractIoService) {
909 ((AbstractIoService) getService()).getStatistics().increaseWrittenBytes(increment, currentTime);
910 }
911
912 increaseScheduledWriteBytes(-increment);
913 }
914
915
916
917
918
919
920
921 public final void increaseWrittenMessages(WriteRequest request, long currentTime) {
922 Object message = request.getMessage();
923
924 if (message instanceof IoBuffer) {
925 IoBuffer b = (IoBuffer) message;
926
927 if (b.hasRemaining()) {
928 return;
929 }
930 }
931
932 writtenMessages++;
933 lastWriteTime = currentTime;
934
935 if (getService() instanceof AbstractIoService) {
936 ((AbstractIoService) getService()).getStatistics().increaseWrittenMessages(currentTime);
937 }
938
939 decreaseScheduledWriteMessages();
940 }
941
942
943
944
945
946
947 public final void increaseScheduledWriteBytes(int increment) {
948 scheduledWriteBytes.addAndGet(increment);
949 if (getService() instanceof AbstractIoService) {
950 ((AbstractIoService) getService()).getStatistics().increaseScheduledWriteBytes(increment);
951 }
952 }
953
954
955
956
957 public final void increaseScheduledWriteMessages() {
958 scheduledWriteMessages.incrementAndGet();
959
960 if (getService() instanceof AbstractIoService) {
961 ((AbstractIoService) getService()).getStatistics().increaseScheduledWriteMessages();
962 }
963 }
964
965
966
967
968 private void decreaseScheduledWriteMessages() {
969 scheduledWriteMessages.decrementAndGet();
970 if (getService() instanceof AbstractIoService) {
971 ((AbstractIoService) getService()).getStatistics().decreaseScheduledWriteMessages();
972 }
973 }
974
975
976
977
978
979
980 public final void decreaseScheduledBytesAndMessages(WriteRequest request) {
981 Object message = request.getMessage();
982
983 if (message instanceof IoBuffer) {
984 IoBuffer b = (IoBuffer) message;
985
986 if (b.hasRemaining()) {
987 increaseScheduledWriteBytes(-((IoBuffer) message).remaining());
988 } else {
989 decreaseScheduledWriteMessages();
990 }
991 } else {
992 decreaseScheduledWriteMessages();
993 }
994 }
995
996
997
998
999 public final WriteRequestQueue getWriteRequestQueue() {
1000 if (writeRequestQueue == null) {
1001 throw new IllegalStateException();
1002 }
1003
1004 return writeRequestQueue;
1005 }
1006
1007
1008
1009
1010 public final WriteRequest getCurrentWriteRequest() {
1011 return currentWriteRequest;
1012 }
1013
1014
1015
1016
1017 public final Object getCurrentWriteMessage() {
1018 WriteRequest req = getCurrentWriteRequest();
1019
1020 if (req == null) {
1021 return null;
1022 }
1023 return req.getMessage();
1024 }
1025
1026
1027
1028
1029 public final void setCurrentWriteRequest(WriteRequest currentWriteRequest) {
1030 this.currentWriteRequest = currentWriteRequest;
1031 }
1032
1033
1034
1035
1036 public final void increaseReadBufferSize() {
1037 int newReadBufferSize = getConfig().getReadBufferSize() << 1;
1038 if (newReadBufferSize <= getConfig().getMaxReadBufferSize()) {
1039 getConfig().setReadBufferSize(newReadBufferSize);
1040 } else {
1041 getConfig().setReadBufferSize(getConfig().getMaxReadBufferSize());
1042 }
1043
1044 deferDecreaseReadBuffer = true;
1045 }
1046
1047
1048
1049
1050 public final void decreaseReadBufferSize() {
1051 if (deferDecreaseReadBuffer) {
1052 deferDecreaseReadBuffer = false;
1053 return;
1054 }
1055
1056 if (getConfig().getReadBufferSize() > getConfig().getMinReadBufferSize()) {
1057 getConfig().setReadBufferSize(getConfig().getReadBufferSize() >>> 1);
1058 }
1059
1060 deferDecreaseReadBuffer = true;
1061 }
1062
1063
1064
1065
1066 public final long getCreationTime() {
1067 return creationTime;
1068 }
1069
1070
1071
1072
1073 public final long getLastIoTime() {
1074 return Math.max(lastReadTime, lastWriteTime);
1075 }
1076
1077
1078
1079
1080 public final long getLastReadTime() {
1081 return lastReadTime;
1082 }
1083
1084
1085
1086
1087 public final long getLastWriteTime() {
1088 return lastWriteTime;
1089 }
1090
1091
1092
1093
1094 public final boolean isIdle(IdleStatus status) {
1095 if (status == IdleStatus.BOTH_IDLE) {
1096 return idleCountForBoth.get() > 0;
1097 }
1098
1099 if (status == IdleStatus.READER_IDLE) {
1100 return idleCountForRead.get() > 0;
1101 }
1102
1103 if (status == IdleStatus.WRITER_IDLE) {
1104 return idleCountForWrite.get() > 0;
1105 }
1106
1107 throw new IllegalArgumentException("Unknown idle status: " + status);
1108 }
1109
1110
1111
1112
1113 public final boolean isBothIdle() {
1114 return isIdle(IdleStatus.BOTH_IDLE);
1115 }
1116
1117
1118
1119
1120 public final boolean isReaderIdle() {
1121 return isIdle(IdleStatus.READER_IDLE);
1122 }
1123
1124
1125
1126
1127 public final boolean isWriterIdle() {
1128 return isIdle(IdleStatus.WRITER_IDLE);
1129 }
1130
1131
1132
1133
1134 public final int getIdleCount(IdleStatus status) {
1135 if (getConfig().getIdleTime(status) == 0) {
1136 if (status == IdleStatus.BOTH_IDLE) {
1137 idleCountForBoth.set(0);
1138 }
1139
1140 if (status == IdleStatus.READER_IDLE) {
1141 idleCountForRead.set(0);
1142 }
1143
1144 if (status == IdleStatus.WRITER_IDLE) {
1145 idleCountForWrite.set(0);
1146 }
1147 }
1148
1149 if (status == IdleStatus.BOTH_IDLE) {
1150 return idleCountForBoth.get();
1151 }
1152
1153 if (status == IdleStatus.READER_IDLE) {
1154 return idleCountForRead.get();
1155 }
1156
1157 if (status == IdleStatus.WRITER_IDLE) {
1158 return idleCountForWrite.get();
1159 }
1160
1161 throw new IllegalArgumentException("Unknown idle status: " + status);
1162 }
1163
1164
1165
1166
1167 public final long getLastIdleTime(IdleStatus status) {
1168 if (status == IdleStatus.BOTH_IDLE) {
1169 return lastIdleTimeForBoth;
1170 }
1171
1172 if (status == IdleStatus.READER_IDLE) {
1173 return lastIdleTimeForRead;
1174 }
1175
1176 if (status == IdleStatus.WRITER_IDLE) {
1177 return lastIdleTimeForWrite;
1178 }
1179
1180 throw new IllegalArgumentException("Unknown idle status: " + status);
1181 }
1182
1183
1184
1185
1186
1187
1188
1189 public final void increaseIdleCount(IdleStatus status, long currentTime) {
1190 if (status == IdleStatus.BOTH_IDLE) {
1191 idleCountForBoth.incrementAndGet();
1192 lastIdleTimeForBoth = currentTime;
1193 } else if (status == IdleStatus.READER_IDLE) {
1194 idleCountForRead.incrementAndGet();
1195 lastIdleTimeForRead = currentTime;
1196 } else if (status == IdleStatus.WRITER_IDLE) {
1197 idleCountForWrite.incrementAndGet();
1198 lastIdleTimeForWrite = currentTime;
1199 } else {
1200 throw new IllegalArgumentException("Unknown idle status: " + status);
1201 }
1202 }
1203
1204
1205
1206
1207 public final int getBothIdleCount() {
1208 return getIdleCount(IdleStatus.BOTH_IDLE);
1209 }
1210
1211
1212
1213
1214 public final long getLastBothIdleTime() {
1215 return getLastIdleTime(IdleStatus.BOTH_IDLE);
1216 }
1217
1218
1219
1220
1221 public final long getLastReaderIdleTime() {
1222 return getLastIdleTime(IdleStatus.READER_IDLE);
1223 }
1224
1225
1226
1227
1228 public final long getLastWriterIdleTime() {
1229 return getLastIdleTime(IdleStatus.WRITER_IDLE);
1230 }
1231
1232
1233
1234
1235 public final int getReaderIdleCount() {
1236 return getIdleCount(IdleStatus.READER_IDLE);
1237 }
1238
1239
1240
1241
1242 public final int getWriterIdleCount() {
1243 return getIdleCount(IdleStatus.WRITER_IDLE);
1244 }
1245
1246
1247
1248
1249 public SocketAddress getServiceAddress() {
1250 IoService service = getService();
1251 if (service instanceof IoAcceptor) {
1252 return ((IoAcceptor) service).getLocalAddress();
1253 }
1254
1255 return getRemoteAddress();
1256 }
1257
1258
1259
1260
1261 @Override
1262 public final int hashCode() {
1263 return super.hashCode();
1264 }
1265
1266
1267
1268
1269
1270 @Override
1271 public final boolean equals(Object o) {
1272 return super.equals(o);
1273 }
1274
1275
1276
1277
1278 @Override
1279 public String toString() {
1280 if (isConnected() || isClosing()) {
1281 String remote = null;
1282 String local = null;
1283
1284 try {
1285 remote = String.valueOf(getRemoteAddress());
1286 } catch (Exception e) {
1287 remote = "Cannot get the remote address informations: " + e.getMessage();
1288 }
1289
1290 try {
1291 local = String.valueOf(getLocalAddress());
1292 } catch (Exception e) {
1293 }
1294
1295 if (getService() instanceof IoAcceptor) {
1296 return "(" + getIdAsString() + ": " + getServiceName() + ", server, " + remote + " => " + local + ')';
1297 }
1298
1299 return "(" + getIdAsString() + ": " + getServiceName() + ", client, " + local + " => " + remote + ')';
1300 }
1301
1302 return "(" + getIdAsString() + ") Session disconnected ...";
1303 }
1304
1305
1306
1307
1308 private String getIdAsString() {
1309 String id = Long.toHexString(getId()).toUpperCase();
1310
1311 if (id.length() <= 8) {
1312 return "0x00000000".substring(0, 10 - id.length()) + id;
1313 } else {
1314 return "0x" + id;
1315 }
1316 }
1317
1318
1319
1320
1321 private String getServiceName() {
1322 TransportMetadata tm = getTransportMetadata();
1323 if (tm == null) {
1324 return "null";
1325 }
1326
1327 return tm.getProviderName() + ' ' + tm.getName();
1328 }
1329
1330
1331
1332
1333 public IoService getService() {
1334 return service;
1335 }
1336
1337
1338
1339
1340
1341
1342
1343
1344 public static void notifyIdleness(Iterator<? extends IoSession> sessions, long currentTime) {
1345 while (sessions.hasNext()) {
1346 IoSession session = sessions.next();
1347
1348 if (!session.getCloseFuture().isClosed()) {
1349 notifyIdleSession(session, currentTime);
1350 }
1351 }
1352 }
1353
1354
1355
1356
1357
1358
1359
1360
1361 public static void notifyIdleSession(IoSession session, long currentTime) {
1362 notifyIdleSession0(session, currentTime, session.getConfig().getIdleTimeInMillis(IdleStatus.BOTH_IDLE),
1363 IdleStatus.BOTH_IDLE, Math.max(session.getLastIoTime(), session.getLastIdleTime(IdleStatus.BOTH_IDLE)));
1364
1365 notifyIdleSession0(session, currentTime, session.getConfig().getIdleTimeInMillis(IdleStatus.READER_IDLE),
1366 IdleStatus.READER_IDLE,
1367 Math.max(session.getLastReadTime(), session.getLastIdleTime(IdleStatus.READER_IDLE)));
1368
1369 notifyIdleSession0(session, currentTime, session.getConfig().getIdleTimeInMillis(IdleStatus.WRITER_IDLE),
1370 IdleStatus.WRITER_IDLE,
1371 Math.max(session.getLastWriteTime(), session.getLastIdleTime(IdleStatus.WRITER_IDLE)));
1372
1373 notifyWriteTimeout(session, currentTime);
1374 }
1375
1376 private static void notifyIdleSession0(IoSession session, long currentTime, long idleTime, IdleStatus status,
1377 long lastIoTime) {
1378 if ((idleTime > 0) && (lastIoTime != 0) && (currentTime - lastIoTime >= idleTime)) {
1379 session.getFilterChain().fireSessionIdle(status);
1380 }
1381 }
1382
1383 private static void notifyWriteTimeout(IoSession session, long currentTime) {
1384
1385 long writeTimeout = session.getConfig().getWriteTimeoutInMillis();
1386 if ((writeTimeout > 0) && (currentTime - session.getLastWriteTime() >= writeTimeout)
1387 && !session.getWriteRequestQueue().isEmpty(session)) {
1388 WriteRequest request = session.getCurrentWriteRequest();
1389 if (request != null) {
1390 session.setCurrentWriteRequest(null);
1391 WriteTimeoutException cause = new WriteTimeoutException(request);
1392 request.getFuture().setException(cause);
1393 session.getFilterChain().fireExceptionCaught(cause);
1394
1395 session.close(true);
1396 }
1397 }
1398 }
1399
1400
1401
1402
1403
1404
1405
1406 private class CloseAwareWriteQueue implements WriteRequestQueue {
1407
1408 private final WriteRequestQueue queue;
1409
1410
1411
1412
1413 public CloseAwareWriteQueue(WriteRequestQueue queue) {
1414 this.queue = queue;
1415 }
1416
1417
1418
1419
1420 public synchronized WriteRequest poll(IoSession session) {
1421 WriteRequest answer = queue.poll(session);
1422
1423 if (answer == CLOSE_REQUEST) {
1424 AbstractIoSession.this.close( true );
1425 dispose(session);
1426 answer = null;
1427 }
1428
1429 return answer;
1430 }
1431
1432
1433
1434
1435 public void offer(IoSession session, WriteRequest e) {
1436 queue.offer(session, e);
1437 }
1438
1439
1440
1441
1442 public boolean isEmpty(IoSession session) {
1443 return queue.isEmpty(session);
1444 }
1445
1446
1447
1448
1449 public void clear(IoSession session) {
1450 queue.clear(session);
1451 }
1452
1453
1454
1455
1456 public void dispose(IoSession session) {
1457 queue.dispose(session);
1458 }
1459
1460
1461
1462
1463 public int size() {
1464 return queue.size();
1465 }
1466 }
1467 }