1 // lock-free single-producer/single-consumer ringbuffer
2 // this algorithm is implemented in various projects (linux kernel)
4 // Copyright (C) 2009, 2011 Tim Blechmann
6 // Distributed under the Boost Software License, Version 1.0. (See
7 // accompanying file LICENSE_1_0.txt or copy at
8 // http://www.boost.org/LICENSE_1_0.txt)
10 #ifndef BOOST_LOCKFREE_SPSC_QUEUE_HPP_INCLUDED
11 #define BOOST_LOCKFREE_SPSC_QUEUE_HPP_INCLUDED
#include <algorithm>
#include <cstddef>
#include <iterator>

#include <boost/array.hpp>
#include <boost/noncopyable.hpp>
#include <boost/static_assert.hpp>

#include <boost/lockfree/detail/atomic.hpp>
#include <boost/lockfree/detail/branch_hints.hpp>
#include <boost/lockfree/detail/parameter.hpp>
#include <boost/lockfree/detail/prefix.hpp>
29 typedef parameter::parameters
<boost::parameter::optional
<tag::capacity
>,
30 boost::parameter::optional
<tag::allocator
>
31 > ringbuffer_signature
;
34 class ringbuffer_base
:
37 #ifndef BOOST_DOXYGEN_INVOKED
38 typedef std::size_t size_t;
39 static const int padding_size
= BOOST_LOCKFREE_CACHELINE_BYTES
- sizeof(size_t);
40 atomic
<size_t> write_index_
;
41 char padding1
[padding_size
]; /* force read_index and write_index to different cache lines */
42 atomic
<size_t> read_index_
;
45 ringbuffer_base(void):
46 write_index_(0), read_index_(0)
49 static size_t next_index(size_t arg
, size_t max_size
)
52 while (unlikely(ret
>= max_size
))
57 static size_t read_available(size_t write_index
, size_t read_index
, size_t max_size
)
59 if (write_index
>= read_index
)
60 return write_index
- read_index
;
62 size_t ret
= write_index
+ max_size
- read_index
;
66 static size_t write_available(size_t write_index
, size_t read_index
, size_t max_size
)
68 size_t ret
= read_index
- write_index
- 1;
69 if (write_index
>= read_index
)
74 bool push(T
const & t
, T
* buffer
, size_t max_size
)
76 size_t write_index
= write_index_
.load(memory_order_relaxed
); // only written from push thread
77 size_t next
= next_index(write_index
, max_size
);
79 if (next
== read_index_
.load(memory_order_acquire
))
80 return false; /* ringbuffer is full */
82 buffer
[write_index
] = t
;
84 write_index_
.store(next
, memory_order_release
);
89 size_t push(const T
* input_buffer
, size_t input_count
, T
* internal_buffer
, size_t max_size
)
91 size_t write_index
= write_index_
.load(memory_order_relaxed
); // only written from push thread
92 const size_t read_index
= read_index_
.load(memory_order_acquire
);
93 const size_t avail
= write_available(write_index
, read_index
, max_size
);
98 input_count
= std::min(input_count
, avail
);
100 size_t new_write_index
= write_index
+ input_count
;
102 if (write_index
+ input_count
> max_size
) {
103 /* copy data in two sections */
104 size_t count0
= max_size
- write_index
;
106 std::copy(input_buffer
, input_buffer
+ count0
, internal_buffer
+ write_index
);
107 std::copy(input_buffer
+ count0
, input_buffer
+ input_count
, internal_buffer
);
108 new_write_index
-= max_size
;
110 std::copy(input_buffer
, input_buffer
+ input_count
, internal_buffer
+ write_index
);
112 if (new_write_index
== max_size
)
116 write_index_
.store(new_write_index
, memory_order_release
);
120 template <typename ConstIterator
>
121 ConstIterator
push(ConstIterator begin
, ConstIterator end
, T
* internal_buffer
, size_t max_size
)
123 // FIXME: avoid std::distance and std::advance
125 size_t write_index
= write_index_
.load(memory_order_relaxed
); // only written from push thread
126 const size_t read_index
= read_index_
.load(memory_order_acquire
);
127 const size_t avail
= write_available(write_index
, read_index
, max_size
);
132 size_t input_count
= std::distance(begin
, end
);
133 input_count
= std::min(input_count
, avail
);
135 size_t new_write_index
= write_index
+ input_count
;
137 ConstIterator last
= begin
;
138 std::advance(last
, input_count
);
140 if (write_index
+ input_count
> max_size
) {
141 /* copy data in two sections */
142 size_t count0
= max_size
- write_index
;
143 ConstIterator midpoint
= begin
;
144 std::advance(midpoint
, count0
);
146 std::copy(begin
, midpoint
, internal_buffer
+ write_index
);
147 std::copy(midpoint
, last
, internal_buffer
);
148 new_write_index
-= max_size
;
150 std::copy(begin
, last
, internal_buffer
+ write_index
);
152 if (new_write_index
== max_size
)
156 write_index_
.store(new_write_index
, memory_order_release
);
160 bool pop (T
& ret
, T
* buffer
, size_t max_size
)
162 size_t write_index
= write_index_
.load(memory_order_acquire
);
163 size_t read_index
= read_index_
.load(memory_order_relaxed
); // only written from pop thread
164 if (empty(write_index
, read_index
))
167 ret
= buffer
[read_index
];
168 size_t next
= next_index(read_index
, max_size
);
169 read_index_
.store(next
, memory_order_release
);
173 size_t pop (T
* output_buffer
, size_t output_count
, const T
* internal_buffer
, size_t max_size
)
175 const size_t write_index
= write_index_
.load(memory_order_acquire
);
176 size_t read_index
= read_index_
.load(memory_order_relaxed
); // only written from pop thread
178 const size_t avail
= read_available(write_index
, read_index
, max_size
);
183 output_count
= std::min(output_count
, avail
);
185 size_t new_read_index
= read_index
+ output_count
;
187 if (read_index
+ output_count
> max_size
) {
188 /* copy data in two sections */
189 size_t count0
= max_size
- read_index
;
190 size_t count1
= output_count
- count0
;
192 std::copy(internal_buffer
+ read_index
, internal_buffer
+ max_size
, output_buffer
);
193 std::copy(internal_buffer
, internal_buffer
+ count1
, output_buffer
+ count0
);
195 new_read_index
-= max_size
;
197 std::copy(internal_buffer
+ read_index
, internal_buffer
+ read_index
+ output_count
, output_buffer
);
198 if (new_read_index
== max_size
)
202 read_index_
.store(new_read_index
, memory_order_release
);
206 template <typename OutputIterator
>
207 size_t pop (OutputIterator it
, const T
* internal_buffer
, size_t max_size
)
209 const size_t write_index
= write_index_
.load(memory_order_acquire
);
210 size_t read_index
= read_index_
.load(memory_order_relaxed
); // only written from pop thread
212 const size_t avail
= read_available(write_index
, read_index
, max_size
);
216 size_t new_read_index
= read_index
+ avail
;
218 if (read_index
+ avail
> max_size
) {
219 /* copy data in two sections */
220 size_t count0
= max_size
- read_index
;
221 size_t count1
= avail
- count0
;
223 std::copy(internal_buffer
+ read_index
, internal_buffer
+ max_size
, it
);
224 std::copy(internal_buffer
, internal_buffer
+ count1
, it
);
226 new_read_index
-= max_size
;
228 std::copy(internal_buffer
+ read_index
, internal_buffer
+ read_index
+ avail
, it
);
229 if (new_read_index
== max_size
)
233 read_index_
.store(new_read_index
, memory_order_release
);
240 /** reset the ringbuffer
242 * \note Not thread-safe
246 write_index_
.store(0, memory_order_relaxed
);
247 read_index_
.store(0, memory_order_release
);
250 /** Check if the ringbuffer is empty
252 * \return true, if the ringbuffer is empty, false otherwise
253 * \note Due to the concurrent nature of the ringbuffer the result may be inaccurate.
257 return empty(write_index_
.load(memory_order_relaxed
), read_index_
.load(memory_order_relaxed
));
261 * \return true, if implementation is lock-free.
264 bool is_lock_free(void) const
266 return write_index_
.is_lock_free() && read_index_
.is_lock_free();
270 bool empty(size_t write_index
, size_t read_index
)
272 return write_index
== read_index
;
276 template <typename T
, std::size_t max_size
>
277 class compile_time_sized_ringbuffer
:
278 public ringbuffer_base
<T
>
280 typedef std::size_t size_t;
281 boost::array
<T
, max_size
> array_
;
284 bool push(T
const & t
)
286 return ringbuffer_base
<T
>::push(t
, array_
.c_array(), max_size
);
291 return ringbuffer_base
<T
>::pop(ret
, array_
.c_array(), max_size
);
294 size_t push(T
const * t
, size_t size
)
296 return ringbuffer_base
<T
>::push(t
, size
, array_
.c_array(), max_size
);
299 template <size_t size
>
300 size_t push(T
const (&t
)[size
])
302 return push(t
, size
);
305 template <typename ConstIterator
>
306 ConstIterator
push(ConstIterator begin
, ConstIterator end
)
308 return ringbuffer_base
<T
>::push(begin
, end
, array_
.c_array(), max_size
);
311 size_t pop(T
* ret
, size_t size
)
313 return ringbuffer_base
<T
>::pop(ret
, size
, array_
.c_array(), max_size
);
316 template <size_t size
>
317 size_t pop(T (&ret
)[size
])
319 return pop(ret
, size
);
322 template <typename OutputIterator
>
323 size_t pop(OutputIterator it
)
325 return ringbuffer_base
<T
>::pop(it
, array_
.c_array(), max_size
);
329 template <typename T
, typename Alloc
>
330 class runtime_sized_ringbuffer
:
331 public ringbuffer_base
<T
>,
334 typedef std::size_t size_t;
335 size_t max_elements_
;
336 typedef typename
Alloc::pointer pointer
;
340 explicit runtime_sized_ringbuffer(size_t max_elements
):
341 max_elements_(max_elements
)
343 // TODO: we don't necessarily need to construct all elements
344 array_
= Alloc::allocate(max_elements
);
345 for (size_t i
= 0; i
!= max_elements
; ++i
)
346 Alloc::construct(array_
+ i
, T());
349 template <typename U
>
350 runtime_sized_ringbuffer(typename
Alloc::template rebind
<U
>::other
const & alloc
, size_t max_elements
):
351 Alloc(alloc
), max_elements_(max_elements
)
353 // TODO: we don't necessarily need to construct all elements
354 array_
= Alloc::allocate(max_elements
);
355 for (size_t i
= 0; i
!= max_elements
; ++i
)
356 Alloc::construct(array_
+ i
, T());
359 runtime_sized_ringbuffer(Alloc
const & alloc
, size_t max_elements
):
360 Alloc(alloc
), max_elements_(max_elements
)
362 // TODO: we don't necessarily need to construct all elements
363 array_
= Alloc::allocate(max_elements
);
364 for (size_t i
= 0; i
!= max_elements
; ++i
)
365 Alloc::construct(array_
+ i
, T());
368 ~runtime_sized_ringbuffer(void)
370 for (size_t i
= 0; i
!= max_elements_
; ++i
)
371 Alloc::destroy(array_
+ i
);
372 Alloc::deallocate(array_
, max_elements_
);
375 bool push(T
const & t
)
377 return ringbuffer_base
<T
>::push(t
, &*array_
, max_elements_
);
382 return ringbuffer_base
<T
>::pop(ret
, &*array_
, max_elements_
);
385 size_t push(T
const * t
, size_t size
)
387 return ringbuffer_base
<T
>::push(t
, size
, &*array_
, max_elements_
);
390 template <size_t size
>
391 size_t push(T
const (&t
)[size
])
393 return push(t
, size
);
396 template <typename ConstIterator
>
397 ConstIterator
push(ConstIterator begin
, ConstIterator end
)
399 return ringbuffer_base
<T
>::push(begin
, end
, array_
, max_elements_
);
402 size_t pop(T
* ret
, size_t size
)
404 return ringbuffer_base
<T
>::pop(ret
, size
, array_
, max_elements_
);
407 template <size_t size
>
408 size_t pop(T (&ret
)[size
])
410 return pop(ret
, size
);
413 template <typename OutputIterator
>
414 size_t pop(OutputIterator it
)
416 return ringbuffer_base
<T
>::pop(it
, array_
, max_elements_
);
420 template <typename T
, typename A0
, typename A1
>
421 struct make_ringbuffer
423 typedef typename
ringbuffer_signature::bind
<A0
, A1
>::type bound_args
;
425 typedef extract_capacity
<bound_args
> extract_capacity_t
;
427 static const bool runtime_sized
= !extract_capacity_t::has_capacity
;
428 static const size_t capacity
= extract_capacity_t::capacity
;
430 typedef extract_allocator
<bound_args
, T
> extract_allocator_t
;
431 typedef typename
extract_allocator_t::type allocator
;
433 // allocator argument is only sane, for run-time sized ringbuffers
434 BOOST_STATIC_ASSERT((mpl::if_
<mpl::bool_
<!runtime_sized
>,
435 mpl::bool_
<!extract_allocator_t::has_allocator
>,
439 typedef typename
mpl::if_c
<runtime_sized
,
440 runtime_sized_ringbuffer
<T
, allocator
>,
441 compile_time_sized_ringbuffer
<T
, capacity
>
442 >::type ringbuffer_type
;
446 } /* namespace detail */
449 /** The spsc_queue class provides a single-writer/single-reader fifo queue, pushing and popping is wait-free.
452 * - \c boost::lockfree::capacity<>, optional <br>
453 * If this template argument is passed to the options, the size of the ringbuffer is set at compile-time.
455 * - \c boost::lockfree::allocator<>, defaults to \c boost::lockfree::allocator<std::allocator<T>> <br>
456 * Specifies the allocator that is used to allocate the ringbuffer. This option is only valid, if the ringbuffer is configured
457 * to be sized at run-time
460 * - T must have a default constructor
461 * - T must be copyable
463 #ifndef BOOST_DOXYGEN_INVOKED
464 template <typename T
,
465 class A0
= boost::parameter::void_
,
466 class A1
= boost::parameter::void_
>
468 template <typename T
, ...Options
>
471 public detail::make_ringbuffer
<T
, A0
, A1
>::ringbuffer_type
475 #ifndef BOOST_DOXYGEN_INVOKED
476 typedef typename
detail::make_ringbuffer
<T
, A0
, A1
>::ringbuffer_type base_type
;
477 static const bool runtime_sized
= detail::make_ringbuffer
<T
, A0
, A1
>::runtime_sized
;
478 typedef typename
detail::make_ringbuffer
<T
, A0
, A1
>::allocator allocator_arg
;
480 struct implementation_defined
482 typedef allocator_arg allocator
;
483 typedef std::size_t size_type
;
488 typedef T value_type
;
489 typedef typename
implementation_defined::allocator allocator
;
490 typedef typename
implementation_defined::size_type size_type
;
492 /** Constructs a spsc_queue
494 * \pre spsc_queue must be configured to be sized at compile-time
499 BOOST_STATIC_ASSERT(!runtime_sized
);
502 template <typename U
>
503 explicit spsc_queue(typename
allocator::template rebind
<U
>::other
const & alloc
)
505 // just for API compatibility: we don't actually need an allocator
506 BOOST_STATIC_ASSERT(!runtime_sized
);
509 explicit spsc_queue(allocator
const & alloc
)
511 // just for API compatibility: we don't actually need an allocator
512 BOOST_STATIC_ASSERT(!runtime_sized
);
517 /** Constructs a spsc_queue for element_count elements
519 * \pre spsc_queue must be configured to be sized at run-time
522 explicit spsc_queue(size_type element_count
):
523 base_type(element_count
)
525 BOOST_STATIC_ASSERT(runtime_sized
);
528 template <typename U
>
529 spsc_queue(size_type element_count
, typename
allocator::template rebind
<U
>::other
const & alloc
):
530 base_type(alloc
, element_count
)
532 BOOST_STATIC_ASSERT(runtime_sized
);
535 spsc_queue(size_type element_count
, allocator_arg
const & alloc
):
536 base_type(alloc
, element_count
)
538 BOOST_STATIC_ASSERT(runtime_sized
);
542 /** Pushes object t to the ringbuffer.
544 * \pre only one thread is allowed to push data to the spsc_queue
545 * \post object will be pushed to the spsc_queue, unless it is full.
546 * \return true, if the push operation is successful.
548 * \note Thread-safe and wait-free
550 bool push(T
const & t
)
552 return base_type::push(t
);
555 /** Pops one object from ringbuffer.
557 * \pre only one thread is allowed to pop data to the spsc_queue
558 * \post if ringbuffer is not empty, object will be copied to ret.
559 * \return true, if the pop operation is successful, false if ringbuffer was empty.
561 * \note Thread-safe and wait-free
565 return base_type::pop(ret
);
568 /** Pushes as many objects from the array t as there is space.
570 * \pre only one thread is allowed to push data to the spsc_queue
571 * \return number of pushed items
573 * \note Thread-safe and wait-free
575 size_type
push(T
const * t
, size_type size
)
577 return base_type::push(t
, size
);
580 /** Pushes as many objects from the array t as there is space available.
582 * \pre only one thread is allowed to push data to the spsc_queue
583 * \return number of pushed items
585 * \note Thread-safe and wait-free
587 template <size_type size
>
588 size_type
push(T
const (&t
)[size
])
590 return push(t
, size
);
593 /** Pushes as many objects from the range [begin, end) as there is space .
595 * \pre only one thread is allowed to push data to the spsc_queue
596 * \return iterator to the first element, which has not been pushed
598 * \note Thread-safe and wait-free
600 template <typename ConstIterator
>
601 ConstIterator
push(ConstIterator begin
, ConstIterator end
)
603 return base_type::push(begin
, end
);
606 /** Pops a maximum of size objects from ringbuffer.
608 * \pre only one thread is allowed to pop data to the spsc_queue
609 * \return number of popped items
611 * \note Thread-safe and wait-free
613 size_type
pop(T
* ret
, size_type size
)
615 return base_type::pop(ret
, size
);
618 /** Pops a maximum of size objects from spsc_queue.
620 * \pre only one thread is allowed to pop data to the spsc_queue
621 * \return number of popped items
623 * \note Thread-safe and wait-free
625 template <size_type size
>
626 size_type
pop(T (&ret
)[size
])
628 return pop(ret
, size
);
631 /** Pops objects to the output iterator it
633 * \pre only one thread is allowed to pop data to the spsc_queue
634 * \return number of popped items
636 * \note Thread-safe and wait-free
638 template <typename OutputIterator
>
639 size_type
pop(OutputIterator it
)
641 return base_type::pop(it
);
645 } /* namespace lockfree */
646 } /* namespace boost */
649 #endif /* BOOST_LOCKFREE_SPSC_QUEUE_HPP_INCLUDED */