1 #include "ECT_Consumer.h"
3 #include "orbsvcs/Event_Utilities.h"
4 #include "orbsvcs/Event_Service_Constants.h"
5 #include "orbsvcs/Time_Utilities.h"
7 #include "tao/Timeprobe.h"
10 #include "ace/Get_Opt.h"
11 #include "ace/Auto_Ptr.h"
12 #include "ace/Sched_Params.h"
13 #include "ace/OS_NS_unistd.h"
// Constructor.  It receives an ECT_Driver pointer and, per the visible
// member-initializer fragments, stores the n_suppliers and stall_length
// arguments in n_suppliers_ and stall_length_.
// NOTE(review): the rest of the parameter list and initializer list is not
// visible in this view -- confirm the remaining members against the header.
15 Test_Consumer::Test_Consumer (ECT_Driver
*driver
,
21 n_suppliers_ (n_suppliers
),
24 stall_length_(stall_length
)
// Registers this consumer with the scheduler, builds its event subscription,
// and connects it to a push-supplier proxy as a push consumer.
//
// scheduler - used to create and configure the rt_info handle for this
//             consumer.
// ec        - the event channel.  NOTE(review): presumably the source of
//             consumer_admin, but that initializer is not visible here.
//
// NOTE(review): `name`, `type_start`, `type_count` and `time` are used below
// but their declarations are not visible in this view; the first three are
// presumably parameters -- confirm against the header.
29 Test_Consumer::connect (RtecScheduler::Scheduler_ptr scheduler
,
33 RtecEventChannelAdmin::EventChannel_ptr ec
)
// Ask the scheduler to create an entry under `name`; the returned handle is
// attached to every subscription inserted below.
35 RtecScheduler::handle_t rt_info
=
36 scheduler
->create (name
);
38 // The worst case execution time is far less than 2
39 // milliseconds, but that is a safe estimate....
40 ACE_Time_Value
tv (0, 2000);
// Convert the ACE_Time_Value estimate into `time` for the scheduler call.
42 ORBSVCS_Time::Time_Value_to_TimeT (time
, tv
);
// Configure the scheduler entry.  NOTE(review): only some of the arguments
// to set() are visible in this view.
43 scheduler
->set (rt_info
,
44 RtecScheduler::VERY_HIGH_CRITICALITY
,
47 RtecScheduler::VERY_LOW_IMPORTANCE
,
50 RtecScheduler::OPERATION
);
// Build the subscription: one disjunction group holding the shutdown event
// plus type_count consecutive event types starting at type_start.
52 ACE_ConsumerQOS_Factory qos
;
53 qos
.start_disjunction_group ();
54 qos
.insert_type (ACE_ES_EVENT_SHUTDOWN
, rt_info
);
55 for (int i
= 0; i
!= type_count
; ++i
)
57 qos
.insert_type (type_start
+ i
, rt_info
);
60 // = Connect as a consumer.
61 RtecEventChannelAdmin::ConsumerAdmin_var consumer_admin
=
64 this->supplier_proxy_
=
65 consumer_admin
->obtain_push_supplier ();
// Get this servant's object reference and hand it to the proxy together
// with the QoS assembled above.
67 RtecEventComm::PushConsumer_var objref
= this->_this ();
69 this->supplier_proxy_
->connect_push_consumer (objref
.in (),
70 qos
.get_ConsumerQOS ());
// Disconnects from the supplier proxy, resets supplier_proxy_ to nil, and
// deactivates this servant in its default POA.
74 Test_Consumer::disconnect (void)
// NOTE(review): the action taken when the proxy is already nil is not
// visible in this view (presumably an early return) -- confirm.
76 if (CORBA::is_nil (this->supplier_proxy_
.in ()))
79 this->supplier_proxy_
->disconnect_push_supplier ();
// Drop the proxy reference so a later call sees a nil proxy.
81 this->supplier_proxy_
=
82 RtecEventChannelAdmin::ProxyPushSupplier::_nil ();
84 // Deactivate the servant
85 PortableServer::POA_var poa
=
86 this->_default_POA ();
// Look up this servant's object id in the POA, then deactivate that id.
87 PortableServer::ObjectId_var id
=
88 poa
->servant_to_id (this);
89 poa
->deactivate_object (id
.in ());
// Forwards to throughput_.dump_results, passing `name` and `gsf` through
// unchanged.
//
// name - passed as the first argument to throughput_.dump_results.
// gsf  - scale factor passed as the second argument.
93 Test_Consumer::dump_results (const ACE_TCHAR
* name
,
94 ACE_Basic_Stats::scale_factor_type gsf
)
96 this->throughput_
.dump_results (name
, gsf
);
// Folds this consumer's throughput_ into the caller-supplied `stats` by
// calling stats.accumulate.  Declared const.
//
// stats - accumulator that receives this consumer's throughput_.
100 Test_Consumer::accumulate (ACE_Throughput_Stats
& stats
) const
102 stats
.accumulate (this->throughput_
);
// Handles a batch of events: updates recv_count_, counts shutdown events and
// notifies the driver once shutdown_count_ reaches n_suppliers_, and samples
// throughput from an event's creation time.
//
// events - the batch being delivered.
//
// NOTE(review): braces and some statements are not visible in this view, so
// the exact branch structure (e.g. whether the empty-batch case returns
// early, and which branch the throughput sampling belongs to) must be
// confirmed against the full file.
106 Test_Consumer::push (const RtecEventComm::EventSet
& events
)
// An empty batch is logged.
108 if (events
.length () == 0)
110 ACE_DEBUG ((LM_DEBUG
,
111 "ECT_Consumer (%P|%t) no events\n"));
// Acquire lock_ through the guard object ace_mon before touching the
// counters.  NOTE(review): the stall below appears to run with lock_ held --
// confirm that is intended.
115 ACE_GUARD (TAO_SYNCH_MUTEX
, ace_mon
, this->lock_
);
117 // We start the timer as soon as we receive the first event...
118 if (this->recv_count_
== 0)
120 this->first_event_
= ACE_OS::gethrtime ();
// Stall this consumer by passing stall_length_ to ACE_OS::sleep, logging
// before and after.
121 ACE_DEBUG ((LM_DEBUG
,
122 "ECT_Consumer (%P|%t) stalling for %d seconds\n", this->stall_length_
));
123 ACE_OS::sleep(this->stall_length_
);
124 ACE_DEBUG ((LM_DEBUG
, "ECT_Consumer (%P|%t) finished stalling\n"));
// Count every event in the batch.
128 this->recv_count_
+= events
.length ();
// When TAO_debug_level is positive, report progress whenever recv_count_ is
// a multiple of 100.
130 if (TAO_debug_level
> 0
131 && this->recv_count_
% 100 == 0)
133 ACE_DEBUG ((LM_DEBUG
,
134 "ECT_Consumer (%P|%t): %d events received\n",
138 // ACE_DEBUG ((LM_DEBUG, "%d event(s)\n", events.length ()));
// Examine each event in the batch.
140 for (u_int i
= 0; i
< events
.length (); ++i
)
142 const RtecEventComm::Event
& e
= events
[i
];
// Shutdown events are counted; once shutdown_count_ reaches n_suppliers_ the
// driver is told this consumer (identified by cookie_) is shutting down.
144 if (e
.header
.type
== ACE_ES_EVENT_SHUTDOWN
)
146 this->shutdown_count_
++;
147 if (this->shutdown_count_
>= this->n_suppliers_
)
149 // We stop the timer as soon as we realize it is time to
151 this->driver_
->shutdown_consumer (this->cookie_
);
// Convert the event's creation_time header field to an ACE_hrtime_t and
// record a throughput sample relative to first_event_.
// NOTE(review): the remaining arguments to sample() are not visible here.
156 ACE_hrtime_t creation
;
157 ORBSVCS_Time::TimeT_to_hrtime (creation
,
158 e
.header
.creation_time
);
160 const ACE_hrtime_t now
= ACE_OS::gethrtime ();
161 this->throughput_
.sample (now
- this->first_event_
,
168 Test_Consumer::disconnect_push_consumer (void)