//========- unittests/Support/ThreadPool.cpp - ThreadPool.h tests ---========//
//
// Part of the LLVM Project, under the Apache License v2.0 with LLVM Exceptions.
// See https://llvm.org/LICENSE.txt for license information.
// SPDX-License-Identifier: Apache-2.0 WITH LLVM-exception
//
//===----------------------------------------------------------------------===//

#include "llvm/Support/ThreadPool.h"

#include "llvm/ADT/STLExtras.h"
#include "llvm/ADT/SetVector.h"
#include "llvm/ADT/SmallVector.h"
#include "llvm/Support/CommandLine.h"
#include "llvm/Support/Program.h"
#include "llvm/Support/TargetSelect.h"
#include "llvm/Support/Threading.h"
#include "llvm/TargetParser/Host.h"
#include "llvm/TargetParser/Triple.h"

#ifdef _WIN32
#include "llvm/Support/Windows/WindowsSupport.h"
#endif

#include <chrono>
#include <thread>

#include "gtest/gtest.h"

using namespace llvm;

// Fixture for the unittests, allowing us to *temporarily* disable the
// unittests on a particular platform.
class ThreadPoolTest : public testing::Test {
  Triple Host;
  SmallVector<Triple::ArchType, 4> UnsupportedArchs;
  SmallVector<Triple::OSType, 4> UnsupportedOSs;
  SmallVector<Triple::EnvironmentType, 1> UnsupportedEnvironments;

protected:
  // This is intended as a temporary "XFAIL" for a platform.
  bool isUnsupportedOSOrEnvironment() {
    Triple Host(Triple::normalize(sys::getProcessTriple()));

    if (find(UnsupportedEnvironments, Host.getEnvironment()) !=
        UnsupportedEnvironments.end())
      return true;

    if (is_contained(UnsupportedOSs, Host.getOS()))
      return true;

    if (is_contained(UnsupportedArchs, Host.getArch()))
      return true;

    return false;
  }

  ThreadPoolTest() {
    // Add unsupported configurations here, for example:
    //   UnsupportedArchs.push_back(Triple::x86_64);

    // See https://llvm.org/bugs/show_bug.cgi?id=25829
    UnsupportedArchs.push_back(Triple::ppc64le);
    UnsupportedArchs.push_back(Triple::ppc64);
  }

  /// Make sure this thread does not progress faster than the main thread.
  void waitForMainThread() { waitForPhase(1); }

  /// Set the readiness of the main thread.
  void setMainThreadReady() { setPhase(1); }

  /// Wait until the given phase is set using setPhase(); the first "main"
  /// phase is 1. See also PhaseResetHelper below.
  void waitForPhase(int Phase) {
    std::unique_lock<std::mutex> LockGuard(CurrentPhaseMutex);
    CurrentPhaseCondition.wait(
        LockGuard, [&] { return CurrentPhase == Phase || CurrentPhase < 0; });
  }
  /// If a thread waits on another phase, the test could bail out on a failed
  /// assertion and the ThreadPool destructor would then wait() on all threads,
  /// deadlocking on the still-waiting task. This helper automatically resets
  /// the phase and unblocks such threads.
  struct PhaseResetHelper {
    PhaseResetHelper(ThreadPoolTest *test) : test(test) {}
    ~PhaseResetHelper() { test->setPhase(-1); }
    ThreadPoolTest *test;
  };

  /// Advance to the given phase.
  void setPhase(int Phase) {
    {
      std::unique_lock<std::mutex> LockGuard(CurrentPhaseMutex);
      assert(Phase == CurrentPhase + 1 || Phase < 0);
      CurrentPhase = Phase;
    }
    CurrentPhaseCondition.notify_all();
  }

  void SetUp() override { CurrentPhase = 0; }

  SmallVector<llvm::BitVector, 0> RunOnAllSockets(ThreadPoolStrategy S);

  std::condition_variable CurrentPhaseCondition;
  std::mutex CurrentPhaseMutex;
  int CurrentPhase; // -1 = error, 0 = setup, 1 = ready, 2+ = custom
};
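
// A minimal usage sketch of the phase protocol above (illustrative only; the
// body and captures are hypothetical but mirror the AsyncBarrier/Async tests
// below): a task blocks in waitForMainThread() until the main thread calls
// setMainThreadReady(), so the test can assert that nothing ran "too early"
// before releasing the workers.
//
//   ThreadPool Pool;
//   std::atomic_int Done{0};
//   Pool.async([this, &Done] {
//     waitForMainThread(); // Blocks until setMainThreadReady() sets phase 1.
//     ++Done;
//   });
//   ASSERT_EQ(0, Done);   // The task cannot have completed yet.
//   setMainThreadReady(); // Advance to phase 1; the task may now proceed.
//   Pool.wait();
//   ASSERT_EQ(1, Done);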

#define CHECK_UNSUPPORTED()                                                    \
  do {                                                                         \
    if (isUnsupportedOSOrEnvironment())                                        \
      GTEST_SKIP();                                                            \
  } while (0);

TEST_F(ThreadPoolTest, AsyncBarrier) {
  CHECK_UNSUPPORTED();
  // Test that async & barrier work together properly.

  std::atomic_int checked_in{0};

  ThreadPool Pool;
  for (size_t i = 0; i < 5; ++i) {
    Pool.async([this, &checked_in] {
      waitForMainThread();
      ++checked_in;
    });
  }
  ASSERT_EQ(0, checked_in);
  setMainThreadReady();
  Pool.wait();
  ASSERT_EQ(5, checked_in);
}

static void TestFunc(std::atomic_int &checked_in, int i) { checked_in += i; }

TEST_F(ThreadPoolTest, AsyncBarrierArgs) {
  CHECK_UNSUPPORTED();
  // Test that async works with a function requiring multiple parameters.
  std::atomic_int checked_in{0};

  ThreadPool Pool;
  for (size_t i = 0; i < 5; ++i) {
    Pool.async(TestFunc, std::ref(checked_in), i);
  }
  Pool.wait();
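  // TestFunc adds each i in 0..4 to checked_in, so the expected total is
  // 0 + 1 + 2 + 3 + 4 = 10.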
  ASSERT_EQ(10, checked_in);
}

TEST_F(ThreadPoolTest, Async) {
  CHECK_UNSUPPORTED();
  ThreadPool Pool;
  std::atomic_int i{0};
  Pool.async([this, &i] {
    waitForMainThread();
    ++i;
  });
  Pool.async([&i] { ++i; });
  ASSERT_NE(2, i.load());
  setMainThreadReady();
  Pool.wait();
  ASSERT_EQ(2, i.load());
}

TEST_F(ThreadPoolTest, GetFuture) {
  CHECK_UNSUPPORTED();
  ThreadPool Pool(hardware_concurrency(2));
  std::atomic_int i{0};
  Pool.async([this, &i] {
    waitForMainThread();
    ++i;
  });
  // Force the future using get().
  Pool.async([&i] { ++i; }).get();
  ASSERT_NE(2, i.load());
  setMainThreadReady();
  Pool.wait();
  ASSERT_EQ(2, i.load());
}

TEST_F(ThreadPoolTest, GetFutureWithResult) {
  CHECK_UNSUPPORTED();
  ThreadPool Pool(hardware_concurrency(2));
  auto F1 = Pool.async([] { return 1; });
  auto F2 = Pool.async([] { return 2; });

  setMainThreadReady();
  Pool.wait();
  ASSERT_EQ(1, F1.get());
  ASSERT_EQ(2, F2.get());
}

TEST_F(ThreadPoolTest, GetFutureWithResultAndArgs) {
  CHECK_UNSUPPORTED();
  ThreadPool Pool(hardware_concurrency(2));
  auto Fn = [](int x) { return x; };
  auto F1 = Pool.async(Fn, 1);
  auto F2 = Pool.async(Fn, 2);

  setMainThreadReady();
  Pool.wait();
  ASSERT_EQ(1, F1.get());
  ASSERT_EQ(2, F2.get());
}

TEST_F(ThreadPoolTest, PoolDestruction) {
  CHECK_UNSUPPORTED();
  // Test that we are waiting on destruction.
  std::atomic_int checked_in{0};
  {
    ThreadPool Pool;
    for (size_t i = 0; i < 5; ++i) {
      Pool.async([this, &checked_in] {
        waitForMainThread();
        ++checked_in;
      });
    }
    ASSERT_EQ(0, checked_in);
    setMainThreadReady();
  }
  ASSERT_EQ(5, checked_in);
}

// Check running tasks in different groups.
TEST_F(ThreadPoolTest, Groups) {
  CHECK_UNSUPPORTED();
  // Need at least two threads, as the task in Group2
  // might block a thread until all tasks in Group1 finish.
  ThreadPoolStrategy S = hardware_concurrency(2);
  if (S.compute_thread_count() < 2)
    GTEST_SKIP();
  ThreadPool Pool(S);
  PhaseResetHelper Helper(this);
  ThreadPoolTaskGroup Group1(Pool);
  ThreadPoolTaskGroup Group2(Pool);

  // Check that waiting for an empty group is a no-op.
  Group1.wait();

  std::atomic_int checked_in1{0};
  std::atomic_int checked_in2{0};

  for (size_t i = 0; i < 5; ++i) {
    Group1.async([this, &checked_in1] {
      waitForMainThread();
      ++checked_in1;
    });
  }
  Group2.async([this, &checked_in2] {
    waitForPhase(2);
    ++checked_in2;
  });
  ASSERT_EQ(0, checked_in1);
  ASSERT_EQ(0, checked_in2);
  // Start the first group and wait for it.
  setMainThreadReady();
  Group1.wait();
  ASSERT_EQ(5, checked_in1);
  // The second group has not yet finished; start it and wait for it.
  ASSERT_EQ(0, checked_in2);
  setPhase(2);
  Group2.wait();
  ASSERT_EQ(5, checked_in1);
  ASSERT_EQ(1, checked_in2);
}

// Check recursive tasks.
TEST_F(ThreadPoolTest, RecursiveGroups) {
  CHECK_UNSUPPORTED();
  ThreadPool Pool;
  ThreadPoolTaskGroup Group(Pool);

  std::atomic_int checked_in1{0};

  for (size_t i = 0; i < 5; ++i) {
    Group.async([this, &Pool, &checked_in1] {
      waitForMainThread();

      ThreadPoolTaskGroup LocalGroup(Pool);

      // Check that waiting for an empty group is a no-op.
      LocalGroup.wait();

      std::atomic_int checked_in2{0};
      for (size_t i = 0; i < 5; ++i) {
        LocalGroup.async([&checked_in2] { ++checked_in2; });
      }
      LocalGroup.wait();
      ASSERT_EQ(5, checked_in2);

      ++checked_in1;
    });
  }
  ASSERT_EQ(0, checked_in1);
  setMainThreadReady();
  Group.wait();
  ASSERT_EQ(5, checked_in1);
}

TEST_F(ThreadPoolTest, RecursiveWaitDeadlock) {
  CHECK_UNSUPPORTED();
  ThreadPoolStrategy S = hardware_concurrency(2);
  if (S.compute_thread_count() < 2)
    GTEST_SKIP();
  ThreadPool Pool(S);
  PhaseResetHelper Helper(this);
  ThreadPoolTaskGroup Group(Pool);

  // Test that a thread calling wait() on a group returns once the group's last
  // task finishes in a different thread, even though the waiting thread had
  // meanwhile gone back to waiting for more tasks to process.

  // Task A runs in the first thread. It finishes and leaves
  // the background thread waiting for more tasks.
  Group.async([this] {
    waitForMainThread();
    setPhase(2);
  });
  // Task B runs in a second thread; it launches yet another
  // task C in a different group, which will be handled by the waiting
  // thread started above.
  Group.async([this, &Pool] {
    waitForPhase(2);
    ThreadPoolTaskGroup LocalGroup(Pool);
    LocalGroup.async([this] {
      waitForPhase(3);
      // Give the other thread enough time to check that there's no task
      // to process and suspend waiting for a notification. This is indeed
      // racy, but probably the best that can be done.
      std::this_thread::sleep_for(std::chrono::milliseconds(10));
    });
    // Only now does task B wait for the tasks in the group (= task C)
    // to finish. This test checks that it does not deadlock. If the
    // `NotifyGroup` handling in ThreadPool::processTasks() didn't take place,
    // task B would be stuck waiting for tasks to arrive.
    setPhase(3);
    LocalGroup.wait();
  });
  setMainThreadReady();
  Group.wait();
}

#if LLVM_ENABLE_THREADS == 1

// FIXME: Skip some tests below on non-Windows because multi-socket systems
// were not fully tested on Unix yet, and llvm::get_thread_affinity_mask()
// isn't implemented for Unix (need AffinityMask in Support/Unix/Program.inc).
#ifdef _WIN32

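// Runs one task per thread of the given strategy, waits until every task has
// checked in, and records the CPU affinity mask observed by each worker.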
SmallVector<llvm::BitVector, 0>
ThreadPoolTest::RunOnAllSockets(ThreadPoolStrategy S) {
  llvm::SetVector<llvm::BitVector> ThreadsUsed;
  std::mutex Lock;
  {
    std::condition_variable AllThreads;
    std::mutex AllThreadsLock;
    unsigned Active = 0;

    ThreadPool Pool(S);
    for (size_t I = 0; I < S.compute_thread_count(); ++I) {
      Pool.async([&] {
        {
          std::lock_guard<std::mutex> Guard(AllThreadsLock);
          ++Active;
          AllThreads.notify_one();
        }
        waitForMainThread();
        std::lock_guard<std::mutex> Guard(Lock);
        auto Mask = llvm::get_thread_affinity_mask();
        ThreadsUsed.insert(Mask);
      });
    }
    EXPECT_EQ(true, ThreadsUsed.empty());
    {
      std::unique_lock<std::mutex> Guard(AllThreadsLock);
      AllThreads.wait(Guard,
                      [&]() { return Active == S.compute_thread_count(); });
    }
    setMainThreadReady();
  }
  return ThreadsUsed.takeVector();
}

TEST_F(ThreadPoolTest, AllThreads_UseAllRessources) {
  CHECK_UNSUPPORTED();
  // Starting with Windows 11, the OS is free to deploy the threads on any CPU
  // socket. We cannot reliably ensure that all thread affinity masks are
  // covered, therefore this test should not run.
  if (llvm::RunningWindows11OrGreater())
    GTEST_SKIP();
  auto ThreadsUsed = RunOnAllSockets({});
  ASSERT_EQ(llvm::get_cpus(), ThreadsUsed.size());
}

TEST_F(ThreadPoolTest, AllThreads_OneThreadPerCore) {
  CHECK_UNSUPPORTED();
  // Starting with Windows 11, the OS is free to deploy the threads on any CPU
  // socket. We cannot reliably ensure that all thread affinity masks are
  // covered, therefore this test should not run.
  if (llvm::RunningWindows11OrGreater())
    GTEST_SKIP();
  auto ThreadsUsed = RunOnAllSockets(llvm::heavyweight_hardware_concurrency());
  ASSERT_EQ(llvm::get_cpus(), ThreadsUsed.size());
}

// From TestMain.cpp.
extern const char *TestMainArgv0;

// Just a reachable symbol to ease resolving of the executable's path.
static cl::opt<std::string> ThreadPoolTestStringArg1("thread-pool-string-arg1");
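
// Windows does not provide setenv(); emulate it with _putenv_s (the overwrite
// flag is ignored).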
#ifdef _WIN32
#define setenv(name, var, ignore) _putenv_s(name, var)
#endif
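
// This test runs twice: the parent process re-executes the same test in a
// child process restricted to CPUs 0-3 (via the Affinity argument of
// sys::ExecuteAndWait); the child branch, guarded by the
// LLVM_THREADPOOL_AFFINITYMASK environment variable, verifies that its worker
// threads only observed those CPUs.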
TEST_F(ThreadPoolTest, AffinityMask) {
  CHECK_UNSUPPORTED();

  // Skip this test if fewer than 4 threads are available.
  if (llvm::hardware_concurrency().compute_thread_count() < 4)
    GTEST_SKIP();

  using namespace llvm::sys;
  if (getenv("LLVM_THREADPOOL_AFFINITYMASK")) {
    auto ThreadsUsed = RunOnAllSockets({});
    // Ensure the threads only ran on CPUs 0-3.
    // NOTE: Don't use ASSERT* here because this runs in a subprocess,
    // and will show up as un-executed in the parent.
    assert(llvm::all_of(ThreadsUsed,
                        [](auto &T) { return T.getData().front() < 16UL; }) &&
           "Threads ran on more CPUs than expected! The affinity mask does not "
           "seem to work.");
    GTEST_SKIP();
  }
  std::string Executable =
      sys::fs::getMainExecutable(TestMainArgv0, &ThreadPoolTestStringArg1);
  StringRef argv[] = {Executable, "--gtest_filter=ThreadPoolTest.AffinityMask"};

  // Add the environment variable to the environment of the child process.
  int Res = setenv("LLVM_THREADPOOL_AFFINITYMASK", "1", false);
  ASSERT_EQ(Res, 0);

  std::string Error;
  bool ExecutionFailed;
  BitVector Affinity;
  Affinity.resize(4);
  Affinity.set(0, 4); // Use CPUs 0,1,2,3.
  int Ret = sys::ExecuteAndWait(Executable, argv, {}, {}, 0, 0, &Error,
                                &ExecutionFailed, nullptr, &Affinity);
  ASSERT_EQ(0, Ret);
}

#endif // #ifdef _WIN32
#endif // #if LLVM_ENABLE_THREADS == 1