Revert "Use a variable on the stack to not have a temporary in the call"
[ACE_TAO.git] / TAO / orbsvcs / examples / Notify / ThreadPool / Consumer.cpp
blobab02d46ca9ff4083134536669efed6ce7d21afa5
1 #include "Consumer.h"
3 #include "tao/debug.h"
5 #include "ace/High_Res_Timer.h"
6 #include "ace/Stats.h"
7 #include "ace/Throughput_Stats.h"
8 #include "ace/OS_NS_stdio.h"
9 #include "ace/OS_NS_unistd.h"
11 TAO_Notify_ThreadPool_Consumer::TAO_Notify_ThreadPool_Consumer (TAO_Notify_ORB_Objects& orb_objects)
12 : orb_objects_ (orb_objects)
13 , proxy_supplier_thread_count_ (0)
14 , max_events_ (10)
15 , events_received_count_ (0)
16 , t_first_ (0)
17 , t_last_ (0)
21 TAO_Notify_ThreadPool_Consumer::~TAO_Notify_ThreadPool_Consumer ()
25 void
26 TAO_Notify_ThreadPool_Consumer::init (PortableServer::POA_var& poa, CosNotifyChannelAdmin::ConsumerAdmin_var& admin,
27 int proxy_supplier_thread_count, int max_events, long delay)
29 this->default_POA_ = poa;
30 this->admin_ = admin;
31 this->proxy_supplier_thread_count_ = proxy_supplier_thread_count;
32 this->max_events_ = max_events;
33 this->delay_ = ACE_Time_Value (delay, 0);
35 ACE_DEBUG ((LM_DEBUG, "(%P, %t)Consumer Delay = %d, param = %d\n", delay_.sec (), delay));
37 this->connect ();
40 PortableServer::POA_ptr
41 TAO_Notify_ThreadPool_Consumer::_default_POA ()
43 return PortableServer::POA::_duplicate (this->default_POA_.in ());
46 void
47 TAO_Notify_ThreadPool_Consumer::run ()
49 // Nothing to do.
52 void
53 TAO_Notify_ThreadPool_Consumer::connect ()
55 // Activate the consumer with the default_POA_
56 CosNotifyComm::StructuredPushConsumer_var objref = this->_this ();
58 CosNotifyChannelAdmin::ProxySupplier_var proxysupplier;
60 if (this->proxy_supplier_thread_count_ != 0)
62 // Narrow to the extended interface.
63 NotifyExt::ConsumerAdmin_var admin_ext = NotifyExt::ConsumerAdmin::_narrow (this->admin_.in ());
65 NotifyExt::ThreadPoolParams tp_params = { NotifyExt::CLIENT_PROPAGATED, 0,
66 0, static_cast<CORBA::ULong> (this->proxy_supplier_thread_count_),
67 0, 0, 0, 0, 0 };
69 CosNotification::QoSProperties qos (1);
70 qos.length (1);
71 qos[0].name = CORBA::string_dup (NotifyExt::ThreadPool);
72 qos[0].value <<= tp_params;
74 // Obtain the proxy. The QoS is applied to the POA in which the Proxy is hosted.
75 proxysupplier = admin_ext->obtain_notification_push_supplier_with_qos (CosNotifyChannelAdmin::STRUCTURED_EVENT
76 , proxy_supplier_id_, qos);
78 else
80 proxysupplier = this->admin_->obtain_notification_push_supplier (CosNotifyChannelAdmin::STRUCTURED_EVENT
81 , proxy_supplier_id_);
84 ACE_ASSERT (!CORBA::is_nil (proxysupplier.in ()));
86 // narrow
87 this->proxy_supplier_ =
88 CosNotifyChannelAdmin::StructuredProxyPushSupplier::_narrow (proxysupplier.in ());
90 ACE_ASSERT (!CORBA::is_nil (proxy_supplier_.in ()));
92 this->proxy_supplier_->connect_structured_push_consumer (objref.in ());
94 // Call subscription_change to inform the supplier that this consumer is available.
95 CosNotification::EventTypeSeq added (1);
96 CosNotification::EventTypeSeq removed;
98 added.length (1);
99 added[0].domain_name = CORBA::string_dup ("TEST_DOMAIN");
101 /* We generate a unique Id for the consumer type so that the supplier can distinguish between the consumers.*/
102 char type[BUFSIZ];
103 ACE_OS::sprintf (type, "TEST_TYPE_%d", this->proxy_supplier_id_);
105 added[0].type_name = CORBA::string_dup (type);
107 this->proxy_supplier_->subscription_change (added, removed);
109 ACE_DEBUG ((LM_DEBUG, "(%P,%t) Created Consumer %d with %d threads at the ProxySupplier\n", proxy_supplier_id_,
110 this->proxy_supplier_thread_count_));
113 void
114 TAO_Notify_ThreadPool_Consumer::disconnect ()
116 this->proxy_supplier_->disconnect_structured_push_supplier();
119 void
120 TAO_Notify_ThreadPool_Consumer::offer_change (const CosNotification::EventTypeSeq & /*added*/,
121 const CosNotification::EventTypeSeq & /*removed*/)
123 // No-Op.
126 void
127 TAO_Notify_ThreadPool_Consumer::push_structured_event (const CosNotification::StructuredEvent & /*notification*/)
129 ACE_GUARD (TAO_SYNCH_MUTEX, ace_mon, this->lock_);
131 if (TAO_debug_level > 0)
132 ACE_DEBUG ((LM_DEBUG, "(%P, %t) Consumer received event %d\n",
133 this->events_received_count_));
135 // Increment the received count.
136 ++this->events_received_count_;
138 if (this->events_received_count_ == 1)
140 this->t_first_ = ACE_OS::gethrtime ();
142 else if (this->events_received_count_ == this->max_events_)
144 this->t_last_ = ACE_OS::gethrtime ();
146 // Disconnect from the EC
147 this->disconnect ();
149 // Deactivate this object.
150 this->deactivate ();
152 // We received the event, shutdown the ORB.
153 this->orb_objects_.orb_->shutdown (true);
156 // Eat CPU:
157 ACE_OS::sleep (this->delay_);
160 void
161 TAO_Notify_ThreadPool_Consumer::dump_throughput ()
163 ACE_High_Res_Timer::global_scale_factor_type gsf =
164 ACE_High_Res_Timer::global_scale_factor ();
166 ACE_DEBUG ((LM_DEBUG, "(%P,%t) Consumer %d\n", proxy_supplier_id_));
168 ACE_Throughput_Stats::dump_throughput (ACE_TEXT("Total"), gsf,
169 t_last_ - t_first_,
170 this->max_events_);
173 void
174 TAO_Notify_ThreadPool_Consumer::deactivate ()
176 PortableServer::POA_var poa (this->_default_POA ());
178 PortableServer::ObjectId_var id (poa->servant_to_id (this));
180 poa->deactivate_object (id.in());
183 void
184 TAO_Notify_ThreadPool_Consumer::disconnect_structured_push_consumer ()
186 this->deactivate ();