1 #include "ECT_Supplier.h"
3 #include "orbsvcs/Event_Utilities.h"
4 #include "orbsvcs/Event_Service_Constants.h"
5 #include "orbsvcs/Time_Utilities.h"
7 #include "tao/Timeprobe.h"
10 #include "ace/Get_Opt.h"
11 #include "ace/Auto_Ptr.h"
12 #include "ace/Sched_Params.h"
13 #include "ace/High_Res_Timer.h"
15 #include "ace/OS_NS_unistd.h"
// Constructor: receives the ECT_Driver pointer `driver`. The only member
// initializer visible in this listing sets type_start_ to
// ACE_ES_EVENT_UNDEFINED.
// NOTE(review): the other member initializers and the constructor body are
// not shown in this listing -- confirm against the full file.
17 Test_Supplier::Test_Supplier (ECT_Driver
*driver
)
24 type_start_ (ACE_ES_EVENT_UNDEFINED
),
// connect: records the test parameters in members, creates a scheduler entry
// for `name`, builds the supplier QoS and connects supplier_ to a
// push-consumer proxy obtained from the supplier admin.
// NOTE(review): only the first (scheduler) and last (ec) parameters are
// visible in this listing; name, burst_count, burst_size, event_size,
// burst_pause, type_start and type_count are used below and are presumably
// the parameters in between -- confirm against the header.
30 Test_Supplier::connect (RtecScheduler::Scheduler_ptr scheduler
,
38 RtecEventChannelAdmin::EventChannel_ptr ec
)
// Keep the test parameters in members; svc () reads them later.
40 this->burst_count_
= burst_count
;
41 this->burst_size_
= burst_size
;
42 this->event_size_
= event_size
;
43 this->burst_pause_
= burst_pause
;
44 this->type_start_
= type_start
;
45 this->type_count_
= type_count
;
// Ask the scheduler to create an entry for `name`; the returned handle
// (rt_info) is handed to scheduler->set () below.
47 RtecScheduler::handle_t rt_info
=
48 scheduler
->create (name
);
// tv is built with burst_pause as its second constructor argument; rate is
// tv.usec () * 10.
// NOTE(review): the factor 10 presumably converts microseconds into the
// units expected for Period_t -- confirm.
50 ACE_Time_Value
tv (0, burst_pause
);
51 RtecScheduler::Period_t rate
= tv
.usec () * 10;
53 // The execution times are set to reasonable values, but
54 // actually they are changed on the real execution, i.e. we
55 // lie to the scheduler to obtain right priorities; but we
56 // don't care if the set is schedulable.
// Convert tv into `time` (the declaration of `time`, and any change made to
// tv before this call, are not visible in this listing).
59 ORBSVCS_Time::Time_Value_to_TimeT (time
, tv
);
// Describe the operation to the scheduler. Only some of the arguments to
// set () are visible in this listing.
60 scheduler
->set (rt_info
,
61 RtecScheduler::VERY_HIGH_CRITICALITY
,
64 RtecScheduler::VERY_LOW_IMPORTANCE
,
67 RtecScheduler::OPERATION
);
// The supplier id is computed from the name with ACE::crc32 and logged.
69 this->supplier_id_
= ACE::crc32 (name
);
70 ACE_DEBUG ((LM_DEBUG
, "ID for <%s> is %04.4x\n", name
,
// Build the supplier QoS: one insert per loop iteration (type_count in
// total), followed by one insert for ACE_ES_EVENT_SHUTDOWN. The remaining
// insert arguments are not visible in this listing.
73 ACE_SupplierQOS_Factory qos
;
74 for (int i
= 0; i
!= type_count
; ++i
)
76 qos
.insert (this->supplier_id_
,
80 qos
.insert (this->supplier_id_
,
81 ACE_ES_EVENT_SHUTDOWN
,
// Obtain a push-consumer proxy from the supplier admin and keep it in
// consumer_proxy_ (the initializer of supplier_admin is not visible here;
// presumably it comes from `ec` -- confirm).
84 RtecEventChannelAdmin::SupplierAdmin_var supplier_admin
=
87 this->consumer_proxy_
=
88 supplier_admin
->obtain_push_consumer ();
// Get an object reference for the supplier_ servant via _this () and connect
// it to the proxy together with the QoS built above.
90 RtecEventComm::PushSupplier_var objref
=
91 this->supplier_
._this ();
93 this->consumer_proxy_
->connect_push_supplier (objref
.in (),
94 qos
.get_SupplierQOS ());
// disconnect: tells the consumer proxy to disconnect, resets consumer_proxy_
// to nil, and deactivates the supplier_ servant in its default POA.
// NOTE(review): the return type, braces, `try` and whatever statement follows
// the nil check are not visible in this listing.
98 Test_Supplier::disconnect (void)
// Nil-proxy check; presumably an early return follows when there is nothing
// to disconnect -- confirm against the full file.
100 if (CORBA::is_nil (this->consumer_proxy_
.in ()))
105 this->consumer_proxy_
->disconnect_push_consumer ();
107 catch (const CORBA::Exception
&)
109 // The consumer may be gone already, so we
110 // will ignore this exception
// Drop the reference to the proxy by assigning the nil reference.
113 this->consumer_proxy_
=
114 RtecEventChannelAdmin::ProxyPushConsumer::_nil ();
116 // Deactivate the servant
// The object id is looked up from the servant's address in the POA returned
// by supplier_._default_POA (), then that id is deactivated.
117 PortableServer::POA_var poa
=
118 this->supplier_
._default_POA ();
119 PortableServer::ObjectId_var id
=
120 poa
->servant_to_id (&this->supplier_
);
121 poa
->deactivate_object (id
.in ());
// svc: the supplier's sending loop. Builds a one-element event set, pushes
// burst_count_ bursts of burst_size_ events through the consumer proxy while
// recording samples in throughput_, then pushes one ACE_ES_EVENT_SHUTDOWN
// event. CORBA exceptions are caught and printed; a completion message is
// logged at the end.
// NOTE(review): the return type, braces, `try`, `#else` and the return
// statement are not visible in this listing.
125 Test_Supplier::svc ()
129 // Initialize a time value to pace the test
// NOTE(review): tv is not used in the lines visible here; presumably it is
// passed to a sleep between bursts -- confirm.
130 ACE_Time_Value
tv (0, this->burst_pause_
);
132 // Pre-allocate a message to send
// The block is created with event_size_ bytes and its write pointer is then
// moved ahead by event_size_; the visible code never fills in the contents.
133 ACE_Message_Block
mb (this->event_size_
);
134 mb
.wr_ptr (this->event_size_
);
// Only element 0 of the event set is used below. Its header is stamped with
// this supplier's id, a ttl of 1 and a creation time taken from
// ACE_OS::gethrtime ().
136 RtecEventComm::EventSet
event (1);
138 event
[0].header
.source
= this->supplier_id ();
139 event
[0].header
.ttl
= 1;
141 ACE_hrtime_t t
= ACE_OS::gethrtime ();
142 ORBSVCS_Time::hrtime_to_TimeT (event
[0].header
.creation_time
, t
);
144 // We use replace to minimize the copies, this should result
145 // in just one memory allocation;
146 #if (TAO_NO_COPY_OCTET_SEQUENCES == 1)
147 event
[0].data
.payload
.replace (this->event_size_
,
150 // If the replace method is not available, we will need
151 // to do the copy manually. First, set the octet sequence length.
152 event
[0].data
.payload
.length (this->event_size_
);
154 // Now copy over each byte.
// Bytes are read starting at the data block's base () pointer; event_size_ is
// cast to CORBA::ULong for the comparison with the ULong index.
155 char* base
= mb
.data_block ()->base ();
156 for(CORBA::ULong i
= 0; i
< (CORBA::ULong
)this->event_size_
; i
++)
158 event
[0].data
.payload
[i
] = base
[i
];
160 #endif /* TAO_NO_COPY_OCTET_SEQUENCES == 1 */
// Reference point for the first argument of every throughput_.sample () call.
162 ACE_hrtime_t test_start
= ACE_OS::gethrtime ();
// Outer loop: one iteration per burst. Inner loop: one pushed event per
// iteration.
164 for (int i
= 0; i
< this->burst_count_
; ++i
)
166 for (int j
= 0; j
< this->burst_size_
; ++j
)
// `%` binds tighter than `+`, so the type is type_start_ + (j % type_count_):
// the event type cycles through type_count_ consecutive values.
// NOTE(review): a type_count_ of 0 would make this a division by zero --
// confirm callers never pass 0.
168 event
[0].header
.type
=
169 this->type_start_
+ j
% this->type_count_
;
// Timestamp taken just before the push; the creation_time of the event is
// refreshed as well (second argument not visible in this listing).
171 ACE_hrtime_t request_start
= ACE_OS::gethrtime ();
172 ORBSVCS_Time::hrtime_to_TimeT (event
[0].header
.creation_time
,
174 // ACE_DEBUG ((LM_DEBUG, "(%t) supplier push event\n"));
175 this->consumer_proxy ()->push (event
);
// Sample = (time since test_start, time spent in this push).
177 ACE_hrtime_t end
= ACE_OS::gethrtime ();
178 this->throughput_
.sample (end
- test_start
,
179 end
- request_start
);
// Progress message guarded by TAO_debug_level > 0; the rest of the condition
// and the ACE_DEBUG arguments are not visible in this listing.
182 if (TAO_debug_level
> 0
185 ACE_DEBUG ((LM_DEBUG
,
186 "ECT_Supplier (%P|%t): %d bursts sent\n",
193 // Send one event shutdown from each supplier
// The shutdown push is timed and sampled the same way as the regular events.
194 event
[0].header
.type
= ACE_ES_EVENT_SHUTDOWN
;
195 ACE_hrtime_t request_start
= ACE_OS::gethrtime ();
196 ORBSVCS_Time::hrtime_to_TimeT (event
[0].header
.creation_time
,
198 this->consumer_proxy ()->push(event
);
199 ACE_hrtime_t end
= ACE_OS::gethrtime ();
200 this->throughput_
.sample (end
- test_start
,
201 end
- request_start
);
// System exceptions are caught first and printed with the "SYS_EX" label;
// any other CORBA exception is printed with "NON SYS EX".
203 catch (const CORBA::SystemException
& sys_ex
)
205 sys_ex
._tao_print_exception ("SYS_EX");
207 catch (const CORBA::Exception
& ex
)
209 ex
._tao_print_exception ("NON SYS EX");
// Final log line carrying the supplier id.
212 ACE_DEBUG ((LM_DEBUG
,
213 "Supplier %4.4x completed\n",
214 this->supplier_id_
));
// disconnect_push_supplier: only the signature line is visible in this
// listing; the return type and the body are not shown.
// NOTE(review): presumably the callback invoked when the event channel
// disconnects this supplier -- confirm against the full file.
219 Test_Supplier::disconnect_push_supplier (void)
223 int Test_Supplier::supplier_id (void) const
225 return this->supplier_id_
;
228 RtecEventChannelAdmin::ProxyPushConsumer_ptr
229 Test_Supplier::consumer_proxy (void)
231 return this->consumer_proxy_
.in ();
// dump_results: forwards `name` and the scale factor `gsf` unchanged to
// throughput_.dump_results ().
// NOTE(review): the return-type line and the braces are not visible in this
// listing; no value is returned by the visible statement.
235 Test_Supplier::dump_results (const ACE_TCHAR
* name
,
236 ACE_Basic_Stats::scale_factor_type gsf
)
238 this->throughput_
.dump_results (name
, gsf
);
// accumulate: adds this supplier's throughput_ statistics into the
// caller-provided `stats` by calling stats.accumulate (). The method is const,
// so only `stats` is modified.
// NOTE(review): the return-type line and the braces are not visible in this
// listing.
242 Test_Supplier::accumulate (ACE_Throughput_Stats
& stats
) const
244 stats
.accumulate (this->throughput_
);