1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16 package io.netty.util.concurrent;
17
18 import io.netty.util.internal.PromiseNotificationUtil;
19 import io.netty.util.internal.logging.InternalLogger;
20 import io.netty.util.internal.logging.InternalLoggerFactory;
21
22 import static io.netty.util.internal.ObjectUtil.checkNotNull;
23 import static io.netty.util.internal.ObjectUtil.checkNotNullWithIAE;
24
25
26
27
28
29
30
31
32 public class PromiseNotifier<V, F extends Future<V>> implements GenericFutureListener<F> {
33
34 private static final InternalLogger logger = InternalLoggerFactory.getInstance(PromiseNotifier.class);
35 private final Promise<? super V>[] promises;
36 private final boolean logNotifyFailure;
37
38
39
40
41
42
43 @SafeVarargs
44 public PromiseNotifier(Promise<? super V>... promises) {
45 this(true, promises);
46 }
47
48
49
50
51
52
53
54 @SafeVarargs
55 public PromiseNotifier(boolean logNotifyFailure, Promise<? super V>... promises) {
56 checkNotNull(promises, "promises");
57 for (Promise<? super V> promise: promises) {
58 checkNotNullWithIAE(promise, "promise");
59 }
60 this.promises = promises.clone();
61 this.logNotifyFailure = logNotifyFailure;
62 }
63
64
65
66
67
68
69
70
71
72
73
74
75 public static <V, F extends Future<V>> F cascade(final F future, final Promise<? super V> promise) {
76 return cascade(true, future, promise);
77 }
78
79
80
81
82
83
84
85
86
87
88
89
90
91 @SuppressWarnings({"unchecked", "rawtypes"})
92 public static <V, F extends Future<V>> F cascade(boolean logNotifyFailure, final F future,
93 final Promise<? super V> promise) {
94 promise.addListener(new FutureListener() {
95 @Override
96 public void operationComplete(Future f) {
97 if (f.isCancelled()) {
98 future.cancel(false);
99 }
100 }
101 });
102 future.addListener(new PromiseNotifier(logNotifyFailure, promise) {
103 @Override
104 public void operationComplete(Future f) throws Exception {
105 if (promise.isCancelled() && f.isCancelled()) {
106
107 return;
108 }
109 super.operationComplete(future);
110 }
111 });
112 return future;
113 }
114
115 @Override
116 public void operationComplete(F future) throws Exception {
117 InternalLogger internalLogger = logNotifyFailure ? logger : null;
118 if (future.isSuccess()) {
119 V result = future.get();
120 for (Promise<? super V> p: promises) {
121 PromiseNotificationUtil.trySuccess(p, result, internalLogger);
122 }
123 } else if (future.isCancelled()) {
124 for (Promise<? super V> p: promises) {
125 PromiseNotificationUtil.tryCancel(p, internalLogger);
126 }
127 } else {
128 Throwable cause = future.cause();
129 for (Promise<? super V> p: promises) {
130 PromiseNotificationUtil.tryFailure(p, cause, internalLogger);
131 }
132 }
133 }
134 }