1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16 package io.netty.handler.ssl;
17
18 import io.netty.buffer.ByteBuf;
19 import io.netty.buffer.ByteBufAllocator;
20 import io.netty.buffer.ByteBufUtil;
21 import io.netty.buffer.CompositeByteBuf;
22 import io.netty.buffer.Unpooled;
23 import io.netty.channel.AbstractCoalescingBufferQueue;
24 import io.netty.channel.Channel;
25 import io.netty.channel.ChannelConfig;
26 import io.netty.channel.ChannelException;
27 import io.netty.channel.ChannelFuture;
28 import io.netty.channel.ChannelFutureListener;
29 import io.netty.channel.ChannelHandlerContext;
30 import io.netty.channel.ChannelInboundHandler;
31 import io.netty.channel.ChannelOption;
32 import io.netty.channel.ChannelOutboundBuffer;
33 import io.netty.channel.ChannelOutboundHandler;
34 import io.netty.channel.ChannelPipeline;
35 import io.netty.channel.ChannelPromise;
36 import io.netty.channel.unix.UnixChannel;
37 import io.netty.handler.codec.ByteToMessageDecoder;
38 import io.netty.handler.codec.DecoderException;
39 import io.netty.handler.codec.UnsupportedMessageTypeException;
40 import io.netty.util.ReferenceCountUtil;
41 import io.netty.util.concurrent.DefaultPromise;
42 import io.netty.util.concurrent.EventExecutor;
43 import io.netty.util.concurrent.Future;
44 import io.netty.util.concurrent.FutureListener;
45 import io.netty.util.concurrent.ImmediateExecutor;
46 import io.netty.util.concurrent.Promise;
47 import io.netty.util.concurrent.PromiseNotifier;
48 import io.netty.util.internal.ObjectUtil;
49 import io.netty.util.internal.PlatformDependent;
50 import io.netty.util.internal.ThrowableUtil;
51 import io.netty.util.internal.UnstableApi;
52 import io.netty.util.internal.logging.InternalLogger;
53 import io.netty.util.internal.logging.InternalLoggerFactory;
54
55 import java.io.IOException;
56 import java.net.SocketAddress;
57 import java.nio.ByteBuffer;
58 import java.nio.channels.ClosedChannelException;
59 import java.nio.channels.DatagramChannel;
60 import java.nio.channels.SocketChannel;
61 import java.util.List;
62 import java.util.concurrent.Executor;
63 import java.util.concurrent.RejectedExecutionException;
64 import java.util.concurrent.TimeUnit;
65 import java.util.regex.Pattern;
66
67 import javax.net.ssl.SSLEngine;
68 import javax.net.ssl.SSLEngineResult;
69 import javax.net.ssl.SSLEngineResult.HandshakeStatus;
70 import javax.net.ssl.SSLEngineResult.Status;
71 import javax.net.ssl.SSLException;
72 import javax.net.ssl.SSLHandshakeException;
73 import javax.net.ssl.SSLSession;
74
75 import static io.netty.buffer.ByteBufUtil.ensureWritableSuccess;
76 import static io.netty.handler.ssl.SslUtils.NOT_ENOUGH_DATA;
77 import static io.netty.handler.ssl.SslUtils.getEncryptedPacketLength;
78 import static io.netty.util.internal.ObjectUtil.checkNotNull;
79 import static io.netty.util.internal.ObjectUtil.checkPositiveOrZero;
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171 public class SslHandler extends ByteToMessageDecoder implements ChannelOutboundHandler {
172 private static final InternalLogger logger =
173 InternalLoggerFactory.getInstance(SslHandler.class);
174 private static final Pattern IGNORABLE_CLASS_IN_STACK = Pattern.compile(
175 "^.*(?:Socket|Datagram|Sctp|Udt)Channel.*$");
176 private static final Pattern IGNORABLE_ERROR_MESSAGE = Pattern.compile(
177 "^.*(?:connection.*(?:reset|closed|abort|broken)|broken.*pipe).*$", Pattern.CASE_INSENSITIVE);
178 private static final int STATE_SENT_FIRST_MESSAGE = 1;
179 private static final int STATE_FLUSHED_BEFORE_HANDSHAKE = 1 << 1;
180 private static final int STATE_READ_DURING_HANDSHAKE = 1 << 2;
181 private static final int STATE_HANDSHAKE_STARTED = 1 << 3;
182
183
184
185
186 private static final int STATE_NEEDS_FLUSH = 1 << 4;
187 private static final int STATE_OUTBOUND_CLOSED = 1 << 5;
188 private static final int STATE_CLOSE_NOTIFY = 1 << 6;
189 private static final int STATE_PROCESS_TASK = 1 << 7;
190
191
192
193
194 private static final int STATE_FIRE_CHANNEL_READ = 1 << 8;
195 private static final int STATE_UNWRAP_REENTRY = 1 << 9;
196
197
198
199
200
201 private static final int MAX_PLAINTEXT_LENGTH = 16 * 1024;
202
203 private enum SslEngineType {
204 TCNATIVE(true, COMPOSITE_CUMULATOR) {
205 @Override
206 SSLEngineResult unwrap(SslHandler handler, ByteBuf in, int len, ByteBuf out) throws SSLException {
207 int nioBufferCount = in.nioBufferCount();
208 int writerIndex = out.writerIndex();
209 final SSLEngineResult result;
210 if (nioBufferCount > 1) {
211
212
213
214
215
216 ReferenceCountedOpenSslEngine opensslEngine = (ReferenceCountedOpenSslEngine) handler.engine;
217 try {
218 handler.singleBuffer[0] = toByteBuffer(out, writerIndex, out.writableBytes());
219 result = opensslEngine.unwrap(in.nioBuffers(in.readerIndex(), len), handler.singleBuffer);
220 } finally {
221 handler.singleBuffer[0] = null;
222 }
223 } else {
224 result = handler.engine.unwrap(toByteBuffer(in, in.readerIndex(), len),
225 toByteBuffer(out, writerIndex, out.writableBytes()));
226 }
227 out.writerIndex(writerIndex + result.bytesProduced());
228 return result;
229 }
230
231 @Override
232 ByteBuf allocateWrapBuffer(SslHandler handler, ByteBufAllocator allocator,
233 int pendingBytes, int numComponents) {
234 return allocator.directBuffer(((ReferenceCountedOpenSslEngine) handler.engine)
235 .calculateOutNetBufSize(pendingBytes, numComponents));
236 }
237
238 @Override
239 int calculateRequiredOutBufSpace(SslHandler handler, int pendingBytes, int numComponents) {
240 return ((ReferenceCountedOpenSslEngine) handler.engine)
241 .calculateMaxLengthForWrap(pendingBytes, numComponents);
242 }
243
244 @Override
245 int calculatePendingData(SslHandler handler, int guess) {
246 int sslPending = ((ReferenceCountedOpenSslEngine) handler.engine).sslPending();
247 return sslPending > 0 ? sslPending : guess;
248 }
249
250 @Override
251 boolean jdkCompatibilityMode(SSLEngine engine) {
252 return ((ReferenceCountedOpenSslEngine) engine).jdkCompatibilityMode;
253 }
254 },
255 CONSCRYPT(true, COMPOSITE_CUMULATOR) {
256 @Override
257 SSLEngineResult unwrap(SslHandler handler, ByteBuf in, int len, ByteBuf out) throws SSLException {
258 int nioBufferCount = in.nioBufferCount();
259 int writerIndex = out.writerIndex();
260 final SSLEngineResult result;
261 if (nioBufferCount > 1) {
262
263
264
265 try {
266 handler.singleBuffer[0] = toByteBuffer(out, writerIndex, out.writableBytes());
267 result = ((ConscryptAlpnSslEngine) handler.engine).unwrap(
268 in.nioBuffers(in.readerIndex(), len),
269 handler.singleBuffer);
270 } finally {
271 handler.singleBuffer[0] = null;
272 }
273 } else {
274 result = handler.engine.unwrap(toByteBuffer(in, in.readerIndex(), len),
275 toByteBuffer(out, writerIndex, out.writableBytes()));
276 }
277 out.writerIndex(writerIndex + result.bytesProduced());
278 return result;
279 }
280
281 @Override
282 ByteBuf allocateWrapBuffer(SslHandler handler, ByteBufAllocator allocator,
283 int pendingBytes, int numComponents) {
284 return allocator.directBuffer(
285 ((ConscryptAlpnSslEngine) handler.engine).calculateOutNetBufSize(pendingBytes, numComponents));
286 }
287
288 @Override
289 int calculateRequiredOutBufSpace(SslHandler handler, int pendingBytes, int numComponents) {
290 return ((ConscryptAlpnSslEngine) handler.engine)
291 .calculateRequiredOutBufSpace(pendingBytes, numComponents);
292 }
293
294 @Override
295 int calculatePendingData(SslHandler handler, int guess) {
296 return guess;
297 }
298
299 @Override
300 boolean jdkCompatibilityMode(SSLEngine engine) {
301 return true;
302 }
303 },
304 JDK(false, MERGE_CUMULATOR) {
305 @Override
306 SSLEngineResult unwrap(SslHandler handler, ByteBuf in, int len, ByteBuf out) throws SSLException {
307 int writerIndex = out.writerIndex();
308 ByteBuffer inNioBuffer = toByteBuffer(in, in.readerIndex(), len);
309 int position = inNioBuffer.position();
310 final SSLEngineResult result = handler.engine.unwrap(inNioBuffer,
311 toByteBuffer(out, writerIndex, out.writableBytes()));
312 out.writerIndex(writerIndex + result.bytesProduced());
313
314
315
316
317
318
319
320 if (result.bytesConsumed() == 0) {
321 int consumed = inNioBuffer.position() - position;
322 if (consumed != result.bytesConsumed()) {
323
324 return new SSLEngineResult(
325 result.getStatus(), result.getHandshakeStatus(), consumed, result.bytesProduced());
326 }
327 }
328 return result;
329 }
330
331 @Override
332 ByteBuf allocateWrapBuffer(SslHandler handler, ByteBufAllocator allocator,
333 int pendingBytes, int numComponents) {
334
335
336 return allocator.heapBuffer(Math.max(pendingBytes, handler.engine.getSession().getPacketBufferSize()));
337 }
338
339 @Override
340 int calculateRequiredOutBufSpace(SslHandler handler, int pendingBytes, int numComponents) {
341
342
343
344
345
346
347 return handler.engine.getSession().getPacketBufferSize();
348 }
349
350 @Override
351 int calculatePendingData(SslHandler handler, int guess) {
352 return guess;
353 }
354
355 @Override
356 boolean jdkCompatibilityMode(SSLEngine engine) {
357 return true;
358 }
359 };
360
361 static SslEngineType forEngine(SSLEngine engine) {
362 return engine instanceof ReferenceCountedOpenSslEngine ? TCNATIVE :
363 engine instanceof ConscryptAlpnSslEngine ? CONSCRYPT : JDK;
364 }
365
366 SslEngineType(boolean wantsDirectBuffer, Cumulator cumulator) {
367 this.wantsDirectBuffer = wantsDirectBuffer;
368 this.cumulator = cumulator;
369 }
370
371 abstract SSLEngineResult unwrap(SslHandler handler, ByteBuf in, int len, ByteBuf out) throws SSLException;
372
373 abstract int calculatePendingData(SslHandler handler, int guess);
374
375 abstract boolean jdkCompatibilityMode(SSLEngine engine);
376
377 abstract ByteBuf allocateWrapBuffer(SslHandler handler, ByteBufAllocator allocator,
378 int pendingBytes, int numComponents);
379
380 abstract int calculateRequiredOutBufSpace(SslHandler handler, int pendingBytes, int numComponents);
381
382
383
384
385
386
387
388 final boolean wantsDirectBuffer;
389
390
391
392
393
394
395
396
397
398
399
400 final Cumulator cumulator;
401 }
402
403 private volatile ChannelHandlerContext ctx;
404 private final SSLEngine engine;
405 private final SslEngineType engineType;
406 private final Executor delegatedTaskExecutor;
407 private final boolean jdkCompatibilityMode;
408
409
410
411
412
413
414 private final ByteBuffer[] singleBuffer = new ByteBuffer[1];
415
416 private final boolean startTls;
417
418 private final SslTasksRunner sslTaskRunnerForUnwrap = new SslTasksRunner(true);
419 private final SslTasksRunner sslTaskRunner = new SslTasksRunner(false);
420
421 private SslHandlerCoalescingBufferQueue pendingUnencryptedWrites;
422 private Promise<Channel> handshakePromise = new LazyChannelPromise();
423 private final LazyChannelPromise sslClosePromise = new LazyChannelPromise();
424
425 private int packetLength;
426 private short state;
427
428 private volatile long handshakeTimeoutMillis = 10000;
429 private volatile long closeNotifyFlushTimeoutMillis = 3000;
430 private volatile long closeNotifyReadTimeoutMillis;
431 volatile int wrapDataSize = MAX_PLAINTEXT_LENGTH;
432
433
434
435
436
437
438 public SslHandler(SSLEngine engine) {
439 this(engine, false);
440 }
441
442
443
444
445
446
447
448
449 public SslHandler(SSLEngine engine, boolean startTls) {
450 this(engine, startTls, ImmediateExecutor.INSTANCE);
451 }
452
453
454
455
456
457
458
459
460 public SslHandler(SSLEngine engine, Executor delegatedTaskExecutor) {
461 this(engine, false, delegatedTaskExecutor);
462 }
463
464
465
466
467
468
469
470
471
472
473 public SslHandler(SSLEngine engine, boolean startTls, Executor delegatedTaskExecutor) {
474 this.engine = ObjectUtil.checkNotNull(engine, "engine");
475 this.delegatedTaskExecutor = ObjectUtil.checkNotNull(delegatedTaskExecutor, "delegatedTaskExecutor");
476 engineType = SslEngineType.forEngine(engine);
477 this.startTls = startTls;
478 this.jdkCompatibilityMode = engineType.jdkCompatibilityMode(engine);
479 setCumulator(engineType.cumulator);
480 }
481
482 public long getHandshakeTimeoutMillis() {
483 return handshakeTimeoutMillis;
484 }
485
486 public void setHandshakeTimeout(long handshakeTimeout, TimeUnit unit) {
487 checkNotNull(unit, "unit");
488 setHandshakeTimeoutMillis(unit.toMillis(handshakeTimeout));
489 }
490
491 public void setHandshakeTimeoutMillis(long handshakeTimeoutMillis) {
492 this.handshakeTimeoutMillis = checkPositiveOrZero(handshakeTimeoutMillis, "handshakeTimeoutMillis");
493 }
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515 @UnstableApi
516 public final void setWrapDataSize(int wrapDataSize) {
517 this.wrapDataSize = wrapDataSize;
518 }
519
520
521
522
523 @Deprecated
524 public long getCloseNotifyTimeoutMillis() {
525 return getCloseNotifyFlushTimeoutMillis();
526 }
527
528
529
530
531 @Deprecated
532 public void setCloseNotifyTimeout(long closeNotifyTimeout, TimeUnit unit) {
533 setCloseNotifyFlushTimeout(closeNotifyTimeout, unit);
534 }
535
536
537
538
539 @Deprecated
540 public void setCloseNotifyTimeoutMillis(long closeNotifyFlushTimeoutMillis) {
541 setCloseNotifyFlushTimeoutMillis(closeNotifyFlushTimeoutMillis);
542 }
543
544
545
546
547
548
549 public final long getCloseNotifyFlushTimeoutMillis() {
550 return closeNotifyFlushTimeoutMillis;
551 }
552
553
554
555
556
557
558 public final void setCloseNotifyFlushTimeout(long closeNotifyFlushTimeout, TimeUnit unit) {
559 setCloseNotifyFlushTimeoutMillis(unit.toMillis(closeNotifyFlushTimeout));
560 }
561
562
563
564
565 public final void setCloseNotifyFlushTimeoutMillis(long closeNotifyFlushTimeoutMillis) {
566 this.closeNotifyFlushTimeoutMillis = checkPositiveOrZero(closeNotifyFlushTimeoutMillis,
567 "closeNotifyFlushTimeoutMillis");
568 }
569
570
571
572
573
574
575 public final long getCloseNotifyReadTimeoutMillis() {
576 return closeNotifyReadTimeoutMillis;
577 }
578
579
580
581
582
583
584 public final void setCloseNotifyReadTimeout(long closeNotifyReadTimeout, TimeUnit unit) {
585 setCloseNotifyReadTimeoutMillis(unit.toMillis(closeNotifyReadTimeout));
586 }
587
588
589
590
591 public final void setCloseNotifyReadTimeoutMillis(long closeNotifyReadTimeoutMillis) {
592 this.closeNotifyReadTimeoutMillis = checkPositiveOrZero(closeNotifyReadTimeoutMillis,
593 "closeNotifyReadTimeoutMillis");
594 }
595
596
597
598
599 public SSLEngine engine() {
600 return engine;
601 }
602
603
604
605
606
607
608 public String applicationProtocol() {
609 SSLEngine engine = engine();
610 if (!(engine instanceof ApplicationProtocolAccessor)) {
611 return null;
612 }
613
614 return ((ApplicationProtocolAccessor) engine).getNegotiatedApplicationProtocol();
615 }
616
617
618
619
620
621
622
623 public Future<Channel> handshakeFuture() {
624 return handshakePromise;
625 }
626
627
628
629
630 @Deprecated
631 public ChannelFuture close() {
632 return closeOutbound();
633 }
634
635
636
637
638 @Deprecated
639 public ChannelFuture close(ChannelPromise promise) {
640 return closeOutbound(promise);
641 }
642
643
644
645
646
647
648
649 public ChannelFuture closeOutbound() {
650 return closeOutbound(ctx.newPromise());
651 }
652
653
654
655
656
657
658
659 public ChannelFuture closeOutbound(final ChannelPromise promise) {
660 final ChannelHandlerContext ctx = this.ctx;
661 if (ctx.executor().inEventLoop()) {
662 closeOutbound0(promise);
663 } else {
664 ctx.executor().execute(new Runnable() {
665 @Override
666 public void run() {
667 closeOutbound0(promise);
668 }
669 });
670 }
671 return promise;
672 }
673
674 private void closeOutbound0(ChannelPromise promise) {
675 setState(STATE_OUTBOUND_CLOSED);
676 engine.closeOutbound();
677 try {
678 flush(ctx, promise);
679 } catch (Exception e) {
680 if (!promise.tryFailure(e)) {
681 logger.warn("{} flush() raised a masked exception.", ctx.channel(), e);
682 }
683 }
684 }
685
686
687
688
689
690
691
692
693 public Future<Channel> sslCloseFuture() {
694 return sslClosePromise;
695 }
696
697 @Override
698 public void handlerRemoved0(ChannelHandlerContext ctx) throws Exception {
699 try {
700 if (pendingUnencryptedWrites != null && !pendingUnencryptedWrites.isEmpty()) {
701
702 pendingUnencryptedWrites.releaseAndFailAll(ctx,
703 new ChannelException("Pending write on removal of SslHandler"));
704 }
705 pendingUnencryptedWrites = null;
706
707 SSLException cause = null;
708
709
710
711
712 if (!handshakePromise.isDone()) {
713 cause = new SSLHandshakeException("SslHandler removed before handshake completed");
714 if (handshakePromise.tryFailure(cause)) {
715 ctx.fireUserEventTriggered(new SslHandshakeCompletionEvent(cause));
716 }
717 }
718 if (!sslClosePromise.isDone()) {
719 if (cause == null) {
720 cause = new SSLException("SslHandler removed before SSLEngine was closed");
721 }
722 notifyClosePromise(cause);
723 }
724 } finally {
725 ReferenceCountUtil.release(engine);
726 }
727 }
728
729 @Override
730 public void bind(ChannelHandlerContext ctx, SocketAddress localAddress, ChannelPromise promise) throws Exception {
731 ctx.bind(localAddress, promise);
732 }
733
734 @Override
735 public void connect(ChannelHandlerContext ctx, SocketAddress remoteAddress, SocketAddress localAddress,
736 ChannelPromise promise) throws Exception {
737 ctx.connect(remoteAddress, localAddress, promise);
738 }
739
740 @Override
741 public void deregister(ChannelHandlerContext ctx, ChannelPromise promise) throws Exception {
742 ctx.deregister(promise);
743 }
744
745 @Override
746 public void disconnect(final ChannelHandlerContext ctx,
747 final ChannelPromise promise) throws Exception {
748 closeOutboundAndChannel(ctx, promise, true);
749 }
750
751 @Override
752 public void close(final ChannelHandlerContext ctx,
753 final ChannelPromise promise) throws Exception {
754 closeOutboundAndChannel(ctx, promise, false);
755 }
756
757 @Override
758 public void read(ChannelHandlerContext ctx) throws Exception {
759 if (!handshakePromise.isDone()) {
760 setState(STATE_READ_DURING_HANDSHAKE);
761 }
762
763 ctx.read();
764 }
765
766 private static IllegalStateException newPendingWritesNullException() {
767 return new IllegalStateException("pendingUnencryptedWrites is null, handlerRemoved0 called?");
768 }
769
770 @Override
771 public void write(final ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
772 if (!(msg instanceof ByteBuf)) {
773 UnsupportedMessageTypeException exception = new UnsupportedMessageTypeException(msg, ByteBuf.class);
774 ReferenceCountUtil.safeRelease(msg);
775 promise.setFailure(exception);
776 } else if (pendingUnencryptedWrites == null) {
777 ReferenceCountUtil.safeRelease(msg);
778 promise.setFailure(newPendingWritesNullException());
779 } else {
780 pendingUnencryptedWrites.add((ByteBuf) msg, promise);
781 }
782 }
783
784 @Override
785 public void flush(ChannelHandlerContext ctx) throws Exception {
786
787
788 if (startTls && !isStateSet(STATE_SENT_FIRST_MESSAGE)) {
789 setState(STATE_SENT_FIRST_MESSAGE);
790 pendingUnencryptedWrites.writeAndRemoveAll(ctx);
791 forceFlush(ctx);
792
793
794 startHandshakeProcessing(true);
795 return;
796 }
797
798 if (isStateSet(STATE_PROCESS_TASK)) {
799 return;
800 }
801
802 try {
803 wrapAndFlush(ctx);
804 } catch (Throwable cause) {
805 setHandshakeFailure(ctx, cause);
806 PlatformDependent.throwException(cause);
807 }
808 }
809
810 private void wrapAndFlush(ChannelHandlerContext ctx) throws SSLException {
811 if (pendingUnencryptedWrites.isEmpty()) {
812
813
814
815
816 pendingUnencryptedWrites.add(Unpooled.EMPTY_BUFFER, ctx.newPromise());
817 }
818 if (!handshakePromise.isDone()) {
819 setState(STATE_FLUSHED_BEFORE_HANDSHAKE);
820 }
821 try {
822 wrap(ctx, false);
823 } finally {
824
825
826 forceFlush(ctx);
827 }
828 }
829
830
831 private void wrap(ChannelHandlerContext ctx, boolean inUnwrap) throws SSLException {
832 ByteBuf out = null;
833 ByteBufAllocator alloc = ctx.alloc();
834 try {
835 final int wrapDataSize = this.wrapDataSize;
836
837
838 outer: while (!ctx.isRemoved()) {
839 ChannelPromise promise = ctx.newPromise();
840 ByteBuf buf = wrapDataSize > 0 ?
841 pendingUnencryptedWrites.remove(alloc, wrapDataSize, promise) :
842 pendingUnencryptedWrites.removeFirst(promise);
843 if (buf == null) {
844 break;
845 }
846
847 SSLEngineResult result;
848
849 if (buf.readableBytes() > MAX_PLAINTEXT_LENGTH) {
850
851
852
853
854
855 int readableBytes = buf.readableBytes();
856 int numPackets = readableBytes / MAX_PLAINTEXT_LENGTH;
857 if (readableBytes % MAX_PLAINTEXT_LENGTH != 0) {
858 numPackets += 1;
859 }
860
861 if (out == null) {
862 out = allocateOutNetBuf(ctx, readableBytes, buf.nioBufferCount() + numPackets);
863 }
864 result = wrapMultiple(alloc, engine, buf, out);
865 } else {
866 if (out == null) {
867 out = allocateOutNetBuf(ctx, buf.readableBytes(), buf.nioBufferCount());
868 }
869 result = wrap(alloc, engine, buf, out);
870 }
871
872 if (buf.isReadable()) {
873 pendingUnencryptedWrites.addFirst(buf, promise);
874
875
876 promise = null;
877 } else {
878 buf.release();
879 }
880
881
882
883 if (out.isReadable()) {
884 final ByteBuf b = out;
885 out = null;
886 if (promise != null) {
887 ctx.write(b, promise);
888 } else {
889 ctx.write(b);
890 }
891 } else if (promise != null) {
892 ctx.write(Unpooled.EMPTY_BUFFER, promise);
893 }
894
895
896 if (result.getStatus() == Status.CLOSED) {
897
898
899 if (!pendingUnencryptedWrites.isEmpty()) {
900
901
902 Throwable exception = handshakePromise.cause();
903 if (exception == null) {
904 exception = sslClosePromise.cause();
905 if (exception == null) {
906 exception = new SslClosedEngineException("SSLEngine closed already");
907 }
908 }
909 pendingUnencryptedWrites.releaseAndFailAll(ctx, exception);
910 }
911
912 return;
913 } else {
914 switch (result.getHandshakeStatus()) {
915 case NEED_TASK:
916 if (!runDelegatedTasks(inUnwrap)) {
917
918
919 break outer;
920 }
921 break;
922 case FINISHED:
923 case NOT_HANDSHAKING:
924 setHandshakeSuccess();
925 break;
926 case NEED_WRAP:
927
928
929
930
931 if (result.bytesProduced() > 0 && pendingUnencryptedWrites.isEmpty()) {
932 pendingUnencryptedWrites.add(Unpooled.EMPTY_BUFFER);
933 }
934 break;
935 case NEED_UNWRAP:
936
937
938 readIfNeeded(ctx);
939 return;
940 default:
941 throw new IllegalStateException(
942 "Unknown handshake status: " + result.getHandshakeStatus());
943 }
944 }
945 }
946 } finally {
947 if (out != null) {
948 out.release();
949 }
950 if (inUnwrap) {
951 setState(STATE_NEEDS_FLUSH);
952 }
953 }
954 }
955
956
957
958
959
960
961
962 private boolean wrapNonAppData(final ChannelHandlerContext ctx, boolean inUnwrap) throws SSLException {
963 ByteBuf out = null;
964 ByteBufAllocator alloc = ctx.alloc();
965 try {
966
967
968 outer: while (!ctx.isRemoved()) {
969 if (out == null) {
970
971
972
973 out = allocateOutNetBuf(ctx, 2048, 1);
974 }
975 SSLEngineResult result = wrap(alloc, engine, Unpooled.EMPTY_BUFFER, out);
976 if (result.bytesProduced() > 0) {
977 ctx.write(out).addListener(new ChannelFutureListener() {
978 @Override
979 public void operationComplete(ChannelFuture future) {
980 Throwable cause = future.cause();
981 if (cause != null) {
982 setHandshakeFailureTransportFailure(ctx, cause);
983 }
984 }
985 });
986 if (inUnwrap) {
987 setState(STATE_NEEDS_FLUSH);
988 }
989 out = null;
990 }
991
992 HandshakeStatus status = result.getHandshakeStatus();
993 switch (status) {
994 case FINISHED:
995
996
997
998
999 if (setHandshakeSuccess() && inUnwrap && !pendingUnencryptedWrites.isEmpty()) {
1000 wrap(ctx, true);
1001 }
1002 return false;
1003 case NEED_TASK:
1004 if (!runDelegatedTasks(inUnwrap)) {
1005
1006
1007 break outer;
1008 }
1009 break;
1010 case NEED_UNWRAP:
1011 if (inUnwrap || unwrapNonAppData(ctx) <= 0) {
1012
1013
1014
1015 return false;
1016 }
1017 break;
1018 case NEED_WRAP:
1019 break;
1020 case NOT_HANDSHAKING:
1021 if (setHandshakeSuccess() && inUnwrap && !pendingUnencryptedWrites.isEmpty()) {
1022 wrap(ctx, true);
1023 }
1024
1025
1026 if (!inUnwrap) {
1027 unwrapNonAppData(ctx);
1028 }
1029 return true;
1030 default:
1031 throw new IllegalStateException("Unknown handshake status: " + result.getHandshakeStatus());
1032 }
1033
1034
1035
1036 if (result.bytesProduced() == 0 && status != HandshakeStatus.NEED_TASK) {
1037 break;
1038 }
1039
1040
1041
1042 if (result.bytesConsumed() == 0 && result.getHandshakeStatus() == HandshakeStatus.NOT_HANDSHAKING) {
1043 break;
1044 }
1045 }
1046 } finally {
1047 if (out != null) {
1048 out.release();
1049 }
1050 }
1051 return false;
1052 }
1053
1054 private SSLEngineResult wrapMultiple(ByteBufAllocator alloc, SSLEngine engine, ByteBuf in, ByteBuf out)
1055 throws SSLException {
1056 SSLEngineResult result = null;
1057
1058 do {
1059 int nextSliceSize = Math.min(MAX_PLAINTEXT_LENGTH, in.readableBytes());
1060
1061
1062 int nextOutSize = engineType.calculateRequiredOutBufSpace(this, nextSliceSize, in.nioBufferCount());
1063
1064 if (!out.isWritable(nextOutSize)) {
1065 if (result != null) {
1066
1067
1068 break;
1069 }
1070
1071
1072 out.ensureWritable(nextOutSize);
1073 }
1074
1075 ByteBuf wrapBuf = in.readSlice(nextSliceSize);
1076 result = wrap(alloc, engine, wrapBuf, out);
1077
1078 if (result.getStatus() == Status.CLOSED) {
1079
1080
1081 break;
1082 }
1083
1084 if (wrapBuf.isReadable()) {
1085
1086
1087 in.readerIndex(in.readerIndex() - wrapBuf.readableBytes());
1088 }
1089 } while (in.readableBytes() > 0);
1090
1091 return result;
1092 }
1093
1094 private SSLEngineResult wrap(ByteBufAllocator alloc, SSLEngine engine, ByteBuf in, ByteBuf out)
1095 throws SSLException {
1096 ByteBuf newDirectIn = null;
1097 try {
1098 int readerIndex = in.readerIndex();
1099 int readableBytes = in.readableBytes();
1100
1101
1102
1103 final ByteBuffer[] in0;
1104 if (in.isDirect() || !engineType.wantsDirectBuffer) {
1105
1106
1107
1108
1109 if (!(in instanceof CompositeByteBuf) && in.nioBufferCount() == 1) {
1110 in0 = singleBuffer;
1111
1112
1113 in0[0] = in.internalNioBuffer(readerIndex, readableBytes);
1114 } else {
1115 in0 = in.nioBuffers();
1116 }
1117 } else {
1118
1119
1120
1121 newDirectIn = alloc.directBuffer(readableBytes);
1122 newDirectIn.writeBytes(in, readerIndex, readableBytes);
1123 in0 = singleBuffer;
1124 in0[0] = newDirectIn.internalNioBuffer(newDirectIn.readerIndex(), readableBytes);
1125 }
1126
1127 for (;;) {
1128
1129
1130 ByteBuffer out0 = toByteBuffer(out, out.writerIndex(), out.writableBytes());
1131 SSLEngineResult result = engine.wrap(in0, out0);
1132 in.skipBytes(result.bytesConsumed());
1133 out.writerIndex(out.writerIndex() + result.bytesProduced());
1134
1135 if (result.getStatus() == Status.BUFFER_OVERFLOW) {
1136 out.ensureWritable(engine.getSession().getPacketBufferSize());
1137 } else {
1138 return result;
1139 }
1140 }
1141 } finally {
1142
1143 singleBuffer[0] = null;
1144
1145 if (newDirectIn != null) {
1146 newDirectIn.release();
1147 }
1148 }
1149 }
1150
1151 @Override
1152 public void channelInactive(ChannelHandlerContext ctx) throws Exception {
1153 boolean handshakeFailed = handshakePromise.cause() != null;
1154
1155
1156 ClosedChannelException exception = new ClosedChannelException();
1157
1158
1159 if (isStateSet(STATE_HANDSHAKE_STARTED) && !handshakePromise.isDone()) {
1160 ThrowableUtil.addSuppressed(exception, StacklessSSLHandshakeException.newInstance(
1161 "Connection closed while SSL/TLS handshake was in progress",
1162 SslHandler.class, "channelInactive"));
1163 }
1164
1165
1166
1167 setHandshakeFailure(ctx, exception, !isStateSet(STATE_OUTBOUND_CLOSED), isStateSet(STATE_HANDSHAKE_STARTED),
1168 false);
1169
1170
1171 notifyClosePromise(exception);
1172
1173 try {
1174 super.channelInactive(ctx);
1175 } catch (DecoderException e) {
1176 if (!handshakeFailed || !(e.getCause() instanceof SSLException)) {
1177
1178
1179
1180
1181
1182 throw e;
1183 }
1184 }
1185 }
1186
1187 @Override
1188 public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
1189 if (ignoreException(cause)) {
1190
1191
1192 if (logger.isDebugEnabled()) {
1193 logger.debug(
1194 "{} Swallowing a harmless 'connection reset by peer / broken pipe' error that occurred " +
1195 "while writing close_notify in response to the peer's close_notify", ctx.channel(), cause);
1196 }
1197
1198
1199
1200 if (ctx.channel().isActive()) {
1201 ctx.close();
1202 }
1203 } else {
1204 ctx.fireExceptionCaught(cause);
1205 }
1206 }
1207
1208
1209
1210
1211
1212
1213
1214
1215
1216
1217 private boolean ignoreException(Throwable t) {
1218 if (!(t instanceof SSLException) && t instanceof IOException && sslClosePromise.isDone()) {
1219 String message = t.getMessage();
1220
1221
1222
1223 if (message != null && IGNORABLE_ERROR_MESSAGE.matcher(message).matches()) {
1224 return true;
1225 }
1226
1227
1228 StackTraceElement[] elements = t.getStackTrace();
1229 for (StackTraceElement element: elements) {
1230 String classname = element.getClassName();
1231 String methodname = element.getMethodName();
1232
1233
1234 if (classname.startsWith("io.netty.")) {
1235 continue;
1236 }
1237
1238
1239 if (!"read".equals(methodname)) {
1240 continue;
1241 }
1242
1243
1244
1245 if (IGNORABLE_CLASS_IN_STACK.matcher(classname).matches()) {
1246 return true;
1247 }
1248
1249 try {
1250
1251
1252
1253 Class<?> clazz = PlatformDependent.getClassLoader(getClass()).loadClass(classname);
1254
1255 if (SocketChannel.class.isAssignableFrom(clazz)
1256 || DatagramChannel.class.isAssignableFrom(clazz)) {
1257 return true;
1258 }
1259
1260
1261 if (PlatformDependent.javaVersion() >= 7
1262 && "com.sun.nio.sctp.SctpChannel".equals(clazz.getSuperclass().getName())) {
1263 return true;
1264 }
1265 } catch (Throwable cause) {
1266 if (logger.isDebugEnabled()) {
1267 logger.debug("Unexpected exception while loading class {} classname {}",
1268 getClass(), classname, cause);
1269 }
1270 }
1271 }
1272 }
1273
1274 return false;
1275 }
1276
1277
1278
1279
1280
1281
1282
1283
1284
1285
1286
1287
1288
1289 public static boolean isEncrypted(ByteBuf buffer) {
1290 if (buffer.readableBytes() < SslUtils.SSL_RECORD_HEADER_LENGTH) {
1291 throw new IllegalArgumentException(
1292 "buffer must have at least " + SslUtils.SSL_RECORD_HEADER_LENGTH + " readable bytes");
1293 }
1294 return getEncryptedPacketLength(buffer, buffer.readerIndex()) != SslUtils.NOT_ENCRYPTED;
1295 }
1296
1297 private void decodeJdkCompatible(ChannelHandlerContext ctx, ByteBuf in) throws NotSslRecordException {
1298 int packetLength = this.packetLength;
1299
1300 if (packetLength > 0) {
1301 if (in.readableBytes() < packetLength) {
1302 return;
1303 }
1304 } else {
1305
1306 final int readableBytes = in.readableBytes();
1307 if (readableBytes < SslUtils.SSL_RECORD_HEADER_LENGTH) {
1308 return;
1309 }
1310 packetLength = getEncryptedPacketLength(in, in.readerIndex());
1311 if (packetLength == SslUtils.NOT_ENCRYPTED) {
1312
1313 NotSslRecordException e = new NotSslRecordException(
1314 "not an SSL/TLS record: " + ByteBufUtil.hexDump(in));
1315 in.skipBytes(in.readableBytes());
1316
1317
1318
1319 setHandshakeFailure(ctx, e);
1320
1321 throw e;
1322 }
1323 if (packetLength == NOT_ENOUGH_DATA) {
1324 return;
1325 }
1326 assert packetLength > 0;
1327 if (packetLength > readableBytes) {
1328
1329 this.packetLength = packetLength;
1330 return;
1331 }
1332 }
1333
1334
1335
1336 this.packetLength = 0;
1337 try {
1338 final int bytesConsumed = unwrap(ctx, in, packetLength);
1339 assert bytesConsumed == packetLength || engine.isInboundDone() :
1340 "we feed the SSLEngine a packets worth of data: " + packetLength + " but it only consumed: " +
1341 bytesConsumed;
1342 } catch (Throwable cause) {
1343 handleUnwrapThrowable(ctx, cause);
1344 }
1345 }
1346
1347 private void decodeNonJdkCompatible(ChannelHandlerContext ctx, ByteBuf in) {
1348 try {
1349 unwrap(ctx, in, in.readableBytes());
1350 } catch (Throwable cause) {
1351 handleUnwrapThrowable(ctx, cause);
1352 }
1353 }
1354
1355 private void handleUnwrapThrowable(ChannelHandlerContext ctx, Throwable cause) {
1356 try {
1357
1358
1359
1360
1361 if (handshakePromise.tryFailure(cause)) {
1362 ctx.fireUserEventTriggered(new SslHandshakeCompletionEvent(cause));
1363 }
1364
1365
1366 if (pendingUnencryptedWrites != null) {
1367
1368
1369 wrapAndFlush(ctx);
1370 }
1371 } catch (SSLException ex) {
1372 logger.debug("SSLException during trying to call SSLEngine.wrap(...)" +
1373 " because of an previous SSLException, ignoring...", ex);
1374 } finally {
1375
1376 setHandshakeFailure(ctx, cause, true, false, true);
1377 }
1378 PlatformDependent.throwException(cause);
1379 }
1380
1381 @Override
1382 protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws SSLException {
1383 if (isStateSet(STATE_PROCESS_TASK)) {
1384 return;
1385 }
1386 if (jdkCompatibilityMode) {
1387 decodeJdkCompatible(ctx, in);
1388 } else {
1389 decodeNonJdkCompatible(ctx, in);
1390 }
1391 }
1392
1393 @Override
1394 public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
1395 channelReadComplete0(ctx);
1396 }
1397
1398 private void channelReadComplete0(ChannelHandlerContext ctx) {
1399
1400 discardSomeReadBytes();
1401
1402 flushIfNeeded(ctx);
1403 readIfNeeded(ctx);
1404
1405 clearState(STATE_FIRE_CHANNEL_READ);
1406 ctx.fireChannelReadComplete();
1407 }
1408
1409 private void readIfNeeded(ChannelHandlerContext ctx) {
1410
1411 if (!ctx.channel().config().isAutoRead() &&
1412 (!isStateSet(STATE_FIRE_CHANNEL_READ) || !handshakePromise.isDone())) {
1413
1414
1415 ctx.read();
1416 }
1417 }
1418
1419 private void flushIfNeeded(ChannelHandlerContext ctx) {
1420 if (isStateSet(STATE_NEEDS_FLUSH)) {
1421 forceFlush(ctx);
1422 }
1423 }
1424
1425
1426
1427
1428 private int unwrapNonAppData(ChannelHandlerContext ctx) throws SSLException {
1429 return unwrap(ctx, Unpooled.EMPTY_BUFFER, 0);
1430 }
1431
1432
1433
1434
1435 private int unwrap(ChannelHandlerContext ctx, ByteBuf packet, int length) throws SSLException {
1436 final int originalLength = length;
1437 boolean wrapLater = false;
1438 boolean notifyClosure = false;
1439 boolean executedRead = false;
1440 ByteBuf decodeOut = allocate(ctx, length);
1441 try {
1442
1443
1444 do {
1445 final SSLEngineResult result = engineType.unwrap(this, packet, length, decodeOut);
1446 final Status status = result.getStatus();
1447 final HandshakeStatus handshakeStatus = result.getHandshakeStatus();
1448 final int produced = result.bytesProduced();
1449 final int consumed = result.bytesConsumed();
1450
1451
1452
1453
1454 packet.skipBytes(consumed);
1455 length -= consumed;
1456
1457
1458
1459
1460 if (handshakeStatus == HandshakeStatus.FINISHED || handshakeStatus == HandshakeStatus.NOT_HANDSHAKING) {
1461 wrapLater |= (decodeOut.isReadable() ?
1462 setHandshakeSuccessUnwrapMarkReentry() : setHandshakeSuccess()) ||
1463 handshakeStatus == HandshakeStatus.FINISHED;
1464 }
1465
1466
1467
1468
1469 if (decodeOut.isReadable()) {
1470 setState(STATE_FIRE_CHANNEL_READ);
1471 if (isStateSet(STATE_UNWRAP_REENTRY)) {
1472 executedRead = true;
1473 executeChannelRead(ctx, decodeOut);
1474 } else {
1475 ctx.fireChannelRead(decodeOut);
1476 }
1477 decodeOut = null;
1478 }
1479
1480 if (status == Status.CLOSED) {
1481 notifyClosure = true;
1482 } else if (status == Status.BUFFER_OVERFLOW) {
1483 if (decodeOut != null) {
1484 decodeOut.release();
1485 }
1486 final int applicationBufferSize = engine.getSession().getApplicationBufferSize();
1487
1488
1489
1490
1491 decodeOut = allocate(ctx, engineType.calculatePendingData(this, applicationBufferSize < produced ?
1492 applicationBufferSize : applicationBufferSize - produced));
1493 continue;
1494 }
1495
1496 if (handshakeStatus == HandshakeStatus.NEED_TASK) {
1497 boolean pending = runDelegatedTasks(true);
1498 if (!pending) {
1499
1500
1501
1502
1503
1504 wrapLater = false;
1505 break;
1506 }
1507 } else if (handshakeStatus == HandshakeStatus.NEED_WRAP) {
1508
1509
1510
1511 if (wrapNonAppData(ctx, true) && length == 0) {
1512 break;
1513 }
1514 }
1515
1516 if (status == Status.BUFFER_UNDERFLOW ||
1517
1518 handshakeStatus != HandshakeStatus.NEED_TASK && (consumed == 0 && produced == 0 ||
1519 (length == 0 && handshakeStatus == HandshakeStatus.NOT_HANDSHAKING))) {
1520 if (handshakeStatus == HandshakeStatus.NEED_UNWRAP) {
1521
1522
1523 readIfNeeded(ctx);
1524 }
1525
1526 break;
1527 } else if (decodeOut == null) {
1528 decodeOut = allocate(ctx, length);
1529 }
1530 } while (!ctx.isRemoved());
1531
1532 if (isStateSet(STATE_FLUSHED_BEFORE_HANDSHAKE) && handshakePromise.isDone()) {
1533
1534
1535
1536
1537 clearState(STATE_FLUSHED_BEFORE_HANDSHAKE);
1538 wrapLater = true;
1539 }
1540
1541 if (wrapLater) {
1542 wrap(ctx, true);
1543 }
1544 } finally {
1545 if (decodeOut != null) {
1546 decodeOut.release();
1547 }
1548
1549 if (notifyClosure) {
1550 if (executedRead) {
1551 executeNotifyClosePromise(ctx);
1552 } else {
1553 notifyClosePromise(null);
1554 }
1555 }
1556 }
1557 return originalLength - length;
1558 }
1559
1560 private boolean setHandshakeSuccessUnwrapMarkReentry() {
1561
1562
1563 final boolean setReentryState = !isStateSet(STATE_UNWRAP_REENTRY);
1564 if (setReentryState) {
1565 setState(STATE_UNWRAP_REENTRY);
1566 }
1567 try {
1568 return setHandshakeSuccess();
1569 } finally {
1570
1571
1572 if (setReentryState) {
1573 clearState(STATE_UNWRAP_REENTRY);
1574 }
1575 }
1576 }
1577
1578 private void executeNotifyClosePromise(final ChannelHandlerContext ctx) {
1579 try {
1580 ctx.executor().execute(new Runnable() {
1581 @Override
1582 public void run() {
1583 notifyClosePromise(null);
1584 }
1585 });
1586 } catch (RejectedExecutionException e) {
1587 notifyClosePromise(e);
1588 }
1589 }
1590
1591 private void executeChannelRead(final ChannelHandlerContext ctx, final ByteBuf decodedOut) {
1592 try {
1593 ctx.executor().execute(new Runnable() {
1594 @Override
1595 public void run() {
1596 ctx.fireChannelRead(decodedOut);
1597 }
1598 });
1599 } catch (RejectedExecutionException e) {
1600 decodedOut.release();
1601 throw e;
1602 }
1603 }
1604
1605 private static ByteBuffer toByteBuffer(ByteBuf out, int index, int len) {
1606 return out.nioBufferCount() == 1 ? out.internalNioBuffer(index, len) :
1607 out.nioBuffer(index, len);
1608 }
1609
1610 private static boolean inEventLoop(Executor executor) {
1611 return executor instanceof EventExecutor && ((EventExecutor) executor).inEventLoop();
1612 }
1613
1614
1615
1616
1617
1618
1619
1620
1621 private boolean runDelegatedTasks(boolean inUnwrap) {
1622 if (delegatedTaskExecutor == ImmediateExecutor.INSTANCE || inEventLoop(delegatedTaskExecutor)) {
1623
1624
1625 for (;;) {
1626 Runnable task = engine.getDelegatedTask();
1627 if (task == null) {
1628 return true;
1629 }
1630 setState(STATE_PROCESS_TASK);
1631 if (task instanceof AsyncRunnable) {
1632
1633 boolean pending = false;
1634 try {
1635 AsyncRunnable asyncTask = (AsyncRunnable) task;
1636 AsyncTaskCompletionHandler completionHandler = new AsyncTaskCompletionHandler(inUnwrap);
1637 asyncTask.run(completionHandler);
1638 pending = completionHandler.resumeLater();
1639 if (pending) {
1640 return false;
1641 }
1642 } finally {
1643 if (!pending) {
1644
1645
1646 clearState(STATE_PROCESS_TASK);
1647 }
1648 }
1649 } else {
1650 try {
1651 task.run();
1652 } finally {
1653 clearState(STATE_PROCESS_TASK);
1654 }
1655 }
1656 }
1657 } else {
1658 executeDelegatedTask(inUnwrap);
1659 return false;
1660 }
1661 }
1662
1663 private SslTasksRunner getTaskRunner(boolean inUnwrap) {
1664 return inUnwrap ? sslTaskRunnerForUnwrap : sslTaskRunner;
1665 }
1666
1667 private void executeDelegatedTask(boolean inUnwrap) {
1668 executeDelegatedTask(getTaskRunner(inUnwrap));
1669 }
1670
1671 private void executeDelegatedTask(SslTasksRunner task) {
1672 setState(STATE_PROCESS_TASK);
1673 try {
1674 delegatedTaskExecutor.execute(task);
1675 } catch (RejectedExecutionException e) {
1676 clearState(STATE_PROCESS_TASK);
1677 throw e;
1678 }
1679 }
1680
1681 private final class AsyncTaskCompletionHandler implements Runnable {
1682 private final boolean inUnwrap;
1683 boolean didRun;
1684 boolean resumeLater;
1685
1686 AsyncTaskCompletionHandler(boolean inUnwrap) {
1687 this.inUnwrap = inUnwrap;
1688 }
1689
1690 @Override
1691 public void run() {
1692 didRun = true;
1693 if (resumeLater) {
1694 getTaskRunner(inUnwrap).runComplete();
1695 }
1696 }
1697
1698 boolean resumeLater() {
1699 if (!didRun) {
1700 resumeLater = true;
1701 return true;
1702 }
1703 return false;
1704 }
1705 }
1706
1707
1708
1709
1710
1711 private final class SslTasksRunner implements Runnable {
1712 private final boolean inUnwrap;
1713 private final Runnable runCompleteTask = new Runnable() {
1714 @Override
1715 public void run() {
1716 runComplete();
1717 }
1718 };
1719
1720 SslTasksRunner(boolean inUnwrap) {
1721 this.inUnwrap = inUnwrap;
1722 }
1723
1724
1725 private void taskError(Throwable e) {
1726 if (inUnwrap) {
1727
1728
1729
1730
1731 try {
1732 handleUnwrapThrowable(ctx, e);
1733 } catch (Throwable cause) {
1734 safeExceptionCaught(cause);
1735 }
1736 } else {
1737 setHandshakeFailure(ctx, e);
1738 forceFlush(ctx);
1739 }
1740 }
1741
1742
1743 private void safeExceptionCaught(Throwable cause) {
1744 try {
1745 exceptionCaught(ctx, wrapIfNeeded(cause));
1746 } catch (Throwable error) {
1747 ctx.fireExceptionCaught(error);
1748 }
1749 }
1750
1751 private Throwable wrapIfNeeded(Throwable cause) {
1752 if (!inUnwrap) {
1753
1754 return cause;
1755 }
1756
1757
1758 return cause instanceof DecoderException ? cause : new DecoderException(cause);
1759 }
1760
1761 private void tryDecodeAgain() {
1762 try {
1763 channelRead(ctx, Unpooled.EMPTY_BUFFER);
1764 } catch (Throwable cause) {
1765 safeExceptionCaught(cause);
1766 } finally {
1767
1768
1769
1770 channelReadComplete0(ctx);
1771 }
1772 }
1773
1774
1775
1776
1777
1778 private void resumeOnEventExecutor() {
1779 assert ctx.executor().inEventLoop();
1780 clearState(STATE_PROCESS_TASK);
1781 try {
1782 HandshakeStatus status = engine.getHandshakeStatus();
1783 switch (status) {
1784
1785
1786 case NEED_TASK:
1787 executeDelegatedTask(this);
1788
1789 break;
1790
1791
1792 case FINISHED:
1793
1794 case NOT_HANDSHAKING:
1795 setHandshakeSuccess();
1796 try {
1797
1798
1799 wrap(ctx, inUnwrap);
1800 } catch (Throwable e) {
1801 taskError(e);
1802 return;
1803 }
1804 if (inUnwrap) {
1805
1806
1807 unwrapNonAppData(ctx);
1808 }
1809
1810
1811 forceFlush(ctx);
1812
1813 tryDecodeAgain();
1814 break;
1815
1816
1817
1818 case NEED_UNWRAP:
1819 try {
1820 unwrapNonAppData(ctx);
1821 } catch (SSLException e) {
1822 handleUnwrapThrowable(ctx, e);
1823 return;
1824 }
1825 tryDecodeAgain();
1826 break;
1827
1828
1829
1830 case NEED_WRAP:
1831 try {
1832 if (!wrapNonAppData(ctx, false) && inUnwrap) {
1833
1834
1835
1836
1837 unwrapNonAppData(ctx);
1838 }
1839
1840
1841 forceFlush(ctx);
1842 } catch (Throwable e) {
1843 taskError(e);
1844 return;
1845 }
1846
1847
1848 tryDecodeAgain();
1849 break;
1850
1851 default:
1852
1853 throw new AssertionError();
1854 }
1855 } catch (Throwable cause) {
1856 safeExceptionCaught(cause);
1857 }
1858 }
1859
1860 void runComplete() {
1861 EventExecutor executor = ctx.executor();
1862
1863
1864
1865
1866
1867
1868
1869 executor.execute(new Runnable() {
1870 @Override
1871 public void run() {
1872 resumeOnEventExecutor();
1873 }
1874 });
1875 }
1876
1877 @Override
1878 public void run() {
1879 try {
1880 Runnable task = engine.getDelegatedTask();
1881 if (task == null) {
1882
1883 return;
1884 }
1885 if (task instanceof AsyncRunnable) {
1886 AsyncRunnable asyncTask = (AsyncRunnable) task;
1887 asyncTask.run(runCompleteTask);
1888 } else {
1889 task.run();
1890 runComplete();
1891 }
1892 } catch (final Throwable cause) {
1893 handleException(cause);
1894 }
1895 }
1896
1897 private void handleException(final Throwable cause) {
1898 EventExecutor executor = ctx.executor();
1899 if (executor.inEventLoop()) {
1900 clearState(STATE_PROCESS_TASK);
1901 safeExceptionCaught(cause);
1902 } else {
1903 try {
1904 executor.execute(new Runnable() {
1905 @Override
1906 public void run() {
1907 clearState(STATE_PROCESS_TASK);
1908 safeExceptionCaught(cause);
1909 }
1910 });
1911 } catch (RejectedExecutionException ignore) {
1912 clearState(STATE_PROCESS_TASK);
1913
1914
1915 ctx.fireExceptionCaught(cause);
1916 }
1917 }
1918 }
1919 }
1920
1921
1922
1923
1924
1925
1926 private boolean setHandshakeSuccess() {
1927
1928
1929
1930 final boolean notified;
1931 if (notified = !handshakePromise.isDone() && handshakePromise.trySuccess(ctx.channel())) {
1932 if (logger.isDebugEnabled()) {
1933 SSLSession session = engine.getSession();
1934 logger.debug(
1935 "{} HANDSHAKEN: protocol:{} cipher suite:{}",
1936 ctx.channel(),
1937 session.getProtocol(),
1938 session.getCipherSuite());
1939 }
1940 ctx.fireUserEventTriggered(SslHandshakeCompletionEvent.SUCCESS);
1941 }
1942 if (isStateSet(STATE_READ_DURING_HANDSHAKE)) {
1943 clearState(STATE_READ_DURING_HANDSHAKE);
1944 if (!ctx.channel().config().isAutoRead()) {
1945 ctx.read();
1946 }
1947 }
1948 return notified;
1949 }
1950
1951
1952
1953
1954 private void setHandshakeFailure(ChannelHandlerContext ctx, Throwable cause) {
1955 setHandshakeFailure(ctx, cause, true, true, false);
1956 }
1957
1958
1959
1960
1961 private void setHandshakeFailure(ChannelHandlerContext ctx, Throwable cause, boolean closeInbound,
1962 boolean notify, boolean alwaysFlushAndClose) {
1963 try {
1964
1965 setState(STATE_OUTBOUND_CLOSED);
1966 engine.closeOutbound();
1967
1968 if (closeInbound) {
1969 try {
1970 engine.closeInbound();
1971 } catch (SSLException e) {
1972 if (logger.isDebugEnabled()) {
1973
1974
1975
1976
1977 String msg = e.getMessage();
1978 if (msg == null || !(msg.contains("possible truncation attack") ||
1979 msg.contains("closing inbound before receiving peer's close_notify"))) {
1980 logger.debug("{} SSLEngine.closeInbound() raised an exception.", ctx.channel(), e);
1981 }
1982 }
1983 }
1984 }
1985 if (handshakePromise.tryFailure(cause) || alwaysFlushAndClose) {
1986 SslUtils.handleHandshakeFailure(ctx, cause, notify);
1987 }
1988 } finally {
1989
1990 releaseAndFailAll(ctx, cause);
1991 }
1992 }
1993
1994 private void setHandshakeFailureTransportFailure(ChannelHandlerContext ctx, Throwable cause) {
1995
1996
1997
1998 try {
1999 SSLException transportFailure = new SSLException("failure when writing TLS control frames", cause);
2000 releaseAndFailAll(ctx, transportFailure);
2001 if (handshakePromise.tryFailure(transportFailure)) {
2002 ctx.fireUserEventTriggered(new SslHandshakeCompletionEvent(transportFailure));
2003 }
2004 } finally {
2005 ctx.close();
2006 }
2007 }
2008
2009 private void releaseAndFailAll(ChannelHandlerContext ctx, Throwable cause) {
2010 if (pendingUnencryptedWrites != null) {
2011 pendingUnencryptedWrites.releaseAndFailAll(ctx, cause);
2012 }
2013 }
2014
2015 private void notifyClosePromise(Throwable cause) {
2016 if (cause == null) {
2017 if (sslClosePromise.trySuccess(ctx.channel())) {
2018 ctx.fireUserEventTriggered(SslCloseCompletionEvent.SUCCESS);
2019 }
2020 } else {
2021 if (sslClosePromise.tryFailure(cause)) {
2022 ctx.fireUserEventTriggered(new SslCloseCompletionEvent(cause));
2023 }
2024 }
2025 }
2026
2027 private void closeOutboundAndChannel(
2028 final ChannelHandlerContext ctx, final ChannelPromise promise, boolean disconnect) throws Exception {
2029 setState(STATE_OUTBOUND_CLOSED);
2030 engine.closeOutbound();
2031
2032 if (!ctx.channel().isActive()) {
2033 if (disconnect) {
2034 ctx.disconnect(promise);
2035 } else {
2036 ctx.close(promise);
2037 }
2038 return;
2039 }
2040
2041 ChannelPromise closeNotifyPromise = ctx.newPromise();
2042 try {
2043 flush(ctx, closeNotifyPromise);
2044 } finally {
2045 if (!isStateSet(STATE_CLOSE_NOTIFY)) {
2046 setState(STATE_CLOSE_NOTIFY);
2047
2048
2049
2050
2051
2052
2053
2054
2055 safeClose(ctx, closeNotifyPromise, PromiseNotifier.cascade(false, ctx.newPromise(), promise));
2056 } else {
2057
2058 sslClosePromise.addListener(new FutureListener<Channel>() {
2059 @Override
2060 public void operationComplete(Future<Channel> future) {
2061 promise.setSuccess();
2062 }
2063 });
2064 }
2065 }
2066 }
2067
2068 private void flush(ChannelHandlerContext ctx, ChannelPromise promise) throws Exception {
2069 if (pendingUnencryptedWrites != null) {
2070 pendingUnencryptedWrites.add(Unpooled.EMPTY_BUFFER, promise);
2071 } else {
2072 promise.setFailure(newPendingWritesNullException());
2073 }
2074 flush(ctx);
2075 }
2076
2077 @Override
2078 public void handlerAdded(final ChannelHandlerContext ctx) throws Exception {
2079 this.ctx = ctx;
2080 Channel channel = ctx.channel();
2081 pendingUnencryptedWrites = new SslHandlerCoalescingBufferQueue(channel, 16);
2082
2083 setOpensslEngineSocketFd(channel);
2084 boolean fastOpen = Boolean.TRUE.equals(channel.config().getOption(ChannelOption.TCP_FASTOPEN_CONNECT));
2085 boolean active = channel.isActive();
2086 if (active || fastOpen) {
2087
2088
2089
2090 startHandshakeProcessing(active);
2091
2092
2093 final ChannelOutboundBuffer outboundBuffer;
2094 if (fastOpen && ((outboundBuffer = channel.unsafe().outboundBuffer()) == null ||
2095 outboundBuffer.totalPendingWriteBytes() > 0)) {
2096 setState(STATE_NEEDS_FLUSH);
2097 }
2098 }
2099 }
2100
2101 private void startHandshakeProcessing(boolean flushAtEnd) {
2102 if (!isStateSet(STATE_HANDSHAKE_STARTED)) {
2103 setState(STATE_HANDSHAKE_STARTED);
2104 if (engine.getUseClientMode()) {
2105
2106
2107
2108 handshake(flushAtEnd);
2109 }
2110 applyHandshakeTimeout();
2111 } else if (isStateSet(STATE_NEEDS_FLUSH)) {
2112 forceFlush(ctx);
2113 }
2114 }
2115
2116
2117
2118
2119 public Future<Channel> renegotiate() {
2120 ChannelHandlerContext ctx = this.ctx;
2121 if (ctx == null) {
2122 throw new IllegalStateException();
2123 }
2124
2125 return renegotiate(ctx.executor().<Channel>newPromise());
2126 }
2127
2128
2129
2130
2131 public Future<Channel> renegotiate(final Promise<Channel> promise) {
2132 ObjectUtil.checkNotNull(promise, "promise");
2133
2134 ChannelHandlerContext ctx = this.ctx;
2135 if (ctx == null) {
2136 throw new IllegalStateException();
2137 }
2138
2139 EventExecutor executor = ctx.executor();
2140 if (!executor.inEventLoop()) {
2141 executor.execute(new Runnable() {
2142 @Override
2143 public void run() {
2144 renegotiateOnEventLoop(promise);
2145 }
2146 });
2147 return promise;
2148 }
2149
2150 renegotiateOnEventLoop(promise);
2151 return promise;
2152 }
2153
2154 private void renegotiateOnEventLoop(final Promise<Channel> newHandshakePromise) {
2155 final Promise<Channel> oldHandshakePromise = handshakePromise;
2156 if (!oldHandshakePromise.isDone()) {
2157
2158
2159 PromiseNotifier.cascade(oldHandshakePromise, newHandshakePromise);
2160 } else {
2161 handshakePromise = newHandshakePromise;
2162 handshake(true);
2163 applyHandshakeTimeout();
2164 }
2165 }
2166
2167
2168
2169
2170
2171
2172
2173 private void handshake(boolean flushAtEnd) {
2174 if (engine.getHandshakeStatus() != HandshakeStatus.NOT_HANDSHAKING) {
2175
2176
2177 return;
2178 }
2179 if (handshakePromise.isDone()) {
2180
2181
2182
2183
2184
2185 return;
2186 }
2187
2188
2189 final ChannelHandlerContext ctx = this.ctx;
2190 try {
2191 engine.beginHandshake();
2192 wrapNonAppData(ctx, false);
2193 } catch (Throwable e) {
2194 setHandshakeFailure(ctx, e);
2195 } finally {
2196 if (flushAtEnd) {
2197 forceFlush(ctx);
2198 }
2199 }
2200 }
2201
2202 private void applyHandshakeTimeout() {
2203 final Promise<Channel> localHandshakePromise = this.handshakePromise;
2204
2205
2206 final long handshakeTimeoutMillis = this.handshakeTimeoutMillis;
2207 if (handshakeTimeoutMillis <= 0 || localHandshakePromise.isDone()) {
2208 return;
2209 }
2210
2211 final Future<?> timeoutFuture = ctx.executor().schedule(new Runnable() {
2212 @Override
2213 public void run() {
2214 if (localHandshakePromise.isDone()) {
2215 return;
2216 }
2217 SSLException exception =
2218 new SslHandshakeTimeoutException("handshake timed out after " + handshakeTimeoutMillis + "ms");
2219 try {
2220 if (localHandshakePromise.tryFailure(exception)) {
2221 SslUtils.handleHandshakeFailure(ctx, exception, true);
2222 }
2223 } finally {
2224 releaseAndFailAll(ctx, exception);
2225 }
2226 }
2227 }, handshakeTimeoutMillis, TimeUnit.MILLISECONDS);
2228
2229
2230 localHandshakePromise.addListener(new FutureListener<Channel>() {
2231 @Override
2232 public void operationComplete(Future<Channel> f) throws Exception {
2233 timeoutFuture.cancel(false);
2234 }
2235 });
2236 }
2237
2238 private void forceFlush(ChannelHandlerContext ctx) {
2239 clearState(STATE_NEEDS_FLUSH);
2240 ctx.flush();
2241 }
2242
2243 private void setOpensslEngineSocketFd(Channel c) {
2244 if (c instanceof UnixChannel && engine instanceof ReferenceCountedOpenSslEngine) {
2245 ((ReferenceCountedOpenSslEngine) engine).bioSetFd(((UnixChannel) c).fd().intValue());
2246 }
2247 }
2248
2249
2250
2251
2252 @Override
2253 public void channelActive(final ChannelHandlerContext ctx) throws Exception {
2254 setOpensslEngineSocketFd(ctx.channel());
2255 if (!startTls) {
2256 startHandshakeProcessing(true);
2257 }
2258 ctx.fireChannelActive();
2259 }
2260
2261 private void safeClose(
2262 final ChannelHandlerContext ctx, final ChannelFuture flushFuture,
2263 final ChannelPromise promise) {
2264 if (!ctx.channel().isActive()) {
2265 ctx.close(promise);
2266 return;
2267 }
2268
2269 final Future<?> timeoutFuture;
2270 if (!flushFuture.isDone()) {
2271 long closeNotifyTimeout = closeNotifyFlushTimeoutMillis;
2272 if (closeNotifyTimeout > 0) {
2273
2274 timeoutFuture = ctx.executor().schedule(new Runnable() {
2275 @Override
2276 public void run() {
2277
2278 if (!flushFuture.isDone()) {
2279 logger.warn("{} Last write attempt timed out; force-closing the connection.",
2280 ctx.channel());
2281 addCloseListener(ctx.close(ctx.newPromise()), promise);
2282 }
2283 }
2284 }, closeNotifyTimeout, TimeUnit.MILLISECONDS);
2285 } else {
2286 timeoutFuture = null;
2287 }
2288 } else {
2289 timeoutFuture = null;
2290 }
2291
2292
2293 flushFuture.addListener(new ChannelFutureListener() {
2294 @Override
2295 public void operationComplete(ChannelFuture f) {
2296 if (timeoutFuture != null) {
2297 timeoutFuture.cancel(false);
2298 }
2299 final long closeNotifyReadTimeout = closeNotifyReadTimeoutMillis;
2300 if (closeNotifyReadTimeout <= 0) {
2301
2302
2303 addCloseListener(ctx.close(ctx.newPromise()), promise);
2304 } else {
2305 final Future<?> closeNotifyReadTimeoutFuture;
2306
2307 if (!sslClosePromise.isDone()) {
2308 closeNotifyReadTimeoutFuture = ctx.executor().schedule(new Runnable() {
2309 @Override
2310 public void run() {
2311 if (!sslClosePromise.isDone()) {
2312 logger.debug(
2313 "{} did not receive close_notify in {}ms; force-closing the connection.",
2314 ctx.channel(), closeNotifyReadTimeout);
2315
2316
2317 addCloseListener(ctx.close(ctx.newPromise()), promise);
2318 }
2319 }
2320 }, closeNotifyReadTimeout, TimeUnit.MILLISECONDS);
2321 } else {
2322 closeNotifyReadTimeoutFuture = null;
2323 }
2324
2325
2326 sslClosePromise.addListener(new FutureListener<Channel>() {
2327 @Override
2328 public void operationComplete(Future<Channel> future) throws Exception {
2329 if (closeNotifyReadTimeoutFuture != null) {
2330 closeNotifyReadTimeoutFuture.cancel(false);
2331 }
2332 addCloseListener(ctx.close(ctx.newPromise()), promise);
2333 }
2334 });
2335 }
2336 }
2337 });
2338 }
2339
2340 private static void addCloseListener(ChannelFuture future, ChannelPromise promise) {
2341
2342
2343
2344
2345
2346
2347 PromiseNotifier.cascade(false, future, promise);
2348 }
2349
2350
2351
2352
2353
2354 private ByteBuf allocate(ChannelHandlerContext ctx, int capacity) {
2355 ByteBufAllocator alloc = ctx.alloc();
2356 if (engineType.wantsDirectBuffer) {
2357 return alloc.directBuffer(capacity);
2358 } else {
2359 return alloc.buffer(capacity);
2360 }
2361 }
2362
2363
2364
2365
2366
2367 private ByteBuf allocateOutNetBuf(ChannelHandlerContext ctx, int pendingBytes, int numComponents) {
2368 return engineType.allocateWrapBuffer(this, ctx.alloc(), pendingBytes, numComponents);
2369 }
2370
2371 private boolean isStateSet(int bit) {
2372 return (state & bit) == bit;
2373 }
2374
2375 private void setState(int bit) {
2376 state |= bit;
2377 }
2378
2379 private void clearState(int bit) {
2380 state &= ~bit;
2381 }
2382
2383
2384
2385
2386
2387
2388 private final class SslHandlerCoalescingBufferQueue extends AbstractCoalescingBufferQueue {
2389
2390 SslHandlerCoalescingBufferQueue(Channel channel, int initSize) {
2391 super(channel, initSize);
2392 }
2393
2394 @Override
2395 protected ByteBuf compose(ByteBufAllocator alloc, ByteBuf cumulation, ByteBuf next) {
2396 final int wrapDataSize = SslHandler.this.wrapDataSize;
2397 if (cumulation instanceof CompositeByteBuf) {
2398 CompositeByteBuf composite = (CompositeByteBuf) cumulation;
2399 int numComponents = composite.numComponents();
2400 if (numComponents == 0 ||
2401 !attemptCopyToCumulation(composite.internalComponent(numComponents - 1), next, wrapDataSize)) {
2402 composite.addComponent(true, next);
2403 }
2404 return composite;
2405 }
2406 return attemptCopyToCumulation(cumulation, next, wrapDataSize) ? cumulation :
2407 copyAndCompose(alloc, cumulation, next);
2408 }
2409
2410 @Override
2411 protected ByteBuf composeFirst(ByteBufAllocator allocator, ByteBuf first) {
2412 if (first instanceof CompositeByteBuf) {
2413 CompositeByteBuf composite = (CompositeByteBuf) first;
2414 if (engineType.wantsDirectBuffer) {
2415 first = allocator.directBuffer(composite.readableBytes());
2416 } else {
2417 first = allocator.heapBuffer(composite.readableBytes());
2418 }
2419 try {
2420 first.writeBytes(composite);
2421 } catch (Throwable cause) {
2422 first.release();
2423 PlatformDependent.throwException(cause);
2424 }
2425 composite.release();
2426 }
2427 return first;
2428 }
2429
2430 @Override
2431 protected ByteBuf removeEmptyValue() {
2432 return null;
2433 }
2434 }
2435
2436 private static boolean attemptCopyToCumulation(ByteBuf cumulation, ByteBuf next, int wrapDataSize) {
2437 final int inReadableBytes = next.readableBytes();
2438 final int cumulationCapacity = cumulation.capacity();
2439 if (wrapDataSize - cumulation.readableBytes() >= inReadableBytes &&
2440
2441
2442
2443 (cumulation.isWritable(inReadableBytes) && cumulationCapacity >= wrapDataSize ||
2444 cumulationCapacity < wrapDataSize &&
2445 ensureWritableSuccess(cumulation.ensureWritable(inReadableBytes, false)))) {
2446 cumulation.writeBytes(next);
2447 next.release();
2448 return true;
2449 }
2450 return false;
2451 }
2452
2453 private final class LazyChannelPromise extends DefaultPromise<Channel> {
2454
2455 @Override
2456 protected EventExecutor executor() {
2457 if (ctx == null) {
2458 throw new IllegalStateException();
2459 }
2460 return ctx.executor();
2461 }
2462
2463 @Override
2464 protected void checkDeadLock() {
2465 if (ctx == null) {
2466
2467
2468
2469
2470
2471
2472 return;
2473 }
2474 super.checkDeadLock();
2475 }
2476 }
2477 }