//===- llvm/Support/Parallel.cpp - Parallel algorithms --------------------===//
//
// Part of the LLVM Project, under the Apache License v2.0 with LLVM Exceptions.
// See https://llvm.org/LICENSE.txt for license information.
// SPDX-License-Identifier: Apache-2.0 WITH LLVM-exception
//
//===----------------------------------------------------------------------===//

#include "llvm/Support/Parallel.h"
#include "llvm/Config/llvm-config.h"
#include "llvm/Support/ManagedStatic.h"
#include "llvm/Support/Threading.h"

#include <atomic>
#include <cassert>
#include <condition_variable>
#include <deque>
#include <future>
#include <mutex>
#include <thread>
#include <vector>

llvm::ThreadPoolStrategy llvm::parallel::strategy;

namespace llvm {
namespace parallel {
#if LLVM_ENABLE_THREADS
#ifdef _WIN32
static thread_local unsigned threadIndex = UINT_MAX;

unsigned getThreadIndex() { GET_THREAD_INDEX_IMPL; }
#else
thread_local unsigned threadIndex = UINT_MAX;
#endif

namespace detail {
namespace {
/// An abstract class that takes closures and runs them asynchronously.
class Executor {
public:
  virtual ~Executor() = default;
  virtual void add(std::function<void()> func, bool Sequential = false) = 0;
  virtual size_t getThreadCount() const = 0;

  static Executor *getDefaultExecutor();
};
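// Usage sketch (illustrative only, not code from this file): clients obtain
// the process-wide executor and hand it closures, as TaskGroup::spawn() does
// further below; the lambda bodies here are hypothetical placeholders.
//
//   Executor::getDefaultExecutor()->add([] { /* independent work */ });
//   // With the thread-pool implementation below, Sequential tasks run one at
//   // a time in submission order:
//   Executor::getDefaultExecutor()->add([] { /* ordered work */ },
//                                       /*Sequential=*/true);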
/// An implementation of an Executor that runs closures on a thread pool
///   in filo order.
class ThreadPoolExecutor : public Executor {
public:
  explicit ThreadPoolExecutor(ThreadPoolStrategy S = hardware_concurrency()) {
    ThreadCount = S.compute_thread_count();
    // Spawn all but one of the threads in another thread as spawning threads
    // can take a while.
    Threads.reserve(ThreadCount);
    Threads.resize(1);
    std::lock_guard<std::mutex> Lock(Mutex);
    // Use operator[] before creating the thread to avoid data race in .size()
    // in 'safe libc++' mode.
    auto &Thread0 = Threads[0];
    Thread0 = std::thread([this, S] {
      for (unsigned I = 1; I < ThreadCount; ++I) {
        Threads.emplace_back([=] { work(S, I); });
        if (Stop)
          break;
      }
      ThreadsCreated.set_value();
      work(S, 0);
    });
  }

  void stop() {
    {
      std::lock_guard<std::mutex> Lock(Mutex);
      if (Stop)
        return;
      Stop = true;
    }
    Cond.notify_all();
    ThreadsCreated.get_future().wait();
  }
  ~ThreadPoolExecutor() override {
    stop();
    std::thread::id CurrentThreadId = std::this_thread::get_id();
    for (std::thread &T : Threads)
      if (T.get_id() == CurrentThreadId)
        T.detach();
      else
        T.join();
  }

  struct Creator {
    static void *call() { return new ThreadPoolExecutor(strategy); }
  };
  struct Deleter {
    static void call(void *Ptr) { ((ThreadPoolExecutor *)Ptr)->stop(); }
  };
  void add(std::function<void()> F, bool Sequential = false) override {
    {
      std::lock_guard<std::mutex> Lock(Mutex);
      if (Sequential)
        WorkQueueSequential.emplace_front(std::move(F));
      else
        WorkQueue.emplace_back(std::move(F));
    }
    Cond.notify_one();
  }

  size_t getThreadCount() const override { return ThreadCount; }
private:
  bool hasSequentialTasks() const {
    return !WorkQueueSequential.empty() && !SequentialQueueIsLocked;
  }

  bool hasGeneralTasks() const { return !WorkQueue.empty(); }

  void work(ThreadPoolStrategy S, unsigned ThreadID) {
    threadIndex = ThreadID;
    S.apply_thread_strategy(ThreadID);
    while (true) {
      std::unique_lock<std::mutex> Lock(Mutex);
      Cond.wait(Lock, [&] {
        return Stop || hasGeneralTasks() || hasSequentialTasks();
      });
      if (Stop)
        break;
      bool Sequential = hasSequentialTasks();
      if (Sequential)
        SequentialQueueIsLocked = true;
      else
        assert(hasGeneralTasks());

      auto &Queue = Sequential ? WorkQueueSequential : WorkQueue;
      auto Task = std::move(Queue.back());
      Queue.pop_back();
      Lock.unlock();
      Task();
      if (Sequential)
        SequentialQueueIsLocked = false;
    }
  }
  std::atomic<bool> Stop{false};
  std::atomic<bool> SequentialQueueIsLocked{false};
  std::deque<std::function<void()>> WorkQueue;
  std::deque<std::function<void()>> WorkQueueSequential;
  std::mutex Mutex;
  std::condition_variable Cond;
  std::promise<void> ThreadsCreated;
  std::vector<std::thread> Threads;
  unsigned ThreadCount;
};
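// Illustrative note (derived from add() and work() above, not additional API):
// general tasks are pushed with emplace_back() and popped from back(), so
// adding A, B, C starts them as C, B, A (filo). Sequential tasks are pushed
// with emplace_front() and also popped from back(), and SequentialQueueIsLocked
// keeps at most one of them in flight, so A, B, C run one after another in
// submission order (fifo).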
Executor *Executor::getDefaultExecutor() {
  // The ManagedStatic enables the ThreadPoolExecutor to be stopped via
  // llvm_shutdown() which allows a "clean" fast exit, e.g. via _exit(). This
  // stops the thread pool and waits for any worker thread creation to complete
  // but does not wait for the threads to finish. The wait for worker thread
  // creation to complete is important as it prevents intermittent crashes on
  // Windows due to a race condition between thread creation and process exit.
  //
  // The ThreadPoolExecutor will only be destroyed when the static unique_ptr to
  // it is destroyed, i.e. in a normal full exit. The ThreadPoolExecutor
  // destructor ensures it has been stopped and waits for worker threads to
  // finish. The wait is important as it prevents intermittent crashes on
  // Windows when the process is doing a full exit.
  //
  // The Windows crashes appear to only occur with the MSVC static runtimes and
  // are more frequent with the debug static runtime.
  //
  // This also prevents intermittent deadlocks on exit with the MinGW runtime.
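  //
  // Illustrative lifetime sketch of the arrangement just described (no extra
  // API; the steps paraphrase the comment above and the Creator/Deleter
  // structs):
  //
  //   llvm_shutdown();  // ManagedStatic invokes ThreadPoolExecutor::Deleter,
  //                     // i.e. stop(): set Stop, wake all workers, and wait
  //                     // only for worker-thread creation, so _exit() is safe.
  //   // On a normal full exit the static unique_ptr below is destroyed
  //   // instead, running ~ThreadPoolExecutor(), which stop()s (a no-op if
  //   // already stopped) and then join()s or detach()es every worker thread.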
  static ManagedStatic<ThreadPoolExecutor, ThreadPoolExecutor::Creator,
                       ThreadPoolExecutor::Deleter>
      ManagedExec;
  static std::unique_ptr<ThreadPoolExecutor> Exec(&(*ManagedExec));
  return Exec.get();
}
} // namespace
} // namespace detail
size_t getThreadCount() {
  return detail::Executor::getDefaultExecutor()->getThreadCount();
}
#endif
// Latch::sync() called by the dtor may cause one thread to block. It is a
// deadlock if all threads in the default executor are blocked. To prevent the
// deadlock, only allow the root TaskGroup to run tasks in parallel. In the
// scenario of nested parallel_for_each(), only the outermost one runs in
// parallel.
TaskGroup::TaskGroup()
#if LLVM_ENABLE_THREADS
    : Parallel((parallel::strategy.ThreadsRequested != 1) &&
               (threadIndex == UINT_MAX)) {}
#else
    : Parallel(false) {}
#endif
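// Illustrative consequence of the Parallel flag above (a sketch, not code from
// this file; `Vec`, `Outer.Items` and `process()` are made-up names): only the
// outermost level fans out to the thread pool.
//
//   parallelForEach(Vec, [](auto &Outer) {        // outer TaskGroup: Parallel
//     parallelForEach(Outer.Items, [](auto &I) {  // runs on a worker thread,
//       process(I);                               // so threadIndex != UINT_MAX
//     });                                         // and the inner TaskGroup
//   });                                           // executes inline, serially.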
TaskGroup::~TaskGroup() {
  // We must ensure that all the workloads have finished before decrementing the
  // instances count.
  L.sync();
}
void TaskGroup::spawn(std::function<void()> F, bool Sequential) {
#if LLVM_ENABLE_THREADS
  if (Parallel) {
    L.inc();
    detail::Executor::getDefaultExecutor()->add(
        [&, F = std::move(F)] {
          F();
          L.dec();
        },
        Sequential);
    return;
  }
#endif
  F();
}
} // namespace parallel
} // namespace llvm
void llvm::parallelFor(size_t Begin, size_t End,
                       llvm::function_ref<void(size_t)> Fn) {
#if LLVM_ENABLE_THREADS
  if (parallel::strategy.ThreadsRequested != 1) {
    auto NumItems = End - Begin;
    // Limit the number of tasks to MaxTasksPerGroup to limit job scheduling
    // overhead on large inputs.
    auto TaskSize = NumItems / parallel::detail::MaxTasksPerGroup;
    if (TaskSize == 0)
      TaskSize = 1;
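    // Worked example (illustrative numbers only; the actual MaxTasksPerGroup
    // value lives in Parallel.h): if MaxTasksPerGroup were 128 and NumItems
    // were 1 << 20, TaskSize would be 8192, so the loop below spawns roughly
    // 128 tasks of 8192 items each, with the tail handled by the final spawn.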

    parallel::TaskGroup TG;
    for (; Begin + TaskSize < End; Begin += TaskSize) {
      TG.spawn([=, &Fn] {
        for (size_t I = Begin, E = Begin + TaskSize; I != E; ++I)
          Fn(I);
      });
    }
    if (Begin != End) {
      TG.spawn([=, &Fn] {
        for (size_t I = Begin; I != End; ++I)
          Fn(I);
      });
    }
    return;
  }
#endif

  for (; Begin != End; ++Begin)
    Fn(Begin);
}
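// Usage sketch (illustrative; `V` is a hypothetical caller-side vector):
//
//   std::vector<int> V(1024, 1);
//   llvm::parallelFor(0, V.size(), [&](size_t I) { V[I] *= 2; });
//
// With parallel::strategy.ThreadsRequested == 1, or without
// LLVM_ENABLE_THREADS, this degenerates to the plain serial loop above.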