2 * This file and its contents are supplied under the terms of the
3 * Common Development and Distribution License ("CDDL"), version 1.0.
4 * You may only use this file in accordance with the terms of version
7 * A full copy of the text of the CDDL should have accompanied this
8 * source. A copy of the CDDL is also available via the Internet at
9 * http://www.illumos.org/license/CDDL.
13 * Copyright 2015 Joyent, Inc.
19 * A multi-threaded work queue.
21 * The general design of this is to add a fixed number of items to the queue and
22 * then drain them with the specified number of threads.
26 #include <sys/debug.h>
36 mutex_t wq_lock
; /* Protects below items */
37 cond_t wq_cond
; /* Condition variable */
38 void **wq_items
; /* Array of items to process */
39 size_t wq_nitems
; /* Number of items in queue */
40 size_t wq_cap
; /* Queue capacity */
41 size_t wq_next
; /* Next item to process */
42 uint_t wq_ndthreads
; /* Desired number of threads */
43 thread_t
*wq_thrs
; /* Actual threads */
44 workq_proc_f
*wq_func
; /* Processing function */
45 void *wq_arg
; /* Argument for processing */
46 boolean_t wq_working
; /* Are we actively using it? */
47 boolean_t wq_iserror
; /* Have we encountered an error? */
48 int wq_error
; /* Error value, if any */
51 #define WORKQ_DEFAULT_CAP 64
62 workq_fini(workq_t
*wqp
)
67 VERIFY(wqp
->wq_working
!= B_TRUE
);
68 VERIFY0(mutex_destroy(&wqp
->wq_lock
));
69 VERIFY0(cond_destroy(&wqp
->wq_cond
));
71 workq_free(wqp
->wq_items
, sizeof (void *) * wqp
->wq_cap
);
72 if (wqp
->wq_ndthreads
> 0)
73 workq_free(wqp
->wq_thrs
, sizeof (thread_t
) * wqp
->wq_ndthreads
);
74 workq_free(wqp
, sizeof (workq_t
));
78 workq_init(workq_t
**outp
, uint_t nthrs
)
83 wqp
= workq_alloc(sizeof (workq_t
));
85 return (workq_error(ENOMEM
));
87 bzero(wqp
, sizeof (workq_t
));
88 wqp
->wq_items
= workq_alloc(sizeof (void *) * WORKQ_DEFAULT_CAP
);
89 if (wqp
->wq_items
== NULL
) {
90 workq_free(wqp
, sizeof (workq_t
));
91 return (workq_error(ENOMEM
));
93 bzero(wqp
->wq_items
, sizeof (void *) * WORKQ_DEFAULT_CAP
);
95 wqp
->wq_ndthreads
= nthrs
- 1;
96 if (wqp
->wq_ndthreads
> 0) {
97 wqp
->wq_thrs
= workq_alloc(sizeof (thread_t
) *
99 if (wqp
->wq_thrs
== NULL
) {
100 workq_free(wqp
->wq_items
, sizeof (void *) *
102 workq_free(wqp
, sizeof (workq_t
));
103 return (workq_error(ENOMEM
));
107 if ((ret
= mutex_init(&wqp
->wq_lock
, USYNC_THREAD
| LOCK_ERRORCHECK
,
109 if (wqp
->wq_ndthreads
> 0) {
110 workq_free(wqp
->wq_thrs
,
111 sizeof (thread_t
) * wqp
->wq_ndthreads
);
113 workq_free(wqp
->wq_items
, sizeof (void *) * WORKQ_DEFAULT_CAP
);
114 workq_free(wqp
, sizeof (workq_t
));
115 return (workq_error(ret
));
118 if ((ret
= cond_init(&wqp
->wq_cond
, USYNC_THREAD
, NULL
)) != 0) {
119 VERIFY0(mutex_destroy(&wqp
->wq_lock
));
120 if (wqp
->wq_ndthreads
> 0) {
121 workq_free(wqp
->wq_thrs
,
122 sizeof (thread_t
) * wqp
->wq_ndthreads
);
124 workq_free(wqp
->wq_items
, sizeof (void *) * WORKQ_DEFAULT_CAP
);
125 workq_free(wqp
, sizeof (workq_t
));
126 return (workq_error(ret
));
129 wqp
->wq_cap
= WORKQ_DEFAULT_CAP
;
135 workq_reset(workq_t
*wqp
)
137 VERIFY(MUTEX_HELD(&wqp
->wq_lock
));
138 VERIFY(wqp
->wq_working
== B_FALSE
);
140 bzero(wqp
->wq_items
, sizeof (void *) * wqp
->wq_cap
);
145 wqp
->wq_iserror
= B_FALSE
;
150 workq_grow(workq_t
*wqp
)
155 VERIFY(MUTEX_HELD(&wqp
->wq_lock
));
156 VERIFY(wqp
->wq_working
== B_FALSE
);
158 if (SIZE_MAX
- wqp
->wq_cap
< WORKQ_DEFAULT_CAP
)
161 ncap
= wqp
->wq_cap
+ WORKQ_DEFAULT_CAP
;
162 items
= workq_alloc(ncap
* sizeof (void *));
166 bzero(items
, ncap
* sizeof (void *));
167 bcopy(wqp
->wq_items
, items
, wqp
->wq_cap
* sizeof (void *));
168 workq_free(wqp
->wq_items
, sizeof (void *) * wqp
->wq_cap
);
169 wqp
->wq_items
= items
;
175 workq_add(workq_t
*wqp
, void *item
)
177 VERIFY0(mutex_lock(&wqp
->wq_lock
));
178 if (wqp
->wq_working
== B_TRUE
) {
179 VERIFY0(mutex_unlock(&wqp
->wq_lock
));
180 return (workq_error(ENXIO
));
183 if (wqp
->wq_nitems
== wqp
->wq_cap
) {
186 if ((ret
= workq_grow(wqp
)) != 0) {
187 VERIFY0(mutex_unlock(&wqp
->wq_lock
));
188 return (workq_error(ret
));
192 wqp
->wq_items
[wqp
->wq_nitems
] = item
;
195 VERIFY0(mutex_unlock(&wqp
->wq_lock
));
201 workq_pop(workq_t
*wqp
)
205 VERIFY(MUTEX_HELD(&wqp
->wq_lock
));
206 VERIFY(wqp
->wq_next
< wqp
->wq_nitems
);
208 out
= wqp
->wq_items
[wqp
->wq_next
];
209 wqp
->wq_items
[wqp
->wq_next
] = NULL
;
216 workq_thr_work(void *arg
)
220 VERIFY0(mutex_lock(&wqp
->wq_lock
));
221 VERIFY(wqp
->wq_working
== B_TRUE
);
227 if (wqp
->wq_iserror
== B_TRUE
||
228 wqp
->wq_next
== wqp
->wq_nitems
) {
229 VERIFY0(mutex_unlock(&wqp
->wq_lock
));
233 item
= workq_pop(wqp
);
235 VERIFY0(mutex_unlock(&wqp
->wq_lock
));
236 ret
= wqp
->wq_func(item
, wqp
->wq_arg
);
237 VERIFY0(mutex_lock(&wqp
->wq_lock
));
240 if (wqp
->wq_iserror
== B_FALSE
) {
241 wqp
->wq_iserror
= B_TRUE
;
244 VERIFY0(mutex_unlock(&wqp
->wq_lock
));
251 workq_work(workq_t
*wqp
, workq_proc_f
*func
, void *arg
, int *errp
)
254 boolean_t seterr
= B_FALSE
;
256 if (wqp
== NULL
|| func
== NULL
)
257 return (workq_error(EINVAL
));
259 VERIFY0(mutex_lock(&wqp
->wq_lock
));
260 if (wqp
->wq_working
== B_TRUE
) {
261 VERIFY0(mutex_unlock(&wqp
->wq_lock
));
262 return (workq_error(EBUSY
));
265 if (wqp
->wq_nitems
== 0) {
267 VERIFY0(mutex_unlock(&wqp
->wq_lock
));
274 wqp
->wq_working
= B_TRUE
;
277 for (i
= 0; i
< wqp
->wq_ndthreads
; i
++) {
278 ret
= thr_create(NULL
, 0, workq_thr_work
, wqp
, 0,
281 wqp
->wq_iserror
= B_TRUE
;
285 VERIFY0(mutex_unlock(&wqp
->wq_lock
));
287 (void) workq_thr_work(wqp
);
289 for (i
= 0; i
< wqp
->wq_ndthreads
; i
++) {
290 VERIFY0(thr_join(wqp
->wq_thrs
[i
], NULL
, NULL
));
293 VERIFY0(mutex_lock(&wqp
->wq_lock
));
294 wqp
->wq_working
= B_FALSE
;
295 if (ret
== 0 && wqp
->wq_iserror
== B_TRUE
) {
298 *errp
= wqp
->wq_error
;
299 } else if (ret
!= 0) {
300 VERIFY(wqp
->wq_iserror
== B_FALSE
);
305 VERIFY0(mutex_unlock(&wqp
->wq_lock
));
307 if (seterr
== B_TRUE
)
308 return (workq_error(ret
));