2 * This file and its contents are supplied under the terms of the
3 * Common Development and Distribution License ("CDDL"), version 1.0.
4 * You may only use this file in accordance with the terms of version
7 * A full copy of the text of the CDDL should have accompanied this
8 * source. A copy of the CDDL is also available via the Internet at
9 * http://www.illumos.org/license/CDDL.
13 * Copyright 2015 Joyent, Inc.
19 * A multi-threaded merging queue.
21 * The general constraint of the merge queue is that if a set of items are
22 * inserted into the queue in the same order, then no matter how many threads
23 * are on the scene, we will always process the items in the same order. The
24 * secondary constraint is that to support environments that must be
25 * single-threaded, we explicitly *must not* create a thread in the case where
26 * the number of requested threads is just one.
28 * To that end, we've designed our queue as a circular buffer. We will grow that
29 * buffer to contain enough space for all the input items, after which we'll
30 * then treat it as a circular buffer.
32 * Items will be issued to a processing function two at a time, until there is
33 * only one item remaining in the queue, at which point we will be doing
36 * A given queue has three different entries that we care about tracking:
38 * o mq_nproc - What is the slot of the next item to process for something
41 * o mq_next - What is the slot of the next item that should be inserted into
44 * o mq_ncommit - What is the slot of the next item that should be committed.
46 * When a thread comes and looks for work, we pop entries off of the queue based
47 * on the index provided by mq_nproc. At the same time, it also gets the slot
48 * that it should place the result in, which is mq_next. However, because we
49 * have multiple threads that are operating on the system, we want to make sure
50 * that we push things onto the queue in order. We do that by allocating a slot
51 * to each task and when it completes, it waits for its slot to be ready based
52 * on it being the value of mq_ncommit.
54 * In addition, we keep track of the number of items in the queue as well as the
55 * number of active workers. There's also a generation count that is used to
56 * figure out when the various values might lap one another.
58 * The following images show what happens when we have a queue with six items
59 * and whose capacity has been shrunk to six, to better fit on the screen.
62 * 1) This is the initial configuration of the queue right before any processing
63 * is done in the context of mergeq_merge(). Every box has an initial item for
64 * merging in it (represented by an 'X'). Here, mq_nproc, mq_next, and
65 * mq_ncommit all point at the initial entry. However, mq_next has
66 * already lapped around the array and thus has a generation count of one.
68 * The '+' characters indicate which bucket each of mq_nproc, mq_next, and
69 * mq_ncommit points to.
71 * +---++---++---++---++---++---+
72 * | X || X || X || X || X || X |
73 * +---++---++---++---++---++---+
78 * 2) This shows the state right as the first thread begins to process an entry.
79 * In this example, two threads will be processing this queue. Note that
80 * mq_ncommit has not advanced. This is because the first thread has started
81 * processing entries, but it has not finished, and thus we can't commit it.
82 * We've incremented mq_next by one because it has gone ahead and assigned a
83 * single entry. We've incremented mq_nproc by two, because we have removed two
84 * entries and thus will have another set available.
86 * +---++---++---++---++---++---+ t1 - slot 0
87 * | || || X || X || X || X | t2 - idle
88 * +---++---++---++---++---++---+
94 * 3) This shows the state right after the second thread begins to process an
95 * entry; note that the first thread has not finished. The changes are very
96 * similar to the previous state: we've advanced mq_nproc and mq_next, but not
99 * +---++---++---++---++---++---+ t1 - slot 0
100 * | || || || || X || X | t2 - slot 1
101 * +---++---++---++---++---++---+
106 * 4) This shows the state after thread one has finished processing an item, but
107 * before it does anything else. Note that even if thread two finishes early, it
108 * cannot commit its item until thread one finishes. Here 'Y' refers to the
109 * result of merging the first two 'X's.
111 * +---++---++---++---++---++---+ t1 - idle
112 * | Y || || || || X || X | t2 - slot 1
113 * +---++---++---++---++---++---+
118 * 5) This shows the state after thread one has begun to process the next round
119 * and after thread two has committed, but before it begins processing the next
120 * item. Note that mq_nproc has wrapped around and we've bumped its generation
123 * +---++---++---++---++---++---+ t1 - slot 2
124 * | Y || Y || || || || | t2 - idle
125 * +---++---++---++---++---++---+
130 * 6) Here, thread two will take the next two 'Y' values and thread one will
131 * commit its 'Y'. Thread one must now wait until thread two finishes so that
132 * it can do additional work.
134 * +---++---++---++---++---++---+ t1 - waiting
135 * | || || Y || || || | t2 - slot 3
136 * +---++---++---++---++---++---+
141 * 7) Here, thread two has committed and thread one is about to go process the
142 * final entry. The character 'Z' represents the results of merging two 'Y's.
144 * +---++---++---++---++---++---+ t1 - idle
145 * | || || Y || Z || || | t2 - idle
146 * +---++---++---++---++---++---+
151 * 8) Here, thread one is processing the final item. Thread two is waiting in
152 * mergeq_pop() for enough items to become available. In this case, that will
153 * never happen; however, once all threads have finished, it will break out.
155 * +---++---++---++---++---++---+ t1 - slot 4
156 * | || || || || || | t2 - idle
157 * +---++---++---++---++---++---+
162 * 9) This is the final state of the queue; it has a single '*' item, which is
163 * the final merge result. At this point, both thread one and thread two would
164 * stop processing and we'll return the result to the user.
166 * +---++---++---++---++---++---+ t1 - slot 4
167 * | || || || || * || | t2 - idle
168 * +---++---++---++---++---++---+
174 * Note that if at any point the processing function fails, then all of
175 * the merges will quiesce and that error will be propagated back to the user.
179 #include <sys/debug.h>
189 mutex_t mq_lock
; /* Protects items below */
190 cond_t mq_cond
; /* Condition variable */
191 void **mq_items
; /* Array of items to process */
192 size_t mq_nitems
; /* Number of items in the queue */
193 size_t mq_cap
; /* Capacity of the items */
194 size_t mq_next
; /* Place to put next entry */
195 size_t mq_gnext
; /* Generation for next */
196 size_t mq_nproc
; /* Index of next thing to process */
197 size_t mq_gnproc
; /* Generation for next proc */
198 size_t mq_ncommit
; /* Index of the next thing to commit */
199 size_t mq_gncommit
; /* Commit generation */
200 uint_t mq_nactthrs
; /* Number of active threads */
201 uint_t mq_ndthreads
; /* Desired number of threads */
202 thread_t
*mq_thrs
; /* Actual threads */
203 mergeq_proc_f
*mq_func
; /* Processing function */
204 void *mq_arg
; /* Argument for processing */
205 boolean_t mq_working
; /* Are we working on processing */
206 boolean_t mq_iserror
; /* Have we encountered an error? */
210 #define MERGEQ_DEFAULT_CAP 64
213 mergeq_error(int err
)
216 return (MERGEQ_ERROR
);
220 mergeq_fini(mergeq_t
*mqp
)
225 VERIFY(mqp
->mq_working
!= B_TRUE
);
227 if (mqp
->mq_items
!= NULL
)
228 mergeq_free(mqp
->mq_items
, sizeof (void *) * mqp
->mq_cap
);
229 if (mqp
->mq_ndthreads
> 0) {
230 mergeq_free(mqp
->mq_thrs
, sizeof (thread_t
) *
233 VERIFY0(cond_destroy(&mqp
->mq_cond
));
234 VERIFY0(mutex_destroy(&mqp
->mq_lock
));
235 mergeq_free(mqp
, sizeof (mergeq_t
));
239 mergeq_init(mergeq_t
**outp
, uint_t nthrs
)
244 mqp
= mergeq_alloc(sizeof (mergeq_t
));
246 return (mergeq_error(ENOMEM
));
248 bzero(mqp
, sizeof (mergeq_t
));
249 mqp
->mq_items
= mergeq_alloc(sizeof (void *) * MERGEQ_DEFAULT_CAP
);
250 if (mqp
->mq_items
== NULL
) {
251 mergeq_free(mqp
, sizeof (mergeq_t
));
252 return (mergeq_error(ENOMEM
));
254 bzero(mqp
->mq_items
, sizeof (void *) * MERGEQ_DEFAULT_CAP
);
256 mqp
->mq_ndthreads
= nthrs
- 1;
257 if (mqp
->mq_ndthreads
> 0) {
258 mqp
->mq_thrs
= mergeq_alloc(sizeof (thread_t
) *
260 if (mqp
->mq_thrs
== NULL
) {
261 mergeq_free(mqp
->mq_items
, sizeof (void *) *
263 mergeq_free(mqp
, sizeof (mergeq_t
));
264 return (mergeq_error(ENOMEM
));
268 if ((ret
= mutex_init(&mqp
->mq_lock
, USYNC_THREAD
| LOCK_ERRORCHECK
,
270 if (mqp
->mq_ndthreads
> 0) {
271 mergeq_free(mqp
->mq_thrs
,
272 sizeof (thread_t
) * mqp
->mq_ndthreads
);
274 mergeq_free(mqp
->mq_items
, sizeof (void *) *
276 mergeq_free(mqp
, sizeof (mergeq_t
));
277 return (mergeq_error(ret
));
280 if ((ret
= cond_init(&mqp
->mq_cond
, USYNC_THREAD
, NULL
)) != 0) {
281 VERIFY0(mutex_destroy(&mqp
->mq_lock
));
282 if (mqp
->mq_ndthreads
> 0) {
283 mergeq_free(mqp
->mq_thrs
,
284 sizeof (thread_t
) * mqp
->mq_ndthreads
);
286 mergeq_free(mqp
->mq_items
, sizeof (void *) *
288 mergeq_free(mqp
, sizeof (mergeq_t
));
289 return (mergeq_error(ret
));
292 mqp
->mq_cap
= MERGEQ_DEFAULT_CAP
;
298 mergeq_reset(mergeq_t
*mqp
)
300 VERIFY(MUTEX_HELD(&mqp
->mq_lock
));
301 VERIFY(mqp
->mq_working
== B_FALSE
);
302 if (mqp
->mq_cap
!= 0)
303 bzero(mqp
->mq_items
, sizeof (void *) * mqp
->mq_cap
);
310 mqp
->mq_gncommit
= 0;
313 mqp
->mq_iserror
= B_FALSE
;
/*
 * Grow the item array by MERGEQ_DEFAULT_CAP slots. The caller must hold
 * mq_lock, and the queue must not be working (mq_working == B_FALSE): the
 * array is only grown while items are still being added (mergeq_add()),
 * before it starts being treated as a circular buffer.
 *
 * A new array of ncap = mq_cap + MERGEQ_DEFAULT_CAP pointers is allocated
 * and zeroed, the existing mq_cap entries are copied to its front, the old
 * array is freed, and the new one is installed in mq_items.
 *
 * NOTE(review): the handling of the SIZE_MAX check, of a NULL return from
 * mergeq_alloc(), and the update of mq_cap to ncap are not visible in this
 * fragment. mq_cap must end up equal to ncap, because every later free of
 * mq_items (e.g. in mergeq_fini()) sizes it as mq_cap * sizeof (void *).
 */
318 mergeq_grow(mergeq_t
*mqp
)
323 VERIFY(MUTEX_HELD(&mqp
->mq_lock
));
324 VERIFY(mqp
->mq_working
== B_FALSE
);
/* Guard against mq_cap + MERGEQ_DEFAULT_CAP overflowing size_t. */
326 if (SIZE_MAX
- mqp
->mq_cap
< MERGEQ_DEFAULT_CAP
)
329 ncap
= mqp
->mq_cap
+ MERGEQ_DEFAULT_CAP
;
/*
 * NOTE(review): only the addition above is checked for overflow; the byte
 * count ncap * sizeof (void *) is not. Bounding ncap by
 * SIZE_MAX / sizeof (void *) as well would close that gap.
 */
330 items
= mergeq_alloc(ncap
* sizeof (void *));
334 bzero(items
, ncap
* sizeof (void *));
335 bcopy(mqp
->mq_items
, items
, mqp
->mq_cap
* sizeof (void *));
/*
 * mergeq_free() is given an explicit size, so it must match the allocation:
 * mq_cap pointers (the element count held in mq_cap, not sizeof (mq_cap)).
 * This is the same size the bcopy above reads and mergeq_fini() frees.
 */
336 mergeq_free(mqp
->mq_items
, mqp
->mq_cap
* sizeof (void *));
337 mqp
->mq_items
= items
;
343 mergeq_add(mergeq_t
*mqp
, void *item
)
345 VERIFY0(mutex_lock(&mqp
->mq_lock
));
346 if (mqp
->mq_working
== B_TRUE
) {
347 VERIFY0(mutex_unlock(&mqp
->mq_lock
));
348 return (mergeq_error(ENXIO
));
351 if (mqp
->mq_next
== mqp
->mq_cap
) {
354 if ((ret
= mergeq_grow(mqp
)) != 0) {
355 VERIFY0(mutex_unlock(&mqp
->mq_lock
));
356 return (mergeq_error(ret
));
359 mqp
->mq_items
[mqp
->mq_next
] = item
;
363 VERIFY0(mutex_unlock(&mqp
->mq_lock
));
368 mergeq_slot(mergeq_t
*mqp
)
372 VERIFY(MUTEX_HELD(&mqp
->mq_lock
));
373 VERIFY(mqp
->mq_next
< mqp
->mq_cap
);
376 * This probably should be a cv / wait thing.
378 VERIFY(mqp
->mq_nproc
!= (mqp
->mq_next
+ 1) % mqp
->mq_cap
);
382 if (mqp
->mq_next
== mqp
->mq_cap
) {
383 mqp
->mq_next
%= mqp
->mq_cap
;
391 * Internal function to push items onto the queue which is now a circular
392 * buffer. This should only be used once we begin working on the queue.
395 mergeq_push(mergeq_t
*mqp
, size_t slot
, void *item
)
397 VERIFY(MUTEX_HELD(&mqp
->mq_lock
));
398 VERIFY(slot
< mqp
->mq_cap
);
401 * We need to verify that we don't push over something that exists.
402 * Based on the design, this should never happen. However, in the face
403 * of bugs, anything is possible.
405 while (mqp
->mq_ncommit
!= slot
&& mqp
->mq_iserror
== B_FALSE
)
406 (void) cond_wait(&mqp
->mq_cond
, &mqp
->mq_lock
);
408 if (mqp
->mq_iserror
== B_TRUE
)
411 mqp
->mq_items
[slot
] = item
;
414 if (mqp
->mq_ncommit
== mqp
->mq_cap
) {
415 mqp
->mq_ncommit
%= mqp
->mq_cap
;
418 (void) cond_broadcast(&mqp
->mq_cond
);
422 mergeq_pop_one(mergeq_t
*mqp
)
427 * We can't move mq_nproc beyond mq_next if they're on the same
430 VERIFY(mqp
->mq_gnext
!= mqp
->mq_gnproc
||
431 mqp
->mq_nproc
!= mqp
->mq_next
);
433 out
= mqp
->mq_items
[mqp
->mq_nproc
];
435 mqp
->mq_items
[mqp
->mq_nproc
] = NULL
;
437 if (mqp
->mq_nproc
== mqp
->mq_cap
) {
438 mqp
->mq_nproc
%= mqp
->mq_cap
;
447 * Pop a set of two entries from the queue. There may be nothing to process
448 * at the moment, e.g. we may be waiting for someone to add something, in
449 * which case we'll wait until enough items are available.
452 mergeq_pop(mergeq_t
*mqp
, void **first
, void **second
)
454 VERIFY(MUTEX_HELD(&mqp
->mq_lock
));
455 VERIFY(mqp
->mq_nproc
< mqp
->mq_cap
);
457 while (mqp
->mq_nitems
< 2 && mqp
->mq_nactthrs
> 0 &&
458 mqp
->mq_iserror
== B_FALSE
)
459 (void) cond_wait(&mqp
->mq_cond
, &mqp
->mq_lock
);
461 if (mqp
->mq_iserror
== B_TRUE
)
464 if (mqp
->mq_nitems
< 2 && mqp
->mq_nactthrs
== 0) {
465 VERIFY(mqp
->mq_iserror
== B_TRUE
|| mqp
->mq_nitems
== 1);
468 VERIFY(mqp
->mq_nitems
>= 2);
470 *first
= mergeq_pop_one(mqp
);
471 *second
= mergeq_pop_one(mqp
);
477 mergeq_thr_merge(void *arg
)
481 VERIFY0(mutex_lock(&mqp
->mq_lock
));
484 * Check to make sure creation worked and if not, fail fast.
486 if (mqp
->mq_iserror
== B_TRUE
) {
487 VERIFY0(mutex_unlock(&mqp
->mq_lock
));
492 void *first
, *second
, *out
;
496 if (mqp
->mq_nitems
== 1 && mqp
->mq_nactthrs
== 0) {
497 VERIFY0(mutex_unlock(&mqp
->mq_lock
));
501 if (mergeq_pop(mqp
, &first
, &second
) == B_FALSE
) {
502 VERIFY0(mutex_unlock(&mqp
->mq_lock
));
505 slot
= mergeq_slot(mqp
);
509 VERIFY0(mutex_unlock(&mqp
->mq_lock
));
510 ret
= mqp
->mq_func(first
, second
, &out
, mqp
->mq_arg
);
511 VERIFY0(mutex_lock(&mqp
->mq_lock
));
514 if (mqp
->mq_iserror
== B_FALSE
) {
515 mqp
->mq_iserror
= B_TRUE
;
517 (void) cond_broadcast(&mqp
->mq_cond
);
520 VERIFY0(mutex_unlock(&mqp
->mq_lock
));
523 mergeq_push(mqp
, slot
, out
);
529 mergeq_merge(mergeq_t
*mqp
, mergeq_proc_f
*func
, void *arg
, void **outp
,
533 boolean_t seterr
= B_FALSE
;
535 if (mqp
== NULL
|| func
== NULL
|| outp
== NULL
) {
536 return (mergeq_error(EINVAL
));
539 VERIFY0(mutex_lock(&mqp
->mq_lock
));
540 if (mqp
->mq_working
== B_TRUE
) {
541 VERIFY0(mutex_unlock(&mqp
->mq_lock
));
542 return (mergeq_error(EBUSY
));
545 if (mqp
->mq_nitems
== 0) {
548 VERIFY0(mutex_unlock(&mqp
->mq_lock
));
553 * Now that we've finished adding items to the queue, turn it into a
559 mqp
->mq_working
= B_TRUE
;
560 if (mqp
->mq_next
== mqp
->mq_cap
) {
561 mqp
->mq_next
%= mqp
->mq_cap
;
564 mqp
->mq_ncommit
= mqp
->mq_next
;
567 for (i
= 0; i
< mqp
->mq_ndthreads
; i
++) {
568 ret
= thr_create(NULL
, 0, mergeq_thr_merge
, mqp
, 0,
571 mqp
->mq_iserror
= B_TRUE
;
576 VERIFY0(mutex_unlock(&mqp
->mq_lock
));
578 (void) mergeq_thr_merge(mqp
);
580 for (i
= 0; i
< mqp
->mq_ndthreads
; i
++) {
581 VERIFY0(thr_join(mqp
->mq_thrs
[i
], NULL
, NULL
));
584 VERIFY0(mutex_lock(&mqp
->mq_lock
));
586 VERIFY(mqp
->mq_nactthrs
== 0);
587 mqp
->mq_working
= B_FALSE
;
588 if (ret
== 0 && mqp
->mq_iserror
== B_FALSE
) {
589 VERIFY(mqp
->mq_nitems
== 1);
590 *outp
= mergeq_pop_one(mqp
);
591 } else if (ret
== 0 && mqp
->mq_iserror
== B_TRUE
) {
594 *errp
= mqp
->mq_error
;
600 VERIFY0(mutex_unlock(&mqp
->mq_lock
));
602 if (seterr
== B_TRUE
)
603 return (mergeq_error(ret
));