1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16 package io.netty.util;
17
18 import static io.netty.util.internal.ObjectUtil.checkInRange;
19 import static io.netty.util.internal.ObjectUtil.checkPositive;
20 import static io.netty.util.internal.ObjectUtil.checkNotNull;
21
22 import io.netty.util.concurrent.ImmediateExecutor;
23 import io.netty.util.internal.MathUtil;
24 import io.netty.util.internal.PlatformDependent;
25 import io.netty.util.internal.logging.InternalLogger;
26 import io.netty.util.internal.logging.InternalLoggerFactory;
27
28 import java.util.Collections;
29 import java.util.HashSet;
30 import java.util.Queue;
31 import java.util.Set;
32 import java.util.concurrent.CountDownLatch;
33 import java.util.concurrent.Executor;
34 import java.util.concurrent.Executors;
35 import java.util.concurrent.RejectedExecutionException;
36 import java.util.concurrent.ThreadFactory;
37 import java.util.concurrent.TimeUnit;
38 import java.util.concurrent.atomic.AtomicBoolean;
39 import java.util.concurrent.atomic.AtomicInteger;
40 import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
41 import java.util.concurrent.atomic.AtomicLong;
42
43 import static io.netty.util.internal.StringUtil.simpleClassName;
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86 public class HashedWheelTimer implements Timer {
87
88 static final InternalLogger logger =
89 InternalLoggerFactory.getInstance(HashedWheelTimer.class);
90
91 private static final AtomicInteger INSTANCE_COUNTER = new AtomicInteger();
92 private static final AtomicBoolean WARNED_TOO_MANY_INSTANCES = new AtomicBoolean();
93 private static final int INSTANCE_COUNT_LIMIT = 64;
94 private static final long MILLISECOND_NANOS = TimeUnit.MILLISECONDS.toNanos(1);
95 private static final ResourceLeakDetector<HashedWheelTimer> leakDetector = ResourceLeakDetectorFactory.instance()
96 .newResourceLeakDetector(HashedWheelTimer.class, 1);
97
98 private static final AtomicIntegerFieldUpdater<HashedWheelTimer> WORKER_STATE_UPDATER =
99 AtomicIntegerFieldUpdater.newUpdater(HashedWheelTimer.class, "workerState");
100
101 private final ResourceLeakTracker<HashedWheelTimer> leak;
102 private final Worker worker = new Worker();
103 private final Thread workerThread;
104
105 public static final int WORKER_STATE_INIT = 0;
106 public static final int WORKER_STATE_STARTED = 1;
107 public static final int WORKER_STATE_SHUTDOWN = 2;
108 @SuppressWarnings({"unused", "FieldMayBeFinal"})
109 private volatile int workerState;
110
111 private final long tickDuration;
112 private final HashedWheelBucket[] wheel;
113 private final int mask;
114 private final CountDownLatch startTimeInitialized = new CountDownLatch(1);
115 private final Queue<HashedWheelTimeout> timeouts = PlatformDependent.newMpscQueue();
116 private final Queue<HashedWheelTimeout> cancelledTimeouts = PlatformDependent.newMpscQueue();
117 private final AtomicLong pendingTimeouts = new AtomicLong(0);
118 private final long maxPendingTimeouts;
119 private final Executor taskExecutor;
120
121 private volatile long startTime;
122
123
124
125
126
127
128 public HashedWheelTimer() {
129 this(Executors.defaultThreadFactory());
130 }
131
132
133
134
135
136
137
138
139
140
141
142 public HashedWheelTimer(long tickDuration, TimeUnit unit) {
143 this(Executors.defaultThreadFactory(), tickDuration, unit);
144 }
145
146
147
148
149
150
151
152
153
154
155
156 public HashedWheelTimer(long tickDuration, TimeUnit unit, int ticksPerWheel) {
157 this(Executors.defaultThreadFactory(), tickDuration, unit, ticksPerWheel);
158 }
159
160
161
162
163
164
165
166
167
168
169 public HashedWheelTimer(ThreadFactory threadFactory) {
170 this(threadFactory, 100, TimeUnit.MILLISECONDS);
171 }
172
173
174
175
176
177
178
179
180
181
182
183
184 public HashedWheelTimer(
185 ThreadFactory threadFactory, long tickDuration, TimeUnit unit) {
186 this(threadFactory, tickDuration, unit, 512);
187 }
188
189
190
191
192
193
194
195
196
197
198
199
200
201 public HashedWheelTimer(
202 ThreadFactory threadFactory,
203 long tickDuration, TimeUnit unit, int ticksPerWheel) {
204 this(threadFactory, tickDuration, unit, ticksPerWheel, true);
205 }
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222 public HashedWheelTimer(
223 ThreadFactory threadFactory,
224 long tickDuration, TimeUnit unit, int ticksPerWheel, boolean leakDetection) {
225 this(threadFactory, tickDuration, unit, ticksPerWheel, leakDetection, -1);
226 }
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248 public HashedWheelTimer(
249 ThreadFactory threadFactory,
250 long tickDuration, TimeUnit unit, int ticksPerWheel, boolean leakDetection,
251 long maxPendingTimeouts) {
252 this(threadFactory, tickDuration, unit, ticksPerWheel, leakDetection,
253 maxPendingTimeouts, ImmediateExecutor.INSTANCE);
254 }
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278 public HashedWheelTimer(
279 ThreadFactory threadFactory,
280 long tickDuration, TimeUnit unit, int ticksPerWheel, boolean leakDetection,
281 long maxPendingTimeouts, Executor taskExecutor) {
282
283 checkNotNull(threadFactory, "threadFactory");
284 checkNotNull(unit, "unit");
285 checkPositive(tickDuration, "tickDuration");
286 checkPositive(ticksPerWheel, "ticksPerWheel");
287 this.taskExecutor = checkNotNull(taskExecutor, "taskExecutor");
288
289
290 wheel = createWheel(ticksPerWheel);
291 mask = wheel.length - 1;
292
293
294 long duration = unit.toNanos(tickDuration);
295
296
297 if (duration >= Long.MAX_VALUE / wheel.length) {
298 throw new IllegalArgumentException(String.format(
299 "tickDuration: %d (expected: 0 < tickDuration in nanos < %d",
300 tickDuration, Long.MAX_VALUE / wheel.length));
301 }
302
303 if (duration < MILLISECOND_NANOS) {
304 logger.warn("Configured tickDuration {} smaller than {}, using 1ms.",
305 tickDuration, MILLISECOND_NANOS);
306 this.tickDuration = MILLISECOND_NANOS;
307 } else {
308 this.tickDuration = duration;
309 }
310
311 workerThread = threadFactory.newThread(worker);
312
313 leak = leakDetection || !workerThread.isDaemon() ? leakDetector.track(this) : null;
314
315 this.maxPendingTimeouts = maxPendingTimeouts;
316
317 if (INSTANCE_COUNTER.incrementAndGet() > INSTANCE_COUNT_LIMIT &&
318 WARNED_TOO_MANY_INSTANCES.compareAndSet(false, true)) {
319 reportTooManyInstances();
320 }
321 }
322
323 @Override
324 protected void finalize() throws Throwable {
325 try {
326 super.finalize();
327 } finally {
328
329
330 if (WORKER_STATE_UPDATER.getAndSet(this, WORKER_STATE_SHUTDOWN) != WORKER_STATE_SHUTDOWN) {
331 INSTANCE_COUNTER.decrementAndGet();
332 }
333 }
334 }
335
336 private static HashedWheelBucket[] createWheel(int ticksPerWheel) {
337 ticksPerWheel = MathUtil.findNextPositivePowerOfTwo(ticksPerWheel);
338
339 HashedWheelBucket[] wheel = new HashedWheelBucket[ticksPerWheel];
340 for (int i = 0; i < wheel.length; i ++) {
341 wheel[i] = new HashedWheelBucket();
342 }
343 return wheel;
344 }
345
346
347
348
349
350
351
352
353 public void start() {
354 switch (WORKER_STATE_UPDATER.get(this)) {
355 case WORKER_STATE_INIT:
356 if (WORKER_STATE_UPDATER.compareAndSet(this, WORKER_STATE_INIT, WORKER_STATE_STARTED)) {
357 workerThread.start();
358 }
359 break;
360 case WORKER_STATE_STARTED:
361 break;
362 case WORKER_STATE_SHUTDOWN:
363 throw new IllegalStateException("cannot be started once stopped");
364 default:
365 throw new Error("Invalid WorkerState");
366 }
367
368
369 while (startTime == 0) {
370 try {
371 startTimeInitialized.await();
372 } catch (InterruptedException ignore) {
373
374 }
375 }
376 }
377
378 @Override
379 public Set<Timeout> stop() {
380 if (Thread.currentThread() == workerThread) {
381 throw new IllegalStateException(
382 HashedWheelTimer.class.getSimpleName() +
383 ".stop() cannot be called from " +
384 TimerTask.class.getSimpleName());
385 }
386
387 if (!WORKER_STATE_UPDATER.compareAndSet(this, WORKER_STATE_STARTED, WORKER_STATE_SHUTDOWN)) {
388
389 if (WORKER_STATE_UPDATER.getAndSet(this, WORKER_STATE_SHUTDOWN) != WORKER_STATE_SHUTDOWN) {
390 INSTANCE_COUNTER.decrementAndGet();
391 if (leak != null) {
392 boolean closed = leak.close(this);
393 assert closed;
394 }
395 }
396
397 return Collections.emptySet();
398 }
399
400 try {
401 boolean interrupted = false;
402 while (workerThread.isAlive()) {
403 workerThread.interrupt();
404 try {
405 workerThread.join(100);
406 } catch (InterruptedException ignored) {
407 interrupted = true;
408 }
409 }
410
411 if (interrupted) {
412 Thread.currentThread().interrupt();
413 }
414 } finally {
415 INSTANCE_COUNTER.decrementAndGet();
416 if (leak != null) {
417 boolean closed = leak.close(this);
418 assert closed;
419 }
420 }
421 return worker.unprocessedTimeouts();
422 }
423
424 @Override
425 public Timeout newTimeout(TimerTask task, long delay, TimeUnit unit) {
426 checkNotNull(task, "task");
427 checkNotNull(unit, "unit");
428
429 long pendingTimeoutsCount = pendingTimeouts.incrementAndGet();
430
431 if (maxPendingTimeouts > 0 && pendingTimeoutsCount > maxPendingTimeouts) {
432 pendingTimeouts.decrementAndGet();
433 throw new RejectedExecutionException("Number of pending timeouts ("
434 + pendingTimeoutsCount + ") is greater than or equal to maximum allowed pending "
435 + "timeouts (" + maxPendingTimeouts + ")");
436 }
437
438 start();
439
440
441
442 long deadline = System.nanoTime() + unit.toNanos(delay) - startTime;
443
444
445 if (delay > 0 && deadline < 0) {
446 deadline = Long.MAX_VALUE;
447 }
448 HashedWheelTimeout timeout = new HashedWheelTimeout(this, task, deadline);
449 timeouts.add(timeout);
450 return timeout;
451 }
452
453
454
455
456 public long pendingTimeouts() {
457 return pendingTimeouts.get();
458 }
459
460 private static void reportTooManyInstances() {
461 if (logger.isErrorEnabled()) {
462 String resourceType = simpleClassName(HashedWheelTimer.class);
463 logger.error("You are creating too many " + resourceType + " instances. " +
464 resourceType + " is a shared resource that must be reused across the JVM, " +
465 "so that only a few instances are created.");
466 }
467 }
468
469 private final class Worker implements Runnable {
470 private final Set<Timeout> unprocessedTimeouts = new HashSet<Timeout>();
471
472 private long tick;
473
474 @Override
475 public void run() {
476
477 startTime = System.nanoTime();
478 if (startTime == 0) {
479
480 startTime = 1;
481 }
482
483
484 startTimeInitialized.countDown();
485
486 do {
487 final long deadline = waitForNextTick();
488 if (deadline > 0) {
489 int idx = (int) (tick & mask);
490 processCancelledTasks();
491 HashedWheelBucket bucket =
492 wheel[idx];
493 transferTimeoutsToBuckets();
494 bucket.expireTimeouts(deadline);
495 tick++;
496 }
497 } while (WORKER_STATE_UPDATER.get(HashedWheelTimer.this) == WORKER_STATE_STARTED);
498
499
500 for (HashedWheelBucket bucket: wheel) {
501 bucket.clearTimeouts(unprocessedTimeouts);
502 }
503 for (;;) {
504 HashedWheelTimeout timeout = timeouts.poll();
505 if (timeout == null) {
506 break;
507 }
508 if (!timeout.isCancelled()) {
509 unprocessedTimeouts.add(timeout);
510 }
511 }
512 processCancelledTasks();
513 }
514
515 private void transferTimeoutsToBuckets() {
516
517
518 for (int i = 0; i < 100000; i++) {
519 HashedWheelTimeout timeout = timeouts.poll();
520 if (timeout == null) {
521
522 break;
523 }
524 if (timeout.state() == HashedWheelTimeout.ST_CANCELLED) {
525
526 continue;
527 }
528
529 long calculated = timeout.deadline / tickDuration;
530 timeout.remainingRounds = (calculated - tick) / wheel.length;
531
532 final long ticks = Math.max(calculated, tick);
533 int stopIndex = (int) (ticks & mask);
534
535 HashedWheelBucket bucket = wheel[stopIndex];
536 bucket.addTimeout(timeout);
537 }
538 }
539
540 private void processCancelledTasks() {
541 for (;;) {
542 HashedWheelTimeout timeout = cancelledTimeouts.poll();
543 if (timeout == null) {
544
545 break;
546 }
547 try {
548 timeout.remove();
549 } catch (Throwable t) {
550 if (logger.isWarnEnabled()) {
551 logger.warn("An exception was thrown while process a cancellation task", t);
552 }
553 }
554 }
555 }
556
557
558
559
560
561
562
563 private long waitForNextTick() {
564 long deadline = tickDuration * (tick + 1);
565
566 for (;;) {
567 final long currentTime = System.nanoTime() - startTime;
568 long sleepTimeMs = (deadline - currentTime + 999999) / 1000000;
569
570 if (sleepTimeMs <= 0) {
571 if (currentTime == Long.MIN_VALUE) {
572 return -Long.MAX_VALUE;
573 } else {
574 return currentTime;
575 }
576 }
577
578
579
580
581
582
583 if (PlatformDependent.isWindows()) {
584 sleepTimeMs = sleepTimeMs / 10 * 10;
585 if (sleepTimeMs == 0) {
586 sleepTimeMs = 1;
587 }
588 }
589
590 try {
591 Thread.sleep(sleepTimeMs);
592 } catch (InterruptedException ignored) {
593 if (WORKER_STATE_UPDATER.get(HashedWheelTimer.this) == WORKER_STATE_SHUTDOWN) {
594 return Long.MIN_VALUE;
595 }
596 }
597 }
598 }
599
600 public Set<Timeout> unprocessedTimeouts() {
601 return Collections.unmodifiableSet(unprocessedTimeouts);
602 }
603 }
604
605 private static final class HashedWheelTimeout implements Timeout, Runnable {
606
607 private static final int ST_INIT = 0;
608 private static final int ST_CANCELLED = 1;
609 private static final int ST_EXPIRED = 2;
610 private static final AtomicIntegerFieldUpdater<HashedWheelTimeout> STATE_UPDATER =
611 AtomicIntegerFieldUpdater.newUpdater(HashedWheelTimeout.class, "state");
612
613 private final HashedWheelTimer timer;
614 private final TimerTask task;
615 private final long deadline;
616
617 @SuppressWarnings({"unused", "FieldMayBeFinal", "RedundantFieldInitialization" })
618 private volatile int state = ST_INIT;
619
620
621
622 long remainingRounds;
623
624
625
626 HashedWheelTimeout next;
627 HashedWheelTimeout prev;
628
629
630 HashedWheelBucket bucket;
631
632 HashedWheelTimeout(HashedWheelTimer timer, TimerTask task, long deadline) {
633 this.timer = timer;
634 this.task = task;
635 this.deadline = deadline;
636 }
637
638 @Override
639 public Timer timer() {
640 return timer;
641 }
642
643 @Override
644 public TimerTask task() {
645 return task;
646 }
647
648 @Override
649 public boolean cancel() {
650
651 if (!compareAndSetState(ST_INIT, ST_CANCELLED)) {
652 return false;
653 }
654
655
656
657 timer.cancelledTimeouts.add(this);
658 return true;
659 }
660
661 void remove() {
662 HashedWheelBucket bucket = this.bucket;
663 if (bucket != null) {
664 bucket.remove(this);
665 } else {
666 timer.pendingTimeouts.decrementAndGet();
667 }
668 }
669
670 public boolean compareAndSetState(int expected, int state) {
671 return STATE_UPDATER.compareAndSet(this, expected, state);
672 }
673
674 public int state() {
675 return state;
676 }
677
678 @Override
679 public boolean isCancelled() {
680 return state() == ST_CANCELLED;
681 }
682
683 @Override
684 public boolean isExpired() {
685 return state() == ST_EXPIRED;
686 }
687
688 public void expire() {
689 if (!compareAndSetState(ST_INIT, ST_EXPIRED)) {
690 return;
691 }
692
693 try {
694 timer.taskExecutor.execute(this);
695 } catch (Throwable t) {
696 if (logger.isWarnEnabled()) {
697 logger.warn("An exception was thrown while submit " + TimerTask.class.getSimpleName()
698 + " for execution.", t);
699 }
700 }
701 }
702
703 @Override
704 public void run() {
705 try {
706 task.run(this);
707 } catch (Throwable t) {
708 if (logger.isWarnEnabled()) {
709 logger.warn("An exception was thrown by " + TimerTask.class.getSimpleName() + '.', t);
710 }
711 }
712 }
713
714 @Override
715 public String toString() {
716 final long currentTime = System.nanoTime();
717 long remaining = deadline - currentTime + timer.startTime;
718
719 StringBuilder buf = new StringBuilder(192)
720 .append(simpleClassName(this))
721 .append('(')
722 .append("deadline: ");
723 if (remaining > 0) {
724 buf.append(remaining)
725 .append(" ns later");
726 } else if (remaining < 0) {
727 buf.append(-remaining)
728 .append(" ns ago");
729 } else {
730 buf.append("now");
731 }
732
733 if (isCancelled()) {
734 buf.append(", cancelled");
735 }
736
737 return buf.append(", task: ")
738 .append(task())
739 .append(')')
740 .toString();
741 }
742 }
743
744
745
746
747
748
749 private static final class HashedWheelBucket {
750
751 private HashedWheelTimeout head;
752 private HashedWheelTimeout tail;
753
754
755
756
757 public void addTimeout(HashedWheelTimeout timeout) {
758 assert timeout.bucket == null;
759 timeout.bucket = this;
760 if (head == null) {
761 head = tail = timeout;
762 } else {
763 tail.next = timeout;
764 timeout.prev = tail;
765 tail = timeout;
766 }
767 }
768
769
770
771
772 public void expireTimeouts(long deadline) {
773 HashedWheelTimeout timeout = head;
774
775
776 while (timeout != null) {
777 HashedWheelTimeout next = timeout.next;
778 if (timeout.remainingRounds <= 0) {
779 next = remove(timeout);
780 if (timeout.deadline <= deadline) {
781 timeout.expire();
782 } else {
783
784 throw new IllegalStateException(String.format(
785 "timeout.deadline (%d) > deadline (%d)", timeout.deadline, deadline));
786 }
787 } else if (timeout.isCancelled()) {
788 next = remove(timeout);
789 } else {
790 timeout.remainingRounds --;
791 }
792 timeout = next;
793 }
794 }
795
796 public HashedWheelTimeout remove(HashedWheelTimeout timeout) {
797 HashedWheelTimeout next = timeout.next;
798
799 if (timeout.prev != null) {
800 timeout.prev.next = next;
801 }
802 if (timeout.next != null) {
803 timeout.next.prev = timeout.prev;
804 }
805
806 if (timeout == head) {
807
808 if (timeout == tail) {
809 tail = null;
810 head = null;
811 } else {
812 head = next;
813 }
814 } else if (timeout == tail) {
815
816 tail = timeout.prev;
817 }
818
819 timeout.prev = null;
820 timeout.next = null;
821 timeout.bucket = null;
822 timeout.timer.pendingTimeouts.decrementAndGet();
823 return next;
824 }
825
826
827
828
829 public void clearTimeouts(Set<Timeout> set) {
830 for (;;) {
831 HashedWheelTimeout timeout = pollTimeout();
832 if (timeout == null) {
833 return;
834 }
835 if (timeout.isExpired() || timeout.isCancelled()) {
836 continue;
837 }
838 set.add(timeout);
839 }
840 }
841
842 private HashedWheelTimeout pollTimeout() {
843 HashedWheelTimeout head = this.head;
844 if (head == null) {
845 return null;
846 }
847 HashedWheelTimeout next = head.next;
848 if (next == null) {
849 tail = this.head = null;
850 } else {
851 this.head = next;
852 next.prev = null;
853 }
854
855
856 head.next = null;
857 head.prev = null;
858 head.bucket = null;
859 return head;
860 }
861 }
862 }