1 /*
2 * Copyright 2015 The Netty Project
3 *
4 * The Netty Project licenses this file to you under the Apache License, version 2.0 (the
5 * "License"); you may not use this file except in compliance with the License. You may obtain a
6 * copy of the License at:
7 *
8 * https://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software distributed under the License
11 * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
12 * or implied. See the License for the specific language governing permissions and limitations under
13 * the License.
14 */
15 package io.netty.handler.codec.http2;
16
17 import io.netty.util.internal.UnstableApi;
18
19 import java.util.ArrayDeque;
20 import java.util.Deque;
21
22 import static io.netty.handler.codec.http2.Http2CodecUtil.DEFAULT_MIN_ALLOCATION_CHUNK;
23 import static io.netty.handler.codec.http2.Http2CodecUtil.streamableBytes;
24 import static io.netty.handler.codec.http2.Http2Error.INTERNAL_ERROR;
25 import static io.netty.handler.codec.http2.Http2Exception.connectionError;
26 import static io.netty.util.internal.ObjectUtil.checkNotNull;
27 import static io.netty.util.internal.ObjectUtil.checkPositive;
28 import static java.lang.Math.max;
29 import static java.lang.Math.min;
30
31 /**
32 * A {@link StreamByteDistributor} that ignores stream priority and uniformly allocates bytes to all
33 * streams. This class uses a minimum chunk size that will be allocated to each stream. While
34 * fewer streams may be written to in each call to {@link #distribute(int, Writer)}, doing this
35 * should improve the goodput on each written stream.
36 */
37 @UnstableApi
38 public final class UniformStreamByteDistributor implements StreamByteDistributor {
39 private final Http2Connection.PropertyKey stateKey;
40 private final Deque<State> queue = new ArrayDeque<State>(4);
41
42 /**
43 * The minimum number of bytes that we will attempt to allocate to a stream. This is to
44 * help improve goodput on a per-stream basis.
45 */
46 private int minAllocationChunk = DEFAULT_MIN_ALLOCATION_CHUNK;
47 private long totalStreamableBytes;
48
49 public UniformStreamByteDistributor(Http2Connection connection) {
50 // Add a state for the connection.
51 stateKey = connection.newKey();
52 Http2Stream connectionStream = connection.connectionStream();
53 connectionStream.setProperty(stateKey, new State(connectionStream));
54
55 // Register for notification of new streams.
56 connection.addListener(new Http2ConnectionAdapter() {
57 @Override
58 public void onStreamAdded(Http2Stream stream) {
59 stream.setProperty(stateKey, new State(stream));
60 }
61
62 @Override
63 public void onStreamClosed(Http2Stream stream) {
64 state(stream).close();
65 }
66 });
67 }
68
69 /**
70 * Sets the minimum allocation chunk that will be allocated to each stream. Defaults to 1KiB.
71 *
72 * @param minAllocationChunk the minimum number of bytes that will be allocated to each stream.
73 * Must be > 0.
74 */
75 public void minAllocationChunk(int minAllocationChunk) {
76 checkPositive(minAllocationChunk, "minAllocationChunk");
77 this.minAllocationChunk = minAllocationChunk;
78 }
79
80 @Override
81 public void updateStreamableBytes(StreamState streamState) {
82 state(streamState.stream()).updateStreamableBytes(streamableBytes(streamState),
83 streamState.hasFrame(),
84 streamState.windowSize());
85 }
86
87 @Override
88 public void updateDependencyTree(int childStreamId, int parentStreamId, short weight, boolean exclusive) {
89 // This class ignores priority and dependency!
90 }
91
92 @Override
93 public boolean distribute(int maxBytes, Writer writer) throws Http2Exception {
94 final int size = queue.size();
95 if (size == 0) {
96 return totalStreamableBytes > 0;
97 }
98
99 final int chunkSize = max(minAllocationChunk, maxBytes / size);
100
101 State state = queue.pollFirst();
102 do {
103 state.enqueued = false;
104 if (state.windowNegative) {
105 continue;
106 }
107 if (maxBytes == 0 && state.streamableBytes > 0) {
108 // Stop at the first state that can't send. Add this state back to the head of the queue. Note
109 // that empty frames at the head of the queue will always be written, assuming the stream window
110 // is not negative.
111 queue.addFirst(state);
112 state.enqueued = true;
113 break;
114 }
115
116 // Allocate as much data as we can for this stream.
117 int chunk = min(chunkSize, min(maxBytes, state.streamableBytes));
118 maxBytes -= chunk;
119
120 // Write the allocated bytes and enqueue as necessary.
121 state.write(chunk, writer);
122 } while ((state = queue.pollFirst()) != null);
123
124 return totalStreamableBytes > 0;
125 }
126
127 private State state(Http2Stream stream) {
128 return checkNotNull(stream, "stream").getProperty(stateKey);
129 }
130
131 /**
132 * The remote flow control state for a single stream.
133 */
134 private final class State {
135 final Http2Stream stream;
136 int streamableBytes;
137 boolean windowNegative;
138 boolean enqueued;
139 boolean writing;
140
141 State(Http2Stream stream) {
142 this.stream = stream;
143 }
144
145 void updateStreamableBytes(int newStreamableBytes, boolean hasFrame, int windowSize) {
146 assert hasFrame || newStreamableBytes == 0 :
147 "hasFrame: " + hasFrame + " newStreamableBytes: " + newStreamableBytes;
148
149 int delta = newStreamableBytes - streamableBytes;
150 if (delta != 0) {
151 streamableBytes = newStreamableBytes;
152 totalStreamableBytes += delta;
153 }
154 // In addition to only enqueuing state when they have frames we enforce the following restrictions:
155 // 1. If the window has gone negative. We never want to queue a state. However we also don't want to
156 // Immediately remove the item if it is already queued because removal from deque is O(n). So
157 // we allow it to stay queued and rely on the distribution loop to remove this state.
158 // 2. If the window is zero we only want to queue if we are not writing. If we are writing that means
159 // we gave the state a chance to write zero length frames. We wait until updateStreamableBytes is
160 // called again before this state is allowed to write.
161 windowNegative = windowSize < 0;
162 if (hasFrame && (windowSize > 0 || windowSize == 0 && !writing)) {
163 addToQueue();
164 }
165 }
166
167 /**
168 * Write any allocated bytes for the given stream and updates the streamable bytes,
169 * assuming all of the bytes will be written.
170 */
171 void write(int numBytes, Writer writer) throws Http2Exception {
172 writing = true;
173 try {
174 // Write the allocated bytes.
175 writer.write(stream, numBytes);
176 } catch (Throwable t) {
177 throw connectionError(INTERNAL_ERROR, t, "byte distribution write error");
178 } finally {
179 writing = false;
180 }
181 }
182
183 void addToQueue() {
184 if (!enqueued) {
185 enqueued = true;
186 queue.addLast(this);
187 }
188 }
189
190 void removeFromQueue() {
191 if (enqueued) {
192 enqueued = false;
193 queue.remove(this);
194 }
195 }
196
197 void close() {
198 // Remove this state from the queue.
199 removeFromQueue();
200
201 // Clear the streamable bytes.
202 updateStreamableBytes(0, false, 0);
203 }
204 }
205 }