Merge pull request #1844 from jrw972/monterey
[ACE_TAO.git] / ACE / tests / Multicast_Test_IPV6.cpp
blobe217fa7c4972a0aa17255eefd5233d44cf8610c0
1 // ============================================================================
2 //FUZZ: disable check_for_lack_ACE_OS
3 /**
4 * @file Multicast_Test_IPV6.cpp
6 * @brief This program tests ACE_SOCK_Dgram_Mcast class.
8 * It specifically tests subscribing to multiple groups on the same
9 * socket on one or more physical interfaces (if available).
11 * The test can be run as a producer, consumer, or both
12 * producer/consumer (default). The test requires at least two (2)
13 * multicast groups which can be configured as command line options.
14 * The consumer subscribes to a single group per instance and an
15 * additional instance tries to subscribe to all groups on a single
16 * socket (if the ACE_SOCK_Dgram_Mcast instance bind()'s the first
17 * address to the socket, additional joins will fail). The producer
18 * iterates through the list of group addresses and sends a single
19 * message containing the destination address and port to each one. It
20 * also sends messages to five (5) additional groups and a message to
21 * an additional port for each group in order to produce a bit of
22 * "noise" in order to help validate how well the multicast filtering
23 * works on a particular platform.
25 * The list of destination groups start at ff01::1 (default) and
26 * increment by 1 up to 5 (default) groups. Both of these values, as
27 * well as others, can be overridden via command-line options. Use
28 * the -? option to display the usage message...
30 * @author Don Hinton <dhinton@dresystems.com>
31 * Brian Buesker <bbuesker@qualcomm.com>
33 // ============================================================================
34 //FUZZ: enable check_for_lack_ACE_OS
36 #include "test_config.h"
37 #include "ace/Get_Opt.h"
38 #include "ace/Vector_T.h"
39 #include "ace/SOCK_Dgram_Mcast.h"
40 #include "ace/ACE.h"
41 #include "ace/Reactor.h"
42 #include "ace/OS_NS_string.h"
43 #include "ace/OS_NS_strings.h"
44 #include "ace/Task.h"
45 #include "ace/Atomic_Op.h"
46 #include "ace/SString.h"
47 #include "ace/Signal.h"
48 #include "ace/Min_Max.h"
50 #if defined (ACE_HAS_IP_MULTICAST) && defined (ACE_HAS_THREADS)
/**
 * The 'finished' flag is used to break out of the otherwise infinite loop
 * in MCT_Task::svc ().  'handler' sets it in response to SIGINT (CTRL-C);
 * the last event handler to be closed sets it as well, and the producer
 * loops poll it.
 *
 * It is declared volatile because it is stored to from a signal handler:
 * 'volatile sig_atomic_t' is the object type a handler may portably
 * write.
 */
static volatile sig_atomic_t finished = 0;

