查看本类的 API文档回源码主页即时通讯网 - 即时通讯开发者社区!
1   /*
2    *  Licensed to the Apache Software Foundation (ASF) under one
3    *  or more contributor license agreements.  See the NOTICE file
4    *  distributed with this work for additional information
5    *  regarding copyright ownership.  The ASF licenses this file
6    *  to you under the Apache License, Version 2.0 (the
7    *  "License"); you may not use this file except in compliance
8    *  with the License.  You may obtain a copy of the License at
9    *
10   *    http://www.apache.org/licenses/LICENSE-2.0
11   *
12   *  Unless required by applicable law or agreed to in writing,
13   *  software distributed under the License is distributed on an
14   *  "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15   *  KIND, either express or implied.  See the License for the
16   *  specific language governing permissions and limitations
17   *  under the License.
18   *
19   */
20  package org.apache.mina.common.support;
21  
22  import java.net.SocketAddress;
23  import java.util.Collections;
24  import java.util.List;
25  import java.util.Set;
26  import java.util.concurrent.ConcurrentHashMap;
27  import java.util.concurrent.ConcurrentMap;
28  import java.util.concurrent.CopyOnWriteArrayList;
29  import java.util.concurrent.CopyOnWriteArraySet;
30  import java.util.concurrent.CountDownLatch;
31  
32  import org.apache.mina.common.ExceptionMonitor;
33  import org.apache.mina.common.IoAcceptorConfig;
34  import org.apache.mina.common.IoConnector;
35  import org.apache.mina.common.IoFuture;
36  import org.apache.mina.common.IoFutureListener;
37  import org.apache.mina.common.IoHandler;
38  import org.apache.mina.common.IoService;
39  import org.apache.mina.common.IoServiceConfig;
40  import org.apache.mina.common.IoServiceListener;
41  import org.apache.mina.common.IoSession;
42  import org.apache.mina.common.RuntimeIOException;
43  import org.apache.mina.util.IdentityHashSet;
44  
45  /**
46   * A helper which provides addition and removal of {@link IoServiceListener}s and firing
47   * events.
48   *
49   * @author The Apache Directory Project (mina-dev@directory.apache.org)
50   * @version $Rev: 636193 $, $Date: 2008-03-12 13:17:43 +0900 (Wed, 12 Mar 2008) $
51   */
52  public class IoServiceListenerSupport {
53      /**
54       * A list of {@link IoServiceListener}s.
55       */
56      private final List<IoServiceListener> listeners = new CopyOnWriteArrayList<IoServiceListener>();
57  
58      /**
59       * Tracks managed <tt>serviceAddress</tt>es.
60       */
61      private final Set<SocketAddress> managedServiceAddresses = new CopyOnWriteArraySet<SocketAddress>();
62  
63      /**
64       * Tracks managed sesssions with <tt>serviceAddress</tt> as a key.
65       */
66      private final ConcurrentMap<SocketAddress, Set<IoSession>> managedSessions = new ConcurrentHashMap<SocketAddress, Set<IoSession>>();
67  
68      /**
69       * Creates a new instance.
70       */
71      public IoServiceListenerSupport() {
72      }
73  
74      /**
75       * Adds a new listener.
76       */
77      public void add(IoServiceListener listener) {
78          listeners.add(listener);
79      }
80  
81      /**
82       * Removes an existing listener.
83       */
84      public void remove(IoServiceListener listener) {
85          listeners.remove(listener);
86      }
87  
88      public Set<SocketAddress> getManagedServiceAddresses() {
89          return Collections.unmodifiableSet(managedServiceAddresses);
90      }
91  
92      public boolean isManaged(SocketAddress serviceAddress) {
93          return managedServiceAddresses.contains(serviceAddress);
94      }
95  
96      public Set<IoSession> getManagedSessions(SocketAddress serviceAddress) {
97          Set<IoSession> sessions = managedSessions.get(serviceAddress);
98  
99          if (null == sessions) {
100             return Collections.emptySet();
101         }
102 
103         synchronized (sessions) {
104             return new IdentityHashSet<IoSession>(sessions);
105         }
106     }
107 
108     /**
109      * Calls {@link IoServiceListener#serviceActivated(IoService, SocketAddress, IoHandler, IoServiceConfig)}
110      * for all registered listeners.
111      */
112     public void fireServiceActivated(IoService service,
113             SocketAddress serviceAddress, IoHandler handler,
114             IoServiceConfig config) {
115         if (!managedServiceAddresses.add(serviceAddress)) {
116             return;
117         }
118 
119         for (IoServiceListener listener : listeners) {
120             try {
121                 listener.serviceActivated(service, serviceAddress, handler, config);
122             } catch (Throwable e) {
123                 ExceptionMonitor.getInstance().exceptionCaught(e);
124             }
125         }
126     }
127 
128     /**
129      * Calls {@link IoServiceListener#serviceDeactivated(IoService, SocketAddress, IoHandler, IoServiceConfig)}
130      * for all registered listeners.
131      */
132     public synchronized void fireServiceDeactivated(IoService service,
133             SocketAddress serviceAddress, IoHandler handler,
134             IoServiceConfig config) {
135         if (!managedServiceAddresses.remove(serviceAddress)) {
136             return;
137         }
138 
139         try {
140             for (IoServiceListener listener : listeners) {
141                 try {
142                     listener.serviceDeactivated(service, serviceAddress, handler,
143                             config);
144                 } catch (Throwable e) {
145                     ExceptionMonitor.getInstance().exceptionCaught(e);
146                 }
147             }
148         } finally {
149             disconnectSessions(serviceAddress, config);
150         }
151     }
152 
153     /**
154      * Calls {@link IoServiceListener#sessionCreated(IoSession)} for all registered listeners.
155      */
156     public void fireSessionCreated(IoSession session) {
157         SocketAddress serviceAddress = session.getServiceAddress();
158 
159         boolean firstSession;
160         Set<IoSession> s = new IdentityHashSet<IoSession>();
161         synchronized (managedSessions) {
162             // Get the session set.
163             Set<IoSession> sessions = managedSessions.putIfAbsent(serviceAddress,
164                     Collections.synchronizedSet(s));
165     
166             if (null == sessions) {
167                 sessions = s;
168                 firstSession = true;
169             } else {
170                 firstSession = false;
171             }
172     
173             // If already registered, ignore.
174             if (!sessions.add(session)) {
175                 return;
176             }
177         }
178 
179         // If the first connector session, fire a virtual service activation event.
180         if (session.getService() instanceof IoConnector && firstSession) {
181             fireServiceActivated(session.getService(), session
182                     .getServiceAddress(), session.getHandler(), session
183                     .getServiceConfig());
184         }
185 
186         // Fire session events.
187         session.getFilterChain().fireSessionCreated(session);
188         session.getFilterChain().fireSessionOpened(session);
189 
190         // Fire listener events.
191         for (IoServiceListener listener : listeners) {
192             try {
193                 listener.sessionCreated(session);
194             } catch (Throwable e) {
195                 ExceptionMonitor.getInstance().exceptionCaught(e);
196             }
197         }
198     }
199 
200     /**
201      * Calls {@link IoServiceListener#sessionDestroyed(IoSession)} for all registered listeners.
202      */
203     public void fireSessionDestroyed(IoSession session) {
204         SocketAddress serviceAddress = session.getServiceAddress();
205 
206         boolean lastSession = false;
207         synchronized (managedSessions) {
208             // Get the session set.
209             Set<IoSession> sessions = managedSessions.get(serviceAddress);
210             // Ignore if unknown.
211             if (sessions == null) {
212                 return;
213             }
214     
215             sessions.remove(session);
216     
217             // Try to remove the remaining empty session set after removal.
218             if (sessions.isEmpty()) {
219                 lastSession = managedSessions.remove(serviceAddress, sessions);
220             }
221         }
222 
223         // Fire session events.
224         session.getFilterChain().fireSessionClosed(session);
225 
226         // Fire listener events.
227         try {
228             for (IoServiceListener listener : listeners) {
229                 try {
230                     listener.sessionDestroyed(session);
231                 } catch (Throwable e) {
232                     ExceptionMonitor.getInstance().exceptionCaught(e);
233                 }
234             }
235         } finally {
236             // Fire a virtual service deactivation event for the last session of the connector.
237             //TODO double-check that this is *STILL* the last session. May not be the case
238             if (session.getService() instanceof IoConnector && lastSession) {
239                 fireServiceDeactivated(session.getService(), session
240                         .getServiceAddress(), session.getHandler(), session
241                         .getServiceConfig());
242             }
243         }
244     }
245 
246     private void disconnectSessions(SocketAddress serviceAddress,
247             IoServiceConfig config) {
248         if (!(config instanceof IoAcceptorConfig)) {
249             return;
250         }
251 
252         if (!((IoAcceptorConfig) config).isDisconnectOnUnbind()) {
253             return;
254         }
255 
256         Set<IoSession> sessions = getManagedSessions(serviceAddress);
257 
258         if (sessions.isEmpty()) {
259             return;
260         }
261 
262         final CountDownLatch latch = new CountDownLatch(sessions.size());
263 
264         for (IoSession session : sessions) {
265             session.close().addListener(new IoFutureListener() {
266                 public void operationComplete(IoFuture future) {
267                     latch.countDown();
268                 }
269             });
270         }
271 
272         try {
273             latch.await();
274         } catch (InterruptedException e) {
275             throw new RuntimeIOException(e);
276         }
277     }
278 }