1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20 package org.apache.mina.common.support;
21
22 import java.net.SocketAddress;
23 import java.util.Collections;
24 import java.util.HashMap;
25 import java.util.HashSet;
26 import java.util.Map;
27 import java.util.Set;
28 import java.util.concurrent.atomic.AtomicBoolean;
29 import java.util.concurrent.atomic.AtomicInteger;
30
31 import org.apache.mina.common.CloseFuture;
32 import org.apache.mina.common.IdleStatus;
33 import org.apache.mina.common.IoFuture;
34 import org.apache.mina.common.IoFutureListener;
35 import org.apache.mina.common.IoService;
36 import org.apache.mina.common.IoSession;
37 import org.apache.mina.common.TrafficMask;
38 import org.apache.mina.common.WriteFuture;
39 import org.apache.mina.common.IoFilter.WriteRequest;
40
41
42
43
44
45
46
47 public abstract class BaseIoSession implements IoSession {
48
49 private static final IoFutureListener SCHEDULED_COUNTER_RESETTER =
50 new IoFutureListener() {
51 public void operationComplete(IoFuture future) {
52 BaseIoSession s = (BaseIoSession) future.getSession();
53 s.scheduledWriteBytes.set(0);
54 s.scheduledWriteRequests.set(0);
55 }
56 };
57
58 private final Object lock = new Object();
59
60 private final Map<String, Object> attributes = Collections
61 .synchronizedMap(new HashMap<String, Object>(8));
62
63 private final long creationTime;
64
65
66
67
68 private final CloseFuture closeFuture = new DefaultCloseFuture(this);
69
70 private final AtomicBoolean scheduledForFlush = new AtomicBoolean();
71
72 private final AtomicInteger scheduledWriteBytes = new AtomicInteger();
73
74 private final AtomicInteger scheduledWriteRequests = new AtomicInteger();
75
76 private volatile boolean closing;
77
78
79 private int idleTimeForRead;
80
81 private int idleTimeForWrite;
82
83 private int idleTimeForBoth;
84
85 private int writeTimeout;
86
87 private TrafficMask trafficMask = TrafficMask.ALL;
88
89
90 private long readBytes;
91
92 private long writtenBytes;
93
94 private long readMessages;
95
96 private long writtenMessages;
97
98 private long lastReadTime;
99
100 private long lastWriteTime;
101
102 private int idleCountForBoth;
103
104 private int idleCountForRead;
105
106 private int idleCountForWrite;
107
108 private long lastIdleTimeForBoth;
109
110 private long lastIdleTimeForRead;
111
112 private long lastIdleTimeForWrite;
113
114 protected BaseIoSession() {
115 creationTime = lastReadTime = lastWriteTime = lastIdleTimeForBoth = lastIdleTimeForRead = lastIdleTimeForWrite = System
116 .currentTimeMillis();
117 closeFuture.addListener(SCHEDULED_COUNTER_RESETTER);
118 }
119
120 public boolean isConnected() {
121 return !closeFuture.isClosed();
122 }
123
124 public boolean isClosing() {
125 return closing || closeFuture.isClosed();
126 }
127
128 public CloseFuture getCloseFuture() {
129 return closeFuture;
130 }
131
132 public boolean isScheduledForFlush() {
133 return scheduledForFlush.get();
134 }
135
136 public boolean setScheduledForFlush(boolean flag) {
137 if (flag) {
138 return scheduledForFlush.compareAndSet(false, true);
139 } else {
140 scheduledForFlush.set(false);
141 return true;
142 }
143 }
144
145 public CloseFuture close() {
146 synchronized (lock) {
147 if (isClosing()) {
148 return closeFuture;
149 } else {
150 closing = true;
151 }
152 }
153
154 close0();
155 return closeFuture;
156 }
157
158
159
160
161
162
163 protected void close0() {
164 closeFuture.setClosed();
165 }
166
167 public WriteFuture write(Object message) {
168 return write(message, null);
169 }
170
171 public WriteFuture write(Object message, SocketAddress remoteAddress) {
172 if (isClosing() ) {
173 return DefaultWriteFuture.newNotWrittenFuture(this);
174 }
175
176 WriteFuture future = new DefaultWriteFuture(this);
177 write0(new WriteRequest(message, future, remoteAddress));
178
179 return future;
180 }
181
182
183
184
185
186
187
188
189
190
191 protected void write0(WriteRequest writeRequest) {
192 writeRequest.getFuture().setWritten(false);
193 }
194
195 public Object getAttachment() {
196 return getAttribute("");
197 }
198
199 public Object setAttachment(Object attachment) {
200 return setAttribute("", attachment);
201 }
202
203 public Object getAttribute(String key) {
204 return attributes.get(key);
205 }
206
207 public Object setAttribute(String key, Object value) {
208 if (value == null) {
209 return removeAttribute(key);
210 } else {
211 return attributes.put(key, value);
212 }
213 }
214
215 public Object setAttribute(String key) {
216 return setAttribute(key, Boolean.TRUE);
217 }
218
219 public Object removeAttribute(String key) {
220 return attributes.remove(key);
221 }
222
223 public boolean containsAttribute(String key) {
224 return getAttribute(key) != null;
225 }
226
227 public Set<String> getAttributeKeys() {
228 synchronized (attributes) {
229 return new HashSet<String>(attributes.keySet());
230 }
231 }
232
233 public int getIdleTime(IdleStatus status) {
234 if (status == IdleStatus.BOTH_IDLE)
235 return idleTimeForBoth;
236
237 if (status == IdleStatus.READER_IDLE)
238 return idleTimeForRead;
239
240 if (status == IdleStatus.WRITER_IDLE)
241 return idleTimeForWrite;
242
243 throw new IllegalArgumentException("Unknown idle status: " + status);
244 }
245
246 public long getIdleTimeInMillis(IdleStatus status) {
247 return getIdleTime(status) * 1000L;
248 }
249
250 public void setIdleTime(IdleStatus status, int idleTime) {
251 if (idleTime < 0)
252 throw new IllegalArgumentException("Illegal idle time: " + idleTime);
253
254 if (status == IdleStatus.BOTH_IDLE)
255 idleTimeForBoth = idleTime;
256 else if (status == IdleStatus.READER_IDLE)
257 idleTimeForRead = idleTime;
258 else if (status == IdleStatus.WRITER_IDLE)
259 idleTimeForWrite = idleTime;
260 else
261 throw new IllegalArgumentException("Unknown idle status: " + status);
262 }
263
264 public int getWriteTimeout() {
265 return writeTimeout;
266 }
267
268 public long getWriteTimeoutInMillis() {
269 return writeTimeout * 1000L;
270 }
271
272 public void setWriteTimeout(int writeTimeout) {
273 if (writeTimeout < 0)
274 throw new IllegalArgumentException("Illegal write timeout: "
275 + writeTimeout);
276 this.writeTimeout = writeTimeout;
277 }
278
279 public TrafficMask getTrafficMask() {
280 return trafficMask;
281 }
282
283 public void setTrafficMask(TrafficMask trafficMask) {
284 if (trafficMask == null) {
285 throw new NullPointerException("trafficMask");
286 }
287
288 if (this.trafficMask == trafficMask) {
289 return;
290 }
291
292 this.trafficMask = trafficMask;
293 updateTrafficMask();
294 }
295
296 public void suspendRead() {
297 setTrafficMask(getTrafficMask().and(TrafficMask.READ.not()));
298 }
299
300 public void suspendWrite() {
301 setTrafficMask(getTrafficMask().and(TrafficMask.WRITE.not()));
302 }
303
304 public void resumeRead() {
305 setTrafficMask(getTrafficMask().or(TrafficMask.READ));
306 }
307
308 public void resumeWrite() {
309 setTrafficMask(getTrafficMask().or(TrafficMask.WRITE));
310 }
311
312
313
314
315
316 protected abstract void updateTrafficMask();
317
318 public long getReadBytes() {
319 return readBytes;
320 }
321
322 public long getWrittenBytes() {
323 return writtenBytes;
324 }
325
326 public long getWrittenWriteRequests() {
327 return writtenMessages;
328 }
329
330 public long getReadMessages() {
331 return readMessages;
332 }
333
334 public long getWrittenMessages() {
335 return writtenMessages;
336 }
337
338 public int getScheduledWriteBytes() {
339 return scheduledWriteBytes.get();
340 }
341
342 public int getScheduledWriteRequests() {
343 return scheduledWriteRequests.get();
344 }
345
346 public void increaseReadBytes(int increment) {
347 if (increment > 0) {
348 readBytes += increment;
349 lastReadTime = System.currentTimeMillis();
350 idleCountForBoth = 0;
351 idleCountForRead = 0;
352 }
353 }
354
355 public void increaseWrittenBytes(int increment) {
356 if (increment > 0) {
357 writtenBytes += increment;
358 lastWriteTime = System.currentTimeMillis();
359 idleCountForBoth = 0;
360 idleCountForWrite = 0;
361
362 scheduledWriteBytes.addAndGet(-increment);
363 }
364 }
365
366 public void increaseReadMessages() {
367 readMessages++;
368 lastReadTime = System.currentTimeMillis();
369 }
370
371 public void increaseWrittenMessages() {
372 writtenMessages++;
373 lastWriteTime = System.currentTimeMillis();
374 scheduledWriteRequests.decrementAndGet();
375 }
376
377 public void increaseScheduledWriteBytes(int increment) {
378 scheduledWriteBytes.addAndGet(increment);
379 }
380
381 public void increaseScheduledWriteRequests() {
382 scheduledWriteRequests.incrementAndGet();
383 }
384
385 public long getCreationTime() {
386 return creationTime;
387 }
388
389 public long getLastIoTime() {
390 return Math.max(lastReadTime, lastWriteTime);
391 }
392
393 public long getLastReadTime() {
394 return lastReadTime;
395 }
396
397 public long getLastWriteTime() {
398 return lastWriteTime;
399 }
400
401 public boolean isIdle(IdleStatus status) {
402 if (status == IdleStatus.BOTH_IDLE)
403 return idleCountForBoth > 0;
404
405 if (status == IdleStatus.READER_IDLE)
406 return idleCountForRead > 0;
407
408 if (status == IdleStatus.WRITER_IDLE)
409 return idleCountForWrite > 0;
410
411 throw new IllegalArgumentException("Unknown idle status: " + status);
412 }
413
414 public int getIdleCount(IdleStatus status) {
415 if (status == IdleStatus.BOTH_IDLE)
416 return idleCountForBoth;
417
418 if (status == IdleStatus.READER_IDLE)
419 return idleCountForRead;
420
421 if (status == IdleStatus.WRITER_IDLE)
422 return idleCountForWrite;
423
424 throw new IllegalArgumentException("Unknown idle status: " + status);
425 }
426
427 public long getLastIdleTime(IdleStatus status) {
428 if (status == IdleStatus.BOTH_IDLE)
429 return lastIdleTimeForBoth;
430
431 if (status == IdleStatus.READER_IDLE)
432 return lastIdleTimeForRead;
433
434 if (status == IdleStatus.WRITER_IDLE)
435 return lastIdleTimeForWrite;
436
437 throw new IllegalArgumentException("Unknown idle status: " + status);
438 }
439
440 public void increaseIdleCount(IdleStatus status) {
441 if (status == IdleStatus.BOTH_IDLE) {
442 idleCountForBoth++;
443 lastIdleTimeForBoth = System.currentTimeMillis();
444 } else if (status == IdleStatus.READER_IDLE) {
445 idleCountForRead++;
446 lastIdleTimeForRead = System.currentTimeMillis();
447 } else if (status == IdleStatus.WRITER_IDLE) {
448 idleCountForWrite++;
449 lastIdleTimeForWrite = System.currentTimeMillis();
450 } else
451 throw new IllegalArgumentException("Unknown idle status: " + status);
452 }
453
454 @Override
455 public String toString() {
456 return "(" + getTransportType() + ", R: " + getRemoteAddress()
457 + ", L: " + getLocalAddress() + ", S: " + getServiceAddress()
458 + ')';
459 }
460 }