5 #include "tao/ORB_Core.h"
7 TAO_Notify_Lanes_Supplier::TAO_Notify_Lanes_Supplier (TAO_Notify_ORB_Objects
& orb_objects
)
8 : orb_objects_ (orb_objects
)
9 , proxy_consumer_id_ (0)
10 , expected_consumer_count_ (2)
11 , consumers_connected_ (lock_
)
16 TAO_Notify_Lanes_Supplier::~TAO_Notify_Lanes_Supplier ()
21 TAO_Notify_Lanes_Supplier::init (CosNotifyChannelAdmin::SupplierAdmin_var
& admin
, int expected_consumer_count
)
23 // First initialize the class members.
25 this->expected_consumer_count_
= expected_consumer_count
;
31 TAO_Notify_Lanes_Supplier::run (void)
33 // The Priority at which we send the first event to the first consumer.
34 RTCORBA::Priority priority
= 1;
37 ACE_GUARD (TAO_SYNCH_MUTEX
, mon
, this->lock_
);
39 ACE_DEBUG ((LM_DEBUG
, "(%P, %t) Waiting for %d consumers to connect...\n", this->expected_consumer_count_
));
41 // Wait till the consumers are ready to go.
42 while (this->consumer_count_
!= this->expected_consumer_count_
)
43 this->consumers_connected_
.wait ();
46 // Send an event each to each consumer.
47 // Each Consumer expects a different priority.
48 for (int i
= 0; i
< this->expected_consumer_count_
; ++i
, ++priority
)
50 // Set this threads priority.
51 this->orb_objects_
.current_
->the_priority (priority
);
53 // Make sure the priority was set, get the priority of the current thread.
54 RTCORBA::Priority thread_priority
=
55 this->orb_objects_
.current_
->the_priority ();
57 // We will send this event.
58 CosNotification::StructuredEvent event
;
60 // Populate the Priority field so that the consumer can deduce the suppliers priority
61 // to do a sanity check when it receives the event.
62 CosNotification::PropertySeq
& opt
= event
.header
.variable_header
;
66 buffer
<<= (CORBA::Short
) thread_priority
;
68 opt
[0].name
= CORBA::string_dup (CosNotification::Priority
);
69 opt
[0].value
= buffer
;
71 // Set the domain and type nams in the event's fixed header.
73 ACE_OS::sprintf (type
, "TEST_TYPE_%d", thread_priority
);
75 event
.header
.fixed_header
.event_type
.domain_name
= CORBA::string_dup("TEST_DOMAIN");
76 event
.header
.fixed_header
.event_type
.type_name
= CORBA::string_dup(type
);
79 "(%P, %t) Supplier is sending an event of type %s at priority %d\n", type
, thread_priority
));
82 this->send_event (event
);
83 } // repeat for the next consumer at the next priority.
85 // Disconnect from the EC
88 // Deactivate this object.
91 // we're done. shutdown the ORB to exit the process.
92 this->orb_objects_
.orb_
->shutdown (1);
96 TAO_Notify_Lanes_Supplier::connect (void)
98 // Activate the supplier object.
99 CosNotifyComm::StructuredPushSupplier_var objref
= this->_this ();
102 CosNotifyChannelAdmin::ProxyConsumer_var proxyconsumer
=
103 this->admin_
->obtain_notification_push_consumer (CosNotifyChannelAdmin::STRUCTURED_EVENT
104 , proxy_consumer_id_
);
106 ACE_ASSERT (!CORBA::is_nil (proxyconsumer
.in ()));
109 this->proxy_consumer_
=
110 CosNotifyChannelAdmin::StructuredProxyPushConsumer::_narrow (proxyconsumer
.in ());
112 ACE_ASSERT (!CORBA::is_nil (proxy_consumer_
.in ()));
114 // connect to the proxyconsumer.
115 proxy_consumer_
->connect_structured_push_supplier (objref
.in ());
119 TAO_Notify_Lanes_Supplier::disconnect (void)
121 ACE_ASSERT (!CORBA::is_nil (this->proxy_consumer_
.in ()));
123 this->proxy_consumer_
->disconnect_structured_push_consumer();
127 TAO_Notify_Lanes_Supplier::deactivate (void)
129 PortableServer::POA_var
poa (this->_default_POA ());
131 PortableServer::ObjectId_var
id (poa
->servant_to_id (this));
133 poa
->deactivate_object (id
.in());
137 TAO_Notify_Lanes_Supplier::subscription_change (const CosNotification::EventTypeSeq
& added
,
138 const CosNotification::EventTypeSeq
& /*removed */)
140 ACE_GUARD (TAO_SYNCH_MUTEX
, mon
, this->lock_
);
142 // Count the number of consumers connect and signal the supplier thread when the expected count have connected.
143 if (added
.length () > 0)
145 if (++this->consumer_count_
== this->expected_consumer_count_
)
146 this->consumers_connected_
.signal ();
151 TAO_Notify_Lanes_Supplier::send_event (const CosNotification::StructuredEvent
& event
)
153 ACE_ASSERT (!CORBA::is_nil (this->proxy_consumer_
.in ()));
155 proxy_consumer_
->push_structured_event (event
);
159 TAO_Notify_Lanes_Supplier::disconnect_structured_push_supplier (void)