3 * Licensed to the Apache Software Foundation (ASF) under one
4 * or more contributor license agreements. See the NOTICE file
5 * distributed with this work for additional information
6 * regarding copyright ownership. The ASF licenses this file
7 * to you under the Apache License, Version 2.0 (the
8 * "License"); you may not use this file except in compliance
9 * with the License. You may obtain a copy of the License at
11 * http://www.apache.org/licenses/LICENSE-2.0
13 * Unless required by applicable law or agreed to in writing, software
14 * distributed under the License is distributed on an "AS IS" BASIS,
15 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16 * See the License for the specific language governing permissions and
17 * limitations under the License.
19 package org
.apache
.hadoop
.hbase
.util
;
21 import java
.io
.OutputStreamWriter
;
22 import java
.io
.PrintStream
;
23 import java
.io
.PrintWriter
;
24 import java
.lang
.Thread
.UncaughtExceptionHandler
;
25 import java
.lang
.reflect
.InvocationTargetException
;
26 import java
.lang
.reflect
.Method
;
27 import java
.nio
.charset
.StandardCharsets
;
28 import java
.util
.concurrent
.LinkedBlockingQueue
;
29 import java
.util
.concurrent
.ThreadFactory
;
30 import java
.util
.concurrent
.ThreadPoolExecutor
;
31 import java
.util
.concurrent
.TimeUnit
;
32 import java
.util
.concurrent
.atomic
.AtomicInteger
;
34 import org
.apache
.hadoop
.util
.ReflectionUtils
;
35 import org
.apache
.hadoop
.util
.StringUtils
;
36 import org
.apache
.yetus
.audience
.InterfaceAudience
;
37 import org
.slf4j
.Logger
;
38 import org
.slf4j
.LoggerFactory
;
40 import org
.apache
.hbase
.thirdparty
.com
.google
.common
.base
.Preconditions
;
45 @InterfaceAudience.Private
46 public class Threads
{
47 private static final Logger LOG
= LoggerFactory
.getLogger(Threads
.class);
48 private static final AtomicInteger poolNumber
= new AtomicInteger(1);
50 public static final UncaughtExceptionHandler LOGGING_EXCEPTION_HANDLER
=
51 new UncaughtExceptionHandler() {
53 public void uncaughtException(Thread t
, Throwable e
) {
54 LOG
.warn("Thread:" + t
+ " exited with Exception:"
55 + StringUtils
.stringifyException(e
));
60 * Utility method that sets name, daemon status and starts passed thread.
61 * @param t thread to run
62 * @return Returns the passed Thread <code>t</code>.
64 public static <T
extends Thread
> T
setDaemonThreadRunning(T t
) {
65 return setDaemonThreadRunning(t
, t
.getName());
69 * Utility method that sets name, daemon status and starts passed thread.
70 * @param t thread to frob
71 * @param name new name
72 * @return Returns the passed Thread <code>t</code>.
74 public static <T
extends Thread
> T
setDaemonThreadRunning(T t
, String name
) {
75 return setDaemonThreadRunning(t
, name
, null);
79 * Utility method that sets name, daemon status and starts passed thread.
80 * @param t thread to frob
81 * @param name new name
82 * @param handler A handler to set on the thread. Pass null if want to use default handler.
83 * @return Returns the passed Thread <code>t</code>.
85 public static <T
extends Thread
> T
setDaemonThreadRunning(T t
, String name
,
86 UncaughtExceptionHandler handler
) {
88 if (handler
!= null) {
89 t
.setUncaughtExceptionHandler(handler
);
97 * Shutdown passed thread using isAlive and join.
98 * @param t Thread to shutdown
100 public static void shutdown(final Thread t
) {
105 * Shutdown passed thread using isAlive and join.
106 * @param joinwait Pass 0 if we're to wait forever.
107 * @param t Thread to shutdown
109 public static void shutdown(final Thread t
, final long joinwait
) {
110 if (t
== null) return;
111 while (t
.isAlive()) {
114 } catch (InterruptedException e
) {
115 LOG
.warn(t
.getName() + "; joinwait=" + joinwait
, e
);
122 * @param t Waits on the passed thread to die dumping a threaddump every
123 * minute while its up.
124 * @throws InterruptedException
126 public static void threadDumpingIsAlive(final Thread t
)
127 throws InterruptedException
{
132 while (t
.isAlive()) {
135 printThreadInfo(System
.out
,
136 "Automatic Stack Trace every 60 seconds waiting on " +
143 * If interrupted, just prints out the interrupt on STDOUT, resets interrupt and returns
144 * @param millis How long to sleep for in milliseconds.
146 public static void sleep(long millis
) {
148 Thread
.sleep(millis
);
149 } catch (InterruptedException e
) {
150 LOG
.warn("sleep interrupted", e
);
151 Thread
.currentThread().interrupt();
156 * Sleeps for the given amount of time even if interrupted. Preserves
157 * the interrupt status.
158 * @param msToWait the amount of time to sleep in milliseconds
160 public static void sleepWithoutInterrupt(final long msToWait
) {
161 long timeMillis
= System
.currentTimeMillis();
162 long endTime
= timeMillis
+ msToWait
;
163 boolean interrupted
= false;
164 while (timeMillis
< endTime
) {
166 Thread
.sleep(endTime
- timeMillis
);
167 } catch (InterruptedException ex
) {
170 timeMillis
= System
.currentTimeMillis();
174 Thread
.currentThread().interrupt();
179 * Create a new CachedThreadPool with a bounded number as the maximum
180 * thread size in the pool.
182 * @param maxCachedThread the maximum thread could be created in the pool
183 * @param timeout the maximum time to wait
184 * @param unit the time unit of the timeout argument
185 * @param threadFactory the factory to use when creating new threads
186 * @return threadPoolExecutor the cachedThreadPool with a bounded number
187 * as the maximum thread size in the pool.
189 public static ThreadPoolExecutor
getBoundedCachedThreadPool(
190 int maxCachedThread
, long timeout
, TimeUnit unit
,
191 ThreadFactory threadFactory
) {
192 ThreadPoolExecutor boundedCachedThreadPool
=
193 new ThreadPoolExecutor(maxCachedThread
, maxCachedThread
, timeout
,
194 unit
, new LinkedBlockingQueue
<>(), threadFactory
);
195 // allow the core pool threads timeout and terminate
196 boundedCachedThreadPool
.allowCoreThreadTimeOut(true);
197 return boundedCachedThreadPool
;
200 public static ThreadPoolExecutor
getBoundedCachedThreadPool(int maxCachedThread
, long timeout
,
201 TimeUnit unit
, String prefix
) {
202 return getBoundedCachedThreadPool(maxCachedThread
, timeout
, unit
,
203 newDaemonThreadFactory(prefix
));
207 * Returns a {@link java.util.concurrent.ThreadFactory} that names each created thread uniquely,
208 * with a common prefix.
209 * @param prefix The prefix of every created Thread's name
210 * @return a {@link java.util.concurrent.ThreadFactory} that names threads
212 private static ThreadFactory
getNamedThreadFactory(final String prefix
) {
213 SecurityManager s
= System
.getSecurityManager();
214 final ThreadGroup threadGroup
= (s
!= null) ? s
.getThreadGroup() : Thread
.currentThread()
217 return new ThreadFactory() {
218 final AtomicInteger threadNumber
= new AtomicInteger(1);
219 private final int poolNumber
= Threads
.poolNumber
.getAndIncrement();
220 final ThreadGroup group
= threadGroup
;
223 public Thread
newThread(Runnable r
) {
224 final String name
= prefix
+ "-pool" + poolNumber
+ "-t" + threadNumber
.getAndIncrement();
225 return new Thread(group
, r
, name
);
231 * Same as {#newDaemonThreadFactory(String, UncaughtExceptionHandler)},
232 * without setting the exception handler.
234 public static ThreadFactory
newDaemonThreadFactory(final String prefix
) {
235 return newDaemonThreadFactory(prefix
, null);
239 * Get a named {@link ThreadFactory} that just builds daemon threads.
240 * @param prefix name prefix for all threads created from the factory
241 * @param handler unhandles exception handler to set for all threads
242 * @return a thread factory that creates named, daemon threads with
243 * the supplied exception handler and normal priority
245 public static ThreadFactory
newDaemonThreadFactory(final String prefix
,
246 final UncaughtExceptionHandler handler
) {
247 final ThreadFactory namedFactory
= getNamedThreadFactory(prefix
);
248 return new ThreadFactory() {
250 public Thread
newThread(Runnable r
) {
251 Thread t
= namedFactory
.newThread(r
);
252 if (handler
!= null) {
253 t
.setUncaughtExceptionHandler(handler
);
255 t
.setUncaughtExceptionHandler(LOGGING_EXCEPTION_HANDLER
);
260 if (t
.getPriority() != Thread
.NORM_PRIORITY
) {
261 t
.setPriority(Thread
.NORM_PRIORITY
);
269 /** Sets an UncaughtExceptionHandler for the thread which logs the
270 * Exception stack if the thread dies.
272 public static void setLoggingUncaughtExceptionHandler(Thread t
) {
273 t
.setUncaughtExceptionHandler(LOGGING_EXCEPTION_HANDLER
);
276 private static interface PrintThreadInfoHelper
{
278 void printThreadInfo(PrintStream stream
, String title
);
282 private static class PrintThreadInfoLazyHolder
{
284 public static final PrintThreadInfoHelper HELPER
= initHelper();
286 private static PrintThreadInfoHelper
initHelper() {
287 Method method
= null;
289 // Hadoop 2.7+ declares printThreadInfo(PrintStream, String)
290 method
= ReflectionUtils
.class.getMethod("printThreadInfo", PrintStream
.class,
292 method
.setAccessible(true);
293 final Method hadoop27Method
= method
;
294 return new PrintThreadInfoHelper() {
297 public void printThreadInfo(PrintStream stream
, String title
) {
299 hadoop27Method
.invoke(null, stream
, title
);
300 } catch (IllegalAccessException
| IllegalArgumentException e
) {
301 throw new RuntimeException(e
);
302 } catch (InvocationTargetException e
) {
303 throw new RuntimeException(e
.getCause());
307 } catch (NoSuchMethodException e
) {
309 "Can not find hadoop 2.7+ printThreadInfo method, try hadoop hadoop 2.6 and earlier", e
);
312 // Hadoop 2.6 and earlier declares printThreadInfo(PrintWriter, String)
313 method
= ReflectionUtils
.class.getMethod("printThreadInfo", PrintWriter
.class,
315 method
.setAccessible(true);
316 final Method hadoop26Method
= method
;
317 return new PrintThreadInfoHelper() {
320 public void printThreadInfo(PrintStream stream
, String title
) {
322 hadoop26Method
.invoke(null, new PrintWriter(
323 new OutputStreamWriter(stream
, StandardCharsets
.UTF_8
)), title
);
324 } catch (IllegalAccessException
| IllegalArgumentException e
) {
325 throw new RuntimeException(e
);
326 } catch (InvocationTargetException e
) {
327 throw new RuntimeException(e
.getCause());
331 } catch (NoSuchMethodException e
) {
332 LOG
.warn("Cannot find printThreadInfo method. Check hadoop jars linked", e
);
339 * Print all of the thread's information and stack traces. Wrapper around Hadoop's method.
341 * @param stream the stream to
342 * @param title a string title for the stack trace
344 public static void printThreadInfo(PrintStream stream
, String title
) {
345 Preconditions
.checkNotNull(PrintThreadInfoLazyHolder
.HELPER
,
346 "Cannot find method. Check hadoop jars linked").printThreadInfo(stream
, title
);