1 /*
2 * Copyright 2012 The Netty Project
3 *
4 * The Netty Project licenses this file to you under the Apache License,
5 * version 2.0 (the "License"); you may not use this file except in compliance
6 * with the License. You may obtain a copy of the License at:
7 *
8 * https://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12 * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13 * License for the specific language governing permissions and limitations
14 * under the License.
15 */
16 package io.netty.handler.ssl;
17
18 import io.netty.buffer.ByteBuf;
19 import io.netty.buffer.ByteBufAllocator;
20 import io.netty.buffer.ByteBufUtil;
21 import io.netty.buffer.CompositeByteBuf;
22 import io.netty.buffer.Unpooled;
23 import io.netty.channel.AbstractCoalescingBufferQueue;
24 import io.netty.channel.Channel;
25 import io.netty.channel.ChannelConfig;
26 import io.netty.channel.ChannelException;
27 import io.netty.channel.ChannelFuture;
28 import io.netty.channel.ChannelFutureListener;
29 import io.netty.channel.ChannelHandlerContext;
30 import io.netty.channel.ChannelInboundHandler;
31 import io.netty.channel.ChannelOption;
32 import io.netty.channel.ChannelOutboundBuffer;
33 import io.netty.channel.ChannelOutboundHandler;
34 import io.netty.channel.ChannelPipeline;
35 import io.netty.channel.ChannelPromise;
36 import io.netty.channel.unix.UnixChannel;
37 import io.netty.handler.codec.ByteToMessageDecoder;
38 import io.netty.handler.codec.DecoderException;
39 import io.netty.handler.codec.UnsupportedMessageTypeException;
40 import io.netty.util.ReferenceCountUtil;
41 import io.netty.util.concurrent.DefaultPromise;
42 import io.netty.util.concurrent.EventExecutor;
43 import io.netty.util.concurrent.Future;
44 import io.netty.util.concurrent.FutureListener;
45 import io.netty.util.concurrent.ImmediateExecutor;
46 import io.netty.util.concurrent.Promise;
47 import io.netty.util.concurrent.PromiseNotifier;
48 import io.netty.util.internal.ObjectUtil;
49 import io.netty.util.internal.PlatformDependent;
50 import io.netty.util.internal.ThrowableUtil;
51 import io.netty.util.internal.UnstableApi;
52 import io.netty.util.internal.logging.InternalLogger;
53 import io.netty.util.internal.logging.InternalLoggerFactory;
54
55 import java.io.IOException;
56 import java.net.SocketAddress;
57 import java.nio.ByteBuffer;
58 import java.nio.channels.ClosedChannelException;
59 import java.nio.channels.DatagramChannel;
60 import java.nio.channels.SocketChannel;
61 import java.util.List;
62 import java.util.concurrent.Executor;
63 import java.util.concurrent.RejectedExecutionException;
64 import java.util.concurrent.TimeUnit;
65 import java.util.regex.Pattern;
66
67 import javax.net.ssl.SSLEngine;
68 import javax.net.ssl.SSLEngineResult;
69 import javax.net.ssl.SSLEngineResult.HandshakeStatus;
70 import javax.net.ssl.SSLEngineResult.Status;
71 import javax.net.ssl.SSLException;
72 import javax.net.ssl.SSLHandshakeException;
73 import javax.net.ssl.SSLSession;
74
75 import static io.netty.buffer.ByteBufUtil.ensureWritableSuccess;
76 import static io.netty.handler.ssl.SslUtils.NOT_ENOUGH_DATA;
77 import static io.netty.handler.ssl.SslUtils.getEncryptedPacketLength;
78 import static io.netty.util.internal.ObjectUtil.checkNotNull;
79 import static io.netty.util.internal.ObjectUtil.checkPositiveOrZero;
80
81 /**
82 * Adds <a href="https://en.wikipedia.org/wiki/Transport_Layer_Security">SSL
83 * · TLS</a> and StartTLS support to a {@link Channel}. Please refer
84 * to the <strong>"SecureChat"</strong> example in the distribution or the web
85 * site for the detailed usage.
86 *
87 * <h3>Beginning the handshake</h3>
88 * <p>
 * Besides using the handshake {@link ChannelFuture} to get notified about the completion of the handshake, it's
 * also possible to detect it by implementing the
 * {@link ChannelInboundHandler#userEventTriggered(ChannelHandlerContext, Object)}
 * method and checking for a {@link SslHandshakeCompletionEvent}.
93 *
94 * <h3>Handshake</h3>
95 * <p>
96 * The handshake will be automatically issued for you once the {@link Channel} is active and
97 * {@link SSLEngine#getUseClientMode()} returns {@code true}.
 * There is no need to start it yourself.
99 *
100 * <h3>Closing the session</h3>
101 * <p>
102 * To close the SSL session, the {@link #closeOutbound()} method should be
103 * called to send the {@code close_notify} message to the remote peer. One
 * exception is when you close the {@link Channel} - {@link SslHandler}
 * intercepts the close request and sends the {@code close_notify} message
 * before the channel closure automatically. Once the SSL session is closed,
107 * it is not reusable, and consequently you should create a new
108 * {@link SslHandler} with a new {@link SSLEngine} as explained in the
109 * following section.
110 *
111 * <h3>Restarting the session</h3>
112 * <p>
113 * To restart the SSL session, you must remove the existing closed
114 * {@link SslHandler} from the {@link ChannelPipeline}, insert a new
115 * {@link SslHandler} with a new {@link SSLEngine} into the pipeline,
116 * and start the handshake process as described in the first section.
117 *
118 * <h3>Implementing StartTLS</h3>
119 * <p>
120 * <a href="https://en.wikipedia.org/wiki/STARTTLS">StartTLS</a> is the
121 * communication pattern that secures the wire in the middle of the plaintext
122 * connection. Please note that it is different from SSL · TLS, that
123 * secures the wire from the beginning of the connection. Typically, StartTLS
124 * is composed of three steps:
125 * <ol>
126 * <li>Client sends a StartTLS request to server.</li>
127 * <li>Server sends a StartTLS response to client.</li>
128 * <li>Client begins SSL handshake.</li>
129 * </ol>
130 * If you implement a server, you need to:
131 * <ol>
132 * <li>create a new {@link SslHandler} instance with {@code startTls} flag set
133 * to {@code true},</li>
134 * <li>insert the {@link SslHandler} to the {@link ChannelPipeline}, and</li>
135 * <li>write a StartTLS response.</li>
136 * </ol>
 * Please note that you must insert {@link SslHandler} <em>before</em> sending
 * the StartTLS response. Otherwise the client can begin the SSL handshake
 * before {@link SslHandler} is inserted to the {@link ChannelPipeline}, causing
 * data corruption.
141 * <p>
142 * The client-side implementation is much simpler.
143 * <ol>
144 * <li>Write a StartTLS request,</li>
145 * <li>wait for the StartTLS response,</li>
146 * <li>create a new {@link SslHandler} instance with {@code startTls} flag set
147 * to {@code false},</li>
148 * <li>insert the {@link SslHandler} to the {@link ChannelPipeline}, and</li>
149 * <li>Initiate SSL handshake.</li>
150 * </ol>
151 *
152 * <h3>Known issues</h3>
153 * <p>
 * Because of a known issue with the current implementation of the SslEngine that comes
 * with Java, you may see blocked IO-Threads while a full GC is done.
 * <p>
 * If you are affected, you can work around this problem by adjusting the cache settings
 * as shown below:
159 *
160 * <pre>
 * SSLContext context = ...;
 * context.getServerSessionContext().setSessionCacheSize(someSaneSize);
 * context.getServerSessionContext().setSessionTimeout(someSaneTimeout);
164 * </pre>
165 * <p>
166 * What values to use here depends on the nature of your application and should be set
167 * based on monitoring and debugging of it.
168 * For more details see
169 * <a href="https://github.com/netty/netty/issues/832">#832</a> in our issue tracker.
170 */
171 public class SslHandler extends ByteToMessageDecoder implements ChannelOutboundHandler {
172 private static final InternalLogger logger =
173 InternalLoggerFactory.getInstance(SslHandler.class);
174 private static final Pattern IGNORABLE_CLASS_IN_STACK = Pattern.compile(
175 "^.*(?:Socket|Datagram|Sctp|Udt)Channel.*$");
176 private static final Pattern IGNORABLE_ERROR_MESSAGE = Pattern.compile(
177 "^.*(?:connection.*(?:reset|closed|abort|broken)|broken.*pipe).*$", Pattern.CASE_INSENSITIVE);
178 private static final int STATE_SENT_FIRST_MESSAGE = 1;
179 private static final int STATE_FLUSHED_BEFORE_HANDSHAKE = 1 << 1;
180 private static final int STATE_READ_DURING_HANDSHAKE = 1 << 2;
181 private static final int STATE_HANDSHAKE_STARTED = 1 << 3;
182 /**
183 * Set by wrap*() methods when something is produced.
184 * {@link #channelReadComplete(ChannelHandlerContext)} will check this flag, clear it, and call ctx.flush().
185 */
186 private static final int STATE_NEEDS_FLUSH = 1 << 4;
187 private static final int STATE_OUTBOUND_CLOSED = 1 << 5;
188 private static final int STATE_CLOSE_NOTIFY = 1 << 6;
189 private static final int STATE_PROCESS_TASK = 1 << 7;
190 /**
191 * This flag is used to determine if we need to call {@link ChannelHandlerContext#read()} to consume more data
192 * when {@link ChannelConfig#isAutoRead()} is {@code false}.
193 */
194 private static final int STATE_FIRE_CHANNEL_READ = 1 << 8;
195 private static final int STATE_UNWRAP_REENTRY = 1 << 9;
196
197 /**
198 * <a href="https://tools.ietf.org/html/rfc5246#section-6.2">2^14</a> which is the maximum sized plaintext chunk
199 * allowed by the TLS RFC.
200 */
201 private static final int MAX_PLAINTEXT_LENGTH = 16 * 1024;
202
203 private enum SslEngineType {
204 TCNATIVE(true, COMPOSITE_CUMULATOR) {
205 @Override
206 SSLEngineResult unwrap(SslHandler handler, ByteBuf in, int len, ByteBuf out) throws SSLException {
207 int nioBufferCount = in.nioBufferCount();
208 int writerIndex = out.writerIndex();
209 final SSLEngineResult result;
210 if (nioBufferCount > 1) {
211 /*
212 * If {@link OpenSslEngine} is in use,
213 * we can use a special {@link OpenSslEngine#unwrap(ByteBuffer[], ByteBuffer[])} method
214 * that accepts multiple {@link ByteBuffer}s without additional memory copies.
215 */
216 ReferenceCountedOpenSslEngine opensslEngine = (ReferenceCountedOpenSslEngine) handler.engine;
217 try {
218 handler.singleBuffer[0] = toByteBuffer(out, writerIndex, out.writableBytes());
219 result = opensslEngine.unwrap(in.nioBuffers(in.readerIndex(), len), handler.singleBuffer);
220 } finally {
221 handler.singleBuffer[0] = null;
222 }
223 } else {
224 result = handler.engine.unwrap(toByteBuffer(in, in.readerIndex(), len),
225 toByteBuffer(out, writerIndex, out.writableBytes()));
226 }
227 out.writerIndex(writerIndex + result.bytesProduced());
228 return result;
229 }
230
231 @Override
232 ByteBuf allocateWrapBuffer(SslHandler handler, ByteBufAllocator allocator,
233 int pendingBytes, int numComponents) {
234 return allocator.directBuffer(((ReferenceCountedOpenSslEngine) handler.engine)
235 .calculateOutNetBufSize(pendingBytes, numComponents));
236 }
237
238 @Override
239 int calculateRequiredOutBufSpace(SslHandler handler, int pendingBytes, int numComponents) {
240 return ((ReferenceCountedOpenSslEngine) handler.engine)
241 .calculateMaxLengthForWrap(pendingBytes, numComponents);
242 }
243
244 @Override
245 int calculatePendingData(SslHandler handler, int guess) {
246 int sslPending = ((ReferenceCountedOpenSslEngine) handler.engine).sslPending();
247 return sslPending > 0 ? sslPending : guess;
248 }
249
250 @Override
251 boolean jdkCompatibilityMode(SSLEngine engine) {
252 return ((ReferenceCountedOpenSslEngine) engine).jdkCompatibilityMode;
253 }
254 },
255 CONSCRYPT(true, COMPOSITE_CUMULATOR) {
256 @Override
257 SSLEngineResult unwrap(SslHandler handler, ByteBuf in, int len, ByteBuf out) throws SSLException {
258 int nioBufferCount = in.nioBufferCount();
259 int writerIndex = out.writerIndex();
260 final SSLEngineResult result;
261 if (nioBufferCount > 1) {
262 /*
263 * Use a special unwrap method without additional memory copies.
264 */
265 try {
266 handler.singleBuffer[0] = toByteBuffer(out, writerIndex, out.writableBytes());
267 result = ((ConscryptAlpnSslEngine) handler.engine).unwrap(
268 in.nioBuffers(in.readerIndex(), len),
269 handler.singleBuffer);
270 } finally {
271 handler.singleBuffer[0] = null;
272 }
273 } else {
274 result = handler.engine.unwrap(toByteBuffer(in, in.readerIndex(), len),
275 toByteBuffer(out, writerIndex, out.writableBytes()));
276 }
277 out.writerIndex(writerIndex + result.bytesProduced());
278 return result;
279 }
280
281 @Override
282 ByteBuf allocateWrapBuffer(SslHandler handler, ByteBufAllocator allocator,
283 int pendingBytes, int numComponents) {
284 return allocator.directBuffer(
285 ((ConscryptAlpnSslEngine) handler.engine).calculateOutNetBufSize(pendingBytes, numComponents));
286 }
287
288 @Override
289 int calculateRequiredOutBufSpace(SslHandler handler, int pendingBytes, int numComponents) {
290 return ((ConscryptAlpnSslEngine) handler.engine)
291 .calculateRequiredOutBufSpace(pendingBytes, numComponents);
292 }
293
294 @Override
295 int calculatePendingData(SslHandler handler, int guess) {
296 return guess;
297 }
298
299 @Override
300 boolean jdkCompatibilityMode(SSLEngine engine) {
301 return true;
302 }
303 },
304 JDK(false, MERGE_CUMULATOR) {
305 @Override
306 SSLEngineResult unwrap(SslHandler handler, ByteBuf in, int len, ByteBuf out) throws SSLException {
307 int writerIndex = out.writerIndex();
308 ByteBuffer inNioBuffer = toByteBuffer(in, in.readerIndex(), len);
309 int position = inNioBuffer.position();
310 final SSLEngineResult result = handler.engine.unwrap(inNioBuffer,
311 toByteBuffer(out, writerIndex, out.writableBytes()));
312 out.writerIndex(writerIndex + result.bytesProduced());
313
314 // This is a workaround for a bug in Android 5.0. Android 5.0 does not correctly update the
315 // SSLEngineResult.bytesConsumed() in some cases and just return 0.
316 //
317 // See:
318 // - https://android-review.googlesource.com/c/platform/external/conscrypt/+/122080
319 // - https://github.com/netty/netty/issues/7758
320 if (result.bytesConsumed() == 0) {
321 int consumed = inNioBuffer.position() - position;
322 if (consumed != result.bytesConsumed()) {
323 // Create a new SSLEngineResult with the correct bytesConsumed().
324 return new SSLEngineResult(
325 result.getStatus(), result.getHandshakeStatus(), consumed, result.bytesProduced());
326 }
327 }
328 return result;
329 }
330
331 @Override
332 ByteBuf allocateWrapBuffer(SslHandler handler, ByteBufAllocator allocator,
333 int pendingBytes, int numComponents) {
334 // For JDK we don't have a good source for the max wrap overhead. We need at least one packet buffer
335 // size, but may be able to fit more in based on the total requested.
336 return allocator.heapBuffer(Math.max(pendingBytes, handler.engine.getSession().getPacketBufferSize()));
337 }
338
339 @Override
340 int calculateRequiredOutBufSpace(SslHandler handler, int pendingBytes, int numComponents) {
341 // As for the JDK SSLEngine we always need to operate on buffer space required by the SSLEngine
342 // (normally ~16KB). This is required even if the amount of data to encrypt is very small. Use heap
343 // buffers to reduce the native memory usage.
344 //
345 // Beside this the JDK SSLEngine also (as of today) will do an extra heap to direct buffer copy
346 // if a direct buffer is used as its internals operate on byte[].
347 return handler.engine.getSession().getPacketBufferSize();
348 }
349
350 @Override
351 int calculatePendingData(SslHandler handler, int guess) {
352 return guess;
353 }
354
355 @Override
356 boolean jdkCompatibilityMode(SSLEngine engine) {
357 return true;
358 }
359 };
360
361 static SslEngineType forEngine(SSLEngine engine) {
362 return engine instanceof ReferenceCountedOpenSslEngine ? TCNATIVE :
363 engine instanceof ConscryptAlpnSslEngine ? CONSCRYPT : JDK;
364 }
365
366 SslEngineType(boolean wantsDirectBuffer, Cumulator cumulator) {
367 this.wantsDirectBuffer = wantsDirectBuffer;
368 this.cumulator = cumulator;
369 }
370
371 abstract SSLEngineResult unwrap(SslHandler handler, ByteBuf in, int len, ByteBuf out) throws SSLException;
372
373 abstract int calculatePendingData(SslHandler handler, int guess);
374
375 abstract boolean jdkCompatibilityMode(SSLEngine engine);
376
377 abstract ByteBuf allocateWrapBuffer(SslHandler handler, ByteBufAllocator allocator,
378 int pendingBytes, int numComponents);
379
380 abstract int calculateRequiredOutBufSpace(SslHandler handler, int pendingBytes, int numComponents);
381
382 // BEGIN Platform-dependent flags
383
384 /**
385 * {@code true} if and only if {@link SSLEngine} expects a direct buffer and so if a heap buffer
386 * is given will make an extra memory copy.
387 */
388 final boolean wantsDirectBuffer;
389
390 // END Platform-dependent flags
391
392 /**
393 * When using JDK {@link SSLEngine}, we use {@link #MERGE_CUMULATOR} because it works only with
394 * one {@link ByteBuffer}.
395 *
396 * When using {@link OpenSslEngine}, we can use {@link #COMPOSITE_CUMULATOR} because it has
397 * {@link OpenSslEngine#unwrap(ByteBuffer[], ByteBuffer[])} which works with multiple {@link ByteBuffer}s
398 * and which does not need to do extra memory copies.
399 */
400 final Cumulator cumulator;
401 }
402
// Context of this handler; volatile because closeOutbound(...) reads it and may hop to the event loop.
private volatile ChannelHandlerContext ctx;
// The engine performing the TLS work; released in handlerRemoved0(...).
private final SSLEngine engine;
// Engine-specific strategy chosen via SslEngineType.forEngine(engine).
private final SslEngineType engineType;
// Executor supplied at construction time for tasks returned by SSLEngine#getDelegatedTask().
private final Executor delegatedTaskExecutor;
// Cached result of engineType.jdkCompatibilityMode(engine).
private final boolean jdkCompatibilityMode;

/**
 * Used if {@link SSLEngine#wrap(ByteBuffer[], ByteBuffer)} and {@link SSLEngine#unwrap(ByteBuffer, ByteBuffer[])}
 * should be called with a {@link ByteBuf} that is only backed by one {@link ByteBuffer} to reduce the object
 * creation.
 */
private final ByteBuffer[] singleBuffer = new ByteBuffer[1];

// When true, the first flush is written out unencrypted (see flush(ChannelHandlerContext)).
private final boolean startTls;

private final SslTasksRunner sslTaskRunnerForUnwrap = new SslTasksRunner(true);
private final SslTasksRunner sslTaskRunner = new SslTasksRunner(false);

// Queue of plaintext writes awaiting wrap; reset to null by handlerRemoved0(...).
private SslHandlerCoalescingBufferQueue pendingUnencryptedWrites;
// Exposed through handshakeFuture().
private Promise<Channel> handshakePromise = new LazyChannelPromise();
// Exposed through sslCloseFuture().
private final LazyChannelPromise sslClosePromise = new LazyChannelPromise();

// NOTE(review): presumably the length of the TLS packet currently being decoded; confirm in decode code.
private int packetLength;
// Bit set built from the STATE_* masks.
private short state;

// Timeouts in milliseconds; see the corresponding getters and setters.
private volatile long handshakeTimeoutMillis = 10000;
private volatile long closeNotifyFlushTimeoutMillis = 3000;
private volatile long closeNotifyReadTimeoutMillis;
// Number of bytes passed to each wrap call; see setWrapDataSize(int).
volatile int wrapDataSize = MAX_PLAINTEXT_LENGTH;
432
/**
 * Creates a handler without StartTLS that runs all delegated tasks directly on the
 * {@link EventExecutor}.
 *
 * @param engine the {@link SSLEngine} this handler will use
 */
public SslHandler(SSLEngine engine) {
    this(engine, false, ImmediateExecutor.INSTANCE);
}
441
/**
 * Creates a handler that runs all delegated tasks directly on the {@link EventExecutor}.
 *
 * @param engine   the {@link SSLEngine} this handler will use
 * @param startTls {@code true} if the first write request shouldn't be encrypted by the
 *                 {@link SSLEngine}
 */
public SslHandler(SSLEngine engine, boolean startTls) {
    this(engine, startTls, ImmediateExecutor.INSTANCE);
}
452
/**
 * Creates a handler without StartTLS that hands delegated tasks to the given executor.
 *
 * @param engine                the {@link SSLEngine} this handler will use
 * @param delegatedTaskExecutor the {@link Executor} that will be used to execute tasks that are
 *                              returned by {@link SSLEngine#getDelegatedTask()}
 */
public SslHandler(SSLEngine engine, Executor delegatedTaskExecutor) {
    this(engine, false, delegatedTaskExecutor);
}
463
464 /**
465 * Creates a new instance.
466 *
467 * @param engine the {@link SSLEngine} this handler will use
468 * @param startTls {@code true} if the first write request shouldn't be
469 * encrypted by the {@link SSLEngine}
470 * @param delegatedTaskExecutor the {@link Executor} that will be used to execute tasks that are returned by
471 * {@link SSLEngine#getDelegatedTask()}.
472 */
473 public SslHandler(SSLEngine engine, boolean startTls, Executor delegatedTaskExecutor) {
474 this.engine = ObjectUtil.checkNotNull(engine, "engine");
475 this.delegatedTaskExecutor = ObjectUtil.checkNotNull(delegatedTaskExecutor, "delegatedTaskExecutor");
476 engineType = SslEngineType.forEngine(engine);
477 this.startTls = startTls;
478 this.jdkCompatibilityMode = engineType.jdkCompatibilityMode(engine);
479 setCumulator(engineType.cumulator);
480 }
481
482 public long getHandshakeTimeoutMillis() {
483 return handshakeTimeoutMillis;
484 }
485
486 public void setHandshakeTimeout(long handshakeTimeout, TimeUnit unit) {
487 checkNotNull(unit, "unit");
488 setHandshakeTimeoutMillis(unit.toMillis(handshakeTimeout));
489 }
490
491 public void setHandshakeTimeoutMillis(long handshakeTimeoutMillis) {
492 this.handshakeTimeoutMillis = checkPositiveOrZero(handshakeTimeoutMillis, "handshakeTimeoutMillis");
493 }
494
495 /**
496 * Sets the number of bytes to pass to each {@link SSLEngine#wrap(ByteBuffer[], int, int, ByteBuffer)} call.
497 * <p>
498 * This value will partition data which is passed to write
499 * {@link #write(ChannelHandlerContext, Object, ChannelPromise)}. The partitioning will work as follows:
500 * <ul>
501 * <li>If {@code wrapDataSize <= 0} then we will write each data chunk as is.</li>
502 * <li>If {@code wrapDataSize > data size} then we will attempt to aggregate multiple data chunks together.</li>
503 * <li>If {@code wrapDataSize > data size} Else if {@code wrapDataSize <= data size} then we will divide the data
504 * into chunks of {@code wrapDataSize} when writing.</li>
505 * </ul>
506 * <p>
507 * If the {@link SSLEngine} doesn't support a gather wrap operation (e.g. {@link SslProvider#OPENSSL}) then
508 * aggregating data before wrapping can help reduce the ratio between TLS overhead vs data payload which will lead
509 * to better goodput. Writing fixed chunks of data can also help target the underlying transport's (e.g. TCP)
510 * frame size. Under lossy/congested network conditions this may help the peer get full TLS packets earlier and
511 * be able to do work sooner, as opposed to waiting for the all the pieces of the TLS packet to arrive.
512 * @param wrapDataSize the number of bytes which will be passed to each
513 * {@link SSLEngine#wrap(ByteBuffer[], int, int, ByteBuffer)} call.
514 */
515 @UnstableApi
516 public final void setWrapDataSize(int wrapDataSize) {
517 this.wrapDataSize = wrapDataSize;
518 }
519
520 /**
521 * @deprecated use {@link #getCloseNotifyFlushTimeoutMillis()}
522 */
523 @Deprecated
524 public long getCloseNotifyTimeoutMillis() {
525 return getCloseNotifyFlushTimeoutMillis();
526 }
527
528 /**
529 * @deprecated use {@link #setCloseNotifyFlushTimeout(long, TimeUnit)}
530 */
531 @Deprecated
532 public void setCloseNotifyTimeout(long closeNotifyTimeout, TimeUnit unit) {
533 setCloseNotifyFlushTimeout(closeNotifyTimeout, unit);
534 }
535
536 /**
537 * @deprecated use {@link #setCloseNotifyFlushTimeoutMillis(long)}
538 */
539 @Deprecated
540 public void setCloseNotifyTimeoutMillis(long closeNotifyFlushTimeoutMillis) {
541 setCloseNotifyFlushTimeoutMillis(closeNotifyFlushTimeoutMillis);
542 }
543
544 /**
545 * Gets the timeout for flushing the close_notify that was triggered by closing the
546 * {@link Channel}. If the close_notify was not flushed in the given timeout the {@link Channel} will be closed
547 * forcibly.
548 */
549 public final long getCloseNotifyFlushTimeoutMillis() {
550 return closeNotifyFlushTimeoutMillis;
551 }
552
553 /**
554 * Sets the timeout for flushing the close_notify that was triggered by closing the
555 * {@link Channel}. If the close_notify was not flushed in the given timeout the {@link Channel} will be closed
556 * forcibly.
557 */
558 public final void setCloseNotifyFlushTimeout(long closeNotifyFlushTimeout, TimeUnit unit) {
559 setCloseNotifyFlushTimeoutMillis(unit.toMillis(closeNotifyFlushTimeout));
560 }
561
562 /**
563 * See {@link #setCloseNotifyFlushTimeout(long, TimeUnit)}.
564 */
565 public final void setCloseNotifyFlushTimeoutMillis(long closeNotifyFlushTimeoutMillis) {
566 this.closeNotifyFlushTimeoutMillis = checkPositiveOrZero(closeNotifyFlushTimeoutMillis,
567 "closeNotifyFlushTimeoutMillis");
568 }
569
570 /**
571 * Gets the timeout (in ms) for receiving the response for the close_notify that was triggered by closing the
572 * {@link Channel}. This timeout starts after the close_notify message was successfully written to the
573 * remote peer. Use {@code 0} to directly close the {@link Channel} and not wait for the response.
574 */
575 public final long getCloseNotifyReadTimeoutMillis() {
576 return closeNotifyReadTimeoutMillis;
577 }
578
579 /**
580 * Sets the timeout for receiving the response for the close_notify that was triggered by closing the
581 * {@link Channel}. This timeout starts after the close_notify message was successfully written to the
582 * remote peer. Use {@code 0} to directly close the {@link Channel} and not wait for the response.
583 */
584 public final void setCloseNotifyReadTimeout(long closeNotifyReadTimeout, TimeUnit unit) {
585 setCloseNotifyReadTimeoutMillis(unit.toMillis(closeNotifyReadTimeout));
586 }
587
588 /**
589 * See {@link #setCloseNotifyReadTimeout(long, TimeUnit)}.
590 */
591 public final void setCloseNotifyReadTimeoutMillis(long closeNotifyReadTimeoutMillis) {
592 this.closeNotifyReadTimeoutMillis = checkPositiveOrZero(closeNotifyReadTimeoutMillis,
593 "closeNotifyReadTimeoutMillis");
594 }
595
596 /**
597 * Returns the {@link SSLEngine} which is used by this handler.
598 */
599 public SSLEngine engine() {
600 return engine;
601 }
602
603 /**
604 * Returns the name of the current application-level protocol.
605 *
606 * @return the protocol name or {@code null} if application-level protocol has not been negotiated
607 */
608 public String applicationProtocol() {
609 SSLEngine engine = engine();
610 if (!(engine instanceof ApplicationProtocolAccessor)) {
611 return null;
612 }
613
614 return ((ApplicationProtocolAccessor) engine).getNegotiatedApplicationProtocol();
615 }
616
617 /**
618 * Returns a {@link Future} that will get notified once the current TLS handshake completes.
619 *
620 * @return the {@link Future} for the initial TLS handshake if {@link #renegotiate()} was not invoked.
621 * The {@link Future} for the most recent {@linkplain #renegotiate() TLS renegotiation} otherwise.
622 */
623 public Future<Channel> handshakeFuture() {
624 return handshakePromise;
625 }
626
627 /**
628 * Use {@link #closeOutbound()}
629 */
630 @Deprecated
631 public ChannelFuture close() {
632 return closeOutbound();
633 }
634
635 /**
636 * Use {@link #closeOutbound(ChannelPromise)}
637 */
638 @Deprecated
639 public ChannelFuture close(ChannelPromise promise) {
640 return closeOutbound(promise);
641 }
642
643 /**
644 * Sends an SSL {@code close_notify} message to the specified channel and
645 * destroys the underlying {@link SSLEngine}. This will <strong>not</strong> close the underlying
646 * {@link Channel}. If you want to also close the {@link Channel} use {@link Channel#close()} or
647 * {@link ChannelHandlerContext#close()}
648 */
649 public ChannelFuture closeOutbound() {
650 return closeOutbound(ctx.newPromise());
651 }
652
653 /**
654 * Sends an SSL {@code close_notify} message to the specified channel and
655 * destroys the underlying {@link SSLEngine}. This will <strong>not</strong> close the underlying
656 * {@link Channel}. If you want to also close the {@link Channel} use {@link Channel#close()} or
657 * {@link ChannelHandlerContext#close()}
658 */
659 public ChannelFuture closeOutbound(final ChannelPromise promise) {
660 final ChannelHandlerContext ctx = this.ctx;
661 if (ctx.executor().inEventLoop()) {
662 closeOutbound0(promise);
663 } else {
664 ctx.executor().execute(new Runnable() {
665 @Override
666 public void run() {
667 closeOutbound0(promise);
668 }
669 });
670 }
671 return promise;
672 }
673
674 private void closeOutbound0(ChannelPromise promise) {
675 setState(STATE_OUTBOUND_CLOSED);
676 engine.closeOutbound();
677 try {
678 flush(ctx, promise);
679 } catch (Exception e) {
680 if (!promise.tryFailure(e)) {
681 logger.warn("{} flush() raised a masked exception.", ctx.channel(), e);
682 }
683 }
684 }
685
686 /**
687 * Return the {@link Future} that will get notified if the inbound of the {@link SSLEngine} is closed.
688 *
689 * This method will return the same {@link Future} all the time.
690 *
691 * @see SSLEngine
692 */
693 public Future<Channel> sslCloseFuture() {
694 return sslClosePromise;
695 }
696
697 @Override
698 public void handlerRemoved0(ChannelHandlerContext ctx) throws Exception {
699 try {
700 if (pendingUnencryptedWrites != null && !pendingUnencryptedWrites.isEmpty()) {
701 // Check if queue is not empty first because create a new ChannelException is expensive
702 pendingUnencryptedWrites.releaseAndFailAll(ctx,
703 new ChannelException("Pending write on removal of SslHandler"));
704 }
705 pendingUnencryptedWrites = null;
706
707 SSLException cause = null;
708
709 // If the handshake or SSLEngine closure is not done yet we should fail corresponding promise and
710 // notify the rest of the
711 // pipeline.
712 if (!handshakePromise.isDone()) {
713 cause = new SSLHandshakeException("SslHandler removed before handshake completed");
714 if (handshakePromise.tryFailure(cause)) {
715 ctx.fireUserEventTriggered(new SslHandshakeCompletionEvent(cause));
716 }
717 }
718 if (!sslClosePromise.isDone()) {
719 if (cause == null) {
720 cause = new SSLException("SslHandler removed before SSLEngine was closed");
721 }
722 notifyClosePromise(cause);
723 }
724 } finally {
725 ReferenceCountUtil.release(engine);
726 }
727 }
728
729 @Override
730 public void bind(ChannelHandlerContext ctx, SocketAddress localAddress, ChannelPromise promise) throws Exception {
731 ctx.bind(localAddress, promise);
732 }
733
734 @Override
735 public void connect(ChannelHandlerContext ctx, SocketAddress remoteAddress, SocketAddress localAddress,
736 ChannelPromise promise) throws Exception {
737 ctx.connect(remoteAddress, localAddress, promise);
738 }
739
740 @Override
741 public void deregister(ChannelHandlerContext ctx, ChannelPromise promise) throws Exception {
742 ctx.deregister(promise);
743 }
744
745 @Override
746 public void disconnect(final ChannelHandlerContext ctx,
747 final ChannelPromise promise) throws Exception {
748 closeOutboundAndChannel(ctx, promise, true);
749 }
750
751 @Override
752 public void close(final ChannelHandlerContext ctx,
753 final ChannelPromise promise) throws Exception {
754 closeOutboundAndChannel(ctx, promise, false);
755 }
756
757 @Override
758 public void read(ChannelHandlerContext ctx) throws Exception {
759 if (!handshakePromise.isDone()) {
760 setState(STATE_READ_DURING_HANDSHAKE);
761 }
762
763 ctx.read();
764 }
765
766 private static IllegalStateException newPendingWritesNullException() {
767 return new IllegalStateException("pendingUnencryptedWrites is null, handlerRemoved0 called?");
768 }
769
770 @Override
771 public void write(final ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
772 if (!(msg instanceof ByteBuf)) {
773 UnsupportedMessageTypeException exception = new UnsupportedMessageTypeException(msg, ByteBuf.class);
774 ReferenceCountUtil.safeRelease(msg);
775 promise.setFailure(exception);
776 } else if (pendingUnencryptedWrites == null) {
777 ReferenceCountUtil.safeRelease(msg);
778 promise.setFailure(newPendingWritesNullException());
779 } else {
780 pendingUnencryptedWrites.add((ByteBuf) msg, promise);
781 }
782 }
783
784 @Override
785 public void flush(ChannelHandlerContext ctx) throws Exception {
786 // Do not encrypt the first write request if this handler is
787 // created with startTLS flag turned on.
788 if (startTls && !isStateSet(STATE_SENT_FIRST_MESSAGE)) {
789 setState(STATE_SENT_FIRST_MESSAGE);
790 pendingUnencryptedWrites.writeAndRemoveAll(ctx);
791 forceFlush(ctx);
792 // Explicit start handshake processing once we send the first message. This will also ensure
793 // we will schedule the timeout if needed.
794 startHandshakeProcessing(true);
795 return;
796 }
797
798 if (isStateSet(STATE_PROCESS_TASK)) {
799 return;
800 }
801
802 try {
803 wrapAndFlush(ctx);
804 } catch (Throwable cause) {
805 setHandshakeFailure(ctx, cause);
806 PlatformDependent.throwException(cause);
807 }
808 }
809
810 private void wrapAndFlush(ChannelHandlerContext ctx) throws SSLException {
811 if (pendingUnencryptedWrites.isEmpty()) {
812 // It's important to NOT use a voidPromise here as the user
813 // may want to add a ChannelFutureListener to the ChannelPromise later.
814 //
815 // See https://github.com/netty/netty/issues/3364
816 pendingUnencryptedWrites.add(Unpooled.EMPTY_BUFFER, ctx.newPromise());
817 }
818 if (!handshakePromise.isDone()) {
819 setState(STATE_FLUSHED_BEFORE_HANDSHAKE);
820 }
821 try {
822 wrap(ctx, false);
823 } finally {
824 // We may have written some parts of data before an exception was thrown so ensure we always flush.
825 // See https://github.com/netty/netty/issues/3900#issuecomment-172481830
826 forceFlush(ctx);
827 }
828 }
829
830 // This method will not call setHandshakeFailure(...) !
831 private void wrap(ChannelHandlerContext ctx, boolean inUnwrap) throws SSLException {
832 ByteBuf out = null;
833 ByteBufAllocator alloc = ctx.alloc();
834 try {
835 final int wrapDataSize = this.wrapDataSize;
836 // Only continue to loop if the handler was not removed in the meantime.
837 // See https://github.com/netty/netty/issues/5860
838 outer: while (!ctx.isRemoved()) {
839 ChannelPromise promise = ctx.newPromise();
840 ByteBuf buf = wrapDataSize > 0 ?
841 pendingUnencryptedWrites.remove(alloc, wrapDataSize, promise) :
842 pendingUnencryptedWrites.removeFirst(promise);
843 if (buf == null) {
844 break;
845 }
846
847 SSLEngineResult result;
848
849 if (buf.readableBytes() > MAX_PLAINTEXT_LENGTH) {
850 // If we pulled a buffer larger than the supported packet size, we can slice it up and iteratively,
851 // encrypting multiple packets into a single larger buffer. This substantially saves on allocations
852 // for large responses. Here we estimate how large of a buffer we need. If we overestimate a bit,
853 // that's fine. If we underestimate, we'll simply re-enqueue the remaining buffer and get it on the
854 // next outer loop.
855 int readableBytes = buf.readableBytes();
856 int numPackets = readableBytes / MAX_PLAINTEXT_LENGTH;
857 if (readableBytes % MAX_PLAINTEXT_LENGTH != 0) {
858 numPackets += 1;
859 }
860
861 if (out == null) {
862 out = allocateOutNetBuf(ctx, readableBytes, buf.nioBufferCount() + numPackets);
863 }
864 result = wrapMultiple(alloc, engine, buf, out);
865 } else {
866 if (out == null) {
867 out = allocateOutNetBuf(ctx, buf.readableBytes(), buf.nioBufferCount());
868 }
869 result = wrap(alloc, engine, buf, out);
870 }
871
872 if (buf.isReadable()) {
873 pendingUnencryptedWrites.addFirst(buf, promise);
874 // When we add the buffer/promise pair back we need to be sure we don't complete the promise
875 // later. We only complete the promise if the buffer is completely consumed.
876 promise = null;
877 } else {
878 buf.release();
879 }
880
881 // We need to write any data before we invoke any methods which may trigger re-entry, otherwise
882 // writes may occur out of order and TLS sequencing may be off (e.g. SSLV3_ALERT_BAD_RECORD_MAC).
883 if (out.isReadable()) {
884 final ByteBuf b = out;
885 out = null;
886 if (promise != null) {
887 ctx.write(b, promise);
888 } else {
889 ctx.write(b);
890 }
891 } else if (promise != null) {
892 ctx.write(Unpooled.EMPTY_BUFFER, promise);
893 }
894 // else out is not readable we can re-use it and so save an extra allocation
895
896 if (result.getStatus() == Status.CLOSED) {
897 // First check if there is any write left that needs to be failed, if there is none we don't need
898 // to create a new exception or obtain an existing one.
899 if (!pendingUnencryptedWrites.isEmpty()) {
900 // Make a best effort to preserve any exception that way previously encountered from the
901 // handshake or the transport, else fallback to a general error.
902 Throwable exception = handshakePromise.cause();
903 if (exception == null) {
904 exception = sslClosePromise.cause();
905 if (exception == null) {
906 exception = new SslClosedEngineException("SSLEngine closed already");
907 }
908 }
909 pendingUnencryptedWrites.releaseAndFailAll(ctx, exception);
910 }
911
912 return;
913 } else {
914 switch (result.getHandshakeStatus()) {
915 case NEED_TASK:
916 if (!runDelegatedTasks(inUnwrap)) {
917 // We scheduled a task on the delegatingTaskExecutor, so stop processing as we will
918 // resume once the task completes.
919 break outer;
920 }
921 break;
922 case FINISHED:
923 case NOT_HANDSHAKING: // work around for android bug that skips the FINISHED state.
924 setHandshakeSuccess();
925 break;
926 case NEED_WRAP:
927 // If we are expected to wrap again and we produced some data we need to ensure there
928 // is something in the queue to process as otherwise we will not try again before there
929 // was more added. Failing to do so may fail to produce an alert that can be
930 // consumed by the remote peer.
931 if (result.bytesProduced() > 0 && pendingUnencryptedWrites.isEmpty()) {
932 pendingUnencryptedWrites.add(Unpooled.EMPTY_BUFFER);
933 }
934 break;
935 case NEED_UNWRAP:
936 // The underlying engine is starving so we need to feed it with more data.
937 // See https://github.com/netty/netty/pull/5039
938 readIfNeeded(ctx);
939 return;
940 default:
941 throw new IllegalStateException(
942 "Unknown handshake status: " + result.getHandshakeStatus());
943 }
944 }
945 }
946 } finally {
947 if (out != null) {
948 out.release();
949 }
950 if (inUnwrap) {
951 setState(STATE_NEEDS_FLUSH);
952 }
953 }
954 }
955
956 /**
957 * This method will not call
958 * {@link #setHandshakeFailure(ChannelHandlerContext, Throwable, boolean, boolean, boolean)} or
959 * {@link #setHandshakeFailure(ChannelHandlerContext, Throwable)}.
960 * @return {@code true} if this method ends on {@link SSLEngineResult.HandshakeStatus#NOT_HANDSHAKING}.
961 */
962 private boolean wrapNonAppData(final ChannelHandlerContext ctx, boolean inUnwrap) throws SSLException {
963 ByteBuf out = null;
964 ByteBufAllocator alloc = ctx.alloc();
965 try {
966 // Only continue to loop if the handler was not removed in the meantime.
967 // See https://github.com/netty/netty/issues/5860
968 outer: while (!ctx.isRemoved()) {
969 if (out == null) {
970 // As this is called for the handshake we have no real idea how big the buffer needs to be.
971 // That said 2048 should give us enough room to include everything like ALPN / NPN data.
972 // If this is not enough we will increase the buffer in wrap(...).
973 out = allocateOutNetBuf(ctx, 2048, 1);
974 }
975 SSLEngineResult result = wrap(alloc, engine, Unpooled.EMPTY_BUFFER, out);
976 if (result.bytesProduced() > 0) {
977 ctx.write(out).addListener(new ChannelFutureListener() {
978 @Override
979 public void operationComplete(ChannelFuture future) {
980 Throwable cause = future.cause();
981 if (cause != null) {
982 setHandshakeFailureTransportFailure(ctx, cause);
983 }
984 }
985 });
986 if (inUnwrap) {
987 setState(STATE_NEEDS_FLUSH);
988 }
989 out = null;
990 }
991
992 HandshakeStatus status = result.getHandshakeStatus();
993 switch (status) {
994 case FINISHED:
995 // We may be here because we read data and discovered the remote peer initiated a renegotiation
996 // and this write is to complete the new handshake. The user may have previously done a
997 // writeAndFlush which wasn't able to wrap data due to needing the pending handshake, so we
998 // attempt to wrap application data here if any is pending.
999 if (setHandshakeSuccess() && inUnwrap && !pendingUnencryptedWrites.isEmpty()) {
1000 wrap(ctx, true);
1001 }
1002 return false;
1003 case NEED_TASK:
1004 if (!runDelegatedTasks(inUnwrap)) {
1005 // We scheduled a task on the delegatingTaskExecutor, so stop processing as we will
1006 // resume once the task completes.
1007 break outer;
1008 }
1009 break;
1010 case NEED_UNWRAP:
1011 if (inUnwrap || unwrapNonAppData(ctx) <= 0) {
1012 // If we asked for a wrap, the engine requested an unwrap, and we are in unwrap there is
1013 // no use in trying to call wrap again because we have already attempted (or will after we
1014 // return) to feed more data to the engine.
1015 return false;
1016 }
1017 break;
1018 case NEED_WRAP:
1019 break;
1020 case NOT_HANDSHAKING:
1021 if (setHandshakeSuccess() && inUnwrap && !pendingUnencryptedWrites.isEmpty()) {
1022 wrap(ctx, true);
1023 }
1024 // Workaround for TLS False Start problem reported at:
1025 // https://github.com/netty/netty/issues/1108#issuecomment-14266970
1026 if (!inUnwrap) {
1027 unwrapNonAppData(ctx);
1028 }
1029 return true;
1030 default:
1031 throw new IllegalStateException("Unknown handshake status: " + result.getHandshakeStatus());
1032 }
1033
1034 // Check if did not produce any bytes and if so break out of the loop, but only if we did not process
1035 // a task as last action. It's fine to not produce any data as part of executing a task.
1036 if (result.bytesProduced() == 0 && status != HandshakeStatus.NEED_TASK) {
1037 break;
1038 }
1039
1040 // It should not consume empty buffers when it is not handshaking
1041 // Fix for Android, where it was encrypting empty buffers even when not handshaking
1042 if (result.bytesConsumed() == 0 && result.getHandshakeStatus() == HandshakeStatus.NOT_HANDSHAKING) {
1043 break;
1044 }
1045 }
1046 } finally {
1047 if (out != null) {
1048 out.release();
1049 }
1050 }
1051 return false;
1052 }
1053
1054 private SSLEngineResult wrapMultiple(ByteBufAllocator alloc, SSLEngine engine, ByteBuf in, ByteBuf out)
1055 throws SSLException {
1056 SSLEngineResult result = null;
1057
1058 do {
1059 int nextSliceSize = Math.min(MAX_PLAINTEXT_LENGTH, in.readableBytes());
1060 // This call over-estimates, because we are slicing and not every nioBuffer will be part of
1061 // every slice. We could improve the estimate by having an nioBufferCount(offset, length).
1062 int nextOutSize = engineType.calculateRequiredOutBufSpace(this, nextSliceSize, in.nioBufferCount());
1063
1064 if (!out.isWritable(nextOutSize)) {
1065 if (result != null) {
1066 // We underestimated the space needed to encrypt the entire in buf. Break out, and
1067 // upstream will re-enqueue the buffer for later.
1068 break;
1069 }
1070 // This shouldn't happen, as the out buf was properly sized for at least packetLength
1071 // prior to calling wrap.
1072 out.ensureWritable(nextOutSize);
1073 }
1074
1075 ByteBuf wrapBuf = in.readSlice(nextSliceSize);
1076 result = wrap(alloc, engine, wrapBuf, out);
1077
1078 if (result.getStatus() == Status.CLOSED) {
1079 // If the engine gets closed, we can exit out early. Otherwise, we'll do a full handling of
1080 // possible results once finished.
1081 break;
1082 }
1083
1084 if (wrapBuf.isReadable()) {
1085 // There may be some left-over, in which case we can just pick it up next loop, so reset the original
1086 // reader index so its included again in the next slice.
1087 in.readerIndex(in.readerIndex() - wrapBuf.readableBytes());
1088 }
1089 } while (in.readableBytes() > 0);
1090
1091 return result;
1092 }
1093
1094 private SSLEngineResult wrap(ByteBufAllocator alloc, SSLEngine engine, ByteBuf in, ByteBuf out)
1095 throws SSLException {
1096 ByteBuf newDirectIn = null;
1097 try {
1098 int readerIndex = in.readerIndex();
1099 int readableBytes = in.readableBytes();
1100
1101 // We will call SslEngine.wrap(ByteBuffer[], ByteBuffer) to allow efficient handling of
1102 // CompositeByteBuf without force an extra memory copy when CompositeByteBuffer.nioBuffer() is called.
1103 final ByteBuffer[] in0;
1104 if (in.isDirect() || !engineType.wantsDirectBuffer) {
1105 // As CompositeByteBuf.nioBufferCount() can be expensive (as it needs to check all composed ByteBuf
1106 // to calculate the count) we will just assume a CompositeByteBuf contains more then 1 ByteBuf.
1107 // The worst that can happen is that we allocate an extra ByteBuffer[] in CompositeByteBuf.nioBuffers()
1108 // which is better then walking the composed ByteBuf in most cases.
1109 if (!(in instanceof CompositeByteBuf) && in.nioBufferCount() == 1) {
1110 in0 = singleBuffer;
1111 // We know its only backed by 1 ByteBuffer so use internalNioBuffer to keep object allocation
1112 // to a minimum.
1113 in0[0] = in.internalNioBuffer(readerIndex, readableBytes);
1114 } else {
1115 in0 = in.nioBuffers();
1116 }
1117 } else {
1118 // We could even go further here and check if its a CompositeByteBuf and if so try to decompose it and
1119 // only replace the ByteBuffer that are not direct. At the moment we just will replace the whole
1120 // CompositeByteBuf to keep the complexity to a minimum
1121 newDirectIn = alloc.directBuffer(readableBytes);
1122 newDirectIn.writeBytes(in, readerIndex, readableBytes);
1123 in0 = singleBuffer;
1124 in0[0] = newDirectIn.internalNioBuffer(newDirectIn.readerIndex(), readableBytes);
1125 }
1126
1127 for (;;) {
1128 // Use toByteBuffer(...) which might be able to return the internal ByteBuffer and so reduce
1129 // allocations.
1130 ByteBuffer out0 = toByteBuffer(out, out.writerIndex(), out.writableBytes());
1131 SSLEngineResult result = engine.wrap(in0, out0);
1132 in.skipBytes(result.bytesConsumed());
1133 out.writerIndex(out.writerIndex() + result.bytesProduced());
1134
1135 if (result.getStatus() == Status.BUFFER_OVERFLOW) {
1136 out.ensureWritable(engine.getSession().getPacketBufferSize());
1137 } else {
1138 return result;
1139 }
1140 }
1141 } finally {
1142 // Null out to allow GC of ByteBuffer
1143 singleBuffer[0] = null;
1144
1145 if (newDirectIn != null) {
1146 newDirectIn.release();
1147 }
1148 }
1149 }
1150
1151 @Override
1152 public void channelInactive(ChannelHandlerContext ctx) throws Exception {
1153 boolean handshakeFailed = handshakePromise.cause() != null;
1154
1155 // Channel closed, we will generate 'ClosedChannelException' now.
1156 ClosedChannelException exception = new ClosedChannelException();
1157
1158 // Add a supressed exception if the handshake was not completed yet.
1159 if (isStateSet(STATE_HANDSHAKE_STARTED) && !handshakePromise.isDone()) {
1160 ThrowableUtil.addSuppressed(exception, StacklessSSLHandshakeException.newInstance(
1161 "Connection closed while SSL/TLS handshake was in progress",
1162 SslHandler.class, "channelInactive"));
1163 }
1164
1165 // Make sure to release SSLEngine,
1166 // and notify the handshake future if the connection has been closed during handshake.
1167 setHandshakeFailure(ctx, exception, !isStateSet(STATE_OUTBOUND_CLOSED), isStateSet(STATE_HANDSHAKE_STARTED),
1168 false);
1169
1170 // Ensure we always notify the sslClosePromise as well
1171 notifyClosePromise(exception);
1172
1173 try {
1174 super.channelInactive(ctx);
1175 } catch (DecoderException e) {
1176 if (!handshakeFailed || !(e.getCause() instanceof SSLException)) {
1177 // We only rethrow the exception if the handshake did not fail before channelInactive(...) was called
1178 // as otherwise this may produce duplicated failures as super.channelInactive(...) will also call
1179 // channelRead(...).
1180 //
1181 // See https://github.com/netty/netty/issues/10119
1182 throw e;
1183 }
1184 }
1185 }
1186
1187 @Override
1188 public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
1189 if (ignoreException(cause)) {
1190 // It is safe to ignore the 'connection reset by peer' or
1191 // 'broken pipe' error after sending close_notify.
1192 if (logger.isDebugEnabled()) {
1193 logger.debug(
1194 "{} Swallowing a harmless 'connection reset by peer / broken pipe' error that occurred " +
1195 "while writing close_notify in response to the peer's close_notify", ctx.channel(), cause);
1196 }
1197
1198 // Close the connection explicitly just in case the transport
1199 // did not close the connection automatically.
1200 if (ctx.channel().isActive()) {
1201 ctx.close();
1202 }
1203 } else {
1204 ctx.fireExceptionCaught(cause);
1205 }
1206 }
1207
1208 /**
1209 * Checks if the given {@link Throwable} can be ignore and just "swallowed"
1210 *
1211 * When an ssl connection is closed a close_notify message is sent.
1212 * After that the peer also sends close_notify however, it's not mandatory to receive
1213 * the close_notify. The party who sent the initial close_notify can close the connection immediately
1214 * then the peer will get connection reset error.
1215 *
1216 */
1217 private boolean ignoreException(Throwable t) {
1218 if (!(t instanceof SSLException) && t instanceof IOException && sslClosePromise.isDone()) {
1219 String message = t.getMessage();
1220
1221 // first try to match connection reset / broke peer based on the regex. This is the fastest way
1222 // but may fail on different jdk impls or OS's
1223 if (message != null && IGNORABLE_ERROR_MESSAGE.matcher(message).matches()) {
1224 return true;
1225 }
1226
1227 // Inspect the StackTraceElements to see if it was a connection reset / broken pipe or not
1228 StackTraceElement[] elements = t.getStackTrace();
1229 for (StackTraceElement element: elements) {
1230 String classname = element.getClassName();
1231 String methodname = element.getMethodName();
1232
1233 // skip all classes that belong to the io.netty package
1234 if (classname.startsWith("io.netty.")) {
1235 continue;
1236 }
1237
1238 // check if the method name is read if not skip it
1239 if (!"read".equals(methodname)) {
1240 continue;
1241 }
1242
1243 // This will also match against SocketInputStream which is used by openjdk 7 and maybe
1244 // also others
1245 if (IGNORABLE_CLASS_IN_STACK.matcher(classname).matches()) {
1246 return true;
1247 }
1248
1249 try {
1250 // No match by now.. Try to load the class via classloader and inspect it.
1251 // This is mainly done as other JDK implementations may differ in name of
1252 // the impl.
1253 Class<?> clazz = PlatformDependent.getClassLoader(getClass()).loadClass(classname);
1254
1255 if (SocketChannel.class.isAssignableFrom(clazz)
1256 || DatagramChannel.class.isAssignableFrom(clazz)) {
1257 return true;
1258 }
1259
1260 // also match against SctpChannel via String matching as it may not present.
1261 if (PlatformDependent.javaVersion() >= 7
1262 && "com.sun.nio.sctp.SctpChannel".equals(clazz.getSuperclass().getName())) {
1263 return true;
1264 }
1265 } catch (Throwable cause) {
1266 if (logger.isDebugEnabled()) {
1267 logger.debug("Unexpected exception while loading class {} classname {}",
1268 getClass(), classname, cause);
1269 }
1270 }
1271 }
1272 }
1273
1274 return false;
1275 }
1276
1277 /**
1278 * Returns {@code true} if the given {@link ByteBuf} is encrypted. Be aware that this method
1279 * will not increase the readerIndex of the given {@link ByteBuf}.
1280 *
1281 * @param buffer
1282 * The {@link ByteBuf} to read from. Be aware that it must have at least 5 bytes to read,
1283 * otherwise it will throw an {@link IllegalArgumentException}.
1284 * @return encrypted
1285 * {@code true} if the {@link ByteBuf} is encrypted, {@code false} otherwise.
1286 * @throws IllegalArgumentException
1287 * Is thrown if the given {@link ByteBuf} has not at least 5 bytes to read.
1288 */
1289 public static boolean isEncrypted(ByteBuf buffer) {
1290 if (buffer.readableBytes() < SslUtils.SSL_RECORD_HEADER_LENGTH) {
1291 throw new IllegalArgumentException(
1292 "buffer must have at least " + SslUtils.SSL_RECORD_HEADER_LENGTH + " readable bytes");
1293 }
1294 return getEncryptedPacketLength(buffer, buffer.readerIndex()) != SslUtils.NOT_ENCRYPTED;
1295 }
1296
1297 private void decodeJdkCompatible(ChannelHandlerContext ctx, ByteBuf in) throws NotSslRecordException {
1298 int packetLength = this.packetLength;
1299 // If we calculated the length of the current SSL record before, use that information.
1300 if (packetLength > 0) {
1301 if (in.readableBytes() < packetLength) {
1302 return;
1303 }
1304 } else {
1305 // Get the packet length and wait until we get a packets worth of data to unwrap.
1306 final int readableBytes = in.readableBytes();
1307 if (readableBytes < SslUtils.SSL_RECORD_HEADER_LENGTH) {
1308 return;
1309 }
1310 packetLength = getEncryptedPacketLength(in, in.readerIndex());
1311 if (packetLength == SslUtils.NOT_ENCRYPTED) {
1312 // Not an SSL/TLS packet
1313 NotSslRecordException e = new NotSslRecordException(
1314 "not an SSL/TLS record: " + ByteBufUtil.hexDump(in));
1315 in.skipBytes(in.readableBytes());
1316
1317 // First fail the handshake promise as we may need to have access to the SSLEngine which may
1318 // be released because the user will remove the SslHandler in an exceptionCaught(...) implementation.
1319 setHandshakeFailure(ctx, e);
1320
1321 throw e;
1322 }
1323 if (packetLength == NOT_ENOUGH_DATA) {
1324 return;
1325 }
1326 assert packetLength > 0;
1327 if (packetLength > readableBytes) {
1328 // wait until the whole packet can be read
1329 this.packetLength = packetLength;
1330 return;
1331 }
1332 }
1333
1334 // Reset the state of this class so we can get the length of the next packet. We assume the entire packet will
1335 // be consumed by the SSLEngine.
1336 this.packetLength = 0;
1337 try {
1338 final int bytesConsumed = unwrap(ctx, in, packetLength);
1339 assert bytesConsumed == packetLength || engine.isInboundDone() :
1340 "we feed the SSLEngine a packets worth of data: " + packetLength + " but it only consumed: " +
1341 bytesConsumed;
1342 } catch (Throwable cause) {
1343 handleUnwrapThrowable(ctx, cause);
1344 }
1345 }
1346
1347 private void decodeNonJdkCompatible(ChannelHandlerContext ctx, ByteBuf in) {
1348 try {
1349 unwrap(ctx, in, in.readableBytes());
1350 } catch (Throwable cause) {
1351 handleUnwrapThrowable(ctx, cause);
1352 }
1353 }
1354
1355 private void handleUnwrapThrowable(ChannelHandlerContext ctx, Throwable cause) {
1356 try {
1357 // We should attempt to notify the handshake failure before writing any pending data. If we are in unwrap
1358 // and failed during the handshake process, and we attempt to wrap, then promises will fail, and if
1359 // listeners immediately close the Channel then we may end up firing the handshake event after the Channel
1360 // has been closed.
1361 if (handshakePromise.tryFailure(cause)) {
1362 ctx.fireUserEventTriggered(new SslHandshakeCompletionEvent(cause));
1363 }
1364
1365 // Let's check if the handler was removed in the meantime and so pendingUnencryptedWrites is null.
1366 if (pendingUnencryptedWrites != null) {
1367 // We need to flush one time as there may be an alert that we should send to the remote peer because
1368 // of the SSLException reported here.
1369 wrapAndFlush(ctx);
1370 }
1371 } catch (SSLException ex) {
1372 logger.debug("SSLException during trying to call SSLEngine.wrap(...)" +
1373 " because of an previous SSLException, ignoring...", ex);
1374 } finally {
1375 // ensure we always flush and close the channel.
1376 setHandshakeFailure(ctx, cause, true, false, true);
1377 }
1378 PlatformDependent.throwException(cause);
1379 }
1380
1381 @Override
1382 protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws SSLException {
1383 if (isStateSet(STATE_PROCESS_TASK)) {
1384 return;
1385 }
1386 if (jdkCompatibilityMode) {
1387 decodeJdkCompatible(ctx, in);
1388 } else {
1389 decodeNonJdkCompatible(ctx, in);
1390 }
1391 }
1392
1393 @Override
1394 public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
1395 channelReadComplete0(ctx);
1396 }
1397
1398 private void channelReadComplete0(ChannelHandlerContext ctx) {
1399 // Discard bytes of the cumulation buffer if needed.
1400 discardSomeReadBytes();
1401
1402 flushIfNeeded(ctx);
1403 readIfNeeded(ctx);
1404
1405 clearState(STATE_FIRE_CHANNEL_READ);
1406 ctx.fireChannelReadComplete();
1407 }
1408
1409 private void readIfNeeded(ChannelHandlerContext ctx) {
1410 // If handshake is not finished yet, we need more data.
1411 if (!ctx.channel().config().isAutoRead() &&
1412 (!isStateSet(STATE_FIRE_CHANNEL_READ) || !handshakePromise.isDone())) {
1413 // No auto-read used and no message passed through the ChannelPipeline or the handshake was not complete
1414 // yet, which means we need to trigger the read to ensure we not encounter any stalls.
1415 ctx.read();
1416 }
1417 }
1418
1419 private void flushIfNeeded(ChannelHandlerContext ctx) {
1420 if (isStateSet(STATE_NEEDS_FLUSH)) {
1421 forceFlush(ctx);
1422 }
1423 }
1424
1425 /**
1426 * Calls {@link SSLEngine#unwrap(ByteBuffer, ByteBuffer)} with an empty buffer to handle handshakes, etc.
1427 */
1428 private int unwrapNonAppData(ChannelHandlerContext ctx) throws SSLException {
1429 return unwrap(ctx, Unpooled.EMPTY_BUFFER, 0);
1430 }
1431
1432 /**
1433 * Unwraps inbound SSL records.
1434 */
1435 private int unwrap(ChannelHandlerContext ctx, ByteBuf packet, int length) throws SSLException {
1436 final int originalLength = length;
1437 boolean wrapLater = false;
1438 boolean notifyClosure = false;
1439 boolean executedRead = false;
1440 ByteBuf decodeOut = allocate(ctx, length);
1441 try {
1442 // Only continue to loop if the handler was not removed in the meantime.
1443 // See https://github.com/netty/netty/issues/5860
1444 do {
1445 final SSLEngineResult result = engineType.unwrap(this, packet, length, decodeOut);
1446 final Status status = result.getStatus();
1447 final HandshakeStatus handshakeStatus = result.getHandshakeStatus();
1448 final int produced = result.bytesProduced();
1449 final int consumed = result.bytesConsumed();
1450
1451 // Skip bytes now in case unwrap is called in a re-entry scenario. For example LocalChannel.read()
1452 // may entry this method in a re-entry fashion and if the peer is writing into a shared buffer we may
1453 // unwrap the same data multiple times.
1454 packet.skipBytes(consumed);
1455 length -= consumed;
1456
1457 // The expected sequence of events is:
1458 // 1. Notify of handshake success
1459 // 2. fireChannelRead for unwrapped data
1460 if (handshakeStatus == HandshakeStatus.FINISHED || handshakeStatus == HandshakeStatus.NOT_HANDSHAKING) {
1461 wrapLater |= (decodeOut.isReadable() ?
1462 setHandshakeSuccessUnwrapMarkReentry() : setHandshakeSuccess()) ||
1463 handshakeStatus == HandshakeStatus.FINISHED;
1464 }
1465
1466 // Dispatch decoded data after we have notified of handshake success. If this method has been invoked
1467 // in a re-entry fashion we execute a task on the executor queue to process after the stack unwinds
1468 // to preserve order of events.
1469 if (decodeOut.isReadable()) {
1470 setState(STATE_FIRE_CHANNEL_READ);
1471 if (isStateSet(STATE_UNWRAP_REENTRY)) {
1472 executedRead = true;
1473 executeChannelRead(ctx, decodeOut);
1474 } else {
1475 ctx.fireChannelRead(decodeOut);
1476 }
1477 decodeOut = null;
1478 }
1479
1480 if (status == Status.CLOSED) {
1481 notifyClosure = true; // notify about the CLOSED state of the SSLEngine. See #137
1482 } else if (status == Status.BUFFER_OVERFLOW) {
1483 if (decodeOut != null) {
1484 decodeOut.release();
1485 }
1486 final int applicationBufferSize = engine.getSession().getApplicationBufferSize();
1487 // Allocate a new buffer which can hold all the rest data and loop again.
1488 // It may happen that applicationBufferSize < produced while there is still more to unwrap, in this
1489 // case we will just allocate a new buffer with the capacity of applicationBufferSize and call
1490 // unwrap again.
1491 decodeOut = allocate(ctx, engineType.calculatePendingData(this, applicationBufferSize < produced ?
1492 applicationBufferSize : applicationBufferSize - produced));
1493 continue;
1494 }
1495
1496 if (handshakeStatus == HandshakeStatus.NEED_TASK) {
1497 boolean pending = runDelegatedTasks(true);
1498 if (!pending) {
1499 // We scheduled a task on the delegatingTaskExecutor, so stop processing as we will
1500 // resume once the task completes.
1501 //
1502 // We break out of the loop only and do NOT return here as we still may need to notify
1503 // about the closure of the SSLEngine.
1504 wrapLater = false;
1505 break;
1506 }
1507 } else if (handshakeStatus == HandshakeStatus.NEED_WRAP) {
1508 // If the wrap operation transitions the status to NOT_HANDSHAKING and there is no more data to
1509 // unwrap then the next call to unwrap will not produce any data. We can avoid the potentially
1510 // costly unwrap operation and break out of the loop.
1511 if (wrapNonAppData(ctx, true) && length == 0) {
1512 break;
1513 }
1514 }
1515
1516 if (status == Status.BUFFER_UNDERFLOW ||
1517 // If we processed NEED_TASK we should try again even we did not consume or produce anything.
1518 handshakeStatus != HandshakeStatus.NEED_TASK && (consumed == 0 && produced == 0 ||
1519 (length == 0 && handshakeStatus == HandshakeStatus.NOT_HANDSHAKING))) {
1520 if (handshakeStatus == HandshakeStatus.NEED_UNWRAP) {
1521 // The underlying engine is starving so we need to feed it with more data.
1522 // See https://github.com/netty/netty/pull/5039
1523 readIfNeeded(ctx);
1524 }
1525
1526 break;
1527 } else if (decodeOut == null) {
1528 decodeOut = allocate(ctx, length);
1529 }
1530 } while (!ctx.isRemoved());
1531
1532 if (isStateSet(STATE_FLUSHED_BEFORE_HANDSHAKE) && handshakePromise.isDone()) {
1533 // We need to call wrap(...) in case there was a flush done before the handshake completed to ensure
1534 // we do not stale.
1535 //
1536 // See https://github.com/netty/netty/pull/2437
1537 clearState(STATE_FLUSHED_BEFORE_HANDSHAKE);
1538 wrapLater = true;
1539 }
1540
1541 if (wrapLater) {
1542 wrap(ctx, true);
1543 }
1544 } finally {
1545 if (decodeOut != null) {
1546 decodeOut.release();
1547 }
1548
1549 if (notifyClosure) {
1550 if (executedRead) {
1551 executeNotifyClosePromise(ctx);
1552 } else {
1553 notifyClosePromise(null);
1554 }
1555 }
1556 }
1557 return originalLength - length;
1558 }
1559
1560 private boolean setHandshakeSuccessUnwrapMarkReentry() {
1561 // setHandshakeSuccess calls out to external methods which may trigger re-entry. We need to preserve ordering of
1562 // fireChannelRead for decodeOut relative to re-entry data.
1563 final boolean setReentryState = !isStateSet(STATE_UNWRAP_REENTRY);
1564 if (setReentryState) {
1565 setState(STATE_UNWRAP_REENTRY);
1566 }
1567 try {
1568 return setHandshakeSuccess();
1569 } finally {
1570 // It is unlikely this specific method will be re-entry because handshake completion is infrequent, but just
1571 // in case we only clear the state if we set it in the first place.
1572 if (setReentryState) {
1573 clearState(STATE_UNWRAP_REENTRY);
1574 }
1575 }
1576 }
1577
1578 private void executeNotifyClosePromise(final ChannelHandlerContext ctx) {
1579 try {
1580 ctx.executor().execute(new Runnable() {
1581 @Override
1582 public void run() {
1583 notifyClosePromise(null);
1584 }
1585 });
1586 } catch (RejectedExecutionException e) {
1587 notifyClosePromise(e);
1588 }
1589 }
1590
1591 private void executeChannelRead(final ChannelHandlerContext ctx, final ByteBuf decodedOut) {
1592 try {
1593 ctx.executor().execute(new Runnable() {
1594 @Override
1595 public void run() {
1596 ctx.fireChannelRead(decodedOut);
1597 }
1598 });
1599 } catch (RejectedExecutionException e) {
1600 decodedOut.release();
1601 throw e;
1602 }
1603 }
1604
1605 private static ByteBuffer toByteBuffer(ByteBuf out, int index, int len) {
1606 return out.nioBufferCount() == 1 ? out.internalNioBuffer(index, len) :
1607 out.nioBuffer(index, len);
1608 }
1609
1610 private static boolean inEventLoop(Executor executor) {
1611 return executor instanceof EventExecutor && ((EventExecutor) executor).inEventLoop();
1612 }
1613
1614 /**
1615 * Will either run the delegated task directly calling {@link Runnable#run()} and return {@code true} or will
1616 * offload the delegated task using {@link Executor#execute(Runnable)} and return {@code false}.
1617 *
1618 * If the task is offloaded it will take care to resume its work on the {@link EventExecutor} once there are no
1619 * more tasks to process.
1620 */
1621 private boolean runDelegatedTasks(boolean inUnwrap) {
1622 if (delegatedTaskExecutor == ImmediateExecutor.INSTANCE || inEventLoop(delegatedTaskExecutor)) {
1623 // We should run the task directly in the EventExecutor thread and not offload at all. As we are on the
1624 // EventLoop we can just run all tasks at once.
1625 for (;;) {
1626 Runnable task = engine.getDelegatedTask();
1627 if (task == null) {
1628 return true;
1629 }
1630 setState(STATE_PROCESS_TASK);
1631 if (task instanceof AsyncRunnable) {
1632 // Let's set the task to processing task before we try to execute it.
1633 boolean pending = false;
1634 try {
1635 AsyncRunnable asyncTask = (AsyncRunnable) task;
1636 AsyncTaskCompletionHandler completionHandler = new AsyncTaskCompletionHandler(inUnwrap);
1637 asyncTask.run(completionHandler);
1638 pending = completionHandler.resumeLater();
1639 if (pending) {
1640 return false;
1641 }
1642 } finally {
1643 if (!pending) {
1644 // The task has completed, lets clear the state. If it is not completed we will clear the
1645 // state once it is.
1646 clearState(STATE_PROCESS_TASK);
1647 }
1648 }
1649 } else {
1650 try {
1651 task.run();
1652 } finally {
1653 clearState(STATE_PROCESS_TASK);
1654 }
1655 }
1656 }
1657 } else {
1658 executeDelegatedTask(inUnwrap);
1659 return false;
1660 }
1661 }
1662
1663 private SslTasksRunner getTaskRunner(boolean inUnwrap) {
1664 return inUnwrap ? sslTaskRunnerForUnwrap : sslTaskRunner;
1665 }
1666
1667 private void executeDelegatedTask(boolean inUnwrap) {
1668 executeDelegatedTask(getTaskRunner(inUnwrap));
1669 }
1670
1671 private void executeDelegatedTask(SslTasksRunner task) {
1672 setState(STATE_PROCESS_TASK);
1673 try {
1674 delegatedTaskExecutor.execute(task);
1675 } catch (RejectedExecutionException e) {
1676 clearState(STATE_PROCESS_TASK);
1677 throw e;
1678 }
1679 }
1680
1681 private final class AsyncTaskCompletionHandler implements Runnable {
1682 private final boolean inUnwrap;
1683 boolean didRun;
1684 boolean resumeLater;
1685
1686 AsyncTaskCompletionHandler(boolean inUnwrap) {
1687 this.inUnwrap = inUnwrap;
1688 }
1689
1690 @Override
1691 public void run() {
1692 didRun = true;
1693 if (resumeLater) {
1694 getTaskRunner(inUnwrap).runComplete();
1695 }
1696 }
1697
1698 boolean resumeLater() {
1699 if (!didRun) {
1700 resumeLater = true;
1701 return true;
1702 }
1703 return false;
1704 }
1705 }
1706
1707 /**
1708 * {@link Runnable} that will be scheduled on the {@code delegatedTaskExecutor} and will take care
1709 * of resume work on the {@link EventExecutor} once the task was executed.
1710 */
1711 private final class SslTasksRunner implements Runnable {
1712 private final boolean inUnwrap;
1713 private final Runnable runCompleteTask = new Runnable() {
1714 @Override
1715 public void run() {
1716 runComplete();
1717 }
1718 };
1719
1720 SslTasksRunner(boolean inUnwrap) {
1721 this.inUnwrap = inUnwrap;
1722 }
1723
1724 // Handle errors which happened during task processing.
1725 private void taskError(Throwable e) {
1726 if (inUnwrap) {
1727 // As the error happened while the task was scheduled as part of unwrap(...) we also need to ensure
1728 // we fire it through the pipeline as inbound error to be consistent with what we do in decode(...).
1729 //
1730 // This will also ensure we fail the handshake future and flush all produced data.
1731 try {
1732 handleUnwrapThrowable(ctx, e);
1733 } catch (Throwable cause) {
1734 safeExceptionCaught(cause);
1735 }
1736 } else {
1737 setHandshakeFailure(ctx, e);
1738 forceFlush(ctx);
1739 }
1740 }
1741
1742 // Try to call exceptionCaught(...)
1743 private void safeExceptionCaught(Throwable cause) {
1744 try {
1745 exceptionCaught(ctx, wrapIfNeeded(cause));
1746 } catch (Throwable error) {
1747 ctx.fireExceptionCaught(error);
1748 }
1749 }
1750
1751 private Throwable wrapIfNeeded(Throwable cause) {
1752 if (!inUnwrap) {
1753 // If we are not in unwrap(...) we can just rethrow without wrapping at all.
1754 return cause;
1755 }
1756 // As the exception would have been triggered by an inbound operation we will need to wrap it in a
1757 // DecoderException to mimic what a decoder would do when decode(...) throws.
1758 return cause instanceof DecoderException ? cause : new DecoderException(cause);
1759 }
1760
1761 private void tryDecodeAgain() {
1762 try {
1763 channelRead(ctx, Unpooled.EMPTY_BUFFER);
1764 } catch (Throwable cause) {
1765 safeExceptionCaught(cause);
1766 } finally {
1767 // As we called channelRead(...) we also need to call channelReadComplete(...) which
1768 // will ensure we either call ctx.fireChannelReadComplete() or will trigger a ctx.read() if
1769 // more data is needed.
1770 channelReadComplete0(ctx);
1771 }
1772 }
1773
1774 /**
1775 * Executed after the wrapped {@code task} was executed via {@code delegatedTaskExecutor} to resume work
1776 * on the {@link EventExecutor}.
1777 */
1778 private void resumeOnEventExecutor() {
1779 assert ctx.executor().inEventLoop();
1780 clearState(STATE_PROCESS_TASK);
1781 try {
1782 HandshakeStatus status = engine.getHandshakeStatus();
1783 switch (status) {
1784 // There is another task that needs to be executed and offloaded to the delegatingTaskExecutor as
1785 // a result of this. Let's reschedule....
1786 case NEED_TASK:
1787 executeDelegatedTask(this);
1788
1789 break;
1790
1791 // The handshake finished, lets notify about the completion of it and resume processing.
1792 case FINISHED:
1793 // Not handshaking anymore, lets notify about the completion if not done yet and resume processing.
1794 case NOT_HANDSHAKING:
1795 setHandshakeSuccess(); // NOT_HANDSHAKING -> workaround for android skipping FINISHED state.
1796 try {
1797 // Lets call wrap to ensure we produce the alert if there is any pending and also to
1798 // ensure we flush any queued data..
1799 wrap(ctx, inUnwrap);
1800 } catch (Throwable e) {
1801 taskError(e);
1802 return;
1803 }
1804 if (inUnwrap) {
1805 // If we were in the unwrap call when the task was processed we should also try to unwrap
1806 // non app data first as there may not anything left in the inbound buffer to process.
1807 unwrapNonAppData(ctx);
1808 }
1809
1810 // Flush now as we may have written some data as part of the wrap call.
1811 forceFlush(ctx);
1812
1813 tryDecodeAgain();
1814 break;
1815
1816 // We need more data so lets try to unwrap first and then call decode again which will feed us
1817 // with buffered data (if there is any).
1818 case NEED_UNWRAP:
1819 try {
1820 unwrapNonAppData(ctx);
1821 } catch (SSLException e) {
1822 handleUnwrapThrowable(ctx, e);
1823 return;
1824 }
1825 tryDecodeAgain();
1826 break;
1827
1828 // To make progress we need to call SSLEngine.wrap(...) which may produce more output data
1829 // that will be written to the Channel.
1830 case NEED_WRAP:
1831 try {
1832 if (!wrapNonAppData(ctx, false) && inUnwrap) {
1833 // The handshake finished in wrapNonAppData(...), we need to try call
1834 // unwrapNonAppData(...) as we may have some alert that we should read.
1835 //
1836 // This mimics what we would do when we are calling this method while in unwrap(...).
1837 unwrapNonAppData(ctx);
1838 }
1839
1840 // Flush now as we may have written some data as part of the wrap call.
1841 forceFlush(ctx);
1842 } catch (Throwable e) {
1843 taskError(e);
1844 return;
1845 }
1846
1847 // Now try to feed in more data that we have buffered.
1848 tryDecodeAgain();
1849 break;
1850
1851 default:
1852 // Should never reach here as we handle all cases.
1853 throw new AssertionError();
1854 }
1855 } catch (Throwable cause) {
1856 safeExceptionCaught(cause);
1857 }
1858 }
1859
1860 void runComplete() {
1861 EventExecutor executor = ctx.executor();
1862 // Jump back on the EventExecutor. We do this even if we are already on the EventLoop to guard against
1863 // reentrancy issues. Failing to do so could lead to the situation of tryDecode(...) be called and so
1864 // channelRead(...) while still in the decode loop. In this case channelRead(...) might release the input
1865 // buffer if its empty which would then result in an IllegalReferenceCountException when we try to continue
1866 // decoding.
1867 //
1868 // See https://github.com/netty/netty-tcnative/issues/680
1869 executor.execute(new Runnable() {
1870 @Override
1871 public void run() {
1872 resumeOnEventExecutor();
1873 }
1874 });
1875 }
1876
1877 @Override
1878 public void run() {
1879 try {
1880 Runnable task = engine.getDelegatedTask();
1881 if (task == null) {
1882 // The task was processed in the meantime. Let's just return.
1883 return;
1884 }
1885 if (task instanceof AsyncRunnable) {
1886 AsyncRunnable asyncTask = (AsyncRunnable) task;
1887 asyncTask.run(runCompleteTask);
1888 } else {
1889 task.run();
1890 runComplete();
1891 }
1892 } catch (final Throwable cause) {
1893 handleException(cause);
1894 }
1895 }
1896
1897 private void handleException(final Throwable cause) {
1898 EventExecutor executor = ctx.executor();
1899 if (executor.inEventLoop()) {
1900 clearState(STATE_PROCESS_TASK);
1901 safeExceptionCaught(cause);
1902 } else {
1903 try {
1904 executor.execute(new Runnable() {
1905 @Override
1906 public void run() {
1907 clearState(STATE_PROCESS_TASK);
1908 safeExceptionCaught(cause);
1909 }
1910 });
1911 } catch (RejectedExecutionException ignore) {
1912 clearState(STATE_PROCESS_TASK);
1913 // the context itself will handle the rejected exception when try to schedule the operation so
1914 // ignore the RejectedExecutionException
1915 ctx.fireExceptionCaught(cause);
1916 }
1917 }
1918 }
1919 }
1920
1921 /**
1922 * Notify all the handshake futures about the successfully handshake
1923 * @return {@code true} if {@link #handshakePromise} was set successfully and a {@link SslHandshakeCompletionEvent}
1924 * was fired. {@code false} otherwise.
1925 */
1926 private boolean setHandshakeSuccess() {
1927 // Our control flow may invoke this method multiple times for a single FINISHED event. For example
1928 // wrapNonAppData may drain pendingUnencryptedWrites in wrap which transitions to handshake from FINISHED to
1929 // NOT_HANDSHAKING which invokes setHandshakeSuccess, and then wrapNonAppData also directly invokes this method.
1930 final boolean notified;
1931 if (notified = !handshakePromise.isDone() && handshakePromise.trySuccess(ctx.channel())) {
1932 if (logger.isDebugEnabled()) {
1933 SSLSession session = engine.getSession();
1934 logger.debug(
1935 "{} HANDSHAKEN: protocol:{} cipher suite:{}",
1936 ctx.channel(),
1937 session.getProtocol(),
1938 session.getCipherSuite());
1939 }
1940 ctx.fireUserEventTriggered(SslHandshakeCompletionEvent.SUCCESS);
1941 }
1942 if (isStateSet(STATE_READ_DURING_HANDSHAKE)) {
1943 clearState(STATE_READ_DURING_HANDSHAKE);
1944 if (!ctx.channel().config().isAutoRead()) {
1945 ctx.read();
1946 }
1947 }
1948 return notified;
1949 }
1950
1951 /**
1952 * Notify all the handshake futures about the failure during the handshake.
1953 */
1954 private void setHandshakeFailure(ChannelHandlerContext ctx, Throwable cause) {
1955 setHandshakeFailure(ctx, cause, true, true, false);
1956 }
1957
1958 /**
1959 * Notify all the handshake futures about the failure during the handshake.
1960 */
1961 private void setHandshakeFailure(ChannelHandlerContext ctx, Throwable cause, boolean closeInbound,
1962 boolean notify, boolean alwaysFlushAndClose) {
1963 try {
1964 // Release all resources such as internal buffers that SSLEngine is managing.
1965 setState(STATE_OUTBOUND_CLOSED);
1966 engine.closeOutbound();
1967
1968 if (closeInbound) {
1969 try {
1970 engine.closeInbound();
1971 } catch (SSLException e) {
1972 if (logger.isDebugEnabled()) {
1973 // only log in debug mode as it most likely harmless and latest chrome still trigger
1974 // this all the time.
1975 //
1976 // See https://github.com/netty/netty/issues/1340
1977 String msg = e.getMessage();
1978 if (msg == null || !(msg.contains("possible truncation attack") ||
1979 msg.contains("closing inbound before receiving peer's close_notify"))) {
1980 logger.debug("{} SSLEngine.closeInbound() raised an exception.", ctx.channel(), e);
1981 }
1982 }
1983 }
1984 }
1985 if (handshakePromise.tryFailure(cause) || alwaysFlushAndClose) {
1986 SslUtils.handleHandshakeFailure(ctx, cause, notify);
1987 }
1988 } finally {
1989 // Ensure we remove and fail all pending writes in all cases and so release memory quickly.
1990 releaseAndFailAll(ctx, cause);
1991 }
1992 }
1993
1994 private void setHandshakeFailureTransportFailure(ChannelHandlerContext ctx, Throwable cause) {
1995 // If TLS control frames fail to write we are in an unknown state and may become out of
1996 // sync with our peer. We give up and close the channel. This will also take care of
1997 // cleaning up any outstanding state (e.g. handshake promise, queued unencrypted data).
1998 try {
1999 SSLException transportFailure = new SSLException("failure when writing TLS control frames", cause);
2000 releaseAndFailAll(ctx, transportFailure);
2001 if (handshakePromise.tryFailure(transportFailure)) {
2002 ctx.fireUserEventTriggered(new SslHandshakeCompletionEvent(transportFailure));
2003 }
2004 } finally {
2005 ctx.close();
2006 }
2007 }
2008
2009 private void releaseAndFailAll(ChannelHandlerContext ctx, Throwable cause) {
2010 if (pendingUnencryptedWrites != null) {
2011 pendingUnencryptedWrites.releaseAndFailAll(ctx, cause);
2012 }
2013 }
2014
2015 private void notifyClosePromise(Throwable cause) {
2016 if (cause == null) {
2017 if (sslClosePromise.trySuccess(ctx.channel())) {
2018 ctx.fireUserEventTriggered(SslCloseCompletionEvent.SUCCESS);
2019 }
2020 } else {
2021 if (sslClosePromise.tryFailure(cause)) {
2022 ctx.fireUserEventTriggered(new SslCloseCompletionEvent(cause));
2023 }
2024 }
2025 }
2026
2027 private void closeOutboundAndChannel(
2028 final ChannelHandlerContext ctx, final ChannelPromise promise, boolean disconnect) throws Exception {
2029 setState(STATE_OUTBOUND_CLOSED);
2030 engine.closeOutbound();
2031
2032 if (!ctx.channel().isActive()) {
2033 if (disconnect) {
2034 ctx.disconnect(promise);
2035 } else {
2036 ctx.close(promise);
2037 }
2038 return;
2039 }
2040
2041 ChannelPromise closeNotifyPromise = ctx.newPromise();
2042 try {
2043 flush(ctx, closeNotifyPromise);
2044 } finally {
2045 if (!isStateSet(STATE_CLOSE_NOTIFY)) {
2046 setState(STATE_CLOSE_NOTIFY);
2047 // It's important that we do not pass the original ChannelPromise to safeClose(...) as when flush(....)
2048 // throws an Exception it will be propagated to the AbstractChannelHandlerContext which will try
2049 // to fail the promise because of this. This will then fail as it was already completed by
2050 // safeClose(...). We create a new ChannelPromise and try to notify the original ChannelPromise
2051 // once it is complete. If we fail to do so we just ignore it as in this case it was failed already
2052 // because of a propagated Exception.
2053 //
2054 // See https://github.com/netty/netty/issues/5931
2055 safeClose(ctx, closeNotifyPromise, PromiseNotifier.cascade(false, ctx.newPromise(), promise));
2056 } else {
2057 /// We already handling the close_notify so just attach the promise to the sslClosePromise.
2058 sslClosePromise.addListener(new FutureListener<Channel>() {
2059 @Override
2060 public void operationComplete(Future<Channel> future) {
2061 promise.setSuccess();
2062 }
2063 });
2064 }
2065 }
2066 }
2067
2068 private void flush(ChannelHandlerContext ctx, ChannelPromise promise) throws Exception {
2069 if (pendingUnencryptedWrites != null) {
2070 pendingUnencryptedWrites.add(Unpooled.EMPTY_BUFFER, promise);
2071 } else {
2072 promise.setFailure(newPendingWritesNullException());
2073 }
2074 flush(ctx);
2075 }
2076
2077 @Override
2078 public void handlerAdded(final ChannelHandlerContext ctx) throws Exception {
2079 this.ctx = ctx;
2080 Channel channel = ctx.channel();
2081 pendingUnencryptedWrites = new SslHandlerCoalescingBufferQueue(channel, 16);
2082
2083 setOpensslEngineSocketFd(channel);
2084 boolean fastOpen = Boolean.TRUE.equals(channel.config().getOption(ChannelOption.TCP_FASTOPEN_CONNECT));
2085 boolean active = channel.isActive();
2086 if (active || fastOpen) {
2087 // Explicitly flush the handshake only if the channel is already active.
2088 // With TCP Fast Open, we write to the outbound buffer before the TCP connect is established.
2089 // The buffer will then be flushed as part of establishing the connection, saving us a round-trip.
2090 startHandshakeProcessing(active);
2091 // If we weren't able to include client_hello in the TCP SYN (e.g. no token, disabled at the OS) we have to
2092 // flush pending data in the outbound buffer later in channelActive().
2093 final ChannelOutboundBuffer outboundBuffer;
2094 if (fastOpen && ((outboundBuffer = channel.unsafe().outboundBuffer()) == null ||
2095 outboundBuffer.totalPendingWriteBytes() > 0)) {
2096 setState(STATE_NEEDS_FLUSH);
2097 }
2098 }
2099 }
2100
2101 private void startHandshakeProcessing(boolean flushAtEnd) {
2102 if (!isStateSet(STATE_HANDSHAKE_STARTED)) {
2103 setState(STATE_HANDSHAKE_STARTED);
2104 if (engine.getUseClientMode()) {
2105 // Begin the initial handshake.
2106 // channelActive() event has been fired already, which means this.channelActive() will
2107 // not be invoked. We have to initialize here instead.
2108 handshake(flushAtEnd);
2109 }
2110 applyHandshakeTimeout();
2111 } else if (isStateSet(STATE_NEEDS_FLUSH)) {
2112 forceFlush(ctx);
2113 }
2114 }
2115
2116 /**
2117 * Performs TLS renegotiation.
2118 */
2119 public Future<Channel> renegotiate() {
2120 ChannelHandlerContext ctx = this.ctx;
2121 if (ctx == null) {
2122 throw new IllegalStateException();
2123 }
2124
2125 return renegotiate(ctx.executor().<Channel>newPromise());
2126 }
2127
2128 /**
2129 * Performs TLS renegotiation.
2130 */
2131 public Future<Channel> renegotiate(final Promise<Channel> promise) {
2132 ObjectUtil.checkNotNull(promise, "promise");
2133
2134 ChannelHandlerContext ctx = this.ctx;
2135 if (ctx == null) {
2136 throw new IllegalStateException();
2137 }
2138
2139 EventExecutor executor = ctx.executor();
2140 if (!executor.inEventLoop()) {
2141 executor.execute(new Runnable() {
2142 @Override
2143 public void run() {
2144 renegotiateOnEventLoop(promise);
2145 }
2146 });
2147 return promise;
2148 }
2149
2150 renegotiateOnEventLoop(promise);
2151 return promise;
2152 }
2153
2154 private void renegotiateOnEventLoop(final Promise<Channel> newHandshakePromise) {
2155 final Promise<Channel> oldHandshakePromise = handshakePromise;
2156 if (!oldHandshakePromise.isDone()) {
2157 // There's no need to handshake because handshake is in progress already.
2158 // Merge the new promise into the old one.
2159 PromiseNotifier.cascade(oldHandshakePromise, newHandshakePromise);
2160 } else {
2161 handshakePromise = newHandshakePromise;
2162 handshake(true);
2163 applyHandshakeTimeout();
2164 }
2165 }
2166
2167 /**
2168 * Performs TLS (re)negotiation.
2169 * @param flushAtEnd Set to {@code true} if the outbound buffer should be flushed (written to the network) at the
2170 * end. Set to {@code false} if the handshake will be flushed later, e.g. as part of TCP Fast Open
2171 * connect.
2172 */
2173 private void handshake(boolean flushAtEnd) {
2174 if (engine.getHandshakeStatus() != HandshakeStatus.NOT_HANDSHAKING) {
2175 // Not all SSLEngine implementations support calling beginHandshake multiple times while a handshake
2176 // is in progress. See https://github.com/netty/netty/issues/4718.
2177 return;
2178 }
2179 if (handshakePromise.isDone()) {
2180 // If the handshake is done already lets just return directly as there is no need to trigger it again.
2181 // This can happen if the handshake(...) was triggered before we called channelActive(...) by a
2182 // flush() that was triggered by a ChannelFutureListener that was added to the ChannelFuture returned
2183 // from the connect(...) method. In this case we will see the flush() happen before we had a chance to
2184 // call fireChannelActive() on the pipeline.
2185 return;
2186 }
2187
2188 // Begin handshake.
2189 final ChannelHandlerContext ctx = this.ctx;
2190 try {
2191 engine.beginHandshake();
2192 wrapNonAppData(ctx, false);
2193 } catch (Throwable e) {
2194 setHandshakeFailure(ctx, e);
2195 } finally {
2196 if (flushAtEnd) {
2197 forceFlush(ctx);
2198 }
2199 }
2200 }
2201
2202 private void applyHandshakeTimeout() {
2203 final Promise<Channel> localHandshakePromise = this.handshakePromise;
2204
2205 // Set timeout if necessary.
2206 final long handshakeTimeoutMillis = this.handshakeTimeoutMillis;
2207 if (handshakeTimeoutMillis <= 0 || localHandshakePromise.isDone()) {
2208 return;
2209 }
2210
2211 final Future<?> timeoutFuture = ctx.executor().schedule(new Runnable() {
2212 @Override
2213 public void run() {
2214 if (localHandshakePromise.isDone()) {
2215 return;
2216 }
2217 SSLException exception =
2218 new SslHandshakeTimeoutException("handshake timed out after " + handshakeTimeoutMillis + "ms");
2219 try {
2220 if (localHandshakePromise.tryFailure(exception)) {
2221 SslUtils.handleHandshakeFailure(ctx, exception, true);
2222 }
2223 } finally {
2224 releaseAndFailAll(ctx, exception);
2225 }
2226 }
2227 }, handshakeTimeoutMillis, TimeUnit.MILLISECONDS);
2228
2229 // Cancel the handshake timeout when handshake is finished.
2230 localHandshakePromise.addListener(new FutureListener<Channel>() {
2231 @Override
2232 public void operationComplete(Future<Channel> f) throws Exception {
2233 timeoutFuture.cancel(false);
2234 }
2235 });
2236 }
2237
2238 private void forceFlush(ChannelHandlerContext ctx) {
2239 clearState(STATE_NEEDS_FLUSH);
2240 ctx.flush();
2241 }
2242
2243 private void setOpensslEngineSocketFd(Channel c) {
2244 if (c instanceof UnixChannel && engine instanceof ReferenceCountedOpenSslEngine) {
2245 ((ReferenceCountedOpenSslEngine) engine).bioSetFd(((UnixChannel) c).fd().intValue());
2246 }
2247 }
2248
2249 /**
2250 * Issues an initial TLS handshake once connected when used in client-mode
2251 */
2252 @Override
2253 public void channelActive(final ChannelHandlerContext ctx) throws Exception {
2254 setOpensslEngineSocketFd(ctx.channel());
2255 if (!startTls) {
2256 startHandshakeProcessing(true);
2257 }
2258 ctx.fireChannelActive();
2259 }
2260
2261 private void safeClose(
2262 final ChannelHandlerContext ctx, final ChannelFuture flushFuture,
2263 final ChannelPromise promise) {
2264 if (!ctx.channel().isActive()) {
2265 ctx.close(promise);
2266 return;
2267 }
2268
2269 final Future<?> timeoutFuture;
2270 if (!flushFuture.isDone()) {
2271 long closeNotifyTimeout = closeNotifyFlushTimeoutMillis;
2272 if (closeNotifyTimeout > 0) {
2273 // Force-close the connection if close_notify is not fully sent in time.
2274 timeoutFuture = ctx.executor().schedule(new Runnable() {
2275 @Override
2276 public void run() {
2277 // May be done in the meantime as cancel(...) is only best effort.
2278 if (!flushFuture.isDone()) {
2279 logger.warn("{} Last write attempt timed out; force-closing the connection.",
2280 ctx.channel());
2281 addCloseListener(ctx.close(ctx.newPromise()), promise);
2282 }
2283 }
2284 }, closeNotifyTimeout, TimeUnit.MILLISECONDS);
2285 } else {
2286 timeoutFuture = null;
2287 }
2288 } else {
2289 timeoutFuture = null;
2290 }
2291
2292 // Close the connection if close_notify is sent in time.
2293 flushFuture.addListener(new ChannelFutureListener() {
2294 @Override
2295 public void operationComplete(ChannelFuture f) {
2296 if (timeoutFuture != null) {
2297 timeoutFuture.cancel(false);
2298 }
2299 final long closeNotifyReadTimeout = closeNotifyReadTimeoutMillis;
2300 if (closeNotifyReadTimeout <= 0) {
2301 // Trigger the close in all cases to make sure the promise is notified
2302 // See https://github.com/netty/netty/issues/2358
2303 addCloseListener(ctx.close(ctx.newPromise()), promise);
2304 } else {
2305 final Future<?> closeNotifyReadTimeoutFuture;
2306
2307 if (!sslClosePromise.isDone()) {
2308 closeNotifyReadTimeoutFuture = ctx.executor().schedule(new Runnable() {
2309 @Override
2310 public void run() {
2311 if (!sslClosePromise.isDone()) {
2312 logger.debug(
2313 "{} did not receive close_notify in {}ms; force-closing the connection.",
2314 ctx.channel(), closeNotifyReadTimeout);
2315
2316 // Do the close now...
2317 addCloseListener(ctx.close(ctx.newPromise()), promise);
2318 }
2319 }
2320 }, closeNotifyReadTimeout, TimeUnit.MILLISECONDS);
2321 } else {
2322 closeNotifyReadTimeoutFuture = null;
2323 }
2324
2325 // Do the close once the we received the close_notify.
2326 sslClosePromise.addListener(new FutureListener<Channel>() {
2327 @Override
2328 public void operationComplete(Future<Channel> future) throws Exception {
2329 if (closeNotifyReadTimeoutFuture != null) {
2330 closeNotifyReadTimeoutFuture.cancel(false);
2331 }
2332 addCloseListener(ctx.close(ctx.newPromise()), promise);
2333 }
2334 });
2335 }
2336 }
2337 });
2338 }
2339
2340 private static void addCloseListener(ChannelFuture future, ChannelPromise promise) {
2341 // We notify the promise in the ChannelPromiseNotifier as there is a "race" where the close(...) call
2342 // by the timeoutFuture and the close call in the flushFuture listener will be called. Because of
2343 // this we need to use trySuccess() and tryFailure(...) as otherwise we can cause an
2344 // IllegalStateException.
2345 // Also we not want to log if the notification happens as this is expected in some cases.
2346 // See https://github.com/netty/netty/issues/5598
2347 PromiseNotifier.cascade(false, future, promise);
2348 }
2349
2350 /**
2351 * Always prefer a direct buffer when it's pooled, so that we reduce the number of memory copies
2352 * in {@link OpenSslEngine}.
2353 */
2354 private ByteBuf allocate(ChannelHandlerContext ctx, int capacity) {
2355 ByteBufAllocator alloc = ctx.alloc();
2356 if (engineType.wantsDirectBuffer) {
2357 return alloc.directBuffer(capacity);
2358 } else {
2359 return alloc.buffer(capacity);
2360 }
2361 }
2362
2363 /**
2364 * Allocates an outbound network buffer for {@link SSLEngine#wrap(ByteBuffer, ByteBuffer)} which can encrypt
2365 * the specified amount of pending bytes.
2366 */
2367 private ByteBuf allocateOutNetBuf(ChannelHandlerContext ctx, int pendingBytes, int numComponents) {
2368 return engineType.allocateWrapBuffer(this, ctx.alloc(), pendingBytes, numComponents);
2369 }
2370
2371 private boolean isStateSet(int bit) {
2372 return (state & bit) == bit;
2373 }
2374
2375 private void setState(int bit) {
2376 state |= bit;
2377 }
2378
2379 private void clearState(int bit) {
2380 state &= ~bit;
2381 }
2382
2383 /**
2384 * Each call to SSL_write will introduce about ~100 bytes of overhead. This coalescing queue attempts to increase
2385 * goodput by aggregating the plaintext in chunks of {@link #wrapDataSize}. If many small chunks are written
2386 * this can increase goodput, decrease the amount of calls to SSL_write, and decrease overall encryption operations.
2387 */
2388 private final class SslHandlerCoalescingBufferQueue extends AbstractCoalescingBufferQueue {
2389
2390 SslHandlerCoalescingBufferQueue(Channel channel, int initSize) {
2391 super(channel, initSize);
2392 }
2393
2394 @Override
2395 protected ByteBuf compose(ByteBufAllocator alloc, ByteBuf cumulation, ByteBuf next) {
2396 final int wrapDataSize = SslHandler.this.wrapDataSize;
2397 if (cumulation instanceof CompositeByteBuf) {
2398 CompositeByteBuf composite = (CompositeByteBuf) cumulation;
2399 int numComponents = composite.numComponents();
2400 if (numComponents == 0 ||
2401 !attemptCopyToCumulation(composite.internalComponent(numComponents - 1), next, wrapDataSize)) {
2402 composite.addComponent(true, next);
2403 }
2404 return composite;
2405 }
2406 return attemptCopyToCumulation(cumulation, next, wrapDataSize) ? cumulation :
2407 copyAndCompose(alloc, cumulation, next);
2408 }
2409
2410 @Override
2411 protected ByteBuf composeFirst(ByteBufAllocator allocator, ByteBuf first) {
2412 if (first instanceof CompositeByteBuf) {
2413 CompositeByteBuf composite = (CompositeByteBuf) first;
2414 if (engineType.wantsDirectBuffer) {
2415 first = allocator.directBuffer(composite.readableBytes());
2416 } else {
2417 first = allocator.heapBuffer(composite.readableBytes());
2418 }
2419 try {
2420 first.writeBytes(composite);
2421 } catch (Throwable cause) {
2422 first.release();
2423 PlatformDependent.throwException(cause);
2424 }
2425 composite.release();
2426 }
2427 return first;
2428 }
2429
2430 @Override
2431 protected ByteBuf removeEmptyValue() {
2432 return null;
2433 }
2434 }
2435
2436 private static boolean attemptCopyToCumulation(ByteBuf cumulation, ByteBuf next, int wrapDataSize) {
2437 final int inReadableBytes = next.readableBytes();
2438 final int cumulationCapacity = cumulation.capacity();
2439 if (wrapDataSize - cumulation.readableBytes() >= inReadableBytes &&
2440 // Avoid using the same buffer if next's data would make cumulation exceed the wrapDataSize.
2441 // Only copy if there is enough space available and the capacity is large enough, and attempt to
2442 // resize if the capacity is small.
2443 (cumulation.isWritable(inReadableBytes) && cumulationCapacity >= wrapDataSize ||
2444 cumulationCapacity < wrapDataSize &&
2445 ensureWritableSuccess(cumulation.ensureWritable(inReadableBytes, false)))) {
2446 cumulation.writeBytes(next);
2447 next.release();
2448 return true;
2449 }
2450 return false;
2451 }
2452
2453 private final class LazyChannelPromise extends DefaultPromise<Channel> {
2454
2455 @Override
2456 protected EventExecutor executor() {
2457 if (ctx == null) {
2458 throw new IllegalStateException();
2459 }
2460 return ctx.executor();
2461 }
2462
2463 @Override
2464 protected void checkDeadLock() {
2465 if (ctx == null) {
2466 // If ctx is null the handlerAdded(...) callback was not called, in this case the checkDeadLock()
2467 // method was called from another Thread then the one that is used by ctx.executor(). We need to
2468 // guard against this as a user can see a race if handshakeFuture().sync() is called but the
2469 // handlerAdded(..) method was not yet as it is called from the EventExecutor of the
2470 // ChannelHandlerContext. If we not guard against this super.checkDeadLock() would cause an
2471 // IllegalStateException when trying to call executor().
2472 return;
2473 }
2474 super.checkDeadLock();
2475 }
2476 }
2477 }