1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20 package org.apache.mina.filter.executor;
21
22 import java.util.LinkedList;
23 import java.util.Queue;
24 import java.util.concurrent.Executor;
25 import java.util.concurrent.LinkedBlockingQueue;
26 import java.util.concurrent.ThreadPoolExecutor;
27 import java.util.concurrent.TimeUnit;
28
29 import org.apache.mina.common.IdleStatus;
30 import org.apache.mina.common.IoFilterAdapter;
31 import org.apache.mina.common.IoFilterChain;
32 import org.apache.mina.common.IoSession;
33 import org.slf4j.Logger;
34 import org.slf4j.LoggerFactory;
35
36
37
38
39
40
41
42
43
44
45
46
47 public class ExecutorFilter extends IoFilterAdapter {
48 private final Logger logger = LoggerFactory.getLogger(getClass());
49
50 private final Executor executor;
51
52
53
54
55
56 public ExecutorFilter() {
57 this(new ThreadPoolExecutor(16, 16, 60, TimeUnit.SECONDS,
58 new LinkedBlockingQueue<Runnable>()));
59 }
60
61
62
63
64 public ExecutorFilter(Executor executor) {
65 if (executor == null) {
66 throw new NullPointerException("executor");
67 }
68
69 this.executor = executor;
70 }
71
72
73
74
75 public Executor getExecutor() {
76 return executor;
77 }
78
79 private void fireEvent(NextFilter nextFilter, IoSession session,
80 EventType type, Object data) {
81 Event event = new Event(type, nextFilter, data);
82 SessionBuffer buf = SessionBuffer.getSessionBuffer(session);
83
84 boolean execute;
85 synchronized (buf.eventQueue) {
86 buf.eventQueue.offer(event);
87 if (buf.processingCompleted) {
88 buf.processingCompleted = false;
89 execute = true;
90 } else {
91 execute = false;
92 }
93 }
94
95 if (execute) {
96 if (logger.isDebugEnabled()) {
97 logger.debug("Launching thread for "
98 + session.getRemoteAddress());
99 }
100
101 executor.execute(new ProcessEventsRunnable(buf));
102 }
103 }
104
105 private static class SessionBuffer {
106 private static final String KEY = SessionBuffer.class.getName()
107 + ".KEY";
108
109 private static SessionBuffer getSessionBuffer(IoSession session) {
110 synchronized (session) {
111 SessionBuffer buf = (SessionBuffer) session.getAttribute(KEY);
112 if (buf == null) {
113 buf = new SessionBuffer(session);
114 session.setAttribute(KEY, buf);
115 }
116 return buf;
117 }
118 }
119
120 private final IoSession session;
121
122 private final Queue<Event> eventQueue = new LinkedList<Event>();
123
124 private boolean processingCompleted = true;
125
126 private SessionBuffer(IoSession session) {
127 this.session = session;
128 }
129 }
130
131 protected static class EventType {
132 public static final EventType OPENED = new EventType("OPENED");
133
134 public static final EventType CLOSED = new EventType("CLOSED");
135
136 public static final EventType READ = new EventType("READ");
137
138 public static final EventType WRITTEN = new EventType("WRITTEN");
139
140 public static final EventType RECEIVED = new EventType("RECEIVED");
141
142 public static final EventType SENT = new EventType("SENT");
143
144 public static final EventType IDLE = new EventType("IDLE");
145
146 public static final EventType EXCEPTION = new EventType("EXCEPTION");
147
148 private final String value;
149
150 private EventType(String value) {
151 this.value = value;
152 }
153
154 public String toString() {
155 return value;
156 }
157 }
158
159 protected static class Event {
160 private final EventType type;
161
162 private final NextFilter nextFilter;
163
164 private final Object data;
165
166 Event(EventType type, NextFilter nextFilter, Object data) {
167 this.type = type;
168 this.nextFilter = nextFilter;
169 this.data = data;
170 }
171
172 public Object getData() {
173 return data;
174 }
175
176 public NextFilter getNextFilter() {
177 return nextFilter;
178 }
179
180 public EventType getType() {
181 return type;
182 }
183 }
184
185 public void sessionCreated(NextFilter nextFilter, IoSession session) {
186 nextFilter.sessionCreated(session);
187 }
188
189 public void sessionOpened(NextFilter nextFilter, IoSession session) {
190 fireEvent(nextFilter, session, EventType.OPENED, null);
191 }
192
193 public void sessionClosed(NextFilter nextFilter, IoSession session) {
194 fireEvent(nextFilter, session, EventType.CLOSED, null);
195 }
196
197 public void sessionIdle(NextFilter nextFilter, IoSession session,
198 IdleStatus status) {
199 fireEvent(nextFilter, session, EventType.IDLE, status);
200 }
201
202 public void exceptionCaught(NextFilter nextFilter, IoSession session,
203 Throwable cause) {
204 fireEvent(nextFilter, session, EventType.EXCEPTION, cause);
205 }
206
207 public void messageReceived(NextFilter nextFilter, IoSession session,
208 Object message) {
209 fireEvent(nextFilter, session, EventType.RECEIVED, message);
210 }
211
212 public void messageSent(NextFilter nextFilter, IoSession session,
213 Object message) {
214 fireEvent(nextFilter, session, EventType.SENT, message);
215 }
216
217 protected void processEvent(NextFilter nextFilter, IoSession session,
218 EventType type, Object data) {
219 if (type == EventType.RECEIVED) {
220 nextFilter.messageReceived(session, data);
221 } else if (type == EventType.SENT) {
222 nextFilter.messageSent(session, data);
223 } else if (type == EventType.EXCEPTION) {
224 nextFilter.exceptionCaught(session, (Throwable) data);
225 } else if (type == EventType.IDLE) {
226 nextFilter.sessionIdle(session, (IdleStatus) data);
227 } else if (type == EventType.OPENED) {
228 nextFilter.sessionOpened(session);
229 } else if (type == EventType.CLOSED) {
230 nextFilter.sessionClosed(session);
231 }
232 }
233
234 public void filterWrite(NextFilter nextFilter, IoSession session,
235 WriteRequest writeRequest) {
236 nextFilter.filterWrite(session, writeRequest);
237 }
238
239 public void filterClose(NextFilter nextFilter, IoSession session)
240 throws Exception {
241 nextFilter.filterClose(session);
242 }
243
244 private class ProcessEventsRunnable implements Runnable {
245 private final SessionBuffer buffer;
246
247 ProcessEventsRunnable(SessionBuffer buffer) {
248 this.buffer = buffer;
249 }
250
251 public void run() {
252 while (true) {
253 Event event;
254
255 synchronized (buffer.eventQueue) {
256 event = buffer.eventQueue.poll();
257
258 if (event == null) {
259 buffer.processingCompleted = true;
260 break;
261 }
262 }
263
264 processEvent(event.getNextFilter(), buffer.session, event
265 .getType(), event.getData());
266 }
267
268 if (logger.isDebugEnabled()) {
269 logger.debug("Exiting since queue is empty for "
270 + buffer.session.getRemoteAddress());
271 }
272 }
273 }
274 }