[RISCV] Fix mgather -> riscv.masked.strided.load combine not extending indices (...
[llvm-project.git] / llvm / lib / Support / Parallel.cpp
blob9b14b05b52116034dd9a1937f59329f403e42322
1 //===- llvm/Support/Parallel.cpp - Parallel algorithms --------------------===//
2 //
3 // Part of the LLVM Project, under the Apache License v2.0 with LLVM Exceptions.
4 // See https://llvm.org/LICENSE.txt for license information.
5 // SPDX-License-Identifier: Apache-2.0 WITH LLVM-exception
6 //
7 //===----------------------------------------------------------------------===//
9 #include "llvm/Support/Parallel.h"
10 #include "llvm/Config/llvm-config.h"
11 #include "llvm/Support/ManagedStatic.h"
12 #include "llvm/Support/Threading.h"
14 #include <atomic>
15 #include <deque>
16 #include <future>
17 #include <thread>
18 #include <vector>
20 llvm::ThreadPoolStrategy llvm::parallel::strategy;
22 namespace llvm {
23 namespace parallel {
24 #if LLVM_ENABLE_THREADS
26 #ifdef _WIN32
27 static thread_local unsigned threadIndex = UINT_MAX;
29 unsigned getThreadIndex() { GET_THREAD_INDEX_IMPL; }
30 #else
31 thread_local unsigned threadIndex = UINT_MAX;
32 #endif
34 namespace detail {
36 namespace {
38 /// An abstract class that takes closures and runs them asynchronously.
39 class Executor {
40 public:
41 virtual ~Executor() = default;
42 virtual void add(std::function<void()> func, bool Sequential = false) = 0;
43 virtual size_t getThreadCount() const = 0;
45 static Executor *getDefaultExecutor();
48 /// An implementation of an Executor that runs closures on a thread pool
49 /// in filo order.
50 class ThreadPoolExecutor : public Executor {
51 public:
52 explicit ThreadPoolExecutor(ThreadPoolStrategy S = hardware_concurrency()) {
53 ThreadCount = S.compute_thread_count();
54 // Spawn all but one of the threads in another thread as spawning threads
55 // can take a while.
56 Threads.reserve(ThreadCount);
57 Threads.resize(1);
58 std::lock_guard<std::mutex> Lock(Mutex);
59 // Use operator[] before creating the thread to avoid data race in .size()
60 // in “safe libc++” mode.
61 auto &Thread0 = Threads[0];
62 Thread0 = std::thread([this, S] {
63 for (unsigned I = 1; I < ThreadCount; ++I) {
64 Threads.emplace_back([=] { work(S, I); });
65 if (Stop)
66 break;
68 ThreadsCreated.set_value();
69 work(S, 0);
70 });
73 void stop() {
75 std::lock_guard<std::mutex> Lock(Mutex);
76 if (Stop)
77 return;
78 Stop = true;
80 Cond.notify_all();
81 ThreadsCreated.get_future().wait();
84 ~ThreadPoolExecutor() override {
85 stop();
86 std::thread::id CurrentThreadId = std::this_thread::get_id();
87 for (std::thread &T : Threads)
88 if (T.get_id() == CurrentThreadId)
89 T.detach();
90 else
91 T.join();
94 struct Creator {
95 static void *call() { return new ThreadPoolExecutor(strategy); }
97 struct Deleter {
98 static void call(void *Ptr) { ((ThreadPoolExecutor *)Ptr)->stop(); }
101 void add(std::function<void()> F, bool Sequential = false) override {
103 std::lock_guard<std::mutex> Lock(Mutex);
104 if (Sequential)
105 WorkQueueSequential.emplace_front(std::move(F));
106 else
107 WorkQueue.emplace_back(std::move(F));
109 Cond.notify_one();
112 size_t getThreadCount() const override { return ThreadCount; }
114 private:
115 bool hasSequentialTasks() const {
116 return !WorkQueueSequential.empty() && !SequentialQueueIsLocked;
119 bool hasGeneralTasks() const { return !WorkQueue.empty(); }
121 void work(ThreadPoolStrategy S, unsigned ThreadID) {
122 threadIndex = ThreadID;
123 S.apply_thread_strategy(ThreadID);
124 while (true) {
125 std::unique_lock<std::mutex> Lock(Mutex);
126 Cond.wait(Lock, [&] {
127 return Stop || hasGeneralTasks() || hasSequentialTasks();
129 if (Stop)
130 break;
131 bool Sequential = hasSequentialTasks();
132 if (Sequential)
133 SequentialQueueIsLocked = true;
134 else
135 assert(hasGeneralTasks());
137 auto &Queue = Sequential ? WorkQueueSequential : WorkQueue;
138 auto Task = std::move(Queue.back());
139 Queue.pop_back();
140 Lock.unlock();
141 Task();
142 if (Sequential)
143 SequentialQueueIsLocked = false;
147 std::atomic<bool> Stop{false};
148 std::atomic<bool> SequentialQueueIsLocked{false};
149 std::deque<std::function<void()>> WorkQueue;
150 std::deque<std::function<void()>> WorkQueueSequential;
151 std::mutex Mutex;
152 std::condition_variable Cond;
153 std::promise<void> ThreadsCreated;
154 std::vector<std::thread> Threads;
155 unsigned ThreadCount;
158 Executor *Executor::getDefaultExecutor() {
159 // The ManagedStatic enables the ThreadPoolExecutor to be stopped via
160 // llvm_shutdown() which allows a "clean" fast exit, e.g. via _exit(). This
161 // stops the thread pool and waits for any worker thread creation to complete
162 // but does not wait for the threads to finish. The wait for worker thread
163 // creation to complete is important as it prevents intermittent crashes on
164 // Windows due to a race condition between thread creation and process exit.
166 // The ThreadPoolExecutor will only be destroyed when the static unique_ptr to
167 // it is destroyed, i.e. in a normal full exit. The ThreadPoolExecutor
168 // destructor ensures it has been stopped and waits for worker threads to
169 // finish. The wait is important as it prevents intermittent crashes on
170 // Windows when the process is doing a full exit.
172 // The Windows crashes appear to only occur with the MSVC static runtimes and
173 // are more frequent with the debug static runtime.
175 // This also prevents intermittent deadlocks on exit with the MinGW runtime.
177 static ManagedStatic<ThreadPoolExecutor, ThreadPoolExecutor::Creator,
178 ThreadPoolExecutor::Deleter>
179 ManagedExec;
180 static std::unique_ptr<ThreadPoolExecutor> Exec(&(*ManagedExec));
181 return Exec.get();
183 } // namespace
184 } // namespace detail
186 size_t getThreadCount() {
187 return detail::Executor::getDefaultExecutor()->getThreadCount();
189 #endif
191 // Latch::sync() called by the dtor may cause one thread to block. If is a dead
192 // lock if all threads in the default executor are blocked. To prevent the dead
193 // lock, only allow the root TaskGroup to run tasks parallelly. In the scenario
194 // of nested parallel_for_each(), only the outermost one runs parallelly.
195 TaskGroup::TaskGroup()
196 #if LLVM_ENABLE_THREADS
197 : Parallel((parallel::strategy.ThreadsRequested != 1) &&
198 (threadIndex == UINT_MAX)) {}
199 #else
200 : Parallel(false) {}
201 #endif
202 TaskGroup::~TaskGroup() {
203 // We must ensure that all the workloads have finished before decrementing the
204 // instances count.
205 L.sync();
208 void TaskGroup::spawn(std::function<void()> F, bool Sequential) {
209 #if LLVM_ENABLE_THREADS
210 if (Parallel) {
211 L.inc();
212 detail::Executor::getDefaultExecutor()->add(
213 [&, F = std::move(F)] {
214 F();
215 L.dec();
217 Sequential);
218 return;
220 #endif
221 F();
224 } // namespace parallel
225 } // namespace llvm
227 void llvm::parallelFor(size_t Begin, size_t End,
228 llvm::function_ref<void(size_t)> Fn) {
229 #if LLVM_ENABLE_THREADS
230 if (parallel::strategy.ThreadsRequested != 1) {
231 auto NumItems = End - Begin;
232 // Limit the number of tasks to MaxTasksPerGroup to limit job scheduling
233 // overhead on large inputs.
234 auto TaskSize = NumItems / parallel::detail::MaxTasksPerGroup;
235 if (TaskSize == 0)
236 TaskSize = 1;
238 parallel::TaskGroup TG;
239 for (; Begin + TaskSize < End; Begin += TaskSize) {
240 TG.spawn([=, &Fn] {
241 for (size_t I = Begin, E = Begin + TaskSize; I != E; ++I)
242 Fn(I);
245 if (Begin != End) {
246 TG.spawn([=, &Fn] {
247 for (size_t I = Begin; I != End; ++I)
248 Fn(I);
251 return;
253 #endif
255 for (; Begin != End; ++Begin)
256 Fn(Begin);