1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16 package io.netty.handler.codec.compression;
17
18 import com.aayushatharva.brotli4j.encoder.BrotliEncoderChannel;
19 import com.aayushatharva.brotli4j.encoder.Encoder;
20 import io.netty.buffer.ByteBuf;
21 import io.netty.buffer.Unpooled;
22 import io.netty.channel.Channel;
23 import io.netty.channel.ChannelFuture;
24 import io.netty.channel.ChannelHandler;
25 import io.netty.channel.ChannelHandlerContext;
26 import io.netty.channel.ChannelPromise;
27 import io.netty.handler.codec.MessageToByteEncoder;
28 import io.netty.util.AttributeKey;
29 import io.netty.util.ReferenceCountUtil;
30 import io.netty.util.internal.ObjectUtil;
31
32 import java.io.IOException;
33 import java.nio.ByteBuffer;
34 import java.nio.channels.ClosedChannelException;
35 import java.nio.channels.WritableByteChannel;
36
37
38
39
40
41
42 @ChannelHandler.Sharable
43 public final class BrotliEncoder extends MessageToByteEncoder<ByteBuf> {
44
45 private static final AttributeKey<Writer> ATTR = AttributeKey.valueOf("BrotliEncoderWriter");
46
47 private final Encoder.Parameters parameters;
48 private final boolean isSharable;
49 private Writer writer;
50
51
52
53
54
55 public BrotliEncoder() {
56 this(BrotliOptions.DEFAULT);
57 }
58
59
60
61
62
63
64
65 public BrotliEncoder(BrotliOptions brotliOptions) {
66 this(brotliOptions.parameters());
67 }
68
69
70
71
72
73
74
75 public BrotliEncoder(Encoder.Parameters parameters) {
76 this(parameters, true);
77 }
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95 public BrotliEncoder(Encoder.Parameters parameters, boolean isSharable) {
96 this.parameters = ObjectUtil.checkNotNull(parameters, "Parameters");
97 this.isSharable = isSharable;
98 }
99
100 @Override
101 public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
102 Writer writer = new Writer(parameters, ctx);
103 if (isSharable) {
104 ctx.channel().attr(ATTR).set(writer);
105 } else {
106 this.writer = writer;
107 }
108 super.handlerAdded(ctx);
109 }
110
111 @Override
112 public void handlerRemoved(ChannelHandlerContext ctx) throws Exception {
113 finish(ctx);
114 super.handlerRemoved(ctx);
115 }
116
117 @Override
118 protected void encode(ChannelHandlerContext ctx, ByteBuf msg, ByteBuf out) throws Exception {
119
120 }
121
122 @Override
123 protected ByteBuf allocateBuffer(ChannelHandlerContext ctx, ByteBuf msg, boolean preferDirect) throws Exception {
124 if (!msg.isReadable()) {
125 return Unpooled.EMPTY_BUFFER;
126 }
127
128 Writer writer;
129 if (isSharable) {
130 writer = ctx.channel().attr(ATTR).get();
131 } else {
132 writer = this.writer;
133 }
134
135
136 if (writer == null) {
137 return Unpooled.EMPTY_BUFFER;
138 } else {
139 writer.encode(msg, preferDirect);
140 return writer.writableBuffer;
141 }
142 }
143
144 @Override
145 public boolean isSharable() {
146 return isSharable;
147 }
148
149
150
151
152
153
154
155 public void finish(ChannelHandlerContext ctx) throws IOException {
156 finishEncode(ctx, ctx.newPromise());
157 }
158
159 private ChannelFuture finishEncode(ChannelHandlerContext ctx, ChannelPromise promise) throws IOException {
160 Writer writer;
161
162 if (isSharable) {
163 writer = ctx.channel().attr(ATTR).getAndSet(null);
164 } else {
165 writer = this.writer;
166 }
167
168 if (writer != null) {
169 writer.close();
170 this.writer = null;
171 }
172 return promise;
173 }
174
175 @Override
176 public void close(final ChannelHandlerContext ctx, final ChannelPromise promise) throws Exception {
177 ChannelFuture f = finishEncode(ctx, ctx.newPromise());
178 EncoderUtil.closeAfterFinishEncode(ctx, f, promise);
179 }
180
181
182
183
184
185 private static final class Writer implements WritableByteChannel {
186
187 private ByteBuf writableBuffer;
188 private final BrotliEncoderChannel brotliEncoderChannel;
189 private final ChannelHandlerContext ctx;
190 private boolean isClosed;
191
192 private Writer(Encoder.Parameters parameters, ChannelHandlerContext ctx) throws IOException {
193 brotliEncoderChannel = new BrotliEncoderChannel(this, parameters);
194 this.ctx = ctx;
195 }
196
197 private void encode(ByteBuf msg, boolean preferDirect) throws Exception {
198 try {
199 allocate(preferDirect);
200
201
202
203
204
205
206
207
208 ByteBuffer nioBuffer = CompressionUtil.safeReadableNioBuffer(msg);
209 int position = nioBuffer.position();
210 brotliEncoderChannel.write(nioBuffer);
211 msg.skipBytes(nioBuffer.position() - position);
212 brotliEncoderChannel.flush();
213 } catch (Exception e) {
214 ReferenceCountUtil.release(msg);
215 throw e;
216 }
217 }
218
219 private void allocate(boolean preferDirect) {
220 if (preferDirect) {
221 writableBuffer = ctx.alloc().ioBuffer();
222 } else {
223 writableBuffer = ctx.alloc().buffer();
224 }
225 }
226
227 @Override
228 public int write(ByteBuffer src) throws IOException {
229 if (!isOpen()) {
230 throw new ClosedChannelException();
231 }
232
233 return writableBuffer.writeBytes(src).readableBytes();
234 }
235
236 @Override
237 public boolean isOpen() {
238 return !isClosed;
239 }
240
241 @Override
242 public void close() {
243 final ChannelPromise promise = ctx.newPromise();
244
245 ctx.executor().execute(new Runnable() {
246 @Override
247 public void run() {
248 try {
249 finish(promise);
250 } catch (IOException ex) {
251 promise.setFailure(new IllegalStateException("Failed to finish encoding", ex));
252 }
253 }
254 });
255 }
256
257 public void finish(final ChannelPromise promise) throws IOException {
258 if (!isClosed) {
259
260 allocate(true);
261
262 try {
263 brotliEncoderChannel.close();
264 isClosed = true;
265 } catch (Exception ex) {
266 promise.setFailure(ex);
267
268
269
270 ReferenceCountUtil.release(writableBuffer);
271 return;
272 }
273
274 ctx.writeAndFlush(writableBuffer, promise);
275 }
276 }
277 }
278 }