Merge pull request #1551 from DOCGroup/plm_jira_333
[ACE_TAO.git] / TAO / orbsvcs / tests / Event / Basic / Observer.cpp
blob425b64dcf12eef5c89c452086859afa5a865bbaa
1 #include "Observer.h"
2 #include "Consumer.h"
3 #include "Supplier.h"
4 #include "orbsvcs/Event/EC_Event_Channel.h"
5 #include "orbsvcs/Event/EC_Default_Factory.h"
6 #include "ace/Arg_Shifter.h"
7 #include "ace/High_Res_Timer.h"
9 int
10 ACE_TMAIN(int argc, ACE_TCHAR *argv[])
12 TAO_EC_Default_Factory::init_svcs ();
13 EC_Master master;
14 return master.run (argc, argv);
17 // ****************************************************************
19 EC_Master::EC_Master (void)
20 : seed_ (0),
21 n_channels_ (4),
22 channels_ (0)
26 EC_Master::~EC_Master (void)
28 if (this->channels_ != 0)
30 for (int i = 0; i < this->n_channels_; ++i)
31 delete this->channels_[i];
32 delete[] this->channels_;
36 int
37 EC_Master::run (int argc, ACE_TCHAR* argv[])
39 try
41 // Calibrate the high resolution timer *before* starting the
42 // test.
43 ACE_High_Res_Timer::calibrate ();
45 this->seed_ = static_cast<unsigned int> (ACE_OS::time (0));
47 this->initialize_orb_and_poa (argc, argv);
49 if (this->parse_args (argc, argv))
50 return 1;
52 ACE_DEBUG ((LM_DEBUG,
53 "The seed value is %d\n", this->seed_));
55 ACE_NEW_RETURN (this->channels_,
56 EC_Observer*[this->n_channels_],
57 1);
60 for (int i = 0; i != this->n_channels_; ++i)
62 ACE_OS::rand_r (&this->seed_);
63 ACE_NEW_RETURN (this->channels_[i],
64 EC_Observer (this,
65 this->seed_,
66 this->orb_.in (),
67 this->root_poa_.in (),
68 i),
69 1);
74 ACE_TCHAR** targv;
75 ACE_NEW_RETURN (targv, ACE_TCHAR*[argc], 1);
77 for (int i = 0; i != this->n_channels_; ++i)
79 int targc = argc;
80 for (int j = 0; j < targc; ++j)
81 targv[j] = argv[j];
82 this->channels_[i]->run_init (targc, targv);
84 delete[] targv;
88 for (int i = 0; i != this->n_channels_; ++i)
90 this->channels_[i]->execute_test ();
94 if (ACE_Thread_Manager::instance ()->wait () == -1)
96 ACE_ERROR ((LM_ERROR,
97 "EC_Master (%P|%t) thread manager wait failed\n"));
98 return 1;
102 for (int i = 0; i != this->n_channels_; ++i)
104 this->channels_[i]->dump_results ();
109 for (int i = 0; i != this->n_channels_; ++i)
111 this->channels_[i]->run_cleanup ();
116 for (int i = 0; i != this->n_channels_; ++i)
118 this->channels_[i]->disconnect_clients ();
119 this->channels_[i]->shutdown_clients ();
120 this->channels_[i]->destroy_ec ();
121 this->channels_[i]->deactivate_ec ();
122 this->channels_[i]->cleanup_tasks ();
123 this->channels_[i]->cleanup_suppliers ();
124 this->channels_[i]->cleanup_consumers ();
125 this->channels_[i]->cleanup_ec ();
129 this->root_poa_->destroy (1,
132 this->orb_->destroy ();
134 catch (const CORBA::Exception& ex)
136 ex._tao_print_exception ("EC_Driver::run");
138 catch (...)
140 ACE_ERROR ((LM_ERROR, "EC_Driver (%P|%t) non-corba exception raised\n"));
142 return 0;
145 void
146 EC_Master::initialize_orb_and_poa (int &argc, ACE_TCHAR* argv[])
148 this->orb_ =
149 CORBA::ORB_init (argc, argv);
151 CORBA::Object_var poa_object =
152 this->orb_->resolve_initial_references("RootPOA");
154 if (CORBA::is_nil (poa_object.in ()))
156 ACE_ERROR ((LM_ERROR,
157 "EC_Driver (%P|%t) Unable to initialize the POA.\n"));
158 return;
161 this->root_poa_ =
162 PortableServer::POA::_narrow (poa_object.in ());
164 PortableServer::POAManager_var poa_manager =
165 this->root_poa_->the_POAManager ();
167 poa_manager->activate ();
171 EC_Master::parse_args (int &argc, ACE_TCHAR *argv [])
173 ACE_Arg_Shifter arg_shifter (argc, argv);
175 while (arg_shifter.is_anything_left ())
177 const ACE_TCHAR *arg = arg_shifter.get_current ();
179 if (ACE_OS::strcmp (arg, ACE_TEXT("-channels")) == 0)
181 arg_shifter.consume_arg ();
182 this->n_channels_ = ACE_OS::atoi (arg_shifter.get_current ());
184 else if (ACE_OS::strcmp (arg, ACE_TEXT("-seed")) == 0)
186 arg_shifter.consume_arg ();
187 this->seed_ = ACE_OS::atoi (arg_shifter.get_current ());
190 arg_shifter.ignore_arg ();
192 return 0;
196 EC_Master::channel_count (void) const
198 return this->n_channels_;
201 EC_Observer*
202 EC_Master::channel (int i) const
204 return this->channels_[i];
207 // ****************************************************************
209 EC_Observer::EC_Observer (EC_Master *master,
210 unsigned int seed,
211 CORBA::ORB_ptr orb,
212 PortableServer::POA_ptr root_poa,
213 int id)
214 : master_ (master),
215 seed_ (seed),
216 id_ (id),
217 gwys_ (0)
219 this->orb_ = CORBA::ORB::_duplicate (orb);
220 this->root_poa_ = PortableServer::POA::_duplicate (root_poa);
223 EC_Observer::~EC_Observer (void)
225 if (this->gwys_ != 0)
226 delete[] this->gwys_;
229 void
230 EC_Observer::initialize_orb_and_poa (int&, ACE_TCHAR*[])
235 EC_Observer::parse_args (int& argc, ACE_TCHAR* argv[])
237 return this->EC_Driver::parse_args (argc, argv);
240 void
241 EC_Observer::print_args (void) const
243 this->EC_Driver::print_args ();
246 void
247 EC_Observer::print_usage (void)
249 this->EC_Driver::print_usage ();
252 void
253 EC_Observer::execute_test (void)
255 int peer_count = this->master_->channel_count ();
256 ACE_NEW (this->gwys_, TAO_EC_Gateway_IIOP[peer_count]);
258 for (int i = 0; i != peer_count; ++i)
260 if (i == this->id_)
261 continue;
263 RtecEventChannelAdmin::EventChannel_ptr rmt_ec =
264 this->master_->channel (i)->event_channel_.in ();
266 this->gwys_[i].init (rmt_ec,
267 this->event_channel_.in ());
269 RtecEventChannelAdmin::Observer_var obs =
270 this->gwys_[i]._this ();
272 RtecEventChannelAdmin::Observer_Handle h =
273 rmt_ec->append_observer (obs.in ());
275 this->gwys_[i].observer_handle (h);
279 if (this->allocate_tasks () == -1)
280 return;
282 this->activate_tasks ();
284 if (this->verbose ())
285 ACE_DEBUG ((LM_DEBUG, "EC_Observer[%d] (%P|%t) suppliers are active\n",
286 this->id_));
289 void
290 EC_Observer::run_cleanup (void)
292 for (int j = 0; j != this->master_->channel_count (); ++j)
294 if (j == this->id_)
295 continue;
297 RtecEventChannelAdmin::EventChannel_ptr rmt_ec =
298 this->master_->channel (j)->event_channel_.in ();
299 rmt_ec->remove_observer (this->gwys_[j].observer_handle ());
301 this->gwys_[j].shutdown ();
305 void
306 EC_Observer::dump_results (void)
308 ACE_DEBUG ((LM_DEBUG, "===== Results for %d =====\n", this->id_));
310 ACE_Throughput_Stats throughput;
311 ACE_High_Res_Timer::global_scale_factor_type gsf =
312 ACE_High_Res_Timer::global_scale_factor ();
313 for (int j = 0; j < this->n_consumers_; ++j)
315 this->consumers_[j]->accumulate (throughput);
317 ACE_DEBUG ((LM_DEBUG, "\n"));
319 ACE_Throughput_Stats suppliers;
320 for (int i = 0; i < this->n_suppliers_; ++i)
322 this->suppliers_[i]->accumulate (suppliers);
325 ACE_DEBUG ((LM_DEBUG, "\nTotals:\n"));
326 throughput.dump_results (ACE_TEXT("EC_Consumer/totals"), gsf);
328 ACE_DEBUG ((LM_DEBUG, "\n"));
329 suppliers.dump_results (ACE_TEXT("EC_Supplier/totals"), gsf);
332 void
333 EC_Observer::connect_consumer (
334 RtecEventChannelAdmin::ConsumerAdmin_ptr consumer_admin,
335 int i)
337 if (i == 0)
339 this->EC_Driver::connect_consumer (consumer_admin, i);
340 return;
342 unsigned int x = ACE_OS::rand_r (&this->seed_);
343 if (x < RAND_MAX / 8)
344 this->EC_Driver::connect_consumer (consumer_admin, i);
347 void
348 EC_Observer::consumer_push (void*,
349 const RtecEventComm::EventSet&)
351 unsigned int x = ACE_OS::rand_r (&this->seed_);
352 if (x < (RAND_MAX / 64))
354 if (this->verbose ())
355 ACE_DEBUG ((LM_DEBUG,
356 "EC_Observer[%d] (%P|%t) reconnecting\n", this->id_));
358 RtecEventChannelAdmin::ConsumerAdmin_var consumer_admin =
359 this->event_channel_->for_consumers ();
361 for (int i = 1; i < this->n_consumers_; ++i)
363 ACE_GUARD (TAO_SYNCH_MUTEX, ace_mon, this->lock_);
365 if (this->consumers_[i]->connected ())
367 this->consumers_[i]->disconnect ();
369 else
371 this->EC_Driver::connect_consumer (consumer_admin.in (),