1 // author : Boris Kolpackov <boris@kolpackov.net>
2 #include "ace/Time_Value.h" // ACE_Time_Value
3 #include "ace/OS_NS_stdlib.h" // abort
4 #include "ace/OS_NS_sys_time.h" // gettimeofday
6 #include "Retransmit.h"
// Appears to be the Retransmit constructor: it takes a Parameters object by
// const reference.
// NOTE(review): the member-initializer list and body are not visible in this
// excerpt, so what is done with params cannot be stated from here.
11 Retransmit (Parameters
const& params
)
// out_start: receives a pointer to an Out_Element.
19 out_start (Out_Element
* out
)
// Pass the element on to Element::out_start first.
21 Element::out_start (out
);
// Then ask tracker_mgr_ to spawn track_thunk, handing it `this` as the
// opaque argument (track_thunk casts it back to Retransmit*).
23 tracker_mgr_
.spawn (track_thunk
, this);
// Retransmit::send: for a message on which find (Data::id) returns non-zero,
// stores a clone of the message in queue_ keyed by the number read from its
// SN entry.
// NOTE(review): whether the message is also forwarded further is not visible
// in this excerpt.
40 void Retransmit::send (Message_ptr m
)
// Only messages that contain a Data entry are considered.
42 if (m
->find (Data::id
) != 0)
// Look up the SN entry of the same message.
// NOTE(review): the pointer is dereferenced below without a null check —
// presumably every Data message also carries SN; confirm.
44 SN
const* sn
= static_cast<SN
const*> (m
->find (SN::id
));
// Bind sn->num () -> Descr (clone of m) in queue_; the clone (not m itself)
// is what gets stored.
47 queue_
.bind (sn
->num (), Descr (m
->clone ()));
// Retransmit::recv: reacts to messages that contain a NAK entry. When the
// NAK's address equals the address in the message's To entry, it walks the
// NAK's entries and, per entry, either takes the stored message from queue_
// or builds a new message carrying SN and NoData.
// NOTE(review): several original lines are missing from this excerpt (loop
// body start, the definition of psn, the else branch, what is done with m
// afterwards), so the description covers only the visible statements.
53 void Retransmit::recv (Message_ptr m
)
// Proceed only if the message has a NAK entry (find returns non-null).
55 if (NAK
const* nak
= static_cast<NAK
const*> (m
->find (NAK::id
)))
// Read the address out of the To entry.
// NOTE(review): the result of find (To::id) is dereferenced without a null
// check — presumably To is always present on received messages; confirm.
57 Address
to (static_cast<To
const*> (m
->find (To::id
))->address ());
// Act only when the NAK's address is the same as the To address.
59 if (nak
->address () == to
)
// Iterate over the NAK starting at begin (); constness is cast away from nak
// in order to call begin () on it.
63 for (NAK::iterator
j (const_cast<NAK
*> (nak
)->begin ());
// Look up *psn in queue_, receiving the entry through `pair`.
// NOTE(review): psn is defined in lines not shown here; a return of 0 is
// presumably "found" — confirm against the queue type's find () contract.
74 if (queue_
.find (*psn
, pair
) == 0)
76 //cerr << 5 << "PRTM " << to << " " << pair->ext_id_ << endl;
// Reuse m for the message held by the queue entry.
78 m
= pair
->int_id_
.message ();
// Call reset () on the entry's descriptor.
// NOTE(review): presumably restarts the counter that inc () advances — confirm
// in Descr.
80 pair
->int_id_
.reset ();
84 //cerr << 4 << "message " << *psn << " not available" << endl;
// Build a fresh message carrying SN (*psn) and a NoData entry; judging by the
// commented-out trace above this looks like the "not available" path — the
// else keyword itself is not visible in this excerpt.
86 m
= Message_ptr (new Message
);
87 m
->add (Profile_ptr (new SN (*psn
)));
88 m
->add (Profile_ptr (new NoData
));
// Retransmit::track_thunk: thread entry thunk with a void* argument. It is
// passed to tracker_mgr_.spawn () together with `this` in out_start.
// NOTE(review): the return statement is not visible in this excerpt.
99 ACE_THR_FUNC_RETURN
Retransmit::
100 track_thunk (void* obj
)
// Cast the opaque argument back to Retransmit* and run track () on it.
102 reinterpret_cast<Retransmit
*> (obj
)->track ();
// NOTE(review): the enclosing function header is not visible in this excerpt;
// presumably this is the body of Retransmit::track (), which track_thunk
// calls — confirm.
// Walk queue_ with a Queue::iterator until done (). The for header has no
// increment expression, so the iterator must be advanced inside the body
// (not visible here).
113 for (Queue::iterator
i (queue_
); !i
.done ();)
// Call inc () on the entry's descriptor and compare the result with
// params_.retention_timeout ().
115 if ((*i
).int_id_
.inc () >= params_
.retention_timeout ())
// Copy the entry's key into a local u64.
// NOTE(review): presumably so the entry can be removed from queue_ by key;
// the removal itself is not visible in this excerpt.
117 u64
sn ((*i
).ext_id_
);
127 //FUZZ: disable check_for_lack_ACE_OS
128 // Go to sleep but watch for "manual cancellation" request.
// Start from the current time of day ...
130 ACE_Time_Value
time (ACE_OS::gettimeofday ());
131 //FUZZ: enable check_for_lack_ACE_OS
// ... and add params_.tick () to it, giving the time value passed to wait ().
133 time
+= params_
.tick ();
// Wait on cond_ with that time value; -1 is the result tested for.
// NOTE(review): presumably -1 covers the timeout case (i.e. no cancellation
// was signalled during the tick) — confirm against the condition type's
// wait () contract. The handling branch is past the end of this excerpt.
137 if (cond_
.wait (&time
) == -1)