1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16 package io.netty.util.concurrent;
17
18 import io.netty.util.internal.ObjectUtil;
19
20 import java.util.PriorityQueue;
21 import java.util.Queue;
22 import java.util.concurrent.Callable;
23 import java.util.concurrent.Executors;
24 import java.util.concurrent.TimeUnit;
25
26
27
28
29 public abstract class AbstractScheduledEventExecutor extends AbstractEventExecutor {
30
31 Queue<ScheduledFutureTask<?>> scheduledTaskQueue;
32
33 protected static long nanoTime() {
34 return ScheduledFutureTask.nanoTime();
35 }
36
37 Queue<ScheduledFutureTask<?>> scheduledTaskQueue() {
38 if (scheduledTaskQueue == null) {
39 scheduledTaskQueue = new PriorityQueue<ScheduledFutureTask<?>>();
40 }
41 return scheduledTaskQueue;
42 }
43
44 private static boolean isNullOrEmpty(Queue<ScheduledFutureTask<?>> queue) {
45 return queue == null || queue.isEmpty();
46 }
47
48
49
50
51
52
53 protected void cancelScheduledTasks() {
54 assert inEventLoop();
55 Queue<ScheduledFutureTask<?>> scheduledTaskQueue = this.scheduledTaskQueue;
56 if (isNullOrEmpty(scheduledTaskQueue)) {
57 return;
58 }
59
60 final ScheduledFutureTask<?>[] scheduledTasks =
61 scheduledTaskQueue.toArray(new ScheduledFutureTask<?>[scheduledTaskQueue.size()]);
62
63 for (ScheduledFutureTask<?> task: scheduledTasks) {
64 task.cancelWithoutRemove(false);
65 }
66
67 scheduledTaskQueue.clear();
68 }
69
70
71
72
73 protected final Runnable pollScheduledTask() {
74 return pollScheduledTask(nanoTime());
75 }
76
77
78
79
80
81 protected final Runnable pollScheduledTask(long nanoTime) {
82 assert inEventLoop();
83
84 Queue<ScheduledFutureTask<?>> scheduledTaskQueue = this.scheduledTaskQueue;
85 ScheduledFutureTask<?> scheduledTask = scheduledTaskQueue == null ? null : scheduledTaskQueue.peek();
86 if (scheduledTask == null) {
87 return null;
88 }
89
90 if (scheduledTask.deadlineNanos() <= nanoTime) {
91 scheduledTaskQueue.remove();
92 return scheduledTask;
93 }
94 return null;
95 }
96
97
98
99
100 protected final long nextScheduledTaskNano() {
101 Queue<ScheduledFutureTask<?>> scheduledTaskQueue = this.scheduledTaskQueue;
102 ScheduledFutureTask<?> scheduledTask = scheduledTaskQueue == null ? null : scheduledTaskQueue.peek();
103 if (scheduledTask == null) {
104 return -1;
105 }
106 return Math.max(0, scheduledTask.deadlineNanos() - nanoTime());
107 }
108
109 final ScheduledFutureTask<?> peekScheduledTask() {
110 Queue<ScheduledFutureTask<?>> scheduledTaskQueue = this.scheduledTaskQueue;
111 if (scheduledTaskQueue == null) {
112 return null;
113 }
114 return scheduledTaskQueue.peek();
115 }
116
117
118
119
120 protected final boolean hasScheduledTasks() {
121 Queue<ScheduledFutureTask<?>> scheduledTaskQueue = this.scheduledTaskQueue;
122 ScheduledFutureTask<?> scheduledTask = scheduledTaskQueue == null ? null : scheduledTaskQueue.peek();
123 return scheduledTask != null && scheduledTask.deadlineNanos() <= nanoTime();
124 }
125
126 @Override
127 public ScheduledFuture<?> schedule(Runnable command, long delay, TimeUnit unit) {
128 ObjectUtil.checkNotNull(command, "command");
129 ObjectUtil.checkNotNull(unit, "unit");
130 if (delay < 0) {
131 delay = 0;
132 }
133 return schedule(new ScheduledFutureTask<Void>(
134 this, command, null, ScheduledFutureTask.deadlineNanos(unit.toNanos(delay))));
135 }
136
137 @Override
138 public <V> ScheduledFuture<V> schedule(Callable<V> callable, long delay, TimeUnit unit) {
139 ObjectUtil.checkNotNull(callable, "callable");
140 ObjectUtil.checkNotNull(unit, "unit");
141 if (delay < 0) {
142 delay = 0;
143 }
144 return schedule(new ScheduledFutureTask<V>(
145 this, callable, ScheduledFutureTask.deadlineNanos(unit.toNanos(delay))));
146 }
147
148 @Override
149 public ScheduledFuture<?> scheduleAtFixedRate(Runnable command, long initialDelay, long period, TimeUnit unit) {
150 ObjectUtil.checkNotNull(command, "command");
151 ObjectUtil.checkNotNull(unit, "unit");
152 if (initialDelay < 0) {
153 throw new IllegalArgumentException(
154 String.format("initialDelay: %d (expected: >= 0)", initialDelay));
155 }
156 if (period <= 0) {
157 throw new IllegalArgumentException(
158 String.format("period: %d (expected: > 0)", period));
159 }
160
161 return schedule(new ScheduledFutureTask<Void>(
162 this, Executors.<Void>callable(command, null),
163 ScheduledFutureTask.deadlineNanos(unit.toNanos(initialDelay)), unit.toNanos(period)));
164 }
165
166 @Override
167 public ScheduledFuture<?> scheduleWithFixedDelay(Runnable command, long initialDelay, long delay, TimeUnit unit) {
168 ObjectUtil.checkNotNull(command, "command");
169 ObjectUtil.checkNotNull(unit, "unit");
170 if (initialDelay < 0) {
171 throw new IllegalArgumentException(
172 String.format("initialDelay: %d (expected: >= 0)", initialDelay));
173 }
174 if (delay <= 0) {
175 throw new IllegalArgumentException(
176 String.format("delay: %d (expected: > 0)", delay));
177 }
178
179 return schedule(new ScheduledFutureTask<Void>(
180 this, Executors.<Void>callable(command, null),
181 ScheduledFutureTask.deadlineNanos(unit.toNanos(initialDelay)), -unit.toNanos(delay)));
182 }
183
184 <V> ScheduledFuture<V> schedule(final ScheduledFutureTask<V> task) {
185 if (inEventLoop()) {
186 scheduledTaskQueue().add(task);
187 } else {
188 execute(new Runnable() {
189 @Override
190 public void run() {
191 scheduledTaskQueue().add(task);
192 }
193 });
194 }
195
196 return task;
197 }
198
199 final void removeScheduled(final ScheduledFutureTask<?> task) {
200 if (inEventLoop()) {
201 scheduledTaskQueue().remove(task);
202 } else {
203 execute(new Runnable() {
204 @Override
205 public void run() {
206 removeScheduled(task);
207 }
208 });
209 }
210 }
211 }