libroot_debug: Merge guarded heap into libroot_debug.
[haiku.git] / src / system / kernel / posix / xsi_message_queue.cpp
blob00046cb906cdf6f010913961cbd517213e5ae2ea
1 /*
2 * Copyright 2008-2011, Haiku, Inc. All rights reserved.
3 * Distributed under the terms of the MIT License.
5 * Authors:
6 * Salvatore Benedetto <salvatore.benedetto@gmail.com>
7 */
9 #include <posix/xsi_message_queue.h>
11 #include <new>
13 #include <sys/ipc.h>
14 #include <sys/types.h>
16 #include <OS.h>
18 #include <kernel.h>
19 #include <syscall_restart.h>
21 #include <util/atomic.h>
22 #include <util/AutoLock.h>
23 #include <util/DoublyLinkedList.h>
24 #include <util/OpenHashTable.h>
27 #define TRACE_XSI_MSG_QUEUE
28 #ifdef TRACE_XSI_MSG_QUEUE
29 # define TRACE(x) dprintf x
30 # define TRACE_ERROR(x) dprintf x
31 #else
32 # define TRACE(x) /* nothing */
33 # define TRACE_ERROR(x) dprintf x
34 #endif
36 // Queue for holding blocked threads
37 struct queued_thread : DoublyLinkedListLinkImpl<queued_thread> {
38 queued_thread(Thread *_thread, int32 _message_length)
40 thread(_thread),
41 message_length(_message_length),
42 queued(false)
46 Thread *thread;
47 int32 message_length;
48 bool queued;
51 typedef DoublyLinkedList<queued_thread> ThreadQueue;
54 struct queued_message : DoublyLinkedListLinkImpl<queued_message> {
55 queued_message(const void *_message, ssize_t _length)
57 initOK(false),
58 length(_length)
60 message = (char *)malloc(sizeof(char) * _length);
61 if (message == NULL)
62 return;
64 if (user_memcpy(&type, _message, sizeof(long)) != B_OK
65 || user_memcpy(message, (void *)((char *)_message + sizeof(long)),
66 _length) != B_OK) {
67 free(message);
68 return;
70 initOK = true;
73 ~queued_message()
75 if (initOK)
76 free(message);
79 ssize_t copy_to_user_buffer(void *_message, ssize_t _length)
81 if (_length > length)
82 _length = length;
84 if (user_memcpy(_message, &type, sizeof(long)) != B_OK
85 || user_memcpy((void *)((char *)_message + sizeof(long)), message,
86 _length) != B_OK)
87 return B_ERROR;
88 return _length;
91 bool initOK;
92 ssize_t length;
93 char *message;
94 long type;
97 typedef DoublyLinkedList<queued_message> MessageQueue;
99 // Arbitrary limit
100 #define MAX_BYTES_PER_QUEUE 2048
102 class XsiMessageQueue {
103 public:
104 XsiMessageQueue(int flags)
106 fBytesInQueue(0),
107 fThreadsWaitingToReceive(0),
108 fThreadsWaitingToSend(0)
110 mutex_init(&fLock, "XsiMessageQueue private mutex");
111 SetIpcKey((key_t)-1);
112 SetPermissions(flags);
113 // Initialize all fields to zero
114 memset((void *)&fMessageQueue, 0, sizeof(struct msqid_ds));
115 fMessageQueue.msg_ctime = (time_t)real_time_clock();
116 fMessageQueue.msg_qbytes = MAX_BYTES_PER_QUEUE;
119 // Implemented after sXsiMessageCount is declared
120 ~XsiMessageQueue();
122 status_t BlockAndUnlock(Thread *thread, MutexLocker *queueLocker)
124 thread_prepare_to_block(thread, B_CAN_INTERRUPT,
125 THREAD_BLOCK_TYPE_OTHER, (void*)"xsi message queue");
126 // Unlock the queue before blocking
127 queueLocker->Unlock();
129 // TODO: We've got a serious race condition: If BlockAndUnlock() returned due to
130 // interruption, we will still be queued. A WakeUpThread() at this point will
131 // call thread_unblock() and might thus screw with our trying to re-lock the
132 // mutex.
133 return thread_block();
136 void DoIpcSet(struct msqid_ds *result)
138 fMessageQueue.msg_perm.uid = result->msg_perm.uid;
139 fMessageQueue.msg_perm.gid = result->msg_perm.gid;
140 fMessageQueue.msg_perm.mode = (fMessageQueue.msg_perm.mode & ~0x01ff)
141 | (result->msg_perm.mode & 0x01ff);
142 fMessageQueue.msg_qbytes = result->msg_qbytes;
143 fMessageQueue.msg_ctime = (time_t)real_time_clock();
146 void Deque(queued_thread *queueEntry, bool waitForMessage)
148 if (queueEntry->queued) {
149 if (waitForMessage) {
150 fWaitingToReceive.Remove(queueEntry);
151 fThreadsWaitingToReceive--;
152 } else {
153 fWaitingToSend.Remove(queueEntry);
154 fThreadsWaitingToSend--;
159 void Enqueue(queued_thread *queueEntry, bool waitForMessage)
161 if (waitForMessage) {
162 fWaitingToReceive.Add(queueEntry);
163 fThreadsWaitingToReceive++;
164 } else {
165 fWaitingToSend.Add(queueEntry);
166 fThreadsWaitingToSend++;
168 queueEntry->queued = true;
171 struct msqid_ds &GetMessageQueue()
173 return fMessageQueue;
176 bool HasPermission() const
178 if ((fMessageQueue.msg_perm.mode & S_IWOTH) != 0)
179 return true;
181 uid_t uid = geteuid();
182 if (uid == 0 || (uid == fMessageQueue.msg_perm.uid
183 && (fMessageQueue.msg_perm.mode & S_IWUSR) != 0))
184 return true;
186 gid_t gid = getegid();
187 if (gid == fMessageQueue.msg_perm.gid
188 && (fMessageQueue.msg_perm.mode & S_IWGRP) != 0)
189 return true;
191 return false;
194 bool HasReadPermission() const
196 // TODO: fix this
197 return HasPermission();
200 int ID() const
202 return fID;
205 // Implemented after sXsiMessageCount is declared
206 bool Insert(queued_message *message);
208 key_t IpcKey() const
210 return fMessageQueue.msg_perm.key;
213 mutex &Lock()
215 return fLock;
218 msglen_t MaxBytes() const
220 return fMessageQueue.msg_qbytes;
223 // Implemented after sXsiMessageCount is declared
224 queued_message *Remove(long typeRequested);
226 uint32 SequenceNumber() const
228 return fSequenceNumber;
231 // Implemented after sMessageQueueHashTable is declared
232 void SetID();
234 void SetIpcKey(key_t key)
236 fMessageQueue.msg_perm.key = key;
239 void SetPermissions(int flags)
241 fMessageQueue.msg_perm.uid = fMessageQueue.msg_perm.cuid = geteuid();
242 fMessageQueue.msg_perm.gid = fMessageQueue.msg_perm.cgid = getegid();
243 fMessageQueue.msg_perm.mode = (flags & 0x01ff);
246 void WakeUpThread(bool waitForMessage)
248 if (waitForMessage) {
249 // Wake up all waiting thread for a message
250 // TODO: this can cause starvation for any
251 // very-unlucky-and-slow thread
252 while (queued_thread *entry = fWaitingToReceive.RemoveHead()) {
253 entry->queued = false;
254 fThreadsWaitingToReceive--;
255 thread_unblock(entry->thread, 0);
257 } else {
258 // Wake up only one thread waiting to send
259 if (queued_thread *entry = fWaitingToSend.RemoveHead()) {
260 entry->queued = false;
261 fThreadsWaitingToSend--;
262 thread_unblock(entry->thread, 0);
267 XsiMessageQueue*& Link()
269 return fLink;
272 private:
273 msglen_t fBytesInQueue;
274 int fID;
275 mutex fLock;
276 MessageQueue fMessage;
277 struct msqid_ds fMessageQueue;
278 uint32 fSequenceNumber;
279 uint32 fThreadsWaitingToReceive;
280 uint32 fThreadsWaitingToSend;
282 ThreadQueue fWaitingToReceive;
283 ThreadQueue fWaitingToSend;
285 XsiMessageQueue* fLink;
289 // Xsi message queue hash table
290 struct MessageQueueHashTableDefinition {
291 typedef int KeyType;
292 typedef XsiMessageQueue ValueType;
294 size_t HashKey (const int key) const
296 return (size_t)key;
299 size_t Hash(XsiMessageQueue *variable) const
301 return (size_t)variable->ID();
304 bool Compare(const int key, XsiMessageQueue *variable) const
306 return (int)key == (int)variable->ID();
309 XsiMessageQueue*& GetLink(XsiMessageQueue *variable) const
311 return variable->Link();
316 // IPC class
317 class Ipc {
318 public:
319 Ipc(key_t key)
320 : fKey(key),
321 fMessageQueueId(-1)
325 key_t Key() const
327 return fKey;
330 int MessageQueueID() const
332 return fMessageQueueId;
335 void SetMessageQueueID(XsiMessageQueue *messageQueue)
337 fMessageQueueId = messageQueue->ID();
340 Ipc*& Link()
342 return fLink;
345 private:
346 key_t fKey;
347 int fMessageQueueId;
348 Ipc* fLink;
352 struct IpcHashTableDefinition {
353 typedef key_t KeyType;
354 typedef Ipc ValueType;
356 size_t HashKey (const key_t key) const
358 return (size_t)(key);
361 size_t Hash(Ipc *variable) const
363 return (size_t)HashKey(variable->Key());
366 bool Compare(const key_t key, Ipc *variable) const
368 return (key_t)key == (key_t)variable->Key();
371 Ipc*& GetLink(Ipc *variable) const
373 return variable->Link();
377 // Arbitrary limits
378 #define MAX_XSI_MESSAGE 4096
379 #define MAX_XSI_MESSAGE_QUEUE 1024
380 static BOpenHashTable<IpcHashTableDefinition> sIpcHashTable;
381 static BOpenHashTable<MessageQueueHashTableDefinition> sMessageQueueHashTable;
383 static mutex sIpcLock;
384 static mutex sXsiMessageQueueLock;
386 static uint32 sGlobalSequenceNumber = 1;
387 static int32 sXsiMessageCount = 0;
388 static int32 sXsiMessageQueueCount = 0;
391 // #pragma mark -
394 XsiMessageQueue::~XsiMessageQueue()
396 mutex_destroy(&fLock);
398 // Wake up any threads still waiting
399 if (fThreadsWaitingToSend || fThreadsWaitingToReceive) {
400 while (queued_thread *entry = fWaitingToReceive.RemoveHead()) {
401 entry->queued = false;
402 thread_unblock(entry->thread, EIDRM);
404 while (queued_thread *entry = fWaitingToSend.RemoveHead()) {
405 entry->queued = false;
406 thread_unblock(entry->thread, EIDRM);
410 // Free up any remaining messages
411 if (fMessageQueue.msg_qnum) {
412 while (queued_message *message = fMessage.RemoveHead()) {
413 atomic_add(&sXsiMessageCount, -1);
414 delete message;
420 bool
421 XsiMessageQueue::Insert(queued_message *message)
423 // The only situation that would make us (potentially) wait
424 // is that we exceed with bytes or with the total number of messages
425 if (fBytesInQueue + message->length > fMessageQueue.msg_qbytes)
426 return true;
428 while (true) {
429 int32 oldCount = atomic_get(&sXsiMessageCount);
430 if (oldCount >= MAX_XSI_MESSAGE)
431 return true;
432 // If another thread updates the counter we keep
433 // iterating
434 if (atomic_test_and_set(&sXsiMessageCount, oldCount + 1, oldCount)
435 == oldCount)
436 break;
439 fMessage.Add(message);
440 fMessageQueue.msg_qnum++;
441 fMessageQueue.msg_lspid = getpid();
442 fMessageQueue.msg_stime = real_time_clock();
443 fBytesInQueue += message->length;
444 if (fThreadsWaitingToReceive)
445 WakeUpThread(true /* WaitForMessage */);
446 return false;
450 queued_message*
451 XsiMessageQueue::Remove(long typeRequested)
453 queued_message *message = NULL;
454 if (typeRequested < 0) {
455 // Return first message of the lowest type
456 // that is less than or equal to the absolute
457 // value of type requested.
458 MessageQueue::Iterator iterator = fMessage.GetIterator();
459 while (iterator.HasNext()) {
460 queued_message *current = iterator.Next();
461 if (current->type <= -typeRequested) {
462 message = iterator.Remove();
463 break;
466 } else if (typeRequested == 0) {
467 // Return the first message on the queue
468 message = fMessage.RemoveHead();
469 } else {
470 // Return the first message of type requested
471 MessageQueue::Iterator iterator = fMessage.GetIterator();
472 while (iterator.HasNext()) {
473 queued_message *current = iterator.Next();
474 if (current->type == typeRequested) {
475 message = iterator.Remove();
476 break;
481 if (message == NULL)
482 return NULL;
484 fMessageQueue.msg_qnum--;
485 fMessageQueue.msg_lrpid = getpid();
486 fMessageQueue.msg_rtime = real_time_clock();
487 fBytesInQueue -= message->length;
488 atomic_add(&sXsiMessageCount, -1);
489 if (fThreadsWaitingToSend)
490 WakeUpThread(false /* WaitForMessage */);
491 return message;
495 void
496 XsiMessageQueue::SetID()
498 fID = real_time_clock();
499 // The lock is held before calling us
500 while (true) {
501 if (sMessageQueueHashTable.Lookup(fID) == NULL)
502 break;
503 fID++;
505 sGlobalSequenceNumber = (sGlobalSequenceNumber + 1) % UINT_MAX;
506 fSequenceNumber = sGlobalSequenceNumber;
510 // #pragma mark - Kernel exported API
513 void
514 xsi_msg_init()
516 // Initialize hash tables
517 status_t status = sIpcHashTable.Init();
518 if (status != B_OK)
519 panic("xsi_msg_init() failed to initialize ipc hash table\n");
520 status = sMessageQueueHashTable.Init();
521 if (status != B_OK)
522 panic("xsi_msg_init() failed to initialize message queue hash table\n");
524 mutex_init(&sIpcLock, "global POSIX message queue IPC table");
525 mutex_init(&sXsiMessageQueueLock, "global POSIX xsi message queue table");
529 // #pragma mark - Syscalls
533 _user_xsi_msgctl(int messageQueueID, int command, struct msqid_ds *buffer)
535 TRACE(("xsi_msgctl: messageQueueID = %d, command = %d\n", messageQueueID, command));
536 MutexLocker ipcHashLocker(sIpcLock);
537 MutexLocker messageQueueHashLocker(sXsiMessageQueueLock);
538 XsiMessageQueue *messageQueue = sMessageQueueHashTable.Lookup(messageQueueID);
539 if (messageQueue == NULL) {
540 TRACE_ERROR(("xsi_msgctl: message queue id %d not valid\n", messageQueueID));
541 return EINVAL;
543 if (!IS_USER_ADDRESS(buffer)) {
544 TRACE_ERROR(("xsi_msgctl: buffer address is not valid\n"));
545 return B_BAD_ADDRESS;
548 // Lock the message queue itself and release both the ipc hash table lock
549 // and the message queue hash table lock _only_ if the command it's not
550 // IPC_RMID, this prevents undesidered situation from happening while
551 // (hopefully) improving the concurrency.
552 MutexLocker messageQueueLocker;
553 if (command != IPC_RMID) {
554 messageQueueLocker.SetTo(&messageQueue->Lock(), false);
555 messageQueueHashLocker.Unlock();
556 ipcHashLocker.Unlock();
557 } else
558 // Since we are going to delete the message queue object
559 // along with its mutex, we can't use a MutexLocker object,
560 // as the mutex itself won't exist on function exit
561 mutex_lock(&messageQueue->Lock());
563 switch (command) {
564 case IPC_STAT: {
565 if (!messageQueue->HasReadPermission()) {
566 TRACE_ERROR(("xsi_msgctl: calling process has not read "
567 "permission on message queue %d, key %d\n", messageQueueID,
568 (int)messageQueue->IpcKey()));
569 return EACCES;
571 struct msqid_ds msg = messageQueue->GetMessageQueue();
572 if (user_memcpy(buffer, &msg, sizeof(struct msqid_ds)) < B_OK) {
573 TRACE_ERROR(("xsi_msgctl: user_memcpy failed\n"));
574 return B_BAD_ADDRESS;
576 break;
579 case IPC_SET: {
580 if (!messageQueue->HasPermission()) {
581 TRACE_ERROR(("xsi_msgctl: calling process has not permission "
582 "on message queue %d, key %d\n", messageQueueID,
583 (int)messageQueue->IpcKey()));
584 return EPERM;
586 struct msqid_ds msg;
587 if (user_memcpy(&msg, buffer, sizeof(struct msqid_ds)) < B_OK) {
588 TRACE_ERROR(("xsi_msgctl: user_memcpy failed\n"));
589 return B_BAD_ADDRESS;
591 if (msg.msg_qbytes > messageQueue->MaxBytes() && getuid() != 0) {
592 TRACE_ERROR(("xsi_msgctl: user does not have permission to "
593 "increase the maximum number of bytes allowed on queue\n"));
594 return EPERM;
596 if (msg.msg_qbytes == 0) {
597 TRACE_ERROR(("xsi_msgctl: can't set msg_qbytes to 0!\n"));
598 return EINVAL;
601 messageQueue->DoIpcSet(&msg);
602 break;
605 case IPC_RMID: {
606 // If this was the command, we are still holding the message
607 // queue hash table lock along with the ipc one, but not the
608 // message queue lock itself. This prevents other process
609 // to try and acquire a destroyed mutex
610 if (!messageQueue->HasPermission()) {
611 TRACE_ERROR(("xsi_msgctl: calling process has not permission "
612 "on message queue %d, key %d\n", messageQueueID,
613 (int)messageQueue->IpcKey()));
614 return EPERM;
616 key_t key = messageQueue->IpcKey();
617 Ipc *ipcKey = NULL;
618 if (key != -1) {
619 ipcKey = sIpcHashTable.Lookup(key);
620 sIpcHashTable.Remove(ipcKey);
622 sMessageQueueHashTable.Remove(messageQueue);
623 // Wake up of any threads waiting on this
624 // queue happens in destructor
625 if (key != -1)
626 delete ipcKey;
627 atomic_add(&sXsiMessageQueueCount, -1);
629 delete messageQueue;
630 break;
633 default:
634 TRACE_ERROR(("xsi_semctl: command %d not valid\n", command));
635 return EINVAL;
638 return B_OK;
643 _user_xsi_msgget(key_t key, int flags)
645 TRACE(("xsi_msgget: key = %d, flags = %d\n", (int)key, flags));
646 XsiMessageQueue *messageQueue = NULL;
647 Ipc *ipcKey = NULL;
648 // Default assumptions
649 bool isPrivate = true;
650 bool create = true;
652 if (key != IPC_PRIVATE) {
653 isPrivate = false;
654 // Check if key already exist, if it does it already has a message
655 // queue associated with it
656 ipcKey = sIpcHashTable.Lookup(key);
657 if (ipcKey == NULL) {
658 if (!(flags & IPC_CREAT)) {
659 TRACE_ERROR(("xsi_msgget: key %d does not exist, but the "
660 "caller did not ask for creation\n", (int)key));
661 return ENOENT;
663 ipcKey = new(std::nothrow) Ipc(key);
664 if (ipcKey == NULL) {
665 TRACE_ERROR(("xsi_msgget: failed to create new Ipc object "
666 "for key %d\n", (int)key));
667 return ENOMEM;
669 sIpcHashTable.Insert(ipcKey);
670 } else {
671 // The IPC key exist and it already has a message queue
672 if ((flags & IPC_CREAT) && (flags & IPC_EXCL)) {
673 TRACE_ERROR(("xsi_msgget: key %d already exist\n", (int)key));
674 return EEXIST;
676 int messageQueueID = ipcKey->MessageQueueID();
678 MutexLocker _(sXsiMessageQueueLock);
679 messageQueue = sMessageQueueHashTable.Lookup(messageQueueID);
680 if (!messageQueue->HasPermission()) {
681 TRACE_ERROR(("xsi_msgget: calling process has not permission "
682 "on message queue %d, key %d\n", messageQueue->ID(),
683 (int)key));
684 return EACCES;
686 create = false;
690 if (create) {
691 // Create a new message queue for this key
692 if (atomic_get(&sXsiMessageQueueCount) >= MAX_XSI_MESSAGE_QUEUE) {
693 TRACE_ERROR(("xsi_msgget: reached limit of maximun number of "
694 "message queues\n"));
695 return ENOSPC;
698 messageQueue = new(std::nothrow) XsiMessageQueue(flags);
699 if (messageQueue == NULL) {
700 TRACE_ERROR(("xsi_msgget: failed to allocate new xsi "
701 "message queue\n"));
702 return ENOMEM;
704 atomic_add(&sXsiMessageQueueCount, 1);
706 MutexLocker _(sXsiMessageQueueLock);
707 messageQueue->SetID();
708 if (isPrivate)
709 messageQueue->SetIpcKey((key_t)-1);
710 else {
711 messageQueue->SetIpcKey(key);
712 ipcKey->SetMessageQueueID(messageQueue);
714 sMessageQueueHashTable.Insert(messageQueue);
717 return messageQueue->ID();
721 ssize_t
722 _user_xsi_msgrcv(int messageQueueID, void *messagePointer,
723 size_t messageSize, long messageType, int messageFlags)
725 TRACE(("xsi_msgrcv: messageQueueID = %d, messageSize = %ld\n",
726 messageQueueID, messageSize));
727 MutexLocker messageQueueHashLocker(sXsiMessageQueueLock);
728 XsiMessageQueue *messageQueue = sMessageQueueHashTable.Lookup(messageQueueID);
729 if (messageQueue == NULL) {
730 TRACE_ERROR(("xsi_msgrcv: message queue id %d not valid\n",
731 messageQueueID));
732 return EINVAL;
734 MutexLocker messageQueueLocker(messageQueue->Lock());
735 messageQueueHashLocker.Unlock();
737 if (messageSize > MAX_BYTES_PER_QUEUE) {
738 TRACE_ERROR(("xsi_msgrcv: message size is out of range\n"));
739 return EINVAL;
741 if (!messageQueue->HasPermission()) {
742 TRACE_ERROR(("xsi_msgrcv: calling process has not permission "
743 "on message queue id %d, key %d\n", messageQueueID,
744 (int)messageQueue->IpcKey()));
745 return EACCES;
747 if (!IS_USER_ADDRESS(messagePointer)) {
748 TRACE_ERROR(("xsi_msgrcv: message address is not valid\n"));
749 return B_BAD_ADDRESS;
752 queued_message *message = NULL;
753 while (true) {
754 message = messageQueue->Remove(messageType);
756 if (message == NULL && !(messageFlags & IPC_NOWAIT)) {
757 // We are going to sleep
758 Thread *thread = thread_get_current_thread();
759 queued_thread queueEntry(thread, messageSize);
760 messageQueue->Enqueue(&queueEntry, /* waitForMessage */ true);
762 uint32 sequenceNumber = messageQueue->SequenceNumber();
764 TRACE(("xsi_msgrcv: thread %d going to sleep\n", (int)thread->id));
765 status_t result
766 = messageQueue->BlockAndUnlock(thread, &messageQueueLocker);
767 TRACE(("xsi_msgrcv: thread %d back to life\n", (int)thread->id));
769 messageQueueHashLocker.Lock();
770 messageQueue = sMessageQueueHashTable.Lookup(messageQueueID);
771 if (result == EIDRM || messageQueue == NULL || (messageQueue != NULL
772 && sequenceNumber != messageQueue->SequenceNumber())) {
773 TRACE_ERROR(("xsi_msgrcv: message queue id %d (sequence = "
774 "%" B_PRIu32 ") got destroyed\n", messageQueueID,
775 sequenceNumber));
776 return EIDRM;
777 } else if (result == B_INTERRUPTED) {
778 TRACE_ERROR(("xsi_msgrcv: thread %d got interrupted while "
779 "waiting on message queue %d\n",(int)thread->id,
780 messageQueueID));
781 messageQueue->Deque(&queueEntry, /* waitForMessage */ true);
782 return EINTR;
783 } else {
784 messageQueueLocker.Lock();
785 messageQueueHashLocker.Unlock();
787 } else if (message == NULL) {
788 // There is not message of type requested and
789 // we can't wait
790 return ENOMSG;
791 } else {
792 // Message received correctly (so far)
793 if ((ssize_t)messageSize < message->length
794 && !(messageFlags & MSG_NOERROR)) {
795 TRACE_ERROR(("xsi_msgrcv: message too big!\n"));
796 // Put the message back inside. Since we hold the
797 // queue message lock, not one else could have filled
798 // up the queue meanwhile
799 messageQueue->Insert(message);
800 return E2BIG;
803 ssize_t result
804 = message->copy_to_user_buffer(messagePointer, messageSize);
805 if (result < 0) {
806 messageQueue->Insert(message);
807 return B_BAD_ADDRESS;
810 delete message;
811 TRACE(("xsi_msgrcv: message received correctly\n"));
812 return result;
816 return B_OK;
821 _user_xsi_msgsnd(int messageQueueID, const void *messagePointer,
822 size_t messageSize, int messageFlags)
824 TRACE(("xsi_msgsnd: messageQueueID = %d, messageSize = %ld\n",
825 messageQueueID, messageSize));
826 MutexLocker messageQueueHashLocker(sXsiMessageQueueLock);
827 XsiMessageQueue *messageQueue = sMessageQueueHashTable.Lookup(messageQueueID);
828 if (messageQueue == NULL) {
829 TRACE_ERROR(("xsi_msgsnd: message queue id %d not valid\n",
830 messageQueueID));
831 return EINVAL;
833 MutexLocker messageQueueLocker(messageQueue->Lock());
834 messageQueueHashLocker.Unlock();
836 if (messageSize > MAX_BYTES_PER_QUEUE) {
837 TRACE_ERROR(("xsi_msgsnd: message size is out of range\n"));
838 return EINVAL;
840 if (!messageQueue->HasPermission()) {
841 TRACE_ERROR(("xsi_msgsnd: calling process has not permission "
842 "on message queue id %d, key %d\n", messageQueueID,
843 (int)messageQueue->IpcKey()));
844 return EACCES;
846 if (!IS_USER_ADDRESS(messagePointer)) {
847 TRACE_ERROR(("xsi_msgsnd: message address is not valid\n"));
848 return B_BAD_ADDRESS;
851 queued_message *message
852 = new(std::nothrow) queued_message(messagePointer, messageSize);
853 if (message == NULL || message->initOK != true) {
854 TRACE_ERROR(("xsi_msgsnd: failed to create new message to queue\n"));
855 delete message;
856 return ENOMEM;
859 bool notSent = true;
860 status_t result = B_OK;
861 while (notSent) {
862 bool goToSleep = messageQueue->Insert(message);
864 if (goToSleep && !(messageFlags & IPC_NOWAIT)) {
865 // We are going to sleep
866 Thread *thread = thread_get_current_thread();
867 queued_thread queueEntry(thread, messageSize);
868 messageQueue->Enqueue(&queueEntry, /* waitForMessage */ false);
870 uint32 sequenceNumber = messageQueue->SequenceNumber();
872 TRACE(("xsi_msgsnd: thread %d going to sleep\n", (int)thread->id));
873 result = messageQueue->BlockAndUnlock(thread, &messageQueueLocker);
874 TRACE(("xsi_msgsnd: thread %d back to life\n", (int)thread->id));
876 messageQueueHashLocker.Lock();
877 messageQueue = sMessageQueueHashTable.Lookup(messageQueueID);
878 if (result == EIDRM || messageQueue == NULL || (messageQueue != NULL
879 && sequenceNumber != messageQueue->SequenceNumber())) {
880 TRACE_ERROR(("xsi_msgsnd: message queue id %d (sequence = "
881 "%" B_PRIu32 ") got destroyed\n", messageQueueID,
882 sequenceNumber));
883 delete message;
884 notSent = false;
885 result = EIDRM;
886 } else if (result == B_INTERRUPTED) {
887 TRACE_ERROR(("xsi_msgsnd: thread %d got interrupted while "
888 "waiting on message queue %d\n",(int)thread->id,
889 messageQueueID));
890 messageQueue->Deque(&queueEntry, /* waitForMessage */ false);
891 delete message;
892 notSent = false;
893 result = EINTR;
894 } else {
895 messageQueueLocker.Lock();
896 messageQueueHashLocker.Unlock();
898 } else if (goToSleep) {
899 // We did not send the message and we can't wait
900 delete message;
901 notSent = false;
902 result = EAGAIN;
903 } else {
904 // Message delivered correctly
905 TRACE(("xsi_msgsnd: message sent correctly\n"));
906 notSent = false;
910 return result;