查看本类的 API文档回源码主页即时通讯网 - 即时通讯开发者社区!
1   /*
2    * Copyright 2012 The Netty Project
3    *
4    * The Netty Project licenses this file to you under the Apache License,
5    * version 2.0 (the "License"); you may not use this file except in compliance
6    * with the License. You may obtain a copy of the License at:
7    *
8    *   http://www.apache.org/licenses/LICENSE-2.0
9    *
10   * Unless required by applicable law or agreed to in writing, software
11   * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12   * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13   * License for the specific language governing permissions and limitations
14   * under the License.
15   */
16  package org.jboss.netty.handler.codec.embedder;
17  
18  import org.jboss.netty.buffer.ChannelBufferFactory;
19  import org.jboss.netty.channel.Channel;
20  import org.jboss.netty.channel.ChannelEvent;
21  import org.jboss.netty.channel.ChannelFuture;
22  import org.jboss.netty.channel.ChannelHandler;
23  import org.jboss.netty.channel.ChannelHandlerContext;
24  import org.jboss.netty.channel.ChannelPipeline;
25  import org.jboss.netty.channel.ChannelPipelineException;
26  import org.jboss.netty.channel.ChannelSink;
27  import org.jboss.netty.channel.ChannelUpstreamHandler;
28  import org.jboss.netty.channel.DefaultChannelPipeline;
29  import org.jboss.netty.channel.ExceptionEvent;
30  import org.jboss.netty.channel.MessageEvent;
31  
32  import java.lang.reflect.Array;
33  import java.util.ConcurrentModificationException;
34  import java.util.LinkedList;
35  import java.util.Queue;
36  
37  import static org.jboss.netty.channel.Channels.*;
38  
39  /**
40   * A skeletal {@link CodecEmbedder} implementation.
41   */
42  abstract class AbstractCodecEmbedder<E> implements CodecEmbedder<E> {
43  
44      private final Channel channel;
45      private final ChannelPipeline pipeline;
46      private final EmbeddedChannelSink sink = new EmbeddedChannelSink();
47  
48      final Queue<Object> productQueue = new LinkedList<Object>();
49  
50      /**
51       * Creates a new embedder whose pipeline is composed of the specified
52       * handlers.
53       */
54      protected AbstractCodecEmbedder(ChannelHandler... handlers) {
55          pipeline = new EmbeddedChannelPipeline();
56          configurePipeline(handlers);
57          channel = new EmbeddedChannel(pipeline, sink);
58          fireInitialEvents();
59      }
60  
61      /**
62       * Creates a new embedder whose pipeline is composed of the specified
63       * handlers.
64       *
65       * @param bufferFactory the {@link ChannelBufferFactory} to be used when
66       *                      creating a new buffer.
67       */
68      protected AbstractCodecEmbedder(ChannelBufferFactory bufferFactory, ChannelHandler... handlers) {
69          this(handlers);
70          getChannel().getConfig().setBufferFactory(bufferFactory);
71      }
72  
73      private void fireInitialEvents() {
74          // Fire the typical initial events.
75          fireChannelOpen(channel);
76          fireChannelBound(channel, channel.getLocalAddress());
77          fireChannelConnected(channel, channel.getRemoteAddress());
78      }
79  
80      private void configurePipeline(ChannelHandler... handlers) {
81          if (handlers == null) {
82              throw new NullPointerException("handlers");
83          }
84  
85          if (handlers.length == 0) {
86              throw new IllegalArgumentException(
87                      "handlers should contain at least one " +
88                      ChannelHandler.class.getSimpleName() + '.');
89          }
90  
91          for (int i = 0; i < handlers.length; i ++) {
92              ChannelHandler h = handlers[i];
93              if (h == null) {
94                  throw new NullPointerException("handlers[" + i + ']');
95              }
96              pipeline.addLast(String.valueOf(i), handlers[i]);
97          }
98          pipeline.addLast("SINK", sink);
99      }
100 
101     public boolean finish() {
102         close(channel);
103         fireChannelDisconnected(channel);
104         fireChannelUnbound(channel);
105         fireChannelClosed(channel);
106         return !productQueue.isEmpty();
107     }
108 
109     /**
110      * Returns the virtual {@link Channel} which will be used as a mock
111      * during encoding and decoding.
112      */
113     protected final Channel getChannel() {
114         return channel;
115     }
116 
117     /**
118      * Returns {@code true} if and only if the produce queue is empty and
119      * therefore {@link #poll()} will return {@code null}.
120      */
121     protected final boolean isEmpty() {
122         return productQueue.isEmpty();
123     }
124 
125     @SuppressWarnings("unchecked")
126     public final E poll() {
127         return (E) productQueue.poll();
128     }
129 
130     @SuppressWarnings("unchecked")
131     public final E peek() {
132         return (E) productQueue.peek();
133     }
134 
135     public final Object[] pollAll() {
136         final int size = size();
137         Object[] a = new Object[size];
138         for (int i = 0; i < size; i ++) {
139             E product = poll();
140             if (product == null) {
141                 throw new ConcurrentModificationException();
142             }
143             a[i] = product;
144         }
145         return a;
146     }
147 
148     @SuppressWarnings("unchecked")
149     public final <T> T[] pollAll(T[] a) {
150         if (a == null) {
151             throw new NullPointerException("a");
152         }
153 
154         final int size = size();
155 
156         // Create a new array if the specified one is too small.
157         if (a.length < size) {
158             a = (T[]) Array.newInstance(a.getClass().getComponentType(), size);
159         }
160 
161         for (int i = 0;; i ++) {
162             T product = (T) poll();
163             if (product == null) {
164                 break;
165             }
166             a[i] = product;
167         }
168 
169         // Put the terminator if necessary.
170         if (a.length > size) {
171             a[size] = null;
172         }
173 
174         return a;
175     }
176 
177     public final int size() {
178         return productQueue.size();
179     }
180 
181     public ChannelPipeline getPipeline() {
182         return pipeline;
183     }
184 
185     private final class EmbeddedChannelSink implements ChannelSink, ChannelUpstreamHandler {
186         EmbeddedChannelSink() {
187         }
188 
189         public void handleUpstream(ChannelHandlerContext ctx, ChannelEvent e) {
190             handleEvent(e);
191         }
192 
193         public void eventSunk(ChannelPipeline pipeline, ChannelEvent e) {
194             handleEvent(e);
195         }
196 
197         private void handleEvent(ChannelEvent e) {
198             if (e instanceof MessageEvent) {
199                 boolean offered = productQueue.offer(((MessageEvent) e).getMessage());
200                 assert offered;
201             } else if (e instanceof ExceptionEvent) {
202                 throw new CodecEmbedderException(((ExceptionEvent) e).getCause());
203             }
204 
205             // Swallow otherwise.
206         }
207 
208         public void exceptionCaught(
209                 ChannelPipeline pipeline, ChannelEvent e,
210                 ChannelPipelineException cause) throws Exception {
211             Throwable actualCause = cause.getCause();
212             if (actualCause == null) {
213                 actualCause = cause;
214             }
215 
216             throw new CodecEmbedderException(actualCause);
217         }
218 
219         public ChannelFuture execute(ChannelPipeline pipeline, Runnable task) {
220             try {
221                 task.run();
222                 return succeededFuture(pipeline.getChannel());
223             } catch (Throwable t) {
224                 return failedFuture(pipeline.getChannel(), t);
225             }
226         }
227     }
228 
229     private static final class EmbeddedChannelPipeline extends DefaultChannelPipeline {
230 
231         EmbeddedChannelPipeline() {
232         }
233 
234         @Override
235         protected void notifyHandlerException(ChannelEvent e, Throwable t) {
236             while (t instanceof ChannelPipelineException && t.getCause() != null) {
237                 t = t.getCause();
238             }
239             if (t instanceof CodecEmbedderException) {
240                 throw (CodecEmbedderException) t;
241             } else {
242                 throw new CodecEmbedderException(t);
243             }
244         }
245     }
246 }