1 /* fhandler_mqueue.cc: fhandler for POSIX message queue
3 This file is part of Cygwin.
5 This software is a copyrighted work licensed under the terms of the
6 Cygwin license. Please consult the file "CYGWIN_LICENSE" for
10 #include "shared_info.h"
17 #include <sys/param.h>
19 #define MSGSIZE(i) roundup((i), sizeof(long))
23 struct mq_attr defattr
= { 0, 10, 8192, 0 }; /* Linux defaults. */
25 fhandler_mqueue::fhandler_mqueue () :
28 filebuf
= (char *) ccalloc_abort (HEAP_BUF
, 1, FILESIZE
);
31 fhandler_mqueue::~fhandler_mqueue ()
37 fhandler_mqueue::valid_path ()
39 const char *posix_basename
= get_name () + MQ_LEN
;
40 size_t len
= strlen (posix_basename
);
41 if (len
> 0 && len
<= NAME_MAX
&& !strpbrk (posix_basename
, "/\\"))
47 fhandler_mqueue::open (int flags
, mode_t mode
)
54 /* FIXME: reopen-by-handle semantics are not implemented yet */
55 flags
&= ~(O_NOCTTY
| O_PATH
| O_BINARY
| O_TEXT
);
56 return mq_open (flags
, mode
, NULL
);
60 fhandler_mqueue::mq_open (int oflags
, mode_t mode
, struct mq_attr
*attr
)
64 PUNICODE_STRING mqstream
;
66 struct mq_info
*mqinfo
= NULL
;
69 if ((oflags
& ~(O_ACCMODE
| O_CLOEXEC
| O_CREAT
| O_EXCL
| O_NONBLOCK
))
70 || (oflags
& O_ACCMODE
) == O_ACCMODE
)
76 /* attach a stream suffix to the NT filename, thus creating a stream. */
77 mqstream
= pc
.get_nt_native_path (&ro_u_mq_suffix
);
78 pc
.get_object_attr (oa
, sec_none_nih
);
83 /* Create and disallow sharing */
84 status
= NtCreateFile (&get_handle (),
85 GENERIC_READ
| GENERIC_WRITE
| DELETE
86 | SYNCHRONIZE
, &oa
, &io
, NULL
,
87 FILE_ATTRIBUTE_NORMAL
, FILE_SHARE_DELETE
,
89 FILE_OPEN_FOR_BACKUP_INTENT
90 | FILE_SYNCHRONOUS_IO_NONALERT
,
92 if (!NT_SUCCESS (status
))
94 if (status
== STATUS_OBJECT_NAME_COLLISION
&& (oflags
& O_EXCL
) == 0)
96 __seterrno_from_nt_status (status
);
100 set_created_file_access (get_handle (), pc
, mode
);
105 /* Open the file, and loop while detecting a sharing violation. */
108 status
= NtOpenFile (&get_handle (),
109 GENERIC_READ
| GENERIC_WRITE
| SYNCHRONIZE
,
110 &oa
, &io
, FILE_SHARE_VALID_FLAGS
,
111 FILE_OPEN_FOR_BACKUP_INTENT
112 | FILE_SYNCHRONOUS_IO_NONALERT
);
113 if (NT_SUCCESS (status
))
115 if (status
== STATUS_OBJECT_NAME_NOT_FOUND
&& (oflags
& O_CREAT
))
117 if (status
!= STATUS_SHARING_VIOLATION
)
119 __seterrno_from_nt_status (status
);
125 /* We need the filename without STREAM_SUFFIX later on */
126 mqstream
->Length
-= ro_u_mq_suffix
.Length
;
127 mqstream
->Buffer
[mqstream
->Length
/ sizeof (WCHAR
)] = L
'\0';
133 /* Check minimum and maximum values. The max values are pretty much
134 arbitrary, taken from the Linux mq_overview man page, up to Linux
135 3.4. These max values make sure that the internal mq_fattr
136 structure can use 32-bit types. */
137 if (attr
->mq_maxmsg
<= 0 || attr
->mq_maxmsg
> 32768
138 || attr
->mq_msgsize
<= 0 || attr
->mq_msgsize
> 1048576)
141 mqinfo
= mqinfo_create (attr
, mode
, oflags
& O_NONBLOCK
);
144 mqinfo
= mqinfo_open (oflags
& O_NONBLOCK
);
145 mq_open_finish (mqinfo
!= NULL
, created
);
146 /* Set fhandler open flags */
149 set_access (GENERIC_READ
| SYNCHRONIZE
);
150 close_on_exec (true);
151 set_flags (oflags
| O_CLOEXEC
, O_BINARY
);
154 return mqinfo
? 1 : 0;
158 fhandler_mqueue::_mqinfo (SIZE_T filesize
, mode_t mode
, int flags
,
161 WCHAR buf
[NAME_MAX
+ sizeof ("mqueue/XXX")];
162 UNICODE_STRING uname
;
163 OBJECT_ATTRIBUTES oa
;
165 LARGE_INTEGER fsiz
= { QuadPart
: (LONGLONG
) filesize
};
168 /* Set sectsize before filesize is passed to NtMapViewOfSection, which
169 page-aligns filesize in place; the aligned size breaks the next NtMapViewOfSection in fork. */
170 mqinfo ()->mqi_sectsize
= filesize
;
171 mqinfo ()->mqi_mode
= mode
;
172 set_nonblocking (flags
& O_NONBLOCK
);
174 __small_swprintf (buf
, L
"mqueue/mtx%s", get_name ());
175 RtlInitUnicodeString (&uname
, buf
);
176 InitializeObjectAttributes (&oa
, &uname
, OBJ_OPENIF
| OBJ_CASE_INSENSITIVE
,
177 get_shared_parent_dir (),
178 everyone_sd (CYG_MUTANT_ACCESS
));
179 status
= NtCreateMutant (&mqinfo ()->mqi_lock
, CYG_MUTANT_ACCESS
, &oa
,
181 if (!NT_SUCCESS (status
))
184 wcsncpy (buf
+ 7, L
"snd", 3);
185 /* same length, no RtlInitUnicodeString required */
186 InitializeObjectAttributes (&oa
, &uname
, OBJ_OPENIF
| OBJ_CASE_INSENSITIVE
,
187 get_shared_parent_dir (),
188 everyone_sd (CYG_EVENT_ACCESS
));
189 status
= NtCreateEvent (&mqinfo ()->mqi_waitsend
, CYG_EVENT_ACCESS
, &oa
,
190 NotificationEvent
, FALSE
);
191 if (!NT_SUCCESS (status
))
193 wcsncpy (buf
+ 7, L
"rcv", 3);
194 /* same length, same attributes, no more init required */
195 status
= NtCreateEvent (&mqinfo ()->mqi_waitrecv
, CYG_EVENT_ACCESS
, &oa
,
196 NotificationEvent
, FALSE
);
197 if (!NT_SUCCESS (status
))
200 InitializeObjectAttributes (&oa
, NULL
, 0, NULL
, NULL
);
201 status
= NtCreateSection (&mqinfo ()->mqi_sect
, SECTION_ALL_ACCESS
, &oa
,
202 &fsiz
, PAGE_READWRITE
, SEC_COMMIT
, get_handle ());
203 if (!NT_SUCCESS (status
))
206 status
= NtMapViewOfSection (mqinfo ()->mqi_sect
, NtCurrentProcess (),
207 &mptr
, 0, filesize
, NULL
, &filesize
,
208 ViewShare
, MEM_TOP_DOWN
, PAGE_READWRITE
);
209 if (!NT_SUCCESS (status
))
212 mqinfo ()->mqi_hdr
= (struct mq_hdr
*) mptr
;
214 /* Special problem on Cygwin. /dev/mqueue is just a simple dir,
215 so there's a chance normal files are created in there. */
216 if (just_open
&& mqinfo ()->mqi_hdr
->mqh_magic
!= MQI_MAGIC
)
218 status
= STATUS_ACCESS_DENIED
;
222 mqinfo ()->mqi_magic
= MQI_MAGIC
;
226 if (mqinfo ()->mqi_sect
)
227 NtClose (mqinfo ()->mqi_sect
);
228 if (mqinfo ()->mqi_waitrecv
)
229 NtClose (mqinfo ()->mqi_waitrecv
);
230 if (mqinfo ()->mqi_waitsend
)
231 NtClose (mqinfo ()->mqi_waitsend
);
232 if (mqinfo ()->mqi_lock
)
233 NtClose (mqinfo ()->mqi_lock
);
234 __seterrno_from_nt_status (status
);
239 fhandler_mqueue::mqinfo_open (int flags
)
241 FILE_STANDARD_INFORMATION fsi
;
246 fsi
.EndOfFile
.QuadPart
= 0;
247 status
= NtQueryInformationFile (get_handle (), &io
, &fsi
, sizeof fsi
,
248 FileStandardInformation
);
249 if (!NT_SUCCESS (status
))
251 __seterrno_from_nt_status (status
);
254 if (get_file_attribute (get_handle (), pc
, mode
, NULL
, NULL
))
255 mode
= STD_RBITS
| STD_WBITS
;
257 return _mqinfo (fsi
.EndOfFile
.QuadPart
, mode
, flags
, true);
261 fhandler_mqueue::mqinfo_create (struct mq_attr
*attr
, mode_t mode
, int flags
)
265 FILE_END_OF_FILE_INFORMATION feofi
;
268 struct mq_info
*mqinfo
= NULL
;
270 msgsize
= MSGSIZE (attr
->mq_msgsize
);
271 filesize
= sizeof (struct mq_hdr
)
272 + (attr
->mq_maxmsg
* (sizeof (struct msg_hdr
) + msgsize
));
273 feofi
.EndOfFile
.QuadPart
= filesize
;
274 status
= NtSetInformationFile (get_handle (), &io
, &feofi
, sizeof feofi
,
275 FileEndOfFileInformation
);
276 if (!NT_SUCCESS (status
))
278 __seterrno_from_nt_status (status
);
282 mqinfo
= _mqinfo (filesize
, mode
, flags
, false);
286 /* Initialize header at beginning of file */
287 /* Create free list with all messages on it */
289 struct mq_hdr
*mqhdr
;
290 struct msg_hdr
*msghdr
;
292 mptr
= (int8_t *) mqinfo
->mqi_hdr
;
293 mqhdr
= mqinfo
->mqi_hdr
;
294 mqhdr
->mqh_attr
.mq_flags
= 0;
295 mqhdr
->mqh_attr
.mq_maxmsg
= attr
->mq_maxmsg
;
296 mqhdr
->mqh_attr
.mq_msgsize
= attr
->mq_msgsize
;
297 mqhdr
->mqh_attr
.mq_curmsgs
= 0;
298 mqhdr
->mqh_nwait
= 0;
301 mqhdr
->mqh_magic
= MQI_MAGIC
;
302 long index
= sizeof (struct mq_hdr
);
303 mqhdr
->mqh_free
= index
;
304 for (int i
= 0; i
< attr
->mq_maxmsg
- 1; i
++)
306 msghdr
= (struct msg_hdr
*) &mptr
[index
];
307 index
+= sizeof (struct msg_hdr
) + msgsize
;
308 msghdr
->msg_next
= index
;
310 msghdr
= (struct msg_hdr
*) &mptr
[index
];
311 msghdr
->msg_next
= 0; /* end of free list */
318 fhandler_mqueue::mq_open_finish (bool success
, bool created
)
322 OBJECT_ATTRIBUTES oa
;
327 /* If we have an open queue stream handle, close it and set it to NULL */
328 HANDLE queue_stream
= get_handle ();
332 /* In case of success, open the default stream for reading. This
333 can be used to implement various IO functions without exposing
334 the actual message queue. */
335 pc
.get_object_attr (oa
, sec_none_nih
);
336 status
= NtOpenFile (&def_stream
, GENERIC_READ
| SYNCHRONIZE
,
337 &oa
, &io
, FILE_SHARE_VALID_FLAGS
,
338 FILE_OPEN_FOR_BACKUP_INTENT
339 | FILE_SYNCHRONOUS_IO_NONALERT
);
340 if (NT_SUCCESS (status
))
341 set_handle (def_stream
);
342 else /* Note that we don't treat this as an error! */
344 debug_printf ("Opening default stream failed: status %y", status
);
350 /* In case of error at creation time, delete the file */
351 FILE_DISPOSITION_INFORMATION disp
= { TRUE
};
353 NtSetInformationFile (queue_stream
, &io
, &disp
, sizeof disp
,
354 FileDispositionInformation
);
355 /* We also have to set the delete disposition on the default stream,
356 otherwise only the queue stream will get deleted */
357 pc
.get_object_attr (oa
, sec_none_nih
);
358 status
= NtOpenFile (&def_stream
, DELETE
, &oa
, &io
,
359 FILE_SHARE_VALID_FLAGS
,
360 FILE_OPEN_FOR_BACKUP_INTENT
);
361 if (NT_SUCCESS (status
))
363 NtSetInformationFile (def_stream
, &io
, &disp
, sizeof disp
,
364 FileDispositionInformation
);
365 NtClose (def_stream
);
368 NtClose (queue_stream
);
373 fhandler_mqueue::get_proc_fd_name (char *buf
)
375 return strcpy (buf
, strrchr (get_name (), '/'));
379 fhandler_mqueue::fcntl (int cmd
, intptr_t arg
)
386 res
= close_on_exec () ? FD_CLOEXEC
: 0;
390 debug_printf ("GETFL: %y", res
);
400 /* Do what fhandler_virtual does for read/lseek */
402 fhandler_mqueue::fill_filebuf ()
404 unsigned long qsize
= 0;
409 if (mutex_lock (mqinfo ()->mqi_lock
, true) == 0)
411 struct mq_hdr
*mqhdr
= mqinfo ()->mqi_hdr
;
412 int8_t *mptr
= (int8_t *) mqhdr
;
413 struct msg_hdr
*msghdr
;
414 for (long index
= mqhdr
->mqh_head
; index
; index
= msghdr
->msg_next
)
416 msghdr
= (struct msg_hdr
*) &mptr
[index
];
417 qsize
+= msghdr
->msg_len
;
421 notify
= mqhdr
->mqh_event
.sigev_notify
;
422 if (notify
== SIGEV_SIGNAL
)
423 signo
= mqhdr
->mqh_event
.sigev_signo
;
424 notify_pid
= mqhdr
->mqh_pid
;
426 mutex_unlock (mqinfo ()->mqi_lock
);
428 /* QSIZE: bytes of all current msgs
429 NOTIFY: sigev_notify if there's a notifier
430 SIGNO: signal number if NOTIFY && sigev_notify == SIGEV_SIGNAL
431 NOTIFY_PID: if NOTIFY pid */
432 snprintf (filebuf
, FILESIZE
,
433 "QSIZE:%-10lu NOTIFY:%-5d SIGNO:%-5d NOTIFY_PID:%-6d\n",
434 qsize
, notify
, signo
, notify_pid
);
435 filesize
= strlen (filebuf
);
440 fhandler_mqueue::read (void *in_ptr
, size_t& len
)
444 if (!filebuf
[0] && !fill_filebuf ())
449 if ((ssize_t
) len
> filesize
- position
)
450 len
= (size_t) (filesize
- position
);
451 if ((ssize_t
) len
< 0)
454 memcpy (in_ptr
, filebuf
+ position
, len
);
459 fhandler_mqueue::lseek (off_t offset
, int whence
)
461 if (!fill_filebuf ())
472 position
= filesize
+ offset
;
483 fhandler_mqueue::fstat (struct stat
*buf
)
485 int ret
= fhandler_disk_file::fstat (buf
);
488 buf
->st_size
= FILESIZE
;
489 buf
->st_dev
= FH_MQUEUE
;
495 fhandler_mqueue::_dup (HANDLE parent
, fhandler_mqueue
*fhc
)
500 SIZE_T filesize
= mqinfo ()->mqi_sectsize
;
503 if (!DuplicateHandle (parent
, mqinfo ()->mqi_sect
,
504 GetCurrentProcess (), &fhc
->mqinfo ()->mqi_sect
,
505 0, FALSE
, DUPLICATE_SAME_ACCESS
))
507 status
= NtMapViewOfSection (mqinfo ()->mqi_sect
, NtCurrentProcess (),
508 &mptr
, 0, filesize
, NULL
, &filesize
,
509 ViewShare
, MEM_TOP_DOWN
, PAGE_READWRITE
);
510 if (!NT_SUCCESS (status
))
511 api_fatal ("Mapping message queue failed in fork, status 0x%x\n",
514 fhc
->mqinfo ()->mqi_hdr
= (struct mq_hdr
*) mptr
;
515 if (!DuplicateHandle (parent
, mqinfo ()->mqi_waitsend
,
516 GetCurrentProcess (), &fhc
->mqinfo ()->mqi_waitsend
,
517 0, FALSE
, DUPLICATE_SAME_ACCESS
))
519 if (!DuplicateHandle (parent
, mqinfo ()->mqi_waitrecv
,
520 GetCurrentProcess (), &fhc
->mqinfo ()->mqi_waitrecv
,
521 0, FALSE
, DUPLICATE_SAME_ACCESS
))
523 if (!DuplicateHandle (parent
, mqinfo ()->mqi_lock
,
524 GetCurrentProcess (), &fhc
->mqinfo ()->mqi_lock
,
525 0, FALSE
, DUPLICATE_SAME_ACCESS
))
535 fhandler_mqueue::dup (fhandler_base
*child
, int flags
)
537 fhandler_mqueue
*fhc
= (fhandler_mqueue
*) child
;
539 int ret
= fhandler_disk_file::dup (child
, flags
);
542 memcpy (fhc
->filebuf
, filebuf
, FILESIZE
);
543 ret
= _dup (GetCurrentProcess (), fhc
);
549 fhandler_mqueue::fixup_after_fork (HANDLE parent
)
551 if (_dup (parent
, this))
552 api_fatal ("Creating IPC object failed in fork, %E");
556 fhandler_mqueue::ioctl (unsigned int cmd
, void *buf
)
558 return fhandler_base::ioctl (cmd
, buf
);
562 fhandler_mqueue::close ()
566 mqinfo ()->mqi_magic
= 0; /* just in case */
567 NtUnmapViewOfSection (NtCurrentProcess (), mqinfo ()->mqi_hdr
);
568 NtClose (mqinfo ()->mqi_sect
);
569 NtClose (mqinfo ()->mqi_waitsend
);
570 NtClose (mqinfo ()->mqi_waitrecv
);
571 NtClose (mqinfo ()->mqi_lock
);
579 fhandler_mqueue::mutex_lock (HANDLE mtx
, bool eintr
)
581 switch (cygwait (mtx
, cw_infinite
, cw_cancel
| cw_cancel_self
582 | (eintr
? cw_sig_eintr
: cw_sig_restart
)))
585 case WAIT_ABANDONED_0
:
593 return geterrno_from_win_error ();
597 fhandler_mqueue::mutex_unlock (HANDLE mtx
)
599 return ReleaseMutex (mtx
) ? 0 : geterrno_from_win_error ();
603 fhandler_mqueue::cond_timedwait (HANDLE evt
, HANDLE mtx
,
604 const struct timespec
*abstime
)
606 HANDLE w4
[4] = { evt
, };
611 wait_signal_arrived
here (w4
[1]);
612 if ((w4
[cnt
] = pthread::get_cancel_event ()) != NULL
)
616 if (!valid_timespec (*abstime
))
619 /* If a timeout is set, we create a waitable timer to wait for.
620 This is the easiest way to handle the absolute timeout value, given
621 that NtSetTimer also takes absolute times and given the double
622 dependency on evt *and* mtx, which requires calling WFMO twice. */
624 LARGE_INTEGER duetime
;
627 status
= NtCreateTimer (&w4
[timer_idx
], TIMER_ALL_ACCESS
, NULL
,
629 if (!NT_SUCCESS (status
))
630 return geterrno_from_nt_status (status
);
631 timespec_to_filetime (abstime
, &duetime
);
632 status
= NtSetTimer (w4
[timer_idx
], &duetime
, NULL
, NULL
, FALSE
, 0, NULL
);
633 if (!NT_SUCCESS (status
))
635 NtClose (w4
[timer_idx
]);
636 return geterrno_from_nt_status (status
);
640 if ((ret
= mutex_unlock (mtx
)) != 0)
642 /* Everything's set up, so now wait for the event to be signalled. */
644 switch (WaitForMultipleObjects (cnt
, w4
, FALSE
, INFINITE
))
648 case WAIT_OBJECT_0
+ 1:
649 if (_my_tls
.call_signal_handler ())
653 case WAIT_OBJECT_0
+ 2:
655 pthread::static_cancel_self ();
657 case WAIT_OBJECT_0
+ 3:
661 ret
= geterrno_from_win_error ();
666 /* At this point we need to lock the mutex. The wait is practically
667 the same as before, just that we now wait on the mutex instead of the
671 switch (WaitForMultipleObjects (cnt
, w4
, FALSE
, INFINITE
))
674 case WAIT_ABANDONED_0
:
676 case WAIT_OBJECT_0
+ 1:
677 if (_my_tls
.call_signal_handler ())
681 case WAIT_OBJECT_0
+ 2:
683 pthread_testcancel ();
685 case WAIT_OBJECT_0
+ 3:
689 ret
= geterrno_from_win_error ();
695 if (ret
!= ETIMEDOUT
)
696 NtCancelTimer (w4
[timer_idx
], NULL
);
697 NtClose (w4
[timer_idx
]);
703 fhandler_mqueue::cond_signal (HANDLE evt
)
709 fhandler_mqueue::mq_getattr (struct mq_attr
*mqstat
)
712 struct mq_hdr
*mqhdr
;
713 struct mq_fattr
*attr
;
717 mqhdr
= mqinfo ()->mqi_hdr
;
718 attr
= &mqhdr
->mqh_attr
;
719 if ((n
= mutex_lock (mqinfo ()->mqi_lock
, false)) != 0)
724 mqstat
->mq_flags
= is_nonblocking () ? O_NONBLOCK
: 0; /* per-open */
725 mqstat
->mq_maxmsg
= attr
->mq_maxmsg
; /* remaining three per-queue */
726 mqstat
->mq_msgsize
= attr
->mq_msgsize
;
727 mqstat
->mq_curmsgs
= attr
->mq_curmsgs
;
729 mutex_unlock (mqinfo ()->mqi_lock
);
738 fhandler_mqueue::mq_setattr (const struct mq_attr
*mqstat
,
739 struct mq_attr
*omqstat
)
742 struct mq_hdr
*mqhdr
;
743 struct mq_fattr
*attr
;
747 mqhdr
= mqinfo ()->mqi_hdr
;
748 attr
= &mqhdr
->mqh_attr
;
749 if ((n
= mutex_lock (mqinfo ()->mqi_lock
, false)) != 0)
757 omqstat
->mq_flags
= is_nonblocking () ? O_NONBLOCK
: 0;
758 omqstat
->mq_maxmsg
= attr
->mq_maxmsg
;
759 omqstat
->mq_msgsize
= attr
->mq_msgsize
;
760 omqstat
->mq_curmsgs
= attr
->mq_curmsgs
; /* and current status */
763 set_nonblocking (mqstat
->mq_flags
& O_NONBLOCK
);
765 mutex_unlock (mqinfo ()->mqi_lock
);
774 fhandler_mqueue::mq_notify (const struct sigevent
*notification
)
778 struct mq_hdr
*mqhdr
;
782 mqhdr
= mqinfo ()->mqi_hdr
;
783 if ((n
= mutex_lock (mqinfo ()->mqi_lock
, false)) != 0)
792 if (mqhdr
->mqh_pid
== pid
)
793 mqhdr
->mqh_pid
= 0; /* unregister calling process */
797 if (mqhdr
->mqh_pid
!= 0)
799 if (kill (mqhdr
->mqh_pid
, 0) != -1 || errno
!= ESRCH
)
802 mutex_unlock (mqinfo ()->mqi_lock
);
806 mqhdr
->mqh_pid
= pid
;
807 mqhdr
->mqh_event
= *notification
;
809 mutex_unlock (mqinfo ()->mqi_lock
);
818 fhandler_mqueue::mq_timedsend (const char *ptr
, size_t len
, unsigned int prio
,
819 const struct timespec
*abstime
)
822 long index
, freeindex
;
824 struct sigevent
*sigev
;
825 struct mq_hdr
*mqhdr
;
826 struct mq_fattr
*attr
;
827 struct msg_hdr
*msghdr
, *nmsghdr
, *pmsghdr
;
828 bool mutex_locked
= false;
831 pthread_testcancel ();
835 if (prio
>= MQ_PRIO_MAX
)
841 mqhdr
= mqinfo ()->mqi_hdr
; /* struct pointer */
842 mptr
= (int8_t *) mqhdr
; /* byte pointer */
843 attr
= &mqhdr
->mqh_attr
;
844 if ((n
= mutex_lock (mqinfo ()->mqi_lock
, true)) != 0)
850 if (len
> (size_t) attr
->mq_msgsize
)
852 set_errno (EMSGSIZE
);
855 if (attr
->mq_curmsgs
== 0)
857 if (mqhdr
->mqh_pid
!= 0 && mqhdr
->mqh_nwait
== 0)
859 sigev
= &mqhdr
->mqh_event
;
860 if (sigev
->sigev_notify
== SIGEV_SIGNAL
)
861 sigqueue (mqhdr
->mqh_pid
, sigev
->sigev_signo
,
863 mqhdr
->mqh_pid
= 0; /* unregister */
866 else if (attr
->mq_curmsgs
>= attr
->mq_maxmsg
)
869 if (is_nonblocking ())
874 /* Wait for room for one message on the queue */
875 while (attr
->mq_curmsgs
>= attr
->mq_maxmsg
)
877 int ret
= cond_timedwait (mqinfo ()->mqi_waitsend
,
878 mqinfo ()->mqi_lock
, abstime
);
887 /* nmsghdr will point to new message */
888 if ((freeindex
= mqhdr
->mqh_free
) == 0)
889 api_fatal ("mq_send: curmsgs = %ld; free = 0", attr
->mq_curmsgs
);
891 nmsghdr
= (struct msg_hdr
*) &mptr
[freeindex
];
892 nmsghdr
->msg_prio
= prio
;
893 nmsghdr
->msg_len
= len
;
894 memcpy (nmsghdr
+ 1, ptr
, len
); /* copy message from caller */
895 mqhdr
->mqh_free
= nmsghdr
->msg_next
; /* new freelist head */
897 /* Find right place for message in linked list */
898 index
= mqhdr
->mqh_head
;
899 pmsghdr
= (struct msg_hdr
*) &(mqhdr
->mqh_head
);
902 msghdr
= (struct msg_hdr
*) &mptr
[index
];
903 if (prio
> msghdr
->msg_prio
)
905 nmsghdr
->msg_next
= index
;
906 pmsghdr
->msg_next
= freeindex
;
909 index
= msghdr
->msg_next
;
914 /* Queue was empty or new goes at end of list */
915 pmsghdr
->msg_next
= freeindex
;
916 nmsghdr
->msg_next
= 0;
918 /* Wake up anyone blocked in mq_receive waiting for a message */
919 if (attr
->mq_curmsgs
== 0)
920 cond_signal (mqinfo ()->mqi_waitrecv
);
928 mutex_unlock (mqinfo ()->mqi_lock
);
933 fhandler_mqueue::mq_timedrecv (char *ptr
, size_t maxlen
, unsigned int *priop
,
934 const struct timespec
*abstime
)
940 struct mq_hdr
*mqhdr
;
941 struct mq_fattr
*attr
;
942 struct msg_hdr
*msghdr
;
943 bool mutex_locked
= false;
945 pthread_testcancel ();
949 mqhdr
= mqinfo ()->mqi_hdr
; /* struct pointer */
950 mptr
= (int8_t *) mqhdr
; /* byte pointer */
951 attr
= &mqhdr
->mqh_attr
;
952 if ((n
= mutex_lock (mqinfo ()->mqi_lock
, true)) != 0)
958 if (maxlen
< (size_t) attr
->mq_msgsize
)
960 set_errno (EMSGSIZE
);
963 if (attr
->mq_curmsgs
== 0) /* queue is empty */
965 if (is_nonblocking ())
970 /* Wait for a message to be placed onto queue */
972 while (attr
->mq_curmsgs
== 0)
974 int ret
= cond_timedwait (mqinfo ()->mqi_waitrecv
,
975 mqinfo ()->mqi_lock
, abstime
);
985 if ((index
= mqhdr
->mqh_head
) == 0)
986 api_fatal ("mq_receive: curmsgs = %ld; head = 0", attr
->mq_curmsgs
);
988 msghdr
= (struct msg_hdr
*) &mptr
[index
];
989 mqhdr
->mqh_head
= msghdr
->msg_next
; /* new head of list */
990 len
= msghdr
->msg_len
;
991 memcpy(ptr
, msghdr
+ 1, len
); /* copy the message itself */
993 *priop
= msghdr
->msg_prio
;
995 /* Just-read message goes to front of free list */
996 msghdr
->msg_next
= mqhdr
->mqh_free
;
997 mqhdr
->mqh_free
= index
;
999 /* Wake up anyone blocked in mq_send waiting for room */
1000 if (attr
->mq_curmsgs
== attr
->mq_maxmsg
)
1001 cond_signal (mqinfo ()->mqi_waitsend
);
1007 mutex_unlock (mqinfo ()->mqi_lock
);