1 /* 2 * Copyright 2012 The Netty Project 3 * 4 * The Netty Project licenses this file to you under the Apache License, 5 * version 2.0 (the "License"); you may not use this file except in compliance 6 * with the License. You may obtain a copy of the License at: 7 * 8 * https://www.apache.org/licenses/LICENSE-2.0 9 * 10 * Unless required by applicable law or agreed to in writing, software 11 * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT 12 * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the 13 * License for the specific language governing permissions and limitations 14 * under the License. 15 */ 16 package io.netty.handler.codec; 17 18 import io.netty.buffer.ByteBuf; 19 import io.netty.buffer.Unpooled; 20 import io.netty.channel.ChannelHandler; 21 import io.netty.channel.ChannelHandlerContext; 22 import io.netty.channel.ChannelPipeline; 23 import io.netty.util.Signal; 24 import io.netty.util.internal.StringUtil; 25 26 import java.util.List; 27 28 /** 29 * A specialized variation of {@link ByteToMessageDecoder} which enables implementation 30 * of a non-blocking decoder in the blocking I/O paradigm. 31 * <p> 32 * The biggest difference between {@link ReplayingDecoder} and 33 * {@link ByteToMessageDecoder} is that {@link ReplayingDecoder} allows you to 34 * implement the {@code decode()} and {@code decodeLast()} methods just like 35 * all required bytes were received already, rather than checking the 36 * availability of the required bytes. For example, the following 37 * {@link ByteToMessageDecoder} implementation: 38 * <pre> 39 * public class IntegerHeaderFrameDecoder extends {@link ByteToMessageDecoder} { 40 * 41 * {@code @Override} 42 * protected void decode({@link ChannelHandlerContext} ctx, 43 * {@link ByteBuf} buf, List<Object> out) throws Exception { 44 * 45 * if (buf.readableBytes() < 4) { 46 * return; 47 * } 48 * 49 * buf.markReaderIndex(); 50 * int length = buf.readInt(); 51 * 52 * if (buf.readableBytes() < length) { 53 * buf.resetReaderIndex(); 54 * return; 55 * } 56 * 57 * out.add(buf.readBytes(length)); 58 * } 59 * } 60 * </pre> 61 * is simplified like the following with {@link ReplayingDecoder}: 62 * <pre> 63 * public class IntegerHeaderFrameDecoder 64 * extends {@link ReplayingDecoder}<{@link Void}> { 65 * 66 * protected void decode({@link ChannelHandlerContext} ctx, 67 * {@link ByteBuf} buf, List<Object> out) throws Exception { 68 * 69 * out.add(buf.readBytes(buf.readInt())); 70 * } 71 * } 72 * </pre> 73 * 74 * <h3>How does this work?</h3> 75 * <p> 76 * {@link ReplayingDecoder} passes a specialized {@link ByteBuf} 77 * implementation which throws an {@link Error} of certain type when there's not 78 * enough data in the buffer. In the {@code IntegerHeaderFrameDecoder} above, 79 * you just assumed that there will be 4 or more bytes in the buffer when 80 * you call {@code buf.readInt()}. If there's really 4 bytes in the buffer, 81 * it will return the integer header as you expected. Otherwise, the 82 * {@link Error} will be raised and the control will be returned to 83 * {@link ReplayingDecoder}. If {@link ReplayingDecoder} catches the 84 * {@link Error}, then it will rewind the {@code readerIndex} of the buffer 85 * back to the 'initial' position (i.e. the beginning of the buffer) and call 86 * the {@code decode(..)} method again when more data is received into the 87 * buffer. 88 * <p> 89 * Please note that {@link ReplayingDecoder} always throws the same cached 90 * {@link Error} instance to avoid the overhead of creating a new {@link Error} 91 * and filling its stack trace for every throw. 92 * 93 * <h3>Limitations</h3> 94 * <p> 95 * At the cost of the simplicity, {@link ReplayingDecoder} enforces you a few 96 * limitations: 97 * <ul> 98 * <li>Some buffer operations are prohibited.</li> 99 * <li>Performance can be worse if the network is slow and the message 100 * format is complicated unlike the example above. In this case, your 101 * decoder might have to decode the same part of the message over and over 102 * again.</li> 103 * <li>You must keep in mind that {@code decode(..)} method can be called many 104 * times to decode a single message. For example, the following code will 105 * not work: 106 * <pre> public class MyDecoder extends {@link ReplayingDecoder}<{@link Void}> { 107 * 108 * private final Queue<Integer> values = new LinkedList<Integer>(); 109 * 110 * {@code @Override} 111 * public void decode(.., {@link ByteBuf} buf, List<Object> out) throws Exception { 112 * 113 * // A message contains 2 integers. 114 * values.offer(buf.readInt()); 115 * values.offer(buf.readInt()); 116 * 117 * // This assertion will fail intermittently since values.offer() 118 * // can be called more than two times! 119 * assert values.size() == 2; 120 * out.add(values.poll() + values.poll()); 121 * } 122 * }</pre> 123 * The correct implementation looks like the following, and you can also 124 * utilize the 'checkpoint' feature which is explained in detail in the 125 * next section. 126 * <pre> public class MyDecoder extends {@link ReplayingDecoder}<{@link Void}> { 127 * 128 * private final Queue<Integer> values = new LinkedList<Integer>(); 129 * 130 * {@code @Override} 131 * public void decode(.., {@link ByteBuf} buf, List<Object> out) throws Exception { 132 * 133 * // Revert the state of the variable that might have been changed 134 * // since the last partial decode. 135 * values.clear(); 136 * 137 * // A message contains 2 integers. 138 * values.offer(buf.readInt()); 139 * values.offer(buf.readInt()); 140 * 141 * // Now we know this assertion will never fail. 142 * assert values.size() == 2; 143 * out.add(values.poll() + values.poll()); 144 * } 145 * }</pre> 146 * </li> 147 * </ul> 148 * 149 * <h3>Improving the performance</h3> 150 * <p> 151 * Fortunately, the performance of a complex decoder implementation can be 152 * improved significantly with the {@code checkpoint()} method. The 153 * {@code checkpoint()} method updates the 'initial' position of the buffer so 154 * that {@link ReplayingDecoder} rewinds the {@code readerIndex} of the buffer 155 * to the last position where you called the {@code checkpoint()} method. 156 * 157 * <h4>Calling {@code checkpoint(T)} with an {@link Enum}</h4> 158 * <p> 159 * Although you can just use {@code checkpoint()} method and manage the state 160 * of the decoder by yourself, the easiest way to manage the state of the 161 * decoder is to create an {@link Enum} type which represents the current state 162 * of the decoder and to call {@code checkpoint(T)} method whenever the state 163 * changes. You can have as many states as you want depending on the 164 * complexity of the message you want to decode: 165 * 166 * <pre> 167 * public enum MyDecoderState { 168 * READ_LENGTH, 169 * READ_CONTENT; 170 * } 171 * 172 * public class IntegerHeaderFrameDecoder 173 * extends {@link ReplayingDecoder}<<strong>MyDecoderState</strong>> { 174 * 175 * private int length; 176 * 177 * public IntegerHeaderFrameDecoder() { 178 * // Set the initial state. 179 * <strong>super(MyDecoderState.READ_LENGTH);</strong> 180 * } 181 * 182 * {@code @Override} 183 * protected void decode({@link ChannelHandlerContext} ctx, 184 * {@link ByteBuf} buf, List<Object> out) throws Exception { 185 * switch (state()) { 186 * case READ_LENGTH: 187 * length = buf.readInt(); 188 * <strong>checkpoint(MyDecoderState.READ_CONTENT);</strong> 189 * case READ_CONTENT: 190 * ByteBuf frame = buf.readBytes(length); 191 * <strong>checkpoint(MyDecoderState.READ_LENGTH);</strong> 192 * out.add(frame); 193 * break; 194 * default: 195 * throw new Error("Shouldn't reach here."); 196 * } 197 * } 198 * } 199 * </pre> 200 * 201 * <h4>Calling {@code checkpoint()} with no parameter</h4> 202 * <p> 203 * An alternative way to manage the decoder state is to manage it by yourself. 204 * <pre> 205 * public class IntegerHeaderFrameDecoder 206 * extends {@link ReplayingDecoder}<<strong>{@link Void}</strong>> { 207 * 208 * <strong>private boolean readLength;</strong> 209 * private int length; 210 * 211 * {@code @Override} 212 * protected void decode({@link ChannelHandlerContext} ctx, 213 * {@link ByteBuf} buf, List<Object> out) throws Exception { 214 * if (!readLength) { 215 * length = buf.readInt(); 216 * <strong>readLength = true;</strong> 217 * <strong>checkpoint();</strong> 218 * } 219 * 220 * if (readLength) { 221 * ByteBuf frame = buf.readBytes(length); 222 * <strong>readLength = false;</strong> 223 * <strong>checkpoint();</strong> 224 * out.add(frame); 225 * } 226 * } 227 * } 228 * </pre> 229 * 230 * <h3>Replacing a decoder with another decoder in a pipeline</h3> 231 * <p> 232 * If you are going to write a protocol multiplexer, you will probably want to 233 * replace a {@link ReplayingDecoder} (protocol detector) with another 234 * {@link ReplayingDecoder}, {@link ByteToMessageDecoder} or {@link MessageToMessageDecoder} 235 * (actual protocol decoder). 236 * It is not possible to achieve this simply by calling 237 * {@link ChannelPipeline#replace(ChannelHandler, String, ChannelHandler)}, but 238 * some additional steps are required: 239 * <pre> 240 * public class FirstDecoder extends {@link ReplayingDecoder}<{@link Void}> { 241 * 242 * {@code @Override} 243 * protected void decode({@link ChannelHandlerContext} ctx, 244 * {@link ByteBuf} buf, List<Object> out) { 245 * ... 246 * // Decode the first message 247 * Object firstMessage = ...; 248 * 249 * // Add the second decoder 250 * ctx.pipeline().addLast("second", new SecondDecoder()); 251 * 252 * if (buf.isReadable()) { 253 * // Hand off the remaining data to the second decoder 254 * out.add(firstMessage); 255 * out.add(buf.readBytes(<b>super.actualReadableBytes()</b>)); 256 * } else { 257 * // Nothing to hand off 258 * out.add(firstMessage); 259 * } 260 * // Remove the first decoder (me) 261 * ctx.pipeline().remove(this); 262 * } 263 * </pre> 264 * @param <S> 265 * the state type which is usually an {@link Enum}; use {@link Void} if state management is 266 * unused 267 */ 268 public abstract class ReplayingDecoder<S> extends ByteToMessageDecoder { 269 270 static final Signal REPLAY = Signal.valueOf(ReplayingDecoder.class, "REPLAY"); 271 272 private final ReplayingDecoderByteBuf replayable = new ReplayingDecoderByteBuf(); 273 private S state; 274 private int checkpoint = -1; 275 276 /** 277 * Creates a new instance with no initial state (i.e: {@code null}). 278 */ 279 protected ReplayingDecoder() { 280 this(null); 281 } 282 283 /** 284 * Creates a new instance with the specified initial state. 285 */ 286 protected ReplayingDecoder(S initialState) { 287 state = initialState; 288 } 289 290 /** 291 * Stores the internal cumulative buffer's reader position. 292 */ 293 protected void checkpoint() { 294 checkpoint = internalBuffer().readerIndex(); 295 } 296 297 /** 298 * Stores the internal cumulative buffer's reader position and updates 299 * the current decoder state. 300 */ 301 protected void checkpoint(S state) { 302 checkpoint(); 303 state(state); 304 } 305 306 /** 307 * Returns the current state of this decoder. 308 * @return the current state of this decoder 309 */ 310 protected S state() { 311 return state; 312 } 313 314 /** 315 * Sets the current state of this decoder. 316 * @return the old state of this decoder 317 */ 318 protected S state(S newState) { 319 S oldState = state; 320 state = newState; 321 return oldState; 322 } 323 324 @Override 325 final void channelInputClosed(ChannelHandlerContext ctx, List<Object> out) throws Exception { 326 try { 327 replayable.terminate(); 328 if (cumulation != null) { 329 callDecode(ctx, internalBuffer(), out); 330 } else { 331 replayable.setCumulation(Unpooled.EMPTY_BUFFER); 332 } 333 decodeLast(ctx, replayable, out); 334 } catch (Signal replay) { 335 // Ignore 336 replay.expect(REPLAY); 337 } 338 } 339 340 @Override 341 protected void callDecode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) { 342 replayable.setCumulation(in); 343 try { 344 while (in.isReadable()) { 345 int oldReaderIndex = checkpoint = in.readerIndex(); 346 int outSize = out.size(); 347 348 if (outSize > 0) { 349 fireChannelRead(ctx, out, outSize); 350 out.clear(); 351 352 // Check if this handler was removed before continuing with decoding. 353 // If it was removed, it is not safe to continue to operate on the buffer. 354 // 355 // See: 356 // - https://github.com/netty/netty/issues/4635 357 if (ctx.isRemoved()) { 358 break; 359 } 360 outSize = 0; 361 } 362 363 S oldState = state; 364 int oldInputLength = in.readableBytes(); 365 try { 366 decodeRemovalReentryProtection(ctx, replayable, out); 367 368 // Check if this handler was removed before continuing the loop. 369 // If it was removed, it is not safe to continue to operate on the buffer. 370 // 371 // See https://github.com/netty/netty/issues/1664 372 if (ctx.isRemoved()) { 373 break; 374 } 375 376 if (outSize == out.size()) { 377 if (oldInputLength == in.readableBytes() && oldState == state) { 378 throw new DecoderException( 379 StringUtil.simpleClassName(getClass()) + ".decode() must consume the inbound " + 380 "data or change its state if it did not decode anything."); 381 } else { 382 // Previous data has been discarded or caused state transition. 383 // Probably it is reading on. 384 continue; 385 } 386 } 387 } catch (Signal replay) { 388 replay.expect(REPLAY); 389 390 // Check if this handler was removed before continuing the loop. 391 // If it was removed, it is not safe to continue to operate on the buffer. 392 // 393 // See https://github.com/netty/netty/issues/1664 394 if (ctx.isRemoved()) { 395 break; 396 } 397 398 // Return to the checkpoint (or oldPosition) and retry. 399 int checkpoint = this.checkpoint; 400 if (checkpoint >= 0) { 401 in.readerIndex(checkpoint); 402 } else { 403 // Called by cleanup() - no need to maintain the readerIndex 404 // anymore because the buffer has been released already. 405 } 406 break; 407 } 408 409 if (oldReaderIndex == in.readerIndex() && oldState == state) { 410 throw new DecoderException( 411 StringUtil.simpleClassName(getClass()) + ".decode() method must consume the inbound data " + 412 "or change its state if it decoded something."); 413 } 414 if (isSingleDecode()) { 415 break; 416 } 417 } 418 } catch (DecoderException e) { 419 throw e; 420 } catch (Exception cause) { 421 throw new DecoderException(cause); 422 } 423 } 424 }