1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16 package io.netty.handler.timeout;
17
18 import io.netty.bootstrap.ServerBootstrap;
19 import io.netty.channel.Channel;
20 import io.netty.channel.Channel.Unsafe;
21 import io.netty.channel.ChannelDuplexHandler;
22 import io.netty.channel.ChannelFuture;
23 import io.netty.channel.ChannelFutureListener;
24 import io.netty.channel.ChannelHandlerContext;
25 import io.netty.channel.ChannelInitializer;
26 import io.netty.channel.ChannelOutboundBuffer;
27 import io.netty.channel.ChannelPromise;
28 import io.netty.util.concurrent.Future;
29 import io.netty.util.internal.ObjectUtil;
30
31 import java.util.concurrent.TimeUnit;
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99 public class IdleStateHandler extends ChannelDuplexHandler {
100 private static final long MIN_TIMEOUT_NANOS = TimeUnit.MILLISECONDS.toNanos(1);
101
102
103 private final ChannelFutureListener writeListener = new ChannelFutureListener() {
104 @Override
105 public void operationComplete(ChannelFuture future) throws Exception {
106 lastWriteTime = ticksInNanos();
107 firstWriterIdleEvent = firstAllIdleEvent = true;
108 }
109 };
110
111 private final boolean observeOutput;
112 private final long readerIdleTimeNanos;
113 private final long writerIdleTimeNanos;
114 private final long allIdleTimeNanos;
115
116 private Future<?> readerIdleTimeout;
117 private long lastReadTime;
118 private boolean firstReaderIdleEvent = true;
119
120 private Future<?> writerIdleTimeout;
121 private long lastWriteTime;
122 private boolean firstWriterIdleEvent = true;
123
124 private Future<?> allIdleTimeout;
125 private boolean firstAllIdleEvent = true;
126
127 private byte state;
128 private boolean reading;
129
130 private long lastChangeCheckTimeStamp;
131 private int lastMessageHashCode;
132 private long lastPendingWriteBytes;
133 private long lastFlushProgress;
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151 public IdleStateHandler(
152 int readerIdleTimeSeconds,
153 int writerIdleTimeSeconds,
154 int allIdleTimeSeconds) {
155
156 this(readerIdleTimeSeconds, writerIdleTimeSeconds, allIdleTimeSeconds,
157 TimeUnit.SECONDS);
158 }
159
160
161
162
163 public IdleStateHandler(
164 long readerIdleTime, long writerIdleTime, long allIdleTime,
165 TimeUnit unit) {
166 this(false, readerIdleTime, writerIdleTime, allIdleTime, unit);
167 }
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191 public IdleStateHandler(boolean observeOutput,
192 long readerIdleTime, long writerIdleTime, long allIdleTime,
193 TimeUnit unit) {
194 ObjectUtil.checkNotNull(unit, "unit");
195
196 this.observeOutput = observeOutput;
197
198 if (readerIdleTime <= 0) {
199 readerIdleTimeNanos = 0;
200 } else {
201 readerIdleTimeNanos = Math.max(unit.toNanos(readerIdleTime), MIN_TIMEOUT_NANOS);
202 }
203 if (writerIdleTime <= 0) {
204 writerIdleTimeNanos = 0;
205 } else {
206 writerIdleTimeNanos = Math.max(unit.toNanos(writerIdleTime), MIN_TIMEOUT_NANOS);
207 }
208 if (allIdleTime <= 0) {
209 allIdleTimeNanos = 0;
210 } else {
211 allIdleTimeNanos = Math.max(unit.toNanos(allIdleTime), MIN_TIMEOUT_NANOS);
212 }
213 }
214
215
216
217
218
219 public long getReaderIdleTimeInMillis() {
220 return TimeUnit.NANOSECONDS.toMillis(readerIdleTimeNanos);
221 }
222
223
224
225
226
227 public long getWriterIdleTimeInMillis() {
228 return TimeUnit.NANOSECONDS.toMillis(writerIdleTimeNanos);
229 }
230
231
232
233
234
235 public long getAllIdleTimeInMillis() {
236 return TimeUnit.NANOSECONDS.toMillis(allIdleTimeNanos);
237 }
238
239 @Override
240 public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
241 if (ctx.channel().isActive() && ctx.channel().isRegistered()) {
242
243
244 initialize(ctx);
245 } else {
246
247
248 }
249 }
250
251 @Override
252 public void handlerRemoved(ChannelHandlerContext ctx) throws Exception {
253 destroy();
254 }
255
256 @Override
257 public void channelRegistered(ChannelHandlerContext ctx) throws Exception {
258
259 if (ctx.channel().isActive()) {
260 initialize(ctx);
261 }
262 super.channelRegistered(ctx);
263 }
264
265 @Override
266 public void channelActive(ChannelHandlerContext ctx) throws Exception {
267
268
269
270 initialize(ctx);
271 super.channelActive(ctx);
272 }
273
274 @Override
275 public void channelInactive(ChannelHandlerContext ctx) throws Exception {
276 destroy();
277 super.channelInactive(ctx);
278 }
279
280 @Override
281 public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
282 if (readerIdleTimeNanos > 0 || allIdleTimeNanos > 0) {
283 reading = true;
284 firstReaderIdleEvent = firstAllIdleEvent = true;
285 }
286 ctx.fireChannelRead(msg);
287 }
288
289 @Override
290 public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
291 if ((readerIdleTimeNanos > 0 || allIdleTimeNanos > 0) && reading) {
292 lastReadTime = ticksInNanos();
293 reading = false;
294 }
295 ctx.fireChannelReadComplete();
296 }
297
298 @Override
299 public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
300
301 if (writerIdleTimeNanos > 0 || allIdleTimeNanos > 0) {
302 ctx.write(msg, promise.unvoid()).addListener(writeListener);
303 } else {
304 ctx.write(msg, promise);
305 }
306 }
307
308 private void initialize(ChannelHandlerContext ctx) {
309
310
311 switch (state) {
312 case 1:
313 case 2:
314 return;
315 default:
316 break;
317 }
318
319 state = 1;
320 initOutputChanged(ctx);
321
322 lastReadTime = lastWriteTime = ticksInNanos();
323 if (readerIdleTimeNanos > 0) {
324 readerIdleTimeout = schedule(ctx, new ReaderIdleTimeoutTask(ctx),
325 readerIdleTimeNanos, TimeUnit.NANOSECONDS);
326 }
327 if (writerIdleTimeNanos > 0) {
328 writerIdleTimeout = schedule(ctx, new WriterIdleTimeoutTask(ctx),
329 writerIdleTimeNanos, TimeUnit.NANOSECONDS);
330 }
331 if (allIdleTimeNanos > 0) {
332 allIdleTimeout = schedule(ctx, new AllIdleTimeoutTask(ctx),
333 allIdleTimeNanos, TimeUnit.NANOSECONDS);
334 }
335 }
336
337
338
339
340 long ticksInNanos() {
341 return System.nanoTime();
342 }
343
344
345
346
347 Future<?> schedule(ChannelHandlerContext ctx, Runnable task, long delay, TimeUnit unit) {
348 return ctx.executor().schedule(task, delay, unit);
349 }
350
351 private void destroy() {
352 state = 2;
353
354 if (readerIdleTimeout != null) {
355 readerIdleTimeout.cancel(false);
356 readerIdleTimeout = null;
357 }
358 if (writerIdleTimeout != null) {
359 writerIdleTimeout.cancel(false);
360 writerIdleTimeout = null;
361 }
362 if (allIdleTimeout != null) {
363 allIdleTimeout.cancel(false);
364 allIdleTimeout = null;
365 }
366 }
367
368
369
370
371
372 protected void channelIdle(ChannelHandlerContext ctx, IdleStateEvent evt) throws Exception {
373 ctx.fireUserEventTriggered(evt);
374 }
375
376
377
378
379 protected IdleStateEvent newIdleStateEvent(IdleState state, boolean first) {
380 switch (state) {
381 case ALL_IDLE:
382 return first ? IdleStateEvent.FIRST_ALL_IDLE_STATE_EVENT : IdleStateEvent.ALL_IDLE_STATE_EVENT;
383 case READER_IDLE:
384 return first ? IdleStateEvent.FIRST_READER_IDLE_STATE_EVENT : IdleStateEvent.READER_IDLE_STATE_EVENT;
385 case WRITER_IDLE:
386 return first ? IdleStateEvent.FIRST_WRITER_IDLE_STATE_EVENT : IdleStateEvent.WRITER_IDLE_STATE_EVENT;
387 default:
388 throw new IllegalArgumentException("Unhandled: state=" + state + ", first=" + first);
389 }
390 }
391
392
393
394
395 private void initOutputChanged(ChannelHandlerContext ctx) {
396 if (observeOutput) {
397 Channel channel = ctx.channel();
398 Unsafe unsafe = channel.unsafe();
399 ChannelOutboundBuffer buf = unsafe.outboundBuffer();
400
401 if (buf != null) {
402 lastMessageHashCode = System.identityHashCode(buf.current());
403 lastPendingWriteBytes = buf.totalPendingWriteBytes();
404 lastFlushProgress = buf.currentProgress();
405 }
406 }
407 }
408
409
410
411
412
413
414
415
416 private boolean hasOutputChanged(ChannelHandlerContext ctx, boolean first) {
417 if (observeOutput) {
418
419
420
421
422
423
424 if (lastChangeCheckTimeStamp != lastWriteTime) {
425 lastChangeCheckTimeStamp = lastWriteTime;
426
427
428 if (!first) {
429 return true;
430 }
431 }
432
433 Channel channel = ctx.channel();
434 Unsafe unsafe = channel.unsafe();
435 ChannelOutboundBuffer buf = unsafe.outboundBuffer();
436
437 if (buf != null) {
438 int messageHashCode = System.identityHashCode(buf.current());
439 long pendingWriteBytes = buf.totalPendingWriteBytes();
440
441 if (messageHashCode != lastMessageHashCode || pendingWriteBytes != lastPendingWriteBytes) {
442 lastMessageHashCode = messageHashCode;
443 lastPendingWriteBytes = pendingWriteBytes;
444
445 if (!first) {
446 return true;
447 }
448 }
449
450 long flushProgress = buf.currentProgress();
451 if (flushProgress != lastFlushProgress) {
452 lastFlushProgress = flushProgress;
453
454 if (!first) {
455 return true;
456 }
457 }
458 }
459 }
460
461 return false;
462 }
463
464 private abstract static class AbstractIdleTask implements Runnable {
465
466 private final ChannelHandlerContext ctx;
467
468 AbstractIdleTask(ChannelHandlerContext ctx) {
469 this.ctx = ctx;
470 }
471
472 @Override
473 public void run() {
474 if (!ctx.channel().isOpen()) {
475 return;
476 }
477
478 run(ctx);
479 }
480
481 protected abstract void run(ChannelHandlerContext ctx);
482 }
483
484 private final class ReaderIdleTimeoutTask extends AbstractIdleTask {
485
486 ReaderIdleTimeoutTask(ChannelHandlerContext ctx) {
487 super(ctx);
488 }
489
490 @Override
491 protected void run(ChannelHandlerContext ctx) {
492 long nextDelay = readerIdleTimeNanos;
493 if (!reading) {
494 nextDelay -= ticksInNanos() - lastReadTime;
495 }
496
497 if (nextDelay <= 0) {
498
499 readerIdleTimeout = schedule(ctx, this, readerIdleTimeNanos, TimeUnit.NANOSECONDS);
500
501 boolean first = firstReaderIdleEvent;
502 firstReaderIdleEvent = false;
503
504 try {
505 IdleStateEvent event = newIdleStateEvent(IdleState.READER_IDLE, first);
506 channelIdle(ctx, event);
507 } catch (Throwable t) {
508 ctx.fireExceptionCaught(t);
509 }
510 } else {
511
512 readerIdleTimeout = schedule(ctx, this, nextDelay, TimeUnit.NANOSECONDS);
513 }
514 }
515 }
516
517 private final class WriterIdleTimeoutTask extends AbstractIdleTask {
518
519 WriterIdleTimeoutTask(ChannelHandlerContext ctx) {
520 super(ctx);
521 }
522
523 @Override
524 protected void run(ChannelHandlerContext ctx) {
525
526 long lastWriteTime = IdleStateHandler.this.lastWriteTime;
527 long nextDelay = writerIdleTimeNanos - (ticksInNanos() - lastWriteTime);
528 if (nextDelay <= 0) {
529
530 writerIdleTimeout = schedule(ctx, this, writerIdleTimeNanos, TimeUnit.NANOSECONDS);
531
532 boolean first = firstWriterIdleEvent;
533 firstWriterIdleEvent = false;
534
535 try {
536 if (hasOutputChanged(ctx, first)) {
537 return;
538 }
539
540 IdleStateEvent event = newIdleStateEvent(IdleState.WRITER_IDLE, first);
541 channelIdle(ctx, event);
542 } catch (Throwable t) {
543 ctx.fireExceptionCaught(t);
544 }
545 } else {
546
547 writerIdleTimeout = schedule(ctx, this, nextDelay, TimeUnit.NANOSECONDS);
548 }
549 }
550 }
551
552 private final class AllIdleTimeoutTask extends AbstractIdleTask {
553
554 AllIdleTimeoutTask(ChannelHandlerContext ctx) {
555 super(ctx);
556 }
557
558 @Override
559 protected void run(ChannelHandlerContext ctx) {
560
561 long nextDelay = allIdleTimeNanos;
562 if (!reading) {
563 nextDelay -= ticksInNanos() - Math.max(lastReadTime, lastWriteTime);
564 }
565 if (nextDelay <= 0) {
566
567
568 allIdleTimeout = schedule(ctx, this, allIdleTimeNanos, TimeUnit.NANOSECONDS);
569
570 boolean first = firstAllIdleEvent;
571 firstAllIdleEvent = false;
572
573 try {
574 if (hasOutputChanged(ctx, first)) {
575 return;
576 }
577
578 IdleStateEvent event = newIdleStateEvent(IdleState.ALL_IDLE, first);
579 channelIdle(ctx, event);
580 } catch (Throwable t) {
581 ctx.fireExceptionCaught(t);
582 }
583 } else {
584
585
586 allIdleTimeout = schedule(ctx, this, nextDelay, TimeUnit.NANOSECONDS);
587 }
588 }
589 }
590 }