1 #include "ace/Get_Opt.h"
2 #include "ace/Auto_Ptr.h"
3 #include "ace/Sched_Params.h"
5 #include "ace/OS_NS_unistd.h"
7 #include "tao/Timeprobe.h"
9 #include "orbsvcs/Event_Utilities.h"
10 #include "orbsvcs/Event_Service_Constants.h"
11 #include "orbsvcs/Time_Utilities.h"
12 #include "orbsvcs/CosNamingC.h"
13 #include "ECM_Supplier.h"
15 #include "ace/OS_NS_errno.h"
// Constructor. The two initializers visible here set event_a_ to
// ACE_ES_EVENT_UNDEFINED and event_b_ to ACE_ES_EVENT_UNDEFINED + 1.
// NOTE(review): the listing's own numbering jumps from 19 to 24 and stops at
// 25, so the remaining initializers and the constructor body are not shown.
19 ECMS_Driver::ECMS_Driver (void)
24 event_a_ (ACE_ES_EVENT_UNDEFINED
),
25 event_b_ (ACE_ES_EVENT_UNDEFINED
+ 1),
30 // ****************************************************************
// Runs the supplier driver: initializes the ORB and the RootPOA, parses the
// driver options, optionally writes a pid file, tries to switch to FIFO
// scheduling, looks up "EventService" through the Naming Service, then
// connects, activates, waits for and disconnects the suppliers. CORBA
// exceptions are caught at the end and printed.
// NOTE(review): the listing's own numbering has gaps throughout this function,
// so the opening of the try block, the `orb` declaration, the return values
// and several macro arguments are not visible here.
33 ECMS_Driver::run (int argc
, ACE_TCHAR
* argv
[])
// Initialize the ORB from the command-line arguments.
38 CORBA::ORB_init (argc
, argv
);
// Resolve the RootPOA; a nil reference is reported as an error.
40 CORBA::Object_var poa_object
=
41 orb
->resolve_initial_references("RootPOA");
42 if (CORBA::is_nil (poa_object
.in ()))
43 ACE_ERROR_RETURN ((LM_ERROR
,
44 " (%P|%t) Unable to initialize the POA.\n"),
47 PortableServer::POA_var root_poa
=
48 PortableServer::POA::_narrow (poa_object
.in ());
// The POA manager obtained here is activated further down, right before the
// suppliers are connected.
50 PortableServer::POAManager_var poa_manager
=
51 root_poa
->the_POAManager ();
// Parse the driver's own options.
53 if (this->parse_args (argc
, argv
))
// Format string listing the execution parameters (the logging call that
// consumes it starts on a line that is not shown).
57 "Execution parameters:\n"
59 " event count = <%d>\n"
60 " event period = <%d>\n"
61 " event size = <%d>\n"
62 " supplier Event A = <%d>\n"
63 " supplier Event B = <%d>\n"
64 " pid file name = <%s>\n",
// A null pid file name is printed as "nil".
73 this->pid_file_name_
?this->pid_file_name_
:ACE_TEXT("nil")) );
// When a pid file name was given, open it for writing and store this
// process id in it.
// NOTE(review): the listing skips lines between the fopen and the fprintf;
// presumably the FILE* is checked there - confirm against the full file.
75 if (this->pid_file_name_
!= 0)
77 FILE* pid
= ACE_OS::fopen (this->pid_file_name_
, "w");
80 ACE_OS::fprintf (pid
, "%ld\n",
81 static_cast<long> (ACE_OS::getpid ()));
// Minimum priority of the FIFO scheduling class.
87 ACE_Sched_Params::priority_min (ACE_SCHED_FIFO
);
88 // Enable FIFO scheduling, e.g., RT scheduling class on Solaris.
90 if (ACE_OS::sched_params (ACE_Sched_Params (ACE_SCHED_FIFO
,
92 ACE_SCOPE_PROCESS
)) != 0)
// EPERM gets its own message: the process stays in the time-sharing class.
94 if (ACE_OS::last_error () == EPERM
)
96 "%s: user is not superuser, "
97 "so remain in time-sharing class\n", argv
[0]));
100 "%s: ACE_OS::sched_params failed\n", argv
[0]));
// A failure to set the main thread priority is only logged.
103 if (ACE_OS::thr_setprio (min_priority
) == -1)
105 ACE_ERROR ((LM_ERROR
, "(%P|%t) main thr_setprio failed,"
106 "no real-time features\n"));
// Locate the Naming Service; a nil reference is reported as an error.
109 CORBA::Object_var naming_obj
=
110 orb
->resolve_initial_references ("NameService");
111 if (CORBA::is_nil (naming_obj
.in ()))
112 ACE_ERROR_RETURN ((LM_ERROR
,
113 " (%P|%t) Unable to get the Naming Service.\n"),
116 CosNaming::NamingContext_var naming_context
=
117 CosNaming::NamingContext::_narrow (naming_obj
.in ());
// Look up the event channel under the single-component name "EventService".
119 CosNaming::Name
name (1);
121 name
[0].id
= CORBA::string_dup ("EventService");
123 CORBA::Object_var ec_obj
=
124 naming_context
->resolve (name
);
// Use a nil channel when the lookup returned nil, otherwise narrow the
// resolved object to the event channel interface.
126 RtecEventChannelAdmin::EventChannel_var channel
;
127 if (CORBA::is_nil (ec_obj
.in ()))
128 channel
= RtecEventChannelAdmin::EventChannel::_nil ();
130 channel
= RtecEventChannelAdmin::EventChannel::_narrow (ec_obj
.in ());
132 poa_manager
->activate ();
134 this->connect_suppliers (channel
.in ());
136 ACE_DEBUG ((LM_DEBUG
, "connected supplier(s)\n"));
138 this->activate_suppliers ();
140 ACE_DEBUG ((LM_DEBUG
, "suppliers are active\n"));
142 // Wait for the supplier threads...
143 if (ACE_Thread_Manager::instance ()->wait () == -1)
145 ACE_ERROR ((LM_ERROR
, "Thread_Manager wait failed\n"));
149 ACE_DEBUG ((LM_DEBUG
, "suppliers finished\n"));
151 this->disconnect_suppliers ();
153 // @@ Deactivate the suppliers (as CORBA Objects?)
// System exceptions are handled first, then any other CORBA exception; both
// handlers print the exception with a tag naming the supplier.
155 catch (const CORBA::SystemException
& sys_ex
)
157 sys_ex
._tao_print_exception ("SYS_EX in Supplier");
159 catch (const CORBA::Exception
& ex
)
161 ex
._tao_print_exception ("NON SYS EX in Supplier");
// Body of one supplier: builds the test payload once, marshals it into a CDR
// stream, then pushes event_count_ events that all carry that payload. The
// last event has type ACE_ES_EVENT_SHUTDOWN; the others get event_a_ or
// event_b_. CORBA exceptions are caught and printed.
// NOTE(review): the listing's own numbering has gaps in this function. The
// second parameter, the declarations of `cdr` and `other`, the conditions
// guarding the throw and the event_a_/event_b_ choice, and the #else of the
// preprocessor block are not visible here.
167 ECMS_Driver::supplier_task (Test_Supplier
*supplier
,
// Time value built from event_period_ (passed as the second, microseconds,
// argument); the statement that uses it is not shown.
172 ACE_Time_Value
tv (0, this->event_period_
);
// n is the number of trajectory points / inventory entries to generate.
174 CORBA::ULong n
= this->event_size_
;
176 ECM_IDLData::Info info
;
177 info
.mobile_name
= CORBA::string_dup ("test");
178 info
.mobile_speed
= 1;
179 info
.trajectory
.length (n
);
182 other
.description
= CORBA::string_dup ("some data");
// Fill the payload: point j is (j, j*j) and the inventory binds j to j + 1.
184 for (CORBA::ULong j
= 0; j
< n
; ++j
)
186 info
.trajectory
[j
].x
= j
;
187 info
.trajectory
[j
].y
= j
* j
;
188 other
.inventory
.bind (j
, j
+ 1);
191 ACE_DEBUG ((LM_DEBUG
,
192 "The inventory contains (%d) elements\n",
193 other
.inventory
.current_size ()));
195 // We have to make it big enough so we get a contiguous block,
196 // otherwise the octet sequence will not work correctly.
197 // NOTE: we could pre-allocate enough memory in the CDR stream
198 // but we want to show that chaining works!
// The stream starts with the byte-order flag.
201 CORBA::Boolean byte_order
= TAO_ENCAP_BYTE_ORDER
;
202 cdr
<< CORBA::Any::from_boolean (byte_order
);
204 // The typecode name standard, the encode method is not (in
205 // general the CDR interface is not specified).
// NOTE(review): the condition guarding this throw is on a line not shown.
207 throw CORBA::MARSHAL ();
209 // Here we marshall a non-IDL type.
212 if (!cdr
.good_bit ())
213 ACE_ERROR ((LM_ERROR
, "Problem marshalling C++ data\n"));
// Message block and total length of the marshaled data; both are reused for
// every event pushed below.
215 const ACE_Message_Block
* mb
= cdr
.begin ();
216 // NOTE: total_length () return the length of the complete
218 CORBA::ULong mblen
= cdr
.total_length ();
// One single-element event set is built and pushed per iteration.
220 for (CORBA::Long i
= 0; i
< this->event_count_
; ++i
)
222 RtecEventComm::EventSet
event (1);
224 event
[0].header
.source
= supplier
->supplier_id ();
225 event
[0].header
.ttl
= 1;
// Stamp the event with the current high-resolution time.
227 ACE_hrtime_t t
= ACE_OS::gethrtime ();
228 ORBSVCS_Time::hrtime_to_TimeT (event
[0].header
.creation_time
, t
);
// The final iteration sends the shutdown event type.
230 if (i
== static_cast<CORBA::Long
> (this->event_count_
) - 1)
231 event
[0].header
.type
= ACE_ES_EVENT_SHUTDOWN
;
// NOTE(review): the condition choosing between event_a_ and event_b_ is on
// lines not shown.
233 event
[0].header
.type
= this->event_a_
;
235 event
[0].header
.type
= this->event_b_
;
237 // We use replace to minimize the copies, this should result
238 // in just one memory allocation;
239 #if (TAO_NO_COPY_OCTET_SEQUENCES == 1)
240 event
[0].data
.payload
.replace (mblen
, mb
);
242 // If the replace method is not available, we will need
243 // to do the copy manually. First, set the octet sequence length.
244 event
[0].data
.payload
.length (mblen
);
246 // Now copy over each byte.
247 char* base
= mb
->data_block ()->base ();
// NOTE(review): this inner `i` shadows the outer event-loop index `i`.
248 for(CORBA::ULong i
= 0; i
< mblen
; i
++)
250 event
[0].data
.payload
[i
] = base
[i
];
252 #endif /* TAO_NO_COPY_OCTET_SEQUENCES == 1 */
// Deliver the event through the supplier's consumer proxy.
254 supplier
->consumer_proxy ()->push(event
);
256 // ACE_DEBUG ((LM_DEBUG, "(%t) supplier push event\n"));
// System exceptions are handled first, then any other CORBA exception; both
// handlers print the exception.
261 catch (const CORBA::SystemException
& sys_ex
)
263 sys_ex
._tao_print_exception ("SYS_EX");
265 catch (const CORBA::Exception
& ex
)
267 ex
._tao_print_exception ("NON SYS EX");
// Creates n_suppliers_ Test_Supplier objects and connects each one under the
// generated name "supplier_NN" (two-digit index).
// NOTE(review): the declaration of `buf` and the remaining arguments of the
// connect() call are on lines not shown in this listing.
273 ECMS_Driver::connect_suppliers (RtecEventChannelAdmin::EventChannel_ptr channel
)
275 for (int i
= 0; i
< this->n_suppliers_
; ++i
)
// Build the per-supplier name.
278 ACE_OS::sprintf (buf
, "supplier_%02d", i
);
// Each supplier is constructed with a pointer back to this driver.
280 ACE_NEW (this->suppliers_
[i
], Test_Supplier (this));
282 this->suppliers_
[i
]->connect (buf
,
// Calls activate() on each of the first n_suppliers_ entries of suppliers_,
// in index order.
290 ECMS_Driver::activate_suppliers (void)
292 for (int i
= 0; i
< this->n_suppliers_
; ++i
)
294 this->suppliers_
[i
]->activate ();
// Calls disconnect() on each of the first n_suppliers_ entries of suppliers_,
// in index order.
299 ECMS_Driver::disconnect_suppliers (void)
301 for (int i
= 0; i
< this->n_suppliers_
; ++i
)
303 this->suppliers_
[i
]->disconnect ();
// Parses the driver's command-line options (option string "ds:n:t:h:p:b:")
// into n_suppliers_, event_count_, event_period_, event_size_, event_a_,
// event_b_ and pid_file_name_, then range-checks the numeric values.
// An out-of-range event count is reset to 100 and an out-of-range event size
// to 32; an out-of-range supplier count is reset to 1 and reported through
// ACE_ERROR_RETURN.
// NOTE(review): the listing's own numbering has gaps in this function; the
// switch/case labels, part of the usage text and the return statements are
// on lines not shown.
308 ECMS_Driver::parse_args (int argc
, ACE_TCHAR
*argv
[])
310 ACE_Get_Opt
get_opt (argc
, argv
, ACE_TEXT("ds:n:t:h:p:b:"));
313 while ((opt
= get_opt ()) != EOF
)
// Numeric options are converted with atoi, so non-numeric text yields 0 and
// is caught by the range checks below where one exists.
318 this->n_suppliers_
= ACE_OS::atoi (get_opt
.opt_arg ());
322 this->event_count_
= ACE_OS::atoi (get_opt
.opt_arg ());
326 this->event_period_
= ACE_OS::atoi (get_opt
.opt_arg ());
330 this->event_size_
= ACE_OS::atoi (get_opt
.opt_arg ());
// The event-pair argument is split on ','; both parts are offsets added to
// ACE_ES_EVENT_UNDEFINED.
// NOTE(review): `arg` is passed to atoi without a null check; presumably an
// argument without a comma makes the second strtok_r return null - confirm.
336 char* arg
= ACE_OS::strtok_r (ACE_TEXT_ALWAYS_CHAR(get_opt
.opt_arg ()), ",", &aux
);
338 this->event_a_
= ACE_ES_EVENT_UNDEFINED
+ ACE_OS::atoi (arg
);
339 arg
= ACE_OS::strtok_r (0, ",", &aux
);
340 this->event_b_
= ACE_ES_EVENT_UNDEFINED
+ ACE_OS::atoi (arg
);
345 this->pid_file_name_
= get_opt
.opt_arg ();
// Usage text (only part of it is visible in this listing).
350 ACE_DEBUG ((LM_DEBUG
,
355 "-t <event period (usecs)> "
356 "-h <eventa,eventb> "
357 "-p <pid file name> "
// A non-positive event count falls back to 100.
364 if (this->event_count_
<= 0)
366 ACE_DEBUG ((LM_DEBUG
,
367 "%s: event count (%d) is out of range, "
368 "reset to default (%d)\n",
369 argv
[0], this->event_count_
,
371 this->event_count_
= 100;
// A negative event size falls back to 32. The reset must target event_size_:
// assigning event_count_ here would leave the invalid size in place and
// silently overwrite the event count.
374 if (this->event_size_
< 0)
376 ACE_DEBUG ((LM_DEBUG
,
377 "%s: event size (%d) is out of range, "
378 "reset to default (%d)\n",
379 argv
[0], this->event_size_
,
381 this->event_size_
= 32;
// A non-positive supplier count is reset to 1 and reported as an error.
384 if (this->n_suppliers_
<= 0)
386 this->n_suppliers_
= 1;
387 ACE_ERROR_RETURN ((LM_ERROR
,
388 "%s: number of suppliers out of range, "
389 "reset to default (%d)\n",
// Constructor taking a pointer to the ECMS_Driver.
// NOTE(review): the initializer list and body are on lines not shown in this
// listing; presumably `driver` is stored in driver_, which svc() uses.
396 Test_Supplier::Test_Supplier (ECMS_Driver
*driver
)
// Connects this supplier to the event channel `ec`: derives the supplier id
// from `name`, builds the supplier QoS with three insert() calls, obtains a
// push-consumer proxy from the channel's supplier admin, and registers this
// supplier with that proxy.
// NOTE(review): the parameters between `name` and `ec` and most arguments of
// the qos.insert() calls are on lines not shown in this listing.
403 Test_Supplier::connect (const char* name
,
406 RtecEventChannelAdmin::EventChannel_ptr ec
)
// The supplier id is the CRC-32 of the supplier name.
408 this->supplier_id_
= ACE::crc32 (name
);
409 ACE_DEBUG ((LM_DEBUG
,
410 "ID for <%s> is %04.4x\n",
412 this->supplier_id_
));
// Three publications are declared for this supplier id; the third is for
// ACE_ES_EVENT_SHUTDOWN (the event types of the first two are not shown).
414 ACE_SupplierQOS_Factory qos
;
415 qos
.insert (this->supplier_id_
,
418 qos
.insert (this->supplier_id_
,
421 qos
.insert (this->supplier_id_
,
422 ACE_ES_EVENT_SHUTDOWN
,
425 RtecEventChannelAdmin::SupplierAdmin_var supplier_admin
=
426 ec
->for_suppliers ();
428 this->consumer_proxy_
=
429 supplier_admin
->obtain_push_consumer ();
// Object reference for the supplier_ member, handed to the proxy below.
431 RtecEventComm::PushSupplier_var objref
=
432 this->supplier_
._this ();
434 this->consumer_proxy_
->connect_push_supplier (objref
.in (),
435 qos
.get_SupplierQOS ());
// Disconnects from the event channel. The proxy reference is taken out of
// consumer_proxy_ with _retn() before disconnect_push_consumer() is called on
// it. OBJECT_NOT_EXIST, TRANSIENT and generic CORBA exceptions each have
// their own handler.
// NOTE(review): the statement executed when consumer_proxy_ is nil and the
// bodies of the handlers are on lines not shown in this listing.
439 Test_Supplier::disconnect (void)
441 if (CORBA::is_nil (this->consumer_proxy_
.in ()))
444 RtecEventChannelAdmin::ProxyPushConsumer_var proxy
=
445 this->consumer_proxy_
._retn ();
449 proxy
->disconnect_push_consumer ();
451 catch (const CORBA::OBJECT_NOT_EXIST
&)
453 // Ignore, the EC can shutdown before we get a chance to
456 catch (const CORBA::TRANSIENT
&)
458 // Ignore, the EC can shutdown before we get a chance to
461 catch (const CORBA::Exception
&)
// Delegates to the driver's supplier_task(), passing this supplier and its
// cookie_, and returns that call's result.
468 Test_Supplier::svc ()
470 return this->driver_
->supplier_task (this, this->cookie_
);
// Resets consumer_proxy_ to a nil ProxyPushConsumer reference.
474 Test_Supplier::disconnect_push_supplier (void)
476 this->consumer_proxy_
=
477 RtecEventChannelAdmin::ProxyPushConsumer::_nil ();
// Returns supplier_id_, which connect() sets to the CRC-32 of the supplier
// name.
480 int Test_Supplier::supplier_id (void) const
482 return this->supplier_id_
;
// Returns the consumer proxy reference via .in(), i.e. without handing over
// the reference held in consumer_proxy_.
485 RtecEventChannelAdmin::ProxyPushConsumer_ptr
486 Test_Supplier::consumer_proxy (void)
488 return this->consumer_proxy_
.in ();
// Program entry point: forwards the command line to driver.run() and returns
// its result.
// NOTE(review): the declaration of `driver` is on a line not shown in this
// listing.
492 ACE_TMAIN(int argc
, ACE_TCHAR
*argv
[])
495 return driver
.run (argc
, argv
);