1 //==-- llvm/Support/ThreadPool.cpp - A ThreadPool implementation -*- C++ -*-==//
3 // Part of the LLVM Project, under the Apache License v2.0 with LLVM Exceptions.
4 // See https://llvm.org/LICENSE.txt for license information.
5 // SPDX-License-Identifier: Apache-2.0 WITH LLVM-exception
7 //===----------------------------------------------------------------------===//
9 // This file implements a crude C++11 based thread pool.
11 //===----------------------------------------------------------------------===//
13 #include "llvm/Support/ThreadPool.h"
15 #include "llvm/Config/llvm-config.h"
16 #include "llvm/Support/Threading.h"
17 #include "llvm/Support/raw_ostream.h"
21 #if LLVM_ENABLE_THREADS
// Threaded build: initializes ThreadCount from S.compute_thread_count() and
// emplaces one worker callable into Threads for each ThreadID in
// [0, ThreadCount). Each worker captures S, its ThreadID and `this`.
//
// NOTE(review): this listing has lost lines (the embedded line numbers skip,
// e.g. 31-33, 40, 46, 48-49, 51-54, 57, 59, 62 and 64-68). The worker loop
// header, the declaration of Task and Notify, the body of the exit branch, the
// queue pop, the task invocation and all closing braces are not visible here.
// Confirm against the upstream file before changing any code in this block.
23 ThreadPool::ThreadPool(ThreadPoolStrategy S
)
24 : ThreadCount(S
.compute_thread_count()) {
25 // Create ThreadCount threads that will loop forever, wait on QueueCondition
26 // for tasks to be queued or the Pool to be destroyed.
// Reserve up front so the emplace_back calls below do not reallocate Threads.
27 Threads
.reserve(ThreadCount
);
28 for (unsigned ThreadID
= 0; ThreadID
< ThreadCount
; ++ThreadID
) {
29 Threads
.emplace_back([S
, ThreadID
, this] {
// Apply the strategy for this worker's ID before it starts taking tasks.
30 S
.apply_thread_strategy(ThreadID
);
// QueueLock is held while the wait predicate below reads EnableFlag and Tasks.
34 std::unique_lock
<std::mutex
> LockGuard(QueueLock
);
35 // Wait for tasks to be pushed in the queue
// The predicate lets the worker proceed when the pool has been disabled or
// when at least one task is queued.
36 QueueCondition
.wait(LockGuard
,
37 [&] { return !EnableFlag
|| !Tasks
.empty(); });
// Pool disabled and nothing left in the queue.
// NOTE(review): the statement controlled by this `if` is missing from the
// listing; presumably the worker returns here -- confirm.
39 if (!EnableFlag
&& Tasks
.empty())
41 // Yeah, we have a task, grab it and release the lock on the queue
43 // We first need to signal that we are active before popping the queue
44 // in order for wait() to properly detect that even if the queue is
45 // empty, there is still a task in flight.
// Move the task at the front of the queue into the local Task.
47 Task
= std::move(Tasks
.front());
50 // Run the task we just grabbed
55 // Adjust `ActiveThreads`, in case someone waits on ThreadPool::wait()
// QueueLock is taken again so workCompletedUnlocked() is evaluated under it.
56 std::lock_guard
<std::mutex
> LockGuard(QueueLock
);
58 Notify
= workCompletedUnlocked();
60 // Notify task completion if this is the last active thread, in case
61 // someone waits on ThreadPool::wait().
// NOTE(review): the check on Notify that presumably guards this call is not
// visible in the listing -- confirm.
63 CompletionCondition
.notify_all();
// Threaded build: blocks the caller on CompletionCondition, with QueueLock
// held through a unique_lock, until workCompletedUnlocked() returns true.
// NOTE(review): the closing brace of this function is missing from the listing.
69 void ThreadPool::wait() {
70 // Wait for all threads to complete and the queue to be empty
71 std::unique_lock
<std::mutex
> LockGuard(QueueLock
);
// The predicate is re-evaluated by the condition variable under LockGuard.
72 CompletionCondition
.wait(LockGuard
, [&] { return workCompletedUnlocked(); });
// Compares the calling thread's id with the id of every thread stored in
// Threads.
// NOTE(review): the return statements are missing from this listing;
// presumably the function returns true on a match and false otherwise --
// confirm against the upstream file.
75 bool ThreadPool::isWorkerThread() const {
// Fetch the caller's id once, outside the loop.
76 llvm::thread::id CurrentThreadId
= llvm::this_thread::get_id();
77 for (const llvm::thread
&Thread
: Threads
)
78 if (CurrentThreadId
== Thread
.get_id())
// Threaded build: wraps Task in a PackagedTaskTy, pushes it onto Tasks while
// holding QueueLock, wakes one waiter on QueueCondition, and returns the
// task's future as a std::shared_future<void>.
// NOTE(review): lines are missing from this listing (embedded numbers 87, 90,
// 93, 95, 98-99), so the scope in which LockGuard lives -- and therefore
// whether the lock is released before notify_one() -- is not visible here.
83 std::shared_future
<void> ThreadPool::asyncImpl(TaskTy Task
) {
84 /// Wrap the Task in a packaged_task to return a future object.
85 PackagedTaskTy
PackagedTask(std::move(Task
));
// The future is obtained before PackagedTask is moved into the queue below.
86 auto Future
= PackagedTask
.get_future();
88 // Lock the queue and push the new task
89 std::unique_lock
<std::mutex
> LockGuard(QueueLock
);
91 // Don't allow enqueueing after disabling the pool
92 assert(EnableFlag
&& "Queuing a thread during ThreadPool destruction");
94 Tasks
.push(std::move(PackagedTask
));
// Wake a single thread waiting on QueueCondition for the task just pushed.
96 QueueCondition
.notify_one();
97 return Future
.share();
100 // The destructor joins all threads, waiting for completion.
// Threaded build: takes QueueLock, wakes every thread waiting on
// QueueCondition, then iterates over all entries of Threads.
// NOTE(review): lines are missing from this listing (embedded numbers 102,
// 104-105 and 108-110). Presumably EnableFlag is cleared while the lock is
// held and each Worker is joined in the loop -- neither is visible here, so
// confirm against the upstream file.
101 ThreadPool::~ThreadPool() {
103 std::unique_lock
<std::mutex
> LockGuard(QueueLock
);
// notify_all() so that every waiting worker re-checks its wait predicate.
106 QueueCondition
.notify_all();
107 for (auto &Worker
: Threads
)
111 #else // LLVM_ENABLE_THREADS Disabled
113 // No threads are launched, issue a warning if ThreadCount is not 1
// Build with LLVM_ENABLE_THREADS disabled: initializes ThreadCount from
// S.compute_thread_count() and, when that count differs from 1, prints a
// warning to errs().
// NOTE(review): the closing braces of the `if` and of the constructor are
// missing from this listing.
114 ThreadPool::ThreadPool(ThreadPoolStrategy S
)
115 : ThreadCount(S
.compute_thread_count()) {
// A count of exactly 1 is accepted silently; any other value is reported.
116 if (ThreadCount
!= 1) {
117 errs() << "Warning: request a ThreadPool with " << ThreadCount
118 << " threads, but LLVM_ENABLE_THREADS has been turned off\n";
// Build with LLVM_ENABLE_THREADS disabled: loops while Tasks is non-empty,
// moving the front element into a local on each iteration.
// NOTE(review): the rest of the loop body (embedded numbers 126-129) is
// missing from this listing; presumably the front element is popped and the
// task is invoked -- confirm against the upstream file.
122 void ThreadPool::wait() {
123 // Sequential implementation running the tasks
124 while (!Tasks
.empty()) {
// Take ownership of the front task.
125 auto Task
= std::move(Tasks
.front());
// Build with LLVM_ENABLE_THREADS disabled: creates a shared future for Task
// via std::async with std::launch::deferred, then queues a PackagedTaskTy
// whose callable calls get() on a copy of that future.
// NOTE(review): the return statement and closing brace (embedded numbers
// 138-139) are missing from this listing; presumably Future is returned --
// confirm against the upstream file.
131 std::shared_future
<void> ThreadPool::asyncImpl(TaskTy Task
) {
132 // Get a Future with launch::deferred execution using std::async
133 auto Future
= std::async(std::launch::deferred
, std::move(Task
)).share();
134 // Wrap the future so that both ThreadPool::wait() can operate and the
135 // returned future can be sync'ed on.
// The lambda captures Future by copy; shared_future is copyable, so the queued
// wrapper and the local Future refer to the same shared state.
136 PackagedTaskTy
PackagedTask([Future
]() { Future
.get(); });
137 Tasks
.push(std::move(PackagedTask
));
// Build with LLVM_ENABLE_THREADS disabled (this is the #else branch): the
// destructor simply calls wait(), which in this branch loops over Tasks until
// the queue is empty.
141 ThreadPool::~ThreadPool() { wait(); }