1 #include "Periodic_Supplier.h"
3 #include "ace/Arg_Shifter.h"
4 #include "ace/High_Res_Timer.h"
5 #include "ace/Barrier.h"
6 #include "ace/OS_NS_unistd.h"
8 #include "tao/ORB_Core.h"
9 #include "orbsvcs/Time_Utilities.h"
10 #include "StructuredEvent.h"
11 #include "Task_Stats.h"
12 #include "Task_Callback.h"
13 #include "LookupManager.h"
14 #include "Priority_Mapping.h"
16 TAO_Notify_Tests_Periodic_Supplier::TAO_Notify_Tests_Periodic_Supplier ()
20 total_deadlines_missed_ (0),
30 TAO_Notify_Tests_Periodic_Supplier::~TAO_Notify_Tests_Periodic_Supplier ()
36 TAO_Notify_Tests_Periodic_Supplier::task_callback(TAO_Notify_Tests_Task_Callback
* client
)
38 this->client_
= client
;
42 TAO_Notify_Tests_Periodic_Supplier::init_state (ACE_Arg_Shifter
& arg_shifter
)
44 // First, let the base class look for options.
45 if (TAO_Notify_Tests_StructuredPushSupplier::init_state (arg_shifter
) == -1)
48 const ACE_TCHAR
*current_arg
= 0;
50 while (arg_shifter
.is_anything_left ())
52 if (0 != (current_arg
= arg_shifter
.get_the_parameter (ACE_TEXT("-EventType"))))
54 this->event_
.type ("*", ACE_TEXT_ALWAYS_CHAR(current_arg
)) ;
55 zeroth_event
.type ("*", ACE_TEXT_ALWAYS_CHAR(current_arg
)) ;
56 arg_shifter
.consume_arg ();
58 else if (arg_shifter
.cur_arg_strncasecmp (ACE_TEXT("-FilterLongData")) == 0) // -FilterLongData name value
60 arg_shifter
.consume_arg ();
62 ACE_CString name
= ACE_TEXT_ALWAYS_CHAR(arg_shifter
.get_current ());
64 arg_shifter
.consume_arg ();
66 CORBA::Long value
= (CORBA::Long
)ACE_OS::atoi (arg_shifter
.get_current ());
68 arg_shifter
.consume_arg ();
71 buffer
<<= (CORBA::Long
) value
;
73 this->event_
.filter (name
.c_str (), buffer
);
75 else if (0 != (current_arg
= arg_shifter
.get_the_parameter (ACE_TEXT("-Priority"))))
77 priority_
= ACE_OS::atoi (current_arg
);
78 arg_shifter
.consume_arg ();
81 buffer
<<= (CORBA::Short
) this->priority_
;
82 this->event_
.qos (CosNotification::Priority
, buffer
);
84 else if (0 != (current_arg
= arg_shifter
.get_the_parameter (ACE_TEXT("-Period"))))
86 period_
= ACE_OS::atoi (current_arg
);
87 arg_shifter
.consume_arg ();
89 else if (0 != (current_arg
= arg_shifter
.get_the_parameter (ACE_TEXT("-ExecTime"))))
91 exec_time_
= ACE_OS::atoi (current_arg
);
92 arg_shifter
.consume_arg ();
94 else if (0 != (current_arg
= arg_shifter
.get_the_parameter (ACE_TEXT("-Phase"))))
96 phase_
= ACE_OS::atoi (current_arg
);
97 arg_shifter
.consume_arg ();
99 else if (0 != (current_arg
= arg_shifter
.get_the_parameter (ACE_TEXT("-Iter"))))
101 iter_
= ACE_OS::atoi (current_arg
);
102 arg_shifter
.consume_arg ();
104 if (stats_
.init (iter_
) == -1)
107 else if (0 != (current_arg
= arg_shifter
.get_the_parameter (ACE_TEXT("-Load"))))
109 load_
= ACE_OS::atoi (current_arg
);
110 arg_shifter
.consume_arg ();
112 else if (0 != (current_arg
= arg_shifter
.get_the_parameter (ACE_TEXT("-RunTime")))) // in seconds
114 run_time_
= ACE_OS::atoi (current_arg
);
115 arg_shifter
.consume_arg ();
119 ACE_DEBUG ((LM_DEBUG
, "parse Task unknown option %s\n",
120 arg_shifter
.get_current ()));
121 if (TAO_debug_level
> 0)
122 ACE_DEBUG ((LM_DEBUG
, "event type %s, priority %d, period %duS, exec_time %duS, phase %duS, iter %d, load %d\n",
123 event_
.type(), priority_
, period_
, exec_time_
, phase_
, iter_
, load_
));
131 TAO_Notify_Tests_Periodic_Supplier::activate_task (ACE_Barrier
* barrier
)
135 long flags
= THR_NEW_LWP
| THR_JOINABLE
;
139 LOOKUP_MANAGER
->resolve (orb
);
142 orb
->orb_core ()->orb_params ()->thread_creation_flags ();
144 TAO_Notify_Tests_Priority_Mapping
* priority_mapping
;
145 LOOKUP_MANAGER
->resolve (priority_mapping
);
147 CORBA::Short native_prio
;
149 priority_mapping
->to_native (this->priority_
, native_prio
);
151 // Become an active object.
152 if (this->ACE_Task
<ACE_SYNCH
>::activate (flags
,
157 if (ACE_OS::last_error () == EPERM
)
158 ACE_ERROR_RETURN ((LM_ERROR
,
159 ACE_TEXT ("Insufficient privilege to activate ACE_Task.\n")),
162 ACE_DEBUG ((LM_ERROR
,
163 ACE_TEXT ("(%t) Task activation at priority %d failed, ")
164 ACE_TEXT ("exiting!\n%a"),
169 ACE_DEBUG ((LM_ERROR
, "Activated Periodic Supplier Thread at priority %d\n", this->priority_
));
175 TAO_Notify_Tests_Periodic_Supplier::send_warmup_events ()
177 int WARMUP_COUNT
= 10;
179 for (int i
= 0; i
< WARMUP_COUNT
; ++i
)
181 this->send_event (this->event_
.event ());
186 TAO_Notify_Tests_Periodic_Supplier::send_prologue ()
189 // send the base time and max count.
190 TimeBase::TimeT base_time
;
191 ORBSVCS_Time::hrtime_to_TimeT (base_time
,
192 BASE_TIME::instance ()->base_time_
);
195 buffer
<<= base_time
;
196 zeroth_event
.opt_header ("BaseTime", buffer
);
198 buffer
<<= this->iter_
;
199 zeroth_event
.opt_header ("MaxCount", buffer
);
201 buffer
<<= this->load_
;
202 zeroth_event
.opt_header ("Load", buffer
);
204 if (TAO_debug_level
> 0)
205 ACE_DEBUG ((LM_DEBUG
, "(%P, %t) Supplier (%s) sending event 0th event\n"));
207 this->send_event (zeroth_event
.event ());
211 TAO_Notify_Tests_Periodic_Supplier::handle_svc ()
213 this->send_prologue ();
215 ACE_hrtime_t before
, after
;
216 TimeBase::TimeT
time_t;
220 ACE_hrtime_t base_time
= BASE_TIME::instance ()->base_time_
;
222 for (int i
= 0; i
< iter_
; ++i
)
224 before
= ACE_OS::gethrtime ();
226 ORBSVCS_Time::hrtime_to_TimeT (time_t,
231 this->event_
.payload (buffer
);
233 if (this->run_time_
!= 0 &&
234 Task_Stats::diff_sec (base_time
, before
) > this->run_time_
)
236 // Time up, send a "Stop" event.
237 buffer
<<= (CORBA::Long
) 1;
238 this->event_
.opt_header ("Stop", buffer
);
240 i
= iter_
; // Load the iter so that the loop exits.
243 if (TAO_debug_level
> 0)
244 ACE_DEBUG ((LM_DEBUG
, "(%P, %t) Supplier (%s) sending event #%d\n",
245 this->name_
.c_str (), i
));
247 this->send_event (this->event_
.event ());
249 after
= ACE_OS::gethrtime ();
251 stats_
.sample (before
, after
);
253 if (period_
!= 0) // blast mode, no sleep.
255 ACE_UINT32 elapsed_microseconds
=
256 Task_Stats::diff_usec (before
, after
);
258 // did we miss any deadlines?
260 (int)elapsed_microseconds
> period_
? elapsed_microseconds
/period_
: 0;
261 this->total_deadlines_missed_
+= missed
;
263 /* Start -- "Immediate run if last call missed deadline" */
264 if (missed
> 0) // if we missed
267 long sleep_time
= period_
- elapsed_microseconds
;
268 /* End -- "Immediate run if last call missed deadline" */
271 * This logic sleeps till the next period.
272 * So, if we missed a deadline we wait.
274 long sleep_time = (missed + 1)*period_ ;
275 sleep_time -= elapsed_microseconds;
278 if (TAO_debug_level
> 0)
279 ACE_DEBUG ((LM_DEBUG
, "(%t) sleep time = %d uSec, missed %d deadlines\n", sleep_time
, missed
));
281 ACE_Time_Value
t_sleep (0, sleep_time
);
282 ACE_OS::sleep (t_sleep
);
287 stats_
.end_time (ACE_OS::gethrtime ());
290 this->client_
->done (this);
294 TAO_Notify_Tests_Periodic_Supplier::svc ()
296 if (TAO_debug_level
> 0)
297 ACE_DEBUG ((LM_DEBUG
, "Thread_Task (%t) - wait\n"));
301 // First, send warmup events.
302 this->send_warmup_events ();
304 // Next, wait for other threads.
305 this->barrier_
->wait ();
307 // first thread here inits the Base_Time.
308 stats_
.base_time (BASE_TIME::instance ()->base_time_
);
310 // now wait till the phase_ period expires.
311 ACE_OS::sleep (ACE_Time_Value (0, phase_
));
315 catch (const CORBA::UserException
& ue
)
317 ue
._tao_print_exception (
318 "Error: Periodic supplier: error sending event. ");
320 catch (const CORBA::SystemException
& se
)
322 se
._tao_print_exception (
323 "Error: Periodic supplier: error sending event. ");
330 TAO_Notify_Tests_Periodic_Supplier::dump_stats (ACE_TCHAR
* msg
, int dump_samples
)
332 ACE_TCHAR buf
[BUFSIZ
];
333 ACE_OS::sprintf (buf
, ACE_TEXT("%s.dat"), this->name_
.c_str ());
335 ACE_TString
fname (buf
);
337 ACE_OS::sprintf (buf
, ACE_TEXT("%s# : Supplier Name = %s, Proxy ID = %d, Event Type = %s, priority %d, period %ld, exec_time %ld, phase %ld, iter_ %d , load_ %d, deadlines missed = %ld\n"),
338 msg
, this->name_
.c_str (), this->proxy_id_
, this->event_
.type (), priority_
, period_
, exec_time_
, phase_
, iter_
, load_
, this->total_deadlines_missed_
);
340 stats_
.dump_samples (fname
.c_str (), buf
, dump_samples
);
343 ACE_SINGLETON_TEMPLATE_INSTANTIATE(ACE_Singleton
, Base_Time
, TAO_SYNCH_MUTEX
);