Merge remote-tracking branch 'origin/master'
[unleashed/lotheac.git] / usr / src / lib / mergeq / workq.c
blobb9f1f2aa1c62253e3cb6f91b2c5b340ce83d857c
/*
 * This file and its contents are supplied under the terms of the
 * Common Development and Distribution License ("CDDL"), version 1.0.
 * You may only use this file in accordance with the terms of version
 * 1.0 of the CDDL.
 *
 * A full copy of the text of the CDDL should have accompanied this
 * source. A copy of the CDDL is also available via the Internet at
 * http://www.illumos.org/license/CDDL.
 */

/*
 * Copyright 2015 Joyent, Inc.
 */

/*
 * Work queue
 *
 * A multi-threaded work queue.
 *
 * The general design of this is to add a fixed number of items to the queue and
 * then drain them with the specified number of threads.
 */
25 #include <strings.h>
26 #include <sys/debug.h>
27 #include <thread.h>
28 #include <synch.h>
29 #include <errno.h>
30 #include <limits.h>
31 #include <stdlib.h>
33 #include "workq.h"
35 struct workq {
36 mutex_t wq_lock; /* Protects below items */
37 cond_t wq_cond; /* Condition variable */
38 void **wq_items; /* Array of items to process */
39 size_t wq_nitems; /* Number of items in queue */
40 size_t wq_cap; /* Queue capacity */
41 size_t wq_next; /* Next item to process */
42 uint_t wq_ndthreads; /* Desired number of threads */
43 thread_t *wq_thrs; /* Actual threads */
44 workq_proc_f *wq_func; /* Processing function */
45 void *wq_arg; /* Argument for processing */
46 boolean_t wq_working; /* Are we actively using it? */
47 boolean_t wq_iserror; /* Have we encountered an error? */
48 int wq_error; /* Error value, if any */
51 #define WORKQ_DEFAULT_CAP 64
53 static int
54 workq_error(int err)
56 VERIFY(err != 0);
57 errno = err;
58 return (WORKQ_ERROR);
61 void
62 workq_fini(workq_t *wqp)
64 if (wqp == NULL)
65 return;
67 VERIFY(wqp->wq_working != B_TRUE);
68 VERIFY0(mutex_destroy(&wqp->wq_lock));
69 VERIFY0(cond_destroy(&wqp->wq_cond));
70 if (wqp->wq_cap > 0)
71 workq_free(wqp->wq_items, sizeof (void *) * wqp->wq_cap);
72 if (wqp->wq_ndthreads > 0)
73 workq_free(wqp->wq_thrs, sizeof (thread_t) * wqp->wq_ndthreads);
74 workq_free(wqp, sizeof (workq_t));
77 int
78 workq_init(workq_t **outp, uint_t nthrs)
80 int ret;
81 workq_t *wqp;
83 wqp = workq_alloc(sizeof (workq_t));
84 if (wqp == NULL)
85 return (workq_error(ENOMEM));
87 bzero(wqp, sizeof (workq_t));
88 wqp->wq_items = workq_alloc(sizeof (void *) * WORKQ_DEFAULT_CAP);
89 if (wqp->wq_items == NULL) {
90 workq_free(wqp, sizeof (workq_t));
91 return (workq_error(ENOMEM));
93 bzero(wqp->wq_items, sizeof (void *) * WORKQ_DEFAULT_CAP);
95 wqp->wq_ndthreads = nthrs - 1;
96 if (wqp->wq_ndthreads > 0) {
97 wqp->wq_thrs = workq_alloc(sizeof (thread_t) *
98 wqp->wq_ndthreads);
99 if (wqp->wq_thrs == NULL) {
100 workq_free(wqp->wq_items, sizeof (void *) *
101 WORKQ_DEFAULT_CAP);
102 workq_free(wqp, sizeof (workq_t));
103 return (workq_error(ENOMEM));
107 if ((ret = mutex_init(&wqp->wq_lock, USYNC_THREAD | LOCK_ERRORCHECK,
108 NULL)) != 0) {
109 if (wqp->wq_ndthreads > 0) {
110 workq_free(wqp->wq_thrs,
111 sizeof (thread_t) * wqp->wq_ndthreads);
113 workq_free(wqp->wq_items, sizeof (void *) * WORKQ_DEFAULT_CAP);
114 workq_free(wqp, sizeof (workq_t));
115 return (workq_error(ret));
118 if ((ret = cond_init(&wqp->wq_cond, USYNC_THREAD, NULL)) != 0) {
119 VERIFY0(mutex_destroy(&wqp->wq_lock));
120 if (wqp->wq_ndthreads > 0) {
121 workq_free(wqp->wq_thrs,
122 sizeof (thread_t) * wqp->wq_ndthreads);
124 workq_free(wqp->wq_items, sizeof (void *) * WORKQ_DEFAULT_CAP);
125 workq_free(wqp, sizeof (workq_t));
126 return (workq_error(ret));
129 wqp->wq_cap = WORKQ_DEFAULT_CAP;
130 *outp = wqp;
131 return (0);
134 static void
135 workq_reset(workq_t *wqp)
137 VERIFY(MUTEX_HELD(&wqp->wq_lock));
138 VERIFY(wqp->wq_working == B_FALSE);
139 if (wqp->wq_cap > 0)
140 bzero(wqp->wq_items, sizeof (void *) * wqp->wq_cap);
141 wqp->wq_nitems = 0;
142 wqp->wq_next = 0;
143 wqp->wq_func = NULL;
144 wqp->wq_arg = NULL;
145 wqp->wq_iserror = B_FALSE;
146 wqp->wq_error = 0;
149 static int
150 workq_grow(workq_t *wqp)
152 size_t ncap;
153 void **items;
155 VERIFY(MUTEX_HELD(&wqp->wq_lock));
156 VERIFY(wqp->wq_working == B_FALSE);
158 if (SIZE_MAX - wqp->wq_cap < WORKQ_DEFAULT_CAP)
159 return (ENOSPC);
161 ncap = wqp->wq_cap + WORKQ_DEFAULT_CAP;
162 items = workq_alloc(ncap * sizeof (void *));
163 if (items == NULL)
164 return (ENOMEM);
166 bzero(items, ncap * sizeof (void *));
167 bcopy(wqp->wq_items, items, wqp->wq_cap * sizeof (void *));
168 workq_free(wqp->wq_items, sizeof (void *) * wqp->wq_cap);
169 wqp->wq_items = items;
170 wqp->wq_cap = ncap;
171 return (0);
175 workq_add(workq_t *wqp, void *item)
177 VERIFY0(mutex_lock(&wqp->wq_lock));
178 if (wqp->wq_working == B_TRUE) {
179 VERIFY0(mutex_unlock(&wqp->wq_lock));
180 return (workq_error(ENXIO));
183 if (wqp->wq_nitems == wqp->wq_cap) {
184 int ret;
186 if ((ret = workq_grow(wqp)) != 0) {
187 VERIFY0(mutex_unlock(&wqp->wq_lock));
188 return (workq_error(ret));
192 wqp->wq_items[wqp->wq_nitems] = item;
193 wqp->wq_nitems++;
195 VERIFY0(mutex_unlock(&wqp->wq_lock));
197 return (0);
200 static void *
201 workq_pop(workq_t *wqp)
203 void *out;
205 VERIFY(MUTEX_HELD(&wqp->wq_lock));
206 VERIFY(wqp->wq_next < wqp->wq_nitems);
208 out = wqp->wq_items[wqp->wq_next];
209 wqp->wq_items[wqp->wq_next] = NULL;
210 wqp->wq_next++;
212 return (out);
215 static void *
216 workq_thr_work(void *arg)
218 workq_t *wqp = arg;
220 VERIFY0(mutex_lock(&wqp->wq_lock));
221 VERIFY(wqp->wq_working == B_TRUE);
223 for (;;) {
224 int ret;
225 void *item;
227 if (wqp->wq_iserror == B_TRUE ||
228 wqp->wq_next == wqp->wq_nitems) {
229 VERIFY0(mutex_unlock(&wqp->wq_lock));
230 return (NULL);
233 item = workq_pop(wqp);
235 VERIFY0(mutex_unlock(&wqp->wq_lock));
236 ret = wqp->wq_func(item, wqp->wq_arg);
237 VERIFY0(mutex_lock(&wqp->wq_lock));
239 if (ret != 0) {
240 if (wqp->wq_iserror == B_FALSE) {
241 wqp->wq_iserror = B_TRUE;
242 wqp->wq_error = ret;
244 VERIFY0(mutex_unlock(&wqp->wq_lock));
245 return (NULL);
251 workq_work(workq_t *wqp, workq_proc_f *func, void *arg, int *errp)
253 int i, ret;
254 boolean_t seterr = B_FALSE;
256 if (wqp == NULL || func == NULL)
257 return (workq_error(EINVAL));
259 VERIFY0(mutex_lock(&wqp->wq_lock));
260 if (wqp->wq_working == B_TRUE) {
261 VERIFY0(mutex_unlock(&wqp->wq_lock));
262 return (workq_error(EBUSY));
265 if (wqp->wq_nitems == 0) {
266 workq_reset(wqp);
267 VERIFY0(mutex_unlock(&wqp->wq_lock));
268 return (0);
271 wqp->wq_func = func;
272 wqp->wq_arg = arg;
273 wqp->wq_next = 0;
274 wqp->wq_working = B_TRUE;
276 ret = 0;
277 for (i = 0; i < wqp->wq_ndthreads; i++) {
278 ret = thr_create(NULL, 0, workq_thr_work, wqp, 0,
279 &wqp->wq_thrs[i]);
280 if (ret != 0) {
281 wqp->wq_iserror = B_TRUE;
285 VERIFY0(mutex_unlock(&wqp->wq_lock));
286 if (ret == 0)
287 (void) workq_thr_work(wqp);
289 for (i = 0; i < wqp->wq_ndthreads; i++) {
290 VERIFY0(thr_join(wqp->wq_thrs[i], NULL, NULL));
293 VERIFY0(mutex_lock(&wqp->wq_lock));
294 wqp->wq_working = B_FALSE;
295 if (ret == 0 && wqp->wq_iserror == B_TRUE) {
296 ret = WORKQ_UERROR;
297 if (errp != NULL)
298 *errp = wqp->wq_error;
299 } else if (ret != 0) {
300 VERIFY(wqp->wq_iserror == B_FALSE);
301 seterr = B_TRUE;
304 workq_reset(wqp);
305 VERIFY0(mutex_unlock(&wqp->wq_lock));
307 if (seterr == B_TRUE)
308 return (workq_error(ret));
310 return (ret);