1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20 package org.apache.mina.filter.codec;
21
22 import java.net.SocketAddress;
23 import java.util.Queue;
24
25 import org.apache.mina.core.buffer.IoBuffer;
26 import org.apache.mina.core.file.FileRegion;
27 import org.apache.mina.core.filterchain.IoFilter;
28 import org.apache.mina.core.filterchain.IoFilterAdapter;
29 import org.apache.mina.core.filterchain.IoFilterChain;
30 import org.apache.mina.core.future.DefaultWriteFuture;
31 import org.apache.mina.core.future.WriteFuture;
32 import org.apache.mina.core.session.AttributeKey;
33 import org.apache.mina.core.session.IoSession;
34 import org.apache.mina.core.write.DefaultWriteRequest;
35 import org.apache.mina.core.write.NothingWrittenException;
36 import org.apache.mina.core.write.WriteRequest;
37 import org.apache.mina.core.write.WriteRequestWrapper;
38 import org.slf4j.Logger;
39 import org.slf4j.LoggerFactory;
40
41
42
43
44
45
46
47
48
49 public class ProtocolCodecFilter extends IoFilterAdapter {
50
51 private static final Logger LOGGER = LoggerFactory.getLogger(ProtocolCodecFilter.class);
52
53 private static final Class<?>[] EMPTY_PARAMS = new Class[0];
54
55 private static final IoBuffer EMPTY_BUFFER = IoBuffer.wrap(new byte[0]);
56
57 private static final AttributeKey ENCODER = new AttributeKey(ProtocolCodecFilter.class, "encoder");
58
59 private static final AttributeKey DECODER = new AttributeKey(ProtocolCodecFilter.class, "decoder");
60
61 private static final AttributeKey DECODER_OUT = new AttributeKey(ProtocolCodecFilter.class, "decoderOut");
62
63 private static final AttributeKey ENCODER_OUT = new AttributeKey(ProtocolCodecFilter.class, "encoderOut");
64
65
66 private final ProtocolCodecFactory factory;
67
68
69
70
71
72
73
74 public ProtocolCodecFilter(ProtocolCodecFactory factory) {
75 if (factory == null) {
76 throw new IllegalArgumentException("factory");
77 }
78
79 this.factory = factory;
80 }
81
82
83
84
85
86
87
88
89
90 public ProtocolCodecFilter(final ProtocolEncoder encoder, final ProtocolDecoder decoder) {
91 if (encoder == null) {
92 throw new IllegalArgumentException("encoder");
93 }
94 if (decoder == null) {
95 throw new IllegalArgumentException("decoder");
96 }
97
98
99 this.factory = new ProtocolCodecFactory() {
100 public ProtocolEncoder getEncoder(IoSession session) {
101 return encoder;
102 }
103
104 public ProtocolDecoder getDecoder(IoSession session) {
105 return decoder;
106 }
107 };
108 }
109
110
111
112
113
114
115
116
117
118
119 public ProtocolCodecFilter(final Class<? extends ProtocolEncoder> encoderClass,
120 final Class<? extends ProtocolDecoder> decoderClass) {
121 if (encoderClass == null) {
122 throw new IllegalArgumentException("encoderClass");
123 }
124 if (decoderClass == null) {
125 throw new IllegalArgumentException("decoderClass");
126 }
127 if (!ProtocolEncoder.class.isAssignableFrom(encoderClass)) {
128 throw new IllegalArgumentException("encoderClass: " + encoderClass.getName());
129 }
130 if (!ProtocolDecoder.class.isAssignableFrom(decoderClass)) {
131 throw new IllegalArgumentException("decoderClass: " + decoderClass.getName());
132 }
133 try {
134 encoderClass.getConstructor(EMPTY_PARAMS);
135 } catch (NoSuchMethodException e) {
136 throw new IllegalArgumentException("encoderClass doesn't have a public default constructor.");
137 }
138 try {
139 decoderClass.getConstructor(EMPTY_PARAMS);
140 } catch (NoSuchMethodException e) {
141 throw new IllegalArgumentException("decoderClass doesn't have a public default constructor.");
142 }
143
144 final ProtocolEncoder encoder;
145
146 try {
147 encoder = encoderClass.newInstance();
148 } catch (Exception e) {
149 throw new IllegalArgumentException("encoderClass cannot be initialized");
150 }
151
152 final ProtocolDecoder decoder;
153
154 try {
155 decoder = decoderClass.newInstance();
156 } catch (Exception e) {
157 throw new IllegalArgumentException("decoderClass cannot be initialized");
158 }
159
160
161 this.factory = new ProtocolCodecFactory() {
162 public ProtocolEncoder getEncoder(IoSession session) throws Exception {
163 return encoder;
164 }
165
166 public ProtocolDecoder getDecoder(IoSession session) throws Exception {
167 return decoder;
168 }
169 };
170 }
171
172
173
174
175
176
177
178 public ProtocolEncoder getEncoder(IoSession session) {
179 return (ProtocolEncoder) session.getAttribute(ENCODER);
180 }
181
182 @Override
183 public void onPreAdd(IoFilterChain parent, String name, NextFilter nextFilter) throws Exception {
184 if (parent.contains(this)) {
185 throw new IllegalArgumentException(
186 "You can't add the same filter instance more than once. Create another instance and add it.");
187 }
188 }
189
190 @Override
191 public void onPostRemove(IoFilterChain parent, String name, NextFilter nextFilter) throws Exception {
192
193 disposeCodec(parent.getSession());
194 }
195
196
197
198
199
200
201
202
203
204
205
206
207
208 @Override
209 public void messageReceived(NextFilter nextFilter, IoSession session, Object message) throws Exception {
210 LOGGER.debug("Processing a MESSAGE_RECEIVED for session {}", session.getId());
211
212 if (!(message instanceof IoBuffer)) {
213 nextFilter.messageReceived(session, message);
214 return;
215 }
216
217 IoBuffer in = (IoBuffer) message;
218 ProtocolDecoder decoder = factory.getDecoder(session);
219 ProtocolDecoderOutput decoderOut = getDecoderOut(session, nextFilter);
220
221
222
223
224
225 while (in.hasRemaining()) {
226 int oldPos = in.position();
227 try {
228 synchronized (session) {
229
230 decoder.decode(session, in, decoderOut);
231 }
232
233 decoderOut.flush(nextFilter, session);
234 } catch (Exception e) {
235 ProtocolDecoderException pde;
236 if (e instanceof ProtocolDecoderException) {
237 pde = (ProtocolDecoderException) e;
238 } else {
239 pde = new ProtocolDecoderException(e);
240 }
241 if (pde.getHexdump() == null) {
242
243 int curPos = in.position();
244 in.position(oldPos);
245 pde.setHexdump(in.getHexDump());
246 in.position(curPos);
247 }
248
249 decoderOut.flush(nextFilter, session);
250 nextFilter.exceptionCaught(session, pde);
251
252
253
254
255 if (!(e instanceof RecoverableProtocolDecoderException) || (in.position() == oldPos)) {
256 break;
257 }
258 }
259 }
260 }
261
262 @Override
263 public void messageSent(NextFilter nextFilter, IoSession session, WriteRequest writeRequest) throws Exception {
264 if (writeRequest instanceof EncodedWriteRequest) {
265 return;
266 }
267
268 if (writeRequest instanceof MessageWriteRequest) {
269 MessageWriteRequest wrappedRequest = (MessageWriteRequest) writeRequest;
270 nextFilter.messageSent(session, wrappedRequest.getParentRequest());
271 } else {
272 nextFilter.messageSent(session, writeRequest);
273 }
274 }
275
276 @Override
277 public void filterWrite(NextFilter nextFilter, IoSession session, WriteRequest writeRequest) throws Exception {
278 Object message = writeRequest.getMessage();
279
280
281
282 if ((message instanceof IoBuffer) || (message instanceof FileRegion)) {
283 nextFilter.filterWrite(session, writeRequest);
284 return;
285 }
286
287
288 ProtocolEncoder encoder = factory.getEncoder(session);
289
290 ProtocolEncoderOutput encoderOut = getEncoderOut(session, nextFilter, writeRequest);
291
292 if (encoder == null) {
293 throw new ProtocolEncoderException("The encoder is null for the session " + session);
294 }
295
296 try {
297
298 encoder.encode(session, message, encoderOut);
299
300
301 Queue<Object> bufferQueue = ((AbstractProtocolEncoderOutput) encoderOut).getMessageQueue();
302
303
304 while (!bufferQueue.isEmpty()) {
305 Object encodedMessage = bufferQueue.poll();
306
307 if (encodedMessage == null) {
308 break;
309 }
310
311
312 if (!(encodedMessage instanceof IoBuffer) || ((IoBuffer) encodedMessage).hasRemaining()) {
313 SocketAddress destination = writeRequest.getDestination();
314 WriteRequest encodedWriteRequest = new EncodedWriteRequest(encodedMessage, null, destination);
315
316 nextFilter.filterWrite(session, encodedWriteRequest);
317 }
318 }
319
320
321 nextFilter.filterWrite(session, new MessageWriteRequest(writeRequest));
322 } catch (Exception e) {
323 ProtocolEncoderException pee;
324
325
326 if (e instanceof ProtocolEncoderException) {
327 pee = (ProtocolEncoderException) e;
328 } else {
329 pee = new ProtocolEncoderException(e);
330 }
331
332 throw pee;
333 }
334 }
335
336 @Override
337 public void sessionClosed(NextFilter nextFilter, IoSession session) throws Exception {
338
339 ProtocolDecoder decoder = factory.getDecoder(session);
340 ProtocolDecoderOutput decoderOut = getDecoderOut(session, nextFilter);
341
342 try {
343 decoder.finishDecode(session, decoderOut);
344 } catch (Exception e) {
345 ProtocolDecoderException pde;
346 if (e instanceof ProtocolDecoderException) {
347 pde = (ProtocolDecoderException) e;
348 } else {
349 pde = new ProtocolDecoderException(e);
350 }
351 throw pde;
352 } finally {
353
354 disposeCodec(session);
355 decoderOut.flush(nextFilter, session);
356 }
357
358
359 nextFilter.sessionClosed(session);
360 }
361
362 private static class EncodedWriteRequest extends DefaultWriteRequest {
363 public EncodedWriteRequest(Object encodedMessage, WriteFuture future, SocketAddress destination) {
364 super(encodedMessage, future, destination);
365 }
366
367 public boolean isEncoded() {
368 return true;
369 }
370 }
371
372 private static class MessageWriteRequest extends WriteRequestWrapper {
373 public MessageWriteRequest(WriteRequest writeRequest) {
374 super(writeRequest);
375 }
376
377 @Override
378 public Object getMessage() {
379 return EMPTY_BUFFER;
380 }
381
382 @Override
383 public String toString() {
384 return "MessageWriteRequest, parent : " + super.toString();
385 }
386 }
387
388 private static class ProtocolDecoderOutputImpl extends AbstractProtocolDecoderOutput {
389 public ProtocolDecoderOutputImpl() {
390
391 }
392
393 public void flush(NextFilter nextFilter, IoSession session) {
394 Queue<Object> messageQueue = getMessageQueue();
395
396 while (!messageQueue.isEmpty()) {
397 nextFilter.messageReceived(session, messageQueue.poll());
398 }
399 }
400 }
401
402 private static class ProtocolEncoderOutputImpl extends AbstractProtocolEncoderOutput {
403 private final IoSession session;
404
405 private final NextFilter nextFilter;
406
407
408 private final SocketAddress destination;
409
410 public ProtocolEncoderOutputImpl(IoSession session, NextFilter nextFilter, WriteRequest writeRequest) {
411 this.session = session;
412 this.nextFilter = nextFilter;
413
414
415 destination = writeRequest.getDestination();
416 }
417
418 public WriteFuture flush() {
419 Queue<Object> bufferQueue = getMessageQueue();
420 WriteFuture future = null;
421
422 while (!bufferQueue.isEmpty()) {
423 Object encodedMessage = bufferQueue.poll();
424
425 if (encodedMessage == null) {
426 break;
427 }
428
429
430 if (!(encodedMessage instanceof IoBuffer) || ((IoBuffer) encodedMessage).hasRemaining()) {
431 future = new DefaultWriteFuture(session);
432 nextFilter.filterWrite(session, new EncodedWriteRequest(encodedMessage, future, destination));
433 }
434 }
435
436 if (future == null) {
437
438 WriteRequest writeRequest = new DefaultWriteRequest(
439 DefaultWriteRequest.EMPTY_MESSAGE, null, destination);
440 future = DefaultWriteFuture.newNotWrittenFuture(session, new NothingWrittenException(writeRequest));
441 }
442
443 return future;
444 }
445 }
446
447
448
449
450
451
452 private void disposeCodec(IoSession session) {
453
454
455 disposeEncoder(session);
456 disposeDecoder(session);
457
458
459 disposeDecoderOut(session);
460 }
461
462
463
464
465
466
467 private void disposeEncoder(IoSession session) {
468 ProtocolEncoder encoder = (ProtocolEncoder) session.removeAttribute(ENCODER);
469 if (encoder == null) {
470 return;
471 }
472
473 try {
474 encoder.dispose(session);
475 } catch (Exception e) {
476 LOGGER.warn("Failed to dispose: " + encoder.getClass().getName() + " (" + encoder + ')');
477 }
478 }
479
480
481
482
483
484
485 private void disposeDecoder(IoSession session) {
486 ProtocolDecoder decoder = (ProtocolDecoder) session.removeAttribute(DECODER);
487 if (decoder == null) {
488 return;
489 }
490
491 try {
492 decoder.dispose(session);
493 } catch (Exception e) {
494 LOGGER.warn("Failed to dispose: " + decoder.getClass().getName() + " (" + decoder + ')');
495 }
496 }
497
498
499
500
501
502 private ProtocolDecoderOutput getDecoderOut(IoSession session, NextFilter nextFilter) {
503 ProtocolDecoderOutput out = (ProtocolDecoderOutput) session.getAttribute(DECODER_OUT);
504
505 if (out == null) {
506
507 out = new ProtocolDecoderOutputImpl();
508 session.setAttribute(DECODER_OUT, out);
509 }
510
511 return out;
512 }
513
514 private ProtocolEncoderOutput getEncoderOut(IoSession session, NextFilter nextFilter, WriteRequest writeRequest) {
515 ProtocolEncoderOutput out = (ProtocolEncoderOutput) session.getAttribute(ENCODER_OUT);
516
517 if (out == null) {
518
519 out = new ProtocolEncoderOutputImpl(session, nextFilter, writeRequest);
520 session.setAttribute(ENCODER_OUT, out);
521 }
522
523 return out;
524 }
525
526
527
528
529 private void disposeDecoderOut(IoSession session) {
530 session.removeAttribute(DECODER_OUT);
531 }
532 }