1
2
3
4
5
6
7
8
9
10
11
12
13
14
15 package io.netty.handler.codec.http2;
16
17 import io.netty.buffer.ByteBuf;
18 import io.netty.buffer.Unpooled;
19 import io.netty.channel.ChannelFuture;
20 import io.netty.channel.ChannelHandlerContext;
21 import io.netty.channel.ChannelPromise;
22 import io.netty.channel.embedded.EmbeddedChannel;
23 import io.netty.handler.codec.ByteToMessageDecoder;
24 import io.netty.handler.codec.compression.BrotliEncoder;
25 import io.netty.handler.codec.compression.ZlibCodecFactory;
26 import io.netty.handler.codec.compression.ZlibWrapper;
27 import io.netty.handler.codec.compression.Brotli;
28 import io.netty.handler.codec.compression.BrotliOptions;
29 import io.netty.handler.codec.compression.CompressionOptions;
30 import io.netty.handler.codec.compression.DeflateOptions;
31 import io.netty.handler.codec.compression.GzipOptions;
32 import io.netty.handler.codec.compression.StandardCompressionOptions;
33 import io.netty.handler.codec.compression.Zstd;
34 import io.netty.handler.codec.compression.ZstdEncoder;
35 import io.netty.handler.codec.compression.ZstdOptions;
36 import io.netty.handler.codec.compression.SnappyFrameEncoder;
37 import io.netty.handler.codec.compression.SnappyOptions;
38 import io.netty.util.concurrent.PromiseCombiner;
39 import io.netty.util.internal.ObjectUtil;
40 import io.netty.util.internal.UnstableApi;
41
42 import java.util.ArrayList;
43 import java.util.List;
44
45 import static io.netty.handler.codec.http.HttpHeaderNames.CONTENT_ENCODING;
46 import static io.netty.handler.codec.http.HttpHeaderNames.CONTENT_LENGTH;
47 import static io.netty.handler.codec.http.HttpHeaderValues.BR;
48 import static io.netty.handler.codec.http.HttpHeaderValues.DEFLATE;
49 import static io.netty.handler.codec.http.HttpHeaderValues.GZIP;
50 import static io.netty.handler.codec.http.HttpHeaderValues.IDENTITY;
51 import static io.netty.handler.codec.http.HttpHeaderValues.X_DEFLATE;
52 import static io.netty.handler.codec.http.HttpHeaderValues.X_GZIP;
53 import static io.netty.handler.codec.http.HttpHeaderValues.ZSTD;
54 import static io.netty.handler.codec.http.HttpHeaderValues.SNAPPY;
55
56
57
58
59
60 @UnstableApi
61 public class CompressorHttp2ConnectionEncoder extends DecoratingHttp2ConnectionEncoder {
62
63 public static final int DEFAULT_COMPRESSION_LEVEL = 6;
64 public static final int DEFAULT_WINDOW_BITS = 15;
65 public static final int DEFAULT_MEM_LEVEL = 8;
66
67 private int compressionLevel;
68 private int windowBits;
69 private int memLevel;
70 private final Http2Connection.PropertyKey propertyKey;
71
72 private final boolean supportsCompressionOptions;
73
74 private BrotliOptions brotliOptions;
75 private GzipOptions gzipCompressionOptions;
76 private DeflateOptions deflateOptions;
77 private ZstdOptions zstdOptions;
78 private SnappyOptions snappyOptions;
79
80
81
82
83
84 public CompressorHttp2ConnectionEncoder(Http2ConnectionEncoder delegate) {
85 this(delegate, defaultCompressionOptions());
86 }
87
88 private static CompressionOptions[] defaultCompressionOptions() {
89 List<CompressionOptions> compressionOptions = new ArrayList<CompressionOptions>();
90 compressionOptions.add(StandardCompressionOptions.gzip());
91 compressionOptions.add(StandardCompressionOptions.deflate());
92 compressionOptions.add(StandardCompressionOptions.snappy());
93 if (Brotli.isAvailable()) {
94 compressionOptions.add(StandardCompressionOptions.brotli());
95 }
96 if (Zstd.isAvailable()) {
97 compressionOptions.add(StandardCompressionOptions.zstd());
98 }
99 return compressionOptions.toArray(new CompressionOptions[0]);
100 }
101
102
103
104
105 @Deprecated
106 public CompressorHttp2ConnectionEncoder(Http2ConnectionEncoder delegate, int compressionLevel, int windowBits,
107 int memLevel) {
108 super(delegate);
109 this.compressionLevel = ObjectUtil.checkInRange(compressionLevel, 0, 9, "compressionLevel");
110 this.windowBits = ObjectUtil.checkInRange(windowBits, 9, 15, "windowBits");
111 this.memLevel = ObjectUtil.checkInRange(memLevel, 1, 9, "memLevel");
112
113 propertyKey = connection().newKey();
114 connection().addListener(new Http2ConnectionAdapter() {
115 @Override
116 public void onStreamRemoved(Http2Stream stream) {
117 final EmbeddedChannel compressor = stream.getProperty(propertyKey);
118 if (compressor != null) {
119 cleanup(stream, compressor);
120 }
121 }
122 });
123
124 supportsCompressionOptions = false;
125 }
126
127
128
129
130
131 public CompressorHttp2ConnectionEncoder(Http2ConnectionEncoder delegate,
132 CompressionOptions... compressionOptionsArgs) {
133 super(delegate);
134 ObjectUtil.checkNotNull(compressionOptionsArgs, "CompressionOptions");
135 ObjectUtil.deepCheckNotNull("CompressionOptions", compressionOptionsArgs);
136
137 for (CompressionOptions compressionOptions : compressionOptionsArgs) {
138
139
140
141
142
143
144 if (Brotli.isAvailable() && compressionOptions instanceof BrotliOptions) {
145 brotliOptions = (BrotliOptions) compressionOptions;
146 } else if (compressionOptions instanceof GzipOptions) {
147 gzipCompressionOptions = (GzipOptions) compressionOptions;
148 } else if (compressionOptions instanceof DeflateOptions) {
149 deflateOptions = (DeflateOptions) compressionOptions;
150 } else if (compressionOptions instanceof ZstdOptions) {
151 zstdOptions = (ZstdOptions) compressionOptions;
152 } else if (compressionOptions instanceof SnappyOptions) {
153 snappyOptions = (SnappyOptions) compressionOptions;
154 } else {
155 throw new IllegalArgumentException("Unsupported " + CompressionOptions.class.getSimpleName() +
156 ": " + compressionOptions);
157 }
158 }
159
160 supportsCompressionOptions = true;
161
162 propertyKey = connection().newKey();
163 connection().addListener(new Http2ConnectionAdapter() {
164 @Override
165 public void onStreamRemoved(Http2Stream stream) {
166 final EmbeddedChannel compressor = stream.getProperty(propertyKey);
167 if (compressor != null) {
168 cleanup(stream, compressor);
169 }
170 }
171 });
172 }
173
174 @Override
175 public ChannelFuture writeData(final ChannelHandlerContext ctx, final int streamId, ByteBuf data, int padding,
176 final boolean endOfStream, ChannelPromise promise) {
177 final Http2Stream stream = connection().stream(streamId);
178 final EmbeddedChannel channel = stream == null ? null : (EmbeddedChannel) stream.getProperty(propertyKey);
179 if (channel == null) {
180
181 return super.writeData(ctx, streamId, data, padding, endOfStream, promise);
182 }
183
184 try {
185
186 channel.writeOutbound(data);
187 ByteBuf buf = nextReadableBuf(channel);
188 if (buf == null) {
189 if (endOfStream) {
190 if (channel.finish()) {
191 buf = nextReadableBuf(channel);
192 }
193 return super.writeData(ctx, streamId, buf == null ? Unpooled.EMPTY_BUFFER : buf, padding,
194 true, promise);
195 }
196
197 promise.setSuccess();
198 return promise;
199 }
200
201 PromiseCombiner combiner = new PromiseCombiner(ctx.executor());
202 for (;;) {
203 ByteBuf nextBuf = nextReadableBuf(channel);
204 boolean compressedEndOfStream = nextBuf == null && endOfStream;
205 if (compressedEndOfStream && channel.finish()) {
206 nextBuf = nextReadableBuf(channel);
207 compressedEndOfStream = nextBuf == null;
208 }
209
210 ChannelPromise bufPromise = ctx.newPromise();
211 combiner.add(bufPromise);
212 super.writeData(ctx, streamId, buf, padding, compressedEndOfStream, bufPromise);
213 if (nextBuf == null) {
214 break;
215 }
216
217 padding = 0;
218 buf = nextBuf;
219 }
220 combiner.finish(promise);
221 } catch (Throwable cause) {
222 promise.tryFailure(cause);
223 } finally {
224 if (endOfStream) {
225 cleanup(stream, channel);
226 }
227 }
228 return promise;
229 }
230
231 @Override
232 public ChannelFuture writeHeaders(ChannelHandlerContext ctx, int streamId, Http2Headers headers, int padding,
233 boolean endStream, ChannelPromise promise) {
234 try {
235
236 EmbeddedChannel compressor = newCompressor(ctx, headers, endStream);
237
238
239 ChannelFuture future = super.writeHeaders(ctx, streamId, headers, padding, endStream, promise);
240
241
242 bindCompressorToStream(compressor, streamId);
243
244 return future;
245 } catch (Throwable e) {
246 promise.tryFailure(e);
247 }
248 return promise;
249 }
250
251 @Override
252 public ChannelFuture writeHeaders(final ChannelHandlerContext ctx, final int streamId, final Http2Headers headers,
253 final int streamDependency, final short weight, final boolean exclusive, final int padding,
254 final boolean endOfStream, final ChannelPromise promise) {
255 try {
256
257 EmbeddedChannel compressor = newCompressor(ctx, headers, endOfStream);
258
259
260 ChannelFuture future = super.writeHeaders(ctx, streamId, headers, streamDependency, weight, exclusive,
261 padding, endOfStream, promise);
262
263
264 bindCompressorToStream(compressor, streamId);
265
266 return future;
267 } catch (Throwable e) {
268 promise.tryFailure(e);
269 }
270 return promise;
271 }
272
273
274
275
276
277
278
279
280
281
282
283 protected EmbeddedChannel newContentCompressor(ChannelHandlerContext ctx, CharSequence contentEncoding)
284 throws Http2Exception {
285 if (GZIP.contentEqualsIgnoreCase(contentEncoding) || X_GZIP.contentEqualsIgnoreCase(contentEncoding)) {
286 return newCompressionChannel(ctx, ZlibWrapper.GZIP);
287 }
288 if (DEFLATE.contentEqualsIgnoreCase(contentEncoding) || X_DEFLATE.contentEqualsIgnoreCase(contentEncoding)) {
289 return newCompressionChannel(ctx, ZlibWrapper.ZLIB);
290 }
291 if (Brotli.isAvailable() && brotliOptions != null && BR.contentEqualsIgnoreCase(contentEncoding)) {
292 return new EmbeddedChannel(ctx.channel().id(), ctx.channel().metadata().hasDisconnect(),
293 ctx.channel().config(), new BrotliEncoder(brotliOptions.parameters()));
294 }
295 if (zstdOptions != null && ZSTD.contentEqualsIgnoreCase(contentEncoding)) {
296 return new EmbeddedChannel(ctx.channel().id(), ctx.channel().metadata().hasDisconnect(),
297 ctx.channel().config(), new ZstdEncoder(zstdOptions.compressionLevel(),
298 zstdOptions.blockSize(), zstdOptions.maxEncodeSize()));
299 }
300 if (snappyOptions != null && SNAPPY.contentEqualsIgnoreCase(contentEncoding)) {
301 return new EmbeddedChannel(ctx.channel().id(), ctx.channel().metadata().hasDisconnect(),
302 ctx.channel().config(), new SnappyFrameEncoder());
303 }
304
305 return null;
306 }
307
308
309
310
311
312
313
314
315
316 protected CharSequence getTargetContentEncoding(CharSequence contentEncoding) throws Http2Exception {
317 return contentEncoding;
318 }
319
320
321
322
323
324
325 private EmbeddedChannel newCompressionChannel(final ChannelHandlerContext ctx, ZlibWrapper wrapper) {
326 if (supportsCompressionOptions) {
327 if (wrapper == ZlibWrapper.GZIP && gzipCompressionOptions != null) {
328 return new EmbeddedChannel(ctx.channel().id(), ctx.channel().metadata().hasDisconnect(),
329 ctx.channel().config(), ZlibCodecFactory.newZlibEncoder(wrapper,
330 gzipCompressionOptions.compressionLevel(), gzipCompressionOptions.windowBits(),
331 gzipCompressionOptions.memLevel()));
332 } else if (wrapper == ZlibWrapper.ZLIB && deflateOptions != null) {
333 return new EmbeddedChannel(ctx.channel().id(), ctx.channel().metadata().hasDisconnect(),
334 ctx.channel().config(), ZlibCodecFactory.newZlibEncoder(wrapper,
335 deflateOptions.compressionLevel(), deflateOptions.windowBits(),
336 deflateOptions.memLevel()));
337 } else {
338 throw new IllegalArgumentException("Unsupported ZlibWrapper: " + wrapper);
339 }
340 } else {
341 return new EmbeddedChannel(ctx.channel().id(), ctx.channel().metadata().hasDisconnect(),
342 ctx.channel().config(), ZlibCodecFactory.newZlibEncoder(wrapper, compressionLevel, windowBits,
343 memLevel));
344 }
345 }
346
347
348
349
350
351
352
353
354
355
356
357 private EmbeddedChannel newCompressor(ChannelHandlerContext ctx, Http2Headers headers, boolean endOfStream)
358 throws Http2Exception {
359 if (endOfStream) {
360 return null;
361 }
362
363 CharSequence encoding = headers.get(CONTENT_ENCODING);
364 if (encoding == null) {
365 encoding = IDENTITY;
366 }
367 final EmbeddedChannel compressor = newContentCompressor(ctx, encoding);
368 if (compressor != null) {
369 CharSequence targetContentEncoding = getTargetContentEncoding(encoding);
370 if (IDENTITY.contentEqualsIgnoreCase(targetContentEncoding)) {
371 headers.remove(CONTENT_ENCODING);
372 } else {
373 headers.set(CONTENT_ENCODING, targetContentEncoding);
374 }
375
376
377
378
379 headers.remove(CONTENT_LENGTH);
380 }
381
382 return compressor;
383 }
384
385
386
387
388
389
390 private void bindCompressorToStream(EmbeddedChannel compressor, int streamId) {
391 if (compressor != null) {
392 Http2Stream stream = connection().stream(streamId);
393 if (stream != null) {
394 stream.setProperty(propertyKey, compressor);
395 }
396 }
397 }
398
399
400
401
402
403
404
405 void cleanup(Http2Stream stream, EmbeddedChannel compressor) {
406 compressor.finishAndReleaseAll();
407 stream.removeProperty(propertyKey);
408 }
409
410
411
412
413
414
415
416 private static ByteBuf nextReadableBuf(EmbeddedChannel compressor) {
417 for (;;) {
418 final ByteBuf buf = compressor.readOutbound();
419 if (buf == null) {
420 return null;
421 }
422 if (!buf.isReadable()) {
423 buf.release();
424 continue;
425 }
426 return buf;
427 }
428 }
429 }