1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17 package io.netty.handler.codec.compression;
18
19 import io.netty.buffer.ByteBuf;
20 import io.netty.buffer.Unpooled;
21 import io.netty.channel.ChannelFuture;
22 import io.netty.channel.ChannelHandlerContext;
23 import io.netty.channel.ChannelPipeline;
24 import io.netty.channel.ChannelPromise;
25 import io.netty.handler.codec.EncoderException;
26 import io.netty.handler.codec.MessageToByteEncoder;
27 import io.netty.util.concurrent.EventExecutor;
28 import io.netty.util.concurrent.PromiseNotifier;
29 import io.netty.util.internal.ObjectUtil;
30 import net.jpountz.lz4.LZ4Compressor;
31 import net.jpountz.lz4.LZ4Exception;
32 import net.jpountz.lz4.LZ4Factory;
33
34 import java.nio.ByteBuffer;
35 import java.util.zip.Checksum;
36
37 import static io.netty.handler.codec.compression.Lz4Constants.BLOCK_TYPE_COMPRESSED;
38 import static io.netty.handler.codec.compression.Lz4Constants.BLOCK_TYPE_NON_COMPRESSED;
39 import static io.netty.handler.codec.compression.Lz4Constants.CHECKSUM_OFFSET;
40 import static io.netty.handler.codec.compression.Lz4Constants.COMPRESSED_LENGTH_OFFSET;
41 import static io.netty.handler.codec.compression.Lz4Constants.COMPRESSION_LEVEL_BASE;
42 import static io.netty.handler.codec.compression.Lz4Constants.DECOMPRESSED_LENGTH_OFFSET;
43 import static io.netty.handler.codec.compression.Lz4Constants.DEFAULT_BLOCK_SIZE;
44 import static io.netty.handler.codec.compression.Lz4Constants.DEFAULT_SEED;
45 import static io.netty.handler.codec.compression.Lz4Constants.HEADER_LENGTH;
46 import static io.netty.handler.codec.compression.Lz4Constants.MAGIC_NUMBER;
47 import static io.netty.handler.codec.compression.Lz4Constants.MAX_BLOCK_SIZE;
48 import static io.netty.handler.codec.compression.Lz4Constants.MIN_BLOCK_SIZE;
49 import static io.netty.handler.codec.compression.Lz4Constants.TOKEN_OFFSET;
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67 public class Lz4FrameEncoder extends MessageToByteEncoder<ByteBuf> {
68 static final int DEFAULT_MAX_ENCODE_SIZE = Integer.MAX_VALUE;
69
70 private final int blockSize;
71
72
73
74
75 private final LZ4Compressor compressor;
76
77
78
79
80 private final ByteBufChecksum checksum;
81
82
83
84
85 private final int compressionLevel;
86
87
88
89
90 private ByteBuf buffer;
91
92
93
94
95 private final int maxEncodeSize;
96
97
98
99
100 private volatile boolean finished;
101
102
103
104
105 private volatile ChannelHandlerContext ctx;
106
107
108
109
110
111
112 public Lz4FrameEncoder() {
113 this(false);
114 }
115
116
117
118
119
120
121
122
123
124 public Lz4FrameEncoder(boolean highCompressor) {
125 this(LZ4Factory.fastestInstance(), highCompressor, DEFAULT_BLOCK_SIZE, new Lz4XXHash32(DEFAULT_SEED));
126 }
127
128
129
130
131
132
133
134
135
136
137
138
139
140 public Lz4FrameEncoder(LZ4Factory factory, boolean highCompressor, int blockSize, Checksum checksum) {
141 this(factory, highCompressor, blockSize, checksum, DEFAULT_MAX_ENCODE_SIZE);
142 }
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157 public Lz4FrameEncoder(LZ4Factory factory, boolean highCompressor, int blockSize,
158 Checksum checksum, int maxEncodeSize) {
159 ObjectUtil.checkNotNull(factory, "factory");
160 ObjectUtil.checkNotNull(checksum, "checksum");
161
162 compressor = highCompressor ? factory.highCompressor() : factory.fastCompressor();
163 this.checksum = ByteBufChecksum.wrapChecksum(checksum);
164
165 compressionLevel = compressionLevel(blockSize);
166 this.blockSize = blockSize;
167 this.maxEncodeSize = ObjectUtil.checkPositive(maxEncodeSize, "maxEncodeSize");
168 finished = false;
169 }
170
171
172
173
174 private static int compressionLevel(int blockSize) {
175 if (blockSize < MIN_BLOCK_SIZE || blockSize > MAX_BLOCK_SIZE) {
176 throw new IllegalArgumentException(String.format(
177 "blockSize: %d (expected: %d-%d)", blockSize, MIN_BLOCK_SIZE, MAX_BLOCK_SIZE));
178 }
179 int compressionLevel = 32 - Integer.numberOfLeadingZeros(blockSize - 1);
180 compressionLevel = Math.max(0, compressionLevel - COMPRESSION_LEVEL_BASE);
181 return compressionLevel;
182 }
183
184 @Override
185 protected ByteBuf allocateBuffer(ChannelHandlerContext ctx, ByteBuf msg, boolean preferDirect) {
186 return allocateBuffer(ctx, msg, preferDirect, true);
187 }
188
189 private ByteBuf allocateBuffer(ChannelHandlerContext ctx, ByteBuf msg, boolean preferDirect,
190 boolean allowEmptyReturn) {
191 int targetBufSize = 0;
192 int remaining = msg.readableBytes() + buffer.readableBytes();
193
194
195 if (remaining < 0) {
196 throw new EncoderException("too much data to allocate a buffer for compression");
197 }
198
199 while (remaining > 0) {
200 int curSize = Math.min(blockSize, remaining);
201 remaining -= curSize;
202
203 targetBufSize += compressor.maxCompressedLength(curSize) + HEADER_LENGTH;
204 }
205
206
207
208
209 if (targetBufSize > maxEncodeSize || 0 > targetBufSize) {
210 throw new EncoderException(String.format("requested encode buffer size (%d bytes) exceeds the maximum " +
211 "allowable size (%d bytes)", targetBufSize, maxEncodeSize));
212 }
213
214 if (allowEmptyReturn && targetBufSize < blockSize) {
215 return Unpooled.EMPTY_BUFFER;
216 }
217
218 if (preferDirect) {
219 return ctx.alloc().ioBuffer(targetBufSize, targetBufSize);
220 } else {
221 return ctx.alloc().heapBuffer(targetBufSize, targetBufSize);
222 }
223 }
224
225
226
227
228
229
230
231
232 @Override
233 protected void encode(ChannelHandlerContext ctx, ByteBuf in, ByteBuf out) throws Exception {
234 if (finished) {
235 if (!out.isWritable(in.readableBytes())) {
236
237 throw new IllegalStateException("encode finished and not enough space to write remaining data");
238 }
239 out.writeBytes(in);
240 return;
241 }
242
243 final ByteBuf buffer = this.buffer;
244 int length;
245 while ((length = in.readableBytes()) > 0) {
246 final int nextChunkSize = Math.min(length, buffer.writableBytes());
247 in.readBytes(buffer, nextChunkSize);
248
249 if (!buffer.isWritable()) {
250 flushBufferedData(out);
251 }
252 }
253 }
254
255 private void flushBufferedData(ByteBuf out) {
256 int flushableBytes = buffer.readableBytes();
257 if (flushableBytes == 0) {
258 return;
259 }
260 checksum.reset();
261 checksum.update(buffer, buffer.readerIndex(), flushableBytes);
262 final int check = (int) checksum.getValue();
263
264 final int bufSize = compressor.maxCompressedLength(flushableBytes) + HEADER_LENGTH;
265 out.ensureWritable(bufSize);
266 final int idx = out.writerIndex();
267 int compressedLength;
268 try {
269 ByteBuffer outNioBuffer = out.internalNioBuffer(idx + HEADER_LENGTH, out.writableBytes() - HEADER_LENGTH);
270 int pos = outNioBuffer.position();
271
272 compressor.compress(buffer.internalNioBuffer(buffer.readerIndex(), flushableBytes), outNioBuffer);
273 compressedLength = outNioBuffer.position() - pos;
274 } catch (LZ4Exception e) {
275 throw new CompressionException(e);
276 }
277 final int blockType;
278 if (compressedLength >= flushableBytes) {
279 blockType = BLOCK_TYPE_NON_COMPRESSED;
280 compressedLength = flushableBytes;
281 out.setBytes(idx + HEADER_LENGTH, buffer, buffer.readerIndex(), flushableBytes);
282 } else {
283 blockType = BLOCK_TYPE_COMPRESSED;
284 }
285
286 out.setLong(idx, MAGIC_NUMBER);
287 out.setByte(idx + TOKEN_OFFSET, (byte) (blockType | compressionLevel));
288 out.setIntLE(idx + COMPRESSED_LENGTH_OFFSET, compressedLength);
289 out.setIntLE(idx + DECOMPRESSED_LENGTH_OFFSET, flushableBytes);
290 out.setIntLE(idx + CHECKSUM_OFFSET, check);
291 out.writerIndex(idx + HEADER_LENGTH + compressedLength);
292 buffer.clear();
293 }
294
295 @Override
296 public void flush(final ChannelHandlerContext ctx) throws Exception {
297 if (buffer != null && buffer.isReadable()) {
298 final ByteBuf buf = allocateBuffer(ctx, Unpooled.EMPTY_BUFFER, isPreferDirect(), false);
299 flushBufferedData(buf);
300 ctx.write(buf);
301 }
302 ctx.flush();
303 }
304
305 private ChannelFuture finishEncode(final ChannelHandlerContext ctx, ChannelPromise promise) {
306 if (finished) {
307 promise.setSuccess();
308 return promise;
309 }
310 finished = true;
311
312 final ByteBuf footer = ctx.alloc().heapBuffer(
313 compressor.maxCompressedLength(buffer.readableBytes()) + HEADER_LENGTH);
314 flushBufferedData(footer);
315
316 footer.ensureWritable(HEADER_LENGTH);
317 final int idx = footer.writerIndex();
318 footer.setLong(idx, MAGIC_NUMBER);
319 footer.setByte(idx + TOKEN_OFFSET, (byte) (BLOCK_TYPE_NON_COMPRESSED | compressionLevel));
320 footer.setInt(idx + COMPRESSED_LENGTH_OFFSET, 0);
321 footer.setInt(idx + DECOMPRESSED_LENGTH_OFFSET, 0);
322 footer.setInt(idx + CHECKSUM_OFFSET, 0);
323
324 footer.writerIndex(idx + HEADER_LENGTH);
325
326 return ctx.writeAndFlush(footer, promise);
327 }
328
329
330
331
332 public boolean isClosed() {
333 return finished;
334 }
335
336
337
338
339
340
341 public ChannelFuture close() {
342 return close(ctx().newPromise());
343 }
344
345
346
347
348
349
350 public ChannelFuture close(final ChannelPromise promise) {
351 ChannelHandlerContext ctx = ctx();
352 EventExecutor executor = ctx.executor();
353 if (executor.inEventLoop()) {
354 return finishEncode(ctx, promise);
355 } else {
356 executor.execute(new Runnable() {
357 @Override
358 public void run() {
359 ChannelFuture f = finishEncode(ctx(), promise);
360 PromiseNotifier.cascade(f, promise);
361 }
362 });
363 return promise;
364 }
365 }
366
367 @Override
368 public void close(final ChannelHandlerContext ctx, final ChannelPromise promise) throws Exception {
369 ChannelFuture f = finishEncode(ctx, ctx.newPromise());
370
371 EncoderUtil.closeAfterFinishEncode(ctx, f, promise);
372 }
373
374 private ChannelHandlerContext ctx() {
375 ChannelHandlerContext ctx = this.ctx;
376 if (ctx == null) {
377 throw new IllegalStateException("not added to a pipeline");
378 }
379 return ctx;
380 }
381
382 @Override
383 public void handlerAdded(ChannelHandlerContext ctx) {
384 this.ctx = ctx;
385
386 buffer = Unpooled.wrappedBuffer(new byte[blockSize]);
387 buffer.clear();
388 }
389
390 @Override
391 public void handlerRemoved(ChannelHandlerContext ctx) throws Exception {
392 super.handlerRemoved(ctx);
393 if (buffer != null) {
394 buffer.release();
395 buffer = null;
396 }
397 }
398
399 final ByteBuf getBackingBuffer() {
400 return buffer;
401 }
402 }