1 /*
2 * Copyright 2012 The Netty Project
3 *
4 * The Netty Project licenses this file to you under the Apache License,
5 * version 2.0 (the "License"); you may not use this file except in compliance
6 * with the License. You may obtain a copy of the License at:
7 *
8 * https://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12 * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13 * License for the specific language governing permissions and limitations
14 * under the License.
15 */
16 package io.netty.handler.timeout;
17
18 import io.netty.bootstrap.ServerBootstrap;
19 import io.netty.channel.Channel;
20 import io.netty.channel.Channel.Unsafe;
21 import io.netty.channel.ChannelDuplexHandler;
22 import io.netty.channel.ChannelFuture;
23 import io.netty.channel.ChannelFutureListener;
24 import io.netty.channel.ChannelHandlerContext;
25 import io.netty.channel.ChannelInitializer;
26 import io.netty.channel.ChannelOutboundBuffer;
27 import io.netty.channel.ChannelPromise;
28 import io.netty.util.concurrent.Future;
29 import io.netty.util.internal.ObjectUtil;
30
31 import java.util.concurrent.TimeUnit;
32
33 /**
 * Triggers an {@link IdleStateEvent} when a {@link Channel} has not performed
 * a read, a write, or both for a while.
36 *
37 * <h3>Supported idle states</h3>
38 * <table border="1">
39 * <tr>
40 * <th>Property</th><th>Meaning</th>
41 * </tr>
42 * <tr>
43 * <td>{@code readerIdleTime}</td>
44 * <td>an {@link IdleStateEvent} whose state is {@link IdleState#READER_IDLE}
45 * will be triggered when no read was performed for the specified period of
46 * time. Specify {@code 0} to disable.</td>
47 * </tr>
48 * <tr>
49 * <td>{@code writerIdleTime}</td>
50 * <td>an {@link IdleStateEvent} whose state is {@link IdleState#WRITER_IDLE}
51 * will be triggered when no write was performed for the specified period of
52 * time. Specify {@code 0} to disable.</td>
53 * </tr>
54 * <tr>
55 * <td>{@code allIdleTime}</td>
56 * <td>an {@link IdleStateEvent} whose state is {@link IdleState#ALL_IDLE}
57 * will be triggered when neither read nor write was performed for the
58 * specified period of time. Specify {@code 0} to disable.</td>
59 * </tr>
60 * </table>
61 *
62 * <pre>
63 * // An example that sends a ping message when there is no outbound traffic
64 * // for 30 seconds. The connection is closed when there is no inbound traffic
65 * // for 60 seconds.
66 *
 * public class MyChannelInitializer extends {@link ChannelInitializer}&lt;{@link Channel}&gt; {
68 * {@code @Override}
69 * public void initChannel({@link Channel} channel) {
70 * channel.pipeline().addLast("idleStateHandler", new {@link IdleStateHandler}(60, 30, 0));
71 * channel.pipeline().addLast("myHandler", new MyHandler());
72 * }
73 * }
74 *
75 * // Handler should handle the {@link IdleStateEvent} triggered by {@link IdleStateHandler}.
76 * public class MyHandler extends {@link ChannelDuplexHandler} {
77 * {@code @Override}
78 * public void userEventTriggered({@link ChannelHandlerContext} ctx, {@link Object} evt) throws {@link Exception} {
79 * if (evt instanceof {@link IdleStateEvent}) {
80 * {@link IdleStateEvent} e = ({@link IdleStateEvent}) evt;
81 * if (e.state() == {@link IdleState}.READER_IDLE) {
82 * ctx.close();
83 * } else if (e.state() == {@link IdleState}.WRITER_IDLE) {
84 * ctx.writeAndFlush(new PingMessage());
85 * }
86 * }
87 * }
88 * }
89 *
90 * {@link ServerBootstrap} bootstrap = ...;
91 * ...
92 * bootstrap.childHandler(new MyChannelInitializer());
93 * ...
94 * </pre>
95 *
96 * @see ReadTimeoutHandler
97 * @see WriteTimeoutHandler
98 */
99 public class IdleStateHandler extends ChannelDuplexHandler {
100 private static final long MIN_TIMEOUT_NANOS = TimeUnit.MILLISECONDS.toNanos(1);
101
102 // Not create a new ChannelFutureListener per write operation to reduce GC pressure.
103 private final ChannelFutureListener writeListener = new ChannelFutureListener() {
104 @Override
105 public void operationComplete(ChannelFuture future) throws Exception {
106 lastWriteTime = ticksInNanos();
107 firstWriterIdleEvent = firstAllIdleEvent = true;
108 }
109 };
110
111 private final boolean observeOutput;
112 private final long readerIdleTimeNanos;
113 private final long writerIdleTimeNanos;
114 private final long allIdleTimeNanos;
115
116 private Future<?> readerIdleTimeout;
117 private long lastReadTime;
118 private boolean firstReaderIdleEvent = true;
119
120 private Future<?> writerIdleTimeout;
121 private long lastWriteTime;
122 private boolean firstWriterIdleEvent = true;
123
124 private Future<?> allIdleTimeout;
125 private boolean firstAllIdleEvent = true;
126
127 private byte state;
128 private static final byte ST_INITIALIZED = 1;
129 private static final byte ST_DESTROYED = 2;
130
131 private boolean reading;
132
133 private long lastChangeCheckTimeStamp;
134 private int lastMessageHashCode;
135 private long lastPendingWriteBytes;
136 private long lastFlushProgress;
137
/**
 * Creates a new instance firing {@link IdleStateEvent}s, with all idle times given in seconds.
 *
 * @param readerIdleTimeSeconds
 *        period without a read after which an {@link IdleStateEvent} whose state is
 *        {@link IdleState#READER_IDLE} is triggered. Specify {@code 0} to disable.
 * @param writerIdleTimeSeconds
 *        period without a write after which an {@link IdleStateEvent} whose state is
 *        {@link IdleState#WRITER_IDLE} is triggered. Specify {@code 0} to disable.
 * @param allIdleTimeSeconds
 *        period without a read or a write after which an {@link IdleStateEvent} whose
 *        state is {@link IdleState#ALL_IDLE} is triggered. Specify {@code 0} to disable.
 */
public IdleStateHandler(int readerIdleTimeSeconds, int writerIdleTimeSeconds, int allIdleTimeSeconds) {
    // Delegates to the TimeUnit-based constructor, interpreting every value as seconds.
    this(readerIdleTimeSeconds, writerIdleTimeSeconds, allIdleTimeSeconds, TimeUnit.SECONDS);
}
162
/**
 * Creates a new instance firing {@link IdleStateEvent}s with {@code observeOutput}
 * set to {@code false}.
 *
 * @see #IdleStateHandler(boolean, long, long, long, TimeUnit)
 */
public IdleStateHandler(long readerIdleTime, long writerIdleTime, long allIdleTime, TimeUnit unit) {
    this(false, readerIdleTime, writerIdleTime, allIdleTime, unit);
}
171
/**
 * Creates a new instance firing {@link IdleStateEvent}s.
 *
 * <p>Each idle time that is zero or negative disables the corresponding event. A positive
 * idle time is converted to nanoseconds and raised to at least one millisecond.
 *
 * @param observeOutput
 *        whether or not the consumption of {@code bytes} should be taken into
 *        consideration when assessing write idleness. The default is {@code false}.
 * @param readerIdleTime
 *        an {@link IdleStateEvent} whose state is {@link IdleState#READER_IDLE}
 *        will be triggered when no read was performed for the specified
 *        period of time. Specify {@code 0} to disable.
 * @param writerIdleTime
 *        an {@link IdleStateEvent} whose state is {@link IdleState#WRITER_IDLE}
 *        will be triggered when no write was performed for the specified
 *        period of time. Specify {@code 0} to disable.
 * @param allIdleTime
 *        an {@link IdleStateEvent} whose state is {@link IdleState#ALL_IDLE}
 *        will be triggered when neither read nor write was performed for
 *        the specified period of time. Specify {@code 0} to disable.
 * @param unit
 *        the {@link TimeUnit} of {@code readerIdleTime},
 *        {@code writerIdleTime}, and {@code allIdleTime}; must not be {@code null}
 */
public IdleStateHandler(boolean observeOutput,
                        long readerIdleTime, long writerIdleTime, long allIdleTime,
                        TimeUnit unit) {
    ObjectUtil.checkNotNull(unit, "unit");

    this.observeOutput = observeOutput;

    // 0 marks a timer as disabled; enabled timers never go below MIN_TIMEOUT_NANOS.
    readerIdleTimeNanos = readerIdleTime > 0
            ? Math.max(unit.toNanos(readerIdleTime), MIN_TIMEOUT_NANOS)
            : 0;
    writerIdleTimeNanos = writerIdleTime > 0
            ? Math.max(unit.toNanos(writerIdleTime), MIN_TIMEOUT_NANOS)
            : 0;
    allIdleTimeNanos = allIdleTime > 0
            ? Math.max(unit.toNanos(allIdleTime), MIN_TIMEOUT_NANOS)
            : 0;
}
217
218 /**
219 * Return the readerIdleTime that was given when instance this class in milliseconds.
220 *
221 */
222 public long getReaderIdleTimeInMillis() {
223 return TimeUnit.NANOSECONDS.toMillis(readerIdleTimeNanos);
224 }
225
226 /**
227 * Return the writerIdleTime that was given when instance this class in milliseconds.
228 *
229 */
230 public long getWriterIdleTimeInMillis() {
231 return TimeUnit.NANOSECONDS.toMillis(writerIdleTimeNanos);
232 }
233
234 /**
235 * Return the allIdleTime that was given when instance this class in milliseconds.
236 *
237 */
238 public long getAllIdleTimeInMillis() {
239 return TimeUnit.NANOSECONDS.toMillis(allIdleTimeNanos);
240 }
241
242 @Override
243 public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
244 if (ctx.channel().isActive() && ctx.channel().isRegistered()) {
245 // channelActive() event has been fired already, which means this.channelActive() will
246 // not be invoked. We have to initialize here instead.
247 initialize(ctx);
248 } else {
249 // channelActive() event has not been fired yet. this.channelActive() will be invoked
250 // and initialization will occur there.
251 }
252 }
253
@Override
public void handlerRemoved(ChannelHandlerContext ctx) throws Exception {
    // Mark this handler as destroyed and cancel any idle-timeout tasks that are still scheduled.
    destroy();
}
258
259 @Override
260 public void channelRegistered(ChannelHandlerContext ctx) throws Exception {
261 // Initialize early if channel is active already.
262 if (ctx.channel().isActive()) {
263 initialize(ctx);
264 }
265 super.channelRegistered(ctx);
266 }
267
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
    // This method will be invoked only if this handler was added
    // before channelActive() event is fired. If a user adds this handler
    // after the channelActive() event, initialize() will be called by handlerAdded().
    initialize(ctx);
    super.channelActive(ctx);
}
276
@Override
public void channelInactive(ChannelHandlerContext ctx) throws Exception {
    // Cancel the scheduled idle-timeout tasks before passing the event on.
    destroy();
    super.channelInactive(ctx);
}
282
283 @Override
284 public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
285 if (readerIdleTimeNanos > 0 || allIdleTimeNanos > 0) {
286 reading = true;
287 firstReaderIdleEvent = firstAllIdleEvent = true;
288 }
289 ctx.fireChannelRead(msg);
290 }
291
292 @Override
293 public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
294 if ((readerIdleTimeNanos > 0 || allIdleTimeNanos > 0) && reading) {
295 lastReadTime = ticksInNanos();
296 reading = false;
297 }
298 ctx.fireChannelReadComplete();
299 }
300
301 @Override
302 public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
303 // Allow writing with void promise if handler is only configured for read timeout events.
304 if (writerIdleTimeNanos > 0 || allIdleTimeNanos > 0) {
305 ctx.write(msg, promise.unvoid()).addListener(writeListener);
306 } else {
307 ctx.write(msg, promise);
308 }
309 }
310
311 /**
312 * Reset the read timeout. As this handler is not thread-safe, this method <b>must</b> be called on the event loop.
313 */
314 public void resetReadTimeout() {
315 if (readerIdleTimeNanos > 0 || allIdleTimeNanos > 0) {
316 lastReadTime = ticksInNanos();
317 reading = false;
318 }
319 }
320
321 /**
322 * Reset the write timeout. As this handler is not thread-safe, this method <b>must</b> be called on the event loop.
323 */
324 public void resetWriteTimeout() {
325 if (writerIdleTimeNanos > 0 || allIdleTimeNanos > 0) {
326 lastWriteTime = ticksInNanos();
327 }
328 }
329
330 private void initialize(ChannelHandlerContext ctx) {
331 // Avoid the case where destroy() is called before scheduling timeouts.
332 // See: https://github.com/netty/netty/issues/143
333 switch (state) {
334 case 1:
335 case 2:
336 return;
337 default:
338 break;
339 }
340
341 state = ST_INITIALIZED;
342 initOutputChanged(ctx);
343
344 lastReadTime = lastWriteTime = ticksInNanos();
345 if (readerIdleTimeNanos > 0) {
346 readerIdleTimeout = schedule(ctx, new ReaderIdleTimeoutTask(ctx),
347 readerIdleTimeNanos, TimeUnit.NANOSECONDS);
348 }
349 if (writerIdleTimeNanos > 0) {
350 writerIdleTimeout = schedule(ctx, new WriterIdleTimeoutTask(ctx),
351 writerIdleTimeNanos, TimeUnit.NANOSECONDS);
352 }
353 if (allIdleTimeNanos > 0) {
354 allIdleTimeout = schedule(ctx, new AllIdleTimeoutTask(ctx),
355 allIdleTimeNanos, TimeUnit.NANOSECONDS);
356 }
357 }
358
/**
 * Returns the current value of {@link System#nanoTime()}. The timestamps recorded by this
 * handler are obtained through this method.
 *
 * <p>This method is visible for testing!
 */
