查看本类的 API文档回源码主页即时通讯网 - 即时通讯开发者社区!
1   /*
2    * Copyright 2014 The Netty Project
3    *
4    * The Netty Project licenses this file to you under the Apache License,
5    * version 2.0 (the "License"); you may not use this file except in compliance
6    * with the License. You may obtain a copy of the License at:
7    *
8    *   https://www.apache.org/licenses/LICENSE-2.0
9    *
10   * Unless required by applicable law or agreed to in writing, software
11   * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12   * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13   * License for the specific language governing permissions and limitations
14   * under the License.
15   */
16  package io.netty.util.concurrent;
17  
18  import io.netty.util.internal.PromiseNotificationUtil;
19  import io.netty.util.internal.logging.InternalLogger;
20  import io.netty.util.internal.logging.InternalLoggerFactory;
21  
22  import static io.netty.util.internal.ObjectUtil.checkNotNull;
23  import static io.netty.util.internal.ObjectUtil.checkNotNullWithIAE;
24  
25  /**
26   * {@link GenericFutureListener} implementation which takes other {@link Promise}s
27   * and notifies them on completion.
28   *
29   * @param <V> the type of value returned by the future
30   * @param <F> the type of future
31   */
32  public class PromiseNotifier<V, F extends Future<V>> implements GenericFutureListener<F> {
33  
34      private static final InternalLogger logger = InternalLoggerFactory.getInstance(PromiseNotifier.class);
35      private final Promise<? super V>[] promises;
36      private final boolean logNotifyFailure;
37  
38      /**
39       * Create a new instance.
40       *
41       * @param promises  the {@link Promise}s to notify once this {@link GenericFutureListener} is notified.
42       */
43      @SafeVarargs
44      public PromiseNotifier(Promise<? super V>... promises) {
45          this(true, promises);
46      }
47  
48      /**
49       * Create a new instance.
50       *
51       * @param logNotifyFailure {@code true} if logging should be done in case notification fails.
52       * @param promises  the {@link Promise}s to notify once this {@link GenericFutureListener} is notified.
53       */
54      @SafeVarargs
55      public PromiseNotifier(boolean logNotifyFailure, Promise<? super V>... promises) {
56          checkNotNull(promises, "promises");
57          for (Promise<? super V> promise: promises) {
58              checkNotNullWithIAE(promise, "promise");
59          }
60          this.promises = promises.clone();
61          this.logNotifyFailure = logNotifyFailure;
62      }
63  
64      /**
65       * Link the {@link Future} and {@link Promise} such that if the {@link Future} completes the {@link Promise}
66       * will be notified. Cancellation is propagated both ways such that if the {@link Future} is cancelled
67       * the {@link Promise} is cancelled and vise-versa.
68       *
69       * @param future    the {@link Future} which will be used to listen to for notifying the {@link Promise}.
70       * @param promise   the {@link Promise} which will be notified
71       * @param <V>       the type of the value.
72       * @param <F>       the type of the {@link Future}
73       * @return          the passed in {@link Future}
74       */
75      public static <V, F extends Future<V>> F cascade(final F future, final Promise<? super V> promise) {
76          return cascade(true, future, promise);
77      }
78  
79      /**
80       * Link the {@link Future} and {@link Promise} such that if the {@link Future} completes the {@link Promise}
81       * will be notified. Cancellation is propagated both ways such that if the {@link Future} is cancelled
82       * the {@link Promise} is cancelled and vise-versa.
83       *
84       * @param logNotifyFailure  {@code true} if logging should be done in case notification fails.
85       * @param future            the {@link Future} which will be used to listen to for notifying the {@link Promise}.
86       * @param promise           the {@link Promise} which will be notified
87       * @param <V>               the type of the value.
88       * @param <F>               the type of the {@link Future}
89       * @return                  the passed in {@link Future}
90       */
91      @SuppressWarnings({"unchecked", "rawtypes"})
92      public static <V, F extends Future<V>> F cascade(boolean logNotifyFailure, final F future,
93                                                       final Promise<? super V> promise) {
94          promise.addListener(new FutureListener() {
95              @Override
96              public void operationComplete(Future f) {
97                  if (f.isCancelled()) {
98                      future.cancel(false);
99                  }
100             }
101         });
102         future.addListener(new PromiseNotifier(logNotifyFailure, promise) {
103             @Override
104             public void operationComplete(Future f) throws Exception {
105                 if (promise.isCancelled() && f.isCancelled()) {
106                     // Just return if we propagate a cancel from the promise to the future and both are notified already
107                     return;
108                 }
109                 super.operationComplete(future);
110             }
111         });
112         return future;
113     }
114 
115     @Override
116     public void operationComplete(F future) throws Exception {
117         InternalLogger internalLogger = logNotifyFailure ? logger : null;
118         if (future.isSuccess()) {
119             V result = future.get();
120             for (Promise<? super V> p: promises) {
121                 PromiseNotificationUtil.trySuccess(p, result, internalLogger);
122             }
123         } else if (future.isCancelled()) {
124             for (Promise<? super V> p: promises) {
125                 PromiseNotificationUtil.tryCancel(p, internalLogger);
126             }
127         } else {
128             Throwable cause = future.cause();
129             for (Promise<? super V> p: promises) {
130                 PromiseNotificationUtil.tryFailure(p, cause, internalLogger);
131             }
132         }
133     }
134 }