查看本类的 API文档回源码主页即时通讯网 - 即时通讯开发者社区!
1   /*
2    * Copyright 2012 The Netty Project
3    *
4    * The Netty Project licenses this file to you under the Apache License,
5    * version 2.0 (the "License"); you may not use this file except in compliance
6    * with the License. You may obtain a copy of the License at:
7    *
8    *   https://www.apache.org/licenses/LICENSE-2.0
9    *
10   * Unless required by applicable law or agreed to in writing, software
11   * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12   * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13   * License for the specific language governing permissions and limitations
14   * under the License.
15   */
16  package io.netty.channel;
17  
18  import io.netty.buffer.ByteBufAllocator;
19  import io.netty.util.Attribute;
20  import io.netty.util.AttributeKey;
21  import io.netty.util.concurrent.EventExecutor;
22  import io.netty.util.internal.ObjectUtil;
23  import io.netty.util.internal.ThrowableUtil;
24  import io.netty.util.internal.logging.InternalLogger;
25  import io.netty.util.internal.logging.InternalLoggerFactory;
26  
27  import java.net.SocketAddress;
28  
29  /**
30   *  Combines a {@link ChannelInboundHandler} and a {@link ChannelOutboundHandler} into one {@link ChannelHandler}.
31   */
32  public class CombinedChannelDuplexHandler<I extends ChannelInboundHandler, O extends ChannelOutboundHandler>
33          extends ChannelDuplexHandler {
34  
35      private static final InternalLogger logger = InternalLoggerFactory.getInstance(CombinedChannelDuplexHandler.class);
36  
37      private DelegatingChannelHandlerContext inboundCtx;
38      private DelegatingChannelHandlerContext outboundCtx;
39      private volatile boolean handlerAdded;
40  
41      private I inboundHandler;
42      private O outboundHandler;
43  
44      /**
45       * Creates a new uninitialized instance. A class that extends this handler must invoke
46       * {@link #init(ChannelInboundHandler, ChannelOutboundHandler)} before adding this handler into a
47       * {@link ChannelPipeline}.
48       */
49      protected CombinedChannelDuplexHandler() {
50          ensureNotSharable();
51      }
52  
53      /**
54       * Creates a new instance that combines the specified two handlers into one.
55       */
56      public CombinedChannelDuplexHandler(I inboundHandler, O outboundHandler) {
57          ensureNotSharable();
58          init(inboundHandler, outboundHandler);
59      }
60  
61      /**
62       * Initialized this handler with the specified handlers.
63       *
64       * @throws IllegalStateException if this handler was not constructed via the default constructor or
65       *                               if this handler does not implement all required handler interfaces
66       * @throws IllegalArgumentException if the specified handlers cannot be combined into one due to a conflict
67       *                                  in the type hierarchy
68       */
69      protected final void init(I inboundHandler, O outboundHandler) {
70          validate(inboundHandler, outboundHandler);
71          this.inboundHandler = inboundHandler;
72          this.outboundHandler = outboundHandler;
73      }
74  
75      private void validate(I inboundHandler, O outboundHandler) {
76          if (this.inboundHandler != null) {
77              throw new IllegalStateException(
78                      "init() can not be invoked if " + CombinedChannelDuplexHandler.class.getSimpleName() +
79                              " was constructed with non-default constructor.");
80          }
81  
82          ObjectUtil.checkNotNull(inboundHandler, "inboundHandler");
83          ObjectUtil.checkNotNull(outboundHandler, "outboundHandler");
84  
85          if (inboundHandler instanceof ChannelOutboundHandler) {
86              throw new IllegalArgumentException(
87                      "inboundHandler must not implement " +
88                      ChannelOutboundHandler.class.getSimpleName() + " to get combined.");
89          }
90          if (outboundHandler instanceof ChannelInboundHandler) {
91              throw new IllegalArgumentException(
92                      "outboundHandler must not implement " +
93                      ChannelInboundHandler.class.getSimpleName() + " to get combined.");
94          }
95      }
96  
97      protected final I inboundHandler() {
98          return inboundHandler;
99      }
100 
101     protected final O outboundHandler() {
102         return outboundHandler;
103     }
104 
105     private void checkAdded() {
106         if (!handlerAdded) {
107             throw new IllegalStateException("handler not added to pipeline yet");
108         }
109     }
110 
111     /**
112      * Removes the {@link ChannelInboundHandler} that was combined in this {@link CombinedChannelDuplexHandler}.
113      */
114     public final void removeInboundHandler() {
115         checkAdded();
116         inboundCtx.remove();
117     }
118 
119     /**
120      * Removes the {@link ChannelOutboundHandler} that was combined in this {@link CombinedChannelDuplexHandler}.
121      */
122     public final void removeOutboundHandler() {
123         checkAdded();
124         outboundCtx.remove();
125     }
126 
127     @Override
128     public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
129         if (inboundHandler == null) {
130             throw new IllegalStateException(
131                     "init() must be invoked before being added to a " + ChannelPipeline.class.getSimpleName() +
132                             " if " +  CombinedChannelDuplexHandler.class.getSimpleName() +
133                             " was constructed with the default constructor.");
134         }
135 
136         outboundCtx = new DelegatingChannelHandlerContext(ctx, outboundHandler);
137         inboundCtx = new DelegatingChannelHandlerContext(ctx, inboundHandler) {
138             @SuppressWarnings("deprecation")
139             @Override
140             public ChannelHandlerContext fireExceptionCaught(Throwable cause) {
141                 if (!outboundCtx.removed) {
142                     try {
143                         // We directly delegate to the ChannelOutboundHandler as this may override exceptionCaught(...)
144                         // as well
145                         outboundHandler.exceptionCaught(outboundCtx, cause);
146                     } catch (Throwable error) {
147                         if (logger.isDebugEnabled()) {
148                             logger.debug(
149                                     "An exception {}" +
150                                     "was thrown by a user handler's exceptionCaught() " +
151                                     "method while handling the following exception:",
152                                     ThrowableUtil.stackTraceToString(error), cause);
153                         } else if (logger.isWarnEnabled()) {
154                             logger.warn(
155                                     "An exception '{}' [enable DEBUG level for full stacktrace] " +
156                                     "was thrown by a user handler's exceptionCaught() " +
157                                     "method while handling the following exception:", error, cause);
158                         }
159                     }
160                 } else {
161                     super.fireExceptionCaught(cause);
162                 }
163                 return this;
164             }
165         };
166 
167         // The inboundCtx and outboundCtx were created and set now it's safe to call removeInboundHandler() and
168         // removeOutboundHandler().
169         handlerAdded = true;
170 
171         try {
172             inboundHandler.handlerAdded(inboundCtx);
173         } finally {
174             outboundHandler.handlerAdded(outboundCtx);
175         }
176     }
177 
178     @Override
179     public void handlerRemoved(ChannelHandlerContext ctx) throws Exception {
180         try {
181             inboundCtx.remove();
182         } finally {
183             outboundCtx.remove();
184         }
185     }
186 
187     @Override
188     public void channelRegistered(ChannelHandlerContext ctx) throws Exception {
189         assert ctx == inboundCtx.ctx;
190         if (!inboundCtx.removed) {
191             inboundHandler.channelRegistered(inboundCtx);
192         } else {
193             inboundCtx.fireChannelRegistered();
194         }
195     }
196 
197     @Override
198     public void channelUnregistered(ChannelHandlerContext ctx) throws Exception {
199         assert ctx == inboundCtx.ctx;
200         if (!inboundCtx.removed) {
201             inboundHandler.channelUnregistered(inboundCtx);
202         } else {
203             inboundCtx.fireChannelUnregistered();
204         }
205     }
206 
207     @Override
208     public void channelActive(ChannelHandlerContext ctx) throws Exception {
209         assert ctx == inboundCtx.ctx;
210         if (!inboundCtx.removed) {
211             inboundHandler.channelActive(inboundCtx);
212         } else {
213             inboundCtx.fireChannelActive();
214         }
215     }
216 
217     @Override
218     public void channelInactive(ChannelHandlerContext ctx) throws Exception {
219         assert ctx == inboundCtx.ctx;
220         if (!inboundCtx.removed) {
221             inboundHandler.channelInactive(inboundCtx);
222         } else {
223             inboundCtx.fireChannelInactive();
224         }
225     }
226 
227     @Override
228     public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
229         assert ctx == inboundCtx.ctx;
230         if (!inboundCtx.removed) {
231             inboundHandler.exceptionCaught(inboundCtx, cause);
232         } else {
233             inboundCtx.fireExceptionCaught(cause);
234         }
235     }
236 
237     @Override
238     public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
239         assert ctx == inboundCtx.ctx;
240         if (!inboundCtx.removed) {
241             inboundHandler.userEventTriggered(inboundCtx, evt);
242         } else {
243             inboundCtx.fireUserEventTriggered(evt);
244         }
245     }
246 
247     @Override
248     public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
249         assert ctx == inboundCtx.ctx;
250         if (!inboundCtx.removed) {
251             inboundHandler.channelRead(inboundCtx, msg);
252         } else {
253             inboundCtx.fireChannelRead(msg);
254         }
255     }
256 
257     @Override
258     public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
259         assert ctx == inboundCtx.ctx;
260         if (!inboundCtx.removed) {
261             inboundHandler.channelReadComplete(inboundCtx);
262         } else {
263             inboundCtx.fireChannelReadComplete();
264         }
265     }
266 
267     @Override
268     public void channelWritabilityChanged(ChannelHandlerContext ctx) throws Exception {
269         assert ctx == inboundCtx.ctx;
270         if (!inboundCtx.removed) {
271             inboundHandler.channelWritabilityChanged(inboundCtx);
272         } else {
273             inboundCtx.fireChannelWritabilityChanged();
274         }
275     }
276 
277     @Override
278     public void bind(
279             ChannelHandlerContext ctx,
280             SocketAddress localAddress, ChannelPromise promise) throws Exception {
281         assert ctx == outboundCtx.ctx;
282         if (!outboundCtx.removed) {
283             outboundHandler.bind(outboundCtx, localAddress, promise);
284         } else {
285             outboundCtx.bind(localAddress, promise);
286         }
287     }
288 
289     @Override
290     public void connect(
291             ChannelHandlerContext ctx,
292             SocketAddress remoteAddress, SocketAddress localAddress,
293             ChannelPromise promise) throws Exception {
294         assert ctx == outboundCtx.ctx;
295         if (!outboundCtx.removed) {
296             outboundHandler.connect(outboundCtx, remoteAddress, localAddress, promise);
297         } else {
298             outboundCtx.connect(remoteAddress, localAddress, promise);
299         }
300     }
301 
302     @Override
303     public void disconnect(ChannelHandlerContext ctx, ChannelPromise promise) throws Exception {
304         assert ctx == outboundCtx.ctx;
305         if (!outboundCtx.removed) {
306             outboundHandler.disconnect(outboundCtx, promise);
307         } else {
308             outboundCtx.disconnect(promise);
309         }
310     }
311 
312     @Override
313     public void close(ChannelHandlerContext ctx, ChannelPromise promise) throws Exception {
314         assert ctx == outboundCtx.ctx;
315         if (!outboundCtx.removed) {
316             outboundHandler.close(outboundCtx, promise);
317         } else {
318             outboundCtx.close(promise);
319         }
320     }
321 
322     @Override
323     public void deregister(ChannelHandlerContext ctx, ChannelPromise promise) throws Exception {
324         assert ctx == outboundCtx.ctx;
325         if (!outboundCtx.removed) {
326             outboundHandler.deregister(outboundCtx, promise);
327         } else {
328             outboundCtx.deregister(promise);
329         }
330     }
331 
332     @Override
333     public void read(ChannelHandlerContext ctx) throws Exception {
334         assert ctx == outboundCtx.ctx;
335         if (!outboundCtx.removed) {
336             outboundHandler.read(outboundCtx);
337         } else {
338             outboundCtx.read();
339         }
340     }
341 
342     @Override
343     public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
344         assert ctx == outboundCtx.ctx;
345         if (!outboundCtx.removed) {
346             outboundHandler.write(outboundCtx, msg, promise);
347         } else {
348             outboundCtx.write(msg, promise);
349         }
350     }
351 
352     @Override
353     public void flush(ChannelHandlerContext ctx) throws Exception {
354         assert ctx == outboundCtx.ctx;
355         if (!outboundCtx.removed) {
356             outboundHandler.flush(outboundCtx);
357         } else {
358             outboundCtx.flush();
359         }
360     }
361 
362     private static class DelegatingChannelHandlerContext implements ChannelHandlerContext {
363 
364         private final ChannelHandlerContext ctx;
365         private final ChannelHandler handler;
366         boolean removed;
367 
368         DelegatingChannelHandlerContext(ChannelHandlerContext ctx, ChannelHandler handler) {
369             this.ctx = ctx;
370             this.handler = handler;
371         }
372 
373         @Override
374         public Channel channel() {
375             return ctx.channel();
376         }
377 
378         @Override
379         public EventExecutor executor() {
380             return ctx.executor();
381         }
382 
383         @Override
384         public String name() {
385             return ctx.name();
386         }
387 
388         @Override
389         public ChannelHandler handler() {
390             return ctx.handler();
391         }
392 
393         @Override
394         public boolean isRemoved() {
395             return removed || ctx.isRemoved();
396         }
397 
398         @Override
399         public ChannelHandlerContext fireChannelRegistered() {
400             ctx.fireChannelRegistered();
401             return this;
402         }
403 
404         @Override
405         public ChannelHandlerContext fireChannelUnregistered() {
406             ctx.fireChannelUnregistered();
407             return this;
408         }
409 
410         @Override
411         public ChannelHandlerContext fireChannelActive() {
412             ctx.fireChannelActive();
413             return this;
414         }
415 
416         @Override
417         public ChannelHandlerContext fireChannelInactive() {
418             ctx.fireChannelInactive();
419             return this;
420         }
421 
422         @Override
423         public ChannelHandlerContext fireExceptionCaught(Throwable cause) {
424             ctx.fireExceptionCaught(cause);
425             return this;
426         }
427 
428         @Override
429         public ChannelHandlerContext fireUserEventTriggered(Object event) {
430             ctx.fireUserEventTriggered(event);
431             return this;
432         }
433 
434         @Override
435         public ChannelHandlerContext fireChannelRead(Object msg) {
436             ctx.fireChannelRead(msg);
437             return this;
438         }
439 
440         @Override
441         public ChannelHandlerContext fireChannelReadComplete() {
442             ctx.fireChannelReadComplete();
443             return this;
444         }
445 
446         @Override
447         public ChannelHandlerContext fireChannelWritabilityChanged() {
448             ctx.fireChannelWritabilityChanged();
449             return this;
450         }
451 
452         @Override
453         public ChannelFuture bind(SocketAddress localAddress) {
454             return ctx.bind(localAddress);
455         }
456 
457         @Override
458         public ChannelFuture connect(SocketAddress remoteAddress) {
459             return ctx.connect(remoteAddress);
460         }
461 
462         @Override
463         public ChannelFuture connect(SocketAddress remoteAddress, SocketAddress localAddress) {
464             return ctx.connect(remoteAddress, localAddress);
465         }
466 
467         @Override
468         public ChannelFuture disconnect() {
469             return ctx.disconnect();
470         }
471 
472         @Override
473         public ChannelFuture close() {
474             return ctx.close();
475         }
476 
477         @Override
478         public ChannelFuture deregister() {
479             return ctx.deregister();
480         }
481 
482         @Override
483         public ChannelFuture bind(SocketAddress localAddress, ChannelPromise promise) {
484             return ctx.bind(localAddress, promise);
485         }
486 
487         @Override
488         public ChannelFuture connect(SocketAddress remoteAddress, ChannelPromise promise) {
489             return ctx.connect(remoteAddress, promise);
490         }
491 
492         @Override
493         public ChannelFuture connect(
494                 SocketAddress remoteAddress, SocketAddress localAddress, ChannelPromise promise) {
495             return ctx.connect(remoteAddress, localAddress, promise);
496         }
497 
498         @Override
499         public ChannelFuture disconnect(ChannelPromise promise) {
500             return ctx.disconnect(promise);
501         }
502 
503         @Override
504         public ChannelFuture close(ChannelPromise promise) {
505             return ctx.close(promise);
506         }
507 
508         @Override
509         public ChannelFuture deregister(ChannelPromise promise) {
510             return ctx.deregister(promise);
511         }
512 
513         @Override
514         public ChannelHandlerContext read() {
515             ctx.read();
516             return this;
517         }
518 
519         @Override
520         public ChannelFuture write(Object msg) {
521             return ctx.write(msg);
522         }
523 
524         @Override
525         public ChannelFuture write(Object msg, ChannelPromise promise) {
526             return ctx.write(msg, promise);
527         }
528 
529         @Override
530         public ChannelHandlerContext flush() {
531             ctx.flush();
532             return this;
533         }
534 
535         @Override
536         public ChannelFuture writeAndFlush(Object msg, ChannelPromise promise) {
537             return ctx.writeAndFlush(msg, promise);
538         }
539 
540         @Override
541         public ChannelFuture writeAndFlush(Object msg) {
542             return ctx.writeAndFlush(msg);
543         }
544 
545         @Override
546         public ChannelPipeline pipeline() {
547             return ctx.pipeline();
548         }
549 
550         @Override
551         public ByteBufAllocator alloc() {
552             return ctx.alloc();
553         }
554 
555         @Override
556         public ChannelPromise newPromise() {
557             return ctx.newPromise();
558         }
559 
560         @Override
561         public ChannelProgressivePromise newProgressivePromise() {
562             return ctx.newProgressivePromise();
563         }
564 
565         @Override
566         public ChannelFuture newSucceededFuture() {
567             return ctx.newSucceededFuture();
568         }
569 
570         @Override
571         public ChannelFuture newFailedFuture(Throwable cause) {
572             return ctx.newFailedFuture(cause);
573         }
574 
575         @Override
576         public ChannelPromise voidPromise() {
577             return ctx.voidPromise();
578         }
579 
580         @Override
581         public <T> Attribute<T> attr(AttributeKey<T> key) {
582             return ctx.channel().attr(key);
583         }
584 
585         @Override
586         public <T> boolean hasAttr(AttributeKey<T> key) {
587             return ctx.channel().hasAttr(key);
588         }
589 
590         final void remove() {
591             EventExecutor executor = executor();
592             if (executor.inEventLoop()) {
593                 remove0();
594             } else {
595                 executor.execute(new Runnable() {
596                     @Override
597                     public void run() {
598                         remove0();
599                     }
600                 });
601             }
602         }
603 
604         private void remove0() {
605             if (!removed) {
606                 removed = true;
607                 try {
608                     handler.handlerRemoved(this);
609                 } catch (Throwable cause) {
610                     fireExceptionCaught(new ChannelPipelineException(
611                             handler.getClass().getName() + ".handlerRemoved() has thrown an exception.", cause));
612                 }
613             }
614         }
615     }
616 }