Merge pull request #1551 from DOCGroup/plm_jira_333
[ACE_TAO.git] / TAO / orbsvcs / tests / EC_Throughput / ECT_Supplier.cpp
blobeed3d2b98c127ced93c5a610aff7dc269dfcca0a
1 #include "ECT_Supplier.h"
3 #include "orbsvcs/Event_Utilities.h"
4 #include "orbsvcs/Event_Service_Constants.h"
5 #include "orbsvcs/Time_Utilities.h"
7 #include "tao/Timeprobe.h"
8 #include "tao/debug.h"
10 #include "ace/Get_Opt.h"
11 #include "ace/Auto_Ptr.h"
12 #include "ace/Sched_Params.h"
13 #include "ace/High_Res_Timer.h"
14 #include "ace/ACE.h"
15 #include "ace/OS_NS_unistd.h"
17 Test_Supplier::Test_Supplier (ECT_Driver *driver)
18 : driver_ (driver),
19 supplier_ (this),
20 burst_count_ (0),
21 burst_size_ (0),
22 event_size_ (0),
23 burst_pause_ (0),
24 type_start_ (ACE_ES_EVENT_UNDEFINED),
25 type_count_ (1)
29 void
30 Test_Supplier::connect (RtecScheduler::Scheduler_ptr scheduler,
31 const char* name,
32 int burst_count,
33 int burst_size,
34 int event_size,
35 int burst_pause,
36 int type_start,
37 int type_count,
38 RtecEventChannelAdmin::EventChannel_ptr ec)
40 this->burst_count_ = burst_count;
41 this->burst_size_ = burst_size;
42 this->event_size_ = event_size;
43 this->burst_pause_ = burst_pause;
44 this->type_start_ = type_start;
45 this->type_count_ = type_count;
47 RtecScheduler::handle_t rt_info =
48 scheduler->create (name);
50 ACE_Time_Value tv (0, burst_pause);
51 RtecScheduler::Period_t rate = tv.usec () * 10;
53 // The execution times are set to reasonable values, but
54 // actually they are changed on the real execution, i.e. we
55 // lie to the scheduler to obtain right priorities; but we
56 // don't care if the set is schedulable.
57 tv.set (0, 2000);
58 TimeBase::TimeT time;
59 ORBSVCS_Time::Time_Value_to_TimeT (time, tv);
60 scheduler->set (rt_info,
61 RtecScheduler::VERY_HIGH_CRITICALITY,
62 time, time, time,
63 rate,
64 RtecScheduler::VERY_LOW_IMPORTANCE,
65 time,
67 RtecScheduler::OPERATION);
69 this->supplier_id_ = ACE::crc32 (name);
70 ACE_DEBUG ((LM_DEBUG, "ID for <%s> is %04.4x\n", name,
71 this->supplier_id_));
73 ACE_SupplierQOS_Factory qos;
74 for (int i = 0; i != type_count; ++i)
76 qos.insert (this->supplier_id_,
77 type_start + i,
78 rt_info, 1);
80 qos.insert (this->supplier_id_,
81 ACE_ES_EVENT_SHUTDOWN,
82 rt_info, 1);
84 RtecEventChannelAdmin::SupplierAdmin_var supplier_admin =
85 ec->for_suppliers ();
87 this->consumer_proxy_ =
88 supplier_admin->obtain_push_consumer ();
90 RtecEventComm::PushSupplier_var objref =
91 this->supplier_._this ();
93 this->consumer_proxy_->connect_push_supplier (objref.in (),
94 qos.get_SupplierQOS ());
97 void
98 Test_Supplier::disconnect (void)
100 if (CORBA::is_nil (this->consumer_proxy_.in ()))
101 return;
105 this->consumer_proxy_->disconnect_push_consumer ();
107 catch (const CORBA::Exception&)
109 // The consumer may be gone already, so we
110 // will ignore this exception
113 this->consumer_proxy_ =
114 RtecEventChannelAdmin::ProxyPushConsumer::_nil ();
116 // Deactivate the servant
117 PortableServer::POA_var poa =
118 this->supplier_._default_POA ();
119 PortableServer::ObjectId_var id =
120 poa->servant_to_id (&this->supplier_);
121 poa->deactivate_object (id.in ());
125 Test_Supplier::svc ()
129 // Initialize a time value to pace the test
130 ACE_Time_Value tv (0, this->burst_pause_);
132 // Pre-allocate a message to send
133 ACE_Message_Block mb (this->event_size_);
134 mb.wr_ptr (this->event_size_);
136 RtecEventComm::EventSet event (1);
137 event.length (1);
138 event[0].header.source = this->supplier_id ();
139 event[0].header.ttl = 1;
141 ACE_hrtime_t t = ACE_OS::gethrtime ();
142 ORBSVCS_Time::hrtime_to_TimeT (event[0].header.creation_time, t);
144 // We use replace to minimize the copies, this should result
145 // in just one memory allocation;
146 #if (TAO_NO_COPY_OCTET_SEQUENCES == 1)
147 event[0].data.payload.replace (this->event_size_,
148 &mb);
149 #else
150 // If the replace method is not available, we will need
151 // to do the copy manually. First, set the octet sequence length.
152 event[0].data.payload.length (this->event_size_);
154 // Now copy over each byte.
155 char* base = mb.data_block ()->base ();
156 for(CORBA::ULong i = 0; i < (CORBA::ULong)this->event_size_; i++)
158 event[0].data.payload[i] = base[i];
160 #endif /* TAO_NO_COPY_OCTET_SEQUENCES == 1 */
162 ACE_hrtime_t test_start = ACE_OS::gethrtime ();
164 for (int i = 0; i < this->burst_count_; ++i)
166 for (int j = 0; j < this->burst_size_; ++j)
168 event[0].header.type =
169 this->type_start_ + j % this->type_count_;
171 ACE_hrtime_t request_start = ACE_OS::gethrtime ();
172 ORBSVCS_Time::hrtime_to_TimeT (event[0].header.creation_time,
173 request_start);
174 // ACE_DEBUG ((LM_DEBUG, "(%t) supplier push event\n"));
175 this->consumer_proxy ()->push (event);
177 ACE_hrtime_t end = ACE_OS::gethrtime ();
178 this->throughput_.sample (end - test_start,
179 end - request_start);
182 if (TAO_debug_level > 0
183 && i % 100 == 0)
185 ACE_DEBUG ((LM_DEBUG,
186 "ECT_Supplier (%P|%t): %d bursts sent\n",
187 i));
190 ACE_OS::sleep (tv);
193 // Send one event shutdown from each supplier
194 event[0].header.type = ACE_ES_EVENT_SHUTDOWN;
195 ACE_hrtime_t request_start = ACE_OS::gethrtime ();
196 ORBSVCS_Time::hrtime_to_TimeT (event[0].header.creation_time,
197 request_start);
198 this->consumer_proxy ()->push(event);
199 ACE_hrtime_t end = ACE_OS::gethrtime ();
200 this->throughput_.sample (end - test_start,
201 end - request_start);
203 catch (const CORBA::SystemException& sys_ex)
205 sys_ex._tao_print_exception ("SYS_EX");
207 catch (const CORBA::Exception& ex)
209 ex._tao_print_exception ("NON SYS EX");
212 ACE_DEBUG ((LM_DEBUG,
213 "Supplier %4.4x completed\n",
214 this->supplier_id_));
215 return 0;
218 void
219 Test_Supplier::disconnect_push_supplier (void)
223 int Test_Supplier::supplier_id (void) const
225 return this->supplier_id_;
228 RtecEventChannelAdmin::ProxyPushConsumer_ptr
229 Test_Supplier::consumer_proxy (void)
231 return this->consumer_proxy_.in ();
234 void
235 Test_Supplier::dump_results (const ACE_TCHAR* name,
236 ACE_Basic_Stats::scale_factor_type gsf)
238 this->throughput_.dump_results (name, gsf);
241 void
242 Test_Supplier::accumulate (ACE_Throughput_Stats& stats) const
244 stats.accumulate (this->throughput_);