[docs] Fix build-docs.sh
[llvm-project.git] / llvm / unittests / Support / ThreadPool.cpp
blob fd9d7272e7e0b6bef4cdd37cc530d6137dd0d4f4
1 //========- unittests/Support/ThreadPools.cpp - ThreadPools.h tests --========//
2 //
3 // Part of the LLVM Project, under the Apache License v2.0 with LLVM Exceptions.
4 // See https://llvm.org/LICENSE.txt for license information.
5 // SPDX-License-Identifier: Apache-2.0 WITH LLVM-exception
6 //
7 //===----------------------------------------------------------------------===//
#include "llvm/Support/ThreadPool.h"

#include "llvm/ADT/STLExtras.h"
#include "llvm/ADT/SetVector.h"
#include "llvm/ADT/SmallVector.h"
#include "llvm/ADT/Triple.h"
#include "llvm/Support/CommandLine.h"
#include "llvm/Support/Host.h"
#include "llvm/Support/Program.h"
#include "llvm/Support/TargetSelect.h"
#include "llvm/Support/Threading.h"

#include <chrono>
#include <cstdio>
#include <cstdlib>
#include <thread>

#include "gtest/gtest.h"
26 using namespace llvm;
28 // Fixture for the unittests, allowing to *temporarily* disable the unittests
29 // on a particular platform
30 class ThreadPoolTest : public testing::Test {
31 Triple Host;
32 SmallVector<Triple::ArchType, 4> UnsupportedArchs;
33 SmallVector<Triple::OSType, 4> UnsupportedOSs;
34 SmallVector<Triple::EnvironmentType, 1> UnsupportedEnvironments;
36 protected:
37 // This is intended for platform as a temporary "XFAIL"
38 bool isUnsupportedOSOrEnvironment() {
39 Triple Host(Triple::normalize(sys::getProcessTriple()));
41 if (find(UnsupportedEnvironments, Host.getEnvironment()) !=
42 UnsupportedEnvironments.end())
43 return true;
45 if (is_contained(UnsupportedOSs, Host.getOS()))
46 return true;
48 if (is_contained(UnsupportedArchs, Host.getArch()))
49 return true;
51 return false;
54 ThreadPoolTest() {
55 // Add unsupported configuration here, example:
56 // UnsupportedArchs.push_back(Triple::x86_64);
58 // See https://llvm.org/bugs/show_bug.cgi?id=25829
59 UnsupportedArchs.push_back(Triple::ppc64le);
60 UnsupportedArchs.push_back(Triple::ppc64);
63 /// Make sure this thread not progress faster than the main thread.
64 void waitForMainThread() { waitForPhase(1); }
66 /// Set the readiness of the main thread.
67 void setMainThreadReady() { setPhase(1); }
69 /// Wait until given phase is set using setPhase(); first "main" phase is 1.
70 /// See also PhaseResetHelper below.
71 void waitForPhase(int Phase) {
72 std::unique_lock<std::mutex> LockGuard(CurrentPhaseMutex);
73 CurrentPhaseCondition.wait(
74 LockGuard, [&] { return CurrentPhase == Phase || CurrentPhase < 0; });
76 /// If a thread waits on another phase, the test could bail out on a failed
77 /// assertion and ThreadPool destructor would wait() on all threads, which
78 /// would deadlock on the task waiting. Create this helper to automatically
79 /// reset the phase and unblock such threads.
80 struct PhaseResetHelper {
81 PhaseResetHelper(ThreadPoolTest *test) : test(test) {}
82 ~PhaseResetHelper() { test->setPhase(-1); }
83 ThreadPoolTest *test;
86 /// Advance to the given phase.
87 void setPhase(int Phase) {
89 std::unique_lock<std::mutex> LockGuard(CurrentPhaseMutex);
90 assert(Phase == CurrentPhase + 1 || Phase < 0);
91 CurrentPhase = Phase;
93 CurrentPhaseCondition.notify_all();
96 void SetUp() override { CurrentPhase = 0; }
98 std::vector<llvm::BitVector> RunOnAllSockets(ThreadPoolStrategy S);
100 std::condition_variable CurrentPhaseCondition;
101 std::mutex CurrentPhaseMutex;
102 int CurrentPhase; // -1 = error, 0 = setup, 1 = ready, 2+ = custom
// Skips the current test when the host matches a configuration registered as
// unsupported by the fixture. Use it as `CHECK_UNSUPPORTED();` at the top of a
// TEST_F body.
//
// The expansion intentionally has no trailing semicolon: the caller supplies
// it, so the do { } while (0) wrapper acts as a single statement (e.g. inside
// an unbraced if/else).
#define CHECK_UNSUPPORTED()                                                    \
  do {                                                                         \
    if (isUnsupportedOSOrEnvironment())                                        \
      GTEST_SKIP();                                                            \
  } while (0)
111 TEST_F(ThreadPoolTest, AsyncBarrier) {
112 CHECK_UNSUPPORTED();
113 // test that async & barrier work together properly.
115 std::atomic_int checked_in{0};
117 ThreadPool Pool;
118 for (size_t i = 0; i < 5; ++i) {
119 Pool.async([this, &checked_in] {
120 waitForMainThread();
121 ++checked_in;
124 ASSERT_EQ(0, checked_in);
125 setMainThreadReady();
126 Pool.wait();
127 ASSERT_EQ(5, checked_in);
// Task body used by AsyncBarrierArgs: atomically adds `i` to `checked_in`.
static void TestFunc(std::atomic_int &checked_in, int i) {
  checked_in.fetch_add(i);
}
132 TEST_F(ThreadPoolTest, AsyncBarrierArgs) {
133 CHECK_UNSUPPORTED();
134 // Test that async works with a function requiring multiple parameters.
135 std::atomic_int checked_in{0};
137 ThreadPool Pool;
138 for (size_t i = 0; i < 5; ++i) {
139 Pool.async(TestFunc, std::ref(checked_in), i);
141 Pool.wait();
142 ASSERT_EQ(10, checked_in);
145 TEST_F(ThreadPoolTest, Async) {
146 CHECK_UNSUPPORTED();
147 ThreadPool Pool;
148 std::atomic_int i{0};
149 Pool.async([this, &i] {
150 waitForMainThread();
151 ++i;
153 Pool.async([&i] { ++i; });
154 ASSERT_NE(2, i.load());
155 setMainThreadReady();
156 Pool.wait();
157 ASSERT_EQ(2, i.load());
160 TEST_F(ThreadPoolTest, GetFuture) {
161 CHECK_UNSUPPORTED();
162 ThreadPool Pool(hardware_concurrency(2));
163 std::atomic_int i{0};
164 Pool.async([this, &i] {
165 waitForMainThread();
166 ++i;
168 // Force the future using get()
169 Pool.async([&i] { ++i; }).get();
170 ASSERT_NE(2, i.load());
171 setMainThreadReady();
172 Pool.wait();
173 ASSERT_EQ(2, i.load());
176 TEST_F(ThreadPoolTest, GetFutureWithResult) {
177 CHECK_UNSUPPORTED();
178 ThreadPool Pool(hardware_concurrency(2));
179 auto F1 = Pool.async([] { return 1; });
180 auto F2 = Pool.async([] { return 2; });
182 setMainThreadReady();
183 Pool.wait();
184 ASSERT_EQ(1, F1.get());
185 ASSERT_EQ(2, F2.get());
188 TEST_F(ThreadPoolTest, GetFutureWithResultAndArgs) {
189 CHECK_UNSUPPORTED();
190 ThreadPool Pool(hardware_concurrency(2));
191 auto Fn = [](int x) { return x; };
192 auto F1 = Pool.async(Fn, 1);
193 auto F2 = Pool.async(Fn, 2);
195 setMainThreadReady();
196 Pool.wait();
197 ASSERT_EQ(1, F1.get());
198 ASSERT_EQ(2, F2.get());
201 TEST_F(ThreadPoolTest, PoolDestruction) {
202 CHECK_UNSUPPORTED();
203 // Test that we are waiting on destruction
204 std::atomic_int checked_in{0};
206 ThreadPool Pool;
207 for (size_t i = 0; i < 5; ++i) {
208 Pool.async([this, &checked_in] {
209 waitForMainThread();
210 ++checked_in;
213 ASSERT_EQ(0, checked_in);
214 setMainThreadReady();
216 ASSERT_EQ(5, checked_in);
219 // Check running tasks in different groups.
220 TEST_F(ThreadPoolTest, Groups) {
221 CHECK_UNSUPPORTED();
222 // Need at least two threads, as the task in group2
223 // might block a thread until all tasks in group1 finish.
224 ThreadPoolStrategy S = hardware_concurrency(2);
225 if (S.compute_thread_count() < 2)
226 return;
227 ThreadPool Pool(S);
228 PhaseResetHelper Helper(this);
229 ThreadPoolTaskGroup Group1(Pool);
230 ThreadPoolTaskGroup Group2(Pool);
232 // Check that waiting for an empty group is a no-op.
233 Group1.wait();
235 std::atomic_int checked_in1{0};
236 std::atomic_int checked_in2{0};
238 for (size_t i = 0; i < 5; ++i) {
239 Group1.async([this, &checked_in1] {
240 waitForMainThread();
241 ++checked_in1;
244 Group2.async([this, &checked_in2] {
245 waitForPhase(2);
246 ++checked_in2;
248 ASSERT_EQ(0, checked_in1);
249 ASSERT_EQ(0, checked_in2);
250 // Start first group and wait for it.
251 setMainThreadReady();
252 Group1.wait();
253 ASSERT_EQ(5, checked_in1);
254 // Second group has not yet finished, start it and wait for it.
255 ASSERT_EQ(0, checked_in2);
256 setPhase(2);
257 Group2.wait();
258 ASSERT_EQ(5, checked_in1);
259 ASSERT_EQ(1, checked_in2);
262 // Check recursive tasks.
263 TEST_F(ThreadPoolTest, RecursiveGroups) {
264 CHECK_UNSUPPORTED();
265 ThreadPool Pool;
266 ThreadPoolTaskGroup Group(Pool);
268 std::atomic_int checked_in1{0};
270 for (size_t i = 0; i < 5; ++i) {
271 Group.async([this, &Pool, &checked_in1] {
272 waitForMainThread();
274 ThreadPoolTaskGroup LocalGroup(Pool);
276 // Check that waiting for an empty group is a no-op.
277 LocalGroup.wait();
279 std::atomic_int checked_in2{0};
280 for (size_t i = 0; i < 5; ++i) {
281 LocalGroup.async([&checked_in2] { ++checked_in2; });
283 LocalGroup.wait();
284 ASSERT_EQ(5, checked_in2);
286 ++checked_in1;
289 ASSERT_EQ(0, checked_in1);
290 setMainThreadReady();
291 Group.wait();
292 ASSERT_EQ(5, checked_in1);
295 TEST_F(ThreadPoolTest, RecursiveWaitDeadlock) {
296 CHECK_UNSUPPORTED();
297 ThreadPoolStrategy S = hardware_concurrency(2);
298 if (S.compute_thread_count() < 2)
299 return;
300 ThreadPool Pool(S);
301 PhaseResetHelper Helper(this);
302 ThreadPoolTaskGroup Group(Pool);
304 // Test that a thread calling wait() for a group and is waiting for more tasks
305 // returns when the last task finishes in a different thread while the waiting
306 // thread was waiting for more tasks to process while waiting.
308 // Task A runs in the first thread. It finishes and leaves
309 // the background thread waiting for more tasks.
310 Group.async([this] {
311 waitForMainThread();
312 setPhase(2);
314 // Task B is run in a second thread, it launches yet another
315 // task C in a different group, which will be handled by the waiting
316 // thread started above.
317 Group.async([this, &Pool] {
318 waitForPhase(2);
319 ThreadPoolTaskGroup LocalGroup(Pool);
320 LocalGroup.async([this] {
321 waitForPhase(3);
322 // Give the other thread enough time to check that there's no task
323 // to process and suspend waiting for a notification. This is indeed racy,
324 // but probably the best that can be done.
325 std::this_thread::sleep_for(std::chrono::milliseconds(10));
327 // And task B only now will wait for the tasks in the group (=task C)
328 // to finish. This test checks that it does not deadlock. If the
329 // `NotifyGroup` handling in ThreadPool::processTasks() didn't take place,
330 // this task B would be stuck waiting for tasks to arrive.
331 setPhase(3);
332 LocalGroup.wait();
334 setMainThreadReady();
335 Group.wait();
338 #if LLVM_ENABLE_THREADS == 1
340 // FIXME: Skip some tests below on non-Windows because multi-socket systems
341 // were not fully tested on Unix yet, and llvm::get_thread_affinity_mask()
342 // isn't implemented for Unix (need AffinityMask in Support/Unix/Program.inc).
343 #ifdef _WIN32
345 std::vector<llvm::BitVector>
346 ThreadPoolTest::RunOnAllSockets(ThreadPoolStrategy S) {
347 llvm::SetVector<llvm::BitVector> ThreadsUsed;
348 std::mutex Lock;
350 std::condition_variable AllThreads;
351 std::mutex AllThreadsLock;
352 unsigned Active = 0;
354 ThreadPool Pool(S);
355 for (size_t I = 0; I < S.compute_thread_count(); ++I) {
356 Pool.async([&] {
358 std::lock_guard<std::mutex> Guard(AllThreadsLock);
359 ++Active;
360 AllThreads.notify_one();
362 waitForMainThread();
363 std::lock_guard<std::mutex> Guard(Lock);
364 auto Mask = llvm::get_thread_affinity_mask();
365 ThreadsUsed.insert(Mask);
368 EXPECT_EQ(true, ThreadsUsed.empty());
370 std::unique_lock<std::mutex> Guard(AllThreadsLock);
371 AllThreads.wait(Guard,
372 [&]() { return Active == S.compute_thread_count(); });
374 setMainThreadReady();
376 return ThreadsUsed.takeVector();
379 TEST_F(ThreadPoolTest, AllThreads_UseAllRessources) {
380 CHECK_UNSUPPORTED();
381 std::vector<llvm::BitVector> ThreadsUsed = RunOnAllSockets({});
382 ASSERT_EQ(llvm::get_cpus(), ThreadsUsed.size());
385 TEST_F(ThreadPoolTest, AllThreads_OneThreadPerCore) {
386 CHECK_UNSUPPORTED();
387 std::vector<llvm::BitVector> ThreadsUsed =
388 RunOnAllSockets(llvm::heavyweight_hardware_concurrency());
389 ASSERT_EQ(llvm::get_cpus(), ThreadsUsed.size());
392 // From TestMain.cpp.
393 extern const char *TestMainArgv0;
395 // Just a reachable symbol to ease resolving of the executable's path.
396 static cl::opt<std::string> ThreadPoolTestStringArg1("thread-pool-string-arg1");
398 #ifdef _WIN32
399 #define setenv(name, var, ignore) _putenv_s(name, var)
400 #endif
402 TEST_F(ThreadPoolTest, AffinityMask) {
403 CHECK_UNSUPPORTED();
405 // Skip this test if less than 4 threads are available.
406 if (llvm::hardware_concurrency().compute_thread_count() < 4)
407 GTEST_SKIP();
409 using namespace llvm::sys;
410 if (getenv("LLVM_THREADPOOL_AFFINITYMASK")) {
411 std::vector<llvm::BitVector> ThreadsUsed = RunOnAllSockets({});
412 // Ensure the threads only ran on CPUs 0-3.
413 // NOTE: Don't use ASSERT* here because this runs in a subprocess,
414 // and will show up as un-executed in the parent.
415 assert(llvm::all_of(ThreadsUsed,
416 [](auto &T) { return T.getData().front() < 16UL; }) &&
417 "Threads ran on more CPUs than expected! The affinity mask does not "
418 "seem to work.");
419 GTEST_SKIP();
421 std::string Executable =
422 sys::fs::getMainExecutable(TestMainArgv0, &ThreadPoolTestStringArg1);
423 StringRef argv[] = {Executable, "--gtest_filter=ThreadPoolTest.AffinityMask"};
425 // Add environment variable to the environment of the child process.
426 int Res = setenv("LLVM_THREADPOOL_AFFINITYMASK", "1", false);
427 ASSERT_EQ(Res, 0);
429 std::string Error;
430 bool ExecutionFailed;
431 BitVector Affinity;
432 Affinity.resize(4);
433 Affinity.set(0, 4); // Use CPUs 0,1,2,3.
434 int Ret = sys::ExecuteAndWait(Executable, argv, {}, {}, 0, 0, &Error,
435 &ExecutionFailed, nullptr, &Affinity);
436 ASSERT_EQ(0, Ret);
439 #endif // #ifdef _WIN32
440 #endif // #if LLVM_ENABLE_THREADS == 1