4 #include "tao/ORB_Core.h"
// Constructor. Initializes orb_objects_ from the orb_objects argument and
// sets the defaults visible in this initializer list: proxy_consumer_id_ is 0,
// expected_consumer_count_ is 2 (init() overwrites it), and
// consumers_connected_ is constructed from lock_. That object is waited on in
// run() and signalled in subscription_change().
6 TAO_Notify_Lanes_Supplier::TAO_Notify_Lanes_Supplier (TAO_Notify_ORB_Objects
& orb_objects
)
7 : orb_objects_ (orb_objects
)
8 , proxy_consumer_id_ (0)
9 , expected_consumer_count_ (2)
10 , consumers_connected_ (lock_
)
// Destructor.
15 TAO_Notify_Lanes_Supplier::~TAO_Notify_Lanes_Supplier ()
// Initializes the supplier's members before it is used.
//
// admin: the supplier admin passed in by the caller.
//   NOTE(review): presumably stored in admin_, which connect() calls
//   through -- confirm against the full file.
// expected_consumer_count: replaces the constructor default; run() waits
//   until consumer_count_ equals this value and then sends this many events.
20 TAO_Notify_Lanes_Supplier::init (CosNotifyChannelAdmin::SupplierAdmin_var
& admin
, int expected_consumer_count
)
22 // First initialize the class members.
24 this->expected_consumer_count_
= expected_consumer_count
;
// Main routine of the supplier.
//
// 1. Waits on consumers_connected_ until consumer_count_ equals
//    expected_consumer_count_ (the count is advanced by subscription_change()).
// 2. Sends expected_consumer_count_ structured events, one per loop iteration.
//    The priority starts at 1 and is incremented after each event. The
//    priority read back from the current thread is placed in the event's
//    variable header and embedded in the event type name ("TEST_TYPE_<n>").
// 3. Shuts down the ORB.
30 TAO_Notify_Lanes_Supplier::run ()
32 // The Priority at which we send the first event to the first consumer.
33 RTCORBA::Priority priority
= 1;
// Guard lock_ (guard object named mon) for the wait on consumers_connected_ below.
36 ACE_GUARD (TAO_SYNCH_MUTEX
, mon
, this->lock_
);
38 ACE_DEBUG ((LM_DEBUG
, "(%P, %t) Waiting for %d consumers to connect...\n", this->expected_consumer_count_
));
40 // Wait till the consumers are ready to go.
// The count is re-checked after every return from wait().
41 while (this->consumer_count_
!= this->expected_consumer_count_
)
42 this->consumers_connected_
.wait ();
45 // Send one event to each consumer.
46 // Each Consumer expects a different priority.
47 for (int i
= 0; i
< this->expected_consumer_count_
; ++i
, ++priority
)
49 // Set this thread's priority.
50 this->orb_objects_
.current_
->the_priority (priority
);
52 // Make sure the priority was set, get the priority of the current thread.
53 RTCORBA::Priority thread_priority
=
54 this->orb_objects_
.current_
->the_priority ();
56 // We will send this event.
57 CosNotification::StructuredEvent event
;
59 // Populate the Priority field so that the consumer can deduce the supplier's priority
60 // to do a sanity check when it receives the event.
61 CosNotification::PropertySeq
& opt
= event
.header
.variable_header
;
// The priority is inserted into buffer as a CORBA::Short.
65 buffer
<<= (CORBA::Short
) thread_priority
;
67 opt
[0].name
= CORBA::string_dup (CosNotification::Priority
);
68 opt
[0].value
= buffer
;
70 // Set the domain and type names in the event's fixed header.
// The type name embeds the thread priority that was read back above.
72 ACE_OS::sprintf (type
, "TEST_TYPE_%d", thread_priority
);
74 event
.header
.fixed_header
.event_type
.domain_name
= CORBA::string_dup("TEST_DOMAIN");
75 event
.header
.fixed_header
.event_type
.type_name
= CORBA::string_dup(type
);
78 "(%P, %t) Supplier is sending an event of type %s at priority %d\n", type
, thread_priority
));
81 this->send_event (event
);
82 } // repeat for the next consumer at the next priority.
84 // Disconnect from the EC
87 // Deactivate this object.
90 // we're done. shutdown the ORB to exit the process.
// NOTE(review): the boolean argument is presumably CORBA's
// wait_for_completion flag -- confirm.
91 this->orb_objects_
.orb_
->shutdown (true);
// Connects this supplier to a proxy consumer obtained from admin_.
//
// Steps: obtain this servant's object reference via _this(); ask admin_ for a
// STRUCTURED_EVENT push consumer, passing proxy_consumer_id_; narrow the
// result to StructuredProxyPushConsumer and keep it in proxy_consumer_;
// finally connect the object reference to that proxy.
// NOTE(review): proxy_consumer_id_ is presumably an out parameter that
// receives the new proxy's id -- confirm against the IDL.
// NOTE(review): nil results are checked only with ACE_ASSERT, which is
// presumably compiled out in non-debug builds -- confirm.
95 TAO_Notify_Lanes_Supplier::connect ()
97 // Activate the supplier object.
98 CosNotifyComm::StructuredPushSupplier_var objref
= this->_this ();
101 CosNotifyChannelAdmin::ProxyConsumer_var proxyconsumer
=
102 this->admin_
->obtain_notification_push_consumer (CosNotifyChannelAdmin::STRUCTURED_EVENT
103 , proxy_consumer_id_
);
105 ACE_ASSERT (!CORBA::is_nil (proxyconsumer
.in ()));
// Narrow the generic proxy to the structured push consumer interface.
108 this->proxy_consumer_
=
109 CosNotifyChannelAdmin::StructuredProxyPushConsumer::_narrow (proxyconsumer
.in ());
111 ACE_ASSERT (!CORBA::is_nil (proxy_consumer_
.in ()));
113 // connect to the proxyconsumer.
114 proxy_consumer_
->connect_structured_push_supplier (objref
.in ());
// Disconnects from the proxy consumer: asserts that proxy_consumer_ is not
// nil, then calls disconnect_structured_push_consumer() on it.
118 TAO_Notify_Lanes_Supplier::disconnect ()
120 ACE_ASSERT (!CORBA::is_nil (this->proxy_consumer_
.in ()));
122 this->proxy_consumer_
->disconnect_structured_push_consumer();
// Deactivates this servant: takes the POA returned by _default_POA(), looks
// up this servant's ObjectId with servant_to_id(), and passes that id to
// deactivate_object().
126 TAO_Notify_Lanes_Supplier::deactivate ()
128 PortableServer::POA_var
poa (this->_default_POA ());
130 PortableServer::ObjectId_var
id (poa
->servant_to_id (this));
132 poa
->deactivate_object (id
.in());
// Subscription-change callback, used here to count connected consumers.
//
// added: event types being added; any call with a non-empty sequence is
//   counted as one more consumer.
// (second parameter, "removed"): unnamed and ignored.
//
// The counter is updated while guarding lock_. consumers_connected_ is
// signalled only when the incremented count is exactly equal to
// expected_consumer_count_.
136 TAO_Notify_Lanes_Supplier::subscription_change (const CosNotification::EventTypeSeq
& added
,
137 const CosNotification::EventTypeSeq
& /*removed */)
139 ACE_GUARD (TAO_SYNCH_MUTEX
, mon
, this->lock_
);
141 // Count the number of consumers that have connected and signal the supplier thread once the expected count is reached.
142 if (added
.length () > 0)
144 if (++this->consumer_count_
== this->expected_consumer_count_
)
145 this->consumers_connected_
.signal ();
// Pushes one structured event to the connected proxy consumer.
//
// event: the structured event to send; it is passed unchanged to
//   push_structured_event().
// Asserts that proxy_consumer_ is not nil before the call.
150 TAO_Notify_Lanes_Supplier::send_event (const CosNotification::StructuredEvent
& event
)
152 ACE_ASSERT (!CORBA::is_nil (this->proxy_consumer_
.in ()));
154 proxy_consumer_
->push_structured_event (event
);
158 TAO_Notify_Lanes_Supplier::disconnect_structured_push_supplier ()