1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16 package io.netty.handler.codec;
17
18 import io.netty.buffer.ByteBuf;
19 import io.netty.buffer.ByteBufAllocator;
20 import io.netty.buffer.CompositeByteBuf;
21 import io.netty.buffer.Unpooled;
22 import io.netty.channel.ChannelConfig;
23 import io.netty.channel.ChannelHandlerContext;
24 import io.netty.channel.ChannelInboundHandlerAdapter;
25 import io.netty.channel.socket.ChannelInputShutdownEvent;
26 import io.netty.util.internal.ObjectUtil;
27 import io.netty.util.internal.StringUtil;
28
29 import java.util.List;
30
31 import static io.netty.util.internal.ObjectUtil.checkPositive;
32 import static java.lang.Integer.MAX_VALUE;
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75 public abstract class ByteToMessageDecoder extends ChannelInboundHandlerAdapter {
76
77
78
79
80 public static final Cumulator MERGE_CUMULATOR = new Cumulator() {
81 @Override
82 public ByteBuf cumulate(ByteBufAllocator alloc, ByteBuf cumulation, ByteBuf in) {
83 if (!cumulation.isReadable() && in.isContiguous()) {
84
85 cumulation.release();
86 return in;
87 }
88 try {
89 final int required = in.readableBytes();
90 if (required > cumulation.maxWritableBytes() ||
91 (required > cumulation.maxFastWritableBytes() && cumulation.refCnt() > 1) ||
92 cumulation.isReadOnly()) {
93
94
95
96
97 return expandCumulation(alloc, cumulation, in);
98 }
99 cumulation.writeBytes(in, in.readerIndex(), required);
100 in.readerIndex(in.writerIndex());
101 return cumulation;
102 } finally {
103
104
105 in.release();
106 }
107 }
108 };
109
110
111
112
113
114
115 public static final Cumulator COMPOSITE_CUMULATOR = new Cumulator() {
116 @Override
117 public ByteBuf cumulate(ByteBufAllocator alloc, ByteBuf cumulation, ByteBuf in) {
118 if (!cumulation.isReadable()) {
119 cumulation.release();
120 return in;
121 }
122 CompositeByteBuf composite = null;
123 try {
124 if (cumulation instanceof CompositeByteBuf && cumulation.refCnt() == 1) {
125 composite = (CompositeByteBuf) cumulation;
126
127
128 if (composite.writerIndex() != composite.capacity()) {
129 composite.capacity(composite.writerIndex());
130 }
131 } else {
132 composite = alloc.compositeBuffer(Integer.MAX_VALUE).addFlattenedComponents(true, cumulation);
133 }
134 composite.addFlattenedComponents(true, in);
135 in = null;
136 return composite;
137 } finally {
138 if (in != null) {
139
140 in.release();
141
142 if (composite != null && composite != cumulation) {
143 composite.release();
144 }
145 }
146 }
147 }
148 };
149
150 private static final byte STATE_INIT = 0;
151 private static final byte STATE_CALLING_CHILD_DECODE = 1;
152 private static final byte STATE_HANDLER_REMOVED_PENDING = 2;
153
154 ByteBuf cumulation;
155 private Cumulator cumulator = MERGE_CUMULATOR;
156 private boolean singleDecode;
157 private boolean first;
158
159
160
161
162
163 private boolean firedChannelRead;
164
165
166
167
168
169
170
171
172
173 private byte decodeState = STATE_INIT;
174 private int discardAfterReads = 16;
175 private int numReads;
176
/**
 * Creates a new decoder instance.
 * <p>
 * Calls {@code ensureNotSharable()} first. NOTE(review): presumably this rejects subclasses marked as
 * sharable, since this decoder keeps its buffered bytes in instance fields - confirm against the
 * superclass.
 */
protected ByteToMessageDecoder() {
    ensureNotSharable();
}
180
181
182
183
184
185
186
/**
 * Enables or disables single-decode mode.
 * <p>
 * When enabled, {@link #callDecode(ChannelHandlerContext, ByteBuf, List)} stops after the first
 * {@code decode(...)} call that produced output instead of looping while the input stays readable.
 * The default is {@code false}.
 *
 * @param singleDecode {@code true} to stop after one producing decode call per invocation
 */
public void setSingleDecode(boolean singleDecode) {
    this.singleDecode = singleDecode;
}
190
191
192
193
194
195
196
/**
 * Reports whether single-decode mode is enabled.
 *
 * @return {@code true} if {@link #callDecode(ChannelHandlerContext, ByteBuf, List)} stops after the first
 *         {@code decode(...)} call that produced output; {@code false} (the default) otherwise
 */
public boolean isSingleDecode() {
    return singleDecode;
}
200
201
202
203
204 public void setCumulator(Cumulator cumulator) {
205 this.cumulator = ObjectUtil.checkNotNull(cumulator, "cumulator");
206 }
207
208
209
210
211
212 public void setDiscardAfterReads(int discardAfterReads) {
213 checkPositive(discardAfterReads, "discardAfterReads");
214 this.discardAfterReads = discardAfterReads;
215 }
216
217
218
219
220
221
222
223 protected int actualReadableBytes() {
224 return internalBuffer().readableBytes();
225 }
226
227
228
229
230
231
232 protected ByteBuf internalBuffer() {
233 if (cumulation != null) {
234 return cumulation;
235 } else {
236 return Unpooled.EMPTY_BUFFER;
237 }
238 }
239
240 @Override
241 public final void handlerRemoved(ChannelHandlerContext ctx) throws Exception {
242 if (decodeState == STATE_CALLING_CHILD_DECODE) {
243 decodeState = STATE_HANDLER_REMOVED_PENDING;
244 return;
245 }
246 ByteBuf buf = cumulation;
247 if (buf != null) {
248
249 cumulation = null;
250 numReads = 0;
251 int readable = buf.readableBytes();
252 if (readable > 0) {
253 ctx.fireChannelRead(buf);
254 ctx.fireChannelReadComplete();
255 } else {
256 buf.release();
257 }
258 }
259 handlerRemoved0(ctx);
260 }
261
262
263
264
265
/**
 * Hook called at the end of {@link #handlerRemoved(ChannelHandlerContext)}, after the buffered bytes
 * have been forwarded or released. The default implementation does nothing; subclasses may override it.
 */
protected void handlerRemoved0(ChannelHandlerContext ctx) throws Exception { }
267
268 @Override
269 public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
270 if (msg instanceof ByteBuf) {
271 CodecOutputList out = CodecOutputList.newInstance();
272 try {
273 first = cumulation == null;
274 cumulation = cumulator.cumulate(ctx.alloc(),
275 first ? Unpooled.EMPTY_BUFFER : cumulation, (ByteBuf) msg);
276 callDecode(ctx, cumulation, out);
277 } catch (DecoderException e) {
278 throw e;
279 } catch (Exception e) {
280 throw new DecoderException(e);
281 } finally {
282 try {
283 if (cumulation != null && !cumulation.isReadable()) {
284 numReads = 0;
285 cumulation.release();
286 cumulation = null;
287 } else if (++numReads >= discardAfterReads) {
288
289
290 numReads = 0;
291 discardSomeReadBytes();
292 }
293
294 int size = out.size();
295 firedChannelRead |= out.insertSinceRecycled();
296 fireChannelRead(ctx, out, size);
297 } finally {
298 out.recycle();
299 }
300 }
301 } else {
302 ctx.fireChannelRead(msg);
303 }
304 }
305
306
307
308
309 static void fireChannelRead(ChannelHandlerContext ctx, List<Object> msgs, int numElements) {
310 if (msgs instanceof CodecOutputList) {
311 fireChannelRead(ctx, (CodecOutputList) msgs, numElements);
312 } else {
313 for (int i = 0; i < numElements; i++) {
314 ctx.fireChannelRead(msgs.get(i));
315 }
316 }
317 }
318
319
320
321
322 static void fireChannelRead(ChannelHandlerContext ctx, CodecOutputList msgs, int numElements) {
323 for (int i = 0; i < numElements; i ++) {
324 ctx.fireChannelRead(msgs.getUnsafe(i));
325 }
326 }
327
328 @Override
329 public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
330 numReads = 0;
331 discardSomeReadBytes();
332 if (!firedChannelRead && !ctx.channel().config().isAutoRead()) {
333 ctx.read();
334 }
335 firedChannelRead = false;
336 ctx.fireChannelReadComplete();
337 }
338
339 protected final void discardSomeReadBytes() {
340 if (cumulation != null && !first && cumulation.refCnt() == 1) {
341
342
343
344
345
346
347
348 cumulation.discardSomeReadBytes();
349 }
350 }
351
/**
 * Runs the input-closed handling (final decode pass and release of the cumulation) and asks it to
 * forward the channel-inactive event afterwards.
 */
