1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16 package io.netty.channel.group;
17
18 import io.netty.channel.Channel;
19 import io.netty.channel.ChannelFuture;
20 import io.netty.channel.ChannelFutureListener;
21 import io.netty.util.concurrent.BlockingOperationException;
22 import io.netty.util.concurrent.DefaultPromise;
23 import io.netty.util.concurrent.EventExecutor;
24 import io.netty.util.concurrent.Future;
25 import io.netty.util.concurrent.GenericFutureListener;
26 import io.netty.util.concurrent.ImmediateEventExecutor;
27 import io.netty.util.internal.ObjectUtil;
28
29 import java.util.ArrayList;
30 import java.util.Collection;
31 import java.util.Collections;
32 import java.util.Iterator;
33 import java.util.LinkedHashMap;
34 import java.util.List;
35 import java.util.Map;
36
37
38
39
40
41 final class DefaultChannelGroupFuture extends DefaultPromise<Void> implements ChannelGroupFuture {
42
43 private final ChannelGroup group;
44 private final Map<Channel, ChannelFuture> futures;
45 private int successCount;
46 private int failureCount;
47
48 private final ChannelFutureListener childListener = new ChannelFutureListener() {
49 @Override
50 public void operationComplete(ChannelFuture future) throws Exception {
51 boolean success = future.isSuccess();
52 boolean callSetDone;
53 synchronized (DefaultChannelGroupFuture.this) {
54 if (success) {
55 successCount ++;
56 } else {
57 failureCount ++;
58 }
59
60 callSetDone = successCount + failureCount == futures.size();
61 assert successCount + failureCount <= futures.size();
62 }
63
64 if (callSetDone) {
65 if (failureCount > 0) {
66 List<Map.Entry<Channel, Throwable>> failed =
67 new ArrayList<Map.Entry<Channel, Throwable>>(failureCount);
68 for (ChannelFuture f: futures.values()) {
69 if (!f.isSuccess()) {
70 failed.add(new DefaultEntry<Channel, Throwable>(f.channel(), f.cause()));
71 }
72 }
73 setFailure0(new ChannelGroupException(failed));
74 } else {
75 setSuccess0();
76 }
77 }
78 }
79 };
80
81
82
83
84 DefaultChannelGroupFuture(ChannelGroup group, Collection<ChannelFuture> futures, EventExecutor executor) {
85 super(executor);
86 this.group = ObjectUtil.checkNotNull(group, "group");
87 ObjectUtil.checkNotNull(futures, "futures");
88
89 Map<Channel, ChannelFuture> futureMap = new LinkedHashMap<Channel, ChannelFuture>();
90 for (ChannelFuture f: futures) {
91 futureMap.put(f.channel(), f);
92 }
93
94 this.futures = Collections.unmodifiableMap(futureMap);
95
96 for (ChannelFuture f: this.futures.values()) {
97 f.addListener(childListener);
98 }
99
100
101 if (this.futures.isEmpty()) {
102 setSuccess0();
103 }
104 }
105
106 DefaultChannelGroupFuture(ChannelGroup group, Map<Channel, ChannelFuture> futures, EventExecutor executor) {
107 super(executor);
108 this.group = group;
109 this.futures = Collections.unmodifiableMap(futures);
110 for (ChannelFuture f: this.futures.values()) {
111 f.addListener(childListener);
112 }
113
114
115 if (this.futures.isEmpty()) {
116 setSuccess0();
117 }
118 }
119
120 @Override
121 public ChannelGroup group() {
122 return group;
123 }
124
125 @Override
126 public ChannelFuture find(Channel channel) {
127 return futures.get(channel);
128 }
129
130 @Override
131 public Iterator<ChannelFuture> iterator() {
132 return futures.values().iterator();
133 }
134
135 @Override
136 public synchronized boolean isPartialSuccess() {
137 return successCount != 0 && successCount != futures.size();
138 }
139
140 @Override
141 public synchronized boolean isPartialFailure() {
142 return failureCount != 0 && failureCount != futures.size();
143 }
144
145 @Override
146 public DefaultChannelGroupFuture addListener(GenericFutureListener<? extends Future<? super Void>> listener) {
147 super.addListener(listener);
148 return this;
149 }
150
151 @Override
152 public DefaultChannelGroupFuture addListeners(GenericFutureListener<? extends Future<? super Void>>... listeners) {
153 super.addListeners(listeners);
154 return this;
155 }
156
157 @Override
158 public DefaultChannelGroupFuture removeListener(GenericFutureListener<? extends Future<? super Void>> listener) {
159 super.removeListener(listener);
160 return this;
161 }
162
163 @Override
164 public DefaultChannelGroupFuture removeListeners(
165 GenericFutureListener<? extends Future<? super Void>>... listeners) {
166 super.removeListeners(listeners);
167 return this;
168 }
169
170 @Override
171 public DefaultChannelGroupFuture await() throws InterruptedException {
172 super.await();
173 return this;
174 }
175
176 @Override
177 public DefaultChannelGroupFuture awaitUninterruptibly() {
178 super.awaitUninterruptibly();
179 return this;
180 }
181
182 @Override
183 public DefaultChannelGroupFuture syncUninterruptibly() {
184 super.syncUninterruptibly();
185 return this;
186 }
187
188 @Override
189 public DefaultChannelGroupFuture sync() throws InterruptedException {
190 super.sync();
191 return this;
192 }
193
194 @Override
195 public ChannelGroupException cause() {
196 return (ChannelGroupException) super.cause();
197 }
198
199 private void setSuccess0() {
200 super.setSuccess(null);
201 }
202
203 private void setFailure0(ChannelGroupException cause) {
204 super.setFailure(cause);
205 }
206
207 @Override
208 public DefaultChannelGroupFuture setSuccess(Void result) {
209 throw new IllegalStateException();
210 }
211
212 @Override
213 public boolean trySuccess(Void result) {
214 throw new IllegalStateException();
215 }
216
217 @Override
218 public DefaultChannelGroupFuture setFailure(Throwable cause) {
219 throw new IllegalStateException();
220 }
221
222 @Override
223 public boolean tryFailure(Throwable cause) {
224 throw new IllegalStateException();
225 }
226
227 @Override
228 protected void checkDeadLock() {
229 EventExecutor e = executor();
230 if (e != null && e != ImmediateEventExecutor.INSTANCE && e.inEventLoop()) {
231 throw new BlockingOperationException();
232 }
233 }
234
235 private static final class DefaultEntry<K, V> implements Map.Entry<K, V> {
236 private final K key;
237 private final V value;
238
239 DefaultEntry(K key, V value) {
240 this.key = key;
241 this.value = value;
242 }
243
244 @Override
245 public K getKey() {
246 return key;
247 }
248
249 @Override
250 public V getValue() {
251 return value;
252 }
253
254 @Override
255 public V setValue(V value) {
256 throw new UnsupportedOperationException("read-only");
257 }
258 }
259 }