1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16 package io.netty.channel;
17
18 import io.netty.buffer.ByteBufAllocator;
19 import io.netty.channel.socket.ChannelOutputShutdownEvent;
20 import io.netty.channel.socket.ChannelOutputShutdownException;
21 import io.netty.util.DefaultAttributeMap;
22 import io.netty.util.ReferenceCountUtil;
23 import io.netty.util.internal.PlatformDependent;
24 import io.netty.util.internal.ThrowableUtil;
25 import io.netty.util.internal.UnstableApi;
26 import io.netty.util.internal.logging.InternalLogger;
27 import io.netty.util.internal.logging.InternalLoggerFactory;
28
29 import java.io.IOException;
30 import java.net.ConnectException;
31 import java.net.InetSocketAddress;
32 import java.net.NoRouteToHostException;
33 import java.net.SocketAddress;
34 import java.net.SocketException;
35 import java.nio.channels.ClosedChannelException;
36 import java.nio.channels.NotYetConnectedException;
37 import java.util.concurrent.Executor;
38 import java.util.concurrent.RejectedExecutionException;
39
40
41
42
43 public abstract class AbstractChannel extends DefaultAttributeMap implements Channel {
44
45 private static final InternalLogger logger = InternalLoggerFactory.getInstance(AbstractChannel.class);
46
47 private static final ClosedChannelException FLUSH0_CLOSED_CHANNEL_EXCEPTION = ThrowableUtil.unknownStackTrace(
48 new ClosedChannelException(), AbstractUnsafe.class, "flush0()");
49 private static final ClosedChannelException ENSURE_OPEN_CLOSED_CHANNEL_EXCEPTION = ThrowableUtil.unknownStackTrace(
50 new ClosedChannelException(), AbstractUnsafe.class, "ensureOpen(...)");
51 private static final ClosedChannelException CLOSE_CLOSED_CHANNEL_EXCEPTION = ThrowableUtil.unknownStackTrace(
52 new ClosedChannelException(), AbstractUnsafe.class, "close(...)");
53 private static final ClosedChannelException WRITE_CLOSED_CHANNEL_EXCEPTION = ThrowableUtil.unknownStackTrace(
54 new ClosedChannelException(), AbstractUnsafe.class, "write(...)");
55 private static final NotYetConnectedException FLUSH0_NOT_YET_CONNECTED_EXCEPTION = ThrowableUtil.unknownStackTrace(
56 new NotYetConnectedException(), AbstractUnsafe.class, "flush0()");
57
58 private final Channel parent;
59 private final long hashCode = PlatformDependent.threadLocalRandom().nextLong();
60 private final Unsafe unsafe;
61 private final DefaultChannelPipeline pipeline;
62 private final ChannelFuture succeededFuture = new SucceededChannelFuture(this, null);
63 private final VoidChannelPromise voidPromise = new VoidChannelPromise(this, true);
64 private final VoidChannelPromise unsafeVoidPromise = new VoidChannelPromise(this, false);
65 private final CloseFuture closeFuture = new CloseFuture(this);
66
67 private volatile SocketAddress localAddress;
68 private volatile SocketAddress remoteAddress;
69 private volatile EventLoop eventLoop;
70 private volatile boolean registered;
71 private boolean closeInitiated;
72
73
74 private boolean strValActive;
75 private String strVal;
76
77
78
79
80
81
82
/**
 * Creates a new instance.
 *
 * @param parent the value later returned by {@link #parent()}; stored as given
 */
protected AbstractChannel(Channel parent) {
    this.parent = parent;
    // Both factory methods are overridable and run while this object is still under construction.
    this.unsafe = newUnsafe();
    this.pipeline = newChannelPipeline();
}
88
89
90
91
92 protected DefaultChannelPipeline newChannelPipeline() {
93 return new DefaultChannelPipeline(this);
94 }
95
96 @Override
97 public boolean isWritable() {
98 ChannelOutboundBuffer buf = unsafe.outboundBuffer();
99 return buf != null && buf.isWritable();
100 }
101
102 @Override
103 public Channel parent() {
104 return parent;
105 }
106
107 @Override
108 public ChannelPipeline pipeline() {
109 return pipeline;
110 }
111
112 @Override
113 public ByteBufAllocator alloc() {
114 return config().getAllocator();
115 }
116
117 @Override
118 public EventLoop eventLoop() {
119 EventLoop eventLoop = this.eventLoop;
120 if (eventLoop == null) {
121 throw new IllegalStateException("channel not registered to an event loop");
122 }
123 return eventLoop;
124 }
125
126 @Override
127 public SocketAddress localAddress() {
128 SocketAddress localAddress = this.localAddress;
129 if (localAddress == null) {
130 try {
131 this.localAddress = localAddress = unsafe().localAddress();
132 } catch (Throwable t) {
133
134 return null;
135 }
136 }
137 return localAddress;
138 }
139
140
141
142
143 @Deprecated
144 protected void invalidateLocalAddress() {
145 localAddress = null;
146 }
147
148 @Override
149 public SocketAddress remoteAddress() {
150 SocketAddress remoteAddress = this.remoteAddress;
151 if (remoteAddress == null) {
152 try {
153 this.remoteAddress = remoteAddress = unsafe().remoteAddress();
154 } catch (Throwable t) {
155
156 return null;
157 }
158 }
159 return remoteAddress;
160 }
161
162
163
164
165 @Deprecated
166 protected void invalidateRemoteAddress() {
167 remoteAddress = null;
168 }
169
170 @Override
171 public boolean isRegistered() {
172 return registered;
173 }
174
175 @Override
176 public ChannelFuture bind(SocketAddress localAddress) {
177 return pipeline.bind(localAddress);
178 }
179
180 @Override
181 public ChannelFuture connect(SocketAddress remoteAddress) {
182 return pipeline.connect(remoteAddress);
183 }
184
185 @Override
186 public ChannelFuture connect(SocketAddress remoteAddress, SocketAddress localAddress) {
187 return pipeline.connect(remoteAddress, localAddress);
188 }
189
190 @Override
191 public ChannelFuture disconnect() {
192 return pipeline.disconnect();
193 }
194
195 @Override
196 public ChannelFuture close() {
197 return pipeline.close();
198 }
199
200 @Override
201 public ChannelFuture deregister() {
202 return pipeline.deregister();
203 }
204
205 @Override
206 public Channel flush() {
207 pipeline.flush();
208 return this;
209 }
210
211 @Override
212 public ChannelFuture bind(SocketAddress localAddress, ChannelPromise promise) {
213 return pipeline.bind(localAddress, promise);
214 }
215
216 @Override
217 public ChannelFuture connect(SocketAddress remoteAddress, ChannelPromise promise) {
218 return pipeline.connect(remoteAddress, promise);
219 }
220
221 @Override
222 public ChannelFuture connect(SocketAddress remoteAddress, SocketAddress localAddress, ChannelPromise promise) {
223 return pipeline.connect(remoteAddress, localAddress, promise);
224 }
225
226 @Override
227 public ChannelFuture disconnect(ChannelPromise promise) {
228 return pipeline.disconnect(promise);
229 }
230
231 @Override
232 public ChannelFuture close(ChannelPromise promise) {
233 return pipeline.close(promise);
234 }
235
236 @Override
237 public ChannelFuture deregister(ChannelPromise promise) {
238 return pipeline.deregister(promise);
239 }
240
241 @Override
242 public Channel read() {
243 pipeline.read();
244 return this;
245 }
246
247 @Override
248 public ChannelFuture write(Object msg) {
249 return pipeline.write(msg);
250 }
251
252 @Override
253 public ChannelFuture write(Object msg, ChannelPromise promise) {
254 return pipeline.write(msg, promise);
255 }
256
257 @Override
258 public ChannelFuture writeAndFlush(Object msg) {
259 return pipeline.writeAndFlush(msg);
260 }
261
262 @Override
263 public ChannelFuture writeAndFlush(Object msg, ChannelPromise promise) {
264 return pipeline.writeAndFlush(msg, promise);
265 }
266
267 @Override
268 public ChannelPromise newPromise() {
269 return new DefaultChannelPromise(this);
270 }
271
272 @Override
273 public ChannelProgressivePromise newProgressivePromise() {
274 return new DefaultChannelProgressivePromise(this);
275 }
276
277 @Override
278 public ChannelFuture newSucceededFuture() {
279 return succeededFuture;
280 }
281
282 @Override
283 public ChannelFuture newFailedFuture(Throwable cause) {
284 return new FailedChannelFuture(this, null, cause);
285 }
286
287 @Override
288 public ChannelFuture closeFuture() {
289 return closeFuture;
290 }
291
292 @Override
293 public Unsafe unsafe() {
294 return unsafe;
295 }
296
297
298
299
300 protected abstract AbstractUnsafe newUnsafe();
301
302
303
304
305 @Override
306 public final int hashCode() {
307 return (int) hashCode;
308 }
309
310
311
312
313
314 @Override
315 public final boolean equals(Object o) {
316 return this == o;
317 }
318
319 @Override
320 public final int compareTo(Channel o) {
321 if (this == o) {
322 return 0;
323 }
324
325 long ret = hashCode - o.hashCode();
326 if (ret > 0) {
327 return 1;
328 }
329 if (ret < 0) {
330 return -1;
331 }
332
333 ret = System.identityHashCode(this) - System.identityHashCode(o);
334 if (ret != 0) {
335 return (int) ret;
336 }
337
338
339 throw new Error();
340 }
341
342
343
344
345
346
347
348 @Override
349 public String toString() {
350 boolean active = isActive();
351 if (strValActive == active && strVal != null) {
352 return strVal;
353 }
354
355 SocketAddress remoteAddr = remoteAddress();
356 SocketAddress localAddr = localAddress();
357 if (remoteAddr != null) {
358 String activeNotation = active? "-" : "!";
359 strVal = String.format("[id: 0x%08x, L:%s %s R:%s]", (int) hashCode, localAddr, activeNotation, remoteAddr);
360 } else if (localAddr != null) {
361 strVal = String.format("[id: 0x%08x, L:%s]", (int) hashCode, localAddr);
362 } else {
363 strVal = String.format("[id: 0x%08x]", (int) hashCode);
364 }
365
366 strValActive = active;
367 return strVal;
368 }
369
370 @Override
371 public final ChannelPromise voidPromise() {
372 return voidPromise;
373 }
374
375
376
377
378 protected abstract class AbstractUnsafe implements Unsafe {
379
380 private volatile ChannelOutboundBuffer outboundBuffer = new ChannelOutboundBuffer(AbstractChannel.this);
381 private boolean inFlush0;
382
383 private boolean neverRegistered = true;
384
385 private void assertEventLoop() {
386 assert !registered || eventLoop.inEventLoop();
387 }
388
389 @Override
390 public final ChannelOutboundBuffer outboundBuffer() {
391 return outboundBuffer;
392 }
393
394 @Override
395 public final SocketAddress localAddress() {
396 return localAddress0();
397 }
398
399 @Override
400 public final SocketAddress remoteAddress() {
401 return remoteAddress0();
402 }
403
404 @Override
405 public final void register(EventLoop eventLoop, final ChannelPromise promise) {
406 if (eventLoop == null) {
407 throw new NullPointerException("eventLoop");
408 }
409 if (isRegistered()) {
410 promise.setFailure(new IllegalStateException("registered to an event loop already"));
411 return;
412 }
413 if (!isCompatible(eventLoop)) {
414 promise.setFailure(
415 new IllegalStateException("incompatible event loop type: " + eventLoop.getClass().getName()));
416 return;
417 }
418
419 AbstractChannel.this.eventLoop = eventLoop;
420
421 if (eventLoop.inEventLoop()) {
422 register0(promise);
423 } else {
424 try {
425 eventLoop.execute(new Runnable() {
426 @Override
427 public void run() {
428 register0(promise);
429 }
430 });
431 } catch (Throwable t) {
432 logger.warn(
433 "Force-closing a channel whose registration task was not accepted by an event loop: {}",
434 AbstractChannel.this, t);
435 closeForcibly();
436 closeFuture.setClosed();
437 safeSetFailure(promise, t);
438 }
439 }
440 }
441
442 private void register0(ChannelPromise promise) {
443 try {
444
445
446 if (!promise.setUncancellable() || !ensureOpen(promise)) {
447 return;
448 }
449 boolean firstRegistration = neverRegistered;
450 doRegister();
451 neverRegistered = false;
452 registered = true;
453
454
455
456 pipeline.invokeHandlerAddedIfNeeded();
457
458 safeSetSuccess(promise);
459 pipeline.fireChannelRegistered();
460
461
462 if (isActive()) {
463 if (firstRegistration) {
464 pipeline.fireChannelActive();
465 } else if (config().isAutoRead()) {
466
467
468
469
470 beginRead();
471 }
472 }
473 } catch (Throwable t) {
474
475 closeForcibly();
476 closeFuture.setClosed();
477 safeSetFailure(promise, t);
478 }
479 }
480
481 @Override
482 public final void bind(final SocketAddress localAddress, final ChannelPromise promise) {
483 assertEventLoop();
484
485 if (!promise.setUncancellable() || !ensureOpen(promise)) {
486 return;
487 }
488
489
490 if (Boolean.TRUE.equals(config().getOption(ChannelOption.SO_BROADCAST)) &&
491 localAddress instanceof InetSocketAddress &&
492 !((InetSocketAddress) localAddress).getAddress().isAnyLocalAddress() &&
493 !PlatformDependent.isWindows() && !PlatformDependent.maybeSuperUser()) {
494
495
496 logger.warn(
497 "A non-root user can't receive a broadcast packet if the socket " +
498 "is not bound to a wildcard address; binding to a non-wildcard " +
499 "address (" + localAddress + ") anyway as requested.");
500 }
501
502 boolean wasActive = isActive();
503 try {
504 doBind(localAddress);
505 } catch (Throwable t) {
506 safeSetFailure(promise, t);
507 closeIfClosed();
508 return;
509 }
510
511 if (!wasActive && isActive()) {
512 invokeLater(new Runnable() {
513 @Override
514 public void run() {
515 pipeline.fireChannelActive();
516 }
517 });
518 }
519
520 safeSetSuccess(promise);
521 }
522
523 @Override
524 public final void disconnect(final ChannelPromise promise) {
525 assertEventLoop();
526
527 if (!promise.setUncancellable()) {
528 return;
529 }
530
531 boolean wasActive = isActive();
532 try {
533 doDisconnect();
534 } catch (Throwable t) {
535 safeSetFailure(promise, t);
536 closeIfClosed();
537 return;
538 }
539
540 if (wasActive && !isActive()) {
541 invokeLater(new Runnable() {
542 @Override
543 public void run() {
544 pipeline.fireChannelInactive();
545 }
546 });
547 }
548
549 safeSetSuccess(promise);
550 closeIfClosed();
551 }
552
553 @Override
554 public final void close(final ChannelPromise promise) {
555 assertEventLoop();
556
557 close(promise, CLOSE_CLOSED_CHANNEL_EXCEPTION, CLOSE_CLOSED_CHANNEL_EXCEPTION, false);
558 }
559
560
561
562
563
564 @UnstableApi
565 public final void shutdownOutput(final ChannelPromise promise) {
566 assertEventLoop();
567 shutdownOutput(promise, null);
568 }
569
570
571
572
573
574
575 private void shutdownOutput(final ChannelPromise promise, Throwable cause) {
576 if (!promise.setUncancellable()) {
577 return;
578 }
579
580 final ChannelOutboundBuffer outboundBuffer = this.outboundBuffer;
581 if (outboundBuffer == null) {
582 promise.setFailure(CLOSE_CLOSED_CHANNEL_EXCEPTION);
583 return;
584 }
585 this.outboundBuffer = null;
586
587 final Throwable shutdownCause = cause == null ?
588 new ChannelOutputShutdownException("Channel output shutdown") :
589 new ChannelOutputShutdownException("Channel output shutdown", cause);
590 Executor closeExecutor = prepareToClose();
591 if (closeExecutor != null) {
592 closeExecutor.execute(new Runnable() {
593 @Override
594 public void run() {
595 try {
596
597 doShutdownOutput();
598 promise.setSuccess();
599 } catch (Throwable err) {
600 promise.setFailure(err);
601 } finally {
602
603 eventLoop().execute(new Runnable() {
604 @Override
605 public void run() {
606 closeOutboundBufferForShutdown(pipeline, outboundBuffer, shutdownCause);
607 }
608 });
609 }
610 }
611 });
612 } else {
613 try {
614
615 doShutdownOutput();
616 promise.setSuccess();
617 } catch (Throwable err) {
618 promise.setFailure(err);
619 } finally {
620 closeOutboundBufferForShutdown(pipeline, outboundBuffer, shutdownCause);
621 }
622 }
623 }
624
625 private void closeOutboundBufferForShutdown(
626 ChannelPipeline pipeline, ChannelOutboundBuffer buffer, Throwable cause) {
627 buffer.failFlushed(cause, false);
628 buffer.close(cause, true);
629 pipeline.fireUserEventTriggered(ChannelOutputShutdownEvent.INSTANCE);
630 }
631
632 private void close(final ChannelPromise promise, final Throwable cause,
633 final ClosedChannelException closeCause, final boolean notify) {
634 if (!promise.setUncancellable()) {
635 return;
636 }
637
638 if (closeInitiated) {
639 if (closeFuture.isDone()) {
640
641 safeSetSuccess(promise);
642 } else if (!(promise instanceof VoidChannelPromise)) {
643
644 closeFuture.addListener(new ChannelFutureListener() {
645 @Override
646 public void operationComplete(ChannelFuture future) throws Exception {
647 promise.setSuccess();
648 }
649 });
650 }
651 return;
652 }
653
654 closeInitiated = true;
655
656 final boolean wasActive = isActive();
657 final ChannelOutboundBuffer outboundBuffer = this.outboundBuffer;
658 this.outboundBuffer = null;
659 Executor closeExecutor = prepareToClose();
660 if (closeExecutor != null) {
661 closeExecutor.execute(new Runnable() {
662 @Override
663 public void run() {
664 try {
665
666 doClose0(promise);
667 } finally {
668
669 invokeLater(new Runnable() {
670 @Override
671 public void run() {
672 if (outboundBuffer != null) {
673
674 outboundBuffer.failFlushed(cause, notify);
675 outboundBuffer.close(closeCause);
676 }
677 fireChannelInactiveAndDeregister(wasActive);
678 }
679 });
680 }
681 }
682 });
683 } else {
684 try {
685
686 doClose0(promise);
687 } finally {
688 if (outboundBuffer != null) {
689
690 outboundBuffer.failFlushed(cause, notify);
691 outboundBuffer.close(closeCause);
692 }
693 }
694 if (inFlush0) {
695 invokeLater(new Runnable() {
696 @Override
697 public void run() {
698 fireChannelInactiveAndDeregister(wasActive);
699 }
700 });
701 } else {
702 fireChannelInactiveAndDeregister(wasActive);
703 }
704 }
705 }
706
707 private void doClose0(ChannelPromise promise) {
708 try {
709 doClose();
710 closeFuture.setClosed();
711 safeSetSuccess(promise);
712 } catch (Throwable t) {
713 closeFuture.setClosed();
714 safeSetFailure(promise, t);
715 }
716 }
717
718 private void fireChannelInactiveAndDeregister(final boolean wasActive) {
719 deregister(voidPromise(), wasActive && !isActive());
720 }
721
722 @Override
723 public final void closeForcibly() {
724 assertEventLoop();
725
726 try {
727 doClose();
728 } catch (Exception e) {
729 logger.warn("Failed to close a channel.", e);
730 }
731 }
732
733 @Override
734 public final void deregister(final ChannelPromise promise) {
735 assertEventLoop();
736
737 deregister(promise, false);
738 }
739
740 private void deregister(final ChannelPromise promise, final boolean fireChannelInactive) {
741 if (!promise.setUncancellable()) {
742 return;
743 }
744
745 if (!registered) {
746 safeSetSuccess(promise);
747 return;
748 }
749
750
751
752
753
754
755
756
757
758
759 invokeLater(new Runnable() {
760 @Override
761 public void run() {
762 try {
763 doDeregister();
764 } catch (Throwable t) {
765 logger.warn("Unexpected exception occurred while deregistering a channel.", t);
766 } finally {
767 if (fireChannelInactive) {
768 pipeline.fireChannelInactive();
769 }
770
771
772
773
774 if (registered) {
775 registered = false;
776 pipeline.fireChannelUnregistered();
777 }
778 safeSetSuccess(promise);
779 }
780 }
781 });
782 }
783
784 @Override
785 public final void beginRead() {
786 assertEventLoop();
787
788 if (!isActive()) {
789 return;
790 }
791
792 try {
793 doBeginRead();
794 } catch (final Exception e) {
795 invokeLater(new Runnable() {
796 @Override
797 public void run() {
798 pipeline.fireExceptionCaught(e);
799 }
800 });
801 close(voidPromise());
802 }
803 }
804
805 @Override
806 public final void write(Object msg, ChannelPromise promise) {
807 assertEventLoop();
808
809 ChannelOutboundBuffer outboundBuffer = this.outboundBuffer;
810 if (outboundBuffer == null) {
811
812
813
814
815 safeSetFailure(promise, WRITE_CLOSED_CHANNEL_EXCEPTION);
816
817 ReferenceCountUtil.release(msg);
818 return;
819 }
820
821 int size;
822 try {
823 msg = filterOutboundMessage(msg);
824 size = pipeline.estimatorHandle().size(msg);
825 if (size < 0) {
826 size = 0;
827 }
828 } catch (Throwable t) {
829 safeSetFailure(promise, t);
830 ReferenceCountUtil.release(msg);
831 return;
832 }
833
834 outboundBuffer.addMessage(msg, size, promise);
835 }
836
837 @Override
838 public final void flush() {
839 assertEventLoop();
840
841 ChannelOutboundBuffer outboundBuffer = this.outboundBuffer;
842 if (outboundBuffer == null) {
843 return;
844 }
845
846 outboundBuffer.addFlush();
847 flush0();
848 }
849
850 protected void flush0() {
851 if (inFlush0) {
852
853 return;
854 }
855
856 final ChannelOutboundBuffer outboundBuffer = this.outboundBuffer;
857 if (outboundBuffer == null || outboundBuffer.isEmpty()) {
858 return;
859 }
860
861 inFlush0 = true;
862
863
864 if (!isActive()) {
865 try {
866 if (isOpen()) {
867 outboundBuffer.failFlushed(FLUSH0_NOT_YET_CONNECTED_EXCEPTION, true);
868 } else {
869
870 outboundBuffer.failFlushed(FLUSH0_CLOSED_CHANNEL_EXCEPTION, false);
871 }
872 } finally {
873 inFlush0 = false;
874 }
875 return;
876 }
877
878 try {
879 doWrite(outboundBuffer);
880 } catch (Throwable t) {
881 if (t instanceof IOException && config().isAutoClose()) {
882
883
884
885
886
887
888
889
890 close(voidPromise(), t, FLUSH0_CLOSED_CHANNEL_EXCEPTION, false);
891 } else {
892 try {
893 shutdownOutput(voidPromise(), t);
894 } catch (Throwable t2) {
895 close(voidPromise(), t2, FLUSH0_CLOSED_CHANNEL_EXCEPTION, false);
896 }
897 }
898 } finally {
899 inFlush0 = false;
900 }
901 }
902
903 @Override
904 public final ChannelPromise voidPromise() {
905 assertEventLoop();
906
907 return unsafeVoidPromise;
908 }
909
910 protected final boolean ensureOpen(ChannelPromise promise) {
911 if (isOpen()) {
912 return true;
913 }
914
915 safeSetFailure(promise, ENSURE_OPEN_CLOSED_CHANNEL_EXCEPTION);
916 return false;
917 }
918
919
920
921
922 protected final void safeSetSuccess(ChannelPromise promise) {
923 if (!(promise instanceof VoidChannelPromise) && !promise.trySuccess()) {
924 logger.warn("Failed to mark a promise as success because it is done already: {}", promise);
925 }
926 }
927
928
929
930
931 protected final void safeSetFailure(ChannelPromise promise, Throwable cause) {
932 if (!(promise instanceof VoidChannelPromise) && !promise.tryFailure(cause)) {
933 logger.warn("Failed to mark a promise as failure because it's done already: {}", promise, cause);
934 }
935 }
936
937 protected final void closeIfClosed() {
938 if (isOpen()) {
939 return;
940 }
941 close(voidPromise());
942 }
943
944 private void invokeLater(Runnable task) {
945 try {
946
947
948
949
950
951
952
953
954
955
956
957 eventLoop().execute(task);
958 } catch (RejectedExecutionException e) {
959 logger.warn("Can't invoke task later as EventLoop rejected it", e);
960 }
961 }
962
963
964
965
966 protected final Throwable annotateConnectException(Throwable cause, SocketAddress remoteAddress) {
967 if (cause instanceof ConnectException) {
968 return new AnnotatedConnectException((ConnectException) cause, remoteAddress);
969 }
970 if (cause instanceof NoRouteToHostException) {
971 return new AnnotatedNoRouteToHostException((NoRouteToHostException) cause, remoteAddress);
972 }
973 if (cause instanceof SocketException) {
974 return new AnnotatedSocketException((SocketException) cause, remoteAddress);
975 }
976
977 return cause;
978 }
979
980
981
982
983
984
985
986 protected Executor prepareToClose() {
987 return null;
988 }
989 }
990
991
992
993
994 protected abstract boolean isCompatible(EventLoop loop);
995
996
997
998
999 protected abstract SocketAddress localAddress0();
1000
1001
1002
1003
1004 protected abstract SocketAddress remoteAddress0();
1005
1006
1007
1008
1009
1010
1011 protected void doRegister() throws Exception {
1012
1013 }
1014
1015
1016
1017
1018 protected abstract void doBind(SocketAddress localAddress) throws Exception;
1019
1020
1021
1022
1023 protected abstract void doDisconnect() throws Exception;
1024
1025
1026
1027
1028 protected abstract void doClose() throws Exception;
1029
1030
1031
1032
1033
1034 @UnstableApi
1035 protected void doShutdownOutput() throws Exception {
1036 doClose();
1037 }
1038
1039
1040
1041
1042
1043
1044 protected void doDeregister() throws Exception {
1045
1046 }
1047
1048
1049
1050
1051 protected abstract void doBeginRead() throws Exception;
1052
1053
1054
1055
1056 protected abstract void doWrite(ChannelOutboundBuffer in) throws Exception;
1057
1058
1059
1060
1061
1062 protected Object filterOutboundMessage(Object msg) throws Exception {
1063 return msg;
1064 }
1065
1066 static final class CloseFuture extends DefaultChannelPromise {
1067
1068 CloseFuture(AbstractChannel ch) {
1069 super(ch);
1070 }
1071
1072 @Override
1073 public ChannelPromise setSuccess() {
1074 throw new IllegalStateException();
1075 }
1076
1077 @Override
1078 public ChannelPromise setFailure(Throwable cause) {
1079 throw new IllegalStateException();
1080 }
1081
1082 @Override
1083 public boolean trySuccess() {
1084 throw new IllegalStateException();
1085 }
1086
1087 @Override
1088 public boolean tryFailure(Throwable cause) {
1089 throw new IllegalStateException();
1090 }
1091
1092 boolean setClosed() {
1093 return super.trySuccess();
1094 }
1095 }
1096
1097 private static final class AnnotatedConnectException extends ConnectException {
1098
1099 private static final long serialVersionUID = 3901958112696433556L;
1100
1101 AnnotatedConnectException(ConnectException exception, SocketAddress remoteAddress) {
1102 super(exception.getMessage() + ": " + remoteAddress);
1103 initCause(exception);
1104 setStackTrace(exception.getStackTrace());
1105 }
1106
1107 @Override
1108 public Throwable fillInStackTrace() {
1109 return this;
1110 }
1111 }
1112
1113 private static final class AnnotatedNoRouteToHostException extends NoRouteToHostException {
1114
1115 private static final long serialVersionUID = -6801433937592080623L;
1116
1117 AnnotatedNoRouteToHostException(NoRouteToHostException exception, SocketAddress remoteAddress) {
1118 super(exception.getMessage() + ": " + remoteAddress);
1119 initCause(exception);
1120 setStackTrace(exception.getStackTrace());
1121 }
1122
1123 @Override
1124 public Throwable fillInStackTrace() {
1125 return this;
1126 }
1127 }
1128
1129 private static final class AnnotatedSocketException extends SocketException {
1130
1131 private static final long serialVersionUID = 3896743275010454039L;
1132
1133 AnnotatedSocketException(SocketException exception, SocketAddress remoteAddress) {
1134 super(exception.getMessage() + ": " + remoteAddress);
1135 initCause(exception);
1136 setStackTrace(exception.getStackTrace());
1137 }
1138
1139 @Override
1140 public Throwable fillInStackTrace() {
1141 return this;
1142 }
1143 }
1144 }