dmake: do not set MAKEFLAGS=k
[unleashed/tickless.git] / usr / src / lib / libc / port / rt / mqueue.c
blobb53af9aa6f0ba6dd03c6649dd76ce138817d4a92
1 /*
2 * CDDL HEADER START
4 * The contents of this file are subject to the terms of the
5 * Common Development and Distribution License (the "License").
6 * You may not use this file except in compliance with the License.
8 * You can obtain a copy of the license at usr/src/OPENSOLARIS.LICENSE
9 * or http://www.opensolaris.org/os/licensing.
10 * See the License for the specific language governing permissions
11 * and limitations under the License.
13 * When distributing Covered Code, include this CDDL HEADER in each
14 * file and include the License file at usr/src/OPENSOLARIS.LICENSE.
15 * If applicable, add the following below this CDDL HEADER, with the
16 * fields enclosed by brackets "[]" replaced with your own identifying
17 * information: Portions Copyright [yyyy] [name of copyright owner]
19 * CDDL HEADER END
23 * Copyright 2008 Sun Microsystems, Inc. All rights reserved.
24 * Use is subject to license terms.
27 #include "lint.h"
28 #include "mtlib.h"
29 #define _KMEMUSER
30 #include <sys/param.h> /* _MQ_OPEN_MAX, _MQ_PRIO_MAX, _SEM_VALUE_MAX */
31 #undef _KMEMUSER
32 #include <mqueue.h>
33 #include <sys/types.h>
34 #include <sys/file.h>
35 #include <sys/mman.h>
36 #include <errno.h>
37 #include <stdarg.h>
38 #include <limits.h>
39 #include <pthread.h>
40 #include <assert.h>
41 #include <string.h>
42 #include <unistd.h>
43 #include <stdlib.h>
44 #include <sys/stat.h>
45 #include <inttypes.h>
46 #include "sigev_thread.h"
47 #include "pos4obj.h"
/*
 * Defaults applied by mq_open() when a queue is created with O_CREAT
 * but without an mq_attr, and the signature stamped into every live
 * descriptor (checked by mq_is_valid(), cleared on close).
 */
enum {
	MQ_MAXMSG = 128,		/* default mq_maxmsg */
	MQ_MAXSIZE = 1024,		/* default mq_msgsize, in bytes */
	MQ_MAGIC = 0x4d534751		/* "MSGQ" */
};
/*
 * Message header which is part of messages in link list.
 *
 * Each message slot in the shared region is a msghdr_t immediately
 * followed by its payload (accessed as &curbuf[1]).  msg_next is a byte
 * offset from the start of the mapped mqhdr_t (see MQ_PTR), not a
 * pointer; 0 terminates a list.
 */
typedef struct {
	uint64_t msg_next;	/* offset of next message in the link */
	uint64_t msg_len;	/* length of the message */
} msghdr_t;
/*
 * message queue description
 *
 * mq_open() places this in its own small MAP_SHARED mapping of a file
 * that is unlinked right after creation, separate from the queue data.
 * Only FNONBLOCK is stored here (set by mq_open() and mq_setattr()).
 */
struct mq_dn {
	size_t mqdn_flags;	/* open description flags */
};
/*
 * message queue descriptor structure
 *
 * One is malloc()ed per successful mq_open(); the mqd_t returned to the
 * caller is a pointer to it.
 */
typedef struct mq_des {
	struct mq_des *mqd_next;	/* list of all open mq descriptors, */
	struct mq_des *mqd_prev;	/* needed for fork-safety */
	int mqd_magic;		/* MQ_MAGIC while open, 0 once closed */
	int mqd_flags;		/* FREAD/FWRITE from the open's oflag */
	struct mq_header *mqd_mq;	/* address pointer of message Q */
	struct mq_dn *mqd_mqdn;	/* open description */
	thread_communication_data_t *mqd_tcd;	/* SIGEV_THREAD/PORT spawner */
	int mqd_ownerdead;		/* mq_exclusive is inconsistent */
} mqdes_t;
/*
 * message queue common header, part of the mmap()ed file.
 * Since message queues may be shared between 32- and 64-bit processes,
 * care must be taken to make sure that the elements of this structure
 * are identical for both _LP64 and _ILP32 cases.
 *
 * mq_init() lays the data file out as: this header, then the
 * _MQ_PRIO_MAX head offsets (mq_headpp), then the _MQ_PRIO_MAX tail
 * offsets (mq_tailpp), then mq_maxmsg message slots.  All "pointer"
 * fields are byte offsets from the start of this header.
 */
typedef struct mq_header {
	/*
	 * first field must be mq_totsize, DO NOT insert before this:
	 * mq_open() read()s it from offset 0 to size the mapping, and a
	 * value of 0 means the queue has not been initialized yet.
	 */
	int64_t mq_totsize;		/* total size of the Queue */
	int64_t mq_maxsz;		/* max size of each message */
	uint32_t mq_maxmsg;		/* max messages in the queue */
	uint32_t mq_maxprio;		/* maximum mqueue priority */
	uint32_t mq_curmaxprio;		/* current maximum MQ priority */
	uint32_t mq_mask;		/* priority bitmask */
	uint64_t mq_freep;		/* free message's head pointer */
	uint64_t mq_headpp;		/* pointer to head pointers */
	uint64_t mq_tailpp;		/* pointer to tail pointers */
	signotify_id_t mq_sigid;	/* notification id (3 int's) */
	uint32_t mq_ntype;		/* notification type (SIGEV_*) */
	uint64_t mq_des;		/* pointer to msg Q descriptor */
	mutex_t mq_exclusive;		/* acquire for exclusive access */
	sem_t mq_rblocked;		/* number of processes rblocked */
	sem_t mq_notfull;		/* mq_send()'s block on this */
	sem_t mq_notempty;		/* mq_receive()'s block on this */
	sem_t mq_spawner;		/* spawner thread blocks on this */
} mqhdr_t;
/*
 * The code assumes that _MQ_OPEN_MAX == -1 or "no fixed implementation limit".
 * If this assumption is somehow invalidated, mq_open() needs to be changed
 * back to the old version which kept a count and enforced a limit.
 * We make sure that this is pointed out to those changing <sys/param.h>
 * by checking _MQ_OPEN_MAX at compile time (the build fails otherwise).
 */
