查看本类的 API文档回源码主页即时通讯网 - 即时通讯开发者社区!
1   /*
2    * Copyright 2015 The Netty Project
3    *
4    * The Netty Project licenses this file to you under the Apache License, version 2.0 (the
5    * "License"); you may not use this file except in compliance with the License. You may obtain a
6    * copy of the License at:
7    *
8    * https://www.apache.org/licenses/LICENSE-2.0
9    *
10   * Unless required by applicable law or agreed to in writing, software distributed under the License
11   * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
12   * or implied. See the License for the specific language governing permissions and limitations under
13   * the License.
14   */
15  package io.netty.handler.codec.http2;
16  
17  import io.netty.buffer.ByteBuf;
18  import io.netty.buffer.PooledByteBufAllocator;
19  import io.netty.buffer.Unpooled;
20  import io.netty.buffer.UnpooledByteBufAllocator;
21  import io.netty.channel.ChannelFuture;
22  import io.netty.channel.ChannelHandlerContext;
23  import io.netty.channel.ChannelInboundHandlerAdapter;
24  import io.netty.channel.ChannelPromise;
25  import io.netty.microbench.channel.EmbeddedChannelWriteReleaseHandlerContext;
26  import io.netty.microbench.util.AbstractMicrobenchmark;
27  import org.openjdk.jmh.annotations.Benchmark;
28  import org.openjdk.jmh.annotations.BenchmarkMode;
29  import org.openjdk.jmh.annotations.Fork;
30  import org.openjdk.jmh.annotations.Level;
31  import org.openjdk.jmh.annotations.Measurement;
32  import org.openjdk.jmh.annotations.Mode;
33  import org.openjdk.jmh.annotations.OutputTimeUnit;
34  import org.openjdk.jmh.annotations.Param;
35  import org.openjdk.jmh.annotations.Scope;
36  import org.openjdk.jmh.annotations.Setup;
37  import org.openjdk.jmh.annotations.State;
38  import org.openjdk.jmh.annotations.TearDown;
39  import org.openjdk.jmh.annotations.Warmup;
40  
41  import java.util.concurrent.TimeUnit;
42  
43  import static io.netty.buffer.Unpooled.directBuffer;
44  import static io.netty.buffer.Unpooled.unreleasableBuffer;
45  import static io.netty.handler.codec.http2.Http2CodecUtil.DATA_FRAME_HEADER_LENGTH;
46  import static io.netty.handler.codec.http2.Http2CodecUtil.DEFAULT_MAX_FRAME_SIZE;
47  import static io.netty.handler.codec.http2.Http2CodecUtil.MAX_UNSIGNED_BYTE;
48  import static io.netty.handler.codec.http2.Http2CodecUtil.verifyPadding;
49  import static io.netty.handler.codec.http2.Http2CodecUtil.writeFrameHeaderInternal;
50  import static io.netty.handler.codec.http2.Http2FrameTypes.DATA;
51  import static io.netty.util.internal.ObjectUtil.checkPositive;
52  import static java.lang.Math.max;
53  import static java.lang.Math.min;
54  
55  @Fork(1)
56  @Warmup(iterations = 5)
57  @Measurement(iterations = 5)
58  @State(Scope.Benchmark)
59  @OutputTimeUnit(TimeUnit.NANOSECONDS)
60  public class Http2FrameWriterDataBenchmark extends AbstractMicrobenchmark {
61      @Param({ "64", "1024", "4096", "16384", "1048576", "4194304" })
62      public int payloadSize;
63  
64      @Param({ "0", "100", "255" })
65      public int padding;
66  
67      @Param({ "true", "false" })
68      public boolean pooled;
69  
70      private ByteBuf payload;
71      private ChannelHandlerContext ctx;
72      private Http2DataWriter writer;
73      private Http2DataWriter oldWriter;
74  
75      @Setup(Level.Trial)
76      public void setup() {
77          writer = new DefaultHttp2FrameWriter();
78          oldWriter = new OldDefaultHttp2FrameWriter();
79          payload = pooled ? PooledByteBufAllocator.DEFAULT.buffer(payloadSize) : Unpooled.buffer(payloadSize);
80          payload.writeZero(payloadSize);
81          ctx = new EmbeddedChannelWriteReleaseHandlerContext(
82                  pooled ? PooledByteBufAllocator.DEFAULT : UnpooledByteBufAllocator.DEFAULT,
83                  new ChannelInboundHandlerAdapter()) {
84              @Override
85              protected void handleException(Throwable t) {
86                  handleUnexpectedException(t);
87              }
88          };
89      }
90  
91      @TearDown(Level.Trial)
92      public void teardown() throws Exception {
93          if (payload != null) {
94              payload.release();
95          }
96          if (ctx != null) {
97              ctx.close();
98          }
99      }
100 
101     @Benchmark
102     @BenchmarkMode(Mode.AverageTime)
103     public void newWriter() {
104         writer.writeData(ctx, 3, payload.retain(), padding, true, ctx.voidPromise());
105         ctx.flush();
106     }
107 
108     @Benchmark
109     @BenchmarkMode(Mode.AverageTime)
110     public void oldWriter() {
111         oldWriter.writeData(ctx, 3, payload.retain(), padding, true, ctx.voidPromise());
112         ctx.flush();
113     }
114 
115     private static final class OldDefaultHttp2FrameWriter implements Http2DataWriter {
116         private static final ByteBuf ZERO_BUFFER =
117                 unreleasableBuffer(directBuffer(MAX_UNSIGNED_BYTE).writeZero(MAX_UNSIGNED_BYTE)).asReadOnly();
118         private final int maxFrameSize = DEFAULT_MAX_FRAME_SIZE;
119         @Override
120         public ChannelFuture writeData(ChannelHandlerContext ctx, int streamId, ByteBuf data,
121                                        int padding, boolean endStream, ChannelPromise promise) {
122             final Http2CodecUtil.SimpleChannelPromiseAggregator promiseAggregator =
123                     new Http2CodecUtil.SimpleChannelPromiseAggregator(promise, ctx.channel(), ctx.executor());
124             final DataFrameHeader header = new DataFrameHeader(ctx, streamId);
125             boolean needToReleaseHeaders = true;
126             boolean needToReleaseData = true;
127             try {
128                 checkPositive(streamId, "streamId");
129                 verifyPadding(padding);
130 
131                 boolean lastFrame;
132                 int remainingData = data.readableBytes();
133                 do {
134                     // Determine how much data and padding to write in this frame. Put all padding at the end.
135                     int frameDataBytes = min(remainingData, maxFrameSize);
136                     int framePaddingBytes = min(padding, max(0, (maxFrameSize - 1) - frameDataBytes));
137 
138                     // Decrement the remaining counters.
139                     padding -= framePaddingBytes;
140                     remainingData -= frameDataBytes;
141 
142                     // Determine whether or not this is the last frame to be sent.
143                     lastFrame = remainingData == 0 && padding == 0;
144 
145                     // Only the last frame is not retained. Until then, the outer finally must release.
146                     ByteBuf frameHeader = header.slice(frameDataBytes, framePaddingBytes, lastFrame && endStream);
147                     needToReleaseHeaders = !lastFrame;
148                     ctx.write(lastFrame ? frameHeader : frameHeader.retain(), promiseAggregator.newPromise());
149 
150                     // Write the frame data.
151                     ByteBuf frameData = data.readSlice(frameDataBytes);
152                     // Only the last frame is not retained. Until then, the outer finally must release.
153                     needToReleaseData = !lastFrame;
154                     ctx.write(lastFrame ? frameData : frameData.retain(), promiseAggregator.newPromise());
155 
156                     // Write the frame padding.
157                     if (paddingBytes(framePaddingBytes) > 0) {
158                         ctx.write(ZERO_BUFFER.slice(0, paddingBytes(framePaddingBytes)),
159                                 promiseAggregator.newPromise());
160                     }
161                 } while (!lastFrame);
162             } catch (Throwable t) {
163                 try {
164                     if (needToReleaseHeaders) {
165                         header.release();
166                     }
167                     if (needToReleaseData) {
168                         data.release();
169                     }
170                 } finally {
171                     promiseAggregator.setFailure(t);
172                     promiseAggregator.doneAllocatingPromises();
173                 }
174                 return promiseAggregator;
175             }
176             return promiseAggregator.doneAllocatingPromises();
177         }
178 
179         private static int paddingBytes(int padding) {
180             // The padding parameter contains the 1 byte pad length field as well as the trailing padding bytes.
181             // Subtract 1, so to only get the number of padding bytes that need to be appended to the end of a frame.
182             return padding - 1;
183         }
184 
185         private static void writePaddingLength(ByteBuf buf, int padding) {
186             if (padding > 0) {
187                 // It is assumed that the padding length has been bounds checked before this
188                 // Minus 1, as the pad length field is included in the padding parameter and is 1 byte wide.
189                 buf.writeByte(padding - 1);
190             }
191         }
192 
193         /**
194          * Utility class that manages the creation of frame header buffers for {@code DATA} frames. Attempts
195          * to reuse the same buffer repeatedly when splitting data into multiple frames.
196          */
197         private static final class DataFrameHeader {
198             private final int streamId;
199             private final ByteBuf buffer;
200             private final Http2Flags flags = new Http2Flags();
201             private int prevData;
202             private int prevPadding;
203             private ByteBuf frameHeader;
204 
205             DataFrameHeader(ChannelHandlerContext ctx, int streamId) {
206                 // All padding will be put at the end, so in the worst case we need 3 headers:
207                 // a repeated no-padding frame of maxFrameSize, a frame that has part data and part
208                 // padding, and a frame that has the remainder of the padding.
209                 buffer = ctx.alloc().buffer(3 * DATA_FRAME_HEADER_LENGTH);
210                 this.streamId = streamId;
211             }
212 
213             /**
214              * Gets the frame header buffer configured for the current frame.
215              */
216             ByteBuf slice(int data, int padding, boolean endOfStream) {
217                 // Since we're reusing the current frame header whenever possible, check if anything changed
218                 // that requires a new header.
219                 if (data != prevData || padding != prevPadding
220                         || endOfStream != flags.endOfStream() || frameHeader == null) {
221                     // Update the header state.
222                     prevData = data;
223                     prevPadding = padding;
224                     flags.paddingPresent(padding > 0);
225                     flags.endOfStream(endOfStream);
226                     frameHeader = buffer.slice(buffer.readerIndex(), DATA_FRAME_HEADER_LENGTH).writerIndex(0);
227                     buffer.setIndex(buffer.readerIndex() + DATA_FRAME_HEADER_LENGTH,
228                             buffer.writerIndex() + DATA_FRAME_HEADER_LENGTH);
229 
230                     int payloadLength = data + padding;
231                     writeFrameHeaderInternal(frameHeader, payloadLength, DATA, flags, streamId);
232                     writePaddingLength(frameHeader, padding);
233                 }
234                 return frameHeader.slice();
235             }
236 
237             void release() {
238                 buffer.release();
239             }
240         }
241     }
242 }