2 //=============================================================================
4 * @file Process_Strategy_Test.cpp
6 * This is a test of the <ACE_Strategy_Acceptor> and
7 * <ACE_File_Lock> classes. The <ACE_Strategy_Acceptor> uses
8 * either the <ACE_Process_Strategy> (which forks a
9 * process-per-connection and runs as a concurrent server
10 * process), the <ACE_Thread_Strategy> (which spawns a
11 * thread-per-connection and runs as a concurrent server thread),
12 * or <ACE_Reactive_Strategy> (which register the <Svc_Handler>
13 * with the <Reactor> and runs in the main thread of control as an
14 * iterative server). This server queries and increments a
15 * "counting value" in a file.
17 * This test program can be run in the following ways:
19 * # Run the server "reactively" (i.e., iteratively)
20 * % Process_Strategy_Test -c REACTIVE
22 * # Run the server in multi-threads.
23 * % Process_Strategy_Test -c THREAD
25 * # Run the server in multi-processes
26 * % Process_Strategy_Test -c PROCESS
28 * @author Douglas C. Schmidt <d.schmidt@vanderbilt.edu> and Kevin Boyle <kboyle@sanwafp.com>
30 //=============================================================================
32 #include "test_config.h"
33 #include "ace/OS_NS_string.h"
34 #include "ace/OS_NS_unistd.h"
35 #include "ace/Acceptor.h"
36 #include "ace/Handle_Set.h"
37 #include "ace/Get_Opt.h"
38 #include "ace/Null_Condition.h"
39 #include "ace/Null_Mutex.h"
40 #include "ace/SOCK_Acceptor.h"
41 #include "ace/SOCK_Connector.h"
42 #include "ace/Strategies_T.h"
43 #include "ace/Singleton.h"
44 #include "ace/Synch_Traits.h"
45 #include "ace/File_Lock.h"
46 #include "ace/Lib_Find.h"
48 // Counting_Service and Options in here
49 #include "Process_Strategy_Test.h"
51 // This test does not function properly when fork() is used on HP-UX
53 #define ACE_LACKS_FORK
56 ACE_SINGLETON_TEMPLATE_INSTANTIATE(ACE_Singleton
, Options
, ACE_Null_Mutex
);
58 // Define a <Strategy_Acceptor> that's parameterized by the
59 // <Counting_Service>.
61 using ACCEPTOR
= ACE_Strategy_Acceptor
<Counting_Service
, ACE_SOCK_Acceptor
>;
63 // Create an Options Singleton.
64 using OPTIONS
= ACE_Singleton
<Options
, ACE_Null_Mutex
>;
66 // counter for connections
67 static size_t connections
= 0;
69 // Use this to show down the process gracefully.
71 connection_completed ()
73 // Increment connection counter.
76 // If all connections have been serviced.
77 if (connections
== ACE_MAX_ITERATIONS
+ 1)
78 // Make sure that the event loop is interrupted.
79 ACE_Reactor::instance()->wakeup_all_threads ();
83 Process_Strategy::Process_Strategy (size_t n_processes
,
84 ACE_Event_Handler
*acceptor
,
87 : ACE_Process_Strategy
<Counting_Service
> (n_processes
,
94 // Destructor. g++ 2.7.2.3 gets very confused ("Internal compiler
95 // error") without it.
96 Process_Strategy::~Process_Strategy ()
100 // Overwrite the process creation method to include connection
104 Process_Strategy::activate_svc_handler (Counting_Service
*svc_handler
,
107 // Call down to the base class
109 ACE_Process_Strategy
<Counting_Service
>::activate_svc_handler (svc_handler
,
111 // Connection is now complete
112 connection_completed ();
118 Options::file_lock ()
120 return this->file_lock_
;
123 ACE_Concurrency_Strategy
<Counting_Service
> *
124 Options::concurrency_strategy ()
126 return this->concurrency_strategy_
;
132 return this->filename_
;
137 // Choose to use processes by default.
138 #if !defined (ACE_LACKS_FORK)
139 concurrency_type_ (PROCESS
)
140 #elif defined (ACE_HAS_THREADS)
141 concurrency_type_ (THREAD
)
143 concurrency_type_ (REACTIVE
)
144 #endif /* !ACE_LACKS_FORK */
150 delete this->concurrency_strategy_
;
154 Options::parse_args (int argc
, ACE_TCHAR
*argv
[])
156 ACE_Get_Opt
get_opt (argc
, argv
, ACE_TEXT ("p:c:f:"));
158 // - 26 is for the "process_strategy_test_temp" that is appended
159 if (ACE::get_temp_dir (this->filename_
, MAXPATHLEN
- 26) == -1)
160 ACE_ERROR_RETURN ((LM_ERROR
,
161 ACE_TEXT ("Temporary path too long\n")),
164 ACE_OS::strcat (this->filename_
, ACE_TEXT ("process_strategy_test_temp"));
166 for (int c
; (c
= get_opt ()) != -1; )
170 if (ACE_OS::strcmp (get_opt
.opt_arg (),
171 ACE_TEXT ("REACTIVE")) == 0)
172 OPTIONS::instance ()->concurrency_type (Options::REACTIVE
);
173 #if !defined (ACE_LACKS_FORK)
174 else if (ACE_OS::strcmp (get_opt
.opt_arg (),
175 ACE_TEXT ("PROCESS")) == 0)
176 OPTIONS::instance ()->concurrency_type (Options::PROCESS
);
177 #endif /* !ACE_LACKS_FORK */
178 #if defined (ACE_HAS_THREADS)
179 else if (ACE_OS::strcmp (get_opt
.opt_arg (),
180 ACE_TEXT ("THREAD")) == 0)
181 OPTIONS::instance ()->concurrency_type (Options::THREAD
);
182 #endif /* ACE_HAS_THREADS */
184 ACE_DEBUG ((LM_DEBUG
,
185 ACE_TEXT ("WARNING: concurrency strategy \"%s\" ")
186 ACE_TEXT ("is not supported\n"),
187 get_opt
.opt_arg ()));
190 ACE_OS::strcpy (this->filename_
, get_opt
.opt_arg ());
193 ACE_DEBUG ((LM_DEBUG
,
194 ACE_TEXT ("usage: %n [-f (filename)] ")
195 ACE_TEXT ("[-c (concurrency strategy)]\n%a"), 1));
199 // Initialize the file lock. Note that this object lives beyond the
200 // lifetime of the Acceptor.
201 if (this->file_lock_
.open (this->filename_
,
203 ACE_DEFAULT_FILE_PERMS
) == -1)
204 ACE_ERROR_RETURN ((LM_ERROR
,
209 ACE_DEBUG ((LM_DEBUG
,
210 ACE_TEXT ("(%P|%t) opening %s on handle %d.\n"),
212 this->file_lock_
.get_handle ()));
216 // Store the initial value of the count in the file.
217 if (ACE_OS::write (this->file_lock_
.get_handle (),
218 (const void *) &count
,
219 sizeof count
) != (ssize_t
) sizeof count
)
220 ACE_ERROR ((LM_ERROR
,
221 ACE_TEXT ("(%P|%t) %p\n"),
222 ACE_TEXT ("write")));
224 // Initialize the Concurrency strategy.
225 switch (this->concurrency_type_
)
227 case Options::PROCESS
:
228 #if !defined (ACE_LACKS_FORK)
229 ACE_NEW_RETURN (this->concurrency_strategy_
,
232 ACE_Reactor::instance (),
233 1), // Avoid zombies.
237 ACE_TEST_ASSERT ("PROCESS invalid on this platform" == 0);
238 #endif /* !defined (ACE_LACKS_FORK) */
239 case Options::THREAD
:
240 #if defined (ACE_HAS_THREADS)
241 ACE_NEW_RETURN (this->concurrency_strategy_
,
242 ACE_Thread_Strategy
<Counting_Service
>
243 (ACE_Thread_Manager::instance (),
249 ACE_TEST_ASSERT (!"THREAD invalid on this platform");
250 #endif /* !ACE_HAS_THREADS */
251 case Options::REACTIVE
:
252 // Settle for the purely Reactive strategy.
253 ACE_NEW_RETURN (this->concurrency_strategy_
,
254 ACE_Reactive_Strategy
<Counting_Service
>
255 (ACE_Reactor::instance ()),
263 Options::Concurrency_Type
264 Options::concurrency_type ()
266 return this->concurrency_type_
;
270 Options::concurrency_type (Options::Concurrency_Type cs
)
272 this->concurrency_type_
= cs
;
275 Counting_Service::Counting_Service (ACE_Thread_Manager
*)
277 ACE_DEBUG ((LM_DEBUG
,
278 ACE_TEXT ("(%P|%t) creating the Counting_Service\n")));
281 // Read the current value from the shared file and return it to the
285 Counting_Service::read ()
287 ACE_READ_GUARD_RETURN (ACE_File_Lock
, ace_mon
, OPTIONS::instance ()->file_lock (), -1);
289 ACE_DEBUG ((LM_DEBUG
,
290 ACE_TEXT ("(%P|%t) reading on handle %d.\n"),
291 OPTIONS::instance ()->file_lock ().get_handle ()));
294 if (ACE_OS::pread (OPTIONS::instance ()->file_lock ().get_handle (),
297 0) != (ssize_t
) sizeof count
)
298 ACE_ERROR_RETURN ((LM_ERROR
,
299 ACE_TEXT ("(%P|%t) %p\n"),
304 int n
= ACE_OS::snprintf (buf
, BUFSIZ
, "count = %d\n", count
);
305 ACE_DEBUG ((LM_DEBUG
,
306 ACE_TEXT ("(%P|%t) count = %d\n"),
309 if (this->peer ().send_n (buf
, n
) != n
)
310 ACE_ERROR_RETURN ((LM_ERROR
,
311 ACE_TEXT ("(%P|%t) %p\n"),
312 ACE_TEXT ("send_n")),
317 // Increment the current value in the shared file by 1.
320 Counting_Service::inc ()
322 ACE_WRITE_GUARD_RETURN (ACE_File_Lock
, ace_mon
,
323 OPTIONS::instance ()->file_lock (), -1);
325 ACE_DEBUG ((LM_DEBUG
,
326 ACE_TEXT ("(%P|%t) incrementing on handle %d.\n"),
327 OPTIONS::instance ()->file_lock ().get_handle ()));
330 if (ACE_OS::pread (OPTIONS::instance ()->file_lock ().get_handle (),
333 0) != (ssize_t
) sizeof count
)
334 ACE_ERROR_RETURN ((LM_ERROR
,
335 ACE_TEXT ("(%P|%t) %p\n"),
339 ACE_DEBUG ((LM_DEBUG
,
340 ACE_TEXT ("(%P|%t) incrementing count from %d to %d\n"),
345 if (ACE_OS::pwrite (OPTIONS::instance ()->file_lock ().get_handle (),
346 (const void *) &count
,
348 0) != (ssize_t
) sizeof count
)
349 ACE_ERROR_RETURN ((LM_ERROR
,
350 ACE_TEXT ("(%P|%t) %p\n"),
356 // Receive the request from the client and call the appropriate
360 Counting_Service::handle_input (ACE_HANDLE
)
363 ACE_Time_Value
* timeout
= 0;
365 // Even though we're in handle_input, there seems to be a
366 // situation on HP-UX where there is nothing to recv just yet.
367 // So, we recv() with a timeout and everything works.
368 ACE_Time_Value
hpux_timeout (3);
369 timeout
= &hpux_timeout
;
372 ACE_DEBUG ((LM_DEBUG
,
373 ACE_TEXT ("(%P|%t) reading from peer on %d\n"),
374 this->peer ().get_handle ()));
376 // Read the PDU length first.
377 ssize_t bytes
= this->peer ().recv ((void *) &len
,
383 bytes
= this->peer ().recv (buf
, len
);
385 if (bytes
<= 0 || buf
[0] == (char) EOF
)
390 ACE_DEBUG ((LM_DEBUG
,
391 ACE_TEXT ("(%P|%t) %d bytes of input on %d is %*s\n"),
393 this->peer ().get_handle (),
395 ACE_TEXT_CHAR_TO_TCHAR (buf
)));
396 // Read and return the current value in the file.
397 if (ACE_OS::strncmp (buf
,
400 return this->read ();
401 // Increment the current value in the file.
402 else if (ACE_OS::strncmp (buf
, "inc", 3) == 0)
405 ACE_DEBUG ((LM_DEBUG
,
406 ACE_TEXT ("(%P|%t) no match...\n")));
412 Counting_Service::svc ()
414 ACE_DEBUG ((LM_DEBUG
,
415 ACE_TEXT ("(%P|%t) handling thread\n")));
417 while (this->handle_input () >= 0)
424 Counting_Service::handle_close (ACE_HANDLE
,
427 // Count completed connections here only when the test is not in
428 // "process-per-connection" mode. In general, this should not be
429 // done here. Proper place for this is activate_svc_handler() but
430 // since only "process-per-connection" hooks into that function in
431 // other modes it's done here. The later creates a problem in
432 // "process-per-connection" mode since it calculates the same
433 // connection twice and as a result it cannot finalize gracefully.
434 if (OPTIONS::instance ()->concurrency_type () != Options::PROCESS
)
436 // Done with another connection.
437 connection_completed ();
440 // Call down to base class
441 return ACE_Svc_Handler
<ACE_SOCK_STREAM
, ACE_NULL_SYNCH
>::handle_close ();
444 // This method is called back by the <Acceptor> once the client has
445 // connected and the process is forked or spawned.
448 Counting_Service::open (void *)
450 ACE_DEBUG ((LM_DEBUG
,
451 ACE_TEXT ("(%P|%t) opening service\n")));
453 if (OPTIONS::instance ()->concurrency_type () == Options::PROCESS
)
455 // We need to rerun the event loop here since we ended up here
456 // due to being fork'd and we can't just return to our context
457 // because it's in the middle of a different event loop that
458 // won't behave properly since it's meant to handle connection
459 // establishment, *not* data transfer.
460 while (this->handle_input () >= 0)
463 ACE_DEBUG ((LM_DEBUG
,
464 ACE_TEXT ("(%P|%t) About to exit from the child\n")));
469 else if (OPTIONS::instance ()->concurrency_type () == Options::THREAD
)
470 // We need to set this to 0 so that our <shutdown> method doesn't
471 // try to deregister <this> from the Reactor.
476 #if !defined (ACE_LACKS_FORK) || defined (ACE_HAS_THREADS)
478 // Execute the client tests.
483 ACE_INET_Addr
*remote_addr
=
484 reinterpret_cast<ACE_INET_Addr
*> (arg
);
485 ACE_INET_Addr
server_addr (remote_addr
->get_port_number (),
486 ACE_DEFAULT_SERVER_HOST
);
487 ACE_SOCK_Stream stream
;
488 ACE_SOCK_Connector connector
;
491 const char *command
= 0;
495 for (i
= 0; i
< ACE_MAX_ITERATIONS
; i
++)
497 ACE_DEBUG ((LM_DEBUG
,
498 ACE_TEXT ("(%P|%t) client iteration %d\n"),
500 if (connector
.connect (stream
,
502 ACE_ERROR_RETURN ((LM_ERROR
,
507 command_len
= ACE_OS::strlen (command
);
509 #ifndef ACE_LACKS_VA_FUNCTIONS
511 &command_len
, sizeof command_len
,
512 command
, command_len
) == -1)
513 ACE_ERROR_RETURN ((LM_ERROR
,
518 command_len
= ACE_OS::strlen (command
);
521 &command_len
, sizeof command_len
,
522 command
, command_len
) == -1)
523 ACE_ERROR_RETURN ((LM_ERROR
,
527 else if (stream
.recv (buf
, sizeof buf
) <= 0)
528 ACE_ERROR_RETURN ((LM_ERROR
,
529 ACE_TEXT ("(%P|%t) %p\n"),
533 ACE_UNUSED_ARG (command_len
);
535 // ACE_DEBUG ((LM_DEBUG,
536 // ACE_TEXT ("(%P|%t) client iteration %d, buf = %C\n"),
539 if (stream
.close () == -1)
540 ACE_ERROR_RETURN ((LM_ERROR
,
547 command_len
= ACE_OS::strlen (command
);
548 ssize_t bytes_read
= 0;
550 if (connector
.connect (stream
, server_addr
) == -1)
551 ACE_ERROR_RETURN ((LM_ERROR
,
555 #ifndef ACE_LACKS_VA_FUNCTIONS
556 else if (stream
.send (4,
557 &command_len
, sizeof command_len
,
558 command
, command_len
) == -1)
559 ACE_ERROR_RETURN ((LM_ERROR
,
564 else if ((bytes_read
= stream
.recv (buf
, sizeof buf
)) <= 0)
565 ACE_ERROR_RETURN ((LM_ERROR
,
571 // Null terminate buf to avoid an uninitialized memory read in
572 // the call to ACE_OS::strrchr ().
573 buf
[bytes_read
] = '\0';
575 size_t count
= ACE_OS::atoi (ACE_OS::strrchr (ACE_TEXT_CHAR_TO_TCHAR (buf
),
578 ACE_DEBUG ((LM_DEBUG
,
579 ACE_TEXT ("(%P|%t) count = %d\n"),
581 // Make sure that the count is correct.
582 if (count
!= ACE_MAX_ITERATIONS
)
584 ACE_ERROR_RETURN ((LM_ERROR
,
585 ACE_TEXT ("Error: Count invalid, has %d expected %d\n"),
586 count
, ACE_MAX_ITERATIONS
),
591 if (stream
.close () == -1)
592 ACE_ERROR_RETURN ((LM_ERROR
,
597 // Remove the filename.
598 ACE_OS::unlink (OPTIONS::instance ()->filename ());
603 // Performs the server activities.
605 // Have all connections been serviced?
610 return connections
== ACE_MAX_ITERATIONS
+ 1;
617 ACE_Reactor::instance ()->owner (ACE_Thread::self ());
619 while (!done () && result
!= -1)
620 // Run the main event loop.
621 result
= ACE_Reactor::instance ()->handle_events ();
626 #endif /* !ACE_LACKS_FORK || ACE_HAS_THREADS */
629 run_main (int argc
, ACE_TCHAR
*argv
[])
631 ACE_START_TEST (ACE_TEXT ("Process_Strategy_Test"));
633 if (OPTIONS::instance ()->parse_args (argc
, argv
) == -1)
634 ACE_ERROR_RETURN ((LM_ERROR
,
636 ACE_TEXT ("parse_args")),
639 #ifndef ACE_LACKS_ACCEPT
643 ACE_INET_Addr server_addr
;
645 // Bind acceptor to any port and then find out what the port was.
646 // Note that this implicitly creates the Reactor singleton.
647 if (acceptor
.open (ACE_sap_any_cast (const ACE_INET_Addr
&),
648 ACE_Reactor::instance(),
651 OPTIONS::instance ()->concurrency_strategy ()) == -1
652 || acceptor
.acceptor ().get_local_addr (server_addr
) == -1)
653 ACE_ERROR_RETURN ((LM_ERROR
,
659 ACE_DEBUG ((LM_DEBUG
,
660 ACE_TEXT ("(%P|%t) starting server at port %d\n"),
661 server_addr
.get_port_number ()));
663 #if !defined (ACE_LACKS_FORK)
664 // We're running the client and serve as separate processes.
665 pid_t pid
= ACE::fork (ACE_TEXT ("child"),
666 1); // Avoid zombies.
671 ACE_ERROR ((LM_ERROR
,
672 ACE_TEXT ("(%P|%t) %p\n%a"),
673 ACE_TEXT ("fork failed")));
677 client (&server_addr
);
685 #elif defined (ACE_HAS_THREADS)
686 if (ACE_Thread_Manager::instance ()->spawn
687 (ACE_THR_FUNC (server
),
689 THR_NEW_LWP
| THR_DETACHED
) == -1)
690 ACE_ERROR ((LM_ERROR
,
691 ACE_TEXT ("(%P|%t) %p\n%a"),
692 ACE_TEXT ("thread create failed")));
694 if (ACE_Thread_Manager::instance ()->spawn
695 (ACE_THR_FUNC (client
),
696 (void *) &server_addr
,
697 THR_NEW_LWP
| THR_DETACHED
) == -1)
698 ACE_ERROR ((LM_ERROR
,
699 ACE_TEXT ("(%P|%t) %p\n%a"),
700 ACE_TEXT ("thread create failed")));
702 // Wait for the threads to exit.
703 ACE_Thread_Manager::instance ()->wait ();
706 ACE_TEXT ("(%P|%t) only one thread may be run ")
707 ACE_TEXT ("in a process on this platform\n")));
708 #endif /* ACE_HAS_THREADS */
711 #endif // ACE_LACKS_ACCEPT