vfs: check userland buffers before reading them.
[haiku.git] / src / servers / registrar / MessageDeliverer.cpp
blob0db7e3b7d2b0903dcc5f4aa603fe66983e9e728f
1 /*
2 * Copyright 2005, Ingo Weinhold, bonefish@users.sf.net. All rights reserved.
3 * Distributed under the terms of the MIT License.
4 */
6 #include <map>
7 #include <new>
8 #include <set>
9 #include <string.h>
11 #include <AutoDeleter.h>
12 #include <Autolock.h>
13 #include <DataIO.h>
14 #include <MessagePrivate.h>
15 #include <MessengerPrivate.h>
16 #include <OS.h>
17 #include <TokenSpace.h>
18 #include <util/DoublyLinkedList.h>
20 #include <messaging.h>
22 #include "Debug.h"
23 #include "MessageDeliverer.h"
24 #include "Referenceable.h"
26 using std::map;
27 using std::nothrow;
28 using std::set;
30 // sDeliverer -- the singleton instance
31 MessageDeliverer *MessageDeliverer::sDeliverer = NULL;
33 static const bigtime_t kRetryDelay = 100000; // 100 ms
35 // per port sanity limits
36 static const int32 kMaxMessagesPerPort = 10000;
37 static const int32 kMaxDataPerPort = 50 * 1024 * 1024; // 50 MB
40 // MessagingTargetSet
42 // destructor
43 MessagingTargetSet::~MessagingTargetSet()
48 // #pragma mark -
50 // DefaultMessagingTargetSet
52 // constructor
53 DefaultMessagingTargetSet::DefaultMessagingTargetSet(
54 const messaging_target *targets, int32 targetCount)
55 : MessagingTargetSet(),
56 fTargets(targets),
57 fTargetCount(targetCount),
58 fNextIndex(0)
62 // destructor
63 DefaultMessagingTargetSet::~DefaultMessagingTargetSet()
67 // HasNext
68 bool
69 DefaultMessagingTargetSet::HasNext() const
71 return (fNextIndex < fTargetCount);
74 // Next
75 bool
76 DefaultMessagingTargetSet::Next(port_id &port, int32 &token)
78 if (fNextIndex >= fTargetCount)
79 return false;
81 port = fTargets[fNextIndex].port;
82 token = fTargets[fNextIndex].token;
83 fNextIndex++;
85 return true;
88 // Rewind
89 void
90 DefaultMessagingTargetSet::Rewind()
92 fNextIndex = 0;
96 // #pragma mark -
98 // SingleMessagingTargetSet
100 // constructor
101 SingleMessagingTargetSet::SingleMessagingTargetSet(BMessenger target)
102 : MessagingTargetSet(),
103 fAtBeginning(true)
105 BMessenger::Private messengerPrivate(target);
106 fPort = messengerPrivate.Port();
107 fToken = (messengerPrivate.IsPreferredTarget()
108 ? B_PREFERRED_TOKEN : messengerPrivate.Token());
111 // constructor
112 SingleMessagingTargetSet::SingleMessagingTargetSet(port_id port, int32 token)
113 : MessagingTargetSet(),
114 fPort(port),
115 fToken(token),
116 fAtBeginning(true)
120 // destructor
121 SingleMessagingTargetSet::~SingleMessagingTargetSet()
125 // HasNext
126 bool
127 SingleMessagingTargetSet::HasNext() const
129 return fAtBeginning;
132 // Next
133 bool
134 SingleMessagingTargetSet::Next(port_id &port, int32 &token)
136 if (!fAtBeginning)
137 return false;
139 port = fPort;
140 token = fToken;
141 fAtBeginning = false;
143 return true;
146 // Rewind
147 void
148 SingleMessagingTargetSet::Rewind()
150 fAtBeginning = true;
154 // #pragma mark -
156 // Message
157 /*! \brief Encapsulates a message to be delivered.
159 Besides the flattened message it also stores the when the message was
160 created and when the delivery attempts shall time out.
162 class MessageDeliverer::Message : public BReferenceable {
163 public:
164 Message(void *data, int32 dataSize, bigtime_t timeout)
165 : BReferenceable(),
166 fData(data),
167 fDataSize(dataSize),
168 fCreationTime(system_time()),
169 fBusy(false)
171 if (B_INFINITE_TIMEOUT - fCreationTime <= timeout)
172 fTimeoutTime = B_INFINITE_TIMEOUT;
173 else if (timeout <= 0)
174 fTimeoutTime = fCreationTime;
175 else
176 fTimeoutTime = fCreationTime + timeout;
179 ~Message()
181 free(fData);
184 void *Data() const
186 return fData;
189 int32 DataSize() const
191 return fDataSize;
194 bigtime_t CreationTime() const
196 return fCreationTime;
199 bigtime_t TimeoutTime() const
201 return fTimeoutTime;
204 bool HasTimeout() const
206 return (fTimeoutTime < B_INFINITE_TIMEOUT);
209 void SetBusy(bool busy)
211 fBusy = busy;
214 bool IsBusy() const
216 return fBusy;
219 private:
220 void *fData;
221 int32 fDataSize;
222 bigtime_t fCreationTime;
223 bigtime_t fTimeoutTime;
224 bool fBusy;
227 // TargetMessage
228 /*! \brief Encapsulates a Message to be sent to a specific handler.
230 A TargetMessage is always associated with (i.e. queued in) a TargetPort.
231 While a Message stores only the message data and some timing info, this
232 object adds the token of a the target BHandler.
234 A Message can be referred to by more than one TargetMessage (when
235 broadcasting), but a TargetMessage is referred to exactly once, by
236 the TargetPort.
238 class MessageDeliverer::TargetMessage
239 : public DoublyLinkedListLinkImpl<MessageDeliverer::TargetMessage> {
240 public:
241 TargetMessage(Message *message, int32 token)
242 : fMessage(message),
243 fToken(token)
245 if (fMessage)
246 fMessage->AcquireReference();
249 ~TargetMessage()
251 if (fMessage)
252 fMessage->ReleaseReference();
255 Message *GetMessage() const
257 return fMessage;
260 int32 Token() const
262 return fToken;
265 private:
266 Message *fMessage;
267 int32 fToken;
270 // TargetMessageHandle
271 /*! \brief A small wrapper for TargetMessage providing a complete order.
273 This class only exists to provide the comparison operators required to
274 put a TargetMessage into a set. The order implemented is by ascending by
275 timeout time (primary) and by TargetMessage pointer (secondary).
276 Hence TargetMessageHandles referring to the same TargetMessage are equal
277 (and only those).
279 class MessageDeliverer::TargetMessageHandle {
280 public:
281 TargetMessageHandle(TargetMessage *message)
282 : fMessage(message)
286 TargetMessageHandle(const TargetMessageHandle &other)
287 : fMessage(other.fMessage)
291 TargetMessage *GetMessage() const
293 return fMessage;
296 TargetMessageHandle &operator=(const TargetMessageHandle &other)
298 fMessage = other.fMessage;
299 return *this;
302 bool operator==(const TargetMessageHandle &other) const
304 return (fMessage == other.fMessage);
307 bool operator!=(const TargetMessageHandle &other) const
309 return (fMessage != other.fMessage);
312 bool operator<(const TargetMessageHandle &other) const
314 bigtime_t timeout = fMessage->GetMessage()->TimeoutTime();
315 bigtime_t otherTimeout = other.fMessage->GetMessage()->TimeoutTime();
316 if (timeout < otherTimeout)
317 return true;
318 if (timeout > otherTimeout)
319 return false;
320 return (fMessage < other.fMessage);
323 private:
324 TargetMessage *fMessage;
327 // TargetPort
328 /*! \brief Represents a full target port, queuing the not yet delivered
329 messages.
331 A TargetPort internally queues TargetMessages in the order the are to be
332 delivered. Furthermore the object maintains an ordered set of
333 TargetMessages that can timeout (in ascending order of timeout time), so
334 that timed out messages can be dropped easily.
336 class MessageDeliverer::TargetPort {
337 public:
338 TargetPort(port_id portID)
339 : fPortID(portID),
340 fMessages(),
341 fMessageCount(0),
342 fMessageSize(0)
346 ~TargetPort()
348 while (!fMessages.IsEmpty())
349 PopMessage();
352 port_id PortID() const
354 return fPortID;
357 status_t PushMessage(Message *message, int32 token)
359 PRINT("MessageDeliverer::TargetPort::PushMessage(port: %" B_PRId32 ", %p, %"
360 B_PRId32 ")\n", fPortID, message, token);
361 // create a target message
362 TargetMessage *targetMessage
363 = new(nothrow) TargetMessage(message, token);
364 if (!targetMessage)
365 return B_NO_MEMORY;
367 // push it
368 fMessages.Insert(targetMessage);
369 fMessageCount++;
370 fMessageSize += targetMessage->GetMessage()->DataSize();
372 // add it to the timeoutable messages, if it has a timeout
373 if (message->HasTimeout())
374 fTimeoutableMessages.insert(targetMessage);
376 _EnforceLimits();
378 return B_OK;
381 Message *PeekMessage(int32 &token) const
383 if (!fMessages.Head())
384 return NULL;
386 token = fMessages.Head()->Token();
387 return fMessages.Head()->GetMessage();
390 void PopMessage()
392 if (fMessages.Head()) {
393 PRINT("MessageDeliverer::TargetPort::PopMessage(): port: %" B_PRId32 ", %p\n",
394 fPortID, fMessages.Head()->GetMessage());
395 _RemoveMessage(fMessages.Head());
399 void DropTimedOutMessages()
401 bigtime_t now = system_time();
403 while (fTimeoutableMessages.begin() != fTimeoutableMessages.end()) {
404 TargetMessage *message = fTimeoutableMessages.begin()->GetMessage();
405 if (message->GetMessage()->TimeoutTime() > now)
406 break;
408 PRINT("MessageDeliverer::TargetPort::DropTimedOutMessages(): port: %" B_PRId32
409 ": message %p timed out\n", fPortID, message->GetMessage());
410 _RemoveMessage(message);
414 bool IsEmpty() const
416 return fMessages.IsEmpty();
419 private:
420 void _RemoveMessage(TargetMessage *message)
422 fMessages.Remove(message);
423 fMessageCount--;
424 fMessageSize -= message->GetMessage()->DataSize();
426 if (message->GetMessage()->HasTimeout())
427 fTimeoutableMessages.erase(message);
429 delete message;
432 void _EnforceLimits()
434 // message count
435 while (fMessageCount > kMaxMessagesPerPort) {
436 PRINT("MessageDeliverer::TargetPort::_EnforceLimits(): port: %" B_PRId32
437 ": hit maximum message count limit.\n", fPortID);
438 PopMessage();
441 // message size
442 while (fMessageSize > kMaxDataPerPort) {
443 PRINT("MessageDeliverer::TargetPort::_EnforceLimits(): port: %" B_PRId32
444 ": hit maximum message size limit.\n", fPortID);
445 PopMessage();
449 typedef DoublyLinkedList<TargetMessage> MessageList;
451 port_id fPortID;
452 MessageList fMessages;
453 int32 fMessageCount;
454 int32 fMessageSize;
455 set<TargetMessageHandle> fTimeoutableMessages;
458 // TargetPortMap
459 struct MessageDeliverer::TargetPortMap : public map<port_id, TargetPort*> {
463 // #pragma mark -
/*!	\class MessageDeliverer
	\brief Service for delivering messages, which retries the delivery as long
		as the target port is full.

	For the user of the service only the MessageDeliverer::DeliverMessage()
	methods will be of interest. Some of them allow broadcasting a message to
	several recipients.

	The class maintains a TargetPort for each target port which was full at the
	time a message was to be delivered to it. A TargetPort has a queue of
	undelivered messages. A separate worker thread periodically retries sending
	the yet undelivered messages to the respective target ports.
*/
479 // constructor
480 MessageDeliverer::MessageDeliverer()
481 : fLock("message deliverer"),
482 fTargetPorts(NULL),
483 fDelivererThread(-1),
484 fTerminating(false)
488 // destructor
489 MessageDeliverer::~MessageDeliverer()
491 fTerminating = true;
493 if (fDelivererThread >= 0) {
494 int32 result;
495 wait_for_thread(fDelivererThread, &result);
498 delete fTargetPorts;
501 // Init
502 status_t
503 MessageDeliverer::Init()
505 // create the target port map
506 fTargetPorts = new(nothrow) TargetPortMap;
507 if (!fTargetPorts)
508 return B_NO_MEMORY;
510 // spawn the deliverer thread
511 fDelivererThread = spawn_thread(MessageDeliverer::_DelivererThreadEntry,
512 "message deliverer", B_NORMAL_PRIORITY + 1, this);
513 if (fDelivererThread < 0)
514 return fDelivererThread;
516 // resume the deliverer thread
517 resume_thread(fDelivererThread);
519 return B_OK;
522 // CreateDefault
523 status_t
524 MessageDeliverer::CreateDefault()
526 if (sDeliverer)
527 return B_OK;
529 // create the deliverer
530 MessageDeliverer *deliverer = new(nothrow) MessageDeliverer;
531 if (!deliverer)
532 return B_NO_MEMORY;
534 // init it
535 status_t error = deliverer->Init();
536 if (error != B_OK) {
537 delete deliverer;
538 return error;
541 sDeliverer = deliverer;
542 return B_OK;
545 // DeleteDefault
546 void
547 MessageDeliverer::DeleteDefault()
549 if (sDeliverer) {
550 delete sDeliverer;
551 sDeliverer = NULL;
555 // Default
556 MessageDeliverer *
557 MessageDeliverer::Default()
559 return sDeliverer;
562 // DeliverMessage
563 /*! \brief Delivers a message to the supplied target.
565 The method tries to send the message right now (if there are not already
566 messages pending for the target port). If that fails due to a full target
567 port, the message is queued for later delivery.
569 \param message The message to be delivered.
570 \param target A BMessenger identifying the delivery target.
571 \param timeout If given, the message will be dropped, when it couldn't be
572 delivered after this amount of microseconds.
573 \return
574 - \c B_OK, if sending the message succeeded or if the target port was
575 full and the message has been queued,
576 - another error code otherwise.
578 status_t
579 MessageDeliverer::DeliverMessage(BMessage *message, BMessenger target,
580 bigtime_t timeout)
582 SingleMessagingTargetSet set(target);
583 return DeliverMessage(message, set, timeout);
586 // DeliverMessage
587 /*! \brief Delivers a message to the supplied targets.
589 The method tries to send the message right now to each of the given targets
590 (if there are not already messages pending for a target port). If that
591 fails due to a full target port, the message is queued for later delivery.
593 \param message The message to be delivered.
594 \param targets MessagingTargetSet providing the the delivery targets.
595 \param timeout If given, the message will be dropped, when it couldn't be
596 delivered after this amount of microseconds.
597 \return
598 - \c B_OK, if for each of the given targets sending the message succeeded
599 or if the target port was full and the message has been queued,
600 - another error code otherwise.
602 status_t
603 MessageDeliverer::DeliverMessage(BMessage *message, MessagingTargetSet &targets,
604 bigtime_t timeout)
606 if (!message)
607 return B_BAD_VALUE;
609 // flatten the message
610 BMallocIO mallocIO;
611 status_t error = message->Flatten(&mallocIO, NULL);
612 if (error < B_OK)
613 return error;
615 return DeliverMessage(mallocIO.Buffer(), mallocIO.BufferLength(), targets,
616 timeout);
619 // DeliverMessage
620 /*! \brief Delivers a flattened message to the supplied targets.
622 The method tries to send the message right now to each of the given targets
623 (if there are not already messages pending for a target port). If that
624 fails due to a full target port, the message is queued for later delivery.
626 \param message The flattened message to be delivered. This may be a
627 flattened BMessage or KMessage.
628 \param messageSize The size of the flattened message buffer.
629 \param targets MessagingTargetSet providing the the delivery targets.
630 \param timeout If given, the message will be dropped, when it couldn't be
631 delivered after this amount of microseconds.
632 \return
633 - \c B_OK, if for each of the given targets sending the message succeeded
634 or if the target port was full and the message has been queued,
635 - another error code otherwise.
637 status_t
638 MessageDeliverer::DeliverMessage(const void *messageData, int32 messageSize,
639 MessagingTargetSet &targets, bigtime_t timeout)
641 if (!messageData || messageSize <= 0)
642 return B_BAD_VALUE;
644 // clone the buffer
645 void *data = malloc(messageSize);
646 if (!data)
647 return B_NO_MEMORY;
648 memcpy(data, messageData, messageSize);
650 // create a Message
651 Message *message = new(nothrow) Message(data, messageSize, timeout);
652 if (!message) {
653 free(data);
654 return B_NO_MEMORY;
656 BReference<Message> _(message, true);
658 // add the message to the respective target ports
659 BAutolock locker(fLock);
660 for (int32 targetIndex = 0; targets.HasNext(); targetIndex++) {
661 port_id portID;
662 int32 token;
663 targets.Next(portID, token);
665 // get the target port
666 TargetPort *port = _GetTargetPort(portID, true);
667 if (!port)
668 return B_NO_MEMORY;
670 // try sending the message, if there are no queued messages yet
671 if (port->IsEmpty()) {
672 status_t error = _SendMessage(message, portID, token);
673 // if the message was delivered OK, we're done with the target
674 if (error == B_OK) {
675 _PutTargetPort(port);
676 continue;
679 // if the port is not full, but an error occurred, we skip this target
680 if (error != B_WOULD_BLOCK) {
681 _PutTargetPort(port);
682 if (targetIndex == 0 && !targets.HasNext())
683 return error;
684 continue;
688 // add the message
689 status_t error = port->PushMessage(message, token);
690 _PutTargetPort(port);
691 if (error != B_OK)
692 return error;
695 return B_OK;
698 // _GetTargetPort
699 MessageDeliverer::TargetPort *
700 MessageDeliverer::_GetTargetPort(port_id portID, bool create)
702 // get the port from the map
703 TargetPortMap::iterator it = fTargetPorts->find(portID);
704 if (it != fTargetPorts->end())
705 return it->second;
707 if (!create)
708 return NULL;
710 // create a port
711 TargetPort *port = new(nothrow) TargetPort(portID);
712 if (!port)
713 return NULL;
714 (*fTargetPorts)[portID] = port;
716 return port;
719 // _PutTargetPort
720 void
721 MessageDeliverer::_PutTargetPort(TargetPort *port)
723 if (!port)
724 return;
726 if (port->IsEmpty()) {
727 fTargetPorts->erase(port->PortID());
728 delete port;
732 // _SendMessage
733 status_t
734 MessageDeliverer::_SendMessage(Message *message, port_id portID, int32 token)
736 status_t error = BMessage::Private::SendFlattenedMessage(message->Data(),
737 message->DataSize(), portID, token, 0);
738 //PRINT("MessageDeliverer::_SendMessage(%p, port: %ld, token: %ld): %lx\n",
739 //message, portID, token, error);
740 return error;
743 // _DelivererThreadEntry
744 int32
745 MessageDeliverer::_DelivererThreadEntry(void *data)
747 return ((MessageDeliverer*)data)->_DelivererThread();
750 // _DelivererThread
751 int32
752 MessageDeliverer::_DelivererThread()
754 while (!fTerminating) {
755 snooze(kRetryDelay);
756 if (fTerminating)
757 break;
759 // iterate through all target ports and try sending the messages
760 BAutolock _(fLock);
761 for (TargetPortMap::iterator it = fTargetPorts->begin();
762 it != fTargetPorts->end();) {
763 TargetPort *port = it->second;
764 bool portError = false;
766 port->DropTimedOutMessages();
768 // try sending all messages
769 int32 token;
770 while (Message *message = port->PeekMessage(token)) {
771 status_t error = B_OK;
772 // if (message->TimeoutTime() > system_time()) {
773 error = _SendMessage(message, port->PortID(), token);
774 // } else {
775 // // timeout, drop message
776 // PRINT("MessageDeliverer::_DelivererThread(): port %ld, "
777 // "message %p timed out\n", port->PortID(), message);
778 // }
780 if (error == B_OK) {
781 port->PopMessage();
782 } else if (error == B_WOULD_BLOCK) {
783 // no luck yet -- port is still full
784 break;
785 } else {
786 // unexpected error -- probably the port is gone
787 portError = true;
788 break;
792 // next port
793 if (portError || port->IsEmpty()) {
794 TargetPortMap::iterator oldIt = it;
795 ++it;
796 delete port;
797 fTargetPorts->erase(oldIt);
798 } else
799 ++it;
803 return 0;