2 #include "orbsvcs/Time_Utilities.h"
3 #include "orbsvcs/Event_Utilities.h"
6 #include "ace/OS_NS_unistd.h"
// Constructor (partial view): takes an EC_Driver pointer and initializes
// shutdown_event_type_ to 0.  The remaining parameters and member
// initializers are not visible in this excerpt.
8 EC_Supplier::EC_Supplier (EC_Driver
*driver
,
17 shutdown_event_type_ (0),
// Build a single-element event set for <event_number> and hand it to the
// EventSet overload of send_event().
23 EC_Supplier::send_event (int event_number
)
// Check whether a consumer proxy is set.
// NOTE(review): the action taken when the proxy is nil is not visible
// here -- presumably an early return; confirm against the full file.
25 if (CORBA::is_nil (this->consumer_proxy_
.in ()))
28 // Create the event...
// Sequence constructed with argument 1; only element [0] is used below.
30 RtecEventComm::EventSet
event (1);
33 event
[0].header
.ttl
= 1;
// Stamp the event with the current high-resolution time.
35 ACE_hrtime_t t
= ACE_OS::gethrtime ();
36 ORBSVCS_Time::hrtime_to_TimeT (event
[0].header
.creation_time
, t
);
38 // Size the payload to payload_size_ with a single length() call.
39 // NOTE(review): an earlier comment here referred to replace(); the visible code uses length().
40 event
[0].data
.payload
.length (this->payload_size_
);
// Fill in the header source/type for this event number.
42 this->event_type (event_number
, event
[0]);
// Delegate the actual push (and its bookkeeping) to the EventSet overload.
44 this->send_event (event
);
// Push <event> to the consumer proxy, updating the push counter and the
// throughput statistics around the call.
48 EC_Supplier::send_event (const RtecEventComm::EventSet
& event
)
// Acquire lock_ before touching the counters below.
50 ACE_GUARD (TAO_SYNCH_MUTEX
, ace_mon
, this->lock_
);
// The first push records the start time used for throughput sampling.
52 if (this->push_count_
== 0)
53 this->throughput_start_
= ACE_OS::gethrtime ();
// Count every event in the set, not just one per call.
55 this->push_count_
+= event
.length ();
// Progress message when debugging is enabled and the count is a
// multiple of 100.
57 if (TAO_debug_level
> 0
58 && this->push_count_
% 100 == 0)
// NOTE(review): this message says "EC_Consumer" and "received" although it
// is emitted from the supplier's push path -- looks like a copy/paste
// leftover; confirm and correct in the full file.
61 "EC_Consumer (%P|%t): %d events received\n",
// Time the push itself.
65 ACE_hrtime_t start
= ACE_OS::gethrtime ();
67 this->consumer_proxy_
->push (event
);
69 ACE_hrtime_t end
= ACE_OS::gethrtime ();
// Record a sample; the first argument is the time elapsed since the first
// push.  The remaining argument(s) are not visible in this excerpt.
70 this->throughput_
.sample (end
- this->throughput_start_
,
// Fill in the source and type of <event>'s header for <event_number>,
// based on the publications stored in qos_.
75 EC_Supplier::event_type (int event_number
,
76 RtecEventComm::Event
&event
)
// Number of publications declared in the supplier QoS.
78 CORBA::ULong l
= this->qos_
.publications
.length ();
// Fallback header: source 0 and the shutdown event type.
// NOTE(review): the condition guarding these two assignments is not
// visible -- presumably l == 0, which would also protect the modulo
// below from dividing by zero; confirm against the full file.
82 event
.header
.source
= 0;
83 event
.header
.type
= this->shutdown_event_type_
;
// Select a publication by cycling through them with the event number.
87 int i
= event_number
% l
;
88 int type
= this->qos_
.publications
[i
].event
.header
.type
;
// Special case when the selected publication is the shutdown type.
// NOTE(review): the statement executed in this case is not visible here.
89 if (type
== this->shutdown_event_type_
)
// Copy source and type from the selected publication's header.
92 RtecEventComm::EventHeader
& header
=
93 this->qos_
.publications
[i
].event
.header
;
95 event
.header
.source
= header
.source
;
96 event
.header
.type
= header
.type
;
// Obtain a push-consumer proxy from <supplier_admin>, then connect using
// the (qos, shutdown_event_type) overload.
101 EC_Supplier::connect (RtecEventChannelAdmin::SupplierAdmin_ptr supplier_admin
,
102 const RtecEventChannelAdmin::SupplierQOS
& qos
,
103 int shutdown_event_type
)
// Store the proxy returned by the admin object.
105 this->consumer_proxy_
=
106 supplier_admin
->obtain_push_consumer ();
// The rest of the connection logic lives in the other overload.
108 this->connect (qos
, shutdown_event_type
);
// Connect this supplier to the already-stored consumer proxy, recording
// the shutdown event type.
112 EC_Supplier::connect (const RtecEventChannelAdmin::SupplierQOS
& qos
,
113 int shutdown_event_type
)
// Check whether a consumer proxy is set.
// NOTE(review): the action taken when the proxy is nil is not visible
// here -- presumably an early return; confirm.
115 if (CORBA::is_nil (this->consumer_proxy_
.in ()))
119 this->shutdown_event_type_
= shutdown_event_type
;
// Obtain our own object reference via _this() when myself_ is still nil.
121 if (CORBA::is_nil (this->myself_
.in ()))
123 this->myself_
= this->_this ();
// Mark the supplier as active; shutdown() tests this flag.
125 this->is_active_
= 1;
// Register with the proxy.  The remaining argument(s) of this call are not
// visible in this excerpt.
127 this->consumer_proxy_
->connect_push_supplier (this->myself_
.in (),
// Disconnect from the consumer proxy and reset the stored proxy to nil.
132 EC_Supplier::disconnect ()
// Check whether a consumer proxy is set.
// NOTE(review): the action taken when the proxy is nil is not visible
// here -- presumably an early return; confirm.
134 if (CORBA::is_nil (this->consumer_proxy_
.in ()))
137 this->consumer_proxy_
->disconnect_push_consumer ();
// Reset the stored proxy to nil.
139 this->consumer_proxy_
=
140 RtecEventChannelAdmin::ProxyPushConsumer::_nil ();
// Deactivate this servant in its default POA and clear the active flag
// and the self reference.
144 EC_Supplier::shutdown ()
// Check the active flag first.
// NOTE(review): the action taken when not active is not visible here --
// presumably an early return; confirm.
146 if (!this->is_active_
)
149 // Deactivate the servant
// Look up this servant's object id in the default POA and deactivate it.
150 PortableServer::POA_var poa
=
151 this->_default_POA ();
152 PortableServer::ObjectId_var id
=
153 poa
->servant_to_id (this);
154 poa
->deactivate_object (id
.in ());
// Mark the supplier inactive and drop our own reference.
155 this->is_active_
= 0;
156 this->myself_
= RtecEventComm::PushSupplier::_nil ();
// Report the disconnection to the driver (passing cookie_) and reset the
// stored consumer proxy to nil.
160 EC_Supplier::disconnect_push_supplier ()
162 this->driver_
->supplier_disconnect (this->cookie_
);
163 this->consumer_proxy_
=
164 RtecEventChannelAdmin::ProxyPushConsumer::_nil ();
// Forward <name> and the timer scale factor <gsf> to the throughput
// statistics object's dump_results().
168 EC_Supplier::dump_results (
169 const ACE_TCHAR
* name
,
170 ACE_High_Res_Timer::global_scale_factor_type gsf
)
172 this->throughput_
.dump_results (name
, gsf
);
// Add this supplier's throughput statistics into <stats>.  Const: this
// object's own state is not modified.
176 EC_Supplier::accumulate (ACE_Throughput_Stats
& stats
) const
178 stats
.accumulate (this->throughput_
);
181 // ****************************************************************
// Constructor (partial view): passes <thr_mgr> to the ACE_Task_Base base
// and stores the supplier pointer, burst parameters, payload size and
// shutdown event type.  Part of the parameter list and of the initializer
// list is not visible in this excerpt.
183 EC_Supplier_Task::EC_Supplier_Task (EC_Supplier
* supplier
,
190 int shutdown_event_type
,
191 ACE_Thread_Manager
* thr_mgr
)
192 : ACE_Task_Base (thr_mgr
),
193 supplier_ (supplier
),
196 burst_count_ (burst_count
),
197 burst_size_ (burst_size
),
198 burst_pause_ (burst_pause
),
199 payload_size_ (payload_size
),
200 shutdown_event_type_ (shutdown_event_type
)
// Task body: sends burst_count_ bursts of burst_size_ events through the
// supplier, then one event carrying the shutdown event type.
// NOTE(review): the try blocks, braces and the use of <tv> are not visible
// in this excerpt.
205 EC_Supplier_Task::svc ()
207 // Initialize a time value to pace the test
// ACE_Time_Value (seconds, microseconds): 0 s plus burst_pause_ usecs.
208 ACE_Time_Value
tv (0, this->burst_pause_
);
// A single event is built once and reused for every push below.
210 RtecEventComm::EventSet
event (1);
213 event
[0].header
.ttl
= 1;
215 ACE_hrtime_t t
= ACE_OS::gethrtime ();
216 ORBSVCS_Time::hrtime_to_TimeT (event
[0].header
.creation_time
, t
);
218 // Size the payload once, up front, with a single length() call.
219 // NOTE(review): an earlier comment here referred to replace(); the visible code uses length().
220 event
[0].data
.payload
.length (this->payload_size_
);
// Outer loop: one iteration per burst.
222 for (int i
= 0; i
< this->burst_count_
; ++i
)
// Inner loop: one iteration per event within the burst.
224 for (int j
= 0; j
< this->burst_size_
; ++j
)
// The header source/type is chosen from the index within the burst (j).
228 this->supplier_
->event_type (j
, event
[0]);
// Refresh the creation time immediately before each push.
230 ACE_hrtime_t now
= ACE_OS::gethrtime ();
231 ORBSVCS_Time::hrtime_to_TimeT (event
[0].header
.creation_time
,
233 // ACE_DEBUG ((LM_DEBUG, "(%t) supplier push event\n"));
235 this->supplier_
->send_event (event
);
// Exceptions from the push are printed here; nothing visible rethrows.
237 catch (const CORBA::SystemException
& sys_ex
)
239 sys_ex
._tao_print_exception ("SYS_EX");
// NOTE(review): this handler catches any CORBA::Exception but reuses the
// "SYS_EX" label -- possibly a copy/paste leftover; confirm.
241 catch (const CORBA::Exception
& ex
)
243 ex
._tao_print_exception ("SYS_EX");
251 // Send one event shutdown from each supplier
252 event
[0].header
.type
= this->shutdown_event_type_
;
// Timestamp the shutdown event as well.
253 ACE_hrtime_t now
= ACE_OS::gethrtime ();
254 ORBSVCS_Time::hrtime_to_TimeT (event
[0].header
.creation_time
,
256 this->supplier_
->send_event (event
);
// Same exception reporting as in the burst loop.
258 catch (const CORBA::SystemException
& sys_ex
)
260 sys_ex
._tao_print_exception ("SYS_EX");
// NOTE(review): same "SYS_EX" label on the generic handler; confirm.
262 catch (const CORBA::Exception
& ex
)
264 ex
._tao_print_exception ("SYS_EX");
267 ACE_DEBUG ((LM_DEBUG
,
268 "Supplier task finished\n"));