1 /*
2 * Copyright 2012 The Netty Project
3 *
4 * The Netty Project licenses this file to you under the Apache License,
5 * version 2.0 (the "License"); you may not use this file except in compliance
6 * with the License. You may obtain a copy of the License at:
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12 * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13 * License for the specific language governing permissions and limitations
14 * under the License.
15 */
16 package io.netty.handler.codec;
17
18 import io.netty.buffer.ByteBuf;
19 import io.netty.buffer.Unpooled;
20 import io.netty.channel.ChannelHandler;
21 import io.netty.channel.ChannelHandlerContext;
22 import io.netty.channel.ChannelPipeline;
23 import io.netty.util.Signal;
24 import io.netty.util.internal.StringUtil;
25
26 import java.util.List;
27
28 /**
29 * A specialized variation of {@link ByteToMessageDecoder} which enables implementation
30 * of a non-blocking decoder in the blocking I/O paradigm.
31 * <p>
32 * The biggest difference between {@link ReplayingDecoder} and
33 * {@link ByteToMessageDecoder} is that {@link ReplayingDecoder} allows you to
 * implement the {@code decode()} and {@code decodeLast()} methods as if
 * all required bytes had already been received, rather than checking the
36 * availability of the required bytes. For example, the following
37 * {@link ByteToMessageDecoder} implementation:
38 * <pre>
 * public class IntegerHeaderFrameDecoder extends {@link ByteToMessageDecoder} {
 *
 *   {@code @Override}
 *   protected void decode({@link ChannelHandlerContext} ctx,
 *                           {@link ByteBuf} buf, List&lt;Object&gt; out) throws Exception {
 *
 *     if (buf.readableBytes() &lt; 4) {
 *       return;
 *     }
 *
 *     buf.markReaderIndex();
 *     int length = buf.readInt();
 *
 *     if (buf.readableBytes() &lt; length) {
 *       buf.resetReaderIndex();
 *       return;
 *     }
 *
 *     out.add(buf.readBytes(length));
 *   }
 * }
60 * </pre>
61 * is simplified like the following with {@link ReplayingDecoder}:
62 * <pre>
 * public class IntegerHeaderFrameDecoder
 *      extends {@link ReplayingDecoder}&lt;{@link Void}&gt; {
 *
 *   protected void decode({@link ChannelHandlerContext} ctx,
 *                           {@link ByteBuf} buf, List&lt;Object&gt; out) throws Exception {
 *
 *     out.add(buf.readBytes(buf.readInt()));
 *   }
 * }
72 * </pre>
73 *
74 * <h3>How does this work?</h3>
75 * <p>
76 * {@link ReplayingDecoder} passes a specialized {@link ByteBuf}
77 * implementation which throws an {@link Error} of certain type when there's not
78 * enough data in the buffer. In the {@code IntegerHeaderFrameDecoder} above,
79 * you just assumed that there will be 4 or more bytes in the buffer when
 * you call {@code buf.readInt()}. If there really are 4 or more bytes in the buffer,
81 * it will return the integer header as you expected. Otherwise, the
82 * {@link Error} will be raised and the control will be returned to
83 * {@link ReplayingDecoder}. If {@link ReplayingDecoder} catches the
84 * {@link Error}, then it will rewind the {@code readerIndex} of the buffer
85 * back to the 'initial' position (i.e. the beginning of the buffer) and call
86 * the {@code decode(..)} method again when more data is received into the
87 * buffer.
88 * <p>
89 * Please note that {@link ReplayingDecoder} always throws the same cached
90 * {@link Error} instance to avoid the overhead of creating a new {@link Error}
91 * and filling its stack trace for every throw.
92 *
93 * <h3>Limitations</h3>
94 * <p>
 * In exchange for this simplicity, {@link ReplayingDecoder} imposes a few
 * limitations on you:
97 * <ul>
98 * <li>Some buffer operations are prohibited.</li>
99 * <li>Performance can be worse if the network is slow and the message
100 * format is complicated unlike the example above. In this case, your
101 * decoder might have to decode the same part of the message over and over
102 * again.</li>
103 * <li>You must keep in mind that {@code decode(..)} method can be called many
104 * times to decode a single message. For example, the following code will
105 * not work:
 * <pre> public class MyDecoder extends {@link ReplayingDecoder}&lt;{@link Void}&gt; {
 *
 *   private final Queue&lt;Integer&gt; values = new LinkedList&lt;Integer&gt;();
 *
 *   {@code @Override}
 *   public void decode(.., {@link ByteBuf} buf, List&lt;Object&gt; out) throws Exception {
 *
 *     // A message contains 2 integers.
 *     values.offer(buf.readInt());
 *     values.offer(buf.readInt());
 *
 *     // This assertion will fail intermittently since values.offer()
 *     // can be called more than two times!
 *     assert values.size() == 2;
 *     out.add(values.poll() + values.poll());
 *   }
 * }</pre>
123 * The correct implementation looks like the following, and you can also
124 * utilize the 'checkpoint' feature which is explained in detail in the
125 * next section.
 * <pre> public class MyDecoder extends {@link ReplayingDecoder}&lt;{@link Void}&gt; {
 *
 *   private final Queue&lt;Integer&gt; values = new LinkedList&lt;Integer&gt;();
 *
 *   {@code @Override}
 *   public void decode(.., {@link ByteBuf} buf, List&lt;Object&gt; out) throws Exception {
 *
 *     // Revert the state of the variable that might have been changed
 *     // since the last partial decode.
 *     values.clear();
 *
 *     // A message contains 2 integers.
 *     values.offer(buf.readInt());
 *     values.offer(buf.readInt());
 *
 *     // Now we know this assertion will never fail.
 *     assert values.size() == 2;
 *     out.add(values.poll() + values.poll());
 *   }
 * }</pre>
146 * </li>
147 * </ul>
148 *
149 * <h3>Improving the performance</h3>
150 * <p>
151 * Fortunately, the performance of a complex decoder implementation can be
152 * improved significantly with the {@code checkpoint()} method. The
153 * {@code checkpoint()} method updates the 'initial' position of the buffer so
154 * that {@link ReplayingDecoder} rewinds the {@code readerIndex} of the buffer
155 * to the last position where you called the {@code checkpoint()} method.
156 *
157 * <h4>Calling {@code checkpoint(T)} with an {@link Enum}</h4>
158 * <p>
159 * Although you can just use {@code checkpoint()} method and manage the state
160 * of the decoder by yourself, the easiest way to manage the state of the
161 * decoder is to create an {@link Enum} type which represents the current state
162 * of the decoder and to call {@code checkpoint(T)} method whenever the state
163 * changes. You can have as many states as you want depending on the
164 * complexity of the message you want to decode:
165 *
166 * <pre>
 * public enum MyDecoderState {
 *   READ_LENGTH,
 *   READ_CONTENT;
 * }
 *
 * public class IntegerHeaderFrameDecoder
 *      extends {@link ReplayingDecoder}&lt;<strong>MyDecoderState</strong>&gt; {
 *
 *   private int length;
 *
 *   public IntegerHeaderFrameDecoder() {
 *     // Set the initial state.
 *     <strong>super(MyDecoderState.READ_LENGTH);</strong>
 *   }
 *
 *   {@code @Override}
 *   protected void decode({@link ChannelHandlerContext} ctx,
 *                           {@link ByteBuf} buf, List&lt;Object&gt; out) throws Exception {
 *     switch (state()) {
 *     case READ_LENGTH:
 *       length = buf.readInt();
 *       <strong>checkpoint(MyDecoderState.READ_CONTENT);</strong>
 *       // Intentional fall-through: continue straight on to reading the content.
 *     case READ_CONTENT:
 *       ByteBuf frame = buf.readBytes(length);
 *       <strong>checkpoint(MyDecoderState.READ_LENGTH);</strong>
 *       out.add(frame);
 *       break;
 *     default:
 *       throw new Error("Shouldn't reach here.");
 *     }
 *   }
 * }
199 * </pre>
200 *
201 * <h4>Calling {@code checkpoint()} with no parameter</h4>
202 * <p>
203 * An alternative way to manage the decoder state is to manage it by yourself.
204 * <pre>
 * public class IntegerHeaderFrameDecoder
 *      extends {@link ReplayingDecoder}&lt;<strong>{@link Void}</strong>&gt; {
 *
 *   <strong>private boolean readLength;</strong>
 *   private int length;
 *
 *   {@code @Override}
 *   protected void decode({@link ChannelHandlerContext} ctx,
 *                           {@link ByteBuf} buf, List&lt;Object&gt; out) throws Exception {
 *     if (!readLength) {
 *       length = buf.readInt();
 *       <strong>readLength = true;</strong>
 *       <strong>checkpoint();</strong>
 *     }
 *
 *     if (readLength) {
 *       ByteBuf frame = buf.readBytes(length);
 *       <strong>readLength = false;</strong>
 *       <strong>checkpoint();</strong>
 *       out.add(frame);
 *     }
 *   }
 * }
228 * </pre>
229 *
230 * <h3>Replacing a decoder with another decoder in a pipeline</h3>
231 * <p>
232 * If you are going to write a protocol multiplexer, you will probably want to
233 * replace a {@link ReplayingDecoder} (protocol detector) with another
234 * {@link ReplayingDecoder}, {@link ByteToMessageDecoder} or {@link MessageToMessageDecoder}
235 * (actual protocol decoder).
236 * It is not possible to achieve this simply by calling
237 * {@link ChannelPipeline#replace(ChannelHandler, String, ChannelHandler)}, but
238 * some additional steps are required:
239 * <pre>
 * public class FirstDecoder extends {@link ReplayingDecoder}&lt;{@link Void}&gt; {
 *
 *     {@code @Override}
 *     protected void decode({@link ChannelHandlerContext} ctx,
 *                             {@link ByteBuf} buf, List&lt;Object&gt; out) {
 *         ...
 *         // Decode the first message
 *         Object firstMessage = ...;
 *
 *         // Add the second decoder
 *         ctx.pipeline().addLast("second", new SecondDecoder());
 *
 *         if (buf.isReadable()) {
 *             // Hand off the remaining data to the second decoder
 *             out.add(firstMessage);
 *             out.add(buf.readBytes(<b>super.actualReadableBytes()</b>));
 *         } else {
 *             // Nothing to hand off
 *             out.add(firstMessage);
 *         }
 *         // Remove the first decoder (me)
 *         ctx.pipeline().remove(this);
 *     }
 * }
263 * </pre>
264 * @param <S>
265 * the state type which is usually an {@link Enum}; use {@link Void} if state management is
266 * unused
267 */
268 public abstract class ReplayingDecoder<S> extends ByteToMessageDecoder {
269
270 static final Signal REPLAY = Signal.valueOf(ReplayingDecoder.class.getName() + ".REPLAY");
271
272 private final ReplayingDecoderByteBuf replayable = new ReplayingDecoderByteBuf();
273 private S state;
274 private int checkpoint = -1;
275
276 /**
277 * Creates a new instance with no initial state (i.e: {@code null}).
278 */
279 protected ReplayingDecoder() {
280 this(null);
281 }
282
283 /**
284 * Creates a new instance with the specified initial state.
285 */
286 protected ReplayingDecoder(S initialState) {
287 state = initialState;
288 }
289
290 /**
291 * Stores the internal cumulative buffer's reader position.
292 */
293 protected void checkpoint() {
294 checkpoint = internalBuffer().readerIndex();
295 }
296
297 /**
298 * Stores the internal cumulative buffer's reader position and updates
299 * the current decoder state.
300 */
301 protected void checkpoint(S state) {
302 checkpoint();
303 state(state);
304 }
305
306 /**
307 * Returns the current state of this decoder.
308 * @return the current state of this decoder
309 */
310 protected S state() {
311 return state;
312 }
313
314 /**
315 * Sets the current state of this decoder.
316 * @return the old state of this decoder
317 */
318 protected S state(S newState) {
319 S oldState = state;
320 state = newState;
321 return oldState;
322 }
323
324 @Override
325 final void channelInputClosed(ChannelHandlerContext ctx, List<Object> out) throws Exception {
326 try {
327 replayable.terminate();
328 if (cumulation != null) {
329 callDecode(ctx, internalBuffer(), out);
330 decodeLast(ctx, replayable, out);
331 } else {
332 replayable.setCumulation(Unpooled.EMPTY_BUFFER);
333 decodeLast(ctx, replayable, out);
334 }
335 } catch (Signal replay) {
336 // Ignore
337 replay.expect(REPLAY);
338 }
339 }
340
341 @Override
342 protected void callDecode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) {
343 replayable.setCumulation(in);
344 try {
345 while (in.isReadable()) {
346 int oldReaderIndex = checkpoint = in.readerIndex();
347 int outSize = out.size();
348
349 if (outSize > 0) {
350 fireChannelRead(ctx, out, outSize);
351 out.clear();
352
353 // Check if this handler was removed before continuing with decoding.
354 // If it was removed, it is not safe to continue to operate on the buffer.
355 //
356 // See:
357 // - https://github.com/netty/netty/issues/4635
358 if (ctx.isRemoved()) {
359 break;
360 }
361 outSize = 0;
362 }
363
364 S oldState = state;
365 int oldInputLength = in.readableBytes();
366 try {
367 decodeRemovalReentryProtection(ctx, replayable, out);
368
369 // Check if this handler was removed before continuing the loop.
370 // If it was removed, it is not safe to continue to operate on the buffer.
371 //
372 // See https://github.com/netty/netty/issues/1664
373 if (ctx.isRemoved()) {
374 break;
375 }
376
377 if (outSize == out.size()) {
378 if (oldInputLength == in.readableBytes() && oldState == state) {
379 throw new DecoderException(
380 StringUtil.simpleClassName(getClass()) + ".decode() must consume the inbound " +
381 "data or change its state if it did not decode anything.");
382 } else {
383 // Previous data has been discarded or caused state transition.
384 // Probably it is reading on.
385 continue;
386 }
387 }
388 } catch (Signal replay) {
389 replay.expect(REPLAY);
390
391 // Check if this handler was removed before continuing the loop.
392 // If it was removed, it is not safe to continue to operate on the buffer.
393 //
394 // See https://github.com/netty/netty/issues/1664
395 if (ctx.isRemoved()) {
396 break;
397 }
398
399 // Return to the checkpoint (or oldPosition) and retry.
400 int checkpoint = this.checkpoint;
401 if (checkpoint >= 0) {
402 in.readerIndex(checkpoint);
403 } else {
404 // Called by cleanup() - no need to maintain the readerIndex
405 // anymore because the buffer has been released already.
406 }
407 break;
408 }
409
410 if (oldReaderIndex == in.readerIndex() && oldState == state) {
411 throw new DecoderException(
412 StringUtil.simpleClassName(getClass()) + ".decode() method must consume the inbound data " +
413 "or change its state if it decoded something.");
414 }
415 if (isSingleDecode()) {
416 break;
417 }
418 }
419 } catch (DecoderException e) {
420 throw e;
421 } catch (Exception cause) {
422 throw new DecoderException(cause);
423 }
424 }
425 }