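// Source listing hosted by 即时通讯网 (an instant-messaging developer community); see that site for this class's API documentation and the source index.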
1   /*
2    * Copyright 2012 The Netty Project
3    *
4    * The Netty Project licenses this file to you under the Apache License,
5    * version 2.0 (the "License"); you may not use this file except in compliance
6    * with the License. You may obtain a copy of the License at:
7    *
8    *   http://www.apache.org/licenses/LICENSE-2.0
9    *
10   * Unless required by applicable law or agreed to in writing, software
11   * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12   * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13   * License for the specific language governing permissions and limitations
14   * under the License.
15   */
16  package org.jboss.netty.channel.group;
17  
18  import java.net.SocketAddress;
19  import java.util.AbstractSet;
20  import java.util.ArrayList;
21  import java.util.Collection;
22  import java.util.Iterator;
23  import java.util.LinkedHashMap;
24  import java.util.Map;
25  import java.util.concurrent.ConcurrentMap;
26  import java.util.concurrent.atomic.AtomicInteger;
27  
28  import org.jboss.netty.buffer.ChannelBuffer;
29  import org.jboss.netty.channel.Channel;
30  import org.jboss.netty.channel.ChannelFuture;
31  import org.jboss.netty.channel.ChannelFutureListener;
32  import org.jboss.netty.channel.ServerChannel;
33  import org.jboss.netty.util.internal.ConcurrentHashMap;
34  
35  /**
36   * The default {@link ChannelGroup} implementation.
37   * @apiviz.landmark
38   */
39  public class DefaultChannelGroup extends AbstractSet<Channel> implements ChannelGroup {
40  
41      private static final AtomicInteger nextId = new AtomicInteger();
42  
43      private final String name;
44      private final ConcurrentMap<Integer, Channel> serverChannels = new ConcurrentHashMap<Integer, Channel>();
45      private final ConcurrentMap<Integer, Channel> nonServerChannels = new ConcurrentHashMap<Integer, Channel>();
46      private final ChannelFutureListener remover = new ChannelFutureListener() {
47          public void operationComplete(ChannelFuture future) throws Exception {
48              remove(future.getChannel());
49          }
50      };
51  
52      /**
53       * Creates a new group with a generated name.
54       */
55      public DefaultChannelGroup() {
56          this("group-0x" + Integer.toHexString(nextId.incrementAndGet()));
57      }
58  
59      /**
60       * Creates a new group with the specified {@code name}.  Please note that
61       * different groups can have the same name, which means no duplicate check
62       * is done against group names.
63       */
64      public DefaultChannelGroup(String name) {
65          if (name == null) {
66              throw new NullPointerException("name");
67          }
68          this.name = name;
69      }
70  
71      public String getName() {
72          return name;
73      }
74  
75      @Override
76      public boolean isEmpty() {
77          return nonServerChannels.isEmpty() && serverChannels.isEmpty();
78      }
79  
80      @Override
81      public int size() {
82          return nonServerChannels.size() + serverChannels.size();
83      }
84  
85      public Channel find(Integer id) {
86          Channel c = nonServerChannels.get(id);
87          if (c != null) {
88              return c;
89          } else {
90              return serverChannels.get(id);
91          }
92      }
93  
94      @Override
95      public boolean contains(Object o) {
96          if (o instanceof Integer) {
97              return nonServerChannels.containsKey(o) || serverChannels.containsKey(o);
98          } else if (o instanceof Channel) {
99              Channel c = (Channel) o;
100             if (o instanceof ServerChannel) {
101                 return serverChannels.containsKey(c.getId());
102             } else {
103                 return nonServerChannels.containsKey(c.getId());
104             }
105         } else {
106             return false;
107         }
108     }
109 
110     @Override
111     public boolean add(Channel channel) {
112         ConcurrentMap<Integer, Channel> map =
113             channel instanceof ServerChannel? serverChannels : nonServerChannels;
114 
115         boolean added = map.putIfAbsent(channel.getId(), channel) == null;
116         if (added) {
117             channel.getCloseFuture().addListener(remover);
118         }
119         return added;
120     }
121 
122     @Override
123     public boolean remove(Object o) {
124         Channel c = null;
125         if (o instanceof Integer) {
126             c = nonServerChannels.remove(o);
127             if (c == null) {
128                 c = serverChannels.remove(o);
129             }
130         } else if (o instanceof Channel) {
131             c = (Channel) o;
132             if (c instanceof ServerChannel) {
133                 c = serverChannels.remove(c.getId());
134             } else {
135                 c = nonServerChannels.remove(c.getId());
136             }
137         }
138 
139         if (c == null) {
140             return false;
141         }
142 
143         c.getCloseFuture().removeListener(remover);
144         return true;
145     }
146 
147     @Override
148     public void clear() {
149         nonServerChannels.clear();
150         serverChannels.clear();
151     }
152 
153     @Override
154     public Iterator<Channel> iterator() {
155         return new CombinedIterator<Channel>(
156                 serverChannels.values().iterator(),
157                 nonServerChannels.values().iterator());
158     }
159 
160     @Override
161     public Object[] toArray() {
162         Collection<Channel> channels = new ArrayList<Channel>(size());
163         channels.addAll(serverChannels.values());
164         channels.addAll(nonServerChannels.values());
165         return channels.toArray();
166     }
167 
168     @Override
169     public <T> T[] toArray(T[] a) {
170         Collection<Channel> channels = new ArrayList<Channel>(size());
171         channels.addAll(serverChannels.values());
172         channels.addAll(nonServerChannels.values());
173         return channels.toArray(a);
174     }
175 
176     public ChannelGroupFuture close() {
177         Map<Integer, ChannelFuture> futures =
178             new LinkedHashMap<Integer, ChannelFuture>(size());
179 
180         for (Channel c: serverChannels.values()) {
181             futures.put(c.getId(), c.close().awaitUninterruptibly());
182         }
183         for (Channel c: nonServerChannels.values()) {
184             futures.put(c.getId(), c.close());
185         }
186 
187         return new DefaultChannelGroupFuture(this, futures);
188     }
189 
190     public ChannelGroupFuture disconnect() {
191         Map<Integer, ChannelFuture> futures =
192             new LinkedHashMap<Integer, ChannelFuture>(size());
193 
194         for (Channel c: serverChannels.values()) {
195             futures.put(c.getId(), c.disconnect().awaitUninterruptibly());
196         }
197         for (Channel c: nonServerChannels.values()) {
198             futures.put(c.getId(), c.disconnect());
199         }
200 
201         return new DefaultChannelGroupFuture(this, futures);
202     }
203 
204     public ChannelGroupFuture setInterestOps(int interestOps) {
205         Map<Integer, ChannelFuture> futures =
206             new LinkedHashMap<Integer, ChannelFuture>(size());
207 
208         for (Channel c: serverChannels.values()) {
209             futures.put(c.getId(), c.setInterestOps(interestOps).awaitUninterruptibly());
210         }
211         for (Channel c: nonServerChannels.values()) {
212             futures.put(c.getId(), c.setInterestOps(interestOps));
213         }
214 
215         return new DefaultChannelGroupFuture(this, futures);
216     }
217 
218     public ChannelGroupFuture setReadable(boolean readable) {
219         Map<Integer, ChannelFuture> futures =
220             new LinkedHashMap<Integer, ChannelFuture>(size());
221 
222         for (Channel c: serverChannels.values()) {
223             futures.put(c.getId(), c.setReadable(readable).awaitUninterruptibly());
224         }
225         for (Channel c: nonServerChannels.values()) {
226             futures.put(c.getId(), c.setReadable(readable));
227         }
228 
229         return new DefaultChannelGroupFuture(this, futures);
230     }
231 
232     public ChannelGroupFuture unbind() {
233         Map<Integer, ChannelFuture> futures =
234             new LinkedHashMap<Integer, ChannelFuture>(size());
235 
236         for (Channel c: serverChannels.values()) {
237             futures.put(c.getId(), c.unbind().awaitUninterruptibly());
238         }
239         for (Channel c: nonServerChannels.values()) {
240             futures.put(c.getId(), c.unbind());
241         }
242 
243         return new DefaultChannelGroupFuture(this, futures);
244     }
245 
246     public ChannelGroupFuture write(Object message) {
247         Map<Integer, ChannelFuture> futures =
248             new LinkedHashMap<Integer, ChannelFuture>(size());
249         if (message instanceof ChannelBuffer) {
250             ChannelBuffer buf = (ChannelBuffer) message;
251             for (Channel c: nonServerChannels.values()) {
252                 futures.put(c.getId(), c.write(buf.duplicate()));
253             }
254         } else {
255             for (Channel c: nonServerChannels.values()) {
256                 futures.put(c.getId(), c.write(message));
257             }
258         }
259         return new DefaultChannelGroupFuture(this, futures);
260     }
261 
262     public ChannelGroupFuture write(Object message, SocketAddress remoteAddress) {
263         Map<Integer, ChannelFuture> futures =
264             new LinkedHashMap<Integer, ChannelFuture>(size());
265         if (message instanceof ChannelBuffer) {
266             ChannelBuffer buf = (ChannelBuffer) message;
267             for (Channel c: nonServerChannels.values()) {
268                 futures.put(c.getId(), c.write(buf.duplicate(), remoteAddress));
269             }
270         } else {
271             for (Channel c: nonServerChannels.values()) {
272                 futures.put(c.getId(), c.write(message, remoteAddress));
273             }
274         }
275         return new DefaultChannelGroupFuture(this, futures);
276     }
277 
278     @Override
279     public int hashCode() {
280         return System.identityHashCode(this);
281     }
282 
283     @Override
284     public boolean equals(Object o) {
285         return this == o;
286     }
287 
288     public int compareTo(ChannelGroup o) {
289         int v = getName().compareTo(o.getName());
290         if (v != 0) {
291             return v;
292         }
293 
294         return System.identityHashCode(this) - System.identityHashCode(o);
295     }
296 
297     @Override
298     public String toString() {
299         return getClass().getSimpleName() +
300                "(name: " + getName() + ", size: " + size() + ')';
301     }
302 }