Changes to attempt to silence bcc64x
[ACE_TAO.git] / ACE / tests / Multicast_Test.cpp
blob524767532ec24c0ac084a9f22c9e784c1b476ec1
1 // ============================================================================
2 //
3 // = LIBRARY
4 // tests
5 //
6 // = DESCRIPTION
7 // This program tests ACE_SOCK_Dgram_Mcast class.
8 // It specifically tests subscribing to multiple groups on the same socket
9 // on one or more physical interfaces (if available).
11 // The test can be run as a producer, consumer, or both producer/consumer
12 // (default). The test requires at least two (2) multicast groups which can
13 // be configured as command line options. The consumer subscribes to a
14 // single group per instance and an additional instance tries to subscribe
15 // to all groups on a single socket (if the ACE_SOCK_Dgram_Mcast instance
16 // bind()'s the first address to the socket, additional joins will fail).
17 // The producer iterates through the list of group addresses and sends a
18 // single message containing the destination address and port to each one.
19 // It also sends messages to five (5) additional groups and a message to an
20 // additional port for each group in order to produce a bit of "noise" in
21 // order to help validate how well the multicast filtering works on a
22 // particular platform.
24 // The list of destination groups start at 239.255.0.1 (default) and
25 // increment by 1 up to 5 (default) groups. Both of these values, as well
26 // as others, can be overridden via command-line options. Use the -?
27 // option to display the usage message...
29 // = AUTHOR
30 // Don Hinton <dhinton@dresystems.com>
32 // ============================================================================
34 #include "test_config.h"
35 #include "ace/Get_Opt.h"
36 #include "ace/Vector_T.h"
37 #include "ace/SOCK_Dgram_Mcast.h"
38 #include "ace/ACE.h"
39 #include "ace/Reactor.h"
40 #include "ace/OS_NS_stdio.h"
41 #include "ace/OS_NS_string.h"
42 #include "ace/OS_NS_strings.h"
43 #include "ace/OS_NS_sys_time.h"
44 #include "ace/Task.h"
45 #include "ace/Atomic_Op.h"
46 #include "ace/SString.h"
47 #include "ace/Signal.h"
48 #include "ace/Min_Max.h"
50 #if defined (ACE_HAS_IP_MULTICAST) && defined (ACE_HAS_THREADS)
53 * The 'finished' flag is used to break out of an infninite loop in the
54 * task::svc () method. The 'handler' will set the flag in respose to
55 * SIGINT (CTRL-C).
57 static sig_atomic_t finished = 0;
58 extern "C" void handler (int)
60 finished = 1;
63 static const int MCT_ITERATIONS = 10;
64 static const int MCT_GROUPS = 5;
65 static const int MCT_MIN_GROUPS = 2;
67 static const char MCT_START_GROUP[] = "239.255.0.1";
68 static const int MCT_START_PORT = 16000;
70 static const size_t MAX_STRING_SIZE = 200;
72 int advance_addr (ACE_INET_Addr &addr);
74 // Keep track of errors so we can report them on exit.
75 static sig_atomic_t error = 0;
78 * MCast_Config holds configuration data for this test.
80 class MCT_Config
82 public:
83 enum
85 PRODUCER = 1,
86 CONSUMER = 2,
87 BOTH = PRODUCER | CONSUMER
90 MCT_Config ()
91 : group_start_ (MCT_START_PORT, MCT_START_GROUP),
92 groups_ (0),
93 debug_ (0),
94 role_ (BOTH),
95 sdm_opts_ (ACE_SOCK_Dgram_Mcast::DEFOPTS),
96 iterations_ (MCT_ITERATIONS),
97 ttl_ (1),
98 wait_ (2)
100 if (IP_MAX_MEMBERSHIPS == 0)
101 this->groups_ = MCT_GROUPS;
102 else
103 this->groups_ = ACE_MIN (IP_MAX_MEMBERSHIPS, MCT_GROUPS);
105 ~MCT_Config ()
108 //FUZZ: disable check_for_lack_ACE_OS
109 int open (int argc, ACE_TCHAR *argv[]);
110 //FUZZ: enable check_for_lack_ACE_OS
112 int debug () const { return this->debug_;}
113 void dump () const;
114 int groups () const { return this->groups_;}
115 const ACE_INET_Addr group_start () const { return this->group_start_;}
116 u_long role () const { return this->role_;}
117 int iterations () const { return this->iterations_;}
118 int ttl () const { return this->ttl_;}
120 //FUZZ: disable check_for_lack_ACE_OS
121 int wait () const { return this->wait_;}
122 //FUZZ: enable check_for_lack_ACE_OS
124 ACE_SOCK_Dgram_Mcast::options options () const
126 return static_cast<ACE_SOCK_Dgram_Mcast::options> (this->sdm_opts_);
129 private:
130 // Starting group address. (only IPv4 capable right now...)
131 ACE_INET_Addr group_start_;
133 // Number of groups we will try to use in the test.
134 int groups_;
136 // Debug flag.
137 int debug_;
139 // Role, i.e., PRODUCER, CONSUMER, BOTH: defaults to BOTH
140 u_long role_;
142 // ACE_SOCK_Dgram_Mcast ctor options
143 u_long sdm_opts_;
145 // Producer iterations
146 int iterations_;
148 // TTL, time to live, for use over routers.
149 int ttl_;
151 // Time to wait on CONSUMER threads to end before killing test.
152 int wait_;
156 MCT_Config::open (int argc, ACE_TCHAR *argv[])
158 int retval = 0;
159 int help = 0;
161 //FUZZ: disable check_for_lack_ACE_OS
162 ACE_Get_Opt getopt (argc, argv, ACE_TEXT (":?"), 1, 1);
163 //FUZZ: enable check_for_lack_ACE_OS
165 if (getopt.long_option (ACE_TEXT ("GroupStart"),
166 'g',
167 ACE_Get_Opt::ARG_REQUIRED) != 0)
168 ACE_ERROR_RETURN ((LM_ERROR,
169 ACE_TEXT (" Unable to add GroupStart option.\n")),
172 if (getopt.long_option (ACE_TEXT ("Groups"),
173 'n',
174 ACE_Get_Opt::ARG_REQUIRED) != 0)
175 ACE_ERROR_RETURN ((LM_ERROR,
176 ACE_TEXT (" Unable to add Groups option.\n")), 1);
178 if (getopt.long_option (ACE_TEXT ("Debug"),
179 'd',
180 ACE_Get_Opt::NO_ARG) != 0)
181 ACE_ERROR_RETURN ((LM_ERROR,
182 ACE_TEXT (" Unable to add Debug option.\n")), 1);
184 if (getopt.long_option (ACE_TEXT ("Role"),
185 'r',
186 ACE_Get_Opt::ARG_REQUIRED) != 0)
187 ACE_ERROR_RETURN ((LM_ERROR,
188 ACE_TEXT (" Unable to add Role option.\n")), 1);
190 if (getopt.long_option (ACE_TEXT ("SDM_options"),
191 'm',
192 ACE_Get_Opt::ARG_REQUIRED) != 0)
193 ACE_ERROR_RETURN ((LM_ERROR,
194 ACE_TEXT (" Unable to add Multicast_Options option.\n")),
197 if (getopt.long_option (ACE_TEXT ("Iterations"),
198 'i',
199 ACE_Get_Opt::ARG_REQUIRED) != 0)
200 ACE_ERROR_RETURN ((LM_ERROR,
201 ACE_TEXT (" Unable to add iterations option.\n")),
204 if (getopt.long_option (ACE_TEXT ("TTL"),
205 't',
206 ACE_Get_Opt::ARG_REQUIRED) != 0)
207 ACE_ERROR_RETURN ((LM_ERROR,
208 ACE_TEXT (" Unable to add TTL option.\n")),
211 if (getopt.long_option (ACE_TEXT ("Wait"),
212 'w',
213 ACE_Get_Opt::ARG_REQUIRED) != 0)
214 ACE_ERROR_RETURN ((LM_ERROR,
215 ACE_TEXT (" Unable to add wait option.\n")),
218 if (getopt.long_option (ACE_TEXT ("help"),
219 'h',
220 ACE_Get_Opt::NO_ARG) != 0)
221 ACE_ERROR_RETURN ((LM_ERROR,
222 ACE_TEXT (" Unable to add help option.\n")),
225 //FUZZ: disable check_for_lack_ACE_OS
226 // Now, let's parse it...
227 int c = 0;
228 while ((c = getopt ()) != EOF)
230 //FUZZ: enable check_for_lack_ACE_OS
231 switch (c)
233 case 0:
234 // Long Option. This should never happen.
235 retval = -1;
236 break;
237 case 'g':
239 // @todo validate all these, i.e., must be within range
240 // 224.255.0.0 to 238.255.255.255, but we only allow the
241 // administrative "site local" range, 239.255.0.0 to
242 // 239.255.255.255.
243 ACE_TCHAR *group = getopt.opt_arg ();
244 if (this->group_start_.set (group) != 0)
246 ACE_ERROR ((LM_ERROR, ACE_TEXT ("Bad group address:%s\n"),
247 group));
250 break;
251 case 'i':
252 this->iterations_ = ACE_OS::atoi (getopt.opt_arg ());
253 break;
254 case 'n':
256 int n = ACE_OS::atoi (getopt.opt_arg ());
257 // I'm assuming 0 means unlimited, so just use whatever the
258 // user provides.
259 if (IP_MAX_MEMBERSHIPS == 0)
260 this->groups_ = n;
261 else
262 this->groups_ = ACE_MIN (ACE_MAX (n, MCT_MIN_GROUPS),
263 IP_MAX_MEMBERSHIPS);
264 break;
266 case 'd':
267 this->debug_ = 1;
268 break;
269 case 'r':
271 ACE_TCHAR *c = getopt.opt_arg ();
272 if (ACE_OS::strcasecmp (c, ACE_TEXT ("CONSUMER")) == 0)
273 this->role_ = CONSUMER;
274 else if (ACE_OS::strcasecmp (c, ACE_TEXT ("PRODUCER")) == 0)
275 this->role_ = PRODUCER;
276 else
278 help = 1;
279 retval = -1;
282 break;
283 case 'm':
285 //@todo add back OPT_BINDADDR_NO...
286 ACE_TCHAR *c = getopt.opt_arg ();
287 if (ACE_OS::strcasecmp (c, ACE_TEXT ("OPT_BINDADDR_YES")) == 0)
288 ACE_SET_BITS (this->sdm_opts_,
289 ACE_SOCK_Dgram_Mcast::OPT_BINDADDR_YES);
290 else if (ACE_OS::strcasecmp (c, ACE_TEXT ("OPT_BINDADDR_NO")) == 0)
291 ACE_CLR_BITS (this->sdm_opts_,
292 ACE_SOCK_Dgram_Mcast::OPT_BINDADDR_YES);
293 else if (ACE_OS::strcasecmp (c, ACE_TEXT ("DEFOPT_BINDADDR")) == 0)
295 ACE_CLR_BITS (this->sdm_opts_,
296 ACE_SOCK_Dgram_Mcast::OPT_BINDADDR_YES);
297 ACE_SET_BITS (this->sdm_opts_,
298 ACE_SOCK_Dgram_Mcast::DEFOPT_BINDADDR);
300 else if (ACE_OS::strcasecmp (c, ACE_TEXT ("OPT_NULLIFACE_ALL")) == 0)
301 ACE_SET_BITS (this->sdm_opts_,
302 ACE_SOCK_Dgram_Mcast::OPT_NULLIFACE_ALL);
303 else if (ACE_OS::strcasecmp (c, ACE_TEXT ("OPT_NULLIFACE_ONE")) == 0)
304 ACE_CLR_BITS (this->sdm_opts_,
305 ACE_SOCK_Dgram_Mcast::OPT_NULLIFACE_ALL);
306 else if (ACE_OS::strcasecmp (c, ACE_TEXT ("DEFOPT_NULLIFACE")) == 0)
308 ACE_CLR_BITS (this->sdm_opts_,
309 ACE_SOCK_Dgram_Mcast::OPT_NULLIFACE_ALL);
310 ACE_SET_BITS (this->sdm_opts_,
311 ACE_SOCK_Dgram_Mcast::DEFOPT_NULLIFACE);
313 else if (ACE_OS::strcasecmp (c, ACE_TEXT ("DEFOPTS")) == 0)
314 this->sdm_opts_ = ACE_SOCK_Dgram_Mcast::DEFOPTS;
315 else
317 help = 1;
318 retval = -1;
321 break;
322 case 't':
323 this->ttl_ = ACE_OS::atoi (getopt.opt_arg ());
324 break;
325 case 'w':
326 this->wait_ = ACE_OS::atoi (getopt.opt_arg ());
327 break;
328 case ':':
329 // This means an option requiring an argument didn't have one.
330 ACE_ERROR ((LM_ERROR,
331 ACE_TEXT (" Option '%c' requires an argument but ")
332 ACE_TEXT ("none was supplied\n"),
333 getopt.opt_opt ()));
334 help = 1;
335 retval = -1;
336 break;
337 case '?':
338 case 'h':
339 default:
340 if (ACE_OS::strcmp (argv[getopt.opt_ind () - 1], ACE_TEXT ("-?")) != 0
341 && getopt.opt_opt () != 'h')
342 // Don't allow unknown options.
343 ACE_ERROR ((LM_ERROR,
344 ACE_TEXT (" Found an unknown option (%c) ")
345 ACE_TEXT ("we couldn't handle.\n"),
346 getopt.opt_opt ()));
347 // getopt.last_option ())); //readd with "%s" when
348 // last_option() is available.
349 help = 1;
350 retval = -1;
351 break;
355 if (retval == -1)
357 if (help)
358 // print usage here
359 ACE_ERROR ((LM_ERROR,
360 ACE_TEXT ("usage: %s [options]\n")
361 ACE_TEXT ("Options:\n")
362 ACE_TEXT (" -g {STRING} --GroupStart={STRING} ")
363 ACE_TEXT ("starting multicast group address\n")
364 ACE_TEXT (" ")
365 ACE_TEXT ("(default=239.255.0.1:16000)\n")
366 ACE_TEXT (" -n {#} --Groups={#} ")
367 ACE_TEXT ("number of groups (default=5)\n")
368 ACE_TEXT (" -d --Debug ")
369 ACE_TEXT ("debug flag (default=off)\n")
370 ACE_TEXT (" -r {STRING} --Role={STRING} ")
371 ACE_TEXT ("role {PRODUCER|CONSUMER|BOTH}\n")
372 ACE_TEXT (" ")
373 ACE_TEXT ("(default=BOTH)\n")
374 ACE_TEXT (" -m {STRING} --SDM_options={STRING} ")
375 ACE_TEXT ("ACE_SOCK_Dgram_Mcast ctor options\n")
376 ACE_TEXT (" ")
377 ACE_TEXT ("(default=DEFOPTS)\n")
378 ACE_TEXT (" -i {#} --Iterations={#} ")
379 ACE_TEXT ("number of iterations (default=100)\n")
380 ACE_TEXT (" -t {#} --TTL={#} ")
381 ACE_TEXT ("time to live (default=1)\n")
382 ACE_TEXT (" -w {#} --Wait={#} ")
383 ACE_TEXT ("number of seconds to wait on CONSUMER\n")
384 ACE_TEXT (" ")
385 ACE_TEXT ("(default=2)\n")
386 ACE_TEXT (" -h/? --help ")
387 ACE_TEXT ("show this message\n"),
388 argv[0]));
390 return -1;
393 return 0;
396 void
397 MCT_Config::dump () const
399 ACE_DEBUG ((LM_DEBUG, ACE_BEGIN_DUMP, this));
400 ACE_DEBUG ((LM_DEBUG, ACE_TEXT (" Dumping MCT_Config\n")));
401 ACE_DEBUG ((LM_DEBUG,
402 ACE_TEXT ("\tIP_MAX_MEMBERSHIPS = %d\n"),
403 IP_MAX_MEMBERSHIPS));
404 ACE_DEBUG ((LM_DEBUG,
405 ACE_TEXT ("\tgroups_ = %d\n"),
406 this->groups_));
407 ACE_DEBUG ((LM_DEBUG,
408 ACE_TEXT ("\trole_ = %s\n"),
409 (ACE_BIT_ENABLED (this->role_, PRODUCER)
410 && ACE_BIT_ENABLED (this->role_, CONSUMER))
411 ? ACE_TEXT ("PRODUCER/CONSUMER")
412 : ACE_BIT_ENABLED (this->role_, PRODUCER)
413 ? ACE_TEXT ("PRODUCER")
414 : ACE_TEXT ("CONSUMER")));
415 ACE_DEBUG ((LM_DEBUG,
416 ACE_TEXT ("\tsdm_options_ = %d\n"),
417 this->sdm_opts_));
418 ACE_DEBUG ((LM_DEBUG,
419 ACE_TEXT ("\titerations_ = %d\n"),
420 this->iterations_));
421 ACE_DEBUG ((LM_DEBUG,
422 ACE_TEXT ("\tttl_ = %d\n"),
423 this->ttl_));
424 ACE_DEBUG ((LM_DEBUG,
425 ACE_TEXT ("\twait_ = %d\n"),
426 this->wait_));
427 // Note that this call to get_host_addr is the non-reentrant
428 // version, but it's okay for us.
429 ACE_DEBUG ((LM_DEBUG,
430 ACE_TEXT ("\tgroups_start_ = %s:%d\n"),
431 this->group_start_.get_host_addr (),
432 this->group_start_.get_port_number ()));
434 ACE_DEBUG ((LM_DEBUG, ACE_END_DUMP));
437 /******************************************************************************/
439 class MCT_Event_Handler : public ACE_Event_Handler
441 public:
442 MCT_Event_Handler (ACE_SOCK_Dgram_Mcast::options options
443 = ACE_SOCK_Dgram_Mcast::DEFOPTS);
444 ~MCT_Event_Handler () override;
446 int join (const ACE_INET_Addr &mcast_addr,
447 int reuse_addr = 1,
448 const ACE_TCHAR *net_if = 0);
449 int leave (const ACE_INET_Addr &mcast_addr,
450 const ACE_TCHAR *net_if = 0);
452 // = Event Handler hooks.
453 int handle_input (ACE_HANDLE handle) override;
454 int handle_close (ACE_HANDLE fd, ACE_Reactor_Mask close_mask) override;
456 ACE_HANDLE get_handle () const override;
458 // Turn loopback on/off. Must be called after at least 1 join() is performed.
459 int loopback (bool on_off);
461 protected:
462 ACE_SOCK_Dgram_Mcast *mcast ();
463 int find (const char *buf);
465 private:
466 ACE_SOCK_Dgram_Mcast mcast_;
468 // List of groups we've joined
469 ACE_Vector<ACE_CString*> address_vec_;
471 // Flag used to set the 'finished' flag when the last event handler
472 // gets removed from the reactor.
473 static ACE_Atomic_Op<ACE_SYNCH_MUTEX, long> active_handlers_;
476 ACE_Atomic_Op<ACE_SYNCH_MUTEX, long> MCT_Event_Handler::active_handlers_ = 0;
478 MCT_Event_Handler::MCT_Event_Handler (ACE_SOCK_Dgram_Mcast::options options)
479 : mcast_ (options)
481 // Increment the number of active handlers in the reactor. Note this isn't
482 // really correct, but it should work for our simple example.
483 ++MCT_Event_Handler::active_handlers_;
486 MCT_Event_Handler::~MCT_Event_Handler ()
488 size_t size = this->address_vec_.size ();
489 for (size_t i = 0; i < size; ++i)
491 delete this->address_vec_[i];
492 this->address_vec_[i] = 0;
494 mcast_.close ();
498 ACE_SOCK_Dgram_Mcast *
499 MCT_Event_Handler::mcast ()
501 return &this->mcast_;
505 MCT_Event_Handler::find (const char *buf)
507 size_t const size = this->address_vec_.size ();
508 size_t i = 0;
509 for (i = 0; i < size; ++i)
511 if (ACE_OS::strcasecmp (buf, this->address_vec_[i]->c_str ()) == 0)
512 return 0;
515 // Not found, so output message we received along with a list of groups
516 // we've joined for debugging.
517 ACE_CString local;
518 for (i = 0; i < size; ++i)
520 local += "\t";
521 local += this->address_vec_[i]->c_str ();
522 local += "\n";
524 ACE_DEBUG ((LM_DEBUG, ACE_TEXT ("%C not in:\n%C"),
525 buf, local.c_str ()));
527 return -1;
532 MCT_Event_Handler::join (const ACE_INET_Addr &mcast_addr,
533 int reuse_addr,
534 const ACE_TCHAR *net_if)
536 char buf[MAX_STRING_SIZE];
537 ACE_OS::snprintf (buf, MAX_STRING_SIZE, "%s/%d",
538 mcast_addr.get_host_addr (),
539 mcast_addr.get_port_number ());
541 if (this->mcast_.join (mcast_addr, reuse_addr, net_if) == -1)
542 ACE_ERROR_RETURN ((LM_ERROR,
543 ACE_TEXT ("MCT_Event_Handler::join %C %p\n"),
544 buf,
545 ACE_TEXT ("failed")),
546 -1);
547 ACE_DEBUG ((LM_DEBUG, ACE_TEXT ("Joined %C\n"), buf));
549 ACE_CString *str = 0;
550 ACE_NEW_RETURN (str, ACE_CString (buf), -1);
551 this->address_vec_.push_back (str);
552 return 0;
556 MCT_Event_Handler::leave (const ACE_INET_Addr &mcast_addr,
557 const ACE_TCHAR *net_if)
559 if (this->mcast_.leave (mcast_addr, net_if) == 0)
561 char buf[MAX_STRING_SIZE];
562 size_t size = this->address_vec_.size ();
563 for (size_t i = 0; i < size; ++i)
565 ACE_OS::snprintf (buf, MAX_STRING_SIZE, "%s/%d",
566 mcast_addr.get_host_addr (),
567 mcast_addr.get_port_number ());
568 if (ACE_OS::strcasecmp (buf, this->address_vec_[i]->c_str ()) == 0)
570 this->address_vec_[i]->set ("");
571 break;
574 return 0;
576 return -1;
580 MCT_Event_Handler::handle_input (ACE_HANDLE /*handle*/)
582 char buf[MAX_STRING_SIZE];
583 ACE_OS::memset (buf, 0, sizeof buf);
584 ACE_INET_Addr addr;
586 if (this->mcast ()->recv (buf, sizeof buf, addr) == -1)
588 ++error;
589 ACE_ERROR_RETURN ((LM_ERROR,
590 ACE_TEXT ("MCT_Event_Handler::handle_input - ")
591 ACE_TEXT ("calling recv\n")), -1);
594 // Zero length buffer means we are done.
595 if (ACE_OS::strlen (buf) == 0)
596 return -1;
597 else if (this->find (buf) == -1)
599 ++error;
600 ACE_DEBUG ((LM_ERROR,
601 ACE_TEXT ("MCT_Event_Handler::handle_input - ")
602 ACE_TEXT ("Received dgram for a group we didn't join ")
603 ACE_TEXT ("(%s)\n"),
604 buf));
606 return 0;
610 MCT_Event_Handler::handle_close (ACE_HANDLE /*fd*/,
611 ACE_Reactor_Mask /*close_mask*/)
613 // If this is the last handler, use the finished flag to signal
614 // the task to exit.
615 if (--MCT_Event_Handler::active_handlers_ == 0)
616 finished = 1;
618 // The DONT_CALL flag keeps the reactor from calling handle_close ()
619 // again, since we commit suicide below.
620 this->reactor ()->remove_handler (this,
621 ACE_Event_Handler::ALL_EVENTS_MASK |
622 ACE_Event_Handler::DONT_CALL);
623 this->reactor (0);
624 delete this;
625 return 0;
628 ACE_HANDLE
629 MCT_Event_Handler::get_handle () const
631 return this->mcast_.get_handle ();
634 // Turn loopback on/off
636 MCT_Event_Handler::loopback (bool on_off)
638 char loopback_on = on_off ? 1 : 0;
639 return this->mcast_.set_option (IP_MULTICAST_LOOP, loopback_on);
642 /******************************************************************************/
645 * Our MCT_Task object will be an Active Object if we are running the Consumer
646 * side of the test. open() calls active() which creates a thread and calls
647 * the svc() method that calls runs the reactor event loop.
649 class MCT_Task : public ACE_Task<ACE_NULL_SYNCH>
651 public:
652 MCT_Task (const MCT_Config &config,
653 ACE_Reactor *reactor = ACE_Reactor::instance ());
654 ~MCT_Task () override;
656 //FUZZ: disable check_for_lack_ACE_OS
657 // = Task hooks.
658 int open (void *args = 0) override;
659 //FUZZ: enable check_for_lack_ACE_OS
661 int svc () override;
663 private:
664 const MCT_Config &config_;
667 MCT_Task::MCT_Task (const MCT_Config &config,
668 ACE_Reactor *reactor)
669 : config_ (config)
671 this->reactor (reactor);
674 MCT_Task::~MCT_Task ()
678 MCT_Task::open (void *)
680 MCT_Event_Handler *handler = 0;
682 ACE_INET_Addr addr = this->config_.group_start ();
683 int groups = this->config_.groups ();
684 for (int i = 0; i < groups; ++i)
686 ACE_NEW_RETURN (handler,
687 MCT_Event_Handler (this->config_.options ()), -1);
688 // We subscribe to all groups for the first one and one each for
689 // all the others.
690 if (i == 0)
692 // go ahead and hide the other one since we want our own.
693 ACE_INET_Addr addr = this->config_.group_start ();
694 for (int j = 0; j < groups; ++j)
696 // If OPT_BINDADDR_YES is set, this will fail after the first
697 // join, so just break and keep on going, otherwise it's a
698 // real error.
699 if (j > 0
700 && ACE_BIT_ENABLED (ACE_SOCK_Dgram_Mcast::OPT_BINDADDR_YES,
701 this->config_.options ()))
702 break;
704 if (handler->join (addr) == -1)
705 ACE_ERROR_RETURN ((LM_ERROR,
706 ACE_TEXT ("MCT_Task::open - join error\n")),
707 -1);
708 advance_addr (addr);
711 else
713 if (handler->join (addr) == -1)
714 ACE_ERROR_RETURN ((LM_ERROR,
715 ACE_TEXT ("MCT_Task::open - join error\n")),
716 -1);
719 advance_addr (addr);
721 // This test needs loopback because we're both sending and receiving.
722 // Loopback is usually the default, but be sure.
723 if (-1 == handler->loopback (true))
724 ACE_ERROR ((LM_WARNING,
725 ACE_TEXT ("%p\n"),
726 ACE_TEXT ("MCT_Task::open - enable loopback")));
728 if (this->reactor ()->register_handler (handler, READ_MASK) == -1)
729 ACE_ERROR_RETURN ((LM_ERROR,
730 ACE_TEXT ("MCT_Task::open - cannot register ")
731 ACE_TEXT ("handler\n")),
732 -1);
735 if (this->activate (THR_NEW_LWP) == -1)
736 ACE_ERROR_RETURN ((LM_ERROR,
737 ACE_TEXT ("%p\n"),
738 ACE_TEXT ("MCT_TASK:open - activate failed")),
739 -1);
740 return 0;
744 MCT_Task::svc ()
746 // make sure this thread owns the reactor or handle_events () won't do
747 // anything.
748 this->reactor ()->owner (ACE_Thread::self ());
750 // loop and call handle_events...
751 while (!finished)
752 this->reactor ()->handle_events ();
754 return 0;
757 /******************************************************************************/
759 int send_dgram (ACE_SOCK_Dgram &socket, ACE_INET_Addr addr, int done = 0)
761 // Send each message twice, once to the right port, and once to the "wrong"
762 // port. This helps generate noise and lets us see if port filtering is
763 // working properly.
764 const char *address = addr.get_host_addr ();
765 int port = addr.get_port_number ();
767 for (int i = 0; i < 2; ++i)
769 char buf[MAX_STRING_SIZE];
770 if (done)
771 buf[0] = 0;
772 else
773 ACE_OS::snprintf (buf, MAX_STRING_SIZE, "%s/%d", address, port);
775 if (socket.send (buf, ACE_OS::strlen (buf),addr) == -1)
776 ACE_ERROR_RETURN ((LM_ERROR, ACE_TEXT ("Send to %C, %p\n"),
777 address,
778 ACE_TEXT ("send_dgram - error calling send on ")
779 ACE_TEXT ("ACE_SOCK_Dgram.")), -1);
780 addr.set_port_number (++port);
782 return 0;
785 int producer (MCT_Config &config)
787 int retval = 0;
789 //FUZZ: disable check_for_lack_ACE_OS
790 ACE_DEBUG ((LM_INFO, ACE_TEXT ("Starting producer...\n")));
791 ACE_SOCK_Dgram socket (ACE_sap_any_cast (ACE_INET_Addr &), PF_INET);
792 //FUZZ: enable check_for_lack_ACE_OS
794 // Note that is is IPv4 specific and needs to be changed once
796 if (config.ttl () > 1)
798 int ttl = config.ttl ();
799 if (socket.set_option (IPPROTO_IP,
800 IP_MULTICAST_TTL,
801 (void*) &ttl,
802 sizeof ttl) != 0)
803 ACE_DEBUG ((LM_ERROR,
804 ACE_TEXT ("could net set socket option IP_MULTICAST_TTL ")
805 ACE_TEXT ("= %d\n"),
806 ttl));
807 else
808 ACE_DEBUG ((LM_INFO, ACE_TEXT ("set IP_MULTICAST_TTL = %d\n"), ttl));
811 int iterations = config.iterations ();
812 // we add an extra 5 groups for noise.
813 int groups = config.groups () + 5;
814 for (int i = 0; (i < iterations || iterations == 0) && !finished; ++i)
816 ACE_INET_Addr addr = config.group_start ();
817 for (int j = 0; j < groups && !finished; ++j)
819 if ((retval += send_dgram (socket, addr,
820 ((i + 1) == iterations))) == -1)
821 ACE_ERROR ((LM_ERROR, ACE_TEXT ("Calling send_dgram.\n")));
822 if ((retval += advance_addr (addr)) == -1)
823 ACE_ERROR ((LM_ERROR,
824 ACE_TEXT ("Calling advance_addr.\n")));
826 // Give the task thread a chance to run.
827 ACE_Thread::yield ();
829 socket.close ();
830 return retval;
834 * Advance the address by 1, e.g., 239.255.0.1 => 239.255.0.2
835 * Note that the algorithm is somewhat simplistic, but sufficient for our
836 * purpose.
838 int advance_addr (ACE_INET_Addr &addr)
840 int a, b, c, d;
841 ::sscanf (addr.get_host_addr (), "%d.%d.%d.%d", &a, &b, &c, &d);
842 if (d < 255)
843 ++d;
844 else if (c < 255)
846 d = 1;
847 ++c;
849 else if (b < 255)
851 d = 1;
852 c = 0;
853 ++b;
855 else if (a < 239)
857 d = 1;
858 c = 0;
859 b = 0;
860 ++a;
862 else
863 ACE_ERROR_RETURN ((LM_ERROR,
864 ACE_TEXT ("advance_addr - Cannot advance multicast ")
865 ACE_TEXT ("group address past %s\n"),
866 addr.get_host_addr ()),
867 -1);
869 ACE_TCHAR buf[MAX_STRING_SIZE];
870 ACE_OS::snprintf (buf, MAX_STRING_SIZE, ACE_TEXT ("%d.%d.%d.%d:%d"),
871 a, b, c, d, addr.get_port_number ());
872 addr.set (buf);
873 return 0;
877 run_main (int argc, ACE_TCHAR *argv[])
879 int retval = 0;
880 MCT_Config config;
881 retval = config.open (argc, argv);
882 if (retval != 0)
883 return 1;
885 const ACE_TCHAR *temp = ACE_TEXT ("Multicast_Test");
886 ACE_TString test = temp;
888 u_long role = config.role ();
889 if (ACE_BIT_DISABLED (role, MCT_Config::PRODUCER)
890 || ACE_BIT_DISABLED (role, MCT_Config::CONSUMER))
892 if (ACE_BIT_ENABLED (role, MCT_Config::PRODUCER))
893 test += ACE_TEXT ("-PRODUCER");
894 else
895 test += ACE_TEXT ("-CONSUMER");
898 // Start test only if options are valid.
899 ACE_START_TEST (test.c_str ());
901 // Register a signal handler to close down application gracefully.
902 ACE_Sig_Action sa ((ACE_SignalHandler) handler, SIGINT);
904 // Dump the configuration info to the log if caller passed debug option.
905 if (config.debug ())
906 config.dump ();
908 ACE_Reactor *reactor = ACE_Reactor::instance ();
910 MCT_Task *task = new MCT_Task (config, reactor);
912 if (ACE_BIT_ENABLED (role, MCT_Config::CONSUMER))
914 ACE_DEBUG ((LM_INFO, ACE_TEXT ("Starting consumer...\n")));
915 // Open makes it an active object.
916 retval += task->open ();
919 // now produce the datagrams...
920 if (ACE_BIT_ENABLED (role, MCT_Config::PRODUCER))
921 retval += producer (config);
923 if (ACE_BIT_ENABLED (role, MCT_Config::CONSUMER))
925 // and wait for everything to finish
926 ACE_DEBUG ((LM_INFO,
927 ACE_TEXT ("start waiting for consumer to finish...\n")));
928 // Wait for the threads to exit.
929 // But, wait for a limited time since we could hang if the last udp
930 // message isn't received.
931 ACE_Time_Value max_wait ( config.wait ()/* seconds */);
932 ACE_Time_Value wait_time (ACE_OS::gettimeofday () + max_wait);
933 ACE_Time_Value *ptime = ACE_BIT_ENABLED (role, MCT_Config::PRODUCER)
934 ? &wait_time : 0;
935 if (ACE_Thread_Manager::instance ()->wait (ptime) == -1)
937 // We will no longer wait for this thread, so we must
938 // force it to exit otherwise the thread will be referencing
939 // deleted memory.
940 finished = 1;
941 reactor->end_reactor_event_loop ();
943 if (errno == ETIME)
944 ACE_ERROR ((LM_ERROR,
945 ACE_TEXT ("maximum wait time of %d msec exceeded\n"),
946 max_wait.msec ()));
947 else
948 ACE_OS::perror (ACE_TEXT ("wait"));
950 ++error;
952 // This should exit now that we ended the reactor loop.
953 task->wait ();
957 delete task;
958 ACE_END_TEST;
959 return (retval == 0 && error == 0) ? 0 : 1;
962 #else
964 run_main (int, ACE_TCHAR *[])
966 ACE_START_TEST (ACE_TEXT ("Multicast_Test"));
968 ACE_ERROR ((LM_INFO,
969 ACE_TEXT ("This test must be run on a platform ")
970 ACE_TEXT ("that support IP multicast.\n")));
972 ACE_END_TEST;
973 return 1;
975 #endif /* ACE_HAS_IP_MULTICAST && ACE_HAS_THREADS */