1 /*
2 * Copyright 2015 The Netty Project
3 *
4 * The Netty Project licenses this file to you under the Apache License, version 2.0 (the
5 * "License"); you may not use this file except in compliance with the License. You may obtain a
6 * copy of the License at:
7 *
8 * https://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software distributed under the License
11 * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
12 * or implied. See the License for the specific language governing permissions and limitations under
13 * the License.
14 */
15 package io.netty.channel;
16
17 import io.netty.buffer.ByteBuf;
18 import io.netty.buffer.ByteBufAllocator;
19 import io.netty.buffer.CompositeByteBuf;
20 import io.netty.buffer.Unpooled;
21 import io.netty.util.internal.ObjectUtil;
22
23 /**
24 * A FIFO queue of bytes where producers add bytes by repeatedly adding {@link ByteBuf} and consumers take bytes in
25 * arbitrary lengths. This allows producers to add lots of small buffers and the consumer to take all the bytes
26 * out in a single buffer. Conversely the producer may add larger buffers and the consumer could take the bytes in
27 * many small buffers.
28 *
29 * <p>Bytes are added and removed with promises. If the last byte of a buffer added with a promise is removed then
30 * that promise will complete when the promise passed to {@link #remove} completes.
31 *
32 * <p>This functionality is useful for aggregating or partitioning writes into fixed size buffers for framing protocols
33 * such as HTTP2.
34 */
35 public final class CoalescingBufferQueue extends AbstractCoalescingBufferQueue {
36 private final Channel channel;
37
38 public CoalescingBufferQueue(Channel channel) {
39 this(channel, 4);
40 }
41
42 public CoalescingBufferQueue(Channel channel, int initSize) {
43 this(channel, initSize, false);
44 }
45
46 public CoalescingBufferQueue(Channel channel, int initSize, boolean updateWritability) {
47 super(updateWritability ? channel : null, initSize);
48 this.channel = ObjectUtil.checkNotNull(channel, "channel");
49 }
50
51 /**
52 * Remove a {@link ByteBuf} from the queue with the specified number of bytes. Any added buffer who's bytes are
53 * fully consumed during removal will have it's promise completed when the passed aggregate {@link ChannelPromise}
54 * completes.
55 *
56 * @param bytes the maximum number of readable bytes in the returned {@link ByteBuf}, if {@code bytes} is greater
57 * than {@link #readableBytes} then a buffer of length {@link #readableBytes} is returned.
58 * @param aggregatePromise used to aggregate the promises and listeners for the constituent buffers.
59 * @return a {@link ByteBuf} composed of the enqueued buffers.
60 */
61 public ByteBuf remove(int bytes, ChannelPromise aggregatePromise) {
62 return remove(channel.alloc(), bytes, aggregatePromise);
63 }
64
65 /**
66 * Release all buffers in the queue and complete all listeners and promises.
67 */
68 public void releaseAndFailAll(Throwable cause) {
69 releaseAndFailAll(channel, cause);
70 }
71
72 @Override
73 protected ByteBuf compose(ByteBufAllocator alloc, ByteBuf cumulation, ByteBuf next) {
74 if (cumulation instanceof CompositeByteBuf) {
75 CompositeByteBuf composite = (CompositeByteBuf) cumulation;
76 composite.addComponent(true, next);
77 return composite;
78 }
79 return composeIntoComposite(alloc, cumulation, next);
80 }
81
82 @Override
83 protected ByteBuf removeEmptyValue() {
84 return Unpooled.EMPTY_BUFFER;
85 }
86 }