Merge pull request #1551 from DOCGroup/plm_jira_333
[ACE_TAO.git] / TAO / orbsvcs / examples / Notify / ThreadPool / Supplier.cpp
blobd58430352d45d83494dc42591783882eeb20692f
1 #include "Supplier.h"
5 #include "tao/ORB_Core.h"
7 TAO_Notify_ThreadPool_Supplier::TAO_Notify_ThreadPool_Supplier (TAO_Notify_ORB_Objects& orb_objects)
8 : orb_objects_ (orb_objects)
9 , proxy_consumer_id_ (0)
10 , expected_consumer_count_ (2)
11 , consumers_connected_ (lock_)
12 , consumer_count_ (0)
13 , max_events_ (10)
14 , proxy_consumer_thread_count_ (0)
18 TAO_Notify_ThreadPool_Supplier::~TAO_Notify_ThreadPool_Supplier ()
22 void
23 TAO_Notify_ThreadPool_Supplier::init (CosNotifyChannelAdmin::SupplierAdmin_var& admin, int expected_consumer_count ,int max_events,
24 int proxy_consumer_thread_count)
26 // First initialize the class members.
27 this->admin_ = admin;
28 this->expected_consumer_count_ = expected_consumer_count;
29 this->max_events_ = max_events;
30 this->proxy_consumer_thread_count_ = proxy_consumer_thread_count;
32 this->connect ();
35 void
36 TAO_Notify_ThreadPool_Supplier::run (void)
39 ACE_GUARD (TAO_SYNCH_MUTEX, mon, this->lock_);
41 ACE_DEBUG ((LM_DEBUG, "(%P, %t) Waiting for %d consumers to connect...\n", this->expected_consumer_count_-1));
43 // Wait till the consumers are ready to go.
44 while (this->consumer_count_ != this->expected_consumer_count_)
45 this->consumers_connected_.wait ();
48 ACE_DEBUG ((LM_DEBUG,
49 "(%P, %t) Supplier is sending an events...\n"));
51 // Send events to each consumer.
52 for (int i = 0; i < this->max_events_; ++i)
54 for (int j = 0; j < this->expected_consumer_count_; ++j)
56 // send the event
57 this->send_event (this->event_[j]);
61 // Disconnect from the EC
62 this->disconnect ();
64 // Deactivate this object.
65 this->deactivate ();
67 // we're done. shutdown the ORB to exit the process.
68 this->orb_objects_.orb_->shutdown (1);
71 void
72 TAO_Notify_ThreadPool_Supplier::connect (void)
74 // Activate the supplier object.
75 CosNotifyComm::StructuredPushSupplier_var objref = this->_this ();
77 CosNotifyChannelAdmin::ProxyConsumer_var proxyconsumer;
79 if (this->proxy_consumer_thread_count_ != 0)
81 // Narrow to the extended interface.
82 NotifyExt::SupplierAdmin_var admin_ext = NotifyExt::SupplierAdmin::_narrow (this->admin_.in ());
84 NotifyExt::ThreadPoolParams tp_params = { NotifyExt::CLIENT_PROPAGATED, 0,
85 0, static_cast<CORBA::ULong> (this->proxy_consumer_thread_count_),
86 0, 0, 0, 0, 0 };
88 CosNotification::QoSProperties qos (1);
89 qos.length (1);
90 qos[0].name = CORBA::string_dup (NotifyExt::ThreadPool);
91 qos[0].value <<= tp_params;
93 // Obtain the proxy. The QoS is applied to the POA in which the Proxy is hosted.
94 proxyconsumer = admin_ext->obtain_notification_push_consumer_with_qos (CosNotifyChannelAdmin::STRUCTURED_EVENT
95 , proxy_consumer_id_, qos);
97 else
99 // Obtain the proxy.
100 proxyconsumer = this->admin_->obtain_notification_push_consumer (CosNotifyChannelAdmin::STRUCTURED_EVENT
101 , proxy_consumer_id_);
104 ACE_ASSERT (!CORBA::is_nil (proxyconsumer.in ()));
106 // narrow
107 this->proxy_consumer_ =
108 CosNotifyChannelAdmin::StructuredProxyPushConsumer::_narrow (proxyconsumer.in ());
110 ACE_ASSERT (!CORBA::is_nil (proxy_consumer_.in ()));
112 // connect to the proxyconsumer.
113 proxy_consumer_->connect_structured_push_supplier (objref.in ());
115 ACE_DEBUG ((LM_DEBUG, "(%P,%t) Created Supplier %d with %d threads at the ProxyConsumer\n", proxy_consumer_id_,
116 this->proxy_consumer_thread_count_));
119 void
120 TAO_Notify_ThreadPool_Supplier::disconnect (void)
122 ACE_ASSERT (!CORBA::is_nil (this->proxy_consumer_.in ()));
124 this->proxy_consumer_->disconnect_structured_push_consumer();
127 void
128 TAO_Notify_ThreadPool_Supplier::deactivate (void)
130 PortableServer::POA_var poa (this->_default_POA ());
132 PortableServer::ObjectId_var id (poa->servant_to_id (this));
134 poa->deactivate_object (id.in());
137 void
138 TAO_Notify_ThreadPool_Supplier::subscription_change (const CosNotification::EventTypeSeq & added,
139 const CosNotification::EventTypeSeq & /*removed */)
141 ACE_GUARD (TAO_SYNCH_MUTEX, mon, this->lock_);
143 // Count the number of consumers connect and signal the supplier thread when the expected count have connected.
144 // Only 1 consumer connects at a time.
145 if (added.length () > 0)
147 // Set the domain and type nams in the event's fixed header.
148 this->event_[consumer_count_].header.fixed_header.event_type.domain_name = CORBA::string_dup(added[0].domain_name);
149 this->event_[consumer_count_].header.fixed_header.event_type.type_name = CORBA::string_dup(added[0].type_name);
151 ++this->consumer_count_;
153 ACE_DEBUG ((LM_DEBUG, "(%P,%t) Received Type %d: (%s)\n", this->consumer_count_, added[0].type_name.in ()));
155 if (this->consumer_count_ == this->expected_consumer_count_)
156 this->consumers_connected_.signal ();
160 void
161 TAO_Notify_ThreadPool_Supplier::send_event (const CosNotification::StructuredEvent& event)
163 ACE_ASSERT (!CORBA::is_nil (this->proxy_consumer_.in ()));
165 proxy_consumer_->push_structured_event (event);
168 void
169 TAO_Notify_ThreadPool_Supplier::disconnect_structured_push_supplier (void)
171 this->deactivate ();