1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20 package org.apache.mina.core.service;
21
22 import java.util.Collections;
23 import java.util.List;
24 import java.util.Map;
25 import java.util.concurrent.ConcurrentHashMap;
26 import java.util.concurrent.ConcurrentMap;
27 import java.util.concurrent.CopyOnWriteArrayList;
28 import java.util.concurrent.atomic.AtomicBoolean;
29 import java.util.concurrent.atomic.AtomicLong;
30
31 import org.apache.mina.core.filterchain.IoFilterChain;
32 import org.apache.mina.core.future.IoFuture;
33 import org.apache.mina.core.future.IoFutureListener;
34 import org.apache.mina.core.session.IoSession;
35 import org.apache.mina.util.ExceptionMonitor;
36
37
38
39
40
41
42
43 public class IoServiceListenerSupport {
44
45 private final IoService service;
46
47
48 private final List<IoServiceListener> listeners = new CopyOnWriteArrayList<IoServiceListener>();
49
50
51 private final ConcurrentMap<Long, IoSession> managedSessions = new ConcurrentHashMap<Long, IoSession>();
52
53
54 private final Map<Long, IoSession> readOnlyManagedSessions = Collections.unmodifiableMap(managedSessions);
55
56 private final AtomicBoolean activated = new AtomicBoolean();
57
58
59 private volatile long activationTime;
60
61
62 private volatile int largestManagedSessionCount = 0;
63
64
65 private AtomicLong cumulativeManagedSessionCount = new AtomicLong(0);
66
67
68
69
70
71
72 public IoServiceListenerSupport(IoService service) {
73 if (service == null) {
74 throw new IllegalArgumentException("service");
75 }
76
77 this.service = service;
78 }
79
80
81
82
83
84
85 public void add(IoServiceListener listener) {
86 if (listener != null) {
87 listeners.add(listener);
88 }
89 }
90
91
92
93
94
95
96 public void remove(IoServiceListener listener) {
97 if (listener != null) {
98 listeners.remove(listener);
99 }
100 }
101
102
103
104
105 public long getActivationTime() {
106 return activationTime;
107 }
108
109 public Map<Long, IoSession> getManagedSessions() {
110 return readOnlyManagedSessions;
111 }
112
113 public int getManagedSessionCount() {
114 return managedSessions.size();
115 }
116
117
118
119
120
121 public int getLargestManagedSessionCount() {
122 return largestManagedSessionCount;
123 }
124
125
126
127
128
129 public long getCumulativeManagedSessionCount() {
130 return cumulativeManagedSessionCount.get();
131 }
132
133
134
135
136 public boolean isActive() {
137 return activated.get();
138 }
139
140
141
142
143
144 public void fireServiceActivated() {
145 if (!activated.compareAndSet(false, true)) {
146
147 return;
148 }
149
150 activationTime = System.currentTimeMillis();
151
152
153 for (IoServiceListener listener : listeners) {
154 try {
155 listener.serviceActivated(service);
156 } catch (Exception e) {
157 ExceptionMonitor.getInstance().exceptionCaught(e);
158 }
159 }
160 }
161
162
163
164
165
166 public void fireServiceDeactivated() {
167 if (!activated.compareAndSet(true, false)) {
168
169 return;
170 }
171
172
173 try {
174 for (IoServiceListener listener : listeners) {
175 try {
176 listener.serviceDeactivated(service);
177 } catch (Exception e) {
178 ExceptionMonitor.getInstance().exceptionCaught(e);
179 }
180 }
181 } finally {
182 disconnectSessions();
183 }
184 }
185
186
187
188
189
190
191 public void fireSessionCreated(IoSession session) {
192 boolean firstSession = false;
193
194 if (session.getService() instanceof IoConnector) {
195 synchronized (managedSessions) {
196 firstSession = managedSessions.isEmpty();
197 }
198 }
199
200
201 if (managedSessions.putIfAbsent(session.getId(), session) != null) {
202 return;
203 }
204
205
206 if (firstSession) {
207 fireServiceActivated();
208 }
209
210
211 IoFilterChain filterChain = session.getFilterChain();
212 filterChain.fireSessionCreated();
213 filterChain.fireSessionOpened();
214
215 int managedSessionCount = managedSessions.size();
216
217 if (managedSessionCount > largestManagedSessionCount) {
218 largestManagedSessionCount = managedSessionCount;
219 }
220
221 cumulativeManagedSessionCount.incrementAndGet();
222
223
224 for (IoServiceListener l : listeners) {
225 try {
226 l.sessionCreated(session);
227 } catch (Exception e) {
228 ExceptionMonitor.getInstance().exceptionCaught(e);
229 }
230 }
231 }
232
233
234
235
236
237
238 public void fireSessionDestroyed(IoSession session) {
239
240 if (managedSessions.remove(session.getId()) == null) {
241 return;
242 }
243
244
245 session.getFilterChain().fireSessionClosed();
246
247
248 try {
249 for (IoServiceListener l : listeners) {
250 try {
251 l.sessionDestroyed(session);
252 } catch (Exception e) {
253 ExceptionMonitor.getInstance().exceptionCaught(e);
254 }
255 }
256 } finally {
257
258 if (session.getService() instanceof IoConnector) {
259 boolean lastSession = false;
260
261 synchronized (managedSessions) {
262 lastSession = managedSessions.isEmpty();
263 }
264
265 if (lastSession) {
266 fireServiceDeactivated();
267 }
268 }
269 }
270 }
271
272
273
274
275
276
277 private void disconnectSessions() {
278 if (!(service instanceof IoAcceptor)) {
279
280 return;
281 }
282
283 if (!((IoAcceptor) service).isCloseOnDeactivation()) {
284 return;
285 }
286
287 Object lock = new Object();
288 IoFutureListener<IoFuture> listener = new LockNotifyingListener(lock);
289
290 for (IoSession s : managedSessions.values()) {
291 s.close(true).addListener(listener);
292 }
293
294 try {
295 synchronized (lock) {
296 while (!managedSessions.isEmpty()) {
297 lock.wait(500);
298 }
299 }
300 } catch (InterruptedException ie) {
301
302 }
303 }
304
305
306
307
308 private static class LockNotifyingListener implements IoFutureListener<IoFuture> {
309 private final Object lock;
310
311 public LockNotifyingListener(Object lock) {
312 this.lock = lock;
313 }
314
315 public void operationComplete(IoFuture future) {
316 synchronized (lock) {
317 lock.notifyAll();
318 }
319 }
320 }
321 }