import less(1)
[unleashed/tickless.git] / usr / src / lib / libc / port / rt / sigev_thread.c
blob05f3785c25f2545489fa715e5109f8c2095b0cc5
1 /*
2 * CDDL HEADER START
4 * The contents of this file are subject to the terms of the
5 * Common Development and Distribution License (the "License").
6 * You may not use this file except in compliance with the License.
8 * You can obtain a copy of the license at usr/src/OPENSOLARIS.LICENSE
9 * or http://www.opensolaris.org/os/licensing.
10 * See the License for the specific language governing permissions
11 * and limitations under the License.
13 * When distributing Covered Code, include this CDDL HEADER in each
14 * file and include the License file at usr/src/OPENSOLARIS.LICENSE.
15 * If applicable, add the following below this CDDL HEADER, with the
16 * fields enclosed by brackets "[]" replaced with your own identifying
17 * information: Portions Copyright [yyyy] [name of copyright owner]
19 * CDDL HEADER END
23 * Copyright 2008 Sun Microsystems, Inc. All rights reserved.
24 * Use is subject to license terms.
27 #pragma ident "%Z%%M% %I% %E% SMI"
29 #include "lint.h"
30 #include "thr_uberdata.h"
31 #include <sys/types.h>
32 #include <pthread.h>
33 #include <unistd.h>
34 #include <stdlib.h>
35 #include <thread.h>
36 #include <pthread.h>
37 #include <synch.h>
38 #include <port.h>
39 #include <signal.h>
40 #include <stdio.h>
41 #include <errno.h>
42 #include <stdarg.h>
43 #include <string.h>
44 #include <sys/aiocb.h>
45 #include <time.h>
46 #include <signal.h>
47 #include <fcntl.h>
48 #include "sigev_thread.h"
51 * There is but one spawner for all aio operations.
53 thread_communication_data_t *sigev_aio_tcd = NULL;
56 * Set non-zero via _RT_DEBUG to enable debugging printf's.
58 static int _rt_debug = 0;
60 void
61 init_sigev_thread(void)
63 char *ldebug;
65 if ((ldebug = getenv("_RT_DEBUG")) != NULL)
66 _rt_debug = atoi(ldebug);
70 * Routine to print debug messages:
71 * If _rt_debug is set, printf the debug message to stderr
72 * with an appropriate prefix.
74 /*PRINTFLIKE1*/
75 static void
76 dprintf(const char *format, ...)
78 if (_rt_debug) {
79 va_list alist;
81 va_start(alist, format);
82 flockfile(stderr);
83 pthread_cleanup_push(funlockfile, stderr);
84 (void) fputs("DEBUG: ", stderr);
85 (void) vfprintf(stderr, format, alist);
86 pthread_cleanup_pop(1); /* funlockfile(stderr) */
87 va_end(alist);
92 * The notify_thread() function can be used as the start function of a new
93 * thread but it is normally called from notifier(), below, in the context
94 * of a thread pool worker thread. It is used as the start function of a
95 * new thread only when individual pthread attributes differ from those
96 * that are common to all workers. This only occurs in the AIO case.
98 static void *
99 notify_thread(void *arg)
101 sigev_thread_data_t *stdp = arg;
102 void (*function)(union sigval) = stdp->std_func;
103 union sigval argument = stdp->std_arg;
105 lfree(stdp, sizeof (*stdp));
106 function(argument);
107 return (NULL);
111 * Thread pool interface to call the user-supplied notification function.
113 static void
114 notifier(void *arg)
116 (void) notify_thread(arg);
120 * This routine adds a new work request, described by function
121 * and argument, to the list of outstanding jobs.
122 * It returns 0 indicating success. A value != 0 indicates an error.
124 static int
125 sigev_add_work(thread_communication_data_t *tcdp,
126 void (*function)(union sigval), union sigval argument)
128 tpool_t *tpool = tcdp->tcd_poolp;
129 sigev_thread_data_t *stdp;
131 if (tpool == NULL)
132 return (EINVAL);
133 if ((stdp = lmalloc(sizeof (*stdp))) == NULL)
134 return (errno);
135 stdp->std_func = function;
136 stdp->std_arg = argument;
137 if (tpool_dispatch(tpool, notifier, stdp) != 0) {
138 lfree(stdp, sizeof (*stdp));
139 return (errno);
141 return (0);
144 static void
145 sigev_destroy_pool(thread_communication_data_t *tcdp)
147 if (tcdp->tcd_poolp != NULL)
148 tpool_abandon(tcdp->tcd_poolp);
149 tcdp->tcd_poolp = NULL;
151 if (tcdp->tcd_subsystem == MQ) {
153 * synchronize with del_sigev_mq()
155 sig_mutex_lock(&tcdp->tcd_lock);
156 tcdp->tcd_server_id = 0;
157 if (tcdp->tcd_msg_closing) {
158 (void) cond_broadcast(&tcdp->tcd_cv);
159 sig_mutex_unlock(&tcdp->tcd_lock);
160 return; /* del_sigev_mq() will free the tcd */
162 sig_mutex_unlock(&tcdp->tcd_lock);
166 * now delete everything
168 free_sigev_handler(tcdp);
172 * timer_spawner(), mqueue_spawner(), and aio_spawner() are the main
173 * functions for the daemon threads that get the event(s) for the
174 * respective SIGEV_THREAD subsystems. There is one timer spawner for
175 * each timer_create(), one mqueue spawner for every mq_open(), and
176 * exactly one aio spawner for all aio requests. These spawners add
177 * work requests to be done by a pool of daemon worker threads. In case
178 * the event requires creation of a worker thread with different pthread
179 * attributes than those from the pool of workers, a new daemon thread
180 * with these attributes is spawned apart from the pool of workers.
181 * If the spawner fails to add work or fails to create an additional
182 * thread because of lacking resources, it puts the event back into
183 * the kernel queue and re-tries some time later.
186 void *
187 timer_spawner(void *arg)
189 thread_communication_data_t *tcdp = (thread_communication_data_t *)arg;
190 port_event_t port_event;
192 /* destroy the pool if we are cancelled */
193 pthread_cleanup_push(sigev_destroy_pool, tcdp);
195 for (;;) {
196 if (port_get(tcdp->tcd_port, &port_event, NULL) != 0) {
197 dprintf("port_get on port %d failed with %d <%s>\n",
198 tcdp->tcd_port, errno, strerror(errno));
199 break;
201 switch (port_event.portev_source) {
202 case PORT_SOURCE_TIMER:
203 break;
204 case PORT_SOURCE_ALERT:
205 if (port_event.portev_events != SIGEV_THREAD_TERM)
206 errno = EPROTO;
207 goto out;
208 default:
209 dprintf("port_get on port %d returned %u "
210 "(not PORT_SOURCE_TIMER)\n",
211 tcdp->tcd_port, port_event.portev_source);
212 errno = EPROTO;
213 goto out;
216 tcdp->tcd_overruns = port_event.portev_events - 1;
217 if (sigev_add_work(tcdp,
218 tcdp->tcd_notif.sigev_notify_function,
219 tcdp->tcd_notif.sigev_value) != 0)
220 break;
221 /* wait until job is done before looking for another */
222 tpool_wait(tcdp->tcd_poolp);
224 out:
225 pthread_cleanup_pop(1);
226 return (NULL);
229 void *
230 mqueue_spawner(void *arg)
232 thread_communication_data_t *tcdp = (thread_communication_data_t *)arg;
233 int ret = 0;
234 int ntype;
235 void (*function)(union sigval);
236 union sigval argument;
238 /* destroy the pool if we are cancelled */
239 pthread_cleanup_push(sigev_destroy_pool, tcdp);
241 while (ret == 0) {
242 sig_mutex_lock(&tcdp->tcd_lock);
243 pthread_cleanup_push(sig_mutex_unlock, &tcdp->tcd_lock);
244 while ((ntype = tcdp->tcd_msg_enabled) == 0)
245 (void) sig_cond_wait(&tcdp->tcd_cv, &tcdp->tcd_lock);
246 pthread_cleanup_pop(1);
248 while (sem_wait(tcdp->tcd_msg_avail) == -1)
249 continue;
251 sig_mutex_lock(&tcdp->tcd_lock);
252 tcdp->tcd_msg_enabled = 0;
253 sig_mutex_unlock(&tcdp->tcd_lock);
255 /* ASSERT(ntype == SIGEV_THREAD || ntype == SIGEV_PORT); */
256 if (ntype == SIGEV_THREAD) {
257 function = tcdp->tcd_notif.sigev_notify_function;
258 argument.sival_ptr = tcdp->tcd_msg_userval;
259 ret = sigev_add_work(tcdp, function, argument);
260 } else { /* ntype == SIGEV_PORT */
261 ret = _port_dispatch(tcdp->tcd_port, 0, PORT_SOURCE_MQ,
262 0, (uintptr_t)tcdp->tcd_msg_object,
263 tcdp->tcd_msg_userval);
266 sig_mutex_unlock(&tcdp->tcd_lock);
268 pthread_cleanup_pop(1);
269 return (NULL);
272 void *
273 aio_spawner(void *arg)
275 thread_communication_data_t *tcdp = (thread_communication_data_t *)arg;
276 int error = 0;
277 void (*function)(union sigval);
278 union sigval argument;
279 port_event_t port_event;
280 struct sigevent *sigevp;
281 timespec_t delta;
282 pthread_attr_t *attrp;
284 /* destroy the pool if we are cancelled */
285 pthread_cleanup_push(sigev_destroy_pool, tcdp);
287 while (error == 0) {
288 if (port_get(tcdp->tcd_port, &port_event, NULL) != 0) {
289 error = errno;
290 dprintf("port_get on port %d failed with %d <%s>\n",
291 tcdp->tcd_port, error, strerror(error));
292 break;
294 switch (port_event.portev_source) {
295 case PORT_SOURCE_AIO:
296 break;
297 case PORT_SOURCE_ALERT:
298 if (port_event.portev_events != SIGEV_THREAD_TERM)
299 errno = EPROTO;
300 goto out;
301 default:
302 dprintf("port_get on port %d returned %u "
303 "(not PORT_SOURCE_AIO)\n",
304 tcdp->tcd_port, port_event.portev_source);
305 errno = EPROTO;
306 goto out;
308 argument.sival_ptr = port_event.portev_user;
309 switch (port_event.portev_events) {
310 case AIOLIO:
311 #if !defined(_LP64)
312 case AIOLIO64:
313 #endif
314 sigevp = (struct sigevent *)port_event.portev_object;
315 function = sigevp->sigev_notify_function;
316 attrp = sigevp->sigev_notify_attributes;
317 break;
318 case AIOAREAD:
319 case AIOAWRITE:
320 case AIOFSYNC:
322 aiocb_t *aiocbp =
323 (aiocb_t *)port_event.portev_object;
324 function = aiocbp->aio_sigevent.sigev_notify_function;
325 attrp = aiocbp->aio_sigevent.sigev_notify_attributes;
326 break;
328 #if !defined(_LP64)
329 case AIOAREAD64:
330 case AIOAWRITE64:
331 case AIOFSYNC64:
333 aiocb64_t *aiocbp =
334 (aiocb64_t *)port_event.portev_object;
335 function = aiocbp->aio_sigevent.sigev_notify_function;
336 attrp = aiocbp->aio_sigevent.sigev_notify_attributes;
337 break;
339 #endif
340 default:
341 function = NULL;
342 attrp = NULL;
343 break;
346 if (function == NULL)
347 error = EINVAL;
348 else if (pthread_attr_equal(attrp, tcdp->tcd_attrp))
349 error = sigev_add_work(tcdp, function, argument);
350 else {
352 * The attributes don't match.
353 * Spawn a thread with the non-matching attributes.
355 pthread_attr_t local_attr;
356 sigev_thread_data_t *stdp;
358 if ((stdp = lmalloc(sizeof (*stdp))) == NULL)
359 error = ENOMEM;
360 else
361 error = pthread_attr_clone(&local_attr, attrp);
363 if (error == 0) {
364 (void) pthread_attr_setdetachstate(
365 &local_attr, PTHREAD_CREATE_DETACHED);
366 (void) pthread_attr_setdaemonstate_np(
367 &local_attr, PTHREAD_CREATE_DAEMON_NP);
368 stdp->std_func = function;
369 stdp->std_arg = argument;
370 error = pthread_create(NULL, &local_attr,
371 notify_thread, stdp);
372 (void) pthread_attr_destroy(&local_attr);
374 if (error && stdp != NULL)
375 lfree(stdp, sizeof (*stdp));
378 if (error) {
379 dprintf("Cannot add work, error=%d <%s>.\n",
380 error, strerror(error));
381 if (error == EAGAIN || error == ENOMEM) {
382 /* (Temporary) no resources are available. */
383 if (_port_dispatch(tcdp->tcd_port, 0,
384 PORT_SOURCE_AIO, port_event.portev_events,
385 port_event.portev_object,
386 port_event.portev_user) != 0)
387 break;
388 error = 0;
389 delta.tv_sec = 0;
390 delta.tv_nsec = NANOSEC / 20; /* 50 msec */
391 (void) nanosleep(&delta, NULL);
395 out:
396 pthread_cleanup_pop(1);
397 return (NULL);
401 * Allocate a thread_communication_data_t block.
403 static thread_communication_data_t *
404 alloc_sigev_handler(subsystem_t caller)
406 thread_communication_data_t *tcdp;
408 if ((tcdp = lmalloc(sizeof (*tcdp))) != NULL) {
409 tcdp->tcd_subsystem = caller;
410 tcdp->tcd_port = -1;
411 (void) mutex_init(&tcdp->tcd_lock, USYNC_THREAD, NULL);
412 (void) cond_init(&tcdp->tcd_cv, USYNC_THREAD, NULL);
414 return (tcdp);
418 * Free a thread_communication_data_t block.
420 void
421 free_sigev_handler(thread_communication_data_t *tcdp)
423 if (tcdp->tcd_attrp) {
424 (void) pthread_attr_destroy(tcdp->tcd_attrp);
425 tcdp->tcd_attrp = NULL;
427 (void) memset(&tcdp->tcd_notif, 0, sizeof (tcdp->tcd_notif));
429 switch (tcdp->tcd_subsystem) {
430 case TIMER:
431 case AIO:
432 if (tcdp->tcd_port >= 0)
433 (void) close(tcdp->tcd_port);
434 break;
435 case MQ:
436 tcdp->tcd_msg_avail = NULL;
437 tcdp->tcd_msg_object = NULL;
438 tcdp->tcd_msg_userval = NULL;
439 tcdp->tcd_msg_enabled = 0;
440 break;
443 lfree(tcdp, sizeof (*tcdp));
447 * Initialize data structure and create the port.
449 thread_communication_data_t *
450 setup_sigev_handler(const struct sigevent *sigevp, subsystem_t caller)
452 thread_communication_data_t *tcdp;
453 int error;
455 if (sigevp == NULL) {
456 errno = EINVAL;
457 return (NULL);
460 if ((tcdp = alloc_sigev_handler(caller)) == NULL) {
461 errno = ENOMEM;
462 return (NULL);
465 if (sigevp->sigev_notify_attributes == NULL)
466 tcdp->tcd_attrp = NULL; /* default attributes */
467 else {
469 * We cannot just copy the sigevp->sigev_notify_attributes
470 * pointer. We need to initialize a new pthread_attr_t
471 * structure with the values from the user-supplied
472 * pthread_attr_t.
474 tcdp->tcd_attrp = &tcdp->tcd_user_attr;
475 error = pthread_attr_clone(tcdp->tcd_attrp,
476 sigevp->sigev_notify_attributes);
477 if (error) {
478 tcdp->tcd_attrp = NULL;
479 free_sigev_handler(tcdp);
480 errno = error;
481 return (NULL);
484 tcdp->tcd_notif = *sigevp;
485 tcdp->tcd_notif.sigev_notify_attributes = tcdp->tcd_attrp;
487 if (caller == TIMER || caller == AIO) {
488 if ((tcdp->tcd_port = port_create()) < 0 ||
489 fcntl(tcdp->tcd_port, FD_CLOEXEC) == -1) {
490 free_sigev_handler(tcdp);
491 errno = EBADF;
492 return (NULL);
495 return (tcdp);
499 * Create a thread pool and launch the spawner.
502 launch_spawner(thread_communication_data_t *tcdp)
504 int ret;
505 int maxworkers;
506 void *(*spawner)(void *);
507 sigset_t set;
508 sigset_t oset;
510 switch (tcdp->tcd_subsystem) {
511 case TIMER:
512 spawner = timer_spawner;
513 maxworkers = 1;
514 break;
515 case MQ:
516 spawner = mqueue_spawner;
517 maxworkers = 1;
518 break;
519 case AIO:
520 spawner = aio_spawner;
521 maxworkers = 100;
522 break;
523 default:
524 return (-1);
526 tcdp->tcd_poolp = tpool_create(1, maxworkers, 20,
527 tcdp->tcd_notif.sigev_notify_attributes);
528 if (tcdp->tcd_poolp == NULL)
529 return (-1);
530 /* create the spawner with all signals blocked */
531 (void) sigfillset(&set);
532 (void) thr_sigsetmask(SIG_SETMASK, &set, &oset);
533 ret = thr_create(NULL, 0, spawner, tcdp,
534 THR_DETACHED | THR_DAEMON, &tcdp->tcd_server_id);
535 (void) thr_sigsetmask(SIG_SETMASK, &oset, NULL);
536 if (ret != 0) {
537 tpool_destroy(tcdp->tcd_poolp);
538 tcdp->tcd_poolp = NULL;
539 return (-1);
541 return (0);
545 * Delete the data associated with the sigev_thread timer, if timer is
546 * associated with such a notification option.
547 * Destroy the timer_spawner thread.
550 del_sigev_timer(timer_t timer)
552 int rc = 0;
553 thread_communication_data_t *tcdp;
555 if ((uint_t)timer < timer_max && (tcdp = timer_tcd[timer]) != NULL) {
556 sig_mutex_lock(&tcdp->tcd_lock);
557 if (tcdp->tcd_port >= 0) {
558 if ((rc = port_alert(tcdp->tcd_port,
559 PORT_ALERT_SET, SIGEV_THREAD_TERM, NULL)) == 0) {
560 dprintf("del_sigev_timer(%d) OK.\n", timer);
563 timer_tcd[timer] = NULL;
564 sig_mutex_unlock(&tcdp->tcd_lock);
566 return (rc);
570 sigev_timer_getoverrun(timer_t timer)
572 thread_communication_data_t *tcdp;
574 if ((uint_t)timer < timer_max && (tcdp = timer_tcd[timer]) != NULL)
575 return (tcdp->tcd_overruns);
576 return (0);
579 static void
580 del_sigev_mq_cleanup(thread_communication_data_t *tcdp)
582 sig_mutex_unlock(&tcdp->tcd_lock);
583 free_sigev_handler(tcdp);
587 * Delete the data associated with the sigev_thread message queue,
588 * if the message queue is associated with such a notification option.
589 * Destroy the mqueue_spawner thread.
591 void
592 del_sigev_mq(thread_communication_data_t *tcdp)
594 pthread_t server_id;
595 int rc;
597 sig_mutex_lock(&tcdp->tcd_lock);
599 server_id = tcdp->tcd_server_id;
600 tcdp->tcd_msg_closing = 1;
601 if ((rc = pthread_cancel(server_id)) != 0) { /* "can't happen" */
602 sig_mutex_unlock(&tcdp->tcd_lock);
603 dprintf("Fail to cancel %u with error %d <%s>.\n",
604 server_id, rc, strerror(rc));
605 return;
609 * wait for sigev_destroy_pool() to finish
611 pthread_cleanup_push(del_sigev_mq_cleanup, tcdp);
612 while (tcdp->tcd_server_id == server_id)
613 (void) sig_cond_wait(&tcdp->tcd_cv, &tcdp->tcd_lock);
614 pthread_cleanup_pop(1);
618 * POSIX aio:
619 * If the notification type is SIGEV_THREAD, set up
620 * the port number for notifications. Create the
621 * thread pool and launch the spawner if necessary.
622 * If the notification type is not SIGEV_THREAD, do nothing.
625 _aio_sigev_thread_init(struct sigevent *sigevp)
627 static mutex_t sigev_aio_lock = DEFAULTMUTEX;
628 static cond_t sigev_aio_cv = DEFAULTCV;
629 static int sigev_aio_busy = 0;
631 thread_communication_data_t *tcdp;
632 int port;
633 int cancel_state;
634 int rc = 0;
636 if (sigevp == NULL ||
637 sigevp->sigev_notify != SIGEV_THREAD ||
638 sigevp->sigev_notify_function == NULL)
639 return (0);
641 lmutex_lock(&sigev_aio_lock);
642 (void) pthread_setcancelstate(PTHREAD_CANCEL_DISABLE, &cancel_state);
643 while (sigev_aio_busy)
644 (void) cond_wait(&sigev_aio_cv, &sigev_aio_lock);
645 (void) pthread_setcancelstate(cancel_state, NULL);
646 if ((tcdp = sigev_aio_tcd) != NULL)
647 port = tcdp->tcd_port;
648 else {
649 sigev_aio_busy = 1;
650 lmutex_unlock(&sigev_aio_lock);
652 tcdp = setup_sigev_handler(sigevp, AIO);
653 if (tcdp == NULL) {
654 port = -1;
655 rc = -1;
656 } else if (launch_spawner(tcdp) != 0) {
657 free_sigev_handler(tcdp);
658 tcdp = NULL;
659 port = -1;
660 rc = -1;
661 } else {
662 port = tcdp->tcd_port;
665 lmutex_lock(&sigev_aio_lock);
666 sigev_aio_tcd = tcdp;
667 sigev_aio_busy = 0;
668 (void) cond_broadcast(&sigev_aio_cv);
670 lmutex_unlock(&sigev_aio_lock);
671 sigevp->sigev_signo = port;
672 return (rc);
676 _aio_sigev_thread(aiocb_t *aiocbp)
678 if (aiocbp == NULL)
679 return (0);
680 return (_aio_sigev_thread_init(&aiocbp->aio_sigevent));
683 #if !defined(_LP64)
685 _aio_sigev_thread64(aiocb64_t *aiocbp)
687 if (aiocbp == NULL)
688 return (0);
689 return (_aio_sigev_thread_init(&aiocbp->aio_sigevent));
691 #endif
694 * Cleanup POSIX aio after fork1() in the child process.
696 void
697 postfork1_child_sigev_aio(void)
699 thread_communication_data_t *tcdp;
701 if ((tcdp = sigev_aio_tcd) != NULL) {
702 sigev_aio_tcd = NULL;
703 tcd_teardown(tcdp);
708 * Utility function for the various postfork1_child_sigev_*() functions.
709 * Clean up the tcdp data structure and close the port.
711 void
712 tcd_teardown(thread_communication_data_t *tcdp)
714 if (tcdp->tcd_poolp != NULL)
715 tpool_abandon(tcdp->tcd_poolp);
716 tcdp->tcd_poolp = NULL;
717 tcdp->tcd_server_id = 0;
718 free_sigev_handler(tcdp);