scide: implement selectionLength for openDocument
[supercollider.git] / server / supernova / dsp_thread_queue / dsp_thread_queue.hpp
blob325dd1d71a7d787f081c51b40a06d733591d9eaf
1 // dsp thread queue
2 // Copyright (C) 2007, 2008, 2009, 2010 Tim Blechmann
3 //
4 // This program is free software; you can redistribute it and/or modify
5 // it under the terms of the GNU General Public License as published by
6 // the Free Software Foundation; either version 2 of the License, or
7 // (at your option) any later version.
8 //
9 // This program is distributed in the hope that it will be useful,
10 // but WITHOUT ANY WARRANTY; without even the implied warranty of
11 // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12 // GNU General Public License for more details.
14 // You should have received a copy of the GNU General Public License
15 // along with this program; see the file COPYING. If not, write to
16 // the Free Software Foundation, Inc., 59 Temple Place - Suite 330,
17 // Boston, MA 02111-1307, USA.
19 #ifndef DSP_THREAD_QUEUE_DSP_THREAD_QUEUE_HPP
20 #define DSP_THREAD_QUEUE_DSP_THREAD_QUEUE_HPP
#include <algorithm>
#include <cassert>
#include <cstdint>
#include <cstdio>
#include <cstdlib>
#include <iostream>
#include <limits>
#include <memory>
#include <vector>

#ifdef DEBUG_DSP_THREADS
#include <cstdio>
#endif

#include <boost/atomic.hpp>
#include <boost/lockfree/stack.hpp>

#include "nova-tt/semaphore.hpp"

