查看本类的 API文档回源码主页即时通讯网 - 即时通讯开发者社区!
1   /*
2    * Copyright 2014 The Netty Project
3    *
4    * The Netty Project licenses this file to you under the Apache License, version 2.0 (the
5    * "License"); you may not use this file except in compliance with the License. You may obtain a
6    * copy of the License at:
7    *
8    * https://www.apache.org/licenses/LICENSE-2.0
9    *
10   * Unless required by applicable law or agreed to in writing, software distributed under the License
11   * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
12   * or implied. See the License for the specific language governing permissions and limitations under
13   * the License.
14   */
15  package io.netty.handler.codec.http2;
16  
17  import io.netty.buffer.ByteBuf;
18  import io.netty.channel.ChannelFuture;
19  import io.netty.channel.ChannelFutureListener;
20  import io.netty.channel.ChannelHandlerContext;
21  import io.netty.channel.ChannelPromise;
22  import io.netty.channel.CoalescingBufferQueue;
23  import io.netty.handler.codec.http.HttpStatusClass;
24  import io.netty.handler.codec.http2.Http2CodecUtil.SimpleChannelPromiseAggregator;
25  import io.netty.util.internal.UnstableApi;
26  
27  import java.util.ArrayDeque;
28  import java.util.Queue;
29  
30  import static io.netty.handler.codec.http.HttpStatusClass.INFORMATIONAL;
31  import static io.netty.handler.codec.http2.Http2Error.INTERNAL_ERROR;
32  import static io.netty.handler.codec.http2.Http2Error.PROTOCOL_ERROR;
33  import static io.netty.handler.codec.http2.Http2Exception.connectionError;
34  import static io.netty.util.internal.ObjectUtil.checkNotNull;
35  import static io.netty.util.internal.ObjectUtil.checkPositiveOrZero;
36  import static java.lang.Integer.MAX_VALUE;
37  import static java.lang.Math.min;
38  
39  /**
40   * Default implementation of {@link Http2ConnectionEncoder}.
41   */
42  @UnstableApi
43  public class DefaultHttp2ConnectionEncoder implements Http2ConnectionEncoder, Http2SettingsReceivedConsumer {
44      private final Http2FrameWriter frameWriter;
45      private final Http2Connection connection;
46      private Http2LifecycleManager lifecycleManager;
47      // We prefer ArrayDeque to LinkedList because later will produce more GC.
48      // This initial capacity is plenty for SETTINGS traffic.
49      private final Queue<Http2Settings> outstandingLocalSettingsQueue = new ArrayDeque<Http2Settings>(4);
50      private Queue<Http2Settings> outstandingRemoteSettingsQueue;
51  
52      public DefaultHttp2ConnectionEncoder(Http2Connection connection, Http2FrameWriter frameWriter) {
53          this.connection = checkNotNull(connection, "connection");
54          this.frameWriter = checkNotNull(frameWriter, "frameWriter");
55          if (connection.remote().flowController() == null) {
56              connection.remote().flowController(new DefaultHttp2RemoteFlowController(connection));
57          }
58      }
59  
60      @Override
61      public void lifecycleManager(Http2LifecycleManager lifecycleManager) {
62          this.lifecycleManager = checkNotNull(lifecycleManager, "lifecycleManager");
63      }
64  
65      @Override
66      public Http2FrameWriter frameWriter() {
67          return frameWriter;
68      }
69  
70      @Override
71      public Http2Connection connection() {
72          return connection;
73      }
74  
75      @Override
76      public final Http2RemoteFlowController flowController() {
77          return connection().remote().flowController();
78      }
79  
80      @Override
81      public void remoteSettings(Http2Settings settings) throws Http2Exception {
82          Boolean pushEnabled = settings.pushEnabled();
83          Http2FrameWriter.Configuration config = configuration();
84          Http2HeadersEncoder.Configuration outboundHeaderConfig = config.headersConfiguration();
85          Http2FrameSizePolicy outboundFrameSizePolicy = config.frameSizePolicy();
86          if (pushEnabled != null) {
87              if (!connection.isServer() && pushEnabled) {
88                  throw connectionError(PROTOCOL_ERROR,
89                      "Client received a value of ENABLE_PUSH specified to other than 0");
90              }
91              connection.remote().allowPushTo(pushEnabled);
92          }
93  
94          Long maxConcurrentStreams = settings.maxConcurrentStreams();
95          if (maxConcurrentStreams != null) {
96              connection.local().maxActiveStreams((int) min(maxConcurrentStreams, MAX_VALUE));
97          }
98  
99          Long headerTableSize = settings.headerTableSize();
100         if (headerTableSize != null) {
101             outboundHeaderConfig.maxHeaderTableSize(headerTableSize);
102         }
103 
104         Long maxHeaderListSize = settings.maxHeaderListSize();
105         if (maxHeaderListSize != null) {
106             outboundHeaderConfig.maxHeaderListSize(maxHeaderListSize);
107         }
108 
109         Integer maxFrameSize = settings.maxFrameSize();
110         if (maxFrameSize != null) {
111             outboundFrameSizePolicy.maxFrameSize(maxFrameSize);
112         }
113 
114         Integer initialWindowSize = settings.initialWindowSize();
115         if (initialWindowSize != null) {
116             flowController().initialWindowSize(initialWindowSize);
117         }
118     }
119 
120     @Override
121     public ChannelFuture writeData(final ChannelHandlerContext ctx, final int streamId, ByteBuf data, int padding,
122             final boolean endOfStream, ChannelPromise promise) {
123         promise = promise.unvoid();
124         final Http2Stream stream;
125         try {
126             stream = requireStream(streamId);
127 
128             // Verify that the stream is in the appropriate state for sending DATA frames.
129             switch (stream.state()) {
130                 case OPEN:
131                 case HALF_CLOSED_REMOTE:
132                     // Allowed sending DATA frames in these states.
133                     break;
134                 default:
135                     throw new IllegalStateException("Stream " + stream.id() + " in unexpected state " + stream.state());
136             }
137         } catch (Throwable e) {
138             data.release();
139             return promise.setFailure(e);
140         }
141 
142         // Hand control of the frame to the flow controller.
143         flowController().addFlowControlled(stream,
144                 new FlowControlledData(stream, data, padding, endOfStream, promise));
145         return promise;
146     }
147 
148     @Override
149     public ChannelFuture writeHeaders(ChannelHandlerContext ctx, int streamId, Http2Headers headers, int padding,
150             boolean endStream, ChannelPromise promise) {
151         return writeHeaders0(ctx, streamId, headers, false, 0, (short) 0, false, padding, endStream, promise);
152     }
153 
154     private static boolean validateHeadersSentState(Http2Stream stream, Http2Headers headers, boolean isServer,
155                                                     boolean endOfStream) {
156         boolean isInformational = isServer && HttpStatusClass.valueOf(headers.status()) == INFORMATIONAL;
157         if ((isInformational || !endOfStream) && stream.isHeadersSent() || stream.isTrailersSent()) {
158             throw new IllegalStateException("Stream " + stream.id() + " sent too many headers EOS: " + endOfStream);
159         }
160         return isInformational;
161     }
162 
163     @Override
164     public ChannelFuture writeHeaders(final ChannelHandlerContext ctx, final int streamId,
165             final Http2Headers headers, final int streamDependency, final short weight,
166             final boolean exclusive, final int padding, final boolean endOfStream, ChannelPromise promise) {
167         return writeHeaders0(ctx, streamId, headers, true, streamDependency,
168                 weight, exclusive, padding, endOfStream, promise);
169     }
170 
171     /**
172      * Write headers via {@link Http2FrameWriter}. If {@code hasPriority} is {@code false} it will ignore the
173      * {@code streamDependency}, {@code weight} and {@code exclusive} parameters.
174      */
175     private static ChannelFuture sendHeaders(Http2FrameWriter frameWriter, ChannelHandlerContext ctx, int streamId,
176                                        Http2Headers headers, final boolean hasPriority,
177                                        int streamDependency, final short weight,
178                                        boolean exclusive, final int padding,
179                                        boolean endOfStream, ChannelPromise promise) {
180         if (hasPriority) {
181             return frameWriter.writeHeaders(ctx, streamId, headers, streamDependency,
182                     weight, exclusive, padding, endOfStream, promise);
183         }
184         return frameWriter.writeHeaders(ctx, streamId, headers, padding, endOfStream, promise);
185     }
186 
187     private ChannelFuture writeHeaders0(final ChannelHandlerContext ctx, final int streamId,
188                                         final Http2Headers headers, final boolean hasPriority,
189                                         final int streamDependency, final short weight,
190                                         final boolean exclusive, final int padding,
191                                         final boolean endOfStream, ChannelPromise promise) {
192         try {
193             Http2Stream stream = connection.stream(streamId);
194             if (stream == null) {
195                 try {
196                     // We don't create the stream in a `halfClosed` state because if this is an initial
197                     // HEADERS frame we don't want the connection state to signify that the HEADERS have
198                     // been sent until after they have been encoded and placed in the outbound buffer.
199                     // Therefore, we let the `LifeCycleManager` will take care of transitioning the state
200                     // as appropriate.
201                     stream = connection.local().createStream(streamId, /*endOfStream*/ false);
202                 } catch (Http2Exception cause) {
203                     if (connection.remote().mayHaveCreatedStream(streamId)) {
204                         promise.tryFailure(new IllegalStateException("Stream no longer exists: " + streamId, cause));
205                         return promise;
206                     }
207                     throw cause;
208                 }
209             } else {
210                 switch (stream.state()) {
211                     case RESERVED_LOCAL:
212                         stream.open(endOfStream);
213                         break;
214                     case OPEN:
215                     case HALF_CLOSED_REMOTE:
216                         // Allowed sending headers in these states.
217                         break;
218                     default:
219                         throw new IllegalStateException("Stream " + stream.id() + " in unexpected state " +
220                                                         stream.state());
221                 }
222             }
223 
224             // Trailing headers must go through flow control if there are other frames queued in flow control
225             // for this stream.
226             Http2RemoteFlowController flowController = flowController();
227             if (!endOfStream || !flowController.hasFlowControlled(stream)) {
228                 // The behavior here should mirror that in FlowControlledHeaders
229 
230                 promise = promise.unvoid();
231                 boolean isInformational = validateHeadersSentState(stream, headers, connection.isServer(), endOfStream);
232 
233                 ChannelFuture future = sendHeaders(frameWriter, ctx, streamId, headers, hasPriority, streamDependency,
234                         weight, exclusive, padding, endOfStream, promise);
235 
236                 // Writing headers may fail during the encode state if they violate HPACK limits.
237                 Throwable failureCause = future.cause();
238                 if (failureCause == null) {
239                     // Synchronously set the headersSent flag to ensure that we do not subsequently write
240                     // other headers containing pseudo-header fields.
241                     //
242                     // This just sets internal stream state which is used elsewhere in the codec and doesn't
243                     // necessarily mean the write will complete successfully.
244                     stream.headersSent(isInformational);
245 
246                     if (!future.isSuccess()) {
247                         // Either the future is not done or failed in the meantime.
248                         notifyLifecycleManagerOnError(future, ctx);
249                     }
250                 } else {
251                     lifecycleManager.onError(ctx, true, failureCause);
252                 }
253 
254                 if (endOfStream) {
255                     // Must handle calling onError before calling closeStreamLocal, otherwise the error handler will
256                     // incorrectly think the stream no longer exists and so may not send RST_STREAM or perform similar
257                     // appropriate action.
258                     lifecycleManager.closeStreamLocal(stream, future);
259                 }
260 
261                 return future;
262             } else {
263                 // Pass headers to the flow-controller so it can maintain their sequence relative to DATA frames.
264                 flowController.addFlowControlled(stream,
265                         new FlowControlledHeaders(stream, headers, hasPriority, streamDependency,
266                                 weight, exclusive, padding, true, promise));
267                 return promise;
268             }
269         } catch (Throwable t) {
270             lifecycleManager.onError(ctx, true, t);
271             promise.tryFailure(t);
272             return promise;
273         }
274     }
275 
276     @Override
277     public ChannelFuture writePriority(ChannelHandlerContext ctx, int streamId, int streamDependency, short weight,
278             boolean exclusive, ChannelPromise promise) {
279         return frameWriter.writePriority(ctx, streamId, streamDependency, weight, exclusive, promise);
280     }
281 
282     @Override
283     public ChannelFuture writeRstStream(ChannelHandlerContext ctx, int streamId, long errorCode,
284             ChannelPromise promise) {
285         // Delegate to the lifecycle manager for proper updating of connection state.
286         return lifecycleManager.resetStream(ctx, streamId, errorCode, promise);
287     }
288 
289     @Override
290     public ChannelFuture writeSettings(ChannelHandlerContext ctx, Http2Settings settings,
291             ChannelPromise promise) {
292         outstandingLocalSettingsQueue.add(settings);
293         try {
294             Boolean pushEnabled = settings.pushEnabled();
295             if (pushEnabled != null && connection.isServer()) {
296                 throw connectionError(PROTOCOL_ERROR, "Server sending SETTINGS frame with ENABLE_PUSH specified");
297             }
298         } catch (Throwable e) {
299             return promise.setFailure(e);
300         }
301 
302         return frameWriter.writeSettings(ctx, settings, promise);
303     }
304 
305     @Override
306     public ChannelFuture writeSettingsAck(ChannelHandlerContext ctx, ChannelPromise promise) {
307         if (outstandingRemoteSettingsQueue == null) {
308             return frameWriter.writeSettingsAck(ctx, promise);
309         }
310         Http2Settings settings = outstandingRemoteSettingsQueue.poll();
311         if (settings == null) {
312             return promise.setFailure(new Http2Exception(INTERNAL_ERROR, "attempted to write a SETTINGS ACK with no " +
313                     " pending SETTINGS"));
314         }
315         SimpleChannelPromiseAggregator aggregator = new SimpleChannelPromiseAggregator(promise, ctx.channel(),
316                 ctx.executor());
317         // Acknowledge receipt of the settings. We should do this before we process the settings to ensure our
318         // remote peer applies these settings before any subsequent frames that we may send which depend upon
319         // these new settings. See https://github.com/netty/netty/issues/6520.
320         frameWriter.writeSettingsAck(ctx, aggregator.newPromise());
321 
322         // We create a "new promise" to make sure that status from both the write and the application are taken into
323         // account independently.
324         ChannelPromise applySettingsPromise = aggregator.newPromise();
325         try {
326             remoteSettings(settings);
327             applySettingsPromise.setSuccess();
328         } catch (Throwable e) {
329             applySettingsPromise.setFailure(e);
330             lifecycleManager.onError(ctx, true, e);
331         }
332         return aggregator.doneAllocatingPromises();
333     }
334 
335     @Override
336     public ChannelFuture writePing(ChannelHandlerContext ctx, boolean ack, long data, ChannelPromise promise) {
337         return frameWriter.writePing(ctx, ack, data, promise);
338     }
339 
340     @Override
341     public ChannelFuture writePushPromise(ChannelHandlerContext ctx, int streamId, int promisedStreamId,
342             Http2Headers headers, int padding, ChannelPromise promise) {
343         try {
344             if (connection.goAwayReceived()) {
345                 throw connectionError(PROTOCOL_ERROR, "Sending PUSH_PROMISE after GO_AWAY received.");
346             }
347 
348             Http2Stream stream = requireStream(streamId);
349             // Reserve the promised stream.
350             connection.local().reservePushStream(promisedStreamId, stream);
351 
352             promise = promise.unvoid();
353             ChannelFuture future = frameWriter.writePushPromise(ctx, streamId, promisedStreamId, headers, padding,
354                                                                 promise);
355             // Writing headers may fail during the encode state if they violate HPACK limits.
356             Throwable failureCause = future.cause();
357             if (failureCause == null) {
358                 // This just sets internal stream state which is used elsewhere in the codec and doesn't
359                 // necessarily mean the write will complete successfully.
360                 stream.pushPromiseSent();
361 
362                 if (!future.isSuccess()) {
363                     // Either the future is not done or failed in the meantime.
364                     notifyLifecycleManagerOnError(future, ctx);
365                 }
366             } else {
367                 lifecycleManager.onError(ctx, true, failureCause);
368             }
369             return future;
370         } catch (Throwable t) {
371             lifecycleManager.onError(ctx, true, t);
372             promise.tryFailure(t);
373             return promise;
374         }
375     }
376 
377     @Override
378     public ChannelFuture writeGoAway(ChannelHandlerContext ctx, int lastStreamId, long errorCode, ByteBuf debugData,
379             ChannelPromise promise) {
380         return lifecycleManager.goAway(ctx, lastStreamId, errorCode, debugData, promise);
381     }
382 
383     @Override
384     public ChannelFuture writeWindowUpdate(ChannelHandlerContext ctx, int streamId, int windowSizeIncrement,
385             ChannelPromise promise) {
386         return promise.setFailure(new UnsupportedOperationException("Use the Http2[Inbound|Outbound]FlowController" +
387                 " objects to control window sizes"));
388     }
389 
390     @Override
391     public ChannelFuture writeFrame(ChannelHandlerContext ctx, byte frameType, int streamId, Http2Flags flags,
392             ByteBuf payload, ChannelPromise promise) {
393         return frameWriter.writeFrame(ctx, frameType, streamId, flags, payload, promise);
394     }
395 
396     @Override
397     public void close() {
398         frameWriter.close();
399     }
400 
401     @Override
402     public Http2Settings pollSentSettings() {
403         return outstandingLocalSettingsQueue.poll();
404     }
405 
406     @Override
407     public Configuration configuration() {
408         return frameWriter.configuration();
409     }
410 
411     private Http2Stream requireStream(int streamId) {
412         Http2Stream stream = connection.stream(streamId);
413         if (stream == null) {
414             final String message;
415             if (connection.streamMayHaveExisted(streamId)) {
416                 message = "Stream no longer exists: " + streamId;
417             } else {
418                 message = "Stream does not exist: " + streamId;
419             }
420             throw new IllegalArgumentException(message);
421         }
422         return stream;
423     }
424 
425     @Override
426     public void consumeReceivedSettings(Http2Settings settings) {
427         if (outstandingRemoteSettingsQueue == null) {
428             outstandingRemoteSettingsQueue = new ArrayDeque<Http2Settings>(2);
429         }
430         outstandingRemoteSettingsQueue.add(settings);
431     }
432 
433     /**
434      * Wrap a DATA frame so it can be written subject to flow-control. Note that this implementation assumes it
435      * only writes padding once for the entire payload as opposed to writing it once per-frame. This makes the
436      * {@link #size} calculation deterministic thereby greatly simplifying the implementation.
437      * <p>
438      * If frame-splitting is required to fit within max-frame-size and flow-control constraints we ensure that
439      * the passed promise is not completed until last frame write.
440      * </p>
441      */
442     private final class FlowControlledData extends FlowControlledBase {
443         private final CoalescingBufferQueue queue;
444         private int dataSize;
445 
446         FlowControlledData(Http2Stream stream, ByteBuf buf, int padding, boolean endOfStream,
447                                    ChannelPromise promise) {
448             super(stream, padding, endOfStream, promise);
449             queue = new CoalescingBufferQueue(promise.channel());
450             queue.add(buf, promise);
451             dataSize = queue.readableBytes();
452         }
453 
454         @Override
455         public int size() {
456             return dataSize + padding;
457         }
458 
459         @Override
460         public void error(ChannelHandlerContext ctx, Throwable cause) {
461             queue.releaseAndFailAll(cause);
462             // Don't update dataSize because we need to ensure the size() method returns a consistent size even after
463             // error so we don't invalidate flow control when returning bytes to flow control.
464             //
465             // That said we will set dataSize and padding to 0 in the write(...) method if we cleared the queue
466             // because of an error.
467             lifecycleManager.onError(ctx, true, cause);
468         }
469 
470         @Override
471         public void write(ChannelHandlerContext ctx, int allowedBytes) {
472             int queuedData = queue.readableBytes();
473             if (!endOfStream) {
474                 if (queuedData == 0) {
475                     if (queue.isEmpty()) {
476                         // When the queue is empty it means we did clear it because of an error(...) call
477                         // (as otherwise we will have at least 1 entry in there), which will happen either when called
478                         // explicit or when the write itself fails. In this case just set dataSize and padding to 0
479                         // which will signal back that the whole frame was consumed.
480                         //
481                         // See https://github.com/netty/netty/issues/8707.
482                         padding = dataSize = 0;
483                     } else {
484                         // There's no need to write any data frames because there are only empty data frames in the
485                         // queue and it is not end of stream yet. Just complete their promises by getting the buffer
486                         // corresponding to 0 bytes and writing it to the channel (to preserve notification order).
487                         ChannelPromise writePromise = ctx.newPromise().addListener(this);
488                         ctx.write(queue.remove(0, writePromise), writePromise);
489                     }
490                     return;
491                 }
492 
493                 if (allowedBytes == 0) {
494                     return;
495                 }
496             }
497 
498             // Determine how much data to write.
499             int writableData = min(queuedData, allowedBytes);
500             ChannelPromise writePromise = ctx.newPromise().addListener(this);
501             ByteBuf toWrite = queue.remove(writableData, writePromise);
502             dataSize = queue.readableBytes();
503 
504             // Determine how much padding to write.
505             int writablePadding = min(allowedBytes - writableData, padding);
506             padding -= writablePadding;
507 
508             // Write the frame(s).
509             frameWriter().writeData(ctx, stream.id(), toWrite, writablePadding,
510                     endOfStream && size() == 0, writePromise);
511         }
512 
513         @Override
514         public boolean merge(ChannelHandlerContext ctx, Http2RemoteFlowController.FlowControlled next) {
515             FlowControlledData nextData;
516             if (FlowControlledData.class != next.getClass() ||
517                 MAX_VALUE - (nextData = (FlowControlledData) next).size() < size()) {
518                 return false;
519             }
520             nextData.queue.copyTo(queue);
521             dataSize = queue.readableBytes();
522             // Given that we're merging data into a frame it doesn't really make sense to accumulate padding.
523             padding = Math.max(padding, nextData.padding);
524             endOfStream = nextData.endOfStream;
525             return true;
526         }
527     }
528 
529     private void notifyLifecycleManagerOnError(ChannelFuture future, final ChannelHandlerContext ctx) {
530         future.addListener(new ChannelFutureListener() {
531             @Override
532             public void operationComplete(ChannelFuture future) throws Exception {
533                 Throwable cause = future.cause();
534                 if (cause != null) {
535                     lifecycleManager.onError(ctx, true, cause);
536                 }
537             }
538         });
539     }
540 
541     /**
542      * Wrap headers so they can be written subject to flow-control. While headers do not have cost against the
543      * flow-control window their order with respect to other frames must be maintained, hence if a DATA frame is
544      * blocked on flow-control a HEADER frame must wait until this frame has been written.
545      */
546     private final class FlowControlledHeaders extends FlowControlledBase {
547         private final Http2Headers headers;
548         private final boolean hasPriority;
549         private final int streamDependency;
550         private final short weight;
551         private final boolean exclusive;
552 
553         FlowControlledHeaders(Http2Stream stream, Http2Headers headers, boolean hasPriority,
554                               int streamDependency, short weight, boolean exclusive,
555                               int padding, boolean endOfStream, ChannelPromise promise) {
556             super(stream, padding, endOfStream, promise.unvoid());
557             this.headers = headers;
558             this.hasPriority = hasPriority;
559             this.streamDependency = streamDependency;
560             this.weight = weight;
561             this.exclusive = exclusive;
562         }
563 
564         @Override
565         public int size() {
566             return 0;
567         }
568 
569         @Override
570         public void error(ChannelHandlerContext ctx, Throwable cause) {
571             if (ctx != null) {
572                 lifecycleManager.onError(ctx, true, cause);
573             }
574             promise.tryFailure(cause);
575         }
576 
577         @Override
578         public void write(ChannelHandlerContext ctx, int allowedBytes) {
579             boolean isInformational = validateHeadersSentState(stream, headers, connection.isServer(), endOfStream);
580             // The code is currently requiring adding this listener before writing, in order to call onError() before
581             // closeStreamLocal().
582             promise.addListener(this);
583 
584             ChannelFuture f = sendHeaders(frameWriter, ctx, stream.id(), headers, hasPriority, streamDependency,
585                     weight, exclusive, padding, endOfStream, promise);
586             // Writing headers may fail during the encode state if they violate HPACK limits.
587             Throwable failureCause = f.cause();
588             if (failureCause == null) {
589                 // This just sets internal stream state which is used elsewhere in the codec and doesn't
590                 // necessarily mean the write will complete successfully.
591                 stream.headersSent(isInformational);
592             }
593         }
594 
595         @Override
596         public boolean merge(ChannelHandlerContext ctx, Http2RemoteFlowController.FlowControlled next) {
597             return false;
598         }
599     }
600 
601     /**
602      * Common base type for payloads to deliver via flow-control.
603      */
604     public abstract class FlowControlledBase implements Http2RemoteFlowController.FlowControlled,
605             ChannelFutureListener {
606         protected final Http2Stream stream;
607         protected ChannelPromise promise;
608         protected boolean endOfStream;
609         protected int padding;
610 
611         FlowControlledBase(final Http2Stream stream, int padding, boolean endOfStream,
612                 final ChannelPromise promise) {
613             checkPositiveOrZero(padding, "padding");
614             this.padding = padding;
615             this.endOfStream = endOfStream;
616             this.stream = stream;
617             this.promise = promise;
618         }
619 
620         @Override
621         public void writeComplete() {
622             if (endOfStream) {
623                 lifecycleManager.closeStreamLocal(stream, promise);
624             }
625         }
626 
627         @Override
628         public void operationComplete(ChannelFuture future) throws Exception {
629             if (!future.isSuccess()) {
630                 error(flowController().channelHandlerContext(), future.cause());
631             }
632         }
633     }
634 }