1 /*
2 * Copyright 2014 The Netty Project
3 *
4 * The Netty Project licenses this file to you under the Apache License,
5 * version 2.0 (the "License"); you may not use this file except in compliance
6 * with the License. You may obtain a copy of the License at:
7 *
8 * https://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12 * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13 * License for the specific language governing permissions and limitations
14 * under the License.
15 */
16 package io.netty.util.concurrent;
17
18 import io.netty.util.internal.PromiseNotificationUtil;
19 import io.netty.util.internal.logging.InternalLogger;
20 import io.netty.util.internal.logging.InternalLoggerFactory;
21
22 import static io.netty.util.internal.ObjectUtil.checkNotNull;
23 import static io.netty.util.internal.ObjectUtil.checkNotNullWithIAE;
24
25 /**
26 * {@link GenericFutureListener} implementation which takes other {@link Promise}s
27 * and notifies them on completion.
28 *
29 * @param <V> the type of value returned by the future
30 * @param <F> the type of future
31 */
32 public class PromiseNotifier<V, F extends Future<V>> implements GenericFutureListener<F> {
33
34 private static final InternalLogger logger = InternalLoggerFactory.getInstance(PromiseNotifier.class);
35 private final Promise<? super V>[] promises;
36 private final boolean logNotifyFailure;
37
38 /**
39 * Create a new instance.
40 *
41 * @param promises the {@link Promise}s to notify once this {@link GenericFutureListener} is notified.
42 */
43 @SafeVarargs
44 public PromiseNotifier(Promise<? super V>... promises) {
45 this(true, promises);
46 }
47
48 /**
49 * Create a new instance.
50 *
51 * @param logNotifyFailure {@code true} if logging should be done in case notification fails.
52 * @param promises the {@link Promise}s to notify once this {@link GenericFutureListener} is notified.
53 */
54 @SafeVarargs
55 public PromiseNotifier(boolean logNotifyFailure, Promise<? super V>... promises) {
56 checkNotNull(promises, "promises");
57 for (Promise<? super V> promise: promises) {
58 checkNotNullWithIAE(promise, "promise");
59 }
60 this.promises = promises.clone();
61 this.logNotifyFailure = logNotifyFailure;
62 }
63
64 /**
65 * Link the {@link Future} and {@link Promise} such that if the {@link Future} completes the {@link Promise}
66 * will be notified. Cancellation is propagated both ways such that if the {@link Future} is cancelled
67 * the {@link Promise} is cancelled and vise-versa.
68 *
69 * @param future the {@link Future} which will be used to listen to for notifying the {@link Promise}.
70 * @param promise the {@link Promise} which will be notified
71 * @param <V> the type of the value.
72 * @param <F> the type of the {@link Future}
73 * @return the passed in {@link Future}
74 */
75 public static <V, F extends Future<V>> F cascade(final F future, final Promise<? super V> promise) {
76 return cascade(true, future, promise);
77 }
78
79 /**
80 * Link the {@link Future} and {@link Promise} such that if the {@link Future} completes the {@link Promise}
81 * will be notified. Cancellation is propagated both ways such that if the {@link Future} is cancelled
82 * the {@link Promise} is cancelled and vise-versa.
83 *
84 * @param logNotifyFailure {@code true} if logging should be done in case notification fails.
85 * @param future the {@link Future} which will be used to listen to for notifying the {@link Promise}.
86 * @param promise the {@link Promise} which will be notified
87 * @param <V> the type of the value.
88 * @param <F> the type of the {@link Future}
89 * @return the passed in {@link Future}
90 */
91 @SuppressWarnings({"unchecked", "rawtypes"})
92 public static <V, F extends Future<V>> F cascade(boolean logNotifyFailure, final F future,
93 final Promise<? super V> promise) {
94 promise.addListener(new FutureListener() {
95 @Override
96 public void operationComplete(Future f) {
97 if (f.isCancelled()) {
98 future.cancel(false);
99 }
100 }
101 });
102 future.addListener(new PromiseNotifier(logNotifyFailure, promise) {
103 @Override
104 public void operationComplete(Future f) throws Exception {
105 if (promise.isCancelled() && f.isCancelled()) {
106 // Just return if we propagate a cancel from the promise to the future and both are notified already
107 return;
108 }
109 super.operationComplete(future);
110 }
111 });
112 return future;
113 }
114
115 @Override
116 public void operationComplete(F future) throws Exception {
117 InternalLogger internalLogger = logNotifyFailure ? logger : null;
118 if (future.isSuccess()) {
119 V result = future.get();
120 for (Promise<? super V> p: promises) {
121 PromiseNotificationUtil.trySuccess(p, result, internalLogger);
122 }
123 } else if (future.isCancelled()) {
124 for (Promise<? super V> p: promises) {
125 PromiseNotificationUtil.tryCancel(p, internalLogger);
126 }
127 } else {
128 Throwable cause = future.cause();
129 for (Promise<? super V> p: promises) {
130 PromiseNotificationUtil.tryFailure(p, cause, internalLogger);
131 }
132 }
133 }
134 }