查看本类的 API文档回源码主页即时通讯网 - 即时通讯开发者社区!
1   /*
2    * Copyright 2012 The Netty Project
3    *
4    * The Netty Project licenses this file to you under the Apache License,
5    * version 2.0 (the "License"); you may not use this file except in compliance
6    * with the License. You may obtain a copy of the License at:
7    *
8    *   https://www.apache.org/licenses/LICENSE-2.0
9    *
10   * Unless required by applicable law or agreed to in writing, software
11   * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12   * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13   * License for the specific language governing permissions and limitations
14   * under the License.
15   */
16  package io.netty.handler.ssl;
17  
18  import io.netty.buffer.ByteBuf;
19  import io.netty.buffer.ByteBufAllocator;
20  import io.netty.buffer.ByteBufUtil;
21  import io.netty.buffer.CompositeByteBuf;
22  import io.netty.buffer.Unpooled;
23  import io.netty.channel.AbstractCoalescingBufferQueue;
24  import io.netty.channel.Channel;
25  import io.netty.channel.ChannelConfig;
26  import io.netty.channel.ChannelException;
27  import io.netty.channel.ChannelFuture;
28  import io.netty.channel.ChannelFutureListener;
29  import io.netty.channel.ChannelHandlerContext;
30  import io.netty.channel.ChannelInboundHandler;
31  import io.netty.channel.ChannelOption;
32  import io.netty.channel.ChannelOutboundBuffer;
33  import io.netty.channel.ChannelOutboundHandler;
34  import io.netty.channel.ChannelPipeline;
35  import io.netty.channel.ChannelPromise;
36  import io.netty.channel.unix.UnixChannel;
37  import io.netty.handler.codec.ByteToMessageDecoder;
38  import io.netty.handler.codec.DecoderException;
39  import io.netty.handler.codec.UnsupportedMessageTypeException;
40  import io.netty.util.ReferenceCountUtil;
41  import io.netty.util.concurrent.DefaultPromise;
42  import io.netty.util.concurrent.EventExecutor;
43  import io.netty.util.concurrent.Future;
44  import io.netty.util.concurrent.FutureListener;
45  import io.netty.util.concurrent.ImmediateExecutor;
46  import io.netty.util.concurrent.Promise;
47  import io.netty.util.concurrent.PromiseNotifier;
48  import io.netty.util.internal.ObjectUtil;
49  import io.netty.util.internal.PlatformDependent;
50  import io.netty.util.internal.ThrowableUtil;
51  import io.netty.util.internal.UnstableApi;
52  import io.netty.util.internal.logging.InternalLogger;
53  import io.netty.util.internal.logging.InternalLoggerFactory;
54  
55  import java.io.IOException;
56  import java.net.SocketAddress;
57  import java.nio.ByteBuffer;
58  import java.nio.channels.ClosedChannelException;
59  import java.nio.channels.DatagramChannel;
60  import java.nio.channels.SocketChannel;
61  import java.util.List;
62  import java.util.concurrent.Executor;
63  import java.util.concurrent.RejectedExecutionException;
64  import java.util.concurrent.TimeUnit;
65  import java.util.regex.Pattern;
66  
67  import javax.net.ssl.SSLEngine;
68  import javax.net.ssl.SSLEngineResult;
69  import javax.net.ssl.SSLEngineResult.HandshakeStatus;
70  import javax.net.ssl.SSLEngineResult.Status;
71  import javax.net.ssl.SSLException;
72  import javax.net.ssl.SSLHandshakeException;
73  import javax.net.ssl.SSLSession;
74  
75  import static io.netty.buffer.ByteBufUtil.ensureWritableSuccess;
76  import static io.netty.handler.ssl.SslUtils.NOT_ENOUGH_DATA;
77  import static io.netty.handler.ssl.SslUtils.getEncryptedPacketLength;
78  import static io.netty.util.internal.ObjectUtil.checkNotNull;
79  import static io.netty.util.internal.ObjectUtil.checkPositiveOrZero;
80  
81  /**
82   * Adds <a href="https://en.wikipedia.org/wiki/Transport_Layer_Security">SSL
83   * &middot; TLS</a> and StartTLS support to a {@link Channel}.  Please refer
84   * to the <strong>"SecureChat"</strong> example in the distribution or the web
85   * site for the detailed usage.
86   *
87   * <h3>Beginning the handshake</h3>
88   * <p>
89   * Beside using the handshake {@link ChannelFuture} to get notified about the completion of the handshake it's
90   * also possible to detect it by implement the
91   * {@link ChannelInboundHandler#userEventTriggered(ChannelHandlerContext, Object)}
92   * method and check for a {@link SslHandshakeCompletionEvent}.
93   *
94   * <h3>Handshake</h3>
95   * <p>
96   * The handshake will be automatically issued for you once the {@link Channel} is active and
97   * {@link SSLEngine#getUseClientMode()} returns {@code true}.
98   * So no need to bother with it by your self.
99   *
100  * <h3>Closing the session</h3>
101  * <p>
102  * To close the SSL session, the {@link #closeOutbound()} method should be
103  * called to send the {@code close_notify} message to the remote peer. One
104  * exception is when you close the {@link Channel} - {@link SslHandler}
105  * intercepts the close request and send the {@code close_notify} message
106  * before the channel closure automatically.  Once the SSL session is closed,
107  * it is not reusable, and consequently you should create a new
108  * {@link SslHandler} with a new {@link SSLEngine} as explained in the
109  * following section.
110  *
111  * <h3>Restarting the session</h3>
112  * <p>
113  * To restart the SSL session, you must remove the existing closed
114  * {@link SslHandler} from the {@link ChannelPipeline}, insert a new
115  * {@link SslHandler} with a new {@link SSLEngine} into the pipeline,
116  * and start the handshake process as described in the first section.
117  *
118  * <h3>Implementing StartTLS</h3>
119  * <p>
120  * <a href="https://en.wikipedia.org/wiki/STARTTLS">StartTLS</a> is the
121  * communication pattern that secures the wire in the middle of the plaintext
122  * connection.  Please note that it is different from SSL &middot; TLS, that
123  * secures the wire from the beginning of the connection.  Typically, StartTLS
124  * is composed of three steps:
125  * <ol>
126  * <li>Client sends a StartTLS request to server.</li>
127  * <li>Server sends a StartTLS response to client.</li>
128  * <li>Client begins SSL handshake.</li>
129  * </ol>
130  * If you implement a server, you need to:
131  * <ol>
132  * <li>create a new {@link SslHandler} instance with {@code startTls} flag set
133  *     to {@code true},</li>
134  * <li>insert the {@link SslHandler} to the {@link ChannelPipeline}, and</li>
135  * <li>write a StartTLS response.</li>
136  * </ol>
137  * Please note that you must insert {@link SslHandler} <em>before</em> sending
138  * the StartTLS response.  Otherwise the client can send begin SSL handshake
139  * before {@link SslHandler} is inserted to the {@link ChannelPipeline}, causing
140  * data corruption.
141  * <p>
142  * The client-side implementation is much simpler.
143  * <ol>
144  * <li>Write a StartTLS request,</li>
145  * <li>wait for the StartTLS response,</li>
146  * <li>create a new {@link SslHandler} instance with {@code startTls} flag set
147  *     to {@code false},</li>
148  * <li>insert the {@link SslHandler} to the {@link ChannelPipeline}, and</li>
149  * <li>Initiate SSL handshake.</li>
150  * </ol>
151  *
152  * <h3>Known issues</h3>
153  * <p>
154  * Because of a known issue with the current implementation of the SslEngine that comes
155  * with Java it may be possible that you see blocked IO-Threads while a full GC is done.
156  * <p>
157  * So if you are affected you can workaround this problem by adjust the cache settings
158  * like shown below:
159  *
160  * <pre>
161  *     SslContext context = ...;
162  *     context.getServerSessionContext().setSessionCacheSize(someSaneSize);
163  *     context.getServerSessionContext().setSessionTime(someSameTimeout);
164  * </pre>
165  * <p>
166  * What values to use here depends on the nature of your application and should be set
167  * based on monitoring and debugging of it.
168  * For more details see
169  * <a href="https://github.com/netty/netty/issues/832">#832</a> in our issue tracker.
170  */
171 public class SslHandler extends ByteToMessageDecoder implements ChannelOutboundHandler {
172     private static final InternalLogger logger =
173             InternalLoggerFactory.getInstance(SslHandler.class);
174     private static final Pattern IGNORABLE_CLASS_IN_STACK = Pattern.compile(
175             "^.*(?:Socket|Datagram|Sctp|Udt)Channel.*$");
176     private static final Pattern IGNORABLE_ERROR_MESSAGE = Pattern.compile(
177             "^.*(?:connection.*(?:reset|closed|abort|broken)|broken.*pipe).*$", Pattern.CASE_INSENSITIVE);
178     private static final int STATE_SENT_FIRST_MESSAGE = 1;
179     private static final int STATE_FLUSHED_BEFORE_HANDSHAKE = 1 << 1;
180     private static final int STATE_READ_DURING_HANDSHAKE = 1 << 2;
181     private static final int STATE_HANDSHAKE_STARTED = 1 << 3;
182     /**
183      * Set by wrap*() methods when something is produced.
184      * {@link #channelReadComplete(ChannelHandlerContext)} will check this flag, clear it, and call ctx.flush().
185      */
186     private static final int STATE_NEEDS_FLUSH = 1 << 4;
187     private static final int STATE_OUTBOUND_CLOSED = 1 << 5;
188     private static final int STATE_CLOSE_NOTIFY = 1 << 6;
189     private static final int STATE_PROCESS_TASK = 1 << 7;
190     /**
191      * This flag is used to determine if we need to call {@link ChannelHandlerContext#read()} to consume more data
192      * when {@link ChannelConfig#isAutoRead()} is {@code false}.
193      */
194     private static final int STATE_FIRE_CHANNEL_READ = 1 << 8;
195     private static final int STATE_UNWRAP_REENTRY = 1 << 9;
196 
197     /**
198      * <a href="https://tools.ietf.org/html/rfc5246#section-6.2">2^14</a> which is the maximum sized plaintext chunk
199      * allowed by the TLS RFC.
200      */
201     private static final int MAX_PLAINTEXT_LENGTH = 16 * 1024;
202 
203     private enum SslEngineType {
204         TCNATIVE(true, COMPOSITE_CUMULATOR) {
205             @Override
206             SSLEngineResult unwrap(SslHandler handler, ByteBuf in, int len, ByteBuf out) throws SSLException {
207                 int nioBufferCount = in.nioBufferCount();
208                 int writerIndex = out.writerIndex();
209                 final SSLEngineResult result;
210                 if (nioBufferCount > 1) {
211                     /*
212                      * If {@link OpenSslEngine} is in use,
213                      * we can use a special {@link OpenSslEngine#unwrap(ByteBuffer[], ByteBuffer[])} method
214                      * that accepts multiple {@link ByteBuffer}s without additional memory copies.
215                      */
216                     ReferenceCountedOpenSslEngine opensslEngine = (ReferenceCountedOpenSslEngine) handler.engine;
217                     try {
218                         handler.singleBuffer[0] = toByteBuffer(out, writerIndex, out.writableBytes());
219                         result = opensslEngine.unwrap(in.nioBuffers(in.readerIndex(), len), handler.singleBuffer);
220                     } finally {
221                         handler.singleBuffer[0] = null;
222                     }
223                 } else {
224                     result = handler.engine.unwrap(toByteBuffer(in, in.readerIndex(), len),
225                         toByteBuffer(out, writerIndex, out.writableBytes()));
226                 }
227                 out.writerIndex(writerIndex + result.bytesProduced());
228                 return result;
229             }
230 
231             @Override
232             ByteBuf allocateWrapBuffer(SslHandler handler, ByteBufAllocator allocator,
233                                        int pendingBytes, int numComponents) {
234                 return allocator.directBuffer(((ReferenceCountedOpenSslEngine) handler.engine)
235                         .calculateOutNetBufSize(pendingBytes, numComponents));
236             }
237 
238             @Override
239             int calculateRequiredOutBufSpace(SslHandler handler, int pendingBytes, int numComponents) {
240                 return ((ReferenceCountedOpenSslEngine) handler.engine)
241                         .calculateMaxLengthForWrap(pendingBytes, numComponents);
242             }
243 
244             @Override
245             int calculatePendingData(SslHandler handler, int guess) {
246                 int sslPending = ((ReferenceCountedOpenSslEngine) handler.engine).sslPending();
247                 return sslPending > 0 ? sslPending : guess;
248             }
249 
250             @Override
251             boolean jdkCompatibilityMode(SSLEngine engine) {
252                 return ((ReferenceCountedOpenSslEngine) engine).jdkCompatibilityMode;
253             }
254         },
255         CONSCRYPT(true, COMPOSITE_CUMULATOR) {
256             @Override
257             SSLEngineResult unwrap(SslHandler handler, ByteBuf in, int len, ByteBuf out) throws SSLException {
258                 int nioBufferCount = in.nioBufferCount();
259                 int writerIndex = out.writerIndex();
260                 final SSLEngineResult result;
261                 if (nioBufferCount > 1) {
262                     /*
263                      * Use a special unwrap method without additional memory copies.
264                      */
265                     try {
266                         handler.singleBuffer[0] = toByteBuffer(out, writerIndex, out.writableBytes());
267                         result = ((ConscryptAlpnSslEngine) handler.engine).unwrap(
268                                 in.nioBuffers(in.readerIndex(), len),
269                                 handler.singleBuffer);
270                     } finally {
271                         handler.singleBuffer[0] = null;
272                     }
273                 } else {
274                     result = handler.engine.unwrap(toByteBuffer(in, in.readerIndex(), len),
275                             toByteBuffer(out, writerIndex, out.writableBytes()));
276                 }
277                 out.writerIndex(writerIndex + result.bytesProduced());
278                 return result;
279             }
280 
281             @Override
282             ByteBuf allocateWrapBuffer(SslHandler handler, ByteBufAllocator allocator,
283                                        int pendingBytes, int numComponents) {
284                 return allocator.directBuffer(
285                         ((ConscryptAlpnSslEngine) handler.engine).calculateOutNetBufSize(pendingBytes, numComponents));
286             }
287 
288             @Override
289             int calculateRequiredOutBufSpace(SslHandler handler, int pendingBytes, int numComponents) {
290                 return ((ConscryptAlpnSslEngine) handler.engine)
291                         .calculateRequiredOutBufSpace(pendingBytes, numComponents);
292             }
293 
294             @Override
295             int calculatePendingData(SslHandler handler, int guess) {
296                 return guess;
297             }
298 
299             @Override
300             boolean jdkCompatibilityMode(SSLEngine engine) {
301                 return true;
302             }
303         },
304         JDK(false, MERGE_CUMULATOR) {
305             @Override
306             SSLEngineResult unwrap(SslHandler handler, ByteBuf in, int len, ByteBuf out) throws SSLException {
307                 int writerIndex = out.writerIndex();
308                 ByteBuffer inNioBuffer = toByteBuffer(in, in.readerIndex(), len);
309                 int position = inNioBuffer.position();
310                 final SSLEngineResult result = handler.engine.unwrap(inNioBuffer,
311                     toByteBuffer(out, writerIndex, out.writableBytes()));
312                 out.writerIndex(writerIndex + result.bytesProduced());
313 
314                 // This is a workaround for a bug in Android 5.0. Android 5.0 does not correctly update the
315                 // SSLEngineResult.bytesConsumed() in some cases and just return 0.
316                 //
317                 // See:
318                 //     - https://android-review.googlesource.com/c/platform/external/conscrypt/+/122080
319                 //     - https://github.com/netty/netty/issues/7758
320                 if (result.bytesConsumed() == 0) {
321                     int consumed = inNioBuffer.position() - position;
322                     if (consumed != result.bytesConsumed()) {
323                         // Create a new SSLEngineResult with the correct bytesConsumed().
324                         return new SSLEngineResult(
325                                 result.getStatus(), result.getHandshakeStatus(), consumed, result.bytesProduced());
326                     }
327                 }
328                 return result;
329             }
330 
331             @Override
332             ByteBuf allocateWrapBuffer(SslHandler handler, ByteBufAllocator allocator,
333                                        int pendingBytes, int numComponents) {
334                 // For JDK we don't have a good source for the max wrap overhead. We need at least one packet buffer
335                 // size, but may be able to fit more in based on the total requested.
336                 return allocator.heapBuffer(Math.max(pendingBytes, handler.engine.getSession().getPacketBufferSize()));
337             }
338 
339             @Override
340             int calculateRequiredOutBufSpace(SslHandler handler, int pendingBytes, int numComponents) {
341                 // As for the JDK SSLEngine we always need to operate on buffer space required by the SSLEngine
342                 // (normally ~16KB). This is required even if the amount of data to encrypt is very small. Use heap
343                 // buffers to reduce the native memory usage.
344                 //
345                 // Beside this the JDK SSLEngine also (as of today) will do an extra heap to direct buffer copy
346                 // if a direct buffer is used as its internals operate on byte[].
347                 return handler.engine.getSession().getPacketBufferSize();
348             }
349 
350             @Override
351             int calculatePendingData(SslHandler handler, int guess) {
352                 return guess;
353             }
354 
355             @Override
356             boolean jdkCompatibilityMode(SSLEngine engine) {
357                 return true;
358             }
359         };
360 
361         static SslEngineType forEngine(SSLEngine engine) {
362             return engine instanceof ReferenceCountedOpenSslEngine ? TCNATIVE :
363                    engine instanceof ConscryptAlpnSslEngine ? CONSCRYPT : JDK;
364         }
365 
366         SslEngineType(boolean wantsDirectBuffer, Cumulator cumulator) {
367             this.wantsDirectBuffer = wantsDirectBuffer;
368             this.cumulator = cumulator;
369         }
370 
371         abstract SSLEngineResult unwrap(SslHandler handler, ByteBuf in, int len, ByteBuf out) throws SSLException;
372 
373         abstract int calculatePendingData(SslHandler handler, int guess);
374 
375         abstract boolean jdkCompatibilityMode(SSLEngine engine);
376 
377         abstract ByteBuf allocateWrapBuffer(SslHandler handler, ByteBufAllocator allocator,
378                                             int pendingBytes, int numComponents);
379 
380         abstract int calculateRequiredOutBufSpace(SslHandler handler, int pendingBytes, int numComponents);
381 
382         // BEGIN Platform-dependent flags
383 
384         /**
385          * {@code true} if and only if {@link SSLEngine} expects a direct buffer and so if a heap buffer
386          * is given will make an extra memory copy.
387          */
388         final boolean wantsDirectBuffer;
389 
390         // END Platform-dependent flags
391 
392         /**
393          * When using JDK {@link SSLEngine}, we use {@link #MERGE_CUMULATOR} because it works only with
394          * one {@link ByteBuffer}.
395          *
396          * When using {@link OpenSslEngine}, we can use {@link #COMPOSITE_CUMULATOR} because it has
397          * {@link OpenSslEngine#unwrap(ByteBuffer[], ByteBuffer[])} which works with multiple {@link ByteBuffer}s
398          * and which does not need to do extra memory copies.
399          */
400         final Cumulator cumulator;
401     }
402 
403     private volatile ChannelHandlerContext ctx;
404     private final SSLEngine engine;
405     private final SslEngineType engineType;
406     private final Executor delegatedTaskExecutor;
407     private final boolean jdkCompatibilityMode;
408 
409     /**
410      * Used if {@link SSLEngine#wrap(ByteBuffer[], ByteBuffer)} and {@link SSLEngine#unwrap(ByteBuffer, ByteBuffer[])}
411      * should be called with a {@link ByteBuf} that is only backed by one {@link ByteBuffer} to reduce the object
412      * creation.
413      */
414     private final ByteBuffer[] singleBuffer = new ByteBuffer[1];
415 
416     private final boolean startTls;
417 
418     private final SslTasksRunner sslTaskRunnerForUnwrap = new SslTasksRunner(true);
419     private final SslTasksRunner sslTaskRunner = new SslTasksRunner(false);
420 
421     private SslHandlerCoalescingBufferQueue pendingUnencryptedWrites;
422     private Promise<Channel> handshakePromise = new LazyChannelPromise();
423     private final LazyChannelPromise sslClosePromise = new LazyChannelPromise();
424 
425     private int packetLength;
426     private short state;
427 
428     private volatile long handshakeTimeoutMillis = 10000;
429     private volatile long closeNotifyFlushTimeoutMillis = 3000;
430     private volatile long closeNotifyReadTimeoutMillis;
431     volatile int wrapDataSize = MAX_PLAINTEXT_LENGTH;
432 
433     /**
434      * Creates a new instance which runs all delegated tasks directly on the {@link EventExecutor}.
435      *
436      * @param engine  the {@link SSLEngine} this handler will use
437      */
438     public SslHandler(SSLEngine engine) {
439         this(engine, false);
440     }
441 
442     /**
443      * Creates a new instance which runs all delegated tasks directly on the {@link EventExecutor}.
444      *
445      * @param engine    the {@link SSLEngine} this handler will use
446      * @param startTls  {@code true} if the first write request shouldn't be
447      *                  encrypted by the {@link SSLEngine}
448      */
449     public SslHandler(SSLEngine engine, boolean startTls) {
450         this(engine, startTls, ImmediateExecutor.INSTANCE);
451     }
452 
453     /**
454      * Creates a new instance.
455      *
456      * @param engine  the {@link SSLEngine} this handler will use
457      * @param delegatedTaskExecutor the {@link Executor} that will be used to execute tasks that are returned by
458      *                              {@link SSLEngine#getDelegatedTask()}.
459      */
460     public SslHandler(SSLEngine engine, Executor delegatedTaskExecutor) {
461         this(engine, false, delegatedTaskExecutor);
462     }
463 
464     /**
465      * Creates a new instance.
466      *
467      * @param engine  the {@link SSLEngine} this handler will use
468      * @param startTls  {@code true} if the first write request shouldn't be
469      *                  encrypted by the {@link SSLEngine}
470      * @param delegatedTaskExecutor the {@link Executor} that will be used to execute tasks that are returned by
471      *                              {@link SSLEngine#getDelegatedTask()}.
472      */
473     public SslHandler(SSLEngine engine, boolean startTls, Executor delegatedTaskExecutor) {
474         this.engine = ObjectUtil.checkNotNull(engine, "engine");
475         this.delegatedTaskExecutor = ObjectUtil.checkNotNull(delegatedTaskExecutor, "delegatedTaskExecutor");
476         engineType = SslEngineType.forEngine(engine);
477         this.startTls = startTls;
478         this.jdkCompatibilityMode = engineType.jdkCompatibilityMode(engine);
479         setCumulator(engineType.cumulator);
480     }
481 
482     public long getHandshakeTimeoutMillis() {
483         return handshakeTimeoutMillis;
484     }
485 
486     public void setHandshakeTimeout(long handshakeTimeout, TimeUnit unit) {
487         checkNotNull(unit, "unit");
488         setHandshakeTimeoutMillis(unit.toMillis(handshakeTimeout));
489     }
490 
491     public void setHandshakeTimeoutMillis(long handshakeTimeoutMillis) {
492         this.handshakeTimeoutMillis = checkPositiveOrZero(handshakeTimeoutMillis, "handshakeTimeoutMillis");
493     }
494 
495     /**
496      * Sets the number of bytes to pass to each {@link SSLEngine#wrap(ByteBuffer[], int, int, ByteBuffer)} call.
497      * <p>
498      * This value will partition data which is passed to write
499      * {@link #write(ChannelHandlerContext, Object, ChannelPromise)}. The partitioning will work as follows:
500      * <ul>
501      * <li>If {@code wrapDataSize <= 0} then we will write each data chunk as is.</li>
502      * <li>If {@code wrapDataSize > data size} then we will attempt to aggregate multiple data chunks together.</li>
503      * <li>If {@code wrapDataSize > data size}  Else if {@code wrapDataSize <= data size} then we will divide the data
504      * into chunks of {@code wrapDataSize} when writing.</li>
505      * </ul>
506      * <p>
507      * If the {@link SSLEngine} doesn't support a gather wrap operation (e.g. {@link SslProvider#OPENSSL}) then
508      * aggregating data before wrapping can help reduce the ratio between TLS overhead vs data payload which will lead
509      * to better goodput. Writing fixed chunks of data can also help target the underlying transport's (e.g. TCP)
510      * frame size. Under lossy/congested network conditions this may help the peer get full TLS packets earlier and
511      * be able to do work sooner, as opposed to waiting for the all the pieces of the TLS packet to arrive.
512      * @param wrapDataSize the number of bytes which will be passed to each
513      *      {@link SSLEngine#wrap(ByteBuffer[], int, int, ByteBuffer)} call.
514      */
515     @UnstableApi
516     public final void setWrapDataSize(int wrapDataSize) {
517         this.wrapDataSize = wrapDataSize;
518     }
519 
520     /**
521      * @deprecated use {@link #getCloseNotifyFlushTimeoutMillis()}
522      */
523     @Deprecated
524     public long getCloseNotifyTimeoutMillis() {
525         return getCloseNotifyFlushTimeoutMillis();
526     }
527 
528     /**
529      * @deprecated use {@link #setCloseNotifyFlushTimeout(long, TimeUnit)}
530      */
531     @Deprecated
532     public void setCloseNotifyTimeout(long closeNotifyTimeout, TimeUnit unit) {
533         setCloseNotifyFlushTimeout(closeNotifyTimeout, unit);
534     }
535 
536     /**
537      * @deprecated use {@link #setCloseNotifyFlushTimeoutMillis(long)}
538      */
539     @Deprecated
540     public void setCloseNotifyTimeoutMillis(long closeNotifyFlushTimeoutMillis) {
541         setCloseNotifyFlushTimeoutMillis(closeNotifyFlushTimeoutMillis);
542     }
543 
544     /**
545      * Gets the timeout for flushing the close_notify that was triggered by closing the
546      * {@link Channel}. If the close_notify was not flushed in the given timeout the {@link Channel} will be closed
547      * forcibly.
548      */
549     public final long getCloseNotifyFlushTimeoutMillis() {
550         return closeNotifyFlushTimeoutMillis;
551     }
552 
553     /**
554      * Sets the timeout for flushing the close_notify that was triggered by closing the
555      * {@link Channel}. If the close_notify was not flushed in the given timeout the {@link Channel} will be closed
556      * forcibly.
557      */
558     public final void setCloseNotifyFlushTimeout(long closeNotifyFlushTimeout, TimeUnit unit) {
559         setCloseNotifyFlushTimeoutMillis(unit.toMillis(closeNotifyFlushTimeout));
560     }
561 
562     /**
563      * See {@link #setCloseNotifyFlushTimeout(long, TimeUnit)}.
564      */
565     public final void setCloseNotifyFlushTimeoutMillis(long closeNotifyFlushTimeoutMillis) {
566         this.closeNotifyFlushTimeoutMillis = checkPositiveOrZero(closeNotifyFlushTimeoutMillis,
567                 "closeNotifyFlushTimeoutMillis");
568     }
569 
570     /**
571      * Gets the timeout (in ms) for receiving the response for the close_notify that was triggered by closing the
572      * {@link Channel}. This timeout starts after the close_notify message was successfully written to the
573      * remote peer. Use {@code 0} to directly close the {@link Channel} and not wait for the response.
574      */
575     public final long getCloseNotifyReadTimeoutMillis() {
576         return closeNotifyReadTimeoutMillis;
577     }
578 
579     /**
580      * Sets the timeout  for receiving the response for the close_notify that was triggered by closing the
581      * {@link Channel}. This timeout starts after the close_notify message was successfully written to the
582      * remote peer. Use {@code 0} to directly close the {@link Channel} and not wait for the response.
583      */
584     public final void setCloseNotifyReadTimeout(long closeNotifyReadTimeout, TimeUnit unit) {
585         setCloseNotifyReadTimeoutMillis(unit.toMillis(closeNotifyReadTimeout));
586     }
587 
588     /**
589      * See {@link #setCloseNotifyReadTimeout(long, TimeUnit)}.
590      */
591     public final void setCloseNotifyReadTimeoutMillis(long closeNotifyReadTimeoutMillis) {
592         this.closeNotifyReadTimeoutMillis = checkPositiveOrZero(closeNotifyReadTimeoutMillis,
593                 "closeNotifyReadTimeoutMillis");
594     }
595 
596     /**
597      * Returns the {@link SSLEngine} which is used by this handler.
598      */
599     public SSLEngine engine() {
600         return engine;
601     }
602 
603     /**
604      * Returns the name of the current application-level protocol.
605      *
606      * @return the protocol name or {@code null} if application-level protocol has not been negotiated
607      */
608     public String applicationProtocol() {
609         SSLEngine engine = engine();
610         if (!(engine instanceof ApplicationProtocolAccessor)) {
611             return null;
612         }
613 
614         return ((ApplicationProtocolAccessor) engine).getNegotiatedApplicationProtocol();
615     }
616 
617     /**
618      * Returns a {@link Future} that will get notified once the current TLS handshake completes.
619      *
620      * @return the {@link Future} for the initial TLS handshake if {@link #renegotiate()} was not invoked.
621      *         The {@link Future} for the most recent {@linkplain #renegotiate() TLS renegotiation} otherwise.
622      */
623     public Future<Channel> handshakeFuture() {
624         return handshakePromise;
625     }
626 
627     /**
628      * Use {@link #closeOutbound()}
629      */
630     @Deprecated
631     public ChannelFuture close() {
632         return closeOutbound();
633     }
634 
635     /**
636      * Use {@link #closeOutbound(ChannelPromise)}
637      */
638     @Deprecated
639     public ChannelFuture close(ChannelPromise promise) {
640         return closeOutbound(promise);
641     }
642 
643     /**
644      * Sends an SSL {@code close_notify} message to the specified channel and
645      * destroys the underlying {@link SSLEngine}. This will <strong>not</strong> close the underlying
646      * {@link Channel}. If you want to also close the {@link Channel} use {@link Channel#close()} or
647      * {@link ChannelHandlerContext#close()}
648      */
649     public ChannelFuture closeOutbound() {
650         return closeOutbound(ctx.newPromise());
651     }
652 
653     /**
654      * Sends an SSL {@code close_notify} message to the specified channel and
655      * destroys the underlying {@link SSLEngine}. This will <strong>not</strong> close the underlying
656      * {@link Channel}. If you want to also close the {@link Channel} use {@link Channel#close()} or
657      * {@link ChannelHandlerContext#close()}
658      */
659     public ChannelFuture closeOutbound(final ChannelPromise promise) {
660         final ChannelHandlerContext ctx = this.ctx;
661         if (ctx.executor().inEventLoop()) {
662             closeOutbound0(promise);
663         } else {
664             ctx.executor().execute(new Runnable() {
665                 @Override
666                 public void run() {
667                     closeOutbound0(promise);
668                 }
669             });
670         }
671         return promise;
672     }
673 
674     private void closeOutbound0(ChannelPromise promise) {
675         setState(STATE_OUTBOUND_CLOSED);
676         engine.closeOutbound();
677         try {
678             flush(ctx, promise);
679         } catch (Exception e) {
680             if (!promise.tryFailure(e)) {
681                 logger.warn("{} flush() raised a masked exception.", ctx.channel(), e);
682             }
683         }
684     }
685 
686     /**
687      * Return the {@link Future} that will get notified if the inbound of the {@link SSLEngine} is closed.
688      *
689      * This method will return the same {@link Future} all the time.
690      *
691      * @see SSLEngine
692      */
693     public Future<Channel> sslCloseFuture() {
694         return sslClosePromise;
695     }
696 
697     @Override
698     public void handlerRemoved0(ChannelHandlerContext ctx) throws Exception {
699         try {
700             if (pendingUnencryptedWrites != null && !pendingUnencryptedWrites.isEmpty()) {
701                 // Check if queue is not empty first because create a new ChannelException is expensive
702                 pendingUnencryptedWrites.releaseAndFailAll(ctx,
703                   new ChannelException("Pending write on removal of SslHandler"));
704             }
705             pendingUnencryptedWrites = null;
706 
707             SSLException cause = null;
708 
709             // If the handshake or SSLEngine closure is not done yet we should fail corresponding promise and
710             // notify the rest of the
711             // pipeline.
712             if (!handshakePromise.isDone()) {
713                 cause = new SSLHandshakeException("SslHandler removed before handshake completed");
714                 if (handshakePromise.tryFailure(cause)) {
715                     ctx.fireUserEventTriggered(new SslHandshakeCompletionEvent(cause));
716                 }
717             }
718             if (!sslClosePromise.isDone()) {
719                 if (cause == null) {
720                     cause = new SSLException("SslHandler removed before SSLEngine was closed");
721                 }
722                 notifyClosePromise(cause);
723             }
724         } finally {
725             ReferenceCountUtil.release(engine);
726         }
727     }
728 
729     @Override
730     public void bind(ChannelHandlerContext ctx, SocketAddress localAddress, ChannelPromise promise) throws Exception {
731         ctx.bind(localAddress, promise);
732     }
733 
734     @Override
735     public void connect(ChannelHandlerContext ctx, SocketAddress remoteAddress, SocketAddress localAddress,
736                         ChannelPromise promise) throws Exception {
737         ctx.connect(remoteAddress, localAddress, promise);
738     }
739 
740     @Override
741     public void deregister(ChannelHandlerContext ctx, ChannelPromise promise) throws Exception {
742         ctx.deregister(promise);
743     }
744 
745     @Override
746     public void disconnect(final ChannelHandlerContext ctx,
747                            final ChannelPromise promise) throws Exception {
748         closeOutboundAndChannel(ctx, promise, true);
749     }
750 
751     @Override
752     public void close(final ChannelHandlerContext ctx,
753                       final ChannelPromise promise) throws Exception {
754         closeOutboundAndChannel(ctx, promise, false);
755     }
756 
757     @Override
758     public void read(ChannelHandlerContext ctx) throws Exception {
759         if (!handshakePromise.isDone()) {
760             setState(STATE_READ_DURING_HANDSHAKE);
761         }
762 
763         ctx.read();
764     }
765 
766     private static IllegalStateException newPendingWritesNullException() {
767         return new IllegalStateException("pendingUnencryptedWrites is null, handlerRemoved0 called?");
768     }
769 
770     @Override
771     public void write(final ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
772         if (!(msg instanceof ByteBuf)) {
773             UnsupportedMessageTypeException exception = new UnsupportedMessageTypeException(msg, ByteBuf.class);
774             ReferenceCountUtil.safeRelease(msg);
775             promise.setFailure(exception);
776         } else if (pendingUnencryptedWrites == null) {
777             ReferenceCountUtil.safeRelease(msg);
778             promise.setFailure(newPendingWritesNullException());
779         } else {
780             pendingUnencryptedWrites.add((ByteBuf) msg, promise);
781         }
782     }
783 
784     @Override
785     public void flush(ChannelHandlerContext ctx) throws Exception {
786         // Do not encrypt the first write request if this handler is
787         // created with startTLS flag turned on.
788         if (startTls && !isStateSet(STATE_SENT_FIRST_MESSAGE)) {
789             setState(STATE_SENT_FIRST_MESSAGE);
790             pendingUnencryptedWrites.writeAndRemoveAll(ctx);
791             forceFlush(ctx);
792             // Explicit start handshake processing once we send the first message. This will also ensure
793             // we will schedule the timeout if needed.
794             startHandshakeProcessing(true);
795             return;
796         }
797 
798         if (isStateSet(STATE_PROCESS_TASK)) {
799             return;
800         }
801 
802         try {
803             wrapAndFlush(ctx);
804         } catch (Throwable cause) {
805             setHandshakeFailure(ctx, cause);
806             PlatformDependent.throwException(cause);
807         }
808     }
809 
810     private void wrapAndFlush(ChannelHandlerContext ctx) throws SSLException {
811         if (pendingUnencryptedWrites.isEmpty()) {
812             // It's important to NOT use a voidPromise here as the user
813             // may want to add a ChannelFutureListener to the ChannelPromise later.
814             //
815             // See https://github.com/netty/netty/issues/3364
816             pendingUnencryptedWrites.add(Unpooled.EMPTY_BUFFER, ctx.newPromise());
817         }
818         if (!handshakePromise.isDone()) {
819             setState(STATE_FLUSHED_BEFORE_HANDSHAKE);
820         }
821         try {
822             wrap(ctx, false);
823         } finally {
824             // We may have written some parts of data before an exception was thrown so ensure we always flush.
825             // See https://github.com/netty/netty/issues/3900#issuecomment-172481830
826             forceFlush(ctx);
827         }
828     }
829 
830     // This method will not call setHandshakeFailure(...) !
831     private void wrap(ChannelHandlerContext ctx, boolean inUnwrap) throws SSLException {
832         ByteBuf out = null;
833         ByteBufAllocator alloc = ctx.alloc();
834         try {
835             final int wrapDataSize = this.wrapDataSize;
836             // Only continue to loop if the handler was not removed in the meantime.
837             // See https://github.com/netty/netty/issues/5860
838             outer: while (!ctx.isRemoved()) {
839                 ChannelPromise promise = ctx.newPromise();
840                 ByteBuf buf = wrapDataSize > 0 ?
841                         pendingUnencryptedWrites.remove(alloc, wrapDataSize, promise) :
842                         pendingUnencryptedWrites.removeFirst(promise);
843                 if (buf == null) {
844                     break;
845                 }
846 
847                 SSLEngineResult result;
848 
849                 if (buf.readableBytes() > MAX_PLAINTEXT_LENGTH) {
850                     // If we pulled a buffer larger than the supported packet size, we can slice it up and iteratively,
851                     // encrypting multiple packets into a single larger buffer. This substantially saves on allocations
852                     // for large responses. Here we estimate how large of a buffer we need. If we overestimate a bit,
853                     // that's fine. If we underestimate, we'll simply re-enqueue the remaining buffer and get it on the
854                     // next outer loop.
855                     int readableBytes = buf.readableBytes();
856                     int numPackets = readableBytes / MAX_PLAINTEXT_LENGTH;
857                     if (readableBytes % MAX_PLAINTEXT_LENGTH != 0) {
858                         numPackets += 1;
859                     }
860 
861                     if (out == null) {
862                         out = allocateOutNetBuf(ctx, readableBytes, buf.nioBufferCount() + numPackets);
863                     }
864                     result = wrapMultiple(alloc, engine, buf, out);
865                 } else {
866                     if (out == null) {
867                         out = allocateOutNetBuf(ctx, buf.readableBytes(), buf.nioBufferCount());
868                     }
869                     result = wrap(alloc, engine, buf, out);
870                 }
871 
872                 if (buf.isReadable()) {
873                     pendingUnencryptedWrites.addFirst(buf, promise);
874                     // When we add the buffer/promise pair back we need to be sure we don't complete the promise
875                     // later. We only complete the promise if the buffer is completely consumed.
876                     promise = null;
877                 } else {
878                     buf.release();
879                 }
880 
881                 // We need to write any data before we invoke any methods which may trigger re-entry, otherwise
882                 // writes may occur out of order and TLS sequencing may be off (e.g. SSLV3_ALERT_BAD_RECORD_MAC).
883                 if (out.isReadable()) {
884                     final ByteBuf b = out;
885                     out = null;
886                     if (promise != null) {
887                         ctx.write(b, promise);
888                     } else {
889                         ctx.write(b);
890                     }
891                 } else if (promise != null) {
892                     ctx.write(Unpooled.EMPTY_BUFFER, promise);
893                 }
894                 // else out is not readable we can re-use it and so save an extra allocation
895 
896                 if (result.getStatus() == Status.CLOSED) {
897                     // First check if there is any write left that needs to be failed, if there is none we don't need
898                     // to create a new exception or obtain an existing one.
899                     if (!pendingUnencryptedWrites.isEmpty()) {
900                         // Make a best effort to preserve any exception that way previously encountered from the
901                         // handshake or the transport, else fallback to a general error.
902                         Throwable exception = handshakePromise.cause();
903                         if (exception == null) {
904                             exception = sslClosePromise.cause();
905                             if (exception == null) {
906                                 exception = new SslClosedEngineException("SSLEngine closed already");
907                             }
908                         }
909                         pendingUnencryptedWrites.releaseAndFailAll(ctx, exception);
910                     }
911 
912                     return;
913                 } else {
914                     switch (result.getHandshakeStatus()) {
915                         case NEED_TASK:
916                             if (!runDelegatedTasks(inUnwrap)) {
917                                 // We scheduled a task on the delegatingTaskExecutor, so stop processing as we will
918                                 // resume once the task completes.
919                                 break outer;
920                             }
921                             break;
922                         case FINISHED:
923                         case NOT_HANDSHAKING: // work around for android bug that skips the FINISHED state.
924                             setHandshakeSuccess();
925                             break;
926                         case NEED_WRAP:
927                             // If we are expected to wrap again and we produced some data we need to ensure there
928                             // is something in the queue to process as otherwise we will not try again before there
929                             // was more added. Failing to do so may fail to produce an alert that can be
930                             // consumed by the remote peer.
931                             if (result.bytesProduced() > 0 && pendingUnencryptedWrites.isEmpty()) {
932                                 pendingUnencryptedWrites.add(Unpooled.EMPTY_BUFFER);
933                             }
934                             break;
935                         case NEED_UNWRAP:
936                             // The underlying engine is starving so we need to feed it with more data.
937                             // See https://github.com/netty/netty/pull/5039
938                             readIfNeeded(ctx);
939                             return;
940                         default:
941                             throw new IllegalStateException(
942                                     "Unknown handshake status: " + result.getHandshakeStatus());
943                     }
944                 }
945             }
946         } finally {
947             if (out != null) {
948                 out.release();
949             }
950             if (inUnwrap) {
951                 setState(STATE_NEEDS_FLUSH);
952             }
953         }
954     }
955 
956     /**
957      * This method will not call
958      * {@link #setHandshakeFailure(ChannelHandlerContext, Throwable, boolean, boolean, boolean)} or
959      * {@link #setHandshakeFailure(ChannelHandlerContext, Throwable)}.
960      * @return {@code true} if this method ends on {@link SSLEngineResult.HandshakeStatus#NOT_HANDSHAKING}.
961      */
962     private boolean wrapNonAppData(final ChannelHandlerContext ctx, boolean inUnwrap) throws SSLException {
963         ByteBuf out = null;
964         ByteBufAllocator alloc = ctx.alloc();
965         try {
966             // Only continue to loop if the handler was not removed in the meantime.
967             // See https://github.com/netty/netty/issues/5860
968             outer: while (!ctx.isRemoved()) {
969                 if (out == null) {
970                     // As this is called for the handshake we have no real idea how big the buffer needs to be.
971                     // That said 2048 should give us enough room to include everything like ALPN / NPN data.
972                     // If this is not enough we will increase the buffer in wrap(...).
973                     out = allocateOutNetBuf(ctx, 2048, 1);
974                 }
975                 SSLEngineResult result = wrap(alloc, engine, Unpooled.EMPTY_BUFFER, out);
976                 if (result.bytesProduced() > 0) {
977                     ctx.write(out).addListener(new ChannelFutureListener() {
978                         @Override
979                         public void operationComplete(ChannelFuture future) {
980                             Throwable cause = future.cause();
981                             if (cause != null) {
982                                 setHandshakeFailureTransportFailure(ctx, cause);
983                             }
984                         }
985                     });
986                     if (inUnwrap) {
987                         setState(STATE_NEEDS_FLUSH);
988                     }
989                     out = null;
990                 }
991 
992                 HandshakeStatus status = result.getHandshakeStatus();
993                 switch (status) {
994                     case FINISHED:
995                         // We may be here because we read data and discovered the remote peer initiated a renegotiation
996                         // and this write is to complete the new handshake. The user may have previously done a
997                         // writeAndFlush which wasn't able to wrap data due to needing the pending handshake, so we
998                         // attempt to wrap application data here if any is pending.
999                         if (setHandshakeSuccess() && inUnwrap && !pendingUnencryptedWrites.isEmpty()) {
1000                             wrap(ctx, true);
1001                         }
1002                         return false;
1003                     case NEED_TASK:
1004                         if (!runDelegatedTasks(inUnwrap)) {
1005                             // We scheduled a task on the delegatingTaskExecutor, so stop processing as we will
1006                             // resume once the task completes.
1007                             break outer;
1008                         }
1009                         break;
1010                     case NEED_UNWRAP:
1011                         if (inUnwrap || unwrapNonAppData(ctx) <= 0) {
1012                             // If we asked for a wrap, the engine requested an unwrap, and we are in unwrap there is
1013                             // no use in trying to call wrap again because we have already attempted (or will after we
1014                             // return) to feed more data to the engine.
1015                             return false;
1016                         }
1017                         break;
1018                     case NEED_WRAP:
1019                         break;
1020                     case NOT_HANDSHAKING:
1021                         if (setHandshakeSuccess() && inUnwrap && !pendingUnencryptedWrites.isEmpty()) {
1022                             wrap(ctx, true);
1023                         }
1024                         // Workaround for TLS False Start problem reported at:
1025                         // https://github.com/netty/netty/issues/1108#issuecomment-14266970
1026                         if (!inUnwrap) {
1027                             unwrapNonAppData(ctx);
1028                         }
1029                         return true;
1030                     default:
1031                         throw new IllegalStateException("Unknown handshake status: " + result.getHandshakeStatus());
1032                 }
1033 
1034                 // Check if did not produce any bytes and if so break out of the loop, but only if we did not process
1035                 // a task as last action. It's fine to not produce any data as part of executing a task.
1036                 if (result.bytesProduced() == 0 && status != HandshakeStatus.NEED_TASK) {
1037                     break;
1038                 }
1039 
1040                 // It should not consume empty buffers when it is not handshaking
1041                 // Fix for Android, where it was encrypting empty buffers even when not handshaking
1042                 if (result.bytesConsumed() == 0 && result.getHandshakeStatus() == HandshakeStatus.NOT_HANDSHAKING) {
1043                     break;
1044                 }
1045             }
1046         }  finally {
1047             if (out != null) {
1048                 out.release();
1049             }
1050         }
1051         return false;
1052     }
1053 
1054     private SSLEngineResult wrapMultiple(ByteBufAllocator alloc, SSLEngine engine, ByteBuf in, ByteBuf out)
1055         throws SSLException {
1056         SSLEngineResult result = null;
1057 
1058         do {
1059             int nextSliceSize = Math.min(MAX_PLAINTEXT_LENGTH, in.readableBytes());
1060             // This call over-estimates, because we are slicing and not every nioBuffer will be part of
1061             // every slice. We could improve the estimate by having an nioBufferCount(offset, length).
1062             int nextOutSize = engineType.calculateRequiredOutBufSpace(this, nextSliceSize, in.nioBufferCount());
1063 
1064             if (!out.isWritable(nextOutSize)) {
1065                 if (result != null) {
1066                     // We underestimated the space needed to encrypt the entire in buf. Break out, and
1067                     // upstream will re-enqueue the buffer for later.
1068                     break;
1069                 }
1070                 // This shouldn't happen, as the out buf was properly sized for at least packetLength
1071                 // prior to calling wrap.
1072                 out.ensureWritable(nextOutSize);
1073             }
1074 
1075             ByteBuf wrapBuf = in.readSlice(nextSliceSize);
1076             result = wrap(alloc, engine, wrapBuf, out);
1077 
1078             if (result.getStatus() == Status.CLOSED) {
1079                 // If the engine gets closed, we can exit out early. Otherwise, we'll do a full handling of
1080                 // possible results once finished.
1081                 break;
1082             }
1083 
1084             if (wrapBuf.isReadable()) {
1085                 // There may be some left-over, in which case we can just pick it up next loop, so reset the original
1086                 // reader index so its included again in the next slice.
1087                 in.readerIndex(in.readerIndex() - wrapBuf.readableBytes());
1088             }
1089         } while (in.readableBytes() > 0);
1090 
1091         return result;
1092     }
1093 
1094     private SSLEngineResult wrap(ByteBufAllocator alloc, SSLEngine engine, ByteBuf in, ByteBuf out)
1095             throws SSLException {
1096         ByteBuf newDirectIn = null;
1097         try {
1098             int readerIndex = in.readerIndex();
1099             int readableBytes = in.readableBytes();
1100 
1101             // We will call SslEngine.wrap(ByteBuffer[], ByteBuffer) to allow efficient handling of
1102             // CompositeByteBuf without force an extra memory copy when CompositeByteBuffer.nioBuffer() is called.
1103             final ByteBuffer[] in0;
1104             if (in.isDirect() || !engineType.wantsDirectBuffer) {
1105                 // As CompositeByteBuf.nioBufferCount() can be expensive (as it needs to check all composed ByteBuf
1106                 // to calculate the count) we will just assume a CompositeByteBuf contains more then 1 ByteBuf.
1107                 // The worst that can happen is that we allocate an extra ByteBuffer[] in CompositeByteBuf.nioBuffers()
1108                 // which is better then walking the composed ByteBuf in most cases.
1109                 if (!(in instanceof CompositeByteBuf) && in.nioBufferCount() == 1) {
1110                     in0 = singleBuffer;
1111                     // We know its only backed by 1 ByteBuffer so use internalNioBuffer to keep object allocation
1112                     // to a minimum.
1113                     in0[0] = in.internalNioBuffer(readerIndex, readableBytes);
1114                 } else {
1115                     in0 = in.nioBuffers();
1116                 }
1117             } else {
1118                 // We could even go further here and check if its a CompositeByteBuf and if so try to decompose it and
1119                 // only replace the ByteBuffer that are not direct. At the moment we just will replace the whole
1120                 // CompositeByteBuf to keep the complexity to a minimum
1121                 newDirectIn = alloc.directBuffer(readableBytes);
1122                 newDirectIn.writeBytes(in, readerIndex, readableBytes);
1123                 in0 = singleBuffer;
1124                 in0[0] = newDirectIn.internalNioBuffer(newDirectIn.readerIndex(), readableBytes);
1125             }
1126 
1127             for (;;) {
1128                 // Use toByteBuffer(...) which might be able to return the internal ByteBuffer and so reduce
1129                 // allocations.
1130                 ByteBuffer out0 = toByteBuffer(out, out.writerIndex(), out.writableBytes());
1131                 SSLEngineResult result = engine.wrap(in0, out0);
1132                 in.skipBytes(result.bytesConsumed());
1133                 out.writerIndex(out.writerIndex() + result.bytesProduced());
1134 
1135                 if (result.getStatus() == Status.BUFFER_OVERFLOW) {
1136                     out.ensureWritable(engine.getSession().getPacketBufferSize());
1137                 } else {
1138                     return result;
1139                 }
1140             }
1141         } finally {
1142             // Null out to allow GC of ByteBuffer
1143             singleBuffer[0] = null;
1144 
1145             if (newDirectIn != null) {
1146                 newDirectIn.release();
1147             }
1148         }
1149     }
1150 
1151     @Override
1152     public void channelInactive(ChannelHandlerContext ctx) throws Exception {
1153         boolean handshakeFailed = handshakePromise.cause() != null;
1154 
1155         // Channel closed, we will generate 'ClosedChannelException' now.
1156         ClosedChannelException exception = new ClosedChannelException();
1157 
1158         // Add a supressed exception if the handshake was not completed yet.
1159         if (isStateSet(STATE_HANDSHAKE_STARTED) && !handshakePromise.isDone()) {
1160             ThrowableUtil.addSuppressed(exception, StacklessSSLHandshakeException.newInstance(
1161                     "Connection closed while SSL/TLS handshake was in progress",
1162                     SslHandler.class, "channelInactive"));
1163         }
1164 
1165         // Make sure to release SSLEngine,
1166         // and notify the handshake future if the connection has been closed during handshake.
1167         setHandshakeFailure(ctx, exception, !isStateSet(STATE_OUTBOUND_CLOSED), isStateSet(STATE_HANDSHAKE_STARTED),
1168                 false);
1169 
1170         // Ensure we always notify the sslClosePromise as well
1171         notifyClosePromise(exception);
1172 
1173         try {
1174             super.channelInactive(ctx);
1175         } catch (DecoderException e) {
1176             if (!handshakeFailed || !(e.getCause() instanceof SSLException)) {
1177                 // We only rethrow the exception if the handshake did not fail before channelInactive(...) was called
1178                 // as otherwise this may produce duplicated failures as super.channelInactive(...) will also call
1179                 // channelRead(...).
1180                 //
1181                 // See https://github.com/netty/netty/issues/10119
1182                 throw e;
1183             }
1184         }
1185     }
1186 
1187     @Override
1188     public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
1189         if (ignoreException(cause)) {
1190             // It is safe to ignore the 'connection reset by peer' or
1191             // 'broken pipe' error after sending close_notify.
1192             if (logger.isDebugEnabled()) {
1193                 logger.debug(
1194                         "{} Swallowing a harmless 'connection reset by peer / broken pipe' error that occurred " +
1195                         "while writing close_notify in response to the peer's close_notify", ctx.channel(), cause);
1196             }
1197 
1198             // Close the connection explicitly just in case the transport
1199             // did not close the connection automatically.
1200             if (ctx.channel().isActive()) {
1201                 ctx.close();
1202             }
1203         } else {
1204             ctx.fireExceptionCaught(cause);
1205         }
1206     }
1207 
1208     /**
1209      * Checks if the given {@link Throwable} can be ignore and just "swallowed"
1210      *
1211      * When an ssl connection is closed a close_notify message is sent.
1212      * After that the peer also sends close_notify however, it's not mandatory to receive
1213      * the close_notify. The party who sent the initial close_notify can close the connection immediately
1214      * then the peer will get connection reset error.
1215      *
1216      */
1217     private boolean ignoreException(Throwable t) {
1218         if (!(t instanceof SSLException) && t instanceof IOException && sslClosePromise.isDone()) {
1219             String message = t.getMessage();
1220 
1221             // first try to match connection reset / broke peer based on the regex. This is the fastest way
1222             // but may fail on different jdk impls or OS's
1223             if (message != null && IGNORABLE_ERROR_MESSAGE.matcher(message).matches()) {
1224                 return true;
1225             }
1226 
1227             // Inspect the StackTraceElements to see if it was a connection reset / broken pipe or not
1228             StackTraceElement[] elements = t.getStackTrace();
1229             for (StackTraceElement element: elements) {
1230                 String classname = element.getClassName();
1231                 String methodname = element.getMethodName();
1232 
1233                 // skip all classes that belong to the io.netty package
1234                 if (classname.startsWith("io.netty.")) {
1235                     continue;
1236                 }
1237 
1238                 // check if the method name is read if not skip it
1239                 if (!"read".equals(methodname)) {
1240                     continue;
1241                 }
1242 
1243                 // This will also match against SocketInputStream which is used by openjdk 7 and maybe
1244                 // also others
1245                 if (IGNORABLE_CLASS_IN_STACK.matcher(classname).matches()) {
1246                     return true;
1247                 }
1248 
1249                 try {
1250                     // No match by now.. Try to load the class via classloader and inspect it.
1251                     // This is mainly done as other JDK implementations may differ in name of
1252                     // the impl.
1253                     Class<?> clazz = PlatformDependent.getClassLoader(getClass()).loadClass(classname);
1254 
1255                     if (SocketChannel.class.isAssignableFrom(clazz)
1256                             || DatagramChannel.class.isAssignableFrom(clazz)) {
1257                         return true;
1258                     }
1259 
1260                     // also match against SctpChannel via String matching as it may not present.
1261                     if (PlatformDependent.javaVersion() >= 7
1262                             && "com.sun.nio.sctp.SctpChannel".equals(clazz.getSuperclass().getName())) {
1263                         return true;
1264                     }
1265                 } catch (Throwable cause) {
1266                     if (logger.isDebugEnabled()) {
1267                         logger.debug("Unexpected exception while loading class {} classname {}",
1268                                 getClass(), classname, cause);
1269                     }
1270                 }
1271             }
1272         }
1273 
1274         return false;
1275     }
1276 
1277     /**
1278      * Returns {@code true} if the given {@link ByteBuf} is encrypted. Be aware that this method
1279      * will not increase the readerIndex of the given {@link ByteBuf}.
1280      *
1281      * @param   buffer
1282      *                  The {@link ByteBuf} to read from. Be aware that it must have at least 5 bytes to read,
1283      *                  otherwise it will throw an {@link IllegalArgumentException}.
1284      * @return encrypted
1285      *                  {@code true} if the {@link ByteBuf} is encrypted, {@code false} otherwise.
1286      * @throws IllegalArgumentException
1287      *                  Is thrown if the given {@link ByteBuf} has not at least 5 bytes to read.
1288      */
1289     public static boolean isEncrypted(ByteBuf buffer) {
1290         if (buffer.readableBytes() < SslUtils.SSL_RECORD_HEADER_LENGTH) {
1291             throw new IllegalArgumentException(
1292                     "buffer must have at least " + SslUtils.SSL_RECORD_HEADER_LENGTH + " readable bytes");
1293         }
1294         return getEncryptedPacketLength(buffer, buffer.readerIndex()) != SslUtils.NOT_ENCRYPTED;
1295     }
1296 
1297     private void decodeJdkCompatible(ChannelHandlerContext ctx, ByteBuf in) throws NotSslRecordException {
1298         int packetLength = this.packetLength;
1299         // If we calculated the length of the current SSL record before, use that information.
1300         if (packetLength > 0) {
1301             if (in.readableBytes() < packetLength) {
1302                 return;
1303             }
1304         } else {
1305             // Get the packet length and wait until we get a packets worth of data to unwrap.
1306             final int readableBytes = in.readableBytes();
1307             if (readableBytes < SslUtils.SSL_RECORD_HEADER_LENGTH) {
1308                 return;
1309             }
1310             packetLength = getEncryptedPacketLength(in, in.readerIndex());
1311             if (packetLength == SslUtils.NOT_ENCRYPTED) {
1312                 // Not an SSL/TLS packet
1313                 NotSslRecordException e = new NotSslRecordException(
1314                         "not an SSL/TLS record: " + ByteBufUtil.hexDump(in));
1315                 in.skipBytes(in.readableBytes());
1316 
1317                 // First fail the handshake promise as we may need to have access to the SSLEngine which may
1318                 // be released because the user will remove the SslHandler in an exceptionCaught(...) implementation.
1319                 setHandshakeFailure(ctx, e);
1320 
1321                 throw e;
1322             }
1323             if (packetLength == NOT_ENOUGH_DATA) {
1324                 return;
1325             }
1326             assert packetLength > 0;
1327             if (packetLength > readableBytes) {
1328                 // wait until the whole packet can be read
1329                 this.packetLength = packetLength;
1330                 return;
1331             }
1332         }
1333 
1334         // Reset the state of this class so we can get the length of the next packet. We assume the entire packet will
1335         // be consumed by the SSLEngine.
1336         this.packetLength = 0;
1337         try {
1338             final int bytesConsumed = unwrap(ctx, in, packetLength);
1339             assert bytesConsumed == packetLength || engine.isInboundDone() :
1340                     "we feed the SSLEngine a packets worth of data: " + packetLength + " but it only consumed: " +
1341                             bytesConsumed;
1342         } catch (Throwable cause) {
1343             handleUnwrapThrowable(ctx, cause);
1344         }
1345     }
1346 
1347     private void decodeNonJdkCompatible(ChannelHandlerContext ctx, ByteBuf in) {
1348         try {
1349             unwrap(ctx, in, in.readableBytes());
1350         } catch (Throwable cause) {
1351             handleUnwrapThrowable(ctx, cause);
1352         }
1353     }
1354 
1355     private void handleUnwrapThrowable(ChannelHandlerContext ctx, Throwable cause) {
1356         try {
1357             // We should attempt to notify the handshake failure before writing any pending data. If we are in unwrap
1358             // and failed during the handshake process, and we attempt to wrap, then promises will fail, and if
1359             // listeners immediately close the Channel then we may end up firing the handshake event after the Channel
1360             // has been closed.
1361             if (handshakePromise.tryFailure(cause)) {
1362                 ctx.fireUserEventTriggered(new SslHandshakeCompletionEvent(cause));
1363             }
1364 
1365             // Let's check if the handler was removed in the meantime and so pendingUnencryptedWrites is null.
1366             if (pendingUnencryptedWrites != null) {
1367                 // We need to flush one time as there may be an alert that we should send to the remote peer because
1368                 // of the SSLException reported here.
1369                 wrapAndFlush(ctx);
1370             }
1371         } catch (SSLException ex) {
1372             logger.debug("SSLException during trying to call SSLEngine.wrap(...)" +
1373                     " because of an previous SSLException, ignoring...", ex);
1374         } finally {
1375             // ensure we always flush and close the channel.
1376             setHandshakeFailure(ctx, cause, true, false, true);
1377         }
1378         PlatformDependent.throwException(cause);
1379     }
1380 
1381     @Override
1382     protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws SSLException {
1383         if (isStateSet(STATE_PROCESS_TASK)) {
1384             return;
1385         }
1386         if (jdkCompatibilityMode) {
1387             decodeJdkCompatible(ctx, in);
1388         } else {
1389             decodeNonJdkCompatible(ctx, in);
1390         }
1391     }
1392 
1393     @Override
1394     public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
1395         channelReadComplete0(ctx);
1396     }
1397 
1398     private void channelReadComplete0(ChannelHandlerContext ctx) {
1399         // Discard bytes of the cumulation buffer if needed.
1400         discardSomeReadBytes();
1401 
1402         flushIfNeeded(ctx);
1403         readIfNeeded(ctx);
1404 
1405         clearState(STATE_FIRE_CHANNEL_READ);
1406         ctx.fireChannelReadComplete();
1407     }
1408 
1409     private void readIfNeeded(ChannelHandlerContext ctx) {
1410         // If handshake is not finished yet, we need more data.
1411         if (!ctx.channel().config().isAutoRead() &&
1412                 (!isStateSet(STATE_FIRE_CHANNEL_READ) || !handshakePromise.isDone())) {
1413             // No auto-read used and no message passed through the ChannelPipeline or the handshake was not complete
1414             // yet, which means we need to trigger the read to ensure we not encounter any stalls.
1415             ctx.read();
1416         }
1417     }
1418 
1419     private void flushIfNeeded(ChannelHandlerContext ctx) {
1420         if (isStateSet(STATE_NEEDS_FLUSH)) {
1421             forceFlush(ctx);
1422         }
1423     }
1424 
1425     /**
1426      * Calls {@link SSLEngine#unwrap(ByteBuffer, ByteBuffer)} with an empty buffer to handle handshakes, etc.
1427      */
1428     private int unwrapNonAppData(ChannelHandlerContext ctx) throws SSLException {
1429         return unwrap(ctx, Unpooled.EMPTY_BUFFER, 0);
1430     }
1431 
1432     /**
1433      * Unwraps inbound SSL records.
1434      */
1435     private int unwrap(ChannelHandlerContext ctx, ByteBuf packet, int length) throws SSLException {
1436         final int originalLength = length;
1437         boolean wrapLater = false;
1438         boolean notifyClosure = false;
1439         boolean executedRead = false;
1440         ByteBuf decodeOut = allocate(ctx, length);
1441         try {
1442             // Only continue to loop if the handler was not removed in the meantime.
1443             // See https://github.com/netty/netty/issues/5860
1444             do {
1445                 final SSLEngineResult result = engineType.unwrap(this, packet, length, decodeOut);
1446                 final Status status = result.getStatus();
1447                 final HandshakeStatus handshakeStatus = result.getHandshakeStatus();
1448                 final int produced = result.bytesProduced();
1449                 final int consumed = result.bytesConsumed();
1450 
1451                 // Skip bytes now in case unwrap is called in a re-entry scenario. For example LocalChannel.read()
1452                 // may entry this method in a re-entry fashion and if the peer is writing into a shared buffer we may
1453                 // unwrap the same data multiple times.
1454                 packet.skipBytes(consumed);
1455                 length -= consumed;
1456 
1457                 // The expected sequence of events is:
1458                 // 1. Notify of handshake success
1459                 // 2. fireChannelRead for unwrapped data
1460                 if (handshakeStatus == HandshakeStatus.FINISHED || handshakeStatus == HandshakeStatus.NOT_HANDSHAKING) {
1461                     wrapLater |= (decodeOut.isReadable() ?
1462                             setHandshakeSuccessUnwrapMarkReentry() : setHandshakeSuccess()) ||
1463                             handshakeStatus == HandshakeStatus.FINISHED;
1464                 }
1465 
1466                 // Dispatch decoded data after we have notified of handshake success. If this method has been invoked
1467                 // in a re-entry fashion we execute a task on the executor queue to process after the stack unwinds
1468                 // to preserve order of events.
1469                 if (decodeOut.isReadable()) {
1470                     setState(STATE_FIRE_CHANNEL_READ);
1471                     if (isStateSet(STATE_UNWRAP_REENTRY)) {
1472                         executedRead = true;
1473                         executeChannelRead(ctx, decodeOut);
1474                     } else {
1475                         ctx.fireChannelRead(decodeOut);
1476                     }
1477                     decodeOut = null;
1478                 }
1479 
1480                 if (status == Status.CLOSED) {
1481                     notifyClosure = true; // notify about the CLOSED state of the SSLEngine. See #137
1482                 } else if (status == Status.BUFFER_OVERFLOW) {
1483                     if (decodeOut != null) {
1484                         decodeOut.release();
1485                     }
1486                     final int applicationBufferSize = engine.getSession().getApplicationBufferSize();
1487                     // Allocate a new buffer which can hold all the rest data and loop again.
1488                     // It may happen that applicationBufferSize < produced while there is still more to unwrap, in this
1489                     // case we will just allocate a new buffer with the capacity of applicationBufferSize and call
1490                     // unwrap again.
1491                     decodeOut = allocate(ctx, engineType.calculatePendingData(this, applicationBufferSize < produced ?
1492                             applicationBufferSize : applicationBufferSize - produced));
1493                     continue;
1494                 }
1495 
1496                 if (handshakeStatus == HandshakeStatus.NEED_TASK) {
1497                     boolean pending = runDelegatedTasks(true);
1498                     if (!pending) {
1499                         // We scheduled a task on the delegatingTaskExecutor, so stop processing as we will
1500                         // resume once the task completes.
1501                         //
1502                         // We break out of the loop only and do NOT return here as we still may need to notify
1503                         // about the closure of the SSLEngine.
1504                         wrapLater = false;
1505                         break;
1506                     }
1507                 } else if (handshakeStatus == HandshakeStatus.NEED_WRAP) {
1508                     // If the wrap operation transitions the status to NOT_HANDSHAKING and there is no more data to
1509                     // unwrap then the next call to unwrap will not produce any data. We can avoid the potentially
1510                     // costly unwrap operation and break out of the loop.
1511                     if (wrapNonAppData(ctx, true) && length == 0) {
1512                         break;
1513                     }
1514                 }
1515 
1516                 if (status == Status.BUFFER_UNDERFLOW ||
1517                         // If we processed NEED_TASK we should try again even we did not consume or produce anything.
1518                         handshakeStatus != HandshakeStatus.NEED_TASK && (consumed == 0 && produced == 0 ||
1519                                 (length == 0 && handshakeStatus == HandshakeStatus.NOT_HANDSHAKING))) {
1520                     if (handshakeStatus == HandshakeStatus.NEED_UNWRAP) {
1521                         // The underlying engine is starving so we need to feed it with more data.
1522                         // See https://github.com/netty/netty/pull/5039
1523                         readIfNeeded(ctx);
1524                     }
1525 
1526                     break;
1527                 } else if (decodeOut == null) {
1528                     decodeOut = allocate(ctx, length);
1529                 }
1530             } while (!ctx.isRemoved());
1531 
1532             if (isStateSet(STATE_FLUSHED_BEFORE_HANDSHAKE) && handshakePromise.isDone()) {
1533                 // We need to call wrap(...) in case there was a flush done before the handshake completed to ensure
1534                 // we do not stale.
1535                 //
1536                 // See https://github.com/netty/netty/pull/2437
1537                 clearState(STATE_FLUSHED_BEFORE_HANDSHAKE);
1538                 wrapLater = true;
1539             }
1540 
1541             if (wrapLater) {
1542                 wrap(ctx, true);
1543             }
1544         } finally {
1545             if (decodeOut != null) {
1546                 decodeOut.release();
1547             }
1548 
1549             if (notifyClosure) {
1550                 if (executedRead) {
1551                     executeNotifyClosePromise(ctx);
1552                 } else {
1553                     notifyClosePromise(null);
1554                 }
1555             }
1556         }
1557         return originalLength - length;
1558     }
1559 
1560     private boolean setHandshakeSuccessUnwrapMarkReentry() {
1561         // setHandshakeSuccess calls out to external methods which may trigger re-entry. We need to preserve ordering of
1562         // fireChannelRead for decodeOut relative to re-entry data.
1563         final boolean setReentryState = !isStateSet(STATE_UNWRAP_REENTRY);
1564         if (setReentryState) {
1565             setState(STATE_UNWRAP_REENTRY);
1566         }
1567         try {
1568             return setHandshakeSuccess();
1569         } finally {
1570             // It is unlikely this specific method will be re-entry because handshake completion is infrequent, but just
1571             // in case we only clear the state if we set it in the first place.
1572             if (setReentryState) {
1573                 clearState(STATE_UNWRAP_REENTRY);
1574             }
1575         }
1576     }
1577 
1578     private void executeNotifyClosePromise(final ChannelHandlerContext ctx) {
1579         try {
1580             ctx.executor().execute(new Runnable() {
1581                 @Override
1582                 public void run() {
1583                     notifyClosePromise(null);
1584                 }
1585             });
1586         } catch (RejectedExecutionException e) {
1587             notifyClosePromise(e);
1588         }
1589     }
1590 
1591     private void executeChannelRead(final ChannelHandlerContext ctx, final ByteBuf decodedOut) {
1592         try {
1593             ctx.executor().execute(new Runnable() {
1594                 @Override
1595                 public void run() {
1596                     ctx.fireChannelRead(decodedOut);
1597                 }
1598             });
1599         } catch (RejectedExecutionException e) {
1600             decodedOut.release();
1601             throw e;
1602         }
1603     }
1604 
1605     private static ByteBuffer toByteBuffer(ByteBuf out, int index, int len) {
1606         return out.nioBufferCount() == 1 ? out.internalNioBuffer(index, len) :
1607                 out.nioBuffer(index, len);
1608     }
1609 
1610     private static boolean inEventLoop(Executor executor) {
1611         return executor instanceof EventExecutor && ((EventExecutor) executor).inEventLoop();
1612     }
1613 
1614     /**
1615      * Will either run the delegated task directly calling {@link Runnable#run()} and return {@code true} or will
1616      * offload the delegated task using {@link Executor#execute(Runnable)} and return {@code false}.
1617      *
1618      * If the task is offloaded it will take care to resume its work on the {@link EventExecutor} once there are no
1619      * more tasks to process.
1620      */
1621     private boolean runDelegatedTasks(boolean inUnwrap) {
1622         if (delegatedTaskExecutor == ImmediateExecutor.INSTANCE || inEventLoop(delegatedTaskExecutor)) {
1623             // We should run the task directly in the EventExecutor thread and not offload at all. As we are on the
1624             // EventLoop we can just run all tasks at once.
1625             for (;;) {
1626                 Runnable task = engine.getDelegatedTask();
1627                 if (task == null) {
1628                     return true;
1629                 }
1630                 setState(STATE_PROCESS_TASK);
1631                 if (task instanceof AsyncRunnable) {
1632                     // Let's set the task to processing task before we try to execute it.
1633                     boolean pending = false;
1634                     try {
1635                         AsyncRunnable asyncTask = (AsyncRunnable) task;
1636                         AsyncTaskCompletionHandler completionHandler = new AsyncTaskCompletionHandler(inUnwrap);
1637                         asyncTask.run(completionHandler);
1638                         pending = completionHandler.resumeLater();
1639                         if (pending) {
1640                             return false;
1641                         }
1642                     } finally {
1643                         if (!pending) {
1644                             // The task has completed, lets clear the state. If it is not completed we will clear the
1645                             // state once it is.
1646                             clearState(STATE_PROCESS_TASK);
1647                         }
1648                     }
1649                 } else {
1650                     try {
1651                         task.run();
1652                     } finally {
1653                         clearState(STATE_PROCESS_TASK);
1654                     }
1655                 }
1656             }
1657         } else {
1658             executeDelegatedTask(inUnwrap);
1659             return false;
1660         }
1661     }
1662 
1663     private SslTasksRunner getTaskRunner(boolean inUnwrap) {
1664         return inUnwrap ? sslTaskRunnerForUnwrap : sslTaskRunner;
1665     }
1666 
1667     private void executeDelegatedTask(boolean inUnwrap) {
1668         executeDelegatedTask(getTaskRunner(inUnwrap));
1669     }
1670 
1671     private void executeDelegatedTask(SslTasksRunner task) {
1672         setState(STATE_PROCESS_TASK);
1673         try {
1674             delegatedTaskExecutor.execute(task);
1675         } catch (RejectedExecutionException e) {
1676             clearState(STATE_PROCESS_TASK);
1677             throw e;
1678         }
1679     }
1680 
1681     private final class AsyncTaskCompletionHandler implements Runnable {
1682         private final boolean inUnwrap;
1683         boolean didRun;
1684         boolean resumeLater;
1685 
1686         AsyncTaskCompletionHandler(boolean inUnwrap) {
1687             this.inUnwrap = inUnwrap;
1688         }
1689 
1690         @Override
1691         public void run() {
1692             didRun = true;
1693             if (resumeLater) {
1694                 getTaskRunner(inUnwrap).runComplete();
1695             }
1696         }
1697 
1698         boolean resumeLater() {
1699             if (!didRun) {
1700                 resumeLater = true;
1701                 return true;
1702             }
1703             return false;
1704         }
1705     }
1706 
1707     /**
1708      * {@link Runnable} that will be scheduled on the {@code delegatedTaskExecutor} and will take care
1709      * of resume work on the {@link EventExecutor} once the task was executed.
1710      */
1711     private final class SslTasksRunner implements Runnable {
1712         private final boolean inUnwrap;
1713         private final Runnable runCompleteTask = new Runnable() {
1714             @Override
1715             public void run() {
1716                 runComplete();
1717             }
1718         };
1719 
1720         SslTasksRunner(boolean inUnwrap) {
1721             this.inUnwrap = inUnwrap;
1722         }
1723 
1724         // Handle errors which happened during task processing.
1725         private void taskError(Throwable e) {
1726             if (inUnwrap) {
1727                 // As the error happened while the task was scheduled as part of unwrap(...) we also need to ensure
1728                 // we fire it through the pipeline as inbound error to be consistent with what we do in decode(...).
1729                 //
1730                 // This will also ensure we fail the handshake future and flush all produced data.
1731                 try {
1732                     handleUnwrapThrowable(ctx, e);
1733                 } catch (Throwable cause) {
1734                     safeExceptionCaught(cause);
1735                 }
1736             } else {
1737                 setHandshakeFailure(ctx, e);
1738                 forceFlush(ctx);
1739             }
1740         }
1741 
1742         // Try to call exceptionCaught(...)
1743         private void safeExceptionCaught(Throwable cause) {
1744             try {
1745                 exceptionCaught(ctx, wrapIfNeeded(cause));
1746             } catch (Throwable error) {
1747                 ctx.fireExceptionCaught(error);
1748             }
1749         }
1750 
1751         private Throwable wrapIfNeeded(Throwable cause) {
1752             if (!inUnwrap) {
1753                 // If we are not in unwrap(...) we can just rethrow without wrapping at all.
1754                 return cause;
1755             }
1756             // As the exception would have been triggered by an inbound operation we will need to wrap it in a
1757             // DecoderException to mimic what a decoder would do when decode(...) throws.
1758             return cause instanceof DecoderException ? cause : new DecoderException(cause);
1759         }
1760 
1761         private void tryDecodeAgain() {
1762             try {
1763                 channelRead(ctx, Unpooled.EMPTY_BUFFER);
1764             } catch (Throwable cause) {
1765                 safeExceptionCaught(cause);
1766             } finally {
1767                 // As we called channelRead(...) we also need to call channelReadComplete(...) which
1768                 // will ensure we either call ctx.fireChannelReadComplete() or will trigger a ctx.read() if
1769                 // more data is needed.
1770                 channelReadComplete0(ctx);
1771             }
1772         }
1773 
1774         /**
1775          * Executed after the wrapped {@code task} was executed via {@code delegatedTaskExecutor} to resume work
1776          * on the {@link EventExecutor}.
1777          */
1778         private void resumeOnEventExecutor() {
1779             assert ctx.executor().inEventLoop();
1780             clearState(STATE_PROCESS_TASK);
1781             try {
1782                 HandshakeStatus status = engine.getHandshakeStatus();
1783                 switch (status) {
1784                     // There is another task that needs to be executed and offloaded to the delegatingTaskExecutor as
1785                     // a result of this. Let's reschedule....
1786                     case NEED_TASK:
1787                         executeDelegatedTask(this);
1788 
1789                         break;
1790 
1791                     // The handshake finished, lets notify about the completion of it and resume processing.
1792                     case FINISHED:
1793                     // Not handshaking anymore, lets notify about the completion if not done yet and resume processing.
1794                     case NOT_HANDSHAKING:
1795                         setHandshakeSuccess(); // NOT_HANDSHAKING -> workaround for android skipping FINISHED state.
1796                         try {
1797                             // Lets call wrap to ensure we produce the alert if there is any pending and also to
1798                             // ensure we flush any queued data..
1799                             wrap(ctx, inUnwrap);
1800                         } catch (Throwable e) {
1801                             taskError(e);
1802                             return;
1803                         }
1804                         if (inUnwrap) {
1805                             // If we were in the unwrap call when the task was processed we should also try to unwrap
1806                             // non app data first as there may not anything left in the inbound buffer to process.
1807                             unwrapNonAppData(ctx);
1808                         }
1809 
1810                         // Flush now as we may have written some data as part of the wrap call.
1811                         forceFlush(ctx);
1812 
1813                         tryDecodeAgain();
1814                         break;
1815 
1816                     // We need more data so lets try to unwrap first and then call decode again which will feed us
1817                     // with buffered data (if there is any).
1818                     case NEED_UNWRAP:
1819                         try {
1820                             unwrapNonAppData(ctx);
1821                         } catch (SSLException e) {
1822                             handleUnwrapThrowable(ctx, e);
1823                             return;
1824                         }
1825                         tryDecodeAgain();
1826                         break;
1827 
1828                     // To make progress we need to call SSLEngine.wrap(...) which may produce more output data
1829                     // that will be written to the Channel.
1830                     case NEED_WRAP:
1831                         try {
1832                             if (!wrapNonAppData(ctx, false) && inUnwrap) {
1833                                 // The handshake finished in wrapNonAppData(...), we need to try call
1834                                 // unwrapNonAppData(...) as we may have some alert that we should read.
1835                                 //
1836                                 // This mimics what we would do when we are calling this method while in unwrap(...).
1837                                 unwrapNonAppData(ctx);
1838                             }
1839 
1840                             // Flush now as we may have written some data as part of the wrap call.
1841                             forceFlush(ctx);
1842                         } catch (Throwable e) {
1843                             taskError(e);
1844                             return;
1845                         }
1846 
1847                         // Now try to feed in more data that we have buffered.
1848                         tryDecodeAgain();
1849                         break;
1850 
1851                     default:
1852                         // Should never reach here as we handle all cases.
1853                         throw new AssertionError();
1854                 }
1855             } catch (Throwable cause) {
1856                 safeExceptionCaught(cause);
1857             }
1858         }
1859 
1860         void runComplete() {
1861             EventExecutor executor = ctx.executor();
1862             // Jump back on the EventExecutor. We do this even if we are already on the EventLoop to guard against
1863             // reentrancy issues. Failing to do so could lead to the situation of tryDecode(...) be called and so
1864             // channelRead(...) while still in the decode loop. In this case channelRead(...) might release the input
1865             // buffer if its empty which would then result in an IllegalReferenceCountException when we try to continue
1866             // decoding.
1867             //
1868             // See https://github.com/netty/netty-tcnative/issues/680
1869             executor.execute(new Runnable() {
1870                 @Override
1871                 public void run() {
1872                     resumeOnEventExecutor();
1873                 }
1874             });
1875         }
1876 
1877         @Override
1878         public void run() {
1879             try {
1880                 Runnable task = engine.getDelegatedTask();
1881                 if (task == null) {
1882                     // The task was processed in the meantime. Let's just return.
1883                     return;
1884                 }
1885                 if (task instanceof AsyncRunnable) {
1886                     AsyncRunnable asyncTask = (AsyncRunnable) task;
1887                     asyncTask.run(runCompleteTask);
1888                 } else {
1889                     task.run();
1890                     runComplete();
1891                 }
1892             } catch (final Throwable cause) {
1893                 handleException(cause);
1894             }
1895         }
1896 
1897         private void handleException(final Throwable cause) {
1898             EventExecutor executor = ctx.executor();
1899             if (executor.inEventLoop()) {
1900                 clearState(STATE_PROCESS_TASK);
1901                 safeExceptionCaught(cause);
1902             } else {
1903                 try {
1904                     executor.execute(new Runnable() {
1905                         @Override
1906                         public void run() {
1907                             clearState(STATE_PROCESS_TASK);
1908                             safeExceptionCaught(cause);
1909                         }
1910                     });
1911                 } catch (RejectedExecutionException ignore) {
1912                     clearState(STATE_PROCESS_TASK);
1913                     // the context itself will handle the rejected exception when try to schedule the operation so
1914                     // ignore the RejectedExecutionException
1915                     ctx.fireExceptionCaught(cause);
1916                 }
1917             }
1918         }
1919     }
1920 
1921     /**
1922      * Notify all the handshake futures about the successfully handshake
1923      * @return {@code true} if {@link #handshakePromise} was set successfully and a {@link SslHandshakeCompletionEvent}
1924      * was fired. {@code false} otherwise.
1925      */
1926     private boolean setHandshakeSuccess() {
1927         // Our control flow may invoke this method multiple times for a single FINISHED event. For example
1928         // wrapNonAppData may drain pendingUnencryptedWrites in wrap which transitions to handshake from FINISHED to
1929         // NOT_HANDSHAKING which invokes setHandshakeSuccess, and then wrapNonAppData also directly invokes this method.
1930         final boolean notified;
1931         if (notified = !handshakePromise.isDone() && handshakePromise.trySuccess(ctx.channel())) {
1932             if (logger.isDebugEnabled()) {
1933                 SSLSession session = engine.getSession();
1934                 logger.debug(
1935                         "{} HANDSHAKEN: protocol:{} cipher suite:{}",
1936                         ctx.channel(),
1937                         session.getProtocol(),
1938                         session.getCipherSuite());
1939             }
1940             ctx.fireUserEventTriggered(SslHandshakeCompletionEvent.SUCCESS);
1941         }
1942         if (isStateSet(STATE_READ_DURING_HANDSHAKE)) {
1943             clearState(STATE_READ_DURING_HANDSHAKE);
1944             if (!ctx.channel().config().isAutoRead()) {
1945                 ctx.read();
1946             }
1947         }
1948         return notified;
1949     }
1950 
1951     /**
1952      * Notify all the handshake futures about the failure during the handshake.
1953      */
1954     private void setHandshakeFailure(ChannelHandlerContext ctx, Throwable cause) {
1955         setHandshakeFailure(ctx, cause, true, true, false);
1956     }
1957 
1958     /**
1959      * Notify all the handshake futures about the failure during the handshake.
1960      */
1961     private void setHandshakeFailure(ChannelHandlerContext ctx, Throwable cause, boolean closeInbound,
1962                                      boolean notify, boolean alwaysFlushAndClose) {
1963         try {
1964             // Release all resources such as internal buffers that SSLEngine is managing.
1965             setState(STATE_OUTBOUND_CLOSED);
1966             engine.closeOutbound();
1967 
1968             if (closeInbound) {
1969                 try {
1970                     engine.closeInbound();
1971                 } catch (SSLException e) {
1972                     if (logger.isDebugEnabled()) {
1973                         // only log in debug mode as it most likely harmless and latest chrome still trigger
1974                         // this all the time.
1975                         //
1976                         // See https://github.com/netty/netty/issues/1340
1977                         String msg = e.getMessage();
1978                         if (msg == null || !(msg.contains("possible truncation attack") ||
1979                                 msg.contains("closing inbound before receiving peer's close_notify"))) {
1980                             logger.debug("{} SSLEngine.closeInbound() raised an exception.", ctx.channel(), e);
1981                         }
1982                     }
1983                 }
1984             }
1985             if (handshakePromise.tryFailure(cause) || alwaysFlushAndClose) {
1986                 SslUtils.handleHandshakeFailure(ctx, cause, notify);
1987             }
1988         } finally {
1989             // Ensure we remove and fail all pending writes in all cases and so release memory quickly.
1990             releaseAndFailAll(ctx, cause);
1991         }
1992     }
1993 
1994     private void setHandshakeFailureTransportFailure(ChannelHandlerContext ctx, Throwable cause) {
1995         // If TLS control frames fail to write we are in an unknown state and may become out of
1996         // sync with our peer. We give up and close the channel. This will also take care of
1997         // cleaning up any outstanding state (e.g. handshake promise, queued unencrypted data).
1998         try {
1999             SSLException transportFailure = new SSLException("failure when writing TLS control frames", cause);
2000             releaseAndFailAll(ctx, transportFailure);
2001             if (handshakePromise.tryFailure(transportFailure)) {
2002                 ctx.fireUserEventTriggered(new SslHandshakeCompletionEvent(transportFailure));
2003             }
2004         } finally {
2005             ctx.close();
2006         }
2007     }
2008 
2009     private void releaseAndFailAll(ChannelHandlerContext ctx, Throwable cause) {
2010         if (pendingUnencryptedWrites != null) {
2011             pendingUnencryptedWrites.releaseAndFailAll(ctx, cause);
2012         }
2013     }
2014 
2015     private void notifyClosePromise(Throwable cause) {
2016         if (cause == null) {
2017             if (sslClosePromise.trySuccess(ctx.channel())) {
2018                 ctx.fireUserEventTriggered(SslCloseCompletionEvent.SUCCESS);
2019             }
2020         } else {
2021             if (sslClosePromise.tryFailure(cause)) {
2022                 ctx.fireUserEventTriggered(new SslCloseCompletionEvent(cause));
2023             }
2024         }
2025     }
2026 
2027     private void closeOutboundAndChannel(
2028             final ChannelHandlerContext ctx, final ChannelPromise promise, boolean disconnect) throws Exception {
2029         setState(STATE_OUTBOUND_CLOSED);
2030         engine.closeOutbound();
2031 
2032         if (!ctx.channel().isActive()) {
2033             if (disconnect) {
2034                 ctx.disconnect(promise);
2035             } else {
2036                 ctx.close(promise);
2037             }
2038             return;
2039         }
2040 
2041         ChannelPromise closeNotifyPromise = ctx.newPromise();
2042         try {
2043             flush(ctx, closeNotifyPromise);
2044         } finally {
2045             if (!isStateSet(STATE_CLOSE_NOTIFY)) {
2046                 setState(STATE_CLOSE_NOTIFY);
2047                 // It's important that we do not pass the original ChannelPromise to safeClose(...) as when flush(....)
2048                 // throws an Exception it will be propagated to the AbstractChannelHandlerContext which will try
2049                 // to fail the promise because of this. This will then fail as it was already completed by
2050                 // safeClose(...). We create a new ChannelPromise and try to notify the original ChannelPromise
2051                 // once it is complete. If we fail to do so we just ignore it as in this case it was failed already
2052                 // because of a propagated Exception.
2053                 //
2054                 // See https://github.com/netty/netty/issues/5931
2055                 safeClose(ctx, closeNotifyPromise, PromiseNotifier.cascade(false, ctx.newPromise(), promise));
2056             } else {
2057                 /// We already handling the close_notify so just attach the promise to the sslClosePromise.
2058                 sslClosePromise.addListener(new FutureListener<Channel>() {
2059                     @Override
2060                     public void operationComplete(Future<Channel> future) {
2061                         promise.setSuccess();
2062                     }
2063                 });
2064             }
2065         }
2066     }
2067 
2068     private void flush(ChannelHandlerContext ctx, ChannelPromise promise) throws Exception {
2069         if (pendingUnencryptedWrites != null) {
2070             pendingUnencryptedWrites.add(Unpooled.EMPTY_BUFFER, promise);
2071         } else {
2072             promise.setFailure(newPendingWritesNullException());
2073         }
2074         flush(ctx);
2075     }
2076 
2077     @Override
2078     public void handlerAdded(final ChannelHandlerContext ctx) throws Exception {
2079         this.ctx = ctx;
2080         Channel channel = ctx.channel();
2081         pendingUnencryptedWrites = new SslHandlerCoalescingBufferQueue(channel, 16);
2082 
2083         setOpensslEngineSocketFd(channel);
2084         boolean fastOpen = Boolean.TRUE.equals(channel.config().getOption(ChannelOption.TCP_FASTOPEN_CONNECT));
2085         boolean active = channel.isActive();
2086         if (active || fastOpen) {
2087             // Explicitly flush the handshake only if the channel is already active.
2088             // With TCP Fast Open, we write to the outbound buffer before the TCP connect is established.
2089             // The buffer will then be flushed as part of establishing the connection, saving us a round-trip.
2090             startHandshakeProcessing(active);
2091             // If we weren't able to include client_hello in the TCP SYN (e.g. no token, disabled at the OS) we have to
2092             // flush pending data in the outbound buffer later in channelActive().
2093             final ChannelOutboundBuffer outboundBuffer;
2094             if (fastOpen && ((outboundBuffer = channel.unsafe().outboundBuffer()) == null ||
2095                     outboundBuffer.totalPendingWriteBytes() > 0)) {
2096                 setState(STATE_NEEDS_FLUSH);
2097             }
2098         }
2099     }
2100 
2101     private void startHandshakeProcessing(boolean flushAtEnd) {
2102         if (!isStateSet(STATE_HANDSHAKE_STARTED)) {
2103             setState(STATE_HANDSHAKE_STARTED);
2104             if (engine.getUseClientMode()) {
2105                 // Begin the initial handshake.
2106                 // channelActive() event has been fired already, which means this.channelActive() will
2107                 // not be invoked. We have to initialize here instead.
2108                 handshake(flushAtEnd);
2109             }
2110             applyHandshakeTimeout();
2111         } else if (isStateSet(STATE_NEEDS_FLUSH)) {
2112             forceFlush(ctx);
2113         }
2114     }
2115 
2116     /**
2117      * Performs TLS renegotiation.
2118      */
2119     public Future<Channel> renegotiate() {
2120         ChannelHandlerContext ctx = this.ctx;
2121         if (ctx == null) {
2122             throw new IllegalStateException();
2123         }
2124 
2125         return renegotiate(ctx.executor().<Channel>newPromise());
2126     }
2127 
2128     /**
2129      * Performs TLS renegotiation.
2130      */
2131     public Future<Channel> renegotiate(final Promise<Channel> promise) {
2132         ObjectUtil.checkNotNull(promise, "promise");
2133 
2134         ChannelHandlerContext ctx = this.ctx;
2135         if (ctx == null) {
2136             throw new IllegalStateException();
2137         }
2138 
2139         EventExecutor executor = ctx.executor();
2140         if (!executor.inEventLoop()) {
2141             executor.execute(new Runnable() {
2142                 @Override
2143                 public void run() {
2144                     renegotiateOnEventLoop(promise);
2145                 }
2146             });
2147             return promise;
2148         }
2149 
2150         renegotiateOnEventLoop(promise);
2151         return promise;
2152     }
2153 
2154     private void renegotiateOnEventLoop(final Promise<Channel> newHandshakePromise) {
2155         final Promise<Channel> oldHandshakePromise = handshakePromise;
2156         if (!oldHandshakePromise.isDone()) {
2157             // There's no need to handshake because handshake is in progress already.
2158             // Merge the new promise into the old one.
2159             PromiseNotifier.cascade(oldHandshakePromise, newHandshakePromise);
2160         } else {
2161             handshakePromise = newHandshakePromise;
2162             handshake(true);
2163             applyHandshakeTimeout();
2164         }
2165     }
2166 
2167     /**
2168      * Performs TLS (re)negotiation.
2169      * @param flushAtEnd Set to {@code true} if the outbound buffer should be flushed (written to the network) at the
2170      *                  end. Set to {@code false} if the handshake will be flushed later, e.g. as part of TCP Fast Open
2171      *                  connect.
2172      */
2173     private void handshake(boolean flushAtEnd) {
2174         if (engine.getHandshakeStatus() != HandshakeStatus.NOT_HANDSHAKING) {
2175             // Not all SSLEngine implementations support calling beginHandshake multiple times while a handshake
2176             // is in progress. See https://github.com/netty/netty/issues/4718.
2177             return;
2178         }
2179         if (handshakePromise.isDone()) {
2180             // If the handshake is done already lets just return directly as there is no need to trigger it again.
2181             // This can happen if the handshake(...) was triggered before we called channelActive(...) by a
2182             // flush() that was triggered by a ChannelFutureListener that was added to the ChannelFuture returned
2183             // from the connect(...) method. In this case we will see the flush() happen before we had a chance to
2184             // call fireChannelActive() on the pipeline.
2185             return;
2186         }
2187 
2188         // Begin handshake.
2189         final ChannelHandlerContext ctx = this.ctx;
2190         try {
2191             engine.beginHandshake();
2192             wrapNonAppData(ctx, false);
2193         } catch (Throwable e) {
2194             setHandshakeFailure(ctx, e);
2195         } finally {
2196             if (flushAtEnd) {
2197                 forceFlush(ctx);
2198             }
2199         }
2200     }
2201 
2202     private void applyHandshakeTimeout() {
2203         final Promise<Channel> localHandshakePromise = this.handshakePromise;
2204 
2205         // Set timeout if necessary.
2206         final long handshakeTimeoutMillis = this.handshakeTimeoutMillis;
2207         if (handshakeTimeoutMillis <= 0 || localHandshakePromise.isDone()) {
2208             return;
2209         }
2210 
2211         final Future<?> timeoutFuture = ctx.executor().schedule(new Runnable() {
2212             @Override
2213             public void run() {
2214                 if (localHandshakePromise.isDone()) {
2215                     return;
2216                 }
2217                 SSLException exception =
2218                         new SslHandshakeTimeoutException("handshake timed out after " + handshakeTimeoutMillis + "ms");
2219                 try {
2220                     if (localHandshakePromise.tryFailure(exception)) {
2221                         SslUtils.handleHandshakeFailure(ctx, exception, true);
2222                     }
2223                 } finally {
2224                     releaseAndFailAll(ctx, exception);
2225                 }
2226             }
2227         }, handshakeTimeoutMillis, TimeUnit.MILLISECONDS);
2228 
2229         // Cancel the handshake timeout when handshake is finished.
2230         localHandshakePromise.addListener(new FutureListener<Channel>() {
2231             @Override
2232             public void operationComplete(Future<Channel> f) throws Exception {
2233                 timeoutFuture.cancel(false);
2234             }
2235         });
2236     }
2237 
2238     private void forceFlush(ChannelHandlerContext ctx) {
2239         clearState(STATE_NEEDS_FLUSH);
2240         ctx.flush();
2241     }
2242 
2243      private void setOpensslEngineSocketFd(Channel c) {
2244          if (c instanceof UnixChannel && engine instanceof ReferenceCountedOpenSslEngine) {
2245              ((ReferenceCountedOpenSslEngine) engine).bioSetFd(((UnixChannel) c).fd().intValue());
2246          }
2247      }
2248 
2249     /**
2250      * Issues an initial TLS handshake once connected when used in client-mode
2251      */
2252     @Override
2253     public void channelActive(final ChannelHandlerContext ctx) throws Exception {
2254         setOpensslEngineSocketFd(ctx.channel());
2255         if (!startTls) {
2256             startHandshakeProcessing(true);
2257         }
2258         ctx.fireChannelActive();
2259     }
2260 
2261     private void safeClose(
2262             final ChannelHandlerContext ctx, final ChannelFuture flushFuture,
2263             final ChannelPromise promise) {
2264         if (!ctx.channel().isActive()) {
2265             ctx.close(promise);
2266             return;
2267         }
2268 
2269         final Future<?> timeoutFuture;
2270         if (!flushFuture.isDone()) {
2271             long closeNotifyTimeout = closeNotifyFlushTimeoutMillis;
2272             if (closeNotifyTimeout > 0) {
2273                 // Force-close the connection if close_notify is not fully sent in time.
2274                 timeoutFuture = ctx.executor().schedule(new Runnable() {
2275                     @Override
2276                     public void run() {
2277                         // May be done in the meantime as cancel(...) is only best effort.
2278                         if (!flushFuture.isDone()) {
2279                             logger.warn("{} Last write attempt timed out; force-closing the connection.",
2280                                     ctx.channel());
2281                             addCloseListener(ctx.close(ctx.newPromise()), promise);
2282                         }
2283                     }
2284                 }, closeNotifyTimeout, TimeUnit.MILLISECONDS);
2285             } else {
2286                 timeoutFuture = null;
2287             }
2288         } else {
2289             timeoutFuture = null;
2290         }
2291 
2292         // Close the connection if close_notify is sent in time.
2293         flushFuture.addListener(new ChannelFutureListener() {
2294             @Override
2295             public void operationComplete(ChannelFuture f) {
2296                 if (timeoutFuture != null) {
2297                     timeoutFuture.cancel(false);
2298                 }
2299                 final long closeNotifyReadTimeout = closeNotifyReadTimeoutMillis;
2300                 if (closeNotifyReadTimeout <= 0) {
2301                     // Trigger the close in all cases to make sure the promise is notified
2302                     // See https://github.com/netty/netty/issues/2358
2303                     addCloseListener(ctx.close(ctx.newPromise()), promise);
2304                 } else {
2305                     final Future<?> closeNotifyReadTimeoutFuture;
2306 
2307                     if (!sslClosePromise.isDone()) {
2308                         closeNotifyReadTimeoutFuture = ctx.executor().schedule(new Runnable() {
2309                             @Override
2310                             public void run() {
2311                                 if (!sslClosePromise.isDone()) {
2312                                     logger.debug(
2313                                             "{} did not receive close_notify in {}ms; force-closing the connection.",
2314                                             ctx.channel(), closeNotifyReadTimeout);
2315 
2316                                     // Do the close now...
2317                                     addCloseListener(ctx.close(ctx.newPromise()), promise);
2318                                 }
2319                             }
2320                         }, closeNotifyReadTimeout, TimeUnit.MILLISECONDS);
2321                     } else {
2322                         closeNotifyReadTimeoutFuture = null;
2323                     }
2324 
2325                     // Do the close once the we received the close_notify.
2326                     sslClosePromise.addListener(new FutureListener<Channel>() {
2327                         @Override
2328                         public void operationComplete(Future<Channel> future) throws Exception {
2329                             if (closeNotifyReadTimeoutFuture != null) {
2330                                 closeNotifyReadTimeoutFuture.cancel(false);
2331                             }
2332                             addCloseListener(ctx.close(ctx.newPromise()), promise);
2333                         }
2334                     });
2335                 }
2336             }
2337         });
2338     }
2339 
2340     private static void addCloseListener(ChannelFuture future, ChannelPromise promise) {
2341         // We notify the promise in the ChannelPromiseNotifier as there is a "race" where the close(...) call
2342         // by the timeoutFuture and the close call in the flushFuture listener will be called. Because of
2343         // this we need to use trySuccess() and tryFailure(...) as otherwise we can cause an
2344         // IllegalStateException.
2345         // Also we not want to log if the notification happens as this is expected in some cases.
2346         // See https://github.com/netty/netty/issues/5598
2347         PromiseNotifier.cascade(false, future, promise);
2348     }
2349 
2350     /**
2351      * Always prefer a direct buffer when it's pooled, so that we reduce the number of memory copies
2352      * in {@link OpenSslEngine}.
2353      */
2354     private ByteBuf allocate(ChannelHandlerContext ctx, int capacity) {
2355         ByteBufAllocator alloc = ctx.alloc();
2356         if (engineType.wantsDirectBuffer) {
2357             return alloc.directBuffer(capacity);
2358         } else {
2359             return alloc.buffer(capacity);
2360         }
2361     }
2362 
2363     /**
2364      * Allocates an outbound network buffer for {@link SSLEngine#wrap(ByteBuffer, ByteBuffer)} which can encrypt
2365      * the specified amount of pending bytes.
2366      */
2367     private ByteBuf allocateOutNetBuf(ChannelHandlerContext ctx, int pendingBytes, int numComponents) {
2368         return engineType.allocateWrapBuffer(this, ctx.alloc(), pendingBytes, numComponents);
2369     }
2370 
2371     private boolean isStateSet(int bit) {
2372         return (state & bit) == bit;
2373     }
2374 
2375     private void setState(int bit) {
2376         state |= bit;
2377     }
2378 
2379     private void clearState(int bit) {
2380         state &= ~bit;
2381     }
2382 
2383     /**
2384      * Each call to SSL_write will introduce about ~100 bytes of overhead. This coalescing queue attempts to increase
2385      * goodput by aggregating the plaintext in chunks of {@link #wrapDataSize}. If many small chunks are written
2386      * this can increase goodput, decrease the amount of calls to SSL_write, and decrease overall encryption operations.
2387      */
2388     private final class SslHandlerCoalescingBufferQueue extends AbstractCoalescingBufferQueue {
2389 
2390         SslHandlerCoalescingBufferQueue(Channel channel, int initSize) {
2391             super(channel, initSize);
2392         }
2393 
2394         @Override
2395         protected ByteBuf compose(ByteBufAllocator alloc, ByteBuf cumulation, ByteBuf next) {
2396             final int wrapDataSize = SslHandler.this.wrapDataSize;
2397             if (cumulation instanceof CompositeByteBuf) {
2398                 CompositeByteBuf composite = (CompositeByteBuf) cumulation;
2399                 int numComponents = composite.numComponents();
2400                 if (numComponents == 0 ||
2401                         !attemptCopyToCumulation(composite.internalComponent(numComponents - 1), next, wrapDataSize)) {
2402                     composite.addComponent(true, next);
2403                 }
2404                 return composite;
2405             }
2406             return attemptCopyToCumulation(cumulation, next, wrapDataSize) ? cumulation :
2407                     copyAndCompose(alloc, cumulation, next);
2408         }
2409 
2410         @Override
2411         protected ByteBuf composeFirst(ByteBufAllocator allocator, ByteBuf first) {
2412             if (first instanceof CompositeByteBuf) {
2413                 CompositeByteBuf composite = (CompositeByteBuf) first;
2414                 if (engineType.wantsDirectBuffer) {
2415                     first = allocator.directBuffer(composite.readableBytes());
2416                 } else {
2417                     first = allocator.heapBuffer(composite.readableBytes());
2418                 }
2419                 try {
2420                     first.writeBytes(composite);
2421                 } catch (Throwable cause) {
2422                     first.release();
2423                     PlatformDependent.throwException(cause);
2424                 }
2425                 composite.release();
2426             }
2427             return first;
2428         }
2429 
2430         @Override
2431         protected ByteBuf removeEmptyValue() {
2432             return null;
2433         }
2434     }
2435 
2436     private static boolean attemptCopyToCumulation(ByteBuf cumulation, ByteBuf next, int wrapDataSize) {
2437         final int inReadableBytes = next.readableBytes();
2438         final int cumulationCapacity = cumulation.capacity();
2439         if (wrapDataSize - cumulation.readableBytes() >= inReadableBytes &&
2440                 // Avoid using the same buffer if next's data would make cumulation exceed the wrapDataSize.
2441                 // Only copy if there is enough space available and the capacity is large enough, and attempt to
2442                 // resize if the capacity is small.
2443                 (cumulation.isWritable(inReadableBytes) && cumulationCapacity >= wrapDataSize ||
2444                         cumulationCapacity < wrapDataSize &&
2445                                 ensureWritableSuccess(cumulation.ensureWritable(inReadableBytes, false)))) {
2446             cumulation.writeBytes(next);
2447             next.release();
2448             return true;
2449         }
2450         return false;
2451     }
2452 
2453     private final class LazyChannelPromise extends DefaultPromise<Channel> {
2454 
2455         @Override
2456         protected EventExecutor executor() {
2457             if (ctx == null) {
2458                 throw new IllegalStateException();
2459             }
2460             return ctx.executor();
2461         }
2462 
2463         @Override
2464         protected void checkDeadLock() {
2465             if (ctx == null) {
2466                 // If ctx is null the handlerAdded(...) callback was not called, in this case the checkDeadLock()
2467                 // method was called from another Thread then the one that is used by ctx.executor(). We need to
2468                 // guard against this as a user can see a race if handshakeFuture().sync() is called but the
2469                 // handlerAdded(..) method was not yet as it is called from the EventExecutor of the
2470                 // ChannelHandlerContext. If we not guard against this super.checkDeadLock() would cause an
2471                 // IllegalStateException when trying to call executor().
2472                 return;
2473             }
2474             super.checkDeadLock();
2475         }
2476     }
2477 }