Cygwin: sched_setscheduler: allow changes of the priority
[newlib-cygwin.git] / winsup / cygwin / fhandler / mqueue.cc
blob2bf2cb0ea74bb1fb5177a27ac15abf2f1dfcf47e
1 /* fhandler_mqueue.cc: fhandler for POSIX message queue
3 This file is part of Cygwin.
5 This software is a copyrighted work licensed under the terms of the
6 Cygwin license. Please consult the file "CYGWIN_LICENSE" for
7 details. */
9 #include "winsup.h"
10 #include "shared_info.h"
11 #include "path.h"
12 #include "fhandler.h"
13 #include "dtable.h"
14 #include "clock.h"
15 #include <stdio.h>
16 #include <mqueue.h>
17 #include <sys/param.h>
/* Round a message size up to a multiple of sizeof (long). */
19 #define MSGSIZE(i) roundup((i), sizeof(long))
/* Size in bytes of the text buffer (filebuf) used for the queue status
   line; also reported as st_size by fstat. */
21 #define FILESIZE 80
/* Queue attributes used by mq_open when a queue is created and the caller
   supplied no attributes. */
23 struct mq_attr defattr = { 0, 10, 8192, 0 }; /* Linux defaults. */
25 fhandler_mqueue::fhandler_mqueue () :
26 fhandler_disk_file ()
28 filebuf = (char *) ccalloc_abort (HEAP_BUF, 1, FILESIZE);
31 fhandler_mqueue::~fhandler_mqueue ()
33 cfree (filebuf);
36 bool
37 fhandler_mqueue::valid_path ()
39 const char *posix_basename = get_name () + MQ_LEN;
40 size_t len = strlen (posix_basename);
41 if (len > 0 && len <= NAME_MAX && !strpbrk (posix_basename, "/\\"))
42 return true;
43 return false;
46 int
47 fhandler_mqueue::open (int flags, mode_t mode)
49 if (!valid_path ())
51 set_errno (EINVAL);
52 return 0;
54 /* FIXME: reopen by handle semantics missing yet */
55 flags &= ~(O_NOCTTY | O_PATH | O_BINARY | O_TEXT);
56 return mq_open (flags, mode, NULL);
/* Open or create the stream backing the message queue and set up the
   per-open queue state.

   oflags - may only contain access mode bits, O_CLOEXEC, O_CREAT, O_EXCL
            and O_NONBLOCK; anything else, or an access mode with all
            O_ACCMODE bits set, fails with EINVAL
   mode   - permissions handed to set_created_file_access and
            mqinfo_create when the queue is created
   attr   - attributes for a newly created queue, NULL selects defattr

   Returns 1 on success, 0 on failure. */
59 int
60 fhandler_mqueue::mq_open (int oflags, mode_t mode, struct mq_attr *attr)
62 NTSTATUS status;
63 IO_STATUS_BLOCK io;
64 PUNICODE_STRING mqstream;
65 OBJECT_ATTRIBUTES oa;
66 struct mq_info *mqinfo = NULL;
67 bool created = false;
69 if ((oflags & ~(O_ACCMODE | O_CLOEXEC | O_CREAT | O_EXCL | O_NONBLOCK))
70 || (oflags & O_ACCMODE) == O_ACCMODE)
72 set_errno (EINVAL);
73 return 0;
76 /* attach a stream suffix to the NT filename, thus creating a stream. */
77 mqstream = pc.get_nt_native_path (&ro_u_mq_suffix);
78 pc.get_object_attr (oa, sec_none_nih);
80 again:
81 if (oflags & O_CREAT)
83 /* Create and disallow sharing */
84 status = NtCreateFile (&get_handle (),
85 GENERIC_READ | GENERIC_WRITE | DELETE
86 | SYNCHRONIZE, &oa, &io, NULL,
87 FILE_ATTRIBUTE_NORMAL, FILE_SHARE_DELETE,
88 FILE_CREATE,
89 FILE_OPEN_FOR_BACKUP_INTENT
90 | FILE_SYNCHRONOUS_IO_NONALERT,
91 NULL, 0);
92 if (!NT_SUCCESS (status))
/* An already existing file is only an error if O_EXCL was requested,
   otherwise continue with opening it. */
