1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16 package io.netty.handler.codec;
17
18 import io.netty.buffer.ByteBuf;
19 import io.netty.buffer.ByteBufAllocator;
20 import io.netty.buffer.CompositeByteBuf;
21 import io.netty.buffer.Unpooled;
22 import io.netty.channel.ChannelConfig;
23 import io.netty.channel.ChannelHandlerContext;
24 import io.netty.channel.ChannelInboundHandlerAdapter;
25 import io.netty.channel.socket.ChannelInputShutdownEvent;
26 import io.netty.util.IllegalReferenceCountException;
27 import io.netty.util.internal.ObjectUtil;
28 import io.netty.util.internal.StringUtil;
29
30 import java.util.List;
31
32 import static io.netty.util.internal.ObjectUtil.checkPositive;
33 import static java.lang.Integer.MAX_VALUE;
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76 public abstract class ByteToMessageDecoder extends ChannelInboundHandlerAdapter {
77
78
79
80
81 public static final Cumulator MERGE_CUMULATOR = new Cumulator() {
82 @Override
83 public ByteBuf cumulate(ByteBufAllocator alloc, ByteBuf cumulation, ByteBuf in) {
84 if (cumulation == in) {
85
86 in.release();
87 return cumulation;
88 }
89 if (!cumulation.isReadable() && in.isContiguous()) {
90
91 cumulation.release();
92 return in;
93 }
94 try {
95 final int required = in.readableBytes();
96 if (required > cumulation.maxWritableBytes() ||
97 required > cumulation.maxFastWritableBytes() && cumulation.refCnt() > 1 ||
98 cumulation.isReadOnly()) {
99
100
101
102
103 return expandCumulation(alloc, cumulation, in);
104 }
105 cumulation.writeBytes(in, in.readerIndex(), required);
106 in.readerIndex(in.writerIndex());
107 return cumulation;
108 } finally {
109
110
111 in.release();
112 }
113 }
114 };
115
116
117
118
119
120
121 public static final Cumulator COMPOSITE_CUMULATOR = new Cumulator() {
122 @Override
123 public ByteBuf cumulate(ByteBufAllocator alloc, ByteBuf cumulation, ByteBuf in) {
124 if (cumulation == in) {
125
126 in.release();
127 return cumulation;
128 }
129 if (!cumulation.isReadable()) {
130 cumulation.release();
131 return in;
132 }
133 CompositeByteBuf composite = null;
134 try {
135 if (cumulation instanceof CompositeByteBuf && cumulation.refCnt() == 1) {
136 composite = (CompositeByteBuf) cumulation;
137
138
139 if (composite.writerIndex() != composite.capacity()) {
140 composite.capacity(composite.writerIndex());
141 }
142 } else {
143 composite = alloc.compositeBuffer(Integer.MAX_VALUE).addFlattenedComponents(true, cumulation);
144 }
145 composite.addFlattenedComponents(true, in);
146 in = null;
147 return composite;
148 } finally {
149 if (in != null) {
150
151 in.release();
152
153 if (composite != null && composite != cumulation) {
154 composite.release();
155 }
156 }
157 }
158 }
159 };
160
161 private static final byte STATE_INIT = 0;
162 private static final byte STATE_CALLING_CHILD_DECODE = 1;
163 private static final byte STATE_HANDLER_REMOVED_PENDING = 2;
164
165 ByteBuf cumulation;
166 private Cumulator cumulator = MERGE_CUMULATOR;
167 private boolean singleDecode;
168 private boolean first;
169
170
171
172
173
174 private boolean firedChannelRead;
175
176 private boolean selfFiredChannelRead;
177
178
179
180
181
182
183
184
185
186 private byte decodeState = STATE_INIT;
187 private int discardAfterReads = 16;
188 private int numReads;
189
190 protected ByteToMessageDecoder() {
191 ensureNotSharable();
192 }
193
194
195
196
197
198
199
200 public void setSingleDecode(boolean singleDecode) {
201 this.singleDecode = singleDecode;
202 }
203
204
205
206
207
208
209
210 public boolean isSingleDecode() {
211 return singleDecode;
212 }
213
214
215
216
217 public void setCumulator(Cumulator cumulator) {
218 this.cumulator = ObjectUtil.checkNotNull(cumulator, "cumulator");
219 }
220
221
222
223
224
225 public void setDiscardAfterReads(int discardAfterReads) {
226 checkPositive(discardAfterReads, "discardAfterReads");
227 this.discardAfterReads = discardAfterReads;
228 }
229
230
231
232
233
234
235
236 protected int actualReadableBytes() {
237 return internalBuffer().readableBytes();
238 }
239
240
241
242
243
244
245 protected ByteBuf internalBuffer() {
246 if (cumulation != null) {
247 return cumulation;
248 } else {
249 return Unpooled.EMPTY_BUFFER;
250 }
251 }
252
253 @Override
254 public final void handlerRemoved(ChannelHandlerContext ctx) throws Exception {
255 if (decodeState == STATE_CALLING_CHILD_DECODE) {
256 decodeState = STATE_HANDLER_REMOVED_PENDING;
257 return;
258 }
259 ByteBuf buf = cumulation;
260 if (buf != null) {
261
262 cumulation = null;
263 numReads = 0;
264 int readable = buf.readableBytes();
265 if (readable > 0) {
266 ctx.fireChannelRead(buf);
267 ctx.fireChannelReadComplete();
268 } else {
269 buf.release();
270 }
271 }
272 handlerRemoved0(ctx);
273 }
274
275
276
277
278
279 protected void handlerRemoved0(ChannelHandlerContext ctx) throws Exception { }
280
281 @Override
282 public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
283 if (msg instanceof ByteBuf) {
284 selfFiredChannelRead = true;
285 CodecOutputList out = CodecOutputList.newInstance();
286 try {
287 first = cumulation == null;
288 cumulation = cumulator.cumulate(ctx.alloc(),
289 first ? Unpooled.EMPTY_BUFFER : cumulation, (ByteBuf) msg);
290 callDecode(ctx, cumulation, out);
291 } catch (DecoderException e) {
292 throw e;
293 } catch (Exception e) {
294 throw new DecoderException(e);
295 } finally {
296 try {
297 if (cumulation != null && !cumulation.isReadable()) {
298 numReads = 0;
299 try {
300 cumulation.release();
301 } catch (IllegalReferenceCountException e) {
302
303 throw new IllegalReferenceCountException(
304 getClass().getSimpleName() + "#decode() might have released its input buffer, " +
305 "or passed it down the pipeline without a retain() call, " +
306 "which is not allowed.", e);
307 }
308 cumulation = null;
309 } else if (++numReads >= discardAfterReads) {
310
311
312 numReads = 0;
313 discardSomeReadBytes();
314 }
315
316 int size = out.size();
317 firedChannelRead |= out.insertSinceRecycled();
318 fireChannelRead(ctx, out, size);
319 } finally {
320 out.recycle();
321 }
322 }
323 } else {
324 ctx.fireChannelRead(msg);
325 }
326 }
327
328
329
330
331 static void fireChannelRead(ChannelHandlerContext ctx, List<Object> msgs, int numElements) {
332 if (msgs instanceof CodecOutputList) {
333 fireChannelRead(ctx, (CodecOutputList) msgs, numElements);
334 } else {
335 for (int i = 0; i < numElements; i++) {
336 ctx.fireChannelRead(msgs.get(i));
337 }
338 }
339 }
340
341
342
343
344 static void fireChannelRead(ChannelHandlerContext ctx, CodecOutputList msgs, int numElements) {
345 for (int i = 0; i < numElements; i ++) {
346 ctx.fireChannelRead(msgs.getUnsafe(i));
347 }
348 }
349
350 @Override
351 public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
352 numReads = 0;
353 discardSomeReadBytes();
354 if (selfFiredChannelRead && !firedChannelRead && !ctx.channel().config().isAutoRead()) {
355 ctx.read();
356 }
357 firedChannelRead = false;
358 selfFiredChannelRead = false;
359 ctx.fireChannelReadComplete();
360 }
361
362 protected final void discardSomeReadBytes() {
363 if (cumulation != null && !first && cumulation.refCnt() == 1) {
364
365
366
367
368
369
370
371 cumulation.discardSomeReadBytes();
372 }
373 }
374
375 @Override
376 public void channelInactive(ChannelHandlerContext ctx) throws Exception {
377 channelInputClosed(ctx, true);
378 }
379
380 @Override
381 public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
382 if (evt instanceof ChannelInputShutdownEvent) {
383
384
385
386 channelInputClosed(ctx, false);
387 }
388 super.userEventTriggered(ctx, evt);
389 }
390
391 private void channelInputClosed(ChannelHandlerContext ctx, boolean callChannelInactive) {
392 CodecOutputList out = CodecOutputList.newInstance();
393 try {
394 channelInputClosed(ctx, out);
395 } catch (DecoderException e) {
396 throw e;
397 } catch (Exception e) {
398 throw new DecoderException(e);
399 } finally {
400 try {
401 if (cumulation != null) {
402 cumulation.release();
403 cumulation = null;
404 }
405 int size = out.size();
406 fireChannelRead(ctx, out, size);
407 if (size > 0) {
408
409 ctx.fireChannelReadComplete();
410 }
411 if (callChannelInactive) {
412 ctx.fireChannelInactive();
413 }
414 } finally {
415
416 out.recycle();
417 }
418 }
419 }
420
421
422
423
424
425 void channelInputClosed(ChannelHandlerContext ctx, List<Object> out) throws Exception {
426 if (cumulation != null) {
427 callDecode(ctx, cumulation, out);
428
429
430 if (!ctx.isRemoved()) {
431
432
433 ByteBuf buffer = cumulation == null ? Unpooled.EMPTY_BUFFER : cumulation;
434 decodeLast(ctx, buffer, out);
435 }
436 } else {
437 decodeLast(ctx, Unpooled.EMPTY_BUFFER, out);
438 }
439 }
440
441
442
443
444
445
446
447
448
449 protected void callDecode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) {
450 try {
451 while (in.isReadable()) {
452 final int outSize = out.size();
453
454 if (outSize > 0) {
455 fireChannelRead(ctx, out, outSize);
456 out.clear();
457
458
459
460
461
462
463 if (ctx.isRemoved()) {
464 break;
465 }
466 }
467
468 int oldInputLength = in.readableBytes();
469 decodeRemovalReentryProtection(ctx, in, out);
470
471
472
473
474
475 if (ctx.isRemoved()) {
476 break;
477 }
478
479 if (out.isEmpty()) {
480 if (oldInputLength == in.readableBytes()) {
481 break;
482 } else {
483 continue;
484 }
485 }
486
487 if (oldInputLength == in.readableBytes()) {
488 throw new DecoderException(
489 StringUtil.simpleClassName(getClass()) +
490 ".decode() did not read anything but decoded a message.");
491 }
492
493 if (isSingleDecode()) {
494 break;
495 }
496 }
497 } catch (DecoderException e) {
498 throw e;
499 } catch (Exception cause) {
500 throw new DecoderException(cause);
501 }
502 }
503
504
505
506
507
508
509
510
511
512
513
514 protected abstract void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception;
515
516
517
518
519
520
521
522
523
524
525
526 final void decodeRemovalReentryProtection(ChannelHandlerContext ctx, ByteBuf in, List<Object> out)
527 throws Exception {
528 decodeState = STATE_CALLING_CHILD_DECODE;
529 try {
530 decode(ctx, in, out);
531 } finally {
532 boolean removePending = decodeState == STATE_HANDLER_REMOVED_PENDING;
533 decodeState = STATE_INIT;
534 if (removePending) {
535 fireChannelRead(ctx, out, out.size());
536 out.clear();
537 handlerRemoved(ctx);
538 }
539 }
540 }
541
542
543
544
545
546
547
548
549 protected void decodeLast(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {
550 if (in.isReadable()) {
551
552
553 decodeRemovalReentryProtection(ctx, in, out);
554 }
555 }
556
557 static ByteBuf expandCumulation(ByteBufAllocator alloc, ByteBuf oldCumulation, ByteBuf in) {
558 int oldBytes = oldCumulation.readableBytes();
559 int newBytes = in.readableBytes();
560 int totalBytes = oldBytes + newBytes;
561 ByteBuf newCumulation = alloc.buffer(alloc.calculateNewCapacity(totalBytes, MAX_VALUE));
562 ByteBuf toRelease = newCumulation;
563 try {
564
565 newCumulation.setBytes(0, oldCumulation, oldCumulation.readerIndex(), oldBytes)
566 .setBytes(oldBytes, in, in.readerIndex(), newBytes)
567 .writerIndex(totalBytes);
568 in.readerIndex(in.writerIndex());
569 toRelease = oldCumulation;
570 return newCumulation;
571 } finally {
572 toRelease.release();
573 }
574 }
575
576
577
578
579 public interface Cumulator {
580
581
582
583
584
585 ByteBuf cumulate(ByteBufAllocator alloc, ByteBuf cumulation, ByteBuf in);
586 }
587 }