查看本类的 API文档回源码主页即时通讯网 - 即时通讯开发者社区!
1   /*
2    * Copyright 2019 The Netty Project
3    *
4    * The Netty Project licenses this file to you under the Apache License,
5    * version 2.0 (the "License"); you may not use this file except in compliance
6    * with the License. You may obtain a copy of the License at:
7    *
8    *   https://www.apache.org/licenses/LICENSE-2.0
9    *
10   * Unless required by applicable law or agreed to in writing, software
11   * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12   * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13   * License for the specific language governing permissions and limitations
14   * under the License.
15   */
16  package io.netty.handler.codec.http.websocketx;
17  
18  import io.netty.channel.ChannelFuture;
19  import io.netty.channel.ChannelFutureListener;
20  import io.netty.channel.ChannelHandlerContext;
21  import io.netty.channel.ChannelInboundHandlerAdapter;
22  import io.netty.channel.ChannelPipeline;
23  import io.netty.channel.ChannelPromise;
24  import io.netty.handler.codec.http.HttpHeaderNames;
25  import io.netty.handler.codec.http.HttpObject;
26  import io.netty.handler.codec.http.HttpRequest;
27  import io.netty.handler.codec.http.HttpResponse;
28  import io.netty.handler.codec.http.websocketx.WebSocketServerProtocolHandler.ServerHandshakeStateEvent;
29  import io.netty.handler.ssl.SslHandler;
30  import io.netty.util.ReferenceCountUtil;
31  import io.netty.util.concurrent.Future;
32  import io.netty.util.concurrent.FutureListener;
33  
34  import java.util.concurrent.TimeUnit;
35  
36  import static io.netty.handler.codec.http.HttpUtil.*;
37  import static io.netty.util.internal.ObjectUtil.*;
38  
39  /**
40   * Handles the HTTP handshake (the HTTP Upgrade request) for {@link WebSocketServerProtocolHandler}.
41   */
42  class WebSocketServerProtocolHandshakeHandler extends ChannelInboundHandlerAdapter {
43  
44      private final WebSocketServerProtocolConfig serverConfig;
45      private ChannelHandlerContext ctx;
46      private ChannelPromise handshakePromise;
47      private boolean isWebSocketPath;
48  
49      WebSocketServerProtocolHandshakeHandler(WebSocketServerProtocolConfig serverConfig) {
50          this.serverConfig = checkNotNull(serverConfig, "serverConfig");
51      }
52  
53      @Override
54      public void handlerAdded(ChannelHandlerContext ctx) {
55          this.ctx = ctx;
56          handshakePromise = ctx.newPromise();
57      }
58  
59      @Override
60      public void channelRead(final ChannelHandlerContext ctx, Object msg) throws Exception {
61          final HttpObject httpObject = (HttpObject) msg;
62  
63          if (httpObject instanceof HttpRequest) {
64              final HttpRequest req = (HttpRequest) httpObject;
65              isWebSocketPath = isWebSocketPath(req);
66              if (!isWebSocketPath) {
67                  ctx.fireChannelRead(msg);
68                  return;
69              }
70  
71              try {
72                  final WebSocketServerHandshakerFactory wsFactory = new WebSocketServerHandshakerFactory(
73                          getWebSocketLocation(ctx.pipeline(), req, serverConfig.websocketPath()),
74                          serverConfig.subprotocols(), serverConfig.decoderConfig());
75                  final WebSocketServerHandshaker handshaker = wsFactory.newHandshaker(req);
76                  final ChannelPromise localHandshakePromise = handshakePromise;
77                  if (handshaker == null) {
78                      WebSocketServerHandshakerFactory.sendUnsupportedVersionResponse(ctx.channel());
79                  } else {
80                      // Ensure we set the handshaker and replace this handler before we
81                      // trigger the actual handshake. Otherwise we may receive websocket bytes in this handler
82                      // before we had a chance to replace it.
83                      //
84                      // See https://github.com/netty/netty/issues/9471.
85                      WebSocketServerProtocolHandler.setHandshaker(ctx.channel(), handshaker);
86                      ctx.pipeline().remove(this);
87  
88                      final ChannelFuture handshakeFuture = handshaker.handshake(ctx.channel(), req);
89                      handshakeFuture.addListener(new ChannelFutureListener() {
90                          @Override
91                          public void operationComplete(ChannelFuture future) {
92                              if (!future.isSuccess()) {
93                                  localHandshakePromise.tryFailure(future.cause());
94                                  ctx.fireExceptionCaught(future.cause());
95                              } else {
96                                  localHandshakePromise.trySuccess();
97                                  // Kept for compatibility
98                                  ctx.fireUserEventTriggered(
99                                          WebSocketServerProtocolHandler.ServerHandshakeStateEvent.HANDSHAKE_COMPLETE);
100                                 ctx.fireUserEventTriggered(
101                                         new WebSocketServerProtocolHandler.HandshakeComplete(
102                                                 req.uri(), req.headers(), handshaker.selectedSubprotocol()));
103                             }
104                         }
105                     });
106                     applyHandshakeTimeout();
107                 }
108             } finally {
109                 ReferenceCountUtil.release(req);
110             }
111         } else if (!isWebSocketPath) {
112             ctx.fireChannelRead(msg);
113         } else {
114             ReferenceCountUtil.release(msg);
115         }
116     }
117 
118     private boolean isWebSocketPath(HttpRequest req) {
119         String websocketPath = serverConfig.websocketPath();
120         String uri = req.uri();
121         boolean checkStartUri = uri.startsWith(websocketPath);
122         boolean checkNextUri = "/".equals(websocketPath) || checkNextUri(uri, websocketPath);
123         return serverConfig.checkStartsWith() ? (checkStartUri && checkNextUri) : uri.equals(websocketPath);
124     }
125 
126     private boolean checkNextUri(String uri, String websocketPath) {
127         int len = websocketPath.length();
128         if (uri.length() > len) {
129             char nextUri = uri.charAt(len);
130             return nextUri == '/' || nextUri == '?';
131         }
132         return true;
133     }
134 
135     private static void sendHttpResponse(ChannelHandlerContext ctx, HttpRequest req, HttpResponse res) {
136         ChannelFuture f = ctx.writeAndFlush(res);
137         if (!isKeepAlive(req) || res.status().code() != 200) {
138             f.addListener(ChannelFutureListener.CLOSE);
139         }
140     }
141 
142     private static String getWebSocketLocation(ChannelPipeline cp, HttpRequest req, String path) {
143         String protocol = "ws";
144         if (cp.get(SslHandler.class) != null) {
145             // SSL in use so use Secure WebSockets
146             protocol = "wss";
147         }
148         String host = req.headers().get(HttpHeaderNames.HOST);
149         return protocol + "://" + host + path;
150     }
151 
152     private void applyHandshakeTimeout() {
153         final ChannelPromise localHandshakePromise = handshakePromise;
154         final long handshakeTimeoutMillis = serverConfig.handshakeTimeoutMillis();
155         if (handshakeTimeoutMillis <= 0 || localHandshakePromise.isDone()) {
156             return;
157         }
158 
159         final Future<?> timeoutFuture = ctx.executor().schedule(new Runnable() {
160             @Override
161             public void run() {
162                 if (!localHandshakePromise.isDone() &&
163                     localHandshakePromise.tryFailure(new WebSocketServerHandshakeException("handshake timed out"))) {
164                     ctx.flush()
165                        .fireUserEventTriggered(ServerHandshakeStateEvent.HANDSHAKE_TIMEOUT)
166                        .close();
167                 }
168             }
169         }, handshakeTimeoutMillis, TimeUnit.MILLISECONDS);
170 
171         // Cancel the handshake timeout when handshake is finished.
172         localHandshakePromise.addListener(new FutureListener<Void>() {
173             @Override
174             public void operationComplete(Future<Void> f) {
175                 timeoutFuture.cancel(false);
176             }
177         });
178     }
179 }