1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16 package io.netty.handler.codec.compression;
17
18 import io.netty.buffer.ByteBuf;
19 import io.netty.channel.ChannelFuture;
20 import io.netty.channel.ChannelHandlerContext;
21 import io.netty.channel.ChannelPipeline;
22 import io.netty.channel.ChannelPromise;
23 import io.netty.handler.codec.MessageToByteEncoder;
24 import io.netty.util.concurrent.EventExecutor;
25 import io.netty.util.concurrent.PromiseNotifier;
26
27 import static io.netty.handler.codec.compression.Bzip2Constants.BASE_BLOCK_SIZE;
28 import static io.netty.handler.codec.compression.Bzip2Constants.END_OF_STREAM_MAGIC_1;
29 import static io.netty.handler.codec.compression.Bzip2Constants.END_OF_STREAM_MAGIC_2;
30 import static io.netty.handler.codec.compression.Bzip2Constants.MAGIC_NUMBER;
31 import static io.netty.handler.codec.compression.Bzip2Constants.MAX_BLOCK_SIZE;
32 import static io.netty.handler.codec.compression.Bzip2Constants.MIN_BLOCK_SIZE;
33
34
35
36
37
38
39 public class Bzip2Encoder extends MessageToByteEncoder<ByteBuf> {
40
41
42
43 private enum State {
44 INIT,
45 INIT_BLOCK,
46 WRITE_DATA,
47 CLOSE_BLOCK
48 }
49
50 private State currentState = State.INIT;
51
52
53
54
55 private final Bzip2BitWriter writer = new Bzip2BitWriter();
56
57
58
59
60 private final int streamBlockSize;
61
62
63
64
65 private int streamCRC;
66
67
68
69
70 private Bzip2BlockCompressor blockCompressor;
71
72
73
74
75 private volatile boolean finished;
76
77
78
79
80 private volatile ChannelHandlerContext ctx;
81
82
83
84
85 public Bzip2Encoder() {
86 this(MAX_BLOCK_SIZE);
87 }
88
89
90
91
92
93
94
95
96 public Bzip2Encoder(final int blockSizeMultiplier) {
97 if (blockSizeMultiplier < MIN_BLOCK_SIZE || blockSizeMultiplier > MAX_BLOCK_SIZE) {
98 throw new IllegalArgumentException(
99 "blockSizeMultiplier: " + blockSizeMultiplier + " (expected: 1-9)");
100 }
101 streamBlockSize = blockSizeMultiplier * BASE_BLOCK_SIZE;
102 }
103
104 @Override
105 protected void encode(ChannelHandlerContext ctx, ByteBuf in, ByteBuf out) throws Exception {
106 if (finished) {
107 out.writeBytes(in);
108 return;
109 }
110
111 for (;;) {
112 switch (currentState) {
113 case INIT:
114 out.ensureWritable(4);
115 out.writeMedium(MAGIC_NUMBER);
116 out.writeByte('0' + streamBlockSize / BASE_BLOCK_SIZE);
117 currentState = State.INIT_BLOCK;
118
119 case INIT_BLOCK:
120 blockCompressor = new Bzip2BlockCompressor(writer, streamBlockSize);
121 currentState = State.WRITE_DATA;
122
123 case WRITE_DATA:
124 if (!in.isReadable()) {
125 return;
126 }
127 Bzip2BlockCompressor blockCompressor = this.blockCompressor;
128 final int length = Math.min(in.readableBytes(), blockCompressor.availableSize());
129 final int bytesWritten = blockCompressor.write(in, in.readerIndex(), length);
130 in.skipBytes(bytesWritten);
131 if (!blockCompressor.isFull()) {
132 if (in.isReadable()) {
133 break;
134 } else {
135 return;
136 }
137 }
138 currentState = State.CLOSE_BLOCK;
139
140 case CLOSE_BLOCK:
141 closeBlock(out);
142 currentState = State.INIT_BLOCK;
143 break;
144 default:
145 throw new IllegalStateException();
146 }
147 }
148 }
149
150
151
152
153 private void closeBlock(ByteBuf out) {
154 final Bzip2BlockCompressor blockCompressor = this.blockCompressor;
155 if (!blockCompressor.isEmpty()) {
156 blockCompressor.close(out);
157 final int blockCRC = blockCompressor.crc();
158 streamCRC = (streamCRC << 1 | streamCRC >>> 31) ^ blockCRC;
159 }
160 }
161
162
163
164
165 public boolean isClosed() {
166 return finished;
167 }
168
169
170
171
172
173
174 public ChannelFuture close() {
175 return close(ctx().newPromise());
176 }
177
178
179
180
181
182
183 public ChannelFuture close(final ChannelPromise promise) {
184 ChannelHandlerContext ctx = ctx();
185 EventExecutor executor = ctx.executor();
186 if (executor.inEventLoop()) {
187 return finishEncode(ctx, promise);
188 } else {
189 executor.execute(new Runnable() {
190 @Override
191 public void run() {
192 ChannelFuture f = finishEncode(ctx(), promise);
193 PromiseNotifier.cascade(f, promise);
194 }
195 });
196 return promise;
197 }
198 }
199
200 @Override
201 public void close(final ChannelHandlerContext ctx, final ChannelPromise promise) throws Exception {
202 ChannelFuture f = finishEncode(ctx, ctx.newPromise());
203 EncoderUtil.closeAfterFinishEncode(ctx, f, promise);
204 }
205
206 private ChannelFuture finishEncode(final ChannelHandlerContext ctx, ChannelPromise promise) {
207 if (finished) {
208 promise.setSuccess();
209 return promise;
210 }
211 finished = true;
212
213 final ByteBuf footer = ctx.alloc().buffer();
214 closeBlock(footer);
215
216 final int streamCRC = this.streamCRC;
217 final Bzip2BitWriter writer = this.writer;
218 try {
219 writer.writeBits(footer, 24, END_OF_STREAM_MAGIC_1);
220 writer.writeBits(footer, 24, END_OF_STREAM_MAGIC_2);
221 writer.writeInt(footer, streamCRC);
222 writer.flush(footer);
223 } finally {
224 blockCompressor = null;
225 }
226 return ctx.writeAndFlush(footer, promise);
227 }
228
229 private ChannelHandlerContext ctx() {
230 ChannelHandlerContext ctx = this.ctx;
231 if (ctx == null) {
232 throw new IllegalStateException("not added to a pipeline");
233 }
234 return ctx;
235 }
236
237 @Override
238 public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
239 this.ctx = ctx;
240 }
241 }