1
2
3
4
5
6
7
8
9
10
11
12
13
14
15 package io.netty.handler.codec.http2;
16
17 import io.netty.buffer.ByteBuf;
18 import io.netty.buffer.ByteBufUtil;
19 import io.netty.buffer.Unpooled;
20 import io.netty.channel.ChannelFuture;
21 import io.netty.channel.ChannelFutureListener;
22 import io.netty.channel.ChannelHandlerContext;
23 import io.netty.channel.ChannelOutboundHandler;
24 import io.netty.channel.ChannelPromise;
25 import io.netty.handler.codec.ByteToMessageDecoder;
26 import io.netty.handler.codec.http.HttpResponseStatus;
27 import io.netty.handler.codec.http2.Http2Exception.CompositeStreamException;
28 import io.netty.handler.codec.http2.Http2Exception.StreamException;
29 import io.netty.util.CharsetUtil;
30 import io.netty.util.concurrent.Future;
31 import io.netty.util.internal.UnstableApi;
32 import io.netty.util.internal.logging.InternalLogger;
33 import io.netty.util.internal.logging.InternalLoggerFactory;
34
35 import java.net.SocketAddress;
36 import java.util.List;
37 import java.util.concurrent.TimeUnit;
38
39 import static io.netty.buffer.ByteBufUtil.hexDump;
40 import static io.netty.buffer.Unpooled.EMPTY_BUFFER;
41 import static io.netty.handler.codec.http2.Http2CodecUtil.HTTP_UPGRADE_STREAM_ID;
42 import static io.netty.handler.codec.http2.Http2CodecUtil.connectionPrefaceBuf;
43 import static io.netty.handler.codec.http2.Http2CodecUtil.getEmbeddedHttp2Exception;
44 import static io.netty.handler.codec.http2.Http2Error.INTERNAL_ERROR;
45 import static io.netty.handler.codec.http2.Http2Error.NO_ERROR;
46 import static io.netty.handler.codec.http2.Http2Error.PROTOCOL_ERROR;
47 import static io.netty.handler.codec.http2.Http2Exception.connectionError;
48 import static io.netty.handler.codec.http2.Http2Exception.isStreamError;
49 import static io.netty.handler.codec.http2.Http2FrameTypes.SETTINGS;
50 import static io.netty.handler.codec.http2.Http2Stream.State.IDLE;
51 import static io.netty.util.CharsetUtil.UTF_8;
52 import static io.netty.util.internal.ObjectUtil.checkNotNull;
53 import static java.lang.Math.min;
54 import static java.util.concurrent.TimeUnit.MILLISECONDS;
55
56
57
58
59
60
61
62
63
64
65 @UnstableApi
66 public class Http2ConnectionHandler extends ByteToMessageDecoder implements Http2LifecycleManager,
67 ChannelOutboundHandler {
68
69 private static final InternalLogger logger = InternalLoggerFactory.getInstance(Http2ConnectionHandler.class);
70
71 private static final Http2Headers HEADERS_TOO_LARGE_HEADERS = ReadOnlyHttp2Headers.serverHeaders(false,
72 HttpResponseStatus.REQUEST_HEADER_FIELDS_TOO_LARGE.codeAsText());
73 private static final ByteBuf HTTP_1_X_BUF = Unpooled.unreleasableBuffer(
74 Unpooled.wrappedBuffer(new byte[] {'H', 'T', 'T', 'P', '/', '1', '.'})).asReadOnly();
75
76 private final Http2ConnectionDecoder decoder;
77 private final Http2ConnectionEncoder encoder;
78 private final Http2Settings initialSettings;
79 private final boolean decoupleCloseAndGoAway;
80 private final boolean flushPreface;
81 private ChannelFutureListener closeListener;
82 private BaseDecoder byteDecoder;
83 private long gracefulShutdownTimeoutMillis;
84
85 protected Http2ConnectionHandler(Http2ConnectionDecoder decoder, Http2ConnectionEncoder encoder,
86 Http2Settings initialSettings) {
87 this(decoder, encoder, initialSettings, false);
88 }
89
90 protected Http2ConnectionHandler(Http2ConnectionDecoder decoder, Http2ConnectionEncoder encoder,
91 Http2Settings initialSettings, boolean decoupleCloseAndGoAway) {
92 this(decoder, encoder, initialSettings, decoupleCloseAndGoAway, true);
93 }
94
95 protected Http2ConnectionHandler(Http2ConnectionDecoder decoder, Http2ConnectionEncoder encoder,
96 Http2Settings initialSettings, boolean decoupleCloseAndGoAway,
97 boolean flushPreface) {
98 this.initialSettings = checkNotNull(initialSettings, "initialSettings");
99 this.decoder = checkNotNull(decoder, "decoder");
100 this.encoder = checkNotNull(encoder, "encoder");
101 this.decoupleCloseAndGoAway = decoupleCloseAndGoAway;
102 this.flushPreface = flushPreface;
103 if (encoder.connection() != decoder.connection()) {
104 throw new IllegalArgumentException("Encoder and Decoder do not share the same connection object");
105 }
106 }
107
108
109
110
111
112
113 public long gracefulShutdownTimeoutMillis() {
114 return gracefulShutdownTimeoutMillis;
115 }
116
117
118
119
120
121
122
123 public void gracefulShutdownTimeoutMillis(long gracefulShutdownTimeoutMillis) {
124 if (gracefulShutdownTimeoutMillis < -1) {
125 throw new IllegalArgumentException("gracefulShutdownTimeoutMillis: " + gracefulShutdownTimeoutMillis +
126 " (expected: -1 for indefinite or >= 0)");
127 }
128 this.gracefulShutdownTimeoutMillis = gracefulShutdownTimeoutMillis;
129 }
130
131 public Http2Connection connection() {
132 return encoder.connection();
133 }
134
135 public Http2ConnectionDecoder decoder() {
136 return decoder;
137 }
138
139 public Http2ConnectionEncoder encoder() {
140 return encoder;
141 }
142
143 private boolean prefaceSent() {
144 return byteDecoder != null && byteDecoder.prefaceSent();
145 }
146
147
148
149
150
151 public void onHttpClientUpgrade() throws Http2Exception {
152 if (connection().isServer()) {
153 throw connectionError(PROTOCOL_ERROR, "Client-side HTTP upgrade requested for a server");
154 }
155 if (!prefaceSent()) {
156
157
158 throw connectionError(INTERNAL_ERROR, "HTTP upgrade must occur after preface was sent");
159 }
160 if (decoder.prefaceReceived()) {
161 throw connectionError(PROTOCOL_ERROR, "HTTP upgrade must occur before HTTP/2 preface is received");
162 }
163
164
165 connection().local().createStream(HTTP_UPGRADE_STREAM_ID, true);
166 }
167
168
169
170
171
172 public void onHttpServerUpgrade(Http2Settings settings) throws Http2Exception {
173 if (!connection().isServer()) {
174 throw connectionError(PROTOCOL_ERROR, "Server-side HTTP upgrade requested for a client");
175 }
176 if (!prefaceSent()) {
177
178
179 throw connectionError(INTERNAL_ERROR, "HTTP upgrade must occur after preface was sent");
180 }
181 if (decoder.prefaceReceived()) {
182 throw connectionError(PROTOCOL_ERROR, "HTTP upgrade must occur before HTTP/2 preface is received");
183 }
184
185
186 encoder.remoteSettings(settings);
187
188
189 connection().remote().createStream(HTTP_UPGRADE_STREAM_ID, true);
190 }
191
192 @Override
193 public void flush(ChannelHandlerContext ctx) {
194 try {
195
196 encoder.flowController().writePendingBytes();
197 ctx.flush();
198 } catch (Http2Exception e) {
199 onError(ctx, true, e);
200 } catch (Throwable cause) {
201 onError(ctx, true, connectionError(INTERNAL_ERROR, cause, "Error flushing"));
202 }
203 }
204
205 private abstract class BaseDecoder {
206 public abstract void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception;
207 public void handlerRemoved(ChannelHandlerContext ctx) throws Exception { }
208 public void channelActive(ChannelHandlerContext ctx) throws Exception { }
209
210 public void channelInactive(ChannelHandlerContext ctx) throws Exception {
211
212 encoder().close();
213 decoder().close();
214
215
216
217 connection().close(ctx.voidPromise());
218 }
219
220
221
222
223 public boolean prefaceSent() {
224 return true;
225 }
226 }
227
228 private final class PrefaceDecoder extends BaseDecoder {
229 private ByteBuf clientPrefaceString;
230 private boolean prefaceSent;
231
232 PrefaceDecoder(ChannelHandlerContext ctx) throws Exception {
233 clientPrefaceString = clientPrefaceString(encoder.connection());
234
235
236 sendPreface(ctx);
237 }
238
239 @Override
240 public boolean prefaceSent() {
241 return prefaceSent;
242 }
243
244 @Override
245 public void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {
246 try {
247 if (ctx.channel().isActive() && readClientPrefaceString(in) && verifyFirstFrameIsSettings(in)) {
248
249 byteDecoder = new FrameDecoder();
250 byteDecoder.decode(ctx, in, out);
251 }
252 } catch (Throwable e) {
253 onError(ctx, false, e);
254 }
255 }
256
257 @Override
258 public void channelActive(ChannelHandlerContext ctx) throws Exception {
259
260 sendPreface(ctx);
261
262 if (flushPreface) {
263
264
265
266 ctx.flush();
267 }
268 }
269
270 @Override
271 public void channelInactive(ChannelHandlerContext ctx) throws Exception {
272 cleanup();
273 super.channelInactive(ctx);
274 }
275
276
277
278
279 @Override
280 public void handlerRemoved(ChannelHandlerContext ctx) throws Exception {
281 cleanup();
282 }
283
284
285
286
287 private void cleanup() {
288 if (clientPrefaceString != null) {
289 clientPrefaceString.release();
290 clientPrefaceString = null;
291 }
292 }
293
294
295
296
297
298
299
300 private boolean readClientPrefaceString(ByteBuf in) throws Http2Exception {
301 if (clientPrefaceString == null) {
302 return true;
303 }
304
305 int prefaceRemaining = clientPrefaceString.readableBytes();
306 int bytesRead = min(in.readableBytes(), prefaceRemaining);
307
308
309 if (bytesRead == 0 || !ByteBufUtil.equals(in, in.readerIndex(),
310 clientPrefaceString, clientPrefaceString.readerIndex(),
311 bytesRead)) {
312 int maxSearch = 1024;
313 int http1Index =
314 ByteBufUtil.indexOf(HTTP_1_X_BUF, in.slice(in.readerIndex(), min(in.readableBytes(), maxSearch)));
315 if (http1Index != -1) {
316 String chunk = in.toString(in.readerIndex(), http1Index - in.readerIndex(), CharsetUtil.US_ASCII);
317 throw connectionError(PROTOCOL_ERROR, "Unexpected HTTP/1.x request: %s", chunk);
318 }
319 String receivedBytes = hexDump(in, in.readerIndex(),
320 min(in.readableBytes(), clientPrefaceString.readableBytes()));
321 throw connectionError(PROTOCOL_ERROR, "HTTP/2 client preface string missing or corrupt. " +
322 "Hex dump for received bytes: %s", receivedBytes);
323 }
324 in.skipBytes(bytesRead);
325 clientPrefaceString.skipBytes(bytesRead);
326
327 if (!clientPrefaceString.isReadable()) {
328
329 clientPrefaceString.release();
330 clientPrefaceString = null;
331 return true;
332 }
333 return false;
334 }
335
336
337
338
339
340
341
342
343
344 private boolean verifyFirstFrameIsSettings(ByteBuf in) throws Http2Exception {
345 if (in.readableBytes() < 5) {
346
347 return false;
348 }
349
350 short frameType = in.getUnsignedByte(in.readerIndex() + 3);
351 short flags = in.getUnsignedByte(in.readerIndex() + 4);
352 if (frameType != SETTINGS || (flags & Http2Flags.ACK) != 0) {
353 throw connectionError(PROTOCOL_ERROR, "First received frame was not SETTINGS. " +
354 "Hex dump for first 5 bytes: %s",
355 hexDump(in, in.readerIndex(), 5));
356 }
357 return true;
358 }
359
360
361
362
363 private void sendPreface(ChannelHandlerContext ctx) throws Exception {
364 if (prefaceSent || !ctx.channel().isActive()) {
365 return;
366 }
367
368 prefaceSent = true;
369
370 final boolean isClient = !connection().isServer();
371 if (isClient) {
372
373 ctx.write(connectionPrefaceBuf()).addListener(ChannelFutureListener.CLOSE_ON_FAILURE);
374 }
375
376
377 encoder.writeSettings(ctx, initialSettings, ctx.newPromise()).addListener(
378 ChannelFutureListener.CLOSE_ON_FAILURE);
379
380 if (isClient) {
381
382
383
384 userEventTriggered(ctx, Http2ConnectionPrefaceAndSettingsFrameWrittenEvent.INSTANCE);
385 }
386 }
387 }
388
389 private final class FrameDecoder extends BaseDecoder {
390 @Override
391 public void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {
392 try {
393 decoder.decodeFrame(ctx, in, out);
394 } catch (Throwable e) {
395 onError(ctx, false, e);
396 }
397 }
398 }
399
400 @Override
401 public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
402
403 encoder.lifecycleManager(this);
404 decoder.lifecycleManager(this);
405 encoder.flowController().channelHandlerContext(ctx);
406 decoder.flowController().channelHandlerContext(ctx);
407 byteDecoder = new PrefaceDecoder(ctx);
408 }
409
410 @Override
411 protected void handlerRemoved0(ChannelHandlerContext ctx) throws Exception {
412 if (byteDecoder != null) {
413 byteDecoder.handlerRemoved(ctx);
414 byteDecoder = null;
415 }
416 }
417
418 @Override
419 public void channelActive(ChannelHandlerContext ctx) throws Exception {
420 if (byteDecoder == null) {
421 byteDecoder = new PrefaceDecoder(ctx);
422 }
423 byteDecoder.channelActive(ctx);
424 super.channelActive(ctx);
425 }
426
427 @Override
428 public void channelInactive(ChannelHandlerContext ctx) throws Exception {
429
430 super.channelInactive(ctx);
431 if (byteDecoder != null) {
432 byteDecoder.channelInactive(ctx);
433 byteDecoder = null;
434 }
435 }
436
437 @Override
438 public void channelWritabilityChanged(ChannelHandlerContext ctx) throws Exception {
439
440
441 try {
442 if (ctx.channel().isWritable()) {
443 flush(ctx);
444 }
445 encoder.flowController().channelWritabilityChanged();
446 } finally {
447 super.channelWritabilityChanged(ctx);
448 }
449 }
450
451 @Override
452 protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {
453 byteDecoder.decode(ctx, in, out);
454 }
455
456 @Override
457 public void bind(ChannelHandlerContext ctx, SocketAddress localAddress, ChannelPromise promise) throws Exception {
458 ctx.bind(localAddress, promise);
459 }
460
461 @Override
462 public void connect(ChannelHandlerContext ctx, SocketAddress remoteAddress, SocketAddress localAddress,
463 ChannelPromise promise) throws Exception {
464 ctx.connect(remoteAddress, localAddress, promise);
465 }
466
467 @Override
468 public void disconnect(ChannelHandlerContext ctx, ChannelPromise promise) throws Exception {
469 ctx.disconnect(promise);
470 }
471
472 @Override
473 public void close(ChannelHandlerContext ctx, ChannelPromise promise) throws Exception {
474 if (decoupleCloseAndGoAway) {
475 ctx.close(promise);
476 return;
477 }
478 promise = promise.unvoid();
479
480 if (!ctx.channel().isActive() || !prefaceSent()) {
481 ctx.close(promise);
482 return;
483 }
484
485
486
487
488
489
490 ChannelFuture f = connection().goAwaySent() ? ctx.write(EMPTY_BUFFER) : goAway(ctx, null, ctx.newPromise());
491 ctx.flush();
492 doGracefulShutdown(ctx, f, promise);
493 }
494
495 private ChannelFutureListener newClosingChannelFutureListener(
496 ChannelHandlerContext ctx, ChannelPromise promise) {
497 long gracefulShutdownTimeoutMillis = this.gracefulShutdownTimeoutMillis;
498 return gracefulShutdownTimeoutMillis < 0 ?
499 new ClosingChannelFutureListener(ctx, promise) :
500 new ClosingChannelFutureListener(ctx, promise, gracefulShutdownTimeoutMillis, MILLISECONDS);
501 }
502
503 private void doGracefulShutdown(ChannelHandlerContext ctx, ChannelFuture future, final ChannelPromise promise) {
504 final ChannelFutureListener listener = newClosingChannelFutureListener(ctx, promise);
505 if (isGracefulShutdownComplete()) {
506
507
508 future.addListener(listener);
509 } else {
510
511
512
513
514 if (closeListener == null) {
515 closeListener = listener;
516 } else if (promise != null) {
517 final ChannelFutureListener oldCloseListener = closeListener;
518 closeListener = new ChannelFutureListener() {
519 @Override
520 public void operationComplete(ChannelFuture future) throws Exception {
521 try {
522 oldCloseListener.operationComplete(future);
523 } finally {
524 listener.operationComplete(future);
525 }
526 }
527 };
528 }
529 }
530 }
531
532 @Override
533 public void deregister(ChannelHandlerContext ctx, ChannelPromise promise) throws Exception {
534 ctx.deregister(promise);
535 }
536
537 @Override
538 public void read(ChannelHandlerContext ctx) throws Exception {
539 ctx.read();
540 }
541
542 @Override
543 public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
544 ctx.write(msg, promise);
545 }
546
547 @Override
548 public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
549
550
551 try {
552
553 channelReadComplete0(ctx);
554 } finally {
555 flush(ctx);
556 }
557 }
558
559 final void channelReadComplete0(ChannelHandlerContext ctx) {
560
561 discardSomeReadBytes();
562
563
564
565
566 if (!ctx.channel().config().isAutoRead()) {
567 ctx.read();
568 }
569
570 ctx.fireChannelReadComplete();
571 }
572
573
574
575
576 @Override
577 public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
578 if (getEmbeddedHttp2Exception(cause) != null) {
579
580 onError(ctx, false, cause);
581 } else {
582 super.exceptionCaught(ctx, cause);
583 }
584 }
585
586
587
588
589
590
591
592
593 @Override
594 public void closeStreamLocal(Http2Stream stream, ChannelFuture future) {
595 switch (stream.state()) {
596 case HALF_CLOSED_LOCAL:
597 case OPEN:
598 stream.closeLocalSide();
599 break;
600 default:
601 closeStream(stream, future);
602 break;
603 }
604 }
605
606
607
608
609
610
611
612
613 @Override
614 public void closeStreamRemote(Http2Stream stream, ChannelFuture future) {
615 switch (stream.state()) {
616 case HALF_CLOSED_REMOTE:
617 case OPEN:
618 stream.closeRemoteSide();
619 break;
620 default:
621 closeStream(stream, future);
622 break;
623 }
624 }
625
626 @Override
627 public void closeStream(final Http2Stream stream, ChannelFuture future) {
628 if (future.isDone()) {
629 doCloseStream(stream, future);
630 } else {
631 future.addListener(new ChannelFutureListener() {
632 @Override
633 public void operationComplete(ChannelFuture future) {
634 doCloseStream(stream, future);
635 }
636 });
637 }
638 }
639
640
641
642
643 @Override
644 public void onError(ChannelHandlerContext ctx, boolean outbound, Throwable cause) {
645 Http2Exception embedded = getEmbeddedHttp2Exception(cause);
646 if (isStreamError(embedded)) {
647 onStreamError(ctx, outbound, cause, (StreamException) embedded);
648 } else if (embedded instanceof CompositeStreamException) {
649 CompositeStreamException compositException = (CompositeStreamException) embedded;
650 for (StreamException streamException : compositException) {
651 onStreamError(ctx, outbound, cause, streamException);
652 }
653 } else {
654 onConnectionError(ctx, outbound, cause, embedded);
655 }
656 ctx.flush();
657 }
658
659
660
661
662
663
664 protected boolean isGracefulShutdownComplete() {
665 return connection().numActiveStreams() == 0;
666 }
667
668
669
670
671
672
673
674
675
676
677
678 protected void onConnectionError(ChannelHandlerContext ctx, boolean outbound,
679 Throwable cause, Http2Exception http2Ex) {
680 if (http2Ex == null) {
681 http2Ex = new Http2Exception(INTERNAL_ERROR, cause.getMessage(), cause);
682 }
683
684 ChannelPromise promise = ctx.newPromise();
685 ChannelFuture future = goAway(ctx, http2Ex, ctx.newPromise());
686 if (http2Ex.shutdownHint() == Http2Exception.ShutdownHint.GRACEFUL_SHUTDOWN) {
687 doGracefulShutdown(ctx, future, promise);
688 } else {
689 future.addListener(newClosingChannelFutureListener(ctx, promise));
690 }
691 }
692
693
694
695
696
697
698
699
700
701
702 protected void onStreamError(ChannelHandlerContext ctx, boolean outbound,
703 @SuppressWarnings("unused") Throwable cause, StreamException http2Ex) {
704 final int streamId = http2Ex.streamId();
705 Http2Stream stream = connection().stream(streamId);
706
707
708 if (http2Ex instanceof Http2Exception.HeaderListSizeException &&
709 ((Http2Exception.HeaderListSizeException) http2Ex).duringDecode() &&
710 connection().isServer()) {
711
712
713
714
715
716
717
718 if (stream == null) {
719 try {
720 stream = encoder.connection().remote().createStream(streamId, true);
721 } catch (Http2Exception e) {
722 resetUnknownStream(ctx, streamId, http2Ex.error().code(), ctx.newPromise());
723 return;
724 }
725 }
726
727
728 if (stream != null && !stream.isHeadersSent()) {
729 try {
730 handleServerHeaderDecodeSizeError(ctx, stream);
731 } catch (Throwable cause2) {
732 onError(ctx, outbound, connectionError(INTERNAL_ERROR, cause2, "Error DecodeSizeError"));
733 }
734 }
735 }
736
737 if (stream == null) {
738 if (!outbound || connection().local().mayHaveCreatedStream(streamId)) {
739 resetUnknownStream(ctx, streamId, http2Ex.error().code(), ctx.newPromise());
740 }
741 } else {
742 resetStream(ctx, stream, http2Ex.error().code(), ctx.newPromise());
743 }
744 }
745
746
747
748
749
750
751
752
753 protected void handleServerHeaderDecodeSizeError(ChannelHandlerContext ctx, Http2Stream stream) {
754 encoder().writeHeaders(ctx, stream.id(), HEADERS_TOO_LARGE_HEADERS, 0, true, ctx.newPromise());
755 }
756
757 protected Http2FrameWriter frameWriter() {
758 return encoder().frameWriter();
759 }
760
761
762
763
764
765
766 private ChannelFuture resetUnknownStream(final ChannelHandlerContext ctx, int streamId, long errorCode,
767 ChannelPromise promise) {
768 ChannelFuture future = frameWriter().writeRstStream(ctx, streamId, errorCode, promise);
769 if (future.isDone()) {
770 closeConnectionOnError(ctx, future);
771 } else {
772 future.addListener(new ChannelFutureListener() {
773 @Override
774 public void operationComplete(ChannelFuture future) throws Exception {
775 closeConnectionOnError(ctx, future);
776 }
777 });
778 }
779 return future;
780 }
781
782 @Override
783 public ChannelFuture resetStream(final ChannelHandlerContext ctx, int streamId, long errorCode,
784 ChannelPromise promise) {
785 final Http2Stream stream = connection().stream(streamId);
786 if (stream == null) {
787 return resetUnknownStream(ctx, streamId, errorCode, promise.unvoid());
788 }
789
790 return resetStream(ctx, stream, errorCode, promise);
791 }
792
793 private ChannelFuture resetStream(final ChannelHandlerContext ctx, final Http2Stream stream,
794 long errorCode, ChannelPromise promise) {
795 promise = promise.unvoid();
796 if (stream.isResetSent()) {
797
798 return promise.setSuccess();
799 }
800
801
802
803
804
805 stream.resetSent();
806
807 final ChannelFuture future;
808
809
810 if (stream.state() == IDLE ||
811 connection().local().created(stream) && !stream.isHeadersSent() && !stream.isPushPromiseSent()) {
812 future = promise.setSuccess();
813 } else {
814 future = frameWriter().writeRstStream(ctx, stream.id(), errorCode, promise);
815 }
816 if (future.isDone()) {
817 processRstStreamWriteResult(ctx, stream, future);
818 } else {
819 future.addListener(new ChannelFutureListener() {
820 @Override
821 public void operationComplete(ChannelFuture future) throws Exception {
822 processRstStreamWriteResult(ctx, stream, future);
823 }
824 });
825 }
826
827 return future;
828 }
829
830 @Override
831 public ChannelFuture goAway(final ChannelHandlerContext ctx, final int lastStreamId, final long errorCode,
832 final ByteBuf debugData, ChannelPromise promise) {
833 promise = promise.unvoid();
834 final Http2Connection connection = connection();
835 try {
836 if (!connection.goAwaySent(lastStreamId, errorCode, debugData)) {
837 debugData.release();
838 promise.trySuccess();
839 return promise;
840 }
841 } catch (Throwable cause) {
842 debugData.release();
843 promise.tryFailure(cause);
844 return promise;
845 }
846
847
848
849 debugData.retain();
850 ChannelFuture future = frameWriter().writeGoAway(ctx, lastStreamId, errorCode, debugData, promise);
851
852 if (future.isDone()) {
853 processGoAwayWriteResult(ctx, lastStreamId, errorCode, debugData, future);
854 } else {
855 future.addListener(new ChannelFutureListener() {
856 @Override
857 public void operationComplete(ChannelFuture future) throws Exception {
858 processGoAwayWriteResult(ctx, lastStreamId, errorCode, debugData, future);
859 }
860 });
861 }
862
863 return future;
864 }
865
866
867
868
869
870 private void checkCloseConnection(ChannelFuture future) {
871
872
873 if (closeListener != null && isGracefulShutdownComplete()) {
874 ChannelFutureListener closeListener = this.closeListener;
875
876
877 this.closeListener = null;
878 try {
879 closeListener.operationComplete(future);
880 } catch (Exception e) {
881 throw new IllegalStateException("Close listener threw an unexpected exception", e);
882 }
883 }
884 }
885
886
887
888
889
890 private ChannelFuture goAway(ChannelHandlerContext ctx, Http2Exception cause, ChannelPromise promise) {
891 long errorCode = cause != null ? cause.error().code() : NO_ERROR.code();
892 int lastKnownStream;
893 if (cause != null && cause.shutdownHint() == Http2Exception.ShutdownHint.HARD_SHUTDOWN) {
894
895
896
897
898 lastKnownStream = Integer.MAX_VALUE;
899 } else {
900 lastKnownStream = connection().remote().lastStreamCreated();
901 }
902 return goAway(ctx, lastKnownStream, errorCode, Http2CodecUtil.toByteBuf(ctx, cause), promise);
903 }
904
905 private void processRstStreamWriteResult(ChannelHandlerContext ctx, Http2Stream stream, ChannelFuture future) {
906 if (future.isSuccess()) {
907 closeStream(stream, future);
908 } else {
909
910 onConnectionError(ctx, true, future.cause(), null);
911 }
912 }
913
914 private void closeConnectionOnError(ChannelHandlerContext ctx, ChannelFuture future) {
915 if (!future.isSuccess()) {
916 onConnectionError(ctx, true, future.cause(), null);
917 }
918 }
919
920 private void doCloseStream(final Http2Stream stream, ChannelFuture future) {
921 stream.close();
922 checkCloseConnection(future);
923 }
924
925
926
927
928 private static ByteBuf clientPrefaceString(Http2Connection connection) {
929 return connection.isServer() ? connectionPrefaceBuf() : null;
930 }
931
932 private static void processGoAwayWriteResult(final ChannelHandlerContext ctx, final int lastStreamId,
933 final long errorCode, final ByteBuf debugData, ChannelFuture future) {
934 try {
935 if (future.isSuccess()) {
936 if (errorCode != NO_ERROR.code()) {
937 if (logger.isDebugEnabled()) {
938 logger.debug("{} Sent GOAWAY: lastStreamId '{}', errorCode '{}', " +
939 "debugData '{}'. Forcing shutdown of the connection.",
940 ctx.channel(), lastStreamId, errorCode, debugData.toString(UTF_8), future.cause());
941 }
942 ctx.close();
943 }
944 } else {
945 if (logger.isDebugEnabled()) {
946 logger.debug("{} Sending GOAWAY failed: lastStreamId '{}', errorCode '{}', " +
947 "debugData '{}'. Forcing shutdown of the connection.",
948 ctx.channel(), lastStreamId, errorCode, debugData.toString(UTF_8), future.cause());
949 }
950 ctx.close();
951 }
952 } finally {
953
954 debugData.release();
955 }
956 }
957
958
959
960
961 private static final class ClosingChannelFutureListener implements ChannelFutureListener {
962 private final ChannelHandlerContext ctx;
963 private final ChannelPromise promise;
964 private final Future<?> timeoutTask;
965 private boolean closed;
966
967 ClosingChannelFutureListener(ChannelHandlerContext ctx, ChannelPromise promise) {
968 this.ctx = ctx;
969 this.promise = promise;
970 timeoutTask = null;
971 }
972
973 ClosingChannelFutureListener(final ChannelHandlerContext ctx, final ChannelPromise promise,
974 long timeout, TimeUnit unit) {
975 this.ctx = ctx;
976 this.promise = promise;
977 timeoutTask = ctx.executor().schedule(new Runnable() {
978 @Override
979 public void run() {
980 doClose();
981 }
982 }, timeout, unit);
983 }
984
985 @Override
986 public void operationComplete(ChannelFuture sentGoAwayFuture) {
987 if (timeoutTask != null) {
988 timeoutTask.cancel(false);
989 }
990 doClose();
991 }
992
993 private void doClose() {
994
995
996 if (closed) {
997
998 assert timeoutTask != null;
999 return;
1000 }
1001 closed = true;
1002 if (promise == null) {
1003 ctx.close();
1004 } else {
1005 ctx.close(promise);
1006 }
1007 }
1008 }
1009 }