Merge pull request #2220 from DOCGroup/revert-2217-jwi-inetwraning
[ACE_TAO.git] / ACE / tests / Process_Strategy_Test.cpp
blobd8df9490bac1aaf0e1dd4d6450c082ce1b76f74f
//=============================================================================
/**
 *  @file    Process_Strategy_Test.cpp
 *
 *  This is a test of the <ACE_Strategy_Acceptor> and
 *  <ACE_File_Lock> classes.  The <ACE_Strategy_Acceptor> uses
 *  either the <ACE_Process_Strategy> (which forks a
 *  process-per-connection and runs as a concurrent server
 *  process), the <ACE_Thread_Strategy> (which spawns a
 *  thread-per-connection and runs as a concurrent server thread),
 *  or <ACE_Reactive_Strategy> (which registers the <Svc_Handler>
 *  with the <Reactor> and runs in the main thread of control as an
 *  iterative server).  This server queries and increments a
 *  "counting value" in a file.
 *
 *  This test program can be run in the following ways:
 *
 *  # Run the server "reactively" (i.e., iteratively)
 *  % Process_Strategy_Test -c REACTIVE
 *
 *  # Run the server in multi-threads.
 *  % Process_Strategy_Test -c THREAD
 *
 *  # Run the server in multi-processes
 *  % Process_Strategy_Test -c PROCESS
 *
 *  @author Douglas C. Schmidt <d.schmidt@vanderbilt.edu> and Kevin Boyle <kboyle@sanwafp.com>
 */
