2 #include "orbsvcs/Event_Service_Constants.h"
4 #include "tao/Messaging/Messaging.h"
5 #include "tao/Strategies/advanced_resource.h"
6 #include "tao/PortableServer/PortableServer.h"
7 #include "ace/High_Res_Timer.h"
8 #include "ace/Get_Opt.h"
9 #include "ace/Sample_History.h"
10 #include "ace/Basic_Stats.h"
11 #include "ace/Sched_Params.h"
12 #include "ace/OS_NS_errno.h"
16 int iterations
= 1000;
17 int do_dump_history
= 0;
18 const ACE_TCHAR
*ec_ior
= ACE_TEXT("file://ec.ior");
20 /// Parse the arguments.
21 static int parse_args (int argc
, ACE_TCHAR
*argv
[]);
24 ACE_TMAIN(int argc
, ACE_TCHAR
*argv
[])
27 (ACE_Sched_Params::priority_min (ACE_SCHED_FIFO
)
28 + ACE_Sched_Params::priority_max (ACE_SCHED_FIFO
)) / 2;
29 priority
= ACE_Sched_Params::next_priority (ACE_SCHED_FIFO
,
31 // Enable FIFO scheduling, e.g., RT scheduling class on Solaris.
33 if (ACE_OS::sched_params (ACE_Sched_Params (ACE_SCHED_FIFO
,
35 ACE_SCOPE_PROCESS
)) != 0)
37 if (ACE_OS::last_error () == EPERM
)
40 "server (%P|%t): user is not superuser, "
41 "test runs in time-shared class\n"));
45 "server (%P|%t): sched_params failed\n"));
51 CORBA::ORB_init (argc
, argv
);
53 #if (TAO_HAS_CORBA_MESSAGING == 1)
54 CORBA::Object_var manager_object
=
55 orb
->resolve_initial_references ("ORBPolicyManager");
57 CORBA::PolicyManager_var policy_manager
=
58 CORBA::PolicyManager::_narrow (manager_object
.in ());
60 CORBA::Any sync_scope
;
61 sync_scope
<<= Messaging::SYNC_WITH_TARGET
;
63 CORBA::PolicyList
policy_list (1);
64 policy_list
.length (1);
66 orb
->create_policy (Messaging::SYNC_SCOPE_POLICY_TYPE
,
68 policy_manager
->set_policy_overrides (policy_list
,
72 "CORBA Messaging disabled in this configuration,"
73 " test may not be optimally configured\n"));
#endif /* TAO_HAS_CORBA_MESSAGING == 1 */
76 CORBA::Object_var poa_object
=
77 orb
->resolve_initial_references("RootPOA");
79 if (CORBA::is_nil (poa_object
.in ()))
80 ACE_ERROR_RETURN ((LM_ERROR
,
81 " (%P|%t) Unable to initialize the POA.\n"),
84 PortableServer::POA_var root_poa
=
85 PortableServer::POA::_narrow (poa_object
.in ());
87 PortableServer::POAManager_var poa_manager
=
88 root_poa
->the_POAManager ();
90 poa_manager
->activate ();
92 if (parse_args (argc
, argv
) != 0)
95 // Get the event channel object reference
96 CORBA::Object_var object
=
97 orb
->string_to_object (ec_ior
);
99 RtecEventChannelAdmin::EventChannel_var ec
=
100 RtecEventChannelAdmin::EventChannel::_narrow (object
.in ());
101 if (CORBA::is_nil (ec
.in ()))
103 ACE_ERROR ((LM_ERROR
,
104 "(%P|%t) Invalid or nil event channel\n"));
108 ACE_DEBUG ((LM_DEBUG
, "Resolved event service\n"));
110 // Now create the history
111 ACE_Sample_History
history (iterations
);
112 TAO_SYNCH_MUTEX history_mutex
;
115 EC_Latency_Consumer
consumer (&history
,
118 // Connect the consumer
120 RtecEventChannelAdmin::ConsumerAdmin_var consumer_admin
=
121 ec
->for_consumers ();
123 RtecEventChannelAdmin::ProxyPushSupplier_var proxy_supplier
=
124 consumer_admin
->obtain_push_supplier ();
126 RtecEventComm::PushConsumer_var consumer_reference
=
129 // Simple subscription, but usually the helper classes in
130 // $TAO_ROOT/orbsvcs/Event_Utils.h are a better way to do this.
131 RtecEventChannelAdmin::ConsumerQOS consumer_qos
;
132 consumer_qos
.dependencies
.length (2);
133 RtecEventComm::EventHeader
& h0
=
134 consumer_qos
.dependencies
[0].event
.header
;
135 h0
.type
= ACE_ES_DISJUNCTION_DESIGNATOR
;
138 RtecEventComm::EventHeader
& h1
=
139 consumer_qos
.dependencies
[1].event
.header
;
140 h1
.type
= ACE_ES_EVENT_UNDEFINED
; // first free event type
141 h1
.source
= ACE_ES_EVENT_SOURCE_ANY
;
143 proxy_supplier
->connect_push_consumer (consumer_reference
.in (),
146 ACE_DEBUG ((LM_DEBUG
, "Connected consumer\n"));
149 EC_Latency_Supplier supplier
;
151 // The canonical protocol to connect to the EC
152 RtecEventChannelAdmin::SupplierAdmin_var supplier_admin
=
153 ec
->for_suppliers ();
155 RtecEventChannelAdmin::ProxyPushConsumer_var proxy_consumer
=
156 supplier_admin
->obtain_push_consumer ();
158 RtecEventComm::PushSupplier_var supplier_reference
=
161 // Simple publication, but usually the helper classes in
162 // $TAO_ROOT/orbsvcs/Event_Utils.h are a better way to do this.
163 RtecEventChannelAdmin::SupplierQOS supplier_qos
;
164 supplier_qos
.publications
.length (1);
165 RtecEventComm::EventHeader
& sh0
=
166 supplier_qos
.publications
[0].event
.header
;
167 sh0
.type
= ACE_ES_EVENT_UNDEFINED
; // first free event type
168 sh0
.source
= 1; // first free event source
170 proxy_consumer
->connect_push_supplier (supplier_reference
.in (),
173 ACE_DEBUG ((LM_DEBUG
, "Connected supplier\n"));
175 Task
task (proxy_consumer
.in (), iterations
);
179 ACE_hrtime_t start
= ACE_OS::gethrtime ();
180 while (!task
.done () || !consumer
.done ())
182 ACE_Time_Value
tv (1, 0);
185 ACE_hrtime_t end
= ACE_OS::gethrtime ();
187 ACE_Thread_Manager::instance ()->wait ();
// Calibrate the high resolution timer; the resulting scale factor is
// what converts the tick counts measured above into time units.
191 ACE_DEBUG ((LM_DEBUG
, "Calibrating high res timer ...."));
192 ACE_High_Res_Timer::calibrate ();
194 ACE_High_Res_Timer::global_scale_factor_type gsf
=
195 ACE_High_Res_Timer::global_scale_factor ();
196 ACE_DEBUG ((LM_DEBUG
, "Done (%d)\n", gsf
));
199 history
.dump_samples (ACE_TEXT("HISTORY"), gsf
);
202 ACE_Basic_Stats stats
;
203 history
.collect_basic_stats (stats
);
204 stats
.dump_results (ACE_TEXT("Latency"), gsf
);
206 ACE_hrtime_t elapsed_microseconds
= (end
- start
) / gsf
;
207 double elapsed_seconds
=
208 ACE_CU64_TO_CU32(elapsed_microseconds
) / 1000000.0;
210 double(iterations
) / elapsed_seconds
;
212 ACE_DEBUG ((LM_DEBUG
, "Throughtput: %f\n", throughput
));
214 proxy_supplier
->disconnect_push_supplier ();
216 proxy_consumer
->disconnect_push_consumer ();
218 PortableServer::ObjectId_var id
;
220 id
= root_poa
->servant_to_id (&consumer
);
221 root_poa
->deactivate_object (id
.in ());
223 id
= root_poa
->servant_to_id (&supplier
);
224 root_poa
->deactivate_object (id
.in ());
228 catch (const CORBA::Exception
& ex
)
230 ex
._tao_print_exception (argv
[0]);
235 // ****************************************************************
238 parse_args (int argc
, ACE_TCHAR
*argv
[])
240 ACE_Get_Opt
get_opts (argc
, argv
, ACE_TEXT("hi:k:"));
243 while ((c
= get_opts ()) != -1)
251 iterations
= ACE_OS::atoi (get_opts
.opt_arg ());
255 ec_ior
= get_opts
.opt_arg ();
260 ACE_ERROR_RETURN ((LM_ERROR
,
268 // Indicates successful parsing of the command line
272 // ****************************************************************
274 EC_Latency_Consumer::EC_Latency_Consumer (ACE_Sample_History
*history
,
275 TAO_SYNCH_MUTEX
*mutex
,
279 , remaining_messages_ (message_count
)
284 EC_Latency_Consumer::done (void)
286 ACE_GUARD_RETURN (TAO_SYNCH_MUTEX
, ace_mon
, *this->mutex_
, -1);
287 return this->remaining_messages_
<= 0;
291 EC_Latency_Consumer::push (const RtecEventComm::EventSet
& events
)
293 ACE_hrtime_t creation
;
294 ORBSVCS_Time::TimeT_to_hrtime (creation
,
295 events
[0].header
.creation_time
);
296 ACE_hrtime_t now
= ACE_OS::gethrtime ();
298 ACE_GUARD (TAO_SYNCH_MUTEX
, ace_mon
, *this->mutex_
);
299 this->history_
->sample (now
- creation
);
300 if (this->remaining_messages_
% 1000 == 0)
302 ACE_DEBUG ((LM_DEBUG
, "Only %d messages to go\n",
303 this->remaining_messages_
));
306 this->remaining_messages_
--;
310 EC_Latency_Consumer::disconnect_push_consumer (void)
314 // ****************************************************************
316 EC_Latency_Supplier::EC_Latency_Supplier (void)
321 EC_Latency_Supplier::disconnect_push_supplier (void)
325 // ****************************************************************
327 Task::Task (RtecEventChannelAdmin::ProxyPushConsumer_ptr consumer
,
329 : consumer_ (RtecEventChannelAdmin::ProxyPushConsumer::_duplicate (consumer
))
330 , remaining_messages_ (iterations
)
337 ACE_GUARD_RETURN (TAO_SYNCH_MUTEX
, ace_mon
, this->mutex_
, 1);
338 return this->remaining_messages_
== 0;
346 RtecEventComm::EventSet
event (1);
348 event
[0].header
.type
= ACE_ES_EVENT_UNDEFINED
;
349 event
[0].header
.source
= 1;
350 event
[0].header
.ttl
= 1;
351 event
[0].data
.payload
.length(1024);
355 ACE_hrtime_t creation
= ACE_OS::gethrtime ();
356 ORBSVCS_Time::hrtime_to_TimeT (event
[0].header
.creation_time
,
358 this->consumer_
->push (event
);
360 // ACE_Time_Value tv (0, 5000);
361 // ACE_OS::sleep (tv);
363 ACE_GUARD_RETURN (TAO_SYNCH_MUTEX
, ace_mon
, this->mutex_
, -1);
364 if (this->remaining_messages_
% 1000 == 0)
366 ACE_DEBUG ((LM_DEBUG
, "Only %d messages to go\n",
367 this->remaining_messages_
));
370 this->remaining_messages_
--;
371 if (this->remaining_messages_
== 0)
375 catch (const CORBA::SystemException
& ex
)
377 ex
._tao_print_exception ("Task::svc");