1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16 package io.netty.handler.codec;
17
18 import io.netty.channel.ChannelHandlerContext;
19 import io.netty.channel.ChannelOutboundHandler;
20 import io.netty.channel.ChannelOutboundHandlerAdapter;
21 import io.netty.channel.ChannelPipeline;
22 import io.netty.channel.ChannelPromise;
23 import io.netty.util.ReferenceCountUtil;
24 import io.netty.util.ReferenceCounted;
25 import io.netty.util.concurrent.PromiseCombiner;
26 import io.netty.util.internal.PlatformDependent;
27 import io.netty.util.internal.StringUtil;
28 import io.netty.util.internal.TypeParameterMatcher;
29
30 import java.util.List;
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53 public abstract class MessageToMessageEncoder<I> extends ChannelOutboundHandlerAdapter {
54
55 private final TypeParameterMatcher matcher;
56
57
58
59
60 protected MessageToMessageEncoder() {
61 matcher = TypeParameterMatcher.find(this, MessageToMessageEncoder.class, "I");
62 }
63
64
65
66
67
68
69 protected MessageToMessageEncoder(Class<? extends I> outboundMessageType) {
70 matcher = TypeParameterMatcher.get(outboundMessageType);
71 }
72
73
74
75
76
77 public boolean acceptOutboundMessage(Object msg) throws Exception {
78 return matcher.match(msg);
79 }
80
81 @Override
82 public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
83 CodecOutputList out = null;
84 try {
85 if (acceptOutboundMessage(msg)) {
86 out = CodecOutputList.newInstance();
87 @SuppressWarnings("unchecked")
88 I cast = (I) msg;
89 try {
90 encode(ctx, cast, out);
91 } catch (Throwable th) {
92 ReferenceCountUtil.safeRelease(cast);
93 PlatformDependent.throwException(th);
94 }
95 ReferenceCountUtil.release(cast);
96
97 if (out.isEmpty()) {
98 throw new EncoderException(
99 StringUtil.simpleClassName(this) + " must produce at least one message.");
100 }
101 } else {
102 ctx.write(msg, promise);
103 }
104 } catch (EncoderException e) {
105 throw e;
106 } catch (Throwable t) {
107 throw new EncoderException(t);
108 } finally {
109 if (out != null) {
110 try {
111 final int sizeMinusOne = out.size() - 1;
112 if (sizeMinusOne == 0) {
113 ctx.write(out.getUnsafe(0), promise);
114 } else if (sizeMinusOne > 0) {
115
116
117 if (promise == ctx.voidPromise()) {
118 writeVoidPromise(ctx, out);
119 } else {
120 writePromiseCombiner(ctx, out, promise);
121 }
122 }
123 } finally {
124 out.recycle();
125 }
126 }
127 }
128 }
129
130 private static void writeVoidPromise(ChannelHandlerContext ctx, CodecOutputList out) {
131 final ChannelPromise voidPromise = ctx.voidPromise();
132 for (int i = 0; i < out.size(); i++) {
133 ctx.write(out.getUnsafe(i), voidPromise);
134 }
135 }
136
137 private static void writePromiseCombiner(ChannelHandlerContext ctx, CodecOutputList out, ChannelPromise promise) {
138 final PromiseCombiner combiner = new PromiseCombiner(ctx.executor());
139 for (int i = 0; i < out.size(); i++) {
140 combiner.add(ctx.write(out.getUnsafe(i)));
141 }
142 combiner.finish(promise);
143 }
144
145
146
147
148
149
150
151
152
153
154
155 protected abstract void encode(ChannelHandlerContext ctx, I msg, List<Object> out) throws Exception;
156 }