1 // ============================================================================
7 // This program tests ACE_SOCK_Dgram_Mcast class.
8 // It specifically tests subscribing to multiple groups on the same socket
9 // on one or more physical interfaces (if available).
11 // The test can be run as a producer, consumer, or both producer/consumer
12 // (default). The test requires at least two (2) multicast groups which can
13 // be configured as command line options. The consumer subscribes to a
14 // single group per instance and an additional instance tries to subscribe
15 // to all groups on a single socket (if the ACE_SOCK_Dgram_Mcast instance
16 // bind()'s the first address to the socket, additional joins will fail).
17 // The producer iterates through the list of group addresses and sends a
18 // single message containing the destination address and port to each one.
19 // It also sends messages to five (5) additional groups and a message to an
20 // additional port for each group to produce a bit of "noise", which
21 // helps validate how well the multicast filtering works on a
22 // particular platform.
24 // The list of destination groups starts at 239.255.0.1 (default) and
25 // increments by 1 up to 5 (default) groups. Both of these values, as well
26 // as others, can be overridden via command-line options. Use the -?
27 // option to display the usage message...
30 // Don Hinton <dhinton@dresystems.com>
32 // ============================================================================
34 #include "test_config.h"
35 #include "ace/Get_Opt.h"
36 #include "ace/Vector_T.h"
37 #include "ace/SOCK_Dgram_Mcast.h"
39 #include "ace/Reactor.h"
40 #include "ace/OS_NS_stdio.h"
41 #include "ace/OS_NS_string.h"
42 #include "ace/OS_NS_strings.h"
43 #include "ace/OS_NS_sys_time.h"
45 #include "ace/Atomic_Op.h"
46 #include "ace/SString.h"
47 #include "ace/Signal.h"
48 #include "ace/Min_Max.h"
50 #if defined (ACE_HAS_IP_MULTICAST) && defined (ACE_HAS_THREADS)
53 * The 'finished' flag is used to break out of an infinite loop in the
54 * task::svc () method. The 'handler' will set the flag in response to
57 static sig_atomic_t finished
= 0;
58 extern "C" void handler (int)
// Compiled-in defaults for the test configuration.
// Default number of producer iterations (initial value of iterations_).
63 static const int MCT_ITERATIONS
= 10;
// Default number of multicast groups; the config constructor caps this
// with IP_MAX_MEMBERSHIPS when that value is non-zero.
64 static const int MCT_GROUPS
= 5;
// Lower bound applied (via ACE_MAX) to a user-supplied group count.
65 static const int MCT_MIN_GROUPS
= 2;
// Default first group address and port; together they initialize
// group_start_.
67 static const char MCT_START_GROUP
[] = "239.255.0.1";
68 static const int MCT_START_PORT
= 16000;
// Size of the fixed character buffers used to format and receive
// "address/port" strings throughout this file.
70 static const size_t MAX_STRING_SIZE
= 200;
// Forward declaration; the definition appears later in this file and
// advances the address by 1, e.g., 239.255.0.1 => 239.255.0.2.
72 int advance_addr (ACE_INET_Addr
&addr
);
74 // Keep track of errors so we can report them on exit.
// run_main () returns 0 only when this flag (and its own retval) is 0.
75 static sig_atomic_t error
= 0;
78 * MCast_Config holds configuration data for this test.
88 BOTH
= PRODUCER
| CONSUMER
92 : group_start_ (MCT_START_PORT
, MCT_START_GROUP
),
96 sdm_opts_ (ACE_SOCK_Dgram_Mcast::DEFOPTS
),
97 iterations_ (MCT_ITERATIONS
),
101 if (IP_MAX_MEMBERSHIPS
== 0)
102 this->groups_
= MCT_GROUPS
;
104 this->groups_
= ACE_MIN (IP_MAX_MEMBERSHIPS
, MCT_GROUPS
);
109 //FUZZ: disable check_for_lack_ACE_OS
110 int open (int argc
, ACE_TCHAR
*argv
[]);
111 //FUZZ: enable check_for_lack_ACE_OS
113 int debug (void) const { return this->debug_
;}
114 void dump (void) const;
115 int groups (void) const { return this->groups_
;}
116 const ACE_INET_Addr
group_start (void) const { return this->group_start_
;}
117 u_long
role (void) const { return this->role_
;}
118 int iterations (void) const { return this->iterations_
;}
119 int ttl (void) const { return this->ttl_
;}
121 //FUZZ: disable check_for_lack_ACE_OS
122 int wait (void) const { return this->wait_
;}
123 //FUZZ: enable check_for_lack_ACE_OS
125 ACE_SOCK_Dgram_Mcast::options
options (void) const
127 return static_cast<ACE_SOCK_Dgram_Mcast::options
> (this->sdm_opts_
);
131 // Starting group address. (only IPv4 capable right now...)
132 ACE_INET_Addr group_start_
;
134 // Number of groups we will try to use in the test.
140 // Role, i.e., PRODUCER, CONSUMER, BOTH: defaults to BOTH
143 // ACE_SOCK_Dgram_Mcast ctor options
146 // Producer iterations
149 // TTL, time to live, for use over routers.
152 // Time to wait on CONSUMER threads to end before killing test.
157 MCT_Config::open (int argc
, ACE_TCHAR
*argv
[])
162 //FUZZ: disable check_for_lack_ACE_OS
163 ACE_Get_Opt
getopt (argc
, argv
, ACE_TEXT (":?"), 1, 1);
164 //FUZZ: enable check_for_lack_ACE_OS
166 if (getopt
.long_option (ACE_TEXT ("GroupStart"),
168 ACE_Get_Opt::ARG_REQUIRED
) != 0)
169 ACE_ERROR_RETURN ((LM_ERROR
,
170 ACE_TEXT (" Unable to add GroupStart option.\n")),
173 if (getopt
.long_option (ACE_TEXT ("Groups"),
175 ACE_Get_Opt::ARG_REQUIRED
) != 0)
176 ACE_ERROR_RETURN ((LM_ERROR
,
177 ACE_TEXT (" Unable to add Groups option.\n")), 1);
179 if (getopt
.long_option (ACE_TEXT ("Debug"),
181 ACE_Get_Opt::NO_ARG
) != 0)
182 ACE_ERROR_RETURN ((LM_ERROR
,
183 ACE_TEXT (" Unable to add Debug option.\n")), 1);
185 if (getopt
.long_option (ACE_TEXT ("Role"),
187 ACE_Get_Opt::ARG_REQUIRED
) != 0)
188 ACE_ERROR_RETURN ((LM_ERROR
,
189 ACE_TEXT (" Unable to add Role option.\n")), 1);
191 if (getopt
.long_option (ACE_TEXT ("SDM_options"),
193 ACE_Get_Opt::ARG_REQUIRED
) != 0)
194 ACE_ERROR_RETURN ((LM_ERROR
,
195 ACE_TEXT (" Unable to add Multicast_Options option.\n")),
198 if (getopt
.long_option (ACE_TEXT ("Iterations"),
200 ACE_Get_Opt::ARG_REQUIRED
) != 0)
201 ACE_ERROR_RETURN ((LM_ERROR
,
202 ACE_TEXT (" Unable to add iterations option.\n")),
205 if (getopt
.long_option (ACE_TEXT ("TTL"),
207 ACE_Get_Opt::ARG_REQUIRED
) != 0)
208 ACE_ERROR_RETURN ((LM_ERROR
,
209 ACE_TEXT (" Unable to add TTL option.\n")),
212 if (getopt
.long_option (ACE_TEXT ("Wait"),
214 ACE_Get_Opt::ARG_REQUIRED
) != 0)
215 ACE_ERROR_RETURN ((LM_ERROR
,
216 ACE_TEXT (" Unable to add wait option.\n")),
219 if (getopt
.long_option (ACE_TEXT ("help"),
221 ACE_Get_Opt::NO_ARG
) != 0)
222 ACE_ERROR_RETURN ((LM_ERROR
,
223 ACE_TEXT (" Unable to add help option.\n")),
226 //FUZZ: disable check_for_lack_ACE_OS
227 // Now, let's parse it...
229 while ((c
= getopt ()) != EOF
)
231 //FUZZ: enable check_for_lack_ACE_OS
235 // Long Option. This should never happen.
240 // @todo validate all these, i.e., must be within range
241 // 224.255.0.0 to 238.255.255.255, but we only allow the
242 // administrative "site local" range, 239.255.0.0 to
244 ACE_TCHAR
*group
= getopt
.opt_arg ();
245 if (this->group_start_
.set (group
) != 0)
247 ACE_ERROR ((LM_ERROR
, ACE_TEXT ("Bad group address:%s\n"),
253 this->iterations_
= ACE_OS::atoi (getopt
.opt_arg ());
257 int n
= ACE_OS::atoi (getopt
.opt_arg ());
258 // I'm assuming 0 means unlimited, so just use whatever the
259 // user provides. Seems to work okay on Solaris 5.8.
260 if (IP_MAX_MEMBERSHIPS
== 0)
263 this->groups_
= ACE_MIN (ACE_MAX (n
, MCT_MIN_GROUPS
),
272 ACE_TCHAR
*c
= getopt
.opt_arg ();
273 if (ACE_OS::strcasecmp (c
, ACE_TEXT ("CONSUMER")) == 0)
274 this->role_
= CONSUMER
;
275 else if (ACE_OS::strcasecmp (c
, ACE_TEXT ("PRODUCER")) == 0)
276 this->role_
= PRODUCER
;
286 //@todo add back OPT_BINDADDR_NO...
287 ACE_TCHAR
*c
= getopt
.opt_arg ();
288 if (ACE_OS::strcasecmp (c
, ACE_TEXT ("OPT_BINDADDR_YES")) == 0)
289 ACE_SET_BITS (this->sdm_opts_
,
290 ACE_SOCK_Dgram_Mcast::OPT_BINDADDR_YES
);
291 else if (ACE_OS::strcasecmp (c
, ACE_TEXT ("OPT_BINDADDR_NO")) == 0)
292 ACE_CLR_BITS (this->sdm_opts_
,
293 ACE_SOCK_Dgram_Mcast::OPT_BINDADDR_YES
);
294 else if (ACE_OS::strcasecmp (c
, ACE_TEXT ("DEFOPT_BINDADDR")) == 0)
296 ACE_CLR_BITS (this->sdm_opts_
,
297 ACE_SOCK_Dgram_Mcast::OPT_BINDADDR_YES
);
298 ACE_SET_BITS (this->sdm_opts_
,
299 ACE_SOCK_Dgram_Mcast::DEFOPT_BINDADDR
);
301 else if (ACE_OS::strcasecmp (c
, ACE_TEXT ("OPT_NULLIFACE_ALL")) == 0)
302 ACE_SET_BITS (this->sdm_opts_
,
303 ACE_SOCK_Dgram_Mcast::OPT_NULLIFACE_ALL
);
304 else if (ACE_OS::strcasecmp (c
, ACE_TEXT ("OPT_NULLIFACE_ONE")) == 0)
305 ACE_CLR_BITS (this->sdm_opts_
,
306 ACE_SOCK_Dgram_Mcast::OPT_NULLIFACE_ALL
);
307 else if (ACE_OS::strcasecmp (c
, ACE_TEXT ("DEFOPT_NULLIFACE")) == 0)
309 ACE_CLR_BITS (this->sdm_opts_
,
310 ACE_SOCK_Dgram_Mcast::OPT_NULLIFACE_ALL
);
311 ACE_SET_BITS (this->sdm_opts_
,
312 ACE_SOCK_Dgram_Mcast::DEFOPT_NULLIFACE
);
314 else if (ACE_OS::strcasecmp (c
, ACE_TEXT ("DEFOPTS")) == 0)
315 this->sdm_opts_
= ACE_SOCK_Dgram_Mcast::DEFOPTS
;
324 this->ttl_
= ACE_OS::atoi (getopt
.opt_arg ());
327 this->wait_
= ACE_OS::atoi (getopt
.opt_arg ());
330 // This means an option requiring an argument didn't have one.
331 ACE_ERROR ((LM_ERROR
,
332 ACE_TEXT (" Option '%c' requires an argument but ")
333 ACE_TEXT ("none was supplied\n"),
341 if (ACE_OS::strcmp (argv
[getopt
.opt_ind () - 1], ACE_TEXT ("-?")) != 0
342 && getopt
.opt_opt () != 'h')
343 // Don't allow unknown options.
344 ACE_ERROR ((LM_ERROR
,
345 ACE_TEXT (" Found an unknown option (%c) ")
346 ACE_TEXT ("we couldn't handle.\n"),
348 // getopt.last_option ())); //readd with "%s" when
349 // last_option() is available.
360 ACE_ERROR ((LM_ERROR
,
361 ACE_TEXT ("usage: %s [options]\n")
362 ACE_TEXT ("Options:\n")
363 ACE_TEXT (" -g {STRING} --GroupStart={STRING} ")
364 ACE_TEXT ("starting multicast group address\n")
366 ACE_TEXT ("(default=239.255.0.1:16000)\n")
367 ACE_TEXT (" -n {#} --Groups={#} ")
368 ACE_TEXT ("number of groups (default=5)\n")
369 ACE_TEXT (" -d --Debug ")
370 ACE_TEXT ("debug flag (default=off)\n")
371 ACE_TEXT (" -r {STRING} --Role={STRING} ")
372 ACE_TEXT ("role {PRODUCER|CONSUMER|BOTH}\n")
374 ACE_TEXT ("(default=BOTH)\n")
375 ACE_TEXT (" -m {STRING} --SDM_options={STRING} ")
376 ACE_TEXT ("ACE_SOCK_Dgram_Mcast ctor options\n")
378 ACE_TEXT ("(default=DEFOPTS)\n")
379 ACE_TEXT (" -i {#} --Iterations={#} ")
380 ACE_TEXT ("number of iterations (default=100)\n")
381 ACE_TEXT (" -t {#} --TTL={#} ")
382 ACE_TEXT ("time to live (default=1)\n")
383 ACE_TEXT (" -w {#} --Wait={#} ")
384 ACE_TEXT ("number of seconds to wait on CONSUMER\n")
386 ACE_TEXT ("(default=2)\n")
387 ACE_TEXT (" -h/? --help ")
388 ACE_TEXT ("show this message\n"),
398 MCT_Config::dump (void) const
400 ACE_DEBUG ((LM_DEBUG
, ACE_BEGIN_DUMP
, this));
401 ACE_DEBUG ((LM_DEBUG
, ACE_TEXT (" Dumping MCT_Config\n")));
402 ACE_DEBUG ((LM_DEBUG
,
403 ACE_TEXT ("\tIP_MAX_MEMBERSHIPS = %d\n"),
404 IP_MAX_MEMBERSHIPS
));
405 ACE_DEBUG ((LM_DEBUG
,
406 ACE_TEXT ("\tgroups_ = %d\n"),
408 ACE_DEBUG ((LM_DEBUG
,
409 ACE_TEXT ("\trole_ = %s\n"),
410 (ACE_BIT_ENABLED (this->role_
, PRODUCER
)
411 && ACE_BIT_ENABLED (this->role_
, CONSUMER
))
412 ? ACE_TEXT ("PRODUCER/CONSUMER")
413 : ACE_BIT_ENABLED (this->role_
, PRODUCER
)
414 ? ACE_TEXT ("PRODUCER")
415 : ACE_TEXT ("CONSUMER")));
416 ACE_DEBUG ((LM_DEBUG
,
417 ACE_TEXT ("\tsdm_options_ = %d\n"),
419 ACE_DEBUG ((LM_DEBUG
,
420 ACE_TEXT ("\titerations_ = %d\n"),
422 ACE_DEBUG ((LM_DEBUG
,
423 ACE_TEXT ("\tttl_ = %d\n"),
425 ACE_DEBUG ((LM_DEBUG
,
426 ACE_TEXT ("\twait_ = %d\n"),
428 // Note that this call to get_host_addr is the non-reentrant
429 // version, but it's okay for us.
430 ACE_DEBUG ((LM_DEBUG
,
431 ACE_TEXT ("\tgroups_start_ = %s:%d\n"),
432 this->group_start_
.get_host_addr (),
433 this->group_start_
.get_port_number ()));
435 ACE_DEBUG ((LM_DEBUG
, ACE_END_DUMP
));
438 /******************************************************************************/
440 class MCT_Event_Handler
: public ACE_Event_Handler
443 MCT_Event_Handler (ACE_SOCK_Dgram_Mcast::options options
444 = ACE_SOCK_Dgram_Mcast::DEFOPTS
);
445 virtual ~MCT_Event_Handler (void);
447 int join (const ACE_INET_Addr
&mcast_addr
,
449 const ACE_TCHAR
*net_if
= 0);
450 int leave (const ACE_INET_Addr
&mcast_addr
,
451 const ACE_TCHAR
*net_if
= 0);
453 // = Event Handler hooks.
454 virtual int handle_input (ACE_HANDLE handle
);
455 virtual int handle_close (ACE_HANDLE fd
, ACE_Reactor_Mask close_mask
);
457 virtual ACE_HANDLE
get_handle (void) const;
459 // Turn loopback on/off. Must be called after at least 1 join() is performed.
460 int loopback (bool on_off
);
463 ACE_SOCK_Dgram_Mcast
*mcast (void);
464 int find (const char *buf
);
467 ACE_SOCK_Dgram_Mcast mcast_
;
469 // List of groups we've joined
470 ACE_Vector
<ACE_CString
*> address_vec_
;
472 // Flag used to set the 'finished' flag when the last event handler
473 // gets removed from the reactor.
474 static ACE_Atomic_Op
<ACE_SYNCH_MUTEX
, long> active_handlers_
;
477 ACE_Atomic_Op
<ACE_SYNCH_MUTEX
, long> MCT_Event_Handler::active_handlers_
= 0;
479 MCT_Event_Handler::MCT_Event_Handler (ACE_SOCK_Dgram_Mcast::options options
)
482 // Increment the number of active handlers in the reactor. Note this isn't
483 // really correct, but it should work for our simple example.
484 ++MCT_Event_Handler::active_handlers_
;
487 MCT_Event_Handler::~MCT_Event_Handler (void)
489 size_t size
= this->address_vec_
.size ();
490 for (size_t i
= 0; i
< size
; ++i
)
492 delete this->address_vec_
[i
];
493 this->address_vec_
[i
] = 0;
499 ACE_SOCK_Dgram_Mcast
*
500 MCT_Event_Handler::mcast (void)
502 return &this->mcast_
;
506 MCT_Event_Handler::find (const char *buf
)
508 size_t const size
= this->address_vec_
.size ();
510 for (i
= 0; i
< size
; ++i
)
512 if (ACE_OS::strcasecmp (buf
, this->address_vec_
[i
]->c_str ()) == 0)
516 // Not found, so output message we received along with a list of groups
517 // we've joined for debugging.
519 for (i
= 0; i
< size
; ++i
)
522 local
+= this->address_vec_
[i
]->c_str ();
525 ACE_DEBUG ((LM_DEBUG
, ACE_TEXT ("%C not in:\n%C"),
526 buf
, local
.c_str ()));
533 MCT_Event_Handler::join (const ACE_INET_Addr
&mcast_addr
,
535 const ACE_TCHAR
*net_if
)
537 char buf
[MAX_STRING_SIZE
];
538 ACE_OS::snprintf (buf
, MAX_STRING_SIZE
, "%s/%d",
539 mcast_addr
.get_host_addr (),
540 mcast_addr
.get_port_number ());
542 if (this->mcast_
.join (mcast_addr
, reuse_addr
, net_if
) == -1)
543 ACE_ERROR_RETURN ((LM_ERROR
,
544 ACE_TEXT ("MCT_Event_Handler::join %C %p\n"),
546 ACE_TEXT ("failed")),
548 ACE_DEBUG ((LM_DEBUG
, ACE_TEXT ("Joined %C\n"), buf
));
550 ACE_CString
*str
= 0;
551 ACE_NEW_RETURN (str
, ACE_CString (buf
), -1);
552 this->address_vec_
.push_back (str
);
557 MCT_Event_Handler::leave (const ACE_INET_Addr
&mcast_addr
,
558 const ACE_TCHAR
*net_if
)
560 if (this->mcast_
.leave (mcast_addr
, net_if
) == 0)
562 char buf
[MAX_STRING_SIZE
];
563 size_t size
= this->address_vec_
.size ();
564 for (size_t i
= 0; i
< size
; ++i
)
566 ACE_OS::snprintf (buf
, MAX_STRING_SIZE
, "%s/%d",
567 mcast_addr
.get_host_addr (),
568 mcast_addr
.get_port_number ());
569 if (ACE_OS::strcasecmp (buf
, this->address_vec_
[i
]->c_str ()) == 0)
571 this->address_vec_
[i
]->set ("");
581 MCT_Event_Handler::handle_input (ACE_HANDLE
/*handle*/)
583 char buf
[MAX_STRING_SIZE
];
584 ACE_OS::memset (buf
, 0, sizeof buf
);
587 if (this->mcast ()->recv (buf
, sizeof buf
, addr
) == -1)
590 ACE_ERROR_RETURN ((LM_ERROR
,
591 ACE_TEXT ("MCT_Event_Handler::handle_input - ")
592 ACE_TEXT ("calling recv\n")), -1);
595 // Zero length buffer means we are done.
596 if (ACE_OS::strlen (buf
) == 0)
598 else if (this->find (buf
) == -1)
601 ACE_DEBUG ((LM_ERROR
,
602 ACE_TEXT ("MCT_Event_Handler::handle_input - ")
603 ACE_TEXT ("Received dgram for a group we didn't join ")
611 MCT_Event_Handler::handle_close (ACE_HANDLE
/*fd*/,
612 ACE_Reactor_Mask
/*close_mask*/)
614 // If this is the last handler, use the finished flag to signal
616 if (--MCT_Event_Handler::active_handlers_
== 0)
619 // The DONT_CALL flag keeps the reactor from calling handle_close ()
620 // again, since we commit suicide below.
621 this->reactor ()->remove_handler (this,
622 ACE_Event_Handler::ALL_EVENTS_MASK
|
623 ACE_Event_Handler::DONT_CALL
);
630 MCT_Event_Handler::get_handle (void) const
632 return this->mcast_
.get_handle ();
635 // Turn loopback on/off
637 MCT_Event_Handler::loopback (bool on_off
)
639 char loopback_on
= on_off
? 1 : 0;
640 return this->mcast_
.set_option (IP_MULTICAST_LOOP
, loopback_on
);
643 /******************************************************************************/
646 * Our MCT_Task object will be an Active Object if we are running the Consumer
647 * side of the test. open() calls activate() which creates a thread and calls
648 * the svc() method that runs the reactor event loop.
650 class MCT_Task
: public ACE_Task
<ACE_NULL_SYNCH
>
653 MCT_Task (const MCT_Config
&config
,
654 ACE_Reactor
*reactor
= ACE_Reactor::instance ());
657 //FUZZ: disable check_for_lack_ACE_OS
659 virtual int open (void *args
= 0);
660 //FUZZ: enable check_for_lack_ACE_OS
662 virtual int svc (void);
665 const MCT_Config
&config_
;
668 MCT_Task::MCT_Task (const MCT_Config
&config
,
669 ACE_Reactor
*reactor
)
672 this->reactor (reactor
);
675 MCT_Task::~MCT_Task (void)
679 MCT_Task::open (void *)
681 MCT_Event_Handler
*handler
= 0;
683 ACE_INET_Addr addr
= this->config_
.group_start ();
684 int groups
= this->config_
.groups ();
685 for (int i
= 0; i
< groups
; ++i
)
687 ACE_NEW_RETURN (handler
,
688 MCT_Event_Handler (this->config_
.options ()), -1);
689 // We subscribe to all groups for the first one and one each for
693 // go ahead and hide the other one since we want our own.
694 ACE_INET_Addr addr
= this->config_
.group_start ();
695 for (int j
= 0; j
< groups
; ++j
)
697 // If OPT_BINDADDR_YES is set, this will fail after the first
698 // join, so just break and keep on going, otherwise it's a
701 && ACE_BIT_ENABLED (ACE_SOCK_Dgram_Mcast::OPT_BINDADDR_YES
,
702 this->config_
.options ()))
705 if (handler
->join (addr
) == -1)
706 ACE_ERROR_RETURN ((LM_ERROR
,
707 ACE_TEXT ("MCT_Task::open - join error\n")),
714 if (handler
->join (addr
) == -1)
715 ACE_ERROR_RETURN ((LM_ERROR
,
716 ACE_TEXT ("MCT_Task::open - join error\n")),
722 // This test needs loopback because we're both sending and receiving.
723 // Loopback is usually the default, but be sure.
724 if (-1 == handler
->loopback (true))
725 ACE_ERROR ((LM_WARNING
,
727 ACE_TEXT ("MCT_Task::open - enable loopback")));
729 if (this->reactor ()->register_handler (handler
, READ_MASK
) == -1)
730 ACE_ERROR_RETURN ((LM_ERROR
,
731 ACE_TEXT ("MCT_Task::open - cannot register ")
732 ACE_TEXT ("handler\n")),
736 if (this->activate (THR_NEW_LWP
) == -1)
737 ACE_ERROR_RETURN ((LM_ERROR
,
739 ACE_TEXT ("MCT_TASK:open - activate failed")),
747 // make sure this thread owns the reactor or handle_events () won't do
749 this->reactor ()->owner (ACE_Thread::self ());
751 // loop and call handle_events...
753 this->reactor ()->handle_events ();
758 /******************************************************************************/
760 int send_dgram (ACE_SOCK_Dgram
&socket
, ACE_INET_Addr addr
, int done
= 0)
763 // Send each message twice, once to the right port, and once to the "wrong"
764 // port. This helps generate noise and lets us see if port filtering is
766 const char *address
= addr
.get_host_addr ();
767 int port
= addr
.get_port_number ();
769 for (int i
= 0; i
< 2; ++i
)
771 char buf
[MAX_STRING_SIZE
];
775 ACE_OS::snprintf (buf
, MAX_STRING_SIZE
, "%s/%d", address
, port
);
777 if (socket
.send (buf
, ACE_OS::strlen (buf
),addr
) == -1)
778 ACE_ERROR_RETURN ((LM_ERROR
, ACE_TEXT ("Send to %C, %p\n"),
780 ACE_TEXT ("send_dgram - error calling send on ")
781 ACE_TEXT ("ACE_SOCK_Dgram.")), -1);
782 addr
.set_port_number (++port
);
787 int producer (MCT_Config
&config
)
791 //FUZZ: disable check_for_lack_ACE_OS
792 ACE_DEBUG ((LM_INFO
, ACE_TEXT ("Starting producer...\n")));
793 ACE_SOCK_Dgram
socket (ACE_sap_any_cast (ACE_INET_Addr
&), PF_INET
);
794 //FUZZ: enable check_for_lack_ACE_OS
796 // Note that this is IPv4 specific and needs to be changed once
798 if (config
.ttl () > 1)
800 int ttl
= config
.ttl ();
801 if (socket
.set_option (IPPROTO_IP
,
805 ACE_DEBUG ((LM_ERROR
,
806 ACE_TEXT ("could net set socket option IP_MULTICAST_TTL ")
810 ACE_DEBUG ((LM_INFO
, ACE_TEXT ("set IP_MULTICAST_TTL = %d\n"), ttl
));
813 int iterations
= config
.iterations ();
814 // we add an extra 5 groups for noise.
815 int groups
= config
.groups () + 5;
816 for (int i
= 0; (i
< iterations
|| iterations
== 0) && !finished
; ++i
)
818 ACE_INET_Addr addr
= config
.group_start ();
819 for (int j
= 0; j
< groups
&& !finished
; ++j
)
821 if ((retval
+= send_dgram (socket
, addr
,
822 ((i
+ 1) == iterations
))) == -1)
823 ACE_ERROR ((LM_ERROR
, ACE_TEXT ("Calling send_dgram.\n")));
824 if ((retval
+= advance_addr (addr
)) == -1)
825 ACE_ERROR ((LM_ERROR
,
826 ACE_TEXT ("Calling advance_addr.\n")));
828 // Give the task thread a chance to run.
829 ACE_Thread::yield ();
836 * Advance the address by 1, e.g., 239.255.0.1 => 239.255.0.2
837 * Note that the algorithm is somewhat simplistic, but sufficient for our
840 int advance_addr (ACE_INET_Addr
&addr
)
843 ::sscanf (addr
.get_host_addr (), "%d.%d.%d.%d", &a
, &b
, &c
, &d
);
865 ACE_ERROR_RETURN ((LM_ERROR
,
866 ACE_TEXT ("advance_addr - Cannot advance multicast ")
867 ACE_TEXT ("group address past %s\n"),
868 addr
.get_host_addr ()),
871 ACE_TCHAR buf
[MAX_STRING_SIZE
];
872 ACE_OS::snprintf (buf
, MAX_STRING_SIZE
, ACE_TEXT ("%d.%d.%d.%d:%d"),
873 a
, b
, c
, d
, addr
.get_port_number ());
879 run_main (int argc
, ACE_TCHAR
*argv
[])
883 retval
= config
.open (argc
, argv
);
887 const ACE_TCHAR
*temp
= ACE_TEXT ("Multicast_Test");
888 ACE_TString test
= temp
;
890 u_long role
= config
.role ();
891 if (ACE_BIT_DISABLED (role
, MCT_Config::PRODUCER
)
892 || ACE_BIT_DISABLED (role
, MCT_Config::CONSUMER
))
894 if (ACE_BIT_ENABLED (role
, MCT_Config::PRODUCER
))
895 test
+= ACE_TEXT ("-PRODUCER");
897 test
+= ACE_TEXT ("-CONSUMER");
900 // Start test only if options are valid.
901 ACE_START_TEST (test
.c_str ());
903 // Register a signal handler to close down application gracefully.
904 ACE_Sig_Action
sa ((ACE_SignalHandler
) handler
, SIGINT
);
906 // Dump the configuration info to the log if caller passed debug option.
910 ACE_Reactor
*reactor
= ACE_Reactor::instance ();
912 MCT_Task
*task
= new MCT_Task (config
, reactor
);
914 if (ACE_BIT_ENABLED (role
, MCT_Config::CONSUMER
))
916 ACE_DEBUG ((LM_INFO
, ACE_TEXT ("Starting consumer...\n")));
917 // Open makes it an active object.
918 retval
+= task
->open ();
921 // now produce the datagrams...
922 if (ACE_BIT_ENABLED (role
, MCT_Config::PRODUCER
))
923 retval
+= producer (config
);
925 if (ACE_BIT_ENABLED (role
, MCT_Config::CONSUMER
))
927 // and wait for everything to finish
929 ACE_TEXT ("start waiting for consumer to finish...\n")));
930 // Wait for the threads to exit.
931 // But, wait for a limited time since we could hang if the last udp
932 // message isn't received.
933 ACE_Time_Value
max_wait ( config
.wait ()/* seconds */);
934 ACE_Time_Value
wait_time (ACE_OS::gettimeofday () + max_wait
);
935 ACE_Time_Value
*ptime
= ACE_BIT_ENABLED (role
, MCT_Config::PRODUCER
)
937 if (ACE_Thread_Manager::instance ()->wait (ptime
) == -1)
939 // We will no longer wait for this thread, so we must
940 // force it to exit otherwise the thread will be referencing
943 reactor
->end_reactor_event_loop ();
946 ACE_ERROR ((LM_ERROR
,
947 ACE_TEXT ("maximum wait time of %d msec exceeded\n"),
950 ACE_OS::perror (ACE_TEXT ("wait"));
954 // This should exit now that we ended the reactor loop.
961 return (retval
== 0 && error
== 0) ? 0 : 1;
966 run_main (int, ACE_TCHAR
*[])
968 ACE_START_TEST (ACE_TEXT ("Multicast_Test"));
971 ACE_TEXT ("This test must be run on a platform ")
972 ACE_TEXT ("that support IP multicast.\n")));
977 #endif /* ACE_HAS_IP_MULTICAST && ACE_HAS_THREADS */