/*
 * CDDL HEADER START
 *
 * The contents of this file are subject to the terms of the
 * Common Development and Distribution License (the "License").
 * You may not use this file except in compliance with the License.
 *
 * You can obtain a copy of the license at usr/src/OPENSOLARIS.LICENSE
 * or https://opensource.org/licenses/CDDL-1.0.
 * See the License for the specific language governing permissions
 * and limitations under the License.
 *
 * When distributing Covered Code, include this CDDL HEADER in each
 * file and include the License file at usr/src/OPENSOLARIS.LICENSE.
 * If applicable, add the following below this CDDL HEADER, with the
 * fields enclosed by brackets "[]" replaced with your own identifying
 * information: Portions Copyright [yyyy] [name of copyright owner]
 *
 * CDDL HEADER END
 */

/*
 * Copyright 2008 Sun Microsystems, Inc.  All rights reserved.
 * Use is subject to license terms.
 */
#include <stdlib.h>
#include <signal.h>
#include <errno.h>
#include <limits.h>

#include "thread_pool_impl.h"
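
/*
 * All thread pools are linked into a global circular list headed by
 * thread_pools and protected by thread_pool_lock.
 */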
static pthread_mutex_t thread_pool_lock = PTHREAD_MUTEX_INITIALIZER;
static tpool_t *thread_pools = NULL;
static void
delete_pool(tpool_t *tpool)
{
	tpool_job_t *job;

	ASSERT(tpool->tp_current == 0 && tpool->tp_active == NULL);
	/*
	 * Unlink the pool from the global list of all pools.
	 */
	(void) pthread_mutex_lock(&thread_pool_lock);
	if (thread_pools == tpool)
		thread_pools = tpool->tp_forw;
	if (thread_pools == tpool)
		thread_pools = NULL;
	else {
		tpool->tp_back->tp_forw = tpool->tp_forw;
		tpool->tp_forw->tp_back = tpool->tp_back;
	}
	pthread_mutex_unlock(&thread_pool_lock);
	/*
	 * There should be no pending jobs, but just in case...
	 */
	for (job = tpool->tp_head; job != NULL; job = tpool->tp_head) {
		tpool->tp_head = job->tpj_next;
		free(job);
	}
	(void) pthread_attr_destroy(&tpool->tp_attr);
	free(tpool);
}
/*
 * Worker thread is terminating.
 */
static void
worker_cleanup(void *arg)
{
	tpool_t *tpool = (tpool_t *)arg;

	if (--tpool->tp_current == 0 &&
	    (tpool->tp_flags & (TP_DESTROY | TP_ABANDON))) {
		if (tpool->tp_flags & TP_ABANDON) {
			pthread_mutex_unlock(&tpool->tp_mutex);
			delete_pool(tpool);
			return;
		}
		if (tpool->tp_flags & TP_DESTROY)
			(void) pthread_cond_broadcast(&tpool->tp_busycv);
	}
	pthread_mutex_unlock(&tpool->tp_mutex);
}
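
/*
 * Wake up threads blocked in tpool_wait() or tpool_destroy() once the
 * pool has no queued jobs and no active jobs.
 */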
static void
notify_waiters(tpool_t *tpool)
{
	if (tpool->tp_head == NULL && tpool->tp_active == NULL) {
		tpool->tp_flags &= ~TP_WAIT;
		(void) pthread_cond_broadcast(&tpool->tp_waitcv);
	}
}
/*
 * Called by a worker thread on return from a tpool_dispatch()d job.
 */
static void
job_cleanup(void *arg)
{
	tpool_t *tpool = (tpool_t *)arg;

	pthread_t my_tid = pthread_self();
	tpool_active_t *activep;
	tpool_active_t **activepp;

	pthread_mutex_lock(&tpool->tp_mutex);
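	/* remove this thread from the pool's list of active threads */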
	for (activepp = &tpool->tp_active; ; activepp = &activep->tpa_next) {
		activep = *activepp;
		if (activep->tpa_tid == my_tid) {
			*activepp = activep->tpa_next;
			break;
		}
	}
	if (tpool->tp_flags & TP_WAIT)
		notify_waiters(tpool);
}
static void *
tpool_worker(void *arg)
{
	tpool_t *tpool = (tpool_t *)arg;
	int elapsed;
	tpool_job_t *job;
	void (*func)(void *);
	tpool_active_t active;

	pthread_mutex_lock(&tpool->tp_mutex);
	pthread_cleanup_push(worker_cleanup, tpool);
	/*
	 * This is the worker's main loop.
	 * It will only be left if a timeout or an error has occurred.
	 */
	active.tpa_tid = pthread_self();
	for (;;) {
		elapsed = 0;
		tpool->tp_idle++;
		if (tpool->tp_flags & TP_WAIT)
			notify_waiters(tpool);
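		/*
		 * Wait for work; a worker above the pool minimum lingers
		 * for at most tp_linger seconds before exiting.
		 */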
		while ((tpool->tp_head == NULL ||
		    (tpool->tp_flags & TP_SUSPEND)) &&
		    !(tpool->tp_flags & (TP_DESTROY | TP_ABANDON))) {
			if (tpool->tp_current <= tpool->tp_minimum ||
			    tpool->tp_linger == 0) {
				(void) pthread_cond_wait(&tpool->tp_workcv,
				    &tpool->tp_mutex);
			} else {
				struct timespec ts;

				clock_gettime(CLOCK_REALTIME, &ts);
				ts.tv_sec += tpool->tp_linger;

				if (pthread_cond_timedwait(&tpool->tp_workcv,
				    &tpool->tp_mutex, &ts) != 0) {
					elapsed = 1;
					break;
				}
			}
		}
		tpool->tp_idle--;
		if (tpool->tp_flags & TP_DESTROY)
			break;
		if (tpool->tp_flags & TP_ABANDON) {
			/* can't abandon a suspended pool */
			if (tpool->tp_flags & TP_SUSPEND) {
				tpool->tp_flags &= ~TP_SUSPEND;
				(void) pthread_cond_broadcast(
				    &tpool->tp_workcv);
			}
			if (tpool->tp_head == NULL)
				break;
		}
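		/*
		 * Dequeue the next job, record it on this thread's active
		 * entry, and run it with the pool mutex dropped.
		 */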
		if ((job = tpool->tp_head) != NULL &&
		    !(tpool->tp_flags & TP_SUSPEND)) {
			elapsed = 0;
			func = job->tpj_func;
			arg = job->tpj_arg;
			tpool->tp_head = job->tpj_next;
			if (job == tpool->tp_tail)
				tpool->tp_tail = NULL;
			tpool->tp_njobs--;
			active.tpa_next = tpool->tp_active;
			tpool->tp_active = &active;
			pthread_mutex_unlock(&tpool->tp_mutex);
			pthread_cleanup_push(job_cleanup, tpool);
			free(job);

			sigset_t maskset;
			(void) pthread_sigmask(SIG_SETMASK, NULL, &maskset);

			/*
			 * Call the specified function.
			 */
			func(arg);
			/*
			 * We don't know what this thread has been doing,
			 * so we reset its signal mask and cancellation
			 * state back to the values prior to calling func().
			 */
			(void) pthread_sigmask(SIG_SETMASK, &maskset, NULL);
			(void) pthread_setcanceltype(PTHREAD_CANCEL_DEFERRED,
			    NULL);
			(void) pthread_setcancelstate(PTHREAD_CANCEL_ENABLE,
			    NULL);
			pthread_cleanup_pop(1);
		}
		if (elapsed && tpool->tp_current > tpool->tp_minimum) {
			/*
			 * We timed out and there is no work to be done
			 * and the number of workers exceeds the minimum.
			 * Exit now to reduce the size of the pool.
			 */
			break;
		}
	}
	pthread_cleanup_pop(1);
	return (arg);
}
/*
 * Create a worker thread, with default signals blocked.
 */
static int
create_worker(tpool_t *tpool)
{
	pthread_t thread;
	sigset_t oset;
	int error;

	(void) pthread_sigmask(SIG_SETMASK, NULL, &oset);
	error = pthread_create(&thread, &tpool->tp_attr, tpool_worker, tpool);
	(void) pthread_sigmask(SIG_SETMASK, &oset, NULL);
	return (error);
}
/*
 * pthread_attr_clone: make a copy of a pthread_attr_t.  When old_attr
 * is NULL initialize the cloned attr using default values.
 */
static int
pthread_attr_clone(pthread_attr_t *attr, const pthread_attr_t *old_attr)
{
	int error;

	error = pthread_attr_init(attr);
	if (error || (old_attr == NULL))
		return (error);

#ifdef __GLIBC__
	cpu_set_t cpuset;
	size_t cpusetsize = sizeof (cpuset);
	error = pthread_attr_getaffinity_np(old_attr, cpusetsize, &cpuset);
	if (error == 0)
		error = pthread_attr_setaffinity_np(attr, cpusetsize, &cpuset);
	if (error)
		goto error;
#endif /* __GLIBC__ */

	int detachstate;
	error = pthread_attr_getdetachstate(old_attr, &detachstate);
	if (error == 0)
		error = pthread_attr_setdetachstate(attr, detachstate);
	if (error)
		goto error;

	size_t guardsize;
	error = pthread_attr_getguardsize(old_attr, &guardsize);
	if (error == 0)
		error = pthread_attr_setguardsize(attr, guardsize);
	if (error)
		goto error;

	int inheritsched;
	error = pthread_attr_getinheritsched(old_attr, &inheritsched);
	if (error == 0)
		error = pthread_attr_setinheritsched(attr, inheritsched);
	if (error)
		goto error;

	struct sched_param param;
	error = pthread_attr_getschedparam(old_attr, &param);
	if (error == 0)
		error = pthread_attr_setschedparam(attr, &param);
	if (error)
		goto error;

	int policy;
	error = pthread_attr_getschedpolicy(old_attr, &policy);
	if (error == 0)
		error = pthread_attr_setschedpolicy(attr, policy);
	if (error)
		goto error;

	int scope;
	error = pthread_attr_getscope(old_attr, &scope);
	if (error == 0)
		error = pthread_attr_setscope(attr, scope);
	if (error)
		goto error;

	void *stackaddr;
	size_t stacksize;
	error = pthread_attr_getstack(old_attr, &stackaddr, &stacksize);
	if (error == 0)
		error = pthread_attr_setstack(attr, stackaddr, stacksize);
	if (error)
		goto error;

	return (0);
error:
	pthread_attr_destroy(attr);
	return (error);
}
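
/*
 * Illustrative use of the pool API below (not part of the library;
 * my_job_func and my_job_arg are caller-supplied placeholders):
 *
 *	tpool_t *tp = tpool_create(1, 4, 10, NULL);
 *	if (tp != NULL) {
 *		(void) tpool_dispatch(tp, my_job_func, my_job_arg);
 *		tpool_wait(tp);		(wait for all queued jobs)
 *		tpool_destroy(tp);	(cancel workers, free the pool)
 *	}
 */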
tpool_t *
tpool_create(uint_t min_threads, uint_t max_threads, uint_t linger,
    pthread_attr_t *attr)
{
	tpool_t *tpool;
	void *stackaddr;
	size_t stacksize;
	size_t minstack;
	int error;

	if (min_threads > max_threads || max_threads < 1) {
		errno = EINVAL;
		return (NULL);
	}
	if (attr != NULL) {
		if (pthread_attr_getstack(attr, &stackaddr, &stacksize) != 0) {
			errno = EINVAL;
			return (NULL);
		}
		/*
		 * Allow only one thread in the pool with a specified stack.
		 * Require threads to have at least the minimum stack size.
		 */
		minstack = PTHREAD_STACK_MIN;
		if (stackaddr != NULL) {
			if (stacksize < minstack || max_threads != 1) {
				errno = EINVAL;
				return (NULL);
			}
		} else if (stacksize != 0 && stacksize < minstack) {
			errno = EINVAL;
			return (NULL);
		}
	}

	tpool = calloc(1, sizeof (*tpool));
	if (tpool == NULL) {
		errno = ENOMEM;
		return (NULL);
	}
	(void) pthread_mutex_init(&tpool->tp_mutex, NULL);
	(void) pthread_cond_init(&tpool->tp_busycv, NULL);
	(void) pthread_cond_init(&tpool->tp_workcv, NULL);
	(void) pthread_cond_init(&tpool->tp_waitcv, NULL);
	tpool->tp_minimum = min_threads;
	tpool->tp_maximum = max_threads;
	tpool->tp_linger = linger;

	/*
	 * We cannot just copy the attribute pointer.
	 * We need to initialize a new pthread_attr_t structure
	 * with the values from the user-supplied pthread_attr_t.
	 * If the attribute pointer is NULL, we need to initialize
	 * the new pthread_attr_t structure with default values.
	 */
	error = pthread_attr_clone(&tpool->tp_attr, attr);
	if (error) {
		free(tpool);
		errno = error;
		return (NULL);
	}

	/* make all pool threads be detached daemon threads */
	(void) pthread_attr_setdetachstate(&tpool->tp_attr,
	    PTHREAD_CREATE_DETACHED);

	/* insert into the global list of all thread pools */
	pthread_mutex_lock(&thread_pool_lock);
	if (thread_pools == NULL) {
		tpool->tp_forw = tpool;
		tpool->tp_back = tpool;
		thread_pools = tpool;
	} else {
		thread_pools->tp_back->tp_forw = tpool;
		tpool->tp_forw = thread_pools;
		tpool->tp_back = thread_pools->tp_back;
		thread_pools->tp_back = tpool;
	}
	pthread_mutex_unlock(&thread_pool_lock);

	return (tpool);
}
/*
 * Dispatch a work request to the thread pool.
 * If there are idle workers, awaken one.
 * Else, if the maximum number of workers has
 * not been reached, spawn a new worker thread.
 * Else just return with the job added to the queue.
 */
int
tpool_dispatch(tpool_t *tpool, void (*func)(void *), void *arg)
{
	tpool_job_t *job;

	ASSERT(!(tpool->tp_flags & (TP_DESTROY | TP_ABANDON)));

	if ((job = calloc(1, sizeof (*job))) == NULL)
		return (-1);
	job->tpj_next = NULL;
	job->tpj_func = func;
	job->tpj_arg = arg;

	pthread_mutex_lock(&tpool->tp_mutex);

	if (tpool->tp_head == NULL)
		tpool->tp_head = job;
	else
		tpool->tp_tail->tpj_next = job;
	tpool->tp_tail = job;
	tpool->tp_njobs++;

	if (!(tpool->tp_flags & TP_SUSPEND)) {
		if (tpool->tp_idle > 0)
			(void) pthread_cond_signal(&tpool->tp_workcv);
		else if (tpool->tp_current < tpool->tp_maximum &&
		    create_worker(tpool) == 0)
			tpool->tp_current++;
	}

	pthread_mutex_unlock(&tpool->tp_mutex);
	return (0);
}
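
/*
 * Cancellation cleanup handler: releases the pool mutex if a thread is
 * cancelled while blocked in tpool_destroy() or tpool_wait().
 */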
static void
tpool_cleanup(void *arg)
{
	tpool_t *tpool = (tpool_t *)arg;

	pthread_mutex_unlock(&tpool->tp_mutex);
}
/*
 * Assumes: by the time tpool_destroy() is called no one will use this
 * thread pool in any way and no one will try to dispatch entries to it.
 * Calling tpool_destroy() from a job in the pool will cause deadlock.
 */
void
tpool_destroy(tpool_t *tpool)
{
	tpool_active_t *activep;

	ASSERT(!tpool_member(tpool));
	ASSERT(!(tpool->tp_flags & (TP_DESTROY | TP_ABANDON)));

	pthread_mutex_lock(&tpool->tp_mutex);
	pthread_cleanup_push(tpool_cleanup, tpool);

	/* mark the pool as being destroyed; wakeup idle workers */
	tpool->tp_flags |= TP_DESTROY;
	tpool->tp_flags &= ~TP_SUSPEND;
	(void) pthread_cond_broadcast(&tpool->tp_workcv);

	/* cancel all active workers */
	for (activep = tpool->tp_active; activep; activep = activep->tpa_next)
		(void) pthread_cancel(activep->tpa_tid);

	/* wait for all active workers to finish */
	while (tpool->tp_active != NULL) {
		tpool->tp_flags |= TP_WAIT;
		(void) pthread_cond_wait(&tpool->tp_waitcv, &tpool->tp_mutex);
	}

	/* the last worker to terminate will wake us up */
	while (tpool->tp_current != 0)
		(void) pthread_cond_wait(&tpool->tp_busycv, &tpool->tp_mutex);

	pthread_cleanup_pop(1);	/* pthread_mutex_unlock(&tpool->tp_mutex); */
	delete_pool(tpool);
}
/*
 * Like tpool_destroy(), but don't cancel workers or wait for them to finish.
 * The last worker to terminate will delete the pool.
 */
void
tpool_abandon(tpool_t *tpool)
{
	ASSERT(!(tpool->tp_flags & (TP_DESTROY | TP_ABANDON)));

	pthread_mutex_lock(&tpool->tp_mutex);
	if (tpool->tp_current == 0) {
		/* no workers, just delete the pool */
		pthread_mutex_unlock(&tpool->tp_mutex);
		delete_pool(tpool);
	} else {
		/* wake up all workers, last one will delete the pool */
		tpool->tp_flags |= TP_ABANDON;
		tpool->tp_flags &= ~TP_SUSPEND;
		(void) pthread_cond_broadcast(&tpool->tp_workcv);
		pthread_mutex_unlock(&tpool->tp_mutex);
	}
}
/*
 * Wait for all jobs to complete.
 * Calling tpool_wait() from a job in the pool will cause deadlock.
 */
void
tpool_wait(tpool_t *tpool)
{
	ASSERT(!tpool_member(tpool));
	ASSERT(!(tpool->tp_flags & (TP_DESTROY | TP_ABANDON)));

	pthread_mutex_lock(&tpool->tp_mutex);
	pthread_cleanup_push(tpool_cleanup, tpool);
	while (tpool->tp_head != NULL || tpool->tp_active != NULL) {
		tpool->tp_flags |= TP_WAIT;
		(void) pthread_cond_wait(&tpool->tp_waitcv, &tpool->tp_mutex);
		ASSERT(!(tpool->tp_flags & (TP_DESTROY | TP_ABANDON)));
	}
	pthread_cleanup_pop(1);	/* pthread_mutex_unlock(&tpool->tp_mutex); */
}
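
/*
 * Stop workers from picking up new jobs; jobs already running are not
 * affected.
 */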
void
tpool_suspend(tpool_t *tpool)
{
	ASSERT(!(tpool->tp_flags & (TP_DESTROY | TP_ABANDON)));

	pthread_mutex_lock(&tpool->tp_mutex);
	tpool->tp_flags |= TP_SUSPEND;
	pthread_mutex_unlock(&tpool->tp_mutex);
}
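
/*
 * Return non-zero if the pool is currently suspended.
 */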
int
tpool_suspended(tpool_t *tpool)
{
	int suspended;

	ASSERT(!(tpool->tp_flags & (TP_DESTROY | TP_ABANDON)));

	pthread_mutex_lock(&tpool->tp_mutex);
	suspended = (tpool->tp_flags & TP_SUSPEND) != 0;
	pthread_mutex_unlock(&tpool->tp_mutex);

	return (suspended);
}
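
/*
 * Resume a suspended pool: wake the workers and grow the pool (up to
 * tp_maximum) if there is a backlog of queued jobs.
 */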
void
tpool_resume(tpool_t *tpool)
{
	int excess;

	ASSERT(!(tpool->tp_flags & (TP_DESTROY | TP_ABANDON)));

	pthread_mutex_lock(&tpool->tp_mutex);
	if (!(tpool->tp_flags & TP_SUSPEND)) {
		pthread_mutex_unlock(&tpool->tp_mutex);
		return;
	}
	tpool->tp_flags &= ~TP_SUSPEND;
	(void) pthread_cond_broadcast(&tpool->tp_workcv);
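	/* start one worker per queued job not already covered by an idle worker */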
	excess = tpool->tp_njobs - tpool->tp_idle;
	while (excess-- > 0 && tpool->tp_current < tpool->tp_maximum) {
		if (create_worker(tpool) != 0)
			break;		/* pthread_create() failed */
		tpool->tp_current++;
	}
	pthread_mutex_unlock(&tpool->tp_mutex);
}
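
/*
 * Return non-zero if the calling thread is a worker currently running
 * a job in this pool.
 */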
int
tpool_member(tpool_t *tpool)
{
	pthread_t my_tid = pthread_self();
	tpool_active_t *activep;

	ASSERT(!(tpool->tp_flags & (TP_DESTROY | TP_ABANDON)));

	pthread_mutex_lock(&tpool->tp_mutex);
	for (activep = tpool->tp_active; activep; activep = activep->tpa_next) {
		if (activep->tpa_tid == my_tid) {
			pthread_mutex_unlock(&tpool->tp_mutex);
			return (1);
		}
	}
	pthread_mutex_unlock(&tpool->tp_mutex);
	return (0);
}