/*
 * CDDL HEADER START
 *
 * The contents of this file are subject to the terms of the
 * Common Development and Distribution License (the "License").
 * You may not use this file except in compliance with the License.
 *
 * You can obtain a copy of the license at usr/src/OPENSOLARIS.LICENSE
 * or http://www.opensolaris.org/os/licensing.
 * See the License for the specific language governing permissions
 * and limitations under the License.
 *
 * When distributing Covered Code, include this CDDL HEADER in each
 * file and include the License file at usr/src/OPENSOLARIS.LICENSE.
 * If applicable, add the following below this CDDL HEADER, with the
 * fields enclosed by brackets "[]" replaced with your own identifying
 * information: Portions Copyright [yyyy] [name of copyright owner]
 *
 * CDDL HEADER END
 */

/*
 * Copyright 2008 Sun Microsystems, Inc.  All rights reserved.
 * Use is subject to license terms.
 */
27 #pragma ident "%Z%%M% %I% %E% SMI"
30 #include "thr_uberdata.h"
31 #include <sys/types.h>
44 #include <sys/aiocb.h>
48 #include "sigev_thread.h"
/*
 * There is but one spawner for all aio operations.
 */
thread_communication_data_t *sigev_aio_tcd = NULL;

/*
 * Set non-zero via _RT_DEBUG to enable debugging printf's.
 */
static int _rt_debug = 0;
61 init_sigev_thread(void)
65 if ((ldebug
= getenv("_RT_DEBUG")) != NULL
)
66 _rt_debug
= atoi(ldebug
);
70 * Routine to print debug messages:
71 * If _rt_debug is set, printf the debug message to stderr
72 * with an appropriate prefix.
76 dprintf(const char *format
, ...)
81 va_start(alist
, format
);
83 pthread_cleanup_push(funlockfile
, stderr
);
84 (void) fputs("DEBUG: ", stderr
);
85 (void) vfprintf(stderr
, format
, alist
);
86 pthread_cleanup_pop(1); /* funlockfile(stderr) */
92 * The notify_thread() function can be used as the start function of a new
93 * thread but it is normally called from notifier(), below, in the context
94 * of a thread pool worker thread. It is used as the start function of a
95 * new thread only when individual pthread attributes differ from those
96 * that are common to all workers. This only occurs in the AIO case.
99 notify_thread(void *arg
)
101 sigev_thread_data_t
*stdp
= arg
;
102 void (*function
)(union sigval
) = stdp
->std_func
;
103 union sigval argument
= stdp
->std_arg
;
105 lfree(stdp
, sizeof (*stdp
));
/*
 * Thread pool interface to call the user-supplied notification function.
 */
