//  Copyright (C) 2007, 2008, 2009, 2010 Tim Blechmann
//
//  This program is free software; you can redistribute it and/or modify
//  it under the terms of the GNU General Public License as published by
//  the Free Software Foundation; either version 2 of the License, or
//  (at your option) any later version.
//
//  This program is distributed in the hope that it will be useful,
//  but WITHOUT ANY WARRANTY; without even the implied warranty of
//  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
//  GNU General Public License for more details.
//
//  You should have received a copy of the GNU General Public License
//  along with this program; see the file COPYING.  If not, write to
//  the Free Software Foundation, Inc., 59 Temple Place - Suite 330,
//  Boston, MA 02111-1307, USA.
#ifndef DSP_THREAD_QUEUE_DSP_THREAD_QUEUE_HPP
#define DSP_THREAD_QUEUE_DSP_THREAD_QUEUE_HPP
#include <algorithm>
#include <cassert>
#include <cstdio>
#include <iostream>
#include <limits>
#include <memory>
#include <new>
#include <vector>

#include <boost/atomic.hpp>
#include <boost/cstdint.hpp>
#include <boost/thread.hpp>

#ifdef DEBUG_DSP_THREADS
#include <boost/foreach.hpp>
#endif

#include <boost/lockfree/stack.hpp>

#include "nova-tt/semaphore.hpp"

#include "utilities/branch_hints.hpp"
#include "utilities/utils.hpp"
namespace nova {

template <typename runnable, typename Alloc>
class dsp_queue_interpreter;

/* concept of a runnable job:
 *
 *  runnable(runnable const & rhs);
 *  void operator()(uint threadindex);
 */
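/* For illustration only: a minimal job type satisfying the concept above might
 * look like the following sketch (print_job is a hypothetical name, not part
 * of this header):
 *
 *     struct print_job
 *     {
 *         explicit print_job(int id): id(id) {}
 *
 *         void operator()(unsigned int thread_index)
 *         {
 *             // work performed by this graph node
 *             printf("job %d running on dsp thread %u\n", id, thread_index);
 *         }
 *
 *         int id;
 *     };
 */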
/** item of a dsp thread queue
 *
 *  \tparam Alloc allocator for successor list
 *
 *  \todo operator new doesn't support stateful allocators
 */
template <typename runnable,
          typename Alloc = std::allocator<void*> >
class dsp_thread_queue_item
{
    typedef nova::dsp_queue_interpreter<runnable, Alloc> dsp_queue_interpreter;

    typedef typename Alloc::template rebind<dsp_thread_queue_item>::other new_allocator;

public:
    typedef boost::uint_fast16_t activation_limit_t;
    /** shared, reference-counted array of successor items */
    class successor_list
    {
        struct data_t
        {
            uint32_t count; /**< use count */
            uint32_t size;  /**< number of successors */
            dsp_thread_queue_item * content[0];
        };

        typedef typename Alloc::template rebind<data_t>::other array_allocator;
    public:
        /** allocating constructor: use count is initialized to 1 */
        explicit successor_list(uint32_t size = 0)
        {
            data = array_allocator().allocate(2*sizeof(uint32_t) + size * sizeof(dsp_thread_queue_item*));
            data->count = 1;
            data->size = size;
        }
        /** copy constructor: shares the data, incrementing the use count */
        successor_list(successor_list const & rhs):
            data(rhs.data)
        {
            data->count++;
        }

        /** assignment: releases the old data, then shares the data of rhs */
        successor_list & operator=(successor_list const & rhs)
        {
            if (--data->count == 0)
                array_allocator().deallocate(data, 2*sizeof(uint32_t) + data->size * sizeof(dsp_thread_queue_item*));

            data = rhs.data;
            data->count++;
            return *this;
        }
        std::size_t size(void) const
        {
            return data->size;
        }

        bool empty(void) const
        {
            return size() == 0;
        }

        dsp_thread_queue_item *& operator[](std::size_t index)
        {
            assert(index < size());
            return data->content[index];
        }

        dsp_thread_queue_item * const & operator[](std::size_t index) const
        {
            assert(index < size());
            return data->content[index];
        }
        ~successor_list(void)
        {
            if (--data->count == 0)
                array_allocator().deallocate(data, 2*sizeof(uint32_t) + data->size * sizeof(dsp_thread_queue_item*));
        }

    private:
        data_t * data;
    };
    dsp_thread_queue_item(runnable const & job, successor_list const & successors,
                          activation_limit_t activation_limit):
        activation_count(0), job(job), successors(successors), activation_limit(activation_limit)
    {}
    /** run the job, update dependencies and return the next runnable item (if any) */
    dsp_thread_queue_item * run(dsp_queue_interpreter & interpreter, boost::uint8_t thread_index)
    {
        assert(activation_count == 0);

        job(thread_index);

        dsp_thread_queue_item * next = update_dependencies(interpreter);
        reset_activation_count();
        return next;
    }
    /** called from the run method or once, when dsp queue is initialized */
    void reset_activation_count(void)
    {
        assert(activation_count == 0);
        activation_count.store(activation_limit, boost::memory_order_release);
    }
    runnable const & get_job(void) const
    {
        return job;
    }

    runnable & get_job(void)
    {
        return job;
    }
#ifdef DEBUG_DSP_THREADS
    void dump_item(void)
    {
        printf("\titem %p\n", this);
        printf("\tactivation limit %d\n", int(activation_limit));

        if (!successors.empty()) {
            printf("\tsuccessors:\n");
            for (std::size_t i = 0; i != successors.size(); ++i)
                printf("\t\t%p\n", successors[i]);
        }
        printf("\n");
    }
#endif
    /** \brief update all successors and possibly mark them as runnable */
    dsp_thread_queue_item * update_dependencies(dsp_queue_interpreter & interpreter)
    {
        dsp_thread_queue_item * ptr;
        std::size_t i = 0;

        /* find the first successor that becomes runnable: it is returned and run next */
        for (;;) {
            if (i == successors.size())
                return NULL;

            ptr = successors[i++]->dec_activation_count(interpreter);
            if (ptr)
                break; // no need to update the next item to run
        }

        /* all other successors that become runnable are handed to the interpreter */
        while (i != successors.size()) {
            dsp_thread_queue_item * next = successors[i++]->dec_activation_count(interpreter);
            if (next)
                interpreter.mark_as_runnable(next);
        }

        return ptr;
    }
    /** \brief decrement activation count and return this, if it drops to zero */
    inline dsp_thread_queue_item * dec_activation_count(dsp_queue_interpreter & interpreter)
    {
        activation_limit_t current = activation_count--;
        assert(current > 0);

        if (current == 1) /* this call dropped the count to zero */
            return this;
        else
            return NULL;
    }
private:
    boost::atomic<activation_limit_t> activation_count; /**< current activation count */

    runnable job;
    const successor_list successors;           /**< list of successor nodes */
    const activation_limit_t activation_limit; /**< number of predecessors */
};
template <typename runnable, typename Alloc = std::allocator<void*> >
class dsp_thread_queue
{
    typedef boost::uint_fast16_t node_count_t;

    typedef nova::dsp_thread_queue_item<runnable, Alloc> dsp_thread_queue_item;
    typedef std::vector<dsp_thread_queue_item*,
                        typename Alloc::template rebind<dsp_thread_queue_item*>::other
                       > item_vector_t;

    typedef typename Alloc::template rebind<dsp_thread_queue_item>::other item_allocator;
public:
#ifdef DEBUG_DSP_THREADS
    void dump_queue(void)
    {
        printf("queue %p\n items:\n", this);
        for (std::size_t i = 0; i != total_node_count; ++i)
            queue_items[i].dump_item();

        printf("\ninitial items:\n");
        BOOST_FOREACH(dsp_thread_queue_item * item, initially_runnable_items)
            item->dump_item();

        std::cout << std::endl;
    }
#endif
    /** preallocate node_count nodes */
    dsp_thread_queue(std::size_t node_count):
        total_node_count(0)
    {
        initially_runnable_items.reserve(node_count);
        queue_items = item_allocator().allocate(node_count * sizeof(dsp_thread_queue_item));
    }
    ~dsp_thread_queue(void)
    {
        for (std::size_t i = 0; i != total_node_count; ++i)
            queue_items[i].~dsp_thread_queue_item();

        item_allocator().deallocate(queue_items, total_node_count * sizeof(dsp_thread_queue_item));
    }
    void add_initially_runnable(dsp_thread_queue_item * item)
    {
        initially_runnable_items.push_back(item);
    }
    /** return initialized queue item */
    dsp_thread_queue_item *
    allocate_queue_item(runnable const & job,
                        typename dsp_thread_queue_item::successor_list const & successors,
                        typename dsp_thread_queue_item::activation_limit_t activation_limit)
    {
        dsp_thread_queue_item * ret = queue_items + total_node_count;
        ++total_node_count;

        assert (total_node_count <= initially_runnable_items.capacity());
        new (ret) dsp_thread_queue_item(job, successors, activation_limit);
        return ret;
    }
    void reset_activation_counts(void)
    {
        for (node_count_t i = 0; i != total_node_count; ++i)
            queue_items[i].reset_activation_count();
    }
    node_count_t get_total_node_count(void) const
    {
        return total_node_count;
    }
private:
    node_count_t total_node_count;          /* total number of nodes */
    item_vector_t initially_runnable_items; /* nodes without predecessors */
    dsp_thread_queue_item * queue_items;    /* all nodes */

    friend class dsp_queue_interpreter<runnable, Alloc>;
};
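/* For illustration only: a sketch of how client code might assemble a queue of
 * three nodes, where node_a has to run before node_b and node_c. print_job is
 * the hypothetical job type sketched above; all other names are assumptions as
 * well:
 *
 *     typedef nova::dsp_thread_queue<print_job>      queue_type;
 *     typedef nova::dsp_thread_queue_item<print_job> item_type;
 *
 *     queue_type * q = new queue_type(3);  // preallocate three nodes
 *
 *     // leaf nodes: one predecessor each, no successors
 *     item_type * node_b = q->allocate_queue_item(print_job(2), item_type::successor_list(), 1);
 *     item_type * node_c = q->allocate_queue_item(print_job(3), item_type::successor_list(), 1);
 *
 *     // root node: no predecessors, two successors, initially runnable
 *     item_type::successor_list successors(2);
 *     successors[0] = node_b;
 *     successors[1] = node_c;
 *     item_type * node_a = q->allocate_queue_item(print_job(1), successors, 0);
 *     q->add_initially_runnable(node_a);
 */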
template <typename runnable,
          typename Alloc = std::allocator<void*> >
class dsp_queue_interpreter
{
    typedef nova::dsp_thread_queue<runnable, Alloc> dsp_thread_queue;
    typedef nova::dsp_thread_queue_item<runnable, Alloc> dsp_thread_queue_item;
    typedef typename dsp_thread_queue_item::successor_list successor_list;
    typedef std::size_t size_t;

public:
    typedef boost::uint_fast8_t thread_count_t;
    typedef boost::uint_fast16_t node_count_t;
#ifdef __GXX_EXPERIMENTAL_CXX0X__
    typedef std::unique_ptr<dsp_thread_queue> dsp_thread_queue_ptr;
#else
    typedef std::auto_ptr<dsp_thread_queue> dsp_thread_queue_ptr;
#endif
    dsp_queue_interpreter(thread_count_t tc):
        runnable_set(1024), node_count(0)
    {
        if (!runnable_set.is_lock_free())
            std::cout << "Warning: scheduler queue is not lockfree!" << std::endl;

        set_thread_count(tc);
    }
    /** prepares queue and queue interpreter for dsp tick
     *
     *  \return true, if dsp queue is valid
     *          false, if no dsp queue is available or queue is empty
     */
    bool init_tick(void)
    {
        if (unlikely((queue.get() == NULL) or             /* no queue */
                     (queue->get_total_node_count() == 0) /* no nodes */
                    ))
            return false;

        /* reset node count */
        assert(node_count == 0);
        assert(runnable_set.empty());
        node_count.store(queue->get_total_node_count(), boost::memory_order_release);

        for (size_t i = 0; i != queue->initially_runnable_items.size(); ++i)
            mark_as_runnable(queue->initially_runnable_items[i]);

        return true;
    }
    /** release the currently installed queue */
    dsp_thread_queue_ptr release_queue(void)
    {
        dsp_thread_queue_ptr ret(queue.release());
        return ret;
    }
#ifdef __GXX_EXPERIMENTAL_CXX0X__
    dsp_thread_queue_ptr reset_queue(dsp_thread_queue_ptr && new_queue)
    {
        dsp_thread_queue_ptr ret(std::move(queue));

        queue = std::move(new_queue);
        if (queue.get() == 0)
            return ret;

        queue->reset_activation_counts();

#ifdef DEBUG_DSP_THREADS
        queue->dump_queue();
#endif

        thread_count_t thread_number =
            std::min(thread_count_t(std::min(total_node_count(),
                                             node_count_t(std::numeric_limits<thread_count_t>::max()))),
                     thread_count);

        used_helper_threads = thread_number - 1; /* this thread is not woken up */
        return ret;
    }
#else
    dsp_thread_queue_ptr reset_queue(dsp_thread_queue_ptr & new_queue)
    {
        dsp_thread_queue_ptr ret(queue.release());
        queue = new_queue;

        if (queue.get() == 0)
            return ret;

        queue->reset_activation_counts();

#ifdef DEBUG_DSP_THREADS
        queue->dump_queue();
#endif

        thread_count_t thread_number =
            std::min(thread_count_t(std::min(total_node_count(),
                                             node_count_t(std::numeric_limits<thread_count_t>::max()))),
                     thread_count);

        used_helper_threads = thread_number - 1; /* this thread is not woken up */
        return ret;
    }
#endif
    node_count_t total_node_count(void) const
    {
        return queue->get_total_node_count();
    }
    void set_thread_count(thread_count_t i)
    {
        assert (i < std::numeric_limits<thread_count_t>::max());
        i = std::max(thread_count_t(1u), i);

        thread_count = i;
    }
    thread_count_t get_thread_count(void) const
    {
        return thread_count;
    }

    thread_count_t get_used_helper_threads(void) const
    {
        return used_helper_threads;
    }
    void tick(thread_count_t thread_index)
    {
        run_item(thread_index);
    }
    /** simple exponential backoff helper for busy-waiting */
    struct backup
    {
        backup(int min, int max): min(min), max(max), loops(min) {}

        void run(void)
        {
            for (int i = 0; i != loops; ++i)
                asm(""); // empty asm to avoid optimization

            loops = std::min(loops * 2, max);
        }

        int min, max, loops;
    };
    void run_item(thread_count_t index)
    {
        backup b(256, 32768);

        for (;;) {
            if (node_count.load(boost::memory_order_acquire)) {
                /* we still have some nodes to process */
                int state = run_next_item(index);

                switch (state) {
                case no_remaining_items:
                    return;

                case fifo_empty:
                    b.run(); /* back off before polling the runnable set again */
                }
            } else
                return;
        }
    }
    void tick_master(void)
    {
        run_item_master();
    }

    void run_item_master(void)
    {
        run_item(0);
        wait_for_end();
        assert(runnable_set.empty());
    }

    void wait_for_end(void)
    {
        while (node_count.load(boost::memory_order_acquire) != 0)
        {} // busy-wait for helper threads to finish
    }
    /* return values of run_next_item() */
    enum {
        no_remaining_items,
        fifo_empty,
        remaining_items
    };

    HOT int run_next_item(thread_count_t index)
    {
        dsp_thread_queue_item * item;
        bool success = runnable_set.pop(item);

        if (!success)
            return fifo_empty;

        node_count_t consumed = 0;

        do {
            item = item->run(*this, index);
            consumed += 1;
        } while (item != NULL);

        node_count_t remaining = node_count.fetch_sub(consumed, boost::memory_order_release);

        assert (remaining >= consumed);

        if (remaining == consumed)
            return no_remaining_items;
        else
            return remaining_items;
    }
    void mark_as_runnable(dsp_thread_queue_item * item)
    {
        runnable_set.push(item);
    }

    friend class nova::dsp_thread_queue_item<runnable, Alloc>;
private:
    dsp_thread_queue_ptr queue;

    thread_count_t thread_count;        /* number of dsp threads to be used by this queue */
    thread_count_t used_helper_threads; /* number of helper threads, which are actually used */

    boost::lockfree::stack<dsp_thread_queue_item*> runnable_set;
    boost::atomic<node_count_t> node_count; /* number of nodes that need to be processed during this tick */
};
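/* For illustration only: a sketch of one dsp tick, driven by a master thread
 * plus a single helper thread. The queue is assembled as in the sketch above;
 * print_job, the thread setup and the use of boost::bind are assumptions, not
 * part of this header:
 *
 *     typedef nova::dsp_queue_interpreter<print_job> interpreter_type;
 *
 *     interpreter_type interpreter(2);  // run with 2 dsp threads
 *
 *     interpreter_type::dsp_thread_queue_ptr q(new nova::dsp_thread_queue<print_job>(3));
 *     // ... fill the queue via allocate_queue_item()/add_initially_runnable(), see above ...
 *     interpreter.reset_queue(q);       // std::move(q) with the C++0x overload
 *
 *     if (interpreter.init_tick()) {    // marks the initial items as runnable
 *         boost::thread helper(boost::bind(&interpreter_type::tick, &interpreter, 1));
 *         interpreter.tick_master();    // thread index 0 drains the queue and waits
 *         helper.join();
 *     }
 */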
} /* namespace nova */

#endif /* DSP_THREAD_QUEUE_DSP_THREAD_QUEUE_HPP */