5 #include "tao/ORB_Core.h"
7 TAO_Notify_ThreadPool_Supplier::TAO_Notify_ThreadPool_Supplier (TAO_Notify_ORB_Objects
& orb_objects
)
8 : orb_objects_ (orb_objects
)
9 , proxy_consumer_id_ (0)
10 , expected_consumer_count_ (2)
11 , consumers_connected_ (lock_
)
14 , proxy_consumer_thread_count_ (0)
18 TAO_Notify_ThreadPool_Supplier::~TAO_Notify_ThreadPool_Supplier ()
23 TAO_Notify_ThreadPool_Supplier::init (CosNotifyChannelAdmin::SupplierAdmin_var
& admin
, int expected_consumer_count
,int max_events
,
24 int proxy_consumer_thread_count
)
26 // First initialize the class members.
28 this->expected_consumer_count_
= expected_consumer_count
;
29 this->max_events_
= max_events
;
30 this->proxy_consumer_thread_count_
= proxy_consumer_thread_count
;
36 TAO_Notify_ThreadPool_Supplier::run (void)
39 ACE_GUARD (TAO_SYNCH_MUTEX
, mon
, this->lock_
);
41 ACE_DEBUG ((LM_DEBUG
, "(%P, %t) Waiting for %d consumers to connect...\n", this->expected_consumer_count_
-1));
43 // Wait till the consumers are ready to go.
44 while (this->consumer_count_
!= this->expected_consumer_count_
)
45 this->consumers_connected_
.wait ();
49 "(%P, %t) Supplier is sending an events...\n"));
51 // Send events to each consumer.
52 for (int i
= 0; i
< this->max_events_
; ++i
)
54 for (int j
= 0; j
< this->expected_consumer_count_
; ++j
)
57 this->send_event (this->event_
[j
]);
61 // Disconnect from the EC
64 // Deactivate this object.
67 // we're done. shutdown the ORB to exit the process.
68 this->orb_objects_
.orb_
->shutdown (1);
72 TAO_Notify_ThreadPool_Supplier::connect (void)
74 // Activate the supplier object.
75 CosNotifyComm::StructuredPushSupplier_var objref
= this->_this ();
77 CosNotifyChannelAdmin::ProxyConsumer_var proxyconsumer
;
79 if (this->proxy_consumer_thread_count_
!= 0)
81 // Narrow to the extended interface.
82 NotifyExt::SupplierAdmin_var admin_ext
= NotifyExt::SupplierAdmin::_narrow (this->admin_
.in ());
84 NotifyExt::ThreadPoolParams tp_params
= { NotifyExt::CLIENT_PROPAGATED
, 0,
85 0, static_cast<CORBA::ULong
> (this->proxy_consumer_thread_count_
),
88 CosNotification::QoSProperties
qos (1);
90 qos
[0].name
= CORBA::string_dup (NotifyExt::ThreadPool
);
91 qos
[0].value
<<= tp_params
;
93 // Obtain the proxy. The QoS is applied to the POA in which the Proxy is hosted.
94 proxyconsumer
= admin_ext
->obtain_notification_push_consumer_with_qos (CosNotifyChannelAdmin::STRUCTURED_EVENT
95 , proxy_consumer_id_
, qos
);
100 proxyconsumer
= this->admin_
->obtain_notification_push_consumer (CosNotifyChannelAdmin::STRUCTURED_EVENT
101 , proxy_consumer_id_
);
104 ACE_ASSERT (!CORBA::is_nil (proxyconsumer
.in ()));
107 this->proxy_consumer_
=
108 CosNotifyChannelAdmin::StructuredProxyPushConsumer::_narrow (proxyconsumer
.in ());
110 ACE_ASSERT (!CORBA::is_nil (proxy_consumer_
.in ()));
112 // connect to the proxyconsumer.
113 proxy_consumer_
->connect_structured_push_supplier (objref
.in ());
115 ACE_DEBUG ((LM_DEBUG
, "(%P,%t) Created Supplier %d with %d threads at the ProxyConsumer\n", proxy_consumer_id_
,
116 this->proxy_consumer_thread_count_
));
120 TAO_Notify_ThreadPool_Supplier::disconnect (void)
122 ACE_ASSERT (!CORBA::is_nil (this->proxy_consumer_
.in ()));
124 this->proxy_consumer_
->disconnect_structured_push_consumer();
128 TAO_Notify_ThreadPool_Supplier::deactivate (void)
130 PortableServer::POA_var
poa (this->_default_POA ());
132 PortableServer::ObjectId_var
id (poa
->servant_to_id (this));
134 poa
->deactivate_object (id
.in());
138 TAO_Notify_ThreadPool_Supplier::subscription_change (const CosNotification::EventTypeSeq
& added
,
139 const CosNotification::EventTypeSeq
& /*removed */)
141 ACE_GUARD (TAO_SYNCH_MUTEX
, mon
, this->lock_
);
143 // Count the number of consumers connect and signal the supplier thread when the expected count have connected.
144 // Only 1 consumer connects at a time.
145 if (added
.length () > 0)
147 // Set the domain and type nams in the event's fixed header.
148 this->event_
[consumer_count_
].header
.fixed_header
.event_type
.domain_name
= CORBA::string_dup(added
[0].domain_name
);
149 this->event_
[consumer_count_
].header
.fixed_header
.event_type
.type_name
= CORBA::string_dup(added
[0].type_name
);
151 ++this->consumer_count_
;
153 ACE_DEBUG ((LM_DEBUG
, "(%P,%t) Received Type %d: (%s)\n", this->consumer_count_
, added
[0].type_name
.in ()));
155 if (this->consumer_count_
== this->expected_consumer_count_
)
156 this->consumers_connected_
.signal ();
161 TAO_Notify_ThreadPool_Supplier::send_event (const CosNotification::StructuredEvent
& event
)
163 ACE_ASSERT (!CORBA::is_nil (this->proxy_consumer_
.in ()));
165 proxy_consumer_
->push_structured_event (event
);
169 TAO_Notify_ThreadPool_Supplier::disconnect_structured_push_supplier (void)