查看本类的 API文档回源码主页即时通讯网 - 即时通讯开发者社区!
1   /*
2    *  Licensed to the Apache Software Foundation (ASF) under one
3    *  or more contributor license agreements.  See the NOTICE file
4    *  distributed with this work for additional information
5    *  regarding copyright ownership.  The ASF licenses this file
6    *  to you under the Apache License, Version 2.0 (the
7    *  "License"); you may not use this file except in compliance
8    *  with the License.  You may obtain a copy of the License at
9    *
10   *    http://www.apache.org/licenses/LICENSE-2.0
11   *
12   *  Unless required by applicable law or agreed to in writing,
13   *  software distributed under the License is distributed on an
14   *  "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15   *  KIND, either express or implied.  See the License for the
16   *  specific language governing permissions and limitations
17   *  under the License.
18   *
19   */
20  package org.apache.mina.core.service;
21  
22  import java.util.AbstractSet;
23  import java.util.Iterator;
24  import java.util.List;
25  import java.util.Map;
26  import java.util.Set;
27  import java.util.concurrent.Executor;
28  import java.util.concurrent.ExecutorService;
29  import java.util.concurrent.Executors;
30  import java.util.concurrent.TimeUnit;
31  import java.util.concurrent.atomic.AtomicInteger;
32  
33  import org.apache.mina.core.IoUtil;
34  import org.apache.mina.core.filterchain.DefaultIoFilterChain;
35  import org.apache.mina.core.filterchain.DefaultIoFilterChainBuilder;
36  import org.apache.mina.core.filterchain.IoFilterChainBuilder;
37  import org.apache.mina.core.future.ConnectFuture;
38  import org.apache.mina.core.future.DefaultIoFuture;
39  import org.apache.mina.core.future.IoFuture;
40  import org.apache.mina.core.future.WriteFuture;
41  import org.apache.mina.core.session.AbstractIoSession;
42  import org.apache.mina.core.session.DefaultIoSessionDataStructureFactory;
43  import org.apache.mina.core.session.IdleStatus;
44  import org.apache.mina.core.session.IoSession;
45  import org.apache.mina.core.session.IoSessionConfig;
46  import org.apache.mina.core.session.IoSessionDataStructureFactory;
47  import org.apache.mina.core.session.IoSessionInitializationException;
48  import org.apache.mina.core.session.IoSessionInitializer;
49  import org.apache.mina.util.ExceptionMonitor;
50  import org.apache.mina.util.NamePreservingRunnable;
51  import org.slf4j.Logger;
52  import org.slf4j.LoggerFactory;
53  
54  /**
55   * Base implementation of {@link IoService}s.
56   * 
57   * An instance of IoService contains an Executor which will handle the incoming
58   * events.
59   *
60   * @author <a href="http://mina.apache.org">Apache MINA Project</a>
61   */
62  public abstract class AbstractIoService implements IoService {
63  
64      private static final Logger LOGGER = LoggerFactory.getLogger(AbstractIoService.class);
65  
66      /**
67       * The unique number identifying the Service. It's incremented
68       * for each new IoService created.
69       */
70      private static final AtomicInteger id = new AtomicInteger();
71  
72      /**
73       * The thread name built from the IoService inherited
74       * instance class name and the IoService Id
75       **/
76      private final String threadName;
77  
78      /**
79       * The associated executor, responsible for handling execution of I/O events.
80       */
81      private final Executor executor;
82  
83      /**
84       * A flag used to indicate that the local executor has been created
85       * inside this instance, and not passed by a caller.
86       * 
87       * If the executor is locally created, then it will be an instance
88       * of the ThreadPoolExecutor class.
89       */
90      private final boolean createdExecutor;
91  
92      /**
93       * The IoHandler in charge of managing all the I/O Events. It is
94       */
95      private IoHandler handler;
96  
97      /**
98       * The default {@link IoSessionConfig} which will be used to configure new sessions.
99       */
100     protected final IoSessionConfig sessionConfig;
101 
102     private final IoServiceListener serviceActivationListener = new IoServiceListener() {
103         public void serviceActivated(IoService service) {
104             // Update lastIoTime.
105             AbstractIoService s = (AbstractIoService) service;
106             IoServiceStatistics _stats = s.getStatistics();
107             _stats.setLastReadTime(s.getActivationTime());
108             _stats.setLastWriteTime(s.getActivationTime());
109             _stats.setLastThroughputCalculationTime(s.getActivationTime());
110 
111         }
112 
113         public void serviceDeactivated(IoService service) throws Exception {
114             // Empty handler
115         }
116 
117         public void serviceIdle(IoService service, IdleStatus idleStatus) throws Exception {
118             // Empty handler
119         }
120 
121         public void sessionCreated(IoSession session) throws Exception {
122             // Empty handler
123         }
124 
125         public void sessionClosed(IoSession session) throws Exception {
126             // Empty handler
127         }
128 
129         public void sessionDestroyed(IoSession session) throws Exception {
130             // Empty handler
131         }
132     };
133 
134     /**
135      * Current filter chain builder.
136      */
137     private IoFilterChainBuilder filterChainBuilder = new DefaultIoFilterChainBuilder();
138 
139     private IoSessionDataStructureFactory sessionDataStructureFactory = new DefaultIoSessionDataStructureFactory();
140 
141     /**
142      * Maintains the {@link IoServiceListener}s of this service.
143      */
144     private final IoServiceListenerSupport listeners;
145 
146     /**
147      * A lock object which must be acquired when related resources are
148      * destroyed.
149      */
150     protected final Object disposalLock = new Object();
151 
152     private volatile boolean disposing;
153 
154     private volatile boolean disposed;
155 
156     /**
157      * {@inheritDoc}
158      */
159     private IoServiceStatistics stats = new IoServiceStatistics(this);
160 
161     /**
162      * Constructor for {@link AbstractIoService}. You need to provide a default
163      * session configuration and an {@link Executor} for handling I/O events. If
164      * a null {@link Executor} is provided, a default one will be created using
165      * {@link Executors#newCachedThreadPool()}.
166      * 
167      * @param sessionConfig
168      *            the default configuration for the managed {@link IoSession}
169      * @param executor
170      *            the {@link Executor} used for handling execution of I/O
171      *            events. Can be <code>null</code>.
172      */
173     protected AbstractIoService(IoSessionConfig sessionConfig, Executor executor) {
174         if (sessionConfig == null) {
175             throw new IllegalArgumentException("sessionConfig");
176         }
177 
178         if (getTransportMetadata() == null) {
179             throw new IllegalArgumentException("TransportMetadata");
180         }
181 
182         if (!getTransportMetadata().getSessionConfigType().isAssignableFrom(sessionConfig.getClass())) {
183             throw new IllegalArgumentException("sessionConfig type: " + sessionConfig.getClass() + " (expected: "
184                     + getTransportMetadata().getSessionConfigType() + ")");
185         }
186 
187         // Create the listeners, and add a first listener : a activation listener
188         // for this service, which will give information on the service state.
189         listeners = new IoServiceListenerSupport(this);
190         listeners.add(serviceActivationListener);
191 
192         // Stores the given session configuration
193         this.sessionConfig = sessionConfig;
194 
195         // Make JVM load the exception monitor before some transports
196         // change the thread context class loader.
197         ExceptionMonitor.getInstance();
198 
199         if (executor == null) {
200             this.executor = Executors.newCachedThreadPool();
201             createdExecutor = true;
202         } else {
203             this.executor = executor;
204             createdExecutor = false;
205         }
206 
207         threadName = getClass().getSimpleName() + '-' + id.incrementAndGet();
208     }
209 
210     /**
211      * {@inheritDoc}
212      */
213     public final IoFilterChainBuilder getFilterChainBuilder() {
214         return filterChainBuilder;
215     }
216 
217     /**
218      * {@inheritDoc}
219      */
220     public final void setFilterChainBuilder(IoFilterChainBuilder builder) {
221         if (builder == null) {
222             builder = new DefaultIoFilterChainBuilder();
223         }
224         filterChainBuilder = builder;
225     }
226 
227     /**
228      * {@inheritDoc}
229      */
230     public final DefaultIoFilterChainBuilder getFilterChain() {
231         if (filterChainBuilder instanceof DefaultIoFilterChainBuilder) {
232             return (DefaultIoFilterChainBuilder) filterChainBuilder;
233         }
234 
235         throw new IllegalStateException("Current filter chain builder is not a DefaultIoFilterChainBuilder.");
236     }
237 
238     /**
239      * {@inheritDoc}
240      */
241     public final void addListener(IoServiceListener listener) {
242         listeners.add(listener);
243     }
244 
245     /**
246      * {@inheritDoc}
247      */
248     public final void removeListener(IoServiceListener listener) {
249         listeners.remove(listener);
250     }
251 
252     /**
253      * {@inheritDoc}
254      */
255     public final boolean isActive() {
256         return listeners.isActive();
257     }
258 
259     /**
260      * {@inheritDoc}
261      */
262     public final boolean isDisposing() {
263         return disposing;
264     }
265 
266     /**
267      * {@inheritDoc}
268      */
269     public final boolean isDisposed() {
270         return disposed;
271     }
272 
273     /**
274      * {@inheritDoc}
275      */
276     public final void dispose() {
277         dispose(false);
278     }
279 
280     /**
281      * {@inheritDoc}
282      */
283     public final void dispose(boolean awaitTermination) {
284         if (disposed) {
285             return;
286         }
287 
288         synchronized (disposalLock) {
289             if (!disposing) {
290                 disposing = true;
291 
292                 try {
293                     dispose0();
294                 } catch (Exception e) {
295                     ExceptionMonitor.getInstance().exceptionCaught(e);
296                 }
297             }
298         }
299 
300         if (createdExecutor) {
301             ExecutorService e = (ExecutorService) executor;
302             e.shutdownNow();
303             if (awaitTermination) {
304 
305                 try {
306                     LOGGER.debug("awaitTermination on {} called by thread=[{}]", this, Thread.currentThread().getName());
307                     e.awaitTermination(Integer.MAX_VALUE, TimeUnit.SECONDS);
308                     LOGGER.debug("awaitTermination on {} finished", this);
309                 } catch (InterruptedException e1) {
310                     LOGGER.warn("awaitTermination on [{}] was interrupted", this);
311                     // Restore the interrupted status
312                     Thread.currentThread().interrupt();
313                 }
314             }
315         }
316         disposed = true;
317     }
318 
319     /**
320      * Implement this method to release any acquired resources.  This method
321      * is invoked only once by {@link #dispose()}.
322      * 
323      * @throws Exception If the dispose failed
324      */
325     protected abstract void dispose0() throws Exception;
326 
327     /**
328      * {@inheritDoc}
329      */
330     public final Map<Long, IoSession> getManagedSessions() {
331         return listeners.getManagedSessions();
332     }
333 
334     /**
335      * {@inheritDoc}
336      */
337     public final int getManagedSessionCount() {
338         return listeners.getManagedSessionCount();
339     }
340 
341     /**
342      * {@inheritDoc}
343      */
344     public final IoHandler getHandler() {
345         return handler;
346     }
347 
348     /**
349      * {@inheritDoc}
350      */
351     public final void setHandler(IoHandler handler) {
352         if (handler == null) {
353             throw new IllegalArgumentException("handler cannot be null");
354         }
355 
356         if (isActive()) {
357             throw new IllegalStateException("handler cannot be set while the service is active.");
358         }
359 
360         this.handler = handler;
361     }
362 
363     /**
364      * {@inheritDoc}
365      */
366     public final IoSessionDataStructureFactory getSessionDataStructureFactory() {
367         return sessionDataStructureFactory;
368     }
369 
370     /**
371      * {@inheritDoc}
372      */
373     public final void setSessionDataStructureFactory(IoSessionDataStructureFactory sessionDataStructureFactory) {
374         if (sessionDataStructureFactory == null) {
375             throw new IllegalArgumentException("sessionDataStructureFactory");
376         }
377 
378         if (isActive()) {
379             throw new IllegalStateException("sessionDataStructureFactory cannot be set while the service is active.");
380         }
381 
382         this.sessionDataStructureFactory = sessionDataStructureFactory;
383     }
384 
385     /**
386      * {@inheritDoc}
387      */
388     public IoServiceStatistics getStatistics() {
389         return stats;
390     }
391 
392     /**
393      * {@inheritDoc}
394      */
395     public final long getActivationTime() {
396         return listeners.getActivationTime();
397     }
398 
399     /**
400      * {@inheritDoc}
401      */
402     public final Set<WriteFuture> broadcast(Object message) {
403         // Convert to Set.  We do not return a List here because only the
404         // direct caller of MessageBroadcaster knows the order of write
405         // operations.
406         final List<WriteFuture> futures = IoUtil.broadcast(message, getManagedSessions().values());
407         return new AbstractSet<WriteFuture>() {
408             @Override
409             public Iterator<WriteFuture> iterator() {
410                 return futures.iterator();
411             }
412 
413             @Override
414             public int size() {
415                 return futures.size();
416             }
417         };
418     }
419 
420     public final IoServiceListenerSupport getListeners() {
421         return listeners;
422     }
423 
424     protected final void executeWorker(Runnable worker) {
425         executeWorker(worker, null);
426     }
427 
428     protected final void executeWorker(Runnable worker, String suffix) {
429         String actualThreadName = threadName;
430         if (suffix != null) {
431             actualThreadName = actualThreadName + '-' + suffix;
432         }
433         executor.execute(new NamePreservingRunnable(worker, actualThreadName));
434     }
435 
436     // TODO Figure out make it work without causing a compiler error / warning.
437     @SuppressWarnings("unchecked")
438     protected final void initSession(IoSession session, IoFuture future, IoSessionInitializer sessionInitializer) {
439         // Update lastIoTime if needed.
440         if (stats.getLastReadTime() == 0) {
441             stats.setLastReadTime(getActivationTime());
442         }
443 
444         if (stats.getLastWriteTime() == 0) {
445             stats.setLastWriteTime(getActivationTime());
446         }
447 
448         // Every property but attributeMap should be set now.
449         // Now initialize the attributeMap.  The reason why we initialize
450         // the attributeMap at last is to make sure all session properties
451         // such as remoteAddress are provided to IoSessionDataStructureFactory.
452         try {
453             ((AbstractIoSession) session).setAttributeMap(session.getService().getSessionDataStructureFactory()
454                     .getAttributeMap(session));
455         } catch (IoSessionInitializationException e) {
456             throw e;
457         } catch (Exception e) {
458             throw new IoSessionInitializationException("Failed to initialize an attributeMap.", e);
459         }
460 
461         try {
462             ((AbstractIoSession) session).setWriteRequestQueue(session.getService().getSessionDataStructureFactory()
463                     .getWriteRequestQueue(session));
464         } catch (IoSessionInitializationException e) {
465             throw e;
466         } catch (Exception e) {
467             throw new IoSessionInitializationException("Failed to initialize a writeRequestQueue.", e);
468         }
469 
470         if ((future != null) && (future instanceof ConnectFuture)) {
471             // DefaultIoFilterChain will notify the future. (We support ConnectFuture only for now).
472             session.setAttribute(DefaultIoFilterChain.SESSION_CREATED_FUTURE, future);
473         }
474 
475         if (sessionInitializer != null) {
476             sessionInitializer.initializeSession(session, future);
477         }
478 
479         finishSessionInitialization0(session, future);
480     }
481 
482     /**
483      * Implement this method to perform additional tasks required for session
484      * initialization. Do not call this method directly;
485      * {@link #initSession(IoSession, IoFuture, IoSessionInitializer)} will call
486      * this method instead.
487      * 
488      * @param session The session to initialize
489      * @param future The Future to use
490      * 
491      */
492     protected void finishSessionInitialization0(IoSession session, IoFuture future) {
493         // Do nothing. Extended class might add some specific code
494     }
495 
496     /**
497      * A specific class used to 
498      * @author elecharny
499      *
500      */
501     protected static class ServiceOperationFuture extends DefaultIoFuture {
502         public ServiceOperationFuture() {
503             super(null);
504         }
505 
506         public final boolean isDone() {
507             return getValue() == Boolean.TRUE;
508         }
509 
510         public final void setDone() {
511             setValue(Boolean.TRUE);
512         }
513 
514         public final Exception getException() {
515             if (getValue() instanceof Exception) {
516                 return (Exception) getValue();
517             }
518 
519             return null;
520         }
521 
522         public final void setException(Exception exception) {
523             if (exception == null) {
524                 throw new IllegalArgumentException("exception");
525             }
526             
527             setValue(exception);
528         }
529     }
530 
531     /**
532      * {@inheritDoc}
533      */
534     public int getScheduledWriteBytes() {
535         return stats.getScheduledWriteBytes();
536     }
537 
538     /**
539      * {@inheritDoc}
540      */
541     public int getScheduledWriteMessages() {
542         return stats.getScheduledWriteMessages();
543     }
544 
545 }