查看本类的 API文档回源码主页即时通讯网 - 即时通讯开发者社区!
1   /*
2    * Copyright 2016 The Netty Project
3    *
4    * The Netty Project licenses this file to you under the Apache License,
5    * version 2.0 (the "License"); you may not use this file except in compliance
6    * with the License. You may obtain a copy of the License at:
7    *
8    *   https://www.apache.org/licenses/LICENSE-2.0
9    *
10   * Unless required by applicable law or agreed to in writing, software
11   * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12   * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13   * License for the specific language governing permissions and limitations
14   * under the License.
15   */
16  package io.netty.util.concurrent;
17  
18  import io.netty.util.internal.ObjectUtil;
19  
20  /**
21   * <p>A promise combiner monitors the outcome of a number of discrete futures, then notifies a final, aggregate promise
22   * when all of the combined futures are finished. The aggregate promise will succeed if and only if all of the combined
23   * futures succeed. If any of the combined futures fail, the aggregate promise will fail. The cause failure for the
24   * aggregate promise will be the failure for one of the failed combined futures; if more than one of the combined
25   * futures fails, exactly which cause of failure will be assigned to the aggregate promise is undefined.</p>
26   *
27   * <p>Callers may populate a promise combiner with any number of futures to be combined via the
28   * {@link PromiseCombiner#add(Future)} and {@link PromiseCombiner#addAll(Future[])} methods. When all futures to be
29   * combined have been added, callers must provide an aggregate promise to be notified when all combined promises have
30   * finished via the {@link PromiseCombiner#finish(Promise)} method.</p>
31   *
32   * <p>This implementation is <strong>NOT</strong> thread-safe and all methods must be called
33   * from the {@link EventExecutor} thread.</p>
34   */
35  public final class PromiseCombiner {
36      private int expectedCount;
37      private int doneCount;
38      private Promise<Void> aggregatePromise;
39      private Throwable cause;
40      private final GenericFutureListener<Future<?>> listener = new GenericFutureListener<Future<?>>() {
41          @Override
42          public void operationComplete(final Future<?> future) {
43              if (executor.inEventLoop()) {
44                  operationComplete0(future);
45              } else {
46                  executor.execute(new Runnable() {
47                      @Override
48                      public void run() {
49                          operationComplete0(future);
50                      }
51                  });
52              }
53          }
54  
55          private void operationComplete0(Future<?> future) {
56              assert executor.inEventLoop();
57              ++doneCount;
58              if (!future.isSuccess() && cause == null) {
59                  cause = future.cause();
60              }
61              if (doneCount == expectedCount && aggregatePromise != null) {
62                  tryPromise();
63              }
64          }
65      };
66  
67      private final EventExecutor executor;
68  
69      /**
70       * Deprecated use {@link PromiseCombiner#PromiseCombiner(EventExecutor)}.
71       */
72      @Deprecated
73      public PromiseCombiner() {
74          this(ImmediateEventExecutor.INSTANCE);
75      }
76  
77      /**
78       * The {@link EventExecutor} to use for notifications. You must call {@link #add(Future)}, {@link #addAll(Future[])}
79       * and {@link #finish(Promise)} from within the {@link EventExecutor} thread.
80       *
81       * @param executor the {@link EventExecutor} to use for notifications.
82       */
83      public PromiseCombiner(EventExecutor executor) {
84          this.executor = ObjectUtil.checkNotNull(executor, "executor");
85      }
86  
87      /**
88       * Adds a new promise to be combined. New promises may be added until an aggregate promise is added via the
89       * {@link PromiseCombiner#finish(Promise)} method.
90       *
91       * @param promise the promise to add to this promise combiner
92       *
93       * @deprecated Replaced by {@link PromiseCombiner#add(Future)}.
94       */
95      @Deprecated
96      public void add(Promise promise) {
97          add((Future) promise);
98      }
99  
100     /**
101      * Adds a new future to be combined. New futures may be added until an aggregate promise is added via the
102      * {@link PromiseCombiner#finish(Promise)} method.
103      *
104      * @param future the future to add to this promise combiner
105      */
106     @SuppressWarnings({ "unchecked", "rawtypes" })
107     public void add(Future future) {
108         checkAddAllowed();
109         checkInEventLoop();
110         ++expectedCount;
111         future.addListener(listener);
112     }
113 
114     /**
115      * Adds new promises to be combined. New promises may be added until an aggregate promise is added via the
116      * {@link PromiseCombiner#finish(Promise)} method.
117      *
118      * @param promises the promises to add to this promise combiner
119      *
120      * @deprecated Replaced by {@link PromiseCombiner#addAll(Future[])}
121      */
122     @Deprecated
123     public void addAll(Promise... promises) {
124         addAll((Future[]) promises);
125     }
126 
127     /**
128      * Adds new futures to be combined. New futures may be added until an aggregate promise is added via the
129      * {@link PromiseCombiner#finish(Promise)} method.
130      *
131      * @param futures the futures to add to this promise combiner
132      */
133     @SuppressWarnings({ "unchecked", "rawtypes" })
134     public void addAll(Future... futures) {
135         for (Future future : futures) {
136             this.add(future);
137         }
138     }
139 
140     /**
141      * <p>Sets the promise to be notified when all combined futures have finished. If all combined futures succeed,
142      * then the aggregate promise will succeed. If one or more combined futures fails, then the aggregate promise will
143      * fail with the cause of one of the failed futures. If more than one combined future fails, then exactly which
144      * failure will be assigned to the aggregate promise is undefined.</p>
145      *
146      * <p>After this method is called, no more futures may be added via the {@link PromiseCombiner#add(Future)} or
147      * {@link PromiseCombiner#addAll(Future[])} methods.</p>
148      *
149      * @param aggregatePromise the promise to notify when all combined futures have finished
150      */
151     public void finish(Promise<Void> aggregatePromise) {
152         ObjectUtil.checkNotNull(aggregatePromise, "aggregatePromise");
153         checkInEventLoop();
154         if (this.aggregatePromise != null) {
155             throw new IllegalStateException("Already finished");
156         }
157         this.aggregatePromise = aggregatePromise;
158         if (doneCount == expectedCount) {
159             tryPromise();
160         }
161     }
162 
163     private void checkInEventLoop() {
164         if (!executor.inEventLoop()) {
165             throw new IllegalStateException("Must be called from EventExecutor thread");
166         }
167     }
168 
169     private boolean tryPromise() {
170         return (cause == null) ? aggregatePromise.trySuccess(null) : aggregatePromise.tryFailure(cause);
171     }
172 
173     private void checkAddAllowed() {
174         if (aggregatePromise != null) {
175             throw new IllegalStateException("Adding promises is not allowed after finished adding");
176         }
177     }
178 }