//  Copyright (C) 2007, 2008, 2009, 2010 Tim Blechmann
//
//  This program is free software; you can redistribute it and/or modify
//  it under the terms of the GNU General Public License as published by
//  the Free Software Foundation; either version 2 of the License, or
//  (at your option) any later version.
//
//  This program is distributed in the hope that it will be useful,
//  but WITHOUT ANY WARRANTY; without even the implied warranty of
//  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
//  GNU General Public License for more details.
//
//  You should have received a copy of the GNU General Public License
//  along with this program; see the file COPYING.  If not, write to
//  the Free Software Foundation, Inc., 59 Temple Place - Suite 330,
//  Boston, MA 02111-1307, USA.
#ifndef DSP_THREAD_QUEUE_DSP_THREAD_QUEUE_HPP
#define DSP_THREAD_QUEUE_DSP_THREAD_QUEUE_HPP

#include <algorithm>
#include <cassert>
#include <cstdint>
#include <cstdio>
#include <iostream>
#include <limits>
#include <memory>
#include <new>
#include <vector>

#include <boost/atomic.hpp>
#include <boost/cstdint.hpp>

#ifdef DEBUG_DSP_THREADS
#include <boost/foreach.hpp>
#endif

#include <boost/lockfree/stack.hpp>

#include "nova-tt/semaphore.hpp"

#include "utilities/branch_hints.hpp"
#include "utilities/utils.hpp"
namespace nova {

template <typename runnable, typename Alloc>
class dsp_queue_interpreter;

/* concept runnable:
 *
 *     runnable(const & runnable);
 *
 *     operator()(uint threadindex);
 */
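// Illustrative only (not part of the original header): a minimal type satisfying the
// `runnable` concept above might look like the following; the name `example_job` and
// its body are hypothetical.
//
//     struct example_job
//     {
//         void operator()(unsigned int thread_index)
//         {
//             // perform this node's dsp work on the calling thread
//         }
//     };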
/** item of a dsp thread queue
 *
 * \tparam Alloc allocator for successor list
 *
 * \todo operator new doesn't support stateful allocators
 */
template <typename runnable,
          typename Alloc = std::allocator<void*> >
class dsp_thread_queue_item
{
    typedef nova::dsp_queue_interpreter<runnable, Alloc> dsp_queue_interpreter;

    typedef typename Alloc::template rebind<dsp_thread_queue_item>::other new_allocator;

public:
    typedef std::uint_fast16_t activation_limit_t;

    struct successor_list
    {
        struct data_t
        {
            uint32_t count;                     /* reference count */
            uint32_t size;                      /* number of successor pointers */
            dsp_thread_queue_item * content[0]; /* successor pointers */
        };

        typedef typename Alloc::template rebind<data_t>::other array_allocator;
        explicit successor_list(uint32_t size = 0)
        {
            data = array_allocator().allocate(2*sizeof(uint32_t) + size * sizeof(dsp_thread_queue_item*));
            data->count = 1;
            data->size = size;
        }

        successor_list(successor_list const & rhs):
            data(rhs.data)
        {
            ++data->count;
        }

        successor_list & operator=(successor_list const & rhs)
        {
            if (--data->count == 0)
                array_allocator().deallocate(data, 2*sizeof(uint32_t) + data->size * sizeof(dsp_thread_queue_item*));

            data = rhs.data;
            ++data->count;
            return *this;
        }

        std::size_t size(void) const
        {
            return data->size;
        }

        bool empty(void) const
        {
            return size() == 0;
        }

        dsp_thread_queue_item *& operator[](std::size_t index)
        {
            assert (index < size());
            return data->content[index];
        }

        dsp_thread_queue_item * const& operator[](std::size_t index) const
        {
            assert (index < size());
            return data->content[index];
        }

        ~successor_list(void)
        {
            if (--data->count == 0)
                array_allocator().deallocate(data, 2*sizeof(uint32_t) + data->size * sizeof(dsp_thread_queue_item*));
        }

        data_t * data;
    };
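
    // Illustrative only (not part of the original header): a successor_list is a
    // reference-counted, fixed-capacity array of item pointers.  Building one for a node
    // with two successors might look like this (the item pointers are hypothetical):
    //
    //     successor_list successors(2);
    //     successors[0] = first_successor_item;
    //     successors[1] = second_successor_item;
    //
    // Copies share the same data_t block; the block is freed when the last copy goes away.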
    dsp_thread_queue_item(runnable const & job, successor_list const & successors,
                          activation_limit_t activation_limit):
        activation_count(0), job(job), successors(successors), activation_limit(activation_limit)
    {}

    dsp_thread_queue_item * run(dsp_queue_interpreter & interpreter, boost::uint8_t thread_index)
    {
        assert(activation_count == 0);

        job(thread_index);

        dsp_thread_queue_item * next = update_dependencies(interpreter);
        reset_activation_count();
        return next;
    }
    /** called from the run method or once, when dsp queue is initialized */
    void reset_activation_count(void)
    {
        assert(activation_count == 0);
        activation_count.store(activation_limit, boost::memory_order_release);
    }

    runnable const & get_job(void) const
    {
        return job;
    }

    runnable & get_job(void)
    {
        return job;
    }
#ifdef DEBUG_DSP_THREADS
    void dump_item(void)
    {
        printf("\titem %p\n", this);
        printf("\tactivation limit %d\n", int(activation_limit));

        if (!successors.empty()) {
            printf("\tsuccessors:\n");
            for (std::size_t i = 0; i != successors.size(); ++i)
                printf("\t\t%p\n", successors[i]);
        }
    }
#endif
private:
    /** \brief update all successors and possibly mark them as runnable */
    dsp_thread_queue_item * update_dependencies(dsp_queue_interpreter & interpreter)
    {
        dsp_thread_queue_item * ptr;
        std::size_t i = 0;
        for (;;) {
            if (i == successors.size())
                return NULL;

            ptr = successors[i++]->dec_activation_count(interpreter);
            if (ptr)
                break; // no need to update the next item to run
        }

        while (i != successors.size()) {
            dsp_thread_queue_item * next = successors[i++]->dec_activation_count(interpreter);
            if (next)
                interpreter.mark_as_runnable(next);
        }

        return ptr;
    }
    /** \brief decrement activation count and return this, if it drops to zero */
    inline dsp_thread_queue_item * dec_activation_count(dsp_queue_interpreter & interpreter)
    {
        activation_limit_t current = activation_count--;
        assert(current > 0);

        if (current == 1)
            return this;
        else
            return NULL;
    }

    boost::atomic<activation_limit_t> activation_count; /**< current activation count */

    runnable job;
    const successor_list successors;            /**< list of successor nodes */
    const activation_limit_t activation_limit;  /**< number of predecessors */
};
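
// Illustrative only (not part of the original header): activation counting in a nutshell.
// A node with two predecessors is created with activation_limit == 2.  Each time a
// predecessor finishes, dec_activation_count() is called on the node; when the count drops
// to zero the node is returned (or pushed to the runnable set) and can be executed:
//
//     count: 2 --(predecessor A done)--> 1 --(predecessor B done)--> 0  => node runnable
//
// reset_activation_count() restores the count to activation_limit for the next dsp tick.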
template <typename runnable, typename Alloc = std::allocator<void*> >
class dsp_thread_queue
{
    typedef std::uint_fast16_t node_count_t;

    typedef nova::dsp_thread_queue_item<runnable, Alloc> dsp_thread_queue_item;
    typedef std::vector<dsp_thread_queue_item*,
                        typename Alloc::template rebind<dsp_thread_queue_item*>::other
                       > item_vector_t;

    typedef typename Alloc::template rebind<dsp_thread_queue_item>::other item_allocator;

public:
#ifdef DEBUG_DSP_THREADS
    void dump_queue(void)
    {
        printf("queue %p\n items:\n", this);
        for (node_count_t i = 0; i != total_node_count; ++i)
            queue_items[i].dump_item();

        printf("\ninitial items:\n");
        BOOST_FOREACH(dsp_thread_queue_item * item, initially_runnable_items)
            item->dump_item();

        std::cout << std::endl;
    }
#endif
    /** preallocate node_count nodes */
    dsp_thread_queue(std::size_t node_count, bool has_parallelism = true):
        total_node_count(0), has_parallelism_(has_parallelism)
    {
        initially_runnable_items.reserve(node_count);
        queue_items = item_allocator().allocate(node_count * sizeof(dsp_thread_queue_item));
    }

    ~dsp_thread_queue(void)
    {
        for (std::size_t i = 0; i != total_node_count; ++i)
            queue_items[i].~dsp_thread_queue_item();

        item_allocator().deallocate(queue_items, total_node_count * sizeof(dsp_thread_queue_item));
    }
    void add_initially_runnable(dsp_thread_queue_item * item)
    {
        initially_runnable_items.push_back(item);
    }
    /** return initialized queue item */
    dsp_thread_queue_item *
    allocate_queue_item(runnable const & job,
                        typename dsp_thread_queue_item::successor_list const & successors,
                        typename dsp_thread_queue_item::activation_limit_t activation_limit)
    {
        dsp_thread_queue_item * ret = queue_items + total_node_count;
        ++total_node_count;

        assert (total_node_count <= initially_runnable_items.capacity());
        new (ret) dsp_thread_queue_item(job, successors, activation_limit);
        return ret;
    }
    void reset_activation_counts(void)
    {
        for (node_count_t i = 0; i != total_node_count; ++i)
            queue_items[i].reset_activation_count();
    }

    node_count_t get_total_node_count(void) const
    {
        return total_node_count;
    }

    bool has_parallelism(void) const
    {
        return has_parallelism_;
    }
private:
    node_count_t total_node_count;          /* total number of nodes */
    item_vector_t initially_runnable_items; /* nodes without predecessor */
    dsp_thread_queue_item * queue_items;    /* all nodes */
    const bool has_parallelism_;

    friend class dsp_queue_interpreter<runnable, Alloc>;
};
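
// Illustrative only (not part of the original header): a queue for the node graph
// A -> C, B -> C might be built roughly like this (the job objects are hypothetical):
//
//     typedef nova::dsp_thread_queue<my_job> queue_t;
//     typedef nova::dsp_thread_queue_item<my_job> item_t;
//
//     std::unique_ptr<queue_t> q(new queue_t(3));
//
//     item_t * c = q->allocate_queue_item(job_c, item_t::successor_list(), 2);
//
//     item_t::successor_list run_c(1);
//     run_c[0] = c;
//     item_t * a = q->allocate_queue_item(job_a, run_c, 0);
//     item_t * b = q->allocate_queue_item(job_b, run_c, 0);
//
//     q->add_initially_runnable(a);
//     q->add_initially_runnable(b);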
template <typename runnable,
          typename Alloc = std::allocator<void*> >
class dsp_queue_interpreter
{
protected:
    typedef nova::dsp_thread_queue<runnable, Alloc> dsp_thread_queue;
    typedef nova::dsp_thread_queue_item<runnable, Alloc> dsp_thread_queue_item;
    typedef typename dsp_thread_queue_item::successor_list successor_list;
    typedef std::size_t size_t;

public:
    typedef boost::uint_fast8_t thread_count_t;
    typedef boost::uint_fast16_t node_count_t;

    typedef std::unique_ptr<dsp_thread_queue> dsp_thread_queue_ptr;
    dsp_queue_interpreter(thread_count_t tc):
        node_count(0)
    {
        if (!runnable_set.is_lock_free())
            std::cout << "Warning: scheduler queue is not lockfree!" << std::endl;

        set_thread_count(tc);
    }
    /** prepares queue and queue interpreter for dsp tick
     *
     *  \return true, if dsp queue is valid
     *          false, if no dsp queue is available or queue is empty
     */
    bool init_tick(void)
    {
        if (unlikely((queue.get() == NULL) or                /* no queue */
                     (queue->get_total_node_count() == 0)    /* no nodes */
                    ))
            return false;

        /* reset node count */
        assert(node_count == 0);
        assert(runnable_set.empty());
        node_count.store(queue->get_total_node_count(), boost::memory_order_release);

        for (size_t i = 0; i != queue->initially_runnable_items.size(); ++i)
            mark_as_runnable(queue->initially_runnable_items[i]);

        return true;
    }
    dsp_thread_queue_ptr release_queue(void)
    {
        dsp_thread_queue_ptr ret(queue.release());
        return ret;
    }
    dsp_thread_queue_ptr reset_queue(dsp_thread_queue_ptr && new_queue)
    {
        dsp_thread_queue_ptr ret(std::move(queue));

        queue = std::move(new_queue);
        if (queue.get() == 0)
            return ret;

        queue->reset_activation_counts();

#ifdef DEBUG_DSP_THREADS
        queue->dump_queue();
#endif

        if (queue->has_parallelism()) {
            thread_count_t thread_number =
                std::min(thread_count_t(std::min(total_node_count(),
                                                 node_count_t(std::numeric_limits<thread_count_t>::max()))),
                         thread_count);

            used_helper_threads = thread_number - 1; /* this thread is not woken up */
        } else
            used_helper_threads = 0;

        return ret;
    }
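
    // Illustrative only (not part of the original header): with set_thread_count(4) and a
    // queue of 10 nodes, thread_number is min(min(10, 255), 4) == 4 on a typical platform,
    // so used_helper_threads becomes 3; the thread driving tick_master() does its own share
    // of the work and is therefore not counted as a helper.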
    node_count_t total_node_count(void) const
    {
        return queue->get_total_node_count();
    }
    void set_thread_count(thread_count_t i)
    {
        assert (i < std::numeric_limits<thread_count_t>::max());
        i = std::max(thread_count_t(1u), i);

        thread_count = i;
    }
    thread_count_t get_thread_count(void) const
    {
        return thread_count;
    }
    thread_count_t get_used_helper_threads(void) const
    {
        return used_helper_threads;
    }
    void tick(thread_count_t thread_index)
    {
        run_item(thread_index);
    }
private:
    struct backup
    {
        backup(int min, int max): min(min), max(max), loops(min) {}

        void run(void)
        {
            for (int i = 0; i != loops; ++i)
                asm(""); // empty asm to avoid optimization

            loops = std::min(loops * 2, max);
        }

        int min, max, loops;
    };
    void run_item(thread_count_t index)
    {
        backup b(256, 32768);
        int poll_counts = 0;

        for (;;) {
            if (!node_count.load(boost::memory_order_acquire))
                return;

            /* we still have some nodes to process */
            int state = run_next_item(index);
            switch (state) {
            case no_remaining_items:
                return;
            case fifo_empty:
                b.run();
                ++poll_counts;
                break;
            case remaining_items:
                poll_counts = 0;
                break;
            }

            if (poll_counts == 50000) {
                // the maximum poll count is system-dependent. 50000 should be high enough for recent machines
                std::printf("nova::dsp_queue_interpreter::run_item: possible lockup detected\n");
                poll_counts = 0;
            }
        }
    }
public:
    void tick_master(void)
    {
        run_item_master();
    }

private:
    void run_item_master(void)
    {
        run_item(0);
        wait_for_end();
        assert(runnable_set.empty());
    }

    void wait_for_end(void)
    {
        int count = 0;
        while (node_count.load(boost::memory_order_acquire) != 0) {
            if (++count == 1000000) {
                std::printf("nova::dsp_queue_interpreter::run_item: possible lockup detected\n");
                count = 0;
            }
        } // busy-wait for helper threads to finish
    }
    HOT int run_next_item(thread_count_t index)
    {
        dsp_thread_queue_item * item;
        bool success = runnable_set.pop(item);

        if (!success)
            return fifo_empty;

        node_count_t consumed = 0;

        do {
            item = item->run(*this, index);
            ++consumed;
        } while (item != NULL);

        node_count_t remaining = node_count.fetch_sub(consumed, boost::memory_order_release);

        assert (remaining >= consumed);

        if (remaining == consumed)
            return no_remaining_items;
        else
            return remaining_items;
    }
    void mark_as_runnable(dsp_thread_queue_item * item)
    {
        runnable_set.push(item);
    }

    friend class nova::dsp_thread_queue_item<runnable, Alloc>;
private:
    enum {
        no_remaining_items,
        fifo_empty,
        remaining_items
    };

    dsp_thread_queue_ptr queue;

    thread_count_t thread_count;        /* number of dsp threads to be used by this queue */
    thread_count_t used_helper_threads; /* number of helper threads, which are actually used */

    boost::lockfree::stack<dsp_thread_queue_item*, boost::lockfree::capacity<32768> > runnable_set;
    boost::atomic<node_count_t> node_count; /* number of nodes that need to be processed during this tick */
};
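
// Illustrative only (not part of the original header): a sketch of how a dsp driver might
// use the interpreter (the names `interpreter`, `queue`, `my_job` and `helper_thread_index`
// are hypothetical):
//
//     nova::dsp_queue_interpreter<my_job> interpreter(4);
//     interpreter.reset_queue(std::move(queue));   // install a freshly built dsp_thread_queue
//
//     // once per dsp tick, on the audio thread:
//     if (interpreter.init_tick())
//         interpreter.tick_master();               // runs items and waits for the helpers
//
//     // each helper thread, after being woken for the tick:
//     interpreter.tick(helper_thread_index);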
} /* namespace nova */

#endif /* DSP_THREAD_QUEUE_DSP_THREAD_QUEUE_HPP */