/*
 * CDDL HEADER START
 *
 * The contents of this file are subject to the terms of the
 * Common Development and Distribution License (the "License").
 * You may not use this file except in compliance with the License.
 *
 * You can obtain a copy of the license at usr/src/OPENSOLARIS.LICENSE
 * or http://www.opensolaris.org/os/licensing.
 * See the License for the specific language governing permissions
 * and limitations under the License.
 *
 * When distributing Covered Code, include this CDDL HEADER in each
 * file and include the License file at usr/src/OPENSOLARIS.LICENSE.
 * If applicable, add the following below this CDDL HEADER, with the
 * fields enclosed by brackets "[]" replaced with your own identifying
 * information: Portions Copyright [yyyy] [name of copyright owner]
 *
 * CDDL HEADER END
 */

/*
 * Copyright 2008 Sun Microsystems, Inc.  All rights reserved.
 * Use is subject to license terms.
 */

#pragma ident	"%Z%%M%	%I%	%E% SMI"
#include "lint.h"
#include "thr_uberdata.h"
#include <stdlib.h>
#include <signal.h>
#include <errno.h>
#include "thread_pool_impl.h"
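
/*
 * All pools are kept on a circular, doubly-linked list headed by
 * thread_pools (linked through tp_forw/tp_back) and protected by
 * thread_pool_lock.  tpool_create() and delete_pool() maintain the
 * list; postfork1_child_tpool() walks it in the child after fork().
 */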
static mutex_t thread_pool_lock = DEFAULTMUTEX;
static tpool_t *thread_pools = NULL;
static void
delete_pool(tpool_t *tpool)
{
	tpool_job_t *job;

	ASSERT(tpool->tp_current == 0 && tpool->tp_active == NULL);

	/*
	 * Unlink the pool from the global list of all pools.
	 */
	lmutex_lock(&thread_pool_lock);
	if (thread_pools == tpool)
		thread_pools = tpool->tp_forw;
	if (thread_pools == tpool)
		thread_pools = NULL;
	else {
		tpool->tp_back->tp_forw = tpool->tp_forw;
		tpool->tp_forw->tp_back = tpool->tp_back;
	}
	lmutex_unlock(&thread_pool_lock);

	/*
	 * There should be no pending jobs, but just in case...
	 */
	for (job = tpool->tp_head; job != NULL; job = tpool->tp_head) {
		tpool->tp_head = job->tpj_next;
		lfree(job, sizeof (*job));
	}
	(void) pthread_attr_destroy(&tpool->tp_attr);
	lfree(tpool, sizeof (*tpool));
}
/*
 * Worker thread is terminating.
 */
static void
worker_cleanup(tpool_t *tpool)
{
	ASSERT(MUTEX_HELD(&tpool->tp_mutex));

	if (--tpool->tp_current == 0 &&
	    (tpool->tp_flags & (TP_DESTROY | TP_ABANDON))) {
		if (tpool->tp_flags & TP_ABANDON) {
			sig_mutex_unlock(&tpool->tp_mutex);
			delete_pool(tpool);
			return;
		}
		if (tpool->tp_flags & TP_DESTROY)
			(void) cond_broadcast(&tpool->tp_busycv);
	}
	sig_mutex_unlock(&tpool->tp_mutex);
}
static void
notify_waiters(tpool_t *tpool)
{
	if (tpool->tp_head == NULL && tpool->tp_active == NULL) {
		tpool->tp_flags &= ~TP_WAIT;
		(void) cond_broadcast(&tpool->tp_waitcv);
	}
}
/*
 * Called by a worker thread on return from a tpool_dispatch()d job.
 */
static void
job_cleanup(tpool_t *tpool)
{
	pthread_t my_tid = pthread_self();
	tpool_active_t *activep;
	tpool_active_t **activepp;

	sig_mutex_lock(&tpool->tp_mutex);
	/* CSTYLED */
	for (activepp = &tpool->tp_active;; activepp = &activep->tpa_next) {
		activep = *activepp;
		if (activep->tpa_tid == my_tid) {
			*activepp = activep->tpa_next;
			break;
		}
	}
	if (tpool->tp_flags & TP_WAIT)
		notify_waiters(tpool);
}
static void *
tpool_worker(void *arg)
{
	tpool_t *tpool = (tpool_t *)arg;
	int elapsed;
	tpool_job_t *job;
	void (*func)(void *);
	tpool_active_t active;

	sig_mutex_lock(&tpool->tp_mutex);
	pthread_cleanup_push(worker_cleanup, tpool);

	/*
	 * This is the worker's main loop.
	 * It will only be left if a timeout or an error has occurred.
	 */
	active.tpa_tid = pthread_self();
	for (;;) {
		elapsed = 0;
		tpool->tp_idle++;
		if (tpool->tp_flags & TP_WAIT)
			notify_waiters(tpool);
		while ((tpool->tp_head == NULL ||
		    (tpool->tp_flags & TP_SUSPEND)) &&
		    !(tpool->tp_flags & (TP_DESTROY | TP_ABANDON))) {
			if (tpool->tp_current <= tpool->tp_minimum ||
			    tpool->tp_linger == 0) {
				(void) sig_cond_wait(&tpool->tp_workcv,
				    &tpool->tp_mutex);
			} else {
				timestruc_t timeout;

				timeout.tv_sec = tpool->tp_linger;
				timeout.tv_nsec = 0;
				if (sig_cond_reltimedwait(&tpool->tp_workcv,
				    &tpool->tp_mutex, &timeout) != 0) {
					elapsed = 1;
					break;
				}
			}
		}
		tpool->tp_idle--;
		if (tpool->tp_flags & TP_DESTROY)
			break;
		if (tpool->tp_flags & TP_ABANDON) {
			/* can't abandon a suspended pool */
			if (tpool->tp_flags & TP_SUSPEND) {
				tpool->tp_flags &= ~TP_SUSPEND;
				(void) cond_broadcast(&tpool->tp_workcv);
			}
			if (tpool->tp_head == NULL)
				break;
		}
		if ((job = tpool->tp_head) != NULL &&
		    !(tpool->tp_flags & TP_SUSPEND)) {
			elapsed = 0;
			func = job->tpj_func;
			arg = job->tpj_arg;
			tpool->tp_head = job->tpj_next;
			if (job == tpool->tp_tail)
				tpool->tp_tail = NULL;
			tpool->tp_njobs--;
			active.tpa_next = tpool->tp_active;
			tpool->tp_active = &active;
			sig_mutex_unlock(&tpool->tp_mutex);
			pthread_cleanup_push(job_cleanup, tpool);
			lfree(job, sizeof (*job));
			/*
			 * Call the specified function.
			 */
			func(arg);
			/*
			 * We don't know what this thread has been doing,
			 * so we reset its signal mask and cancellation
			 * state back to the initial values.
			 */
			(void) pthread_sigmask(SIG_SETMASK, &maskset, NULL);
			(void) pthread_setcanceltype(PTHREAD_CANCEL_DEFERRED,
			    NULL);
			(void) pthread_setcancelstate(PTHREAD_CANCEL_ENABLE,
			    NULL);
			pthread_cleanup_pop(1);
		}
		if (elapsed && tpool->tp_current > tpool->tp_minimum) {
			/*
			 * We timed out and there is no work to be done
			 * and the number of workers exceeds the minimum.
			 * Exit now to reduce the size of the pool.
			 */
			break;
		}
	}
	pthread_cleanup_pop(1);
	return (arg);
}
/*
 * Create a worker thread, with all signals blocked.
 */
static int
create_worker(tpool_t *tpool)
{
	sigset_t oset;
	int error;

	(void) pthread_sigmask(SIG_SETMASK, &maskset, &oset);
	error = pthread_create(NULL, &tpool->tp_attr, tpool_worker, tpool);
	(void) pthread_sigmask(SIG_SETMASK, &oset, NULL);
	return (error);
}
tpool_t *
tpool_create(uint_t min_threads, uint_t max_threads, uint_t linger,
	pthread_attr_t *attr)
{
	tpool_t *tpool;
	void *stackaddr;
	size_t stacksize;
	size_t minstack;
	int error;

	if (min_threads > max_threads || max_threads < 1) {
		errno = EINVAL;
		return (NULL);
	}
	if (attr != NULL) {
		if (pthread_attr_getstack(attr, &stackaddr, &stacksize) != 0) {
			errno = EINVAL;
			return (NULL);
		}
		/*
		 * Allow only one thread in the pool with a specified stack.
		 * Require threads to have at least the minimum stack size.
		 */
		minstack = thr_min_stack();
		if (stackaddr != NULL) {
			if (stacksize < minstack || max_threads != 1) {
				errno = EINVAL;
				return (NULL);
			}
		} else if (stacksize != 0 && stacksize < minstack) {
			errno = EINVAL;
			return (NULL);
		}
	}

	tpool = lmalloc(sizeof (*tpool));
	if (tpool == NULL) {
		errno = ENOMEM;
		return (NULL);
	}
	(void) mutex_init(&tpool->tp_mutex, USYNC_THREAD, NULL);
	(void) cond_init(&tpool->tp_busycv, USYNC_THREAD, NULL);
	(void) cond_init(&tpool->tp_workcv, USYNC_THREAD, NULL);
	(void) cond_init(&tpool->tp_waitcv, USYNC_THREAD, NULL);
	tpool->tp_minimum = min_threads;
	tpool->tp_maximum = max_threads;
	tpool->tp_linger = linger;

	/*
	 * We cannot just copy the attribute pointer.
	 * We need to initialize a new pthread_attr_t structure
	 * with the values from the user-supplied pthread_attr_t.
	 * If the attribute pointer is NULL, we need to initialize
	 * the new pthread_attr_t structure with default values.
	 */
	error = pthread_attr_clone(&tpool->tp_attr, attr);
	if (error) {
		lfree(tpool, sizeof (*tpool));
		errno = error;
		return (NULL);
	}

	/* make all pool threads be detached daemon threads */
	(void) pthread_attr_setdetachstate(&tpool->tp_attr,
	    PTHREAD_CREATE_DETACHED);
	(void) pthread_attr_setdaemonstate_np(&tpool->tp_attr,
	    PTHREAD_CREATE_DAEMON_NP);

	/* insert into the global list of all thread pools */
	lmutex_lock(&thread_pool_lock);
	if (thread_pools == NULL) {
		tpool->tp_forw = tpool;
		tpool->tp_back = tpool;
		thread_pools = tpool;
	} else {
		thread_pools->tp_back->tp_forw = tpool;
		tpool->tp_forw = thread_pools;
		tpool->tp_back = thread_pools->tp_back;
		thread_pools->tp_back = tpool;
	}
	lmutex_unlock(&thread_pool_lock);

	return (tpool);
}
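
/*
 * Illustrative sketch (not part of the original source): a minimal
 * consumer of the tpool_*() interfaces defined in this file.  The
 * example function names are hypothetical and error handling is
 * abbreviated.
 */
#ifdef TPOOL_EXAMPLE
static void
example_job(void *arg)
{
	/* do some work on behalf of the caller */
	*(int *)arg += 1;
}

static int
example_lifecycle(void)
{
	/* 2 to 8 workers; excess idle workers linger for 10 seconds */
	tpool_t *tp = tpool_create(2, 8, 10, NULL);
	static int counters[100];
	int i;

	if (tp == NULL)
		return (-1);		/* errno is EINVAL or ENOMEM */
	for (i = 0; i < 100; i++)
		(void) tpool_dispatch(tp, example_job, &counters[i]);
	tpool_wait(tp);		/* all queued and active jobs are done */
	tpool_destroy(tp);	/* workers exit and the pool is freed */
	return (0);
}
#endif	/* TPOOL_EXAMPLE */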
/*
 * Dispatch a work request to the thread pool.
 * If there are idle workers, awaken one.
 * Else, if the maximum number of workers has
 * not been reached, spawn a new worker thread.
 * Else just return with the job added to the queue.
 */
int
tpool_dispatch(tpool_t *tpool, void (*func)(void *), void *arg)
{
	tpool_job_t *job;

	ASSERT(!(tpool->tp_flags & (TP_DESTROY | TP_ABANDON)));

	if ((job = lmalloc(sizeof (*job))) == NULL)
		return (-1);
	job->tpj_next = NULL;
	job->tpj_func = func;
	job->tpj_arg = arg;

	sig_mutex_lock(&tpool->tp_mutex);

	if (tpool->tp_head == NULL)
		tpool->tp_head = job;
	else
		tpool->tp_tail->tpj_next = job;
	tpool->tp_tail = job;
	tpool->tp_njobs++;

	if (!(tpool->tp_flags & TP_SUSPEND)) {
		if (tpool->tp_idle > 0)
			(void) cond_signal(&tpool->tp_workcv);
		else if (tpool->tp_current < tpool->tp_maximum &&
		    create_worker(tpool) == 0)
			tpool->tp_current++;
	}

	sig_mutex_unlock(&tpool->tp_mutex);
	return (0);
}
/*
 * Assumes: by the time tpool_destroy() is called no one will use this
 * thread pool in any way and no one will try to dispatch entries to it.
 * Calling tpool_destroy() from a job in the pool will cause deadlock.
 */
void
tpool_destroy(tpool_t *tpool)
{
	tpool_active_t *activep;

	ASSERT(!tpool_member(tpool));
	ASSERT(!(tpool->tp_flags & (TP_DESTROY | TP_ABANDON)));

	sig_mutex_lock(&tpool->tp_mutex);
	pthread_cleanup_push(sig_mutex_unlock, &tpool->tp_mutex);

	/* mark the pool as being destroyed; wakeup idle workers */
	tpool->tp_flags |= TP_DESTROY;
	tpool->tp_flags &= ~TP_SUSPEND;
	(void) cond_broadcast(&tpool->tp_workcv);

	/* cancel all active workers */
	for (activep = tpool->tp_active; activep; activep = activep->tpa_next)
		(void) pthread_cancel(activep->tpa_tid);

	/* wait for all active workers to finish */
	while (tpool->tp_active != NULL) {
		tpool->tp_flags |= TP_WAIT;
		(void) sig_cond_wait(&tpool->tp_waitcv, &tpool->tp_mutex);
	}

	/* the last worker to terminate will wake us up */
	while (tpool->tp_current != 0)
		(void) sig_cond_wait(&tpool->tp_busycv, &tpool->tp_mutex);

	pthread_cleanup_pop(1);	/* sig_mutex_unlock(&tpool->tp_mutex); */
	delete_pool(tpool);
}
/*
 * Like tpool_destroy(), but don't cancel workers or wait for them to finish.
 * The last worker to terminate will delete the pool.
 */
void
tpool_abandon(tpool_t *tpool)
{
	ASSERT(!(tpool->tp_flags & (TP_DESTROY | TP_ABANDON)));

	sig_mutex_lock(&tpool->tp_mutex);
	if (tpool->tp_current == 0) {
		/* no workers, just delete the pool */
		sig_mutex_unlock(&tpool->tp_mutex);
		delete_pool(tpool);
	} else {
		/* wake up all workers, last one will delete the pool */
		tpool->tp_flags |= TP_ABANDON;
		tpool->tp_flags &= ~TP_SUSPEND;
		(void) cond_broadcast(&tpool->tp_workcv);
		sig_mutex_unlock(&tpool->tp_mutex);
	}
}
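
/*
 * Illustrative sketch (not part of the original source): after
 * tpool_abandon() returns, the caller must not touch the pool again;
 * queued jobs still run, and the last worker to finish deletes the
 * pool (see worker_cleanup()).  The example function name is
 * hypothetical.
 */
#ifdef TPOOL_EXAMPLE
static void
example_teardown_async(tpool_t *tp)
{
	tpool_abandon(tp);
	/* tp may already have been freed; it is invalid from here on */
}
#endif	/* TPOOL_EXAMPLE */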
/*
 * Wait for all jobs to complete.
 * Calling tpool_wait() from a job in the pool will cause deadlock.
 */
void
tpool_wait(tpool_t *tpool)
{
	ASSERT(!tpool_member(tpool));
	ASSERT(!(tpool->tp_flags & (TP_DESTROY | TP_ABANDON)));

	sig_mutex_lock(&tpool->tp_mutex);
	pthread_cleanup_push(sig_mutex_unlock, &tpool->tp_mutex);
	while (tpool->tp_head != NULL || tpool->tp_active != NULL) {
		tpool->tp_flags |= TP_WAIT;
		(void) sig_cond_wait(&tpool->tp_waitcv, &tpool->tp_mutex);
		ASSERT(!(tpool->tp_flags & (TP_DESTROY | TP_ABANDON)));
	}
	pthread_cleanup_pop(1);	/* sig_mutex_unlock(&tpool->tp_mutex); */
}
void
tpool_suspend(tpool_t *tpool)
{
	ASSERT(!(tpool->tp_flags & (TP_DESTROY | TP_ABANDON)));

	sig_mutex_lock(&tpool->tp_mutex);
	tpool->tp_flags |= TP_SUSPEND;
	sig_mutex_unlock(&tpool->tp_mutex);
}
int
tpool_suspended(tpool_t *tpool)
{
	int suspended;

	ASSERT(!(tpool->tp_flags & (TP_DESTROY | TP_ABANDON)));

	sig_mutex_lock(&tpool->tp_mutex);
	suspended = (tpool->tp_flags & TP_SUSPEND) != 0;
	sig_mutex_unlock(&tpool->tp_mutex);

	return (suspended);
}
void
tpool_resume(tpool_t *tpool)
{
	int excess;

	ASSERT(!(tpool->tp_flags & (TP_DESTROY | TP_ABANDON)));

	sig_mutex_lock(&tpool->tp_mutex);
	if (!(tpool->tp_flags & TP_SUSPEND)) {
		sig_mutex_unlock(&tpool->tp_mutex);
		return;
	}
	tpool->tp_flags &= ~TP_SUSPEND;
	(void) cond_broadcast(&tpool->tp_workcv);
	excess = tpool->tp_njobs - tpool->tp_idle;
	while (excess-- > 0 && tpool->tp_current < tpool->tp_maximum) {
		if (create_worker(tpool) != 0)
			break;		/* pthread_create() failed */
		tpool->tp_current++;
	}
	sig_mutex_unlock(&tpool->tp_mutex);
}
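
/*
 * Illustrative sketch (not part of the original source): using
 * tpool_suspend()/tpool_resume() to queue a whole batch before any
 * worker starts.  While suspended, tpool_dispatch() only queues; on
 * resume, workers are awakened and extra workers are created for any
 * backlog, up to the pool maximum.  The example function name is
 * hypothetical.
 */
#ifdef TPOOL_EXAMPLE
static void
example_batch(tpool_t *tp, void (*func)(void *), void **args, int nargs)
{
	int i;

	tpool_suspend(tp);
	ASSERT(tpool_suspended(tp));
	for (i = 0; i < nargs; i++)
		(void) tpool_dispatch(tp, func, args[i]);
	tpool_resume(tp);	/* workers pick up the queued jobs */
}
#endif	/* TPOOL_EXAMPLE */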
int
tpool_member(tpool_t *tpool)
{
	pthread_t my_tid = pthread_self();
	tpool_active_t *activep;

	ASSERT(!(tpool->tp_flags & (TP_DESTROY | TP_ABANDON)));

	sig_mutex_lock(&tpool->tp_mutex);
	for (activep = tpool->tp_active; activep; activep = activep->tpa_next) {
		if (activep->tpa_tid == my_tid) {
			sig_mutex_unlock(&tpool->tp_mutex);
			return (1);
		}
	}
	sig_mutex_unlock(&tpool->tp_mutex);
	return (0);
}
void
postfork1_child_tpool(void)
{
	pthread_t my_tid = pthread_self();
	tpool_t *tpool;
	tpool_job_t *job;

	/*
	 * All of the thread pool workers are gone, except possibly
	 * for the current thread, if it is a thread pool worker thread.
	 * Retain the thread pools, but make them all empty.  Whatever
	 * jobs were queued or running belong to the parent process.
	 */
top:
	if ((tpool = thread_pools) == NULL)
		return;

	do {
		tpool_active_t *activep;

		(void) mutex_init(&tpool->tp_mutex, USYNC_THREAD, NULL);
		(void) cond_init(&tpool->tp_busycv, USYNC_THREAD, NULL);
		(void) cond_init(&tpool->tp_workcv, USYNC_THREAD, NULL);
		(void) cond_init(&tpool->tp_waitcv, USYNC_THREAD, NULL);
		for (job = tpool->tp_head; job; job = tpool->tp_head) {
			tpool->tp_head = job->tpj_next;
			lfree(job, sizeof (*job));
		}
		tpool->tp_tail = NULL;
		tpool->tp_njobs = 0;
		for (activep = tpool->tp_active; activep;
		    activep = activep->tpa_next) {
			if (activep->tpa_tid == my_tid) {
				activep->tpa_next = NULL;
				break;
			}
		}
		tpool->tp_idle = 0;
		tpool->tp_current = 0;
		if ((tpool->tp_active = activep) != NULL)
			tpool->tp_current = 1;
		tpool->tp_flags &= ~TP_WAIT;
		if (tpool->tp_flags & (TP_DESTROY | TP_ABANDON)) {
			tpool->tp_flags &= ~TP_DESTROY;
			tpool->tp_flags |= TP_ABANDON;
			if (tpool->tp_current == 0) {
				delete_pool(tpool);
				goto top;	/* start over */
			}
		}
	} while ((tpool = tpool->tp_forw) != thread_pools);
}