2 //=============================================================================
4 * @file Process_Strategy_Test.cpp
6 * This is a test of the <ACE_Strategy_Acceptor> and
7 * <ACE_File_Lock> classes. The <ACE_Strategy_Acceptor> uses
8 * either the <ACE_Process_Strategy> (which forks a
9 * process-per-connection and runs as a concurrent server
10 * process), the <ACE_Thread_Strategy> (which spawns a
11 * thread-per-connection and runs as a concurrent server thread),
12 * or <ACE_Reactive_Strategy> (which register the <Svc_Handler>
13 * with the <Reactor> and runs in the main thread of control as an
14 * iterative server). This server queries and increments a
15 * "counting value" in a file.
17 * This test program can be run in the following ways:
19 * # Run the server "reactively" (i.e., iteratively)
20 * % Process_Strategy_Test -c REACTIVE
22 * # Run the server in multi-threads.
23 * % Process_Strategy_Test -c THREAD
25 * # Run the server in multi-processes
26 * % Process_Strategy_Test -c PROCESS
28 * @author Douglas C. Schmidt <d.schmidt@vanderbilt.edu> and Kevin Boyle <kboyle@sanwafp.com>
30 //=============================================================================
32 #include "test_config.h"
33 #include "ace/OS_NS_string.h"
34 #include "ace/OS_NS_unistd.h"
35 #include "ace/Acceptor.h"
36 #include "ace/Handle_Set.h"
37 #include "ace/Get_Opt.h"
38 #include "ace/Null_Condition.h"
39 #include "ace/Null_Mutex.h"
40 #include "ace/SOCK_Acceptor.h"
41 #include "ace/SOCK_Connector.h"
42 #include "ace/Strategies_T.h"
43 #include "ace/Singleton.h"
44 #include "ace/Synch_Traits.h"
45 #include "ace/File_Lock.h"
46 #include "ace/Lib_Find.h"
48 // Counting_Service and Options in here
49 #include "Process_Strategy_Test.h"
51 ACE_SINGLETON_TEMPLATE_INSTANTIATE(ACE_Singleton
, Options
, ACE_Null_Mutex
);
53 // Define a <Strategy_Acceptor> that's parameterized by the
54 // <Counting_Service>.
56 using ACCEPTOR
= ACE_Strategy_Acceptor
<Counting_Service
, ACE_SOCK_Acceptor
>;
58 // Create an Options Singleton.
59 using OPTIONS
= ACE_Singleton
<Options
, ACE_Null_Mutex
>;
61 // counter for connections
62 static size_t connections
= 0;
64 // Use this to show down the process gracefully.
66 connection_completed ()
68 // Increment connection counter.
71 // If all connections have been serviced.
72 if (connections
== ACE_MAX_ITERATIONS
+ 1)
73 // Make sure that the event loop is interrupted.
74 ACE_Reactor::instance()->wakeup_all_threads ();
78 Process_Strategy::Process_Strategy (size_t n_processes
,
79 ACE_Event_Handler
*acceptor
,
82 : ACE_Process_Strategy
<Counting_Service
> (n_processes
,
89 // Destructor. g++ 2.7.2.3 gets very confused ("Internal compiler
90 // error") without it.
91 Process_Strategy::~Process_Strategy ()
95 // Overwrite the process creation method to include connection
99 Process_Strategy::activate_svc_handler (Counting_Service
*svc_handler
,
102 // Call down to the base class
104 ACE_Process_Strategy
<Counting_Service
>::activate_svc_handler (svc_handler
,
106 // Connection is now complete
107 connection_completed ();
113 Options::file_lock ()
115 return this->file_lock_
;
118 ACE_Concurrency_Strategy
<Counting_Service
> *
119 Options::concurrency_strategy ()
121 return this->concurrency_strategy_
;
127 return this->filename_
;
132 // Choose to use processes by default.
133 #if !defined (ACE_LACKS_FORK)
134 concurrency_type_ (PROCESS
)
135 #elif defined (ACE_HAS_THREADS)
136 concurrency_type_ (THREAD
)
138 concurrency_type_ (REACTIVE
)
139 #endif /* !ACE_LACKS_FORK */
145 delete this->concurrency_strategy_
;
149 Options::parse_args (int argc
, ACE_TCHAR
*argv
[])
151 ACE_Get_Opt
get_opt (argc
, argv
, ACE_TEXT ("p:c:f:"));
153 // - 26 is for the "process_strategy_test_temp" that is appended
154 if (ACE::get_temp_dir (this->filename_
, MAXPATHLEN
- 26) == -1)
155 ACE_ERROR_RETURN ((LM_ERROR
,
156 ACE_TEXT ("Temporary path too long\n")),
159 ACE_OS::strcat (this->filename_
, ACE_TEXT ("process_strategy_test_temp"));
161 for (int c
; (c
= get_opt ()) != -1; )
165 if (ACE_OS::strcmp (get_opt
.opt_arg (),
166 ACE_TEXT ("REACTIVE")) == 0)
167 OPTIONS::instance ()->concurrency_type (Options::REACTIVE
);
168 #if !defined (ACE_LACKS_FORK)
169 else if (ACE_OS::strcmp (get_opt
.opt_arg (),
170 ACE_TEXT ("PROCESS")) == 0)
171 OPTIONS::instance ()->concurrency_type (Options::PROCESS
);
172 #endif /* !ACE_LACKS_FORK */
173 #if defined (ACE_HAS_THREADS)
174 else if (ACE_OS::strcmp (get_opt
.opt_arg (),
175 ACE_TEXT ("THREAD")) == 0)
176 OPTIONS::instance ()->concurrency_type (Options::THREAD
);
177 #endif /* ACE_HAS_THREADS */
179 ACE_DEBUG ((LM_DEBUG
,
180 ACE_TEXT ("WARNING: concurrency strategy \"%s\" ")
181 ACE_TEXT ("is not supported\n"),
182 get_opt
.opt_arg ()));
185 ACE_OS::strcpy (this->filename_
, get_opt
.opt_arg ());
188 ACE_DEBUG ((LM_DEBUG
,
189 ACE_TEXT ("usage: %n [-f (filename)] ")
190 ACE_TEXT ("[-c (concurrency strategy)]\n%a"), 1));
194 // Initialize the file lock. Note that this object lives beyond the
195 // lifetime of the Acceptor.
196 if (this->file_lock_
.open (this->filename_
,
198 ACE_DEFAULT_FILE_PERMS
) == -1)
199 ACE_ERROR_RETURN ((LM_ERROR
,
204 ACE_DEBUG ((LM_DEBUG
,
205 ACE_TEXT ("(%P|%t) opening %s on handle %d.\n"),
207 this->file_lock_
.get_handle ()));
211 // Store the initial value of the count in the file.
212 if (ACE_OS::write (this->file_lock_
.get_handle (),
213 (const void *) &count
,
214 sizeof count
) != (ssize_t
) sizeof count
)
215 ACE_ERROR ((LM_ERROR
,
216 ACE_TEXT ("(%P|%t) %p\n"),
217 ACE_TEXT ("write")));
219 // Initialize the Concurrency strategy.
220 switch (this->concurrency_type_
)
222 case Options::PROCESS
:
223 #if !defined (ACE_LACKS_FORK)
224 ACE_NEW_RETURN (this->concurrency_strategy_
,
227 ACE_Reactor::instance (),
228 1), // Avoid zombies.
232 ACE_TEST_ASSERT ("PROCESS invalid on this platform" == 0);
233 #endif /* !defined (ACE_LACKS_FORK) */
234 case Options::THREAD
:
235 #if defined (ACE_HAS_THREADS)
236 ACE_NEW_RETURN (this->concurrency_strategy_
,
237 ACE_Thread_Strategy
<Counting_Service
>
238 (ACE_Thread_Manager::instance (),
244 ACE_TEST_ASSERT (!"THREAD invalid on this platform");
245 #endif /* !ACE_HAS_THREADS */
246 case Options::REACTIVE
:
247 // Settle for the purely Reactive strategy.
248 ACE_NEW_RETURN (this->concurrency_strategy_
,
249 ACE_Reactive_Strategy
<Counting_Service
>
250 (ACE_Reactor::instance ()),
258 Options::Concurrency_Type
259 Options::concurrency_type ()
261 return this->concurrency_type_
;
265 Options::concurrency_type (Options::Concurrency_Type cs
)
267 this->concurrency_type_
= cs
;
270 Counting_Service::Counting_Service (ACE_Thread_Manager
*)
272 ACE_DEBUG ((LM_DEBUG
,
273 ACE_TEXT ("(%P|%t) creating the Counting_Service\n")));
276 // Read the current value from the shared file and return it to the
280 Counting_Service::read ()
282 ACE_READ_GUARD_RETURN (ACE_File_Lock
, ace_mon
, OPTIONS::instance ()->file_lock (), -1);
284 ACE_DEBUG ((LM_DEBUG
,
285 ACE_TEXT ("(%P|%t) reading on handle %d.\n"),
286 OPTIONS::instance ()->file_lock ().get_handle ()));
289 if (ACE_OS::pread (OPTIONS::instance ()->file_lock ().get_handle (),
292 0) != (ssize_t
) sizeof count
)
293 ACE_ERROR_RETURN ((LM_ERROR
,
294 ACE_TEXT ("(%P|%t) %p\n"),
299 int n
= ACE_OS::snprintf (buf
, BUFSIZ
, "count = %d\n", count
);
300 ACE_DEBUG ((LM_DEBUG
,
301 ACE_TEXT ("(%P|%t) count = %d\n"),
304 if (this->peer ().send_n (buf
, n
) != n
)
305 ACE_ERROR_RETURN ((LM_ERROR
,
306 ACE_TEXT ("(%P|%t) %p\n"),
307 ACE_TEXT ("send_n")),
312 // Increment the current value in the shared file by 1.
315 Counting_Service::inc ()
317 ACE_WRITE_GUARD_RETURN (ACE_File_Lock
, ace_mon
,
318 OPTIONS::instance ()->file_lock (), -1);
320 ACE_DEBUG ((LM_DEBUG
,
321 ACE_TEXT ("(%P|%t) incrementing on handle %d.\n"),
322 OPTIONS::instance ()->file_lock ().get_handle ()));
325 if (ACE_OS::pread (OPTIONS::instance ()->file_lock ().get_handle (),
328 0) != (ssize_t
) sizeof count
)
329 ACE_ERROR_RETURN ((LM_ERROR
,
330 ACE_TEXT ("(%P|%t) %p\n"),
334 ACE_DEBUG ((LM_DEBUG
,
335 ACE_TEXT ("(%P|%t) incrementing count from %d to %d\n"),
340 if (ACE_OS::pwrite (OPTIONS::instance ()->file_lock ().get_handle (),
341 (const void *) &count
,
343 0) != (ssize_t
) sizeof count
)
344 ACE_ERROR_RETURN ((LM_ERROR
,
345 ACE_TEXT ("(%P|%t) %p\n"),
351 // Receive the request from the client and call the appropriate
355 Counting_Service::handle_input (ACE_HANDLE
)
358 ACE_Time_Value
* timeout
= 0;
360 ACE_DEBUG ((LM_DEBUG
,
361 ACE_TEXT ("(%P|%t) reading from peer on %d\n"),
362 this->peer ().get_handle ()));
364 // Read the PDU length first.
365 ssize_t bytes
= this->peer ().recv ((void *) &len
,
371 bytes
= this->peer ().recv (buf
, len
);
373 if (bytes
<= 0 || buf
[0] == (char) EOF
)
378 ACE_DEBUG ((LM_DEBUG
,
379 ACE_TEXT ("(%P|%t) %d bytes of input on %d is %*s\n"),
381 this->peer ().get_handle (),
383 ACE_TEXT_CHAR_TO_TCHAR (buf
)));
384 // Read and return the current value in the file.
385 if (ACE_OS::strncmp (buf
,
388 return this->read ();
389 // Increment the current value in the file.
390 else if (ACE_OS::strncmp (buf
, "inc", 3) == 0)
393 ACE_DEBUG ((LM_DEBUG
,
394 ACE_TEXT ("(%P|%t) no match...\n")));
400 Counting_Service::svc ()
402 ACE_DEBUG ((LM_DEBUG
,
403 ACE_TEXT ("(%P|%t) handling thread\n")));
405 while (this->handle_input () >= 0)
412 Counting_Service::handle_close (ACE_HANDLE
,
415 // Count completed connections here only when the test is not in
416 // "process-per-connection" mode. In general, this should not be
417 // done here. Proper place for this is activate_svc_handler() but
418 // since only "process-per-connection" hooks into that function in
419 // other modes it's done here. The later creates a problem in
420 // "process-per-connection" mode since it calculates the same
421 // connection twice and as a result it cannot finalize gracefully.
422 if (OPTIONS::instance ()->concurrency_type () != Options::PROCESS
)
424 // Done with another connection.
425 connection_completed ();
428 // Call down to base class
429 return ACE_Svc_Handler
<ACE_SOCK_STREAM
, ACE_NULL_SYNCH
>::handle_close ();
432 // This method is called back by the <Acceptor> once the client has
433 // connected and the process is forked or spawned.
436 Counting_Service::open (void *)
438 ACE_DEBUG ((LM_DEBUG
,
439 ACE_TEXT ("(%P|%t) opening service\n")));
441 if (OPTIONS::instance ()->concurrency_type () == Options::PROCESS
)
443 // We need to rerun the event loop here since we ended up here
444 // due to being fork'd and we can't just return to our context
445 // because it's in the middle of a different event loop that
446 // won't behave properly since it's meant to handle connection
447 // establishment, *not* data transfer.
448 while (this->handle_input () >= 0)
451 ACE_DEBUG ((LM_DEBUG
,
452 ACE_TEXT ("(%P|%t) About to exit from the child\n")));
457 else if (OPTIONS::instance ()->concurrency_type () == Options::THREAD
)
458 // We need to set this to 0 so that our <shutdown> method doesn't
459 // try to deregister <this> from the Reactor.
464 #if !defined (ACE_LACKS_FORK) || defined (ACE_HAS_THREADS)
466 // Execute the client tests.
471 ACE_INET_Addr
*remote_addr
=
472 reinterpret_cast<ACE_INET_Addr
*> (arg
);
473 ACE_INET_Addr
server_addr (remote_addr
->get_port_number (),
474 ACE_DEFAULT_SERVER_HOST
);
475 ACE_SOCK_Stream stream
;
476 ACE_SOCK_Connector connector
;
479 const char *command
= 0;
483 for (i
= 0; i
< ACE_MAX_ITERATIONS
; i
++)
485 ACE_DEBUG ((LM_DEBUG
,
486 ACE_TEXT ("(%P|%t) client iteration %d\n"),
488 if (connector
.connect (stream
,
490 ACE_ERROR_RETURN ((LM_ERROR
,
495 command_len
= ACE_OS::strlen (command
);
497 #ifndef ACE_LACKS_VA_FUNCTIONS
499 &command_len
, sizeof command_len
,
500 command
, command_len
) == -1)
501 ACE_ERROR_RETURN ((LM_ERROR
,
506 command_len
= ACE_OS::strlen (command
);
509 &command_len
, sizeof command_len
,
510 command
, command_len
) == -1)
511 ACE_ERROR_RETURN ((LM_ERROR
,
515 else if (stream
.recv (buf
, sizeof buf
) <= 0)
516 ACE_ERROR_RETURN ((LM_ERROR
,
517 ACE_TEXT ("(%P|%t) %p\n"),
521 ACE_UNUSED_ARG (command_len
);
523 // ACE_DEBUG ((LM_DEBUG,
524 // ACE_TEXT ("(%P|%t) client iteration %d, buf = %C\n"),
527 if (stream
.close () == -1)
528 ACE_ERROR_RETURN ((LM_ERROR
,
535 command_len
= ACE_OS::strlen (command
);
536 ssize_t bytes_read
= 0;
538 if (connector
.connect (stream
, server_addr
) == -1)
539 ACE_ERROR_RETURN ((LM_ERROR
,
543 #ifndef ACE_LACKS_VA_FUNCTIONS
544 else if (stream
.send (4,
545 &command_len
, sizeof command_len
,
546 command
, command_len
) == -1)
547 ACE_ERROR_RETURN ((LM_ERROR
,
552 else if ((bytes_read
= stream
.recv (buf
, sizeof buf
)) <= 0)
553 ACE_ERROR_RETURN ((LM_ERROR
,
559 // Null terminate buf to avoid an uninitialized memory read in
560 // the call to ACE_OS::strrchr ().
561 buf
[bytes_read
] = '\0';
563 size_t count
= ACE_OS::atoi (ACE_OS::strrchr (ACE_TEXT_CHAR_TO_TCHAR (buf
),
566 ACE_DEBUG ((LM_DEBUG
,
567 ACE_TEXT ("(%P|%t) count = %d\n"),
569 // Make sure that the count is correct.
570 if (count
!= ACE_MAX_ITERATIONS
)
572 ACE_ERROR_RETURN ((LM_ERROR
,
573 ACE_TEXT ("Error: Count invalid, has %d expected %d\n"),
574 count
, ACE_MAX_ITERATIONS
),
579 if (stream
.close () == -1)
580 ACE_ERROR_RETURN ((LM_ERROR
,
585 // Remove the filename.
586 ACE_OS::unlink (OPTIONS::instance ()->filename ());
591 // Performs the server activities.
593 // Have all connections been serviced?
598 return connections
== ACE_MAX_ITERATIONS
+ 1;
605 ACE_Reactor::instance ()->owner (ACE_Thread::self ());
607 while (!done () && result
!= -1)
608 // Run the main event loop.
609 result
= ACE_Reactor::instance ()->handle_events ();
614 #endif /* !ACE_LACKS_FORK || ACE_HAS_THREADS */
617 run_main (int argc
, ACE_TCHAR
*argv
[])
619 ACE_START_TEST (ACE_TEXT ("Process_Strategy_Test"));
621 if (OPTIONS::instance ()->parse_args (argc
, argv
) == -1)
622 ACE_ERROR_RETURN ((LM_ERROR
,
624 ACE_TEXT ("parse_args")),
627 #ifndef ACE_LACKS_ACCEPT
631 ACE_INET_Addr server_addr
;
633 // Bind acceptor to any port and then find out what the port was.
634 // Note that this implicitly creates the Reactor singleton.
635 if (acceptor
.open (ACE_sap_any_cast (const ACE_INET_Addr
&),
636 ACE_Reactor::instance(),
639 OPTIONS::instance ()->concurrency_strategy ()) == -1
640 || acceptor
.acceptor ().get_local_addr (server_addr
) == -1)
641 ACE_ERROR_RETURN ((LM_ERROR
,
647 ACE_DEBUG ((LM_DEBUG
,
648 ACE_TEXT ("(%P|%t) starting server at port %d\n"),
649 server_addr
.get_port_number ()));
651 #if !defined (ACE_LACKS_FORK)
652 // We're running the client and serve as separate processes.
653 pid_t pid
= ACE::fork (ACE_TEXT ("child"),
654 1); // Avoid zombies.
659 ACE_ERROR ((LM_ERROR
,
660 ACE_TEXT ("(%P|%t) %p\n%a"),
661 ACE_TEXT ("fork failed")));
665 client (&server_addr
);
673 #elif defined (ACE_HAS_THREADS)
674 if (ACE_Thread_Manager::instance ()->spawn
675 (ACE_THR_FUNC (server
),
677 THR_NEW_LWP
| THR_DETACHED
) == -1)
678 ACE_ERROR ((LM_ERROR
,
679 ACE_TEXT ("(%P|%t) %p\n%a"),
680 ACE_TEXT ("thread create failed")));
682 if (ACE_Thread_Manager::instance ()->spawn
683 (ACE_THR_FUNC (client
),
684 (void *) &server_addr
,
685 THR_NEW_LWP
| THR_DETACHED
) == -1)
686 ACE_ERROR ((LM_ERROR
,
687 ACE_TEXT ("(%P|%t) %p\n%a"),
688 ACE_TEXT ("thread create failed")));
690 // Wait for the threads to exit.
691 ACE_Thread_Manager::instance ()->wait ();
694 ACE_TEXT ("(%P|%t) only one thread may be run ")
695 ACE_TEXT ("in a process on this platform\n")));
696 #endif /* ACE_HAS_THREADS */
699 #endif // ACE_LACKS_ACCEPT