1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16 package io.netty.channel.group;
17
18 import io.netty.buffer.ByteBuf;
19 import io.netty.buffer.ByteBufHolder;
20 import io.netty.channel.Channel;
21 import io.netty.channel.ChannelFuture;
22 import io.netty.channel.ChannelFutureListener;
23 import io.netty.channel.ChannelId;
24 import io.netty.channel.ServerChannel;
25 import io.netty.util.ReferenceCountUtil;
26 import io.netty.util.concurrent.EventExecutor;
27 import io.netty.util.internal.ObjectUtil;
28 import io.netty.util.internal.PlatformDependent;
29 import io.netty.util.internal.StringUtil;
30
31 import java.util.AbstractSet;
32 import java.util.ArrayList;
33 import java.util.Collection;
34 import java.util.Iterator;
35 import java.util.LinkedHashMap;
36 import java.util.Map;
37 import java.util.concurrent.ConcurrentMap;
38 import java.util.concurrent.atomic.AtomicInteger;
39
40
41
42
43 public class DefaultChannelGroup extends AbstractSet<Channel> implements ChannelGroup {
44
45 private static final AtomicInteger nextId = new AtomicInteger();
46 private final String name;
47 private final EventExecutor executor;
48 private final ConcurrentMap<ChannelId, Channel> serverChannels = PlatformDependent.newConcurrentHashMap();
49 private final ConcurrentMap<ChannelId, Channel> nonServerChannels = PlatformDependent.newConcurrentHashMap();
50 private final ChannelFutureListener remover = new ChannelFutureListener() {
51 @Override
52 public void operationComplete(ChannelFuture future) throws Exception {
53 remove(future.channel());
54 }
55 };
56 private final VoidChannelGroupFuture voidFuture = new VoidChannelGroupFuture(this);
57 private final boolean stayClosed;
58 private volatile boolean closed;
59
60
61
62
63
64 public DefaultChannelGroup(EventExecutor executor) {
65 this(executor, false);
66 }
67
68
69
70
71
72
73 public DefaultChannelGroup(String name, EventExecutor executor) {
74 this(name, executor, false);
75 }
76
77
78
79
80
81
82
83 public DefaultChannelGroup(EventExecutor executor, boolean stayClosed) {
84 this("group-0x" + Integer.toHexString(nextId.incrementAndGet()), executor, stayClosed);
85 }
86
87
88
89
90
91
92
93
94 public DefaultChannelGroup(String name, EventExecutor executor, boolean stayClosed) {
95 ObjectUtil.checkNotNull(name, "name");
96 this.name = name;
97 this.executor = executor;
98 this.stayClosed = stayClosed;
99 }
100
101 @Override
102 public String name() {
103 return name;
104 }
105
106 @Override
107 public Channel find(ChannelId id) {
108 Channel c = nonServerChannels.get(id);
109 if (c != null) {
110 return c;
111 } else {
112 return serverChannels.get(id);
113 }
114 }
115
116 @Override
117 public boolean isEmpty() {
118 return nonServerChannels.isEmpty() && serverChannels.isEmpty();
119 }
120
121 @Override
122 public int size() {
123 return nonServerChannels.size() + serverChannels.size();
124 }
125
126 @Override
127 public boolean contains(Object o) {
128 if (o instanceof ServerChannel) {
129 return serverChannels.containsValue(o);
130 } else if (o instanceof Channel) {
131 return nonServerChannels.containsValue(o);
132 }
133 return false;
134 }
135
136 @Override
137 public boolean add(Channel channel) {
138 ConcurrentMap<ChannelId, Channel> map =
139 channel instanceof ServerChannel? serverChannels : nonServerChannels;
140
141 boolean added = map.putIfAbsent(channel.id(), channel) == null;
142 if (added) {
143 channel.closeFuture().addListener(remover);
144 }
145
146 if (stayClosed && closed) {
147
148
149
150
151
152
153
154
155
156
157
158
159 channel.close();
160 }
161
162 return added;
163 }
164
165 @Override
166 public boolean remove(Object o) {
167 Channel c = null;
168 if (o instanceof ChannelId) {
169 c = nonServerChannels.remove(o);
170 if (c == null) {
171 c = serverChannels.remove(o);
172 }
173 } else if (o instanceof Channel) {
174 c = (Channel) o;
175 if (c instanceof ServerChannel) {
176 c = serverChannels.remove(c.id());
177 } else {
178 c = nonServerChannels.remove(c.id());
179 }
180 }
181
182 if (c == null) {
183 return false;
184 }
185
186 c.closeFuture().removeListener(remover);
187 return true;
188 }
189
190 @Override
191 public void clear() {
192 nonServerChannels.clear();
193 serverChannels.clear();
194 }
195
196 @Override
197 public Iterator<Channel> iterator() {
198 return new CombinedIterator<Channel>(
199 serverChannels.values().iterator(),
200 nonServerChannels.values().iterator());
201 }
202
203 @Override
204 public Object[] toArray() {
205 Collection<Channel> channels = new ArrayList<Channel>(size());
206 channels.addAll(serverChannels.values());
207 channels.addAll(nonServerChannels.values());
208 return channels.toArray();
209 }
210
211 @Override
212 public <T> T[] toArray(T[] a) {
213 Collection<Channel> channels = new ArrayList<Channel>(size());
214 channels.addAll(serverChannels.values());
215 channels.addAll(nonServerChannels.values());
216 return channels.toArray(a);
217 }
218
219 @Override
220 public ChannelGroupFuture close() {
221 return close(ChannelMatchers.all());
222 }
223
224 @Override
225 public ChannelGroupFuture disconnect() {
226 return disconnect(ChannelMatchers.all());
227 }
228
229 @Override
230 public ChannelGroupFuture deregister() {
231 return deregister(ChannelMatchers.all());
232 }
233
234 @Override
235 public ChannelGroupFuture write(Object message) {
236 return write(message, ChannelMatchers.all());
237 }
238
239
240
241 private static Object safeDuplicate(Object message) {
242 if (message instanceof ByteBuf) {
243 return ((ByteBuf) message).retainedDuplicate();
244 } else if (message instanceof ByteBufHolder) {
245 return ((ByteBufHolder) message).retainedDuplicate();
246 } else {
247 return ReferenceCountUtil.retain(message);
248 }
249 }
250
251 @Override
252 public ChannelGroupFuture write(Object message, ChannelMatcher matcher) {
253 return write(message, matcher, false);
254 }
255
256 @Override
257 public ChannelGroupFuture write(Object message, ChannelMatcher matcher, boolean voidPromise) {
258 ObjectUtil.checkNotNull(message, "message");
259 ObjectUtil.checkNotNull(matcher, "matcher");
260
261 final ChannelGroupFuture future;
262 if (voidPromise) {
263 for (Channel c: nonServerChannels.values()) {
264 if (matcher.matches(c)) {
265 c.write(safeDuplicate(message), c.voidPromise());
266 }
267 }
268 future = voidFuture;
269 } else {
270 Map<Channel, ChannelFuture> futures = new LinkedHashMap<Channel, ChannelFuture>(nonServerChannels.size());
271 for (Channel c: nonServerChannels.values()) {
272 if (matcher.matches(c)) {
273 futures.put(c, c.write(safeDuplicate(message)));
274 }
275 }
276 future = new DefaultChannelGroupFuture(this, futures, executor);
277 }
278 ReferenceCountUtil.release(message);
279 return future;
280 }
281
282 @Override
283 public ChannelGroup flush() {
284 return flush(ChannelMatchers.all());
285 }
286
287 @Override
288 public ChannelGroupFuture flushAndWrite(Object message) {
289 return writeAndFlush(message);
290 }
291
292 @Override
293 public ChannelGroupFuture writeAndFlush(Object message) {
294 return writeAndFlush(message, ChannelMatchers.all());
295 }
296
297 @Override
298 public ChannelGroupFuture disconnect(ChannelMatcher matcher) {
299 ObjectUtil.checkNotNull(matcher, "matcher");
300
301 Map<Channel, ChannelFuture> futures =
302 new LinkedHashMap<Channel, ChannelFuture>(size());
303
304 for (Channel c: serverChannels.values()) {
305 if (matcher.matches(c)) {
306 futures.put(c, c.disconnect());
307 }
308 }
309 for (Channel c: nonServerChannels.values()) {
310 if (matcher.matches(c)) {
311 futures.put(c, c.disconnect());
312 }
313 }
314
315 return new DefaultChannelGroupFuture(this, futures, executor);
316 }
317
318 @Override
319 public ChannelGroupFuture close(ChannelMatcher matcher) {
320 ObjectUtil.checkNotNull(matcher, "matcher");
321
322 Map<Channel, ChannelFuture> futures =
323 new LinkedHashMap<Channel, ChannelFuture>(size());
324
325 if (stayClosed) {
326
327
328
329
330
331
332 closed = true;
333 }
334
335 for (Channel c: serverChannels.values()) {
336 if (matcher.matches(c)) {
337 futures.put(c, c.close());
338 }
339 }
340 for (Channel c: nonServerChannels.values()) {
341 if (matcher.matches(c)) {
342 futures.put(c, c.close());
343 }
344 }
345
346 return new DefaultChannelGroupFuture(this, futures, executor);
347 }
348
349 @Override
350 public ChannelGroupFuture deregister(ChannelMatcher matcher) {
351 ObjectUtil.checkNotNull(matcher, "matcher");
352
353 Map<Channel, ChannelFuture> futures =
354 new LinkedHashMap<Channel, ChannelFuture>(size());
355
356 for (Channel c: serverChannels.values()) {
357 if (matcher.matches(c)) {
358 futures.put(c, c.deregister());
359 }
360 }
361 for (Channel c: nonServerChannels.values()) {
362 if (matcher.matches(c)) {
363 futures.put(c, c.deregister());
364 }
365 }
366
367 return new DefaultChannelGroupFuture(this, futures, executor);
368 }
369
370 @Override
371 public ChannelGroup flush(ChannelMatcher matcher) {
372 for (Channel c: nonServerChannels.values()) {
373 if (matcher.matches(c)) {
374 c.flush();
375 }
376 }
377 return this;
378 }
379
380 @Override
381 public ChannelGroupFuture flushAndWrite(Object message, ChannelMatcher matcher) {
382 return writeAndFlush(message, matcher);
383 }
384
385 @Override
386 public ChannelGroupFuture writeAndFlush(Object message, ChannelMatcher matcher) {
387 return writeAndFlush(message, matcher, false);
388 }
389
390 @Override
391 public ChannelGroupFuture writeAndFlush(Object message, ChannelMatcher matcher, boolean voidPromise) {
392 ObjectUtil.checkNotNull(message, "message");
393
394 final ChannelGroupFuture future;
395 if (voidPromise) {
396 for (Channel c: nonServerChannels.values()) {
397 if (matcher.matches(c)) {
398 c.writeAndFlush(safeDuplicate(message), c.voidPromise());
399 }
400 }
401 future = voidFuture;
402 } else {
403 Map<Channel, ChannelFuture> futures = new LinkedHashMap<Channel, ChannelFuture>(nonServerChannels.size());
404 for (Channel c: nonServerChannels.values()) {
405 if (matcher.matches(c)) {
406 futures.put(c, c.writeAndFlush(safeDuplicate(message)));
407 }
408 }
409 future = new DefaultChannelGroupFuture(this, futures, executor);
410 }
411 ReferenceCountUtil.release(message);
412 return future;
413 }
414
415 @Override
416 public ChannelGroupFuture newCloseFuture() {
417 return newCloseFuture(ChannelMatchers.all());
418 }
419
420 @Override
421 public ChannelGroupFuture newCloseFuture(ChannelMatcher matcher) {
422 Map<Channel, ChannelFuture> futures =
423 new LinkedHashMap<Channel, ChannelFuture>(size());
424
425 for (Channel c: serverChannels.values()) {
426 if (matcher.matches(c)) {
427 futures.put(c, c.closeFuture());
428 }
429 }
430 for (Channel c: nonServerChannels.values()) {
431 if (matcher.matches(c)) {
432 futures.put(c, c.closeFuture());
433 }
434 }
435
436 return new DefaultChannelGroupFuture(this, futures, executor);
437 }
438
439 @Override
440 public int hashCode() {
441 return System.identityHashCode(this);
442 }
443
444 @Override
445 public boolean equals(Object o) {
446 return this == o;
447 }
448
449 @Override
450 public int compareTo(ChannelGroup o) {
451 int v = name().compareTo(o.name());
452 if (v != 0) {
453 return v;
454 }
455
456 return System.identityHashCode(this) - System.identityHashCode(o);
457 }
458
459 @Override
460 public String toString() {
461 return StringUtil.simpleClassName(this) + "(name: " + name() + ", size: " + size() + ')';
462 }
463 }