/*
 * The contents of this file are subject to the terms of the
 * Common Development and Distribution License (the "License").
 * You may not use this file except in compliance with the License.
 *
 * You can obtain a copy of the license at usr/src/OPENSOLARIS.LICENSE
 * or http://www.opensolaris.org/os/licensing.
 * See the License for the specific language governing permissions
 * and limitations under the License.
 *
 * When distributing Covered Code, include this CDDL HEADER in each
 * file and include the License file at usr/src/OPENSOLARIS.LICENSE.
 * If applicable, add the following below this CDDL HEADER, with the
 * fields enclosed by brackets "[]" replaced with your own identifying
 * information: Portions Copyright [yyyy] [name of copyright owner]
 */

/*
 * Copyright 2008 Sun Microsystems, Inc.  All rights reserved.
 * Use is subject to license terms.
 */
#pragma ident	"%Z%%M%	%I%	%E% SMI"

#include "thr_uberdata.h"
#include <errno.h>
#include <signal.h>
#include "thread_pool_impl.h"
36 static mutex_t thread_pool_lock
= DEFAULTMUTEX
;
37 static tpool_t
*thread_pools
= NULL
;
40 delete_pool(tpool_t
*tpool
)
44 ASSERT(tpool
->tp_current
== 0 && tpool
->tp_active
== NULL
);
47 * Unlink the pool from the global list of all pools.
49 lmutex_lock(&thread_pool_lock
);
50 if (thread_pools
== tpool
)
51 thread_pools
= tpool
->tp_forw
;
52 if (thread_pools
== tpool
)
55 tpool
->tp_back
->tp_forw
= tpool
->tp_forw
;
56 tpool
->tp_forw
->tp_back
= tpool
->tp_back
;
58 lmutex_unlock(&thread_pool_lock
);
61 * There should be no pending jobs, but just in case...
63 for (job
= tpool
->tp_head
; job
!= NULL
; job
= tpool
->tp_head
) {
64 tpool
->tp_head
= job
->tpj_next
;
65 lfree(job
, sizeof (*job
));
67 (void) pthread_attr_destroy(&tpool
->tp_attr
);
68 lfree(tpool
, sizeof (*tpool
));
72 * Worker thread is terminating.
75 worker_cleanup(tpool_t
*tpool
)
77 ASSERT(MUTEX_HELD(&tpool
->tp_mutex
));
79 if (--tpool
->tp_current
== 0 &&
80 (tpool
->tp_flags
& (TP_DESTROY
| TP_ABANDON
))) {
81 if (tpool
->tp_flags
& TP_ABANDON
) {
82 sig_mutex_unlock(&tpool
->tp_mutex
);
86 if (tpool
->tp_flags
& TP_DESTROY
)
87 (void) cond_broadcast(&tpool
->tp_busycv
);
89 sig_mutex_unlock(&tpool
->tp_mutex
);
93 notify_waiters(tpool_t
*tpool
)
95 if (tpool
->tp_head
== NULL
&& tpool
->tp_active
== NULL
) {
96 tpool
->tp_flags
&= ~TP_WAIT
;
97 (void) cond_broadcast(&tpool
->tp_waitcv
);
102 * Called by a worker thread on return from a tpool_dispatch()d job.
105 job_cleanup(tpool_t
*tpool
)
107 pthread_t my_tid
= pthread_self();
108 tpool_active_t
*activep
;
109 tpool_active_t
**activepp
;
111 sig_mutex_lock(&tpool
->tp_mutex
);
113 for (activepp
= &tpool
->tp_active
;; activepp
= &activep
->tpa_next
) {
115 if (activep
->tpa_tid
== my_tid
) {
116 *activepp
= activep
->tpa_next
;
120 if (tpool
->tp_flags
& TP_WAIT
)
121 notify_waiters(tpool
);
125 tpool_worker(void *arg
)
127 tpool_t
*tpool
= (tpool_t
*)arg
;
130 void (*func
)(void *);
131 tpool_active_t active
;
133 sig_mutex_lock(&tpool
->tp_mutex
);
134 pthread_cleanup_push(worker_cleanup
, tpool
);
137 * This is the worker's main loop.
138 * It will only be left if a timeout or an error has occured.
140 active
.tpa_tid
= pthread_self();
144 if (tpool
->tp_flags
& TP_WAIT
)
145 notify_waiters(tpool
);
146 while ((tpool
->tp_head
== NULL
||
147 (tpool
->tp_flags
& TP_SUSPEND
)) &&
148 !(tpool
->tp_flags
& (TP_DESTROY
| TP_ABANDON
))) {
149 if (tpool
->tp_current
<= tpool
->tp_minimum
||
150 tpool
->tp_linger
== 0) {
151 (void) sig_cond_wait(&tpool
->tp_workcv
,
156 timeout
.tv_sec
= tpool
->tp_linger
;
158 if (sig_cond_reltimedwait(&tpool
->tp_workcv
,
159 &tpool
->tp_mutex
, &timeout
) != 0) {
166 if (tpool
->tp_flags
& TP_DESTROY
)
168 if (tpool
->tp_flags
& TP_ABANDON
) {
169 /* can't abandon a suspended pool */
170 if (tpool
->tp_flags
& TP_SUSPEND
) {
171 tpool
->tp_flags
&= ~TP_SUSPEND
;
172 (void) cond_broadcast(&tpool
->tp_workcv
);
174 if (tpool
->tp_head
== NULL
)
177 if ((job
= tpool
->tp_head
) != NULL
&&
178 !(tpool
->tp_flags
& TP_SUSPEND
)) {
180 func
= job
->tpj_func
;
182 tpool
->tp_head
= job
->tpj_next
;
183 if (job
== tpool
->tp_tail
)
184 tpool
->tp_tail
= NULL
;
186 active
.tpa_next
= tpool
->tp_active
;
187 tpool
->tp_active
= &active
;
188 sig_mutex_unlock(&tpool
->tp_mutex
);
189 pthread_cleanup_push(job_cleanup
, tpool
);
190 lfree(job
, sizeof (*job
));
192 * Call the specified function.
196 * We don't know what this thread has been doing,
197 * so we reset its signal mask and cancellation
198 * state back to the initial values.
200 (void) pthread_sigmask(SIG_SETMASK
, &maskset
, NULL
);
201 (void) pthread_setcanceltype(PTHREAD_CANCEL_DEFERRED
,
203 (void) pthread_setcancelstate(PTHREAD_CANCEL_ENABLE
,
205 pthread_cleanup_pop(1);
207 if (elapsed
&& tpool
->tp_current
> tpool
->tp_minimum
) {
209 * We timed out and there is no work to be done
210 * and the number of workers exceeds the minimum.
211 * Exit now to reduce the size of the pool.
216 pthread_cleanup_pop(1);
221 * Create a worker thread, with all signals blocked.
224 create_worker(tpool_t
*tpool
)
229 (void) pthread_sigmask(SIG_SETMASK
, &maskset
, &oset
);
230 error
= pthread_create(NULL
, &tpool
->tp_attr
, tpool_worker
, tpool
);
231 (void) pthread_sigmask(SIG_SETMASK
, &oset
, NULL
);
236 tpool_create(uint_t min_threads
, uint_t max_threads
, uint_t linger
,
237 pthread_attr_t
*attr
)
245 if (min_threads
> max_threads
|| max_threads
< 1) {
250 if (pthread_attr_getstack(attr
, &stackaddr
, &stacksize
) != 0) {
255 * Allow only one thread in the pool with a specified stack.
256 * Require threads to have at least the minimum stack size.
258 minstack
= thr_min_stack();
259 if (stackaddr
!= NULL
) {
260 if (stacksize
< minstack
|| max_threads
!= 1) {
264 } else if (stacksize
!= 0 && stacksize
< minstack
) {
270 tpool
= lmalloc(sizeof (*tpool
));
275 (void) mutex_init(&tpool
->tp_mutex
, USYNC_THREAD
, NULL
);
276 (void) cond_init(&tpool
->tp_busycv
, USYNC_THREAD
, NULL
);
277 (void) cond_init(&tpool
->tp_workcv
, USYNC_THREAD
, NULL
);
278 (void) cond_init(&tpool
->tp_waitcv
, USYNC_THREAD
, NULL
);
279 tpool
->tp_minimum
= min_threads
;
280 tpool
->tp_maximum
= max_threads
;
281 tpool
->tp_linger
= linger
;
284 * We cannot just copy the attribute pointer.
285 * We need to initialize a new pthread_attr_t structure
286 * with the values from the user-supplied pthread_attr_t.
287 * If the attribute pointer is NULL, we need to initialize
288 * the new pthread_attr_t structure with default values.
290 error
= pthread_attr_clone(&tpool
->tp_attr
, attr
);
292 lfree(tpool
, sizeof (*tpool
));
297 /* make all pool threads be detached daemon threads */
298 (void) pthread_attr_setdetachstate(&tpool
->tp_attr
,
299 PTHREAD_CREATE_DETACHED
);
300 (void) pthread_attr_setdaemonstate_np(&tpool
->tp_attr
,
301 PTHREAD_CREATE_DAEMON_NP
);
303 /* insert into the global list of all thread pools */
304 lmutex_lock(&thread_pool_lock
);
305 if (thread_pools
== NULL
) {
306 tpool
->tp_forw
= tpool
;
307 tpool
->tp_back
= tpool
;
308 thread_pools
= tpool
;
310 thread_pools
->tp_back
->tp_forw
= tpool
;
311 tpool
->tp_forw
= thread_pools
;
312 tpool
->tp_back
= thread_pools
->tp_back
;
313 thread_pools
->tp_back
= tpool
;
315 lmutex_unlock(&thread_pool_lock
);
321 * Dispatch a work request to the thread pool.
322 * If there are idle workers, awaken one.
323 * Else, if the maximum number of workers has
324 * not been reached, spawn a new worker thread.
325 * Else just return with the job added to the queue.
328 tpool_dispatch(tpool_t
*tpool
, void (*func
)(void *), void *arg
)
332 ASSERT(!(tpool
->tp_flags
& (TP_DESTROY
| TP_ABANDON
)));
334 if ((job
= lmalloc(sizeof (*job
))) == NULL
)
336 job
->tpj_next
= NULL
;
337 job
->tpj_func
= func
;
340 sig_mutex_lock(&tpool
->tp_mutex
);
342 if (tpool
->tp_head
== NULL
)
343 tpool
->tp_head
= job
;
345 tpool
->tp_tail
->tpj_next
= job
;
346 tpool
->tp_tail
= job
;
349 if (!(tpool
->tp_flags
& TP_SUSPEND
)) {
350 if (tpool
->tp_idle
> 0)
351 (void) cond_signal(&tpool
->tp_workcv
);
352 else if (tpool
->tp_current
< tpool
->tp_maximum
&&
353 create_worker(tpool
) == 0)
357 sig_mutex_unlock(&tpool
->tp_mutex
);
362 * Assumes: by the time tpool_destroy() is called no one will use this
363 * thread pool in any way and no one will try to dispatch entries to it.
364 * Calling tpool_destroy() from a job in the pool will cause deadlock.
367 tpool_destroy(tpool_t
*tpool
)
369 tpool_active_t
*activep
;
371 ASSERT(!tpool_member(tpool
));
372 ASSERT(!(tpool
->tp_flags
& (TP_DESTROY
| TP_ABANDON
)));
374 sig_mutex_lock(&tpool
->tp_mutex
);
375 pthread_cleanup_push(sig_mutex_unlock
, &tpool
->tp_mutex
);
377 /* mark the pool as being destroyed; wakeup idle workers */
378 tpool
->tp_flags
|= TP_DESTROY
;
379 tpool
->tp_flags
&= ~TP_SUSPEND
;
380 (void) cond_broadcast(&tpool
->tp_workcv
);
382 /* cancel all active workers */
383 for (activep
= tpool
->tp_active
; activep
; activep
= activep
->tpa_next
)
384 (void) pthread_cancel(activep
->tpa_tid
);
386 /* wait for all active workers to finish */
387 while (tpool
->tp_active
!= NULL
) {
388 tpool
->tp_flags
|= TP_WAIT
;
389 (void) sig_cond_wait(&tpool
->tp_waitcv
, &tpool
->tp_mutex
);
392 /* the last worker to terminate will wake us up */
393 while (tpool
->tp_current
!= 0)
394 (void) sig_cond_wait(&tpool
->tp_busycv
, &tpool
->tp_mutex
);
396 pthread_cleanup_pop(1); /* sig_mutex_unlock(&tpool->tp_mutex); */
401 * Like tpool_destroy(), but don't cancel workers or wait for them to finish.
402 * The last worker to terminate will delete the pool.
405 tpool_abandon(tpool_t
*tpool
)
407 ASSERT(!(tpool
->tp_flags
& (TP_DESTROY
| TP_ABANDON
)));
409 sig_mutex_lock(&tpool
->tp_mutex
);
410 if (tpool
->tp_current
== 0) {
411 /* no workers, just delete the pool */
412 sig_mutex_unlock(&tpool
->tp_mutex
);
415 /* wake up all workers, last one will delete the pool */
416 tpool
->tp_flags
|= TP_ABANDON
;
417 tpool
->tp_flags
&= ~TP_SUSPEND
;
418 (void) cond_broadcast(&tpool
->tp_workcv
);
419 sig_mutex_unlock(&tpool
->tp_mutex
);
424 * Wait for all jobs to complete.
425 * Calling tpool_wait() from a job in the pool will cause deadlock.
428 tpool_wait(tpool_t
*tpool
)
430 ASSERT(!tpool_member(tpool
));
431 ASSERT(!(tpool
->tp_flags
& (TP_DESTROY
| TP_ABANDON
)));
433 sig_mutex_lock(&tpool
->tp_mutex
);
434 pthread_cleanup_push(sig_mutex_unlock
, &tpool
->tp_mutex
);
435 while (tpool
->tp_head
!= NULL
|| tpool
->tp_active
!= NULL
) {
436 tpool
->tp_flags
|= TP_WAIT
;
437 (void) sig_cond_wait(&tpool
->tp_waitcv
, &tpool
->tp_mutex
);
438 ASSERT(!(tpool
->tp_flags
& (TP_DESTROY
| TP_ABANDON
)));
440 pthread_cleanup_pop(1); /* sig_mutex_unlock(&tpool->tp_mutex); */
444 tpool_suspend(tpool_t
*tpool
)
446 ASSERT(!(tpool
->tp_flags
& (TP_DESTROY
| TP_ABANDON
)));
448 sig_mutex_lock(&tpool
->tp_mutex
);
449 tpool
->tp_flags
|= TP_SUSPEND
;
450 sig_mutex_unlock(&tpool
->tp_mutex
);
454 tpool_suspended(tpool_t
*tpool
)
458 ASSERT(!(tpool
->tp_flags
& (TP_DESTROY
| TP_ABANDON
)));
460 sig_mutex_lock(&tpool
->tp_mutex
);
461 suspended
= (tpool
->tp_flags
& TP_SUSPEND
) != 0;
462 sig_mutex_unlock(&tpool
->tp_mutex
);
468 tpool_resume(tpool_t
*tpool
)
472 ASSERT(!(tpool
->tp_flags
& (TP_DESTROY
| TP_ABANDON
)));
474 sig_mutex_lock(&tpool
->tp_mutex
);
475 if (!(tpool
->tp_flags
& TP_SUSPEND
)) {
476 sig_mutex_unlock(&tpool
->tp_mutex
);
479 tpool
->tp_flags
&= ~TP_SUSPEND
;
480 (void) cond_broadcast(&tpool
->tp_workcv
);
481 excess
= tpool
->tp_njobs
- tpool
->tp_idle
;
482 while (excess
-- > 0 && tpool
->tp_current
< tpool
->tp_maximum
) {
483 if (create_worker(tpool
) != 0)
484 break; /* pthread_create() failed */
487 sig_mutex_unlock(&tpool
->tp_mutex
);
491 tpool_member(tpool_t
*tpool
)
493 pthread_t my_tid
= pthread_self();
494 tpool_active_t
*activep
;
496 ASSERT(!(tpool
->tp_flags
& (TP_DESTROY
| TP_ABANDON
)));
498 sig_mutex_lock(&tpool
->tp_mutex
);
499 for (activep
= tpool
->tp_active
; activep
; activep
= activep
->tpa_next
) {
500 if (activep
->tpa_tid
== my_tid
) {
501 sig_mutex_unlock(&tpool
->tp_mutex
);
505 sig_mutex_unlock(&tpool
->tp_mutex
);
510 postfork1_child_tpool(void)
512 pthread_t my_tid
= pthread_self();
517 * All of the thread pool workers are gone, except possibly
518 * for the current thread, if it is a thread pool worker thread.
519 * Retain the thread pools, but make them all empty. Whatever
520 * jobs were queued or running belong to the parent process.
523 if ((tpool
= thread_pools
) == NULL
)
527 tpool_active_t
*activep
;
529 (void) mutex_init(&tpool
->tp_mutex
, USYNC_THREAD
, NULL
);
530 (void) cond_init(&tpool
->tp_busycv
, USYNC_THREAD
, NULL
);
531 (void) cond_init(&tpool
->tp_workcv
, USYNC_THREAD
, NULL
);
532 (void) cond_init(&tpool
->tp_waitcv
, USYNC_THREAD
, NULL
);
533 for (job
= tpool
->tp_head
; job
; job
= tpool
->tp_head
) {
534 tpool
->tp_head
= job
->tpj_next
;
535 lfree(job
, sizeof (*job
));
537 tpool
->tp_tail
= NULL
;
539 for (activep
= tpool
->tp_active
; activep
;
540 activep
= activep
->tpa_next
) {
541 if (activep
->tpa_tid
== my_tid
) {
542 activep
->tpa_next
= NULL
;
547 tpool
->tp_current
= 0;
548 if ((tpool
->tp_active
= activep
) != NULL
)
549 tpool
->tp_current
= 1;
550 tpool
->tp_flags
&= ~TP_WAIT
;
551 if (tpool
->tp_flags
& (TP_DESTROY
| TP_ABANDON
)) {
552 tpool
->tp_flags
&= ~TP_DESTROY
;
553 tpool
->tp_flags
|= TP_ABANDON
;
554 if (tpool
->tp_current
== 0) {
556 goto top
; /* start over */
559 } while ((tpool
= tpool
->tp_forw
) != thread_pools
);