4 * The contents of this file are subject to the terms of the
5 * Common Development and Distribution License (the "License").
6 * You may not use this file except in compliance with the License.
8 * You can obtain a copy of the license at usr/src/OPENSOLARIS.LICENSE
9 * or http://www.opensolaris.org/os/licensing.
10 * See the License for the specific language governing permissions
11 * and limitations under the License.
13 * When distributing Covered Code, include this CDDL HEADER in each
14 * file and include the License file at usr/src/OPENSOLARIS.LICENSE.
15 * If applicable, add the following below this CDDL HEADER, with the
16 * fields enclosed by brackets "[]" replaced with your own identifying
17 * information: Portions Copyright [yyyy] [name of copyright owner]
22 * Copyright 2010 Sun Microsystems, Inc. All rights reserved.
23 * Use is subject to license terms.
26 * Copyright 2012 Garrett D'Amore <garrett@damore.org>. All rights reserved.
27 * Copyright 2013 Nexenta Systems, Inc. All rights reserved.
30 #include <sys/taskq_impl.h>
32 #include <sys/class.h>
33 #include <sys/debug.h>
34 #include <sys/ksynch.h>
37 #include <sys/systm.h>
38 #include <sys/sysmacros.h>
39 #include <sys/unistd.h>
41 /* avoid <unistd.h> */
42 extern long sysconf(int);
44 /* avoiding <thread.h> */
45 typedef unsigned int thread_t
;
46 typedef unsigned int thread_key_t
;
48 extern int thr_create(void *, size_t, void *(*)(void *), void *, long,
50 extern int thr_join(thread_t
, thread_t
*, void **);
54 * THR_BOUND is defined same as PTHREAD_SCOPE_SYSTEM in <pthread.h>
55 * THR_DETACHED is defined same as PTHREAD_CREATE_DETACHED in <pthread.h>
56 * Any changes in these definitions should be reflected in <pthread.h>
58 #define THR_BOUND 0x00000001 /* = PTHREAD_SCOPE_SYSTEM */
59 #define THR_NEW_LWP 0x00000002
60 #define THR_DETACHED 0x00000040 /* = PTHREAD_CREATE_DETACHED */
61 #define THR_SUSPENDED 0x00000080
62 #define THR_DAEMON 0x00000100
66 taskq_t
*system_taskq
;
68 #define TASKQ_ACTIVE 0x00010000
72 krwlock_t tq_threadlock
;
73 kcondvar_t tq_dispatch_cv
;
74 kcondvar_t tq_wait_cv
;
75 thread_t
*tq_threadlist
;
82 kcondvar_t tq_maxalloc_cv
;
84 taskq_ent_t
*tq_freelist
;
89 task_alloc(taskq_t
*tq
, int tqflags
)
94 again
: if ((t
= tq
->tq_freelist
) != NULL
&& tq
->tq_nalloc
>= tq
->tq_minalloc
) {
95 tq
->tq_freelist
= t
->tqent_next
;
97 if (tq
->tq_nalloc
>= tq
->tq_maxalloc
) {
98 if (!(tqflags
& KM_SLEEP
))
102 * We don't want to exceed tq_maxalloc, but we can't
103 * wait for other tasks to complete (and thus free up
104 * task structures) without risking deadlock with
105 * the caller. So, we just delay for one second
106 * to throttle the allocation rate. If we have tasks
107 * complete before one second timeout expires then
108 * taskq_ent_free will signal us and we will
109 * immediately retry the allocation.
111 tq
->tq_maxalloc_wait
++;
112 rv
= cv_timedwait(&tq
->tq_maxalloc_cv
,
113 &tq
->tq_lock
, ddi_get_lbolt() + hz
);
114 tq
->tq_maxalloc_wait
--;
116 goto again
; /* signaled */
118 mutex_exit(&tq
->tq_lock
);
120 t
= kmem_alloc(sizeof (taskq_ent_t
), tqflags
);
122 mutex_enter(&tq
->tq_lock
);
130 task_free(taskq_t
*tq
, taskq_ent_t
*t
)
132 if (tq
->tq_nalloc
<= tq
->tq_minalloc
) {
133 t
->tqent_next
= tq
->tq_freelist
;
137 mutex_exit(&tq
->tq_lock
);
138 kmem_free(t
, sizeof (taskq_ent_t
));
139 mutex_enter(&tq
->tq_lock
);
142 if (tq
->tq_maxalloc_wait
)
143 cv_signal(&tq
->tq_maxalloc_cv
);
147 taskq_dispatch(taskq_t
*tq
, task_func_t func
, void *arg
, uint_t tqflags
)
156 mutex_enter(&tq
->tq_lock
);
157 ASSERT(tq
->tq_flags
& TASKQ_ACTIVE
);
158 if ((t
= task_alloc(tq
, tqflags
)) == NULL
) {
159 mutex_exit(&tq
->tq_lock
);
162 if (tqflags
& TQ_FRONT
) {
163 t
->tqent_next
= tq
->tq_task
.tqent_next
;
164 t
->tqent_prev
= &tq
->tq_task
;
166 t
->tqent_next
= &tq
->tq_task
;
167 t
->tqent_prev
= tq
->tq_task
.tqent_prev
;
169 t
->tqent_next
->tqent_prev
= t
;
170 t
->tqent_prev
->tqent_next
= t
;
171 t
->tqent_func
= func
;
174 cv_signal(&tq
->tq_dispatch_cv
);
175 mutex_exit(&tq
->tq_lock
);
180 taskq_dispatch_ent(taskq_t
*tq
, task_func_t func
, void *arg
, uint_t flags
,
183 ASSERT(func
!= NULL
);
184 ASSERT(!(tq
->tq_flags
& TASKQ_DYNAMIC
));
187 * Mark it as a prealloc'd task. This is important
188 * to ensure that we don't free it later.
190 t
->tqent_flags
|= TQENT_FLAG_PREALLOC
;
192 * Enqueue the task to the underlying queue.
194 mutex_enter(&tq
->tq_lock
);
196 if (flags
& TQ_FRONT
) {
197 t
->tqent_next
= tq
->tq_task
.tqent_next
;
198 t
->tqent_prev
= &tq
->tq_task
;
200 t
->tqent_next
= &tq
->tq_task
;
201 t
->tqent_prev
= tq
->tq_task
.tqent_prev
;
203 t
->tqent_next
->tqent_prev
= t
;
204 t
->tqent_prev
->tqent_next
= t
;
205 t
->tqent_func
= func
;
207 cv_signal(&tq
->tq_dispatch_cv
);
208 mutex_exit(&tq
->tq_lock
);
212 taskq_wait(taskq_t
*tq
)
214 mutex_enter(&tq
->tq_lock
);
215 while (tq
->tq_task
.tqent_next
!= &tq
->tq_task
|| tq
->tq_active
!= 0)
216 cv_wait(&tq
->tq_wait_cv
, &tq
->tq_lock
);
217 mutex_exit(&tq
->tq_lock
);
221 taskq_thread(void *arg
)
227 mutex_enter(&tq
->tq_lock
);
228 while (tq
->tq_flags
& TASKQ_ACTIVE
) {
229 if ((t
= tq
->tq_task
.tqent_next
) == &tq
->tq_task
) {
230 if (--tq
->tq_active
== 0)
231 cv_broadcast(&tq
->tq_wait_cv
);
232 cv_wait(&tq
->tq_dispatch_cv
, &tq
->tq_lock
);
236 t
->tqent_prev
->tqent_next
= t
->tqent_next
;
237 t
->tqent_next
->tqent_prev
= t
->tqent_prev
;
238 t
->tqent_next
= NULL
;
239 t
->tqent_prev
= NULL
;
240 prealloc
= t
->tqent_flags
& TQENT_FLAG_PREALLOC
;
241 mutex_exit(&tq
->tq_lock
);
243 rw_enter(&tq
->tq_threadlock
, RW_READER
);
244 t
->tqent_func(t
->tqent_arg
);
245 rw_exit(&tq
->tq_threadlock
);
247 mutex_enter(&tq
->tq_lock
);
252 cv_broadcast(&tq
->tq_wait_cv
);
253 mutex_exit(&tq
->tq_lock
);
259 taskq_create(const char *name
, int nthr
, pri_t pri
, int minalloc
,
260 int maxalloc
, uint_t flags
)
262 return (taskq_create_proc(name
, nthr
, pri
,
263 minalloc
, maxalloc
, NULL
, flags
));
268 taskq_create_proc(const char *name
, int nthreads
, pri_t pri
,
269 int minalloc
, int maxalloc
, proc_t
*proc
, uint_t flags
)
271 taskq_t
*tq
= kmem_zalloc(sizeof (taskq_t
), KM_SLEEP
);
274 if (flags
& TASKQ_THREADS_CPU_PCT
) {
276 ASSERT3S(nthreads
, >=, 0);
277 ASSERT3S(nthreads
, <=, 100);
278 pct
= MIN(nthreads
, 100);
281 nthreads
= (sysconf(_SC_NPROCESSORS_ONLN
) * pct
) / 100;
282 nthreads
= MAX(nthreads
, 1); /* need at least 1 thread */
284 ASSERT3S(nthreads
, >=, 1);
287 rw_init(&tq
->tq_threadlock
, NULL
, RW_DEFAULT
, NULL
);
288 mutex_init(&tq
->tq_lock
, NULL
, MUTEX_DEFAULT
, NULL
);
289 cv_init(&tq
->tq_dispatch_cv
, NULL
, CV_DEFAULT
, NULL
);
290 cv_init(&tq
->tq_wait_cv
, NULL
, CV_DEFAULT
, NULL
);
291 cv_init(&tq
->tq_maxalloc_cv
, NULL
, CV_DEFAULT
, NULL
);
292 tq
->tq_flags
= flags
| TASKQ_ACTIVE
;
293 tq
->tq_active
= nthreads
;
294 tq
->tq_nthreads
= nthreads
;
295 tq
->tq_minalloc
= minalloc
;
296 tq
->tq_maxalloc
= maxalloc
;
297 tq
->tq_task
.tqent_next
= &tq
->tq_task
;
298 tq
->tq_task
.tqent_prev
= &tq
->tq_task
;
299 tq
->tq_threadlist
= kmem_alloc(nthreads
* sizeof (thread_t
), KM_SLEEP
);
301 if (flags
& TASKQ_PREPOPULATE
) {
302 mutex_enter(&tq
->tq_lock
);
303 while (minalloc
-- > 0)
304 task_free(tq
, task_alloc(tq
, KM_SLEEP
));
305 mutex_exit(&tq
->tq_lock
);
308 for (t
= 0; t
< nthreads
; t
++)
309 (void) thr_create(0, 0, taskq_thread
,
310 tq
, THR_BOUND
, &tq
->tq_threadlist
[t
]);
316 taskq_destroy(taskq_t
*tq
)
319 int nthreads
= tq
->tq_nthreads
;
323 mutex_enter(&tq
->tq_lock
);
325 tq
->tq_flags
&= ~TASKQ_ACTIVE
;
326 cv_broadcast(&tq
->tq_dispatch_cv
);
328 while (tq
->tq_nthreads
!= 0)
329 cv_wait(&tq
->tq_wait_cv
, &tq
->tq_lock
);
332 while (tq
->tq_nalloc
!= 0) {
333 ASSERT(tq
->tq_freelist
!= NULL
);
334 task_free(tq
, task_alloc(tq
, KM_SLEEP
));
337 mutex_exit(&tq
->tq_lock
);
339 for (t
= 0; t
< nthreads
; t
++)
340 (void) thr_join(tq
->tq_threadlist
[t
], NULL
, NULL
);
342 kmem_free(tq
->tq_threadlist
, nthreads
* sizeof (thread_t
));
344 rw_destroy(&tq
->tq_threadlock
);
345 mutex_destroy(&tq
->tq_lock
);
346 cv_destroy(&tq
->tq_dispatch_cv
);
347 cv_destroy(&tq
->tq_wait_cv
);
348 cv_destroy(&tq
->tq_maxalloc_cv
);
350 kmem_free(tq
, sizeof (taskq_t
));
354 taskq_member(taskq_t
*tq
, struct _kthread
*t
)
361 for (i
= 0; i
< tq
->tq_nthreads
; i
++)
362 if (tq
->tq_threadlist
[i
] == (thread_t
)(uintptr_t)t
)
369 system_taskq_init(void)
371 system_taskq
= taskq_create("system_taskq", 64, minclsyspri
, 4, 512,
372 TASKQ_DYNAMIC
| TASKQ_PREPOPULATE
);
376 system_taskq_fini(void)
378 taskq_destroy(system_taskq
);
379 system_taskq
= NULL
; /* defensive */