查看本类的 API文档回源码主页即时通讯网 - 即时通讯开发者社区!
1   /*
2    * Copyright 2015 The Netty Project
3    *
4    * The Netty Project licenses this file to you under the Apache License,
5    * version 2.0 (the "License"); you may not use this file except in compliance
6    * with the License. You may obtain a copy of the License at:
7    *
8    *   http://www.apache.org/licenses/LICENSE-2.0
9    *
10   * Unless required by applicable law or agreed to in writing, software
11   * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12   * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13   * License for the specific language governing permissions and limitations
14   * under the License.
15   */
16  package io.netty.util.concurrent;
17  
18  import io.netty.util.internal.ObjectUtil;
19  
20  import java.util.PriorityQueue;
21  import java.util.Queue;
22  import java.util.concurrent.Callable;
23  import java.util.concurrent.Executors;
24  import java.util.concurrent.TimeUnit;
25  
26  /**
27   * Abstract base class for {@link EventExecutor}s that want to support scheduling.
28   */
29  public abstract class AbstractScheduledEventExecutor extends AbstractEventExecutor {
30  
31      Queue<ScheduledFutureTask<?>> scheduledTaskQueue;
32  
33      protected static long nanoTime() {
34          return ScheduledFutureTask.nanoTime();
35      }
36  
37      Queue<ScheduledFutureTask<?>> scheduledTaskQueue() {
38          if (scheduledTaskQueue == null) {
39              scheduledTaskQueue = new PriorityQueue<ScheduledFutureTask<?>>();
40          }
41          return scheduledTaskQueue;
42      }
43  
44      private static  boolean isNullOrEmpty(Queue<ScheduledFutureTask<?>> queue) {
45          return queue == null || queue.isEmpty();
46      }
47  
48      /**
49       * Cancel all scheduled tasks.
50       *
51       * This method MUST be called only when {@link #inEventLoop()} is {@code true}.
52       */
53      protected void cancelScheduledTasks() {
54          assert inEventLoop();
55          Queue<ScheduledFutureTask<?>> scheduledTaskQueue = this.scheduledTaskQueue;
56          if (isNullOrEmpty(scheduledTaskQueue)) {
57              return;
58          }
59  
60          final ScheduledFutureTask<?>[] scheduledTasks =
61                  scheduledTaskQueue.toArray(new ScheduledFutureTask<?>[scheduledTaskQueue.size()]);
62  
63          for (ScheduledFutureTask<?> task: scheduledTasks) {
64              task.cancelWithoutRemove(false);
65          }
66  
67          scheduledTaskQueue.clear();
68      }
69  
70      /**
71       * @see #pollScheduledTask(long)
72       */
73      protected final Runnable pollScheduledTask() {
74          return pollScheduledTask(nanoTime());
75      }
76  
77      /**
78       * Return the {@link Runnable} which is ready to be executed with the given {@code nanoTime}.
79       * You should use {@link #nanoTime()} to retrieve the the correct {@code nanoTime}.
80       */
81      protected final Runnable pollScheduledTask(long nanoTime) {
82          assert inEventLoop();
83  
84          Queue<ScheduledFutureTask<?>> scheduledTaskQueue = this.scheduledTaskQueue;
85          ScheduledFutureTask<?> scheduledTask = scheduledTaskQueue == null ? null : scheduledTaskQueue.peek();
86          if (scheduledTask == null) {
87              return null;
88          }
89  
90          if (scheduledTask.deadlineNanos() <= nanoTime) {
91              scheduledTaskQueue.remove();
92              return scheduledTask;
93          }
94          return null;
95      }
96  
97      /**
98       * Return the nanoseconds when the next scheduled task is ready to be run or {@code -1} if no task is scheduled.
99       */
100     protected final long nextScheduledTaskNano() {
101         Queue<ScheduledFutureTask<?>> scheduledTaskQueue = this.scheduledTaskQueue;
102         ScheduledFutureTask<?> scheduledTask = scheduledTaskQueue == null ? null : scheduledTaskQueue.peek();
103         if (scheduledTask == null) {
104             return -1;
105         }
106         return Math.max(0, scheduledTask.deadlineNanos() - nanoTime());
107     }
108 
109     final ScheduledFutureTask<?> peekScheduledTask() {
110         Queue<ScheduledFutureTask<?>> scheduledTaskQueue = this.scheduledTaskQueue;
111         if (scheduledTaskQueue == null) {
112             return null;
113         }
114         return scheduledTaskQueue.peek();
115     }
116 
117     /**
118      * Returns {@code true} if a scheduled task is ready for processing.
119      */
120     protected final boolean hasScheduledTasks() {
121         Queue<ScheduledFutureTask<?>> scheduledTaskQueue = this.scheduledTaskQueue;
122         ScheduledFutureTask<?> scheduledTask = scheduledTaskQueue == null ? null : scheduledTaskQueue.peek();
123         return scheduledTask != null && scheduledTask.deadlineNanos() <= nanoTime();
124     }
125 
126     @Override
127     public ScheduledFuture<?> schedule(Runnable command, long delay, TimeUnit unit) {
128         ObjectUtil.checkNotNull(command, "command");
129         ObjectUtil.checkNotNull(unit, "unit");
130         if (delay < 0) {
131             delay = 0;
132         }
133         return schedule(new ScheduledFutureTask<Void>(
134                 this, command, null, ScheduledFutureTask.deadlineNanos(unit.toNanos(delay))));
135     }
136 
137     @Override
138     public <V> ScheduledFuture<V> schedule(Callable<V> callable, long delay, TimeUnit unit) {
139         ObjectUtil.checkNotNull(callable, "callable");
140         ObjectUtil.checkNotNull(unit, "unit");
141         if (delay < 0) {
142             delay = 0;
143         }
144         return schedule(new ScheduledFutureTask<V>(
145                 this, callable, ScheduledFutureTask.deadlineNanos(unit.toNanos(delay))));
146     }
147 
148     @Override
149     public ScheduledFuture<?> scheduleAtFixedRate(Runnable command, long initialDelay, long period, TimeUnit unit) {
150         ObjectUtil.checkNotNull(command, "command");
151         ObjectUtil.checkNotNull(unit, "unit");
152         if (initialDelay < 0) {
153             throw new IllegalArgumentException(
154                     String.format("initialDelay: %d (expected: >= 0)", initialDelay));
155         }
156         if (period <= 0) {
157             throw new IllegalArgumentException(
158                     String.format("period: %d (expected: > 0)", period));
159         }
160 
161         return schedule(new ScheduledFutureTask<Void>(
162                 this, Executors.<Void>callable(command, null),
163                 ScheduledFutureTask.deadlineNanos(unit.toNanos(initialDelay)), unit.toNanos(period)));
164     }
165 
166     @Override
167     public ScheduledFuture<?> scheduleWithFixedDelay(Runnable command, long initialDelay, long delay, TimeUnit unit) {
168         ObjectUtil.checkNotNull(command, "command");
169         ObjectUtil.checkNotNull(unit, "unit");
170         if (initialDelay < 0) {
171             throw new IllegalArgumentException(
172                     String.format("initialDelay: %d (expected: >= 0)", initialDelay));
173         }
174         if (delay <= 0) {
175             throw new IllegalArgumentException(
176                     String.format("delay: %d (expected: > 0)", delay));
177         }
178 
179         return schedule(new ScheduledFutureTask<Void>(
180                 this, Executors.<Void>callable(command, null),
181                 ScheduledFutureTask.deadlineNanos(unit.toNanos(initialDelay)), -unit.toNanos(delay)));
182     }
183 
184     <V> ScheduledFuture<V> schedule(final ScheduledFutureTask<V> task) {
185         if (inEventLoop()) {
186             scheduledTaskQueue().add(task);
187         } else {
188             execute(new Runnable() {
189                 @Override
190                 public void run() {
191                     scheduledTaskQueue().add(task);
192                 }
193             });
194         }
195 
196         return task;
197     }
198 
199     final void removeScheduled(final ScheduledFutureTask<?> task) {
200         if (inEventLoop()) {
201             scheduledTaskQueue().remove(task);
202         } else {
203             execute(new Runnable() {
204                 @Override
205                 public void run() {
206                     removeScheduled(task);
207                 }
208             });
209         }
210     }
211 }