1 #include "ThreadedWorkQueue.h"
// Thread object bound to a single ThreadedWorkQueue: its thread routine simply
// forwards to ThreadedWorkQueue::WorkerThreadProc() of that queue.
// NOTE(review): several lines of this class (braces, access specifiers, the
// condition guarding the throw, the destructor body) are not visible in this
// excerpt; the comments below describe only what is shown.
7 class ThreadedWorker
: public Threaded
// Queue served by this worker; assigned once in the constructor initializer list.
9 ThreadedWorkQueue
*_twq
;
// Thread routine: delegates the whole job to the bound queue.
// NOTE(review): presumably overrides a virtual declared by Threaded - if so,
// spelling it with `override` would let the compiler verify the signature.
11 virtual void *ThreadProc()
13 _twq
->WorkerThreadProc();
// Binds the worker to `twq`. Can throw std::runtime_error with the message
// StartThread failed; presumably raised when the thread could not be started
// (the guarding condition is not visible here - confirm).
// NOTE(review): this single-argument constructor is not marked `explicit`.
18 ThreadedWorker(ThreadedWorkQueue
*twq
) : _twq(twq
)
21 throw std::runtime_error("StartThread failed");
// Virtual destructor; its body is not visible in this excerpt.
25 virtual ~ThreadedWorker()
// Destructor of the OrderedItemsDestroyer container: ranges over `*this`,
// visiting every stored work-item pointer.
// NOTE(review): the loop body is not visible in this excerpt; presumably each
// visited item is destroyed here - confirm against the full source.
31 ThreadedWorkQueue::OrderedItemsDestroyer::~OrderedItemsDestroyer()
33 for (auto *twi
: *this) {
// Constructs the queue and fixes the worker-thread limit.
// threads_count: requested limit; 0 means "pick automatically", in which case
// the value returned by BestThreadsCount() is stored instead.
38 ThreadedWorkQueue::ThreadedWorkQueue(size_t threads_count
)
40 _threads_count(threads_count
? threads_count
: BestThreadsCount())
// Tears the queue down: takes _mtx when at least one worker exists, clears
// _workers (which joins the threads, per the inline comment), asserts that no
// item is still being worked on, formats a summary of the counters and finally
// walks whatever is left in _done and _backlog.
// NOTE(review): the statements executed under the lock, the call that consumes
// the format string, and the bodies of the two final loops are not visible in
// this excerpt.
44 ThreadedWorkQueue::~ThreadedWorkQueue()
// Snapshot taken up front because _workers is emptied before the summary is built.
46 const size_t workers_count
= _workers
.size();
47 if (workers_count
!= 0) {
48 std::unique_lock
<std::mutex
> lock(_mtx
);
52 _workers
.clear(); // this also joins them
// Once all workers are gone nothing may still be in progress.
53 ASSERT(_working
== 0);
// Summary arguments, in order: function name, worker count, thread limit,
// _done_counter, _finalized_counter, leftover backlog size, leftover done size,
// _backlog_waits. Every size_t is cast to unsigned long to match %lu.
56 "%s: threads=%lu/%lu done=%lu finalized=%lu unprocessed_backlog=%lu unprocessed_done=%lu _backlog_waits=%lu\n",
57 __FUNCTION__
, (unsigned long)workers_count
, (unsigned long)_threads_count
,
58 (unsigned long)_done_counter
, (unsigned long)_finalized_counter
,
59 (unsigned long)_backlog
.size(), (unsigned long)_done
.size(), (unsigned long)_backlog_waits
);
// Leftover completed entries that were never fetched (loop body not visible).
61 for (const auto &it
: _done
) {
// Leftover items that were never picked up by a worker (loop body not visible).
64 for (auto *twi
: _backlog
) {
69 // invoked by ThreadedWorker from its thread routine
// Worker-side routine. From what is visible: a finished item is stored in
// _done under _mtx, keyed by the sequence number it was given when it was
// taken from _backlog, and the next item is then popped from the backlog front.
// NOTE(review): the loop structure, the try blocks and any waiting/notifying
// calls are not visible in this excerpt.
70 void ThreadedWorkQueue::WorkerThreadProc()
// Item currently held by this worker; starts out as nullptr.
73 IThreadedWorkItem
*twi
= nullptr;
// std::exception from the guarded code (not visible; the log tag suggests the
// item's WorkProc) is reported to stderr.
// NOTE(review): this message has no trailing newline, unlike the WORKER message
// in Queue(); catching by const reference would also be the usual form.
78 } catch (std::exception
&e
) {
79 fprintf(stderr
, "%s/WorkProc: %s", __FUNCTION__
, e
.what());
82 std::unique_lock
<std::mutex
> lock(_mtx
);
// File the item under its dequeue sequence number; FetchOrderedDoneItems()
// later releases entries strictly in that numeric order.
84 _done
.emplace(seq
, twi
);
// One-shot request flag: set by Queue()/Finalize(), cleared here once seen.
// Presumably a waiter is woken in this branch - the call is not visible.
86 if (_notify_on_done
) {
87 _notify_on_done
= false;
92 } catch (std::exception
&e
) { // OOM? retry in one second until memory becomes available
// NOTE(review): no trailing newline in this message either.
93 fprintf(stderr
, "%s: %s", __FUNCTION__
, e
.what());
// Empty backlog: the branch body is not visible in this excerpt.
101 if (_backlog
.empty()) {
// Take the next sequence number (pre-increment of _done_counter) and pop the
// oldest backlog entry; seq is what _done is keyed by above.
105 seq
= ++_done_counter
;
106 twi
= _backlog
.front();
107 _backlog
.pop_front();
// Submits a work item.
// twi: item to process.
// backlog_limit: largest backlog size tolerated before this call keeps looping;
//   (size_t)-1 selects the default of 2 * _threads_count.
// Worker threads are created lazily here. When no worker exists the visible
// comment states that processing falls back to the synchronous path.
// NOTE(review): wait/notify statements and the fallback body are not visible
// in this excerpt.
113 void ThreadedWorkQueue::Queue(IThreadedWorkItem
*twi
, size_t backlog_limit
)
// (size_t)-1 is the "use default" sentinel.
115 if (backlog_limit
== (size_t)-1) {
116 backlog_limit
= 2 * _threads_count
;
// Receives completed items fetched under the lock. Declared before the lock;
// presumably so the items are destroyed after _mtx is released - confirm scoping.
119 OrderedItemsDestroyer oid
;
122 std::unique_lock
<std::mutex
> lock(_mtx
);
// Add a worker when there is none yet, or when work is already waiting and
// the worker count is still below the thread limit.
123 if (_workers
.empty() || (!_backlog
.empty() && _workers
.size() < _threads_count
)) {
// Element is constructed in place from `this` (presumably a ThreadedWorker
// bound to this queue - the element type is not visible here).
125 _workers
.emplace_back(this);
// A failed worker creation is reported to stderr.
// NOTE(review): catching by const reference would be the usual form.
126 } catch (std::exception
&e
) {
127 fprintf(stderr
, "%s/WORKER: %s\n", __FUNCTION__
, e
.what());
// At least one worker is available: hand the item over via the backlog.
130 if (!_workers
.empty()) {
131 _backlog
.emplace_back(twi
);
// Backlog just went from empty to one item; branch body not visible
// (presumably wakes a worker - confirm).
133 if (_backlog
.size() == 1) {
// Over the limit: request a completion notification and keep looping for as
// long as the backlog stays above backlog_limit.
138 if (_backlog
.size() > backlog_limit
) {
141 _notify_on_done
= true;
143 } while (_backlog
.size() > backlog_limit
);
// Move every in-order completed item into oid while _mtx is still held.
146 FetchOrderedDoneItems(oid
);
150 // no workers? fallback to synchronous processing
// NOTE(review): this message lacks the trailing newline the WORKER message has.
153 } catch (std::exception
&e
) {
154 fprintf(stderr
, "%s/WorkProc: %s", __FUNCTION__
, e
.what());
// Collects completed work. From what is visible: under _mtx, once the backlog
// is empty and nothing is in progress (_working == 0), the in-order completed
// items are moved into `oid`; the code also sets _notify_on_done to request a
// completion notification.
// NOTE(review): the enclosing loop and any wait call are not visible in this
// excerpt, so the exact control flow between these statements is unconfirmed.
160 void ThreadedWorkQueue::Finalize()
// Declared before the lock, hence destroyed after it when both share a scope.
162 OrderedItemsDestroyer oid
;
163 std::unique_lock
<std::mutex
> lock(_mtx
);
165 if (_backlog
.empty() && _working
== 0) {
166 FetchOrderedDoneItems(oid
);
169 _notify_on_done
= true;
174 // must be invoked under _mtx lock held
175 void ThreadedWorkQueue::FetchOrderedDoneItems(OrderedItemsDestroyer
&oid
)
177 while (!_done
.empty() && _done
.begin()->first
== _finalized_counter
+ 1) {
178 ++_finalized_counter
;
179 oid
.emplace_back(_done
.begin()->second
);
180 _done
.erase(_done
.begin());
184 ////////////////////////////////////////
// Scope-object constructor: receives the caller's unique_ptr by reference and
// stores a newly allocated, default-constructed ThreadedWorkQueue through _pWQ.
// NOTE(review): the member initializer tying _pWQ to pWQ and any condition
// guarding the reset are not visible in this excerpt. std::make_unique would be
// the usual replacement for reset(new ...) if the project's standard allows it.
186 ThreadedWorkQueuePtrScope::ThreadedWorkQueuePtrScope(std::unique_ptr
<ThreadedWorkQueue
> &pWQ
)
191 _pWQ
.reset(new ThreadedWorkQueue
);
// Scope-object destructor. The visible part is the handler: a std::exception
// raised by the guarded statements (not visible in this excerpt) is reported
// to stderr together with the function name.
// NOTE(review): catching by const reference would be the usual form.
195 ThreadedWorkQueuePtrScope::~ThreadedWorkQueuePtrScope()
199 } catch (std::exception
&e
) {
200 fprintf(stderr
, "%s: %s\n", __FUNCTION__
, e
.what());