1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16 package org.jboss.netty.handler.ssl;
17
18 import org.jboss.netty.buffer.ChannelBuffer;
19 import org.jboss.netty.buffer.ChannelBufferFactory;
20 import org.jboss.netty.buffer.ChannelBuffers;
21 import org.jboss.netty.channel.Channel;
22 import org.jboss.netty.channel.ChannelDownstreamHandler;
23 import org.jboss.netty.channel.ChannelEvent;
24 import org.jboss.netty.channel.ChannelFuture;
25 import org.jboss.netty.channel.ChannelFutureListener;
26 import org.jboss.netty.channel.ChannelHandlerContext;
27 import org.jboss.netty.channel.ChannelPipeline;
28 import org.jboss.netty.channel.ChannelStateEvent;
29 import org.jboss.netty.channel.Channels;
30 import org.jboss.netty.channel.DefaultChannelFuture;
31 import org.jboss.netty.channel.DownstreamMessageEvent;
32 import org.jboss.netty.channel.ExceptionEvent;
33 import org.jboss.netty.channel.MessageEvent;
34 import org.jboss.netty.handler.codec.frame.FrameDecoder;
35 import org.jboss.netty.logging.InternalLogger;
36 import org.jboss.netty.logging.InternalLoggerFactory;
37 import org.jboss.netty.util.Timeout;
38 import org.jboss.netty.util.Timer;
39 import org.jboss.netty.util.TimerTask;
40 import org.jboss.netty.util.internal.DetectionUtil;
41 import org.jboss.netty.util.internal.NonReentrantLock;
42
43 import javax.net.ssl.SSLEngine;
44 import javax.net.ssl.SSLEngineResult;
45 import javax.net.ssl.SSLEngineResult.HandshakeStatus;
46 import javax.net.ssl.SSLEngineResult.Status;
47 import javax.net.ssl.SSLException;
48 import java.io.IOException;
49 import java.nio.ByteBuffer;
50 import java.nio.channels.ClosedChannelException;
51 import java.nio.channels.DatagramChannel;
52 import java.nio.channels.SocketChannel;
53 import java.util.ArrayList;
54 import java.util.LinkedList;
55 import java.util.List;
56 import java.util.Queue;
57 import java.util.concurrent.ConcurrentLinkedQueue;
58 import java.util.concurrent.TimeUnit;
59 import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
60 import java.util.regex.Pattern;
61
62 import static org.jboss.netty.channel.Channels.*;
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180 public class SslHandler extends FrameDecoder
181 implements ChannelDownstreamHandler {
182
// Logger shared by all SslHandler instances.
private static final InternalLogger logger = InternalLoggerFactory.getInstance(SslHandler.class);

// Zero-length buffer handed to SSLEngine.wrap()/unwrap() when no application data is involved
// (see wrapNonAppData() and unwrapNonAppData()).
private static final ByteBuffer EMPTY_BUFFER = ByteBuffer.allocate(0);

// Matches class names containing SocketChannel, DatagramChannel, SctpChannel or UdtChannel;
// ignoreException() applies it to stack frames.
private static final Pattern IGNORABLE_CLASS_IN_STACK = Pattern.compile(
        "^.*(?:Socket|Datagram|Sctp|Udt)Channel.*$");
// Matches, case-insensitively, messages such as "connection reset" or "broken pipe";
// ignoreException() applies it to exception messages.
private static final Pattern IGNORABLE_ERROR_MESSAGE = Pattern.compile(
        "^.*(?:connection.*(?:reset|closed|abort|broken)|broken.*pipe).*$", Pattern.CASE_INSENSITIVE);

// Created on first use by getDefaultBufferPool().
private static SslBufferPool defaultBufferPool;
193
194
195
196
197
198 public static synchronized SslBufferPool getDefaultBufferPool() {
199 if (defaultBufferPool == null) {
200 defaultBufferPool = new SslBufferPool();
201 }
202 return defaultBufferPool;
203 }
204
// Context of this handler; assigned in beforeAdd() and null before that.
private volatile ChannelHandlerContext ctx;
// Engine that performs the SSL/TLS processing; supplied to the constructor.
private final SSLEngine engine;
// Pool from which the temporary NIO buffers for wrap()/unwrap() are acquired.
private final SslBufferPool bufferPool;
// When true, the first ChannelBuffer written downstream is passed through unencrypted
// (see handleDownstream()).
private final boolean startTls;

// Whether handshake() may be started again after a handshake has completed.
private volatile boolean enableRenegotiation = true;

// Monitor used to serialize handshake state changes and most calls into the SSLEngine.
final Object handshakeLock = new Object();
// True while a handshake is in progress.
private boolean handshaking;
// True once a handshake has completed successfully; reset by setHandshakeFailure().
private volatile boolean handshaken;
// Future of the current or most recent handshake; null until handshake(),
// setHandshakeSuccess() or setHandshakeFailure() creates one.
private volatile ChannelFuture handshakeFuture;

// The three int fields below are 0/1 flags that are only modified through the
// AtomicIntegerFieldUpdaters declared after them.
// Set to 1 when the first downstream message was passed through in startTls mode.
@SuppressWarnings("UnusedDeclaration")
private volatile int sentFirstMessage;
// Set to 1 by closeOutboundAndChannel() right before it closes the outbound side of the engine.
@SuppressWarnings("UnusedDeclaration")
private volatile int sentCloseNotify;
// Set to 1 by the first call to closeOutboundAndChannel() on a connected channel.
@SuppressWarnings("UnusedDeclaration")
private volatile int closedOutboundAndChannel;

private static final AtomicIntegerFieldUpdater<SslHandler> SENT_FIRST_MESSAGE_UPDATER =
        AtomicIntegerFieldUpdater.newUpdater(SslHandler.class, "sentFirstMessage");
private static final AtomicIntegerFieldUpdater<SslHandler> SENT_CLOSE_NOTIFY_UPDATER =
        AtomicIntegerFieldUpdater.newUpdater(SslHandler.class, "sentCloseNotify");
private static final AtomicIntegerFieldUpdater<SslHandler> CLOSED_OUTBOUND_AND_CHANNEL_UPDATER =
        AtomicIntegerFieldUpdater.newUpdater(SslHandler.class, "closedOutboundAndChannel");

// Number of ClosedChannelExceptions that exceptionCaught() should still swallow; incremented by
// the write listener in wrapNonAppData(). Guarded by ignoreClosedChannelExceptionLock.
int ignoreClosedChannelException;
final Object ignoreClosedChannelExceptionLock = new Object();
// Application writes waiting to be encrypted by wrap(); guarded by pendingUnencryptedWritesLock.
private final Queue<PendingWrite> pendingUnencryptedWrites = new LinkedList<PendingWrite>();
private final NonReentrantLock pendingUnencryptedWritesLock = new NonReentrantLock();
// Encrypted writes waiting to be sent downstream by flushPendingEncryptedWrites().
private final Queue<MessageEvent> pendingEncryptedWrites = new ConcurrentLinkedQueue<MessageEvent>();
private final NonReentrantLock pendingEncryptedWritesLock = new NonReentrantLock();

// Value exposed through setIssueHandshake()/isIssueHandshake().
private volatile boolean issueHandshake;
// Set by handleDownstream() when data is written before the handshake future is done;
// cleared by unwrap(), which then triggers another wrap().
private volatile boolean writeBeforeHandshakeDone;
// Future returned by getSSLEngineInboundCloseFuture(); unwrap() calls setClosed() on it
// when the engine reports Status.CLOSED.
private final SSLEngineInboundCloseFuture sslEngineCloseFuture = new SSLEngineInboundCloseFuture();

// When true, the channel is closed after an SSL failure has been reported.
private boolean closeOnSslException;

// Length of a record that decode() found only partially received; 0 when no record is pending.
private int packetLength;

// Timer used for the handshake timeout; may be null when no timeout is configured.
private final Timer timer;
// Handshake timeout in milliseconds; a value <= 0 disables the timeout.
private final long handshakeTimeoutInMillis;
// Pending timeout task created by handshake(); null when none was scheduled.
private Timeout handshakeTimeout;
249
250
251
252
253
254
/**
 * Creates a new instance that uses the default buffer pool, does not use
 * startTls and has no handshake timeout.
 *
 * @param engine the {@link SSLEngine} this handler will use
 */
public SslHandler(SSLEngine engine) {
    this(engine, getDefaultBufferPool(), false, null, 0);
}
258
259
260
261
262
263
264
265
/**
 * Creates a new instance with a caller-supplied buffer pool, without
 * startTls and without a handshake timeout.
 *
 * @param engine     the {@link SSLEngine} this handler will use
 * @param bufferPool the {@link SslBufferPool} to acquire temporary buffers from
 */
public SslHandler(SSLEngine engine, SslBufferPool bufferPool) {
    this(engine, bufferPool, false, null, 0);
}
269
270
271
272
273
274
275
276
/**
 * Creates a new instance that uses the default buffer pool and has no
 * handshake timeout.
 *
 * @param engine   the {@link SSLEngine} this handler will use
 * @param startTls {@code true} if the first write request shall not be encrypted
 */
public SslHandler(SSLEngine engine, boolean startTls) {
    this(engine, getDefaultBufferPool(), startTls);
}
280
281
282
283
284
285
286
287
288
289
/**
 * Creates a new instance with a caller-supplied buffer pool and without a
 * handshake timeout.
 *
 * @param engine     the {@link SSLEngine} this handler will use
 * @param bufferPool the {@link SslBufferPool} to acquire temporary buffers from
 * @param startTls   {@code true} if the first write request shall not be encrypted
 */
public SslHandler(SSLEngine engine, SslBufferPool bufferPool, boolean startTls) {
    this(engine, bufferPool, startTls, null, 0);
}
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312 public SslHandler(SSLEngine engine, SslBufferPool bufferPool, boolean startTls, Timer timer,
313 long handshakeTimeoutInMillis) {
314 if (engine == null) {
315 throw new NullPointerException("engine");
316 }
317 if (bufferPool == null) {
318 throw new NullPointerException("bufferPool");
319 }
320 if (timer == null && handshakeTimeoutInMillis > 0) {
321 throw new IllegalArgumentException("No Timer was given but a handshakeTimeoutInMillis, need both or none");
322 }
323
324 this.engine = engine;
325 this.bufferPool = bufferPool;
326 this.startTls = startTls;
327 this.timer = timer;
328 this.handshakeTimeoutInMillis = handshakeTimeoutInMillis;
329 }
330
331
332
333
/**
 * Returns the {@link SSLEngine} that was passed to the constructor.
 *
 * @return the engine used by this handler
 */
public SSLEngine getEngine() {
    return engine;
}
337
338
339
340
341
342
343
/**
 * Starts an SSL/TLS handshake on the channel this handler is attached to.
 * <p>
 * If a handshake is already in progress, the future of that handshake is
 * returned and nothing else happens. Otherwise the engine is asked to begin
 * a handshake, an optional timeout task is scheduled and the first
 * handshake records are written via {@code wrapNonAppData()}. The whole
 * method runs while holding {@code handshakeLock}.
 *
 * @return the future that is notified when the handshake succeeds or fails
 * @throws IllegalStateException if a handshake has already completed and
 *                               renegotiation is disabled
 */
public ChannelFuture handshake() {
    synchronized (handshakeLock) {
        if (handshaken && !isEnableRenegotiation()) {
            throw new IllegalStateException("renegotiation disabled");
        }

        // NOTE(review): this.ctx is only assigned in beforeAdd(); calling handshake() before the
        // handler is added to a pipeline looks like it fails with a NullPointerException below.
        final ChannelHandlerContext ctx = this.ctx;
        final Channel channel = ctx.getChannel();
        ChannelFuture handshakeFuture;
        Exception exception = null;

        if (handshaking) {
            // A handshake is already running; hand out its future instead of starting another.
            return this.handshakeFuture;
        }

        handshaking = true;
        try {
            engine.beginHandshake();
            runDelegatedTasks();
            handshakeFuture = this.handshakeFuture = future(channel);
            if (handshakeTimeoutInMillis > 0) {
                // Fail the handshake if it has not completed when the timer fires.
                handshakeTimeout = timer.newTimeout(new TimerTask() {
                    public void run(Timeout timeout) throws Exception {
                        ChannelFuture future = SslHandler.this.handshakeFuture;
                        if (future != null && future.isDone()) {
                            // Handshake finished (either way) before the timeout; nothing to do.
                            return;
                        }

                        setHandshakeFailure(channel, new SSLException("Handshake did not complete within " +
                                handshakeTimeoutInMillis + "ms"));
                    }
                }, handshakeTimeoutInMillis, TimeUnit.MILLISECONDS);
            }
        } catch (Exception e) {
            // Could not even begin the handshake; remember the failure and report it below.
            handshakeFuture = this.handshakeFuture = failedFuture(channel, e);
            exception = e;
        }

        if (exception == null) {
            // The handshake was started; send the initial handshake data.
            try {
                final ChannelFuture hsFuture = handshakeFuture;
                wrapNonAppData(ctx, channel).addListener(new ChannelFutureListener() {
                    public void operationComplete(ChannelFuture future) throws Exception {
                        if (!future.isSuccess()) {
                            // Writing the handshake data failed; propagate to the handshake future.
                            Throwable cause = future.getCause();
                            hsFuture.setFailure(cause);

                            fireExceptionCaught(ctx, cause);
                            if (closeOnSslException) {
                                Channels.close(ctx, future(channel));
                            }
                        }
                    }
                });
            } catch (SSLException e) {
                handshakeFuture.setFailure(e);

                fireExceptionCaught(ctx, e);
                if (closeOnSslException) {
                    Channels.close(ctx, future(channel));
                }
            }
        } else {
            // Beginning the handshake failed; the future is already failed, so only notify handlers.
            fireExceptionCaught(ctx, exception);
            if (closeOnSslException) {
                Channels.close(ctx, future(channel));
            }
        }
        return handshakeFuture;
    }
}
415
416
417
418
419
420 public ChannelFuture close() {
421 ChannelHandlerContext ctx = this.ctx;
422 Channel channel = ctx.getChannel();
423 try {
424 engine.closeOutbound();
425 return wrapNonAppData(ctx, channel);
426 } catch (SSLException e) {
427 fireExceptionCaught(ctx, e);
428 if (closeOnSslException) {
429 Channels.close(ctx, future(channel));
430 }
431 return failedFuture(channel, e);
432 }
433 }
434
435
436
437
/**
 * Returns whether renegotiation is enabled. The flag defaults to {@code true}.
 *
 * @return {@code true} if a handshake may be started again after one has completed
 */
public boolean isEnableRenegotiation() {
    return enableRenegotiation;
}
441
442
443
444
/**
 * Enables or disables renegotiation.
 *
 * @param enableRenegotiation {@code true} to allow a new handshake after one has completed
 */
public void setEnableRenegotiation(boolean enableRenegotiation) {
    this.enableRenegotiation = enableRenegotiation;
}
448
449
450
451
452
453
/**
 * Sets the {@code issueHandshake} flag.
 * <p>
 * NOTE(review): the code that reads this flag is outside the visible part of
 * the file; presumably it makes the handler start the handshake on its own
 * once the channel is connected - confirm.
 *
 * @param issueHandshake the new value of the flag
 */
public void setIssueHandshake(boolean issueHandshake) {
    this.issueHandshake = issueHandshake;
}
457
458
459
460
/**
 * Returns the current value of the {@code issueHandshake} flag.
 *
 * @return the value last passed to {@link #setIssueHandshake(boolean)}, {@code false} by default
 */
public boolean isIssueHandshake() {
    return issueHandshake;
}
464
465
466
467
468
469
470
471
472
/**
 * Returns the future tied to the inbound close of the {@link SSLEngine}.
 * {@code unwrap()} marks it as closed when the engine reports
 * {@code Status.CLOSED}.
 *
 * @return the inbound-close future of this handler; the same instance on every call
 */
public ChannelFuture getSSLEngineInboundCloseFuture() {
    return sslEngineCloseFuture;
}
476
477
478
479
480
/**
 * Returns the handshake timeout given to the constructor.
 *
 * @return the timeout in milliseconds; a value {@code <= 0} means no timeout is scheduled
 */
public long getHandshakeTimeout() {
    return handshakeTimeoutInMillis;
}
484
485
486
487
488
489
490
491
492
493
/**
 * Configures whether the channel is closed after an SSL failure has been
 * reported. Must be called before the handler is added to a pipeline.
 *
 * @param closeOnSslException {@code true} to close the channel on SSL failures
 * @throws IllegalStateException if the handler already has a context, i.e.
 *                               {@code beforeAdd()} has been called
 */
public void setCloseOnSSLException(boolean closeOnSslException) {
    if (ctx != null) {
        throw new IllegalStateException("Can only get changed before attached to ChannelPipeline");
    }
    this.closeOnSslException = closeOnSslException;
}
500
/**
 * Returns whether the channel is closed after an SSL failure has been reported.
 *
 * @return the value set through {@link #setCloseOnSSLException(boolean)}, {@code false} by default
 */
public boolean getCloseOnSSLException() {
    return closeOnSslException;
}
504
/**
 * Intercepts downstream events.
 * <ul>
 * <li>OPEN/CONNECTED/BOUND state events whose value is {@code FALSE} or
 *     {@code null} are routed to {@code closeOutboundAndChannel()}.</li>
 * <li>Events that are not {@link MessageEvent}s carrying a
 *     {@link ChannelBuffer} are forwarded unchanged.</li>
 * <li>In startTls mode the very first buffer is forwarded unencrypted.</li>
 * <li>Every other buffer is queued as a pending write and encrypted by
 *     {@code wrap()}.</li>
 * </ul>
 *
 * @param context the context of this handler
 * @param evt     the downstream event to process
 * @throws Exception if wrapping the queued data fails
 */
public void handleDownstream(
        final ChannelHandlerContext context, final ChannelEvent evt) throws Exception {
    if (evt instanceof ChannelStateEvent) {
        ChannelStateEvent e = (ChannelStateEvent) evt;
        switch (e.getState()) {
        case OPEN:
        case CONNECTED:
        case BOUND:
            // All three cases share this check; other states fall out of the switch untouched.
            if (Boolean.FALSE.equals(e.getValue()) || e.getValue() == null) {
                closeOutboundAndChannel(context, e);
                return;
            }
        }
    }
    if (!(evt instanceof MessageEvent)) {
        context.sendDownstream(evt);
        return;
    }

    MessageEvent e = (MessageEvent) evt;
    if (!(e.getMessage() instanceof ChannelBuffer)) {
        // Only ChannelBuffers can be encrypted; everything else is passed through as-is.
        context.sendDownstream(evt);
        return;
    }

    // The compareAndSet succeeds exactly once, so only the first buffer written in
    // startTls mode bypasses encryption.
    if (startTls && SENT_FIRST_MESSAGE_UPDATER.compareAndSet(this, 0, 1)) {
        context.sendDownstream(evt);
        return;
    }

    // Queue the data; an empty buffer is represented by a null outAppBuf.
    ChannelBuffer msg = (ChannelBuffer) e.getMessage();
    PendingWrite pendingWrite;

    if (msg.readable()) {
        pendingWrite = new PendingWrite(evt.getFuture(), msg.toByteBuffer(msg.readerIndex(), msg.readableBytes()));
    } else {
        pendingWrite = new PendingWrite(evt.getFuture(), null);
    }

    pendingUnencryptedWritesLock.lock();
    try {
        pendingUnencryptedWrites.add(pendingWrite);
    } finally {
        pendingUnencryptedWritesLock.unlock();
    }

    if (handshakeFuture == null || !handshakeFuture.isDone()) {
        // Remember that data was written before the handshake finished so that
        // unwrap() triggers another wrap() once it sees NOT_HANDSHAKING.
        writeBeforeHandshakeDone = true;
    }
    wrap(context, evt.getChannel());
}
559
560 private void cancelHandshakeTimeout() {
561 if (handshakeTimeout != null) {
562
563 handshakeTimeout.cancel();
564 }
565 }
566
/**
 * Handles the disconnection of the channel: fails a handshake that is still
 * in progress with a {@link ClosedChannelException}, lets the superclass
 * process the event, then performs a final unwrap of non-application data
 * and closes the engine.
 *
 * @param ctx the context of this handler
 * @param e   the disconnect event
 * @throws Exception if the superclass or the final unwrap fails
 */
@Override
public void channelDisconnected(ChannelHandlerContext ctx, ChannelStateEvent e) throws Exception {
    // A handshake that is still running can never complete once the channel is gone.
    synchronized (handshakeLock) {
        if (handshaking) {
            cancelHandshakeTimeout();
            handshakeFuture.setFailure(new ClosedChannelException());
        }
    }

    try {
        super.channelDisconnected(ctx, e);
    } finally {
        // Runs even if the superclass throws: unwrap once more (without starting a
        // handshake) and release the engine.
        unwrapNonAppData(ctx, e.getChannel(), false);
        closeEngine();
    }
}
586
587 private void closeEngine() {
588 engine.closeOutbound();
589 if (sentCloseNotify == 0 && handshaken) {
590 try {
591 engine.closeInbound();
592 } catch (SSLException ex) {
593 if (logger.isDebugEnabled()) {
594 logger.debug("Failed to clean up SSLEngine.", ex);
595 }
596 }
597 }
598 }
599
600 @Override
601 public void exceptionCaught(ChannelHandlerContext ctx, ExceptionEvent e)
602 throws Exception {
603
604 Throwable cause = e.getCause();
605 if (cause instanceof IOException) {
606 if (cause instanceof ClosedChannelException) {
607 synchronized (ignoreClosedChannelExceptionLock) {
608 if (ignoreClosedChannelException > 0) {
609 ignoreClosedChannelException --;
610 if (logger.isDebugEnabled()) {
611 logger.debug(
612 "Swallowing an exception raised while " +
613 "writing non-app data", cause);
614 }
615
616 return;
617 }
618 }
619 } else {
620 if (ignoreException(cause)) {
621 return;
622 }
623 }
624 }
625
626 ctx.sendUpstream(e);
627 }
628
629
630
631
632
633
634
635
636
637
638 private boolean ignoreException(Throwable t) {
639 if (!(t instanceof SSLException) && t instanceof IOException && engine.isOutboundDone()) {
640 String message = String.valueOf(t.getMessage()).toLowerCase();
641
642
643
644 if (IGNORABLE_ERROR_MESSAGE.matcher(message).matches()) {
645 return true;
646 }
647
648
649 StackTraceElement[] elements = t.getStackTrace();
650 for (StackTraceElement element: elements) {
651 String classname = element.getClassName();
652 String methodname = element.getMethodName();
653
654
655 if (classname.startsWith("org.jboss.netty.")) {
656 continue;
657 }
658
659
660 if (!"read".equals(methodname)) {
661 continue;
662 }
663
664
665
666 if (IGNORABLE_CLASS_IN_STACK.matcher(classname).matches()) {
667 return true;
668 }
669
670 try {
671
672
673
674 Class<?> clazz = getClass().getClassLoader().loadClass(classname);
675
676 if (SocketChannel.class.isAssignableFrom(clazz)
677 || DatagramChannel.class.isAssignableFrom(clazz)) {
678 return true;
679 }
680
681
682 if (DetectionUtil.javaVersion() >= 7
683 && "com.sun.nio.sctp.SctpChannel".equals(clazz.getSuperclass().getName())) {
684 return true;
685 }
686 } catch (ClassNotFoundException e) {
687
688 }
689 }
690 }
691
692 return false;
693 }
694
695
696
697
698
699
700
701
702
703
704
705
706
707 public static boolean isEncrypted(ChannelBuffer buffer) {
708 return getEncryptedPacketLength(buffer, buffer.readerIndex()) != -1;
709 }
710
711
712
713
714
715
716
717
718
719
720
721
722
723
/**
 * Determines the total length (header included) of the SSL/TLS record that
 * starts at {@code offset}, without modifying the buffer's indexes.
 * <p>
 * The header is first interpreted as an SSLv3/TLS record; if that does not
 * fit, it is interpreted as an SSLv2 record. The method reads up to the byte
 * at {@code offset + 4}, so callers must make sure at least five bytes are
 * available from {@code offset}.
 *
 * @param buffer the buffer holding the record header
 * @param offset absolute index at which the record starts
 * @return the length of the record in bytes, or {@code -1} if the bytes do
 *         not look like an SSL/TLS record
 */
private static int getEncryptedPacketLength(ChannelBuffer buffer, int offset) {
    int packetLength = 0;

    // Step 1: check the first byte against the accepted record type values.
    // (20..23 are presumably the TLS record content types - confirm against the TLS spec.)
    boolean tls;
    switch (buffer.getUnsignedByte(offset)) {
    case 20:
    case 21:
    case 22:
    case 23:
        tls = true;
        break;
    default:
        // Not a known record type; fall through to the SSLv2 check below.
        tls = false;
    }

    if (tls) {
        // Step 2: the major version byte must be 3.
        int majorVersion = buffer.getUnsignedByte(offset + 1);
        if (majorVersion == 3) {
            // Step 3: the 16-bit length at offset + 3 plus the 5-byte header.
            packetLength = (getShort(buffer, offset + 3) & 0xFFFF) + 5;
            if (packetLength <= 5) {
                // A record without payload is not accepted as SSLv3/TLS.
                tls = false;
            }
        } else {
            // Unexpected major version for an SSLv3/TLS record.
            tls = false;
        }
    }

    if (!tls) {
        // Not SSLv3/TLS; try to interpret the header as SSLv2.
        boolean sslv2 = true;
        // The high bit of the first byte selects a 2-byte or a 3-byte header.
        int headerLength = (buffer.getUnsignedByte(offset) & 0x80) != 0 ? 2 : 3;
        int majorVersion = buffer.getUnsignedByte(offset + headerLength + 1);
        if (majorVersion == 2 || majorVersion == 3) {
            // The length occupies 15 bits with a 2-byte header and 14 bits with a 3-byte header.
            if (headerLength == 2) {
                packetLength = (getShort(buffer, offset) & 0x7FFF) + 2;
            } else {
                packetLength = (getShort(buffer, offset) & 0x3FFF) + 3;
            }
            if (packetLength <= headerLength) {
                sslv2 = false;
            }
        } else {
            sslv2 = false;
        }

        if (!sslv2) {
            return -1;
        }
    }
    return packetLength;
}
782
/**
 * Collects as many complete SSL/TLS records as are available in {@code in}
 * (up to {@code OpenSslEngine.MAX_ENCRYPTED_PACKET_LENGTH} bytes in total),
 * consumes them from the buffer and decrypts them in a single
 * {@code unwrap()} call.
 * <p>
 * When data is found that does not look like an SSL/TLS record, the rest of
 * the buffer is discarded and a {@link NotSslRecordException} is either
 * fired upstream followed by a channel close (if {@code closeOnSslException}
 * is set) or thrown.
 *
 * @param ctx     the context of this handler
 * @param channel the channel the data was received on
 * @param in      the cumulative buffer of received bytes
 * @return the decrypted application data, or {@code null} if more input is
 *         needed or no application data was produced
 * @throws Exception if unwrapping fails or non-SSL data is encountered
 */
@Override
protected Object decode(
        final ChannelHandlerContext ctx, Channel channel, ChannelBuffer in) throws Exception {

    final int startOffset = in.readerIndex();
    final int endOffset = in.writerIndex();
    int offset = startOffset;
    int totalLength = 0;

    // A previous call found a record that was not fully received yet and stored its length.
    if (packetLength > 0) {
        if (endOffset - startOffset < packetLength) {
            // Still incomplete; wait for more data.
            return null;
        } else {
            offset += packetLength;
            totalLength = packetLength;
            packetLength = 0;
        }
    }

    boolean nonSslRecord = false;

    while (totalLength < OpenSslEngine.MAX_ENCRYPTED_PACKET_LENGTH) {
        final int readableBytes = endOffset - offset;
        if (readableBytes < 5) {
            // Not enough bytes for a record header.
            break;
        }

        final int packetLength = getEncryptedPacketLength(in, offset);
        if (packetLength == -1) {
            nonSslRecord = true;
            break;
        }

        assert packetLength > 0;

        if (packetLength > readableBytes) {
            // The record is incomplete; remember its length for the next call.
            this.packetLength = packetLength;
            break;
        }

        int newTotalLength = totalLength + packetLength;
        if (newTotalLength > OpenSslEngine.MAX_ENCRYPTED_PACKET_LENGTH) {
            // Adding this record would exceed the limit; leave it for the next call.
            break;
        }

        // The whole record is available; include it in the batch and look at the next one.
        offset += packetLength;
        totalLength = newTotalLength;
    }

    ChannelBuffer unwrapped = null;
    if (totalLength > 0) {
        // The records are consumed from 'in' BEFORE unwrap() is called. The NIO view is
        // created from the absolute start offset, so it is unaffected by the reader index.
        // NOTE(review): presumably the early skip protects against unwrap() triggering
        // re-entrant processing of the same bytes - confirm before reordering.
        in.skipBytes(totalLength);
        final ByteBuffer inNetBuf = in.toByteBuffer(startOffset, totalLength);
        unwrapped = unwrap(ctx, channel, inNetBuf, totalLength, true);
    }

    if (nonSslRecord) {
        // The remaining bytes are not an SSL/TLS record: report them (hex dump) and drop them.
        NotSslRecordException e = new NotSslRecordException(
                "not an SSL/TLS record: " + ChannelBuffers.hexDump(in));
        in.skipBytes(in.readableBytes());
        if (closeOnSslException) {
            // Report the failure and close the channel.
            fireExceptionCaught(ctx, e);
            Channels.close(ctx, future(channel));

            // Note that anything decrypted above is dropped in this branch.
            return null;
        } else {
            throw e;
        }
    }

    return unwrapped;
}
875
876
877
878
879
880 private static short getShort(ChannelBuffer buf, int offset) {
881 return (short) (buf.getByte(offset) << 8 | buf.getByte(offset + 1) & 0xFF);
882 }
883
/**
 * Encrypts the queued application writes and sends the resulting records
 * downstream.
 * <p>
 * Entries are taken from {@code pendingUnencryptedWrites} one at a time
 * (under {@code pendingUnencryptedWritesLock}) and passed through
 * {@code engine.wrap()} (under {@code handshakeLock}). Produced records are
 * queued as encrypted writes and flushed in the {@code finally} block. If
 * the engine turns out to be closed, or an {@link SSLException} is raised,
 * the current and all remaining queued writes are failed with an
 * {@link IllegalStateException}.
 *
 * @param context the context used to flush the encrypted writes
 * @param channel the channel the writes belong to
 * @throws SSLException if the engine fails to wrap; the handshake is marked
 *                      as failed before the exception is rethrown
 */
private void wrap(ChannelHandlerContext context, Channel channel) throws SSLException {
    ChannelBuffer msg;
    ByteBuffer outNetBuf = bufferPool.acquireBuffer();
    boolean success = true;
    boolean offered = false;
    boolean needsUnwrap = false;
    PendingWrite pendingWrite = null;

    try {
        loop:
        for (;;) {
            // The queue lock is held for the whole iteration, including the engine call,
            // so queued writes are encrypted in order.
            pendingUnencryptedWritesLock.lock();
            try {
                pendingWrite = pendingUnencryptedWrites.peek();
                if (pendingWrite == null) {
                    break;
                }

                ByteBuffer outAppBuf = pendingWrite.outAppBuf;
                if (outAppBuf == null) {
                    // A null buffer stands for an empty write; pass an empty buffer downstream
                    // so that the write's future is still notified.
                    pendingUnencryptedWrites.remove();
                    offerEncryptedWriteRequest(
                            new DownstreamMessageEvent(
                                    channel, pendingWrite.future,
                                    ChannelBuffers.EMPTY_BUFFER,
                                    channel.getRemoteAddress()));
                    offered = true;
                } else {
                    synchronized (handshakeLock) {
                        SSLEngineResult result = null;
                        try {
                            result = engine.wrap(outAppBuf, outNetBuf);
                        } finally {
                            // Drop the entry once fully consumed, even if wrap() threw.
                            if (!outAppBuf.hasRemaining()) {
                                pendingUnencryptedWrites.remove();
                            }
                        }

                        if (result.bytesProduced() > 0) {
                            outNetBuf.flip();
                            int remaining = outNetBuf.remaining();
                            // Note: the buffer factory is taken from the ctx field, not the
                            // 'context' parameter.
                            msg = ctx.getChannel().getConfig().getBufferFactory().getBuffer(remaining);

                            // Copy the record out of the pooled buffer so the pooled buffer
                            // can be reused right away.
                            msg.writeBytes(outNetBuf);
                            outNetBuf.clear();

                            ChannelFuture future;
                            if (pendingWrite.outAppBuf.hasRemaining()) {
                                // Only part of the write was encrypted; the caller's future
                                // must not be notified yet, so use an already succeeded one.
                                future = succeededFuture(channel);
                            } else {
                                future = pendingWrite.future;
                            }

                            MessageEvent encryptedWrite = new DownstreamMessageEvent(
                                    channel, future, msg, channel.getRemoteAddress());
                            offerEncryptedWriteRequest(encryptedWrite);
                            offered = true;
                        } else if (result.getStatus() == Status.CLOSED) {
                            // The engine is closed and produced nothing; fail the pending
                            // writes in the finally block below.
                            success = false;
                            break;
                        } else {
                            final HandshakeStatus handshakeStatus = result.getHandshakeStatus();
                            handleRenegotiation(handshakeStatus);
                            switch (handshakeStatus) {
                            case NEED_WRAP:
                                if (outAppBuf.hasRemaining()) {
                                    // More application data to wrap; go around the loop again.
                                    break;
                                } else {
                                    break loop;
                                }
                            case NEED_UNWRAP:
                                // Unwrap is performed after all locks have been released.
                                needsUnwrap = true;
                                break loop;
                            case NEED_TASK:
                                runDelegatedTasks();
                                break;
                            case FINISHED:
                                setHandshakeSuccess(channel);
                                if (result.getStatus() == Status.CLOSED) {
                                    success = false;
                                }
                                break loop;
                            case NOT_HANDSHAKING:
                                setHandshakeSuccessIfStillHandshaking(channel);
                                if (result.getStatus() == Status.CLOSED) {
                                    success = false;
                                }
                                break loop;
                            default:
                                throw new IllegalStateException(
                                        "Unknown handshake status: " +
                                        handshakeStatus);
                            }
                        }
                    }
                }
            } finally {
                pendingUnencryptedWritesLock.unlock();
            }
        }
    } catch (SSLException e) {
        success = false;
        setHandshakeFailure(channel, e);
        throw e;
    } finally {
        bufferPool.releaseBuffer(outNetBuf);

        if (offered) {
            flushPendingEncryptedWrites(context);
        }

        if (!success) {
            IllegalStateException cause =
                    new IllegalStateException("SSLEngine already closed");

            // Fail the write that was being processed when the failure was detected.
            if (pendingWrite != null) {
                pendingWrite.future.setFailure(cause);
            }

            // Fail everything still queued. The lock is released before each future is
            // notified so that listeners never run while the queue lock is held.
            for (;;) {
                pendingUnencryptedWritesLock.lock();
                try {
                    pendingWrite = pendingUnencryptedWrites.poll();
                    if (pendingWrite == null) {
                        break;
                    }
                } finally {
                    pendingUnencryptedWritesLock.unlock();
                }

                pendingWrite.future.setFailure(cause);
            }
        }
    }

    if (needsUnwrap) {
        unwrapNonAppData(ctx, channel, true);
    }
}
1041
/**
 * Queues an encrypted write for {@code flushPendingEncryptedWrites()}.
 * <p>
 * The lock is only tried, never waited for: the event is added to the
 * (concurrent) queue whether or not the lock was obtained, and the lock is
 * released only if this call acquired it.
 *
 * @param encryptedWrite the write event carrying encrypted data
 */
private void offerEncryptedWriteRequest(MessageEvent encryptedWrite) {
    final boolean locked = pendingEncryptedWritesLock.tryLock();
    try {
        pendingEncryptedWrites.add(encryptedWrite);
    } finally {
        if (locked) {
            pendingEncryptedWritesLock.unlock();
        }
    }
}
1052
/**
 * Sends all queued encrypted writes downstream.
 * <p>
 * If the flush lock is held elsewhere the method returns immediately instead
 * of waiting. The outer loop re-checks the queue after the lock has been
 * released, so writes added in the meantime are picked up as well.
 *
 * @param ctx the context used to send the events downstream
 */
private void flushPendingEncryptedWrites(ChannelHandlerContext ctx) {
    while (!pendingEncryptedWrites.isEmpty()) {
        // Never block here; whoever holds the lock is expected to drain the queue.
        if (!pendingEncryptedWritesLock.tryLock()) {
            return;
        }

        try {
            MessageEvent e;
            while ((e = pendingEncryptedWrites.poll()) != null) {
                ctx.sendDownstream(e);
            }
        } finally {
            pendingEncryptedWritesLock.unlock();
        }

        // Loop again: an event may have been queued between the last poll() and unlock().
    }
}
1074
/**
 * Lets the engine produce non-application data (handshake or close
 * records) by wrapping an empty buffer, and writes every produced record to
 * the channel. The loop ends as soon as a wrap produces no bytes.
 *
 * @param ctx     the context used for writing
 * @param channel the channel to write to
 * @return the future of the last write, or an already succeeded future if
 *         nothing was written
 * @throws SSLException if the engine fails to wrap; the handshake is marked
 *                      as failed before the exception is rethrown
 */
private ChannelFuture wrapNonAppData(ChannelHandlerContext ctx, Channel channel) throws SSLException {
    ChannelFuture future = null;
    ByteBuffer outNetBuf = bufferPool.acquireBuffer();

    SSLEngineResult result;
    try {
        for (;;) {
            synchronized (handshakeLock) {
                result = engine.wrap(EMPTY_BUFFER, outNetBuf);
            }

            if (result.bytesProduced() > 0) {
                outNetBuf.flip();
                ChannelBuffer msg =
                        ctx.getChannel().getConfig().getBufferFactory().getBuffer(outNetBuf.remaining());

                // Copy the record out of the pooled buffer so the pooled buffer can be
                // reused for the next iteration.
                msg.writeBytes(outNetBuf);
                outNetBuf.clear();

                future = future(channel);
                future.addListener(new ChannelFutureListener() {
                    public void operationComplete(ChannelFuture future)
                            throws Exception {
                        // A write that failed because the channel was closed also surfaces as
                        // an exception event; count it so exceptionCaught() can swallow it.
                        if (future.getCause() instanceof ClosedChannelException) {
                            synchronized (ignoreClosedChannelExceptionLock) {
                                ignoreClosedChannelException ++;
                            }
                        }
                    }
                });

                write(ctx, future, msg);
            }

            final HandshakeStatus handshakeStatus = result.getHandshakeStatus();
            handleRenegotiation(handshakeStatus);
            switch (handshakeStatus) {
            case FINISHED:
                setHandshakeSuccess(channel);
                runDelegatedTasks();
                break;
            case NEED_TASK:
                runDelegatedTasks();
                break;
            case NEED_UNWRAP:
                if (!Thread.holdsLock(handshakeLock)) {
                    // Skipped when the current thread already holds handshakeLock, i.e. when
                    // this method was reached from code that is itself inside the lock
                    // (such as unwrap() or handshake()).
                    unwrapNonAppData(ctx, channel, true);
                }
                break;
            case NOT_HANDSHAKING:
                if (setHandshakeSuccessIfStillHandshaking(channel)) {
                    runDelegatedTasks();
                }
                break;
            case NEED_WRAP:
                break;
            default:
                throw new IllegalStateException(
                        "Unexpected handshake status: " + handshakeStatus);
            }

            if (result.bytesProduced() == 0) {
                break;
            }
        }
    } catch (SSLException e) {
        setHandshakeFailure(channel, e);
        throw e;
    } finally {
        bufferPool.releaseBuffer(outNetBuf);
    }

    if (future == null) {
        future = succeededFuture(channel);
    }

    return future;
}
1160
1161
1162
1163
/**
 * Calls {@code unwrap()} with an empty input buffer so that the engine can
 * advance its handshake/close state without any network data. The return
 * value of {@code unwrap()} is discarded.
 *
 * @param ctx                the context of this handler
 * @param channel            the channel being processed
 * @param mightNeedHandshake whether {@code unwrap()} may start a handshake
 * @throws SSLException if the engine fails to unwrap
 */
private void unwrapNonAppData(
        ChannelHandlerContext ctx, Channel channel, boolean mightNeedHandshake) throws SSLException {
    // NOTE(review): -1 is passed as the initial capacity of the output buffer; it would only be
    // used if this call produced application data, which presumably never happens - confirm.
    unwrap(ctx, channel, EMPTY_BUFFER, -1, mightNeedHandshake);
}
1168
1169
1170
1171
/**
 * Feeds received network data into the engine and collects the decrypted
 * application data.
 * <p>
 * The engine is called repeatedly (under {@code handshakeLock}) until it
 * reports a buffer underflow or makes no progress. Handshake status changes
 * are handled along the way: handshake data is written when the engine asks
 * for it, delegated tasks are run, and the handshake future is completed.
 * When the handshake finishes, or data was written before it finished, a
 * {@code wrap()} is triggered afterwards to flush pending application
 * writes.
 *
 * @param ctx                           the context of this handler
 * @param channel                       the channel being processed
 * @param nioInNetBuf                   the encrypted input
 * @param initialNettyOutAppBufCapacity capacity passed to the buffer factory when the
 *                                      output buffer is first needed
 * @param mightNeedHandshake            if {@code true}, a server-side engine that has not
 *                                      started handshaking yet gets its handshake started here
 * @return the decrypted application data, or {@code null} if none was produced
 * @throws SSLException if the engine fails to unwrap; the handshake is marked
 *                      as failed before the exception is rethrown
 */
private ChannelBuffer unwrap(
        ChannelHandlerContext ctx, Channel channel,
        ByteBuffer nioInNetBuf,
        int initialNettyOutAppBufCapacity, boolean mightNeedHandshake) throws SSLException {

    // NOTE(review): nioInNetBufStartOffset is not used anywhere in this method.
    final int nioInNetBufStartOffset = nioInNetBuf.position();
    final ByteBuffer nioOutAppBuf = bufferPool.acquireBuffer();

    ChannelBuffer nettyOutAppBuf = null;

    try {
        boolean needsWrap = false;
        for (;;) {
            SSLEngineResult result;
            boolean needsHandshake = false;
            if (mightNeedHandshake) {
                synchronized (handshakeLock) {
                    // Server side only: start the handshake if none has been started yet
                    // and the engine is still open in both directions.
                    if (!handshaken && !handshaking &&
                        !engine.getUseClientMode() &&
                        !engine.isInboundDone() && !engine.isOutboundDone()) {
                        needsHandshake = true;
                    }
                }
            }

            if (needsHandshake) {
                handshake();
            }

            synchronized (handshakeLock) {
                // Inner loop: retry the unwrap while the engine reports BUFFER_OVERFLOW.
                // Anything already produced is drained in the finally block, and a larger
                // buffer is allocated if the session's application buffer size grew.
                for (;;) {
                    final int outAppBufSize = engine.getSession().getApplicationBufferSize();
                    final ByteBuffer outAppBuf;
                    if (nioOutAppBuf.capacity() < outAppBufSize) {
                        // The pooled buffer is too small for this session; use a
                        // temporary heap buffer of the required size instead.
                        outAppBuf = ByteBuffer.allocate(outAppBufSize);
                    } else {
                        outAppBuf = nioOutAppBuf;
                    }

                    try {
                        result = engine.unwrap(nioInNetBuf, outAppBuf);
                        switch (result.getStatus()) {
                        case CLOSED:
                            // The peer closed the SSL session; notify the inbound-close future.
                            sslEngineCloseFuture.setClosed();
                            break;
                        case BUFFER_OVERFLOW:
                            // 'continue' applies to the inner for loop: the finally block
                            // drains the output first, then the unwrap is retried.
                            continue;
                        }

                        break;
                    } finally {
                        outAppBuf.flip();

                        // Move whatever was decrypted into the Netty output buffer,
                        // creating that buffer on first use.
                        if (outAppBuf.hasRemaining()) {
                            if (nettyOutAppBuf == null) {
                                ChannelBufferFactory factory = ctx.getChannel().getConfig().getBufferFactory();
                                nettyOutAppBuf = factory.getBuffer(initialNettyOutAppBufCapacity);
                            }
                            nettyOutAppBuf.writeBytes(outAppBuf);
                        }
                        outAppBuf.clear();
                    }
                }

                final HandshakeStatus handshakeStatus = result.getHandshakeStatus();
                handleRenegotiation(handshakeStatus);
                switch (handshakeStatus) {
                case NEED_UNWRAP:
                    break;
                case NEED_WRAP:
                    wrapNonAppData(ctx, channel);
                    break;
                case NEED_TASK:
                    runDelegatedTasks();
                    break;
                case FINISHED:
                    setHandshakeSuccess(channel);
                    needsWrap = true;
                    // 'continue' applies to the outer for loop: unwrap again right away.
                    continue;
                case NOT_HANDSHAKING:
                    if (setHandshakeSuccessIfStillHandshaking(channel)) {
                        needsWrap = true;
                        continue;
                    }
                    if (writeBeforeHandshakeDone) {
                        // Data was written before the handshake was done; make sure it is
                        // wrapped now by requesting a wrap() after the loop.
                        writeBeforeHandshakeDone = false;
                        needsWrap = true;
                    }
                    break;
                default:
                    throw new IllegalStateException(
                            "Unknown handshake status: " + handshakeStatus);
                }

                // Stop when the engine needs more input or made no progress at all.
                if (result.getStatus() == Status.BUFFER_UNDERFLOW ||
                    result.bytesConsumed() == 0 && result.bytesProduced() == 0) {
                    if (nioInNetBuf.hasRemaining() && !engine.isInboundDone()) {
                        // decode() only passes complete records, so input left over at this
                        // point is unexpected; log it together with the engine's last result.
                        logger.warn("Unexpected leftover data after SSLEngine.unwrap():"
                                + " status=" + result.getStatus()
                                + " handshakeStatus=" + result.getHandshakeStatus()
                                + " consumed=" + result.bytesConsumed()
                                + " produced=" + result.bytesProduced()
                                + " remaining=" + nioInNetBuf.remaining()
                                + " data=" + ChannelBuffers.hexDump(ChannelBuffers.wrappedBuffer(nioInNetBuf)));
                    }
                    break;
                }
            }
        }

        if (needsWrap) {
            // wrap() is skipped when the current thread already holds handshakeLock or the
            // encrypted-writes lock.
            // NOTE(review): presumably this avoids deadlock/re-entrancy when unwrap() was
            // reached from wrap() or from a flush; confirm before changing the condition.
            if (!Thread.holdsLock(handshakeLock) && !pendingEncryptedWritesLock.isHeldByCurrentThread()) {
                wrap(ctx, channel);
            }
        }
    } catch (SSLException e) {
        setHandshakeFailure(channel, e);
        throw e;
    } finally {
        bufferPool.releaseBuffer(nioOutAppBuf);
    }

    if (nettyOutAppBuf != null && nettyOutAppBuf.readable()) {
        return nettyOutAppBuf;
    } else {
        return null;
    }
}
1326
1327 private void handleRenegotiation(HandshakeStatus handshakeStatus) {
1328 synchronized (handshakeLock) {
1329 if (handshakeStatus == HandshakeStatus.NOT_HANDSHAKING ||
1330 handshakeStatus == HandshakeStatus.FINISHED) {
1331
1332 return;
1333 }
1334
1335 if (!handshaken) {
1336
1337 return;
1338 }
1339
1340 final boolean renegotiate;
1341 if (handshaking) {
1342
1343
1344 return;
1345 }
1346
1347 if (engine.isInboundDone() || engine.isOutboundDone()) {
1348
1349 return;
1350 }
1351
1352 if (isEnableRenegotiation()) {
1353
1354 renegotiate = true;
1355 } else {
1356
1357 renegotiate = false;
1358
1359 handshaking = true;
1360 }
1361
1362 if (renegotiate) {
1363
1364 handshake();
1365 } else {
1366
1367 fireExceptionCaught(
1368 ctx, new SSLException(
1369 "renegotiation attempted by peer; " +
1370 "closing the connection"));
1371
1372
1373 Channels.close(ctx, succeededFuture(ctx.getChannel()));
1374 }
1375 }
1376 }
1377
1378
1379
1380
1381 private void runDelegatedTasks() {
1382 for (;;) {
1383 final Runnable task;
1384 synchronized (handshakeLock) {
1385 task = engine.getDelegatedTask();
1386 }
1387
1388 if (task == null) {
1389 break;
1390 }
1391
1392 task.run();
1393 }
1394 }
1395
1396
1397
1398
1399
1400
1401
1402
1403 private boolean setHandshakeSuccessIfStillHandshaking(Channel channel) {
1404 if (handshaking && !handshakeFuture.isDone()) {
1405 setHandshakeSuccess(channel);
1406 return true;
1407 }
1408 return false;
1409 }
1410
/**
 * Records a successful handshake: updates the state flags, cancels the
 * handshake timeout and completes the handshake future.
 *
 * @param channel the channel whose handshake completed; used to create the
 *                future if none exists yet and for the debug log message
 */
private void setHandshakeSuccess(Channel channel) {
    synchronized (handshakeLock) {
        handshaking = false;
        handshaken = true;

        // No future exists if the handshake was never started through handshake().
        if (handshakeFuture == null) {
            handshakeFuture = future(channel);
        }
        cancelHandshakeTimeout();
    }

    if (logger.isDebugEnabled()) {
        logger.debug(channel + " HANDSHAKEN: " + engine.getSession().getCipherSuite());
    }

    // The future is completed outside the synchronized block.
    handshakeFuture.setSuccess();
}
1428
/**
 * Records a failed handshake. Does nothing unless a handshake is currently
 * in progress. Otherwise the state flags are reset, the timeout is
 * cancelled, both sides of the engine are closed, the handshake future is
 * failed and, if {@code closeOnSslException} is set, the channel is closed.
 *
 * @param channel the channel whose handshake failed
 * @param cause   the reason for the failure
 */
private void setHandshakeFailure(Channel channel, SSLException cause) {
    synchronized (handshakeLock) {
        if (!handshaking) {
            return;
        }
        handshaking = false;
        handshaken = false;

        if (handshakeFuture == null) {
            handshakeFuture = future(channel);
        }

        // The handshake is over, so the timeout task is no longer needed.
        cancelHandshakeTimeout();

        // Shut the engine down in both directions after the failure.
        engine.closeOutbound();

        try {
            engine.closeInbound();
        } catch (SSLException e) {
            // Best effort only: the failure is logged and otherwise ignored.
            if (logger.isDebugEnabled()) {
                logger.debug(
                        "SSLEngine.closeInbound() raised an exception after " +
                        "a handshake failure.", e);
            }
        }
    }

    // The future is failed outside the synchronized block.
    handshakeFuture.setFailure(cause);
    if (closeOnSslException) {
        Channels.close(ctx, future(channel));
    }
}
1465
/**
 * Handles a close/disconnect/unbind request coming from downstream.
 * <p>
 * On the first call for a connected channel the outbound side of the engine
 * is closed and the data the engine produces in response is written; the
 * original event is acted upon once that write completes (see
 * {@code ClosingChannelFutureListener}). If nothing could be written, the
 * event is passed downstream unchanged. Later calls forward their event only
 * after the channel's close future has completed.
 *
 * @param context the context of this handler
 * @param e       the state event that requested the close
 */
private void closeOutboundAndChannel(
        final ChannelHandlerContext context, final ChannelStateEvent e) {
    if (!e.getChannel().isConnected()) {
        // Nothing can be sent on an unconnected channel; just forward the request.
        context.sendDownstream(e);
        return;
    }

    // Only the first caller performs the SSL shutdown.
    if (!CLOSED_OUTBOUND_AND_CHANNEL_UPDATER.compareAndSet(this, 0, 1)) {
        // A shutdown is already under way; forward this event once the channel
        // has actually been closed.
        e.getChannel().getCloseFuture().addListener(new ChannelFutureListener() {
            public void operationComplete(ChannelFuture future) throws Exception {
                context.sendDownstream(e);
            }
        });
        return;
    }

    boolean passthrough = true;
    try {
        try {
            // Note: this call uses the ctx field rather than the 'context' parameter.
            unwrapNonAppData(ctx, e.getChannel(), false);
        } catch (SSLException ex) {
            if (logger.isDebugEnabled()) {
                logger.debug("Failed to unwrap before sending a close_notify message", ex);
            }
        }

        if (!engine.isOutboundDone()) {
            if (SENT_CLOSE_NOTIFY_UPDATER.compareAndSet(this, 0, 1)) {
                engine.closeOutbound();
                try {
                    ChannelFuture closeNotifyFuture = wrapNonAppData(context, e.getChannel());
                    closeNotifyFuture.addListener(
                            new ClosingChannelFutureListener(context, e));
                    // The listener now owns the event; do not forward it here.
                    passthrough = false;
                } catch (SSLException ex) {
                    if (logger.isDebugEnabled()) {
                        logger.debug("Failed to encode a close_notify message", ex);
                    }
                }
            }
        }
    } finally {
        if (passthrough) {
            context.sendDownstream(e);
        }
    }
}
1517
1518 private static final class PendingWrite {
1519 final ChannelFuture future;
1520 final ByteBuffer outAppBuf;
1521
1522 PendingWrite(ChannelFuture future, ByteBuffer outAppBuf) {
1523 this.future = future;
1524 this.outAppBuf = outAppBuf;
1525 }
1526 }
1527
1528 private static final class ClosingChannelFutureListener implements ChannelFutureListener {
1529
1530 private final ChannelHandlerContext context;
1531 private final ChannelStateEvent e;
1532
1533 ClosingChannelFutureListener(
1534 ChannelHandlerContext context, ChannelStateEvent e) {
1535 this.context = context;
1536 this.e = e;
1537 }
1538
1539 public void operationComplete(ChannelFuture closeNotifyFuture) throws Exception {
1540 if (!(closeNotifyFuture.getCause() instanceof ClosedChannelException)) {
1541 Channels.close(context, e.getFuture());
1542 } else {
1543 e.getFuture().setSuccess();
1544 }
1545 }
1546 }
1547
/**
 * Delegates to the superclass and then stores the given context in the {@code ctx} field
 * so that other methods of this handler can use it.
 *
 * @param ctx the context this handler is being added with
 */
@Override
public void beforeAdd(ChannelHandlerContext ctx) throws Exception {
    super.beforeAdd(ctx);
    // Remembered only after the superclass call has returned normally.
    this.ctx = ctx;
}
1553
1554
1555
1556
1557 @Override
1558 public void afterRemove(ChannelHandlerContext ctx) throws Exception {
1559 closeEngine();
1560
1561
1562 Throwable cause = null;
1563 for (;;) {
1564 PendingWrite pw = pendingUnencryptedWrites.poll();
1565 if (pw == null) {
1566 break;
1567 }
1568 if (cause == null) {
1569 cause = new IOException("Unable to write data");
1570 }
1571 pw.future.setFailure(cause);
1572 }
1573
1574 for (;;) {
1575 MessageEvent ev = pendingEncryptedWrites.poll();
1576 if (ev == null) {
1577 break;
1578 }
1579 if (cause == null) {
1580 cause = new IOException("Unable to write data");
1581 }
1582 ev.getFuture().setFailure(cause);
1583 }
1584
1585 if (cause != null) {
1586 fireExceptionCaughtLater(ctx, cause);
1587 }
1588 }
1589
1590
1591
1592
1593 @Override
1594 public void channelConnected(final ChannelHandlerContext ctx, final ChannelStateEvent e) throws Exception {
1595 if (issueHandshake) {
1596
1597
1598 handshake().addListener(new ChannelFutureListener() {
1599
1600 public void operationComplete(ChannelFuture future) throws Exception {
1601 if (future.isSuccess()) {
1602
1603
1604
1605 ctx.sendUpstream(e);
1606 }
1607 }
1608 });
1609 } else {
1610 super.channelConnected(ctx, e);
1611 }
1612 }
1613
1614
1615
1616
1617
1618
1619 @Override
1620 public void channelClosed(final ChannelHandlerContext ctx, ChannelStateEvent e) throws Exception {
1621
1622
1623 ctx.getPipeline().execute(new Runnable() {
1624 public void run() {
1625 if (!pendingUnencryptedWritesLock.tryLock()) {
1626 return;
1627 }
1628
1629 List<ChannelFuture> futures = null;
1630 try {
1631 for (;;) {
1632 PendingWrite pw = pendingUnencryptedWrites.poll();
1633 if (pw == null) {
1634 break;
1635 }
1636 if (futures == null) {
1637 futures = new ArrayList<ChannelFuture>();
1638 }
1639 futures.add(pw.future);
1640 }
1641
1642 for (;;) {
1643 MessageEvent ev = pendingEncryptedWrites.poll();
1644 if (ev == null) {
1645 break;
1646 }
1647 if (futures == null) {
1648 futures = new ArrayList<ChannelFuture>();
1649 }
1650 futures.add(ev.getFuture());
1651 }
1652 } finally {
1653 pendingUnencryptedWritesLock.unlock();
1654 }
1655
1656 if (futures != null) {
1657 final ClosedChannelException cause = new ClosedChannelException();
1658 final int size = futures.size();
1659 for (int i = 0; i < size; i ++) {
1660 futures.get(i).setFailure(cause);
1661 }
1662 fireExceptionCaught(ctx, cause);
1663 }
1664 }
1665 });
1666
1667 super.channelClosed(ctx, e);
1668 }
1669
1670 private final class SSLEngineInboundCloseFuture extends DefaultChannelFuture {
1671 SSLEngineInboundCloseFuture() {
1672 super(null, true);
1673 }
1674
1675 void setClosed() {
1676 super.setSuccess();
1677 }
1678
1679 @Override
1680 public Channel getChannel() {
1681 if (ctx == null) {
1682
1683 return null;
1684 } else {
1685 return ctx.getChannel();
1686 }
1687 }
1688
1689 @Override
1690 public boolean setSuccess() {
1691 return false;
1692 }
1693
1694 @Override
1695 public boolean setFailure(Throwable cause) {
1696 return false;
1697 }
1698 }
1699 }