/*
 * Copyright (c) 2009 Pawel Jakub Dawidek <pjd@FreeBSD.org>
 * All rights reserved.
 *
 * Copyright (c) 2012 Spectra Logic Corporation. All rights reserved.
 *
 * Redistribution and use in source and binary forms, with or without
 * modification, are permitted provided that the following conditions
 * are met:
 * 1. Redistributions of source code must retain the above copyright
 *    notice, this list of conditions and the following disclaimer.
 * 2. Redistributions in binary form must reproduce the above copyright
 *    notice, this list of conditions and the following disclaimer in the
 *    documentation and/or other materials provided with the distribution.
 *
 * THIS SOFTWARE IS PROVIDED BY THE AUTHORS AND CONTRIBUTORS ``AS IS'' AND
 * ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
 * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
 * ARE DISCLAIMED.  IN NO EVENT SHALL THE AUTHORS OR CONTRIBUTORS BE LIABLE
 * FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
 * DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS
 * OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION)
 * HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
 * LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY
 * OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
 * SUCH DAMAGE.
 */
#include <sys/param.h>
#include <sys/kernel.h>
#include <sys/kmem.h>
#include <sys/lock.h>
#include <sys/mutex.h>
#include <sys/queue.h>
#include <sys/taskq.h>
#include <sys/taskqueue.h>
#include <sys/zfs_context.h>

#if defined(__i386__) || defined(__amd64__) || defined(__aarch64__)
#include <machine/pcb.h>
#endif

#include <vm/uma.h>
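
/*
 * This file implements the illumos-style taskq API for the OpenZFS FreeBSD
 * SPL by layering it on top of FreeBSD's native taskqueue(9) facility.
 */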
static uint_t taskq_tsd;
static uma_zone_t taskq_zone;
/*
 * Global system-wide dynamic task queue available for all consumers. This
 * taskq is not intended for long-running tasks; instead, a dedicated taskq
 * should be created.
 */
taskq_t *system_taskq = NULL;
taskq_t *system_delay_taskq = NULL;
taskq_t *dynamic_taskq = NULL;

proc_t *system_proc;
static MALLOC_DEFINE(M_TASKQ, "taskq", "taskq structures");
static LIST_HEAD(tqenthashhead, taskq_ent) *tqenthashtbl;
static unsigned long tqenthash;
static unsigned long tqenthashlock;
static struct sx *tqenthashtbl_lock;

static taskqid_t tqidnext;
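
/*
 * Each dispatched task is assigned a unique non-zero id and linked into a
 * hash table keyed on that id, so taskq_cancel_id() and taskq_wait_id() can
 * find the entry later. The table is protected by an array of sx locks,
 * selected by hashing the id.
 */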
#define	TQIDHASH(tqid)		(&tqenthashtbl[(tqid) & tqenthash])
#define	TQIDHASHLOCK(tqid)	(&tqenthashtbl_lock[((tqid) & tqenthashlock)])

/* Values for tqent_type. */
#define	NORMAL_TASK	0
#define	TIMEOUT_TASK	1
static void
system_taskq_init(void *arg)
{
	int i;

	tsd_create(&taskq_tsd, NULL);
	tqenthashtbl = hashinit(mp_ncpus * 8, M_TASKQ, &tqenthash);
	tqenthashlock = (tqenthash + 1) / 8;
	if (tqenthashlock > 0)
		tqenthashlock--;
	tqenthashtbl_lock =
	    malloc(sizeof (*tqenthashtbl_lock) * (tqenthashlock + 1),
	    M_TASKQ, M_WAITOK | M_ZERO);
	for (i = 0; i < tqenthashlock + 1; i++)
		sx_init_flags(&tqenthashtbl_lock[i], "tqenthash", SX_DUPOK);
	taskq_zone = uma_zcreate("taskq_zone", sizeof (taskq_ent_t),
	    NULL, NULL, NULL, NULL,
	    UMA_ALIGN_CACHE, 0);
	system_taskq = taskq_create("system_taskq", mp_ncpus, minclsyspri,
	    0, 0, 0);
	system_delay_taskq = taskq_create("system_delay_taskq", mp_ncpus,
	    minclsyspri, 0, 0, 0);
}
SYSINIT(system_taskq_init, SI_SUB_CONFIGURE, SI_ORDER_ANY, system_taskq_init,
    NULL);
static void
system_taskq_fini(void *arg)
{
	int i;

	taskq_destroy(system_delay_taskq);
	taskq_destroy(system_taskq);
	uma_zdestroy(taskq_zone);
	tsd_destroy(&taskq_tsd);
	for (i = 0; i < tqenthashlock + 1; i++)
		sx_destroy(&tqenthashtbl_lock[i]);
	for (i = 0; i < tqenthash + 1; i++)
		VERIFY(LIST_EMPTY(&tqenthashtbl[i]));
	free(tqenthashtbl_lock, M_TASKQ);
	free(tqenthashtbl, M_TASKQ);
}
SYSUNINIT(system_taskq_fini, SI_SUB_CONFIGURE, SI_ORDER_ANY, system_taskq_fini,
    NULL);
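
/*
 * Generate a unique, non-zero task id. An id of 0 is reserved to mean
 * "no task", so it is never handed out.
 */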
static taskqid_t
__taskq_genid(void)
{
	taskqid_t tqid;

#ifdef __LP64__
	/*
	 * Assume a 64-bit counter will not wrap in practice.
	 */
	tqid = atomic_add_64_nv(&tqidnext, 1);
	VERIFY(tqid);
	return (tqid);
#else
	/* A 32-bit counter can wrap; retry until the id is non-zero. */
	for (;;) {
		tqid = atomic_add_32_nv(&tqidnext, 1);
		if (__predict_true(tqid != 0))
			return (tqid);
	}
#endif
}
static taskq_ent_t *
taskq_lookup(taskqid_t tqid)
{
	taskq_ent_t *ent = NULL;

	if (tqid == 0)
		return (NULL);
	sx_slock(TQIDHASHLOCK(tqid));
	LIST_FOREACH(ent, TQIDHASH(tqid), tqent_hash) {
		if (ent->tqent_id == tqid)
			break;
	}
	if (ent != NULL)
		refcount_acquire(&ent->tqent_rc);
	sx_sunlock(TQIDHASHLOCK(tqid));
	return (ent);
}
static taskqid_t
taskq_insert(taskq_ent_t *ent)
{
	taskqid_t tqid = __taskq_genid();

	ent->tqent_id = tqid;
	sx_xlock(TQIDHASHLOCK(tqid));
	LIST_INSERT_HEAD(TQIDHASH(tqid), ent, tqent_hash);
	sx_xunlock(TQIDHASHLOCK(tqid));
	return (tqid);
}
static void
taskq_remove(taskq_ent_t *ent)
{
	taskqid_t tqid = ent->tqent_id;

	if (tqid == 0)
		return;
	sx_xlock(TQIDHASHLOCK(tqid));
	if (ent->tqent_id != 0) {
		LIST_REMOVE(ent, tqent_hash);
		ent->tqent_id = 0;
	}
	sx_xunlock(TQIDHASHLOCK(tqid));
}
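
/*
 * Thread-start/shutdown callback: record the owning taskq in TSD so
 * taskq_of_curthread() works, and (on x86 and arm64) allow the pool
 * threads to use the FPU, since SIMD-accelerated work may be dispatched
 * to them.
 */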
static void
taskq_tsd_set(void *context)
{
	taskq_t *tq = context;

#if defined(__amd64__) || defined(__i386__) || defined(__aarch64__)
	if (context != NULL && tsd_get(taskq_tsd) == NULL)
		fpu_kern_thread(FPU_KERN_NORMAL);
#endif
	tsd_set(taskq_tsd, tq);
}
static taskq_t *
taskq_create_impl(const char *name, int nthreads, pri_t pri,
    proc_t *proc __maybe_unused, uint_t flags)
{
	taskq_t *tq;

	if ((flags & TASKQ_THREADS_CPU_PCT) != 0)
		nthreads = MAX((mp_ncpus * nthreads) / 100, 1);

	tq = kmem_alloc(sizeof (*tq), KM_SLEEP);
	tq->tq_nthreads = nthreads;
	tq->tq_queue = taskqueue_create(name, M_WAITOK,
	    taskqueue_thread_enqueue, &tq->tq_queue);
	taskqueue_set_callback(tq->tq_queue, TASKQUEUE_CALLBACK_TYPE_INIT,
	    taskq_tsd_set, tq);
	taskqueue_set_callback(tq->tq_queue, TASKQUEUE_CALLBACK_TYPE_SHUTDOWN,
	    taskq_tsd_set, NULL);
	(void) taskqueue_start_threads_in_proc(&tq->tq_queue, nthreads, pri,
	    proc, "%s", name);

	return ((taskq_t *)tq);
}
taskq_t *
taskq_create(const char *name, int nthreads, pri_t pri, int minalloc __unused,
    int maxalloc __unused, uint_t flags)
{
	return (taskq_create_impl(name, nthreads, pri, system_proc, flags));
}
taskq_t *
taskq_create_proc(const char *name, int nthreads, pri_t pri,
    int minalloc __unused, int maxalloc __unused, proc_t *proc, uint_t flags)
{
	return (taskq_create_impl(name, nthreads, pri, proc, flags));
}
void
taskq_destroy(taskq_t *tq)
{
	taskqueue_free(tq->tq_queue);
	kmem_free(tq, sizeof (*tq));
}
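
/*
 * A minimal usage sketch (illustrative only; "my_cb" and "my_arg" are
 * hypothetical):
 *
 *	static void my_cb(void *arg) { ... }
 *
 *	taskq_t *tq = taskq_create("my_taskq", 4, minclsyspri, 4, 8, 0);
 *	(void) taskq_dispatch(tq, my_cb, my_arg, TQ_SLEEP);
 *	taskq_wait(tq);
 *	taskq_destroy(tq);
 */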
static void taskq_sync_assign(void *arg);

typedef struct taskq_sync_arg {
	kthread_t	*tqa_thread;
	kcondvar_t	tqa_cv;
	kmutex_t	tqa_lock;
	int		tqa_ready;
} taskq_sync_arg_t;
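
/*
 * Handshake run on each pool thread by taskq_create_synced(): record the
 * thread, signal the creator (tqa_ready 0 -> 1), then park until the
 * creator releases it (tqa_ready = 2).
 */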
static void
taskq_sync_assign(void *arg)
{
	taskq_sync_arg_t *tqa = arg;

	mutex_enter(&tqa->tqa_lock);
	tqa->tqa_thread = curthread;
	tqa->tqa_ready = 1;
	cv_signal(&tqa->tqa_cv);
	while (tqa->tqa_ready == 1)
		cv_wait(&tqa->tqa_cv, &tqa->tqa_lock);
	mutex_exit(&tqa->tqa_lock);
}
/*
 * Create a taskq with a specified number of pool threads. Allocate
 * and return an array of nthreads kthread_t pointers, one for each
 * thread in the pool. The array is not ordered and must be freed
 * by the caller.
 */
taskq_t *
taskq_create_synced(const char *name, int nthreads, pri_t pri,
    int minalloc, int maxalloc, uint_t flags, kthread_t ***ktpp)
{
	taskq_t *tq;
	taskq_sync_arg_t *tqs = kmem_zalloc(sizeof (*tqs) * nthreads, KM_SLEEP);
	kthread_t **kthreads = kmem_zalloc(sizeof (*kthreads) * nthreads,
	    KM_SLEEP);

	flags &= ~(TASKQ_DYNAMIC | TASKQ_THREADS_CPU_PCT | TASKQ_DC_BATCH);

	tq = taskq_create(name, nthreads, minclsyspri, nthreads, INT_MAX,
	    flags | TASKQ_PREPOPULATE);
	VERIFY(tq != NULL);
	VERIFY(tq->tq_nthreads == nthreads);

	/* spawn all syncthreads */
	for (int i = 0; i < nthreads; i++) {
		cv_init(&tqs[i].tqa_cv, NULL, CV_DEFAULT, NULL);
		mutex_init(&tqs[i].tqa_lock, NULL, MUTEX_DEFAULT, NULL);
		(void) taskq_dispatch(tq, taskq_sync_assign,
		    &tqs[i], TQ_FRONT);
	}

	/* wait on all syncthreads to start */
	for (int i = 0; i < nthreads; i++) {
		mutex_enter(&tqs[i].tqa_lock);
		while (tqs[i].tqa_ready == 0)
			cv_wait(&tqs[i].tqa_cv, &tqs[i].tqa_lock);
		mutex_exit(&tqs[i].tqa_lock);
	}

	/* let all syncthreads resume, finish */
	for (int i = 0; i < nthreads; i++) {
		mutex_enter(&tqs[i].tqa_lock);
		tqs[i].tqa_ready = 2;
		cv_broadcast(&tqs[i].tqa_cv);
		mutex_exit(&tqs[i].tqa_lock);
	}
	taskq_wait(tq);

	for (int i = 0; i < nthreads; i++) {
		kthreads[i] = tqs[i].tqa_thread;
		mutex_destroy(&tqs[i].tqa_lock);
		cv_destroy(&tqs[i].tqa_cv);
	}
	kmem_free(tqs, sizeof (*tqs) * nthreads);

	*ktpp = kthreads;
	return (tq);
}
int
taskq_member(taskq_t *tq, kthread_t *thread)
{
	return (taskqueue_member(tq->tq_queue, thread));
}
taskq_t *
taskq_of_curthread(void)
{
	return (tsd_get(taskq_tsd));
}
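
/*
 * Entry lifetime is reference counted: an entry is created with one
 * reference owned by the pending task itself, and taskq_lookup() takes
 * an extra hold. The entry is unhashed and returned to the zone only
 * when the last reference is released.
 */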
static void
taskq_free(taskq_ent_t *task)
{
	taskq_remove(task);
	if (refcount_release(&task->tqent_rc))
		uma_zfree(taskq_zone, task);
}
int
taskq_cancel_id(taskq_t *tq, taskqid_t tid)
{
	uint32_t pend;
	int rc;
	taskq_ent_t *ent;

	if ((ent = taskq_lookup(tid)) == NULL)
		return (ENOENT);

	if (ent->tqent_type == NORMAL_TASK) {
		rc = taskqueue_cancel(tq->tq_queue, &ent->tqent_task, &pend);
		if (rc == EBUSY)
			taskqueue_drain(tq->tq_queue, &ent->tqent_task);
	} else {
		rc = taskqueue_cancel_timeout(tq->tq_queue,
		    &ent->tqent_timeout_task, &pend);
		if (rc == EBUSY) {
			taskqueue_drain_timeout(tq->tq_queue,
			    &ent->tqent_timeout_task);
		}
	}
	if (pend) {
		/*
		 * Tasks normally free themselves when run, but here the task
		 * was cancelled so it did not free itself.
		 */
		taskq_free(ent);
	}
	/* Free the extra reference we added with taskq_lookup. */
	taskq_free(ent);
	return (pend ? 0 : ENOENT);
}
static void
taskq_run(void *arg, int pending)
{
	taskq_ent_t *task = arg;

	if (pending == 0)
		return;
	task->tqent_func(task->tqent_arg);
	taskq_free(task);
}
taskqid_t
taskq_dispatch_delay(taskq_t *tq, task_func_t func, void *arg,
    uint_t flags, clock_t expire_time)
{
	taskq_ent_t *task;
	taskqid_t tqid;
	clock_t timo;
	int mflag;

	timo = expire_time - ddi_get_lbolt();
	if (timo <= 0)
		return (taskq_dispatch(tq, func, arg, flags));

	if ((flags & (TQ_SLEEP | TQ_NOQUEUE)) == TQ_SLEEP)
		mflag = M_WAITOK;
	else
		mflag = M_NOWAIT;

	task = uma_zalloc(taskq_zone, mflag);
	if (task == NULL)
		return (0);
	task->tqent_func = func;
	task->tqent_arg = arg;
	task->tqent_type = TIMEOUT_TASK;
	refcount_init(&task->tqent_rc, 1);
	tqid = taskq_insert(task);
	TIMEOUT_TASK_INIT(tq->tq_queue, &task->tqent_timeout_task, 0,
	    taskq_run, task);

	taskqueue_enqueue_timeout(tq->tq_queue, &task->tqent_timeout_task,
	    timo);
	return (tqid);
}
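
/*
 * Illustrative only ("my_cb"/"my_arg" hypothetical; SEC_TO_TICK as
 * provided by the SPL time headers): schedule a callback ~5s from now,
 * then cancel it if it has not yet run.
 *
 *	taskqid_t id = taskq_dispatch_delay(tq, my_cb, my_arg, TQ_SLEEP,
 *	    ddi_get_lbolt() + SEC_TO_TICK(5));
 *	(void) taskq_cancel_id(tq, id);
 */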
taskqid_t
taskq_dispatch(taskq_t *tq, task_func_t func, void *arg, uint_t flags)
{
	taskq_ent_t *task;
	int mflag, prio;
	taskqid_t tqid;

	if ((flags & (TQ_SLEEP | TQ_NOQUEUE)) == TQ_SLEEP)
		mflag = M_WAITOK;
	else
		mflag = M_NOWAIT;
	/*
	 * If TQ_FRONT is given, we want higher priority for this task, so it
	 * can go at the front of the queue.
	 */
	prio = !!(flags & TQ_FRONT);

	task = uma_zalloc(taskq_zone, mflag);
	if (task == NULL)
		return (0);
	refcount_init(&task->tqent_rc, 1);
	task->tqent_func = func;
	task->tqent_arg = arg;
	task->tqent_type = NORMAL_TASK;
	tqid = taskq_insert(task);
	TASK_INIT(&task->tqent_task, prio, taskq_run, task);
	taskqueue_enqueue(tq->tq_queue, &task->tqent_task);
	return (tqid);
}
static void
taskq_run_ent(void *arg, int pending)
{
	taskq_ent_t *task = arg;

	if (pending == 0)
		return;
	task->tqent_func(task->tqent_arg);
}
void
taskq_dispatch_ent(taskq_t *tq, task_func_t func, void *arg, uint32_t flags,
    taskq_ent_t *task)
{
	/*
	 * If TQ_FRONT is given, we want higher priority for this task, so it
	 * can go at the front of the queue.
	 */
	task->tqent_task.ta_priority = !!(flags & TQ_FRONT);
	task->tqent_func = func;
	task->tqent_arg = arg;
	taskqueue_enqueue(tq->tq_queue, &task->tqent_task);
}
void
taskq_init_ent(taskq_ent_t *task)
{
	TASK_INIT(&task->tqent_task, 0, taskq_run_ent, task);
	task->tqent_func = NULL;
	task->tqent_arg = NULL;
	task->tqent_id = 0;
	task->tqent_type = NORMAL_TASK;
	refcount_init(&task->tqent_rc, 0);
}
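
/*
 * Preallocated-entry sketch (illustrative; "my_cb"/"my_arg" hypothetical).
 * The entry is caller-owned and is not freed by taskq_run_ent(), so no
 * allocation can fail at dispatch time:
 *
 *	taskq_ent_t ent;
 *
 *	taskq_init_ent(&ent);
 *	taskq_dispatch_ent(tq, my_cb, my_arg, 0, &ent);
 *	taskq_wait(tq);
 */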
int
taskq_empty_ent(taskq_ent_t *task)
{
	return (task->tqent_task.ta_pending == 0);
}
void
taskq_wait(taskq_t *tq)
{
	taskqueue_quiesce(tq->tq_queue);
}
void
taskq_wait_id(taskq_t *tq, taskqid_t tid)
{
	taskq_ent_t *ent;

	if ((ent = taskq_lookup(tid)) == NULL)
		return;

	if (ent->tqent_type == NORMAL_TASK)
		taskqueue_drain(tq->tq_queue, &ent->tqent_task);
	else
		taskqueue_drain_timeout(tq->tq_queue, &ent->tqent_timeout_task);
	taskq_free(ent);
}
void
taskq_wait_outstanding(taskq_t *tq, taskqid_t id __unused)
{
	taskqueue_drain_all(tq->tq_queue);
}