Merge pull request #1551 from DOCGroup/plm_jira_333
[ACE_TAO.git] / TAO / orbsvcs / tests / Event / Performance / Latency.cpp
blobd44f1be20cdfc77840256c4fb423bba253b521d1
1 #include "Latency.h"
2 #include "orbsvcs/Event_Service_Constants.h"
4 #include "tao/Messaging/Messaging.h"
5 #include "tao/Strategies/advanced_resource.h"
6 #include "tao/PortableServer/PortableServer.h"
7 #include "ace/High_Res_Timer.h"
8 #include "ace/Get_Opt.h"
9 #include "ace/Sample_History.h"
10 #include "ace/Basic_Stats.h"
11 #include "ace/Sched_Params.h"
12 #include "ace/OS_NS_errno.h"
16 int iterations = 1000;
17 int do_dump_history = 0;
18 const ACE_TCHAR *ec_ior = ACE_TEXT("file://ec.ior");
20 /// Parse the arguments.
21 static int parse_args (int argc, ACE_TCHAR *argv[]);
23 int
24 ACE_TMAIN(int argc, ACE_TCHAR *argv[])
26 int priority =
27 (ACE_Sched_Params::priority_min (ACE_SCHED_FIFO)
28 + ACE_Sched_Params::priority_max (ACE_SCHED_FIFO)) / 2;
29 priority = ACE_Sched_Params::next_priority (ACE_SCHED_FIFO,
30 priority);
31 // Enable FIFO scheduling, e.g., RT scheduling class on Solaris.
33 if (ACE_OS::sched_params (ACE_Sched_Params (ACE_SCHED_FIFO,
34 priority,
35 ACE_SCOPE_PROCESS)) != 0)
37 if (ACE_OS::last_error () == EPERM)
39 ACE_DEBUG ((LM_DEBUG,
40 "server (%P|%t): user is not superuser, "
41 "test runs in time-shared class\n"));
43 else
44 ACE_ERROR ((LM_ERROR,
45 "server (%P|%t): sched_params failed\n"));
48 try
50 CORBA::ORB_var orb =
51 CORBA::ORB_init (argc, argv);
53 #if (TAO_HAS_CORBA_MESSAGING == 1)
54 CORBA::Object_var manager_object =
55 orb->resolve_initial_references ("ORBPolicyManager");
57 CORBA::PolicyManager_var policy_manager =
58 CORBA::PolicyManager::_narrow (manager_object.in ());
60 CORBA::Any sync_scope;
61 sync_scope <<= Messaging::SYNC_WITH_TARGET;
63 CORBA::PolicyList policy_list (1);
64 policy_list.length (1);
65 policy_list[0] =
66 orb->create_policy (Messaging::SYNC_SCOPE_POLICY_TYPE,
67 sync_scope);
68 policy_manager->set_policy_overrides (policy_list,
69 CORBA::SET_OVERRIDE);
70 #else
71 ACE_DEBUG ((LM_DEBUG,
72 "CORBA Messaging disabled in this configuration,"
73 " test may not be optimally configured\n"));
74 #endif /* TAO_HAS_MESSAGING */
76 CORBA::Object_var poa_object =
77 orb->resolve_initial_references("RootPOA");
79 if (CORBA::is_nil (poa_object.in ()))
80 ACE_ERROR_RETURN ((LM_ERROR,
81 " (%P|%t) Unable to initialize the POA.\n"),
82 1);
84 PortableServer::POA_var root_poa =
85 PortableServer::POA::_narrow (poa_object.in ());
87 PortableServer::POAManager_var poa_manager =
88 root_poa->the_POAManager ();
90 poa_manager->activate ();
92 if (parse_args (argc, argv) != 0)
93 return 1;
95 // Get the event channel object reference
96 CORBA::Object_var object =
97 orb->string_to_object (ec_ior);
99 RtecEventChannelAdmin::EventChannel_var ec =
100 RtecEventChannelAdmin::EventChannel::_narrow (object.in ());
101 if (CORBA::is_nil (ec.in ()))
103 ACE_ERROR ((LM_ERROR,
104 "(%P|%t) Invalid or nil event channel\n"));
105 return 1;
108 ACE_DEBUG ((LM_DEBUG, "Resolved event service\n"));
110 // Now create the history
111 ACE_Sample_History history (iterations);
112 TAO_SYNCH_MUTEX history_mutex;
114 // The consumer
115 EC_Latency_Consumer consumer (&history,
116 &history_mutex,
117 iterations);
118 // Connect the consumer
120 RtecEventChannelAdmin::ConsumerAdmin_var consumer_admin =
121 ec->for_consumers ();
123 RtecEventChannelAdmin::ProxyPushSupplier_var proxy_supplier =
124 consumer_admin->obtain_push_supplier ();
126 RtecEventComm::PushConsumer_var consumer_reference =
127 consumer._this ();
129 // Simple subscription, but usually the helper classes in
130 // $TAO_ROOT/orbsvcs/Event_Utils.h are a better way to do this.
131 RtecEventChannelAdmin::ConsumerQOS consumer_qos;
132 consumer_qos.dependencies.length (2);
133 RtecEventComm::EventHeader& h0 =
134 consumer_qos.dependencies[0].event.header;
135 h0.type = ACE_ES_DISJUNCTION_DESIGNATOR;
136 h0.source = 1;
138 RtecEventComm::EventHeader& h1 =
139 consumer_qos.dependencies[1].event.header;
140 h1.type = ACE_ES_EVENT_UNDEFINED; // first free event type
141 h1.source = ACE_ES_EVENT_SOURCE_ANY;
143 proxy_supplier->connect_push_consumer (consumer_reference.in (),
144 consumer_qos);
146 ACE_DEBUG ((LM_DEBUG, "Connected consumer\n"));
148 // The supplier
149 EC_Latency_Supplier supplier;
151 // The canonical protocol to connect to the EC
152 RtecEventChannelAdmin::SupplierAdmin_var supplier_admin =
153 ec->for_suppliers ();
155 RtecEventChannelAdmin::ProxyPushConsumer_var proxy_consumer =
156 supplier_admin->obtain_push_consumer ();
158 RtecEventComm::PushSupplier_var supplier_reference =
159 supplier._this ();
161 // Simple publication, but usually the helper classes in
162 // $TAO_ROOT/orbsvcs/Event_Utils.h are a better way to do this.
163 RtecEventChannelAdmin::SupplierQOS supplier_qos;
164 supplier_qos.publications.length (1);
165 RtecEventComm::EventHeader& sh0 =
166 supplier_qos.publications[0].event.header;
167 sh0.type = ACE_ES_EVENT_UNDEFINED; // first free event type
168 sh0.source = 1; // first free event source
170 proxy_consumer->connect_push_supplier (supplier_reference.in (),
171 supplier_qos);
173 ACE_DEBUG ((LM_DEBUG, "Connected supplier\n"));
175 Task task (proxy_consumer.in (), iterations);
177 task.activate ();
179 ACE_hrtime_t start = ACE_OS::gethrtime ();
180 while (!task.done () || !consumer.done ())
182 ACE_Time_Value tv (1, 0);
183 orb->run (tv);
185 ACE_hrtime_t end = ACE_OS::gethrtime ();
187 ACE_Thread_Manager::instance ()->wait ();
189 // Calibrate the high resolution timer *before* starting the
190 // test.
191 ACE_DEBUG ((LM_DEBUG, "Calibrating high res timer ...."));
192 ACE_High_Res_Timer::calibrate ();
194 ACE_High_Res_Timer::global_scale_factor_type gsf =
195 ACE_High_Res_Timer::global_scale_factor ();
196 ACE_DEBUG ((LM_DEBUG, "Done (%d)\n", gsf));
197 if (do_dump_history)
199 history.dump_samples (ACE_TEXT("HISTORY"), gsf);
202 ACE_Basic_Stats stats;
203 history.collect_basic_stats (stats);
204 stats.dump_results (ACE_TEXT("Latency"), gsf);
206 ACE_hrtime_t elapsed_microseconds = (end - start) / gsf;
207 double elapsed_seconds =
208 ACE_CU64_TO_CU32(elapsed_microseconds) / 1000000.0;
209 double throughput =
210 double(iterations) / elapsed_seconds;
212 ACE_DEBUG ((LM_DEBUG, "Throughtput: %f\n", throughput));
214 proxy_supplier->disconnect_push_supplier ();
216 proxy_consumer->disconnect_push_consumer ();
218 PortableServer::ObjectId_var id;
220 id = root_poa->servant_to_id (&consumer);
221 root_poa->deactivate_object (id.in ());
223 id = root_poa->servant_to_id (&supplier);
224 root_poa->deactivate_object (id.in ());
226 orb->destroy ();
228 catch (const CORBA::Exception& ex)
230 ex._tao_print_exception (argv[0]);
232 return 0;
235 // ****************************************************************
238 parse_args (int argc, ACE_TCHAR *argv[])
240 ACE_Get_Opt get_opts (argc, argv, ACE_TEXT("hi:k:"));
241 int c;
243 while ((c = get_opts ()) != -1)
244 switch (c)
246 case 'h':
247 do_dump_history = 1;
248 break;
250 case 'i':
251 iterations = ACE_OS::atoi (get_opts.opt_arg ());
252 break;
254 case 'k':
255 ec_ior = get_opts.opt_arg ();
256 break;
258 case '?':
259 default:
260 ACE_ERROR_RETURN ((LM_ERROR,
261 "usage: %s "
262 "-i <iterations>"
263 "-k <IOR>"
264 "\n",
265 argv [0]),
266 -1);
268 // Indicates successful parsing of the command line
269 return 0;
272 // ****************************************************************
274 EC_Latency_Consumer::EC_Latency_Consumer (ACE_Sample_History *history,
275 TAO_SYNCH_MUTEX *mutex,
276 int message_count)
277 : history_ (history)
278 , mutex_ (mutex)
279 , remaining_messages_ (message_count)
284 EC_Latency_Consumer::done (void)
286 ACE_GUARD_RETURN (TAO_SYNCH_MUTEX, ace_mon, *this->mutex_, -1);
287 return this->remaining_messages_ <= 0;
290 void
291 EC_Latency_Consumer::push (const RtecEventComm::EventSet& events)
293 ACE_hrtime_t creation;
294 ORBSVCS_Time::TimeT_to_hrtime (creation,
295 events[0].header.creation_time);
296 ACE_hrtime_t now = ACE_OS::gethrtime ();
298 ACE_GUARD (TAO_SYNCH_MUTEX, ace_mon, *this->mutex_);
299 this->history_->sample (now - creation);
300 if (this->remaining_messages_ % 1000 == 0)
302 ACE_DEBUG ((LM_DEBUG, "Only %d messages to go\n",
303 this->remaining_messages_));
306 this->remaining_messages_--;
309 void
310 EC_Latency_Consumer::disconnect_push_consumer (void)
314 // ****************************************************************
316 EC_Latency_Supplier::EC_Latency_Supplier (void)
320 void
321 EC_Latency_Supplier::disconnect_push_supplier (void)
325 // ****************************************************************
327 Task::Task (RtecEventChannelAdmin::ProxyPushConsumer_ptr consumer,
328 int iterations)
329 : consumer_ (RtecEventChannelAdmin::ProxyPushConsumer::_duplicate (consumer))
330 , remaining_messages_ (iterations)
335 Task::done (void)
337 ACE_GUARD_RETURN (TAO_SYNCH_MUTEX, ace_mon, this->mutex_, 1);
338 return this->remaining_messages_ == 0;
342 Task::svc (void)
346 RtecEventComm::EventSet event (1);
347 event.length (1);
348 event[0].header.type = ACE_ES_EVENT_UNDEFINED;
349 event[0].header.source = 1;
350 event[0].header.ttl = 1;
351 event[0].data.payload.length(1024);
353 for (;;)
355 ACE_hrtime_t creation = ACE_OS::gethrtime ();
356 ORBSVCS_Time::hrtime_to_TimeT (event[0].header.creation_time,
357 creation);
358 this->consumer_->push (event);
360 // ACE_Time_Value tv (0, 5000);
361 // ACE_OS::sleep (tv);
363 ACE_GUARD_RETURN (TAO_SYNCH_MUTEX, ace_mon, this->mutex_, -1);
364 if (this->remaining_messages_ % 1000 == 0)
366 ACE_DEBUG ((LM_DEBUG, "Only %d messages to go\n",
367 this->remaining_messages_));
370 this->remaining_messages_--;
371 if (this->remaining_messages_ == 0)
372 return 0;
375 catch (const CORBA::SystemException& ex)
377 ex._tao_print_exception ("Task::svc");
379 return 0;