Merge pull request #2303 from jwillemsen/jwi-803
[ACE_TAO.git] / TAO / orbsvcs / tests / EC_Custom_Marshal / ECM_Consumer.cpp
blob9ce7e5ae6bfc4521384c9d3f45e5a233186b217b
1 #include "ECM_Consumer.h"
2 #include "ECM_Data.h"
4 #include "orbsvcs/Event_Utilities.h"
5 #include "orbsvcs/Event_Service_Constants.h"
6 #include "orbsvcs/Time_Utilities.h"
7 #include "orbsvcs/CosNamingC.h"
9 #include "tao/Timeprobe.h"
10 #include "tao/ORB_Core.h"
11 #include "tao/CDR.h"
13 #include "ace/Get_Opt.h"
14 #include <memory>
15 #include "ace/Sched_Params.h"
16 #include "ace/OS_NS_errno.h"
17 #include "ace/OS_NS_unistd.h"
19 int
20 ACE_TMAIN(int argc, ACE_TCHAR *argv[])
22 Driver driver;
23 return driver.run (argc, argv);
26 // ****************************************************************
28 Driver::Driver ()
29 : n_consumers_ (1),
30 event_count_ (100),
31 event_a_ (ACE_ES_EVENT_UNDEFINED),
32 event_b_ (ACE_ES_EVENT_UNDEFINED + 1),
33 pid_file_name_ (0),
34 recv_count_ (0)
38 // ****************************************************************
40 int
41 Driver::run (int argc, ACE_TCHAR* argv[])
43 try
45 CORBA::ORB_var orb =
46 CORBA::ORB_init (argc, argv);
48 CORBA::Object_var poa_object =
49 orb->resolve_initial_references("RootPOA");
51 if (CORBA::is_nil (poa_object.in ()))
52 ACE_ERROR_RETURN ((LM_ERROR,
53 " (%P|%t) Unable to initialize the POA.\n"),
54 1);
56 PortableServer::POA_var root_poa =
57 PortableServer::POA::_narrow (poa_object.in ());
59 PortableServer::POAManager_var poa_manager =
60 root_poa->the_POAManager ();
62 if (this->parse_args (argc, argv))
63 return 1;
65 ACE_DEBUG ((LM_DEBUG,
66 "Execution parameters:\n"
67 " consumers = <%d>\n"
68 " event count = <%d>\n"
69 " supplier Event A = <%d>\n"
70 " supplier Event B = <%d>\n"
71 " pid file name = <%s>\n",
73 this->n_consumers_,
74 this->event_count_,
75 this->event_a_,
76 this->event_b_,
78 this->pid_file_name_?this->pid_file_name_:ACE_TEXT("nil")));
80 if (this->pid_file_name_ != 0)
82 FILE* pid = ACE_OS::fopen (this->pid_file_name_, "w");
83 if (pid != 0)
85 ACE_OS::fprintf (pid, "%ld\n",
86 static_cast<long> (ACE_OS::getpid ()));
87 ACE_OS::fclose (pid);
91 int min_priority =
92 ACE_Sched_Params::priority_min (ACE_SCHED_FIFO);
93 // Enable FIFO scheduling
95 if (ACE_OS::sched_params (ACE_Sched_Params (ACE_SCHED_FIFO,
96 min_priority,
97 ACE_SCOPE_PROCESS)) != 0)
99 if (ACE_OS::last_error () == EPERM)
100 ACE_DEBUG ((LM_DEBUG,
101 "%s: user is not superuser, "
102 "so remain in time-sharing class\n", argv[0]));
103 else
104 ACE_ERROR ((LM_ERROR,
105 "%s: ACE_OS::sched_params failed\n", argv[0]));
108 if (ACE_OS::thr_setprio (min_priority) == -1)
110 ACE_ERROR ((LM_ERROR, "(%P|%t) main thr_setprio failed,"
111 "no real-time features\n"));
114 CORBA::Object_var naming_obj =
115 orb->resolve_initial_references ("NameService");
117 if (CORBA::is_nil (naming_obj.in ()))
118 ACE_ERROR_RETURN ((LM_ERROR,
119 " (%P|%t) Unable to get the Naming Service.\n"),
122 CosNaming::NamingContext_var naming_context =
123 CosNaming::NamingContext::_narrow (naming_obj.in ());
125 CosNaming::Name name (1);
126 name.length (1);
127 name[0].id = CORBA::string_dup ("EventService");
129 CORBA::Object_var ec_obj =
130 naming_context->resolve (name);
132 RtecEventChannelAdmin::EventChannel_var channel;
133 if (CORBA::is_nil (ec_obj.in ()))
134 channel = RtecEventChannelAdmin::EventChannel::_nil ();
135 else
136 channel = RtecEventChannelAdmin::EventChannel::_narrow (ec_obj.in ());
138 poa_manager->activate ();
140 this->connect_consumers (channel.in ());
142 ACE_DEBUG ((LM_DEBUG, "connected consumer(s)\n"));
144 ACE_DEBUG ((LM_DEBUG, "running the test\n"));
145 orb->run ();
147 ACE_DEBUG ((LM_DEBUG, "event loop finished\n"));
149 this->disconnect_consumers ();
151 channel->destroy ();
153 catch (const CORBA::SystemException& sys_ex)
155 sys_ex._tao_print_exception ("SYS_EX in Consumer");
157 catch (const CORBA::Exception& ex)
159 ex._tao_print_exception ("NON SYS EX in Consumer");
161 return 0;
164 void
165 Driver::push_consumer (void* /* consumer_cookie */,
166 ACE_hrtime_t /* arrival */,
167 const RtecEventComm::EventSet& events)
169 // int ID =
170 // (reinterpret_cast<Test_Consumer**> (consumer_cookie)
171 // - this->consumers_);
173 // ACE_DEBUG ((LM_DEBUG, "(%t) events received by consumer %d\n", ID));
175 if (events.length () == 0)
177 // ACE_DEBUG ((LM_DEBUG, "no events\n"));
178 return;
181 ACE_GUARD (TAO_SYNCH_MUTEX, ace_mon, this->recv_count_mutex_);
183 this->recv_count_ += events.length ();
185 int x = this->event_count_ / 10;
186 if (this->recv_count_ % x == 0)
188 ACE_DEBUG ((LM_DEBUG,
189 "ECM_Consumer (%P|%t): %d events received\n",
190 this->recv_count_));
193 if (this->recv_count_ >= this->event_count_)
195 TAO_ORB_Core_instance ()->orb ()->shutdown ();
198 // ACE_DEBUG ((LM_DEBUG, "%d event(s)\n", events.length ()));
200 #if (TAO_NO_COPY_OCTET_SEQUENCES == 1)
201 for (u_int i = 0; i < events.length (); ++i)
203 const RtecEventComm::Event& e = events[i];
205 if (e.data.payload.mb () == 0)
207 ACE_DEBUG ((LM_DEBUG, "No data in event[%d]\n", i));
208 continue;
211 // @@ TODO this is a little messy, infortunately we have to
212 // extract the first byte to determine the byte order, the CDR
213 // cannot do it for us because in certain cases the byte order
214 // is not in the encapsulation. Maybe we need another
215 // constructor for the InputCDR streams (but there are too many
216 // already!)?
218 // Note that there is no copying
219 int byte_order = e.data.payload[0];
221 ACE_Message_Block* mb =
222 ACE_Message_Block::duplicate (e.data.payload.mb ());
223 mb->rd_ptr (1); // skip the byte order
225 TAO_InputCDR cdr (mb, byte_order);
227 ECM_IDLData::Info info;
228 cdr >> info;
230 ECM_Data other;
231 cdr >> other;
233 if (!cdr.good_bit ())
234 ACE_ERROR ((LM_ERROR, "Problem demarshalling C++ data\n"));
236 ACE_Message_Block::release (mb);
238 CORBA::ULong n = info.trajectory.length ();
239 // ACE_DEBUG ((LM_DEBUG, "Payload contains <%d> elements\n", n));
240 // ACE_DEBUG ((LM_DEBUG, "Inventory <%s> contains <%d> elements\n",
241 // other.description.in (),
242 // other.inventory.current_size ()));
244 for (CORBA::ULong j = 0; j < n; ++j)
246 ECM_IDLData::Point& p = info.trajectory[j];
247 if (static_cast<CORBA::ULong>(p.x) != j ||
248 static_cast<CORBA::ULong>(p.y) != j*j)
250 ACE_DEBUG ((LM_DEBUG,
251 "invalid data in trajectory[%d] = (%f,%f)\n",
252 j, p.x, p.y));
256 #endif /* TAO_NO_COPY_OCTET_SEQUENCES == 1 */
259 void
260 Driver::connect_consumers (RtecEventChannelAdmin::EventChannel_ptr channel)
262 for (int i = 0; i < this->n_consumers_; ++i)
264 char buf[BUFSIZ];
265 ACE_OS::sprintf (buf, "consumer_%02d", i);
267 ACE_NEW (this->consumers_[i],
268 Test_Consumer (this, this->consumers_ + i));
270 this->consumers_[i]->connect (this->event_a_,
271 this->event_b_,
272 channel);
276 void
277 Driver::disconnect_consumers ()
279 for (int i = 0; i < this->n_consumers_; ++i)
281 this->consumers_[i]->disconnect ();
286 Driver::parse_args (int argc, ACE_TCHAR *argv [])
288 ACE_Get_Opt get_opt (argc, argv, ACE_TEXT("dc:n:h:p:"));
289 int opt;
291 while ((opt = get_opt ()) != EOF)
293 switch (opt)
295 case 'c':
296 this->n_consumers_ = ACE_OS::atoi (get_opt.opt_arg ());
297 break;
299 case 'n':
300 this->event_count_ = ACE_OS::atoi (get_opt.opt_arg ());
301 break;
303 case 'h':
305 char* aux;
306 char* arg = ACE_OS::strtok_r (ACE_TEXT_ALWAYS_CHAR(get_opt.opt_arg ()), ",", &aux);
308 this->event_a_ = ACE_ES_EVENT_UNDEFINED + ACE_OS::atoi (arg);
309 arg = ACE_OS::strtok_r (0, ",", &aux);
310 this->event_b_ = ACE_ES_EVENT_UNDEFINED + ACE_OS::atoi (arg);
312 break;
314 case 'p':
315 this->pid_file_name_ = get_opt.opt_arg ();
316 break;
318 case '?':
319 default:
320 ACE_DEBUG ((LM_DEBUG,
321 "Usage: %s "
322 "[ORB options] "
323 "-s <global|local> "
324 "-a (send data in events) "
325 "-h <args> "
326 "-p <pid file name> "
327 "\n",
328 argv[0]));
329 return -1;
333 if (this->event_count_ <= 0)
335 ACE_DEBUG ((LM_DEBUG,
336 "%s: event count (%d) is out of range, "
337 "reset to default (%d)\n",
338 argv[0], this->event_count_,
339 100));
340 this->event_count_ = 100;
343 if (this->n_consumers_ <= 0)
345 ACE_ERROR_RETURN ((LM_ERROR,
346 "%s: number of consumers or "
347 "suppliers out of range\n", argv[0]), -1);
350 return 0;
353 // ****************************************************************
355 Test_Consumer::Test_Consumer (Driver *driver, void *cookie)
356 : driver_ (driver),
357 cookie_ (cookie)
361 void
362 Test_Consumer::connect (int event_a,
363 int event_b,
364 RtecEventChannelAdmin::EventChannel_ptr ec)
366 ACE_ConsumerQOS_Factory qos;
367 qos.start_disjunction_group ();
368 qos.insert_type (ACE_ES_EVENT_SHUTDOWN, 0);
369 qos.insert_type (event_a, 0);
370 qos.insert_type (event_b, 0);
372 // = Connect as a consumer.
373 RtecEventChannelAdmin::ConsumerAdmin_var consumer_admin =
374 ec->for_consumers ();
376 this->supplier_proxy_ =
377 consumer_admin->obtain_push_supplier ();
379 RtecEventComm::PushConsumer_var objref = this->_this ();
381 this->supplier_proxy_->connect_push_consumer (objref.in (),
382 qos.get_ConsumerQOS ());
385 void
386 Test_Consumer::disconnect ()
388 if (CORBA::is_nil (this->supplier_proxy_.in ()))
389 return;
391 RtecEventChannelAdmin::ProxyPushSupplier_var proxy =
392 this->supplier_proxy_._retn ();
394 proxy->disconnect_push_supplier ();
397 void
398 Test_Consumer::push (const RtecEventComm::EventSet& events)
400 ACE_hrtime_t arrival = ACE_OS::gethrtime ();
401 this->driver_->push_consumer (this->cookie_, arrival, events);
404 void
405 Test_Consumer::disconnect_push_consumer ()