2 #include "orbsvcs/Event_Service_Constants.h"
3 #include "orbsvcs/Time_Utilities.h"
7 EC_Consumer::EC_Consumer (EC_Driver
*driver
,
12 shutdown_event_type_ (ACE_ES_EVENT_SHUTDOWN
),
18 EC_Consumer::connect (
19 RtecEventChannelAdmin::ConsumerAdmin_ptr consumer_admin
,
20 const RtecEventChannelAdmin::ConsumerQOS
& qos
,
21 int shutdown_event_type
)
23 this->supplier_proxy_
=
24 consumer_admin
->obtain_push_supplier ();
26 this->connect (qos
, shutdown_event_type
);
30 EC_Consumer::connect (
31 const RtecEventChannelAdmin::ConsumerQOS
& qos
,
32 int shutdown_event_type
)
34 if (CORBA::is_nil (this->supplier_proxy_
.in ()))
37 this->shutdown_event_type_
= shutdown_event_type
;
39 if (CORBA::is_nil (this->myself_
.in ()))
41 this->myself_
= this->_this ();
45 this->supplier_proxy_
->connect_push_consumer (this->myself_
.in (),
50 EC_Consumer::connected () const
52 return !CORBA::is_nil (this->supplier_proxy_
.in ());
56 EC_Consumer::disconnect ()
58 if (CORBA::is_nil (this->supplier_proxy_
.in ()))
61 this->supplier_proxy_
->disconnect_push_supplier ();
63 this->supplier_proxy_
=
64 RtecEventChannelAdmin::ProxyPushSupplier::_nil ();
68 EC_Consumer::shutdown ()
70 if (!this->is_active_
)
73 // Deactivate the servant
74 PortableServer::POA_var poa
=
75 this->_default_POA ();
76 PortableServer::ObjectId_var id
=
77 poa
->servant_to_id (this);
78 poa
->deactivate_object (id
.in ());
79 this->myself_
= RtecEventComm::PushConsumer::_nil ();
84 EC_Consumer::dump_results (const ACE_TCHAR
* name
,
85 ACE_High_Res_Timer::global_scale_factor_type gsf
)
87 this->throughput_
.dump_results (name
, gsf
);
91 EC_Consumer::accumulate (ACE_Throughput_Stats
& throughput
) const
93 throughput
.accumulate (this->throughput_
);
97 EC_Consumer::push (const RtecEventComm::EventSet
& events
)
99 this->driver_
->consumer_push (this->cookie_
, events
);
101 if (events
.length () == 0)
103 ACE_DEBUG ((LM_DEBUG
,
104 "EC_Consumer (%P|%t) no events\n"));
108 ACE_GUARD (TAO_SYNCH_MUTEX
, ace_mon
, this->lock_
);
109 if (this->push_count_
== 0)
110 this->throughput_start_
= ACE_OS::gethrtime ();
112 this->push_count_
+= events
.length ();
114 if (TAO_debug_level
> 0
115 && this->push_count_
% 100 == 0)
117 ACE_DEBUG ((LM_DEBUG
,
118 "EC_Consumer (%P|%t): %d events received\n",
122 for (u_int i
= 0; i
< events
.length (); ++i
)
124 const RtecEventComm::Event
& e
= events
[i
];
126 ACE_hrtime_t creation
;
127 ORBSVCS_Time::TimeT_to_hrtime (creation
,
128 e
.header
.creation_time
);
130 const ACE_hrtime_t now
= ACE_OS::gethrtime ();
131 this->throughput_
.sample (now
- this->throughput_start_
,
134 if (e
.header
.type
== this->shutdown_event_type_
)
135 this->driver_
->consumer_shutdown (this->cookie_
);
140 EC_Consumer::disconnect_push_consumer ()
142 this->driver_
->consumer_disconnect (this->cookie_
);
143 this->supplier_proxy_
=
144 RtecEventChannelAdmin::ProxyPushSupplier::_nil ();