1 /*
2 * Copyright 2012 The Netty Project
3 *
4 * The Netty Project licenses this file to you under the Apache License,
5 * version 2.0 (the "License"); you may not use this file except in compliance
6 * with the License. You may obtain a copy of the License at:
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12 * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13 * License for the specific language governing permissions and limitations
14 * under the License.
15 */
16 package org.jboss.netty.handler.queue;
17
18 import org.jboss.netty.buffer.ChannelBuffer;
19 import org.jboss.netty.buffer.ChannelBuffers;
20 import org.jboss.netty.channel.Channel;
21 import org.jboss.netty.channel.ChannelConfig;
22 import org.jboss.netty.channel.ChannelFuture;
23 import org.jboss.netty.channel.ChannelFutureListener;
24 import org.jboss.netty.channel.ChannelHandlerContext;
25 import org.jboss.netty.channel.ChannelStateEvent;
26 import org.jboss.netty.channel.Channels;
27 import org.jboss.netty.channel.LifeCycleAwareChannelHandler;
28 import org.jboss.netty.channel.MessageEvent;
29 import org.jboss.netty.channel.SimpleChannelHandler;
30 import org.jboss.netty.channel.socket.nio.NioSocketChannelConfig;
31 import org.jboss.netty.util.HashedWheelTimer;
32
33 import java.io.IOException;
34 import java.nio.channels.ClosedChannelException;
35 import java.util.ArrayList;
36 import java.util.List;
37 import java.util.Queue;
38 import java.util.concurrent.BlockingQueue;
39 import java.util.concurrent.ConcurrentLinkedQueue;
40 import java.util.concurrent.atomic.AtomicBoolean;
41
42 /**
43 * Emulates buffered write operation. This handler stores all write requests
44 * into an unbounded {@link Queue} and flushes them to the downstream when
45 * {@link #flush()} method is called.
46 * <p>
47 * Here is an example that demonstrates the usage:
48 * <pre>
49 * BufferedWriteHandler bufferedWriter = new BufferedWriteHandler();
50 * ChannelPipeline p = ...;
51 * p.addFirst("buffer", bufferedWriter);
52 *
53 * ...
54 *
55 * Channel ch = ...;
56 *
57 * // msg1, 2, and 3 are stored in the queue of bufferedWriter.
58 * ch.write(msg1);
59 * ch.write(msg2);
60 * ch.write(msg3);
61 *
62 * // and will be flushed on request.
63 * bufferedWriter.flush();
64 * </pre>
65 *
66 * <h3>Auto-flush</h3>
67 * The write request queue is automatically flushed when the associated
68 * {@link Channel} is disconnected or closed. However, it does not flush the
69 * queue otherwise. It means you have to call {@link #flush()} before the size
70 * of the queue increases too much. You can implement your own auto-flush
71 * strategy by extending this handler:
72 * <pre>
73 * public class AutoFlusher extends {@link BufferedWriteHandler} {
74 *
75 * private final AtomicLong bufferSize = new AtomicLong();
76 *
77 * {@literal @Override}
78 * public void writeRequested({@link ChannelHandlerContext} ctx, {@link MessageEvent} e) {
79 * super.writeRequested(ctx, e);
80 *
81 * {@link ChannelBuffer} data = ({@link ChannelBuffer}) e.getMessage();
82 * int newBufferSize = bufferSize.addAndGet(data.readableBytes());
83 *
84 * // Flush the queue if it gets larger than 8KiB.
85 * if (newBufferSize > 8192) {
86 * flush();
87 * bufferSize.set(0);
88 * }
89 * }
90 * }
91 * </pre>
92 *
93 * <h3>Consolidate on flush</h3>
94 *
95 * If there are two or more write requests in the queue and all their message
96 * type is {@link ChannelBuffer}, they can be merged into a single write request
97 * to save the number of system calls.
98 * <pre>
99 * BEFORE consolidation: AFTER consolidation:
100 * +-------+-------+-------+ +-------------+
101 * | Req C | Req B | Req A |------\\| Request ABC |
102 * | "789" | "456" | "123" |------//| "123456789" |
103 * +-------+-------+-------+ +-------------+
104 * </pre>
105 * This feature is disabled by default. You can override the default when you
106 * create this handler or call {@link #flush(boolean)}. If you specified
107 * {@code true} when you call the constructor, calling {@link #flush()} will
108 * always consolidate the queue. Otherwise, you have to call
109 * {@link #flush(boolean)} with {@code true} to enable this feature for each
110 * flush.
111 * <p>
112 * The disadvantage of consolidation is that the {@link ChannelFuture} and its
113 * {@link ChannelFutureListener}s associated with the original write requests
114 * might be notified later than when they are actually written out. They will
115 * always be notified when the consolidated write request is fully written.
116 * <p>
117 * The following example implements the consolidation strategy that reduces
118 * the number of write requests based on the writability of a channel:
119 * <pre>
120 * public class ConsolidatingAutoFlusher extends {@link BufferedWriteHandler} {
121 *
122 * public ConsolidatingAutoFlusher() {
123 * // Enable consolidation by default.
124 * super(true);
125 * }
126 *
127 * {@literal @Override}
128 * public void channelOpen({@link ChannelHandlerContext} ctx, {@link ChannelStateEvent} e) throws Exception {
129 * {@link ChannelConfig} cfg = e.getChannel().getConfig();
130 * if (cfg instanceof {@link NioSocketChannelConfig}) {
131 * // Lower the watermark to increase the chance of consolidation.
132 * cfg.setWriteBufferLowWaterMark(0);
133 * }
134 * super.channelOpen(e);
135 * }
136 *
137 * {@literal @Override}
138 * public void writeRequested({@link ChannelHandlerContext} ctx, {@link MessageEvent} e) throws Exception {
139 * super.writeRequested(ctx, et);
140 * if (e.getChannel().isWritable()) {
141 * flush();
142 * }
143 * }
144 *
145 * {@literal @Override}
146 * public void channelInterestChanged(
147 * {@link ChannelHandlerContext} ctx, {@link ChannelStateEvent} e) throws Exception {
148 * if (e.getChannel().isWritable()) {
149 * flush();
150 * }
151 * }
152 * }
153 * </pre>
154 *
155 * <h3>Prioritized Writes</h3>
156 *
157 * You can implement prioritized writes by specifying an unbounded priority
158 * queue in the constructor of this handler. It will be required to design
159 * the proper strategy to determine how often {@link #flush()} should be called.
160 * For example, you could call {@link #flush()} periodically, using
161 * {@link HashedWheelTimer} every second.
162 * @apiviz.landmark
163 */
164 public class BufferedWriteHandler extends SimpleChannelHandler implements LifeCycleAwareChannelHandler {
165
166 private final Queue<MessageEvent> queue;
167 private final boolean consolidateOnFlush;
168 private volatile ChannelHandlerContext ctx;
169 private final AtomicBoolean flush = new AtomicBoolean(false);
170
171 /**
172 * Creates a new instance with the default unbounded {@link BlockingQueue}
173 * implementation and without buffer consolidation.
174 */
175 public BufferedWriteHandler() {
176 this(false);
177 }
178
179 /**
180 * Creates a new instance with the specified thread-safe unbounded
181 * {@link Queue} and without buffer consolidation. Please note that
182 * specifying a bounded {@link Queue} or a thread-unsafe {@link Queue} will
183 * result in an unspecified behavior.
184 */
185 public BufferedWriteHandler(Queue<MessageEvent> queue) {
186 this(queue, false);
187 }
188
189 /**
190 * Creates a new instance with {@link ConcurrentLinkedQueue}
191 *
192 * @param consolidateOnFlush
193 * {@code true} if and only if the buffered write requests are merged
194 * into a single write request on {@link #flush()}
195 */
196 public BufferedWriteHandler(boolean consolidateOnFlush) {
197 this(new ConcurrentLinkedQueue<MessageEvent>(), consolidateOnFlush);
198 }
199
200 /**
201 * Creates a new instance with the specified thread-safe unbounded
202 * {@link Queue}. Please note that specifying a bounded {@link Queue} or
203 * a thread-unsafe {@link Queue} will result in an unspecified behavior.
204 *
205 * @param consolidateOnFlush
206 * {@code true} if and only if the buffered write requests are merged
207 * into a single write request on {@link #flush()}
208 */
209 public BufferedWriteHandler(Queue<MessageEvent> queue, boolean consolidateOnFlush) {
210 if (queue == null) {
211 throw new NullPointerException("queue");
212 }
213 this.queue = queue;
214 this.consolidateOnFlush = consolidateOnFlush;
215 }
216
217 public boolean isConsolidateOnFlush() {
218 return consolidateOnFlush;
219 }
220
221 /**
222 * Returns the queue which stores the write requests. The default
223 * implementation returns the queue which was specified in the constructor.
224 */
225 protected Queue<MessageEvent> getQueue() {
226 return queue;
227 }
228
229 /**
230 * Sends the queued write requests to the downstream.
231 */
232 public void flush() {
233 flush(consolidateOnFlush);
234 }
235
236 /**
237 * Sends the queued write requests to the downstream.
238 *
239 * @param consolidateOnFlush
240 * {@code true} if and only if the buffered write requests are merged
241 * into a single write request
242 */
243 public void flush(boolean consolidateOnFlush) {
244 final ChannelHandlerContext ctx = this.ctx;
245 if (ctx == null) {
246 // No write request was made.
247 return;
248 }
249 Channel channel = ctx.getChannel();
250 boolean acquired;
251
252 // use CAS to see if the have flush already running, if so we don't need to take further actions
253 if (acquired = flush.compareAndSet(false, true)) {
254 final Queue<MessageEvent> queue = getQueue();
255 if (consolidateOnFlush) {
256 if (queue.isEmpty()) {
257 flush.set(false);
258 return;
259 }
260
261 List<MessageEvent> pendingWrites = new ArrayList<MessageEvent>();
262 for (;;) {
263 MessageEvent e = queue.poll();
264 if (e == null) {
265 break;
266 }
267 if (!(e.getMessage() instanceof ChannelBuffer)) {
268 if ((pendingWrites = consolidatedWrite(pendingWrites)) == null) {
269 pendingWrites = new ArrayList<MessageEvent>();
270 }
271 ctx.sendDownstream(e);
272 } else {
273 pendingWrites.add(e);
274 }
275 }
276 consolidatedWrite(pendingWrites);
277 } else {
278 for (;;) {
279 MessageEvent e = queue.poll();
280 if (e == null) {
281 break;
282 }
283 ctx.sendDownstream(e);
284 }
285 }
286 flush.set(false);
287 }
288
289 if (acquired && (!channel.isConnected() || channel.isWritable() && !queue.isEmpty())) {
290 flush(consolidateOnFlush);
291 }
292 }
293
294 private List<MessageEvent> consolidatedWrite(final List<MessageEvent> pendingWrites) {
295 final int size = pendingWrites.size();
296 if (size == 1) {
297 ctx.sendDownstream(pendingWrites.remove(0));
298 return pendingWrites;
299 }
300 if (size == 0) {
301 return pendingWrites;
302 }
303
304 ChannelBuffer[] data = new ChannelBuffer[size];
305 for (int i = 0; i < data.length; i ++) {
306 data[i] = (ChannelBuffer) pendingWrites.get(i).getMessage();
307 }
308
309 ChannelBuffer composite = ChannelBuffers.wrappedBuffer(data);
310 ChannelFuture future = Channels.future(ctx.getChannel());
311 future.addListener(new ChannelFutureListener() {
312 public void operationComplete(ChannelFuture future)
313 throws Exception {
314 if (future.isSuccess()) {
315 for (MessageEvent e: pendingWrites) {
316 e.getFuture().setSuccess();
317 }
318 } else {
319 Throwable cause = future.getCause();
320 for (MessageEvent e: pendingWrites) {
321 e.getFuture().setFailure(cause);
322 }
323 }
324 }
325 });
326
327 Channels.write(ctx, future, composite);
328 return null;
329 }
330
331 /**
332 * Stores all write requests to the queue so that they are actually written
333 * on {@link #flush()}.
334 */
335 @Override
336 public void writeRequested(ChannelHandlerContext ctx, MessageEvent e)
337 throws Exception {
338 if (this.ctx == null) {
339 this.ctx = ctx;
340 } else {
341 assert this.ctx == ctx;
342 }
343
344 getQueue().add(e);
345 }
346
347 @Override
348 public void disconnectRequested(ChannelHandlerContext ctx,
349 ChannelStateEvent e) throws Exception {
350 try {
351 flush(consolidateOnFlush);
352 } finally {
353 ctx.sendDownstream(e);
354 }
355 }
356
357 @Override
358 public void closeRequested(ChannelHandlerContext ctx, ChannelStateEvent e)
359 throws Exception {
360 try {
361 flush(consolidateOnFlush);
362 } finally {
363 ctx.sendDownstream(e);
364 }
365 }
366
367 /**
368 * Fail all buffered writes that are left. See
369 * <a href="https://github.com/netty/netty/issues/308>#308</a> for more details.
370 */
371 @Override
372 public void channelClosed(ChannelHandlerContext ctx, ChannelStateEvent e) throws Exception {
373 Throwable cause = null;
374 for (;;) {
375 MessageEvent ev = queue.poll();
376
377 if (ev == null) {
378 break;
379 }
380
381 if (cause == null) {
382 cause = new ClosedChannelException();
383 }
384 ev.getFuture().setFailure(cause);
385 }
386 if (cause != null) {
387 Channels.fireExceptionCaught(ctx.getChannel(), cause);
388 }
389
390 super.channelClosed(ctx, e);
391 }
392
393 public void beforeAdd(ChannelHandlerContext ctx) throws Exception {
394 // Nothing to do
395 }
396
397 public void afterAdd(ChannelHandlerContext ctx) throws Exception {
398 // Nothing to do
399 }
400
401 public void beforeRemove(ChannelHandlerContext ctx) throws Exception {
402 // flush a last time before remove the handler
403 flush(consolidateOnFlush);
404 }
405
406 /**
407 * Fail all buffered writes that are left.
408 * See <a href="https://github.com/netty/netty/issues/308>#308</a> for more details.
409 */
410 public void afterRemove(ChannelHandlerContext ctx) throws Exception {
411 Throwable cause = null;
412 for (;;) {
413 MessageEvent ev = queue.poll();
414
415 if (ev == null) {
416 break;
417 }
418
419 if (cause == null) {
420 cause = new IOException("Unable to flush message");
421 }
422 ev.getFuture().setFailure(cause);
423 }
424
425 if (cause != null) {
426 Channels.fireExceptionCaughtLater(ctx.getChannel(), cause);
427 }
428 }
429 }