2 #include "orbsvcs/CosEvent/CEC_EventChannel.h"
3 #include "orbsvcs/CosEvent/CEC_Default_Factory.h"
4 #include "ace/Arg_Shifter.h"
5 #include "ace/OS_NS_strings.h"
6 #include "ace/OS_NS_unistd.h"
9 ACE_TMAIN(int argc
, ACE_TCHAR
*argv
[])
12 return driver
.run (argc
, argv
);
15 // ****************************************************************
18 deactivate_servant (PortableServer::Servant servant
)
20 PortableServer::POA_var poa
=
21 servant
->_default_POA ();
22 PortableServer::ObjectId_var oid
=
23 poa
->servant_to_id (servant
);
24 poa
->deactivate_object (oid
.in ());
28 RND_Driver::RND_Driver ()
33 TAO_CEC_Default_Factory::init_svcs ();
37 RND_Driver::run (int argc
, ACE_TCHAR
*argv
[])
42 CORBA::ORB_init (argc
, argv
);
44 // ****************************************************************
46 ACE_Arg_Shifter
arg_shifter (argc
, argv
);
48 while (arg_shifter
.is_anything_left ())
50 const ACE_TCHAR
*arg
= arg_shifter
.get_current ();
52 if (ACE_OS::strcasecmp (arg
, ACE_TEXT("-suppliers")) == 0)
54 arg_shifter
.consume_arg ();
56 if (arg_shifter
.is_parameter_next ())
58 const ACE_TCHAR
* opt
= arg_shifter
.get_current ();
59 int n
= ACE_OS::atoi (opt
);
61 this->nsuppliers_
= n
;
62 arg_shifter
.consume_arg ();
65 else if (ACE_OS::strcasecmp (arg
, ACE_TEXT("-consumers")) == 0)
67 arg_shifter
.consume_arg ();
69 if (arg_shifter
.is_parameter_next ())
71 const ACE_TCHAR
* opt
= arg_shifter
.get_current ();
72 int n
= ACE_OS::atoi (opt
);
74 this->nconsumers_
= n
;
75 arg_shifter
.consume_arg ();
78 else if (ACE_OS::strcasecmp (arg
, ACE_TEXT("-max_recursion")) == 0)
80 arg_shifter
.consume_arg ();
82 if (arg_shifter
.is_parameter_next ())
84 const ACE_TCHAR
* opt
= arg_shifter
.get_current ();
85 int n
= ACE_OS::atoi (opt
);
87 this->max_recursion_
= n
;
88 arg_shifter
.consume_arg ();
92 arg_shifter
.ignore_arg ();
95 // ****************************************************************
97 CORBA::Object_var object
=
98 orb
->resolve_initial_references ("RootPOA");
99 PortableServer::POA_var poa
=
100 PortableServer::POA::_narrow (object
.in ());
101 PortableServer::POAManager_var poa_manager
=
102 poa
->the_POAManager ();
103 poa_manager
->activate ();
105 // ****************************************************************
107 TAO_CEC_EventChannel_Attributes
attributes (poa
.in (),
109 attributes
.consumer_reconnect
= 1;
110 attributes
.supplier_reconnect
= 1;
112 TAO_CEC_EventChannel
ec_impl (attributes
);
115 CosEventChannelAdmin::EventChannel_var event_channel
=
118 // ****************************************************************
120 // Obtain the consumer admin..
121 this->consumer_admin_
=
122 event_channel
->for_consumers ();
124 // Obtain the supplier admin..
125 this->supplier_admin_
=
126 event_channel
->for_suppliers ();
128 // ****************************************************************
130 this->supplier_
.connect (this->supplier_admin_
.in ());
132 // ****************************************************************
134 ACE_NEW_RETURN (this->consumers_
,
135 RND_Consumer
*[this->nconsumers_
],
137 for (int i
= 0; i
!= this->nconsumers_
; ++i
)
139 ACE_NEW_RETURN (this->consumers_
[i
],
143 CORBA::Object_var obj
=
144 this->consumers_
[i
]->_this ();
147 // ****************************************************************
149 ACE_NEW_RETURN (this->suppliers_
,
150 RND_Supplier
*[this->nsuppliers_
],
152 for (int j
= 0; j
!= this->nsuppliers_
; ++j
)
154 ACE_NEW_RETURN (this->suppliers_
[j
],
157 this->suppliers_
[j
]->activate ();
159 CORBA::Object_var obj
=
160 this->suppliers_
[j
]->_this ();
163 // ****************************************************************
165 for (int event_count
= 0; event_count
!= 500; ++event_count
)
167 ACE_Time_Value
tv (0, 50000);
169 this->supplier_
.push_new_event ();
172 ACE_Thread_Manager::instance ()->wait ();
174 // ****************************************************************
177 for (int k
= 0; k
!= this->nsuppliers_
; ++k
)
179 deactivate_servant (this->suppliers_
[k
]);
180 this->suppliers_
[k
]->_remove_ref ();
182 delete[] this->suppliers_
;
183 this->suppliers_
= 0;
186 // ****************************************************************
188 // We destroy now to verify that the callbacks work and do not
189 // produce any problems.
190 event_channel
->destroy ();
192 // ****************************************************************
195 for (int k
= 0; k
!= this->nconsumers_
; ++k
)
197 deactivate_servant (this->consumers_
[k
]);
198 this->consumers_
[k
]->_remove_ref ();
200 delete[] this->consumers_
;
201 this->consumers_
= 0;
204 // ****************************************************************
206 deactivate_servant (&ec_impl
);
208 // ****************************************************************
210 poa
->destroy (true, true);
212 // ****************************************************************
216 catch (const CORBA::Exception
& ex
)
218 ex
._tao_print_exception ("Random");
225 RND_Driver::timer (const CORBA::Any
&e
)
227 int r
= ACE_OS::rand ();
238 CORBA::Long recursion
;
240 // ACE_DEBUG ((LM_DEBUG, "Pushing an event\n"));
241 if (recursion
< this->max_recursion_
)
243 CORBA::Any new_event
;
245 new_event
<<= recursion
;
246 this->supplier_
.push (new_event
);
256 // ACE_DEBUG ((LM_DEBUG, "Received event\n"));
261 int n
= ACE_OS::rand () % this->nsuppliers_
;
263 // ACE_DEBUG ((LM_DEBUG, "Connecting supplier %d\n", n));
265 this->suppliers_
[n
]->connect (this->supplier_admin_
.in ());
271 int n
= ACE_OS::rand () % this->nconsumers_
;
273 // ACE_DEBUG ((LM_DEBUG, "Connecting consumer %d\n", n));
275 this->consumers_
[n
]->connect (this->consumer_admin_
.in ());
281 int n
= ACE_OS::rand () % this->nsuppliers_
;
283 // ACE_DEBUG ((LM_DEBUG, "Disconnecting supplier %d\n", n));
285 this->suppliers_
[n
]->disconnect ();
291 int n
= ACE_OS::rand () % this->nconsumers_
;
293 // ACE_DEBUG ((LM_DEBUG, "Disconnecting consumer %d\n", n));
295 this->consumers_
[n
]->disconnect ();
302 RND_Driver::event (const CORBA::Any
&e
)
307 // ****************************************************************
310 RND_Consumer::connect (CosEventChannelAdmin::ConsumerAdmin_ptr admin
)
312 CosEventChannelAdmin::ProxyPushSupplier_var proxy
;
314 ACE_GUARD (TAO_SYNCH_MUTEX
, ace_mon
, this->lock_
);
316 if (CORBA::is_nil (this->proxy_
.in ()))
318 this->proxy_
= admin
->obtain_push_supplier ();
321 CosEventChannelAdmin::ProxyPushSupplier::_duplicate(this->proxy_
.in ());
323 CosEventComm::PushConsumer_var me
=
325 proxy
->connect_push_consumer (me
.in ());
329 RND_Consumer::disconnect ()
331 ACE_GUARD (TAO_SYNCH_MUTEX
, ace_mon
, this->lock_
);
333 if (CORBA::is_nil (this->proxy_
.in ()))
335 this->proxy_
->disconnect_push_supplier ();
337 CosEventChannelAdmin::ProxyPushSupplier::_nil ();
341 RND_Consumer::push (const CORBA::Any
&event
)
343 this->driver_
->event (event
);
347 RND_Consumer::disconnect_push_consumer ()
351 // ****************************************************************
354 RND_Supplier::connect (CosEventChannelAdmin::SupplierAdmin_ptr admin
)
356 CosEventChannelAdmin::ProxyPushConsumer_var proxy
;
358 ACE_GUARD (TAO_SYNCH_MUTEX
, ace_mon
, this->lock_
);
360 if (CORBA::is_nil (this->proxy_
.in ()))
362 this->proxy_
= admin
->obtain_push_consumer ();
366 CosEventChannelAdmin::ProxyPushConsumer::_duplicate(this->proxy_
.in ());
368 CosEventComm::PushSupplier_var me
;
370 int r
= ACE_OS::rand () % 2;
375 proxy
->connect_push_supplier (me
.in ());
379 RND_Supplier::disconnect ()
381 ACE_GUARD (TAO_SYNCH_MUTEX
, ace_mon
, this->lock_
);
383 if (CORBA::is_nil (this->proxy_
.in ()))
385 this->proxy_
->disconnect_push_consumer ();
387 CosEventChannelAdmin::ProxyPushConsumer::_nil ();
391 RND_Supplier::push_new_event ()
394 CORBA::Long recursion
= 0;
401 RND_Supplier::push (CORBA::Any
&event
)
403 CosEventChannelAdmin::ProxyPushConsumer_var proxy
;
405 ACE_GUARD (TAO_SYNCH_MUTEX
, ace_mon
, this->lock_
);
407 if (CORBA::is_nil (this->proxy_
.in ()))
411 CosEventChannelAdmin::ProxyPushConsumer::_duplicate(this->proxy_
.in ());
418 RND_Supplier::disconnect_push_supplier ()
425 ACE_DEBUG ((LM_DEBUG
, "Thread %t started\n"));
427 int niterations
= 5000;
428 for (int i
= 0; i
!= niterations
; ++i
)
432 ACE_Time_Value
tv (0, 10000);
435 this->push_new_event ();
437 catch (const CORBA::Exception
&)
440 if (i
* 100 / niterations
>= percent
)
442 ACE_DEBUG ((LM_DEBUG
, "Thread %t %d%%\n", percent
));
446 ACE_DEBUG ((LM_DEBUG
, "Thread %t completed\n"));