1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16 package io.netty.channel;
17
18 import io.netty.channel.Channel.Unsafe;
19 import io.netty.util.ReferenceCountUtil;
20 import io.netty.util.ResourceLeakDetector;
21 import io.netty.util.concurrent.EventExecutor;
22 import io.netty.util.concurrent.EventExecutorGroup;
23 import io.netty.util.concurrent.FastThreadLocal;
24 import io.netty.util.internal.ObjectUtil;
25 import io.netty.util.internal.StringUtil;
26 import io.netty.util.internal.UnstableApi;
27 import io.netty.util.internal.logging.InternalLogger;
28 import io.netty.util.internal.logging.InternalLoggerFactory;
29
30 import java.net.SocketAddress;
31 import java.util.ArrayList;
32 import java.util.IdentityHashMap;
33 import java.util.Iterator;
34 import java.util.LinkedHashMap;
35 import java.util.List;
36 import java.util.Map;
37 import java.util.NoSuchElementException;
38 import java.util.WeakHashMap;
39 import java.util.concurrent.RejectedExecutionException;
40 import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
41
42
43
44
45
46 public class DefaultChannelPipeline implements ChannelPipeline {
47
48 static final InternalLogger logger = InternalLoggerFactory.getInstance(DefaultChannelPipeline.class);
49
50 private static final String HEAD_NAME = generateName0(HeadContext.class);
51 private static final String TAIL_NAME = generateName0(TailContext.class);
52
53 private static final FastThreadLocal<Map<Class<?>, String>> nameCaches =
54 new FastThreadLocal<Map<Class<?>, String>>() {
55 @Override
56 protected Map<Class<?>, String> initialValue() {
57 return new WeakHashMap<Class<?>, String>();
58 }
59 };
60
61 private static final AtomicReferenceFieldUpdater<DefaultChannelPipeline, MessageSizeEstimator.Handle> ESTIMATOR =
62 AtomicReferenceFieldUpdater.newUpdater(
63 DefaultChannelPipeline.class, MessageSizeEstimator.Handle.class, "estimatorHandle");
64 final HeadContext head;
65 final TailContext tail;
66
67 private final Channel channel;
68 private final ChannelFuture succeededFuture;
69 private final VoidChannelPromise voidPromise;
70 private final boolean touch = ResourceLeakDetector.isEnabled();
71
72 private Map<EventExecutorGroup, EventExecutor> childExecutors;
73 private volatile MessageSizeEstimator.Handle estimatorHandle;
74 private boolean firstRegistration = true;
75
76
77
78
79
80
81
82
83
84 private PendingHandlerCallback pendingHandlerCallbackHead;
85
86
87
88
89
90 private boolean registered;
91
/**
 * Creates an empty pipeline for {@code channel}, consisting only of the head
 * and tail sentinel contexts linked to each other.
 *
 * @param channel the channel this pipeline belongs to; must not be {@code null}
 */
protected DefaultChannelPipeline(Channel channel) {
    ObjectUtil.checkNotNull(channel, "channel");
    this.channel = channel;
    this.succeededFuture = new SucceededChannelFuture(channel, null);
    this.voidPromise = new VoidChannelPromise(channel, true);

    // The channel field must be assigned before the sentinels are built:
    // HeadContext's constructor reads pipeline.channel().
    this.tail = new TailContext(this);
    this.head = new HeadContext(this);

    this.head.next = this.tail;
    this.tail.prev = this.head;
}
103
104 final MessageSizeEstimator.Handle estimatorHandle() {
105 MessageSizeEstimator.Handle handle = estimatorHandle;
106 if (handle == null) {
107 handle = channel.config().getMessageSizeEstimator().newHandle();
108 if (!ESTIMATOR.compareAndSet(this, null, handle)) {
109 handle = estimatorHandle;
110 }
111 }
112 return handle;
113 }
114
115 final Object touch(Object msg, AbstractChannelHandlerContext next) {
116 return touch ? ReferenceCountUtil.touch(msg, next) : msg;
117 }
118
119 private AbstractChannelHandlerContext newContext(EventExecutorGroup group, String name, ChannelHandler handler) {
120 return new DefaultChannelHandlerContext(this, childExecutor(group), name, handler);
121 }
122
123 private EventExecutor childExecutor(EventExecutorGroup group) {
124 if (group == null) {
125 return null;
126 }
127 Boolean pinEventExecutor = channel.config().getOption(ChannelOption.SINGLE_EVENTEXECUTOR_PER_GROUP);
128 if (pinEventExecutor != null && !pinEventExecutor) {
129 return group.next();
130 }
131 Map<EventExecutorGroup, EventExecutor> childExecutors = this.childExecutors;
132 if (childExecutors == null) {
133
134 childExecutors = this.childExecutors = new IdentityHashMap<EventExecutorGroup, EventExecutor>(4);
135 }
136
137
138 EventExecutor childExecutor = childExecutors.get(group);
139 if (childExecutor == null) {
140 childExecutor = group.next();
141 childExecutors.put(group, childExecutor);
142 }
143 return childExecutor;
144 }
145 @Override
146 public final Channel channel() {
147 return channel;
148 }
149
150 @Override
151 public final ChannelPipeline addFirst(String name, ChannelHandler handler) {
152 return addFirst(null, name, handler);
153 }
154
155 @Override
156 public final ChannelPipeline addFirst(EventExecutorGroup group, String name, ChannelHandler handler) {
157 final AbstractChannelHandlerContext newCtx;
158 synchronized (this) {
159 checkMultiplicity(handler);
160 name = filterName(name, handler);
161
162 newCtx = newContext(group, name, handler);
163
164 addFirst0(newCtx);
165
166
167
168
169 if (!registered) {
170 newCtx.setAddPending();
171 callHandlerCallbackLater(newCtx, true);
172 return this;
173 }
174
175 EventExecutor executor = newCtx.executor();
176 if (!executor.inEventLoop()) {
177 callHandlerAddedInEventLoop(newCtx, executor);
178 return this;
179 }
180 }
181 callHandlerAdded0(newCtx);
182 return this;
183 }
184
185 private void addFirst0(AbstractChannelHandlerContext newCtx) {
186 AbstractChannelHandlerContext nextCtx = head.next;
187 newCtx.prev = head;
188 newCtx.next = nextCtx;
189 head.next = newCtx;
190 nextCtx.prev = newCtx;
191 }
192
193 @Override
194 public final ChannelPipeline addLast(String name, ChannelHandler handler) {
195 return addLast(null, name, handler);
196 }
197
198 @Override
199 public final ChannelPipeline addLast(EventExecutorGroup group, String name, ChannelHandler handler) {
200 final AbstractChannelHandlerContext newCtx;
201 synchronized (this) {
202 checkMultiplicity(handler);
203
204 newCtx = newContext(group, filterName(name, handler), handler);
205
206 addLast0(newCtx);
207
208
209
210
211 if (!registered) {
212 newCtx.setAddPending();
213 callHandlerCallbackLater(newCtx, true);
214 return this;
215 }
216
217 EventExecutor executor = newCtx.executor();
218 if (!executor.inEventLoop()) {
219 callHandlerAddedInEventLoop(newCtx, executor);
220 return this;
221 }
222 }
223 callHandlerAdded0(newCtx);
224 return this;
225 }
226
227 private void addLast0(AbstractChannelHandlerContext newCtx) {
228 AbstractChannelHandlerContext prev = tail.prev;
229 newCtx.prev = prev;
230 newCtx.next = tail;
231 prev.next = newCtx;
232 tail.prev = newCtx;
233 }
234
235 @Override
236 public final ChannelPipeline addBefore(String baseName, String name, ChannelHandler handler) {
237 return addBefore(null, baseName, name, handler);
238 }
239
240 @Override
241 public final ChannelPipeline addBefore(
242 EventExecutorGroup group, String baseName, String name, ChannelHandler handler) {
243 final AbstractChannelHandlerContext newCtx;
244 final AbstractChannelHandlerContext ctx;
245 synchronized (this) {
246 checkMultiplicity(handler);
247 name = filterName(name, handler);
248 ctx = getContextOrDie(baseName);
249
250 newCtx = newContext(group, name, handler);
251
252 addBefore0(ctx, newCtx);
253
254
255
256
257 if (!registered) {
258 newCtx.setAddPending();
259 callHandlerCallbackLater(newCtx, true);
260 return this;
261 }
262
263 EventExecutor executor = newCtx.executor();
264 if (!executor.inEventLoop()) {
265 callHandlerAddedInEventLoop(newCtx, executor);
266 return this;
267 }
268 }
269 callHandlerAdded0(newCtx);
270 return this;
271 }
272
273 private static void addBefore0(AbstractChannelHandlerContext ctx, AbstractChannelHandlerContext newCtx) {
274 newCtx.prev = ctx.prev;
275 newCtx.next = ctx;
276 ctx.prev.next = newCtx;
277 ctx.prev = newCtx;
278 }
279
280 private String filterName(String name, ChannelHandler handler) {
281 if (name == null) {
282 return generateName(handler);
283 }
284 checkDuplicateName(name);
285 return name;
286 }
287
288 @Override
289 public final ChannelPipeline addAfter(String baseName, String name, ChannelHandler handler) {
290 return addAfter(null, baseName, name, handler);
291 }
292
293 @Override
294 public final ChannelPipeline addAfter(
295 EventExecutorGroup group, String baseName, String name, ChannelHandler handler) {
296 final AbstractChannelHandlerContext newCtx;
297 final AbstractChannelHandlerContext ctx;
298
299 synchronized (this) {
300 checkMultiplicity(handler);
301 name = filterName(name, handler);
302 ctx = getContextOrDie(baseName);
303
304 newCtx = newContext(group, name, handler);
305
306 addAfter0(ctx, newCtx);
307
308
309
310
311 if (!registered) {
312 newCtx.setAddPending();
313 callHandlerCallbackLater(newCtx, true);
314 return this;
315 }
316 EventExecutor executor = newCtx.executor();
317 if (!executor.inEventLoop()) {
318 callHandlerAddedInEventLoop(newCtx, executor);
319 return this;
320 }
321 }
322 callHandlerAdded0(newCtx);
323 return this;
324 }
325
326 private static void addAfter0(AbstractChannelHandlerContext ctx, AbstractChannelHandlerContext newCtx) {
327 newCtx.prev = ctx;
328 newCtx.next = ctx.next;
329 ctx.next.prev = newCtx;
330 ctx.next = newCtx;
331 }
332
333 public final ChannelPipeline addFirst(ChannelHandler handler) {
334 return addFirst(null, handler);
335 }
336
337 @Override
338 public final ChannelPipeline addFirst(ChannelHandler... handlers) {
339 return addFirst(null, handlers);
340 }
341
342 @Override
343 public final ChannelPipeline addFirst(EventExecutorGroup executor, ChannelHandler... handlers) {
344 ObjectUtil.checkNotNull(handlers, "handlers");
345 if (handlers.length == 0 || handlers[0] == null) {
346 return this;
347 }
348
349 int size;
350 for (size = 1; size < handlers.length; size ++) {
351 if (handlers[size] == null) {
352 break;
353 }
354 }
355
356 for (int i = size - 1; i >= 0; i --) {
357 ChannelHandler h = handlers[i];
358 addFirst(executor, null, h);
359 }
360
361 return this;
362 }
363
364 public final ChannelPipeline addLast(ChannelHandler handler) {
365 return addLast(null, handler);
366 }
367
368 @Override
369 public final ChannelPipeline addLast(ChannelHandler... handlers) {
370 return addLast(null, handlers);
371 }
372
373 @Override
374 public final ChannelPipeline addLast(EventExecutorGroup executor, ChannelHandler... handlers) {
375 ObjectUtil.checkNotNull(handlers, "handlers");
376
377 for (ChannelHandler h: handlers) {
378 if (h == null) {
379 break;
380 }
381 addLast(executor, null, h);
382 }
383
384 return this;
385 }
386
387 private String generateName(ChannelHandler handler) {
388 Map<Class<?>, String> cache = nameCaches.get();
389 Class<?> handlerType = handler.getClass();
390 String name = cache.get(handlerType);
391 if (name == null) {
392 name = generateName0(handlerType);
393 cache.put(handlerType, name);
394 }
395
396
397
398 if (context0(name) != null) {
399 String baseName = name.substring(0, name.length() - 1);
400 for (int i = 1;; i ++) {
401 String newName = baseName + i;
402 if (context0(newName) == null) {
403 name = newName;
404 break;
405 }
406 }
407 }
408 return name;
409 }
410
411 private static String generateName0(Class<?> handlerType) {
412 return StringUtil.simpleClassName(handlerType) + "#0";
413 }
414
415 @Override
416 public final ChannelPipeline remove(ChannelHandler handler) {
417 remove(getContextOrDie(handler));
418 return this;
419 }
420
421 @Override
422 public final ChannelHandler remove(String name) {
423 return remove(getContextOrDie(name)).handler();
424 }
425
426 @SuppressWarnings("unchecked")
427 @Override
428 public final <T extends ChannelHandler> T remove(Class<T> handlerType) {
429 return (T) remove(getContextOrDie(handlerType)).handler();
430 }
431
432 public final <T extends ChannelHandler> T removeIfExists(String name) {
433 return removeIfExists(context(name));
434 }
435
436 public final <T extends ChannelHandler> T removeIfExists(Class<T> handlerType) {
437 return removeIfExists(context(handlerType));
438 }
439
440 public final <T extends ChannelHandler> T removeIfExists(ChannelHandler handler) {
441 return removeIfExists(context(handler));
442 }
443
444 @SuppressWarnings("unchecked")
445 private <T extends ChannelHandler> T removeIfExists(ChannelHandlerContext ctx) {
446 if (ctx == null) {
447 return null;
448 }
449 return (T) remove((AbstractChannelHandlerContext) ctx).handler();
450 }
451
452 private AbstractChannelHandlerContext remove(final AbstractChannelHandlerContext ctx) {
453 assert ctx != head && ctx != tail;
454
455 synchronized (this) {
456 atomicRemoveFromHandlerList(ctx);
457
458
459
460
461 if (!registered) {
462 callHandlerCallbackLater(ctx, false);
463 return ctx;
464 }
465
466 EventExecutor executor = ctx.executor();
467 if (!executor.inEventLoop()) {
468 executor.execute(new Runnable() {
469 @Override
470 public void run() {
471 callHandlerRemoved0(ctx);
472 }
473 });
474 return ctx;
475 }
476 }
477 callHandlerRemoved0(ctx);
478 return ctx;
479 }
480
481
482
483
484 private synchronized void atomicRemoveFromHandlerList(AbstractChannelHandlerContext ctx) {
485 AbstractChannelHandlerContext prev = ctx.prev;
486 AbstractChannelHandlerContext next = ctx.next;
487 prev.next = next;
488 next.prev = prev;
489 }
490
491 @Override
492 public final ChannelHandler removeFirst() {
493 if (head.next == tail) {
494 throw new NoSuchElementException();
495 }
496 return remove(head.next).handler();
497 }
498
499 @Override
500 public final ChannelHandler removeLast() {
501 if (head.next == tail) {
502 throw new NoSuchElementException();
503 }
504 return remove(tail.prev).handler();
505 }
506
507 @Override
508 public final ChannelPipeline replace(ChannelHandler oldHandler, String newName, ChannelHandler newHandler) {
509 replace(getContextOrDie(oldHandler), newName, newHandler);
510 return this;
511 }
512
513 @Override
514 public final ChannelHandler replace(String oldName, String newName, ChannelHandler newHandler) {
515 return replace(getContextOrDie(oldName), newName, newHandler);
516 }
517
518 @Override
519 @SuppressWarnings("unchecked")
520 public final <T extends ChannelHandler> T replace(
521 Class<T> oldHandlerType, String newName, ChannelHandler newHandler) {
522 return (T) replace(getContextOrDie(oldHandlerType), newName, newHandler);
523 }
524
525 private ChannelHandler replace(
526 final AbstractChannelHandlerContext ctx, String newName, ChannelHandler newHandler) {
527 assert ctx != head && ctx != tail;
528
529 final AbstractChannelHandlerContext newCtx;
530 synchronized (this) {
531 checkMultiplicity(newHandler);
532 if (newName == null) {
533 newName = generateName(newHandler);
534 } else {
535 boolean sameName = ctx.name().equals(newName);
536 if (!sameName) {
537 checkDuplicateName(newName);
538 }
539 }
540
541 newCtx = newContext(ctx.executor, newName, newHandler);
542
543 replace0(ctx, newCtx);
544
545
546
547
548
549 if (!registered) {
550 callHandlerCallbackLater(newCtx, true);
551 callHandlerCallbackLater(ctx, false);
552 return ctx.handler();
553 }
554 EventExecutor executor = ctx.executor();
555 if (!executor.inEventLoop()) {
556 executor.execute(new Runnable() {
557 @Override
558 public void run() {
559
560
561
562 callHandlerAdded0(newCtx);
563 callHandlerRemoved0(ctx);
564 }
565 });
566 return ctx.handler();
567 }
568 }
569
570
571
572 callHandlerAdded0(newCtx);
573 callHandlerRemoved0(ctx);
574 return ctx.handler();
575 }
576
577 private static void replace0(AbstractChannelHandlerContext oldCtx, AbstractChannelHandlerContext newCtx) {
578 AbstractChannelHandlerContext prev = oldCtx.prev;
579 AbstractChannelHandlerContext next = oldCtx.next;
580 newCtx.prev = prev;
581 newCtx.next = next;
582
583
584
585
586
587 prev.next = newCtx;
588 next.prev = newCtx;
589
590
591 oldCtx.prev = newCtx;
592 oldCtx.next = newCtx;
593 }
594
595 private static void checkMultiplicity(ChannelHandler handler) {
596 if (handler instanceof ChannelHandlerAdapter) {
597 ChannelHandlerAdapter h = (ChannelHandlerAdapter) handler;
598 if (!h.isSharable() && h.added) {
599 throw new ChannelPipelineException(
600 h.getClass().getName() +
601 " is not a @Sharable handler, so can't be added or removed multiple times.");
602 }
603 h.added = true;
604 }
605 }
606
607 private void callHandlerAdded0(final AbstractChannelHandlerContext ctx) {
608 try {
609 ctx.callHandlerAdded();
610 } catch (Throwable t) {
611 boolean removed = false;
612 try {
613 atomicRemoveFromHandlerList(ctx);
614 ctx.callHandlerRemoved();
615 removed = true;
616 } catch (Throwable t2) {
617 if (logger.isWarnEnabled()) {
618 logger.warn("Failed to remove a handler: " + ctx.name(), t2);
619 }
620 }
621
622 if (removed) {
623 fireExceptionCaught(new ChannelPipelineException(
624 ctx.handler().getClass().getName() +
625 ".handlerAdded() has thrown an exception; removed.", t));
626 } else {
627 fireExceptionCaught(new ChannelPipelineException(
628 ctx.handler().getClass().getName() +
629 ".handlerAdded() has thrown an exception; also failed to remove.", t));
630 }
631 }
632 }
633
634 private void callHandlerRemoved0(final AbstractChannelHandlerContext ctx) {
635
636 try {
637 ctx.callHandlerRemoved();
638 } catch (Throwable t) {
639 fireExceptionCaught(new ChannelPipelineException(
640 ctx.handler().getClass().getName() + ".handlerRemoved() has thrown an exception.", t));
641 }
642 }
643
644 final void invokeHandlerAddedIfNeeded() {
645 assert channel.eventLoop().inEventLoop();
646 if (firstRegistration) {
647 firstRegistration = false;
648
649
650 callHandlerAddedForAllHandlers();
651 }
652 }
653
654 @Override
655 public final ChannelHandler first() {
656 ChannelHandlerContext first = firstContext();
657 if (first == null) {
658 return null;
659 }
660 return first.handler();
661 }
662
663 @Override
664 public final ChannelHandlerContext firstContext() {
665 AbstractChannelHandlerContext first = head.next;
666 if (first == tail) {
667 return null;
668 }
669 return head.next;
670 }
671
672 @Override
673 public final ChannelHandler last() {
674 AbstractChannelHandlerContext last = tail.prev;
675 if (last == head) {
676 return null;
677 }
678 return last.handler();
679 }
680
681 @Override
682 public final ChannelHandlerContext lastContext() {
683 AbstractChannelHandlerContext last = tail.prev;
684 if (last == head) {
685 return null;
686 }
687 return last;
688 }
689
690 @Override
691 public final ChannelHandler get(String name) {
692 ChannelHandlerContext ctx = context(name);
693 if (ctx == null) {
694 return null;
695 } else {
696 return ctx.handler();
697 }
698 }
699
700 @SuppressWarnings("unchecked")
701 @Override
702 public final <T extends ChannelHandler> T get(Class<T> handlerType) {
703 ChannelHandlerContext ctx = context(handlerType);
704 if (ctx == null) {
705 return null;
706 } else {
707 return (T) ctx.handler();
708 }
709 }
710
711 @Override
712 public final ChannelHandlerContext context(String name) {
713 return context0(ObjectUtil.checkNotNull(name, "name"));
714 }
715
716 @Override
717 public final ChannelHandlerContext context(ChannelHandler handler) {
718 ObjectUtil.checkNotNull(handler, "handler");
719
720 AbstractChannelHandlerContext ctx = head.next;
721 for (;;) {
722
723 if (ctx == null) {
724 return null;
725 }
726
727 if (ctx.handler() == handler) {
728 return ctx;
729 }
730
731 ctx = ctx.next;
732 }
733 }
734
735 @Override
736 public final ChannelHandlerContext context(Class<? extends ChannelHandler> handlerType) {
737 ObjectUtil.checkNotNull(handlerType, "handlerType");
738
739 AbstractChannelHandlerContext ctx = head.next;
740 for (;;) {
741 if (ctx == null) {
742 return null;
743 }
744 if (handlerType.isAssignableFrom(ctx.handler().getClass())) {
745 return ctx;
746 }
747 ctx = ctx.next;
748 }
749 }
750
751 @Override
752 public final List<String> names() {
753 List<String> list = new ArrayList<String>();
754 AbstractChannelHandlerContext ctx = head.next;
755 for (;;) {
756 if (ctx == null) {
757 return list;
758 }
759 list.add(ctx.name());
760 ctx = ctx.next;
761 }
762 }
763
764 @Override
765 public final Map<String, ChannelHandler> toMap() {
766 Map<String, ChannelHandler> map = new LinkedHashMap<String, ChannelHandler>();
767 AbstractChannelHandlerContext ctx = head.next;
768 for (;;) {
769 if (ctx == tail) {
770 return map;
771 }
772 map.put(ctx.name(), ctx.handler());
773 ctx = ctx.next;
774 }
775 }
776
777 @Override
778 public final Iterator<Map.Entry<String, ChannelHandler>> iterator() {
779 return toMap().entrySet().iterator();
780 }
781
782
783
784
785 @Override
786 public final String toString() {
787 StringBuilder buf = new StringBuilder()
788 .append(StringUtil.simpleClassName(this))
789 .append('{');
790 AbstractChannelHandlerContext ctx = head.next;
791 for (;;) {
792 if (ctx == tail) {
793 break;
794 }
795
796 buf.append('(')
797 .append(ctx.name())
798 .append(" = ")
799 .append(ctx.handler().getClass().getName())
800 .append(')');
801
802 ctx = ctx.next;
803 if (ctx == tail) {
804 break;
805 }
806
807 buf.append(", ");
808 }
809 buf.append('}');
810 return buf.toString();
811 }
812
813 @Override
814 public final ChannelPipeline fireChannelRegistered() {
815 AbstractChannelHandlerContext.invokeChannelRegistered(head);
816 return this;
817 }
818
819 @Override
820 public final ChannelPipeline fireChannelUnregistered() {
821 AbstractChannelHandlerContext.invokeChannelUnregistered(head);
822 return this;
823 }
824
825
826
827
828
829
830
831
832
833
834
835 private synchronized void destroy() {
836 destroyUp(head.next, false);
837 }
838
839 private void destroyUp(AbstractChannelHandlerContext ctx, boolean inEventLoop) {
840 final Thread currentThread = Thread.currentThread();
841 final AbstractChannelHandlerContext tail = this.tail;
842 for (;;) {
843 if (ctx == tail) {
844 destroyDown(currentThread, tail.prev, inEventLoop);
845 break;
846 }
847
848 final EventExecutor executor = ctx.executor();
849 if (!inEventLoop && !executor.inEventLoop(currentThread)) {
850 final AbstractChannelHandlerContext finalCtx = ctx;
851 executor.execute(new Runnable() {
852 @Override
853 public void run() {
854 destroyUp(finalCtx, true);
855 }
856 });
857 break;
858 }
859
860 ctx = ctx.next;
861 inEventLoop = false;
862 }
863 }
864
865 private void destroyDown(Thread currentThread, AbstractChannelHandlerContext ctx, boolean inEventLoop) {
866
867 final AbstractChannelHandlerContext head = this.head;
868 for (;;) {
869 if (ctx == head) {
870 break;
871 }
872
873 final EventExecutor executor = ctx.executor();
874 if (inEventLoop || executor.inEventLoop(currentThread)) {
875 atomicRemoveFromHandlerList(ctx);
876 callHandlerRemoved0(ctx);
877 } else {
878 final AbstractChannelHandlerContext finalCtx = ctx;
879 executor.execute(new Runnable() {
880 @Override
881 public void run() {
882 destroyDown(Thread.currentThread(), finalCtx, true);
883 }
884 });
885 break;
886 }
887
888 ctx = ctx.prev;
889 inEventLoop = false;
890 }
891 }
892
893 @Override
894 public final ChannelPipeline fireChannelActive() {
895 AbstractChannelHandlerContext.invokeChannelActive(head);
896 return this;
897 }
898
899 @Override
900 public final ChannelPipeline fireChannelInactive() {
901 AbstractChannelHandlerContext.invokeChannelInactive(head);
902 return this;
903 }
904
905 @Override
906 public final ChannelPipeline fireExceptionCaught(Throwable cause) {
907 AbstractChannelHandlerContext.invokeExceptionCaught(head, cause);
908 return this;
909 }
910
911 @Override
912 public final ChannelPipeline fireUserEventTriggered(Object event) {
913 AbstractChannelHandlerContext.invokeUserEventTriggered(head, event);
914 return this;
915 }
916
917 @Override
918 public final ChannelPipeline fireChannelRead(Object msg) {
919 AbstractChannelHandlerContext.invokeChannelRead(head, msg);
920 return this;
921 }
922
923 @Override
924 public final ChannelPipeline fireChannelReadComplete() {
925 AbstractChannelHandlerContext.invokeChannelReadComplete(head);
926 return this;
927 }
928
929 @Override
930 public final ChannelPipeline fireChannelWritabilityChanged() {
931 AbstractChannelHandlerContext.invokeChannelWritabilityChanged(head);
932 return this;
933 }
934
935 @Override
936 public final ChannelFuture bind(SocketAddress localAddress) {
937 return tail.bind(localAddress);
938 }
939
940 @Override
941 public final ChannelFuture connect(SocketAddress remoteAddress) {
942 return tail.connect(remoteAddress);
943 }
944
945 @Override
946 public final ChannelFuture connect(SocketAddress remoteAddress, SocketAddress localAddress) {
947 return tail.connect(remoteAddress, localAddress);
948 }
949
950 @Override
951 public final ChannelFuture disconnect() {
952 return tail.disconnect();
953 }
954
955 @Override
956 public final ChannelFuture close() {
957 return tail.close();
958 }
959
960 @Override
961 public final ChannelFuture deregister() {
962 return tail.deregister();
963 }
964
965 @Override
966 public final ChannelPipeline flush() {
967 tail.flush();
968 return this;
969 }
970
971 @Override
972 public final ChannelFuture bind(SocketAddress localAddress, ChannelPromise promise) {
973 return tail.bind(localAddress, promise);
974 }
975
976 @Override
977 public final ChannelFuture connect(SocketAddress remoteAddress, ChannelPromise promise) {
978 return tail.connect(remoteAddress, promise);
979 }
980
981 @Override
982 public final ChannelFuture connect(
983 SocketAddress remoteAddress, SocketAddress localAddress, ChannelPromise promise) {
984 return tail.connect(remoteAddress, localAddress, promise);
985 }
986
987 @Override
988 public final ChannelFuture disconnect(ChannelPromise promise) {
989 return tail.disconnect(promise);
990 }
991
992 @Override
993 public final ChannelFuture close(ChannelPromise promise) {
994 return tail.close(promise);
995 }
996
997 @Override
998 public final ChannelFuture deregister(final ChannelPromise promise) {
999 return tail.deregister(promise);
1000 }
1001
1002 @Override
1003 public final ChannelPipeline read() {
1004 tail.read();
1005 return this;
1006 }
1007
1008 @Override
1009 public final ChannelFuture write(Object msg) {
1010 return tail.write(msg);
1011 }
1012
1013 @Override
1014 public final ChannelFuture write(Object msg, ChannelPromise promise) {
1015 return tail.write(msg, promise);
1016 }
1017
1018 @Override
1019 public final ChannelFuture writeAndFlush(Object msg, ChannelPromise promise) {
1020 return tail.writeAndFlush(msg, promise);
1021 }
1022
1023 @Override
1024 public final ChannelFuture writeAndFlush(Object msg) {
1025 return tail.writeAndFlush(msg);
1026 }
1027
1028 @Override
1029 public final ChannelPromise newPromise() {
1030 return new DefaultChannelPromise(channel);
1031 }
1032
1033 @Override
1034 public final ChannelProgressivePromise newProgressivePromise() {
1035 return new DefaultChannelProgressivePromise(channel);
1036 }
1037
1038 @Override
1039 public final ChannelFuture newSucceededFuture() {
1040 return succeededFuture;
1041 }
1042
1043 @Override
1044 public final ChannelFuture newFailedFuture(Throwable cause) {
1045 return new FailedChannelFuture(channel, null, cause);
1046 }
1047
1048 @Override
1049 public final ChannelPromise voidPromise() {
1050 return voidPromise;
1051 }
1052
1053 private void checkDuplicateName(String name) {
1054 if (context0(name) != null) {
1055 throw new IllegalArgumentException("Duplicate handler name: " + name);
1056 }
1057 }
1058
1059 private AbstractChannelHandlerContext context0(String name) {
1060 AbstractChannelHandlerContext context = head.next;
1061 while (context != tail) {
1062 if (context.name().equals(name)) {
1063 return context;
1064 }
1065 context = context.next;
1066 }
1067 return null;
1068 }
1069
1070 private AbstractChannelHandlerContext getContextOrDie(String name) {
1071 AbstractChannelHandlerContext ctx = (AbstractChannelHandlerContext) context(name);
1072 if (ctx == null) {
1073 throw new NoSuchElementException(name);
1074 } else {
1075 return ctx;
1076 }
1077 }
1078
1079 private AbstractChannelHandlerContext getContextOrDie(ChannelHandler handler) {
1080 AbstractChannelHandlerContext ctx = (AbstractChannelHandlerContext) context(handler);
1081 if (ctx == null) {
1082 throw new NoSuchElementException(handler.getClass().getName());
1083 } else {
1084 return ctx;
1085 }
1086 }
1087
1088 private AbstractChannelHandlerContext getContextOrDie(Class<? extends ChannelHandler> handlerType) {
1089 AbstractChannelHandlerContext ctx = (AbstractChannelHandlerContext) context(handlerType);
1090 if (ctx == null) {
1091 throw new NoSuchElementException(handlerType.getName());
1092 } else {
1093 return ctx;
1094 }
1095 }
1096
1097 private void callHandlerAddedForAllHandlers() {
1098 final PendingHandlerCallback pendingHandlerCallbackHead;
1099 synchronized (this) {
1100 assert !registered;
1101
1102
1103 registered = true;
1104
1105 pendingHandlerCallbackHead = this.pendingHandlerCallbackHead;
1106
1107 this.pendingHandlerCallbackHead = null;
1108 }
1109
1110
1111
1112
1113 PendingHandlerCallback task = pendingHandlerCallbackHead;
1114 while (task != null) {
1115 task.execute();
1116 task = task.next;
1117 }
1118 }
1119
1120 private void callHandlerCallbackLater(AbstractChannelHandlerContext ctx, boolean added) {
1121 assert !registered;
1122
1123 PendingHandlerCallback task = added ? new PendingHandlerAddedTask(ctx) : new PendingHandlerRemovedTask(ctx);
1124 PendingHandlerCallback pending = pendingHandlerCallbackHead;
1125 if (pending == null) {
1126 pendingHandlerCallbackHead = task;
1127 } else {
1128
1129 while (pending.next != null) {
1130 pending = pending.next;
1131 }
1132 pending.next = task;
1133 }
1134 }
1135
1136 private void callHandlerAddedInEventLoop(final AbstractChannelHandlerContext newCtx, EventExecutor executor) {
1137 newCtx.setAddPending();
1138 executor.execute(new Runnable() {
1139 @Override
1140 public void run() {
1141 callHandlerAdded0(newCtx);
1142 }
1143 });
1144 }
1145
1146
1147
1148
1149
1150 protected void onUnhandledInboundException(Throwable cause) {
1151 try {
1152 logger.warn(
1153 "An exceptionCaught() event was fired, and it reached at the tail of the pipeline. " +
1154 "It usually means the last handler in the pipeline did not handle the exception.",
1155 cause);
1156 } finally {
1157 ReferenceCountUtil.release(cause);
1158 }
1159 }
1160
1161
1162
1163
1164
1165 protected void onUnhandledInboundChannelActive() {
1166 }
1167
1168
1169
1170
1171
1172 protected void onUnhandledInboundChannelInactive() {
1173 }
1174
1175
1176
1177
1178
1179
1180 protected void onUnhandledInboundMessage(Object msg) {
1181 try {
1182 logger.debug(
1183 "Discarded inbound message {} that reached at the tail of the pipeline. " +
1184 "Please check your pipeline configuration.", msg);
1185 } finally {
1186 ReferenceCountUtil.release(msg);
1187 }
1188 }
1189
1190
1191
1192
1193
1194
1195 protected void onUnhandledInboundMessage(ChannelHandlerContext ctx, Object msg) {
1196 onUnhandledInboundMessage(msg);
1197 if (logger.isDebugEnabled()) {
1198 logger.debug("Discarded message pipeline : {}. Channel : {}.",
1199 ctx.pipeline().names(), ctx.channel());
1200 }
1201 }
1202
1203
1204
1205
1206
1207 protected void onUnhandledInboundChannelReadComplete() {
1208 }
1209
1210
1211
1212
1213
1214
1215 protected void onUnhandledInboundUserEventTriggered(Object evt) {
1216
1217
1218 ReferenceCountUtil.release(evt);
1219 }
1220
1221
1222
1223
1224
1225 protected void onUnhandledChannelWritabilityChanged() {
1226 }
1227
1228 @UnstableApi
1229 protected void incrementPendingOutboundBytes(long size) {
1230 ChannelOutboundBuffer buffer = channel.unsafe().outboundBuffer();
1231 if (buffer != null) {
1232 buffer.incrementPendingOutboundBytes(size);
1233 }
1234 }
1235
1236 @UnstableApi
1237 protected void decrementPendingOutboundBytes(long size) {
1238 ChannelOutboundBuffer buffer = channel.unsafe().outboundBuffer();
1239 if (buffer != null) {
1240 buffer.decrementPendingOutboundBytes(size);
1241 }
1242 }
1243
1244
1245 final class TailContext extends AbstractChannelHandlerContext implements ChannelInboundHandler {
1246
1247 TailContext(DefaultChannelPipeline pipeline) {
1248 super(pipeline, null, TAIL_NAME, TailContext.class);
1249 setAddComplete();
1250 }
1251
1252 @Override
1253 public ChannelHandler handler() {
1254 return this;
1255 }
1256
1257 @Override
1258 public void channelRegistered(ChannelHandlerContext ctx) { }
1259
1260 @Override
1261 public void channelUnregistered(ChannelHandlerContext ctx) { }
1262
1263 @Override
1264 public void channelActive(ChannelHandlerContext ctx) {
1265 onUnhandledInboundChannelActive();
1266 }
1267
1268 @Override
1269 public void channelInactive(ChannelHandlerContext ctx) {
1270 onUnhandledInboundChannelInactive();
1271 }
1272
1273 @Override
1274 public void channelWritabilityChanged(ChannelHandlerContext ctx) {
1275 onUnhandledChannelWritabilityChanged();
1276 }
1277
1278 @Override
1279 public void handlerAdded(ChannelHandlerContext ctx) { }
1280
1281 @Override
1282 public void handlerRemoved(ChannelHandlerContext ctx) { }
1283
1284 @Override
1285 public void userEventTriggered(ChannelHandlerContext ctx, Object evt) {
1286 onUnhandledInboundUserEventTriggered(evt);
1287 }
1288
1289 @Override
1290 public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
1291 onUnhandledInboundException(cause);
1292 }
1293
1294 @Override
1295 public void channelRead(ChannelHandlerContext ctx, Object msg) {
1296 onUnhandledInboundMessage(ctx, msg);
1297 }
1298
1299 @Override
1300 public void channelReadComplete(ChannelHandlerContext ctx) {
1301 onUnhandledInboundChannelReadComplete();
1302 }
1303 }
1304
1305 final class HeadContext extends AbstractChannelHandlerContext
1306 implements ChannelOutboundHandler, ChannelInboundHandler {
1307
1308 private final Unsafe unsafe;
1309
1310 HeadContext(DefaultChannelPipeline pipeline) {
1311 super(pipeline, null, HEAD_NAME, HeadContext.class);
1312 unsafe = pipeline.channel().unsafe();
1313 setAddComplete();
1314 }
1315
1316 @Override
1317 public ChannelHandler handler() {
1318 return this;
1319 }
1320
1321 @Override
1322 public void handlerAdded(ChannelHandlerContext ctx) {
1323
1324 }
1325
1326 @Override
1327 public void handlerRemoved(ChannelHandlerContext ctx) {
1328
1329 }
1330
1331 @Override
1332 public void bind(
1333 ChannelHandlerContext ctx, SocketAddress localAddress, ChannelPromise promise) {
1334 unsafe.bind(localAddress, promise);
1335 }
1336
1337 @Override
1338 public void connect(
1339 ChannelHandlerContext ctx,
1340 SocketAddress remoteAddress, SocketAddress localAddress,
1341 ChannelPromise promise) {
1342 unsafe.connect(remoteAddress, localAddress, promise);
1343 }
1344
1345 @Override
1346 public void disconnect(ChannelHandlerContext ctx, ChannelPromise promise) {
1347 unsafe.disconnect(promise);
1348 }
1349
1350 @Override
1351 public void close(ChannelHandlerContext ctx, ChannelPromise promise) {
1352 unsafe.close(promise);
1353 }
1354
1355 @Override
1356 public void deregister(ChannelHandlerContext ctx, ChannelPromise promise) {
1357 unsafe.deregister(promise);
1358 }
1359
1360 @Override
1361 public void read(ChannelHandlerContext ctx) {
1362 unsafe.beginRead();
1363 }
1364
1365 @Override
1366 public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) {
1367 unsafe.write(msg, promise);
1368 }
1369
1370 @Override
1371 public void flush(ChannelHandlerContext ctx) {
1372 unsafe.flush();
1373 }
1374
1375 @Override
1376 public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
1377 ctx.fireExceptionCaught(cause);
1378 }
1379
1380 @Override
1381 public void channelRegistered(ChannelHandlerContext ctx) {
1382 invokeHandlerAddedIfNeeded();
1383 ctx.fireChannelRegistered();
1384 }
1385
1386 @Override
1387 public void channelUnregistered(ChannelHandlerContext ctx) {
1388 ctx.fireChannelUnregistered();
1389
1390
1391 if (!channel.isOpen()) {
1392 destroy();
1393 }
1394 }
1395
1396 @Override
1397 public void channelActive(ChannelHandlerContext ctx) {
1398 ctx.fireChannelActive();
1399
1400 readIfIsAutoRead();
1401 }
1402
1403 @Override
1404 public void channelInactive(ChannelHandlerContext ctx) {
1405 ctx.fireChannelInactive();
1406 }
1407
1408 @Override
1409 public void channelRead(ChannelHandlerContext ctx, Object msg) {
1410 ctx.fireChannelRead(msg);
1411 }
1412
1413 @Override
1414 public void channelReadComplete(ChannelHandlerContext ctx) {
1415 ctx.fireChannelReadComplete();
1416
1417 readIfIsAutoRead();
1418 }
1419
1420 private void readIfIsAutoRead() {
1421 if (channel.config().isAutoRead()) {
1422 channel.read();
1423 }
1424 }
1425
1426 @Override
1427 public void userEventTriggered(ChannelHandlerContext ctx, Object evt) {
1428 ctx.fireUserEventTriggered(evt);
1429 }
1430
1431 @Override
1432 public void channelWritabilityChanged(ChannelHandlerContext ctx) {
1433 ctx.fireChannelWritabilityChanged();
1434 }
1435 }
1436
1437 private abstract static class PendingHandlerCallback implements Runnable {
1438 final AbstractChannelHandlerContext ctx;
1439 PendingHandlerCallback next;
1440
1441 PendingHandlerCallback(AbstractChannelHandlerContext ctx) {
1442 this.ctx = ctx;
1443 }
1444
1445 abstract void execute();
1446 }
1447
1448 private final class PendingHandlerAddedTask extends PendingHandlerCallback {
1449
1450 PendingHandlerAddedTask(AbstractChannelHandlerContext ctx) {
1451 super(ctx);
1452 }
1453
1454 @Override
1455 public void run() {
1456 callHandlerAdded0(ctx);
1457 }
1458
1459 @Override
1460 void execute() {
1461 EventExecutor executor = ctx.executor();
1462 if (executor.inEventLoop()) {
1463 callHandlerAdded0(ctx);
1464 } else {
1465 try {
1466 executor.execute(this);
1467 } catch (RejectedExecutionException e) {
1468 if (logger.isWarnEnabled()) {
1469 logger.warn(
1470 "Can't invoke handlerAdded() as the EventExecutor {} rejected it, removing handler {}.",
1471 executor, ctx.name(), e);
1472 }
1473 atomicRemoveFromHandlerList(ctx);
1474 ctx.setRemoved();
1475 }
1476 }
1477 }
1478 }
1479
1480 private final class PendingHandlerRemovedTask extends PendingHandlerCallback {
1481
1482 PendingHandlerRemovedTask(AbstractChannelHandlerContext ctx) {
1483 super(ctx);
1484 }
1485
1486 @Override
1487 public void run() {
1488 callHandlerRemoved0(ctx);
1489 }
1490
1491 @Override
1492 void execute() {
1493 EventExecutor executor = ctx.executor();
1494 if (executor.inEventLoop()) {
1495 callHandlerRemoved0(ctx);
1496 } else {
1497 try {
1498 executor.execute(this);
1499 } catch (RejectedExecutionException e) {
1500 if (logger.isWarnEnabled()) {
1501 logger.warn(
1502 "Can't invoke handlerRemoved() as the EventExecutor {} rejected it," +
1503 " removing handler {}.", executor, ctx.name(), e);
1504 }
1505
1506 ctx.setRemoved();
1507 }
1508 }
1509 }
1510 }
1511 }