Cleanup ACE_HAS_PTHREAD_SIGMASK_PROTOTYPE, all platforms support it so far as I can...
[ACE_TAO.git] / ACE / protocols / ace / RMCast / Flow.cpp
blob8fb232dca066b8dc4c43674ad625cd13a182e024
1 // author : Boris Kolpackov <boris@kolpackov.net>
2 #include "Flow.h"
4 #include "ace/ACE.h"
5 #include "ace/OS_NS_unistd.h" // sleep
6 #include "ace/OS_NS_sys_time.h" // gettimeofday
8 #include "ace/os_include/os_math.h" // exp
10 namespace ACE_RMCast
12 Flow::
13 Flow (Parameters const& )
14 : //params_ (params),
15 nak_time_ (0, 0),
16 sample_start_time_ (0, 0),
17 sample_bytes_ (0),
18 current_tput_ (0.0),
19 cap_tput_ (0.0)
23 void Flow::send (Message_ptr m)
25 if (Data const* data = static_cast<Data const*> (m->find (Data::id)))
27 ACE_Time_Value now_time (ACE_OS::gettimeofday ());
29 Lock l (mutex_);
30 sample_bytes_ += data->size ();
32 if (sample_start_time_ == ACE_Time_Value (0, 0))
34 sample_start_time_ = now_time;
36 else
38 ACE_Time_Value delta (now_time - sample_start_time_);
40 if (delta > ACE_Time_Value (0, 2000))
42 current_tput_ =
43 double (sample_bytes_) / (delta.sec () * 1000000 + delta.usec ());
45 // cerr << "tput: " << current_tput_ << " bytes/usec" << endl;
47 sample_bytes_ = 0;
48 sample_start_time_ = ACE_Time_Value (0, 0);
52 if (!ACE::is_equal (cap_tput_, 0.0) &&
53 !ACE::is_equal (current_tput_, 0.0) &&
54 current_tput_ > cap_tput_)
56 double dev = (current_tput_ - cap_tput_) / current_tput_;
58 // cerr << "deviation: " << dev << endl;
60 // Cap decay algorithm.
63 ACE_Time_Value delta (now_time - nak_time_);
65 unsigned long msec = delta.msec ();
67 double x = msec / -16000.0;
68 double y = 1.0 * exp (x);
69 cap_tput_ = cap_tput_ / y;
71 // cerr << "cap decay: " << cap_tput_ << " bytes/usec" << endl;
74 l.release ();
77 timespec time;
78 time.tv_sec = 0;
79 time.tv_nsec = static_cast<unsigned long> (dev * 500000.0);
81 // Don't bother to sleep if the time is less than 10 usec.
83 if (time.tv_nsec > 10000)
84 ACE_OS::sleep (ACE_Time_Value (time));
88 out_->send (m);
91 void Flow::recv (Message_ptr m)
93 if (NAK const* nak = static_cast<NAK const*> (m->find (NAK::id)))
95 Address to (static_cast<To const*> (m->find (To::id))->address ());
97 if (nak->address () == to)
99 // This one is for us.
102 //cerr << "NAK from "
103 // << static_cast<From const*> (m->find (From::id))->address ()
104 // << " for " << nak->count () << " sns." << endl;
107 ACE_Time_Value nak_time (ACE_OS::gettimeofday ());
109 Lock l (mutex_);
111 nak_time_ = nak_time;
113 if (ACE::is_equal (cap_tput_, 0.0))
114 cap_tput_ = current_tput_;
116 if (!ACE::is_equal (cap_tput_, 0.0))
118 cap_tput_ = cap_tput_ - cap_tput_ / 6.0;
120 // cerr << "cap: " << cap_tput_ << " bytes/usec" << endl;
125 in_->recv (m);