查看本类的 API文档回源码主页即时通讯网 - 即时通讯开发者社区!
1   /*
2    * Copyright 2012 The Netty Project
3    *
4    * The Netty Project licenses this file to you under the Apache License,
5    * version 2.0 (the "License"); you may not use this file except in compliance
6    * with the License. You may obtain a copy of the License at:
7    *
8    *   https://www.apache.org/licenses/LICENSE-2.0
9    *
10   * Unless required by applicable law or agreed to in writing, software
11   * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12   * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13   * License for the specific language governing permissions and limitations
14   * under the License.
15   */
16  package io.netty.handler.codec.http.websocketx;
17  
18  import io.netty.buffer.Unpooled;
19  import io.netty.channel.Channel;
20  import io.netty.channel.ChannelFuture;
21  import io.netty.channel.ChannelFutureListener;
22  import io.netty.channel.ChannelHandler;
23  import io.netty.channel.ChannelHandlerContext;
24  import io.netty.channel.ChannelInboundHandlerAdapter;
25  import io.netty.channel.ChannelOutboundInvoker;
26  import io.netty.channel.ChannelPipeline;
27  import io.netty.channel.ChannelPromise;
28  import io.netty.handler.codec.http.DefaultFullHttpResponse;
29  import io.netty.handler.codec.http.EmptyHttpHeaders;
30  import io.netty.handler.codec.http.FullHttpRequest;
31  import io.netty.handler.codec.http.FullHttpResponse;
32  import io.netty.handler.codec.http.HttpClientCodec;
33  import io.netty.handler.codec.http.HttpContentDecompressor;
34  import io.netty.handler.codec.http.HttpHeaderNames;
35  import io.netty.handler.codec.http.HttpHeaders;
36  import io.netty.handler.codec.http.HttpObject;
37  import io.netty.handler.codec.http.HttpObjectAggregator;
38  import io.netty.handler.codec.http.HttpRequestEncoder;
39  import io.netty.handler.codec.http.HttpResponse;
40  import io.netty.handler.codec.http.HttpResponseDecoder;
41  import io.netty.handler.codec.http.HttpScheme;
42  import io.netty.handler.codec.http.LastHttpContent;
43  import io.netty.util.NetUtil;
44  import io.netty.util.ReferenceCountUtil;
45  import io.netty.util.internal.ObjectUtil;
46  
47  import java.net.URI;
48  import java.nio.channels.ClosedChannelException;
49  import java.util.Locale;
50  import java.util.concurrent.Future;
51  import java.util.concurrent.TimeUnit;
52  import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
53  
54  /**
55   * Base class for web socket client handshake implementations
56   */
57  public abstract class WebSocketClientHandshaker {
58  
59      private static final String HTTP_SCHEME_PREFIX = HttpScheme.HTTP + "://";
60      private static final String HTTPS_SCHEME_PREFIX = HttpScheme.HTTPS + "://";
61      protected static final int DEFAULT_FORCE_CLOSE_TIMEOUT_MILLIS = 10000;
62  
63      private final URI uri;
64  
65      private final WebSocketVersion version;
66  
67      private volatile boolean handshakeComplete;
68  
69      private volatile long forceCloseTimeoutMillis = DEFAULT_FORCE_CLOSE_TIMEOUT_MILLIS;
70  
71      private volatile int forceCloseInit;
72  
73      private static final AtomicIntegerFieldUpdater<WebSocketClientHandshaker> FORCE_CLOSE_INIT_UPDATER =
74              AtomicIntegerFieldUpdater.newUpdater(WebSocketClientHandshaker.class, "forceCloseInit");
75  
76      private volatile boolean forceCloseComplete;
77  
78      private final String expectedSubprotocol;
79  
80      private volatile String actualSubprotocol;
81  
82      protected final HttpHeaders customHeaders;
83  
84      private final int maxFramePayloadLength;
85  
86      private final boolean absoluteUpgradeUrl;
87  
88      protected final boolean generateOriginHeader;
89  
90      /**
91       * Base constructor
92       *
93       * @param uri
94       *            URL for web socket communications. e.g "ws://myhost.com/mypath". Subsequent web socket frames will be
95       *            sent to this URL.
96       * @param version
97       *            Version of web socket specification to use to connect to the server
98       * @param subprotocol
99       *            Sub protocol request sent to the server.
100      * @param customHeaders
101      *            Map of custom headers to add to the client request
102      * @param maxFramePayloadLength
103      *            Maximum length of a frame's payload
104      */
105     protected WebSocketClientHandshaker(URI uri, WebSocketVersion version, String subprotocol,
106                                         HttpHeaders customHeaders, int maxFramePayloadLength) {
107         this(uri, version, subprotocol, customHeaders, maxFramePayloadLength, DEFAULT_FORCE_CLOSE_TIMEOUT_MILLIS);
108     }
109 
110     /**
111      * Base constructor
112      *
113      * @param uri
114      *            URL for web socket communications. e.g "ws://myhost.com/mypath". Subsequent web socket frames will be
115      *            sent to this URL.
116      * @param version
117      *            Version of web socket specification to use to connect to the server
118      * @param subprotocol
119      *            Sub protocol request sent to the server.
120      * @param customHeaders
121      *            Map of custom headers to add to the client request
122      * @param maxFramePayloadLength
123      *            Maximum length of a frame's payload
124      * @param forceCloseTimeoutMillis
125      *            Close the connection if it was not closed by the server after timeout specified
126      */
127     protected WebSocketClientHandshaker(URI uri, WebSocketVersion version, String subprotocol,
128                                         HttpHeaders customHeaders, int maxFramePayloadLength,
129                                         long forceCloseTimeoutMillis) {
130         this(uri, version, subprotocol, customHeaders, maxFramePayloadLength, forceCloseTimeoutMillis, false);
131     }
132 
133     /**
134      * Base constructor
135      *
136      * @param uri
137      *            URL for web socket communications. e.g "ws://myhost.com/mypath". Subsequent web socket frames will be
138      *            sent to this URL.
139      * @param version
140      *            Version of web socket specification to use to connect to the server
141      * @param subprotocol
142      *            Sub protocol request sent to the server.
143      * @param customHeaders
144      *            Map of custom headers to add to the client request
145      * @param maxFramePayloadLength
146      *            Maximum length of a frame's payload
147      * @param forceCloseTimeoutMillis
148      *            Close the connection if it was not closed by the server after timeout specified
149      * @param  absoluteUpgradeUrl
150      *            Use an absolute url for the Upgrade request, typically when connecting through an HTTP proxy over
151      *            clear HTTP
152      */
153     protected WebSocketClientHandshaker(URI uri, WebSocketVersion version, String subprotocol,
154                                         HttpHeaders customHeaders, int maxFramePayloadLength,
155                                         long forceCloseTimeoutMillis, boolean absoluteUpgradeUrl) {
156         this(uri, version, subprotocol, customHeaders, maxFramePayloadLength, forceCloseTimeoutMillis,
157                 absoluteUpgradeUrl, true);
158     }
159 
160     /**
161      * Base constructor
162      *
163      * @param uri
164      *            URL for web socket communications. e.g "ws://myhost.com/mypath". Subsequent web socket frames will be
165      *            sent to this URL.
166      * @param version
167      *            Version of web socket specification to use to connect to the server
168      * @param subprotocol
169      *            Sub protocol request sent to the server.
170      * @param customHeaders
171      *            Map of custom headers to add to the client request
172      * @param maxFramePayloadLength
173      *            Maximum length of a frame's payload
174      * @param forceCloseTimeoutMillis
175      *            Close the connection if it was not closed by the server after timeout specified
176      * @param  absoluteUpgradeUrl
177      *            Use an absolute url for the Upgrade request, typically when connecting through an HTTP proxy over
178      *            clear HTTP
179      * @param generateOriginHeader
180      *            Allows to generate the `Origin`|`Sec-WebSocket-Origin` header value for handshake request
181      *            according to the given webSocketURL
182      */
183     protected WebSocketClientHandshaker(URI uri, WebSocketVersion version, String subprotocol,
184             HttpHeaders customHeaders, int maxFramePayloadLength,
185             long forceCloseTimeoutMillis, boolean absoluteUpgradeUrl, boolean generateOriginHeader) {
186         this.uri = uri;
187         this.version = version;
188         expectedSubprotocol = subprotocol;
189         this.customHeaders = customHeaders;
190         this.maxFramePayloadLength = maxFramePayloadLength;
191         this.forceCloseTimeoutMillis = forceCloseTimeoutMillis;
192         this.absoluteUpgradeUrl = absoluteUpgradeUrl;
193         this.generateOriginHeader = generateOriginHeader;
194     }
195 
196     /**
197      * Returns the URI to the web socket. e.g. "ws://myhost.com/path"
198      */
199     public URI uri() {
200         return uri;
201     }
202 
203     /**
204      * Version of the web socket specification that is being used
205      */
206     public WebSocketVersion version() {
207         return version;
208     }
209 
210     /**
211      * Returns the max length for any frame's payload
212      */
213     public int maxFramePayloadLength() {
214         return maxFramePayloadLength;
215     }
216 
217     /**
218      * Flag to indicate if the opening handshake is complete
219      */
220     public boolean isHandshakeComplete() {
221         return handshakeComplete;
222     }
223 
224     private void setHandshakeComplete() {
225         handshakeComplete = true;
226     }
227 
228     /**
229      * Returns the CSV of requested subprotocol(s) sent to the server as specified in the constructor
230      */
231     public String expectedSubprotocol() {
232         return expectedSubprotocol;
233     }
234 
235     /**
236      * Returns the subprotocol response sent by the server. Only available after end of handshake.
237      * Null if no subprotocol was requested or confirmed by the server.
238      */
239     public String actualSubprotocol() {
240         return actualSubprotocol;
241     }
242 
243     private void setActualSubprotocol(String actualSubprotocol) {
244         this.actualSubprotocol = actualSubprotocol;
245     }
246 
247     public long forceCloseTimeoutMillis() {
248         return forceCloseTimeoutMillis;
249     }
250 
251     /**
252      * Flag to indicate if the closing handshake was initiated because of timeout.
253      * For testing only.
254      */
255     protected boolean isForceCloseComplete() {
256         return forceCloseComplete;
257     }
258 
259     /**
260      * Sets timeout to close the connection if it was not closed by the server.
261      *
262      * @param forceCloseTimeoutMillis
263      *            Close the connection if it was not closed by the server after timeout specified
264      */
265     public WebSocketClientHandshaker setForceCloseTimeoutMillis(long forceCloseTimeoutMillis) {
266         this.forceCloseTimeoutMillis = forceCloseTimeoutMillis;
267         return this;
268     }
269 
270     /**
271      * Begins the opening handshake
272      *
273      * @param channel
274      *            Channel
275      */
276     public ChannelFuture handshake(Channel channel) {
277         ObjectUtil.checkNotNull(channel, "channel");
278         return handshake(channel, channel.newPromise());
279     }
280 
281     /**
282      * Begins the opening handshake
283      *
284      * @param channel
285      *            Channel
286      * @param promise
287      *            the {@link ChannelPromise} to be notified when the opening handshake is sent
288      */
289     public final ChannelFuture handshake(Channel channel, final ChannelPromise promise) {
290         ChannelPipeline pipeline = channel.pipeline();
291         HttpResponseDecoder decoder = pipeline.get(HttpResponseDecoder.class);
292         if (decoder == null) {
293             HttpClientCodec codec = pipeline.get(HttpClientCodec.class);
294             if (codec == null) {
295                promise.setFailure(new IllegalStateException("ChannelPipeline does not contain " +
296                        "an HttpResponseDecoder or HttpClientCodec"));
297                return promise;
298             }
299         }
300 
301         if (uri.getHost() == null) {
302             if (customHeaders == null || !customHeaders.contains(HttpHeaderNames.HOST)) {
303                 promise.setFailure(new IllegalArgumentException("Cannot generate the 'host' header value," +
304                         " webSocketURI should contain host or passed through customHeaders"));
305                 return promise;
306             }
307 
308             if (generateOriginHeader && !customHeaders.contains(HttpHeaderNames.ORIGIN)) {
309                 final String originName;
310                 if (version == WebSocketVersion.V07 || version == WebSocketVersion.V08) {
311                     originName = HttpHeaderNames.SEC_WEBSOCKET_ORIGIN.toString();
312                 } else {
313                     originName = HttpHeaderNames.ORIGIN.toString();
314                 }
315 
316                 promise.setFailure(new IllegalArgumentException("Cannot generate the '" + originName + "' header" +
317                         " value, webSocketURI should contain host or disable generateOriginHeader or pass value" +
318                         " through customHeaders"));
319                 return promise;
320             }
321         }
322 
323         FullHttpRequest request = newHandshakeRequest();
324 
325         channel.writeAndFlush(request).addListener(new ChannelFutureListener() {
326             @Override
327             public void operationComplete(ChannelFuture future) {
328                 if (future.isSuccess()) {
329                     ChannelPipeline p = future.channel().pipeline();
330                     ChannelHandlerContext ctx = p.context(HttpRequestEncoder.class);
331                     if (ctx == null) {
332                         ctx = p.context(HttpClientCodec.class);
333                     }
334                     if (ctx == null) {
335                         promise.setFailure(new IllegalStateException("ChannelPipeline does not contain " +
336                                 "an HttpRequestEncoder or HttpClientCodec"));
337                         return;
338                     }
339                     p.addAfter(ctx.name(), "ws-encoder", newWebSocketEncoder());
340 
341                     promise.setSuccess();
342                 } else {
343                     promise.setFailure(future.cause());
344                 }
345             }
346         });
347         return promise;
348     }
349 
350     /**
351      * Returns a new {@link FullHttpRequest) which will be used for the handshake.
352      */
353     protected abstract FullHttpRequest newHandshakeRequest();
354 
355     /**
356      * Validates and finishes the opening handshake initiated by {@link #handshake}}.
357      *
358      * @param channel
359      *            Channel
360      * @param response
361      *            HTTP response containing the closing handshake details
362      */
363     public final void finishHandshake(Channel channel, FullHttpResponse response) {
364         verify(response);
365 
366         // Verify the subprotocol that we received from the server.
367         // This must be one of our expected subprotocols - or null/empty if we didn't want to speak a subprotocol
368         String receivedProtocol = response.headers().get(HttpHeaderNames.SEC_WEBSOCKET_PROTOCOL);
369         receivedProtocol = receivedProtocol != null ? receivedProtocol.trim() : null;
370         String expectedProtocol = expectedSubprotocol != null ? expectedSubprotocol : "";
371         boolean protocolValid = false;
372 
373         if (expectedProtocol.isEmpty() && receivedProtocol == null) {
374             // No subprotocol required and none received
375             protocolValid = true;
376             setActualSubprotocol(expectedSubprotocol); // null or "" - we echo what the user requested
377         } else if (!expectedProtocol.isEmpty() && receivedProtocol != null && !receivedProtocol.isEmpty()) {
378             // We require a subprotocol and received one -> verify it
379             for (String protocol : expectedProtocol.split(",")) {
380                 if (protocol.trim().equals(receivedProtocol)) {
381                     protocolValid = true;
382                     setActualSubprotocol(receivedProtocol);
383                     break;
384                 }
385             }
386         } // else mixed cases - which are all errors
387 
388         if (!protocolValid) {
389             throw new WebSocketClientHandshakeException(String.format(
390                     "Invalid subprotocol. Actual: %s. Expected one of: %s",
391                     receivedProtocol, expectedSubprotocol), response);
392         }
393 
394         setHandshakeComplete();
395 
396         final ChannelPipeline p = channel.pipeline();
397         // Remove decompressor from pipeline if its in use
398         HttpContentDecompressor decompressor = p.get(HttpContentDecompressor.class);
399         if (decompressor != null) {
400             p.remove(decompressor);
401         }
402 
403         // Remove aggregator if present before
404         HttpObjectAggregator aggregator = p.get(HttpObjectAggregator.class);
405         if (aggregator != null) {
406             p.remove(aggregator);
407         }
408 
409         ChannelHandlerContext ctx = p.context(HttpResponseDecoder.class);
410         if (ctx == null) {
411             ctx = p.context(HttpClientCodec.class);
412             if (ctx == null) {
413                 throw new IllegalStateException("ChannelPipeline does not contain " +
414                         "an HttpRequestEncoder or HttpClientCodec");
415             }
416             final HttpClientCodec codec =  (HttpClientCodec) ctx.handler();
417             // Remove the encoder part of the codec as the user may start writing frames after this method returns.
418             codec.removeOutboundHandler();
419 
420             p.addAfter(ctx.name(), "ws-decoder", newWebsocketDecoder());
421 
422             // Delay the removal of the decoder so the user can setup the pipeline if needed to handle
423             // WebSocketFrame messages.
424             // See https://github.com/netty/netty/issues/4533
425             channel.eventLoop().execute(new Runnable() {
426                 @Override
427                 public void run() {
428                     p.remove(codec);
429                 }
430             });
431         } else {
432             if (p.get(HttpRequestEncoder.class) != null) {
433                 // Remove the encoder part of the codec as the user may start writing frames after this method returns.
434                 p.remove(HttpRequestEncoder.class);
435             }
436             final ChannelHandlerContext context = ctx;
437             p.addAfter(context.name(), "ws-decoder", newWebsocketDecoder());
438 
439             // Delay the removal of the decoder so the user can setup the pipeline if needed to handle
440             // WebSocketFrame messages.
441             // See https://github.com/netty/netty/issues/4533
442             channel.eventLoop().execute(new Runnable() {
443                 @Override
444                 public void run() {
445                     p.remove(context.handler());
446                 }
447             });
448         }
449     }
450 
451     /**
452      * Process the opening handshake initiated by {@link #handshake}}.
453      *
454      * @param channel
455      *            Channel
456      * @param response
457      *            HTTP response containing the closing handshake details
458      * @return future
459      *            the {@link ChannelFuture} which is notified once the handshake completes.
460      */
461     public final ChannelFuture processHandshake(final Channel channel, HttpResponse response) {
462         return processHandshake(channel, response, channel.newPromise());
463     }
464 
465     /**
466      * Process the opening handshake initiated by {@link #handshake}}.
467      *
468      * @param channel
469      *            Channel
470      * @param response
471      *            HTTP response containing the closing handshake details
472      * @param promise
473      *            the {@link ChannelPromise} to notify once the handshake completes.
474      * @return future
475      *            the {@link ChannelFuture} which is notified once the handshake completes.
476      */
477     public final ChannelFuture processHandshake(final Channel channel, HttpResponse response,
478                                                 final ChannelPromise promise) {
479         if (response instanceof FullHttpResponse) {
480             try {
481                 finishHandshake(channel, (FullHttpResponse) response);
482                 promise.setSuccess();
483             } catch (Throwable cause) {
484                 promise.setFailure(cause);
485             }
486         } else {
487             ChannelPipeline p = channel.pipeline();
488             ChannelHandlerContext ctx = p.context(HttpResponseDecoder.class);
489             if (ctx == null) {
490                 ctx = p.context(HttpClientCodec.class);
491                 if (ctx == null) {
492                     return promise.setFailure(new IllegalStateException("ChannelPipeline does not contain " +
493                             "an HttpResponseDecoder or HttpClientCodec"));
494                 }
495             }
496 
497             String aggregatorCtx = ctx.name();
498             // Content-Length and Transfer-Encoding must not be sent in any response with a status code of 1xx or 204.
499             if (version == WebSocketVersion.V00) {
500                 // Add aggregator and ensure we feed the HttpResponse so it is aggregated. A limit of 8192 should be
501                 // more then enough for the websockets handshake payload.
502                 aggregatorCtx = "httpAggregator";
503                 p.addAfter(ctx.name(), aggregatorCtx, new HttpObjectAggregator(8192));
504             }
505 
506             p.addAfter(aggregatorCtx, "handshaker", new ChannelInboundHandlerAdapter() {
507 
508                 private FullHttpResponse fullHttpResponse;
509 
510                 @Override
511                 public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
512                     if (msg instanceof HttpObject) {
513                         try {
514                             handleHandshakeResponse(ctx, (HttpObject) msg);
515                         } finally {
516                             ReferenceCountUtil.release(msg);
517                         }
518                     } else {
519                         super.channelRead(ctx, msg);
520                     }
521                 }
522 
523                 @Override
524                 public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
525                     // Remove ourself and fail the handshake promise.
526                     ctx.pipeline().remove(this);
527                     promise.setFailure(cause);
528                 }
529 
530                 @Override
531                 public void channelInactive(ChannelHandlerContext ctx) throws Exception {
532                     try {
533                         // Fail promise if Channel was closed
534                         if (!promise.isDone()) {
535                             promise.tryFailure(new ClosedChannelException());
536                         }
537                         ctx.fireChannelInactive();
538                     } finally {
539                         releaseFullHttpResponse();
540                     }
541                 }
542 
543                 @Override
544                 public void handlerRemoved(ChannelHandlerContext ctx) throws Exception {
545                     releaseFullHttpResponse();
546                 }
547 
548                 private void handleHandshakeResponse(ChannelHandlerContext ctx, HttpObject response) {
549                     if (response instanceof FullHttpResponse) {
550                         ctx.pipeline().remove(this);
551                         tryFinishHandshake((FullHttpResponse) response);
552                         return;
553                     }
554 
555                     if (response instanceof LastHttpContent) {
556                         assert fullHttpResponse != null;
557                         FullHttpResponse handshakeResponse = fullHttpResponse;
558                         fullHttpResponse = null;
559                         try {
560                             ctx.pipeline().remove(this);
561                             tryFinishHandshake(handshakeResponse);
562                         } finally {
563                             handshakeResponse.release();
564                         }
565                         return;
566                     }
567 
568                     if (response instanceof HttpResponse) {
569                         HttpResponse httpResponse = (HttpResponse) response;
570                         fullHttpResponse = new DefaultFullHttpResponse(httpResponse.protocolVersion(),
571                             httpResponse.status(), Unpooled.EMPTY_BUFFER, httpResponse.headers(),
572                             EmptyHttpHeaders.INSTANCE);
573                         if (httpResponse.decoderResult().isFailure()) {
574                             fullHttpResponse.setDecoderResult(httpResponse.decoderResult());
575                         }
576                     }
577                 }
578 
579                 private void tryFinishHandshake(FullHttpResponse fullHttpResponse) {
580                     try {
581                         finishHandshake(channel, fullHttpResponse);
582                         promise.setSuccess();
583                     } catch (Throwable cause) {
584                         promise.setFailure(cause);
585                     }
586                 }
587 
588                 private void releaseFullHttpResponse() {
589                     if (fullHttpResponse != null) {
590                         fullHttpResponse.release();
591                         fullHttpResponse = null;
592                     }
593                 }
594             });
595             try {
596                 ctx.fireChannelRead(ReferenceCountUtil.retain(response));
597             } catch (Throwable cause) {
598                 promise.setFailure(cause);
599             }
600         }
601         return promise;
602     }
603 
604     /**
605      * Verify the {@link FullHttpResponse} and throws a {@link WebSocketHandshakeException} if something is wrong.
606      */
607     protected abstract void verify(FullHttpResponse response);
608 
609     /**
610      * Returns the decoder to use after handshake is complete.
611      */
612     protected abstract WebSocketFrameDecoder newWebsocketDecoder();
613 
614     /**
615      * Returns the encoder to use after the handshake is complete.
616      */
617     protected abstract WebSocketFrameEncoder newWebSocketEncoder();
618 
619     /**
620      * Performs the closing handshake.
621      *
622      * When called from within a {@link ChannelHandler} you most likely want to use
623      * {@link #close(ChannelHandlerContext, CloseWebSocketFrame)}.
624      *
625      * @param channel
626      *            Channel
627      * @param frame
628      *            Closing Frame that was received
629      */
630     public ChannelFuture close(Channel channel, CloseWebSocketFrame frame) {
631         ObjectUtil.checkNotNull(channel, "channel");
632         return close(channel, frame, channel.newPromise());
633     }
634 
635     /**
636      * Performs the closing handshake
637      *
638      * When called from within a {@link ChannelHandler} you most likely want to use
639      * {@link #close(ChannelHandlerContext, CloseWebSocketFrame, ChannelPromise)}.
640      *
641      * @param channel
642      *            Channel
643      * @param frame
644      *            Closing Frame that was received
645      * @param promise
646      *            the {@link ChannelPromise} to be notified when the closing handshake is done
647      */
648     public ChannelFuture close(Channel channel, CloseWebSocketFrame frame, ChannelPromise promise) {
649         ObjectUtil.checkNotNull(channel, "channel");
650         return close0(channel, channel, frame, promise);
651     }
652 
653     /**
654      * Performs the closing handshake
655      *
656      * @param ctx
657      *            the {@link ChannelHandlerContext} to use.
658      * @param frame
659      *            Closing Frame that was received
660      */
661     public ChannelFuture close(ChannelHandlerContext ctx, CloseWebSocketFrame frame) {
662         ObjectUtil.checkNotNull(ctx, "ctx");
663         return close(ctx, frame, ctx.newPromise());
664     }
665 
666     /**
667      * Performs the closing handshake
668      *
669      * @param ctx
670      *            the {@link ChannelHandlerContext} to use.
671      * @param frame
672      *            Closing Frame that was received
673      * @param promise
674      *            the {@link ChannelPromise} to be notified when the closing handshake is done
675      */
676     public ChannelFuture close(ChannelHandlerContext ctx, CloseWebSocketFrame frame, ChannelPromise promise) {
677         ObjectUtil.checkNotNull(ctx, "ctx");
678         return close0(ctx, ctx.channel(), frame, promise);
679     }
680 
681     private ChannelFuture close0(final ChannelOutboundInvoker invoker, final Channel channel,
682                                  CloseWebSocketFrame frame, ChannelPromise promise) {
683         invoker.writeAndFlush(frame, promise);
684         final long forceCloseTimeoutMillis = this.forceCloseTimeoutMillis;
685         final WebSocketClientHandshaker handshaker = this;
686         if (forceCloseTimeoutMillis <= 0 || !channel.isActive() || forceCloseInit != 0) {
687             return promise;
688         }
689 
690         promise.addListener(new ChannelFutureListener() {
691             @Override
692             public void operationComplete(ChannelFuture future) {
693                 // If flush operation failed, there is no reason to expect
694                 // a server to receive CloseFrame. Thus this should be handled
695                 // by the application separately.
696                 // Also, close might be called twice from different threads.
697                 if (future.isSuccess() && channel.isActive() &&
698                         FORCE_CLOSE_INIT_UPDATER.compareAndSet(handshaker, 0, 1)) {
699                     final Future<?> forceCloseFuture = channel.eventLoop().schedule(new Runnable() {
700                         @Override
701                         public void run() {
702                             if (channel.isActive()) {
703                                 invoker.close();
704                                 forceCloseComplete = true;
705                             }
706                         }
707                     }, forceCloseTimeoutMillis, TimeUnit.MILLISECONDS);
708 
709                     channel.closeFuture().addListener(new ChannelFutureListener() {
710                         @Override
711                         public void operationComplete(ChannelFuture future) throws Exception {
712                             forceCloseFuture.cancel(false);
713                         }
714                     });
715                 }
716             }
717         });
718         return promise;
719     }
720 
721     /**
722      * Return the constructed raw path for the give {@link URI}.
723      */
724     protected String upgradeUrl(URI wsURL) {
725         if (absoluteUpgradeUrl) {
726             return wsURL.toString();
727         }
728 
729         String path = wsURL.getRawPath();
730         path = path == null || path.isEmpty() ? "/" : path;
731         String query = wsURL.getRawQuery();
732         return query != null && !query.isEmpty() ? path + '?' + query : path;
733     }
734 
735     static CharSequence websocketHostValue(URI wsURL) {
736         int port = wsURL.getPort();
737         if (port == -1) {
738             return wsURL.getHost();
739         }
740         String host = wsURL.getHost();
741         String scheme = wsURL.getScheme();
742         if (port == HttpScheme.HTTP.port()) {
743             return HttpScheme.HTTP.name().contentEquals(scheme)
744                     || WebSocketScheme.WS.name().contentEquals(scheme) ?
745                     host : NetUtil.toSocketAddressString(host, port);
746         }
747         if (port == HttpScheme.HTTPS.port()) {
748             return HttpScheme.HTTPS.name().contentEquals(scheme)
749                     || WebSocketScheme.WSS.name().contentEquals(scheme) ?
750                     host : NetUtil.toSocketAddressString(host, port);
751         }
752 
753         // if the port is not standard (80/443) its needed to add the port to the header.
754         // See https://tools.ietf.org/html/rfc6454#section-6.2
755         return NetUtil.toSocketAddressString(host, port);
756     }
757 
758     static CharSequence websocketOriginValue(URI wsURL) {
759         String scheme = wsURL.getScheme();
760         final String schemePrefix;
761         int port = wsURL.getPort();
762         final int defaultPort;
763         if (WebSocketScheme.WSS.name().contentEquals(scheme)
764             || HttpScheme.HTTPS.name().contentEquals(scheme)
765             || (scheme == null && port == WebSocketScheme.WSS.port())) {
766 
767             schemePrefix = HTTPS_SCHEME_PREFIX;
768             defaultPort = WebSocketScheme.WSS.port();
769         } else {
770             schemePrefix = HTTP_SCHEME_PREFIX;
771             defaultPort = WebSocketScheme.WS.port();
772         }
773 
774         // Convert uri-host to lower case (by RFC 6454, chapter 4 "Origin of a URI")
775         String host = wsURL.getHost().toLowerCase(Locale.US);
776 
777         if (port != defaultPort && port != -1) {
778             // if the port is not standard (80/443) its needed to add the port to the header.
779             // See https://tools.ietf.org/html/rfc6454#section-6.2
780             return schemePrefix + NetUtil.toSocketAddressString(host, port);
781         }
782         return schemePrefix + host;
783     }
784 }