Merge branch 'master' into scmaster
[nova-tt.git] / nova-tt / thread_pool.hpp
blobadc5b33315cf70c28e558c0f66c1253dd182a35d
1 // c++11-style thread pool
2 // Copyright (C) 2013 Tim Blechmann
3 //
4 // This program is free software; you can redistribute it and/or modify
5 // it under the terms of the GNU General Public License as published by
6 // the Free Software Foundation; either version 2 of the License, or
7 // (at your option) any later version.
8 //
9 // This program is distributed in the hope that it will be useful,
10 // but WITHOUT ANY WARRANTY; without even the implied warranty of
11 // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12 // GNU General Public License for more details.
14 // You should have received a copy of the GNU General Public License
15 // along with this program; see the file COPYING. If not, write to
16 // the Free Software Foundation, Inc., 59 Temple Place - Suite 330,
17 // Boston, MA 02111-1307, USA.
19 #ifndef THREAD_POOL_HPP
20 #define THREAD_POOL_HPP
22 #include <condition_variable>
23 #include <deque>
24 #include <future>
25 #include <mutex>
26 #include <thread>
27 #include <vector>
29 namespace nova {
30 namespace detail {
32 struct task_base
34 virtual void run() = 0;
37 template <typename T>
38 struct task:
39 task_base
41 typedef std::packaged_task<T()> work;
43 explicit task (work && rhs):
44 packaged_task(std::move(rhs))
47 void run()
49 packaged_task();
52 std::packaged_task<T()> packaged_task;
55 // LATER: implement some work-stealing
56 class thread_pool
58 public:
59 thread_pool(size_t thread_count):
60 request_stop(false)
62 for (size_t i = 0; i != thread_count; ++i)
63 worker_threads.emplace_back(std::bind(&thread_pool::worker_thread, this));
66 ~thread_pool()
68 request_stop = true;
69 condition.notify_all();
71 for (;;) {
72 std::shared_ptr<task_base> work;
74 std::unique_lock<std::mutex> lock(queue_guard);
75 if (tasks.empty())
76 break;
78 work = std::move(tasks.front());
79 tasks.pop_front();
81 work->run();
84 for (auto & thread : worker_threads)
85 thread.join();
88 template<class Functor>
89 auto schedule(Functor f) -> std::future<decltype( f() )>
91 typedef decltype(f()) T;
93 if (worker_threads.empty())
94 return std::async(std::launch::async, f);
96 std::packaged_task<T()> job(f);
97 std::future<T> ret = job.get_future();
100 std::unique_lock<std::mutex> lock(queue_guard);
101 std::shared_ptr<task_base> task_ptr (new task<T>(std::move(job)));
102 tasks.emplace_back(task_ptr);
105 condition.notify_one();
106 return ret;
109 private:
110 void worker_thread()
112 for (;;) {
113 std::shared_ptr<task_base> work;
115 std::unique_lock<std::mutex> lock(queue_guard);
116 while(!request_stop && tasks.empty())
117 condition.wait(lock);
119 if (request_stop && tasks.empty())
120 return;
122 work = std::move(tasks.front());
123 tasks.pop_front();
125 work->run();
129 std::vector< std::thread > worker_threads;
130 std::deque< std::shared_ptr<task_base> > tasks;
132 std::mutex queue_guard;
133 std::condition_variable condition;
134 bool request_stop;
139 using detail::thread_pool;
143 #endif // THREAD_POOL_HPP