Merge pull request #1551 from DOCGroup/plm_jira_333
[ACE_TAO.git] / TAO / orbsvcs / examples / Notify / Subscribe / Subscribe.cpp
bloba6f96cbe249a7c59abc3d339fff45bc19a8b384a
1 /* -*- C++ -*- */
2 #include "Subscribe.h"
4 #define NOTIFY_FACTORY_NAME "NotifyEventChannelFactory"
5 #define NAMING_SERVICE_NAME "NameService"
7 #define DOMAIN_A "domain_a"
8 #define DOMAIN_B "domain_b"
9 #define DOMAIN_C "domain_c"
11 #define TYPE_A "type_a"
12 #define TYPE_B "type_b"
13 #define TYPE_C "type_c"
15 #define EVENT_COUNT 4 // number of events we expect the consumer to get from the EC
17 ACE_Atomic_Op <TAO_SYNCH_MUTEX, int> g_result_count = 0; // we wait for 4 events.
19 Subscribe::Subscribe (void)
20 : done_ (0)
22 // No-Op.
23 ifgop_ = CosNotifyChannelAdmin::OR_OP;
26 Subscribe::~Subscribe ()
28 this->ec_->destroy ();
31 void
32 Subscribe::init (int argc, ACE_TCHAR *argv [])
34 init_ORB (argc, argv);
35 resolve_naming_service ();
36 resolve_Notify_factory ();
37 create_EC ();
38 create_supplieradmin ();
39 create_consumeradmin ();
40 create_consumers ();
41 create_suppliers ();
44 void
45 Subscribe::run (void)
47 this->send_events ();
49 if (g_result_count != EVENT_COUNT) // if we still need to wait for events, run the orb.
50 { // if we still need to wait for events, run the orb.
51 while (!this->done_)
52 if (this->orb_->work_pending ())
53 this->orb_->perform_work ();
57 void
58 Subscribe::done (void)
60 this->done_ = 1;
63 void
64 Subscribe::init_ORB (int argc, ACE_TCHAR *argv [])
66 this->orb_ = CORBA::ORB_init (argc, argv);
68 CORBA::Object_ptr poa_object =
69 this->orb_->resolve_initial_references("RootPOA");
71 if (CORBA::is_nil (poa_object))
73 ACE_ERROR ((LM_ERROR,
74 " (%P|%t) Unable to initialize the POA.\n"));
75 return;
77 this->root_poa_ =
78 PortableServer::POA::_narrow (poa_object);
80 PortableServer::POAManager_var poa_manager =
81 root_poa_->the_POAManager ();
83 poa_manager->activate ();
86 void
87 Subscribe::resolve_naming_service (void)
89 CORBA::Object_var naming_obj =
90 this->orb_->resolve_initial_references (NAMING_SERVICE_NAME);
92 // Need to check return value for errors.
93 if (CORBA::is_nil (naming_obj.in ()))
94 throw CORBA::UNKNOWN ();
96 this->naming_context_ =
97 CosNaming::NamingContext::_narrow (naming_obj.in ());
100 void
101 Subscribe::resolve_Notify_factory (void)
103 CosNaming::Name name (1);
104 name.length (1);
105 name[0].id = CORBA::string_dup (NOTIFY_FACTORY_NAME);
107 CORBA::Object_var obj =
108 this->naming_context_->resolve (name);
110 this->notify_factory_ =
111 CosNotifyChannelAdmin::EventChannelFactory::_narrow (obj.in ());
114 void
115 Subscribe::create_EC (void)
117 CosNotifyChannelAdmin::ChannelID id;
119 ec_ = notify_factory_->create_channel (initial_qos_,
120 initial_admin_,
121 id);
123 ACE_ASSERT (!CORBA::is_nil (ec_.in ()));
126 void
127 Subscribe::create_supplieradmin (void)
129 CosNotifyChannelAdmin::AdminID adminid;
131 supplier_admin_ =
132 ec_->new_for_suppliers (this->ifgop_, adminid);
134 ACE_ASSERT (!CORBA::is_nil (supplier_admin_.in ()));
137 void
138 Subscribe:: create_consumeradmin (void)
140 CosNotifyChannelAdmin::AdminID adminid;
142 consumer_admin_ =
143 ec_->new_for_consumers (this->ifgop_, adminid);
145 ACE_ASSERT (!CORBA::is_nil (consumer_admin_.in ()));
148 void
149 Subscribe::create_consumers (void)
151 consumer_1_ = new Subscribe_StructuredPushConsumer (this);
152 consumer_1_->connect (this->consumer_admin_.in ());
154 consumer_2_ = new Subscribe_StructuredPushConsumer (this);
155 consumer_2_->connect (this->consumer_admin_.in ());
158 void
159 Subscribe::create_suppliers (void)
161 supplier_1_ = new Subscribe_StructuredPushSupplier ();
162 supplier_1_->connect (this->supplier_admin_.in ());
164 supplier_2_ = new Subscribe_StructuredPushSupplier ();
165 supplier_2_->connect (this->supplier_admin_.in ());
168 void
169 Subscribe::send_events (void)
171 // Setup the CA to receive event_type : "domain_A", "Type_a"
172 CosNotification::EventTypeSeq added(1);
173 CosNotification::EventTypeSeq removed (0);
174 added.length (1);
176 added[0].domain_name = CORBA::string_dup (DOMAIN_A);
177 added[0].type_name = CORBA::string_dup (TYPE_A);
179 this->consumer_admin_->subscription_change (added, removed);
181 // Setup the Consumer 1 to receive event_type : "domain_B", "Type_b"
182 CosNotification::EventTypeSeq added_1(1);
183 CosNotification::EventTypeSeq removed_1 (0);
184 added_1.length (1);
186 added_1[0].domain_name = CORBA::string_dup (DOMAIN_B);
187 added_1[0].type_name = CORBA::string_dup (TYPE_B);
188 removed_1.length (0);
190 this->consumer_1_->get_proxy_supplier ()->subscription_change (added_1, removed_1);
191 // now the expected subscription for consumer 1 should be
192 // A and B
193 this->consumer_1_->expected_subscription_.length (2);
194 this->consumer_1_->expected_subscription_[0] = added[0];
195 this->consumer_1_->expected_subscription_[1] = added_1[0];
197 // Setup the Consumer 2 to receive event_type : "domain_C", "Type_c"
198 CosNotification::EventTypeSeq added_2(1);
199 CosNotification::EventTypeSeq removed_2 (0);
200 added_2.length (1);
202 added_2[0].domain_name = CORBA::string_dup (DOMAIN_C);
203 added_2[0].type_name = CORBA::string_dup (TYPE_C);
204 removed_2.length (0);
206 this->consumer_2_->get_proxy_supplier ()->subscription_change (added_2, removed_2);
207 // now the expected subscription for consumer 2 should be
208 // A and C
209 this->consumer_2_->expected_subscription_.length (2);
210 this->consumer_2_->expected_subscription_[0] = added[0];
211 this->consumer_2_->expected_subscription_[1] = added_2[0];
213 // Create the events - one of each type
214 // Event 1
215 CosNotification::StructuredEvent event1;
216 event1.header.fixed_header.event_type.domain_name =
217 CORBA::string_dup(DOMAIN_A);
218 event1.header.fixed_header.event_type.type_name =
219 CORBA::string_dup(TYPE_A);
220 event1.header.fixed_header.event_name = CORBA::string_dup("");
221 event1.header.variable_header.length (0); // put nothing here
222 event1.filterable_data.length (0);
223 event1.remainder_of_body <<= (CORBA::Long)10;
225 // Event 2
226 CosNotification::StructuredEvent event2;
227 event2.header.fixed_header.event_type.domain_name =
228 CORBA::string_dup(DOMAIN_B);
229 event2.header.fixed_header.event_type.type_name =
230 CORBA::string_dup(TYPE_B);
231 event2.header.fixed_header.event_name = CORBA::string_dup("");
232 event2.header.variable_header.length (0); // put nothing here
233 event2.filterable_data.length (0);
234 event2.remainder_of_body <<= (CORBA::Long)10;
236 // event 3
237 CosNotification::StructuredEvent event3;
238 event3.header.fixed_header.event_type.domain_name =
239 CORBA::string_dup(DOMAIN_C);
240 event3.header.fixed_header.event_type.type_name =
241 CORBA::string_dup(TYPE_C);
242 event3.header.fixed_header.event_name = CORBA::string_dup("");
243 event3.header.variable_header.length (0); // put nothing here
244 event3.filterable_data.length (0);
245 event3.remainder_of_body <<= (CORBA::Long)10;
247 // let supplier 1 send all these events
248 for (int i = 0; i < 1; ++i)
250 supplier_1_->send_event (event1);
251 supplier_1_->send_event (event2);
252 supplier_1_->send_event (event3);
256 /*****************************************************************/
257 Subscribe_StructuredPushConsumer::Subscribe_StructuredPushConsumer (Subscribe* subscribe)
258 : expected_subscription_ (2),
259 subscribe_ (subscribe)
263 Subscribe_StructuredPushConsumer::~Subscribe_StructuredPushConsumer ()
267 void
268 Subscribe_StructuredPushConsumer::connect
269 (CosNotifyChannelAdmin::ConsumerAdmin_ptr consumer_admin)
271 // Activate the consumer with the default_POA_
272 CosNotifyComm::StructuredPushConsumer_var objref =
273 this->_this ();
275 CosNotifyChannelAdmin::ProxySupplier_var proxysupplier =
276 consumer_admin->obtain_notification_push_supplier (CosNotifyChannelAdmin::STRUCTURED_EVENT, proxy_supplier_id_);
278 ACE_ASSERT (!CORBA::is_nil (proxysupplier.in ()));
280 // narrow
281 this->proxy_supplier_ =
282 CosNotifyChannelAdmin::StructuredProxyPushSupplier::
283 _narrow (proxysupplier.in ());
285 ACE_ASSERT (!CORBA::is_nil (proxy_supplier_.in ()));
287 proxy_supplier_->connect_structured_push_consumer (objref.in ());
290 void
291 Subscribe_StructuredPushConsumer::disconnect (void)
293 this->proxy_supplier_->
294 disconnect_structured_push_supplier();
297 void
298 Subscribe_StructuredPushConsumer::offer_change
299 (const CosNotification::EventTypeSeq & /*added*/,
300 const CosNotification::EventTypeSeq & /*removed*/)
302 // No-Op.
305 void
306 Subscribe_StructuredPushConsumer::push_structured_event
307 (const CosNotification::StructuredEvent & notification)
309 const char* domain_name =
310 notification.header.fixed_header.event_type.domain_name;
312 const char* type_name =
313 notification.header.fixed_header.event_type.type_name;
315 bool found = false;
317 for (CORBA::ULong i = 0; i < expected_subscription_.length (); i++)
319 if ( ACE_OS::strcmp (expected_subscription_[i].domain_name, domain_name) == 0)
320 found = true;
323 if (found)
324 ACE_DEBUG ((LM_DEBUG, "Structured Subscribe Consumer %d received subscribed event, domain = %s, type = %s\n",
325 this->proxy_supplier_id_, domain_name, type_name));
326 else
327 ACE_ERROR ((LM_ERROR, "Error: Structured Subscribe Consumer %d received not subscribed event , domain = %s, type = %s\n",
328 this->proxy_supplier_id_, domain_name, type_name));
330 if (++g_result_count == EVENT_COUNT)
331 subscribe_->done ();
335 void
336 Subscribe_StructuredPushConsumer::disconnect_structured_push_consumer
337 (void)
339 // No-Op.
342 CosNotifyChannelAdmin::StructuredProxyPushSupplier_ptr
343 Subscribe_StructuredPushConsumer::get_proxy_supplier (void)
345 return proxy_supplier_.in ();
348 /*****************************************************************/
350 Subscribe_StructuredPushSupplier::Subscribe_StructuredPushSupplier (void)
354 Subscribe_StructuredPushSupplier::~Subscribe_StructuredPushSupplier ()
358 void
359 Subscribe_StructuredPushSupplier::connect
360 (CosNotifyChannelAdmin::SupplierAdmin_ptr supplier_admin)
362 CosNotifyComm::StructuredPushSupplier_var objref =
363 this->_this ();
365 CosNotifyChannelAdmin::ProxyConsumer_var proxyconsumer =
366 supplier_admin->obtain_notification_push_consumer (CosNotifyChannelAdmin::STRUCTURED_EVENT, proxy_consumer_id_);
368 ACE_ASSERT (!CORBA::is_nil (proxyconsumer.in ()));
370 // narrow
371 this->proxy_consumer_ =
372 CosNotifyChannelAdmin::StructuredProxyPushConsumer::_narrow (proxyconsumer.in ());
374 ACE_ASSERT (!CORBA::is_nil (proxy_consumer_.in ()));
376 proxy_consumer_->connect_structured_push_supplier (objref.in ());
379 void
380 Subscribe_StructuredPushSupplier::disconnect (void)
382 ACE_ASSERT (!CORBA::is_nil (this->proxy_consumer_.in ()));
384 this->proxy_consumer_->disconnect_structured_push_consumer();
387 void
388 Subscribe_StructuredPushSupplier::subscription_change
389 (const CosNotification::EventTypeSeq & /*added*/,
390 const CosNotification::EventTypeSeq & /*removed */)
392 //No-Op.
395 void
396 Subscribe_StructuredPushSupplier::send_event
397 (const CosNotification::StructuredEvent& event)
399 ACE_ASSERT (!CORBA::is_nil (this->proxy_consumer_.in ()));
401 proxy_consumer_->push_structured_event (event);
404 void
405 Subscribe_StructuredPushSupplier::disconnect_structured_push_supplier
406 (void)
408 // No-Op.