94 if (status == STATUS_OBJECT_NAME_COLLISION && (oflags & O_EXCL) == 0)
95 goto exists;
96 __seterrno_from_nt_status (status);
97 return 0;
99 if (pc.has_acls ())
100 set_created_file_access (get_handle (), pc, mode);
101 created = true;
102 goto out;
104 exists:
105 /* Open the file, and loop while detecting a sharing violation. */
106 while (true)
108 status = NtOpenFile (&get_handle (),
109 GENERIC_READ | GENERIC_WRITE | SYNCHRONIZE,
110 &oa, &io, FILE_SHARE_VALID_FLAGS,
111 FILE_OPEN_FOR_BACKUP_INTENT
112 | FILE_SYNCHRONOUS_IO_NONALERT);
113 if (NT_SUCCESS (status))
114 break;
/* The file is gone although creation was requested: retry the create. */
115 if (status == STATUS_OBJECT_NAME_NOT_FOUND && (oflags & O_CREAT))
116 goto again;
117 if (status != STATUS_SHARING_VIOLATION)
119 __seterrno_from_nt_status (status);
120 return 0;
/* Sharing violation: wait 100 ms before the next attempt. */
122 Sleep (100L);
124 out:
125 /* We need the filename without STREAM_SUFFIX later on */
126 mqstream->Length -= ro_u_mq_suffix.Length;
127 mqstream->Buffer[mqstream->Length / sizeof (WCHAR)] = L'\0';
129 if (created)
131 if (attr == NULL)
132 attr = &defattr;
133 /* Check minimum and maximum values. The max values are pretty much
134 arbitrary, taken from the linux mq_overview man page, up to Linux
135 3.4. These max values make sure that the internal mq_fattr
136 structure can use 32 bit types. */
137 if (attr->mq_maxmsg <= 0 || attr->mq_maxmsg > 32768
138 || attr->mq_msgsize <= 0 || attr->mq_msgsize > 1048576)
/* mqinfo stays NULL here, so the calls below see a failed creation. */
139 set_errno (EINVAL);
140 else
141 mqinfo = mqinfo_create (attr, mode, oflags & O_NONBLOCK);
143 else
144 mqinfo = mqinfo_open (oflags & O_NONBLOCK);
/* Called for both outcomes; gets told whether setup succeeded and whether
   the file was created by this call. */