long ticksInNanos() {
    return System.nanoTime();
}
365
366 /**
367 * This method is visible for testing!
368 */
369 Future<?> schedule(ChannelHandlerContext ctx, Runnable task, long delay, TimeUnit unit) {
370 return ctx.executor().schedule(task, delay, unit);
371 }
372
373 private void destroy() {
374 state = ST_DESTROYED;
375
376 if (readerIdleTimeout != null) {
377 readerIdleTimeout.cancel(false);
378 readerIdleTimeout = null;
379 }
380 if (writerIdleTimeout != null) {
381 writerIdleTimeout.cancel(false);
382 writerIdleTimeout = null;
383 }
384 if (allIdleTimeout != null) {
385 allIdleTimeout.cancel(false);
386 allIdleTimeout = null;
387 }
388 }
389
/**
 * Is called when an {@link IdleStateEvent} should be fired. This implementation calls
 * {@link ChannelHandlerContext#fireUserEventTriggered(Object)}.
 *
 * @param ctx the context on which the event is fired
 * @param evt the idle event to fire
 * @throws Exception declared so that overriding implementations may throw; the idle tasks
 *                   of this handler pass anything thrown to
 *                   {@link ChannelHandlerContext#fireExceptionCaught(Throwable)}
 */
protected void channelIdle(ChannelHandlerContext ctx, IdleStateEvent evt) throws Exception {
    ctx.fireUserEventTriggered(evt);
}
397
398 /**
399 * Returns a {@link IdleStateEvent}.
400 */
401 protected IdleStateEvent newIdleStateEvent(IdleState state, boolean first) {
402 switch (state) {
403 case ALL_IDLE:
404 return first ? IdleStateEvent.FIRST_ALL_IDLE_STATE_EVENT : IdleStateEvent.ALL_IDLE_STATE_EVENT;
405 case READER_IDLE:
406 return first ? IdleStateEvent.FIRST_READER_IDLE_STATE_EVENT : IdleStateEvent.READER_IDLE_STATE_EVENT;
407 case WRITER_IDLE:
408 return first ? IdleStateEvent.FIRST_WRITER_IDLE_STATE_EVENT : IdleStateEvent.WRITER_IDLE_STATE_EVENT;
409 default:
410 throw new IllegalArgumentException("Unhandled: state=" + state + ", first=" + first);
411 }
412 }
413
414 /**
415 * @see #hasOutputChanged(ChannelHandlerContext, boolean)
416 */
417 private void initOutputChanged(ChannelHandlerContext ctx) {
418 if (observeOutput) {
419 Channel channel = ctx.channel();
420 Unsafe unsafe = channel.unsafe();
421 ChannelOutboundBuffer buf = unsafe.outboundBuffer();
422
423 if (buf != null) {
424 lastMessageHashCode = System.identityHashCode(buf.current());
425 lastPendingWriteBytes = buf.totalPendingWriteBytes();
426 lastFlushProgress = buf.currentProgress();
427 }
428 }
429 }
430
431 /**
432 * Returns {@code true} if and only if the {@link IdleStateHandler} was constructed
433 * with {@link #observeOutput} enabled and there has been an observed change in the
434 * {@link ChannelOutboundBuffer} between two consecutive calls of this method.
435 *
436 * https://github.com/netty/netty/issues/6150
437 */
438 private boolean hasOutputChanged(ChannelHandlerContext ctx, boolean first) {
439 if (observeOutput) {
440
441 // We can take this shortcut if the ChannelPromises that got passed into write()
442 // appear to complete. It indicates "change" on message level and we simply assume
443 // that there's change happening on byte level. If the user doesn't observe channel
444 // writability events then they'll eventually OOME and there's clearly a different
445 // problem and idleness is least of their concerns.
446 if (lastChangeCheckTimeStamp != lastWriteTime) {
447 lastChangeCheckTimeStamp = lastWriteTime;
448
449 // But this applies only if it's the non-first call.
450 if (!first) {
451 return true;
452 }
453 }
454
455 Channel channel = ctx.channel();
456 Unsafe unsafe = channel.unsafe();
457 ChannelOutboundBuffer buf = unsafe.outboundBuffer();
458
459 if (buf != null) {
460 int messageHashCode = System.identityHashCode(buf.current());
461 long pendingWriteBytes = buf.totalPendingWriteBytes();
462
463 if (messageHashCode != lastMessageHashCode || pendingWriteBytes != lastPendingWriteBytes) {
464 lastMessageHashCode = messageHashCode;
465 lastPendingWriteBytes = pendingWriteBytes;
466
467 if (!first) {
468 return true;
469 }
470 }
471
472 long flushProgress = buf.currentProgress();
473 if (flushProgress != lastFlushProgress) {
474 lastFlushProgress = flushProgress;
475 return !first;
476 }
477 }
478 }
479
480 return false;
481 }
482
483 private abstract static class AbstractIdleTask implements Runnable {
484
485 private final ChannelHandlerContext ctx;
486
487 AbstractIdleTask(ChannelHandlerContext ctx) {
488 this.ctx = ctx;
489 }
490
491 @Override
492 public void run() {
493 if (!ctx.channel().isOpen()) {
494 return;
495 }
496
497 run(ctx);
498 }
499
500 protected abstract void run(ChannelHandlerContext ctx);
501 }
502
503 private final class ReaderIdleTimeoutTask extends AbstractIdleTask {
504
505 ReaderIdleTimeoutTask(ChannelHandlerContext ctx) {
506 super(ctx);
507 }
508
509 @Override
510 protected void run(ChannelHandlerContext ctx) {
511 long nextDelay = readerIdleTimeNanos;
512 if (!reading) {
513 nextDelay -= ticksInNanos() - lastReadTime;
514 }
515
516 if (nextDelay <= 0) {
517 // Reader is idle - set a new timeout and notify the callback.
518 readerIdleTimeout = schedule(ctx, this, readerIdleTimeNanos, TimeUnit.NANOSECONDS);
519
520 boolean first = firstReaderIdleEvent;
521 firstReaderIdleEvent = false;
522
523 try {
524 IdleStateEvent event = newIdleStateEvent(IdleState.READER_IDLE, first);
525 channelIdle(ctx, event);
526 } catch (Throwable t) {
527 ctx.fireExceptionCaught(t);
528 }
529 } else {
530 // Read occurred before the timeout - set a new timeout with shorter delay.
531 readerIdleTimeout = schedule(ctx, this, nextDelay, TimeUnit.NANOSECONDS);
532 }
533 }
534 }
535
536 private final class WriterIdleTimeoutTask extends AbstractIdleTask {
537
538 WriterIdleTimeoutTask(ChannelHandlerContext ctx) {
539 super(ctx);
540 }
541
542 @Override
543 protected void run(ChannelHandlerContext ctx) {
544
545 long lastWriteTime = IdleStateHandler.this.lastWriteTime;
546 long nextDelay = writerIdleTimeNanos - (ticksInNanos() - lastWriteTime);
547 if (nextDelay <= 0) {
548 // Writer is idle - set a new timeout and notify the callback.
549 writerIdleTimeout = schedule(ctx, this, writerIdleTimeNanos, TimeUnit.NANOSECONDS);
550
551 boolean first = firstWriterIdleEvent;
552 firstWriterIdleEvent = false;
553
554 try {
555 if (hasOutputChanged(ctx, first)) {
556 return;
557 }
558
559 IdleStateEvent event = newIdleStateEvent(IdleState.WRITER_IDLE, first);
560 channelIdle(ctx, event);
561 } catch (Throwable t) {
562 ctx.fireExceptionCaught(t);
563 }
564 } else {
565 // Write occurred before the timeout - set a new timeout with shorter delay.
566 writerIdleTimeout = schedule(ctx, this, nextDelay, TimeUnit.NANOSECONDS);
567 }
568 }
569 }
570
571 private final class AllIdleTimeoutTask extends AbstractIdleTask {
572
573 AllIdleTimeoutTask(ChannelHandlerContext ctx) {
574 super(ctx);
575 }
576
577 @Override
578 protected void run(ChannelHandlerContext ctx) {
579
580 long nextDelay = allIdleTimeNanos;
581 if (!reading) {
582 nextDelay -= ticksInNanos() - Math.max(lastReadTime, lastWriteTime);
583 }
584 if (nextDelay <= 0) {
585 // Both reader and writer are idle - set a new timeout and
586 // notify the callback.
587 allIdleTimeout = schedule(ctx, this, allIdleTimeNanos, TimeUnit.NANOSECONDS);
588
589 boolean first = firstAllIdleEvent;
590 firstAllIdleEvent = false;
591
592 try {
593 if (hasOutputChanged(ctx, first)) {
594 return;
595 }
596
597 IdleStateEvent event = newIdleStateEvent(IdleState.ALL_IDLE, first);
598 channelIdle(ctx, event);
599 } catch (Throwable t) {
600 ctx.fireExceptionCaught(t);
601 }
602 } else {
603 // Either read or write occurred before the timeout - set a new
604 // timeout with shorter delay.
605 allIdleTimeout = schedule(ctx, this, nextDelay, TimeUnit.NANOSECONDS);
606 }
607 }
608 }
609 }