1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16 package io.netty.channel;
17
18 import io.netty.buffer.ByteBufAllocator;
19 import io.netty.util.Attribute;
20 import io.netty.util.AttributeKey;
21 import io.netty.util.concurrent.EventExecutor;
22 import io.netty.util.internal.ObjectUtil;
23 import io.netty.util.internal.ThrowableUtil;
24 import io.netty.util.internal.logging.InternalLogger;
25 import io.netty.util.internal.logging.InternalLoggerFactory;
26
27 import java.net.SocketAddress;
28
29
30
31
32 public class CombinedChannelDuplexHandler<I extends ChannelInboundHandler, O extends ChannelOutboundHandler>
33 extends ChannelDuplexHandler {
34
35 private static final InternalLogger logger = InternalLoggerFactory.getInstance(CombinedChannelDuplexHandler.class);
36
37 private DelegatingChannelHandlerContext inboundCtx;
38 private DelegatingChannelHandlerContext outboundCtx;
39 private volatile boolean handlerAdded;
40
41 private I inboundHandler;
42 private O outboundHandler;
43
44
45
46
47
48
49 protected CombinedChannelDuplexHandler() {
50 ensureNotSharable();
51 }
52
53
54
55
56 public CombinedChannelDuplexHandler(I inboundHandler, O outboundHandler) {
57 ensureNotSharable();
58 init(inboundHandler, outboundHandler);
59 }
60
61
62
63
64
65
66
67
68
69 protected final void init(I inboundHandler, O outboundHandler) {
70 validate(inboundHandler, outboundHandler);
71 this.inboundHandler = inboundHandler;
72 this.outboundHandler = outboundHandler;
73 }
74
75 private void validate(I inboundHandler, O outboundHandler) {
76 if (this.inboundHandler != null) {
77 throw new IllegalStateException(
78 "init() can not be invoked if " + CombinedChannelDuplexHandler.class.getSimpleName() +
79 " was constructed with non-default constructor.");
80 }
81
82 ObjectUtil.checkNotNull(inboundHandler, "inboundHandler");
83 ObjectUtil.checkNotNull(outboundHandler, "outboundHandler");
84
85 if (inboundHandler instanceof ChannelOutboundHandler) {
86 throw new IllegalArgumentException(
87 "inboundHandler must not implement " +
88 ChannelOutboundHandler.class.getSimpleName() + " to get combined.");
89 }
90 if (outboundHandler instanceof ChannelInboundHandler) {
91 throw new IllegalArgumentException(
92 "outboundHandler must not implement " +
93 ChannelInboundHandler.class.getSimpleName() + " to get combined.");
94 }
95 }
96
97 protected final I inboundHandler() {
98 return inboundHandler;
99 }
100
101 protected final O outboundHandler() {
102 return outboundHandler;
103 }
104
105 private void checkAdded() {
106 if (!handlerAdded) {
107 throw new IllegalStateException("handler not added to pipeline yet");
108 }
109 }
110
111
112
113
114 public final void removeInboundHandler() {
115 checkAdded();
116 inboundCtx.remove();
117 }
118
119
120
121
122 public final void removeOutboundHandler() {
123 checkAdded();
124 outboundCtx.remove();
125 }
126
127 @Override
128 public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
129 if (inboundHandler == null) {
130 throw new IllegalStateException(
131 "init() must be invoked before being added to a " + ChannelPipeline.class.getSimpleName() +
132 " if " + CombinedChannelDuplexHandler.class.getSimpleName() +
133 " was constructed with the default constructor.");
134 }
135
136 outboundCtx = new DelegatingChannelHandlerContext(ctx, outboundHandler);
137 inboundCtx = new DelegatingChannelHandlerContext(ctx, inboundHandler) {
138 @SuppressWarnings("deprecation")
139 @Override
140 public ChannelHandlerContext fireExceptionCaught(Throwable cause) {
141 if (!outboundCtx.removed) {
142 try {
143
144
145 outboundHandler.exceptionCaught(outboundCtx, cause);
146 } catch (Throwable error) {
147 if (logger.isDebugEnabled()) {
148 logger.debug(
149 "An exception {}" +
150 "was thrown by a user handler's exceptionCaught() " +
151 "method while handling the following exception:",
152 ThrowableUtil.stackTraceToString(error), cause);
153 } else if (logger.isWarnEnabled()) {
154 logger.warn(
155 "An exception '{}' [enable DEBUG level for full stacktrace] " +
156 "was thrown by a user handler's exceptionCaught() " +
157 "method while handling the following exception:", error, cause);
158 }
159 }
160 } else {
161 super.fireExceptionCaught(cause);
162 }
163 return this;
164 }
165 };
166
167
168
169 handlerAdded = true;
170
171 try {
172 inboundHandler.handlerAdded(inboundCtx);
173 } finally {
174 outboundHandler.handlerAdded(outboundCtx);
175 }
176 }
177
178 @Override
179 public void handlerRemoved(ChannelHandlerContext ctx) throws Exception {
180 try {
181 inboundCtx.remove();
182 } finally {
183 outboundCtx.remove();
184 }
185 }
186
187 @Override
188 public void channelRegistered(ChannelHandlerContext ctx) throws Exception {
189 assert ctx == inboundCtx.ctx;
190 if (!inboundCtx.removed) {
191 inboundHandler.channelRegistered(inboundCtx);
192 } else {
193 inboundCtx.fireChannelRegistered();
194 }
195 }
196
197 @Override
198 public void channelUnregistered(ChannelHandlerContext ctx) throws Exception {
199 assert ctx == inboundCtx.ctx;
200 if (!inboundCtx.removed) {
201 inboundHandler.channelUnregistered(inboundCtx);
202 } else {
203 inboundCtx.fireChannelUnregistered();
204 }
205 }
206
207 @Override
208 public void channelActive(ChannelHandlerContext ctx) throws Exception {
209 assert ctx == inboundCtx.ctx;
210 if (!inboundCtx.removed) {
211 inboundHandler.channelActive(inboundCtx);
212 } else {
213 inboundCtx.fireChannelActive();
214 }
215 }
216
217 @Override
218 public void channelInactive(ChannelHandlerContext ctx) throws Exception {
219 assert ctx == inboundCtx.ctx;
220 if (!inboundCtx.removed) {
221 inboundHandler.channelInactive(inboundCtx);
222 } else {
223 inboundCtx.fireChannelInactive();
224 }
225 }
226
227 @Override
228 public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
229 assert ctx == inboundCtx.ctx;
230 if (!inboundCtx.removed) {
231 inboundHandler.exceptionCaught(inboundCtx, cause);
232 } else {
233 inboundCtx.fireExceptionCaught(cause);
234 }
235 }
236
237 @Override
238 public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
239 assert ctx == inboundCtx.ctx;
240 if (!inboundCtx.removed) {
241 inboundHandler.userEventTriggered(inboundCtx, evt);
242 } else {
243 inboundCtx.fireUserEventTriggered(evt);
244 }
245 }
246
247 @Override
248 public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
249 assert ctx == inboundCtx.ctx;
250 if (!inboundCtx.removed) {
251 inboundHandler.channelRead(inboundCtx, msg);
252 } else {
253 inboundCtx.fireChannelRead(msg);
254 }
255 }
256
257 @Override
258 public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
259 assert ctx == inboundCtx.ctx;
260 if (!inboundCtx.removed) {
261 inboundHandler.channelReadComplete(inboundCtx);
262 } else {
263 inboundCtx.fireChannelReadComplete();
264 }
265 }
266
267 @Override
268 public void channelWritabilityChanged(ChannelHandlerContext ctx) throws Exception {
269 assert ctx == inboundCtx.ctx;
270 if (!inboundCtx.removed) {
271 inboundHandler.channelWritabilityChanged(inboundCtx);
272 } else {
273 inboundCtx.fireChannelWritabilityChanged();
274 }
275 }
276
277 @Override
278 public void bind(
279 ChannelHandlerContext ctx,
280 SocketAddress localAddress, ChannelPromise promise) throws Exception {
281 assert ctx == outboundCtx.ctx;
282 if (!outboundCtx.removed) {
283 outboundHandler.bind(outboundCtx, localAddress, promise);
284 } else {
285 outboundCtx.bind(localAddress, promise);
286 }
287 }
288
289 @Override
290 public void connect(
291 ChannelHandlerContext ctx,
292 SocketAddress remoteAddress, SocketAddress localAddress,
293 ChannelPromise promise) throws Exception {
294 assert ctx == outboundCtx.ctx;
295 if (!outboundCtx.removed) {
296 outboundHandler.connect(outboundCtx, remoteAddress, localAddress, promise);
297 } else {
298 outboundCtx.connect(remoteAddress, localAddress, promise);
299 }
300 }
301
302 @Override
303 public void disconnect(ChannelHandlerContext ctx, ChannelPromise promise) throws Exception {
304 assert ctx == outboundCtx.ctx;
305 if (!outboundCtx.removed) {
306 outboundHandler.disconnect(outboundCtx, promise);
307 } else {
308 outboundCtx.disconnect(promise);
309 }
310 }
311
312 @Override
313 public void close(ChannelHandlerContext ctx, ChannelPromise promise) throws Exception {
314 assert ctx == outboundCtx.ctx;
315 if (!outboundCtx.removed) {
316 outboundHandler.close(outboundCtx, promise);
317 } else {
318 outboundCtx.close(promise);
319 }
320 }
321
322 @Override
323 public void deregister(ChannelHandlerContext ctx, ChannelPromise promise) throws Exception {
324 assert ctx == outboundCtx.ctx;
325 if (!outboundCtx.removed) {
326 outboundHandler.deregister(outboundCtx, promise);
327 } else {
328 outboundCtx.deregister(promise);
329 }
330 }
331
332 @Override
333 public void read(ChannelHandlerContext ctx) throws Exception {
334 assert ctx == outboundCtx.ctx;
335 if (!outboundCtx.removed) {
336 outboundHandler.read(outboundCtx);
337 } else {
338 outboundCtx.read();
339 }
340 }
341
342 @Override
343 public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
344 assert ctx == outboundCtx.ctx;
345 if (!outboundCtx.removed) {
346 outboundHandler.write(outboundCtx, msg, promise);
347 } else {
348 outboundCtx.write(msg, promise);
349 }
350 }
351
352 @Override
353 public void flush(ChannelHandlerContext ctx) throws Exception {
354 assert ctx == outboundCtx.ctx;
355 if (!outboundCtx.removed) {
356 outboundHandler.flush(outboundCtx);
357 } else {
358 outboundCtx.flush();
359 }
360 }
361
362 private static class DelegatingChannelHandlerContext implements ChannelHandlerContext {
363
364 private final ChannelHandlerContext ctx;
365 private final ChannelHandler handler;
366 boolean removed;
367
368 DelegatingChannelHandlerContext(ChannelHandlerContext ctx, ChannelHandler handler) {
369 this.ctx = ctx;
370 this.handler = handler;
371 }
372
373 @Override
374 public Channel channel() {
375 return ctx.channel();
376 }
377
378 @Override
379 public EventExecutor executor() {
380 return ctx.executor();
381 }
382
383 @Override
384 public String name() {
385 return ctx.name();
386 }
387
388 @Override
389 public ChannelHandler handler() {
390 return ctx.handler();
391 }
392
393 @Override
394 public boolean isRemoved() {
395 return removed || ctx.isRemoved();
396 }
397
398 @Override
399 public ChannelHandlerContext fireChannelRegistered() {
400 ctx.fireChannelRegistered();
401 return this;
402 }
403
404 @Override
405 public ChannelHandlerContext fireChannelUnregistered() {
406 ctx.fireChannelUnregistered();
407 return this;
408 }
409
410 @Override
411 public ChannelHandlerContext fireChannelActive() {
412 ctx.fireChannelActive();
413 return this;
414 }
415
416 @Override
417 public ChannelHandlerContext fireChannelInactive() {
418 ctx.fireChannelInactive();
419 return this;
420 }
421
422 @Override
423 public ChannelHandlerContext fireExceptionCaught(Throwable cause) {
424 ctx.fireExceptionCaught(cause);
425 return this;
426 }
427
428 @Override
429 public ChannelHandlerContext fireUserEventTriggered(Object event) {
430 ctx.fireUserEventTriggered(event);
431 return this;
432 }
433
434 @Override
435 public ChannelHandlerContext fireChannelRead(Object msg) {
436 ctx.fireChannelRead(msg);
437 return this;
438 }
439
440 @Override
441 public ChannelHandlerContext fireChannelReadComplete() {
442 ctx.fireChannelReadComplete();
443 return this;
444 }
445
446 @Override
447 public ChannelHandlerContext fireChannelWritabilityChanged() {
448 ctx.fireChannelWritabilityChanged();
449 return this;
450 }
451
452 @Override
453 public ChannelFuture bind(SocketAddress localAddress) {
454 return ctx.bind(localAddress);
455 }
456
457 @Override
458 public ChannelFuture connect(SocketAddress remoteAddress) {
459 return ctx.connect(remoteAddress);
460 }
461
462 @Override
463 public ChannelFuture connect(SocketAddress remoteAddress, SocketAddress localAddress) {
464 return ctx.connect(remoteAddress, localAddress);
465 }
466
467 @Override
468 public ChannelFuture disconnect() {
469 return ctx.disconnect();
470 }
471
472 @Override
473 public ChannelFuture close() {
474 return ctx.close();
475 }
476
477 @Override
478 public ChannelFuture deregister() {
479 return ctx.deregister();
480 }
481
482 @Override
483 public ChannelFuture bind(SocketAddress localAddress, ChannelPromise promise) {
484 return ctx.bind(localAddress, promise);
485 }
486
487 @Override
488 public ChannelFuture connect(SocketAddress remoteAddress, ChannelPromise promise) {
489 return ctx.connect(remoteAddress, promise);
490 }
491
492 @Override
493 public ChannelFuture connect(
494 SocketAddress remoteAddress, SocketAddress localAddress, ChannelPromise promise) {
495 return ctx.connect(remoteAddress, localAddress, promise);
496 }
497
498 @Override
499 public ChannelFuture disconnect(ChannelPromise promise) {
500 return ctx.disconnect(promise);
501 }
502
503 @Override
504 public ChannelFuture close(ChannelPromise promise) {
505 return ctx.close(promise);
506 }
507
508 @Override
509 public ChannelFuture deregister(ChannelPromise promise) {
510 return ctx.deregister(promise);
511 }
512
513 @Override
514 public ChannelHandlerContext read() {
515 ctx.read();
516 return this;
517 }
518
519 @Override
520 public ChannelFuture write(Object msg) {
521 return ctx.write(msg);
522 }
523
524 @Override
525 public ChannelFuture write(Object msg, ChannelPromise promise) {
526 return ctx.write(msg, promise);
527 }
528
529 @Override
530 public ChannelHandlerContext flush() {
531 ctx.flush();
532 return this;
533 }
534
535 @Override
536 public ChannelFuture writeAndFlush(Object msg, ChannelPromise promise) {
537 return ctx.writeAndFlush(msg, promise);
538 }
539
540 @Override
541 public ChannelFuture writeAndFlush(Object msg) {
542 return ctx.writeAndFlush(msg);
543 }
544
545 @Override
546 public ChannelPipeline pipeline() {
547 return ctx.pipeline();
548 }
549
550 @Override
551 public ByteBufAllocator alloc() {
552 return ctx.alloc();
553 }
554
555 @Override
556 public ChannelPromise newPromise() {
557 return ctx.newPromise();
558 }
559
560 @Override
561 public ChannelProgressivePromise newProgressivePromise() {
562 return ctx.newProgressivePromise();
563 }
564
565 @Override
566 public ChannelFuture newSucceededFuture() {
567 return ctx.newSucceededFuture();
568 }
569
570 @Override
571 public ChannelFuture newFailedFuture(Throwable cause) {
572 return ctx.newFailedFuture(cause);
573 }
574
575 @Override
576 public ChannelPromise voidPromise() {
577 return ctx.voidPromise();
578 }
579
580 @Override
581 public <T> Attribute<T> attr(AttributeKey<T> key) {
582 return ctx.channel().attr(key);
583 }
584
585 @Override
586 public <T> boolean hasAttr(AttributeKey<T> key) {
587 return ctx.channel().hasAttr(key);
588 }
589
590 final void remove() {
591 EventExecutor executor = executor();
592 if (executor.inEventLoop()) {
593 remove0();
594 } else {
595 executor.execute(new Runnable() {
596 @Override
597 public void run() {
598 remove0();
599 }
600 });
601 }
602 }
603
604 private void remove0() {
605 if (!removed) {
606 removed = true;
607 try {
608 handler.handlerRemoved(this);
609 } catch (Throwable cause) {
610 fireExceptionCaught(new ChannelPipelineException(
611 handler.getClass().getName() + ".handlerRemoved() has thrown an exception.", cause));
612 }
613 }
614 }
615 }
616 }