1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16 package io.netty.handler.codec.http2;
17
18 import io.netty.buffer.ByteBuf;
19 import io.netty.channel.Channel;
20 import io.netty.channel.ChannelConfig;
21 import io.netty.channel.ChannelFuture;
22 import io.netty.channel.ChannelHandler;
23 import io.netty.channel.ChannelHandlerContext;
24 import io.netty.channel.ChannelPromise;
25 import io.netty.channel.EventLoop;
26 import io.netty.channel.socket.ChannelInputShutdownReadComplete;
27 import io.netty.channel.socket.ChannelOutputShutdownEvent;
28 import io.netty.handler.ssl.SslCloseCompletionEvent;
29 import io.netty.util.ReferenceCounted;
30 import io.netty.util.internal.UnstableApi;
31
32 import java.util.ArrayDeque;
33 import java.util.Queue;
34
35 import static io.netty.handler.codec.http2.AbstractHttp2StreamChannel.CHANNEL_INPUT_SHUTDOWN_READ_COMPLETE_VISITOR;
36 import static io.netty.handler.codec.http2.AbstractHttp2StreamChannel.CHANNEL_OUTPUT_SHUTDOWN_EVENT_VISITOR;
37 import static io.netty.handler.codec.http2.AbstractHttp2StreamChannel.SSL_CLOSE_COMPLETION_EVENT_VISITOR;
38 import static io.netty.handler.codec.http2.Http2CodecUtil.HTTP_UPGRADE_STREAM_ID;
39 import static io.netty.handler.codec.http2.Http2Error.INTERNAL_ERROR;
40 import static io.netty.handler.codec.http2.Http2Exception.connectionError;
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89 @Deprecated
90 @UnstableApi
91 public class Http2MultiplexCodec extends Http2FrameCodec {
92
93 private final ChannelHandler inboundStreamHandler;
94 private final ChannelHandler upgradeStreamHandler;
95 private final Queue<AbstractHttp2StreamChannel> readCompletePendingQueue =
96 new MaxCapacityQueue<AbstractHttp2StreamChannel>(new ArrayDeque<AbstractHttp2StreamChannel>(8),
97
98 Http2CodecUtil.SMALLEST_MAX_CONCURRENT_STREAMS);
99
100 private boolean parentReadInProgress;
101 private int idCount;
102
103
104 volatile ChannelHandlerContext ctx;
105
106 Http2MultiplexCodec(Http2ConnectionEncoder encoder,
107 Http2ConnectionDecoder decoder,
108 Http2Settings initialSettings,
109 ChannelHandler inboundStreamHandler,
110 ChannelHandler upgradeStreamHandler, boolean decoupleCloseAndGoAway, boolean flushPreface) {
111 super(encoder, decoder, initialSettings, decoupleCloseAndGoAway, flushPreface);
112 this.inboundStreamHandler = inboundStreamHandler;
113 this.upgradeStreamHandler = upgradeStreamHandler;
114 }
115
116 @Override
117 public void onHttpClientUpgrade() throws Http2Exception {
118
119 if (upgradeStreamHandler == null) {
120 throw connectionError(INTERNAL_ERROR, "Client is misconfigured for upgrade requests");
121 }
122
123 super.onHttpClientUpgrade();
124 }
125
126 @Override
127 public final void handlerAdded0(ChannelHandlerContext ctx) throws Exception {
128 if (ctx.executor() != ctx.channel().eventLoop()) {
129 throw new IllegalStateException("EventExecutor must be EventLoop of Channel");
130 }
131 this.ctx = ctx;
132 }
133
134 @Override
135 public final void handlerRemoved0(ChannelHandlerContext ctx) throws Exception {
136 super.handlerRemoved0(ctx);
137
138 readCompletePendingQueue.clear();
139 }
140
141 @Override
142 final void onHttp2Frame(ChannelHandlerContext ctx, Http2Frame frame) {
143 if (frame instanceof Http2StreamFrame) {
144 Http2StreamFrame streamFrame = (Http2StreamFrame) frame;
145 AbstractHttp2StreamChannel channel = (AbstractHttp2StreamChannel)
146 ((DefaultHttp2FrameStream) streamFrame.stream()).attachment;
147 channel.fireChildRead(streamFrame);
148 return;
149 }
150 if (frame instanceof Http2GoAwayFrame) {
151 onHttp2GoAwayFrame(ctx, (Http2GoAwayFrame) frame);
152 }
153
154 ctx.fireChannelRead(frame);
155 }
156
157 @Override
158 final void onHttp2StreamStateChanged(ChannelHandlerContext ctx, DefaultHttp2FrameStream stream) {
159 switch (stream.state()) {
160 case HALF_CLOSED_LOCAL:
161 if (stream.id() != HTTP_UPGRADE_STREAM_ID) {
162
163 break;
164 }
165
166 case HALF_CLOSED_REMOTE:
167
168 case OPEN:
169 if (stream.attachment != null) {
170
171 break;
172 }
173 final Http2MultiplexCodecStreamChannel streamChannel;
174
175 if (stream.id() == HTTP_UPGRADE_STREAM_ID && !connection().isServer()) {
176
177
178 assert upgradeStreamHandler != null;
179 streamChannel = new Http2MultiplexCodecStreamChannel(stream, upgradeStreamHandler);
180 streamChannel.closeOutbound();
181 } else {
182 streamChannel = new Http2MultiplexCodecStreamChannel(stream, inboundStreamHandler);
183 }
184 ChannelFuture future = ctx.channel().eventLoop().register(streamChannel);
185 if (future.isDone()) {
186 Http2MultiplexHandler.registerDone(future);
187 } else {
188 future.addListener(Http2MultiplexHandler.CHILD_CHANNEL_REGISTRATION_LISTENER);
189 }
190 break;
191 case CLOSED:
192 AbstractHttp2StreamChannel channel = (AbstractHttp2StreamChannel) stream.attachment;
193 if (channel != null) {
194 channel.streamClosed();
195 }
196 break;
197 default:
198
199 break;
200 }
201 }
202
203
204 final Http2StreamChannel newOutboundStream() {
205 return new Http2MultiplexCodecStreamChannel(newStream(), null);
206 }
207
208 @Override
209 final void onHttp2FrameStreamException(ChannelHandlerContext ctx, Http2FrameStreamException cause) {
210 Http2FrameStream stream = cause.stream();
211 AbstractHttp2StreamChannel channel = (AbstractHttp2StreamChannel) ((DefaultHttp2FrameStream) stream).attachment;
212
213 try {
214 channel.pipeline().fireExceptionCaught(cause.getCause());
215 } finally {
216
217
218 channel.closeWithError(cause.error());
219 }
220 }
221
222 private void onHttp2GoAwayFrame(ChannelHandlerContext ctx, final Http2GoAwayFrame goAwayFrame) {
223 if (goAwayFrame.lastStreamId() == Integer.MAX_VALUE) {
224
225 return;
226 }
227
228 try {
229 forEachActiveStream(new Http2FrameStreamVisitor() {
230 @Override
231 public boolean visit(Http2FrameStream stream) {
232 final int streamId = stream.id();
233 AbstractHttp2StreamChannel channel = (AbstractHttp2StreamChannel)
234 ((DefaultHttp2FrameStream) stream).attachment;
235 if (streamId > goAwayFrame.lastStreamId() && connection().local().isValidStreamId(streamId)) {
236 channel.pipeline().fireUserEventTriggered(goAwayFrame.retainedDuplicate());
237 }
238 return true;
239 }
240 });
241 } catch (Http2Exception e) {
242 ctx.fireExceptionCaught(e);
243 ctx.close();
244 }
245 }
246
247
248
249
250 @Override
251 public final void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
252 processPendingReadCompleteQueue();
253 channelReadComplete0(ctx);
254 }
255
256 private void processPendingReadCompleteQueue() {
257 parentReadInProgress = true;
258 try {
259
260
261
262 for (;;) {
263 AbstractHttp2StreamChannel childChannel = readCompletePendingQueue.poll();
264 if (childChannel == null) {
265 break;
266 }
267 childChannel.fireChildReadComplete();
268 }
269 } finally {
270 parentReadInProgress = false;
271 readCompletePendingQueue.clear();
272
273 flush0(ctx);
274 }
275 }
276 @Override
277 public final void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
278 parentReadInProgress = true;
279 super.channelRead(ctx, msg);
280 }
281
282 @Override
283 public final void channelWritabilityChanged(final ChannelHandlerContext ctx) throws Exception {
284 if (ctx.channel().isWritable()) {
285
286
287 forEachActiveStream(AbstractHttp2StreamChannel.WRITABLE_VISITOR);
288 }
289
290 super.channelWritabilityChanged(ctx);
291 }
292
293 @Override
294 final void onUserEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
295 if (evt == ChannelInputShutdownReadComplete.INSTANCE) {
296 forEachActiveStream(CHANNEL_INPUT_SHUTDOWN_READ_COMPLETE_VISITOR);
297 } else if (evt == ChannelOutputShutdownEvent.INSTANCE) {
298 forEachActiveStream(CHANNEL_OUTPUT_SHUTDOWN_EVENT_VISITOR);
299 } else if (evt == SslCloseCompletionEvent.SUCCESS) {
300 forEachActiveStream(SSL_CLOSE_COMPLETION_EVENT_VISITOR);
301 }
302 super.onUserEventTriggered(ctx, evt);
303 }
304
305 final void flush0(ChannelHandlerContext ctx) {
306 flush(ctx);
307 }
308
309 private final class Http2MultiplexCodecStreamChannel extends AbstractHttp2StreamChannel {
310
311 Http2MultiplexCodecStreamChannel(DefaultHttp2FrameStream stream, ChannelHandler inboundHandler) {
312 super(stream, ++idCount, inboundHandler);
313 }
314
315 @Override
316 protected boolean isParentReadInProgress() {
317 return parentReadInProgress;
318 }
319
320 @Override
321 protected void addChannelToReadCompletePendingQueue() {
322
323
324 while (!readCompletePendingQueue.offer(this)) {
325 processPendingReadCompleteQueue();
326 }
327 }
328
329 @Override
330 protected ChannelHandlerContext parentContext() {
331 return ctx;
332 }
333
334 @Override
335 protected ChannelFuture write0(ChannelHandlerContext ctx, Object msg) {
336 ChannelPromise promise = ctx.newPromise();
337 Http2MultiplexCodec.this.write(ctx, msg, promise);
338 return promise;
339 }
340
341 @Override
342 protected void flush0(ChannelHandlerContext ctx) {
343 Http2MultiplexCodec.this.flush0(ctx);
344 }
345 }
346 }