查看本类的 API文档回源码主页即时通讯网 - 即时通讯开发者社区!
1   /*
2    * Copyright 2014 The Netty Project
3    *
4    * The Netty Project licenses this file to you under the Apache License, version 2.0 (the
5    * "License"); you may not use this file except in compliance with the License. You may obtain a
6    * copy of the License at:
7    *
8    * https://www.apache.org/licenses/LICENSE-2.0
9    *
10   * Unless required by applicable law or agreed to in writing, software distributed under the License
11   * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
12   * or implied. See the License for the specific language governing permissions and limitations under
13   * the License.
14   */
15  package io.netty.handler.codec.http;
16  
17  import io.netty.channel.ChannelHandlerContext;
18  import io.netty.channel.ChannelOutboundHandler;
19  import io.netty.channel.ChannelPromise;
20  import io.netty.util.AsciiString;
21  import io.netty.util.internal.ObjectUtil;
22  
23  import java.net.SocketAddress;
24  import java.util.Collection;
25  import java.util.LinkedHashSet;
26  import java.util.List;
27  import java.util.Set;
28  
29  import static io.netty.handler.codec.http.HttpResponseStatus.SWITCHING_PROTOCOLS;
30  import static io.netty.util.ReferenceCountUtil.release;
31  
32  /**
33   * Client-side handler for handling an HTTP upgrade handshake to another protocol. When the first
34   * HTTP request is sent, this handler will add all appropriate headers to perform an upgrade to the
35   * new protocol. If the upgrade fails (i.e. response is not 101 Switching Protocols), this handler
36   * simply removes itself from the pipeline. If the upgrade is successful, upgrades the pipeline to
37   * the new protocol.
38   */
39  public class HttpClientUpgradeHandler extends HttpObjectAggregator implements ChannelOutboundHandler {
40  
41      /**
42       * User events that are fired to notify about upgrade status.
43       */
44      public enum UpgradeEvent {
45          /**
46           * The Upgrade request was sent to the server.
47           */
48          UPGRADE_ISSUED,
49  
50          /**
51           * The Upgrade to the new protocol was successful.
52           */
53          UPGRADE_SUCCESSFUL,
54  
55          /**
56           * The Upgrade was unsuccessful due to the server not issuing
57           * with a 101 Switching Protocols response.
58           */
59          UPGRADE_REJECTED
60      }
61  
62      /**
63       * The source codec that is used in the pipeline initially.
64       */
65      public interface SourceCodec {
66  
67          /**
68           * Removes or disables the encoder of this codec so that the {@link UpgradeCodec} can send an initial greeting
69           * (if any).
70           */
71          void prepareUpgradeFrom(ChannelHandlerContext ctx);
72  
73          /**
74           * Removes this codec (i.e. all associated handlers) from the pipeline.
75           */
76          void upgradeFrom(ChannelHandlerContext ctx);
77      }
78  
79      /**
80       * A codec that the source can be upgraded to.
81       */
82      public interface UpgradeCodec {
83          /**
84           * Returns the name of the protocol supported by this codec, as indicated by the {@code 'UPGRADE'} header.
85           */
86          CharSequence protocol();
87  
88          /**
89           * Sets any protocol-specific headers required to the upgrade request. Returns the names of
90           * all headers that were added. These headers will be used to populate the CONNECTION header.
91           */
92          Collection<CharSequence> setUpgradeHeaders(ChannelHandlerContext ctx, HttpRequest upgradeRequest);
93  
94          /**
95           * Performs an HTTP protocol upgrade from the source codec. This method is responsible for
96           * adding all handlers required for the new protocol.
97           *
98           * @param ctx the context for the current handler.
99           * @param upgradeResponse the 101 Switching Protocols response that indicates that the server
100          *            has switched to this protocol.
101          */
102         void upgradeTo(ChannelHandlerContext ctx, FullHttpResponse upgradeResponse) throws Exception;
103     }
104 
105     private final SourceCodec sourceCodec;
106     private final UpgradeCodec upgradeCodec;
107     private boolean upgradeRequested;
108 
109     /**
110      * Constructs the client upgrade handler.
111      *
112      * @param sourceCodec the codec that is being used initially.
113      * @param upgradeCodec the codec that the client would like to upgrade to.
114      * @param maxContentLength the maximum length of the aggregated content.
115      */
116     public HttpClientUpgradeHandler(SourceCodec sourceCodec, UpgradeCodec upgradeCodec,
117                                     int maxContentLength) {
118         super(maxContentLength);
119         this.sourceCodec = ObjectUtil.checkNotNull(sourceCodec, "sourceCodec");
120         this.upgradeCodec = ObjectUtil.checkNotNull(upgradeCodec, "upgradeCodec");
121     }
122 
123     @Override
124     public void bind(ChannelHandlerContext ctx, SocketAddress localAddress, ChannelPromise promise) throws Exception {
125         ctx.bind(localAddress, promise);
126     }
127 
128     @Override
129     public void connect(ChannelHandlerContext ctx, SocketAddress remoteAddress, SocketAddress localAddress,
130                         ChannelPromise promise) throws Exception {
131         ctx.connect(remoteAddress, localAddress, promise);
132     }
133 
134     @Override
135     public void disconnect(ChannelHandlerContext ctx, ChannelPromise promise) throws Exception {
136         ctx.disconnect(promise);
137     }
138 
139     @Override
140     public void close(ChannelHandlerContext ctx, ChannelPromise promise) throws Exception {
141         ctx.close(promise);
142     }
143 
144     @Override
145     public void deregister(ChannelHandlerContext ctx, ChannelPromise promise) throws Exception {
146         ctx.deregister(promise);
147     }
148 
149     @Override
150     public void read(ChannelHandlerContext ctx) throws Exception {
151         ctx.read();
152     }
153 
154     @Override
155     public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise)
156             throws Exception {
157         if (!(msg instanceof HttpRequest)) {
158             ctx.write(msg, promise);
159             return;
160         }
161 
162         if (upgradeRequested) {
163             promise.setFailure(new IllegalStateException(
164                     "Attempting to write HTTP request with upgrade in progress"));
165             return;
166         }
167 
168         upgradeRequested = true;
169         setUpgradeRequestHeaders(ctx, (HttpRequest) msg);
170 
171         // Continue writing the request.
172         ctx.write(msg, promise);
173 
174         // Notify that the upgrade request was issued.
175         ctx.fireUserEventTriggered(UpgradeEvent.UPGRADE_ISSUED);
176         // Now we wait for the next HTTP response to see if we switch protocols.
177     }
178 
179     @Override
180     public void flush(ChannelHandlerContext ctx) throws Exception {
181         ctx.flush();
182     }
183 
184     @Override
185     protected void decode(ChannelHandlerContext ctx, HttpObject msg, List<Object> out)
186             throws Exception {
187         FullHttpResponse response = null;
188         try {
189             if (!upgradeRequested) {
190                 throw new IllegalStateException("Read HTTP response without requesting protocol switch");
191             }
192 
193             if (msg instanceof HttpResponse) {
194                 HttpResponse rep = (HttpResponse) msg;
195                 if (!SWITCHING_PROTOCOLS.equals(rep.status())) {
196                     // The server does not support the requested protocol, just remove this handler
197                     // and continue processing HTTP.
198                     // NOTE: not releasing the response since we're letting it propagate to the
199                     // next handler.
200                     ctx.fireUserEventTriggered(UpgradeEvent.UPGRADE_REJECTED);
201                     removeThisHandler(ctx);
202                     ctx.fireChannelRead(msg);
203                     return;
204                 }
205             }
206 
207             if (msg instanceof FullHttpResponse) {
208                 response = (FullHttpResponse) msg;
209                 // Need to retain since the base class will release after returning from this method.
210                 response.retain();
211                 out.add(response);
212             } else {
213                 // Call the base class to handle the aggregation of the full request.
214                 super.decode(ctx, msg, out);
215                 if (out.isEmpty()) {
216                     // The full request hasn't been created yet, still awaiting more data.
217                     return;
218                 }
219 
220                 assert out.size() == 1;
221                 response = (FullHttpResponse) out.get(0);
222             }
223 
224             CharSequence upgradeHeader = response.headers().get(HttpHeaderNames.UPGRADE);
225             if (upgradeHeader != null && !AsciiString.contentEqualsIgnoreCase(upgradeCodec.protocol(), upgradeHeader)) {
226                 throw new IllegalStateException(
227                         "Switching Protocols response with unexpected UPGRADE protocol: " + upgradeHeader);
228             }
229 
230             // Upgrade to the new protocol.
231             sourceCodec.prepareUpgradeFrom(ctx);
232             upgradeCodec.upgradeTo(ctx, response);
233 
234             // Notify that the upgrade to the new protocol completed successfully.
235             ctx.fireUserEventTriggered(UpgradeEvent.UPGRADE_SUCCESSFUL);
236 
237             // We guarantee UPGRADE_SUCCESSFUL event will be arrived at the next handler
238             // before http2 setting frame and http response.
239             sourceCodec.upgradeFrom(ctx);
240 
241             // We switched protocols, so we're done with the upgrade response.
242             // Release it and clear it from the output.
243             response.release();
244             out.clear();
245             removeThisHandler(ctx);
246         } catch (Throwable t) {
247             release(response);
248             ctx.fireExceptionCaught(t);
249             removeThisHandler(ctx);
250         }
251     }
252 
253     private static void removeThisHandler(ChannelHandlerContext ctx) {
254         ctx.pipeline().remove(ctx.name());
255     }
256 
257     /**
258      * Adds all upgrade request headers necessary for an upgrade to the supported protocols.
259      */
260     private void setUpgradeRequestHeaders(ChannelHandlerContext ctx, HttpRequest request) {
261         // Set the UPGRADE header on the request.
262         request.headers().set(HttpHeaderNames.UPGRADE, upgradeCodec.protocol());
263 
264         // Add all protocol-specific headers to the request.
265         Set<CharSequence> connectionParts = new LinkedHashSet<CharSequence>(2);
266         connectionParts.addAll(upgradeCodec.setUpgradeHeaders(ctx, request));
267 
268         // Set the CONNECTION header from the set of all protocol-specific headers that were added.
269         StringBuilder builder = new StringBuilder();
270         for (CharSequence part : connectionParts) {
271             builder.append(part);
272             builder.append(',');
273         }
274         builder.append(HttpHeaderValues.UPGRADE);
275         request.headers().add(HttpHeaderNames.CONNECTION, builder.toString());
276     }
277 }