1 /*
2 * Copyright 2019 The Netty Project
3 *
4 * The Netty Project licenses this file to you under the Apache License,
5 * version 2.0 (the "License"); you may not use this file except in compliance
6 * with the License. You may obtain a copy of the License at:
7 *
8 * https://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12 * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13 * License for the specific language governing permissions and limitations
14 * under the License.
15 */
16 package io.netty.handler.codec.http2;
17
18 import io.netty.buffer.ByteBuf;
19 import io.netty.channel.Channel;
20 import io.netty.channel.ChannelConfig;
21 import io.netty.channel.ChannelFuture;
22 import io.netty.channel.ChannelFutureListener;
23 import io.netty.channel.ChannelHandler;
24 import io.netty.channel.ChannelHandlerContext;
25 import io.netty.channel.ChannelPipeline;
26 import io.netty.channel.EventLoop;
27 import io.netty.channel.ServerChannel;
28 import io.netty.channel.socket.ChannelInputShutdownReadComplete;
29 import io.netty.channel.socket.ChannelOutputShutdownEvent;
30 import io.netty.handler.codec.http2.Http2FrameCodec.DefaultHttp2FrameStream;
31 import io.netty.handler.ssl.SslCloseCompletionEvent;
32 import io.netty.util.ReferenceCounted;
33 import io.netty.util.internal.ObjectUtil;
34 import io.netty.util.internal.UnstableApi;
35
36 import java.util.ArrayDeque;
37 import java.util.Queue;
38 import javax.net.ssl.SSLException;
39
40 import static io.netty.handler.codec.http2.AbstractHttp2StreamChannel.CHANNEL_INPUT_SHUTDOWN_READ_COMPLETE_VISITOR;
41 import static io.netty.handler.codec.http2.AbstractHttp2StreamChannel.CHANNEL_OUTPUT_SHUTDOWN_EVENT_VISITOR;
42 import static io.netty.handler.codec.http2.AbstractHttp2StreamChannel.SSL_CLOSE_COMPLETION_EVENT_VISITOR;
43 import static io.netty.handler.codec.http2.Http2Error.INTERNAL_ERROR;
44 import static io.netty.handler.codec.http2.Http2Exception.connectionError;
45
46 /**
47 * An HTTP/2 handler that creates child channels for each stream. This handler must be used in combination
48 * with {@link Http2FrameCodec}.
49 *
50 * <p>When a new stream is created, a new {@link Http2StreamChannel} is created for it. Applications send and
51 * receive {@link Http2StreamFrame}s on the created channel. {@link ByteBuf}s cannot be processed by the channel;
52 * all writes that reach the head of the pipeline must be an instance of {@link Http2StreamFrame}. Writes that reach
53 * the head of the pipeline are processed directly by this handler and cannot be intercepted.
54 *
55 * <p>The child channel will be notified of user events that impact the stream, such as {@link
56 * Http2GoAwayFrame} and {@link Http2ResetFrame}, as soon as they occur. Although {@code
57 * Http2GoAwayFrame} and {@code Http2ResetFrame} signify that the remote is ignoring further
58 * communication, closing of the channel is delayed until any inbound queue is drained with {@link
59 * Channel#read()}, which follows the default behavior of channels in Netty. Applications are
60 * free to close the channel in response to such events if they don't have use for any queued
61 * messages. Any connection level events like {@link Http2SettingsFrame} and {@link Http2GoAwayFrame}
62 * will be processed internally and also propagated down the pipeline for other handlers to act on.
63 *
64 * <p>Outbound streams are supported via the {@link Http2StreamChannelBootstrap}.
65 *
66 * <p>{@link ChannelConfig#setMaxMessagesPerRead(int)} and {@link ChannelConfig#setAutoRead(boolean)} are supported.
67 *
68 * <h3>Reference Counting</h3>
69 *
70 * Some {@link Http2StreamFrame}s implement the {@link ReferenceCounted} interface, as they carry
71 * reference counted objects (e.g. {@link ByteBuf}s). The multiplex codec will call {@link ReferenceCounted#retain()}
72 * before propagating a reference counted object through the pipeline, and thus an application handler needs to release
73 * such an object after having consumed it. For more information on reference counting take a look at
74 * <a href="https://netty.io/wiki/reference-counted-objects.html">the reference counted docs.</a>
75 *
76 * <h3>Channel Events</h3>
77 *
78 * A child channel becomes active as soon as it is registered to an {@link EventLoop}. Therefore, an active channel
79 * does not map to an active HTTP/2 stream immediately. Only once a {@link Http2HeadersFrame} has been successfully sent
80 * or received, does the channel map to an active HTTP/2 stream. In case it is not possible to open a new HTTP/2 stream
81 * (i.e. due to the maximum number of active streams being exceeded), the child channel receives an exception
82 * indicating the cause and is closed immediately thereafter.
83 *
84 * <h3>Writability and Flow Control</h3>
85 *
86 * A child channel observes outbound/remote flow control via the channel's writability. A channel only becomes writable
 * when it maps to an active HTTP/2 stream. A child channel does not know about the connection-level flow control
88 * window. {@link ChannelHandler}s are free to ignore the channel's writability, in which case the excessive writes will
89 * be buffered by the parent channel. It's important to note that only {@link Http2DataFrame}s are subject to
90 * HTTP/2 flow control.
91 *
92 * <h3>Closing a {@link Http2StreamChannel}</h3>
93 *
94 * Once you close a {@link Http2StreamChannel} a {@link Http2ResetFrame} will be sent to the remote peer with
 * {@link Http2Error#CANCEL} if needed. If you want to close the stream with another {@link Http2Error} (due to
 * errors / limits) you should propagate a {@link Http2FrameStreamException} through the {@link ChannelPipeline}.
97 * Once it reaches the end of the {@link ChannelPipeline} it will automatically close the {@link Http2StreamChannel}
98 * and send a {@link Http2ResetFrame} with the unwrapped {@link Http2Error} set. Another possibility is to just
 * directly write a {@link Http2ResetFrame} to the {@link Http2StreamChannel}.
100 */
101 @UnstableApi
102 public final class Http2MultiplexHandler extends Http2ChannelDuplexHandler {
103
104 static final ChannelFutureListener CHILD_CHANNEL_REGISTRATION_LISTENER = new ChannelFutureListener() {
105 @Override
106 public void operationComplete(ChannelFuture future) {
107 registerDone(future);
108 }
109 };
110
111 private final ChannelHandler inboundStreamHandler;
112 private final ChannelHandler upgradeStreamHandler;
113 private final Queue<AbstractHttp2StreamChannel> readCompletePendingQueue =
114 new MaxCapacityQueue<AbstractHttp2StreamChannel>(new ArrayDeque<AbstractHttp2StreamChannel>(8),
115 // Choose 100 which is what is used most of the times as default.
116 Http2CodecUtil.SMALLEST_MAX_CONCURRENT_STREAMS);
117
118 private boolean parentReadInProgress;
119 private int idCount;
120
121 // Need to be volatile as accessed from within the Http2MultiplexHandlerStreamChannel in a multi-threaded fashion.
122 private volatile ChannelHandlerContext ctx;
123
124 /**
125 * Creates a new instance
126 *
127 * @param inboundStreamHandler the {@link ChannelHandler} that will be added to the {@link ChannelPipeline} of
128 * the {@link Channel}s created for new inbound streams.
129 */
130 public Http2MultiplexHandler(ChannelHandler inboundStreamHandler) {
131 this(inboundStreamHandler, null);
132 }
133
134 /**
135 * Creates a new instance
136 *
137 * @param inboundStreamHandler the {@link ChannelHandler} that will be added to the {@link ChannelPipeline} of
138 * the {@link Channel}s created for new inbound streams.
139 * @param upgradeStreamHandler the {@link ChannelHandler} that will be added to the {@link ChannelPipeline} of the
140 * upgraded {@link Channel}.
141 */
142 public Http2MultiplexHandler(ChannelHandler inboundStreamHandler, ChannelHandler upgradeStreamHandler) {
143 this.inboundStreamHandler = ObjectUtil.checkNotNull(inboundStreamHandler, "inboundStreamHandler");
144 this.upgradeStreamHandler = upgradeStreamHandler;
145 }
146
147 static void registerDone(ChannelFuture future) {
148 // Handle any errors that occurred on the local thread while registering. Even though
149 // failures can happen after this point, they will be handled by the channel by closing the
150 // childChannel.
151 if (!future.isSuccess()) {
152 Channel childChannel = future.channel();
153 if (childChannel.isRegistered()) {
154 childChannel.close();
155 } else {
156 childChannel.unsafe().closeForcibly();
157 }
158 }
159 }
160
161 @Override
162 protected void handlerAdded0(ChannelHandlerContext ctx) {
163 if (ctx.executor() != ctx.channel().eventLoop()) {
164 throw new IllegalStateException("EventExecutor must be EventLoop of Channel");
165 }
166 this.ctx = ctx;
167 }
168
169 @Override
170 protected void handlerRemoved0(ChannelHandlerContext ctx) {
171 readCompletePendingQueue.clear();
172 }
173
174 @Override
175 public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
176 parentReadInProgress = true;
177 if (msg instanceof Http2StreamFrame) {
178 if (msg instanceof Http2WindowUpdateFrame) {
179 // We dont want to propagate update frames to the user
180 return;
181 }
182 Http2StreamFrame streamFrame = (Http2StreamFrame) msg;
183 DefaultHttp2FrameStream s =
184 (DefaultHttp2FrameStream) streamFrame.stream();
185
186 AbstractHttp2StreamChannel channel = (AbstractHttp2StreamChannel) s.attachment;
187 if (msg instanceof Http2ResetFrame) {
188 // Reset frames needs to be propagated via user events as these are not flow-controlled and so
189 // must not be controlled by suppressing channel.read() on the child channel.
190 channel.pipeline().fireUserEventTriggered(msg);
191
192 // RST frames will also trigger closing of the streams which then will call
193 // AbstractHttp2StreamChannel.streamClosed()
194 } else {
195 channel.fireChildRead(streamFrame);
196 }
197 return;
198 }
199
200 if (msg instanceof Http2GoAwayFrame) {
201 // goaway frames will also trigger closing of the streams which then will call
202 // AbstractHttp2StreamChannel.streamClosed()
203 onHttp2GoAwayFrame(ctx, (Http2GoAwayFrame) msg);
204 }
205
206 // Send everything down the pipeline
207 ctx.fireChannelRead(msg);
208 }
209
210 @Override
211 public void channelWritabilityChanged(final ChannelHandlerContext ctx) throws Exception {
212 if (ctx.channel().isWritable()) {
213 // While the writability state may change during iterating of the streams we just set all of the streams
214 // to writable to not affect fairness. These will be "limited" by their own watermarks in any case.
215 forEachActiveStream(AbstractHttp2StreamChannel.WRITABLE_VISITOR);
216 }
217
218 ctx.fireChannelWritabilityChanged();
219 }
220
221 @Override
222 public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
223 if (evt instanceof Http2FrameStreamEvent) {
224 Http2FrameStreamEvent event = (Http2FrameStreamEvent) evt;
225 DefaultHttp2FrameStream stream = (DefaultHttp2FrameStream) event.stream();
226 if (event.type() == Http2FrameStreamEvent.Type.State) {
227 switch (stream.state()) {
228 case HALF_CLOSED_LOCAL:
229 if (stream.id() != Http2CodecUtil.HTTP_UPGRADE_STREAM_ID) {
230 // Ignore everything which was not caused by an upgrade
231 break;
232 }
233 // fall-through
234 case HALF_CLOSED_REMOTE:
235 // fall-through
236 case OPEN:
237 if (stream.attachment != null) {
238 // ignore if child channel was already created.
239 break;
240 }
241 final AbstractHttp2StreamChannel ch;
242 // We need to handle upgrades special when on the client side.
243 if (stream.id() == Http2CodecUtil.HTTP_UPGRADE_STREAM_ID && !isServer(ctx)) {
244 // We must have an upgrade handler or else we can't handle the stream
245 if (upgradeStreamHandler == null) {
246 throw connectionError(INTERNAL_ERROR,
247 "Client is misconfigured for upgrade requests");
248 }
249 ch = new Http2MultiplexHandlerStreamChannel(stream, upgradeStreamHandler);
250 ch.closeOutbound();
251 } else {
252 ch = new Http2MultiplexHandlerStreamChannel(stream, inboundStreamHandler);
253 }
254 ChannelFuture future = ctx.channel().eventLoop().register(ch);
255 if (future.isDone()) {
256 registerDone(future);
257 } else {
258 future.addListener(CHILD_CHANNEL_REGISTRATION_LISTENER);
259 }
260 break;
261 case CLOSED:
262 AbstractHttp2StreamChannel channel = (AbstractHttp2StreamChannel) stream.attachment;
263 if (channel != null) {
264 channel.streamClosed();
265 }
266 break;
267 default:
268 // ignore for now
269 break;
270 }
271 }
272 return;
273 }
274 if (evt == ChannelInputShutdownReadComplete.INSTANCE) {
275 forEachActiveStream(CHANNEL_INPUT_SHUTDOWN_READ_COMPLETE_VISITOR);
276 } else if (evt == ChannelOutputShutdownEvent.INSTANCE) {
277 forEachActiveStream(CHANNEL_OUTPUT_SHUTDOWN_EVENT_VISITOR);
278 } else if (evt == SslCloseCompletionEvent.SUCCESS) {
279 forEachActiveStream(SSL_CLOSE_COMPLETION_EVENT_VISITOR);
280 }
281 ctx.fireUserEventTriggered(evt);
282 }
283
284 // TODO: This is most likely not the best way to expose this, need to think more about it.
285 Http2StreamChannel newOutboundStream() {
286 return new Http2MultiplexHandlerStreamChannel((DefaultHttp2FrameStream) newStream(), null);
287 }
288
289 @Override
290 public void exceptionCaught(ChannelHandlerContext ctx, final Throwable cause) throws Exception {
291 if (cause instanceof Http2FrameStreamException) {
292 Http2FrameStreamException exception = (Http2FrameStreamException) cause;
293 Http2FrameStream stream = exception.stream();
294 AbstractHttp2StreamChannel childChannel = (AbstractHttp2StreamChannel)
295 ((DefaultHttp2FrameStream) stream).attachment;
296 try {
297 childChannel.pipeline().fireExceptionCaught(cause.getCause());
298 } finally {
299 // Close with the correct error that causes this stream exception.
300 // See https://github.com/netty/netty/issues/13235#issuecomment-1441994672
301 childChannel.closeWithError(exception.error());
302 }
303 return;
304 }
305 if (cause instanceof Http2MultiplexActiveStreamsException) {
306 // Unwrap the cause that was used to create it and fire it for all the active streams.
307 fireExceptionCaughtForActiveStream(cause.getCause());
308 return;
309 }
310
311 if (cause.getCause() instanceof SSLException) {
312 fireExceptionCaughtForActiveStream(cause);
313 }
314 ctx.fireExceptionCaught(cause);
315 }
316
317 private void fireExceptionCaughtForActiveStream(final Throwable cause) throws Http2Exception {
318 forEachActiveStream(new Http2FrameStreamVisitor() {
319 @Override
320 public boolean visit(Http2FrameStream stream) {
321 AbstractHttp2StreamChannel childChannel = (AbstractHttp2StreamChannel)
322 ((DefaultHttp2FrameStream) stream).attachment;
323 childChannel.pipeline().fireExceptionCaught(cause);
324 return true;
325 }
326 });
327 }
328
329 private static boolean isServer(ChannelHandlerContext ctx) {
330 return ctx.channel().parent() instanceof ServerChannel;
331 }
332
333 private void onHttp2GoAwayFrame(ChannelHandlerContext ctx, final Http2GoAwayFrame goAwayFrame) {
334 if (goAwayFrame.lastStreamId() == Integer.MAX_VALUE) {
335 // None of the streams can have an id greater than Integer.MAX_VALUE
336 return;
337 }
338 // Notify which streams were not processed by the remote peer and are safe to retry on another connection:
339 try {
340 final boolean server = isServer(ctx);
341 forEachActiveStream(new Http2FrameStreamVisitor() {
342 @Override
343 public boolean visit(Http2FrameStream stream) {
344 final int streamId = stream.id();
345 if (streamId > goAwayFrame.lastStreamId() && Http2CodecUtil.isStreamIdValid(streamId, server)) {
346 final AbstractHttp2StreamChannel childChannel = (AbstractHttp2StreamChannel)
347 ((DefaultHttp2FrameStream) stream).attachment;
348 childChannel.pipeline().fireUserEventTriggered(goAwayFrame.retainedDuplicate());
349 }
350 return true;
351 }
352 });
353 } catch (Http2Exception e) {
354 ctx.fireExceptionCaught(e);
355 ctx.close();
356 }
357 }
358
359 /**
360 * Notifies any child streams of the read completion.
361 */
362 @Override
363 public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
364 processPendingReadCompleteQueue();
365 ctx.fireChannelReadComplete();
366 }
367
368 private void processPendingReadCompleteQueue() {
369 parentReadInProgress = true;
370 // If we have many child channel we can optimize for the case when multiple call flush() in
371 // channelReadComplete(...) callbacks and only do it once as otherwise we will end-up with multiple
372 // write calls on the socket which is expensive.
373 AbstractHttp2StreamChannel childChannel = readCompletePendingQueue.poll();
374 if (childChannel != null) {
375 try {
376 do {
377 childChannel.fireChildReadComplete();
378 childChannel = readCompletePendingQueue.poll();
379 } while (childChannel != null);
380 } finally {
381 parentReadInProgress = false;
382 readCompletePendingQueue.clear();
383 ctx.flush();
384 }
385 } else {
386 parentReadInProgress = false;
387 }
388 }
389
390 private final class Http2MultiplexHandlerStreamChannel extends AbstractHttp2StreamChannel {
391
392 Http2MultiplexHandlerStreamChannel(DefaultHttp2FrameStream stream, ChannelHandler inboundHandler) {
393 super(stream, ++idCount, inboundHandler);
394 }
395
396 @Override
397 protected boolean isParentReadInProgress() {
398 return parentReadInProgress;
399 }
400
401 @Override
402 protected void addChannelToReadCompletePendingQueue() {
403 // If there is no space left in the queue, just keep on processing everything that is already
404 // stored there and try again.
405 while (!readCompletePendingQueue.offer(this)) {
406 processPendingReadCompleteQueue();
407 }
408 }
409
410 @Override
411 protected ChannelHandlerContext parentContext() {
412 return ctx;
413 }
414 }
415 }