1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16 package io.netty.handler.stream;
17
18 import static io.netty.util.internal.ObjectUtil.checkPositive;
19
20 import io.netty.buffer.ByteBufAllocator;
21 import io.netty.buffer.Unpooled;
22 import io.netty.channel.Channel;
23 import io.netty.channel.ChannelDuplexHandler;
24 import io.netty.channel.ChannelFuture;
25 import io.netty.channel.ChannelFutureListener;
26 import io.netty.channel.ChannelHandler;
27 import io.netty.channel.ChannelHandlerContext;
28 import io.netty.channel.ChannelPipeline;
29 import io.netty.channel.ChannelProgressivePromise;
30 import io.netty.channel.ChannelPromise;
31 import io.netty.util.ReferenceCountUtil;
32 import io.netty.util.internal.logging.InternalLogger;
33 import io.netty.util.internal.logging.InternalLoggerFactory;
34
35 import java.nio.channels.ClosedChannelException;
36 import java.util.ArrayDeque;
37 import java.util.Queue;
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70 public class ChunkedWriteHandler extends ChannelDuplexHandler {
71
72 private static final InternalLogger logger =
73 InternalLoggerFactory.getInstance(ChunkedWriteHandler.class);
74
75 private Queue<PendingWrite> queue;
76 private volatile ChannelHandlerContext ctx;
77
/**
 * Creates a new handler. No state is set up here; the pending-write queue is created on demand.
 */
public ChunkedWriteHandler() {
    super();
}
80
81
82
83
/**
 * Creates a new handler. The argument is only validated to be positive; it is not stored and has
 * no effect on how the handler behaves.
 *
 * @param maxPendingWrites must be greater than zero, otherwise {@code checkPositive} rejects it
 * @deprecated use {@link #ChunkedWriteHandler()}
 */
@Deprecated
public ChunkedWriteHandler(int maxPendingWrites) {
    // Kept for backward compatibility: validate, then ignore the value.
    checkPositive(maxPendingWrites, "maxPendingWrites");
}
88
89 private void allocateQueue() {
90 if (queue == null) {
91 queue = new ArrayDeque<PendingWrite>();
92 }
93 }
94
95 private boolean queueIsEmpty() {
96 return queue == null || queue.isEmpty();
97 }
98
99 @Override
100 public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
101 this.ctx = ctx;
102 }
103
104
105
106
107 public void resumeTransfer() {
108 final ChannelHandlerContext ctx = this.ctx;
109 if (ctx == null) {
110 return;
111 }
112 if (ctx.executor().inEventLoop()) {
113 resumeTransfer0(ctx);
114 } else {
115
116 ctx.executor().execute(new Runnable() {
117
118 @Override
119 public void run() {
120 resumeTransfer0(ctx);
121 }
122 });
123 }
124 }
125
126 private void resumeTransfer0(ChannelHandlerContext ctx) {
127 try {
128 doFlush(ctx);
129 } catch (Exception e) {
130 logger.warn("Unexpected exception while sending chunks.", e);
131 }
132 }
133
134 @Override
135 public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
136 if (!queueIsEmpty() || msg instanceof ChunkedInput) {
137 allocateQueue();
138 queue.add(new PendingWrite(msg, promise));
139 } else {
140 ctx.write(msg, promise);
141 }
142 }
143
144 @Override
145 public void flush(ChannelHandlerContext ctx) throws Exception {
146 doFlush(ctx);
147 }
148
149 @Override
150 public void channelInactive(ChannelHandlerContext ctx) throws Exception {
151 doFlush(ctx);
152 ctx.fireChannelInactive();
153 }
154
155 @Override
156 public void channelWritabilityChanged(ChannelHandlerContext ctx) throws Exception {
157 if (ctx.channel().isWritable()) {
158
159 doFlush(ctx);
160 }
161 ctx.fireChannelWritabilityChanged();
162 }
163
164 private void discard(Throwable cause) {
165 if (queueIsEmpty()) {
166 return;
167 }
168 for (;;) {
169 PendingWrite currentWrite = queue.poll();
170
171 if (currentWrite == null) {
172 break;
173 }
174 Object message = currentWrite.msg;
175 if (message instanceof ChunkedInput) {
176 ChunkedInput<?> in = (ChunkedInput<?>) message;
177 boolean endOfInput;
178 long inputLength;
179 try {
180 endOfInput = in.isEndOfInput();
181 inputLength = in.length();
182 closeInput(in);
183 } catch (Exception e) {
184 closeInput(in);
185 currentWrite.fail(e);
186 logger.warn("ChunkedInput failed", e);
187 continue;
188 }
189
190 if (!endOfInput) {
191 if (cause == null) {
192 cause = new ClosedChannelException();
193 }
194 currentWrite.fail(cause);
195 } else {
196 currentWrite.success(inputLength);
197 }
198 } else {
199 if (cause == null) {
200 cause = new ClosedChannelException();
201 }
202 currentWrite.fail(cause);
203 }
204 }
205 }
206
207 private void doFlush(final ChannelHandlerContext ctx) {
208 final Channel channel = ctx.channel();
209 if (!channel.isActive()) {
210 discard(null);
211 return;
212 }
213
214 if (queueIsEmpty()) {
215 ctx.flush();
216 return;
217 }
218
219 boolean requiresFlush = true;
220 ByteBufAllocator allocator = ctx.alloc();
221 while (channel.isWritable()) {
222 final PendingWrite currentWrite = queue.peek();
223
224 if (currentWrite == null) {
225 break;
226 }
227
228 if (currentWrite.promise.isDone()) {
229
230
231
232
233
234
235
236
237
238 queue.remove();
239 continue;
240 }
241
242 final Object pendingMessage = currentWrite.msg;
243
244 if (pendingMessage instanceof ChunkedInput) {
245 final ChunkedInput<?> chunks = (ChunkedInput<?>) pendingMessage;
246 boolean endOfInput;
247 boolean suspend;
248 Object message = null;
249 try {
250 message = chunks.readChunk(allocator);
251 endOfInput = chunks.isEndOfInput();
252
253 suspend = message == null && !endOfInput;
254
255 } catch (final Throwable t) {
256 queue.remove();
257
258 if (message != null) {
259 ReferenceCountUtil.release(message);
260 }
261
262 closeInput(chunks);
263 currentWrite.fail(t);
264 break;
265 }
266
267 if (suspend) {
268
269
270
271 break;
272 }
273
274 if (message == null) {
275
276
277 message = Unpooled.EMPTY_BUFFER;
278 }
279
280 if (endOfInput) {
281
282
283 queue.remove();
284 }
285
286 ChannelFuture f = ctx.writeAndFlush(message);
287 if (endOfInput) {
288 if (f.isDone()) {
289 handleEndOfInputFuture(f, chunks, currentWrite);
290 } else {
291
292
293
294
295
296 f.addListener(new ChannelFutureListener() {
297 @Override
298 public void operationComplete(ChannelFuture future) {
299 handleEndOfInputFuture(future, chunks, currentWrite);
300 }
301 });
302 }
303 } else {
304 final boolean resume = !channel.isWritable();
305 if (f.isDone()) {
306 handleFuture(f, chunks, currentWrite, resume);
307 } else {
308 f.addListener(new ChannelFutureListener() {
309 @Override
310 public void operationComplete(ChannelFuture future) {
311 handleFuture(future, chunks, currentWrite, resume);
312 }
313 });
314 }
315 }
316 requiresFlush = false;
317 } else {
318 queue.remove();
319 ctx.write(pendingMessage, currentWrite.promise);
320 requiresFlush = true;
321 }
322
323 if (!channel.isActive()) {
324 discard(new ClosedChannelException());
325 break;
326 }
327 }
328
329 if (requiresFlush) {
330 ctx.flush();
331 }
332 }
333
334 private static void handleEndOfInputFuture(ChannelFuture future, ChunkedInput<?> input, PendingWrite currentWrite) {
335 if (!future.isSuccess()) {
336 closeInput(input);
337 currentWrite.fail(future.cause());
338 } else {
339
340 long inputProgress = input.progress();
341 long inputLength = input.length();
342 closeInput(input);
343 currentWrite.progress(inputProgress, inputLength);
344 currentWrite.success(inputLength);
345 }
346 }
347
348 private void handleFuture(ChannelFuture future, ChunkedInput<?> input, PendingWrite currentWrite, boolean resume) {
349 if (!future.isSuccess()) {
350 closeInput(input);
351 currentWrite.fail(future.cause());
352 } else {
353 currentWrite.progress(input.progress(), input.length());
354 if (resume && future.channel().isWritable()) {
355 resumeTransfer();
356 }
357 }
358 }
359
360 private static void closeInput(ChunkedInput<?> chunks) {
361 try {
362 chunks.close();
363 } catch (Throwable t) {
364 logger.warn("Failed to close a ChunkedInput.", t);
365 }
366 }
367
368 private static final class PendingWrite {
369 final Object msg;
370 final ChannelPromise promise;
371
372 PendingWrite(Object msg, ChannelPromise promise) {
373 this.msg = msg;
374 this.promise = promise;
375 }
376
377 void fail(Throwable cause) {
378 ReferenceCountUtil.release(msg);
379 promise.tryFailure(cause);
380 }
381
382 void success(long total) {
383 if (promise.isDone()) {
384
385 return;
386 }
387 progress(total, total);
388 promise.trySuccess();
389 }
390
391 void progress(long progress, long total) {
392 if (promise instanceof ChannelProgressivePromise) {
393 ((ChannelProgressivePromise) promise).tryProgress(progress, total);
394 }
395 }
396 }
397 }