2 * Copyright (c) 1999-2000, Eric Moon.
5 * Redistribution and use in source and binary forms, with or without
6 * modification, are permitted provided that the following conditions
9 * 1. Redistributions of source code must retain the above copyright
10 * notice, this list of conditions, and the following disclaimer.
12 * 2. Redistributions in binary form must reproduce the above copyright
13 * notice, this list of conditions, and the following disclaimer in the
14 * documentation and/or other materials provided with the distribution.
16 * 3. The name of the author may not be used to endorse or promote products
17 * derived from this software without specific prior written permission.
19 * THIS SOFTWARE IS PROVIDED BY THE AUTHOR "AS IS" AND ANY EXPRESS OR
20 * IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES
21 * OF TITLE, NON-INFRINGEMENT, MERCHANTABILITY AND FITNESS FOR A PARTICULAR
22 * PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR ANY
23 * DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES
24 * (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
25 * LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
26 * AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR
27 * TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
28 * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
34 #include "NodeGroup.h"
35 //#include "NodeGroup_transport_thread.h"
37 #include "NodeManager.h"
40 #include <MediaRoster.h>
42 #include <TimeSource.h>
47 #include "array_delete.h"
48 #include "BasicThread.h"
49 #include "node_manager_impl.h"
50 #include "functional_tools.h"
54 __USE_CORTEX_NAMESPACE
55 #define D_METHOD(x) //PRINT (x)
56 #define D_ROSTER(x) //PRINT (x)
57 #define D_LOCK(x) //PRINT (x)
61 // -------------------------------------------------------- //
63 // -------------------------------------------------------- //
65 // free the group, including all nodes within it
66 // (this call will result in the eventual deletion of the object.)
67 // returns B_OK on success; B_NOT_ALLOWED if release() has
68 // already been called; other error codes if the Media Roster
70 // * THE MANAGER MUST BE LOCKED
72 status_t
NodeGroup::release() {
75 "NodeGroup::release()\n"));
86 // remove & release all nodes
87 // +++++ causes triply-nested lock: eww!
88 while(m_nodes
.size()) {
89 NodeRef
* last
= m_nodes
.back();
90 removeNode(m_nodes
.size()-1);
97 // removing the released group is now NodeManager's responsibility
99 // remove from NodeManager
100 if(!m_manager
->lock()) {
101 ASSERT(!"* m_manager->lock() failed.\n");
103 m_manager
->_removeGroup(this);
106 // hand off to IObservable
107 return _inherited::release();
110 // call release() rather than deleting NodeGroup objects
111 NodeGroup::~NodeGroup() {
117 ASSERT(!m_nodes
.size());
119 if(m_timeSourceObj
) {
120 m_timeSourceObj
->Release();
126 // -------------------------------------------------------- //
128 // -------------------------------------------------------- //
130 // [e.moon 13oct99] moved to header
131 //inline uint32 NodeGroup::id() const { return m_id; }
133 // -------------------------------------------------------- //
135 // -------------------------------------------------------- //
138 const char* NodeGroup::name() const {
140 return m_name
.String();
143 status_t
NodeGroup::setName(const char* name
) {
150 uint32
NodeGroup::countNodes() const {
152 return m_nodes
.size();
155 NodeRef
* NodeGroup::nodeAt(
156 uint32 index
) const {
158 return (index
< m_nodes
.size()) ?
164 // - you may only add a node with no current group.
165 // - nodes added during playback will be started;
166 // nodes removed during playback will be stopped (unless
167 // the NO_START_STOP transport restriction flag is set
168 // for a given node.)
170 status_t
NodeGroup::addNode(
174 "NodeGroup::addNode()\n"));
176 // lock the manager first; if the node has no current group,
177 // this locks the node.
182 // precondition: GROUP_LOCKED not set
183 if(m_flags
& GROUP_LOCKED
)
184 return B_NOT_ALLOWED
;
186 // precondition: no current group
188 // [e.moon 28sep99] whoops, forgot one
190 "!!! node already in group '%s'\n", node
->m_group
->name()));
193 return B_NOT_ALLOWED
;
197 m_nodes
.push_back(node
);
198 node
->_setGroup(this);
200 // release the manager
203 // first node? the transport is now ready to start
204 if(m_nodes
.size() == 1) {
205 _changeState(TRANSPORT_INVALID
, TRANSPORT_STOPPED
);
208 // if(m_syncNode == media_node::null) {
209 // // assign as sync node
210 // setSyncNode(node->node());
213 // initialize the new node
214 status_t err
= node
->_initTransportState();
219 node
->_setTimeSource(m_timeSource
.node
);
222 node
->_setRunMode(m_runMode
);
224 // add to cycle set if need be
225 // +++++ should I call _cycleAddRef() instead?
227 _refCycleChanged(node
);
229 if(m_transportState
== TRANSPORT_RUNNING
) {
230 // +++++ start if necessary!
232 // +++++ not started if TRANSPORT_ROLLING: is that proper? [e.moon 11oct99]
236 ASSERT(!"LockLooper() failed.");
238 BMessage
m(M_NODE_ADDED
);
239 m
.AddInt32("groupID", id());
240 m
.AddInt32("nodeID", node
->id());
249 status_t
NodeGroup::removeNode(
253 "NodeGroup::removeNode()\n"));
255 // lock the manager first; once the node is ungrouped,
256 // the manager lock applies to it
261 // precondition: this must be the node's group
262 if(node
->m_group
!= this) {
263 // [e.moon 28sep99] whoops, forgot one
265 "!!! node not in group '%s'\n", node
->m_group
->name()));
268 return B_NOT_ALLOWED
;
271 // remove from the cycle set
273 _cycleRemoveRef(node
);
276 ASSERT(m_nodes
.size());
282 // should have removed one and only one entry
283 m_nodes
.resize(m_nodes
.size()-1);
285 // // 6aug99: the timesource is now the sync node...
286 // // is this the sync node? reassign if so
288 // if(node->node() == m_syncNode) {
290 // // look for another sync-capable node
291 // bool found = false;
292 // for(int n = 0; !found && n < m_nodes.size(); ++n)
293 // if(setSyncNode(m_nodes[n]->node()) == B_OK)
296 // // no luck? admit defeat:
299 // "* NodeGroup::removeNode(): no sync-capable nodes left!\n"));
301 // // +++++ stop & set to invalid state?
303 // setSyncNode(media_node::null);
307 // stop the node if necessary
308 status_t err
= node
->_stop();
311 "*** NodeGroup::removeNode('%s'): error from node->_stop():\n"
317 // clear the node's group pointer
320 // release the manager lock; the node is now ungrouped and
324 // was that the last node? stop/disable the transport if so
325 if(!m_nodes
.size()) {
327 // +++++ kill sync thread(s)
329 _changeState(TRANSPORT_INVALID
);
334 ASSERT(!"LockLooper() failed.");
336 BMessage
m(M_NODE_REMOVED
);
337 m
.AddInt32("groupID", id());
338 m
.AddInt32("nodeID", node
->id());
346 status_t
NodeGroup::removeNode(
350 "NodeGroup::removeNode(by index)\n"));
352 // +++++ icky nested lock
355 ASSERT(m_nodes
.size() > index
);
356 return removeNode(m_nodes
[index
]);
359 uint32
NodeGroup::groupFlags() const {
364 status_t
NodeGroup::setGroupFlags(
372 // returns true if one or more nodes in the group have cycling
373 // enabled, and the start- and end-positions are valid
374 bool NodeGroup::canCycle() const {
378 m_cycleNodes
.size() > 0 &&
379 m_endPosition
- m_startPosition
> s_minCyclePeriod
;
382 // -------------------------------------------------------- //
383 // *** TRANSPORT POSITIONING (LOCK REQUIRED)
384 // -------------------------------------------------------- //
386 // Fetch the current transport state
388 NodeGroup::transport_state_t
NodeGroup::transportState() const {
390 return m_transportState
;
393 // Set the starting media time:
394 // This is the point at which playback will begin in any media
395 // files/documents being played by the nodes in this group.
396 // When cycle mode is enabled, this is the point to which each
397 // node will be seek'd at the end of each cycle (loop).
399 // The starting time can't be changed in the B_OFFLINE run mode
400 // (this call will return an error.)
402 status_t
NodeGroup::setStartPosition(
407 "NodeGroup::setStartPosition(%Ld)\n", start
));
410 m_transportState
== TRANSPORT_RUNNING
||
411 m_transportState
== TRANSPORT_ROLLING
||
412 m_transportState
== TRANSPORT_STARTING
) {
414 if(m_runMode
== BMediaNode::B_OFFLINE
)
415 return B_NOT_ALLOWED
;
417 ASSERT(m_timeSourceObj
);
420 if(m_timeSourceObj
->Now() >= m_cycleDeadline
) {
421 // too late to change start position; defer
422 // PRINT((" - deferred\n"));
423 m_newStartPosition
= start
;
428 // not at deadline yet; fall through to set start position
432 m_startPosition
= start
;
434 // +++++ notify [e.moon 11oct99]
439 // Fetch the starting position:
441 // +++++ if a previously-set start position was deferred, it won't be
444 bigtime_t
NodeGroup::startPosition() const {
447 return m_startPosition
;
450 // Set the ending media time:
451 // This is the point at which playback will end relative to
452 // media documents being played by the nodes in this group;
453 // in cycle mode, this specifies the loop point. If the
454 // ending time is less than or equal to the starting time,
455 // the transport will continue until stopped manually.
456 // If the end position is changed while the transport is playing,
457 // it must take effect retroactively (if it's before the current
458 // position and looping is enabled, all nodes must 'warp' to
459 // the proper post-loop position.)
461 // The ending time can't be changed if run mode is B_OFFLINE and
462 // the transport is running (this call will return an error.)
464 status_t
NodeGroup::setEndPosition(
469 "NodeGroup::setEndPosition(%Ld)\n", end
));
472 m_transportState
== TRANSPORT_RUNNING
||
473 m_transportState
== TRANSPORT_ROLLING
||
474 m_transportState
== TRANSPORT_STARTING
) {
476 if(m_runMode
== BMediaNode::B_OFFLINE
)
477 return B_NOT_ALLOWED
;
479 ASSERT(m_timeSourceObj
);
481 bigtime_t endDelta
= end
- m_endPosition
;
484 if(m_timeSourceObj
->Now() >= m_cycleDeadline
+ endDelta
) {
485 // too late to change end position; defer
486 // PRINT((" - deferred\n"));
487 m_newEndPosition
= end
;
492 // set new end position
503 // // restart nodes' cycle threads with new end position
504 // _cycleInit(m_cycleStart);
505 // for(node_set::iterator it = m_cycleNodes.begin();
506 // it != m_cycleNodes.end(); ++it) {
507 // (*it)->_scheduleCycle(m_cycleBoundary);
516 // +++++ notify [e.moon 11oct99]
522 // Fetch the end position:
523 // Note that if the end position is less than or equal to the start
524 // position, it's ignored.
526 // +++++ if a previously-set end position was deferred, it won't be
529 bigtime_t
NodeGroup::endPosition() const {
531 return m_endPosition
;
534 // -------------------------------------------------------- //
535 // *** TRANSPORT OPERATIONS (LOCK REQUIRED)
536 // -------------------------------------------------------- //
538 // Preroll the group:
539 // Seeks, then prerolls, each node in the group (honoring the
540 // NO_SEEK and NO_PREROLL flags.) This ensures that the group
541 // can start as quickly as possible.
543 // Does not return until all nodes in the group have been
546 status_t
NodeGroup::preroll() {
548 "NodeGroup::preroll()\n"));
554 // Start all nodes in the group:
555 // Nodes with the NO_START_STOP flag are left untouched.
557 status_t
NodeGroup::start() {
559 "NodeGroup::start()\n"));
565 // Stop all nodes in the group:
566 // Nodes with the NO_START_STOP flag are left untouched.
568 status_t
NodeGroup::stop() {
570 "NodeGroup::stop()\n"));
576 // Roll all nodes in the group:
577 // Queues a start and stop atomically (via BMediaRoster::RollNode()).
578 // Returns B_NOT_ALLOWED if endPosition <= startPosition;
580 status_t
NodeGroup::roll() {
582 "NodeGroup::roll()\n"));
588 // -------------------------------------------------------- //
589 // *** TIME SOURCE & RUN-MODE OPERATIONS (LOCK REQUIRED)
590 // -------------------------------------------------------- //
592 // time source control:
594 // returns B_ERROR if no time source has been set; otherwise,
595 // returns the node ID of the current time source for all
596 // nodes in the group.
599 // Calls SetTimeSourceFor() on every node in the group.
600 // The group must be stopped; B_NOT_ALLOWED will be returned
601 // if the state is TRANSPORT_RUNNING or TRANSPORT_ROLLING.
603 status_t
NodeGroup::getTimeSource(
604 media_node
* outTimeSource
) const {
607 if(m_timeSource
!= media_node::null
) {
608 *outTimeSource
= m_timeSource
;
614 status_t
NodeGroup::setTimeSource(
615 const media_node
& timeSource
) {
619 if(m_transportState
== TRANSPORT_RUNNING
|| m_transportState
== TRANSPORT_ROLLING
)
620 return B_NOT_ALLOWED
;
623 m_timeSourceObj
->Release();
625 m_timeSource
= timeSource
;
627 // cache a BTimeSource*
628 m_timeSourceObj
= m_manager
->roster
->MakeTimeSourceFor(timeSource
);
629 ASSERT(m_timeSourceObj
);
631 // apply new time source to all nodes
636 mem_fun(&NodeRef::_setTimeSource
),
641 // // try to set as sync node
642 // err = setSyncNode(timeSource);
645 // "* NodeGroup::setTimeSource(): setSyncNode() failed: %s\n",
651 ASSERT(!"LockLooper() failed.");
653 BMessage
m(M_TIME_SOURCE_CHANGED
);
654 m
.AddInt32("groupID", id());
655 m
.AddInt32("timeSourceID", timeSource
.node
);
663 // Sets the default run mode for the group. This will be
664 // applied to every node with a wildcard (0) run mode.
666 // Special case: if the run mode is B_OFFLINE, it will be
667 // applied to all nodes in the group.
669 BMediaNode::run_mode
NodeGroup::runMode() const {
674 status_t
NodeGroup::setRunMode(BMediaNode::run_mode mode
) {
679 // apply to all nodes
684 mem_fun(&NodeRef::_setRunModeAuto
),
689 // &NodeGroup::setRunModeFor)
696 // -------------------------------------------------------- //
698 // -------------------------------------------------------- //
700 void NodeGroup::MessageReceived(
704 // "NodeGroup::MessageReceived():\n"));
705 // message->PrintToStream();
708 switch(message
->what
) {
709 case M_SET_TIME_SOURCE
:
711 media_node timeSource
;
714 err
= message
->FindData(
721 "* NodeGroup::MessageReceived(M_SET_TIME_SOURCE):\n"
722 " no timeSourceNode!\n"));
725 timeSource
= *(media_node
*)data
;
727 setTimeSource(timeSource
);
734 err
= message
->FindInt32("runMode", (int32
*)&runMode
);
737 "* NodeGroup::MessageReceived(M_SET_RUN_MODE):\n"
742 if(runMode
< BMediaNode::B_OFFLINE
||
743 runMode
> BMediaNode::B_RECORDING
) {
745 "* NodeGroup::MessageReceived(M_SET_RUN_MODE):\n"
746 " invalid run mode (%" B_PRIu32
")\n", runMode
));
750 setRunMode((BMediaNode::run_mode
)runMode
);
754 case M_SET_START_POSITION
:
757 err
= message
->FindInt64("position", (int64
*)&position
);
760 "* NodeGroup::MessageReceived(M_SET_START_POSITION):\n"
764 setStartPosition(position
);
768 case M_SET_END_POSITION
:
771 err
= message
->FindInt64("position", (int64
*)&position
);
774 "* NodeGroup::MessageReceived(M_SET_END_POSITION):\n"
778 setEndPosition(position
);
799 _inherited::MessageReceived(message
);
805 // -------------------------------------------------------- //
807 // -------------------------------------------------------- //
815 // Default constructor
816 NodeGroup::NodeGroup() :
817 m_manager(0) {} // +++++ finish initialization
821 #endif /*CORTEX_XML*/
824 // -------------------------------------------------------- //
825 // *** IObservable: [19aug99]
826 // -------------------------------------------------------- //
828 void NodeGroup::observerAdded(
829 const BMessenger
& observer
) {
831 BMessage
m(M_OBSERVER_ADDED
);
832 m
.AddInt32("groupID", id());
833 m
.AddMessenger("target", BMessenger(this));
834 observer
.SendMessage(&m
);
837 void NodeGroup::observerRemoved(
838 const BMessenger
& observer
) {
840 BMessage
m(M_OBSERVER_REMOVED
);
841 m
.AddInt32("groupID", id());
842 m
.AddMessenger("target", BMessenger(this));
843 observer
.SendMessage(&m
);
846 void NodeGroup::notifyRelease() {
848 BMessage
m(M_RELEASED
);
849 m
.AddInt32("groupID", id());
850 m
.AddMessenger("target", BMessenger(this));
854 void NodeGroup::releaseComplete() {
858 // -------------------------------------------------------- //
859 // *** ILockable: pass lock requests to m_lock
860 // -------------------------------------------------------- //
862 bool NodeGroup::lock(
866 D_LOCK(("*** NodeGroup::lock(): %ld\n", find_thread(0)));
868 ASSERT(type
== WRITE
);
869 status_t err
= m_lock
.LockWithTimeout(timeout
);
871 D_LOCK(("*** NodeGroup::lock() ACQUIRED: %ld\n", find_thread(0)));
876 bool NodeGroup::unlock(
879 D_LOCK(("*** NodeGroup::unlock(): %ld\n", find_thread(0)));
881 ASSERT(type
== WRITE
);
884 D_LOCK(("*** NodeGroup::unlock() RELEASED: %ld\n", find_thread(0)));
889 bool NodeGroup::isLocked(
892 ASSERT(type
== WRITE
);
893 return m_lock
.IsLocked();
896 // -------------------------------------------------------- //
897 // *** ctor (accessible to NodeManager)
898 // -------------------------------------------------------- //
900 NodeGroup::NodeGroup(
902 NodeManager
* manager
,
903 BMediaNode::run_mode runMode
) :
905 ObservableHandler(name
),
906 m_lock("NodeGroup::m_lock"),
911 m_transportState(TRANSPORT_INVALID
),
917 m_startPosition(0LL),
924 if(!m_manager
->Lock()) {
925 ASSERT(!"m_manager->Lock() failed");
927 m_manager
->AddHandler(this);
930 // set default time source
932 D_ROSTER(("# roster->GetTimeSource()\n"));
933 status_t err
= m_manager
->roster
->GetTimeSource(&ts
);
936 "*** NodeGroup(): roster->GetTimeSource() failed:\n"
937 " %s\n", strerror(err
)));
942 // -------------------------------------------------------- //
943 // *** internal operations
944 // -------------------------------------------------------- //
946 uint32
NodeGroup::s_nextID
= 1;
947 uint32
NodeGroup::NextID() {
948 return atomic_add((int32
*)&s_nextID
, 1);
951 // -------------------------------------------------------- //
952 // *** ref->group communication (LOCK REQUIRED)
953 // -------------------------------------------------------- //
955 // when a NodeRef's cycle state (i.e. looping or not looping)
956 // changes, it must pass that information on via this method
958 void NodeGroup::_refCycleChanged(
962 "NodeGroup::_refCycleChanged('%s')\n",
968 _cycleRemoveRef(ref
);
971 // +++++ if running & cycle valid, the node should be properly
972 // seek'd and start'd
976 // when a cycling node's latency changes, call this method.
978 void NodeGroup::_refLatencyChanged(
982 "NodeGroup::_refLatencyChanged('%s')\n",
988 // remove & replace ref (positions it properly)
989 _cycleRemoveRef(ref
);
996 _CYCLE_LATENCY_CHANGED
,
1003 // when a NodeRef receives notification that it has been stopped,
1004 // but is labeled as still running, it must call this method.
1005 // [e.moon 11oct99: roll/B_OFFLINE support]
1007 void NodeGroup::_refStopped(
1009 assert_locked(this);
1011 "NodeGroup::_refStopped('%s')\n",
1014 // roll/B_OFFLINE support [e.moon 11oct99]
1015 // (check to see if any other nodes in the group are still running;
1016 // mark group stopped if not.)
1017 if(m_transportState
== TRANSPORT_ROLLING
) {
1018 bool nodesRunning
= false;
1019 for(node_set::iterator it
= m_nodes
.begin();
1020 it
!= m_nodes
.end(); ++it
) {
1021 if((*it
)->isRunning()) {
1022 nodesRunning
= true;
1027 // the group has stopped; update transport state
1028 _changeState(TRANSPORT_STOPPED
);
1035 // -------------------------------------------------------- //
1036 // *** transport helpers (LOCK REQUIRED)
1037 // -------------------------------------------------------- //
1040 // Preroll all nodes in the group; this is the implementation
1042 // *** this method should not be called from the transport thread
1043 // (since preroll operations can block for a relatively long time.)
1045 status_t
NodeGroup::_preroll() {
1046 assert_locked(this);
1049 "NodeGroup::_preroll()\n"));
1052 m_transportState
== TRANSPORT_RUNNING
||
1053 m_transportState
== TRANSPORT_ROLLING
)
1055 return B_NOT_ALLOWED
;
1057 // * preroll all nodes to the start position
1059 // +++++ currently, if an error is encountered it's ignored.
1060 // should the whole operation fail if one node couldn't
1063 // My gut response is 'no', since the preroll step is
1064 // optional, but the caller should have some inkling that
1065 // one of its nodes didn't behave.
1067 // [e.moon 13oct99] making PPC compiler happy
1072 // mem_fun(&NodeRef::_preroll),
1076 for(node_set::iterator it
= m_nodes
.begin();
1077 it
!= m_nodes
.end(); ++it
) {
1078 (*it
)->_preroll(m_startPosition
);
1083 // bound_method(*this, &NodeGroup::prerollNode),
1091 //// functor: calculates latency of each node it's handed, caching
1092 //// the largest one found; includes initial latency if nodes report it.
1094 //class NodeGroup::calcLatencyFn { public:
1095 // bigtime_t& maxLatency;
1097 // calcLatencyFn(bigtime_t& _m) : maxLatency(_m) {}
1099 // void operator()(NodeRef* r) {
1103 //// "# calcLatencyFn(): '%s'\n",
1106 // if(!(r->node().kind & B_BUFFER_PRODUCER)) {
1107 // // node can't incur latency
1109 //// "- not a producer\n"));
1113 // bigtime_t latency;
1115 // BMediaRoster::Roster()->GetLatencyFor(
1120 // "* calcLatencyFn: GetLatencyFor() failed: %s\n",
1124 //// PRINT(("- %Ld\n", latency));
1127 // err = BMediaRoster::Roster()->GetInitialLatencyFor(
1130 //// PRINT(("- %Ld\n", add));
1133 // "* calcLatencyFn: GetInitialLatencyFor() failed: %s\n",
1139 // if(latency > maxLatency)
1140 // maxLatency = latency;
1143 //// "- max latency: %Ld\n",
1148 // Start all nodes in the group; this is the implementation of
1149 // start(). Fails if the run mode is B_OFFLINE; use _roll() instead
1152 // (this may be called from the transport thread or from
1153 // an API-implementation method.)
1155 status_t
NodeGroup::_start() {
1156 assert_locked(this);
1159 "NodeGroup::_start()\n"));
1162 if(m_transportState
!= TRANSPORT_STOPPED
)
1163 return B_NOT_ALLOWED
;
1165 if(m_runMode
== BMediaNode::B_OFFLINE
)
1166 return B_NOT_ALLOWED
;
1168 ASSERT(m_nodes
.size());
1170 _changeState(TRANSPORT_STARTING
);
1172 // * Find the highest latency in the group
1174 bigtime_t offset
= 0LL;
1175 calcLatencyFn
_f(offset
);
1181 offset
+= s_rosterLatency
;
1183 "- offset: %" B_PRIdBIGTIME
"\n", offset
));
1185 // * Seek all nodes (in case one or more failed to preroll)
1187 for(node_set::iterator it
= m_nodes
.begin();
1188 it
!= m_nodes
.end(); ++it
) {
1189 err
= (*it
)->_seekStopped(m_startPosition
);
1192 "! NodeGroup('%s')::_start():\n"
1193 " ref('%s')->_seekStopped(%" B_PRIdBIGTIME
") failed:\n"
1195 name(), (*it
)->name(), m_startPosition
,
1202 // * Start all nodes, allowing for the max latency found
1204 ASSERT(m_timeSourceObj
);
1205 bigtime_t when
= m_timeSourceObj
->Now() + offset
;
1207 // 10aug99: initialize cycle (loop) settings
1214 for(node_set::iterator it
= m_nodes
.begin();
1215 it
!= m_nodes
.end(); ++it
) {
1216 err
= (*it
)->_start(when
);
1219 "! NodeGroup('%s')::_start():\n"
1220 " ref('%s')->_start(%" B_PRIdBIGTIME
") failed:\n"
1222 name(), (*it
)->name(), when
,
1230 _changeState(TRANSPORT_RUNNING
);
1234 // Stop all nodes in the group; this is the implementation of
1237 // (this may be called from the transport thread or from
1238 // an API-implementation method.)
1240 status_t
NodeGroup::_stop() {
1243 "NodeGroup::_stop()\n"));
1245 assert_locked(this);
1248 m_transportState
!= TRANSPORT_RUNNING
&&
1249 m_transportState
!= TRANSPORT_ROLLING
)
1250 return B_NOT_ALLOWED
;
1252 _changeState(TRANSPORT_STOPPING
);
1254 // * stop the cycle thread if need be
1255 _destroyCycleThread();
1258 // +++++ error reports would be nice
1263 mem_fun(&NodeRef::_stop
)
1266 // update transport state
1267 _changeState(TRANSPORT_STOPPED
);
1272 // Roll all nodes in the group; this is the implementation of
1275 // (this may be called from the transport thread or from
1276 // an API-implementation method.)
1278 status_t
NodeGroup::_roll() {
1281 "NodeGroup::_roll()\n"));
1282 assert_locked(this);
1285 if(m_transportState
!= TRANSPORT_STOPPED
)
1286 return B_NOT_ALLOWED
;
1288 bigtime_t period
= m_endPosition
- m_startPosition
;
1290 return B_NOT_ALLOWED
;
1292 _changeState(TRANSPORT_STARTING
);
1294 bigtime_t tpStart
= 0LL;
1295 bigtime_t tpStop
= period
;
1297 if(m_runMode
!= BMediaNode::B_OFFLINE
) {
1299 // * Find the highest latency in the group
1300 bigtime_t offset
= 0LL;
1301 calcLatencyFn
_f(offset
);
1307 offset
+= s_rosterLatency
;
1309 "- offset: %" B_PRIdBIGTIME
"\n", offset
));
1311 ASSERT(m_timeSourceObj
);
1312 tpStart
= m_timeSourceObj
->Now() + offset
;
1316 // * Roll all nodes; watch for errors
1317 bool allFailed
= true;
1320 node_set::iterator it
= m_nodes
.begin();
1321 it
!= m_nodes
.end(); ++it
) {
1323 status_t e
= (*it
)->_roll(
1335 _changeState(TRANSPORT_ROLLING
);
1341 // State transition; notify listeners
1342 // +++++ [18aug99] DANGER: should notification happen in the middle
1343 // of such an operation?
1344 inline void NodeGroup::_changeState(
1345 transport_state_t to
) {
1347 assert_locked(this);
1349 m_transportState
= to
;
1352 ASSERT(!"LockLooper() failed.");
1354 BMessage
m(M_TRANSPORT_STATE_CHANGED
);
1355 m
.AddInt32("groupID", id());
1356 m
.AddInt32("transportState", m_transportState
);
1361 // Enforce a state transition, and notify listeners
1362 inline void NodeGroup::_changeState(
1363 transport_state_t from
,
1364 transport_state_t to
) {
1366 assert_locked(this);
1367 ASSERT(m_transportState
== from
);
1373 // -------------------------------------------------------- //
1374 // *** cycle thread & helpers (LOCK REQUIRED)
1375 // -------------------------------------------------------- //
1377 // *** cycle port definitions
1379 const int32 _portLength
= 32;
1380 const char* const _portName
= "NodeGroup::m_cyclePort";
1381 const size_t _portMsgMaxSize
= 256;
1384 // set up the cycle thread (including its kernel port)
1385 status_t
NodeGroup::_initCycleThread() {
1386 assert_locked(this);
1389 "NodeGroup::_initCycleThread()\n"));
1392 // thread is still alive
1393 err
= _destroyCycleThread();
1399 m_cycleThreadDone
= false;
1400 m_cycleThread
= spawn_thread(
1402 "NodeGroup[cycleThread]",
1405 if(m_cycleThread
< B_OK
) {
1407 "* NodeGroup::_initCycleThread(): spawn_thread() failed:\n"
1409 strerror(m_cycleThread
)));
1410 return m_cycleThread
;
1414 return resume_thread(m_cycleThread
);
1417 // shut down the cycle thread/port
1418 status_t
NodeGroup::_destroyCycleThread() {
1419 assert_locked(this);
1422 "NodeGroup::_destroyCycleThread()\n"));
1427 if(!m_cycleThreadDone
) {
1429 ASSERT(m_cyclePort
);
1430 err
= write_port_etc(
1439 // bad thread. die, thread, die.
1441 "* NodeGroup::_destroyCycleThread(): port write failed; killing.\n"));
1442 delete_port(m_cyclePort
);
1444 kill_thread(m_cycleThread
);
1449 // the thread got the message; wait for it to quit
1451 while(wait_for_thread(m_cycleThread
, &err
) == B_INTERRUPTED
) {
1453 "! wait_for_thread(m_cycleThread, &err) == B_INTERRUPTED\n"));
1458 // it's up to the thread to close its port
1459 ASSERT(!m_cyclePort
);
1467 // 1) do the current positions specify a valid cycle region?
1468 // 2) are any nodes in the group cycle-enabled?
1470 bool NodeGroup::_cycleValid() {
1471 assert_locked(this);
1473 (m_transportState
== TRANSPORT_RUNNING
||
1474 m_transportState
== TRANSPORT_STARTING
) &&
1478 // initialize the cycle members (call when starting)
1480 void NodeGroup::_cycleInit(
1481 bigtime_t startTime
) {
1482 assert_locked(this);
1483 ASSERT(m_cycleNodes
.size() > 0);
1485 "NodeGroup::_cycleInit(%Ld)\n",
1488 // +++++ rescan latencies?
1490 // figure new boundary & deadline from region length
1491 bigtime_t cyclePeriod
= m_endPosition
- m_startPosition
;
1493 if(cyclePeriod
<= 0) {
1494 // cycle region is no longer valid
1495 m_cycleBoundary
= 0LL;
1496 m_cycleDeadline
= 0LL;
1498 // no no no -- deadlocks when the thread calls this method
1499 // // stop the thread
1500 // _destroyCycleThread();
1504 m_cycleStart
= startTime
;
1505 m_cycleBoundary
= startTime
+ cyclePeriod
;
1506 m_cycleDeadline
= m_cycleBoundary
- (m_cycleMaxLatency
+ s_rosterLatency
);
1510 // add a ref to the cycle set (in proper order, based on latency)
1511 void NodeGroup::_cycleAddRef(
1513 assert_locked(this);
1515 // make sure it's not already there
1517 m_cycleNodes
.begin(),
1519 ref
) == m_cycleNodes
.end());
1521 // [re]calc latency if 0
1523 ref
->_updateLatency();
1525 node_set::iterator it
;
1526 for(it
= m_cycleNodes
.begin();
1527 it
!= m_cycleNodes
.end(); ++it
) {
1528 if(ref
->m_latency
> (*it
)->m_latency
) {
1529 m_cycleNodes
.insert(it
, ref
);
1534 // not inserted? new ref belongs at the end
1535 if(it
== m_cycleNodes
.end())
1536 m_cycleNodes
.insert(it
, ref
);
1539 // remove a ref from the cycle set
1540 void NodeGroup::_cycleRemoveRef(
1542 assert_locked(this);
1544 node_set::iterator it
= find(
1545 m_cycleNodes
.begin(),
1548 ASSERT(it
!= m_cycleNodes
.end());
1549 m_cycleNodes
.erase(it
);
1552 bigtime_t
NodeGroup::_cycleBoundary() const {
1554 return m_cycleBoundary
;
1557 // cycle thread impl.
1559 status_t
NodeGroup::_CycleThread(void* user
) {
1560 ((NodeGroup
*)user
)->_cycleThread();
1564 void NodeGroup::_cycleThread() {
1568 int32 errorCount
= 0;
1570 // +++++ liability -- if the thread has to be killed, this buffer
1571 // won't be reclaimed
1572 char* msgBuffer
= new char[_portMsgMaxSize
];
1573 array_delete
<char> _d(msgBuffer
);
1576 ASSERT(!m_cyclePort
);
1577 m_cyclePort
= create_port(
1580 ASSERT(m_cyclePort
>= B_OK
);
1582 // the message-handling loop
1586 // *** wait until it's time to queue the next cycle, or until
1587 // *** a message arrives
1589 lock(); // **** BEGIN LOCKED SECTION ****
1590 if(!_cycleValid()) {
1595 ASSERT(m_cycleNodes
.size() > 0);
1596 ASSERT(m_timeSourceObj
);
1598 bigtime_t maxLatency
= m_cycleNodes
.front()->m_latency
;
1599 bigtime_t wakeUpAt
= m_timeSourceObj
->RealTimeFor(
1600 m_cycleBoundary
, maxLatency
+ s_rosterLatency
);
1601 bigtime_t timeout
= wakeUpAt
- m_timeSourceObj
->RealTime();
1604 // +++++ whoops, I'm late.
1605 // +++++ adjust to compensate !!!
1607 "*** NodeGroup::_cycleThread(): LATE\n"
1608 " by %" B_PRIdBIGTIME
"\n", -timeout
));
1611 // +++++ if timeout is very short, spin until the target time arrives
1613 unlock(); // **** END LOCKED SECTION ****
1615 // block until message arrives or it's time to wake up
1616 err
= read_port_etc(
1624 if(err
== B_TIMED_OUT
) {
1625 // the time has come to seek my nodes
1626 _handleCycleService();
1629 else if(err
< B_OK
) {
1630 // any other error is bad news
1632 "* NodeGroup::_cycleThread(): read_port error:\n"
1634 " ABORTING\n\n", strerror(err
)));
1635 if(++errorCount
> 10) {
1637 "*** Too many errors; aborting.\n"));
1645 // process the message
1652 case _CYCLE_END_CHANGED
:
1653 case _CYCLE_LATENCY_CHANGED
:
1654 // fall through to next loop; for now, these messages
1655 // serve only to slap me out of my stupor and reassess
1656 // the timing situation...
1661 "* NodeGroup::_cycleThread(): unknown message code '%"
1662 B_PRId32
"'\n", code
));
1669 delete_port(m_cyclePort
);
1673 m_cycleThreadDone
= true;
1676 // cycle service: seek all nodes & initiate next cycle
1677 void NodeGroup::_handleCycleService() {
1680 // "NodeGroup::_handleCycleService()\n"));
1683 if(!_cycleValid()) {
1685 // "- _handleCycleService(): cycle not valid; quitting.\n"));
1690 for(node_set::iterator it
= m_cycleNodes
.begin();
1691 it
!= m_cycleNodes
.end(); ++it
) {
1697 "- _handleCycleService(): node('%s')::_seek() failed:\n"
1699 (*it
)->name(), strerror(err
)));
1703 // update cycle settings
1706 m_startPosition
= m_newStartPosition
;
1710 m_endPosition
= m_newEndPosition
;
1713 // prepare next cycle
1714 _cycleInit(m_cycleBoundary
);
1717 // END -- NodeGroup.cpp --