2 #include "orbsvcs/Event/EC_Event_Channel.h"
3 #include "orbsvcs/Event/EC_Default_Factory.h"
4 #include "orbsvcs/Event_Utilities.h"
5 #include "orbsvcs/Time_Utilities.h"
6 #include "ace/Arg_Shifter.h"
7 #include "ace/OS_NS_strings.h"
8 #include "ace/OS_NS_unistd.h"
11 ACE_TMAIN(int argc
, ACE_TCHAR
*argv
[])
14 return driver
.run (argc
, argv
);
17 // ****************************************************************
19 const int base_type
= 20;
22 deactivate_servant (PortableServer::Servant servant
)
24 PortableServer::POA_var poa
=
25 servant
->_default_POA ();
26 PortableServer::ObjectId_var oid
=
27 poa
->servant_to_id (servant
);
28 poa
->deactivate_object (oid
.in ());
32 RND_Driver::RND_Driver (void)
40 TAO_EC_Default_Factory::init_svcs ();
44 RND_Driver::run (int argc
, ACE_TCHAR
*argv
[])
49 CORBA::ORB_init (argc
, argv
);
51 // ****************************************************************
53 ACE_Arg_Shifter
arg_shifter (argc
, argv
);
55 while (arg_shifter
.is_anything_left ())
57 const ACE_TCHAR
*arg
= arg_shifter
.get_current ();
59 if (ACE_OS::strcasecmp (arg
, ACE_TEXT("-suppliers")) == 0)
61 arg_shifter
.consume_arg ();
63 if (arg_shifter
.is_parameter_next ())
65 const ACE_TCHAR
* opt
= arg_shifter
.get_current ();
66 int n
= ACE_OS::atoi (opt
);
68 this->nsuppliers_
= n
;
69 arg_shifter
.consume_arg ();
72 else if (ACE_OS::strcasecmp (arg
, ACE_TEXT("-consumers")) == 0)
74 arg_shifter
.consume_arg ();
76 if (arg_shifter
.is_parameter_next ())
78 const ACE_TCHAR
* opt
= arg_shifter
.get_current ();
79 int n
= ACE_OS::atoi (opt
);
81 this->nconsumers_
= n
;
82 arg_shifter
.consume_arg ();
85 else if (ACE_OS::strcasecmp (arg
, ACE_TEXT("-max_recursion")) == 0)
87 arg_shifter
.consume_arg ();
89 if (arg_shifter
.is_parameter_next ())
91 const ACE_TCHAR
* opt
= arg_shifter
.get_current ();
92 int n
= ACE_OS::atoi (opt
);
94 this->max_recursion_
= n
;
95 arg_shifter
.consume_arg ();
98 else if (ACE_OS::strcasecmp (arg
, ACE_TEXT("-verbose")) == 0)
100 arg_shifter
.consume_arg ();
105 arg_shifter
.ignore_arg ();
108 // ****************************************************************
110 CORBA::Object_var object
=
111 orb
->resolve_initial_references ("RootPOA");
112 PortableServer::POA_var poa
=
113 PortableServer::POA::_narrow (object
.in ());
114 PortableServer::POAManager_var poa_manager
=
115 poa
->the_POAManager ();
116 poa_manager
->activate ();
118 // ****************************************************************
120 TAO_EC_Event_Channel_Attributes
attributes (poa
.in (),
122 attributes
.consumer_reconnect
= 1;
123 attributes
.supplier_reconnect
= 1;
125 TAO_EC_Event_Channel
ec_impl (attributes
);
128 RtecEventChannelAdmin::EventChannel_var event_channel
=
131 // ****************************************************************
133 // Obtain the consumer admin..
134 this->consumer_admin_
=
135 event_channel
->for_consumers ();
137 // Obtain the supplier admin..
138 this->supplier_admin_
=
139 event_channel
->for_suppliers ();
141 // ****************************************************************
144 // Let's say that the execution time for event 2 is 1
146 ACE_Time_Value
tv (0, 50000);
147 TimeBase::TimeT time
;
148 ORBSVCS_Time::Time_Value_to_TimeT (time
, tv
);
150 ACE_ConsumerQOS_Factory qos
;
151 qos
.start_disjunction_group ();
152 // The types int the range [0,ACE_ES_EVENT_UNDEFINED) are
153 // reserved for the EC...
154 qos
.insert_time (ACE_ES_EVENT_INTERVAL_TIMEOUT
,
158 this->timer_
.connect (this->consumer_admin_
.in (),
159 qos
.get_ConsumerQOS ());
162 // ****************************************************************
165 ACE_SupplierQOS_Factory qos
;
166 qos
.insert (0, base_type
, 0, 1);
168 this->supplier_
.connect (this->supplier_admin_
.in (),
169 qos
.get_SupplierQOS ());
172 // ****************************************************************
174 ACE_NEW_RETURN (this->consumers_
,
175 RND_Consumer
*[this->nconsumers_
],
177 for (int i
= 0; i
!= this->nconsumers_
; ++i
)
179 ACE_NEW_RETURN (this->consumers_
[i
],
183 CORBA::Object_var obj
=
184 this->consumers_
[i
]->_this ();
187 // ****************************************************************
189 ACE_NEW_RETURN (this->suppliers_
,
190 RND_Supplier
*[this->nsuppliers_
],
192 for (int j
= 0; j
!= this->nsuppliers_
; ++j
)
194 ACE_NEW_RETURN (this->suppliers_
[j
],
195 RND_Supplier (this->verbose_
),
197 this->suppliers_
[j
]->activate ();
199 CORBA::Object_var obj
=
200 this->suppliers_
[j
]->_this ();
203 // ****************************************************************
205 ACE_Time_Value
tv (30, 0);
208 ACE_Thread_Manager::instance ()->wait ();
210 // ****************************************************************
213 for (int k
= 0; k
!= this->nsuppliers_
; ++k
)
215 deactivate_servant (this->suppliers_
[k
]);
216 this->suppliers_
[k
]->_remove_ref ();
218 delete[] this->suppliers_
;
219 this->suppliers_
= 0;
222 // ****************************************************************
224 // We destroy now to verify that the callbacks work and do not
225 // produce any problems.
226 event_channel
->destroy ();
228 // ****************************************************************
231 for (int k
= 0; k
!= this->nconsumers_
; ++k
)
233 deactivate_servant (this->consumers_
[k
]);
234 this->consumers_
[k
]->_remove_ref ();
236 delete[] this->consumers_
;
237 this->consumers_
= 0;
240 // ****************************************************************
242 deactivate_servant (&ec_impl
);
244 // ****************************************************************
248 // ****************************************************************
252 catch (const CORBA::Exception
& ex
)
254 ex
._tao_print_exception ("Random");
261 RND_Driver::timer (const RtecEventComm::Event
&e
)
263 int r
= ACE_OS::rand ();
274 // ACE_DEBUG ((LM_DEBUG, "Pushing an event\n"));
275 if (e
.header
.source
< this->max_recursion_
)
277 RtecEventComm::EventSet
event (1);
280 event
[0].header
.source
++;
281 this->supplier_
.push (event
);
291 // ACE_DEBUG ((LM_DEBUG, "Received event\n"));
296 int n
= ACE_OS::rand () % this->nsuppliers_
;
298 // ACE_DEBUG ((LM_DEBUG, "Connecting supplier %d\n", n));
300 ACE_SupplierQOS_Factory qos
;
301 qos
.insert (0, base_type
, 0, 1);
303 this->suppliers_
[n
]->connect (this->supplier_admin_
.in (),
304 qos
.get_SupplierQOS ());
310 int n
= ACE_OS::rand () % this->nconsumers_
;
312 // ACE_DEBUG ((LM_DEBUG, "Connecting consumer %d\n", n));
314 ACE_ConsumerQOS_Factory qos
;
315 qos
.start_disjunction_group ();
316 qos
.insert_type (base_type
, 0);
318 this->consumers_
[n
]->connect (this->consumer_admin_
.in (),
319 qos
.get_ConsumerQOS ());
325 int n
= ACE_OS::rand () % this->nsuppliers_
;
327 // ACE_DEBUG ((LM_DEBUG, "Disconnecting supplier %d\n", n));
329 this->suppliers_
[n
]->disconnect ();
335 int n
= ACE_OS::rand () % this->nconsumers_
;
337 // ACE_DEBUG ((LM_DEBUG, "Disconnecting consumer %d\n", n));
339 this->consumers_
[n
]->disconnect ();
346 RND_Driver::event (const RtecEventComm::Event
&e
)
351 // ****************************************************************
354 RND_Timer::push (const RtecEventComm::EventSet
&event
)
358 this->driver_
->timer (event
[0]);
360 catch (const CORBA::Exception
&)
365 // ****************************************************************
368 RND_Consumer::connect (RtecEventChannelAdmin::ConsumerAdmin_ptr admin
,
369 const RtecEventChannelAdmin::ConsumerQOS
&qos
)
371 RtecEventChannelAdmin::ProxyPushSupplier_var proxy
;
373 ACE_GUARD (TAO_SYNCH_MUTEX
, ace_mon
, this->lock_
);
375 if (CORBA::is_nil (this->proxy_
.in ()))
377 this->proxy_
= admin
->obtain_push_supplier ();
380 RtecEventChannelAdmin::ProxyPushSupplier::_duplicate(this->proxy_
.in ());
382 RtecEventComm::PushConsumer_var me
=
384 proxy
->connect_push_consumer (me
.in (),
389 RND_Consumer::disconnect (void)
391 ACE_GUARD (TAO_SYNCH_MUTEX
, ace_mon
, this->lock_
);
393 if (CORBA::is_nil (this->proxy_
.in ()))
395 this->proxy_
->disconnect_push_supplier ();
397 RtecEventChannelAdmin::ProxyPushSupplier::_nil ();
401 RND_Consumer::push (const RtecEventComm::EventSet
&event
)
403 this->driver_
->event (event
[0]);
407 RND_Consumer::disconnect_push_consumer (void)
411 // ****************************************************************
414 RND_Supplier::connect (RtecEventChannelAdmin::SupplierAdmin_ptr admin
,
415 const RtecEventChannelAdmin::SupplierQOS
&qos
)
417 RtecEventChannelAdmin::ProxyPushConsumer_var proxy
;
419 ACE_GUARD (TAO_SYNCH_MUTEX
, ace_mon
, this->lock_
);
421 if (CORBA::is_nil (this->proxy_
.in ()))
423 this->proxy_
= admin
->obtain_push_consumer ();
427 RtecEventChannelAdmin::ProxyPushConsumer::_duplicate(this->proxy_
.in ());
429 RtecEventComm::PushSupplier_var me
=
431 proxy
->connect_push_supplier (me
.in (),
436 RND_Supplier::disconnect (void)
438 ACE_GUARD (TAO_SYNCH_MUTEX
, ace_mon
, this->lock_
);
440 if (CORBA::is_nil (this->proxy_
.in ()))
442 this->proxy_
->disconnect_push_consumer ();
444 RtecEventChannelAdmin::ProxyPushConsumer::_nil ();
448 RND_Supplier::push_new_event (void)
450 RtecEventComm::EventSet
event (1);
452 event
[0].header
.type
= base_type
;
453 event
[0].header
.source
= 0;
459 RND_Supplier::push (RtecEventComm::EventSet
&event
)
461 RtecEventChannelAdmin::ProxyPushConsumer_var proxy
;
463 ACE_GUARD (TAO_SYNCH_MUTEX
, ace_mon
, this->lock_
);
465 if (CORBA::is_nil (this->proxy_
.in ()))
469 RtecEventChannelAdmin::ProxyPushConsumer::_duplicate(this->proxy_
.in ());
476 RND_Supplier::disconnect_push_supplier (void)
481 RND_Supplier::svc (void)
483 ACE_DEBUG ((LM_DEBUG
, "Thread %t started\n"));
485 int niterations
= 5000;
486 for (int i
= 0; i
!= niterations
; ++i
)
490 ACE_Time_Value
tv (0, 10000);
493 this->push_new_event ();
495 catch (const CORBA::Exception
&)
499 && i
* 100 / niterations
>= percent
)
501 ACE_DEBUG ((LM_DEBUG
, "Thread %t %d%%\n", percent
));
505 ACE_DEBUG ((LM_DEBUG
, "Thread %t completed\n"));