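// [Web page header] View this class's API documentation | Back to source home page | Instant Messaging Network - Instant Messaging Developer Community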
1   /*
2    *  Licensed to the Apache Software Foundation (ASF) under one
3    *  or more contributor license agreements.  See the NOTICE file
4    *  distributed with this work for additional information
5    *  regarding copyright ownership.  The ASF licenses this file
6    *  to you under the Apache License, Version 2.0 (the
7    *  "License"); you may not use this file except in compliance
8    *  with the License.  You may obtain a copy of the License at
9    *
10   *    http://www.apache.org/licenses/LICENSE-2.0
11   *
12   *  Unless required by applicable law or agreed to in writing,
13   *  software distributed under the License is distributed on an
14   *  "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15   *  KIND, either express or implied.  See the License for the
16   *  specific language governing permissions and limitations
17   *  under the License.
18   *
19   */
20  package org.apache.mina.common.support;
21  
22  import java.net.SocketAddress;
23  import java.util.Collections;
24  import java.util.HashMap;
25  import java.util.HashSet;
26  import java.util.Map;
27  import java.util.Set;
28  import java.util.concurrent.atomic.AtomicBoolean;
29  import java.util.concurrent.atomic.AtomicInteger;
30  
31  import org.apache.mina.common.CloseFuture;
32  import org.apache.mina.common.IdleStatus;
33  import org.apache.mina.common.IoFuture;
34  import org.apache.mina.common.IoFutureListener;
35  import org.apache.mina.common.IoService;
36  import org.apache.mina.common.IoSession;
37  import org.apache.mina.common.TrafficMask;
38  import org.apache.mina.common.WriteFuture;
39  import org.apache.mina.common.IoFilter.WriteRequest;
40  
41  /**
42   * Base implementation of {@link IoSession}.
43   *
44   * @author The Apache Directory Project (mina-dev@directory.apache.org)
45   * @version $Rev: 592279 $, $Date: 2007-11-06 13:59:46 +0900 (Tue, 06 Nov 2007) $
46   */
47  public abstract class BaseIoSession implements IoSession {
48      
49      private static final IoFutureListener SCHEDULED_COUNTER_RESETTER =
50          new IoFutureListener() {
51              public void operationComplete(IoFuture future) {
52                  BaseIoSession s = (BaseIoSession) future.getSession();
53                  s.scheduledWriteBytes.set(0);
54                  s.scheduledWriteRequests.set(0);
55              }
56          };
57      
58      private final Object lock = new Object();
59  
60      private final Map<String, Object> attributes = Collections
61              .synchronizedMap(new HashMap<String, Object>(8));
62  
63      private final long creationTime;
64  
65      /**
66       * A future that will be set 'closed' when the connection is closed.
67       */
68      private final CloseFuture closeFuture = new DefaultCloseFuture(this);
69      
70      private final AtomicBoolean scheduledForFlush = new AtomicBoolean();
71      
72      private final AtomicInteger scheduledWriteBytes = new AtomicInteger();
73  
74      private final AtomicInteger scheduledWriteRequests = new AtomicInteger();
75  
76      private volatile boolean closing;
77  
78      // Configuration variables
79      private int idleTimeForRead;
80  
81      private int idleTimeForWrite;
82  
83      private int idleTimeForBoth;
84  
85      private int writeTimeout;
86  
87      private TrafficMask trafficMask = TrafficMask.ALL;
88  
89      // Status variables
90      private long readBytes;
91  
92      private long writtenBytes;
93  
94      private long readMessages;
95  
96      private long writtenMessages;
97  
98      private long lastReadTime;
99  
100     private long lastWriteTime;
101 
102     private int idleCountForBoth;
103 
104     private int idleCountForRead;
105 
106     private int idleCountForWrite;
107 
108     private long lastIdleTimeForBoth;
109 
110     private long lastIdleTimeForRead;
111 
112     private long lastIdleTimeForWrite;
113 
114     protected BaseIoSession() {
115         creationTime = lastReadTime = lastWriteTime = lastIdleTimeForBoth = lastIdleTimeForRead = lastIdleTimeForWrite = System
116                 .currentTimeMillis();
117         closeFuture.addListener(SCHEDULED_COUNTER_RESETTER);
118     }
119 
120     public boolean isConnected() {
121         return !closeFuture.isClosed();
122     }
123 
124     public boolean isClosing() {
125         return closing || closeFuture.isClosed();
126     }
127 
128     public CloseFuture getCloseFuture() {
129         return closeFuture;
130     }
131     
132     public boolean isScheduledForFlush() {
133         return scheduledForFlush.get();
134     }
135     
136     public boolean setScheduledForFlush(boolean flag) {
137         if (flag) {
138             return scheduledForFlush.compareAndSet(false, true);
139         } else {
140             scheduledForFlush.set(false);
141             return true;
142         }
143     }
144 
145     public CloseFuture close() {
146         synchronized (lock) {
147             if (isClosing()) {
148                 return closeFuture;
149             } else {
150                 closing = true;
151             }
152         }
153 
154         close0();
155         return closeFuture;
156     }
157 
158     /**
159      * Implement this method to perform real close operation.
160      * By default, this method is implemented to set the future to
161      * 'closed' immediately.
162      */
163     protected void close0() {
164         closeFuture.setClosed();
165     }
166 
167     public WriteFuture write(Object message) {
168         return write(message, null);
169     }
170 
171     public WriteFuture write(Object message, SocketAddress remoteAddress) {
172         if (isClosing() ) {
173             return DefaultWriteFuture.newNotWrittenFuture(this);
174         }
175 
176         WriteFuture future = new DefaultWriteFuture(this);
177         write0(new WriteRequest(message, future, remoteAddress));
178 
179         return future;
180     }
181 
182     /**
183      * Implement this method to perform real write operation with
184      * the specified <code>writeRequest</code>.
185      *
186      * By default, this method is implemented to set the future to
187      * 'not written' immediately.
188      *
189      * @param writeRequest Write request to make
190      */
191     protected void write0(WriteRequest writeRequest) {
192         writeRequest.getFuture().setWritten(false);
193     }
194 
195     public Object getAttachment() {
196         return getAttribute("");
197     }
198 
199     public Object setAttachment(Object attachment) {
200         return setAttribute("", attachment);
201     }
202 
203     public Object getAttribute(String key) {
204         return attributes.get(key);
205     }
206 
207     public Object setAttribute(String key, Object value) {
208         if (value == null) {
209             return removeAttribute(key);
210         } else {
211             return attributes.put(key, value);
212         }
213     }
214 
215     public Object setAttribute(String key) {
216         return setAttribute(key, Boolean.TRUE);
217     }
218 
219     public Object removeAttribute(String key) {
220         return attributes.remove(key);
221     }
222 
223     public boolean containsAttribute(String key) {
224         return getAttribute(key) != null;
225     }
226 
227     public Set<String> getAttributeKeys() {
228         synchronized (attributes) {
229             return new HashSet<String>(attributes.keySet());
230         }
231     }
232 
233     public int getIdleTime(IdleStatus status) {
234         if (status == IdleStatus.BOTH_IDLE)
235             return idleTimeForBoth;
236 
237         if (status == IdleStatus.READER_IDLE)
238             return idleTimeForRead;
239 
240         if (status == IdleStatus.WRITER_IDLE)
241             return idleTimeForWrite;
242 
243         throw new IllegalArgumentException("Unknown idle status: " + status);
244     }
245 
246     public long getIdleTimeInMillis(IdleStatus status) {
247         return getIdleTime(status) * 1000L;
248     }
249 
250     public void setIdleTime(IdleStatus status, int idleTime) {
251         if (idleTime < 0)
252             throw new IllegalArgumentException("Illegal idle time: " + idleTime);
253 
254         if (status == IdleStatus.BOTH_IDLE)
255             idleTimeForBoth = idleTime;
256         else if (status == IdleStatus.READER_IDLE)
257             idleTimeForRead = idleTime;
258         else if (status == IdleStatus.WRITER_IDLE)
259             idleTimeForWrite = idleTime;
260         else
261             throw new IllegalArgumentException("Unknown idle status: " + status);
262     }
263 
264     public int getWriteTimeout() {
265         return writeTimeout;
266     }
267 
268     public long getWriteTimeoutInMillis() {
269         return writeTimeout * 1000L;
270     }
271 
272     public void setWriteTimeout(int writeTimeout) {
273         if (writeTimeout < 0)
274             throw new IllegalArgumentException("Illegal write timeout: "
275                     + writeTimeout);
276         this.writeTimeout = writeTimeout;
277     }
278 
279     public TrafficMask getTrafficMask() {
280         return trafficMask;
281     }
282 
283     public void setTrafficMask(TrafficMask trafficMask) {
284         if (trafficMask == null) {
285             throw new NullPointerException("trafficMask");
286         }
287 
288         if (this.trafficMask == trafficMask) {
289             return;
290         }
291 
292         this.trafficMask = trafficMask;
293         updateTrafficMask();
294     }
295 
296     public void suspendRead() {
297         setTrafficMask(getTrafficMask().and(TrafficMask.READ.not()));
298     }
299 
300     public void suspendWrite() {
301         setTrafficMask(getTrafficMask().and(TrafficMask.WRITE.not()));
302     }
303 
304     public void resumeRead() {
305         setTrafficMask(getTrafficMask().or(TrafficMask.READ));
306     }
307 
308     public void resumeWrite() {
309         setTrafficMask(getTrafficMask().or(TrafficMask.WRITE));
310     }
311 
312     /**
313      * Signals the {@link IoService} that the {@link TrafficMask} of this
314      * session has been changed.
315      */
316     protected abstract void updateTrafficMask();
317 
318     public long getReadBytes() {
319         return readBytes;
320     }
321 
322     public long getWrittenBytes() {
323         return writtenBytes;
324     }
325 
326     public long getWrittenWriteRequests() {
327         return writtenMessages;
328     }
329 
330     public long getReadMessages() {
331         return readMessages;
332     }
333 
334     public long getWrittenMessages() {
335         return writtenMessages;
336     }
337     
338     public int getScheduledWriteBytes() {
339         return scheduledWriteBytes.get();
340     }
341     
342     public int getScheduledWriteRequests() {
343         return scheduledWriteRequests.get();
344     }
345 
346     public void increaseReadBytes(int increment) {
347         if (increment > 0) {
348             readBytes += increment;
349             lastReadTime = System.currentTimeMillis();
350             idleCountForBoth = 0;
351             idleCountForRead = 0;
352         }
353     }
354 
355     public void increaseWrittenBytes(int increment) {
356         if (increment > 0) {
357             writtenBytes += increment;
358             lastWriteTime = System.currentTimeMillis();
359             idleCountForBoth = 0;
360             idleCountForWrite = 0;
361             
362             scheduledWriteBytes.addAndGet(-increment);
363         }
364     }
365     
366     public void increaseReadMessages() {
367         readMessages++;
368         lastReadTime = System.currentTimeMillis();
369     }
370 
371     public void increaseWrittenMessages() {
372         writtenMessages++;
373         lastWriteTime = System.currentTimeMillis();
374         scheduledWriteRequests.decrementAndGet();
375     }
376     
377     public void increaseScheduledWriteBytes(int increment) {
378         scheduledWriteBytes.addAndGet(increment);
379     }
380 
381     public void increaseScheduledWriteRequests() {
382         scheduledWriteRequests.incrementAndGet();
383     }
384 
385     public long getCreationTime() {
386         return creationTime;
387     }
388 
389     public long getLastIoTime() {
390         return Math.max(lastReadTime, lastWriteTime);
391     }
392 
393     public long getLastReadTime() {
394         return lastReadTime;
395     }
396 
397     public long getLastWriteTime() {
398         return lastWriteTime;
399     }
400 
401     public boolean isIdle(IdleStatus status) {
402         if (status == IdleStatus.BOTH_IDLE)
403             return idleCountForBoth > 0;
404 
405         if (status == IdleStatus.READER_IDLE)
406             return idleCountForRead > 0;
407 
408         if (status == IdleStatus.WRITER_IDLE)
409             return idleCountForWrite > 0;
410 
411         throw new IllegalArgumentException("Unknown idle status: " + status);
412     }
413 
414     public int getIdleCount(IdleStatus status) {
415         if (status == IdleStatus.BOTH_IDLE)
416             return idleCountForBoth;
417 
418         if (status == IdleStatus.READER_IDLE)
419             return idleCountForRead;
420 
421         if (status == IdleStatus.WRITER_IDLE)
422             return idleCountForWrite;
423 
424         throw new IllegalArgumentException("Unknown idle status: " + status);
425     }
426 
427     public long getLastIdleTime(IdleStatus status) {
428         if (status == IdleStatus.BOTH_IDLE)
429             return lastIdleTimeForBoth;
430 
431         if (status == IdleStatus.READER_IDLE)
432             return lastIdleTimeForRead;
433 
434         if (status == IdleStatus.WRITER_IDLE)
435             return lastIdleTimeForWrite;
436 
437         throw new IllegalArgumentException("Unknown idle status: " + status);
438     }
439 
440     public void increaseIdleCount(IdleStatus status) {
441         if (status == IdleStatus.BOTH_IDLE) {
442             idleCountForBoth++;
443             lastIdleTimeForBoth = System.currentTimeMillis();
444         } else if (status == IdleStatus.READER_IDLE) {
445             idleCountForRead++;
446             lastIdleTimeForRead = System.currentTimeMillis();
447         } else if (status == IdleStatus.WRITER_IDLE) {
448             idleCountForWrite++;
449             lastIdleTimeForWrite = System.currentTimeMillis();
450         } else
451             throw new IllegalArgumentException("Unknown idle status: " + status);
452     }
453 
454     @Override
455     public String toString() {
456         return "(" + getTransportType() + ", R: " + getRemoteAddress()
457                 + ", L: " + getLocalAddress() + ", S: " + getServiceAddress()
458                 + ')';
459     }
460 }