Merge pull request #1844 from jrw972/monterey
[ACE_TAO.git] / ACE / tests / Multicast_Test.cpp
blob12cbabc8cf5efeb1252ab9e2e17d441ab6d8c210
1 // ============================================================================
2 //
3 // = LIBRARY
4 // tests
5 //
6 // = DESCRIPTION
7 // This program tests ACE_SOCK_Dgram_Mcast class.
8 // It specifically tests subscribing to multiple groups on the same socket
9 // on one or more physical interfaces (if available).
11 // The test can be run as a producer, consumer, or both producer/consumer
12 // (default). The test requires at least two (2) multicast groups which can
13 // be configured as command line options. The consumer subscribes to a
14 // single group per instance and an additional instance tries to subscribe
15 // to all groups on a single socket (if the ACE_SOCK_Dgram_Mcast instance
16 // bind()'s the first address to the socket, additional joins will fail).
17 // The producer iterates through the list of group addresses and sends a
18 // single message containing the destination address and port to each one.
19 // It also sends messages to five (5) additional groups and a message to an
20 // additional port for each group in order to produce a bit of "noise" in
21 // order to help validate how well the multicast filtering works on a
22 // particular platform.
24 // The list of destination groups start at 239.255.0.1 (default) and
25 // increment by 1 up to 5 (default) groups. Both of these values, as well
26 // as others, can be overridden via command-line options. Use the -?
27 // option to display the usage message...
29 // = AUTHOR
30 // Don Hinton <dhinton@dresystems.com>
32 // ============================================================================
34 #include "test_config.h"
35 #include "ace/Get_Opt.h"
36 #include "ace/Vector_T.h"
37 #include "ace/SOCK_Dgram_Mcast.h"
38 #include "ace/ACE.h"
39 #include "ace/Reactor.h"
40 #include "ace/OS_NS_stdio.h"
41 #include "ace/OS_NS_string.h"
42 #include "ace/OS_NS_strings.h"
43 #include "ace/OS_NS_sys_time.h"
44 #include "ace/Task.h"
45 #include "ace/Atomic_Op.h"
46 #include "ace/SString.h"
47 #include "ace/Signal.h"
48 #include "ace/Min_Max.h"
50 #if defined (ACE_HAS_IP_MULTICAST) && defined (ACE_HAS_THREADS)
53 * The 'finished' flag is used to break out of an infninite loop in the
54 * task::svc () method. The 'handler' will set the flag in respose to
55 * SIGINT (CTRL-C).
57 static sig_atomic_t finished = 0;
58 extern "C" void handler (int)
60 finished = 1;
63 static const int MCT_ITERATIONS = 10;
64 static const int MCT_GROUPS = 5;
65 static const int MCT_MIN_GROUPS = 2;
67 static const char MCT_START_GROUP[] = "239.255.0.1";
68 static const int MCT_START_PORT = 16000;
70 static const size_t MAX_STRING_SIZE = 200;
72 int advance_addr (ACE_INET_Addr &addr);
74 // Keep track of errors so we can report them on exit.
75 static sig_atomic_t error = 0;
78 * MCast_Config holds configuration data for this test.
80 class MCT_Config
82 public:
84 enum
86 PRODUCER = 1,
87 CONSUMER = 2,
88 BOTH = PRODUCER | CONSUMER
91 MCT_Config (void)
92 : group_start_ (MCT_START_PORT, MCT_START_GROUP),
93 groups_ (0),
94 debug_ (0),
95 role_ (BOTH),
96 sdm_opts_ (ACE_SOCK_Dgram_Mcast::DEFOPTS),
97 iterations_ (MCT_ITERATIONS),
98 ttl_ (1),
99 wait_ (2)
101 if (IP_MAX_MEMBERSHIPS == 0)
102 this->groups_ = MCT_GROUPS;
103 else
104 this->groups_ = ACE_MIN (IP_MAX_MEMBERSHIPS, MCT_GROUPS);
106 ~MCT_Config (void)
109 //FUZZ: disable check_for_lack_ACE_OS
110 int open (int argc, ACE_TCHAR *argv[]);
111 //FUZZ: enable check_for_lack_ACE_OS
113 int debug (void) const { return this->debug_;}
114 void dump (void) const;
115 int groups (void) const { return this->groups_;}
116 const ACE_INET_Addr group_start (void) const { return this->group_start_;}
117 u_long role (void) const { return this->role_;}
118 int iterations (void) const { return this->iterations_;}
119 int ttl (void) const { return this->ttl_;}
121 //FUZZ: disable check_for_lack_ACE_OS
122 int wait (void) const { return this->wait_;}
123 //FUZZ: enable check_for_lack_ACE_OS
125 ACE_SOCK_Dgram_Mcast::options options (void) const
127 return static_cast<ACE_SOCK_Dgram_Mcast::options> (this->sdm_opts_);
130 private:
131 // Starting group address. (only IPv4 capable right now...)
132 ACE_INET_Addr group_start_;
134 // Number of groups we will try to use in the test.
135 int groups_;
137 // Debug flag.
138 int debug_;
140 // Role, i.e., PRODUCER, CONSUMER, BOTH: defaults to BOTH
141 u_long role_;
143 // ACE_SOCK_Dgram_Mcast ctor options
144 u_long sdm_opts_;
146 // Producer iterations
147 int iterations_;
149 // TTL, time to live, for use over routers.
150 int ttl_;
152 // Time to wait on CONSUMER threads to end before killing test.
153 int wait_;
157 MCT_Config::open (int argc, ACE_TCHAR *argv[])
159 int retval = 0;
160 int help = 0;
162 //FUZZ: disable check_for_lack_ACE_OS
163 ACE_Get_Opt getopt (argc, argv, ACE_TEXT (":?"), 1, 1);
164 //FUZZ: enable check_for_lack_ACE_OS
166 if (getopt.long_option (ACE_TEXT ("GroupStart"),
167 'g',
168 ACE_Get_Opt::ARG_REQUIRED) != 0)
169 ACE_ERROR_RETURN ((LM_ERROR,
170 ACE_TEXT (" Unable to add GroupStart option.\n")),
173 if (getopt.long_option (ACE_TEXT ("Groups"),
174 'n',
175 ACE_Get_Opt::ARG_REQUIRED) != 0)
176 ACE_ERROR_RETURN ((LM_ERROR,
177 ACE_TEXT (" Unable to add Groups option.\n")), 1);
179 if (getopt.long_option (ACE_TEXT ("Debug"),
180 'd',
181 ACE_Get_Opt::NO_ARG) != 0)
182 ACE_ERROR_RETURN ((LM_ERROR,
183 ACE_TEXT (" Unable to add Debug option.\n")), 1);
185 if (getopt.long_option (ACE_TEXT ("Role"),
186 'r',
187 ACE_Get_Opt::ARG_REQUIRED) != 0)
188 ACE_ERROR_RETURN ((LM_ERROR,
189 ACE_TEXT (" Unable to add Role option.\n")), 1);
191 if (getopt.long_option (ACE_TEXT ("SDM_options"),
192 'm',
193 ACE_Get_Opt::ARG_REQUIRED) != 0)
194 ACE_ERROR_RETURN ((LM_ERROR,
195 ACE_TEXT (" Unable to add Multicast_Options option.\n")),
198 if (getopt.long_option (ACE_TEXT ("Iterations"),
199 'i',
200 ACE_Get_Opt::ARG_REQUIRED) != 0)
201 ACE_ERROR_RETURN ((LM_ERROR,
202 ACE_TEXT (" Unable to add iterations option.\n")),
205 if (getopt.long_option (ACE_TEXT ("TTL"),
206 't',
207 ACE_Get_Opt::ARG_REQUIRED) != 0)
208 ACE_ERROR_RETURN ((LM_ERROR,
209 ACE_TEXT (" Unable to add TTL option.\n")),
212 if (getopt.long_option (ACE_TEXT ("Wait"),
213 'w',
214 ACE_Get_Opt::ARG_REQUIRED) != 0)
215 ACE_ERROR_RETURN ((LM_ERROR,
216 ACE_TEXT (" Unable to add wait option.\n")),
219 if (getopt.long_option (ACE_TEXT ("help"),
220 'h',
221 ACE_Get_Opt::NO_ARG) != 0)
222 ACE_ERROR_RETURN ((LM_ERROR,
223 ACE_TEXT (" Unable to add help option.\n")),
226 //FUZZ: disable check_for_lack_ACE_OS
227 // Now, let's parse it...
228 int c = 0;
229 while ((c = getopt ()) != EOF)
231 //FUZZ: enable check_for_lack_ACE_OS
232 switch (c)
234 case 0:
235 // Long Option. This should never happen.
236 retval = -1;
237 break;
238 case 'g':
240 // @todo validate all these, i.e., must be within range
241 // 224.255.0.0 to 238.255.255.255, but we only allow the
242 // administrative "site local" range, 239.255.0.0 to
243 // 239.255.255.255.
244 ACE_TCHAR *group = getopt.opt_arg ();
245 if (this->group_start_.set (group) != 0)
247 ACE_ERROR ((LM_ERROR, ACE_TEXT ("Bad group address:%s\n"),
248 group));
251 break;
252 case 'i':
253 this->iterations_ = ACE_OS::atoi (getopt.opt_arg ());
254 break;
255 case 'n':
257 int n = ACE_OS::atoi (getopt.opt_arg ());
258 // I'm assuming 0 means unlimited, so just use whatever the
259 // user provides. Seems to work okay on Solaris 5.8.
260 if (IP_MAX_MEMBERSHIPS == 0)
261 this->groups_ = n;
262 else
263 this->groups_ = ACE_MIN (ACE_MAX (n, MCT_MIN_GROUPS),
264 IP_MAX_MEMBERSHIPS);
265 break;
267 case 'd':
268 this->debug_ = 1;
269 break;
270 case 'r':
272 ACE_TCHAR *c = getopt.opt_arg ();
273 if (ACE_OS::strcasecmp (c, ACE_TEXT ("CONSUMER")) == 0)
274 this->role_ = CONSUMER;
275 else if (ACE_OS::strcasecmp (c, ACE_TEXT ("PRODUCER")) == 0)
276 this->role_ = PRODUCER;
277 else
279 help = 1;
280 retval = -1;
283 break;
284 case 'm':
286 //@todo add back OPT_BINDADDR_NO...
287 ACE_TCHAR *c = getopt.opt_arg ();
288 if (ACE_OS::strcasecmp (c, ACE_TEXT ("OPT_BINDADDR_YES")) == 0)
289 ACE_SET_BITS (this->sdm_opts_,
290 ACE_SOCK_Dgram_Mcast::OPT_BINDADDR_YES);
291 else if (ACE_OS::strcasecmp (c, ACE_TEXT ("OPT_BINDADDR_NO")) == 0)
292 ACE_CLR_BITS (this->sdm_opts_,
293 ACE_SOCK_Dgram_Mcast::OPT_BINDADDR_YES);
294 else if (ACE_OS::strcasecmp (c, ACE_TEXT ("DEFOPT_BINDADDR")) == 0)
296 ACE_CLR_BITS (this->sdm_opts_,
297 ACE_SOCK_Dgram_Mcast::OPT_BINDADDR_YES);
298 ACE_SET_BITS (this->sdm_opts_,
299 ACE_SOCK_Dgram_Mcast::DEFOPT_BINDADDR);
301 else if (ACE_OS::strcasecmp (c, ACE_TEXT ("OPT_NULLIFACE_ALL")) == 0)
302 ACE_SET_BITS (this->sdm_opts_,
303 ACE_SOCK_Dgram_Mcast::OPT_NULLIFACE_ALL);
304 else if (ACE_OS::strcasecmp (c, ACE_TEXT ("OPT_NULLIFACE_ONE")) == 0)
305 ACE_CLR_BITS (this->sdm_opts_,
306 ACE_SOCK_Dgram_Mcast::OPT_NULLIFACE_ALL);
307 else if (ACE_OS::strcasecmp (c, ACE_TEXT ("DEFOPT_NULLIFACE")) == 0)
309 ACE_CLR_BITS (this->sdm_opts_,
310 ACE_SOCK_Dgram_Mcast::OPT_NULLIFACE_ALL);
311 ACE_SET_BITS (this->sdm_opts_,
312 ACE_SOCK_Dgram_Mcast::DEFOPT_NULLIFACE);
314 else if (ACE_OS::strcasecmp (c, ACE_TEXT ("DEFOPTS")) == 0)
315 this->sdm_opts_ = ACE_SOCK_Dgram_Mcast::DEFOPTS;
316 else
318 help = 1;
319 retval = -1;
322 break;
323 case 't':
324 this->ttl_ = ACE_OS::atoi (getopt.opt_arg ());
325 break;
326 case 'w':
327 this->wait_ = ACE_OS::atoi (getopt.opt_arg ());
328 break;
329 case ':':
330 // This means an option requiring an argument didn't have one.
331 ACE_ERROR ((LM_ERROR,
332 ACE_TEXT (" Option '%c' requires an argument but ")
333 ACE_TEXT ("none was supplied\n"),
334 getopt.opt_opt ()));
335 help = 1;
336 retval = -1;
337 break;
338 case '?':
339 case 'h':
340 default:
341 if (ACE_OS::strcmp (argv[getopt.opt_ind () - 1], ACE_TEXT ("-?")) != 0
342 && getopt.opt_opt () != 'h')
343 // Don't allow unknown options.
344 ACE_ERROR ((LM_ERROR,
345 ACE_TEXT (" Found an unknown option (%c) ")
346 ACE_TEXT ("we couldn't handle.\n"),
347 getopt.opt_opt ()));
348 // getopt.last_option ())); //readd with "%s" when
349 // last_option() is available.
350 help = 1;
351 retval = -1;
352 break;
356 if (retval == -1)
358 if (help)
359 // print usage here
360 ACE_ERROR ((LM_ERROR,
361 ACE_TEXT ("usage: %s [options]\n")
362 ACE_TEXT ("Options:\n")
363 ACE_TEXT (" -g {STRING} --GroupStart={STRING} ")
364 ACE_TEXT ("starting multicast group address\n")
365 ACE_TEXT (" ")
366 ACE_TEXT ("(default=239.255.0.1:16000)\n")
367 ACE_TEXT (" -n {#} --Groups={#} ")
368 ACE_TEXT ("number of groups (default=5)\n")
369 ACE_TEXT (" -d --Debug ")
370 ACE_TEXT ("debug flag (default=off)\n")
371 ACE_TEXT (" -r {STRING} --Role={STRING} ")
372 ACE_TEXT ("role {PRODUCER|CONSUMER|BOTH}\n")
373 ACE_TEXT (" ")
374 ACE_TEXT ("(default=BOTH)\n")
375 ACE_TEXT (" -m {STRING} --SDM_options={STRING} ")
376 ACE_TEXT ("ACE_SOCK_Dgram_Mcast ctor options\n")
377 ACE_TEXT (" ")
378 ACE_TEXT ("(default=DEFOPTS)\n")
379 ACE_TEXT (" -i {#} --Iterations={#} ")
380 ACE_TEXT ("number of iterations (default=100)\n")
381 ACE_TEXT (" -t {#} --TTL={#} ")
382 ACE_TEXT ("time to live (default=1)\n")
383 ACE_TEXT (" -w {#} --Wait={#} ")
384 ACE_TEXT ("number of seconds to wait on CONSUMER\n")
385 ACE_TEXT (" ")
386 ACE_TEXT ("(default=2)\n")
387 ACE_TEXT (" -h/? --help ")
388 ACE_TEXT ("show this message\n"),
389 argv[0]));
391 return -1;
394 return 0;
397 void
398 MCT_Config::dump (void) const
400 ACE_DEBUG ((LM_DEBUG, ACE_BEGIN_DUMP, this));
401 ACE_DEBUG ((LM_DEBUG, ACE_TEXT (" Dumping MCT_Config\n")));
402 ACE_DEBUG ((LM_DEBUG,
403 ACE_TEXT ("\tIP_MAX_MEMBERSHIPS = %d\n"),
404 IP_MAX_MEMBERSHIPS));
405 ACE_DEBUG ((LM_DEBUG,
406 ACE_TEXT ("\tgroups_ = %d\n"),
407 this->groups_));
408 ACE_DEBUG ((LM_DEBUG,
409 ACE_TEXT ("\trole_ = %s\n"),
410 (ACE_BIT_ENABLED (this->role_, PRODUCER)
411 && ACE_BIT_ENABLED (this->role_, CONSUMER))
412 ? ACE_TEXT ("PRODUCER/CONSUMER")
413 : ACE_BIT_ENABLED (this->role_, PRODUCER)
414 ? ACE_TEXT ("PRODUCER")
415 : ACE_TEXT ("CONSUMER")));
416 ACE_DEBUG ((LM_DEBUG,
417 ACE_TEXT ("\tsdm_options_ = %d\n"),
418 this->sdm_opts_));
419 ACE_DEBUG ((LM_DEBUG,
420 ACE_TEXT ("\titerations_ = %d\n"),
421 this->iterations_));
422 ACE_DEBUG ((LM_DEBUG,
423 ACE_TEXT ("\tttl_ = %d\n"),
424 this->ttl_));
425 ACE_DEBUG ((LM_DEBUG,
426 ACE_TEXT ("\twait_ = %d\n"),
427 this->wait_));
428 // Note that this call to get_host_addr is the non-reentrant
429 // version, but it's okay for us.
430 ACE_DEBUG ((LM_DEBUG,
431 ACE_TEXT ("\tgroups_start_ = %s:%d\n"),
432 this->group_start_.get_host_addr (),
433 this->group_start_.get_port_number ()));
435 ACE_DEBUG ((LM_DEBUG, ACE_END_DUMP));
438 /******************************************************************************/
440 class MCT_Event_Handler : public ACE_Event_Handler
442 public:
443 MCT_Event_Handler (ACE_SOCK_Dgram_Mcast::options options
444 = ACE_SOCK_Dgram_Mcast::DEFOPTS);
445 virtual ~MCT_Event_Handler (void);
447 int join (const ACE_INET_Addr &mcast_addr,
448 int reuse_addr = 1,
449 const ACE_TCHAR *net_if = 0);
450 int leave (const ACE_INET_Addr &mcast_addr,
451 const ACE_TCHAR *net_if = 0);
453 // = Event Handler hooks.
454 virtual int handle_input (ACE_HANDLE handle);
455 virtual int handle_close (ACE_HANDLE fd, ACE_Reactor_Mask close_mask);
457 virtual ACE_HANDLE get_handle (void) const;
459 // Turn loopback on/off. Must be called after at least 1 join() is performed.
460 int loopback (bool on_off);
462 protected:
463 ACE_SOCK_Dgram_Mcast *mcast (void);
464 int find (const char *buf);
466 private:
467 ACE_SOCK_Dgram_Mcast mcast_;
469 // List of groups we've joined
470 ACE_Vector<ACE_CString*> address_vec_;
472 // Flag used to set the 'finished' flag when the last event handler
473 // gets removed from the reactor.
474 static ACE_Atomic_Op<ACE_SYNCH_MUTEX, long> active_handlers_;
477 ACE_Atomic_Op<ACE_SYNCH_MUTEX, long> MCT_Event_Handler::active_handlers_ = 0;
479 MCT_Event_Handler::MCT_Event_Handler (ACE_SOCK_Dgram_Mcast::options options)
480 : mcast_ (options)
482 // Increment the number of active handlers in the reactor. Note this isn't
483 // really correct, but it should work for our simple example.
484 ++MCT_Event_Handler::active_handlers_;
487 MCT_Event_Handler::~MCT_Event_Handler (void)
489 size_t size = this->address_vec_.size ();
490 for (size_t i = 0; i < size; ++i)
492 delete this->address_vec_[i];
493 this->address_vec_[i] = 0;
495 mcast_.close ();
499 ACE_SOCK_Dgram_Mcast *
500 MCT_Event_Handler::mcast (void)
502 return &this->mcast_;
506 MCT_Event_Handler::find (const char *buf)
508 size_t const size = this->address_vec_.size ();
509 size_t i = 0;
510 for (i = 0; i < size; ++i)
512 if (ACE_OS::strcasecmp (buf, this->address_vec_[i]->c_str ()) == 0)
513 return 0;
516 // Not found, so output message we received along with a list of groups
517 // we've joined for debugging.
518 ACE_CString local;
519 for (i = 0; i < size; ++i)
521 local += "\t";
522 local += this->address_vec_[i]->c_str ();
523 local += "\n";
525 ACE_DEBUG ((LM_DEBUG, ACE_TEXT ("%C not in:\n%C"),
526 buf, local.c_str ()));
528 return -1;
533 MCT_Event_Handler::join (const ACE_INET_Addr &mcast_addr,
534 int reuse_addr,
535 const ACE_TCHAR *net_if)
537 char buf[MAX_STRING_SIZE];
538 ACE_OS::snprintf (buf, MAX_STRING_SIZE, "%s/%d",
539 mcast_addr.get_host_addr (),
540 mcast_addr.get_port_number ());
542 if (this->mcast_.join (mcast_addr, reuse_addr, net_if) == -1)
543 ACE_ERROR_RETURN ((LM_ERROR,
544 ACE_TEXT ("MCT_Event_Handler::join %C %p\n"),
545 buf,
546 ACE_TEXT ("failed")),
547 -1);
548 ACE_DEBUG ((LM_DEBUG, ACE_TEXT ("Joined %C\n"), buf));
550 ACE_CString *str = 0;
551 ACE_NEW_RETURN (str, ACE_CString (buf), -1);
552 this->address_vec_.push_back (str);
553 return 0;
557 MCT_Event_Handler::leave (const ACE_INET_Addr &mcast_addr,
558 const ACE_TCHAR *net_if)
560 if (this->mcast_.leave (mcast_addr, net_if) == 0)
562 char buf[MAX_STRING_SIZE];
563 size_t size = this->address_vec_.size ();
564 for (size_t i = 0; i < size; ++i)
566 ACE_OS::snprintf (buf, MAX_STRING_SIZE, "%s/%d",
567 mcast_addr.get_host_addr (),
568 mcast_addr.get_port_number ());
569 if (ACE_OS::strcasecmp (buf, this->address_vec_[i]->c_str ()) == 0)
571 this->address_vec_[i]->set ("");
572 break;
575 return 0;
577 return -1;
581 MCT_Event_Handler::handle_input (ACE_HANDLE /*handle*/)
583 char buf[MAX_STRING_SIZE];
584 ACE_OS::memset (buf, 0, sizeof buf);
585 ACE_INET_Addr addr;
587 if (this->mcast ()->recv (buf, sizeof buf, addr) == -1)
589 ++error;
590 ACE_ERROR_RETURN ((LM_ERROR,
591 ACE_TEXT ("MCT_Event_Handler::handle_input - ")
592 ACE_TEXT ("calling recv\n")), -1);
595 // Zero length buffer means we are done.
596 if (ACE_OS::strlen (buf) == 0)
597 return -1;
598 else if (this->find (buf) == -1)
600 ++error;
601 ACE_DEBUG ((LM_ERROR,
602 ACE_TEXT ("MCT_Event_Handler::handle_input - ")
603 ACE_TEXT ("Received dgram for a group we didn't join ")
604 ACE_TEXT ("(%s)\n"),
605 buf));
607 return 0;
611 MCT_Event_Handler::handle_close (ACE_HANDLE /*fd*/,
612 ACE_Reactor_Mask /*close_mask*/)
614 // If this is the last handler, use the finished flag to signal
615 // the task to exit.
616 if (--MCT_Event_Handler::active_handlers_ == 0)
617 finished = 1;
619 // The DONT_CALL flag keeps the reactor from calling handle_close ()
620 // again, since we commit suicide below.
621 this->reactor ()->remove_handler (this,
622 ACE_Event_Handler::ALL_EVENTS_MASK |
623 ACE_Event_Handler::DONT_CALL);
624 this->reactor (0);
625 delete this;
626 return 0;
629 ACE_HANDLE
630 MCT_Event_Handler::get_handle (void) const
632 return this->mcast_.get_handle ();
635 // Turn loopback on/off
637 MCT_Event_Handler::loopback (bool on_off)
639 char loopback_on = on_off ? 1 : 0;
640 return this->mcast_.set_option (IP_MULTICAST_LOOP, loopback_on);
643 /******************************************************************************/
646 * Our MCT_Task object will be an Active Object if we are running the Consumer
647 * side of the test. open() calls active() which creates a thread and calls
648 * the svc() method that calls runs the reactor event loop.
650 class MCT_Task : public ACE_Task<ACE_NULL_SYNCH>
652 public:
653 MCT_Task (const MCT_Config &config,
654 ACE_Reactor *reactor = ACE_Reactor::instance ());
655 ~MCT_Task (void);
657 //FUZZ: disable check_for_lack_ACE_OS
658 // = Task hooks.
659 virtual int open (void *args = 0);
660 //FUZZ: enable check_for_lack_ACE_OS
662 virtual int svc (void);
664 private:
665 const MCT_Config &config_;
668 MCT_Task::MCT_Task (const MCT_Config &config,
669 ACE_Reactor *reactor)
670 : config_ (config)
672 this->reactor (reactor);
675 MCT_Task::~MCT_Task (void)
679 MCT_Task::open (void *)
681 MCT_Event_Handler *handler = 0;
683 ACE_INET_Addr addr = this->config_.group_start ();
684 int groups = this->config_.groups ();
685 for (int i = 0; i < groups; ++i)
687 ACE_NEW_RETURN (handler,
688 MCT_Event_Handler (this->config_.options ()), -1);
689 // We subscribe to all groups for the first one and one each for
690 // all the others.
691 if (i == 0)
693 // go ahead and hide the other one since we want our own.
694 ACE_INET_Addr addr = this->config_.group_start ();
695 for (int j = 0; j < groups; ++j)
697 // If OPT_BINDADDR_YES is set, this will fail after the first
698 // join, so just break and keep on going, otherwise it's a
699 // real error.
700 if (j > 0
701 && ACE_BIT_ENABLED (ACE_SOCK_Dgram_Mcast::OPT_BINDADDR_YES,
702 this->config_.options ()))
703 break;
705 if (handler->join (addr) == -1)
706 ACE_ERROR_RETURN ((LM_ERROR,
707 ACE_TEXT ("MCT_Task::open - join error\n")),
708 -1);
709 advance_addr (addr);
712 else
714 if (handler->join (addr) == -1)
715 ACE_ERROR_RETURN ((LM_ERROR,
716 ACE_TEXT ("MCT_Task::open - join error\n")),
717 -1);
720 advance_addr (addr);
722 // This test needs loopback because we're both sending and receiving.
723 // Loopback is usually the default, but be sure.
724 if (-1 == handler->loopback (true))
725 ACE_ERROR ((LM_WARNING,
726 ACE_TEXT ("%p\n"),
727 ACE_TEXT ("MCT_Task::open - enable loopback")));
729 if (this->reactor ()->register_handler (handler, READ_MASK) == -1)
730 ACE_ERROR_RETURN ((LM_ERROR,
731 ACE_TEXT ("MCT_Task::open - cannot register ")
732 ACE_TEXT ("handler\n")),
733 -1);
736 if (this->activate (THR_NEW_LWP) == -1)
737 ACE_ERROR_RETURN ((LM_ERROR,
738 ACE_TEXT ("%p\n"),
739 ACE_TEXT ("MCT_TASK:open - activate failed")),
740 -1);
741 return 0;
745 MCT_Task::svc (void)
747 // make sure this thread owns the reactor or handle_events () won't do
748 // anything.
749 this->reactor ()->owner (ACE_Thread::self ());
751 // loop and call handle_events...
752 while (!finished)
753 this->reactor ()->handle_events ();
755 return 0;
758 /******************************************************************************/
760 int send_dgram (ACE_SOCK_Dgram &socket, ACE_INET_Addr addr, int done = 0)
763 // Send each message twice, once to the right port, and once to the "wrong"
764 // port. This helps generate noise and lets us see if port filtering is
765 // working properly.
766 const char *address = addr.get_host_addr ();
767 int port = addr.get_port_number ();
769 for (int i = 0; i < 2; ++i)
771 char buf[MAX_STRING_SIZE];
772 if (done)
773 buf[0] = 0;
774 else
775 ACE_OS::snprintf (buf, MAX_STRING_SIZE, "%s/%d", address, port);
777 if (socket.send (buf, ACE_OS::strlen (buf),addr) == -1)
778 ACE_ERROR_RETURN ((LM_ERROR, ACE_TEXT ("Send to %C, %p\n"),
779 address,
780 ACE_TEXT ("send_dgram - error calling send on ")
781 ACE_TEXT ("ACE_SOCK_Dgram.")), -1);
782 addr.set_port_number (++port);
784 return 0;
787 int producer (MCT_Config &config)
789 int retval = 0;
791 //FUZZ: disable check_for_lack_ACE_OS
792 ACE_DEBUG ((LM_INFO, ACE_TEXT ("Starting producer...\n")));
793 ACE_SOCK_Dgram socket (ACE_sap_any_cast (ACE_INET_Addr &), PF_INET);
794 //FUZZ: enable check_for_lack_ACE_OS
796 // Note that is is IPv4 specific and needs to be changed once
798 if (config.ttl () > 1)
800 int ttl = config.ttl ();
801 if (socket.set_option (IPPROTO_IP,
802 IP_MULTICAST_TTL,
803 (void*) &ttl,
804 sizeof ttl) != 0)
805 ACE_DEBUG ((LM_ERROR,
806 ACE_TEXT ("could net set socket option IP_MULTICAST_TTL ")
807 ACE_TEXT ("= %d\n"),
808 ttl));
809 else
810 ACE_DEBUG ((LM_INFO, ACE_TEXT ("set IP_MULTICAST_TTL = %d\n"), ttl));
813 int iterations = config.iterations ();
814 // we add an extra 5 groups for noise.
815 int groups = config.groups () + 5;
816 for (int i = 0; (i < iterations || iterations == 0) && !finished; ++i)
818 ACE_INET_Addr addr = config.group_start ();
819 for (int j = 0; j < groups && !finished; ++j)
821 if ((retval += send_dgram (socket, addr,
822 ((i + 1) == iterations))) == -1)
823 ACE_ERROR ((LM_ERROR, ACE_TEXT ("Calling send_dgram.\n")));
824 if ((retval += advance_addr (addr)) == -1)
825 ACE_ERROR ((LM_ERROR,
826 ACE_TEXT ("Calling advance_addr.\n")));
828 // Give the task thread a chance to run.
829 ACE_Thread::yield ();
831 socket.close ();
832 return retval;
836 * Advance the address by 1, e.g., 239.255.0.1 => 239.255.0.2
837 * Note that the algorithm is somewhat simplistic, but sufficient for our
838 * purpose.
840 int advance_addr (ACE_INET_Addr &addr)
842 int a, b, c, d;
843 ::sscanf (addr.get_host_addr (), "%d.%d.%d.%d", &a, &b, &c, &d);
844 if (d < 255)
845 ++d;
846 else if (c < 255)
848 d = 1;
849 ++c;
851 else if (b < 255)
853 d = 1;
854 c = 0;
855 ++b;
857 else if (a < 239)
859 d = 1;
860 c = 0;
861 b = 0;
862 ++a;
864 else
865 ACE_ERROR_RETURN ((LM_ERROR,
866 ACE_TEXT ("advance_addr - Cannot advance multicast ")
867 ACE_TEXT ("group address past %s\n"),
868 addr.get_host_addr ()),
869 -1);
871 ACE_TCHAR buf[MAX_STRING_SIZE];
872 ACE_OS::snprintf (buf, MAX_STRING_SIZE, ACE_TEXT ("%d.%d.%d.%d:%d"),
873 a, b, c, d, addr.get_port_number ());
874 addr.set (buf);
875 return 0;
879 run_main (int argc, ACE_TCHAR *argv[])
881 int retval = 0;
882 MCT_Config config;
883 retval = config.open (argc, argv);
884 if (retval != 0)
885 return 1;
887 const ACE_TCHAR *temp = ACE_TEXT ("Multicast_Test");
888 ACE_TString test = temp;
890 u_long role = config.role ();
891 if (ACE_BIT_DISABLED (role, MCT_Config::PRODUCER)
892 || ACE_BIT_DISABLED (role, MCT_Config::CONSUMER))
894 if (ACE_BIT_ENABLED (role, MCT_Config::PRODUCER))
895 test += ACE_TEXT ("-PRODUCER");
896 else
897 test += ACE_TEXT ("-CONSUMER");
900 // Start test only if options are valid.
901 ACE_START_TEST (test.c_str ());
903 // Register a signal handler to close down application gracefully.
904 ACE_Sig_Action sa ((ACE_SignalHandler) handler, SIGINT);
906 // Dump the configuration info to the log if caller passed debug option.
907 if (config.debug ())
908 config.dump ();
910 ACE_Reactor *reactor = ACE_Reactor::instance ();
912 MCT_Task *task = new MCT_Task (config, reactor);
914 if (ACE_BIT_ENABLED (role, MCT_Config::CONSUMER))
916 ACE_DEBUG ((LM_INFO, ACE_TEXT ("Starting consumer...\n")));
917 // Open makes it an active object.
918 retval += task->open ();
921 // now produce the datagrams...
922 if (ACE_BIT_ENABLED (role, MCT_Config::PRODUCER))
923 retval += producer (config);
925 if (ACE_BIT_ENABLED (role, MCT_Config::CONSUMER))
927 // and wait for everything to finish
928 ACE_DEBUG ((LM_INFO,
929 ACE_TEXT ("start waiting for consumer to finish...\n")));
930 // Wait for the threads to exit.
931 // But, wait for a limited time since we could hang if the last udp
932 // message isn't received.
933 ACE_Time_Value max_wait ( config.wait ()/* seconds */);
934 ACE_Time_Value wait_time (ACE_OS::gettimeofday () + max_wait);
935 ACE_Time_Value *ptime = ACE_BIT_ENABLED (role, MCT_Config::PRODUCER)
936 ? &wait_time : 0;
937 if (ACE_Thread_Manager::instance ()->wait (ptime) == -1)
939 // We will no longer wait for this thread, so we must
940 // force it to exit otherwise the thread will be referencing
941 // deleted memory.
942 finished = 1;
943 reactor->end_reactor_event_loop ();
945 if (errno == ETIME)
946 ACE_ERROR ((LM_ERROR,
947 ACE_TEXT ("maximum wait time of %d msec exceeded\n"),
948 max_wait.msec ()));
949 else
950 ACE_OS::perror (ACE_TEXT ("wait"));
952 ++error;
954 // This should exit now that we ended the reactor loop.
955 task->wait ();
959 delete task;
960 ACE_END_TEST;
961 return (retval == 0 && error == 0) ? 0 : 1;
964 #else
966 run_main (int, ACE_TCHAR *[])
968 ACE_START_TEST (ACE_TEXT ("Multicast_Test"));
970 ACE_ERROR ((LM_INFO,
971 ACE_TEXT ("This test must be run on a platform ")
972 ACE_TEXT ("that support IP multicast.\n")));
974 ACE_END_TEST;
975 return 1;
977 #endif /* ACE_HAS_IP_MULTICAST && ACE_HAS_THREADS */