1 // ============================================================================
2 //FUZZ: disable check_for_lack_ACE_OS
4 * @file Multicast_Test_IPV6.cpp
6 * @brief This program tests ACE_SOCK_Dgram_Mcast class.
8 * It specifically tests subscribing to multiple groups on the same
9 * socket on one or more physical interfaces (if available).
11 * The test can be run as a producer, consumer, or both
12 * producer/consumer (default). The test requires at least two (2)
13 * multicast groups which can be configured as command line options.
14 * The consumer subscribes to a single group per instance and an
15 * additional instance tries to subscribe to all groups on a single
16 * socket (if the ACE_SOCK_Dgram_Mcast instance bind()'s the first
17 * address to the socket, additional joins will fail). The producer
18 * iterates through the list of group addresses and sends a single
19 * message containing the destination address and port to each one. It
20 * also sends messages to five (5) additional groups and a message to
21 * an additional port for each group in order to produce a bit of
22 * "noise" in order to help validate how well the multicast filtering
23 * works on a particular platform.
25 * The list of destination groups start at ff01::1 (default) and
26 * increment by 1 up to 5 (default) groups. Both of these values, as
27 * well as others, can be overridden via command-line options. Use
28 * the -? option to display the usage message...
30 * @author Don Hinton <dhinton@dresystems.com>
31 * Brian Buesker <bbuesker@qualcomm.com>
33 // ============================================================================
34 //FUZZ: enable check_for_lack_ACE_OS
36 #include "test_config.h"
37 #include "ace/Get_Opt.h"
38 #include "ace/Vector_T.h"
39 #include "ace/SOCK_Dgram_Mcast.h"
41 #include "ace/Reactor.h"
42 #include "ace/OS_NS_string.h"
43 #include "ace/OS_NS_strings.h"
45 #include "ace/Atomic_Op.h"
46 #include "ace/SString.h"
47 #include "ace/Signal.h"
48 #include "ace/Min_Max.h"
50 #if defined (ACE_HAS_IP_MULTICAST) && defined (ACE_HAS_THREADS)
53 * The 'finished' flag is used to break out of an infinite loop in the
54 * task::svc () method. The 'handler' will set the flag in respose to
57 static sig_atomic_t finished
= 0;
58 extern "C" void handler (int)
63 static const int MCT_ITERATIONS
= 10;
64 static const int MCT_GROUPS
= 5;
65 static const int MCT_MIN_GROUPS
= 2;
67 #if defined (ACE_HAS_IPV6)
68 static const char MCT_START_GROUP
[] = "ff01::1";
70 // an IPv4 address that will ensure an error message is not printed when
71 // IPv6 is not enabled
72 static const char MCT_START_GROUP
[] = "239.255.0.1";
73 #endif /* ACE_HAS_IPV6 */
74 static const int MCT_START_PORT
= 16000;
76 static const size_t MAX_STRING_SIZE
= 200;
78 int advance_addr (ACE_INET_Addr
&addr
);
80 // Keep track of errors so we can report them on exit.
81 static sig_atomic_t error
= 0;
84 * MCast_Config holds configuration data for this test.
93 BOTH
= PRODUCER
| CONSUMER
97 : group_start_ (MCT_START_PORT
, MCT_START_GROUP
),
101 sdm_opts_ (ACE_SOCK_Dgram_Mcast::DEFOPTS
),
102 iterations_ (MCT_ITERATIONS
),
106 if (IP_MAX_MEMBERSHIPS
== 0)
107 this->groups_
= MCT_GROUPS
;
109 this->groups_
= ACE_MIN (IP_MAX_MEMBERSHIPS
, MCT_GROUPS
);
115 //FUZZ: disable check_for_lack_ACE_OS
116 int open (int argc
, ACE_TCHAR
*argv
[]);
117 //FUZZ: enable check_for_lack_ACE_OS
119 int debug () const { return this->debug_
;}
121 int groups () const { return this->groups_
;}
122 const ACE_INET_Addr
group_start () const { return this->group_start_
;}
123 u_long
role () const { return this->role_
;}
124 int iterations () const { return this->iterations_
;}
125 int ttl () const { return this->ttl_
;}
127 //FUZZ: disable check_for_lack_ACE_OS
128 int wait () const { return this->wait_
;}
129 //FUZZ: enable check_for_lack_ACE_OS
131 ACE_SOCK_Dgram_Mcast::options
options () const
133 return static_cast<ACE_SOCK_Dgram_Mcast::options
> (this->sdm_opts_
);
136 int set_group (int port
, const char *group
);
139 // Starting group address.
140 ACE_INET_Addr group_start_
;
142 // Number of groups we will try to use in the test.
148 // Role, i.e., PRODUCER, CONSUMER, BOTH: defaults to BOTH
151 // ACE_SOCK_Dgram_Mcast ctor options
154 // Producer iterations
157 // TTL, time to live, for use over routers.
160 // Time to wait on CONSUMER threads to end before killing test.
165 MCT_Config::open (int argc
, ACE_TCHAR
*argv
[])
170 //FUZZ: disable check_for_lack_ACE_OS
171 ACE_Get_Opt
getopt (argc
, argv
, ACE_TEXT (":?"), 1, 1);
172 //FUZZ: enable check_for_lack_ACE_OS
174 if (getopt
.long_option (ACE_TEXT ("GroupStart"),
176 ACE_Get_Opt::ARG_REQUIRED
) != 0)
177 ACE_ERROR_RETURN ((LM_ERROR
,
178 ACE_TEXT (" Unable to add GroupStart option.\n")),
181 if (getopt
.long_option (ACE_TEXT ("Groups"),
183 ACE_Get_Opt::ARG_REQUIRED
) != 0)
184 ACE_ERROR_RETURN ((LM_ERROR
,
185 ACE_TEXT (" Unable to add Groups option.\n")), 1);
187 if (getopt
.long_option (ACE_TEXT ("Debug"),
189 ACE_Get_Opt::NO_ARG
) != 0)
190 ACE_ERROR_RETURN ((LM_ERROR
,
191 ACE_TEXT (" Unable to add Debug option.\n")), 1);
193 if (getopt
.long_option (ACE_TEXT ("Role"),
195 ACE_Get_Opt::ARG_REQUIRED
) != 0)
196 ACE_ERROR_RETURN ((LM_ERROR
,
197 ACE_TEXT (" Unable to add Role option.\n")), 1);
199 if (getopt
.long_option (ACE_TEXT ("SDM_options"),
201 ACE_Get_Opt::ARG_REQUIRED
) != 0)
202 ACE_ERROR_RETURN ((LM_ERROR
,
203 ACE_TEXT (" Unable to add Multicast_Options option.\n")),
206 if (getopt
.long_option (ACE_TEXT ("Iterations"),
208 ACE_Get_Opt::ARG_REQUIRED
) != 0)
209 ACE_ERROR_RETURN ((LM_ERROR
,
210 ACE_TEXT (" Unable to add iterations option.\n")),
213 if (getopt
.long_option (ACE_TEXT ("TTL"),
215 ACE_Get_Opt::ARG_REQUIRED
) != 0)
216 ACE_ERROR_RETURN ((LM_ERROR
,
217 ACE_TEXT (" Unable to add TTL option.\n")),
220 if (getopt
.long_option (ACE_TEXT ("Wait"),
222 ACE_Get_Opt::ARG_REQUIRED
) != 0)
223 ACE_ERROR_RETURN ((LM_ERROR
,
224 ACE_TEXT (" Unable to add wait option.\n")),
227 if (getopt
.long_option (ACE_TEXT ("help"),
229 ACE_Get_Opt::NO_ARG
) != 0)
230 ACE_ERROR_RETURN ((LM_ERROR
,
231 ACE_TEXT (" Unable to add help option.\n")),
234 // Now, let's parse it...
237 //FUZZ: disable check_for_lack_ACE_OS
238 while ((c
= getopt ()) != EOF
)
240 //FUZZ: enable check_for_lack_ACE_OS
244 // Long Option. This should never happen.
249 // @todo validate all these, i.e., must be within range
250 // 224.255.0.0 to 238.255.255.255, but we only allow the
251 // administrative "site local" range, 239.255.0.0 to
253 ACE_TCHAR
*group
= getopt
.opt_arg ();
254 if (this->group_start_
.set (group
) != 0)
256 ACE_ERROR ((LM_ERROR
, ACE_TEXT ("Bad group address:%s\n"),
262 this->iterations_
= ACE_OS::atoi (getopt
.opt_arg ());
266 int n
= ACE_OS::atoi (getopt
.opt_arg ());
267 // I'm assuming 0 means unlimited, so just use whatever the
269 if (IP_MAX_MEMBERSHIPS
== 0)
272 this->groups_
= ACE_MIN (ACE_MAX (n
, MCT_MIN_GROUPS
),
281 ACE_TCHAR
*c
= getopt
.opt_arg ();
282 if (ACE_OS::strcasecmp (c
, ACE_TEXT ("CONSUMER")) == 0)
283 this->role_
= CONSUMER
;
284 else if (ACE_OS::strcasecmp (c
, ACE_TEXT ("PRODUCER")) == 0)
285 this->role_
= PRODUCER
;
295 //@todo add back OPT_BINDADDR_NO...
296 ACE_TCHAR
*c
= getopt
.opt_arg ();
297 if (ACE_OS::strcasecmp (c
, ACE_TEXT ("OPT_BINDADDR_YES")) == 0)
298 ACE_SET_BITS (this->sdm_opts_
,
299 ACE_SOCK_Dgram_Mcast::OPT_BINDADDR_YES
);
300 else if (ACE_OS::strcasecmp (c
, ACE_TEXT ("OPT_BINDADDR_NO")) == 0)
301 ACE_CLR_BITS (this->sdm_opts_
,
302 ACE_SOCK_Dgram_Mcast::OPT_BINDADDR_YES
);
303 else if (ACE_OS::strcasecmp (c
, ACE_TEXT ("DEFOPT_BINDADDR")) == 0)
305 ACE_CLR_BITS (this->sdm_opts_
,
306 ACE_SOCK_Dgram_Mcast::OPT_BINDADDR_YES
);
307 ACE_SET_BITS (this->sdm_opts_
,
308 ACE_SOCK_Dgram_Mcast::DEFOPT_BINDADDR
);
310 else if (ACE_OS::strcasecmp (c
, ACE_TEXT ("OPT_NULLIFACE_ALL")) == 0)
311 ACE_SET_BITS (this->sdm_opts_
,
312 ACE_SOCK_Dgram_Mcast::OPT_NULLIFACE_ALL
);
313 else if (ACE_OS::strcasecmp (c
, ACE_TEXT ("OPT_NULLIFACE_ONE")) == 0)
314 ACE_CLR_BITS (this->sdm_opts_
,
315 ACE_SOCK_Dgram_Mcast::OPT_NULLIFACE_ALL
);
316 else if (ACE_OS::strcasecmp (c
, ACE_TEXT ("DEFOPT_NULLIFACE")) == 0)
318 ACE_CLR_BITS (this->sdm_opts_
,
319 ACE_SOCK_Dgram_Mcast::OPT_NULLIFACE_ALL
);
320 ACE_SET_BITS (this->sdm_opts_
,
321 ACE_SOCK_Dgram_Mcast::DEFOPT_NULLIFACE
);
323 else if (ACE_OS::strcasecmp (c
, ACE_TEXT ("DEFOPTS")) == 0)
324 this->sdm_opts_
= ACE_SOCK_Dgram_Mcast::DEFOPTS
;
333 this->ttl_
= ACE_OS::atoi (getopt
.opt_arg ());
336 this->wait_
= ACE_OS::atoi (getopt
.opt_arg ());
339 // This means an option requiring an argument didn't have one.
340 ACE_ERROR ((LM_ERROR
,
341 ACE_TEXT (" Option '%c' requires an argument but ")
342 ACE_TEXT ("none was supplied\n"),
350 if (ACE_OS::strcmp (argv
[getopt
.opt_ind () - 1], ACE_TEXT ("-?")) != 0
351 && getopt
.opt_opt () != 'h')
352 // Don't allow unknown options.
353 ACE_ERROR ((LM_ERROR
,
354 ACE_TEXT (" Found an unknown option (%c) ")
355 ACE_TEXT ("we couldn't handle.\n"),
357 // getopt.last_option ())); //readd with "%s" when
358 // last_option() is available.
369 ACE_ERROR ((LM_ERROR
,
370 ACE_TEXT ("usage: %s [options]\n")
371 ACE_TEXT ("Options:\n")
372 ACE_TEXT (" -g {STRING} --GroupStart={STRING} ")
373 ACE_TEXT ("starting multicast group address\n")
375 ACE_TEXT ("(default=239.255.0.1:16000)\n")
376 ACE_TEXT (" -n {#} --Groups={#} ")
377 ACE_TEXT ("number of groups (default=5)\n")
378 ACE_TEXT (" -d --Debug ")
379 ACE_TEXT ("debug flag (default=off)\n")
380 ACE_TEXT (" -r {STRING} --Role={STRING} ")
381 ACE_TEXT ("role {PRODUCER|CONSUMER|BOTH}\n")
383 ACE_TEXT ("(default=BOTH)\n")
384 ACE_TEXT (" -m {STRING} --SDM_options={STRING} ")
385 ACE_TEXT ("ACE_SOCK_Dgram_Mcast ctor options\n")
387 ACE_TEXT ("(default=DEFOPTS)\n")
388 ACE_TEXT (" -i {#} --Iterations={#} ")
389 ACE_TEXT ("number of iterations (default=100)\n")
390 ACE_TEXT (" -t {#} --TTL={#} ")
391 ACE_TEXT ("time to live (default=1)\n")
392 ACE_TEXT (" -w {#} --Wait={#} ")
393 ACE_TEXT ("number of seconds to wait on CONSUMER\n")
395 ACE_TEXT ("(default=2)\n")
396 ACE_TEXT (" -h/? --help ")
397 ACE_TEXT ("show this message\n"),
407 MCT_Config::dump () const
409 ACE_DEBUG ((LM_DEBUG
, ACE_BEGIN_DUMP
, this));
410 ACE_DEBUG ((LM_DEBUG
, ACE_TEXT (" Dumping MCT_Config\n")));
411 ACE_DEBUG ((LM_DEBUG
,
412 ACE_TEXT ("\tIP_MAX_MEMBERSHIPS = %d\n"),
413 IP_MAX_MEMBERSHIPS
));
414 ACE_DEBUG ((LM_DEBUG
,
415 ACE_TEXT ("\tgroups_ = %d\n"),
417 ACE_DEBUG ((LM_DEBUG
,
418 ACE_TEXT ("\trole_ = %s\n"),
419 (ACE_BIT_ENABLED (this->role_
, PRODUCER
)
420 && ACE_BIT_ENABLED (this->role_
, CONSUMER
))
421 ? ACE_TEXT ("PRODUCER/CONSUMER")
422 : ACE_BIT_ENABLED (this->role_
, PRODUCER
)
423 ? ACE_TEXT ("PRODUCER")
424 : ACE_TEXT ("CONSUMER")));
425 ACE_DEBUG ((LM_DEBUG
,
426 ACE_TEXT ("\tsdm_options_ = %d\n"),
428 ACE_DEBUG ((LM_DEBUG
,
429 ACE_TEXT ("\titerations_ = %d\n"),
431 ACE_DEBUG ((LM_DEBUG
,
432 ACE_TEXT ("\tttl_ = %d\n"),
434 ACE_DEBUG ((LM_DEBUG
,
435 ACE_TEXT ("\twait_ = %d\n"),
437 // Note that this call to get_host_addr is the non-reentrant
438 // version, but it's okay for us.
439 ACE_DEBUG ((LM_DEBUG
,
440 ACE_TEXT ("\tgroups_start_ = %s:%d\n"),
441 this->group_start_
.get_host_addr (),
442 this->group_start_
.get_port_number ()));
444 ACE_DEBUG ((LM_DEBUG
, ACE_END_DUMP
));
448 MCT_Config::set_group (int port
, const char *group
)
450 return group_start_
.set (port
, group
);
453 /******************************************************************************/
455 class MCT_Event_Handler
: public ACE_Event_Handler
458 MCT_Event_Handler (ACE_SOCK_Dgram_Mcast::options options
459 = ACE_SOCK_Dgram_Mcast::DEFOPTS
);
460 ~MCT_Event_Handler () override
;
462 int join (const ACE_INET_Addr
&mcast_addr
,
464 const ACE_TCHAR
*net_if
= 0);
465 int leave (const ACE_INET_Addr
&mcast_addr
,
466 const ACE_TCHAR
*net_if
= 0);
468 // = Event Handler hooks.
469 int handle_input (ACE_HANDLE handle
) override
;
470 int handle_close (ACE_HANDLE fd
, ACE_Reactor_Mask close_mask
) override
;
472 ACE_HANDLE
get_handle () const override
;
475 ACE_SOCK_Dgram_Mcast
*mcast ();
476 int find (const char *buf
);
479 ACE_SOCK_Dgram_Mcast mcast_
;
481 // List of groups we've joined
482 ACE_Vector
<ACE_CString
*> address_vec_
;
484 // Flag used to set the 'finished' flag when the last event handler
485 // gets removed from the reactor.
486 static ACE_Atomic_Op
<ACE_SYNCH_MUTEX
, long> active_handlers_
;
489 ACE_Atomic_Op
<ACE_SYNCH_MUTEX
, long> MCT_Event_Handler::active_handlers_
= 0;
491 MCT_Event_Handler::MCT_Event_Handler (ACE_SOCK_Dgram_Mcast::options options
)
494 // Increment the number of active handlers in the reactor. Note this isn't
495 // really correct, but it should work for our simple example.
496 ++MCT_Event_Handler::active_handlers_
;
499 MCT_Event_Handler::~MCT_Event_Handler ()
501 size_t size
= this->address_vec_
.size ();
502 for (size_t i
= 0; i
< size
; ++i
)
504 delete this->address_vec_
[i
];
505 this->address_vec_
[i
] = 0;
510 ACE_SOCK_Dgram_Mcast
*
511 MCT_Event_Handler::mcast ()
513 return &this->mcast_
;
517 MCT_Event_Handler::find (const char *buf
)
519 size_t size
= this->address_vec_
.size ();
521 for (i
= 0; i
< size
; ++i
)
523 if (ACE_OS::strcasecmp (buf
, this->address_vec_
[i
]->c_str ()) == 0)
527 // Not found, so output message we received along with a list of groups
528 // we've joined for debugging.
530 for (i
= 0; i
< size
; ++i
)
533 local
+= this->address_vec_
[i
]->c_str ();
536 ACE_DEBUG ((LM_DEBUG
, ACE_TEXT ("%C not in:\n%C"),
537 buf
, local
.c_str ()));
544 MCT_Event_Handler::join (const ACE_INET_Addr
&mcast_addr
,
546 const ACE_TCHAR
*net_if
)
548 char buf
[MAX_STRING_SIZE
];
549 ACE_OS::snprintf (buf
, MAX_STRING_SIZE
, "%s/%d",
550 mcast_addr
.get_host_addr (),
551 mcast_addr
.get_port_number ());
553 if (this->mcast_
.join (mcast_addr
, reuse_addr
, net_if
) == -1)
554 ACE_ERROR_RETURN ((LM_ERROR
,
555 ACE_TEXT ("MCT_Event_Handler::join %C %p\n"),
557 ACE_TEXT ("failed")),
559 ACE_DEBUG ((LM_DEBUG
, ACE_TEXT ("Joined %C\n"), buf
));
561 ACE_CString
*str
= 0;
562 ACE_NEW_RETURN (str
, ACE_CString (buf
), -1);
563 this->address_vec_
.push_back (str
);
568 MCT_Event_Handler::leave (const ACE_INET_Addr
&mcast_addr
,
569 const ACE_TCHAR
*net_if
)
571 if (this->mcast_
.leave (mcast_addr
, net_if
) == 0)
573 char buf
[MAX_STRING_SIZE
];
574 size_t size
= this->address_vec_
.size ();
575 for (size_t i
= 0; i
< size
; ++i
)
577 ACE_OS::snprintf (buf
, MAX_STRING_SIZE
, "%s/%d",
578 mcast_addr
.get_host_addr (),
579 mcast_addr
.get_port_number ());
580 if (ACE_OS::strcasecmp (buf
, this->address_vec_
[i
]->c_str ()) == 0)
582 this->address_vec_
[i
]->set ("");
592 MCT_Event_Handler::handle_input (ACE_HANDLE
/*handle*/)
594 char buf
[MAX_STRING_SIZE
];
595 ACE_OS::memset (buf
, 0, sizeof buf
);
598 if (this->mcast ()->recv (buf
, sizeof buf
, addr
) == -1)
601 ACE_ERROR_RETURN ((LM_ERROR
,
602 ACE_TEXT ("MCT_Event_Handler::handle_input - ")
603 ACE_TEXT ("calling recv\n")), -1);
606 // Zero length buffer means we are done.
607 if (ACE_OS::strlen (buf
) == 0)
609 else if (this->find (buf
) == -1)
612 ACE_DEBUG ((LM_ERROR
,
613 ACE_TEXT ("MCT_Event_Handler::handle_input - ")
614 ACE_TEXT ("Received dgram for a group we didn't join ")
622 MCT_Event_Handler::handle_close (ACE_HANDLE
/*fd*/,
623 ACE_Reactor_Mask
/*close_mask*/)
625 // If this is the last handler, use the finished flag to signal
627 if (--MCT_Event_Handler::active_handlers_
== 0)
630 // The DONT_CALL flag keeps the reactor from calling handle_close ()
631 // again, since we commit suicide below.
632 this->reactor ()->remove_handler (this,
633 ACE_Event_Handler::ALL_EVENTS_MASK
|
634 ACE_Event_Handler::DONT_CALL
);
641 MCT_Event_Handler::get_handle () const
643 return this->mcast_
.get_handle ();
646 /******************************************************************************/
649 * Our MCT_Task object will be an Active Object if we are running the Consumer
650 * side of the test. open() calls active() which creates a thread and calls
651 * the svc() method that calls runs the reactor event loop.
653 class MCT_Task
: public ACE_Task
<ACE_NULL_SYNCH
>
656 MCT_Task (const MCT_Config
&config
,
657 ACE_Reactor
*reactor
= ACE_Reactor::instance ());
658 ~MCT_Task () override
;
660 //FUZZ: disable check_for_lack_ACE_OS
662 int open (void *args
= 0) override
;
664 //FUZZ: enable check_for_lack_ACE_OS
667 const MCT_Config
&config_
;
670 MCT_Task::MCT_Task (const MCT_Config
&config
,
671 ACE_Reactor
*reactor
)
674 this->reactor (reactor
);
677 MCT_Task::~MCT_Task ()
681 MCT_Task::open (void *)
683 MCT_Event_Handler
*handler
;
685 ACE_INET_Addr addr
= this->config_
.group_start ();
686 int groups
= this->config_
.groups ();
687 for (int i
= 0; i
< groups
; ++i
)
689 ACE_NEW_RETURN (handler
,
690 MCT_Event_Handler (this->config_
.options ()), -1);
691 // We subscribe to all groups for the first one and one each for
695 // go ahead and hide the other one since we want our own.
696 ACE_INET_Addr addr
= this->config_
.group_start ();
697 for (int j
= 0; j
< groups
; ++j
)
699 // If OPT_BINDADDR_YES is set, this will fail after the first
700 // join, so just break and keep on going, otherwise it's a
703 && ACE_BIT_ENABLED (ACE_SOCK_Dgram_Mcast::OPT_BINDADDR_YES
,
704 this->config_
.options ()))
707 if (handler
->join (addr
) == -1)
708 ACE_ERROR_RETURN ((LM_ERROR
,
709 ACE_TEXT ("MCT_Task::open - join error\n")),
716 if (handler
->join (addr
) == -1)
717 ACE_ERROR_RETURN ((LM_ERROR
,
718 ACE_TEXT ("MCT_Task::open - join error\n")),
724 if (this->reactor ()->register_handler (handler
, READ_MASK
) == -1)
725 ACE_ERROR_RETURN ((LM_ERROR
,
726 ACE_TEXT ("MCT_Task::open - cannot register ")
727 ACE_TEXT ("handler\n")),
731 if (this->activate (THR_NEW_LWP
) == -1)
732 ACE_ERROR_RETURN ((LM_ERROR
,
734 ACE_TEXT ("MCT_TASK:open - activate failed")),
742 // make sure this thread owns the reactor or handle_events () won't do
744 this->reactor ()->owner (ACE_Thread::self ());
746 // loop and call handle_events...
748 this->reactor ()->handle_events ();
753 /******************************************************************************/
755 int send_dgram (ACE_SOCK_Dgram
&socket
, ACE_INET_Addr addr
, int done
= 0)
757 // Send each message twice, once to the right port, and once to the "wrong"
758 // port. This helps generate noise and lets us see if port filtering is
760 const char *address
= addr
.get_host_addr ();
761 int port
= addr
.get_port_number ();
763 for (int i
= 0; i
< 2; ++i
)
765 char buf
[MAX_STRING_SIZE
];
769 ACE_OS::snprintf (buf
, MAX_STRING_SIZE
, "%s/%d", address
, port
);
770 //ACE_DEBUG ((LM_DEBUG, ACE_TEXT ("sending (%s)\n"), buf));
771 if (socket
.send (buf
, ACE_OS::strlen (buf
),addr
) == -1)
772 ACE_ERROR_RETURN ((LM_ERROR
, ACE_TEXT ("%p\n"),
773 ACE_TEXT ("send_dgram - error calling send on ")
774 ACE_TEXT ("ACE_SOCK_Dgram.")), -1);
775 addr
.set_port_number (++port
);
780 int producer (MCT_Config
&config
)
784 //FUZZ: disable check_for_lack_ACE_OS
785 ACE_DEBUG ((LM_INFO
, ACE_TEXT ("Starting producer...\n")));
786 ACE_SOCK_Dgram
socket (ACE_sap_any_cast (ACE_INET_Addr
&));
787 //FUZZ: enable check_for_lack_ACE_OS
789 // set the TTL or hop count based on the config.ttl () value
790 if (config
.ttl () > 1 && config
.group_start().get_type() == AF_INET
)
792 int ttl
= config
.ttl ();
793 if (socket
.set_option (IPPROTO_IP
,
797 ACE_DEBUG ((LM_ERROR
,
798 ACE_TEXT ("could net set socket option IP_MULTICAST_TTL ")
802 ACE_DEBUG ((LM_INFO
, ACE_TEXT ("set IP_MULTICAST_TTL = %d\n"), ttl
));
804 #if defined (ACE_HAS_IPV6)
807 // for IPv6, a hop limit is used instead of TTL
808 int hops
= config
.ttl ();
809 if (socket
.set_option (IPPROTO_IPV6
,
813 ACE_DEBUG ((LM_ERROR
,
814 ACE_TEXT ("could net set socket option IPV6_MULTICAST_HOPS")
815 ACE_TEXT (" = %d\n"),
818 ACE_DEBUG ((LM_INFO
, ACE_TEXT ("set IPV6_MULTICAST_HOPS = %d\n"),
822 // Turn on multicast loopback since the test relies on it and the
823 // ACE_SOCK_Dgram_Mcast documents the loopback state as indeterminate.
825 if (socket
.set_option (IPPROTO_IPV6
,
827 (void *)&do_loopback
,
828 sizeof (do_loopback
)) == -1)
830 if (errno
== ENOTSUP
)
832 ACE_TEXT ("IPV6_MULTICAST_LOOP not supported\n")));
834 ACE_ERROR ((LM_ERROR
, ACE_TEXT ("%p\n"),
835 ACE_TEXT ("Can't set IPV6_MULTICAST_LOOP")));
837 #endif /* ACE_HAS_IPV6 */
840 int iterations
= config
.iterations ();
841 // we add an extra 5 groups for noise.
842 int groups
= config
.groups () + 5;
843 for (int i
= 0; (i
< iterations
|| iterations
== 0) && !finished
; ++i
)
845 ACE_INET_Addr addr
= config
.group_start ();
846 for (int j
= 0; j
< groups
&& !finished
; ++j
)
848 if ((retval
+= send_dgram (socket
, addr
,
849 ((i
+ 1) == iterations
))) == -1)
850 ACE_ERROR ((LM_ERROR
, ACE_TEXT ("Calling send_dgram.\n")));
851 if ((retval
+= advance_addr (addr
)) == -1)
852 ACE_ERROR ((LM_ERROR
,
853 ACE_TEXT ("Calling advance_addr.\n")));
855 // Give the task thread a chance to run.
856 ACE_Thread::yield ();
862 * Advance the address by 1, e.g., 239.255.0.1 => 239.255.0.2
863 * Note that the algorithm is somewhat simplistic, but sufficient for our
866 int advance_addr (ACE_INET_Addr
&addr
)
869 if (addr
.get_type () == AF_INET
)
871 ::sscanf (addr
.get_host_addr (), "%d.%d.%d.%d", &a
, &b
, &c
, &d
);
893 ACE_ERROR_RETURN ((LM_ERROR
,
894 ACE_TEXT ("advance_addr - Cannot advance multicast ")
895 ACE_TEXT ("group address past %s\n"),
896 addr
.get_host_addr ()),
899 ACE_TCHAR buf
[MAX_STRING_SIZE
];
900 ACE_OS::snprintf (buf
, MAX_STRING_SIZE
, ACE_TEXT ("%d.%d.%d.%d:%d"),
901 a
, b
, c
, d
, addr
.get_port_number ());
905 #if defined (ACE_HAS_IPV6)
906 else // assume AF_INET6
908 sockaddr_in6
*saddr
= reinterpret_cast<sockaddr_in6
*> (addr
.get_addr ());
909 unsigned char *sin6_addr
= reinterpret_cast<unsigned char *> (&saddr
->sin6_addr
);
912 // i >= 2 is used here so that the flags and scope for the
913 // multicast address are not changed
914 while (i
>= 2 && sin6_addr
[i
] == 0xff)
926 ACE_ERROR_RETURN ((LM_ERROR
,
927 ACE_TEXT ("advance_addr - Cannot advance ")
928 ACE_TEXT ("multicast group address past %s\n"),
929 addr
.get_host_addr ()),
933 #endif /* ACE_HAS_IPV6 */
939 run_main (int argc
, ACE_TCHAR
*argv
[])
943 retval
= config
.open (argc
, argv
);
947 const ACE_TCHAR
*temp
= ACE_TEXT ("Multicast_Test_IPV6");
948 ACE_TString test
= temp
;
950 u_long role
= config
.role ();
951 if (ACE_BIT_DISABLED (role
, MCT_Config::PRODUCER
)
952 || ACE_BIT_DISABLED (role
, MCT_Config::CONSUMER
))
954 if (ACE_BIT_ENABLED (role
, MCT_Config::PRODUCER
))
955 test
+= ACE_TEXT ("-PRODUCER");
957 test
+= ACE_TEXT ("-CONSUMER");
960 // Start test only if options are valid.
961 ACE_START_TEST (test
.c_str ());
963 #if defined (ACE_HAS_IPV6)
965 # if !defined (ACE_LACKS_UNIX_SIGNALS)
966 // Register a signal handler to close down application gracefully.
967 ACE_Sig_Action
sa ((ACE_SignalHandler
) handler
, SIGINT
);
970 // Dump the configuration info to the log if caller passed debug option.
974 ACE_Reactor
*reactor
= ACE_Reactor::instance ();
976 MCT_Task
*task
= new MCT_Task (config
, reactor
);
978 if (ACE_BIT_ENABLED (role
, MCT_Config::CONSUMER
))
980 ACE_DEBUG ((LM_INFO
, ACE_TEXT ("Starting consumer...\n")));
981 // Open makes it an active object.
982 retval
+= task
->open ();
985 // now produce the datagrams...
986 if (ACE_BIT_ENABLED (role
, MCT_Config::PRODUCER
))
987 retval
+= producer (config
);
989 if (ACE_BIT_ENABLED (role
, MCT_Config::CONSUMER
))
991 // and wait for everything to finish
993 ACE_TEXT ("start waiting for consumer to finish...\n")));
994 // Wait for the threads to exit.
995 // But, wait for a limited time since we could hang if the last udp
996 // message isn't received.
997 ACE_Time_Value
max_wait ( config
.wait ()/* seconds */);
998 ACE_Time_Value
wait_time (ACE_OS::gettimeofday () + max_wait
);
999 ACE_Time_Value
*ptime
= ACE_BIT_ENABLED (role
, MCT_Config::PRODUCER
)
1001 if (ACE_Thread_Manager::instance ()->wait (ptime
) == -1)
1004 ACE_ERROR ((LM_ERROR
,
1005 ACE_TEXT ("maximum wait time of %d msec exceeded\n"),
1008 ACE_OS::perror (ACE_TEXT ("wait"));
1015 #endif /* ACE_HAS_IPV6 */
1017 return (retval
== 0 && error
== 0) ? 0 : 1;
1022 run_main (int, ACE_TCHAR
*[])
1024 ACE_START_TEST (ACE_TEXT ("Multicast_Test_IPV6"));
1026 ACE_ERROR ((LM_INFO
,
1027 ACE_TEXT ("This test must be run on a platform ")
1028 ACE_TEXT ("that support IP multicast and threads.\n")));
1034 #endif /* ACE_HAS_IP_MULTICAST && ACE_HAS_THREADS */