2 // Copyright (C) 2007, 2008, 2009, 2010 Tim Blechmann
4 // This program is free software; you can redistribute it and/or modify
5 // it under the terms of the GNU General Public License as published by
6 // the Free Software Foundation; either version 2 of the License, or
7 // (at your option) any later version.
9 // This program is distributed in the hope that it will be useful,
10 // but WITHOUT ANY WARRANTY; without even the implied warranty of
11 // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12 // GNU General Public License for more details.
14 // You should have received a copy of the GNU General Public License
15 // along with this program; see the file COPYING. If not, write to
16 // the Free Software Foundation, Inc., 59 Temple Place - Suite 330,
17 // Boston, MA 02111-1307, USA.
19 #ifndef DSP_THREAD_QUEUE_DSP_THREAD_HPP
20 #define DSP_THREAD_QUEUE_DSP_THREAD_HPP
26 #include <boost/ptr_container/ptr_vector.hpp>
28 #include "dsp_thread_queue.hpp"
29 #include "../utilities/malloc_aligned.hpp"
30 #include "nova-tt/mlock.hpp"
/** \brief thread init functor that does nothing
 *
 *  Used as the default `thread_init_functor` of dsp_thread and dsp_threads.
 *  It can be default-constructed or constructed from an arbitrary argument (which is ignored),
 *  and calling it with a thread index has no effect.
 */
struct nop_thread_init
{
    /** default construction, required by the `thread_init_functor()` default arguments */
    nop_thread_init(void)
    {}

    /** accept and ignore any single constructor argument */
    template <typename Arg>
    nop_thread_init(Arg const &)
    {}

    /** called with the index of the helper thread; the parameter is left unnamed because it is unused */
    void operator()(int /* thread_index */) const
    {}
};
52 * the dsp helper threads are running with a high real-time priority and are
53 * pinned to a specific cpu
55 template <typename runnable
,
56 typename thread_init_functor
= nop_thread_init
,
57 typename Alloc
= std::allocator
<void*>
60 public boost::noncopyable
,
61 public thread_init_functor
63 typedef nova::dsp_queue_interpreter
<runnable
, Alloc
> dsp_queue_interpreter
;
66 dsp_thread(dsp_queue_interpreter
& interpreter
, uint16_t index
, size_t stack_size
,
67 thread_init_functor
const & thread_init
= thread_init_functor()):
68 thread_init_functor(thread_init
), interpreter(interpreter
), stop(false), index(index
), stack_ (NULL
)
71 stack_
= malloc_aligned
<char>(stack_size
);
73 throw std::bad_alloc();
74 // touch stack to avoid page faults
75 for (int i
= 0; i
!= stack_size
; ++i
)
77 mlock(stack_
, stack_size
);
91 thread_init_functor::operator()(index
);
95 if (unlikely(stop
.load(boost::memory_order_acquire
)))
98 interpreter
.tick(index
);
102 static void * run_static(void* arg
)
104 dsp_thread
* self
= static_cast<dsp_thread
*>(arg
);
111 stop
.store(true, boost::memory_order_release
);
115 void wake_thread(void)
128 dsp_queue_interpreter
& interpreter
;
129 boost::atomic
<bool> stop
;
134 /** \brief container for all dsp threads
136 * - no care is taken that dsp_threads::run is executed on a valid instance
139 template <typename runnable
,
140 typename thread_init_functor
= nop_thread_init
,
141 typename Alloc
= std::allocator
<void*>
145 typedef nova::dsp_queue_interpreter
<runnable
, Alloc
> dsp_queue_interpreter
;
147 typedef nova::dsp_thread
<runnable
, thread_init_functor
, Alloc
> dsp_thread
;
151 typedef typename
dsp_queue_interpreter::node_count_t node_count_t
;
152 typedef typename
dsp_queue_interpreter::thread_count_t thread_count_t
;
154 typedef std::unique_ptr
<dsp_thread_queue
<runnable
, Alloc
> > dsp_thread_queue_ptr
;
156 dsp_threads(thread_count_t count
, thread_init_functor
const & init_functor
= thread_init_functor()):
157 interpreter(std::min(count
, (thread_count_t
)std::thread::hardware_concurrency()))
159 set_dsp_thread_count(interpreter
.get_thread_count(), init_functor
);
164 bool run_tick
= interpreter
.init_tick();
165 if (likely(run_tick
)) {
167 interpreter
.tick_master();
173 * must not be called while threads are still accessing the queue
175 dsp_thread_queue_ptr
reset_queue(dsp_thread_queue_ptr
&& new_queue
)
177 dsp_thread_queue_ptr ret
= interpreter
.reset_queue(std::move(new_queue
));
178 return std::move(ret
);
181 dsp_thread_queue_ptr
release_queue(void)
183 return interpreter
.release_queue();
187 #if _POSIX_C_SOURCE >= 200112L || _XOPEN_SOURCE >= 600
188 // we can set the stack
189 static const int stack_size
= 524288;
192 void push_back(pthread_t t
)
194 thread_group_
.push_back(t
);
199 for(pthread_t
& thread
: thread_group_
) {
201 int err
= pthread_join(thread
, &ret
);
203 printf("Error when joining helper thread\n");
207 std::vector
<pthread_t
> thread_group_
;
210 void start_threads_impl(void)
212 for(dsp_thread
& thread
: threads
) {
214 pthread_attr_init(&attr
);
215 int err
= pthread_attr_setstack(&attr
, thread
.stack(), stack_size
);
217 throw std::logic_error("Cannot set stack of DSP helper thread");
220 err
= pthread_create(&thread_id
, &attr
, dsp_thread::run_static
, &thread
);
222 throw std::runtime_error("Cannot create DSP helper thread");
223 thread_group_
.push_back(thread_id
);
224 pthread_attr_destroy(&attr
);
232 for (std::thread
& thread
: threads
)
236 std::vector
<std::thread
> threads
;
239 static const int stack_size
= 0;
241 void start_threads_impl(void)
243 for(dsp_thread
& thread
: threads
)
244 thread_group_
.threads
.push_back(std::move(std::thread(std::bind(&dsp_thread::run
, std::ref(thread
)))));
249 void set_dsp_thread_count(thread_count_t count
, thread_init_functor
const & init_functor
)
251 for (thread_count_t i
= 1; i
!= count
; ++i
)
252 threads
.push_back(new dsp_thread(interpreter
, i
, stack_size
, init_functor
));
253 assert(threads
.size() == std::size_t(count
-1));
257 /** thread handling */
259 void start_threads(void)
261 start_threads_impl();
264 void terminate_threads(void)
266 for (dsp_thread
& thread
: threads
)
269 thread_group_
.join_all();
274 /** wake dsp threads */
275 void wake_threads(void)
277 for (thread_count_t i
= 0; i
!= interpreter
.get_used_helper_threads(); ++i
)
278 threads
[i
].wake_thread();
281 dsp_queue_interpreter interpreter
;
283 boost::ptr_vector
<dsp_thread
> threads
; /* container of dsp threads */
284 thread_group thread_group_
;
287 } /* namespace nova */
289 #endif /* DSP_THREAD_QUEUE_DSP_THREAD_HPP */