查看本类的 API文档回源码主页即时通讯网 - 即时通讯开发者社区!
1   /*
2    * Copyright 2014 The Netty Project
3    *
4    * The Netty Project licenses this file to you under the Apache License, version 2.0 (the
5    * "License"); you may not use this file except in compliance with the License. You may obtain a
6    * copy of the License at:
7    *
8    * https://www.apache.org/licenses/LICENSE-2.0
9    *
10   * Unless required by applicable law or agreed to in writing, software distributed under the License
11   * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
12   * or implied. See the License for the specific language governing permissions and limitations under
13   * the License.
14   */
15  package io.netty.handler.codec.http;
16  
17  import io.netty.buffer.Unpooled;
18  import io.netty.channel.ChannelFuture;
19  import io.netty.channel.ChannelFutureListener;
20  import io.netty.channel.ChannelHandlerContext;
21  import io.netty.util.ReferenceCountUtil;
22  import io.netty.util.ReferenceCounted;
23  
24  import java.util.ArrayList;
25  import java.util.Collection;
26  import java.util.List;
27  
28  import static io.netty.handler.codec.http.HttpResponseStatus.SWITCHING_PROTOCOLS;
29  import static io.netty.handler.codec.http.HttpVersion.HTTP_1_1;
30  import static io.netty.util.AsciiString.containsAllContentEqualsIgnoreCase;
31  import static io.netty.util.AsciiString.containsContentEqualsIgnoreCase;
32  import static io.netty.util.internal.ObjectUtil.checkNotNull;
33  import static io.netty.util.internal.StringUtil.COMMA;
34  
35  /**
36   * A server-side handler that receives HTTP requests and optionally performs a protocol switch if
37   * the requested protocol is supported. Once an upgrade is performed, this handler removes itself
38   * from the pipeline.
39   */
40  public class HttpServerUpgradeHandler extends HttpObjectAggregator {
41  
42      /**
43       * The source codec that is used in the pipeline initially.
44       */
45      public interface SourceCodec {
46          /**
47           * Removes this codec (i.e. all associated handlers) from the pipeline.
48           */
49          void upgradeFrom(ChannelHandlerContext ctx);
50      }
51  
52      /**
53       * A codec that the source can be upgraded to.
54       */
55      public interface UpgradeCodec {
56          /**
57           * Gets all protocol-specific headers required by this protocol for a successful upgrade.
58           * Any supplied header will be required to appear in the {@link HttpHeaderNames#CONNECTION} header as well.
59           */
60          Collection<CharSequence> requiredUpgradeHeaders();
61  
62          /**
63           * Prepares the {@code upgradeHeaders} for a protocol update based upon the contents of {@code upgradeRequest}.
64           * This method returns a boolean value to proceed or abort the upgrade in progress. If {@code false} is
65           * returned, the upgrade is aborted and the {@code upgradeRequest} will be passed through the inbound pipeline
66           * as if no upgrade was performed. If {@code true} is returned, the upgrade will proceed to the next
67           * step which invokes {@link #upgradeTo}. When returning {@code true}, you can add headers to
68           * the {@code upgradeHeaders} so that they are added to the 101 Switching protocols response.
69           */
70          boolean prepareUpgradeResponse(ChannelHandlerContext ctx, FullHttpRequest upgradeRequest,
71                                      HttpHeaders upgradeHeaders);
72  
73          /**
74           * Performs an HTTP protocol upgrade from the source codec. This method is responsible for
75           * adding all handlers required for the new protocol.
76           *
77           * @param ctx the context for the current handler.
78           * @param upgradeRequest the request that triggered the upgrade to this protocol.
79           */
80          void upgradeTo(ChannelHandlerContext ctx, FullHttpRequest upgradeRequest);
81      }
82  
83      /**
84       * Creates a new {@link UpgradeCodec} for the requested protocol name.
85       */
86      public interface UpgradeCodecFactory {
87          /**
88           * Invoked by {@link HttpServerUpgradeHandler} for all the requested protocol names in the order of
89           * the client preference. The first non-{@code null} {@link UpgradeCodec} returned by this method
90           * will be selected.
91           *
92           * @return a new {@link UpgradeCodec}, or {@code null} if the specified protocol name is not supported
93           */
94          UpgradeCodec newUpgradeCodec(CharSequence protocol);
95      }
96  
97      /**
98       * User event that is fired to notify about the completion of an HTTP upgrade
99       * to another protocol. Contains the original upgrade request so that the response
100      * (if required) can be sent using the new protocol.
101      */
102     public static final class UpgradeEvent implements ReferenceCounted {
103         private final CharSequence protocol;
104         private final FullHttpRequest upgradeRequest;
105 
106         UpgradeEvent(CharSequence protocol, FullHttpRequest upgradeRequest) {
107             this.protocol = protocol;
108             this.upgradeRequest = upgradeRequest;
109         }
110 
111         /**
112          * The protocol that the channel has been upgraded to.
113          */
114         public CharSequence protocol() {
115             return protocol;
116         }
117 
118         /**
119          * Gets the request that triggered the protocol upgrade.
120          */
121         public FullHttpRequest upgradeRequest() {
122             return upgradeRequest;
123         }
124 
125         @Override
126         public int refCnt() {
127             return upgradeRequest.refCnt();
128         }
129 
130         @Override
131         public UpgradeEvent retain() {
132             upgradeRequest.retain();
133             return this;
134         }
135 
136         @Override
137         public UpgradeEvent retain(int increment) {
138             upgradeRequest.retain(increment);
139             return this;
140         }
141 
142         @Override
143         public UpgradeEvent touch() {
144             upgradeRequest.touch();
145             return this;
146         }
147 
148         @Override
149         public UpgradeEvent touch(Object hint) {
150             upgradeRequest.touch(hint);
151             return this;
152         }
153 
154         @Override
155         public boolean release() {
156             return upgradeRequest.release();
157         }
158 
159         @Override
160         public boolean release(int decrement) {
161             return upgradeRequest.release(decrement);
162         }
163 
164         @Override
165         public String toString() {
166             return "UpgradeEvent [protocol=" + protocol + ", upgradeRequest=" + upgradeRequest + ']';
167         }
168     }
169 
170     private final SourceCodec sourceCodec;
171     private final UpgradeCodecFactory upgradeCodecFactory;
172     private final HttpHeadersFactory headersFactory;
173     private final HttpHeadersFactory trailersFactory;
174     private boolean handlingUpgrade;
175 
176     /**
177      * Constructs the upgrader with the supported codecs.
178      * <p>
179      * The handler instantiated by this constructor will reject an upgrade request with non-empty content.
180      * It should not be a concern because an upgrade request is most likely a GET request.
181      * If you have a client that sends a non-GET upgrade request, please consider using
182      * {@link #HttpServerUpgradeHandler(SourceCodec, UpgradeCodecFactory, int)} to specify the maximum
183      * length of the content of an upgrade request.
184      * </p>
185      *
186      * @param sourceCodec the codec that is being used initially
187      * @param upgradeCodecFactory the factory that creates a new upgrade codec
188      *                            for one of the requested upgrade protocols
189      */
190     public HttpServerUpgradeHandler(SourceCodec sourceCodec, UpgradeCodecFactory upgradeCodecFactory) {
191         this(sourceCodec, upgradeCodecFactory, 0,
192                 DefaultHttpHeadersFactory.headersFactory(), DefaultHttpHeadersFactory.trailersFactory());
193     }
194 
195     /**
196      * Constructs the upgrader with the supported codecs.
197      *
198      * @param sourceCodec the codec that is being used initially
199      * @param upgradeCodecFactory the factory that creates a new upgrade codec
200      *                            for one of the requested upgrade protocols
201      * @param maxContentLength the maximum length of the content of an upgrade request
202      */
203     public HttpServerUpgradeHandler(
204             SourceCodec sourceCodec, UpgradeCodecFactory upgradeCodecFactory, int maxContentLength) {
205         this(sourceCodec, upgradeCodecFactory, maxContentLength,
206                 DefaultHttpHeadersFactory.headersFactory(), DefaultHttpHeadersFactory.trailersFactory());
207     }
208 
209     /**
210      * Constructs the upgrader with the supported codecs.
211      *
212      * @param sourceCodec the codec that is being used initially
213      * @param upgradeCodecFactory the factory that creates a new upgrade codec
214      *                            for one of the requested upgrade protocols
215      * @param maxContentLength the maximum length of the content of an upgrade request
216      * @param validateHeaders validate the header names and values of the upgrade response.
217      */
218     public HttpServerUpgradeHandler(SourceCodec sourceCodec, UpgradeCodecFactory upgradeCodecFactory,
219                                     int maxContentLength, boolean validateHeaders) {
220         this(sourceCodec, upgradeCodecFactory, maxContentLength,
221                 DefaultHttpHeadersFactory.headersFactory().withValidation(validateHeaders),
222                 DefaultHttpHeadersFactory.trailersFactory().withValidation(validateHeaders));
223     }
224 
225     /**
226      * Constructs the upgrader with the supported codecs.
227      *
228      * @param sourceCodec the codec that is being used initially
229      * @param upgradeCodecFactory the factory that creates a new upgrade codec
230      *                            for one of the requested upgrade protocols
231      * @param maxContentLength the maximum length of the content of an upgrade request
232      * @param headersFactory The {@link HttpHeadersFactory} to use for headers.
233      * The recommended default factory is {@link DefaultHttpHeadersFactory#headersFactory()}.
234      * @param trailersFactory The {@link HttpHeadersFactory} to use for trailers.
235      * The recommended default factory is {@link DefaultHttpHeadersFactory#trailersFactory()}.
236      */
237     public HttpServerUpgradeHandler(
238             SourceCodec sourceCodec, UpgradeCodecFactory upgradeCodecFactory, int maxContentLength,
239             HttpHeadersFactory headersFactory, HttpHeadersFactory trailersFactory) {
240         super(maxContentLength);
241 
242         this.sourceCodec = checkNotNull(sourceCodec, "sourceCodec");
243         this.upgradeCodecFactory = checkNotNull(upgradeCodecFactory, "upgradeCodecFactory");
244         this.headersFactory = checkNotNull(headersFactory, "headersFactory");
245         this.trailersFactory = checkNotNull(trailersFactory, "trailersFactory");
246     }
247 
248     @Override
249     protected void decode(ChannelHandlerContext ctx, HttpObject msg, List<Object> out)
250             throws Exception {
251 
252         if (!handlingUpgrade) {
253             // Not handling an upgrade request yet. Check if we received a new upgrade request.
254             if (msg instanceof HttpRequest) {
255                 HttpRequest req = (HttpRequest) msg;
256                 if (req.headers().contains(HttpHeaderNames.UPGRADE) &&
257                     shouldHandleUpgradeRequest(req)) {
258                     handlingUpgrade = true;
259                 } else {
260                     ReferenceCountUtil.retain(msg);
261                     ctx.fireChannelRead(msg);
262                     return;
263                 }
264             } else {
265                 ReferenceCountUtil.retain(msg);
266                 ctx.fireChannelRead(msg);
267                 return;
268             }
269         }
270 
271         FullHttpRequest fullRequest;
272         if (msg instanceof FullHttpRequest) {
273             fullRequest = (FullHttpRequest) msg;
274             ReferenceCountUtil.retain(msg);
275             out.add(msg);
276         } else {
277             // Call the base class to handle the aggregation of the full request.
278             super.decode(ctx, msg, out);
279             if (out.isEmpty()) {
280                 // The full request hasn't been created yet, still awaiting more data.
281                 return;
282             }
283 
284             // Finished aggregating the full request, get it from the output list.
285             assert out.size() == 1;
286             handlingUpgrade = false;
287             fullRequest = (FullHttpRequest) out.get(0);
288         }
289 
290         if (upgrade(ctx, fullRequest)) {
291             // The upgrade was successful, remove the message from the output list
292             // so that it's not propagated to the next handler. This request will
293             // be propagated as a user event instead.
294             out.clear();
295         }
296 
297         // The upgrade did not succeed, just allow the full request to propagate to the
298         // next handler.
299     }
300 
301     /**
302      * Determines whether the specified upgrade {@link HttpRequest} should be handled by this handler or not.
303      * This method will be invoked only when the request contains an {@code Upgrade} header.
304      * It always returns {@code true} by default, which means any request with an {@code Upgrade} header
305      * will be handled. You can override this method to ignore certain {@code Upgrade} headers, for example:
306      * <pre>{@code
307      * @Override
308      * protected boolean isUpgradeRequest(HttpRequest req) {
309      *   // Do not handle WebSocket upgrades.
310      *   return !req.headers().contains(HttpHeaderNames.UPGRADE, "websocket", false);
311      * }
312      * }</pre>
313      */
314     protected boolean shouldHandleUpgradeRequest(HttpRequest req) {
315         return true;
316     }
317 
318     /**
319      * Attempts to upgrade to the protocol(s) identified by the {@link HttpHeaderNames#UPGRADE} header (if provided
320      * in the request).
321      *
322      * @param ctx the context for this handler.
323      * @param request the HTTP request.
324      * @return {@code true} if the upgrade occurred, otherwise {@code false}.
325      */
326     private boolean upgrade(final ChannelHandlerContext ctx, final FullHttpRequest request) {
327         // Select the best protocol based on those requested in the UPGRADE header.
328         final List<CharSequence> requestedProtocols = splitHeader(request.headers().get(HttpHeaderNames.UPGRADE));
329         final int numRequestedProtocols = requestedProtocols.size();
330         UpgradeCodec upgradeCodec = null;
331         CharSequence upgradeProtocol = null;
332         for (int i = 0; i < numRequestedProtocols; i ++) {
333             final CharSequence p = requestedProtocols.get(i);
334             final UpgradeCodec c = upgradeCodecFactory.newUpgradeCodec(p);
335             if (c != null) {
336                 upgradeProtocol = p;
337                 upgradeCodec = c;
338                 break;
339             }
340         }
341 
342         if (upgradeCodec == null) {
343             // None of the requested protocols are supported, don't upgrade.
344             return false;
345         }
346 
347         // Make sure the CONNECTION header is present.
348         List<String> connectionHeaderValues = request.headers().getAll(HttpHeaderNames.CONNECTION);
349 
350         if (connectionHeaderValues == null || connectionHeaderValues.isEmpty()) {
351             return false;
352         }
353 
354         final StringBuilder concatenatedConnectionValue = new StringBuilder(connectionHeaderValues.size() * 10);
355         for (CharSequence connectionHeaderValue : connectionHeaderValues) {
356             concatenatedConnectionValue.append(connectionHeaderValue).append(COMMA);
357         }
358         concatenatedConnectionValue.setLength(concatenatedConnectionValue.length() - 1);
359 
360         // Make sure the CONNECTION header contains UPGRADE as well as all protocol-specific headers.
361         Collection<CharSequence> requiredHeaders = upgradeCodec.requiredUpgradeHeaders();
362         List<CharSequence> values = splitHeader(concatenatedConnectionValue);
363         if (!containsContentEqualsIgnoreCase(values, HttpHeaderNames.UPGRADE) ||
364                 !containsAllContentEqualsIgnoreCase(values, requiredHeaders)) {
365             return false;
366         }
367 
368         // Ensure that all required protocol-specific headers are found in the request.
369         for (CharSequence requiredHeader : requiredHeaders) {
370             if (!request.headers().contains(requiredHeader)) {
371                 return false;
372             }
373         }
374 
375         // Prepare and send the upgrade response. Wait for this write to complete before upgrading,
376         // since we need the old codec in-place to properly encode the response.
377         final FullHttpResponse upgradeResponse = createUpgradeResponse(upgradeProtocol);
378         if (!upgradeCodec.prepareUpgradeResponse(ctx, request, upgradeResponse.headers())) {
379             return false;
380         }
381 
382         // Create the user event to be fired once the upgrade completes.
383         final UpgradeEvent event = new UpgradeEvent(upgradeProtocol, request);
384 
385         // After writing the upgrade response we immediately prepare the
386         // pipeline for the next protocol to avoid a race between completion
387         // of the write future and receiving data before the pipeline is
388         // restructured.
389         try {
390             final ChannelFuture writeComplete = ctx.writeAndFlush(upgradeResponse);
391             // Perform the upgrade to the new protocol.
392             sourceCodec.upgradeFrom(ctx);
393             upgradeCodec.upgradeTo(ctx, request);
394 
395             // Remove this handler from the pipeline.
396             ctx.pipeline().remove(HttpServerUpgradeHandler.this);
397 
398             // Notify that the upgrade has occurred. Retain the event to offset
399             // the release() in the finally block.
400             ctx.fireUserEventTriggered(event.retain());
401 
402             // Add the listener last to avoid firing upgrade logic after
403             // the channel is already closed since the listener may fire
404             // immediately if the write failed eagerly.
405             writeComplete.addListener(ChannelFutureListener.CLOSE_ON_FAILURE);
406         } finally {
407             // Release the event if the upgrade event wasn't fired.
408             event.release();
409         }
410         return true;
411     }
412 
413     /**
414      * Creates the 101 Switching Protocols response message.
415      */
416     private FullHttpResponse createUpgradeResponse(CharSequence upgradeProtocol) {
417         DefaultFullHttpResponse res = new DefaultFullHttpResponse(
418                 HTTP_1_1, SWITCHING_PROTOCOLS, Unpooled.EMPTY_BUFFER, headersFactory, trailersFactory);
419         res.headers().add(HttpHeaderNames.CONNECTION, HttpHeaderValues.UPGRADE);
420         res.headers().add(HttpHeaderNames.UPGRADE, upgradeProtocol);
421         return res;
422     }
423 
424     /**
425      * Splits a comma-separated header value. The returned set is case-insensitive and contains each
426      * part with whitespace removed.
427      */
428     private static List<CharSequence> splitHeader(CharSequence header) {
429         final StringBuilder builder = new StringBuilder(header.length());
430         final List<CharSequence> protocols = new ArrayList<CharSequence>(4);
431         for (int i = 0; i < header.length(); ++i) {
432             char c = header.charAt(i);
433             if (Character.isWhitespace(c)) {
434                 // Don't include any whitespace.
435                 continue;
436             }
437             if (c == ',') {
438                 // Add the string and reset the builder for the next protocol.
439                 protocols.add(builder.toString());
440                 builder.setLength(0);
441             } else {
442                 builder.append(c);
443             }
444         }
445 
446         // Add the last protocol
447         if (builder.length() > 0) {
448             protocols.add(builder.toString());
449         }
450 
451         return protocols;
452     }
453 }