1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16 package io.netty.channel;
17
18 import io.netty.channel.Channel.Unsafe;
19 import io.netty.util.ReferenceCountUtil;
20 import io.netty.util.concurrent.EventExecutor;
21 import io.netty.util.concurrent.EventExecutorGroup;
22 import io.netty.util.concurrent.FastThreadLocal;
23 import io.netty.util.internal.ObjectUtil;
24 import io.netty.util.internal.StringUtil;
25 import io.netty.util.internal.logging.InternalLogger;
26 import io.netty.util.internal.logging.InternalLoggerFactory;
27
28 import java.net.SocketAddress;
29 import java.util.ArrayList;
30 import java.util.IdentityHashMap;
31 import java.util.Iterator;
32 import java.util.LinkedHashMap;
33 import java.util.List;
34 import java.util.Map;
35 import java.util.NoSuchElementException;
36 import java.util.WeakHashMap;
37 import java.util.concurrent.RejectedExecutionException;
38 import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
39
40
41
42
43
44 public class DefaultChannelPipeline implements ChannelPipeline {
45
/** Logger used by this pipeline and by its nested callback/sentinel classes. */
static final InternalLogger logger = InternalLoggerFactory.getInstance(DefaultChannelPipeline.class);

/** Names of the two sentinel contexts, built by {@link #generateName0(Class)}. */
private static final String HEAD_NAME = generateName0(HeadContext.class);
private static final String TAIL_NAME = generateName0(TailContext.class);

/**
 * Per-thread cache of default handler names keyed by handler class; consulted and filled by
 * {@link #generateName(ChannelHandler)}. Each thread starts with an empty {@link WeakHashMap}.
 */
private static final FastThreadLocal<Map<Class<?>, String>> nameCaches =
        new FastThreadLocal<Map<Class<?>, String>>() {
    @Override
    protected Map<Class<?>, String> initialValue() throws Exception {
        return new WeakHashMap<Class<?>, String>();
    }
};

/** Updater used by {@link #estimatorHandle()} to publish the lazily created handle with a compare-and-set. */
private static final AtomicReferenceFieldUpdater<DefaultChannelPipeline, MessageSizeEstimator.Handle> ESTIMATOR =
        AtomicReferenceFieldUpdater.newUpdater(
                DefaultChannelPipeline.class, MessageSizeEstimator.Handle.class, "estimatorHandle");
/** Sentinel contexts; the constructor links them to each other and user contexts are spliced in between. */
final AbstractChannelHandlerContext head;
final AbstractChannelHandlerContext tail;

/** The channel this pipeline was created for. */
private final Channel channel;
/** Created on demand by {@link #childExecutor(EventExecutorGroup)}; remembers the executor picked for each group. */
private Map<EventExecutorGroup, EventExecutor> childExecutors;
/** Lazily initialised by {@link #estimatorHandle()}; the field name is referenced reflectively by {@link #ESTIMATOR}. */
private volatile MessageSizeEstimator.Handle estimatorHandle;
/** Cleared by {@link #invokeHandlerAddedIfNeeded()} the first time it runs. */
private boolean firstRegistration = true;

/**
 * Head of the singly linked list of callbacks queued by
 * {@link #callHandlerCallbackLater(AbstractChannelHandlerContext, boolean)} while {@link #registered}
 * is still {@code false}. {@link #callHandlerAddedForAllHandlers()} detaches the list and executes it.
 */
private PendingHandlerCallback pendingHandlerCallbackHead;

/**
 * Set to {@code true} by {@link #callHandlerAddedForAllHandlers()}; nothing in this class resets it.
 * While it is {@code false}, handlerAdded/handlerRemoved callbacks are queued instead of being run.
 */
private boolean registered;
85
/**
 * Creates a pipeline for {@code channel} that initially contains only the head and tail sentinels.
 *
 * @param channel the owning channel; validated with {@code ObjectUtil.checkNotNull}
 */
protected DefaultChannelPipeline(Channel channel) {
    ObjectUtil.checkNotNull(channel, "channel");
    this.channel = channel;

    // Create both sentinels, then wire them into an empty doubly linked list.
    tail = new TailContext(this);
    head = new HeadContext(this);

    head.next = tail;
    tail.prev = head;
}
95
96 final MessageSizeEstimator.Handle estimatorHandle() {
97 MessageSizeEstimator.Handle handle = estimatorHandle;
98 if (handle == null) {
99 handle = channel.config().getMessageSizeEstimator().newHandle();
100 if (!ESTIMATOR.compareAndSet(this, null, handle)) {
101 handle = estimatorHandle;
102 }
103 }
104 return handle;
105 }
106
107 private AbstractChannelHandlerContext newContext(EventExecutorGroup group, String name, ChannelHandler handler) {
108 return new DefaultChannelHandlerContext(this, childExecutor(group), name, handler);
109 }
110
111 private EventExecutor childExecutor(EventExecutorGroup group) {
112 if (group == null) {
113 return null;
114 }
115 Boolean pinEventExecutor = channel.config().getOption(ChannelOption.SINGLE_EVENTEXECUTOR_PER_GROUP);
116 if (pinEventExecutor != null && !pinEventExecutor) {
117 return group.next();
118 }
119 Map<EventExecutorGroup, EventExecutor> childExecutors = this.childExecutors;
120 if (childExecutors == null) {
121
122 childExecutors = this.childExecutors = new IdentityHashMap<EventExecutorGroup, EventExecutor>(4);
123 }
124
125
126 EventExecutor childExecutor = childExecutors.get(group);
127 if (childExecutor == null) {
128 childExecutor = group.next();
129 childExecutors.put(group, childExecutor);
130 }
131 return childExecutor;
132 }
133 @Override
134 public final Channel channel() {
135 return channel;
136 }
137
138 @Override
139 public final ChannelPipeline addFirst(String name, ChannelHandler handler) {
140 return addFirst(null, name, handler);
141 }
142
143 @Override
144 public final ChannelPipeline addFirst(EventExecutorGroup group, String name, ChannelHandler handler) {
145 final AbstractChannelHandlerContext newCtx;
146 synchronized (this) {
147 checkMultiplicity(handler);
148 name = filterName(name, handler);
149
150 newCtx = newContext(group, name, handler);
151
152 addFirst0(newCtx);
153
154
155
156
157 if (!registered) {
158 newCtx.setAddPending();
159 callHandlerCallbackLater(newCtx, true);
160 return this;
161 }
162
163 EventExecutor executor = newCtx.executor();
164 if (!executor.inEventLoop()) {
165 newCtx.setAddPending();
166 executor.execute(new Runnable() {
167 @Override
168 public void run() {
169 callHandlerAdded0(newCtx);
170 }
171 });
172 return this;
173 }
174 }
175 callHandlerAdded0(newCtx);
176 return this;
177 }
178
179 private void addFirst0(AbstractChannelHandlerContext newCtx) {
180 AbstractChannelHandlerContext nextCtx = head.next;
181 newCtx.prev = head;
182 newCtx.next = nextCtx;
183 head.next = newCtx;
184 nextCtx.prev = newCtx;
185 }
186
187 @Override
188 public final ChannelPipeline addLast(String name, ChannelHandler handler) {
189 return addLast(null, name, handler);
190 }
191
192 @Override
193 public final ChannelPipeline addLast(EventExecutorGroup group, String name, ChannelHandler handler) {
194 final AbstractChannelHandlerContext newCtx;
195 synchronized (this) {
196 checkMultiplicity(handler);
197
198 newCtx = newContext(group, filterName(name, handler), handler);
199
200 addLast0(newCtx);
201
202
203
204
205 if (!registered) {
206 newCtx.setAddPending();
207 callHandlerCallbackLater(newCtx, true);
208 return this;
209 }
210
211 EventExecutor executor = newCtx.executor();
212 if (!executor.inEventLoop()) {
213 newCtx.setAddPending();
214 executor.execute(new Runnable() {
215 @Override
216 public void run() {
217 callHandlerAdded0(newCtx);
218 }
219 });
220 return this;
221 }
222 }
223 callHandlerAdded0(newCtx);
224 return this;
225 }
226
227 private void addLast0(AbstractChannelHandlerContext newCtx) {
228 AbstractChannelHandlerContext prev = tail.prev;
229 newCtx.prev = prev;
230 newCtx.next = tail;
231 prev.next = newCtx;
232 tail.prev = newCtx;
233 }
234
235 @Override
236 public final ChannelPipeline addBefore(String baseName, String name, ChannelHandler handler) {
237 return addBefore(null, baseName, name, handler);
238 }
239
240 @Override
241 public final ChannelPipeline addBefore(
242 EventExecutorGroup group, String baseName, String name, ChannelHandler handler) {
243 final AbstractChannelHandlerContext newCtx;
244 final AbstractChannelHandlerContext ctx;
245 synchronized (this) {
246 checkMultiplicity(handler);
247 name = filterName(name, handler);
248 ctx = getContextOrDie(baseName);
249
250 newCtx = newContext(group, name, handler);
251
252 addBefore0(ctx, newCtx);
253
254
255
256
257 if (!registered) {
258 newCtx.setAddPending();
259 callHandlerCallbackLater(newCtx, true);
260 return this;
261 }
262
263 EventExecutor executor = newCtx.executor();
264 if (!executor.inEventLoop()) {
265 newCtx.setAddPending();
266 executor.execute(new Runnable() {
267 @Override
268 public void run() {
269 callHandlerAdded0(newCtx);
270 }
271 });
272 return this;
273 }
274 }
275 callHandlerAdded0(newCtx);
276 return this;
277 }
278
279 private static void addBefore0(AbstractChannelHandlerContext ctx, AbstractChannelHandlerContext newCtx) {
280 newCtx.prev = ctx.prev;
281 newCtx.next = ctx;
282 ctx.prev.next = newCtx;
283 ctx.prev = newCtx;
284 }
285
286 private String filterName(String name, ChannelHandler handler) {
287 if (name == null) {
288 return generateName(handler);
289 }
290 checkDuplicateName(name);
291 return name;
292 }
293
294 @Override
295 public final ChannelPipeline addAfter(String baseName, String name, ChannelHandler handler) {
296 return addAfter(null, baseName, name, handler);
297 }
298
299 @Override
300 public final ChannelPipeline addAfter(
301 EventExecutorGroup group, String baseName, String name, ChannelHandler handler) {
302 final AbstractChannelHandlerContext newCtx;
303 final AbstractChannelHandlerContext ctx;
304
305 synchronized (this) {
306 checkMultiplicity(handler);
307 name = filterName(name, handler);
308 ctx = getContextOrDie(baseName);
309
310 newCtx = newContext(group, name, handler);
311
312 addAfter0(ctx, newCtx);
313
314
315
316
317 if (!registered) {
318 newCtx.setAddPending();
319 callHandlerCallbackLater(newCtx, true);
320 return this;
321 }
322 EventExecutor executor = newCtx.executor();
323 if (!executor.inEventLoop()) {
324 newCtx.setAddPending();
325 executor.execute(new Runnable() {
326 @Override
327 public void run() {
328 callHandlerAdded0(newCtx);
329 }
330 });
331 return this;
332 }
333 }
334 callHandlerAdded0(newCtx);
335 return this;
336 }
337
338 private static void addAfter0(AbstractChannelHandlerContext ctx, AbstractChannelHandlerContext newCtx) {
339 newCtx.prev = ctx;
340 newCtx.next = ctx.next;
341 ctx.next.prev = newCtx;
342 ctx.next = newCtx;
343 }
344
345 @Override
346 public final ChannelPipeline addFirst(ChannelHandler... handlers) {
347 return addFirst(null, handlers);
348 }
349
350 @Override
351 public final ChannelPipeline addFirst(EventExecutorGroup executor, ChannelHandler... handlers) {
352 if (handlers == null) {
353 throw new NullPointerException("handlers");
354 }
355 if (handlers.length == 0 || handlers[0] == null) {
356 return this;
357 }
358
359 int size;
360 for (size = 1; size < handlers.length; size ++) {
361 if (handlers[size] == null) {
362 break;
363 }
364 }
365
366 for (int i = size - 1; i >= 0; i --) {
367 ChannelHandler h = handlers[i];
368 addFirst(executor, null, h);
369 }
370
371 return this;
372 }
373
374 @Override
375 public final ChannelPipeline addLast(ChannelHandler... handlers) {
376 return addLast(null, handlers);
377 }
378
379 @Override
380 public final ChannelPipeline addLast(EventExecutorGroup executor, ChannelHandler... handlers) {
381 if (handlers == null) {
382 throw new NullPointerException("handlers");
383 }
384
385 for (ChannelHandler h: handlers) {
386 if (h == null) {
387 break;
388 }
389 addLast(executor, null, h);
390 }
391
392 return this;
393 }
394
395 private String generateName(ChannelHandler handler) {
396 Map<Class<?>, String> cache = nameCaches.get();
397 Class<?> handlerType = handler.getClass();
398 String name = cache.get(handlerType);
399 if (name == null) {
400 name = generateName0(handlerType);
401 cache.put(handlerType, name);
402 }
403
404
405
406 if (context0(name) != null) {
407 String baseName = name.substring(0, name.length() - 1);
408 for (int i = 1;; i ++) {
409 String newName = baseName + i;
410 if (context0(newName) == null) {
411 name = newName;
412 break;
413 }
414 }
415 }
416 return name;
417 }
418
419 private static String generateName0(Class<?> handlerType) {
420 return StringUtil.simpleClassName(handlerType) + "#0";
421 }
422
423 @Override
424 public final ChannelPipeline remove(ChannelHandler handler) {
425 remove(getContextOrDie(handler));
426 return this;
427 }
428
429 @Override
430 public final ChannelHandler remove(String name) {
431 return remove(getContextOrDie(name)).handler();
432 }
433
434 @SuppressWarnings("unchecked")
435 @Override
436 public final <T extends ChannelHandler> T remove(Class<T> handlerType) {
437 return (T) remove(getContextOrDie(handlerType)).handler();
438 }
439
440 private AbstractChannelHandlerContext remove(final AbstractChannelHandlerContext ctx) {
441 assert ctx != head && ctx != tail;
442
443 synchronized (this) {
444 remove0(ctx);
445
446
447
448
449 if (!registered) {
450 callHandlerCallbackLater(ctx, false);
451 return ctx;
452 }
453
454 EventExecutor executor = ctx.executor();
455 if (!executor.inEventLoop()) {
456 executor.execute(new Runnable() {
457 @Override
458 public void run() {
459 callHandlerRemoved0(ctx);
460 }
461 });
462 return ctx;
463 }
464 }
465 callHandlerRemoved0(ctx);
466 return ctx;
467 }
468
469 private static void remove0(AbstractChannelHandlerContext ctx) {
470 AbstractChannelHandlerContext prev = ctx.prev;
471 AbstractChannelHandlerContext next = ctx.next;
472 prev.next = next;
473 next.prev = prev;
474 }
475
476 @Override
477 public final ChannelHandler removeFirst() {
478 if (head.next == tail) {
479 throw new NoSuchElementException();
480 }
481 return remove(head.next).handler();
482 }
483
484 @Override
485 public final ChannelHandler removeLast() {
486 if (head.next == tail) {
487 throw new NoSuchElementException();
488 }
489 return remove(tail.prev).handler();
490 }
491
492 @Override
493 public final ChannelPipeline replace(ChannelHandler oldHandler, String newName, ChannelHandler newHandler) {
494 replace(getContextOrDie(oldHandler), newName, newHandler);
495 return this;
496 }
497
498 @Override
499 public final ChannelHandler replace(String oldName, String newName, ChannelHandler newHandler) {
500 return replace(getContextOrDie(oldName), newName, newHandler);
501 }
502
503 @Override
504 @SuppressWarnings("unchecked")
505 public final <T extends ChannelHandler> T replace(
506 Class<T> oldHandlerType, String newName, ChannelHandler newHandler) {
507 return (T) replace(getContextOrDie(oldHandlerType), newName, newHandler);
508 }
509
510 private ChannelHandler replace(
511 final AbstractChannelHandlerContext ctx, final String newName, ChannelHandler newHandler) {
512 assert ctx != head && ctx != tail;
513
514 final AbstractChannelHandlerContext newCtx;
515 synchronized (this) {
516 checkMultiplicity(newHandler);
517 boolean sameName = ctx.name().equals(newName);
518 if (!sameName) {
519 checkDuplicateName(newName);
520 }
521
522 newCtx = newContext(ctx.executor, newName, newHandler);
523
524 replace0(ctx, newCtx);
525
526
527
528
529
530 if (!registered) {
531 callHandlerCallbackLater(newCtx, true);
532 callHandlerCallbackLater(ctx, false);
533 return ctx.handler();
534 }
535 EventExecutor executor = ctx.executor();
536 if (!executor.inEventLoop()) {
537 executor.execute(new Runnable() {
538 @Override
539 public void run() {
540
541
542
543 callHandlerAdded0(newCtx);
544 callHandlerRemoved0(ctx);
545 }
546 });
547 return ctx.handler();
548 }
549 }
550
551
552
553 callHandlerAdded0(newCtx);
554 callHandlerRemoved0(ctx);
555 return ctx.handler();
556 }
557
558 private static void replace0(AbstractChannelHandlerContext oldCtx, AbstractChannelHandlerContext newCtx) {
559 AbstractChannelHandlerContext prev = oldCtx.prev;
560 AbstractChannelHandlerContext next = oldCtx.next;
561 newCtx.prev = prev;
562 newCtx.next = next;
563
564
565
566
567
568 prev.next = newCtx;
569 next.prev = newCtx;
570
571
572 oldCtx.prev = newCtx;
573 oldCtx.next = newCtx;
574 }
575
576 private static void checkMultiplicity(ChannelHandler handler) {
577 if (handler instanceof ChannelHandlerAdapter) {
578 ChannelHandlerAdapter h = (ChannelHandlerAdapter) handler;
579 if (!h.isSharable() && h.added) {
580 throw new ChannelPipelineException(
581 h.getClass().getName() +
582 " is not a @Sharable handler, so can't be added or removed multiple times.");
583 }
584 h.added = true;
585 }
586 }
587
/**
 * Marks {@code ctx} as add-complete and invokes its handler's {@code handlerAdded} callback.
 *
 * <p>If anything in that step throws, the context is unlinked again, {@code handlerRemoved} is
 * invoked, the context is marked removed, and a {@link ChannelPipelineException} wrapping the
 * original failure is fired through {@link #fireExceptionCaught(Throwable)}. The exception message
 * tells whether the roll-back itself succeeded.
 */
private void callHandlerAdded0(final AbstractChannelHandlerContext ctx) {
    try {
        // setAddComplete() runs before the callback, so the context is already flagged as
        // add-complete while handlerAdded() executes.
        ctx.setAddComplete();
        ctx.handler().handlerAdded(ctx);
    } catch (Throwable t) {
        // Stays false if either remove0() or handlerRemoved() throws.
        boolean removed = false;
        try {
            // NOTE(review): this unlink is not wrapped in synchronized here; callers invoke this
            // method outside their synchronized block — confirm that is intended.
            remove0(ctx);
            try {
                ctx.handler().handlerRemoved(ctx);
            } finally {
                // The context is marked removed even when handlerRemoved() itself fails.
                ctx.setRemoved();
            }
            removed = true;
        } catch (Throwable t2) {
            if (logger.isWarnEnabled()) {
                logger.warn("Failed to remove a handler: " + ctx.name(), t2);
            }
        }

        // In both cases the original failure t (not t2) is reported as the cause.
        if (removed) {
            fireExceptionCaught(new ChannelPipelineException(
                    ctx.handler().getClass().getName() +
                    ".handlerAdded() has thrown an exception; removed.", t));
        } else {
            fireExceptionCaught(new ChannelPipelineException(
                    ctx.handler().getClass().getName() +
                    ".handlerAdded() has thrown an exception; also failed to remove.", t));
        }
    }
}
621
622 private void callHandlerRemoved0(final AbstractChannelHandlerContext ctx) {
623
624 try {
625 try {
626 ctx.handler().handlerRemoved(ctx);
627 } finally {
628 ctx.setRemoved();
629 }
630 } catch (Throwable t) {
631 fireExceptionCaught(new ChannelPipelineException(
632 ctx.handler().getClass().getName() + ".handlerRemoved() has thrown an exception.", t));
633 }
634 }
635
636 final void invokeHandlerAddedIfNeeded() {
637 assert channel.eventLoop().inEventLoop();
638 if (firstRegistration) {
639 firstRegistration = false;
640
641
642 callHandlerAddedForAllHandlers();
643 }
644 }
645
646 @Override
647 public final ChannelHandler first() {
648 ChannelHandlerContext first = firstContext();
649 if (first == null) {
650 return null;
651 }
652 return first.handler();
653 }
654
655 @Override
656 public final ChannelHandlerContext firstContext() {
657 AbstractChannelHandlerContext first = head.next;
658 if (first == tail) {
659 return null;
660 }
661 return head.next;
662 }
663
664 @Override
665 public final ChannelHandler last() {
666 AbstractChannelHandlerContext last = tail.prev;
667 if (last == head) {
668 return null;
669 }
670 return last.handler();
671 }
672
673 @Override
674 public final ChannelHandlerContext lastContext() {
675 AbstractChannelHandlerContext last = tail.prev;
676 if (last == head) {
677 return null;
678 }
679 return last;
680 }
681
682 @Override
683 public final ChannelHandler get(String name) {
684 ChannelHandlerContext ctx = context(name);
685 if (ctx == null) {
686 return null;
687 } else {
688 return ctx.handler();
689 }
690 }
691
692 @SuppressWarnings("unchecked")
693 @Override
694 public final <T extends ChannelHandler> T get(Class<T> handlerType) {
695 ChannelHandlerContext ctx = context(handlerType);
696 if (ctx == null) {
697 return null;
698 } else {
699 return (T) ctx.handler();
700 }
701 }
702
703 @Override
704 public final ChannelHandlerContext context(String name) {
705 if (name == null) {
706 throw new NullPointerException("name");
707 }
708
709 return context0(name);
710 }
711
712 @Override
713 public final ChannelHandlerContext context(ChannelHandler handler) {
714 if (handler == null) {
715 throw new NullPointerException("handler");
716 }
717
718 AbstractChannelHandlerContext ctx = head.next;
719 for (;;) {
720
721 if (ctx == null) {
722 return null;
723 }
724
725 if (ctx.handler() == handler) {
726 return ctx;
727 }
728
729 ctx = ctx.next;
730 }
731 }
732
733 @Override
734 public final ChannelHandlerContext context(Class<? extends ChannelHandler> handlerType) {
735 if (handlerType == null) {
736 throw new NullPointerException("handlerType");
737 }
738
739 AbstractChannelHandlerContext ctx = head.next;
740 for (;;) {
741 if (ctx == null) {
742 return null;
743 }
744 if (handlerType.isAssignableFrom(ctx.handler().getClass())) {
745 return ctx;
746 }
747 ctx = ctx.next;
748 }
749 }
750
751 @Override
752 public final List<String> names() {
753 List<String> list = new ArrayList<String>();
754 AbstractChannelHandlerContext ctx = head.next;
755 for (;;) {
756 if (ctx == null) {
757 return list;
758 }
759 list.add(ctx.name());
760 ctx = ctx.next;
761 }
762 }
763
764 @Override
765 public final Map<String, ChannelHandler> toMap() {
766 Map<String, ChannelHandler> map = new LinkedHashMap<String, ChannelHandler>();
767 AbstractChannelHandlerContext ctx = head.next;
768 for (;;) {
769 if (ctx == tail) {
770 return map;
771 }
772 map.put(ctx.name(), ctx.handler());
773 ctx = ctx.next;
774 }
775 }
776
777 @Override
778 public final Iterator<Map.Entry<String, ChannelHandler>> iterator() {
779 return toMap().entrySet().iterator();
780 }
781
782
783
784
785 @Override
786 public final String toString() {
787 StringBuilder buf = new StringBuilder()
788 .append(StringUtil.simpleClassName(this))
789 .append('{');
790 AbstractChannelHandlerContext ctx = head.next;
791 for (;;) {
792 if (ctx == tail) {
793 break;
794 }
795
796 buf.append('(')
797 .append(ctx.name())
798 .append(" = ")
799 .append(ctx.handler().getClass().getName())
800 .append(')');
801
802 ctx = ctx.next;
803 if (ctx == tail) {
804 break;
805 }
806
807 buf.append(", ");
808 }
809 buf.append('}');
810 return buf.toString();
811 }
812
813 @Override
814 public final ChannelPipeline fireChannelRegistered() {
815 AbstractChannelHandlerContext.invokeChannelRegistered(head);
816 return this;
817 }
818
819 @Override
820 public final ChannelPipeline fireChannelUnregistered() {
821 AbstractChannelHandlerContext.invokeChannelUnregistered(head);
822 return this;
823 }
824
825
826
827
828
829
830
831
832
833
834
835 private synchronized void destroy() {
836 destroyUp(head.next, false);
837 }
838
/**
 * Forward phase of the teardown: walks from {@code ctx} towards the tail without removing anything.
 * Once the tail is reached, the backward phase ({@link #destroyDown}) starts at the context just
 * before the tail.
 *
 * <p>If a context's executor is not running on the current thread, the rest of the walk is handed
 * over to that executor and this invocation stops.
 *
 * @param ctx         the context to continue from
 * @param inEventLoop {@code true} when the caller already knows it runs on {@code ctx}'s executor
 *                    (set by the rescheduled task); only applies to the first context visited
 */
private void destroyUp(AbstractChannelHandlerContext ctx, boolean inEventLoop) {
    final Thread currentThread = Thread.currentThread();
    final AbstractChannelHandlerContext tail = this.tail;
    for (;;) {
        if (ctx == tail) {
            // Reached the end: switch direction and start removing from the back.
            destroyDown(currentThread, tail.prev, inEventLoop);
            break;
        }

        final EventExecutor executor = ctx.executor();
        if (!inEventLoop && !executor.inEventLoop(currentThread)) {
            // Continue the walk from this same context on its own executor.
            final AbstractChannelHandlerContext finalCtx = ctx;
            executor.execute(new Runnable() {
                @Override
                public void run() {
                    destroyUp(finalCtx, true);
                }
            });
            break;
        }

        ctx = ctx.next;
        // The flag was only valid for the context we were rescheduled for.
        inEventLoop = false;
    }
}
864
/**
 * Backward phase of the teardown: walks from {@code ctx} towards the head, unlinking each context
 * and invoking its {@code handlerRemoved} callback via {@link #callHandlerRemoved0}.
 *
 * <p>A context is only processed on its own executor; if the current thread is not that executor's
 * thread, the remainder of the walk is handed over to the executor and this invocation stops.
 *
 * @param currentThread the thread this invocation runs on, as captured by the caller
 * @param ctx           the context to continue from
 * @param inEventLoop   {@code true} when the caller already knows it runs on {@code ctx}'s executor;
 *                      only applies to the first context visited
 */
private void destroyDown(Thread currentThread, AbstractChannelHandlerContext ctx, boolean inEventLoop) {
    final AbstractChannelHandlerContext head = this.head;
    for (;;) {
        if (ctx == head) {
            break;
        }

        final EventExecutor executor = ctx.executor();
        if (inEventLoop || executor.inEventLoop(currentThread)) {
            // Unlink under the pipeline monitor, but run the callback outside of it.
            synchronized (this) {
                remove0(ctx);
            }
            callHandlerRemoved0(ctx);
        } else {
            // Continue from this same context on its own executor.
            final AbstractChannelHandlerContext finalCtx = ctx;
            executor.execute(new Runnable() {
                @Override
                public void run() {
                    destroyDown(Thread.currentThread(), finalCtx, true);
                }
            });
            break;
        }

        // remove0() leaves ctx.prev intact, so the walk can continue from the removed context.
        ctx = ctx.prev;
        inEventLoop = false;
    }
}
894
895 @Override
896 public final ChannelPipeline fireChannelActive() {
897 AbstractChannelHandlerContext.invokeChannelActive(head);
898 return this;
899 }
900
901 @Override
902 public final ChannelPipeline fireChannelInactive() {
903 AbstractChannelHandlerContext.invokeChannelInactive(head);
904 return this;
905 }
906
907 @Override
908 public final ChannelPipeline fireExceptionCaught(Throwable cause) {
909 AbstractChannelHandlerContext.invokeExceptionCaught(head, cause);
910 return this;
911 }
912
913 @Override
914 public final ChannelPipeline fireUserEventTriggered(Object event) {
915 AbstractChannelHandlerContext.invokeUserEventTriggered(head, event);
916 return this;
917 }
918
919 @Override
920 public final ChannelPipeline fireChannelRead(Object msg) {
921 AbstractChannelHandlerContext.invokeChannelRead(head, msg);
922 return this;
923 }
924
925 @Override
926 public final ChannelPipeline fireChannelReadComplete() {
927 AbstractChannelHandlerContext.invokeChannelReadComplete(head);
928 return this;
929 }
930
931 @Override
932 public final ChannelPipeline fireChannelWritabilityChanged() {
933 AbstractChannelHandlerContext.invokeChannelWritabilityChanged(head);
934 return this;
935 }
936
937 @Override
938 public final ChannelFuture bind(SocketAddress localAddress) {
939 return tail.bind(localAddress);
940 }
941
942 @Override
943 public final ChannelFuture connect(SocketAddress remoteAddress) {
944 return tail.connect(remoteAddress);
945 }
946
947 @Override
948 public final ChannelFuture connect(SocketAddress remoteAddress, SocketAddress localAddress) {
949 return tail.connect(remoteAddress, localAddress);
950 }
951
952 @Override
953 public final ChannelFuture disconnect() {
954 return tail.disconnect();
955 }
956
957 @Override
958 public final ChannelFuture close() {
959 return tail.close();
960 }
961
962 @Override
963 public final ChannelFuture deregister() {
964 return tail.deregister();
965 }
966
967 @Override
968 public final ChannelPipeline flush() {
969 tail.flush();
970 return this;
971 }
972
973 @Override
974 public final ChannelFuture bind(SocketAddress localAddress, ChannelPromise promise) {
975 return tail.bind(localAddress, promise);
976 }
977
978 @Override
979 public final ChannelFuture connect(SocketAddress remoteAddress, ChannelPromise promise) {
980 return tail.connect(remoteAddress, promise);
981 }
982
983 @Override
984 public final ChannelFuture connect(
985 SocketAddress remoteAddress, SocketAddress localAddress, ChannelPromise promise) {
986 return tail.connect(remoteAddress, localAddress, promise);
987 }
988
989 @Override
990 public final ChannelFuture disconnect(ChannelPromise promise) {
991 return tail.disconnect(promise);
992 }
993
994 @Override
995 public final ChannelFuture close(ChannelPromise promise) {
996 return tail.close(promise);
997 }
998
999 @Override
1000 public final ChannelFuture deregister(final ChannelPromise promise) {
1001 return tail.deregister(promise);
1002 }
1003
1004 @Override
1005 public final ChannelPipeline read() {
1006 tail.read();
1007 return this;
1008 }
1009
1010 @Override
1011 public final ChannelFuture write(Object msg) {
1012 return tail.write(msg);
1013 }
1014
1015 @Override
1016 public final ChannelFuture write(Object msg, ChannelPromise promise) {
1017 return tail.write(msg, promise);
1018 }
1019
1020 @Override
1021 public final ChannelFuture writeAndFlush(Object msg, ChannelPromise promise) {
1022 return tail.writeAndFlush(msg, promise);
1023 }
1024
1025 @Override
1026 public final ChannelFuture writeAndFlush(Object msg) {
1027 return tail.writeAndFlush(msg);
1028 }
1029
1030 private void checkDuplicateName(String name) {
1031 if (context0(name) != null) {
1032 throw new IllegalArgumentException("Duplicate handler name: " + name);
1033 }
1034 }
1035
1036 private AbstractChannelHandlerContext context0(String name) {
1037 AbstractChannelHandlerContext context = head.next;
1038 while (context != tail) {
1039 if (context.name().equals(name)) {
1040 return context;
1041 }
1042 context = context.next;
1043 }
1044 return null;
1045 }
1046
1047 private AbstractChannelHandlerContext getContextOrDie(String name) {
1048 AbstractChannelHandlerContext ctx = (AbstractChannelHandlerContext) context(name);
1049 if (ctx == null) {
1050 throw new NoSuchElementException(name);
1051 } else {
1052 return ctx;
1053 }
1054 }
1055
1056 private AbstractChannelHandlerContext getContextOrDie(ChannelHandler handler) {
1057 AbstractChannelHandlerContext ctx = (AbstractChannelHandlerContext) context(handler);
1058 if (ctx == null) {
1059 throw new NoSuchElementException(handler.getClass().getName());
1060 } else {
1061 return ctx;
1062 }
1063 }
1064
1065 private AbstractChannelHandlerContext getContextOrDie(Class<? extends ChannelHandler> handlerType) {
1066 AbstractChannelHandlerContext ctx = (AbstractChannelHandlerContext) context(handlerType);
1067 if (ctx == null) {
1068 throw new NoSuchElementException(handlerType.getName());
1069 } else {
1070 return ctx;
1071 }
1072 }
1073
1074 private void callHandlerAddedForAllHandlers() {
1075 final PendingHandlerCallback pendingHandlerCallbackHead;
1076 synchronized (this) {
1077 assert !registered;
1078
1079
1080 registered = true;
1081
1082 pendingHandlerCallbackHead = this.pendingHandlerCallbackHead;
1083
1084 this.pendingHandlerCallbackHead = null;
1085 }
1086
1087
1088
1089
1090 PendingHandlerCallback task = pendingHandlerCallbackHead;
1091 while (task != null) {
1092 task.execute();
1093 task = task.next;
1094 }
1095 }
1096
1097 private void callHandlerCallbackLater(AbstractChannelHandlerContext ctx, boolean added) {
1098 assert !registered;
1099
1100 PendingHandlerCallback task = added ? new PendingHandlerAddedTask(ctx) : new PendingHandlerRemovedTask(ctx);
1101 PendingHandlerCallback pending = pendingHandlerCallbackHead;
1102 if (pending == null) {
1103 pendingHandlerCallbackHead = task;
1104 } else {
1105
1106 while (pending.next != null) {
1107 pending = pending.next;
1108 }
1109 pending.next = task;
1110 }
1111 }
1112
1113
1114
1115
1116
1117 protected void onUnhandledInboundException(Throwable cause) {
1118 try {
1119 logger.warn(
1120 "An exceptionCaught() event was fired, and it reached at the tail of the pipeline. " +
1121 "It usually means the last handler in the pipeline did not handle the exception.",
1122 cause);
1123 } finally {
1124 ReferenceCountUtil.release(cause);
1125 }
1126 }
1127
1128
1129
1130
1131
1132
1133 protected void onUnhandledInboundMessage(Object msg) {
1134 try {
1135 logger.debug(
1136 "Discarded inbound message {} that reached at the tail of the pipeline. " +
1137 "Please check your pipeline configuration.", msg);
1138 } finally {
1139 ReferenceCountUtil.release(msg);
1140 }
1141 }
1142
1143
1144 final class TailContext extends AbstractChannelHandlerContext implements ChannelInboundHandler {
1145
1146 TailContext(DefaultChannelPipeline pipeline) {
1147 super(pipeline, null, TAIL_NAME, true, false);
1148 setAddComplete();
1149 }
1150
1151 @Override
1152 public ChannelHandler handler() {
1153 return this;
1154 }
1155
1156 @Override
1157 public void channelRegistered(ChannelHandlerContext ctx) throws Exception { }
1158
1159 @Override
1160 public void channelUnregistered(ChannelHandlerContext ctx) throws Exception { }
1161
1162 @Override
1163 public void channelActive(ChannelHandlerContext ctx) throws Exception { }
1164
1165 @Override
1166 public void channelInactive(ChannelHandlerContext ctx) throws Exception { }
1167
1168 @Override
1169 public void channelWritabilityChanged(ChannelHandlerContext ctx) throws Exception { }
1170
1171 @Override
1172 public void handlerAdded(ChannelHandlerContext ctx) throws Exception { }
1173
1174 @Override
1175 public void handlerRemoved(ChannelHandlerContext ctx) throws Exception { }
1176
1177 @Override
1178 public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
1179
1180
1181 ReferenceCountUtil.release(evt);
1182 }
1183
1184 @Override
1185 public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
1186 onUnhandledInboundException(cause);
1187 }
1188
1189 @Override
1190 public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
1191 onUnhandledInboundMessage(msg);
1192 }
1193
1194 @Override
1195 public void channelReadComplete(ChannelHandlerContext ctx) throws Exception { }
1196 }
1197
1198 final class HeadContext extends AbstractChannelHandlerContext
1199 implements ChannelOutboundHandler, ChannelInboundHandler {
1200
1201 private final Unsafe unsafe;
1202
1203 HeadContext(DefaultChannelPipeline pipeline) {
1204 super(pipeline, null, HEAD_NAME, false, true);
1205 unsafe = pipeline.channel().unsafe();
1206 setAddComplete();
1207 }
1208
1209 @Override
1210 public ChannelHandler handler() {
1211 return this;
1212 }
1213
1214 @Override
1215 public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
1216
1217 }
1218
1219 @Override
1220 public void handlerRemoved(ChannelHandlerContext ctx) throws Exception {
1221
1222 }
1223
1224 @Override
1225 public void bind(
1226 ChannelHandlerContext ctx, SocketAddress localAddress, ChannelPromise promise)
1227 throws Exception {
1228 unsafe.bind(localAddress, promise);
1229 }
1230
1231 @Override
1232 public void connect(
1233 ChannelHandlerContext ctx,
1234 SocketAddress remoteAddress, SocketAddress localAddress,
1235 ChannelPromise promise) throws Exception {
1236 unsafe.connect(remoteAddress, localAddress, promise);
1237 }
1238
1239 @Override
1240 public void disconnect(ChannelHandlerContext ctx, ChannelPromise promise) throws Exception {
1241 unsafe.disconnect(promise);
1242 }
1243
1244 @Override
1245 public void close(ChannelHandlerContext ctx, ChannelPromise promise) throws Exception {
1246 unsafe.close(promise);
1247 }
1248
1249 @Override
1250 public void deregister(ChannelHandlerContext ctx, ChannelPromise promise) throws Exception {
1251 unsafe.deregister(promise);
1252 }
1253
1254 @Override
1255 public void read(ChannelHandlerContext ctx) {
1256 unsafe.beginRead();
1257 }
1258
1259 @Override
1260 public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
1261 unsafe.write(msg, promise);
1262 }
1263
1264 @Override
1265 public void flush(ChannelHandlerContext ctx) throws Exception {
1266 unsafe.flush();
1267 }
1268
1269 @Override
1270 public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
1271 ctx.fireExceptionCaught(cause);
1272 }
1273
1274 @Override
1275 public void channelRegistered(ChannelHandlerContext ctx) throws Exception {
1276 invokeHandlerAddedIfNeeded();
1277 ctx.fireChannelRegistered();
1278 }
1279
1280 @Override
1281 public void channelUnregistered(ChannelHandlerContext ctx) throws Exception {
1282 ctx.fireChannelUnregistered();
1283
1284
1285 if (!channel.isOpen()) {
1286 destroy();
1287 }
1288 }
1289
1290 @Override
1291 public void channelActive(ChannelHandlerContext ctx) throws Exception {
1292 ctx.fireChannelActive();
1293
1294 readIfIsAutoRead();
1295 }
1296
1297 @Override
1298 public void channelInactive(ChannelHandlerContext ctx) throws Exception {
1299 ctx.fireChannelInactive();
1300 }
1301
1302 @Override
1303 public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
1304 ctx.fireChannelRead(msg);
1305 }
1306
1307 @Override
1308 public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
1309 ctx.fireChannelReadComplete();
1310
1311 readIfIsAutoRead();
1312 }
1313
1314 private void readIfIsAutoRead() {
1315 if (channel.config().isAutoRead()) {
1316 channel.read();
1317 }
1318 }
1319
1320 @Override
1321 public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
1322 ctx.fireUserEventTriggered(evt);
1323 }
1324
1325 @Override
1326 public void channelWritabilityChanged(ChannelHandlerContext ctx) throws Exception {
1327 ctx.fireChannelWritabilityChanged();
1328 }
1329 }
1330
1331 private abstract static class PendingHandlerCallback implements Runnable {
1332 final AbstractChannelHandlerContext ctx;
1333 PendingHandlerCallback next;
1334
1335 PendingHandlerCallback(AbstractChannelHandlerContext ctx) {
1336 this.ctx = ctx;
1337 }
1338
1339 abstract void execute();
1340 }
1341
1342 private final class PendingHandlerAddedTask extends PendingHandlerCallback {
1343
1344 PendingHandlerAddedTask(AbstractChannelHandlerContext ctx) {
1345 super(ctx);
1346 }
1347
1348 @Override
1349 public void run() {
1350 callHandlerAdded0(ctx);
1351 }
1352
1353 @Override
1354 void execute() {
1355 EventExecutor executor = ctx.executor();
1356 if (executor.inEventLoop()) {
1357 callHandlerAdded0(ctx);
1358 } else {
1359 try {
1360 executor.execute(this);
1361 } catch (RejectedExecutionException e) {
1362 if (logger.isWarnEnabled()) {
1363 logger.warn(
1364 "Can't invoke handlerAdded() as the EventExecutor {} rejected it, removing handler {}.",
1365 executor, ctx.name(), e);
1366 }
1367 remove0(ctx);
1368 ctx.setRemoved();
1369 }
1370 }
1371 }
1372 }
1373
1374 private final class PendingHandlerRemovedTask extends PendingHandlerCallback {
1375
1376 PendingHandlerRemovedTask(AbstractChannelHandlerContext ctx) {
1377 super(ctx);
1378 }
1379
1380 @Override
1381 public void run() {
1382 callHandlerRemoved0(ctx);
1383 }
1384
1385 @Override
1386 void execute() {
1387 EventExecutor executor = ctx.executor();
1388 if (executor.inEventLoop()) {
1389 callHandlerRemoved0(ctx);
1390 } else {
1391 try {
1392 executor.execute(this);
1393 } catch (RejectedExecutionException e) {
1394 if (logger.isWarnEnabled()) {
1395 logger.warn(
1396 "Can't invoke handlerRemoved() as the EventExecutor {} rejected it," +
1397 " removing handler {}.", executor, ctx.name(), e);
1398 }
1399
1400 ctx.setRemoved();
1401 }
1402 }
1403 }
1404 }
1405 }