@Override
public void channelInactive(ChannelHandlerContext ctx) throws Exception {
    channelInputClosed(ctx, true);
}
356
/**
 * Runs the input-closed handling when a {@link ChannelInputShutdownEvent} arrives, then passes the event
 * on via the superclass implementation. Other events are only passed on.
 */
@Override
public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
    if (evt instanceof ChannelInputShutdownEvent) {
        // The input side was shut down: give the decoder a last chance to consume the buffered bytes.
        // 'false' because the channel itself is not reported as inactive here.
        channelInputClosed(ctx, false);
    }
    super.userEventTriggered(ctx, evt);
}
367
368 private void channelInputClosed(ChannelHandlerContext ctx, boolean callChannelInactive) {
369 CodecOutputList out = CodecOutputList.newInstance();
370 try {
371 channelInputClosed(ctx, out);
372 } catch (DecoderException e) {
373 throw e;
374 } catch (Exception e) {
375 throw new DecoderException(e);
376 } finally {
377 try {
378 if (cumulation != null) {
379 cumulation.release();
380 cumulation = null;
381 }
382 int size = out.size();
383 fireChannelRead(ctx, out, size);
384 if (size > 0) {
385
386 ctx.fireChannelReadComplete();
387 }
388 if (callChannelInactive) {
389 ctx.fireChannelInactive();
390 }
391 } finally {
392
393 out.recycle();
394 }
395 }
396 }
397
398
399
400
401
402 void channelInputClosed(ChannelHandlerContext ctx, List<Object> out) throws Exception {
403 if (cumulation != null) {
404 callDecode(ctx, cumulation, out);
405
406
407 if (!ctx.isRemoved()) {
408
409
410 ByteBuf buffer = cumulation == null ? Unpooled.EMPTY_BUFFER : cumulation;
411 decodeLast(ctx, buffer, out);
412 }
413 } else {
414 decodeLast(ctx, Unpooled.EMPTY_BUFFER, out);
415 }
416 }
417
418
419
420
421
422
423
424
425
426 protected void callDecode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) {
427 try {
428 while (in.isReadable()) {
429 final int outSize = out.size();
430
431 if (outSize > 0) {
432 fireChannelRead(ctx, out, outSize);
433 out.clear();
434
435
436
437
438
439
440 if (ctx.isRemoved()) {
441 break;
442 }
443 }
444
445 int oldInputLength = in.readableBytes();
446 decodeRemovalReentryProtection(ctx, in, out);
447
448
449
450
451
452 if (ctx.isRemoved()) {
453 break;
454 }
455
456 if (out.isEmpty()) {
457 if (oldInputLength == in.readableBytes()) {
458 break;
459 } else {
460 continue;
461 }
462 }
463
464 if (oldInputLength == in.readableBytes()) {
465 throw new DecoderException(
466 StringUtil.simpleClassName(getClass()) +
467 ".decode() did not read anything but decoded a message.");
468 }
469
470 if (isSingleDecode()) {
471 break;
472 }
473 }
474 } catch (DecoderException e) {
475 throw e;
476 } catch (Exception cause) {
477 throw new DecoderException(cause);
478 }
479 }
480
481
482
483
484
485
486
487
488
489
490
/**
 * Decodes from {@code in} into messages added to {@code out}. Called repeatedly by
 * {@link #callDecode(ChannelHandlerContext, ByteBuf, List)} while {@code in} is readable and each call
 * either consumes bytes or produces output.
 * <p>
 * An implementation that adds to {@code out} must also read from {@code in}; otherwise
 * {@code callDecode(...)} throws a {@link DecoderException}.
 *
 * @param ctx the handler context this decoder belongs to
 * @param in  the buffer to read data from
 * @param out the list to which decoded messages are added
 * @throws Exception if decoding fails
 */
