1 /*
2 * Copyright 2016 The Netty Project
3 *
4 * The Netty Project licenses this file to you under the Apache License,
5 * version 2.0 (the "License"); you may not use this file except in compliance
6 * with the License. You may obtain a copy of the License at:
7 *
8 * https://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12 * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13 * License for the specific language governing permissions and limitations
14 * under the License.
15 */
16 package io.netty.handler.flush;
17
18 import io.netty.channel.Channel;
19 import io.netty.channel.ChannelDuplexHandler;
20 import io.netty.channel.ChannelHandler;
21 import io.netty.channel.ChannelHandlerContext;
22 import io.netty.channel.ChannelOutboundHandler;
23 import io.netty.channel.ChannelOutboundInvoker;
24 import io.netty.channel.ChannelPipeline;
25 import io.netty.channel.ChannelPromise;
26 import io.netty.util.internal.ObjectUtil;
27
28 import java.util.concurrent.Future;
29
30 /**
31 * {@link ChannelDuplexHandler} which consolidates {@link Channel#flush()} / {@link ChannelHandlerContext#flush()}
32 * operations (which also includes
33 * {@link Channel#writeAndFlush(Object)} / {@link Channel#writeAndFlush(Object, ChannelPromise)} and
34 * {@link ChannelOutboundInvoker#writeAndFlush(Object)} /
35 * {@link ChannelOutboundInvoker#writeAndFlush(Object, ChannelPromise)}).
36 * <p>
37 * Flush operations are generally speaking expensive as these may trigger a syscall on the transport level. Thus it is
38 * in most cases (where write latency can be traded with throughput) a good idea to try to minimize flush operations
39 * as much as possible.
40 * <p>
41 * If a read loop is currently ongoing, {@link #flush(ChannelHandlerContext)} will not be passed on to the next
42 * {@link ChannelOutboundHandler} in the {@link ChannelPipeline}, as it will pick up any pending flushes when
43 * {@link #channelReadComplete(ChannelHandlerContext)} is triggered.
44 * If no read loop is ongoing, the behavior depends on the {@code consolidateWhenNoReadInProgress} constructor argument:
45 * <ul>
46 * <li>if {@code false}, flushes are passed on to the next handler directly;</li>
47 * <li>if {@code true}, the invocation of the next handler is submitted as a separate task on the event loop. Under
48 * high throughput, this gives the opportunity to process other flushes before the task gets executed, thus
49 * batching multiple flushes into one.</li>
50 * </ul>
51 * If {@code explicitFlushAfterFlushes} is reached the flush will be forwarded as well (whether while in a read loop, or
52 * while batching outside of a read loop).
53 * <p>
54 * If the {@link Channel} becomes non-writable it will also try to execute any pending flush operations.
55 * <p>
56 * The {@link FlushConsolidationHandler} should be put as first {@link ChannelHandler} in the
57 * {@link ChannelPipeline} to have the best effect.
58 */
59 public class FlushConsolidationHandler extends ChannelDuplexHandler {
60 private final int explicitFlushAfterFlushes;
61 private final boolean consolidateWhenNoReadInProgress;
62 private final Runnable flushTask;
63 private int flushPendingCount;
64 private boolean readInProgress;
65 private ChannelHandlerContext ctx;
66 private Future<?> nextScheduledFlush;
67
68 /**
69 * The default number of flushes after which a flush will be forwarded to downstream handlers (whether while in a
70 * read loop, or while batching outside of a read loop).
71 */
72 public static final int DEFAULT_EXPLICIT_FLUSH_AFTER_FLUSHES = 256;
73
74 /**
75 * Create new instance which explicit flush after {@value DEFAULT_EXPLICIT_FLUSH_AFTER_FLUSHES} pending flush
76 * operations at the latest.
77 */
78 public FlushConsolidationHandler() {
79 this(DEFAULT_EXPLICIT_FLUSH_AFTER_FLUSHES, false);
80 }
81
82 /**
83 * Create new instance which doesn't consolidate flushes when no read is in progress.
84 *
85 * @param explicitFlushAfterFlushes the number of flushes after which an explicit flush will be done.
86 */
87 public FlushConsolidationHandler(int explicitFlushAfterFlushes) {
88 this(explicitFlushAfterFlushes, false);
89 }
90
91 /**
92 * Create new instance.
93 *
94 * @param explicitFlushAfterFlushes the number of flushes after which an explicit flush will be done.
95 * @param consolidateWhenNoReadInProgress whether to consolidate flushes even when no read loop is currently
96 * ongoing.
97 */
98 public FlushConsolidationHandler(int explicitFlushAfterFlushes, boolean consolidateWhenNoReadInProgress) {
99 this.explicitFlushAfterFlushes =
100 ObjectUtil.checkPositive(explicitFlushAfterFlushes, "explicitFlushAfterFlushes");
101 this.consolidateWhenNoReadInProgress = consolidateWhenNoReadInProgress;
102 this.flushTask = consolidateWhenNoReadInProgress ?
103 new Runnable() {
104 @Override
105 public void run() {
106 if (flushPendingCount > 0 && !readInProgress) {
107 flushPendingCount = 0;
108 nextScheduledFlush = null;
109 ctx.flush();
110 } // else we'll flush when the read completes
111 }
112 }
113 : null;
114 }
115
116 @Override
117 public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
118 this.ctx = ctx;
119 }
120
121 @Override
122 public void flush(ChannelHandlerContext ctx) throws Exception {
123 if (readInProgress) {
124 // If there is still a read in progress we are sure we will see a channelReadComplete(...) call. Thus
125 // we only need to flush if we reach the explicitFlushAfterFlushes limit.
126 if (++flushPendingCount == explicitFlushAfterFlushes) {
127 flushNow(ctx);
128 }
129 } else if (consolidateWhenNoReadInProgress) {
130 // Flush immediately if we reach the threshold, otherwise schedule
131 if (++flushPendingCount == explicitFlushAfterFlushes) {
132 flushNow(ctx);
133 } else {
134 scheduleFlush(ctx);
135 }
136 } else {
137 // Always flush directly
138 flushNow(ctx);
139 }
140 }
141
142 @Override
143 public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
144 // This may be the last event in the read loop, so flush now!
145 resetReadAndFlushIfNeeded(ctx);
146 ctx.fireChannelReadComplete();
147 }
148
149 @Override
150 public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
151 readInProgress = true;
152 ctx.fireChannelRead(msg);
153 }
154
155 @Override
156 public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
157 // To ensure we not miss to flush anything, do it now.
158 resetReadAndFlushIfNeeded(ctx);
159 ctx.fireExceptionCaught(cause);
160 }
161
162 @Override
163 public void disconnect(ChannelHandlerContext ctx, ChannelPromise promise) throws Exception {
164 // Try to flush one last time if flushes are pending before disconnect the channel.
165 resetReadAndFlushIfNeeded(ctx);
166 ctx.disconnect(promise);
167 }
168
169 @Override
170 public void close(ChannelHandlerContext ctx, ChannelPromise promise) throws Exception {
171 // Try to flush one last time if flushes are pending before close the channel.
172 resetReadAndFlushIfNeeded(ctx);
173 ctx.close(promise);
174 }
175
176 @Override
177 public void channelWritabilityChanged(ChannelHandlerContext ctx) throws Exception {
178 if (!ctx.channel().isWritable()) {
179 // The writability of the channel changed to false, so flush all consolidated flushes now to free up memory.
180 flushIfNeeded(ctx);
181 }
182 ctx.fireChannelWritabilityChanged();
183 }
184
185 @Override
186 public void handlerRemoved(ChannelHandlerContext ctx) throws Exception {
187 flushIfNeeded(ctx);
188 }
189
190 private void resetReadAndFlushIfNeeded(ChannelHandlerContext ctx) {
191 readInProgress = false;
192 flushIfNeeded(ctx);
193 }
194
195 private void flushIfNeeded(ChannelHandlerContext ctx) {
196 if (flushPendingCount > 0) {
197 flushNow(ctx);
198 }
199 }
200
201 private void flushNow(ChannelHandlerContext ctx) {
202 cancelScheduledFlush();
203 flushPendingCount = 0;
204 ctx.flush();
205 }
206
207 private void scheduleFlush(final ChannelHandlerContext ctx) {
208 if (nextScheduledFlush == null) {
209 // Run as soon as possible, but still yield to give a chance for additional writes to enqueue.
210 nextScheduledFlush = ctx.channel().eventLoop().submit(flushTask);
211 }
212 }
213
214 private void cancelScheduledFlush() {
215 if (nextScheduledFlush != null) {
216 nextScheduledFlush.cancel(false);
217 nextScheduledFlush = null;
218 }
219 }
220 }