1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16 package io.netty.channel;
17
18 import io.netty.buffer.ByteBufAllocator;
19 import io.netty.channel.socket.ChannelOutputShutdownEvent;
20 import io.netty.channel.socket.ChannelOutputShutdownException;
21 import io.netty.util.DefaultAttributeMap;
22 import io.netty.util.ReferenceCountUtil;
23 import io.netty.util.internal.ObjectUtil;
24 import io.netty.util.internal.PlatformDependent;
25 import io.netty.util.internal.UnstableApi;
26 import io.netty.util.internal.logging.InternalLogger;
27 import io.netty.util.internal.logging.InternalLoggerFactory;
28
29 import java.io.IOException;
30 import java.net.ConnectException;
31 import java.net.InetSocketAddress;
32 import java.net.NoRouteToHostException;
33 import java.net.SocketAddress;
34 import java.net.SocketException;
35 import java.nio.channels.ClosedChannelException;
36 import java.nio.channels.NotYetConnectedException;
37 import java.util.concurrent.Executor;
38 import java.util.concurrent.RejectedExecutionException;
39
40
41
42
43 public abstract class AbstractChannel extends DefaultAttributeMap implements Channel {
44
45 private static final InternalLogger logger = InternalLoggerFactory.getInstance(AbstractChannel.class);
46
47 private final Channel parent;
48 private final ChannelId id;
49 private final Unsafe unsafe;
50 private final DefaultChannelPipeline pipeline;
51 private final VoidChannelPromise unsafeVoidPromise = new VoidChannelPromise(this, false);
52 private final CloseFuture closeFuture = new CloseFuture(this);
53
54 private volatile SocketAddress localAddress;
55 private volatile SocketAddress remoteAddress;
56 private volatile EventLoop eventLoop;
57 private volatile boolean registered;
58 private boolean closeInitiated;
59 private Throwable initialCloseCause;
60
61
62 private boolean strValActive;
63 private String strVal;
64
65
66
67
68
69
70
/**
 * Creates a new instance whose id comes from {@link #newId()}.
 *
 * @param parent the parent channel; stored as-is without validation
 */
protected AbstractChannel(Channel parent) {
    this.parent = parent;
    this.id = newId();
    this.unsafe = newUnsafe();
    this.pipeline = newChannelPipeline();
}
77
78
79
80
81
82
83
/**
 * Creates a new instance that uses the supplied id instead of generating one.
 *
 * @param parent the parent channel; stored as-is without validation
 * @param id     the id to expose through {@link #id()}
 */
protected AbstractChannel(Channel parent, ChannelId id) {
    this.parent = parent;
    this.id = id;
    this.unsafe = newUnsafe();
    this.pipeline = newChannelPipeline();
}
90
91 protected final int maxMessagesPerWrite() {
92 ChannelConfig config = config();
93 if (config instanceof DefaultChannelConfig) {
94 return ((DefaultChannelConfig) config).getMaxMessagesPerWrite();
95 }
96 Integer value = config.getOption(ChannelOption.MAX_MESSAGES_PER_WRITE);
97 if (value == null) {
98 return Integer.MAX_VALUE;
99 }
100 return value;
101 }
102
103 @Override
104 public final ChannelId id() {
105 return id;
106 }
107
108
109
110
111
112 protected ChannelId newId() {
113 return DefaultChannelId.newInstance();
114 }
115
116
117
118
119 protected DefaultChannelPipeline newChannelPipeline() {
120 return new DefaultChannelPipeline(this);
121 }
122
123 @Override
124 public boolean isWritable() {
125 ChannelOutboundBuffer buf = unsafe.outboundBuffer();
126 return buf != null && buf.isWritable();
127 }
128
129 @Override
130 public long bytesBeforeUnwritable() {
131 ChannelOutboundBuffer buf = unsafe.outboundBuffer();
132
133
134 return buf != null ? buf.bytesBeforeUnwritable() : 0;
135 }
136
137 @Override
138 public long bytesBeforeWritable() {
139 ChannelOutboundBuffer buf = unsafe.outboundBuffer();
140
141
142 return buf != null ? buf.bytesBeforeWritable() : Long.MAX_VALUE;
143 }
144
145 @Override
146 public Channel parent() {
147 return parent;
148 }
149
150 @Override
151 public ChannelPipeline pipeline() {
152 return pipeline;
153 }
154
155 @Override
156 public ByteBufAllocator alloc() {
157 return config().getAllocator();
158 }
159
160 @Override
161 public EventLoop eventLoop() {
162 EventLoop eventLoop = this.eventLoop;
163 if (eventLoop == null) {
164 throw new IllegalStateException("channel not registered to an event loop");
165 }
166 return eventLoop;
167 }
168
169 @Override
170 public SocketAddress localAddress() {
171 SocketAddress localAddress = this.localAddress;
172 if (localAddress == null) {
173 try {
174 this.localAddress = localAddress = unsafe().localAddress();
175 } catch (Error e) {
176 throw e;
177 } catch (Throwable t) {
178
179 return null;
180 }
181 }
182 return localAddress;
183 }
184
185
186
187
188 @Deprecated
189 protected void invalidateLocalAddress() {
190 localAddress = null;
191 }
192
193 @Override
194 public SocketAddress remoteAddress() {
195 SocketAddress remoteAddress = this.remoteAddress;
196 if (remoteAddress == null) {
197 try {
198 this.remoteAddress = remoteAddress = unsafe().remoteAddress();
199 } catch (Error e) {
200 throw e;
201 } catch (Throwable t) {
202
203 return null;
204 }
205 }
206 return remoteAddress;
207 }
208
209
210
211
212 @Deprecated
213 protected void invalidateRemoteAddress() {
214 remoteAddress = null;
215 }
216
217 @Override
218 public boolean isRegistered() {
219 return registered;
220 }
221
222 @Override
223 public ChannelFuture bind(SocketAddress localAddress) {
224 return pipeline.bind(localAddress);
225 }
226
227 @Override
228 public ChannelFuture connect(SocketAddress remoteAddress) {
229 return pipeline.connect(remoteAddress);
230 }
231
232 @Override
233 public ChannelFuture connect(SocketAddress remoteAddress, SocketAddress localAddress) {
234 return pipeline.connect(remoteAddress, localAddress);
235 }
236
237 @Override
238 public ChannelFuture disconnect() {
239 return pipeline.disconnect();
240 }
241
242 @Override
243 public ChannelFuture close() {
244 return pipeline.close();
245 }
246
247 @Override
248 public ChannelFuture deregister() {
249 return pipeline.deregister();
250 }
251
252 @Override
253 public Channel flush() {
254 pipeline.flush();
255 return this;
256 }
257
258 @Override
259 public ChannelFuture bind(SocketAddress localAddress, ChannelPromise promise) {
260 return pipeline.bind(localAddress, promise);
261 }
262
263 @Override
264 public ChannelFuture connect(SocketAddress remoteAddress, ChannelPromise promise) {
265 return pipeline.connect(remoteAddress, promise);
266 }
267
268 @Override
269 public ChannelFuture connect(SocketAddress remoteAddress, SocketAddress localAddress, ChannelPromise promise) {
270 return pipeline.connect(remoteAddress, localAddress, promise);
271 }
272
273 @Override
274 public ChannelFuture disconnect(ChannelPromise promise) {
275 return pipeline.disconnect(promise);
276 }
277
278 @Override
279 public ChannelFuture close(ChannelPromise promise) {
280 return pipeline.close(promise);
281 }
282
283 @Override
284 public ChannelFuture deregister(ChannelPromise promise) {
285 return pipeline.deregister(promise);
286 }
287
288 @Override
289 public Channel read() {
290 pipeline.read();
291 return this;
292 }
293
294 @Override
295 public ChannelFuture write(Object msg) {
296 return pipeline.write(msg);
297 }
298
299 @Override
300 public ChannelFuture write(Object msg, ChannelPromise promise) {
301 return pipeline.write(msg, promise);
302 }
303
304 @Override
305 public ChannelFuture writeAndFlush(Object msg) {
306 return pipeline.writeAndFlush(msg);
307 }
308
309 @Override
310 public ChannelFuture writeAndFlush(Object msg, ChannelPromise promise) {
311 return pipeline.writeAndFlush(msg, promise);
312 }
313
314 @Override
315 public ChannelPromise newPromise() {
316 return pipeline.newPromise();
317 }
318
319 @Override
320 public ChannelProgressivePromise newProgressivePromise() {
321 return pipeline.newProgressivePromise();
322 }
323
324 @Override
325 public ChannelFuture newSucceededFuture() {
326 return pipeline.newSucceededFuture();
327 }
328
329 @Override
330 public ChannelFuture newFailedFuture(Throwable cause) {
331 return pipeline.newFailedFuture(cause);
332 }
333
334 @Override
335 public ChannelFuture closeFuture() {
336 return closeFuture;
337 }
338
339 @Override
340 public Unsafe unsafe() {
341 return unsafe;
342 }
343
344
345
346
347 protected abstract AbstractUnsafe newUnsafe();
348
349
350
351
352 @Override
353 public final int hashCode() {
354 return id.hashCode();
355 }
356
357
358
359
360
361 @Override
362 public final boolean equals(Object o) {
363 return this == o;
364 }
365
366 @Override
367 public final int compareTo(Channel o) {
368 if (this == o) {
369 return 0;
370 }
371
372 return id().compareTo(o.id());
373 }
374
375
376
377
378
379
380
381 @Override
382 public String toString() {
383 boolean active = isActive();
384 if (strValActive == active && strVal != null) {
385 return strVal;
386 }
387
388 SocketAddress remoteAddr = remoteAddress();
389 SocketAddress localAddr = localAddress();
390 if (remoteAddr != null) {
391 StringBuilder buf = new StringBuilder(96)
392 .append("[id: 0x")
393 .append(id.asShortText())
394 .append(", L:")
395 .append(localAddr)
396 .append(active? " - " : " ! ")
397 .append("R:")
398 .append(remoteAddr)
399 .append(']');
400 strVal = buf.toString();
401 } else if (localAddr != null) {
402 StringBuilder buf = new StringBuilder(64)
403 .append("[id: 0x")
404 .append(id.asShortText())
405 .append(", L:")
406 .append(localAddr)
407 .append(']');
408 strVal = buf.toString();
409 } else {
410 StringBuilder buf = new StringBuilder(16)
411 .append("[id: 0x")
412 .append(id.asShortText())
413 .append(']');
414 strVal = buf.toString();
415 }
416
417 strValActive = active;
418 return strVal;
419 }
420
421 @Override
422 public final ChannelPromise voidPromise() {
423 return pipeline.voidPromise();
424 }
425
426
427
428
429 protected abstract class AbstractUnsafe implements Unsafe {
430
431 private volatile ChannelOutboundBuffer outboundBuffer = new ChannelOutboundBuffer(AbstractChannel.this);
432 private RecvByteBufAllocator.Handle recvHandle;
433 private boolean inFlush0;
434
435 private boolean neverRegistered = true;
436
437 private void assertEventLoop() {
438 assert !registered || eventLoop.inEventLoop();
439 }
440
441 @Override
442 public RecvByteBufAllocator.Handle recvBufAllocHandle() {
443 if (recvHandle == null) {
444 recvHandle = config().getRecvByteBufAllocator().newHandle();
445 }
446 return recvHandle;
447 }
448
449 @Override
450 public final ChannelOutboundBuffer outboundBuffer() {
451 return outboundBuffer;
452 }
453
454 @Override
455 public final SocketAddress localAddress() {
456 return localAddress0();
457 }
458
459 @Override
460 public final SocketAddress remoteAddress() {
461 return remoteAddress0();
462 }
463
464 @Override
465 public final void register(EventLoop eventLoop, final ChannelPromise promise) {
466 ObjectUtil.checkNotNull(eventLoop, "eventLoop");
467 if (isRegistered()) {
468 promise.setFailure(new IllegalStateException("registered to an event loop already"));
469 return;
470 }
471 if (!isCompatible(eventLoop)) {
472 promise.setFailure(
473 new IllegalStateException("incompatible event loop type: " + eventLoop.getClass().getName()));
474 return;
475 }
476
477 AbstractChannel.this.eventLoop = eventLoop;
478
479 if (eventLoop.inEventLoop()) {
480 register0(promise);
481 } else {
482 try {
483 eventLoop.execute(new Runnable() {
484 @Override
485 public void run() {
486 register0(promise);
487 }
488 });
489 } catch (Throwable t) {
490 logger.warn(
491 "Force-closing a channel whose registration task was not accepted by an event loop: {}",
492 AbstractChannel.this, t);
493 closeForcibly();
494 closeFuture.setClosed();
495 safeSetFailure(promise, t);
496 }
497 }
498 }
499
500 private void register0(ChannelPromise promise) {
501 try {
502
503
504 if (!promise.setUncancellable() || !ensureOpen(promise)) {
505 return;
506 }
507 boolean firstRegistration = neverRegistered;
508 doRegister();
509 neverRegistered = false;
510 registered = true;
511
512
513
514 pipeline.invokeHandlerAddedIfNeeded();
515
516 safeSetSuccess(promise);
517 pipeline.fireChannelRegistered();
518
519
520 if (isActive()) {
521 if (firstRegistration) {
522 pipeline.fireChannelActive();
523 } else if (config().isAutoRead()) {
524
525
526
527
528 beginRead();
529 }
530 }
531 } catch (Throwable t) {
532
533 closeForcibly();
534 closeFuture.setClosed();
535 safeSetFailure(promise, t);
536 }
537 }
538
539 @Override
540 public final void bind(final SocketAddress localAddress, final ChannelPromise promise) {
541 assertEventLoop();
542
543 if (!promise.setUncancellable() || !ensureOpen(promise)) {
544 return;
545 }
546
547
548 if (Boolean.TRUE.equals(config().getOption(ChannelOption.SO_BROADCAST)) &&
549 localAddress instanceof InetSocketAddress &&
550 !((InetSocketAddress) localAddress).getAddress().isAnyLocalAddress() &&
551 !PlatformDependent.isWindows() && !PlatformDependent.maybeSuperUser()) {
552
553
554 logger.warn(
555 "A non-root user can't receive a broadcast packet if the socket " +
556 "is not bound to a wildcard address; binding to a non-wildcard " +
557 "address (" + localAddress + ") anyway as requested.");
558 }
559
560 boolean wasActive = isActive();
561 try {
562 doBind(localAddress);
563 } catch (Throwable t) {
564 safeSetFailure(promise, t);
565 closeIfClosed();
566 return;
567 }
568
569 if (!wasActive && isActive()) {
570 invokeLater(new Runnable() {
571 @Override
572 public void run() {
573 pipeline.fireChannelActive();
574 }
575 });
576 }
577
578 safeSetSuccess(promise);
579 }
580
581 @Override
582 public final void disconnect(final ChannelPromise promise) {
583 assertEventLoop();
584
585 if (!promise.setUncancellable()) {
586 return;
587 }
588
589 boolean wasActive = isActive();
590 try {
591 doDisconnect();
592
593 remoteAddress = null;
594 localAddress = null;
595 } catch (Throwable t) {
596 safeSetFailure(promise, t);
597 closeIfClosed();
598 return;
599 }
600
601 if (wasActive && !isActive()) {
602 invokeLater(new Runnable() {
603 @Override
604 public void run() {
605 pipeline.fireChannelInactive();
606 }
607 });
608 }
609
610 safeSetSuccess(promise);
611 closeIfClosed();
612 }
613
614 @Override
615 public void close(final ChannelPromise promise) {
616 assertEventLoop();
617
618 ClosedChannelException closedChannelException =
619 StacklessClosedChannelException.newInstance(AbstractChannel.class, "close(ChannelPromise)");
620 close(promise, closedChannelException, closedChannelException, false);
621 }
622
623
624
625
626
627 @UnstableApi
628 public final void shutdownOutput(final ChannelPromise promise) {
629 assertEventLoop();
630 shutdownOutput(promise, null);
631 }
632
633
634
635
636
637
638 private void shutdownOutput(final ChannelPromise promise, Throwable cause) {
639 if (!promise.setUncancellable()) {
640 return;
641 }
642
643 final ChannelOutboundBuffer outboundBuffer = this.outboundBuffer;
644 if (outboundBuffer == null) {
645 promise.setFailure(new ClosedChannelException());
646 return;
647 }
648 this.outboundBuffer = null;
649
650 final Throwable shutdownCause = cause == null ?
651 new ChannelOutputShutdownException("Channel output shutdown") :
652 new ChannelOutputShutdownException("Channel output shutdown", cause);
653
654
655
656
657
658 try {
659
660
661 doShutdownOutput();
662 promise.setSuccess();
663 } catch (Throwable err) {
664 promise.setFailure(err);
665 } finally {
666 closeOutboundBufferForShutdown(pipeline, outboundBuffer, shutdownCause);
667 }
668 }
669
670 private void closeOutboundBufferForShutdown(
671 ChannelPipeline pipeline, ChannelOutboundBuffer buffer, Throwable cause) {
672 buffer.failFlushed(cause, false);
673 buffer.close(cause, true);
674 pipeline.fireUserEventTriggered(ChannelOutputShutdownEvent.INSTANCE);
675 }
676
677 private void close(final ChannelPromise promise, final Throwable cause,
678 final ClosedChannelException closeCause, final boolean notify) {
679 if (!promise.setUncancellable()) {
680 return;
681 }
682
683 if (closeInitiated) {
684 if (closeFuture.isDone()) {
685
686 safeSetSuccess(promise);
687 } else if (!(promise instanceof VoidChannelPromise)) {
688
689 closeFuture.addListener(new ChannelFutureListener() {
690 @Override
691 public void operationComplete(ChannelFuture future) throws Exception {
692 promise.setSuccess();
693 }
694 });
695 }
696 return;
697 }
698
699 closeInitiated = true;
700
701 final boolean wasActive = isActive();
702 final ChannelOutboundBuffer outboundBuffer = this.outboundBuffer;
703 this.outboundBuffer = null;
704 Executor closeExecutor = prepareToClose();
705 if (closeExecutor != null) {
706 closeExecutor.execute(new Runnable() {
707 @Override
708 public void run() {
709 try {
710
711 doClose0(promise);
712 } finally {
713
714 invokeLater(new Runnable() {
715 @Override
716 public void run() {
717 if (outboundBuffer != null) {
718
719 outboundBuffer.failFlushed(cause, notify);
720 outboundBuffer.close(closeCause);
721 }
722 fireChannelInactiveAndDeregister(wasActive);
723 }
724 });
725 }
726 }
727 });
728 } else {
729 try {
730
731 doClose0(promise);
732 } finally {
733 if (outboundBuffer != null) {
734
735 outboundBuffer.failFlushed(cause, notify);
736 outboundBuffer.close(closeCause);
737 }
738 }
739 if (inFlush0) {
740 invokeLater(new Runnable() {
741 @Override
742 public void run() {
743 fireChannelInactiveAndDeregister(wasActive);
744 }
745 });
746 } else {
747 fireChannelInactiveAndDeregister(wasActive);
748 }
749 }
750 }
751
752 private void doClose0(ChannelPromise promise) {
753 try {
754 doClose();
755 closeFuture.setClosed();
756 safeSetSuccess(promise);
757 } catch (Throwable t) {
758 closeFuture.setClosed();
759 safeSetFailure(promise, t);
760 }
761 }
762
763 private void fireChannelInactiveAndDeregister(final boolean wasActive) {
764 deregister(voidPromise(), wasActive && !isActive());
765 }
766
767 @Override
768 public final void closeForcibly() {
769 assertEventLoop();
770
771 try {
772 doClose();
773 } catch (Exception e) {
774 logger.warn("Failed to close a channel.", e);
775 }
776 }
777
778 @Override
779 public final void deregister(final ChannelPromise promise) {
780 assertEventLoop();
781
782 deregister(promise, false);
783 }
784
785 private void deregister(final ChannelPromise promise, final boolean fireChannelInactive) {
786 if (!promise.setUncancellable()) {
787 return;
788 }
789
790 if (!registered) {
791 safeSetSuccess(promise);
792 return;
793 }
794
795
796
797
798
799
800
801
802
803
804 invokeLater(new Runnable() {
805 @Override
806 public void run() {
807 try {
808 doDeregister();
809 } catch (Throwable t) {
810 logger.warn("Unexpected exception occurred while deregistering a channel.", t);
811 } finally {
812 if (fireChannelInactive) {
813 pipeline.fireChannelInactive();
814 }
815
816
817
818
819 if (registered) {
820 registered = false;
821 pipeline.fireChannelUnregistered();
822 }
823 safeSetSuccess(promise);
824 }
825 }
826 });
827 }
828
829 @Override
830 public final void beginRead() {
831 assertEventLoop();
832
833 try {
834 doBeginRead();
835 } catch (final Exception e) {
836 invokeLater(new Runnable() {
837 @Override
838 public void run() {
839 pipeline.fireExceptionCaught(e);
840 }
841 });
842 close(voidPromise());
843 }
844 }
845
846 @Override
847 public final void write(Object msg, ChannelPromise promise) {
848 assertEventLoop();
849
850 ChannelOutboundBuffer outboundBuffer = this.outboundBuffer;
851 if (outboundBuffer == null) {
852 try {
853
854 ReferenceCountUtil.release(msg);
855 } finally {
856
857
858
859
860 safeSetFailure(promise,
861 newClosedChannelException(initialCloseCause, "write(Object, ChannelPromise)"));
862 }
863 return;
864 }
865
866 int size;
867 try {
868 msg = filterOutboundMessage(msg);
869 size = pipeline.estimatorHandle().size(msg);
870 if (size < 0) {
871 size = 0;
872 }
873 } catch (Throwable t) {
874 try {
875 ReferenceCountUtil.release(msg);
876 } finally {
877 safeSetFailure(promise, t);
878 }
879 return;
880 }
881
882 outboundBuffer.addMessage(msg, size, promise);
883 }
884
885 @Override
886 public final void flush() {
887 assertEventLoop();
888
889 ChannelOutboundBuffer outboundBuffer = this.outboundBuffer;
890 if (outboundBuffer == null) {
891 return;
892 }
893
894 outboundBuffer.addFlush();
895 flush0();
896 }
897
898 @SuppressWarnings("deprecation")
899 protected void flush0() {
900 if (inFlush0) {
901
902 return;
903 }
904
905 final ChannelOutboundBuffer outboundBuffer = this.outboundBuffer;
906 if (outboundBuffer == null || outboundBuffer.isEmpty()) {
907 return;
908 }
909
910 inFlush0 = true;
911
912
913 if (!isActive()) {
914 try {
915
916 if (!outboundBuffer.isEmpty()) {
917 if (isOpen()) {
918 outboundBuffer.failFlushed(new NotYetConnectedException(), true);
919 } else {
920
921 outboundBuffer.failFlushed(newClosedChannelException(initialCloseCause, "flush0()"), false);
922 }
923 }
924 } finally {
925 inFlush0 = false;
926 }
927 return;
928 }
929
930 try {
931 doWrite(outboundBuffer);
932 } catch (Throwable t) {
933 handleWriteError(t);
934 } finally {
935 inFlush0 = false;
936 }
937 }
938
939 protected final void handleWriteError(Throwable t) {
940 if (t instanceof IOException && config().isAutoClose()) {
941
942
943
944
945
946
947
948
949 initialCloseCause = t;
950 close(voidPromise(), t, newClosedChannelException(t, "flush0()"), false);
951 } else {
952 try {
953 shutdownOutput(voidPromise(), t);
954 } catch (Throwable t2) {
955 initialCloseCause = t;
956 close(voidPromise(), t2, newClosedChannelException(t, "flush0()"), false);
957 }
958 }
959 }
960
961 private ClosedChannelException newClosedChannelException(Throwable cause, String method) {
962 ClosedChannelException exception =
963 StacklessClosedChannelException.newInstance(AbstractChannel.AbstractUnsafe.class, method);
964 if (cause != null) {
965 exception.initCause(cause);
966 }
967 return exception;
968 }
969
970 @Override
971 public final ChannelPromise voidPromise() {
972 assertEventLoop();
973
974 return unsafeVoidPromise;
975 }
976
977 protected final boolean ensureOpen(ChannelPromise promise) {
978 if (isOpen()) {
979 return true;
980 }
981
982 safeSetFailure(promise, newClosedChannelException(initialCloseCause, "ensureOpen(ChannelPromise)"));
983 return false;
984 }
985
986
987
988
989 protected final void safeSetSuccess(ChannelPromise promise) {
990 if (!(promise instanceof VoidChannelPromise) && !promise.trySuccess()) {
991 logger.warn("Failed to mark a promise as success because it is done already: {}", promise);
992 }
993 }
994
995
996
997
998 protected final void safeSetFailure(ChannelPromise promise, Throwable cause) {
999 if (!(promise instanceof VoidChannelPromise) && !promise.tryFailure(cause)) {
1000 logger.warn("Failed to mark a promise as failure because it's done already: {}", promise, cause);
1001 }
1002 }
1003
1004 protected final void closeIfClosed() {
1005 if (isOpen()) {
1006 return;
1007 }
1008 close(voidPromise());
1009 }
1010
1011 private void invokeLater(Runnable task) {
1012 try {
1013
1014
1015
1016
1017
1018
1019
1020
1021
1022
1023
1024 eventLoop().execute(task);
1025 } catch (RejectedExecutionException e) {
1026 logger.warn("Can't invoke task later as EventLoop rejected it", e);
1027 }
1028 }
1029
1030
1031
1032
1033 protected final Throwable annotateConnectException(Throwable cause, SocketAddress remoteAddress) {
1034 if (cause instanceof ConnectException) {
1035 return new AnnotatedConnectException((ConnectException) cause, remoteAddress);
1036 }
1037 if (cause instanceof NoRouteToHostException) {
1038 return new AnnotatedNoRouteToHostException((NoRouteToHostException) cause, remoteAddress);
1039 }
1040 if (cause instanceof SocketException) {
1041 return new AnnotatedSocketException((SocketException) cause, remoteAddress);
1042 }
1043
1044 return cause;
1045 }
1046
1047
1048
1049
1050
1051
1052
1053 protected Executor prepareToClose() {
1054 return null;
1055 }
1056 }
1057
1058
1059
1060
1061 protected abstract boolean isCompatible(EventLoop loop);
1062
1063
1064
1065
1066 protected abstract SocketAddress localAddress0();
1067
1068
1069
1070
1071 protected abstract SocketAddress remoteAddress0();
1072
1073
1074
1075
1076
1077
1078 protected void doRegister() throws Exception {
1079
1080 }
1081
1082
1083
1084
1085 protected abstract void doBind(SocketAddress localAddress) throws Exception;
1086
1087
1088
1089
1090 protected abstract void doDisconnect() throws Exception;
1091
1092
1093
1094
1095 protected abstract void doClose() throws Exception;
1096
1097
1098
1099
1100
1101 @UnstableApi
1102 protected void doShutdownOutput() throws Exception {
1103 doClose();
1104 }
1105
1106
1107
1108
1109
1110
1111 protected void doDeregister() throws Exception {
1112
1113 }
1114
1115
1116
1117
1118 protected abstract void doBeginRead() throws Exception;
1119
1120
1121
1122
1123 protected abstract void doWrite(ChannelOutboundBuffer in) throws Exception;
1124
1125
1126
1127
1128
1129 protected Object filterOutboundMessage(Object msg) throws Exception {
1130 return msg;
1131 }
1132
1133 protected void validateFileRegion(DefaultFileRegion region, long position) throws IOException {
1134 DefaultFileRegion.validate(region, position);
1135 }
1136
1137 static final class CloseFuture extends DefaultChannelPromise {
1138
1139 CloseFuture(AbstractChannel ch) {
1140 super(ch);
1141 }
1142
1143 @Override
1144 public ChannelPromise setSuccess() {
1145 throw new IllegalStateException();
1146 }
1147
1148 @Override
1149 public ChannelPromise setFailure(Throwable cause) {
1150 throw new IllegalStateException();
1151 }
1152
1153 @Override
1154 public boolean trySuccess() {
1155 throw new IllegalStateException();
1156 }
1157
1158 @Override
1159 public boolean tryFailure(Throwable cause) {
1160 throw new IllegalStateException();
1161 }
1162
1163 boolean setClosed() {
1164 return super.trySuccess();
1165 }
1166 }
1167
1168 private static final class AnnotatedConnectException extends ConnectException {
1169
1170 private static final long serialVersionUID = 3901958112696433556L;
1171
1172 AnnotatedConnectException(ConnectException exception, SocketAddress remoteAddress) {
1173 super(exception.getMessage() + ": " + remoteAddress);
1174 initCause(exception);
1175 }
1176
1177
1178 @Override
1179 public Throwable fillInStackTrace() {
1180 return this;
1181 }
1182 }
1183
1184 private static final class AnnotatedNoRouteToHostException extends NoRouteToHostException {
1185
1186 private static final long serialVersionUID = -6801433937592080623L;
1187
1188 AnnotatedNoRouteToHostException(NoRouteToHostException exception, SocketAddress remoteAddress) {
1189 super(exception.getMessage() + ": " + remoteAddress);
1190 initCause(exception);
1191 }
1192
1193
1194 @Override
1195 public Throwable fillInStackTrace() {
1196 return this;
1197 }
1198 }
1199
1200 private static final class AnnotatedSocketException extends SocketException {
1201
1202 private static final long serialVersionUID = 3896743275010454039L;
1203
1204 AnnotatedSocketException(SocketException exception, SocketAddress remoteAddress) {
1205 super(exception.getMessage() + ": " + remoteAddress);
1206 initCause(exception);
1207 }
1208
1209
1210 @Override
1211 public Throwable fillInStackTrace() {
1212 return this;
1213 }
1214 }
1215 }