4 * The contents of this file are subject to the terms of the
5 * Common Development and Distribution License (the "License").
6 * You may not use this file except in compliance with the License.
8 * You can obtain a copy of the license at usr/src/OPENSOLARIS.LICENSE
9 * or http://www.opensolaris.org/os/licensing.
10 * See the License for the specific language governing permissions
11 * and limitations under the License.
13 * When distributing Covered Code, include this CDDL HEADER in each
14 * file and include the License file at usr/src/OPENSOLARIS.LICENSE.
15 * If applicable, add the following below this CDDL HEADER, with the
16 * fields enclosed by brackets "[]" replaced with your own identifying
17 * information: Portions Copyright [yyyy] [name of copyright owner]
23 * Copyright 2008 Sun Microsystems, Inc. All rights reserved.
24 * Use is subject to license terms.
30 #include <sys/param.h> /* _MQ_OPEN_MAX, _MQ_PRIO_MAX, _SEM_VALUE_MAX */
33 #include <sys/types.h>
46 #include "sigev_thread.h"
50 * Default values per message queue
53 #define MQ_MAXSIZE 1024
55 #define MQ_MAGIC 0x4d534751 /* "MSGQ" */
58 * Message header which is part of messages in link list
61 uint64_t msg_next
; /* offset of next message in the link */
62 uint64_t msg_len
; /* length of the message */
66 * message queue description
69 size_t mqdn_flags
; /* open description flags */
73 * message queue descriptor structure
75 typedef struct mq_des
{
76 struct mq_des
*mqd_next
; /* list of all open mq descriptors, */
77 struct mq_des
*mqd_prev
; /* needed for fork-safety */
78 int mqd_magic
; /* magic # to identify mq_des */
79 int mqd_flags
; /* operation flag per open */
80 struct mq_header
*mqd_mq
; /* address pointer of message Q */
81 struct mq_dn
*mqd_mqdn
; /* open description */
82 thread_communication_data_t
*mqd_tcd
; /* SIGEV_THREAD notification */
83 int mqd_ownerdead
; /* mq_exclusive is inconsistent */
87 * message queue common header, part of the mmap()ed file.
88 * Since message queues may be shared between 32- and 64-bit processes,
89 * care must be taken to make sure that the elements of this structure
90 * are identical for both _LP64 and _ILP32 cases.
92 typedef struct mq_header
{
93 /* first field must be mq_totsize, DO NOT insert before this */
94 int64_t mq_totsize
; /* total size of the Queue */
95 int64_t mq_maxsz
; /* max size of each message */
96 uint32_t mq_maxmsg
; /* max messages in the queue */
97 uint32_t mq_maxprio
; /* maximum mqueue priority */
98 uint32_t mq_curmaxprio
; /* current maximum MQ priority */
99 uint32_t mq_mask
; /* priority bitmask */
100 uint64_t mq_freep
; /* free message's head pointer */
101 uint64_t mq_headpp
; /* pointer to head pointers */
102 uint64_t mq_tailpp
; /* pointer to tail pointers */
103 signotify_id_t mq_sigid
; /* notification id (3 int's) */
104 uint32_t mq_ntype
; /* notification type (SIGEV_*) */
105 uint64_t mq_des
; /* pointer to msg Q descriptor */
106 mutex_t mq_exclusive
; /* acquire for exclusive access */
107 sem_t mq_rblocked
; /* number of processes rblocked */
108 sem_t mq_notfull
; /* mq_send()'s block on this */
109 sem_t mq_notempty
; /* mq_receive()'s block on this */
110 sem_t mq_spawner
; /* spawner thread blocks on this */
114 * The code assumes that _MQ_OPEN_MAX == -1, i.e. "no fixed implementation limit".
115 * If this assumption is somehow invalidated, mq_open() needs to be changed
116 * back to the old version which kept a count and enforced a limit.
117 * We make sure that this is pointed out to those changing <sys/param.h>
118 * by checking _MQ_OPEN_MAX at compile time.
120 #if _MQ_OPEN_MAX != -1
121 #error "mq_open() no longer enforces _MQ_OPEN_MAX and needs fixing."
124 #define MQ_ALIGNSIZE 8 /* 64-bit alignment */
/*
 * Assert that condition x holds.
 *
 * The expansion deliberately carries no trailing semicolon: the caller
 * writes MQ_ASSERT(x); and supplies it.  This keeps the macro usable as
 * the sole statement of an unbraced if/else branch, where a doubled
 * semicolon would detach the else.
 */
#define	MQ_ASSERT(x)	assert(x)
129 #define MQ_ASSERT_PTR(_m, _p) \
130 assert((_p) != NULL && !((uintptr_t)(_p) & (MQ_ALIGNSIZE -1)) && \
131 !((uintptr_t)_m + (uintptr_t)(_p) >= (uintptr_t)_m + \
134 #define MQ_ASSERT_SEMVAL_LEQ(sem, val) { \
136 (void) sem_getvalue((sem), &_val); \
137 assert((_val) <= val); }
140 #define MQ_ASSERT_PTR(_m, _p)
141 #define MQ_ASSERT_SEMVAL_LEQ(sem, val)
/*
 * Helpers that turn offsets stored in the queue header into addresses
 * relative to the header pointer m.
 *
 * MQ_PTR(m, n)   - message header located n bytes past m.
 * HEAD_PTR(m, n) - n'th uint64_t slot of the array that starts
 *                  m->mq_headpp bytes past m.
 * TAIL_PTR(m, n) - n'th uint64_t slot of the array that starts
 *                  m->mq_tailpp bytes past m.
 *
 * Every argument is parenthesized so that expression arguments such as
 * "a + b" are scaled and cast as a whole.
 */
#define	MQ_PTR(m, n)	((msghdr_t *)((uintptr_t)(m) + (uintptr_t)(n)))
#define	HEAD_PTR(m, n)	((uint64_t *)((uintptr_t)(m) + \
	(uintptr_t)(m)->mq_headpp + (n) * sizeof (uint64_t)))
#define	TAIL_PTR(m, n)	((uint64_t *)((uintptr_t)(m) + \
	(uintptr_t)(m)->mq_tailpp + (n) * sizeof (uint64_t)))
