查看本类的 API文档回源码主页即时通讯网 - 即时通讯开发者社区!
1   /*
2    * Copyright 2012 The Netty Project
3    *
4    * The Netty Project licenses this file to you under the Apache License,
5    * version 2.0 (the "License"); you may not use this file except in compliance
6    * with the License. You may obtain a copy of the License at:
7    *
8    *   https://www.apache.org/licenses/LICENSE-2.0
9    *
10   * Unless required by applicable law or agreed to in writing, software
11   * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12   * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13   * License for the specific language governing permissions and limitations
14   * under the License.
15   */
16  package io.netty.handler.timeout;
17  
18  import io.netty.bootstrap.ServerBootstrap;
19  import io.netty.channel.Channel;
20  import io.netty.channel.Channel.Unsafe;
21  import io.netty.channel.ChannelDuplexHandler;
22  import io.netty.channel.ChannelFuture;
23  import io.netty.channel.ChannelFutureListener;
24  import io.netty.channel.ChannelHandlerContext;
25  import io.netty.channel.ChannelInitializer;
26  import io.netty.channel.ChannelOutboundBuffer;
27  import io.netty.channel.ChannelPromise;
28  import io.netty.util.concurrent.Future;
29  import io.netty.util.internal.ObjectUtil;
30  
31  import java.util.concurrent.TimeUnit;
32  
33  /**
34   * Triggers an {@link IdleStateEvent} when a {@link Channel} has not performed
35   * read, write, or both operation for a while.
36   *
37   * <h3>Supported idle states</h3>
38   * <table border="1">
39   * <tr>
40   * <th>Property</th><th>Meaning</th>
41   * </tr>
42   * <tr>
43   * <td>{@code readerIdleTime}</td>
44   * <td>an {@link IdleStateEvent} whose state is {@link IdleState#READER_IDLE}
45   *     will be triggered when no read was performed for the specified period of
46   *     time.  Specify {@code 0} to disable.</td>
47   * </tr>
48   * <tr>
49   * <td>{@code writerIdleTime}</td>
50   * <td>an {@link IdleStateEvent} whose state is {@link IdleState#WRITER_IDLE}
51   *     will be triggered when no write was performed for the specified period of
52   *     time.  Specify {@code 0} to disable.</td>
53   * </tr>
54   * <tr>
55   * <td>{@code allIdleTime}</td>
56   * <td>an {@link IdleStateEvent} whose state is {@link IdleState#ALL_IDLE}
57   *     will be triggered when neither read nor write was performed for the
58   *     specified period of time.  Specify {@code 0} to disable.</td>
59   * </tr>
60   * </table>
61   *
62   * <pre>
63   * // An example that sends a ping message when there is no outbound traffic
64   * // for 30 seconds.  The connection is closed when there is no inbound traffic
65   * // for 60 seconds.
66   *
67   * public class MyChannelInitializer extends {@link ChannelInitializer}&lt;{@link Channel}&gt; {
68   *     {@code @Override}
69   *     public void initChannel({@link Channel} channel) {
70   *         channel.pipeline().addLast("idleStateHandler", new {@link IdleStateHandler}(60, 30, 0));
71   *         channel.pipeline().addLast("myHandler", new MyHandler());
72   *     }
73   * }
74   *
75   * // Handler should handle the {@link IdleStateEvent} triggered by {@link IdleStateHandler}.
76   * public class MyHandler extends {@link ChannelDuplexHandler} {
77   *     {@code @Override}
78   *     public void userEventTriggered({@link ChannelHandlerContext} ctx, {@link Object} evt) throws {@link Exception} {
79   *         if (evt instanceof {@link IdleStateEvent}) {
80   *             {@link IdleStateEvent} e = ({@link IdleStateEvent}) evt;
81   *             if (e.state() == {@link IdleState}.READER_IDLE) {
82   *                 ctx.close();
83   *             } else if (e.state() == {@link IdleState}.WRITER_IDLE) {
84   *                 ctx.writeAndFlush(new PingMessage());
85   *             }
86   *         }
87   *     }
88   * }
89   *
90   * {@link ServerBootstrap} bootstrap = ...;
91   * ...
92   * bootstrap.childHandler(new MyChannelInitializer());
93   * ...
94   * </pre>
95   *
96   * @see ReadTimeoutHandler
97   * @see WriteTimeoutHandler
98   */
99  public class IdleStateHandler extends ChannelDuplexHandler {
100     private static final long MIN_TIMEOUT_NANOS = TimeUnit.MILLISECONDS.toNanos(1);
101 
102     // Not create a new ChannelFutureListener per write operation to reduce GC pressure.
103     private final ChannelFutureListener writeListener = new ChannelFutureListener() {
104         @Override
105         public void operationComplete(ChannelFuture future) throws Exception {
106             lastWriteTime = ticksInNanos();
107             firstWriterIdleEvent = firstAllIdleEvent = true;
108         }
109     };
110 
111     private final boolean observeOutput;
112     private final long readerIdleTimeNanos;
113     private final long writerIdleTimeNanos;
114     private final long allIdleTimeNanos;
115 
116     private Future<?> readerIdleTimeout;
117     private long lastReadTime;
118     private boolean firstReaderIdleEvent = true;
119 
120     private Future<?> writerIdleTimeout;
121     private long lastWriteTime;
122     private boolean firstWriterIdleEvent = true;
123 
124     private Future<?> allIdleTimeout;
125     private boolean firstAllIdleEvent = true;
126 
127     private byte state;
128     private static final byte ST_INITIALIZED = 1;
129     private static final byte ST_DESTROYED = 2;
130 
131     private boolean reading;
132 
133     private long lastChangeCheckTimeStamp;
134     private int lastMessageHashCode;
135     private long lastPendingWriteBytes;
136     private long lastFlushProgress;
137 
138     /**
139      * Creates a new instance firing {@link IdleStateEvent}s.
140      *
141      * @param readerIdleTimeSeconds
142      *        an {@link IdleStateEvent} whose state is {@link IdleState#READER_IDLE}
143      *        will be triggered when no read was performed for the specified
144      *        period of time.  Specify {@code 0} to disable.
145      * @param writerIdleTimeSeconds
146      *        an {@link IdleStateEvent} whose state is {@link IdleState#WRITER_IDLE}
147      *        will be triggered when no write was performed for the specified
148      *        period of time.  Specify {@code 0} to disable.
149      * @param allIdleTimeSeconds
150      *        an {@link IdleStateEvent} whose state is {@link IdleState#ALL_IDLE}
151      *        will be triggered when neither read nor write was performed for
152      *        the specified period of time.  Specify {@code 0} to disable.
153      */
154     public IdleStateHandler(
155             int readerIdleTimeSeconds,
156             int writerIdleTimeSeconds,
157             int allIdleTimeSeconds) {
158 
159         this(readerIdleTimeSeconds, writerIdleTimeSeconds, allIdleTimeSeconds,
160              TimeUnit.SECONDS);
161     }
162 
163     /**
164      * @see #IdleStateHandler(boolean, long, long, long, TimeUnit)
165      */
166     public IdleStateHandler(
167             long readerIdleTime, long writerIdleTime, long allIdleTime,
168             TimeUnit unit) {
169         this(false, readerIdleTime, writerIdleTime, allIdleTime, unit);
170     }
171 
172     /**
173      * Creates a new instance firing {@link IdleStateEvent}s.
174      *
175      * @param observeOutput
176      *        whether or not the consumption of {@code bytes} should be taken into
177      *        consideration when assessing write idleness. The default is {@code false}.
178      * @param readerIdleTime
179      *        an {@link IdleStateEvent} whose state is {@link IdleState#READER_IDLE}
180      *        will be triggered when no read was performed for the specified
181      *        period of time.  Specify {@code 0} to disable.
182      * @param writerIdleTime
183      *        an {@link IdleStateEvent} whose state is {@link IdleState#WRITER_IDLE}
184      *        will be triggered when no write was performed for the specified
185      *        period of time.  Specify {@code 0} to disable.
186      * @param allIdleTime
187      *        an {@link IdleStateEvent} whose state is {@link IdleState#ALL_IDLE}
188      *        will be triggered when neither read nor write was performed for
189      *        the specified period of time.  Specify {@code 0} to disable.
190      * @param unit
191      *        the {@link TimeUnit} of {@code readerIdleTime},
192      *        {@code writeIdleTime}, and {@code allIdleTime}
193      */
194     public IdleStateHandler(boolean observeOutput,
195             long readerIdleTime, long writerIdleTime, long allIdleTime,
196             TimeUnit unit) {
197         ObjectUtil.checkNotNull(unit, "unit");
198 
199         this.observeOutput = observeOutput;
200 
201         if (readerIdleTime <= 0) {
202             readerIdleTimeNanos = 0;
203         } else {
204             readerIdleTimeNanos = Math.max(unit.toNanos(readerIdleTime), MIN_TIMEOUT_NANOS);
205         }
206         if (writerIdleTime <= 0) {
207             writerIdleTimeNanos = 0;
208         } else {
209             writerIdleTimeNanos = Math.max(unit.toNanos(writerIdleTime), MIN_TIMEOUT_NANOS);
210         }
211         if (allIdleTime <= 0) {
212             allIdleTimeNanos = 0;
213         } else {
214             allIdleTimeNanos = Math.max(unit.toNanos(allIdleTime), MIN_TIMEOUT_NANOS);
215         }
216     }
217 
218     /**
219      * Return the readerIdleTime that was given when instance this class in milliseconds.
220      *
221      */
222     public long getReaderIdleTimeInMillis() {
223         return TimeUnit.NANOSECONDS.toMillis(readerIdleTimeNanos);
224     }
225 
226     /**
227      * Return the writerIdleTime that was given when instance this class in milliseconds.
228      *
229      */
230     public long getWriterIdleTimeInMillis() {
231         return TimeUnit.NANOSECONDS.toMillis(writerIdleTimeNanos);
232     }
233 
234     /**
235      * Return the allIdleTime that was given when instance this class in milliseconds.
236      *
237      */
238     public long getAllIdleTimeInMillis() {
239         return TimeUnit.NANOSECONDS.toMillis(allIdleTimeNanos);
240     }
241 
242     @Override
243     public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
244         if (ctx.channel().isActive() && ctx.channel().isRegistered()) {
245             // channelActive() event has been fired already, which means this.channelActive() will
246             // not be invoked. We have to initialize here instead.
247             initialize(ctx);
248         } else {
249             // channelActive() event has not been fired yet.  this.channelActive() will be invoked
250             // and initialization will occur there.
251         }
252     }
253 
254     @Override
255     public void handlerRemoved(ChannelHandlerContext ctx) throws Exception {
256         destroy();
257     }
258 
259     @Override
260     public void channelRegistered(ChannelHandlerContext ctx) throws Exception {
261         // Initialize early if channel is active already.
262         if (ctx.channel().isActive()) {
263             initialize(ctx);
264         }
265         super.channelRegistered(ctx);
266     }
267 
268     @Override
269     public void channelActive(ChannelHandlerContext ctx) throws Exception {
270         // This method will be invoked only if this handler was added
271         // before channelActive() event is fired.  If a user adds this handler
272         // after the channelActive() event, initialize() will be called by beforeAdd().
273         initialize(ctx);
274         super.channelActive(ctx);
275     }
276 
277     @Override
278     public void channelInactive(ChannelHandlerContext ctx) throws Exception {
279         destroy();
280         super.channelInactive(ctx);
281     }
282 
283     @Override
284     public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
285         if (readerIdleTimeNanos > 0 || allIdleTimeNanos > 0) {
286             reading = true;
287             firstReaderIdleEvent = firstAllIdleEvent = true;
288         }
289         ctx.fireChannelRead(msg);
290     }
291 
292     @Override
293     public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
294         if ((readerIdleTimeNanos > 0 || allIdleTimeNanos > 0) && reading) {
295             lastReadTime = ticksInNanos();
296             reading = false;
297         }
298         ctx.fireChannelReadComplete();
299     }
300 
301     @Override
302     public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
303         // Allow writing with void promise if handler is only configured for read timeout events.
304         if (writerIdleTimeNanos > 0 || allIdleTimeNanos > 0) {
305             ctx.write(msg, promise.unvoid()).addListener(writeListener);
306         } else {
307             ctx.write(msg, promise);
308         }
309     }
310 
311     /**
312      * Reset the read timeout. As this handler is not thread-safe, this method <b>must</b> be called on the event loop.
313      */
314     public void resetReadTimeout() {
315         if (readerIdleTimeNanos > 0 || allIdleTimeNanos > 0) {
316             lastReadTime = ticksInNanos();
317             reading = false;
318         }
319     }
320 
321     /**
322      * Reset the write timeout. As this handler is not thread-safe, this method <b>must</b> be called on the event loop.
323      */
324     public void resetWriteTimeout() {
325         if (writerIdleTimeNanos > 0 || allIdleTimeNanos > 0) {
326             lastWriteTime = ticksInNanos();
327         }
328     }
329 
330     private void initialize(ChannelHandlerContext ctx) {
331         // Avoid the case where destroy() is called before scheduling timeouts.
332         // See: https://github.com/netty/netty/issues/143
333         switch (state) {
334         case 1:
335         case 2:
336             return;
337         default:
338              break;
339         }
340 
341         state = ST_INITIALIZED;
342         initOutputChanged(ctx);
343 
344         lastReadTime = lastWriteTime = ticksInNanos();
345         if (readerIdleTimeNanos > 0) {
346             readerIdleTimeout = schedule(ctx, new ReaderIdleTimeoutTask(ctx),
347                     readerIdleTimeNanos, TimeUnit.NANOSECONDS);
348         }
349         if (writerIdleTimeNanos > 0) {
350             writerIdleTimeout = schedule(ctx, new WriterIdleTimeoutTask(ctx),
351                     writerIdleTimeNanos, TimeUnit.NANOSECONDS);
352         }
353         if (allIdleTimeNanos > 0) {
354             allIdleTimeout = schedule(ctx, new AllIdleTimeoutTask(ctx),
355                     allIdleTimeNanos, TimeUnit.NANOSECONDS);
356         }
357     }
358 
359     /**
360      * This method is visible for testing!
361      */
362     long ticksInNanos() {
363         return System.nanoTime();
364     }
365 
366     /**
367      * This method is visible for testing!
368      */
369     Future<?> schedule(ChannelHandlerContext ctx, Runnable task, long delay, TimeUnit unit) {
370         return ctx.executor().schedule(task, delay, unit);
371     }
372 
373     private void destroy() {
374         state = ST_DESTROYED;
375 
376         if (readerIdleTimeout != null) {
377             readerIdleTimeout.cancel(false);
378             readerIdleTimeout = null;
379         }
380         if (writerIdleTimeout != null) {
381             writerIdleTimeout.cancel(false);
382             writerIdleTimeout = null;
383         }
384         if (allIdleTimeout != null) {
385             allIdleTimeout.cancel(false);
386             allIdleTimeout = null;
387         }
388     }
389 
390     /**
391      * Is called when an {@link IdleStateEvent} should be fired. This implementation calls
392      * {@link ChannelHandlerContext#fireUserEventTriggered(Object)}.
393      */
394     protected void channelIdle(ChannelHandlerContext ctx, IdleStateEvent evt) throws Exception {
395         ctx.fireUserEventTriggered(evt);
396     }
397 
398     /**
399      * Returns a {@link IdleStateEvent}.
400      */
401     protected IdleStateEvent newIdleStateEvent(IdleState state, boolean first) {
402         switch (state) {
403             case ALL_IDLE:
404                 return first ? IdleStateEvent.FIRST_ALL_IDLE_STATE_EVENT : IdleStateEvent.ALL_IDLE_STATE_EVENT;
405             case READER_IDLE:
406                 return first ? IdleStateEvent.FIRST_READER_IDLE_STATE_EVENT : IdleStateEvent.READER_IDLE_STATE_EVENT;
407             case WRITER_IDLE:
408                 return first ? IdleStateEvent.FIRST_WRITER_IDLE_STATE_EVENT : IdleStateEvent.WRITER_IDLE_STATE_EVENT;
409             default:
410                 throw new IllegalArgumentException("Unhandled: state=" + state + ", first=" + first);
411         }
412     }
413 
414     /**
415      * @see #hasOutputChanged(ChannelHandlerContext, boolean)
416      */
417     private void initOutputChanged(ChannelHandlerContext ctx) {
418         if (observeOutput) {
419             Channel channel = ctx.channel();
420             Unsafe unsafe = channel.unsafe();
421             ChannelOutboundBuffer buf = unsafe.outboundBuffer();
422 
423             if (buf != null) {
424                 lastMessageHashCode = System.identityHashCode(buf.current());
425                 lastPendingWriteBytes = buf.totalPendingWriteBytes();
426                 lastFlushProgress = buf.currentProgress();
427             }
428         }
429     }
430 
431     /**
432      * Returns {@code true} if and only if the {@link IdleStateHandler} was constructed
433      * with {@link #observeOutput} enabled and there has been an observed change in the
434      * {@link ChannelOutboundBuffer} between two consecutive calls of this method.
435      *
436      * https://github.com/netty/netty/issues/6150
437      */
438     private boolean hasOutputChanged(ChannelHandlerContext ctx, boolean first) {
439         if (observeOutput) {
440 
441             // We can take this shortcut if the ChannelPromises that got passed into write()
442             // appear to complete. It indicates "change" on message level and we simply assume
443             // that there's change happening on byte level. If the user doesn't observe channel
444             // writability events then they'll eventually OOME and there's clearly a different
445             // problem and idleness is least of their concerns.
446             if (lastChangeCheckTimeStamp != lastWriteTime) {
447                 lastChangeCheckTimeStamp = lastWriteTime;
448 
449                 // But this applies only if it's the non-first call.
450                 if (!first) {
451                     return true;
452                 }
453             }
454 
455             Channel channel = ctx.channel();
456             Unsafe unsafe = channel.unsafe();
457             ChannelOutboundBuffer buf = unsafe.outboundBuffer();
458 
459             if (buf != null) {
460                 int messageHashCode = System.identityHashCode(buf.current());
461                 long pendingWriteBytes = buf.totalPendingWriteBytes();
462 
463                 if (messageHashCode != lastMessageHashCode || pendingWriteBytes != lastPendingWriteBytes) {
464                     lastMessageHashCode = messageHashCode;
465                     lastPendingWriteBytes = pendingWriteBytes;
466 
467                     if (!first) {
468                         return true;
469                     }
470                 }
471 
472                 long flushProgress = buf.currentProgress();
473                 if (flushProgress != lastFlushProgress) {
474                     lastFlushProgress = flushProgress;
475                     return !first;
476                 }
477             }
478         }
479 
480         return false;
481     }
482 
483     private abstract static class AbstractIdleTask implements Runnable {
484 
485         private final ChannelHandlerContext ctx;
486 
487         AbstractIdleTask(ChannelHandlerContext ctx) {
488             this.ctx = ctx;
489         }
490 
491         @Override
492         public void run() {
493             if (!ctx.channel().isOpen()) {
494                 return;
495             }
496 
497             run(ctx);
498         }
499 
500         protected abstract void run(ChannelHandlerContext ctx);
501     }
502 
503     private final class ReaderIdleTimeoutTask extends AbstractIdleTask {
504 
505         ReaderIdleTimeoutTask(ChannelHandlerContext ctx) {
506             super(ctx);
507         }
508 
509         @Override
510         protected void run(ChannelHandlerContext ctx) {
511             long nextDelay = readerIdleTimeNanos;
512             if (!reading) {
513                 nextDelay -= ticksInNanos() - lastReadTime;
514             }
515 
516             if (nextDelay <= 0) {
517                 // Reader is idle - set a new timeout and notify the callback.
518                 readerIdleTimeout = schedule(ctx, this, readerIdleTimeNanos, TimeUnit.NANOSECONDS);
519 
520                 boolean first = firstReaderIdleEvent;
521                 firstReaderIdleEvent = false;
522 
523                 try {
524                     IdleStateEvent event = newIdleStateEvent(IdleState.READER_IDLE, first);
525                     channelIdle(ctx, event);
526                 } catch (Throwable t) {
527                     ctx.fireExceptionCaught(t);
528                 }
529             } else {
530                 // Read occurred before the timeout - set a new timeout with shorter delay.
531                 readerIdleTimeout = schedule(ctx, this, nextDelay, TimeUnit.NANOSECONDS);
532             }
533         }
534     }
535 
536     private final class WriterIdleTimeoutTask extends AbstractIdleTask {
537 
538         WriterIdleTimeoutTask(ChannelHandlerContext ctx) {
539             super(ctx);
540         }
541 
542         @Override
543         protected void run(ChannelHandlerContext ctx) {
544 
545             long lastWriteTime = IdleStateHandler.this.lastWriteTime;
546             long nextDelay = writerIdleTimeNanos - (ticksInNanos() - lastWriteTime);
547             if (nextDelay <= 0) {
548                 // Writer is idle - set a new timeout and notify the callback.
549                 writerIdleTimeout = schedule(ctx, this, writerIdleTimeNanos, TimeUnit.NANOSECONDS);
550 
551                 boolean first = firstWriterIdleEvent;
552                 firstWriterIdleEvent = false;
553 
554                 try {
555                     if (hasOutputChanged(ctx, first)) {
556                         return;
557                     }
558 
559                     IdleStateEvent event = newIdleStateEvent(IdleState.WRITER_IDLE, first);
560                     channelIdle(ctx, event);
561                 } catch (Throwable t) {
562                     ctx.fireExceptionCaught(t);
563                 }
564             } else {
565                 // Write occurred before the timeout - set a new timeout with shorter delay.
566                 writerIdleTimeout = schedule(ctx, this, nextDelay, TimeUnit.NANOSECONDS);
567             }
568         }
569     }
570 
571     private final class AllIdleTimeoutTask extends AbstractIdleTask {
572 
573         AllIdleTimeoutTask(ChannelHandlerContext ctx) {
574             super(ctx);
575         }
576 
577         @Override
578         protected void run(ChannelHandlerContext ctx) {
579 
580             long nextDelay = allIdleTimeNanos;
581             if (!reading) {
582                 nextDelay -= ticksInNanos() - Math.max(lastReadTime, lastWriteTime);
583             }
584             if (nextDelay <= 0) {
585                 // Both reader and writer are idle - set a new timeout and
586                 // notify the callback.
587                 allIdleTimeout = schedule(ctx, this, allIdleTimeNanos, TimeUnit.NANOSECONDS);
588 
589                 boolean first = firstAllIdleEvent;
590                 firstAllIdleEvent = false;
591 
592                 try {
593                     if (hasOutputChanged(ctx, first)) {
594                         return;
595                     }
596 
597                     IdleStateEvent event = newIdleStateEvent(IdleState.ALL_IDLE, first);
598                     channelIdle(ctx, event);
599                 } catch (Throwable t) {
600                     ctx.fireExceptionCaught(t);
601                 }
602             } else {
603                 // Either read or write occurred before the timeout - set a new
604                 // timeout with shorter delay.
605                 allIdleTimeout = schedule(ctx, this, nextDelay, TimeUnit.NANOSECONDS);
606             }
607         }
608     }
609 }