1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16 package io.netty.handler.codec.compression;
17
18 import com.jcraft.jzlib.Deflater;
19 import com.jcraft.jzlib.JZlib;
20 import io.netty.buffer.ByteBuf;
21 import io.netty.buffer.Unpooled;
22 import io.netty.channel.ChannelFuture;
23 import io.netty.channel.ChannelFutureListener;
24 import io.netty.channel.ChannelHandlerContext;
25 import io.netty.channel.ChannelPromise;
26 import io.netty.util.concurrent.EventExecutor;
27 import io.netty.util.concurrent.Future;
28 import io.netty.util.concurrent.PromiseNotifier;
29 import io.netty.util.internal.EmptyArrays;
30 import io.netty.util.internal.ObjectUtil;
31
32 import java.util.concurrent.TimeUnit;
33
34
35
36
37 public class JZlibEncoder extends ZlibEncoder {
38
39 private final int wrapperOverhead;
40 private final Deflater z = new Deflater();
41 private volatile boolean finished;
42 private volatile ChannelHandlerContext ctx;
43
44 private static final int THREAD_POOL_DELAY_SECONDS = 10;
45
46
47
48
49
50
51
52
53 public JZlibEncoder() {
54 this(6);
55 }
56
57
58
59
60
61
62
63
64
65
66
67
68
69 public JZlibEncoder(int compressionLevel) {
70 this(ZlibWrapper.ZLIB, compressionLevel);
71 }
72
73
74
75
76
77
78
79
80 public JZlibEncoder(ZlibWrapper wrapper) {
81 this(wrapper, 6);
82 }
83
84
85
86
87
88
89
90
91
92
93
94
95
96 public JZlibEncoder(ZlibWrapper wrapper, int compressionLevel) {
97 this(wrapper, compressionLevel, 15, 8);
98 }
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122 public JZlibEncoder(ZlibWrapper wrapper, int compressionLevel, int windowBits, int memLevel) {
123 ObjectUtil.checkInRange(compressionLevel, 0, 9, "compressionLevel");
124 ObjectUtil.checkInRange(windowBits, 9, 15, "windowBits");
125 ObjectUtil.checkInRange(memLevel, 1, 9, "memLevel");
126 ObjectUtil.checkNotNull(wrapper, "wrapper");
127
128 if (wrapper == ZlibWrapper.ZLIB_OR_NONE) {
129 throw new IllegalArgumentException(
130 "wrapper '" + ZlibWrapper.ZLIB_OR_NONE + "' is not " +
131 "allowed for compression.");
132 }
133
134 int resultCode = z.init(
135 compressionLevel, windowBits, memLevel,
136 ZlibUtil.convertWrapperType(wrapper));
137 if (resultCode != JZlib.Z_OK) {
138 ZlibUtil.fail(z, "initialization failure", resultCode);
139 }
140
141 wrapperOverhead = ZlibUtil.wrapperOverhead(wrapper);
142 }
143
144
145
146
147
148
149
150
151
152
153
154
155 public JZlibEncoder(byte[] dictionary) {
156 this(6, dictionary);
157 }
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174 public JZlibEncoder(int compressionLevel, byte[] dictionary) {
175 this(compressionLevel, 15, 8, dictionary);
176 }
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203 public JZlibEncoder(int compressionLevel, int windowBits, int memLevel, byte[] dictionary) {
204 ObjectUtil.checkInRange(compressionLevel, 0, 9, "compressionLevel");
205 ObjectUtil.checkInRange(windowBits, 9, 15, "windowBits");
206 ObjectUtil.checkInRange(memLevel, 1, 9, "memLevel");
207 ObjectUtil.checkNotNull(dictionary, "dictionary");
208
209 int resultCode;
210 resultCode = z.deflateInit(
211 compressionLevel, windowBits, memLevel,
212 JZlib.W_ZLIB);
213 if (resultCode != JZlib.Z_OK) {
214 ZlibUtil.fail(z, "initialization failure", resultCode);
215 } else {
216 resultCode = z.deflateSetDictionary(dictionary, dictionary.length);
217 if (resultCode != JZlib.Z_OK) {
218 ZlibUtil.fail(z, "failed to set the dictionary", resultCode);
219 }
220 }
221
222 wrapperOverhead = ZlibUtil.wrapperOverhead(ZlibWrapper.ZLIB);
223 }
224
225 @Override
226 public ChannelFuture close() {
227 return close(ctx().channel().newPromise());
228 }
229
230 @Override
231 public ChannelFuture close(final ChannelPromise promise) {
232 ChannelHandlerContext ctx = ctx();
233 EventExecutor executor = ctx.executor();
234 if (executor.inEventLoop()) {
235 return finishEncode(ctx, promise);
236 } else {
237 final ChannelPromise p = ctx.newPromise();
238 executor.execute(new Runnable() {
239 @Override
240 public void run() {
241 ChannelFuture f = finishEncode(ctx(), p);
242 PromiseNotifier.cascade(f, promise);
243 }
244 });
245 return p;
246 }
247 }
248
249 private ChannelHandlerContext ctx() {
250 ChannelHandlerContext ctx = this.ctx;
251 if (ctx == null) {
252 throw new IllegalStateException("not added to a pipeline");
253 }
254 return ctx;
255 }
256
257 @Override
258 public boolean isClosed() {
259 return finished;
260 }
261
262 @Override
263 protected void encode(ChannelHandlerContext ctx, ByteBuf in, ByteBuf out) throws Exception {
264 if (finished) {
265 out.writeBytes(in);
266 return;
267 }
268
269 int inputLength = in.readableBytes();
270 if (inputLength == 0) {
271 return;
272 }
273
274 try {
275
276 boolean inHasArray = in.hasArray();
277 z.avail_in = inputLength;
278 if (inHasArray) {
279 z.next_in = in.array();
280 z.next_in_index = in.arrayOffset() + in.readerIndex();
281 } else {
282 byte[] array = new byte[inputLength];
283 in.getBytes(in.readerIndex(), array);
284 z.next_in = array;
285 z.next_in_index = 0;
286 }
287 int oldNextInIndex = z.next_in_index;
288
289
290 int maxOutputLength = (int) Math.ceil(inputLength * 1.001) + 12 + wrapperOverhead;
291 out.ensureWritable(maxOutputLength);
292 z.avail_out = maxOutputLength;
293 z.next_out = out.array();
294 z.next_out_index = out.arrayOffset() + out.writerIndex();
295 int oldNextOutIndex = z.next_out_index;
296
297
298 int resultCode;
299 try {
300 resultCode = z.deflate(JZlib.Z_SYNC_FLUSH);
301 } finally {
302 in.skipBytes(z.next_in_index - oldNextInIndex);
303 }
304
305 if (resultCode != JZlib.Z_OK) {
306 ZlibUtil.fail(z, "compression failure", resultCode);
307 }
308
309 int outputLength = z.next_out_index - oldNextOutIndex;
310 if (outputLength > 0) {
311 out.writerIndex(out.writerIndex() + outputLength);
312 }
313 } finally {
314
315
316
317
318 z.next_in = null;
319 z.next_out = null;
320 }
321 }
322
323 @Override
324 public void close(
325 final ChannelHandlerContext ctx,
326 final ChannelPromise promise) {
327 ChannelFuture f = finishEncode(ctx, ctx.newPromise());
328
329 if (!f.isDone()) {
330
331 final Future<?> future = ctx.executor().schedule(new Runnable() {
332 @Override
333 public void run() {
334 if (!promise.isDone()) {
335 ctx.close(promise);
336 }
337 }
338 }, THREAD_POOL_DELAY_SECONDS, TimeUnit.SECONDS);
339
340 f.addListener(new ChannelFutureListener() {
341 @Override
342 public void operationComplete(ChannelFuture f) {
343
344 future.cancel(true);
345 if (!promise.isDone()) {
346 ctx.close(promise);
347 }
348 }
349 });
350 } else {
351 ctx.close(promise);
352 }
353 }
354
355 private ChannelFuture finishEncode(ChannelHandlerContext ctx, ChannelPromise promise) {
356 if (finished) {
357 promise.setSuccess();
358 return promise;
359 }
360 finished = true;
361
362 ByteBuf footer;
363 try {
364
365 z.next_in = EmptyArrays.EMPTY_BYTES;
366 z.next_in_index = 0;
367 z.avail_in = 0;
368
369
370 byte[] out = new byte[32];
371 z.next_out = out;
372 z.next_out_index = 0;
373 z.avail_out = out.length;
374
375
376 int resultCode = z.deflate(JZlib.Z_FINISH);
377 if (resultCode != JZlib.Z_OK && resultCode != JZlib.Z_STREAM_END) {
378 promise.setFailure(ZlibUtil.deflaterException(z, "compression failure", resultCode));
379 return promise;
380 } else if (z.next_out_index != 0) {
381
382
383 footer = Unpooled.wrappedBuffer(out, 0, z.next_out_index);
384 } else {
385 footer = Unpooled.EMPTY_BUFFER;
386 }
387 } finally {
388 z.deflateEnd();
389
390
391
392
393
394 z.next_in = null;
395 z.next_out = null;
396 }
397 return ctx.writeAndFlush(footer, promise);
398 }
399
400 @Override
401 public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
402 this.ctx = ctx;
403 }
404 }