Merge pull request #1844 from jrw972/monterey
[ACE_TAO.git] / TAO / orbsvcs / tests / EC_Custom_Marshal / ECM_Supplier.cpp
blob9c0adf80b8c1752897a80e007816de6f5d285395
1 #include "ace/Get_Opt.h"
2 #include "ace/Auto_Ptr.h"
3 #include "ace/Sched_Params.h"
4 #include "ace/ACE.h"
5 #include "ace/OS_NS_unistd.h"
7 #include "tao/Timeprobe.h"
8 #include "tao/CDR.h"
9 #include "orbsvcs/Event_Utilities.h"
10 #include "orbsvcs/Event_Service_Constants.h"
11 #include "orbsvcs/Time_Utilities.h"
12 #include "orbsvcs/CosNamingC.h"
13 #include "ECM_Supplier.h"
14 #include "ECM_Data.h"
15 #include "ace/OS_NS_errno.h"
19 ECMS_Driver::ECMS_Driver (void)
20 : n_suppliers_ (1),
21 event_count_ (100),
22 event_period_ (100),
23 event_size_ (32),
24 event_a_ (ACE_ES_EVENT_UNDEFINED),
25 event_b_ (ACE_ES_EVENT_UNDEFINED + 1),
26 pid_file_name_ (0)
30 // ****************************************************************
32 int
33 ECMS_Driver::run (int argc, ACE_TCHAR* argv[])
35 try
37 CORBA::ORB_var orb =
38 CORBA::ORB_init (argc, argv);
40 CORBA::Object_var poa_object =
41 orb->resolve_initial_references("RootPOA");
42 if (CORBA::is_nil (poa_object.in ()))
43 ACE_ERROR_RETURN ((LM_ERROR,
44 " (%P|%t) Unable to initialize the POA.\n"),
45 1);
47 PortableServer::POA_var root_poa =
48 PortableServer::POA::_narrow (poa_object.in ());
50 PortableServer::POAManager_var poa_manager =
51 root_poa->the_POAManager ();
53 if (this->parse_args (argc, argv))
54 return 1;
56 ACE_DEBUG ((LM_DEBUG,
57 "Execution parameters:\n"
58 " suppliers = <%d>\n"
59 " event count = <%d>\n"
60 " event period = <%d>\n"
61 " event size = <%d>\n"
62 " supplier Event A = <%d>\n"
63 " supplier Event B = <%d>\n"
64 " pid file name = <%s>\n",
66 this->n_suppliers_,
67 this->event_count_,
68 this->event_period_,
69 this->event_size_,
70 this->event_a_,
71 this->event_b_,
73 this->pid_file_name_?this->pid_file_name_:ACE_TEXT("nil")) );
75 if (this->pid_file_name_ != 0)
77 FILE* pid = ACE_OS::fopen (this->pid_file_name_, "w");
78 if (pid != 0)
80 ACE_OS::fprintf (pid, "%ld\n",
81 static_cast<long> (ACE_OS::getpid ()));
82 ACE_OS::fclose (pid);
86 int min_priority =
87 ACE_Sched_Params::priority_min (ACE_SCHED_FIFO);
88 // Enable FIFO scheduling, e.g., RT scheduling class on Solaris.
90 if (ACE_OS::sched_params (ACE_Sched_Params (ACE_SCHED_FIFO,
91 min_priority,
92 ACE_SCOPE_PROCESS)) != 0)
94 if (ACE_OS::last_error () == EPERM)
95 ACE_DEBUG ((LM_DEBUG,
96 "%s: user is not superuser, "
97 "so remain in time-sharing class\n", argv[0]));
98 else
99 ACE_ERROR ((LM_ERROR,
100 "%s: ACE_OS::sched_params failed\n", argv[0]));
103 if (ACE_OS::thr_setprio (min_priority) == -1)
105 ACE_ERROR ((LM_ERROR, "(%P|%t) main thr_setprio failed,"
106 "no real-time features\n"));
109 CORBA::Object_var naming_obj =
110 orb->resolve_initial_references ("NameService");
111 if (CORBA::is_nil (naming_obj.in ()))
112 ACE_ERROR_RETURN ((LM_ERROR,
113 " (%P|%t) Unable to get the Naming Service.\n"),
116 CosNaming::NamingContext_var naming_context =
117 CosNaming::NamingContext::_narrow (naming_obj.in ());
119 CosNaming::Name name (1);
120 name.length (1);
121 name[0].id = CORBA::string_dup ("EventService");
123 CORBA::Object_var ec_obj =
124 naming_context->resolve (name);
126 RtecEventChannelAdmin::EventChannel_var channel;
127 if (CORBA::is_nil (ec_obj.in ()))
128 channel = RtecEventChannelAdmin::EventChannel::_nil ();
129 else
130 channel = RtecEventChannelAdmin::EventChannel::_narrow (ec_obj.in ());
132 poa_manager->activate ();
134 this->connect_suppliers (channel.in ());
136 ACE_DEBUG ((LM_DEBUG, "connected supplier(s)\n"));
138 this->activate_suppliers ();
140 ACE_DEBUG ((LM_DEBUG, "suppliers are active\n"));
142 // Wait for the supplier threads...
143 if (ACE_Thread_Manager::instance ()->wait () == -1)
145 ACE_ERROR ((LM_ERROR, "Thread_Manager wait failed\n"));
146 return 1;
149 ACE_DEBUG ((LM_DEBUG, "suppliers finished\n"));
151 this->disconnect_suppliers ();
153 // @@ Deactivate the suppliers (as CORBA Objects?)
155 catch (const CORBA::SystemException& sys_ex)
157 sys_ex._tao_print_exception ("SYS_EX in Supplier");
159 catch (const CORBA::Exception& ex)
161 ex._tao_print_exception ("NON SYS EX in Supplier");
163 return 0;
167 ECMS_Driver::supplier_task (Test_Supplier *supplier,
168 void* /* cookie */)
172 ACE_Time_Value tv (0, this->event_period_);
174 CORBA::ULong n = this->event_size_;
176 ECM_IDLData::Info info;
177 info.mobile_name = CORBA::string_dup ("test");
178 info.mobile_speed = 1;
179 info.trajectory.length (n);
181 ECM_Data other;
182 other.description = CORBA::string_dup ("some data");
184 for (CORBA::ULong j = 0; j < n; ++j)
186 info.trajectory[j].x = j;
187 info.trajectory[j].y = j * j;
188 other.inventory.bind (j, j + 1);
191 ACE_DEBUG ((LM_DEBUG,
192 "The inventory contains (%d) elements\n",
193 other.inventory.current_size ()));
195 // We have to make it big enough so we get a contiguous block,
196 // otherwise the octet sequence will not work correctly.
197 // NOTE: we could pre-allocate enough memory in the CDR stream
198 // but we want to show that chaining works!
199 TAO_OutputCDR cdr;
201 CORBA::Boolean byte_order = TAO_ENCAP_BYTE_ORDER;
202 cdr << CORBA::Any::from_boolean (byte_order);
204 // The typecode name standard, the encode method is not (in
205 // general the CDR interface is not specified).
206 if (!(cdr << info))
207 throw CORBA::MARSHAL ();
209 // Here we marshall a non-IDL type.
210 cdr << other;
212 if (!cdr.good_bit ())
213 ACE_ERROR ((LM_ERROR, "Problem marshalling C++ data\n"));
215 const ACE_Message_Block* mb = cdr.begin ();
216 // NOTE: total_length () return the length of the complete
217 // chain.
218 CORBA::ULong mblen = cdr.total_length ();
220 for (CORBA::Long i = 0; i < this->event_count_; ++i)
222 RtecEventComm::EventSet event (1);
223 event.length (1);
224 event[0].header.source = supplier->supplier_id ();
225 event[0].header.ttl = 1;
227 ACE_hrtime_t t = ACE_OS::gethrtime ();
228 ORBSVCS_Time::hrtime_to_TimeT (event[0].header.creation_time, t);
230 if (i == static_cast<CORBA::Long> (this->event_count_) - 1)
231 event[0].header.type = ACE_ES_EVENT_SHUTDOWN;
232 else if (i % 2 == 0)
233 event[0].header.type = this->event_a_;
234 else
235 event[0].header.type = this->event_b_;
237 // We use replace to minimize the copies, this should result
238 // in just one memory allocation;
239 #if (TAO_NO_COPY_OCTET_SEQUENCES == 1)
240 event[0].data.payload.replace (mblen, mb);
241 #else
242 // If the replace method is not available, we will need
243 // to do the copy manually. First, set the octet sequence length.
244 event[0].data.payload.length (mblen);
246 // Now copy over each byte.
247 char* base = mb->data_block ()->base ();
248 for(CORBA::ULong i = 0; i < mblen; i++)
250 event[0].data.payload[i] = base[i];
252 #endif /* TAO_NO_COPY_OCTET_SEQUENCES == 1 */
254 supplier->consumer_proxy ()->push(event);
256 // ACE_DEBUG ((LM_DEBUG, "(%t) supplier push event\n"));
258 ACE_OS::sleep (tv);
261 catch (const CORBA::SystemException& sys_ex)
263 sys_ex._tao_print_exception ("SYS_EX");
265 catch (const CORBA::Exception& ex)
267 ex._tao_print_exception ("NON SYS EX");
269 return 0;
272 void
273 ECMS_Driver::connect_suppliers (RtecEventChannelAdmin::EventChannel_ptr channel)
275 for (int i = 0; i < this->n_suppliers_; ++i)
277 char buf[BUFSIZ];
278 ACE_OS::sprintf (buf, "supplier_%02d", i);
280 ACE_NEW (this->suppliers_[i], Test_Supplier (this));
282 this->suppliers_[i]->connect (buf,
283 this->event_a_,
284 this->event_b_,
285 channel);
289 void
290 ECMS_Driver::activate_suppliers (void)
292 for (int i = 0; i < this->n_suppliers_; ++i)
294 this->suppliers_[i]->activate ();
298 void
299 ECMS_Driver::disconnect_suppliers (void)
301 for (int i = 0; i < this->n_suppliers_; ++i)
303 this->suppliers_[i]->disconnect ();
308 ECMS_Driver::parse_args (int argc, ACE_TCHAR *argv [])
310 ACE_Get_Opt get_opt (argc, argv, ACE_TEXT("ds:n:t:h:p:b:"));
311 int opt;
313 while ((opt = get_opt ()) != EOF)
315 switch (opt)
317 case 's':
318 this->n_suppliers_ = ACE_OS::atoi (get_opt.opt_arg ());
319 break;
321 case 'n':
322 this->event_count_ = ACE_OS::atoi (get_opt.opt_arg ());
323 break;
325 case 't':
326 this->event_period_ = ACE_OS::atoi (get_opt.opt_arg ());
327 break;
329 case 'b':
330 this->event_size_ = ACE_OS::atoi (get_opt.opt_arg ());
331 break;
333 case 'h':
335 char* aux;
336 char* arg = ACE_OS::strtok_r (ACE_TEXT_ALWAYS_CHAR(get_opt.opt_arg ()), ",", &aux);
338 this->event_a_ = ACE_ES_EVENT_UNDEFINED + ACE_OS::atoi (arg);
339 arg = ACE_OS::strtok_r (0, ",", &aux);
340 this->event_b_ = ACE_ES_EVENT_UNDEFINED + ACE_OS::atoi (arg);
342 break;
344 case 'p':
345 this->pid_file_name_ = get_opt.opt_arg ();
346 break;
348 case '?':
349 default:
350 ACE_DEBUG ((LM_DEBUG,
351 "Usage: %s "
352 "[ORB options] "
353 "-s <nsuppliers> "
354 "-n <event count> "
355 "-t <event period (usecs)> "
356 "-h <eventa,eventb> "
357 "-p <pid file name> "
358 "\n",
359 argv[0]));
360 return -1;
364 if (this->event_count_ <= 0)
366 ACE_DEBUG ((LM_DEBUG,
367 "%s: event count (%d) is out of range, "
368 "reset to default (%d)\n",
369 argv[0], this->event_count_,
370 100));
371 this->event_count_ = 100;
374 if (this->event_size_ < 0)
376 ACE_DEBUG ((LM_DEBUG,
377 "%s: event size (%d) is out of range, "
378 "reset to default (%d)\n",
379 argv[0], this->event_size_,
380 32));
381 this->event_count_ = 32;
384 if (this->n_suppliers_ <= 0)
386 this->n_suppliers_ = 1;
387 ACE_ERROR_RETURN ((LM_ERROR,
388 "%s: number of suppliers out of range, "
389 "reset to default (%d)\n",
390 argv[0], 1), -1);
393 return 0;
396 Test_Supplier::Test_Supplier (ECMS_Driver *driver)
397 : driver_ (driver),
398 supplier_ (this)
402 void
403 Test_Supplier::connect (const char* name,
404 int event_a,
405 int event_b,
406 RtecEventChannelAdmin::EventChannel_ptr ec)
408 this->supplier_id_ = ACE::crc32 (name);
409 ACE_DEBUG ((LM_DEBUG,
410 "ID for <%s> is %04.4x\n",
411 name,
412 this->supplier_id_));
414 ACE_SupplierQOS_Factory qos;
415 qos.insert (this->supplier_id_,
416 event_a,
417 0, 1);
418 qos.insert (this->supplier_id_,
419 event_b,
420 0, 1);
421 qos.insert (this->supplier_id_,
422 ACE_ES_EVENT_SHUTDOWN,
423 0, 1);
425 RtecEventChannelAdmin::SupplierAdmin_var supplier_admin =
426 ec->for_suppliers ();
428 this->consumer_proxy_ =
429 supplier_admin->obtain_push_consumer ();
431 RtecEventComm::PushSupplier_var objref =
432 this->supplier_._this ();
434 this->consumer_proxy_->connect_push_supplier (objref.in (),
435 qos.get_SupplierQOS ());
438 void
439 Test_Supplier::disconnect (void)
441 if (CORBA::is_nil (this->consumer_proxy_.in ()))
442 return;
444 RtecEventChannelAdmin::ProxyPushConsumer_var proxy =
445 this->consumer_proxy_._retn ();
449 proxy->disconnect_push_consumer ();
451 catch (const CORBA::OBJECT_NOT_EXIST&)
453 // Ignore, the EC can shutdown before we get a chance to
454 // disconnect
456 catch (const CORBA::TRANSIENT&)
458 // Ignore, the EC can shutdown before we get a chance to
459 // disconnect
461 catch (const CORBA::Exception&)
463 throw;
468 Test_Supplier::svc ()
470 return this->driver_->supplier_task (this, this->cookie_);
473 void
474 Test_Supplier::disconnect_push_supplier (void)
476 this->consumer_proxy_ =
477 RtecEventChannelAdmin::ProxyPushConsumer::_nil ();
480 int Test_Supplier::supplier_id (void) const
482 return this->supplier_id_;
485 RtecEventChannelAdmin::ProxyPushConsumer_ptr
486 Test_Supplier::consumer_proxy (void)
488 return this->consumer_proxy_.in ();
492 ACE_TMAIN(int argc, ACE_TCHAR *argv[])
494 ECMS_Driver driver;
495 return driver.run (argc, argv);