1 // author : Boris Kolpackov <boris@kolpackov.net>
5 #include "ace/OS_NS_unistd.h" // sleep
6 #include "ace/OS_NS_sys_time.h" // gettimeofday
8 #include "ace/os_include/os_math.h" // exp
// Flow constructor. The Parameters argument is unnamed, so it is not used
// by the code visible here. Only one member initialiser is visible in this
// listing: sample_start_time_ starts at (0, 0), the same value send ()
// compares against before recording the start of a throughput sample.
13 Flow (Parameters
const& )
16 sample_start_time_ (0, 0),
// Outgoing-path throttle. For a message that carries a Data element, adds
// the element's size to the running throughput sample and, when the measured
// throughput exceeds a non-zero cap, relaxes the cap and sleeps for a time
// proportional to the overshoot.
// NOTE(review): this listing omits some lines of the function (for example
// the target of the throughput assignment and the declaration of `time`);
// confirm the details against the complete source.
23 void Flow::send (Message_ptr m
)
// Only messages for which find (Data::id) returns non-null are measured.
25 if (Data
const* data
= static_cast<Data
const*> (m
->find (Data::id
)))
// Single timestamp reused for both the sample and the cap decay below.
27 ACE_Time_Value
now_time (ACE_OS::gettimeofday ());
// Accumulate the payload size into the current sample.
30 sample_bytes_
+= data
->size ();
// A start time of (0, 0) means no sample is in progress: begin one now.
32 if (sample_start_time_
== ACE_Time_Value (0, 0))
34 sample_start_time_
= now_time
;
// Otherwise measure how long the current sample has been running.
38 ACE_Time_Value
delta (now_time
- sample_start_time_
);
// Once the sample is longer than (0 sec, 2000 usec), compute bytes divided
// by elapsed microseconds. Presumably the result is stored in current_tput_
// (the assignment target is not visible in this listing) -- TODO confirm.
40 if (delta
> ACE_Time_Value (0, 2000))
43 double (sample_bytes_
) / (delta
.sec () * 1000000 + delta
.usec ());
45 // cerr << "tput: " << current_tput_ << " bytes/usec" << endl;
// Reset the start time so that the next call begins a fresh sample.
48 sample_start_time_
= ACE_Time_Value (0, 0);
// Throttle only when a cap is set (non-zero), a throughput has been
// measured (non-zero) and that throughput is above the cap.
52 if (!ACE::is_equal (cap_tput_
, 0.0) &&
53 !ACE::is_equal (current_tput_
, 0.0) &&
54 current_tput_
> cap_tput_
)
// Overshoot expressed as a fraction of the current throughput.
56 double dev
= (current_tput_
- cap_tput_
) / current_tput_
;
58 // cerr << "deviation: " << dev << endl;
60 // Cap decay algorithm.
// Time elapsed since nak_time_, which recv () sets when a NAK addressed to
// us arrives.
63 ACE_Time_Value
delta (now_time
- nak_time_
);
65 unsigned long msec
= delta
.msec ();
// x is never positive, so y = exp (x) is at most 1 and dividing by it never
// lowers the cap: the longer since nak_time_, the more the cap is raised.
67 double x
= msec
/ -16000.0;
68 double y
= 1.0 * exp (x
);
69 cap_tput_
= cap_tput_
/ y
;
71 // cerr << "cap decay: " << cap_tput_ << " bytes/usec" << endl;
// Sleep time is the overshoot fraction scaled by 500000; the tv_nsec field
// indicates nanoseconds (10000 ns = 10 usec, matching the comment below).
79 time
.tv_nsec
= static_cast<unsigned long> (dev
* 500000.0);
81 // Don't bother to sleep if the time is less than 10 usec.
83 if (time
.tv_nsec
> 10000)
84 ACE_OS::sleep (ACE_Time_Value (time
));
// Incoming-path handler. When the message carries a NAK element whose
// address equals the address of the message's To element, records the
// current time in nak_time_ and lowers the throughput cap.
// NOTE(review): this listing may omit some lines of the function; confirm
// the details against the complete source.
91 void Flow::recv (Message_ptr m
)
// Only messages for which find (NAK::id) returns non-null are handled.
93 if (NAK
const* nak
= static_cast<NAK
const*> (m
->find (NAK::id
)))
// NOTE(review): the result of find (To::id) is dereferenced without a null
// check -- presumably every NAK message carries a To element; confirm.
95 Address
to (static_cast<To
const*> (m
->find (To::id
))->address ());
97 if (nak
->address () == to
)
99 // This one is for us.
102 //cerr << "NAK from "
103 // << static_cast<From const*> (m->find (From::id))->address ()
104 // << " for " << nak->count () << " sns." << endl;
// Remember when this NAK was seen; send () uses nak_time_ in its cap decay.
107 ACE_Time_Value
nak_time (ACE_OS::gettimeofday ());
111 nak_time_
= nak_time
;
// No cap set yet (cap is 0): start from the currently measured throughput.
113 if (ACE::is_equal (cap_tput_
, 0.0))
114 cap_tput_
= current_tput_
;
// If the cap is non-zero at this point, reduce it by one sixth. It can
// still be zero here when current_tput_ was zero as well.
116 if (!ACE::is_equal (cap_tput_
, 0.0))
118 cap_tput_
= cap_tput_
- cap_tput_
/ 6.0;
120 // cerr << "cap: " << cap_tput_ << " bytes/usec" << endl;