Make UEFI boot-platform build again
[haiku.git] / src / servers / app / DelayedMessage.cpp
blob634fda67bac510ea252e9affcafcb1b43cce3bb4
1 /*
2 * Copyright 2015, Haiku.
3 * Distributed under the terms of the MIT License.
5 * Authors:
6 * Joseph Groover <looncraz@looncraz.net>
7 */
10 #include "DelayedMessage.h"
12 #include <stdio.h>
13 #include <stdlib.h>
14 #include <string.h>
16 #include <Autolock.h>
17 #include <String.h>
19 #include <LinkSender.h>
20 #include <ServerProtocol.h>
23 // DelayedMessageSender constants
24 static const int32 kWakeupMessage = AS_LAST_CODE + 2048;
25 static const int32 kExitMessage = kWakeupMessage + 1;
27 static const char* kName = "DMT is here for you, eventually...";
28 static int32 kPriority = B_URGENT_DISPLAY_PRIORITY;
29 static int32 kPortCapacity = 10;
/*!	Data attachment structure.

	\c constData refers to the buffer supplied by the caller, \c data to a
	private malloc()ed copy of it. \c data is NULL until a copy has been made
	and is free()d by the destructor.
*/
struct Attachment {
								Attachment(const void* data, size_t size);
								~Attachment();