145 mq_open_finish (mqinfo != NULL, created);
146 /* Set fhandler open flags */
147 if (mqinfo)
149 set_access (GENERIC_READ | SYNCHRONIZE);
150 close_on_exec (true);
151 set_flags (oflags | O_CLOEXEC, O_BINARY);
152 set_open_status ();
154 return mqinfo ? 1 : 0;
157 struct mq_info *
158 fhandler_mqueue::_mqinfo (SIZE_T filesize, mode_t mode, int flags,
159 bool just_open)
161 WCHAR buf[NAME_MAX + sizeof ("mqueue/XXX")];
162 UNICODE_STRING uname;
163 OBJECT_ATTRIBUTES oa;
164 NTSTATUS status;
165 LARGE_INTEGER fsiz = { QuadPart: (LONGLONG) filesize };
166 PVOID mptr = NULL;
168 /* Set sectsize prior to using filesize in NtMapViewOfSection. It will
169 get pagesize aligned, which breaks the next NtMapViewOfSection in fork. */
170 mqinfo ()->mqi_sectsize = filesize;
171 mqinfo ()->mqi_mode = mode;
172 set_nonblocking (flags & O_NONBLOCK);
174 __small_swprintf (buf, L"mqueue/mtx%s", get_name ());
175 RtlInitUnicodeString (&uname, buf);
176 InitializeObjectAttributes (&oa, &uname, OBJ_OPENIF | OBJ_CASE_INSENSITIVE,
177 get_shared_parent_dir (),
178 everyone_sd (CYG_MUTANT_ACCESS));
179 status = NtCreateMutant (&mqinfo ()->mqi_lock, CYG_MUTANT_ACCESS, &oa,
180 FALSE);
181 if (!NT_SUCCESS (status))
182 goto err;
184 wcsncpy (buf + 7, L"snd", 3);
185 /* same length, no RtlInitUnicodeString required */
186 InitializeObjectAttributes (&oa, &uname, OBJ_OPENIF | OBJ_CASE_INSENSITIVE,
187 get_shared_parent_dir (),
188 everyone_sd (CYG_EVENT_ACCESS));
189 status = NtCreateEvent (&mqinfo ()->mqi_waitsend, CYG_EVENT_ACCESS, &oa,
190 NotificationEvent, FALSE);
191 if (!NT_SUCCESS (status))
192 goto err;
193 wcsncpy (buf + 7, L"rcv", 3);
194 /* same length, same attributes, no more init required */
195 status = NtCreateEvent (&mqinfo ()->mqi_waitrecv, CYG_EVENT_ACCESS, &oa,
196 NotificationEvent, FALSE);
197 if (!NT_SUCCESS (status))
198 goto err;
200 InitializeObjectAttributes (&oa, NULL, 0, NULL, NULL);
201 status = NtCreateSection (&mqinfo ()->mqi_sect, SECTION_ALL_ACCESS, &oa,
202 &fsiz, PAGE_READWRITE, SEC_COMMIT, get_handle ());
203 if (!NT_SUCCESS (status))
204 goto err;
206 status = NtMapViewOfSection (mqinfo ()->mqi_sect, NtCurrentProcess (),
207 &mptr, 0, filesize, NULL, &filesize,
208 ViewShare, MEM_TOP_DOWN, PAGE_READWRITE);
209 if (!NT_SUCCESS (status))
210 goto err;
212 mqinfo ()->mqi_hdr = (struct mq_hdr *) mptr;
214 /* Special problem on Cygwin. /dev/mqueue is just a simple dir,
215 so there's a chance normal files are created in there. */
216 if (just_open && mqinfo ()->mqi_hdr->mqh_magic != MQI_MAGIC)
218 status = STATUS_ACCESS_DENIED;
219 goto err;
222 mqinfo ()->mqi_magic = MQI_MAGIC;
223 return mqinfo ();
225 err:
226 if (mqinfo ()->mqi_sect)
227 NtClose (mqinfo ()->mqi_sect);
228 if (mqinfo ()->mqi_waitrecv)
229 NtClose (mqinfo ()->mqi_waitrecv);
230 if (mqinfo ()->mqi_waitsend)
231 NtClose (mqinfo ()->mqi_waitsend);
232 if (mqinfo ()->mqi_lock)
233 NtClose (mqinfo ()->mqi_lock);
234 __seterrno_from_nt_status (status);
235 return NULL;
238 struct mq_info *
239 fhandler_mqueue::mqinfo_open (int flags)
241 FILE_STANDARD_INFORMATION fsi;
242 IO_STATUS_BLOCK io;
243 NTSTATUS status;
244 mode_t mode;
246 fsi.EndOfFile.QuadPart = 0;
247 status = NtQueryInformationFile (get_handle (), &io, &fsi, sizeof fsi,
248 FileStandardInformation);
249 if (!NT_SUCCESS (status))
251 __seterrno_from_nt_status (status);
252 return NULL;
254 if (get_file_attribute (get_handle (), pc, mode, NULL, NULL))
255 mode = STD_RBITS | STD_WBITS;
257 return _mqinfo (fsi.EndOfFile.QuadPart, mode, flags, true);
260 struct mq_info *
261 fhandler_mqueue::mqinfo_create (struct mq_attr *attr, mode_t mode, int flags)
263 long msgsize;
264 off_t filesize = 0;
265 FILE_END_OF_FILE_INFORMATION feofi;
266 IO_STATUS_BLOCK io;
267 NTSTATUS status;
268 struct mq_info *mqinfo = NULL;
270 msgsize = MSGSIZE (attr->mq_msgsize);
271 filesize = sizeof (struct mq_hdr)
272 + (attr->mq_maxmsg * (sizeof (struct msg_hdr) + msgsize));
273 feofi.EndOfFile.QuadPart = filesize;
274 status = NtSetInformationFile (get_handle (), &io, &feofi, sizeof feofi,
275 FileEndOfFileInformation);
276 if (!NT_SUCCESS (status))
278 __seterrno_from_nt_status (status);
279 return NULL;
282 mqinfo = _mqinfo (filesize, mode, flags, false);
284 if (mqinfo)
286 /* Initialize header at beginning of file */
287 /* Create free list with all messages on it */
288 int8_t *mptr;
289 struct mq_hdr *mqhdr;
290 struct msg_hdr *msghdr;
292 mptr = (int8_t *) mqinfo->mqi_hdr;
293 mqhdr = mqinfo->mqi_hdr;
294 mqhdr->mqh_attr.mq_flags = 0;
295 mqhdr->mqh_attr.mq_maxmsg = attr->mq_maxmsg;
296 mqhdr->mqh_attr.mq_msgsize = attr->mq_msgsize;
297 mqhdr->mqh_attr.mq_curmsgs = 0;
298 mqhdr->mqh_nwait = 0;
299 mqhdr->mqh_pid = 0;
300 mqhdr->mqh_head = 0;
301 mqhdr->mqh_magic = MQI_MAGIC;
302 long index = sizeof (struct mq_hdr);
303 mqhdr->mqh_free = index;
304 for (int i = 0; i < attr->mq_maxmsg - 1; i++)
306 msghdr = (struct msg_hdr *) &mptr[index];
307 index += sizeof (struct msg_hdr) + msgsize;
308 msghdr->msg_next = index;
310 msghdr = (struct msg_hdr *) &mptr[index];
311 msghdr->msg_next = 0; /* end of free list */
314 return mqinfo;
/* Final step of mq_open.  Does nothing unless a handle is set.  Otherwise
   the queue stream handle is detached from the handler and closed at the
   end.

   success - queue state was set up; the default stream is opened for
             reading and becomes the handler's handle
   created - only evaluated on failure; if true, the file created by this
             open gets its delete disposition set */
