libroot/posix/stdio: Remove unused portions.
[haiku.git] / src / system / kernel / posix / xsi_message_queue.cpp
blobb2807ab15690388ff786fd5d47a4935621e139c9
1 /*
2 * Copyright 2008-2011, Haiku, Inc. All rights reserved.
3 * Distributed under the terms of the MIT License.
5 * Authors:
6 * Salvatore Benedetto <salvatore.benedetto@gmail.com>
7 */
9 #include <posix/xsi_message_queue.h>
11 #include <new>
13 #include <sys/ipc.h>
14 #include <sys/types.h>
16 #include <OS.h>
18 #include <kernel.h>
19 #include <syscall_restart.h>
21 #include <util/atomic.h>
22 #include <util/AutoLock.h>
23 #include <util/DoublyLinkedList.h>
24 #include <util/OpenHashTable.h>
27 #define TRACE_XSI_MSG_QUEUE
28 #ifdef TRACE_XSI_MSG_QUEUE
29 # define TRACE(x) dprintf x
30 # define TRACE_ERROR(x) dprintf x
31 #else
32 # define TRACE(x) /* nothing */
33 # define TRACE_ERROR(x) dprintf x
34 #endif
37 namespace {
39 // Queue for holding blocked threads
40 struct queued_thread : DoublyLinkedListLinkImpl<queued_thread> {
41 queued_thread(Thread *_thread, int32 _message_length)
43 thread(_thread),
44 message_length(_message_length),
45 queued(false)
49 Thread *thread;
50 int32 message_length;
51 bool queued;
54 typedef DoublyLinkedList<queued_thread> ThreadQueue;
57 struct queued_message : DoublyLinkedListLinkImpl<queued_message> {
58 queued_message(const void *_message, ssize_t _length)
60 initOK(false),
61 length(_length)
63 message = (char *)malloc(sizeof(char) * _length);
64 if (message == NULL)
65 return;
67 if (user_memcpy(&type, _message, sizeof(long)) != B_OK
68 || user_memcpy(message, (void *)((char *)_message + sizeof(long)),
69 _length) != B_OK) {
70 free(message);
71 return;
73 initOK = true;
76 ~queued_message()
78 if (initOK)
79 free(message);
82 ssize_t copy_to_user_buffer(void *_message, ssize_t _length)
84 if (_length > length)
85 _length = length;
87 if (user_memcpy(_message, &type, sizeof(long)) != B_OK
88 || user_memcpy((void *)((char *)_message + sizeof(long)), message,
89 _length) != B_OK)
90 return B_ERROR;
91 return _length;
94 bool initOK;
95 ssize_t length;
96 char *message;
97 long type;
100 typedef DoublyLinkedList<queued_message> MessageQueue;
102 // Arbitrary limit
103 #define MAX_BYTES_PER_QUEUE 2048
105 class XsiMessageQueue {
106 public:
107 XsiMessageQueue(int flags)
109 fBytesInQueue(0),
110 fThreadsWaitingToReceive(0),
111 fThreadsWaitingToSend(0)
113 mutex_init(&fLock, "XsiMessageQueue private mutex");
114 SetIpcKey((key_t)-1);
115 SetPermissions(flags);
116 // Initialize all fields to zero
117 memset((void *)&fMessageQueue, 0, sizeof(struct msqid_ds));
118 fMessageQueue.msg_ctime = (time_t)real_time_clock();
119 fMessageQueue.msg_qbytes = MAX_BYTES_PER_QUEUE;
122 // Implemented after sXsiMessageCount is declared
123 ~XsiMessageQueue();
125 status_t BlockAndUnlock(Thread *thread, MutexLocker *queueLocker)
127 thread_prepare_to_block(thread, B_CAN_INTERRUPT,
128 THREAD_BLOCK_TYPE_OTHER, (void*)"xsi message queue");
129 // Unlock the queue before blocking
130 queueLocker->Unlock();
132 // TODO: We've got a serious race condition: If BlockAndUnlock() returned due to
133 // interruption, we will still be queued. A WakeUpThread() at this point will
134 // call thread_unblock() and might thus screw with our trying to re-lock the
135 // mutex.
136 return thread_block();
139 void DoIpcSet(struct msqid_ds *result)
141 fMessageQueue.msg_perm.uid = result->msg_perm.uid;
142 fMessageQueue.msg_perm.gid = result->msg_perm.gid;
143 fMessageQueue.msg_perm.mode = (fMessageQueue.msg_perm.mode & ~0x01ff)
144 | (result->msg_perm.mode & 0x01ff);
145 fMessageQueue.msg_qbytes = result->msg_qbytes;
146 fMessageQueue.msg_ctime = (time_t)real_time_clock();
149 void Deque(queued_thread *queueEntry, bool waitForMessage)
151 if (queueEntry->queued) {
152 if (waitForMessage) {
153 fWaitingToReceive.Remove(queueEntry);
154 fThreadsWaitingToReceive--;
155 } else {
156 fWaitingToSend.Remove(queueEntry);
157 fThreadsWaitingToSend--;
162 void Enqueue(queued_thread *queueEntry, bool waitForMessage)
164 if (waitForMessage) {
165 fWaitingToReceive.Add(queueEntry);
166 fThreadsWaitingToReceive++;
167 } else {
168 fWaitingToSend.Add(queueEntry);
169 fThreadsWaitingToSend++;
171 queueEntry->queued = true;
174 struct msqid_ds &GetMessageQueue()
176 return fMessageQueue;
179 bool HasPermission() const
181 if ((fMessageQueue.msg_perm.mode & S_IWOTH) != 0)
182 return true;
184 uid_t uid = geteuid();
185 if (uid == 0 || (uid == fMessageQueue.msg_perm.uid
186 && (fMessageQueue.msg_perm.mode & S_IWUSR) != 0))
187 return true;
189 gid_t gid = getegid();
190 if (gid == fMessageQueue.msg_perm.gid
191 && (fMessageQueue.msg_perm.mode & S_IWGRP) != 0)
192 return true;
194 return false;
197 bool HasReadPermission() const
199 // TODO: fix this
200 return HasPermission();
203 int ID() const
205 return fID;
208 // Implemented after sXsiMessageCount is declared
209 bool Insert(queued_message *message);
211 key_t IpcKey() const
213 return fMessageQueue.msg_perm.key;
216 mutex &Lock()
218 return fLock;
221 msglen_t MaxBytes() const
223 return fMessageQueue.msg_qbytes;
226 // Implemented after sXsiMessageCount is declared
227 queued_message *Remove(long typeRequested);
229 uint32 SequenceNumber() const
231 return fSequenceNumber;
234 // Implemented after sMessageQueueHashTable is declared
235 void SetID();
237 void SetIpcKey(key_t key)
239 fMessageQueue.msg_perm.key = key;
242 void SetPermissions(int flags)
244 fMessageQueue.msg_perm.uid = fMessageQueue.msg_perm.cuid = geteuid();
245 fMessageQueue.msg_perm.gid = fMessageQueue.msg_perm.cgid = getegid();
246 fMessageQueue.msg_perm.mode = (flags & 0x01ff);
249 void WakeUpThread(bool waitForMessage)
251 if (waitForMessage) {
252 // Wake up all waiting thread for a message
253 // TODO: this can cause starvation for any
254 // very-unlucky-and-slow thread
255 while (queued_thread *entry = fWaitingToReceive.RemoveHead()) {
256 entry->queued = false;
257 fThreadsWaitingToReceive--;
258 thread_unblock(entry->thread, 0);
260 } else {
261 // Wake up only one thread waiting to send
262 if (queued_thread *entry = fWaitingToSend.RemoveHead()) {
263 entry->queued = false;
264 fThreadsWaitingToSend--;
265 thread_unblock(entry->thread, 0);
270 XsiMessageQueue*& Link()
272 return fLink;
275 private:
276 msglen_t fBytesInQueue;
277 int fID;
278 mutex fLock;
279 MessageQueue fMessage;
280 struct msqid_ds fMessageQueue;
281 uint32 fSequenceNumber;
282 uint32 fThreadsWaitingToReceive;
283 uint32 fThreadsWaitingToSend;
285 ThreadQueue fWaitingToReceive;
286 ThreadQueue fWaitingToSend;
288 XsiMessageQueue* fLink;
292 // Xsi message queue hash table
293 struct MessageQueueHashTableDefinition {
294 typedef int KeyType;
295 typedef XsiMessageQueue ValueType;
297 size_t HashKey (const int key) const
299 return (size_t)key;
302 size_t Hash(XsiMessageQueue *variable) const
304 return (size_t)variable->ID();
307 bool Compare(const int key, XsiMessageQueue *variable) const
309 return (int)key == (int)variable->ID();
312 XsiMessageQueue*& GetLink(XsiMessageQueue *variable) const
314 return variable->Link();
319 // IPC class
320 class Ipc {
321 public:
322 Ipc(key_t key)
323 : fKey(key),
324 fMessageQueueId(-1)
328 key_t Key() const
330 return fKey;
333 int MessageQueueID() const
335 return fMessageQueueId;
338 void SetMessageQueueID(XsiMessageQueue *messageQueue)
340 fMessageQueueId = messageQueue->ID();
343 Ipc*& Link()
345 return fLink;
348 private:
349 key_t fKey;
350 int fMessageQueueId;
351 Ipc* fLink;
355 struct IpcHashTableDefinition {
356 typedef key_t KeyType;
357 typedef Ipc ValueType;
359 size_t HashKey (const key_t key) const
361 return (size_t)(key);
364 size_t Hash(Ipc *variable) const
366 return (size_t)HashKey(variable->Key());
369 bool Compare(const key_t key, Ipc *variable) const
371 return (key_t)key == (key_t)variable->Key();
374 Ipc*& GetLink(Ipc *variable) const
376 return variable->Link();
380 } // namespace
383 // Arbitrary limits
384 #define MAX_XSI_MESSAGE 4096
385 #define MAX_XSI_MESSAGE_QUEUE 1024
386 static BOpenHashTable<IpcHashTableDefinition> sIpcHashTable;
387 static BOpenHashTable<MessageQueueHashTableDefinition> sMessageQueueHashTable;
389 static mutex sIpcLock;
390 static mutex sXsiMessageQueueLock;
392 static uint32 sGlobalSequenceNumber = 1;
393 static int32 sXsiMessageCount = 0;
394 static int32 sXsiMessageQueueCount = 0;
397 // #pragma mark -
400 XsiMessageQueue::~XsiMessageQueue()
402 mutex_destroy(&fLock);
404 // Wake up any threads still waiting
405 if (fThreadsWaitingToSend || fThreadsWaitingToReceive) {
406 while (queued_thread *entry = fWaitingToReceive.RemoveHead()) {
407 entry->queued = false;
408 thread_unblock(entry->thread, EIDRM);
410 while (queued_thread *entry = fWaitingToSend.RemoveHead()) {
411 entry->queued = false;
412 thread_unblock(entry->thread, EIDRM);
416 // Free up any remaining messages
417 if (fMessageQueue.msg_qnum) {
418 while (queued_message *message = fMessage.RemoveHead()) {
419 atomic_add(&sXsiMessageCount, -1);
420 delete message;
426 bool
427 XsiMessageQueue::Insert(queued_message *message)
429 // The only situation that would make us (potentially) wait
430 // is that we exceed with bytes or with the total number of messages
431 if (fBytesInQueue + message->length > fMessageQueue.msg_qbytes)
432 return true;
434 while (true) {
435 int32 oldCount = atomic_get(&sXsiMessageCount);
436 if (oldCount >= MAX_XSI_MESSAGE)
437 return true;
438 // If another thread updates the counter we keep
439 // iterating
440 if (atomic_test_and_set(&sXsiMessageCount, oldCount + 1, oldCount)
441 == oldCount)
442 break;
445 fMessage.Add(message);
446 fMessageQueue.msg_qnum++;
447 fMessageQueue.msg_lspid = getpid();
448 fMessageQueue.msg_stime = real_time_clock();
449 fBytesInQueue += message->length;
450 if (fThreadsWaitingToReceive)
451 WakeUpThread(true /* WaitForMessage */);
452 return false;
456 queued_message*
457 XsiMessageQueue::Remove(long typeRequested)
459 queued_message *message = NULL;
460 if (typeRequested < 0) {
461 // Return first message of the lowest type
462 // that is less than or equal to the absolute
463 // value of type requested.
464 MessageQueue::Iterator iterator = fMessage.GetIterator();
465 while (iterator.HasNext()) {
466 queued_message *current = iterator.Next();
467 if (current->type <= -typeRequested) {
468 message = iterator.Remove();
469 break;
472 } else if (typeRequested == 0) {
473 // Return the first message on the queue
474 message = fMessage.RemoveHead();
475 } else {
476 // Return the first message of type requested
477 MessageQueue::Iterator iterator = fMessage.GetIterator();
478 while (iterator.HasNext()) {
479 queued_message *current = iterator.Next();
480 if (current->type == typeRequested) {
481 message = iterator.Remove();
482 break;
487 if (message == NULL)
488 return NULL;
490 fMessageQueue.msg_qnum--;
491 fMessageQueue.msg_lrpid = getpid();
492 fMessageQueue.msg_rtime = real_time_clock();
493 fBytesInQueue -= message->length;
494 atomic_add(&sXsiMessageCount, -1);
495 if (fThreadsWaitingToSend)
496 WakeUpThread(false /* WaitForMessage */);
497 return message;
501 void
502 XsiMessageQueue::SetID()
504 fID = real_time_clock();
505 // The lock is held before calling us
506 while (true) {
507 if (sMessageQueueHashTable.Lookup(fID) == NULL)
508 break;
509 fID++;
511 sGlobalSequenceNumber = (sGlobalSequenceNumber + 1) % UINT_MAX;
512 fSequenceNumber = sGlobalSequenceNumber;
516 // #pragma mark - Kernel exported API
519 void
520 xsi_msg_init()
522 // Initialize hash tables
523 status_t status = sIpcHashTable.Init();
524 if (status != B_OK)
525 panic("xsi_msg_init() failed to initialize ipc hash table\n");
526 status = sMessageQueueHashTable.Init();
527 if (status != B_OK)
528 panic("xsi_msg_init() failed to initialize message queue hash table\n");
530 mutex_init(&sIpcLock, "global POSIX message queue IPC table");
531 mutex_init(&sXsiMessageQueueLock, "global POSIX xsi message queue table");
535 // #pragma mark - Syscalls
539 _user_xsi_msgctl(int messageQueueID, int command, struct msqid_ds *buffer)
541 TRACE(("xsi_msgctl: messageQueueID = %d, command = %d\n", messageQueueID, command));
542 MutexLocker ipcHashLocker(sIpcLock);
543 MutexLocker messageQueueHashLocker(sXsiMessageQueueLock);
544 XsiMessageQueue *messageQueue = sMessageQueueHashTable.Lookup(messageQueueID);
545 if (messageQueue == NULL) {
546 TRACE_ERROR(("xsi_msgctl: message queue id %d not valid\n", messageQueueID));
547 return EINVAL;
549 if (!IS_USER_ADDRESS(buffer)) {
550 TRACE_ERROR(("xsi_msgctl: buffer address is not valid\n"));
551 return B_BAD_ADDRESS;
554 // Lock the message queue itself and release both the ipc hash table lock
555 // and the message queue hash table lock _only_ if the command it's not
556 // IPC_RMID, this prevents undesidered situation from happening while
557 // (hopefully) improving the concurrency.
558 MutexLocker messageQueueLocker;
559 if (command != IPC_RMID) {
560 messageQueueLocker.SetTo(&messageQueue->Lock(), false);
561 messageQueueHashLocker.Unlock();
562 ipcHashLocker.Unlock();
563 } else
564 // Since we are going to delete the message queue object
565 // along with its mutex, we can't use a MutexLocker object,
566 // as the mutex itself won't exist on function exit
567 mutex_lock(&messageQueue->Lock());
569 switch (command) {
570 case IPC_STAT: {
571 if (!messageQueue->HasReadPermission()) {
572 TRACE_ERROR(("xsi_msgctl: calling process has not read "
573 "permission on message queue %d, key %d\n", messageQueueID,
574 (int)messageQueue->IpcKey()));
575 return EACCES;
577 struct msqid_ds msg = messageQueue->GetMessageQueue();
578 if (user_memcpy(buffer, &msg, sizeof(struct msqid_ds)) < B_OK) {
579 TRACE_ERROR(("xsi_msgctl: user_memcpy failed\n"));
580 return B_BAD_ADDRESS;
582 break;
585 case IPC_SET: {
586 if (!messageQueue->HasPermission()) {
587 TRACE_ERROR(("xsi_msgctl: calling process has not permission "
588 "on message queue %d, key %d\n", messageQueueID,
589 (int)messageQueue->IpcKey()));
590 return EPERM;
592 struct msqid_ds msg;
593 if (user_memcpy(&msg, buffer, sizeof(struct msqid_ds)) < B_OK) {
594 TRACE_ERROR(("xsi_msgctl: user_memcpy failed\n"));
595 return B_BAD_ADDRESS;
597 if (msg.msg_qbytes > messageQueue->MaxBytes() && getuid() != 0) {
598 TRACE_ERROR(("xsi_msgctl: user does not have permission to "
599 "increase the maximum number of bytes allowed on queue\n"));
600 return EPERM;
602 if (msg.msg_qbytes == 0) {
603 TRACE_ERROR(("xsi_msgctl: can't set msg_qbytes to 0!\n"));
604 return EINVAL;
607 messageQueue->DoIpcSet(&msg);
608 break;
611 case IPC_RMID: {
612 // If this was the command, we are still holding the message
613 // queue hash table lock along with the ipc one, but not the
614 // message queue lock itself. This prevents other process
615 // to try and acquire a destroyed mutex
616 if (!messageQueue->HasPermission()) {
617 TRACE_ERROR(("xsi_msgctl: calling process has not permission "
618 "on message queue %d, key %d\n", messageQueueID,
619 (int)messageQueue->IpcKey()));
620 return EPERM;
622 key_t key = messageQueue->IpcKey();
623 Ipc *ipcKey = NULL;
624 if (key != -1) {
625 ipcKey = sIpcHashTable.Lookup(key);
626 sIpcHashTable.Remove(ipcKey);
628 sMessageQueueHashTable.Remove(messageQueue);
629 // Wake up of any threads waiting on this
630 // queue happens in destructor
631 if (key != -1)
632 delete ipcKey;
633 atomic_add(&sXsiMessageQueueCount, -1);
635 delete messageQueue;
636 break;
639 default:
640 TRACE_ERROR(("xsi_semctl: command %d not valid\n", command));
641 return EINVAL;
644 return B_OK;
649 _user_xsi_msgget(key_t key, int flags)
651 TRACE(("xsi_msgget: key = %d, flags = %d\n", (int)key, flags));
652 XsiMessageQueue *messageQueue = NULL;
653 Ipc *ipcKey = NULL;
654 // Default assumptions
655 bool isPrivate = true;
656 bool create = true;
658 if (key != IPC_PRIVATE) {
659 isPrivate = false;
660 // Check if key already exist, if it does it already has a message
661 // queue associated with it
662 ipcKey = sIpcHashTable.Lookup(key);
663 if (ipcKey == NULL) {
664 if (!(flags & IPC_CREAT)) {
665 TRACE_ERROR(("xsi_msgget: key %d does not exist, but the "
666 "caller did not ask for creation\n", (int)key));
667 return ENOENT;
669 ipcKey = new(std::nothrow) Ipc(key);
670 if (ipcKey == NULL) {
671 TRACE_ERROR(("xsi_msgget: failed to create new Ipc object "
672 "for key %d\n", (int)key));
673 return ENOMEM;
675 sIpcHashTable.Insert(ipcKey);
676 } else {
677 // The IPC key exist and it already has a message queue
678 if ((flags & IPC_CREAT) && (flags & IPC_EXCL)) {
679 TRACE_ERROR(("xsi_msgget: key %d already exist\n", (int)key));
680 return EEXIST;
682 int messageQueueID = ipcKey->MessageQueueID();
684 MutexLocker _(sXsiMessageQueueLock);
685 messageQueue = sMessageQueueHashTable.Lookup(messageQueueID);
686 if (!messageQueue->HasPermission()) {
687 TRACE_ERROR(("xsi_msgget: calling process has not permission "
688 "on message queue %d, key %d\n", messageQueue->ID(),
689 (int)key));
690 return EACCES;
692 create = false;
696 if (create) {
697 // Create a new message queue for this key
698 if (atomic_get(&sXsiMessageQueueCount) >= MAX_XSI_MESSAGE_QUEUE) {
699 TRACE_ERROR(("xsi_msgget: reached limit of maximun number of "
700 "message queues\n"));
701 return ENOSPC;
704 messageQueue = new(std::nothrow) XsiMessageQueue(flags);
705 if (messageQueue == NULL) {
706 TRACE_ERROR(("xsi_msgget: failed to allocate new xsi "
707 "message queue\n"));
708 return ENOMEM;
710 atomic_add(&sXsiMessageQueueCount, 1);
712 MutexLocker _(sXsiMessageQueueLock);
713 messageQueue->SetID();
714 if (isPrivate)
715 messageQueue->SetIpcKey((key_t)-1);
716 else {
717 messageQueue->SetIpcKey(key);
718 ipcKey->SetMessageQueueID(messageQueue);
720 sMessageQueueHashTable.Insert(messageQueue);
723 return messageQueue->ID();
727 ssize_t
728 _user_xsi_msgrcv(int messageQueueID, void *messagePointer,
729 size_t messageSize, long messageType, int messageFlags)
731 TRACE(("xsi_msgrcv: messageQueueID = %d, messageSize = %ld\n",
732 messageQueueID, messageSize));
733 MutexLocker messageQueueHashLocker(sXsiMessageQueueLock);
734 XsiMessageQueue *messageQueue = sMessageQueueHashTable.Lookup(messageQueueID);
735 if (messageQueue == NULL) {
736 TRACE_ERROR(("xsi_msgrcv: message queue id %d not valid\n",
737 messageQueueID));
738 return EINVAL;
740 MutexLocker messageQueueLocker(messageQueue->Lock());
741 messageQueueHashLocker.Unlock();
743 if (messageSize > MAX_BYTES_PER_QUEUE) {
744 TRACE_ERROR(("xsi_msgrcv: message size is out of range\n"));
745 return EINVAL;
747 if (!messageQueue->HasPermission()) {
748 TRACE_ERROR(("xsi_msgrcv: calling process has not permission "
749 "on message queue id %d, key %d\n", messageQueueID,
750 (int)messageQueue->IpcKey()));
751 return EACCES;
753 if (!IS_USER_ADDRESS(messagePointer)) {
754 TRACE_ERROR(("xsi_msgrcv: message address is not valid\n"));
755 return B_BAD_ADDRESS;
758 queued_message *message = NULL;
759 while (true) {
760 message = messageQueue->Remove(messageType);
762 if (message == NULL && !(messageFlags & IPC_NOWAIT)) {
763 // We are going to sleep
764 Thread *thread = thread_get_current_thread();
765 queued_thread queueEntry(thread, messageSize);
766 messageQueue->Enqueue(&queueEntry, /* waitForMessage */ true);
768 uint32 sequenceNumber = messageQueue->SequenceNumber();
770 TRACE(("xsi_msgrcv: thread %d going to sleep\n", (int)thread->id));
771 status_t result
772 = messageQueue->BlockAndUnlock(thread, &messageQueueLocker);
773 TRACE(("xsi_msgrcv: thread %d back to life\n", (int)thread->id));
775 messageQueueHashLocker.Lock();
776 messageQueue = sMessageQueueHashTable.Lookup(messageQueueID);
777 if (result == EIDRM || messageQueue == NULL || (messageQueue != NULL
778 && sequenceNumber != messageQueue->SequenceNumber())) {
779 TRACE_ERROR(("xsi_msgrcv: message queue id %d (sequence = "
780 "%" B_PRIu32 ") got destroyed\n", messageQueueID,
781 sequenceNumber));
782 return EIDRM;
783 } else if (result == B_INTERRUPTED) {
784 TRACE_ERROR(("xsi_msgrcv: thread %d got interrupted while "
785 "waiting on message queue %d\n",(int)thread->id,
786 messageQueueID));
787 messageQueue->Deque(&queueEntry, /* waitForMessage */ true);
788 return EINTR;
789 } else {
790 messageQueueLocker.Lock();
791 messageQueueHashLocker.Unlock();
793 } else if (message == NULL) {
794 // There is not message of type requested and
795 // we can't wait
796 return ENOMSG;
797 } else {
798 // Message received correctly (so far)
799 if ((ssize_t)messageSize < message->length
800 && !(messageFlags & MSG_NOERROR)) {
801 TRACE_ERROR(("xsi_msgrcv: message too big!\n"));
802 // Put the message back inside. Since we hold the
803 // queue message lock, not one else could have filled
804 // up the queue meanwhile
805 messageQueue->Insert(message);
806 return E2BIG;
809 ssize_t result
810 = message->copy_to_user_buffer(messagePointer, messageSize);
811 if (result < 0) {
812 messageQueue->Insert(message);
813 return B_BAD_ADDRESS;
816 delete message;
817 TRACE(("xsi_msgrcv: message received correctly\n"));
818 return result;
822 return B_OK;
827 _user_xsi_msgsnd(int messageQueueID, const void *messagePointer,
828 size_t messageSize, int messageFlags)
830 TRACE(("xsi_msgsnd: messageQueueID = %d, messageSize = %ld\n",
831 messageQueueID, messageSize));
832 MutexLocker messageQueueHashLocker(sXsiMessageQueueLock);
833 XsiMessageQueue *messageQueue = sMessageQueueHashTable.Lookup(messageQueueID);
834 if (messageQueue == NULL) {
835 TRACE_ERROR(("xsi_msgsnd: message queue id %d not valid\n",
836 messageQueueID));
837 return EINVAL;
839 MutexLocker messageQueueLocker(messageQueue->Lock());
840 messageQueueHashLocker.Unlock();
842 if (messageSize > MAX_BYTES_PER_QUEUE) {
843 TRACE_ERROR(("xsi_msgsnd: message size is out of range\n"));
844 return EINVAL;
846 if (!messageQueue->HasPermission()) {
847 TRACE_ERROR(("xsi_msgsnd: calling process has not permission "
848 "on message queue id %d, key %d\n", messageQueueID,
849 (int)messageQueue->IpcKey()));
850 return EACCES;
852 if (!IS_USER_ADDRESS(messagePointer)) {
853 TRACE_ERROR(("xsi_msgsnd: message address is not valid\n"));
854 return B_BAD_ADDRESS;
857 queued_message *message
858 = new(std::nothrow) queued_message(messagePointer, messageSize);
859 if (message == NULL || message->initOK != true) {
860 TRACE_ERROR(("xsi_msgsnd: failed to create new message to queue\n"));
861 delete message;
862 return ENOMEM;
865 bool notSent = true;
866 status_t result = B_OK;
867 while (notSent) {
868 bool goToSleep = messageQueue->Insert(message);
870 if (goToSleep && !(messageFlags & IPC_NOWAIT)) {
871 // We are going to sleep
872 Thread *thread = thread_get_current_thread();
873 queued_thread queueEntry(thread, messageSize);
874 messageQueue->Enqueue(&queueEntry, /* waitForMessage */ false);
876 uint32 sequenceNumber = messageQueue->SequenceNumber();
878 TRACE(("xsi_msgsnd: thread %d going to sleep\n", (int)thread->id));
879 result = messageQueue->BlockAndUnlock(thread, &messageQueueLocker);
880 TRACE(("xsi_msgsnd: thread %d back to life\n", (int)thread->id));
882 messageQueueHashLocker.Lock();
883 messageQueue = sMessageQueueHashTable.Lookup(messageQueueID);
884 if (result == EIDRM || messageQueue == NULL || (messageQueue != NULL
885 && sequenceNumber != messageQueue->SequenceNumber())) {
886 TRACE_ERROR(("xsi_msgsnd: message queue id %d (sequence = "
887 "%" B_PRIu32 ") got destroyed\n", messageQueueID,
888 sequenceNumber));
889 delete message;
890 notSent = false;
891 result = EIDRM;
892 } else if (result == B_INTERRUPTED) {
893 TRACE_ERROR(("xsi_msgsnd: thread %d got interrupted while "
894 "waiting on message queue %d\n",(int)thread->id,
895 messageQueueID));
896 messageQueue->Deque(&queueEntry, /* waitForMessage */ false);
897 delete message;
898 notSent = false;
899 result = EINTR;
900 } else {
901 messageQueueLocker.Lock();
902 messageQueueHashLocker.Unlock();
904 } else if (goToSleep) {
905 // We did not send the message and we can't wait
906 delete message;
907 notSent = false;
908 result = EAGAIN;
909 } else {
910 // Message delivered correctly
911 TRACE(("xsi_msgsnd: message sent correctly\n"));
912 notSent = false;
916 return result;