Merge pull request #1551 from DOCGroup/plm_jira_333
[ACE_TAO.git] / TAO / orbsvcs / examples / Notify / Lanes / Supplier.cpp
blob9d5fef0101d0bf60d97aa3e2b3ab6e07917eab44
1 #include "Supplier.h"
5 #include "tao/ORB_Core.h"
7 TAO_Notify_Lanes_Supplier::TAO_Notify_Lanes_Supplier (TAO_Notify_ORB_Objects& orb_objects)
8 : orb_objects_ (orb_objects)
9 , proxy_consumer_id_ (0)
10 , expected_consumer_count_ (2)
11 , consumers_connected_ (lock_)
12 , consumer_count_ (0)
16 TAO_Notify_Lanes_Supplier::~TAO_Notify_Lanes_Supplier ()
20 void
21 TAO_Notify_Lanes_Supplier::init (CosNotifyChannelAdmin::SupplierAdmin_var& admin, int expected_consumer_count)
23 // First initialize the class members.
24 this->admin_ = admin;
25 this->expected_consumer_count_ = expected_consumer_count;
27 this->connect ();
30 void
31 TAO_Notify_Lanes_Supplier::run (void)
33 // The Priority at which we send the first event to the first consumer.
34 RTCORBA::Priority priority = 1;
37 ACE_GUARD (TAO_SYNCH_MUTEX, mon, this->lock_);
39 ACE_DEBUG ((LM_DEBUG, "(%P, %t) Waiting for %d consumers to connect...\n", this->expected_consumer_count_));
41 // Wait till the consumers are ready to go.
42 while (this->consumer_count_ != this->expected_consumer_count_)
43 this->consumers_connected_.wait ();
46 // Send an event each to each consumer.
47 // Each Consumer expects a different priority.
48 for (int i = 0; i < this->expected_consumer_count_; ++i, ++priority)
50 // Set this threads priority.
51 this->orb_objects_.current_->the_priority (priority);
53 // Make sure the priority was set, get the priority of the current thread.
54 RTCORBA::Priority thread_priority =
55 this->orb_objects_.current_->the_priority ();
57 // We will send this event.
58 CosNotification::StructuredEvent event;
60 // Populate the Priority field so that the consumer can deduce the suppliers priority
61 // to do a sanity check when it receives the event.
62 CosNotification::PropertySeq& opt = event.header.variable_header;
63 opt.length (1);
65 CORBA::Any buffer;
66 buffer <<= (CORBA::Short) thread_priority;
68 opt[0].name = CORBA::string_dup (CosNotification::Priority);
69 opt[0].value = buffer;
71 // Set the domain and type nams in the event's fixed header.
72 char type[BUFSIZ];
73 ACE_OS::sprintf (type, "TEST_TYPE_%d", thread_priority);
75 event.header.fixed_header.event_type.domain_name = CORBA::string_dup("TEST_DOMAIN");
76 event.header.fixed_header.event_type.type_name = CORBA::string_dup(type);
78 ACE_DEBUG ((LM_DEBUG,
79 "(%P, %t) Supplier is sending an event of type %s at priority %d\n", type, thread_priority));
81 // send the event
82 this->send_event (event);
83 } // repeat for the next consumer at the next priority.
85 // Disconnect from the EC
86 this->disconnect ();
88 // Deactivate this object.
89 this->deactivate ();
91 // we're done. shutdown the ORB to exit the process.
92 this->orb_objects_.orb_->shutdown (1);
95 void
96 TAO_Notify_Lanes_Supplier::connect (void)
98 // Activate the supplier object.
99 CosNotifyComm::StructuredPushSupplier_var objref = this->_this ();
101 // Obtain the proxy.
102 CosNotifyChannelAdmin::ProxyConsumer_var proxyconsumer =
103 this->admin_->obtain_notification_push_consumer (CosNotifyChannelAdmin::STRUCTURED_EVENT
104 , proxy_consumer_id_);
106 ACE_ASSERT (!CORBA::is_nil (proxyconsumer.in ()));
108 // narrow
109 this->proxy_consumer_ =
110 CosNotifyChannelAdmin::StructuredProxyPushConsumer::_narrow (proxyconsumer.in ());
112 ACE_ASSERT (!CORBA::is_nil (proxy_consumer_.in ()));
114 // connect to the proxyconsumer.
115 proxy_consumer_->connect_structured_push_supplier (objref.in ());
118 void
119 TAO_Notify_Lanes_Supplier::disconnect (void)
121 ACE_ASSERT (!CORBA::is_nil (this->proxy_consumer_.in ()));
123 this->proxy_consumer_->disconnect_structured_push_consumer();
126 void
127 TAO_Notify_Lanes_Supplier::deactivate (void)
129 PortableServer::POA_var poa (this->_default_POA ());
131 PortableServer::ObjectId_var id (poa->servant_to_id (this));
133 poa->deactivate_object (id.in());
136 void
137 TAO_Notify_Lanes_Supplier::subscription_change (const CosNotification::EventTypeSeq & added,
138 const CosNotification::EventTypeSeq & /*removed */)
140 ACE_GUARD (TAO_SYNCH_MUTEX, mon, this->lock_);
142 // Count the number of consumers connect and signal the supplier thread when the expected count have connected.
143 if (added.length () > 0)
145 if (++this->consumer_count_ == this->expected_consumer_count_)
146 this->consumers_connected_.signal ();
150 void
151 TAO_Notify_Lanes_Supplier::send_event (const CosNotification::StructuredEvent& event)
153 ACE_ASSERT (!CORBA::is_nil (this->proxy_consumer_.in ()));
155 proxy_consumer_->push_structured_event (event);
158 void
159 TAO_Notify_Lanes_Supplier::disconnect_structured_push_supplier (void)
161 this->deactivate ();