查看本类的 API文档回源码主页即时通讯网 - 即时通讯开发者社区!
1   /*
2    * Copyright 2012 The Netty Project
3    *
4    * The Netty Project licenses this file to you under the Apache License,
5    * version 2.0 (the "License"); you may not use this file except in compliance
6    * with the License. You may obtain a copy of the License at:
7    *
8    *   https://www.apache.org/licenses/LICENSE-2.0
9    *
10   * Unless required by applicable law or agreed to in writing, software
11   * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12   * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13   * License for the specific language governing permissions and limitations
14   * under the License.
15   */
16  package io.netty.handler.codec;
17  
18  import io.netty.channel.ChannelHandlerContext;
19  import io.netty.channel.ChannelOutboundHandler;
20  import io.netty.channel.ChannelOutboundHandlerAdapter;
21  import io.netty.channel.ChannelPipeline;
22  import io.netty.channel.ChannelPromise;
23  import io.netty.util.ReferenceCountUtil;
24  import io.netty.util.ReferenceCounted;
25  import io.netty.util.concurrent.PromiseCombiner;
26  import io.netty.util.internal.PlatformDependent;
27  import io.netty.util.internal.StringUtil;
28  import io.netty.util.internal.TypeParameterMatcher;
29  
30  import java.util.List;
31  
32  /**
33   * {@link ChannelOutboundHandlerAdapter} which encodes from one message to an other message
34   *
35   * For example here is an implementation which decodes an {@link Integer} to an {@link String}.
36   *
37   * <pre>
38   *     public class IntegerToStringEncoder extends
39   *             {@link MessageToMessageEncoder}&lt;{@link Integer}&gt; {
40   *
41   *         {@code @Override}
42   *         public void encode({@link ChannelHandlerContext} ctx, {@link Integer} message, List&lt;Object&gt; out)
43   *                 throws {@link Exception} {
44   *             out.add(message.toString());
45   *         }
46   *     }
47   * </pre>
48   *
49   * Be aware that you need to call {@link ReferenceCounted#retain()} on messages that are just passed through if they
50   * are of type {@link ReferenceCounted}. This is needed as the {@link MessageToMessageEncoder} will call
51   * {@link ReferenceCounted#release()} on encoded messages.
52   */
53  public abstract class MessageToMessageEncoder<I> extends ChannelOutboundHandlerAdapter {
54  
55      private final TypeParameterMatcher matcher;
56  
57      /**
58       * Create a new instance which will try to detect the types to match out of the type parameter of the class.
59       */
60      protected MessageToMessageEncoder() {
61          matcher = TypeParameterMatcher.find(this, MessageToMessageEncoder.class, "I");
62      }
63  
64      /**
65       * Create a new instance
66       *
67       * @param outboundMessageType   The type of messages to match and so encode
68       */
69      protected MessageToMessageEncoder(Class<? extends I> outboundMessageType) {
70          matcher = TypeParameterMatcher.get(outboundMessageType);
71      }
72  
73      /**
74       * Returns {@code true} if the given message should be handled. If {@code false} it will be passed to the next
75       * {@link ChannelOutboundHandler} in the {@link ChannelPipeline}.
76       */
77      public boolean acceptOutboundMessage(Object msg) throws Exception {
78          return matcher.match(msg);
79      }
80  
81      @Override
82      public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
83          CodecOutputList out = null;
84          try {
85              if (acceptOutboundMessage(msg)) {
86                  out = CodecOutputList.newInstance();
87                  @SuppressWarnings("unchecked")
88                  I cast = (I) msg;
89                  try {
90                      encode(ctx, cast, out);
91                  } catch (Throwable th) {
92                      ReferenceCountUtil.safeRelease(cast);
93                      PlatformDependent.throwException(th);
94                  }
95                  ReferenceCountUtil.release(cast);
96  
97                  if (out.isEmpty()) {
98                      throw new EncoderException(
99                              StringUtil.simpleClassName(this) + " must produce at least one message.");
100                 }
101             } else {
102                 ctx.write(msg, promise);
103             }
104         } catch (EncoderException e) {
105             throw e;
106         } catch (Throwable t) {
107             throw new EncoderException(t);
108         } finally {
109             if (out != null) {
110                 try {
111                     final int sizeMinusOne = out.size() - 1;
112                     if (sizeMinusOne == 0) {
113                         ctx.write(out.getUnsafe(0), promise);
114                     } else if (sizeMinusOne > 0) {
115                         // Check if we can use a voidPromise for our extra writes to reduce GC-Pressure
116                         // See https://github.com/netty/netty/issues/2525
117                         if (promise == ctx.voidPromise()) {
118                             writeVoidPromise(ctx, out);
119                         } else {
120                             writePromiseCombiner(ctx, out, promise);
121                         }
122                     }
123                 } finally {
124                     out.recycle();
125                 }
126             }
127         }
128     }
129 
130     private static void writeVoidPromise(ChannelHandlerContext ctx, CodecOutputList out) {
131         final ChannelPromise voidPromise = ctx.voidPromise();
132         for (int i = 0; i < out.size(); i++) {
133             ctx.write(out.getUnsafe(i), voidPromise);
134         }
135     }
136 
137     private static void writePromiseCombiner(ChannelHandlerContext ctx, CodecOutputList out, ChannelPromise promise) {
138         final PromiseCombiner combiner = new PromiseCombiner(ctx.executor());
139         for (int i = 0; i < out.size(); i++) {
140             combiner.add(ctx.write(out.getUnsafe(i)));
141         }
142         combiner.finish(promise);
143     }
144 
145     /**
146      * Encode from one message to an other. This method will be called for each written message that can be handled
147      * by this encoder.
148      *
149      * @param ctx           the {@link ChannelHandlerContext} which this {@link MessageToMessageEncoder} belongs to
150      * @param msg           the message to encode to an other one
151      * @param out           the {@link List} into which the encoded msg should be added
152      *                      needs to do some kind of aggregation
153      * @throws Exception    is thrown if an error occurs
154      */
155     protected abstract void encode(ChannelHandlerContext ctx, I msg, List<Object> out) throws Exception;
156 }