Cleanup ACE_HAS_PTHREAD_SIGMASK_PROTOTYPE, all platforms support it so far as I can...
[ACE_TAO.git] / ACE / protocols / ace / RMCast / Retransmit.cpp
blobc2c682d98780e2d1c03709d6153ccf206f775902
1 // author : Boris Kolpackov <boris@kolpackov.net>
2 #include "ace/Time_Value.h" // ACE_Time_Value
3 #include "ace/OS_NS_stdlib.h" // abort
4 #include "ace/OS_NS_sys_time.h" // gettimeofday
6 #include "Retransmit.h"
8 namespace ACE_RMCast
10 Retransmit::
11 Retransmit (Parameters const& params)
12 : params_ (params),
13 cond_ (mutex_),
14 stop_ (false)
18 void Retransmit::
19 out_start (Out_Element* out)
21 Element::out_start (out);
23 tracker_mgr_.spawn (track_thunk, this);
26 void Retransmit::
27 out_stop ()
30 Lock l (mutex_);
31 stop_ = true;
32 cond_.signal ();
35 tracker_mgr_.wait ();
37 Element::out_stop ();
40 void Retransmit::send (Message_ptr m)
42 if (m->find (Data::id) != 0)
44 SN const* sn = static_cast<SN const*> (m->find (SN::id));
46 Lock l (mutex_);
47 queue_.bind (sn->num (), Descr (m->clone ()));
50 out_->send (m);
53 void Retransmit::recv (Message_ptr m)
55 if (NAK const* nak = static_cast<NAK const*> (m->find (NAK::id)))
57 Address to (static_cast<To const*> (m->find (To::id))->address ());
59 if (nak->address () == to)
61 Lock l (mutex_);
63 for (NAK::iterator j (const_cast<NAK*> (nak)->begin ());
64 !j.done ();
65 j.advance ())
67 u64* psn;
68 j.next (psn);
70 Message_ptr m;
72 Queue::ENTRY* pair;
74 if (queue_.find (*psn, pair) == 0)
76 //cerr << 5 << "PRTM " << to << " " << pair->ext_id_ << endl;
78 m = pair->int_id_.message ();
80 pair->int_id_.reset ();
82 else
84 //cerr << 4 << "message " << *psn << " not available" << endl;
86 m = Message_ptr (new Message);
87 m->add (Profile_ptr (new SN (*psn)));
88 m->add (Profile_ptr (new NoData));
91 out_->send (m);
96 in_->recv (m);
99 ACE_THR_FUNC_RETURN Retransmit::
100 track_thunk (void* obj)
102 reinterpret_cast<Retransmit*> (obj)->track ();
103 return 0;
106 void Retransmit::
107 track ()
109 while (true)
111 Lock l (mutex_);
113 for (Queue::iterator i (queue_); !i.done ();)
115 if ((*i).int_id_.inc () >= params_.retention_timeout ())
117 u64 sn ((*i).ext_id_);
118 i.advance ();
119 queue_.unbind (sn);
121 else
123 i.advance ();
127 //FUZZ: disable check_for_lack_ACE_OS
128 // Go to sleep but watch for "manual cancellation" request.
130 ACE_Time_Value time (ACE_OS::gettimeofday ());
131 //FUZZ: enable check_for_lack_ACE_OS
133 time += params_.tick ();
135 while (!stop_)
137 if (cond_.wait (&time) == -1)
139 if (errno != ETIME)
140 ACE_OS::abort ();
141 else
142 break;
146 if (stop_)
147 break;