查看本类的 API文档回源码主页即时通讯网 - 即时通讯开发者社区!
1   /*
2    * Copyright 2019 The Netty Project
3    *
4    * The Netty Project licenses this file to you under the Apache License,
5    * version 2.0 (the "License"); you may not use this file except in compliance
6    * with the License. You may obtain a copy of the License at:
7    *
8    *   https://www.apache.org/licenses/LICENSE-2.0
9    *
10   * Unless required by applicable law or agreed to in writing, software
11   * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12   * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13   * License for the specific language governing permissions and limitations
14   * under the License.
15   */
16  package io.netty.handler.codec.http2;
17  
18  import io.netty.buffer.ByteBufAllocator;
19  import io.netty.channel.Channel;
20  import io.netty.channel.ChannelConfig;
21  import io.netty.channel.ChannelFuture;
22  import io.netty.channel.ChannelFutureListener;
23  import io.netty.channel.ChannelHandler;
24  import io.netty.channel.ChannelHandlerContext;
25  import io.netty.channel.ChannelId;
26  import io.netty.channel.ChannelMetadata;
27  import io.netty.channel.ChannelOutboundBuffer;
28  import io.netty.channel.ChannelPipeline;
29  import io.netty.channel.ChannelProgressivePromise;
30  import io.netty.channel.ChannelPromise;
31  import io.netty.channel.DefaultChannelConfig;
32  import io.netty.channel.DefaultChannelPipeline;
33  import io.netty.channel.EventLoop;
34  import io.netty.channel.MessageSizeEstimator;
35  import io.netty.channel.RecvByteBufAllocator;
36  import io.netty.channel.VoidChannelPromise;
37  import io.netty.channel.WriteBufferWaterMark;
38  import io.netty.channel.socket.ChannelInputShutdownReadComplete;
39  import io.netty.channel.socket.ChannelOutputShutdownEvent;
40  import io.netty.handler.codec.http2.Http2FrameCodec.DefaultHttp2FrameStream;
41  import io.netty.handler.ssl.SslCloseCompletionEvent;
42  import io.netty.util.DefaultAttributeMap;
43  import io.netty.util.ReferenceCountUtil;
44  import io.netty.util.internal.StringUtil;
45  import io.netty.util.internal.logging.InternalLogger;
46  import io.netty.util.internal.logging.InternalLoggerFactory;
47  
48  import java.io.IOException;
49  import java.net.SocketAddress;
50  import java.nio.channels.ClosedChannelException;
51  import java.util.ArrayDeque;
52  import java.util.Queue;
53  import java.util.concurrent.RejectedExecutionException;
54  import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
55  import java.util.concurrent.atomic.AtomicLongFieldUpdater;
56  
57  import static io.netty.handler.codec.http2.Http2CodecUtil.isStreamIdValid;
58  import static io.netty.util.internal.ObjectUtil.checkNotNull;
59  import static java.lang.Math.min;
60  
61  abstract class AbstractHttp2StreamChannel extends DefaultAttributeMap implements Http2StreamChannel {
62  
63      static final Http2FrameStreamVisitor WRITABLE_VISITOR = new Http2FrameStreamVisitor() {
64          @Override
65          public boolean visit(Http2FrameStream stream) {
66              final AbstractHttp2StreamChannel childChannel = (AbstractHttp2StreamChannel)
67                      ((DefaultHttp2FrameStream) stream).attachment;
68              childChannel.trySetWritable();
69              return true;
70          }
71      };
72  
73      static final Http2FrameStreamVisitor CHANNEL_INPUT_SHUTDOWN_READ_COMPLETE_VISITOR =
74              new UserEventStreamVisitor(ChannelInputShutdownReadComplete.INSTANCE);
75  
76      static final Http2FrameStreamVisitor CHANNEL_OUTPUT_SHUTDOWN_EVENT_VISITOR =
77              new UserEventStreamVisitor(ChannelOutputShutdownEvent.INSTANCE);
78  
79      static final Http2FrameStreamVisitor SSL_CLOSE_COMPLETION_EVENT_VISITOR =
80              new UserEventStreamVisitor(SslCloseCompletionEvent.SUCCESS);
81  
82      private static final InternalLogger logger = InternalLoggerFactory.getInstance(AbstractHttp2StreamChannel.class);
83  
84      private static final ChannelMetadata METADATA = new ChannelMetadata(false, 16);
85  
86      /**
87       * Number of bytes to consider non-payload messages. 9 is arbitrary, but also the minimum size of an HTTP/2 frame.
88       * Primarily is non-zero.
89       */
90      private static final int MIN_HTTP2_FRAME_SIZE = 9;
91  
92      /**
93       * {@link Http2FrameStreamVisitor} that fires the user event for every active stream pipeline.
94       */
95      private static final class UserEventStreamVisitor implements Http2FrameStreamVisitor {
96  
97          private final Object event;
98  
99          UserEventStreamVisitor(Object event) {
100             this.event = checkNotNull(event, "event");
101         }
102 
103         @Override
104         public boolean visit(Http2FrameStream stream) {
105             final AbstractHttp2StreamChannel childChannel = (AbstractHttp2StreamChannel)
106                     ((DefaultHttp2FrameStream) stream).attachment;
107             childChannel.pipeline().fireUserEventTriggered(event);
108             return true;
109         }
110     }
111 
112     /**
113      * Returns the flow-control size for DATA frames, and {@value MIN_HTTP2_FRAME_SIZE} for all other frames.
114      */
115     private static final class FlowControlledFrameSizeEstimator implements MessageSizeEstimator {
116 
117         static final FlowControlledFrameSizeEstimator INSTANCE = new FlowControlledFrameSizeEstimator();
118 
119         private static final Handle HANDLE_INSTANCE = new Handle() {
120             @Override
121             public int size(Object msg) {
122                 return msg instanceof Http2DataFrame ?
123                         // Guard against overflow.
124                         (int) min(Integer.MAX_VALUE, ((Http2DataFrame) msg).initialFlowControlledBytes() +
125                                 (long) MIN_HTTP2_FRAME_SIZE) : MIN_HTTP2_FRAME_SIZE;
126             }
127         };
128 
129         @Override
130         public Handle newHandle() {
131             return HANDLE_INSTANCE;
132         }
133     }
134 
135     private static final AtomicLongFieldUpdater<AbstractHttp2StreamChannel> TOTAL_PENDING_SIZE_UPDATER =
136             AtomicLongFieldUpdater.newUpdater(AbstractHttp2StreamChannel.class, "totalPendingSize");
137 
138     private static final AtomicIntegerFieldUpdater<AbstractHttp2StreamChannel> UNWRITABLE_UPDATER =
139             AtomicIntegerFieldUpdater.newUpdater(AbstractHttp2StreamChannel.class, "unwritable");
140 
141     private static void windowUpdateFrameWriteComplete(ChannelFuture future, Channel streamChannel) {
142         Throwable cause = future.cause();
143         if (cause != null) {
144             Throwable unwrappedCause;
145             // Unwrap if needed
146             if (cause instanceof Http2FrameStreamException && (unwrappedCause = cause.getCause()) != null) {
147                 cause = unwrappedCause;
148             }
149 
150             // Notify the child-channel and close it.
151             streamChannel.pipeline().fireExceptionCaught(cause);
152             streamChannel.unsafe().close(streamChannel.unsafe().voidPromise());
153         }
154     }
155 
156     private final ChannelFutureListener windowUpdateFrameWriteListener = new ChannelFutureListener() {
157         @Override
158         public void operationComplete(ChannelFuture future) {
159             windowUpdateFrameWriteComplete(future, AbstractHttp2StreamChannel.this);
160         }
161     };
162 
163     /**
164      * The current status of the read-processing for a {@link AbstractHttp2StreamChannel}.
165      */
166     private enum ReadStatus {
167         /**
168          * No read in progress and no read was requested (yet)
169          */
170         IDLE,
171 
172         /**
173          * Reading in progress
174          */
175         IN_PROGRESS,
176 
177         /**
178          * A read operation was requested.
179          */
180         REQUESTED
181     }
182 
183     private final Http2StreamChannelConfig config = new Http2StreamChannelConfig(this);
184     private final Http2ChannelUnsafe unsafe = new Http2ChannelUnsafe();
185     private final ChannelId channelId;
186     private final ChannelPipeline pipeline;
187     private final DefaultHttp2FrameStream stream;
188     private final ChannelPromise closePromise;
189 
190     private volatile boolean registered;
191 
192     private volatile long totalPendingSize;
193     private volatile int unwritable;
194 
195     // Cached to reduce GC
196     private Runnable fireChannelWritabilityChangedTask;
197 
198     private boolean outboundClosed;
199     private int flowControlledBytes;
200 
201     /**
202      * This variable represents if a read is in progress for the current channel or was requested.
203      * Note that depending upon the {@link RecvByteBufAllocator} behavior a read may extend beyond the
204      * {@link Http2ChannelUnsafe#beginRead()} method scope. The {@link Http2ChannelUnsafe#beginRead()} loop may
205      * drain all pending data, and then if the parent channel is reading this channel may still accept frames.
206      */
207     private ReadStatus readStatus = ReadStatus.IDLE;
208 
209     private Queue<Object> inboundBuffer;
210 
211     /** {@code true} after the first HEADERS frame has been written **/
212     private boolean firstFrameWritten;
213     private boolean readCompletePending;
214 
215     AbstractHttp2StreamChannel(DefaultHttp2FrameStream stream, int id, ChannelHandler inboundHandler) {
216         this.stream = stream;
217         stream.attachment = this;
218         pipeline = new DefaultChannelPipeline(this) {
219             @Override
220             protected void incrementPendingOutboundBytes(long size) {
221                 AbstractHttp2StreamChannel.this.incrementPendingOutboundBytes(size, true);
222             }
223 
224             @Override
225             protected void decrementPendingOutboundBytes(long size) {
226                 AbstractHttp2StreamChannel.this.decrementPendingOutboundBytes(size, true);
227             }
228 
229             @Override
230             protected void onUnhandledInboundException(Throwable cause) {
231                 // Ensure we use the correct Http2Error to close the channel.
232                 if (cause instanceof Http2FrameStreamException) {
233                     closeWithError(((Http2FrameStreamException) cause).error());
234                     return;
235                 } else {
236                     Http2Exception exception = Http2CodecUtil.getEmbeddedHttp2Exception(cause);
237                     if (exception != null) {
238                         closeWithError(exception.error());
239                         return;
240                     }
241                 }
242                 super.onUnhandledInboundException(cause);
243             }
244         };
245 
246         closePromise = pipeline.newPromise();
247         channelId = new Http2StreamChannelId(parent().id(), id);
248 
249         if (inboundHandler != null) {
250             // Add the handler to the pipeline now that we are registered.
251             pipeline.addLast(inboundHandler);
252         }
253     }
254 
255     private void incrementPendingOutboundBytes(long size, boolean invokeLater) {
256         if (size == 0) {
257             return;
258         }
259 
260         long newWriteBufferSize = TOTAL_PENDING_SIZE_UPDATER.addAndGet(this, size);
261         if (newWriteBufferSize > config().getWriteBufferHighWaterMark()) {
262             setUnwritable(invokeLater);
263         }
264     }
265 
266     private void decrementPendingOutboundBytes(long size, boolean invokeLater) {
267         if (size == 0) {
268             return;
269         }
270 
271         long newWriteBufferSize = TOTAL_PENDING_SIZE_UPDATER.addAndGet(this, -size);
272         // Once the totalPendingSize dropped below the low water-mark we can mark the child channel
273         // as writable again. Before doing so we also need to ensure the parent channel is writable to
274         // prevent excessive buffering in the parent outbound buffer. If the parent is not writable
275         // we will mark the child channel as writable once the parent becomes writable by calling
276         // trySetWritable() later.
277         if (newWriteBufferSize < config().getWriteBufferLowWaterMark() && parent().isWritable()) {
278             setWritable(invokeLater);
279         }
280     }
281 
282     final void trySetWritable() {
283         // The parent is writable again but the child channel itself may still not be writable.
284         // Lets try to set the child channel writable to match the state of the parent channel
285         // if (and only if) the totalPendingSize is smaller then the low water-mark.
286         // If this is not the case we will try again later once we drop under it.
287         if (totalPendingSize < config().getWriteBufferLowWaterMark()) {
288             setWritable(false);
289         }
290     }
291 
292     private void setWritable(boolean invokeLater) {
293         for (;;) {
294             final int oldValue = unwritable;
295             final int newValue = oldValue & ~1;
296             if (UNWRITABLE_UPDATER.compareAndSet(this, oldValue, newValue)) {
297                 if (oldValue != 0 && newValue == 0) {
298                     fireChannelWritabilityChanged(invokeLater);
299                 }
300                 break;
301             }
302         }
303     }
304 
305     private void setUnwritable(boolean invokeLater) {
306         for (;;) {
307             final int oldValue = unwritable;
308             final int newValue = oldValue | 1;
309             if (UNWRITABLE_UPDATER.compareAndSet(this, oldValue, newValue)) {
310                 if (oldValue == 0) {
311                     fireChannelWritabilityChanged(invokeLater);
312                 }
313                 break;
314             }
315         }
316     }
317 
318     private void fireChannelWritabilityChanged(boolean invokeLater) {
319         final ChannelPipeline pipeline = pipeline();
320         if (invokeLater) {
321             Runnable task = fireChannelWritabilityChangedTask;
322             if (task == null) {
323                 fireChannelWritabilityChangedTask = task = new Runnable() {
324                     @Override
325                     public void run() {
326                         pipeline.fireChannelWritabilityChanged();
327                     }
328                 };
329             }
330             eventLoop().execute(task);
331         } else {
332             pipeline.fireChannelWritabilityChanged();
333         }
334     }
335     @Override
336     public Http2FrameStream stream() {
337         return stream;
338     }
339 
340     void closeOutbound() {
341         outboundClosed = true;
342     }
343 
344     void streamClosed() {
345         unsafe.readEOS();
346         // Attempt to drain any queued data from the queue and deliver it to the application before closing this
347         // channel.
348         unsafe.doBeginRead();
349     }
350 
351     @Override
352     public ChannelMetadata metadata() {
353         return METADATA;
354     }
355 
356     @Override
357     public ChannelConfig config() {
358         return config;
359     }
360 
361     @Override
362     public boolean isOpen() {
363         return !closePromise.isDone();
364     }
365 
366     @Override
367     public boolean isActive() {
368         return isOpen();
369     }
370 
371     @Override
372     public boolean isWritable() {
373         return unwritable == 0;
374     }
375 
376     @Override
377     public ChannelId id() {
378         return channelId;
379     }
380 
381     @Override
382     public EventLoop eventLoop() {
383         return parent().eventLoop();
384     }
385 
386     @Override
387     public Channel parent() {
388         return parentContext().channel();
389     }
390 
391     @Override
392     public boolean isRegistered() {
393         return registered;
394     }
395 
396     @Override
397     public SocketAddress localAddress() {
398         return parent().localAddress();
399     }
400 
401     @Override
402     public SocketAddress remoteAddress() {
403         return parent().remoteAddress();
404     }
405 
406     @Override
407     public ChannelFuture closeFuture() {
408         return closePromise;
409     }
410 
411     @Override
412     public long bytesBeforeUnwritable() {
413         // +1 because writability doesn't change until the threshold is crossed (not equal to).
414         long bytes = config().getWriteBufferHighWaterMark() - totalPendingSize + 1;
415         // If bytes is negative we know we are not writable, but if bytes is non-negative we have to check
416         // writability. Note that totalPendingSize and isWritable() use different volatile variables that are not
417         // synchronized together. totalPendingSize will be updated before isWritable().
418         return bytes > 0 && isWritable() ? bytes : 0;
419     }
420 
421     @Override
422     public long bytesBeforeWritable() {
423         // +1 because writability doesn't change until the threshold is crossed (not equal to).
424         long bytes = totalPendingSize - config().getWriteBufferLowWaterMark() + 1;
425         // If bytes is negative we know we are writable, but if bytes is non-negative we have to check writability.
426         // Note that totalPendingSize and isWritable() use different volatile variables that are not synchronized
427         // together. totalPendingSize will be updated before isWritable().
428         return bytes <= 0 || isWritable() ? 0 : bytes;
429     }
430 
431     @Override
432     public Unsafe unsafe() {
433         return unsafe;
434     }
435 
436     @Override
437     public ChannelPipeline pipeline() {
438         return pipeline;
439     }
440 
441     @Override
442     public ByteBufAllocator alloc() {
443         return config().getAllocator();
444     }
445 
446     @Override
447     public Channel read() {
448         pipeline().read();
449         return this;
450     }
451 
452     @Override
453     public Channel flush() {
454         pipeline().flush();
455         return this;
456     }
457 
458     @Override
459     public ChannelFuture bind(SocketAddress localAddress) {
460         return pipeline().bind(localAddress);
461     }
462 
463     @Override
464     public ChannelFuture connect(SocketAddress remoteAddress) {
465         return pipeline().connect(remoteAddress);
466     }
467 
468     @Override
469     public ChannelFuture connect(SocketAddress remoteAddress, SocketAddress localAddress) {
470         return pipeline().connect(remoteAddress, localAddress);
471     }
472 
473     @Override
474     public ChannelFuture disconnect() {
475         return pipeline().disconnect();
476     }
477 
478     @Override
479     public ChannelFuture close() {
480         return pipeline().close();
481     }
482 
483     @Override
484     public ChannelFuture deregister() {
485         return pipeline().deregister();
486     }
487 
488     @Override
489     public ChannelFuture bind(SocketAddress localAddress, ChannelPromise promise) {
490         return pipeline().bind(localAddress, promise);
491     }
492 
493     @Override
494     public ChannelFuture connect(SocketAddress remoteAddress, ChannelPromise promise) {
495         return pipeline().connect(remoteAddress, promise);
496     }
497 
498     @Override
499     public ChannelFuture connect(SocketAddress remoteAddress, SocketAddress localAddress, ChannelPromise promise) {
500         return pipeline().connect(remoteAddress, localAddress, promise);
501     }
502 
503     @Override
504     public ChannelFuture disconnect(ChannelPromise promise) {
505         return pipeline().disconnect(promise);
506     }
507 
508     @Override
509     public ChannelFuture close(ChannelPromise promise) {
510         return pipeline().close(promise);
511     }
512 
513     @Override
514     public ChannelFuture deregister(ChannelPromise promise) {
515         return pipeline().deregister(promise);
516     }
517 
518     @Override
519     public ChannelFuture write(Object msg) {
520         return pipeline().write(msg);
521     }
522 
523     @Override
524     public ChannelFuture write(Object msg, ChannelPromise promise) {
525         return pipeline().write(msg, promise);
526     }
527 
528     @Override
529     public ChannelFuture writeAndFlush(Object msg, ChannelPromise promise) {
530         return pipeline().writeAndFlush(msg, promise);
531     }
532 
533     @Override
534     public ChannelFuture writeAndFlush(Object msg) {
535         return pipeline().writeAndFlush(msg);
536     }
537 
538     @Override
539     public ChannelPromise newPromise() {
540         return pipeline().newPromise();
541     }
542 
543     @Override
544     public ChannelProgressivePromise newProgressivePromise() {
545         return pipeline().newProgressivePromise();
546     }
547 
548     @Override
549     public ChannelFuture newSucceededFuture() {
550         return pipeline().newSucceededFuture();
551     }
552 
553     @Override
554     public ChannelFuture newFailedFuture(Throwable cause) {
555         return pipeline().newFailedFuture(cause);
556     }
557 
558     @Override
559     public ChannelPromise voidPromise() {
560         return pipeline().voidPromise();
561     }
562 
563     @Override
564     public int hashCode() {
565         return id().hashCode();
566     }
567 
568     @Override
569     public boolean equals(Object o) {
570         return this == o;
571     }
572 
573     @Override
574     public int compareTo(Channel o) {
575         if (this == o) {
576             return 0;
577         }
578 
579         return id().compareTo(o.id());
580     }
581 
582     @Override
583     public String toString() {
584         return parent().toString() + "(H2 - " + stream + ')';
585     }
586 
587     /**
588      * Receive a read message. This does not notify handlers unless a read is in progress on the
589      * channel.
590      */
591     void fireChildRead(Http2Frame frame) {
592         assert eventLoop().inEventLoop();
593         if (!isActive()) {
594             ReferenceCountUtil.release(frame);
595         } else if (readStatus != ReadStatus.IDLE) {
596             // If a read is in progress or has been requested, there cannot be anything in the queue,
597             // otherwise we would have drained it from the queue and processed it during the read cycle.
598             assert inboundBuffer == null || inboundBuffer.isEmpty();
599             final RecvByteBufAllocator.Handle allocHandle = unsafe.recvBufAllocHandle();
600 
601             unsafe.doRead0(frame, allocHandle);
602             // We currently don't need to check for readEOS because the parent channel and child channel are limited
603             // to the same EventLoop thread. There are a limited number of frame types that may come after EOS is
604             // read (unknown, reset) and the trade off is less conditionals for the hot path (headers/data) at the
605             // cost of additional readComplete notifications on the rare path.
606             if (allocHandle.continueReading()) {
607                 maybeAddChannelToReadCompletePendingQueue();
608             } else {
609                 unsafe.notifyReadComplete(allocHandle, true, false);
610             }
611         } else {
612             if (inboundBuffer == null) {
613                 inboundBuffer = new ArrayDeque<Object>(4);
614             }
615             inboundBuffer.add(frame);
616         }
617     }
618 
619     void fireChildReadComplete() {
620         assert eventLoop().inEventLoop();
621         assert readStatus != ReadStatus.IDLE || !readCompletePending;
622         unsafe.notifyReadComplete(unsafe.recvBufAllocHandle(), false, false);
623     }
624 
625     final void closeWithError(Http2Error error) {
626         assert eventLoop().inEventLoop();
627         unsafe.close(unsafe.voidPromise(), error);
628     }
629 
630     private final class Http2ChannelUnsafe implements Unsafe {
631         private final VoidChannelPromise unsafeVoidPromise =
632                 new VoidChannelPromise(AbstractHttp2StreamChannel.this, false);
633         @SuppressWarnings("deprecation")
634         private RecvByteBufAllocator.Handle recvHandle;
635         private boolean writeDoneAndNoFlush;
636         private boolean closeInitiated;
637         private boolean readEOS;
638 
639         private boolean receivedEndOfStream;
640         private boolean sentEndOfStream;
641 
642         @Override
643         public void connect(final SocketAddress remoteAddress,
644                             SocketAddress localAddress, final ChannelPromise promise) {
645             if (!promise.setUncancellable()) {
646                 return;
647             }
648             promise.setFailure(new UnsupportedOperationException());
649         }
650 
651         @Override
652         public RecvByteBufAllocator.Handle recvBufAllocHandle() {
653             if (recvHandle == null) {
654                 recvHandle = config().getRecvByteBufAllocator().newHandle();
655                 recvHandle.reset(config());
656             }
657             return recvHandle;
658         }
659 
660         @Override
661         public SocketAddress localAddress() {
662             return parent().unsafe().localAddress();
663         }
664 
665         @Override
666         public SocketAddress remoteAddress() {
667             return parent().unsafe().remoteAddress();
668         }
669 
670         @Override
671         public void register(EventLoop eventLoop, ChannelPromise promise) {
672             if (!promise.setUncancellable()) {
673                 return;
674             }
675             if (registered) {
676                 promise.setFailure(new UnsupportedOperationException("Re-register is not supported"));
677                 return;
678             }
679 
680             registered = true;
681 
682             promise.setSuccess();
683 
684             pipeline().fireChannelRegistered();
685             if (isActive()) {
686                 pipeline().fireChannelActive();
687             }
688         }
689 
690         @Override
691         public void bind(SocketAddress localAddress, ChannelPromise promise) {
692             if (!promise.setUncancellable()) {
693                 return;
694             }
695             promise.setFailure(new UnsupportedOperationException());
696         }
697 
698         @Override
699         public void disconnect(ChannelPromise promise) {
700             close(promise);
701         }
702 
703         @Override
704         public void close(final ChannelPromise promise) {
705             close(promise, Http2Error.CANCEL);
706         }
707 
708         void close(final ChannelPromise promise, Http2Error error) {
709             if (!promise.setUncancellable()) {
710                 return;
711             }
712             if (closeInitiated) {
713                 if (closePromise.isDone()) {
714                     // Closed already.
715                     promise.setSuccess();
716                 } else if (!(promise instanceof VoidChannelPromise)) { // Only needed if no VoidChannelPromise.
717                     // This means close() was called before so we just register a listener and return
718                     closePromise.addListener(new ChannelFutureListener() {
719                         @Override
720                         public void operationComplete(ChannelFuture future) {
721                             promise.setSuccess();
722                         }
723                     });
724                 }
725                 return;
726             }
727             closeInitiated = true;
728             // Just set to false as removing from an underlying queue would even be more expensive.
729             readCompletePending = false;
730 
731             final boolean wasActive = isActive();
732 
733             // There is no need to update the local window as once the stream is closed all the pending bytes will be
734             // given back to the connection window by the controller itself.
735 
736             // Only ever send a reset frame if the connection is still alive and if the stream was created before
737             // as otherwise we may send a RST on a stream in an invalid state and cause a connection error.
738             if (parent().isActive() && isStreamIdValid(stream.id()) &&
739                     // Also ensure the stream was never "closed" before.
740                     !readEOS && !(receivedEndOfStream && sentEndOfStream)) {
741                 Http2StreamFrame resetFrame = new DefaultHttp2ResetFrame(error).stream(stream());
742                 write(resetFrame, unsafe().voidPromise());
743                 flush();
744             }
745 
746             if (inboundBuffer != null) {
747                 for (;;) {
748                     Object msg = inboundBuffer.poll();
749                     if (msg == null) {
750                         break;
751                     }
752                     ReferenceCountUtil.release(msg);
753                 }
754                 inboundBuffer = null;
755             }
756 
757             // The promise should be notified before we call fireChannelInactive().
758             outboundClosed = true;
759             closePromise.setSuccess();
760             promise.setSuccess();
761 
762             fireChannelInactiveAndDeregister(voidPromise(), wasActive);
763         }
764 
765         @Override
766         public void closeForcibly() {
767             close(unsafe().voidPromise());
768         }
769 
770         @Override
771         public void deregister(ChannelPromise promise) {
772             fireChannelInactiveAndDeregister(promise, false);
773         }
774 
775         private void fireChannelInactiveAndDeregister(final ChannelPromise promise,
776                                                       final boolean fireChannelInactive) {
777             if (!promise.setUncancellable()) {
778                 return;
779             }
780 
781             if (!registered) {
782                 promise.setSuccess();
783                 return;
784             }
785 
786             // As a user may call deregister() from within any method while doing processing in the ChannelPipeline,
787             // we need to ensure we do the actual deregister operation later. This is necessary to preserve the
788             // behavior of the AbstractChannel, which always invokes channelUnregistered and channelInactive
789             // events 'later' to ensure the current events in the handler are completed before these events.
790             //
791             // See:
792             // https://github.com/netty/netty/issues/4435
793             invokeLater(new Runnable() {
794                 @Override
795                 public void run() {
796                     if (fireChannelInactive) {
797                         pipeline.fireChannelInactive();
798                     }
799                     // The user can fire `deregister` events multiple times but we only want to fire the pipeline
800                     // event if the channel was actually registered.
801                     if (registered) {
802                         registered = false;
803                         pipeline.fireChannelUnregistered();
804                     }
805                     safeSetSuccess(promise);
806                 }
807             });
808         }
809 
810         private void safeSetSuccess(ChannelPromise promise) {
811             if (!(promise instanceof VoidChannelPromise) && !promise.trySuccess()) {
812                 logger.warn("Failed to mark a promise as success because it is done already: {}", promise);
813             }
814         }
815 
816         private void invokeLater(Runnable task) {
817             try {
818                 // This method is used by outbound operation implementations to trigger an inbound event later.
819                 // They do not trigger an inbound event immediately because an outbound operation might have been
820                 // triggered by another inbound event handler method.  If fired immediately, the call stack
821                 // will look like this for example:
822                 //
823                 //   handlerA.inboundBufferUpdated() - (1) an inbound handler method closes a connection.
824                 //   -> handlerA.ctx.close()
825                 //     -> channel.unsafe.close()
826                 //       -> handlerA.channelInactive() - (2) another inbound handler method called while in (1) yet
827                 //
828                 // which means the execution of two inbound handler methods of the same handler overlap undesirably.
829                 eventLoop().execute(task);
830             } catch (RejectedExecutionException e) {
831                 logger.warn("Can't invoke task later as EventLoop rejected it", e);
832             }
833         }
834 
835         @Override
836         public void beginRead() {
837             if (!isActive()) {
838                 return;
839             }
840             updateLocalWindowIfNeeded();
841 
842             switch (readStatus) {
843                 case IDLE:
844                     readStatus = ReadStatus.IN_PROGRESS;
845                     doBeginRead();
846                     break;
847                 case IN_PROGRESS:
848                     readStatus = ReadStatus.REQUESTED;
849                     break;
850                 default:
851                     break;
852             }
853         }
854 
855         private Object pollQueuedMessage() {
856             return inboundBuffer == null ? null : inboundBuffer.poll();
857         }
858 
859         void doBeginRead() {
860             if (readStatus == ReadStatus.IDLE) {
861                 // Don't wait for the user to request a read to notify of channel closure.
862                 if (readEOS && (inboundBuffer == null || inboundBuffer.isEmpty())) {
863                     // Double check there is nothing left to flush such as a window update frame.
864                     flush();
865                     unsafe.closeForcibly();
866                 }
867             } else {
868                 do { // Process messages until there are none left (or the user stopped requesting) and also handle EOS.
869                     Object message = pollQueuedMessage();
870                     if (message == null) {
871                         // Double check there is nothing left to flush such as a window update frame.
872                         flush();
873                         if (readEOS) {
874                             unsafe.closeForcibly();
875                         }
876                         break;
877                     }
878                     final RecvByteBufAllocator.Handle allocHandle = recvBufAllocHandle();
879                     allocHandle.reset(config());
880                     boolean continueReading = false;
881                     do {
882                         doRead0((Http2Frame) message, allocHandle);
883                     } while ((readEOS || (continueReading = allocHandle.continueReading()))
884                             && (message = pollQueuedMessage()) != null);
885 
886                     if (continueReading && isParentReadInProgress() && !readEOS) {
887                         // Currently the parent and child channel are on the same EventLoop thread. If the parent is
888                         // currently reading it is possible that more frames will be delivered to this child channel. In
889                         // the case that this child channel still wants to read we delay the channelReadComplete on this
890                         // child channel until the parent is done reading.
891                         maybeAddChannelToReadCompletePendingQueue();
892                     } else {
893                         notifyReadComplete(allocHandle, true, true);
894 
895                         // While in the read loop reset the readState AFTER calling readComplete (or other pipeline
896                         // callbacks) to prevents re-entry into this method (if autoRead is disabled and the user calls
897                         // read on each readComplete) and StackOverflowException.
898                         resetReadStatus();
899                     }
900                 } while (readStatus != ReadStatus.IDLE);
901             }
902         }
903 
904         void readEOS() {
905             readEOS = true;
906         }
907 
908         private void updateLocalWindowIfNeeded() {
909             if (flowControlledBytes != 0 && !parentContext().isRemoved()) {
910                 int bytes = flowControlledBytes;
911                 flowControlledBytes = 0;
912                 ChannelFuture future = write0(parentContext(), new DefaultHttp2WindowUpdateFrame(bytes).stream(stream));
913                 // window update frames are commonly swallowed by the Http2FrameCodec and the promise is synchronously
914                 // completed but the flow controller _may_ have generated a wire level WINDOW_UPDATE. Therefore we need,
915                 // to assume there was a write done that needs to be flushed or we risk flow control starvation.
916                 writeDoneAndNoFlush = true;
917                 // Add a listener which will notify and teardown the stream
918                 // when a window update fails if needed or check the result of the future directly if it was completed
919                 // already.
920                 // See https://github.com/netty/netty/issues/9663
921                 if (future.isDone()) {
922                     windowUpdateFrameWriteComplete(future, AbstractHttp2StreamChannel.this);
923                 } else {
924                     future.addListener(windowUpdateFrameWriteListener);
925                 }
926             }
927         }
928 
929         private void resetReadStatus() {
930             readStatus = readStatus == ReadStatus.REQUESTED ? ReadStatus.IN_PROGRESS : ReadStatus.IDLE;
931         }
932 
933         void notifyReadComplete(RecvByteBufAllocator.Handle allocHandle, boolean forceReadComplete,
934                                 boolean inReadLoop) {
935             if (!readCompletePending && !forceReadComplete) {
936                 return;
937             }
938             // Set to false just in case we added the channel multiple times before.
939             readCompletePending = false;
940 
941             if (!inReadLoop) {
942                 // While in the read loop we reset the state after calling pipeline methods to prevent StackOverflow.
943                 resetReadStatus();
944             }
945 
946             allocHandle.readComplete();
947             pipeline().fireChannelReadComplete();
948             // Reading data may result in frames being written (e.g. WINDOW_UPDATE, RST, etc..). If the parent
949             // channel is not currently reading we need to force a flush at the child channel, because we cannot
950             // rely upon flush occurring in channelReadComplete on the parent channel.
951             flush();
952             if (readEOS) {
953                 unsafe.closeForcibly();
954             }
955         }
956 
957         @SuppressWarnings("deprecation")
958         void doRead0(Http2Frame frame, RecvByteBufAllocator.Handle allocHandle) {
959             final int bytes;
960             if (frame instanceof Http2DataFrame) {
961                 bytes = ((Http2DataFrame) frame).initialFlowControlledBytes();
962                 // It is important that we increment the flowControlledBytes before we call fireChannelRead(...)
963                 // as it may cause a read() that will call updateLocalWindowIfNeeded() and we need to ensure
964                 // in this case that we accounted for it.
965                 //
966                 // See https://github.com/netty/netty/issues/9663
967                 flowControlledBytes += bytes;
968             } else {
969                 bytes = MIN_HTTP2_FRAME_SIZE;
970             }
971 
972             // Let's keep track of what we received as the stream state itself will only be updated once the frame
973             // was dispatched for reading which might cause problems if we try to close the channel in a write future.
974             receivedEndOfStream |= isEndOfStream(frame);
975 
976             // Update before firing event through the pipeline to be consistent with other Channel implementation.
977             allocHandle.attemptedBytesRead(bytes);
978             allocHandle.lastBytesRead(bytes);
979             allocHandle.incMessagesRead(1);
980 
981             pipeline().fireChannelRead(frame);
982         }
983 
984         @Override
985         public void write(Object msg, final ChannelPromise promise) {
986             // After this point its not possible to cancel a write anymore.
987             if (!promise.setUncancellable()) {
988                 ReferenceCountUtil.release(msg);
989                 return;
990             }
991 
992             if (!isActive() ||
993                     // Once the outbound side was closed we should not allow header / data frames
994                     outboundClosed && (msg instanceof Http2HeadersFrame || msg instanceof Http2DataFrame)) {
995                 ReferenceCountUtil.release(msg);
996                 promise.setFailure(new ClosedChannelException());
997                 return;
998             }
999 
1000             try {
1001                 if (msg instanceof Http2StreamFrame) {
1002                     Http2StreamFrame frame = validateStreamFrame((Http2StreamFrame) msg).stream(stream());
1003                     writeHttp2StreamFrame(frame, promise);
1004                 } else {
1005                     String msgStr = msg.toString();
1006                     ReferenceCountUtil.release(msg);
1007                     promise.setFailure(new IllegalArgumentException(
1008                             "Message must be an " + StringUtil.simpleClassName(Http2StreamFrame.class) +
1009                                     ": " + msgStr));
1010                 }
1011             } catch (Throwable t) {
1012                 promise.tryFailure(t);
1013             }
1014         }
1015 
1016         private boolean isEndOfStream(Http2Frame frame) {
1017             if (frame instanceof Http2HeadersFrame) {
1018                 return ((Http2HeadersFrame) frame).isEndStream();
1019             }
1020             if (frame instanceof Http2DataFrame) {
1021                 return ((Http2DataFrame) frame).isEndStream();
1022             }
1023             return false;
1024         }
1025 
1026         private void writeHttp2StreamFrame(Http2StreamFrame frame, final ChannelPromise promise) {
1027             if (!firstFrameWritten && !isStreamIdValid(stream().id()) && !(frame instanceof Http2HeadersFrame)) {
1028                 ReferenceCountUtil.release(frame);
1029                 promise.setFailure(
1030                     new IllegalArgumentException("The first frame must be a headers frame. Was: "
1031                         + frame.name()));
1032                 return;
1033             }
1034 
1035             final boolean firstWrite;
1036             if (firstFrameWritten) {
1037                 firstWrite = false;
1038             } else {
1039                 firstWrite = firstFrameWritten = true;
1040             }
1041 
1042             // Let's keep track of what we send as the stream state itself will only be updated once the frame
1043             // was written which might cause problems if we try to close the channel in a write future.
1044             sentEndOfStream |= isEndOfStream(frame);
1045             ChannelFuture f = write0(parentContext(), frame);
1046             if (f.isDone()) {
1047                 if (firstWrite) {
1048                     firstWriteComplete(f, promise);
1049                 } else {
1050                     writeComplete(f, promise);
1051                 }
1052             } else {
1053                 final long bytes = FlowControlledFrameSizeEstimator.HANDLE_INSTANCE.size(frame);
1054                 incrementPendingOutboundBytes(bytes, false);
1055                 f.addListener(new ChannelFutureListener() {
1056                     @Override
1057                     public void operationComplete(ChannelFuture future) {
1058                         if (firstWrite) {
1059                             firstWriteComplete(future, promise);
1060                         } else {
1061                             writeComplete(future, promise);
1062                         }
1063                         decrementPendingOutboundBytes(bytes, false);
1064                     }
1065                 });
1066                 writeDoneAndNoFlush = true;
1067             }
1068         }
1069 
1070         private void firstWriteComplete(ChannelFuture future, ChannelPromise promise) {
1071             Throwable cause = future.cause();
1072             if (cause == null) {
1073                 promise.setSuccess();
1074             } else {
1075                 // If the first write fails there is not much we can do, just close
1076                 closeForcibly();
1077                 promise.setFailure(wrapStreamClosedError(cause));
1078             }
1079         }
1080 
1081         private void writeComplete(ChannelFuture future, ChannelPromise promise) {
1082             Throwable cause = future.cause();
1083             if (cause == null) {
1084                 promise.setSuccess();
1085             } else {
1086                 Throwable error = wrapStreamClosedError(cause);
1087                 // To make it more consistent with AbstractChannel we handle all IOExceptions here.
1088                 if (error instanceof IOException) {
1089                     if (config.isAutoClose()) {
1090                         // Close channel if needed.
1091                         closeForcibly();
1092                     } else {
1093                         // TODO: Once Http2StreamChannel extends DuplexChannel we should call shutdownOutput(...)
1094                         outboundClosed = true;
1095                     }
1096                 }
1097                 promise.setFailure(error);
1098             }
1099         }
1100 
1101         private Throwable wrapStreamClosedError(Throwable cause) {
1102             // If the error was caused by STREAM_CLOSED we should use a ClosedChannelException to better
1103             // mimic other transports and make it easier to reason about what exceptions to expect.
1104             if (cause instanceof Http2Exception && ((Http2Exception) cause).error() == Http2Error.STREAM_CLOSED) {
1105                 return new ClosedChannelException().initCause(cause);
1106             }
1107             return cause;
1108         }
1109 
1110         private Http2StreamFrame validateStreamFrame(Http2StreamFrame frame) {
1111             if (frame.stream() != null && frame.stream() != stream) {
1112                 String msgString = frame.toString();
1113                 ReferenceCountUtil.release(frame);
1114                 throw new IllegalArgumentException(
1115                         "Stream " + frame.stream() + " must not be set on the frame: " + msgString);
1116             }
1117             return frame;
1118         }
1119 
1120         @Override
1121         public void flush() {
1122             // If we are currently in the parent channel's read loop we should just ignore the flush.
1123             // We will ensure we trigger ctx.flush() after we processed all Channels later on and
1124             // so aggregate the flushes. This is done as ctx.flush() is expensive when as it may trigger an
1125             // write(...) or writev(...) operation on the socket.
1126             if (!writeDoneAndNoFlush || isParentReadInProgress()) {
1127                 // There is nothing to flush so this is a NOOP.
1128                 return;
1129             }
1130             // We need to set this to false before we call flush0(...) as ChannelFutureListener may produce more data
1131             // that are explicit flushed.
1132             writeDoneAndNoFlush = false;
1133             flush0(parentContext());
1134         }
1135 
1136         @Override
1137         public ChannelPromise voidPromise() {
1138             return unsafeVoidPromise;
1139         }
1140 
1141         @Override
1142         public ChannelOutboundBuffer outboundBuffer() {
1143             // Always return null as we not use the ChannelOutboundBuffer and not even support it.
1144             return null;
1145         }
1146     }
1147 
1148     /**
1149      * {@link ChannelConfig} so that the high and low writebuffer watermarks can reflect the outbound flow control
1150      * window, without having to create a new {@link WriteBufferWaterMark} object whenever the flow control window
1151      * changes.
1152      */
1153     private static final class Http2StreamChannelConfig extends DefaultChannelConfig {
1154         Http2StreamChannelConfig(Channel channel) {
1155             super(channel);
1156         }
1157 
1158         @Override
1159         public MessageSizeEstimator getMessageSizeEstimator() {
1160             return FlowControlledFrameSizeEstimator.INSTANCE;
1161         }
1162 
1163         @Override
1164         public ChannelConfig setMessageSizeEstimator(MessageSizeEstimator estimator) {
1165             throw new UnsupportedOperationException();
1166         }
1167 
1168         @Override
1169         public ChannelConfig setRecvByteBufAllocator(RecvByteBufAllocator allocator) {
1170             if (!(allocator.newHandle() instanceof RecvByteBufAllocator.ExtendedHandle)) {
1171                 throw new IllegalArgumentException("allocator.newHandle() must return an object of type: " +
1172                         RecvByteBufAllocator.ExtendedHandle.class);
1173             }
1174             super.setRecvByteBufAllocator(allocator);
1175             return this;
1176         }
1177     }
1178 
1179     private void maybeAddChannelToReadCompletePendingQueue() {
1180         if (!readCompletePending) {
1181             readCompletePending = true;
1182             addChannelToReadCompletePendingQueue();
1183         }
1184     }
1185 
1186     protected void flush0(ChannelHandlerContext ctx) {
1187         ctx.flush();
1188     }
1189 
1190     protected ChannelFuture write0(ChannelHandlerContext ctx, Object msg) {
1191         ChannelPromise promise = ctx.newPromise();
1192         ctx.write(msg, promise);
1193         return promise;
1194     }
1195 
1196     protected abstract boolean isParentReadInProgress();
1197     protected abstract void addChannelToReadCompletePendingQueue();
1198     protected abstract ChannelHandlerContext parentContext();
1199 }