1 /*
2 * Copyright 2014 The Netty Project
3 *
4 * The Netty Project licenses this file to you under the Apache License,
5 * version 2.0 (the "License"); you may not use this file except in compliance
6 * with the License. You may obtain a copy of the License at:
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12 * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13 * License for the specific language governing permissions and limitations
14 * under the License.
15 */
16
17 package io.netty.util;
18
19 import io.netty.util.concurrent.DefaultThreadFactory;
20 import io.netty.util.internal.StringUtil;
21 import io.netty.util.internal.SystemPropertyUtil;
22 import io.netty.util.internal.logging.InternalLogger;
23 import io.netty.util.internal.logging.InternalLoggerFactory;
24
25 import java.security.AccessController;
26 import java.security.PrivilegedAction;
27 import java.util.ArrayList;
28 import java.util.List;
29 import java.util.Queue;
30 import java.util.concurrent.ConcurrentLinkedQueue;
31 import java.util.concurrent.ThreadFactory;
32 import java.util.concurrent.TimeUnit;
33 import java.util.concurrent.atomic.AtomicBoolean;
34
35 /**
36 * Checks if a thread is alive periodically and runs a task when a thread dies.
37 * <p>
38 * This thread starts a daemon thread to check the state of the threads being watched and to invoke their
39 * associated {@link Runnable}s. When there is no thread to watch (i.e. all threads are dead), the daemon thread
40 * will terminate itself, and a new daemon thread will be started again when a new watch is added.
41 * </p>
42 *
43 * @deprecated will be removed in the next major release
44 */
45 @Deprecated
46 public final class ThreadDeathWatcher {
47
48 private static final InternalLogger logger = InternalLoggerFactory.getInstance(ThreadDeathWatcher.class);
49 // visible for testing
50 static final ThreadFactory threadFactory;
51
52 // Use a MPMC queue as we may end up checking isEmpty() from multiple threads which may not be allowed to do
53 // concurrently depending on the implementation of it in a MPSC queue.
54 private static final Queue<Entry> pendingEntries = new ConcurrentLinkedQueue<Entry>();
55 private static final Watcher watcher = new Watcher();
56 private static final AtomicBoolean started = new AtomicBoolean();
57 private static volatile Thread watcherThread;
58
59 static {
60 String poolName = "threadDeathWatcher";
61 String serviceThreadPrefix = SystemPropertyUtil.get("io.netty.serviceThreadPrefix");
62 if (!StringUtil.isNullOrEmpty(serviceThreadPrefix)) {
63 poolName = serviceThreadPrefix + poolName;
64 }
65 // because the ThreadDeathWatcher is a singleton, tasks submitted to it can come from arbitrary threads and
66 // this can trigger the creation of a thread from arbitrary thread groups; for this reason, the thread factory
67 // must not be sticky about its thread group
68 threadFactory = new DefaultThreadFactory(poolName, true, Thread.MIN_PRIORITY, null);
69 }
70
71 /**
72 * Schedules the specified {@code task} to run when the specified {@code thread} dies.
73 *
74 * @param thread the {@link Thread} to watch
75 * @param task the {@link Runnable} to run when the {@code thread} dies
76 *
77 * @throws IllegalArgumentException if the specified {@code thread} is not alive
78 */
79 public static void watch(Thread thread, Runnable task) {
80 if (thread == null) {
81 throw new NullPointerException("thread");
82 }
83 if (task == null) {
84 throw new NullPointerException("task");
85 }
86 if (!thread.isAlive()) {
87 throw new IllegalArgumentException("thread must be alive.");
88 }
89
90 schedule(thread, task, true);
91 }
92
93 /**
94 * Cancels the task scheduled via {@link #watch(Thread, Runnable)}.
95 */
96 public static void unwatch(Thread thread, Runnable task) {
97 if (thread == null) {
98 throw new NullPointerException("thread");
99 }
100 if (task == null) {
101 throw new NullPointerException("task");
102 }
103
104 schedule(thread, task, false);
105 }
106
107 private static void schedule(Thread thread, Runnable task, boolean isWatch) {
108 pendingEntries.add(new Entry(thread, task, isWatch));
109
110 if (started.compareAndSet(false, true)) {
111 final Thread watcherThread = threadFactory.newThread(watcher);
112 // Set to null to ensure we not create classloader leaks by holds a strong reference to the inherited
113 // classloader.
114 // See:
115 // - https://github.com/netty/netty/issues/7290
116 // - https://bugs.openjdk.java.net/browse/JDK-7008595
117 AccessController.doPrivileged(new PrivilegedAction<Void>() {
118 @Override
119 public Void run() {
120 watcherThread.setContextClassLoader(null);
121 return null;
122 }
123 });
124
125 watcherThread.start();
126 ThreadDeathWatcher.watcherThread = watcherThread;
127 }
128 }
129
130 /**
131 * Waits until the thread of this watcher has no threads to watch and terminates itself.
132 * Because a new watcher thread will be started again on {@link #watch(Thread, Runnable)},
133 * this operation is only useful when you want to ensure that the watcher thread is terminated
134 * <strong>after</strong> your application is shut down and there's no chance of calling
135 * {@link #watch(Thread, Runnable)} afterwards.
136 *
137 * @return {@code true} if and only if the watcher thread has been terminated
138 */
139 public static boolean awaitInactivity(long timeout, TimeUnit unit) throws InterruptedException {
140 if (unit == null) {
141 throw new NullPointerException("unit");
142 }
143
144 Thread watcherThread = ThreadDeathWatcher.watcherThread;
145 if (watcherThread != null) {
146 watcherThread.join(unit.toMillis(timeout));
147 return !watcherThread.isAlive();
148 } else {
149 return true;
150 }
151 }
152
153 private ThreadDeathWatcher() { }
154
155 private static final class Watcher implements Runnable {
156
157 private final List<Entry> watchees = new ArrayList<Entry>();
158
159 @Override
160 public void run() {
161 for (;;) {
162 fetchWatchees();
163 notifyWatchees();
164
165 // Try once again just in case notifyWatchees() triggered watch() or unwatch().
166 fetchWatchees();
167 notifyWatchees();
168
169 try {
170 Thread.sleep(1000);
171 } catch (InterruptedException ignore) {
172 // Ignore the interrupt; do not terminate until all tasks are run.
173 }
174
175 if (watchees.isEmpty() && pendingEntries.isEmpty()) {
176
177 // Mark the current worker thread as stopped.
178 // The following CAS must always success and must be uncontended,
179 // because only one watcher thread should be running at the same time.
180 boolean stopped = started.compareAndSet(true, false);
181 assert stopped;
182
183 // Check if there are pending entries added by watch() while we do CAS above.
184 if (pendingEntries.isEmpty()) {
185 // A) watch() was not invoked and thus there's nothing to handle
186 // -> safe to terminate because there's nothing left to do
187 // B) a new watcher thread started and handled them all
188 // -> safe to terminate the new watcher thread will take care the rest
189 break;
190 }
191
192 // There are pending entries again, added by watch()
193 if (!started.compareAndSet(false, true)) {
194 // watch() started a new watcher thread and set 'started' to true.
195 // -> terminate this thread so that the new watcher reads from pendingEntries exclusively.
196 break;
197 }
198
199 // watch() added an entry, but this worker was faster to set 'started' to true.
200 // i.e. a new watcher thread was not started
201 // -> keep this thread alive to handle the newly added entries.
202 }
203 }
204 }
205
206 private void fetchWatchees() {
207 for (;;) {
208 Entry e = pendingEntries.poll();
209 if (e == null) {
210 break;
211 }
212
213 if (e.isWatch) {
214 watchees.add(e);
215 } else {
216 watchees.remove(e);
217 }
218 }
219 }
220
221 private void notifyWatchees() {
222 List<Entry> watchees = this.watchees;
223 for (int i = 0; i < watchees.size();) {
224 Entry e = watchees.get(i);
225 if (!e.thread.isAlive()) {
226 watchees.remove(i);
227 try {
228 e.task.run();
229 } catch (Throwable t) {
230 logger.warn("Thread death watcher task raised an exception:", t);
231 }
232 } else {
233 i ++;
234 }
235 }
236 }
237 }
238
239 private static final class Entry {
240 final Thread thread;
241 final Runnable task;
242 final boolean isWatch;
243
244 Entry(Thread thread, Runnable task, boolean isWatch) {
245 this.thread = thread;
246 this.task = task;
247 this.isWatch = isWatch;
248 }
249
250 @Override
251 public int hashCode() {
252 return thread.hashCode() ^ task.hashCode();
253 }
254
255 @Override
256 public boolean equals(Object obj) {
257 if (obj == this) {
258 return true;
259 }
260
261 if (!(obj instanceof Entry)) {
262 return false;
263 }
264
265 Entry that = (Entry) obj;
266 return thread == that.thread && task == that.task;
267 }
268 }
269 }