1 //==-- llvm/Support/ThreadPool.cpp - A ThreadPool implementation -*- C++ -*-==//
3 // Part of the LLVM Project, under the Apache License v2.0 with LLVM Exceptions.
4 // See https://llvm.org/LICENSE.txt for license information.
5 // SPDX-License-Identifier: Apache-2.0 WITH LLVM-exception
7 //===----------------------------------------------------------------------===//
9 // This file implements a crude C++11 based thread pool.
11 //===----------------------------------------------------------------------===//
13 #include "llvm/Support/ThreadPool.h"
15 #include "llvm/Config/llvm-config.h"
16 #include "llvm/Support/Threading.h"
17 #include "llvm/Support/raw_ostream.h"
21 #if LLVM_ENABLE_THREADS
23 // Default to hardware_concurrency
24 ThreadPool::ThreadPool() : ThreadPool(hardware_concurrency()) {}
26 ThreadPool::ThreadPool(unsigned ThreadCount
)
27 : ActiveThreads(0), EnableFlag(true) {
28 // Create ThreadCount threads that will loop forever, wait on QueueCondition
29 // for tasks to be queued or the Pool to be destroyed.
30 Threads
.reserve(ThreadCount
);
31 for (unsigned ThreadID
= 0; ThreadID
< ThreadCount
; ++ThreadID
) {
32 Threads
.emplace_back([&] {
36 std::unique_lock
<std::mutex
> LockGuard(QueueLock
);
37 // Wait for tasks to be pushed in the queue
38 QueueCondition
.wait(LockGuard
,
39 [&] { return !EnableFlag
|| !Tasks
.empty(); });
41 if (!EnableFlag
&& Tasks
.empty())
43 // Yeah, we have a task, grab it and release the lock on the queue
45 // We first need to signal that we are active before popping the queue
46 // in order for wait() to properly detect that even if the queue is
47 // empty, there is still a task in flight.
49 std::unique_lock
<std::mutex
> LockGuard(CompletionLock
);
52 Task
= std::move(Tasks
.front());
55 // Run the task we just grabbed
59 // Adjust `ActiveThreads`, in case someone waits on ThreadPool::wait()
60 std::unique_lock
<std::mutex
> LockGuard(CompletionLock
);
64 // Notify task completion, in case someone waits on ThreadPool::wait()
65 CompletionCondition
.notify_all();
71 void ThreadPool::wait() {
72 // Wait for all threads to complete and the queue to be empty
73 std::unique_lock
<std::mutex
> LockGuard(CompletionLock
);
74 // The order of the checks for ActiveThreads and Tasks.empty() matters because
75 // any active threads might be modifying the Tasks queue, and this would be a
77 CompletionCondition
.wait(LockGuard
,
78 [&] { return !ActiveThreads
&& Tasks
.empty(); });
81 std::shared_future
<void> ThreadPool::asyncImpl(TaskTy Task
) {
82 /// Wrap the Task in a packaged_task to return a future object.
83 PackagedTaskTy
PackagedTask(std::move(Task
));
84 auto Future
= PackagedTask
.get_future();
86 // Lock the queue and push the new task
87 std::unique_lock
<std::mutex
> LockGuard(QueueLock
);
89 // Don't allow enqueueing after disabling the pool
90 assert(EnableFlag
&& "Queuing a thread during ThreadPool destruction");
92 Tasks
.push(std::move(PackagedTask
));
94 QueueCondition
.notify_one();
95 return Future
.share();
98 // The destructor joins all threads, waiting for completion.
99 ThreadPool::~ThreadPool() {
101 std::unique_lock
<std::mutex
> LockGuard(QueueLock
);
104 QueueCondition
.notify_all();
105 for (auto &Worker
: Threads
)
109 #else // LLVM_ENABLE_THREADS Disabled
111 ThreadPool::ThreadPool() : ThreadPool(0) {}
113 // No threads are launched, issue a warning if ThreadCount is not 0
114 ThreadPool::ThreadPool(unsigned ThreadCount
)
117 errs() << "Warning: request a ThreadPool with " << ThreadCount
118 << " threads, but LLVM_ENABLE_THREADS has been turned off\n";
122 void ThreadPool::wait() {
123 // Sequential implementation running the tasks
124 while (!Tasks
.empty()) {
125 auto Task
= std::move(Tasks
.front());
131 std::shared_future
<void> ThreadPool::asyncImpl(TaskTy Task
) {
132 // Get a Future with launch::deferred execution using std::async
133 auto Future
= std::async(std::launch::deferred
, std::move(Task
)).share();
134 // Wrap the future so that both ThreadPool::wait() can operate and the
135 // returned future can be sync'ed on.
136 PackagedTaskTy
PackagedTask([Future
]() { Future
.get(); });
137 Tasks
.push(std::move(PackagedTask
));
141 ThreadPool::~ThreadPool() {