4 #define NOTIFY_FACTORY_NAME "NotifyEventChannelFactory"
5 #define NAMING_SERVICE_NAME "NameService"
7 #define DOMAIN_A "domain_a"
8 #define DOMAIN_B "domain_b"
9 #define DOMAIN_C "domain_c"
11 #define TYPE_A "type_a"
12 #define TYPE_B "type_b"
13 #define TYPE_C "type_c"
15 #define EVENT_COUNT 4 // number of events we expect the consumer to get from the EC
17 ACE_Atomic_Op
<TAO_SYNCH_MUTEX
, int> g_result_count
= 0; // we wait for 4 events.
19 Subscribe::Subscribe ()
23 ifgop_
= CosNotifyChannelAdmin::OR_OP
;
26 Subscribe::~Subscribe ()
28 this->ec_
->destroy ();
32 Subscribe::init (int argc
, ACE_TCHAR
*argv
[])
34 init_ORB (argc
, argv
);
35 resolve_naming_service ();
36 resolve_Notify_factory ();
38 create_supplieradmin ();
39 create_consumeradmin ();
49 if (g_result_count
!= EVENT_COUNT
) // if we still need to wait for events, run the orb.
50 { // if we still need to wait for events, run the orb.
52 if (this->orb_
->work_pending ())
53 this->orb_
->perform_work ();
64 Subscribe::init_ORB (int argc
, ACE_TCHAR
*argv
[])
66 this->orb_
= CORBA::ORB_init (argc
, argv
);
68 CORBA::Object_ptr poa_object
=
69 this->orb_
->resolve_initial_references("RootPOA");
71 if (CORBA::is_nil (poa_object
))
74 " (%P|%t) Unable to initialize the POA.\n"));
78 PortableServer::POA::_narrow (poa_object
);
80 PortableServer::POAManager_var poa_manager
=
81 root_poa_
->the_POAManager ();
83 poa_manager
->activate ();
87 Subscribe::resolve_naming_service ()
89 CORBA::Object_var naming_obj
=
90 this->orb_
->resolve_initial_references (NAMING_SERVICE_NAME
);
92 // Need to check return value for errors.
93 if (CORBA::is_nil (naming_obj
.in ()))
94 throw CORBA::UNKNOWN ();
96 this->naming_context_
=
97 CosNaming::NamingContext::_narrow (naming_obj
.in ());
101 Subscribe::resolve_Notify_factory ()
103 CosNaming::Name
name (1);
105 name
[0].id
= CORBA::string_dup (NOTIFY_FACTORY_NAME
);
107 CORBA::Object_var obj
=
108 this->naming_context_
->resolve (name
);
110 this->notify_factory_
=
111 CosNotifyChannelAdmin::EventChannelFactory::_narrow (obj
.in ());
115 Subscribe::create_EC ()
117 CosNotifyChannelAdmin::ChannelID id
;
119 ec_
= notify_factory_
->create_channel (initial_qos_
,
123 ACE_ASSERT (!CORBA::is_nil (ec_
.in ()));
127 Subscribe::create_supplieradmin ()
129 CosNotifyChannelAdmin::AdminID adminid
;
132 ec_
->new_for_suppliers (this->ifgop_
, adminid
);
134 ACE_ASSERT (!CORBA::is_nil (supplier_admin_
.in ()));
138 Subscribe:: create_consumeradmin ()
140 CosNotifyChannelAdmin::AdminID adminid
;
143 ec_
->new_for_consumers (this->ifgop_
, adminid
);
145 ACE_ASSERT (!CORBA::is_nil (consumer_admin_
.in ()));
149 Subscribe::create_consumers ()
151 consumer_1_
= new Subscribe_StructuredPushConsumer (this);
152 consumer_1_
->connect (this->consumer_admin_
.in ());
154 consumer_2_
= new Subscribe_StructuredPushConsumer (this);
155 consumer_2_
->connect (this->consumer_admin_
.in ());
159 Subscribe::create_suppliers ()
161 supplier_1_
= new Subscribe_StructuredPushSupplier ();
162 supplier_1_
->connect (this->supplier_admin_
.in ());
164 supplier_2_
= new Subscribe_StructuredPushSupplier ();
165 supplier_2_
->connect (this->supplier_admin_
.in ());
169 Subscribe::send_events ()
171 // Setup the CA to receive event_type : "domain_A", "Type_a"
172 CosNotification::EventTypeSeq
added(1);
173 CosNotification::EventTypeSeq
removed (0);
176 added
[0].domain_name
= CORBA::string_dup (DOMAIN_A
);
177 added
[0].type_name
= CORBA::string_dup (TYPE_A
);
179 this->consumer_admin_
->subscription_change (added
, removed
);
181 // Setup the Consumer 1 to receive event_type : "domain_B", "Type_b"
182 CosNotification::EventTypeSeq
added_1(1);
183 CosNotification::EventTypeSeq
removed_1 (0);
186 added_1
[0].domain_name
= CORBA::string_dup (DOMAIN_B
);
187 added_1
[0].type_name
= CORBA::string_dup (TYPE_B
);
188 removed_1
.length (0);
190 this->consumer_1_
->get_proxy_supplier ()->subscription_change (added_1
, removed_1
);
191 // now the expected subscription for consumer 1 should be
193 this->consumer_1_
->expected_subscription_
.length (2);
194 this->consumer_1_
->expected_subscription_
[0] = added
[0];
195 this->consumer_1_
->expected_subscription_
[1] = added_1
[0];
197 // Setup the Consumer 2 to receive event_type : "domain_C", "Type_c"
198 CosNotification::EventTypeSeq
added_2(1);
199 CosNotification::EventTypeSeq
removed_2 (0);
202 added_2
[0].domain_name
= CORBA::string_dup (DOMAIN_C
);
203 added_2
[0].type_name
= CORBA::string_dup (TYPE_C
);
204 removed_2
.length (0);
206 this->consumer_2_
->get_proxy_supplier ()->subscription_change (added_2
, removed_2
);
207 // now the expected subscription for consumer 2 should be
209 this->consumer_2_
->expected_subscription_
.length (2);
210 this->consumer_2_
->expected_subscription_
[0] = added
[0];
211 this->consumer_2_
->expected_subscription_
[1] = added_2
[0];
213 // Create the events - one of each type
215 CosNotification::StructuredEvent event1
;
216 event1
.header
.fixed_header
.event_type
.domain_name
=
217 CORBA::string_dup(DOMAIN_A
);
218 event1
.header
.fixed_header
.event_type
.type_name
=
219 CORBA::string_dup(TYPE_A
);
220 event1
.header
.fixed_header
.event_name
= CORBA::string_dup("");
221 event1
.header
.variable_header
.length (0); // put nothing here
222 event1
.filterable_data
.length (0);
223 event1
.remainder_of_body
<<= (CORBA::Long
)10;
226 CosNotification::StructuredEvent event2
;
227 event2
.header
.fixed_header
.event_type
.domain_name
=
228 CORBA::string_dup(DOMAIN_B
);
229 event2
.header
.fixed_header
.event_type
.type_name
=
230 CORBA::string_dup(TYPE_B
);
231 event2
.header
.fixed_header
.event_name
= CORBA::string_dup("");
232 event2
.header
.variable_header
.length (0); // put nothing here
233 event2
.filterable_data
.length (0);
234 event2
.remainder_of_body
<<= (CORBA::Long
)10;
237 CosNotification::StructuredEvent event3
;
238 event3
.header
.fixed_header
.event_type
.domain_name
=
239 CORBA::string_dup(DOMAIN_C
);
240 event3
.header
.fixed_header
.event_type
.type_name
=
241 CORBA::string_dup(TYPE_C
);
242 event3
.header
.fixed_header
.event_name
= CORBA::string_dup("");
243 event3
.header
.variable_header
.length (0); // put nothing here
244 event3
.filterable_data
.length (0);
245 event3
.remainder_of_body
<<= (CORBA::Long
)10;
247 // let supplier 1 send all these events
248 for (int i
= 0; i
< 1; ++i
)
250 supplier_1_
->send_event (event1
);
251 supplier_1_
->send_event (event2
);
252 supplier_1_
->send_event (event3
);
256 /*****************************************************************/
257 Subscribe_StructuredPushConsumer::Subscribe_StructuredPushConsumer (Subscribe
* subscribe
)
258 : expected_subscription_ (2),
259 subscribe_ (subscribe
)
263 Subscribe_StructuredPushConsumer::~Subscribe_StructuredPushConsumer ()
268 Subscribe_StructuredPushConsumer::connect
269 (CosNotifyChannelAdmin::ConsumerAdmin_ptr consumer_admin
)
271 // Activate the consumer with the default_POA_
272 CosNotifyComm::StructuredPushConsumer_var objref
=
275 CosNotifyChannelAdmin::ProxySupplier_var proxysupplier
=
276 consumer_admin
->obtain_notification_push_supplier (CosNotifyChannelAdmin::STRUCTURED_EVENT
, proxy_supplier_id_
);
278 ACE_ASSERT (!CORBA::is_nil (proxysupplier
.in ()));
281 this->proxy_supplier_
=
282 CosNotifyChannelAdmin::StructuredProxyPushSupplier::
283 _narrow (proxysupplier
.in ());
285 ACE_ASSERT (!CORBA::is_nil (proxy_supplier_
.in ()));
287 proxy_supplier_
->connect_structured_push_consumer (objref
.in ());
291 Subscribe_StructuredPushConsumer::disconnect ()
293 this->proxy_supplier_
->
294 disconnect_structured_push_supplier();
298 Subscribe_StructuredPushConsumer::offer_change
299 (const CosNotification::EventTypeSeq
& /*added*/,
300 const CosNotification::EventTypeSeq
& /*removed*/)
306 Subscribe_StructuredPushConsumer::push_structured_event
307 (const CosNotification::StructuredEvent
& notification
)
309 const char* domain_name
=
310 notification
.header
.fixed_header
.event_type
.domain_name
;
312 const char* type_name
=
313 notification
.header
.fixed_header
.event_type
.type_name
;
317 for (CORBA::ULong i
= 0; i
< expected_subscription_
.length (); i
++)
319 if ( ACE_OS::strcmp (expected_subscription_
[i
].domain_name
, domain_name
) == 0)
324 ACE_DEBUG ((LM_DEBUG
, "Structured Subscribe Consumer %d received subscribed event, domain = %s, type = %s\n",
325 this->proxy_supplier_id_
, domain_name
, type_name
));
327 ACE_ERROR ((LM_ERROR
, "Error: Structured Subscribe Consumer %d received not subscribed event , domain = %s, type = %s\n",
328 this->proxy_supplier_id_
, domain_name
, type_name
));
330 if (++g_result_count
== EVENT_COUNT
)
335 Subscribe_StructuredPushConsumer::disconnect_structured_push_consumer
341 CosNotifyChannelAdmin::StructuredProxyPushSupplier_ptr
342 Subscribe_StructuredPushConsumer::get_proxy_supplier ()
344 return proxy_supplier_
.in ();
347 /*****************************************************************/
349 Subscribe_StructuredPushSupplier::Subscribe_StructuredPushSupplier ()
353 Subscribe_StructuredPushSupplier::~Subscribe_StructuredPushSupplier ()
358 Subscribe_StructuredPushSupplier::connect
359 (CosNotifyChannelAdmin::SupplierAdmin_ptr supplier_admin
)
361 CosNotifyComm::StructuredPushSupplier_var objref
=
364 CosNotifyChannelAdmin::ProxyConsumer_var proxyconsumer
=
365 supplier_admin
->obtain_notification_push_consumer (CosNotifyChannelAdmin::STRUCTURED_EVENT
, proxy_consumer_id_
);
367 ACE_ASSERT (!CORBA::is_nil (proxyconsumer
.in ()));
370 this->proxy_consumer_
=
371 CosNotifyChannelAdmin::StructuredProxyPushConsumer::_narrow (proxyconsumer
.in ());
373 ACE_ASSERT (!CORBA::is_nil (proxy_consumer_
.in ()));
375 proxy_consumer_
->connect_structured_push_supplier (objref
.in ());
379 Subscribe_StructuredPushSupplier::disconnect ()
381 ACE_ASSERT (!CORBA::is_nil (this->proxy_consumer_
.in ()));
383 this->proxy_consumer_
->disconnect_structured_push_consumer();
387 Subscribe_StructuredPushSupplier::subscription_change
388 (const CosNotification::EventTypeSeq
& /*added*/,
389 const CosNotification::EventTypeSeq
& /*removed */)
395 Subscribe_StructuredPushSupplier::send_event
396 (const CosNotification::StructuredEvent
& event
)
398 ACE_ASSERT (!CORBA::is_nil (this->proxy_consumer_
.in ()));
400 proxy_consumer_
->push_structured_event (event
);
404 Subscribe_StructuredPushSupplier::disconnect_structured_push_supplier