Merge pull request #2216 from jwillemsen/jwi-cxxversionchecks
[ACE_TAO.git] / TAO / orbsvcs / tests / EC_Custom_Marshal / ECM_Supplier.cpp
blob775d1e0d34b1c2782f514d7dea35882326ec13f4
1 #include "ace/Get_Opt.h"
2 #include <memory>
3 #include "ace/Sched_Params.h"
4 #include "ace/ACE.h"
5 #include "ace/OS_NS_unistd.h"
7 #include "tao/Timeprobe.h"
8 #include "tao/CDR.h"
9 #include "orbsvcs/Event_Utilities.h"
10 #include "orbsvcs/Event_Service_Constants.h"
11 #include "orbsvcs/Time_Utilities.h"
12 #include "orbsvcs/CosNamingC.h"
13 #include "ECM_Supplier.h"
14 #include "ECM_Data.h"
15 #include "ace/OS_NS_errno.h"
18 ECMS_Driver::ECMS_Driver ()
19 : n_suppliers_ (1),
20 event_count_ (100),
21 event_period_ (100),
22 event_size_ (32),
23 event_a_ (ACE_ES_EVENT_UNDEFINED),
24 event_b_ (ACE_ES_EVENT_UNDEFINED + 1),
25 pid_file_name_ (0)
29 // ****************************************************************
31 int
32 ECMS_Driver::run (int argc, ACE_TCHAR* argv[])
34 try
36 CORBA::ORB_var orb =
37 CORBA::ORB_init (argc, argv);
39 CORBA::Object_var poa_object =
40 orb->resolve_initial_references("RootPOA");
41 if (CORBA::is_nil (poa_object.in ()))
42 ACE_ERROR_RETURN ((LM_ERROR,
43 " (%P|%t) Unable to initialize the POA.\n"),
44 1);
46 PortableServer::POA_var root_poa =
47 PortableServer::POA::_narrow (poa_object.in ());
49 PortableServer::POAManager_var poa_manager =
50 root_poa->the_POAManager ();
52 if (this->parse_args (argc, argv))
53 return 1;
55 ACE_DEBUG ((LM_DEBUG,
56 "Execution parameters:\n"
57 " suppliers = <%d>\n"
58 " event count = <%d>\n"
59 " event period = <%d>\n"
60 " event size = <%d>\n"
61 " supplier Event A = <%d>\n"
62 " supplier Event B = <%d>\n"
63 " pid file name = <%s>\n",
65 this->n_suppliers_,
66 this->event_count_,
67 this->event_period_,
68 this->event_size_,
69 this->event_a_,
70 this->event_b_,
72 this->pid_file_name_?this->pid_file_name_:ACE_TEXT("nil")));
74 if (this->pid_file_name_ != 0)
76 FILE* pid = ACE_OS::fopen (this->pid_file_name_, "w");
77 if (pid != 0)
79 ACE_OS::fprintf (pid, "%ld\n",
80 static_cast<long> (ACE_OS::getpid ()));
81 ACE_OS::fclose (pid);
85 int min_priority =
86 ACE_Sched_Params::priority_min (ACE_SCHED_FIFO);
87 // Enable FIFO scheduling
89 if (ACE_OS::sched_params (ACE_Sched_Params (ACE_SCHED_FIFO,
90 min_priority,
91 ACE_SCOPE_PROCESS)) != 0)
93 if (ACE_OS::last_error () == EPERM)
94 ACE_DEBUG ((LM_DEBUG,
95 "%s: user is not superuser, "
96 "so remain in time-sharing class\n", argv[0]));
97 else
98 ACE_ERROR ((LM_ERROR,
99 "%s: ACE_OS::sched_params failed\n", argv[0]));
102 if (ACE_OS::thr_setprio (min_priority) == -1)
104 ACE_ERROR ((LM_ERROR, "(%P|%t) main thr_setprio failed,"
105 "no real-time features\n"));
108 CORBA::Object_var naming_obj =
109 orb->resolve_initial_references ("NameService");
110 if (CORBA::is_nil (naming_obj.in ()))
111 ACE_ERROR_RETURN ((LM_ERROR,
112 " (%P|%t) Unable to get the Naming Service.\n"),
115 CosNaming::NamingContext_var naming_context =
116 CosNaming::NamingContext::_narrow (naming_obj.in ());
118 CosNaming::Name name (1);
119 name.length (1);
120 name[0].id = CORBA::string_dup ("EventService");
122 CORBA::Object_var ec_obj =
123 naming_context->resolve (name);
125 RtecEventChannelAdmin::EventChannel_var channel;
126 if (CORBA::is_nil (ec_obj.in ()))
127 channel = RtecEventChannelAdmin::EventChannel::_nil ();
128 else
129 channel = RtecEventChannelAdmin::EventChannel::_narrow (ec_obj.in ());
131 poa_manager->activate ();
133 this->connect_suppliers (channel.in ());
135 ACE_DEBUG ((LM_DEBUG, "connected supplier(s)\n"));
137 this->activate_suppliers ();
139 ACE_DEBUG ((LM_DEBUG, "suppliers are active\n"));
141 // Wait for the supplier threads...
142 if (ACE_Thread_Manager::instance ()->wait () == -1)
144 ACE_ERROR ((LM_ERROR, "Thread_Manager wait failed\n"));
145 return 1;
148 ACE_DEBUG ((LM_DEBUG, "suppliers finished\n"));
150 this->disconnect_suppliers ();
152 // @@ Deactivate the suppliers (as CORBA Objects?)
154 catch (const CORBA::SystemException& sys_ex)
156 sys_ex._tao_print_exception ("SYS_EX in Supplier");
158 catch (const CORBA::Exception& ex)
160 ex._tao_print_exception ("NON SYS EX in Supplier");
162 return 0;
166 ECMS_Driver::supplier_task (Test_Supplier *supplier,
167 void* /* cookie */)
171 ACE_Time_Value tv (0, this->event_period_);
173 CORBA::ULong n = this->event_size_;
175 ECM_IDLData::Info info;
176 info.mobile_name = CORBA::string_dup ("test");
177 info.mobile_speed = 1;
178 info.trajectory.length (n);
180 ECM_Data other;
181 other.description = CORBA::string_dup ("some data");
183 for (CORBA::ULong j = 0; j < n; ++j)
185 info.trajectory[j].x = j;
186 info.trajectory[j].y = j * j;
187 other.inventory.bind (j, j + 1);
190 ACE_DEBUG ((LM_DEBUG,
191 "The inventory contains (%d) elements\n",
192 other.inventory.current_size ()));
194 // We have to make it big enough so we get a contiguous block,
195 // otherwise the octet sequence will not work correctly.
196 // NOTE: we could pre-allocate enough memory in the CDR stream
197 // but we want to show that chaining works!
198 TAO_OutputCDR cdr;
200 CORBA::Boolean byte_order = TAO_ENCAP_BYTE_ORDER;
201 cdr << CORBA::Any::from_boolean (byte_order);
203 // The typecode name standard, the encode method is not (in
204 // general the CDR interface is not specified).
205 if (!(cdr << info))
206 throw CORBA::MARSHAL ();
208 // Here we marshall a non-IDL type.
209 cdr << other;
211 if (!cdr.good_bit ())
212 ACE_ERROR ((LM_ERROR, "Problem marshalling C++ data\n"));
214 const ACE_Message_Block* mb = cdr.begin ();
215 // NOTE: total_length () return the length of the complete
216 // chain.
217 CORBA::ULong mblen = cdr.total_length ();
219 for (CORBA::Long i = 0; i < this->event_count_; ++i)
221 RtecEventComm::EventSet event (1);
222 event.length (1);
223 event[0].header.source = supplier->supplier_id ();
224 event[0].header.ttl = 1;
226 ACE_hrtime_t t = ACE_OS::gethrtime ();
227 ORBSVCS_Time::hrtime_to_TimeT (event[0].header.creation_time, t);
229 if (i == static_cast<CORBA::Long> (this->event_count_) - 1)
230 event[0].header.type = ACE_ES_EVENT_SHUTDOWN;
231 else if (i % 2 == 0)
232 event[0].header.type = this->event_a_;
233 else
234 event[0].header.type = this->event_b_;
236 // We use replace to minimize the copies, this should result
237 // in just one memory allocation;
238 #if (TAO_NO_COPY_OCTET_SEQUENCES == 1)
239 event[0].data.payload.replace (mblen, mb);
240 #else
241 // If the replace method is not available, we will need
242 // to do the copy manually. First, set the octet sequence length.
243 event[0].data.payload.length (mblen);
245 // Now copy over each byte.
246 char* base = mb->data_block ()->base ();
247 for(CORBA::ULong i = 0; i < mblen; i++)
249 event[0].data.payload[i] = base[i];
251 #endif /* TAO_NO_COPY_OCTET_SEQUENCES == 1 */
253 supplier->consumer_proxy ()->push(event);
255 // ACE_DEBUG ((LM_DEBUG, "(%t) supplier push event\n"));
257 ACE_OS::sleep (tv);
260 catch (const CORBA::SystemException& sys_ex)
262 sys_ex._tao_print_exception ("SYS_EX");
264 catch (const CORBA::Exception& ex)
266 ex._tao_print_exception ("NON SYS EX");
268 return 0;
271 void
272 ECMS_Driver::connect_suppliers (RtecEventChannelAdmin::EventChannel_ptr channel)
274 for (int i = 0; i < this->n_suppliers_; ++i)
276 char buf[BUFSIZ];
277 ACE_OS::sprintf (buf, "supplier_%02d", i);
279 ACE_NEW (this->suppliers_[i], Test_Supplier (this));
281 this->suppliers_[i]->connect (buf,
282 this->event_a_,
283 this->event_b_,
284 channel);
288 void
289 ECMS_Driver::activate_suppliers ()
291 for (int i = 0; i < this->n_suppliers_; ++i)
293 this->suppliers_[i]->activate ();
297 void
298 ECMS_Driver::disconnect_suppliers ()
300 for (int i = 0; i < this->n_suppliers_; ++i)
302 this->suppliers_[i]->disconnect ();
307 ECMS_Driver::parse_args (int argc, ACE_TCHAR *argv [])
309 ACE_Get_Opt get_opt (argc, argv, ACE_TEXT("ds:n:t:h:p:b:"));
310 int opt;
312 while ((opt = get_opt ()) != EOF)
314 switch (opt)
316 case 's':
317 this->n_suppliers_ = ACE_OS::atoi (get_opt.opt_arg ());
318 break;
320 case 'n':
321 this->event_count_ = ACE_OS::atoi (get_opt.opt_arg ());
322 break;
324 case 't':
325 this->event_period_ = ACE_OS::atoi (get_opt.opt_arg ());
326 break;
328 case 'b':
329 this->event_size_ = ACE_OS::atoi (get_opt.opt_arg ());
330 break;
332 case 'h':
334 char* aux;
335 char* arg = ACE_OS::strtok_r (ACE_TEXT_ALWAYS_CHAR(get_opt.opt_arg ()), ",", &aux);
337 this->event_a_ = ACE_ES_EVENT_UNDEFINED + ACE_OS::atoi (arg);
338 arg = ACE_OS::strtok_r (0, ",", &aux);
339 this->event_b_ = ACE_ES_EVENT_UNDEFINED + ACE_OS::atoi (arg);
341 break;
343 case 'p':
344 this->pid_file_name_ = get_opt.opt_arg ();
345 break;
347 case '?':
348 default:
349 ACE_DEBUG ((LM_DEBUG,
350 "Usage: %s "
351 "[ORB options] "
352 "-s <nsuppliers> "
353 "-n <event count> "
354 "-t <event period (usecs)> "
355 "-h <eventa,eventb> "
356 "-p <pid file name> "
357 "\n",
358 argv[0]));
359 return -1;
363 if (this->event_count_ <= 0)
365 ACE_DEBUG ((LM_DEBUG,
366 "%s: event count (%d) is out of range, "
367 "reset to default (%d)\n",
368 argv[0], this->event_count_,
369 100));
370 this->event_count_ = 100;
373 if (this->event_size_ < 0)
375 ACE_DEBUG ((LM_DEBUG,
376 "%s: event size (%d) is out of range, "
377 "reset to default (%d)\n",
378 argv[0], this->event_size_,
379 32));
380 this->event_count_ = 32;
383 if (this->n_suppliers_ <= 0)
385 this->n_suppliers_ = 1;
386 ACE_ERROR_RETURN ((LM_ERROR,
387 "%s: number of suppliers out of range, "
388 "reset to default (%d)\n",
389 argv[0], 1), -1);
392 return 0;
395 Test_Supplier::Test_Supplier (ECMS_Driver *driver)
396 : driver_ (driver),
397 supplier_ (this)
401 void
402 Test_Supplier::connect (const char* name,
403 int event_a,
404 int event_b,
405 RtecEventChannelAdmin::EventChannel_ptr ec)
407 this->supplier_id_ = ACE::crc32 (name);
408 ACE_DEBUG ((LM_DEBUG,
409 "ID for <%s> is %04.4x\n",
410 name,
411 this->supplier_id_));
413 ACE_SupplierQOS_Factory qos;
414 qos.insert (this->supplier_id_,
415 event_a,
416 0, 1);
417 qos.insert (this->supplier_id_,
418 event_b,
419 0, 1);
420 qos.insert (this->supplier_id_,
421 ACE_ES_EVENT_SHUTDOWN,
422 0, 1);
424 RtecEventChannelAdmin::SupplierAdmin_var supplier_admin =
425 ec->for_suppliers ();
427 this->consumer_proxy_ =
428 supplier_admin->obtain_push_consumer ();
430 RtecEventComm::PushSupplier_var objref =
431 this->supplier_._this ();
433 this->consumer_proxy_->connect_push_supplier (objref.in (),
434 qos.get_SupplierQOS ());
437 void
438 Test_Supplier::disconnect ()
440 if (CORBA::is_nil (this->consumer_proxy_.in ()))
441 return;
443 RtecEventChannelAdmin::ProxyPushConsumer_var proxy =
444 this->consumer_proxy_._retn ();
448 proxy->disconnect_push_consumer ();
450 catch (const CORBA::OBJECT_NOT_EXIST&)
452 // Ignore, the EC can shutdown before we get a chance to
453 // disconnect
455 catch (const CORBA::TRANSIENT&)
457 // Ignore, the EC can shutdown before we get a chance to
458 // disconnect
460 catch (const CORBA::Exception&)
462 throw;
467 Test_Supplier::svc ()
469 return this->driver_->supplier_task (this, this->cookie_);
472 void
473 Test_Supplier::disconnect_push_supplier ()
475 this->consumer_proxy_ =
476 RtecEventChannelAdmin::ProxyPushConsumer::_nil ();
479 int Test_Supplier::supplier_id () const
481 return this->supplier_id_;
484 RtecEventChannelAdmin::ProxyPushConsumer_ptr
485 Test_Supplier::consumer_proxy ()
487 return this->consumer_proxy_.in ();
491 ACE_TMAIN(int argc, ACE_TCHAR *argv[])
493 ECMS_Driver driver;
494 return driver.run (argc, argv);