1 #include "ace/Get_Opt.h"
3 #include "ace/Sched_Params.h"
5 #include "ace/OS_NS_unistd.h"
7 #include "tao/Timeprobe.h"
9 #include "orbsvcs/Event_Utilities.h"
10 #include "orbsvcs/Event_Service_Constants.h"
11 #include "orbsvcs/Time_Utilities.h"
12 #include "orbsvcs/CosNamingC.h"
13 #include "ECM_Supplier.h"
15 #include "ace/OS_NS_errno.h"
// Constructor for ECMS_Driver.  The visible part of the member-initializer
// list defaults event_a_ to ACE_ES_EVENT_UNDEFINED and event_b_ to
// ACE_ES_EVENT_UNDEFINED + 1.
// NOTE(review): the embedded line numbers jump (18 -> 23, nothing after 24),
// so the other initializers and the constructor body are not visible here.
18 ECMS_Driver::ECMS_Driver ()
23 event_a_ (ACE_ES_EVENT_UNDEFINED
),
24 event_b_ (ACE_ES_EVENT_UNDEFINED
+ 1),
29 // ****************************************************************
// Drives one complete supplier run.  Visible steps, in order: initialize the
// ORB, resolve and narrow the RootPOA and fetch its POAManager, parse the
// command line, log the execution parameters, optionally write the process
// id to pid_file_name_, try to switch the process to FIFO scheduling, resolve
// "EventService" through the Naming Service, activate the POA manager, then
// connect, activate, wait for and disconnect the suppliers.  CORBA system
// exceptions and other CORBA exceptions are caught and printed.
// NOTE(review): the embedded line numbers have gaps (e.g. 32 -> 37, 52 -> 56),
// so the return type, braces, return statements and some declarations of
// this function are not visible in this listing.
32 ECMS_Driver::run (int argc
, ACE_TCHAR
* argv
[])
37 CORBA::ORB_init (argc
, argv
);
// Obtain the root POA; a nil reference is reported as an error.
39 CORBA::Object_var poa_object
=
40 orb
->resolve_initial_references("RootPOA");
41 if (CORBA::is_nil (poa_object
.in ()))
42 ACE_ERROR_RETURN ((LM_ERROR
,
43 " (%P|%t) Unable to initialize the POA.\n"),
46 PortableServer::POA_var root_poa
=
47 PortableServer::POA::_narrow (poa_object
.in ());
49 PortableServer::POAManager_var poa_manager
=
50 root_poa
->the_POAManager ();
// NOTE(review): the statement executed when parse_args () returns non-zero
// is not visible here.
52 if (this->parse_args (argc
, argv
))
56 "Execution parameters:\n"
58 " event count = <%d>\n"
59 " event period = <%d>\n"
60 " event size = <%d>\n"
61 " supplier Event A = <%d>\n"
62 " supplier Event B = <%d>\n"
63 " pid file name = <%s>\n",
72 this->pid_file_name_
?this->pid_file_name_
:ACE_TEXT("nil")));
// When a pid file was requested, write our process id into it.
74 if (this->pid_file_name_
!= 0)
76 FILE* pid
= ACE_OS::fopen (this->pid_file_name_
, "w");
79 ACE_OS::fprintf (pid
, "%ld\n",
80 static_cast<long> (ACE_OS::getpid ()));
// Minimum priority of the FIFO scheduling class.
// NOTE(review): presumably stored in min_priority (used further down); the
// declaration itself is not visible.
86 ACE_Sched_Params::priority_min (ACE_SCHED_FIFO
);
87 // Enable FIFO scheduling
89 if (ACE_OS::sched_params (ACE_Sched_Params (ACE_SCHED_FIFO
,
91 ACE_SCOPE_PROCESS
)) != 0)
// EPERM is reported as "not superuser" and the process stays in the
// time-sharing class; any other failure is reported as a sched_params error.
93 if (ACE_OS::last_error () == EPERM
)
95 "%s: user is not superuser, "
96 "so remain in time-sharing class\n", argv
[0]));
99 "%s: ACE_OS::sched_params failed\n", argv
[0]));
// Failing to set the main thread priority is logged but not fatal.
102 if (ACE_OS::thr_setprio (min_priority
) == -1)
104 ACE_ERROR ((LM_ERROR
, "(%P|%t) main thr_setprio failed,"
105 "no real-time features\n"));
// Locate the Naming Service; a nil reference is reported as an error.
108 CORBA::Object_var naming_obj
=
109 orb
->resolve_initial_references ("NameService");
110 if (CORBA::is_nil (naming_obj
.in ()))
111 ACE_ERROR_RETURN ((LM_ERROR
,
112 " (%P|%t) Unable to get the Naming Service.\n"),
115 CosNaming::NamingContext_var naming_context
=
116 CosNaming::NamingContext::_narrow (naming_obj
.in ());
// Look up the event channel under the name "EventService".
118 CosNaming::Name
name (1);
120 name
[0].id
= CORBA::string_dup ("EventService");
122 CORBA::Object_var ec_obj
=
123 naming_context
->resolve (name
);
// A nil object yields a nil channel; otherwise narrow it to an EventChannel.
125 RtecEventChannelAdmin::EventChannel_var channel
;
126 if (CORBA::is_nil (ec_obj
.in ()))
127 channel
= RtecEventChannelAdmin::EventChannel::_nil ();
129 channel
= RtecEventChannelAdmin::EventChannel::_narrow (ec_obj
.in ());
131 poa_manager
->activate ();
133 this->connect_suppliers (channel
.in ());
135 ACE_DEBUG ((LM_DEBUG
, "connected supplier(s)\n"));
137 this->activate_suppliers ();
139 ACE_DEBUG ((LM_DEBUG
, "suppliers are active\n"));
141 // Wait for the supplier threads...
142 if (ACE_Thread_Manager::instance ()->wait () == -1)
144 ACE_ERROR ((LM_ERROR
, "Thread_Manager wait failed\n"));
148 ACE_DEBUG ((LM_DEBUG
, "suppliers finished\n"));
150 this->disconnect_suppliers ();
152 // @@ Deactivate the suppliers (as CORBA Objects?)
// Exceptions are printed with a tag identifying this function; the more
// specific SystemException handler comes first.
154 catch (const CORBA::SystemException
& sys_ex
)
156 sys_ex
._tao_print_exception ("SYS_EX in Supplier");
158 catch (const CORBA::Exception
& ex
)
160 ex
._tao_print_exception ("NON SYS EX in Supplier");
// Work performed for one supplier.  It builds a sample ECM_IDLData::Info with
// event_size_ trajectory points (plus an `other` object with an inventory),
// marshals data into the CDR stream `cdr`, and then pushes event_count_
// single-element event sets through the supplier's consumer proxy, using the
// marshaled bytes as payload.  The last event pushed has type
// ACE_ES_EVENT_SHUTDOWN.  CORBA exceptions are caught and printed.
// NOTE(review): the embedded line numbers have gaps, so the return type, the
// second parameter, the declarations of `other` and `cdr`, the stream
// insertions of the payload and the return statements are not visible here.
166 ECMS_Driver::supplier_task (Test_Supplier
*supplier
,
// Time value of 0 seconds and event_period_ microseconds.
// NOTE(review): where tv is used is not visible in this listing.
171 ACE_Time_Value
tv (0, this->event_period_
);
173 CORBA::ULong n
= this->event_size_
;
175 ECM_IDLData::Info info
;
176 info
.mobile_name
= CORBA::string_dup ("test");
177 info
.mobile_speed
= 1;
178 info
.trajectory
.length (n
);
181 other
.description
= CORBA::string_dup ("some data");
// Fill the trajectory with (j, j * j) points and bind j -> j + 1 in the
// inventory.
183 for (CORBA::ULong j
= 0; j
< n
; ++j
)
185 info
.trajectory
[j
].x
= j
;
186 info
.trajectory
[j
].y
= j
* j
;
187 other
.inventory
.bind (j
, j
+ 1);
190 ACE_DEBUG ((LM_DEBUG
,
191 "The inventory contains (%d) elements\n",
192 other
.inventory
.current_size ()));
194 // We have to make it big enough so we get a contiguous block,
195 // otherwise the octet sequence will not work correctly.
196 // NOTE: we could pre-allocate enough memory in the CDR stream
197 // but we want to show that chaining works!
// The byte-order flag is written to the stream first.
200 CORBA::Boolean byte_order
= TAO_ENCAP_BYTE_ORDER
;
201 cdr
<< CORBA::Any::from_boolean (byte_order
);
203 // The typecode name standard, the encode method is not (in
204 // general the CDR interface is not specified).
// NOTE(review): the condition guarding this throw is not visible here.
206 throw CORBA::MARSHAL ();
208 // Here we marshall a non-IDL type.
211 if (!cdr
.good_bit ())
212 ACE_ERROR ((LM_ERROR
, "Problem marshalling C++ data\n"));
214 const ACE_Message_Block
* mb
= cdr
.begin ();
215 // NOTE: total_length () return the length of the complete
217 CORBA::ULong mblen
= cdr
.total_length ();
// Push event_count_ events, each as an EventSet constructed with size 1.
219 for (CORBA::Long i
= 0; i
< this->event_count_
; ++i
)
221 RtecEventComm::EventSet
event (1);
223 event
[0].header
.source
= supplier
->supplier_id ();
224 event
[0].header
.ttl
= 1;
// Stamp the event with the current high-resolution time.
226 ACE_hrtime_t t
= ACE_OS::gethrtime ();
227 ORBSVCS_Time::hrtime_to_TimeT (event
[0].header
.creation_time
, t
);
// The final iteration sends the shutdown event.
// NOTE(review): the conditions that choose between event_a_ and event_b_
// for the other iterations are not visible in this listing.
229 if (i
== static_cast<CORBA::Long
> (this->event_count_
) - 1)
230 event
[0].header
.type
= ACE_ES_EVENT_SHUTDOWN
;
232 event
[0].header
.type
= this->event_a_
;
234 event
[0].header
.type
= this->event_b_
;
236 // We use replace to minimize the copies, this should result
237 // in just one memory allocation;
238 #if (TAO_NO_COPY_OCTET_SEQUENCES == 1)
239 event
[0].data
.payload
.replace (mblen
, mb
);
241 // If the replace method is not available, we will need
242 // to do the copy manually. First, set the octet sequence length.
243 event
[0].data
.payload
.length (mblen
);
245 // Now copy over each byte.
// NOTE(review): this copies mblen bytes (the total length reported by the
// CDR stream) starting at the first block's base ().  It looks like this
// assumes a single contiguous block whose data starts at base () -- confirm.
// The loop index below also shadows the outer loop's `i`.
246 char* base
= mb
->data_block ()->base ();
247 for(CORBA::ULong i
= 0; i
< mblen
; i
++)
249 event
[0].data
.payload
[i
] = base
[i
];
251 #endif /* TAO_NO_COPY_OCTET_SEQUENCES == 1 */
253 supplier
->consumer_proxy ()->push(event
);
255 // ACE_DEBUG ((LM_DEBUG, "(%t) supplier push event\n"));
// Exceptions are printed; the more specific SystemException handler comes
// first.
260 catch (const CORBA::SystemException
& sys_ex
)
262 sys_ex
._tao_print_exception ("SYS_EX");
264 catch (const CORBA::Exception
& ex
)
266 ex
._tao_print_exception ("NON SYS EX");
// Creates and connects n_suppliers_ suppliers.  For each index i it formats
// the name "supplier_NN" (two-digit, zero-padded) into buf, allocates a
// Test_Supplier bound to this driver in suppliers_[i], and calls connect ()
// on it with that name.
// NOTE(review): the return type, the declaration of buf and the remaining
// connect () arguments are not visible in this listing.
272 ECMS_Driver::connect_suppliers (RtecEventChannelAdmin::EventChannel_ptr channel
)
274 for (int i
= 0; i
< this->n_suppliers_
; ++i
)
277 ACE_OS::sprintf (buf
, "supplier_%02d", i
);
279 ACE_NEW (this->suppliers_
[i
], Test_Supplier (this));
281 this->suppliers_
[i
]->connect (buf
,
// Calls activate () on each of the first n_suppliers_ entries of suppliers_.
// NOTE(review): the return type and braces are not visible in this listing.
289 ECMS_Driver::activate_suppliers ()
291 for (int i
= 0; i
< this->n_suppliers_
; ++i
)
293 this->suppliers_
[i
]->activate ();
// Calls disconnect () on each of the first n_suppliers_ entries of
// suppliers_.
// NOTE(review): the return type and braces are not visible in this listing.
298 ECMS_Driver::disconnect_suppliers ()
300 for (int i
= 0; i
< this->n_suppliers_
; ++i
)
302 this->suppliers_
[i
]->disconnect ();
// Parses the command line with the option string "ds:n:t:h:p:b:" and stores
// the results in the driver's members:
//   n_suppliers_   - number of suppliers
//   event_count_   - number of events
//   event_period_  - event period (microseconds, per the usage text)
//   event_size_    - event size
//   event_a_/b_    - from -h <eventa,eventb>, each offset by
//                    ACE_ES_EVENT_UNDEFINED
//   pid_file_name_ - from -p <pid file name>
// After parsing, an event count <= 0 is reset to 100, a negative event size
// is reset to 32, and a supplier count <= 0 is reset to 1 and reported as an
// error.
// NOTE(review): the embedded line numbers have gaps, so the return type, the
// switch/case labels, the declarations of opt and aux, and the return
// statements are not visible in this listing.
307 ECMS_Driver::parse_args (int argc
, ACE_TCHAR
*argv
[])
309 ACE_Get_Opt
get_opt (argc
, argv
, ACE_TEXT("ds:n:t:h:p:b:"));
312 while ((opt
= get_opt ()) != EOF
)
317 this->n_suppliers_
= ACE_OS::atoi (get_opt
.opt_arg ());
321 this->event_count_
= ACE_OS::atoi (get_opt
.opt_arg ());
325 this->event_period_
= ACE_OS::atoi (get_opt
.opt_arg ());
329 this->event_size_
= ACE_OS::atoi (get_opt
.opt_arg ());
// Split the "eventa,eventb" argument at the comma.
// NOTE(review): if the argument has no second token, the second strtok_r
// call presumably returns a null pointer that is then passed to atoi --
// confirm and guard if so.
335 char* arg
= ACE_OS::strtok_r (ACE_TEXT_ALWAYS_CHAR(get_opt
.opt_arg ()), ",", &aux
);
337 this->event_a_
= ACE_ES_EVENT_UNDEFINED
+ ACE_OS::atoi (arg
);
338 arg
= ACE_OS::strtok_r (0, ",", &aux
);
339 this->event_b_
= ACE_ES_EVENT_UNDEFINED
+ ACE_OS::atoi (arg
);
344 this->pid_file_name_
= get_opt
.opt_arg ();
349 ACE_DEBUG ((LM_DEBUG
,
354 "-t <event period (usecs)> "
355 "-h <eventa,eventb> "
356 "-p <pid file name> "
// An event count of zero or less falls back to 100.
363 if (this->event_count_
<= 0)
365 ACE_DEBUG ((LM_DEBUG
,
366 "%s: event count (%d) is out of range, "
367 "reset to default (%d)\n",
368 argv
[0], this->event_count_
,
370 this->event_count_
= 100;
// A negative event size falls back to 32.  The reset must target
// event_size_; assigning event_count_ here would leave the size negative
// and overwrite the event count.
373 if (this->event_size_
< 0)
375 ACE_DEBUG ((LM_DEBUG
,
376 "%s: event size (%d) is out of range, "
377 "reset to default (%d)\n",
378 argv
[0], this->event_size_
,
380 this->event_size_
= 32;
// A supplier count of zero or less is reset to 1 and reported as an error.
383 if (this->n_suppliers_
<= 0)
385 this->n_suppliers_
= 1;
386 ACE_ERROR_RETURN ((LM_ERROR
,
387 "%s: number of suppliers out of range, "
388 "reset to default (%d)\n",
// Constructs a supplier for the given driver.
// NOTE(review): the initializer list and body are not visible in this
// listing; presumably `driver` is stored in driver_, which svc () uses.
395 Test_Supplier::Test_Supplier (ECMS_Driver
*driver
)
// Connects this supplier to the event channel `ec`.  The supplier id is the
// CRC32 of `name`; three publications are inserted into the supplier QoS
// under that id (the third is ACE_ES_EVENT_SHUTDOWN); a push-consumer proxy
// is obtained from the channel's supplier admin and this object's supplier_
// servant reference is connected to it with that QoS.
// NOTE(review): the return type, any parameters between `name` and `ec`, and
// the remaining arguments of the qos.insert () calls are not visible in this
// listing.
402 Test_Supplier::connect (const char* name
,
405 RtecEventChannelAdmin::EventChannel_ptr ec
)
407 this->supplier_id_
= ACE::crc32 (name
);
408 ACE_DEBUG ((LM_DEBUG
,
409 "ID for <%s> is %04.4x\n",
411 this->supplier_id_
));
// Describe what this supplier publishes.
413 ACE_SupplierQOS_Factory qos
;
414 qos
.insert (this->supplier_id_
,
417 qos
.insert (this->supplier_id_
,
420 qos
.insert (this->supplier_id_
,
421 ACE_ES_EVENT_SHUTDOWN
,
// Obtain the proxy we will push events to.
424 RtecEventChannelAdmin::SupplierAdmin_var supplier_admin
=
425 ec
->for_suppliers ();
427 this->consumer_proxy_
=
428 supplier_admin
->obtain_push_consumer ();
// Register our supplier_ servant with the proxy, together with the QoS.
430 RtecEventComm::PushSupplier_var objref
=
431 this->supplier_
._this ();
433 this->consumer_proxy_
->connect_push_supplier (objref
.in (),
434 qos
.get_SupplierQOS ());
// Disconnects from the consumer proxy.  The reference is first taken out of
// consumer_proxy_ with _retn () into a local _var, then
// disconnect_push_consumer () is invoked on it.  OBJECT_NOT_EXIST and
// TRANSIENT are ignored, per the existing comments, because the event
// channel may already have shut down.
// NOTE(review): the return type, the statement taken when consumer_proxy_ is
// nil, and the body of the generic CORBA::Exception handler are not visible
// in this listing.
438 Test_Supplier::disconnect ()
440 if (CORBA::is_nil (this->consumer_proxy_
.in ()))
443 RtecEventChannelAdmin::ProxyPushConsumer_var proxy
=
444 this->consumer_proxy_
._retn ();
448 proxy
->disconnect_push_consumer ();
450 catch (const CORBA::OBJECT_NOT_EXIST
&)
452 // Ignore, the EC can shutdown before we get a chance to
455 catch (const CORBA::TRANSIENT
&)
457 // Ignore, the EC can shutdown before we get a chance to
460 catch (const CORBA::Exception
&)
// Delegates to the driver: returns the result of
// driver_->supplier_task (this, cookie_).
// NOTE(review): the return type and braces are not visible in this listing.
467 Test_Supplier::svc ()
469 return this->driver_
->supplier_task (this, this->cookie_
);
// Resets consumer_proxy_ to a nil ProxyPushConsumer reference.
// NOTE(review): the return type and braces are not visible in this listing.
473 Test_Supplier::disconnect_push_supplier ()
475 this->consumer_proxy_
=
476 RtecEventChannelAdmin::ProxyPushConsumer::_nil ();
// Returns the supplier id stored in supplier_id_ (set from the CRC32 of the
// supplier name in connect ()).
479 int Test_Supplier::supplier_id () const
481 return this->supplier_id_
;
// Returns the consumer proxy reference held in consumer_proxy_ via in ();
// the member keeps its reference.
484 RtecEventChannelAdmin::ProxyPushConsumer_ptr
485 Test_Supplier::consumer_proxy ()
487 return this->consumer_proxy_
.in ();
// Program entry point: forwards the command line to driver.run () and
// returns its result.
// NOTE(review): the return type and the declaration of `driver` are not
// visible in this listing.
491 ACE_TMAIN(int argc
, ACE_TCHAR
*argv
[])
494 return driver
.run (argc
, argv
);