查看本类的 API文档回源码主页即时通讯网 - 即时通讯开发者社区!
1   /*
2    * Copyright 2014 The Netty Project
3    *
4    * The Netty Project licenses this file to you under the Apache License,
5    * version 2.0 (the "License"); you may not use this file except in compliance
6    * with the License. You may obtain a copy of the License at:
7    *
8    *   https://www.apache.org/licenses/LICENSE-2.0
9    *
10   * Unless required by applicable law or agreed to in writing, software
11   * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12   * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13   * License for the specific language governing permissions and limitations
14   * under the License.
15   */
16  package io.netty.handler.codec.compression;
17  
18  import io.netty.buffer.ByteBuf;
19  import io.netty.channel.ChannelHandlerContext;
20  import io.netty.handler.codec.ByteToMessageDecoder;
21  
22  import java.util.List;
23  import java.util.zip.Adler32;
24  import java.util.zip.Checksum;
25  
26  import static io.netty.handler.codec.compression.FastLz.BLOCK_TYPE_COMPRESSED;
27  import static io.netty.handler.codec.compression.FastLz.BLOCK_WITH_CHECKSUM;
28  import static io.netty.handler.codec.compression.FastLz.MAGIC_NUMBER;
29  import static io.netty.handler.codec.compression.FastLz.decompress;
30  
31  /**
32   * Uncompresses a {@link ByteBuf} encoded by {@link FastLzFrameEncoder} using the FastLZ algorithm.
33   *
34   * See <a href="https://github.com/netty/netty/issues/2750">FastLZ format</a>.
35   */
36  public class FastLzFrameDecoder extends ByteToMessageDecoder {
37      /**
38       * Current state of decompression.
39       */
40      private enum State {
41          INIT_BLOCK,
42          INIT_BLOCK_PARAMS,
43          DECOMPRESS_DATA,
44          CORRUPTED
45      }
46  
47      private State currentState = State.INIT_BLOCK;
48  
49      /**
50       * Underlying checksum calculator in use.
51       */
52      private final ByteBufChecksum checksum;
53  
54      /**
55       * Length of current received chunk of data.
56       */
57      private int chunkLength;
58  
59      /**
60       * Original of current received chunk of data.
61       * It is equal to {@link #chunkLength} for non compressed chunks.
62       */
63      private int originalLength;
64  
65      /**
66       * Indicates is this chunk compressed or not.
67       */
68      private boolean isCompressed;
69  
70      /**
71       * Indicates is this chunk has checksum or not.
72       */
73      private boolean hasChecksum;
74  
75      /**
76       * Checksum value of current received chunk of data which has checksum.
77       */
78      private int currentChecksum;
79  
80      /**
81       * Creates the fastest FastLZ decoder without checksum calculation.
82       */
83      public FastLzFrameDecoder() {
84          this(false);
85      }
86  
87      /**
88       * Creates a FastLZ decoder with calculation of checksums as specified.
89       *
90       * @param validateChecksums
91       *        If true, the checksum field will be validated against the actual
92       *        uncompressed data, and if the checksums do not match, a suitable
93       *        {@link DecompressionException} will be thrown.
94       *        Note, that in this case decoder will use {@link java.util.zip.Adler32}
95       *        as a default checksum calculator.
96       */
97      public FastLzFrameDecoder(boolean validateChecksums) {
98          this(validateChecksums ? new Adler32() : null);
99      }
100 
101     /**
102      * Creates a FastLZ decoder with specified checksum calculator.
103      *
104      * @param checksum
105      *        the {@link Checksum} instance to use to check data for integrity.
106      *        You may set {@code null} if you do not want to validate checksum of each block.
107      */
108     public FastLzFrameDecoder(Checksum checksum) {
109         this.checksum = checksum == null ? null : ByteBufChecksum.wrapChecksum(checksum);
110     }
111 
112     @Override
113     protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {
114         try {
115             switch (currentState) {
116             case INIT_BLOCK:
117                 if (in.readableBytes() < 4) {
118                     break;
119                 }
120 
121                 final int magic = in.readUnsignedMedium();
122                 if (magic != MAGIC_NUMBER) {
123                     throw new DecompressionException("unexpected block identifier");
124                 }
125 
126                 final byte options = in.readByte();
127                 isCompressed = (options & 0x01) == BLOCK_TYPE_COMPRESSED;
128                 hasChecksum = (options & 0x10) == BLOCK_WITH_CHECKSUM;
129 
130                 currentState = State.INIT_BLOCK_PARAMS;
131                 // fall through
132             case INIT_BLOCK_PARAMS:
133                 if (in.readableBytes() < 2 + (isCompressed ? 2 : 0) + (hasChecksum ? 4 : 0)) {
134                     break;
135                 }
136                 currentChecksum = hasChecksum ? in.readInt() : 0;
137                 chunkLength = in.readUnsignedShort();
138                 originalLength = isCompressed ? in.readUnsignedShort() : chunkLength;
139 
140                 currentState = State.DECOMPRESS_DATA;
141                 // fall through
142             case DECOMPRESS_DATA:
143                 final int chunkLength = this.chunkLength;
144                 if (in.readableBytes() < chunkLength) {
145                     break;
146                 }
147 
148                 final int idx = in.readerIndex();
149                 final int originalLength = this.originalLength;
150 
151                 ByteBuf output = null;
152 
153                 try {
154                     if (isCompressed) {
155 
156                         output = ctx.alloc().buffer(originalLength);
157                         int outputOffset = output.writerIndex();
158                         final int decompressedBytes = decompress(in, idx, chunkLength,
159                                 output, outputOffset, originalLength);
160                         if (originalLength != decompressedBytes) {
161                             throw new DecompressionException(String.format(
162                                     "stream corrupted: originalLength(%d) and actual length(%d) mismatch",
163                                     originalLength, decompressedBytes));
164                         }
165                         output.writerIndex(output.writerIndex() + decompressedBytes);
166                     } else {
167                         output = in.retainedSlice(idx, chunkLength);
168                     }
169 
170                     final ByteBufChecksum checksum = this.checksum;
171                     if (hasChecksum && checksum != null) {
172                         checksum.reset();
173                         checksum.update(output, output.readerIndex(), output.readableBytes());
174                         final int checksumResult = (int) checksum.getValue();
175                         if (checksumResult != currentChecksum) {
176                             throw new DecompressionException(String.format(
177                                     "stream corrupted: mismatching checksum: %d (expected: %d)",
178                                     checksumResult, currentChecksum));
179                         }
180                     }
181 
182                     if (output.readableBytes() > 0) {
183                         out.add(output);
184                     } else {
185                         output.release();
186                     }
187                     output = null;
188                     in.skipBytes(chunkLength);
189 
190                     currentState = State.INIT_BLOCK;
191                 } finally {
192                     if (output != null) {
193                         output.release();
194                     }
195                 }
196                 break;
197             case CORRUPTED:
198                 in.skipBytes(in.readableBytes());
199                 break;
200             default:
201                 throw new IllegalStateException();
202             }
203         } catch (Exception e) {
204             currentState = State.CORRUPTED;
205             throw e;
206         }
207     }
208 }