查看本类的 API文档回源码主页即时通讯网 - 即时通讯开发者社区!
1   /*
2    *  Licensed to the Apache Software Foundation (ASF) under one
3    *  or more contributor license agreements.  See the NOTICE file
4    *  distributed with this work for additional information
5    *  regarding copyright ownership.  The ASF licenses this file
6    *  to you under the Apache License, Version 2.0 (the
7    *  "License"); you may not use this file except in compliance
8    *  with the License.  You may obtain a copy of the License at
9    *
10   *    http://www.apache.org/licenses/LICENSE-2.0
11   *
12   *  Unless required by applicable law or agreed to in writing,
13   *  software distributed under the License is distributed on an
14   *  "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15   *  KIND, either express or implied.  See the License for the
16   *  specific language governing permissions and limitations
17   *  under the License.
18   *
19   */
20  package org.apache.mina.core.service;
21  
22  import java.util.Collections;
23  import java.util.List;
24  import java.util.Map;
25  import java.util.concurrent.ConcurrentHashMap;
26  import java.util.concurrent.ConcurrentMap;
27  import java.util.concurrent.CopyOnWriteArrayList;
28  import java.util.concurrent.atomic.AtomicBoolean;
29  import java.util.concurrent.atomic.AtomicLong;
30  
31  import org.apache.mina.core.filterchain.IoFilterChain;
32  import org.apache.mina.core.future.IoFuture;
33  import org.apache.mina.core.future.IoFutureListener;
34  import org.apache.mina.core.session.IoSession;
35  import org.apache.mina.util.ExceptionMonitor;
36  
37  /**
38   * A helper class which provides addition and removal of {@link IoServiceListener}s and firing
39   * events.
40   *
41   * @author <a href="http://mina.apache.org">Apache MINA Project</a>
42   */
43  public class IoServiceListenerSupport {
44      /** The {@link IoService} that this instance manages. */
45      private final IoService service;
46  
47      /** A list of {@link IoServiceListener}s. */
48      private final List<IoServiceListener> listeners = new CopyOnWriteArrayList<IoServiceListener>();
49  
50      /** Tracks managed sessions. */
51      private final ConcurrentMap<Long, IoSession> managedSessions = new ConcurrentHashMap<Long, IoSession>();
52  
53      /**  Read only version of {@link #managedSessions}. */
54      private final Map<Long, IoSession> readOnlyManagedSessions = Collections.unmodifiableMap(managedSessions);
55  
56      private final AtomicBoolean activated = new AtomicBoolean();
57  
58      /** Time this listenerSupport has been activated */
59      private volatile long activationTime;
60  
61      /** A counter used to store the maximum sessions we managed since the listenerSupport has been activated */
62      private volatile int largestManagedSessionCount = 0;
63  
64      /** A global counter to count the number of sessions managed since the start */
65      private AtomicLong cumulativeManagedSessionCount = new AtomicLong(0);
66  
67      /**
68       * Creates a new instance of the listenerSupport.
69       * 
70       * @param service The associated IoService
71       */
72      public IoServiceListenerSupport(IoService service) {
73          if (service == null) {
74              throw new IllegalArgumentException("service");
75          }
76  
77          this.service = service;
78      }
79  
80      /**
81       * Adds a new listener.
82       * 
83       * @param listener The added listener
84       */
85      public void add(IoServiceListener listener) {
86          if (listener != null) {
87              listeners.add(listener);
88          }
89      }
90  
91      /**
92       * Removes an existing listener.
93       * 
94       * @param listener The listener to remove
95       */
96      public void remove(IoServiceListener listener) {
97          if (listener != null) {
98              listeners.remove(listener);
99          }
100     }
101 
102     /**
103      * @return The time (in ms) this instance has been activated
104      */
105     public long getActivationTime() {
106         return activationTime;
107     }
108 
109     public Map<Long, IoSession> getManagedSessions() {
110         return readOnlyManagedSessions;
111     }
112 
113     public int getManagedSessionCount() {
114         return managedSessions.size();
115     }
116 
117     /**
118      * @return The largest number of managed session since the creation of this
119      * listenerSupport
120      */
121     public int getLargestManagedSessionCount() {
122         return largestManagedSessionCount;
123     }
124 
125     /**
126      * @return The total number of sessions managed since the initilization of this
127      * ListenerSupport
128      */
129     public long getCumulativeManagedSessionCount() {
130         return cumulativeManagedSessionCount.get();
131     }
132 
133     /**
134      * @return true if the instance is active
135      */
136     public boolean isActive() {
137         return activated.get();
138     }
139 
140     /**
141      * Calls {@link IoServiceListener#serviceActivated(IoService)}
142      * for all registered listeners.
143      */
144     public void fireServiceActivated() {
145         if (!activated.compareAndSet(false, true)) {
146             // The instance is already active
147             return;
148         }
149 
150         activationTime = System.currentTimeMillis();
151 
152         // Activate all the listeners now
153         for (IoServiceListener listener : listeners) {
154             try {
155                 listener.serviceActivated(service);
156             } catch (Exception e) {
157                 ExceptionMonitor.getInstance().exceptionCaught(e);
158             }
159         }
160     }
161 
162     /**
163      * Calls {@link IoServiceListener#serviceDeactivated(IoService)}
164      * for all registered listeners.
165      */
166     public void fireServiceDeactivated() {
167         if (!activated.compareAndSet(true, false)) {
168             // The instance is already desactivated
169             return;
170         }
171 
172         // Desactivate all the listeners
173         try {
174             for (IoServiceListener listener : listeners) {
175                 try {
176                     listener.serviceDeactivated(service);
177                 } catch (Exception e) {
178                     ExceptionMonitor.getInstance().exceptionCaught(e);
179                 }
180             }
181         } finally {
182             disconnectSessions();
183         }
184     }
185 
186     /**
187      * Calls {@link IoServiceListener#sessionCreated(IoSession)} for all registered listeners.
188      * 
189      * @param session The session which has been created
190      */
191     public void fireSessionCreated(IoSession session) {
192         boolean firstSession = false;
193 
194         if (session.getService() instanceof IoConnector) {
195             synchronized (managedSessions) {
196                 firstSession = managedSessions.isEmpty();
197             }
198         }
199 
200         // If already registered, ignore.
201         if (managedSessions.putIfAbsent(session.getId(), session) != null) {
202             return;
203         }
204 
205         // If the first connector session, fire a virtual service activation event.
206         if (firstSession) {
207             fireServiceActivated();
208         }
209 
210         // Fire session events.
211         IoFilterChain filterChain = session.getFilterChain();
212         filterChain.fireSessionCreated();
213         filterChain.fireSessionOpened();
214 
215         int managedSessionCount = managedSessions.size();
216 
217         if (managedSessionCount > largestManagedSessionCount) {
218             largestManagedSessionCount = managedSessionCount;
219         }
220 
221         cumulativeManagedSessionCount.incrementAndGet();
222 
223         // Fire listener events.
224         for (IoServiceListener l : listeners) {
225             try {
226                 l.sessionCreated(session);
227             } catch (Exception e) {
228                 ExceptionMonitor.getInstance().exceptionCaught(e);
229             }
230         }
231     }
232 
233     /**
234      * Calls {@link IoServiceListener#sessionDestroyed(IoSession)} for all registered listeners.
235      * 
236      * @param session The session which has been destroyed
237      */
238     public void fireSessionDestroyed(IoSession session) {
239         // Try to remove the remaining empty session set after removal.
240         if (managedSessions.remove(session.getId()) == null) {
241             return;
242         }
243 
244         // Fire session events.
245         session.getFilterChain().fireSessionClosed();
246 
247         // Fire listener events.
248         try {
249             for (IoServiceListener l : listeners) {
250                 try {
251                     l.sessionDestroyed(session);
252                 } catch (Exception e) {
253                     ExceptionMonitor.getInstance().exceptionCaught(e);
254                 }
255             }
256         } finally {
257             // Fire a virtual service deactivation event for the last session of the connector.
258             if (session.getService() instanceof IoConnector) {
259                 boolean lastSession = false;
260 
261                 synchronized (managedSessions) {
262                     lastSession = managedSessions.isEmpty();
263                 }
264 
265                 if (lastSession) {
266                     fireServiceDeactivated();
267                 }
268             }
269         }
270     }
271 
272     /**
273      * Close all the sessions
274      * TODO disconnectSessions.
275      *
276      */
277     private void disconnectSessions() {
278         if (!(service instanceof IoAcceptor)) {
279             // We don't disconnect sessions for anything but an Acceptor
280             return;
281         }
282 
283         if (!((IoAcceptor) service).isCloseOnDeactivation()) {
284             return;
285         }
286 
287         Object lock = new Object();
288         IoFutureListener<IoFuture> listener = new LockNotifyingListener(lock);
289 
290         for (IoSession s : managedSessions.values()) {
291             s.close(true).addListener(listener);
292         }
293 
294         try {
295             synchronized (lock) {
296                 while (!managedSessions.isEmpty()) {
297                     lock.wait(500);
298                 }
299             }
300         } catch (InterruptedException ie) {
301             // Ignored
302         }
303     }
304 
305     /**
306      * A listener in charge of releasing the lock when the close has been completed
307      */
308     private static class LockNotifyingListener implements IoFutureListener<IoFuture> {
309         private final Object lock;
310 
311         public LockNotifyingListener(Object lock) {
312             this.lock = lock;
313         }
314 
315         public void operationComplete(IoFuture future) {
316             synchronized (lock) {
317                 lock.notifyAll();
318             }
319         }
320     }
321 }