查看本类的 API文档回源码主页即时通讯网 - 即时通讯开发者社区!
1   /*
2    * Copyright 2014 The Netty Project
3    *
4    * The Netty Project licenses this file to you under the Apache License, version 2.0 (the
5    * "License"); you may not use this file except in compliance with the License. You may obtain a
6    * copy of the License at:
7    *
8    * https://www.apache.org/licenses/LICENSE-2.0
9    *
10   * Unless required by applicable law or agreed to in writing, software distributed under the License
11   * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
12   * or implied. See the License for the specific language governing permissions and limitations under
13   * the License.
14   */
15  package io.netty.handler.codec.http2;
16  
17  import io.netty.buffer.ByteBuf;
18  import io.netty.channel.ChannelHandlerContext;
19  import io.netty.handler.codec.http.HttpHeaderNames;
20  import io.netty.handler.codec.http.HttpStatusClass;
21  import io.netty.handler.codec.http.HttpUtil;
22  import io.netty.handler.codec.http2.Http2Connection.Endpoint;
23  import io.netty.util.internal.UnstableApi;
24  import io.netty.util.internal.logging.InternalLogger;
25  import io.netty.util.internal.logging.InternalLoggerFactory;
26  
27  import java.util.Iterator;
28  import java.util.List;
29  import java.util.Map.Entry;
30  
31  import static io.netty.handler.codec.http.HttpStatusClass.INFORMATIONAL;
32  import static io.netty.handler.codec.http2.Http2CodecUtil.DEFAULT_PRIORITY_WEIGHT;
33  import static io.netty.handler.codec.http2.Http2Error.INTERNAL_ERROR;
34  import static io.netty.handler.codec.http2.Http2Error.PROTOCOL_ERROR;
35  import static io.netty.handler.codec.http2.Http2Error.STREAM_CLOSED;
36  import static io.netty.handler.codec.http2.Http2Exception.connectionError;
37  import static io.netty.handler.codec.http2.Http2Exception.streamError;
38  import static io.netty.handler.codec.http2.Http2PromisedRequestVerifier.ALWAYS_VERIFY;
39  import static io.netty.handler.codec.http2.Http2Stream.State.CLOSED;
40  import static io.netty.handler.codec.http2.Http2Stream.State.HALF_CLOSED_REMOTE;
41  import static io.netty.util.internal.ObjectUtil.checkNotNull;
42  import static java.lang.Integer.MAX_VALUE;
43  import static java.lang.Math.min;
44  
45  /**
46   * Provides the default implementation for processing inbound frame events and delegates to a
47   * {@link Http2FrameListener}
48   * <p>
49   * This class will read HTTP/2 frames and delegate the events to a {@link Http2FrameListener}
50   * <p>
 * This class enforces inbound flow control functionality through
 * {@link Http2LocalFlowController}
53   */
54  @UnstableApi
55  public class DefaultHttp2ConnectionDecoder implements Http2ConnectionDecoder {
56      private static final InternalLogger logger = InternalLoggerFactory.getInstance(DefaultHttp2ConnectionDecoder.class);
57      private Http2FrameListener internalFrameListener = new PrefaceFrameListener();
58      private final Http2Connection connection;
59      private Http2LifecycleManager lifecycleManager;
60      private final Http2ConnectionEncoder encoder;
61      private final Http2FrameReader frameReader;
62      private Http2FrameListener listener;
63      private final Http2PromisedRequestVerifier requestVerifier;
64      private final Http2SettingsReceivedConsumer settingsReceivedConsumer;
65      private final boolean autoAckPing;
66      private final Http2Connection.PropertyKey contentLengthKey;
67      private final boolean validateHeaders;
68  
69      public DefaultHttp2ConnectionDecoder(Http2Connection connection,
70                                           Http2ConnectionEncoder encoder,
71                                           Http2FrameReader frameReader) {
72          this(connection, encoder, frameReader, ALWAYS_VERIFY);
73      }
74  
75      public DefaultHttp2ConnectionDecoder(Http2Connection connection,
76                                           Http2ConnectionEncoder encoder,
77                                           Http2FrameReader frameReader,
78                                           Http2PromisedRequestVerifier requestVerifier) {
79          this(connection, encoder, frameReader, requestVerifier, true);
80      }
81  
82      /**
83       * Create a new instance.
84       * @param connection The {@link Http2Connection} associated with this decoder.
85       * @param encoder The {@link Http2ConnectionEncoder} associated with this decoder.
86       * @param frameReader Responsible for reading/parsing the raw frames. As opposed to this object which applies
87       *                    h2 semantics on top of the frames.
88       * @param requestVerifier Determines if push promised streams are valid.
89       * @param autoAckSettings {@code false} to disable automatically applying and sending settings acknowledge frame.
90       *  The {@code Http2ConnectionEncoder} is expected to be an instance of {@link Http2SettingsReceivedConsumer} and
91       *  will apply the earliest received but not yet ACKed SETTINGS when writing the SETTINGS ACKs.
92       * {@code true} to enable automatically applying and sending settings acknowledge frame.
93       */
94      public DefaultHttp2ConnectionDecoder(Http2Connection connection,
95                                           Http2ConnectionEncoder encoder,
96                                           Http2FrameReader frameReader,
97                                           Http2PromisedRequestVerifier requestVerifier,
98                                           boolean autoAckSettings) {
99          this(connection, encoder, frameReader, requestVerifier, autoAckSettings, true);
100     }
101 
102     @Deprecated
103     public DefaultHttp2ConnectionDecoder(Http2Connection connection,
104                                          Http2ConnectionEncoder encoder,
105                                          Http2FrameReader frameReader,
106                                          Http2PromisedRequestVerifier requestVerifier,
107                                          boolean autoAckSettings,
108                                          boolean autoAckPing) {
109         this(connection, encoder, frameReader, requestVerifier, autoAckSettings, true, true);
110     }
111 
112     /**
113      * Create a new instance.
114      * @param connection The {@link Http2Connection} associated with this decoder.
115      * @param encoder The {@link Http2ConnectionEncoder} associated with this decoder.
116      * @param frameReader Responsible for reading/parsing the raw frames. As opposed to this object which applies
117      *                    h2 semantics on top of the frames.
118      * @param requestVerifier Determines if push promised streams are valid.
119      * @param autoAckSettings {@code false} to disable automatically applying and sending settings acknowledge frame.
120      *                        The {@code Http2ConnectionEncoder} is expected to be an instance of
121      *                        {@link Http2SettingsReceivedConsumer} and will apply the earliest received but not yet
122      *                        ACKed SETTINGS when writing the SETTINGS ACKs. {@code true} to enable automatically
123      *                        applying and sending settings acknowledge frame.
124      * @param autoAckPing {@code false} to disable automatically sending ping acknowledge frame. {@code true} to enable
125      *                    automatically sending ping ack frame.
126      */
127     public DefaultHttp2ConnectionDecoder(Http2Connection connection,
128                                          Http2ConnectionEncoder encoder,
129                                          Http2FrameReader frameReader,
130                                          Http2PromisedRequestVerifier requestVerifier,
131                                          boolean autoAckSettings,
132                                          boolean autoAckPing,
133                                          boolean validateHeaders) {
134         this.validateHeaders = validateHeaders;
135         this.autoAckPing = autoAckPing;
136         if (autoAckSettings) {
137             settingsReceivedConsumer = null;
138         } else {
139             if (!(encoder instanceof Http2SettingsReceivedConsumer)) {
140                 throw new IllegalArgumentException("disabling autoAckSettings requires the encoder to be a " +
141                         Http2SettingsReceivedConsumer.class);
142             }
143             settingsReceivedConsumer = (Http2SettingsReceivedConsumer) encoder;
144         }
145         this.connection = checkNotNull(connection, "connection");
146         contentLengthKey = this.connection.newKey();
147         this.frameReader = checkNotNull(frameReader, "frameReader");
148         this.encoder = checkNotNull(encoder, "encoder");
149         this.requestVerifier = checkNotNull(requestVerifier, "requestVerifier");
150         if (connection.local().flowController() == null) {
151             connection.local().flowController(new DefaultHttp2LocalFlowController(connection));
152         }
153         connection.local().flowController().frameWriter(encoder.frameWriter());
154     }
155 
156     @Override
157     public void lifecycleManager(Http2LifecycleManager lifecycleManager) {
158         this.lifecycleManager = checkNotNull(lifecycleManager, "lifecycleManager");
159     }
160 
161     @Override
162     public Http2Connection connection() {
163         return connection;
164     }
165 
166     @Override
167     public final Http2LocalFlowController flowController() {
168         return connection.local().flowController();
169     }
170 
171     @Override
172     public void frameListener(Http2FrameListener listener) {
173         this.listener = checkNotNull(listener, "listener");
174     }
175 
176     @Override
177     public Http2FrameListener frameListener() {
178         return listener;
179     }
180 
181     @Override
182     public boolean prefaceReceived() {
183         return FrameReadListener.class == internalFrameListener.getClass();
184     }
185 
186     @Override
187     public void decodeFrame(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Http2Exception {
188         frameReader.readFrame(ctx, in, internalFrameListener);
189     }
190 
191     @Override
192     public Http2Settings localSettings() {
193         Http2Settings settings = new Http2Settings();
194         Http2FrameReader.Configuration config = frameReader.configuration();
195         Http2HeadersDecoder.Configuration headersConfig = config.headersConfiguration();
196         Http2FrameSizePolicy frameSizePolicy = config.frameSizePolicy();
197         settings.initialWindowSize(flowController().initialWindowSize());
198         settings.maxConcurrentStreams(connection.remote().maxActiveStreams());
199         settings.headerTableSize(headersConfig.maxHeaderTableSize());
200         settings.maxFrameSize(frameSizePolicy.maxFrameSize());
201         settings.maxHeaderListSize(headersConfig.maxHeaderListSize());
202         if (!connection.isServer()) {
203             // Only set the pushEnabled flag if this is a client endpoint.
204             settings.pushEnabled(connection.local().allowPushTo());
205         }
206         return settings;
207     }
208 
209     @Override
210     public void close() {
211         frameReader.close();
212     }
213 
214     /**
215      * Calculate the threshold in bytes which should trigger a {@code GO_AWAY} if a set of headers exceeds this amount.
216      * @param maxHeaderListSize
217      *      <a href="https://tools.ietf.org/html/rfc7540#section-6.5.2">SETTINGS_MAX_HEADER_LIST_SIZE</a> for the local
218      *      endpoint.
219      * @return the threshold in bytes which should trigger a {@code GO_AWAY} if a set of headers exceeds this amount.
220      */
221     protected long calculateMaxHeaderListSizeGoAway(long maxHeaderListSize) {
222         return Http2CodecUtil.calculateMaxHeaderListSizeGoAway(maxHeaderListSize);
223     }
224 
225     private int unconsumedBytes(Http2Stream stream) {
226         return flowController().unconsumedBytes(stream);
227     }
228 
229     void onGoAwayRead0(ChannelHandlerContext ctx, int lastStreamId, long errorCode, ByteBuf debugData)
230             throws Http2Exception {
231         listener.onGoAwayRead(ctx, lastStreamId, errorCode, debugData);
232         connection.goAwayReceived(lastStreamId, errorCode, debugData);
233     }
234 
235     void onUnknownFrame0(ChannelHandlerContext ctx, byte frameType, int streamId, Http2Flags flags,
236             ByteBuf payload) throws Http2Exception {
237         listener.onUnknownFrame(ctx, frameType, streamId, flags, payload);
238     }
239 
240     // See https://tools.ietf.org/html/rfc7540#section-8.1.2.6
241     private void verifyContentLength(Http2Stream stream, int data, boolean isEnd) throws Http2Exception {
242         ContentLength contentLength = stream.getProperty(contentLengthKey);
243         if (contentLength != null) {
244             try {
245                 contentLength.increaseReceivedBytes(connection.isServer(), stream.id(), data, isEnd);
246             } finally {
247                 if (isEnd) {
248                     stream.removeProperty(contentLengthKey);
249                 }
250             }
251         }
252     }
253 
254     /**
255      * Handles all inbound frames from the network.
256      */
257     private final class FrameReadListener implements Http2FrameListener {
258         @Override
259         public int onDataRead(final ChannelHandlerContext ctx, int streamId, ByteBuf data, int padding,
260                               boolean endOfStream) throws Http2Exception {
261             Http2Stream stream = connection.stream(streamId);
262             Http2LocalFlowController flowController = flowController();
263             int readable = data.readableBytes();
264             int bytesToReturn = readable + padding;
265 
266             final boolean shouldIgnore;
267             try {
268                 shouldIgnore = shouldIgnoreHeadersOrDataFrame(ctx, streamId, stream, endOfStream, "DATA");
269             } catch (Http2Exception e) {
270                 // Ignoring this frame. We still need to count the frame towards the connection flow control
271                 // window, but we immediately mark all bytes as consumed.
272                 flowController.receiveFlowControlledFrame(stream, data, padding, endOfStream);
273                 flowController.consumeBytes(stream, bytesToReturn);
274                 throw e;
275             } catch (Throwable t) {
276                 throw connectionError(INTERNAL_ERROR, t, "Unhandled error on data stream id %d", streamId);
277             }
278 
279             if (shouldIgnore) {
280                 // Ignoring this frame. We still need to count the frame towards the connection flow control
281                 // window, but we immediately mark all bytes as consumed.
282                 flowController.receiveFlowControlledFrame(stream, data, padding, endOfStream);
283                 flowController.consumeBytes(stream, bytesToReturn);
284 
285                 // Verify that the stream may have existed after we apply flow control.
286                 verifyStreamMayHaveExisted(streamId, endOfStream, "DATA");
287 
288                 // All bytes have been consumed.
289                 return bytesToReturn;
290             }
291             Http2Exception error = null;
292             switch (stream.state()) {
293                 case OPEN:
294                 case HALF_CLOSED_LOCAL:
295                     break;
296                 case HALF_CLOSED_REMOTE:
297                 case CLOSED:
298                     error = streamError(stream.id(), STREAM_CLOSED, "Stream %d in unexpected state: %s",
299                         stream.id(), stream.state());
300                     break;
301                 default:
302                     error = streamError(stream.id(), PROTOCOL_ERROR,
303                         "Stream %d in unexpected state: %s", stream.id(), stream.state());
304                     break;
305             }
306 
307             int unconsumedBytes = unconsumedBytes(stream);
308             try {
309                 flowController.receiveFlowControlledFrame(stream, data, padding, endOfStream);
310                 // Update the unconsumed bytes after flow control is applied.
311                 unconsumedBytes = unconsumedBytes(stream);
312 
313                 // If the stream is in an invalid state to receive the frame, throw the error.
314                 if (error != null) {
315                     throw error;
316                 }
317 
318                 verifyContentLength(stream, readable, endOfStream);
319 
320                 // Call back the application and retrieve the number of bytes that have been
321                 // immediately processed.
322                 bytesToReturn = listener.onDataRead(ctx, streamId, data, padding, endOfStream);
323 
324                 if (endOfStream) {
325                     lifecycleManager.closeStreamRemote(stream, ctx.newSucceededFuture());
326                 }
327 
328                 return bytesToReturn;
329             } catch (Http2Exception e) {
330                 // If an exception happened during delivery, the listener may have returned part
331                 // of the bytes before the error occurred. If that's the case, subtract that from
332                 // the total processed bytes so that we don't return too many bytes.
333                 int delta = unconsumedBytes - unconsumedBytes(stream);
334                 bytesToReturn -= delta;
335                 throw e;
336             } catch (RuntimeException e) {
337                 // If an exception happened during delivery, the listener may have returned part
338                 // of the bytes before the error occurred. If that's the case, subtract that from
339                 // the total processed bytes so that we don't return too many bytes.
340                 int delta = unconsumedBytes - unconsumedBytes(stream);
341                 bytesToReturn -= delta;
342                 throw e;
343             } finally {
344                 // If appropriate, return the processed bytes to the flow controller.
345                 flowController.consumeBytes(stream, bytesToReturn);
346             }
347         }
348 
349         @Override
350         public void onHeadersRead(ChannelHandlerContext ctx, int streamId, Http2Headers headers, int padding,
351                 boolean endOfStream) throws Http2Exception {
352             onHeadersRead(ctx, streamId, headers, 0, DEFAULT_PRIORITY_WEIGHT, false, padding, endOfStream);
353         }
354 
355         @Override
356         public void onHeadersRead(ChannelHandlerContext ctx, int streamId, Http2Headers headers, int streamDependency,
357                 short weight, boolean exclusive, int padding, boolean endOfStream) throws Http2Exception {
358             Http2Stream stream = connection.stream(streamId);
359             boolean allowHalfClosedRemote = false;
360             boolean isTrailers = false;
361             if (stream == null && !connection.streamMayHaveExisted(streamId)) {
362                 stream = connection.remote().createStream(streamId, endOfStream);
363                 // Allow the state to be HALF_CLOSE_REMOTE if we're creating it in that state.
364                 allowHalfClosedRemote = stream.state() == HALF_CLOSED_REMOTE;
365             } else if (stream != null) {
366                 isTrailers = stream.isHeadersReceived();
367             }
368 
369             if (shouldIgnoreHeadersOrDataFrame(ctx, streamId, stream, endOfStream, "HEADERS")) {
370                 return;
371             }
372 
373             boolean isInformational = !connection.isServer() &&
374                     HttpStatusClass.valueOf(headers.status()) == INFORMATIONAL;
375             if ((isInformational || !endOfStream) && stream.isHeadersReceived() || stream.isTrailersReceived()) {
376                 throw streamError(streamId, PROTOCOL_ERROR,
377                                   "Stream %d received too many headers EOS: %s state: %s",
378                                   streamId, endOfStream, stream.state());
379             }
380 
381             switch (stream.state()) {
382                 case RESERVED_REMOTE:
383                     stream.open(endOfStream);
384                     break;
385                 case OPEN:
386                 case HALF_CLOSED_LOCAL:
387                     // Allowed to receive headers in these states.
388                     break;
389                 case HALF_CLOSED_REMOTE:
390                     if (!allowHalfClosedRemote) {
391                         throw streamError(stream.id(), STREAM_CLOSED, "Stream %d in unexpected state: %s",
392                                 stream.id(), stream.state());
393                     }
394                     break;
395                 case CLOSED:
396                     throw streamError(stream.id(), STREAM_CLOSED, "Stream %d in unexpected state: %s",
397                             stream.id(), stream.state());
398                 default:
399                     // Connection error.
400                     throw connectionError(PROTOCOL_ERROR, "Stream %d in unexpected state: %s", stream.id(),
401                             stream.state());
402             }
403 
404             if (!isTrailers) {
405                 // extract the content-length header
406                 List<? extends CharSequence> contentLength = headers.getAll(HttpHeaderNames.CONTENT_LENGTH);
407                 if (contentLength != null && !contentLength.isEmpty()) {
408                     try {
409                         long cLength = HttpUtil.normalizeAndGetContentLength(contentLength, false, true);
410                         if (cLength != -1) {
411                             headers.setLong(HttpHeaderNames.CONTENT_LENGTH, cLength);
412                             stream.setProperty(contentLengthKey, new ContentLength(cLength));
413                         }
414                     } catch (IllegalArgumentException e) {
415                         throw streamError(stream.id(), PROTOCOL_ERROR, e,
416                                 "Multiple content-length headers received");
417                     }
418                 }
419                 // Use size() instead of isEmpty() for backward compatibility with grpc-java prior to 1.59.1,
420                 // see https://github.com/grpc/grpc-java/issues/10665
421             } else if (validateHeaders && headers.size() > 0) {
422                 // Need to check trailers don't contain pseudo headers. According to RFC 9113
423                 // Trailers MUST NOT include pseudo-header fields (Section 8.3).
424                 for (Iterator<Entry<CharSequence, CharSequence>> iterator =
425                     headers.iterator(); iterator.hasNext();) {
426                     CharSequence name = iterator.next().getKey();
427                     if (Http2Headers.PseudoHeaderName.hasPseudoHeaderFormat(name)) {
428                         throw streamError(stream.id(), PROTOCOL_ERROR,
429                                 "Found invalid Pseudo-Header in trailers: %s", name);
430                     }
431                 }
432             }
433 
434             stream.headersReceived(isInformational);
435             verifyContentLength(stream, 0, endOfStream);
436             encoder.flowController().updateDependencyTree(streamId, streamDependency, weight, exclusive);
437             listener.onHeadersRead(ctx, streamId, headers, streamDependency,
438                     weight, exclusive, padding, endOfStream);
439             // If the headers completes this stream, close it.
440             if (endOfStream) {
441                 lifecycleManager.closeStreamRemote(stream, ctx.newSucceededFuture());
442             }
443         }
444 
445         @Override
446         public void onPriorityRead(ChannelHandlerContext ctx, int streamId, int streamDependency, short weight,
447                 boolean exclusive) throws Http2Exception {
448             encoder.flowController().updateDependencyTree(streamId, streamDependency, weight, exclusive);
449 
450             listener.onPriorityRead(ctx, streamId, streamDependency, weight, exclusive);
451         }
452 
453         @Override
454         public void onRstStreamRead(ChannelHandlerContext ctx, int streamId, long errorCode) throws Http2Exception {
455             Http2Stream stream = connection.stream(streamId);
456             if (stream == null) {
457                 verifyStreamMayHaveExisted(streamId, false, "RST_STREAM");
458                 return;
459             }
460 
461             switch(stream.state()) {
462             case IDLE:
463                 throw connectionError(PROTOCOL_ERROR, "RST_STREAM received for IDLE stream %d", streamId);
464             case CLOSED:
465                 return; // RST_STREAM frames must be ignored for closed streams.
466             default:
467                 break;
468             }
469 
470             listener.onRstStreamRead(ctx, streamId, errorCode);
471 
472             lifecycleManager.closeStream(stream, ctx.newSucceededFuture());
473         }
474 
475         @Override
476         public void onSettingsAckRead(ChannelHandlerContext ctx) throws Http2Exception {
477             // Apply oldest outstanding local settings here. This is a synchronization point between endpoints.
478             Http2Settings settings = encoder.pollSentSettings();
479 
480             if (settings != null) {
481                 applyLocalSettings(settings);
482             }
483 
484             listener.onSettingsAckRead(ctx);
485         }
486 
487         /**
488          * Applies settings sent from the local endpoint.
489          * <p>
490          * This method is only called after the local settings have been acknowledged from the remote endpoint.
491          */
492         private void applyLocalSettings(Http2Settings settings) throws Http2Exception {
493             Boolean pushEnabled = settings.pushEnabled();
494             final Http2FrameReader.Configuration config = frameReader.configuration();
495             final Http2HeadersDecoder.Configuration headerConfig = config.headersConfiguration();
496             final Http2FrameSizePolicy frameSizePolicy = config.frameSizePolicy();
497             if (pushEnabled != null) {
498                 if (connection.isServer()) {
499                     throw connectionError(PROTOCOL_ERROR, "Server sending SETTINGS frame with ENABLE_PUSH specified");
500                 }
501                 connection.local().allowPushTo(pushEnabled);
502             }
503 
504             Long maxConcurrentStreams = settings.maxConcurrentStreams();
505             if (maxConcurrentStreams != null) {
506                 connection.remote().maxActiveStreams((int) min(maxConcurrentStreams, MAX_VALUE));
507             }
508 
509             Long headerTableSize = settings.headerTableSize();
510             if (headerTableSize != null) {
511                 headerConfig.maxHeaderTableSize(headerTableSize);
512             }
513 
514             Long maxHeaderListSize = settings.maxHeaderListSize();
515             if (maxHeaderListSize != null) {
516                 headerConfig.maxHeaderListSize(maxHeaderListSize, calculateMaxHeaderListSizeGoAway(maxHeaderListSize));
517             }
518 
519             Integer maxFrameSize = settings.maxFrameSize();
520             if (maxFrameSize != null) {
521                 frameSizePolicy.maxFrameSize(maxFrameSize);
522             }
523 
524             Integer initialWindowSize = settings.initialWindowSize();
525             if (initialWindowSize != null) {
526                 flowController().initialWindowSize(initialWindowSize);
527             }
528         }
529 
530         @Override
531         public void onSettingsRead(final ChannelHandlerContext ctx, Http2Settings settings) throws Http2Exception {
532             if (settingsReceivedConsumer == null) {
533                 // Acknowledge receipt of the settings. We should do this before we process the settings to ensure our
534                 // remote peer applies these settings before any subsequent frames that we may send which depend upon
535                 // these new settings. See https://github.com/netty/netty/issues/6520.
536                 encoder.writeSettingsAck(ctx, ctx.newPromise());
537 
538                 encoder.remoteSettings(settings);
539             } else {
540                 settingsReceivedConsumer.consumeReceivedSettings(settings);
541             }
542 
543             listener.onSettingsRead(ctx, settings);
544         }
545 
546         @Override
547         public void onPingRead(ChannelHandlerContext ctx, long data) throws Http2Exception {
548             if (autoAckPing) {
549                 // Send an ack back to the remote client.
550                 encoder.writePing(ctx, true, data, ctx.newPromise());
551             }
552             listener.onPingRead(ctx, data);
553         }
554 
555         @Override
556         public void onPingAckRead(ChannelHandlerContext ctx, long data) throws Http2Exception {
557             listener.onPingAckRead(ctx, data);
558         }
559 
560         @Override
561         public void onPushPromiseRead(ChannelHandlerContext ctx, int streamId, int promisedStreamId,
562                 Http2Headers headers, int padding) throws Http2Exception {
563             // A client cannot push.
564             if (connection().isServer()) {
565                 throw connectionError(PROTOCOL_ERROR, "A client cannot push.");
566             }
567 
568             Http2Stream parentStream = connection.stream(streamId);
569 
570             if (shouldIgnoreHeadersOrDataFrame(ctx, streamId, parentStream, false, "PUSH_PROMISE")) {
571                 return;
572             }
573 
574             switch (parentStream.state()) {
575               case OPEN:
576               case HALF_CLOSED_LOCAL:
577                   // Allowed to receive push promise in these states.
578                   break;
579               default:
580                   // Connection error.
581                   throw connectionError(PROTOCOL_ERROR,
582                       "Stream %d in unexpected state for receiving push promise: %s",
583                       parentStream.id(), parentStream.state());
584             }
585 
586             if (!requestVerifier.isAuthoritative(ctx, headers)) {
587                 throw streamError(promisedStreamId, PROTOCOL_ERROR,
588                         "Promised request on stream %d for promised stream %d is not authoritative",
589                         streamId, promisedStreamId);
590             }
591             if (!requestVerifier.isCacheable(headers)) {
592                 throw streamError(promisedStreamId, PROTOCOL_ERROR,
593                         "Promised request on stream %d for promised stream %d is not known to be cacheable",
594                         streamId, promisedStreamId);
595             }
596             if (!requestVerifier.isSafe(headers)) {
597                 throw streamError(promisedStreamId, PROTOCOL_ERROR,
598                         "Promised request on stream %d for promised stream %d is not known to be safe",
599                         streamId, promisedStreamId);
600             }
601 
602             // Reserve the push stream based with a priority based on the current stream's priority.
603             connection.remote().reservePushStream(promisedStreamId, parentStream);
604 
605             listener.onPushPromiseRead(ctx, streamId, promisedStreamId, headers, padding);
606         }
607 
        /**
         * Handles an inbound {@code GOAWAY} frame by passing all arguments, unchanged, to
         * {@code onGoAwayRead0}. No stream lookup or state check is performed here.
         */
        @Override
        public void onGoAwayRead(ChannelHandlerContext ctx, int lastStreamId, long errorCode, ByteBuf debugData)
                throws Http2Exception {
            onGoAwayRead0(ctx, lastStreamId, errorCode, debugData);
        }
613 
614         @Override
615         public void onWindowUpdateRead(ChannelHandlerContext ctx, int streamId, int windowSizeIncrement)
616                 throws Http2Exception {
617             Http2Stream stream = connection.stream(streamId);
618             if (stream == null || stream.state() == CLOSED || streamCreatedAfterGoAwaySent(streamId)) {
619                 // Ignore this frame.
620                 verifyStreamMayHaveExisted(streamId, false, "WINDOW_UPDATE");
621                 return;
622             }
623 
624             // Update the outbound flow control window.
625             encoder.flowController().incrementWindowSize(stream, windowSizeIncrement);
626 
627             listener.onWindowUpdateRead(ctx, streamId, windowSizeIncrement);
628         }
629 
        /**
         * Handles a frame of an unknown type by passing all arguments, unchanged, to {@code onUnknownFrame0}.
         */
        @Override
        public void onUnknownFrame(ChannelHandlerContext ctx, byte frameType, int streamId, Http2Flags flags,
                ByteBuf payload) throws Http2Exception {
            onUnknownFrame0(ctx, frameType, streamId, flags, payload);
        }
635 
636         /**
637          * Helper method to determine if a frame that has the semantics of headers or data should be ignored for the
638          * {@code stream} (which may be {@code null}) associated with {@code streamId}.
639          */
640         private boolean shouldIgnoreHeadersOrDataFrame(ChannelHandlerContext ctx, int streamId, Http2Stream stream,
641                 boolean endOfStream, String frameName) throws Http2Exception {
642             if (stream == null) {
643                 if (streamCreatedAfterGoAwaySent(streamId)) {
644                     logger.info("{} ignoring {} frame for stream {}. Stream sent after GOAWAY sent",
645                             ctx.channel(), frameName, streamId);
646                     return true;
647                 }
648 
649                 // Make sure it's not an out-of-order frame, like a rogue DATA frame, for a stream that could
650                 // never have existed.
651                 verifyStreamMayHaveExisted(streamId, endOfStream, frameName);
652 
653                 // Its possible that this frame would result in stream ID out of order creation (PROTOCOL ERROR) and its
654                 // also possible that this frame is received on a CLOSED stream (STREAM_CLOSED after a RST_STREAM is
655                 // sent). We don't have enough information to know for sure, so we choose the lesser of the two errors.
656                 throw streamError(streamId, STREAM_CLOSED, "Received %s frame for an unknown stream %d",
657                                   frameName, streamId);
658             }
659             if (stream.isResetSent() || streamCreatedAfterGoAwaySent(streamId)) {
660                 // If we have sent a reset stream it is assumed the stream will be closed after the write completes.
661                 // If we have not sent a reset, but the stream was created after a GoAway this is not supported by
662                 // DefaultHttp2Connection and if a custom Http2Connection is used it is assumed the lifetime is managed
663                 // elsewhere so we don't close the stream or otherwise modify the stream's state.
664 
665                 if (logger.isInfoEnabled()) {
666                     logger.info("{} ignoring {} frame for stream {}", ctx.channel(), frameName,
667                             stream.isResetSent() ? "RST_STREAM sent." :
668                                     "Stream created after GOAWAY sent. Last known stream by peer " +
669                                      connection.remote().lastStreamKnownByPeer());
670                 }
671 
672                 return true;
673             }
674             return false;
675         }
676 
677         /**
678          * Helper method for determining whether or not to ignore inbound frames. A stream is considered to be created
679          * after a {@code GOAWAY} is sent if the following conditions hold:
680          * <p/>
681          * <ul>
682          *     <li>A {@code GOAWAY} must have been sent by the local endpoint</li>
683          *     <li>The {@code streamId} must identify a legitimate stream id for the remote endpoint to be creating</li>
684          *     <li>{@code streamId} is greater than the Last Known Stream ID which was sent by the local endpoint
685          *     in the last {@code GOAWAY} frame</li>
686          * </ul>
687          * <p/>
688          */
689         private boolean streamCreatedAfterGoAwaySent(int streamId) {
690             Endpoint<?> remote = connection.remote();
691             return connection.goAwaySent() && remote.isValidStreamId(streamId) &&
692                     streamId > remote.lastStreamKnownByPeer();
693         }
694 
695         private void verifyStreamMayHaveExisted(int streamId, boolean endOfStream, String frameName)
696                 throws Http2Exception {
697             if (!connection.streamMayHaveExisted(streamId)) {
698                 throw connectionError(PROTOCOL_ERROR,
699                         "Stream %d does not exist for inbound frame %s, endOfStream = %b",
700                         streamId, frameName, endOfStream);
701             }
702         }
703     }
704 
705     private final class PrefaceFrameListener implements Http2FrameListener {
706         /**
707          * Verifies that the HTTP/2 connection preface has been received from the remote endpoint.
708          * It is possible that the current call to
709          * {@link Http2FrameReader#readFrame(ChannelHandlerContext, ByteBuf, Http2FrameListener)} will have multiple
710          * frames to dispatch. So it may be OK for this class to get legitimate frames for the first readFrame.
711          */
712         private void verifyPrefaceReceived() throws Http2Exception {
713             if (!prefaceReceived()) {
714                 throw connectionError(PROTOCOL_ERROR, "Received non-SETTINGS as first frame.");
715             }
716         }
717 
718         @Override
719         public int onDataRead(ChannelHandlerContext ctx, int streamId, ByteBuf data, int padding, boolean endOfStream)
720                 throws Http2Exception {
721             verifyPrefaceReceived();
722             return internalFrameListener.onDataRead(ctx, streamId, data, padding, endOfStream);
723         }
724 
725         @Override
726         public void onHeadersRead(ChannelHandlerContext ctx, int streamId, Http2Headers headers, int padding,
727                 boolean endOfStream) throws Http2Exception {
728             verifyPrefaceReceived();
729             internalFrameListener.onHeadersRead(ctx, streamId, headers, padding, endOfStream);
730         }
731 
732         @Override
733         public void onHeadersRead(ChannelHandlerContext ctx, int streamId, Http2Headers headers, int streamDependency,
734                 short weight, boolean exclusive, int padding, boolean endOfStream) throws Http2Exception {
735             verifyPrefaceReceived();
736             internalFrameListener.onHeadersRead(ctx, streamId, headers, streamDependency, weight,
737                     exclusive, padding, endOfStream);
738         }
739 
740         @Override
741         public void onPriorityRead(ChannelHandlerContext ctx, int streamId, int streamDependency, short weight,
742                 boolean exclusive) throws Http2Exception {
743             verifyPrefaceReceived();
744             internalFrameListener.onPriorityRead(ctx, streamId, streamDependency, weight, exclusive);
745         }
746 
747         @Override
748         public void onRstStreamRead(ChannelHandlerContext ctx, int streamId, long errorCode) throws Http2Exception {
749             verifyPrefaceReceived();
750             internalFrameListener.onRstStreamRead(ctx, streamId, errorCode);
751         }
752 
753         @Override
754         public void onSettingsAckRead(ChannelHandlerContext ctx) throws Http2Exception {
755             verifyPrefaceReceived();
756             internalFrameListener.onSettingsAckRead(ctx);
757         }
758 
759         @Override
760         public void onSettingsRead(ChannelHandlerContext ctx, Http2Settings settings) throws Http2Exception {
761             // The first settings should change the internalFrameListener to the "real" listener
762             // that expects the preface to be verified.
763             if (!prefaceReceived()) {
764                 internalFrameListener = new FrameReadListener();
765             }
766             internalFrameListener.onSettingsRead(ctx, settings);
767         }
768 
769         @Override
770         public void onPingRead(ChannelHandlerContext ctx, long data) throws Http2Exception {
771             verifyPrefaceReceived();
772             internalFrameListener.onPingRead(ctx, data);
773         }
774 
775         @Override
776         public void onPingAckRead(ChannelHandlerContext ctx, long data) throws Http2Exception {
777             verifyPrefaceReceived();
778             internalFrameListener.onPingAckRead(ctx, data);
779         }
780 
781         @Override
782         public void onPushPromiseRead(ChannelHandlerContext ctx, int streamId, int promisedStreamId,
783                 Http2Headers headers, int padding) throws Http2Exception {
784             verifyPrefaceReceived();
785             internalFrameListener.onPushPromiseRead(ctx, streamId, promisedStreamId, headers, padding);
786         }
787 
788         @Override
789         public void onGoAwayRead(ChannelHandlerContext ctx, int lastStreamId, long errorCode, ByteBuf debugData)
790                 throws Http2Exception {
791             onGoAwayRead0(ctx, lastStreamId, errorCode, debugData);
792         }
793 
794         @Override
795         public void onWindowUpdateRead(ChannelHandlerContext ctx, int streamId, int windowSizeIncrement)
796                 throws Http2Exception {
797             verifyPrefaceReceived();
798             internalFrameListener.onWindowUpdateRead(ctx, streamId, windowSizeIncrement);
799         }
800 
801         @Override
802         public void onUnknownFrame(ChannelHandlerContext ctx, byte frameType, int streamId, Http2Flags flags,
803                 ByteBuf payload) throws Http2Exception {
804             onUnknownFrame0(ctx, frameType, streamId, flags, payload);
805         }
806     }
807 
808     private static final class ContentLength {
809         private final long expected;
810         private long seen;
811 
812         ContentLength(long expected) {
813             this.expected = expected;
814         }
815 
816         void increaseReceivedBytes(boolean server, int streamId, int bytes, boolean isEnd) throws Http2Exception {
817             seen += bytes;
818             // Check for overflow
819             if (seen < 0) {
820                 throw streamError(streamId, PROTOCOL_ERROR,
821                         "Received amount of data did overflow and so not match content-length header %d", expected);
822             }
823             // Check if we received more data then what was advertised via the content-length header.
824             if (seen > expected) {
825                 throw streamError(streamId, PROTOCOL_ERROR,
826                         "Received amount of data %d does not match content-length header %d", seen, expected);
827             }
828 
829             if (isEnd) {
830                 if (seen == 0 && !server) {
831                     // This may be a response to a HEAD request, let's just allow it.
832                     return;
833                 }
834 
835                 // Check that we really saw what was told via the content-length header.
836                 if (expected > seen) {
837                     throw streamError(streamId, PROTOCOL_ERROR,
838                             "Received amount of data %d does not match content-length header %d", seen, expected);
839                 }
840             }
841         }
842     }
843 }