#if _MQ_OPEN_MAX != -1
#error "mq_open() no longer enforces _MQ_OPEN_MAX and needs fixing."
#endif
#define	MQ_ALIGNSIZE	8	/* 64-bit alignment */

/*
 * Debug-only consistency checks.  Each expands to exactly one statement
 * (or to nothing when DEBUG is off), so callers write MQ_ASSERT(x); and
 * every macro is safe as the body of an unbraced if/else.
 */
#ifdef DEBUG
#define	MQ_ASSERT(x)	assert(x)

/*
 * Offset _p must be non-zero, MQ_ALIGNSIZE-aligned, and lie inside the
 * mq_totsize bytes mapped at header _m.
 */
#define	MQ_ASSERT_PTR(_m, _p) \
	assert((_p) != 0 && \
	    ((uintptr_t)(_p) & (MQ_ALIGNSIZE - 1)) == 0 && \
	    (uint64_t)(_p) < (uint64_t)(_m)->mq_totsize)

/* The current value of semaphore sem must not exceed val. */
#define	MQ_ASSERT_SEMVAL_LEQ(sem, val)	do { \
	int _val; \
	(void) sem_getvalue((sem), &_val); \
	assert(_val <= (val)); \
} while (0)
#else
#define	MQ_ASSERT(x)
#define	MQ_ASSERT_PTR(_m, _p)
#define	MQ_ASSERT_SEMVAL_LEQ(sem, val)
#endif

/*
 * Turn offsets stored in the shared header into pointers within this
 * process's mapping of it.  MQ_PTR(m, off) is the msghdr_t at byte offset
 * off; HEAD_PTR/TAIL_PTR(m, prio) address the per-priority list head and
 * tail slots.  Arguments are fully parenthesized so expressions such as
 * HEAD_PTR(m, p + 1) index the intended slot.
 */
#define	MQ_PTR(m, n)	((msghdr_t *)((uintptr_t)(m) + (uintptr_t)(n)))
#define	HEAD_PTR(m, n)	((uint64_t *)((uintptr_t)(m) + \
	(uintptr_t)(m)->mq_headpp + (n) * sizeof (uint64_t)))
#define	TAIL_PTR(m, n)	((uint64_t *)((uintptr_t)(m) + \
	(uintptr_t)(m)->mq_tailpp + (n) * sizeof (uint64_t)))

/* Sentinel descriptor value that mq_is_valid() always rejects. */
#define	MQ_RESERVED	((mqdes_t *)-1)