317 void
318 fhandler_mqueue::mq_open_finish (bool success, bool created)
320 NTSTATUS status;
321 HANDLE def_stream;
322 OBJECT_ATTRIBUTES oa;
323 IO_STATUS_BLOCK io;
325 if (get_handle ())
327 /* If we have an open queue stream handle, close it and set it to NULL */
328 HANDLE queue_stream = get_handle ();
329 set_handle (NULL);
330 if (success)
332 /* In case of success, open the default stream for reading. This
333 can be used to implement various IO functions without exposing
334 the actual message queue. */
335 pc.get_object_attr (oa, sec_none_nih);
336 status = NtOpenFile (&def_stream, GENERIC_READ | SYNCHRONIZE,
337 &oa, &io, FILE_SHARE_VALID_FLAGS,
338 FILE_OPEN_FOR_BACKUP_INTENT
339 | FILE_SYNCHRONOUS_IO_NONALERT);
340 if (NT_SUCCESS (status))
341 set_handle (def_stream);
342 else /* Note that we don't treat this as an error! */
344 debug_printf ("Opening default stream failed: status %y", status);
/* Mark the handler as having no handle at all. */
345 nohandle (true);
348 else if (created)
350 /* In case of error at creation time, delete the file */
351 FILE_DISPOSITION_INFORMATION disp = { TRUE };
/* The results of setting the delete disposition are not checked. */
353 NtSetInformationFile (queue_stream, &io, &disp, sizeof disp,
354 FileDispositionInformation);
355 /* We also have to set the delete disposition on the default stream,
356 otherwise only the queue stream will get deleted */
357 pc.get_object_attr (oa, sec_none_nih);
358 status = NtOpenFile (&def_stream, DELETE, &oa, &io,
359 FILE_SHARE_VALID_FLAGS,
360 FILE_OPEN_FOR_BACKUP_INTENT);
361 if (NT_SUCCESS (status))
363 NtSetInformationFile (def_stream, &io, &disp, sizeof disp,
364 FileDispositionInformation);
365 NtClose (def_stream);
/* The queue stream handle is closed in all cases. */
368 NtClose (queue_stream);
372 char *
373 fhandler_mqueue::get_proc_fd_name (char *buf)
375 return strcpy (buf, strrchr (get_name (), '/'));
379 fhandler_mqueue::fcntl (int cmd, intptr_t arg)
381 int res;
383 switch (cmd)
385 case F_GETFD:
386 res = close_on_exec () ? FD_CLOEXEC : 0;
387 break;
388 case F_GETFL:
389 res = get_flags ();
390 debug_printf ("GETFL: %y", res);
391 break;
392 default:
393 set_errno (EINVAL);
394 res = -1;
395 break;
397 return res;
400 /* Do what fhandler_virtual does for read/lseek */
401 bool
402 fhandler_mqueue::fill_filebuf ()
404 unsigned long qsize = 0;
405 int notify = 0;
406 int signo = 0;
407 int notify_pid = 0;
409 if (mutex_lock (mqinfo ()->mqi_lock, true) == 0)
411 struct mq_hdr *mqhdr = mqinfo ()->mqi_hdr;
412 int8_t *mptr = (int8_t *) mqhdr;
413 struct msg_hdr *msghdr;
414 for (long index = mqhdr->mqh_head; index; index = msghdr->msg_next)
416 msghdr = (struct msg_hdr *) &mptr[index];
417 qsize += msghdr->msg_len;
419 if (mqhdr->mqh_pid)
421 notify = mqhdr->mqh_event.sigev_notify;
422 if (notify == SIGEV_SIGNAL)
423 signo = mqhdr->mqh_event.sigev_signo;
424 notify_pid = mqhdr->mqh_pid;
426 mutex_unlock (mqinfo ()->mqi_lock);
428 /* QSIZE: bytes of all current msgs
429 NOTIFY: sigev_notify if there's a notifier
430 SIGNO: signal number if NOTIFY && sigev_notify == SIGEV_SIGNAL
431 NOTIFY_PID: if NOTIFY pid */
432 snprintf (filebuf, FILESIZE,
433 "QSIZE:%-10lu NOTIFY:%-5d SIGNO:%-5d NOTIFY_PID:%-6d\n",
434 qsize, notify, signo, notify_pid);
435 filesize = strlen (filebuf);
436 return true;
439 void
440 fhandler_mqueue::read (void *in_ptr, size_t& len)
442 if (len == 0)
443 return;
444 if (!filebuf[0] && !fill_filebuf ())
446 len = (size_t) -1;
447 return;
449 if ((ssize_t) len > filesize - position)
450 len = (size_t) (filesize - position);
451 if ((ssize_t) len < 0)
452 len = 0;
453 else
454 memcpy (in_ptr, filebuf + position, len);
455 position += len;
458 off_t
459 fhandler_mqueue::lseek (off_t offset, int whence)
461 if (!fill_filebuf ())
462 return (off_t) -1;
463 switch (whence)
465 case SEEK_SET:
466 position = offset;
467 break;
468 case SEEK_CUR:
469 position += offset;
470 break;
471 case SEEK_END:
472 position = filesize + offset;
473 break;
474 default:
475 set_errno (EINVAL);
476 return (off_t) -1;
478 return position;
483 fhandler_mqueue::fstat (struct stat *buf)
485 int ret = fhandler_disk_file::fstat (buf);
486 if (!ret)
488 buf->st_size = FILESIZE;
489 buf->st_dev = FH_MQUEUE;
491 return ret;
/* Duplicate the section, both events and the mutex from process "parent"
   into the mq_info of fhc and map the section into the current process.
   Used by dup (parent is the current process) and by fixup_after_fork
   (fhc is this object).

   Returns 0 on success, -1 if duplicating a handle fails.  A failure to
   map the section is fatal.

   NOTE(review): handles duplicated before a later DuplicateHandle failure
   are not closed on the -1 path, and the view stays mapped. */
