1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16 package io.netty.handler.codec.http2;
17
18 import io.netty.buffer.ByteBufAllocator;
19 import io.netty.channel.Channel;
20 import io.netty.channel.ChannelConfig;
21 import io.netty.channel.ChannelFuture;
22 import io.netty.channel.ChannelFutureListener;
23 import io.netty.channel.ChannelHandler;
24 import io.netty.channel.ChannelHandlerContext;
25 import io.netty.channel.ChannelId;
26 import io.netty.channel.ChannelMetadata;
27 import io.netty.channel.ChannelOutboundBuffer;
28 import io.netty.channel.ChannelPipeline;
29 import io.netty.channel.ChannelProgressivePromise;
30 import io.netty.channel.ChannelPromise;
31 import io.netty.channel.DefaultChannelConfig;
32 import io.netty.channel.DefaultChannelPipeline;
33 import io.netty.channel.EventLoop;
34 import io.netty.channel.MessageSizeEstimator;
35 import io.netty.channel.RecvByteBufAllocator;
36 import io.netty.channel.VoidChannelPromise;
37 import io.netty.channel.WriteBufferWaterMark;
38 import io.netty.channel.socket.ChannelInputShutdownReadComplete;
39 import io.netty.channel.socket.ChannelOutputShutdownEvent;
40 import io.netty.handler.codec.http2.Http2FrameCodec.DefaultHttp2FrameStream;
41 import io.netty.handler.ssl.SslCloseCompletionEvent;
42 import io.netty.util.DefaultAttributeMap;
43 import io.netty.util.ReferenceCountUtil;
44 import io.netty.util.internal.StringUtil;
45 import io.netty.util.internal.logging.InternalLogger;
46 import io.netty.util.internal.logging.InternalLoggerFactory;
47
48 import java.io.IOException;
49 import java.net.SocketAddress;
50 import java.nio.channels.ClosedChannelException;
51 import java.util.ArrayDeque;
52 import java.util.Queue;
53 import java.util.concurrent.RejectedExecutionException;
54 import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
55 import java.util.concurrent.atomic.AtomicLongFieldUpdater;
56
57 import static io.netty.handler.codec.http2.Http2CodecUtil.isStreamIdValid;
58 import static io.netty.util.internal.ObjectUtil.checkNotNull;
59 import static java.lang.Math.min;
60
61 abstract class AbstractHttp2StreamChannel extends DefaultAttributeMap implements Http2StreamChannel {
62
63 static final Http2FrameStreamVisitor WRITABLE_VISITOR = new Http2FrameStreamVisitor() {
64 @Override
65 public boolean visit(Http2FrameStream stream) {
66 final AbstractHttp2StreamChannel childChannel = (AbstractHttp2StreamChannel)
67 ((DefaultHttp2FrameStream) stream).attachment;
68 childChannel.trySetWritable();
69 return true;
70 }
71 };
72
73 static final Http2FrameStreamVisitor CHANNEL_INPUT_SHUTDOWN_READ_COMPLETE_VISITOR =
74 new UserEventStreamVisitor(ChannelInputShutdownReadComplete.INSTANCE);
75
76 static final Http2FrameStreamVisitor CHANNEL_OUTPUT_SHUTDOWN_EVENT_VISITOR =
77 new UserEventStreamVisitor(ChannelOutputShutdownEvent.INSTANCE);
78
79 static final Http2FrameStreamVisitor SSL_CLOSE_COMPLETION_EVENT_VISITOR =
80 new UserEventStreamVisitor(SslCloseCompletionEvent.SUCCESS);
81
82 private static final InternalLogger logger = InternalLoggerFactory.getInstance(AbstractHttp2StreamChannel.class);
83
84 private static final ChannelMetadata METADATA = new ChannelMetadata(false, 16);
85
86
87
88
89
90 private static final int MIN_HTTP2_FRAME_SIZE = 9;
91
92
93
94
95 private static final class UserEventStreamVisitor implements Http2FrameStreamVisitor {
96
97 private final Object event;
98
99 UserEventStreamVisitor(Object event) {
100 this.event = checkNotNull(event, "event");
101 }
102
103 @Override
104 public boolean visit(Http2FrameStream stream) {
105 final AbstractHttp2StreamChannel childChannel = (AbstractHttp2StreamChannel)
106 ((DefaultHttp2FrameStream) stream).attachment;
107 childChannel.pipeline().fireUserEventTriggered(event);
108 return true;
109 }
110 }
111
112
113
114
115 private static final class FlowControlledFrameSizeEstimator implements MessageSizeEstimator {
116
117 static final FlowControlledFrameSizeEstimator INSTANCE = new FlowControlledFrameSizeEstimator();
118
119 private static final Handle HANDLE_INSTANCE = new Handle() {
120 @Override
121 public int size(Object msg) {
122 return msg instanceof Http2DataFrame ?
123
124 (int) min(Integer.MAX_VALUE, ((Http2DataFrame) msg).initialFlowControlledBytes() +
125 (long) MIN_HTTP2_FRAME_SIZE) : MIN_HTTP2_FRAME_SIZE;
126 }
127 };
128
129 @Override
130 public Handle newHandle() {
131 return HANDLE_INSTANCE;
132 }
133 }
134
135 private static final AtomicLongFieldUpdater<AbstractHttp2StreamChannel> TOTAL_PENDING_SIZE_UPDATER =
136 AtomicLongFieldUpdater.newUpdater(AbstractHttp2StreamChannel.class, "totalPendingSize");
137
138 private static final AtomicIntegerFieldUpdater<AbstractHttp2StreamChannel> UNWRITABLE_UPDATER =
139 AtomicIntegerFieldUpdater.newUpdater(AbstractHttp2StreamChannel.class, "unwritable");
140
141 private static void windowUpdateFrameWriteComplete(ChannelFuture future, Channel streamChannel) {
142 Throwable cause = future.cause();
143 if (cause != null) {
144 Throwable unwrappedCause;
145
146 if (cause instanceof Http2FrameStreamException && (unwrappedCause = cause.getCause()) != null) {
147 cause = unwrappedCause;
148 }
149
150
151 streamChannel.pipeline().fireExceptionCaught(cause);
152 streamChannel.unsafe().close(streamChannel.unsafe().voidPromise());
153 }
154 }
155
156 private final ChannelFutureListener windowUpdateFrameWriteListener = new ChannelFutureListener() {
157 @Override
158 public void operationComplete(ChannelFuture future) {
159 windowUpdateFrameWriteComplete(future, AbstractHttp2StreamChannel.this);
160 }
161 };
162
163
164
165
166 private enum ReadStatus {
167
168
169
170 IDLE,
171
172
173
174
175 IN_PROGRESS,
176
177
178
179
180 REQUESTED
181 }
182
183 private final Http2StreamChannelConfig config = new Http2StreamChannelConfig(this);
184 private final Http2ChannelUnsafe unsafe = new Http2ChannelUnsafe();
185 private final ChannelId channelId;
186 private final ChannelPipeline pipeline;
187 private final DefaultHttp2FrameStream stream;
188 private final ChannelPromise closePromise;
189
190 private volatile boolean registered;
191
192 private volatile long totalPendingSize;
193 private volatile int unwritable;
194
195
196 private Runnable fireChannelWritabilityChangedTask;
197
198 private boolean outboundClosed;
199 private int flowControlledBytes;
200
201
202
203
204
205
206
207 private ReadStatus readStatus = ReadStatus.IDLE;
208
209 private Queue<Object> inboundBuffer;
210
211
212 private boolean firstFrameWritten;
213 private boolean readCompletePending;
214
215 AbstractHttp2StreamChannel(DefaultHttp2FrameStream stream, int id, ChannelHandler inboundHandler) {
216 this.stream = stream;
217 stream.attachment = this;
218 pipeline = new DefaultChannelPipeline(this) {
219 @Override
220 protected void incrementPendingOutboundBytes(long size) {
221 AbstractHttp2StreamChannel.this.incrementPendingOutboundBytes(size, true);
222 }
223
224 @Override
225 protected void decrementPendingOutboundBytes(long size) {
226 AbstractHttp2StreamChannel.this.decrementPendingOutboundBytes(size, true);
227 }
228
229 @Override
230 protected void onUnhandledInboundException(Throwable cause) {
231
232 if (cause instanceof Http2FrameStreamException) {
233 closeWithError(((Http2FrameStreamException) cause).error());
234 return;
235 } else {
236 Http2Exception exception = Http2CodecUtil.getEmbeddedHttp2Exception(cause);
237 if (exception != null) {
238 closeWithError(exception.error());
239 return;
240 }
241 }
242 super.onUnhandledInboundException(cause);
243 }
244 };
245
246 closePromise = pipeline.newPromise();
247 channelId = new Http2StreamChannelId(parent().id(), id);
248
249 if (inboundHandler != null) {
250
251 pipeline.addLast(inboundHandler);
252 }
253 }
254
255 private void incrementPendingOutboundBytes(long size, boolean invokeLater) {
256 if (size == 0) {
257 return;
258 }
259
260 long newWriteBufferSize = TOTAL_PENDING_SIZE_UPDATER.addAndGet(this, size);
261 if (newWriteBufferSize > config().getWriteBufferHighWaterMark()) {
262 setUnwritable(invokeLater);
263 }
264 }
265
266 private void decrementPendingOutboundBytes(long size, boolean invokeLater) {
267 if (size == 0) {
268 return;
269 }
270
271 long newWriteBufferSize = TOTAL_PENDING_SIZE_UPDATER.addAndGet(this, -size);
272
273
274
275
276
277 if (newWriteBufferSize < config().getWriteBufferLowWaterMark() && parent().isWritable()) {
278 setWritable(invokeLater);
279 }
280 }
281
282 final void trySetWritable() {
283
284
285
286
287 if (totalPendingSize < config().getWriteBufferLowWaterMark()) {
288 setWritable(false);
289 }
290 }
291
292 private void setWritable(boolean invokeLater) {
293 for (;;) {
294 final int oldValue = unwritable;
295 final int newValue = oldValue & ~1;
296 if (UNWRITABLE_UPDATER.compareAndSet(this, oldValue, newValue)) {
297 if (oldValue != 0 && newValue == 0) {
298 fireChannelWritabilityChanged(invokeLater);
299 }
300 break;
301 }
302 }
303 }
304
305 private void setUnwritable(boolean invokeLater) {
306 for (;;) {
307 final int oldValue = unwritable;
308 final int newValue = oldValue | 1;
309 if (UNWRITABLE_UPDATER.compareAndSet(this, oldValue, newValue)) {
310 if (oldValue == 0) {
311 fireChannelWritabilityChanged(invokeLater);
312 }
313 break;
314 }
315 }
316 }
317
318 private void fireChannelWritabilityChanged(boolean invokeLater) {
319 final ChannelPipeline pipeline = pipeline();
320 if (invokeLater) {
321 Runnable task = fireChannelWritabilityChangedTask;
322 if (task == null) {
323 fireChannelWritabilityChangedTask = task = new Runnable() {
324 @Override
325 public void run() {
326 pipeline.fireChannelWritabilityChanged();
327 }
328 };
329 }
330 eventLoop().execute(task);
331 } else {
332 pipeline.fireChannelWritabilityChanged();
333 }
334 }
335 @Override
336 public Http2FrameStream stream() {
337 return stream;
338 }
339
340 void closeOutbound() {
341 outboundClosed = true;
342 }
343
344 void streamClosed() {
345 unsafe.readEOS();
346
347
348 unsafe.doBeginRead();
349 }
350
351 @Override
352 public ChannelMetadata metadata() {
353 return METADATA;
354 }
355
356 @Override
357 public ChannelConfig config() {
358 return config;
359 }
360
361 @Override
362 public boolean isOpen() {
363 return !closePromise.isDone();
364 }
365
366 @Override
367 public boolean isActive() {
368 return isOpen();
369 }
370
371 @Override
372 public boolean isWritable() {
373 return unwritable == 0;
374 }
375
376 @Override
377 public ChannelId id() {
378 return channelId;
379 }
380
381 @Override
382 public EventLoop eventLoop() {
383 return parent().eventLoop();
384 }
385
386 @Override
387 public Channel parent() {
388 return parentContext().channel();
389 }
390
391 @Override
392 public boolean isRegistered() {
393 return registered;
394 }
395
396 @Override
397 public SocketAddress localAddress() {
398 return parent().localAddress();
399 }
400
401 @Override
402 public SocketAddress remoteAddress() {
403 return parent().remoteAddress();
404 }
405
406 @Override
407 public ChannelFuture closeFuture() {
408 return closePromise;
409 }
410
411 @Override
412 public long bytesBeforeUnwritable() {
413
414 long bytes = config().getWriteBufferHighWaterMark() - totalPendingSize + 1;
415
416
417
418 return bytes > 0 && isWritable() ? bytes : 0;
419 }
420
421 @Override
422 public long bytesBeforeWritable() {
423
424 long bytes = totalPendingSize - config().getWriteBufferLowWaterMark() + 1;
425
426
427
428 return bytes <= 0 || isWritable() ? 0 : bytes;
429 }
430
431 @Override
432 public Unsafe unsafe() {
433 return unsafe;
434 }
435
436 @Override
437 public ChannelPipeline pipeline() {
438 return pipeline;
439 }
440
441 @Override
442 public ByteBufAllocator alloc() {
443 return config().getAllocator();
444 }
445
446 @Override
447 public Channel read() {
448 pipeline().read();
449 return this;
450 }
451
452 @Override
453 public Channel flush() {
454 pipeline().flush();
455 return this;
456 }
457
458 @Override
459 public ChannelFuture bind(SocketAddress localAddress) {
460 return pipeline().bind(localAddress);
461 }
462
463 @Override
464 public ChannelFuture connect(SocketAddress remoteAddress) {
465 return pipeline().connect(remoteAddress);
466 }
467
468 @Override
469 public ChannelFuture connect(SocketAddress remoteAddress, SocketAddress localAddress) {
470 return pipeline().connect(remoteAddress, localAddress);
471 }
472
473 @Override
474 public ChannelFuture disconnect() {
475 return pipeline().disconnect();
476 }
477
478 @Override
479 public ChannelFuture close() {
480 return pipeline().close();
481 }
482
483 @Override
484 public ChannelFuture deregister() {
485 return pipeline().deregister();
486 }
487
488 @Override
489 public ChannelFuture bind(SocketAddress localAddress, ChannelPromise promise) {
490 return pipeline().bind(localAddress, promise);
491 }
492
493 @Override
494 public ChannelFuture connect(SocketAddress remoteAddress, ChannelPromise promise) {
495 return pipeline().connect(remoteAddress, promise);
496 }
497
498 @Override
499 public ChannelFuture connect(SocketAddress remoteAddress, SocketAddress localAddress, ChannelPromise promise) {
500 return pipeline().connect(remoteAddress, localAddress, promise);
501 }
502
503 @Override
504 public ChannelFuture disconnect(ChannelPromise promise) {
505 return pipeline().disconnect(promise);
506 }
507
508 @Override
509 public ChannelFuture close(ChannelPromise promise) {
510 return pipeline().close(promise);
511 }
512
513 @Override
514 public ChannelFuture deregister(ChannelPromise promise) {
515 return pipeline().deregister(promise);
516 }
517
518 @Override
519 public ChannelFuture write(Object msg) {
520 return pipeline().write(msg);
521 }
522
523 @Override
524 public ChannelFuture write(Object msg, ChannelPromise promise) {
525 return pipeline().write(msg, promise);
526 }
527
528 @Override
529 public ChannelFuture writeAndFlush(Object msg, ChannelPromise promise) {
530 return pipeline().writeAndFlush(msg, promise);
531 }
532
533 @Override
534 public ChannelFuture writeAndFlush(Object msg) {
535 return pipeline().writeAndFlush(msg);
536 }
537
538 @Override
539 public ChannelPromise newPromise() {
540 return pipeline().newPromise();
541 }
542
543 @Override
544 public ChannelProgressivePromise newProgressivePromise() {
545 return pipeline().newProgressivePromise();
546 }
547
548 @Override
549 public ChannelFuture newSucceededFuture() {
550 return pipeline().newSucceededFuture();
551 }
552
553 @Override
554 public ChannelFuture newFailedFuture(Throwable cause) {
555 return pipeline().newFailedFuture(cause);
556 }
557
558 @Override
559 public ChannelPromise voidPromise() {
560 return pipeline().voidPromise();
561 }
562
563 @Override
564 public int hashCode() {
565 return id().hashCode();
566 }
567
568 @Override
569 public boolean equals(Object o) {
570 return this == o;
571 }
572
573 @Override
574 public int compareTo(Channel o) {
575 if (this == o) {
576 return 0;
577 }
578
579 return id().compareTo(o.id());
580 }
581
582 @Override
583 public String toString() {
584 return parent().toString() + "(H2 - " + stream + ')';
585 }
586
587
588
589
590
591 void fireChildRead(Http2Frame frame) {
592 assert eventLoop().inEventLoop();
593 if (!isActive()) {
594 ReferenceCountUtil.release(frame);
595 } else if (readStatus != ReadStatus.IDLE) {
596
597
598 assert inboundBuffer == null || inboundBuffer.isEmpty();
599 final RecvByteBufAllocator.Handle allocHandle = unsafe.recvBufAllocHandle();
600
601 unsafe.doRead0(frame, allocHandle);
602
603
604
605
606 if (allocHandle.continueReading()) {
607 maybeAddChannelToReadCompletePendingQueue();
608 } else {
609 unsafe.notifyReadComplete(allocHandle, true, false);
610 }
611 } else {
612 if (inboundBuffer == null) {
613 inboundBuffer = new ArrayDeque<Object>(4);
614 }
615 inboundBuffer.add(frame);
616 }
617 }
618
619 void fireChildReadComplete() {
620 assert eventLoop().inEventLoop();
621 assert readStatus != ReadStatus.IDLE || !readCompletePending;
622 unsafe.notifyReadComplete(unsafe.recvBufAllocHandle(), false, false);
623 }
624
625 final void closeWithError(Http2Error error) {
626 assert eventLoop().inEventLoop();
627 unsafe.close(unsafe.voidPromise(), error);
628 }
629
630 private final class Http2ChannelUnsafe implements Unsafe {
631 private final VoidChannelPromise unsafeVoidPromise =
632 new VoidChannelPromise(AbstractHttp2StreamChannel.this, false);
633 @SuppressWarnings("deprecation")
634 private RecvByteBufAllocator.Handle recvHandle;
635 private boolean writeDoneAndNoFlush;
636 private boolean closeInitiated;
637 private boolean readEOS;
638
639 private boolean receivedEndOfStream;
640 private boolean sentEndOfStream;
641
642 @Override
643 public void connect(final SocketAddress remoteAddress,
644 SocketAddress localAddress, final ChannelPromise promise) {
645 if (!promise.setUncancellable()) {
646 return;
647 }
648 promise.setFailure(new UnsupportedOperationException());
649 }
650
651 @Override
652 public RecvByteBufAllocator.Handle recvBufAllocHandle() {
653 if (recvHandle == null) {
654 recvHandle = config().getRecvByteBufAllocator().newHandle();
655 recvHandle.reset(config());
656 }
657 return recvHandle;
658 }
659
660 @Override
661 public SocketAddress localAddress() {
662 return parent().unsafe().localAddress();
663 }
664
665 @Override
666 public SocketAddress remoteAddress() {
667 return parent().unsafe().remoteAddress();
668 }
669
670 @Override
671 public void register(EventLoop eventLoop, ChannelPromise promise) {
672 if (!promise.setUncancellable()) {
673 return;
674 }
675 if (registered) {
676 promise.setFailure(new UnsupportedOperationException("Re-register is not supported"));
677 return;
678 }
679
680 registered = true;
681
682 promise.setSuccess();
683
684 pipeline().fireChannelRegistered();
685 if (isActive()) {
686 pipeline().fireChannelActive();
687 }
688 }
689
690 @Override
691 public void bind(SocketAddress localAddress, ChannelPromise promise) {
692 if (!promise.setUncancellable()) {
693 return;
694 }
695 promise.setFailure(new UnsupportedOperationException());
696 }
697
698 @Override
699 public void disconnect(ChannelPromise promise) {
700 close(promise);
701 }
702
703 @Override
704 public void close(final ChannelPromise promise) {
705 close(promise, Http2Error.CANCEL);
706 }
707
708 void close(final ChannelPromise promise, Http2Error error) {
709 if (!promise.setUncancellable()) {
710 return;
711 }
712 if (closeInitiated) {
713 if (closePromise.isDone()) {
714
715 promise.setSuccess();
716 } else if (!(promise instanceof VoidChannelPromise)) {
717
718 closePromise.addListener(new ChannelFutureListener() {
719 @Override
720 public void operationComplete(ChannelFuture future) {
721 promise.setSuccess();
722 }
723 });
724 }
725 return;
726 }
727 closeInitiated = true;
728
729 readCompletePending = false;
730
731 final boolean wasActive = isActive();
732
733
734
735
736
737
738 if (parent().isActive() && isStreamIdValid(stream.id()) &&
739
740 !readEOS && !(receivedEndOfStream && sentEndOfStream)) {
741 Http2StreamFrame resetFrame = new DefaultHttp2ResetFrame(error).stream(stream());
742 write(resetFrame, unsafe().voidPromise());
743 flush();
744 }
745
746 if (inboundBuffer != null) {
747 for (;;) {
748 Object msg = inboundBuffer.poll();
749 if (msg == null) {
750 break;
751 }
752 ReferenceCountUtil.release(msg);
753 }
754 inboundBuffer = null;
755 }
756
757
758 outboundClosed = true;
759 closePromise.setSuccess();
760 promise.setSuccess();
761
762 fireChannelInactiveAndDeregister(voidPromise(), wasActive);
763 }
764
765 @Override
766 public void closeForcibly() {
767 close(unsafe().voidPromise());
768 }
769
770 @Override
771 public void deregister(ChannelPromise promise) {
772 fireChannelInactiveAndDeregister(promise, false);
773 }
774
775 private void fireChannelInactiveAndDeregister(final ChannelPromise promise,
776 final boolean fireChannelInactive) {
777 if (!promise.setUncancellable()) {
778 return;
779 }
780
781 if (!registered) {
782 promise.setSuccess();
783 return;
784 }
785
786
787
788
789
790
791
792
793 invokeLater(new Runnable() {
794 @Override
795 public void run() {
796 if (fireChannelInactive) {
797 pipeline.fireChannelInactive();
798 }
799
800
801 if (registered) {
802 registered = false;
803 pipeline.fireChannelUnregistered();
804 }
805 safeSetSuccess(promise);
806 }
807 });
808 }
809
810 private void safeSetSuccess(ChannelPromise promise) {
811 if (!(promise instanceof VoidChannelPromise) && !promise.trySuccess()) {
812 logger.warn("Failed to mark a promise as success because it is done already: {}", promise);
813 }
814 }
815
816 private void invokeLater(Runnable task) {
817 try {
818
819
820
821
822
823
824
825
826
827
828
829 eventLoop().execute(task);
830 } catch (RejectedExecutionException e) {
831 logger.warn("Can't invoke task later as EventLoop rejected it", e);
832 }
833 }
834
835 @Override
836 public void beginRead() {
837 if (!isActive()) {
838 return;
839 }
840 updateLocalWindowIfNeeded();
841
842 switch (readStatus) {
843 case IDLE:
844 readStatus = ReadStatus.IN_PROGRESS;
845 doBeginRead();
846 break;
847 case IN_PROGRESS:
848 readStatus = ReadStatus.REQUESTED;
849 break;
850 default:
851 break;
852 }
853 }
854
855 private Object pollQueuedMessage() {
856 return inboundBuffer == null ? null : inboundBuffer.poll();
857 }
858
859 void doBeginRead() {
860 if (readStatus == ReadStatus.IDLE) {
861
862 if (readEOS && (inboundBuffer == null || inboundBuffer.isEmpty())) {
863
864 flush();
865 unsafe.closeForcibly();
866 }
867 } else {
868 do {
869 Object message = pollQueuedMessage();
870 if (message == null) {
871
872 flush();
873 if (readEOS) {
874 unsafe.closeForcibly();
875 }
876 break;
877 }
878 final RecvByteBufAllocator.Handle allocHandle = recvBufAllocHandle();
879 allocHandle.reset(config());
880 boolean continueReading = false;
881 do {
882 doRead0((Http2Frame) message, allocHandle);
883 } while ((readEOS || (continueReading = allocHandle.continueReading()))
884 && (message = pollQueuedMessage()) != null);
885
886 if (continueReading && isParentReadInProgress() && !readEOS) {
887
888
889
890
891 maybeAddChannelToReadCompletePendingQueue();
892 } else {
893 notifyReadComplete(allocHandle, true, true);
894
895
896
897
898 resetReadStatus();
899 }
900 } while (readStatus != ReadStatus.IDLE);
901 }
902 }
903
904 void readEOS() {
905 readEOS = true;
906 }
907
908 private void updateLocalWindowIfNeeded() {
909 if (flowControlledBytes != 0 && !parentContext().isRemoved()) {
910 int bytes = flowControlledBytes;
911 flowControlledBytes = 0;
912 ChannelFuture future = write0(parentContext(), new DefaultHttp2WindowUpdateFrame(bytes).stream(stream));
913
914
915
916 writeDoneAndNoFlush = true;
917
918
919
920
921 if (future.isDone()) {
922 windowUpdateFrameWriteComplete(future, AbstractHttp2StreamChannel.this);
923 } else {
924 future.addListener(windowUpdateFrameWriteListener);
925 }
926 }
927 }
928
929 private void resetReadStatus() {
930 readStatus = readStatus == ReadStatus.REQUESTED ? ReadStatus.IN_PROGRESS : ReadStatus.IDLE;
931 }
932
933 void notifyReadComplete(RecvByteBufAllocator.Handle allocHandle, boolean forceReadComplete,
934 boolean inReadLoop) {
935 if (!readCompletePending && !forceReadComplete) {
936 return;
937 }
938
939 readCompletePending = false;
940
941 if (!inReadLoop) {
942
943 resetReadStatus();
944 }
945
946 allocHandle.readComplete();
947 pipeline().fireChannelReadComplete();
948
949
950
951 flush();
952 if (readEOS) {
953 unsafe.closeForcibly();
954 }
955 }
956
957 @SuppressWarnings("deprecation")
958 void doRead0(Http2Frame frame, RecvByteBufAllocator.Handle allocHandle) {
959 final int bytes;
960 if (frame instanceof Http2DataFrame) {
961 bytes = ((Http2DataFrame) frame).initialFlowControlledBytes();
962
963
964
965
966
967 flowControlledBytes += bytes;
968 } else {
969 bytes = MIN_HTTP2_FRAME_SIZE;
970 }
971
972
973
974 receivedEndOfStream |= isEndOfStream(frame);
975
976
977 allocHandle.attemptedBytesRead(bytes);
978 allocHandle.lastBytesRead(bytes);
979 allocHandle.incMessagesRead(1);
980
981 pipeline().fireChannelRead(frame);
982 }
983
984 @Override
985 public void write(Object msg, final ChannelPromise promise) {
986
987 if (!promise.setUncancellable()) {
988 ReferenceCountUtil.release(msg);
989 return;
990 }
991
992 if (!isActive() ||
993
994 outboundClosed && (msg instanceof Http2HeadersFrame || msg instanceof Http2DataFrame)) {
995 ReferenceCountUtil.release(msg);
996 promise.setFailure(new ClosedChannelException());
997 return;
998 }
999
1000 try {
1001 if (msg instanceof Http2StreamFrame) {
1002 Http2StreamFrame frame = validateStreamFrame((Http2StreamFrame) msg).stream(stream());
1003 writeHttp2StreamFrame(frame, promise);
1004 } else {
1005 String msgStr = msg.toString();
1006 ReferenceCountUtil.release(msg);
1007 promise.setFailure(new IllegalArgumentException(
1008 "Message must be an " + StringUtil.simpleClassName(Http2StreamFrame.class) +
1009 ": " + msgStr));
1010 }
1011 } catch (Throwable t) {
1012 promise.tryFailure(t);
1013 }
1014 }
1015
1016 private boolean isEndOfStream(Http2Frame frame) {
1017 if (frame instanceof Http2HeadersFrame) {
1018 return ((Http2HeadersFrame) frame).isEndStream();
1019 }
1020 if (frame instanceof Http2DataFrame) {
1021 return ((Http2DataFrame) frame).isEndStream();
1022 }
1023 return false;
1024 }
1025
1026 private void writeHttp2StreamFrame(Http2StreamFrame frame, final ChannelPromise promise) {
1027 if (!firstFrameWritten && !isStreamIdValid(stream().id()) && !(frame instanceof Http2HeadersFrame)) {
1028 ReferenceCountUtil.release(frame);
1029 promise.setFailure(
1030 new IllegalArgumentException("The first frame must be a headers frame. Was: "
1031 + frame.name()));
1032 return;
1033 }
1034
1035 final boolean firstWrite;
1036 if (firstFrameWritten) {
1037 firstWrite = false;
1038 } else {
1039 firstWrite = firstFrameWritten = true;
1040 }
1041
1042
1043
1044 sentEndOfStream |= isEndOfStream(frame);
1045 ChannelFuture f = write0(parentContext(), frame);
1046 if (f.isDone()) {
1047 if (firstWrite) {
1048 firstWriteComplete(f, promise);
1049 } else {
1050 writeComplete(f, promise);
1051 }
1052 } else {
1053 final long bytes = FlowControlledFrameSizeEstimator.HANDLE_INSTANCE.size(frame);
1054 incrementPendingOutboundBytes(bytes, false);
1055 f.addListener(new ChannelFutureListener() {
1056 @Override
1057 public void operationComplete(ChannelFuture future) {
1058 if (firstWrite) {
1059 firstWriteComplete(future, promise);
1060 } else {
1061 writeComplete(future, promise);
1062 }
1063 decrementPendingOutboundBytes(bytes, false);
1064 }
1065 });
1066 writeDoneAndNoFlush = true;
1067 }
1068 }
1069
1070 private void firstWriteComplete(ChannelFuture future, ChannelPromise promise) {
1071 Throwable cause = future.cause();
1072 if (cause == null) {
1073 promise.setSuccess();
1074 } else {
1075
1076 closeForcibly();
1077 promise.setFailure(wrapStreamClosedError(cause));
1078 }
1079 }
1080
1081 private void writeComplete(ChannelFuture future, ChannelPromise promise) {
1082 Throwable cause = future.cause();
1083 if (cause == null) {
1084 promise.setSuccess();
1085 } else {
1086 Throwable error = wrapStreamClosedError(cause);
1087
1088 if (error instanceof IOException) {
1089 if (config.isAutoClose()) {
1090
1091 closeForcibly();
1092 } else {
1093
1094 outboundClosed = true;
1095 }
1096 }
1097 promise.setFailure(error);
1098 }
1099 }
1100
1101 private Throwable wrapStreamClosedError(Throwable cause) {
1102
1103
1104 if (cause instanceof Http2Exception && ((Http2Exception) cause).error() == Http2Error.STREAM_CLOSED) {
1105 return new ClosedChannelException().initCause(cause);
1106 }
1107 return cause;
1108 }
1109
1110 private Http2StreamFrame validateStreamFrame(Http2StreamFrame frame) {
1111 if (frame.stream() != null && frame.stream() != stream) {
1112 String msgString = frame.toString();
1113 ReferenceCountUtil.release(frame);
1114 throw new IllegalArgumentException(
1115 "Stream " + frame.stream() + " must not be set on the frame: " + msgString);
1116 }
1117 return frame;
1118 }
1119
1120 @Override
1121 public void flush() {
1122
1123
1124
1125
1126 if (!writeDoneAndNoFlush || isParentReadInProgress()) {
1127
1128 return;
1129 }
1130
1131
1132 writeDoneAndNoFlush = false;
1133 flush0(parentContext());
1134 }
1135
1136 @Override
1137 public ChannelPromise voidPromise() {
1138 return unsafeVoidPromise;
1139 }
1140
1141 @Override
1142 public ChannelOutboundBuffer outboundBuffer() {
1143
1144 return null;
1145 }
1146 }
1147
1148
1149
1150
1151
1152
1153 private static final class Http2StreamChannelConfig extends DefaultChannelConfig {
1154 Http2StreamChannelConfig(Channel channel) {
1155 super(channel);
1156 }
1157
1158 @Override
1159 public MessageSizeEstimator getMessageSizeEstimator() {
1160 return FlowControlledFrameSizeEstimator.INSTANCE;
1161 }
1162
1163 @Override
1164 public ChannelConfig setMessageSizeEstimator(MessageSizeEstimator estimator) {
1165 throw new UnsupportedOperationException();
1166 }
1167
1168 @Override
1169 public ChannelConfig setRecvByteBufAllocator(RecvByteBufAllocator allocator) {
1170 if (!(allocator.newHandle() instanceof RecvByteBufAllocator.ExtendedHandle)) {
1171 throw new IllegalArgumentException("allocator.newHandle() must return an object of type: " +
1172 RecvByteBufAllocator.ExtendedHandle.class);
1173 }
1174 super.setRecvByteBufAllocator(allocator);
1175 return this;
1176 }
1177 }
1178
1179 private void maybeAddChannelToReadCompletePendingQueue() {
1180 if (!readCompletePending) {
1181 readCompletePending = true;
1182 addChannelToReadCompletePendingQueue();
1183 }
1184 }
1185
1186 protected void flush0(ChannelHandlerContext ctx) {
1187 ctx.flush();
1188 }
1189
1190 protected ChannelFuture write0(ChannelHandlerContext ctx, Object msg) {
1191 ChannelPromise promise = ctx.newPromise();
1192 ctx.write(msg, promise);
1193 return promise;
1194 }
1195
1196 protected abstract boolean isParentReadInProgress();
1197 protected abstract void addChannelToReadCompletePendingQueue();
1198 protected abstract ChannelHandlerContext parentContext();
1199 }