查看本类的 API文档回源码主页即时通讯网 - 即时通讯开发者社区!
1   /*
2    * Copyright 2012 The Netty Project
3    *
4    * The Netty Project licenses this file to you under the Apache License,
5    * version 2.0 (the "License"); you may not use this file except in compliance
6    * with the License. You may obtain a copy of the License at:
7    *
8    *   https://www.apache.org/licenses/LICENSE-2.0
9    *
10   * Unless required by applicable law or agreed to in writing, software
11   * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12   * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13   * License for the specific language governing permissions and limitations
14   * under the License.
15   */
16  package io.netty.channel.group;
17  
18  import io.netty.channel.Channel;
19  import io.netty.channel.ChannelFuture;
20  import io.netty.channel.ChannelFutureListener;
21  import io.netty.util.concurrent.BlockingOperationException;
22  import io.netty.util.concurrent.DefaultPromise;
23  import io.netty.util.concurrent.EventExecutor;
24  import io.netty.util.concurrent.Future;
25  import io.netty.util.concurrent.GenericFutureListener;
26  import io.netty.util.concurrent.ImmediateEventExecutor;
27  import io.netty.util.internal.ObjectUtil;
28  
29  import java.util.ArrayList;
30  import java.util.Collection;
31  import java.util.Collections;
32  import java.util.Iterator;
33  import java.util.LinkedHashMap;
34  import java.util.List;
35  import java.util.Map;
36  
37  
38  /**
39   * The default {@link ChannelGroupFuture} implementation.
40   */
41  final class DefaultChannelGroupFuture extends DefaultPromise<Void> implements ChannelGroupFuture {
42  
43      private final ChannelGroup group;
44      private final Map<Channel, ChannelFuture> futures;
45      private int successCount;
46      private int failureCount;
47  
48      private final ChannelFutureListener childListener = new ChannelFutureListener() {
49          @Override
50          public void operationComplete(ChannelFuture future) throws Exception {
51              boolean success = future.isSuccess();
52              boolean callSetDone;
53              synchronized (DefaultChannelGroupFuture.this) {
54                  if (success) {
55                      successCount ++;
56                  } else {
57                      failureCount ++;
58                  }
59  
60                  callSetDone = successCount + failureCount == futures.size();
61                  assert successCount + failureCount <= futures.size();
62              }
63  
64              if (callSetDone) {
65                  if (failureCount > 0) {
66                      List<Map.Entry<Channel, Throwable>> failed =
67                              new ArrayList<Map.Entry<Channel, Throwable>>(failureCount);
68                      for (ChannelFuture f: futures.values()) {
69                          if (!f.isSuccess()) {
70                              failed.add(new DefaultEntry<Channel, Throwable>(f.channel(), f.cause()));
71                          }
72                      }
73                      setFailure0(new ChannelGroupException(failed));
74                  } else {
75                      setSuccess0();
76                  }
77              }
78          }
79      };
80  
81      /**
82       * Creates a new instance.
83       */
84      DefaultChannelGroupFuture(ChannelGroup group, Collection<ChannelFuture> futures,  EventExecutor executor) {
85          super(executor);
86          this.group = ObjectUtil.checkNotNull(group, "group");
87          ObjectUtil.checkNotNull(futures, "futures");
88  
89          Map<Channel, ChannelFuture> futureMap = new LinkedHashMap<Channel, ChannelFuture>();
90          for (ChannelFuture f: futures) {
91              futureMap.put(f.channel(), f);
92          }
93  
94          this.futures = Collections.unmodifiableMap(futureMap);
95  
96          for (ChannelFuture f: this.futures.values()) {
97              f.addListener(childListener);
98          }
99  
100         // Done on arrival?
101         if (this.futures.isEmpty()) {
102             setSuccess0();
103         }
104     }
105 
106     DefaultChannelGroupFuture(ChannelGroup group, Map<Channel, ChannelFuture> futures, EventExecutor executor) {
107         super(executor);
108         this.group = group;
109         this.futures = Collections.unmodifiableMap(futures);
110         for (ChannelFuture f: this.futures.values()) {
111             f.addListener(childListener);
112         }
113 
114         // Done on arrival?
115         if (this.futures.isEmpty()) {
116             setSuccess0();
117         }
118     }
119 
120     @Override
121     public ChannelGroup group() {
122         return group;
123     }
124 
125     @Override
126     public ChannelFuture find(Channel channel) {
127         return futures.get(channel);
128     }
129 
130     @Override
131     public Iterator<ChannelFuture> iterator() {
132         return futures.values().iterator();
133     }
134 
135     @Override
136     public synchronized boolean isPartialSuccess() {
137         return successCount != 0 && successCount != futures.size();
138     }
139 
140     @Override
141     public synchronized boolean isPartialFailure() {
142         return failureCount != 0 && failureCount != futures.size();
143     }
144 
145     @Override
146     public DefaultChannelGroupFuture addListener(GenericFutureListener<? extends Future<? super Void>> listener) {
147         super.addListener(listener);
148         return this;
149     }
150 
151     @Override
152     public DefaultChannelGroupFuture addListeners(GenericFutureListener<? extends Future<? super Void>>... listeners) {
153         super.addListeners(listeners);
154         return this;
155     }
156 
157     @Override
158     public DefaultChannelGroupFuture removeListener(GenericFutureListener<? extends Future<? super Void>> listener) {
159         super.removeListener(listener);
160         return this;
161     }
162 
163     @Override
164     public DefaultChannelGroupFuture removeListeners(
165             GenericFutureListener<? extends Future<? super Void>>... listeners) {
166         super.removeListeners(listeners);
167         return this;
168     }
169 
170     @Override
171     public DefaultChannelGroupFuture await() throws InterruptedException {
172         super.await();
173         return this;
174     }
175 
176     @Override
177     public DefaultChannelGroupFuture awaitUninterruptibly() {
178         super.awaitUninterruptibly();
179         return this;
180     }
181 
182     @Override
183     public DefaultChannelGroupFuture syncUninterruptibly() {
184         super.syncUninterruptibly();
185         return this;
186     }
187 
188     @Override
189     public DefaultChannelGroupFuture sync() throws InterruptedException {
190         super.sync();
191         return this;
192     }
193 
194     @Override
195     public ChannelGroupException cause() {
196         return (ChannelGroupException) super.cause();
197     }
198 
199     private void setSuccess0() {
200         super.setSuccess(null);
201     }
202 
203     private void setFailure0(ChannelGroupException cause) {
204         super.setFailure(cause);
205     }
206 
207     @Override
208     public DefaultChannelGroupFuture setSuccess(Void result) {
209         throw new IllegalStateException();
210     }
211 
212     @Override
213     public boolean trySuccess(Void result) {
214         throw new IllegalStateException();
215     }
216 
217     @Override
218     public DefaultChannelGroupFuture setFailure(Throwable cause) {
219         throw new IllegalStateException();
220     }
221 
222     @Override
223     public boolean tryFailure(Throwable cause) {
224         throw new IllegalStateException();
225     }
226 
227     @Override
228     protected void checkDeadLock() {
229         EventExecutor e = executor();
230         if (e != null && e != ImmediateEventExecutor.INSTANCE && e.inEventLoop()) {
231             throw new BlockingOperationException();
232         }
233     }
234 
235     private static final class DefaultEntry<K, V> implements Map.Entry<K, V> {
236         private final K key;
237         private final V value;
238 
239         DefaultEntry(K key, V value) {
240             this.key = key;
241             this.value = value;
242         }
243 
244         @Override
245         public K getKey() {
246             return key;
247         }
248 
249         @Override
250         public V getValue() {
251             return value;
252         }
253 
254         @Override
255         public V setValue(V value) {
256             throw new UnsupportedOperationException("read-only");
257         }
258     }
259 }