1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16 package io.netty.handler.codec.compression;
17
18 import io.netty.buffer.ByteBuf;
19 import io.netty.channel.ChannelFuture;
20 import io.netty.channel.ChannelHandlerContext;
21 import io.netty.channel.ChannelPromise;
22 import io.netty.util.concurrent.EventExecutor;
23 import io.netty.util.concurrent.PromiseNotifier;
24 import io.netty.util.internal.EmptyArrays;
25 import io.netty.util.internal.ObjectUtil;
26 import io.netty.util.internal.PlatformDependent;
27 import io.netty.util.internal.SuppressJava6Requirement;
28 import io.netty.util.internal.SystemPropertyUtil;
29 import io.netty.util.internal.logging.InternalLogger;
30 import io.netty.util.internal.logging.InternalLoggerFactory;
31
32 import java.util.zip.CRC32;
33 import java.util.zip.Deflater;
34
35
36
37
38 public class JdkZlibEncoder extends ZlibEncoder {
39
40 private static final InternalLogger logger = InternalLoggerFactory.getInstance(JdkZlibEncoder.class);
41
42
43
44
45
46 private static final int MAX_INITIAL_OUTPUT_BUFFER_SIZE;
47
48
49
50 private static final int MAX_INPUT_BUFFER_SIZE;
51
52 private final ZlibWrapper wrapper;
53 private final Deflater deflater;
54 private volatile boolean finished;
55 private volatile ChannelHandlerContext ctx;
56
57
58
59
60 private final CRC32 crc = new CRC32();
61 private static final byte[] gzipHeader = {0x1f, (byte) 0x8b, Deflater.DEFLATED, 0, 0, 0, 0, 0, 0, 0};
62 private boolean writeHeader = true;
63
64 static {
65 MAX_INITIAL_OUTPUT_BUFFER_SIZE = SystemPropertyUtil.getInt(
66 "io.netty.jdkzlib.encoder.maxInitialOutputBufferSize",
67 65536);
68 MAX_INPUT_BUFFER_SIZE = SystemPropertyUtil.getInt(
69 "io.netty.jdkzlib.encoder.maxInputBufferSize",
70 65536);
71
72 if (logger.isDebugEnabled()) {
73 logger.debug("-Dio.netty.jdkzlib.encoder.maxInitialOutputBufferSize={}", MAX_INITIAL_OUTPUT_BUFFER_SIZE);
74 logger.debug("-Dio.netty.jdkzlib.encoder.maxInputBufferSize={}", MAX_INPUT_BUFFER_SIZE);
75 }
76 }
77
78
79
80
81
82
83
84 public JdkZlibEncoder() {
85 this(6);
86 }
87
88
89
90
91
92
93
94
95
96
97
98
99 public JdkZlibEncoder(int compressionLevel) {
100 this(ZlibWrapper.ZLIB, compressionLevel);
101 }
102
103
104
105
106
107
108
109 public JdkZlibEncoder(ZlibWrapper wrapper) {
110 this(wrapper, 6);
111 }
112
113
114
115
116
117
118
119
120
121
122
123
124 public JdkZlibEncoder(ZlibWrapper wrapper, int compressionLevel) {
125 ObjectUtil.checkInRange(compressionLevel, 0, 9, "compressionLevel");
126 ObjectUtil.checkNotNull(wrapper, "wrapper");
127
128 if (wrapper == ZlibWrapper.ZLIB_OR_NONE) {
129 throw new IllegalArgumentException(
130 "wrapper '" + ZlibWrapper.ZLIB_OR_NONE + "' is not " +
131 "allowed for compression.");
132 }
133
134 this.wrapper = wrapper;
135 deflater = new Deflater(compressionLevel, wrapper != ZlibWrapper.ZLIB);
136 }
137
138
139
140
141
142
143
144
145
146
147
148 public JdkZlibEncoder(byte[] dictionary) {
149 this(6, dictionary);
150 }
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166 public JdkZlibEncoder(int compressionLevel, byte[] dictionary) {
167 ObjectUtil.checkInRange(compressionLevel, 0, 9, "compressionLevel");
168 ObjectUtil.checkNotNull(dictionary, "dictionary");
169
170 wrapper = ZlibWrapper.ZLIB;
171 deflater = new Deflater(compressionLevel);
172 deflater.setDictionary(dictionary);
173 }
174
175 @Override
176 public ChannelFuture close() {
177 return close(ctx().newPromise());
178 }
179
180 @Override
181 public ChannelFuture close(final ChannelPromise promise) {
182 ChannelHandlerContext ctx = ctx();
183 EventExecutor executor = ctx.executor();
184 if (executor.inEventLoop()) {
185 return finishEncode(ctx, promise);
186 } else {
187 final ChannelPromise p = ctx.newPromise();
188 executor.execute(new Runnable() {
189 @Override
190 public void run() {
191 ChannelFuture f = finishEncode(ctx(), p);
192 PromiseNotifier.cascade(f, promise);
193 }
194 });
195 return p;
196 }
197 }
198
199 private ChannelHandlerContext ctx() {
200 ChannelHandlerContext ctx = this.ctx;
201 if (ctx == null) {
202 throw new IllegalStateException("not added to a pipeline");
203 }
204 return ctx;
205 }
206
207 @Override
208 public boolean isClosed() {
209 return finished;
210 }
211
212 @Override
213 protected void encode(ChannelHandlerContext ctx, ByteBuf uncompressed, ByteBuf out) throws Exception {
214 if (finished) {
215 out.writeBytes(uncompressed);
216 return;
217 }
218
219 int len = uncompressed.readableBytes();
220 if (len == 0) {
221 return;
222 }
223
224 if (uncompressed.hasArray()) {
225
226 encodeSome(uncompressed, out);
227 } else {
228 int heapBufferSize = Math.min(len, MAX_INPUT_BUFFER_SIZE);
229 ByteBuf heapBuf = ctx.alloc().heapBuffer(heapBufferSize, heapBufferSize);
230 try {
231 while (uncompressed.isReadable()) {
232 uncompressed.readBytes(heapBuf, Math.min(heapBuf.writableBytes(), uncompressed.readableBytes()));
233 encodeSome(heapBuf, out);
234 heapBuf.clear();
235 }
236 } finally {
237 heapBuf.release();
238 }
239 }
240
241 deflater.setInput(EmptyArrays.EMPTY_BYTES);
242 }
243
244 private void encodeSome(ByteBuf in, ByteBuf out) {
245
246
247 byte[] inAry = in.array();
248 int offset = in.arrayOffset() + in.readerIndex();
249
250 if (writeHeader) {
251 writeHeader = false;
252 if (wrapper == ZlibWrapper.GZIP) {
253 out.writeBytes(gzipHeader);
254 }
255 }
256
257 int len = in.readableBytes();
258 if (wrapper == ZlibWrapper.GZIP) {
259 crc.update(inAry, offset, len);
260 }
261
262 deflater.setInput(inAry, offset, len);
263 for (;;) {
264 deflate(out);
265 if (!out.isWritable()) {
266
267
268 out.ensureWritable(out.writerIndex());
269 } else if (deflater.needsInput()) {
270
271 break;
272 }
273 }
274 in.skipBytes(len);
275 }
276
277 @Override
278 protected final ByteBuf allocateBuffer(ChannelHandlerContext ctx, ByteBuf msg,
279 boolean preferDirect) throws Exception {
280 int sizeEstimate = (int) Math.ceil(msg.readableBytes() * 1.001) + 12;
281 if (writeHeader) {
282 switch (wrapper) {
283 case GZIP:
284 sizeEstimate += gzipHeader.length;
285 break;
286 case ZLIB:
287 sizeEstimate += 2;
288 break;
289 default:
290
291 }
292 }
293
294 if (sizeEstimate < 0 || sizeEstimate > MAX_INITIAL_OUTPUT_BUFFER_SIZE) {
295
296 return ctx.alloc().heapBuffer(MAX_INITIAL_OUTPUT_BUFFER_SIZE);
297 }
298 return ctx.alloc().heapBuffer(sizeEstimate);
299 }
300
301 @Override
302 public void close(final ChannelHandlerContext ctx, final ChannelPromise promise) throws Exception {
303 ChannelFuture f = finishEncode(ctx, ctx.newPromise());
304 EncoderUtil.closeAfterFinishEncode(ctx, f, promise);
305 }
306
307 private ChannelFuture finishEncode(final ChannelHandlerContext ctx, ChannelPromise promise) {
308 if (finished) {
309 promise.setSuccess();
310 return promise;
311 }
312
313 finished = true;
314 ByteBuf footer = ctx.alloc().heapBuffer();
315 if (writeHeader && wrapper == ZlibWrapper.GZIP) {
316
317 writeHeader = false;
318 footer.writeBytes(gzipHeader);
319 }
320
321 deflater.finish();
322
323 while (!deflater.finished()) {
324 deflate(footer);
325 if (!footer.isWritable()) {
326
327 ctx.write(footer);
328 footer = ctx.alloc().heapBuffer();
329 }
330 }
331 if (wrapper == ZlibWrapper.GZIP) {
332 int crcValue = (int) crc.getValue();
333 int uncBytes = deflater.getTotalIn();
334 footer.writeByte(crcValue);
335 footer.writeByte(crcValue >>> 8);
336 footer.writeByte(crcValue >>> 16);
337 footer.writeByte(crcValue >>> 24);
338 footer.writeByte(uncBytes);
339 footer.writeByte(uncBytes >>> 8);
340 footer.writeByte(uncBytes >>> 16);
341 footer.writeByte(uncBytes >>> 24);
342 }
343 deflater.end();
344 return ctx.writeAndFlush(footer, promise);
345 }
346
347 @SuppressJava6Requirement(reason = "Usage guarded by java version check")
348 private void deflate(ByteBuf out) {
349 if (PlatformDependent.javaVersion() < 7) {
350 deflateJdk6(out);
351 }
352 int numBytes;
353 do {
354 int writerIndex = out.writerIndex();
355 numBytes = deflater.deflate(
356 out.array(), out.arrayOffset() + writerIndex, out.writableBytes(), Deflater.SYNC_FLUSH);
357 out.writerIndex(writerIndex + numBytes);
358 } while (numBytes > 0);
359 }
360
361 private void deflateJdk6(ByteBuf out) {
362 int numBytes;
363 do {
364 int writerIndex = out.writerIndex();
365 numBytes = deflater.deflate(
366 out.array(), out.arrayOffset() + writerIndex, out.writableBytes());
367 out.writerIndex(writerIndex + numBytes);
368 } while (numBytes > 0);
369 }
370
371 @Override
372 public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
373 this.ctx = ctx;
374 }
375 }