			const void*			constData;
			void*				data;
			size_t				size;

private:
	// The destructor free()s \c data, so a member-wise copy would lead to a
	// double free. Declared but intentionally not implemented.
								Attachment(const Attachment& other);
			Attachment&			operator=(const Attachment& other);
};
// List of Attachment pointers, used by DelayedMessageData to hold the data
// attached to a message.
typedef BObjectList<Attachment> AttachmentList;
46 /*! \class ScheduledMessage
47 \brief Responsible for sending of delayed message.
49 class ScheduledMessage {
50 public:
51 ScheduledMessage(DelayedMessage& message);
52 ~ScheduledMessage();
54 int32 CountTargets() const;
56 void Finalize();
57 bigtime_t ScheduledTime() const;
58 int32 SendMessage();
59 bool IsValid() const;
60 bool Merge(DelayedMessage& message);
62 status_t SendMessageToPort(port_id port);
63 bool operator<(const ScheduledMessage& other) const;
65 DelayedMessageData* fData;
69 /*! \class DelayedMessageSender DelayedMessageSender.h
70 \brief Responsible for scheduling and sending of delayed messages
72 class DelayedMessageSender {
73 public:
74 explicit DelayedMessageSender();
75 ~DelayedMessageSender();
77 status_t ScheduleMessage (DelayedMessage& message);
79 int32 CountDelayedMessages() const;
80 int64 CountSentMessages() const;
82 private:
83 void _MessageLoop();
84 int32 _SendDelayedMessages();
85 static int32 _thread_func(void* sender);
86 void _Wakeup(bigtime_t whatTime);
88 private:
89 typedef BObjectList<ScheduledMessage> ScheduledList;
91 mutable BLocker fLock;
92 ScheduledList fMessages;
94 bigtime_t fScheduledWakeup;
96 int32 fWakeupRetry;
97 thread_id fThread;
98 port_id fPort;
100 mutable int64 fSentCount;
// The sender instance used by this file; DelayedMessage::Flush() schedules
// its message through it.
DelayedMessageSender gDelayedMessageSender;
107 /*! \class DelayedMessageData DelayedMessageSender.h
108 \brief Owns DelayedMessage data, allocates memory and copies data only
109 when needed,
111 class DelayedMessageData {
112 typedef BObjectList<port_id> PortList;
113 typedef void(*FailureCallback)(int32 code, port_id port, void* data);
114 public:
115 DelayedMessageData(int32 code, bigtime_t delay,
116 bool isSpecificTime);
117 ~DelayedMessageData();
119 bool AddTarget(port_id port);
120 void RemoveTarget(port_id port);
121 int32 CountTargets() const;
123 void MergeTargets(DelayedMessageData* other);
125 bool CopyData();
126 bool MergeData(DelayedMessageData* other);
128 bool IsValid() const;
129 // Only valid after a successful CopyData().
131 status_t Attach(const void* data, size_t size);
133 bool Compare(Attachment* one, Attachment* two,
134 int32 index);
136 void SetMerge(DMMergeMode mode, uint32 mask);
137 void SendFailed(port_id port);
139 void SetFailureCallback(FailureCallback callback,
140 void* data);
142 // Accessors.
143 int32& Code() {return fCode;}
144 const int32& Code() const {return fCode;}
146 bigtime_t& ScheduledTime() {return fScheduledTime;}
147 const bigtime_t& ScheduledTime() const {return fScheduledTime;}
149 AttachmentList& Attachments() {return fAttachments;}
150 const AttachmentList& Attachments() const {return fAttachments;}
152 PortList& Targets() {return fTargets;}
153 const PortList& Targets() const {return fTargets;}
155 private:
156 // Data members.
158 int32 fCode;
159 bigtime_t fScheduledTime;
160 bool fValid;
162 AttachmentList fAttachments;
163 PortList fTargets;
165 DMMergeMode fMergeMode;
166 uint32 fMergeMask;
168 FailureCallback fFailureCallback;
169 void* fFailureData;
173 // #pragma mark -
177 DelayedMessage::DelayedMessage(int32 code, bigtime_t delay,
178 bool isSpecificTime)
180 fData(new(std::nothrow) DelayedMessageData(code, delay < DM_MINIMUM_DELAY
181 ? DM_MINIMUM_DELAY : delay, isSpecificTime)),
182 fHandedOff(false)
187 DelayedMessage::~DelayedMessage()
189 // Message is canceled without a handoff.
190 if (!fHandedOff)
191 delete fData;
195 bool
196 DelayedMessage::AddTarget(port_id port)
198 if (fData == NULL || fHandedOff)
199 return false;
201 return fData->AddTarget(port);
205 void
206 DelayedMessage::SetMerge(DMMergeMode mode, uint32 match)
208 if (fData == NULL || fHandedOff)
209 return;
211 fData->SetMerge(mode, match);
215 void
216 DelayedMessage::SetFailureCallback(void (*callback)(int32, port_id, void*),
217 void* data)
219 if (fData == NULL || fHandedOff)
220 return;
222 fData->SetFailureCallback(callback, data);
226 //! Attach data to message. Memory is not allocated nor copied until handoff.
227 status_t
228 DelayedMessage::Attach(const void* data, size_t size)
230 if (fData == NULL)
231 return B_NO_MEMORY;
233 if (fHandedOff)
234 return B_ERROR;
236 if (data == NULL || size == 0)
237 return B_BAD_VALUE;
239 return fData->Attach(data, size);
243 status_t
244 DelayedMessage::Flush()
246 if (fData == NULL)
247 return B_NO_MEMORY;
249 if (fHandedOff)
250 return B_ERROR;
252 if (fData->CountTargets() == 0)
253 return B_BAD_VALUE;
255 return gDelayedMessageSender.ScheduleMessage(*this);
259 /*! The data handoff occurs upon scheduling and reduces copies to only
260 when a message is actually scheduled. Canceled messages have low cost.
262 DelayedMessageData*
263 DelayedMessage::HandOff()
265 if (fData == NULL || fHandedOff)
266 return NULL;
268 if (fData->CopyData()) {
269 fHandedOff = true;
270 return fData;
273 return NULL;
277 // #pragma mark -
280 Attachment::Attachment(const void* _data, size_t _size)
282 constData(_data),
283 data(NULL),
284 size(_size)
289 Attachment::~Attachment()
291 free(data);
295 // #pragma mark -
298 DelayedMessageData::DelayedMessageData(int32 code, bigtime_t delay,
299 bool isSpecificTime)
301 fCode(code),
302 fScheduledTime(delay + isSpecificTime ? 0 : system_time()),
303 fValid(false),
305 fAttachments(3, true),
306 fTargets(4, true),
308 fMergeMode(DM_NO_MERGE),
309 fMergeMask(DM_DATA_DEFAULT),
311 fFailureCallback(NULL),
312 fFailureData(NULL)
// Nothing is released explicitly here; fAttachments and fTargets are
// destroyed as members. NOTE(review): both lists are constructed with a
// second argument of true, presumably making them own their items - confirm.
DelayedMessageData::~DelayedMessageData()
{
}
322 bool
323 DelayedMessageData::AddTarget(port_id port)
325 if (port <= 0)
326 return false;
328 // check for duplicates:
329 for (int32 index = 0; index < fTargets.CountItems(); ++index) {
330 if (port == *fTargets.ItemAt(index))
331 return false;
334 return fTargets.AddItem(new(std::nothrow) port_id(port));
338 void
339 DelayedMessageData::RemoveTarget(port_id port)
341 if (port == B_BAD_PORT_ID)
342 return;
344 // Search for a match by value.
345 for (int32 index = 0; index < fTargets.CountItems(); ++index) {
346 port_id* target = fTargets.ItemAt(index);
347 if (port == *target) {
348 fTargets.RemoveItem(target, true);
349 return;
355 int32
356 DelayedMessageData::CountTargets() const
358 return fTargets.CountItems();
362 void
363 DelayedMessageData::MergeTargets(DelayedMessageData* other)
365 // Failure to add one target does not abort the loop!
366 // It could just mean we already have the target.
367 for (int32 index = 0; index < other->fTargets.CountItems(); ++index)
368 AddTarget(*(other->fTargets.ItemAt(index)));
372 //! Copy data from original location - merging failed
373 bool
374 DelayedMessageData::CopyData()
376 Attachment* attached = NULL;
378 for (int32 index = 0; index < fAttachments.CountItems(); ++index) {
379 attached = fAttachments.ItemAt(index);
381 if (attached == NULL || attached->data != NULL)
382 return false;
384 attached->data = malloc(attached->size);
385 if (attached->data == NULL)
386 return false;
388 memcpy(attached->data, attached->constData, attached->size);
391 fValid = true;
392 return true;
396 bool
397 DelayedMessageData::MergeData(DelayedMessageData* other)
399 if (!fValid
400 || other == NULL
401 || other->fCode != fCode
402 || fMergeMode == DM_NO_MERGE
403 || other->fMergeMode == DM_NO_MERGE
404 || other->fMergeMode != fMergeMode
405 || other->fAttachments.CountItems() != fAttachments.CountItems())
406 return false;
408 if (other->fMergeMode == DM_MERGE_CANCEL) {
409 MergeTargets(other);
410 return true;
413 // Compare data
414 Attachment* attached = NULL;
415 Attachment* otherAttached = NULL;
417 for (int32 index = 0; index < fAttachments.CountItems(); ++index) {
418 attached = fAttachments.ItemAt(index);
419 otherAttached = other->fAttachments.ItemAt(index);
421 if (attached == NULL
422 || otherAttached == NULL
423 || attached->data == NULL
424 || otherAttached->constData == NULL
425 || attached->size != otherAttached->size)
426 return false;
428 // Compares depending upon mode & flags
429 if (!Compare(attached, otherAttached, index))
430 return false;
433 // add any targets not included in the existing message!
434 MergeTargets(other);
436 // since these are duplicates, we need not copy anything...
437 if (fMergeMode == DM_MERGE_DUPLICATES)
438 return true;
440 // DM_MERGE_REPLACE:
442 // Import the new data!
443 for (int32 index = 0; index < fAttachments.CountItems(); ++index) {
444 attached = fAttachments.ItemAt(index);
445 otherAttached = other->fAttachments.ItemAt(index);
447 // We already have allocated our memory, but the other data
448 // has not. So this reduces memory allocations.
449 memcpy(attached->data, otherAttached->constData, attached->size);
452 return true;
456 bool
457 DelayedMessageData::IsValid() const
459 return fValid;
463 status_t
464 DelayedMessageData::Attach(const void* data, size_t size)
466 // Sanity checking already performed
467 Attachment* attach = new(std::nothrow) Attachment(data, size);
469 if (attach == NULL)
470 return B_NO_MEMORY;
472 if (fAttachments.AddItem(attach) == false) {
473 delete attach;
474 return B_ERROR;
477 return B_OK;
481 bool
482 DelayedMessageData::Compare(Attachment* one, Attachment* two, int32 index)
484 if (fMergeMode == DM_MERGE_DUPLICATES) {
486 // Default-policy: all data must match
487 if (fMergeMask == DM_DATA_DEFAULT || (fMergeMask & 1 << index) != 0)
488 return memcmp(one->data, two->constData, one->size) == 0;
490 } else if (fMergeMode == DM_MERGE_REPLACE) {
492 // Default Policy: no data needs to match
493 if (fMergeMask != DM_DATA_DEFAULT && (fMergeMask & 1 << index) != 0)
494 return memcmp(one->data, two->constData, one->size) == 0;
497 return true;
501 void
502 DelayedMessageData::SetMerge(DMMergeMode mode, uint32 mask)
504 fMergeMode = mode;
505 fMergeMask = mask;
509 void
510 DelayedMessageData::SendFailed(port_id port)
512 if (fFailureCallback != NULL)
513 fFailureCallback(fCode, port, fFailureData);
517 void
518 DelayedMessageData::SetFailureCallback(FailureCallback callback, void* data)
520 fFailureCallback = callback;
521 fFailureData = data;
525 // #pragma mark -
528 ScheduledMessage::ScheduledMessage(DelayedMessage& message)
530 fData(message.HandOff())
535 ScheduledMessage::~ScheduledMessage()
537 delete fData;
541 int32
542 ScheduledMessage::CountTargets() const
544 if (fData == NULL)
545 return 0;
547 return fData->CountTargets();
551 bigtime_t
552 ScheduledMessage::ScheduledTime() const
554 if (fData == NULL)
555 return 0;
557 return fData->ScheduledTime();
561 //! Send our message and data to their intended target(s)
562 int32
563 ScheduledMessage::SendMessage()
565 if (fData == NULL || !fData->IsValid())
566 return 0;
568 int32 sent = 0;
569 for (int32 index = 0; index < fData->Targets().CountItems(); ++index) {
570 port_id port = *(fData->Targets().ItemAt(index));
571 status_t error = SendMessageToPort(port);
573 if (error == B_OK) {
574 ++sent;
575 continue;
578 if (error != B_TIMED_OUT)
579 fData->SendFailed(port);
582 return sent;
586 status_t
587 ScheduledMessage::SendMessageToPort(port_id port)
589 if (fData == NULL || !fData->IsValid())
590 return B_BAD_DATA;
592 if (port == B_BAD_PORT_ID)
593 return B_BAD_VALUE;
595 BPrivate::LinkSender sender(port);
596 if (sender.StartMessage(fData->Code()) != B_OK)
597 return B_ERROR;
599 AttachmentList& list = fData->Attachments();
600 Attachment* attached = NULL;
601 status_t error = B_OK;
603 // The data has been checked already, so we assume it is all good
604 for (int32 index = 0; index < list.CountItems(); ++index) {
605 attached = list.ItemAt(index);
607 error = sender.Attach(attached->data, attached->size);
608 if (error != B_OK) {
609 sender.CancelMessage();
610 return error;
614 // We do not want to ever hold up the sender thread for too long, we
615 // set a 1 second sending delay, which should be more than enough for
616 // 99.992% of all cases. Approximately.
617 error = sender.Flush(1000000);
619 if (error == B_OK || error == B_BAD_PORT_ID)
620 fData->RemoveTarget(port);
622 return error;
626 bool
627 ScheduledMessage::IsValid() const
629 return fData != NULL && fData->IsValid();
633 bool
634 ScheduledMessage::Merge(DelayedMessage& other)
636 if (!IsValid())
637 return false;
639 return fData->MergeData(other.Data());
643 bool
644 ScheduledMessage::operator<(const ScheduledMessage& other) const
646 if (!IsValid() || !other.IsValid())
647 return false;
649 return fData->ScheduledTime() < other.fData->ScheduledTime();
654 CompareMessages(const ScheduledMessage* one, const ScheduledMessage* two)
656 return *one < *two;
660 // #pragma mark -
663 DelayedMessageSender::DelayedMessageSender()
665 fLock("DelayedMessageSender"),
666 fMessages(20, true),
667 fScheduledWakeup(B_INFINITE_TIMEOUT),
668 fWakeupRetry(0),
669 fThread(spawn_thread(&_thread_func, kName, kPriority, this)),
670 fPort(create_port(kPortCapacity, "DelayedMessageSender")),
671 fSentCount(0)
673 resume_thread(fThread);
677 DelayedMessageSender::~DelayedMessageSender()
679 // write the exit message to our port
680 write_port(fPort, kExitMessage, NULL, 0);
682 status_t status = B_OK;
683 while (wait_for_thread(fThread, &status) == B_OK);
685 // We now know the thread has exited, it is safe to cleanup
686 delete_port(fPort);
690 status_t
691 DelayedMessageSender::ScheduleMessage(DelayedMessage& message)
693 BAutolock _(fLock);
695 // Can we merge with a pending message?
696 ScheduledMessage* pending = NULL;
697 for (int32 index = 0; index < fMessages.CountItems(); ++index) {
698 pending = fMessages.ItemAt(index);
699 if (pending->Merge(message))
700 return B_OK;
703 // Guess not, add it to our list!
704 ScheduledMessage* scheduled = new(std::nothrow) ScheduledMessage(message);
706 if (scheduled == NULL)
707 return B_NO_MEMORY;
709 if (!scheduled->IsValid()) {
710 delete scheduled;
711 return B_BAD_DATA;
714 if (fMessages.AddItem(scheduled)) {
715 fMessages.SortItems(&CompareMessages);
716 _Wakeup(scheduled->ScheduledTime());
717 return B_OK;
720 return B_ERROR;
724 int32
725 DelayedMessageSender::CountDelayedMessages() const
727 BAutolock _(fLock);
728 return fMessages.CountItems();
732 int64
733 DelayedMessageSender::CountSentMessages() const
735 return atomic_get64(&fSentCount);
739 void
740 DelayedMessageSender::_MessageLoop()
742 int32 code = -1;
743 status_t status = B_TIMED_OUT;
744 bigtime_t timeout = B_INFINITE_TIMEOUT;
746 while (true) {
747 timeout = atomic_get64(&fScheduledWakeup) - (system_time()
748 + (DM_MINIMUM_DELAY / 2));
750 if (timeout > DM_MINIMUM_DELAY / 4) {
751 status = read_port_etc(fPort, &code, NULL, 0, B_RELATIVE_TIMEOUT,
752 timeout);
753 } else
754 status = B_TIMED_OUT;
756 if (status == B_INTERRUPTED)
757 continue;
759 if (status == B_TIMED_OUT) {
760 _SendDelayedMessages();
761 continue;
764 if (status == B_OK) {
765 switch (code) {
766 case kWakeupMessage:
767 continue;
769 case kExitMessage:
770 return;
772 // TODO: trace unhandled messages
773 default:
774 continue;
778 // port deleted?
779 if (status < B_OK)
780 break;
785 int32
786 DelayedMessageSender::_thread_func(void* sender)
788 (static_cast<DelayedMessageSender*>(sender))->_MessageLoop();
789 return 0;
793 //! Sends pending messages, call ONLY from sender thread!
794 int32
795 DelayedMessageSender::_SendDelayedMessages()
797 // avoid sending messages during times of contention
798 if (fLock.LockWithTimeout(30000) != B_OK) {
799 atomic_add64(&fScheduledWakeup, DM_MINIMUM_DELAY);
800 return 0;
803 atomic_set64(&fScheduledWakeup, B_INFINITE_TIMEOUT);
805 if (fMessages.CountItems() == 0) {
806 fLock.Unlock();
807 return 0;
810 int32 sent = 0;
812 bigtime_t time = system_time() + DM_MINIMUM_DELAY / 2;
813 // capture any that may be on the verge of being sent.
815 BObjectList<ScheduledMessage> remove;
817 ScheduledMessage* message = NULL;
818 for (int32 index = 0; index < fMessages.CountItems(); ++index) {
819 message = fMessages.ItemAt(index);
821 if (message->ScheduledTime() > time) {
822 atomic_set64(&fScheduledWakeup, message->ScheduledTime());
823 break;
826 int32 sendCount = message->SendMessage();
827 if (sendCount > 0)
828 sent += sendCount;
830 if (message->CountTargets() == 0)
831 remove.AddItem(message);
834 // remove serviced messages
835 for (int32 index = 0; index < remove.CountItems(); ++index)
836 fMessages.RemoveItem(remove.ItemAt(index));
838 atomic_add64(&fSentCount, sent);
840 // catch any partly-failed messages (possibly late):
841 if (fMessages.CountItems() > 0
842 && atomic_get64(&fScheduledWakeup) == B_INFINITE_TIMEOUT) {
844 fMessages.SortItems(&CompareMessages);
845 message = fMessages.ItemAt(0);
846 bigtime_t timeout = message->ScheduledTime() - time;
848 if (timeout < 0)
849 timeout = DM_MINIMUM_DELAY;
851 atomic_set64(&fScheduledWakeup, timeout);
854 fLock.Unlock();
855 return sent;
859 void
860 DelayedMessageSender::_Wakeup(bigtime_t when)
862 if (atomic_get64(&fScheduledWakeup) < when
863 && atomic_get(&fWakeupRetry) == 0)
864 return;
866 atomic_set64(&fScheduledWakeup, when);
868 BPrivate::LinkSender sender(fPort);
869 sender.StartMessage(kWakeupMessage);
870 status_t error = sender.Flush(30000);
871 atomic_set(&fWakeupRetry, (int32)error == B_TIMED_OUT);