查看本类的 API文档回源码主页即时通讯网 - 即时通讯开发者社区!
1   /*
2    * Copyright 2015 The Netty Project
3    *
4    * The Netty Project licenses this file to you under the Apache License, version 2.0 (the
5    * "License"); you may not use this file except in compliance with the License. You may obtain a
6    * copy of the License at:
7    *
8    * https://www.apache.org/licenses/LICENSE-2.0
9    *
10   * Unless required by applicable law or agreed to in writing, software distributed under the License
11   * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
12   * or implied. See the License for the specific language governing permissions and limitations under
13   * the License.
14   */
15  package io.netty.channel;
16  
17  import io.netty.buffer.ByteBuf;
18  import io.netty.buffer.ByteBufAllocator;
19  import io.netty.buffer.CompositeByteBuf;
20  import io.netty.buffer.Unpooled;
21  import io.netty.util.internal.ObjectUtil;
22  
23  /**
24   * A FIFO queue of bytes where producers add bytes by repeatedly adding {@link ByteBuf} and consumers take bytes in
25   * arbitrary lengths. This allows producers to add lots of small buffers and the consumer to take all the bytes
26   * out in a single buffer. Conversely the producer may add larger buffers and the consumer could take the bytes in
27   * many small buffers.
28   *
29   * <p>Bytes are added and removed with promises. If the last byte of a buffer added with a promise is removed then
30   * that promise will complete when the promise passed to {@link #remove} completes.
31   *
32   * <p>This functionality is useful for aggregating or partitioning writes into fixed size buffers for framing protocols
33   * such as HTTP2.
34   */
35  public final class CoalescingBufferQueue extends AbstractCoalescingBufferQueue {
36      private final Channel channel;
37  
38      public CoalescingBufferQueue(Channel channel) {
39          this(channel, 4);
40      }
41  
42      public CoalescingBufferQueue(Channel channel, int initSize) {
43          this(channel, initSize, false);
44      }
45  
46      public CoalescingBufferQueue(Channel channel, int initSize, boolean updateWritability) {
47          super(updateWritability ? channel : null, initSize);
48          this.channel = ObjectUtil.checkNotNull(channel, "channel");
49      }
50  
51      /**
52       * Remove a {@link ByteBuf} from the queue with the specified number of bytes. Any added buffer who's bytes are
53       * fully consumed during removal will have it's promise completed when the passed aggregate {@link ChannelPromise}
54       * completes.
55       *
56       * @param bytes the maximum number of readable bytes in the returned {@link ByteBuf}, if {@code bytes} is greater
57       *              than {@link #readableBytes} then a buffer of length {@link #readableBytes} is returned.
58       * @param aggregatePromise used to aggregate the promises and listeners for the constituent buffers.
59       * @return a {@link ByteBuf} composed of the enqueued buffers.
60       */
61      public ByteBuf remove(int bytes, ChannelPromise aggregatePromise) {
62          return remove(channel.alloc(), bytes, aggregatePromise);
63      }
64  
65      /**
66       *  Release all buffers in the queue and complete all listeners and promises.
67       */
68      public void releaseAndFailAll(Throwable cause) {
69          releaseAndFailAll(channel, cause);
70      }
71  
72      @Override
73      protected ByteBuf compose(ByteBufAllocator alloc, ByteBuf cumulation, ByteBuf next) {
74          if (cumulation instanceof CompositeByteBuf) {
75              CompositeByteBuf composite = (CompositeByteBuf) cumulation;
76              composite.addComponent(true, next);
77              return composite;
78          }
79          return composeIntoComposite(alloc, cumulation, next);
80      }
81  
82      @Override
83      protected ByteBuf removeEmptyValue() {
84          return Unpooled.EMPTY_BUFFER;
85      }
86  }