1
2
3
4
5
6
7
8
9
10
11
12
13
14
15 package io.netty.handler.codec.http2;
16
17 import io.netty.channel.ChannelHandlerContext;
18 import io.netty.util.internal.UnstableApi;
19 import io.netty.util.internal.logging.InternalLogger;
20 import io.netty.util.internal.logging.InternalLoggerFactory;
21
22 import java.util.ArrayDeque;
23 import java.util.Deque;
24
25 import static io.netty.handler.codec.http2.Http2CodecUtil.DEFAULT_WINDOW_SIZE;
26 import static io.netty.handler.codec.http2.Http2CodecUtil.MAX_WEIGHT;
27 import static io.netty.handler.codec.http2.Http2CodecUtil.MIN_WEIGHT;
28 import static io.netty.handler.codec.http2.Http2Error.FLOW_CONTROL_ERROR;
29 import static io.netty.handler.codec.http2.Http2Error.INTERNAL_ERROR;
30 import static io.netty.handler.codec.http2.Http2Error.STREAM_CLOSED;
31 import static io.netty.handler.codec.http2.Http2Exception.streamError;
32 import static io.netty.handler.codec.http2.Http2Stream.State.HALF_CLOSED_LOCAL;
33 import static io.netty.util.internal.ObjectUtil.checkNotNull;
34 import static io.netty.util.internal.ObjectUtil.checkPositiveOrZero;
35 import static java.lang.Math.max;
36 import static java.lang.Math.min;
37
38
39
40
41
42
43
44 @UnstableApi
45 public class DefaultHttp2RemoteFlowController implements Http2RemoteFlowController {
46 private static final InternalLogger logger =
47 InternalLoggerFactory.getInstance(DefaultHttp2RemoteFlowController.class);
48 private static final int MIN_WRITABLE_CHUNK = 32 * 1024;
49 private final Http2Connection connection;
50 private final Http2Connection.PropertyKey stateKey;
51 private final StreamByteDistributor streamByteDistributor;
52 private final FlowState connectionState;
53 private int initialWindowSize = DEFAULT_WINDOW_SIZE;
54 private WritabilityMonitor monitor;
55 private ChannelHandlerContext ctx;
56
57 public DefaultHttp2RemoteFlowController(Http2Connection connection) {
58 this(connection, (Listener) null);
59 }
60
61 public DefaultHttp2RemoteFlowController(Http2Connection connection,
62 StreamByteDistributor streamByteDistributor) {
63 this(connection, streamByteDistributor, null);
64 }
65
66 public DefaultHttp2RemoteFlowController(Http2Connection connection, final Listener listener) {
67 this(connection, new WeightedFairQueueByteDistributor(connection), listener);
68 }
69
70 public DefaultHttp2RemoteFlowController(Http2Connection connection,
71 StreamByteDistributor streamByteDistributor,
72 final Listener listener) {
73 this.connection = checkNotNull(connection, "connection");
74 this.streamByteDistributor = checkNotNull(streamByteDistributor, "streamWriteDistributor");
75
76
77 stateKey = connection.newKey();
78 connectionState = new FlowState(connection.connectionStream());
79 connection.connectionStream().setProperty(stateKey, connectionState);
80
81
82 listener(listener);
83 monitor.windowSize(connectionState, initialWindowSize);
84
85
86 connection.addListener(new Http2ConnectionAdapter() {
87 @Override
88 public void onStreamAdded(Http2Stream stream) {
89
90
91 stream.setProperty(stateKey, new FlowState(stream));
92 }
93
94 @Override
95 public void onStreamActive(Http2Stream stream) {
96
97
98 monitor.windowSize(state(stream), initialWindowSize);
99 }
100
101 @Override
102 public void onStreamClosed(Http2Stream stream) {
103
104
105 state(stream).cancel(STREAM_CLOSED, null);
106 }
107
108 @Override
109 public void onStreamHalfClosed(Http2Stream stream) {
110 if (HALF_CLOSED_LOCAL == stream.state()) {
111
112
113
114
115
116
117
118
119
120
121
122 state(stream).cancel(STREAM_CLOSED, null);
123 }
124 }
125 });
126 }
127
128
129
130
131
132
133 @Override
134 public void channelHandlerContext(ChannelHandlerContext ctx) throws Http2Exception {
135 this.ctx = checkNotNull(ctx, "ctx");
136
137
138
139 channelWritabilityChanged();
140
141
142
143
144
145 if (isChannelWritable()) {
146 writePendingBytes();
147 }
148 }
149
150 @Override
151 public ChannelHandlerContext channelHandlerContext() {
152 return ctx;
153 }
154
155 @Override
156 public void initialWindowSize(int newWindowSize) throws Http2Exception {
157 assert ctx == null || ctx.executor().inEventLoop();
158 monitor.initialWindowSize(newWindowSize);
159 }
160
161 @Override
162 public int initialWindowSize() {
163 return initialWindowSize;
164 }
165
166 @Override
167 public int windowSize(Http2Stream stream) {
168 return state(stream).windowSize();
169 }
170
171 @Override
172 public boolean isWritable(Http2Stream stream) {
173 return monitor.isWritable(state(stream));
174 }
175
176 @Override
177 public void channelWritabilityChanged() throws Http2Exception {
178 monitor.channelWritabilityChange();
179 }
180
181 @Override
182 public void updateDependencyTree(int childStreamId, int parentStreamId, short weight, boolean exclusive) {
183
184 assert weight >= MIN_WEIGHT && weight <= MAX_WEIGHT : "Invalid weight";
185 assert childStreamId != parentStreamId : "A stream cannot depend on itself";
186 assert childStreamId > 0 && parentStreamId >= 0 : "childStreamId must be > 0. parentStreamId must be >= 0.";
187
188 streamByteDistributor.updateDependencyTree(childStreamId, parentStreamId, weight, exclusive);
189 }
190
191 private boolean isChannelWritable() {
192 return ctx != null && isChannelWritable0();
193 }
194
195 private boolean isChannelWritable0() {
196 return ctx.channel().isWritable();
197 }
198
199 @Override
200 public void listener(Listener listener) {
201 monitor = listener == null ? new WritabilityMonitor() : new ListenerWritabilityMonitor(listener);
202 }
203
204 @Override
205 public void incrementWindowSize(Http2Stream stream, int delta) throws Http2Exception {
206 assert ctx == null || ctx.executor().inEventLoop();
207 monitor.incrementWindowSize(state(stream), delta);
208 }
209
210 @Override
211 public void addFlowControlled(Http2Stream stream, FlowControlled frame) {
212
213 assert ctx == null || ctx.executor().inEventLoop();
214 checkNotNull(frame, "frame");
215 try {
216 monitor.enqueueFrame(state(stream), frame);
217 } catch (Throwable t) {
218 frame.error(ctx, t);
219 }
220 }
221
222 @Override
223 public boolean hasFlowControlled(Http2Stream stream) {
224 return state(stream).hasFrame();
225 }
226
227 private FlowState state(Http2Stream stream) {
228 return (FlowState) stream.getProperty(stateKey);
229 }
230
231
232
233
234 private int connectionWindowSize() {
235 return connectionState.windowSize();
236 }
237
238 private int minUsableChannelBytes() {
239
240
241
242
243
244
245 return max(ctx.channel().config().getWriteBufferLowWaterMark(), MIN_WRITABLE_CHUNK);
246 }
247
248 private int maxUsableChannelBytes() {
249
250 int channelWritableBytes = (int) min(Integer.MAX_VALUE, ctx.channel().bytesBeforeUnwritable());
251 int usableBytes = channelWritableBytes > 0 ? max(channelWritableBytes, minUsableChannelBytes()) : 0;
252
253
254 return min(connectionState.windowSize(), usableBytes);
255 }
256
257
258
259
260
261 private int writableBytes() {
262 return min(connectionWindowSize(), maxUsableChannelBytes());
263 }
264
265 @Override
266 public void writePendingBytes() throws Http2Exception {
267 monitor.writePendingBytes();
268 }
269
270
271
272
273 private final class FlowState implements StreamByteDistributor.StreamState {
274 private final Http2Stream stream;
275 private final Deque<FlowControlled> pendingWriteQueue;
276 private int window;
277 private long pendingBytes;
278 private boolean markedWritable;
279
280
281
282
283 private boolean writing;
284
285
286
287 private boolean cancelled;
288
289 FlowState(Http2Stream stream) {
290 this.stream = stream;
291 pendingWriteQueue = new ArrayDeque<FlowControlled>(2);
292 }
293
294
295
296
297
298 boolean isWritable() {
299 return windowSize() > pendingBytes() && !cancelled;
300 }
301
302
303
304
305 @Override
306 public Http2Stream stream() {
307 return stream;
308 }
309
310
311
312
313 boolean markedWritability() {
314 return markedWritable;
315 }
316
317
318
319
320 void markedWritability(boolean isWritable) {
321 this.markedWritable = isWritable;
322 }
323
324 @Override
325 public int windowSize() {
326 return window;
327 }
328
329
330
331
332 void windowSize(int initialWindowSize) {
333 window = initialWindowSize;
334 }
335
336
337
338
339
340 int writeAllocatedBytes(int allocated) {
341 final int initialAllocated = allocated;
342 int writtenBytes;
343
344 Throwable cause = null;
345 FlowControlled frame;
346 try {
347 assert !writing;
348 writing = true;
349
350
351 boolean writeOccurred = false;
352 while (!cancelled && (frame = peek()) != null) {
353 int maxBytes = min(allocated, writableWindow());
354 if (maxBytes <= 0 && frame.size() > 0) {
355
356
357 break;
358 }
359 writeOccurred = true;
360 int initialFrameSize = frame.size();
361 try {
362 frame.write(ctx, max(0, maxBytes));
363 if (frame.size() == 0) {
364
365
366
367 pendingWriteQueue.remove();
368 frame.writeComplete();
369 }
370 } finally {
371
372 allocated -= initialFrameSize - frame.size();
373 }
374 }
375
376 if (!writeOccurred) {
377
378 return -1;
379 }
380
381 } catch (Throwable t) {
382
383 cancelled = true;
384 cause = t;
385 } finally {
386 writing = false;
387
388
389 writtenBytes = initialAllocated - allocated;
390
391 decrementPendingBytes(writtenBytes, false);
392 decrementFlowControlWindow(writtenBytes);
393
394
395
396 if (cancelled) {
397 cancel(INTERNAL_ERROR, cause);
398 }
399 }
400 return writtenBytes;
401 }
402
403
404
405
406 int incrementStreamWindow(int delta) throws Http2Exception {
407 if (delta > 0 && Integer.MAX_VALUE - delta < window) {
408 throw streamError(stream.id(), FLOW_CONTROL_ERROR,
409 "Window size overflow for stream: %d", stream.id());
410 }
411 window += delta;
412
413 streamByteDistributor.updateStreamableBytes(this);
414 return window;
415 }
416
417
418
419
420 private int writableWindow() {
421 return min(window, connectionWindowSize());
422 }
423
424 @Override
425 public long pendingBytes() {
426 return pendingBytes;
427 }
428
429
430
431
432 void enqueueFrame(FlowControlled frame) {
433 FlowControlled last = pendingWriteQueue.peekLast();
434 if (last == null) {
435 enqueueFrameWithoutMerge(frame);
436 return;
437 }
438
439 int lastSize = last.size();
440 if (last.merge(ctx, frame)) {
441 incrementPendingBytes(last.size() - lastSize, true);
442 return;
443 }
444 enqueueFrameWithoutMerge(frame);
445 }
446
447 private void enqueueFrameWithoutMerge(FlowControlled frame) {
448 pendingWriteQueue.offer(frame);
449
450
451 incrementPendingBytes(frame.size(), true);
452 }
453
454 @Override
455 public boolean hasFrame() {
456 return !pendingWriteQueue.isEmpty();
457 }
458
459
460
461
462 private FlowControlled peek() {
463 return pendingWriteQueue.peek();
464 }
465
466
467
468
469
470
471 void cancel(Http2Error error, Throwable cause) {
472 cancelled = true;
473
474 if (writing) {
475 return;
476 }
477
478 FlowControlled frame = pendingWriteQueue.poll();
479 if (frame != null) {
480
481 final Http2Exception exception = streamError(stream.id(), error, cause,
482 "Stream closed before write could take place");
483 do {
484 writeError(frame, exception);
485 frame = pendingWriteQueue.poll();
486 } while (frame != null);
487 }
488
489 streamByteDistributor.updateStreamableBytes(this);
490
491 monitor.stateCancelled(this);
492 }
493
494
495
496
497
498 private void incrementPendingBytes(int numBytes, boolean updateStreamableBytes) {
499 pendingBytes += numBytes;
500 monitor.incrementPendingBytes(numBytes);
501 if (updateStreamableBytes) {
502 streamByteDistributor.updateStreamableBytes(this);
503 }
504 }
505
506
507
508
509 private void decrementPendingBytes(int bytes, boolean updateStreamableBytes) {
510 incrementPendingBytes(-bytes, updateStreamableBytes);
511 }
512
513
514
515
516 private void decrementFlowControlWindow(int bytes) {
517 try {
518 int negativeBytes = -bytes;
519 connectionState.incrementStreamWindow(negativeBytes);
520 incrementStreamWindow(negativeBytes);
521 } catch (Http2Exception e) {
522
523 throw new IllegalStateException("Invalid window state when writing frame: " + e.getMessage(), e);
524 }
525 }
526
527
528
529
530
531 private void writeError(FlowControlled frame, Http2Exception cause) {
532 assert ctx != null;
533 decrementPendingBytes(frame.size(), true);
534 frame.error(ctx, cause);
535 }
536 }
537
538
539
540
541 private class WritabilityMonitor implements StreamByteDistributor.Writer {
542 private boolean inWritePendingBytes;
543 private long totalPendingBytes;
544
545 @Override
546 public final void write(Http2Stream stream, int numBytes) {
547 state(stream).writeAllocatedBytes(numBytes);
548 }
549
550
551
552
553
554 void channelWritabilityChange() throws Http2Exception { }
555
556
557
558
559
560 void stateCancelled(FlowState state) { }
561
562
563
564
565
566
567 void windowSize(FlowState state, int initialWindowSize) {
568 state.windowSize(initialWindowSize);
569 }
570
571
572
573
574
575
576
577 void incrementWindowSize(FlowState state, int delta) throws Http2Exception {
578 state.incrementStreamWindow(delta);
579 }
580
581
582
583
584
585
586
587 void enqueueFrame(FlowState state, FlowControlled frame) throws Http2Exception {
588 state.enqueueFrame(frame);
589 }
590
591
592
593
594
595
596 final void incrementPendingBytes(int delta) {
597 totalPendingBytes += delta;
598
599
600
601 }
602
603
604
605
606
607
608 final boolean isWritable(FlowState state) {
609 return isWritableConnection() && state.isWritable();
610 }
611
612 final void writePendingBytes() throws Http2Exception {
613
614
615
616
617
618 if (inWritePendingBytes) {
619 return;
620 }
621 inWritePendingBytes = true;
622 try {
623 int bytesToWrite = writableBytes();
624
625
626 for (;;) {
627 if (!streamByteDistributor.distribute(bytesToWrite, this) ||
628 (bytesToWrite = writableBytes()) <= 0 ||
629 !isChannelWritable0()) {
630 break;
631 }
632 }
633 } finally {
634 inWritePendingBytes = false;
635 }
636 }
637
638 void initialWindowSize(int newWindowSize) throws Http2Exception {
639 checkPositiveOrZero(newWindowSize, "newWindowSize");
640
641 final int delta = newWindowSize - initialWindowSize;
642 initialWindowSize = newWindowSize;
643 connection.forEachActiveStream(new Http2StreamVisitor() {
644 @Override
645 public boolean visit(Http2Stream stream) throws Http2Exception {
646 state(stream).incrementStreamWindow(delta);
647 return true;
648 }
649 });
650
651 if (delta > 0 && isChannelWritable()) {
652
653 writePendingBytes();
654 }
655 }
656
657 final boolean isWritableConnection() {
658 return connectionState.windowSize() - totalPendingBytes > 0 && isChannelWritable();
659 }
660 }
661
662
663
664
665
666
667
668
669
670 private final class ListenerWritabilityMonitor extends WritabilityMonitor implements Http2StreamVisitor {
671 private final Listener listener;
672
673 ListenerWritabilityMonitor(Listener listener) {
674 this.listener = listener;
675 }
676
677 @Override
678 public boolean visit(Http2Stream stream) throws Http2Exception {
679 FlowState state = state(stream);
680 if (isWritable(state) != state.markedWritability()) {
681 notifyWritabilityChanged(state);
682 }
683 return true;
684 }
685
686 @Override
687 void windowSize(FlowState state, int initialWindowSize) {
688 super.windowSize(state, initialWindowSize);
689 try {
690 checkStateWritability(state);
691 } catch (Http2Exception e) {
692 throw new RuntimeException("Caught unexpected exception from window", e);
693 }
694 }
695
696 @Override
697 void incrementWindowSize(FlowState state, int delta) throws Http2Exception {
698 super.incrementWindowSize(state, delta);
699 checkStateWritability(state);
700 }
701
702 @Override
703 void initialWindowSize(int newWindowSize) throws Http2Exception {
704 super.initialWindowSize(newWindowSize);
705 if (isWritableConnection()) {
706
707
708 checkAllWritabilityChanged();
709 }
710 }
711
712 @Override
713 void enqueueFrame(FlowState state, FlowControlled frame) throws Http2Exception {
714 super.enqueueFrame(state, frame);
715 checkConnectionThenStreamWritabilityChanged(state);
716 }
717
718 @Override
719 void stateCancelled(FlowState state) {
720 try {
721 checkConnectionThenStreamWritabilityChanged(state);
722 } catch (Http2Exception e) {
723 throw new RuntimeException("Caught unexpected exception from checkAllWritabilityChanged", e);
724 }
725 }
726
727 @Override
728 void channelWritabilityChange() throws Http2Exception {
729 if (connectionState.markedWritability() != isChannelWritable()) {
730 checkAllWritabilityChanged();
731 }
732 }
733
734 private void checkStateWritability(FlowState state) throws Http2Exception {
735 if (isWritable(state) != state.markedWritability()) {
736 if (state == connectionState) {
737 checkAllWritabilityChanged();
738 } else {
739 notifyWritabilityChanged(state);
740 }
741 }
742 }
743
744 private void notifyWritabilityChanged(FlowState state) {
745 state.markedWritability(!state.markedWritability());
746 try {
747 listener.writabilityChanged(state.stream);
748 } catch (Throwable cause) {
749 logger.error("Caught Throwable from listener.writabilityChanged", cause);
750 }
751 }
752
753 private void checkConnectionThenStreamWritabilityChanged(FlowState state) throws Http2Exception {
754
755 if (isWritableConnection() != connectionState.markedWritability()) {
756 checkAllWritabilityChanged();
757 } else if (isWritable(state) != state.markedWritability()) {
758 notifyWritabilityChanged(state);
759 }
760 }
761
762 private void checkAllWritabilityChanged() throws Http2Exception {
763
764 connectionState.markedWritability(isWritableConnection());
765 connection.forEachActiveStream(this);
766 }
767 }
768 }