1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16 package io.netty.handler.codec.compression;
17
18 import io.netty.buffer.ByteBuf;
19 import io.netty.channel.ChannelHandlerContext;
20 import io.netty.handler.codec.ByteToMessageDecoder;
21 import io.netty.util.internal.ObjectUtil;
22 import net.jpountz.lz4.LZ4Exception;
23 import net.jpountz.lz4.LZ4Factory;
24 import net.jpountz.lz4.LZ4FastDecompressor;
25
26 import java.util.List;
27 import java.util.zip.Checksum;
28
29 import static io.netty.handler.codec.compression.Lz4Constants.BLOCK_TYPE_COMPRESSED;
30 import static io.netty.handler.codec.compression.Lz4Constants.BLOCK_TYPE_NON_COMPRESSED;
31 import static io.netty.handler.codec.compression.Lz4Constants.COMPRESSION_LEVEL_BASE;
32 import static io.netty.handler.codec.compression.Lz4Constants.DEFAULT_SEED;
33 import static io.netty.handler.codec.compression.Lz4Constants.HEADER_LENGTH;
34 import static io.netty.handler.codec.compression.Lz4Constants.MAGIC_NUMBER;
35 import static io.netty.handler.codec.compression.Lz4Constants.MAX_BLOCK_SIZE;
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53 public class Lz4FrameDecoder extends ByteToMessageDecoder {
54
55
56
57 private enum State {
58 INIT_BLOCK,
59 DECOMPRESS_DATA,
60 FINISHED,
61 CORRUPTED
62 }
63
64 private State currentState = State.INIT_BLOCK;
65
66
67
68
69 private LZ4FastDecompressor decompressor;
70
71
72
73
74 private ByteBufChecksum checksum;
75
76
77
78
79 private int blockType;
80
81
82
83
84 private int compressedLength;
85
86
87
88
89 private int decompressedLength;
90
91
92
93
94 private int currentChecksum;
95
96
97
98
99
100
101
102
103
104
105 public Lz4FrameDecoder() {
106 this(false);
107 }
108
109
110
111
112
113
114
115
116 public Lz4FrameDecoder(boolean validateChecksums) {
117 this(LZ4Factory.fastestInstance(), validateChecksums);
118 }
119
120
121
122
123
124
125
126
127
128
129
130
131
132 public Lz4FrameDecoder(LZ4Factory factory, boolean validateChecksums) {
133 this(factory, validateChecksums ? new Lz4XXHash32(DEFAULT_SEED) : null);
134 }
135
136
137
138
139
140
141
142
143
144
145 public Lz4FrameDecoder(LZ4Factory factory, Checksum checksum) {
146 decompressor = ObjectUtil.checkNotNull(factory, "factory").fastDecompressor();
147 this.checksum = checksum == null ? null : ByteBufChecksum.wrapChecksum(checksum);
148 }
149
150 @Override
151 protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {
152 try {
153 switch (currentState) {
154 case INIT_BLOCK:
155 if (in.readableBytes() < HEADER_LENGTH) {
156 break;
157 }
158 final long magic = in.readLong();
159 if (magic != MAGIC_NUMBER) {
160 throw new DecompressionException("unexpected block identifier");
161 }
162
163 final int token = in.readByte();
164 final int compressionLevel = (token & 0x0F) + COMPRESSION_LEVEL_BASE;
165 int blockType = token & 0xF0;
166
167 int compressedLength = Integer.reverseBytes(in.readInt());
168 if (compressedLength < 0 || compressedLength > MAX_BLOCK_SIZE) {
169 throw new DecompressionException(String.format(
170 "invalid compressedLength: %d (expected: 0-%d)",
171 compressedLength, MAX_BLOCK_SIZE));
172 }
173
174 int decompressedLength = Integer.reverseBytes(in.readInt());
175 final int maxDecompressedLength = 1 << compressionLevel;
176 if (decompressedLength < 0 || decompressedLength > maxDecompressedLength) {
177 throw new DecompressionException(String.format(
178 "invalid decompressedLength: %d (expected: 0-%d)",
179 decompressedLength, maxDecompressedLength));
180 }
181 if (decompressedLength == 0 && compressedLength != 0
182 || decompressedLength != 0 && compressedLength == 0
183 || blockType == BLOCK_TYPE_NON_COMPRESSED && decompressedLength != compressedLength) {
184 throw new DecompressionException(String.format(
185 "stream corrupted: compressedLength(%d) and decompressedLength(%d) mismatch",
186 compressedLength, decompressedLength));
187 }
188
189 int currentChecksum = Integer.reverseBytes(in.readInt());
190 if (decompressedLength == 0 && compressedLength == 0) {
191 if (currentChecksum != 0) {
192 throw new DecompressionException("stream corrupted: checksum error");
193 }
194 currentState = State.FINISHED;
195 decompressor = null;
196 checksum = null;
197 break;
198 }
199
200 this.blockType = blockType;
201 this.compressedLength = compressedLength;
202 this.decompressedLength = decompressedLength;
203 this.currentChecksum = currentChecksum;
204
205 currentState = State.DECOMPRESS_DATA;
206
207 case DECOMPRESS_DATA:
208 blockType = this.blockType;
209 compressedLength = this.compressedLength;
210 decompressedLength = this.decompressedLength;
211 currentChecksum = this.currentChecksum;
212
213 if (in.readableBytes() < compressedLength) {
214 break;
215 }
216
217 final ByteBufChecksum checksum = this.checksum;
218 ByteBuf uncompressed = null;
219
220 try {
221 switch (blockType) {
222 case BLOCK_TYPE_NON_COMPRESSED:
223
224
225 uncompressed = in.retainedSlice(in.readerIndex(), decompressedLength);
226 break;
227 case BLOCK_TYPE_COMPRESSED:
228 uncompressed = ctx.alloc().buffer(decompressedLength, decompressedLength);
229
230 decompressor.decompress(CompressionUtil.safeReadableNioBuffer(in),
231 uncompressed.internalNioBuffer(uncompressed.writerIndex(), decompressedLength));
232
233 uncompressed.writerIndex(uncompressed.writerIndex() + decompressedLength);
234 break;
235 default:
236 throw new DecompressionException(String.format(
237 "unexpected blockType: %d (expected: %d or %d)",
238 blockType, BLOCK_TYPE_NON_COMPRESSED, BLOCK_TYPE_COMPRESSED));
239 }
240
241 in.skipBytes(compressedLength);
242
243 if (checksum != null) {
244 CompressionUtil.checkChecksum(checksum, uncompressed, currentChecksum);
245 }
246 out.add(uncompressed);
247 uncompressed = null;
248 currentState = State.INIT_BLOCK;
249 } catch (LZ4Exception e) {
250 throw new DecompressionException(e);
251 } finally {
252 if (uncompressed != null) {
253 uncompressed.release();
254 }
255 }
256 break;
257 case FINISHED:
258 case CORRUPTED:
259 in.skipBytes(in.readableBytes());
260 break;
261 default:
262 throw new IllegalStateException();
263 }
264 } catch (Exception e) {
265 currentState = State.CORRUPTED;
266 throw e;
267 }
268 }
269
270
271
272
273
274 public boolean isClosed() {
275 return currentState == State.FINISHED;
276 }
277 }