150 #define MQ_RESERVED ((mqdes_t *)-1)
155 static mutex_t mq_list_lock
= DEFAULTMUTEX
;
156 static mqdes_t
*mq_list
= NULL
;
158 extern int __signotify(int cmd
, siginfo_t
*sigonfo
, signotify_id_t
*sn_id
);
161 mq_is_valid(mqdes_t
*mqdp
)
164 * Any use of a message queue after it was closed is
165 * undefined. But the standard strongly favours EBADF
166 * returns. Before we dereference the pointer, which could
167 * be fatal, we first do some pointer sanity checks.
169 if (mqdp
!= NULL
&& mqdp
!= MQ_RESERVED
&&
170 ((uintptr_t)mqdp
& 0x7) == 0) {
171 return (mqdp
->mqd_magic
== MQ_MAGIC
);
178 mq_init(mqhdr_t
*mqhp
, size_t msgsize
, ssize_t maxmsg
)
186 * We only need to initialize the non-zero fields. The use of
187 * ftruncate() on the message queue file assures that the
188 * pages will be zero-filled.
190 (void) mutex_init(&mqhp
->mq_exclusive
,
191 USYNC_PROCESS
| LOCK_ROBUST
, NULL
);
192 (void) sem_init(&mqhp
->mq_rblocked
, 1, 0);
193 (void) sem_init(&mqhp
->mq_notempty
, 1, 0);
194 (void) sem_init(&mqhp
->mq_spawner
, 1, 0);
195 (void) sem_init(&mqhp
->mq_notfull
, 1, (uint_t
)maxmsg
);
197 mqhp
->mq_maxsz
= msgsize
;
198 mqhp
->mq_maxmsg
= maxmsg
;
201 * As of this writing (1997), there are 32 message queue priorities.
202 * If this is to change, then the size of the mq_mask will
203 * also have to change. If DEBUG is defined, assert that
204 * _MQ_PRIO_MAX hasn't changed.
206 mqhp
->mq_maxprio
= _MQ_PRIO_MAX
;
208 /* LINTED always true */
209 MQ_ASSERT(sizeof (mqhp
->mq_mask
) * 8 >= _MQ_PRIO_MAX
);
213 * Since the message queue can be mapped into different
214 * virtual address ranges by different processes, we don't
215 * keep track of pointers, only offsets into the shared region.
217 mqhp
->mq_headpp
= sizeof (mqhdr_t
);
218 mqhp
->mq_tailpp
= mqhp
->mq_headpp
+
219 mqhp
->mq_maxprio
* sizeof (uint64_t);
220 mqhp
->mq_freep
= mqhp
->mq_tailpp
+
221 mqhp
->mq_maxprio
* sizeof (uint64_t);
223 currentp
= mqhp
->mq_freep
;
224 MQ_PTR(mqhp
, currentp
)->msg_next
= 0;
226 temp
= (mqhp
->mq_maxsz
+ MQ_ALIGNSIZE
- 1) & ~(MQ_ALIGNSIZE
- 1);
227 for (i
= 1; i
< mqhp
->mq_maxmsg
; i
++) {
228 nextp
= currentp
+ sizeof (msghdr_t
) + temp
;
229 MQ_PTR(mqhp
, currentp
)->msg_next
= nextp
;
230 MQ_PTR(mqhp
, nextp
)->msg_next
= 0;
236 mq_getmsg(mqhdr_t
*mqhp
, char *msgp
, uint_t
*msg_prio
)
243 MQ_ASSERT(MUTEX_HELD(&mqhp
->mq_exclusive
));
246 * Get the head and tail pointers for the queue of maximum
247 * priority. We shouldn't be here unless there is a message for
248 * us, so it's fair to assert that both the head and tail
249 * pointers are non-NULL.
251 headpp
= HEAD_PTR(mqhp
, mqhp
->mq_curmaxprio
);
252 tailpp
= TAIL_PTR(mqhp
, mqhp
->mq_curmaxprio
);
254 if (msg_prio
!= NULL
)
255 *msg_prio
= mqhp
->mq_curmaxprio
;
258 MQ_ASSERT_PTR(mqhp
, currentp
);
259 curbuf
= MQ_PTR(mqhp
, currentp
);
261 if ((*headpp
= curbuf
->msg_next
) == 0) {
263 * We just nuked the last message in this priority's queue.
264 * Twiddle this priority's bit, and then find the next bit
267 uint_t prio
= mqhp
->mq_curmaxprio
;
269 mqhp
->mq_mask
&= ~(1u << prio
);
271 for (; prio
!= 0; prio
--)
272 if (mqhp
->mq_mask
& (1u << prio
))
274 mqhp
->mq_curmaxprio
= prio
;
280 * Copy the message, and put the buffer back on the free list.
282 (void) memcpy(msgp
, (char *)&curbuf
[1], curbuf
->msg_len
);
283 curbuf
->msg_next
= mqhp
->mq_freep
;
284 mqhp
->mq_freep
= currentp
;
286 return (curbuf
->msg_len
);
291 mq_putmsg(mqhdr_t
*mqhp
, const char *msgp
, ssize_t len
, uint_t prio
)
298 MQ_ASSERT(MUTEX_HELD(&mqhp
->mq_exclusive
));
301 * Grab a free message block, and link it in. We shouldn't
302 * be here unless there is room in the queue for us; it's
303 * fair to assert that the free pointer is non-NULL.
305 currentp
= mqhp
->mq_freep
;
306 MQ_ASSERT_PTR(mqhp
, currentp
);
307 curbuf
= MQ_PTR(mqhp
, currentp
);
310 * Remove a message from the free list, and copy in the new contents.
312 mqhp
->mq_freep
= curbuf
->msg_next
;
313 curbuf
->msg_next
= 0;
314 (void) memcpy((char *)&curbuf
[1], msgp
, len
);
315 curbuf
->msg_len
= len
;
317 headpp
= HEAD_PTR(mqhp
, prio
);
318 tailpp
= TAIL_PTR(mqhp
, prio
);
322 * This is the first message on this queue. Set the
323 * head and tail pointers, and tip the appropriate bit
324 * in the priority mask.
328 mqhp
->mq_mask
|= (1u << prio
);
329 if (prio
> mqhp
->mq_curmaxprio
)
330 mqhp
->mq_curmaxprio
= prio
;
332 MQ_ASSERT_PTR(mqhp
, *tailpp
);
333 MQ_PTR(mqhp
, *tailpp
)->msg_next
= currentp
;
339 * Send a notification and also delete the registration.
342 do_notify(mqhdr_t
*mqhp
)
344 (void) __signotify(SN_SEND
, NULL
, &mqhp
->mq_sigid
);
345 if (mqhp
->mq_ntype
== SIGEV_THREAD
||
346 mqhp
->mq_ntype
== SIGEV_PORT
)
347 (void) sem_post(&mqhp
->mq_spawner
);
353 * Called when the mq_exclusive lock draws EOWNERDEAD or ENOTRECOVERABLE.
354 * Wake up anyone waiting on mq_*send() or mq_*receive() and ensure that
355 * they fail with errno == EBADMSG. Trigger any registered notification.
358 owner_dead(mqdes_t
*mqdp
, int error
)
360 mqhdr_t
*mqhp
= mqdp
->mqd_mq
;
362 mqdp
->mqd_ownerdead
= 1;
363 (void) sem_post(&mqhp
->mq_notfull
);
364 (void) sem_post(&mqhp
->mq_notempty
);
365 if (error
== EOWNERDEAD
) {
366 if (mqhp
->mq_sigid
.sn_pid
!= 0)
368 (void) mutex_unlock(&mqhp
->mq_exclusive
);
374 mq_open(const char *path
, int oflag
, /* mode_t mode, mq_attr *attr */ ...)
378 struct mq_attr
*attr
= NULL
;
392 if (__pos4obj_check(path
) == -1)
395 /* acquire MSGQ lock to have atomic operation */
396 if (__pos4obj_lock(path
, MQ_LOCK_TYPE
) < 0)
401 /* filter oflag to have READ/WRITE/CREATE modes only */
402 oflag
= oflag
& (O_RDONLY
|O_WRONLY
|O_RDWR
|O_CREAT
|O_EXCL
|O_NONBLOCK
);
403 if ((oflag
& O_CREAT
) != 0) {
404 mode
= va_arg(ap
, mode_t
);
405 attr
= va_arg(ap
, struct mq_attr
*);
409 if ((fd
= __pos4obj_open(path
, MQ_PERM_TYPE
, oflag
,
410 mode
, &cr_flag
)) < 0)
413 /* closing permission file */
414 (void) __close_nc(fd
);
416 /* Try to open/create data file */
418 cr_flag
= PFILE_CREATE
;
421 msgsize
= MQ_MAXSIZE
;
422 } else if (attr
->mq_maxmsg
<= 0 || attr
->mq_msgsize
<= 0) {
425 } else if (attr
->mq_maxmsg
> _SEM_VALUE_MAX
) {
429 maxmsg
= attr
->mq_maxmsg
;
430 msgsize
= attr
->mq_msgsize
;
433 /* adjust for message size at word boundary */
434 temp
= (msgsize
+ MQ_ALIGNSIZE
- 1) & ~(MQ_ALIGNSIZE
- 1);
436 total_size
= sizeof (mqhdr_t
) +
437 maxmsg
* (temp
+ sizeof (msghdr_t
)) +
438 2 * _MQ_PRIO_MAX
* sizeof (uint64_t);
440 if (total_size
> SSIZE_MAX
) {
446 * data file is opened with read/write to those
447 * who have read or write permission
449 mode
= mode
| (mode
& 0444) >> 1 | (mode
& 0222) << 1;
450 if ((fd
= __pos4obj_open(path
, MQ_DATA_TYPE
,
451 (O_RDWR
|O_CREAT
|O_EXCL
), mode
, &err
)) < 0)
454 cr_flag
|= DFILE_CREATE
| DFILE_OPEN
;
456 /* force permissions to avoid umask effect */
457 if (fchmod(fd
, mode
) < 0)
460 if (ftruncate64(fd
, (off64_t
)total_size
) < 0)
463 if ((fd
= __pos4obj_open(path
, MQ_DATA_TYPE
,
464 O_RDWR
, 0666, &err
)) < 0)
466 cr_flag
= DFILE_OPEN
;
468 /* Message queue has not been initialized yet */
469 if (read(fd
, &total_size
, sizeof (total_size
)) !=
470 sizeof (total_size
) || total_size
== 0) {
475 /* Message queue too big for this process to handle */
476 if (total_size
> SSIZE_MAX
) {
482 if ((mqdp
= (mqdes_t
*)malloc(sizeof (mqdes_t
))) == NULL
) {
486 cr_flag
|= ALLOC_MEM
;
488 if ((ptr
= mmap64(NULL
, total_size
, PROT_READ
|PROT_WRITE
,
489 MAP_SHARED
, fd
, (off64_t
)0)) == MAP_FAILED
)
492 cr_flag
|= DFILE_MMAP
;
494 /* closing data file */
495 (void) __close_nc(fd
);
496 cr_flag
&= ~DFILE_OPEN
;
499 * create, unlink, size, mmap, and close description file
500 * all for a flag word in anonymous shared memory
502 if ((fd
= __pos4obj_open(path
, MQ_DSCN_TYPE
, O_RDWR
| O_CREAT
,
505 cr_flag
|= DFILE_OPEN
;
506 (void) __pos4obj_unlink(path
, MQ_DSCN_TYPE
);
507 if (ftruncate64(fd
, (off64_t
)sizeof (struct mq_dn
)) < 0)
510 if ((ptr
= mmap64(NULL
, sizeof (struct mq_dn
),
511 PROT_READ
| PROT_WRITE
, MAP_SHARED
, fd
, (off64_t
)0)) == MAP_FAILED
)
514 cr_flag
|= MQDNP_MMAP
;
516 (void) __close_nc(fd
);
517 cr_flag
&= ~DFILE_OPEN
;
519 mqdp
->mqd_flags
= FFLAGS(oflag
) & (FREAD
|FWRITE
);
520 mqdnp
->mqdn_flags
= FFLAGS(oflag
) & (FNONBLOCK
);
522 /* new message queue requires initialization */
523 if ((cr_flag
& DFILE_CREATE
) != 0) {
524 /* message queue header has to be initialized */
525 mq_init(mqhp
, msgsize
, maxmsg
);
526 mqhp
->mq_totsize
= total_size
;
529 mqdp
->mqd_mqdn
= mqdnp
;
530 mqdp
->mqd_magic
= MQ_MAGIC
;
531 mqdp
->mqd_tcd
= NULL
;
532 mqdp
->mqd_ownerdead
= 0;
533 if (__pos4obj_unlock(path
, MQ_LOCK_TYPE
) == 0) {
534 lmutex_lock(&mq_list_lock
);
535 mqdp
->mqd_next
= mq_list
;
536 mqdp
->mqd_prev
= NULL
;
538 mq_list
->mqd_prev
= mqdp
;
540 lmutex_unlock(&mq_list_lock
);
541 return ((mqd_t
)mqdp
);
544 locked
= 0; /* fall into the error case */
547 if ((cr_flag
& DFILE_OPEN
) != 0)
548 (void) __close_nc(fd
);
549 if ((cr_flag
& DFILE_CREATE
) != 0)
550 (void) __pos4obj_unlink(path
, MQ_DATA_TYPE
);
551 if ((cr_flag
& PFILE_CREATE
) != 0)
552 (void) __pos4obj_unlink(path
, MQ_PERM_TYPE
);
553 if ((cr_flag
& ALLOC_MEM
) != 0)
555 if ((cr_flag
& DFILE_MMAP
) != 0)
556 (void) munmap((caddr_t
)mqhp
, (size_t)total_size
);
557 if ((cr_flag
& MQDNP_MMAP
) != 0)
558 (void) munmap((caddr_t
)mqdnp
, sizeof (struct mq_dn
));
560 (void) __pos4obj_unlock(path
, MQ_LOCK_TYPE
);
566 mq_close_cleanup(mqdes_t
*mqdp
)
568 mqhdr_t
*mqhp
= mqdp
->mqd_mq
;
569 struct mq_dn
*mqdnp
= mqdp
->mqd_mqdn
;
571 /* invalidate the descriptor before freeing it */
573 if (!mqdp
->mqd_ownerdead
)
574 (void) mutex_unlock(&mqhp
->mq_exclusive
);
576 lmutex_lock(&mq_list_lock
);
578 mqdp
->mqd_next
->mqd_prev
= mqdp
->mqd_prev
;
580 mqdp
->mqd_prev
->mqd_next
= mqdp
->mqd_next
;
582 mq_list
= mqdp
->mqd_next
;
583 lmutex_unlock(&mq_list_lock
);
586 (void) munmap((caddr_t
)mqdnp
, sizeof (struct mq_dn
));
587 (void) munmap((caddr_t
)mqhp
, (size_t)mqhp
->mq_totsize
);
591 mq_close(mqd_t mqdes
)
593 mqdes_t
*mqdp
= (mqdes_t
*)mqdes
;
595 thread_communication_data_t
*tcdp
;
598 if (!mq_is_valid(mqdp
)) {
604 if ((error
= mutex_lock(&mqhp
->mq_exclusive
)) != 0) {
605 mqdp
->mqd_ownerdead
= 1;
606 if (error
== EOWNERDEAD
)
607 (void) mutex_unlock(&mqhp
->mq_exclusive
);
608 /* carry on regardless, without holding mq_exclusive */
611 if (mqhp
->mq_des
== (uintptr_t)mqdp
&&
612 mqhp
->mq_sigid
.sn_pid
== getpid()) {
613 /* notification is set for this descriptor, remove it */
614 (void) __signotify(SN_CANCEL
, NULL
, &mqhp
->mq_sigid
);
619 pthread_cleanup_push(mq_close_cleanup
, mqdp
);
620 if ((tcdp
= mqdp
->mqd_tcd
) != NULL
) {
621 mqdp
->mqd_tcd
= NULL
;
622 del_sigev_mq(tcdp
); /* possible cancellation point */
624 pthread_cleanup_pop(1); /* finish in the cleanup handler */
630 mq_unlink(const char *path
)
634 if (__pos4obj_check(path
) < 0)
637 if (__pos4obj_lock(path
, MQ_LOCK_TYPE
) < 0) {
641 err
= __pos4obj_unlink(path
, MQ_PERM_TYPE
);
643 if (err
== 0 || (err
== -1 && errno
== EEXIST
)) {
645 err
= __pos4obj_unlink(path
, MQ_DATA_TYPE
);
648 if (__pos4obj_unlock(path
, MQ_LOCK_TYPE
) < 0)
656 __mq_timedsend(mqd_t mqdes
, const char *msg_ptr
, size_t msg_len
,
657 uint_t msg_prio
, const timespec_t
*timeout
, int abs_rel
)
659 mqdes_t
*mqdp
= (mqdes_t
*)mqdes
;
665 * sem_*wait() does cancellation, if called.
666 * pthread_testcancel() ensures that cancellation takes place if
667 * there is a cancellation pending when mq_*send() is called.
669 pthread_testcancel();
671 if (!mq_is_valid(mqdp
) || (mqdp
->mqd_flags
& FWRITE
) == 0) {
678 if (msg_prio
>= mqhp
->mq_maxprio
) {
682 if (msg_len
> mqhp
->mq_maxsz
) {
687 if (mqdp
->mqd_mqdn
->mqdn_flags
& O_NONBLOCK
)
688 err
= sem_trywait(&mqhp
->mq_notfull
);
691 * We might get cancelled here...
694 err
= sem_wait(&mqhp
->mq_notfull
);
695 else if (abs_rel
== ABS_TIME
)
696 err
= sem_timedwait(&mqhp
->mq_notfull
, timeout
);
698 err
= sem_reltimedwait_np(&mqhp
->mq_notfull
, timeout
);
702 * errno has been set to EAGAIN / EINTR / ETIMEDOUT
703 * by sem_*wait(), so we can just return.
709 * By the time we're here, we know that we've got the capacity
710 * to add to the queue...now acquire the exclusive lock.
712 if ((err
= mutex_lock(&mqhp
->mq_exclusive
)) != 0) {
713 owner_dead(mqdp
, err
);
718 * Now determine if we want to kick the notification. POSIX
719 * requires that if a process has registered for notification,
720 * we must kick it when the queue makes an empty to non-empty
721 * transition, and there are no blocked receivers. Note that
722 * this mechanism does _not_ guarantee that the kicked process
723 * will be able to receive a message without blocking;
724 * another receiver could intervene in the meantime. Thus,
725 * the notification mechanism is inherently racy; all we can
726 * do is hope to minimize the window as much as possible.
727 * In general, we want to avoid kicking the notification when
728 * there are clearly receivers blocked. We'll determine if
729 * we want to kick the notification before the mq_putmsg(),
730 * but the actual signotify() won't be done until the message
733 if (mqhp
->mq_sigid
.sn_pid
!= 0) {
734 int nmessages
, nblocked
;
736 (void) sem_getvalue(&mqhp
->mq_notempty
, &nmessages
);
737 (void) sem_getvalue(&mqhp
->mq_rblocked
, &nblocked
);
739 if (nmessages
== 0 && nblocked
== 0)
743 mq_putmsg(mqhp
, msg_ptr
, (ssize_t
)msg_len
, msg_prio
);
744 (void) sem_post(&mqhp
->mq_notempty
);
747 /* notify and also delete the registration */
751 MQ_ASSERT_SEMVAL_LEQ(&mqhp
->mq_notempty
, ((int)mqhp
->mq_maxmsg
));
752 (void) mutex_unlock(&mqhp
->mq_exclusive
);
758 mq_send(mqd_t mqdes
, const char *msg_ptr
, size_t msg_len
, uint_t msg_prio
)
760 return (__mq_timedsend(mqdes
, msg_ptr
, msg_len
, msg_prio
,
765 mq_timedsend(mqd_t mqdes
, const char *msg_ptr
, size_t msg_len
,
766 uint_t msg_prio
, const timespec_t
*abs_timeout
)
768 return (__mq_timedsend(mqdes
, msg_ptr
, msg_len
, msg_prio
,
769 abs_timeout
, ABS_TIME
));
773 mq_reltimedsend_np(mqd_t mqdes
, const char *msg_ptr
, size_t msg_len
,
774 uint_t msg_prio
, const timespec_t
*rel_timeout
)
776 return (__mq_timedsend(mqdes
, msg_ptr
, msg_len
, msg_prio
,
777 rel_timeout
, REL_TIME
));
781 decrement_rblocked(mqhdr_t
*mqhp
)
785 (void) pthread_setcancelstate(PTHREAD_CANCEL_DISABLE
, &cancel_state
);
786 while (sem_wait(&mqhp
->mq_rblocked
) == -1)
788 (void) pthread_setcancelstate(cancel_state
, NULL
);
792 __mq_timedreceive(mqd_t mqdes
, char *msg_ptr
, size_t msg_len
,
793 uint_t
*msg_prio
, const timespec_t
*timeout
, int abs_rel
)
795 mqdes_t
*mqdp
= (mqdes_t
*)mqdes
;
801 * sem_*wait() does cancellation, if called.
802 * pthread_testcancel() ensures that cancellation takes place if
803 * there is a cancellation pending when mq_*receive() is called.
805 pthread_testcancel();
807 if (!mq_is_valid(mqdp
) || (mqdp
->mqd_flags
& FREAD
) == 0) {
809 return (ssize_t
)(-1);
814 if (msg_len
< mqhp
->mq_maxsz
) {
816 return (ssize_t
)(-1);
820 * The semaphoring scheme for mq_[timed]receive is a little hairy
821 * thanks to POSIX.1b's arcane notification mechanism. First,
822 * we try to take the common case and do a sem_trywait().
823 * If that doesn't work, and O_NONBLOCK hasn't been set,
824 * then note that we're going to sleep by incrementing the rblocked
825 * semaphore. We decrement that semaphore after waking up.
827 if (sem_trywait(&mqhp
->mq_notempty
) == -1) {
828 if ((mqdp
->mqd_mqdn
->mqdn_flags
& O_NONBLOCK
) != 0) {
830 * errno has been set to EAGAIN or EINTR by
831 * sem_trywait(), so we can just return.
836 * If we're here, then we're probably going to block...
837 * increment the rblocked semaphore. If we get
838 * cancelled, decrement_rblocked() will decrement it.
840 (void) sem_post(&mqhp
->mq_rblocked
);
842 pthread_cleanup_push(decrement_rblocked
, mqhp
);
844 err
= sem_wait(&mqhp
->mq_notempty
);
845 else if (abs_rel
== ABS_TIME
)
846 err
= sem_timedwait(&mqhp
->mq_notempty
, timeout
);
848 err
= sem_reltimedwait_np(&mqhp
->mq_notempty
, timeout
);
849 pthread_cleanup_pop(1);
853 * We took a signal or timeout while waiting
860 if ((err
= mutex_lock(&mqhp
->mq_exclusive
)) != 0) {
861 owner_dead(mqdp
, err
);
864 msg_size
= mq_getmsg(mqhp
, msg_ptr
, msg_prio
);
865 (void) sem_post(&mqhp
->mq_notfull
);
866 MQ_ASSERT_SEMVAL_LEQ(&mqhp
->mq_notfull
, ((int)mqhp
->mq_maxmsg
));
867 (void) mutex_unlock(&mqhp
->mq_exclusive
);
873 mq_receive(mqd_t mqdes
, char *msg_ptr
, size_t msg_len
, uint_t
*msg_prio
)
875 return (__mq_timedreceive(mqdes
, msg_ptr
, msg_len
, msg_prio
,
880 mq_timedreceive(mqd_t mqdes
, char *msg_ptr
, size_t msg_len
,
881 uint_t
*msg_prio
, const timespec_t
*abs_timeout
)
883 return (__mq_timedreceive(mqdes
, msg_ptr
, msg_len
, msg_prio
,
884 abs_timeout
, ABS_TIME
));
888 mq_reltimedreceive_np(mqd_t mqdes
, char *msg_ptr
, size_t msg_len
,
889 uint_t
*msg_prio
, const timespec_t
*rel_timeout
)
891 return (__mq_timedreceive(mqdes
, msg_ptr
, msg_len
, msg_prio
,
892 rel_timeout
, REL_TIME
));
896 * Only used below, in mq_notify().
897 * We already have a spawner thread.
898 * Verify that the attributes match; cancel it if necessary.
901 cancel_if_necessary(thread_communication_data_t
*tcdp
,
902 const struct sigevent
*sigevp
)
904 int do_cancel
= !pthread_attr_equal(tcdp
->tcd_attrp
,
905 sigevp
->sigev_notify_attributes
);
909 * Attributes don't match, cancel the spawner thread.
911 (void) pthread_cancel(tcdp
->tcd_server_id
);
914 * Reuse the existing spawner thread with possibly
915 * changed notification function and value.
917 tcdp
->tcd_notif
.sigev_notify
= SIGEV_THREAD
;
918 tcdp
->tcd_notif
.sigev_signo
= 0;
919 tcdp
->tcd_notif
.sigev_value
= sigevp
->sigev_value
;
920 tcdp
->tcd_notif
.sigev_notify_function
=
921 sigevp
->sigev_notify_function
;
928 mq_notify(mqd_t mqdes
, const struct sigevent
*sigevp
)
930 mqdes_t
*mqdp
= (mqdes_t
*)mqdes
;
932 thread_communication_data_t
*tcdp
;
933 siginfo_t mq_siginfo
;
934 struct sigevent sigevent
;
943 if (!mq_is_valid(mqdp
)) {
950 if ((error
= mutex_lock(&mqhp
->mq_exclusive
)) != 0) {
951 mqdp
->mqd_ownerdead
= 1;
953 if (error
== EOWNERDEAD
)
954 (void) mutex_unlock(&mqhp
->mq_exclusive
);
955 /* carry on regardless, without holding mq_exclusive */
958 if (sigevp
== NULL
) { /* remove notification */
959 if (mqhp
->mq_des
== (uintptr_t)mqdp
&&
960 mqhp
->mq_sigid
.sn_pid
== getpid()) {
961 /* notification is set for this descriptor, remove it */
962 (void) __signotify(SN_CANCEL
, NULL
, &mqhp
->mq_sigid
);
963 if ((tcdp
= mqdp
->mqd_tcd
) != NULL
) {
964 sig_mutex_lock(&tcdp
->tcd_lock
);
965 if (tcdp
->tcd_msg_enabled
) {
966 /* cancel the spawner thread */
967 tcdp
= mqdp
->mqd_tcd
;
968 mqdp
->mqd_tcd
= NULL
;
969 (void) pthread_cancel(
970 tcdp
->tcd_server_id
);
972 sig_mutex_unlock(&tcdp
->tcd_lock
);
977 /* notification is not set for this descriptor */
981 } else { /* register notification with this process */
982 switch (ntype
= sigevp
->sigev_notify
) {
984 userval
= sigevp
->sigev_value
.sival_ptr
;
988 pn
= sigevp
->sigev_value
.sival_ptr
;
989 userval
= pn
->portnfy_user
;
990 port
= pn
->portnfy_port
;
991 if (fstat64(port
, &statb
) != 0 ||
992 !S_ISPORT(statb
.st_mode
)) {
996 (void) memset(&sigevent
, 0, sizeof (sigevent
));
997 sigevent
.sigev_notify
= SIGEV_PORT
;
1003 mq_siginfo
.si_signo
= 0;
1004 mq_siginfo
.si_code
= SI_MESGQ
;
1007 mq_siginfo
.si_signo
= sigevp
->sigev_signo
;
1008 mq_siginfo
.si_value
= sigevp
->sigev_value
;
1009 mq_siginfo
.si_code
= SI_MESGQ
;
1012 if ((tcdp
= mqdp
->mqd_tcd
) != NULL
&&
1013 cancel_if_necessary(tcdp
, sigevp
))
1014 mqdp
->mqd_tcd
= NULL
;
1017 if ((tcdp
= mqdp
->mqd_tcd
) == NULL
) {
1018 /* we must create a spawner thread */
1019 tcdp
= setup_sigev_handler(sigevp
, MQ
);
1024 tcdp
->tcd_msg_enabled
= 0;
1025 tcdp
->tcd_msg_closing
= 0;
1026 tcdp
->tcd_msg_avail
= &mqhp
->mq_spawner
;
1027 if (launch_spawner(tcdp
) != 0) {
1028 free_sigev_handler(tcdp
);
1031 mqdp
->mqd_tcd
= tcdp
;
1033 mq_siginfo
.si_signo
= 0;
1034 mq_siginfo
.si_code
= SI_MESGQ
;
1041 /* register notification */
1042 if (__signotify(SN_PROC
, &mq_siginfo
, &mqhp
->mq_sigid
) < 0)
1044 mqhp
->mq_ntype
= ntype
;
1045 mqhp
->mq_des
= (uintptr_t)mqdp
;
1049 tcdp
->tcd_port
= port
;
1050 tcdp
->tcd_msg_object
= mqdp
;
1051 tcdp
->tcd_msg_userval
= userval
;
1052 sig_mutex_lock(&tcdp
->tcd_lock
);
1053 tcdp
->tcd_msg_enabled
= ntype
;
1054 sig_mutex_unlock(&tcdp
->tcd_lock
);
1055 (void) cond_broadcast(&tcdp
->tcd_cv
);
1060 rval
= 0; /* success */
1063 (void) mutex_unlock(&mqhp
->mq_exclusive
);
1072 mq_setattr(mqd_t mqdes
, const struct mq_attr
*mqstat
, struct mq_attr
*omqstat
)
1074 mqdes_t
*mqdp
= (mqdes_t
*)mqdes
;
1078 if (!mq_is_valid(mqdp
)) {
1083 /* store current attributes */
1084 if (omqstat
!= NULL
) {
1087 mqhp
= mqdp
->mqd_mq
;
1088 omqstat
->mq_flags
= mqdp
->mqd_mqdn
->mqdn_flags
;
1089 omqstat
->mq_maxmsg
= (long)mqhp
->mq_maxmsg
;
1090 omqstat
->mq_msgsize
= (long)mqhp
->mq_maxsz
;
1091 (void) sem_getvalue(&mqhp
->mq_notempty
, &count
);
1092 omqstat
->mq_curmsgs
= count
;
1095 /* set description attributes */
1096 if ((mqstat
->mq_flags
& O_NONBLOCK
) != 0)
1098 mqdp
->mqd_mqdn
->mqdn_flags
= flag
;
1104 mq_getattr(mqd_t mqdes
, struct mq_attr
*mqstat
)
1106 mqdes_t
*mqdp
= (mqdes_t
*)mqdes
;
1110 if (!mq_is_valid(mqdp
)) {
1115 mqhp
= mqdp
->mqd_mq
;
1117 mqstat
->mq_flags
= mqdp
->mqd_mqdn
->mqdn_flags
;
1118 mqstat
->mq_maxmsg
= (long)mqhp
->mq_maxmsg
;
1119 mqstat
->mq_msgsize
= (long)mqhp
->mq_maxsz
;
1120 (void) sem_getvalue(&mqhp
->mq_notempty
, &count
);
1121 mqstat
->mq_curmsgs
= count
;
1126 * Cleanup after fork1() in the child process.
1129 postfork1_child_sigev_mq(void)
1131 thread_communication_data_t
*tcdp
;
1134 for (mqdp
= mq_list
; mqdp
; mqdp
= mqdp
->mqd_next
) {
1135 if ((tcdp
= mqdp
->mqd_tcd
) != NULL
) {
1136 mqdp
->mqd_tcd
= NULL
;