1
2
3
4
5
6
7
8
9
10
11
12
13
14
15 package io.netty.channel;
16
17 import io.netty.buffer.ByteBuf;
18 import io.netty.buffer.ByteBufAllocator;
19 import io.netty.buffer.CompositeByteBuf;
20 import io.netty.util.internal.UnstableApi;
21 import io.netty.util.internal.logging.InternalLogger;
22 import io.netty.util.internal.logging.InternalLoggerFactory;
23
24 import java.util.ArrayDeque;
25
26 import static io.netty.util.ReferenceCountUtil.safeRelease;
27 import static io.netty.util.internal.ObjectUtil.checkNotNull;
28 import static io.netty.util.internal.ObjectUtil.checkPositiveOrZero;
29 import static io.netty.util.internal.PlatformDependent.throwException;
30
31 @UnstableApi
32 public abstract class AbstractCoalescingBufferQueue {
33 private static final InternalLogger logger = InternalLoggerFactory.getInstance(AbstractCoalescingBufferQueue.class);
34 private final ArrayDeque<Object> bufAndListenerPairs;
35 private final PendingBytesTracker tracker;
36 private int readableBytes;
37
38
39
40
41
42
43
44
/**
 * Creates a new, empty queue.
 *
 * @param channel  the {@link Channel} whose pending outbound bytes should be kept in sync with this queue;
 *                 when {@code null} no {@link PendingBytesTracker} is created and nothing is reported
 * @param initSize the initial capacity handed to the backing {@link ArrayDeque}
 */
protected AbstractCoalescingBufferQueue(Channel channel, int initSize) {
    bufAndListenerPairs = new ArrayDeque<Object>(initSize);
    if (channel == null) {
        // No channel means there is nothing to report pending bytes to.
        tracker = null;
    } else {
        tracker = PendingBytesTracker.newTracker(channel);
    }
}
49
50
51
52
53
54
55
56 public final void addFirst(ByteBuf buf, ChannelPromise promise) {
57 addFirst(buf, toChannelFutureListener(promise));
58 }
59
60 private void addFirst(ByteBuf buf, ChannelFutureListener listener) {
61
62 buf.touch();
63
64 if (listener != null) {
65 bufAndListenerPairs.addFirst(listener);
66 }
67 bufAndListenerPairs.addFirst(buf);
68 incrementReadableBytes(buf.readableBytes());
69 }
70
71
72
73
74 public final void add(ByteBuf buf) {
75 add(buf, (ChannelFutureListener) null);
76 }
77
78
79
80
81
82
83
84 public final void add(ByteBuf buf, ChannelPromise promise) {
85
86
87 add(buf, toChannelFutureListener(promise));
88 }
89
90
91
92
93
94
95
96 public final void add(ByteBuf buf, ChannelFutureListener listener) {
97
98 buf.touch();
99
100
101
102 bufAndListenerPairs.add(buf);
103 if (listener != null) {
104 bufAndListenerPairs.add(listener);
105 }
106 incrementReadableBytes(buf.readableBytes());
107 }
108
109
110
111
112
113
114 public final ByteBuf removeFirst(ChannelPromise aggregatePromise) {
115 Object entry = bufAndListenerPairs.poll();
116 if (entry == null) {
117 return null;
118 }
119 assert entry instanceof ByteBuf;
120 ByteBuf result = (ByteBuf) entry;
121
122 decrementReadableBytes(result.readableBytes());
123
124 entry = bufAndListenerPairs.peek();
125 if (entry instanceof ChannelFutureListener) {
126 aggregatePromise.addListener((ChannelFutureListener) entry);
127 bufAndListenerPairs.poll();
128 }
129 return result;
130 }
131
132
133
134
135
136
137
138
139
140
141
142
143 public final ByteBuf remove(ByteBufAllocator alloc, int bytes, ChannelPromise aggregatePromise) {
144 checkPositiveOrZero(bytes, "bytes");
145 checkNotNull(aggregatePromise, "aggregatePromise");
146
147
148 if (bufAndListenerPairs.isEmpty()) {
149 assert readableBytes == 0;
150 return removeEmptyValue();
151 }
152 bytes = Math.min(bytes, readableBytes);
153
154 ByteBuf toReturn = null;
155 ByteBuf entryBuffer = null;
156 int originalBytes = bytes;
157 try {
158 for (;;) {
159 Object entry = bufAndListenerPairs.poll();
160 if (entry == null) {
161 break;
162 }
163
164 if (entry instanceof ByteBuf) {
165 entryBuffer = (ByteBuf) entry;
166 int bufferBytes = entryBuffer.readableBytes();
167
168 if (bufferBytes > bytes) {
169
170 bufAndListenerPairs.addFirst(entryBuffer);
171 if (bytes > 0) {
172
173 entryBuffer = entryBuffer.readRetainedSlice(bytes);
174
175 toReturn = toReturn == null ? entryBuffer
176 : compose(alloc, toReturn, entryBuffer);
177 bytes = 0;
178 }
179 break;
180 }
181
182 bytes -= bufferBytes;
183 if (toReturn == null) {
184
185 toReturn = bufferBytes == readableBytes
186 ? entryBuffer
187 : composeFirst(alloc, entryBuffer);
188 } else {
189 toReturn = compose(alloc, toReturn, entryBuffer);
190 }
191 entryBuffer = null;
192 } else if (entry instanceof DelegatingChannelPromiseNotifier) {
193 aggregatePromise.addListener((DelegatingChannelPromiseNotifier) entry);
194 } else if (entry instanceof ChannelFutureListener) {
195 aggregatePromise.addListener((ChannelFutureListener) entry);
196 }
197 }
198 } catch (Throwable cause) {
199 safeRelease(entryBuffer);
200 safeRelease(toReturn);
201 aggregatePromise.setFailure(cause);
202 throwException(cause);
203 }
204 decrementReadableBytes(originalBytes - bytes);
205 return toReturn;
206 }
207
208
209
210
/**
 * Returns the number of readable bytes this queue currently accounts for, i.e. the running total
 * maintained by the add/remove operations of this class.
 */
public final int readableBytes() {
return readableBytes;
}
214
215
216
217
218 public final boolean isEmpty() {
219 return bufAndListenerPairs.isEmpty();
220 }
221
222
223
224
225 public final void releaseAndFailAll(ChannelOutboundInvoker invoker, Throwable cause) {
226 releaseAndCompleteAll(invoker.newFailedFuture(cause));
227 }
228
229
230
231
232
233 public final void copyTo(AbstractCoalescingBufferQueue dest) {
234 dest.bufAndListenerPairs.addAll(bufAndListenerPairs);
235 dest.incrementReadableBytes(readableBytes);
236 }
237
238
239
240
241
242 public final void writeAndRemoveAll(ChannelHandlerContext ctx) {
243 Throwable pending = null;
244 ByteBuf previousBuf = null;
245 for (;;) {
246 Object entry = bufAndListenerPairs.poll();
247 try {
248 if (entry == null) {
249 if (previousBuf != null) {
250 decrementReadableBytes(previousBuf.readableBytes());
251 ctx.write(previousBuf, ctx.voidPromise());
252 }
253 break;
254 }
255
256 if (entry instanceof ByteBuf) {
257 if (previousBuf != null) {
258 decrementReadableBytes(previousBuf.readableBytes());
259 ctx.write(previousBuf, ctx.voidPromise());
260 }
261 previousBuf = (ByteBuf) entry;
262 } else if (entry instanceof ChannelPromise) {
263 decrementReadableBytes(previousBuf.readableBytes());
264 ctx.write(previousBuf, (ChannelPromise) entry);
265 previousBuf = null;
266 } else {
267 decrementReadableBytes(previousBuf.readableBytes());
268 ctx.write(previousBuf).addListener((ChannelFutureListener) entry);
269 previousBuf = null;
270 }
271 } catch (Throwable t) {
272 if (pending == null) {
273 pending = t;
274 } else {
275 logger.info("Throwable being suppressed because Throwable {} is already pending", pending, t);
276 }
277 }
278 }
279 if (pending != null) {
280 throw new IllegalStateException(pending);
281 }
282 }
283
284 @Override
285 public String toString() {
286 return "bytes: " + readableBytes + " buffers: " + (size() >> 1);
287 }
288
289
290
291
/**
 * Combines {@code cumulation} and {@code next} into one buffer. {@code remove(...)} calls this for every
 * buffer after the first one and uses the returned buffer as the new cumulation.
 * NOTE(review): implementations presumably take over ownership of both arguments - confirm against the
 * subclasses.
 *
 * @param alloc      the allocator that was passed to {@code remove(...)}
 * @param cumulation the buffer accumulated so far
 * @param next       the buffer to append to {@code cumulation}
 * @return the buffer holding the combined content
 */
protected abstract ByteBuf compose(ByteBufAllocator alloc, ByteBuf cumulation, ByteBuf next);
293
294
295
296
297 protected final ByteBuf composeIntoComposite(ByteBufAllocator alloc, ByteBuf cumulation, ByteBuf next) {
298
299
300 CompositeByteBuf composite = alloc.compositeBuffer(size() + 2);
301 try {
302 composite.addComponent(true, cumulation);
303 composite.addComponent(true, next);
304 } catch (Throwable cause) {
305 composite.release();
306 safeRelease(next);
307 throwException(cause);
308 }
309 return composite;
310 }
311
312
313
314
315
316
317
318
319 protected final ByteBuf copyAndCompose(ByteBufAllocator alloc, ByteBuf cumulation, ByteBuf next) {
320 ByteBuf newCumulation = alloc.ioBuffer(cumulation.readableBytes() + next.readableBytes());
321 try {
322 newCumulation.writeBytes(cumulation).writeBytes(next);
323 } catch (Throwable cause) {
324 newCumulation.release();
325 safeRelease(next);
326 throwException(cause);
327 }
328 cumulation.release();
329 next.release();
330 return newCumulation;
331 }
332
333
334
335
336
/**
 * Hook that {@code remove(...)} invokes for the first whole buffer it takes when that buffer does not
 * account for all readable bytes of the queue. The default implementation returns {@code first}
 * unchanged; subclasses may override it.
 *
 * @param allocator the allocator that was passed to {@code remove(...)}
 * @param first     the first buffer taken from the queue
 * @return the buffer to use as the initial cumulation
 */
protected ByteBuf composeFirst(ByteBufAllocator allocator, ByteBuf first) {
return first;
}
340
341
342
343
344
/**
 * Supplies the value that {@code remove(...)} returns when the queue holds no entries.
 *
 * @return the buffer representing "nothing to remove"
 */
protected abstract ByteBuf removeEmptyValue();
346
347
348
349
350
/**
 * Returns the number of entries in the underlying deque. Buffers and listeners are stored as separate
 * entries, so both are counted.
 */
protected final int size() {
return bufAndListenerPairs.size();
}
354
355 private void releaseAndCompleteAll(ChannelFuture future) {
356 Throwable pending = null;
357 for (;;) {
358 Object entry = bufAndListenerPairs.poll();
359 if (entry == null) {
360 break;
361 }
362 try {
363 if (entry instanceof ByteBuf) {
364 ByteBuf buffer = (ByteBuf) entry;
365 decrementReadableBytes(buffer.readableBytes());
366 safeRelease(buffer);
367 } else {
368 ((ChannelFutureListener) entry).operationComplete(future);
369 }
370 } catch (Throwable t) {
371 if (pending == null) {
372 pending = t;
373 } else {
374 logger.info("Throwable being suppressed because Throwable {} is already pending", pending, t);
375 }
376 }
377 }
378 if (pending != null) {
379 throw new IllegalStateException(pending);
380 }
381 }
382
383 private void incrementReadableBytes(int increment) {
384 int nextReadableBytes = readableBytes + increment;
385 if (nextReadableBytes < readableBytes) {
386 throw new IllegalStateException("buffer queue length overflow: " + readableBytes + " + " + increment);
387 }
388 readableBytes = nextReadableBytes;
389 if (tracker != null) {
390 tracker.incrementPendingOutboundBytes(increment);
391 }
392 }
393
394 private void decrementReadableBytes(int decrement) {
395 readableBytes -= decrement;
396 assert readableBytes >= 0;
397 if (tracker != null) {
398 tracker.decrementPendingOutboundBytes(decrement);
399 }
400 }
401
402 private static ChannelFutureListener toChannelFutureListener(ChannelPromise promise) {
403 return promise.isVoid() ? null : new DelegatingChannelPromiseNotifier(promise);
404 }
405 }