1 // author : Boris Kolpackov <boris@kolpackov.net>
2 #include "ace/Time_Value.h" // ACE_Time_Value
3 #include "ace/OS_NS_unistd.h"
4 #include "ace/OS_NS_stdlib.h" // abort
5 #include "ace/OS_NS_sys_time.h" // gettimeofday
6 #include "Acknowledge.h"
// Constructor (fragment: only part of the member-initializer list is
// visible in this excerpt). hold_ is sized from params.addr_map_size ()
// and nrtm_timer_ starts at params_.nrtm_timeout ().
// NOTE(review): nrtm_timer_ reads the member params_, not the argument
// params -- presumably params_ is initialized earlier in the list; confirm
// against the member declaration order in Acknowledge.h.
12 Acknowledge (Parameters
const& params
)
14 hold_ (params
.addr_map_size ()),
16 nrtm_timer_ (params_
.nrtm_timeout ()),
// Forwards the In_Element pointer to the base class (Element::in_start).
// No other work is visible in this excerpt.
22 in_start (In_Element
* in
)
24 Element::in_start (in
);
// Forwards the Out_Element pointer to the base class (Element::out_start)
// and then asks tracker_mgr_ to spawn track_thunk with `this` as its
// argument.
// NOTE(review): the return value of spawn () is not checked in the visible
// code -- confirm whether a failed spawn needs handling.
28 out_start (Out_Element
* out
)
30 Element::out_start (out
);
32 tracker_mgr_
.spawn (track_thunk
, this);
// NOTE(review): the header of the enclosing function and the loop that
// advances sn are not part of this excerpt.
52 // I would normally use iterators in the logic below but ACE_Map_Manager
53 // iterates over entries in no particular order so it is pretty much
54 // unusable here. Instead we will do slow and cumbersome find () calls.
// Stop at the first sequence number that is either absent from the queue
// (find () returns -1) or whose descriptor reports lost ().
63 if (q
.find (sn
, e
) == -1 || e
->int_id_
.lost ()) break;
// Otherwise take the message stored in the entry that was found.
65 Message_ptr
m (e
->int_id_
.msg ());
// Periodic tracking pass (fragment).
// NOTE(review): the enclosing function header, locking and loop structure
// are not visible in this excerpt; presumably this is the body of track (),
// which track_thunk invokes -- confirm.
//
// Visible steps:
//   1. walk every source queue in hold_ and hand non-empty ones to
//      track_queue ();
//   2. count down nrtm_timer_ and, when it reaches zero, re-arm it and
//      build an NRTM profile;
//   3. iterate over the collected messages;
//   4. wait on cond_ with a deadline one tick from now.
// Only walk the map when at least one source is registered.
87 if (hold_
.current_size () != 0)
89 for (Map::iterator
i (hold_
.begin ()), e (hold_
.end ());
93 Queue
& q
= (*i
).int_id_
;
// Sources whose queue holds no entries are skipped.
95 if (q
.current_size () == 0) continue;
// ext_id_ is the map key, passed as the address argument; msgs is passed
// by reference (see the track_queue signature).
97 track_queue ((*i
).ext_id_
, q
, msgs
);
// Pre-decrement, then compare: the timer fires on the pass that brings it
// to zero, and is immediately re-armed from the configured timeout.
101 if (--nrtm_timer_
== 0)
103 nrtm_timer_
= params_
.nrtm_timeout ();
// Payload budget = packet size minus max_service_size.
// NOTE(review): stored in an unsigned short -- assumes max_packet_size ()
// exceeds max_service_size and the difference fits; confirm.
107 unsigned short max_payload_size (
108 params_
.max_packet_size () - max_service_size
);
// Number of NRTM entries that fit into that budget.
110 u32
max_elem (NRTM::max_count (max_payload_size
));
112 Profile_ptr
nrtm (create_nrtm (max_elem
));
116 Message_ptr
m (new Message
);
// Walk the messages collected during this pass.
125 for (Messages::Iterator
i (msgs
); !i
.done (); i
.advance ())
130 //FUZZ: disable check_for_lack_ACE_OS
132 //FUZZ: enable check_for_lack_ACE_OS
135 // Go to sleep but watch for "manual cancellation" request.
138 //FUZZ: disable check_for_lack_ACE_OS
139 ACE_Time_Value
time (ACE_OS::gettimeofday ());
140 //FUZZ: enable check_for_lack_ACE_OS
// Deadline = current time of day plus one tick.
142 time
+= params_
.tick ();
// NOTE(review): -1 presumably signals timeout or error per the ACE
// condition-variable convention; the handling is not visible here.
148 if (cond_
.wait (&time
) == -1)
// Processes the queue `q` of the source identified by `addr` (fragment:
// braces and several statements are missing from this excerpt).
//
// Two phases are visible:
//   1. "Track existing losses": iterate over the queue entries, adjust
//      each descriptor's timer / NAK count, and fill NAK profiles holding
//      at most max_elem entries each; a NAK profile is wrapped in a new
//      Message.
//   2. "Detect and record new losses": for every sequence number in
//      [q.sn () + 1, q.max_sn ()) that is not yet in the queue, bind a
//      Descr (1).
//
// `msgs` is taken by non-const reference.
// NOTE(review): presumably the NAK messages are appended to msgs -- the
// statement doing so is not visible here; confirm.
164 track_queue (Address
const& addr
, Queue
& q
, Messages
& msgs
)
// Payload budget = packet size minus max_service_size (same computation
// as used for NRTM elsewhere in this file).
166 unsigned short max_payload_size (
167 params_
.max_packet_size () - max_service_size
);
// Maximum number of sequence numbers one NAK profile can carry.
169 u32
max_elem (NAK::max_count (max_payload_size
));
// i is declared outside the inner loop, so it keeps its position across
// inner-loop runs.
172 Queue::iterator
i (q
.begin ()), e (q
.end ());
174 // Track existing losses.
// unique_ptr owns the NAK profile until it is released into a Profile_ptr.
178 std::unique_ptr
<NAK
> nak (new NAK (addr
));
180 // Inner loop that fills NAK profile with up to max_elem elements.
182 for (; i
!= e
&& nak
->count () < max_elem
; ++i
)
// ext_id_ is the entry key (sequence number); int_id_ is its descriptor.
184 u64
sn ((*i
).ext_id_
);
185 Descr
& d
= (*i
).int_id_
;
// Decrement this entry's timer by one.
// NOTE(review): the condition guarding this statement is not visible.
189 d
.timer (d
.timer () - 1);
193 //@@ Need exp fallback.
// Bump the NAK count, then re-arm the timer to
// (nak_count + 1) * nak_timeout: the delay grows linearly with the number
// of NAKs, which is what the "exp fallback" note above refers to.
195 d
.nak_count (d
.nak_count () + 1);
196 d
.timer ((d
.nak_count () + 1) * params_
.nak_timeout ());
202 // cerr << 6 << "NAK # " << d.nak_count () << ": "
203 // << addr << " " << sn << endl;
212 // cerr << 5 << "NAK: " << addr << " " << nak->count () << " sns"
// Wrap the NAK profile in a fresh message; release () transfers ownership
// from the unique_ptr to the Profile_ptr.
215 Message_ptr
m (new Message
);
217 m
->add (Profile_ptr (nak
.release ()));
223 // Detect and record new losses.
// The upper bound is exclusive: q.max_sn () itself is not visited.
225 for (u64
sn (q
.sn () + 1), end (q
.max_sn ()); sn
< end
; ++sn
)
// find () == -1 means there is no entry for this sequence number yet.
227 if (q
.find (sn
) == -1)
// NOTE(review): Descr (1) presumably creates a "lost" descriptor with an
// initial timer of 1 -- confirm in Acknowledge.h.
229 q
.bind (sn
, Descr (1));
// Handles a received message `m` (fragment: braces, locking and several
// statements are missing from this excerpt).
//
// Visible logic:
//   * If m carries an NRTM profile, every source in hold_ is looked up in
//     it and the corresponding queue is updated (a Descr (1) is bound for
//     the reported sequence number).
//   * If m carries a Data or NoData profile, the sender address (From) and
//     sequence number (SN) are read. The first message from a source
//     creates a new Queue (sn); otherwise the message is stored in the
//     existing queue via rebind ().
//   * Anything else is forwarded up (per the final comment).
234 void Acknowledge::recv (Message_ptr m
)
236 // Handle NRTM. There could be some nasty interaction with code
237 // that handles data below (like missing message and NAK). This
238 // is why I hold the lock at the beginning (which may not be very
// A null result from find () skips the whole NRTM branch.
243 if (NRTM
const* nrtm
= static_cast<NRTM
const*> (m
->find (NRTM::id
)))
245 for (Map::iterator
i (hold_
.begin ()), e (hold_
.end ()); i
!= e
; ++i
)
// Sequence number the NRTM reports for this source (ext_id_ is the map key).
// NOTE(review): the value returned when the source is absent from the
// NRTM is not visible here -- confirm in the NRTM class.
247 u64
sn (nrtm
->find ((*i
).ext_id_
));
251 Queue
& q
= (*i
).int_id_
;
// Remember the previous maximum before the queue is modified.
253 u64
old (q
.max_sn ());
// NOTE(review): Descr (1) presumably marks sn as lost with an initial
// timer of 1 -- confirm in Acknowledge.h.
259 q
.bind (sn
, Descr (1));
265 if (m
->find (Data::id
) || m
->find (NoData::id
))
// NOTE(review): the From and SN lookups below are dereferenced without a
// null check -- assumes every Data/NoData message carries both profiles;
// confirm.
268 static_cast<From
const*> (m
->find (From::id
))->address ());
270 u64
sn (static_cast<SN
const*> (m
->find (SN::id
))->num ());
// find () == -1: no queue exists yet for this sender.
274 if (hold_
.find (from
, e
) == -1)
276 // First message from this source.
278 hold_
.bind (from
, Queue (sn
));
283 Queue
& q
= e
->int_id_
;
289 //cerr << 6 << "DUP " << from << " " << q.sn () << " >= " << sn
// The message is the next one in sequence for this source.
292 else if (sn
== q
.sn () + 1)
297 q
.rebind (sn
, Descr (m
));
302 // Some messages are missing. Insert this one into the queue.
304 q
.rebind (sn
, Descr (m
));
312 // Just forward it up.
// Handles an outgoing message `m` (fragment: braces and several statements
// are missing from this excerpt).
//
// Visible logic: when m carries a Data profile and the data is smaller
// than the payload budget, an NRTM profile sized to the remaining space is
// created, and nrtm_timer_ is re-armed from params_.nrtm_timeout ().
// NOTE(review): presumably the NRTM profile is attached to m -- the
// statement doing so is not visible here; confirm.
318 void Acknowledge::send (Message_ptr m
)
// A null result from find () means m carries no Data profile.
320 if (Data
const* data
= static_cast<Data
const*> (m
->find (Data::id
)))
// Payload budget = packet size minus max_service_size. Declared size_t
// here, whereas the same expression is an unsigned short elsewhere in
// this file.
322 size_t max_payload_size (
323 params_
.max_packet_size () - max_service_size
);
// Strict comparison: guarantees the subtraction below cannot underflow and
// that at least one byte remains.
325 if (max_payload_size
> data
->size ())
// Space left after the data, then the number of NRTM entries that fit in it.
327 u32
max_size (max_payload_size
- data
->size ());
328 u32
max_elem (NRTM::max_count (max_size
));
334 Profile_ptr
nrtm (create_nrtm (max_elem
));
341 nrtm_timer_
= params_
.nrtm_timeout (); // Reset timer.
// Builds an NRTM profile from the current contents of hold_ (fragment:
// braces and several statements are missing from this excerpt).
//
// For each source in hold_, the pair (address, q.sn ()) is inserted into
// the profile. Two return paths are visible: a null Profile_ptr and the
// populated profile.
// NOTE(review): presumably max_elem caps the number of inserted entries
// and the null profile is returned when nothing was inserted -- neither
// check is visible here; confirm.
347 Profile_ptr
Acknowledge::
348 create_nrtm (u32 max_elem
)
// unique_ptr owns the profile until it is released into the Profile_ptr.
352 std::unique_ptr
<NRTM
> nrtm (new NRTM ());
354 // Gather the information.
357 for (Map::iterator
i (hold_
.begin ()), e (hold_
.end ()); i
!= e
; ++i
)
// ext_id_ is the map key (copied into addr); int_id_ is that source's queue.
359 Address
addr ((*i
).ext_id_
);
360 Queue
& q
= (*i
).int_id_
;
362 //@@ Should look for the highest known number.
// Currently reports q.sn () for the source (see the @@ note above).
364 nrtm
->insert (addr
, q
.sn ());
372 return Profile_ptr (0);
// release () transfers ownership to the returned Profile_ptr.
374 return Profile_ptr (nrtm
.release ());
// Thread entry thunk: casts the opaque `obj` argument to an Acknowledge
// pointer and calls track () on it. out_start () passes `this` as that
// argument when spawning this function.
// NOTE(review): the return statement is not visible in this excerpt.
377 ACE_THR_FUNC_RETURN
Acknowledge::
378 track_thunk (void* obj
)
380 reinterpret_cast<Acknowledge
*> (obj
)->track ();