1 // ============================================================================
7 // This program tests ACE_SOCK_Dgram_Mcast class.
8 // It specifically tests subscribing to multiple groups on the same socket
9 // on one or more physical interfaces (if available).
11 // The test can be run as a producer, consumer, or both producer/consumer
12 // (default). The test requires at least two (2) multicast groups which can
13 // be configured as command line options. The consumer subscribes to a
14 // single group per instance and an additional instance tries to subscribe
15 // to all groups on a single socket (if the ACE_SOCK_Dgram_Mcast instance
16 // bind()'s the first address to the socket, additional joins will fail).
17 // The producer iterates through the list of group addresses and sends a
18 // single message containing the destination address and port to each one.
19 // It also sends messages to five (5) additional groups and a message to an
20 // additional port for each group to produce a bit of "noise" in
21 // order to help validate how well the multicast filtering works on a
22 // particular platform.
24 // The list of destination groups starts at 239.255.0.1 (default) and
25 // increment by 1 up to 5 (default) groups. Both of these values, as well
26 // as others, can be overridden via command-line options. Use the -?
27 // option to display the usage message...
30 // Don Hinton <dhinton@dresystems.com>
32 // ============================================================================
34 #include "test_config.h"
35 #include "ace/Get_Opt.h"
36 #include "ace/Vector_T.h"
37 #include "ace/SOCK_Dgram_Mcast.h"
39 #include "ace/Reactor.h"
40 #include "ace/OS_NS_stdio.h"
41 #include "ace/OS_NS_string.h"
42 #include "ace/OS_NS_strings.h"
43 #include "ace/OS_NS_sys_time.h"
45 #include "ace/Atomic_Op.h"
46 #include "ace/SString.h"
47 #include "ace/Signal.h"
48 #include "ace/Min_Max.h"
50 #if defined (ACE_HAS_IP_MULTICAST) && defined (ACE_HAS_THREADS)
53 * The 'finished' flag is used to break out of an infinite loop in the
54 * task::svc () method. The 'handler' will set the flag in response to
57 static sig_atomic_t finished
= 0;
58 extern "C" void handler (int)
// Default number of producer iterations; used to initialize iterations_
// in the configuration constructor.
// NOTE(review): the usage text printed by the option parser advertises
// "default=100" for iterations, which does not match this value -- confirm
// which one is intended.
63 static const int MCT_ITERATIONS
= 10;
// Default number of multicast groups used by the test. The configuration
// constructor uses this directly when IP_MAX_MEMBERSHIPS is 0 and otherwise
// takes the smaller of the two.
64 static const int MCT_GROUPS
= 5;
// Lower bound applied to a user-supplied group count (via ACE_MAX).
65 static const int MCT_MIN_GROUPS
= 2;
// Default starting multicast group address.
67 static const char MCT_START_GROUP
[] = "239.255.0.1";
// Default starting port; paired with MCT_START_GROUP to build group_start_.
68 static const int MCT_START_PORT
= 16000;
// Size of the fixed character buffers used to format and receive the
// "address/port" strings exchanged by the test.
70 static const size_t MAX_STRING_SIZE
= 200;
72 int advance_addr (ACE_INET_Addr
&addr
);
74 // Keep track of errors so we can report them on exit.
75 static sig_atomic_t error
= 0;
78 * MCast_Config holds configuration data for this test.
87 BOTH
= PRODUCER
| CONSUMER
91 : group_start_ (MCT_START_PORT
, MCT_START_GROUP
),
95 sdm_opts_ (ACE_SOCK_Dgram_Mcast::DEFOPTS
),
96 iterations_ (MCT_ITERATIONS
),
100 if (IP_MAX_MEMBERSHIPS
== 0)
101 this->groups_
= MCT_GROUPS
;
103 this->groups_
= ACE_MIN (IP_MAX_MEMBERSHIPS
, MCT_GROUPS
);
108 //FUZZ: disable check_for_lack_ACE_OS
109 int open (int argc
, ACE_TCHAR
*argv
[]);
110 //FUZZ: enable check_for_lack_ACE_OS
112 int debug () const { return this->debug_
;}
114 int groups () const { return this->groups_
;}
115 const ACE_INET_Addr
group_start () const { return this->group_start_
;}
116 u_long
role () const { return this->role_
;}
117 int iterations () const { return this->iterations_
;}
118 int ttl () const { return this->ttl_
;}
120 //FUZZ: disable check_for_lack_ACE_OS
121 int wait () const { return this->wait_
;}
122 //FUZZ: enable check_for_lack_ACE_OS
124 ACE_SOCK_Dgram_Mcast::options
options () const
126 return static_cast<ACE_SOCK_Dgram_Mcast::options
> (this->sdm_opts_
);
130 // Starting group address. (only IPv4 capable right now...)
131 ACE_INET_Addr group_start_
;
133 // Number of groups we will try to use in the test.
139 // Role, i.e., PRODUCER, CONSUMER, BOTH: defaults to BOTH
142 // ACE_SOCK_Dgram_Mcast ctor options
145 // Producer iterations
148 // TTL, time to live, for use over routers.
151 // Time to wait on CONSUMER threads to end before killing test.
156 MCT_Config::open (int argc
, ACE_TCHAR
*argv
[])
161 //FUZZ: disable check_for_lack_ACE_OS
162 ACE_Get_Opt
getopt (argc
, argv
, ACE_TEXT (":?"), 1, 1);
163 //FUZZ: enable check_for_lack_ACE_OS
165 if (getopt
.long_option (ACE_TEXT ("GroupStart"),
167 ACE_Get_Opt::ARG_REQUIRED
) != 0)
168 ACE_ERROR_RETURN ((LM_ERROR
,
169 ACE_TEXT (" Unable to add GroupStart option.\n")),
172 if (getopt
.long_option (ACE_TEXT ("Groups"),
174 ACE_Get_Opt::ARG_REQUIRED
) != 0)
175 ACE_ERROR_RETURN ((LM_ERROR
,
176 ACE_TEXT (" Unable to add Groups option.\n")), 1);
178 if (getopt
.long_option (ACE_TEXT ("Debug"),
180 ACE_Get_Opt::NO_ARG
) != 0)
181 ACE_ERROR_RETURN ((LM_ERROR
,
182 ACE_TEXT (" Unable to add Debug option.\n")), 1);
184 if (getopt
.long_option (ACE_TEXT ("Role"),
186 ACE_Get_Opt::ARG_REQUIRED
) != 0)
187 ACE_ERROR_RETURN ((LM_ERROR
,
188 ACE_TEXT (" Unable to add Role option.\n")), 1);
190 if (getopt
.long_option (ACE_TEXT ("SDM_options"),
192 ACE_Get_Opt::ARG_REQUIRED
) != 0)
193 ACE_ERROR_RETURN ((LM_ERROR
,
194 ACE_TEXT (" Unable to add Multicast_Options option.\n")),
197 if (getopt
.long_option (ACE_TEXT ("Iterations"),
199 ACE_Get_Opt::ARG_REQUIRED
) != 0)
200 ACE_ERROR_RETURN ((LM_ERROR
,
201 ACE_TEXT (" Unable to add iterations option.\n")),
204 if (getopt
.long_option (ACE_TEXT ("TTL"),
206 ACE_Get_Opt::ARG_REQUIRED
) != 0)
207 ACE_ERROR_RETURN ((LM_ERROR
,
208 ACE_TEXT (" Unable to add TTL option.\n")),
211 if (getopt
.long_option (ACE_TEXT ("Wait"),
213 ACE_Get_Opt::ARG_REQUIRED
) != 0)
214 ACE_ERROR_RETURN ((LM_ERROR
,
215 ACE_TEXT (" Unable to add wait option.\n")),
218 if (getopt
.long_option (ACE_TEXT ("help"),
220 ACE_Get_Opt::NO_ARG
) != 0)
221 ACE_ERROR_RETURN ((LM_ERROR
,
222 ACE_TEXT (" Unable to add help option.\n")),
225 //FUZZ: disable check_for_lack_ACE_OS
226 // Now, let's parse it...
228 while ((c
= getopt ()) != EOF
)
230 //FUZZ: enable check_for_lack_ACE_OS
234 // Long Option. This should never happen.
239 // @todo validate all these, i.e., must be within range
240 // 224.255.0.0 to 238.255.255.255, but we only allow the
241 // administrative "site local" range, 239.255.0.0 to
243 ACE_TCHAR
*group
= getopt
.opt_arg ();
244 if (this->group_start_
.set (group
) != 0)
246 ACE_ERROR ((LM_ERROR
, ACE_TEXT ("Bad group address:%s\n"),
252 this->iterations_
= ACE_OS::atoi (getopt
.opt_arg ());
256 int n
= ACE_OS::atoi (getopt
.opt_arg ());
257 // I'm assuming 0 means unlimited, so just use whatever the
259 if (IP_MAX_MEMBERSHIPS
== 0)
262 this->groups_
= ACE_MIN (ACE_MAX (n
, MCT_MIN_GROUPS
),
271 ACE_TCHAR
*c
= getopt
.opt_arg ();
272 if (ACE_OS::strcasecmp (c
, ACE_TEXT ("CONSUMER")) == 0)
273 this->role_
= CONSUMER
;
274 else if (ACE_OS::strcasecmp (c
, ACE_TEXT ("PRODUCER")) == 0)
275 this->role_
= PRODUCER
;
285 //@todo add back OPT_BINDADDR_NO...
286 ACE_TCHAR
*c
= getopt
.opt_arg ();
287 if (ACE_OS::strcasecmp (c
, ACE_TEXT ("OPT_BINDADDR_YES")) == 0)
288 ACE_SET_BITS (this->sdm_opts_
,
289 ACE_SOCK_Dgram_Mcast::OPT_BINDADDR_YES
);
290 else if (ACE_OS::strcasecmp (c
, ACE_TEXT ("OPT_BINDADDR_NO")) == 0)
291 ACE_CLR_BITS (this->sdm_opts_
,
292 ACE_SOCK_Dgram_Mcast::OPT_BINDADDR_YES
);
293 else if (ACE_OS::strcasecmp (c
, ACE_TEXT ("DEFOPT_BINDADDR")) == 0)
295 ACE_CLR_BITS (this->sdm_opts_
,
296 ACE_SOCK_Dgram_Mcast::OPT_BINDADDR_YES
);
297 ACE_SET_BITS (this->sdm_opts_
,
298 ACE_SOCK_Dgram_Mcast::DEFOPT_BINDADDR
);
300 else if (ACE_OS::strcasecmp (c
, ACE_TEXT ("OPT_NULLIFACE_ALL")) == 0)
301 ACE_SET_BITS (this->sdm_opts_
,
302 ACE_SOCK_Dgram_Mcast::OPT_NULLIFACE_ALL
);
303 else if (ACE_OS::strcasecmp (c
, ACE_TEXT ("OPT_NULLIFACE_ONE")) == 0)
304 ACE_CLR_BITS (this->sdm_opts_
,
305 ACE_SOCK_Dgram_Mcast::OPT_NULLIFACE_ALL
);
306 else if (ACE_OS::strcasecmp (c
, ACE_TEXT ("DEFOPT_NULLIFACE")) == 0)
308 ACE_CLR_BITS (this->sdm_opts_
,
309 ACE_SOCK_Dgram_Mcast::OPT_NULLIFACE_ALL
);
310 ACE_SET_BITS (this->sdm_opts_
,
311 ACE_SOCK_Dgram_Mcast::DEFOPT_NULLIFACE
);
313 else if (ACE_OS::strcasecmp (c
, ACE_TEXT ("DEFOPTS")) == 0)
314 this->sdm_opts_
= ACE_SOCK_Dgram_Mcast::DEFOPTS
;
323 this->ttl_
= ACE_OS::atoi (getopt
.opt_arg ());
326 this->wait_
= ACE_OS::atoi (getopt
.opt_arg ());
329 // This means an option requiring an argument didn't have one.
330 ACE_ERROR ((LM_ERROR
,
331 ACE_TEXT (" Option '%c' requires an argument but ")
332 ACE_TEXT ("none was supplied\n"),
340 if (ACE_OS::strcmp (argv
[getopt
.opt_ind () - 1], ACE_TEXT ("-?")) != 0
341 && getopt
.opt_opt () != 'h')
342 // Don't allow unknown options.
343 ACE_ERROR ((LM_ERROR
,
344 ACE_TEXT (" Found an unknown option (%c) ")
345 ACE_TEXT ("we couldn't handle.\n"),
347 // getopt.last_option ())); //readd with "%s" when
348 // last_option() is available.
359 ACE_ERROR ((LM_ERROR
,
360 ACE_TEXT ("usage: %s [options]\n")
361 ACE_TEXT ("Options:\n")
362 ACE_TEXT (" -g {STRING} --GroupStart={STRING} ")
363 ACE_TEXT ("starting multicast group address\n")
365 ACE_TEXT ("(default=239.255.0.1:16000)\n")
366 ACE_TEXT (" -n {#} --Groups={#} ")
367 ACE_TEXT ("number of groups (default=5)\n")
368 ACE_TEXT (" -d --Debug ")
369 ACE_TEXT ("debug flag (default=off)\n")
370 ACE_TEXT (" -r {STRING} --Role={STRING} ")
371 ACE_TEXT ("role {PRODUCER|CONSUMER|BOTH}\n")
373 ACE_TEXT ("(default=BOTH)\n")
374 ACE_TEXT (" -m {STRING} --SDM_options={STRING} ")
375 ACE_TEXT ("ACE_SOCK_Dgram_Mcast ctor options\n")
377 ACE_TEXT ("(default=DEFOPTS)\n")
378 ACE_TEXT (" -i {#} --Iterations={#} ")
379 ACE_TEXT ("number of iterations (default=100)\n")
380 ACE_TEXT (" -t {#} --TTL={#} ")
381 ACE_TEXT ("time to live (default=1)\n")
382 ACE_TEXT (" -w {#} --Wait={#} ")
383 ACE_TEXT ("number of seconds to wait on CONSUMER\n")
385 ACE_TEXT ("(default=2)\n")
386 ACE_TEXT (" -h/? --help ")
387 ACE_TEXT ("show this message\n"),
397 MCT_Config::dump () const
399 ACE_DEBUG ((LM_DEBUG
, ACE_BEGIN_DUMP
, this));
400 ACE_DEBUG ((LM_DEBUG
, ACE_TEXT (" Dumping MCT_Config\n")));
401 ACE_DEBUG ((LM_DEBUG
,
402 ACE_TEXT ("\tIP_MAX_MEMBERSHIPS = %d\n"),
403 IP_MAX_MEMBERSHIPS
));
404 ACE_DEBUG ((LM_DEBUG
,
405 ACE_TEXT ("\tgroups_ = %d\n"),
407 ACE_DEBUG ((LM_DEBUG
,
408 ACE_TEXT ("\trole_ = %s\n"),
409 (ACE_BIT_ENABLED (this->role_
, PRODUCER
)
410 && ACE_BIT_ENABLED (this->role_
, CONSUMER
))
411 ? ACE_TEXT ("PRODUCER/CONSUMER")
412 : ACE_BIT_ENABLED (this->role_
, PRODUCER
)
413 ? ACE_TEXT ("PRODUCER")
414 : ACE_TEXT ("CONSUMER")));
415 ACE_DEBUG ((LM_DEBUG
,
416 ACE_TEXT ("\tsdm_options_ = %d\n"),
418 ACE_DEBUG ((LM_DEBUG
,
419 ACE_TEXT ("\titerations_ = %d\n"),
421 ACE_DEBUG ((LM_DEBUG
,
422 ACE_TEXT ("\tttl_ = %d\n"),
424 ACE_DEBUG ((LM_DEBUG
,
425 ACE_TEXT ("\twait_ = %d\n"),
427 // Note that this call to get_host_addr is the non-reentrant
428 // version, but it's okay for us.
429 ACE_DEBUG ((LM_DEBUG
,
430 ACE_TEXT ("\tgroups_start_ = %s:%d\n"),
431 this->group_start_
.get_host_addr (),
432 this->group_start_
.get_port_number ()));
434 ACE_DEBUG ((LM_DEBUG
, ACE_END_DUMP
));
437 /******************************************************************************/
439 class MCT_Event_Handler
: public ACE_Event_Handler
442 MCT_Event_Handler (ACE_SOCK_Dgram_Mcast::options options
443 = ACE_SOCK_Dgram_Mcast::DEFOPTS
);
444 ~MCT_Event_Handler () override
;
446 int join (const ACE_INET_Addr
&mcast_addr
,
448 const ACE_TCHAR
*net_if
= 0);
449 int leave (const ACE_INET_Addr
&mcast_addr
,
450 const ACE_TCHAR
*net_if
= 0);
452 // = Event Handler hooks.
453 int handle_input (ACE_HANDLE handle
) override
;
454 int handle_close (ACE_HANDLE fd
, ACE_Reactor_Mask close_mask
) override
;
456 ACE_HANDLE
get_handle () const override
;
458 // Turn loopback on/off. Must be called after at least 1 join() is performed.
459 int loopback (bool on_off
);
462 ACE_SOCK_Dgram_Mcast
*mcast ();
463 int find (const char *buf
);
466 ACE_SOCK_Dgram_Mcast mcast_
;
468 // List of groups we've joined
469 ACE_Vector
<ACE_CString
*> address_vec_
;
471 // Flag used to set the 'finished' flag when the last event handler
472 // gets removed from the reactor.
473 static ACE_Atomic_Op
<ACE_SYNCH_MUTEX
, long> active_handlers_
;
476 ACE_Atomic_Op
<ACE_SYNCH_MUTEX
, long> MCT_Event_Handler::active_handlers_
= 0;
478 MCT_Event_Handler::MCT_Event_Handler (ACE_SOCK_Dgram_Mcast::options options
)
481 // Increment the number of active handlers in the reactor. Note this isn't
482 // really correct, but it should work for our simple example.
483 ++MCT_Event_Handler::active_handlers_
;
486 MCT_Event_Handler::~MCT_Event_Handler ()
488 size_t size
= this->address_vec_
.size ();
489 for (size_t i
= 0; i
< size
; ++i
)
491 delete this->address_vec_
[i
];
492 this->address_vec_
[i
] = 0;
498 ACE_SOCK_Dgram_Mcast
*
499 MCT_Event_Handler::mcast ()
501 return &this->mcast_
;
505 MCT_Event_Handler::find (const char *buf
)
507 size_t const size
= this->address_vec_
.size ();
509 for (i
= 0; i
< size
; ++i
)
511 if (ACE_OS::strcasecmp (buf
, this->address_vec_
[i
]->c_str ()) == 0)
515 // Not found, so output message we received along with a list of groups
516 // we've joined for debugging.
518 for (i
= 0; i
< size
; ++i
)
521 local
+= this->address_vec_
[i
]->c_str ();
524 ACE_DEBUG ((LM_DEBUG
, ACE_TEXT ("%C not in:\n%C"),
525 buf
, local
.c_str ()));
532 MCT_Event_Handler::join (const ACE_INET_Addr
&mcast_addr
,
534 const ACE_TCHAR
*net_if
)
536 char buf
[MAX_STRING_SIZE
];
537 ACE_OS::snprintf (buf
, MAX_STRING_SIZE
, "%s/%d",
538 mcast_addr
.get_host_addr (),
539 mcast_addr
.get_port_number ());
541 if (this->mcast_
.join (mcast_addr
, reuse_addr
, net_if
) == -1)
542 ACE_ERROR_RETURN ((LM_ERROR
,
543 ACE_TEXT ("MCT_Event_Handler::join %C %p\n"),
545 ACE_TEXT ("failed")),
547 ACE_DEBUG ((LM_DEBUG
, ACE_TEXT ("Joined %C\n"), buf
));
549 ACE_CString
*str
= 0;
550 ACE_NEW_RETURN (str
, ACE_CString (buf
), -1);
551 this->address_vec_
.push_back (str
);
556 MCT_Event_Handler::leave (const ACE_INET_Addr
&mcast_addr
,
557 const ACE_TCHAR
*net_if
)
559 if (this->mcast_
.leave (mcast_addr
, net_if
) == 0)
561 char buf
[MAX_STRING_SIZE
];
562 size_t size
= this->address_vec_
.size ();
563 for (size_t i
= 0; i
< size
; ++i
)
565 ACE_OS::snprintf (buf
, MAX_STRING_SIZE
, "%s/%d",
566 mcast_addr
.get_host_addr (),
567 mcast_addr
.get_port_number ());
568 if (ACE_OS::strcasecmp (buf
, this->address_vec_
[i
]->c_str ()) == 0)
570 this->address_vec_
[i
]->set ("");
580 MCT_Event_Handler::handle_input (ACE_HANDLE
/*handle*/)
582 char buf
[MAX_STRING_SIZE
];
583 ACE_OS::memset (buf
, 0, sizeof buf
);
586 if (this->mcast ()->recv (buf
, sizeof buf
, addr
) == -1)
589 ACE_ERROR_RETURN ((LM_ERROR
,
590 ACE_TEXT ("MCT_Event_Handler::handle_input - ")
591 ACE_TEXT ("calling recv\n")), -1);
594 // Zero length buffer means we are done.
595 if (ACE_OS::strlen (buf
) == 0)
597 else if (this->find (buf
) == -1)
600 ACE_DEBUG ((LM_ERROR
,
601 ACE_TEXT ("MCT_Event_Handler::handle_input - ")
602 ACE_TEXT ("Received dgram for a group we didn't join ")
610 MCT_Event_Handler::handle_close (ACE_HANDLE
/*fd*/,
611 ACE_Reactor_Mask
/*close_mask*/)
613 // If this is the last handler, use the finished flag to signal
615 if (--MCT_Event_Handler::active_handlers_
== 0)
618 // The DONT_CALL flag keeps the reactor from calling handle_close ()
619 // again, since we commit suicide below.
620 this->reactor ()->remove_handler (this,
621 ACE_Event_Handler::ALL_EVENTS_MASK
|
622 ACE_Event_Handler::DONT_CALL
);
629 MCT_Event_Handler::get_handle () const
631 return this->mcast_
.get_handle ();
634 // Turn loopback on/off
636 MCT_Event_Handler::loopback (bool on_off
)
638 char loopback_on
= on_off
? 1 : 0;
639 return this->mcast_
.set_option (IP_MULTICAST_LOOP
, loopback_on
);
642 /******************************************************************************/
645 * Our MCT_Task object will be an Active Object if we are running the Consumer
646 * side of the test. open() calls activate() which creates a thread and calls
647 * the svc() method that runs the reactor event loop.
649 class MCT_Task
: public ACE_Task
<ACE_NULL_SYNCH
>
652 MCT_Task (const MCT_Config
&config
,
653 ACE_Reactor
*reactor
= ACE_Reactor::instance ());
654 ~MCT_Task () override
;
656 //FUZZ: disable check_for_lack_ACE_OS
658 int open (void *args
= 0) override
;
659 //FUZZ: enable check_for_lack_ACE_OS
664 const MCT_Config
&config_
;
667 MCT_Task::MCT_Task (const MCT_Config
&config
,
668 ACE_Reactor
*reactor
)
671 this->reactor (reactor
);
674 MCT_Task::~MCT_Task ()
678 MCT_Task::open (void *)
680 MCT_Event_Handler
*handler
= 0;
682 ACE_INET_Addr addr
= this->config_
.group_start ();
683 int groups
= this->config_
.groups ();
684 for (int i
= 0; i
< groups
; ++i
)
686 ACE_NEW_RETURN (handler
,
687 MCT_Event_Handler (this->config_
.options ()), -1);
688 // We subscribe to all groups for the first one and one each for
692 // go ahead and hide the other one since we want our own.
693 ACE_INET_Addr addr
= this->config_
.group_start ();
694 for (int j
= 0; j
< groups
; ++j
)
696 // If OPT_BINDADDR_YES is set, this will fail after the first
697 // join, so just break and keep on going, otherwise it's a
700 && ACE_BIT_ENABLED (ACE_SOCK_Dgram_Mcast::OPT_BINDADDR_YES
,
701 this->config_
.options ()))
704 if (handler
->join (addr
) == -1)
705 ACE_ERROR_RETURN ((LM_ERROR
,
706 ACE_TEXT ("MCT_Task::open - join error\n")),
713 if (handler
->join (addr
) == -1)
714 ACE_ERROR_RETURN ((LM_ERROR
,
715 ACE_TEXT ("MCT_Task::open - join error\n")),
721 // This test needs loopback because we're both sending and receiving.
722 // Loopback is usually the default, but be sure.
723 if (-1 == handler
->loopback (true))
724 ACE_ERROR ((LM_WARNING
,
726 ACE_TEXT ("MCT_Task::open - enable loopback")));
728 if (this->reactor ()->register_handler (handler
, READ_MASK
) == -1)
729 ACE_ERROR_RETURN ((LM_ERROR
,
730 ACE_TEXT ("MCT_Task::open - cannot register ")
731 ACE_TEXT ("handler\n")),
735 if (this->activate (THR_NEW_LWP
) == -1)
736 ACE_ERROR_RETURN ((LM_ERROR
,
738 ACE_TEXT ("MCT_TASK:open - activate failed")),
746 // make sure this thread owns the reactor or handle_events () won't do
748 this->reactor ()->owner (ACE_Thread::self ());
750 // loop and call handle_events...
752 this->reactor ()->handle_events ();
757 /******************************************************************************/
759 int send_dgram (ACE_SOCK_Dgram
&socket
, ACE_INET_Addr addr
, int done
= 0)
761 // Send each message twice, once to the right port, and once to the "wrong"
762 // port. This helps generate noise and lets us see if port filtering is
764 const char *address
= addr
.get_host_addr ();
765 int port
= addr
.get_port_number ();
767 for (int i
= 0; i
< 2; ++i
)
769 char buf
[MAX_STRING_SIZE
];
773 ACE_OS::snprintf (buf
, MAX_STRING_SIZE
, "%s/%d", address
, port
);
775 if (socket
.send (buf
, ACE_OS::strlen (buf
),addr
) == -1)
776 ACE_ERROR_RETURN ((LM_ERROR
, ACE_TEXT ("Send to %C, %p\n"),
778 ACE_TEXT ("send_dgram - error calling send on ")
779 ACE_TEXT ("ACE_SOCK_Dgram.")), -1);
780 addr
.set_port_number (++port
);
785 int producer (MCT_Config
&config
)
789 //FUZZ: disable check_for_lack_ACE_OS
790 ACE_DEBUG ((LM_INFO
, ACE_TEXT ("Starting producer...\n")));
791 ACE_SOCK_Dgram
socket (ACE_sap_any_cast (ACE_INET_Addr
&), PF_INET
);
792 //FUZZ: enable check_for_lack_ACE_OS
794 // Note that this is IPv4 specific and needs to be changed once
796 if (config
.ttl () > 1)
798 int ttl
= config
.ttl ();
799 if (socket
.set_option (IPPROTO_IP
,
803 ACE_DEBUG ((LM_ERROR
,
804 ACE_TEXT ("could net set socket option IP_MULTICAST_TTL ")
808 ACE_DEBUG ((LM_INFO
, ACE_TEXT ("set IP_MULTICAST_TTL = %d\n"), ttl
));
811 int iterations
= config
.iterations ();
812 // we add an extra 5 groups for noise.
813 int groups
= config
.groups () + 5;
814 for (int i
= 0; (i
< iterations
|| iterations
== 0) && !finished
; ++i
)
816 ACE_INET_Addr addr
= config
.group_start ();
817 for (int j
= 0; j
< groups
&& !finished
; ++j
)
819 if ((retval
+= send_dgram (socket
, addr
,
820 ((i
+ 1) == iterations
))) == -1)
821 ACE_ERROR ((LM_ERROR
, ACE_TEXT ("Calling send_dgram.\n")));
822 if ((retval
+= advance_addr (addr
)) == -1)
823 ACE_ERROR ((LM_ERROR
,
824 ACE_TEXT ("Calling advance_addr.\n")));
826 // Give the task thread a chance to run.
827 ACE_Thread::yield ();
834 * Advance the address by 1, e.g., 239.255.0.1 => 239.255.0.2
835 * Note that the algorithm is somewhat simplistic, but sufficient for our
838 int advance_addr (ACE_INET_Addr
&addr
)
841 ::sscanf (addr
.get_host_addr (), "%d.%d.%d.%d", &a
, &b
, &c
, &d
);
863 ACE_ERROR_RETURN ((LM_ERROR
,
864 ACE_TEXT ("advance_addr - Cannot advance multicast ")
865 ACE_TEXT ("group address past %s\n"),
866 addr
.get_host_addr ()),
869 ACE_TCHAR buf
[MAX_STRING_SIZE
];
870 ACE_OS::snprintf (buf
, MAX_STRING_SIZE
, ACE_TEXT ("%d.%d.%d.%d:%d"),
871 a
, b
, c
, d
, addr
.get_port_number ());
877 run_main (int argc
, ACE_TCHAR
*argv
[])
881 retval
= config
.open (argc
, argv
);
885 const ACE_TCHAR
*temp
= ACE_TEXT ("Multicast_Test");
886 ACE_TString test
= temp
;
888 u_long role
= config
.role ();
889 if (ACE_BIT_DISABLED (role
, MCT_Config::PRODUCER
)
890 || ACE_BIT_DISABLED (role
, MCT_Config::CONSUMER
))
892 if (ACE_BIT_ENABLED (role
, MCT_Config::PRODUCER
))
893 test
+= ACE_TEXT ("-PRODUCER");
895 test
+= ACE_TEXT ("-CONSUMER");
898 // Start test only if options are valid.
899 ACE_START_TEST (test
.c_str ());
901 // Register a signal handler to close down application gracefully.
902 ACE_Sig_Action
sa ((ACE_SignalHandler
) handler
, SIGINT
);
904 // Dump the configuration info to the log if caller passed debug option.
908 ACE_Reactor
*reactor
= ACE_Reactor::instance ();
910 MCT_Task
*task
= new MCT_Task (config
, reactor
);
912 if (ACE_BIT_ENABLED (role
, MCT_Config::CONSUMER
))
914 ACE_DEBUG ((LM_INFO
, ACE_TEXT ("Starting consumer...\n")));
915 // Open makes it an active object.
916 retval
+= task
->open ();
919 // now produce the datagrams...
920 if (ACE_BIT_ENABLED (role
, MCT_Config::PRODUCER
))
921 retval
+= producer (config
);
923 if (ACE_BIT_ENABLED (role
, MCT_Config::CONSUMER
))
925 // and wait for everything to finish
927 ACE_TEXT ("start waiting for consumer to finish...\n")));
928 // Wait for the threads to exit.
929 // But, wait for a limited time since we could hang if the last udp
930 // message isn't received.
931 ACE_Time_Value
max_wait ( config
.wait ()/* seconds */);
932 ACE_Time_Value
wait_time (ACE_OS::gettimeofday () + max_wait
);
933 ACE_Time_Value
*ptime
= ACE_BIT_ENABLED (role
, MCT_Config::PRODUCER
)
935 if (ACE_Thread_Manager::instance ()->wait (ptime
) == -1)
937 // We will no longer wait for this thread, so we must
938 // force it to exit otherwise the thread will be referencing
941 reactor
->end_reactor_event_loop ();
944 ACE_ERROR ((LM_ERROR
,
945 ACE_TEXT ("maximum wait time of %d msec exceeded\n"),
948 ACE_OS::perror (ACE_TEXT ("wait"));
952 // This should exit now that we ended the reactor loop.
959 return (retval
== 0 && error
== 0) ? 0 : 1;
964 run_main (int, ACE_TCHAR
*[])
966 ACE_START_TEST (ACE_TEXT ("Multicast_Test"));
969 ACE_TEXT ("This test must be run on a platform ")
970 ACE_TEXT ("that support IP multicast.\n")));
975 #endif /* ACE_HAS_IP_MULTICAST && ACE_HAS_THREADS */