Merge pull request #1551 from DOCGroup/plm_jira_333
[ACE_TAO.git] / TAO / orbsvcs / tests / EC_Throughput / ECT_Consumer.cpp
blobb81e9af1c9264876442732a8f9077cce19f094ef
1 #include "ECT_Consumer.h"
3 #include "orbsvcs/Event_Utilities.h"
4 #include "orbsvcs/Event_Service_Constants.h"
5 #include "orbsvcs/Time_Utilities.h"
7 #include "tao/Timeprobe.h"
8 #include "tao/debug.h"
10 #include "ace/Get_Opt.h"
11 #include "ace/Auto_Ptr.h"
12 #include "ace/Sched_Params.h"
13 #include "ace/OS_NS_unistd.h"
15 Test_Consumer::Test_Consumer (ECT_Driver *driver,
16 void *cookie,
17 int n_suppliers,
18 int stall_length)
19 : driver_ (driver),
20 cookie_ (cookie),
21 n_suppliers_ (n_suppliers),
22 recv_count_ (0),
23 shutdown_count_ (0),
24 stall_length_(stall_length)
// Register this consumer with the scheduler and attach it to the event
// channel.
//
// Steps, in order:
//   1. create a scheduler entry called @a name and fill it in with a
//      2000 microsecond time value;
//   2. build a consumer QoS with one disjunction group containing
//      ACE_ES_EVENT_SHUTDOWN plus the event types
//      type_start .. type_start + type_count - 1;
//   3. obtain a push-supplier proxy from @a ec and connect this servant
//      to it with that QoS.
//
// @param scheduler   Scheduler used to create and set the entry.
// @param name        Name passed to scheduler->create().
// @param type_start  First event type to subscribe to.
// @param type_count  Number of consecutive event types to subscribe to.
// @param ec          Event channel to connect to.
28 void
29 Test_Consumer::connect (RtecScheduler::Scheduler_ptr scheduler,
30 const char* name,
31 int type_start,
32 int type_count,
33 RtecEventChannelAdmin::EventChannel_ptr ec)
35 RtecScheduler::handle_t rt_info =
36 scheduler->create (name);
38 // The worst case execution time is far less than 2
39 // milliseconds, but that is a safe estimate....
40 ACE_Time_Value tv (0, 2000);
41 TimeBase::TimeT time;
42 ORBSVCS_Time::Time_Value_to_TimeT (time, tv);
// The same converted time value is passed for every time-typed argument.
// NOTE(review): the embedded listing numbers skip 46 and 49 inside this
// argument list, so one or more arguments may have been lost when this
// text was extracted -- confirm the call against the upstream file.
43 scheduler->set (rt_info,
44 RtecScheduler::VERY_HIGH_CRITICALITY,
45 time, time, time,
47 RtecScheduler::VERY_LOW_IMPORTANCE,
48 time,
50 RtecScheduler::OPERATION);
// Subscribe to the shutdown event first, then to each test event type.
52 ACE_ConsumerQOS_Factory qos;
53 qos.start_disjunction_group ();
54 qos.insert_type (ACE_ES_EVENT_SHUTDOWN, rt_info);
// NOTE(review): the loop ends only when i equals type_count, so a
// negative type_count is never matched by normal counting -- presumably
// callers always pass a non-negative count; verify.
55 for (int i = 0; i != type_count; ++i)
57 qos.insert_type (type_start + i, rt_info);
60 // = Connect as a consumer.
61 RtecEventChannelAdmin::ConsumerAdmin_var consumer_admin =
62 ec->for_consumers ();
// The proxy is kept in a member so disconnect() can release it later.
64 this->supplier_proxy_ =
65 consumer_admin->obtain_push_supplier ();
67 RtecEventComm::PushConsumer_var objref = this->_this ();
69 this->supplier_proxy_->connect_push_consumer (objref.in (),
70 qos.get_ConsumerQOS ());
73 void
74 Test_Consumer::disconnect (void)
76 if (CORBA::is_nil (this->supplier_proxy_.in ()))
77 return;
79 this->supplier_proxy_->disconnect_push_supplier ();
81 this->supplier_proxy_ =
82 RtecEventChannelAdmin::ProxyPushSupplier::_nil ();
84 // Deactivate the servant
85 PortableServer::POA_var poa =
86 this->_default_POA ();
87 PortableServer::ObjectId_var id =
88 poa->servant_to_id (this);
89 poa->deactivate_object (id.in ());
92 void
93 Test_Consumer::dump_results (const ACE_TCHAR* name,
94 ACE_Basic_Stats::scale_factor_type gsf)
96 this->throughput_.dump_results (name, gsf);
99 void
100 Test_Consumer::accumulate (ACE_Throughput_Stats& stats) const
102 stats.accumulate (this->throughput_);
105 void
106 Test_Consumer::push (const RtecEventComm::EventSet& events)
108 if (events.length () == 0)
110 ACE_DEBUG ((LM_DEBUG,
111 "ECT_Consumer (%P|%t) no events\n"));
112 return;
115 ACE_GUARD (TAO_SYNCH_MUTEX, ace_mon, this->lock_);
117 // We start the timer as soon as we receive the first event...
118 if (this->recv_count_ == 0)
120 this->first_event_ = ACE_OS::gethrtime ();
121 ACE_DEBUG ((LM_DEBUG,
122 "ECT_Consumer (%P|%t) stalling for %d seconds\n", this->stall_length_));
123 ACE_OS::sleep(this->stall_length_);
124 ACE_DEBUG ((LM_DEBUG, "ECT_Consumer (%P|%t) finished stalling\n"));
128 this->recv_count_ += events.length ();
130 if (TAO_debug_level > 0
131 && this->recv_count_ % 100 == 0)
133 ACE_DEBUG ((LM_DEBUG,
134 "ECT_Consumer (%P|%t): %d events received\n",
135 this->recv_count_));
138 // ACE_DEBUG ((LM_DEBUG, "%d event(s)\n", events.length ()));
140 for (u_int i = 0; i < events.length (); ++i)
142 const RtecEventComm::Event& e = events[i];
144 if (e.header.type == ACE_ES_EVENT_SHUTDOWN)
146 this->shutdown_count_++;
147 if (this->shutdown_count_ >= this->n_suppliers_)
149 // We stop the timer as soon as we realize it is time to
150 // do so.
151 this->driver_->shutdown_consumer (this->cookie_);
154 else
156 ACE_hrtime_t creation;
157 ORBSVCS_Time::TimeT_to_hrtime (creation,
158 e.header.creation_time);
160 const ACE_hrtime_t now = ACE_OS::gethrtime ();
161 this->throughput_.sample (now - this->first_event_,
162 now - creation);
167 void
168 Test_Consumer::disconnect_push_consumer (void)