1 #include "Counting_Supplier.h"
2 #include "orbsvcs/Event_Utilities.h"
3 #include "ace/OS_NS_unistd.h"
// Default constructor. Of the member initializers visible here, it binds
// consumer_adapter_ to this object and starts event_type_ at
// ACE_ES_EVENT_UNDEFINED.
// NOTE(review): any other initializers are not visible in this view.
5 EC_Counting_Supplier::EC_Counting_Supplier ()
8 consumer_adapter_ (this),
10 event_type_ (ACE_ES_EVENT_UNDEFINED
)
// Registers this object's consumer adapter with the event channel:
//   1. obtains a PushConsumer reference from consumer_adapter_._this ();
//   2. obtains a supplier proxy from consumer_admin and stores it in
//      supplier_proxy_;
//   3. builds a consumer QoS with one disjunction group containing an
//      ACE_ES_EVENT_INTERVAL_TIMEOUT entry;
//   4. connects the consumer to the supplier proxy with that QoS.
// NOTE(review): the remaining parameters (presumably including
// `milliseconds`) and the declaration of `time` are not visible here.
15 EC_Counting_Supplier::activate (RtecEventChannelAdmin::ConsumerAdmin_ptr consumer_admin
,
18 RtecEventComm::PushConsumer_var consumer
=
19 this->consumer_adapter_
._this ();
21 this->supplier_proxy_
=
22 consumer_admin
->obtain_push_supplier ();
24 // Let's say that the execution time for event 2 is 1
// ACE_Time_Value takes (seconds, microseconds), hence the * 1000 on the
// millisecond value.
26 ACE_Time_Value
tv (0, milliseconds
* 1000);
// Convert tv into the TimeT value `time`.
28 ORBSVCS_Time::Time_Value_to_TimeT (time
, tv
);
30 ACE_ConsumerQOS_Factory consumer_qos
;
31 consumer_qos
.start_disjunction_group (1);
32 consumer_qos
.insert_time (ACE_ES_EVENT_INTERVAL_TIMEOUT
,
36 this->supplier_proxy_
->connect_push_consumer (consumer
.in (),
37 consumer_qos
.get_ConsumerQOS ());
// Reverses activate (): disconnects from the supplier proxy, then
// deactivates the consumer adapter servant in its POA.
41 EC_Counting_Supplier::deactivate ()
43 this->supplier_proxy_
->disconnect_push_supplier ();
// Find the adapter's object id in its default POA and deactivate it.
45 PortableServer::POA_var consumer_poa
=
46 this->consumer_adapter_
._default_POA ();
47 PortableServer::ObjectId_var consumer_id
=
48 consumer_poa
->servant_to_id (&this->consumer_adapter_
);
49 consumer_poa
->deactivate_object (consumer_id
.in ());
// Convenience overload of connect (): records event_source and event_type
// in event_source_ / event_type_, builds a supplier QoS via
// ACE_SupplierQOS_Factory, and delegates to the
// connect (supplier_admin, qos) overload.
// NOTE(review): the full parameter list and the remaining arguments to
// supplier_qos.insert () are not visible in this view.
53 EC_Counting_Supplier::connect (
54 RtecEventChannelAdmin::SupplierAdmin_ptr supplier_admin
,
60 this->event_source_
= event_source
;
61 this->event_type_
= event_type
;
63 ACE_SupplierQOS_Factory supplier_qos
;
64 supplier_qos
.insert (published_source
,
67 this->connect (supplier_admin
,
68 supplier_qos
.get_SupplierQOS ());
// Connects this supplier to the event channel with the given QoS.
// If consumer_proxy_ is still nil, a consumer proxy is obtained from
// supplier_admin; `supplier` is then passed to connect_push_supplier ().
// NOTE(review): the initializer of `supplier`, the trailing argument of
// connect_push_supplier (), and the extent of the `if` are not visible here.
72 EC_Counting_Supplier::connect (
73 RtecEventChannelAdmin::SupplierAdmin_ptr supplier_admin
,
74 const RtecEventChannelAdmin::SupplierQOS
&qos
)
76 RtecEventComm::PushSupplier_var supplier
=
// Only obtain a proxy when none is held yet.
79 if (CORBA::is_nil (this->consumer_proxy_
.in ()))
81 this->consumer_proxy_
=
82 supplier_admin
->obtain_push_consumer ();
85 this->consumer_proxy_
->connect_push_supplier (supplier
.in (),
// Disconnects from the event channel: when a consumer proxy is held it is
// told to disconnect; this servant is deactivated in its default POA and
// consumer_proxy_ is reset to nil.
// NOTE(review): the extent of the `if` body is not visible in this view.
90 EC_Counting_Supplier::disconnect ()
92 if (!CORBA::is_nil (this->consumer_proxy_
.in ()))
94 this->consumer_proxy_
->disconnect_push_consumer ();
// Deactivate this servant in its default POA.
97 PortableServer::POA_var supplier_poa
=
98 this->_default_POA ();
99 PortableServer::ObjectId_var supplier_id
=
100 supplier_poa
->servant_to_id (this);
101 supplier_poa
->deactivate_object (supplier_id
.in ());
// Drop the proxy reference.
103 this->consumer_proxy_
=
104 RtecEventChannelAdmin::ProxyPushConsumer::_nil ();
// Push callback. The incoming event set is unnamed and unused; instead a
// fresh event is built from event_source_ / event_type_ with ttl 1 and
// pushed to consumer_proxy_.
// NOTE(review): the action taken when consumer_proxy_ is nil is not
// visible in this view.
108 EC_Counting_Supplier::push (const RtecEventComm::EventSet
&)
110 if (CORBA::is_nil (this->consumer_proxy_
.in ()))
113 RtecEventComm::EventSet
event (1);
// Fill in the header of element 0.
115 event
[0].header
.source
= this->event_source_
;
116 event
[0].header
.type
= this->event_type_
;
117 event
[0].header
.ttl
= 1;
119 this->consumer_proxy_
->push (event
);
// NOTE(review): only the signature of disconnect_push_consumer () is
// visible in this view; its body is not shown, so behavior is unconfirmed.
124 EC_Counting_Supplier::disconnect_push_consumer ()
// Records a disconnect by incrementing disconnect_count, then resets
// consumer_proxy_ to nil.
129 EC_Counting_Supplier::disconnect_push_supplier ()
131 this->disconnect_count
++;
132 this->consumer_proxy_
=
133 RtecEventChannelAdmin::ProxyPushConsumer::_nil ();
136 // ****************************************************************
// Constructs a task for the supplier `s`.
// NOTE(review): member initializers are not visible here; presumably `s`
// is stored in supplier_ (used by run ()) — confirm.
138 EC_Counting_Supplier_Task::
139 EC_Counting_Supplier_Task (EC_Counting_Supplier
*s
)
// Task entry point. Catches CORBA::Exception by const reference.
// NOTE(review): the try body and the handler body are not visible here.
147 EC_Counting_Supplier_Task::svc ()
153 catch (const CORBA::Exception
&)
// Sets stop_flag_ to 1 while holding lock_; run () reads this flag under
// the same lock.
161 EC_Counting_Supplier_Task::stop ()
// ACE_GUARD declares the guard ace_mon on this->lock_.
163 ACE_GUARD (TAO_SYNCH_MUTEX
, ace_mon
, this->lock_
);
164 this->stop_flag_
= 1;
// Accessor: returns the current value of push_count_.
168 EC_Counting_Supplier_Task::push_count ()
170 return this->push_count_
;
// Sets event_ to length 1, pushes it through supplier_, and samples
// stop_flag_ into the local `stop` while holding lock_.
// NOTE(review): the loop structure, the sleep call and the push_count_
// update are not visible in this view.
174 EC_Counting_Supplier_Task::run ()
176 this->event_
.length (1);
180 this->supplier_
->push (this->event_
);
182 // Sleep for a short time to avoid spinning...
// Read the stop flag under the same lock that stop () uses to set it.
185 ACE_GUARD (TAO_SYNCH_MUTEX
, ace_mon
, this->lock_
);
189 stop
= this->stop_flag_
;