5 #include "ace/High_Res_Timer.h"
7 #include "ace/Throughput_Stats.h"
8 #include "ace/OS_NS_stdio.h"
9 #include "ace/OS_NS_unistd.h"
11 TAO_Notify_ThreadPool_Consumer::TAO_Notify_ThreadPool_Consumer (TAO_Notify_ORB_Objects
& orb_objects
)
12 : orb_objects_ (orb_objects
)
13 , proxy_supplier_thread_count_ (0)
15 , events_received_count_ (0)
21 TAO_Notify_ThreadPool_Consumer::~TAO_Notify_ThreadPool_Consumer ()
26 TAO_Notify_ThreadPool_Consumer::init (PortableServer::POA_var
& poa
, CosNotifyChannelAdmin::ConsumerAdmin_var
& admin
,
27 int proxy_supplier_thread_count
, int max_events
, long delay
)
29 this->default_POA_
= poa
;
31 this->proxy_supplier_thread_count_
= proxy_supplier_thread_count
;
32 this->max_events_
= max_events
;
33 this->delay_
= ACE_Time_Value (delay
, 0);
35 ACE_DEBUG ((LM_DEBUG
, "(%P, %t)Consumer Delay = %d, param = %d\n", delay_
.sec (), delay
));
40 PortableServer::POA_ptr
41 TAO_Notify_ThreadPool_Consumer::_default_POA ()
43 return PortableServer::POA::_duplicate (this->default_POA_
.in ());
47 TAO_Notify_ThreadPool_Consumer::run ()
53 TAO_Notify_ThreadPool_Consumer::connect ()
55 // Activate the consumer with the default_POA_
56 CosNotifyComm::StructuredPushConsumer_var objref
= this->_this ();
58 CosNotifyChannelAdmin::ProxySupplier_var proxysupplier
;
60 if (this->proxy_supplier_thread_count_
!= 0)
62 // Narrow to the extended interface.
63 NotifyExt::ConsumerAdmin_var admin_ext
= NotifyExt::ConsumerAdmin::_narrow (this->admin_
.in ());
65 NotifyExt::ThreadPoolParams tp_params
= { NotifyExt::CLIENT_PROPAGATED
, 0,
66 0, static_cast<CORBA::ULong
> (this->proxy_supplier_thread_count_
),
69 CosNotification::QoSProperties
qos (1);
71 qos
[0].name
= CORBA::string_dup (NotifyExt::ThreadPool
);
72 qos
[0].value
<<= tp_params
;
74 // Obtain the proxy. The QoS is applied to the POA in which the Proxy is hosted.
75 proxysupplier
= admin_ext
->obtain_notification_push_supplier_with_qos (CosNotifyChannelAdmin::STRUCTURED_EVENT
76 , proxy_supplier_id_
, qos
);
80 proxysupplier
= this->admin_
->obtain_notification_push_supplier (CosNotifyChannelAdmin::STRUCTURED_EVENT
81 , proxy_supplier_id_
);
84 ACE_ASSERT (!CORBA::is_nil (proxysupplier
.in ()));
87 this->proxy_supplier_
=
88 CosNotifyChannelAdmin::StructuredProxyPushSupplier::_narrow (proxysupplier
.in ());
90 ACE_ASSERT (!CORBA::is_nil (proxy_supplier_
.in ()));
92 this->proxy_supplier_
->connect_structured_push_consumer (objref
.in ());
94 // Call subscription_change to inform the supplier that this consumer is available.
95 CosNotification::EventTypeSeq
added (1);
96 CosNotification::EventTypeSeq removed
;
99 added
[0].domain_name
= CORBA::string_dup ("TEST_DOMAIN");
101 /* We generate a unique Id for the consumer type so that the supplier can distinguish between the consumers.*/
103 ACE_OS::sprintf (type
, "TEST_TYPE_%d", this->proxy_supplier_id_
);
105 added
[0].type_name
= CORBA::string_dup (type
);
107 this->proxy_supplier_
->subscription_change (added
, removed
);
109 ACE_DEBUG ((LM_DEBUG
, "(%P,%t) Created Consumer %d with %d threads at the ProxySupplier\n", proxy_supplier_id_
,
110 this->proxy_supplier_thread_count_
));
114 TAO_Notify_ThreadPool_Consumer::disconnect ()
116 this->proxy_supplier_
->disconnect_structured_push_supplier();
120 TAO_Notify_ThreadPool_Consumer::offer_change (const CosNotification::EventTypeSeq
& /*added*/,
121 const CosNotification::EventTypeSeq
& /*removed*/)
127 TAO_Notify_ThreadPool_Consumer::push_structured_event (const CosNotification::StructuredEvent
& /*notification*/)
129 ACE_GUARD (TAO_SYNCH_MUTEX
, ace_mon
, this->lock_
);
131 if (TAO_debug_level
> 0)
132 ACE_DEBUG ((LM_DEBUG
, "(%P, %t) Consumer received event %d\n",
133 this->events_received_count_
));
135 // Increment the received count.
136 ++this->events_received_count_
;
138 if (this->events_received_count_
== 1)
140 this->t_first_
= ACE_OS::gethrtime ();
142 else if (this->events_received_count_
== this->max_events_
)
144 this->t_last_
= ACE_OS::gethrtime ();
146 // Disconnect from the EC
149 // Deactivate this object.
152 // We received the event, shutdown the ORB.
153 this->orb_objects_
.orb_
->shutdown (true);
157 ACE_OS::sleep (this->delay_
);
161 TAO_Notify_ThreadPool_Consumer::dump_throughput ()
163 ACE_High_Res_Timer::global_scale_factor_type gsf
=
164 ACE_High_Res_Timer::global_scale_factor ();
166 ACE_DEBUG ((LM_DEBUG
, "(%P,%t) Consumer %d\n", proxy_supplier_id_
));
168 ACE_Throughput_Stats::dump_throughput (ACE_TEXT("Total"), gsf
,
174 TAO_Notify_ThreadPool_Consumer::deactivate ()
176 PortableServer::POA_var
poa (this->_default_POA ());
178 PortableServer::ObjectId_var
id (poa
->servant_to_id (this));
180 poa
->deactivate_object (id
.in());
184 TAO_Notify_ThreadPool_Consumer::disconnect_structured_push_consumer ()