4 * @author Carlos O'Ryan <coryan@uci.edu>
8 #include "Implicit_Deactivator.h"
9 #include "orbsvcs/Event_Service_Constants.h"
10 #include "ace/OS_NS_unistd.h"
12 Consumer::Consumer (CORBA::Long experiment_id
,
13 CORBA::Long event_type
,
14 CORBA::ULong iterations
,
15 CORBA::Long workload_in_usecs
,
16 ACE_High_Res_Timer::global_scale_factor_type gsf
,
17 PortableServer::POA_ptr poa
)
18 : experiment_id_ (experiment_id
)
19 , event_type_ (event_type
)
20 , sample_history_ (iterations
)
21 , workload_in_usecs_ (workload_in_usecs
)
23 , default_POA_ (PortableServer::POA::_duplicate (poa
))
28 Consumer::connect (RtecEventChannelAdmin::EventChannel_ptr ec
)
30 RtecEventChannelAdmin::ConsumerAdmin_var consumer_admin
=
34 ACE_GUARD (TAO_SYNCH_MUTEX
, ace_mon
, this->mutex_
);
35 if (!CORBA::is_nil (this->proxy_supplier_
.in ()))
38 this->proxy_supplier_
=
39 consumer_admin
->obtain_push_supplier ();
42 RtecEventComm::PushConsumer_var consumer
=
45 RtecEventChannelAdmin::ConsumerQOS consumer_qos
;
46 consumer_qos
.is_gateway
= 0;
47 consumer_qos
.dependencies
.length (2);
48 RtecEventComm::EventHeader
& h0
=
49 consumer_qos
.dependencies
[0].event
.header
;
50 h0
.type
= ACE_ES_DISJUNCTION_DESIGNATOR
;
53 RtecEventComm::EventHeader
& h1
=
54 consumer_qos
.dependencies
[1].event
.header
;
55 h1
.type
= this->event_type_
;
56 h1
.source
= this->experiment_id_
;
58 this->proxy_supplier_
->connect_push_consumer (consumer
.in (),
63 Consumer::disconnect ()
65 RtecEventChannelAdmin::ProxyPushSupplier_var proxy
;
67 ACE_GUARD (TAO_SYNCH_MUTEX
, ace_mon
, this->mutex_
);
68 if (CORBA::is_nil (this->proxy_supplier_
.in ()))
70 proxy
= this->proxy_supplier_
._retn ();
73 Implicit_Deactivator
deactivator (this);
76 proxy
->disconnect_push_supplier ();
77 } catch (const CORBA::Exception
&) {
82 Consumer::sample_history ()
84 return this->sample_history_
;
88 Consumer::push (const RtecEventComm::EventSet
&events
)
90 ACE_hrtime_t now
= ACE_OS::gethrtime ();
92 ACE_hrtime_t creation
;
93 ORBSVCS_Time::TimeT_to_hrtime (creation
,
94 events
[0].header
.creation_time
);
96 while (this->workload_in_usecs_
> 0)
98 ACE_hrtime_t elapsed
= ACE_OS::gethrtime () - now
;
99 if (elapsed
> this->gsf_
* this->workload_in_usecs_
)
104 ACE_GUARD (TAO_SYNCH_MUTEX
, ace_mon
, this->mutex_
);
105 if (sample_history_
.max_samples () == sample_history_
.sample_count ())
107 this->sample_history_
.sample (now
- creation
);
111 Consumer::disconnect_push_consumer ()
113 ACE_GUARD (TAO_SYNCH_MUTEX
, ace_mon
, this->mutex_
);
114 this->proxy_supplier_
=
115 RtecEventChannelAdmin::ProxyPushSupplier::_nil ();
118 PortableServer::POA_ptr
119 Consumer::_default_POA ()
121 return PortableServer::POA::_duplicate (this->default_POA_
.in ());