查看本类的 API文档回源码主页即时通讯网 - 即时通讯开发者社区!
1   /*
2    * Copyright 2013 The Netty Project
3    *
4    * The Netty Project licenses this file to you under the Apache License,
5    * version 2.0 (the "License"); you may not use this file except in compliance
6    * with the License. You may obtain a copy of the License at:
7    *
8    *   https://www.apache.org/licenses/LICENSE-2.0
9    *
10   * Unless required by applicable law or agreed to in writing, software
11   * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12   * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13   * License for the specific language governing permissions and limitations
14   * under the License.
15   */
16  package io.netty.handler.codec.spdy;
17  
18  import io.netty.channel.ChannelDuplexHandler;
19  import io.netty.channel.ChannelFuture;
20  import io.netty.channel.ChannelFutureListener;
21  import io.netty.channel.ChannelHandlerContext;
22  import io.netty.channel.ChannelPromise;
23  import io.netty.util.internal.ObjectUtil;
24  
25  import java.util.concurrent.atomic.AtomicInteger;
26  
27  import static io.netty.handler.codec.spdy.SpdyCodecUtil.SPDY_SESSION_STREAM_ID;
28  import static io.netty.handler.codec.spdy.SpdyCodecUtil.isServerId;
29  import static io.netty.util.internal.ObjectUtil.checkPositiveOrZero;
30  
31  /**
32   * Manages streams within a SPDY session.
33   */
34  public class SpdySessionHandler extends ChannelDuplexHandler {
35  
36      private static final SpdyProtocolException PROTOCOL_EXCEPTION =
37              SpdyProtocolException.newStatic(null, SpdySessionHandler.class, "handleOutboundMessage(...)");
38      private static final SpdyProtocolException STREAM_CLOSED =
39              SpdyProtocolException.newStatic("Stream closed", SpdySessionHandler.class, "removeStream(...)");
40  
41      private static final int DEFAULT_WINDOW_SIZE = 64 * 1024; // 64 KB default initial window size
42      private int initialSendWindowSize    = DEFAULT_WINDOW_SIZE;
43      private int initialReceiveWindowSize = DEFAULT_WINDOW_SIZE;
44      private volatile int initialSessionReceiveWindowSize = DEFAULT_WINDOW_SIZE;
45  
46      private final SpdySession spdySession = new SpdySession(initialSendWindowSize, initialReceiveWindowSize);
47      private int lastGoodStreamId;
48  
49      private static final int DEFAULT_MAX_CONCURRENT_STREAMS = Integer.MAX_VALUE;
50      private int remoteConcurrentStreams = DEFAULT_MAX_CONCURRENT_STREAMS;
51      private int localConcurrentStreams  = DEFAULT_MAX_CONCURRENT_STREAMS;
52  
53      private final AtomicInteger pings = new AtomicInteger();
54  
55      private boolean sentGoAwayFrame;
56      private boolean receivedGoAwayFrame;
57  
58      private ChannelFutureListener closeSessionFutureListener;
59  
60      private final boolean server;
61      private final int minorVersion;
62  
63      /**
64       * Creates a new session handler.
65       *
66       * @param version the protocol version
67       * @param server  {@code true} if and only if this session handler should
68       *                handle the server endpoint of the connection.
69       *                {@code false} if and only if this session handler should
70       *                handle the client endpoint of the connection.
71       */
72      public SpdySessionHandler(SpdyVersion version, boolean server) {
73          this.minorVersion = ObjectUtil.checkNotNull(version, "version").getMinorVersion();
74          this.server = server;
75      }
76  
77      public void setSessionReceiveWindowSize(int sessionReceiveWindowSize) {
78          checkPositiveOrZero(sessionReceiveWindowSize, "sessionReceiveWindowSize");
79          // This will not send a window update frame immediately.
80          // If this value increases the allowed receive window size,
81          // a WINDOW_UPDATE frame will be sent when only half of the
82          // session window size remains during data frame processing.
83          // If this value decreases the allowed receive window size,
84          // the window will be reduced as data frames are processed.
85          initialSessionReceiveWindowSize = sessionReceiveWindowSize;
86      }
87  
88      @Override
89      public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
90          if (msg instanceof SpdyDataFrame) {
91  
92              /*
93               * SPDY Data frame processing requirements:
94               *
95               * If an endpoint receives a data frame for a Stream-ID which is not open
96               * and the endpoint has not sent a GOAWAY frame, it must issue a stream error
97               * with the error code INVALID_STREAM for the Stream-ID.
98               *
99               * If an endpoint which created the stream receives a data frame before receiving
100              * a SYN_REPLY on that stream, it is a protocol error, and the recipient must
101              * issue a stream error with the getStatus code PROTOCOL_ERROR for the Stream-ID.
102              *
103              * If an endpoint receives multiple data frames for invalid Stream-IDs,
104              * it may close the session.
105              *
106              * If an endpoint refuses a stream it must ignore any data frames for that stream.
107              *
108              * If an endpoint receives a data frame after the stream is half-closed from the
109              * sender, it must send a RST_STREAM frame with the getStatus STREAM_ALREADY_CLOSED.
110              *
111              * If an endpoint receives a data frame after the stream is closed, it must send
112              * a RST_STREAM frame with the getStatus PROTOCOL_ERROR.
113              */
114             SpdyDataFrame spdyDataFrame = (SpdyDataFrame) msg;
115             int streamId = spdyDataFrame.streamId();
116 
117             int deltaWindowSize = -1 * spdyDataFrame.content().readableBytes();
118             int newSessionWindowSize =
119                 spdySession.updateReceiveWindowSize(SPDY_SESSION_STREAM_ID, deltaWindowSize);
120 
121             // Check if session window size is reduced beyond allowable lower bound
122             if (newSessionWindowSize < 0) {
123                 issueSessionError(ctx, SpdySessionStatus.PROTOCOL_ERROR);
124                 return;
125             }
126 
127             // Send a WINDOW_UPDATE frame if less than half the session window size remains
128             if (newSessionWindowSize <= initialSessionReceiveWindowSize / 2) {
129                 int sessionDeltaWindowSize = initialSessionReceiveWindowSize - newSessionWindowSize;
130                 spdySession.updateReceiveWindowSize(SPDY_SESSION_STREAM_ID, sessionDeltaWindowSize);
131                 SpdyWindowUpdateFrame spdyWindowUpdateFrame =
132                     new DefaultSpdyWindowUpdateFrame(SPDY_SESSION_STREAM_ID, sessionDeltaWindowSize);
133                 ctx.writeAndFlush(spdyWindowUpdateFrame);
134             }
135 
136             // Check if we received a data frame for a Stream-ID which is not open
137 
138             if (!spdySession.isActiveStream(streamId)) {
139                 spdyDataFrame.release();
140                 if (streamId <= lastGoodStreamId) {
141                     issueStreamError(ctx, streamId, SpdyStreamStatus.PROTOCOL_ERROR);
142                 } else if (!sentGoAwayFrame) {
143                     issueStreamError(ctx, streamId, SpdyStreamStatus.INVALID_STREAM);
144                 }
145                 return;
146             }
147 
148             // Check if we received a data frame for a stream which is half-closed
149 
150             if (spdySession.isRemoteSideClosed(streamId)) {
151                 spdyDataFrame.release();
152                 issueStreamError(ctx, streamId, SpdyStreamStatus.STREAM_ALREADY_CLOSED);
153                 return;
154             }
155 
156             // Check if we received a data frame before receiving a SYN_REPLY
157             if (!isRemoteInitiatedId(streamId) && !spdySession.hasReceivedReply(streamId)) {
158                 spdyDataFrame.release();
159                 issueStreamError(ctx, streamId, SpdyStreamStatus.PROTOCOL_ERROR);
160                 return;
161             }
162 
163             /*
164              * SPDY Data frame flow control processing requirements:
165              *
166              * Recipient should not send a WINDOW_UPDATE frame as it consumes the last data frame.
167              */
168 
169             // Update receive window size
170             int newWindowSize = spdySession.updateReceiveWindowSize(streamId, deltaWindowSize);
171 
172             // Window size can become negative if we sent a SETTINGS frame that reduces the
173             // size of the transfer window after the peer has written data frames.
174             // The value is bounded by the length that SETTINGS frame decrease the window.
175             // This difference is stored for the session when writing the SETTINGS frame
176             // and is cleared once we send a WINDOW_UPDATE frame.
177             if (newWindowSize < spdySession.getReceiveWindowSizeLowerBound(streamId)) {
178                 spdyDataFrame.release();
179                 issueStreamError(ctx, streamId, SpdyStreamStatus.FLOW_CONTROL_ERROR);
180                 return;
181             }
182 
183             // Window size became negative due to sender writing frame before receiving SETTINGS
184             // Send data frames upstream in initialReceiveWindowSize chunks
185             if (newWindowSize < 0) {
186                 while (spdyDataFrame.content().readableBytes() > initialReceiveWindowSize) {
187                     SpdyDataFrame partialDataFrame = new DefaultSpdyDataFrame(
188                             streamId, spdyDataFrame.content().readRetainedSlice(initialReceiveWindowSize));
189                     ctx.writeAndFlush(partialDataFrame);
190                 }
191             }
192 
193             // Send a WINDOW_UPDATE frame if less than half the stream window size remains
194             if (newWindowSize <= initialReceiveWindowSize / 2 && !spdyDataFrame.isLast()) {
195                 int streamDeltaWindowSize = initialReceiveWindowSize - newWindowSize;
196                 spdySession.updateReceiveWindowSize(streamId, streamDeltaWindowSize);
197                 SpdyWindowUpdateFrame spdyWindowUpdateFrame =
198                         new DefaultSpdyWindowUpdateFrame(streamId, streamDeltaWindowSize);
199                 ctx.writeAndFlush(spdyWindowUpdateFrame);
200             }
201 
202             // Close the remote side of the stream if this is the last frame
203             if (spdyDataFrame.isLast()) {
204                 halfCloseStream(streamId, true, ctx.newSucceededFuture());
205             }
206 
207         } else if (msg instanceof SpdySynStreamFrame) {
208 
209             /*
210              * SPDY SYN_STREAM frame processing requirements:
211              *
212              * If an endpoint receives a SYN_STREAM with a Stream-ID that is less than
213              * any previously received SYN_STREAM, it must issue a session error with
214              * the getStatus PROTOCOL_ERROR.
215              *
216              * If an endpoint receives multiple SYN_STREAM frames with the same active
217              * Stream-ID, it must issue a stream error with the getStatus code PROTOCOL_ERROR.
218              *
219              * The recipient can reject a stream by sending a stream error with the
220              * getStatus code REFUSED_STREAM.
221              */
222 
223             SpdySynStreamFrame spdySynStreamFrame = (SpdySynStreamFrame) msg;
224             int streamId = spdySynStreamFrame.streamId();
225 
226             // Check if we received a valid SYN_STREAM frame
227             if (spdySynStreamFrame.isInvalid() ||
228                 !isRemoteInitiatedId(streamId) ||
229                 spdySession.isActiveStream(streamId)) {
230                 issueStreamError(ctx, streamId, SpdyStreamStatus.PROTOCOL_ERROR);
231                 return;
232             }
233 
234             // Stream-IDs must be monotonically increasing
235             if (streamId <= lastGoodStreamId) {
236                 issueSessionError(ctx, SpdySessionStatus.PROTOCOL_ERROR);
237                 return;
238             }
239 
240             // Try to accept the stream
241             byte priority = spdySynStreamFrame.priority();
242             boolean remoteSideClosed = spdySynStreamFrame.isLast();
243             boolean localSideClosed = spdySynStreamFrame.isUnidirectional();
244             if (!acceptStream(streamId, priority, remoteSideClosed, localSideClosed)) {
245                 issueStreamError(ctx, streamId, SpdyStreamStatus.REFUSED_STREAM);
246                 return;
247             }
248 
249         } else if (msg instanceof SpdySynReplyFrame) {
250 
251             /*
252              * SPDY SYN_REPLY frame processing requirements:
253              *
254              * If an endpoint receives multiple SYN_REPLY frames for the same active Stream-ID
255              * it must issue a stream error with the getStatus code STREAM_IN_USE.
256              */
257 
258             SpdySynReplyFrame spdySynReplyFrame = (SpdySynReplyFrame) msg;
259             int streamId = spdySynReplyFrame.streamId();
260 
261             // Check if we received a valid SYN_REPLY frame
262             if (spdySynReplyFrame.isInvalid() ||
263                 isRemoteInitiatedId(streamId) ||
264                 spdySession.isRemoteSideClosed(streamId)) {
265                 issueStreamError(ctx, streamId, SpdyStreamStatus.INVALID_STREAM);
266                 return;
267             }
268 
269             // Check if we have received multiple frames for the same Stream-ID
270             if (spdySession.hasReceivedReply(streamId)) {
271                 issueStreamError(ctx, streamId, SpdyStreamStatus.STREAM_IN_USE);
272                 return;
273             }
274 
275             spdySession.receivedReply(streamId);
276 
277             // Close the remote side of the stream if this is the last frame
278             if (spdySynReplyFrame.isLast()) {
279                 halfCloseStream(streamId, true, ctx.newSucceededFuture());
280             }
281 
282         } else if (msg instanceof SpdyRstStreamFrame) {
283 
284             /*
285              * SPDY RST_STREAM frame processing requirements:
286              *
287              * After receiving a RST_STREAM on a stream, the receiver must not send
288              * additional frames on that stream.
289              *
290              * An endpoint must not send a RST_STREAM in response to a RST_STREAM.
291              */
292 
293             SpdyRstStreamFrame spdyRstStreamFrame = (SpdyRstStreamFrame) msg;
294             removeStream(spdyRstStreamFrame.streamId(), ctx.newSucceededFuture());
295 
296         } else if (msg instanceof SpdySettingsFrame) {
297 
298             SpdySettingsFrame spdySettingsFrame = (SpdySettingsFrame) msg;
299 
300             int settingsMinorVersion = spdySettingsFrame.getValue(SpdySettingsFrame.SETTINGS_MINOR_VERSION);
301             if (settingsMinorVersion >= 0 && settingsMinorVersion != minorVersion) {
302                 // Settings frame had the wrong minor version
303                 issueSessionError(ctx, SpdySessionStatus.PROTOCOL_ERROR);
304                 return;
305             }
306 
307             int newConcurrentStreams =
308                 spdySettingsFrame.getValue(SpdySettingsFrame.SETTINGS_MAX_CONCURRENT_STREAMS);
309             if (newConcurrentStreams >= 0) {
310                 remoteConcurrentStreams = newConcurrentStreams;
311             }
312 
313             // Persistence flag are inconsistent with the use of SETTINGS to communicate
314             // the initial window size. Remove flags from the sender requesting that the
315             // value be persisted. Remove values that the sender indicates are persisted.
316             if (spdySettingsFrame.isPersisted(SpdySettingsFrame.SETTINGS_INITIAL_WINDOW_SIZE)) {
317                 spdySettingsFrame.removeValue(SpdySettingsFrame.SETTINGS_INITIAL_WINDOW_SIZE);
318             }
319             spdySettingsFrame.setPersistValue(SpdySettingsFrame.SETTINGS_INITIAL_WINDOW_SIZE, false);
320 
321             int newInitialWindowSize =
322                 spdySettingsFrame.getValue(SpdySettingsFrame.SETTINGS_INITIAL_WINDOW_SIZE);
323             if (newInitialWindowSize >= 0) {
324                 updateInitialSendWindowSize(newInitialWindowSize);
325             }
326 
327         } else if (msg instanceof SpdyPingFrame) {
328 
329             /*
330              * SPDY PING frame processing requirements:
331              *
332              * Receivers of a PING frame should send an identical frame to the sender
333              * as soon as possible.
334              *
335              * Receivers of a PING frame must ignore frames that it did not initiate
336              */
337 
338             SpdyPingFrame spdyPingFrame = (SpdyPingFrame) msg;
339 
340             if (isRemoteInitiatedId(spdyPingFrame.id())) {
341                 ctx.writeAndFlush(spdyPingFrame);
342                 return;
343             }
344 
345             // Note: only checks that there are outstanding pings since uniqueness is not enforced
346             if (pings.get() == 0) {
347                 return;
348             }
349             pings.getAndDecrement();
350 
351         } else if (msg instanceof SpdyGoAwayFrame) {
352 
353             receivedGoAwayFrame = true;
354 
355         } else if (msg instanceof SpdyHeadersFrame) {
356 
357             SpdyHeadersFrame spdyHeadersFrame = (SpdyHeadersFrame) msg;
358             int streamId = spdyHeadersFrame.streamId();
359 
360             // Check if we received a valid HEADERS frame
361             if (spdyHeadersFrame.isInvalid()) {
362                 issueStreamError(ctx, streamId, SpdyStreamStatus.PROTOCOL_ERROR);
363                 return;
364             }
365 
366             if (spdySession.isRemoteSideClosed(streamId)) {
367                 issueStreamError(ctx, streamId, SpdyStreamStatus.INVALID_STREAM);
368                 return;
369             }
370 
371             // Close the remote side of the stream if this is the last frame
372             if (spdyHeadersFrame.isLast()) {
373                 halfCloseStream(streamId, true, ctx.newSucceededFuture());
374             }
375 
376         } else if (msg instanceof SpdyWindowUpdateFrame) {
377 
378             /*
379              * SPDY WINDOW_UPDATE frame processing requirements:
380              *
381              * Receivers of a WINDOW_UPDATE that cause the window size to exceed 2^31
382              * must send a RST_STREAM with the getStatus code FLOW_CONTROL_ERROR.
383              *
384              * Sender should ignore all WINDOW_UPDATE frames associated with a stream
385              * after sending the last frame for the stream.
386              */
387 
388             SpdyWindowUpdateFrame spdyWindowUpdateFrame = (SpdyWindowUpdateFrame) msg;
389             int streamId = spdyWindowUpdateFrame.streamId();
390             int deltaWindowSize = spdyWindowUpdateFrame.deltaWindowSize();
391 
392             // Ignore frames for half-closed streams
393             if (streamId != SPDY_SESSION_STREAM_ID && spdySession.isLocalSideClosed(streamId)) {
394                 return;
395             }
396 
397             // Check for numerical overflow
398             if (spdySession.getSendWindowSize(streamId) > Integer.MAX_VALUE - deltaWindowSize) {
399                 if (streamId == SPDY_SESSION_STREAM_ID) {
400                     issueSessionError(ctx, SpdySessionStatus.PROTOCOL_ERROR);
401                 } else {
402                     issueStreamError(ctx, streamId, SpdyStreamStatus.FLOW_CONTROL_ERROR);
403                 }
404                 return;
405             }
406 
407             updateSendWindowSize(ctx, streamId, deltaWindowSize);
408         }
409 
410         ctx.fireChannelRead(msg);
411     }
412 
413     @Override
414     public void channelInactive(ChannelHandlerContext ctx) throws Exception {
415         for (Integer streamId: spdySession.activeStreams().keySet()) {
416             removeStream(streamId, ctx.newSucceededFuture());
417         }
418         ctx.fireChannelInactive();
419     }
420 
421     @Override
422     public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
423         if (cause instanceof SpdyProtocolException) {
424             issueSessionError(ctx, SpdySessionStatus.PROTOCOL_ERROR);
425         }
426 
427         ctx.fireExceptionCaught(cause);
428     }
429 
430     @Override
431     public void close(ChannelHandlerContext ctx, ChannelPromise promise) throws Exception {
432         sendGoAwayFrame(ctx, promise);
433     }
434 
435     @Override
436     public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
437         if (msg instanceof SpdyDataFrame ||
438             msg instanceof SpdySynStreamFrame ||
439             msg instanceof SpdySynReplyFrame ||
440             msg instanceof SpdyRstStreamFrame ||
441             msg instanceof SpdySettingsFrame ||
442             msg instanceof SpdyPingFrame ||
443             msg instanceof SpdyGoAwayFrame ||
444             msg instanceof SpdyHeadersFrame ||
445             msg instanceof SpdyWindowUpdateFrame) {
446 
447             handleOutboundMessage(ctx, msg, promise);
448         } else {
449             ctx.write(msg, promise);
450         }
451     }
452 
453     private void handleOutboundMessage(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
454         if (msg instanceof SpdyDataFrame) {
455 
456             SpdyDataFrame spdyDataFrame = (SpdyDataFrame) msg;
457             int streamId = spdyDataFrame.streamId();
458 
459             // Frames must not be sent on half-closed streams
460             if (spdySession.isLocalSideClosed(streamId)) {
461                 spdyDataFrame.release();
462                 promise.setFailure(PROTOCOL_EXCEPTION);
463                 return;
464             }
465 
466             /*
467              * SPDY Data frame flow control processing requirements:
468              *
469              * Sender must not send a data frame with data length greater
470              * than the transfer window size.
471              *
472              * After sending each data frame, the sender decrements its
473              * transfer window size by the amount of data transmitted.
474              *
475              * When the window size becomes less than or equal to 0, the
476              * sender must pause transmitting data frames.
477              */
478 
479             int dataLength = spdyDataFrame.content().readableBytes();
480             int sendWindowSize = spdySession.getSendWindowSize(streamId);
481             int sessionSendWindowSize = spdySession.getSendWindowSize(SPDY_SESSION_STREAM_ID);
482             sendWindowSize = Math.min(sendWindowSize, sessionSendWindowSize);
483 
484             if (sendWindowSize <= 0) {
485                 // Stream is stalled -- enqueue Data frame and return
486                 spdySession.putPendingWrite(streamId, new SpdySession.PendingWrite(spdyDataFrame, promise));
487                 return;
488             } else if (sendWindowSize < dataLength) {
489                 // Stream is not stalled but we cannot send the entire frame
490                 spdySession.updateSendWindowSize(streamId, -1 * sendWindowSize);
491                 spdySession.updateSendWindowSize(SPDY_SESSION_STREAM_ID, -1 * sendWindowSize);
492 
493                 // Create a partial data frame whose length is the current window size
494                 SpdyDataFrame partialDataFrame = new DefaultSpdyDataFrame(
495                         streamId, spdyDataFrame.content().readRetainedSlice(sendWindowSize));
496 
497                 // Enqueue the remaining data (will be the first frame queued)
498                 spdySession.putPendingWrite(streamId, new SpdySession.PendingWrite(spdyDataFrame, promise));
499 
500                 // The transfer window size is pre-decremented when sending a data frame downstream.
501                 // Close the session on write failures that leave the transfer window in a corrupt state.
502                 final ChannelHandlerContext context = ctx;
503                 ctx.write(partialDataFrame).addListener(new ChannelFutureListener() {
504                     @Override
505                     public void operationComplete(ChannelFuture future) throws Exception {
506                         if (!future.isSuccess()) {
507                             issueSessionError(context, SpdySessionStatus.INTERNAL_ERROR);
508                         }
509                     }
510                 });
511                 return;
512             } else {
513                 // Window size is large enough to send entire data frame
514                 spdySession.updateSendWindowSize(streamId, -1 * dataLength);
515                 spdySession.updateSendWindowSize(SPDY_SESSION_STREAM_ID, -1 * dataLength);
516 
517                 // The transfer window size is pre-decremented when sending a data frame downstream.
518                 // Close the session on write failures that leave the transfer window in a corrupt state.
519                 final ChannelHandlerContext context = ctx;
520                 promise.addListener(new ChannelFutureListener() {
521                     @Override
522                     public void operationComplete(ChannelFuture future) throws Exception {
523                         if (!future.isSuccess()) {
524                             issueSessionError(context, SpdySessionStatus.INTERNAL_ERROR);
525                         }
526                     }
527                 });
528             }
529 
530             // Close the local side of the stream if this is the last frame
531             if (spdyDataFrame.isLast()) {
532                 halfCloseStream(streamId, false, promise);
533             }
534 
535         } else if (msg instanceof SpdySynStreamFrame) {
536 
537             SpdySynStreamFrame spdySynStreamFrame = (SpdySynStreamFrame) msg;
538             int streamId = spdySynStreamFrame.streamId();
539 
540             if (isRemoteInitiatedId(streamId)) {
541                 promise.setFailure(PROTOCOL_EXCEPTION);
542                 return;
543             }
544 
545             byte priority = spdySynStreamFrame.priority();
546             boolean remoteSideClosed = spdySynStreamFrame.isUnidirectional();
547             boolean localSideClosed = spdySynStreamFrame.isLast();
548             if (!acceptStream(streamId, priority, remoteSideClosed, localSideClosed)) {
549                 promise.setFailure(PROTOCOL_EXCEPTION);
550                 return;
551             }
552 
553         } else if (msg instanceof SpdySynReplyFrame) {
554 
555             SpdySynReplyFrame spdySynReplyFrame = (SpdySynReplyFrame) msg;
556             int streamId = spdySynReplyFrame.streamId();
557 
558             // Frames must not be sent on half-closed streams
559             if (!isRemoteInitiatedId(streamId) || spdySession.isLocalSideClosed(streamId)) {
560                 promise.setFailure(PROTOCOL_EXCEPTION);
561                 return;
562             }
563 
564             // Close the local side of the stream if this is the last frame
565             if (spdySynReplyFrame.isLast()) {
566                 halfCloseStream(streamId, false, promise);
567             }
568 
569         } else if (msg instanceof SpdyRstStreamFrame) {
570 
571             SpdyRstStreamFrame spdyRstStreamFrame = (SpdyRstStreamFrame) msg;
572             removeStream(spdyRstStreamFrame.streamId(), promise);
573 
574         } else if (msg instanceof SpdySettingsFrame) {
575 
576             SpdySettingsFrame spdySettingsFrame = (SpdySettingsFrame) msg;
577 
578             int settingsMinorVersion = spdySettingsFrame.getValue(SpdySettingsFrame.SETTINGS_MINOR_VERSION);
579             if (settingsMinorVersion >= 0 && settingsMinorVersion != minorVersion) {
580                 // Settings frame had the wrong minor version
581                 promise.setFailure(PROTOCOL_EXCEPTION);
582                 return;
583             }
584 
585             int newConcurrentStreams =
586                     spdySettingsFrame.getValue(SpdySettingsFrame.SETTINGS_MAX_CONCURRENT_STREAMS);
587             if (newConcurrentStreams >= 0) {
588                 localConcurrentStreams = newConcurrentStreams;
589             }
590 
591             // Persistence flag are inconsistent with the use of SETTINGS to communicate
592             // the initial window size. Remove flags from the sender requesting that the
593             // value be persisted. Remove values that the sender indicates are persisted.
594             if (spdySettingsFrame.isPersisted(SpdySettingsFrame.SETTINGS_INITIAL_WINDOW_SIZE)) {
595                 spdySettingsFrame.removeValue(SpdySettingsFrame.SETTINGS_INITIAL_WINDOW_SIZE);
596             }
597             spdySettingsFrame.setPersistValue(SpdySettingsFrame.SETTINGS_INITIAL_WINDOW_SIZE, false);
598 
599             int newInitialWindowSize =
600                     spdySettingsFrame.getValue(SpdySettingsFrame.SETTINGS_INITIAL_WINDOW_SIZE);
601             if (newInitialWindowSize >= 0) {
602                 updateInitialReceiveWindowSize(newInitialWindowSize);
603             }
604 
605         } else if (msg instanceof SpdyPingFrame) {
606 
607             SpdyPingFrame spdyPingFrame = (SpdyPingFrame) msg;
608             if (isRemoteInitiatedId(spdyPingFrame.id())) {
609                 ctx.fireExceptionCaught(new IllegalArgumentException(
610                             "invalid PING ID: " + spdyPingFrame.id()));
611                 return;
612             }
613             pings.getAndIncrement();
614 
615         } else if (msg instanceof SpdyGoAwayFrame) {
616 
617             // Why is this being sent? Intercept it and fail the write.
618             // Should have sent a CLOSE ChannelStateEvent
619             promise.setFailure(PROTOCOL_EXCEPTION);
620             return;
621 
622         } else if (msg instanceof SpdyHeadersFrame) {
623 
624             SpdyHeadersFrame spdyHeadersFrame = (SpdyHeadersFrame) msg;
625             int streamId = spdyHeadersFrame.streamId();
626 
627             // Frames must not be sent on half-closed streams
628             if (spdySession.isLocalSideClosed(streamId)) {
629                 promise.setFailure(PROTOCOL_EXCEPTION);
630                 return;
631             }
632 
633             // Close the local side of the stream if this is the last frame
634             if (spdyHeadersFrame.isLast()) {
635                 halfCloseStream(streamId, false, promise);
636             }
637 
638         } else if (msg instanceof SpdyWindowUpdateFrame) {
639 
640             // Why is this being sent? Intercept it and fail the write.
641             promise.setFailure(PROTOCOL_EXCEPTION);
642             return;
643         }
644 
645         ctx.write(msg, promise);
646     }
647 
648     /*
649      * SPDY Session Error Handling:
650      *
651      * When a session error occurs, the endpoint encountering the error must first
652      * send a GOAWAY frame with the Stream-ID of the most recently received stream
653      * from the remote endpoint, and the error code for why the session is terminating.
654      *
655      * After sending the GOAWAY frame, the endpoint must close the TCP connection.
656      */
657     private void issueSessionError(
658             ChannelHandlerContext ctx, SpdySessionStatus status) {
659 
660         sendGoAwayFrame(ctx, status).addListener(new ClosingChannelFutureListener(ctx, ctx.newPromise()));
661     }
662 
663     /*
664      * SPDY Stream Error Handling:
665      *
666      * Upon a stream error, the endpoint must send a RST_STREAM frame which contains
667      * the Stream-ID for the stream where the error occurred and the error getStatus which
668      * caused the error.
669      *
670      * After sending the RST_STREAM, the stream is closed to the sending endpoint.
671      *
672      * Note: this is only called by the worker thread
673      */
674     private void issueStreamError(ChannelHandlerContext ctx, int streamId, SpdyStreamStatus status) {
675         boolean fireChannelRead = !spdySession.isRemoteSideClosed(streamId);
676         ChannelPromise promise = ctx.newPromise();
677         removeStream(streamId, promise);
678 
679         SpdyRstStreamFrame spdyRstStreamFrame = new DefaultSpdyRstStreamFrame(streamId, status);
680         ctx.writeAndFlush(spdyRstStreamFrame, promise);
681         if (fireChannelRead) {
682             ctx.fireChannelRead(spdyRstStreamFrame);
683         }
684     }
685 
686     /*
687      * Helper functions
688      */
689 
690     private boolean isRemoteInitiatedId(int id) {
691         boolean serverId = isServerId(id);
692         return server && !serverId || !server && serverId;
693     }
694 
695     // need to synchronize to prevent new streams from being created while updating active streams
696     private void updateInitialSendWindowSize(int newInitialWindowSize) {
697         int deltaWindowSize = newInitialWindowSize - initialSendWindowSize;
698         initialSendWindowSize = newInitialWindowSize;
699         spdySession.updateAllSendWindowSizes(deltaWindowSize);
700     }
701 
702     // need to synchronize to prevent new streams from being created while updating active streams
703     private void updateInitialReceiveWindowSize(int newInitialWindowSize) {
704         int deltaWindowSize = newInitialWindowSize - initialReceiveWindowSize;
705         initialReceiveWindowSize = newInitialWindowSize;
706         spdySession.updateAllReceiveWindowSizes(deltaWindowSize);
707     }
708 
709     // need to synchronize accesses to sentGoAwayFrame, lastGoodStreamId, and initial window sizes
710     private boolean acceptStream(
711             int streamId, byte priority, boolean remoteSideClosed, boolean localSideClosed) {
712         // Cannot initiate any new streams after receiving or sending GOAWAY
713         if (receivedGoAwayFrame || sentGoAwayFrame) {
714             return false;
715         }
716 
717         boolean remote = isRemoteInitiatedId(streamId);
718         int maxConcurrentStreams = remote ? localConcurrentStreams : remoteConcurrentStreams;
719         if (spdySession.numActiveStreams(remote) >= maxConcurrentStreams) {
720             return false;
721         }
722         spdySession.acceptStream(
723                 streamId, priority, remoteSideClosed, localSideClosed,
724                 initialSendWindowSize, initialReceiveWindowSize, remote);
725         if (remote) {
726             lastGoodStreamId = streamId;
727         }
728         return true;
729     }
730 
731     private void halfCloseStream(int streamId, boolean remote, ChannelFuture future) {
732         if (remote) {
733             spdySession.closeRemoteSide(streamId, isRemoteInitiatedId(streamId));
734         } else {
735             spdySession.closeLocalSide(streamId, isRemoteInitiatedId(streamId));
736         }
737         if (closeSessionFutureListener != null && spdySession.noActiveStreams()) {
738             future.addListener(closeSessionFutureListener);
739         }
740     }
741 
742     private void removeStream(int streamId, ChannelFuture future) {
743         spdySession.removeStream(streamId, STREAM_CLOSED, isRemoteInitiatedId(streamId));
744 
745         if (closeSessionFutureListener != null && spdySession.noActiveStreams()) {
746             future.addListener(closeSessionFutureListener);
747         }
748     }
749 
750     private void updateSendWindowSize(final ChannelHandlerContext ctx, int streamId, int deltaWindowSize) {
751         spdySession.updateSendWindowSize(streamId, deltaWindowSize);
752 
753         while (true) {
754             // Check if we have unblocked a stalled stream
755             SpdySession.PendingWrite pendingWrite = spdySession.getPendingWrite(streamId);
756             if (pendingWrite == null) {
757                 return;
758             }
759 
760             SpdyDataFrame spdyDataFrame = pendingWrite.spdyDataFrame;
761             int dataFrameSize = spdyDataFrame.content().readableBytes();
762             int writeStreamId = spdyDataFrame.streamId();
763             int sendWindowSize = spdySession.getSendWindowSize(writeStreamId);
764             int sessionSendWindowSize = spdySession.getSendWindowSize(SPDY_SESSION_STREAM_ID);
765             sendWindowSize = Math.min(sendWindowSize, sessionSendWindowSize);
766 
767             if (sendWindowSize <= 0) {
768                 return;
769             } else if (sendWindowSize < dataFrameSize) {
770                 // We can send a partial frame
771                 spdySession.updateSendWindowSize(writeStreamId, -1 * sendWindowSize);
772                 spdySession.updateSendWindowSize(SPDY_SESSION_STREAM_ID, -1 * sendWindowSize);
773 
774                 // Create a partial data frame whose length is the current window size
775                 SpdyDataFrame partialDataFrame = new DefaultSpdyDataFrame(
776                         writeStreamId, spdyDataFrame.content().readRetainedSlice(sendWindowSize));
777 
778                 // The transfer window size is pre-decremented when sending a data frame downstream.
779                 // Close the session on write failures that leave the transfer window in a corrupt state.
780                 ctx.writeAndFlush(partialDataFrame).addListener(new ChannelFutureListener() {
781                     @Override
782                     public void operationComplete(ChannelFuture future) throws Exception {
783                         if (!future.isSuccess()) {
784                             issueSessionError(ctx, SpdySessionStatus.INTERNAL_ERROR);
785                         }
786                     }
787                 });
788             } else {
789                 // Window size is large enough to send entire data frame
790                 spdySession.removePendingWrite(writeStreamId);
791                 spdySession.updateSendWindowSize(writeStreamId, -1 * dataFrameSize);
792                 spdySession.updateSendWindowSize(SPDY_SESSION_STREAM_ID, -1 * dataFrameSize);
793 
794                 // Close the local side of the stream if this is the last frame
795                 if (spdyDataFrame.isLast()) {
796                     halfCloseStream(writeStreamId, false, pendingWrite.promise);
797                 }
798 
799                 // The transfer window size is pre-decremented when sending a data frame downstream.
800                 // Close the session on write failures that leave the transfer window in a corrupt state.
801                 ctx.writeAndFlush(spdyDataFrame, pendingWrite.promise).addListener(new ChannelFutureListener() {
802                     @Override
803                     public void operationComplete(ChannelFuture future) throws Exception {
804                         if (!future.isSuccess()) {
805                             issueSessionError(ctx, SpdySessionStatus.INTERNAL_ERROR);
806                         }
807                     }
808                 });
809             }
810         }
811     }
812 
813     private void sendGoAwayFrame(ChannelHandlerContext ctx, ChannelPromise future) {
814         // Avoid NotYetConnectedException
815         if (!ctx.channel().isActive()) {
816             ctx.close(future);
817             return;
818         }
819 
820         ChannelFuture f = sendGoAwayFrame(ctx, SpdySessionStatus.OK);
821         if (spdySession.noActiveStreams()) {
822             f.addListener(new ClosingChannelFutureListener(ctx, future));
823         } else {
824             closeSessionFutureListener = new ClosingChannelFutureListener(ctx, future);
825         }
826         // FIXME: Close the connection forcibly after timeout.
827     }
828 
829     private ChannelFuture sendGoAwayFrame(
830             ChannelHandlerContext ctx, SpdySessionStatus status) {
831         if (!sentGoAwayFrame) {
832             sentGoAwayFrame = true;
833             SpdyGoAwayFrame spdyGoAwayFrame = new DefaultSpdyGoAwayFrame(lastGoodStreamId, status);
834             return ctx.writeAndFlush(spdyGoAwayFrame);
835         } else {
836             return ctx.newSucceededFuture();
837         }
838     }
839 
840     private static final class ClosingChannelFutureListener implements ChannelFutureListener {
841         private final ChannelHandlerContext ctx;
842         private final ChannelPromise promise;
843 
844         ClosingChannelFutureListener(ChannelHandlerContext ctx, ChannelPromise promise) {
845             this.ctx = ctx;
846             this.promise = promise;
847         }
848 
849         @Override
850         public void operationComplete(ChannelFuture sentGoAwayFuture) throws Exception {
851             ctx.close(promise);
852         }
853     }
854 }