查看本类的 API文档回源码主页即时通讯网 - 即时通讯开发者社区!
1   /*
2    * Copyright 2013 The Netty Project
3    *
4    * The Netty Project licenses this file to you under the Apache License,
5    * version 2.0 (the "License"); you may not use this file except in compliance
6    * with the License. You may obtain a copy of the License at:
7    *
8    *   https://www.apache.org/licenses/LICENSE-2.0
9    *
10   * Unless required by applicable law or agreed to in writing, software
11   * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12   * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13   * License for the specific language governing permissions and limitations
14   * under the License.
15   */
16  package io.netty.handler.codec.http.websocketx;
17  
18  import io.netty.channel.ChannelFuture;
19  import io.netty.channel.ChannelFutureListener;
20  import io.netty.channel.ChannelHandlerContext;
21  import io.netty.channel.ChannelInboundHandlerAdapter;
22  import io.netty.channel.ChannelPromise;
23  import io.netty.handler.codec.http.FullHttpResponse;
24  import io.netty.handler.codec.http.websocketx.WebSocketClientProtocolHandler.ClientHandshakeStateEvent;
25  import io.netty.util.concurrent.Future;
26  import io.netty.util.concurrent.FutureListener;
27  
28  import java.util.concurrent.TimeUnit;
29  
30  import static io.netty.util.internal.ObjectUtil.*;
31  
32  class WebSocketClientProtocolHandshakeHandler extends ChannelInboundHandlerAdapter {
33      private static final long DEFAULT_HANDSHAKE_TIMEOUT_MS = 10000L;
34  
35      private final WebSocketClientHandshaker handshaker;
36      private final long handshakeTimeoutMillis;
37      private ChannelHandlerContext ctx;
38      private ChannelPromise handshakePromise;
39  
40      WebSocketClientProtocolHandshakeHandler(WebSocketClientHandshaker handshaker) {
41          this(handshaker, DEFAULT_HANDSHAKE_TIMEOUT_MS);
42      }
43  
44      WebSocketClientProtocolHandshakeHandler(WebSocketClientHandshaker handshaker, long handshakeTimeoutMillis) {
45          this.handshaker = handshaker;
46          this.handshakeTimeoutMillis = checkPositive(handshakeTimeoutMillis, "handshakeTimeoutMillis");
47      }
48  
49      @Override
50      public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
51          this.ctx = ctx;
52          handshakePromise = ctx.newPromise();
53      }
54  
55      @Override
56      public void channelActive(final ChannelHandlerContext ctx) throws Exception {
57          super.channelActive(ctx);
58          handshaker.handshake(ctx.channel()).addListener(new ChannelFutureListener() {
59              @Override
60              public void operationComplete(ChannelFuture future) throws Exception {
61                  if (!future.isSuccess()) {
62                      handshakePromise.tryFailure(future.cause());
63                      ctx.fireExceptionCaught(future.cause());
64                  } else {
65                      ctx.fireUserEventTriggered(
66                              WebSocketClientProtocolHandler.ClientHandshakeStateEvent.HANDSHAKE_ISSUED);
67                  }
68              }
69          });
70          applyHandshakeTimeout();
71      }
72  
73      @Override
74      public void channelInactive(ChannelHandlerContext ctx) throws Exception {
75          if (!handshakePromise.isDone()) {
76              handshakePromise.tryFailure(new WebSocketClientHandshakeException("channel closed with handshake " +
77                                                                                "in progress"));
78          }
79  
80          super.channelInactive(ctx);
81      }
82  
83      @Override
84      public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
85          if (!(msg instanceof FullHttpResponse)) {
86              ctx.fireChannelRead(msg);
87              return;
88          }
89  
90          FullHttpResponse response = (FullHttpResponse) msg;
91          try {
92              if (!handshaker.isHandshakeComplete()) {
93                  handshaker.finishHandshake(ctx.channel(), response);
94                  handshakePromise.trySuccess();
95                  ctx.fireUserEventTriggered(
96                          WebSocketClientProtocolHandler.ClientHandshakeStateEvent.HANDSHAKE_COMPLETE);
97                  ctx.pipeline().remove(this);
98                  return;
99              }
100             throw new IllegalStateException("WebSocketClientHandshaker should have been non finished yet");
101         } finally {
102             response.release();
103         }
104     }
105 
106     private void applyHandshakeTimeout() {
107         final ChannelPromise localHandshakePromise = handshakePromise;
108         if (handshakeTimeoutMillis <= 0 || localHandshakePromise.isDone()) {
109             return;
110         }
111 
112         final Future<?> timeoutFuture = ctx.executor().schedule(new Runnable() {
113             @Override
114             public void run() {
115                 if (localHandshakePromise.isDone()) {
116                     return;
117                 }
118 
119                 if (localHandshakePromise.tryFailure(new WebSocketClientHandshakeException("handshake timed out"))) {
120                     ctx.flush()
121                        .fireUserEventTriggered(ClientHandshakeStateEvent.HANDSHAKE_TIMEOUT)
122                        .close();
123                 }
124             }
125         }, handshakeTimeoutMillis, TimeUnit.MILLISECONDS);
126 
127         // Cancel the handshake timeout when handshake is finished.
128         localHandshakePromise.addListener(new FutureListener<Void>() {
129             @Override
130             public void operationComplete(Future<Void> f) throws Exception {
131                 timeoutFuture.cancel(false);
132             }
133         });
134     }
135 
136     /**
137      * This method is visible for testing.
138      *
139      * @return current handshake future
140      */
141     ChannelFuture getHandshakeFuture() {
142         return handshakePromise;
143     }
144 }