4 #include "orbsvcs/Event/EC_Event_Channel.h"
5 #include "orbsvcs/Event/EC_Default_Factory.h"
6 #include "ace/Arg_Shifter.h"
7 #include "ace/High_Res_Timer.h"
10 ACE_TMAIN(int argc
, ACE_TCHAR
*argv
[])
12 TAO_EC_Default_Factory::init_svcs ();
14 return master
.run (argc
, argv
);
17 // ****************************************************************
19 EC_Master::EC_Master (void)
26 EC_Master::~EC_Master (void)
28 if (this->channels_
!= 0)
30 for (int i
= 0; i
< this->n_channels_
; ++i
)
31 delete this->channels_
[i
];
32 delete[] this->channels_
;
37 EC_Master::run (int argc
, ACE_TCHAR
* argv
[])
41 // Calibrate the high resolution timer *before* starting the
43 ACE_High_Res_Timer::calibrate ();
45 this->seed_
= static_cast<unsigned int> (ACE_OS::time (0));
47 this->initialize_orb_and_poa (argc
, argv
);
49 if (this->parse_args (argc
, argv
))
53 "The seed value is %d\n", this->seed_
));
55 ACE_NEW_RETURN (this->channels_
,
56 EC_Observer
*[this->n_channels_
],
60 for (int i
= 0; i
!= this->n_channels_
; ++i
)
62 ACE_OS::rand_r (&this->seed_
);
63 ACE_NEW_RETURN (this->channels_
[i
],
67 this->root_poa_
.in (),
75 ACE_NEW_RETURN (targv
, ACE_TCHAR
*[argc
], 1);
77 for (int i
= 0; i
!= this->n_channels_
; ++i
)
80 for (int j
= 0; j
< targc
; ++j
)
82 this->channels_
[i
]->run_init (targc
, targv
);
88 for (int i
= 0; i
!= this->n_channels_
; ++i
)
90 this->channels_
[i
]->execute_test ();
94 if (ACE_Thread_Manager::instance ()->wait () == -1)
97 "EC_Master (%P|%t) thread manager wait failed\n"));
102 for (int i
= 0; i
!= this->n_channels_
; ++i
)
104 this->channels_
[i
]->dump_results ();
109 for (int i
= 0; i
!= this->n_channels_
; ++i
)
111 this->channels_
[i
]->run_cleanup ();
116 for (int i
= 0; i
!= this->n_channels_
; ++i
)
118 this->channels_
[i
]->disconnect_clients ();
119 this->channels_
[i
]->shutdown_clients ();
120 this->channels_
[i
]->destroy_ec ();
121 this->channels_
[i
]->deactivate_ec ();
122 this->channels_
[i
]->cleanup_tasks ();
123 this->channels_
[i
]->cleanup_suppliers ();
124 this->channels_
[i
]->cleanup_consumers ();
125 this->channels_
[i
]->cleanup_ec ();
129 this->root_poa_
->destroy (1,
132 this->orb_
->destroy ();
134 catch (const CORBA::Exception
& ex
)
136 ex
._tao_print_exception ("EC_Driver::run");
140 ACE_ERROR ((LM_ERROR
, "EC_Driver (%P|%t) non-corba exception raised\n"));
146 EC_Master::initialize_orb_and_poa (int &argc
, ACE_TCHAR
* argv
[])
149 CORBA::ORB_init (argc
, argv
);
151 CORBA::Object_var poa_object
=
152 this->orb_
->resolve_initial_references("RootPOA");
154 if (CORBA::is_nil (poa_object
.in ()))
156 ACE_ERROR ((LM_ERROR
,
157 "EC_Driver (%P|%t) Unable to initialize the POA.\n"));
162 PortableServer::POA::_narrow (poa_object
.in ());
164 PortableServer::POAManager_var poa_manager
=
165 this->root_poa_
->the_POAManager ();
167 poa_manager
->activate ();
171 EC_Master::parse_args (int &argc
, ACE_TCHAR
*argv
[])
173 ACE_Arg_Shifter
arg_shifter (argc
, argv
);
175 while (arg_shifter
.is_anything_left ())
177 const ACE_TCHAR
*arg
= arg_shifter
.get_current ();
179 if (ACE_OS::strcmp (arg
, ACE_TEXT("-channels")) == 0)
181 arg_shifter
.consume_arg ();
182 this->n_channels_
= ACE_OS::atoi (arg_shifter
.get_current ());
184 else if (ACE_OS::strcmp (arg
, ACE_TEXT("-seed")) == 0)
186 arg_shifter
.consume_arg ();
187 this->seed_
= ACE_OS::atoi (arg_shifter
.get_current ());
190 arg_shifter
.ignore_arg ();
196 EC_Master::channel_count (void) const
198 return this->n_channels_
;
202 EC_Master::channel (int i
) const
204 return this->channels_
[i
];
207 // ****************************************************************
209 EC_Observer::EC_Observer (EC_Master
*master
,
212 PortableServer::POA_ptr root_poa
,
219 this->orb_
= CORBA::ORB::_duplicate (orb
);
220 this->root_poa_
= PortableServer::POA::_duplicate (root_poa
);
223 EC_Observer::~EC_Observer (void)
225 if (this->gwys_
!= 0)
226 delete[] this->gwys_
;
230 EC_Observer::initialize_orb_and_poa (int&, ACE_TCHAR
*[])
235 EC_Observer::parse_args (int& argc
, ACE_TCHAR
* argv
[])
237 return this->EC_Driver::parse_args (argc
, argv
);
241 EC_Observer::print_args (void) const
243 this->EC_Driver::print_args ();
247 EC_Observer::print_usage (void)
249 this->EC_Driver::print_usage ();
253 EC_Observer::execute_test (void)
255 int peer_count
= this->master_
->channel_count ();
256 ACE_NEW (this->gwys_
, TAO_EC_Gateway_IIOP
[peer_count
]);
258 for (int i
= 0; i
!= peer_count
; ++i
)
263 RtecEventChannelAdmin::EventChannel_ptr rmt_ec
=
264 this->master_
->channel (i
)->event_channel_
.in ();
266 this->gwys_
[i
].init (rmt_ec
,
267 this->event_channel_
.in ());
269 RtecEventChannelAdmin::Observer_var obs
=
270 this->gwys_
[i
]._this ();
272 RtecEventChannelAdmin::Observer_Handle h
=
273 rmt_ec
->append_observer (obs
.in ());
275 this->gwys_
[i
].observer_handle (h
);
279 if (this->allocate_tasks () == -1)
282 this->activate_tasks ();
284 if (this->verbose ())
285 ACE_DEBUG ((LM_DEBUG
, "EC_Observer[%d] (%P|%t) suppliers are active\n",
290 EC_Observer::run_cleanup (void)
292 for (int j
= 0; j
!= this->master_
->channel_count (); ++j
)
297 RtecEventChannelAdmin::EventChannel_ptr rmt_ec
=
298 this->master_
->channel (j
)->event_channel_
.in ();
299 rmt_ec
->remove_observer (this->gwys_
[j
].observer_handle ());
301 this->gwys_
[j
].shutdown ();
306 EC_Observer::dump_results (void)
308 ACE_DEBUG ((LM_DEBUG
, "===== Results for %d =====\n", this->id_
));
310 ACE_Throughput_Stats throughput
;
311 ACE_High_Res_Timer::global_scale_factor_type gsf
=
312 ACE_High_Res_Timer::global_scale_factor ();
313 for (int j
= 0; j
< this->n_consumers_
; ++j
)
315 this->consumers_
[j
]->accumulate (throughput
);
317 ACE_DEBUG ((LM_DEBUG
, "\n"));
319 ACE_Throughput_Stats suppliers
;
320 for (int i
= 0; i
< this->n_suppliers_
; ++i
)
322 this->suppliers_
[i
]->accumulate (suppliers
);
325 ACE_DEBUG ((LM_DEBUG
, "\nTotals:\n"));
326 throughput
.dump_results (ACE_TEXT("EC_Consumer/totals"), gsf
);
328 ACE_DEBUG ((LM_DEBUG
, "\n"));
329 suppliers
.dump_results (ACE_TEXT("EC_Supplier/totals"), gsf
);
333 EC_Observer::connect_consumer (
334 RtecEventChannelAdmin::ConsumerAdmin_ptr consumer_admin
,
339 this->EC_Driver::connect_consumer (consumer_admin
, i
);
342 unsigned int x
= ACE_OS::rand_r (&this->seed_
);
343 if (x
< RAND_MAX
/ 8)
344 this->EC_Driver::connect_consumer (consumer_admin
, i
);
348 EC_Observer::consumer_push (void*,
349 const RtecEventComm::EventSet
&)
351 unsigned int x
= ACE_OS::rand_r (&this->seed_
);
352 if (x
< (RAND_MAX
/ 64))
354 if (this->verbose ())
355 ACE_DEBUG ((LM_DEBUG
,
356 "EC_Observer[%d] (%P|%t) reconnecting\n", this->id_
));
358 RtecEventChannelAdmin::ConsumerAdmin_var consumer_admin
=
359 this->event_channel_
->for_consumers ();
361 for (int i
= 1; i
< this->n_consumers_
; ++i
)
363 ACE_GUARD (TAO_SYNCH_MUTEX
, ace_mon
, this->lock_
);
365 if (this->consumers_
[i
]->connected ())
367 this->consumers_
[i
]->disconnect ();
371 this->EC_Driver::connect_consumer (consumer_admin
.in (),