1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16 package io.netty.handler.codec.http.websocketx;
17
18 import io.netty.channel.ChannelFuture;
19 import io.netty.channel.ChannelFutureListener;
20 import io.netty.channel.ChannelHandlerContext;
21 import io.netty.channel.ChannelInboundHandlerAdapter;
22 import io.netty.channel.ChannelPromise;
23 import io.netty.handler.codec.http.FullHttpResponse;
24 import io.netty.handler.codec.http.websocketx.WebSocketClientProtocolHandler.ClientHandshakeStateEvent;
25 import io.netty.util.concurrent.Future;
26 import io.netty.util.concurrent.FutureListener;
27
28 import java.util.concurrent.TimeUnit;
29
30 import static io.netty.util.internal.ObjectUtil.*;
31
32 class WebSocketClientProtocolHandshakeHandler extends ChannelInboundHandlerAdapter {
33 private static final long DEFAULT_HANDSHAKE_TIMEOUT_MS = 10000L;
34
35 private final WebSocketClientHandshaker handshaker;
36 private final long handshakeTimeoutMillis;
37 private ChannelHandlerContext ctx;
38 private ChannelPromise handshakePromise;
39
40 WebSocketClientProtocolHandshakeHandler(WebSocketClientHandshaker handshaker) {
41 this(handshaker, DEFAULT_HANDSHAKE_TIMEOUT_MS);
42 }
43
44 WebSocketClientProtocolHandshakeHandler(WebSocketClientHandshaker handshaker, long handshakeTimeoutMillis) {
45 this.handshaker = handshaker;
46 this.handshakeTimeoutMillis = checkPositive(handshakeTimeoutMillis, "handshakeTimeoutMillis");
47 }
48
49 @Override
50 public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
51 this.ctx = ctx;
52 handshakePromise = ctx.newPromise();
53 }
54
55 @Override
56 public void channelActive(final ChannelHandlerContext ctx) throws Exception {
57 super.channelActive(ctx);
58 handshaker.handshake(ctx.channel()).addListener(new ChannelFutureListener() {
59 @Override
60 public void operationComplete(ChannelFuture future) throws Exception {
61 if (!future.isSuccess()) {
62 handshakePromise.tryFailure(future.cause());
63 ctx.fireExceptionCaught(future.cause());
64 } else {
65 ctx.fireUserEventTriggered(
66 WebSocketClientProtocolHandler.ClientHandshakeStateEvent.HANDSHAKE_ISSUED);
67 }
68 }
69 });
70 applyHandshakeTimeout();
71 }
72
73 @Override
74 public void channelInactive(ChannelHandlerContext ctx) throws Exception {
75 if (!handshakePromise.isDone()) {
76 handshakePromise.tryFailure(new WebSocketClientHandshakeException("channel closed with handshake " +
77 "in progress"));
78 }
79
80 super.channelInactive(ctx);
81 }
82
83 @Override
84 public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
85 if (!(msg instanceof FullHttpResponse)) {
86 ctx.fireChannelRead(msg);
87 return;
88 }
89
90 FullHttpResponse response = (FullHttpResponse) msg;
91 try {
92 if (!handshaker.isHandshakeComplete()) {
93 handshaker.finishHandshake(ctx.channel(), response);
94 handshakePromise.trySuccess();
95 ctx.fireUserEventTriggered(
96 WebSocketClientProtocolHandler.ClientHandshakeStateEvent.HANDSHAKE_COMPLETE);
97 ctx.pipeline().remove(this);
98 return;
99 }
100 throw new IllegalStateException("WebSocketClientHandshaker should have been non finished yet");
101 } finally {
102 response.release();
103 }
104 }
105
106 private void applyHandshakeTimeout() {
107 final ChannelPromise localHandshakePromise = handshakePromise;
108 if (handshakeTimeoutMillis <= 0 || localHandshakePromise.isDone()) {
109 return;
110 }
111
112 final Future<?> timeoutFuture = ctx.executor().schedule(new Runnable() {
113 @Override
114 public void run() {
115 if (localHandshakePromise.isDone()) {
116 return;
117 }
118
119 if (localHandshakePromise.tryFailure(new WebSocketClientHandshakeException("handshake timed out"))) {
120 ctx.flush()
121 .fireUserEventTriggered(ClientHandshakeStateEvent.HANDSHAKE_TIMEOUT)
122 .close();
123 }
124 }
125 }, handshakeTimeoutMillis, TimeUnit.MILLISECONDS);
126
127
128 localHandshakePromise.addListener(new FutureListener<Void>() {
129 @Override
130 public void operationComplete(Future<Void> f) throws Exception {
131 timeoutFuture.cancel(false);
132 }
133 });
134 }
135
136
137
138
139
140
141 ChannelFuture getHandshakeFuture() {
142 return handshakePromise;
143 }
144 }