Merge pull request #2301 from sonndinh/remove-dup-reactor-functions
[ACE_TAO.git] / ACE / tests / Multicast_Test_IPV6.cpp
blob11056ef53c77f172a1bde5fecaabd7820e963f50
1 // ============================================================================
2 //FUZZ: disable check_for_lack_ACE_OS
3 /**
4 * @file Multicast_Test_IPV6.cpp
6 * @brief This program tests ACE_SOCK_Dgram_Mcast class.
8 * It specifically tests subscribing to multiple groups on the same
9 * socket on one or more physical interfaces (if available).
11 * The test can be run as a producer, consumer, or both
12 * producer/consumer (default). The test requires at least two (2)
13 * multicast groups which can be configured as command line options.
14 * The consumer subscribes to a single group per instance and an
15 * additional instance tries to subscribe to all groups on a single
16 * socket (if the ACE_SOCK_Dgram_Mcast instance bind()'s the first
17 * address to the socket, additional joins will fail). The producer
18 * iterates through the list of group addresses and sends a single
19 * message containing the destination address and port to each one. It
20 * also sends messages to five (5) additional groups and a message to
21 * an additional port for each group in order to produce a bit of
22 * "noise" in order to help validate how well the multicast filtering
23 * works on a particular platform.
25 * The list of destination groups start at ff01::1 (default) and
26 * increment by 1 up to 5 (default) groups. Both of these values, as
27 * well as others, can be overridden via command-line options. Use
28 * the -? option to display the usage message...
30 * @author Don Hinton <dhinton@dresystems.com>
31 * Brian Buesker <bbuesker@qualcomm.com>
33 // ============================================================================
34 //FUZZ: enable check_for_lack_ACE_OS
36 #include "test_config.h"
37 #include "ace/Get_Opt.h"
38 #include "ace/Vector_T.h"
39 #include "ace/SOCK_Dgram_Mcast.h"
40 #include "ace/ACE.h"
41 #include "ace/Reactor.h"
42 #include "ace/OS_NS_string.h"
43 #include "ace/OS_NS_strings.h"
44 #include "ace/Task.h"
45 #include "ace/Atomic_Op.h"
46 #include "ace/SString.h"
47 #include "ace/Signal.h"
48 #include "ace/Min_Max.h"
50 #if defined (ACE_HAS_IP_MULTICAST) && defined (ACE_HAS_THREADS)
53 * The 'finished' flag is used to break out of an infinite loop in the
54 * task::svc () method. The 'handler' will set the flag in respose to
55 * SIGINT (CTRL-C).
57 static sig_atomic_t finished = 0;
58 extern "C" void handler (int)
60 finished = 1;
63 static const int MCT_ITERATIONS = 10;
64 static const int MCT_GROUPS = 5;
65 static const int MCT_MIN_GROUPS = 2;
67 #if defined (ACE_HAS_IPV6)
68 static const char MCT_START_GROUP[] = "ff01::1";
69 #else
70 // an IPv4 address that will ensure an error message is not printed when
71 // IPv6 is not enabled
72 static const char MCT_START_GROUP[] = "239.255.0.1";
73 #endif /* ACE_HAS_IPV6 */
74 static const int MCT_START_PORT = 16000;
76 static const size_t MAX_STRING_SIZE = 200;
78 int advance_addr (ACE_INET_Addr &addr);
80 // Keep track of errors so we can report them on exit.
81 static sig_atomic_t error = 0;
84 * MCast_Config holds configuration data for this test.
86 class MCT_Config
88 public:
89 enum
91 PRODUCER = 1,
92 CONSUMER = 2,
93 BOTH = PRODUCER | CONSUMER
96 MCT_Config ()
97 : group_start_ (MCT_START_PORT, MCT_START_GROUP),
98 groups_ (0),
99 debug_ (0),
100 role_ (BOTH),
101 sdm_opts_ (ACE_SOCK_Dgram_Mcast::DEFOPTS),
102 iterations_ (MCT_ITERATIONS),
103 ttl_ (1),
104 wait_ (2)
106 if (IP_MAX_MEMBERSHIPS == 0)
107 this->groups_ = MCT_GROUPS;
108 else
109 this->groups_ = ACE_MIN (IP_MAX_MEMBERSHIPS, MCT_GROUPS);
112 ~MCT_Config ()
115 //FUZZ: disable check_for_lack_ACE_OS
116 int open (int argc, ACE_TCHAR *argv[]);
117 //FUZZ: enable check_for_lack_ACE_OS
119 int debug () const { return this->debug_;}
120 void dump () const;
121 int groups () const { return this->groups_;}
122 const ACE_INET_Addr group_start () const { return this->group_start_;}
123 u_long role () const { return this->role_;}
124 int iterations () const { return this->iterations_;}
125 int ttl () const { return this->ttl_;}
127 //FUZZ: disable check_for_lack_ACE_OS
128 int wait () const { return this->wait_;}
129 //FUZZ: enable check_for_lack_ACE_OS
131 ACE_SOCK_Dgram_Mcast::options options () const
133 return static_cast<ACE_SOCK_Dgram_Mcast::options> (this->sdm_opts_);
136 int set_group (int port, const char *group);
138 private:
139 // Starting group address.
140 ACE_INET_Addr group_start_;
142 // Number of groups we will try to use in the test.
143 int groups_;
145 // Debug flag.
146 int debug_;
148 // Role, i.e., PRODUCER, CONSUMER, BOTH: defaults to BOTH
149 u_long role_;
151 // ACE_SOCK_Dgram_Mcast ctor options
152 u_long sdm_opts_;
154 // Producer iterations
155 int iterations_;
157 // TTL, time to live, for use over routers.
158 int ttl_;
160 // Time to wait on CONSUMER threads to end before killing test.
161 int wait_;
165 MCT_Config::open (int argc, ACE_TCHAR *argv[])
167 int retval = 0;
168 int help = 0;
170 //FUZZ: disable check_for_lack_ACE_OS
171 ACE_Get_Opt getopt (argc, argv, ACE_TEXT (":?"), 1, 1);
172 //FUZZ: enable check_for_lack_ACE_OS
174 if (getopt.long_option (ACE_TEXT ("GroupStart"),
175 'g',
176 ACE_Get_Opt::ARG_REQUIRED) != 0)
177 ACE_ERROR_RETURN ((LM_ERROR,
178 ACE_TEXT (" Unable to add GroupStart option.\n")),
181 if (getopt.long_option (ACE_TEXT ("Groups"),
182 'n',
183 ACE_Get_Opt::ARG_REQUIRED) != 0)
184 ACE_ERROR_RETURN ((LM_ERROR,
185 ACE_TEXT (" Unable to add Groups option.\n")), 1);
187 if (getopt.long_option (ACE_TEXT ("Debug"),
188 'd',
189 ACE_Get_Opt::NO_ARG) != 0)
190 ACE_ERROR_RETURN ((LM_ERROR,
191 ACE_TEXT (" Unable to add Debug option.\n")), 1);
193 if (getopt.long_option (ACE_TEXT ("Role"),
194 'r',
195 ACE_Get_Opt::ARG_REQUIRED) != 0)
196 ACE_ERROR_RETURN ((LM_ERROR,
197 ACE_TEXT (" Unable to add Role option.\n")), 1);
199 if (getopt.long_option (ACE_TEXT ("SDM_options"),
200 'm',
201 ACE_Get_Opt::ARG_REQUIRED) != 0)
202 ACE_ERROR_RETURN ((LM_ERROR,
203 ACE_TEXT (" Unable to add Multicast_Options option.\n")),
206 if (getopt.long_option (ACE_TEXT ("Iterations"),
207 'i',
208 ACE_Get_Opt::ARG_REQUIRED) != 0)
209 ACE_ERROR_RETURN ((LM_ERROR,
210 ACE_TEXT (" Unable to add iterations option.\n")),
213 if (getopt.long_option (ACE_TEXT ("TTL"),
214 't',
215 ACE_Get_Opt::ARG_REQUIRED) != 0)
216 ACE_ERROR_RETURN ((LM_ERROR,
217 ACE_TEXT (" Unable to add TTL option.\n")),
220 if (getopt.long_option (ACE_TEXT ("Wait"),
221 'w',
222 ACE_Get_Opt::ARG_REQUIRED) != 0)
223 ACE_ERROR_RETURN ((LM_ERROR,
224 ACE_TEXT (" Unable to add wait option.\n")),
227 if (getopt.long_option (ACE_TEXT ("help"),
228 'h',
229 ACE_Get_Opt::NO_ARG) != 0)
230 ACE_ERROR_RETURN ((LM_ERROR,
231 ACE_TEXT (" Unable to add help option.\n")),
234 // Now, let's parse it...
235 int c = 0;
237 //FUZZ: disable check_for_lack_ACE_OS
238 while ((c = getopt ()) != EOF)
240 //FUZZ: enable check_for_lack_ACE_OS
241 switch (c)
243 case 0:
244 // Long Option. This should never happen.
245 retval = -1;
246 break;
247 case 'g':
249 // @todo validate all these, i.e., must be within range
250 // 224.255.0.0 to 238.255.255.255, but we only allow the
251 // administrative "site local" range, 239.255.0.0 to
252 // 239.255.255.255.
253 ACE_TCHAR *group = getopt.opt_arg ();
254 if (this->group_start_.set (group) != 0)
256 ACE_ERROR ((LM_ERROR, ACE_TEXT ("Bad group address:%s\n"),
257 group));
260 break;
261 case 'i':
262 this->iterations_ = ACE_OS::atoi (getopt.opt_arg ());
263 break;
264 case 'n':
266 int n = ACE_OS::atoi (getopt.opt_arg ());
267 // I'm assuming 0 means unlimited, so just use whatever the
268 // user provides.
269 if (IP_MAX_MEMBERSHIPS == 0)
270 this->groups_ = n;
271 else
272 this->groups_ = ACE_MIN (ACE_MAX (n, MCT_MIN_GROUPS),
273 IP_MAX_MEMBERSHIPS);
274 break;
276 case 'd':
277 this->debug_ = 1;
278 break;
279 case 'r':
281 ACE_TCHAR *c = getopt.opt_arg ();
282 if (ACE_OS::strcasecmp (c, ACE_TEXT ("CONSUMER")) == 0)
283 this->role_ = CONSUMER;
284 else if (ACE_OS::strcasecmp (c, ACE_TEXT ("PRODUCER")) == 0)
285 this->role_ = PRODUCER;
286 else
288 help = 1;
289 retval = -1;
292 break;
293 case 'm':
295 //@todo add back OPT_BINDADDR_NO...
296 ACE_TCHAR *c = getopt.opt_arg ();
297 if (ACE_OS::strcasecmp (c, ACE_TEXT ("OPT_BINDADDR_YES")) == 0)
298 ACE_SET_BITS (this->sdm_opts_,
299 ACE_SOCK_Dgram_Mcast::OPT_BINDADDR_YES);
300 else if (ACE_OS::strcasecmp (c, ACE_TEXT ("OPT_BINDADDR_NO")) == 0)
301 ACE_CLR_BITS (this->sdm_opts_,
302 ACE_SOCK_Dgram_Mcast::OPT_BINDADDR_YES);
303 else if (ACE_OS::strcasecmp (c, ACE_TEXT ("DEFOPT_BINDADDR")) == 0)
305 ACE_CLR_BITS (this->sdm_opts_,
306 ACE_SOCK_Dgram_Mcast::OPT_BINDADDR_YES);
307 ACE_SET_BITS (this->sdm_opts_,
308 ACE_SOCK_Dgram_Mcast::DEFOPT_BINDADDR);
310 else if (ACE_OS::strcasecmp (c, ACE_TEXT ("OPT_NULLIFACE_ALL")) == 0)
311 ACE_SET_BITS (this->sdm_opts_,
312 ACE_SOCK_Dgram_Mcast::OPT_NULLIFACE_ALL);
313 else if (ACE_OS::strcasecmp (c, ACE_TEXT ("OPT_NULLIFACE_ONE")) == 0)
314 ACE_CLR_BITS (this->sdm_opts_,
315 ACE_SOCK_Dgram_Mcast::OPT_NULLIFACE_ALL);
316 else if (ACE_OS::strcasecmp (c, ACE_TEXT ("DEFOPT_NULLIFACE")) == 0)
318 ACE_CLR_BITS (this->sdm_opts_,
319 ACE_SOCK_Dgram_Mcast::OPT_NULLIFACE_ALL);
320 ACE_SET_BITS (this->sdm_opts_,
321 ACE_SOCK_Dgram_Mcast::DEFOPT_NULLIFACE);
323 else if (ACE_OS::strcasecmp (c, ACE_TEXT ("DEFOPTS")) == 0)
324 this->sdm_opts_ = ACE_SOCK_Dgram_Mcast::DEFOPTS;
325 else
327 help = 1;
328 retval = -1;
331 break;
332 case 't':
333 this->ttl_ = ACE_OS::atoi (getopt.opt_arg ());
334 break;
335 case 'w':
336 this->wait_ = ACE_OS::atoi (getopt.opt_arg ());
337 break;
338 case ':':
339 // This means an option requiring an argument didn't have one.
340 ACE_ERROR ((LM_ERROR,
341 ACE_TEXT (" Option '%c' requires an argument but ")
342 ACE_TEXT ("none was supplied\n"),
343 getopt.opt_opt ()));
344 help = 1;
345 retval = -1;
346 break;
347 case '?':
348 case 'h':
349 default:
350 if (ACE_OS::strcmp (argv[getopt.opt_ind () - 1], ACE_TEXT ("-?")) != 0
351 && getopt.opt_opt () != 'h')
352 // Don't allow unknown options.
353 ACE_ERROR ((LM_ERROR,
354 ACE_TEXT (" Found an unknown option (%c) ")
355 ACE_TEXT ("we couldn't handle.\n"),
356 getopt.opt_opt ()));
357 // getopt.last_option ())); //readd with "%s" when
358 // last_option() is available.
359 help = 1;
360 retval = -1;
361 break;
365 if (retval == -1)
367 if (help)
368 // print usage here
369 ACE_ERROR ((LM_ERROR,
370 ACE_TEXT ("usage: %s [options]\n")
371 ACE_TEXT ("Options:\n")
372 ACE_TEXT (" -g {STRING} --GroupStart={STRING} ")
373 ACE_TEXT ("starting multicast group address\n")
374 ACE_TEXT (" ")
375 ACE_TEXT ("(default=239.255.0.1:16000)\n")
376 ACE_TEXT (" -n {#} --Groups={#} ")
377 ACE_TEXT ("number of groups (default=5)\n")
378 ACE_TEXT (" -d --Debug ")
379 ACE_TEXT ("debug flag (default=off)\n")
380 ACE_TEXT (" -r {STRING} --Role={STRING} ")
381 ACE_TEXT ("role {PRODUCER|CONSUMER|BOTH}\n")
382 ACE_TEXT (" ")
383 ACE_TEXT ("(default=BOTH)\n")
384 ACE_TEXT (" -m {STRING} --SDM_options={STRING} ")
385 ACE_TEXT ("ACE_SOCK_Dgram_Mcast ctor options\n")
386 ACE_TEXT (" ")
387 ACE_TEXT ("(default=DEFOPTS)\n")
388 ACE_TEXT (" -i {#} --Iterations={#} ")
389 ACE_TEXT ("number of iterations (default=100)\n")
390 ACE_TEXT (" -t {#} --TTL={#} ")
391 ACE_TEXT ("time to live (default=1)\n")
392 ACE_TEXT (" -w {#} --Wait={#} ")
393 ACE_TEXT ("number of seconds to wait on CONSUMER\n")
394 ACE_TEXT (" ")
395 ACE_TEXT ("(default=2)\n")
396 ACE_TEXT (" -h/? --help ")
397 ACE_TEXT ("show this message\n"),
398 argv[0]));
400 return -1;
403 return 0;
406 void
407 MCT_Config::dump () const
409 ACE_DEBUG ((LM_DEBUG, ACE_BEGIN_DUMP, this));
410 ACE_DEBUG ((LM_DEBUG, ACE_TEXT (" Dumping MCT_Config\n")));
411 ACE_DEBUG ((LM_DEBUG,
412 ACE_TEXT ("\tIP_MAX_MEMBERSHIPS = %d\n"),
413 IP_MAX_MEMBERSHIPS));
414 ACE_DEBUG ((LM_DEBUG,
415 ACE_TEXT ("\tgroups_ = %d\n"),
416 this->groups_));
417 ACE_DEBUG ((LM_DEBUG,
418 ACE_TEXT ("\trole_ = %s\n"),
419 (ACE_BIT_ENABLED (this->role_, PRODUCER)
420 && ACE_BIT_ENABLED (this->role_, CONSUMER))
421 ? ACE_TEXT ("PRODUCER/CONSUMER")
422 : ACE_BIT_ENABLED (this->role_, PRODUCER)
423 ? ACE_TEXT ("PRODUCER")
424 : ACE_TEXT ("CONSUMER")));
425 ACE_DEBUG ((LM_DEBUG,
426 ACE_TEXT ("\tsdm_options_ = %d\n"),
427 this->sdm_opts_));
428 ACE_DEBUG ((LM_DEBUG,
429 ACE_TEXT ("\titerations_ = %d\n"),
430 this->iterations_));
431 ACE_DEBUG ((LM_DEBUG,
432 ACE_TEXT ("\tttl_ = %d\n"),
433 this->ttl_));
434 ACE_DEBUG ((LM_DEBUG,
435 ACE_TEXT ("\twait_ = %d\n"),
436 this->wait_));
437 // Note that this call to get_host_addr is the non-reentrant
438 // version, but it's okay for us.
439 ACE_DEBUG ((LM_DEBUG,
440 ACE_TEXT ("\tgroups_start_ = %s:%d\n"),
441 this->group_start_.get_host_addr (),
442 this->group_start_.get_port_number ()));
444 ACE_DEBUG ((LM_DEBUG, ACE_END_DUMP));
448 MCT_Config::set_group (int port, const char *group)
450 return group_start_.set (port, group);
453 /******************************************************************************/
455 class MCT_Event_Handler : public ACE_Event_Handler
457 public:
458 MCT_Event_Handler (ACE_SOCK_Dgram_Mcast::options options
459 = ACE_SOCK_Dgram_Mcast::DEFOPTS);
460 ~MCT_Event_Handler () override;
462 int join (const ACE_INET_Addr &mcast_addr,
463 int reuse_addr = 1,
464 const ACE_TCHAR *net_if = 0);
465 int leave (const ACE_INET_Addr &mcast_addr,
466 const ACE_TCHAR *net_if = 0);
468 // = Event Handler hooks.
469 int handle_input (ACE_HANDLE handle) override;
470 int handle_close (ACE_HANDLE fd, ACE_Reactor_Mask close_mask) override;
472 ACE_HANDLE get_handle () const override;
474 protected:
475 ACE_SOCK_Dgram_Mcast *mcast ();
476 int find (const char *buf);
478 private:
479 ACE_SOCK_Dgram_Mcast mcast_;
481 // List of groups we've joined
482 ACE_Vector<ACE_CString*> address_vec_;
484 // Flag used to set the 'finished' flag when the last event handler
485 // gets removed from the reactor.
486 static ACE_Atomic_Op<ACE_SYNCH_MUTEX, long> active_handlers_;
489 ACE_Atomic_Op<ACE_SYNCH_MUTEX, long> MCT_Event_Handler::active_handlers_ = 0;
491 MCT_Event_Handler::MCT_Event_Handler (ACE_SOCK_Dgram_Mcast::options options)
492 : mcast_ (options)
494 // Increment the number of active handlers in the reactor. Note this isn't
495 // really correct, but it should work for our simple example.
496 ++MCT_Event_Handler::active_handlers_;
499 MCT_Event_Handler::~MCT_Event_Handler ()
501 size_t size = this->address_vec_.size ();
502 for (size_t i = 0; i < size; ++i)
504 delete this->address_vec_[i];
505 this->address_vec_[i] = 0;
510 ACE_SOCK_Dgram_Mcast *
511 MCT_Event_Handler::mcast ()
513 return &this->mcast_;
517 MCT_Event_Handler::find (const char *buf)
519 size_t size = this->address_vec_.size ();
520 size_t i;
521 for (i = 0; i < size; ++i)
523 if (ACE_OS::strcasecmp (buf, this->address_vec_[i]->c_str ()) == 0)
524 return 0;
527 // Not found, so output message we received along with a list of groups
528 // we've joined for debugging.
529 ACE_CString local;
530 for (i = 0; i < size; ++i)
532 local += "\t";
533 local += this->address_vec_[i]->c_str ();
534 local += "\n";
536 ACE_DEBUG ((LM_DEBUG, ACE_TEXT ("%C not in:\n%C"),
537 buf, local.c_str ()));
539 return -1;
544 MCT_Event_Handler::join (const ACE_INET_Addr &mcast_addr,
545 int reuse_addr,
546 const ACE_TCHAR *net_if)
548 char buf[MAX_STRING_SIZE];
549 ACE_OS::snprintf (buf, MAX_STRING_SIZE, "%s/%d",
550 mcast_addr.get_host_addr (),
551 mcast_addr.get_port_number ());
553 if (this->mcast_.join (mcast_addr, reuse_addr, net_if) == -1)
554 ACE_ERROR_RETURN ((LM_ERROR,
555 ACE_TEXT ("MCT_Event_Handler::join %C %p\n"),
556 buf,
557 ACE_TEXT ("failed")),
558 -1);
559 ACE_DEBUG ((LM_DEBUG, ACE_TEXT ("Joined %C\n"), buf));
561 ACE_CString *str = 0;
562 ACE_NEW_RETURN (str, ACE_CString (buf), -1);
563 this->address_vec_.push_back (str);
564 return 0;
568 MCT_Event_Handler::leave (const ACE_INET_Addr &mcast_addr,
569 const ACE_TCHAR *net_if)
571 if (this->mcast_.leave (mcast_addr, net_if) == 0)
573 char buf[MAX_STRING_SIZE];
574 size_t size = this->address_vec_.size ();
575 for (size_t i = 0; i < size; ++i)
577 ACE_OS::snprintf (buf, MAX_STRING_SIZE, "%s/%d",
578 mcast_addr.get_host_addr (),
579 mcast_addr.get_port_number ());
580 if (ACE_OS::strcasecmp (buf, this->address_vec_[i]->c_str ()) == 0)
582 this->address_vec_[i]->set ("");
583 break;
586 return 0;
588 return -1;
592 MCT_Event_Handler::handle_input (ACE_HANDLE /*handle*/)
594 char buf[MAX_STRING_SIZE];
595 ACE_OS::memset (buf, 0, sizeof buf);
596 ACE_INET_Addr addr;
598 if (this->mcast ()->recv (buf, sizeof buf, addr) == -1)
600 ++error;
601 ACE_ERROR_RETURN ((LM_ERROR,
602 ACE_TEXT ("MCT_Event_Handler::handle_input - ")
603 ACE_TEXT ("calling recv\n")), -1);
606 // Zero length buffer means we are done.
607 if (ACE_OS::strlen (buf) == 0)
608 return -1;
609 else if (this->find (buf) == -1)
611 ++error;
612 ACE_DEBUG ((LM_ERROR,
613 ACE_TEXT ("MCT_Event_Handler::handle_input - ")
614 ACE_TEXT ("Received dgram for a group we didn't join ")
615 ACE_TEXT ("(%s)\n"),
616 buf));
618 return 0;
622 MCT_Event_Handler::handle_close (ACE_HANDLE /*fd*/,
623 ACE_Reactor_Mask /*close_mask*/)
625 // If this is the last handler, use the finished flag to signal
626 // the task to exit.
627 if (--MCT_Event_Handler::active_handlers_ == 0)
628 finished = 1;
630 // The DONT_CALL flag keeps the reactor from calling handle_close ()
631 // again, since we commit suicide below.
632 this->reactor ()->remove_handler (this,
633 ACE_Event_Handler::ALL_EVENTS_MASK |
634 ACE_Event_Handler::DONT_CALL);
635 this->reactor (0);
636 delete this;
637 return 0;
640 ACE_HANDLE
641 MCT_Event_Handler::get_handle () const
643 return this->mcast_.get_handle ();
646 /******************************************************************************/
649 * Our MCT_Task object will be an Active Object if we are running the Consumer
650 * side of the test. open() calls active() which creates a thread and calls
651 * the svc() method that calls runs the reactor event loop.
653 class MCT_Task : public ACE_Task<ACE_NULL_SYNCH>
655 public:
656 MCT_Task (const MCT_Config &config,
657 ACE_Reactor *reactor = ACE_Reactor::instance ());
658 ~MCT_Task () override;
660 //FUZZ: disable check_for_lack_ACE_OS
661 // = Task hooks.
662 int open (void *args = 0) override;
663 int svc () override;
664 //FUZZ: enable check_for_lack_ACE_OS
666 private:
667 const MCT_Config &config_;
670 MCT_Task::MCT_Task (const MCT_Config &config,
671 ACE_Reactor *reactor)
672 : config_ (config)
674 this->reactor (reactor);
677 MCT_Task::~MCT_Task ()
681 MCT_Task::open (void *)
683 MCT_Event_Handler *handler;
685 ACE_INET_Addr addr = this->config_.group_start ();
686 int groups = this->config_.groups ();
687 for (int i = 0; i < groups; ++i)
689 ACE_NEW_RETURN (handler,
690 MCT_Event_Handler (this->config_.options ()), -1);
691 // We subscribe to all groups for the first one and one each for
692 // all the others.
693 if (i == 0)
695 // go ahead and hide the other one since we want our own.
696 ACE_INET_Addr addr = this->config_.group_start ();
697 for (int j = 0; j < groups; ++j)
699 // If OPT_BINDADDR_YES is set, this will fail after the first
700 // join, so just break and keep on going, otherwise it's a
701 // real error.
702 if (j > 0
703 && ACE_BIT_ENABLED (ACE_SOCK_Dgram_Mcast::OPT_BINDADDR_YES,
704 this->config_.options ()))
705 break;
707 if (handler->join (addr) == -1)
708 ACE_ERROR_RETURN ((LM_ERROR,
709 ACE_TEXT ("MCT_Task::open - join error\n")),
710 -1);
711 advance_addr (addr);
714 else
716 if (handler->join (addr) == -1)
717 ACE_ERROR_RETURN ((LM_ERROR,
718 ACE_TEXT ("MCT_Task::open - join error\n")),
719 -1);
722 advance_addr (addr);
724 if (this->reactor ()->register_handler (handler, READ_MASK) == -1)
725 ACE_ERROR_RETURN ((LM_ERROR,
726 ACE_TEXT ("MCT_Task::open - cannot register ")
727 ACE_TEXT ("handler\n")),
728 -1);
731 if (this->activate (THR_NEW_LWP) == -1)
732 ACE_ERROR_RETURN ((LM_ERROR,
733 ACE_TEXT ("%p\n"),
734 ACE_TEXT ("MCT_TASK:open - activate failed")),
735 -1);
736 return 0;
740 MCT_Task::svc ()
742 // make sure this thread owns the reactor or handle_events () won't do
743 // anything.
744 this->reactor ()->owner (ACE_Thread::self ());
746 // loop and call handle_events...
747 while (!finished)
748 this->reactor ()->handle_events ();
750 return 0;
753 /******************************************************************************/
755 int send_dgram (ACE_SOCK_Dgram &socket, ACE_INET_Addr addr, int done = 0)
757 // Send each message twice, once to the right port, and once to the "wrong"
758 // port. This helps generate noise and lets us see if port filtering is
759 // working properly.
760 const char *address = addr.get_host_addr ();
761 int port = addr.get_port_number ();
763 for (int i = 0; i < 2; ++i)
765 char buf[MAX_STRING_SIZE];
766 if (done)
767 buf[0] = 0;
768 else
769 ACE_OS::snprintf (buf, MAX_STRING_SIZE, "%s/%d", address, port);
770 //ACE_DEBUG ((LM_DEBUG, ACE_TEXT ("sending (%s)\n"), buf));
771 if (socket.send (buf, ACE_OS::strlen (buf),addr) == -1)
772 ACE_ERROR_RETURN ((LM_ERROR, ACE_TEXT ("%p\n"),
773 ACE_TEXT ("send_dgram - error calling send on ")
774 ACE_TEXT ("ACE_SOCK_Dgram.")), -1);
775 addr.set_port_number (++port);
777 return 0;
780 int producer (MCT_Config &config)
782 int retval = 0;
784 //FUZZ: disable check_for_lack_ACE_OS
785 ACE_DEBUG ((LM_INFO, ACE_TEXT ("Starting producer...\n")));
786 ACE_SOCK_Dgram socket (ACE_sap_any_cast (ACE_INET_Addr &));
787 //FUZZ: enable check_for_lack_ACE_OS
789 // set the TTL or hop count based on the config.ttl () value
790 if (config.ttl () > 1 && config.group_start().get_type() == AF_INET)
792 int ttl = config.ttl ();
793 if (socket.set_option (IPPROTO_IP,
794 IP_MULTICAST_TTL,
795 (void*) &ttl,
796 sizeof ttl) != 0)
797 ACE_DEBUG ((LM_ERROR,
798 ACE_TEXT ("could net set socket option IP_MULTICAST_TTL ")
799 ACE_TEXT ("= %d\n"),
800 ttl));
801 else
802 ACE_DEBUG ((LM_INFO, ACE_TEXT ("set IP_MULTICAST_TTL = %d\n"), ttl));
804 #if defined (ACE_HAS_IPV6)
805 else
807 // for IPv6, a hop limit is used instead of TTL
808 int hops = config.ttl ();
809 if (socket.set_option (IPPROTO_IPV6,
810 IPV6_MULTICAST_HOPS,
811 (void*) &hops,
812 sizeof hops) != 0)
813 ACE_DEBUG ((LM_ERROR,
814 ACE_TEXT ("could net set socket option IPV6_MULTICAST_HOPS")
815 ACE_TEXT (" = %d\n"),
816 hops));
817 else
818 ACE_DEBUG ((LM_INFO, ACE_TEXT ("set IPV6_MULTICAST_HOPS = %d\n"),
819 hops));
822 // Turn on multicast loopback since the test relies on it and the
823 // ACE_SOCK_Dgram_Mcast documents the loopback state as indeterminate.
824 int do_loopback = 1;
825 if (socket.set_option (IPPROTO_IPV6,
826 IPV6_MULTICAST_LOOP,
827 (void *)&do_loopback,
828 sizeof (do_loopback)) == -1)
830 if (errno == ENOTSUP)
831 ACE_DEBUG ((LM_INFO,
832 ACE_TEXT ("IPV6_MULTICAST_LOOP not supported\n")));
833 else
834 ACE_ERROR ((LM_ERROR, ACE_TEXT ("%p\n"),
835 ACE_TEXT ("Can't set IPV6_MULTICAST_LOOP")));
837 #endif /* ACE_HAS_IPV6 */
840 int iterations = config.iterations ();
841 // we add an extra 5 groups for noise.
842 int groups = config.groups () + 5;
843 for (int i = 0; (i < iterations || iterations == 0) && !finished; ++i)
845 ACE_INET_Addr addr = config.group_start ();
846 for (int j = 0; j < groups && !finished; ++j)
848 if ((retval += send_dgram (socket, addr,
849 ((i + 1) == iterations))) == -1)
850 ACE_ERROR ((LM_ERROR, ACE_TEXT ("Calling send_dgram.\n")));
851 if ((retval += advance_addr (addr)) == -1)
852 ACE_ERROR ((LM_ERROR,
853 ACE_TEXT ("Calling advance_addr.\n")));
855 // Give the task thread a chance to run.
856 ACE_Thread::yield ();
858 return retval;
862 * Advance the address by 1, e.g., 239.255.0.1 => 239.255.0.2
863 * Note that the algorithm is somewhat simplistic, but sufficient for our
864 * purpose.
866 int advance_addr (ACE_INET_Addr &addr)
868 int a, b, c, d;
869 if (addr.get_type () == AF_INET)
871 ::sscanf (addr.get_host_addr (), "%d.%d.%d.%d", &a, &b, &c, &d);
872 if (d < 255)
873 ++d;
874 else if (c < 255)
876 d = 1;
877 ++c;
879 else if (b < 255)
881 d = 1;
882 c = 0;
883 ++b;
885 else if (a < 239)
887 d = 1;
888 c = 0;
889 b = 0;
890 ++a;
892 else
893 ACE_ERROR_RETURN ((LM_ERROR,
894 ACE_TEXT ("advance_addr - Cannot advance multicast ")
895 ACE_TEXT ("group address past %s\n"),
896 addr.get_host_addr ()),
897 -1);
899 ACE_TCHAR buf[MAX_STRING_SIZE];
900 ACE_OS::snprintf (buf, MAX_STRING_SIZE, ACE_TEXT ("%d.%d.%d.%d:%d"),
901 a, b, c, d, addr.get_port_number ());
902 addr.set (buf);
903 return 0;
905 #if defined (ACE_HAS_IPV6)
906 else // assume AF_INET6
908 sockaddr_in6 *saddr = reinterpret_cast<sockaddr_in6 *> (addr.get_addr ());
909 unsigned char *sin6_addr = reinterpret_cast<unsigned char *> (&saddr->sin6_addr);
910 int i = 15;
912 // i >= 2 is used here so that the flags and scope for the
913 // multicast address are not changed
914 while (i >= 2 && sin6_addr[i] == 0xff)
916 sin6_addr[i] = 0;
917 i--;
920 if (i >= 2)
922 sin6_addr[i]++;
924 else
926 ACE_ERROR_RETURN ((LM_ERROR,
927 ACE_TEXT ("advance_addr - Cannot advance ")
928 ACE_TEXT ("multicast group address past %s\n"),
929 addr.get_host_addr ()),
930 -1);
933 #endif /* ACE_HAS_IPV6 */
935 return 0;
939 run_main (int argc, ACE_TCHAR *argv[])
941 int retval = 0;
942 MCT_Config config;
943 retval = config.open (argc, argv);
944 if (retval != 0)
945 return 1;
947 const ACE_TCHAR *temp = ACE_TEXT ("Multicast_Test_IPV6");
948 ACE_TString test = temp;
950 u_long role = config.role ();
951 if (ACE_BIT_DISABLED (role, MCT_Config::PRODUCER)
952 || ACE_BIT_DISABLED (role, MCT_Config::CONSUMER))
954 if (ACE_BIT_ENABLED (role, MCT_Config::PRODUCER))
955 test += ACE_TEXT ("-PRODUCER");
956 else
957 test += ACE_TEXT ("-CONSUMER");
960 // Start test only if options are valid.
961 ACE_START_TEST (test.c_str ());
963 #if defined (ACE_HAS_IPV6)
965 # if !defined (ACE_LACKS_UNIX_SIGNALS)
966 // Register a signal handler to close down application gracefully.
967 ACE_Sig_Action sa ((ACE_SignalHandler) handler, SIGINT);
968 # endif
970 // Dump the configuration info to the log if caller passed debug option.
971 if (config.debug ())
972 config.dump ();
974 ACE_Reactor *reactor = ACE_Reactor::instance ();
976 MCT_Task *task = new MCT_Task (config, reactor);
978 if (ACE_BIT_ENABLED (role, MCT_Config::CONSUMER))
980 ACE_DEBUG ((LM_INFO, ACE_TEXT ("Starting consumer...\n")));
981 // Open makes it an active object.
982 retval += task->open ();
985 // now produce the datagrams...
986 if (ACE_BIT_ENABLED (role, MCT_Config::PRODUCER))
987 retval += producer (config);
989 if (ACE_BIT_ENABLED (role, MCT_Config::CONSUMER))
991 // and wait for everything to finish
992 ACE_DEBUG ((LM_INFO,
993 ACE_TEXT ("start waiting for consumer to finish...\n")));
994 // Wait for the threads to exit.
995 // But, wait for a limited time since we could hang if the last udp
996 // message isn't received.
997 ACE_Time_Value max_wait ( config.wait ()/* seconds */);
998 ACE_Time_Value wait_time (ACE_OS::gettimeofday () + max_wait);
999 ACE_Time_Value *ptime = ACE_BIT_ENABLED (role, MCT_Config::PRODUCER)
1000 ? &wait_time : 0;
1001 if (ACE_Thread_Manager::instance ()->wait (ptime) == -1)
1003 if (errno == ETIME)
1004 ACE_ERROR ((LM_ERROR,
1005 ACE_TEXT ("maximum wait time of %d msec exceeded\n"),
1006 max_wait.msec ()));
1007 else
1008 ACE_OS::perror (ACE_TEXT ("wait"));
1010 ++error;
1014 delete task;
1015 #endif /* ACE_HAS_IPV6 */
1016 ACE_END_TEST;
1017 return (retval == 0 && error == 0) ? 0 : 1;
1020 #else
1022 run_main (int, ACE_TCHAR *[])
1024 ACE_START_TEST (ACE_TEXT ("Multicast_Test_IPV6"));
1026 ACE_ERROR ((LM_INFO,
1027 ACE_TEXT ("This test must be run on a platform ")
1028 ACE_TEXT ("that support IP multicast and threads.\n")));
1030 ACE_END_TEST;
1032 return 0;
1034 #endif /* ACE_HAS_IP_MULTICAST && ACE_HAS_THREADS */