HBASE-24163 MOB compactor implementations should use format specifiers when calling...
[hbase.git] / hbase-common / src / main / java / org / apache / hadoop / hbase / util / Threads.java
blobc72231ac08e6d10d76dd214bf09576b4cc821838
1 /**
3 * Licensed to the Apache Software Foundation (ASF) under one
4 * or more contributor license agreements. See the NOTICE file
5 * distributed with this work for additional information
6 * regarding copyright ownership. The ASF licenses this file
7 * to you under the Apache License, Version 2.0 (the
8 * "License"); you may not use this file except in compliance
9 * with the License. You may obtain a copy of the License at
11 * http://www.apache.org/licenses/LICENSE-2.0
13 * Unless required by applicable law or agreed to in writing, software
14 * distributed under the License is distributed on an "AS IS" BASIS,
15 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16 * See the License for the specific language governing permissions and
17 * limitations under the License.
19 package org.apache.hadoop.hbase.util;
21 import java.io.OutputStreamWriter;
22 import java.io.PrintStream;
23 import java.io.PrintWriter;
24 import java.lang.Thread.UncaughtExceptionHandler;
25 import java.lang.reflect.InvocationTargetException;
26 import java.lang.reflect.Method;
27 import java.nio.charset.StandardCharsets;
28 import java.util.concurrent.LinkedBlockingQueue;
29 import java.util.concurrent.ThreadFactory;
30 import java.util.concurrent.ThreadPoolExecutor;
31 import java.util.concurrent.TimeUnit;
32 import java.util.concurrent.atomic.AtomicInteger;
34 import org.apache.hadoop.util.ReflectionUtils;
35 import org.apache.hadoop.util.StringUtils;
36 import org.apache.yetus.audience.InterfaceAudience;
37 import org.slf4j.Logger;
38 import org.slf4j.LoggerFactory;
40 import org.apache.hbase.thirdparty.com.google.common.base.Preconditions;
42 /**
43 * Thread Utility
45 @InterfaceAudience.Private
46 public class Threads {
47 private static final Logger LOG = LoggerFactory.getLogger(Threads.class);
48 private static final AtomicInteger poolNumber = new AtomicInteger(1);
50 public static final UncaughtExceptionHandler LOGGING_EXCEPTION_HANDLER =
51 new UncaughtExceptionHandler() {
52 @Override
53 public void uncaughtException(Thread t, Throwable e) {
54 LOG.warn("Thread:" + t + " exited with Exception:"
55 + StringUtils.stringifyException(e));
59 /**
60 * Utility method that sets name, daemon status and starts passed thread.
61 * @param t thread to run
62 * @return Returns the passed Thread <code>t</code>.
64 public static <T extends Thread> T setDaemonThreadRunning(T t) {
65 return setDaemonThreadRunning(t, t.getName());
68 /**
69 * Utility method that sets name, daemon status and starts passed thread.
70 * @param t thread to frob
71 * @param name new name
72 * @return Returns the passed Thread <code>t</code>.
74 public static <T extends Thread> T setDaemonThreadRunning(T t, String name) {
75 return setDaemonThreadRunning(t, name, null);
78 /**
79 * Utility method that sets name, daemon status and starts passed thread.
80 * @param t thread to frob
81 * @param name new name
82 * @param handler A handler to set on the thread. Pass null if want to use default handler.
83 * @return Returns the passed Thread <code>t</code>.
85 public static <T extends Thread> T setDaemonThreadRunning(T t, String name,
86 UncaughtExceptionHandler handler) {
87 t.setName(name);
88 if (handler != null) {
89 t.setUncaughtExceptionHandler(handler);
91 t.setDaemon(true);
92 t.start();
93 return t;
96 /**
97 * Shutdown passed thread using isAlive and join.
98 * @param t Thread to shutdown
100 public static void shutdown(final Thread t) {
101 shutdown(t, 0);
105 * Shutdown passed thread using isAlive and join.
106 * @param joinwait Pass 0 if we're to wait forever.
107 * @param t Thread to shutdown
109 public static void shutdown(final Thread t, final long joinwait) {
110 if (t == null) return;
111 while (t.isAlive()) {
112 try {
113 t.join(joinwait);
114 } catch (InterruptedException e) {
115 LOG.warn(t.getName() + "; joinwait=" + joinwait, e);
122 * @param t Waits on the passed thread to die dumping a threaddump every
123 * minute while its up.
124 * @throws InterruptedException
126 public static void threadDumpingIsAlive(final Thread t)
127 throws InterruptedException {
128 if (t == null) {
129 return;
132 while (t.isAlive()) {
133 t.join(60 * 1000);
134 if (t.isAlive()) {
135 printThreadInfo(System.out,
136 "Automatic Stack Trace every 60 seconds waiting on " +
137 t.getName());
143 * If interrupted, just prints out the interrupt on STDOUT, resets interrupt and returns
144 * @param millis How long to sleep for in milliseconds.
146 public static void sleep(long millis) {
147 try {
148 Thread.sleep(millis);
149 } catch (InterruptedException e) {
150 LOG.warn("sleep interrupted", e);
151 Thread.currentThread().interrupt();
156 * Sleeps for the given amount of time even if interrupted. Preserves
157 * the interrupt status.
158 * @param msToWait the amount of time to sleep in milliseconds
160 public static void sleepWithoutInterrupt(final long msToWait) {
161 long timeMillis = System.currentTimeMillis();
162 long endTime = timeMillis + msToWait;
163 boolean interrupted = false;
164 while (timeMillis < endTime) {
165 try {
166 Thread.sleep(endTime - timeMillis);
167 } catch (InterruptedException ex) {
168 interrupted = true;
170 timeMillis = System.currentTimeMillis();
173 if (interrupted) {
174 Thread.currentThread().interrupt();
179 * Create a new CachedThreadPool with a bounded number as the maximum
180 * thread size in the pool.
182 * @param maxCachedThread the maximum thread could be created in the pool
183 * @param timeout the maximum time to wait
184 * @param unit the time unit of the timeout argument
185 * @param threadFactory the factory to use when creating new threads
186 * @return threadPoolExecutor the cachedThreadPool with a bounded number
187 * as the maximum thread size in the pool.
189 public static ThreadPoolExecutor getBoundedCachedThreadPool(
190 int maxCachedThread, long timeout, TimeUnit unit,
191 ThreadFactory threadFactory) {
192 ThreadPoolExecutor boundedCachedThreadPool =
193 new ThreadPoolExecutor(maxCachedThread, maxCachedThread, timeout,
194 unit, new LinkedBlockingQueue<>(), threadFactory);
195 // allow the core pool threads timeout and terminate
196 boundedCachedThreadPool.allowCoreThreadTimeOut(true);
197 return boundedCachedThreadPool;
200 public static ThreadPoolExecutor getBoundedCachedThreadPool(int maxCachedThread, long timeout,
201 TimeUnit unit, String prefix) {
202 return getBoundedCachedThreadPool(maxCachedThread, timeout, unit,
203 newDaemonThreadFactory(prefix));
207 * Returns a {@link java.util.concurrent.ThreadFactory} that names each created thread uniquely,
208 * with a common prefix.
209 * @param prefix The prefix of every created Thread's name
210 * @return a {@link java.util.concurrent.ThreadFactory} that names threads
212 private static ThreadFactory getNamedThreadFactory(final String prefix) {
213 SecurityManager s = System.getSecurityManager();
214 final ThreadGroup threadGroup = (s != null) ? s.getThreadGroup() : Thread.currentThread()
215 .getThreadGroup();
217 return new ThreadFactory() {
218 final AtomicInteger threadNumber = new AtomicInteger(1);
219 private final int poolNumber = Threads.poolNumber.getAndIncrement();
220 final ThreadGroup group = threadGroup;
222 @Override
223 public Thread newThread(Runnable r) {
224 final String name = prefix + "-pool" + poolNumber + "-t" + threadNumber.getAndIncrement();
225 return new Thread(group, r, name);
231 * Same as {#newDaemonThreadFactory(String, UncaughtExceptionHandler)},
232 * without setting the exception handler.
234 public static ThreadFactory newDaemonThreadFactory(final String prefix) {
235 return newDaemonThreadFactory(prefix, null);
239 * Get a named {@link ThreadFactory} that just builds daemon threads.
240 * @param prefix name prefix for all threads created from the factory
241 * @param handler unhandles exception handler to set for all threads
242 * @return a thread factory that creates named, daemon threads with
243 * the supplied exception handler and normal priority
245 public static ThreadFactory newDaemonThreadFactory(final String prefix,
246 final UncaughtExceptionHandler handler) {
247 final ThreadFactory namedFactory = getNamedThreadFactory(prefix);
248 return new ThreadFactory() {
249 @Override
250 public Thread newThread(Runnable r) {
251 Thread t = namedFactory.newThread(r);
252 if (handler != null) {
253 t.setUncaughtExceptionHandler(handler);
254 } else {
255 t.setUncaughtExceptionHandler(LOGGING_EXCEPTION_HANDLER);
257 if (!t.isDaemon()) {
258 t.setDaemon(true);
260 if (t.getPriority() != Thread.NORM_PRIORITY) {
261 t.setPriority(Thread.NORM_PRIORITY);
263 return t;
269 /** Sets an UncaughtExceptionHandler for the thread which logs the
270 * Exception stack if the thread dies.
272 public static void setLoggingUncaughtExceptionHandler(Thread t) {
273 t.setUncaughtExceptionHandler(LOGGING_EXCEPTION_HANDLER);
276 private static interface PrintThreadInfoHelper {
278 void printThreadInfo(PrintStream stream, String title);
282 private static class PrintThreadInfoLazyHolder {
284 public static final PrintThreadInfoHelper HELPER = initHelper();
286 private static PrintThreadInfoHelper initHelper() {
287 Method method = null;
288 try {
289 // Hadoop 2.7+ declares printThreadInfo(PrintStream, String)
290 method = ReflectionUtils.class.getMethod("printThreadInfo", PrintStream.class,
291 String.class);
292 method.setAccessible(true);
293 final Method hadoop27Method = method;
294 return new PrintThreadInfoHelper() {
296 @Override
297 public void printThreadInfo(PrintStream stream, String title) {
298 try {
299 hadoop27Method.invoke(null, stream, title);
300 } catch (IllegalAccessException | IllegalArgumentException e) {
301 throw new RuntimeException(e);
302 } catch (InvocationTargetException e) {
303 throw new RuntimeException(e.getCause());
307 } catch (NoSuchMethodException e) {
308 LOG.info(
309 "Can not find hadoop 2.7+ printThreadInfo method, try hadoop hadoop 2.6 and earlier", e);
311 try {
312 // Hadoop 2.6 and earlier declares printThreadInfo(PrintWriter, String)
313 method = ReflectionUtils.class.getMethod("printThreadInfo", PrintWriter.class,
314 String.class);
315 method.setAccessible(true);
316 final Method hadoop26Method = method;
317 return new PrintThreadInfoHelper() {
319 @Override
320 public void printThreadInfo(PrintStream stream, String title) {
321 try {
322 hadoop26Method.invoke(null, new PrintWriter(
323 new OutputStreamWriter(stream, StandardCharsets.UTF_8)), title);
324 } catch (IllegalAccessException | IllegalArgumentException e) {
325 throw new RuntimeException(e);
326 } catch (InvocationTargetException e) {
327 throw new RuntimeException(e.getCause());
331 } catch (NoSuchMethodException e) {
332 LOG.warn("Cannot find printThreadInfo method. Check hadoop jars linked", e);
334 return null;
339 * Print all of the thread's information and stack traces. Wrapper around Hadoop's method.
341 * @param stream the stream to
342 * @param title a string title for the stack trace
344 public static void printThreadInfo(PrintStream stream, String title) {
345 Preconditions.checkNotNull(PrintThreadInfoLazyHolder.HELPER,
346 "Cannot find method. Check hadoop jars linked").printThreadInfo(stream, title);