1
2
3
4
5
6
7
8
9
10
11
12
13
14
15 package io.netty.handler.codec.http2;
16
17 import io.netty.buffer.ByteBuf;
18 import io.netty.channel.ChannelFuture;
19 import io.netty.channel.ChannelFutureListener;
20 import io.netty.channel.ChannelHandlerContext;
21 import io.netty.channel.ChannelPromise;
22 import io.netty.channel.CoalescingBufferQueue;
23 import io.netty.handler.codec.http.HttpStatusClass;
24 import io.netty.handler.codec.http2.Http2CodecUtil.SimpleChannelPromiseAggregator;
25 import io.netty.util.internal.UnstableApi;
26
27 import java.util.ArrayDeque;
28 import java.util.Queue;
29
30 import static io.netty.handler.codec.http.HttpStatusClass.INFORMATIONAL;
31 import static io.netty.handler.codec.http2.Http2Error.INTERNAL_ERROR;
32 import static io.netty.handler.codec.http2.Http2Error.PROTOCOL_ERROR;
33 import static io.netty.handler.codec.http2.Http2Exception.connectionError;
34 import static io.netty.util.internal.ObjectUtil.checkNotNull;
35 import static io.netty.util.internal.ObjectUtil.checkPositiveOrZero;
36 import static java.lang.Integer.MAX_VALUE;
37 import static java.lang.Math.min;
38
39
40
41
42 @UnstableApi
43 public class DefaultHttp2ConnectionEncoder implements Http2ConnectionEncoder, Http2SettingsReceivedConsumer {
44 private final Http2FrameWriter frameWriter;
45 private final Http2Connection connection;
46 private Http2LifecycleManager lifecycleManager;
47
48
49 private final Queue<Http2Settings> outstandingLocalSettingsQueue = new ArrayDeque<Http2Settings>(4);
50 private Queue<Http2Settings> outstandingRemoteSettingsQueue;
51
52 public DefaultHttp2ConnectionEncoder(Http2Connection connection, Http2FrameWriter frameWriter) {
53 this.connection = checkNotNull(connection, "connection");
54 this.frameWriter = checkNotNull(frameWriter, "frameWriter");
55 if (connection.remote().flowController() == null) {
56 connection.remote().flowController(new DefaultHttp2RemoteFlowController(connection));
57 }
58 }
59
60 @Override
61 public void lifecycleManager(Http2LifecycleManager lifecycleManager) {
62 this.lifecycleManager = checkNotNull(lifecycleManager, "lifecycleManager");
63 }
64
65 @Override
66 public Http2FrameWriter frameWriter() {
67 return frameWriter;
68 }
69
70 @Override
71 public Http2Connection connection() {
72 return connection;
73 }
74
75 @Override
76 public final Http2RemoteFlowController flowController() {
77 return connection().remote().flowController();
78 }
79
80 @Override
81 public void remoteSettings(Http2Settings settings) throws Http2Exception {
82 Boolean pushEnabled = settings.pushEnabled();
83 Http2FrameWriter.Configuration config = configuration();
84 Http2HeadersEncoder.Configuration outboundHeaderConfig = config.headersConfiguration();
85 Http2FrameSizePolicy outboundFrameSizePolicy = config.frameSizePolicy();
86 if (pushEnabled != null) {
87 if (!connection.isServer() && pushEnabled) {
88 throw connectionError(PROTOCOL_ERROR,
89 "Client received a value of ENABLE_PUSH specified to other than 0");
90 }
91 connection.remote().allowPushTo(pushEnabled);
92 }
93
94 Long maxConcurrentStreams = settings.maxConcurrentStreams();
95 if (maxConcurrentStreams != null) {
96 connection.local().maxActiveStreams((int) min(maxConcurrentStreams, MAX_VALUE));
97 }
98
99 Long headerTableSize = settings.headerTableSize();
100 if (headerTableSize != null) {
101 outboundHeaderConfig.maxHeaderTableSize(headerTableSize);
102 }
103
104 Long maxHeaderListSize = settings.maxHeaderListSize();
105 if (maxHeaderListSize != null) {
106 outboundHeaderConfig.maxHeaderListSize(maxHeaderListSize);
107 }
108
109 Integer maxFrameSize = settings.maxFrameSize();
110 if (maxFrameSize != null) {
111 outboundFrameSizePolicy.maxFrameSize(maxFrameSize);
112 }
113
114 Integer initialWindowSize = settings.initialWindowSize();
115 if (initialWindowSize != null) {
116 flowController().initialWindowSize(initialWindowSize);
117 }
118 }
119
120 @Override
121 public ChannelFuture writeData(final ChannelHandlerContext ctx, final int streamId, ByteBuf data, int padding,
122 final boolean endOfStream, ChannelPromise promise) {
123 promise = promise.unvoid();
124 final Http2Stream stream;
125 try {
126 stream = requireStream(streamId);
127
128
129 switch (stream.state()) {
130 case OPEN:
131 case HALF_CLOSED_REMOTE:
132
133 break;
134 default:
135 throw new IllegalStateException("Stream " + stream.id() + " in unexpected state " + stream.state());
136 }
137 } catch (Throwable e) {
138 data.release();
139 return promise.setFailure(e);
140 }
141
142
143 flowController().addFlowControlled(stream,
144 new FlowControlledData(stream, data, padding, endOfStream, promise));
145 return promise;
146 }
147
148 @Override
149 public ChannelFuture writeHeaders(ChannelHandlerContext ctx, int streamId, Http2Headers headers, int padding,
150 boolean endStream, ChannelPromise promise) {
151 return writeHeaders0(ctx, streamId, headers, false, 0, (short) 0, false, padding, endStream, promise);
152 }
153
154 private static boolean validateHeadersSentState(Http2Stream stream, Http2Headers headers, boolean isServer,
155 boolean endOfStream) {
156 boolean isInformational = isServer && HttpStatusClass.valueOf(headers.status()) == INFORMATIONAL;
157 if ((isInformational || !endOfStream) && stream.isHeadersSent() || stream.isTrailersSent()) {
158 throw new IllegalStateException("Stream " + stream.id() + " sent too many headers EOS: " + endOfStream);
159 }
160 return isInformational;
161 }
162
163 @Override
164 public ChannelFuture writeHeaders(final ChannelHandlerContext ctx, final int streamId,
165 final Http2Headers headers, final int streamDependency, final short weight,
166 final boolean exclusive, final int padding, final boolean endOfStream, ChannelPromise promise) {
167 return writeHeaders0(ctx, streamId, headers, true, streamDependency,
168 weight, exclusive, padding, endOfStream, promise);
169 }
170
171
172
173
174
175 private static ChannelFuture sendHeaders(Http2FrameWriter frameWriter, ChannelHandlerContext ctx, int streamId,
176 Http2Headers headers, final boolean hasPriority,
177 int streamDependency, final short weight,
178 boolean exclusive, final int padding,
179 boolean endOfStream, ChannelPromise promise) {
180 if (hasPriority) {
181 return frameWriter.writeHeaders(ctx, streamId, headers, streamDependency,
182 weight, exclusive, padding, endOfStream, promise);
183 }
184 return frameWriter.writeHeaders(ctx, streamId, headers, padding, endOfStream, promise);
185 }
186
187 private ChannelFuture writeHeaders0(final ChannelHandlerContext ctx, final int streamId,
188 final Http2Headers headers, final boolean hasPriority,
189 final int streamDependency, final short weight,
190 final boolean exclusive, final int padding,
191 final boolean endOfStream, ChannelPromise promise) {
192 try {
193 Http2Stream stream = connection.stream(streamId);
194 if (stream == null) {
195 try {
196
197
198
199
200
201 stream = connection.local().createStream(streamId, false);
202 } catch (Http2Exception cause) {
203 if (connection.remote().mayHaveCreatedStream(streamId)) {
204 promise.tryFailure(new IllegalStateException("Stream no longer exists: " + streamId, cause));
205 return promise;
206 }
207 throw cause;
208 }
209 } else {
210 switch (stream.state()) {
211 case RESERVED_LOCAL:
212 stream.open(endOfStream);
213 break;
214 case OPEN:
215 case HALF_CLOSED_REMOTE:
216
217 break;
218 default:
219 throw new IllegalStateException("Stream " + stream.id() + " in unexpected state " +
220 stream.state());
221 }
222 }
223
224
225
226 Http2RemoteFlowController flowController = flowController();
227 if (!endOfStream || !flowController.hasFlowControlled(stream)) {
228
229
230 promise = promise.unvoid();
231 boolean isInformational = validateHeadersSentState(stream, headers, connection.isServer(), endOfStream);
232
233 ChannelFuture future = sendHeaders(frameWriter, ctx, streamId, headers, hasPriority, streamDependency,
234 weight, exclusive, padding, endOfStream, promise);
235
236
237 Throwable failureCause = future.cause();
238 if (failureCause == null) {
239
240
241
242
243
244 stream.headersSent(isInformational);
245
246 if (!future.isSuccess()) {
247
248 notifyLifecycleManagerOnError(future, ctx);
249 }
250 } else {
251 lifecycleManager.onError(ctx, true, failureCause);
252 }
253
254 if (endOfStream) {
255
256
257
258 lifecycleManager.closeStreamLocal(stream, future);
259 }
260
261 return future;
262 } else {
263
264 flowController.addFlowControlled(stream,
265 new FlowControlledHeaders(stream, headers, hasPriority, streamDependency,
266 weight, exclusive, padding, true, promise));
267 return promise;
268 }
269 } catch (Throwable t) {
270 lifecycleManager.onError(ctx, true, t);
271 promise.tryFailure(t);
272 return promise;
273 }
274 }
275
276 @Override
277 public ChannelFuture writePriority(ChannelHandlerContext ctx, int streamId, int streamDependency, short weight,
278 boolean exclusive, ChannelPromise promise) {
279 return frameWriter.writePriority(ctx, streamId, streamDependency, weight, exclusive, promise);
280 }
281
282 @Override
283 public ChannelFuture writeRstStream(ChannelHandlerContext ctx, int streamId, long errorCode,
284 ChannelPromise promise) {
285
286 return lifecycleManager.resetStream(ctx, streamId, errorCode, promise);
287 }
288
289 @Override
290 public ChannelFuture writeSettings(ChannelHandlerContext ctx, Http2Settings settings,
291 ChannelPromise promise) {
292 outstandingLocalSettingsQueue.add(settings);
293 try {
294 Boolean pushEnabled = settings.pushEnabled();
295 if (pushEnabled != null && connection.isServer()) {
296 throw connectionError(PROTOCOL_ERROR, "Server sending SETTINGS frame with ENABLE_PUSH specified");
297 }
298 } catch (Throwable e) {
299 return promise.setFailure(e);
300 }
301
302 return frameWriter.writeSettings(ctx, settings, promise);
303 }
304
305 @Override
306 public ChannelFuture writeSettingsAck(ChannelHandlerContext ctx, ChannelPromise promise) {
307 if (outstandingRemoteSettingsQueue == null) {
308 return frameWriter.writeSettingsAck(ctx, promise);
309 }
310 Http2Settings settings = outstandingRemoteSettingsQueue.poll();
311 if (settings == null) {
312 return promise.setFailure(new Http2Exception(INTERNAL_ERROR, "attempted to write a SETTINGS ACK with no " +
313 " pending SETTINGS"));
314 }
315 SimpleChannelPromiseAggregator aggregator = new SimpleChannelPromiseAggregator(promise, ctx.channel(),
316 ctx.executor());
317
318
319
320 frameWriter.writeSettingsAck(ctx, aggregator.newPromise());
321
322
323
324 ChannelPromise applySettingsPromise = aggregator.newPromise();
325 try {
326 remoteSettings(settings);
327 applySettingsPromise.setSuccess();
328 } catch (Throwable e) {
329 applySettingsPromise.setFailure(e);
330 lifecycleManager.onError(ctx, true, e);
331 }
332 return aggregator.doneAllocatingPromises();
333 }
334
335 @Override
336 public ChannelFuture writePing(ChannelHandlerContext ctx, boolean ack, long data, ChannelPromise promise) {
337 return frameWriter.writePing(ctx, ack, data, promise);
338 }
339
340 @Override
341 public ChannelFuture writePushPromise(ChannelHandlerContext ctx, int streamId, int promisedStreamId,
342 Http2Headers headers, int padding, ChannelPromise promise) {
343 try {
344 if (connection.goAwayReceived()) {
345 throw connectionError(PROTOCOL_ERROR, "Sending PUSH_PROMISE after GO_AWAY received.");
346 }
347
348 Http2Stream stream = requireStream(streamId);
349
350 connection.local().reservePushStream(promisedStreamId, stream);
351
352 promise = promise.unvoid();
353 ChannelFuture future = frameWriter.writePushPromise(ctx, streamId, promisedStreamId, headers, padding,
354 promise);
355
356 Throwable failureCause = future.cause();
357 if (failureCause == null) {
358
359
360 stream.pushPromiseSent();
361
362 if (!future.isSuccess()) {
363
364 notifyLifecycleManagerOnError(future, ctx);
365 }
366 } else {
367 lifecycleManager.onError(ctx, true, failureCause);
368 }
369 return future;
370 } catch (Throwable t) {
371 lifecycleManager.onError(ctx, true, t);
372 promise.tryFailure(t);
373 return promise;
374 }
375 }
376
377 @Override
378 public ChannelFuture writeGoAway(ChannelHandlerContext ctx, int lastStreamId, long errorCode, ByteBuf debugData,
379 ChannelPromise promise) {
380 return lifecycleManager.goAway(ctx, lastStreamId, errorCode, debugData, promise);
381 }
382
383 @Override
384 public ChannelFuture writeWindowUpdate(ChannelHandlerContext ctx, int streamId, int windowSizeIncrement,
385 ChannelPromise promise) {
386 return promise.setFailure(new UnsupportedOperationException("Use the Http2[Inbound|Outbound]FlowController" +
387 " objects to control window sizes"));
388 }
389
390 @Override
391 public ChannelFuture writeFrame(ChannelHandlerContext ctx, byte frameType, int streamId, Http2Flags flags,
392 ByteBuf payload, ChannelPromise promise) {
393 return frameWriter.writeFrame(ctx, frameType, streamId, flags, payload, promise);
394 }
395
396 @Override
397 public void close() {
398 frameWriter.close();
399 }
400
401 @Override
402 public Http2Settings pollSentSettings() {
403 return outstandingLocalSettingsQueue.poll();
404 }
405
406 @Override
407 public Configuration configuration() {
408 return frameWriter.configuration();
409 }
410
411 private Http2Stream requireStream(int streamId) {
412 Http2Stream stream = connection.stream(streamId);
413 if (stream == null) {
414 final String message;
415 if (connection.streamMayHaveExisted(streamId)) {
416 message = "Stream no longer exists: " + streamId;
417 } else {
418 message = "Stream does not exist: " + streamId;
419 }
420 throw new IllegalArgumentException(message);
421 }
422 return stream;
423 }
424
425 @Override
426 public void consumeReceivedSettings(Http2Settings settings) {
427 if (outstandingRemoteSettingsQueue == null) {
428 outstandingRemoteSettingsQueue = new ArrayDeque<Http2Settings>(2);
429 }
430 outstandingRemoteSettingsQueue.add(settings);
431 }
432
433
434
435
436
437
438
439
440
441
442 private final class FlowControlledData extends FlowControlledBase {
443 private final CoalescingBufferQueue queue;
444 private int dataSize;
445
446 FlowControlledData(Http2Stream stream, ByteBuf buf, int padding, boolean endOfStream,
447 ChannelPromise promise) {
448 super(stream, padding, endOfStream, promise);
449 queue = new CoalescingBufferQueue(promise.channel());
450 queue.add(buf, promise);
451 dataSize = queue.readableBytes();
452 }
453
454 @Override
455 public int size() {
456 return dataSize + padding;
457 }
458
459 @Override
460 public void error(ChannelHandlerContext ctx, Throwable cause) {
461 queue.releaseAndFailAll(cause);
462
463
464
465
466
467 lifecycleManager.onError(ctx, true, cause);
468 }
469
470 @Override
471 public void write(ChannelHandlerContext ctx, int allowedBytes) {
472 int queuedData = queue.readableBytes();
473 if (!endOfStream) {
474 if (queuedData == 0) {
475 if (queue.isEmpty()) {
476
477
478
479
480
481
482 padding = dataSize = 0;
483 } else {
484
485
486
487 ChannelPromise writePromise = ctx.newPromise().addListener(this);
488 ctx.write(queue.remove(0, writePromise), writePromise);
489 }
490 return;
491 }
492
493 if (allowedBytes == 0) {
494 return;
495 }
496 }
497
498
499 int writableData = min(queuedData, allowedBytes);
500 ChannelPromise writePromise = ctx.newPromise().addListener(this);
501 ByteBuf toWrite = queue.remove(writableData, writePromise);
502 dataSize = queue.readableBytes();
503
504
505 int writablePadding = min(allowedBytes - writableData, padding);
506 padding -= writablePadding;
507
508
509 frameWriter().writeData(ctx, stream.id(), toWrite, writablePadding,
510 endOfStream && size() == 0, writePromise);
511 }
512
513 @Override
514 public boolean merge(ChannelHandlerContext ctx, Http2RemoteFlowController.FlowControlled next) {
515 FlowControlledData nextData;
516 if (FlowControlledData.class != next.getClass() ||
517 MAX_VALUE - (nextData = (FlowControlledData) next).size() < size()) {
518 return false;
519 }
520 nextData.queue.copyTo(queue);
521 dataSize = queue.readableBytes();
522
523 padding = Math.max(padding, nextData.padding);
524 endOfStream = nextData.endOfStream;
525 return true;
526 }
527 }
528
529 private void notifyLifecycleManagerOnError(ChannelFuture future, final ChannelHandlerContext ctx) {
530 future.addListener(new ChannelFutureListener() {
531 @Override
532 public void operationComplete(ChannelFuture future) throws Exception {
533 Throwable cause = future.cause();
534 if (cause != null) {
535 lifecycleManager.onError(ctx, true, cause);
536 }
537 }
538 });
539 }
540
541
542
543
544
545
546 private final class FlowControlledHeaders extends FlowControlledBase {
547 private final Http2Headers headers;
548 private final boolean hasPriority;
549 private final int streamDependency;
550 private final short weight;
551 private final boolean exclusive;
552
553 FlowControlledHeaders(Http2Stream stream, Http2Headers headers, boolean hasPriority,
554 int streamDependency, short weight, boolean exclusive,
555 int padding, boolean endOfStream, ChannelPromise promise) {
556 super(stream, padding, endOfStream, promise.unvoid());
557 this.headers = headers;
558 this.hasPriority = hasPriority;
559 this.streamDependency = streamDependency;
560 this.weight = weight;
561 this.exclusive = exclusive;
562 }
563
564 @Override
565 public int size() {
566 return 0;
567 }
568
569 @Override
570 public void error(ChannelHandlerContext ctx, Throwable cause) {
571 if (ctx != null) {
572 lifecycleManager.onError(ctx, true, cause);
573 }
574 promise.tryFailure(cause);
575 }
576
577 @Override
578 public void write(ChannelHandlerContext ctx, int allowedBytes) {
579 boolean isInformational = validateHeadersSentState(stream, headers, connection.isServer(), endOfStream);
580
581
582 promise.addListener(this);
583
584 ChannelFuture f = sendHeaders(frameWriter, ctx, stream.id(), headers, hasPriority, streamDependency,
585 weight, exclusive, padding, endOfStream, promise);
586
587 Throwable failureCause = f.cause();
588 if (failureCause == null) {
589
590
591 stream.headersSent(isInformational);
592 }
593 }
594
595 @Override
596 public boolean merge(ChannelHandlerContext ctx, Http2RemoteFlowController.FlowControlled next) {
597 return false;
598 }
599 }
600
601
602
603
604 public abstract class FlowControlledBase implements Http2RemoteFlowController.FlowControlled,
605 ChannelFutureListener {
606 protected final Http2Stream stream;
607 protected ChannelPromise promise;
608 protected boolean endOfStream;
609 protected int padding;
610
611 FlowControlledBase(final Http2Stream stream, int padding, boolean endOfStream,
612 final ChannelPromise promise) {
613 checkPositiveOrZero(padding, "padding");
614 this.padding = padding;
615 this.endOfStream = endOfStream;
616 this.stream = stream;
617 this.promise = promise;
618 }
619
620 @Override
621 public void writeComplete() {
622 if (endOfStream) {
623 lifecycleManager.closeStreamLocal(stream, promise);
624 }
625 }
626
627 @Override
628 public void operationComplete(ChannelFuture future) throws Exception {
629 if (!future.isSuccess()) {
630 error(flowController().channelHandlerContext(), future.cause());
631 }
632 }
633 }
634 }