1   
2   
3   
4   
5   
6   
7   
8   
9   
10  
11  
12  
13  
14  
15  package io.netty.handler.codec.http2;
16  
17  import io.netty.buffer.ByteBuf;
18  import io.netty.buffer.ByteBufUtil;
19  import io.netty.buffer.Unpooled;
20  import io.netty.channel.ChannelFuture;
21  import io.netty.channel.ChannelFutureListener;
22  import io.netty.channel.ChannelHandlerContext;
23  import io.netty.channel.ChannelOutboundHandler;
24  import io.netty.channel.ChannelPromise;
25  import io.netty.handler.codec.ByteToMessageDecoder;
26  import io.netty.handler.codec.http.HttpResponseStatus;
27  import io.netty.handler.codec.http2.Http2Exception.CompositeStreamException;
28  import io.netty.handler.codec.http2.Http2Exception.StreamException;
29  import io.netty.util.CharsetUtil;
30  import io.netty.util.concurrent.Future;
31  import io.netty.util.internal.UnstableApi;
32  import io.netty.util.internal.logging.InternalLogger;
33  import io.netty.util.internal.logging.InternalLoggerFactory;
34  
35  import java.net.SocketAddress;
36  import java.util.List;
37  import java.util.concurrent.TimeUnit;
38  
39  import static io.netty.buffer.ByteBufUtil.hexDump;
40  import static io.netty.buffer.Unpooled.EMPTY_BUFFER;
41  import static io.netty.handler.codec.http2.Http2CodecUtil.HTTP_UPGRADE_STREAM_ID;
42  import static io.netty.handler.codec.http2.Http2CodecUtil.connectionPrefaceBuf;
43  import static io.netty.handler.codec.http2.Http2CodecUtil.getEmbeddedHttp2Exception;
44  import static io.netty.handler.codec.http2.Http2Error.INTERNAL_ERROR;
45  import static io.netty.handler.codec.http2.Http2Error.NO_ERROR;
46  import static io.netty.handler.codec.http2.Http2Error.PROTOCOL_ERROR;
47  import static io.netty.handler.codec.http2.Http2Exception.connectionError;
48  import static io.netty.handler.codec.http2.Http2Exception.isStreamError;
49  import static io.netty.handler.codec.http2.Http2FrameTypes.SETTINGS;
50  import static io.netty.handler.codec.http2.Http2Stream.State.IDLE;
51  import static io.netty.util.CharsetUtil.UTF_8;
52  import static io.netty.util.internal.ObjectUtil.checkNotNull;
53  import static java.lang.Math.min;
54  import static java.util.concurrent.TimeUnit.MILLISECONDS;
55  
56  
57  
58  
59  
60  
61  
62  
63  
64  
65  @UnstableApi
66  public class Http2ConnectionHandler extends ByteToMessageDecoder implements Http2LifecycleManager,
67                                                                              ChannelOutboundHandler {
68  
69      private static final InternalLogger logger = InternalLoggerFactory.getInstance(Http2ConnectionHandler.class);
70  
71      private static final Http2Headers HEADERS_TOO_LARGE_HEADERS = ReadOnlyHttp2Headers.serverHeaders(false,
72              HttpResponseStatus.REQUEST_HEADER_FIELDS_TOO_LARGE.codeAsText());
73      private static final ByteBuf HTTP_1_X_BUF = Unpooled.unreleasableBuffer(
74          Unpooled.wrappedBuffer(new byte[] {'H', 'T', 'T', 'P', '/', '1', '.'})).asReadOnly();
75  
76      private final Http2ConnectionDecoder decoder;
77      private final Http2ConnectionEncoder encoder;
78      private final Http2Settings initialSettings;
79      private final boolean decoupleCloseAndGoAway;
80      private final boolean flushPreface;
81      private ChannelFutureListener closeListener;
82      private BaseDecoder byteDecoder;
83      private long gracefulShutdownTimeoutMillis;
84  
85      protected Http2ConnectionHandler(Http2ConnectionDecoder decoder, Http2ConnectionEncoder encoder,
86                                       Http2Settings initialSettings) {
87          this(decoder, encoder, initialSettings, false);
88      }
89  
90      protected Http2ConnectionHandler(Http2ConnectionDecoder decoder, Http2ConnectionEncoder encoder,
91                                       Http2Settings initialSettings, boolean decoupleCloseAndGoAway) {
92          this(decoder, encoder, initialSettings, decoupleCloseAndGoAway, true);
93      }
94  
95      protected Http2ConnectionHandler(Http2ConnectionDecoder decoder, Http2ConnectionEncoder encoder,
96                                       Http2Settings initialSettings, boolean decoupleCloseAndGoAway,
97                                       boolean flushPreface) {
98          this.initialSettings = checkNotNull(initialSettings, "initialSettings");
99          this.decoder = checkNotNull(decoder, "decoder");
100         this.encoder = checkNotNull(encoder, "encoder");
101         this.decoupleCloseAndGoAway = decoupleCloseAndGoAway;
102         this.flushPreface = flushPreface;
103         if (encoder.connection() != decoder.connection()) {
104             throw new IllegalArgumentException("Encoder and Decoder do not share the same connection object");
105         }
106     }
107 
108     
109 
110 
111 
112 
113     public long gracefulShutdownTimeoutMillis() {
114         return gracefulShutdownTimeoutMillis;
115     }
116 
117     
118 
119 
120 
121 
122 
123     public void gracefulShutdownTimeoutMillis(long gracefulShutdownTimeoutMillis) {
124         if (gracefulShutdownTimeoutMillis < -1) {
125             throw new IllegalArgumentException("gracefulShutdownTimeoutMillis: " + gracefulShutdownTimeoutMillis +
126                                                " (expected: -1 for indefinite or >= 0)");
127         }
128         this.gracefulShutdownTimeoutMillis = gracefulShutdownTimeoutMillis;
129     }
130 
131     public Http2Connection connection() {
132         return encoder.connection();
133     }
134 
135     public Http2ConnectionDecoder decoder() {
136         return decoder;
137     }
138 
139     public Http2ConnectionEncoder encoder() {
140         return encoder;
141     }
142 
143     private boolean prefaceSent() {
144         return byteDecoder != null && byteDecoder.prefaceSent();
145     }
146 
147     
148 
149 
150 
151     public void onHttpClientUpgrade() throws Http2Exception {
152         if (connection().isServer()) {
153             throw connectionError(PROTOCOL_ERROR, "Client-side HTTP upgrade requested for a server");
154         }
155         if (!prefaceSent()) {
156             
157             
158             throw connectionError(INTERNAL_ERROR, "HTTP upgrade must occur after preface was sent");
159         }
160         if (decoder.prefaceReceived()) {
161             throw connectionError(PROTOCOL_ERROR, "HTTP upgrade must occur before HTTP/2 preface is received");
162         }
163 
164         
165         connection().local().createStream(HTTP_UPGRADE_STREAM_ID, true);
166     }
167 
168     
169 
170 
171 
172     public void onHttpServerUpgrade(Http2Settings settings) throws Http2Exception {
173         if (!connection().isServer()) {
174             throw connectionError(PROTOCOL_ERROR, "Server-side HTTP upgrade requested for a client");
175         }
176         if (!prefaceSent()) {
177             
178             
179             throw connectionError(INTERNAL_ERROR, "HTTP upgrade must occur after preface was sent");
180         }
181         if (decoder.prefaceReceived()) {
182             throw connectionError(PROTOCOL_ERROR, "HTTP upgrade must occur before HTTP/2 preface is received");
183         }
184 
185         
186         encoder.remoteSettings(settings);
187 
188         
189         connection().remote().createStream(HTTP_UPGRADE_STREAM_ID, true);
190     }
191 
192     @Override
193     public void flush(ChannelHandlerContext ctx) {
194         try {
195             
196             encoder.flowController().writePendingBytes();
197             ctx.flush();
198         } catch (Http2Exception e) {
199             onError(ctx, true, e);
200         } catch (Throwable cause) {
201             onError(ctx, true, connectionError(INTERNAL_ERROR, cause, "Error flushing"));
202         }
203     }
204 
205     private abstract class BaseDecoder {
206         public abstract void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception;
207         public void handlerRemoved(ChannelHandlerContext ctx) throws Exception { }
208         public void channelActive(ChannelHandlerContext ctx) throws Exception { }
209 
210         public void channelInactive(ChannelHandlerContext ctx) throws Exception {
211             
212             encoder().close();
213             decoder().close();
214 
215             
216             
217             connection().close(ctx.voidPromise());
218         }
219 
220         
221 
222 
223         public boolean prefaceSent() {
224             return true;
225         }
226     }
227 
228     private final class PrefaceDecoder extends BaseDecoder {
229         private ByteBuf clientPrefaceString;
230         private boolean prefaceSent;
231 
232         PrefaceDecoder(ChannelHandlerContext ctx) throws Exception {
233             clientPrefaceString = clientPrefaceString(encoder.connection());
234             
235             
236             sendPreface(ctx);
237         }
238 
239         @Override
240         public boolean prefaceSent() {
241             return prefaceSent;
242         }
243 
244         @Override
245         public void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {
246             try {
247                 if (ctx.channel().isActive() && readClientPrefaceString(in) && verifyFirstFrameIsSettings(in)) {
248                     
249                     byteDecoder = new FrameDecoder();
250                     byteDecoder.decode(ctx, in, out);
251                 }
252             } catch (Throwable e) {
253                 onError(ctx, false, e);
254             }
255         }
256 
257         @Override
258         public void channelActive(ChannelHandlerContext ctx) throws Exception {
259             
260             sendPreface(ctx);
261 
262             if (flushPreface) {
263                 
264                 
265                 
266                 ctx.flush();
267             }
268         }
269 
270         @Override
271         public void channelInactive(ChannelHandlerContext ctx) throws Exception {
272             cleanup();
273             super.channelInactive(ctx);
274         }
275 
276         
277 
278 
279         @Override
280         public void handlerRemoved(ChannelHandlerContext ctx) throws Exception {
281             cleanup();
282         }
283 
284         
285 
286 
287         private void cleanup() {
288             if (clientPrefaceString != null) {
289                 clientPrefaceString.release();
290                 clientPrefaceString = null;
291             }
292         }
293 
294         
295 
296 
297 
298 
299 
300         private boolean readClientPrefaceString(ByteBuf in) throws Http2Exception {
301             if (clientPrefaceString == null) {
302                 return true;
303             }
304 
305             int prefaceRemaining = clientPrefaceString.readableBytes();
306             int bytesRead = min(in.readableBytes(), prefaceRemaining);
307 
308             
309             if (bytesRead == 0 || !ByteBufUtil.equals(in, in.readerIndex(),
310                                                       clientPrefaceString, clientPrefaceString.readerIndex(),
311                                                       bytesRead)) {
312                 int maxSearch = 1024; 
313                 int http1Index =
314                     ByteBufUtil.indexOf(HTTP_1_X_BUF, in.slice(in.readerIndex(), min(in.readableBytes(), maxSearch)));
315                 if (http1Index != -1) {
316                     String chunk = in.toString(in.readerIndex(), http1Index - in.readerIndex(), CharsetUtil.US_ASCII);
317                     throw connectionError(PROTOCOL_ERROR, "Unexpected HTTP/1.x request: %s", chunk);
318                 }
319                 String receivedBytes = hexDump(in, in.readerIndex(),
320                                                min(in.readableBytes(), clientPrefaceString.readableBytes()));
321                 throw connectionError(PROTOCOL_ERROR, "HTTP/2 client preface string missing or corrupt. " +
322                                                       "Hex dump for received bytes: %s", receivedBytes);
323             }
324             in.skipBytes(bytesRead);
325             clientPrefaceString.skipBytes(bytesRead);
326 
327             if (!clientPrefaceString.isReadable()) {
328                 
329                 clientPrefaceString.release();
330                 clientPrefaceString = null;
331                 return true;
332             }
333             return false;
334         }
335 
336         
337 
338 
339 
340 
341 
342 
343 
344         private boolean verifyFirstFrameIsSettings(ByteBuf in) throws Http2Exception {
345             if (in.readableBytes() < 5) {
346                 
347                 return false;
348             }
349 
350             short frameType = in.getUnsignedByte(in.readerIndex() + 3);
351             short flags = in.getUnsignedByte(in.readerIndex() + 4);
352             if (frameType != SETTINGS || (flags & Http2Flags.ACK) != 0) {
353                 throw connectionError(PROTOCOL_ERROR, "First received frame was not SETTINGS. " +
354                                                       "Hex dump for first 5 bytes: %s",
355                                       hexDump(in, in.readerIndex(), 5));
356             }
357             return true;
358         }
359 
360         
361 
362 
363         private void sendPreface(ChannelHandlerContext ctx) throws Exception {
364             if (prefaceSent || !ctx.channel().isActive()) {
365                 return;
366             }
367 
368             prefaceSent = true;
369 
370             final boolean isClient = !connection().isServer();
371             if (isClient) {
372                 
373                 ctx.write(connectionPrefaceBuf()).addListener(ChannelFutureListener.CLOSE_ON_FAILURE);
374             }
375 
376             
377             encoder.writeSettings(ctx, initialSettings, ctx.newPromise()).addListener(
378                     ChannelFutureListener.CLOSE_ON_FAILURE);
379 
380             if (isClient) {
381                 
382                 
383                 
384                 userEventTriggered(ctx, Http2ConnectionPrefaceAndSettingsFrameWrittenEvent.INSTANCE);
385             }
386         }
387     }
388 
389     private final class FrameDecoder extends BaseDecoder {
390         @Override
391         public void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {
392             try {
393                 decoder.decodeFrame(ctx, in, out);
394             } catch (Throwable e) {
395                 onError(ctx, false, e);
396             }
397         }
398     }
399 
400     @Override
401     public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
402         
403         encoder.lifecycleManager(this);
404         decoder.lifecycleManager(this);
405         encoder.flowController().channelHandlerContext(ctx);
406         decoder.flowController().channelHandlerContext(ctx);
407         byteDecoder = new PrefaceDecoder(ctx);
408     }
409 
410     @Override
411     protected void handlerRemoved0(ChannelHandlerContext ctx) throws Exception {
412         if (byteDecoder != null) {
413             byteDecoder.handlerRemoved(ctx);
414             byteDecoder = null;
415         }
416     }
417 
418     @Override
419     public void channelActive(ChannelHandlerContext ctx) throws Exception {
420         if (byteDecoder == null) {
421             byteDecoder = new PrefaceDecoder(ctx);
422         }
423         byteDecoder.channelActive(ctx);
424         super.channelActive(ctx);
425     }
426 
427     @Override
428     public void channelInactive(ChannelHandlerContext ctx) throws Exception {
429         
430         super.channelInactive(ctx);
431         if (byteDecoder != null) {
432             byteDecoder.channelInactive(ctx);
433             byteDecoder = null;
434         }
435     }
436 
437     @Override
438     public void channelWritabilityChanged(ChannelHandlerContext ctx) throws Exception {
439         
440         
441         try {
442             if (ctx.channel().isWritable()) {
443                 flush(ctx);
444             }
445             encoder.flowController().channelWritabilityChanged();
446         } finally {
447             super.channelWritabilityChanged(ctx);
448         }
449     }
450 
451     @Override
452     protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {
453         byteDecoder.decode(ctx, in, out);
454     }
455 
456     @Override
457     public void bind(ChannelHandlerContext ctx, SocketAddress localAddress, ChannelPromise promise) throws Exception {
458         ctx.bind(localAddress, promise);
459     }
460 
461     @Override
462     public void connect(ChannelHandlerContext ctx, SocketAddress remoteAddress, SocketAddress localAddress,
463                         ChannelPromise promise) throws Exception {
464         ctx.connect(remoteAddress, localAddress, promise);
465     }
466 
467     @Override
468     public void disconnect(ChannelHandlerContext ctx, ChannelPromise promise) throws Exception {
469         ctx.disconnect(promise);
470     }
471 
472     @Override
473     public void close(ChannelHandlerContext ctx, ChannelPromise promise) throws Exception {
474         if (decoupleCloseAndGoAway) {
475             ctx.close(promise);
476             return;
477         }
478         promise = promise.unvoid();
479         
480         if (!ctx.channel().isActive() || !prefaceSent()) {
481             ctx.close(promise);
482             return;
483         }
484 
485         
486         
487         
488         
489         
490         ChannelFuture f = connection().goAwaySent() ? ctx.write(EMPTY_BUFFER) : goAway(ctx, null, ctx.newPromise());
491         ctx.flush();
492         doGracefulShutdown(ctx, f, promise);
493     }
494 
495     private ChannelFutureListener newClosingChannelFutureListener(
496             ChannelHandlerContext ctx, ChannelPromise promise) {
497         long gracefulShutdownTimeoutMillis = this.gracefulShutdownTimeoutMillis;
498         return gracefulShutdownTimeoutMillis < 0 ?
499                 new ClosingChannelFutureListener(ctx, promise) :
500                 new ClosingChannelFutureListener(ctx, promise, gracefulShutdownTimeoutMillis, MILLISECONDS);
501     }
502 
503     private void doGracefulShutdown(ChannelHandlerContext ctx, ChannelFuture future, final ChannelPromise promise) {
504         final ChannelFutureListener listener = newClosingChannelFutureListener(ctx, promise);
505         if (isGracefulShutdownComplete()) {
506             
507             
508             future.addListener(listener);
509         } else {
510             
511 
512             
513             
514             if (closeListener == null) {
515                 closeListener = listener;
516             } else if (promise != null) {
517                 final ChannelFutureListener oldCloseListener = closeListener;
518                 closeListener = new ChannelFutureListener() {
519                     @Override
520                     public void operationComplete(ChannelFuture future) throws Exception {
521                         try {
522                             oldCloseListener.operationComplete(future);
523                         } finally {
524                             listener.operationComplete(future);
525                         }
526                     }
527                 };
528             }
529         }
530     }
531 
532     @Override
533     public void deregister(ChannelHandlerContext ctx, ChannelPromise promise) throws Exception {
534         ctx.deregister(promise);
535     }
536 
537     @Override
538     public void read(ChannelHandlerContext ctx) throws Exception {
539         ctx.read();
540     }
541 
542     @Override
543     public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
544         ctx.write(msg, promise);
545     }
546 
547     @Override
548     public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
549         
550         
551         try {
552             
553             channelReadComplete0(ctx);
554         } finally {
555             flush(ctx);
556         }
557     }
558 
559     final void channelReadComplete0(ChannelHandlerContext ctx) {
560         
561         discardSomeReadBytes();
562 
563         
564         
565         
566         if (!ctx.channel().config().isAutoRead()) {
567             ctx.read();
568         }
569 
570         ctx.fireChannelReadComplete();
571     }
572 
573     
574 
575 
576     @Override
577     public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
578         if (getEmbeddedHttp2Exception(cause) != null) {
579             
580             onError(ctx, false, cause);
581         } else {
582             super.exceptionCaught(ctx, cause);
583         }
584     }
585 
586     
587 
588 
589 
590 
591 
592 
593     @Override
594     public void closeStreamLocal(Http2Stream stream, ChannelFuture future) {
595         switch (stream.state()) {
596             case HALF_CLOSED_LOCAL:
597             case OPEN:
598                 stream.closeLocalSide();
599                 break;
600             default:
601                 closeStream(stream, future);
602                 break;
603         }
604     }
605 
606     
607 
608 
609 
610 
611 
612 
613     @Override
614     public void closeStreamRemote(Http2Stream stream, ChannelFuture future) {
615         switch (stream.state()) {
616             case HALF_CLOSED_REMOTE:
617             case OPEN:
618                 stream.closeRemoteSide();
619                 break;
620             default:
621                 closeStream(stream, future);
622                 break;
623         }
624     }
625 
626     @Override
627     public void closeStream(final Http2Stream stream, ChannelFuture future) {
628         if (future.isDone()) {
629             doCloseStream(stream, future);
630         } else {
631             future.addListener(new ChannelFutureListener() {
632                 @Override
633                 public void operationComplete(ChannelFuture future) {
634                     doCloseStream(stream, future);
635                 }
636             });
637         }
638     }
639 
640     
641 
642 
643     @Override
644     public void onError(ChannelHandlerContext ctx, boolean outbound, Throwable cause) {
645         Http2Exception embedded = getEmbeddedHttp2Exception(cause);
646         if (isStreamError(embedded)) {
647             onStreamError(ctx, outbound, cause, (StreamException) embedded);
648         } else if (embedded instanceof CompositeStreamException) {
649             CompositeStreamException compositException = (CompositeStreamException) embedded;
650             for (StreamException streamException : compositException) {
651                 onStreamError(ctx, outbound, cause, streamException);
652             }
653         } else {
654             onConnectionError(ctx, outbound, cause, embedded);
655         }
656         ctx.flush();
657     }
658 
659     
660 
661 
662 
663 
664     protected boolean isGracefulShutdownComplete() {
665         return connection().numActiveStreams() == 0;
666     }
667 
668     
669 
670 
671 
672 
673 
674 
675 
676 
677 
678     protected void onConnectionError(ChannelHandlerContext ctx, boolean outbound,
679                                      Throwable cause, Http2Exception http2Ex) {
680         if (http2Ex == null) {
681             http2Ex = new Http2Exception(INTERNAL_ERROR, cause.getMessage(), cause);
682         }
683 
684         ChannelPromise promise = ctx.newPromise();
685         ChannelFuture future = goAway(ctx, http2Ex, ctx.newPromise());
686         if (http2Ex.shutdownHint() == Http2Exception.ShutdownHint.GRACEFUL_SHUTDOWN) {
687             doGracefulShutdown(ctx, future, promise);
688         } else {
689             future.addListener(newClosingChannelFutureListener(ctx, promise));
690         }
691     }
692 
693     
694 
695 
696 
697 
698 
699 
700 
701 
702     protected void onStreamError(ChannelHandlerContext ctx, boolean outbound,
703                                  @SuppressWarnings("unused") Throwable cause, StreamException http2Ex) {
704         final int streamId = http2Ex.streamId();
705         Http2Stream stream = connection().stream(streamId);
706 
707         
708         if (http2Ex instanceof Http2Exception.HeaderListSizeException &&
709             ((Http2Exception.HeaderListSizeException) http2Ex).duringDecode() &&
710             connection().isServer()) {
711 
712             
713             
714             
715 
716             
717             
718             if (stream == null) {
719                 try {
720                     stream = encoder.connection().remote().createStream(streamId, true);
721                 } catch (Http2Exception e) {
722                     resetUnknownStream(ctx, streamId, http2Ex.error().code(), ctx.newPromise());
723                     return;
724                 }
725             }
726 
727             
728             if (stream != null && !stream.isHeadersSent()) {
729                 try {
730                     handleServerHeaderDecodeSizeError(ctx, stream);
731                 } catch (Throwable cause2) {
732                     onError(ctx, outbound, connectionError(INTERNAL_ERROR, cause2, "Error DecodeSizeError"));
733                 }
734             }
735         }
736 
737         if (stream == null) {
738             if (!outbound || connection().local().mayHaveCreatedStream(streamId)) {
739                 resetUnknownStream(ctx, streamId, http2Ex.error().code(), ctx.newPromise());
740             }
741         } else {
742             resetStream(ctx, stream, http2Ex.error().code(), ctx.newPromise());
743         }
744     }
745 
746     
747 
748 
749 
750 
751 
752 
753     protected void handleServerHeaderDecodeSizeError(ChannelHandlerContext ctx, Http2Stream stream) {
754         encoder().writeHeaders(ctx, stream.id(), HEADERS_TOO_LARGE_HEADERS, 0, true, ctx.newPromise());
755     }
756 
757     protected Http2FrameWriter frameWriter() {
758         return encoder().frameWriter();
759     }
760 
761     
762 
763 
764 
765 
766     private ChannelFuture resetUnknownStream(final ChannelHandlerContext ctx, int streamId, long errorCode,
767                                              ChannelPromise promise) {
768         ChannelFuture future = frameWriter().writeRstStream(ctx, streamId, errorCode, promise);
769         if (future.isDone()) {
770             closeConnectionOnError(ctx, future);
771         } else {
772             future.addListener(new ChannelFutureListener() {
773                 @Override
774                 public void operationComplete(ChannelFuture future) throws Exception {
775                     closeConnectionOnError(ctx, future);
776                 }
777             });
778         }
779         return future;
780     }
781 
782     @Override
783     public ChannelFuture resetStream(final ChannelHandlerContext ctx, int streamId, long errorCode,
784                                      ChannelPromise promise) {
785         final Http2Stream stream = connection().stream(streamId);
786         if (stream == null) {
787             return resetUnknownStream(ctx, streamId, errorCode, promise.unvoid());
788         }
789 
790        return resetStream(ctx, stream, errorCode, promise);
791     }
792 
793     private ChannelFuture resetStream(final ChannelHandlerContext ctx, final Http2Stream stream,
794                                       long errorCode, ChannelPromise promise) {
795         promise = promise.unvoid();
796         if (stream.isResetSent()) {
797             
798             return promise.setSuccess();
799         }
800         
801         
802         
803         
804         
805         stream.resetSent();
806 
807         final ChannelFuture future;
808         
809         
810         if (stream.state() == IDLE ||
811             connection().local().created(stream) && !stream.isHeadersSent() && !stream.isPushPromiseSent()) {
812             future = promise.setSuccess();
813         } else {
814             future = frameWriter().writeRstStream(ctx, stream.id(), errorCode, promise);
815         }
816         if (future.isDone()) {
817             processRstStreamWriteResult(ctx, stream, future);
818         } else {
819             future.addListener(new ChannelFutureListener() {
820                 @Override
821                 public void operationComplete(ChannelFuture future) throws Exception {
822                     processRstStreamWriteResult(ctx, stream, future);
823                 }
824             });
825         }
826 
827         return future;
828     }
829 
830     @Override
831     public ChannelFuture goAway(final ChannelHandlerContext ctx, final int lastStreamId, final long errorCode,
832                                 final ByteBuf debugData, ChannelPromise promise) {
833         promise = promise.unvoid();
834         final Http2Connection connection = connection();
835         try {
836             if (!connection.goAwaySent(lastStreamId, errorCode, debugData)) {
837                 debugData.release();
838                 promise.trySuccess();
839                 return promise;
840             }
841         } catch (Throwable cause) {
842             debugData.release();
843             promise.tryFailure(cause);
844             return promise;
845         }
846 
847         
848         
849         debugData.retain();
850         ChannelFuture future = frameWriter().writeGoAway(ctx, lastStreamId, errorCode, debugData, promise);
851 
852         if (future.isDone()) {
853             processGoAwayWriteResult(ctx, lastStreamId, errorCode, debugData, future);
854         } else {
855             future.addListener(new ChannelFutureListener() {
856                 @Override
857                 public void operationComplete(ChannelFuture future) throws Exception {
858                     processGoAwayWriteResult(ctx, lastStreamId, errorCode, debugData, future);
859                 }
860             });
861         }
862 
863         return future;
864     }
865 
866     
867 
868 
869 
870     private void checkCloseConnection(ChannelFuture future) {
871         
872         
873         if (closeListener != null && isGracefulShutdownComplete()) {
874             ChannelFutureListener closeListener = this.closeListener;
875             
876             
877             this.closeListener = null;
878             try {
879                 closeListener.operationComplete(future);
880             } catch (Exception e) {
881                 throw new IllegalStateException("Close listener threw an unexpected exception", e);
882             }
883         }
884     }
885 
886     
887 
888 
889 
890     private ChannelFuture goAway(ChannelHandlerContext ctx, Http2Exception cause, ChannelPromise promise) {
891         long errorCode = cause != null ? cause.error().code() : NO_ERROR.code();
892         int lastKnownStream;
893         if (cause != null && cause.shutdownHint() == Http2Exception.ShutdownHint.HARD_SHUTDOWN) {
894             
895             
896             
897             
898             lastKnownStream = Integer.MAX_VALUE;
899         } else {
900             lastKnownStream = connection().remote().lastStreamCreated();
901         }
902         return goAway(ctx, lastKnownStream, errorCode, Http2CodecUtil.toByteBuf(ctx, cause), promise);
903     }
904 
905     private void processRstStreamWriteResult(ChannelHandlerContext ctx, Http2Stream stream, ChannelFuture future) {
906         if (future.isSuccess()) {
907             closeStream(stream, future);
908         } else {
909             
910             onConnectionError(ctx, true, future.cause(), null);
911         }
912     }
913 
914     private void closeConnectionOnError(ChannelHandlerContext ctx, ChannelFuture future) {
915         if (!future.isSuccess()) {
916             onConnectionError(ctx, true, future.cause(), null);
917         }
918     }
919 
920     private void doCloseStream(final Http2Stream stream, ChannelFuture future) {
921         stream.close();
922         checkCloseConnection(future);
923     }
924 
925     
926 
927 
928     private static ByteBuf clientPrefaceString(Http2Connection connection) {
929         return connection.isServer() ? connectionPrefaceBuf() : null;
930     }
931 
932     private static void processGoAwayWriteResult(final ChannelHandlerContext ctx, final int lastStreamId,
933                                                  final long errorCode, final ByteBuf debugData, ChannelFuture future) {
934         try {
935             if (future.isSuccess()) {
936                 if (errorCode != NO_ERROR.code()) {
937                     if (logger.isDebugEnabled()) {
938                         logger.debug("{} Sent GOAWAY: lastStreamId '{}', errorCode '{}', " +
939                                      "debugData '{}'. Forcing shutdown of the connection.",
940                                      ctx.channel(), lastStreamId, errorCode, debugData.toString(UTF_8), future.cause());
941                     }
942                     ctx.close();
943                 }
944             } else {
945                 if (logger.isDebugEnabled()) {
946                     logger.debug("{} Sending GOAWAY failed: lastStreamId '{}', errorCode '{}', " +
947                                  "debugData '{}'. Forcing shutdown of the connection.",
948                                  ctx.channel(), lastStreamId, errorCode, debugData.toString(UTF_8), future.cause());
949                 }
950                 ctx.close();
951             }
952         } finally {
953             
954             debugData.release();
955         }
956     }
957 
958     
959 
960 
961     private static final class ClosingChannelFutureListener implements ChannelFutureListener {
962         private final ChannelHandlerContext ctx;
963         private final ChannelPromise promise;
964         private final Future<?> timeoutTask;
965         private boolean closed;
966 
967         ClosingChannelFutureListener(ChannelHandlerContext ctx, ChannelPromise promise) {
968             this.ctx = ctx;
969             this.promise = promise;
970             timeoutTask = null;
971         }
972 
973         ClosingChannelFutureListener(final ChannelHandlerContext ctx, final ChannelPromise promise,
974                                      long timeout, TimeUnit unit) {
975             this.ctx = ctx;
976             this.promise = promise;
977             timeoutTask = ctx.executor().schedule(new Runnable() {
978                 @Override
979                 public void run() {
980                     doClose();
981                 }
982             }, timeout, unit);
983         }
984 
985         @Override
986         public void operationComplete(ChannelFuture sentGoAwayFuture) {
987             if (timeoutTask != null) {
988                 timeoutTask.cancel(false);
989             }
990             doClose();
991         }
992 
993         private void doClose() {
994             
995             
996             if (closed) {
997                 
998                 assert timeoutTask != null;
999                 return;
1000             }
1001             closed = true;
1002             if (promise == null) {
1003                 ctx.close();
1004             } else {
1005                 ctx.close(promise);
1006             }
1007         }
1008     }
1009 }