Merge pull request #2303 from jwillemsen/jwi-803
[ACE_TAO.git] / TAO / orbsvcs / tests / Event / lib / Counting_Supplier.cpp
blob05f6d5d455617a2e93bad84cf3a96f8512de394f
1 #include "Counting_Supplier.h"
2 #include "orbsvcs/Event_Utilities.h"
3 #include "ace/OS_NS_unistd.h"
5 EC_Counting_Supplier::EC_Counting_Supplier ()
6 : event_count (0),
7 disconnect_count (0),
8 consumer_adapter_ (this),
9 event_source_ (-1),
10 event_type_ (ACE_ES_EVENT_UNDEFINED)
14 void
15 EC_Counting_Supplier::activate (RtecEventChannelAdmin::ConsumerAdmin_ptr consumer_admin,
16 int milliseconds)
18 RtecEventComm::PushConsumer_var consumer =
19 this->consumer_adapter_._this ();
21 this->supplier_proxy_ =
22 consumer_admin->obtain_push_supplier ();
24 // Let's say that the execution time for event 2 is 1
25 // milliseconds...
26 ACE_Time_Value tv (0, milliseconds * 1000);
27 TimeBase::TimeT time;
28 ORBSVCS_Time::Time_Value_to_TimeT (time, tv);
30 ACE_ConsumerQOS_Factory consumer_qos;
31 consumer_qos.start_disjunction_group (1);
32 consumer_qos.insert_time (ACE_ES_EVENT_INTERVAL_TIMEOUT,
33 time,
34 0);
36 this->supplier_proxy_->connect_push_consumer (consumer.in (),
37 consumer_qos.get_ConsumerQOS ());
40 void
41 EC_Counting_Supplier::deactivate ()
43 this->supplier_proxy_->disconnect_push_supplier ();
45 PortableServer::POA_var consumer_poa =
46 this->consumer_adapter_._default_POA ();
47 PortableServer::ObjectId_var consumer_id =
48 consumer_poa->servant_to_id (&this->consumer_adapter_);
49 consumer_poa->deactivate_object (consumer_id.in ());
52 void
53 EC_Counting_Supplier::connect (
54 RtecEventChannelAdmin::SupplierAdmin_ptr supplier_admin,
55 int published_source,
56 int published_type,
57 int event_source,
58 int event_type)
60 this->event_source_ = event_source;
61 this->event_type_ = event_type;
63 ACE_SupplierQOS_Factory supplier_qos;
64 supplier_qos.insert (published_source,
65 published_type,
66 0, 1);
67 this->connect (supplier_admin,
68 supplier_qos.get_SupplierQOS ());
71 void
72 EC_Counting_Supplier::connect (
73 RtecEventChannelAdmin::SupplierAdmin_ptr supplier_admin,
74 const RtecEventChannelAdmin::SupplierQOS &qos)
76 RtecEventComm::PushSupplier_var supplier =
77 this->_this ();
79 if (CORBA::is_nil (this->consumer_proxy_.in ()))
81 this->consumer_proxy_ =
82 supplier_admin->obtain_push_consumer ();
85 this->consumer_proxy_->connect_push_supplier (supplier.in (),
86 qos);
89 void
90 EC_Counting_Supplier::disconnect ()
92 if (!CORBA::is_nil (this->consumer_proxy_.in ()))
94 this->consumer_proxy_->disconnect_push_consumer ();
97 PortableServer::POA_var supplier_poa =
98 this->_default_POA ();
99 PortableServer::ObjectId_var supplier_id =
100 supplier_poa->servant_to_id (this);
101 supplier_poa->deactivate_object (supplier_id.in ());
103 this->consumer_proxy_ =
104 RtecEventChannelAdmin::ProxyPushConsumer::_nil ();
107 void
108 EC_Counting_Supplier::push (const RtecEventComm::EventSet&)
110 if (CORBA::is_nil (this->consumer_proxy_.in ()))
111 return;
113 RtecEventComm::EventSet event (1);
114 event.length (1);
115 event[0].header.source = this->event_source_;
116 event[0].header.type = this->event_type_;
117 event[0].header.ttl = 1;
119 this->consumer_proxy_->push (event);
120 this->event_count++;
123 void
124 EC_Counting_Supplier::disconnect_push_consumer ()
128 void
129 EC_Counting_Supplier::disconnect_push_supplier ()
131 this->disconnect_count++;
132 this->consumer_proxy_ =
133 RtecEventChannelAdmin::ProxyPushConsumer::_nil ();
136 // ****************************************************************
138 EC_Counting_Supplier_Task::
139 EC_Counting_Supplier_Task (EC_Counting_Supplier *s)
140 : supplier_ (s),
141 stop_flag_ (0),
142 push_count_ (0)
147 EC_Counting_Supplier_Task::svc ()
151 this->run ();
153 catch (const CORBA::Exception&)
155 return -1;
157 return 0;
160 void
161 EC_Counting_Supplier_Task::stop ()
163 ACE_GUARD (TAO_SYNCH_MUTEX, ace_mon, this->lock_);
164 this->stop_flag_ = 1;
167 CORBA::ULong
168 EC_Counting_Supplier_Task::push_count ()
170 return this->push_count_;
173 void
174 EC_Counting_Supplier_Task::run ()
176 this->event_.length (1);
178 int stop = 0;
179 do {
180 this->supplier_->push (this->event_);
182 // Sleep for a short time to avoid spinning...
183 ACE_OS::sleep (0);
185 ACE_GUARD (TAO_SYNCH_MUTEX, ace_mon, this->lock_);
187 this->push_count_++;
189 stop = this->stop_flag_;
190 } while (stop == 0);