查看本类的 API文档回源码主页即时通讯网 - 即时通讯开发者社区!
1   /*
2    * Copyright 2012 The Netty Project
3    *
4    * The Netty Project licenses this file to you under the Apache License,
5    * version 2.0 (the "License"); you may not use this file except in compliance
6    * with the License. You may obtain a copy of the License at:
7    *
8    *   https://www.apache.org/licenses/LICENSE-2.0
9    *
10   * Unless required by applicable law or agreed to in writing, software
11   * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12   * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13   * License for the specific language governing permissions and limitations
14   * under the License.
15   */
16  package io.netty.handler.codec.compression;
17  
18  import io.netty.buffer.ByteBuf;
19  import io.netty.channel.ChannelHandlerContext;
20  import io.netty.handler.codec.ByteToMessageDecoder;
21  
22  import java.util.List;
23  
24  import static io.netty.handler.codec.compression.Snappy.validateChecksum;
25  
26  /**
27   * Uncompresses a {@link ByteBuf} encoded with the Snappy framing format.
28   *
29   * See <a href="https://github.com/google/snappy/blob/master/framing_format.txt">Snappy framing format</a>.
30   *
31   * Note that by default, validation of the checksum header in each chunk is
32   * DISABLED for performance improvements. If performance is less of an issue,
33   * or if you would prefer the safety that checksum validation brings, please
34   * use the {@link #SnappyFrameDecoder(boolean)} constructor with the argument
35   * set to {@code true}.
36   */
37  public class SnappyFrameDecoder extends ByteToMessageDecoder {
38  
39      private enum ChunkType {
40          STREAM_IDENTIFIER,
41          COMPRESSED_DATA,
42          UNCOMPRESSED_DATA,
43          RESERVED_UNSKIPPABLE,
44          RESERVED_SKIPPABLE
45      }
46  
47      private static final int SNAPPY_IDENTIFIER_LEN = 6;
48      // See https://github.com/google/snappy/blob/1.1.9/framing_format.txt#L95
49      private static final int MAX_UNCOMPRESSED_DATA_SIZE = 65536 + 4;
50      // See https://github.com/google/snappy/blob/1.1.9/framing_format.txt#L82
51      private static final int MAX_DECOMPRESSED_DATA_SIZE = 65536;
52      // See https://github.com/google/snappy/blob/1.1.9/framing_format.txt#L82
53      private static final int MAX_COMPRESSED_CHUNK_SIZE = 16777216 - 1;
54  
55      private final Snappy snappy = new Snappy();
56      private final boolean validateChecksums;
57  
58      private boolean started;
59      private boolean corrupted;
60      private int numBytesToSkip;
61  
62      /**
63       * Creates a new snappy-framed decoder with validation of checksums
64       * turned OFF. To turn checksum validation on, please use the alternate
65       * {@link #SnappyFrameDecoder(boolean)} constructor.
66       */
67      public SnappyFrameDecoder() {
68          this(false);
69      }
70  
71      /**
72       * Creates a new snappy-framed decoder with validation of checksums
73       * as specified.
74       *
75       * @param validateChecksums
76       *        If true, the checksum field will be validated against the actual
77       *        uncompressed data, and if the checksums do not match, a suitable
78       *        {@link DecompressionException} will be thrown
79       */
80      public SnappyFrameDecoder(boolean validateChecksums) {
81          this.validateChecksums = validateChecksums;
82      }
83  
84      @Override
85      protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {
86          if (corrupted) {
87              in.skipBytes(in.readableBytes());
88              return;
89          }
90  
91          if (numBytesToSkip != 0) {
92              // The last chunkType we detected was RESERVED_SKIPPABLE and we still have some bytes to skip.
93              int skipBytes = Math.min(numBytesToSkip, in.readableBytes());
94              in.skipBytes(skipBytes);
95              numBytesToSkip -= skipBytes;
96  
97              // Let's return and try again.
98              return;
99          }
100 
101         try {
102             int idx = in.readerIndex();
103             final int inSize = in.readableBytes();
104             if (inSize < 4) {
105                 // We need to be at least able to read the chunk type identifier (one byte),
106                 // and the length of the chunk (3 bytes) in order to proceed
107                 return;
108             }
109 
110             final int chunkTypeVal = in.getUnsignedByte(idx);
111             final ChunkType chunkType = mapChunkType((byte) chunkTypeVal);
112             final int chunkLength = in.getUnsignedMediumLE(idx + 1);
113 
114             switch (chunkType) {
115                 case STREAM_IDENTIFIER:
116                     if (chunkLength != SNAPPY_IDENTIFIER_LEN) {
117                         throw new DecompressionException("Unexpected length of stream identifier: " + chunkLength);
118                     }
119 
120                     if (inSize < 4 + SNAPPY_IDENTIFIER_LEN) {
121                         break;
122                     }
123 
124                     in.skipBytes(4);
125                     int offset = in.readerIndex();
126                     in.skipBytes(SNAPPY_IDENTIFIER_LEN);
127 
128                     checkByte(in.getByte(offset++), (byte) 's');
129                     checkByte(in.getByte(offset++), (byte) 'N');
130                     checkByte(in.getByte(offset++), (byte) 'a');
131                     checkByte(in.getByte(offset++), (byte) 'P');
132                     checkByte(in.getByte(offset++), (byte) 'p');
133                     checkByte(in.getByte(offset), (byte) 'Y');
134 
135                     started = true;
136                     break;
137                 case RESERVED_SKIPPABLE:
138                     if (!started) {
139                         throw new DecompressionException("Received RESERVED_SKIPPABLE tag before STREAM_IDENTIFIER");
140                     }
141 
142                     in.skipBytes(4);
143 
144                     int skipBytes = Math.min(chunkLength, in.readableBytes());
145                     in.skipBytes(skipBytes);
146                     if (skipBytes != chunkLength) {
147                         // We could skip all bytes, let's store the remaining so we can do so once we receive more
148                         // data.
149                         numBytesToSkip = chunkLength - skipBytes;
150                     }
151                     break;
152                 case RESERVED_UNSKIPPABLE:
153                     // The spec mandates that reserved unskippable chunks must immediately
154                     // return an error, as we must assume that we cannot decode the stream
155                     // correctly
156                     throw new DecompressionException(
157                             "Found reserved unskippable chunk type: 0x" + Integer.toHexString(chunkTypeVal));
158                 case UNCOMPRESSED_DATA:
159                     if (!started) {
160                         throw new DecompressionException("Received UNCOMPRESSED_DATA tag before STREAM_IDENTIFIER");
161                     }
162                     if (chunkLength > MAX_UNCOMPRESSED_DATA_SIZE) {
163                         throw new DecompressionException("Received UNCOMPRESSED_DATA larger than " +
164                                 MAX_UNCOMPRESSED_DATA_SIZE + " bytes");
165                     }
166 
167                     if (inSize < 4 + chunkLength) {
168                         return;
169                     }
170 
171                     in.skipBytes(4);
172                     if (validateChecksums) {
173                         int checksum = in.readIntLE();
174                         validateChecksum(checksum, in, in.readerIndex(), chunkLength - 4);
175                     } else {
176                         in.skipBytes(4);
177                     }
178                     out.add(in.readRetainedSlice(chunkLength - 4));
179                     break;
180                 case COMPRESSED_DATA:
181                     if (!started) {
182                         throw new DecompressionException("Received COMPRESSED_DATA tag before STREAM_IDENTIFIER");
183                     }
184 
185                     if (chunkLength > MAX_COMPRESSED_CHUNK_SIZE) {
186                         throw new DecompressionException("Received COMPRESSED_DATA that contains" +
187                                 " chunk that exceeds " + MAX_COMPRESSED_CHUNK_SIZE + " bytes");
188                     }
189 
190                     if (inSize < 4 + chunkLength) {
191                         return;
192                     }
193 
194                     in.skipBytes(4);
195                     int checksum = in.readIntLE();
196 
197                     int uncompressedSize = snappy.getPreamble(in);
198                     if (uncompressedSize > MAX_DECOMPRESSED_DATA_SIZE) {
199                         throw new DecompressionException("Received COMPRESSED_DATA that contains" +
200                                 " uncompressed data that exceeds " + MAX_DECOMPRESSED_DATA_SIZE + " bytes");
201                     }
202 
203                     ByteBuf uncompressed = ctx.alloc().buffer(uncompressedSize, MAX_DECOMPRESSED_DATA_SIZE);
204                     try {
205                         if (validateChecksums) {
206                             int oldWriterIndex = in.writerIndex();
207                             try {
208                                 in.writerIndex(in.readerIndex() + chunkLength - 4);
209                                 snappy.decode(in, uncompressed);
210                             } finally {
211                                 in.writerIndex(oldWriterIndex);
212                             }
213                             validateChecksum(checksum, uncompressed, 0, uncompressed.writerIndex());
214                         } else {
215                             snappy.decode(in.readSlice(chunkLength - 4), uncompressed);
216                         }
217                         out.add(uncompressed);
218                         uncompressed = null;
219                     } finally {
220                         if (uncompressed != null) {
221                             uncompressed.release();
222                         }
223                     }
224                     snappy.reset();
225                     break;
226             }
227         } catch (Exception e) {
228             corrupted = true;
229             throw e;
230         }
231     }
232 
233     private static void checkByte(byte actual, byte expect) {
234         if (actual != expect) {
235             throw new DecompressionException("Unexpected stream identifier contents. Mismatched snappy " +
236                     "protocol version?");
237         }
238     }
239 
240     /**
241      * Decodes the chunk type from the type tag byte.
242      *
243      * @param type The tag byte extracted from the stream
244      * @return The appropriate {@link ChunkType}, defaulting to {@link ChunkType#RESERVED_UNSKIPPABLE}
245      */
246     private static ChunkType mapChunkType(byte type) {
247         if (type == 0) {
248             return ChunkType.COMPRESSED_DATA;
249         } else if (type == 1) {
250             return ChunkType.UNCOMPRESSED_DATA;
251         } else if (type == (byte) 0xff) {
252             return ChunkType.STREAM_IDENTIFIER;
253         } else if ((type & 0x80) == 0x80) {
254             return ChunkType.RESERVED_SKIPPABLE;
255         } else {
256             return ChunkType.RESERVED_UNSKIPPABLE;
257         }
258     }
259 }