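// [Scraped page navigation, not part of the source] View this class's API docs | Back to source index | 即时通讯网 - Instant Messaging Developer Community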
1   /*
2    * Copyright 2014 The Netty Project
3    *
4    * The Netty Project licenses this file to you under the Apache License, version 2.0 (the
5    * "License"); you may not use this file except in compliance with the License. You may obtain a
6    * copy of the License at:
7    *
8    * https://www.apache.org/licenses/LICENSE-2.0
9    *
10   * Unless required by applicable law or agreed to in writing, software distributed under the License
11   * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
12   * or implied. See the License for the specific language governing permissions and limitations under
13   * the License.
14   */
15  package io.netty.handler.codec.http2;
16  
17  import io.netty.buffer.ByteBuf;
18  import io.netty.buffer.ByteBufAllocator;
19  import io.netty.channel.ChannelHandlerContext;
20  import io.netty.handler.codec.http.FullHttpMessage;
21  import io.netty.handler.codec.http.FullHttpRequest;
22  import io.netty.handler.codec.http.FullHttpResponse;
23  import io.netty.handler.codec.http.HttpHeaderNames;
24  import io.netty.handler.codec.http.HttpStatusClass;
25  import io.netty.handler.codec.http.HttpUtil;
26  import io.netty.util.internal.UnstableApi;
27  
28  import static io.netty.handler.codec.http2.Http2Error.INTERNAL_ERROR;
29  import static io.netty.handler.codec.http2.Http2Error.PROTOCOL_ERROR;
30  import static io.netty.handler.codec.http2.Http2Exception.connectionError;
31  import static io.netty.handler.codec.http.HttpResponseStatus.OK;
32  import static io.netty.util.internal.ObjectUtil.checkNotNull;
33  import static io.netty.util.internal.ObjectUtil.checkPositive;
34  
35  /**
36   * This adapter provides just header/data events from the HTTP message flow defined
37   * in <a href="https://tools.ietf.org/html/rfc7540#section-8.1">[RFC 7540], Section 8.1</a>.
38   * <p>
39   * See {@link HttpToHttp2ConnectionHandler} to get translation from HTTP/1.x objects to HTTP/2 frames for writes.
40   */
41  @UnstableApi
42  public class InboundHttp2ToHttpAdapter extends Http2EventAdapter {
43      private static final ImmediateSendDetector DEFAULT_SEND_DETECTOR = new ImmediateSendDetector() {
44          @Override
45          public boolean mustSendImmediately(FullHttpMessage msg) {
46              if (msg instanceof FullHttpResponse) {
47                  return ((FullHttpResponse) msg).status().codeClass() == HttpStatusClass.INFORMATIONAL;
48              }
49              if (msg instanceof FullHttpRequest) {
50                  return msg.headers().contains(HttpHeaderNames.EXPECT);
51              }
52              return false;
53          }
54  
55          @Override
56          public FullHttpMessage copyIfNeeded(ByteBufAllocator allocator, FullHttpMessage msg) {
57              if (msg instanceof FullHttpRequest) {
58                  FullHttpRequest copy = ((FullHttpRequest) msg).replace(allocator.buffer(0));
59                  copy.headers().remove(HttpHeaderNames.EXPECT);
60                  return copy;
61              }
62              return null;
63          }
64      };
65  
66      private final int maxContentLength;
67      private final ImmediateSendDetector sendDetector;
68      private final Http2Connection.PropertyKey messageKey;
69      private final boolean propagateSettings;
70      protected final Http2Connection connection;
71      protected final boolean validateHttpHeaders;
72  
73      protected InboundHttp2ToHttpAdapter(Http2Connection connection, int maxContentLength,
74                                          boolean validateHttpHeaders, boolean propagateSettings) {
75          this.connection = checkNotNull(connection, "connection");
76          this.maxContentLength = checkPositive(maxContentLength, "maxContentLength");
77          this.validateHttpHeaders = validateHttpHeaders;
78          this.propagateSettings = propagateSettings;
79          sendDetector = DEFAULT_SEND_DETECTOR;
80          messageKey = connection.newKey();
81      }
82  
83      /**
84       * The stream is out of scope for the HTTP message flow and will no longer be tracked
85       * @param stream The stream to remove associated state with
86       * @param release {@code true} to call release on the value if it is present. {@code false} to not call release.
87       */
88      protected final void removeMessage(Http2Stream stream, boolean release) {
89          FullHttpMessage msg = stream.removeProperty(messageKey);
90          if (release && msg != null) {
91              msg.release();
92          }
93      }
94  
95      /**
96       * Get the {@link FullHttpMessage} associated with {@code stream}.
97       * @param stream The stream to get the associated state from
98       * @return The {@link FullHttpMessage} associated with {@code stream}.
99       */
100     protected final FullHttpMessage getMessage(Http2Stream stream) {
101         return (FullHttpMessage) stream.getProperty(messageKey);
102     }
103 
104     /**
105      * Make {@code message} be the state associated with {@code stream}.
106      * @param stream The stream which {@code message} is associated with.
107      * @param message The message which contains the HTTP semantics.
108      */
109     protected final void putMessage(Http2Stream stream, FullHttpMessage message) {
110         FullHttpMessage previous = stream.setProperty(messageKey, message);
111         if (previous != message && previous != null) {
112             previous.release();
113         }
114     }
115 
116     @Override
117     public void onStreamRemoved(Http2Stream stream) {
118         removeMessage(stream, true);
119     }
120 
121     /**
122      * Set final headers and fire a channel read event
123      *
124      * @param ctx The context to fire the event on
125      * @param msg The message to send
126      * @param release {@code true} to call release on the value if it is present. {@code false} to not call release.
127      * @param stream the stream of the message which is being fired
128      */
129     protected void fireChannelRead(ChannelHandlerContext ctx, FullHttpMessage msg, boolean release,
130                                    Http2Stream stream) {
131         removeMessage(stream, release);
132         HttpUtil.setContentLength(msg, msg.content().readableBytes());
133         ctx.fireChannelRead(msg);
134     }
135 
136     /**
137      * Create a new {@link FullHttpMessage} based upon the current connection parameters
138      *
139      * @param stream The stream to create a message for
140      * @param headers The headers associated with {@code stream}
141      * @param validateHttpHeaders
142      * <ul>
143      * <li>{@code true} to validate HTTP headers in the http-codec</li>
144      * <li>{@code false} not to validate HTTP headers in the http-codec</li>
145      * </ul>
146      * @param alloc The {@link ByteBufAllocator} to use to generate the content of the message
147      * @throws Http2Exception If there is an error when creating {@link FullHttpMessage} from
148      *                        {@link Http2Stream} and {@link Http2Headers}
149      */
150     protected FullHttpMessage newMessage(Http2Stream stream, Http2Headers headers, boolean validateHttpHeaders,
151                                          ByteBufAllocator alloc) throws Http2Exception {
152         return connection.isServer() ? HttpConversionUtil.toFullHttpRequest(stream.id(), headers, alloc,
153                 validateHttpHeaders) : HttpConversionUtil.toFullHttpResponse(stream.id(), headers, alloc,
154                 validateHttpHeaders);
155     }
156 
157     /**
158      * Provides translation between HTTP/2 and HTTP header objects while ensuring the stream
159      * is in a valid state for additional headers.
160      *
161      * @param ctx The context for which this message has been received.
162      * Used to send informational header if detected.
163      * @param stream The stream the {@code headers} apply to
164      * @param headers The headers to process
165      * @param endOfStream {@code true} if the {@code stream} has received the end of stream flag
166      * @param allowAppend
167      * <ul>
168      * <li>{@code true} if headers will be appended if the stream already exists.</li>
169      * <li>if {@code false} and the stream already exists this method returns {@code null}.</li>
170      * </ul>
171      * @param appendToTrailer
172      * <ul>
173      * <li>{@code true} if a message {@code stream} already exists then the headers
174      * should be added to the trailing headers.</li>
175      * <li>{@code false} then appends will be done to the initial headers.</li>
176      * </ul>
177      * @return The object used to track the stream corresponding to {@code stream}. {@code null} if
178      *         {@code allowAppend} is {@code false} and the stream already exists.
179      * @throws Http2Exception If the stream id is not in the correct state to process the headers request
180      */
181     protected FullHttpMessage processHeadersBegin(ChannelHandlerContext ctx, Http2Stream stream, Http2Headers headers,
182                                                   boolean endOfStream, boolean allowAppend, boolean appendToTrailer)
183             throws Http2Exception {
184         FullHttpMessage msg = getMessage(stream);
185         boolean release = true;
186         if (msg == null) {
187             msg = newMessage(stream, headers, validateHttpHeaders, ctx.alloc());
188         } else if (allowAppend) {
189             release = false;
190             HttpConversionUtil.addHttp2ToHttpHeaders(stream.id(), headers, msg, appendToTrailer);
191         } else {
192             release = false;
193             msg = null;
194         }
195 
196         if (sendDetector.mustSendImmediately(msg)) {
197             // Copy the message (if necessary) before sending. The content is not expected to be copied (or used) in
198             // this operation but just in case it is used do the copy before sending and the resource may be released
199             final FullHttpMessage copy = endOfStream ? null : sendDetector.copyIfNeeded(ctx.alloc(), msg);
200             fireChannelRead(ctx, msg, release, stream);
201             return copy;
202         }
203 
204         return msg;
205     }
206 
207     /**
208      * After HTTP/2 headers have been processed by {@link #processHeadersBegin} this method either
209      * sends the result up the pipeline or retains the message for future processing.
210      *
211      * @param ctx The context for which this message has been received
212      * @param stream The stream the {@code objAccumulator} corresponds to
213      * @param msg The object which represents all headers/data for corresponding to {@code stream}
214      * @param endOfStream {@code true} if this is the last event for the stream
215      */
216     private void processHeadersEnd(ChannelHandlerContext ctx, Http2Stream stream, FullHttpMessage msg,
217                                    boolean endOfStream) {
218         if (endOfStream) {
219             // Release if the msg from the map is different from the object being forwarded up the pipeline.
220             fireChannelRead(ctx, msg, getMessage(stream) != msg, stream);
221         } else {
222             putMessage(stream, msg);
223         }
224     }
225 
226     @Override
227     public int onDataRead(ChannelHandlerContext ctx, int streamId, ByteBuf data, int padding, boolean endOfStream)
228             throws Http2Exception {
229         Http2Stream stream = connection.stream(streamId);
230         FullHttpMessage msg = getMessage(stream);
231         if (msg == null) {
232             throw connectionError(PROTOCOL_ERROR, "Data Frame received for unknown stream id %d", streamId);
233         }
234 
235         ByteBuf content = msg.content();
236         final int dataReadableBytes = data.readableBytes();
237         if (content.readableBytes() > maxContentLength - dataReadableBytes) {
238             throw connectionError(INTERNAL_ERROR,
239                     "Content length exceeded max of %d for stream id %d", maxContentLength, streamId);
240         }
241 
242         content.writeBytes(data, data.readerIndex(), dataReadableBytes);
243 
244         if (endOfStream) {
245             fireChannelRead(ctx, msg, false, stream);
246         }
247 
248         // All bytes have been processed.
249         return dataReadableBytes + padding;
250     }
251 
252     @Override
253     public void onHeadersRead(ChannelHandlerContext ctx, int streamId, Http2Headers headers, int padding,
254                               boolean endOfStream) throws Http2Exception {
255         Http2Stream stream = connection.stream(streamId);
256         FullHttpMessage msg = processHeadersBegin(ctx, stream, headers, endOfStream, true, true);
257         if (msg != null) {
258             processHeadersEnd(ctx, stream, msg, endOfStream);
259         }
260     }
261 
262     @Override
263     public void onHeadersRead(ChannelHandlerContext ctx, int streamId, Http2Headers headers, int streamDependency,
264                               short weight, boolean exclusive, int padding, boolean endOfStream)
265             throws Http2Exception {
266         Http2Stream stream = connection.stream(streamId);
267         FullHttpMessage msg = processHeadersBegin(ctx, stream, headers, endOfStream, true, true);
268         if (msg != null) {
269             // Add headers for dependency and weight.
270             // See https://github.com/netty/netty/issues/5866
271             if (streamDependency != Http2CodecUtil.CONNECTION_STREAM_ID) {
272                 msg.headers().setInt(HttpConversionUtil.ExtensionHeaderNames.STREAM_DEPENDENCY_ID.text(),
273                         streamDependency);
274             }
275             msg.headers().setShort(HttpConversionUtil.ExtensionHeaderNames.STREAM_WEIGHT.text(), weight);
276 
277             processHeadersEnd(ctx, stream, msg, endOfStream);
278         }
279     }
280 
281     @Override
282     public void onRstStreamRead(ChannelHandlerContext ctx, int streamId, long errorCode) throws Http2Exception {
283         Http2Stream stream = connection.stream(streamId);
284         FullHttpMessage msg = getMessage(stream);
285         if (msg != null) {
286             onRstStreamRead(stream, msg);
287         }
288         ctx.fireExceptionCaught(Http2Exception.streamError(streamId, Http2Error.valueOf(errorCode),
289                 "HTTP/2 to HTTP layer caught stream reset"));
290     }
291 
292     @Override
293     public void onPushPromiseRead(ChannelHandlerContext ctx, int streamId, int promisedStreamId,
294                                   Http2Headers headers, int padding) throws Http2Exception {
295         // A push promise should not be allowed to add headers to an existing stream
296         Http2Stream promisedStream = connection.stream(promisedStreamId);
297         if (headers.status() == null) {
298             // A PUSH_PROMISE frame has no Http response status.
299             // https://tools.ietf.org/html/rfc7540#section-8.2.1
300             // Server push is semantically equivalent to a server responding to a
301             // request; however, in this case, that request is also sent by the
302             // server, as a PUSH_PROMISE frame.
303             headers.status(OK.codeAsText());
304         }
305         FullHttpMessage msg = processHeadersBegin(ctx, promisedStream, headers, false, false, false);
306         if (msg == null) {
307             throw connectionError(PROTOCOL_ERROR, "Push Promise Frame received for pre-existing stream id %d",
308                     promisedStreamId);
309         }
310 
311         msg.headers().setInt(HttpConversionUtil.ExtensionHeaderNames.STREAM_PROMISE_ID.text(), streamId);
312         msg.headers().setShort(HttpConversionUtil.ExtensionHeaderNames.STREAM_WEIGHT.text(),
313                 Http2CodecUtil.DEFAULT_PRIORITY_WEIGHT);
314 
315         processHeadersEnd(ctx, promisedStream, msg, false);
316     }
317 
318     @Override
319     public void onSettingsRead(ChannelHandlerContext ctx, Http2Settings settings) throws Http2Exception {
320         if (propagateSettings) {
321             // Provide an interface for non-listeners to capture settings
322             ctx.fireChannelRead(settings);
323         }
324     }
325 
326     /**
327      * Called if a {@code RST_STREAM} is received but we have some data for that stream.
328      */
329     protected void onRstStreamRead(Http2Stream stream, FullHttpMessage msg) {
330         removeMessage(stream, true);
331     }
332 
333     /**
334      * Allows messages to be sent up the pipeline before the next phase in the
335      * HTTP message flow is detected.
336      */
337     private interface ImmediateSendDetector {
338         /**
339          * Determine if the response should be sent immediately, or wait for the end of the stream
340          *
341          * @param msg The response to test
342          * @return {@code true} if the message should be sent immediately
343          *         {@code false) if we should wait for the end of the stream
344          */
345         boolean mustSendImmediately(FullHttpMessage msg);
346 
347         /**
348          * Determine if a copy must be made after an immediate send happens.
349          * <p>
350          * An example of this use case is if a request is received
351          * with a 'Expect: 100-continue' header. The message will be sent immediately,
352          * and the data will be queued and sent at the end of the stream.
353          *
354          * @param allocator The {@link ByteBufAllocator} that can be used to allocate
355          * @param msg The message which has just been sent due to {@link #mustSendImmediately(FullHttpMessage)}
356          * @return A modified copy of the {@code msg} or {@code null} if a copy is not needed.
357          */
358         FullHttpMessage copyIfNeeded(ByteBufAllocator allocator, FullHttpMessage msg);
359     }
360 }