查看本类的 API文档回源码主页即时通讯网 - 即时通讯开发者社区!
1   /*
2    * Copyright 2012 The Netty Project
3    *
4    * The Netty Project licenses this file to you under the Apache License,
5    * version 2.0 (the "License"); you may not use this file except in compliance
6    * with the License. You may obtain a copy of the License at:
7    *
8    * https://www.apache.org/licenses/LICENSE-2.0
9    *
10   * Unless required by applicable law or agreed to in writing, software
11   * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12   * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13   * License for the specific language governing permissions and limitations
14   * under the License.
15   */
16  
17  package io.netty.handler.codec.sctp;
18  
19  import io.netty.buffer.ByteBuf;
20  import io.netty.buffer.Unpooled;
21  import io.netty.channel.ChannelHandlerContext;
22  import io.netty.channel.ChannelInboundHandler;
23  import io.netty.channel.sctp.SctpMessage;
24  import io.netty.handler.codec.MessageToMessageDecoder;
25  import io.netty.util.collection.IntObjectHashMap;
26  import io.netty.util.collection.IntObjectMap;
27  
28  import java.util.List;
29  
30  /**
31   * {@link MessageToMessageDecoder} which will take care of handle fragmented {@link SctpMessage}s, so
32   * only <strong>complete</strong> {@link SctpMessage}s will be forwarded to the next
33   * {@link ChannelInboundHandler}.
34   */
35  public class SctpMessageCompletionHandler extends MessageToMessageDecoder<SctpMessage> {
36      private final IntObjectMap<ByteBuf> fragments = new IntObjectHashMap<ByteBuf>();
37  
38      @Override
39      protected void decode(ChannelHandlerContext ctx, SctpMessage msg, List<Object> out) throws Exception {
40          final ByteBuf byteBuf = msg.content();
41          final int protocolIdentifier = msg.protocolIdentifier();
42          final int streamIdentifier = msg.streamIdentifier();
43          final boolean isComplete = msg.isComplete();
44          final boolean isUnordered = msg.isUnordered();
45  
46          ByteBuf frag = fragments.remove(streamIdentifier);
47          if (frag == null) {
48              frag = Unpooled.EMPTY_BUFFER;
49          }
50  
51          if (isComplete && !frag.isReadable()) {
52              //data chunk is not fragmented
53              out.add(msg);
54          } else if (!isComplete && frag.isReadable()) {
55              //more message to complete
56              fragments.put(streamIdentifier, Unpooled.wrappedBuffer(frag, byteBuf));
57          } else if (isComplete && frag.isReadable()) {
58              //last message to complete
59              SctpMessage assembledMsg = new SctpMessage(
60                      protocolIdentifier,
61                      streamIdentifier,
62                      isUnordered,
63                      Unpooled.wrappedBuffer(frag, byteBuf));
64              out.add(assembledMsg);
65          } else {
66              //first incomplete message
67              fragments.put(streamIdentifier, byteBuf);
68          }
69          byteBuf.retain();
70      }
71  
72      @Override
73      public void handlerRemoved(ChannelHandlerContext ctx) throws Exception {
74          for (ByteBuf buffer: fragments.values()) {
75              buffer.release();
76          }
77          fragments.clear();
78          super.handlerRemoved(ctx);
79      }
80  }