查看本类的 API文档回源码主页即时通讯网 - 即时通讯开发者社区!
1   /*
2    *  Licensed to the Apache Software Foundation (ASF) under one
3    *  or more contributor license agreements.  See the NOTICE file
4    *  distributed with this work for additional information
5    *  regarding copyright ownership.  The ASF licenses this file
6    *  to you under the Apache License, Version 2.0 (the
7    *  "License"); you may not use this file except in compliance
8    *  with the License.  You may obtain a copy of the License at
9    *  
10   *    http://www.apache.org/licenses/LICENSE-2.0
11   *  
12   *  Unless required by applicable law or agreed to in writing,
13   *  software distributed under the License is distributed on an
14   *  "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15   *  KIND, either express or implied.  See the License for the
16   *  specific language governing permissions and limitations
17   *  under the License. 
18   *  
19   */
20  package org.apache.mina.filter.executor;
21  
22  import java.util.LinkedList;
23  import java.util.Queue;
24  import java.util.concurrent.Executor;
25  import java.util.concurrent.LinkedBlockingQueue;
26  import java.util.concurrent.ThreadPoolExecutor;
27  import java.util.concurrent.TimeUnit;
28  
29  import org.apache.mina.common.IdleStatus;
30  import org.apache.mina.common.IoFilterAdapter;
31  import org.apache.mina.common.IoFilterChain;
32  import org.apache.mina.common.IoSession;
33  import org.slf4j.Logger;
34  import org.slf4j.LoggerFactory;
35  
36  /**
37   * A filter that forward events to {@link Executor} in
38   * <a href="http://dcl.mathcs.emory.edu/util/backport-util-concurrent/">backport-util-concurrent</a>.
39   * You can apply various thread model by inserting this filter to the {@link IoFilterChain}.
40   * <p>
41   * Please note that this filter doesn't manage the life cycle of the underlying
42   * {@link Executor}.  You have to destroy or stop it by yourself.
43   *
44   * @author The Apache MINA Project (dev@mina.apache.org)
45   * @version $Rev: 350169 $, $Date: 2005-12-01 00:17:41 -0500 (Thu, 01 Dec 2005) $
46   */
47  public class ExecutorFilter extends IoFilterAdapter {
48      private final Logger logger = LoggerFactory.getLogger(getClass());
49  
50      private final Executor executor;
51  
52      /**
53       * Creates a new instace with the default thread pool implementation
54       * (<tt>new ThreadPoolExecutor(16, 16, 60, TimeUnit.SECONDS, new LinkedBlockingQueue() )</tt>).
55       */
56      public ExecutorFilter() {
57          this(new ThreadPoolExecutor(16, 16, 60, TimeUnit.SECONDS,
58                  new LinkedBlockingQueue<Runnable>()));
59      }
60  
61      /**
62       * Creates a new instance with the specified <tt>executor</tt>.
63       */
64      public ExecutorFilter(Executor executor) {
65          if (executor == null) {
66              throw new NullPointerException("executor");
67          }
68  
69          this.executor = executor;
70      }
71  
72      /**
73       * Returns the underlying {@link Executor} instance this filter uses.
74       */
75      public Executor getExecutor() {
76          return executor;
77      }
78  
79      private void fireEvent(NextFilter nextFilter, IoSession session,
80              EventType type, Object data) {
81          Event event = new Event(type, nextFilter, data);
82          SessionBuffer buf = SessionBuffer.getSessionBuffer(session);
83  
84          boolean execute;
85          synchronized (buf.eventQueue) {
86              buf.eventQueue.offer(event);
87              if (buf.processingCompleted) {
88                  buf.processingCompleted = false;
89                  execute = true;
90              } else {
91                  execute = false;
92              }
93          }
94  
95          if (execute) {
96              if (logger.isDebugEnabled()) {
97                  logger.debug("Launching thread for "
98                          + session.getRemoteAddress());
99              }
100 
101             executor.execute(new ProcessEventsRunnable(buf));
102         }
103     }
104 
105     private static class SessionBuffer {
106         private static final String KEY = SessionBuffer.class.getName()
107                 + ".KEY";
108 
109         private static SessionBuffer getSessionBuffer(IoSession session) {
110             synchronized (session) {
111                 SessionBuffer buf = (SessionBuffer) session.getAttribute(KEY);
112                 if (buf == null) {
113                     buf = new SessionBuffer(session);
114                     session.setAttribute(KEY, buf);
115                 }
116                 return buf;
117             }
118         }
119 
120         private final IoSession session;
121 
122         private final Queue<Event> eventQueue = new LinkedList<Event>();
123 
124         private boolean processingCompleted = true;
125 
126         private SessionBuffer(IoSession session) {
127             this.session = session;
128         }
129     }
130 
131     protected static class EventType {
132         public static final EventType OPENED = new EventType("OPENED");
133 
134         public static final EventType CLOSED = new EventType("CLOSED");
135 
136         public static final EventType READ = new EventType("READ");
137 
138         public static final EventType WRITTEN = new EventType("WRITTEN");
139 
140         public static final EventType RECEIVED = new EventType("RECEIVED");
141 
142         public static final EventType SENT = new EventType("SENT");
143 
144         public static final EventType IDLE = new EventType("IDLE");
145 
146         public static final EventType EXCEPTION = new EventType("EXCEPTION");
147 
148         private final String value;
149 
150         private EventType(String value) {
151             this.value = value;
152         }
153 
154         public String toString() {
155             return value;
156         }
157     }
158 
159     protected static class Event {
160         private final EventType type;
161 
162         private final NextFilter nextFilter;
163 
164         private final Object data;
165 
166         Event(EventType type, NextFilter nextFilter, Object data) {
167             this.type = type;
168             this.nextFilter = nextFilter;
169             this.data = data;
170         }
171 
172         public Object getData() {
173             return data;
174         }
175 
176         public NextFilter getNextFilter() {
177             return nextFilter;
178         }
179 
180         public EventType getType() {
181             return type;
182         }
183     }
184 
185     public void sessionCreated(NextFilter nextFilter, IoSession session) {
186         nextFilter.sessionCreated(session);
187     }
188 
189     public void sessionOpened(NextFilter nextFilter, IoSession session) {
190         fireEvent(nextFilter, session, EventType.OPENED, null);
191     }
192 
193     public void sessionClosed(NextFilter nextFilter, IoSession session) {
194         fireEvent(nextFilter, session, EventType.CLOSED, null);
195     }
196 
197     public void sessionIdle(NextFilter nextFilter, IoSession session,
198             IdleStatus status) {
199         fireEvent(nextFilter, session, EventType.IDLE, status);
200     }
201 
202     public void exceptionCaught(NextFilter nextFilter, IoSession session,
203             Throwable cause) {
204         fireEvent(nextFilter, session, EventType.EXCEPTION, cause);
205     }
206 
207     public void messageReceived(NextFilter nextFilter, IoSession session,
208             Object message) {
209         fireEvent(nextFilter, session, EventType.RECEIVED, message);
210     }
211 
212     public void messageSent(NextFilter nextFilter, IoSession session,
213             Object message) {
214         fireEvent(nextFilter, session, EventType.SENT, message);
215     }
216 
217     protected void processEvent(NextFilter nextFilter, IoSession session,
218             EventType type, Object data) {
219         if (type == EventType.RECEIVED) {
220             nextFilter.messageReceived(session, data);
221         } else if (type == EventType.SENT) {
222             nextFilter.messageSent(session, data);
223         } else if (type == EventType.EXCEPTION) {
224             nextFilter.exceptionCaught(session, (Throwable) data);
225         } else if (type == EventType.IDLE) {
226             nextFilter.sessionIdle(session, (IdleStatus) data);
227         } else if (type == EventType.OPENED) {
228             nextFilter.sessionOpened(session);
229         } else if (type == EventType.CLOSED) {
230             nextFilter.sessionClosed(session);
231         }
232     }
233 
234     public void filterWrite(NextFilter nextFilter, IoSession session,
235             WriteRequest writeRequest) {
236         nextFilter.filterWrite(session, writeRequest);
237     }
238 
239     public void filterClose(NextFilter nextFilter, IoSession session)
240             throws Exception {
241         nextFilter.filterClose(session);
242     }
243 
244     private class ProcessEventsRunnable implements Runnable {
245         private final SessionBuffer buffer;
246 
247         ProcessEventsRunnable(SessionBuffer buffer) {
248             this.buffer = buffer;
249         }
250 
251         public void run() {
252             while (true) {
253                 Event event;
254 
255                 synchronized (buffer.eventQueue) {
256                     event = buffer.eventQueue.poll();
257 
258                     if (event == null) {
259                         buffer.processingCompleted = true;
260                         break;
261                     }
262                 }
263 
264                 processEvent(event.getNextFilter(), buffer.session, event
265                         .getType(), event.getData());
266             }
267 
268             if (logger.isDebugEnabled()) {
269                 logger.debug("Exiting since queue is empty for "
270                         + buffer.session.getRemoteAddress());
271             }
272         }
273     }
274 }