1 /*
2 * Copyright 2012 The Netty Project
3 *
4 * The Netty Project licenses this file to you under the Apache License,
5 * version 2.0 (the "License"); you may not use this file except in compliance
6 * with the License. You may obtain a copy of the License at:
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12 * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13 * License for the specific language governing permissions and limitations
14 * under the License.
15 */
16 package io.netty.channel;
17
18 import java.util.ArrayDeque;
19 import java.util.Queue;
20
21 /**
22 * This implementation allows to register {@link ChannelFuture} instances which will get notified once some amount of
23 * data was written and so a checkpoint was reached.
24 */
25 public final class ChannelFlushPromiseNotifier {
26
27 private long writeCounter;
28 private final Queue<FlushCheckpoint> flushCheckpoints = new ArrayDeque<FlushCheckpoint>();
29 private final boolean tryNotify;
30
31 /**
32 * Create a new instance
33 *
34 * @param tryNotify if {@code true} the {@link ChannelPromise}s will get notified with
35 * {@link ChannelPromise#trySuccess()} and {@link ChannelPromise#tryFailure(Throwable)}.
36 * Otherwise {@link ChannelPromise#setSuccess()} and {@link ChannelPromise#setFailure(Throwable)}
37 * is used
38 */
39 public ChannelFlushPromiseNotifier(boolean tryNotify) {
40 this.tryNotify = tryNotify;
41 }
42
43 /**
44 * Create a new instance which will use {@link ChannelPromise#setSuccess()} and
45 * {@link ChannelPromise#setFailure(Throwable)} to notify the {@link ChannelPromise}s.
46 */
47 public ChannelFlushPromiseNotifier() {
48 this(false);
49 }
50
51 /**
52 * @deprecated use {@link #add(ChannelPromise, long)}
53 */
54 @Deprecated
55 public ChannelFlushPromiseNotifier add(ChannelPromise promise, int pendingDataSize) {
56 return add(promise, (long) pendingDataSize);
57 }
58
59 /**
60 * Add a {@link ChannelPromise} to this {@link ChannelFlushPromiseNotifier} which will be notified after the given
61 * {@code pendingDataSize} was reached.
62 */
63 public ChannelFlushPromiseNotifier add(ChannelPromise promise, long pendingDataSize) {
64 if (promise == null) {
65 throw new NullPointerException("promise");
66 }
67 if (pendingDataSize < 0) {
68 throw new IllegalArgumentException("pendingDataSize must be >= 0 but was " + pendingDataSize);
69 }
70 long checkpoint = writeCounter + pendingDataSize;
71 if (promise instanceof FlushCheckpoint) {
72 FlushCheckpoint cp = (FlushCheckpoint) promise;
73 cp.flushCheckpoint(checkpoint);
74 flushCheckpoints.add(cp);
75 } else {
76 flushCheckpoints.add(new DefaultFlushCheckpoint(checkpoint, promise));
77 }
78 return this;
79 }
80 /**
81 * Increase the current write counter by the given delta
82 */
83 public ChannelFlushPromiseNotifier increaseWriteCounter(long delta) {
84 if (delta < 0) {
85 throw new IllegalArgumentException("delta must be >= 0 but was " + delta);
86 }
87 writeCounter += delta;
88 return this;
89 }
90
91 /**
92 * Return the current write counter of this {@link ChannelFlushPromiseNotifier}
93 */
94 public long writeCounter() {
95 return writeCounter;
96 }
97
98 /**
99 * Notify all {@link ChannelFuture}s that were registered with {@link #add(ChannelPromise, int)} and
100 * their pendingDatasize is smaller after the the current writeCounter returned by {@link #writeCounter()}.
101 *
102 * After a {@link ChannelFuture} was notified it will be removed from this {@link ChannelFlushPromiseNotifier} and
103 * so not receive anymore notification.
104 */
105 public ChannelFlushPromiseNotifier notifyPromises() {
106 notifyPromises0(null);
107 return this;
108 }
109
110 /**
111 * @deprecated use {@link #notifyPromises()}
112 */
113 @Deprecated
114 public ChannelFlushPromiseNotifier notifyFlushFutures() {
115 return notifyPromises();
116 }
117
118 /**
119 * Notify all {@link ChannelFuture}s that were registered with {@link #add(ChannelPromise, int)} and
120 * their pendingDatasize isis smaller then the current writeCounter returned by {@link #writeCounter()}.
121 *
122 * After a {@link ChannelFuture} was notified it will be removed from this {@link ChannelFlushPromiseNotifier} and
123 * so not receive anymore notification.
124 *
125 * The rest of the remaining {@link ChannelFuture}s will be failed with the given {@link Throwable}.
126 *
127 * So after this operation this {@link ChannelFutureListener} is empty.
128 */
129 public ChannelFlushPromiseNotifier notifyPromises(Throwable cause) {
130 notifyPromises();
131 for (;;) {
132 FlushCheckpoint cp = flushCheckpoints.poll();
133 if (cp == null) {
134 break;
135 }
136 if (tryNotify) {
137 cp.promise().tryFailure(cause);
138 } else {
139 cp.promise().setFailure(cause);
140 }
141 }
142 return this;
143 }
144
145 /**
146 * @deprecated use {@link #notifyPromises(Throwable)}
147 */
148 @Deprecated
149 public ChannelFlushPromiseNotifier notifyFlushFutures(Throwable cause) {
150 return notifyPromises(cause);
151 }
152
153 /**
154 * Notify all {@link ChannelFuture}s that were registered with {@link #add(ChannelPromise, int)} and
155 * their pendingDatasize is smaller then the current writeCounter returned by {@link #writeCounter()} using
156 * the given cause1.
157 *
158 * After a {@link ChannelFuture} was notified it will be removed from this {@link ChannelFlushPromiseNotifier} and
159 * so not receive anymore notification.
160 *
161 * The rest of the remaining {@link ChannelFuture}s will be failed with the given {@link Throwable}.
162 *
163 * So after this operation this {@link ChannelFutureListener} is empty.
164 *
165 * @param cause1 the {@link Throwable} which will be used to fail all of the {@link ChannelFuture}s which
166 * pendingDataSize is smaller then the current writeCounter returned by {@link #writeCounter()}
167 * @param cause2 the {@link Throwable} which will be used to fail the remaining {@link ChannelFuture}s
168 */
169 public ChannelFlushPromiseNotifier notifyPromises(Throwable cause1, Throwable cause2) {
170 notifyPromises0(cause1);
171 for (;;) {
172 FlushCheckpoint cp = flushCheckpoints.poll();
173 if (cp == null) {
174 break;
175 }
176 if (tryNotify) {
177 cp.promise().tryFailure(cause2);
178 } else {
179 cp.promise().setFailure(cause2);
180 }
181 }
182 return this;
183 }
184
185 /**
186 * @deprecated use {@link #notifyPromises(Throwable, Throwable)}
187 */
188 @Deprecated
189 public ChannelFlushPromiseNotifier notifyFlushFutures(Throwable cause1, Throwable cause2) {
190 return notifyPromises(cause1, cause2);
191 }
192
193 private void notifyPromises0(Throwable cause) {
194 if (flushCheckpoints.isEmpty()) {
195 writeCounter = 0;
196 return;
197 }
198
199 final long writeCounter = this.writeCounter;
200 for (;;) {
201 FlushCheckpoint cp = flushCheckpoints.peek();
202 if (cp == null) {
203 // Reset the counter if there's nothing in the notification list.
204 this.writeCounter = 0;
205 break;
206 }
207
208 if (cp.flushCheckpoint() > writeCounter) {
209 if (writeCounter > 0 && flushCheckpoints.size() == 1) {
210 this.writeCounter = 0;
211 cp.flushCheckpoint(cp.flushCheckpoint() - writeCounter);
212 }
213 break;
214 }
215
216 flushCheckpoints.remove();
217 ChannelPromise promise = cp.promise();
218 if (cause == null) {
219 if (tryNotify) {
220 promise.trySuccess();
221 } else {
222 promise.setSuccess();
223 }
224 } else {
225 if (tryNotify) {
226 promise.tryFailure(cause);
227 } else {
228 promise.setFailure(cause);
229 }
230 }
231 }
232
233 // Avoid overflow
234 final long newWriteCounter = this.writeCounter;
235 if (newWriteCounter >= 0x8000000000L) {
236 // Reset the counter only when the counter grew pretty large
237 // so that we can reduce the cost of updating all entries in the notification list.
238 this.writeCounter = 0;
239 for (FlushCheckpoint cp: flushCheckpoints) {
240 cp.flushCheckpoint(cp.flushCheckpoint() - newWriteCounter);
241 }
242 }
243 }
244
245 interface FlushCheckpoint {
246 long flushCheckpoint();
247 void flushCheckpoint(long checkpoint);
248 ChannelPromise promise();
249 }
250
251 private static class DefaultFlushCheckpoint implements FlushCheckpoint {
252 private long checkpoint;
253 private final ChannelPromise future;
254
255 DefaultFlushCheckpoint(long checkpoint, ChannelPromise future) {
256 this.checkpoint = checkpoint;
257 this.future = future;
258 }
259
260 @Override
261 public long flushCheckpoint() {
262 return checkpoint;
263 }
264
265 @Override
266 public void flushCheckpoint(long checkpoint) {
267 this.checkpoint = checkpoint;
268 }
269
270 @Override
271 public ChannelPromise promise() {
272 return future;
273 }
274 }
275 }