1 #include "Counting_Consumer.h"
2 #include "ace/OS_NS_unistd.h"
// Constructor; receives the consumer's name as a C string.
// NOTE(review): the member-initializer list and body are not present in this
// listing, so how `name` is stored cannot be confirmed from here.
4 CEC_Counting_Consumer::CEC_Counting_Consumer (const char* name
)
// Connects this push consumer to an event channel through `consumer_admin`.
// NOTE(review): the initializer of `consumer` and all brace lines are missing
// from this listing, so the scoping of the statements below is inferred.
12 CEC_Counting_Consumer::connect (CosEventChannelAdmin::ConsumerAdmin_ptr consumer_admin
)
14 // The canonical protocol to connect to the EC
// Object reference for this consumer (right-hand side not shown here).
16 CosEventComm::PushConsumer_var consumer
=
// Checks whether a supplier proxy is currently held ...
19 if (CORBA::is_nil (this->supplier_proxy_
.in ()))
// ... and requests a push-supplier proxy from the admin object.
21 this->supplier_proxy_
=
22 consumer_admin
->obtain_push_supplier ();
// Hands the consumer reference to the proxy.
25 this->supplier_proxy_
->connect_push_consumer (consumer
.in ());
// Disconnects from the supplier proxy, deactivates this servant in its POA,
// and clears the stored proxy reference.
// NOTE(review): brace lines are missing from this listing; confirm which
// statements are conditional on the nil check.
29 CEC_Counting_Consumer::disconnect ()
// Only a non-nil proxy is told to disconnect.
31 if (!CORBA::is_nil (this->supplier_proxy_
.in ()))
33 this->supplier_proxy_
->disconnect_push_supplier ();
// Look up this servant's object id in its default POA and deactivate it.
36 PortableServer::POA_var consumer_poa
=
37 this->_default_POA ();
38 PortableServer::ObjectId_var consumer_id
=
39 consumer_poa
->servant_to_id (this);
40 consumer_poa
->deactivate_object (consumer_id
.in ());
// Reset the stored proxy to a nil reference.
42 this->supplier_proxy_
=
43 CosEventChannelAdmin::ProxyPushSupplier::_nil ();
// Compares the number of events received with `expected_count`, allowing a
// deviation of +/- `tolerance`, and reports the outcome.
// NOTE(review): the logging calls that wrap the two format strings are not
// present in this listing.
47 CEC_Counting_Consumer::dump_results (int expected_count
, int tolerance
)
// Signed difference between received and expected counts.
49 int diff
= this->event_count
- expected_count
;
// True when the difference lies outside [-tolerance, +tolerance].
50 if (diff
> tolerance
|| diff
< -tolerance
)
// Message used for the out-of-tolerance case.
53 "ERROR - %s unexpected number of events <%d>\n",
// Message used for the within-tolerance case.
60 "%s - number of events <%d> within margins\n",
// Push callback; the event payload (CORBA::Any) is unnamed and unused.
// NOTE(review): the statement that updates event_count is not present in this
// listing — presumably it precedes the check below; confirm.
67 CEC_Counting_Consumer::push (const CORBA::Any
&)
// Progress message whenever event_count is a multiple of 10.
71 if (this->event_count
% 10 == 0)
74 "%s (%P|%t): %d events received\n",
// Disconnect callback: counts the disconnect and drops the proxy reference.
82 CEC_Counting_Consumer::disconnect_push_consumer ()
84 this->disconnect_count
++;
// Reset the stored proxy to a nil reference.
85 this->supplier_proxy_
=
86 CosEventChannelAdmin::ProxyPushSupplier::_nil ();
89 // ****************************************************************
// Constructor; receives the consumer's name as a C string.
// NOTE(review): the member-initializer list and body are not present in this
// listing, so how `name` is stored cannot be confirmed from here.
91 CEC_Pull_Counting_Consumer::CEC_Pull_Counting_Consumer (const char* name
)
// Connects this pull consumer to an event channel through `consumer_admin`.
// NOTE(review): the initializer of `consumer` and all brace lines are missing
// from this listing, so the scoping of the statements below is inferred.
99 CEC_Pull_Counting_Consumer::connect (CosEventChannelAdmin::ConsumerAdmin_ptr consumer_admin
)
101 // The canonical protocol to connect to the EC
// Object reference for this consumer (right-hand side not shown here).
103 CosEventComm::PullConsumer_var consumer
=
// Checks whether a supplier proxy is currently held ...
106 if (CORBA::is_nil (this->supplier_proxy_
.in ()))
// ... and requests a pull-supplier proxy from the admin object.
108 this->supplier_proxy_
=
109 consumer_admin
->obtain_pull_supplier ();
// Hands the consumer reference to the proxy.
112 this->supplier_proxy_
->connect_pull_consumer (consumer
.in ());
// Disconnects from the supplier proxy, deactivates this servant in its POA,
// and clears the stored proxy reference.
// NOTE(review): brace lines are missing from this listing; confirm which
// statements are conditional on the nil check.
116 CEC_Pull_Counting_Consumer::disconnect ()
// Only a non-nil proxy is told to disconnect.
118 if (!CORBA::is_nil (this->supplier_proxy_
.in ()))
120 this->supplier_proxy_
->disconnect_pull_supplier ();
// Look up this servant's object id in its default POA and deactivate it.
123 PortableServer::POA_var consumer_poa
=
124 this->_default_POA ();
125 PortableServer::ObjectId_var consumer_id
=
126 consumer_poa
->servant_to_id (this);
127 consumer_poa
->deactivate_object (consumer_id
.in ());
// Reset the stored proxy to a nil reference.
129 this->supplier_proxy_
=
130 CosEventChannelAdmin::ProxyPullSupplier::_nil ();
// Forwards a pull request to the supplier proxy and returns its result.
// NOTE(review): the statements executed when the proxy is nil are not present
// in this listing.
134 CEC_Pull_Counting_Consumer::pull ()
// Nil-proxy check; its handling is not visible here.
136 if (CORBA::is_nil (this->supplier_proxy_
.in ()))
141 return this->supplier_proxy_
->pull ();
// Forwards a try_pull request to the supplier proxy. `has_event` is an out
// parameter that is passed straight through to the proxy call.
// NOTE(review): the nil-proxy handling and the statements between the proxy
// call and the return are not present in this listing.
145 CEC_Pull_Counting_Consumer::try_pull (CORBA::Boolean_out has_event
)
// Nil-proxy check; its handling is not visible here.
147 if (CORBA::is_nil (this->supplier_proxy_
.in ()))
153 CORBA::Any_var event
=
154 this->supplier_proxy_
->try_pull (has_event
);
// Release ownership of the Any from the _var and return it to the caller.
159 return event
._retn ();
// Compares the number of events received with `expected_count`, allowing a
// deviation of +/- `tolerance`, and logs the outcome via ACE_DEBUG.
// NOTE(review): brace lines and the remaining ACE_DEBUG arguments are not
// present in this listing.
163 CEC_Pull_Counting_Consumer::dump_results (int expected_count
, int tolerance
)
// Signed difference between received and expected counts.
165 int diff
= this->event_count
- expected_count
;
// True when the difference lies outside [-tolerance, +tolerance].
166 if (diff
> tolerance
|| diff
< -tolerance
)
// Out-of-tolerance report (logged at LM_DEBUG priority despite the "ERROR" text).
168 ACE_DEBUG ((LM_DEBUG
,
169 "ERROR - %s unexpected number of events <%d>\n",
// Within-tolerance report.
175 ACE_DEBUG ((LM_DEBUG
,
176 "%s - number of events <%d> within margins\n",
// Disconnect callback: counts the disconnect and drops the proxy reference.
183 CEC_Pull_Counting_Consumer::disconnect_pull_consumer ()
185 this->disconnect_count
++;
// Reset the stored proxy to a nil reference.
186 this->supplier_proxy_
=
187 CosEventChannelAdmin::ProxyPullSupplier::_nil ();
190 // ****************************************************************
// Constructor; receives a pointer `s` to a CEC_Pull_Counting_Consumer and
// initializes milliseconds_ from `milliseconds`.
// NOTE(review): the rest of the parameter list and the other member
// initializers are not present in this listing.
192 CEC_Counting_Consumer_Task::
193 CEC_Counting_Consumer_Task (CEC_Pull_Counting_Consumer
*s
,
198 milliseconds_ (milliseconds
)
// NOTE(review): only the exception handler of this function is visible in this
// listing; the guarded statements and the handler body are not shown.
203 CEC_Counting_Consumer_Task::svc ()
// Catches any CORBA::Exception by const reference (the exception is unnamed).
209 catch (const CORBA::Exception
&)
// Sets the stop flag while holding lock_.
217 CEC_Counting_Consumer_Task::stop ()
// Scoped guard on lock_, declared under the name ace_mon.
219 ACE_GUARD (TAO_SYNCH_MUTEX
, ace_mon
, this->lock_
);
220 this->stop_flag_
= 1;
// Returns the current value of pull_count_.
// NOTE(review): lock_ is not taken on any line visible here — confirm whether
// that is intentional.
224 CEC_Counting_Consumer_Task::pull_count ()
226 return this->pull_count_
;
// Calls try_pull on the consumer, optionally pausing for milliseconds_, and
// reads stop_flag_ under lock_.
// NOTE(review): the loop construct, brace lines, the sleep call and the
// declaration of `stop` are not present in this listing — presumably this is
// a poll-until-stopped loop; confirm against the full source.
230 CEC_Counting_Consumer_Task::run ()
// Inserts a CORBA::Long of 0 into `event` (its declaration is not shown).
233 event
<<= CORBA::Long(0);
// Non-blocking pull; has_event is filled in by the call.
237 CORBA::Boolean has_event
;
238 CORBA::Any_var event
=
239 this->consumer_
->try_pull (has_event
);
// A pause is only configured when milliseconds_ is non-zero.
241 if (this->milliseconds_
!= 0)
// 0 seconds plus milliseconds_ * 1000, i.e. the interval in microseconds.
243 ACE_Time_Value
tv (0, 1000 * this->milliseconds_
);
248 // Sleep for a short time to avoid spinning... kind of kludgy
// Read the stop flag under the same lock that stop() takes when writing it.
254 ACE_GUARD (TAO_SYNCH_MUTEX
, ace_mon
, this->lock_
);
258 stop
= this->stop_flag_
;