Merge pull request #506 from andrewcsmith/patch-2
[supercollider.git] / external_libraries / boost-lockfree / boost / lockfree / spsc_queue.hpp
blob4c97ebd48a5a4907426368200017c63de4c44f28
1 // lock-free single-producer/single-consumer ringbuffer
2 // this algorithm is implemented in various projects (linux kernel)
3 //
4 // Copyright (C) 2009, 2011 Tim Blechmann
5 //
6 // Distributed under the Boost Software License, Version 1.0. (See
7 // accompanying file LICENSE_1_0.txt or copy at
8 // http://www.boost.org/LICENSE_1_0.txt)
10 #ifndef BOOST_LOCKFREE_SPSC_QUEUE_HPP_INCLUDED
11 #define BOOST_LOCKFREE_SPSC_QUEUE_HPP_INCLUDED
13 #include <algorithm>
15 #include <boost/array.hpp>
16 #include <boost/noncopyable.hpp>
17 #include <boost/static_assert.hpp>
19 #include <boost/lockfree/detail/atomic.hpp>
20 #include <boost/lockfree/detail/branch_hints.hpp>
21 #include <boost/lockfree/detail/parameter.hpp>
22 #include <boost/lockfree/detail/prefix.hpp>
25 namespace boost {
26 namespace lockfree {
27 namespace detail {
// Boost.Parameter signature for the ringbuffer policies: spsc_queue users may
// pass the optional capacity<> and allocator<> policy arguments in any order.
29 typedef parameter::parameters<boost::parameter::optional<tag::capacity>,
30 boost::parameter::optional<tag::allocator>
31 > ringbuffer_signature;
33 template <typename T>
34 class ringbuffer_base:
35 boost::noncopyable
37 #ifndef BOOST_DOXYGEN_INVOKED
38 typedef std::size_t size_t;
39 static const int padding_size = BOOST_LOCKFREE_CACHELINE_BYTES - sizeof(size_t);
40 atomic<size_t> write_index_;
41 char padding1[padding_size]; /* force read_index and write_index to different cache lines */
42 atomic<size_t> read_index_;
44 protected:
45 ringbuffer_base(void):
46 write_index_(0), read_index_(0)
49 static size_t next_index(size_t arg, size_t max_size)
51 size_t ret = arg + 1;
52 while (unlikely(ret >= max_size))
53 ret -= max_size;
54 return ret;
57 static size_t read_available(size_t write_index, size_t read_index, size_t max_size)
59 if (write_index >= read_index)
60 return write_index - read_index;
62 size_t ret = write_index + max_size - read_index;
63 return ret;
66 static size_t write_available(size_t write_index, size_t read_index, size_t max_size)
68 size_t ret = read_index - write_index - 1;
69 if (write_index >= read_index)
70 ret += max_size;
71 return ret;
74 bool push(T const & t, T * buffer, size_t max_size)
76 size_t write_index = write_index_.load(memory_order_relaxed); // only written from push thread
77 size_t next = next_index(write_index, max_size);
79 if (next == read_index_.load(memory_order_acquire))
80 return false; /* ringbuffer is full */
82 buffer[write_index] = t;
84 write_index_.store(next, memory_order_release);
86 return true;
89 size_t push(const T * input_buffer, size_t input_count, T * internal_buffer, size_t max_size)
91 size_t write_index = write_index_.load(memory_order_relaxed); // only written from push thread
92 const size_t read_index = read_index_.load(memory_order_acquire);
93 const size_t avail = write_available(write_index, read_index, max_size);
95 if (avail == 0)
96 return 0;
98 input_count = std::min(input_count, avail);
100 size_t new_write_index = write_index + input_count;
102 if (write_index + input_count > max_size) {
103 /* copy data in two sections */
104 size_t count0 = max_size - write_index;
106 std::copy(input_buffer, input_buffer + count0, internal_buffer + write_index);
107 std::copy(input_buffer + count0, input_buffer + input_count, internal_buffer);
108 new_write_index -= max_size;
109 } else {
110 std::copy(input_buffer, input_buffer + input_count, internal_buffer + write_index);
112 if (new_write_index == max_size)
113 new_write_index = 0;
116 write_index_.store(new_write_index, memory_order_release);
117 return input_count;
120 template <typename ConstIterator>
121 ConstIterator push(ConstIterator begin, ConstIterator end, T * internal_buffer, size_t max_size)
123 // FIXME: avoid std::distance and std::advance
125 size_t write_index = write_index_.load(memory_order_relaxed); // only written from push thread
126 const size_t read_index = read_index_.load(memory_order_acquire);
127 const size_t avail = write_available(write_index, read_index, max_size);
129 if (avail == 0)
130 return begin;
132 size_t input_count = std::distance(begin, end);
133 input_count = std::min(input_count, avail);
135 size_t new_write_index = write_index + input_count;
137 ConstIterator last = begin;
138 std::advance(last, input_count);
140 if (write_index + input_count > max_size) {
141 /* copy data in two sections */
142 size_t count0 = max_size - write_index;
143 ConstIterator midpoint = begin;
144 std::advance(midpoint, count0);
146 std::copy(begin, midpoint, internal_buffer + write_index);
147 std::copy(midpoint, last, internal_buffer);
148 new_write_index -= max_size;
149 } else {
150 std::copy(begin, last, internal_buffer + write_index);
152 if (new_write_index == max_size)
153 new_write_index = 0;
156 write_index_.store(new_write_index, memory_order_release);
157 return last;
160 bool pop (T & ret, T * buffer, size_t max_size)
162 size_t write_index = write_index_.load(memory_order_acquire);
163 size_t read_index = read_index_.load(memory_order_relaxed); // only written from pop thread
164 if (empty(write_index, read_index))
165 return false;
167 ret = buffer[read_index];
168 size_t next = next_index(read_index, max_size);
169 read_index_.store(next, memory_order_release);
170 return true;
173 size_t pop (T * output_buffer, size_t output_count, const T * internal_buffer, size_t max_size)
175 const size_t write_index = write_index_.load(memory_order_acquire);
176 size_t read_index = read_index_.load(memory_order_relaxed); // only written from pop thread
178 const size_t avail = read_available(write_index, read_index, max_size);
180 if (avail == 0)
181 return 0;
183 output_count = std::min(output_count, avail);
185 size_t new_read_index = read_index + output_count;
187 if (read_index + output_count > max_size) {
188 /* copy data in two sections */
189 size_t count0 = max_size - read_index;
190 size_t count1 = output_count - count0;
192 std::copy(internal_buffer + read_index, internal_buffer + max_size, output_buffer);
193 std::copy(internal_buffer, internal_buffer + count1, output_buffer + count0);
195 new_read_index -= max_size;
196 } else {
197 std::copy(internal_buffer + read_index, internal_buffer + read_index + output_count, output_buffer);
198 if (new_read_index == max_size)
199 new_read_index = 0;
202 read_index_.store(new_read_index, memory_order_release);
203 return output_count;
206 template <typename OutputIterator>
207 size_t pop (OutputIterator it, const T * internal_buffer, size_t max_size)
209 const size_t write_index = write_index_.load(memory_order_acquire);
210 size_t read_index = read_index_.load(memory_order_relaxed); // only written from pop thread
212 const size_t avail = read_available(write_index, read_index, max_size);
213 if (avail == 0)
214 return 0;
216 size_t new_read_index = read_index + avail;
218 if (read_index + avail > max_size) {
219 /* copy data in two sections */
220 size_t count0 = max_size - read_index;
221 size_t count1 = avail - count0;
223 std::copy(internal_buffer + read_index, internal_buffer + max_size, it);
224 std::copy(internal_buffer, internal_buffer + count1, it);
226 new_read_index -= max_size;
227 } else {
228 std::copy(internal_buffer + read_index, internal_buffer + read_index + avail, it);
229 if (new_read_index == max_size)
230 new_read_index = 0;
233 read_index_.store(new_read_index, memory_order_release);
234 return avail;
236 #endif
239 public:
240 /** reset the ringbuffer
242 * \note Not thread-safe
243 * */
244 void reset(void)
246 write_index_.store(0, memory_order_relaxed);
247 read_index_.store(0, memory_order_release);
250 /** Check if the ringbuffer is empty
252 * \return true, if the ringbuffer is empty, false otherwise
253 * \note Due to the concurrent nature of the ringbuffer the result may be inaccurate.
254 * */
255 bool empty(void)
257 return empty(write_index_.load(memory_order_relaxed), read_index_.load(memory_order_relaxed));
261 * \return true, if implementation is lock-free.
263 * */
264 bool is_lock_free(void) const
266 return write_index_.is_lock_free() && read_index_.is_lock_free();
269 private:
270 bool empty(size_t write_index, size_t read_index)
272 return write_index == read_index;
276 template <typename T, std::size_t max_size>
277 class compile_time_sized_ringbuffer:
278 public ringbuffer_base<T>
280 typedef std::size_t size_t;
281 boost::array<T, max_size> array_;
283 public:
284 bool push(T const & t)
286 return ringbuffer_base<T>::push(t, array_.c_array(), max_size);
289 bool pop(T & ret)
291 return ringbuffer_base<T>::pop(ret, array_.c_array(), max_size);
294 size_t push(T const * t, size_t size)
296 return ringbuffer_base<T>::push(t, size, array_.c_array(), max_size);
299 template <size_t size>
300 size_t push(T const (&t)[size])
302 return push(t, size);
305 template <typename ConstIterator>
306 ConstIterator push(ConstIterator begin, ConstIterator end)
308 return ringbuffer_base<T>::push(begin, end, array_.c_array(), max_size);
311 size_t pop(T * ret, size_t size)
313 return ringbuffer_base<T>::pop(ret, size, array_.c_array(), max_size);
316 template <size_t size>
317 size_t pop(T (&ret)[size])
319 return pop(ret, size);
322 template <typename OutputIterator>
323 size_t pop(OutputIterator it)
325 return ringbuffer_base<T>::pop(it, array_.c_array(), max_size);
329 template <typename T, typename Alloc>
330 class runtime_sized_ringbuffer:
331 public ringbuffer_base<T>,
332 private Alloc
334 typedef std::size_t size_t;
335 size_t max_elements_;
336 typedef typename Alloc::pointer pointer;
337 pointer array_;
339 public:
340 explicit runtime_sized_ringbuffer(size_t max_elements):
341 max_elements_(max_elements)
343 // TODO: we don't necessarily need to construct all elements
344 array_ = Alloc::allocate(max_elements);
345 for (size_t i = 0; i != max_elements; ++i)
346 Alloc::construct(array_ + i, T());
349 template <typename U>
350 runtime_sized_ringbuffer(typename Alloc::template rebind<U>::other const & alloc, size_t max_elements):
351 Alloc(alloc), max_elements_(max_elements)
353 // TODO: we don't necessarily need to construct all elements
354 array_ = Alloc::allocate(max_elements);
355 for (size_t i = 0; i != max_elements; ++i)
356 Alloc::construct(array_ + i, T());
359 runtime_sized_ringbuffer(Alloc const & alloc, size_t max_elements):
360 Alloc(alloc), max_elements_(max_elements)
362 // TODO: we don't necessarily need to construct all elements
363 array_ = Alloc::allocate(max_elements);
364 for (size_t i = 0; i != max_elements; ++i)
365 Alloc::construct(array_ + i, T());
368 ~runtime_sized_ringbuffer(void)
370 for (size_t i = 0; i != max_elements_; ++i)
371 Alloc::destroy(array_ + i);
372 Alloc::deallocate(array_, max_elements_);
375 bool push(T const & t)
377 return ringbuffer_base<T>::push(t, &*array_, max_elements_);
380 bool pop(T & ret)
382 return ringbuffer_base<T>::pop(ret, &*array_, max_elements_);
385 size_t push(T const * t, size_t size)
387 return ringbuffer_base<T>::push(t, size, &*array_, max_elements_);
390 template <size_t size>
391 size_t push(T const (&t)[size])
393 return push(t, size);
396 template <typename ConstIterator>
397 ConstIterator push(ConstIterator begin, ConstIterator end)
399 return ringbuffer_base<T>::push(begin, end, array_, max_elements_);
402 size_t pop(T * ret, size_t size)
404 return ringbuffer_base<T>::pop(ret, size, array_, max_elements_);
407 template <size_t size>
408 size_t pop(T (&ret)[size])
410 return pop(ret, size);
413 template <typename OutputIterator>
414 size_t pop(OutputIterator it)
416 return ringbuffer_base<T>::pop(it, array_, max_elements_);
420 template <typename T, typename A0, typename A1>
421 struct make_ringbuffer
423 typedef typename ringbuffer_signature::bind<A0, A1>::type bound_args;
425 typedef extract_capacity<bound_args> extract_capacity_t;
427 static const bool runtime_sized = !extract_capacity_t::has_capacity;
428 static const size_t capacity = extract_capacity_t::capacity;
430 typedef extract_allocator<bound_args, T> extract_allocator_t;
431 typedef typename extract_allocator_t::type allocator;
433 // allocator argument is only sane, for run-time sized ringbuffers
434 BOOST_STATIC_ASSERT((mpl::if_<mpl::bool_<!runtime_sized>,
435 mpl::bool_<!extract_allocator_t::has_allocator>,
436 mpl::true_
437 >::type::value));
439 typedef typename mpl::if_c<runtime_sized,
440 runtime_sized_ringbuffer<T, allocator>,
441 compile_time_sized_ringbuffer<T, capacity>
442 >::type ringbuffer_type;
446 } /* namespace detail */
449 /** The spsc_queue class provides a single-writer/single-reader fifo queue, pushing and popping is wait-free.
451 * \b Policies:
452 * - \c boost::lockfree::capacity<>, optional <br>
453 * If this template argument is passed to the options, the size of the ringbuffer is set at compile-time.
455 * - \c boost::lockfree::allocator<>, defaults to \c boost::lockfree::allocator<std::allocator<T>> <br>
456 * Specifies the allocator that is used to allocate the ringbuffer. This option is only valid, if the ringbuffer is configured
457 * to be sized at run-time
459 * \b Requirements:
460 * - T must have a default constructor
461 * - T must be copyable
462 * */
463 #ifndef BOOST_DOXYGEN_INVOKED
464 template <typename T,
465 class A0 = boost::parameter::void_,
466 class A1 = boost::parameter::void_>
467 #else
468 template <typename T, ...Options>
469 #endif
470 class spsc_queue:
471 public detail::make_ringbuffer<T, A0, A1>::ringbuffer_type
473 private:
475 #ifndef BOOST_DOXYGEN_INVOKED
476 typedef typename detail::make_ringbuffer<T, A0, A1>::ringbuffer_type base_type;
477 static const bool runtime_sized = detail::make_ringbuffer<T, A0, A1>::runtime_sized;
478 typedef typename detail::make_ringbuffer<T, A0, A1>::allocator allocator_arg;
480 struct implementation_defined
482 typedef allocator_arg allocator;
483 typedef std::size_t size_type;
485 #endif
487 public:
488 typedef T value_type;
489 typedef typename implementation_defined::allocator allocator;
490 typedef typename implementation_defined::size_type size_type;
492 /** Constructs a spsc_queue
494 * \pre spsc_queue must be configured to be sized at compile-time
496 // @{
497 spsc_queue(void)
499 BOOST_STATIC_ASSERT(!runtime_sized);
502 template <typename U>
503 explicit spsc_queue(typename allocator::template rebind<U>::other const & alloc)
505 // just for API compatibility: we don't actually need an allocator
506 BOOST_STATIC_ASSERT(!runtime_sized);
509 explicit spsc_queue(allocator const & alloc)
511 // just for API compatibility: we don't actually need an allocator
512 BOOST_STATIC_ASSERT(!runtime_sized);
514 // @}
517 /** Constructs a spsc_queue for element_count elements
519 * \pre spsc_queue must be configured to be sized at run-time
521 // @{
522 explicit spsc_queue(size_type element_count):
523 base_type(element_count)
525 BOOST_STATIC_ASSERT(runtime_sized);
528 template <typename U>
529 spsc_queue(size_type element_count, typename allocator::template rebind<U>::other const & alloc):
530 base_type(alloc, element_count)
532 BOOST_STATIC_ASSERT(runtime_sized);
535 spsc_queue(size_type element_count, allocator_arg const & alloc):
536 base_type(alloc, element_count)
538 BOOST_STATIC_ASSERT(runtime_sized);
540 // @}
542 /** Pushes object t to the ringbuffer.
544 * \pre only one thread is allowed to push data to the spsc_queue
545 * \post object will be pushed to the spsc_queue, unless it is full.
546 * \return true, if the push operation is successful.
548 * \note Thread-safe and wait-free
549 * */
550 bool push(T const & t)
552 return base_type::push(t);
555 /** Pops one object from ringbuffer.
557 * \pre only one thread is allowed to pop data to the spsc_queue
558 * \post if ringbuffer is not empty, object will be copied to ret.
559 * \return true, if the pop operation is successful, false if ringbuffer was empty.
561 * \note Thread-safe and wait-free
563 bool pop(T & ret)
565 return base_type::pop(ret);
568 /** Pushes as many objects from the array t as there is space.
570 * \pre only one thread is allowed to push data to the spsc_queue
571 * \return number of pushed items
573 * \note Thread-safe and wait-free
575 size_type push(T const * t, size_type size)
577 return base_type::push(t, size);
580 /** Pushes as many objects from the array t as there is space available.
582 * \pre only one thread is allowed to push data to the spsc_queue
583 * \return number of pushed items
585 * \note Thread-safe and wait-free
587 template <size_type size>
588 size_type push(T const (&t)[size])
590 return push(t, size);
593 /** Pushes as many objects from the range [begin, end) as there is space .
595 * \pre only one thread is allowed to push data to the spsc_queue
596 * \return iterator to the first element, which has not been pushed
598 * \note Thread-safe and wait-free
600 template <typename ConstIterator>
601 ConstIterator push(ConstIterator begin, ConstIterator end)
603 return base_type::push(begin, end);
606 /** Pops a maximum of size objects from ringbuffer.
608 * \pre only one thread is allowed to pop data to the spsc_queue
609 * \return number of popped items
611 * \note Thread-safe and wait-free
612 * */
613 size_type pop(T * ret, size_type size)
615 return base_type::pop(ret, size);
618 /** Pops a maximum of size objects from spsc_queue.
620 * \pre only one thread is allowed to pop data to the spsc_queue
621 * \return number of popped items
623 * \note Thread-safe and wait-free
624 * */
625 template <size_type size>
626 size_type pop(T (&ret)[size])
628 return pop(ret, size);
631 /** Pops objects to the output iterator it
633 * \pre only one thread is allowed to pop data to the spsc_queue
634 * \return number of popped items
636 * \note Thread-safe and wait-free
637 * */
638 template <typename OutputIterator>
639 size_type pop(OutputIterator it)
641 return base_type::pop(it);
645 } /* namespace lockfree */
646 } /* namespace boost */
649 #endif /* BOOST_LOCKFREE_SPSC_QUEUE_HPP_INCLUDED */