Merge pull request #1844 from jrw972/monterey
[ACE_TAO.git] / TAO / orbsvcs / performance-tests / RTEvent / lib / Consumer.cpp
blob3e0a502b12be3426682b86941e39fb02b174bacd
1 /**
2 * @file Consumer.cpp
4 * @author Carlos O'Ryan <coryan@uci.edu>
5 */
7 #include "Consumer.h"
8 #include "Implicit_Deactivator.h"
9 #include "orbsvcs/Event_Service_Constants.h"
10 #include "ace/OS_NS_unistd.h"
12 Consumer::Consumer (CORBA::Long experiment_id,
13 CORBA::Long event_type,
14 CORBA::ULong iterations,
15 CORBA::Long workload_in_usecs,
16 ACE_High_Res_Timer::global_scale_factor_type gsf,
17 PortableServer::POA_ptr poa)
18 : experiment_id_ (experiment_id)
19 , event_type_ (event_type)
20 , sample_history_ (iterations)
21 , workload_in_usecs_ (workload_in_usecs)
22 , gsf_ (gsf)
23 , default_POA_ (PortableServer::POA::_duplicate (poa))
27 void
28 Consumer::connect (RtecEventChannelAdmin::EventChannel_ptr ec)
30 RtecEventChannelAdmin::ConsumerAdmin_var consumer_admin =
31 ec->for_consumers ();
34 ACE_GUARD (TAO_SYNCH_MUTEX, ace_mon, this->mutex_);
35 if (!CORBA::is_nil (this->proxy_supplier_.in ()))
36 return;
38 this->proxy_supplier_ =
39 consumer_admin->obtain_push_supplier ();
42 RtecEventComm::PushConsumer_var consumer =
43 this->_this ();
45 RtecEventChannelAdmin::ConsumerQOS consumer_qos;
46 consumer_qos.is_gateway = 0;
47 consumer_qos.dependencies.length (2);
48 RtecEventComm::EventHeader& h0 =
49 consumer_qos.dependencies[0].event.header;
50 h0.type = ACE_ES_DISJUNCTION_DESIGNATOR;
51 h0.source = 1;
53 RtecEventComm::EventHeader& h1 =
54 consumer_qos.dependencies[1].event.header;
55 h1.type = this->event_type_;
56 h1.source = this->experiment_id_;
58 this->proxy_supplier_->connect_push_consumer (consumer.in (),
59 consumer_qos);
62 void
63 Consumer::disconnect (void)
65 RtecEventChannelAdmin::ProxyPushSupplier_var proxy;
67 ACE_GUARD (TAO_SYNCH_MUTEX, ace_mon, this->mutex_);
68 if (CORBA::is_nil (this->proxy_supplier_.in ()))
69 return;
70 proxy = this->proxy_supplier_._retn ();
73 Implicit_Deactivator deactivator (this);
75 try{
76 proxy->disconnect_push_supplier ();
77 } catch (const CORBA::Exception&) {
81 ACE_Sample_History &
82 Consumer::sample_history (void)
84 return this->sample_history_;
87 void
88 Consumer::push (const RtecEventComm::EventSet &events)
90 ACE_hrtime_t now = ACE_OS::gethrtime ();
92 ACE_hrtime_t creation;
93 ORBSVCS_Time::TimeT_to_hrtime (creation,
94 events[0].header.creation_time);
96 while (this->workload_in_usecs_ > 0)
98 ACE_hrtime_t elapsed = ACE_OS::gethrtime () - now;
99 if (elapsed > this->gsf_ * this->workload_in_usecs_)
100 break;
101 ACE_OS::sleep (0);
104 ACE_GUARD (TAO_SYNCH_MUTEX, ace_mon, this->mutex_);
105 if (sample_history_.max_samples () == sample_history_.sample_count ())
106 return;
107 this->sample_history_.sample (now - creation);
110 void
111 Consumer::disconnect_push_consumer (void)
113 ACE_GUARD (TAO_SYNCH_MUTEX, ace_mon, this->mutex_);
114 this->proxy_supplier_ =
115 RtecEventChannelAdmin::ProxyPushSupplier::_nil ();
118 PortableServer::POA_ptr
119 Consumer::_default_POA (void)
121 return PortableServer::POA::_duplicate (this->default_POA_.in ());