1 //========- unittests/Support/ThreadPools.cpp - ThreadPools.h tests --========//
3 // Part of the LLVM Project, under the Apache License v2.0 with LLVM Exceptions.
4 // See https://llvm.org/LICENSE.txt for license information.
5 // SPDX-License-Identifier: Apache-2.0 WITH LLVM-exception
7 //===----------------------------------------------------------------------===//
9 #include "llvm/Support/ThreadPool.h"
11 #include "llvm/ADT/STLExtras.h"
12 #include "llvm/ADT/SetVector.h"
13 #include "llvm/ADT/SmallVector.h"
14 #include "llvm/Support/CommandLine.h"
15 #include "llvm/Support/Program.h"
16 #include "llvm/Support/TargetSelect.h"
17 #include "llvm/Support/Threading.h"
18 #include "llvm/TargetParser/Host.h"
19 #include "llvm/TargetParser/Triple.h"
22 #include "llvm/Support/Windows/WindowsSupport.h"
28 #include "gtest/gtest.h"
32 // Specialize gtest construct to provide friendlier name in the output.
33 #if LLVM_ENABLE_THREADS
34 template <> std::string GetTypeName
<llvm::StdThreadPool
>() {
35 return "llvm::StdThreadPool";
38 template <> std::string GetTypeName
<llvm::SingleThreadExecutor
>() {
39 return "llvm::SingleThreadExecutor";
41 } // namespace internal
42 } // namespace testing
// Fixture for the unittests that allows *temporarily* disabling them on
// particular platforms.
48 template <typename ThreadPoolImpl
> class ThreadPoolTest
: public testing::Test
{
50 SmallVector
<Triple::ArchType
, 4> UnsupportedArchs
;
51 SmallVector
<Triple::OSType
, 4> UnsupportedOSs
;
52 SmallVector
<Triple::EnvironmentType
, 1> UnsupportedEnvironments
;
// Intended as a temporary "XFAIL" marker for unsupported platforms.
56 bool isUnsupportedOSOrEnvironment() {
57 Triple
Host(Triple::normalize(sys::getProcessTriple()));
59 if (find(UnsupportedEnvironments
, Host
.getEnvironment()) !=
60 UnsupportedEnvironments
.end())
63 if (is_contained(UnsupportedOSs
, Host
.getOS()))
66 if (is_contained(UnsupportedArchs
, Host
.getArch()))
73 // Add unsupported configuration here, example:
74 // UnsupportedArchs.push_back(Triple::x86_64);
76 // See https://llvm.org/bugs/show_bug.cgi?id=25829
77 UnsupportedArchs
.push_back(Triple::ppc64le
);
78 UnsupportedArchs
.push_back(Triple::ppc64
);
81 /// Make sure this thread not progress faster than the main thread.
82 void waitForMainThread() { waitForPhase(1); }
84 /// Set the readiness of the main thread.
85 void setMainThreadReady() { setPhase(1); }
87 /// Wait until given phase is set using setPhase(); first "main" phase is 1.
88 /// See also PhaseResetHelper below.
89 void waitForPhase(int Phase
) {
90 std::unique_lock
<std::mutex
> LockGuard(CurrentPhaseMutex
);
91 CurrentPhaseCondition
.wait(
92 LockGuard
, [&] { return CurrentPhase
== Phase
|| CurrentPhase
< 0; });
94 /// If a thread waits on another phase, the test could bail out on a failed
95 /// assertion and ThreadPool destructor would wait() on all threads, which
96 /// would deadlock on the task waiting. Create this helper to automatically
97 /// reset the phase and unblock such threads.
98 struct PhaseResetHelper
{
99 PhaseResetHelper(ThreadPoolTest
*test
) : test(test
) {}
100 ~PhaseResetHelper() { test
->setPhase(-1); }
101 ThreadPoolTest
*test
;
104 /// Advance to the given phase.
105 void setPhase(int Phase
) {
107 std::unique_lock
<std::mutex
> LockGuard(CurrentPhaseMutex
);
108 assert(Phase
== CurrentPhase
+ 1 || Phase
< 0);
109 CurrentPhase
= Phase
;
111 CurrentPhaseCondition
.notify_all();
114 void SetUp() override
{ CurrentPhase
= 0; }
116 SmallVector
<llvm::BitVector
, 0> RunOnAllSockets(ThreadPoolStrategy S
);
118 std::condition_variable CurrentPhaseCondition
;
119 std::mutex CurrentPhaseMutex
;
120 int CurrentPhase
; // -1 = error, 0 = setup, 1 = ready, 2+ = custom
123 using ThreadPoolImpls
= ::testing::Types
<
124 #if LLVM_ENABLE_THREADS
127 SingleThreadExecutor
>;
129 TYPED_TEST_SUITE(ThreadPoolTest
, ThreadPoolImpls
, );
131 #define CHECK_UNSUPPORTED() \
133 if (this->isUnsupportedOSOrEnvironment()) \
137 TYPED_TEST(ThreadPoolTest
, AsyncBarrier
) {
139 // test that async & barrier work together properly.
141 std::atomic_int checked_in
{0};
143 DefaultThreadPool Pool
;
144 for (size_t i
= 0; i
< 5; ++i
) {
145 Pool
.async([this, &checked_in
] {
146 this->waitForMainThread();
150 ASSERT_EQ(0, checked_in
);
151 this->setMainThreadReady();
153 ASSERT_EQ(5, checked_in
);
/// Atomically add \p i to the shared counter \p checked_in. Used to check that
/// ThreadPool::async accepts a callable that takes several arguments.
static void TestFunc(std::atomic_int &checked_in, int i) {
  checked_in.fetch_add(i);
}
158 TYPED_TEST(ThreadPoolTest
, AsyncBarrierArgs
) {
160 // Test that async works with a function requiring multiple parameters.
161 std::atomic_int checked_in
{0};
163 DefaultThreadPool Pool
;
164 for (size_t i
= 0; i
< 5; ++i
) {
165 Pool
.async(TestFunc
, std::ref(checked_in
), i
);
168 ASSERT_EQ(10, checked_in
);
171 TYPED_TEST(ThreadPoolTest
, Async
) {
173 DefaultThreadPool Pool
;
174 std::atomic_int i
{0};
175 Pool
.async([this, &i
] {
176 this->waitForMainThread();
179 Pool
.async([&i
] { ++i
; });
180 ASSERT_NE(2, i
.load());
181 this->setMainThreadReady();
183 ASSERT_EQ(2, i
.load());
186 TYPED_TEST(ThreadPoolTest
, GetFuture
) {
188 DefaultThreadPool
Pool(hardware_concurrency(2));
189 std::atomic_int i
{0};
190 Pool
.async([this, &i
] {
191 this->waitForMainThread();
194 // Force the future using get()
195 Pool
.async([&i
] { ++i
; }).get();
196 ASSERT_NE(2, i
.load());
197 this->setMainThreadReady();
199 ASSERT_EQ(2, i
.load());
202 TYPED_TEST(ThreadPoolTest
, GetFutureWithResult
) {
204 DefaultThreadPool
Pool(hardware_concurrency(2));
205 auto F1
= Pool
.async([] { return 1; });
206 auto F2
= Pool
.async([] { return 2; });
208 this->setMainThreadReady();
210 ASSERT_EQ(1, F1
.get());
211 ASSERT_EQ(2, F2
.get());
214 TYPED_TEST(ThreadPoolTest
, GetFutureWithResultAndArgs
) {
216 DefaultThreadPool
Pool(hardware_concurrency(2));
217 auto Fn
= [](int x
) { return x
; };
218 auto F1
= Pool
.async(Fn
, 1);
219 auto F2
= Pool
.async(Fn
, 2);
221 this->setMainThreadReady();
223 ASSERT_EQ(1, F1
.get());
224 ASSERT_EQ(2, F2
.get());
227 TYPED_TEST(ThreadPoolTest
, PoolDestruction
) {
229 // Test that we are waiting on destruction
230 std::atomic_int checked_in
{0};
232 DefaultThreadPool Pool
;
233 for (size_t i
= 0; i
< 5; ++i
) {
234 Pool
.async([this, &checked_in
] {
235 this->waitForMainThread();
239 ASSERT_EQ(0, checked_in
);
240 this->setMainThreadReady();
242 ASSERT_EQ(5, checked_in
);
245 // Check running tasks in different groups.
246 TYPED_TEST(ThreadPoolTest
, Groups
) {
248 // Need at least two threads, as the task in group2
249 // might block a thread until all tasks in group1 finish.
250 ThreadPoolStrategy S
= hardware_concurrency(2);
251 if (S
.compute_thread_count() < 2)
253 DefaultThreadPool
Pool(S
);
254 typename
TestFixture::PhaseResetHelper
Helper(this);
255 ThreadPoolTaskGroup
Group1(Pool
);
256 ThreadPoolTaskGroup
Group2(Pool
);
258 // Check that waiting for an empty group is a no-op.
261 std::atomic_int checked_in1
{0};
262 std::atomic_int checked_in2
{0};
264 for (size_t i
= 0; i
< 5; ++i
) {
265 Group1
.async([this, &checked_in1
] {
266 this->waitForMainThread();
270 Group2
.async([this, &checked_in2
] {
271 this->waitForPhase(2);
274 ASSERT_EQ(0, checked_in1
);
275 ASSERT_EQ(0, checked_in2
);
276 // Start first group and wait for it.
277 this->setMainThreadReady();
279 ASSERT_EQ(5, checked_in1
);
280 // Second group has not yet finished, start it and wait for it.
281 ASSERT_EQ(0, checked_in2
);
284 ASSERT_EQ(5, checked_in1
);
285 ASSERT_EQ(1, checked_in2
);
288 // Check recursive tasks.
289 TYPED_TEST(ThreadPoolTest
, RecursiveGroups
) {
291 DefaultThreadPool Pool
;
292 ThreadPoolTaskGroup
Group(Pool
);
294 std::atomic_int checked_in1
{0};
296 for (size_t i
= 0; i
< 5; ++i
) {
297 Group
.async([this, &Pool
, &checked_in1
] {
298 this->waitForMainThread();
300 ThreadPoolTaskGroup
LocalGroup(Pool
);
302 // Check that waiting for an empty group is a no-op.
305 std::atomic_int checked_in2
{0};
306 for (size_t i
= 0; i
< 5; ++i
) {
307 LocalGroup
.async([&checked_in2
] { ++checked_in2
; });
310 ASSERT_EQ(5, checked_in2
);
315 ASSERT_EQ(0, checked_in1
);
316 this->setMainThreadReady();
318 ASSERT_EQ(5, checked_in1
);
321 TYPED_TEST(ThreadPoolTest
, RecursiveWaitDeadlock
) {
323 ThreadPoolStrategy S
= hardware_concurrency(2);
324 if (S
.compute_thread_count() < 2)
326 DefaultThreadPool
Pool(S
);
327 typename
TestFixture::PhaseResetHelper
Helper(this);
328 ThreadPoolTaskGroup
Group(Pool
);
// Test that a thread that called wait() for a group, and is waiting for more
// tasks to process, returns when the last task of that group finishes in a
// different thread.
334 // Task A runs in the first thread. It finishes and leaves
335 // the background thread waiting for more tasks.
337 this->waitForMainThread();
340 // Task B is run in a second thread, it launches yet another
341 // task C in a different group, which will be handled by the waiting
342 // thread started above.
343 Group
.async([this, &Pool
] {
344 this->waitForPhase(2);
345 ThreadPoolTaskGroup
LocalGroup(Pool
);
346 LocalGroup
.async([this] {
347 this->waitForPhase(3);
348 // Give the other thread enough time to check that there's no task
349 // to process and suspend waiting for a notification. This is indeed racy,
350 // but probably the best that can be done.
351 std::this_thread::sleep_for(std::chrono::milliseconds(10));
353 // And task B only now will wait for the tasks in the group (=task C)
354 // to finish. This test checks that it does not deadlock. If the
355 // `NotifyGroup` handling in ThreadPool::processTasks() didn't take place,
356 // this task B would be stuck waiting for tasks to arrive.
360 this->setMainThreadReady();
364 #if LLVM_ENABLE_THREADS == 1
366 // FIXME: Skip some tests below on non-Windows because multi-socket systems
367 // were not fully tested on Unix yet, and llvm::get_thread_affinity_mask()
368 // isn't implemented for Unix (need AffinityMask in Support/Unix/Program.inc).
371 template <typename ThreadPoolImpl
>
372 SmallVector
<llvm::BitVector
, 0>
373 ThreadPoolTest
<ThreadPoolImpl
>::RunOnAllSockets(ThreadPoolStrategy S
) {
374 llvm::SetVector
<llvm::BitVector
> ThreadsUsed
;
377 std::condition_variable AllThreads
;
378 std::mutex AllThreadsLock
;
381 DefaultThreadPool
Pool(S
);
382 for (size_t I
= 0; I
< S
.compute_thread_count(); ++I
) {
385 std::lock_guard
<std::mutex
> Guard(AllThreadsLock
);
387 AllThreads
.notify_one();
389 this->waitForMainThread();
390 std::lock_guard
<std::mutex
> Guard(Lock
);
391 auto Mask
= llvm::get_thread_affinity_mask();
392 ThreadsUsed
.insert(Mask
);
395 EXPECT_EQ(true, ThreadsUsed
.empty());
397 std::unique_lock
<std::mutex
> Guard(AllThreadsLock
);
398 AllThreads
.wait(Guard
,
399 [&]() { return Active
== S
.compute_thread_count(); });
401 this->setMainThreadReady();
403 return ThreadsUsed
.takeVector();
406 TYPED_TEST(ThreadPoolTest
, AllThreads_UseAllRessources
) {
// Starting with Windows 11, the OS is free to deploy the threads on any CPU
// socket. We cannot reliably ensure that all thread affinity masks are
// covered, therefore this test should not run.
411 if (llvm::RunningWindows11OrGreater())
413 auto ThreadsUsed
= this->RunOnAllSockets({});
414 ASSERT_EQ(llvm::get_cpus(), ThreadsUsed
.size());
417 TYPED_TEST(ThreadPoolTest
, AllThreads_OneThreadPerCore
) {
// Starting with Windows 11, the OS is free to deploy the threads on any CPU
// socket. We cannot reliably ensure that all thread affinity masks are
// covered, therefore this test should not run.
422 if (llvm::RunningWindows11OrGreater())
425 this->RunOnAllSockets(llvm::heavyweight_hardware_concurrency());
426 ASSERT_EQ(llvm::get_cpus(), ThreadsUsed
.size());
429 // From TestMain.cpp.
430 extern const char *TestMainArgv0
;
// A reachable symbol that makes it easy to resolve the executable's path.
433 static cl::opt
<std::string
> ThreadPoolTestStringArg1("thread-pool-string-arg1");
436 #define setenv(name, var, ignore) _putenv_s(name, var)
439 TYPED_TEST(ThreadPoolTest
, AffinityMask
) {
442 // Skip this test if less than 4 threads are available.
443 if (llvm::hardware_concurrency().compute_thread_count() < 4)
446 using namespace llvm::sys
;
447 if (getenv("LLVM_THREADPOOL_AFFINITYMASK")) {
448 auto ThreadsUsed
= this->RunOnAllSockets({});
449 // Ensure the threads only ran on CPUs 0-3.
450 // NOTE: Don't use ASSERT* here because this runs in a subprocess,
451 // and will show up as un-executed in the parent.
452 assert(llvm::all_of(ThreadsUsed
,
453 [](auto &T
) { return T
.getData().front() < 16UL; }) &&
454 "Threads ran on more CPUs than expected! The affinity mask does not "
458 std::string Executable
=
459 sys::fs::getMainExecutable(TestMainArgv0
, &ThreadPoolTestStringArg1
);
460 StringRef argv
[] = {Executable
, "--gtest_filter=ThreadPoolTest.AffinityMask"};
462 // Add environment variable to the environment of the child process.
463 int Res
= setenv("LLVM_THREADPOOL_AFFINITYMASK", "1", false);
467 bool ExecutionFailed
;
470 Affinity
.set(0, 4); // Use CPUs 0,1,2,3.
471 int Ret
= sys::ExecuteAndWait(Executable
, argv
, {}, {}, 0, 0, &Error
,
472 &ExecutionFailed
, nullptr, &Affinity
);
476 #endif // #ifdef _WIN32
477 #endif // #if LLVM_ENABLE_THREADS == 1