查看本类的 API文档回源码主页即时通讯网 - 即时通讯开发者社区!
1   /*
2    * Copyright 2014 The Netty Project
3    *
4    * The Netty Project licenses this file to you under the Apache License, version 2.0 (the
5    * "License"); you may not use this file except in compliance with the License. You may obtain a
6    * copy of the License at:
7    *
8    * https://www.apache.org/licenses/LICENSE-2.0
9    *
10   * Unless required by applicable law or agreed to in writing, software distributed under the License
11   * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
12   * or implied. See the License for the specific language governing permissions and limitations under
13   * the License.
14   */
15  package io.netty.handler.codec.http2;
16  
17  import io.netty.buffer.ByteBuf;
18  import io.netty.buffer.ByteBufUtil;
19  import io.netty.buffer.Unpooled;
20  import io.netty.channel.ChannelFuture;
21  import io.netty.channel.ChannelFutureListener;
22  import io.netty.channel.ChannelHandlerContext;
23  import io.netty.channel.ChannelOutboundHandler;
24  import io.netty.channel.ChannelPromise;
25  import io.netty.handler.codec.ByteToMessageDecoder;
26  import io.netty.handler.codec.http.HttpResponseStatus;
27  import io.netty.handler.codec.http2.Http2Exception.CompositeStreamException;
28  import io.netty.handler.codec.http2.Http2Exception.StreamException;
29  import io.netty.util.CharsetUtil;
30  import io.netty.util.concurrent.Future;
31  import io.netty.util.internal.UnstableApi;
32  import io.netty.util.internal.logging.InternalLogger;
33  import io.netty.util.internal.logging.InternalLoggerFactory;
34  
35  import java.net.SocketAddress;
36  import java.util.List;
37  import java.util.concurrent.TimeUnit;
38  
39  import static io.netty.buffer.ByteBufUtil.hexDump;
40  import static io.netty.buffer.Unpooled.EMPTY_BUFFER;
41  import static io.netty.handler.codec.http2.Http2CodecUtil.HTTP_UPGRADE_STREAM_ID;
42  import static io.netty.handler.codec.http2.Http2CodecUtil.connectionPrefaceBuf;
43  import static io.netty.handler.codec.http2.Http2CodecUtil.getEmbeddedHttp2Exception;
44  import static io.netty.handler.codec.http2.Http2Error.INTERNAL_ERROR;
45  import static io.netty.handler.codec.http2.Http2Error.NO_ERROR;
46  import static io.netty.handler.codec.http2.Http2Error.PROTOCOL_ERROR;
47  import static io.netty.handler.codec.http2.Http2Exception.connectionError;
48  import static io.netty.handler.codec.http2.Http2Exception.isStreamError;
49  import static io.netty.handler.codec.http2.Http2FrameTypes.SETTINGS;
50  import static io.netty.handler.codec.http2.Http2Stream.State.IDLE;
51  import static io.netty.util.CharsetUtil.UTF_8;
52  import static io.netty.util.internal.ObjectUtil.checkNotNull;
53  import static java.lang.Math.min;
54  import static java.util.concurrent.TimeUnit.MILLISECONDS;
55  
56  /**
57   * Provides the default implementation for processing inbound frame events and delegates to a
58   * {@link Http2FrameListener}
59   * <p>
60   * This class will read HTTP/2 frames and delegate the events to a {@link Http2FrameListener}
61   * <p>
62   * This interface enforces inbound flow control functionality through
63   * {@link Http2LocalFlowController}
64   */
65  @UnstableApi
66  public class Http2ConnectionHandler extends ByteToMessageDecoder implements Http2LifecycleManager,
67                                                                              ChannelOutboundHandler {
68  
69      private static final InternalLogger logger = InternalLoggerFactory.getInstance(Http2ConnectionHandler.class);
70  
71      private static final Http2Headers HEADERS_TOO_LARGE_HEADERS = ReadOnlyHttp2Headers.serverHeaders(false,
72              HttpResponseStatus.REQUEST_HEADER_FIELDS_TOO_LARGE.codeAsText());
73      private static final ByteBuf HTTP_1_X_BUF = Unpooled.unreleasableBuffer(
74          Unpooled.wrappedBuffer(new byte[] {'H', 'T', 'T', 'P', '/', '1', '.'})).asReadOnly();
75  
76      private final Http2ConnectionDecoder decoder;
77      private final Http2ConnectionEncoder encoder;
78      private final Http2Settings initialSettings;
79      private final boolean decoupleCloseAndGoAway;
80      private final boolean flushPreface;
81      private ChannelFutureListener closeListener;
82      private BaseDecoder byteDecoder;
83      private long gracefulShutdownTimeoutMillis;
84  
85      protected Http2ConnectionHandler(Http2ConnectionDecoder decoder, Http2ConnectionEncoder encoder,
86                                       Http2Settings initialSettings) {
87          this(decoder, encoder, initialSettings, false);
88      }
89  
90      protected Http2ConnectionHandler(Http2ConnectionDecoder decoder, Http2ConnectionEncoder encoder,
91                                       Http2Settings initialSettings, boolean decoupleCloseAndGoAway) {
92          this(decoder, encoder, initialSettings, decoupleCloseAndGoAway, true);
93      }
94  
95      protected Http2ConnectionHandler(Http2ConnectionDecoder decoder, Http2ConnectionEncoder encoder,
96                                       Http2Settings initialSettings, boolean decoupleCloseAndGoAway,
97                                       boolean flushPreface) {
98          this.initialSettings = checkNotNull(initialSettings, "initialSettings");
99          this.decoder = checkNotNull(decoder, "decoder");
100         this.encoder = checkNotNull(encoder, "encoder");
101         this.decoupleCloseAndGoAway = decoupleCloseAndGoAway;
102         this.flushPreface = flushPreface;
103         if (encoder.connection() != decoder.connection()) {
104             throw new IllegalArgumentException("Encoder and Decoder do not share the same connection object");
105         }
106     }
107 
108     /**
109      * Get the amount of time (in milliseconds) this endpoint will wait for all streams to be closed before closing
110      * the connection during the graceful shutdown process. Returns -1 if this connection is configured to wait
111      * indefinitely for all streams to close.
112      */
113     public long gracefulShutdownTimeoutMillis() {
114         return gracefulShutdownTimeoutMillis;
115     }
116 
117     /**
118      * Set the amount of time (in milliseconds) this endpoint will wait for all streams to be closed before closing
119      * the connection during the graceful shutdown process.
120      * @param gracefulShutdownTimeoutMillis the amount of time (in milliseconds) this endpoint will wait for all
121      * streams to be closed before closing the connection during the graceful shutdown process.
122      */
123     public void gracefulShutdownTimeoutMillis(long gracefulShutdownTimeoutMillis) {
124         if (gracefulShutdownTimeoutMillis < -1) {
125             throw new IllegalArgumentException("gracefulShutdownTimeoutMillis: " + gracefulShutdownTimeoutMillis +
126                                                " (expected: -1 for indefinite or >= 0)");
127         }
128         this.gracefulShutdownTimeoutMillis = gracefulShutdownTimeoutMillis;
129     }
130 
131     public Http2Connection connection() {
132         return encoder.connection();
133     }
134 
135     public Http2ConnectionDecoder decoder() {
136         return decoder;
137     }
138 
139     public Http2ConnectionEncoder encoder() {
140         return encoder;
141     }
142 
143     private boolean prefaceSent() {
144         return byteDecoder != null && byteDecoder.prefaceSent();
145     }
146 
147     /**
148      * Handles the client-side (cleartext) upgrade from HTTP to HTTP/2.
149      * Reserves local stream 1 for the HTTP/2 response.
150      */
151     public void onHttpClientUpgrade() throws Http2Exception {
152         if (connection().isServer()) {
153             throw connectionError(PROTOCOL_ERROR, "Client-side HTTP upgrade requested for a server");
154         }
155         if (!prefaceSent()) {
156             // If the preface was not sent yet it most likely means the handler was not added to the pipeline before
157             // calling this method.
158             throw connectionError(INTERNAL_ERROR, "HTTP upgrade must occur after preface was sent");
159         }
160         if (decoder.prefaceReceived()) {
161             throw connectionError(PROTOCOL_ERROR, "HTTP upgrade must occur before HTTP/2 preface is received");
162         }
163 
164         // Create a local stream used for the HTTP cleartext upgrade.
165         connection().local().createStream(HTTP_UPGRADE_STREAM_ID, true);
166     }
167 
168     /**
169      * Handles the server-side (cleartext) upgrade from HTTP to HTTP/2.
170      * @param settings the settings for the remote endpoint.
171      */
172     public void onHttpServerUpgrade(Http2Settings settings) throws Http2Exception {
173         if (!connection().isServer()) {
174             throw connectionError(PROTOCOL_ERROR, "Server-side HTTP upgrade requested for a client");
175         }
176         if (!prefaceSent()) {
177             // If the preface was not sent yet it most likely means the handler was not added to the pipeline before
178             // calling this method.
179             throw connectionError(INTERNAL_ERROR, "HTTP upgrade must occur after preface was sent");
180         }
181         if (decoder.prefaceReceived()) {
182             throw connectionError(PROTOCOL_ERROR, "HTTP upgrade must occur before HTTP/2 preface is received");
183         }
184 
185         // Apply the settings but no ACK is necessary.
186         encoder.remoteSettings(settings);
187 
188         // Create a stream in the half-closed state.
189         connection().remote().createStream(HTTP_UPGRADE_STREAM_ID, true);
190     }
191 
192     @Override
193     public void flush(ChannelHandlerContext ctx) {
194         try {
195             // Trigger pending writes in the remote flow controller.
196             encoder.flowController().writePendingBytes();
197             ctx.flush();
198         } catch (Http2Exception e) {
199             onError(ctx, true, e);
200         } catch (Throwable cause) {
201             onError(ctx, true, connectionError(INTERNAL_ERROR, cause, "Error flushing"));
202         }
203     }
204 
205     private abstract class BaseDecoder {
206         public abstract void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception;
207         public void handlerRemoved(ChannelHandlerContext ctx) throws Exception { }
208         public void channelActive(ChannelHandlerContext ctx) throws Exception { }
209 
210         public void channelInactive(ChannelHandlerContext ctx) throws Exception {
211             // Connection has terminated, close the encoder and decoder.
212             encoder().close();
213             decoder().close();
214 
215             // We need to remove all streams (not just the active ones).
216             // See https://github.com/netty/netty/issues/4838.
217             connection().close(ctx.voidPromise());
218         }
219 
220         /**
221          * Determine if the HTTP/2 connection preface been sent.
222          */
223         public boolean prefaceSent() {
224             return true;
225         }
226     }
227 
228     private final class PrefaceDecoder extends BaseDecoder {
229         private ByteBuf clientPrefaceString;
230         private boolean prefaceSent;
231 
232         PrefaceDecoder(ChannelHandlerContext ctx) throws Exception {
233             clientPrefaceString = clientPrefaceString(encoder.connection());
234             // This handler was just added to the context. In case it was handled after
235             // the connection became active, send the connection preface now.
236             sendPreface(ctx);
237         }
238 
239         @Override
240         public boolean prefaceSent() {
241             return prefaceSent;
242         }
243 
244         @Override
245         public void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {
246             try {
247                 if (ctx.channel().isActive() && readClientPrefaceString(in) && verifyFirstFrameIsSettings(in)) {
248                     // After the preface is read, it is time to hand over control to the post initialized decoder.
249                     byteDecoder = new FrameDecoder();
250                     byteDecoder.decode(ctx, in, out);
251                 }
252             } catch (Throwable e) {
253                 onError(ctx, false, e);
254             }
255         }
256 
257         @Override
258         public void channelActive(ChannelHandlerContext ctx) throws Exception {
259             // The channel just became active - send the connection preface to the remote endpoint.
260             sendPreface(ctx);
261 
262             if (flushPreface) {
263                 // As we don't know if any channelReadComplete() events will be triggered at all we need to ensure we
264                 // also flush. Otherwise the remote peer might never see the preface / settings frame.
265                 // See https://github.com/netty/netty/issues/12089
266                 ctx.flush();
267             }
268         }
269 
270         @Override
271         public void channelInactive(ChannelHandlerContext ctx) throws Exception {
272             cleanup();
273             super.channelInactive(ctx);
274         }
275 
276         /**
277          * Releases the {@code clientPrefaceString}. Any active streams will be left in the open.
278          */
279         @Override
280         public void handlerRemoved(ChannelHandlerContext ctx) throws Exception {
281             cleanup();
282         }
283 
284         /**
285          * Releases the {@code clientPrefaceString}. Any active streams will be left in the open.
286          */
287         private void cleanup() {
288             if (clientPrefaceString != null) {
289                 clientPrefaceString.release();
290                 clientPrefaceString = null;
291             }
292         }
293 
294         /**
295          * Decodes the client connection preface string from the input buffer.
296          *
297          * @return {@code true} if processing of the client preface string is complete. Since client preface strings can
298          *         only be received by servers, returns true immediately for client endpoints.
299          */
300         private boolean readClientPrefaceString(ByteBuf in) throws Http2Exception {
301             if (clientPrefaceString == null) {
302                 return true;
303             }
304 
305             int prefaceRemaining = clientPrefaceString.readableBytes();
306             int bytesRead = min(in.readableBytes(), prefaceRemaining);
307 
308             // If the input so far doesn't match the preface, break the connection.
309             if (bytesRead == 0 || !ByteBufUtil.equals(in, in.readerIndex(),
310                                                       clientPrefaceString, clientPrefaceString.readerIndex(),
311                                                       bytesRead)) {
312                 int maxSearch = 1024; // picked because 512 is too little, and 2048 too much
313                 int http1Index =
314                     ByteBufUtil.indexOf(HTTP_1_X_BUF, in.slice(in.readerIndex(), min(in.readableBytes(), maxSearch)));
315                 if (http1Index != -1) {
316                     String chunk = in.toString(in.readerIndex(), http1Index - in.readerIndex(), CharsetUtil.US_ASCII);
317                     throw connectionError(PROTOCOL_ERROR, "Unexpected HTTP/1.x request: %s", chunk);
318                 }
319                 String receivedBytes = hexDump(in, in.readerIndex(),
320                                                min(in.readableBytes(), clientPrefaceString.readableBytes()));
321                 throw connectionError(PROTOCOL_ERROR, "HTTP/2 client preface string missing or corrupt. " +
322                                                       "Hex dump for received bytes: %s", receivedBytes);
323             }
324             in.skipBytes(bytesRead);
325             clientPrefaceString.skipBytes(bytesRead);
326 
327             if (!clientPrefaceString.isReadable()) {
328                 // Entire preface has been read.
329                 clientPrefaceString.release();
330                 clientPrefaceString = null;
331                 return true;
332             }
333             return false;
334         }
335 
336         /**
337          * Peeks at that the next frame in the buffer and verifies that it is a non-ack {@code SETTINGS} frame.
338          *
339          * @param in the inbound buffer.
340          * @return {@code true} if the next frame is a non-ack {@code SETTINGS} frame, {@code false} if more
341          * data is required before we can determine the next frame type.
342          * @throws Http2Exception thrown if the next frame is NOT a non-ack {@code SETTINGS} frame.
343          */
344         private boolean verifyFirstFrameIsSettings(ByteBuf in) throws Http2Exception {
345             if (in.readableBytes() < 5) {
346                 // Need more data before we can see the frame type for the first frame.
347                 return false;
348             }
349 
350             short frameType = in.getUnsignedByte(in.readerIndex() + 3);
351             short flags = in.getUnsignedByte(in.readerIndex() + 4);
352             if (frameType != SETTINGS || (flags & Http2Flags.ACK) != 0) {
353                 throw connectionError(PROTOCOL_ERROR, "First received frame was not SETTINGS. " +
354                                                       "Hex dump for first 5 bytes: %s",
355                                       hexDump(in, in.readerIndex(), 5));
356             }
357             return true;
358         }
359 
360         /**
361          * Sends the HTTP/2 connection preface upon establishment of the connection, if not already sent.
362          */
363         private void sendPreface(ChannelHandlerContext ctx) throws Exception {
364             if (prefaceSent || !ctx.channel().isActive()) {
365                 return;
366             }
367 
368             prefaceSent = true;
369 
370             final boolean isClient = !connection().isServer();
371             if (isClient) {
372                 // Clients must send the preface string as the first bytes on the connection.
373                 ctx.write(connectionPrefaceBuf()).addListener(ChannelFutureListener.CLOSE_ON_FAILURE);
374             }
375 
376             // Both client and server must send their initial settings.
377             encoder.writeSettings(ctx, initialSettings, ctx.newPromise()).addListener(
378                     ChannelFutureListener.CLOSE_ON_FAILURE);
379 
380             if (isClient) {
381                 // If this handler is extended by the user and we directly fire the userEvent from this context then
382                 // the user will not see the event. We should fire the event starting with this handler so this class
383                 // (and extending classes) have a chance to process the event.
384                 userEventTriggered(ctx, Http2ConnectionPrefaceAndSettingsFrameWrittenEvent.INSTANCE);
385             }
386         }
387     }
388 
389     private final class FrameDecoder extends BaseDecoder {
390         @Override
391         public void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {
392             try {
393                 decoder.decodeFrame(ctx, in, out);
394             } catch (Throwable e) {
395                 onError(ctx, false, e);
396             }
397         }
398     }
399 
400     @Override
401     public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
402         // Initialize the encoder, decoder, flow controllers, and internal state.
403         encoder.lifecycleManager(this);
404         decoder.lifecycleManager(this);
405         encoder.flowController().channelHandlerContext(ctx);
406         decoder.flowController().channelHandlerContext(ctx);
407         byteDecoder = new PrefaceDecoder(ctx);
408     }
409 
410     @Override
411     protected void handlerRemoved0(ChannelHandlerContext ctx) throws Exception {
412         if (byteDecoder != null) {
413             byteDecoder.handlerRemoved(ctx);
414             byteDecoder = null;
415         }
416     }
417 
418     @Override
419     public void channelActive(ChannelHandlerContext ctx) throws Exception {
420         if (byteDecoder == null) {
421             byteDecoder = new PrefaceDecoder(ctx);
422         }
423         byteDecoder.channelActive(ctx);
424         super.channelActive(ctx);
425     }
426 
427     @Override
428     public void channelInactive(ChannelHandlerContext ctx) throws Exception {
429         // Call super class first, as this may result in decode being called.
430         super.channelInactive(ctx);
431         if (byteDecoder != null) {
432             byteDecoder.channelInactive(ctx);
433             byteDecoder = null;
434         }
435     }
436 
437     @Override
438     public void channelWritabilityChanged(ChannelHandlerContext ctx) throws Exception {
439         // Writability is expected to change while we are writing. We cannot allow this event to trigger reentering
440         // the allocation and write loop. Reentering the event loop will lead to over or illegal allocation.
441         try {
442             if (ctx.channel().isWritable()) {
443                 flush(ctx);
444             }
445             encoder.flowController().channelWritabilityChanged();
446         } finally {
447             super.channelWritabilityChanged(ctx);
448         }
449     }
450 
451     @Override
452     protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {
453         byteDecoder.decode(ctx, in, out);
454     }
455 
456     @Override
457     public void bind(ChannelHandlerContext ctx, SocketAddress localAddress, ChannelPromise promise) throws Exception {
458         ctx.bind(localAddress, promise);
459     }
460 
461     @Override
462     public void connect(ChannelHandlerContext ctx, SocketAddress remoteAddress, SocketAddress localAddress,
463                         ChannelPromise promise) throws Exception {
464         ctx.connect(remoteAddress, localAddress, promise);
465     }
466 
467     @Override
468     public void disconnect(ChannelHandlerContext ctx, ChannelPromise promise) throws Exception {
469         ctx.disconnect(promise);
470     }
471 
472     @Override
473     public void close(ChannelHandlerContext ctx, ChannelPromise promise) throws Exception {
474         if (decoupleCloseAndGoAway) {
475             ctx.close(promise);
476             return;
477         }
478         promise = promise.unvoid();
479         // Avoid NotYetConnectedException and avoid sending before connection preface
480         if (!ctx.channel().isActive() || !prefaceSent()) {
481             ctx.close(promise);
482             return;
483         }
484 
485         // If the user has already sent a GO_AWAY frame they may be attempting to do a graceful shutdown which requires
486         // sending multiple GO_AWAY frames. We should only send a GO_AWAY here if one has not already been sent. If
487         // a GO_AWAY has been sent we send a empty buffer just so we can wait to close until all other data has been
488         // flushed to the OS.
489         // https://github.com/netty/netty/issues/5307
490         ChannelFuture f = connection().goAwaySent() ? ctx.write(EMPTY_BUFFER) : goAway(ctx, null, ctx.newPromise());
491         ctx.flush();
492         doGracefulShutdown(ctx, f, promise);
493     }
494 
495     private ChannelFutureListener newClosingChannelFutureListener(
496             ChannelHandlerContext ctx, ChannelPromise promise) {
497         long gracefulShutdownTimeoutMillis = this.gracefulShutdownTimeoutMillis;
498         return gracefulShutdownTimeoutMillis < 0 ?
499                 new ClosingChannelFutureListener(ctx, promise) :
500                 new ClosingChannelFutureListener(ctx, promise, gracefulShutdownTimeoutMillis, MILLISECONDS);
501     }
502 
503     private void doGracefulShutdown(ChannelHandlerContext ctx, ChannelFuture future, final ChannelPromise promise) {
504         final ChannelFutureListener listener = newClosingChannelFutureListener(ctx, promise);
505         if (isGracefulShutdownComplete()) {
506             // If there are no active streams, close immediately after the GO_AWAY write completes or the timeout
507             // elapsed.
508             future.addListener(listener);
509         } else {
510             // If there are active streams we should wait until they are all closed before closing the connection.
511 
512             // The ClosingChannelFutureListener will cascade promise completion. We need to always notify the
513             // new ClosingChannelFutureListener when the graceful close completes if the promise is not null.
514             if (closeListener == null) {
515                 closeListener = listener;
516             } else if (promise != null) {
517                 final ChannelFutureListener oldCloseListener = closeListener;
518                 closeListener = new ChannelFutureListener() {
519                     @Override
520                     public void operationComplete(ChannelFuture future) throws Exception {
521                         try {
522                             oldCloseListener.operationComplete(future);
523                         } finally {
524                             listener.operationComplete(future);
525                         }
526                     }
527                 };
528             }
529         }
530     }
531 
532     @Override
533     public void deregister(ChannelHandlerContext ctx, ChannelPromise promise) throws Exception {
534         ctx.deregister(promise);
535     }
536 
537     @Override
538     public void read(ChannelHandlerContext ctx) throws Exception {
539         ctx.read();
540     }
541 
542     @Override
543     public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
544         ctx.write(msg, promise);
545     }
546 
547     @Override
548     public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
549         // Trigger flush after read on the assumption that flush is cheap if there is nothing to write and that
550         // for flow-control the read may release window that causes data to be written that can now be flushed.
551         try {
552             // First call channelReadComplete0(...) as this may produce more data that we want to flush
553             channelReadComplete0(ctx);
554         } finally {
555             flush(ctx);
556         }
557     }
558 
559     final void channelReadComplete0(ChannelHandlerContext ctx) {
560         // Discard bytes of the cumulation buffer if needed.
561         discardSomeReadBytes();
562 
563         // Ensure we never stale the HTTP/2 Channel. Flow-control is enforced by HTTP/2.
564         //
565         // See https://tools.ietf.org/html/rfc7540#section-5.2.2
566         if (!ctx.channel().config().isAutoRead()) {
567             ctx.read();
568         }
569 
570         ctx.fireChannelReadComplete();
571     }
572 
573     /**
574      * Handles {@link Http2Exception} objects that were thrown from other handlers. Ignores all other exceptions.
575      */
576     @Override
577     public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
578         if (getEmbeddedHttp2Exception(cause) != null) {
579             // Some exception in the causality chain is an Http2Exception - handle it.
580             onError(ctx, false, cause);
581         } else {
582             super.exceptionCaught(ctx, cause);
583         }
584     }
585 
586     /**
587      * Closes the local side of the given stream. If this causes the stream to be closed, adds a
588      * hook to close the channel after the given future completes.
589      *
590      * @param stream the stream to be half closed.
591      * @param future If closing, the future after which to close the channel.
592      */
593     @Override
594     public void closeStreamLocal(Http2Stream stream, ChannelFuture future) {
595         switch (stream.state()) {
596             case HALF_CLOSED_LOCAL:
597             case OPEN:
598                 stream.closeLocalSide();
599                 break;
600             default:
601                 closeStream(stream, future);
602                 break;
603         }
604     }
605 
606     /**
607      * Closes the remote side of the given stream. If this causes the stream to be closed, adds a
608      * hook to close the channel after the given future completes.
609      *
610      * @param stream the stream to be half closed.
611      * @param future If closing, the future after which to close the channel.
612      */
613     @Override
614     public void closeStreamRemote(Http2Stream stream, ChannelFuture future) {
615         switch (stream.state()) {
616             case HALF_CLOSED_REMOTE:
617             case OPEN:
618                 stream.closeRemoteSide();
619                 break;
620             default:
621                 closeStream(stream, future);
622                 break;
623         }
624     }
625 
626     @Override
627     public void closeStream(final Http2Stream stream, ChannelFuture future) {
628         if (future.isDone()) {
629             doCloseStream(stream, future);
630         } else {
631             future.addListener(new ChannelFutureListener() {
632                 @Override
633                 public void operationComplete(ChannelFuture future) {
634                     doCloseStream(stream, future);
635                 }
636             });
637         }
638     }
639 
640     /**
641      * Central handler for all exceptions caught during HTTP/2 processing.
642      */
643     @Override
644     public void onError(ChannelHandlerContext ctx, boolean outbound, Throwable cause) {
645         Http2Exception embedded = getEmbeddedHttp2Exception(cause);
646         if (isStreamError(embedded)) {
647             onStreamError(ctx, outbound, cause, (StreamException) embedded);
648         } else if (embedded instanceof CompositeStreamException) {
649             CompositeStreamException compositException = (CompositeStreamException) embedded;
650             for (StreamException streamException : compositException) {
651                 onStreamError(ctx, outbound, cause, streamException);
652             }
653         } else {
654             onConnectionError(ctx, outbound, cause, embedded);
655         }
656         ctx.flush();
657     }
658 
659     /**
660      * Called by the graceful shutdown logic to determine when it is safe to close the connection. Returns {@code true}
661      * if the graceful shutdown has completed and the connection can be safely closed. This implementation just
662      * guarantees that there are no active streams. Subclasses may override to provide additional checks.
663      */
664     protected boolean isGracefulShutdownComplete() {
665         return connection().numActiveStreams() == 0;
666     }
667 
668     /**
669      * Handler for a connection error. Sends a GO_AWAY frame to the remote endpoint. Once all
670      * streams are closed, the connection is shut down.
671      *
672      * @param ctx the channel context
673      * @param outbound {@code true} if the error was caused by an outbound operation.
674      * @param cause the exception that was caught
675      * @param http2Ex the {@link Http2Exception} that is embedded in the causality chain. This may
676      *            be {@code null} if it's an unknown exception.
677      */
678     protected void onConnectionError(ChannelHandlerContext ctx, boolean outbound,
679                                      Throwable cause, Http2Exception http2Ex) {
680         if (http2Ex == null) {
681             http2Ex = new Http2Exception(INTERNAL_ERROR, cause.getMessage(), cause);
682         }
683 
684         ChannelPromise promise = ctx.newPromise();
685         ChannelFuture future = goAway(ctx, http2Ex, ctx.newPromise());
686         if (http2Ex.shutdownHint() == Http2Exception.ShutdownHint.GRACEFUL_SHUTDOWN) {
687             doGracefulShutdown(ctx, future, promise);
688         } else {
689             future.addListener(newClosingChannelFutureListener(ctx, promise));
690         }
691     }
692 
693     /**
694      * Handler for a stream error. Sends a {@code RST_STREAM} frame to the remote endpoint and closes the
695      * stream.
696      *
697      * @param ctx the channel context
698      * @param outbound {@code true} if the error was caused by an outbound operation.
699      * @param cause the exception that was caught
700      * @param http2Ex the {@link StreamException} that is embedded in the causality chain.
701      */
702     protected void onStreamError(ChannelHandlerContext ctx, boolean outbound,
703                                  @SuppressWarnings("unused") Throwable cause, StreamException http2Ex) {
704         final int streamId = http2Ex.streamId();
705         Http2Stream stream = connection().stream(streamId);
706 
707         //if this is caused by reading headers that are too large, send a header with status 431
708         if (http2Ex instanceof Http2Exception.HeaderListSizeException &&
709             ((Http2Exception.HeaderListSizeException) http2Ex).duringDecode() &&
710             connection().isServer()) {
711 
712             // NOTE We have to check to make sure that a stream exists before we send our reply.
713             // We likely always create the stream below as the stream isn't created until the
714             // header block is completely processed.
715 
716             // The case of a streamId referring to a stream which was already closed is handled
717             // by createStream and will land us in the catch block below
718             if (stream == null) {
719                 try {
720                     stream = encoder.connection().remote().createStream(streamId, true);
721                 } catch (Http2Exception e) {
722                     resetUnknownStream(ctx, streamId, http2Ex.error().code(), ctx.newPromise());
723                     return;
724                 }
725             }
726 
727             // ensure that we have not already sent headers on this stream
728             if (stream != null && !stream.isHeadersSent()) {
729                 try {
730                     handleServerHeaderDecodeSizeError(ctx, stream);
731                 } catch (Throwable cause2) {
732                     onError(ctx, outbound, connectionError(INTERNAL_ERROR, cause2, "Error DecodeSizeError"));
733                 }
734             }
735         }
736 
737         if (stream == null) {
738             if (!outbound || connection().local().mayHaveCreatedStream(streamId)) {
739                 resetUnknownStream(ctx, streamId, http2Ex.error().code(), ctx.newPromise());
740             }
741         } else {
742             resetStream(ctx, stream, http2Ex.error().code(), ctx.newPromise());
743         }
744     }
745 
746     /**
747      * Notifies client that this server has received headers that are larger than what it is
748      * willing to accept. Override to change behavior.
749      *
750      * @param ctx the channel context
751      * @param stream the Http2Stream on which the header was received
752      */
753     protected void handleServerHeaderDecodeSizeError(ChannelHandlerContext ctx, Http2Stream stream) {
754         encoder().writeHeaders(ctx, stream.id(), HEADERS_TOO_LARGE_HEADERS, 0, true, ctx.newPromise());
755     }
756 
757     protected Http2FrameWriter frameWriter() {
758         return encoder().frameWriter();
759     }
760 
761     /**
762      * Sends a {@code RST_STREAM} frame even if we don't know about the stream. This error condition is most likely
763      * triggered by the first frame of a stream being invalid. That is, there was an error reading the frame before
764      * we could create a new stream.
765      */
766     private ChannelFuture resetUnknownStream(final ChannelHandlerContext ctx, int streamId, long errorCode,
767                                              ChannelPromise promise) {
768         ChannelFuture future = frameWriter().writeRstStream(ctx, streamId, errorCode, promise);
769         if (future.isDone()) {
770             closeConnectionOnError(ctx, future);
771         } else {
772             future.addListener(new ChannelFutureListener() {
773                 @Override
774                 public void operationComplete(ChannelFuture future) throws Exception {
775                     closeConnectionOnError(ctx, future);
776                 }
777             });
778         }
779         return future;
780     }
781 
782     @Override
783     public ChannelFuture resetStream(final ChannelHandlerContext ctx, int streamId, long errorCode,
784                                      ChannelPromise promise) {
785         final Http2Stream stream = connection().stream(streamId);
786         if (stream == null) {
787             return resetUnknownStream(ctx, streamId, errorCode, promise.unvoid());
788         }
789 
790        return resetStream(ctx, stream, errorCode, promise);
791     }
792 
793     private ChannelFuture resetStream(final ChannelHandlerContext ctx, final Http2Stream stream,
794                                       long errorCode, ChannelPromise promise) {
795         promise = promise.unvoid();
796         if (stream.isResetSent()) {
797             // Don't write a RST_STREAM frame if we have already written one.
798             return promise.setSuccess();
799         }
800         // Synchronously set the resetSent flag to prevent any subsequent calls
801         // from resulting in multiple reset frames being sent.
802         //
803         // This needs to be done before we notify the promise as the promise may have a listener attached that
804         // call resetStream(...) again.
805         stream.resetSent();
806 
807         final ChannelFuture future;
808         // If the remote peer is not aware of the steam, then we are not allowed to send a RST_STREAM
809         // https://tools.ietf.org/html/rfc7540#section-6.4.
810         if (stream.state() == IDLE ||
811             connection().local().created(stream) && !stream.isHeadersSent() && !stream.isPushPromiseSent()) {
812             future = promise.setSuccess();
813         } else {
814             future = frameWriter().writeRstStream(ctx, stream.id(), errorCode, promise);
815         }
816         if (future.isDone()) {
817             processRstStreamWriteResult(ctx, stream, future);
818         } else {
819             future.addListener(new ChannelFutureListener() {
820                 @Override
821                 public void operationComplete(ChannelFuture future) throws Exception {
822                     processRstStreamWriteResult(ctx, stream, future);
823                 }
824             });
825         }
826 
827         return future;
828     }
829 
830     @Override
831     public ChannelFuture goAway(final ChannelHandlerContext ctx, final int lastStreamId, final long errorCode,
832                                 final ByteBuf debugData, ChannelPromise promise) {
833         promise = promise.unvoid();
834         final Http2Connection connection = connection();
835         try {
836             if (!connection.goAwaySent(lastStreamId, errorCode, debugData)) {
837                 debugData.release();
838                 promise.trySuccess();
839                 return promise;
840             }
841         } catch (Throwable cause) {
842             debugData.release();
843             promise.tryFailure(cause);
844             return promise;
845         }
846 
847         // Need to retain before we write the buffer because if we do it after the refCnt could already be 0 and
848         // result in an IllegalRefCountException.
849         debugData.retain();
850         ChannelFuture future = frameWriter().writeGoAway(ctx, lastStreamId, errorCode, debugData, promise);
851 
852         if (future.isDone()) {
853             processGoAwayWriteResult(ctx, lastStreamId, errorCode, debugData, future);
854         } else {
855             future.addListener(new ChannelFutureListener() {
856                 @Override
857                 public void operationComplete(ChannelFuture future) throws Exception {
858                     processGoAwayWriteResult(ctx, lastStreamId, errorCode, debugData, future);
859                 }
860             });
861         }
862 
863         return future;
864     }
865 
866     /**
867      * Closes the connection if the graceful shutdown process has completed.
868      * @param future Represents the status that will be passed to the {@link #closeListener}.
869      */
870     private void checkCloseConnection(ChannelFuture future) {
871         // If this connection is closing and the graceful shutdown has completed, close the connection
872         // once this operation completes.
873         if (closeListener != null && isGracefulShutdownComplete()) {
874             ChannelFutureListener closeListener = this.closeListener;
875             // This method could be called multiple times
876             // and we don't want to notify the closeListener multiple times.
877             this.closeListener = null;
878             try {
879                 closeListener.operationComplete(future);
880             } catch (Exception e) {
881                 throw new IllegalStateException("Close listener threw an unexpected exception", e);
882             }
883         }
884     }
885 
886     /**
887      * Close the remote endpoint with a {@code GO_AWAY} frame. Does <strong>not</strong> flush
888      * immediately, this is the responsibility of the caller.
889      */
890     private ChannelFuture goAway(ChannelHandlerContext ctx, Http2Exception cause, ChannelPromise promise) {
891         long errorCode = cause != null ? cause.error().code() : NO_ERROR.code();
892         int lastKnownStream;
893         if (cause != null && cause.shutdownHint() == Http2Exception.ShutdownHint.HARD_SHUTDOWN) {
894             // The hard shutdown could have been triggered during header processing, before updating
895             // lastStreamCreated(). Specifically, any connection errors encountered by Http2FrameReader or HPACK
896             // decoding will fail to update the last known stream. So we must be pessimistic.
897             // https://github.com/netty/netty/issues/10670
898             lastKnownStream = Integer.MAX_VALUE;
899         } else {
900             lastKnownStream = connection().remote().lastStreamCreated();
901         }
902         return goAway(ctx, lastKnownStream, errorCode, Http2CodecUtil.toByteBuf(ctx, cause), promise);
903     }
904 
905     private void processRstStreamWriteResult(ChannelHandlerContext ctx, Http2Stream stream, ChannelFuture future) {
906         if (future.isSuccess()) {
907             closeStream(stream, future);
908         } else {
909             // The connection will be closed and so no need to change the resetSent flag to false.
910             onConnectionError(ctx, true, future.cause(), null);
911         }
912     }
913 
914     private void closeConnectionOnError(ChannelHandlerContext ctx, ChannelFuture future) {
915         if (!future.isSuccess()) {
916             onConnectionError(ctx, true, future.cause(), null);
917         }
918     }
919 
920     private void doCloseStream(final Http2Stream stream, ChannelFuture future) {
921         stream.close();
922         checkCloseConnection(future);
923     }
924 
925     /**
926      * Returns the client preface string if this is a client connection, otherwise returns {@code null}.
927      */
928     private static ByteBuf clientPrefaceString(Http2Connection connection) {
929         return connection.isServer() ? connectionPrefaceBuf() : null;
930     }
931 
932     private static void processGoAwayWriteResult(final ChannelHandlerContext ctx, final int lastStreamId,
933                                                  final long errorCode, final ByteBuf debugData, ChannelFuture future) {
934         try {
935             if (future.isSuccess()) {
936                 if (errorCode != NO_ERROR.code()) {
937                     if (logger.isDebugEnabled()) {
938                         logger.debug("{} Sent GOAWAY: lastStreamId '{}', errorCode '{}', " +
939                                      "debugData '{}'. Forcing shutdown of the connection.",
940                                      ctx.channel(), lastStreamId, errorCode, debugData.toString(UTF_8), future.cause());
941                     }
942                     ctx.close();
943                 }
944             } else {
945                 if (logger.isDebugEnabled()) {
946                     logger.debug("{} Sending GOAWAY failed: lastStreamId '{}', errorCode '{}', " +
947                                  "debugData '{}'. Forcing shutdown of the connection.",
948                                  ctx.channel(), lastStreamId, errorCode, debugData.toString(UTF_8), future.cause());
949                 }
950                 ctx.close();
951             }
952         } finally {
953             // We're done with the debug data now.
954             debugData.release();
955         }
956     }
957 
958     /**
959      * Closes the channel when the future completes.
960      */
961     private static final class ClosingChannelFutureListener implements ChannelFutureListener {
962         private final ChannelHandlerContext ctx;
963         private final ChannelPromise promise;
964         private final Future<?> timeoutTask;
965         private boolean closed;
966 
967         ClosingChannelFutureListener(ChannelHandlerContext ctx, ChannelPromise promise) {
968             this.ctx = ctx;
969             this.promise = promise;
970             timeoutTask = null;
971         }
972 
973         ClosingChannelFutureListener(final ChannelHandlerContext ctx, final ChannelPromise promise,
974                                      long timeout, TimeUnit unit) {
975             this.ctx = ctx;
976             this.promise = promise;
977             timeoutTask = ctx.executor().schedule(new Runnable() {
978                 @Override
979                 public void run() {
980                     doClose();
981                 }
982             }, timeout, unit);
983         }
984 
985         @Override
986         public void operationComplete(ChannelFuture sentGoAwayFuture) {
987             if (timeoutTask != null) {
988                 timeoutTask.cancel(false);
989             }
990             doClose();
991         }
992 
993         private void doClose() {
994             // We need to guard against multiple calls as the timeout may trigger close() first and then it will be
995             // triggered again because of operationComplete(...) is called.
996             if (closed) {
997                 // This only happens if we also scheduled a timeout task.
998                 assert timeoutTask != null;
999                 return;
1000             }
1001             closed = true;
1002             if (promise == null) {
1003                 ctx.close();
1004             } else {
1005                 ctx.close(promise);
1006             }
1007         }
1008     }
1009 }