Fix scvim registry file for updated filename (thanks Carlo Capocasa)
[supercollider.git] / server / supernova / dsp_thread_queue / dsp_thread_queue.hpp
blobba5b3caf4037508be94099ab2eebfbb198c8c753
1 // dsp thread queue
2 // Copyright (C) 2007, 2008, 2009, 2010 Tim Blechmann
3 //
4 // This program is free software; you can redistribute it and/or modify
5 // it under the terms of the GNU General Public License as published by
6 // the Free Software Foundation; either version 2 of the License, or
7 // (at your option) any later version.
8 //
9 // This program is distributed in the hope that it will be useful,
10 // but WITHOUT ANY WARRANTY; without even the implied warranty of
11 // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12 // GNU General Public License for more details.
14 // You should have received a copy of the GNU General Public License
15 // along with this program; see the file COPYING. If not, write to
16 // the Free Software Foundation, Inc., 59 Temple Place - Suite 330,
17 // Boston, MA 02111-1307, USA.
19 #ifndef DSP_THREAD_QUEUE_DSP_THREAD_QUEUE_HPP
20 #define DSP_THREAD_QUEUE_DSP_THREAD_QUEUE_HPP
22 #include <algorithm>
23 #include <iostream>
24 #include <memory>
25 #include <vector>
27 #include <boost/atomic.hpp>
28 #include <boost/cstdint.hpp>
29 #include <boost/thread.hpp>
30 #include <cstdio>
32 #ifdef DEBUG_DSP_THREADS
33 #include <boost/foreach.hpp>
34 #include <cstdio>
35 #endif
37 #include <boost/lockfree/stack.hpp>
39 #include "nova-tt/semaphore.hpp"
41 #include "utilities/branch_hints.hpp"
42 #include "utilities/utils.hpp"
44 namespace nova
47 template <typename runnable, typename Alloc>
48 class dsp_queue_interpreter;
/*
concept runnable
{
    runnable(const & runnable);

    operator()(uint threadindex);
};
*/
59 /** item of a dsp thread queue
61 * \tparam Alloc allocator for successor list
63 * \todo operator new doesn't support stateful allocators
65 template <typename runnable,
66 typename Alloc = std::allocator<void*> >
67 class dsp_thread_queue_item:
68 private Alloc
70 typedef nova::dsp_queue_interpreter<runnable, Alloc> dsp_queue_interpreter;
72 typedef typename Alloc::template rebind<dsp_thread_queue_item>::other new_allocator;
74 public:
75 typedef boost::uint_fast16_t activation_limit_t;
77 struct successor_list
79 struct data_t
81 uint32_t count;
82 uint32_t size;
83 dsp_thread_queue_item* content[0];
86 typedef typename Alloc::template rebind<data_t>::other array_allocator;
88 /* create instance */
89 explicit successor_list(uint32_t size = 0)
91 data = array_allocator().allocate(2*sizeof(uint32_t) + size * sizeof(dsp_thread_queue_item*));
92 data->count = 1;
93 data->size = size;
96 successor_list(successor_list const & rhs):
97 data(rhs.data)
99 data->count++;
102 successor_list & operator=(successor_list const & rhs)
104 if (--data->count == 0)
105 array_allocator().deallocate(data, 2*sizeof(uint32_t) + data->size * sizeof(dsp_thread_queue_item*));
107 data = rhs.data;
108 data->count++;
109 return *this;
112 std::size_t size(void) const
114 return data->size;
117 bool empty(void) const
119 return size() == 0;
122 dsp_thread_queue_item *& operator[](std::size_t index)
124 assert (index < size());
125 return data->content[index];
128 dsp_thread_queue_item * const& operator[](std::size_t index) const
130 assert (index < size());
131 return data->content[index];
134 ~successor_list(void)
136 if (--data->count == 0)
137 array_allocator().deallocate(data, 2*sizeof(uint32_t) + data->size * sizeof(dsp_thread_queue_item*));
140 data_t * data;
143 dsp_thread_queue_item(runnable const & job, successor_list const & successors,
144 activation_limit_t activation_limit):
145 activation_count(0), job(job), successors(successors), activation_limit(activation_limit)
148 dsp_thread_queue_item * run(dsp_queue_interpreter & interpreter, boost::uint8_t thread_index)
150 assert(activation_count == 0);
152 job(thread_index);
154 dsp_thread_queue_item * next = update_dependencies(interpreter);
155 reset_activation_count();
156 return next;
159 /** called from the run method or once, when dsp queue is initialized */
160 void reset_activation_count(void)
162 assert(activation_count == 0);
163 activation_count.store(activation_limit, boost::memory_order_release);
166 runnable const & get_job(void) const
168 return job;
171 runnable & get_job(void)
173 return job;
176 #ifdef DEBUG_DSP_THREADS
177 void dump_item(void)
179 using namespace std;
180 printf("\titem %p\n", this);
181 printf("\tactivation limit %d\n", int(activation_limit));
183 if (!successors.empty()) {
184 printf("\tsuccessors:\n");
185 BOOST_FOREACH(dsp_thread_queue_item * item, successors) {
186 printf("\t\t%p\n", item);
189 printf("\n");
191 #endif
193 private:
194 /** \brief update all successors and possibly mark them as runnable */
195 dsp_thread_queue_item * update_dependencies(dsp_queue_interpreter & interpreter)
197 dsp_thread_queue_item * ptr;
198 std::size_t i = 0;
199 for (;;) {
200 if (i == successors.size())
201 return NULL;
203 ptr = successors[i++]->dec_activation_count(interpreter);
204 if (ptr)
205 break; // no need to update the next item to run
208 while (i != successors.size()) {
209 dsp_thread_queue_item * next = successors[i++]->dec_activation_count(interpreter);
210 if (next)
211 interpreter.mark_as_runnable(next);
214 return ptr;
217 /** \brief decrement activation count and return this, if it drops to zero
219 inline dsp_thread_queue_item * dec_activation_count(dsp_queue_interpreter & interpreter)
221 activation_limit_t current = activation_count--;
222 assert(current > 0);
224 if (current == 1)
225 return this;
226 else
227 return NULL;
230 boost::atomic<activation_limit_t> activation_count; /**< current activation count */
232 runnable job;
233 const successor_list successors; /**< list of successing nodes */
234 const activation_limit_t activation_limit; /**< number of precedessors */
237 template <typename runnable, typename Alloc = std::allocator<void*> >
238 class dsp_thread_queue
240 typedef boost::uint_fast16_t node_count_t;
242 typedef nova::dsp_thread_queue_item<runnable, Alloc> dsp_thread_queue_item;
243 typedef std::vector<dsp_thread_queue_item*,
244 typename Alloc::template rebind<dsp_thread_queue_item*>::other
245 > item_vector_t;
247 typedef typename Alloc::template rebind<dsp_thread_queue_item>::other item_allocator;
249 public:
250 #ifdef DEBUG_DSP_THREADS
251 void dump_queue(void)
253 using namespace std;
255 printf("queue %p\n items:\n", this);
256 BOOST_FOREACH(dsp_thread_queue_item * item, queue_items)
257 item->dump_item();
258 printf("\ninitial items:\n", this);
259 BOOST_FOREACH(dsp_thread_queue_item * item, initially_runnable_items)
260 item->dump_item();
262 printf("\n");
263 std::cout << std::endl;
265 #endif
267 /** preallocate node_count nodes */
268 dsp_thread_queue(std::size_t node_count):
269 total_node_count(0)
271 initially_runnable_items.reserve(node_count);
272 queue_items = item_allocator().allocate(node_count * sizeof(dsp_thread_queue_item));
275 ~dsp_thread_queue(void)
277 for (std::size_t i = 0; i != total_node_count; ++i)
278 queue_items[i].~dsp_thread_queue_item();
279 item_allocator().deallocate(queue_items, total_node_count * sizeof(dsp_thread_queue_item));
282 void add_initially_runnable(dsp_thread_queue_item * item)
284 initially_runnable_items.push_back(item);
287 /** return initialized queue item */
288 dsp_thread_queue_item *
289 allocate_queue_item(runnable const & job,
290 typename dsp_thread_queue_item::successor_list const & successors,
291 typename dsp_thread_queue_item::activation_limit_t activation_limit)
293 dsp_thread_queue_item * ret = queue_items + total_node_count;
294 ++total_node_count;
296 assert (total_node_count <= initially_runnable_items.capacity());
297 new (ret) dsp_thread_queue_item(job, successors, activation_limit);
298 return ret;
301 void reset_activation_counts(void)
303 for (node_count_t i = 0; i != total_node_count; ++i)
304 queue_items[i].reset_activation_count();
307 node_count_t get_total_node_count(void) const
309 return total_node_count;
312 private:
313 node_count_t total_node_count; /* total number of nodes */
314 item_vector_t initially_runnable_items; /* nodes without precedessor */
315 dsp_thread_queue_item * queue_items; /* all nodes */
317 friend class dsp_queue_interpreter<runnable, Alloc>;
320 template <typename runnable,
321 typename Alloc = std::allocator<void*> >
322 class dsp_queue_interpreter
324 protected:
325 typedef nova::dsp_thread_queue<runnable, Alloc> dsp_thread_queue;
326 typedef nova::dsp_thread_queue_item<runnable, Alloc> dsp_thread_queue_item;
327 typedef typename dsp_thread_queue_item::successor_list successor_list;
328 typedef std::size_t size_t;
330 public:
331 typedef boost::uint_fast8_t thread_count_t;
332 typedef boost::uint_fast16_t node_count_t;
334 #ifdef __GXX_EXPERIMENTAL_CXX0X__
335 typedef std::unique_ptr<dsp_thread_queue> dsp_thread_queue_ptr;
336 #else
337 typedef std::auto_ptr<dsp_thread_queue> dsp_thread_queue_ptr;
338 #endif
340 dsp_queue_interpreter(thread_count_t tc):
341 runnable_set(1024), node_count(0)
343 if (!runnable_set.is_lock_free())
344 std::cout << "Warning: scheduler queue is not lockfree!" << std::endl;
346 set_thread_count(tc);
349 /** prepares queue and queue interpreter for dsp tick
351 * \return true, if dsp queue is valid
352 * false, if no dsp queue is available or queue is empty
354 bool init_tick(void)
356 if (unlikely((queue.get() == NULL) or /* no queue */
357 (queue->get_total_node_count() == 0) /* no nodes */
359 return false;
361 /* reset node count */
362 assert(node_count == 0);
363 assert(runnable_set.empty());
364 node_count.store(queue->get_total_node_count(), boost::memory_order_release);
366 for (size_t i = 0; i != queue->initially_runnable_items.size(); ++i)
367 mark_as_runnable(queue->initially_runnable_items[i]);
369 return true;
372 dsp_thread_queue_ptr release_queue(void)
374 dsp_thread_queue_ptr ret(queue.release());
375 return ret;
378 #ifdef __GXX_EXPERIMENTAL_CXX0X__
379 dsp_thread_queue_ptr reset_queue(dsp_thread_queue_ptr && new_queue)
381 dsp_thread_queue_ptr ret(std::move(queue));
383 queue = std::move(new_queue);
384 if (queue.get() == 0)
385 return ret;
387 queue->reset_activation_counts();
389 #ifdef DEBUG_DSP_THREADS
390 queue->dump_queue();
391 #endif
393 thread_count_t thread_number =
394 std::min(thread_count_t(std::min(total_node_count(),
395 node_count_t(std::numeric_limits<thread_count_t>::max()))),
396 thread_count);
398 used_helper_threads = thread_number - 1; /* this thread is not waked up */
399 return ret;
402 #else
404 dsp_thread_queue_ptr reset_queue(dsp_thread_queue_ptr & new_queue)
406 dsp_thread_queue_ptr ret(queue.release());
407 queue = new_queue;
408 if (queue.get() == 0)
409 return ret;
411 queue->reset_activation_counts();
413 #ifdef DEBUG_DSP_THREADS
414 queue->dump_queue();
415 #endif
417 thread_count_t thread_number =
418 std::min(thread_count_t(std::min(total_node_count(),
419 node_count_t(std::numeric_limits<thread_count_t>::max()))),
420 thread_count);
422 used_helper_threads = thread_number - 1; /* this thread is not waked up */
423 return ret;
425 #endif
427 node_count_t total_node_count(void) const
429 return queue->get_total_node_count();
432 void set_thread_count(thread_count_t i)
434 assert (i < std::numeric_limits<thread_count_t>::max());
435 i = std::max(thread_count_t(1u), i);
436 thread_count = i;
439 thread_count_t get_thread_count(void) const
441 return thread_count;
444 thread_count_t get_used_helper_threads(void) const
446 return used_helper_threads;
449 void tick(thread_count_t thread_index)
451 run_item(thread_index);
455 private:
456 struct backup
458 backup(int min, int max): min(min), max(max), loops(min) {}
460 void run(void)
462 for (int i = 0; i != loops; ++i)
463 asm(""); // empty asm to avoid optimization
465 loops = std::min(loops * 2, max);
468 void reset(void)
470 loops = min;
473 int min, max, loops;
476 void run_item(thread_count_t index)
478 backup b(256, 32768);
479 int poll_counts = 0;
481 for (;;) {
482 if (!node_count.load(boost::memory_order_acquire))
483 return;
485 /* we still have some nodes to process */
486 int state = run_next_item(index);
487 switch (state) {
488 case no_remaining_items:
489 return;
490 case fifo_empty:
491 b.run();
492 ++poll_counts;
493 break;
495 case remaining_items:
496 b.reset();
497 poll_counts = 0;
500 if (poll_counts == 50000) {
501 // the maximum poll count is system-dependent. 50000 should be high enough for recent machines
502 std::printf("nova::dsp_queue_interpreter::run_item: possible lookup detected\n");
503 abort();
508 public:
509 void tick_master(void)
511 run_item_master();
514 private:
515 void run_item_master(void)
517 run_item(0);
518 wait_for_end();
519 assert(runnable_set.empty());
522 void wait_for_end(void)
524 int count = 0;
525 while (node_count.load(boost::memory_order_acquire) != 0) {
526 ++count;
527 if (count == 1000000) {
528 std::printf("nova::dsp_queue_interpreter::run_item: possible lookup detected\n");
529 abort();
531 } // busy-wait for helper threads to finish
534 HOT int run_next_item(thread_count_t index)
536 dsp_thread_queue_item * item;
537 bool success = runnable_set.pop(item);
539 if (!success)
540 return fifo_empty;
542 node_count_t consumed = 0;
544 do {
545 item = item->run(*this, index);
546 consumed += 1;
547 } while (item != NULL);
549 node_count_t remaining = node_count.fetch_sub(consumed, boost::memory_order_release);
551 assert (remaining >= consumed);
553 if (remaining == consumed)
554 return no_remaining_items;
555 else
556 return remaining_items;
559 void mark_as_runnable(dsp_thread_queue_item * item)
561 runnable_set.push(item);
564 friend class nova::dsp_thread_queue_item<runnable, Alloc>;
566 private:
567 enum {
568 no_remaining_items,
569 fifo_empty,
570 remaining_items
573 dsp_thread_queue_ptr queue;
575 thread_count_t thread_count; /* number of dsp threads to be used by this queue */
576 thread_count_t used_helper_threads; /* number of helper threads, which are actually used */
578 boost::lockfree::stack<dsp_thread_queue_item*> runnable_set;
579 boost::atomic<node_count_t> node_count; /* number of nodes, that need to be processed during this tick */
582 } /* namespace nova */
584 #endif /* DSP_THREAD_QUEUE_DSP_THREAD_QUEUE_HPP */