Merge pull request #1551 from DOCGroup/plm_jira_333
[ACE_TAO.git] / TAO / orbsvcs / tests / Event / lib / Supplier.cpp
bloba1877edb1970adfbf569b9d8e2875df80c6de4da
1 #include "Supplier.h"
2 #include "orbsvcs/Time_Utilities.h"
3 #include "orbsvcs/Event_Utilities.h"
5 #include "tao/debug.h"
6 #include "ace/OS_NS_unistd.h"
8 EC_Supplier::EC_Supplier (EC_Driver *driver,
9 void* cookie)
10 : driver_ (driver),
11 cookie_ (cookie),
12 push_count_ (0),
13 burst_count_ (0),
14 burst_size_ (0),
15 payload_size_ (0),
16 burst_pause_ (0),
17 shutdown_event_type_ (0),
18 is_active_ (0)
22 void
23 EC_Supplier::send_event (int event_number)
25 if (CORBA::is_nil (this->consumer_proxy_.in ()))
26 return;
28 // Create the event...
30 RtecEventComm::EventSet event (1);
31 event.length (1);
33 event[0].header.ttl = 1;
35 ACE_hrtime_t t = ACE_OS::gethrtime ();
36 ORBSVCS_Time::hrtime_to_TimeT (event[0].header.creation_time, t);
38 // We use replace to minimize the copies, this should result
39 // in just one memory allocation:
40 event[0].data.payload.length (this->payload_size_);
42 this->event_type (event_number, event[0]);
44 this->send_event (event);
47 void
48 EC_Supplier::send_event (const RtecEventComm::EventSet& event)
50 ACE_GUARD (TAO_SYNCH_MUTEX, ace_mon, this->lock_);
52 if (this->push_count_ == 0)
53 this->throughput_start_ = ACE_OS::gethrtime ();
55 this->push_count_ += event.length ();
57 if (TAO_debug_level > 0
58 && this->push_count_ % 100 == 0)
60 ACE_DEBUG ((LM_DEBUG,
61 "EC_Consumer (%P|%t): %d events received\n",
62 this->push_count_));
65 ACE_hrtime_t start = ACE_OS::gethrtime ();
67 this->consumer_proxy_->push (event);
69 ACE_hrtime_t end = ACE_OS::gethrtime ();
70 this->throughput_.sample (end - this->throughput_start_,
71 end - start);
74 void
75 EC_Supplier::event_type (int event_number,
76 RtecEventComm::Event &event)
78 CORBA::ULong l = this->qos_.publications.length ();
80 if (l == 0)
82 event.header.source = 0;
83 event.header.type = this->shutdown_event_type_;
85 else
87 int i = event_number % l;
88 int type = this->qos_.publications[i].event.header.type;
89 if (type == this->shutdown_event_type_)
90 i = 0;
92 RtecEventComm::EventHeader& header =
93 this->qos_.publications[i].event.header;
95 event.header.source = header.source;
96 event.header.type = header.type;
100 void
101 EC_Supplier::connect (RtecEventChannelAdmin::SupplierAdmin_ptr supplier_admin,
102 const RtecEventChannelAdmin::SupplierQOS& qos,
103 int shutdown_event_type)
105 this->consumer_proxy_ =
106 supplier_admin->obtain_push_consumer ();
108 this->connect (qos, shutdown_event_type);
111 void
112 EC_Supplier::connect (const RtecEventChannelAdmin::SupplierQOS& qos,
113 int shutdown_event_type)
115 if (CORBA::is_nil (this->consumer_proxy_.in ()))
116 return; // @@ Throw?
118 this->qos_ = qos;
119 this->shutdown_event_type_ = shutdown_event_type;
121 if (CORBA::is_nil (this->myself_.in ()))
123 this->myself_ = this->_this ();
125 this->is_active_ = 1;
127 this->consumer_proxy_->connect_push_supplier (this->myself_.in (),
128 qos);
131 void
132 EC_Supplier::disconnect (void)
134 if (CORBA::is_nil (this->consumer_proxy_.in ()))
135 return;
137 this->consumer_proxy_->disconnect_push_consumer ();
139 this->consumer_proxy_ =
140 RtecEventChannelAdmin::ProxyPushConsumer::_nil ();
143 void
144 EC_Supplier::shutdown (void)
146 if (!this->is_active_)
147 return;
149 // Deactivate the servant
150 PortableServer::POA_var poa =
151 this->_default_POA ();
152 PortableServer::ObjectId_var id =
153 poa->servant_to_id (this);
154 poa->deactivate_object (id.in ());
155 this->is_active_ = 0;
156 this->myself_ = RtecEventComm::PushSupplier::_nil ();
159 void
160 EC_Supplier::disconnect_push_supplier (void)
162 this->driver_->supplier_disconnect (this->cookie_);
163 this->consumer_proxy_ =
164 RtecEventChannelAdmin::ProxyPushConsumer::_nil ();
167 void
168 EC_Supplier::dump_results (
169 const ACE_TCHAR* name,
170 ACE_High_Res_Timer::global_scale_factor_type gsf)
172 this->throughput_.dump_results (name, gsf);
175 void
176 EC_Supplier::accumulate (ACE_Throughput_Stats& stats) const
178 stats.accumulate (this->throughput_);
181 // ****************************************************************
183 EC_Supplier_Task::EC_Supplier_Task (EC_Supplier* supplier,
184 EC_Driver* driver,
185 void* cookie,
186 int burst_count,
187 int burst_size,
188 int burst_pause,
189 int payload_size,
190 int shutdown_event_type,
191 ACE_Thread_Manager* thr_mgr)
192 : ACE_Task_Base (thr_mgr),
193 supplier_ (supplier),
194 driver_ (driver),
195 cookie_ (cookie),
196 burst_count_ (burst_count),
197 burst_size_ (burst_size),
198 burst_pause_ (burst_pause),
199 payload_size_ (payload_size),
200 shutdown_event_type_ (shutdown_event_type)
205 EC_Supplier_Task::svc (void)
208 // Initialize a time value to pace the test
209 ACE_Time_Value tv (0, this->burst_pause_);
211 RtecEventComm::EventSet event (1);
212 event.length (1);
214 event[0].header.ttl = 1;
216 ACE_hrtime_t t = ACE_OS::gethrtime ();
217 ORBSVCS_Time::hrtime_to_TimeT (event[0].header.creation_time, t);
219 // We use replace to minimize the copies, this should result
220 // in just one memory allocation;
221 event[0].data.payload.length (this->payload_size_);
223 for (int i = 0; i < this->burst_count_; ++i)
225 for (int j = 0; j < this->burst_size_; ++j)
229 this->supplier_->event_type (j, event[0]);
231 ACE_hrtime_t now = ACE_OS::gethrtime ();
232 ORBSVCS_Time::hrtime_to_TimeT (event[0].header.creation_time,
233 now);
234 // ACE_DEBUG ((LM_DEBUG, "(%t) supplier push event\n"));
236 this->supplier_->send_event (event);
239 catch (const CORBA::SystemException& sys_ex)
241 sys_ex._tao_print_exception ("SYS_EX");
243 catch (const CORBA::Exception& ex)
245 ex._tao_print_exception ("SYS_EX");
248 ACE_OS::sleep (tv);
253 // Send one event shutdown from each supplier
254 event[0].header.type = this->shutdown_event_type_;
255 ACE_hrtime_t now = ACE_OS::gethrtime ();
256 ORBSVCS_Time::hrtime_to_TimeT (event[0].header.creation_time,
257 now);
258 this->supplier_->send_event (event);
260 catch (const CORBA::SystemException& sys_ex)
262 sys_ex._tao_print_exception ("SYS_EX");
264 catch (const CORBA::Exception& ex)
266 ex._tao_print_exception ("SYS_EX");
269 ACE_DEBUG ((LM_DEBUG,
270 "Supplier task finished\n"));
271 return 0;