1 #include "Periodic_Consumer.h"
3 #include "ace/Arg_Shifter.h"
4 #include "ace/High_Res_Timer.h"
7 #include "orbsvcs/Time_Utilities.h"
8 #include "StructuredEvent.h"
9 #include "Task_Stats.h"
10 #include "Task_Callback.h"
11 #include "LookupManager.h"
12 #include "Priority_Mapping.h"
14 int WARMUP_COUNT
= 10;
16 TAO_Notify_Tests_Periodic_Consumer::TAO_Notify_Tests_Periodic_Consumer ()
18 , warmup_countdown_ (WARMUP_COUNT
)
27 TAO_Notify_Tests_Periodic_Consumer::~TAO_Notify_Tests_Periodic_Consumer ()
32 TAO_Notify_Tests_Periodic_Consumer::task_callback (TAO_Notify_Tests_Task_Callback
* client
)
34 this->client_
= client
;
38 TAO_Notify_Tests_Periodic_Consumer::init_state (ACE_Arg_Shifter
& arg_shifter
)
40 // First, let the base class look for options.
41 if (TAO_Notify_Tests_StructuredPushConsumer::init_state (arg_shifter
) == -1)
44 const ACE_TCHAR
*current_arg
= 0;
46 while (arg_shifter
.is_anything_left ())
48 if (0 != (current_arg
= arg_shifter
.get_the_parameter (ACE_TEXT("-MaxCount"))))
50 this->max_count_
= ACE_OS::atoi (current_arg
);
51 arg_shifter
.consume_arg ();
56 this->client_
->done (this);
59 else if (arg_shifter
.cur_arg_strncasecmp (ACE_TEXT("-Check_Priority")) == 0)
61 this->check_priority_
= 1;
63 arg_shifter
.consume_arg ();
75 TAO_Notify_Tests_Periodic_Consumer::handle_start_event (const CosNotification::PropertySeq
& prop_seq
)
77 if (TAO_debug_level
> 0)
78 ACE_DEBUG ((LM_DEBUG
, "(%P, %t)Consumer %s received inital (-1)th event\n", this->name_
.c_str ()));
80 for (CORBA::ULong i
= 0; i
< prop_seq
.length (); ++i
)
82 if (ACE_OS::strcmp (prop_seq
[i
].name
.in (), "BaseTime") == 0)
84 TimeBase::TimeT base_time
;
85 ACE_hrtime_t base_time_hrtime
;
87 prop_seq
[i
].value
>>= base_time
;
89 ORBSVCS_Time::TimeT_to_hrtime (base_time_hrtime
, base_time
);
90 stats_
.base_time (base_time_hrtime
);
92 // if max_count has not been already specified, get it from the supplier.
93 else if (this->max_count_
== -1 &&
94 ACE_OS::strcmp (prop_seq
[i
].name
.in (), "MaxCount") == 0)
96 prop_seq
[i
].value
>>= this->max_count_
;
98 else if (ACE_OS::strcmp (prop_seq
[i
].name
.in (), "Load") == 0)
100 prop_seq
[i
].value
>>= this->load_
;
104 if (TAO_debug_level
> 0)
106 ACE_DEBUG ((LM_DEBUG
, "(%P, %t) Maxcount = %d, Load = %d\n",
107 this->max_count_
, this->load_
));
112 TAO_Notify_Tests_Periodic_Consumer::check_priority (const CosNotification::PropertySeq
& prop_seq
)
114 // Check if the event carries a Priority.
115 int event_has_priority_set
= 0;
116 CORBA::Short event_priority
= 0;
118 for (CORBA::ULong i
= 0; i
< prop_seq
.length (); ++i
)
120 if (ACE_OS::strcmp (prop_seq
[i
].name
.in (), CosNotification::Priority
) == 0)
122 prop_seq
[i
].value
>>= event_priority
;
124 event_has_priority_set
= 1;
129 if (event_has_priority_set
== 1)
131 // Confirm that the current thread is at the priority set in the event
132 ACE_hthread_t current
;
133 ACE_Thread::self (current
);
136 if (ACE_Thread::getprio (current
, priority
) == -1)
138 ACE_DEBUG ((LM_DEBUG
,
139 ACE_TEXT ("TAO (%P|%t) - ")
140 ACE_TEXT (" ACE_Thread::get_prio\n")));
145 CORBA::Short native_priority
= CORBA::Short (priority
);
147 TAO_Notify_Tests_Priority_Mapping
* priority_mapping
;
148 LOOKUP_MANAGER
->resolve (priority_mapping
);
150 CORBA::Short corba_priority
;
152 priority_mapping
->to_CORBA (native_priority
, corba_priority
);
154 if (TAO_debug_level
> 0)
155 ACE_DEBUG ((LM_DEBUG
,
156 "Periodic Consumer expected priority = %d, received priority = %d/%d (native/corba)\n",
157 event_priority
, native_priority
, corba_priority
));
159 if (corba_priority
!= event_priority
)
160 ACE_DEBUG ((LM_DEBUG
,
161 "Error: Periodic Consumer expected priority = %d, received priority = %d\n",
162 event_priority
, corba_priority
));
167 TAO_Notify_Tests_Periodic_Consumer::push_structured_event (const CosNotification::StructuredEvent
& notification
)
169 ACE_GUARD_THROW_EX (TAO_SYNCH_MUTEX
, ace_mon
, this->lock_
,
172 const CosNotification::PropertySeq
& prop_seq
=
173 notification
.header
.variable_header
;
175 if (this->count_
== -2)
177 if (--warmup_countdown_
== 0)
182 else if (this->count_
== -1)
184 this->handle_start_event (prop_seq
);
186 if (this->max_count_
> 0)
187 this->stats_
.init (this->max_count_
);
193 if (this->check_priority_
)
195 this->check_priority (prop_seq
);
198 if (TAO_debug_level
> 0)
200 ACE_DEBUG ((LM_DEBUG
, "(%P, %t)Consumer %s received %d event type (%s,%s)\n", this->name_
.c_str (), this->count_
,
201 notification
.header
.fixed_header
.event_type
.domain_name
.in(),
202 notification
.header
.fixed_header
.event_type
.type_name
.in()));
205 for (CORBA::ULong i
= 0; i
< prop_seq
.length (); ++i
)
207 if (ACE_OS::strcmp (prop_seq
[i
].name
.in (), "Stop") == 0)
209 this->stop_received_
= 1;
213 TimeBase::TimeT send_time
, now
;
214 ACE_hrtime_t send_time_hrtime
;
216 notification
.remainder_of_body
>>= send_time
;
218 ORBSVCS_Time::TimeT_to_hrtime (send_time_hrtime
, send_time
);
220 now
= ACE_OS::gethrtime ();
222 stats_
.sample (send_time_hrtime
, now
);
225 static CORBA::ULong prime_number
= 9619;
227 (void)ACE::gcd (prime_number
, prime_number
/2 -1);
229 for (CORBA::ULong load
= this->load_
; load
!= 0; --load
)
230 ACE::is_prime (prime_number
,
236 if (++this->count_
>= this->max_count_
|| this->stop_received_
== 1)
238 stats_
.end_time (ACE_OS::gethrtime ());
241 this->client_
->done (this);
243 if (TAO_debug_level
> 0)
244 ACE_DEBUG ((LM_DEBUG
, "(%P, %t)Consumer %s done\n", this->name_
.c_str ()));
249 TAO_Notify_Tests_Periodic_Consumer::dump_stats (ACE_TCHAR
* msg
, int dump_samples
)
251 ACE_TCHAR buf
[BUFSIZ
];
252 ACE_OS::sprintf (buf
, ACE_TEXT("%s.dat"), this->name_
.c_str ());
254 ACE_TString
fname (buf
);
256 ACE_OS::sprintf (buf
,
257 ACE_TEXT("%s# Consumer Name = %s, Proxy ID = %d Load = %u\n"),
259 this->name_
.c_str (), this->proxy_id_
, this->load_
);
261 stats_
.dump_samples (fname
.c_str (), buf
, dump_samples
);