1 /* 2 * Copyright 2016 The Netty Project 3 * 4 * The Netty Project licenses this file to you under the Apache License, 5 * version 2.0 (the "License"); you may not use this file except in compliance 6 * with the License. You may obtain a copy of the License at: 7 * 8 * https://www.apache.org/licenses/LICENSE-2.0 9 * 10 * Unless required by applicable law or agreed to in writing, software 11 * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT 12 * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the 13 * License for the specific language governing permissions and limitations 14 * under the License. 15 */ 16 package io.netty.util.concurrent; 17 18 import io.netty.util.internal.ObjectUtil; 19 20 /** 21 * <p>A promise combiner monitors the outcome of a number of discrete futures, then notifies a final, aggregate promise 22 * when all of the combined futures are finished. The aggregate promise will succeed if and only if all of the combined 23 * futures succeed. If any of the combined futures fail, the aggregate promise will fail. The cause failure for the 24 * aggregate promise will be the failure for one of the failed combined futures; if more than one of the combined 25 * futures fails, exactly which cause of failure will be assigned to the aggregate promise is undefined.</p> 26 * 27 * <p>Callers may populate a promise combiner with any number of futures to be combined via the 28 * {@link PromiseCombiner#add(Future)} and {@link PromiseCombiner#addAll(Future[])} methods. When all futures to be 29 * combined have been added, callers must provide an aggregate promise to be notified when all combined promises have 30 * finished via the {@link PromiseCombiner#finish(Promise)} method.</p> 31 * 32 * <p>This implementation is <strong>NOT</strong> thread-safe and all methods must be called 33 * from the {@link EventExecutor} thread.</p> 34 */ 35 public final class PromiseCombiner { 36 private int expectedCount; 37 private int doneCount; 38 private Promise<Void> aggregatePromise; 39 private Throwable cause; 40 private final GenericFutureListener<Future<?>> listener = new GenericFutureListener<Future<?>>() { 41 @Override 42 public void operationComplete(final Future<?> future) { 43 if (executor.inEventLoop()) { 44 operationComplete0(future); 45 } else { 46 executor.execute(new Runnable() { 47 @Override 48 public void run() { 49 operationComplete0(future); 50 } 51 }); 52 } 53 } 54 55 private void operationComplete0(Future<?> future) { 56 assert executor.inEventLoop(); 57 ++doneCount; 58 if (!future.isSuccess() && cause == null) { 59 cause = future.cause(); 60 } 61 if (doneCount == expectedCount && aggregatePromise != null) { 62 tryPromise(); 63 } 64 } 65 }; 66 67 private final EventExecutor executor; 68 69 /** 70 * Deprecated use {@link PromiseCombiner#PromiseCombiner(EventExecutor)}. 71 */ 72 @Deprecated 73 public PromiseCombiner() { 74 this(ImmediateEventExecutor.INSTANCE); 75 } 76 77 /** 78 * The {@link EventExecutor} to use for notifications. You must call {@link #add(Future)}, {@link #addAll(Future[])} 79 * and {@link #finish(Promise)} from within the {@link EventExecutor} thread. 80 * 81 * @param executor the {@link EventExecutor} to use for notifications. 82 */ 83 public PromiseCombiner(EventExecutor executor) { 84 this.executor = ObjectUtil.checkNotNull(executor, "executor"); 85 } 86 87 /** 88 * Adds a new promise to be combined. New promises may be added until an aggregate promise is added via the 89 * {@link PromiseCombiner#finish(Promise)} method. 90 * 91 * @param promise the promise to add to this promise combiner 92 * 93 * @deprecated Replaced by {@link PromiseCombiner#add(Future)}. 94 */ 95 @Deprecated 96 public void add(Promise promise) { 97 add((Future) promise); 98 } 99 100 /** 101 * Adds a new future to be combined. New futures may be added until an aggregate promise is added via the 102 * {@link PromiseCombiner#finish(Promise)} method. 103 * 104 * @param future the future to add to this promise combiner 105 */ 106 @SuppressWarnings({ "unchecked", "rawtypes" }) 107 public void add(Future future) { 108 checkAddAllowed(); 109 checkInEventLoop(); 110 ++expectedCount; 111 future.addListener(listener); 112 } 113 114 /** 115 * Adds new promises to be combined. New promises may be added until an aggregate promise is added via the 116 * {@link PromiseCombiner#finish(Promise)} method. 117 * 118 * @param promises the promises to add to this promise combiner 119 * 120 * @deprecated Replaced by {@link PromiseCombiner#addAll(Future[])} 121 */ 122 @Deprecated 123 public void addAll(Promise... promises) { 124 addAll((Future[]) promises); 125 } 126 127 /** 128 * Adds new futures to be combined. New futures may be added until an aggregate promise is added via the 129 * {@link PromiseCombiner#finish(Promise)} method. 130 * 131 * @param futures the futures to add to this promise combiner 132 */ 133 @SuppressWarnings({ "unchecked", "rawtypes" }) 134 public void addAll(Future... futures) { 135 for (Future future : futures) { 136 this.add(future); 137 } 138 } 139 140 /** 141 * <p>Sets the promise to be notified when all combined futures have finished. If all combined futures succeed, 142 * then the aggregate promise will succeed. If one or more combined futures fails, then the aggregate promise will 143 * fail with the cause of one of the failed futures. If more than one combined future fails, then exactly which 144 * failure will be assigned to the aggregate promise is undefined.</p> 145 * 146 * <p>After this method is called, no more futures may be added via the {@link PromiseCombiner#add(Future)} or 147 * {@link PromiseCombiner#addAll(Future[])} methods.</p> 148 * 149 * @param aggregatePromise the promise to notify when all combined futures have finished 150 */ 151 public void finish(Promise<Void> aggregatePromise) { 152 ObjectUtil.checkNotNull(aggregatePromise, "aggregatePromise"); 153 checkInEventLoop(); 154 if (this.aggregatePromise != null) { 155 throw new IllegalStateException("Already finished"); 156 } 157 this.aggregatePromise = aggregatePromise; 158 if (doneCount == expectedCount) { 159 tryPromise(); 160 } 161 } 162 163 private void checkInEventLoop() { 164 if (!executor.inEventLoop()) { 165 throw new IllegalStateException("Must be called from EventExecutor thread"); 166 } 167 } 168 169 private boolean tryPromise() { 170 return (cause == null) ? aggregatePromise.trySuccess(null) : aggregatePromise.tryFailure(cause); 171 } 172 173 private void checkAddAllowed() { 174 if (aggregatePromise != null) { 175 throw new IllegalStateException("Adding promises is not allowed after finished adding"); 176 } 177 } 178 }