static void
notifier(void *arg)
{
    (void) notify_thread(arg);
}
120 * This routine adds a new work request, described by function
121 * and argument, to the list of outstanding jobs.
122 * It returns 0 indicating success. A value != 0 indicates an error.
125 sigev_add_work(thread_communication_data_t
*tcdp
,
126 void (*function
)(union sigval
), union sigval argument
)
128 tpool_t
*tpool
= tcdp
->tcd_poolp
;
129 sigev_thread_data_t
*stdp
;
133 if ((stdp
= lmalloc(sizeof (*stdp
))) == NULL
)
135 stdp
->std_func
= function
;
136 stdp
->std_arg
= argument
;
137 if (tpool_dispatch(tpool
, notifier
, stdp
) != 0) {
138 lfree(stdp
, sizeof (*stdp
));
145 sigev_destroy_pool(thread_communication_data_t
*tcdp
)
147 if (tcdp
->tcd_poolp
!= NULL
)
148 tpool_abandon(tcdp
->tcd_poolp
);
149 tcdp
->tcd_poolp
= NULL
;
151 if (tcdp
->tcd_subsystem
== MQ
) {
153 * synchronize with del_sigev_mq()
155 sig_mutex_lock(&tcdp
->tcd_lock
);
156 tcdp
->tcd_server_id
= 0;
157 if (tcdp
->tcd_msg_closing
) {
158 (void) cond_broadcast(&tcdp
->tcd_cv
);
159 sig_mutex_unlock(&tcdp
->tcd_lock
);
160 return; /* del_sigev_mq() will free the tcd */
162 sig_mutex_unlock(&tcdp
->tcd_lock
);
166 * now delete everything
168 free_sigev_handler(tcdp
);
172 * timer_spawner(), mqueue_spawner(), and aio_spawner() are the main
173 * functions for the daemon threads that get the event(s) for the
174 * respective SIGEV_THREAD subsystems. There is one timer spawner for
175 * each timer_create(), one mqueue spawner for every mq_open(), and
176 * exactly one aio spawner for all aio requests. These spawners add
177 * work requests to be done by a pool of daemon worker threads. In case
178 * the event requires creation of a worker thread with different pthread
179 * attributes than those from the pool of workers, a new daemon thread
180 * with these attributes is spawned apart from the pool of workers.
181 * If the spawner fails to add work or fails to create an additional
182 * thread because of lacking resources, it puts the event back into
183 * the kernel queue and re-tries some time later.
187 timer_spawner(void *arg
)
189 thread_communication_data_t
*tcdp
= (thread_communication_data_t
*)arg
;
190 port_event_t port_event
;
192 /* destroy the pool if we are cancelled */
193 pthread_cleanup_push(sigev_destroy_pool
, tcdp
);
196 if (port_get(tcdp
->tcd_port
, &port_event
, NULL
) != 0) {
197 dprintf("port_get on port %d failed with %d <%s>\n",
198 tcdp
->tcd_port
, errno
, strerror(errno
));
201 switch (port_event
.portev_source
) {
202 case PORT_SOURCE_TIMER
:
204 case PORT_SOURCE_ALERT
:
205 if (port_event
.portev_events
!= SIGEV_THREAD_TERM
)
209 dprintf("port_get on port %d returned %u "
210 "(not PORT_SOURCE_TIMER)\n",
211 tcdp
->tcd_port
, port_event
.portev_source
);
216 tcdp
->tcd_overruns
= port_event
.portev_events
- 1;
217 if (sigev_add_work(tcdp
,
218 tcdp
->tcd_notif
.sigev_notify_function
,
219 tcdp
->tcd_notif
.sigev_value
) != 0)
221 /* wait until job is done before looking for another */
222 tpool_wait(tcdp
->tcd_poolp
);
225 pthread_cleanup_pop(1);
230 mqueue_spawner(void *arg
)
232 thread_communication_data_t
*tcdp
= (thread_communication_data_t
*)arg
;
235 void (*function
)(union sigval
);
236 union sigval argument
;
238 /* destroy the pool if we are cancelled */
239 pthread_cleanup_push(sigev_destroy_pool
, tcdp
);
242 sig_mutex_lock(&tcdp
->tcd_lock
);
243 pthread_cleanup_push(sig_mutex_unlock
, &tcdp
->tcd_lock
);
244 while ((ntype
= tcdp
->tcd_msg_enabled
) == 0)
245 (void) sig_cond_wait(&tcdp
->tcd_cv
, &tcdp
->tcd_lock
);
246 pthread_cleanup_pop(1);
248 while (sem_wait(tcdp
->tcd_msg_avail
) == -1)
251 sig_mutex_lock(&tcdp
->tcd_lock
);
252 tcdp
->tcd_msg_enabled
= 0;
253 sig_mutex_unlock(&tcdp
->tcd_lock
);
255 /* ASSERT(ntype == SIGEV_THREAD || ntype == SIGEV_PORT); */
256 if (ntype
== SIGEV_THREAD
) {
257 function
= tcdp
->tcd_notif
.sigev_notify_function
;
258 argument
.sival_ptr
= tcdp
->tcd_msg_userval
;
259 ret
= sigev_add_work(tcdp
, function
, argument
);
260 } else { /* ntype == SIGEV_PORT */
261 ret
= _port_dispatch(tcdp
->tcd_port
, 0, PORT_SOURCE_MQ
,
262 0, (uintptr_t)tcdp
->tcd_msg_object
,
263 tcdp
->tcd_msg_userval
);
266 sig_mutex_unlock(&tcdp
->tcd_lock
);
268 pthread_cleanup_pop(1);
273 aio_spawner(void *arg
)
275 thread_communication_data_t
*tcdp
= (thread_communication_data_t
*)arg
;
277 void (*function
)(union sigval
);
278 union sigval argument
;
279 port_event_t port_event
;
280 struct sigevent
*sigevp
;
282 pthread_attr_t
*attrp
;
284 /* destroy the pool if we are cancelled */
285 pthread_cleanup_push(sigev_destroy_pool
, tcdp
);
288 if (port_get(tcdp
->tcd_port
, &port_event
, NULL
) != 0) {
290 dprintf("port_get on port %d failed with %d <%s>\n",
291 tcdp
->tcd_port
, error
, strerror(error
));
294 switch (port_event
.portev_source
) {
295 case PORT_SOURCE_AIO
:
297 case PORT_SOURCE_ALERT
:
298 if (port_event
.portev_events
!= SIGEV_THREAD_TERM
)
302 dprintf("port_get on port %d returned %u "
303 "(not PORT_SOURCE_AIO)\n",
304 tcdp
->tcd_port
, port_event
.portev_source
);
308 argument
.sival_ptr
= port_event
.portev_user
;
309 switch (port_event
.portev_events
) {
314 sigevp
= (struct sigevent
*)port_event
.portev_object
;
315 function
= sigevp
->sigev_notify_function
;
316 attrp
= sigevp
->sigev_notify_attributes
;
323 (aiocb_t
*)port_event
.portev_object
;
324 function
= aiocbp
->aio_sigevent
.sigev_notify_function
;
325 attrp
= aiocbp
->aio_sigevent
.sigev_notify_attributes
;
334 (aiocb64_t
*)port_event
.portev_object
;
335 function
= aiocbp
->aio_sigevent
.sigev_notify_function
;
336 attrp
= aiocbp
->aio_sigevent
.sigev_notify_attributes
;
346 if (function
== NULL
)
348 else if (pthread_attr_equal(attrp
, tcdp
->tcd_attrp
))
349 error
= sigev_add_work(tcdp
, function
, argument
);
352 * The attributes don't match.
353 * Spawn a thread with the non-matching attributes.
355 pthread_attr_t local_attr
;
356 sigev_thread_data_t
*stdp
;
358 if ((stdp
= lmalloc(sizeof (*stdp
))) == NULL
)
361 error
= pthread_attr_clone(&local_attr
, attrp
);
364 (void) pthread_attr_setdetachstate(
365 &local_attr
, PTHREAD_CREATE_DETACHED
);
366 (void) pthread_attr_setdaemonstate_np(
367 &local_attr
, PTHREAD_CREATE_DAEMON_NP
);
368 stdp
->std_func
= function
;
369 stdp
->std_arg
= argument
;
370 error
= pthread_create(NULL
, &local_attr
,
371 notify_thread
, stdp
);
372 (void) pthread_attr_destroy(&local_attr
);
374 if (error
&& stdp
!= NULL
)
375 lfree(stdp
, sizeof (*stdp
));
379 dprintf("Cannot add work, error=%d <%s>.\n",
380 error
, strerror(error
));
381 if (error
== EAGAIN
|| error
== ENOMEM
) {
382 /* (Temporary) no resources are available. */
383 if (_port_dispatch(tcdp
->tcd_port
, 0,
384 PORT_SOURCE_AIO
, port_event
.portev_events
,
385 port_event
.portev_object
,
386 port_event
.portev_user
) != 0)
390 delta
.tv_nsec
= NANOSEC
/ 20; /* 50 msec */
391 (void) nanosleep(&delta
, NULL
);
396 pthread_cleanup_pop(1);
401 * Allocate a thread_communication_data_t block.
403 static thread_communication_data_t
*
404 alloc_sigev_handler(subsystem_t caller
)
406 thread_communication_data_t
*tcdp
;
408 if ((tcdp
= lmalloc(sizeof (*tcdp
))) != NULL
) {
409 tcdp
->tcd_subsystem
= caller
;
411 (void) mutex_init(&tcdp
->tcd_lock
, USYNC_THREAD
, NULL
);
412 (void) cond_init(&tcdp
->tcd_cv
, USYNC_THREAD
, NULL
);
418 * Free a thread_communication_data_t block.
421 free_sigev_handler(thread_communication_data_t
*tcdp
)
423 if (tcdp
->tcd_attrp
) {
424 (void) pthread_attr_destroy(tcdp
->tcd_attrp
);
425 tcdp
->tcd_attrp
= NULL
;
427 (void) memset(&tcdp
->tcd_notif
, 0, sizeof (tcdp
->tcd_notif
));
429 switch (tcdp
->tcd_subsystem
) {
432 if (tcdp
->tcd_port
>= 0)
433 (void) close(tcdp
->tcd_port
);
436 tcdp
->tcd_msg_avail
= NULL
;
437 tcdp
->tcd_msg_object
= NULL
;
438 tcdp
->tcd_msg_userval
= NULL
;
439 tcdp
->tcd_msg_enabled
= 0;
443 lfree(tcdp
, sizeof (*tcdp
));
447 * Initialize data structure and create the port.
449 thread_communication_data_t
*
450 setup_sigev_handler(const struct sigevent
*sigevp
, subsystem_t caller
)
452 thread_communication_data_t
*tcdp
;
455 if (sigevp
== NULL
) {
460 if ((tcdp
= alloc_sigev_handler(caller
)) == NULL
) {
465 if (sigevp
->sigev_notify_attributes
== NULL
)
466 tcdp
->tcd_attrp
= NULL
; /* default attributes */
469 * We cannot just copy the sigevp->sigev_notify_attributes
470 * pointer. We need to initialize a new pthread_attr_t
471 * structure with the values from the user-supplied
474 tcdp
->tcd_attrp
= &tcdp
->tcd_user_attr
;
475 error
= pthread_attr_clone(tcdp
->tcd_attrp
,
476 sigevp
->sigev_notify_attributes
);
478 tcdp
->tcd_attrp
= NULL
;
479 free_sigev_handler(tcdp
);
484 tcdp
->tcd_notif
= *sigevp
;
485 tcdp
->tcd_notif
.sigev_notify_attributes
= tcdp
->tcd_attrp
;
487 if (caller
== TIMER
|| caller
== AIO
) {
488 if ((tcdp
->tcd_port
= port_create()) < 0 ||
489 fcntl(tcdp
->tcd_port
, FD_CLOEXEC
) == -1) {
490 free_sigev_handler(tcdp
);
499 * Create a thread pool and launch the spawner.
502 launch_spawner(thread_communication_data_t
*tcdp
)
506 void *(*spawner
)(void *);
510 switch (tcdp
->tcd_subsystem
) {
512 spawner
= timer_spawner
;
516 spawner
= mqueue_spawner
;
520 spawner
= aio_spawner
;
526 tcdp
->tcd_poolp
= tpool_create(1, maxworkers
, 20,
527 tcdp
->tcd_notif
.sigev_notify_attributes
);
528 if (tcdp
->tcd_poolp
== NULL
)
530 /* create the spawner with all signals blocked */
531 (void) sigfillset(&set
);
532 (void) thr_sigsetmask(SIG_SETMASK
, &set
, &oset
);
533 ret
= thr_create(NULL
, 0, spawner
, tcdp
,
534 THR_DETACHED
| THR_DAEMON
, &tcdp
->tcd_server_id
);
535 (void) thr_sigsetmask(SIG_SETMASK
, &oset
, NULL
);
537 tpool_destroy(tcdp
->tcd_poolp
);
538 tcdp
->tcd_poolp
= NULL
;
545 * Delete the data associated with the sigev_thread timer, if timer is
546 * associated with such a notification option.
547 * Destroy the timer_spawner thread.
550 del_sigev_timer(timer_t timer
)
553 thread_communication_data_t
*tcdp
;
555 if ((uint_t
)timer
< timer_max
&& (tcdp
= timer_tcd
[timer
]) != NULL
) {
556 sig_mutex_lock(&tcdp
->tcd_lock
);
557 if (tcdp
->tcd_port
>= 0) {
558 if ((rc
= port_alert(tcdp
->tcd_port
,
559 PORT_ALERT_SET
, SIGEV_THREAD_TERM
, NULL
)) == 0) {
560 dprintf("del_sigev_timer(%d) OK.\n", timer
);
563 timer_tcd
[timer
] = NULL
;
564 sig_mutex_unlock(&tcdp
->tcd_lock
);
570 sigev_timer_getoverrun(timer_t timer
)
572 thread_communication_data_t
*tcdp
;
574 if ((uint_t
)timer
< timer_max
&& (tcdp
= timer_tcd
[timer
]) != NULL
)
575 return (tcdp
->tcd_overruns
);
580 del_sigev_mq_cleanup(thread_communication_data_t
*tcdp
)
582 sig_mutex_unlock(&tcdp
->tcd_lock
);
583 free_sigev_handler(tcdp
);
587 * Delete the data associated with the sigev_thread message queue,
588 * if the message queue is associated with such a notification option.
589 * Destroy the mqueue_spawner thread.
592 del_sigev_mq(thread_communication_data_t
*tcdp
)
597 sig_mutex_lock(&tcdp
->tcd_lock
);
599 server_id
= tcdp
->tcd_server_id
;
600 tcdp
->tcd_msg_closing
= 1;
601 if ((rc
= pthread_cancel(server_id
)) != 0) { /* "can't happen" */
602 sig_mutex_unlock(&tcdp
->tcd_lock
);
603 dprintf("Fail to cancel %u with error %d <%s>.\n",
604 server_id
, rc
, strerror(rc
));
609 * wait for sigev_destroy_pool() to finish
611 pthread_cleanup_push(del_sigev_mq_cleanup
, tcdp
);
612 while (tcdp
->tcd_server_id
== server_id
)
613 (void) sig_cond_wait(&tcdp
->tcd_cv
, &tcdp
->tcd_lock
);
614 pthread_cleanup_pop(1);
619 * If the notification type is SIGEV_THREAD, set up
620 * the port number for notifications. Create the
621 * thread pool and launch the spawner if necessary.
622 * If the notification type is not SIGEV_THREAD, do nothing.
625 _aio_sigev_thread_init(struct sigevent
*sigevp
)
627 static mutex_t sigev_aio_lock
= DEFAULTMUTEX
;
628 static cond_t sigev_aio_cv
= DEFAULTCV
;
629 static int sigev_aio_busy
= 0;
631 thread_communication_data_t
*tcdp
;
636 if (sigevp
== NULL
||
637 sigevp
->sigev_notify
!= SIGEV_THREAD
||
638 sigevp
->sigev_notify_function
== NULL
)
641 lmutex_lock(&sigev_aio_lock
);
642 (void) pthread_setcancelstate(PTHREAD_CANCEL_DISABLE
, &cancel_state
);
643 while (sigev_aio_busy
)
644 (void) cond_wait(&sigev_aio_cv
, &sigev_aio_lock
);
645 (void) pthread_setcancelstate(cancel_state
, NULL
);
646 if ((tcdp
= sigev_aio_tcd
) != NULL
)
647 port
= tcdp
->tcd_port
;
650 lmutex_unlock(&sigev_aio_lock
);
652 tcdp
= setup_sigev_handler(sigevp
, AIO
);
656 } else if (launch_spawner(tcdp
) != 0) {
657 free_sigev_handler(tcdp
);
662 port
= tcdp
->tcd_port
;
665 lmutex_lock(&sigev_aio_lock
);
666 sigev_aio_tcd
= tcdp
;
668 (void) cond_broadcast(&sigev_aio_cv
);
670 lmutex_unlock(&sigev_aio_lock
);
671 sigevp
->sigev_signo
= port
;
676 _aio_sigev_thread(aiocb_t
*aiocbp
)
680 return (_aio_sigev_thread_init(&aiocbp
->aio_sigevent
));
685 _aio_sigev_thread64(aiocb64_t
*aiocbp
)
689 return (_aio_sigev_thread_init(&aiocbp
->aio_sigevent
));
694 * Cleanup POSIX aio after fork1() in the child process.
697 postfork1_child_sigev_aio(void)
699 thread_communication_data_t
*tcdp
;
701 if ((tcdp
= sigev_aio_tcd
) != NULL
) {
702 sigev_aio_tcd
= NULL
;
708 * Utility function for the various postfork1_child_sigev_*() functions.
709 * Clean up the tcdp data structure and close the port.
712 tcd_teardown(thread_communication_data_t
*tcdp
)
714 if (tcdp
->tcd_poolp
!= NULL
)
715 tpool_abandon(tcdp
->tcd_poolp
);
716 tcdp
->tcd_poolp
= NULL
;
717 tcdp
->tcd_server_id
= 0;
718 free_sigev_handler(tcdp
);