#include "utilities/branch_hints.hpp"
#include "utilities/utils.hpp"
42 namespace nova {
template <typename runnable, typename Alloc>
class dsp_queue_interpreter;

/* requirements on the `runnable` template parameter (informal sketch, not compiled):

concept runnable
{
    runnable(runnable const &);     // copied into each queue item
    operator()(uint threadindex);   // invoked with the index of the executing thread
};
*/
56 /** item of a dsp thread queue
58 * \tparam Alloc allocator for successor list
60 * \todo operator new doesn't support stateful allocators
62 template <typename runnable,
63 typename Alloc = std::allocator<void*> >
64 class dsp_thread_queue_item:
65 private Alloc
67 typedef nova::dsp_queue_interpreter<runnable, Alloc> dsp_queue_interpreter;
69 typedef typename Alloc::template rebind<dsp_thread_queue_item>::other new_allocator;
71 public:
72 typedef std::uint_fast16_t activation_limit_t;
74 struct successor_list
76 struct data_t
78 uint32_t count;
79 uint32_t size;
80 dsp_thread_queue_item* content[0];
83 typedef typename Alloc::template rebind<data_t>::other array_allocator;
85 /* create instance */
86 explicit successor_list(uint32_t size = 0)
88 data = array_allocator().allocate(2*sizeof(uint32_t) + size * sizeof(dsp_thread_queue_item*));
89 data->count = 1;
90 data->size = size;
93 successor_list(successor_list const & rhs):
94 data(rhs.data)
96 data->count++;
99 successor_list & operator=(successor_list const & rhs)
101 if (--data->count == 0)
102 array_allocator().deallocate(data, 2*sizeof(uint32_t) + data->size * sizeof(dsp_thread_queue_item*));
104 data = rhs.data;
105 data->count++;
106 return *this;
109 std::size_t size(void) const
111 return data->size;
114 bool empty(void) const
116 return size() == 0;
119 dsp_thread_queue_item *& operator[](std::size_t index)
121 assert (index < size());
122 return data->content[index];
125 dsp_thread_queue_item * const& operator[](std::size_t index) const
127 assert (index < size());
128 return data->content[index];
131 ~successor_list(void)
133 if (--data->count == 0)
134 array_allocator().deallocate(data, 2*sizeof(uint32_t) + data->size * sizeof(dsp_thread_queue_item*));
137 data_t * data;
140 dsp_thread_queue_item(runnable const & job, successor_list const & successors,
141 activation_limit_t activation_limit):
142 activation_count(0), job(job), successors(successors), activation_limit(activation_limit)
145 dsp_thread_queue_item * run(dsp_queue_interpreter & interpreter, boost::uint8_t thread_index)
147 assert(activation_count == 0);
149 job(thread_index);
151 dsp_thread_queue_item * next = update_dependencies(interpreter);
152 reset_activation_count();
153 return next;
156 /** called from the run method or once, when dsp queue is initialized */
157 void reset_activation_count(void)
159 assert(activation_count == 0);
160 activation_count.store(activation_limit, boost::memory_order_release);
163 runnable const & get_job(void) const
165 return job;
168 runnable & get_job(void)
170 return job;
173 #ifdef DEBUG_DSP_THREADS
174 void dump_item(void)
176 using namespace std;
177 printf("\titem %p\n", this);
178 printf("\tactivation limit %d\n", int(activation_limit));
180 if (!successors.empty()) {
181 printf("\tsuccessors:\n");
182 for(dsp_thread_queue_item * item : successors) {
183 printf("\t\t%p\n", item);
186 printf("\n");
188 #endif
190 private:
191 /** \brief update all successors and possibly mark them as runnable */
192 dsp_thread_queue_item * update_dependencies(dsp_queue_interpreter & interpreter)
194 dsp_thread_queue_item * ptr;
195 std::size_t i = 0;
196 for (;;) {
197 if (i == successors.size())
198 return NULL;
200 ptr = successors[i++]->dec_activation_count(interpreter);
201 if (ptr)
202 break; // no need to update the next item to run
205 while (i != successors.size()) {
206 dsp_thread_queue_item * next = successors[i++]->dec_activation_count(interpreter);
207 if (next)
208 interpreter.mark_as_runnable(next);
211 return ptr;
214 /** \brief decrement activation count and return this, if it drops to zero
216 inline dsp_thread_queue_item * dec_activation_count(dsp_queue_interpreter & interpreter)
218 activation_limit_t current = activation_count--;
219 assert(current > 0);
221 if (current == 1)
222 return this;
223 else
224 return NULL;
227 boost::atomic<activation_limit_t> activation_count; /**< current activation count */
229 runnable job;
230 const successor_list successors; /**< list of successing nodes */
231 const activation_limit_t activation_limit; /**< number of precedessors */
234 template <typename runnable, typename Alloc = std::allocator<void*> >
235 class dsp_thread_queue
237 typedef std::uint_fast16_t node_count_t;
239 typedef nova::dsp_thread_queue_item<runnable, Alloc> dsp_thread_queue_item;
240 typedef std::vector<dsp_thread_queue_item*,
241 typename Alloc::template rebind<dsp_thread_queue_item*>::other
242 > item_vector_t;
244 typedef typename Alloc::template rebind<dsp_thread_queue_item>::other item_allocator;
246 public:
247 #ifdef DEBUG_DSP_THREADS
248 void dump_queue(void)
250 using namespace std;
252 printf("queue %p\n items:\n", this);
253 BOOST_FOREACH(dsp_thread_queue_item * item, queue_items)
254 item->dump_item();
255 printf("\ninitial items:\n", this);
256 BOOST_FOREACH(dsp_thread_queue_item * item, initially_runnable_items)
257 item->dump_item();
259 printf("\n");
260 std::cout << std::endl;
262 #endif
264 /** preallocate node_count nodes */
265 dsp_thread_queue(std::size_t node_count, bool has_parallelism = true):
266 total_node_count(0), has_parallelism_(has_parallelism)
268 initially_runnable_items.reserve(node_count);
269 queue_items = item_allocator().allocate(node_count * sizeof(dsp_thread_queue_item));
272 ~dsp_thread_queue(void)
274 for (std::size_t i = 0; i != total_node_count; ++i)
275 queue_items[i].~dsp_thread_queue_item();
276 item_allocator().deallocate(queue_items, total_node_count * sizeof(dsp_thread_queue_item));
279 void add_initially_runnable(dsp_thread_queue_item * item)
281 initially_runnable_items.push_back(item);
284 /** return initialized queue item */
285 dsp_thread_queue_item *
286 allocate_queue_item(runnable const & job,
287 typename dsp_thread_queue_item::successor_list const & successors,
288 typename dsp_thread_queue_item::activation_limit_t activation_limit)
290 dsp_thread_queue_item * ret = queue_items + total_node_count;
291 ++total_node_count;
293 assert (total_node_count <= initially_runnable_items.capacity());
294 new (ret) dsp_thread_queue_item(job, successors, activation_limit);
295 return ret;
298 void reset_activation_counts(void)
300 for (node_count_t i = 0; i != total_node_count; ++i)
301 queue_items[i].reset_activation_count();
304 node_count_t get_total_node_count(void) const
306 return total_node_count;
309 bool has_parallelism(void) const
311 return has_parallelism_;
314 private:
315 node_count_t total_node_count; /* total number of nodes */
316 item_vector_t initially_runnable_items; /* nodes without precedessor */
317 dsp_thread_queue_item * queue_items; /* all nodes */
318 const bool has_parallelism_;
320 friend class dsp_queue_interpreter<runnable, Alloc>;
323 template <typename runnable,
324 typename Alloc = std::allocator<void*> >
325 class dsp_queue_interpreter
327 protected:
328 typedef nova::dsp_thread_queue<runnable, Alloc> dsp_thread_queue;
329 typedef nova::dsp_thread_queue_item<runnable, Alloc> dsp_thread_queue_item;
330 typedef typename dsp_thread_queue_item::successor_list successor_list;
331 typedef std::size_t size_t;
333 public:
334 typedef boost::uint_fast8_t thread_count_t;
335 typedef boost::uint_fast16_t node_count_t;
337 typedef std::unique_ptr<dsp_thread_queue> dsp_thread_queue_ptr;
339 dsp_queue_interpreter(thread_count_t tc):
340 node_count(0)
342 if (!runnable_set.is_lock_free())
343 std::cout << "Warning: scheduler queue is not lockfree!" << std::endl;
345 set_thread_count(tc);
348 /** prepares queue and queue interpreter for dsp tick
350 * \return true, if dsp queue is valid
351 * false, if no dsp queue is available or queue is empty
353 bool init_tick(void)
355 if (unlikely((queue.get() == NULL) or /* no queue */
356 (queue->get_total_node_count() == 0) /* no nodes */
358 return false;
360 /* reset node count */
361 assert(node_count == 0);
362 assert(runnable_set.empty());
363 node_count.store(queue->get_total_node_count(), boost::memory_order_release);
365 for (size_t i = 0; i != queue->initially_runnable_items.size(); ++i)
366 mark_as_runnable(queue->initially_runnable_items[i]);
368 return true;
371 dsp_thread_queue_ptr release_queue(void)
373 dsp_thread_queue_ptr ret(queue.release());
374 return ret;
377 dsp_thread_queue_ptr reset_queue(dsp_thread_queue_ptr && new_queue)
379 dsp_thread_queue_ptr ret(std::move(queue));
381 queue = std::move(new_queue);
382 if (queue.get() == 0)
383 return ret;
385 queue->reset_activation_counts();
387 #ifdef DEBUG_DSP_THREADS
388 queue->dump_queue();
389 #endif
391 if (queue->has_parallelism()) {
392 thread_count_t thread_number =
393 std::min(thread_count_t(std::min(total_node_count(),
394 node_count_t(std::numeric_limits<thread_count_t>::max()))),
395 thread_count);
397 used_helper_threads = thread_number - 1; /* this thread is not waked up */
398 } else
399 used_helper_threads = 0;
400 return ret;
403 node_count_t total_node_count(void) const
405 return queue->get_total_node_count();
408 void set_thread_count(thread_count_t i)
410 assert (i < std::numeric_limits<thread_count_t>::max());
411 i = std::max(thread_count_t(1u), i);
412 thread_count = i;
415 thread_count_t get_thread_count(void) const
417 return thread_count;
420 thread_count_t get_used_helper_threads(void) const
422 return used_helper_threads;
425 void tick(thread_count_t thread_index)
427 run_item(thread_index);
431 private:
432 struct backup
434 backup(int min, int max): min(min), max(max), loops(min) {}
436 void run(void)
438 for (int i = 0; i != loops; ++i)
439 asm(""); // empty asm to avoid optimization
441 loops = std::min(loops * 2, max);
444 void reset(void)
446 loops = min;
449 int min, max, loops;
452 void run_item(thread_count_t index)
454 backup b(256, 32768);
455 int poll_counts = 0;
457 for (;;) {
458 if (!node_count.load(boost::memory_order_acquire))
459 return;
461 /* we still have some nodes to process */
462 int state = run_next_item(index);
463 switch (state) {
464 case no_remaining_items:
465 return;
466 case fifo_empty:
467 b.run();
468 ++poll_counts;
469 break;
471 case remaining_items:
472 b.reset();
473 poll_counts = 0;
476 if (poll_counts == 50000) {
477 // the maximum poll count is system-dependent. 50000 should be high enough for recent machines
478 std::printf("nova::dsp_queue_interpreter::run_item: possible lookup detected\n");
479 abort();
484 public:
485 void tick_master(void)
487 run_item_master();
490 private:
491 void run_item_master(void)
493 run_item(0);
494 wait_for_end();
495 assert(runnable_set.empty());
498 void wait_for_end(void)
500 int count = 0;
501 while (node_count.load(boost::memory_order_acquire) != 0) {
502 ++count;
503 if (count == 1000000) {
504 std::printf("nova::dsp_queue_interpreter::run_item: possible lookup detected\n");
505 abort();
507 } // busy-wait for helper threads to finish
510 HOT int run_next_item(thread_count_t index)
512 dsp_thread_queue_item * item;
513 bool success = runnable_set.pop(item);
515 if (!success)
516 return fifo_empty;
518 node_count_t consumed = 0;
520 do {
521 item = item->run(*this, index);
522 consumed += 1;
523 } while (item != NULL);
525 node_count_t remaining = node_count.fetch_sub(consumed, boost::memory_order_release);
527 assert (remaining >= consumed);
529 if (remaining == consumed)
530 return no_remaining_items;
531 else
532 return remaining_items;
535 void mark_as_runnable(dsp_thread_queue_item * item)
537 runnable_set.push(item);
540 friend class nova::dsp_thread_queue_item<runnable, Alloc>;
542 private:
543 enum {
544 no_remaining_items,
545 fifo_empty,
546 remaining_items
549 dsp_thread_queue_ptr queue;
551 thread_count_t thread_count; /* number of dsp threads to be used by this queue */
552 thread_count_t used_helper_threads; /* number of helper threads, which are actually used */
554 boost::lockfree::stack<dsp_thread_queue_item*, boost::lockfree::capacity<32768> > runnable_set;
555 boost::atomic<node_count_t> node_count; /* number of nodes, that need to be processed during this tick */
558 } /* namespace nova */
560 #endif /* DSP_THREAD_QUEUE_DSP_THREAD_QUEUE_HPP */