查看本类的 API文档回源码主页即时通讯网 - 即时通讯开发者社区!
1   /*
2    * Copyright 2012 The Netty Project
3    *
4    * The Netty Project licenses this file to you under the Apache License,
5    * version 2.0 (the "License"); you may not use this file except in compliance
6    * with the License. You may obtain a copy of the License at:
7    *
8    *   http://www.apache.org/licenses/LICENSE-2.0
9    *
10   * Unless required by applicable law or agreed to in writing, software
11   * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12   * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13   * License for the specific language governing permissions and limitations
14   * under the License.
15   */
16  package org.jboss.netty.channel;
17  
18  import org.jboss.netty.logging.InternalLogger;
19  import org.jboss.netty.logging.InternalLoggerFactory;
20  
21  import java.util.ArrayList;
22  import java.util.HashMap;
23  import java.util.LinkedHashMap;
24  import java.util.List;
25  import java.util.Map;
26  import java.util.NoSuchElementException;
27  import java.util.concurrent.RejectedExecutionException;
28  
29  /**
30   * The default {@link ChannelPipeline} implementation.  It is recommended
31   * to use {@link Channels#pipeline()} to create a new {@link ChannelPipeline}
32   * instance rather than calling the constructor directly.
33   */
34  public class DefaultChannelPipeline implements ChannelPipeline {
35  
36      static final InternalLogger logger = InternalLoggerFactory.getInstance(DefaultChannelPipeline.class);
37      static final ChannelSink discardingSink = new DiscardingChannelSink();
38  
39      private volatile Channel channel;
40      private volatile ChannelSink sink;
41      private volatile DefaultChannelHandlerContext head;
42      private volatile DefaultChannelHandlerContext tail;
43      private final Map<String, DefaultChannelHandlerContext> name2ctx =
44          new HashMap<String, DefaultChannelHandlerContext>(4);
45  
46      public Channel getChannel() {
47          return channel;
48      }
49  
50      public ChannelSink getSink() {
51          ChannelSink sink = this.sink;
52          if (sink == null) {
53              return discardingSink;
54          }
55          return sink;
56      }
57  
58      public void attach(Channel channel, ChannelSink sink) {
59          if (channel == null) {
60              throw new NullPointerException("channel");
61          }
62          if (sink == null) {
63              throw new NullPointerException("sink");
64          }
65          if (this.channel != null || this.sink != null) {
66              throw new IllegalStateException("attached already");
67          }
68          this.channel = channel;
69          this.sink = sink;
70      }
71  
72      public boolean isAttached() {
73          return sink != null;
74      }
75  
76      public synchronized void addFirst(String name, ChannelHandler handler) {
77          if (name2ctx.isEmpty()) {
78              init(name, handler);
79          } else {
80              checkDuplicateName(name);
81              DefaultChannelHandlerContext oldHead = head;
82              DefaultChannelHandlerContext newHead = new DefaultChannelHandlerContext(null, oldHead, name, handler);
83  
84              callBeforeAdd(newHead);
85  
86              oldHead.prev = newHead;
87              head = newHead;
88              name2ctx.put(name, newHead);
89  
90              callAfterAdd(newHead);
91          }
92      }
93  
94      public synchronized void addLast(String name, ChannelHandler handler) {
95          if (name2ctx.isEmpty()) {
96              init(name, handler);
97          } else {
98              checkDuplicateName(name);
99              DefaultChannelHandlerContext oldTail = tail;
100             DefaultChannelHandlerContext newTail = new DefaultChannelHandlerContext(oldTail, null, name, handler);
101 
102             callBeforeAdd(newTail);
103 
104             oldTail.next = newTail;
105             tail = newTail;
106             name2ctx.put(name, newTail);
107 
108             callAfterAdd(newTail);
109         }
110     }
111 
112     public synchronized void addBefore(String baseName, String name, ChannelHandler handler) {
113         DefaultChannelHandlerContext ctx = getContextOrDie(baseName);
114         if (ctx == head) {
115             addFirst(name, handler);
116         } else {
117             checkDuplicateName(name);
118             DefaultChannelHandlerContext newCtx = new DefaultChannelHandlerContext(ctx.prev, ctx, name, handler);
119 
120             callBeforeAdd(newCtx);
121 
122             ctx.prev.next = newCtx;
123             ctx.prev = newCtx;
124             name2ctx.put(name, newCtx);
125 
126             callAfterAdd(newCtx);
127         }
128     }
129 
130     public synchronized void addAfter(String baseName, String name, ChannelHandler handler) {
131         DefaultChannelHandlerContext ctx = getContextOrDie(baseName);
132         if (ctx == tail) {
133             addLast(name, handler);
134         } else {
135             checkDuplicateName(name);
136             DefaultChannelHandlerContext newCtx = new DefaultChannelHandlerContext(ctx, ctx.next, name, handler);
137 
138             callBeforeAdd(newCtx);
139 
140             ctx.next.prev = newCtx;
141             ctx.next = newCtx;
142             name2ctx.put(name, newCtx);
143 
144             callAfterAdd(newCtx);
145         }
146     }
147 
148     public synchronized void remove(ChannelHandler handler) {
149         remove(getContextOrDie(handler));
150     }
151 
152     public synchronized ChannelHandler remove(String name) {
153         return remove(getContextOrDie(name)).getHandler();
154     }
155 
156     @SuppressWarnings("unchecked")
157     public synchronized <T extends ChannelHandler> T remove(Class<T> handlerType) {
158         return (T) remove(getContextOrDie(handlerType)).getHandler();
159     }
160 
161     private DefaultChannelHandlerContext remove(DefaultChannelHandlerContext ctx) {
162         if (head == tail) {
163             callBeforeRemove(ctx);
164 
165             head = tail = null;
166             name2ctx.clear();
167 
168             callAfterRemove(ctx);
169         } else if (ctx == head) {
170             removeFirst();
171         } else if (ctx == tail) {
172             removeLast();
173         } else {
174             callBeforeRemove(ctx);
175 
176             DefaultChannelHandlerContext prev = ctx.prev;
177             DefaultChannelHandlerContext next = ctx.next;
178             prev.next = next;
179             next.prev = prev;
180             name2ctx.remove(ctx.getName());
181 
182             callAfterRemove(ctx);
183         }
184         return ctx;
185     }
186 
187     public synchronized ChannelHandler removeFirst() {
188         if (name2ctx.isEmpty()) {
189             throw new NoSuchElementException();
190         }
191 
192         DefaultChannelHandlerContext oldHead = head;
193         if (oldHead == null) {
194             throw new NoSuchElementException();
195         }
196 
197         callBeforeRemove(oldHead);
198 
199         if (oldHead.next == null) {
200             head = tail = null;
201             name2ctx.clear();
202         } else {
203             oldHead.next.prev = null;
204             head = oldHead.next;
205             name2ctx.remove(oldHead.getName());
206         }
207 
208         callAfterRemove(oldHead);
209 
210         return oldHead.getHandler();
211     }
212 
213     public synchronized ChannelHandler removeLast() {
214         if (name2ctx.isEmpty()) {
215             throw new NoSuchElementException();
216         }
217 
218         DefaultChannelHandlerContext oldTail = tail;
219         if (oldTail == null) {
220             throw new NoSuchElementException();
221         }
222 
223         callBeforeRemove(oldTail);
224 
225         if (oldTail.prev == null) {
226             head = tail = null;
227             name2ctx.clear();
228         } else {
229             oldTail.prev.next = null;
230             tail = oldTail.prev;
231             name2ctx.remove(oldTail.getName());
232         }
233 
234         callAfterRemove(oldTail);
235 
236         return oldTail.getHandler();
237     }
238 
239     public synchronized void replace(ChannelHandler oldHandler, String newName, ChannelHandler newHandler) {
240         replace(getContextOrDie(oldHandler), newName, newHandler);
241     }
242 
243     public synchronized ChannelHandler replace(String oldName, String newName, ChannelHandler newHandler) {
244         return replace(getContextOrDie(oldName), newName, newHandler);
245     }
246 
247     @SuppressWarnings("unchecked")
248     public synchronized <T extends ChannelHandler> T replace(
249             Class<T> oldHandlerType, String newName, ChannelHandler newHandler) {
250         return (T) replace(getContextOrDie(oldHandlerType), newName, newHandler);
251     }
252 
253     private ChannelHandler replace(DefaultChannelHandlerContext ctx, String newName, ChannelHandler newHandler) {
254         if (ctx == head) {
255             removeFirst();
256             addFirst(newName, newHandler);
257         } else if (ctx == tail) {
258             removeLast();
259             addLast(newName, newHandler);
260         } else {
261             boolean sameName = ctx.getName().equals(newName);
262             if (!sameName) {
263                 checkDuplicateName(newName);
264             }
265 
266             DefaultChannelHandlerContext prev = ctx.prev;
267             DefaultChannelHandlerContext next = ctx.next;
268             DefaultChannelHandlerContext newCtx = new DefaultChannelHandlerContext(prev, next, newName, newHandler);
269 
270             callBeforeRemove(ctx);
271             callBeforeAdd(newCtx);
272 
273             prev.next = newCtx;
274             next.prev = newCtx;
275 
276             if (!sameName) {
277                 name2ctx.remove(ctx.getName());
278             }
279             name2ctx.put(newName, newCtx);
280 
281             ChannelHandlerLifeCycleException removeException = null;
282             ChannelHandlerLifeCycleException addException = null;
283             boolean removed = false;
284             try {
285                 callAfterRemove(ctx);
286                 removed = true;
287             } catch (ChannelHandlerLifeCycleException e) {
288                 removeException = e;
289             }
290 
291             boolean added = false;
292             try {
293                 callAfterAdd(newCtx);
294                 added = true;
295             } catch (ChannelHandlerLifeCycleException e) {
296                 addException = e;
297             }
298 
299             if (!removed && !added) {
300                 logger.warn(removeException.getMessage(), removeException);
301                 logger.warn(addException.getMessage(), addException);
302                 throw new ChannelHandlerLifeCycleException(
303                         "Both " + ctx.getHandler().getClass().getName() +
304                         ".afterRemove() and " + newCtx.getHandler().getClass().getName() +
305                         ".afterAdd() failed; see logs.");
306             } else if (!removed) {
307                 throw removeException;
308             } else if (!added) {
309                 throw addException;
310             }
311         }
312 
313         return ctx.getHandler();
314     }
315 
316     private static void callBeforeAdd(ChannelHandlerContext ctx) {
317         if (!(ctx.getHandler() instanceof LifeCycleAwareChannelHandler)) {
318             return;
319         }
320 
321         LifeCycleAwareChannelHandler h =
322             (LifeCycleAwareChannelHandler) ctx.getHandler();
323 
324         try {
325             h.beforeAdd(ctx);
326         } catch (Throwable t) {
327             throw new ChannelHandlerLifeCycleException(
328                     h.getClass().getName() +
329                     ".beforeAdd() has thrown an exception; not adding.", t);
330         }
331     }
332 
333     private void callAfterAdd(ChannelHandlerContext ctx) {
334         if (!(ctx.getHandler() instanceof LifeCycleAwareChannelHandler)) {
335             return;
336         }
337 
338         LifeCycleAwareChannelHandler h =
339             (LifeCycleAwareChannelHandler) ctx.getHandler();
340 
341         try {
342             h.afterAdd(ctx);
343         } catch (Throwable t) {
344             boolean removed = false;
345             try {
346                 remove((DefaultChannelHandlerContext) ctx);
347                 removed = true;
348             } catch (Throwable t2) {
349                 if (logger.isWarnEnabled()) {
350                     logger.warn("Failed to remove a handler: " + ctx.getName(), t2);
351                 }
352             }
353 
354             if (removed) {
355                 throw new ChannelHandlerLifeCycleException(
356                         h.getClass().getName() +
357                         ".afterAdd() has thrown an exception; removed.", t);
358             } else {
359                 throw new ChannelHandlerLifeCycleException(
360                         h.getClass().getName() +
361                         ".afterAdd() has thrown an exception; also failed to remove.", t);
362             }
363         }
364     }
365 
366     private static void callBeforeRemove(ChannelHandlerContext ctx) {
367         if (!(ctx.getHandler() instanceof LifeCycleAwareChannelHandler)) {
368             return;
369         }
370 
371         LifeCycleAwareChannelHandler h =
372             (LifeCycleAwareChannelHandler) ctx.getHandler();
373 
374         try {
375             h.beforeRemove(ctx);
376         } catch (Throwable t) {
377             throw new ChannelHandlerLifeCycleException(
378                     h.getClass().getName() +
379                     ".beforeRemove() has thrown an exception; not removing.", t);
380         }
381     }
382 
383     private static void callAfterRemove(ChannelHandlerContext ctx) {
384         if (!(ctx.getHandler() instanceof LifeCycleAwareChannelHandler)) {
385             return;
386         }
387 
388         LifeCycleAwareChannelHandler h =
389             (LifeCycleAwareChannelHandler) ctx.getHandler();
390 
391         try {
392             h.afterRemove(ctx);
393         } catch (Throwable t) {
394             throw new ChannelHandlerLifeCycleException(
395                     h.getClass().getName() +
396                     ".afterRemove() has thrown an exception.", t);
397         }
398     }
399 
400     public synchronized ChannelHandler getFirst() {
401         DefaultChannelHandlerContext head = this.head;
402         if (head == null) {
403             return null;
404         }
405         return head.getHandler();
406     }
407 
408     public synchronized ChannelHandler getLast() {
409         DefaultChannelHandlerContext tail = this.tail;
410         if (tail == null) {
411             return null;
412         }
413         return tail.getHandler();
414     }
415 
416     public synchronized ChannelHandler get(String name) {
417         DefaultChannelHandlerContext ctx = name2ctx.get(name);
418         if (ctx == null) {
419             return null;
420         } else {
421             return ctx.getHandler();
422         }
423     }
424 
425     public synchronized <T extends ChannelHandler> T get(Class<T> handlerType) {
426         ChannelHandlerContext ctx = getContext(handlerType);
427         if (ctx == null) {
428             return null;
429         } else {
430             @SuppressWarnings("unchecked")
431             T handler = (T) ctx.getHandler();
432             return handler;
433         }
434     }
435 
436     public synchronized ChannelHandlerContext getContext(String name) {
437         if (name == null) {
438             throw new NullPointerException("name");
439         }
440         return name2ctx.get(name);
441     }
442 
443     public synchronized ChannelHandlerContext getContext(ChannelHandler handler) {
444         if (handler == null) {
445             throw new NullPointerException("handler");
446         }
447         if (name2ctx.isEmpty()) {
448             return null;
449         }
450         DefaultChannelHandlerContext ctx = head;
451         for (;;) {
452             if (ctx.getHandler() == handler) {
453                 return ctx;
454             }
455 
456             ctx = ctx.next;
457             if (ctx == null) {
458                 break;
459             }
460         }
461         return null;
462     }
463 
464     public synchronized ChannelHandlerContext getContext(
465             Class<? extends ChannelHandler> handlerType) {
466         if (handlerType == null) {
467             throw new NullPointerException("handlerType");
468         }
469 
470         if (name2ctx.isEmpty()) {
471             return null;
472         }
473         DefaultChannelHandlerContext ctx = head;
474         for (;;) {
475             if (handlerType.isAssignableFrom(ctx.getHandler().getClass())) {
476                 return ctx;
477             }
478 
479             ctx = ctx.next;
480             if (ctx == null) {
481                 break;
482             }
483         }
484         return null;
485     }
486 
487     public List<String> getNames() {
488         List<String> list = new ArrayList<String>();
489         if (name2ctx.isEmpty()) {
490             return list;
491         }
492 
493         DefaultChannelHandlerContext ctx = head;
494         for (;;) {
495             list.add(ctx.getName());
496             ctx = ctx.next;
497             if (ctx == null) {
498                 break;
499             }
500         }
501         return list;
502     }
503 
504     public Map<String, ChannelHandler> toMap() {
505         Map<String, ChannelHandler> map = new LinkedHashMap<String, ChannelHandler>();
506         if (name2ctx.isEmpty()) {
507             return map;
508         }
509 
510         DefaultChannelHandlerContext ctx = head;
511         for (;;) {
512             map.put(ctx.getName(), ctx.getHandler());
513             ctx = ctx.next;
514             if (ctx == null) {
515                 break;
516             }
517         }
518         return map;
519     }
520 
521     /**
522      * Returns the {@link String} representation of this pipeline.
523      */
524     @Override
525     public String toString() {
526         StringBuilder buf = new StringBuilder();
527         buf.append(getClass().getSimpleName());
528         buf.append('{');
529         DefaultChannelHandlerContext ctx = head;
530         if (ctx != null) {
531             for (;;) {
532                 buf.append('(');
533                 buf.append(ctx.getName());
534                 buf.append(" = ");
535                 buf.append(ctx.getHandler().getClass().getName());
536                 buf.append(')');
537                 ctx = ctx.next;
538                 if (ctx == null) {
539                     break;
540                 }
541                 buf.append(", ");
542             }
543         }
544         buf.append('}');
545         return buf.toString();
546     }
547 
548     public void sendUpstream(ChannelEvent e) {
549         DefaultChannelHandlerContext head = getActualUpstreamContext(this.head);
550         if (head == null) {
551             if (logger.isWarnEnabled()) {
552                 logger.warn(
553                         "The pipeline contains no upstream handlers; discarding: " + e);
554             }
555 
556             return;
557         }
558 
559         sendUpstream(head, e);
560     }
561 
562     void sendUpstream(DefaultChannelHandlerContext ctx, ChannelEvent e) {
563         try {
564             ((ChannelUpstreamHandler) ctx.getHandler()).handleUpstream(ctx, e);
565         } catch (Throwable t) {
566             notifyHandlerException(e, t);
567         }
568     }
569 
570     public void sendDownstream(ChannelEvent e) {
571         DefaultChannelHandlerContext tail = getActualDownstreamContext(this.tail);
572         if (tail == null) {
573             try {
574                 getSink().eventSunk(this, e);
575                 return;
576             } catch (Throwable t) {
577                 notifyHandlerException(e, t);
578                 return;
579             }
580         }
581 
582         sendDownstream(tail, e);
583     }
584 
585     void sendDownstream(DefaultChannelHandlerContext ctx, ChannelEvent e) {
586         if (e instanceof UpstreamMessageEvent) {
587             throw new IllegalArgumentException("cannot send an upstream event to downstream");
588         }
589 
590         try {
591             ((ChannelDownstreamHandler) ctx.getHandler()).handleDownstream(ctx, e);
592         } catch (Throwable t) {
593             // Unlike an upstream event, a downstream event usually has an
594             // incomplete future which is supposed to be updated by ChannelSink.
595             // However, if an exception is raised before the event reaches at
596             // ChannelSink, the future is not going to be updated, so we update
597             // here.
598             e.getFuture().setFailure(t);
599             notifyHandlerException(e, t);
600         }
601     }
602 
603     private DefaultChannelHandlerContext getActualUpstreamContext(DefaultChannelHandlerContext ctx) {
604         if (ctx == null) {
605             return null;
606         }
607 
608         DefaultChannelHandlerContext realCtx = ctx;
609         while (!realCtx.canHandleUpstream()) {
610             realCtx = realCtx.next;
611             if (realCtx == null) {
612                 return null;
613             }
614         }
615 
616         return realCtx;
617     }
618 
619     private DefaultChannelHandlerContext getActualDownstreamContext(DefaultChannelHandlerContext ctx) {
620         if (ctx == null) {
621             return null;
622         }
623 
624         DefaultChannelHandlerContext realCtx = ctx;
625         while (!realCtx.canHandleDownstream()) {
626             realCtx = realCtx.prev;
627             if (realCtx == null) {
628                 return null;
629             }
630         }
631 
632         return realCtx;
633     }
634 
635     public ChannelFuture execute(Runnable task) {
636         return getSink().execute(this, task);
637     }
638 
639     protected void notifyHandlerException(ChannelEvent e, Throwable t) {
640         if (e instanceof ExceptionEvent) {
641             if (logger.isWarnEnabled()) {
642                 logger.warn(
643                         "An exception was thrown by a user handler " +
644                         "while handling an exception event (" + e + ')', t);
645             }
646 
647             return;
648         }
649 
650         ChannelPipelineException pe;
651         if (t instanceof ChannelPipelineException) {
652             pe = (ChannelPipelineException) t;
653         } else {
654             pe = new ChannelPipelineException(t);
655         }
656 
657         try {
658             sink.exceptionCaught(this, e, pe);
659         } catch (Exception e1) {
660             if (logger.isWarnEnabled()) {
661                 logger.warn("An exception was thrown by an exception handler.", e1);
662             }
663         }
664     }
665 
666     private void init(String name, ChannelHandler handler) {
667         DefaultChannelHandlerContext ctx = new DefaultChannelHandlerContext(null, null, name, handler);
668         callBeforeAdd(ctx);
669         head = tail = ctx;
670         name2ctx.clear();
671         name2ctx.put(name, ctx);
672         callAfterAdd(ctx);
673     }
674 
675     private void checkDuplicateName(String name) {
676         if (name2ctx.containsKey(name)) {
677             throw new IllegalArgumentException("Duplicate handler name: " + name);
678         }
679     }
680 
681     private DefaultChannelHandlerContext getContextOrDie(String name) {
682         DefaultChannelHandlerContext ctx = (DefaultChannelHandlerContext) getContext(name);
683         if (ctx == null) {
684             throw new NoSuchElementException(name);
685         } else {
686             return ctx;
687         }
688     }
689 
690     private DefaultChannelHandlerContext getContextOrDie(ChannelHandler handler) {
691         DefaultChannelHandlerContext ctx = (DefaultChannelHandlerContext) getContext(handler);
692         if (ctx == null) {
693             throw new NoSuchElementException(handler.getClass().getName());
694         } else {
695             return ctx;
696         }
697     }
698 
699     private DefaultChannelHandlerContext getContextOrDie(Class<? extends ChannelHandler> handlerType) {
700         DefaultChannelHandlerContext ctx = (DefaultChannelHandlerContext) getContext(handlerType);
701         if (ctx == null) {
702             throw new NoSuchElementException(handlerType.getName());
703         } else {
704             return ctx;
705         }
706     }
707 
708     private final class DefaultChannelHandlerContext implements ChannelHandlerContext {
709         volatile DefaultChannelHandlerContext next;
710         volatile DefaultChannelHandlerContext prev;
711         private final String name;
712         private final ChannelHandler handler;
713         private final boolean canHandleUpstream;
714         private final boolean canHandleDownstream;
715         private volatile Object attachment;
716 
717         DefaultChannelHandlerContext(
718                 DefaultChannelHandlerContext prev, DefaultChannelHandlerContext next,
719                 String name, ChannelHandler handler) {
720 
721             if (name == null) {
722                 throw new NullPointerException("name");
723             }
724             if (handler == null) {
725                 throw new NullPointerException("handler");
726             }
727             canHandleUpstream = handler instanceof ChannelUpstreamHandler;
728             canHandleDownstream = handler instanceof ChannelDownstreamHandler;
729 
730             if (!canHandleUpstream && !canHandleDownstream) {
731                 throw new IllegalArgumentException(
732                         "handler must be either " +
733                         ChannelUpstreamHandler.class.getName() + " or " +
734                         ChannelDownstreamHandler.class.getName() + '.');
735             }
736 
737             this.prev = prev;
738             this.next = next;
739             this.name = name;
740             this.handler = handler;
741         }
742 
743         public Channel getChannel() {
744             return getPipeline().getChannel();
745         }
746 
747         public ChannelPipeline getPipeline() {
748             return DefaultChannelPipeline.this;
749         }
750 
751         public boolean canHandleDownstream() {
752             return canHandleDownstream;
753         }
754 
755         public boolean canHandleUpstream() {
756             return canHandleUpstream;
757         }
758 
759         public ChannelHandler getHandler() {
760             return handler;
761         }
762 
763         public String getName() {
764             return name;
765         }
766 
767         public Object getAttachment() {
768             return attachment;
769         }
770 
771         public void setAttachment(Object attachment) {
772             this.attachment = attachment;
773         }
774 
775         public void sendDownstream(ChannelEvent e) {
776             DefaultChannelHandlerContext prev = getActualDownstreamContext(this.prev);
777             if (prev == null) {
778                 try {
779                     getSink().eventSunk(DefaultChannelPipeline.this, e);
780                 } catch (Throwable t) {
781                     notifyHandlerException(e, t);
782                 }
783             } else {
784                 DefaultChannelPipeline.this.sendDownstream(prev, e);
785             }
786         }
787 
788         public void sendUpstream(ChannelEvent e) {
789             DefaultChannelHandlerContext next = getActualUpstreamContext(this.next);
790             if (next != null) {
791                 DefaultChannelPipeline.this.sendUpstream(next, e);
792             }
793         }
794     }
795 
796     private static final class DiscardingChannelSink implements ChannelSink {
797         DiscardingChannelSink() {
798         }
799 
800         public void eventSunk(ChannelPipeline pipeline, ChannelEvent e) {
801             if (logger.isWarnEnabled()) {
802                 logger.warn("Not attached yet; discarding: " + e);
803             }
804         }
805 
806         public void exceptionCaught(ChannelPipeline pipeline,
807                 ChannelEvent e, ChannelPipelineException cause) throws Exception {
808             throw cause;
809         }
810 
811         public ChannelFuture execute(ChannelPipeline pipeline, Runnable task) {
812             if (logger.isWarnEnabled()) {
813                 logger.warn("Not attached yet; rejecting: " + task);
814             }
815             return Channels.failedFuture(pipeline.getChannel(), new RejectedExecutionException("Not attached yet"));
816         }
817     }
818 }