查看本类的 API文档回源码主页即时通讯网 - 即时通讯开发者社区!
1   /*
2    * Copyright 2014 The Netty Project
3    *
4    * The Netty Project licenses this file to you under the Apache License, version 2.0 (the
5    * "License"); you may not use this file except in compliance with the License. You may obtain a
6    * copy of the License at:
7    *
8    * https://www.apache.org/licenses/LICENSE-2.0
9    *
10   * Unless required by applicable law or agreed to in writing, software distributed under the License
11   * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
12   * or implied. See the License for the specific language governing permissions and limitations under
13   * the License.
14   */
15  package io.netty.handler.codec.http2;
16  
17  import io.netty.buffer.ByteBuf;
18  import io.netty.buffer.Unpooled;
19  import io.netty.channel.ChannelFuture;
20  import io.netty.channel.ChannelHandlerContext;
21  import io.netty.channel.ChannelPromise;
22  import io.netty.channel.embedded.EmbeddedChannel;
23  import io.netty.handler.codec.ByteToMessageDecoder;
24  import io.netty.handler.codec.compression.BrotliEncoder;
25  import io.netty.handler.codec.compression.ZlibCodecFactory;
26  import io.netty.handler.codec.compression.ZlibWrapper;
27  import io.netty.handler.codec.compression.Brotli;
28  import io.netty.handler.codec.compression.BrotliOptions;
29  import io.netty.handler.codec.compression.CompressionOptions;
30  import io.netty.handler.codec.compression.DeflateOptions;
31  import io.netty.handler.codec.compression.GzipOptions;
32  import io.netty.handler.codec.compression.StandardCompressionOptions;
33  import io.netty.handler.codec.compression.Zstd;
34  import io.netty.handler.codec.compression.ZstdEncoder;
35  import io.netty.handler.codec.compression.ZstdOptions;
36  import io.netty.handler.codec.compression.SnappyFrameEncoder;
37  import io.netty.handler.codec.compression.SnappyOptions;
38  import io.netty.util.concurrent.PromiseCombiner;
39  import io.netty.util.internal.ObjectUtil;
40  import io.netty.util.internal.UnstableApi;
41  
42  import java.util.ArrayList;
43  import java.util.List;
44  
45  import static io.netty.handler.codec.http.HttpHeaderNames.CONTENT_ENCODING;
46  import static io.netty.handler.codec.http.HttpHeaderNames.CONTENT_LENGTH;
47  import static io.netty.handler.codec.http.HttpHeaderValues.BR;
48  import static io.netty.handler.codec.http.HttpHeaderValues.DEFLATE;
49  import static io.netty.handler.codec.http.HttpHeaderValues.GZIP;
50  import static io.netty.handler.codec.http.HttpHeaderValues.IDENTITY;
51  import static io.netty.handler.codec.http.HttpHeaderValues.X_DEFLATE;
52  import static io.netty.handler.codec.http.HttpHeaderValues.X_GZIP;
53  import static io.netty.handler.codec.http.HttpHeaderValues.ZSTD;
54  import static io.netty.handler.codec.http.HttpHeaderValues.SNAPPY;
55  
56  /**
57   * A decorating HTTP2 encoder that will compress data frames according to the {@code content-encoding} header for each
58   * stream. The compression provided by this class will be applied to the data for the entire stream.
59   */
60  @UnstableApi
61  public class CompressorHttp2ConnectionEncoder extends DecoratingHttp2ConnectionEncoder {
62      // We cannot remove this because it'll be breaking change
63      public static final int DEFAULT_COMPRESSION_LEVEL = 6;
64      public static final int DEFAULT_WINDOW_BITS = 15;
65      public static final int DEFAULT_MEM_LEVEL = 8;
66  
67      private int compressionLevel;
68      private int windowBits;
69      private int memLevel;
70      private final Http2Connection.PropertyKey propertyKey;
71  
72      private final boolean supportsCompressionOptions;
73  
74      private BrotliOptions brotliOptions;
75      private GzipOptions gzipCompressionOptions;
76      private DeflateOptions deflateOptions;
77      private ZstdOptions zstdOptions;
78      private SnappyOptions snappyOptions;
79  
80      /**
81       * Create a new {@link CompressorHttp2ConnectionEncoder} instance
82       * with default implementation of {@link StandardCompressionOptions}
83       */
84      public CompressorHttp2ConnectionEncoder(Http2ConnectionEncoder delegate) {
85          this(delegate, defaultCompressionOptions());
86      }
87  
88      private static CompressionOptions[] defaultCompressionOptions() {
89          List<CompressionOptions> compressionOptions = new ArrayList<CompressionOptions>();
90          compressionOptions.add(StandardCompressionOptions.gzip());
91          compressionOptions.add(StandardCompressionOptions.deflate());
92          compressionOptions.add(StandardCompressionOptions.snappy());
93          if (Brotli.isAvailable()) {
94              compressionOptions.add(StandardCompressionOptions.brotli());
95          }
96          if (Zstd.isAvailable()) {
97              compressionOptions.add(StandardCompressionOptions.zstd());
98          }
99          return compressionOptions.toArray(new CompressionOptions[0]);
100     }
101 
102     /**
103      * Create a new {@link CompressorHttp2ConnectionEncoder} instance
104      */
105     @Deprecated
106     public CompressorHttp2ConnectionEncoder(Http2ConnectionEncoder delegate, int compressionLevel, int windowBits,
107                                             int memLevel) {
108         super(delegate);
109         this.compressionLevel = ObjectUtil.checkInRange(compressionLevel, 0, 9, "compressionLevel");
110         this.windowBits = ObjectUtil.checkInRange(windowBits, 9, 15, "windowBits");
111         this.memLevel = ObjectUtil.checkInRange(memLevel, 1, 9, "memLevel");
112 
113         propertyKey = connection().newKey();
114         connection().addListener(new Http2ConnectionAdapter() {
115             @Override
116             public void onStreamRemoved(Http2Stream stream) {
117                 final EmbeddedChannel compressor = stream.getProperty(propertyKey);
118                 if (compressor != null) {
119                     cleanup(stream, compressor);
120                 }
121             }
122         });
123 
124         supportsCompressionOptions = false;
125     }
126 
127     /**
128      * Create a new {@link CompressorHttp2ConnectionEncoder} with
129      * specified {@link StandardCompressionOptions}
130      */
131     public CompressorHttp2ConnectionEncoder(Http2ConnectionEncoder delegate,
132                                             CompressionOptions... compressionOptionsArgs) {
133         super(delegate);
134         ObjectUtil.checkNotNull(compressionOptionsArgs, "CompressionOptions");
135         ObjectUtil.deepCheckNotNull("CompressionOptions", compressionOptionsArgs);
136 
137         for (CompressionOptions compressionOptions : compressionOptionsArgs) {
138             // BrotliOptions' class initialization depends on Brotli classes being on the classpath.
139             // The Brotli.isAvailable check ensures that BrotliOptions will only get instantiated if Brotli is on
140             // the classpath.
141             // This results in the static analysis of native-image identifying the instanceof BrotliOptions check
142             // and thus BrotliOptions itself as unreachable, enabling native-image to link all classes at build time
143             // and not complain about the missing Brotli classes.
144             if (Brotli.isAvailable() && compressionOptions instanceof BrotliOptions) {
145                 brotliOptions = (BrotliOptions) compressionOptions;
146             } else if (compressionOptions instanceof GzipOptions) {
147                 gzipCompressionOptions = (GzipOptions) compressionOptions;
148             } else if (compressionOptions instanceof DeflateOptions) {
149                 deflateOptions = (DeflateOptions) compressionOptions;
150             } else if (compressionOptions instanceof ZstdOptions) {
151                 zstdOptions = (ZstdOptions) compressionOptions;
152             } else if (compressionOptions instanceof SnappyOptions) {
153                 snappyOptions = (SnappyOptions) compressionOptions;
154             } else {
155                 throw new IllegalArgumentException("Unsupported " + CompressionOptions.class.getSimpleName() +
156                         ": " + compressionOptions);
157             }
158         }
159 
160         supportsCompressionOptions = true;
161 
162         propertyKey = connection().newKey();
163         connection().addListener(new Http2ConnectionAdapter() {
164             @Override
165             public void onStreamRemoved(Http2Stream stream) {
166                 final EmbeddedChannel compressor = stream.getProperty(propertyKey);
167                 if (compressor != null) {
168                     cleanup(stream, compressor);
169                 }
170             }
171         });
172     }
173 
174     @Override
175     public ChannelFuture writeData(final ChannelHandlerContext ctx, final int streamId, ByteBuf data, int padding,
176             final boolean endOfStream, ChannelPromise promise) {
177         final Http2Stream stream = connection().stream(streamId);
178         final EmbeddedChannel channel = stream == null ? null : (EmbeddedChannel) stream.getProperty(propertyKey);
179         if (channel == null) {
180             // The compressor may be null if no compatible encoding type was found in this stream's headers
181             return super.writeData(ctx, streamId, data, padding, endOfStream, promise);
182         }
183 
184         try {
185             // The channel will release the buffer after being written
186             channel.writeOutbound(data);
187             ByteBuf buf = nextReadableBuf(channel);
188             if (buf == null) {
189                 if (endOfStream) {
190                     if (channel.finish()) {
191                         buf = nextReadableBuf(channel);
192                     }
193                     return super.writeData(ctx, streamId, buf == null ? Unpooled.EMPTY_BUFFER : buf, padding,
194                             true, promise);
195                 }
196                 // END_STREAM is not set and the assumption is data is still forthcoming.
197                 promise.setSuccess();
198                 return promise;
199             }
200 
201             PromiseCombiner combiner = new PromiseCombiner(ctx.executor());
202             for (;;) {
203                 ByteBuf nextBuf = nextReadableBuf(channel);
204                 boolean compressedEndOfStream = nextBuf == null && endOfStream;
205                 if (compressedEndOfStream && channel.finish()) {
206                     nextBuf = nextReadableBuf(channel);
207                     compressedEndOfStream = nextBuf == null;
208                 }
209 
210                 ChannelPromise bufPromise = ctx.newPromise();
211                 combiner.add(bufPromise);
212                 super.writeData(ctx, streamId, buf, padding, compressedEndOfStream, bufPromise);
213                 if (nextBuf == null) {
214                     break;
215                 }
216 
217                 padding = 0; // Padding is only communicated once on the first iteration
218                 buf = nextBuf;
219             }
220             combiner.finish(promise);
221         } catch (Throwable cause) {
222             promise.tryFailure(cause);
223         } finally {
224             if (endOfStream) {
225                 cleanup(stream, channel);
226             }
227         }
228         return promise;
229     }
230 
231     @Override
232     public ChannelFuture writeHeaders(ChannelHandlerContext ctx, int streamId, Http2Headers headers, int padding,
233             boolean endStream, ChannelPromise promise) {
234         try {
235             // Determine if compression is required and sanitize the headers.
236             EmbeddedChannel compressor = newCompressor(ctx, headers, endStream);
237 
238             // Write the headers and create the stream object.
239             ChannelFuture future = super.writeHeaders(ctx, streamId, headers, padding, endStream, promise);
240 
241             // After the stream object has been created, then attach the compressor as a property for data compression.
242             bindCompressorToStream(compressor, streamId);
243 
244             return future;
245         } catch (Throwable e) {
246             promise.tryFailure(e);
247         }
248         return promise;
249     }
250 
251     @Override
252     public ChannelFuture writeHeaders(final ChannelHandlerContext ctx, final int streamId, final Http2Headers headers,
253             final int streamDependency, final short weight, final boolean exclusive, final int padding,
254             final boolean endOfStream, final ChannelPromise promise) {
255         try {
256             // Determine if compression is required and sanitize the headers.
257             EmbeddedChannel compressor = newCompressor(ctx, headers, endOfStream);
258 
259             // Write the headers and create the stream object.
260             ChannelFuture future = super.writeHeaders(ctx, streamId, headers, streamDependency, weight, exclusive,
261                                                       padding, endOfStream, promise);
262 
263             // After the stream object has been created, then attach the compressor as a property for data compression.
264             bindCompressorToStream(compressor, streamId);
265 
266             return future;
267         } catch (Throwable e) {
268             promise.tryFailure(e);
269         }
270         return promise;
271     }
272 
273     /**
274      * Returns a new {@link EmbeddedChannel} that encodes the HTTP2 message content encoded in the specified
275      * {@code contentEncoding}.
276      *
277      * @param ctx the context.
278      * @param contentEncoding the value of the {@code content-encoding} header
279      * @return a new {@link ByteToMessageDecoder} if the specified encoding is supported. {@code null} otherwise
280      * (alternatively, you can throw a {@link Http2Exception} to block unknown encoding).
281      * @throws Http2Exception If the specified encoding is not supported and warrants an exception
282      */
283     protected EmbeddedChannel newContentCompressor(ChannelHandlerContext ctx, CharSequence contentEncoding)
284             throws Http2Exception {
285         if (GZIP.contentEqualsIgnoreCase(contentEncoding) || X_GZIP.contentEqualsIgnoreCase(contentEncoding)) {
286             return newCompressionChannel(ctx, ZlibWrapper.GZIP);
287         }
288         if (DEFLATE.contentEqualsIgnoreCase(contentEncoding) || X_DEFLATE.contentEqualsIgnoreCase(contentEncoding)) {
289             return newCompressionChannel(ctx, ZlibWrapper.ZLIB);
290         }
291         if (Brotli.isAvailable() && brotliOptions != null && BR.contentEqualsIgnoreCase(contentEncoding)) {
292             return new EmbeddedChannel(ctx.channel().id(), ctx.channel().metadata().hasDisconnect(),
293                     ctx.channel().config(), new BrotliEncoder(brotliOptions.parameters()));
294         }
295         if (zstdOptions != null && ZSTD.contentEqualsIgnoreCase(contentEncoding)) {
296             return new EmbeddedChannel(ctx.channel().id(), ctx.channel().metadata().hasDisconnect(),
297                     ctx.channel().config(), new ZstdEncoder(zstdOptions.compressionLevel(),
298                     zstdOptions.blockSize(), zstdOptions.maxEncodeSize()));
299         }
300         if (snappyOptions != null && SNAPPY.contentEqualsIgnoreCase(contentEncoding)) {
301             return new EmbeddedChannel(ctx.channel().id(), ctx.channel().metadata().hasDisconnect(),
302                     ctx.channel().config(), new SnappyFrameEncoder());
303         }
304         // 'identity' or unsupported
305         return null;
306     }
307 
308     /**
309      * Returns the expected content encoding of the decoded content. Returning {@code contentEncoding} is the default
310      * behavior, which is the case for most compressors.
311      *
312      * @param contentEncoding the value of the {@code content-encoding} header
313      * @return the expected content encoding of the new content.
314      * @throws Http2Exception if the {@code contentEncoding} is not supported and warrants an exception
315      */
316     protected CharSequence getTargetContentEncoding(CharSequence contentEncoding) throws Http2Exception {
317         return contentEncoding;
318     }
319 
320     /**
321      * Generate a new instance of an {@link EmbeddedChannel} capable of compressing data
322      * @param ctx the context.
323      * @param wrapper Defines what type of encoder should be used
324      */
325     private EmbeddedChannel newCompressionChannel(final ChannelHandlerContext ctx, ZlibWrapper wrapper) {
326         if (supportsCompressionOptions) {
327             if (wrapper == ZlibWrapper.GZIP && gzipCompressionOptions != null) {
328                 return new EmbeddedChannel(ctx.channel().id(), ctx.channel().metadata().hasDisconnect(),
329                         ctx.channel().config(), ZlibCodecFactory.newZlibEncoder(wrapper,
330                         gzipCompressionOptions.compressionLevel(), gzipCompressionOptions.windowBits(),
331                         gzipCompressionOptions.memLevel()));
332             } else if (wrapper == ZlibWrapper.ZLIB && deflateOptions != null) {
333                 return new EmbeddedChannel(ctx.channel().id(), ctx.channel().metadata().hasDisconnect(),
334                         ctx.channel().config(), ZlibCodecFactory.newZlibEncoder(wrapper,
335                         deflateOptions.compressionLevel(), deflateOptions.windowBits(),
336                         deflateOptions.memLevel()));
337             } else {
338                 throw new IllegalArgumentException("Unsupported ZlibWrapper: " + wrapper);
339             }
340         } else {
341             return new EmbeddedChannel(ctx.channel().id(), ctx.channel().metadata().hasDisconnect(),
342                     ctx.channel().config(), ZlibCodecFactory.newZlibEncoder(wrapper, compressionLevel, windowBits,
343                     memLevel));
344         }
345     }
346 
347     /**
348      * Checks if a new compressor object is needed for the stream identified by {@code streamId}. This method will
349      * modify the {@code content-encoding} header contained in {@code headers}.
350      *
351      * @param ctx the context.
352      * @param headers Object representing headers which are to be written
353      * @param endOfStream Indicates if the stream has ended
354      * @return The channel used to compress data.
355      * @throws Http2Exception if any problems occur during initialization.
356      */
357     private EmbeddedChannel newCompressor(ChannelHandlerContext ctx, Http2Headers headers, boolean endOfStream)
358             throws Http2Exception {
359         if (endOfStream) {
360             return null;
361         }
362 
363         CharSequence encoding = headers.get(CONTENT_ENCODING);
364         if (encoding == null) {
365             encoding = IDENTITY;
366         }
367         final EmbeddedChannel compressor = newContentCompressor(ctx, encoding);
368         if (compressor != null) {
369             CharSequence targetContentEncoding = getTargetContentEncoding(encoding);
370             if (IDENTITY.contentEqualsIgnoreCase(targetContentEncoding)) {
371                 headers.remove(CONTENT_ENCODING);
372             } else {
373                 headers.set(CONTENT_ENCODING, targetContentEncoding);
374             }
375 
376             // The content length will be for the decompressed data. Since we will compress the data
377             // this content-length will not be correct. Instead of queuing messages or delaying sending
378             // header frames...just remove the content-length header
379             headers.remove(CONTENT_LENGTH);
380         }
381 
382         return compressor;
383     }
384 
385     /**
386      * Called after the super class has written the headers and created any associated stream objects.
387      * @param compressor The compressor associated with the stream identified by {@code streamId}.
388      * @param streamId The stream id for which the headers were written.
389      */
390     private void bindCompressorToStream(EmbeddedChannel compressor, int streamId) {
391         if (compressor != null) {
392             Http2Stream stream = connection().stream(streamId);
393             if (stream != null) {
394                 stream.setProperty(propertyKey, compressor);
395             }
396         }
397     }
398 
399     /**
400      * Release remaining content from {@link EmbeddedChannel} and remove the compressor from the {@link Http2Stream}.
401      *
402      * @param stream The stream for which {@code compressor} is the compressor for
403      * @param compressor The compressor for {@code stream}
404      */
405     void cleanup(Http2Stream stream, EmbeddedChannel compressor) {
406         compressor.finishAndReleaseAll();
407         stream.removeProperty(propertyKey);
408     }
409 
410     /**
411      * Read the next compressed {@link ByteBuf} from the {@link EmbeddedChannel} or {@code null} if one does not exist.
412      *
413      * @param compressor The channel to read from
414      * @return The next decoded {@link ByteBuf} from the {@link EmbeddedChannel} or {@code null} if one does not exist
415      */
416     private static ByteBuf nextReadableBuf(EmbeddedChannel compressor) {
417         for (;;) {
418             final ByteBuf buf = compressor.readOutbound();
419             if (buf == null) {
420                 return null;
421             }
422             if (!buf.isReadable()) {
423                 buf.release();
424                 continue;
425             }
426             return buf;
427         }
428     }
429 }