1 /*
2 * Copyright 2012 The Netty Project
3 *
4 * The Netty Project licenses this file to you under the Apache License,
5 * version 2.0 (the "License"); you may not use this file except in compliance
6 * with the License. You may obtain a copy of the License at:
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12 * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13 * License for the specific language governing permissions and limitations
14 * under the License.
15 */
16 package io.netty.handler.timeout;
17
18 import io.netty.bootstrap.ServerBootstrap;
19 import io.netty.channel.Channel;
20 import io.netty.channel.Channel.Unsafe;
21 import io.netty.channel.ChannelDuplexHandler;
22 import io.netty.channel.ChannelFuture;
23 import io.netty.channel.ChannelFutureListener;
24 import io.netty.channel.ChannelHandlerContext;
25 import io.netty.channel.ChannelInitializer;
26 import io.netty.channel.ChannelOutboundBuffer;
27 import io.netty.channel.ChannelPromise;
28
29 import java.util.concurrent.ScheduledFuture;
30 import java.util.concurrent.TimeUnit;
31
32 /**
33 * Triggers an {@link IdleStateEvent} when a {@link Channel} has not performed
34 * read, write, or both operation for a while.
35 *
36 * <h3>Supported idle states</h3>
37 * <table border="1">
38 * <tr>
39 * <th>Property</th><th>Meaning</th>
40 * </tr>
41 * <tr>
42 * <td>{@code readerIdleTime}</td>
43 * <td>an {@link IdleStateEvent} whose state is {@link IdleState#READER_IDLE}
44 * will be triggered when no read was performed for the specified period of
45 * time. Specify {@code 0} to disable.</td>
46 * </tr>
47 * <tr>
48 * <td>{@code writerIdleTime}</td>
49 * <td>an {@link IdleStateEvent} whose state is {@link IdleState#WRITER_IDLE}
50 * will be triggered when no write was performed for the specified period of
51 * time. Specify {@code 0} to disable.</td>
52 * </tr>
53 * <tr>
54 * <td>{@code allIdleTime}</td>
55 * <td>an {@link IdleStateEvent} whose state is {@link IdleState#ALL_IDLE}
56 * will be triggered when neither read nor write was performed for the
57 * specified period of time. Specify {@code 0} to disable.</td>
58 * </tr>
59 * </table>
60 *
61 * <pre>
62 * // An example that sends a ping message when there is no outbound traffic
63 * // for 30 seconds. The connection is closed when there is no inbound traffic
64 * // for 60 seconds.
65 *
66 * public class MyChannelInitializer extends {@link ChannelInitializer}<{@link Channel}> {
67 * {@code @Override}
68 * public void initChannel({@link Channel} channel) {
69 * channel.pipeline().addLast("idleStateHandler", new {@link IdleStateHandler}(60, 30, 0));
70 * channel.pipeline().addLast("myHandler", new MyHandler());
71 * }
72 * }
73 *
74 * // Handler should handle the {@link IdleStateEvent} triggered by {@link IdleStateHandler}.
75 * public class MyHandler extends {@link ChannelDuplexHandler} {
76 * {@code @Override}
77 * public void userEventTriggered({@link ChannelHandlerContext} ctx, {@link Object} evt) throws {@link Exception} {
78 * if (evt instanceof {@link IdleStateEvent}) {
79 * {@link IdleStateEvent} e = ({@link IdleStateEvent}) evt;
80 * if (e.state() == {@link IdleState}.READER_IDLE) {
81 * ctx.close();
82 * } else if (e.state() == {@link IdleState}.WRITER_IDLE) {
83 * ctx.writeAndFlush(new PingMessage());
84 * }
85 * }
86 * }
87 * }
88 *
89 * {@link ServerBootstrap} bootstrap = ...;
90 * ...
91 * bootstrap.childHandler(new MyChannelInitializer());
92 * ...
93 * </pre>
94 *
95 * @see ReadTimeoutHandler
96 * @see WriteTimeoutHandler
97 */
98 public class IdleStateHandler extends ChannelDuplexHandler {
99 private static final long MIN_TIMEOUT_NANOS = TimeUnit.MILLISECONDS.toNanos(1);
100
101 // Not create a new ChannelFutureListener per write operation to reduce GC pressure.
102 private final ChannelFutureListener writeListener = new ChannelFutureListener() {
103 @Override
104 public void operationComplete(ChannelFuture future) throws Exception {
105 lastWriteTime = ticksInNanos();
106 firstWriterIdleEvent = firstAllIdleEvent = true;
107 }
108 };
109
110 private final boolean observeOutput;
111 private final long readerIdleTimeNanos;
112 private final long writerIdleTimeNanos;
113 private final long allIdleTimeNanos;
114
115 private ScheduledFuture<?> readerIdleTimeout;
116 private long lastReadTime;
117 private boolean firstReaderIdleEvent = true;
118
119 private ScheduledFuture<?> writerIdleTimeout;
120 private long lastWriteTime;
121 private boolean firstWriterIdleEvent = true;
122
123 private ScheduledFuture<?> allIdleTimeout;
124 private boolean firstAllIdleEvent = true;
125
126 private byte state; // 0 - none, 1 - initialized, 2 - destroyed
127 private boolean reading;
128
129 private long lastChangeCheckTimeStamp;
130 private int lastMessageHashCode;
131 private long lastPendingWriteBytes;
132
133 /**
134 * Creates a new instance firing {@link IdleStateEvent}s.
135 *
136 * @param readerIdleTimeSeconds
137 * an {@link IdleStateEvent} whose state is {@link IdleState#READER_IDLE}
138 * will be triggered when no read was performed for the specified
139 * period of time. Specify {@code 0} to disable.
140 * @param writerIdleTimeSeconds
141 * an {@link IdleStateEvent} whose state is {@link IdleState#WRITER_IDLE}
142 * will be triggered when no write was performed for the specified
143 * period of time. Specify {@code 0} to disable.
144 * @param allIdleTimeSeconds
145 * an {@link IdleStateEvent} whose state is {@link IdleState#ALL_IDLE}
146 * will be triggered when neither read nor write was performed for
147 * the specified period of time. Specify {@code 0} to disable.
148 */
149 public IdleStateHandler(
150 int readerIdleTimeSeconds,
151 int writerIdleTimeSeconds,
152 int allIdleTimeSeconds) {
153
154 this(readerIdleTimeSeconds, writerIdleTimeSeconds, allIdleTimeSeconds,
155 TimeUnit.SECONDS);
156 }
157
158 /**
159 * @see #IdleStateHandler(boolean, long, long, long, TimeUnit)
160 */
161 public IdleStateHandler(
162 long readerIdleTime, long writerIdleTime, long allIdleTime,
163 TimeUnit unit) {
164 this(false, readerIdleTime, writerIdleTime, allIdleTime, unit);
165 }
166
167 /**
168 * Creates a new instance firing {@link IdleStateEvent}s.
169 *
170 * @param observeOutput
171 * whether or not the consumption of {@code bytes} should be taken into
172 * consideration when assessing write idleness. The default is {@code false}.
173 * @param readerIdleTime
174 * an {@link IdleStateEvent} whose state is {@link IdleState#READER_IDLE}
175 * will be triggered when no read was performed for the specified
176 * period of time. Specify {@code 0} to disable.
177 * @param writerIdleTime
178 * an {@link IdleStateEvent} whose state is {@link IdleState#WRITER_IDLE}
179 * will be triggered when no write was performed for the specified
180 * period of time. Specify {@code 0} to disable.
181 * @param allIdleTime
182 * an {@link IdleStateEvent} whose state is {@link IdleState#ALL_IDLE}
183 * will be triggered when neither read nor write was performed for
184 * the specified period of time. Specify {@code 0} to disable.
185 * @param unit
186 * the {@link TimeUnit} of {@code readerIdleTime},
187 * {@code writeIdleTime}, and {@code allIdleTime}
188 */
189 public IdleStateHandler(boolean observeOutput,
190 long readerIdleTime, long writerIdleTime, long allIdleTime,
191 TimeUnit unit) {
192 if (unit == null) {
193 throw new NullPointerException("unit");
194 }
195
196 this.observeOutput = observeOutput;
197
198 if (readerIdleTime <= 0) {
199 readerIdleTimeNanos = 0;
200 } else {
201 readerIdleTimeNanos = Math.max(unit.toNanos(readerIdleTime), MIN_TIMEOUT_NANOS);
202 }
203 if (writerIdleTime <= 0) {
204 writerIdleTimeNanos = 0;
205 } else {
206 writerIdleTimeNanos = Math.max(unit.toNanos(writerIdleTime), MIN_TIMEOUT_NANOS);
207 }
208 if (allIdleTime <= 0) {
209 allIdleTimeNanos = 0;
210 } else {
211 allIdleTimeNanos = Math.max(unit.toNanos(allIdleTime), MIN_TIMEOUT_NANOS);
212 }
213 }
214
215 /**
216 * Return the readerIdleTime that was given when instance this class in milliseconds.
217 *
218 */
219 public long getReaderIdleTimeInMillis() {
220 return TimeUnit.NANOSECONDS.toMillis(readerIdleTimeNanos);
221 }
222
223 /**
224 * Return the writerIdleTime that was given when instance this class in milliseconds.
225 *
226 */
227 public long getWriterIdleTimeInMillis() {
228 return TimeUnit.NANOSECONDS.toMillis(writerIdleTimeNanos);
229 }
230
231 /**
232 * Return the allIdleTime that was given when instance this class in milliseconds.
233 *
234 */
235 public long getAllIdleTimeInMillis() {
236 return TimeUnit.NANOSECONDS.toMillis(allIdleTimeNanos);
237 }
238
239 @Override
240 public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
241 if (ctx.channel().isActive() && ctx.channel().isRegistered()) {
242 // channelActive() event has been fired already, which means this.channelActive() will
243 // not be invoked. We have to initialize here instead.
244 initialize(ctx);
245 } else {
246 // channelActive() event has not been fired yet. this.channelActive() will be invoked
247 // and initialization will occur there.
248 }
249 }
250
251 @Override
252 public void handlerRemoved(ChannelHandlerContext ctx) throws Exception {
253 destroy();
254 }
255
256 @Override
257 public void channelRegistered(ChannelHandlerContext ctx) throws Exception {
258 // Initialize early if channel is active already.
259 if (ctx.channel().isActive()) {
260 initialize(ctx);
261 }
262 super.channelRegistered(ctx);
263 }
264
265 @Override
266 public void channelActive(ChannelHandlerContext ctx) throws Exception {
267 // This method will be invoked only if this handler was added
268 // before channelActive() event is fired. If a user adds this handler
269 // after the channelActive() event, initialize() will be called by beforeAdd().
270 initialize(ctx);
271 super.channelActive(ctx);
272 }
273
274 @Override
275 public void channelInactive(ChannelHandlerContext ctx) throws Exception {
276 destroy();
277 super.channelInactive(ctx);
278 }
279
280 @Override
281 public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
282 if (readerIdleTimeNanos > 0 || allIdleTimeNanos > 0) {
283 reading = true;
284 firstReaderIdleEvent = firstAllIdleEvent = true;
285 }
286 ctx.fireChannelRead(msg);
287 }
288
289 @Override
290 public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
291 if ((readerIdleTimeNanos > 0 || allIdleTimeNanos > 0) && reading) {
292 lastReadTime = ticksInNanos();
293 reading = false;
294 }
295 ctx.fireChannelReadComplete();
296 }
297
298 @Override
299 public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
300 // Allow writing with void promise if handler is only configured for read timeout events.
301 if (writerIdleTimeNanos > 0 || allIdleTimeNanos > 0) {
302 ctx.write(msg, promise).addListener(writeListener);
303 } else {
304 ctx.write(msg, promise);
305 }
306 }
307
308 private void initialize(ChannelHandlerContext ctx) {
309 // Avoid the case where destroy() is called before scheduling timeouts.
310 // See: https://github.com/netty/netty/issues/143
311 switch (state) {
312 case 1:
313 case 2:
314 return;
315 }
316
317 state = 1;
318 initOutputChanged(ctx);
319
320 lastReadTime = lastWriteTime = ticksInNanos();
321 if (readerIdleTimeNanos > 0) {
322 readerIdleTimeout = schedule(ctx, new ReaderIdleTimeoutTask(ctx),
323 readerIdleTimeNanos, TimeUnit.NANOSECONDS);
324 }
325 if (writerIdleTimeNanos > 0) {
326 writerIdleTimeout = schedule(ctx, new WriterIdleTimeoutTask(ctx),
327 writerIdleTimeNanos, TimeUnit.NANOSECONDS);
328 }
329 if (allIdleTimeNanos > 0) {
330 allIdleTimeout = schedule(ctx, new AllIdleTimeoutTask(ctx),
331 allIdleTimeNanos, TimeUnit.NANOSECONDS);
332 }
333 }
334
335 /**
336 * This method is visible for testing!
337 */
338 long ticksInNanos() {
339 return System.nanoTime();
340 }
341
342 /**
343 * This method is visible for testing!
344 */
345 ScheduledFuture<?> schedule(ChannelHandlerContext ctx, Runnable task, long delay, TimeUnit unit) {
346 return ctx.executor().schedule(task, delay, unit);
347 }
348
349 private void destroy() {
350 state = 2;
351
352 if (readerIdleTimeout != null) {
353 readerIdleTimeout.cancel(false);
354 readerIdleTimeout = null;
355 }
356 if (writerIdleTimeout != null) {
357 writerIdleTimeout.cancel(false);
358 writerIdleTimeout = null;
359 }
360 if (allIdleTimeout != null) {
361 allIdleTimeout.cancel(false);
362 allIdleTimeout = null;
363 }
364 }
365
366 /**
367 * Is called when an {@link IdleStateEvent} should be fired. This implementation calls
368 * {@link ChannelHandlerContext#fireUserEventTriggered(Object)}.
369 */
370 protected void channelIdle(ChannelHandlerContext ctx, IdleStateEvent evt) throws Exception {
371 ctx.fireUserEventTriggered(evt);
372 }
373
374 /**
375 * Returns a {@link IdleStateEvent}.
376 */
377 protected IdleStateEvent newIdleStateEvent(IdleState state, boolean first) {
378 switch (state) {
379 case ALL_IDLE:
380 return first ? IdleStateEvent.FIRST_ALL_IDLE_STATE_EVENT : IdleStateEvent.ALL_IDLE_STATE_EVENT;
381 case READER_IDLE:
382 return first ? IdleStateEvent.FIRST_READER_IDLE_STATE_EVENT : IdleStateEvent.READER_IDLE_STATE_EVENT;
383 case WRITER_IDLE:
384 return first ? IdleStateEvent.FIRST_WRITER_IDLE_STATE_EVENT : IdleStateEvent.WRITER_IDLE_STATE_EVENT;
385 default:
386 throw new IllegalArgumentException("Unhandled: state=" + state + ", first=" + first);
387 }
388 }
389
390 /**
391 * @see #hasOutputChanged(ChannelHandlerContext, boolean)
392 */
393 private void initOutputChanged(ChannelHandlerContext ctx) {
394 if (observeOutput) {
395 Channel channel = ctx.channel();
396 Unsafe unsafe = channel.unsafe();
397 ChannelOutboundBuffer buf = unsafe.outboundBuffer();
398
399 if (buf != null) {
400 lastMessageHashCode = System.identityHashCode(buf.current());
401 lastPendingWriteBytes = buf.totalPendingWriteBytes();
402 }
403 }
404 }
405
406 /**
407 * Returns {@code true} if and only if the {@link IdleStateHandler} was constructed
408 * with {@link #observeOutput} enabled and there has been an observed change in the
409 * {@link ChannelOutboundBuffer} between two consecutive calls of this method.
410 *
411 * https://github.com/netty/netty/issues/6150
412 */
413 private boolean hasOutputChanged(ChannelHandlerContext ctx, boolean first) {
414 if (observeOutput) {
415
416 // We can take this shortcut if the ChannelPromises that got passed into write()
417 // appear to complete. It indicates "change" on message level and we simply assume
418 // that there's change happening on byte level. If the user doesn't observe channel
419 // writability events then they'll eventually OOME and there's clearly a different
420 // problem and idleness is least of their concerns.
421 if (lastChangeCheckTimeStamp != lastWriteTime) {
422 lastChangeCheckTimeStamp = lastWriteTime;
423
424 // But this applies only if it's the non-first call.
425 if (!first) {
426 return true;
427 }
428 }
429
430 Channel channel = ctx.channel();
431 Unsafe unsafe = channel.unsafe();
432 ChannelOutboundBuffer buf = unsafe.outboundBuffer();
433
434 if (buf != null) {
435 int messageHashCode = System.identityHashCode(buf.current());
436 long pendingWriteBytes = buf.totalPendingWriteBytes();
437
438 if (messageHashCode != lastMessageHashCode || pendingWriteBytes != lastPendingWriteBytes) {
439 lastMessageHashCode = messageHashCode;
440 lastPendingWriteBytes = pendingWriteBytes;
441
442 if (!first) {
443 return true;
444 }
445 }
446 }
447 }
448
449 return false;
450 }
451
452 private abstract static class AbstractIdleTask implements Runnable {
453
454 private final ChannelHandlerContext ctx;
455
456 AbstractIdleTask(ChannelHandlerContext ctx) {
457 this.ctx = ctx;
458 }
459
460 @Override
461 public void run() {
462 if (!ctx.channel().isOpen()) {
463 return;
464 }
465
466 run(ctx);
467 }
468
469 protected abstract void run(ChannelHandlerContext ctx);
470 }
471
472 private final class ReaderIdleTimeoutTask extends AbstractIdleTask {
473
474 ReaderIdleTimeoutTask(ChannelHandlerContext ctx) {
475 super(ctx);
476 }
477
478 @Override
479 protected void run(ChannelHandlerContext ctx) {
480 long nextDelay = readerIdleTimeNanos;
481 if (!reading) {
482 nextDelay -= ticksInNanos() - lastReadTime;
483 }
484
485 if (nextDelay <= 0) {
486 // Reader is idle - set a new timeout and notify the callback.
487 readerIdleTimeout = schedule(ctx, this, readerIdleTimeNanos, TimeUnit.NANOSECONDS);
488
489 boolean first = firstReaderIdleEvent;
490 firstReaderIdleEvent = false;
491
492 try {
493 IdleStateEvent event = newIdleStateEvent(IdleState.READER_IDLE, first);
494 channelIdle(ctx, event);
495 } catch (Throwable t) {
496 ctx.fireExceptionCaught(t);
497 }
498 } else {
499 // Read occurred before the timeout - set a new timeout with shorter delay.
500 readerIdleTimeout = schedule(ctx, this, nextDelay, TimeUnit.NANOSECONDS);
501 }
502 }
503 }
504
505 private final class WriterIdleTimeoutTask extends AbstractIdleTask {
506
507 WriterIdleTimeoutTask(ChannelHandlerContext ctx) {
508 super(ctx);
509 }
510
511 @Override
512 protected void run(ChannelHandlerContext ctx) {
513
514 long lastWriteTime = IdleStateHandler.this.lastWriteTime;
515 long nextDelay = writerIdleTimeNanos - (ticksInNanos() - lastWriteTime);
516 if (nextDelay <= 0) {
517 // Writer is idle - set a new timeout and notify the callback.
518 writerIdleTimeout = schedule(ctx, this, writerIdleTimeNanos, TimeUnit.NANOSECONDS);
519
520 boolean first = firstWriterIdleEvent;
521 firstWriterIdleEvent = false;
522
523 try {
524 if (hasOutputChanged(ctx, first)) {
525 return;
526 }
527
528 IdleStateEvent event = newIdleStateEvent(IdleState.WRITER_IDLE, first);
529 channelIdle(ctx, event);
530 } catch (Throwable t) {
531 ctx.fireExceptionCaught(t);
532 }
533 } else {
534 // Write occurred before the timeout - set a new timeout with shorter delay.
535 writerIdleTimeout = schedule(ctx, this, nextDelay, TimeUnit.NANOSECONDS);
536 }
537 }
538 }
539
540 private final class AllIdleTimeoutTask extends AbstractIdleTask {
541
542 AllIdleTimeoutTask(ChannelHandlerContext ctx) {
543 super(ctx);
544 }
545
546 @Override
547 protected void run(ChannelHandlerContext ctx) {
548
549 long nextDelay = allIdleTimeNanos;
550 if (!reading) {
551 nextDelay -= ticksInNanos() - Math.max(lastReadTime, lastWriteTime);
552 }
553 if (nextDelay <= 0) {
554 // Both reader and writer are idle - set a new timeout and
555 // notify the callback.
556 allIdleTimeout = schedule(ctx, this, allIdleTimeNanos, TimeUnit.NANOSECONDS);
557
558 boolean first = firstAllIdleEvent;
559 firstAllIdleEvent = false;
560
561 try {
562 if (hasOutputChanged(ctx, first)) {
563 return;
564 }
565
566 IdleStateEvent event = newIdleStateEvent(IdleState.ALL_IDLE, first);
567 channelIdle(ctx, event);
568 } catch (Throwable t) {
569 ctx.fireExceptionCaught(t);
570 }
571 } else {
572 // Either read or write occurred before the timeout - set a new
573 // timeout with shorter delay.
574 allIdleTimeout = schedule(ctx, this, nextDelay, TimeUnit.NANOSECONDS);
575 }
576 }
577 }
578 }