Changes to attempt to silence bcc64x
[ACE_TAO.git] / TAO / orbsvcs / tests / Notify / lib / Periodic_Consumer.cpp
blob87615780943a5e860535c774a9f36cb32309bb06
1 #include "Periodic_Consumer.h"
3 #include "ace/Arg_Shifter.h"
4 #include "ace/High_Res_Timer.h"
5 #include "ace/Task.h"
6 #include "tao/debug.h"
7 #include "orbsvcs/Time_Utilities.h"
8 #include "StructuredEvent.h"
9 #include "Task_Stats.h"
10 #include "Task_Callback.h"
11 #include "LookupManager.h"
12 #include "Priority_Mapping.h"
// Number of leading events each consumer discards before it expects the
// start event; copied into warmup_countdown_ by the constructor.
int WARMUP_COUNT = 10;
16 TAO_Notify_Tests_Periodic_Consumer::TAO_Notify_Tests_Periodic_Consumer ()
17 : count_ (-2)
18 , warmup_countdown_ (WARMUP_COUNT)
19 , max_count_ (-1)
20 , load_ (0)
21 , client_ (0)
22 , check_priority_ (0)
23 , stop_received_ (0)
27 TAO_Notify_Tests_Periodic_Consumer::~TAO_Notify_Tests_Periodic_Consumer ()
31 void
32 TAO_Notify_Tests_Periodic_Consumer::task_callback (TAO_Notify_Tests_Task_Callback* client)
34 this->client_ = client;
37 int
38 TAO_Notify_Tests_Periodic_Consumer::init_state (ACE_Arg_Shifter& arg_shifter)
40 // First, let the base class look for options.
41 if (TAO_Notify_Tests_StructuredPushConsumer::init_state (arg_shifter) == -1)
42 return -1;
44 const ACE_TCHAR *current_arg = 0;
46 while (arg_shifter.is_anything_left ())
48 if (0 != (current_arg = arg_shifter.get_the_parameter (ACE_TEXT("-MaxCount"))))
50 this->max_count_ = ACE_OS::atoi (current_arg);
51 arg_shifter.consume_arg ();
53 if (max_count_ == 0)
55 if (this->client_)
56 this->client_->done (this);
59 else if (arg_shifter.cur_arg_strncasecmp (ACE_TEXT("-Check_Priority")) == 0)
61 this->check_priority_ = 1;
63 arg_shifter.consume_arg ();
65 else
67 break;
69 } /* while */
71 return 0;
74 void
75 TAO_Notify_Tests_Periodic_Consumer::handle_start_event (const CosNotification::PropertySeq& prop_seq)
77 if (TAO_debug_level > 0)
78 ACE_DEBUG ((LM_DEBUG, "(%P, %t)Consumer %s received inital (-1)th event\n", this->name_.c_str ()));
80 for (CORBA::ULong i = 0; i < prop_seq.length (); ++i)
82 if (ACE_OS::strcmp (prop_seq[i].name.in (), "BaseTime") == 0)
84 TimeBase::TimeT base_time;
85 ACE_hrtime_t base_time_hrtime;
87 prop_seq[i].value >>= base_time;
89 ORBSVCS_Time::TimeT_to_hrtime (base_time_hrtime, base_time);
90 stats_.base_time (base_time_hrtime);
92 // if max_count has not been already specified, get it from the supplier.
93 else if (this->max_count_ == -1 &&
94 ACE_OS::strcmp (prop_seq[i].name.in (), "MaxCount") == 0)
96 prop_seq[i].value >>= this->max_count_;
98 else if (ACE_OS::strcmp (prop_seq[i].name.in (), "Load") == 0)
100 prop_seq[i].value >>= this->load_;
104 if (TAO_debug_level > 0)
106 ACE_DEBUG ((LM_DEBUG, "(%P, %t) Maxcount = %d, Load = %d\n",
107 this->max_count_, this->load_));
111 void
112 TAO_Notify_Tests_Periodic_Consumer::check_priority (const CosNotification::PropertySeq& prop_seq)
114 // Check if the event carries a Priority.
115 int event_has_priority_set = 0;
116 CORBA::Short event_priority = 0;
118 for (CORBA::ULong i = 0; i < prop_seq.length (); ++i)
120 if (ACE_OS::strcmp (prop_seq[i].name.in (), CosNotification::Priority) == 0)
122 prop_seq[i].value >>= event_priority;
124 event_has_priority_set = 1;
125 break;
129 if (event_has_priority_set == 1)
131 // Confirm that the current thread is at the priority set in the event
132 ACE_hthread_t current;
133 ACE_Thread::self (current);
135 int priority;
136 if (ACE_Thread::getprio (current, priority) == -1)
138 ACE_DEBUG ((LM_DEBUG,
139 ACE_TEXT ("TAO (%P|%t) - ")
140 ACE_TEXT (" ACE_Thread::get_prio\n")));
142 return;
145 CORBA::Short native_priority = CORBA::Short (priority);
147 TAO_Notify_Tests_Priority_Mapping* priority_mapping;
148 LOOKUP_MANAGER->resolve (priority_mapping);
150 CORBA::Short corba_priority;
152 priority_mapping->to_CORBA (native_priority, corba_priority);
154 if (TAO_debug_level > 0)
155 ACE_DEBUG ((LM_DEBUG,
156 "Periodic Consumer expected priority = %d, received priority = %d/%d (native/corba)\n",
157 event_priority, native_priority, corba_priority));
159 if (corba_priority != event_priority)
160 ACE_DEBUG ((LM_DEBUG,
161 "Error: Periodic Consumer expected priority = %d, received priority = %d\n",
162 event_priority, corba_priority));
166 void
167 TAO_Notify_Tests_Periodic_Consumer::push_structured_event (const CosNotification::StructuredEvent & notification)
169 ACE_GUARD_THROW_EX (TAO_SYNCH_MUTEX, ace_mon, this->lock_,
170 CORBA::INTERNAL ());
172 const CosNotification::PropertySeq& prop_seq =
173 notification.header.variable_header;
175 if (this->count_ == -2)
177 if (--warmup_countdown_ == 0)
178 this->count_ = -1;
180 return;
182 else if (this->count_ == -1)
184 this->handle_start_event (prop_seq);
186 if (this->max_count_ > 0)
187 this->stats_.init (this->max_count_);
189 this->count_ = 0;
190 return;
193 if (this->check_priority_)
195 this->check_priority (prop_seq);
198 if (TAO_debug_level > 0)
200 ACE_DEBUG ((LM_DEBUG, "(%P, %t)Consumer %s received %d event type (%s,%s)\n", this->name_.c_str (), this->count_,
201 notification.header.fixed_header.event_type.domain_name.in(),
202 notification.header.fixed_header.event_type.type_name.in()));
205 for (CORBA::ULong i = 0; i < prop_seq.length (); ++i)
207 if (ACE_OS::strcmp (prop_seq[i].name.in (), "Stop") == 0)
209 this->stop_received_ = 1;
213 TimeBase::TimeT send_time, now;
214 ACE_hrtime_t send_time_hrtime;
216 notification.remainder_of_body >>= send_time;
218 ORBSVCS_Time::TimeT_to_hrtime (send_time_hrtime, send_time);
220 now = ACE_OS::gethrtime ();
222 stats_.sample (send_time_hrtime, now);
224 // Eat CPU
225 static CORBA::ULong prime_number = 9619;
227 (void)ACE::gcd (prime_number, prime_number/2 -1);
229 for (CORBA::ULong load = this->load_; load != 0; --load)
230 ACE::is_prime (prime_number,
232 prime_number / 2);
234 // ---
236 if (++this->count_ >= this->max_count_ || this->stop_received_ == 1)
238 stats_.end_time (ACE_OS::gethrtime ());
240 if (this->client_)
241 this->client_->done (this);
243 if (TAO_debug_level > 0)
244 ACE_DEBUG ((LM_DEBUG, "(%P, %t)Consumer %s done\n", this->name_.c_str ()));
248 void
249 TAO_Notify_Tests_Periodic_Consumer::dump_stats (ACE_TCHAR* msg, int dump_samples)
251 ACE_TCHAR buf[BUFSIZ];
252 ACE_OS::sprintf (buf, ACE_TEXT("%s.dat"), this->name_.c_str ());
254 ACE_TString fname (buf);
256 ACE_OS::sprintf (buf,
257 ACE_TEXT("%s# Consumer Name = %s, Proxy ID = %d Load = %u\n"),
258 msg,
259 this->name_.c_str (), this->proxy_id_, this->load_);
261 stats_.dump_samples (fname.c_str (), buf, dump_samples);