protected abstract void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception;
492
493
494
495
496
497
498
499
500
501
502
/**
 * Calls {@link #decode(ChannelHandlerContext, ByteBuf, List)} while recording in {@code decodeState}
 * that a decode call is in progress, so that a {@code handlerRemoved(...)} call made during decoding is
 * deferred until {@code decode(...)} has returned.
 *
 * @param ctx the handler context this decoder belongs to
 * @param in  the buffer to read data from
 * @param out the list to which decoded messages are added
 * @throws Exception if decoding fails
 */
final void decodeRemovalReentryProtection(ChannelHandlerContext ctx, ByteBuf in, List<Object> out)
        throws Exception {
    decodeState = STATE_CALLING_CHILD_DECODE;
    try {
        decode(ctx, in, out);
    } finally {
        // handlerRemoved(...) switches the state to STATE_HANDLER_REMOVED_PENDING if it ran during decode.
        boolean removePending = decodeState == STATE_HANDLER_REMOVED_PENDING;
        // Reset before calling handlerRemoved(...) below so that call is not deferred again.
        decodeState = STATE_INIT;
        if (removePending) {
            // Forward what was decoded so far, then carry out the deferred removal.
            fireChannelRead(ctx, out, out.size());
            out.clear();
            handlerRemoved(ctx);
        }
    }
}
518
519
520
521
522
523
524
525
526 protected void decodeLast(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {
527 if (in.isReadable()) {
528
529
530 decodeRemovalReentryProtection(ctx, in, out);
531 }
532 }
533
534 static ByteBuf expandCumulation(ByteBufAllocator alloc, ByteBuf oldCumulation, ByteBuf in) {
535 int oldBytes = oldCumulation.readableBytes();
536 int newBytes = in.readableBytes();
537 int totalBytes = oldBytes + newBytes;
538 ByteBuf newCumulation = alloc.buffer(alloc.calculateNewCapacity(totalBytes, MAX_VALUE));
539 ByteBuf toRelease = newCumulation;
540 try {
541
542 newCumulation.setBytes(0, oldCumulation, oldCumulation.readerIndex(), oldBytes)
543 .setBytes(oldBytes, in, in.readerIndex(), newBytes)
544 .writerIndex(totalBytes);
545 in.readerIndex(in.writerIndex());
546 toRelease = oldCumulation;
547 return newCumulation;
548 } finally {
549 toRelease.release();
550 }
551 }
552
553
554
555
/**
 * Strategy for combining the bytes buffered so far with newly received bytes.
 */
public interface Cumulator {
    /**
     * Combines {@code cumulation} and {@code in} and returns the buffer holding the cumulated bytes.
     * <p>
     * As the implementations in this class show, the implementation takes over both buffers: whichever of
     * them is not returned (or not made part of the returned buffer) is released by the implementation.
     * The decoder passes {@link Unpooled#EMPTY_BUFFER} as {@code cumulation} when nothing is buffered yet.
     *
     * @param alloc      allocator to use if a new buffer is needed
     * @param cumulation the bytes buffered so far
     * @param in         the newly received bytes
     * @return the buffer to use as the new cumulation
     */
    ByteBuf cumulate(ByteBufAllocator alloc, ByteBuf cumulation, ByteBuf in);
}
564 }