1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17 package io.netty.handler.codec.sctp;
18
19 import io.netty.buffer.ByteBuf;
20 import io.netty.buffer.Unpooled;
21 import io.netty.channel.ChannelHandlerContext;
22 import io.netty.channel.ChannelInboundHandler;
23 import io.netty.channel.sctp.SctpMessage;
24 import io.netty.handler.codec.MessageToMessageDecoder;
25 import io.netty.util.collection.IntObjectHashMap;
26 import io.netty.util.collection.IntObjectMap;
27
28 import java.util.List;
29
30
31
32
33
34
35 public class SctpMessageCompletionHandler extends MessageToMessageDecoder<SctpMessage> {
36 private final IntObjectMap<ByteBuf> fragments = new IntObjectHashMap<ByteBuf>();
37
38 @Override
39 protected void decode(ChannelHandlerContext ctx, SctpMessage msg, List<Object> out) throws Exception {
40 final ByteBuf byteBuf = msg.content();
41 final int protocolIdentifier = msg.protocolIdentifier();
42 final int streamIdentifier = msg.streamIdentifier();
43 final boolean isComplete = msg.isComplete();
44 final boolean isUnordered = msg.isUnordered();
45
46 ByteBuf frag = fragments.remove(streamIdentifier);
47 if (frag == null) {
48 frag = Unpooled.EMPTY_BUFFER;
49 }
50
51 if (isComplete && !frag.isReadable()) {
52
53 out.add(msg);
54 } else if (!isComplete && frag.isReadable()) {
55
56 fragments.put(streamIdentifier, Unpooled.wrappedBuffer(frag, byteBuf));
57 } else if (isComplete && frag.isReadable()) {
58
59 SctpMessage assembledMsg = new SctpMessage(
60 protocolIdentifier,
61 streamIdentifier,
62 isUnordered,
63 Unpooled.wrappedBuffer(frag, byteBuf));
64 out.add(assembledMsg);
65 } else {
66
67 fragments.put(streamIdentifier, byteBuf);
68 }
69 byteBuf.retain();
70 }
71
72 @Override
73 public void handlerRemoved(ChannelHandlerContext ctx) throws Exception {
74 for (ByteBuf buffer: fragments.values()) {
75 buffer.release();
76 }
77 fragments.clear();
78 super.handlerRemoved(ctx);
79 }
80 }