/// Signal handler: ask the test to shut down by raising 'finished'.
extern "C" void handler (int)
{
  finished = 1;
}
63 static const int MCT_ITERATIONS = 10;
64 static const int MCT_GROUPS = 5;
65 static const int MCT_MIN_GROUPS = 2;
67 #if defined (ACE_HAS_IPV6)
68 static const char MCT_START_GROUP[] = "ff01::1";
69 #else
70 // an IPv4 address that will ensure an error message is not printed when
71 // IPv6 is not enabled
72 static const char MCT_START_GROUP[] = "239.255.0.1";
73 #endif /* ACE_HAS_IPV6 */
74 static const int MCT_START_PORT = 16000;
76 static const size_t MAX_STRING_SIZE = 200;
78 int advance_addr (ACE_INET_Addr &addr);
80 // Keep track of errors so we can report them on exit.
81 static sig_atomic_t error = 0;
84 * MCast_Config holds configuration data for this test.
86 class MCT_Config
88 public:
90 enum
92 PRODUCER = 1,
93 CONSUMER = 2,
94 BOTH = PRODUCER | CONSUMER
97 MCT_Config (void)
98 : group_start_ (MCT_START_PORT, MCT_START_GROUP),
99 groups_ (0),
100 debug_ (0),
101 role_ (BOTH),
102 sdm_opts_ (ACE_SOCK_Dgram_Mcast::DEFOPTS),
103 iterations_ (MCT_ITERATIONS),
104 ttl_ (1),
105 wait_ (2)
107 if (IP_MAX_MEMBERSHIPS == 0)
108 this->groups_ = MCT_GROUPS;
109 else
110 this->groups_ = ACE_MIN (IP_MAX_MEMBERSHIPS, MCT_GROUPS);
113 ~MCT_Config (void)
116 //FUZZ: disable check_for_lack_ACE_OS
117 int open (int argc, ACE_TCHAR *argv[]);
118 //FUZZ: enable check_for_lack_ACE_OS
120 int debug (void) const { return this->debug_;}
121 void dump (void) const;
122 int groups (void) const { return this->groups_;}
123 const ACE_INET_Addr group_start (void) const { return this->group_start_;}
124 u_long role (void) const { return this->role_;}
125 int iterations (void) const { return this->iterations_;}
126 int ttl (void) const { return this->ttl_;}
128 //FUZZ: disable check_for_lack_ACE_OS
129 int wait (void) const { return this->wait_;}
130 //FUZZ: enable check_for_lack_ACE_OS
132 ACE_SOCK_Dgram_Mcast::options options (void) const
134 return static_cast<ACE_SOCK_Dgram_Mcast::options> (this->sdm_opts_);
137 int set_group (int port, const char *group);
139 private:
140 // Starting group address.
141 ACE_INET_Addr group_start_;
143 // Number of groups we will try to use in the test.
144 int groups_;
146 // Debug flag.
147 int debug_;
149 // Role, i.e., PRODUCER, CONSUMER, BOTH: defaults to BOTH
150 u_long role_;
152 // ACE_SOCK_Dgram_Mcast ctor options
153 u_long sdm_opts_;
155 // Producer iterations
156 int iterations_;
158 // TTL, time to live, for use over routers.
159 int ttl_;
161 // Time to wait on CONSUMER threads to end before killing test.
162 int wait_;
166 MCT_Config::open (int argc, ACE_TCHAR *argv[])
168 int retval = 0;
169 int help = 0;
171 //FUZZ: disable check_for_lack_ACE_OS
172 ACE_Get_Opt getopt (argc, argv, ACE_TEXT (":?"), 1, 1);
173 //FUZZ: enable check_for_lack_ACE_OS
175 if (getopt.long_option (ACE_TEXT ("GroupStart"),
176 'g',
177 ACE_Get_Opt::ARG_REQUIRED) != 0)
178 ACE_ERROR_RETURN ((LM_ERROR,
179 ACE_TEXT (" Unable to add GroupStart option.\n")),
182 if (getopt.long_option (ACE_TEXT ("Groups"),
183 'n',
184 ACE_Get_Opt::ARG_REQUIRED) != 0)
185 ACE_ERROR_RETURN ((LM_ERROR,
186 ACE_TEXT (" Unable to add Groups option.\n")), 1);
188 if (getopt.long_option (ACE_TEXT ("Debug"),
189 'd',
190 ACE_Get_Opt::NO_ARG) != 0)
191 ACE_ERROR_RETURN ((LM_ERROR,
192 ACE_TEXT (" Unable to add Debug option.\n")), 1);
194 if (getopt.long_option (ACE_TEXT ("Role"),
195 'r',
196 ACE_Get_Opt::ARG_REQUIRED) != 0)
197 ACE_ERROR_RETURN ((LM_ERROR,
198 ACE_TEXT (" Unable to add Role option.\n")), 1);
200 if (getopt.long_option (ACE_TEXT ("SDM_options"),
201 'm',
202 ACE_Get_Opt::ARG_REQUIRED) != 0)
203 ACE_ERROR_RETURN ((LM_ERROR,
204 ACE_TEXT (" Unable to add Multicast_Options option.\n")),
207 if (getopt.long_option (ACE_TEXT ("Iterations"),
208 'i',
209 ACE_Get_Opt::ARG_REQUIRED) != 0)
210 ACE_ERROR_RETURN ((LM_ERROR,
211 ACE_TEXT (" Unable to add iterations option.\n")),
214 if (getopt.long_option (ACE_TEXT ("TTL"),
215 't',
216 ACE_Get_Opt::ARG_REQUIRED) != 0)
217 ACE_ERROR_RETURN ((LM_ERROR,
218 ACE_TEXT (" Unable to add TTL option.\n")),
221 if (getopt.long_option (ACE_TEXT ("Wait"),
222 'w',
223 ACE_Get_Opt::ARG_REQUIRED) != 0)
224 ACE_ERROR_RETURN ((LM_ERROR,
225 ACE_TEXT (" Unable to add wait option.\n")),
228 if (getopt.long_option (ACE_TEXT ("help"),
229 'h',
230 ACE_Get_Opt::NO_ARG) != 0)
231 ACE_ERROR_RETURN ((LM_ERROR,
232 ACE_TEXT (" Unable to add help option.\n")),
235 // Now, let's parse it...
236 int c = 0;
238 //FUZZ: disable check_for_lack_ACE_OS
239 while ((c = getopt ()) != EOF)
241 //FUZZ: enable check_for_lack_ACE_OS
242 switch (c)
244 case 0:
245 // Long Option. This should never happen.
246 retval = -1;
247 break;
248 case 'g':
250 // @todo validate all these, i.e., must be within range
251 // 224.255.0.0 to 238.255.255.255, but we only allow the
252 // administrative "site local" range, 239.255.0.0 to
253 // 239.255.255.255.
254 ACE_TCHAR *group = getopt.opt_arg ();
255 if (this->group_start_.set (group) != 0)
257 ACE_ERROR ((LM_ERROR, ACE_TEXT ("Bad group address:%s\n"),
258 group));
261 break;
262 case 'i':
263 this->iterations_ = ACE_OS::atoi (getopt.opt_arg ());
264 break;
265 case 'n':
267 int n = ACE_OS::atoi (getopt.opt_arg ());
268 // I'm assuming 0 means unlimited, so just use whatever the
269 // user provides. Seems to work okay on Solaris 5.8.
270 if (IP_MAX_MEMBERSHIPS == 0)
271 this->groups_ = n;
272 else
273 this->groups_ = ACE_MIN (ACE_MAX (n, MCT_MIN_GROUPS),
274 IP_MAX_MEMBERSHIPS);
275 break;
277 case 'd':
278 this->debug_ = 1;
279 break;
280 case 'r':
282 ACE_TCHAR *c = getopt.opt_arg ();
283 if (ACE_OS::strcasecmp (c, ACE_TEXT ("CONSUMER")) == 0)
284 this->role_ = CONSUMER;
285 else if (ACE_OS::strcasecmp (c, ACE_TEXT ("PRODUCER")) == 0)
286 this->role_ = PRODUCER;
287 else
289 help = 1;
290 retval = -1;
293 break;
294 case 'm':
296 //@todo add back OPT_BINDADDR_NO...
297 ACE_TCHAR *c = getopt.opt_arg ();
298 if (ACE_OS::strcasecmp (c, ACE_TEXT ("OPT_BINDADDR_YES")) == 0)
299 ACE_SET_BITS (this->sdm_opts_,
300 ACE_SOCK_Dgram_Mcast::OPT_BINDADDR_YES);
301 else if (ACE_OS::strcasecmp (c, ACE_TEXT ("OPT_BINDADDR_NO")) == 0)
302 ACE_CLR_BITS (this->sdm_opts_,
303 ACE_SOCK_Dgram_Mcast::OPT_BINDADDR_YES);
304 else if (ACE_OS::strcasecmp (c, ACE_TEXT ("DEFOPT_BINDADDR")) == 0)
306 ACE_CLR_BITS (this->sdm_opts_,
307 ACE_SOCK_Dgram_Mcast::OPT_BINDADDR_YES);
308 ACE_SET_BITS (this->sdm_opts_,
309 ACE_SOCK_Dgram_Mcast::DEFOPT_BINDADDR);
311 else if (ACE_OS::strcasecmp (c, ACE_TEXT ("OPT_NULLIFACE_ALL")) == 0)
312 ACE_SET_BITS (this->sdm_opts_,
313 ACE_SOCK_Dgram_Mcast::OPT_NULLIFACE_ALL);
314 else if (ACE_OS::strcasecmp (c, ACE_TEXT ("OPT_NULLIFACE_ONE")) == 0)
315 ACE_CLR_BITS (this->sdm_opts_,
316 ACE_SOCK_Dgram_Mcast::OPT_NULLIFACE_ALL);
317 else if (ACE_OS::strcasecmp (c, ACE_TEXT ("DEFOPT_NULLIFACE")) == 0)
319 ACE_CLR_BITS (this->sdm_opts_,
320 ACE_SOCK_Dgram_Mcast::OPT_NULLIFACE_ALL);
321 ACE_SET_BITS (this->sdm_opts_,
322 ACE_SOCK_Dgram_Mcast::DEFOPT_NULLIFACE);
324 else if (ACE_OS::strcasecmp (c, ACE_TEXT ("DEFOPTS")) == 0)
325 this->sdm_opts_ = ACE_SOCK_Dgram_Mcast::DEFOPTS;
326 else
328 help = 1;
329 retval = -1;
332 break;
333 case 't':
334 this->ttl_ = ACE_OS::atoi (getopt.opt_arg ());
335 break;
336 case 'w':
337 this->wait_ = ACE_OS::atoi (getopt.opt_arg ());
338 break;
339 case ':':
340 // This means an option requiring an argument didn't have one.
341 ACE_ERROR ((LM_ERROR,
342 ACE_TEXT (" Option '%c' requires an argument but ")
343 ACE_TEXT ("none was supplied\n"),
344 getopt.opt_opt ()));
345 help = 1;
346 retval = -1;
347 break;
348 case '?':
349 case 'h':
350 default:
351 if (ACE_OS::strcmp (argv[getopt.opt_ind () - 1], ACE_TEXT ("-?")) != 0
352 && getopt.opt_opt () != 'h')
353 // Don't allow unknown options.
354 ACE_ERROR ((LM_ERROR,
355 ACE_TEXT (" Found an unknown option (%c) ")
356 ACE_TEXT ("we couldn't handle.\n"),
357 getopt.opt_opt ()));
358 // getopt.last_option ())); //readd with "%s" when
359 // last_option() is available.
360 help = 1;
361 retval = -1;
362 break;
366 if (retval == -1)
368 if (help)
369 // print usage here
370 ACE_ERROR ((LM_ERROR,
371 ACE_TEXT ("usage: %s [options]\n")
372 ACE_TEXT ("Options:\n")
373 ACE_TEXT (" -g {STRING} --GroupStart={STRING} ")
374 ACE_TEXT ("starting multicast group address\n")
375 ACE_TEXT (" ")
376 ACE_TEXT ("(default=239.255.0.1:16000)\n")
377 ACE_TEXT (" -n {#} --Groups={#} ")
378 ACE_TEXT ("number of groups (default=5)\n")
379 ACE_TEXT (" -d --Debug ")
380 ACE_TEXT ("debug flag (default=off)\n")
381 ACE_TEXT (" -r {STRING} --Role={STRING} ")
382 ACE_TEXT ("role {PRODUCER|CONSUMER|BOTH}\n")
383 ACE_TEXT (" ")
384 ACE_TEXT ("(default=BOTH)\n")
385 ACE_TEXT (" -m {STRING} --SDM_options={STRING} ")
386 ACE_TEXT ("ACE_SOCK_Dgram_Mcast ctor options\n")
387 ACE_TEXT (" ")
388 ACE_TEXT ("(default=DEFOPTS)\n")
389 ACE_TEXT (" -i {#} --Iterations={#} ")
390 ACE_TEXT ("number of iterations (default=100)\n")
391 ACE_TEXT (" -t {#} --TTL={#} ")
392 ACE_TEXT ("time to live (default=1)\n")
393 ACE_TEXT (" -w {#} --Wait={#} ")
394 ACE_TEXT ("number of seconds to wait on CONSUMER\n")
395 ACE_TEXT (" ")
396 ACE_TEXT ("(default=2)\n")
397 ACE_TEXT (" -h/? --help ")
398 ACE_TEXT ("show this message\n"),
399 argv[0]));
401 return -1;
404 return 0;
407 void
408 MCT_Config::dump (void) const
410 ACE_DEBUG ((LM_DEBUG, ACE_BEGIN_DUMP, this));
411 ACE_DEBUG ((LM_DEBUG, ACE_TEXT (" Dumping MCT_Config\n")));
412 ACE_DEBUG ((LM_DEBUG,
413 ACE_TEXT ("\tIP_MAX_MEMBERSHIPS = %d\n"),
414 IP_MAX_MEMBERSHIPS));
415 ACE_DEBUG ((LM_DEBUG,
416 ACE_TEXT ("\tgroups_ = %d\n"),
417 this->groups_));
418 ACE_DEBUG ((LM_DEBUG,
419 ACE_TEXT ("\trole_ = %s\n"),
420 (ACE_BIT_ENABLED (this->role_, PRODUCER)
421 && ACE_BIT_ENABLED (this->role_, CONSUMER))
422 ? ACE_TEXT ("PRODUCER/CONSUMER")
423 : ACE_BIT_ENABLED (this->role_, PRODUCER)
424 ? ACE_TEXT ("PRODUCER")
425 : ACE_TEXT ("CONSUMER")));
426 ACE_DEBUG ((LM_DEBUG,
427 ACE_TEXT ("\tsdm_options_ = %d\n"),
428 this->sdm_opts_));
429 ACE_DEBUG ((LM_DEBUG,
430 ACE_TEXT ("\titerations_ = %d\n"),
431 this->iterations_));
432 ACE_DEBUG ((LM_DEBUG,
433 ACE_TEXT ("\tttl_ = %d\n"),
434 this->ttl_));
435 ACE_DEBUG ((LM_DEBUG,
436 ACE_TEXT ("\twait_ = %d\n"),
437 this->wait_));
438 // Note that this call to get_host_addr is the non-reentrant
439 // version, but it's okay for us.
440 ACE_DEBUG ((LM_DEBUG,
441 ACE_TEXT ("\tgroups_start_ = %s:%d\n"),
442 this->group_start_.get_host_addr (),
443 this->group_start_.get_port_number ()));
445 ACE_DEBUG ((LM_DEBUG, ACE_END_DUMP));
449 MCT_Config::set_group (int port, const char *group)
451 return group_start_.set (port, group);
454 /******************************************************************************/
456 class MCT_Event_Handler : public ACE_Event_Handler
458 public:
459 MCT_Event_Handler (ACE_SOCK_Dgram_Mcast::options options
460 = ACE_SOCK_Dgram_Mcast::DEFOPTS);
461 virtual ~MCT_Event_Handler (void);
463 int join (const ACE_INET_Addr &mcast_addr,
464 int reuse_addr = 1,
465 const ACE_TCHAR *net_if = 0);
466 int leave (const ACE_INET_Addr &mcast_addr,
467 const ACE_TCHAR *net_if = 0);
469 // = Event Handler hooks.
470 virtual int handle_input (ACE_HANDLE handle);
471 virtual int handle_close (ACE_HANDLE fd, ACE_Reactor_Mask close_mask);
473 virtual ACE_HANDLE get_handle (void) const;
475 protected:
476 ACE_SOCK_Dgram_Mcast *mcast (void);
477 int find (const char *buf);
479 private:
480 ACE_SOCK_Dgram_Mcast mcast_;
482 // List of groups we've joined
483 ACE_Vector<ACE_CString*> address_vec_;
485 // Flag used to set the 'finished' flag when the last event handler
486 // gets removed from the reactor.
487 static ACE_Atomic_Op<ACE_SYNCH_MUTEX, long> active_handlers_;
490 ACE_Atomic_Op<ACE_SYNCH_MUTEX, long> MCT_Event_Handler::active_handlers_ = 0;
492 MCT_Event_Handler::MCT_Event_Handler (ACE_SOCK_Dgram_Mcast::options options)
493 : mcast_ (options)
495 // Increment the number of active handlers in the reactor. Note this isn't
496 // really correct, but it should work for our simple example.
497 ++MCT_Event_Handler::active_handlers_;
500 MCT_Event_Handler::~MCT_Event_Handler (void)
502 size_t size = this->address_vec_.size ();
503 for (size_t i = 0; i < size; ++i)
505 delete this->address_vec_[i];
506 this->address_vec_[i] = 0;
511 ACE_SOCK_Dgram_Mcast *
512 MCT_Event_Handler::mcast (void)
514 return &this->mcast_;
518 MCT_Event_Handler::find (const char *buf)
520 size_t size = this->address_vec_.size ();
521 size_t i;
522 for (i = 0; i < size; ++i)
524 if (ACE_OS::strcasecmp (buf, this->address_vec_[i]->c_str ()) == 0)
525 return 0;
528 // Not found, so output message we received along with a list of groups
529 // we've joined for debugging.
530 ACE_CString local;
531 for (i = 0; i < size; ++i)
533 local += "\t";
534 local += this->address_vec_[i]->c_str ();
535 local += "\n";
537 ACE_DEBUG ((LM_DEBUG, ACE_TEXT ("%C not in:\n%C"),
538 buf, local.c_str ()));
540 return -1;
545 MCT_Event_Handler::join (const ACE_INET_Addr &mcast_addr,
546 int reuse_addr,
547 const ACE_TCHAR *net_if)
549 char buf[MAX_STRING_SIZE];
550 ACE_OS::snprintf (buf, MAX_STRING_SIZE, "%s/%d",
551 mcast_addr.get_host_addr (),
552 mcast_addr.get_port_number ());
554 if (this->mcast_.join (mcast_addr, reuse_addr, net_if) == -1)
555 ACE_ERROR_RETURN ((LM_ERROR,
556 ACE_TEXT ("MCT_Event_Handler::join %C %p\n"),
557 buf,
558 ACE_TEXT ("failed")),
559 -1);
560 ACE_DEBUG ((LM_DEBUG, ACE_TEXT ("Joined %C\n"), buf));
562 ACE_CString *str = 0;
563 ACE_NEW_RETURN (str, ACE_CString (buf), -1);
564 this->address_vec_.push_back (str);
565 return 0;
569 MCT_Event_Handler::leave (const ACE_INET_Addr &mcast_addr,
570 const ACE_TCHAR *net_if)
572 if (this->mcast_.leave (mcast_addr, net_if) == 0)
574 char buf[MAX_STRING_SIZE];
575 size_t size = this->address_vec_.size ();
576 for (size_t i = 0; i < size; ++i)
578 ACE_OS::snprintf (buf, MAX_STRING_SIZE, "%s/%d",
579 mcast_addr.get_host_addr (),
580 mcast_addr.get_port_number ());
581 if (ACE_OS::strcasecmp (buf, this->address_vec_[i]->c_str ()) == 0)
583 this->address_vec_[i]->set ("");
584 break;
587 return 0;
589 return -1;
593 MCT_Event_Handler::handle_input (ACE_HANDLE /*handle*/)
595 char buf[MAX_STRING_SIZE];
596 ACE_OS::memset (buf, 0, sizeof buf);
597 ACE_INET_Addr addr;
599 if (this->mcast ()->recv (buf, sizeof buf, addr) == -1)
601 ++error;
602 ACE_ERROR_RETURN ((LM_ERROR,
603 ACE_TEXT ("MCT_Event_Handler::handle_input - ")
604 ACE_TEXT ("calling recv\n")), -1);
607 // Zero length buffer means we are done.
608 if (ACE_OS::strlen (buf) == 0)
609 return -1;
610 else if (this->find (buf) == -1)
612 ++error;
613 ACE_DEBUG ((LM_ERROR,
614 ACE_TEXT ("MCT_Event_Handler::handle_input - ")
615 ACE_TEXT ("Received dgram for a group we didn't join ")
616 ACE_TEXT ("(%s)\n"),
617 buf));
619 return 0;
623 MCT_Event_Handler::handle_close (ACE_HANDLE /*fd*/,
624 ACE_Reactor_Mask /*close_mask*/)
626 // If this is the last handler, use the finished flag to signal
627 // the task to exit.
628 if (--MCT_Event_Handler::active_handlers_ == 0)
629 finished = 1;
631 // The DONT_CALL flag keeps the reactor from calling handle_close ()
632 // again, since we commit suicide below.
633 this->reactor ()->remove_handler (this,
634 ACE_Event_Handler::ALL_EVENTS_MASK |
635 ACE_Event_Handler::DONT_CALL);
636 this->reactor (0);
637 delete this;
638 return 0;
641 ACE_HANDLE
642 MCT_Event_Handler::get_handle (void) const
644 return this->mcast_.get_handle ();
647 /******************************************************************************/
650 * Our MCT_Task object will be an Active Object if we are running the Consumer
651 * side of the test. open() calls active() which creates a thread and calls
652 * the svc() method that calls runs the reactor event loop.
654 class MCT_Task : public ACE_Task<ACE_NULL_SYNCH>
656 public:
657 MCT_Task (const MCT_Config &config,
658 ACE_Reactor *reactor = ACE_Reactor::instance ());
659 ~MCT_Task (void);
661 //FUZZ: disable check_for_lack_ACE_OS
662 // = Task hooks.
663 virtual int open (void *args = 0);
664 virtual int svc (void);
665 //FUZZ: enable check_for_lack_ACE_OS
667 private:
668 const MCT_Config &config_;
671 MCT_Task::MCT_Task (const MCT_Config &config,
672 ACE_Reactor *reactor)
673 : config_ (config)
675 this->reactor (reactor);
678 MCT_Task::~MCT_Task (void)
682 MCT_Task::open (void *)
684 MCT_Event_Handler *handler;
686 ACE_INET_Addr addr = this->config_.group_start ();
687 int groups = this->config_.groups ();
688 for (int i = 0; i < groups; ++i)
690 ACE_NEW_RETURN (handler,
691 MCT_Event_Handler (this->config_.options ()), -1);
692 // We subscribe to all groups for the first one and one each for
693 // all the others.
694 if (i == 0)
696 // go ahead and hide the other one since we want our own.
697 ACE_INET_Addr addr = this->config_.group_start ();
698 for (int j = 0; j < groups; ++j)
700 // If OPT_BINDADDR_YES is set, this will fail after the first
701 // join, so just break and keep on going, otherwise it's a
702 // real error.
703 if (j > 0
704 && ACE_BIT_ENABLED (ACE_SOCK_Dgram_Mcast::OPT_BINDADDR_YES,
705 this->config_.options ()))
706 break;
708 if (handler->join (addr) == -1)
709 ACE_ERROR_RETURN ((LM_ERROR,
710 ACE_TEXT ("MCT_Task::open - join error\n")),
711 -1);
712 advance_addr (addr);
715 else
717 if (handler->join (addr) == -1)
718 ACE_ERROR_RETURN ((LM_ERROR,
719 ACE_TEXT ("MCT_Task::open - join error\n")),
720 -1);
723 advance_addr (addr);
725 if (this->reactor ()->register_handler (handler, READ_MASK) == -1)
726 ACE_ERROR_RETURN ((LM_ERROR,
727 ACE_TEXT ("MCT_Task::open - cannot register ")
728 ACE_TEXT ("handler\n")),
729 -1);
732 if (this->activate (THR_NEW_LWP) == -1)
733 ACE_ERROR_RETURN ((LM_ERROR,
734 ACE_TEXT ("%p\n"),
735 ACE_TEXT ("MCT_TASK:open - activate failed")),
736 -1);
737 return 0;
741 MCT_Task::svc (void)
743 // make sure this thread owns the reactor or handle_events () won't do
744 // anything.
745 this->reactor ()->owner (ACE_Thread::self ());
747 // loop and call handle_events...
748 while (!finished)
749 this->reactor ()->handle_events ();
751 return 0;
754 /******************************************************************************/
756 int send_dgram (ACE_SOCK_Dgram &socket, ACE_INET_Addr addr, int done = 0)
759 // Send each message twice, once to the right port, and once to the "wrong"
760 // port. This helps generate noise and lets us see if port filtering is
761 // working properly.
762 const char *address = addr.get_host_addr ();
763 int port = addr.get_port_number ();
765 for (int i = 0; i < 2; ++i)
767 char buf[MAX_STRING_SIZE];
768 if (done)
769 buf[0] = 0;
770 else
771 ACE_OS::snprintf (buf, MAX_STRING_SIZE, "%s/%d", address, port);
772 //ACE_DEBUG ((LM_DEBUG, ACE_TEXT ("sending (%s)\n"), buf));
773 if (socket.send (buf, ACE_OS::strlen (buf),addr) == -1)
774 ACE_ERROR_RETURN ((LM_ERROR, ACE_TEXT ("%p\n"),
775 ACE_TEXT ("send_dgram - error calling send on ")
776 ACE_TEXT ("ACE_SOCK_Dgram.")), -1);
777 addr.set_port_number (++port);
779 return 0;
782 int producer (MCT_Config &config)
784 int retval = 0;
786 //FUZZ: disable check_for_lack_ACE_OS
787 ACE_DEBUG ((LM_INFO, ACE_TEXT ("Starting producer...\n")));
788 ACE_SOCK_Dgram socket (ACE_sap_any_cast (ACE_INET_Addr &));
789 //FUZZ: enable check_for_lack_ACE_OS
791 // set the TTL or hop count based on the config.ttl () value
792 if (config.ttl () > 1 && config.group_start().get_type() == AF_INET)
794 int ttl = config.ttl ();
795 if (socket.set_option (IPPROTO_IP,
796 IP_MULTICAST_TTL,
797 (void*) &ttl,
798 sizeof ttl) != 0)
799 ACE_DEBUG ((LM_ERROR,
800 ACE_TEXT ("could net set socket option IP_MULTICAST_TTL ")
801 ACE_TEXT ("= %d\n"),
802 ttl));
803 else
804 ACE_DEBUG ((LM_INFO, ACE_TEXT ("set IP_MULTICAST_TTL = %d\n"), ttl));
806 #if defined (ACE_HAS_IPV6)
807 else
809 // for IPv6, a hop limit is used instead of TTL
810 int hops = config.ttl ();
811 if (socket.set_option (IPPROTO_IPV6,
812 IPV6_MULTICAST_HOPS,
813 (void*) &hops,
814 sizeof hops) != 0)
815 ACE_DEBUG ((LM_ERROR,
816 ACE_TEXT ("could net set socket option IPV6_MULTICAST_HOPS")
817 ACE_TEXT (" = %d\n"),
818 hops));
819 else
820 ACE_DEBUG ((LM_INFO, ACE_TEXT ("set IPV6_MULTICAST_HOPS = %d\n"),
821 hops));
824 // Turn on multicast loopback since the test relies on it and the
825 // ACE_SOCK_Dgram_Mcast documents the loopback state as indeterminate.
826 int do_loopback = 1;
827 if (socket.set_option (IPPROTO_IPV6,
828 IPV6_MULTICAST_LOOP,
829 (void *)&do_loopback,
830 sizeof (do_loopback)) == -1)
832 if (errno == ENOTSUP)
833 ACE_DEBUG ((LM_INFO,
834 ACE_TEXT ("IPV6_MULTICAST_LOOP not supported\n")));
835 else
836 ACE_ERROR ((LM_ERROR, ACE_TEXT ("%p\n"),
837 ACE_TEXT ("Can't set IPV6_MULTICAST_LOOP")));
839 #endif /* ACE_HAS_IPV6 */
842 int iterations = config.iterations ();
843 // we add an extra 5 groups for noise.
844 int groups = config.groups () + 5;
845 for (int i = 0; (i < iterations || iterations == 0) && !finished; ++i)
847 ACE_INET_Addr addr = config.group_start ();
848 for (int j = 0; j < groups && !finished; ++j)
850 if ((retval += send_dgram (socket, addr,
851 ((i + 1) == iterations))) == -1)
852 ACE_ERROR ((LM_ERROR, ACE_TEXT ("Calling send_dgram.\n")));
853 if ((retval += advance_addr (addr)) == -1)
854 ACE_ERROR ((LM_ERROR,
855 ACE_TEXT ("Calling advance_addr.\n")));
857 // Give the task thread a chance to run.
858 ACE_Thread::yield ();
860 return retval;
864 * Advance the address by 1, e.g., 239.255.0.1 => 239.255.0.2
865 * Note that the algorithm is somewhat simplistic, but sufficient for our
866 * purpose.
868 int advance_addr (ACE_INET_Addr &addr)
870 int a, b, c, d;
871 if (addr.get_type () == AF_INET)
873 ::sscanf (addr.get_host_addr (), "%d.%d.%d.%d", &a, &b, &c, &d);
874 if (d < 255)
875 ++d;
876 else if (c < 255)
878 d = 1;
879 ++c;
881 else if (b < 255)
883 d = 1;
884 c = 0;
885 ++b;
887 else if (a < 239)
889 d = 1;
890 c = 0;
891 b = 0;
892 ++a;
894 else
895 ACE_ERROR_RETURN ((LM_ERROR,
896 ACE_TEXT ("advance_addr - Cannot advance multicast ")
897 ACE_TEXT ("group address past %s\n"),
898 addr.get_host_addr ()),
899 -1);
901 ACE_TCHAR buf[MAX_STRING_SIZE];
902 ACE_OS::snprintf (buf, MAX_STRING_SIZE, ACE_TEXT ("%d.%d.%d.%d:%d"),
903 a, b, c, d, addr.get_port_number ());
904 addr.set (buf);
905 return 0;
907 #if defined (ACE_HAS_IPV6)
908 else // assume AF_INET6
910 sockaddr_in6 *saddr = reinterpret_cast<sockaddr_in6 *> (addr.get_addr ());
911 unsigned char *sin6_addr = reinterpret_cast<unsigned char *> (&saddr->sin6_addr);
912 int i = 15;
914 // i >= 2 is used here so that the flags and scope for the
915 // multicast address are not changed
916 while (i >= 2 && sin6_addr[i] == 0xff)
918 sin6_addr[i] = 0;
919 i--;
922 if (i >= 2)
924 sin6_addr[i]++;
926 else
928 ACE_ERROR_RETURN ((LM_ERROR,
929 ACE_TEXT ("advance_addr - Cannot advance ")
930 ACE_TEXT ("multicast group address past %s\n"),
931 addr.get_host_addr ()),
932 -1);
936 #endif /* ACE_HAS_IPV6 */
938 return 0;
942 run_main (int argc, ACE_TCHAR *argv[])
944 int retval = 0;
945 MCT_Config config;
946 retval = config.open (argc, argv);
947 if (retval != 0)
948 return 1;
950 const ACE_TCHAR *temp = ACE_TEXT ("Multicast_Test_IPV6");
951 ACE_TString test = temp;
953 u_long role = config.role ();
954 if (ACE_BIT_DISABLED (role, MCT_Config::PRODUCER)
955 || ACE_BIT_DISABLED (role, MCT_Config::CONSUMER))
957 if (ACE_BIT_ENABLED (role, MCT_Config::PRODUCER))
958 test += ACE_TEXT ("-PRODUCER");
959 else
960 test += ACE_TEXT ("-CONSUMER");
963 // Start test only if options are valid.
964 ACE_START_TEST (test.c_str ());
966 #if defined (ACE_HAS_IPV6)
968 # if !defined (ACE_LACKS_UNIX_SIGNALS)
969 // Register a signal handler to close down application gracefully.
970 ACE_Sig_Action sa ((ACE_SignalHandler) handler, SIGINT);
971 # endif
973 // Dump the configuration info to the log if caller passed debug option.
974 if (config.debug ())
975 config.dump ();
977 ACE_Reactor *reactor = ACE_Reactor::instance ();
979 MCT_Task *task = new MCT_Task (config, reactor);
981 if (ACE_BIT_ENABLED (role, MCT_Config::CONSUMER))
983 ACE_DEBUG ((LM_INFO, ACE_TEXT ("Starting consumer...\n")));
984 // Open makes it an active object.
985 retval += task->open ();
988 // now produce the datagrams...
989 if (ACE_BIT_ENABLED (role, MCT_Config::PRODUCER))
990 retval += producer (config);
992 if (ACE_BIT_ENABLED (role, MCT_Config::CONSUMER))
994 // and wait for everything to finish
995 ACE_DEBUG ((LM_INFO,
996 ACE_TEXT ("start waiting for consumer to finish...\n")));
997 // Wait for the threads to exit.
998 // But, wait for a limited time since we could hang if the last udp
999 // message isn't received.
1000 ACE_Time_Value max_wait ( config.wait ()/* seconds */);
1001 ACE_Time_Value wait_time (ACE_OS::gettimeofday () + max_wait);
1002 ACE_Time_Value *ptime = ACE_BIT_ENABLED (role, MCT_Config::PRODUCER)
1003 ? &wait_time : 0;
1004 if (ACE_Thread_Manager::instance ()->wait (ptime) == -1)
1006 if (errno == ETIME)
1007 ACE_ERROR ((LM_ERROR,
1008 ACE_TEXT ("maximum wait time of %d msec exceeded\n"),
1009 max_wait.msec ()));
1010 else
1011 ACE_OS::perror (ACE_TEXT ("wait"));
1013 ++error;
1017 delete task;
1018 #endif /* ACE_HAS_IPV6 */
1019 ACE_END_TEST;
1020 return (retval == 0 && error == 0) ? 0 : 1;
1023 #else
1025 run_main (int, ACE_TCHAR *[])
1027 ACE_START_TEST (ACE_TEXT ("Multicast_Test_IPV6"));
1029 ACE_ERROR ((LM_INFO,
1030 ACE_TEXT ("This test must be run on a platform ")
1031 ACE_TEXT ("that support IP multicast and threads.\n")));
1033 ACE_END_TEST;
1035 return 0;
1037 #endif /* ACE_HAS_IP_MULTICAST && ACE_HAS_THREADS */