/*	$NetBSD: subr_workqueue.c,v 1.29 2009/10/21 21:12:06 rmind Exp $	*/

/*-
 * Copyright (c)2002, 2005, 2006, 2007 YAMAMOTO Takashi,
 * All rights reserved.
 *
 * Redistribution and use in source and binary forms, with or without
 * modification, are permitted provided that the following conditions
 * are met:
 * 1. Redistributions of source code must retain the above copyright
 *    notice, this list of conditions and the following disclaimer.
 * 2. Redistributions in binary form must reproduce the above copyright
 *    notice, this list of conditions and the following disclaimer in the
 *    documentation and/or other materials provided with the distribution.
 *
 * THIS SOFTWARE IS PROVIDED BY THE AUTHOR AND CONTRIBUTORS ``AS IS'' AND
 * ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
 * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
 * ARE DISCLAIMED.  IN NO EVENT SHALL THE AUTHOR OR CONTRIBUTORS BE LIABLE
 * FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
 * DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS
 * OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION)
 * HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
 * LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY
 * OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
 * SUCH DAMAGE.
 */
#include <sys/cdefs.h>
__KERNEL_RCSID(0, "$NetBSD: subr_workqueue.c,v 1.29 2009/10/21 21:12:06 rmind Exp $");

#include <sys/param.h>
#include <sys/cpu.h>
#include <sys/systm.h>
#include <sys/kthread.h>
#include <sys/kmem.h>
#include <sys/proc.h>
#include <sys/workqueue.h>
#include <sys/mutex.h>
#include <sys/condvar.h>
#include <sys/queue.h>

#include <uvm/uvm_extern.h>
typedef struct work_impl {
	SIMPLEQ_ENTRY(work_impl) wk_entry;
} work_impl_t;

SIMPLEQ_HEAD(workqhead, work_impl);

struct workqueue_queue {
	kmutex_t q_mutex;
	kcondvar_t q_cv;
	struct workqhead q_queue;
	lwp_t *q_worker;
};

struct workqueue {
	void (*wq_func)(struct work *, void *);
	void *wq_arg;
	int wq_flags;

	const char *wq_name;
	pri_t wq_prio;
	void *wq_ptr;
};
#define	WQ_SIZE		(roundup2(sizeof(struct workqueue), coherency_unit))
#define	WQ_QUEUE_SIZE	(roundup2(sizeof(struct workqueue_queue), coherency_unit))

#define	POISON	0xaabbccdd
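
/*
 * Layout of the single allocation made in workqueue_create(), as assumed
 * by workqueue_queue_lookup() (descriptive sketch):
 *
 *	wq_ptr:	possibly unaligned start of the kmem allocation
 *	wq:	struct workqueue, padded to coherency_unit (WQ_SIZE)
 *	wq + WQ_SIZE + idx * WQ_QUEUE_SIZE:
 *		struct workqueue_queue for CPU "idx" (a single queue at
 *		idx 0 unless WQ_PERCPU is set)
 *
 * Rounding everything up to coherency_unit keeps the per-CPU queues on
 * separate cache lines.
 */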
static size_t
workqueue_size(int flags)
{

	return WQ_SIZE
	    + ((flags & WQ_PERCPU) != 0 ? ncpu : 1) * WQ_QUEUE_SIZE
	    + coherency_unit;
}
static struct workqueue_queue *
workqueue_queue_lookup(struct workqueue *wq, struct cpu_info *ci)
{
	u_int idx = 0;

	if (wq->wq_flags & WQ_PERCPU) {
		idx = ci ? cpu_index(ci) : cpu_index(curcpu());
	}

	return (void *)((uintptr_t)(wq) + WQ_SIZE + (idx * WQ_QUEUE_SIZE));
}
static void
workqueue_runlist(struct workqueue *wq, struct workqhead *list)
{
	work_impl_t *wk;
	work_impl_t *next;

	/*
	 * note that "list" is not a complete SIMPLEQ.
	 */

	for (wk = SIMPLEQ_FIRST(list); wk != NULL; wk = next) {
		next = SIMPLEQ_NEXT(wk, wk_entry);
		(*wq->wq_func)((void *)wk, wq->wq_arg);
	}
}
static void
workqueue_worker(void *cookie)
{
	struct workqueue *wq = cookie;
	struct workqueue_queue *q;

	/* find the workqueue of this kthread */
	q = workqueue_queue_lookup(wq, curlwp->l_cpu);

	for (;;) {
		struct workqhead tmp;

		/*
		 * we violate abstraction of SIMPLEQ.
		 */

#if defined(DIAGNOSTIC)
		tmp.sqh_last = (void *)POISON;
#endif /* defined(DIAGNOSTIC) */

		mutex_enter(&q->q_mutex);
		while (SIMPLEQ_EMPTY(&q->q_queue))
			cv_wait(&q->q_cv, &q->q_mutex);
		tmp.sqh_first = q->q_queue.sqh_first; /* XXX */
		SIMPLEQ_INIT(&q->q_queue);
		mutex_exit(&q->q_mutex);

		workqueue_runlist(wq, &tmp);
	}
}
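
/*
 * Note on the worker loop above: the worker detaches the whole pending list
 * in one step by copying sqh_first and re-initializing the queue head, so
 * q_mutex is never held across callbacks.  tmp.sqh_last is deliberately left
 * poisoned under DIAGNOSTIC: workqueue_runlist() only walks the list forward
 * and never appends to it, so the tail pointer is unused.
 */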
static void
workqueue_init(struct workqueue *wq, const char *name,
    void (*callback_func)(struct work *, void *), void *callback_arg,
    pri_t prio, int ipl)
{

	wq->wq_name = name;
	wq->wq_prio = prio;
	wq->wq_func = callback_func;
	wq->wq_arg = callback_arg;
}
static int
workqueue_initqueue(struct workqueue *wq, struct workqueue_queue *q,
    int ipl, struct cpu_info *ci)
{
	int error, ktf;

	KASSERT(q->q_worker == NULL);

	mutex_init(&q->q_mutex, MUTEX_DEFAULT, ipl);
	cv_init(&q->q_cv, wq->wq_name);
	SIMPLEQ_INIT(&q->q_queue);
	ktf = ((wq->wq_flags & WQ_MPSAFE) != 0 ? KTHREAD_MPSAFE : 0);
	if (ci) {
		error = kthread_create(wq->wq_prio, ktf, ci, workqueue_worker,
		    wq, &q->q_worker, "%s/%u", wq->wq_name, ci->ci_index);
	} else {
		error = kthread_create(wq->wq_prio, ktf, ci, workqueue_worker,
		    wq, &q->q_worker, "%s", wq->wq_name);
	}
	if (error != 0) {
		mutex_destroy(&q->q_mutex);
		cv_destroy(&q->q_cv);
		KASSERT(q->q_worker == NULL);
	}
	return error;
}
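
/*
 * Termination protocol (descriptive note): workqueue_destroy() points
 * wq_func at workqueue_exit() and, for each queue that has a worker,
 * workqueue_finiqueue() enqueues a sentinel work (wqe_wk below).  When the
 * worker dequeues it, workqueue_exit() clears q_worker, signals q_cv and
 * terminates the kthread; workqueue_finiqueue() waits until q_worker
 * becomes NULL before tearing the queue down.
 */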
struct workqueue_exitargs {
	work_impl_t wqe_wk;
	struct workqueue_queue *wqe_q;
};
static void
workqueue_exit(struct work *wk, void *arg)
{
	struct workqueue_exitargs *wqe = (void *)wk;
	struct workqueue_queue *q = wqe->wqe_q;

	/*
	 * only competition at this point is workqueue_finiqueue.
	 */

	KASSERT(q->q_worker == curlwp);
	KASSERT(SIMPLEQ_EMPTY(&q->q_queue));
	mutex_enter(&q->q_mutex);
	q->q_worker = NULL;
	cv_signal(&q->q_cv);
	mutex_exit(&q->q_mutex);

	kthread_exit(0);
}
static void
workqueue_finiqueue(struct workqueue *wq, struct workqueue_queue *q)
{
	struct workqueue_exitargs wqe;

	KASSERT(wq->wq_func == workqueue_exit);

	wqe.wqe_q = q;
	KASSERT(SIMPLEQ_EMPTY(&q->q_queue));
	KASSERT(q->q_worker != NULL);
	mutex_enter(&q->q_mutex);
	SIMPLEQ_INSERT_TAIL(&q->q_queue, &wqe.wqe_wk, wk_entry);
	cv_signal(&q->q_cv);
	while (q->q_worker != NULL) {
		cv_wait(&q->q_cv, &q->q_mutex);
	}
	mutex_exit(&q->q_mutex);
	mutex_destroy(&q->q_mutex);
	cv_destroy(&q->q_cv);
}
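
/*
 * Exported interface.
 *
 * A typical caller embeds a struct work in its own structure and does
 * something like the following (illustrative sketch only; the "frob"
 * names are hypothetical):
 *
 *	error = workqueue_create(&frob_wq, "frobwq", frob_callback, NULL,
 *	    PRI_NONE, IPL_NONE, WQ_MPSAFE);
 *	...
 *	workqueue_enqueue(frob_wq, &frob->frob_work, NULL);
 *	...
 *	workqueue_destroy(frob_wq);
 */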
int
workqueue_create(struct workqueue **wqp, const char *name,
    void (*callback_func)(struct work *, void *), void *callback_arg,
    pri_t prio, int ipl, int flags)
{
	struct workqueue *wq;
	struct workqueue_queue *q;
	void *ptr;
	int error = 0;

	CTASSERT(sizeof(work_impl_t) <= sizeof(struct work));

	ptr = kmem_zalloc(workqueue_size(flags), KM_SLEEP);
	wq = (void *)roundup2((uintptr_t)ptr, coherency_unit);
	wq->wq_ptr = ptr;
	wq->wq_flags = flags;

	workqueue_init(wq, name, callback_func, callback_arg, prio, ipl);

	if (flags & WQ_PERCPU) {
		struct cpu_info *ci;
		CPU_INFO_ITERATOR cii;

		/* create the work-queue for each CPU */
		for (CPU_INFO_FOREACH(cii, ci)) {
			q = workqueue_queue_lookup(wq, ci);
			error = workqueue_initqueue(wq, q, ipl, ci);
			if (error) {
				break;
			}
		}
	} else {
		/* initialize a work-queue */
		q = workqueue_queue_lookup(wq, NULL);
		error = workqueue_initqueue(wq, q, ipl, NULL);
	}

	if (error != 0) {
		workqueue_destroy(wq);
	} else {
		*wqp = wq;
	}

	return error;
}
void
workqueue_destroy(struct workqueue *wq)
{
	struct workqueue_queue *q;
	struct cpu_info *ci;
	CPU_INFO_ITERATOR cii;

	wq->wq_func = workqueue_exit;
	for (CPU_INFO_FOREACH(cii, ci)) {
		q = workqueue_queue_lookup(wq, ci);
		if (q->q_worker != NULL) {
			workqueue_finiqueue(wq, q);
		}
	}
	kmem_free(wq->wq_ptr, workqueue_size(wq->wq_flags));
}
void
workqueue_enqueue(struct workqueue *wq, struct work *wk0, struct cpu_info *ci)
{
	struct workqueue_queue *q;
	work_impl_t *wk = (void *)wk0;

	KASSERT(wq->wq_flags & WQ_PERCPU || ci == NULL);
	q = workqueue_queue_lookup(wq, ci);

	mutex_enter(&q->q_mutex);
	SIMPLEQ_INSERT_TAIL(&q->q_queue, wk, wk_entry);
	cv_signal(&q->q_cv);
	mutex_exit(&q->q_mutex);
}