查看本类的 API文档回源码主页即时通讯网 - 即时通讯开发者社区!
1   /*
2    * Copyright 2016 The Netty Project
3    *
4    * The Netty Project licenses this file to you under the Apache License,
5    * version 2.0 (the "License"); you may not use this file except in compliance
6    * with the License. You may obtain a copy of the License at:
7    *
8    *   https://www.apache.org/licenses/LICENSE-2.0
9    *
10   * Unless required by applicable law or agreed to in writing, software
11   * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12   * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13   * License for the specific language governing permissions and limitations
14   * under the License.
15   */
16  package io.netty.handler.codec.http2;
17  
18  import io.netty.buffer.ByteBuf;
19  import io.netty.channel.Channel;
20  import io.netty.channel.ChannelFuture;
21  import io.netty.channel.ChannelFutureListener;
22  import io.netty.channel.ChannelHandlerContext;
23  import io.netty.channel.ChannelInboundHandler;
24  import io.netty.channel.ChannelPromise;
25  import io.netty.handler.codec.UnsupportedMessageTypeException;
26  import io.netty.handler.codec.http.HttpServerUpgradeHandler.UpgradeEvent;
27  import io.netty.handler.codec.http2.Http2Connection.PropertyKey;
28  import io.netty.handler.codec.http2.Http2Stream.State;
29  import io.netty.handler.codec.http2.StreamBufferingEncoder.Http2ChannelClosedException;
30  import io.netty.handler.codec.http2.StreamBufferingEncoder.Http2GoAwayException;
31  import io.netty.util.ReferenceCountUtil;
32  import io.netty.util.ReferenceCounted;
33  import io.netty.util.collection.IntObjectHashMap;
34  import io.netty.util.collection.IntObjectMap;
35  import io.netty.util.internal.UnstableApi;
36  import io.netty.util.internal.logging.InternalLogger;
37  import io.netty.util.internal.logging.InternalLoggerFactory;
38  
39  import static io.netty.buffer.ByteBufUtil.writeAscii;
40  import static io.netty.handler.codec.http2.Http2CodecUtil.HTTP_UPGRADE_STREAM_ID;
41  import static io.netty.handler.codec.http2.Http2CodecUtil.isStreamIdValid;
42  import static io.netty.handler.codec.http2.Http2Error.NO_ERROR;
43  import static io.netty.util.internal.logging.InternalLogLevel.DEBUG;
44  
45  /**
46   * <p><em>This API is very immature.</em> The Http2Connection-based API is currently preferred over this API.
47   * This API is targeted to eventually replace or reduce the need for the {@link Http2ConnectionHandler} API.
48   *
49   * <p>An HTTP/2 handler that maps HTTP/2 frames to {@link Http2Frame} objects and vice versa. For every incoming HTTP/2
50   * frame, an {@link Http2Frame} object is created and propagated via {@link #channelRead}. Outbound {@link Http2Frame}
51   * objects received via {@link #write} are converted to the HTTP/2 wire format. HTTP/2 frames specific to a stream
52   * implement the {@link Http2StreamFrame} interface. The {@link Http2FrameCodec} is instantiated using the
53   * {@link Http2FrameCodecBuilder}. It's recommended for channel handlers to inherit from the
54   * {@link Http2ChannelDuplexHandler}, as it provides additional functionality like iterating over all active streams or
55   * creating outbound streams.
56   *
57   * <h3>Stream Lifecycle</h3>
58   * <p>
59   * The frame codec delivers and writes frames for active streams. An active stream is closed when either side sends a
60   * {@code RST_STREAM} frame or both sides send a frame with the {@code END_STREAM} flag set. Each
61   * {@link Http2StreamFrame} has a {@link Http2FrameStream} object attached that uniquely identifies a particular stream.
62   *
63   * <p>{@link Http2StreamFrame}s read from the channel always have a {@link Http2FrameStream} object set, while when writing a
64   * {@link Http2StreamFrame} the application code needs to set a {@link Http2FrameStream} object using
65   * {@link Http2StreamFrame#stream(Http2FrameStream)}.
66   *
67   * <h3>Flow control</h3>
68   * <p>
69   * The frame codec automatically increments stream and connection flow control windows.
70   *
71   * <p>Incoming flow controlled frames need to be consumed by writing a {@link Http2WindowUpdateFrame} with the consumed
72   * number of bytes and the corresponding stream identifier set to the frame codec.
73   *
74   * <p>The local stream-level flow control window can be changed by writing a {@link Http2SettingsFrame} with the
75   * {@link Http2Settings#initialWindowSize()} set to the targeted value.
76   *
77   * <p>The connection-level flow control window can be changed by writing a {@link Http2WindowUpdateFrame} with the
78   * desired window size <em>increment</em> in bytes and the stream identifier set to {@code 0}. By default the initial
79   * connection-level flow control window is the same as initial stream-level flow control window.
80   *
81   * <h3>New inbound Streams</h3>
82   * <p>
83   * The first frame of an HTTP/2 stream must be an {@link Http2HeadersFrame}, which will have an {@link Http2FrameStream}
84   * object attached.
85   *
86   * <h3>New outbound Streams</h3>
87   * <p>
88   * An outbound HTTP/2 stream can be created by first instantiating a new {@link Http2FrameStream} object via
89   * {@link Http2ChannelDuplexHandler#newStream()}, and then writing a {@link Http2HeadersFrame} object with the stream
90   * attached.
91   *
92   * <pre>
93   *     final Http2FrameStream stream = handler.newStream();
94   *     ctx.write(headersFrame.stream(stream)).addListener(new ChannelFutureListener() {
95   *
96   *         @Override
97   *         public void operationComplete(ChannelFuture f) {
98   *             if (f.isSuccess()) {
99   *                 // Stream is active and stream.id() returns a valid stream identifier.
100  *                 System.out.println("New stream with id " + stream.id() + " created.");
101  *             } else {
102  *                 // Stream failed to become active. Handle error.
103  *                 if (f.cause() instanceof Http2NoMoreStreamIdsException) {
104  *
105  *                 } else if (f.cause() instanceof Http2GoAwayException) {
106  *
107  *                 } else {
108  *
109  *                 }
110  *             }
111  *         }
112  *     });
113  * </pre>
114  *
115  * <p>If a new stream cannot be created due to stream id exhaustion of the endpoint, the {@link ChannelPromise} of the
116  * HEADERS frame will fail with a {@link Http2NoMoreStreamIdsException}.
117  *
118  * <p>The HTTP/2 standard allows for an endpoint to limit the maximum number of concurrently active streams via the
119  * {@code SETTINGS_MAX_CONCURRENT_STREAMS} setting. When this limit is reached, no new streams can be created. However,
120  * the {@link Http2FrameCodec} can be built with
121  * {@link Http2FrameCodecBuilder#encoderEnforceMaxConcurrentStreams(boolean)} enabled, in which case a new stream and
122  * its associated frames will be buffered until either the limit is increased or an active stream is closed. It's,
123  * however, possible that a buffered stream will never become active. That is, the channel might
124  * get closed or a GO_AWAY frame might be received. In the first case, all writes of buffered streams will fail with a
125  * {@link Http2ChannelClosedException}. In the second case, all writes of buffered streams with an identifier less than
126  * the last stream identifier of the GO_AWAY frame will fail with a {@link Http2GoAwayException}.
127  *
128  * <h3>Error Handling</h3>
129  * <p>
130  * Exceptions and errors are propagated via {@link ChannelInboundHandler#exceptionCaught}. Exceptions that apply to
131  * a specific HTTP/2 stream are wrapped in a {@link Http2FrameStreamException} and have the corresponding
132  * {@link Http2FrameStream} object attached.
133  *
134  * <h3>Reference Counting</h3>
135  * <p>
136  * Some {@link Http2StreamFrame}s implement the {@link ReferenceCounted} interface, as they carry
137  * reference counted objects (e.g. {@link ByteBuf}s). The frame codec will call {@link ReferenceCounted#retain()} before
138  * propagating a reference counted object through the pipeline, and thus an application handler needs to release such
139  * an object after having consumed it. For more information on reference counting take a look at
140  * https://netty.io/wiki/reference-counted-objects.html
141  *
142  * <h3>HTTP Upgrade</h3>
143  * <p>
144  * Server-side HTTP to HTTP/2 upgrade is supported in conjunction with {@link Http2ServerUpgradeCodec}; the necessary
145  * HTTP-to-HTTP/2 conversion is performed automatically.
146  */
147 @UnstableApi
148 public class Http2FrameCodec extends Http2ConnectionHandler {
149 
150     private static final InternalLogger LOG = InternalLoggerFactory.getInstance(Http2FrameCodec.class);
151 
152     private static final Class<?>[] SUPPORTED_MESSAGES = new Class[] {
153             Http2DataFrame.class, Http2HeadersFrame.class, Http2WindowUpdateFrame.class, Http2ResetFrame.class,
154             Http2PingFrame.class, Http2SettingsFrame.class, Http2SettingsAckFrame.class, Http2GoAwayFrame.class,
155             Http2PushPromiseFrame.class, Http2PriorityFrame.class, Http2UnknownFrame.class };
156 
157     protected final PropertyKey streamKey;
158     private final PropertyKey upgradeKey;
159 
160     private final Integer initialFlowControlWindowSize;
161 
162     ChannelHandlerContext ctx;
163 
164     /**
165      * Number of buffered streams if the {@link StreamBufferingEncoder} is used.
166      **/
167     private int numBufferedStreams;
168     private final IntObjectMap<DefaultHttp2FrameStream> frameStreamToInitializeMap =
169             new IntObjectHashMap<DefaultHttp2FrameStream>(8);
170 
171     Http2FrameCodec(Http2ConnectionEncoder encoder, Http2ConnectionDecoder decoder, Http2Settings initialSettings,
172                     boolean decoupleCloseAndGoAway, boolean flushPreface) {
173         super(decoder, encoder, initialSettings, decoupleCloseAndGoAway, flushPreface);
174 
175         decoder.frameListener(new FrameListener());
176         connection().addListener(new ConnectionListener());
177         connection().remote().flowController().listener(new Http2RemoteFlowControllerListener());
178         streamKey = connection().newKey();
179         upgradeKey = connection().newKey();
180         initialFlowControlWindowSize = initialSettings.initialWindowSize();
181     }
182 
183     /**
184      * Creates a new outbound/local stream.
185      */
186     DefaultHttp2FrameStream newStream() {
187         return new DefaultHttp2FrameStream();
188     }
189 
190     /**
191      * Iterates over all active HTTP/2 streams.
192      *
193      * <p>This method must not be called outside of the event loop.
194      */
195     final void forEachActiveStream(final Http2FrameStreamVisitor streamVisitor) throws Http2Exception {
196         assert ctx.executor().inEventLoop();
197         if (connection().numActiveStreams() > 0) {
198             connection().forEachActiveStream(new Http2StreamVisitor() {
199                 @Override
200                 public boolean visit(Http2Stream stream) {
201                     try {
202                         return streamVisitor.visit((Http2FrameStream) stream.getProperty(streamKey));
203                     } catch (Throwable cause) {
204                         onError(ctx, false, cause);
205                         return false;
206                     }
207                 }
208             });
209         }
210     }
211 
212     /**
213      * Retrieve the number of streams currently in the process of being initialized.
214      * <p>
215      * This is package-private for testing only.
216      */
217     int numInitializingStreams() {
218         return frameStreamToInitializeMap.size();
219     }
220 
221     @Override
222     public final void handlerAdded(ChannelHandlerContext ctx) throws Exception {
223         this.ctx = ctx;
224         super.handlerAdded(ctx);
225         handlerAdded0(ctx);
226         // Must be after Http2ConnectionHandler does its initialization in handlerAdded above.
227         // The server will not send a connection preface so we are good to send a window update.
228         Http2Connection connection = connection();
229         if (connection.isServer()) {
230             tryExpandConnectionFlowControlWindow(connection);
231         }
232     }
233 
234     private void tryExpandConnectionFlowControlWindow(Http2Connection connection) throws Http2Exception {
235         if (initialFlowControlWindowSize != null) {
236             // The window size in the settings explicitly excludes the connection window. So we manually manipulate the
237             // connection window to accommodate more concurrent data per connection.
238             Http2Stream connectionStream = connection.connectionStream();
239             Http2LocalFlowController localFlowController = connection.local().flowController();
240             final int delta = initialFlowControlWindowSize - localFlowController.initialWindowSize(connectionStream);
241             // Only increase the connection window, don't decrease it.
242             if (delta > 0) {
243                 // Double the delta just so a single stream can't exhaust the connection window.
244                 localFlowController.incrementWindowSize(connectionStream, Math.max(delta << 1, delta));
245                 flush(ctx);
246             }
247         }
248     }
249 
250     void handlerAdded0(@SuppressWarnings("unsed") ChannelHandlerContext ctx) throws Exception {
251         // sub-class can override this for extra steps that needs to be done when the handler is added.
252     }
253 
254     /**
255      * Handles the cleartext HTTP upgrade event. If an upgrade occurred, sends a simple response via
256      * HTTP/2 on stream 1 (the stream specifically reserved for cleartext HTTP upgrade).
257      */
258     @Override
259     public final void userEventTriggered(final ChannelHandlerContext ctx, final Object evt) throws Exception {
260         if (evt == Http2ConnectionPrefaceAndSettingsFrameWrittenEvent.INSTANCE) {
261             // The user event implies that we are on the client.
262             tryExpandConnectionFlowControlWindow(connection());
263 
264             // We schedule this on the EventExecutor to allow to have any extra handlers added to the pipeline
265             // before we pass the event to the next handler. This is needed as the event may be called from within
266             // handlerAdded(...) which will be run before other handlers will be added to the pipeline.
267             ctx.executor().execute(new Runnable() {
268                 @Override
269                 public void run() {
270                     ctx.fireUserEventTriggered(evt);
271                 }
272             });
273         } else if (evt instanceof UpgradeEvent) {
274             UpgradeEvent upgrade = (UpgradeEvent) evt;
275             try {
276                 onUpgradeEvent(ctx, upgrade.retain());
277                 Http2Stream stream = connection().stream(HTTP_UPGRADE_STREAM_ID);
278                 if (stream.getProperty(streamKey) == null) {
279                     // TODO: improve handler/stream lifecycle so that stream isn't active before handler added.
280                     // The stream was already made active, but ctx may have been null so it wasn't initialized.
281                     // https://github.com/netty/netty/issues/4942
282                     onStreamActive0(stream);
283                 }
284                 upgrade.upgradeRequest().headers().setInt(
285                         HttpConversionUtil.ExtensionHeaderNames.STREAM_ID.text(), HTTP_UPGRADE_STREAM_ID);
286                 stream.setProperty(upgradeKey, true);
287                 InboundHttpToHttp2Adapter.handle(
288                         ctx, connection(), decoder().frameListener(), upgrade.upgradeRequest().retain());
289             } finally {
290                 upgrade.release();
291             }
292         } else {
293             onUserEventTriggered(ctx, evt);
294             ctx.fireUserEventTriggered(evt);
295         }
296     }
297 
298     void onUserEventTriggered(final ChannelHandlerContext ctx, final Object evt) throws Exception {
299         // noop
300     }
301 
302     /**
303      * Processes all {@link Http2Frame}s. {@link Http2StreamFrame}s may only originate in child
304      * streams.
305      */
306     @Override
307     public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) {
308         if (msg instanceof Http2DataFrame) {
309             Http2DataFrame dataFrame = (Http2DataFrame) msg;
310             encoder().writeData(ctx, dataFrame.stream().id(), dataFrame.content(),
311                     dataFrame.padding(), dataFrame.isEndStream(), promise);
312         } else if (msg instanceof Http2HeadersFrame) {
313             writeHeadersFrame(ctx, (Http2HeadersFrame) msg, promise);
314         } else if (msg instanceof Http2WindowUpdateFrame) {
315             Http2WindowUpdateFrame frame = (Http2WindowUpdateFrame) msg;
316             Http2FrameStream frameStream = frame.stream();
317             // It is legit to send a WINDOW_UPDATE frame for the connection stream. The parent channel doesn't attempt
318             // to set the Http2FrameStream so we assume if it is null the WINDOW_UPDATE is for the connection stream.
319             try {
320                 if (frameStream == null) {
321                     increaseInitialConnectionWindow(frame.windowSizeIncrement());
322                 } else {
323                     consumeBytes(frameStream.id(), frame.windowSizeIncrement());
324                 }
325                 promise.setSuccess();
326             } catch (Throwable t) {
327                 promise.setFailure(t);
328             }
329         } else if (msg instanceof Http2ResetFrame) {
330             Http2ResetFrame rstFrame = (Http2ResetFrame) msg;
331             int id = rstFrame.stream().id();
332             // Only ever send a reset frame if stream may have existed before as otherwise we may send a RST on a
333             // stream in an invalid state and cause a connection error.
334             if (connection().streamMayHaveExisted(id)) {
335                 encoder().writeRstStream(ctx, rstFrame.stream().id(), rstFrame.errorCode(), promise);
336             } else {
337                 ReferenceCountUtil.release(rstFrame);
338                 promise.setFailure(Http2Exception.streamError(
339                         rstFrame.stream().id(), Http2Error.PROTOCOL_ERROR, "Stream never existed"));
340             }
341         } else if (msg instanceof Http2PingFrame) {
342             Http2PingFrame frame = (Http2PingFrame) msg;
343             encoder().writePing(ctx, frame.ack(), frame.content(), promise);
344         } else if (msg instanceof Http2SettingsFrame) {
345             encoder().writeSettings(ctx, ((Http2SettingsFrame) msg).settings(), promise);
346         } else if (msg instanceof Http2SettingsAckFrame) {
347             // In the event of manual SETTINGS ACK, it is assumed the encoder will apply the earliest received but not
348             // yet ACKed settings.
349             encoder().writeSettingsAck(ctx, promise);
350         } else if (msg instanceof Http2GoAwayFrame) {
351             writeGoAwayFrame(ctx, (Http2GoAwayFrame) msg, promise);
352         } else if (msg instanceof Http2PushPromiseFrame) {
353             Http2PushPromiseFrame pushPromiseFrame = (Http2PushPromiseFrame) msg;
354             writePushPromise(ctx, pushPromiseFrame, promise);
355         } else if (msg instanceof Http2PriorityFrame) {
356             Http2PriorityFrame priorityFrame = (Http2PriorityFrame) msg;
357             encoder().writePriority(ctx, priorityFrame.stream().id(), priorityFrame.streamDependency(),
358                     priorityFrame.weight(), priorityFrame.exclusive(), promise);
359         } else if (msg instanceof Http2UnknownFrame) {
360             Http2UnknownFrame unknownFrame = (Http2UnknownFrame) msg;
361             encoder().writeFrame(ctx, unknownFrame.frameType(), unknownFrame.stream().id(),
362                     unknownFrame.flags(), unknownFrame.content(), promise);
363         } else if (!(msg instanceof Http2Frame)) {
364             ctx.write(msg, promise);
365         } else {
366             ReferenceCountUtil.release(msg);
367             throw new UnsupportedMessageTypeException(msg, SUPPORTED_MESSAGES);
368         }
369     }
370 
371     private void increaseInitialConnectionWindow(int deltaBytes) throws Http2Exception {
372         // The LocalFlowController is responsible for detecting over/under flow.
373         connection().local().flowController().incrementWindowSize(connection().connectionStream(), deltaBytes);
374     }
375 
376     final boolean consumeBytes(int streamId, int bytes) throws Http2Exception {
377         Http2Stream stream = connection().stream(streamId);
378         // Upgraded requests are ineligible for stream control. We add the null check
379         // in case the stream has been deregistered.
380         if (stream != null && streamId == Http2CodecUtil.HTTP_UPGRADE_STREAM_ID) {
381             Boolean upgraded = stream.getProperty(upgradeKey);
382             if (Boolean.TRUE.equals(upgraded)) {
383                 return false;
384             }
385         }
386 
387         return connection().local().flowController().consumeBytes(stream, bytes);
388     }
389 
390     private void writeGoAwayFrame(ChannelHandlerContext ctx, Http2GoAwayFrame frame, ChannelPromise promise) {
391         if (frame.lastStreamId() > -1) {
392             frame.release();
393             throw new IllegalArgumentException("Last stream id must not be set on GOAWAY frame");
394         }
395 
396         int lastStreamCreated = connection().remote().lastStreamCreated();
397         long lastStreamId = lastStreamCreated + ((long) frame.extraStreamIds()) * 2;
398         // Check if the computation overflowed.
399         if (lastStreamId > Integer.MAX_VALUE) {
400             lastStreamId = Integer.MAX_VALUE;
401         }
402         goAway(ctx, (int) lastStreamId, frame.errorCode(), frame.content(), promise);
403     }
404 
405     private void writeHeadersFrame(final ChannelHandlerContext ctx, Http2HeadersFrame headersFrame,
406                                    ChannelPromise promise) {
407 
408         if (isStreamIdValid(headersFrame.stream().id())) {
409             encoder().writeHeaders(ctx, headersFrame.stream().id(), headersFrame.headers(), headersFrame.padding(),
410                     headersFrame.isEndStream(), promise);
411         } else if (initializeNewStream(ctx, (DefaultHttp2FrameStream) headersFrame.stream(), promise)) {
412             promise = promise.unvoid();
413 
414             final int streamId = headersFrame.stream().id();
415 
416             encoder().writeHeaders(ctx, streamId, headersFrame.headers(), headersFrame.padding(),
417                     headersFrame.isEndStream(), promise);
418 
419             if (!promise.isDone()) {
420                 numBufferedStreams++;
421                 // Clean up the stream being initialized if writing the headers fails and also
422                 // decrement the number of buffered streams.
423                 promise.addListener(new ChannelFutureListener() {
424                     @Override
425                     public void operationComplete(ChannelFuture channelFuture) {
426                         numBufferedStreams--;
427                         handleHeaderFuture(channelFuture, streamId);
428                     }
429                 });
430             } else {
431                 handleHeaderFuture(promise, streamId);
432             }
433         }
434     }
435 
436     private void writePushPromise(final ChannelHandlerContext ctx, Http2PushPromiseFrame pushPromiseFrame,
437                                   final ChannelPromise promise) {
438         if (isStreamIdValid(pushPromiseFrame.pushStream().id())) {
439             encoder().writePushPromise(ctx, pushPromiseFrame.stream().id(), pushPromiseFrame.pushStream().id(),
440                     pushPromiseFrame.http2Headers(), pushPromiseFrame.padding(), promise);
441         } else if (initializeNewStream(ctx, (DefaultHttp2FrameStream) pushPromiseFrame.pushStream(), promise)) {
442             final int streamId = pushPromiseFrame.stream().id();
443             encoder().writePushPromise(ctx, streamId, pushPromiseFrame.pushStream().id(),
444                     pushPromiseFrame.http2Headers(), pushPromiseFrame.padding(), promise);
445 
446             if (promise.isDone()) {
447                 handleHeaderFuture(promise, streamId);
448             } else {
449                 numBufferedStreams++;
450                 // Clean up the stream being initialized if writing the headers fails and also
451                 // decrement the number of buffered streams.
452                 promise.addListener(new ChannelFutureListener() {
453                     @Override
454                     public void operationComplete(ChannelFuture channelFuture) {
455                         numBufferedStreams--;
456                         handleHeaderFuture(channelFuture, streamId);
457                     }
458                 });
459             }
460         }
461     }
462 
463     private boolean initializeNewStream(ChannelHandlerContext ctx, DefaultHttp2FrameStream http2FrameStream,
464                                         ChannelPromise promise) {
465         final Http2Connection connection = connection();
466         final int streamId = connection.local().incrementAndGetNextStreamId();
467         if (streamId < 0) {
468             promise.setFailure(new Http2NoMoreStreamIdsException());
469 
470             // Simulate a GOAWAY being received due to stream exhaustion on this connection. We use the maximum
471             // valid stream ID for the current peer.
472             onHttp2Frame(ctx, new DefaultHttp2GoAwayFrame(connection.isServer() ? Integer.MAX_VALUE :
473                     Integer.MAX_VALUE - 1, NO_ERROR.code(),
474                     writeAscii(ctx.alloc(), "Stream IDs exhausted on local stream creation")));
475 
476             return false;
477         }
478         http2FrameStream.id = streamId;
479 
480         // Use a Map to store all pending streams as we may have multiple. This is needed as if we would store the
481         // stream in a field directly we may override the stored field before onStreamAdded(...) was called
482         // and so not correctly set the property for the buffered stream.
483         //
484         // See https://github.com/netty/netty/issues/8692
485         Object old = frameStreamToInitializeMap.put(streamId, http2FrameStream);
486 
487         // We should not re-use ids.
488         assert old == null;
489         return true;
490     }
491 
492     private void handleHeaderFuture(ChannelFuture channelFuture, int streamId) {
493         if (!channelFuture.isSuccess()) {
494             frameStreamToInitializeMap.remove(streamId);
495         }
496     }
497 
498     private void onStreamActive0(Http2Stream stream) {
499         if (stream.id() != Http2CodecUtil.HTTP_UPGRADE_STREAM_ID &&
500                 connection().local().isValidStreamId(stream.id())) {
501             return;
502         }
503 
504         DefaultHttp2FrameStream stream2 = newStream().setStreamAndProperty(streamKey, stream);
505         onHttp2StreamStateChanged(ctx, stream2);
506     }
507 
508     private final class ConnectionListener extends Http2ConnectionAdapter {
509         @Override
510         public void onStreamAdded(Http2Stream stream) {
511             DefaultHttp2FrameStream frameStream = frameStreamToInitializeMap.remove(stream.id());
512 
513             if (frameStream != null) {
514                 frameStream.setStreamAndProperty(streamKey, stream);
515             }
516         }
517 
518         @Override
519         public void onStreamActive(Http2Stream stream) {
520             onStreamActive0(stream);
521         }
522 
523         @Override
524         public void onStreamClosed(Http2Stream stream) {
525             onHttp2StreamStateChanged0(stream);
526         }
527 
528         @Override
529         public void onStreamHalfClosed(Http2Stream stream) {
530             onHttp2StreamStateChanged0(stream);
531         }
532 
533         private void onHttp2StreamStateChanged0(Http2Stream stream) {
534             DefaultHttp2FrameStream stream2 = stream.getProperty(streamKey);
535             if (stream2 != null) {
536                 onHttp2StreamStateChanged(ctx, stream2);
537             }
538         }
539     }
540 
541     @Override
542     protected void onConnectionError(
543             ChannelHandlerContext ctx, boolean outbound, Throwable cause, Http2Exception http2Ex) {
544         if (!outbound) {
545             // allow the user to handle it first in the pipeline, and then automatically clean up.
546             // If this is not desired behavior the user can override this method.
547             //
548             // We only forward non outbound errors as outbound errors will already be reflected by failing the promise.
549             ctx.fireExceptionCaught(cause);
550         }
551         super.onConnectionError(ctx, outbound, cause, http2Ex);
552     }
553 
554     /**
555      * Exceptions for unknown streams, that is streams that have no {@link Http2FrameStream} object attached
556      * are simply logged and replied to by sending a RST_STREAM frame.
557      */
558     @Override
559     protected final void onStreamError(ChannelHandlerContext ctx, boolean outbound, Throwable cause,
560                                        Http2Exception.StreamException streamException) {
561         int streamId = streamException.streamId();
562         Http2Stream connectionStream = connection().stream(streamId);
563         if (connectionStream == null) {
564             onHttp2UnknownStreamError(ctx, cause, streamException);
565             // Write a RST_STREAM
566             super.onStreamError(ctx, outbound, cause, streamException);
567             return;
568         }
569 
570         Http2FrameStream stream = connectionStream.getProperty(streamKey);
571         if (stream == null) {
572             LOG.warn("Stream exception thrown without stream object attached.", cause);
573             // Write a RST_STREAM
574             super.onStreamError(ctx, outbound, cause, streamException);
575             return;
576         }
577 
578         if (!outbound) {
579             // We only forward non outbound errors as outbound errors will already be reflected by failing the promise.
580             onHttp2FrameStreamException(ctx, new Http2FrameStreamException(stream, streamException.error(), cause));
581         }
582     }
583 
584     private static void onHttp2UnknownStreamError(@SuppressWarnings("unused") ChannelHandlerContext ctx,
585             Throwable cause, Http2Exception.StreamException streamException) {
586         // We log here for debugging purposes. This exception will be propagated to the upper layers through other ways:
587         // - fireExceptionCaught
588         // - fireUserEventTriggered(Http2ResetFrame), see Http2MultiplexHandler#channelRead(...)
589         // - by failing write promise
590         // Receiver of the error is responsible for correct handling of this exception.
591         LOG.log(DEBUG, "Stream exception thrown for unknown stream {}.", streamException.streamId(), cause);
592     }
593 
594     @Override
595     protected final boolean isGracefulShutdownComplete() {
596         return super.isGracefulShutdownComplete() && numBufferedStreams == 0;
597     }
598 
599     private final class FrameListener implements Http2FrameListener {
600 
601         @Override
602         public void onUnknownFrame(
603                 ChannelHandlerContext ctx, byte frameType, int streamId, Http2Flags flags, ByteBuf payload) {
604             if (streamId == 0) {
605                 // Ignore unknown frames on connection stream, for example: HTTP/2 GREASE testing
606                 return;
607             }
608             onHttp2Frame(ctx, new DefaultHttp2UnknownFrame(frameType, flags, payload)
609                     .stream(requireStream(streamId)).retain());
610         }
611 
612         @Override
613         public void onSettingsRead(ChannelHandlerContext ctx, Http2Settings settings) {
614             onHttp2Frame(ctx, new DefaultHttp2SettingsFrame(settings));
615         }
616 
617         @Override
618         public void onPingRead(ChannelHandlerContext ctx, long data) {
619             onHttp2Frame(ctx, new DefaultHttp2PingFrame(data, false));
620         }
621 
622         @Override
623         public void onPingAckRead(ChannelHandlerContext ctx, long data) {
624             onHttp2Frame(ctx, new DefaultHttp2PingFrame(data, true));
625         }
626 
627         @Override
628         public void onRstStreamRead(ChannelHandlerContext ctx, int streamId, long errorCode) {
629             onHttp2Frame(ctx, new DefaultHttp2ResetFrame(errorCode).stream(requireStream(streamId)));
630         }
631 
632         @Override
633         public void onWindowUpdateRead(ChannelHandlerContext ctx, int streamId, int windowSizeIncrement) {
634             if (streamId == 0) {
635                 // Ignore connection window updates.
636                 return;
637             }
638             onHttp2Frame(ctx, new DefaultHttp2WindowUpdateFrame(windowSizeIncrement).stream(requireStream(streamId)));
639         }
640 
641         @Override
642         public void onHeadersRead(ChannelHandlerContext ctx, int streamId,
643                                   Http2Headers headers, int streamDependency, short weight, boolean
644                                           exclusive, int padding, boolean endStream) {
645             onHeadersRead(ctx, streamId, headers, padding, endStream);
646         }
647 
648         @Override
649         public void onHeadersRead(ChannelHandlerContext ctx, int streamId, Http2Headers headers,
650                                   int padding, boolean endOfStream) {
651             onHttp2Frame(ctx, new DefaultHttp2HeadersFrame(headers, endOfStream, padding)
652                     .stream(requireStream(streamId)));
653         }
654 
655         @Override
656         public int onDataRead(ChannelHandlerContext ctx, int streamId, ByteBuf data, int padding,
657                               boolean endOfStream) {
658             onHttp2Frame(ctx, new DefaultHttp2DataFrame(data, endOfStream, padding)
659                     .stream(requireStream(streamId)).retain());
660             // We return the bytes in consumeBytes() once the stream channel consumed the bytes.
661             return 0;
662         }
663 
664         @Override
665         public void onGoAwayRead(ChannelHandlerContext ctx, int lastStreamId, long errorCode, ByteBuf debugData) {
666             onHttp2Frame(ctx, new DefaultHttp2GoAwayFrame(lastStreamId, errorCode, debugData).retain());
667         }
668 
669         @Override
670         public void onPriorityRead(ChannelHandlerContext ctx, int streamId, int streamDependency,
671                                    short weight, boolean exclusive) {
672 
673             Http2Stream stream = connection().stream(streamId);
674             if (stream == null) {
675                 // The stream was not opened yet, let's just ignore this for now.
676                 return;
677             }
678             onHttp2Frame(ctx, new DefaultHttp2PriorityFrame(streamDependency, weight, exclusive)
679                     .stream(requireStream(streamId)));
680         }
681 
682         @Override
683         public void onSettingsAckRead(ChannelHandlerContext ctx) {
684             onHttp2Frame(ctx, Http2SettingsAckFrame.INSTANCE);
685         }
686 
687         @Override
688         public void onPushPromiseRead(ChannelHandlerContext ctx, int streamId, int promisedStreamId,
689                                       Http2Headers headers, int padding) {
690             onHttp2Frame(ctx, new DefaultHttp2PushPromiseFrame(headers, padding, promisedStreamId)
691                     .pushStream(new DefaultHttp2FrameStream()
692                             .setStreamAndProperty(streamKey, connection().stream(promisedStreamId)))
693                     .stream(requireStream(streamId)));
694         }
695 
696         private Http2FrameStream requireStream(int streamId) {
697             Http2FrameStream stream = connection().stream(streamId).getProperty(streamKey);
698             if (stream == null) {
699                 throw new IllegalStateException("Stream object required for identifier: " + streamId);
700             }
701             return stream;
702         }
703     }
704 
705     private void onUpgradeEvent(ChannelHandlerContext ctx, UpgradeEvent evt) {
706         ctx.fireUserEventTriggered(evt);
707     }
708 
709     private void onHttp2StreamWritabilityChanged(ChannelHandlerContext ctx, DefaultHttp2FrameStream stream,
710                                                  @SuppressWarnings("unused") boolean writable) {
711         ctx.fireUserEventTriggered(stream.writabilityChanged);
712     }
713 
714     void onHttp2StreamStateChanged(ChannelHandlerContext ctx, DefaultHttp2FrameStream stream) {
715         ctx.fireUserEventTriggered(stream.stateChanged);
716     }
717 
718     void onHttp2Frame(ChannelHandlerContext ctx, Http2Frame frame) {
719         ctx.fireChannelRead(frame);
720     }
721 
722     void onHttp2FrameStreamException(ChannelHandlerContext ctx, Http2FrameStreamException cause) {
723         ctx.fireExceptionCaught(cause);
724     }
725 
726     private final class Http2RemoteFlowControllerListener implements Http2RemoteFlowController.Listener {
727         @Override
728         public void writabilityChanged(Http2Stream stream) {
729             DefaultHttp2FrameStream frameStream = stream.getProperty(streamKey);
730             if (frameStream == null) {
731                 return;
732             }
733             onHttp2StreamWritabilityChanged(
734                     ctx, frameStream, connection().remote().flowController().isWritable(stream));
735         }
736     }
737 
738     /**
739      * {@link Http2FrameStream} implementation.
740      */
741     // TODO(buchgr): Merge Http2FrameStream and Http2Stream.
742     static class DefaultHttp2FrameStream implements Http2FrameStream {
743 
744         private volatile int id = -1;
745         private volatile Http2Stream stream;
746 
747         final Http2FrameStreamEvent stateChanged = Http2FrameStreamEvent.stateChanged(this);
748         final Http2FrameStreamEvent writabilityChanged = Http2FrameStreamEvent.writabilityChanged(this);
749 
750         Channel attachment;
751 
752         DefaultHttp2FrameStream setStreamAndProperty(PropertyKey streamKey, Http2Stream stream) {
753             assert id == -1 || stream.id() == id;
754             this.stream = stream;
755             stream.setProperty(streamKey, this);
756             return this;
757         }
758 
759         @Override
760         public int id() {
761             Http2Stream stream = this.stream;
762             return stream == null ? id : stream.id();
763         }
764 
765         @Override
766         public State state() {
767             Http2Stream stream = this.stream;
768             return stream == null ? State.IDLE : stream.state();
769         }
770 
771         @Override
772         public String toString() {
773             return String.valueOf(id());
774         }
775     }
776 }