/*
 * Copyright (c) 2009 Pawel Jakub Dawidek <pjd@FreeBSD.org>
 * All rights reserved.
 *
 * Copyright (c) 2012 Spectra Logic Corporation. All rights reserved.
 *
 * Redistribution and use in source and binary forms, with or without
 * modification, are permitted provided that the following conditions
 * are met:
 * 1. Redistributions of source code must retain the above copyright
 *    notice, this list of conditions and the following disclaimer.
 * 2. Redistributions in binary form must reproduce the above copyright
 *    notice, this list of conditions and the following disclaimer in the
 *    documentation and/or other materials provided with the distribution.
 *
 * THIS SOFTWARE IS PROVIDED BY THE AUTHORS AND CONTRIBUTORS ``AS IS'' AND
 * ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
 * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
 * ARE DISCLAIMED. IN NO EVENT SHALL THE AUTHORS OR CONTRIBUTORS BE LIABLE
 * FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
 * DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS
 * OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION)
 * HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
 * LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY
 * OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
 * SUCH DAMAGE.
 */
#include <sys/cdefs.h>
__FBSDID("$FreeBSD$");

#include <sys/param.h>
#include <sys/ck.h>
#include <sys/epoch.h>
#include <sys/kernel.h>
#include <sys/kmem.h>
#include <sys/lock.h>
#include <sys/mutex.h>
#include <sys/queue.h>
#include <sys/taskq.h>
#include <sys/taskqueue.h>
#include <sys/zfs_context.h>

#if defined(__i386__) || defined(__amd64__) || defined(__aarch64__)
#include <machine/pcb.h>
#endif

#include <vm/uma.h>
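/*
 * Compatibility shim: older kernels lack taskqueue_start_threads_in_proc(),
 * so fall back to taskqueue_start_threads() and ignore the proc argument.
 */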
#if __FreeBSD_version < 1201522
#define	taskqueue_start_threads_in_proc(tqp, count, pri, proc, name, ...) \
	taskqueue_start_threads(tqp, count, pri, name, __VA_ARGS__)
#endif
static uint_t taskq_tsd;
static uma_zone_t taskq_zone;
/*
 * Global system-wide dynamic task queue available for all consumers. This
 * taskq is not intended for long-running tasks; instead, a dedicated taskq
 * should be created.
 */
taskq_t *system_taskq = NULL;
taskq_t *system_delay_taskq = NULL;
taskq_t *dynamic_taskq = NULL;

extern int uma_align_cache;
static MALLOC_DEFINE(M_TASKQ, "taskq", "taskq structures");

static CK_LIST_HEAD(tqenthashhead, taskq_ent) *tqenthashtbl;
static unsigned long tqenthash;
static unsigned long tqenthashlock;
static struct sx *tqenthashtbl_lock;

static taskqid_t tqidnext;

#define	TQIDHASH(tqid)		(&tqenthashtbl[(tqid) & tqenthash])
#define	TQIDHASHLOCK(tqid)	(&tqenthashtbl_lock[((tqid) & tqenthashlock)])

#define	NORMAL_TASK	0
#define	TIMEOUT_TASK	1
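/*
 * Dispatched tasks are tracked in a hash table keyed by taskqid so that
 * taskq_cancel_id() and taskq_wait_id() can recover the taskq_ent_t later.
 * The table is sized from mp_ncpus at boot and protected by an array of
 * sx locks, one lock per group of eight buckets.
 */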
static void
system_taskq_init(void *arg)
{
	int i;

	tsd_create(&taskq_tsd, NULL);
	tqenthashtbl = hashinit(mp_ncpus * 8, M_TASKQ, &tqenthash);
	tqenthashlock = (tqenthash + 1) / 8;
	if (tqenthashlock > 0)
		tqenthashlock--;
	tqenthashtbl_lock =
	    malloc(sizeof (*tqenthashtbl_lock) * (tqenthashlock + 1),
	    M_TASKQ, M_WAITOK | M_ZERO);
	for (i = 0; i < tqenthashlock + 1; i++)
		sx_init_flags(&tqenthashtbl_lock[i], "tqenthash", SX_DUPOK);
	taskq_zone = uma_zcreate("taskq_zone", sizeof (taskq_ent_t),
	    NULL, NULL, NULL, NULL,
	    UMA_ALIGN_CACHE, 0);
	system_taskq = taskq_create("system_taskq", mp_ncpus, minclsyspri,
	    0, 0, 0);
	system_delay_taskq = taskq_create("system_delay_taskq", mp_ncpus,
	    minclsyspri, 0, 0, 0);
}
SYSINIT(system_taskq_init, SI_SUB_CONFIGURE, SI_ORDER_ANY, system_taskq_init,
    NULL);
static void
system_taskq_fini(void *arg)
{
	int i;

	taskq_destroy(system_delay_taskq);
	taskq_destroy(system_taskq);
	uma_zdestroy(taskq_zone);
	tsd_destroy(&taskq_tsd);
	for (i = 0; i < tqenthashlock + 1; i++)
		sx_destroy(&tqenthashtbl_lock[i]);
	for (i = 0; i < tqenthash + 1; i++)
		VERIFY(CK_LIST_EMPTY(&tqenthashtbl[i]));
	free(tqenthashtbl_lock, M_TASKQ);
	free(tqenthashtbl, M_TASKQ);
}
SYSUNINIT(system_taskq_fini, SI_SUB_CONFIGURE, SI_ORDER_ANY, system_taskq_fini,
    NULL);
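/*
 * Generate a new nonzero taskqid.  Zero is reserved to mean "no task"
 * (failed dispatches return it), so the 32-bit variant loops until the
 * counter is nonzero, while a 64-bit counter is simply assumed never to
 * wrap.
 */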
#ifdef __LP64__
static taskqid_t
__taskq_genid(void)
{
	taskqid_t tqid;

	/*
	 * Assume a 64-bit counter will not wrap in practice.
	 */
	tqid = atomic_add_64_nv(&tqidnext, 1);
	VERIFY(tqid);
	return (tqid);
}
#else
static taskqid_t
__taskq_genid(void)
{
	taskqid_t tqid;

	for (;;) {
		tqid = atomic_add_32_nv(&tqidnext, 1);
		if (__predict_true(tqid != 0))
			break;
	}
	VERIFY(tqid);
	return (tqid);
}
#endif
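/*
 * Find a registered task by id and take a reference on it.  The caller is
 * responsible for dropping the reference with taskq_free().
 */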
static taskq_ent_t *
taskq_lookup(taskqid_t tqid)
{
	taskq_ent_t *ent = NULL;

	sx_xlock(TQIDHASHLOCK(tqid));
	CK_LIST_FOREACH(ent, TQIDHASH(tqid), tqent_hash) {
		if (ent->tqent_id == tqid)
			break;
	}
	if (ent != NULL)
		refcount_acquire(&ent->tqent_rc);
	sx_xunlock(TQIDHASHLOCK(tqid));
	return (ent);
}
static taskqid_t
taskq_insert(taskq_ent_t *ent)
{
	taskqid_t tqid;

	tqid = __taskq_genid();
	ent->tqent_id = tqid;
	ent->tqent_registered = B_TRUE;
	sx_xlock(TQIDHASHLOCK(tqid));
	CK_LIST_INSERT_HEAD(TQIDHASH(tqid), ent, tqent_hash);
	sx_xunlock(TQIDHASHLOCK(tqid));
	return (tqid);
}
static void
taskq_remove(taskq_ent_t *ent)
{
	taskqid_t tqid = ent->tqent_id;

	if (!ent->tqent_registered)
		return;

	sx_xlock(TQIDHASHLOCK(tqid));
	CK_LIST_REMOVE(ent, tqent_hash);
	sx_xunlock(TQIDHASHLOCK(tqid));
	ent->tqent_registered = B_FALSE;
}
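/*
 * Taskqueue thread init/shutdown callback.  Each worker thread records
 * which taskq it serves in TSD; on x86 and arm64 the thread is also
 * flagged as an FPU/SIMD user so ZFS can use vector instructions from
 * task context.
 */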
static void
taskq_tsd_set(void *context)
{
	taskq_t *tq = context;

#if defined(__amd64__) || defined(__i386__) || defined(__aarch64__)
	if (context != NULL && tsd_get(taskq_tsd) == NULL)
		fpu_kern_thread(FPU_KERN_NORMAL);
#endif
	tsd_set(taskq_tsd, tq);
}
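/*
 * A taskq is a thin wrapper around a FreeBSD taskqueue(9) running in the
 * given proc.  With TASKQ_THREADS_CPU_PCT, nthreads is interpreted as a
 * percentage of mp_ncpus rather than an absolute thread count.
 */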
static taskq_t *
taskq_create_impl(const char *name, int nthreads, pri_t pri,
    proc_t *proc __maybe_unused, uint_t flags)
{
	taskq_t *tq;

	if ((flags & TASKQ_THREADS_CPU_PCT) != 0)
		nthreads = MAX((mp_ncpus * nthreads) / 100, 1);

	tq = kmem_alloc(sizeof (*tq), KM_SLEEP);
	tq->tq_queue = taskqueue_create(name, M_WAITOK,
	    taskqueue_thread_enqueue, &tq->tq_queue);
	taskqueue_set_callback(tq->tq_queue, TASKQUEUE_CALLBACK_TYPE_INIT,
	    taskq_tsd_set, tq);
	taskqueue_set_callback(tq->tq_queue, TASKQUEUE_CALLBACK_TYPE_SHUTDOWN,
	    taskq_tsd_set, NULL);
	(void) taskqueue_start_threads_in_proc(&tq->tq_queue, nthreads, pri,
	    proc, "%s", name);

	return ((taskq_t *)tq);
}
taskq_t *
taskq_create(const char *name, int nthreads, pri_t pri, int minalloc __unused,
    int maxalloc __unused, uint_t flags)
{
	return (taskq_create_impl(name, nthreads, pri, system_proc, flags));
}

taskq_t *
taskq_create_proc(const char *name, int nthreads, pri_t pri,
    int minalloc __unused, int maxalloc __unused, proc_t *proc, uint_t flags)
{
	return (taskq_create_impl(name, nthreads, pri, proc, flags));
}
void
taskq_destroy(taskq_t *tq)
{

	taskqueue_free(tq->tq_queue);
	kmem_free(tq, sizeof (*tq));
}

int
taskq_member(taskq_t *tq, kthread_t *thread)
{

	return (taskqueue_member(tq->tq_queue, thread));
}

taskq_t *
taskq_of_curthread(void)
{
	return (tsd_get(taskq_tsd));
}
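/*
 * Drop a reference on a task entry: unhash it if it is still registered
 * and return it to the UMA zone once the last reference is released.
 */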
static void
taskq_free(taskq_ent_t *task)
{
	taskq_remove(task);
	if (refcount_release(&task->tqent_rc))
		uma_zfree(taskq_zone, task);
}
int
taskq_cancel_id(taskq_t *tq, taskqid_t tid)
{
	uint32_t pend;
	int rc;
	taskq_ent_t *ent;

	if (tid == 0)
		return (0);

	if ((ent = taskq_lookup(tid)) == NULL)
		return (0);

	ent->tqent_cancelled = B_TRUE;
	if (ent->tqent_type == TIMEOUT_TASK) {
		rc = taskqueue_cancel_timeout(tq->tq_queue,
		    &ent->tqent_timeout_task, &pend);
	} else
		rc = taskqueue_cancel(tq->tq_queue, &ent->tqent_task, &pend);
	if (rc == EBUSY) {
		taskqueue_drain(tq->tq_queue, &ent->tqent_task);
	} else if (pend) {
		/*
		 * Tasks normally free themselves when run, but here the task
		 * was cancelled so it did not free itself.
		 */
		taskq_free(ent);
	}
	/* Free the extra reference we added with taskq_lookup. */
	taskq_free(ent);
	return (rc);
}
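/*
 * Trampoline run by the taskqueue thread: invoke the SPL-level function
 * unless the task was cancelled, then drop the dispatch reference.
 */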
static void
taskq_run(void *arg, int pending __unused)
{
	taskq_ent_t *task = arg;

	if (!task->tqent_cancelled)
		task->tqent_func(task->tqent_arg);
	taskq_free(task);
}
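/*
 * Dispatch func(arg) to run once expire_time (in lbolt ticks) has passed.
 * A deadline that is already in the past degenerates to an immediate
 * taskq_dispatch().  Returns the new taskqid, or 0 on allocation failure.
 */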
taskqid_t
taskq_dispatch_delay(taskq_t *tq, task_func_t func, void *arg,
    uint_t flags, clock_t expire_time)
{
	taskq_ent_t *task;
	taskqid_t tqid;
	clock_t timo;
	int mflag;

	timo = expire_time - ddi_get_lbolt();
	if (timo <= 0)
		return (taskq_dispatch(tq, func, arg, flags));

	if ((flags & (TQ_SLEEP | TQ_NOQUEUE)) == TQ_SLEEP)
		mflag = M_WAITOK;
	else
		mflag = M_NOWAIT;

	task = uma_zalloc(taskq_zone, mflag);
	if (task == NULL)
		return (0);
	task->tqent_func = func;
	task->tqent_arg = arg;
	task->tqent_type = TIMEOUT_TASK;
	task->tqent_cancelled = B_FALSE;
	refcount_init(&task->tqent_rc, 1);
	tqid = taskq_insert(task);
	TIMEOUT_TASK_INIT(tq->tq_queue, &task->tqent_timeout_task, 0,
	    taskq_run, task);

	taskqueue_enqueue_timeout(tq->tq_queue, &task->tqent_timeout_task,
	    timo);
	return (tqid);
}
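/*
 * Example usage (a sketch; my_func and my_arg are hypothetical):
 *
 *	taskqid_t id;
 *
 *	id = taskq_dispatch(system_taskq, my_func, my_arg, TQ_SLEEP);
 *	if (id != 0)
 *		taskq_wait_id(system_taskq, id);
 */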
taskqid_t
taskq_dispatch(taskq_t *tq, task_func_t func, void *arg, uint_t flags)
{
	taskq_ent_t *task;
	int mflag, prio;
	taskqid_t tqid;

	if ((flags & (TQ_SLEEP | TQ_NOQUEUE)) == TQ_SLEEP)
		mflag = M_WAITOK;
	else
		mflag = M_NOWAIT;
	/*
	 * If TQ_FRONT is given, we want higher priority for this task, so it
	 * can go at the front of the queue.
	 */
	prio = !!(flags & TQ_FRONT);

	task = uma_zalloc(taskq_zone, mflag);
	if (task == NULL)
		return (0);
	refcount_init(&task->tqent_rc, 1);
	task->tqent_func = func;
	task->tqent_arg = arg;
	task->tqent_cancelled = B_FALSE;
	task->tqent_type = NORMAL_TASK;
	tqid = taskq_insert(task);
	TASK_INIT(&task->tqent_task, prio, taskq_run, task);
	taskqueue_enqueue(tq->tq_queue, &task->tqent_task);
	return (tqid);
}
static void
taskq_run_ent(void *arg, int pending __unused)
{
	taskq_ent_t *task = arg;

	task->tqent_func(task->tqent_arg);
}
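/*
 * Dispatch using a caller-supplied taskq_ent_t.  Preallocated entries are
 * never registered in the id hash (tqent_id stays 0), so they cannot be
 * cancelled or waited on by id and are never returned to the UMA zone.
 */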
void
taskq_dispatch_ent(taskq_t *tq, task_func_t func, void *arg, uint32_t flags,
    taskq_ent_t *task)
{
	int prio;

	/*
	 * If TQ_FRONT is given, we want higher priority for this task, so it
	 * can go at the front of the queue.
	 */
	prio = !!(flags & TQ_FRONT);

	task->tqent_cancelled = B_FALSE;
	task->tqent_registered = B_FALSE;
	task->tqent_id = 0;
	task->tqent_func = func;
	task->tqent_arg = arg;

	TASK_INIT(&task->tqent_task, prio, taskq_run_ent, task);
	taskqueue_enqueue(tq->tq_queue, &task->tqent_task);
}
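/*
 * The wait primitives map directly onto taskqueue(9): taskq_wait()
 * quiesces the queue, taskq_wait_id() drains a single looked-up task, and
 * taskq_wait_outstanding() drains everything currently enqueued.
 */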
void
taskq_wait(taskq_t *tq)
{
	taskqueue_quiesce(tq->tq_queue);
}

void
taskq_wait_id(taskq_t *tq, taskqid_t tid)
{
	taskq_ent_t *ent;

	if (tid == 0)
		return;
	if ((ent = taskq_lookup(tid)) == NULL)
		return;

	taskqueue_drain(tq->tq_queue, &ent->tqent_task);
	taskq_free(ent);
}

void
taskq_wait_outstanding(taskq_t *tq, taskqid_t id __unused)
{
	taskqueue_drain_all(tq->tq_queue);
}

int
taskq_empty_ent(taskq_ent_t *t)
{
	return (t->tqent_task.ta_pending == 0);
}