495 fhandler_mqueue::_dup (HANDLE parent, fhandler_mqueue *fhc)
497 __try
499 PVOID mptr = NULL;
/* Use the section size stored at open time. */
500 SIZE_T filesize = mqinfo ()->mqi_sectsize;
501 NTSTATUS status;
503 if (!DuplicateHandle (parent, mqinfo ()->mqi_sect,
504 GetCurrentProcess (), &fhc->mqinfo ()->mqi_sect,
505 0, FALSE, DUPLICATE_SAME_ACCESS))
506 __leave;
/* NOTE(review): the view is mapped from this object's mqi_sect, not from
   fhc's.  Both are usable in the current process for the two callers
   visible here. */
507 status = NtMapViewOfSection (mqinfo ()->mqi_sect, NtCurrentProcess (),
508 &mptr, 0, filesize, NULL, &filesize,
509 ViewShare, MEM_TOP_DOWN, PAGE_READWRITE);
510 if (!NT_SUCCESS (status))
/* The message mentions fork although this path is also reached via dup. */
511 api_fatal ("Mapping message queue failed in fork, status 0x%x\n",
512 status);
514 fhc->mqinfo ()->mqi_hdr = (struct mq_hdr *) mptr;
515 if (!DuplicateHandle (parent, mqinfo ()->mqi_waitsend,
516 GetCurrentProcess (), &fhc->mqinfo ()->mqi_waitsend,
517 0, FALSE, DUPLICATE_SAME_ACCESS))
518 __leave;
519 if (!DuplicateHandle (parent, mqinfo ()->mqi_waitrecv,
520 GetCurrentProcess (), &fhc->mqinfo ()->mqi_waitrecv,
521 0, FALSE, DUPLICATE_SAME_ACCESS))
522 __leave;
523 if (!DuplicateHandle (parent, mqinfo ()->mqi_lock,
524 GetCurrentProcess (), &fhc->mqinfo ()->mqi_lock,
525 0, FALSE, DUPLICATE_SAME_ACCESS))
526 __leave;
527 return 0;
529 __except (EFAULT) {}
530 __endtry
531 return -1;
535 fhandler_mqueue::dup (fhandler_base *child, int flags)
537 fhandler_mqueue *fhc = (fhandler_mqueue *) child;
539 int ret = fhandler_disk_file::dup (child, flags);
540 if (!ret)
542 memcpy (fhc->filebuf, filebuf, FILESIZE);
543 ret = _dup (GetCurrentProcess (), fhc);
545 return ret;
548 void
549 fhandler_mqueue::fixup_after_fork (HANDLE parent)
551 if (_dup (parent, this))
552 api_fatal ("Creating IPC object failed in fork, %E");
556 fhandler_mqueue::ioctl (unsigned int cmd, void *buf)
558 return fhandler_base::ioctl (cmd, buf);
562 fhandler_mqueue::close ()
564 __try
566 mqinfo ()->mqi_magic = 0; /* just in case */
567 NtUnmapViewOfSection (NtCurrentProcess (), mqinfo ()->mqi_hdr);
568 NtClose (mqinfo ()->mqi_sect);
569 NtClose (mqinfo ()->mqi_waitsend);
570 NtClose (mqinfo ()->mqi_waitrecv);
571 NtClose (mqinfo ()->mqi_lock);
573 __except (0) {}
574 __endtry
575 return 0;
579 fhandler_mqueue::mutex_lock (HANDLE mtx, bool eintr)
581 switch (cygwait (mtx, cw_infinite, cw_cancel | cw_cancel_self
582 | (eintr ? cw_sig_eintr : cw_sig_restart)))
584 case WAIT_OBJECT_0:
585 case WAIT_ABANDONED_0:
586 return 0;
587 case WAIT_SIGNALED:
588 set_errno (EINTR);
589 return 1;
590 default:
591 break;
593 return geterrno_from_win_error ();
597 fhandler_mqueue::mutex_unlock (HANDLE mtx)
599 return ReleaseMutex (mtx) ? 0 : geterrno_from_win_error ();
603 fhandler_mqueue::cond_timedwait (HANDLE evt, HANDLE mtx,
604 const struct timespec *abstime)
606 HANDLE w4[4] = { evt, };
607 DWORD cnt = 2;
608 DWORD timer_idx = 0;
609 int ret = 0;
611 wait_signal_arrived here (w4[1]);
612 if ((w4[cnt] = pthread::get_cancel_event ()) != NULL)
613 ++cnt;
614 if (abstime)
616 if (!valid_timespec (*abstime))
617 return EINVAL;
619 /* If a timeout is set, we create a waitable timer to wait for.
620 This is the easiest way to handle the absolute timeout value, given
621 that NtSetTimer also takes absolute times and given the double
622 dependency on evt *and* mtx, which requires to call WFMO twice. */
623 NTSTATUS status;
624 LARGE_INTEGER duetime;
626 timer_idx = cnt++;
627 status = NtCreateTimer (&w4[timer_idx], TIMER_ALL_ACCESS, NULL,
628 NotificationTimer);
629 if (!NT_SUCCESS (status))
630 return geterrno_from_nt_status (status);
631 timespec_to_filetime (abstime, &duetime);
632 status = NtSetTimer (w4[timer_idx], &duetime, NULL, NULL, FALSE, 0, NULL);
633 if (!NT_SUCCESS (status))
635 NtClose (w4[timer_idx]);
636 return geterrno_from_nt_status (status);
639 ResetEvent (evt);
640 if ((ret = mutex_unlock (mtx)) != 0)
641 return ret;
642 /* Everything's set up, so now wait for the event to be signalled. */
643 restart1:
644 switch (WaitForMultipleObjects (cnt, w4, FALSE, INFINITE))
646 case WAIT_OBJECT_0:
647 break;
648 case WAIT_OBJECT_0 + 1:
649 if (_my_tls.call_signal_handler ())
650 goto restart1;
651 ret = EINTR;
652 break;
653 case WAIT_OBJECT_0 + 2:
654 if (timer_idx != 2)
655 pthread::static_cancel_self ();
656 fallthrough;
657 case WAIT_OBJECT_0 + 3:
658 ret = ETIMEDOUT;
659 break;
660 default:
661 ret = geterrno_from_win_error ();
662 break;
664 if (ret == 0)
666 /* At this point we need to lock the mutex. The wait is practically
667 the same as before, just that we now wait on the mutex instead of the
668 event. */
669 restart2:
670 w4[0] = mtx;
671 switch (WaitForMultipleObjects (cnt, w4, FALSE, INFINITE))
673 case WAIT_OBJECT_0:
674 case WAIT_ABANDONED_0:
675 break;
676 case WAIT_OBJECT_0 + 1:
677 if (_my_tls.call_signal_handler ())
678 goto restart2;
679 ret = EINTR;
680 break;
681 case WAIT_OBJECT_0 + 2:
682 if (timer_idx != 2)
683 pthread_testcancel ();
684 fallthrough;
685 case WAIT_OBJECT_0 + 3:
686 ret = ETIMEDOUT;
687 break;
688 default:
689 ret = geterrno_from_win_error ();
690 break;
693 if (timer_idx)
695 if (ret != ETIMEDOUT)
696 NtCancelTimer (w4[timer_idx], NULL);
697 NtClose (w4[timer_idx]);
699 return ret;
702 void
703 fhandler_mqueue::cond_signal (HANDLE evt)
705 SetEvent (evt);
709 fhandler_mqueue::mq_getattr (struct mq_attr *mqstat)
711 int n;
712 struct mq_hdr *mqhdr;
713 struct mq_fattr *attr;
715 __try
717 mqhdr = mqinfo ()->mqi_hdr;
718 attr = &mqhdr->mqh_attr;
719 if ((n = mutex_lock (mqinfo ()->mqi_lock, false)) != 0)
721 errno = n;
722 __leave;
724 mqstat->mq_flags = is_nonblocking () ? O_NONBLOCK : 0; /* per-open */
725 mqstat->mq_maxmsg = attr->mq_maxmsg; /* remaining three per-queue */
726 mqstat->mq_msgsize = attr->mq_msgsize;
727 mqstat->mq_curmsgs = attr->mq_curmsgs;
729 mutex_unlock (mqinfo ()->mqi_lock);
730 return 0;
732 __except (EBADF) {}
733 __endtry
734 return -1;
738 fhandler_mqueue::mq_setattr (const struct mq_attr *mqstat,
739 struct mq_attr *omqstat)
741 int n;
742 struct mq_hdr *mqhdr;
743 struct mq_fattr *attr;
745 __try
747 mqhdr = mqinfo ()->mqi_hdr;
748 attr = &mqhdr->mqh_attr;
749 if ((n = mutex_lock (mqinfo ()->mqi_lock, false)) != 0)
751 errno = n;
752 __leave;
755 if (omqstat != NULL)
757 omqstat->mq_flags = is_nonblocking () ? O_NONBLOCK : 0;
758 omqstat->mq_maxmsg = attr->mq_maxmsg;
759 omqstat->mq_msgsize = attr->mq_msgsize;
760 omqstat->mq_curmsgs = attr->mq_curmsgs; /* and current status */
763 set_nonblocking (mqstat->mq_flags & O_NONBLOCK);
765 mutex_unlock (mqinfo ()->mqi_lock);
766 return 0;
768 __except (EBADF) {}
769 __endtry
770 return -1;
774 fhandler_mqueue::mq_notify (const struct sigevent *notification)
776 int n;
777 pid_t pid;
778 struct mq_hdr *mqhdr;
780 __try
782 mqhdr = mqinfo ()->mqi_hdr;
783 if ((n = mutex_lock (mqinfo ()->mqi_lock, false)) != 0)
785 errno = n;
786 __leave;
789 pid = myself->pid;
790 if (!notification)
792 if (mqhdr->mqh_pid == pid)
793 mqhdr->mqh_pid = 0; /* unregister calling process */
795 else
797 if (mqhdr->mqh_pid != 0)
799 if (kill (mqhdr->mqh_pid, 0) != -1 || errno != ESRCH)
801 set_errno (EBUSY);
802 mutex_unlock (mqinfo ()->mqi_lock);
803 __leave;
806 mqhdr->mqh_pid = pid;
807 mqhdr->mqh_event = *notification;
809 mutex_unlock (mqinfo ()->mqi_lock);
810 return 0;
812 __except (EBADF) {}
813 __endtry
814 return -1;
/* Put a message onto the queue.

   ptr, len - message to copy into the queue; len must not exceed the
              mq_msgsize of the queue (EMSGSIZE)
   prio     - message priority, must be below MQ_PRIO_MAX (EINVAL)
   abstime  - handed to cond_timedwait when waiting for room on a full
              queue; not used for nonblocking handlers, which fail with
              EAGAIN instead

   Returns 0 on success, -1 with errno set on failure. */