/* abs_rel argument of __mq_timedsend() / __mq_timedreceive() */
#define	ABS_TIME	0
#define	REL_TIME	1
155 static mutex_t mq_list_lock = DEFAULTMUTEX;
156 static mqdes_t *mq_list = NULL;
158 extern int __signotify(int cmd, siginfo_t *sigonfo, signotify_id_t *sn_id);
160 static int
161 mq_is_valid(mqdes_t *mqdp)
164 * Any use of a message queue after it was closed is
165 * undefined. But the standard strongly favours EBADF
166 * returns. Before we dereference which could be fatal,
167 * we first do some pointer sanity checks.
169 if (mqdp != NULL && mqdp != MQ_RESERVED &&
170 ((uintptr_t)mqdp & 0x7) == 0) {
171 return (mqdp->mqd_magic == MQ_MAGIC);
174 return (0);
177 static void
178 mq_init(mqhdr_t *mqhp, size_t msgsize, ssize_t maxmsg)
180 int i;
181 uint64_t temp;
182 uint64_t currentp;
183 uint64_t nextp;
186 * We only need to initialize the non-zero fields. The use of
187 * ftruncate() on the message queue file assures that the
188 * pages will be zero-filled.
190 (void) mutex_init(&mqhp->mq_exclusive,
191 USYNC_PROCESS | LOCK_ROBUST, NULL);
192 (void) sem_init(&mqhp->mq_rblocked, 1, 0);
193 (void) sem_init(&mqhp->mq_notempty, 1, 0);
194 (void) sem_init(&mqhp->mq_spawner, 1, 0);
195 (void) sem_init(&mqhp->mq_notfull, 1, (uint_t)maxmsg);
197 mqhp->mq_maxsz = msgsize;
198 mqhp->mq_maxmsg = maxmsg;
201 * As of this writing (1997), there are 32 message queue priorities.
202 * If this is to change, then the size of the mq_mask will
203 * also have to change. If DEBUG is defined, assert that
204 * _MQ_PRIO_MAX hasn't changed.
206 mqhp->mq_maxprio = _MQ_PRIO_MAX;
207 #if defined(DEBUG)
208 /* LINTED always true */
209 MQ_ASSERT(sizeof (mqhp->mq_mask) * 8 >= _MQ_PRIO_MAX);
210 #endif
213 * Since the message queue can be mapped into different
214 * virtual address ranges by different processes, we don't
215 * keep track of pointers, only offsets into the shared region.
217 mqhp->mq_headpp = sizeof (mqhdr_t);
218 mqhp->mq_tailpp = mqhp->mq_headpp +
219 mqhp->mq_maxprio * sizeof (uint64_t);
220 mqhp->mq_freep = mqhp->mq_tailpp +
221 mqhp->mq_maxprio * sizeof (uint64_t);
223 currentp = mqhp->mq_freep;
224 MQ_PTR(mqhp, currentp)->msg_next = 0;
226 temp = (mqhp->mq_maxsz + MQ_ALIGNSIZE - 1) & ~(MQ_ALIGNSIZE - 1);
227 for (i = 1; i < mqhp->mq_maxmsg; i++) {
228 nextp = currentp + sizeof (msghdr_t) + temp;
229 MQ_PTR(mqhp, currentp)->msg_next = nextp;
230 MQ_PTR(mqhp, nextp)->msg_next = 0;
231 currentp = nextp;
235 static size_t
236 mq_getmsg(mqhdr_t *mqhp, char *msgp, uint_t *msg_prio)
238 uint64_t currentp;
239 msghdr_t *curbuf;
240 uint64_t *headpp;
241 uint64_t *tailpp;
243 MQ_ASSERT(MUTEX_HELD(&mqhp->mq_exclusive));
246 * Get the head and tail pointers for the queue of maximum
247 * priority. We shouldn't be here unless there is a message for
248 * us, so it's fair to assert that both the head and tail
249 * pointers are non-NULL.
251 headpp = HEAD_PTR(mqhp, mqhp->mq_curmaxprio);
252 tailpp = TAIL_PTR(mqhp, mqhp->mq_curmaxprio);
254 if (msg_prio != NULL)
255 *msg_prio = mqhp->mq_curmaxprio;
257 currentp = *headpp;
258 MQ_ASSERT_PTR(mqhp, currentp);
259 curbuf = MQ_PTR(mqhp, currentp);
261 if ((*headpp = curbuf->msg_next) == 0) {
263 * We just nuked the last message in this priority's queue.
264 * Twiddle this priority's bit, and then find the next bit
265 * tipped.
267 uint_t prio = mqhp->mq_curmaxprio;
269 mqhp->mq_mask &= ~(1u << prio);
271 for (; prio != 0; prio--)
272 if (mqhp->mq_mask & (1u << prio))
273 break;
274 mqhp->mq_curmaxprio = prio;
276 *tailpp = 0;
280 * Copy the message, and put the buffer back on the free list.
282 (void) memcpy(msgp, (char *)&curbuf[1], curbuf->msg_len);
283 curbuf->msg_next = mqhp->mq_freep;
284 mqhp->mq_freep = currentp;
286 return (curbuf->msg_len);
290 static void
291 mq_putmsg(mqhdr_t *mqhp, const char *msgp, ssize_t len, uint_t prio)
293 uint64_t currentp;
294 msghdr_t *curbuf;
295 uint64_t *headpp;
296 uint64_t *tailpp;
298 MQ_ASSERT(MUTEX_HELD(&mqhp->mq_exclusive));
301 * Grab a free message block, and link it in. We shouldn't
302 * be here unless there is room in the queue for us; it's
303 * fair to assert that the free pointer is non-NULL.
305 currentp = mqhp->mq_freep;
306 MQ_ASSERT_PTR(mqhp, currentp);
307 curbuf = MQ_PTR(mqhp, currentp);
310 * Remove a message from the free list, and copy in the new contents.
312 mqhp->mq_freep = curbuf->msg_next;
313 curbuf->msg_next = 0;
314 (void) memcpy((char *)&curbuf[1], msgp, len);
315 curbuf->msg_len = len;
317 headpp = HEAD_PTR(mqhp, prio);
318 tailpp = TAIL_PTR(mqhp, prio);
320 if (*tailpp == 0) {
322 * This is the first message on this queue. Set the
323 * head and tail pointers, and tip the appropriate bit
324 * in the priority mask.
326 *headpp = currentp;
327 *tailpp = currentp;
328 mqhp->mq_mask |= (1u << prio);
329 if (prio > mqhp->mq_curmaxprio)
330 mqhp->mq_curmaxprio = prio;
331 } else {
332 MQ_ASSERT_PTR(mqhp, *tailpp);
333 MQ_PTR(mqhp, *tailpp)->msg_next = currentp;
334 *tailpp = currentp;
339 * Send a notification and also delete the registration.
341 static void
342 do_notify(mqhdr_t *mqhp)
344 (void) __signotify(SN_SEND, NULL, &mqhp->mq_sigid);
345 if (mqhp->mq_ntype == SIGEV_THREAD ||
346 mqhp->mq_ntype == SIGEV_PORT)
347 (void) sem_post(&mqhp->mq_spawner);
348 mqhp->mq_ntype = 0;
349 mqhp->mq_des = 0;
353 * Called when the mq_exclusive lock draws EOWNERDEAD or ENOTRECOVERABLE.
354 * Wake up anyone waiting on mq_*send() or mq_*receive() and ensure that
355 * they fail with errno == EBADMSG. Trigger any registered notification.
357 static void
358 owner_dead(mqdes_t *mqdp, int error)
360 mqhdr_t *mqhp = mqdp->mqd_mq;
362 mqdp->mqd_ownerdead = 1;
363 (void) sem_post(&mqhp->mq_notfull);
364 (void) sem_post(&mqhp->mq_notempty);
365 if (error == EOWNERDEAD) {
366 if (mqhp->mq_sigid.sn_pid != 0)
367 do_notify(mqhp);
368 (void) mutex_unlock(&mqhp->mq_exclusive);
370 errno = EBADMSG;
373 mqd_t
374 mq_open(const char *path, int oflag, /* mode_t mode, mq_attr *attr */ ...)
376 va_list ap;
377 mode_t mode = 0;
378 struct mq_attr *attr = NULL;
379 int fd;
380 int err;
381 int cr_flag = 0;
382 int locked = 0;
383 uint64_t total_size;
384 size_t msgsize;
385 ssize_t maxmsg;
386 uint64_t temp;
387 void *ptr;
388 mqdes_t *mqdp;
389 mqhdr_t *mqhp;
390 struct mq_dn *mqdnp;
392 if (__pos4obj_check(path) == -1)
393 return ((mqd_t)-1);
395 /* acquire MSGQ lock to have atomic operation */
396 if (__pos4obj_lock(path, MQ_LOCK_TYPE) < 0)
397 goto out;
398 locked = 1;
400 va_start(ap, oflag);
401 /* filter oflag to have READ/WRITE/CREATE modes only */
402 oflag = oflag & (O_RDONLY|O_WRONLY|O_RDWR|O_CREAT|O_EXCL|O_NONBLOCK);
403 if ((oflag & O_CREAT) != 0) {
404 mode = va_arg(ap, mode_t);
405 attr = va_arg(ap, struct mq_attr *);
407 va_end(ap);
409 if ((fd = __pos4obj_open(path, MQ_PERM_TYPE, oflag,
410 mode, &cr_flag)) < 0)
411 goto out;
413 /* closing permission file */
414 (void) __close_nc(fd);
416 /* Try to open/create data file */
417 if (cr_flag) {
418 cr_flag = PFILE_CREATE;
419 if (attr == NULL) {
420 maxmsg = MQ_MAXMSG;
421 msgsize = MQ_MAXSIZE;
422 } else if (attr->mq_maxmsg <= 0 || attr->mq_msgsize <= 0) {
423 errno = EINVAL;
424 goto out;
425 } else if (attr->mq_maxmsg > _SEM_VALUE_MAX) {
426 errno = ENOSPC;
427 goto out;
428 } else {
429 maxmsg = attr->mq_maxmsg;
430 msgsize = attr->mq_msgsize;
433 /* adjust for message size at word boundary */
434 temp = (msgsize + MQ_ALIGNSIZE - 1) & ~(MQ_ALIGNSIZE - 1);
436 total_size = sizeof (mqhdr_t) +
437 maxmsg * (temp + sizeof (msghdr_t)) +
438 2 * _MQ_PRIO_MAX * sizeof (uint64_t);
440 if (total_size > SSIZE_MAX) {
441 errno = ENOSPC;
442 goto out;
446 * data file is opened with read/write to those
447 * who have read or write permission
449 mode = mode | (mode & 0444) >> 1 | (mode & 0222) << 1;
450 if ((fd = __pos4obj_open(path, MQ_DATA_TYPE,
451 (O_RDWR|O_CREAT|O_EXCL), mode, &err)) < 0)
452 goto out;
454 cr_flag |= DFILE_CREATE | DFILE_OPEN;
456 /* force permissions to avoid umask effect */
457 if (fchmod(fd, mode) < 0)
458 goto out;
460 if (ftruncate64(fd, (off64_t)total_size) < 0)
461 goto out;
462 } else {
463 if ((fd = __pos4obj_open(path, MQ_DATA_TYPE,
464 O_RDWR, 0666, &err)) < 0)
465 goto out;
466 cr_flag = DFILE_OPEN;
468 /* Message queue has not been initialized yet */
469 if (read(fd, &total_size, sizeof (total_size)) !=
470 sizeof (total_size) || total_size == 0) {
471 errno = ENOENT;
472 goto out;
475 /* Message queue too big for this process to handle */
476 if (total_size > SSIZE_MAX) {
477 errno = EFBIG;
478 goto out;
482 if ((mqdp = (mqdes_t *)malloc(sizeof (mqdes_t))) == NULL) {
483 errno = ENOMEM;
484 goto out;
486 cr_flag |= ALLOC_MEM;
488 if ((ptr = mmap64(NULL, total_size, PROT_READ|PROT_WRITE,
489 MAP_SHARED, fd, (off64_t)0)) == MAP_FAILED)
490 goto out;
491 mqhp = ptr;
492 cr_flag |= DFILE_MMAP;
494 /* closing data file */
495 (void) __close_nc(fd);
496 cr_flag &= ~DFILE_OPEN;
499 * create, unlink, size, mmap, and close description file
500 * all for a flag word in anonymous shared memory
502 if ((fd = __pos4obj_open(path, MQ_DSCN_TYPE, O_RDWR | O_CREAT,
503 0666, &err)) < 0)
504 goto out;
505 cr_flag |= DFILE_OPEN;
506 (void) __pos4obj_unlink(path, MQ_DSCN_TYPE);
507 if (ftruncate64(fd, (off64_t)sizeof (struct mq_dn)) < 0)
508 goto out;
510 if ((ptr = mmap64(NULL, sizeof (struct mq_dn),
511 PROT_READ | PROT_WRITE, MAP_SHARED, fd, (off64_t)0)) == MAP_FAILED)
512 goto out;
513 mqdnp = ptr;
514 cr_flag |= MQDNP_MMAP;
516 (void) __close_nc(fd);
517 cr_flag &= ~DFILE_OPEN;
519 mqdp->mqd_flags = FFLAGS(oflag) & (FREAD|FWRITE);
520 mqdnp->mqdn_flags = FFLAGS(oflag) & (FNONBLOCK);
522 /* new message queue requires initialization */
523 if ((cr_flag & DFILE_CREATE) != 0) {
524 /* message queue header has to be initialized */
525 mq_init(mqhp, msgsize, maxmsg);
526 mqhp->mq_totsize = total_size;
528 mqdp->mqd_mq = mqhp;
529 mqdp->mqd_mqdn = mqdnp;
530 mqdp->mqd_magic = MQ_MAGIC;
531 mqdp->mqd_tcd = NULL;
532 mqdp->mqd_ownerdead = 0;
533 if (__pos4obj_unlock(path, MQ_LOCK_TYPE) == 0) {
534 lmutex_lock(&mq_list_lock);
535 mqdp->mqd_next = mq_list;
536 mqdp->mqd_prev = NULL;
537 if (mq_list)
538 mq_list->mqd_prev = mqdp;
539 mq_list = mqdp;
540 lmutex_unlock(&mq_list_lock);
541 return ((mqd_t)mqdp);
544 locked = 0; /* fall into the error case */
545 out:
546 err = errno;
547 if ((cr_flag & DFILE_OPEN) != 0)
548 (void) __close_nc(fd);
549 if ((cr_flag & DFILE_CREATE) != 0)
550 (void) __pos4obj_unlink(path, MQ_DATA_TYPE);
551 if ((cr_flag & PFILE_CREATE) != 0)
552 (void) __pos4obj_unlink(path, MQ_PERM_TYPE);
553 if ((cr_flag & ALLOC_MEM) != 0)
554 free((void *)mqdp);
555 if ((cr_flag & DFILE_MMAP) != 0)
556 (void) munmap((caddr_t)mqhp, (size_t)total_size);
557 if ((cr_flag & MQDNP_MMAP) != 0)
558 (void) munmap((caddr_t)mqdnp, sizeof (struct mq_dn));
559 if (locked)
560 (void) __pos4obj_unlock(path, MQ_LOCK_TYPE);
561 errno = err;
562 return ((mqd_t)-1);
565 static void
566 mq_close_cleanup(mqdes_t *mqdp)
568 mqhdr_t *mqhp = mqdp->mqd_mq;
569 struct mq_dn *mqdnp = mqdp->mqd_mqdn;
571 /* invalidate the descriptor before freeing it */
572 mqdp->mqd_magic = 0;
573 if (!mqdp->mqd_ownerdead)
574 (void) mutex_unlock(&mqhp->mq_exclusive);
576 lmutex_lock(&mq_list_lock);
577 if (mqdp->mqd_next)
578 mqdp->mqd_next->mqd_prev = mqdp->mqd_prev;
579 if (mqdp->mqd_prev)
580 mqdp->mqd_prev->mqd_next = mqdp->mqd_next;
581 if (mq_list == mqdp)
582 mq_list = mqdp->mqd_next;
583 lmutex_unlock(&mq_list_lock);
585 free(mqdp);
586 (void) munmap((caddr_t)mqdnp, sizeof (struct mq_dn));
587 (void) munmap((caddr_t)mqhp, (size_t)mqhp->mq_totsize);
591 mq_close(mqd_t mqdes)
593 mqdes_t *mqdp = (mqdes_t *)mqdes;
594 mqhdr_t *mqhp;
595 thread_communication_data_t *tcdp;
596 int error;
598 if (!mq_is_valid(mqdp)) {
599 errno = EBADF;
600 return (-1);
603 mqhp = mqdp->mqd_mq;
604 if ((error = mutex_lock(&mqhp->mq_exclusive)) != 0) {
605 mqdp->mqd_ownerdead = 1;
606 if (error == EOWNERDEAD)
607 (void) mutex_unlock(&mqhp->mq_exclusive);
608 /* carry on regardless, without holding mq_exclusive */
611 if (mqhp->mq_des == (uintptr_t)mqdp &&
612 mqhp->mq_sigid.sn_pid == getpid()) {
613 /* notification is set for this descriptor, remove it */
614 (void) __signotify(SN_CANCEL, NULL, &mqhp->mq_sigid);
615 mqhp->mq_ntype = 0;
616 mqhp->mq_des = 0;
619 pthread_cleanup_push(mq_close_cleanup, mqdp);
620 if ((tcdp = mqdp->mqd_tcd) != NULL) {
621 mqdp->mqd_tcd = NULL;
622 del_sigev_mq(tcdp); /* possible cancellation point */
624 pthread_cleanup_pop(1); /* finish in the cleanup handler */
626 return (0);
630 mq_unlink(const char *path)
632 int err;
634 if (__pos4obj_check(path) < 0)
635 return (-1);
637 if (__pos4obj_lock(path, MQ_LOCK_TYPE) < 0) {
638 return (-1);
641 err = __pos4obj_unlink(path, MQ_PERM_TYPE);
643 if (err == 0 || (err == -1 && errno == EEXIST)) {
644 errno = 0;
645 err = __pos4obj_unlink(path, MQ_DATA_TYPE);
648 if (__pos4obj_unlock(path, MQ_LOCK_TYPE) < 0)
649 return (-1);
651 return (err);
/*
 * Common implementation of mq_send(), mq_timedsend() and
 * mq_reltimedsend_np(): enqueue msg_len bytes from msg_ptr at priority
 * msg_prio.
 *
 * A free slot is claimed by taking an mq_notfull token:
 *   - with sem_trywait() when the description is O_NONBLOCK;
 *   - otherwise with sem_wait() when timeout is NULL;
 *   - otherwise with an absolute or relative timed wait, as selected by
 *     abs_rel (ABS_TIME / REL_TIME).
 * The message is then appended under mq_exclusive and mq_notempty is
 * posted.
 *
 * Returns 0, or -1 with errno set:
 *   - EBADF if the descriptor is invalid or not open for writing;
 *   - EINVAL if msg_prio >= mq_maxprio;
 *   - EMSGSIZE if msg_len > mq_maxsz;
 *   - the value left by the sem_*wait() call on a failed wait;
 *   - EBADMSG after a lock-owner death (see owner_dead()).
 */
static int
__mq_timedsend(mqd_t mqdes, const char *msg_ptr, size_t msg_len,
    uint_t msg_prio, const timespec_t *timeout, int abs_rel)
{
	mqdes_t *mqdp = (mqdes_t *)mqdes;
	mqhdr_t *mqhp;
	int err;
	int notify = 0;

	/*
	 * sem_*wait() does cancellation, if called.
	 * pthread_testcancel() ensures that cancellation takes place if
	 * there is a cancellation pending when mq_*send() is called.
	 */
	pthread_testcancel();

	if (!mq_is_valid(mqdp) || (mqdp->mqd_flags & FWRITE) == 0) {
		errno = EBADF;
		return (-1);
	}

	mqhp = mqdp->mqd_mq;

	if (msg_prio >= mqhp->mq_maxprio) {
		errno = EINVAL;
		return (-1);
	}
	if (msg_len > mqhp->mq_maxsz) {
		errno = EMSGSIZE;
		return (-1);
	}

	if (mqdp->mqd_mqdn->mqdn_flags & O_NONBLOCK)
		err = sem_trywait(&mqhp->mq_notfull);
	else {
		/*
		 * We might get cancelled here...
		 */
		if (timeout == NULL)
			err = sem_wait(&mqhp->mq_notfull);
		else if (abs_rel == ABS_TIME)
			err = sem_timedwait(&mqhp->mq_notfull, timeout);
		else
			err = sem_reltimedwait_np(&mqhp->mq_notfull, timeout);
	}
	if (err == -1) {
		/*
		 * errno has been set to EAGAIN / EINTR / ETIMEDOUT
		 * by sem_*wait(), so we can just return.
		 */
		return (-1);
	}

	/*
	 * By the time we're here, we know that we've got the capacity
	 * to add to the queue...now acquire the exclusive lock.
	 * On failure owner_dead() re-posts mq_notfull, which also gives
	 * back the token taken above.
	 */
	if ((err = mutex_lock(&mqhp->mq_exclusive)) != 0) {
		owner_dead(mqdp, err);
		return (-1);
	}

	/*
	 * Now determine if we want to kick the notification. POSIX
	 * requires that if a process has registered for notification,
	 * we must kick it when the queue makes an empty to non-empty
	 * transition, and there are no blocked receivers. Note that
	 * this mechanism does _not_ guarantee that the kicked process
	 * will be able to receive a message without blocking;
	 * another receiver could intervene in the meantime. Thus,
	 * the notification mechanism is inherently racy; all we can
	 * do is hope to minimize the window as much as possible.
	 * In general, we want to avoid kicking the notification when
	 * there are clearly receivers blocked. We'll determine if
	 * we want to kick the notification before the mq_putmsg(),
	 * but the actual signotify() won't be done until the message
	 * is on the queue.
	 */
	if (mqhp->mq_sigid.sn_pid != 0) {
		int nmessages, nblocked;

		(void) sem_getvalue(&mqhp->mq_notempty, &nmessages);
		(void) sem_getvalue(&mqhp->mq_rblocked, &nblocked);

		if (nmessages == 0 && nblocked == 0)
			notify = 1;
	}

	/* msg_len <= mq_maxsz was checked above, so the cast is safe */
	mq_putmsg(mqhp, msg_ptr, (ssize_t)msg_len, msg_prio);
	(void) sem_post(&mqhp->mq_notempty);

	if (notify) {
		/* notify and also delete the registration */
		do_notify(mqhp);
	}

	MQ_ASSERT_SEMVAL_LEQ(&mqhp->mq_notempty, ((int)mqhp->mq_maxmsg));
	(void) mutex_unlock(&mqhp->mq_exclusive);

	return (0);
}
758 mq_send(mqd_t mqdes, const char *msg_ptr, size_t msg_len, uint_t msg_prio)
760 return (__mq_timedsend(mqdes, msg_ptr, msg_len, msg_prio,
761 NULL, ABS_TIME));
765 mq_timedsend(mqd_t mqdes, const char *msg_ptr, size_t msg_len,
766 uint_t msg_prio, const timespec_t *abs_timeout)
768 return (__mq_timedsend(mqdes, msg_ptr, msg_len, msg_prio,
769 abs_timeout, ABS_TIME));
773 mq_reltimedsend_np(mqd_t mqdes, const char *msg_ptr, size_t msg_len,
774 uint_t msg_prio, const timespec_t *rel_timeout)
776 return (__mq_timedsend(mqdes, msg_ptr, msg_len, msg_prio,
777 rel_timeout, REL_TIME));
780 static void
781 decrement_rblocked(mqhdr_t *mqhp)
783 int cancel_state;
785 (void) pthread_setcancelstate(PTHREAD_CANCEL_DISABLE, &cancel_state);
786 while (sem_wait(&mqhp->mq_rblocked) == -1)
787 continue;
788 (void) pthread_setcancelstate(cancel_state, NULL);
/*
 * Common implementation of mq_receive(), mq_timedreceive() and
 * mq_reltimedreceive_np(): dequeue the oldest highest-priority message
 * into msg_ptr and store its priority through msg_prio (if non-NULL).
 *
 * timeout and abs_rel select the wait exactly as for __mq_timedsend().
 *
 * Returns the message length, or -1 with errno set:
 *   - EBADF if the descriptor is invalid or not open for reading;
 *   - EMSGSIZE if msg_len < mq_maxsz;
 *   - the value left by the sem_*wait() call on a failed wait;
 *   - EBADMSG after a lock-owner death (see owner_dead()).
 */
static ssize_t
__mq_timedreceive(mqd_t mqdes, char *msg_ptr, size_t msg_len,
    uint_t *msg_prio, const timespec_t *timeout, int abs_rel)
{
	mqdes_t *mqdp = (mqdes_t *)mqdes;
	mqhdr_t *mqhp;
	ssize_t msg_size;
	int err;

	/*
	 * sem_*wait() does cancellation, if called.
	 * pthread_testcancel() ensures that cancellation takes place if
	 * there is a cancellation pending when mq_*receive() is called.
	 */
	pthread_testcancel();

	if (!mq_is_valid(mqdp) || (mqdp->mqd_flags & FREAD) == 0) {
		errno = EBADF;
		return (ssize_t)(-1);
	}

	mqhp = mqdp->mqd_mq;

	/* the buffer must be able to hold the largest possible message */
	if (msg_len < mqhp->mq_maxsz) {
		errno = EMSGSIZE;
		return (ssize_t)(-1);
	}

	/*
	 * The semaphoring scheme for mq_[timed]receive is a little hairy
	 * thanks to POSIX.1b's arcane notification mechanism. First,
	 * we try to take the common case and do a sem_trywait().
	 * If that doesn't work, and O_NONBLOCK hasn't been set,
	 * then note that we're going to sleep by incrementing the rblocked
	 * semaphore. We decrement that semaphore after waking up.
	 * (__mq_timedsend() reads mq_rblocked to decide whether to notify.)
	 */
	if (sem_trywait(&mqhp->mq_notempty) == -1) {
		if ((mqdp->mqd_mqdn->mqdn_flags & O_NONBLOCK) != 0) {
			/*
			 * errno has been set to EAGAIN or EINTR by
			 * sem_trywait(), so we can just return.
			 */
			return (-1);
		}
		/*
		 * If we're here, then we're probably going to block...
		 * increment the rblocked semaphore. If we get
		 * cancelled, decrement_rblocked() will decrement it.
		 */
		(void) sem_post(&mqhp->mq_rblocked);

		/* the handler also runs on normal exit via pop(1) */
		pthread_cleanup_push(decrement_rblocked, mqhp);
		if (timeout == NULL)
			err = sem_wait(&mqhp->mq_notempty);
		else if (abs_rel == ABS_TIME)
			err = sem_timedwait(&mqhp->mq_notempty, timeout);
		else
			err = sem_reltimedwait_np(&mqhp->mq_notempty, timeout);
		pthread_cleanup_pop(1);

		if (err == -1) {
			/*
			 * We took a signal or timeout while waiting
			 * on mq_notempty...
			 */
			return (-1);
		}
	}

	if ((err = mutex_lock(&mqhp->mq_exclusive)) != 0) {
		owner_dead(mqdp, err);
		return (-1);
	}
	msg_size = mq_getmsg(mqhp, msg_ptr, msg_prio);
	(void) sem_post(&mqhp->mq_notfull);
	MQ_ASSERT_SEMVAL_LEQ(&mqhp->mq_notfull, ((int)mqhp->mq_maxmsg));
	(void) mutex_unlock(&mqhp->mq_exclusive);

	return (msg_size);
}
872 ssize_t
873 mq_receive(mqd_t mqdes, char *msg_ptr, size_t msg_len, uint_t *msg_prio)
875 return (__mq_timedreceive(mqdes, msg_ptr, msg_len, msg_prio,
876 NULL, ABS_TIME));
879 ssize_t
880 mq_timedreceive(mqd_t mqdes, char *msg_ptr, size_t msg_len,
881 uint_t *msg_prio, const timespec_t *abs_timeout)
883 return (__mq_timedreceive(mqdes, msg_ptr, msg_len, msg_prio,
884 abs_timeout, ABS_TIME));
887 ssize_t
888 mq_reltimedreceive_np(mqd_t mqdes, char *msg_ptr, size_t msg_len,
889 uint_t *msg_prio, const timespec_t *rel_timeout)
891 return (__mq_timedreceive(mqdes, msg_ptr, msg_len, msg_prio,
892 rel_timeout, REL_TIME));
896 * Only used below, in mq_notify().
897 * We already have a spawner thread.
898 * Verify that the attributes match; cancel it if necessary.
900 static int
901 cancel_if_necessary(thread_communication_data_t *tcdp,
902 const struct sigevent *sigevp)
904 int do_cancel = !pthread_attr_equal(tcdp->tcd_attrp,
905 sigevp->sigev_notify_attributes);
907 if (do_cancel) {
909 * Attributes don't match, cancel the spawner thread.
911 (void) pthread_cancel(tcdp->tcd_server_id);
912 } else {
914 * Reuse the existing spawner thread with possibly
915 * changed notification function and value.
917 tcdp->tcd_notif.sigev_notify = SIGEV_THREAD;
918 tcdp->tcd_notif.sigev_signo = 0;
919 tcdp->tcd_notif.sigev_value = sigevp->sigev_value;
920 tcdp->tcd_notif.sigev_notify_function =
921 sigevp->sigev_notify_function;
924 return (do_cancel);
/*
 * POSIX mq_notify(): register (sigevp != NULL) or remove (sigevp == NULL)
 * this process's notification for the queue behind mqdes.
 *
 * Supported sigev_notify values:
 *   - SIGEV_NONE and SIGEV_SIGNAL: registered directly via __signotify().
 *   - SIGEV_THREAD and SIGEV_PORT: also need a spawner thread, which is
 *     created on demand or, for SIGEV_THREAD with matching attributes,
 *     reused (see cancel_if_necessary()).
 *
 * If mq_exclusive cannot be acquired (lock owner died), the call is
 * treated as a removal and always fails with EBADMSG.
 *
 * Returns 0, or -1 with errno set:
 *   - EBADF for an invalid descriptor, a SIGEV_PORT port that is not an
 *     event port, or a failed spawner setup;
 *   - EBUSY when removing a notification this descriptor does not own;
 *   - EINVAL for an unknown sigev_notify;
 *   - the error from __signotify() or launch_spawner() on failure;
 *   - EBADMSG after a lock-owner death.
 */
int
mq_notify(mqd_t mqdes, const struct sigevent *sigevp)
{
	mqdes_t *mqdp = (mqdes_t *)mqdes;
	mqhdr_t *mqhp;
	thread_communication_data_t *tcdp;
	siginfo_t mq_siginfo;
	struct sigevent sigevent;
	struct stat64 statb;
	port_notify_t *pn;
	void *userval;
	int rval = -1;
	int ntype;
	int port;
	int error;

	if (!mq_is_valid(mqdp)) {
		errno = EBADF;
		return (-1);
	}

	mqhp = mqdp->mqd_mq;

	if ((error = mutex_lock(&mqhp->mq_exclusive)) != 0) {
		mqdp->mqd_ownerdead = 1;
		sigevp = NULL;
		if (error == EOWNERDEAD)
			(void) mutex_unlock(&mqhp->mq_exclusive);
		/* carry on regardless, without holding mq_exclusive */
	}

	if (sigevp == NULL) {		/* remove notification */
		if (mqhp->mq_des == (uintptr_t)mqdp &&
		    mqhp->mq_sigid.sn_pid == getpid()) {
			/* notification is set for this descriptor, remove it */
			(void) __signotify(SN_CANCEL, NULL, &mqhp->mq_sigid);
			if ((tcdp = mqdp->mqd_tcd) != NULL) {
				sig_mutex_lock(&tcdp->tcd_lock);
				if (tcdp->tcd_msg_enabled) {
					/* cancel the spawner thread */
					tcdp = mqdp->mqd_tcd;
					mqdp->mqd_tcd = NULL;
					(void) pthread_cancel(
					    tcdp->tcd_server_id);
				}
				sig_mutex_unlock(&tcdp->tcd_lock);
			}
			mqhp->mq_ntype = 0;
			mqhp->mq_des = 0;
		} else {
			/* notification is not set for this descriptor */
			errno = EBUSY;
			goto bad;
		}
	} else {		/* register notification with this process */
		/* first pass: collect spawner parameters (THREAD/PORT only) */
		switch (ntype = sigevp->sigev_notify) {
		case SIGEV_THREAD:
			userval = sigevp->sigev_value.sival_ptr;
			port = -1;
			break;
		case SIGEV_PORT:
			/* NOTE(review): sival_ptr is assumed non-NULL here */
			pn = sigevp->sigev_value.sival_ptr;
			userval = pn->portnfy_user;
			port = pn->portnfy_port;
			if (fstat64(port, &statb) != 0 ||
			    !S_ISPORT(statb.st_mode)) {
				errno = EBADF;
				goto bad;
			}
			/* spawner setup sees a minimal SIGEV_PORT sigevent */
			(void) memset(&sigevent, 0, sizeof (sigevent));
			sigevent.sigev_notify = SIGEV_PORT;
			sigevp = &sigevent;
			break;
		}
		/* second pass: build mq_siginfo, set up the spawner */
		switch (ntype) {
		case SIGEV_NONE:
			mq_siginfo.si_signo = 0;
			mq_siginfo.si_code = SI_MESGQ;
			break;
		case SIGEV_SIGNAL:
			mq_siginfo.si_signo = sigevp->sigev_signo;
			mq_siginfo.si_value = sigevp->sigev_value;
			mq_siginfo.si_code = SI_MESGQ;
			break;
		case SIGEV_THREAD:
			if ((tcdp = mqdp->mqd_tcd) != NULL &&
			    cancel_if_necessary(tcdp, sigevp))
				mqdp->mqd_tcd = NULL;
			/* FALLTHROUGH */
		case SIGEV_PORT:
			if ((tcdp = mqdp->mqd_tcd) == NULL) {
				/* we must create a spawner thread */
				tcdp = setup_sigev_handler(sigevp, MQ);
				if (tcdp == NULL) {
					errno = EBADF;
					goto bad;
				}
				tcdp->tcd_msg_enabled = 0;
				tcdp->tcd_msg_closing = 0;
				tcdp->tcd_msg_avail = &mqhp->mq_spawner;
				if (launch_spawner(tcdp) != 0) {
					free_sigev_handler(tcdp);
					goto bad;
				}
				mqdp->mqd_tcd = tcdp;
			}
			mq_siginfo.si_signo = 0;
			mq_siginfo.si_code = SI_MESGQ;
			break;
		default:
			errno = EINVAL;
			goto bad;
		}

		/* register notification */
		if (__signotify(SN_PROC, &mq_siginfo, &mqhp->mq_sigid) < 0)
			goto bad;
		mqhp->mq_ntype = ntype;
		mqhp->mq_des = (uintptr_t)mqdp;
		switch (ntype) {
		case SIGEV_THREAD:
		case SIGEV_PORT:
			/* hand the target to the spawner, then wake it */
			tcdp->tcd_port = port;
			tcdp->tcd_msg_object = mqdp;
			tcdp->tcd_msg_userval = userval;
			sig_mutex_lock(&tcdp->tcd_lock);
			tcdp->tcd_msg_enabled = ntype;
			sig_mutex_unlock(&tcdp->tcd_lock);
			(void) cond_broadcast(&tcdp->tcd_cv);
			break;
		}
	}

	rval = 0;	/* success */
bad:
	/* error != 0 means mq_exclusive is not held (owner died) */
	if (error == 0) {
		(void) mutex_unlock(&mqhp->mq_exclusive);
	} else {
		errno = EBADMSG;
		rval = -1;
	}
	return (rval);
}
1072 mq_setattr(mqd_t mqdes, const struct mq_attr *mqstat, struct mq_attr *omqstat)
1074 mqdes_t *mqdp = (mqdes_t *)mqdes;
1075 mqhdr_t *mqhp;
1076 uint_t flag = 0;
1078 if (!mq_is_valid(mqdp)) {
1079 errno = EBADF;
1080 return (-1);
1083 /* store current attributes */
1084 if (omqstat != NULL) {
1085 int count;
1087 mqhp = mqdp->mqd_mq;
1088 omqstat->mq_flags = mqdp->mqd_mqdn->mqdn_flags;
1089 omqstat->mq_maxmsg = (long)mqhp->mq_maxmsg;
1090 omqstat->mq_msgsize = (long)mqhp->mq_maxsz;
1091 (void) sem_getvalue(&mqhp->mq_notempty, &count);
1092 omqstat->mq_curmsgs = count;
1095 /* set description attributes */
1096 if ((mqstat->mq_flags & O_NONBLOCK) != 0)
1097 flag = FNONBLOCK;
1098 mqdp->mqd_mqdn->mqdn_flags = flag;
1100 return (0);
1104 mq_getattr(mqd_t mqdes, struct mq_attr *mqstat)
1106 mqdes_t *mqdp = (mqdes_t *)mqdes;
1107 mqhdr_t *mqhp;
1108 int count;
1110 if (!mq_is_valid(mqdp)) {
1111 errno = EBADF;
1112 return (-1);
1115 mqhp = mqdp->mqd_mq;
1117 mqstat->mq_flags = mqdp->mqd_mqdn->mqdn_flags;
1118 mqstat->mq_maxmsg = (long)mqhp->mq_maxmsg;
1119 mqstat->mq_msgsize = (long)mqhp->mq_maxsz;
1120 (void) sem_getvalue(&mqhp->mq_notempty, &count);
1121 mqstat->mq_curmsgs = count;
1122 return (0);
1126 * Cleanup after fork1() in the child process.
1128 void
1129 postfork1_child_sigev_mq(void)
1131 thread_communication_data_t *tcdp;
1132 mqdes_t *mqdp;
1134 for (mqdp = mq_list; mqdp; mqdp = mqdp->mqd_next) {
1135 if ((tcdp = mqdp->mqd_tcd) != NULL) {
1136 mqdp->mqd_tcd = NULL;
1137 tcd_teardown(tcdp);