1 /*
2 * Copyright 2016 The Netty Project
3 *
4 * The Netty Project licenses this file to you under the Apache License, version
5 * 2.0 (the "License"); you may not use this file except in compliance with the
6 * License. You may obtain a copy of the License at:
7 *
8 * https://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12 * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13 * License for the specific language governing permissions and limitations under
14 * the License.
15 */
16 package io.netty.handler.flow;
17
18 import java.util.ArrayDeque;
19 import java.util.Queue;
20
21 import io.netty.channel.ChannelConfig;
22 import io.netty.channel.ChannelDuplexHandler;
23 import io.netty.channel.ChannelHandler;
24 import io.netty.channel.ChannelHandlerContext;
25 import io.netty.handler.codec.ByteToMessageDecoder;
26 import io.netty.handler.codec.MessageToByteEncoder;
27 import io.netty.util.ReferenceCountUtil;
28 import io.netty.util.internal.ObjectPool;
29 import io.netty.util.internal.ObjectPool.Handle;
30 import io.netty.util.internal.ObjectPool.ObjectCreator;
31 import io.netty.util.internal.logging.InternalLogger;
32 import io.netty.util.internal.logging.InternalLoggerFactory;
33
34 /**
35 * The {@link FlowControlHandler} ensures that only one message per {@code read()} is sent downstream.
36 *
37 * Classes such as {@link ByteToMessageDecoder} or {@link MessageToByteEncoder} are free to emit as
38 * many events as they like for any given input. A channel's auto reading configuration doesn't usually
39 * apply in these scenarios. This is causing problems in downstream {@link ChannelHandler}s that would
40 * like to hold subsequent events while they're processing one event. It's a common problem with the
41 * {@code HttpObjectDecoder} that will very often fire an {@code HttpRequest} that is immediately followed
42 * by a {@code LastHttpContent} event.
43 *
44 * <pre>{@code
45 * ChannelPipeline pipeline = ...;
46 *
47 * pipeline.addLast(new HttpServerCodec());
48 * pipeline.addLast(new FlowControlHandler());
49 *
50 * pipeline.addLast(new MyExampleHandler());
51 *
52 * class MyExampleHandler extends ChannelInboundHandlerAdapter {
53 * @Override
54 * public void channelRead(ChannelHandlerContext ctx, Object msg) {
55 * if (msg instanceof HttpRequest) {
56 * ctx.channel().config().setAutoRead(false);
57 *
58 * // The FlowControlHandler will hold any subsequent events that
59 * // were emitted by HttpObjectDecoder until auto reading is turned
60 * // back on or Channel#read() is being called.
61 * }
62 * }
63 * }
64 * }</pre>
65 *
66 * @see ChannelConfig#setAutoRead(boolean)
67 */
68 public class FlowControlHandler extends ChannelDuplexHandler {
69 private static final InternalLogger logger = InternalLoggerFactory.getInstance(FlowControlHandler.class);
70
71 private final boolean releaseMessages;
72
73 private RecyclableArrayDeque queue;
74
75 private ChannelConfig config;
76
77 private boolean shouldConsume;
78
79 public FlowControlHandler() {
80 this(true);
81 }
82
83 public FlowControlHandler(boolean releaseMessages) {
84 this.releaseMessages = releaseMessages;
85 }
86
87 /**
88 * Determine if the underlying {@link Queue} is empty. This method exists for
89 * testing, debugging and inspection purposes and it is not Thread safe!
90 */
91 boolean isQueueEmpty() {
92 return queue == null || queue.isEmpty();
93 }
94
95 /**
96 * Releases all messages and destroys the {@link Queue}.
97 */
98 private void destroy() {
99 if (queue != null) {
100
101 if (!queue.isEmpty()) {
102 logger.trace("Non-empty queue: {}", queue);
103
104 if (releaseMessages) {
105 Object msg;
106 while ((msg = queue.poll()) != null) {
107 ReferenceCountUtil.safeRelease(msg);
108 }
109 }
110 }
111
112 queue.recycle();
113 queue = null;
114 }
115 }
116
117 @Override
118 public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
119 config = ctx.channel().config();
120 }
121
122 @Override
123 public void handlerRemoved(ChannelHandlerContext ctx) throws Exception {
124 super.handlerRemoved(ctx);
125 if (!isQueueEmpty()) {
126 dequeue(ctx, queue.size());
127 }
128 destroy();
129 }
130
131 @Override
132 public void channelInactive(ChannelHandlerContext ctx) throws Exception {
133 destroy();
134 ctx.fireChannelInactive();
135 }
136
137 @Override
138 public void read(ChannelHandlerContext ctx) throws Exception {
139 if (dequeue(ctx, 1) == 0) {
140 // It seems no messages were consumed. We need to read() some
141 // messages from upstream and once one arrives it need to be
142 // relayed to downstream to keep the flow going.
143 shouldConsume = true;
144 ctx.read();
145 } else if (config.isAutoRead()) {
146 ctx.read();
147 }
148 }
149
150 @Override
151 public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
152 if (queue == null) {
153 queue = RecyclableArrayDeque.newInstance();
154 }
155
156 queue.offer(msg);
157
158 // We just received one message. Do we need to relay it regardless
159 // of the auto reading configuration? The answer is yes if this
160 // method was called as a result of a prior read() call.
161 int minConsume = shouldConsume ? 1 : 0;
162 shouldConsume = false;
163
164 dequeue(ctx, minConsume);
165 }
166
167 @Override
168 public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
169 if (isQueueEmpty()) {
170 ctx.fireChannelReadComplete();
171 } else {
172 // Don't relay completion events from upstream as they
173 // make no sense in this context. See dequeue() where
174 // a new set of completion events is being produced.
175 }
176 }
177
178 /**
179 * Dequeues one or many (or none) messages depending on the channel's auto
180 * reading state and returns the number of messages that were consumed from
181 * the internal queue.
182 *
183 * The {@code minConsume} argument is used to force {@code dequeue()} into
184 * consuming that number of messages regardless of the channel's auto
185 * reading configuration.
186 *
187 * @see #read(ChannelHandlerContext)
188 * @see #channelRead(ChannelHandlerContext, Object)
189 */
190 private int dequeue(ChannelHandlerContext ctx, int minConsume) {
191 int consumed = 0;
192
193 // fireChannelRead(...) may call ctx.read() and so this method may reentrance. Because of this we need to
194 // check if queue was set to null in the meantime and if so break the loop.
195 while (queue != null && (consumed < minConsume || config.isAutoRead())) {
196 Object msg = queue.poll();
197 if (msg == null) {
198 break;
199 }
200
201 ++consumed;
202 ctx.fireChannelRead(msg);
203 }
204
205 // We're firing a completion event every time one (or more)
206 // messages were consumed and the queue ended up being drained
207 // to an empty state.
208 if (queue != null && queue.isEmpty()) {
209 queue.recycle();
210 queue = null;
211
212 if (consumed > 0) {
213 ctx.fireChannelReadComplete();
214 }
215 }
216
217 return consumed;
218 }
219
220 /**
221 * A recyclable {@link ArrayDeque}.
222 */
223 private static final class RecyclableArrayDeque extends ArrayDeque<Object> {
224
225 private static final long serialVersionUID = 0L;
226
227 /**
228 * A value of {@code 2} should be a good choice for most scenarios.
229 */
230 private static final int DEFAULT_NUM_ELEMENTS = 2;
231
232 private static final ObjectPool<RecyclableArrayDeque> RECYCLER = ObjectPool.newPool(
233 new ObjectCreator<RecyclableArrayDeque>() {
234 @Override
235 public RecyclableArrayDeque newObject(Handle<RecyclableArrayDeque> handle) {
236 return new RecyclableArrayDeque(DEFAULT_NUM_ELEMENTS, handle);
237 }
238 });
239
240 public static RecyclableArrayDeque newInstance() {
241 return RECYCLER.get();
242 }
243
244 private final Handle<RecyclableArrayDeque> handle;
245
246 private RecyclableArrayDeque(int numElements, Handle<RecyclableArrayDeque> handle) {
247 super(numElements);
248 this.handle = handle;
249 }
250
251 public void recycle() {
252 clear();
253 handle.recycle(this);
254 }
255 }
256 }