2 * Copyright 2005, Ingo Weinhold, bonefish@users.sf.net. All rights reserved.
3 * Distributed under the terms of the MIT License.
11 #include <AutoDeleter.h>
14 #include <MessagePrivate.h>
15 #include <MessengerPrivate.h>
17 #include <TokenSpace.h>
18 #include <util/DoublyLinkedList.h>
20 #include <messaging.h>
23 #include "MessageDeliverer.h"
24 #include "Referenceable.h"
30 // sDeliverer -- the singleton instance
31 MessageDeliverer
*MessageDeliverer::sDeliverer
= NULL
;
33 static const bigtime_t kRetryDelay
= 100000; // 100 ms
35 // per port sanity limits
36 static const int32 kMaxMessagesPerPort
= 10000;
37 static const int32 kMaxDataPerPort
= 50 * 1024 * 1024; // 50 MB
43 MessagingTargetSet::~MessagingTargetSet()
50 // DefaultMessagingTargetSet
53 DefaultMessagingTargetSet::DefaultMessagingTargetSet(
54 const messaging_target
*targets
, int32 targetCount
)
55 : MessagingTargetSet(),
57 fTargetCount(targetCount
),
63 DefaultMessagingTargetSet::~DefaultMessagingTargetSet()
69 DefaultMessagingTargetSet::HasNext() const
71 return (fNextIndex
< fTargetCount
);
76 DefaultMessagingTargetSet::Next(port_id
&port
, int32
&token
)
78 if (fNextIndex
>= fTargetCount
)
81 port
= fTargets
[fNextIndex
].port
;
82 token
= fTargets
[fNextIndex
].token
;
90 DefaultMessagingTargetSet::Rewind()
98 // SingleMessagingTargetSet
101 SingleMessagingTargetSet::SingleMessagingTargetSet(BMessenger target
)
102 : MessagingTargetSet(),
105 BMessenger::Private
messengerPrivate(target
);
106 fPort
= messengerPrivate
.Port();
107 fToken
= (messengerPrivate
.IsPreferredTarget()
108 ? B_PREFERRED_TOKEN
: messengerPrivate
.Token());
112 SingleMessagingTargetSet::SingleMessagingTargetSet(port_id port
, int32 token
)
113 : MessagingTargetSet(),
121 SingleMessagingTargetSet::~SingleMessagingTargetSet()
127 SingleMessagingTargetSet::HasNext() const
134 SingleMessagingTargetSet::Next(port_id
&port
, int32
&token
)
141 fAtBeginning
= false;
148 SingleMessagingTargetSet::Rewind()
157 /*! \brief Encapsulates a message to be delivered.
159 Besides the flattened message it also stores the when the message was
160 created and when the delivery attempts shall time out.
162 class MessageDeliverer::Message
: public BReferenceable
{
164 Message(void *data
, int32 dataSize
, bigtime_t timeout
)
168 fCreationTime(system_time()),
171 if (B_INFINITE_TIMEOUT
- fCreationTime
<= timeout
)
172 fTimeoutTime
= B_INFINITE_TIMEOUT
;
173 else if (timeout
<= 0)
174 fTimeoutTime
= fCreationTime
;
176 fTimeoutTime
= fCreationTime
+ timeout
;
189 int32
DataSize() const
194 bigtime_t
CreationTime() const
196 return fCreationTime
;
199 bigtime_t
TimeoutTime() const
204 bool HasTimeout() const
206 return (fTimeoutTime
< B_INFINITE_TIMEOUT
);
209 void SetBusy(bool busy
)
222 bigtime_t fCreationTime
;
223 bigtime_t fTimeoutTime
;
228 /*! \brief Encapsulates a Message to be sent to a specific handler.
230 A TargetMessage is always associated with (i.e. queued in) a TargetPort.
231 While a Message stores only the message data and some timing info, this
232 object adds the token of a the target BHandler.
234 A Message can be referred to by more than one TargetMessage (when
235 broadcasting), but a TargetMessage is referred to exactly once, by
238 class MessageDeliverer::TargetMessage
239 : public DoublyLinkedListLinkImpl
<MessageDeliverer::TargetMessage
> {
241 TargetMessage(Message
*message
, int32 token
)
246 fMessage
->AcquireReference();
252 fMessage
->ReleaseReference();
255 Message
*GetMessage() const
270 // TargetMessageHandle
271 /*! \brief A small wrapper for TargetMessage providing a complete order.
273 This class only exists to provide the comparison operators required to
274 put a TargetMessage into a set. The order implemented is by ascending by
275 timeout time (primary) and by TargetMessage pointer (secondary).
276 Hence TargetMessageHandles referring to the same TargetMessage are equal
279 class MessageDeliverer::TargetMessageHandle
{
281 TargetMessageHandle(TargetMessage
*message
)
286 TargetMessageHandle(const TargetMessageHandle
&other
)
287 : fMessage(other
.fMessage
)
291 TargetMessage
*GetMessage() const
296 TargetMessageHandle
&operator=(const TargetMessageHandle
&other
)
298 fMessage
= other
.fMessage
;
302 bool operator==(const TargetMessageHandle
&other
) const
304 return (fMessage
== other
.fMessage
);
307 bool operator!=(const TargetMessageHandle
&other
) const
309 return (fMessage
!= other
.fMessage
);
312 bool operator<(const TargetMessageHandle
&other
) const
314 bigtime_t timeout
= fMessage
->GetMessage()->TimeoutTime();
315 bigtime_t otherTimeout
= other
.fMessage
->GetMessage()->TimeoutTime();
316 if (timeout
< otherTimeout
)
318 if (timeout
> otherTimeout
)
320 return (fMessage
< other
.fMessage
);
324 TargetMessage
*fMessage
;
328 /*! \brief Represents a full target port, queuing the not yet delivered
331 A TargetPort internally queues TargetMessages in the order the are to be
332 delivered. Furthermore the object maintains an ordered set of
333 TargetMessages that can timeout (in ascending order of timeout time), so
334 that timed out messages can be dropped easily.
336 class MessageDeliverer::TargetPort
{
338 TargetPort(port_id portID
)
348 while (!fMessages
.IsEmpty())
352 port_id
PortID() const
357 status_t
PushMessage(Message
*message
, int32 token
)
359 PRINT("MessageDeliverer::TargetPort::PushMessage(port: %" B_PRId32
", %p, %"
360 B_PRId32
")\n", fPortID
, message
, token
);
361 // create a target message
362 TargetMessage
*targetMessage
363 = new(nothrow
) TargetMessage(message
, token
);
368 fMessages
.Insert(targetMessage
);
370 fMessageSize
+= targetMessage
->GetMessage()->DataSize();
372 // add it to the timeoutable messages, if it has a timeout
373 if (message
->HasTimeout())
374 fTimeoutableMessages
.insert(targetMessage
);
381 Message
*PeekMessage(int32
&token
) const
383 if (!fMessages
.Head())
386 token
= fMessages
.Head()->Token();
387 return fMessages
.Head()->GetMessage();
392 if (fMessages
.Head()) {
393 PRINT("MessageDeliverer::TargetPort::PopMessage(): port: %" B_PRId32
", %p\n",
394 fPortID
, fMessages
.Head()->GetMessage());
395 _RemoveMessage(fMessages
.Head());
399 void DropTimedOutMessages()
401 bigtime_t now
= system_time();
403 while (fTimeoutableMessages
.begin() != fTimeoutableMessages
.end()) {
404 TargetMessage
*message
= fTimeoutableMessages
.begin()->GetMessage();
405 if (message
->GetMessage()->TimeoutTime() > now
)
408 PRINT("MessageDeliverer::TargetPort::DropTimedOutMessages(): port: %" B_PRId32
409 ": message %p timed out\n", fPortID
, message
->GetMessage());
410 _RemoveMessage(message
);
416 return fMessages
.IsEmpty();
420 void _RemoveMessage(TargetMessage
*message
)
422 fMessages
.Remove(message
);
424 fMessageSize
-= message
->GetMessage()->DataSize();
426 if (message
->GetMessage()->HasTimeout())
427 fTimeoutableMessages
.erase(message
);
432 void _EnforceLimits()
435 while (fMessageCount
> kMaxMessagesPerPort
) {
436 PRINT("MessageDeliverer::TargetPort::_EnforceLimits(): port: %" B_PRId32
437 ": hit maximum message count limit.\n", fPortID
);
442 while (fMessageSize
> kMaxDataPerPort
) {
443 PRINT("MessageDeliverer::TargetPort::_EnforceLimits(): port: %" B_PRId32
444 ": hit maximum message size limit.\n", fPortID
);
449 typedef DoublyLinkedList
<TargetMessage
> MessageList
;
452 MessageList fMessages
;
455 set
<TargetMessageHandle
> fTimeoutableMessages
;
459 struct MessageDeliverer::TargetPortMap
: public map
<port_id
, TargetPort
*> {
465 /*! \class MessageDeliverer
466 \brief Service for delivering messages, which retries the delivery as long
467 as the target port is full.
469 For the user of the service only the MessageDeliverer::DeliverMessage()
470 will be of interest. Some of them allow broadcasting a message to several
473 The class maintains a TargetPort for each target port which was full at the
474 time a message was to be delivered to it. A TargetPort has a queue of
475 undelivered messages. A separate worker thread retries periodically to send
476 the yet undelivered messages to the respective target ports.
480 MessageDeliverer::MessageDeliverer()
481 : fLock("message deliverer"),
483 fDelivererThread(-1),
489 MessageDeliverer::~MessageDeliverer()
493 if (fDelivererThread
>= 0) {
495 wait_for_thread(fDelivererThread
, &result
);
503 MessageDeliverer::Init()
505 // create the target port map
506 fTargetPorts
= new(nothrow
) TargetPortMap
;
510 // spawn the deliverer thread
511 fDelivererThread
= spawn_thread(MessageDeliverer::_DelivererThreadEntry
,
512 "message deliverer", B_NORMAL_PRIORITY
+ 1, this);
513 if (fDelivererThread
< 0)
514 return fDelivererThread
;
516 // resume the deliverer thread
517 resume_thread(fDelivererThread
);
524 MessageDeliverer::CreateDefault()
529 // create the deliverer
530 MessageDeliverer
*deliverer
= new(nothrow
) MessageDeliverer
;
535 status_t error
= deliverer
->Init();
541 sDeliverer
= deliverer
;
547 MessageDeliverer::DeleteDefault()
557 MessageDeliverer::Default()
563 /*! \brief Delivers a message to the supplied target.
565 The method tries to send the message right now (if there are not already
566 messages pending for the target port). If that fails due to a full target
567 port, the message is queued for later delivery.
569 \param message The message to be delivered.
570 \param target A BMessenger identifying the delivery target.
571 \param timeout If given, the message will be dropped, when it couldn't be
572 delivered after this amount of microseconds.
574 - \c B_OK, if sending the message succeeded or if the target port was
575 full and the message has been queued,
576 - another error code otherwise.
579 MessageDeliverer::DeliverMessage(BMessage
*message
, BMessenger target
,
582 SingleMessagingTargetSet
set(target
);
583 return DeliverMessage(message
, set
, timeout
);
587 /*! \brief Delivers a message to the supplied targets.
589 The method tries to send the message right now to each of the given targets
590 (if there are not already messages pending for a target port). If that
591 fails due to a full target port, the message is queued for later delivery.
593 \param message The message to be delivered.
594 \param targets MessagingTargetSet providing the the delivery targets.
595 \param timeout If given, the message will be dropped, when it couldn't be
596 delivered after this amount of microseconds.
598 - \c B_OK, if for each of the given targets sending the message succeeded
599 or if the target port was full and the message has been queued,
600 - another error code otherwise.
603 MessageDeliverer::DeliverMessage(BMessage
*message
, MessagingTargetSet
&targets
,
609 // flatten the message
611 status_t error
= message
->Flatten(&mallocIO
, NULL
);
615 return DeliverMessage(mallocIO
.Buffer(), mallocIO
.BufferLength(), targets
,
620 /*! \brief Delivers a flattened message to the supplied targets.
622 The method tries to send the message right now to each of the given targets
623 (if there are not already messages pending for a target port). If that
624 fails due to a full target port, the message is queued for later delivery.
626 \param message The flattened message to be delivered. This may be a
627 flattened BMessage or KMessage.
628 \param messageSize The size of the flattened message buffer.
629 \param targets MessagingTargetSet providing the the delivery targets.
630 \param timeout If given, the message will be dropped, when it couldn't be
631 delivered after this amount of microseconds.
633 - \c B_OK, if for each of the given targets sending the message succeeded
634 or if the target port was full and the message has been queued,
635 - another error code otherwise.
638 MessageDeliverer::DeliverMessage(const void *messageData
, int32 messageSize
,
639 MessagingTargetSet
&targets
, bigtime_t timeout
)
641 if (!messageData
|| messageSize
<= 0)
645 void *data
= malloc(messageSize
);
648 memcpy(data
, messageData
, messageSize
);
651 Message
*message
= new(nothrow
) Message(data
, messageSize
, timeout
);
656 BReference
<Message
> _(message
, true);
658 // add the message to the respective target ports
659 BAutolock
locker(fLock
);
660 for (int32 targetIndex
= 0; targets
.HasNext(); targetIndex
++) {
663 targets
.Next(portID
, token
);
665 // get the target port
666 TargetPort
*port
= _GetTargetPort(portID
, true);
670 // try sending the message, if there are no queued messages yet
671 if (port
->IsEmpty()) {
672 status_t error
= _SendMessage(message
, portID
, token
);
673 // if the message was delivered OK, we're done with the target
675 _PutTargetPort(port
);
679 // if the port is not full, but an error occurred, we skip this target
680 if (error
!= B_WOULD_BLOCK
) {
681 _PutTargetPort(port
);
682 if (targetIndex
== 0 && !targets
.HasNext())
689 status_t error
= port
->PushMessage(message
, token
);
690 _PutTargetPort(port
);
699 MessageDeliverer::TargetPort
*
700 MessageDeliverer::_GetTargetPort(port_id portID
, bool create
)
702 // get the port from the map
703 TargetPortMap::iterator it
= fTargetPorts
->find(portID
);
704 if (it
!= fTargetPorts
->end())
711 TargetPort
*port
= new(nothrow
) TargetPort(portID
);
714 (*fTargetPorts
)[portID
] = port
;
721 MessageDeliverer::_PutTargetPort(TargetPort
*port
)
726 if (port
->IsEmpty()) {
727 fTargetPorts
->erase(port
->PortID());
734 MessageDeliverer::_SendMessage(Message
*message
, port_id portID
, int32 token
)
736 status_t error
= BMessage::Private::SendFlattenedMessage(message
->Data(),
737 message
->DataSize(), portID
, token
, 0);
738 //PRINT("MessageDeliverer::_SendMessage(%p, port: %ld, token: %ld): %lx\n",
739 //message, portID, token, error);
743 // _DelivererThreadEntry
745 MessageDeliverer::_DelivererThreadEntry(void *data
)
747 return ((MessageDeliverer
*)data
)->_DelivererThread();
752 MessageDeliverer::_DelivererThread()
754 while (!fTerminating
) {
759 // iterate through all target ports and try sending the messages
761 for (TargetPortMap::iterator it
= fTargetPorts
->begin();
762 it
!= fTargetPorts
->end();) {
763 TargetPort
*port
= it
->second
;
764 bool portError
= false;
766 port
->DropTimedOutMessages();
768 // try sending all messages
770 while (Message
*message
= port
->PeekMessage(token
)) {
771 status_t error
= B_OK
;
772 // if (message->TimeoutTime() > system_time()) {
773 error
= _SendMessage(message
, port
->PortID(), token
);
775 // // timeout, drop message
776 // PRINT("MessageDeliverer::_DelivererThread(): port %ld, "
777 // "message %p timed out\n", port->PortID(), message);
782 } else if (error
== B_WOULD_BLOCK
) {
783 // no luck yet -- port is still full
786 // unexpected error -- probably the port is gone
793 if (portError
|| port
->IsEmpty()) {
794 TargetPortMap::iterator oldIt
= it
;
797 fTargetPorts
->erase(oldIt
);