818 fhandler_mqueue::mq_timedsend (const char *ptr, size_t len, unsigned int prio,
819 const struct timespec *abstime)
821 int n;
822 long index, freeindex;
823 int8_t *mptr;
824 struct sigevent *sigev;
825 struct mq_hdr *mqhdr;
826 struct mq_fattr *attr;
827 struct msg_hdr *msghdr, *nmsghdr, *pmsghdr;
828 bool mutex_locked = false;
829 int ret = -1;
831 pthread_testcancel ();
833 __try
835 if (prio >= MQ_PRIO_MAX)
837 set_errno (EINVAL);
838 __leave;
841 mqhdr = mqinfo ()->mqi_hdr; /* struct pointer */
842 mptr = (int8_t *) mqhdr; /* byte pointer */
843 attr = &mqhdr->mqh_attr;
844 if ((n = mutex_lock (mqinfo ()->mqi_lock, true)) != 0)
846 errno = n;
847 __leave;
849 mutex_locked = true;
850 if (len > (size_t) attr->mq_msgsize)
852 set_errno (EMSGSIZE);
853 __leave;
855 if (attr->mq_curmsgs == 0)
/* Queue is empty: notify the registered process, but only if nobody is
   counted in mqh_nwait.  Only SIGEV_SIGNAL results in an action here; the
   registration is dropped in any case. */
