1 #include "ECM_Consumer.h"
4 #include "orbsvcs/Event_Utilities.h"
5 #include "orbsvcs/Event_Service_Constants.h"
6 #include "orbsvcs/Time_Utilities.h"
7 #include "orbsvcs/CosNamingC.h"
9 #include "tao/Timeprobe.h"
10 #include "tao/ORB_Core.h"
13 #include "ace/Get_Opt.h"
15 #include "ace/Sched_Params.h"
16 #include "ace/OS_NS_errno.h"
17 #include "ace/OS_NS_unistd.h"
20 ACE_TMAIN(int argc
, ACE_TCHAR
*argv
[])
23 return driver
.run (argc
, argv
);
26 // ****************************************************************
31 event_a_ (ACE_ES_EVENT_UNDEFINED
),
32 event_b_ (ACE_ES_EVENT_UNDEFINED
+ 1),
38 // ****************************************************************
41 Driver::run (int argc
, ACE_TCHAR
* argv
[])
46 CORBA::ORB_init (argc
, argv
);
48 CORBA::Object_var poa_object
=
49 orb
->resolve_initial_references("RootPOA");
51 if (CORBA::is_nil (poa_object
.in ()))
52 ACE_ERROR_RETURN ((LM_ERROR
,
53 " (%P|%t) Unable to initialize the POA.\n"),
56 PortableServer::POA_var root_poa
=
57 PortableServer::POA::_narrow (poa_object
.in ());
59 PortableServer::POAManager_var poa_manager
=
60 root_poa
->the_POAManager ();
62 if (this->parse_args (argc
, argv
))
66 "Execution parameters:\n"
68 " event count = <%d>\n"
69 " supplier Event A = <%d>\n"
70 " supplier Event B = <%d>\n"
71 " pid file name = <%s>\n",
78 this->pid_file_name_
?this->pid_file_name_
:ACE_TEXT("nil")));
80 if (this->pid_file_name_
!= 0)
82 FILE* pid
= ACE_OS::fopen (this->pid_file_name_
, "w");
85 ACE_OS::fprintf (pid
, "%ld\n",
86 static_cast<long> (ACE_OS::getpid ()));
92 ACE_Sched_Params::priority_min (ACE_SCHED_FIFO
);
93 // Enable FIFO scheduling
95 if (ACE_OS::sched_params (ACE_Sched_Params (ACE_SCHED_FIFO
,
97 ACE_SCOPE_PROCESS
)) != 0)
99 if (ACE_OS::last_error () == EPERM
)
100 ACE_DEBUG ((LM_DEBUG
,
101 "%s: user is not superuser, "
102 "so remain in time-sharing class\n", argv
[0]));
104 ACE_ERROR ((LM_ERROR
,
105 "%s: ACE_OS::sched_params failed\n", argv
[0]));
108 if (ACE_OS::thr_setprio (min_priority
) == -1)
110 ACE_ERROR ((LM_ERROR
, "(%P|%t) main thr_setprio failed,"
111 "no real-time features\n"));
114 CORBA::Object_var naming_obj
=
115 orb
->resolve_initial_references ("NameService");
117 if (CORBA::is_nil (naming_obj
.in ()))
118 ACE_ERROR_RETURN ((LM_ERROR
,
119 " (%P|%t) Unable to get the Naming Service.\n"),
122 CosNaming::NamingContext_var naming_context
=
123 CosNaming::NamingContext::_narrow (naming_obj
.in ());
125 CosNaming::Name
name (1);
127 name
[0].id
= CORBA::string_dup ("EventService");
129 CORBA::Object_var ec_obj
=
130 naming_context
->resolve (name
);
132 RtecEventChannelAdmin::EventChannel_var channel
;
133 if (CORBA::is_nil (ec_obj
.in ()))
134 channel
= RtecEventChannelAdmin::EventChannel::_nil ();
136 channel
= RtecEventChannelAdmin::EventChannel::_narrow (ec_obj
.in ());
138 poa_manager
->activate ();
140 this->connect_consumers (channel
.in ());
142 ACE_DEBUG ((LM_DEBUG
, "connected consumer(s)\n"));
144 ACE_DEBUG ((LM_DEBUG
, "running the test\n"));
147 ACE_DEBUG ((LM_DEBUG
, "event loop finished\n"));
149 this->disconnect_consumers ();
153 catch (const CORBA::SystemException
& sys_ex
)
155 sys_ex
._tao_print_exception ("SYS_EX in Consumer");
157 catch (const CORBA::Exception
& ex
)
159 ex
._tao_print_exception ("NON SYS EX in Consumer");
165 Driver::push_consumer (void* /* consumer_cookie */,
166 ACE_hrtime_t
/* arrival */,
167 const RtecEventComm::EventSet
& events
)
170 // (reinterpret_cast<Test_Consumer**> (consumer_cookie)
171 // - this->consumers_);
173 // ACE_DEBUG ((LM_DEBUG, "(%t) events received by consumer %d\n", ID));
175 if (events
.length () == 0)
177 // ACE_DEBUG ((LM_DEBUG, "no events\n"));
181 ACE_GUARD (TAO_SYNCH_MUTEX
, ace_mon
, this->recv_count_mutex_
);
183 this->recv_count_
+= events
.length ();
185 int x
= this->event_count_
/ 10;
186 if (this->recv_count_
% x
== 0)
188 ACE_DEBUG ((LM_DEBUG
,
189 "ECM_Consumer (%P|%t): %d events received\n",
193 if (this->recv_count_
>= this->event_count_
)
195 TAO_ORB_Core_instance ()->orb ()->shutdown ();
198 // ACE_DEBUG ((LM_DEBUG, "%d event(s)\n", events.length ()));
200 #if (TAO_NO_COPY_OCTET_SEQUENCES == 1)
201 for (u_int i
= 0; i
< events
.length (); ++i
)
203 const RtecEventComm::Event
& e
= events
[i
];
205 if (e
.data
.payload
.mb () == 0)
207 ACE_DEBUG ((LM_DEBUG
, "No data in event[%d]\n", i
));
211 // @@ TODO this is a little messy, infortunately we have to
212 // extract the first byte to determine the byte order, the CDR
213 // cannot do it for us because in certain cases the byte order
214 // is not in the encapsulation. Maybe we need another
215 // constructor for the InputCDR streams (but there are too many
218 // Note that there is no copying
219 int byte_order
= e
.data
.payload
[0];
221 ACE_Message_Block
* mb
=
222 ACE_Message_Block::duplicate (e
.data
.payload
.mb ());
223 mb
->rd_ptr (1); // skip the byte order
225 TAO_InputCDR
cdr (mb
, byte_order
);
227 ECM_IDLData::Info info
;
233 if (!cdr
.good_bit ())
234 ACE_ERROR ((LM_ERROR
, "Problem demarshalling C++ data\n"));
236 ACE_Message_Block::release (mb
);
238 CORBA::ULong n
= info
.trajectory
.length ();
239 // ACE_DEBUG ((LM_DEBUG, "Payload contains <%d> elements\n", n));
240 // ACE_DEBUG ((LM_DEBUG, "Inventory <%s> contains <%d> elements\n",
241 // other.description.in (),
242 // other.inventory.current_size ()));
244 for (CORBA::ULong j
= 0; j
< n
; ++j
)
246 ECM_IDLData::Point
& p
= info
.trajectory
[j
];
247 if (static_cast<CORBA::ULong
>(p
.x
) != j
||
248 static_cast<CORBA::ULong
>(p
.y
) != j
*j
)
250 ACE_DEBUG ((LM_DEBUG
,
251 "invalid data in trajectory[%d] = (%f,%f)\n",
256 #endif /* TAO_NO_COPY_OCTET_SEQUENCES == 1 */
260 Driver::connect_consumers (RtecEventChannelAdmin::EventChannel_ptr channel
)
262 for (int i
= 0; i
< this->n_consumers_
; ++i
)
265 ACE_OS::sprintf (buf
, "consumer_%02d", i
);
267 ACE_NEW (this->consumers_
[i
],
268 Test_Consumer (this, this->consumers_
+ i
));
270 this->consumers_
[i
]->connect (this->event_a_
,
277 Driver::disconnect_consumers ()
279 for (int i
= 0; i
< this->n_consumers_
; ++i
)
281 this->consumers_
[i
]->disconnect ();
286 Driver::parse_args (int argc
, ACE_TCHAR
*argv
[])
288 ACE_Get_Opt
get_opt (argc
, argv
, ACE_TEXT("dc:n:h:p:"));
291 while ((opt
= get_opt ()) != EOF
)
296 this->n_consumers_
= ACE_OS::atoi (get_opt
.opt_arg ());
300 this->event_count_
= ACE_OS::atoi (get_opt
.opt_arg ());
306 char* arg
= ACE_OS::strtok_r (ACE_TEXT_ALWAYS_CHAR(get_opt
.opt_arg ()), ",", &aux
);
308 this->event_a_
= ACE_ES_EVENT_UNDEFINED
+ ACE_OS::atoi (arg
);
309 arg
= ACE_OS::strtok_r (0, ",", &aux
);
310 this->event_b_
= ACE_ES_EVENT_UNDEFINED
+ ACE_OS::atoi (arg
);
315 this->pid_file_name_
= get_opt
.opt_arg ();
320 ACE_DEBUG ((LM_DEBUG
,
324 "-a (send data in events) "
326 "-p <pid file name> "
333 if (this->event_count_
<= 0)
335 ACE_DEBUG ((LM_DEBUG
,
336 "%s: event count (%d) is out of range, "
337 "reset to default (%d)\n",
338 argv
[0], this->event_count_
,
340 this->event_count_
= 100;
343 if (this->n_consumers_
<= 0)
345 ACE_ERROR_RETURN ((LM_ERROR
,
346 "%s: number of consumers or "
347 "suppliers out of range\n", argv
[0]), -1);
353 // ****************************************************************
355 Test_Consumer::Test_Consumer (Driver
*driver
, void *cookie
)
362 Test_Consumer::connect (int event_a
,
364 RtecEventChannelAdmin::EventChannel_ptr ec
)
366 ACE_ConsumerQOS_Factory qos
;
367 qos
.start_disjunction_group ();
368 qos
.insert_type (ACE_ES_EVENT_SHUTDOWN
, 0);
369 qos
.insert_type (event_a
, 0);
370 qos
.insert_type (event_b
, 0);
372 // = Connect as a consumer.
373 RtecEventChannelAdmin::ConsumerAdmin_var consumer_admin
=
374 ec
->for_consumers ();
376 this->supplier_proxy_
=
377 consumer_admin
->obtain_push_supplier ();
379 RtecEventComm::PushConsumer_var objref
= this->_this ();
381 this->supplier_proxy_
->connect_push_consumer (objref
.in (),
382 qos
.get_ConsumerQOS ());
386 Test_Consumer::disconnect ()
388 if (CORBA::is_nil (this->supplier_proxy_
.in ()))
391 RtecEventChannelAdmin::ProxyPushSupplier_var proxy
=
392 this->supplier_proxy_
._retn ();
394 proxy
->disconnect_push_supplier ();
398 Test_Consumer::push (const RtecEventComm::EventSet
& events
)
400 ACE_hrtime_t arrival
= ACE_OS::gethrtime ();
401 this->driver_
->push_consumer (this->cookie_
, arrival
, events
);
405 Test_Consumer::disconnect_push_consumer ()