1 /*
2 * Copyright 2012 The Netty Project
3 *
4 * The Netty Project licenses this file to you under the Apache License,
5 * version 2.0 (the "License"); you may not use this file except in compliance
6 * with the License. You may obtain a copy of the License at:
7 *
8 * https://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12 * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13 * License for the specific language governing permissions and limitations
14 * under the License.
15 */
16 package io.netty.util.concurrent;
17
18 import io.netty.util.internal.ObjectUtil;
19 import io.netty.util.internal.SystemPropertyUtil;
20 import io.netty.util.internal.ThreadExecutorMap;
21 import io.netty.util.internal.logging.InternalLogger;
22 import io.netty.util.internal.logging.InternalLoggerFactory;
23
24 import org.jetbrains.annotations.Async.Schedule;
25
26 import java.security.AccessController;
27 import java.security.PrivilegedAction;
28 import java.util.Queue;
29 import java.util.concurrent.BlockingQueue;
30 import java.util.concurrent.Executors;
31 import java.util.concurrent.LinkedBlockingQueue;
32 import java.util.concurrent.RejectedExecutionException;
33 import java.util.concurrent.ThreadFactory;
34 import java.util.concurrent.TimeUnit;
35 import java.util.concurrent.atomic.AtomicBoolean;
36
37 /**
38 * Single-thread singleton {@link EventExecutor}. It starts the thread automatically and stops it when there is no
39 * task pending in the task queue for {@code io.netty.globalEventExecutor.quietPeriodSeconds} second
40 * (default is 1 second). Please note it is not scalable to schedule large number of tasks to this executor;
41 * use a dedicated executor.
42 */
43 public final class GlobalEventExecutor extends AbstractScheduledEventExecutor implements OrderedEventExecutor {
44 private static final InternalLogger logger = InternalLoggerFactory.getInstance(GlobalEventExecutor.class);
45
46 private static final long SCHEDULE_QUIET_PERIOD_INTERVAL;
47
48 static {
49 int quietPeriod = SystemPropertyUtil.getInt("io.netty.globalEventExecutor.quietPeriodSeconds", 1);
50 if (quietPeriod <= 0) {
51 quietPeriod = 1;
52 }
53 logger.debug("-Dio.netty.globalEventExecutor.quietPeriodSeconds: {}", quietPeriod);
54
55 SCHEDULE_QUIET_PERIOD_INTERVAL = TimeUnit.SECONDS.toNanos(quietPeriod);
56 }
57
58 public static final GlobalEventExecutor INSTANCE = new GlobalEventExecutor();
59
60 final BlockingQueue<Runnable> taskQueue = new LinkedBlockingQueue<Runnable>();
61 final ScheduledFutureTask<Void> quietPeriodTask = new ScheduledFutureTask<Void>(
62 this, Executors.<Void>callable(new Runnable() {
63 @Override
64 public void run() {
65 // NOOP
66 }
67 }, null),
68 // note: the getCurrentTimeNanos() call here only works because this is a final class, otherwise the method
69 // could be overridden leading to unsafe initialization here!
70 deadlineNanos(getCurrentTimeNanos(), SCHEDULE_QUIET_PERIOD_INTERVAL),
71 -SCHEDULE_QUIET_PERIOD_INTERVAL
72 );
73
74 // because the GlobalEventExecutor is a singleton, tasks submitted to it can come from arbitrary threads and this
75 // can trigger the creation of a thread from arbitrary thread groups; for this reason, the thread factory must not
76 // be sticky about its thread group
77 // visible for testing
78 final ThreadFactory threadFactory;
79 private final TaskRunner taskRunner = new TaskRunner();
80 private final AtomicBoolean started = new AtomicBoolean();
81 volatile Thread thread;
82
83 private final Future<?> terminationFuture = new FailedFuture<Object>(this, new UnsupportedOperationException());
84
85 private GlobalEventExecutor() {
86 scheduledTaskQueue().add(quietPeriodTask);
87 threadFactory = ThreadExecutorMap.apply(new DefaultThreadFactory(
88 DefaultThreadFactory.toPoolName(getClass()), false, Thread.NORM_PRIORITY, null), this);
89 }
90
91 /**
92 * Take the next {@link Runnable} from the task queue and so will block if no task is currently present.
93 *
94 * @return {@code null} if the executor thread has been interrupted or waken up.
95 */
96 Runnable takeTask() {
97 BlockingQueue<Runnable> taskQueue = this.taskQueue;
98 for (;;) {
99 ScheduledFutureTask<?> scheduledTask = peekScheduledTask();
100 if (scheduledTask == null) {
101 Runnable task = null;
102 try {
103 task = taskQueue.take();
104 } catch (InterruptedException e) {
105 // Ignore
106 }
107 return task;
108 } else {
109 long delayNanos = scheduledTask.delayNanos();
110 Runnable task = null;
111 if (delayNanos > 0) {
112 try {
113 task = taskQueue.poll(delayNanos, TimeUnit.NANOSECONDS);
114 } catch (InterruptedException e) {
115 // Waken up.
116 return null;
117 }
118 }
119 if (task == null) {
120 // We need to fetch the scheduled tasks now as otherwise there may be a chance that
121 // scheduled tasks are never executed if there is always one task in the taskQueue.
122 // This is for example true for the read task of OIO Transport
123 // See https://github.com/netty/netty/issues/1614
124 fetchFromScheduledTaskQueue();
125 task = taskQueue.poll();
126 }
127
128 if (task != null) {
129 return task;
130 }
131 }
132 }
133 }
134
135 private void fetchFromScheduledTaskQueue() {
136 long nanoTime = getCurrentTimeNanos();
137 Runnable scheduledTask = pollScheduledTask(nanoTime);
138 while (scheduledTask != null) {
139 taskQueue.add(scheduledTask);
140 scheduledTask = pollScheduledTask(nanoTime);
141 }
142 }
143
144 /**
145 * Return the number of tasks that are pending for processing.
146 */
147 public int pendingTasks() {
148 return taskQueue.size();
149 }
150
151 /**
152 * Add a task to the task queue, or throws a {@link RejectedExecutionException} if this instance was shutdown
153 * before.
154 */
155 private void addTask(Runnable task) {
156 taskQueue.add(ObjectUtil.checkNotNull(task, "task"));
157 }
158
159 @Override
160 public boolean inEventLoop(Thread thread) {
161 return thread == this.thread;
162 }
163
164 @Override
165 public Future<?> shutdownGracefully(long quietPeriod, long timeout, TimeUnit unit) {
166 return terminationFuture();
167 }
168
169 @Override
170 public Future<?> terminationFuture() {
171 return terminationFuture;
172 }
173
174 @Override
175 @Deprecated
176 public void shutdown() {
177 throw new UnsupportedOperationException();
178 }
179
180 @Override
181 public boolean isShuttingDown() {
182 return false;
183 }
184
185 @Override
186 public boolean isShutdown() {
187 return false;
188 }
189
190 @Override
191 public boolean isTerminated() {
192 return false;
193 }
194
195 @Override
196 public boolean awaitTermination(long timeout, TimeUnit unit) {
197 return false;
198 }
199
200 /**
201 * Waits until the worker thread of this executor has no tasks left in its task queue and terminates itself.
202 * Because a new worker thread will be started again when a new task is submitted, this operation is only useful
203 * when you want to ensure that the worker thread is terminated <strong>after</strong> your application is shut
204 * down and there's no chance of submitting a new task afterwards.
205 *
206 * @return {@code true} if and only if the worker thread has been terminated
207 */
208 public boolean awaitInactivity(long timeout, TimeUnit unit) throws InterruptedException {
209 ObjectUtil.checkNotNull(unit, "unit");
210
211 final Thread thread = this.thread;
212 if (thread == null) {
213 throw new IllegalStateException("thread was not started");
214 }
215 thread.join(unit.toMillis(timeout));
216 return !thread.isAlive();
217 }
218
219 @Override
220 public void execute(Runnable task) {
221 execute0(task);
222 }
223
224 private void execute0(@Schedule Runnable task) {
225 addTask(ObjectUtil.checkNotNull(task, "task"));
226 if (!inEventLoop()) {
227 startThread();
228 }
229 }
230
231 private void startThread() {
232 if (started.compareAndSet(false, true)) {
233 final Thread t = threadFactory.newThread(taskRunner);
234 // Set to null to ensure we not create classloader leaks by holds a strong reference to the inherited
235 // classloader.
236 // See:
237 // - https://github.com/netty/netty/issues/7290
238 // - https://bugs.openjdk.java.net/browse/JDK-7008595
239 AccessController.doPrivileged(new PrivilegedAction<Void>() {
240 @Override
241 public Void run() {
242 t.setContextClassLoader(null);
243 return null;
244 }
245 });
246
247 // Set the thread before starting it as otherwise inEventLoop() may return false and so produce
248 // an assert error.
249 // See https://github.com/netty/netty/issues/4357
250 thread = t;
251 t.start();
252 }
253 }
254
255 final class TaskRunner implements Runnable {
256 @Override
257 public void run() {
258 for (;;) {
259 Runnable task = takeTask();
260 if (task != null) {
261 try {
262 runTask(task);
263 } catch (Throwable t) {
264 logger.warn("Unexpected exception from the global event executor: ", t);
265 }
266
267 if (task != quietPeriodTask) {
268 continue;
269 }
270 }
271
272 Queue<ScheduledFutureTask<?>> scheduledTaskQueue = GlobalEventExecutor.this.scheduledTaskQueue;
273 // Terminate if there is no task in the queue (except the noop task).
274 if (taskQueue.isEmpty() && (scheduledTaskQueue == null || scheduledTaskQueue.size() == 1)) {
275 // Mark the current thread as stopped.
276 // The following CAS must always success and must be uncontended,
277 // because only one thread should be running at the same time.
278 boolean stopped = started.compareAndSet(true, false);
279 assert stopped;
280
281 // Check if there are pending entries added by execute() or schedule*() while we do CAS above.
282 // Do not check scheduledTaskQueue because it is not thread-safe and can only be mutated from a
283 // TaskRunner actively running tasks.
284 if (taskQueue.isEmpty()) {
285 // A) No new task was added and thus there's nothing to handle
286 // -> safe to terminate because there's nothing left to do
287 // B) A new thread started and handled all the new tasks.
288 // -> safe to terminate the new thread will take care the rest
289 break;
290 }
291
292 // There are pending tasks added again.
293 if (!started.compareAndSet(false, true)) {
294 // startThread() started a new thread and set 'started' to true.
295 // -> terminate this thread so that the new thread reads from taskQueue exclusively.
296 break;
297 }
298
299 // New tasks were added, but this worker was faster to set 'started' to true.
300 // i.e. a new worker thread was not started by startThread().
301 // -> keep this thread alive to handle the newly added entries.
302 }
303 }
304 }
305 }
306 }