Merge pull request #2306 from mitza-oci/warnings
[ACE_TAO.git] / TAO / orbsvcs / tests / Event / lib / Supplier.cpp
blobffce96cff06bd37eeb6aa3e7397a27ff2efc01ae
1 #include "Supplier.h"
2 #include "orbsvcs/Time_Utilities.h"
3 #include "orbsvcs/Event_Utilities.h"
5 #include "tao/debug.h"
6 #include "ace/OS_NS_unistd.h"
8 EC_Supplier::EC_Supplier (EC_Driver *driver,
9 void* cookie)
10 : driver_ (driver),
11 cookie_ (cookie),
12 push_count_ (0),
13 burst_count_ (0),
14 burst_size_ (0),
15 payload_size_ (0),
16 burst_pause_ (0),
17 shutdown_event_type_ (0),
18 is_active_ (0)
22 void
23 EC_Supplier::send_event (int event_number)
25 if (CORBA::is_nil (this->consumer_proxy_.in ()))
26 return;
28 // Create the event...
30 RtecEventComm::EventSet event (1);
31 event.length (1);
33 event[0].header.ttl = 1;
35 ACE_hrtime_t t = ACE_OS::gethrtime ();
36 ORBSVCS_Time::hrtime_to_TimeT (event[0].header.creation_time, t);
38 // We use replace to minimize the copies, this should result
39 // in just one memory allocation:
40 event[0].data.payload.length (this->payload_size_);
42 this->event_type (event_number, event[0]);
44 this->send_event (event);
47 void
48 EC_Supplier::send_event (const RtecEventComm::EventSet& event)
50 ACE_GUARD (TAO_SYNCH_MUTEX, ace_mon, this->lock_);
52 if (this->push_count_ == 0)
53 this->throughput_start_ = ACE_OS::gethrtime ();
55 this->push_count_ += event.length ();
57 if (TAO_debug_level > 0
58 && this->push_count_ % 100 == 0)
60 ACE_DEBUG ((LM_DEBUG,
61 "EC_Consumer (%P|%t): %d events received\n",
62 this->push_count_));
65 ACE_hrtime_t start = ACE_OS::gethrtime ();
67 this->consumer_proxy_->push (event);
69 ACE_hrtime_t end = ACE_OS::gethrtime ();
70 this->throughput_.sample (end - this->throughput_start_,
71 end - start);
74 void
75 EC_Supplier::event_type (int event_number,
76 RtecEventComm::Event &event)
78 CORBA::ULong l = this->qos_.publications.length ();
80 if (l == 0)
82 event.header.source = 0;
83 event.header.type = this->shutdown_event_type_;
85 else
87 int i = event_number % l;
88 int type = this->qos_.publications[i].event.header.type;
89 if (type == this->shutdown_event_type_)
90 i = 0;
92 RtecEventComm::EventHeader& header =
93 this->qos_.publications[i].event.header;
95 event.header.source = header.source;
96 event.header.type = header.type;
100 void
101 EC_Supplier::connect (RtecEventChannelAdmin::SupplierAdmin_ptr supplier_admin,
102 const RtecEventChannelAdmin::SupplierQOS& qos,
103 int shutdown_event_type)
105 this->consumer_proxy_ =
106 supplier_admin->obtain_push_consumer ();
108 this->connect (qos, shutdown_event_type);
111 void
112 EC_Supplier::connect (const RtecEventChannelAdmin::SupplierQOS& qos,
113 int shutdown_event_type)
115 if (CORBA::is_nil (this->consumer_proxy_.in ()))
116 return; // @@ Throw?
118 this->qos_ = qos;
119 this->shutdown_event_type_ = shutdown_event_type;
121 if (CORBA::is_nil (this->myself_.in ()))
123 this->myself_ = this->_this ();
125 this->is_active_ = 1;
127 this->consumer_proxy_->connect_push_supplier (this->myself_.in (),
128 qos);
131 void
132 EC_Supplier::disconnect ()
134 if (CORBA::is_nil (this->consumer_proxy_.in ()))
135 return;
137 this->consumer_proxy_->disconnect_push_consumer ();
139 this->consumer_proxy_ =
140 RtecEventChannelAdmin::ProxyPushConsumer::_nil ();
143 void
144 EC_Supplier::shutdown ()
146 if (!this->is_active_)
147 return;
149 // Deactivate the servant
150 PortableServer::POA_var poa =
151 this->_default_POA ();
152 PortableServer::ObjectId_var id =
153 poa->servant_to_id (this);
154 poa->deactivate_object (id.in ());
155 this->is_active_ = 0;
156 this->myself_ = RtecEventComm::PushSupplier::_nil ();
159 void
160 EC_Supplier::disconnect_push_supplier ()
162 this->driver_->supplier_disconnect (this->cookie_);
163 this->consumer_proxy_ =
164 RtecEventChannelAdmin::ProxyPushConsumer::_nil ();
167 void
168 EC_Supplier::dump_results (
169 const ACE_TCHAR* name,
170 ACE_High_Res_Timer::global_scale_factor_type gsf)
172 this->throughput_.dump_results (name, gsf);
175 void
176 EC_Supplier::accumulate (ACE_Throughput_Stats& stats) const
178 stats.accumulate (this->throughput_);
181 // ****************************************************************
183 EC_Supplier_Task::EC_Supplier_Task (EC_Supplier* supplier,
184 EC_Driver* driver,
185 void* cookie,
186 int burst_count,
187 int burst_size,
188 int burst_pause,
189 int payload_size,
190 int shutdown_event_type,
191 ACE_Thread_Manager* thr_mgr)
192 : ACE_Task_Base (thr_mgr),
193 supplier_ (supplier),
194 driver_ (driver),
195 cookie_ (cookie),
196 burst_count_ (burst_count),
197 burst_size_ (burst_size),
198 burst_pause_ (burst_pause),
199 payload_size_ (payload_size),
200 shutdown_event_type_ (shutdown_event_type)
205 EC_Supplier_Task::svc ()
207 // Initialize a time value to pace the test
208 ACE_Time_Value tv (0, this->burst_pause_);
210 RtecEventComm::EventSet event (1);
211 event.length (1);
213 event[0].header.ttl = 1;
215 ACE_hrtime_t t = ACE_OS::gethrtime ();
216 ORBSVCS_Time::hrtime_to_TimeT (event[0].header.creation_time, t);
218 // We use replace to minimize the copies, this should result
219 // in just one memory allocation;
220 event[0].data.payload.length (this->payload_size_);
222 for (int i = 0; i < this->burst_count_; ++i)
224 for (int j = 0; j < this->burst_size_; ++j)
228 this->supplier_->event_type (j, event[0]);
230 ACE_hrtime_t now = ACE_OS::gethrtime ();
231 ORBSVCS_Time::hrtime_to_TimeT (event[0].header.creation_time,
232 now);
233 // ACE_DEBUG ((LM_DEBUG, "(%t) supplier push event\n"));
235 this->supplier_->send_event (event);
237 catch (const CORBA::SystemException& sys_ex)
239 sys_ex._tao_print_exception ("SYS_EX");
241 catch (const CORBA::Exception& ex)
243 ex._tao_print_exception ("SYS_EX");
246 ACE_OS::sleep (tv);
251 // Send one event shutdown from each supplier
252 event[0].header.type = this->shutdown_event_type_;
253 ACE_hrtime_t now = ACE_OS::gethrtime ();
254 ORBSVCS_Time::hrtime_to_TimeT (event[0].header.creation_time,
255 now);
256 this->supplier_->send_event (event);
258 catch (const CORBA::SystemException& sys_ex)
260 sys_ex._tao_print_exception ("SYS_EX");
262 catch (const CORBA::Exception& ex)
264 ex._tao_print_exception ("SYS_EX");
267 ACE_DEBUG ((LM_DEBUG,
268 "Supplier task finished\n"));
269 return 0;