查看本类的 API文档回源码主页即时通讯网 - 即时通讯开发者社区!
1   /*
2    * Copyright 2012 The Netty Project
3    *
4    * The Netty Project licenses this file to you under the Apache License,
5    * version 2.0 (the "License"); you may not use this file except in compliance
6    * with the License. You may obtain a copy of the License at:
7    *
8    *   http://www.apache.org/licenses/LICENSE-2.0
9    *
10   * Unless required by applicable law or agreed to in writing, software
11   * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12   * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13   * License for the specific language governing permissions and limitations
14   * under the License.
15   */
16  package io.netty.channel;
17  
18  import io.netty.channel.Channel.Unsafe;
19  import io.netty.util.ReferenceCountUtil;
20  import io.netty.util.concurrent.EventExecutor;
21  import io.netty.util.concurrent.EventExecutorGroup;
22  import io.netty.util.concurrent.FastThreadLocal;
23  import io.netty.util.internal.ObjectUtil;
24  import io.netty.util.internal.StringUtil;
25  import io.netty.util.internal.logging.InternalLogger;
26  import io.netty.util.internal.logging.InternalLoggerFactory;
27  
28  import java.net.SocketAddress;
29  import java.util.ArrayList;
30  import java.util.IdentityHashMap;
31  import java.util.Iterator;
32  import java.util.LinkedHashMap;
33  import java.util.List;
34  import java.util.Map;
35  import java.util.NoSuchElementException;
36  import java.util.WeakHashMap;
37  import java.util.concurrent.RejectedExecutionException;
38  import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
39  
40  /**
41   * The default {@link ChannelPipeline} implementation.  It is usually created
42   * by a {@link Channel} implementation when the {@link Channel} is created.
43   */
44  public class DefaultChannelPipeline implements ChannelPipeline {
45  
46      static final InternalLogger logger = InternalLoggerFactory.getInstance(DefaultChannelPipeline.class);
47  
48      private static final String HEAD_NAME = generateName0(HeadContext.class);
49      private static final String TAIL_NAME = generateName0(TailContext.class);
50  
51      private static final FastThreadLocal<Map<Class<?>, String>> nameCaches =
52              new FastThreadLocal<Map<Class<?>, String>>() {
53          @Override
54          protected Map<Class<?>, String> initialValue() throws Exception {
55              return new WeakHashMap<Class<?>, String>();
56          }
57      };
58  
59      private static final AtomicReferenceFieldUpdater<DefaultChannelPipeline, MessageSizeEstimator.Handle> ESTIMATOR =
60              AtomicReferenceFieldUpdater.newUpdater(
61                      DefaultChannelPipeline.class, MessageSizeEstimator.Handle.class, "estimatorHandle");
62      final AbstractChannelHandlerContext head;
63      final AbstractChannelHandlerContext tail;
64  
65      private final Channel channel;
66      private Map<EventExecutorGroup, EventExecutor> childExecutors;
67      private volatile MessageSizeEstimator.Handle estimatorHandle;
68      private boolean firstRegistration = true;
69  
70      /**
71       * This is the head of a linked list that is processed by {@link #callHandlerAddedForAllHandlers()} and so process
72       * all the pending {@link #callHandlerAdded0(AbstractChannelHandlerContext)}.
73       *
74       * We only keep the head because it is expected that the list is used infrequently and its size is small.
75       * Thus full iterations to do insertions is assumed to be a good compromised to saving memory and tail management
76       * complexity.
77       */
78      private PendingHandlerCallback pendingHandlerCallbackHead;
79  
80      /**
81       * Set to {@code true} once the {@link AbstractChannel} is registered.Once set to {@code true} the value will never
82       * change.
83       */
84      private boolean registered;
85  
86      protected DefaultChannelPipeline(Channel channel) {
87          this.channel = ObjectUtil.checkNotNull(channel, "channel");
88  
89          tail = new TailContext(this);
90          head = new HeadContext(this);
91  
92          head.next = tail;
93          tail.prev = head;
94      }
95  
96      final MessageSizeEstimator.Handle estimatorHandle() {
97          MessageSizeEstimator.Handle handle = estimatorHandle;
98          if (handle == null) {
99              handle = channel.config().getMessageSizeEstimator().newHandle();
100             if (!ESTIMATOR.compareAndSet(this, null, handle)) {
101                 handle = estimatorHandle;
102             }
103         }
104         return handle;
105     }
106 
107     private AbstractChannelHandlerContext newContext(EventExecutorGroup group, String name, ChannelHandler handler) {
108         return new DefaultChannelHandlerContext(this, childExecutor(group), name, handler);
109     }
110 
111     private EventExecutor childExecutor(EventExecutorGroup group) {
112         if (group == null) {
113             return null;
114         }
115         Boolean pinEventExecutor = channel.config().getOption(ChannelOption.SINGLE_EVENTEXECUTOR_PER_GROUP);
116         if (pinEventExecutor != null && !pinEventExecutor) {
117             return group.next();
118         }
119         Map<EventExecutorGroup, EventExecutor> childExecutors = this.childExecutors;
120         if (childExecutors == null) {
121             // Use size of 4 as most people only use one extra EventExecutor.
122             childExecutors = this.childExecutors = new IdentityHashMap<EventExecutorGroup, EventExecutor>(4);
123         }
124         // Pin one of the child executors once and remember it so that the same child executor
125         // is used to fire events for the same channel.
126         EventExecutor childExecutor = childExecutors.get(group);
127         if (childExecutor == null) {
128             childExecutor = group.next();
129             childExecutors.put(group, childExecutor);
130         }
131         return childExecutor;
132     }
133     @Override
134     public final Channel channel() {
135         return channel;
136     }
137 
138     @Override
139     public final ChannelPipeline addFirst(String name, ChannelHandler handler) {
140         return addFirst(null, name, handler);
141     }
142 
143     @Override
144     public final ChannelPipeline addFirst(EventExecutorGroup group, String name, ChannelHandler handler) {
145         final AbstractChannelHandlerContext newCtx;
146         synchronized (this) {
147             checkMultiplicity(handler);
148             name = filterName(name, handler);
149 
150             newCtx = newContext(group, name, handler);
151 
152             addFirst0(newCtx);
153 
154             // If the registered is false it means that the channel was not registered on an eventloop yet.
155             // In this case we add the context to the pipeline and add a task that will call
156             // ChannelHandler.handlerAdded(...) once the channel is registered.
157             if (!registered) {
158                 newCtx.setAddPending();
159                 callHandlerCallbackLater(newCtx, true);
160                 return this;
161             }
162 
163             EventExecutor executor = newCtx.executor();
164             if (!executor.inEventLoop()) {
165                 newCtx.setAddPending();
166                 executor.execute(new Runnable() {
167                     @Override
168                     public void run() {
169                         callHandlerAdded0(newCtx);
170                     }
171                 });
172                 return this;
173             }
174         }
175         callHandlerAdded0(newCtx);
176         return this;
177     }
178 
179     private void addFirst0(AbstractChannelHandlerContext newCtx) {
180         AbstractChannelHandlerContext nextCtx = head.next;
181         newCtx.prev = head;
182         newCtx.next = nextCtx;
183         head.next = newCtx;
184         nextCtx.prev = newCtx;
185     }
186 
187     @Override
188     public final ChannelPipeline addLast(String name, ChannelHandler handler) {
189         return addLast(null, name, handler);
190     }
191 
192     @Override
193     public final ChannelPipeline addLast(EventExecutorGroup group, String name, ChannelHandler handler) {
194         final AbstractChannelHandlerContext newCtx;
195         synchronized (this) {
196             checkMultiplicity(handler);
197 
198             newCtx = newContext(group, filterName(name, handler), handler);
199 
200             addLast0(newCtx);
201 
202             // If the registered is false it means that the channel was not registered on an eventloop yet.
203             // In this case we add the context to the pipeline and add a task that will call
204             // ChannelHandler.handlerAdded(...) once the channel is registered.
205             if (!registered) {
206                 newCtx.setAddPending();
207                 callHandlerCallbackLater(newCtx, true);
208                 return this;
209             }
210 
211             EventExecutor executor = newCtx.executor();
212             if (!executor.inEventLoop()) {
213                 newCtx.setAddPending();
214                 executor.execute(new Runnable() {
215                     @Override
216                     public void run() {
217                         callHandlerAdded0(newCtx);
218                     }
219                 });
220                 return this;
221             }
222         }
223         callHandlerAdded0(newCtx);
224         return this;
225     }
226 
227     private void addLast0(AbstractChannelHandlerContext newCtx) {
228         AbstractChannelHandlerContext prev = tail.prev;
229         newCtx.prev = prev;
230         newCtx.next = tail;
231         prev.next = newCtx;
232         tail.prev = newCtx;
233     }
234 
235     @Override
236     public final ChannelPipeline addBefore(String baseName, String name, ChannelHandler handler) {
237         return addBefore(null, baseName, name, handler);
238     }
239 
240     @Override
241     public final ChannelPipeline addBefore(
242             EventExecutorGroup group, String baseName, String name, ChannelHandler handler) {
243         final AbstractChannelHandlerContext newCtx;
244         final AbstractChannelHandlerContext ctx;
245         synchronized (this) {
246             checkMultiplicity(handler);
247             name = filterName(name, handler);
248             ctx = getContextOrDie(baseName);
249 
250             newCtx = newContext(group, name, handler);
251 
252             addBefore0(ctx, newCtx);
253 
254             // If the registered is false it means that the channel was not registered on an eventloop yet.
255             // In this case we add the context to the pipeline and add a task that will call
256             // ChannelHandler.handlerAdded(...) once the channel is registered.
257             if (!registered) {
258                 newCtx.setAddPending();
259                 callHandlerCallbackLater(newCtx, true);
260                 return this;
261             }
262 
263             EventExecutor executor = newCtx.executor();
264             if (!executor.inEventLoop()) {
265                 newCtx.setAddPending();
266                 executor.execute(new Runnable() {
267                     @Override
268                     public void run() {
269                         callHandlerAdded0(newCtx);
270                     }
271                 });
272                 return this;
273             }
274         }
275         callHandlerAdded0(newCtx);
276         return this;
277     }
278 
279     private static void addBefore0(AbstractChannelHandlerContext ctx, AbstractChannelHandlerContext newCtx) {
280         newCtx.prev = ctx.prev;
281         newCtx.next = ctx;
282         ctx.prev.next = newCtx;
283         ctx.prev = newCtx;
284     }
285 
286     private String filterName(String name, ChannelHandler handler) {
287         if (name == null) {
288             return generateName(handler);
289         }
290         checkDuplicateName(name);
291         return name;
292     }
293 
294     @Override
295     public final ChannelPipeline addAfter(String baseName, String name, ChannelHandler handler) {
296         return addAfter(null, baseName, name, handler);
297     }
298 
299     @Override
300     public final ChannelPipeline addAfter(
301             EventExecutorGroup group, String baseName, String name, ChannelHandler handler) {
302         final AbstractChannelHandlerContext newCtx;
303         final AbstractChannelHandlerContext ctx;
304 
305         synchronized (this) {
306             checkMultiplicity(handler);
307             name = filterName(name, handler);
308             ctx = getContextOrDie(baseName);
309 
310             newCtx = newContext(group, name, handler);
311 
312             addAfter0(ctx, newCtx);
313 
314             // If the registered is false it means that the channel was not registered on an eventloop yet.
315             // In this case we remove the context from the pipeline and add a task that will call
316             // ChannelHandler.handlerRemoved(...) once the channel is registered.
317             if (!registered) {
318                 newCtx.setAddPending();
319                 callHandlerCallbackLater(newCtx, true);
320                 return this;
321             }
322             EventExecutor executor = newCtx.executor();
323             if (!executor.inEventLoop()) {
324                 newCtx.setAddPending();
325                 executor.execute(new Runnable() {
326                     @Override
327                     public void run() {
328                         callHandlerAdded0(newCtx);
329                     }
330                 });
331                 return this;
332             }
333         }
334         callHandlerAdded0(newCtx);
335         return this;
336     }
337 
338     private static void addAfter0(AbstractChannelHandlerContext ctx, AbstractChannelHandlerContext newCtx) {
339         newCtx.prev = ctx;
340         newCtx.next = ctx.next;
341         ctx.next.prev = newCtx;
342         ctx.next = newCtx;
343     }
344 
345     @Override
346     public final ChannelPipeline addFirst(ChannelHandler... handlers) {
347         return addFirst(null, handlers);
348     }
349 
350     @Override
351     public final ChannelPipeline addFirst(EventExecutorGroup executor, ChannelHandler... handlers) {
352         if (handlers == null) {
353             throw new NullPointerException("handlers");
354         }
355         if (handlers.length == 0 || handlers[0] == null) {
356             return this;
357         }
358 
359         int size;
360         for (size = 1; size < handlers.length; size ++) {
361             if (handlers[size] == null) {
362                 break;
363             }
364         }
365 
366         for (int i = size - 1; i >= 0; i --) {
367             ChannelHandler h = handlers[i];
368             addFirst(executor, null, h);
369         }
370 
371         return this;
372     }
373 
374     @Override
375     public final ChannelPipeline addLast(ChannelHandler... handlers) {
376         return addLast(null, handlers);
377     }
378 
379     @Override
380     public final ChannelPipeline addLast(EventExecutorGroup executor, ChannelHandler... handlers) {
381         if (handlers == null) {
382             throw new NullPointerException("handlers");
383         }
384 
385         for (ChannelHandler h: handlers) {
386             if (h == null) {
387                 break;
388             }
389             addLast(executor, null, h);
390         }
391 
392         return this;
393     }
394 
395     private String generateName(ChannelHandler handler) {
396         Map<Class<?>, String> cache = nameCaches.get();
397         Class<?> handlerType = handler.getClass();
398         String name = cache.get(handlerType);
399         if (name == null) {
400             name = generateName0(handlerType);
401             cache.put(handlerType, name);
402         }
403 
404         // It's not very likely for a user to put more than one handler of the same type, but make sure to avoid
405         // any name conflicts.  Note that we don't cache the names generated here.
406         if (context0(name) != null) {
407             String baseName = name.substring(0, name.length() - 1); // Strip the trailing '0'.
408             for (int i = 1;; i ++) {
409                 String newName = baseName + i;
410                 if (context0(newName) == null) {
411                     name = newName;
412                     break;
413                 }
414             }
415         }
416         return name;
417     }
418 
419     private static String generateName0(Class<?> handlerType) {
420         return StringUtil.simpleClassName(handlerType) + "#0";
421     }
422 
423     @Override
424     public final ChannelPipeline remove(ChannelHandler handler) {
425         remove(getContextOrDie(handler));
426         return this;
427     }
428 
429     @Override
430     public final ChannelHandler remove(String name) {
431         return remove(getContextOrDie(name)).handler();
432     }
433 
434     @SuppressWarnings("unchecked")
435     @Override
436     public final <T extends ChannelHandler> T remove(Class<T> handlerType) {
437         return (T) remove(getContextOrDie(handlerType)).handler();
438     }
439 
440     private AbstractChannelHandlerContext remove(final AbstractChannelHandlerContext ctx) {
441         assert ctx != head && ctx != tail;
442 
443         synchronized (this) {
444             remove0(ctx);
445 
446             // If the registered is false it means that the channel was not registered on an eventloop yet.
447             // In this case we remove the context from the pipeline and add a task that will call
448             // ChannelHandler.handlerRemoved(...) once the channel is registered.
449             if (!registered) {
450                 callHandlerCallbackLater(ctx, false);
451                 return ctx;
452             }
453 
454             EventExecutor executor = ctx.executor();
455             if (!executor.inEventLoop()) {
456                 executor.execute(new Runnable() {
457                     @Override
458                     public void run() {
459                         callHandlerRemoved0(ctx);
460                     }
461                 });
462                 return ctx;
463             }
464         }
465         callHandlerRemoved0(ctx);
466         return ctx;
467     }
468 
469     private static void remove0(AbstractChannelHandlerContext ctx) {
470         AbstractChannelHandlerContext prev = ctx.prev;
471         AbstractChannelHandlerContext next = ctx.next;
472         prev.next = next;
473         next.prev = prev;
474     }
475 
476     @Override
477     public final ChannelHandler removeFirst() {
478         if (head.next == tail) {
479             throw new NoSuchElementException();
480         }
481         return remove(head.next).handler();
482     }
483 
484     @Override
485     public final ChannelHandler removeLast() {
486         if (head.next == tail) {
487             throw new NoSuchElementException();
488         }
489         return remove(tail.prev).handler();
490     }
491 
492     @Override
493     public final ChannelPipeline replace(ChannelHandler oldHandler, String newName, ChannelHandler newHandler) {
494         replace(getContextOrDie(oldHandler), newName, newHandler);
495         return this;
496     }
497 
498     @Override
499     public final ChannelHandler replace(String oldName, String newName, ChannelHandler newHandler) {
500         return replace(getContextOrDie(oldName), newName, newHandler);
501     }
502 
503     @Override
504     @SuppressWarnings("unchecked")
505     public final <T extends ChannelHandler> T replace(
506             Class<T> oldHandlerType, String newName, ChannelHandler newHandler) {
507         return (T) replace(getContextOrDie(oldHandlerType), newName, newHandler);
508     }
509 
510     private ChannelHandler replace(
511             final AbstractChannelHandlerContext ctx, final String newName, ChannelHandler newHandler) {
512         assert ctx != head && ctx != tail;
513 
514         final AbstractChannelHandlerContext newCtx;
515         synchronized (this) {
516             checkMultiplicity(newHandler);
517             boolean sameName = ctx.name().equals(newName);
518             if (!sameName) {
519                 checkDuplicateName(newName);
520             }
521 
522             newCtx = newContext(ctx.executor, newName, newHandler);
523 
524             replace0(ctx, newCtx);
525 
526             // If the registered is false it means that the channel was not registered on an eventloop yet.
527             // In this case we replace the context in the pipeline
528             // and add a task that will call ChannelHandler.handlerAdded(...) and
529             // ChannelHandler.handlerRemoved(...) once the channel is registered.
530             if (!registered) {
531                 callHandlerCallbackLater(newCtx, true);
532                 callHandlerCallbackLater(ctx, false);
533                 return ctx.handler();
534             }
535             EventExecutor executor = ctx.executor();
536             if (!executor.inEventLoop()) {
537                 executor.execute(new Runnable() {
538                     @Override
539                     public void run() {
540                         // Invoke newHandler.handlerAdded() first (i.e. before oldHandler.handlerRemoved() is invoked)
541                         // because callHandlerRemoved() will trigger channelRead() or flush() on newHandler and
542                         // those event handlers must be called after handlerAdded().
543                         callHandlerAdded0(newCtx);
544                         callHandlerRemoved0(ctx);
545                     }
546                 });
547                 return ctx.handler();
548             }
549         }
550         // Invoke newHandler.handlerAdded() first (i.e. before oldHandler.handlerRemoved() is invoked)
551         // because callHandlerRemoved() will trigger channelRead() or flush() on newHandler and those
552         // event handlers must be called after handlerAdded().
553         callHandlerAdded0(newCtx);
554         callHandlerRemoved0(ctx);
555         return ctx.handler();
556     }
557 
558     private static void replace0(AbstractChannelHandlerContext oldCtx, AbstractChannelHandlerContext newCtx) {
559         AbstractChannelHandlerContext prev = oldCtx.prev;
560         AbstractChannelHandlerContext next = oldCtx.next;
561         newCtx.prev = prev;
562         newCtx.next = next;
563 
564         // Finish the replacement of oldCtx with newCtx in the linked list.
565         // Note that this doesn't mean events will be sent to the new handler immediately
566         // because we are currently at the event handler thread and no more than one handler methods can be invoked
567         // at the same time (we ensured that in replace().)
568         prev.next = newCtx;
569         next.prev = newCtx;
570 
571         // update the reference to the replacement so forward of buffered content will work correctly
572         oldCtx.prev = newCtx;
573         oldCtx.next = newCtx;
574     }
575 
576     private static void checkMultiplicity(ChannelHandler handler) {
577         if (handler instanceof ChannelHandlerAdapter) {
578             ChannelHandlerAdapter h = (ChannelHandlerAdapter) handler;
579             if (!h.isSharable() && h.added) {
580                 throw new ChannelPipelineException(
581                         h.getClass().getName() +
582                         " is not a @Sharable handler, so can't be added or removed multiple times.");
583             }
584             h.added = true;
585         }
586     }
587 
588     private void callHandlerAdded0(final AbstractChannelHandlerContext ctx) {
589         try {
590             // We must call setAddComplete before calling handlerAdded. Otherwise if the handlerAdded method generates
591             // any pipeline events ctx.handler() will miss them because the state will not allow it.
592             ctx.setAddComplete();
593             ctx.handler().handlerAdded(ctx);
594         } catch (Throwable t) {
595             boolean removed = false;
596             try {
597                 remove0(ctx);
598                 try {
599                     ctx.handler().handlerRemoved(ctx);
600                 } finally {
601                     ctx.setRemoved();
602                 }
603                 removed = true;
604             } catch (Throwable t2) {
605                 if (logger.isWarnEnabled()) {
606                     logger.warn("Failed to remove a handler: " + ctx.name(), t2);
607                 }
608             }
609 
610             if (removed) {
611                 fireExceptionCaught(new ChannelPipelineException(
612                         ctx.handler().getClass().getName() +
613                         ".handlerAdded() has thrown an exception; removed.", t));
614             } else {
615                 fireExceptionCaught(new ChannelPipelineException(
616                         ctx.handler().getClass().getName() +
617                         ".handlerAdded() has thrown an exception; also failed to remove.", t));
618             }
619         }
620     }
621 
622     private void callHandlerRemoved0(final AbstractChannelHandlerContext ctx) {
623         // Notify the complete removal.
624         try {
625             try {
626                 ctx.handler().handlerRemoved(ctx);
627             } finally {
628                 ctx.setRemoved();
629             }
630         } catch (Throwable t) {
631             fireExceptionCaught(new ChannelPipelineException(
632                     ctx.handler().getClass().getName() + ".handlerRemoved() has thrown an exception.", t));
633         }
634     }
635 
636     final void invokeHandlerAddedIfNeeded() {
637         assert channel.eventLoop().inEventLoop();
638         if (firstRegistration) {
639             firstRegistration = false;
640             // We are now registered to the EventLoop. It's time to call the callbacks for the ChannelHandlers,
641             // that were added before the registration was done.
642             callHandlerAddedForAllHandlers();
643         }
644     }
645 
646     @Override
647     public final ChannelHandler first() {
648         ChannelHandlerContext first = firstContext();
649         if (first == null) {
650             return null;
651         }
652         return first.handler();
653     }
654 
655     @Override
656     public final ChannelHandlerContext firstContext() {
657         AbstractChannelHandlerContext first = head.next;
658         if (first == tail) {
659             return null;
660         }
661         return head.next;
662     }
663 
664     @Override
665     public final ChannelHandler last() {
666         AbstractChannelHandlerContext last = tail.prev;
667         if (last == head) {
668             return null;
669         }
670         return last.handler();
671     }
672 
673     @Override
674     public final ChannelHandlerContext lastContext() {
675         AbstractChannelHandlerContext last = tail.prev;
676         if (last == head) {
677             return null;
678         }
679         return last;
680     }
681 
682     @Override
683     public final ChannelHandler get(String name) {
684         ChannelHandlerContext ctx = context(name);
685         if (ctx == null) {
686             return null;
687         } else {
688             return ctx.handler();
689         }
690     }
691 
692     @SuppressWarnings("unchecked")
693     @Override
694     public final <T extends ChannelHandler> T get(Class<T> handlerType) {
695         ChannelHandlerContext ctx = context(handlerType);
696         if (ctx == null) {
697             return null;
698         } else {
699             return (T) ctx.handler();
700         }
701     }
702 
703     @Override
704     public final ChannelHandlerContext context(String name) {
705         if (name == null) {
706             throw new NullPointerException("name");
707         }
708 
709         return context0(name);
710     }
711 
712     @Override
713     public final ChannelHandlerContext context(ChannelHandler handler) {
714         if (handler == null) {
715             throw new NullPointerException("handler");
716         }
717 
718         AbstractChannelHandlerContext ctx = head.next;
719         for (;;) {
720 
721             if (ctx == null) {
722                 return null;
723             }
724 
725             if (ctx.handler() == handler) {
726                 return ctx;
727             }
728 
729             ctx = ctx.next;
730         }
731     }
732 
733     @Override
734     public final ChannelHandlerContext context(Class<? extends ChannelHandler> handlerType) {
735         if (handlerType == null) {
736             throw new NullPointerException("handlerType");
737         }
738 
739         AbstractChannelHandlerContext ctx = head.next;
740         for (;;) {
741             if (ctx == null) {
742                 return null;
743             }
744             if (handlerType.isAssignableFrom(ctx.handler().getClass())) {
745                 return ctx;
746             }
747             ctx = ctx.next;
748         }
749     }
750 
751     @Override
752     public final List<String> names() {
753         List<String> list = new ArrayList<String>();
754         AbstractChannelHandlerContext ctx = head.next;
755         for (;;) {
756             if (ctx == null) {
757                 return list;
758             }
759             list.add(ctx.name());
760             ctx = ctx.next;
761         }
762     }
763 
764     @Override
765     public final Map<String, ChannelHandler> toMap() {
766         Map<String, ChannelHandler> map = new LinkedHashMap<String, ChannelHandler>();
767         AbstractChannelHandlerContext ctx = head.next;
768         for (;;) {
769             if (ctx == tail) {
770                 return map;
771             }
772             map.put(ctx.name(), ctx.handler());
773             ctx = ctx.next;
774         }
775     }
776 
777     @Override
778     public final Iterator<Map.Entry<String, ChannelHandler>> iterator() {
779         return toMap().entrySet().iterator();
780     }
781 
782     /**
783      * Returns the {@link String} representation of this pipeline.
784      */
785     @Override
786     public final String toString() {
787         StringBuilder buf = new StringBuilder()
788             .append(StringUtil.simpleClassName(this))
789             .append('{');
790         AbstractChannelHandlerContext ctx = head.next;
791         for (;;) {
792             if (ctx == tail) {
793                 break;
794             }
795 
796             buf.append('(')
797                .append(ctx.name())
798                .append(" = ")
799                .append(ctx.handler().getClass().getName())
800                .append(')');
801 
802             ctx = ctx.next;
803             if (ctx == tail) {
804                 break;
805             }
806 
807             buf.append(", ");
808         }
809         buf.append('}');
810         return buf.toString();
811     }
812 
813     @Override
814     public final ChannelPipeline fireChannelRegistered() {
815         AbstractChannelHandlerContext.invokeChannelRegistered(head);
816         return this;
817     }
818 
819     @Override
820     public final ChannelPipeline fireChannelUnregistered() {
821         AbstractChannelHandlerContext.invokeChannelUnregistered(head);
822         return this;
823     }
824 
825     /**
826      * Removes all handlers from the pipeline one by one from tail (exclusive) to head (exclusive) to trigger
827      * handlerRemoved().
828      *
829      * Note that we traverse up the pipeline ({@link #destroyUp(AbstractChannelHandlerContext, boolean)})
830      * before traversing down ({@link #destroyDown(Thread, AbstractChannelHandlerContext, boolean)}) so that
831      * the handlers are removed after all events are handled.
832      *
833      * See: https://github.com/netty/netty/issues/3156
834      */
835     private synchronized void destroy() {
836         destroyUp(head.next, false);
837     }
838 
839     private void destroyUp(AbstractChannelHandlerContext ctx, boolean inEventLoop) {
840         final Thread currentThread = Thread.currentThread();
841         final AbstractChannelHandlerContext tail = this.tail;
842         for (;;) {
843             if (ctx == tail) {
844                 destroyDown(currentThread, tail.prev, inEventLoop);
845                 break;
846             }
847 
848             final EventExecutor executor = ctx.executor();
849             if (!inEventLoop && !executor.inEventLoop(currentThread)) {
850                 final AbstractChannelHandlerContext finalCtx = ctx;
851                 executor.execute(new Runnable() {
852                     @Override
853                     public void run() {
854                         destroyUp(finalCtx, true);
855                     }
856                 });
857                 break;
858             }
859 
860             ctx = ctx.next;
861             inEventLoop = false;
862         }
863     }
864 
865     private void destroyDown(Thread currentThread, AbstractChannelHandlerContext ctx, boolean inEventLoop) {
866         // We have reached at tail; now traverse backwards.
867         final AbstractChannelHandlerContext head = this.head;
868         for (;;) {
869             if (ctx == head) {
870                 break;
871             }
872 
873             final EventExecutor executor = ctx.executor();
874             if (inEventLoop || executor.inEventLoop(currentThread)) {
875                 synchronized (this) {
876                     remove0(ctx);
877                 }
878                 callHandlerRemoved0(ctx);
879             } else {
880                 final AbstractChannelHandlerContext finalCtx = ctx;
881                 executor.execute(new Runnable() {
882                     @Override
883                     public void run() {
884                         destroyDown(Thread.currentThread(), finalCtx, true);
885                     }
886                 });
887                 break;
888             }
889 
890             ctx = ctx.prev;
891             inEventLoop = false;
892         }
893     }
894 
895     @Override
896     public final ChannelPipeline fireChannelActive() {
897         AbstractChannelHandlerContext.invokeChannelActive(head);
898         return this;
899     }
900 
901     @Override
902     public final ChannelPipeline fireChannelInactive() {
903         AbstractChannelHandlerContext.invokeChannelInactive(head);
904         return this;
905     }
906 
907     @Override
908     public final ChannelPipeline fireExceptionCaught(Throwable cause) {
909         AbstractChannelHandlerContext.invokeExceptionCaught(head, cause);
910         return this;
911     }
912 
913     @Override
914     public final ChannelPipeline fireUserEventTriggered(Object event) {
915         AbstractChannelHandlerContext.invokeUserEventTriggered(head, event);
916         return this;
917     }
918 
919     @Override
920     public final ChannelPipeline fireChannelRead(Object msg) {
921         AbstractChannelHandlerContext.invokeChannelRead(head, msg);
922         return this;
923     }
924 
925     @Override
926     public final ChannelPipeline fireChannelReadComplete() {
927         AbstractChannelHandlerContext.invokeChannelReadComplete(head);
928         return this;
929     }
930 
931     @Override
932     public final ChannelPipeline fireChannelWritabilityChanged() {
933         AbstractChannelHandlerContext.invokeChannelWritabilityChanged(head);
934         return this;
935     }
936 
937     @Override
938     public final ChannelFuture bind(SocketAddress localAddress) {
939         return tail.bind(localAddress);
940     }
941 
942     @Override
943     public final ChannelFuture connect(SocketAddress remoteAddress) {
944         return tail.connect(remoteAddress);
945     }
946 
947     @Override
948     public final ChannelFuture connect(SocketAddress remoteAddress, SocketAddress localAddress) {
949         return tail.connect(remoteAddress, localAddress);
950     }
951 
952     @Override
953     public final ChannelFuture disconnect() {
954         return tail.disconnect();
955     }
956 
957     @Override
958     public final ChannelFuture close() {
959         return tail.close();
960     }
961 
962     @Override
963     public final ChannelFuture deregister() {
964         return tail.deregister();
965     }
966 
967     @Override
968     public final ChannelPipeline flush() {
969         tail.flush();
970         return this;
971     }
972 
973     @Override
974     public final ChannelFuture bind(SocketAddress localAddress, ChannelPromise promise) {
975         return tail.bind(localAddress, promise);
976     }
977 
978     @Override
979     public final ChannelFuture connect(SocketAddress remoteAddress, ChannelPromise promise) {
980         return tail.connect(remoteAddress, promise);
981     }
982 
983     @Override
984     public final ChannelFuture connect(
985             SocketAddress remoteAddress, SocketAddress localAddress, ChannelPromise promise) {
986         return tail.connect(remoteAddress, localAddress, promise);
987     }
988 
989     @Override
990     public final ChannelFuture disconnect(ChannelPromise promise) {
991         return tail.disconnect(promise);
992     }
993 
994     @Override
995     public final ChannelFuture close(ChannelPromise promise) {
996         return tail.close(promise);
997     }
998 
999     @Override
1000     public final ChannelFuture deregister(final ChannelPromise promise) {
1001         return tail.deregister(promise);
1002     }
1003 
1004     @Override
1005     public final ChannelPipeline read() {
1006         tail.read();
1007         return this;
1008     }
1009 
1010     @Override
1011     public final ChannelFuture write(Object msg) {
1012         return tail.write(msg);
1013     }
1014 
1015     @Override
1016     public final ChannelFuture write(Object msg, ChannelPromise promise) {
1017         return tail.write(msg, promise);
1018     }
1019 
1020     @Override
1021     public final ChannelFuture writeAndFlush(Object msg, ChannelPromise promise) {
1022         return tail.writeAndFlush(msg, promise);
1023     }
1024 
1025     @Override
1026     public final ChannelFuture writeAndFlush(Object msg) {
1027         return tail.writeAndFlush(msg);
1028     }
1029 
1030     private void checkDuplicateName(String name) {
1031         if (context0(name) != null) {
1032             throw new IllegalArgumentException("Duplicate handler name: " + name);
1033         }
1034     }
1035 
1036     private AbstractChannelHandlerContext context0(String name) {
1037         AbstractChannelHandlerContext context = head.next;
1038         while (context != tail) {
1039             if (context.name().equals(name)) {
1040                 return context;
1041             }
1042             context = context.next;
1043         }
1044         return null;
1045     }
1046 
1047     private AbstractChannelHandlerContext getContextOrDie(String name) {
1048         AbstractChannelHandlerContext ctx = (AbstractChannelHandlerContext) context(name);
1049         if (ctx == null) {
1050             throw new NoSuchElementException(name);
1051         } else {
1052             return ctx;
1053         }
1054     }
1055 
1056     private AbstractChannelHandlerContext getContextOrDie(ChannelHandler handler) {
1057         AbstractChannelHandlerContext ctx = (AbstractChannelHandlerContext) context(handler);
1058         if (ctx == null) {
1059             throw new NoSuchElementException(handler.getClass().getName());
1060         } else {
1061             return ctx;
1062         }
1063     }
1064 
1065     private AbstractChannelHandlerContext getContextOrDie(Class<? extends ChannelHandler> handlerType) {
1066         AbstractChannelHandlerContext ctx = (AbstractChannelHandlerContext) context(handlerType);
1067         if (ctx == null) {
1068             throw new NoSuchElementException(handlerType.getName());
1069         } else {
1070             return ctx;
1071         }
1072     }
1073 
1074     private void callHandlerAddedForAllHandlers() {
1075         final PendingHandlerCallback pendingHandlerCallbackHead;
1076         synchronized (this) {
1077             assert !registered;
1078 
1079             // This Channel itself was registered.
1080             registered = true;
1081 
1082             pendingHandlerCallbackHead = this.pendingHandlerCallbackHead;
1083             // Null out so it can be GC'ed.
1084             this.pendingHandlerCallbackHead = null;
1085         }
1086 
1087         // This must happen outside of the synchronized(...) block as otherwise handlerAdded(...) may be called while
1088         // holding the lock and so produce a deadlock if handlerAdded(...) will try to add another handler from outside
1089         // the EventLoop.
1090         PendingHandlerCallback task = pendingHandlerCallbackHead;
1091         while (task != null) {
1092             task.execute();
1093             task = task.next;
1094         }
1095     }
1096 
1097     private void callHandlerCallbackLater(AbstractChannelHandlerContext ctx, boolean added) {
1098         assert !registered;
1099 
1100         PendingHandlerCallback task = added ? new PendingHandlerAddedTask(ctx) : new PendingHandlerRemovedTask(ctx);
1101         PendingHandlerCallback pending = pendingHandlerCallbackHead;
1102         if (pending == null) {
1103             pendingHandlerCallbackHead = task;
1104         } else {
1105             // Find the tail of the linked-list.
1106             while (pending.next != null) {
1107                 pending = pending.next;
1108             }
1109             pending.next = task;
1110         }
1111     }
1112 
1113     /**
1114      * Called once a {@link Throwable} hit the end of the {@link ChannelPipeline} without been handled by the user
1115      * in {@link ChannelHandler#exceptionCaught(ChannelHandlerContext, Throwable)}.
1116      */
1117     protected void onUnhandledInboundException(Throwable cause) {
1118         try {
1119             logger.warn(
1120                     "An exceptionCaught() event was fired, and it reached at the tail of the pipeline. " +
1121                             "It usually means the last handler in the pipeline did not handle the exception.",
1122                     cause);
1123         } finally {
1124             ReferenceCountUtil.release(cause);
1125         }
1126     }
1127 
1128     /**
1129      * Called once a message hit the end of the {@link ChannelPipeline} without been handled by the user
1130      * in {@link ChannelInboundHandler#channelRead(ChannelHandlerContext, Object)}. This method is responsible
1131      * to call {@link ReferenceCountUtil#release(Object)} on the given msg at some point.
1132      */
1133     protected void onUnhandledInboundMessage(Object msg) {
1134         try {
1135             logger.debug(
1136                     "Discarded inbound message {} that reached at the tail of the pipeline. " +
1137                             "Please check your pipeline configuration.", msg);
1138         } finally {
1139             ReferenceCountUtil.release(msg);
1140         }
1141     }
1142 
1143     // A special catch-all handler that handles both bytes and messages.
1144     final class TailContext extends AbstractChannelHandlerContext implements ChannelInboundHandler {
1145 
1146         TailContext(DefaultChannelPipeline pipeline) {
1147             super(pipeline, null, TAIL_NAME, true, false);
1148             setAddComplete();
1149         }
1150 
1151         @Override
1152         public ChannelHandler handler() {
1153             return this;
1154         }
1155 
1156         @Override
1157         public void channelRegistered(ChannelHandlerContext ctx) throws Exception { }
1158 
1159         @Override
1160         public void channelUnregistered(ChannelHandlerContext ctx) throws Exception { }
1161 
1162         @Override
1163         public void channelActive(ChannelHandlerContext ctx) throws Exception { }
1164 
1165         @Override
1166         public void channelInactive(ChannelHandlerContext ctx) throws Exception { }
1167 
1168         @Override
1169         public void channelWritabilityChanged(ChannelHandlerContext ctx) throws Exception { }
1170 
1171         @Override
1172         public void handlerAdded(ChannelHandlerContext ctx) throws Exception { }
1173 
1174         @Override
1175         public void handlerRemoved(ChannelHandlerContext ctx) throws Exception { }
1176 
1177         @Override
1178         public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
1179             // This may not be a configuration error and so don't log anything.
1180             // The event may be superfluous for the current pipeline configuration.
1181             ReferenceCountUtil.release(evt);
1182         }
1183 
1184         @Override
1185         public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
1186             onUnhandledInboundException(cause);
1187         }
1188 
1189         @Override
1190         public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
1191             onUnhandledInboundMessage(msg);
1192         }
1193 
1194         @Override
1195         public void channelReadComplete(ChannelHandlerContext ctx) throws Exception { }
1196     }
1197 
1198     final class HeadContext extends AbstractChannelHandlerContext
1199             implements ChannelOutboundHandler, ChannelInboundHandler {
1200 
1201         private final Unsafe unsafe;
1202 
1203         HeadContext(DefaultChannelPipeline pipeline) {
1204             super(pipeline, null, HEAD_NAME, false, true);
1205             unsafe = pipeline.channel().unsafe();
1206             setAddComplete();
1207         }
1208 
1209         @Override
1210         public ChannelHandler handler() {
1211             return this;
1212         }
1213 
1214         @Override
1215         public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
1216             // NOOP
1217         }
1218 
1219         @Override
1220         public void handlerRemoved(ChannelHandlerContext ctx) throws Exception {
1221             // NOOP
1222         }
1223 
1224         @Override
1225         public void bind(
1226                 ChannelHandlerContext ctx, SocketAddress localAddress, ChannelPromise promise)
1227                 throws Exception {
1228             unsafe.bind(localAddress, promise);
1229         }
1230 
1231         @Override
1232         public void connect(
1233                 ChannelHandlerContext ctx,
1234                 SocketAddress remoteAddress, SocketAddress localAddress,
1235                 ChannelPromise promise) throws Exception {
1236             unsafe.connect(remoteAddress, localAddress, promise);
1237         }
1238 
1239         @Override
1240         public void disconnect(ChannelHandlerContext ctx, ChannelPromise promise) throws Exception {
1241             unsafe.disconnect(promise);
1242         }
1243 
1244         @Override
1245         public void close(ChannelHandlerContext ctx, ChannelPromise promise) throws Exception {
1246             unsafe.close(promise);
1247         }
1248 
1249         @Override
1250         public void deregister(ChannelHandlerContext ctx, ChannelPromise promise) throws Exception {
1251             unsafe.deregister(promise);
1252         }
1253 
1254         @Override
1255         public void read(ChannelHandlerContext ctx) {
1256             unsafe.beginRead();
1257         }
1258 
1259         @Override
1260         public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
1261             unsafe.write(msg, promise);
1262         }
1263 
1264         @Override
1265         public void flush(ChannelHandlerContext ctx) throws Exception {
1266             unsafe.flush();
1267         }
1268 
1269         @Override
1270         public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
1271             ctx.fireExceptionCaught(cause);
1272         }
1273 
1274         @Override
1275         public void channelRegistered(ChannelHandlerContext ctx) throws Exception {
1276             invokeHandlerAddedIfNeeded();
1277             ctx.fireChannelRegistered();
1278         }
1279 
1280         @Override
1281         public void channelUnregistered(ChannelHandlerContext ctx) throws Exception {
1282             ctx.fireChannelUnregistered();
1283 
1284             // Remove all handlers sequentially if channel is closed and unregistered.
1285             if (!channel.isOpen()) {
1286                 destroy();
1287             }
1288         }
1289 
1290         @Override
1291         public void channelActive(ChannelHandlerContext ctx) throws Exception {
1292             ctx.fireChannelActive();
1293 
1294             readIfIsAutoRead();
1295         }
1296 
1297         @Override
1298         public void channelInactive(ChannelHandlerContext ctx) throws Exception {
1299             ctx.fireChannelInactive();
1300         }
1301 
1302         @Override
1303         public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
1304             ctx.fireChannelRead(msg);
1305         }
1306 
1307         @Override
1308         public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
1309             ctx.fireChannelReadComplete();
1310 
1311             readIfIsAutoRead();
1312         }
1313 
1314         private void readIfIsAutoRead() {
1315             if (channel.config().isAutoRead()) {
1316                 channel.read();
1317             }
1318         }
1319 
1320         @Override
1321         public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
1322             ctx.fireUserEventTriggered(evt);
1323         }
1324 
1325         @Override
1326         public void channelWritabilityChanged(ChannelHandlerContext ctx) throws Exception {
1327             ctx.fireChannelWritabilityChanged();
1328         }
1329     }
1330 
1331     private abstract static class PendingHandlerCallback implements Runnable {
1332         final AbstractChannelHandlerContext ctx;
1333         PendingHandlerCallback next;
1334 
1335         PendingHandlerCallback(AbstractChannelHandlerContext ctx) {
1336             this.ctx = ctx;
1337         }
1338 
1339         abstract void execute();
1340     }
1341 
1342     private final class PendingHandlerAddedTask extends PendingHandlerCallback {
1343 
1344         PendingHandlerAddedTask(AbstractChannelHandlerContext ctx) {
1345             super(ctx);
1346         }
1347 
1348         @Override
1349         public void run() {
1350             callHandlerAdded0(ctx);
1351         }
1352 
1353         @Override
1354         void execute() {
1355             EventExecutor executor = ctx.executor();
1356             if (executor.inEventLoop()) {
1357                 callHandlerAdded0(ctx);
1358             } else {
1359                 try {
1360                     executor.execute(this);
1361                 } catch (RejectedExecutionException e) {
1362                     if (logger.isWarnEnabled()) {
1363                         logger.warn(
1364                                 "Can't invoke handlerAdded() as the EventExecutor {} rejected it, removing handler {}.",
1365                                 executor, ctx.name(), e);
1366                     }
1367                     remove0(ctx);
1368                     ctx.setRemoved();
1369                 }
1370             }
1371         }
1372     }
1373 
1374     private final class PendingHandlerRemovedTask extends PendingHandlerCallback {
1375 
1376         PendingHandlerRemovedTask(AbstractChannelHandlerContext ctx) {
1377             super(ctx);
1378         }
1379 
1380         @Override
1381         public void run() {
1382             callHandlerRemoved0(ctx);
1383         }
1384 
1385         @Override
1386         void execute() {
1387             EventExecutor executor = ctx.executor();
1388             if (executor.inEventLoop()) {
1389                 callHandlerRemoved0(ctx);
1390             } else {
1391                 try {
1392                     executor.execute(this);
1393                 } catch (RejectedExecutionException e) {
1394                     if (logger.isWarnEnabled()) {
1395                         logger.warn(
1396                                 "Can't invoke handlerRemoved() as the EventExecutor {} rejected it," +
1397                                         " removing handler {}.", executor, ctx.name(), e);
1398                     }
1399                     // remove0(...) was call before so just call AbstractChannelHandlerContext.setRemoved().
1400                     ctx.setRemoved();
1401                 }
1402             }
1403         }
1404     }
1405 }