1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16 package io.netty.handler.codec.spdy;
17
import io.netty.channel.ChannelPromise;
import io.netty.util.internal.PlatformDependent;

import java.util.Comparator;
import java.util.HashMap;
import java.util.Map;
import java.util.Queue;
import java.util.TreeMap;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.atomic.AtomicInteger;

import static io.netty.handler.codec.spdy.SpdyCodecUtil.*;
29
30 final class SpdySession {
31
32 private final AtomicInteger activeLocalStreams = new AtomicInteger();
33 private final AtomicInteger activeRemoteStreams = new AtomicInteger();
34 private final Map<Integer, StreamState> activeStreams = PlatformDependent.newConcurrentHashMap();
35 private final StreamComparator streamComparator = new StreamComparator();
36 private final AtomicInteger sendWindowSize;
37 private final AtomicInteger receiveWindowSize;
38
39 SpdySession(int sendWindowSize, int receiveWindowSize) {
40 this.sendWindowSize = new AtomicInteger(sendWindowSize);
41 this.receiveWindowSize = new AtomicInteger(receiveWindowSize);
42 }
43
44 int numActiveStreams(boolean remote) {
45 if (remote) {
46 return activeRemoteStreams.get();
47 } else {
48 return activeLocalStreams.get();
49 }
50 }
51
52 boolean noActiveStreams() {
53 return activeStreams.isEmpty();
54 }
55
56 boolean isActiveStream(int streamId) {
57 return activeStreams.containsKey(streamId);
58 }
59
60
61 Map<Integer, StreamState> activeStreams() {
62 Map<Integer, StreamState> streams = new TreeMap<Integer, StreamState>(streamComparator);
63 streams.putAll(activeStreams);
64 return streams;
65 }
66
67 void acceptStream(
68 int streamId, byte priority, boolean remoteSideClosed, boolean localSideClosed,
69 int sendWindowSize, int receiveWindowSize, boolean remote) {
70 if (!remoteSideClosed || !localSideClosed) {
71 StreamState state = activeStreams.put(streamId, new StreamState(
72 priority, remoteSideClosed, localSideClosed, sendWindowSize, receiveWindowSize));
73 if (state == null) {
74 if (remote) {
75 activeRemoteStreams.incrementAndGet();
76 } else {
77 activeLocalStreams.incrementAndGet();
78 }
79 }
80 }
81 }
82
83 private StreamState removeActiveStream(int streamId, boolean remote) {
84 StreamState state = activeStreams.remove(streamId);
85 if (state != null) {
86 if (remote) {
87 activeRemoteStreams.decrementAndGet();
88 } else {
89 activeLocalStreams.decrementAndGet();
90 }
91 }
92 return state;
93 }
94
95 void removeStream(int streamId, Throwable cause, boolean remote) {
96 StreamState state = removeActiveStream(streamId, remote);
97 if (state != null) {
98 state.clearPendingWrites(cause);
99 }
100 }
101
102 boolean isRemoteSideClosed(int streamId) {
103 StreamState state = activeStreams.get(streamId);
104 return state == null || state.isRemoteSideClosed();
105 }
106
107 void closeRemoteSide(int streamId, boolean remote) {
108 StreamState state = activeStreams.get(streamId);
109 if (state != null) {
110 state.closeRemoteSide();
111 if (state.isLocalSideClosed()) {
112 removeActiveStream(streamId, remote);
113 }
114 }
115 }
116
117 boolean isLocalSideClosed(int streamId) {
118 StreamState state = activeStreams.get(streamId);
119 return state == null || state.isLocalSideClosed();
120 }
121
122 void closeLocalSide(int streamId, boolean remote) {
123 StreamState state = activeStreams.get(streamId);
124 if (state != null) {
125 state.closeLocalSide();
126 if (state.isRemoteSideClosed()) {
127 removeActiveStream(streamId, remote);
128 }
129 }
130 }
131
132
133
134
135
136 boolean hasReceivedReply(int streamId) {
137 StreamState state = activeStreams.get(streamId);
138 return state != null && state.hasReceivedReply();
139 }
140
141 void receivedReply(int streamId) {
142 StreamState state = activeStreams.get(streamId);
143 if (state != null) {
144 state.receivedReply();
145 }
146 }
147
148 int getSendWindowSize(int streamId) {
149 if (streamId == SPDY_SESSION_STREAM_ID) {
150 return sendWindowSize.get();
151 }
152
153 StreamState state = activeStreams.get(streamId);
154 return state != null ? state.getSendWindowSize() : -1;
155 }
156
157 int updateSendWindowSize(int streamId, int deltaWindowSize) {
158 if (streamId == SPDY_SESSION_STREAM_ID) {
159 return sendWindowSize.addAndGet(deltaWindowSize);
160 }
161
162 StreamState state = activeStreams.get(streamId);
163 return state != null ? state.updateSendWindowSize(deltaWindowSize) : -1;
164 }
165
166 int updateReceiveWindowSize(int streamId, int deltaWindowSize) {
167 if (streamId == SPDY_SESSION_STREAM_ID) {
168 return receiveWindowSize.addAndGet(deltaWindowSize);
169 }
170
171 StreamState state = activeStreams.get(streamId);
172 if (state == null) {
173 return -1;
174 }
175 if (deltaWindowSize > 0) {
176 state.setReceiveWindowSizeLowerBound(0);
177 }
178 return state.updateReceiveWindowSize(deltaWindowSize);
179 }
180
181 int getReceiveWindowSizeLowerBound(int streamId) {
182 if (streamId == SPDY_SESSION_STREAM_ID) {
183 return 0;
184 }
185
186 StreamState state = activeStreams.get(streamId);
187 return state != null ? state.getReceiveWindowSizeLowerBound() : 0;
188 }
189
190 void updateAllSendWindowSizes(int deltaWindowSize) {
191 for (StreamState state: activeStreams.values()) {
192 state.updateSendWindowSize(deltaWindowSize);
193 }
194 }
195
196 void updateAllReceiveWindowSizes(int deltaWindowSize) {
197 for (StreamState state: activeStreams.values()) {
198 state.updateReceiveWindowSize(deltaWindowSize);
199 if (deltaWindowSize < 0) {
200 state.setReceiveWindowSizeLowerBound(deltaWindowSize);
201 }
202 }
203 }
204
205 boolean putPendingWrite(int streamId, PendingWrite pendingWrite) {
206 StreamState state = activeStreams.get(streamId);
207 return state != null && state.putPendingWrite(pendingWrite);
208 }
209
210 PendingWrite getPendingWrite(int streamId) {
211 if (streamId == SPDY_SESSION_STREAM_ID) {
212 for (Map.Entry<Integer, StreamState> e: activeStreams().entrySet()) {
213 StreamState state = e.getValue();
214 if (state.getSendWindowSize() > 0) {
215 PendingWrite pendingWrite = state.getPendingWrite();
216 if (pendingWrite != null) {
217 return pendingWrite;
218 }
219 }
220 }
221 return null;
222 }
223
224 StreamState state = activeStreams.get(streamId);
225 return state != null ? state.getPendingWrite() : null;
226 }
227
228 PendingWrite removePendingWrite(int streamId) {
229 StreamState state = activeStreams.get(streamId);
230 return state != null ? state.removePendingWrite() : null;
231 }
232
233 private static final class StreamState {
234
235 private final byte priority;
236 private boolean remoteSideClosed;
237 private boolean localSideClosed;
238 private boolean receivedReply;
239 private final AtomicInteger sendWindowSize;
240 private final AtomicInteger receiveWindowSize;
241 private int receiveWindowSizeLowerBound;
242 private final Queue<PendingWrite> pendingWriteQueue = new ConcurrentLinkedQueue<PendingWrite>();
243
244 StreamState(
245 byte priority, boolean remoteSideClosed, boolean localSideClosed,
246 int sendWindowSize, int receiveWindowSize) {
247 this.priority = priority;
248 this.remoteSideClosed = remoteSideClosed;
249 this.localSideClosed = localSideClosed;
250 this.sendWindowSize = new AtomicInteger(sendWindowSize);
251 this.receiveWindowSize = new AtomicInteger(receiveWindowSize);
252 }
253
254 byte getPriority() {
255 return priority;
256 }
257
258 boolean isRemoteSideClosed() {
259 return remoteSideClosed;
260 }
261
262 void closeRemoteSide() {
263 remoteSideClosed = true;
264 }
265
266 boolean isLocalSideClosed() {
267 return localSideClosed;
268 }
269
270 void closeLocalSide() {
271 localSideClosed = true;
272 }
273
274 boolean hasReceivedReply() {
275 return receivedReply;
276 }
277
278 void receivedReply() {
279 receivedReply = true;
280 }
281
282 int getSendWindowSize() {
283 return sendWindowSize.get();
284 }
285
286 int updateSendWindowSize(int deltaWindowSize) {
287 return sendWindowSize.addAndGet(deltaWindowSize);
288 }
289
290 int updateReceiveWindowSize(int deltaWindowSize) {
291 return receiveWindowSize.addAndGet(deltaWindowSize);
292 }
293
294 int getReceiveWindowSizeLowerBound() {
295 return receiveWindowSizeLowerBound;
296 }
297
298 void setReceiveWindowSizeLowerBound(int receiveWindowSizeLowerBound) {
299 this.receiveWindowSizeLowerBound = receiveWindowSizeLowerBound;
300 }
301
302 boolean putPendingWrite(PendingWrite msg) {
303 return pendingWriteQueue.offer(msg);
304 }
305
306 PendingWrite getPendingWrite() {
307 return pendingWriteQueue.peek();
308 }
309
310 PendingWrite removePendingWrite() {
311 return pendingWriteQueue.poll();
312 }
313
314 void clearPendingWrites(Throwable cause) {
315 for (;;) {
316 PendingWrite pendingWrite = pendingWriteQueue.poll();
317 if (pendingWrite == null) {
318 break;
319 }
320 pendingWrite.fail(cause);
321 }
322 }
323 }
324
325 private final class StreamComparator implements Comparator<Integer> {
326
327 StreamComparator() { }
328
329 @Override
330 public int compare(Integer id1, Integer id2) {
331 StreamState state1 = activeStreams.get(id1);
332 StreamState state2 = activeStreams.get(id2);
333
334 int result = state1.getPriority() - state2.getPriority();
335 if (result != 0) {
336 return result;
337 }
338
339 return id1 - id2;
340 }
341 }
342
343 public static final class PendingWrite {
344 final SpdyDataFrame spdyDataFrame;
345 final ChannelPromise promise;
346
347 PendingWrite(SpdyDataFrame spdyDataFrame, ChannelPromise promise) {
348 this.spdyDataFrame = spdyDataFrame;
349 this.promise = promise;
350 }
351
352 void fail(Throwable cause) {
353 spdyDataFrame.release();
354 promise.setFailure(cause);
355 }
356 }
357 }