//=============================================================================
32 #include "test_config.h"
33 #include "ace/OS_NS_string.h"
34 #include "ace/OS_NS_unistd.h"
35 #include "ace/Acceptor.h"
36 #include "ace/Handle_Set.h"
37 #include "ace/Get_Opt.h"
38 #include "ace/Null_Condition.h"
39 #include "ace/Null_Mutex.h"
40 #include "ace/SOCK_Acceptor.h"
41 #include "ace/SOCK_Connector.h"
42 #include "ace/Strategies_T.h"
43 #include "ace/Singleton.h"
44 #include "ace/Synch_Traits.h"
45 #include "ace/File_Lock.h"
46 #include "ace/Lib_Find.h"
48 // Counting_Service and Options in here
49 #include "Process_Strategy_Test.h"
51 ACE_SINGLETON_TEMPLATE_INSTANTIATE(ACE_Singleton, Options, ACE_Null_Mutex);
53 // Define a <Strategy_Acceptor> that's parameterized by the
54 // <Counting_Service>.
56 using ACCEPTOR = ACE_Strategy_Acceptor<Counting_Service, ACE_SOCK_Acceptor>;
58 // Create an Options Singleton.
59 using OPTIONS = ACE_Singleton<Options, ACE_Null_Mutex>;
61 // counter for connections
62 static size_t connections = 0;
64 // Use this to show down the process gracefully.
65 void
66 connection_completed ()
68 // Increment connection counter.
69 ++connections;
71 // If all connections have been serviced.
72 if (connections == ACE_MAX_ITERATIONS + 1)
73 // Make sure that the event loop is interrupted.
74 ACE_Reactor::instance()->wakeup_all_threads ();
77 // Constructor
78 Process_Strategy::Process_Strategy (size_t n_processes,
79 ACE_Event_Handler *acceptor,
80 ACE_Reactor *r,
81 int avoid_zombies)
82 : ACE_Process_Strategy<Counting_Service> (n_processes,
83 acceptor,
85 avoid_zombies)
89 // Destructor. g++ 2.7.2.3 gets very confused ("Internal compiler
90 // error") without it.
91 Process_Strategy::~Process_Strategy ()
95 // Overwrite the process creation method to include connection
96 // counting.
98 int
99 Process_Strategy::activate_svc_handler (Counting_Service *svc_handler,
100 void *arg)
102 // Call down to the base class
103 int const result =
104 ACE_Process_Strategy<Counting_Service>::activate_svc_handler (svc_handler,
105 arg);
106 // Connection is now complete
107 connection_completed ();
109 return result;
112 ACE_File_Lock &
113 Options::file_lock ()
115 return this->file_lock_;
118 ACE_Concurrency_Strategy <Counting_Service> *
119 Options::concurrency_strategy ()
121 return this->concurrency_strategy_;
124 const ACE_TCHAR *
125 Options::filename ()
127 return this->filename_;
130 Options::Options ()
132 // Choose to use processes by default.
133 #if !defined (ACE_LACKS_FORK)
134 concurrency_type_ (PROCESS)
135 #elif defined (ACE_HAS_THREADS)
136 concurrency_type_ (THREAD)
137 #else
138 concurrency_type_ (REACTIVE)
139 #endif /* !ACE_LACKS_FORK */
143 Options::~Options ()
145 delete this->concurrency_strategy_;
149 Options::parse_args (int argc, ACE_TCHAR *argv[])
151 ACE_Get_Opt get_opt (argc, argv, ACE_TEXT ("p:c:f:"));
153 // - 26 is for the "process_strategy_test_temp" that is appended
154 if (ACE::get_temp_dir (this->filename_, MAXPATHLEN - 26) == -1)
155 ACE_ERROR_RETURN ((LM_ERROR,
156 ACE_TEXT ("Temporary path too long\n")),
157 -1);
159 ACE_OS::strcat (this->filename_, ACE_TEXT ("process_strategy_test_temp"));
161 for (int c; (c = get_opt ()) != -1; )
162 switch (c)
164 case 'c':
165 if (ACE_OS::strcmp (get_opt.opt_arg (),
166 ACE_TEXT ("REACTIVE")) == 0)
167 OPTIONS::instance ()->concurrency_type (Options::REACTIVE);
168 #if !defined (ACE_LACKS_FORK)
169 else if (ACE_OS::strcmp (get_opt.opt_arg (),
170 ACE_TEXT ("PROCESS")) == 0)
171 OPTIONS::instance ()->concurrency_type (Options::PROCESS);
172 #endif /* !ACE_LACKS_FORK */
173 #if defined (ACE_HAS_THREADS)
174 else if (ACE_OS::strcmp (get_opt.opt_arg (),
175 ACE_TEXT ("THREAD")) == 0)
176 OPTIONS::instance ()->concurrency_type (Options::THREAD);
177 #endif /* ACE_HAS_THREADS */
178 else
179 ACE_DEBUG ((LM_DEBUG,
180 ACE_TEXT ("WARNING: concurrency strategy \"%s\" ")
181 ACE_TEXT ("is not supported\n"),
182 get_opt.opt_arg ()));
183 break;
184 case 'f':
185 ACE_OS::strcpy (this->filename_, get_opt.opt_arg ());
186 break;
187 default:
188 ACE_DEBUG ((LM_DEBUG,
189 ACE_TEXT ("usage: %n [-f (filename)] ")
190 ACE_TEXT ("[-c (concurrency strategy)]\n%a"), 1));
191 /* NOTREACHED */
194 // Initialize the file lock. Note that this object lives beyond the
195 // lifetime of the Acceptor.
196 if (this->file_lock_.open (this->filename_,
197 O_RDWR | O_CREAT,
198 ACE_DEFAULT_FILE_PERMS) == -1)
199 ACE_ERROR_RETURN ((LM_ERROR,
200 ACE_TEXT ("%p\n"),
201 ACE_TEXT ("open")),
202 -1);
204 ACE_DEBUG ((LM_DEBUG,
205 ACE_TEXT ("(%P|%t) opening %s on handle %d.\n"),
206 this->filename_,
207 this->file_lock_.get_handle ()));
209 int count = 0;
211 // Store the initial value of the count in the file.
212 if (ACE_OS::write (this->file_lock_.get_handle (),
213 (const void *) &count,
214 sizeof count) != (ssize_t) sizeof count)
215 ACE_ERROR ((LM_ERROR,
216 ACE_TEXT ("(%P|%t) %p\n"),
217 ACE_TEXT ("write")));
219 // Initialize the Concurrency strategy.
220 switch (this->concurrency_type_)
222 case Options::PROCESS:
223 #if !defined (ACE_LACKS_FORK)
224 ACE_NEW_RETURN (this->concurrency_strategy_,
225 Process_Strategy (1,
226 this,
227 ACE_Reactor::instance (),
228 1), // Avoid zombies.
229 -1);
230 break;
231 #else
232 ACE_TEST_ASSERT ("PROCESS invalid on this platform" == 0);
233 #endif /* !defined (ACE_LACKS_FORK) */
234 case Options::THREAD:
235 #if defined (ACE_HAS_THREADS)
236 ACE_NEW_RETURN (this->concurrency_strategy_,
237 ACE_Thread_Strategy<Counting_Service>
238 (ACE_Thread_Manager::instance (),
239 THR_NEW_LWP,
241 -1);
242 break;
243 #else
244 ACE_TEST_ASSERT (!"THREAD invalid on this platform");
245 #endif /* !ACE_HAS_THREADS */
246 case Options::REACTIVE:
247 // Settle for the purely Reactive strategy.
248 ACE_NEW_RETURN (this->concurrency_strategy_,
249 ACE_Reactive_Strategy<Counting_Service>
250 (ACE_Reactor::instance ()),
251 -1);
252 break;
255 return 0;
258 Options::Concurrency_Type
259 Options::concurrency_type ()
261 return this->concurrency_type_;
264 void
265 Options::concurrency_type (Options::Concurrency_Type cs)
267 this->concurrency_type_ = cs;
270 Counting_Service::Counting_Service (ACE_Thread_Manager *)
272 ACE_DEBUG ((LM_DEBUG,
273 ACE_TEXT ("(%P|%t) creating the Counting_Service\n")));
276 // Read the current value from the shared file and return it to the
277 // client.
280 Counting_Service::read ()
282 ACE_READ_GUARD_RETURN (ACE_File_Lock, ace_mon, OPTIONS::instance ()->file_lock (), -1);
284 ACE_DEBUG ((LM_DEBUG,
285 ACE_TEXT ("(%P|%t) reading on handle %d.\n"),
286 OPTIONS::instance ()->file_lock ().get_handle ()));
288 int count;
289 if (ACE_OS::pread (OPTIONS::instance ()->file_lock ().get_handle (),
290 (void *) &count,
291 sizeof count,
292 0) != (ssize_t) sizeof count)
293 ACE_ERROR_RETURN ((LM_ERROR,
294 ACE_TEXT ("(%P|%t) %p\n"),
295 ACE_TEXT ("read")),
296 -1);
298 char buf[BUFSIZ];
299 int n = ACE_OS::snprintf (buf, BUFSIZ, "count = %d\n", count);
300 ACE_DEBUG ((LM_DEBUG,
301 ACE_TEXT ("(%P|%t) count = %d\n"),
302 count));
304 if (this->peer ().send_n (buf, n) != n)
305 ACE_ERROR_RETURN ((LM_ERROR,
306 ACE_TEXT ("(%P|%t) %p\n"),
307 ACE_TEXT ("send_n")),
308 -1);
309 return 0;
312 // Increment the current value in the shared file by 1.
315 Counting_Service::inc ()
317 ACE_WRITE_GUARD_RETURN (ACE_File_Lock, ace_mon,
318 OPTIONS::instance ()->file_lock (), -1);
320 ACE_DEBUG ((LM_DEBUG,
321 ACE_TEXT ("(%P|%t) incrementing on handle %d.\n"),
322 OPTIONS::instance ()->file_lock ().get_handle ()));
324 int count;
325 if (ACE_OS::pread (OPTIONS::instance ()->file_lock ().get_handle (),
326 (void *) &count,
327 sizeof count,
328 0) != (ssize_t) sizeof count)
329 ACE_ERROR_RETURN ((LM_ERROR,
330 ACE_TEXT ("(%P|%t) %p\n"),
331 ACE_TEXT ("read")),
332 -1);
334 ACE_DEBUG ((LM_DEBUG,
335 ACE_TEXT ("(%P|%t) incrementing count from %d to %d\n"),
336 count,
337 count + 1));
338 count++;
340 if (ACE_OS::pwrite (OPTIONS::instance ()->file_lock ().get_handle (),
341 (const void *) &count,
342 sizeof count,
343 0) != (ssize_t) sizeof count)
344 ACE_ERROR_RETURN ((LM_ERROR,
345 ACE_TEXT ("(%P|%t) %p\n"),
346 ACE_TEXT ("write")),
347 -1);
348 return 0;
351 // Receive the request from the client and call the appropriate
352 // operation.
355 Counting_Service::handle_input (ACE_HANDLE)
357 char buf[BUFSIZ];
358 ACE_Time_Value* timeout = 0;
360 ACE_DEBUG ((LM_DEBUG,
361 ACE_TEXT ("(%P|%t) reading from peer on %d\n"),
362 this->peer ().get_handle ()));
363 size_t len;
364 // Read the PDU length first.
365 ssize_t bytes = this->peer ().recv ((void *) &len,
366 sizeof len,
367 timeout);
368 if (bytes <= 0)
369 return -1;
371 bytes = this->peer ().recv (buf, len);
373 if (bytes <= 0 || buf[0] == (char) EOF)
374 return -1;
375 else
377 buf[len] = '\0';
378 ACE_DEBUG ((LM_DEBUG,
379 ACE_TEXT ("(%P|%t) %d bytes of input on %d is %*s\n"),
380 bytes,
381 this->peer ().get_handle (),
382 bytes,
383 ACE_TEXT_CHAR_TO_TCHAR (buf)));
384 // Read and return the current value in the file.
385 if (ACE_OS::strncmp (buf,
386 "read",
387 4) == 0)
388 return this->read ();
389 // Increment the current value in the file.
390 else if (ACE_OS::strncmp (buf, "inc", 3) == 0)
391 return this->inc ();
392 else
393 ACE_DEBUG ((LM_DEBUG,
394 ACE_TEXT ("(%P|%t) no match...\n")));
395 return 0;
400 Counting_Service::svc ()
402 ACE_DEBUG ((LM_DEBUG,
403 ACE_TEXT ("(%P|%t) handling thread\n")));
405 while (this->handle_input () >= 0)
406 continue;
408 return 0;
412 Counting_Service::handle_close (ACE_HANDLE,
413 ACE_Reactor_Mask)
415 // Count completed connections here only when the test is not in
416 // "process-per-connection" mode. In general, this should not be
417 // done here. Proper place for this is activate_svc_handler() but
418 // since only "process-per-connection" hooks into that function in
419 // other modes it's done here. The later creates a problem in
420 // "process-per-connection" mode since it calculates the same
421 // connection twice and as a result it cannot finalize gracefully.
422 if (OPTIONS::instance ()->concurrency_type () != Options::PROCESS)
424 // Done with another connection.
425 connection_completed ();
428 // Call down to base class
429 return ACE_Svc_Handler<ACE_SOCK_STREAM, ACE_NULL_SYNCH>::handle_close ();
432 // This method is called back by the <Acceptor> once the client has
433 // connected and the process is forked or spawned.
436 Counting_Service::open (void *)
438 ACE_DEBUG ((LM_DEBUG,
439 ACE_TEXT ("(%P|%t) opening service\n")));
441 if (OPTIONS::instance ()->concurrency_type () == Options::PROCESS)
443 // We need to rerun the event loop here since we ended up here
444 // due to being fork'd and we can't just return to our context
445 // because it's in the middle of a different event loop that
446 // won't behave properly since it's meant to handle connection
447 // establishment, *not* data transfer.
448 while (this->handle_input () >= 0)
449 continue;
451 ACE_DEBUG ((LM_DEBUG,
452 ACE_TEXT ("(%P|%t) About to exit from the child\n")));
454 // Exit the child.
455 ACE_OS::exit (0);
457 else if (OPTIONS::instance ()->concurrency_type () == Options::THREAD)
458 // We need to set this to 0 so that our <shutdown> method doesn't
459 // try to deregister <this> from the Reactor.
460 this->reactor (0);
461 return 0;
464 #if !defined (ACE_LACKS_FORK) || defined (ACE_HAS_THREADS)
466 // Execute the client tests.
468 void *
469 client (void *arg)
471 ACE_INET_Addr *remote_addr =
472 reinterpret_cast<ACE_INET_Addr *> (arg);
473 ACE_INET_Addr server_addr (remote_addr->get_port_number (),
474 ACE_DEFAULT_SERVER_HOST);
475 ACE_SOCK_Stream stream;
476 ACE_SOCK_Connector connector;
478 char buf[BUFSIZ];
479 const char *command = 0;
480 size_t command_len;
481 size_t i;
483 for (i = 0; i < ACE_MAX_ITERATIONS; i++)
485 ACE_DEBUG ((LM_DEBUG,
486 ACE_TEXT ("(%P|%t) client iteration %d\n"),
487 i));
488 if (connector.connect (stream,
489 server_addr) == -1)
490 ACE_ERROR_RETURN ((LM_ERROR,
491 ACE_TEXT ("%p\n"),
492 ACE_TEXT ("open")),
494 command = "inc";
495 command_len = ACE_OS::strlen (command);
497 #ifndef ACE_LACKS_VA_FUNCTIONS
498 if (stream.send (4,
499 &command_len, sizeof command_len,
500 command, command_len) == -1)
501 ACE_ERROR_RETURN ((LM_ERROR,
502 ACE_TEXT ("%p\n"),
503 ACE_TEXT ("send")),
505 command = "read";
506 command_len = ACE_OS::strlen (command);
508 if (stream.send (4,
509 &command_len, sizeof command_len,
510 command, command_len) == -1)
511 ACE_ERROR_RETURN ((LM_ERROR,
512 ACE_TEXT ("%p\n"),
513 ACE_TEXT ("send")),
515 else if (stream.recv (buf, sizeof buf) <= 0)
516 ACE_ERROR_RETURN ((LM_ERROR,
517 ACE_TEXT ("(%P|%t) %p\n"),
518 ACE_TEXT ("recv")),
520 #else
521 ACE_UNUSED_ARG (command_len);
522 #endif
523 // ACE_DEBUG ((LM_DEBUG,
524 // ACE_TEXT ("(%P|%t) client iteration %d, buf = %C\n"),
525 // i, buf));
527 if (stream.close () == -1)
528 ACE_ERROR_RETURN ((LM_ERROR,
529 ACE_TEXT ("%p\n"),
530 ACE_TEXT ("close")),
534 command = "read";
535 command_len = ACE_OS::strlen (command);
536 ssize_t bytes_read = 0;
538 if (connector.connect (stream, server_addr) == -1)
539 ACE_ERROR_RETURN ((LM_ERROR,
540 ACE_TEXT ("%p\n"),
541 ACE_TEXT ("open")),
543 #ifndef ACE_LACKS_VA_FUNCTIONS
544 else if (stream.send (4,
545 &command_len, sizeof command_len,
546 command, command_len) == -1)
547 ACE_ERROR_RETURN ((LM_ERROR,
548 ACE_TEXT ("%p\n"),
549 ACE_TEXT ("send")),
551 #endif
552 else if ((bytes_read = stream.recv (buf, sizeof buf)) <= 0)
553 ACE_ERROR_RETURN ((LM_ERROR,
554 ACE_TEXT ("%p\n"),
555 ACE_TEXT ("recv")),
557 else
559 // Null terminate buf to avoid an uninitialized memory read in
560 // the call to ACE_OS::strrchr ().
561 buf [bytes_read] = '\0';
563 size_t count = ACE_OS::atoi (ACE_OS::strrchr (ACE_TEXT_CHAR_TO_TCHAR (buf),
564 ACE_TEXT (' ')));
566 ACE_DEBUG ((LM_DEBUG,
567 ACE_TEXT ("(%P|%t) count = %d\n"),
568 count));
569 // Make sure that the count is correct.
570 if (count != ACE_MAX_ITERATIONS)
572 ACE_ERROR_RETURN ((LM_ERROR,
573 ACE_TEXT ("Error: Count invalid, has %d expected %d\n"),
574 count, ACE_MAX_ITERATIONS),
579 if (stream.close () == -1)
580 ACE_ERROR_RETURN ((LM_ERROR,
581 ACE_TEXT ("%p\n"),
582 ACE_TEXT ("close")),
585 // Remove the filename.
586 ACE_OS::unlink (OPTIONS::instance ()->filename ());
588 return 0;
591 // Performs the server activities.
593 // Have all connections been serviced?
596 done ()
598 return connections == ACE_MAX_ITERATIONS + 1;
601 void *
602 server (void *)
604 int result = 0;
605 ACE_Reactor::instance ()->owner (ACE_Thread::self ());
607 while (!done () && result != -1)
608 // Run the main event loop.
609 result = ACE_Reactor::instance ()->handle_events ();
611 return 0;
614 #endif /* !ACE_LACKS_FORK || ACE_HAS_THREADS */
617 run_main (int argc, ACE_TCHAR *argv[])
619 ACE_START_TEST (ACE_TEXT ("Process_Strategy_Test"));
621 if (OPTIONS::instance ()->parse_args (argc, argv) == -1)
622 ACE_ERROR_RETURN ((LM_ERROR,
623 ACE_TEXT ("%p\n"),
624 ACE_TEXT ("parse_args")),
625 -1);
627 #ifndef ACE_LACKS_ACCEPT
629 ACCEPTOR acceptor;
631 ACE_INET_Addr server_addr;
633 // Bind acceptor to any port and then find out what the port was.
634 // Note that this implicitly creates the Reactor singleton.
635 if (acceptor.open (ACE_sap_any_cast (const ACE_INET_Addr &),
636 ACE_Reactor::instance(),
639 OPTIONS::instance ()->concurrency_strategy ()) == -1
640 || acceptor.acceptor ().get_local_addr (server_addr) == -1)
641 ACE_ERROR_RETURN ((LM_ERROR,
642 ACE_TEXT ("%p\n"),
643 ACE_TEXT ("open")),
644 -1);
645 else
647 ACE_DEBUG ((LM_DEBUG,
648 ACE_TEXT ("(%P|%t) starting server at port %d\n"),
649 server_addr.get_port_number ()));
651 #if !defined (ACE_LACKS_FORK)
652 // We're running the client and serve as separate processes.
653 pid_t pid = ACE::fork (ACE_TEXT ("child"),
654 1); // Avoid zombies.
656 switch (pid)
658 case -1:
659 ACE_ERROR ((LM_ERROR,
660 ACE_TEXT ("(%P|%t) %p\n%a"),
661 ACE_TEXT ("fork failed")));
662 ACE_OS::exit (-1);
663 /* NOTREACHED */
664 case 0:
665 client (&server_addr);
666 break;
667 /* NOTREACHED */
668 default:
669 server (0);
670 break;
671 /* NOTREACHED */
673 #elif defined (ACE_HAS_THREADS)
674 if (ACE_Thread_Manager::instance ()->spawn
675 (ACE_THR_FUNC (server),
676 (void *) 0,
677 THR_NEW_LWP | THR_DETACHED) == -1)
678 ACE_ERROR ((LM_ERROR,
679 ACE_TEXT ("(%P|%t) %p\n%a"),
680 ACE_TEXT ("thread create failed")));
682 if (ACE_Thread_Manager::instance ()->spawn
683 (ACE_THR_FUNC (client),
684 (void *) &server_addr,
685 THR_NEW_LWP | THR_DETACHED) == -1)
686 ACE_ERROR ((LM_ERROR,
687 ACE_TEXT ("(%P|%t) %p\n%a"),
688 ACE_TEXT ("thread create failed")));
690 // Wait for the threads to exit.
691 ACE_Thread_Manager::instance ()->wait ();
692 #else
693 ACE_ERROR ((LM_INFO,
694 ACE_TEXT ("(%P|%t) only one thread may be run ")
695 ACE_TEXT ("in a process on this platform\n")));
696 #endif /* ACE_HAS_THREADS */
699 #endif // ACE_LACKS_ACCEPT
700 ACE_END_TEST;
701 return 0;