1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16 package io.netty.handler.codec.http2;
17
18 import io.netty.buffer.ByteBuf;
19 import io.netty.channel.ChannelFuture;
20 import io.netty.channel.ChannelPromise;
21 import io.netty.handler.codec.http2.Http2Stream.State;
22 import io.netty.util.collection.IntObjectHashMap;
23 import io.netty.util.collection.IntObjectMap;
24 import io.netty.util.collection.IntObjectMap.PrimitiveEntry;
25 import io.netty.util.concurrent.Future;
26 import io.netty.util.concurrent.Promise;
27 import io.netty.util.concurrent.PromiseNotifier;
28 import io.netty.util.internal.EmptyArrays;
29 import io.netty.util.internal.UnstableApi;
30 import io.netty.util.internal.logging.InternalLogger;
31 import io.netty.util.internal.logging.InternalLoggerFactory;
32
33 import java.util.ArrayDeque;
34 import java.util.ArrayList;
35 import java.util.Arrays;
36 import java.util.Iterator;
37 import java.util.LinkedHashSet;
38 import java.util.List;
39 import java.util.Queue;
40 import java.util.Set;
41
42 import static io.netty.handler.codec.http2.Http2CodecUtil.CONNECTION_STREAM_ID;
43 import static io.netty.handler.codec.http2.Http2CodecUtil.DEFAULT_MAX_RESERVED_STREAMS;
44 import static io.netty.handler.codec.http2.Http2Error.INTERNAL_ERROR;
45 import static io.netty.handler.codec.http2.Http2Error.PROTOCOL_ERROR;
46 import static io.netty.handler.codec.http2.Http2Error.REFUSED_STREAM;
47 import static io.netty.handler.codec.http2.Http2Exception.closedStreamError;
48 import static io.netty.handler.codec.http2.Http2Exception.connectionError;
49 import static io.netty.handler.codec.http2.Http2Exception.streamError;
50 import static io.netty.handler.codec.http2.Http2Stream.State.CLOSED;
51 import static io.netty.handler.codec.http2.Http2Stream.State.HALF_CLOSED_LOCAL;
52 import static io.netty.handler.codec.http2.Http2Stream.State.HALF_CLOSED_REMOTE;
53 import static io.netty.handler.codec.http2.Http2Stream.State.IDLE;
54 import static io.netty.handler.codec.http2.Http2Stream.State.OPEN;
55 import static io.netty.handler.codec.http2.Http2Stream.State.RESERVED_LOCAL;
56 import static io.netty.handler.codec.http2.Http2Stream.State.RESERVED_REMOTE;
57 import static io.netty.util.internal.ObjectUtil.checkNotNull;
58 import static io.netty.util.internal.ObjectUtil.checkPositiveOrZero;
59 import static java.lang.Integer.MAX_VALUE;
60
61
62
63
64 @UnstableApi
65 public class DefaultHttp2Connection implements Http2Connection {
66 private static final InternalLogger logger = InternalLoggerFactory.getInstance(DefaultHttp2Connection.class);
67
68 final IntObjectMap<Http2Stream> streamMap = new IntObjectHashMap<Http2Stream>();
69 final PropertyKeyRegistry propertyKeyRegistry = new PropertyKeyRegistry();
70 final ConnectionStream connectionStream = new ConnectionStream();
71 final DefaultEndpoint<Http2LocalFlowController> localEndpoint;
72 final DefaultEndpoint<Http2RemoteFlowController> remoteEndpoint;
73
74
75
76
77
78
79
80
81
82 final List<Listener> listeners = new ArrayList<Listener>(4);
83 final ActiveStreams activeStreams;
84 Promise<Void> closePromise;
85
86
87
88
89
90 public DefaultHttp2Connection(boolean server) {
91 this(server, DEFAULT_MAX_RESERVED_STREAMS);
92 }
93
94
95
96
97
98
99 public DefaultHttp2Connection(boolean server, int maxReservedStreams) {
100 activeStreams = new ActiveStreams(listeners);
101
102
103
104
105
106 localEndpoint = new DefaultEndpoint<Http2LocalFlowController>(server, server ? MAX_VALUE : maxReservedStreams);
107 remoteEndpoint = new DefaultEndpoint<Http2RemoteFlowController>(!server, maxReservedStreams);
108
109
110 streamMap.put(connectionStream.id(), connectionStream);
111 }
112
113
114
115
116 final boolean isClosed() {
117 return closePromise != null;
118 }
119
120 @Override
121 public Future<Void> close(final Promise<Void> promise) {
122 checkNotNull(promise, "promise");
123
124
125 if (closePromise != null) {
126 if (closePromise == promise) {
127
128 } else if (promise instanceof ChannelPromise && ((ChannelFuture) closePromise).isVoid()) {
129 closePromise = promise;
130 } else {
131 PromiseNotifier.cascade(closePromise, promise);
132 }
133 } else {
134 closePromise = promise;
135 }
136 if (isStreamMapEmpty()) {
137 promise.trySuccess(null);
138 return promise;
139 }
140
141 Iterator<PrimitiveEntry<Http2Stream>> itr = streamMap.entries().iterator();
142
143
144 if (activeStreams.allowModifications()) {
145 activeStreams.incrementPendingIterations();
146 try {
147 while (itr.hasNext()) {
148 DefaultStream stream = (DefaultStream) itr.next().value();
149 if (stream.id() != CONNECTION_STREAM_ID) {
150
151
152
153 stream.close(itr);
154 }
155 }
156 } finally {
157 activeStreams.decrementPendingIterations();
158 }
159 } else {
160 while (itr.hasNext()) {
161 Http2Stream stream = itr.next().value();
162 if (stream.id() != CONNECTION_STREAM_ID) {
163
164
165 stream.close();
166 }
167 }
168 }
169 return closePromise;
170 }
171
172 @Override
173 public void addListener(Listener listener) {
174 listeners.add(listener);
175 }
176
177 @Override
178 public void removeListener(Listener listener) {
179 listeners.remove(listener);
180 }
181
182 @Override
183 public boolean isServer() {
184 return localEndpoint.isServer();
185 }
186
187 @Override
188 public Http2Stream connectionStream() {
189 return connectionStream;
190 }
191
192 @Override
193 public Http2Stream stream(int streamId) {
194 return streamMap.get(streamId);
195 }
196
197 @Override
198 public boolean streamMayHaveExisted(int streamId) {
199 return remoteEndpoint.mayHaveCreatedStream(streamId) || localEndpoint.mayHaveCreatedStream(streamId);
200 }
201
202 @Override
203 public int numActiveStreams() {
204 return activeStreams.size();
205 }
206
207 @Override
208 public Http2Stream forEachActiveStream(Http2StreamVisitor visitor) throws Http2Exception {
209 return activeStreams.forEachActiveStream(visitor);
210 }
211
212 @Override
213 public Endpoint<Http2LocalFlowController> local() {
214 return localEndpoint;
215 }
216
217 @Override
218 public Endpoint<Http2RemoteFlowController> remote() {
219 return remoteEndpoint;
220 }
221
222 @Override
223 public boolean goAwayReceived() {
224 return localEndpoint.lastStreamKnownByPeer >= 0;
225 }
226
227 @Override
228 public void goAwayReceived(final int lastKnownStream, long errorCode, ByteBuf debugData) throws Http2Exception {
229 if (localEndpoint.lastStreamKnownByPeer() >= 0 && localEndpoint.lastStreamKnownByPeer() < lastKnownStream) {
230 throw connectionError(PROTOCOL_ERROR, "lastStreamId MUST NOT increase. Current value: %d new value: %d",
231 localEndpoint.lastStreamKnownByPeer(), lastKnownStream);
232 }
233
234 localEndpoint.lastStreamKnownByPeer(lastKnownStream);
235 for (int i = 0; i < listeners.size(); ++i) {
236 try {
237 listeners.get(i).onGoAwayReceived(lastKnownStream, errorCode, debugData);
238 } catch (Throwable cause) {
239 logger.error("Caught Throwable from listener onGoAwayReceived.", cause);
240 }
241 }
242
243 closeStreamsGreaterThanLastKnownStreamId(lastKnownStream, localEndpoint);
244 }
245
246 @Override
247 public boolean goAwaySent() {
248 return remoteEndpoint.lastStreamKnownByPeer >= 0;
249 }
250
251 @Override
252 public boolean goAwaySent(final int lastKnownStream, long errorCode, ByteBuf debugData) throws Http2Exception {
253 if (remoteEndpoint.lastStreamKnownByPeer() >= 0) {
254
255
256 if (lastKnownStream == remoteEndpoint.lastStreamKnownByPeer()) {
257 return false;
258 }
259 if (lastKnownStream > remoteEndpoint.lastStreamKnownByPeer()) {
260 throw connectionError(PROTOCOL_ERROR, "Last stream identifier must not increase between " +
261 "sending multiple GOAWAY frames (was '%d', is '%d').",
262 remoteEndpoint.lastStreamKnownByPeer(), lastKnownStream);
263 }
264 }
265
266 remoteEndpoint.lastStreamKnownByPeer(lastKnownStream);
267 for (int i = 0; i < listeners.size(); ++i) {
268 try {
269 listeners.get(i).onGoAwaySent(lastKnownStream, errorCode, debugData);
270 } catch (Throwable cause) {
271 logger.error("Caught Throwable from listener onGoAwaySent.", cause);
272 }
273 }
274
275 closeStreamsGreaterThanLastKnownStreamId(lastKnownStream, remoteEndpoint);
276 return true;
277 }
278
279 private void closeStreamsGreaterThanLastKnownStreamId(final int lastKnownStream,
280 final DefaultEndpoint<?> endpoint) throws Http2Exception {
281 forEachActiveStream(new Http2StreamVisitor() {
282 @Override
283 public boolean visit(Http2Stream stream) {
284 if (stream.id() > lastKnownStream && endpoint.isValidStreamId(stream.id())) {
285 stream.close();
286 }
287 return true;
288 }
289 });
290 }
291
292
293
294
295 private boolean isStreamMapEmpty() {
296 return streamMap.size() == 1;
297 }
298
299
300
301
302
303
304
305 void removeStream(DefaultStream stream, Iterator<?> itr) {
306 final boolean removed;
307 if (itr == null) {
308 removed = streamMap.remove(stream.id()) != null;
309 } else {
310 itr.remove();
311 removed = true;
312 }
313
314 if (removed) {
315 for (int i = 0; i < listeners.size(); i++) {
316 try {
317 listeners.get(i).onStreamRemoved(stream);
318 } catch (Throwable cause) {
319 logger.error("Caught Throwable from listener onStreamRemoved.", cause);
320 }
321 }
322
323 if (closePromise != null && isStreamMapEmpty()) {
324 closePromise.trySuccess(null);
325 }
326 }
327 }
328
329 static State activeState(int streamId, State initialState, boolean isLocal, boolean halfClosed)
330 throws Http2Exception {
331 switch (initialState) {
332 case IDLE:
333 return halfClosed ? isLocal ? HALF_CLOSED_LOCAL : HALF_CLOSED_REMOTE : OPEN;
334 case RESERVED_LOCAL:
335 return HALF_CLOSED_REMOTE;
336 case RESERVED_REMOTE:
337 return HALF_CLOSED_LOCAL;
338 default:
339 throw streamError(streamId, PROTOCOL_ERROR, "Attempting to open a stream in an invalid state: "
340 + initialState);
341 }
342 }
343
344 void notifyHalfClosed(Http2Stream stream) {
345 for (int i = 0; i < listeners.size(); i++) {
346 try {
347 listeners.get(i).onStreamHalfClosed(stream);
348 } catch (Throwable cause) {
349 logger.error("Caught Throwable from listener onStreamHalfClosed.", cause);
350 }
351 }
352 }
353
354 void notifyClosed(Http2Stream stream) {
355 for (int i = 0; i < listeners.size(); i++) {
356 try {
357 listeners.get(i).onStreamClosed(stream);
358 } catch (Throwable cause) {
359 logger.error("Caught Throwable from listener onStreamClosed.", cause);
360 }
361 }
362 }
363
364 @Override
365 public PropertyKey newKey() {
366 return propertyKeyRegistry.newKey();
367 }
368
369
370
371
372
373
374
375
376 final DefaultPropertyKey verifyKey(PropertyKey key) {
377 return checkNotNull((DefaultPropertyKey) key, "key").verifyConnection(this);
378 }
379
380
381
382
383 private class DefaultStream implements Http2Stream {
384 private static final byte META_STATE_SENT_RST = 1;
385 private static final byte META_STATE_SENT_HEADERS = 1 << 1;
386 private static final byte META_STATE_SENT_TRAILERS = 1 << 2;
387 private static final byte META_STATE_SENT_PUSHPROMISE = 1 << 3;
388 private static final byte META_STATE_RECV_HEADERS = 1 << 4;
389 private static final byte META_STATE_RECV_TRAILERS = 1 << 5;
390 private final int id;
391 private final long identity;
392 private final PropertyMap properties = new PropertyMap();
393 private State state;
394 private byte metaState;
395
396 DefaultStream(long identity, int id, State state) {
397 this.identity = identity;
398 this.id = id;
399 this.state = state;
400 }
401
402 @Override
403 public final int id() {
404 return id;
405 }
406
407 @Override
408 public final State state() {
409 return state;
410 }
411
412 @Override
413 public boolean isResetSent() {
414 return (metaState & META_STATE_SENT_RST) != 0;
415 }
416
417 @Override
418 public Http2Stream resetSent() {
419 metaState |= META_STATE_SENT_RST;
420 return this;
421 }
422
423 @Override
424 public Http2Stream headersSent(boolean isInformational) {
425 if (!isInformational) {
426 metaState |= isHeadersSent() ? META_STATE_SENT_TRAILERS : META_STATE_SENT_HEADERS;
427 }
428 return this;
429 }
430
431 @Override
432 public boolean isHeadersSent() {
433 return (metaState & META_STATE_SENT_HEADERS) != 0;
434 }
435
436 @Override
437 public boolean isTrailersSent() {
438 return (metaState & META_STATE_SENT_TRAILERS) != 0;
439 }
440
441 @Override
442 public Http2Stream headersReceived(boolean isInformational) {
443 if (!isInformational) {
444 metaState |= isHeadersReceived() ? META_STATE_RECV_TRAILERS : META_STATE_RECV_HEADERS;
445 }
446 return this;
447 }
448
449 @Override
450 public boolean isHeadersReceived() {
451 return (metaState & META_STATE_RECV_HEADERS) != 0;
452 }
453
454 @Override
455 public boolean isTrailersReceived() {
456 return (metaState & META_STATE_RECV_TRAILERS) != 0;
457 }
458
459 @Override
460 public Http2Stream pushPromiseSent() {
461 metaState |= META_STATE_SENT_PUSHPROMISE;
462 return this;
463 }
464
465 @Override
466 public boolean isPushPromiseSent() {
467 return (metaState & META_STATE_SENT_PUSHPROMISE) != 0;
468 }
469
470 @Override
471 public final <V> V setProperty(PropertyKey key, V value) {
472 return properties.add(verifyKey(key), value);
473 }
474
475 @Override
476 public final <V> V getProperty(PropertyKey key) {
477 return properties.get(verifyKey(key));
478 }
479
480 @Override
481 public final <V> V removeProperty(PropertyKey key) {
482 return properties.remove(verifyKey(key));
483 }
484
485 @Override
486 public Http2Stream open(boolean halfClosed) throws Http2Exception {
487 state = activeState(id, state, isLocal(), halfClosed);
488 final DefaultEndpoint<? extends Http2FlowController> endpoint = createdBy();
489 if (!endpoint.canOpenStream()) {
490 throw connectionError(PROTOCOL_ERROR, "Maximum active streams violated for this endpoint: " +
491 endpoint.maxActiveStreams());
492 }
493
494 activate();
495 return this;
496 }
497
498 void activate() {
499
500
501 if (state == HALF_CLOSED_LOCAL) {
502 headersSent( false);
503 } else if (state == HALF_CLOSED_REMOTE) {
504 headersReceived( false);
505 }
506 activeStreams.activate(this);
507 }
508
509 Http2Stream close(Iterator<?> itr) {
510 if (state == CLOSED) {
511 return this;
512 }
513
514 state = CLOSED;
515
516 --createdBy().numStreams;
517 activeStreams.deactivate(this, itr);
518 return this;
519 }
520
521 @Override
522 public Http2Stream close() {
523 return close(null);
524 }
525
526 @Override
527 public Http2Stream closeLocalSide() {
528 switch (state) {
529 case OPEN:
530 state = HALF_CLOSED_LOCAL;
531 notifyHalfClosed(this);
532 break;
533 case HALF_CLOSED_LOCAL:
534 break;
535 default:
536 close();
537 break;
538 }
539 return this;
540 }
541
542 @Override
543 public Http2Stream closeRemoteSide() {
544 switch (state) {
545 case OPEN:
546 state = HALF_CLOSED_REMOTE;
547 notifyHalfClosed(this);
548 break;
549 case HALF_CLOSED_REMOTE:
550 break;
551 default:
552 close();
553 break;
554 }
555 return this;
556 }
557
558 DefaultEndpoint<? extends Http2FlowController> createdBy() {
559 return localEndpoint.isValidStreamId(id) ? localEndpoint : remoteEndpoint;
560 }
561
562 final boolean isLocal() {
563 return localEndpoint.isValidStreamId(id);
564 }
565
566
567
568
569 private class PropertyMap {
570 Object[] values = EmptyArrays.EMPTY_OBJECTS;
571
572 <V> V add(DefaultPropertyKey key, V value) {
573 resizeIfNecessary(key.index);
574 @SuppressWarnings("unchecked")
575 V prevValue = (V) values[key.index];
576 values[key.index] = value;
577 return prevValue;
578 }
579
580 @SuppressWarnings("unchecked")
581 <V> V get(DefaultPropertyKey key) {
582 if (key.index >= values.length) {
583 return null;
584 }
585 return (V) values[key.index];
586 }
587
588 @SuppressWarnings("unchecked")
589 <V> V remove(DefaultPropertyKey key) {
590 V prevValue = null;
591 if (key.index < values.length) {
592 prevValue = (V) values[key.index];
593 values[key.index] = null;
594 }
595 return prevValue;
596 }
597
598 void resizeIfNecessary(int index) {
599 if (index >= values.length) {
600 values = Arrays.copyOf(values, propertyKeyRegistry.size());
601 }
602 }
603 }
604
605 @Override
606 public boolean equals(final Object obj) {
607 return super.equals(obj);
608 }
609
610 @Override
611 public int hashCode() {
612 long value = identity;
613 if (value == 0) {
614 return System.identityHashCode(this);
615 }
616 return (int) (value ^ (value >>> 32));
617 }
618 }
619
620
621
622
623 private final class ConnectionStream extends DefaultStream {
624 ConnectionStream() {
625 super(0, CONNECTION_STREAM_ID, IDLE);
626 }
627
628 @Override
629 public boolean isResetSent() {
630 return false;
631 }
632
633 @Override
634 DefaultEndpoint<? extends Http2FlowController> createdBy() {
635 return null;
636 }
637
638 @Override
639 public Http2Stream resetSent() {
640 throw new UnsupportedOperationException();
641 }
642
643 @Override
644 public Http2Stream open(boolean halfClosed) {
645 throw new UnsupportedOperationException();
646 }
647
648 @Override
649 public Http2Stream close() {
650 throw new UnsupportedOperationException();
651 }
652
653 @Override
654 public Http2Stream closeLocalSide() {
655 throw new UnsupportedOperationException();
656 }
657
658 @Override
659 public Http2Stream closeRemoteSide() {
660 throw new UnsupportedOperationException();
661 }
662
663 @Override
664 public Http2Stream headersSent(boolean isInformational) {
665 throw new UnsupportedOperationException();
666 }
667
668 @Override
669 public boolean isHeadersSent() {
670 throw new UnsupportedOperationException();
671 }
672
673 @Override
674 public Http2Stream pushPromiseSent() {
675 throw new UnsupportedOperationException();
676 }
677
678 @Override
679 public boolean isPushPromiseSent() {
680 throw new UnsupportedOperationException();
681 }
682 }
683
684
685
686
687 private final class DefaultEndpoint<F extends Http2FlowController> implements Endpoint<F> {
688 private final boolean server;
689
690
691
692 private long lastCreatedStreamIdentity;
693
694
695
696
697
698 private int nextStreamIdToCreate;
699
700
701
702
703
704
705 private int nextReservationStreamId;
706 private int lastStreamKnownByPeer = -1;
707 private boolean pushToAllowed;
708 private F flowController;
709 private int maxStreams;
710 private int maxActiveStreams;
711 private final int maxReservedStreams;
712
713 int numActiveStreams;
714 int numStreams;
715
716 DefaultEndpoint(boolean server, int maxReservedStreams) {
717 this.lastCreatedStreamIdentity = 0;
718 this.server = server;
719
720
721
722
723
724 if (server) {
725 nextStreamIdToCreate = 2;
726 nextReservationStreamId = 0;
727 } else {
728 nextStreamIdToCreate = 1;
729
730 nextReservationStreamId = 1;
731 }
732
733
734 pushToAllowed = !server;
735 maxActiveStreams = MAX_VALUE;
736 this.maxReservedStreams = checkPositiveOrZero(maxReservedStreams, "maxReservedStreams");
737 updateMaxStreams();
738 }
739
740 @Override
741 public int incrementAndGetNextStreamId() {
742 return nextReservationStreamId >= 0 ? nextReservationStreamId += 2 : nextReservationStreamId;
743 }
744
745 private void incrementExpectedStreamId(int streamId) {
746 if (streamId > nextReservationStreamId && nextReservationStreamId >= 0) {
747 nextReservationStreamId = streamId;
748 }
749 nextStreamIdToCreate = streamId + 2;
750 ++numStreams;
751 }
752
753 @Override
754 public boolean isValidStreamId(int streamId) {
755 return streamId > 0 && server == ((streamId & 1) == 0);
756 }
757
758 @Override
759 public boolean mayHaveCreatedStream(int streamId) {
760 return isValidStreamId(streamId) && streamId <= lastStreamCreated();
761 }
762
763 @Override
764 public boolean canOpenStream() {
765 return numActiveStreams < maxActiveStreams;
766 }
767
768 @Override
769 public DefaultStream createStream(int streamId, boolean halfClosed) throws Http2Exception {
770 State state = activeState(streamId, IDLE, isLocal(), halfClosed);
771
772 checkNewStreamAllowed(streamId, state);
773
774 lastCreatedStreamIdentity++;
775
776
777 DefaultStream stream = new DefaultStream(lastCreatedStreamIdentity, streamId, state);
778
779 incrementExpectedStreamId(streamId);
780
781 addStream(stream);
782
783 stream.activate();
784 return stream;
785 }
786
787 @Override
788 public boolean created(Http2Stream stream) {
789 return stream instanceof DefaultStream && ((DefaultStream) stream).createdBy() == this;
790 }
791
792 @Override
793 public boolean isServer() {
794 return server;
795 }
796
797 @Override
798 public DefaultStream reservePushStream(int streamId, Http2Stream parent) throws Http2Exception {
799 if (parent == null) {
800 throw connectionError(PROTOCOL_ERROR, "Parent stream missing");
801 }
802 if (isLocal() ? !parent.state().localSideOpen() : !parent.state().remoteSideOpen()) {
803 throw connectionError(PROTOCOL_ERROR, "Stream %d is not open for sending push promise", parent.id());
804 }
805 if (!opposite().allowPushTo()) {
806 throw connectionError(PROTOCOL_ERROR, "Server push not allowed to opposite endpoint");
807 }
808 State state = isLocal() ? RESERVED_LOCAL : RESERVED_REMOTE;
809 checkNewStreamAllowed(streamId, state);
810
811 lastCreatedStreamIdentity++;
812
813
814 DefaultStream stream = new DefaultStream(lastCreatedStreamIdentity, streamId, state);
815
816 incrementExpectedStreamId(streamId);
817
818
819 addStream(stream);
820 return stream;
821 }
822
823 private void addStream(DefaultStream stream) {
824
825 streamMap.put(stream.id(), stream);
826
827
828 for (int i = 0; i < listeners.size(); i++) {
829 try {
830 listeners.get(i).onStreamAdded(stream);
831 } catch (Throwable cause) {
832 logger.error("Caught Throwable from listener onStreamAdded.", cause);
833 }
834 }
835 }
836
837 @Override
838 public void allowPushTo(boolean allow) {
839 if (allow && server) {
840 throw new IllegalArgumentException("Servers do not allow push");
841 }
842 pushToAllowed = allow;
843 }
844
845 @Override
846 public boolean allowPushTo() {
847 return pushToAllowed;
848 }
849
850 @Override
851 public int numActiveStreams() {
852 return numActiveStreams;
853 }
854
855 @Override
856 public int maxActiveStreams() {
857 return maxActiveStreams;
858 }
859
860 @Override
861 public void maxActiveStreams(int maxActiveStreams) {
862 this.maxActiveStreams = maxActiveStreams;
863 updateMaxStreams();
864 }
865
866 @Override
867 public int lastStreamCreated() {
868
869
870
871
872 return Math.max(0, nextStreamIdToCreate - 2);
873 }
874
875 @Override
876 public int lastStreamKnownByPeer() {
877 return lastStreamKnownByPeer;
878 }
879
880 private void lastStreamKnownByPeer(int lastKnownStream) {
881 lastStreamKnownByPeer = lastKnownStream;
882 }
883
884 @Override
885 public F flowController() {
886 return flowController;
887 }
888
889 @Override
890 public void flowController(F flowController) {
891 this.flowController = checkNotNull(flowController, "flowController");
892 }
893
894 @Override
895 public Endpoint<? extends Http2FlowController> opposite() {
896 return isLocal() ? remoteEndpoint : localEndpoint;
897 }
898
899 private void updateMaxStreams() {
900 maxStreams = (int) Math.min(MAX_VALUE, (long) maxActiveStreams + maxReservedStreams);
901 }
902
903 private void checkNewStreamAllowed(int streamId, State state) throws Http2Exception {
904 assert state != IDLE;
905 if (lastStreamKnownByPeer >= 0 && streamId > lastStreamKnownByPeer) {
906 throw streamError(streamId, REFUSED_STREAM,
907 "Cannot create stream %d greater than Last-Stream-ID %d from GOAWAY.",
908 streamId, lastStreamKnownByPeer);
909 }
910 if (!isValidStreamId(streamId)) {
911 if (streamId < 0) {
912 throw new Http2NoMoreStreamIdsException();
913 }
914 throw connectionError(PROTOCOL_ERROR, "Request stream %d is not correct for %s connection", streamId,
915 server ? "server" : "client");
916 }
917
918
919 if (streamId < nextStreamIdToCreate) {
920 throw closedStreamError(PROTOCOL_ERROR, "Request stream %d is behind the next expected stream %d",
921 streamId, nextStreamIdToCreate);
922 }
923 if (nextStreamIdToCreate <= 0) {
924
925
926 throw new Http2Exception(REFUSED_STREAM, "Stream IDs are exhausted for this endpoint.",
927 Http2Exception.ShutdownHint.GRACEFUL_SHUTDOWN);
928 }
929 boolean isReserved = state == RESERVED_LOCAL || state == RESERVED_REMOTE;
930 if (!isReserved && !canOpenStream() || isReserved && numStreams >= maxStreams) {
931 throw streamError(streamId, REFUSED_STREAM, "Maximum active streams violated for this endpoint: " +
932 (isReserved ? maxStreams : maxActiveStreams));
933 }
934 if (isClosed()) {
935 throw connectionError(INTERNAL_ERROR, "Attempted to create stream id %d after connection was closed",
936 streamId);
937 }
938 }
939
940 private boolean isLocal() {
941 return this == localEndpoint;
942 }
943 }
944
945
946
947
948
949 interface Event {
950
951
952
953
954
955
956 void process();
957 }
958
959
960
961
962
963 private final class ActiveStreams {
964 private final List<Listener> listeners;
965 private final Queue<Event> pendingEvents = new ArrayDeque<Event>(4);
966 private final Set<Http2Stream> streams = new LinkedHashSet<Http2Stream>();
967 private int pendingIterations;
968
969 ActiveStreams(List<Listener> listeners) {
970 this.listeners = listeners;
971 }
972
973 public int size() {
974 return streams.size();
975 }
976
977 public void activate(final DefaultStream stream) {
978 if (allowModifications()) {
979 addToActiveStreams(stream);
980 } else {
981 pendingEvents.add(new Event() {
982 @Override
983 public void process() {
984 addToActiveStreams(stream);
985 }
986 });
987 }
988 }
989
990 public void deactivate(final DefaultStream stream, final Iterator<?> itr) {
991 if (allowModifications() || itr != null) {
992 removeFromActiveStreams(stream, itr);
993 } else {
994 pendingEvents.add(new Event() {
995 @Override
996 public void process() {
997 removeFromActiveStreams(stream, itr);
998 }
999 });
1000 }
1001 }
1002
1003 public Http2Stream forEachActiveStream(Http2StreamVisitor visitor) throws Http2Exception {
1004 incrementPendingIterations();
1005 try {
1006 for (Http2Stream stream : streams) {
1007 if (!visitor.visit(stream)) {
1008 return stream;
1009 }
1010 }
1011 return null;
1012 } finally {
1013 decrementPendingIterations();
1014 }
1015 }
1016
1017 void addToActiveStreams(DefaultStream stream) {
1018 if (streams.add(stream)) {
1019
1020 stream.createdBy().numActiveStreams++;
1021
1022 for (int i = 0; i < listeners.size(); i++) {
1023 try {
1024 listeners.get(i).onStreamActive(stream);
1025 } catch (Throwable cause) {
1026 logger.error("Caught Throwable from listener onStreamActive.", cause);
1027 }
1028 }
1029 }
1030 }
1031
1032 void removeFromActiveStreams(DefaultStream stream, Iterator<?> itr) {
1033 if (streams.remove(stream)) {
1034
1035 stream.createdBy().numActiveStreams--;
1036 notifyClosed(stream);
1037 }
1038 removeStream(stream, itr);
1039 }
1040
1041 boolean allowModifications() {
1042 return pendingIterations == 0;
1043 }
1044
1045 void incrementPendingIterations() {
1046 ++pendingIterations;
1047 }
1048
1049 void decrementPendingIterations() {
1050 --pendingIterations;
1051 if (allowModifications()) {
1052 for (;;) {
1053 Event event = pendingEvents.poll();
1054 if (event == null) {
1055 break;
1056 }
1057 try {
1058 event.process();
1059 } catch (Throwable cause) {
1060 logger.error("Caught Throwable while processing pending ActiveStreams$Event.", cause);
1061 }
1062 }
1063 }
1064 }
1065 }
1066
1067
1068
1069
1070 final class DefaultPropertyKey implements PropertyKey {
1071 final int index;
1072
1073 DefaultPropertyKey(int index) {
1074 this.index = index;
1075 }
1076
1077 DefaultPropertyKey verifyConnection(Http2Connection connection) {
1078 if (connection != DefaultHttp2Connection.this) {
1079 throw new IllegalArgumentException("Using a key that was not created by this connection");
1080 }
1081 return this;
1082 }
1083 }
1084
1085
1086
1087
1088 private final class PropertyKeyRegistry {
1089
1090
1091
1092
1093
1094 final List<DefaultPropertyKey> keys = new ArrayList<DefaultPropertyKey>(4);
1095
1096
1097
1098
1099 DefaultPropertyKey newKey() {
1100 DefaultPropertyKey key = new DefaultPropertyKey(keys.size());
1101 keys.add(key);
1102 return key;
1103 }
1104
1105 int size() {
1106 return keys.size();
1107 }
1108 }
1109 }