857 if (mqhdr->mqh_pid != 0 && mqhdr->mqh_nwait == 0)
859 sigev = &mqhdr->mqh_event;
860 if (sigev->sigev_notify == SIGEV_SIGNAL)
861 sigqueue (mqhdr->mqh_pid, sigev->sigev_signo,
862 sigev->sigev_value);
863 mqhdr->mqh_pid = 0; /* unregister */
866 else if (attr->mq_curmsgs >= attr->mq_maxmsg)
868 /* Queue is full */
869 if (is_nonblocking ())
871 set_errno (EAGAIN);
872 __leave;
874 /* Wait for room for one message on the queue */
875 while (attr->mq_curmsgs >= attr->mq_maxmsg)
/* This ret shadows the function-level ret, which stays -1 on this path.
   NOTE(review): mutex_locked stays true after a failed wait; whether the
   mutex is actually still held then depends on where cond_timedwait
   failed -- confirm the unlock at the end is harmless in both cases. */
877 int ret = cond_timedwait (mqinfo ()->mqi_waitsend,
878 mqinfo ()->mqi_lock, abstime);
879 if (ret != 0)
881 set_errno (ret);
882 __leave;
887 /* nmsghdr will point to new message */
888 if ((freeindex = mqhdr->mqh_free) == 0)
889 api_fatal ("mq_send: curmsgs = %ld; free = 0", attr->mq_curmsgs);
891 nmsghdr = (struct msg_hdr *) &mptr[freeindex];
892 nmsghdr->msg_prio = prio;
893 nmsghdr->msg_len = len;
/* The message data is stored right behind its header. */
894 memcpy (nmsghdr + 1, ptr, len); /* copy message from caller */
895 mqhdr->mqh_free = nmsghdr->msg_next; /* new freelist head */
897 /* Find right place for message in linked list */
898 index = mqhdr->mqh_head;
/* Treat the head index like the msg_next member of a predecessor.
   presumably msg_next is the first member of struct msg_hdr; verify
   against the struct definition. */
