Cleanup ACE_HAS_PTHREAD_SIGMASK_PROTOTYPE, all platforms support it so far as I can...
[ACE_TAO.git] / ACE / protocols / ace / RMCast / Acknowledge.cpp
blobbd617f220fea13fd9fe33d7124cf37440ba4dca9
1 // author : Boris Kolpackov <boris@kolpackov.net>
2 #include "ace/Time_Value.h" // ACE_Time_Value
3 #include "ace/OS_NS_unistd.h"
4 #include "ace/OS_NS_stdlib.h" // abort
5 #include "ace/OS_NS_sys_time.h" // gettimeofday
6 #include "Acknowledge.h"
7 #include <memory>
9 namespace ACE_RMCast
11 Acknowledge::
12 Acknowledge (Parameters const& params)
13 : params_ (params),
14 hold_ (params.addr_map_size ()),
15 cond_ (mutex_),
16 nrtm_timer_ (params_.nrtm_timeout ()),
17 stop_ (false)
21 void Acknowledge::
22 in_start (In_Element* in)
24 Element::in_start (in);
27 void Acknowledge::
28 out_start (Out_Element* out)
30 Element::out_start (out);
32 tracker_mgr_.spawn (track_thunk, this);
35 void Acknowledge::
36 out_stop ()
39 Lock l (mutex_);
40 stop_ = true;
41 cond_.signal ();
44 tracker_mgr_.wait ();
46 Element::out_stop ();
49 void Acknowledge::
50 collapse (Queue& q)
52 // I would normally use iterators in the logic below but ACE_Map_Manager
53 // iterates over entries in no particular order so it is pretty much
54 // unusable here. Instead we will do slow and cumbersome find's.
57 u64 sn (q.sn () + 1);
59 for (;; ++sn)
61 Queue::ENTRY* e = 0;
63 if (q.find (sn, e) == -1 || e->int_id_.lost ()) break;
65 Message_ptr m (e->int_id_.msg ());
66 q.unbind (sn);
68 in_->recv (m);
71 q.sn (sn - 1);
74 void Acknowledge::
75 track ()
77 while (true)
79 Messages msgs;
82 Lock l (mutex_);
84 if (stop_)
85 break;
87 if (hold_.current_size () != 0)
89 for (Map::iterator i (hold_.begin ()), e (hold_.end ());
90 i != e;
91 ++i)
93 Queue& q = (*i).int_id_;
95 if (q.current_size () == 0) continue;
97 track_queue ((*i).ext_id_, q, msgs);
101 if (--nrtm_timer_ == 0)
103 nrtm_timer_ = params_.nrtm_timeout ();
105 // Send NRTM.
107 unsigned short max_payload_size (
108 params_.max_packet_size () - max_service_size);
110 u32 max_elem (NRTM::max_count (max_payload_size));
112 Profile_ptr nrtm (create_nrtm (max_elem));
114 if (!nrtm.null ())
116 Message_ptr m (new Message);
117 m->add (nrtm);
118 msgs.push_back (m);
123 // Send stuff off.
125 for (Messages::Iterator i (msgs); !i.done (); i.advance ())
127 Message_ptr* ppm;
128 i.next (ppm);
130 //FUZZ: disable check_for_lack_ACE_OS
131 send (*ppm);
132 //FUZZ: enable check_for_lack_ACE_OS
135 // Go to sleep but watch for "manual cancellation" request.
138 //FUZZ: disable check_for_lack_ACE_OS
139 ACE_Time_Value time (ACE_OS::gettimeofday ());
140 //FUZZ: enable check_for_lack_ACE_OS
142 time += params_.tick ();
144 Lock l (mutex_);
146 while (!stop_)
148 if (cond_.wait (&time) == -1)
150 if (errno != ETIME)
151 ACE_OS::abort ();
152 else
153 break;
157 if (stop_)
158 break;
163 void Acknowledge::
164 track_queue (Address const& addr, Queue& q, Messages& msgs)
166 unsigned short max_payload_size (
167 params_.max_packet_size () - max_service_size);
169 u32 max_elem (NAK::max_count (max_payload_size));
170 u32 count (0);
172 Queue::iterator i (q.begin ()), e (q.end ());
174 // Track existing losses.
176 while (i != e)
178 std::unique_ptr<NAK> nak (new NAK (addr));
180 // Inner loop that fills NAK profile with up to max_elem elements.
182 for (; i != e && nak->count () < max_elem; ++i)
184 u64 sn ((*i).ext_id_);
185 Descr& d = (*i).int_id_;
187 if (d.lost ())
189 d.timer (d.timer () - 1);
191 if (d.timer () == 0)
193 //@@ Need exp fallback.
195 d.nak_count (d.nak_count () + 1);
196 d.timer ((d.nak_count () + 1) * params_.nak_timeout ());
198 nak->add (sn);
200 ++count;
202 // cerr << 6 << "NAK # " << d.nak_count () << ": "
203 // << addr << " " << sn << endl;
208 // Send this NAK.
210 if (nak->count ())
212 // cerr << 5 << "NAK: " << addr << " " << nak->count () << " sns"
213 // << endl;
215 Message_ptr m (new Message);
217 m->add (Profile_ptr (nak.release ()));
219 msgs.push_back (m);
223 // Detect and record new losses.
225 for (u64 sn (q.sn () + 1), end (q.max_sn ()); sn < end; ++sn)
227 if (q.find (sn) == -1)
229 q.bind (sn, Descr (1));
234 void Acknowledge::recv (Message_ptr m)
236 // Handle NRTM. There could be some nasty interaction with code
237 // that handles data below (like missing message and NAK). This
238 // is why I hold the lock at the beginning (which may be not very
239 // efficient).
241 Lock l (mutex_);
243 if (NRTM const* nrtm = static_cast<NRTM const*> (m->find (NRTM::id)))
245 for (Map::iterator i (hold_.begin ()), e (hold_.end ()); i != e; ++i)
247 u64 sn (nrtm->find ((*i).ext_id_));
249 if (sn != 0)
251 Queue& q = (*i).int_id_;
253 u64 old (q.max_sn ());
255 if (old < sn)
257 // Mark as lost.
259 q.bind (sn, Descr (1));
265 if (m->find (Data::id) || m->find (NoData::id))
267 Address from (
268 static_cast<From const*> (m->find (From::id))->address ());
270 u64 sn (static_cast<SN const*> (m->find (SN::id))->num ());
272 Map::ENTRY* e = 0;
274 if (hold_.find (from, e) == -1)
276 // First message from this source.
278 hold_.bind (from, Queue (sn));
279 in_->recv (m);
281 else
283 Queue& q = e->int_id_;
285 if (sn <= q.sn ())
287 // Duplicate.
289 //cerr << 6 << "DUP " << from << " " << q.sn () << " >= " << sn
290 // << endl;
292 else if (sn == q.sn () + 1)
294 // Next message.
297 q.rebind (sn, Descr (m));
298 collapse (q);
300 else
302 // Some messages are missing. Insert this one into the queue.
304 q.rebind (sn, Descr (m));
308 else
310 l.release ();
312 // Just forward it up.
314 in_->recv (m);
318 void Acknowledge::send (Message_ptr m)
320 if (Data const* data = static_cast<Data const*> (m->find (Data::id)))
322 size_t max_payload_size (
323 params_.max_packet_size () - max_service_size);
325 if (max_payload_size > data->size ())
327 u32 max_size (max_payload_size - data->size ());
328 u32 max_elem (NRTM::max_count (max_size));
330 if (max_elem > 0)
332 Lock l (mutex_);
334 Profile_ptr nrtm (create_nrtm (max_elem));
336 if (nrtm.get ())
337 m->add (nrtm);
341 nrtm_timer_ = params_.nrtm_timeout (); // Reset timer.
344 out_->send (m);
347 Profile_ptr Acknowledge::
348 create_nrtm (u32 max_elem)
350 // Prepare NRTM.
352 std::unique_ptr<NRTM> nrtm (new NRTM ());
354 // Gather the information.
357 for (Map::iterator i (hold_.begin ()), e (hold_.end ()); i != e; ++i)
359 Address addr ((*i).ext_id_);
360 Queue& q = (*i).int_id_;
362 //@@ Should look for the highest known number.
364 nrtm->insert (addr, q.sn ());
366 if (--max_elem == 0)
367 break;
371 if (nrtm->empty ())
372 return Profile_ptr (0);
373 else
374 return Profile_ptr (nrtm.release ());
377 ACE_THR_FUNC_RETURN Acknowledge::
378 track_thunk (void* obj)
380 reinterpret_cast<Acknowledge*> (obj)->track ();
381 return 0;