1 //===- llvm/Support/Parallel.cpp - Parallel algorithms --------------------===//
3 // Part of the LLVM Project, under the Apache License v2.0 with LLVM Exceptions.
4 // See https://llvm.org/LICENSE.txt for license information.
5 // SPDX-License-Identifier: Apache-2.0 WITH LLVM-exception
7 //===----------------------------------------------------------------------===//
9 #include "llvm/Support/Parallel.h"
10 #include "llvm/Config/llvm-config.h"
11 #include "llvm/Support/ManagedStatic.h"
12 #include "llvm/Support/Threading.h"
// Definition of the global thread-pool strategy object
// llvm::parallel::strategy. Within this file it is passed to the
// ThreadPoolExecutor constructor, and its ThreadsRequested member is compared
// against 1 to decide whether work may be run in parallel.
19 llvm::ThreadPoolStrategy
llvm::parallel::strategy
;
23 #if LLVM_ENABLE_THREADS
// Index of the current thread; thread_local, so every thread has its own copy,
// starting at UINT_MAX. ThreadPoolExecutor::work() overwrites it with the
// worker's ThreadID, and the TaskGroup constructor tests it against UINT_MAX.
// NOTE(review): a second, non-static definition of threadIndex appears a few
// lines below; the preprocessor conditions that choose between the two are not
// visible in this section.
26 static thread_local
unsigned threadIndex
= UINT_MAX
;
// Accessor for the current thread's index. The body consists solely of the
// GET_THREAD_INDEX_IMPL macro, which is defined outside this section.
// NOTE(review): presumably the macro returns threadIndex — confirm against the
// header that defines it.
28 unsigned getThreadIndex() { GET_THREAD_INDEX_IMPL
; }
// Non-static variant of the per-thread index, likewise starting at UINT_MAX.
// NOTE(review): this is an alternative to the static definition above; the
// preprocessor condition selecting it is not visible in this section.
30 thread_local
unsigned threadIndex
= UINT_MAX
;
37 /// An abstract class that takes closures and runs them asynchronously.
/// NOTE(review): the class header itself is not visible in this section; the
/// declarations below are its members.
/// Virtual destructor so that implementations can be destroyed through an
/// Executor pointer.
40 virtual ~Executor() = default;
/// Hands \p func to the executor to be run. ThreadPoolExecutor's override
/// stores it on its WorkStack.
41 virtual void add(std::function
<void()> func
) = 0;
/// Thread count of the executor. ThreadPoolExecutor's override returns its
/// ThreadCount member.
42 virtual size_t getThreadCount() const = 0;
/// Accessor for the default executor; the definition later in this file keeps
/// the executor in a function-local static.
44 static Executor
*getDefaultExecutor();
47 /// An implementation of an Executor that runs closures on a thread pool
49 class ThreadPoolExecutor
: public Executor
{
/// Computes the thread count from \p S and starts the pool.
///
/// The constructor itself creates only one std::thread, stored in Threads[0].
/// That thread creates the remaining workers (I = 1 .. ThreadCount-1), each
/// running work(S, I), and fulfils the ThreadsCreated promise.
/// NOTE(review): several statements of this body are not visible in this
/// section (for example whatever sizes Threads before Threads[0] is used, and
/// what Thread0 does after set_value()); confirm against the full file.
51 explicit ThreadPoolExecutor(ThreadPoolStrategy S
) {
52 ThreadCount
= S
.compute_thread_count();
53 // Spawn all but one of the threads from another thread (Thread0 below)
// rather than from the thread running this constructor.
55 Threads
.reserve(ThreadCount
);
// Mutex is held from here to the end of the constructor's scope.
57 std::lock_guard
<std::mutex
> Lock(Mutex
);
58 // Use operator[] before creating the thread to avoid data race in .size()
59 // in 'safe libc++' mode.
60 auto &Thread0
= Threads
[0];
// The lambda copies S and captures `this`; it runs on the new thread.
61 Thread0
= std::thread([this, S
] {
62 for (unsigned I
= 1; I
< ThreadCount
; ++I
) {
// [=] copies S and I into each worker's closure.
63 Threads
.emplace_back([=] { work(S
, I
); });
// Signal anyone waiting on ThreadsCreated's future.
67 ThreadsCreated
.set_value();
// NOTE(review): the header of the function these statements belong to is not
// visible in this section; presumably this is the body of stop(), which
// call(void *Ptr) below invokes — confirm against the full file.
// Take Mutex (the statements executed under it are not visible here).
74 std::lock_guard
<std::mutex
> Lock(Mutex
);
// Block until ThreadsCreated.set_value() has been called by the thread that
// spawns the workers (see the constructor).
80 ThreadsCreated
.get_future().wait();
/// Destructor. Walks every std::thread in Threads and distinguishes the entry
/// whose id equals the id of the thread running the destructor from all
/// others.
/// NOTE(review): the statements executed for the matching and non-matching
/// cases, and anything preceding the loop, are not visible in this section.
83 ~ThreadPoolExecutor() override
{
// Id of the thread executing this destructor.
85 std::thread::id CurrentThreadId
= std::this_thread::get_id();
86 for (std::thread
&T
: Threads
)
87 if (T
.get_id() == CurrentThreadId
)
/// Heap-allocates a ThreadPoolExecutor configured with the global `strategy`
/// and returns it as an untyped pointer.
/// NOTE(review): presumably a member of ThreadPoolExecutor::Creator, the
/// creator type passed to ManagedStatic in getDefaultExecutor() — the
/// enclosing struct is not visible in this section.
94 static void *call() { return new ThreadPoolExecutor(strategy
); }
/// Casts \p Ptr to ThreadPoolExecutor* and calls stop() on it. The object is
/// not deleted here.
/// NOTE(review): presumably a member of ThreadPoolExecutor::Deleter, the
/// deleter type passed to ManagedStatic in getDefaultExecutor() — the
/// enclosing struct is not visible in this section.
97 static void call(void *Ptr
) { ((ThreadPoolExecutor
*)Ptr
)->stop(); }
/// Queues \p F for execution: with Mutex held, moves it onto the back of
/// WorkStack. work() also takes tasks from the back, so the most recently
/// added task is the first one taken.
/// NOTE(review): whatever follows the push (e.g. waking a waiting worker) is
/// not visible in this section.
100 void add(std::function
<void()> F
) override
{
102 std::lock_guard
<std::mutex
> Lock(Mutex
);
103 WorkStack
.push_back(std::move(F
));
/// Returns ThreadCount, the value obtained from S.compute_thread_count() in
/// the constructor.
108 size_t getThreadCount() const override
{ return ThreadCount
; }
/// Worker routine run by the pool's threads.
///
/// Records \p ThreadID in the thread-local threadIndex, applies the strategy
/// \p S for this thread, then waits on Cond until Stop is set or WorkStack is
/// non-empty and removes the task at the back of WorkStack.
/// NOTE(review): any enclosing loop, the handling of Stop after waking, and
/// the invocation of the removed task are not visible in this section.
111 void work(ThreadPoolStrategy S
, unsigned ThreadID
) {
// Mark this thread as a pool worker; the TaskGroup constructor checks
// threadIndex against UINT_MAX.
112 threadIndex
= ThreadID
;
113 S
.apply_thread_strategy(ThreadID
);
115 std::unique_lock
<std::mutex
> Lock(Mutex
);
// Sleep until there is either a stop request or a task to take.
116 Cond
.wait(Lock
, [&] { return Stop
|| !WorkStack
.empty(); });
// Move the most recently pushed task out of the container, then drop its slot.
119 auto Task
= std::move(WorkStack
.back());
120 WorkStack
.pop_back();
// Stop flag, initially false; part of the predicate workers wait on in work().
126 std::atomic
<bool> Stop
{false};
// Pending tasks. add() pushes to the back and work() pops from the back.
127 std::vector
<std::function
<void()>> WorkStack
;
// Condition variable that workers block on in work(), paired with Mutex.
129 std::condition_variable Cond
;
// Fulfilled (set_value) by the thread that spawns the workers; its future is
// waited on elsewhere in this class.
130 std::promise
<void> ThreadsCreated
;
// Thread objects of the pool; element 0 is the thread that spawns the others.
131 std::vector
<std::thread
> Threads
;
// Number of threads, assigned from compute_thread_count() in the constructor.
132 unsigned ThreadCount
;
// Provides the default executor, held in a function-local static named Exec.
// Two alternative definitions of Exec are visible below: a std::unique_ptr
// wrapping the ThreadPoolExecutor created through a ManagedStatic (using
// ThreadPoolExecutor::Creator / ::Deleter), and a plain ThreadPoolExecutor
// constructed from `strategy`.
// NOTE(review): the preprocessor conditionals selecting between the two, the
// name of the ManagedStatic variable's declaration, and the return statements
// are not visible in this section. The comments suggest the ManagedStatic
// variant is the Windows one — confirm against the full file.
135 Executor
*Executor::getDefaultExecutor() {
137 // The ManagedStatic enables the ThreadPoolExecutor to be stopped via
138 // llvm_shutdown() which allows a "clean" fast exit, e.g. via _exit(). This
139 // stops the thread pool and waits for any worker thread creation to complete
140 // but does not wait for the threads to finish. The wait for worker thread
141 // creation to complete is important as it prevents intermittent crashes on
142 // Windows due to a race condition between thread creation and process exit.
144 // The ThreadPoolExecutor will only be destroyed when the static unique_ptr to
145 // it is destroyed, i.e. in a normal full exit. The ThreadPoolExecutor
146 // destructor ensures it has been stopped and waits for worker threads to
147 // finish. The wait is important as it prevents intermittent crashes on
148 // Windows when the process is doing a full exit.
150 // The Windows crashes appear to only occur with the MSVC static runtimes and
151 // are more frequent with the debug static runtime.
153 // This also prevents intermittent deadlocks on exit with the MinGW runtime.
155 static ManagedStatic
<ThreadPoolExecutor
, ThreadPoolExecutor::Creator
,
156 ThreadPoolExecutor::Deleter
>
// Exec takes the address of the object owned by the ManagedStatic.
158 static std::unique_ptr
<ThreadPoolExecutor
> Exec(&(*ManagedExec
));
161 // ManagedStatic is not desired on other platforms. When `Exec` is destroyed
162 // by llvm_shutdown(), worker threads will clean up and invoke TLS
163 // destructors. This can lead to race conditions if other threads attempt to
164 // access TLS objects that have already been destroyed.
165 static ThreadPoolExecutor
Exec(strategy
);
170 } // namespace detail
// Returns the thread count reported by the default executor.
172 size_t getThreadCount() {
173 return detail::Executor::getDefaultExecutor()->getThreadCount();
177 // Latch::sync() called by the dtor may cause one thread to block. It is a
178 // deadlock if all threads in the default executor are blocked. To prevent
179 // that deadlock, only allow the root TaskGroup to run tasks in parallel. In the
180 // scenario of nested parallel_for_each(), only the outermost one runs in
// parallel.
//
// With threads enabled, Parallel is set only when both hold:
//   - parallel::strategy.ThreadsRequested is not 1, and
//   - the constructing thread's threadIndex is still UINT_MAX, i.e. it has not
//     been given an index by ThreadPoolExecutor::work().
// NOTE(review): the initializer used when LLVM_ENABLE_THREADS is off is not
// visible in this section.
181 TaskGroup::TaskGroup()
182 #if LLVM_ENABLE_THREADS
183 : Parallel((parallel::strategy
.ThreadsRequested
!= 1) &&
184 (threadIndex
== UINT_MAX
)) {}
// Destructor. The comment ahead of the constructor states that the destructor
// calls Latch::sync(), which may block the calling thread.
// NOTE(review): the body, and the end of the comment below, are not visible in
// this section.
188 TaskGroup::~TaskGroup() {
189 // We must ensure that all the workloads have finished before decrementing the
// Schedules F. With threads enabled, the visible path hands the default
// executor a lambda that takes ownership of F by move and captures everything
// else by reference.
// NOTE(review): the condition guarding this call, the lambda's body, and the
// non-threaded path are not visible in this section.
194 void TaskGroup::spawn(std::function
<void()> F
) {
195 #if LLVM_ENABLE_THREADS
198 detail::Executor::getDefaultExecutor()->add([&, F
= std::move(F
)] {
208 } // namespace parallel
// Iterates over the index range [Begin, End) on behalf of Fn.
//
// With threads enabled and ThreadsRequested != 1, the range is cut into chunks
// of TaskSize = NumItems / MaxTasksPerGroup indices. Full chunks are walked
// while Begin + TaskSize < End, and the remaining [Begin, End) is walked by a
// separate loop. The final loop walks Begin up to End one index at a time.
// NOTE(review): the loop bodies (presumably calls to Fn(I), with the chunk
// loops presumably spawned on TG), any adjustment of TaskSize between its
// computation and use, and the #endif are not visible in this section —
// confirm against the full file.
211 void llvm::parallelFor(size_t Begin
, size_t End
,
212 llvm::function_ref
<void(size_t)> Fn
) {
213 #if LLVM_ENABLE_THREADS
214 if (parallel::strategy
.ThreadsRequested
!= 1) {
215 auto NumItems
= End
- Begin
;
216 // Limit the number of tasks to MaxTasksPerGroup to limit job scheduling
217 // overhead on large inputs.
218 auto TaskSize
= NumItems
/ parallel::detail::MaxTasksPerGroup
;
222 parallel::TaskGroup TG
;
// One iteration per full chunk; Begin advances by TaskSize each time.
223 for (; Begin
+ TaskSize
< End
; Begin
+= TaskSize
) {
// Indices of the current chunk: [Begin, Begin + TaskSize).
225 for (size_t I
= Begin
, E
= Begin
+ TaskSize
; I
!= E
; ++I
)
// Indices left over after the last full chunk: [Begin, End).
231 for (size_t I
= Begin
; I
!= End
; ++I
)
// Plain index-by-index walk of [Begin, End).
239 for (; Begin
!= End
; ++Begin
)