1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20 package org.apache.mina.common.support;
21
22 import java.net.SocketAddress;
23 import java.util.Collections;
24 import java.util.List;
25 import java.util.Set;
26 import java.util.concurrent.ConcurrentHashMap;
27 import java.util.concurrent.ConcurrentMap;
28 import java.util.concurrent.CopyOnWriteArrayList;
29 import java.util.concurrent.CopyOnWriteArraySet;
30 import java.util.concurrent.CountDownLatch;
31
32 import org.apache.mina.common.ExceptionMonitor;
33 import org.apache.mina.common.IoAcceptorConfig;
34 import org.apache.mina.common.IoConnector;
35 import org.apache.mina.common.IoFuture;
36 import org.apache.mina.common.IoFutureListener;
37 import org.apache.mina.common.IoHandler;
38 import org.apache.mina.common.IoService;
39 import org.apache.mina.common.IoServiceConfig;
40 import org.apache.mina.common.IoServiceListener;
41 import org.apache.mina.common.IoSession;
42 import org.apache.mina.common.RuntimeIOException;
43 import org.apache.mina.util.IdentityHashSet;
44
45
46
47
48
49
50
51
52 public class IoServiceListenerSupport {
53
54
55
56 private final List<IoServiceListener> listeners = new CopyOnWriteArrayList<IoServiceListener>();
57
58
59
60
61 private final Set<SocketAddress> managedServiceAddresses = new CopyOnWriteArraySet<SocketAddress>();
62
63
64
65
66 private final ConcurrentMap<SocketAddress, Set<IoSession>> managedSessions = new ConcurrentHashMap<SocketAddress, Set<IoSession>>();
67
68
69
70
71 public IoServiceListenerSupport() {
72 }
73
74
75
76
77 public void add(IoServiceListener listener) {
78 listeners.add(listener);
79 }
80
81
82
83
84 public void remove(IoServiceListener listener) {
85 listeners.remove(listener);
86 }
87
88 public Set<SocketAddress> getManagedServiceAddresses() {
89 return Collections.unmodifiableSet(managedServiceAddresses);
90 }
91
92 public boolean isManaged(SocketAddress serviceAddress) {
93 return managedServiceAddresses.contains(serviceAddress);
94 }
95
96 public Set<IoSession> getManagedSessions(SocketAddress serviceAddress) {
97 Set<IoSession> sessions = managedSessions.get(serviceAddress);
98
99 if (null == sessions) {
100 return Collections.emptySet();
101 }
102
103 synchronized (sessions) {
104 return new IdentityHashSet<IoSession>(sessions);
105 }
106 }
107
108
109
110
111
112 public void fireServiceActivated(IoService service,
113 SocketAddress serviceAddress, IoHandler handler,
114 IoServiceConfig config) {
115 if (!managedServiceAddresses.add(serviceAddress)) {
116 return;
117 }
118
119 for (IoServiceListener listener : listeners) {
120 try {
121 listener.serviceActivated(service, serviceAddress, handler, config);
122 } catch (Throwable e) {
123 ExceptionMonitor.getInstance().exceptionCaught(e);
124 }
125 }
126 }
127
128
129
130
131
132 public synchronized void fireServiceDeactivated(IoService service,
133 SocketAddress serviceAddress, IoHandler handler,
134 IoServiceConfig config) {
135 if (!managedServiceAddresses.remove(serviceAddress)) {
136 return;
137 }
138
139 try {
140 for (IoServiceListener listener : listeners) {
141 try {
142 listener.serviceDeactivated(service, serviceAddress, handler,
143 config);
144 } catch (Throwable e) {
145 ExceptionMonitor.getInstance().exceptionCaught(e);
146 }
147 }
148 } finally {
149 disconnectSessions(serviceAddress, config);
150 }
151 }
152
153
154
155
156 public void fireSessionCreated(IoSession session) {
157 SocketAddress serviceAddress = session.getServiceAddress();
158
159 boolean firstSession;
160 Set<IoSession> s = new IdentityHashSet<IoSession>();
161 synchronized (managedSessions) {
162
163 Set<IoSession> sessions = managedSessions.putIfAbsent(serviceAddress,
164 Collections.synchronizedSet(s));
165
166 if (null == sessions) {
167 sessions = s;
168 firstSession = true;
169 } else {
170 firstSession = false;
171 }
172
173
174 if (!sessions.add(session)) {
175 return;
176 }
177 }
178
179
180 if (session.getService() instanceof IoConnector && firstSession) {
181 fireServiceActivated(session.getService(), session
182 .getServiceAddress(), session.getHandler(), session
183 .getServiceConfig());
184 }
185
186
187 session.getFilterChain().fireSessionCreated(session);
188 session.getFilterChain().fireSessionOpened(session);
189
190
191 for (IoServiceListener listener : listeners) {
192 try {
193 listener.sessionCreated(session);
194 } catch (Throwable e) {
195 ExceptionMonitor.getInstance().exceptionCaught(e);
196 }
197 }
198 }
199
200
201
202
203 public void fireSessionDestroyed(IoSession session) {
204 SocketAddress serviceAddress = session.getServiceAddress();
205
206 boolean lastSession = false;
207 synchronized (managedSessions) {
208
209 Set<IoSession> sessions = managedSessions.get(serviceAddress);
210
211 if (sessions == null) {
212 return;
213 }
214
215 sessions.remove(session);
216
217
218 if (sessions.isEmpty()) {
219 lastSession = managedSessions.remove(serviceAddress, sessions);
220 }
221 }
222
223
224 session.getFilterChain().fireSessionClosed(session);
225
226
227 try {
228 for (IoServiceListener listener : listeners) {
229 try {
230 listener.sessionDestroyed(session);
231 } catch (Throwable e) {
232 ExceptionMonitor.getInstance().exceptionCaught(e);
233 }
234 }
235 } finally {
236
237
238 if (session.getService() instanceof IoConnector && lastSession) {
239 fireServiceDeactivated(session.getService(), session
240 .getServiceAddress(), session.getHandler(), session
241 .getServiceConfig());
242 }
243 }
244 }
245
246 private void disconnectSessions(SocketAddress serviceAddress,
247 IoServiceConfig config) {
248 if (!(config instanceof IoAcceptorConfig)) {
249 return;
250 }
251
252 if (!((IoAcceptorConfig) config).isDisconnectOnUnbind()) {
253 return;
254 }
255
256 Set<IoSession> sessions = getManagedSessions(serviceAddress);
257
258 if (sessions.isEmpty()) {
259 return;
260 }
261
262 final CountDownLatch latch = new CountDownLatch(sessions.size());
263
264 for (IoSession session : sessions) {
265 session.close().addListener(new IoFutureListener() {
266 public void operationComplete(IoFuture future) {
267 latch.countDown();
268 }
269 });
270 }
271
272 try {
273 latch.await();
274 } catch (InterruptedException e) {
275 throw new RuntimeIOException(e);
276 }
277 }
278 }