2 * Copyright 2015, Haiku.
3 * Distributed under the terms of the MIT License.
6 * Joseph Groover <looncraz@looncraz.net>
10 #include "DelayedMessage.h"
19 #include <LinkSender.h>
20 #include <ServerProtocol.h>
23 // DelayedMessageSender constants
24 static const int32 kWakeupMessage
= AS_LAST_CODE
+ 2048;
25 static const int32 kExitMessage
= kWakeupMessage
+ 1;
27 static const char* kName
= "DMT is here for you, eventually...";
28 static int32 kPriority
= B_URGENT_DISPLAY_PRIORITY
;
29 static int32 kPortCapacity
= 10;
32 //! Data attachment structure.
34 Attachment(const void* data
, size_t size
);
37 const void* constData
;
43 typedef BObjectList
<Attachment
> AttachmentList
;
46 /*! \class ScheduledMessage
47 \brief Responsible for sending of delayed message.
49 class ScheduledMessage
{
51 ScheduledMessage(DelayedMessage
& message
);
54 int32
CountTargets() const;
57 bigtime_t
ScheduledTime() const;
60 bool Merge(DelayedMessage
& message
);
62 status_t
SendMessageToPort(port_id port
);
63 bool operator<(const ScheduledMessage
& other
) const;
65 DelayedMessageData
* fData
;
69 /*! \class DelayedMessageSender DelayedMessageSender.h
70 \brief Responsible for scheduling and sending of delayed messages
72 class DelayedMessageSender
{
74 explicit DelayedMessageSender();
75 ~DelayedMessageSender();
77 status_t
ScheduleMessage (DelayedMessage
& message
);
79 int32
CountDelayedMessages() const;
80 int64
CountSentMessages() const;
84 int32
_SendDelayedMessages();
85 static int32
_thread_func(void* sender
);
86 void _Wakeup(bigtime_t whatTime
);
89 typedef BObjectList
<ScheduledMessage
> ScheduledList
;
91 mutable BLocker fLock
;
92 ScheduledList fMessages
;
94 bigtime_t fScheduledWakeup
;
100 mutable int64 fSentCount
;
104 DelayedMessageSender gDelayedMessageSender
;
107 /*! \class DelayedMessageData DelayedMessageSender.h
108 \brief Owns DelayedMessage data, allocates memory and copies data only
111 class DelayedMessageData
{
112 typedef BObjectList
<port_id
> PortList
;
113 typedef void(*FailureCallback
)(int32 code
, port_id port
, void* data
);
115 DelayedMessageData(int32 code
, bigtime_t delay
,
116 bool isSpecificTime
);
117 ~DelayedMessageData();
119 bool AddTarget(port_id port
);
120 void RemoveTarget(port_id port
);
121 int32
CountTargets() const;
123 void MergeTargets(DelayedMessageData
* other
);
126 bool MergeData(DelayedMessageData
* other
);
128 bool IsValid() const;
129 // Only valid after a successful CopyData().
131 status_t
Attach(const void* data
, size_t size
);
133 bool Compare(Attachment
* one
, Attachment
* two
,
136 void SetMerge(DMMergeMode mode
, uint32 mask
);
137 void SendFailed(port_id port
);
139 void SetFailureCallback(FailureCallback callback
,
143 int32
& Code() {return fCode
;}
144 const int32
& Code() const {return fCode
;}
146 bigtime_t
& ScheduledTime() {return fScheduledTime
;}
147 const bigtime_t
& ScheduledTime() const {return fScheduledTime
;}
149 AttachmentList
& Attachments() {return fAttachments
;}
150 const AttachmentList
& Attachments() const {return fAttachments
;}
152 PortList
& Targets() {return fTargets
;}
153 const PortList
& Targets() const {return fTargets
;}
159 bigtime_t fScheduledTime
;
162 AttachmentList fAttachments
;
165 DMMergeMode fMergeMode
;
168 FailureCallback fFailureCallback
;
177 DelayedMessage::DelayedMessage(int32 code
, bigtime_t delay
,
180 fData(new(std::nothrow
) DelayedMessageData(code
, delay
< DM_MINIMUM_DELAY
181 ? DM_MINIMUM_DELAY
: delay
, isSpecificTime
)),
187 DelayedMessage::~DelayedMessage()
189 // Message is canceled without a handoff.
196 DelayedMessage::AddTarget(port_id port
)
198 if (fData
== NULL
|| fHandedOff
)
201 return fData
->AddTarget(port
);
206 DelayedMessage::SetMerge(DMMergeMode mode
, uint32 match
)
208 if (fData
== NULL
|| fHandedOff
)
211 fData
->SetMerge(mode
, match
);
216 DelayedMessage::SetFailureCallback(void (*callback
)(int32
, port_id
, void*),
219 if (fData
== NULL
|| fHandedOff
)
222 fData
->SetFailureCallback(callback
, data
);
226 //! Attach data to message. Memory is neither allocated nor copied until handoff.
228 DelayedMessage::Attach(const void* data
, size_t size
)
236 if (data
== NULL
|| size
== 0)
239 return fData
->Attach(data
, size
);
244 DelayedMessage::Flush()
252 if (fData
->CountTargets() == 0)
255 return gDelayedMessageSender
.ScheduleMessage(*this);
259 /*! The data handoff occurs upon scheduling, so data is copied only
260 when a message is actually scheduled. Canceled messages have low cost.
263 DelayedMessage::HandOff()
265 if (fData
== NULL
|| fHandedOff
)
268 if (fData
->CopyData()) {
280 Attachment::Attachment(const void* _data
, size_t _size
)
289 Attachment::~Attachment()
298 DelayedMessageData::DelayedMessageData(int32 code
, bigtime_t delay
,
// The conditional must be parenthesized: '+' binds tighter than '?:', so
// "delay + isSpecificTime ? 0 : system_time()" is evaluated as
// "(delay + isSpecificTime) ? 0 : system_time()" and never adds the delay.
// Intended value: 'delay' itself when it already is a specific (absolute)
// time, otherwise 'delay' relative to the current system_time().
302 fScheduledTime(delay
+ (isSpecificTime
? 0 : system_time())),
305 fAttachments(3, true),
308 fMergeMode(DM_NO_MERGE
),
309 fMergeMask(DM_DATA_DEFAULT
),
311 fFailureCallback(NULL
),
317 DelayedMessageData::~DelayedMessageData()
323 DelayedMessageData::AddTarget(port_id port
)
328 // check for duplicates:
329 for (int32 index
= 0; index
< fTargets
.CountItems(); ++index
) {
330 if (port
== *fTargets
.ItemAt(index
))
334 return fTargets
.AddItem(new(std::nothrow
) port_id(port
));
339 DelayedMessageData::RemoveTarget(port_id port
)
341 if (port
== B_BAD_PORT_ID
)
344 // Search for a match by value.
345 for (int32 index
= 0; index
< fTargets
.CountItems(); ++index
) {
346 port_id
* target
= fTargets
.ItemAt(index
);
347 if (port
== *target
) {
348 fTargets
.RemoveItem(target
, true);
356 DelayedMessageData::CountTargets() const
358 return fTargets
.CountItems();
363 DelayedMessageData::MergeTargets(DelayedMessageData
* other
)
365 // Failure to add one target does not abort the loop!
366 // It could just mean we already have the target.
367 for (int32 index
= 0; index
< other
->fTargets
.CountItems(); ++index
)
368 AddTarget(*(other
->fTargets
.ItemAt(index
)));
372 //! Copy data from original location - merging failed
374 DelayedMessageData::CopyData()
376 Attachment
* attached
= NULL
;
378 for (int32 index
= 0; index
< fAttachments
.CountItems(); ++index
) {
379 attached
= fAttachments
.ItemAt(index
);
381 if (attached
== NULL
|| attached
->data
!= NULL
)
384 attached
->data
= malloc(attached
->size
);
385 if (attached
->data
== NULL
)
388 memcpy(attached
->data
, attached
->constData
, attached
->size
);
397 DelayedMessageData::MergeData(DelayedMessageData
* other
)
401 || other
->fCode
!= fCode
402 || fMergeMode
== DM_NO_MERGE
403 || other
->fMergeMode
== DM_NO_MERGE
404 || other
->fMergeMode
!= fMergeMode
405 || other
->fAttachments
.CountItems() != fAttachments
.CountItems())
408 if (other
->fMergeMode
== DM_MERGE_CANCEL
) {
414 Attachment
* attached
= NULL
;
415 Attachment
* otherAttached
= NULL
;
417 for (int32 index
= 0; index
< fAttachments
.CountItems(); ++index
) {
418 attached
= fAttachments
.ItemAt(index
);
419 otherAttached
= other
->fAttachments
.ItemAt(index
);
422 || otherAttached
== NULL
423 || attached
->data
== NULL
424 || otherAttached
->constData
== NULL
425 || attached
->size
!= otherAttached
->size
)
428 // Compares depending upon mode & flags
429 if (!Compare(attached
, otherAttached
, index
))
433 // add any targets not included in the existing message!
436 // since these are duplicates, we need not copy anything...
437 if (fMergeMode
== DM_MERGE_DUPLICATES
)
442 // Import the new data!
443 for (int32 index
= 0; index
< fAttachments
.CountItems(); ++index
) {
444 attached
= fAttachments
.ItemAt(index
);
445 otherAttached
= other
->fAttachments
.ItemAt(index
);
447 // We already have allocated our memory, but the other data
448 // has not. So this reduces memory allocations.
449 memcpy(attached
->data
, otherAttached
->constData
, attached
->size
);
457 DelayedMessageData::IsValid() const
464 DelayedMessageData::Attach(const void* data
, size_t size
)
466 // Sanity checking already performed
467 Attachment
* attach
= new(std::nothrow
) Attachment(data
, size
);
472 if (fAttachments
.AddItem(attach
) == false) {
482 DelayedMessageData::Compare(Attachment
* one
, Attachment
* two
, int32 index
)
484 if (fMergeMode
== DM_MERGE_DUPLICATES
) {
486 // Default-policy: all data must match
487 if (fMergeMask
== DM_DATA_DEFAULT
|| (fMergeMask
& 1 << index
) != 0)
488 return memcmp(one
->data
, two
->constData
, one
->size
) == 0;
490 } else if (fMergeMode
== DM_MERGE_REPLACE
) {
492 // Default Policy: no data needs to match
493 if (fMergeMask
!= DM_DATA_DEFAULT
&& (fMergeMask
& 1 << index
) != 0)
494 return memcmp(one
->data
, two
->constData
, one
->size
) == 0;
502 DelayedMessageData::SetMerge(DMMergeMode mode
, uint32 mask
)
510 DelayedMessageData::SendFailed(port_id port
)
512 if (fFailureCallback
!= NULL
)
513 fFailureCallback(fCode
, port
, fFailureData
);
518 DelayedMessageData::SetFailureCallback(FailureCallback callback
, void* data
)
520 fFailureCallback
= callback
;
528 ScheduledMessage::ScheduledMessage(DelayedMessage
& message
)
530 fData(message
.HandOff())
535 ScheduledMessage::~ScheduledMessage()
542 ScheduledMessage::CountTargets() const
547 return fData
->CountTargets();
552 ScheduledMessage::ScheduledTime() const
557 return fData
->ScheduledTime();
561 //! Send our message and data to their intended target(s)
563 ScheduledMessage::SendMessage()
565 if (fData
== NULL
|| !fData
->IsValid())
569 for (int32 index
= 0; index
< fData
->Targets().CountItems(); ++index
) {
570 port_id port
= *(fData
->Targets().ItemAt(index
));
571 status_t error
= SendMessageToPort(port
);
578 if (error
!= B_TIMED_OUT
)
579 fData
->SendFailed(port
);
587 ScheduledMessage::SendMessageToPort(port_id port
)
589 if (fData
== NULL
|| !fData
->IsValid())
592 if (port
== B_BAD_PORT_ID
)
595 BPrivate::LinkSender
sender(port
);
596 if (sender
.StartMessage(fData
->Code()) != B_OK
)
599 AttachmentList
& list
= fData
->Attachments();
600 Attachment
* attached
= NULL
;
601 status_t error
= B_OK
;
603 // The data has been checked already, so we assume it is all good
604 for (int32 index
= 0; index
< list
.CountItems(); ++index
) {
605 attached
= list
.ItemAt(index
);
607 error
= sender
.Attach(attached
->data
, attached
->size
);
609 sender
.CancelMessage();
614 // We never want to hold up the sender thread for too long, so we
615 // use a 1 second send timeout, which should be more than enough for
616 // 99.992% of all cases. Approximately.
617 error
= sender
.Flush(1000000);
619 if (error
== B_OK
|| error
== B_BAD_PORT_ID
)
620 fData
->RemoveTarget(port
);
627 ScheduledMessage::IsValid() const
629 return fData
!= NULL
&& fData
->IsValid();
634 ScheduledMessage::Merge(DelayedMessage
& other
)
639 return fData
->MergeData(other
.Data());
644 ScheduledMessage::operator<(const ScheduledMessage
& other
) const
646 if (!IsValid() || !other
.IsValid())
649 return fData
->ScheduledTime() < other
.fData
->ScheduledTime();
654 CompareMessages(const ScheduledMessage
* one
, const ScheduledMessage
* two
)
663 DelayedMessageSender::DelayedMessageSender()
665 fLock("DelayedMessageSender"),
667 fScheduledWakeup(B_INFINITE_TIMEOUT
),
669 fThread(spawn_thread(&_thread_func
, kName
, kPriority
, this)),
670 fPort(create_port(kPortCapacity
, "DelayedMessageSender")),
673 resume_thread(fThread
);
677 DelayedMessageSender::~DelayedMessageSender()
679 // write the exit message to our port
680 write_port(fPort
, kExitMessage
, NULL
, 0);
682 status_t status
= B_OK
;
683 while (wait_for_thread(fThread
, &status
) == B_OK
);
685 // We now know the thread has exited, it is safe to cleanup
691 DelayedMessageSender::ScheduleMessage(DelayedMessage
& message
)
695 // Can we merge with a pending message?
696 ScheduledMessage
* pending
= NULL
;
697 for (int32 index
= 0; index
< fMessages
.CountItems(); ++index
) {
698 pending
= fMessages
.ItemAt(index
);
699 if (pending
->Merge(message
))
703 // Guess not, add it to our list!
704 ScheduledMessage
* scheduled
= new(std::nothrow
) ScheduledMessage(message
);
706 if (scheduled
== NULL
)
709 if (!scheduled
->IsValid()) {
714 if (fMessages
.AddItem(scheduled
)) {
715 fMessages
.SortItems(&CompareMessages
);
716 _Wakeup(scheduled
->ScheduledTime());
725 DelayedMessageSender::CountDelayedMessages() const
728 return fMessages
.CountItems();
733 DelayedMessageSender::CountSentMessages() const
735 return atomic_get64(&fSentCount
);
740 DelayedMessageSender::_MessageLoop()
743 status_t status
= B_TIMED_OUT
;
744 bigtime_t timeout
= B_INFINITE_TIMEOUT
;
747 timeout
= atomic_get64(&fScheduledWakeup
) - (system_time()
748 + (DM_MINIMUM_DELAY
/ 2));
750 if (timeout
> DM_MINIMUM_DELAY
/ 4) {
751 status
= read_port_etc(fPort
, &code
, NULL
, 0, B_RELATIVE_TIMEOUT
,
754 status
= B_TIMED_OUT
;
756 if (status
== B_INTERRUPTED
)
759 if (status
== B_TIMED_OUT
) {
760 _SendDelayedMessages();
764 if (status
== B_OK
) {
772 // TODO: trace unhandled messages
786 DelayedMessageSender::_thread_func(void* sender
)
788 (static_cast<DelayedMessageSender
*>(sender
))->_MessageLoop();
793 //! Sends pending messages; call ONLY from the sender thread!
795 DelayedMessageSender::_SendDelayedMessages()
797 // avoid sending messages during times of contention
798 if (fLock
.LockWithTimeout(30000) != B_OK
) {
799 atomic_add64(&fScheduledWakeup
, DM_MINIMUM_DELAY
);
803 atomic_set64(&fScheduledWakeup
, B_INFINITE_TIMEOUT
);
805 if (fMessages
.CountItems() == 0) {
812 bigtime_t time
= system_time() + DM_MINIMUM_DELAY
/ 2;
813 // capture any that may be on the verge of being sent.
815 BObjectList
<ScheduledMessage
> remove
;
817 ScheduledMessage
* message
= NULL
;
818 for (int32 index
= 0; index
< fMessages
.CountItems(); ++index
) {
819 message
= fMessages
.ItemAt(index
);
821 if (message
->ScheduledTime() > time
) {
822 atomic_set64(&fScheduledWakeup
, message
->ScheduledTime());
826 int32 sendCount
= message
->SendMessage();
830 if (message
->CountTargets() == 0)
831 remove
.AddItem(message
);
834 // remove serviced messages
835 for (int32 index
= 0; index
< remove
.CountItems(); ++index
)
836 fMessages
.RemoveItem(remove
.ItemAt(index
));
838 atomic_add64(&fSentCount
, sent
);
840 // catch any partly-failed messages (possibly late):
841 if (fMessages
.CountItems() > 0
842 && atomic_get64(&fScheduledWakeup
) == B_INFINITE_TIMEOUT
) {
844 fMessages
.SortItems(&CompareMessages
);
845 message
= fMessages
.ItemAt(0);
846 bigtime_t timeout
= message
->ScheduledTime() - time
;
849 timeout
= DM_MINIMUM_DELAY
;
851 atomic_set64(&fScheduledWakeup
, timeout
);
860 DelayedMessageSender::_Wakeup(bigtime_t when
)
862 if (atomic_get64(&fScheduledWakeup
) < when
863 && atomic_get(&fWakeupRetry
) == 0)
866 atomic_set64(&fScheduledWakeup
, when
);
868 BPrivate::LinkSender
sender(fPort
);
869 sender
.StartMessage(kWakeupMessage
);
870 status_t error
= sender
.Flush(30000);
871 atomic_set(&fWakeupRetry
, (int32
)error
== B_TIMED_OUT
);