Merge pull request #2303 from jwillemsen/jwi-803
[ACE_TAO.git] / TAO / orbsvcs / tests / Event / lib / Consumer.cpp
blob56df011d037b85a70a06fcd7a70588facd6ebf05
1 #include "Consumer.h"
2 #include "orbsvcs/Event_Service_Constants.h"
3 #include "orbsvcs/Time_Utilities.h"
5 #include "tao/debug.h"
7 EC_Consumer::EC_Consumer (EC_Driver *driver,
8 void *cookie)
9 : driver_ (driver),
10 cookie_ (cookie),
11 push_count_ (0),
12 shutdown_event_type_ (ACE_ES_EVENT_SHUTDOWN),
13 is_active_ (0)
17 void
18 EC_Consumer::connect (
19 RtecEventChannelAdmin::ConsumerAdmin_ptr consumer_admin,
20 const RtecEventChannelAdmin::ConsumerQOS& qos,
21 int shutdown_event_type)
23 this->supplier_proxy_ =
24 consumer_admin->obtain_push_supplier ();
26 this->connect (qos, shutdown_event_type);
29 void
30 EC_Consumer::connect (
31 const RtecEventChannelAdmin::ConsumerQOS& qos,
32 int shutdown_event_type)
34 if (CORBA::is_nil (this->supplier_proxy_.in ()))
35 return; // @@ Throw?
37 this->shutdown_event_type_ = shutdown_event_type;
39 if (CORBA::is_nil (this->myself_.in ()))
41 this->myself_ = this->_this ();
43 this->is_active_ = 1;
45 this->supplier_proxy_->connect_push_consumer (this->myself_.in (),
46 qos);
49 int
50 EC_Consumer::connected () const
52 return !CORBA::is_nil (this->supplier_proxy_.in ());
55 void
56 EC_Consumer::disconnect ()
58 if (CORBA::is_nil (this->supplier_proxy_.in ()))
59 return;
61 this->supplier_proxy_->disconnect_push_supplier ();
63 this->supplier_proxy_ =
64 RtecEventChannelAdmin::ProxyPushSupplier::_nil ();
67 void
68 EC_Consumer::shutdown ()
70 if (!this->is_active_)
71 return;
73 // Deactivate the servant
74 PortableServer::POA_var poa =
75 this->_default_POA ();
76 PortableServer::ObjectId_var id =
77 poa->servant_to_id (this);
78 poa->deactivate_object (id.in ());
79 this->myself_ = RtecEventComm::PushConsumer::_nil ();
80 this->is_active_ = 0;
83 void
84 EC_Consumer::dump_results (const ACE_TCHAR* name,
85 ACE_High_Res_Timer::global_scale_factor_type gsf)
87 this->throughput_.dump_results (name, gsf);
90 void
91 EC_Consumer::accumulate (ACE_Throughput_Stats& throughput) const
93 throughput.accumulate (this->throughput_);
96 void
97 EC_Consumer::push (const RtecEventComm::EventSet& events)
99 this->driver_->consumer_push (this->cookie_, events);
101 if (events.length () == 0)
103 ACE_DEBUG ((LM_DEBUG,
104 "EC_Consumer (%P|%t) no events\n"));
105 return;
108 ACE_GUARD (TAO_SYNCH_MUTEX, ace_mon, this->lock_);
109 if (this->push_count_ == 0)
110 this->throughput_start_ = ACE_OS::gethrtime ();
112 this->push_count_ += events.length ();
114 if (TAO_debug_level > 0
115 && this->push_count_ % 100 == 0)
117 ACE_DEBUG ((LM_DEBUG,
118 "EC_Consumer (%P|%t): %d events received\n",
119 this->push_count_));
122 for (u_int i = 0; i < events.length (); ++i)
124 const RtecEventComm::Event& e = events[i];
126 ACE_hrtime_t creation;
127 ORBSVCS_Time::TimeT_to_hrtime (creation,
128 e.header.creation_time);
130 const ACE_hrtime_t now = ACE_OS::gethrtime ();
131 this->throughput_.sample (now - this->throughput_start_,
132 now - creation);
134 if (e.header.type == this->shutdown_event_type_)
135 this->driver_->consumer_shutdown (this->cookie_);
139 void
140 EC_Consumer::disconnect_push_consumer ()
142 this->driver_->consumer_disconnect (this->cookie_);
143 this->supplier_proxy_ =
144 RtecEventChannelAdmin::ProxyPushSupplier::_nil ();