Make x.0.10 publicly available
[ACE_TAO.git] / ACE / tests / Process_Strategy_Test.cpp
blob8b29d6343943a37b6b6a71dd2a09a1361557ff9a
2 //=============================================================================
3 /**
4 * @file Process_Strategy_Test.cpp
6 * This is a test of the <ACE_Strategy_Acceptor> and
7 * <ACE_File_Lock> classes. The <ACE_Strategy_Acceptor> uses
8 * either the <ACE_Process_Strategy> (which forks a
9 * process-per-connection and runs as a concurrent server
10 * process), the <ACE_Thread_Strategy> (which spawns a
11 * thread-per-connection and runs as a concurrent server thread),
12 * or <ACE_Reactive_Strategy> (which register the <Svc_Handler>
13 * with the <Reactor> and runs in the main thread of control as an
14 * iterative server). This server queries and increments a
15 * "counting value" in a file.
17 * This test program can be run in the following ways:
19 * # Run the server "reactively" (i.e., iteratively)
20 * % Process_Strategy_Test -c REACTIVE
22 * # Run the server in multi-threads.
23 * % Process_Strategy_Test -c THREAD
25 * # Run the server in multi-processes
26 * % Process_Strategy_Test -c PROCESS
28 * @author Douglas C. Schmidt <d.schmidt@vanderbilt.edu> and Kevin Boyle <kboyle@sanwafp.com>
30 //=============================================================================
32 #include "test_config.h"
33 #include "ace/OS_NS_string.h"
34 #include "ace/OS_NS_unistd.h"
35 #include "ace/Acceptor.h"
36 #include "ace/Handle_Set.h"
37 #include "ace/Get_Opt.h"
38 #include "ace/Null_Condition.h"
39 #include "ace/Null_Mutex.h"
40 #include "ace/SOCK_Acceptor.h"
41 #include "ace/SOCK_Connector.h"
42 #include "ace/Strategies_T.h"
43 #include "ace/Singleton.h"
44 #include "ace/Synch_Traits.h"
45 #include "ace/File_Lock.h"
46 #include "ace/Lib_Find.h"
48 // Counting_Service and Options in here
49 #include "Process_Strategy_Test.h"
51 // This test does not function properly when fork() is used on HP-UX
52 #if defined(__hpux)
53 #define ACE_LACKS_FORK
54 #endif /* __hpux */
56 ACE_SINGLETON_TEMPLATE_INSTANTIATE(ACE_Singleton, Options, ACE_Null_Mutex);
58 // Define a <Strategy_Acceptor> that's parameterized by the
59 // <Counting_Service>.
61 using ACCEPTOR = ACE_Strategy_Acceptor<Counting_Service, ACE_SOCK_Acceptor>;
63 // Create an Options Singleton.
64 using OPTIONS = ACE_Singleton<Options, ACE_Null_Mutex>;
66 // counter for connections
67 static size_t connections = 0;
69 // Use this to show down the process gracefully.
70 void
71 connection_completed ()
73 // Increment connection counter.
74 ++connections;
76 // If all connections have been serviced.
77 if (connections == ACE_MAX_ITERATIONS + 1)
78 // Make sure that the event loop is interrupted.
79 ACE_Reactor::instance()->wakeup_all_threads ();
82 // Constructor
83 Process_Strategy::Process_Strategy (size_t n_processes,
84 ACE_Event_Handler *acceptor,
85 ACE_Reactor *r,
86 int avoid_zombies)
87 : ACE_Process_Strategy<Counting_Service> (n_processes,
88 acceptor,
90 avoid_zombies)
94 // Destructor. g++ 2.7.2.3 gets very confused ("Internal compiler
95 // error") without it.
96 Process_Strategy::~Process_Strategy ()
100 // Overwrite the process creation method to include connection
101 // counting.
104 Process_Strategy::activate_svc_handler (Counting_Service *svc_handler,
105 void *arg)
107 // Call down to the base class
108 int const result =
109 ACE_Process_Strategy<Counting_Service>::activate_svc_handler (svc_handler,
110 arg);
111 // Connection is now complete
112 connection_completed ();
114 return result;
117 ACE_File_Lock &
118 Options::file_lock ()
120 return this->file_lock_;
123 ACE_Concurrency_Strategy <Counting_Service> *
124 Options::concurrency_strategy ()
126 return this->concurrency_strategy_;
129 const ACE_TCHAR *
130 Options::filename ()
132 return this->filename_;
135 Options::Options ()
137 // Choose to use processes by default.
138 #if !defined (ACE_LACKS_FORK)
139 concurrency_type_ (PROCESS)
140 #elif defined (ACE_HAS_THREADS)
141 concurrency_type_ (THREAD)
142 #else
143 concurrency_type_ (REACTIVE)
144 #endif /* !ACE_LACKS_FORK */
148 Options::~Options ()
150 delete this->concurrency_strategy_;
154 Options::parse_args (int argc, ACE_TCHAR *argv[])
156 ACE_Get_Opt get_opt (argc, argv, ACE_TEXT ("p:c:f:"));
158 // - 26 is for the "process_strategy_test_temp" that is appended
159 if (ACE::get_temp_dir (this->filename_, MAXPATHLEN - 26) == -1)
160 ACE_ERROR_RETURN ((LM_ERROR,
161 ACE_TEXT ("Temporary path too long\n")),
162 -1);
164 ACE_OS::strcat (this->filename_, ACE_TEXT ("process_strategy_test_temp"));
166 for (int c; (c = get_opt ()) != -1; )
167 switch (c)
169 case 'c':
170 if (ACE_OS::strcmp (get_opt.opt_arg (),
171 ACE_TEXT ("REACTIVE")) == 0)
172 OPTIONS::instance ()->concurrency_type (Options::REACTIVE);
173 #if !defined (ACE_LACKS_FORK)
174 else if (ACE_OS::strcmp (get_opt.opt_arg (),
175 ACE_TEXT ("PROCESS")) == 0)
176 OPTIONS::instance ()->concurrency_type (Options::PROCESS);
177 #endif /* !ACE_LACKS_FORK */
178 #if defined (ACE_HAS_THREADS)
179 else if (ACE_OS::strcmp (get_opt.opt_arg (),
180 ACE_TEXT ("THREAD")) == 0)
181 OPTIONS::instance ()->concurrency_type (Options::THREAD);
182 #endif /* ACE_HAS_THREADS */
183 else
184 ACE_DEBUG ((LM_DEBUG,
185 ACE_TEXT ("WARNING: concurrency strategy \"%s\" ")
186 ACE_TEXT ("is not supported\n"),
187 get_opt.opt_arg ()));
188 break;
189 case 'f':
190 ACE_OS::strcpy (this->filename_, get_opt.opt_arg ());
191 break;
192 default:
193 ACE_DEBUG ((LM_DEBUG,
194 ACE_TEXT ("usage: %n [-f (filename)] ")
195 ACE_TEXT ("[-c (concurrency strategy)]\n%a"), 1));
196 /* NOTREACHED */
199 // Initialize the file lock. Note that this object lives beyond the
200 // lifetime of the Acceptor.
201 if (this->file_lock_.open (this->filename_,
202 O_RDWR | O_CREAT,
203 ACE_DEFAULT_FILE_PERMS) == -1)
204 ACE_ERROR_RETURN ((LM_ERROR,
205 ACE_TEXT ("%p\n"),
206 ACE_TEXT ("open")),
207 -1);
209 ACE_DEBUG ((LM_DEBUG,
210 ACE_TEXT ("(%P|%t) opening %s on handle %d.\n"),
211 this->filename_,
212 this->file_lock_.get_handle ()));
214 int count = 0;
216 // Store the initial value of the count in the file.
217 if (ACE_OS::write (this->file_lock_.get_handle (),
218 (const void *) &count,
219 sizeof count) != (ssize_t) sizeof count)
220 ACE_ERROR ((LM_ERROR,
221 ACE_TEXT ("(%P|%t) %p\n"),
222 ACE_TEXT ("write")));
224 // Initialize the Concurrency strategy.
225 switch (this->concurrency_type_)
227 case Options::PROCESS:
228 #if !defined (ACE_LACKS_FORK)
229 ACE_NEW_RETURN (this->concurrency_strategy_,
230 Process_Strategy (1,
231 this,
232 ACE_Reactor::instance (),
233 1), // Avoid zombies.
234 -1);
235 break;
236 #else
237 ACE_TEST_ASSERT ("PROCESS invalid on this platform" == 0);
238 #endif /* !defined (ACE_LACKS_FORK) */
239 case Options::THREAD:
240 #if defined (ACE_HAS_THREADS)
241 ACE_NEW_RETURN (this->concurrency_strategy_,
242 ACE_Thread_Strategy<Counting_Service>
243 (ACE_Thread_Manager::instance (),
244 THR_NEW_LWP,
246 -1);
247 break;
248 #else
249 ACE_TEST_ASSERT (!"THREAD invalid on this platform");
250 #endif /* !ACE_HAS_THREADS */
251 case Options::REACTIVE:
252 // Settle for the purely Reactive strategy.
253 ACE_NEW_RETURN (this->concurrency_strategy_,
254 ACE_Reactive_Strategy<Counting_Service>
255 (ACE_Reactor::instance ()),
256 -1);
257 break;
260 return 0;
263 Options::Concurrency_Type
264 Options::concurrency_type ()
266 return this->concurrency_type_;
269 void
270 Options::concurrency_type (Options::Concurrency_Type cs)
272 this->concurrency_type_ = cs;
275 Counting_Service::Counting_Service (ACE_Thread_Manager *)
277 ACE_DEBUG ((LM_DEBUG,
278 ACE_TEXT ("(%P|%t) creating the Counting_Service\n")));
281 // Read the current value from the shared file and return it to the
282 // client.
285 Counting_Service::read ()
287 ACE_READ_GUARD_RETURN (ACE_File_Lock, ace_mon, OPTIONS::instance ()->file_lock (), -1);
289 ACE_DEBUG ((LM_DEBUG,
290 ACE_TEXT ("(%P|%t) reading on handle %d.\n"),
291 OPTIONS::instance ()->file_lock ().get_handle ()));
293 int count;
294 if (ACE_OS::pread (OPTIONS::instance ()->file_lock ().get_handle (),
295 (void *) &count,
296 sizeof count,
297 0) != (ssize_t) sizeof count)
298 ACE_ERROR_RETURN ((LM_ERROR,
299 ACE_TEXT ("(%P|%t) %p\n"),
300 ACE_TEXT ("read")),
301 -1);
303 char buf[BUFSIZ];
304 int n = ACE_OS::snprintf (buf, BUFSIZ, "count = %d\n", count);
305 ACE_DEBUG ((LM_DEBUG,
306 ACE_TEXT ("(%P|%t) count = %d\n"),
307 count));
309 if (this->peer ().send_n (buf, n) != n)
310 ACE_ERROR_RETURN ((LM_ERROR,
311 ACE_TEXT ("(%P|%t) %p\n"),
312 ACE_TEXT ("send_n")),
313 -1);
314 return 0;
317 // Increment the current value in the shared file by 1.
320 Counting_Service::inc ()
322 ACE_WRITE_GUARD_RETURN (ACE_File_Lock, ace_mon,
323 OPTIONS::instance ()->file_lock (), -1);
325 ACE_DEBUG ((LM_DEBUG,
326 ACE_TEXT ("(%P|%t) incrementing on handle %d.\n"),
327 OPTIONS::instance ()->file_lock ().get_handle ()));
329 int count;
330 if (ACE_OS::pread (OPTIONS::instance ()->file_lock ().get_handle (),
331 (void *) &count,
332 sizeof count,
333 0) != (ssize_t) sizeof count)
334 ACE_ERROR_RETURN ((LM_ERROR,
335 ACE_TEXT ("(%P|%t) %p\n"),
336 ACE_TEXT ("read")),
337 -1);
339 ACE_DEBUG ((LM_DEBUG,
340 ACE_TEXT ("(%P|%t) incrementing count from %d to %d\n"),
341 count,
342 count + 1));
343 count++;
345 if (ACE_OS::pwrite (OPTIONS::instance ()->file_lock ().get_handle (),
346 (const void *) &count,
347 sizeof count,
348 0) != (ssize_t) sizeof count)
349 ACE_ERROR_RETURN ((LM_ERROR,
350 ACE_TEXT ("(%P|%t) %p\n"),
351 ACE_TEXT ("write")),
352 -1);
353 return 0;
356 // Receive the request from the client and call the appropriate
357 // operation.
360 Counting_Service::handle_input (ACE_HANDLE)
362 char buf[BUFSIZ];
363 ACE_Time_Value* timeout = 0;
364 #if defined (__hpux)
365 // Even though we're in handle_input, there seems to be a
366 // situation on HP-UX where there is nothing to recv just yet.
367 // So, we recv() with a timeout and everything works.
368 ACE_Time_Value hpux_timeout (3);
369 timeout = &hpux_timeout;
370 #endif /* __hpux */
372 ACE_DEBUG ((LM_DEBUG,
373 ACE_TEXT ("(%P|%t) reading from peer on %d\n"),
374 this->peer ().get_handle ()));
375 size_t len;
376 // Read the PDU length first.
377 ssize_t bytes = this->peer ().recv ((void *) &len,
378 sizeof len,
379 timeout);
380 if (bytes <= 0)
381 return -1;
383 bytes = this->peer ().recv (buf, len);
385 if (bytes <= 0 || buf[0] == (char) EOF)
386 return -1;
387 else
389 buf[len] = '\0';
390 ACE_DEBUG ((LM_DEBUG,
391 ACE_TEXT ("(%P|%t) %d bytes of input on %d is %*s\n"),
392 bytes,
393 this->peer ().get_handle (),
394 bytes,
395 ACE_TEXT_CHAR_TO_TCHAR (buf)));
396 // Read and return the current value in the file.
397 if (ACE_OS::strncmp (buf,
398 "read",
399 4) == 0)
400 return this->read ();
401 // Increment the current value in the file.
402 else if (ACE_OS::strncmp (buf, "inc", 3) == 0)
403 return this->inc ();
404 else
405 ACE_DEBUG ((LM_DEBUG,
406 ACE_TEXT ("(%P|%t) no match...\n")));
407 return 0;
412 Counting_Service::svc ()
414 ACE_DEBUG ((LM_DEBUG,
415 ACE_TEXT ("(%P|%t) handling thread\n")));
417 while (this->handle_input () >= 0)
418 continue;
420 return 0;
424 Counting_Service::handle_close (ACE_HANDLE,
425 ACE_Reactor_Mask)
427 // Count completed connections here only when the test is not in
428 // "process-per-connection" mode. In general, this should not be
429 // done here. Proper place for this is activate_svc_handler() but
430 // since only "process-per-connection" hooks into that function in
431 // other modes it's done here. The later creates a problem in
432 // "process-per-connection" mode since it calculates the same
433 // connection twice and as a result it cannot finalize gracefully.
434 if (OPTIONS::instance ()->concurrency_type () != Options::PROCESS)
436 // Done with another connection.
437 connection_completed ();
440 // Call down to base class
441 return ACE_Svc_Handler<ACE_SOCK_STREAM, ACE_NULL_SYNCH>::handle_close ();
444 // This method is called back by the <Acceptor> once the client has
445 // connected and the process is forked or spawned.
448 Counting_Service::open (void *)
450 ACE_DEBUG ((LM_DEBUG,
451 ACE_TEXT ("(%P|%t) opening service\n")));
453 if (OPTIONS::instance ()->concurrency_type () == Options::PROCESS)
455 // We need to rerun the event loop here since we ended up here
456 // due to being fork'd and we can't just return to our context
457 // because it's in the middle of a different event loop that
458 // won't behave properly since it's meant to handle connection
459 // establishment, *not* data transfer.
460 while (this->handle_input () >= 0)
461 continue;
463 ACE_DEBUG ((LM_DEBUG,
464 ACE_TEXT ("(%P|%t) About to exit from the child\n")));
466 // Exit the child.
467 ACE_OS::exit (0);
469 else if (OPTIONS::instance ()->concurrency_type () == Options::THREAD)
470 // We need to set this to 0 so that our <shutdown> method doesn't
471 // try to deregister <this> from the Reactor.
472 this->reactor (0);
473 return 0;
476 #if !defined (ACE_LACKS_FORK) || defined (ACE_HAS_THREADS)
478 // Execute the client tests.
480 void *
481 client (void *arg)
483 ACE_INET_Addr *remote_addr =
484 reinterpret_cast<ACE_INET_Addr *> (arg);
485 ACE_INET_Addr server_addr (remote_addr->get_port_number (),
486 ACE_DEFAULT_SERVER_HOST);
487 ACE_SOCK_Stream stream;
488 ACE_SOCK_Connector connector;
490 char buf[BUFSIZ];
491 const char *command = 0;
492 size_t command_len;
493 size_t i;
495 for (i = 0; i < ACE_MAX_ITERATIONS; i++)
497 ACE_DEBUG ((LM_DEBUG,
498 ACE_TEXT ("(%P|%t) client iteration %d\n"),
499 i));
500 if (connector.connect (stream,
501 server_addr) == -1)
502 ACE_ERROR_RETURN ((LM_ERROR,
503 ACE_TEXT ("%p\n"),
504 ACE_TEXT ("open")),
506 command = "inc";
507 command_len = ACE_OS::strlen (command);
509 #ifndef ACE_LACKS_VA_FUNCTIONS
510 if (stream.send (4,
511 &command_len, sizeof command_len,
512 command, command_len) == -1)
513 ACE_ERROR_RETURN ((LM_ERROR,
514 ACE_TEXT ("%p\n"),
515 ACE_TEXT ("send")),
517 command = "read";
518 command_len = ACE_OS::strlen (command);
520 if (stream.send (4,
521 &command_len, sizeof command_len,
522 command, command_len) == -1)
523 ACE_ERROR_RETURN ((LM_ERROR,
524 ACE_TEXT ("%p\n"),
525 ACE_TEXT ("send")),
527 else if (stream.recv (buf, sizeof buf) <= 0)
528 ACE_ERROR_RETURN ((LM_ERROR,
529 ACE_TEXT ("(%P|%t) %p\n"),
530 ACE_TEXT ("recv")),
532 #else
533 ACE_UNUSED_ARG (command_len);
534 #endif
535 // ACE_DEBUG ((LM_DEBUG,
536 // ACE_TEXT ("(%P|%t) client iteration %d, buf = %C\n"),
537 // i, buf));
539 if (stream.close () == -1)
540 ACE_ERROR_RETURN ((LM_ERROR,
541 ACE_TEXT ("%p\n"),
542 ACE_TEXT ("close")),
546 command = "read";
547 command_len = ACE_OS::strlen (command);
548 ssize_t bytes_read = 0;
550 if (connector.connect (stream, server_addr) == -1)
551 ACE_ERROR_RETURN ((LM_ERROR,
552 ACE_TEXT ("%p\n"),
553 ACE_TEXT ("open")),
555 #ifndef ACE_LACKS_VA_FUNCTIONS
556 else if (stream.send (4,
557 &command_len, sizeof command_len,
558 command, command_len) == -1)
559 ACE_ERROR_RETURN ((LM_ERROR,
560 ACE_TEXT ("%p\n"),
561 ACE_TEXT ("send")),
563 #endif
564 else if ((bytes_read = stream.recv (buf, sizeof buf)) <= 0)
565 ACE_ERROR_RETURN ((LM_ERROR,
566 ACE_TEXT ("%p\n"),
567 ACE_TEXT ("recv")),
569 else
571 // Null terminate buf to avoid an uninitialized memory read in
572 // the call to ACE_OS::strrchr ().
573 buf [bytes_read] = '\0';
575 size_t count = ACE_OS::atoi (ACE_OS::strrchr (ACE_TEXT_CHAR_TO_TCHAR (buf),
576 ACE_TEXT (' ')));
578 ACE_DEBUG ((LM_DEBUG,
579 ACE_TEXT ("(%P|%t) count = %d\n"),
580 count));
581 // Make sure that the count is correct.
582 if (count != ACE_MAX_ITERATIONS)
584 ACE_ERROR_RETURN ((LM_ERROR,
585 ACE_TEXT ("Error: Count invalid, has %d expected %d\n"),
586 count, ACE_MAX_ITERATIONS),
591 if (stream.close () == -1)
592 ACE_ERROR_RETURN ((LM_ERROR,
593 ACE_TEXT ("%p\n"),
594 ACE_TEXT ("close")),
597 // Remove the filename.
598 ACE_OS::unlink (OPTIONS::instance ()->filename ());
600 return 0;
603 // Performs the server activities.
605 // Have all connections been serviced?
608 done ()
610 return connections == ACE_MAX_ITERATIONS + 1;
613 void *
614 server (void *)
616 int result = 0;
617 ACE_Reactor::instance ()->owner (ACE_Thread::self ());
619 while (!done () && result != -1)
620 // Run the main event loop.
621 result = ACE_Reactor::instance ()->handle_events ();
623 return 0;
626 #endif /* !ACE_LACKS_FORK || ACE_HAS_THREADS */
629 run_main (int argc, ACE_TCHAR *argv[])
631 ACE_START_TEST (ACE_TEXT ("Process_Strategy_Test"));
633 if (OPTIONS::instance ()->parse_args (argc, argv) == -1)
634 ACE_ERROR_RETURN ((LM_ERROR,
635 ACE_TEXT ("%p\n"),
636 ACE_TEXT ("parse_args")),
637 -1);
639 #ifndef ACE_LACKS_ACCEPT
641 ACCEPTOR acceptor;
643 ACE_INET_Addr server_addr;
645 // Bind acceptor to any port and then find out what the port was.
646 // Note that this implicitly creates the Reactor singleton.
647 if (acceptor.open (ACE_sap_any_cast (const ACE_INET_Addr &),
648 ACE_Reactor::instance(),
651 OPTIONS::instance ()->concurrency_strategy ()) == -1
652 || acceptor.acceptor ().get_local_addr (server_addr) == -1)
653 ACE_ERROR_RETURN ((LM_ERROR,
654 ACE_TEXT ("%p\n"),
655 ACE_TEXT ("open")),
656 -1);
657 else
659 ACE_DEBUG ((LM_DEBUG,
660 ACE_TEXT ("(%P|%t) starting server at port %d\n"),
661 server_addr.get_port_number ()));
663 #if !defined (ACE_LACKS_FORK)
664 // We're running the client and serve as separate processes.
665 pid_t pid = ACE::fork (ACE_TEXT ("child"),
666 1); // Avoid zombies.
668 switch (pid)
670 case -1:
671 ACE_ERROR ((LM_ERROR,
672 ACE_TEXT ("(%P|%t) %p\n%a"),
673 ACE_TEXT ("fork failed")));
674 ACE_OS::exit (-1);
675 /* NOTREACHED */
676 case 0:
677 client (&server_addr);
678 break;
679 /* NOTREACHED */
680 default:
681 server (0);
682 break;
683 /* NOTREACHED */
685 #elif defined (ACE_HAS_THREADS)
686 if (ACE_Thread_Manager::instance ()->spawn
687 (ACE_THR_FUNC (server),
688 (void *) 0,
689 THR_NEW_LWP | THR_DETACHED) == -1)
690 ACE_ERROR ((LM_ERROR,
691 ACE_TEXT ("(%P|%t) %p\n%a"),
692 ACE_TEXT ("thread create failed")));
694 if (ACE_Thread_Manager::instance ()->spawn
695 (ACE_THR_FUNC (client),
696 (void *) &server_addr,
697 THR_NEW_LWP | THR_DETACHED) == -1)
698 ACE_ERROR ((LM_ERROR,
699 ACE_TEXT ("(%P|%t) %p\n%a"),
700 ACE_TEXT ("thread create failed")));
702 // Wait for the threads to exit.
703 ACE_Thread_Manager::instance ()->wait ();
704 #else
705 ACE_ERROR ((LM_INFO,
706 ACE_TEXT ("(%P|%t) only one thread may be run ")
707 ACE_TEXT ("in a process on this platform\n")));
708 #endif /* ACE_HAS_THREADS */
711 #endif // ACE_LACKS_ACCEPT
712 ACE_END_TEST;
713 return 0;