3 * Licensed to the Apache Software Foundation (ASF) under one
4 * or more contributor license agreements. See the NOTICE file
5 * distributed with this work for additional information
6 * regarding copyright ownership. The ASF licenses this file
7 * to you under the Apache License, Version 2.0 (the
8 * "License"); you may not use this file except in compliance
9 * with the License. You may obtain a copy of the License at
11 * http://www.apache.org/licenses/LICENSE-2.0
13 * Unless required by applicable law or agreed to in writing, software
14 * distributed under the License is distributed on an "AS IS" BASIS,
15 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16 * See the License for the specific language governing permissions and
17 * limitations under the License.
19 package org
.apache
.hadoop
.hbase
.util
;
21 import java
.io
.OutputStreamWriter
;
22 import java
.io
.PrintStream
;
23 import java
.io
.PrintWriter
;
24 import java
.lang
.Thread
.UncaughtExceptionHandler
;
25 import java
.lang
.reflect
.InvocationTargetException
;
26 import java
.lang
.reflect
.Method
;
27 import java
.nio
.charset
.StandardCharsets
;
28 import java
.util
.concurrent
.LinkedBlockingQueue
;
29 import java
.util
.concurrent
.ThreadFactory
;
30 import java
.util
.concurrent
.ThreadPoolExecutor
;
31 import java
.util
.concurrent
.TimeUnit
;
32 import java
.util
.concurrent
.atomic
.AtomicInteger
;
34 import org
.apache
.commons
.logging
.Log
;
35 import org
.apache
.commons
.logging
.LogFactory
;
36 import org
.apache
.hadoop
.util
.ReflectionUtils
;
37 import org
.apache
.hadoop
.util
.StringUtils
;
38 import org
.apache
.yetus
.audience
.InterfaceAudience
;
40 import org
.apache
.hadoop
.hbase
.shaded
.com
.google
.common
.base
.Preconditions
;
45 @InterfaceAudience.Private
46 public class Threads
{
47 private static final Log LOG
= LogFactory
.getLog(Threads
.class);
48 private static final AtomicInteger poolNumber
= new AtomicInteger(1);
50 public static final UncaughtExceptionHandler LOGGING_EXCEPTION_HANDLER
=
51 new UncaughtExceptionHandler() {
53 public void uncaughtException(Thread t
, Throwable e
) {
54 LOG
.warn("Thread:" + t
+ " exited with Exception:"
55 + StringUtils
.stringifyException(e
));
60 * Utility method that sets name, daemon status and starts passed thread.
61 * @param t thread to run
62 * @return Returns the passed Thread <code>t</code>.
64 public static <T
extends Thread
> T
setDaemonThreadRunning(T t
) {
65 return setDaemonThreadRunning(t
, t
.getName());
69 * Utility method that sets name, daemon status and starts passed thread.
70 * @param t thread to frob
71 * @param name new name
72 * @return Returns the passed Thread <code>t</code>.
74 public static <T
extends Thread
> T
setDaemonThreadRunning(T t
, String name
) {
75 return setDaemonThreadRunning(t
, name
, null);
79 * Utility method that sets name, daemon status and starts passed thread.
80 * @param t thread to frob
81 * @param name new name
82 * @param handler A handler to set on the thread. Pass null if want to use default handler.
83 * @return Returns the passed Thread <code>t</code>.
85 public static <T
extends Thread
> T
setDaemonThreadRunning(T t
, String name
,
86 UncaughtExceptionHandler handler
) {
88 if (handler
!= null) {
89 t
.setUncaughtExceptionHandler(handler
);
97 * Shutdown passed thread using isAlive and join.
98 * @param t Thread to shutdown
100 public static void shutdown(final Thread t
) {
105 * Shutdown passed thread using isAlive and join.
106 * @param joinwait Pass 0 if we're to wait forever.
107 * @param t Thread to shutdown
109 public static void shutdown(final Thread t
, final long joinwait
) {
110 if (t
== null) return;
111 while (t
.isAlive()) {
114 } catch (InterruptedException e
) {
115 LOG
.warn(t
.getName() + "; joinwait=" + joinwait
, e
);
122 * @param t Waits on the passed thread to die dumping a threaddump every
123 * minute while its up.
124 * @throws InterruptedException
126 public static void threadDumpingIsAlive(final Thread t
)
127 throws InterruptedException
{
132 while (t
.isAlive()) {
135 printThreadInfo(System
.out
,
136 "Automatic Stack Trace every 60 seconds waiting on " +
143 * If interrupted, just prints out the interrupt on STDOUT, resets interrupt and returns
144 * @param millis How long to sleep for in milliseconds.
146 public static void sleep(long millis
) {
148 Thread
.sleep(millis
);
149 } catch (InterruptedException e
) {
150 LOG
.warn("sleep interrupted", e
);
151 Thread
.currentThread().interrupt();
156 * Sleeps for the given amount of time even if interrupted. Preserves
157 * the interrupt status.
158 * @param msToWait the amount of time to sleep in milliseconds
160 public static void sleepWithoutInterrupt(final long msToWait
) {
161 long timeMillis
= System
.currentTimeMillis();
162 long endTime
= timeMillis
+ msToWait
;
163 boolean interrupted
= false;
164 while (timeMillis
< endTime
) {
166 Thread
.sleep(endTime
- timeMillis
);
167 } catch (InterruptedException ex
) {
170 timeMillis
= System
.currentTimeMillis();
174 Thread
.currentThread().interrupt();
179 * Create a new CachedThreadPool with a bounded number as the maximum
180 * thread size in the pool.
182 * @param maxCachedThread the maximum thread could be created in the pool
183 * @param timeout the maximum time to wait
184 * @param unit the time unit of the timeout argument
185 * @param threadFactory the factory to use when creating new threads
186 * @return threadPoolExecutor the cachedThreadPool with a bounded number
187 * as the maximum thread size in the pool.
189 public static ThreadPoolExecutor
getBoundedCachedThreadPool(
190 int maxCachedThread
, long timeout
, TimeUnit unit
,
191 ThreadFactory threadFactory
) {
192 ThreadPoolExecutor boundedCachedThreadPool
=
193 new ThreadPoolExecutor(maxCachedThread
, maxCachedThread
, timeout
,
194 unit
, new LinkedBlockingQueue
<>(), threadFactory
);
195 // allow the core pool threads timeout and terminate
196 boundedCachedThreadPool
.allowCoreThreadTimeOut(true);
197 return boundedCachedThreadPool
;
202 * Returns a {@link java.util.concurrent.ThreadFactory} that names each created thread uniquely,
203 * with a common prefix.
204 * @param prefix The prefix of every created Thread's name
205 * @return a {@link java.util.concurrent.ThreadFactory} that names threads
207 public static ThreadFactory
getNamedThreadFactory(final String prefix
) {
208 SecurityManager s
= System
.getSecurityManager();
209 final ThreadGroup threadGroup
= (s
!= null) ? s
.getThreadGroup() : Thread
.currentThread()
212 return new ThreadFactory() {
213 final AtomicInteger threadNumber
= new AtomicInteger(1);
214 private final int poolNumber
= Threads
.poolNumber
.getAndIncrement();
215 final ThreadGroup group
= threadGroup
;
218 public Thread
newThread(Runnable r
) {
219 final String name
= prefix
+ "-pool" + poolNumber
+ "-t" + threadNumber
.getAndIncrement();
220 return new Thread(group
, r
, name
);
226 * Same as {#newDaemonThreadFactory(String, UncaughtExceptionHandler)},
227 * without setting the exception handler.
229 public static ThreadFactory
newDaemonThreadFactory(final String prefix
) {
230 return newDaemonThreadFactory(prefix
, null);
234 * Get a named {@link ThreadFactory} that just builds daemon threads.
235 * @param prefix name prefix for all threads created from the factory
236 * @param handler unhandles exception handler to set for all threads
237 * @return a thread factory that creates named, daemon threads with
238 * the supplied exception handler and normal priority
240 public static ThreadFactory
newDaemonThreadFactory(final String prefix
,
241 final UncaughtExceptionHandler handler
) {
242 final ThreadFactory namedFactory
= getNamedThreadFactory(prefix
);
243 return new ThreadFactory() {
245 public Thread
newThread(Runnable r
) {
246 Thread t
= namedFactory
.newThread(r
);
247 if (handler
!= null) {
248 t
.setUncaughtExceptionHandler(handler
);
250 t
.setUncaughtExceptionHandler(LOGGING_EXCEPTION_HANDLER
);
255 if (t
.getPriority() != Thread
.NORM_PRIORITY
) {
256 t
.setPriority(Thread
.NORM_PRIORITY
);
264 /** Sets an UncaughtExceptionHandler for the thread which logs the
265 * Exception stack if the thread dies.
267 public static void setLoggingUncaughtExceptionHandler(Thread t
) {
268 t
.setUncaughtExceptionHandler(LOGGING_EXCEPTION_HANDLER
);
271 private static interface PrintThreadInfoHelper
{
273 void printThreadInfo(PrintStream stream
, String title
);
277 private static class PrintThreadInfoLazyHolder
{
279 public static final PrintThreadInfoHelper HELPER
= initHelper();
281 private static PrintThreadInfoHelper
initHelper() {
282 Method method
= null;
284 // Hadoop 2.7+ declares printThreadInfo(PrintStream, String)
285 method
= ReflectionUtils
.class.getMethod("printThreadInfo", PrintStream
.class,
287 method
.setAccessible(true);
288 final Method hadoop27Method
= method
;
289 return new PrintThreadInfoHelper() {
292 public void printThreadInfo(PrintStream stream
, String title
) {
294 hadoop27Method
.invoke(null, stream
, title
);
295 } catch (IllegalAccessException
| IllegalArgumentException e
) {
296 throw new RuntimeException(e
);
297 } catch (InvocationTargetException e
) {
298 throw new RuntimeException(e
.getCause());
302 } catch (NoSuchMethodException e
) {
304 "Can not find hadoop 2.7+ printThreadInfo method, try hadoop hadoop 2.6 and earlier", e
);
307 // Hadoop 2.6 and earlier declares printThreadInfo(PrintWriter, String)
308 method
= ReflectionUtils
.class.getMethod("printThreadInfo", PrintWriter
.class,
310 method
.setAccessible(true);
311 final Method hadoop26Method
= method
;
312 return new PrintThreadInfoHelper() {
315 public void printThreadInfo(PrintStream stream
, String title
) {
317 hadoop26Method
.invoke(null, new PrintWriter(
318 new OutputStreamWriter(stream
, StandardCharsets
.UTF_8
)), title
);
319 } catch (IllegalAccessException
| IllegalArgumentException e
) {
320 throw new RuntimeException(e
);
321 } catch (InvocationTargetException e
) {
322 throw new RuntimeException(e
.getCause());
326 } catch (NoSuchMethodException e
) {
327 LOG
.warn("Cannot find printThreadInfo method. Check hadoop jars linked", e
);
334 * Print all of the thread's information and stack traces. Wrapper around Hadoop's method.
336 * @param stream the stream to
337 * @param title a string title for the stack trace
339 public static void printThreadInfo(PrintStream stream
, String title
) {
340 Preconditions
.checkNotNull(PrintThreadInfoLazyHolder
.HELPER
,
341 "Cannot find method. Check hadoop jars linked").printThreadInfo(stream
, title
);