repository_infos: Enable automatic updates on the main Haiku repository.
[haiku.git] / src / apps / cortex / NodeManager / NodeGroup.cpp
blobf623a9517bea76ac49461cb94cf22e6e8b8d8fcd
1 /*
2 * Copyright (c) 1999-2000, Eric Moon.
3 * All rights reserved.
5 * Redistribution and use in source and binary forms, with or without
6 * modification, are permitted provided that the following conditions
7 * are met:
9 * 1. Redistributions of source code must retain the above copyright
10 * notice, this list of conditions, and the following disclaimer.
12 * 2. Redistributions in binary form must reproduce the above copyright
13 * notice, this list of conditions, and the following disclaimer in the
14 * documentation and/or other materials provided with the distribution.
16 * 3. The name of the author may not be used to endorse or promote products
17 * derived from this software without specific prior written permission.
19 * THIS SOFTWARE IS PROVIDED BY THE AUTHOR "AS IS" AND ANY EXPRESS OR
20 * IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES
21 * OF TITLE, NON-INFRINGEMENT, MERCHANTABILITY AND FITNESS FOR A PARTICULAR
22 * PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR ANY
23 * DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES
24 * (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
25 * LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
26 * AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR
27 * TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
28 * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
32 // NodeGroup.cpp
34 #include "NodeGroup.h"
35 //#include "NodeGroup_transport_thread.h"
37 #include "NodeManager.h"
38 #include "NodeRef.h"
40 #include <MediaRoster.h>
41 #include <OS.h>
42 #include <TimeSource.h>
44 #include <algorithm>
45 #include <functional>
47 #include "array_delete.h"
48 #include "BasicThread.h"
49 #include "node_manager_impl.h"
50 #include "functional_tools.h"
52 using namespace std;
54 __USE_CORTEX_NAMESPACE
55 #define D_METHOD(x) //PRINT (x)
56 #define D_ROSTER(x) //PRINT (x)
57 #define D_LOCK(x) //PRINT (x)
61 // -------------------------------------------------------- //
62 // *** ctor/dtor
63 // -------------------------------------------------------- //
65 // free the group, including all nodes within it
66 // (this call will result in the eventual deletion of the object.)
67 // returns B_OK on success; B_NOT_ALLOWED if release() has
68 // already been called; other error codes if the Media Roster
69 // call fails.
70 // * THE MANAGER MUST BE LOCKED
72 status_t NodeGroup::release() {
74 D_METHOD((
75 "NodeGroup::release()\n"));
77 if(isReleased())
78 return B_NOT_ALLOWED;
80 // clean up
81 lock();
83 // halt all nodes
84 _stop();
86 // remove & release all nodes
87 // +++++ causes triply-nested lock: eww!
88 while(m_nodes.size()) {
89 NodeRef* last = m_nodes.back();
90 removeNode(m_nodes.size()-1);
91 last->release();
94 unlock();
96 // [e.moon 7nov99]
97 // removing the released group is now NodeManager's responsibility
99 // remove from NodeManager
100 if(!m_manager->lock()) {
101 ASSERT(!"* m_manager->lock() failed.\n");
103 m_manager->_removeGroup(this);
104 m_manager->unlock();
106 // hand off to IObservable
107 return _inherited::release();
110 // call release() rather than deleting NodeGroup objects
111 NodeGroup::~NodeGroup() {
113 Autolock _l(this);
114 D_METHOD((
115 "~NodeGroup()\n"));
117 ASSERT(!m_nodes.size());
119 if(m_timeSourceObj) {
120 m_timeSourceObj->Release();
121 m_timeSourceObj = 0;
126 // -------------------------------------------------------- //
127 // *** accessors
128 // -------------------------------------------------------- //
130 // [e.moon 13oct99] moved to header
131 //inline uint32 NodeGroup::id() const { return m_id; }
133 // -------------------------------------------------------- //
134 // *** operations
135 // -------------------------------------------------------- //
137 // name access
138 const char* NodeGroup::name() const {
139 Autolock _l(this);
140 return m_name.String();
143 status_t NodeGroup::setName(const char* name) {
144 Autolock _l(this);
145 m_name = name;
146 return B_OK;
149 // content access
150 uint32 NodeGroup::countNodes() const {
151 Autolock _l(this);
152 return m_nodes.size();
155 NodeRef* NodeGroup::nodeAt(
156 uint32 index) const {
157 Autolock _l(this);
158 return (index < m_nodes.size()) ?
159 m_nodes[index] :
163 // add/remove nodes:
164 // - you may only add a node with no current group.
165 // - nodes added during playback will be started;
166 // nodes removed during playback will be stopped (unless
167 // the NO_START_STOP transport restriction flag is set
168 // for a given node.)
170 status_t NodeGroup::addNode(
171 NodeRef* node) {
173 D_METHOD((
174 "NodeGroup::addNode()\n"));
176 // lock the manager first; if the node has no current group,
177 // this locks the node.
178 m_manager->lock();
180 Autolock _l(this);
182 // precondition: GROUP_LOCKED not set
183 if(m_flags & GROUP_LOCKED)
184 return B_NOT_ALLOWED;
186 // precondition: no current group
187 if(node->m_group) {
188 // [e.moon 28sep99] whoops, forgot one
189 PRINT((
190 "!!! node already in group '%s'\n", node->m_group->name()));
192 m_manager->unlock();
193 return B_NOT_ALLOWED;
196 // add it
197 m_nodes.push_back(node);
198 node->_setGroup(this);
200 // release the manager
201 m_manager->unlock();
203 // first node? the transport is now ready to start
204 if(m_nodes.size() == 1) {
205 _changeState(TRANSPORT_INVALID, TRANSPORT_STOPPED);
208 // if(m_syncNode == media_node::null) {
209 // // assign as sync node
210 // setSyncNode(node->node());
211 // }
213 // initialize the new node
214 status_t err = node->_initTransportState();
215 if(err < B_OK)
216 return err;
218 // set time source
219 node->_setTimeSource(m_timeSource.node);
221 // set run mode
222 node->_setRunMode(m_runMode);
224 // add to cycle set if need be
225 // +++++ should I call _cycleAddRef() instead?
226 if(node->m_cycle)
227 _refCycleChanged(node);
229 if(m_transportState == TRANSPORT_RUNNING) {
230 // +++++ start if necessary!
232 // +++++ not started if TRANSPORT_ROLLING: is that proper? [e.moon 11oct99]
234 // send notification
235 if(!LockLooper()) {
236 ASSERT(!"LockLooper() failed.");
238 BMessage m(M_NODE_ADDED);
239 m.AddInt32("groupID", id());
240 m.AddInt32("nodeID", node->id());
241 notify(&m);
242 UnlockLooper();
244 // success
245 return B_OK;
249 status_t NodeGroup::removeNode(
250 NodeRef* node) {
252 D_METHOD((
253 "NodeGroup::removeNode()\n"));
255 // lock the manager first; once the node is ungrouped,
256 // the manager lock applies to it
257 m_manager->lock();
259 Autolock _l(this);
261 // precondition: this must be the node's group
262 if(node->m_group != this) {
263 // [e.moon 28sep99] whoops, forgot one
264 PRINT((
265 "!!! node not in group '%s'\n", node->m_group->name()));
267 m_manager->unlock();
268 return B_NOT_ALLOWED;
271 // remove from the cycle set
272 if(node->m_cycle)
273 _cycleRemoveRef(node);
275 // remove it
276 ASSERT(m_nodes.size());
277 remove(
278 m_nodes.begin(),
279 m_nodes.end(),
280 node);
282 // should have removed one and only one entry
283 m_nodes.resize(m_nodes.size()-1);
285 // // 6aug99: the timesource is now the sync node...
286 // // is this the sync node? reassign if so
288 // if(node->node() == m_syncNode) {
290 // // look for another sync-capable node
291 // bool found = false;
292 // for(int n = 0; !found && n < m_nodes.size(); ++n)
293 // if(setSyncNode(m_nodes[n]->node()) == B_OK)
294 // found = true;
296 // // no luck? admit defeat:
297 // if(!found) {
298 // PRINT((
299 // "* NodeGroup::removeNode(): no sync-capable nodes left!\n"));
301 // // +++++ stop & set to invalid state?
303 // setSyncNode(media_node::null);
304 // }
305 // }
307 // stop the node if necessary
308 status_t err = node->_stop();
309 if(err < B_OK) {
310 PRINT((
311 "*** NodeGroup::removeNode('%s'): error from node->_stop():\n"
312 " %s\n",
313 node->name(),
314 strerror(err)));
317 // clear the node's group pointer
318 node->_setGroup(0);
320 // release the manager lock; the node is now ungrouped and
321 // unlocked
322 m_manager->unlock();
324 // was that the last node? stop/disable the transport if so
325 if(!m_nodes.size()) {
327 // +++++ kill sync thread(s)
329 _changeState(TRANSPORT_INVALID);
332 // send notification
333 if(!LockLooper()) {
334 ASSERT(!"LockLooper() failed.");
336 BMessage m(M_NODE_REMOVED);
337 m.AddInt32("groupID", id());
338 m.AddInt32("nodeID", node->id());
339 notify(&m);
340 UnlockLooper();
342 // success
343 return B_OK;
346 status_t NodeGroup::removeNode(
347 uint32 index) {
349 D_METHOD((
350 "NodeGroup::removeNode(by index)\n"));
352 // +++++ icky nested lock
353 Autolock _l(this);
355 ASSERT(m_nodes.size() > index);
356 return removeNode(m_nodes[index]);
359 uint32 NodeGroup::groupFlags() const {
360 Autolock _l(this);
361 return m_flags;
364 status_t NodeGroup::setGroupFlags(
365 uint32 flags) {
366 Autolock _l(this);
367 m_flags = flags;
368 return B_OK;
372 // returns true if one or more nodes in the group have cycling
373 // enabled, and the start- and end-positions are valid
374 bool NodeGroup::canCycle() const {
375 Autolock _l(this);
377 return
378 m_cycleNodes.size() > 0 &&
379 m_endPosition - m_startPosition > s_minCyclePeriod;
382 // -------------------------------------------------------- //
383 // *** TRANSPORT POSITIONING (LOCK REQUIRED)
384 // -------------------------------------------------------- //
386 // Fetch the current transport state
388 NodeGroup::transport_state_t NodeGroup::transportState() const {
389 Autolock _l(this);
390 return m_transportState;
393 // Set the starting media time:
394 // This is the point at which playback will begin in any media
395 // files/documents being played by the nodes in this group.
396 // When cycle mode is enabled, this is the point to which each
397 // node will be seek'd at the end of each cycle (loop).
399 // The starting time can't be changed in the B_OFFLINE run mode
400 // (this call will return an error.)
402 status_t NodeGroup::setStartPosition(
403 bigtime_t start) {
404 Autolock _l(this);
406 D_METHOD((
407 "NodeGroup::setStartPosition(%Ld)\n", start));
410 m_transportState == TRANSPORT_RUNNING ||
411 m_transportState == TRANSPORT_ROLLING ||
412 m_transportState == TRANSPORT_STARTING) {
414 if(m_runMode == BMediaNode::B_OFFLINE)
415 return B_NOT_ALLOWED;
417 ASSERT(m_timeSourceObj);
419 if(_cycleValid()) {
420 if(m_timeSourceObj->Now() >= m_cycleDeadline) {
421 // too late to change start position; defer
422 // PRINT((" - deferred\n"));
423 m_newStartPosition = start;
424 m_newStart = true;
425 return B_OK;
428 // not at deadline yet; fall through to set start position
432 m_startPosition = start;
434 // +++++ notify [e.moon 11oct99]
436 return B_OK;
439 // Fetch the starting position:
441 // +++++ if a previously-set start position was deferred, it won't be
442 // returned yet
444 bigtime_t NodeGroup::startPosition() const {
445 Autolock _l(this);
447 return m_startPosition;
450 // Set the ending media time:
451 // This is the point at which playback will end relative to
452 // media documents being played by the nodes in this group;
453 // in cycle mode, this specifies the loop point. If the
454 // ending time is less than or equal to the starting time,
455 // the transport will continue until stopped manually.
456 // If the end position is changed while the transport is playing,
457 // it must take effect retroactively (if it's before the current
458 // position and looping is enabled, all nodes must 'warp' to
459 // the proper post-loop position.)
461 // The ending time can't be changed if run mode is B_OFFLINE and
462 // the transport is running (this call will return an error.)
// Sets the media time at which playback (or each cycle) ends.
// Holds the group lock for the whole call.
//
// While the transport is running, rolling or starting:
//   - B_OFFLINE run mode rejects the change with B_NOT_ALLOWED;
//   - with a valid cycle, the change is either deferred (stored in
//     m_newEndPosition / m_newEnd) or applied immediately and announced
//     on m_cyclePort with _CYCLE_END_CHANGED.
// Otherwise m_endPosition is simply overwritten. Returns B_OK unless
// rejected.
464 status_t NodeGroup::setEndPosition(
465 bigtime_t end) {
466 Autolock _l(this);
468 D_METHOD((
469 "NodeGroup::setEndPosition(%Ld)\n", end));
// NOTE(review): the line that opens this condition (presumably `if(`)
// is missing from this listing -- confirm against the repository.
472 m_transportState == TRANSPORT_RUNNING ||
473 m_transportState == TRANSPORT_ROLLING ||
474 m_transportState == TRANSPORT_STARTING) {
476 if(m_runMode == BMediaNode::B_OFFLINE)
477 return B_NOT_ALLOWED;
479 ASSERT(m_timeSourceObj);
// how far the end point moves; the deadline test below is shifted by
// the same amount
481 bigtime_t endDelta = end - m_endPosition;
483 if(_cycleValid()) {
484 if(m_timeSourceObj->Now() >= m_cycleDeadline + endDelta) {
485 // too late to change end position; defer
486 // PRINT((" - deferred\n"));
487 m_newEndPosition = end;
488 m_newEnd = true;
489 return B_OK;
491 else {
492 // set new end position
493 m_endPosition = end;
495 // inform thread
496 ASSERT(m_cyclePort);
// NOTE(review): the remaining write_port() arguments are missing from
// this listing -- confirm against the repository.
497 write_port(
498 m_cyclePort,
499 _CYCLE_END_CHANGED,
503 // // restart nodes' cycle threads with new end position
504 // _cycleInit(m_cycleStart);
505 // for(node_set::iterator it = m_cycleNodes.begin();
506 // it != m_cycleNodes.end(); ++it) {
507 // (*it)->_scheduleCycle(m_cycleBoundary);
508 // }
509 // return B_OK;
// reached when the transport is idle, when no valid cycle exists, and
// (redundantly) after the immediate update above
514 m_endPosition = end;
516 // +++++ notify [e.moon 11oct99]
518 return B_OK;
522 // Fetch the end position:
523 // Note that if the end position is less than or equal to the start
524 // position, it's ignored.
526 // +++++ if a previously-set end position was deferred, it won't be
527 // returned yet
529 bigtime_t NodeGroup::endPosition() const {
530 Autolock _l(this);
531 return m_endPosition;
534 // -------------------------------------------------------- //
535 // *** TRANSPORT OPERATIONS (LOCK REQUIRED)
536 // -------------------------------------------------------- //
538 // Preroll the group:
539 // Seeks, then prerolls, each node in the group (honoring the
540 // NO_SEEK and NO_PREROLL flags.) This ensures that the group
541 // can start as quickly as possible.
543 // Does not return until all nodes in the group have been
544 // prepared.
546 status_t NodeGroup::preroll() {
547 D_METHOD((
548 "NodeGroup::preroll()\n"));
550 Autolock _l(this);
551 return _preroll();
554 // Start all nodes in the group:
555 // Nodes with the NO_START_STOP flag aren't molested.
557 status_t NodeGroup::start() {
558 D_METHOD((
559 "NodeGroup::start()\n"));
561 Autolock _l(this);
562 return _start();
565 // Stop all nodes in the group:
566 // Nodes with the NO_START_STOP flag aren't molested.
568 status_t NodeGroup::stop() {
569 D_METHOD((
570 "NodeGroup::stop()\n"));
572 Autolock _l(this);
573 return _stop();
576 // Roll all nodes in the group:
577 // Queues a start and stop atomically (via BMediaRoster::RollNode()).
578 // Returns B_NOT_ALLOWED if endPosition <= startPosition;
580 status_t NodeGroup::roll() {
581 D_METHOD((
582 "NodeGroup::roll()\n"));
584 Autolock _l(this);
585 return _roll();
588 // -------------------------------------------------------- //
589 // *** TIME SOURCE & RUN-MODE OPERATIONS (LOCK REQUIRED)
590 // -------------------------------------------------------- //
592 // time source control:
593 // getTimeSource():
594 // returns B_ERROR if no time source has been set; otherwise,
595 // returns the node ID of the current time source for all
596 // nodes in the group.
598 // setTimeSource():
599 // Calls SetTimeSourceFor() on every node in the group.
600 // The group must be stopped; B_NOT_ALLOWED will be returned
601 // if the state is TRANSPORT_RUNNING or TRANSPORT_ROLLING.
603 status_t NodeGroup::getTimeSource(
604 media_node* outTimeSource) const {
605 Autolock _l(this);
607 if(m_timeSource != media_node::null) {
608 *outTimeSource = m_timeSource;
609 return B_OK;
611 return B_ERROR;
614 status_t NodeGroup::setTimeSource(
615 const media_node& timeSource) {
617 Autolock _l(this);
619 if(m_transportState == TRANSPORT_RUNNING || m_transportState == TRANSPORT_ROLLING)
620 return B_NOT_ALLOWED;
622 if(m_timeSourceObj)
623 m_timeSourceObj->Release();
625 m_timeSource = timeSource;
627 // cache a BTimeSource*
628 m_timeSourceObj = m_manager->roster->MakeTimeSourceFor(timeSource);
629 ASSERT(m_timeSourceObj);
631 // apply new time source to all nodes
632 for_each(
633 m_nodes.begin(),
634 m_nodes.end(),
635 bind2nd(
636 mem_fun(&NodeRef::_setTimeSource),
637 m_timeSource.node
641 // // try to set as sync node
642 // err = setSyncNode(timeSource);
643 // if(err < B_OK) {
644 // PRINT((
645 // "* NodeGroup::setTimeSource(): setSyncNode() failed: %s\n",
646 // strerror(err)));
647 // }
649 // notify
650 if(!LockLooper()) {
651 ASSERT(!"LockLooper() failed.");
653 BMessage m(M_TIME_SOURCE_CHANGED);
654 m.AddInt32("groupID", id());
655 m.AddInt32("timeSourceID", timeSource.node);
656 notify(&m);
657 UnlockLooper();
659 return B_OK;
662 // run mode access:
663 // Sets the default run mode for the group. This will be
664 // applied to every node with a wildcard (0) run mode.
666 // Special case: if the run mode is B_OFFLINE, it will be
667 // applied to all nodes in the group.
669 BMediaNode::run_mode NodeGroup::runMode() const {
670 Autolock _l(this);
671 return m_runMode;
674 status_t NodeGroup::setRunMode(BMediaNode::run_mode mode) {
675 Autolock _l(this);
677 m_runMode = mode;
679 // apply to all nodes
680 for_each(
681 m_nodes.begin(),
682 m_nodes.end(),
683 bind2nd(
684 mem_fun(&NodeRef::_setRunModeAuto),
685 m_runMode
687 // bound_method(
688 // *this,
689 // &NodeGroup::setRunModeFor)
693 return B_OK;
696 // -------------------------------------------------------- //
697 // *** BHandler
698 // -------------------------------------------------------- //
700 void NodeGroup::MessageReceived(
701 BMessage* message) {
703 // PRINT((
704 // "NodeGroup::MessageReceived():\n"));
705 // message->PrintToStream();
706 status_t err;
708 switch(message->what) {
709 case M_SET_TIME_SOURCE:
711 media_node timeSource;
712 void* data;
713 ssize_t dataSize;
714 err = message->FindData(
715 "timeSourceNode",
716 B_RAW_TYPE,
717 (const void**)&data,
718 &dataSize);
719 if(err < B_OK) {
720 PRINT((
721 "* NodeGroup::MessageReceived(M_SET_TIME_SOURCE):\n"
722 " no timeSourceNode!\n"));
723 break;
725 timeSource = *(media_node*)data;
727 setTimeSource(timeSource);
729 break;
731 case M_SET_RUN_MODE:
733 uint32 runMode;
734 err = message->FindInt32("runMode", (int32*)&runMode);
735 if(err < B_OK) {
736 PRINT((
737 "* NodeGroup::MessageReceived(M_SET_RUN_MODE):\n"
738 " no runMode!\n"));
739 break;
742 if(runMode < BMediaNode::B_OFFLINE ||
743 runMode > BMediaNode::B_RECORDING) {
744 PRINT((
745 "* NodeGroup::MessageReceived(M_SET_RUN_MODE):\n"
746 " invalid run mode (%" B_PRIu32 ")\n", runMode));
747 break;
750 setRunMode((BMediaNode::run_mode)runMode);
752 break;
754 case M_SET_START_POSITION:
756 bigtime_t position;
757 err = message->FindInt64("position", (int64*)&position);
758 if(err < B_OK) {
759 PRINT((
760 "* NodeGroup::MessageReceived(M_SET_START_POSITION):\n"
761 " no position!\n"));
762 break;
764 setStartPosition(position);
766 break;
768 case M_SET_END_POSITION:
770 bigtime_t position;
771 err = message->FindInt64("position", (int64*)&position);
772 if(err < B_OK) {
773 PRINT((
774 "* NodeGroup::MessageReceived(M_SET_END_POSITION):\n"
775 " no position!\n"));
776 break;
778 setEndPosition(position);
780 break;
782 case M_PREROLL:
783 preroll();
784 break;
786 case M_START:
787 start();
788 break;
790 case M_STOP:
791 stop();
792 break;
794 case M_ROLL:
795 roll();
796 break;
798 default:
799 _inherited::MessageReceived(message);
800 break;
805 // -------------------------------------------------------- //
806 // *** IPersistent
807 // -------------------------------------------------------- //
809 // !
810 #if CORTEX_XML
811 // !
813 // +++++
815 // Default constructor
816 NodeGroup::NodeGroup() :
817 m_manager(0) {} // +++++ finish initialization
820 // !
821 #endif /*CORTEX_XML*/
822 // !
824 // -------------------------------------------------------- //
825 // *** IObservable: [19aug99]
826 // -------------------------------------------------------- //
828 void NodeGroup::observerAdded(
829 const BMessenger& observer) {
831 BMessage m(M_OBSERVER_ADDED);
832 m.AddInt32("groupID", id());
833 m.AddMessenger("target", BMessenger(this));
834 observer.SendMessage(&m);
837 void NodeGroup::observerRemoved(
838 const BMessenger& observer) {
840 BMessage m(M_OBSERVER_REMOVED);
841 m.AddInt32("groupID", id());
842 m.AddMessenger("target", BMessenger(this));
843 observer.SendMessage(&m);
846 void NodeGroup::notifyRelease() {
848 BMessage m(M_RELEASED);
849 m.AddInt32("groupID", id());
850 m.AddMessenger("target", BMessenger(this));
851 notify(&m);
854 void NodeGroup::releaseComplete() {
855 // +++++
858 // -------------------------------------------------------- //
859 // *** ILockable: pass lock requests to m_lock
860 // -------------------------------------------------------- //
862 bool NodeGroup::lock(
863 lock_t type,
864 bigtime_t timeout) {
866 D_LOCK(("*** NodeGroup::lock(): %ld\n", find_thread(0)));
868 ASSERT(type == WRITE);
869 status_t err = m_lock.LockWithTimeout(timeout);
871 D_LOCK(("*** NodeGroup::lock() ACQUIRED: %ld\n", find_thread(0)));
873 return err == B_OK;
876 bool NodeGroup::unlock(
877 lock_t type) {
879 D_LOCK(("*** NodeGroup::unlock(): %ld\n", find_thread(0)));
881 ASSERT(type == WRITE);
882 m_lock.Unlock();
884 D_LOCK(("*** NodeGroup::unlock() RELEASED: %ld\n", find_thread(0)));
886 return true;
889 bool NodeGroup::isLocked(
890 lock_t type) const {
892 ASSERT(type == WRITE);
893 return m_lock.IsLocked();
896 // -------------------------------------------------------- //
897 // *** ctor (accessible to NodeManager)
898 // -------------------------------------------------------- //
900 NodeGroup::NodeGroup(
901 const char* name,
902 NodeManager* manager,
903 BMediaNode::run_mode runMode) :
905 ObservableHandler(name),
906 m_lock("NodeGroup::m_lock"),
907 m_manager(manager),
908 m_id(NextID()),
909 m_name(name),
910 m_flags(0),
911 m_transportState(TRANSPORT_INVALID),
912 m_runMode(runMode),
913 m_timeSourceObj(0),
914 m_released(false),
915 m_cycleThread(0),
916 m_cyclePort(0),
917 m_startPosition(0LL),
918 m_endPosition(0LL),
919 m_newStart(false),
920 m_newEnd(false) {
922 ASSERT(m_manager);
924 if(!m_manager->Lock()) {
925 ASSERT(!"m_manager->Lock() failed");
927 m_manager->AddHandler(this);
928 m_manager->Unlock();
930 // set default time source
931 media_node ts;
932 D_ROSTER(("# roster->GetTimeSource()\n"));
933 status_t err = m_manager->roster->GetTimeSource(&ts);
934 if(err < B_OK) {
935 PRINT((
936 "*** NodeGroup(): roster->GetTimeSource() failed:\n"
937 " %s\n", strerror(err)));
939 setTimeSource(ts);
942 // -------------------------------------------------------- //
943 // *** internal operations
944 // -------------------------------------------------------- //
946 uint32 NodeGroup::s_nextID = 1;
947 uint32 NodeGroup::NextID() {
948 return atomic_add((int32*)&s_nextID, 1);
951 // -------------------------------------------------------- //
952 // *** ref->group communication (LOCK REQUIRED)
953 // -------------------------------------------------------- //
955 // when a NodeRef's cycle state (ie. looping or not looping)
956 // changes, it must pass that information on via this method
958 void NodeGroup::_refCycleChanged(
959 NodeRef* ref) {
960 assert_locked(this);
961 D_METHOD((
962 "NodeGroup::_refCycleChanged('%s')\n",
963 ref->name()));
965 if(ref->m_cycle) {
966 _cycleAddRef(ref);
967 } else {
968 _cycleRemoveRef(ref);
971 // +++++ if running & cycle valid, the node should be properly
972 // seek'd and start'd
976 // when a cycling node's latency changes, call this method.
// Called when a cycling node's latency changes. The group must be
// locked. Does nothing unless _cycleValid() holds; otherwise the ref is
// taken out of the cycle set and re-added, and _CYCLE_LATENCY_CHANGED
// is written to m_cyclePort.
978 void NodeGroup::_refLatencyChanged(
979 NodeRef* ref) {
980 assert_locked(this);
981 D_METHOD((
982 "NodeGroup::_refLatencyChanged('%s')\n",
983 ref->name()));
985 if(!_cycleValid())
986 return;
988 // remove & replace ref (positions it properly)
989 _cycleRemoveRef(ref);
990 _cycleAddRef(ref);
992 // slap my thread up
993 ASSERT(m_cyclePort);
// NOTE(review): the remaining write_port() arguments are missing from
// this listing -- confirm against the repository.
994 write_port(
995 m_cyclePort,
996 _CYCLE_LATENCY_CHANGED,
1000 // +++++ zat it?
1003 // when a NodeRef receives notification that it has been stopped,
1004 // but is labeled as still running, it must call this method.
1005 // [e.moon 11oct99: roll/B_OFFLINE support]
1007 void NodeGroup::_refStopped(
1008 NodeRef* ref) {
1009 assert_locked(this);
1010 D_METHOD((
1011 "NodeGroup::_refStopped('%s')\n",
1012 ref->name()));
1014 // roll/B_OFFLINE support [e.moon 11oct99]
1015 // (check to see if any other nodes in the group are still running;
1016 // mark group stopped if not.)
1017 if(m_transportState == TRANSPORT_ROLLING) {
1018 bool nodesRunning = false;
1019 for(node_set::iterator it = m_nodes.begin();
1020 it != m_nodes.end(); ++it) {
1021 if((*it)->isRunning()) {
1022 nodesRunning = true;
1023 break;
1026 if(!nodesRunning)
1027 // the group has stopped; update transport state
1028 _changeState(TRANSPORT_STOPPED);
1035 // -------------------------------------------------------- //
1036 // *** transport helpers (LOCK REQUIRED)
1037 // -------------------------------------------------------- //
1040 // Preroll all nodes in the group; this is the implementation
1041 // of preroll().
1042 // *** this method should not be called from the transport thread
1043 // (since preroll operations can block for a relatively long time.)
1045 status_t NodeGroup::_preroll() {
1046 assert_locked(this);
1048 D_METHOD((
1049 "NodeGroup::_preroll()\n"));
1052 m_transportState == TRANSPORT_RUNNING ||
1053 m_transportState == TRANSPORT_ROLLING)
1054 // too late
1055 return B_NOT_ALLOWED;
1057 // * preroll all nodes to the start position
1059 // +++++ currently, if an error is encountered it's ignored.
1060 // should the whole operation fail if one node couldn't
1061 // be prerolled?
1063 // My gut response is 'no', since the preroll step is
1064 // optional, but the caller should have some inkling that
1065 // one of its nodes didn't behave.
1067 // [e.moon 13oct99] making PPC compiler happy
1068 // for_each(
1069 // m_nodes.begin(),
1070 // m_nodes.end(),
1071 // bind2nd(
1072 // mem_fun(&NodeRef::_preroll),
1073 // m_startPosition
1074 // )
1075 // );
1076 for(node_set::iterator it = m_nodes.begin();
1077 it != m_nodes.end(); ++it) {
1078 (*it)->_preroll(m_startPosition);
1081 // replaces
1082 // bind2nd(
1083 // bound_method(*this, &NodeGroup::prerollNode),
1084 // m_startPosition
1085 // )
1087 return B_OK;
1091 //// functor: calculates latency of each node it's handed, caching
1092 //// the largest one found; includes initial latency if nodes report it.
1094 //class NodeGroup::calcLatencyFn { public:
1095 // bigtime_t& maxLatency;
1097 // calcLatencyFn(bigtime_t& _m) : maxLatency(_m) {}
1099 // void operator()(NodeRef* r) {
1100 // ASSERT(r);
1102 //// PRINT((
1103 //// "# calcLatencyFn(): '%s'\n",
1104 //// r->name()));
1106 // if(!(r->node().kind & B_BUFFER_PRODUCER)) {
1107 // // node can't incur latency
1108 //// PRINT((
1109 //// "- not a producer\n"));
1110 // return;
1111 // }
1113 // bigtime_t latency;
1114 // status_t err =
1115 // BMediaRoster::Roster()->GetLatencyFor(
1116 // r->node(),
1117 // &latency);
1118 // if(err < B_OK) {
1119 // PRINT((
1120 // "* calcLatencyFn: GetLatencyFor() failed: %s\n",
1121 // strerror(err)));
1122 // return;
1123 // }
1124 //// PRINT(("- %Ld\n", latency));
1126 // bigtime_t add;
1127 // err = BMediaRoster::Roster()->GetInitialLatencyFor(
1128 // r->node(),
1129 // &add);
1130 //// PRINT(("- %Ld\n", add));
1131 // if(err < B_OK) {
1132 // PRINT((
1133 // "* calcLatencyFn: GetInitialLatencyFor() failed: %s\n",
1134 // strerror(err)));
1135 // }
1136 // else
1137 // latency += add;
1139 // if(latency > maxLatency)
1140 // maxLatency = latency;
1142 //// PRINT((
1143 //// "- max latency: %Ld\n",
1144 //// maxLatency));
1145 // }
1146 //};
1148 // Start all nodes in the group; this is the implementation of
1149 // start(). Fails if the run mode is B_OFFLINE; use _roll() instead
1150 // in that case.
1152 // (this may be called from the transport thread or from
1153 // an API-implementation method.)
1155 status_t NodeGroup::_start() {
1156 assert_locked(this);
1158 D_METHOD((
1159 "NodeGroup::_start()\n"));
1160 status_t err;
1162 if(m_transportState != TRANSPORT_STOPPED)
1163 return B_NOT_ALLOWED;
1165 if(m_runMode == BMediaNode::B_OFFLINE)
1166 return B_NOT_ALLOWED;
1168 ASSERT(m_nodes.size());
1170 _changeState(TRANSPORT_STARTING);
1172 // * Find the highest latency in the group
1174 bigtime_t offset = 0LL;
1175 calcLatencyFn _f(offset);
1176 for_each(
1177 m_nodes.begin(),
1178 m_nodes.end(),
1179 _f);
1181 offset += s_rosterLatency;
1182 PRINT((
1183 "- offset: %" B_PRIdBIGTIME "\n", offset));
1185 // * Seek all nodes (in case one or more failed to preroll)
1187 for(node_set::iterator it = m_nodes.begin();
1188 it != m_nodes.end(); ++it) {
1189 err = (*it)->_seekStopped(m_startPosition);
1190 if(err < B_OK) {
1191 PRINT((
1192 "! NodeGroup('%s')::_start():\n"
1193 " ref('%s')->_seekStopped(%" B_PRIdBIGTIME ") failed:\n"
1194 " %s\n",
1195 name(), (*it)->name(), m_startPosition,
1196 strerror(err)));
1198 // +++++ continue?
1202 // * Start all nodes, allowing for the max latency found
1204 ASSERT(m_timeSourceObj);
1205 bigtime_t when = m_timeSourceObj->Now() + offset;
1207 // 10aug99: initialize cycle (loop) settings
1208 if(_cycleValid()) {
1209 _initCycleThread();
1210 _cycleInit(when);
1213 // start the nodes
1214 for(node_set::iterator it = m_nodes.begin();
1215 it != m_nodes.end(); ++it) {
1216 err = (*it)->_start(when);
1217 if(err < B_OK) {
1218 PRINT((
1219 "! NodeGroup('%s')::_start():\n"
1220 " ref('%s')->_start(%" B_PRIdBIGTIME ") failed:\n"
1221 " %s\n",
1222 name(), (*it)->name(), when,
1223 strerror(err)));
1225 // +++++ continue?
1229 // notify observers
1230 _changeState(TRANSPORT_RUNNING);
1231 return B_OK;
1234 // Stop all nodes in the group; this is the implementation of
1235 // stop().
1237 // (this may be called from the transport thread or from
1238 // an API-implementation method.)
1240 status_t NodeGroup::_stop() {
1242 D_METHOD((
1243 "NodeGroup::_stop()\n"));
1245 assert_locked(this);
1248 m_transportState != TRANSPORT_RUNNING &&
1249 m_transportState != TRANSPORT_ROLLING)
1250 return B_NOT_ALLOWED;
1252 _changeState(TRANSPORT_STOPPING);
1254 // * stop the cycle thread if need be
1255 _destroyCycleThread();
1257 // * stop all nodes
1258 // +++++ error reports would be nice
1260 for_each(
1261 m_nodes.begin(),
1262 m_nodes.end(),
1263 mem_fun(&NodeRef::_stop)
1266 // update transport state
1267 _changeState(TRANSPORT_STOPPED);
1269 return B_OK;
1272 // Roll all nodes in the group; this is the implementation of
1273 // roll().
1275 // (this may be called from the transport thread or from
1276 // an API-implementation method.)
1278 status_t NodeGroup::_roll() {
1280 D_METHOD((
1281 "NodeGroup::_roll()\n"));
1282 assert_locked(this);
1283 status_t err;
1285 if(m_transportState != TRANSPORT_STOPPED)
1286 return B_NOT_ALLOWED;
1288 bigtime_t period = m_endPosition - m_startPosition;
1289 if(period <= 0LL)
1290 return B_NOT_ALLOWED;
1292 _changeState(TRANSPORT_STARTING);
1294 bigtime_t tpStart = 0LL;
1295 bigtime_t tpStop = period;
1297 if(m_runMode != BMediaNode::B_OFFLINE) {
1299 // * Find the highest latency in the group
1300 bigtime_t offset = 0LL;
1301 calcLatencyFn _f(offset);
1302 for_each(
1303 m_nodes.begin(),
1304 m_nodes.end(),
1305 _f);
1307 offset += s_rosterLatency;
1308 PRINT((
1309 "- offset: %" B_PRIdBIGTIME "\n", offset));
1311 ASSERT(m_timeSourceObj);
1312 tpStart = m_timeSourceObj->Now() + offset;
1313 tpStop += tpStart;
1316 // * Roll all nodes; watch for errors
1317 bool allFailed = true;
1318 err = B_OK;
1319 for(
1320 node_set::iterator it = m_nodes.begin();
1321 it != m_nodes.end(); ++it) {
1323 status_t e = (*it)->_roll(
1324 tpStart,
1325 tpStop,
1326 m_startPosition);
1327 if(e < B_OK)
1328 err = e;
1329 else
1330 allFailed = false;
1333 if(!allFailed)
1334 // notify observers
1335 _changeState(TRANSPORT_ROLLING);
1337 return err;
1341 // State transition; notify listeners
1342 // +++++ [18aug99] DANGER: should notification happen in the middle
1343 // of such an operation?
1344 inline void NodeGroup::_changeState(
1345 transport_state_t to) {
1347 assert_locked(this);
1349 m_transportState = to;
1351 if(!LockLooper()) {
1352 ASSERT(!"LockLooper() failed.");
1354 BMessage m(M_TRANSPORT_STATE_CHANGED);
1355 m.AddInt32("groupID", id());
1356 m.AddInt32("transportState", m_transportState);
1357 notify(&m);
1358 UnlockLooper();
1361 // Enforce a state transition, and notify listeners
1362 inline void NodeGroup::_changeState(
1363 transport_state_t from,
1364 transport_state_t to) {
1366 assert_locked(this);
1367 ASSERT(m_transportState == from);
1369 _changeState(to);
1373 // -------------------------------------------------------- //
1374 // *** cycle thread & helpers (LOCK REQUIRED)
1375 // -------------------------------------------------------- //
// *** cycle port definitions
//
// Parameters of the kernel port the cycle thread creates for itself
// in _cycleThread() and reads its control messages from.

// first argument to create_port() (the port's queue length)
const int32 _portLength = 32;
// name given to the port in create_port()
const char* const _portName = "NodeGroup::m_cyclePort";
// size of the receive buffer the cycle thread allocates and passes
// to read_port_etc()
const size_t _portMsgMaxSize = 256;
1384 // set up the cycle thread (including its kernel port)
1385 status_t NodeGroup::_initCycleThread() {
1386 assert_locked(this);
1387 status_t err;
1388 D_METHOD((
1389 "NodeGroup::_initCycleThread()\n"));
1391 if(m_cycleThread) {
1392 // thread is still alive
1393 err = _destroyCycleThread();
1394 if(err < B_OK)
1395 return err;
1398 // create
1399 m_cycleThreadDone = false;
1400 m_cycleThread = spawn_thread(
1401 &_CycleThread,
1402 "NodeGroup[cycleThread]",
1403 B_NORMAL_PRIORITY,
1404 (void*)this);
1405 if(m_cycleThread < B_OK) {
1406 PRINT((
1407 "* NodeGroup::_initCycleThread(): spawn_thread() failed:\n"
1408 " %s\n",
1409 strerror(m_cycleThread)));
1410 return m_cycleThread;
1413 // launch
1414 return resume_thread(m_cycleThread);
1417 // shut down the cycle thread/port
1418 status_t NodeGroup::_destroyCycleThread() {
1419 assert_locked(this);
1420 status_t err;
1421 D_METHOD((
1422 "NodeGroup::_destroyCycleThread()\n"));
1424 if(!m_cycleThread)
1425 return B_OK;
1427 if(!m_cycleThreadDone) {
1428 // kill the thread
1429 ASSERT(m_cyclePort);
1430 err = write_port_etc(
1431 m_cyclePort,
1432 _CYCLE_STOP,
1435 B_TIMEOUT,
1436 10000LL);
1438 if(err < B_OK) {
1439 // bad thread. die, thread, die.
1440 PRINT((
1441 "* NodeGroup::_destroyCycleThread(): port write failed; killing.\n"));
1442 delete_port(m_cyclePort);
1443 m_cyclePort = 0;
1444 kill_thread(m_cycleThread);
1445 m_cycleThread = 0;
1446 return B_OK;
1449 // the thread got the message; wait for it to quit
1450 unlock();
1451 while(wait_for_thread(m_cycleThread, &err) == B_INTERRUPTED) {
1452 PRINT((
1453 "! wait_for_thread(m_cycleThread, &err) == B_INTERRUPTED\n"));
1455 lock();
1458 // it's up to the thread to close its port
1459 ASSERT(!m_cyclePort);
1461 m_cycleThread = 0;
1463 return B_OK;
1467 // 1) do the current positions specify a valid cycle region?
1468 // 2) are any nodes in the group cycle-enabled?
1470 bool NodeGroup::_cycleValid() {
1471 assert_locked(this);
1472 return
1473 (m_transportState == TRANSPORT_RUNNING ||
1474 m_transportState == TRANSPORT_STARTING) &&
1475 canCycle();
1478 // initialize the cycle members (call when starting)
1480 void NodeGroup::_cycleInit(
1481 bigtime_t startTime) {
1482 assert_locked(this);
1483 ASSERT(m_cycleNodes.size() > 0);
1484 D_METHOD((
1485 "NodeGroup::_cycleInit(%Ld)\n",
1486 startTime));
1488 // +++++ rescan latencies?
1490 // figure new boundary & deadline from region length
1491 bigtime_t cyclePeriod = m_endPosition - m_startPosition;
1493 if(cyclePeriod <= 0) {
1494 // cycle region is no longer valid
1495 m_cycleBoundary = 0LL;
1496 m_cycleDeadline = 0LL;
1498 // no no no -- deadlocks when the thread calls this method
1499 // // stop the thread
1500 // _destroyCycleThread();
1501 return;
1504 m_cycleStart = startTime;
1505 m_cycleBoundary = startTime + cyclePeriod;
1506 m_cycleDeadline = m_cycleBoundary - (m_cycleMaxLatency + s_rosterLatency);
1510 // add a ref to the cycle set (in proper order, based on latency)
1511 void NodeGroup::_cycleAddRef(
1512 NodeRef* ref) {
1513 assert_locked(this);
1515 // make sure it's not already there
1516 ASSERT(find(
1517 m_cycleNodes.begin(),
1518 m_cycleNodes.end(),
1519 ref) == m_cycleNodes.end());
1521 // [re]calc latency if 0
1522 if(!ref->m_latency)
1523 ref->_updateLatency();
1525 node_set::iterator it;
1526 for(it = m_cycleNodes.begin();
1527 it != m_cycleNodes.end(); ++it) {
1528 if(ref->m_latency > (*it)->m_latency) {
1529 m_cycleNodes.insert(it, ref);
1530 break;
1534 // not inserted? new ref belongs at the end
1535 if(it == m_cycleNodes.end())
1536 m_cycleNodes.insert(it, ref);
1539 // remove a ref from the cycle set
1540 void NodeGroup::_cycleRemoveRef(
1541 NodeRef* ref) {
1542 assert_locked(this);
1544 node_set::iterator it = find(
1545 m_cycleNodes.begin(),
1546 m_cycleNodes.end(),
1547 ref);
1548 ASSERT(it != m_cycleNodes.end());
1549 m_cycleNodes.erase(it);
1552 bigtime_t NodeGroup::_cycleBoundary() const {
1553 Autolock _l(this);
1554 return m_cycleBoundary;
1557 // cycle thread impl.
1558 /*static*/
1559 status_t NodeGroup::_CycleThread(void* user) {
1560 ((NodeGroup*)user)->_cycleThread();
1561 return B_OK;
1564 void NodeGroup::_cycleThread() {
1566 status_t err;
1567 int32 code;
1568 int32 errorCount = 0;
1570 // +++++ liability -- if the thread has to be killed, this buffer
1571 // won't be reclaimed
1572 char* msgBuffer = new char[_portMsgMaxSize];
1573 array_delete<char> _d(msgBuffer);
1575 // create port
1576 ASSERT(!m_cyclePort);
1577 m_cyclePort = create_port(
1578 _portLength,
1579 _portName);
1580 ASSERT(m_cyclePort >= B_OK);
1582 // the message-handling loop
1583 bool done = false;
1584 while(!done) {
1586 // *** wait until it's time to queue the next cycle, or until
1587 // *** a message arrives
1589 lock(); // **** BEGIN LOCKED SECTION ****
1590 if(!_cycleValid()) {
1591 unlock();
1592 break;
1595 ASSERT(m_cycleNodes.size() > 0);
1596 ASSERT(m_timeSourceObj);
1598 bigtime_t maxLatency = m_cycleNodes.front()->m_latency;
1599 bigtime_t wakeUpAt = m_timeSourceObj->RealTimeFor(
1600 m_cycleBoundary, maxLatency + s_rosterLatency);
1601 bigtime_t timeout = wakeUpAt - m_timeSourceObj->RealTime();
1603 if(timeout <= 0) {
1604 // +++++ whoops, I'm late.
1605 // +++++ adjust to compensate !!!
1606 PRINT((
1607 "*** NodeGroup::_cycleThread(): LATE\n"
1608 " by %" B_PRIdBIGTIME "\n", -timeout));
1611 // +++++ if timeout is very short, spin until the target time arrives
1613 unlock(); // **** END LOCKED SECTION ****
1615 // block until message arrives or it's time to wake up
1616 err = read_port_etc(
1617 m_cyclePort,
1618 &code,
1619 msgBuffer,
1620 _portMsgMaxSize,
1621 B_TIMEOUT,
1622 timeout);
1624 if(err == B_TIMED_OUT) {
1625 // the time has come to seek my nodes
1626 _handleCycleService();
1627 continue;
1629 else if(err < B_OK) {
1630 // any other error is bad news
1631 PRINT((
1632 "* NodeGroup::_cycleThread(): read_port error:\n"
1633 " %s\n"
1634 " ABORTING\n\n", strerror(err)));
1635 if(++errorCount > 10) {
1636 PRINT((
1637 "*** Too many errors; aborting.\n"));
1638 break;
1640 continue;
1643 errorCount = 0;
1645 // process the message
1646 switch(code) {
1647 case _CYCLE_STOP:
1648 // bail
1649 done = true;
1650 break;
1652 case _CYCLE_END_CHANGED:
1653 case _CYCLE_LATENCY_CHANGED:
1654 // fall through to next loop; for now, these messages
1655 // serve only to slap me out of my stupor and reassess
1656 // the timing situation...
1657 break;
1659 default:
1660 PRINT((
1661 "* NodeGroup::_cycleThread(): unknown message code '%"
1662 B_PRId32 "'\n", code));
1663 break;
1665 } // while(!done)
1668 // delete port
1669 delete_port(m_cyclePort);
1670 m_cyclePort = 0;
1672 // done
1673 m_cycleThreadDone = true;
1676 // cycle service: seek all nodes & initiate next cycle
1677 void NodeGroup::_handleCycleService() {
1678 Autolock _l(this);
1679 // D_METHOD((
1680 // "NodeGroup::_handleCycleService()\n"));
1681 status_t err;
1683 if(!_cycleValid()) {
1684 // PRINT((
1685 // "- _handleCycleService(): cycle not valid; quitting.\n"));
1686 return;
1689 // seek
1690 for(node_set::iterator it = m_cycleNodes.begin();
1691 it != m_cycleNodes.end(); ++it) {
1692 err = (*it)->_seek(
1693 m_startPosition,
1694 m_cycleBoundary);
1695 if(err < B_OK) {
1696 PRINT((
1697 "- _handleCycleService(): node('%s')::_seek() failed:\n"
1698 " %s\n",
1699 (*it)->name(), strerror(err)));
1703 // update cycle settings
1704 if(m_newStart) {
1705 m_newStart = false;
1706 m_startPosition = m_newStartPosition;
1708 if(m_newEnd) {
1709 m_newEnd = false;
1710 m_endPosition = m_newEndPosition;
1713 // prepare next cycle
1714 _cycleInit(m_cycleBoundary);
1717 // END -- NodeGroup.cpp --