// Copyright (C) 2007, 2008, 2009, 2010 Tim Blechmann
//
// This program is free software; you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by
// the Free Software Foundation; either version 2 of the License, or
// (at your option) any later version.
//
// This program is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
// GNU General Public License for more details.
//
// You should have received a copy of the GNU General Public License
// along with this program; see the file COPYING.  If not, write to
// the Free Software Foundation, Inc., 59 Temple Place - Suite 330,
// Boston, MA 02111-1307, USA.
#ifndef DSP_THREAD_QUEUE_DSP_THREAD_QUEUE_HPP
#define DSP_THREAD_QUEUE_DSP_THREAD_QUEUE_HPP

#include <algorithm>
#include <cassert>
#include <cstdio>
#include <iostream>
#include <limits>
#include <memory>
#include <new>
#include <utility>
#include <vector>

#include <boost/atomic.hpp>
#include <boost/cstdint.hpp>
#include <boost/thread.hpp>

#ifdef DEBUG_DSP_THREADS
#include <boost/foreach.hpp>
#endif

#include <boost/lockfree/stack.hpp>

#include "nova-tt/semaphore.hpp"

#include "utilities/branch_hints.hpp"
#include "utilities/utils.hpp"

namespace nova {
template <typename runnable, typename Alloc>
class dsp_queue_interpreter;
/*
concept runnable
{
    runnable(runnable const &);

    operator()(uint threadindex);
};
*/
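
/* Illustrative sketch (not part of the original header): a minimal functor
 * satisfying the runnable concept above. The name example_dsp_node is
 * hypothetical; any copy-constructible type with a compatible operator()
 * can be used as the runnable template argument.
 *
 *     struct example_dsp_node
 *     {
 *         void operator()(unsigned int thread_index)
 *         {
 *             // perform the dsp work of this graph node on thread thread_index
 *         }
 *     };
 */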
/** item of a dsp thread queue
 *
 * \tparam Alloc allocator for successor list
 *
 * \todo operator new doesn't support stateful allocators
 */
template <typename runnable,
          typename Alloc = std::allocator<void*> >
class dsp_thread_queue_item:
    private Alloc
{
    typedef nova::dsp_queue_interpreter<runnable, Alloc> dsp_queue_interpreter;

    typedef typename Alloc::template rebind<dsp_thread_queue_item>::other new_allocator;

public:
    typedef boost::uint_fast16_t activation_limit_t;
    /** reference-counted, fixed-size list of successor items */
    struct successor_list
    {
        struct data_t
        {
            uint32_t count;                      /**< reference count */
            uint32_t size;                       /**< number of successors */
            dsp_thread_queue_item * content[0];  /**< successor items */
        };

        typedef typename Alloc::template rebind<data_t>::other array_allocator;
        /** allocate a successor array for \a size entries */
        explicit successor_list(uint32_t size = 0)
        {
            data = array_allocator().allocate(2*sizeof(uint32_t) + size * sizeof(dsp_thread_queue_item*));
            data->count = 1;
            data->size = size;
        }
        /** copy constructor: share the successor array */
        successor_list(successor_list const & rhs):
            data(rhs.data)
        {
            data->count++;
        }
        successor_list & operator=(successor_list const & rhs)
        {
            rhs.data->count++; /* increment first, so self-assignment is safe */

            if (--data->count == 0)
                array_allocator().deallocate(data, 2*sizeof(uint32_t) + data->size * sizeof(dsp_thread_queue_item*));

            data = rhs.data;
            return *this;
        }
        std::size_t size(void) const
        {
            return data->size;
        }

        bool empty(void) const
        {
            return size() == 0;
        }
        dsp_thread_queue_item *& operator[](std::size_t index)
        {
            assert(index < size());
            return data->content[index];
        }
        dsp_thread_queue_item * const & operator[](std::size_t index) const
        {
            assert(index < size());
            return data->content[index];
        }
        ~successor_list(void)
        {
            if (--data->count == 0)
                array_allocator().deallocate(data, 2*sizeof(uint32_t) + data->size * sizeof(dsp_thread_queue_item*));
        }

        data_t * data;
    };
    dsp_thread_queue_item(runnable const & job, successor_list const & successors,
                          activation_limit_t activation_limit):
        activation_count(0), job(job), successors(successors), activation_limit(activation_limit)
    {}
    dsp_thread_queue_item * run(dsp_queue_interpreter & interpreter, boost::uint8_t thread_index)
    {
        assert(activation_count == 0);

        job(thread_index);

        dsp_thread_queue_item * next = update_dependencies(interpreter);
        reset_activation_count();
        return next;
    }
    /** called from the run method or once, when dsp queue is initialized */
    void reset_activation_count(void)
    {
        assert(activation_count == 0);
        activation_count.store(activation_limit, boost::memory_order_release);
    }
    runnable const & get_job(void) const
    {
        return job;
    }

    runnable & get_job(void)
    {
        return job;
    }
#ifdef DEBUG_DSP_THREADS
    void dump_item(void)
    {
        printf("\titem %p\n", this);
        printf("\tactivation limit %d\n", int(activation_limit));

        if (!successors.empty()) {
            printf("\tsuccessors:\n");
            for (std::size_t i = 0; i != successors.size(); ++i)
                printf("\t\t%p\n", successors[i]);
        }
        printf("\n");
    }
#endif

private:
    /** \brief update all successors and possibly mark them as runnable */
    dsp_thread_queue_item * update_dependencies(dsp_queue_interpreter & interpreter)
    {
        dsp_thread_queue_item * ptr;
        std::size_t i = 0;
        for (;;) {
            if (i == successors.size())
                return NULL;

            ptr = successors[i++]->dec_activation_count(interpreter);
            if (ptr)
                break; // no need to update the next item to run
        }

        while (i != successors.size()) {
            dsp_thread_queue_item * next = successors[i++]->dec_activation_count(interpreter);
            if (next)
                interpreter.mark_as_runnable(next);
        }

        return ptr;
    }
    /** \brief decrement activation count and return this, if it drops to zero */
    inline dsp_thread_queue_item * dec_activation_count(dsp_queue_interpreter & interpreter)
    {
        activation_limit_t current = activation_count--;
        assert(current > 0);

        if (current == 1)
            return this; /* last predecessor finished, item is runnable now */
        else
            return NULL;
    }

    boost::atomic<activation_limit_t> activation_count; /**< current activation count */

    runnable job;
    const successor_list successors;           /**< list of succeeding nodes */
    const activation_limit_t activation_limit; /**< number of predecessors */
};
template <typename runnable, typename Alloc = std::allocator<void*> >
class dsp_thread_queue
{
    typedef boost::uint_fast16_t node_count_t;

    typedef nova::dsp_thread_queue_item<runnable, Alloc> dsp_thread_queue_item;
    typedef std::vector<dsp_thread_queue_item*,
                        typename Alloc::template rebind<dsp_thread_queue_item*>::other
                       > item_vector_t;

    typedef typename Alloc::template rebind<dsp_thread_queue_item>::other item_allocator;

public:
#ifdef DEBUG_DSP_THREADS
    void dump_queue(void)
    {
        printf("queue %p\n items:\n", this);
        for (node_count_t i = 0; i != total_node_count; ++i)
            queue_items[i].dump_item();

        printf("\ninitial items:\n");
        BOOST_FOREACH(dsp_thread_queue_item * item, initially_runnable_items)
            item->dump_item();

        std::cout << std::endl;
    }
#endif
    /** preallocate node_count nodes */
    dsp_thread_queue(std::size_t node_count):
        total_node_count(0)
    {
        initially_runnable_items.reserve(node_count);
        queue_items = item_allocator().allocate(node_count * sizeof(dsp_thread_queue_item));
    }
    ~dsp_thread_queue(void)
    {
        for (std::size_t i = 0; i != total_node_count; ++i)
            queue_items[i].~dsp_thread_queue_item();
        item_allocator().deallocate(queue_items, total_node_count * sizeof(dsp_thread_queue_item));
    }
    void add_initially_runnable(dsp_thread_queue_item * item)
    {
        initially_runnable_items.push_back(item);
    }
    /** construct a queue item in the preallocated storage and return it */
    dsp_thread_queue_item *
    allocate_queue_item(runnable const & job,
                        typename dsp_thread_queue_item::successor_list const & successors,
                        typename dsp_thread_queue_item::activation_limit_t activation_limit)
    {
        dsp_thread_queue_item * ret = queue_items + total_node_count;
        ++total_node_count;

        assert(total_node_count <= initially_runnable_items.capacity());
        new (ret) dsp_thread_queue_item(job, successors, activation_limit);
        return ret;
    }
    void reset_activation_counts(void)
    {
        for (node_count_t i = 0; i != total_node_count; ++i)
            queue_items[i].reset_activation_count();
    }
    node_count_t get_total_node_count(void) const
    {
        return total_node_count;
    }
private:
    node_count_t total_node_count;          /* total number of nodes */
    item_vector_t initially_runnable_items; /* nodes without a predecessor */
    dsp_thread_queue_item * queue_items;    /* all nodes */
    friend class dsp_queue_interpreter<runnable, Alloc>;
};
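
/* Illustrative usage sketch (not part of the original header), assuming a
 * hypothetical functor type job_t that models the runnable concept:
 *
 *     typedef nova::dsp_thread_queue_item<job_t> item_t;
 *
 *     nova::dsp_thread_queue<job_t> * q = new nova::dsp_thread_queue<job_t>(2);
 *
 *     // sink: one predecessor, no successors
 *     item_t * sink = q->allocate_queue_item(job_t(), item_t::successor_list(0), 1);
 *
 *     // source: no predecessors, sink as its only successor
 *     item_t::successor_list source_successors(1);
 *     source_successors[0] = sink;
 *     item_t * source = q->allocate_queue_item(job_t(), source_successors, 0);
 *     q->add_initially_runnable(source);
 */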
template <typename runnable,
          typename Alloc = std::allocator<void*> >
class dsp_queue_interpreter
{
    typedef nova::dsp_thread_queue<runnable, Alloc> dsp_thread_queue;
    typedef nova::dsp_thread_queue_item<runnable, Alloc> dsp_thread_queue_item;
    typedef typename dsp_thread_queue_item::successor_list successor_list;
    typedef std::size_t size_t;

public:
    typedef boost::uint_fast8_t thread_count_t;
    typedef boost::uint_fast16_t node_count_t;
#ifdef __GXX_EXPERIMENTAL_CXX0X__
    typedef std::unique_ptr<dsp_thread_queue> dsp_thread_queue_ptr;
#else
    typedef std::auto_ptr<dsp_thread_queue> dsp_thread_queue_ptr;
#endif
    dsp_queue_interpreter(thread_count_t tc):
        runnable_set(1024), node_count(0)
    {
        if (!runnable_set.is_lock_free())
            std::cout << "Warning: scheduler queue is not lockfree!" << std::endl;

        set_thread_count(tc);
    }
    /** prepares queue and queue interpreter for the dsp tick
     *
     *  \return true, if the dsp queue is valid
     *          false, if no dsp queue is available or the queue is empty
     */
    bool init_tick(void)
    {
        if (unlikely((queue.get() == NULL) or               /* no queue */
                     (queue->get_total_node_count() == 0)   /* no nodes */
                    ))
            return false;

        /* reset node count */
        assert(node_count == 0);
        assert(runnable_set.empty());
        node_count.store(queue->get_total_node_count(), boost::memory_order_release);

        for (size_t i = 0; i != queue->initially_runnable_items.size(); ++i)
            mark_as_runnable(queue->initially_runnable_items[i]);

        return true;
    }
    dsp_thread_queue_ptr release_queue(void)
    {
        dsp_thread_queue_ptr ret(queue.release());
        return ret;
    }
#ifdef __GXX_EXPERIMENTAL_CXX0X__
    dsp_thread_queue_ptr reset_queue(dsp_thread_queue_ptr && new_queue)
    {
        dsp_thread_queue_ptr ret(std::move(queue));

        queue = std::move(new_queue);
        if (queue.get() == 0)
            return ret;

        queue->reset_activation_counts();

#ifdef DEBUG_DSP_THREADS
        queue->dump_queue();
#endif

        thread_count_t thread_number =
            std::min(thread_count_t(std::min(total_node_count(),
                                             node_count_t(std::numeric_limits<thread_count_t>::max()))),
                     thread_count);

        used_helper_threads = thread_number - 1; /* this thread is not woken up */
        return ret;
    }
#else
    dsp_thread_queue_ptr reset_queue(dsp_thread_queue_ptr & new_queue)
    {
        dsp_thread_queue_ptr ret(queue.release());

        queue = new_queue;
        if (queue.get() == 0)
            return ret;

        queue->reset_activation_counts();

#ifdef DEBUG_DSP_THREADS
        queue->dump_queue();
#endif

        thread_count_t thread_number =
            std::min(thread_count_t(std::min(total_node_count(),
                                             node_count_t(std::numeric_limits<thread_count_t>::max()))),
                     thread_count);

        used_helper_threads = thread_number - 1; /* this thread is not woken up */
        return ret;
    }
#endif
    node_count_t total_node_count(void) const
    {
        return queue->get_total_node_count();
    }
    void set_thread_count(thread_count_t i)
    {
        assert(i < std::numeric_limits<thread_count_t>::max());
        i = std::max(thread_count_t(1u), i);

        thread_count = i;
    }
    thread_count_t get_thread_count(void) const
    {
        return thread_count;
    }
    thread_count_t get_used_helper_threads(void) const
    {
        return used_helper_threads;
    }
    void tick(thread_count_t thread_index)
    {
        run_item(thread_index);
    }
    /** exponential backoff helper for the busy-waiting loops */
    struct backup
    {
        backup(int min, int max): min(min), max(max), loops(min) {}

        void run(void)
        {
            for (int i = 0; i != loops; ++i)
                asm(""); // empty asm to avoid optimization

            loops = std::min(loops * 2, max);
        }

        int min, max, loops;
    };

    enum {
        no_remaining_items,
        fifo_empty,
        remaining_items
    };
    void run_item(thread_count_t index)
    {
        backup b(256, 32768);
        int poll_counts = 0;

        for (;;) {
            if (!node_count.load(boost::memory_order_acquire))
                return;

            /* we still have some nodes to process */
            int state = run_next_item(index);

            switch (state) {
            case no_remaining_items:
                return;

            case fifo_empty:
                b.run();
                ++poll_counts;
                break;

            case remaining_items:
                poll_counts = 0;
                break;
            }

            if (poll_counts == 50000) {
                // the maximum poll count is system-dependent. 50000 should be high enough for recent machines
                std::printf("nova::dsp_queue_interpreter::run_item: possible lockup detected\n");
                poll_counts = 0;
            }
        }
    }
    void tick_master(void)
    {
        run_item_master();
    }

    void run_item_master(void)
    {
        run_item(0);

        wait_for_end();

        assert(runnable_set.empty());
    }
    void wait_for_end(void)
    {
        int count = 0;
        while (node_count.load(boost::memory_order_acquire) != 0) {
            ++count;
            if (count == 1000000) {
                std::printf("nova::dsp_queue_interpreter::wait_for_end: possible lockup detected\n");
                count = 0;
            }
        } // busy-wait for helper threads to finish
    }
    HOT
    int run_next_item(thread_count_t index)
    {
        dsp_thread_queue_item * item;
        bool success = runnable_set.pop(item);

        if (!success)
            return fifo_empty;

        node_count_t consumed = 0;

        do {
            item = item->run(*this, index);
            consumed += 1;
        } while (item != NULL);

        node_count_t remaining = node_count.fetch_sub(consumed, boost::memory_order_release);

        assert(remaining >= consumed);

        if (remaining == consumed)
            return no_remaining_items;
        else
            return remaining_items;
    }
    void mark_as_runnable(dsp_thread_queue_item * item)
    {
        runnable_set.push(item);
    }
    friend class nova::dsp_thread_queue_item<runnable, Alloc>;
private:
    dsp_thread_queue_ptr queue;

    thread_count_t thread_count;        /* number of dsp threads to be used by this queue */
    thread_count_t used_helper_threads; /* number of helper threads, which are actually used */

    boost::lockfree::stack<dsp_thread_queue_item*> runnable_set;
    boost::atomic<node_count_t> node_count; /* number of nodes that need to be processed during this tick */
};
} /* namespace nova */

#endif /* DSP_THREAD_QUEUE_DSP_THREAD_QUEUE_HPP */