899 pmsghdr = (struct msg_hdr *) &(mqhdr->mqh_head);
900 while (index)
902 msghdr = (struct msg_hdr *) &mptr[index];
/* Strictly greater: the new message is inserted behind queued messages of
   the same priority. */
903 if (prio > msghdr->msg_prio)
905 nmsghdr->msg_next = index;
906 pmsghdr->msg_next = freeindex;
907 break;
909 index = msghdr->msg_next;
910 pmsghdr = msghdr;
912 if (index == 0)
914 /* Queue was empty or new goes at end of list */
915 pmsghdr->msg_next = freeindex;
916 nmsghdr->msg_next = 0;
918 /* Wake up anyone blocked in mq_receive waiting for a message */
919 if (attr->mq_curmsgs == 0)
920 cond_signal (mqinfo ()->mqi_waitrecv);
921 attr->mq_curmsgs++;
923 ret = 0;
925 __except (EBADF) {}
926 __endtry
927 if (mutex_locked)
928 mutex_unlock (mqinfo ()->mqi_lock);
929 return ret;
/* Take the message at the head of the queue.

   ptr, maxlen - receive buffer; maxlen must not be smaller than the
                 mq_msgsize of the queue (EMSGSIZE)
   priop       - if not NULL, receives the priority of the message
   abstime     - handed to cond_timedwait when waiting on an empty queue;
                 not used for nonblocking handlers, which fail with EAGAIN
                 instead

   Returns the length of the message, or -1 with errno set on failure. */
932 ssize_t
933 fhandler_mqueue::mq_timedrecv (char *ptr, size_t maxlen, unsigned int *priop,
934 const struct timespec *abstime)
936 int n;
937 long index;
938 int8_t *mptr;
939 ssize_t len = -1;
940 struct mq_hdr *mqhdr;
941 struct mq_fattr *attr;
942 struct msg_hdr *msghdr;
943 bool mutex_locked = false;
945 pthread_testcancel ();
947 __try
949 mqhdr = mqinfo ()->mqi_hdr; /* struct pointer */
950 mptr = (int8_t *) mqhdr; /* byte pointer */
951 attr = &mqhdr->mqh_attr;
952 if ((n = mutex_lock (mqinfo ()->mqi_lock, true)) != 0)
954 errno = n;
955 __leave;
957 mutex_locked = true;
958 if (maxlen < (size_t) attr->mq_msgsize)
960 set_errno (EMSGSIZE);
961 __leave;
963 if (attr->mq_curmsgs == 0) /* queue is empty */
965 if (is_nonblocking ())
967 set_errno (EAGAIN);
968 __leave;
970 /* Wait for a message to be placed onto queue */
/* mqh_nwait counts blocked receivers; mq_timedsend only notifies a
   registered process while it is 0. */
971 mqhdr->mqh_nwait++;
972 while (attr->mq_curmsgs == 0)
974 int ret = cond_timedwait (mqinfo ()->mqi_waitrecv,
975 mqinfo ()->mqi_lock, abstime);
976 if (ret != 0)
/* NOTE(review): leaving here skips the mqh_nwait decrement below, so the
   counter in the shared header stays incremented after a failed wait --
   confirm whether that is intended.  mutex_locked also stays true although
   the mutex may not be held anymore at this point. */
978 set_errno (ret);
979 __leave;
982 mqhdr->mqh_nwait--;
985 if ((index = mqhdr->mqh_head) == 0)
986 api_fatal ("mq_receive: curmsgs = %ld; head = 0", attr->mq_curmsgs);
988 msghdr = (struct msg_hdr *) &mptr[index];
989 mqhdr->mqh_head = msghdr->msg_next; /* new head of list */
990 len = msghdr->msg_len;
/* The message data is stored right behind its header. */
991 memcpy(ptr, msghdr + 1, len); /* copy the message itself */
992 if (priop != NULL)
993 *priop = msghdr->msg_prio;
995 /* Just-read message goes to front of free list */
996 msghdr->msg_next = mqhdr->mqh_free;
997 mqhdr->mqh_free = index;
999 /* Wake up anyone blocked in mq_send waiting for room */
1000 if (attr->mq_curmsgs == attr->mq_maxmsg)
1001 cond_signal (mqinfo ()->mqi_waitsend);
1002 attr->mq_curmsgs--;
1004 __except (EBADF) {}
1005 __endtry
1006 if (mutex_locked)
1007 mutex_unlock (mqinfo ()->mqi_lock);
1008 return len;