1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16 package io.netty.handler.codec.http2;
17
18 import io.netty.buffer.ByteBuf;
19 import io.netty.channel.Channel;
20 import io.netty.channel.ChannelConfig;
21 import io.netty.channel.ChannelFuture;
22 import io.netty.channel.ChannelFutureListener;
23 import io.netty.channel.ChannelHandler;
24 import io.netty.channel.ChannelHandlerContext;
25 import io.netty.channel.ChannelPipeline;
26 import io.netty.channel.EventLoop;
27 import io.netty.channel.ServerChannel;
28 import io.netty.channel.socket.ChannelInputShutdownReadComplete;
29 import io.netty.channel.socket.ChannelOutputShutdownEvent;
30 import io.netty.handler.codec.http2.Http2FrameCodec.DefaultHttp2FrameStream;
31 import io.netty.handler.ssl.SslCloseCompletionEvent;
32 import io.netty.util.ReferenceCounted;
33 import io.netty.util.internal.ObjectUtil;
34 import io.netty.util.internal.UnstableApi;
35
36 import java.util.ArrayDeque;
37 import java.util.Queue;
38 import javax.net.ssl.SSLException;
39
40 import static io.netty.handler.codec.http2.AbstractHttp2StreamChannel.CHANNEL_INPUT_SHUTDOWN_READ_COMPLETE_VISITOR;
41 import static io.netty.handler.codec.http2.AbstractHttp2StreamChannel.CHANNEL_OUTPUT_SHUTDOWN_EVENT_VISITOR;
42 import static io.netty.handler.codec.http2.AbstractHttp2StreamChannel.SSL_CLOSE_COMPLETION_EVENT_VISITOR;
43 import static io.netty.handler.codec.http2.Http2Error.INTERNAL_ERROR;
44 import static io.netty.handler.codec.http2.Http2Exception.connectionError;
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101 @UnstableApi
102 public final class Http2MultiplexHandler extends Http2ChannelDuplexHandler {
103
104 static final ChannelFutureListener CHILD_CHANNEL_REGISTRATION_LISTENER = new ChannelFutureListener() {
105 @Override
106 public void operationComplete(ChannelFuture future) {
107 registerDone(future);
108 }
109 };
110
111 private final ChannelHandler inboundStreamHandler;
112 private final ChannelHandler upgradeStreamHandler;
113 private final Queue<AbstractHttp2StreamChannel> readCompletePendingQueue =
114 new MaxCapacityQueue<AbstractHttp2StreamChannel>(new ArrayDeque<AbstractHttp2StreamChannel>(8),
115
116 Http2CodecUtil.SMALLEST_MAX_CONCURRENT_STREAMS);
117
118 private boolean parentReadInProgress;
119 private int idCount;
120
121
122 private volatile ChannelHandlerContext ctx;
123
124
125
126
127
128
129
/**
 * Creates a handler without an upgrade stream handler; delegates to the two-argument
 * constructor with {@code null} as the second argument.
 *
 * @param inboundStreamHandler handler passed to the child channels created for streams;
 *                             rejected by the delegate constructor when {@code null}.
 */
public Http2MultiplexHandler(ChannelHandler inboundStreamHandler) {
    this(inboundStreamHandler, null);
}
133
134
135
136
137
138
139
140
141
142 public Http2MultiplexHandler(ChannelHandler inboundStreamHandler, ChannelHandler upgradeStreamHandler) {
143 this.inboundStreamHandler = ObjectUtil.checkNotNull(inboundStreamHandler, "inboundStreamHandler");
144 this.upgradeStreamHandler = upgradeStreamHandler;
145 }
146
147 static void registerDone(ChannelFuture future) {
148
149
150
151 if (!future.isSuccess()) {
152 Channel childChannel = future.channel();
153 if (childChannel.isRegistered()) {
154 childChannel.close();
155 } else {
156 childChannel.unsafe().closeForcibly();
157 }
158 }
159 }
160
161 @Override
162 protected void handlerAdded0(ChannelHandlerContext ctx) {
163 if (ctx.executor() != ctx.channel().eventLoop()) {
164 throw new IllegalStateException("EventExecutor must be EventLoop of Channel");
165 }
166 this.ctx = ctx;
167 }
168
169 @Override
170 protected void handlerRemoved0(ChannelHandlerContext ctx) {
171 readCompletePendingQueue.clear();
172 }
173
174 @Override
175 public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
176 parentReadInProgress = true;
177 if (msg instanceof Http2StreamFrame) {
178 if (msg instanceof Http2WindowUpdateFrame) {
179
180 return;
181 }
182 Http2StreamFrame streamFrame = (Http2StreamFrame) msg;
183 DefaultHttp2FrameStream s =
184 (DefaultHttp2FrameStream) streamFrame.stream();
185
186 AbstractHttp2StreamChannel channel = (AbstractHttp2StreamChannel) s.attachment;
187 if (msg instanceof Http2ResetFrame) {
188
189
190 channel.pipeline().fireUserEventTriggered(msg);
191
192
193
194 } else {
195 channel.fireChildRead(streamFrame);
196 }
197 return;
198 }
199
200 if (msg instanceof Http2GoAwayFrame) {
201
202
203 onHttp2GoAwayFrame(ctx, (Http2GoAwayFrame) msg);
204 }
205
206
207 ctx.fireChannelRead(msg);
208 }
209
210 @Override
211 public void channelWritabilityChanged(final ChannelHandlerContext ctx) throws Exception {
212 if (ctx.channel().isWritable()) {
213
214
215 forEachActiveStream(AbstractHttp2StreamChannel.WRITABLE_VISITOR);
216 }
217
218 ctx.fireChannelWritabilityChanged();
219 }
220
221 @Override
222 public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
223 if (evt instanceof Http2FrameStreamEvent) {
224 Http2FrameStreamEvent event = (Http2FrameStreamEvent) evt;
225 DefaultHttp2FrameStream stream = (DefaultHttp2FrameStream) event.stream();
226 if (event.type() == Http2FrameStreamEvent.Type.State) {
227 switch (stream.state()) {
228 case HALF_CLOSED_LOCAL:
229 if (stream.id() != Http2CodecUtil.HTTP_UPGRADE_STREAM_ID) {
230
231 break;
232 }
233
234 case HALF_CLOSED_REMOTE:
235
236 case OPEN:
237 if (stream.attachment != null) {
238
239 break;
240 }
241 final AbstractHttp2StreamChannel ch;
242
243 if (stream.id() == Http2CodecUtil.HTTP_UPGRADE_STREAM_ID && !isServer(ctx)) {
244
245 if (upgradeStreamHandler == null) {
246 throw connectionError(INTERNAL_ERROR,
247 "Client is misconfigured for upgrade requests");
248 }
249 ch = new Http2MultiplexHandlerStreamChannel(stream, upgradeStreamHandler);
250 ch.closeOutbound();
251 } else {
252 ch = new Http2MultiplexHandlerStreamChannel(stream, inboundStreamHandler);
253 }
254 ChannelFuture future = ctx.channel().eventLoop().register(ch);
255 if (future.isDone()) {
256 registerDone(future);
257 } else {
258 future.addListener(CHILD_CHANNEL_REGISTRATION_LISTENER);
259 }
260 break;
261 case CLOSED:
262 AbstractHttp2StreamChannel channel = (AbstractHttp2StreamChannel) stream.attachment;
263 if (channel != null) {
264 channel.streamClosed();
265 }
266 break;
267 default:
268
269 break;
270 }
271 }
272 return;
273 }
274 if (evt == ChannelInputShutdownReadComplete.INSTANCE) {
275 forEachActiveStream(CHANNEL_INPUT_SHUTDOWN_READ_COMPLETE_VISITOR);
276 } else if (evt == ChannelOutputShutdownEvent.INSTANCE) {
277 forEachActiveStream(CHANNEL_OUTPUT_SHUTDOWN_EVENT_VISITOR);
278 } else if (evt == SslCloseCompletionEvent.SUCCESS) {
279 forEachActiveStream(SSL_CLOSE_COMPLETION_EVENT_VISITOR);
280 }
281 ctx.fireUserEventTriggered(evt);
282 }
283
284
285 Http2StreamChannel newOutboundStream() {
286 return new Http2MultiplexHandlerStreamChannel((DefaultHttp2FrameStream) newStream(), null);
287 }
288
289 @Override
290 public void exceptionCaught(ChannelHandlerContext ctx, final Throwable cause) throws Exception {
291 if (cause instanceof Http2FrameStreamException) {
292 Http2FrameStreamException exception = (Http2FrameStreamException) cause;
293 Http2FrameStream stream = exception.stream();
294 AbstractHttp2StreamChannel childChannel = (AbstractHttp2StreamChannel)
295 ((DefaultHttp2FrameStream) stream).attachment;
296 try {
297 childChannel.pipeline().fireExceptionCaught(cause.getCause());
298 } finally {
299
300
301 childChannel.closeWithError(exception.error());
302 }
303 return;
304 }
305 if (cause instanceof Http2MultiplexActiveStreamsException) {
306
307 fireExceptionCaughtForActiveStream(cause.getCause());
308 return;
309 }
310
311 if (cause.getCause() instanceof SSLException) {
312 fireExceptionCaughtForActiveStream(cause);
313 }
314 ctx.fireExceptionCaught(cause);
315 }
316
317 private void fireExceptionCaughtForActiveStream(final Throwable cause) throws Http2Exception {
318 forEachActiveStream(new Http2FrameStreamVisitor() {
319 @Override
320 public boolean visit(Http2FrameStream stream) {
321 AbstractHttp2StreamChannel childChannel = (AbstractHttp2StreamChannel)
322 ((DefaultHttp2FrameStream) stream).attachment;
323 childChannel.pipeline().fireExceptionCaught(cause);
324 return true;
325 }
326 });
327 }
328
329 private static boolean isServer(ChannelHandlerContext ctx) {
330 return ctx.channel().parent() instanceof ServerChannel;
331 }
332
333 private void onHttp2GoAwayFrame(ChannelHandlerContext ctx, final Http2GoAwayFrame goAwayFrame) {
334 if (goAwayFrame.lastStreamId() == Integer.MAX_VALUE) {
335
336 return;
337 }
338
339 try {
340 final boolean server = isServer(ctx);
341 forEachActiveStream(new Http2FrameStreamVisitor() {
342 @Override
343 public boolean visit(Http2FrameStream stream) {
344 final int streamId = stream.id();
345 if (streamId > goAwayFrame.lastStreamId() && Http2CodecUtil.isStreamIdValid(streamId, server)) {
346 final AbstractHttp2StreamChannel childChannel = (AbstractHttp2StreamChannel)
347 ((DefaultHttp2FrameStream) stream).attachment;
348 childChannel.pipeline().fireUserEventTriggered(goAwayFrame.retainedDuplicate());
349 }
350 return true;
351 }
352 });
353 } catch (Http2Exception e) {
354 ctx.fireExceptionCaught(e);
355 ctx.close();
356 }
357 }
358
359
360
361
362 @Override
363 public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
364 processPendingReadCompleteQueue();
365 ctx.fireChannelReadComplete();
366 }
367
368 private void processPendingReadCompleteQueue() {
369 parentReadInProgress = true;
370
371
372
373 AbstractHttp2StreamChannel childChannel = readCompletePendingQueue.poll();
374 if (childChannel != null) {
375 try {
376 do {
377 childChannel.fireChildReadComplete();
378 childChannel = readCompletePendingQueue.poll();
379 } while (childChannel != null);
380 } finally {
381 parentReadInProgress = false;
382 readCompletePendingQueue.clear();
383 ctx.flush();
384 }
385 } else {
386 parentReadInProgress = false;
387 }
388 }
389
390 private final class Http2MultiplexHandlerStreamChannel extends AbstractHttp2StreamChannel {
391
392 Http2MultiplexHandlerStreamChannel(DefaultHttp2FrameStream stream, ChannelHandler inboundHandler) {
393 super(stream, ++idCount, inboundHandler);
394 }
395
396 @Override
397 protected boolean isParentReadInProgress() {
398 return parentReadInProgress;
399 }
400
401 @Override
402 protected void addChannelToReadCompletePendingQueue() {
403
404
405 while (!readCompletePendingQueue.offer(this)) {
406 processPendingReadCompleteQueue();
407 }
408 }
409
410 @Override
411 protected ChannelHandlerContext parentContext() {
412 return ctx;
413 }
414 }
415 }