查看本类的 API文档回源码主页即时通讯网 - 即时通讯开发者社区!
1   /*
2    * Copyright 2014 The Netty Project
3    *
4    * The Netty Project licenses this file to you under the Apache License,
5    * version 2.0 (the "License"); you may not use this file except in compliance
6    * with the License. You may obtain a copy of the License at:
7    *
8    *   https://www.apache.org/licenses/LICENSE-2.0
9    *
10   * Unless required by applicable law or agreed to in writing, software
11   * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12   * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13   * License for the specific language governing permissions and limitations
14   * under the License.
15   */
16  package io.netty.handler.codec.compression;
17  
18  import com.ning.compress.BufferRecycler;
19  import com.ning.compress.lzf.ChunkEncoder;
20  import com.ning.compress.lzf.LZFChunk;
21  import com.ning.compress.lzf.LZFEncoder;
22  import com.ning.compress.lzf.util.ChunkEncoderFactory;
23  import io.netty.buffer.ByteBuf;
24  import io.netty.channel.ChannelHandlerContext;
25  import io.netty.handler.codec.MessageToByteEncoder;
26  
27  import static com.ning.compress.lzf.LZFChunk.MAX_CHUNK_LEN;
28  
29  /**
30   * Compresses a {@link ByteBuf} using the LZF format.
31   * <p>
32   * See original <a href="http://oldhome.schmorp.de/marc/liblzf.html">LZF package</a>
33   * and <a href="https://github.com/ning/compress/wiki/LZFFormat">LZF format</a> for full description.
34   */
35  public class LzfEncoder extends MessageToByteEncoder<ByteBuf> {
36  
37      /**
38       * Minimum block size ready for compression. Blocks with length
39       * less than {@link #MIN_BLOCK_TO_COMPRESS} will write as uncompressed.
40       */
41      private static final int MIN_BLOCK_TO_COMPRESS = 16;
42  
43      /**
44       * Compress threshold for LZF format. When the amount of input data is less than compressThreshold,
45       * we will construct an uncompressed output according to the LZF format.
46       * <p>
47       * When the value is less than {@see ChunkEncoder#MIN_BLOCK_TO_COMPRESS}, since LZF will not compress data
48       * that is less than {@see ChunkEncoder#MIN_BLOCK_TO_COMPRESS}, compressThreshold will not work.
49       */
50      private final int compressThreshold;
51  
52      /**
53       * Underlying decoder in use.
54       */
55      private final ChunkEncoder encoder;
56  
57      /**
58       * Object that handles details of buffer recycling.
59       */
60      private final BufferRecycler recycler;
61  
62      /**
63       * Creates a new LZF encoder with the most optimal available methods for underlying data access.
64       * It will "unsafe" instance if one can be used on current JVM.
65       * It should be safe to call this constructor as implementations are dynamically loaded; however, on some
66       * non-standard platforms it may be necessary to use {@link #LzfEncoder(boolean)} with {@code true} param.
67       */
68      public LzfEncoder() {
69          this(false);
70      }
71  
72      /**
73       * Creates a new LZF encoder with specified encoding instance.
74       *
75       * @param safeInstance If {@code true} encoder will use {@link ChunkEncoder} that only uses
76       *                     standard JDK access methods, and should work on all Java platforms and JVMs.
77       *                     Otherwise encoder will try to use highly optimized {@link ChunkEncoder}
78       *                     implementation that uses Sun JDK's {@link sun.misc.Unsafe}
79       *                     class (which may be included by other JDK's as well).
80       */
81      public LzfEncoder(boolean safeInstance) {
82          this(safeInstance, MAX_CHUNK_LEN);
83      }
84  
85      /**
86       * Creates a new LZF encoder with specified encoding instance and compressThreshold.
87       *
88       * @param safeInstance      If {@code true} encoder will use {@link ChunkEncoder} that only uses standard
89       *                          JDK access methods, and should work on all Java platforms and JVMs.
90       *                          Otherwise encoder will try to use highly optimized {@link ChunkEncoder}
91       *                          implementation that uses Sun JDK's {@link sun.misc.Unsafe}
92       *                          class (which may be included by other JDK's as well).
93       * @param totalLength       Expected total length of content to compress; only matters for outgoing messages
94       *                          that is smaller than maximum chunk size (64k), to optimize encoding hash tables.
95       */
96      public LzfEncoder(boolean safeInstance, int totalLength) {
97          this(safeInstance, totalLength, MIN_BLOCK_TO_COMPRESS);
98      }
99  
100     /**
101      * Creates a new LZF encoder with specified total length of encoded chunk. You can configure it to encode
102      * your data flow more efficient if you know the average size of messages that you send.
103      *
104      * @param totalLength Expected total length of content to compress;
105      *                    only matters for outgoing messages that is smaller than maximum chunk size (64k),
106      *                    to optimize encoding hash tables.
107      */
108     public LzfEncoder(int totalLength) {
109         this(false, totalLength);
110     }
111 
112     /**
113      * Creates a new LZF encoder with specified settings.
114      *
115      * @param safeInstance          If {@code true} encoder will use {@link ChunkEncoder} that only uses standard JDK
116      *                              access methods, and should work on all Java platforms and JVMs.
117      *                              Otherwise encoder will try to use highly optimized {@link ChunkEncoder}
118      *                              implementation that uses Sun JDK's {@link sun.misc.Unsafe}
119      *                              class (which may be included by other JDK's as well).
120      * @param totalLength           Expected total length of content to compress; only matters for outgoing messages
121      *                              that is smaller than maximum chunk size (64k), to optimize encoding hash tables.
122      * @param compressThreshold     Compress threshold for LZF format. When the amount of input data is less than
123      *                              compressThreshold, we will construct an uncompressed output according
124      *                              to the LZF format.
125      */
126     public LzfEncoder(boolean safeInstance, int totalLength, int compressThreshold) {
127         super(false);
128         if (totalLength < MIN_BLOCK_TO_COMPRESS || totalLength > MAX_CHUNK_LEN) {
129             throw new IllegalArgumentException("totalLength: " + totalLength +
130                     " (expected: " + MIN_BLOCK_TO_COMPRESS + '-' + MAX_CHUNK_LEN + ')');
131         }
132 
133         if (compressThreshold < MIN_BLOCK_TO_COMPRESS) {
134             // not a suitable value.
135             throw new IllegalArgumentException("compressThreshold:" + compressThreshold +
136                     " expected >=" + MIN_BLOCK_TO_COMPRESS);
137         }
138         this.compressThreshold = compressThreshold;
139 
140         this.encoder = safeInstance ?
141                 ChunkEncoderFactory.safeNonAllocatingInstance(totalLength)
142                 : ChunkEncoderFactory.optimalNonAllocatingInstance(totalLength);
143 
144         this.recycler = BufferRecycler.instance();
145     }
146 
147     @Override
148     protected void encode(ChannelHandlerContext ctx, ByteBuf in, ByteBuf out) throws Exception {
149         final int length = in.readableBytes();
150         final int idx = in.readerIndex();
151         final byte[] input;
152         final int inputPtr;
153         if (in.hasArray()) {
154             input = in.array();
155             inputPtr = in.arrayOffset() + idx;
156         } else {
157             input = recycler.allocInputBuffer(length);
158             in.getBytes(idx, input, 0, length);
159             inputPtr = 0;
160         }
161 
162         // Estimate may apparently under-count by one in some cases.
163         final int maxOutputLength = LZFEncoder.estimateMaxWorkspaceSize(length) + 1;
164         out.ensureWritable(maxOutputLength);
165         final byte[] output;
166         final int outputPtr;
167         if (out.hasArray()) {
168             output = out.array();
169             outputPtr = out.arrayOffset() + out.writerIndex();
170         } else {
171             output = new byte[maxOutputLength];
172             outputPtr = 0;
173         }
174 
175         final int outputLength;
176         if (length >= compressThreshold) {
177             // compress.
178             outputLength = encodeCompress(input, inputPtr, length, output, outputPtr);
179         } else {
180             // not compress.
181             outputLength = encodeNonCompress(input, inputPtr, length, output, outputPtr);
182         }
183 
184         if (out.hasArray()) {
185             out.writerIndex(out.writerIndex() + outputLength);
186         } else {
187             out.writeBytes(output, 0, outputLength);
188         }
189 
190         in.skipBytes(length);
191 
192         if (!in.hasArray()) {
193             recycler.releaseInputBuffer(input);
194         }
195     }
196 
197     private int encodeCompress(byte[] input, int inputPtr, int length, byte[] output, int outputPtr) {
198         return LZFEncoder.appendEncoded(encoder,
199                 input, inputPtr, length, output, outputPtr) - outputPtr;
200     }
201 
202     private static int lzfEncodeNonCompress(byte[] input, int inputPtr, int length, byte[] output, int outputPtr) {
203         int left = length;
204         int chunkLen = Math.min(LZFChunk.MAX_CHUNK_LEN, left);
205         outputPtr = LZFChunk.appendNonCompressed(input, inputPtr, chunkLen, output, outputPtr);
206         left -= chunkLen;
207         if (left < 1) {
208             return outputPtr;
209         }
210         inputPtr += chunkLen;
211         do {
212             chunkLen = Math.min(left, LZFChunk.MAX_CHUNK_LEN);
213             outputPtr = LZFChunk.appendNonCompressed(input, inputPtr, chunkLen, output, outputPtr);
214             inputPtr += chunkLen;
215             left -= chunkLen;
216         } while (left > 0);
217         return outputPtr;
218     }
219 
220     /**
221      * Use lzf uncompressed format to encode a piece of input.
222      */
223     private static int encodeNonCompress(byte[] input, int inputPtr, int length, byte[] output, int outputPtr) {
224         return lzfEncodeNonCompress(input, inputPtr, length, output, outputPtr) - outputPtr;
225     }
226 
227     @Override
228     public void handlerRemoved(ChannelHandlerContext ctx) throws Exception {
229         encoder.close();
230         super.handlerRemoved(ctx);
231     }
232 }