HBASE-19497 Fix findbugs and error-prone warnings in hbase-common (branch-2)
[hbase.git] / hbase-common / src / main / java / org / apache / hadoop / hbase / util / Threads.java
blob4e2f09f611dce2a0439f9b5f7105055f6c6db659
/**
 *
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
19 package org.apache.hadoop.hbase.util;
21 import java.io.OutputStreamWriter;
22 import java.io.PrintStream;
23 import java.io.PrintWriter;
24 import java.lang.Thread.UncaughtExceptionHandler;
25 import java.lang.reflect.InvocationTargetException;
26 import java.lang.reflect.Method;
27 import java.nio.charset.StandardCharsets;
28 import java.util.concurrent.LinkedBlockingQueue;
29 import java.util.concurrent.ThreadFactory;
30 import java.util.concurrent.ThreadPoolExecutor;
31 import java.util.concurrent.TimeUnit;
32 import java.util.concurrent.atomic.AtomicInteger;
34 import org.apache.commons.logging.Log;
35 import org.apache.commons.logging.LogFactory;
36 import org.apache.hadoop.util.ReflectionUtils;
37 import org.apache.hadoop.util.StringUtils;
38 import org.apache.yetus.audience.InterfaceAudience;
40 import org.apache.hadoop.hbase.shaded.com.google.common.base.Preconditions;
42 /**
43 * Thread Utility
45 @InterfaceAudience.Private
46 public class Threads {
47 private static final Log LOG = LogFactory.getLog(Threads.class);
48 private static final AtomicInteger poolNumber = new AtomicInteger(1);
50 public static final UncaughtExceptionHandler LOGGING_EXCEPTION_HANDLER =
51 new UncaughtExceptionHandler() {
52 @Override
53 public void uncaughtException(Thread t, Throwable e) {
54 LOG.warn("Thread:" + t + " exited with Exception:"
55 + StringUtils.stringifyException(e));
59 /**
60 * Utility method that sets name, daemon status and starts passed thread.
61 * @param t thread to run
62 * @return Returns the passed Thread <code>t</code>.
64 public static <T extends Thread> T setDaemonThreadRunning(T t) {
65 return setDaemonThreadRunning(t, t.getName());
68 /**
69 * Utility method that sets name, daemon status and starts passed thread.
70 * @param t thread to frob
71 * @param name new name
72 * @return Returns the passed Thread <code>t</code>.
74 public static <T extends Thread> T setDaemonThreadRunning(T t, String name) {
75 return setDaemonThreadRunning(t, name, null);
78 /**
79 * Utility method that sets name, daemon status and starts passed thread.
80 * @param t thread to frob
81 * @param name new name
82 * @param handler A handler to set on the thread. Pass null if want to use default handler.
83 * @return Returns the passed Thread <code>t</code>.
85 public static <T extends Thread> T setDaemonThreadRunning(T t, String name,
86 UncaughtExceptionHandler handler) {
87 t.setName(name);
88 if (handler != null) {
89 t.setUncaughtExceptionHandler(handler);
91 t.setDaemon(true);
92 t.start();
93 return t;
96 /**
97 * Shutdown passed thread using isAlive and join.
98 * @param t Thread to shutdown
100 public static void shutdown(final Thread t) {
101 shutdown(t, 0);
105 * Shutdown passed thread using isAlive and join.
106 * @param joinwait Pass 0 if we're to wait forever.
107 * @param t Thread to shutdown
109 public static void shutdown(final Thread t, final long joinwait) {
110 if (t == null) return;
111 while (t.isAlive()) {
112 try {
113 t.join(joinwait);
114 } catch (InterruptedException e) {
115 LOG.warn(t.getName() + "; joinwait=" + joinwait, e);
122 * @param t Waits on the passed thread to die dumping a threaddump every
123 * minute while its up.
124 * @throws InterruptedException
126 public static void threadDumpingIsAlive(final Thread t)
127 throws InterruptedException {
128 if (t == null) {
129 return;
132 while (t.isAlive()) {
133 t.join(60 * 1000);
134 if (t.isAlive()) {
135 printThreadInfo(System.out,
136 "Automatic Stack Trace every 60 seconds waiting on " +
137 t.getName());
143 * If interrupted, just prints out the interrupt on STDOUT, resets interrupt and returns
144 * @param millis How long to sleep for in milliseconds.
146 public static void sleep(long millis) {
147 try {
148 Thread.sleep(millis);
149 } catch (InterruptedException e) {
150 LOG.warn("sleep interrupted", e);
151 Thread.currentThread().interrupt();
156 * Sleeps for the given amount of time even if interrupted. Preserves
157 * the interrupt status.
158 * @param msToWait the amount of time to sleep in milliseconds
160 public static void sleepWithoutInterrupt(final long msToWait) {
161 long timeMillis = System.currentTimeMillis();
162 long endTime = timeMillis + msToWait;
163 boolean interrupted = false;
164 while (timeMillis < endTime) {
165 try {
166 Thread.sleep(endTime - timeMillis);
167 } catch (InterruptedException ex) {
168 interrupted = true;
170 timeMillis = System.currentTimeMillis();
173 if (interrupted) {
174 Thread.currentThread().interrupt();
179 * Create a new CachedThreadPool with a bounded number as the maximum
180 * thread size in the pool.
182 * @param maxCachedThread the maximum thread could be created in the pool
183 * @param timeout the maximum time to wait
184 * @param unit the time unit of the timeout argument
185 * @param threadFactory the factory to use when creating new threads
186 * @return threadPoolExecutor the cachedThreadPool with a bounded number
187 * as the maximum thread size in the pool.
189 public static ThreadPoolExecutor getBoundedCachedThreadPool(
190 int maxCachedThread, long timeout, TimeUnit unit,
191 ThreadFactory threadFactory) {
192 ThreadPoolExecutor boundedCachedThreadPool =
193 new ThreadPoolExecutor(maxCachedThread, maxCachedThread, timeout,
194 unit, new LinkedBlockingQueue<>(), threadFactory);
195 // allow the core pool threads timeout and terminate
196 boundedCachedThreadPool.allowCoreThreadTimeOut(true);
197 return boundedCachedThreadPool;
202 * Returns a {@link java.util.concurrent.ThreadFactory} that names each created thread uniquely,
203 * with a common prefix.
204 * @param prefix The prefix of every created Thread's name
205 * @return a {@link java.util.concurrent.ThreadFactory} that names threads
207 public static ThreadFactory getNamedThreadFactory(final String prefix) {
208 SecurityManager s = System.getSecurityManager();
209 final ThreadGroup threadGroup = (s != null) ? s.getThreadGroup() : Thread.currentThread()
210 .getThreadGroup();
212 return new ThreadFactory() {
213 final AtomicInteger threadNumber = new AtomicInteger(1);
214 private final int poolNumber = Threads.poolNumber.getAndIncrement();
215 final ThreadGroup group = threadGroup;
217 @Override
218 public Thread newThread(Runnable r) {
219 final String name = prefix + "-pool" + poolNumber + "-t" + threadNumber.getAndIncrement();
220 return new Thread(group, r, name);
226 * Same as {#newDaemonThreadFactory(String, UncaughtExceptionHandler)},
227 * without setting the exception handler.
229 public static ThreadFactory newDaemonThreadFactory(final String prefix) {
230 return newDaemonThreadFactory(prefix, null);
234 * Get a named {@link ThreadFactory} that just builds daemon threads.
235 * @param prefix name prefix for all threads created from the factory
236 * @param handler unhandles exception handler to set for all threads
237 * @return a thread factory that creates named, daemon threads with
238 * the supplied exception handler and normal priority
240 public static ThreadFactory newDaemonThreadFactory(final String prefix,
241 final UncaughtExceptionHandler handler) {
242 final ThreadFactory namedFactory = getNamedThreadFactory(prefix);
243 return new ThreadFactory() {
244 @Override
245 public Thread newThread(Runnable r) {
246 Thread t = namedFactory.newThread(r);
247 if (handler != null) {
248 t.setUncaughtExceptionHandler(handler);
249 } else {
250 t.setUncaughtExceptionHandler(LOGGING_EXCEPTION_HANDLER);
252 if (!t.isDaemon()) {
253 t.setDaemon(true);
255 if (t.getPriority() != Thread.NORM_PRIORITY) {
256 t.setPriority(Thread.NORM_PRIORITY);
258 return t;
264 /** Sets an UncaughtExceptionHandler for the thread which logs the
265 * Exception stack if the thread dies.
267 public static void setLoggingUncaughtExceptionHandler(Thread t) {
268 t.setUncaughtExceptionHandler(LOGGING_EXCEPTION_HANDLER);
271 private static interface PrintThreadInfoHelper {
273 void printThreadInfo(PrintStream stream, String title);
277 private static class PrintThreadInfoLazyHolder {
279 public static final PrintThreadInfoHelper HELPER = initHelper();
281 private static PrintThreadInfoHelper initHelper() {
282 Method method = null;
283 try {
284 // Hadoop 2.7+ declares printThreadInfo(PrintStream, String)
285 method = ReflectionUtils.class.getMethod("printThreadInfo", PrintStream.class,
286 String.class);
287 method.setAccessible(true);
288 final Method hadoop27Method = method;
289 return new PrintThreadInfoHelper() {
291 @Override
292 public void printThreadInfo(PrintStream stream, String title) {
293 try {
294 hadoop27Method.invoke(null, stream, title);
295 } catch (IllegalAccessException | IllegalArgumentException e) {
296 throw new RuntimeException(e);
297 } catch (InvocationTargetException e) {
298 throw new RuntimeException(e.getCause());
302 } catch (NoSuchMethodException e) {
303 LOG.info(
304 "Can not find hadoop 2.7+ printThreadInfo method, try hadoop hadoop 2.6 and earlier", e);
306 try {
307 // Hadoop 2.6 and earlier declares printThreadInfo(PrintWriter, String)
308 method = ReflectionUtils.class.getMethod("printThreadInfo", PrintWriter.class,
309 String.class);
310 method.setAccessible(true);
311 final Method hadoop26Method = method;
312 return new PrintThreadInfoHelper() {
314 @Override
315 public void printThreadInfo(PrintStream stream, String title) {
316 try {
317 hadoop26Method.invoke(null, new PrintWriter(
318 new OutputStreamWriter(stream, StandardCharsets.UTF_8)), title);
319 } catch (IllegalAccessException | IllegalArgumentException e) {
320 throw new RuntimeException(e);
321 } catch (InvocationTargetException e) {
322 throw new RuntimeException(e.getCause());
326 } catch (NoSuchMethodException e) {
327 LOG.warn("Cannot find printThreadInfo method. Check hadoop jars linked", e);
329 return null;
334 * Print all of the thread's information and stack traces. Wrapper around Hadoop's method.
336 * @param stream the stream to
337 * @param title a string title for the stack trace
339 public static void printThreadInfo(PrintStream stream, String title) {
340 Preconditions.checkNotNull(PrintThreadInfoLazyHolder.HELPER,
341 "Cannot find method. Check hadoop jars linked").printThreadInfo(stream, title);