1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20 package org.apache.mina.core.service;
21
22 import java.util.AbstractSet;
23 import java.util.Iterator;
24 import java.util.List;
25 import java.util.Map;
26 import java.util.Set;
27 import java.util.concurrent.Executor;
28 import java.util.concurrent.ExecutorService;
29 import java.util.concurrent.Executors;
30 import java.util.concurrent.TimeUnit;
31 import java.util.concurrent.atomic.AtomicInteger;
32
33 import org.apache.mina.core.IoUtil;
34 import org.apache.mina.core.filterchain.DefaultIoFilterChain;
35 import org.apache.mina.core.filterchain.DefaultIoFilterChainBuilder;
36 import org.apache.mina.core.filterchain.IoFilterChainBuilder;
37 import org.apache.mina.core.future.ConnectFuture;
38 import org.apache.mina.core.future.DefaultIoFuture;
39 import org.apache.mina.core.future.IoFuture;
40 import org.apache.mina.core.future.WriteFuture;
41 import org.apache.mina.core.session.AbstractIoSession;
42 import org.apache.mina.core.session.DefaultIoSessionDataStructureFactory;
43 import org.apache.mina.core.session.IdleStatus;
44 import org.apache.mina.core.session.IoSession;
45 import org.apache.mina.core.session.IoSessionConfig;
46 import org.apache.mina.core.session.IoSessionDataStructureFactory;
47 import org.apache.mina.core.session.IoSessionInitializationException;
48 import org.apache.mina.core.session.IoSessionInitializer;
49 import org.apache.mina.util.ExceptionMonitor;
50 import org.apache.mina.util.NamePreservingRunnable;
51 import org.slf4j.Logger;
52 import org.slf4j.LoggerFactory;
53
54
55
56
57
58
59
60
61
62 public abstract class AbstractIoService implements IoService {
63
64 private static final Logger LOGGER = LoggerFactory.getLogger(AbstractIoService.class);
65
66
67
68
69
70 private static final AtomicInteger id = new AtomicInteger();
71
72
73
74
75
76 private final String threadName;
77
78
79
80
81 private final Executor executor;
82
83
84
85
86
87
88
89
90 private final boolean createdExecutor;
91
92
93
94
95 private IoHandler handler;
96
97
98
99
100 protected final IoSessionConfig sessionConfig;
101
102 private final IoServiceListener serviceActivationListener = new IoServiceListener() {
103 public void serviceActivated(IoService service) {
104
105 AbstractIoService s = (AbstractIoService) service;
106 IoServiceStatistics _stats = s.getStatistics();
107 _stats.setLastReadTime(s.getActivationTime());
108 _stats.setLastWriteTime(s.getActivationTime());
109 _stats.setLastThroughputCalculationTime(s.getActivationTime());
110
111 }
112
113 public void serviceDeactivated(IoService service) throws Exception {
114
115 }
116
117 public void serviceIdle(IoService service, IdleStatus idleStatus) throws Exception {
118
119 }
120
121 public void sessionCreated(IoSession session) throws Exception {
122
123 }
124
125 public void sessionClosed(IoSession session) throws Exception {
126
127 }
128
129 public void sessionDestroyed(IoSession session) throws Exception {
130
131 }
132 };
133
134
135
136
137 private IoFilterChainBuilder filterChainBuilder = new DefaultIoFilterChainBuilder();
138
139 private IoSessionDataStructureFactory sessionDataStructureFactory = new DefaultIoSessionDataStructureFactory();
140
141
142
143
144 private final IoServiceListenerSupport listeners;
145
146
147
148
149
150 protected final Object disposalLock = new Object();
151
152 private volatile boolean disposing;
153
154 private volatile boolean disposed;
155
156
157
158
159 private IoServiceStatistics stats = new IoServiceStatistics(this);
160
161
162
163
164
165
166
167
168
169
170
171
172
173 protected AbstractIoService(IoSessionConfig sessionConfig, Executor executor) {
174 if (sessionConfig == null) {
175 throw new IllegalArgumentException("sessionConfig");
176 }
177
178 if (getTransportMetadata() == null) {
179 throw new IllegalArgumentException("TransportMetadata");
180 }
181
182 if (!getTransportMetadata().getSessionConfigType().isAssignableFrom(sessionConfig.getClass())) {
183 throw new IllegalArgumentException("sessionConfig type: " + sessionConfig.getClass() + " (expected: "
184 + getTransportMetadata().getSessionConfigType() + ")");
185 }
186
187
188
189 listeners = new IoServiceListenerSupport(this);
190 listeners.add(serviceActivationListener);
191
192
193 this.sessionConfig = sessionConfig;
194
195
196
197 ExceptionMonitor.getInstance();
198
199 if (executor == null) {
200 this.executor = Executors.newCachedThreadPool();
201 createdExecutor = true;
202 } else {
203 this.executor = executor;
204 createdExecutor = false;
205 }
206
207 threadName = getClass().getSimpleName() + '-' + id.incrementAndGet();
208 }
209
210
211
212
213 public final IoFilterChainBuilder getFilterChainBuilder() {
214 return filterChainBuilder;
215 }
216
217
218
219
220 public final void setFilterChainBuilder(IoFilterChainBuilder builder) {
221 if (builder == null) {
222 builder = new DefaultIoFilterChainBuilder();
223 }
224 filterChainBuilder = builder;
225 }
226
227
228
229
230 public final DefaultIoFilterChainBuilder getFilterChain() {
231 if (filterChainBuilder instanceof DefaultIoFilterChainBuilder) {
232 return (DefaultIoFilterChainBuilder) filterChainBuilder;
233 }
234
235 throw new IllegalStateException("Current filter chain builder is not a DefaultIoFilterChainBuilder.");
236 }
237
238
239
240
241 public final void addListener(IoServiceListener listener) {
242 listeners.add(listener);
243 }
244
245
246
247
248 public final void removeListener(IoServiceListener listener) {
249 listeners.remove(listener);
250 }
251
252
253
254
255 public final boolean isActive() {
256 return listeners.isActive();
257 }
258
259
260
261
262 public final boolean isDisposing() {
263 return disposing;
264 }
265
266
267
268
269 public final boolean isDisposed() {
270 return disposed;
271 }
272
273
274
275
276 public final void dispose() {
277 dispose(false);
278 }
279
280
281
282
283 public final void dispose(boolean awaitTermination) {
284 if (disposed) {
285 return;
286 }
287
288 synchronized (disposalLock) {
289 if (!disposing) {
290 disposing = true;
291
292 try {
293 dispose0();
294 } catch (Exception e) {
295 ExceptionMonitor.getInstance().exceptionCaught(e);
296 }
297 }
298 }
299
300 if (createdExecutor) {
301 ExecutorService e = (ExecutorService) executor;
302 e.shutdownNow();
303 if (awaitTermination) {
304
305 try {
306 LOGGER.debug("awaitTermination on {} called by thread=[{}]", this, Thread.currentThread().getName());
307 e.awaitTermination(Integer.MAX_VALUE, TimeUnit.SECONDS);
308 LOGGER.debug("awaitTermination on {} finished", this);
309 } catch (InterruptedException e1) {
310 LOGGER.warn("awaitTermination on [{}] was interrupted", this);
311
312 Thread.currentThread().interrupt();
313 }
314 }
315 }
316 disposed = true;
317 }
318
319
320
321
322
323
324
325 protected abstract void dispose0() throws Exception;
326
327
328
329
330 public final Map<Long, IoSession> getManagedSessions() {
331 return listeners.getManagedSessions();
332 }
333
334
335
336
337 public final int getManagedSessionCount() {
338 return listeners.getManagedSessionCount();
339 }
340
341
342
343
344 public final IoHandler getHandler() {
345 return handler;
346 }
347
348
349
350
351 public final void setHandler(IoHandler handler) {
352 if (handler == null) {
353 throw new IllegalArgumentException("handler cannot be null");
354 }
355
356 if (isActive()) {
357 throw new IllegalStateException("handler cannot be set while the service is active.");
358 }
359
360 this.handler = handler;
361 }
362
363
364
365
366 public final IoSessionDataStructureFactory getSessionDataStructureFactory() {
367 return sessionDataStructureFactory;
368 }
369
370
371
372
373 public final void setSessionDataStructureFactory(IoSessionDataStructureFactory sessionDataStructureFactory) {
374 if (sessionDataStructureFactory == null) {
375 throw new IllegalArgumentException("sessionDataStructureFactory");
376 }
377
378 if (isActive()) {
379 throw new IllegalStateException("sessionDataStructureFactory cannot be set while the service is active.");
380 }
381
382 this.sessionDataStructureFactory = sessionDataStructureFactory;
383 }
384
385
386
387
388 public IoServiceStatistics getStatistics() {
389 return stats;
390 }
391
392
393
394
395 public final long getActivationTime() {
396 return listeners.getActivationTime();
397 }
398
399
400
401
402 public final Set<WriteFuture> broadcast(Object message) {
403
404
405
406 final List<WriteFuture> futures = IoUtil.broadcast(message, getManagedSessions().values());
407 return new AbstractSet<WriteFuture>() {
408 @Override
409 public Iterator<WriteFuture> iterator() {
410 return futures.iterator();
411 }
412
413 @Override
414 public int size() {
415 return futures.size();
416 }
417 };
418 }
419
420 public final IoServiceListenerSupport getListeners() {
421 return listeners;
422 }
423
424 protected final void executeWorker(Runnable worker) {
425 executeWorker(worker, null);
426 }
427
428 protected final void executeWorker(Runnable worker, String suffix) {
429 String actualThreadName = threadName;
430 if (suffix != null) {
431 actualThreadName = actualThreadName + '-' + suffix;
432 }
433 executor.execute(new NamePreservingRunnable(worker, actualThreadName));
434 }
435
436
437 @SuppressWarnings("unchecked")
438 protected final void initSession(IoSession session, IoFuture future, IoSessionInitializer sessionInitializer) {
439
440 if (stats.getLastReadTime() == 0) {
441 stats.setLastReadTime(getActivationTime());
442 }
443
444 if (stats.getLastWriteTime() == 0) {
445 stats.setLastWriteTime(getActivationTime());
446 }
447
448
449
450
451
452 try {
453 ((AbstractIoSession) session).setAttributeMap(session.getService().getSessionDataStructureFactory()
454 .getAttributeMap(session));
455 } catch (IoSessionInitializationException e) {
456 throw e;
457 } catch (Exception e) {
458 throw new IoSessionInitializationException("Failed to initialize an attributeMap.", e);
459 }
460
461 try {
462 ((AbstractIoSession) session).setWriteRequestQueue(session.getService().getSessionDataStructureFactory()
463 .getWriteRequestQueue(session));
464 } catch (IoSessionInitializationException e) {
465 throw e;
466 } catch (Exception e) {
467 throw new IoSessionInitializationException("Failed to initialize a writeRequestQueue.", e);
468 }
469
470 if ((future != null) && (future instanceof ConnectFuture)) {
471
472 session.setAttribute(DefaultIoFilterChain.SESSION_CREATED_FUTURE, future);
473 }
474
475 if (sessionInitializer != null) {
476 sessionInitializer.initializeSession(session, future);
477 }
478
479 finishSessionInitialization0(session, future);
480 }
481
482
483
484
485
486
487
488
489
490
491
492 protected void finishSessionInitialization0(IoSession session, IoFuture future) {
493
494 }
495
496
497
498
499
500
501 protected static class ServiceOperationFuture extends DefaultIoFuture {
502 public ServiceOperationFuture() {
503 super(null);
504 }
505
506 public final boolean isDone() {
507 return getValue() == Boolean.TRUE;
508 }
509
510 public final void setDone() {
511 setValue(Boolean.TRUE);
512 }
513
514 public final Exception getException() {
515 if (getValue() instanceof Exception) {
516 return (Exception) getValue();
517 }
518
519 return null;
520 }
521
522 public final void setException(Exception exception) {
523 if (exception == null) {
524 throw new IllegalArgumentException("exception");
525 }
526
527 setValue(exception);
528 }
529 }
530
531
532
533
534 public int getScheduledWriteBytes() {
535 return stats.getScheduledWriteBytes();
536 }
537
538
539
540
541 public int getScheduledWriteMessages() {
542 return stats.getScheduledWriteMessages();
543 }
544
545 }