Merge pull request #1551 from DOCGroup/plm_jira_333
[ACE_TAO.git] / TAO / orbsvcs / tests / Event / Basic / Random.cpp
blobb3b557a1c896c0dc65c399cf8ba04e5d79be0e94
1 #include "Random.h"
2 #include "orbsvcs/Event/EC_Event_Channel.h"
3 #include "orbsvcs/Event/EC_Default_Factory.h"
4 #include "orbsvcs/Event_Utilities.h"
5 #include "orbsvcs/Time_Utilities.h"
6 #include "ace/Arg_Shifter.h"
7 #include "ace/OS_NS_strings.h"
8 #include "ace/OS_NS_unistd.h"
10 int
11 ACE_TMAIN(int argc, ACE_TCHAR *argv[])
13 RND_Driver driver;
14 return driver.run (argc, argv);
17 // ****************************************************************
19 const int base_type = 20;
21 void
22 deactivate_servant (PortableServer::Servant servant)
24 PortableServer::POA_var poa =
25 servant->_default_POA ();
26 PortableServer::ObjectId_var oid =
27 poa->servant_to_id (servant);
28 poa->deactivate_object (oid.in ());
32 RND_Driver::RND_Driver (void)
33 : timer_ (this),
34 supplier_ (0),
35 nsuppliers_ (4),
36 nconsumers_ (4),
37 max_recursion_ (1),
38 verbose_ (0)
40 TAO_EC_Default_Factory::init_svcs ();
43 int
44 RND_Driver::run (int argc, ACE_TCHAR *argv[])
46 try
48 CORBA::ORB_var orb =
49 CORBA::ORB_init (argc, argv);
51 // ****************************************************************
53 ACE_Arg_Shifter arg_shifter (argc, argv);
55 while (arg_shifter.is_anything_left ())
57 const ACE_TCHAR *arg = arg_shifter.get_current ();
59 if (ACE_OS::strcasecmp (arg, ACE_TEXT("-suppliers")) == 0)
61 arg_shifter.consume_arg ();
63 if (arg_shifter.is_parameter_next ())
65 const ACE_TCHAR* opt = arg_shifter.get_current ();
66 int n = ACE_OS::atoi (opt);
67 if (n >= 1)
68 this->nsuppliers_ = n;
69 arg_shifter.consume_arg ();
72 else if (ACE_OS::strcasecmp (arg, ACE_TEXT("-consumers")) == 0)
74 arg_shifter.consume_arg ();
76 if (arg_shifter.is_parameter_next ())
78 const ACE_TCHAR* opt = arg_shifter.get_current ();
79 int n = ACE_OS::atoi (opt);
80 if (n >= 1)
81 this->nconsumers_ = n;
82 arg_shifter.consume_arg ();
85 else if (ACE_OS::strcasecmp (arg, ACE_TEXT("-max_recursion")) == 0)
87 arg_shifter.consume_arg ();
89 if (arg_shifter.is_parameter_next ())
91 const ACE_TCHAR* opt = arg_shifter.get_current ();
92 int n = ACE_OS::atoi (opt);
93 if (n >= 0)
94 this->max_recursion_ = n;
95 arg_shifter.consume_arg ();
98 else if (ACE_OS::strcasecmp (arg, ACE_TEXT("-verbose")) == 0)
100 arg_shifter.consume_arg ();
102 this->verbose_ = 1;
104 else
105 arg_shifter.ignore_arg ();
108 // ****************************************************************
110 CORBA::Object_var object =
111 orb->resolve_initial_references ("RootPOA");
112 PortableServer::POA_var poa =
113 PortableServer::POA::_narrow (object.in ());
114 PortableServer::POAManager_var poa_manager =
115 poa->the_POAManager ();
116 poa_manager->activate ();
118 // ****************************************************************
120 TAO_EC_Event_Channel_Attributes attributes (poa.in (),
121 poa.in ());
122 attributes.consumer_reconnect = 1;
123 attributes.supplier_reconnect = 1;
125 TAO_EC_Event_Channel ec_impl (attributes);
126 ec_impl.activate ();
128 RtecEventChannelAdmin::EventChannel_var event_channel =
129 ec_impl._this ();
131 // ****************************************************************
133 // Obtain the consumer admin..
134 this->consumer_admin_ =
135 event_channel->for_consumers ();
137 // Obtain the supplier admin..
138 this->supplier_admin_ =
139 event_channel->for_suppliers ();
141 // ****************************************************************
144 // Let's say that the execution time for event 2 is 1
145 // milliseconds...
146 ACE_Time_Value tv (0, 50000);
147 TimeBase::TimeT time;
148 ORBSVCS_Time::Time_Value_to_TimeT (time, tv);
150 ACE_ConsumerQOS_Factory qos;
151 qos.start_disjunction_group ();
152 // The types int the range [0,ACE_ES_EVENT_UNDEFINED) are
153 // reserved for the EC...
154 qos.insert_time (ACE_ES_EVENT_INTERVAL_TIMEOUT,
155 time,
158 this->timer_.connect (this->consumer_admin_.in (),
159 qos.get_ConsumerQOS ());
162 // ****************************************************************
165 ACE_SupplierQOS_Factory qos;
166 qos.insert (0, base_type, 0, 1);
168 this->supplier_.connect (this->supplier_admin_.in (),
169 qos.get_SupplierQOS ());
172 // ****************************************************************
174 ACE_NEW_RETURN (this->consumers_,
175 RND_Consumer*[this->nconsumers_],
177 for (int i = 0; i != this->nconsumers_; ++i)
179 ACE_NEW_RETURN (this->consumers_[i],
180 RND_Consumer (this),
183 CORBA::Object_var obj =
184 this->consumers_[i]->_this ();
187 // ****************************************************************
189 ACE_NEW_RETURN (this->suppliers_,
190 RND_Supplier*[this->nsuppliers_],
192 for (int j = 0; j != this->nsuppliers_; ++j)
194 ACE_NEW_RETURN (this->suppliers_[j],
195 RND_Supplier (this->verbose_),
197 this->suppliers_[j]->activate ();
199 CORBA::Object_var obj =
200 this->suppliers_[j]->_this ();
203 // ****************************************************************
205 ACE_Time_Value tv (30, 0);
206 orb->run (tv);
208 ACE_Thread_Manager::instance ()->wait ();
210 // ****************************************************************
213 for (int k = 0; k != this->nsuppliers_; ++k)
215 deactivate_servant (this->suppliers_[k]);
216 this->suppliers_[k]->_remove_ref ();
218 delete[] this->suppliers_;
219 this->suppliers_ = 0;
222 // ****************************************************************
224 // We destroy now to verify that the callbacks work and do not
225 // produce any problems.
226 event_channel->destroy ();
228 // ****************************************************************
231 for (int k = 0; k != this->nconsumers_; ++k)
233 deactivate_servant (this->consumers_[k]);
234 this->consumers_[k]->_remove_ref ();
236 delete[] this->consumers_;
237 this->consumers_ = 0;
240 // ****************************************************************
242 deactivate_servant (&ec_impl);
244 // ****************************************************************
246 poa->destroy (1, 1);
248 // ****************************************************************
250 orb->destroy ();
252 catch (const CORBA::Exception& ex)
254 ex._tao_print_exception ("Random");
255 return 1;
257 return 0;
260 void
261 RND_Driver::timer (const RtecEventComm::Event &e)
263 int r = ACE_OS::rand ();
264 if (r < 0)
265 r = -r;
267 int n = r% 20;
269 switch (n)
271 case 0:
272 case 1:
274 // ACE_DEBUG ((LM_DEBUG, "Pushing an event\n"));
275 if (e.header.source < this->max_recursion_)
277 RtecEventComm::EventSet event (1);
278 event.length (1);
279 event[0] = e;
280 event[0].header.source ++;
281 this->supplier_.push (event);
284 break;
286 default:
287 case 2:
288 case 3:
289 case 4:
290 case 5:
291 // ACE_DEBUG ((LM_DEBUG, "Received event\n"));
292 break;
294 case 6:
296 int n = ACE_OS::rand () % this->nsuppliers_;
298 // ACE_DEBUG ((LM_DEBUG, "Connecting supplier %d\n", n));
300 ACE_SupplierQOS_Factory qos;
301 qos.insert (0, base_type, 0, 1);
303 this->suppliers_[n]->connect (this->supplier_admin_.in (),
304 qos.get_SupplierQOS ());
306 break;
308 case 7:
310 int n = ACE_OS::rand () % this->nconsumers_;
312 // ACE_DEBUG ((LM_DEBUG, "Connecting consumer %d\n", n));
314 ACE_ConsumerQOS_Factory qos;
315 qos.start_disjunction_group ();
316 qos.insert_type (base_type, 0);
318 this->consumers_[n]->connect (this->consumer_admin_.in (),
319 qos.get_ConsumerQOS ());
321 break;
323 case 8:
325 int n = ACE_OS::rand () % this->nsuppliers_;
327 // ACE_DEBUG ((LM_DEBUG, "Disconnecting supplier %d\n", n));
329 this->suppliers_[n]->disconnect ();
331 break;
333 case 9:
335 int n = ACE_OS::rand () % this->nconsumers_;
337 // ACE_DEBUG ((LM_DEBUG, "Disconnecting consumer %d\n", n));
339 this->consumers_[n]->disconnect ();
341 break;
345 void
346 RND_Driver::event (const RtecEventComm::Event &e)
348 this->timer (e);
351 // ****************************************************************
353 void
354 RND_Timer::push (const RtecEventComm::EventSet &event)
358 this->driver_->timer (event[0]);
360 catch (const CORBA::Exception&)
365 // ****************************************************************
367 void
368 RND_Consumer::connect (RtecEventChannelAdmin::ConsumerAdmin_ptr admin,
369 const RtecEventChannelAdmin::ConsumerQOS &qos)
371 RtecEventChannelAdmin::ProxyPushSupplier_var proxy;
373 ACE_GUARD (TAO_SYNCH_MUTEX, ace_mon, this->lock_);
375 if (CORBA::is_nil (this->proxy_.in ()))
377 this->proxy_ = admin->obtain_push_supplier ();
379 proxy =
380 RtecEventChannelAdmin::ProxyPushSupplier::_duplicate(this->proxy_.in ());
382 RtecEventComm::PushConsumer_var me =
383 this->_this ();
384 proxy->connect_push_consumer (me.in (),
385 qos);
388 void
389 RND_Consumer::disconnect (void)
391 ACE_GUARD (TAO_SYNCH_MUTEX, ace_mon, this->lock_);
393 if (CORBA::is_nil (this->proxy_.in ()))
394 return;
395 this->proxy_->disconnect_push_supplier ();
396 this->proxy_ =
397 RtecEventChannelAdmin::ProxyPushSupplier::_nil ();
400 void
401 RND_Consumer::push (const RtecEventComm::EventSet &event)
403 this->driver_->event (event[0]);
406 void
407 RND_Consumer::disconnect_push_consumer (void)
411 // ****************************************************************
413 void
414 RND_Supplier::connect (RtecEventChannelAdmin::SupplierAdmin_ptr admin,
415 const RtecEventChannelAdmin::SupplierQOS &qos)
417 RtecEventChannelAdmin::ProxyPushConsumer_var proxy;
419 ACE_GUARD (TAO_SYNCH_MUTEX, ace_mon, this->lock_);
421 if (CORBA::is_nil (this->proxy_.in ()))
423 this->proxy_ = admin->obtain_push_consumer ();
426 proxy =
427 RtecEventChannelAdmin::ProxyPushConsumer::_duplicate(this->proxy_.in ());
429 RtecEventComm::PushSupplier_var me =
430 this->_this ();
431 proxy->connect_push_supplier (me.in (),
432 qos);
435 void
436 RND_Supplier::disconnect (void)
438 ACE_GUARD (TAO_SYNCH_MUTEX, ace_mon, this->lock_);
440 if (CORBA::is_nil (this->proxy_.in ()))
441 return;
442 this->proxy_->disconnect_push_consumer ();
443 this->proxy_ =
444 RtecEventChannelAdmin::ProxyPushConsumer::_nil ();
447 void
448 RND_Supplier::push_new_event (void)
450 RtecEventComm::EventSet event (1);
451 event.length (1);
452 event[0].header.type = base_type;
453 event[0].header.source = 0;
455 this->push (event);
458 void
459 RND_Supplier::push (RtecEventComm::EventSet &event)
461 RtecEventChannelAdmin::ProxyPushConsumer_var proxy;
463 ACE_GUARD (TAO_SYNCH_MUTEX, ace_mon, this->lock_);
465 if (CORBA::is_nil (this->proxy_.in ()))
466 return;
468 proxy =
469 RtecEventChannelAdmin::ProxyPushConsumer::_duplicate(this->proxy_.in ());
472 proxy->push (event);
475 void
476 RND_Supplier::disconnect_push_supplier (void)
481 RND_Supplier::svc (void)
483 ACE_DEBUG ((LM_DEBUG, "Thread %t started\n"));
484 int percent = 10;
485 int niterations = 5000;
486 for (int i = 0; i != niterations; ++i)
490 ACE_Time_Value tv (0, 10000);
491 ACE_OS::sleep (tv);
493 this->push_new_event ();
495 catch (const CORBA::Exception&)
498 if (this->verbose_
499 && i * 100 / niterations >= percent)
501 ACE_DEBUG ((LM_DEBUG, "Thread %t %d%%\n", percent));
502 percent += 10;
505 ACE_DEBUG ((LM_DEBUG, "Thread %t completed\n"));
506 return 0;