查看本类的 API文档回源码主页即时通讯网 - 即时通讯开发者社区!
1   /*
2    * Copyright 2012 The Netty Project
3    *
4    * The Netty Project licenses this file to you under the Apache License,
5    * version 2.0 (the "License"); you may not use this file except in compliance
6    * with the License. You may obtain a copy of the License at:
7    *
8    *   http://www.apache.org/licenses/LICENSE-2.0
9    *
10   * Unless required by applicable law or agreed to in writing, software
11   * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12   * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13   * License for the specific language governing permissions and limitations
14   * under the License.
15   */
16  package org.jboss.netty.channel;
17  
18  import org.jboss.netty.util.internal.ConcurrentHashMap;
19  
20  import java.net.SocketAddress;
21  import java.util.Random;
22  import java.util.concurrent.ConcurrentMap;
23  import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
24  
25  /**
26   * A skeletal {@link Channel} implementation.
27   */
28  public abstract class AbstractChannel implements Channel {
29  
30      static final ConcurrentMap<Integer, Channel> allChannels = new ConcurrentHashMap<Integer, Channel>();
31  
32      private static final Random random = new Random();
33  
34      private static Integer allocateId(Channel channel) {
35          Integer id = random.nextInt();
36          for (;;) {
37              // Loop until a unique ID is acquired.
38              // It should be found in one loop practically.
39              if (allChannels.putIfAbsent(id, channel) == null) {
40                  // Successfully acquired.
41                  return id;
42              } else {
43                  // Taken by other channel at almost the same moment.
44                  id = id.intValue() + 1;
45              }
46          }
47      }
48  
49      private final Integer id;
50      private final Channel parent;
51      private final ChannelFactory factory;
52      private final ChannelPipeline pipeline;
53      private final ChannelFuture succeededFuture = new SucceededChannelFuture(this);
54      private final ChannelCloseFuture closeFuture = new ChannelCloseFuture();
55      private volatile int interestOps = OP_READ;
56  
57      /** Cache for the string representation of this channel */
58      private boolean strValConnected;
59      private String strVal;
60      private volatile Object attachment;
61  
62      private static final AtomicIntegerFieldUpdater<AbstractChannel> UNWRITABLE_UPDATER;
63      @SuppressWarnings("UnusedDeclaration")
64      private volatile int unwritable;
65  
66      static {
67          UNWRITABLE_UPDATER = AtomicIntegerFieldUpdater.newUpdater(AbstractChannel.class, "unwritable");
68      }
69  
70      /**
71       * Creates a new instance.
72       *
73       * @param parent
74       *        the parent of this channel. {@code null} if there's no parent.
75       * @param factory
76       *        the factory which created this channel
77       * @param pipeline
78       *        the pipeline which is going to be attached to this channel
79       * @param sink
80       *        the sink which will receive downstream events from the pipeline
81       *        and send upstream events to the pipeline
82       */
83      protected AbstractChannel(
84              Channel parent, ChannelFactory factory,
85              ChannelPipeline pipeline, ChannelSink sink) {
86  
87          this.parent = parent;
88          this.factory = factory;
89          this.pipeline = pipeline;
90  
91          id = allocateId(this);
92  
93          pipeline.attach(this, sink);
94      }
95  
96      /**
97       * (Internal use only) Creates a new temporary instance with the specified
98       * ID.
99       *
100      * @param parent
101      *        the parent of this channel. {@code null} if there's no parent.
102      * @param factory
103      *        the factory which created this channel
104      * @param pipeline
105      *        the pipeline which is going to be attached to this channel
106      * @param sink
107      *        the sink which will receive downstream events from the pipeline
108      *        and send upstream events to the pipeline
109      */
110     protected AbstractChannel(
111             Integer id,
112             Channel parent, ChannelFactory factory,
113             ChannelPipeline pipeline, ChannelSink sink) {
114 
115         this.id = id;
116         this.parent = parent;
117         this.factory = factory;
118         this.pipeline = pipeline;
119         pipeline.attach(this, sink);
120     }
121 
122     public final Integer getId() {
123         return id;
124     }
125 
126     public Channel getParent() {
127         return parent;
128     }
129 
130     public ChannelFactory getFactory() {
131         return factory;
132     }
133 
134     public ChannelPipeline getPipeline() {
135         return pipeline;
136     }
137 
138     /**
139      * Returns the cached {@link SucceededChannelFuture} instance.
140      */
141     protected ChannelFuture getSucceededFuture() {
142         return succeededFuture;
143     }
144 
145     /**
146      * Returns the {@link FailedChannelFuture} whose cause is an
147      * {@link UnsupportedOperationException}.
148      */
149     protected ChannelFuture getUnsupportedOperationFuture() {
150         return new FailedChannelFuture(this, new UnsupportedOperationException());
151     }
152 
153     /**
154      * Returns the ID of this channel.
155      */
156     @Override
157     public final int hashCode() {
158         return id;
159     }
160 
161     /**
162      * Returns {@code true} if and only if the specified object is identical
163      * with this channel (i.e: {@code this == o}).
164      */
165     @Override
166     public final boolean equals(Object o) {
167         return this == o;
168     }
169 
170     /**
171      * Compares the {@linkplain #getId() ID} of the two channels.
172      */
173     public final int compareTo(Channel o) {
174         return getId().compareTo(o.getId());
175     }
176 
177     public boolean isOpen() {
178         return !closeFuture.isDone();
179     }
180 
181     /**
182      * Marks this channel as closed.  This method is intended to be called by
183      * an internal component - please do not call it unless you know what you
184      * are doing.
185      *
186      * @return {@code true} if and only if this channel was not marked as
187      *                      closed yet
188      */
189     protected boolean setClosed() {
190         // Deallocate the current channel's ID from allChannels so that other
191         // new channels can use it.
192         allChannels.remove(id);
193 
194         return closeFuture.setClosed();
195     }
196 
197     public ChannelFuture bind(SocketAddress localAddress) {
198         return Channels.bind(this, localAddress);
199     }
200 
201     public ChannelFuture unbind() {
202         return Channels.unbind(this);
203     }
204 
205     public ChannelFuture close() {
206         ChannelFuture returnedCloseFuture = Channels.close(this);
207         assert closeFuture == returnedCloseFuture;
208         return closeFuture;
209     }
210 
211     public ChannelFuture getCloseFuture() {
212         return closeFuture;
213     }
214 
215     public ChannelFuture connect(SocketAddress remoteAddress) {
216         return Channels.connect(this, remoteAddress);
217     }
218 
219     public ChannelFuture disconnect() {
220         return Channels.disconnect(this);
221     }
222 
223     public int getInterestOps() {
224         if (!isOpen()) {
225             return Channel.OP_WRITE;
226         }
227 
228         int interestOps = getInternalInterestOps() & ~OP_WRITE;
229         if (!isWritable()) {
230             interestOps |= OP_WRITE;
231         }
232         return interestOps;
233     }
234 
235     public ChannelFuture setInterestOps(int interestOps) {
236         return Channels.setInterestOps(this, interestOps);
237     }
238 
239     protected int getInternalInterestOps() {
240         return interestOps;
241     }
242 
243     /**
244      * Sets the {@link #getInterestOps() interestOps} property of this channel
245      * immediately.  This method is intended to be called by an internal
246      * component - please do not call it unless you know what you are doing.
247      */
248     protected void setInternalInterestOps(int interestOps) {
249         this.interestOps = interestOps;
250     }
251 
252     public boolean isReadable() {
253         return (getInternalInterestOps() & OP_READ) != 0;
254     }
255 
256     public boolean isWritable() {
257         return unwritable == 0;
258     }
259 
260     public final boolean getUserDefinedWritability(int index) {
261         return (unwritable & writabilityMask(index)) == 0;
262     }
263 
264     public final void setUserDefinedWritability(int index, boolean writable) {
265         if (writable) {
266             setUserDefinedWritability(index);
267         } else {
268             clearUserDefinedWritability(index);
269         }
270     }
271 
272     private void setUserDefinedWritability(int index) {
273         final int mask = ~writabilityMask(index);
274         for (;;) {
275             final int oldValue = unwritable;
276             final int newValue = oldValue & mask;
277             if (UNWRITABLE_UPDATER.compareAndSet(this, oldValue, newValue)) {
278                 if (oldValue != 0 && newValue == 0) {
279                     getPipeline().sendUpstream(
280                             new UpstreamChannelStateEvent(
281                                     this, ChannelState.INTEREST_OPS, getInterestOps()));
282                 }
283                 break;
284             }
285         }
286     }
287 
288     private void clearUserDefinedWritability(int index) {
289         final int mask = writabilityMask(index);
290         for (;;) {
291             final int oldValue = unwritable;
292             final int newValue = oldValue | mask;
293             if (UNWRITABLE_UPDATER.compareAndSet(this, oldValue, newValue)) {
294                 if (oldValue == 0 && newValue != 0) {
295                     getPipeline().sendUpstream(
296                             new UpstreamChannelStateEvent(
297                                     this, ChannelState.INTEREST_OPS, getInterestOps()));
298                 }
299                 break;
300             }
301         }
302     }
303 
304     private static int writabilityMask(int index) {
305         if (index < 1 || index > 31) {
306             throw new IllegalArgumentException("index: " + index + " (expected: 1~31)");
307         }
308         return 1 << index;
309     }
310 
311     protected boolean setWritable() {
312         for (;;) {
313             final int oldValue = unwritable;
314             final int newValue = oldValue & ~1;
315             if (UNWRITABLE_UPDATER.compareAndSet(this, oldValue, newValue)) {
316                 if (oldValue != 0 && newValue == 0) {
317                     return true;
318                 }
319                 break;
320             }
321         }
322         return false;
323     }
324 
325     protected boolean setUnwritable() {
326         for (;;) {
327             final int oldValue = unwritable;
328             final int newValue = oldValue | 1;
329             if (UNWRITABLE_UPDATER.compareAndSet(this, oldValue, newValue)) {
330                 if (oldValue == 0 && newValue != 0) {
331                     return true;
332                 }
333                 break;
334             }
335         }
336         return false;
337     }
338 
339     public ChannelFuture setReadable(boolean readable) {
340         if (readable) {
341             return setInterestOps(getInterestOps() | OP_READ);
342         } else {
343             return setInterestOps(getInterestOps() & ~OP_READ);
344         }
345     }
346 
347     public ChannelFuture write(Object message) {
348         return Channels.write(this, message);
349     }
350 
351     public ChannelFuture write(Object message, SocketAddress remoteAddress) {
352         return Channels.write(this, message, remoteAddress);
353     }
354 
355     public Object getAttachment() {
356         return attachment;
357     }
358 
359     public void setAttachment(Object attachment) {
360         this.attachment = attachment;
361     }
362     /**
363      * Returns the {@link String} representation of this channel.  The returned
364      * string contains the {@linkplain #getId() ID}, {@linkplain #getLocalAddress() local address},
365      * and {@linkplain #getRemoteAddress() remote address} of this channel for
366      * easier identification.
367      */
368     @Override
369     public String toString() {
370         boolean connected = isConnected();
371         if (strValConnected == connected && strVal != null) {
372             return strVal;
373         }
374 
375         StringBuilder buf = new StringBuilder(128);
376         buf.append("[id: 0x");
377         buf.append(getIdString());
378 
379         SocketAddress localAddress = getLocalAddress();
380         SocketAddress remoteAddress = getRemoteAddress();
381         if (remoteAddress != null) {
382             buf.append(", ");
383             if (getParent() == null) {
384                 buf.append(localAddress);
385                 buf.append(connected? " => " : " :> ");
386                 buf.append(remoteAddress);
387             } else {
388                 buf.append(remoteAddress);
389                 buf.append(connected? " => " : " :> ");
390                 buf.append(localAddress);
391             }
392         } else if (localAddress != null) {
393             buf.append(", ");
394             buf.append(localAddress);
395         }
396 
397         buf.append(']');
398 
399         String strVal = buf.toString();
400         this.strVal = strVal;
401         strValConnected = connected;
402         return strVal;
403     }
404 
405     private String getIdString() {
406         String answer = Integer.toHexString(id.intValue());
407         switch (answer.length()) {
408         case 0:
409             answer = "00000000";
410             break;
411         case 1:
412             answer = "0000000" + answer;
413             break;
414         case 2:
415             answer = "000000" + answer;
416             break;
417         case 3:
418             answer = "00000" + answer;
419             break;
420         case 4:
421             answer = "0000" + answer;
422             break;
423         case 5:
424             answer = "000" + answer;
425             break;
426         case 6:
427             answer = "00" + answer;
428             break;
429         case 7:
430             answer = '0' + answer;
431             break;
432         }
433         return answer;
434     }
435 
436     private final class ChannelCloseFuture extends DefaultChannelFuture {
437 
438         ChannelCloseFuture() {
439             super(AbstractChannel.this, false);
440         }
441 
442         @Override
443         public boolean setSuccess() {
444             // User is not supposed to call this method - ignore silently.
445             return false;
446         }
447 
448         @Override
449         public boolean setFailure(Throwable cause) {
450             // User is not supposed to call this method - ignore silently.
451             return false;
452         }
453 
454         boolean setClosed() {
455             return super.setSuccess();
456         }
457     }
458 }