2 //=============================================================================
4 * @file MT_Reference_Counted_Event_Handler_Test.cpp
6 * This test tries to represents what happens in the ORB wrt to
7 * event handlers, reactors, timer queues, threads, and connection
8 * caches, minus the other complexities. The following reactors
9 * are tested: Select, TP, WFMO, and Dev Poll (if enabled).
11 * The test checks proper use and shutting down of client-side
12 * event handlers when it is used by invocation threads and/or
13 * event loop threads. Server-side event handlers are either
14 * threaded or reactive. A purger thread is introduced to check the
15 * connection recycling and cache purging. Nested upcalls are also
18 * @author Irfan Pyarali <irfan@oomworks.com>
20 //=============================================================================
22 #include "test_config.h"
23 #include "ace/Reactor.h"
24 #include "ace/Select_Reactor.h"
25 #include "ace/TP_Reactor.h"
26 #include "ace/WFMO_Reactor.h"
27 #include "ace/Dev_Poll_Reactor.h"
28 #include "ace/Get_Opt.h"
30 #include "ace/SOCK_Acceptor.h"
31 #include "ace/SOCK_Connector.h"
32 #include "ace/Auto_Event.h"
33 #include "ace/OS_NS_signal.h"
34 #include "ace/OS_NS_time.h"
35 #include "ace/OS_NS_sys_socket.h"
36 #include "ace/OS_NS_unistd.h"
38 #if defined (ACE_HAS_THREADS) && !defined ACE_LACKS_ACCEPT
40 static const char message
[] = "abcdefghijklmnopqrstuvwxyz";
41 static const int message_size
= 26;
42 static int test_select_reactor
= 1;
43 static int test_tp_reactor
= 1;
44 static int test_wfmo_reactor
= 1;
45 static int test_dev_poll_reactor
= 1;
47 static int number_of_connections
= 5;
48 static int max_nested_upcall_level
= 10;
49 static int close_timeout
= 500;
50 static int pipe_open_attempts
= 10;
51 static int pipe_retry_timeout
= 1;
52 static int make_invocations
= -1;
53 static int run_event_loop_thread
= -1;
54 static int run_purger_thread
= -1;
55 static int run_receiver_thread
= -1;
56 static int nested_upcalls
= -1;
58 static ACE_HANDLE server_handle
= ACE_INVALID_HANDLE
;
59 static ACE_HANDLE client_handle
= ACE_INVALID_HANDLE
;
61 static int number_of_options
= 5;
62 static int test_configs
[][5] =
65 // make_invocations, run_event_loop_thread, run_purger_thread, run_receiver_thread, nested_upcalls
68 // { 0, 0, 0, 0, 0, }, // At least one thread should be running.
69 // { 0, 0, 0, 1, 0, }, // If event_loop_thread is not running and invocation_thread is not making invocations,
70 // no thread will know that the socket is closed.
71 // { 0, 0, 1, 0, 0, }, // If invocation_thread is not making invocations and if receiver is not threaded,
72 // we cannot decide which socket to close.
73 // { 0, 0, 1, 1, 0, }, // If event_loop_thread is not running and invocation_thread is not making invocations,
74 // no thread will know that the socket is closed.
75 // { 0, 1, 0, 0, 0, }, // If invocation_thread is not making invocations and if receiver is not threaded,
76 // we cannot decide which socket to close.
78 // { 0, 1, 0, 1, 1, }, // No need for nested upcalls without invocations.
79 // { 0, 1, 1, 0, 0, }, // If invocation_thread is not making invocations and if receiver is not threaded,
80 // we cannot decide which socket to close.
82 // { 0, 1, 1, 1, 1, }, // No need for nested upcalls without invocations.
83 // { 1, 0, 0, 0, 0, }, // If both event_loop_thread and receiver are not threaded,
84 // no thread can receive the messages.
86 // { 1, 0, 0, 1, 1, }, // No need for nested upcalls without event loop being used by the receiver.
87 // { 1, 0, 1, 0, 0, }, // If both event_loop_thread and receiver are not threaded,
88 // no thread can receive the messages.
90 // { 1, 0, 1, 1, 1, }, // No need for nested upcalls without event loop being used by the receiver.
94 // { 1, 1, 0, 1, 1, }, // No need for nested upcalls without event loop being used by the receiver.
98 // { 1, 1, 1, 1, 1, }, // No need for nested upcalls without event loop being used by the receiver.
102 disable_signal (int sigmin
, int sigmax
)
104 #if !defined (ACE_LACKS_UNIX_SIGNALS)
106 if (ACE_OS::sigemptyset (&signal_set
) == - 1)
107 ACE_ERROR ((LM_ERROR
,
108 ACE_TEXT ("Error: (%P|%t):%p\n"),
109 ACE_TEXT ("sigemptyset failed")));
111 for (int i
= sigmin
; i
<= sigmax
; i
++)
112 ACE_OS::sigaddset (&signal_set
, i
);
114 // Put the <signal_set>.
115 # if defined (ACE_LACKS_PTHREAD_THR_SIGSETMASK)
116 // In multi-threaded application this is not POSIX compliant
117 // but let's leave it just in case.
118 if (ACE_OS::sigprocmask (SIG_BLOCK
, &signal_set
, 0) != 0)
120 if (ACE_OS::thr_sigsetmask (SIG_BLOCK
, &signal_set
, 0) != 0)
121 # endif /* ACE_LACKS_PTHREAD_THR_SIGSETMASK */
122 ACE_ERROR_RETURN ((LM_ERROR
,
123 ACE_TEXT ("Error: (%P|%t): %p\n"),
124 ACE_TEXT ("SIG_BLOCK failed")),
127 ACE_UNUSED_ARG (sigmin
);
128 ACE_UNUSED_ARG (sigmax
);
129 #endif /* ACE_LACKS_UNIX_SIGNALS */
134 /* Replication of the ACE_Pipe class. Only difference is that this
135 class always uses two sockets to create the pipe, even on platforms
136 that support pipes. */
144 //FUZZ: disable check_for_lack_ACE_OS
145 ///FUZZ: enable check_for_lack_ACE_OS
148 ACE_HANDLE
read_handle (void) const;
150 ACE_HANDLE
write_handle (void) const;
153 ACE_HANDLE handles_
[2];
159 ACE_INET_Addr my_addr
;
160 ACE_SOCK_Acceptor acceptor
;
161 ACE_SOCK_Connector connector
;
162 ACE_SOCK_Stream reader
;
163 ACE_SOCK_Stream writer
;
166 // Bind listener to any port and then find out what the port was.
167 if (acceptor
.open (ACE_Addr::sap_any
) == -1
168 || acceptor
.get_local_addr (my_addr
) == -1)
172 int af
= my_addr
.get_type ();
173 const ACE_TCHAR
*local
= ACE_LOCALHOST
;
174 #if defined (ACE_HAS_IPV6)
176 local
= ACE_IPV6_LOCALHOST
;
177 #endif /* ACE_HAS_IPV6 */
178 ACE_INET_Addr
sv_addr (my_addr
.get_port_number (),
182 // Establish a connection within the same process.
183 if (connector
.connect (writer
, sv_addr
) == -1)
185 else if (acceptor
.accept (reader
) == -1)
192 // Close down the acceptor endpoint since we don't need it anymore.
197 this->handles_
[0] = reader
.get_handle ();
198 this->handles_
[1] = writer
.get_handle ();
205 this->handles_
[0] = ACE_INVALID_HANDLE
;
206 this->handles_
[1] = ACE_INVALID_HANDLE
;
210 Pipe::read_handle (void) const
212 return this->handles_
[0];
216 Pipe::write_handle (void) const
218 return this->handles_
[1];
221 class Connection_Cache
;
222 class Event_Loop_Thread
;
224 static Event_Loop_Thread
*global_event_loop_thread_variable
= 0;
226 class Sender
: public ACE_Event_Handler
230 Sender (ACE_HANDLE handle
,
231 Connection_Cache
&connection_cache
);
235 int handle_input (ACE_HANDLE
);
237 ssize_t
send_message (void);
239 //FUZZ: disable check_for_lack_ACE_OS
240 ///FUZZ: enable check_for_lack_ACE_OS
245 Connection_Cache
&connection_cache_
;
249 class Connection_Cache
253 Connection_Cache (void);
255 ~Connection_Cache (void);
257 void add_connection (Sender
*sender
);
259 void remove_connection (Sender
*sender
);
261 Sender
*acquire_connection (void);
263 void release_connection (Sender
*sender
);
265 int find (Sender
*sender
);
267 ACE_SYNCH_MUTEX
&lock (void);
284 ACE_SYNCH_MUTEX lock_
;
287 Sender::Sender (ACE_HANDLE handle
,
288 Connection_Cache
&connection_cache
)
290 connection_cache_ (connection_cache
)
292 // Enable reference counting.
293 this->reference_counting_policy ().value
294 (ACE_Event_Handler::Reference_Counting_Policy::ENABLED
);
297 ACE_DEBUG ((LM_DEBUG
,
298 ACE_TEXT ("(%t) Reference count in Sender::Sender() is %d\n"),
299 this->reference_count_
.value ()));
302 Sender::~Sender (void)
305 ACE_DEBUG ((LM_DEBUG
,
306 ACE_TEXT ("(%t) Reference count in ~Sender::Sender() is %d\n"),
307 this->reference_count_
.value ()));
309 // Close the socket that we are responsible for.
310 ACE_OS::closesocket (this->handle_
);
314 Sender::handle_input (ACE_HANDLE
)
317 ACE_DEBUG ((LM_DEBUG
,
318 ACE_TEXT ("(%t) Reference count in Sender::handle_input() is %d\n"),
319 this->reference_count_
.value ()));
322 // In this test, this method is only called when the connection has
323 // been closed. Remove self from Reactor.
326 ACE_DEBUG ((LM_DEBUG
,
327 ACE_TEXT ("(%t) Event loop thread calling Sender::close() ")
328 ACE_TEXT ("for handle %d\n"),
339 // Remove socket from Reactor (may fail if another thread has already
340 // removed the handle from the Reactor).
341 if (this->reactor() != 0)
342 this->reactor ()->remove_handler (this->handle_
,
343 ACE_Event_Handler::ALL_EVENTS_MASK
);
345 // Remove self from connection cache (may fail if another thread has
346 // already removed "this" from the cache).
347 this->connection_cache_
.remove_connection (this);
351 Sender::send_message (void)
353 ACE_Time_Value
timeout (0, close_timeout
* 1000);
355 return ACE::send_n (this->handle_
,
361 class Event_Loop_Thread
: public ACE_Task_Base
365 Event_Loop_Thread (ACE_Thread_Manager
&thread_manager
,
366 ACE_Reactor
&reactor
);
370 ACE_Reactor
&reactor_
;
374 class Receiver
: public ACE_Task_Base
378 Receiver (ACE_Thread_Manager
&thread_manager
,
386 //FUZZ: disable check_for_lack_ACE_OS
387 ///FUZZ: enable check_for_lack_ACE_OS
388 int close (u_long flags
);
390 int handle_input (ACE_HANDLE
);
392 int resume_handler (void);
400 int nested_upcalls_level_
;
404 Receiver::Receiver (ACE_Thread_Manager
&thread_manager
,
407 : ACE_Task_Base (&thread_manager
),
410 nested_upcalls_ (nested_upcalls
),
411 nested_upcalls_level_ (0)
413 // Enable reference counting.
414 this->reference_counting_policy ().value
415 (ACE_Event_Handler::Reference_Counting_Policy::ENABLED
);
418 ACE_DEBUG ((LM_DEBUG
,
419 ACE_TEXT ("(%t) Reference count in Receiver::Receiver() is %d\n"),
420 this->reference_count_
.value ()));
423 Receiver::~Receiver (void)
426 ACE_DEBUG ((LM_DEBUG
,
427 ACE_TEXT ("(%t) Reference count in ~Receiver::Receiver() is %d\n"),
428 this->reference_count_
.value ()));
430 // Close the socket that we are responsible for.
431 ACE_OS::closesocket (this->handle_
);
438 // Continuously receive messages from the Sender. On error, exit
443 ACE_DEBUG ((LM_DEBUG
,
444 ACE_TEXT("(%t) Receiver::svc commencing, handle = %d\n"),
447 disable_signal (SIGPIPE
, SIGPIPE
);
452 this->handle_input (this->handle_
);
454 ACE_DEBUG ((LM_DEBUG
,
455 ACE_TEXT("(%t) Receiver::svc terminating, handle = %d\n"),
461 Receiver::handle_input (ACE_HANDLE handle
)
463 char buf
[message_size
+ 1];
465 ACE_Time_Value
timeout (0, close_timeout
* 1000);
474 if (debug
&& result
< 1)
475 ACE_DEBUG ((LM_DEBUG
,
476 ACE_TEXT("(%t) Receiver::handle input, ")
477 ACE_TEXT("h = %d, result = %d %p\n"),
478 handle_
, result
, ACE_TEXT("ACE::recv_n")));
480 if (this->reactor ())
481 this->reactor ()->resume_handler (handle
);
483 if (result
== message_size
)
486 ACE_DEBUG ((LM_DEBUG
,
487 ACE_TEXT ("(%t) Message %d received on handle %d\n"),
491 if (this->thr_count () == 0 &&
492 this->nested_upcalls_
)
494 this->nested_upcalls_level_
++;
497 ACE_DEBUG ((LM_DEBUG
,
498 ACE_TEXT ("(%t) Nesting level %d\n"),
499 this->nested_upcalls_level_
));
501 if ((this->nested_upcalls_level_
!= max_nested_upcall_level
) &&
502 (global_event_loop_thread_variable
!= 0))
503 global_event_loop_thread_variable
->svc ();
505 this->nested_upcalls_level_
--;
514 ACE_DEBUG ((LM_DEBUG
,
515 ACE_TEXT ("(%t) /*** Problem in receiving message %d on handle")
516 ACE_TEXT (" %d: shutting down receiving thread ***/\n"),
525 Receiver::resume_handler (void)
527 /// The application takes responsibility of resuming the handler.
528 return ACE_APPLICATION_RESUMES_HANDLER
;
532 Receiver::close (u_long
)
534 // If threaded, we are responsible for deleting this instance when
535 // the thread completes. If not threaded, Reactor reference
536 // counting will handle the deletion of this instance.
545 Connector (ACE_Thread_Manager
&thread_manager
,
546 ACE_Reactor
&reactor
,
549 //FUZZ: disable check_for_lack_ACE_OS
550 ///FUZZ: enable check_for_lack_ACE_OS
551 int connect (ACE_HANDLE
&client_handle
,
552 ACE_HANDLE
&server_handle
,
553 int run_receiver_thread
);
555 ACE_Thread_Manager
&thread_manager_
;
557 ACE_Reactor
&reactor_
;
563 Connector::Connector (ACE_Thread_Manager
&thread_manager
,
564 ACE_Reactor
&reactor
,
566 : thread_manager_ (thread_manager
),
568 nested_upcalls_ (nested_upcalls
)
573 Connector::connect (ACE_HANDLE
&client_handle
,
574 ACE_HANDLE
&server_handle
,
575 int run_receiver_thread
)
578 // Create a connection and a receiver to receive messages on the
585 for (int i
= 0; i
< pipe_open_attempts
; ++i
)
594 ACE_OS::sleep (pipe_retry_timeout
);
597 ACE_TEST_ASSERT (result
== 0);
598 ACE_UNUSED_ARG (result
);
601 new Receiver (this->thread_manager_
,
602 pipe
.write_handle (),
603 this->nested_upcalls_
);
605 // Either the receiver is threaded or register it with the Reactor.
606 if (run_receiver_thread
)
608 receiver
->activate ();
612 this->reactor_
.register_handler (pipe
.write_handle (),
614 ACE_Event_Handler::READ_MASK
);
616 // The reference count on the receiver was increased by the
618 ACE_Event_Handler_var
safe_receiver (receiver
);
621 ACE_TEST_ASSERT (result
== 0);
622 ACE_UNUSED_ARG (result
);
628 pipe
.write_handle ();
631 ACE_DEBUG ((LM_DEBUG
,
632 ACE_TEXT ("(%t) New connection: client handle = %d, ")
633 ACE_TEXT ("server handle = %d\n"),
634 client_handle
, server_handle
));
639 Connection_Cache::Connection_Cache (void)
641 // Initialize the connection cache.
643 new Entry
[number_of_connections
];
645 for (int i
= 0; i
< number_of_connections
; ++i
)
647 this->entries_
[i
].sender_
= 0;
648 this->entries_
[i
].state_
= NOT_IN_CACHE
;
653 Connection_Cache::find (Sender
*sender
)
655 for (int i
= 0; i
< number_of_connections
; ++i
)
657 if (this->entries_
[i
].sender_
== sender
)
665 Connection_Cache::add_connection (Sender
*sender
)
667 ACE_GUARD (ACE_SYNCH_MUTEX
, ace_mon
, this->lock_
);
669 // Make sure that the state of the connection cache is as
670 // expected. <sender> should not be already in the cache.
671 ACE_TEST_ASSERT (this->find (sender
) == -1);
676 sender
->add_reference ();
677 this->entries_
[empty_index
].sender_
= sender
;
678 this->entries_
[empty_index
].state_
= BUSY
;
682 Connection_Cache::remove_connection (Sender
*sender
)
684 ACE_GUARD (ACE_SYNCH_MUTEX
, ace_mon
, this->lock_
);
686 // Make sure that the state of the connection cache is as expected.
687 // remove_connection() may already have been called.
694 // If we still have the sender, remove it.
695 sender
->remove_reference ();
696 this->entries_
[index
].sender_
= 0;
697 this->entries_
[index
].state_
= NOT_IN_CACHE
;
701 Connection_Cache::acquire_connection (void)
703 ACE_GUARD_RETURN (ACE_SYNCH_MUTEX
, ace_mon
, this->lock_
, 0);
705 // Find a valid and IDLE sender.
709 for (int i
= 0; i
< number_of_connections
; ++i
)
711 if (this->entries_
[i
].sender_
&&
712 this->entries_
[i
].state_
== IDLE
)
719 this->entries_
[index
].sender_
->add_reference ();
720 this->entries_
[index
].state_
= BUSY
;
722 return this->entries_
[index
].sender_
;
726 Connection_Cache::release_connection (Sender
*sender
)
728 ACE_GUARD (ACE_SYNCH_MUTEX
, ace_mon
, this->lock_
);
730 // Make sure that the state of the connection cache is as expected.
731 // remove_connection() may have already removed the connection from
739 // If we still have the sender, idle it.
740 this->entries_
[index
].state_
= IDLE
;
744 Connection_Cache::lock (void)
749 Connection_Cache::~Connection_Cache (void)
751 for (int i
= 0; i
< number_of_connections
; ++i
)
753 if (this->entries_
[i
].sender_
)
754 this->remove_connection (this->entries_
[i
].sender_
);
757 delete[] this->entries_
;
760 class Invocation_Thread
: public ACE_Task_Base
764 Invocation_Thread (ACE_Thread_Manager
&thread_manager
,
765 ACE_Reactor
&reactor
,
766 Connection_Cache
&connection_cache
,
767 ACE_Auto_Event
&new_connection_event
,
768 int make_invocations
,
769 int run_receiver_thread
,
774 Sender
*create_connection (void);
776 Connection_Cache
&connection_cache_
;
778 ACE_Reactor
&reactor_
;
780 ACE_Thread_Manager
&thread_manager_
;
782 ACE_Auto_Event
&new_connection_event_
;
784 int make_invocations_
;
786 int run_receiver_thread_
;
792 Invocation_Thread::Invocation_Thread (ACE_Thread_Manager
&thread_manager
,
793 ACE_Reactor
&reactor
,
794 Connection_Cache
&connection_cache
,
795 ACE_Auto_Event
&new_connection_event
,
796 int make_invocations
,
797 int run_receiver_thread
,
799 : ACE_Task_Base (&thread_manager
),
800 connection_cache_ (connection_cache
),
802 thread_manager_ (thread_manager
),
803 new_connection_event_ (new_connection_event
),
804 make_invocations_ (make_invocations
),
805 run_receiver_thread_ (run_receiver_thread
),
806 nested_upcalls_ (nested_upcalls
)
811 Invocation_Thread::create_connection (void)
815 // Connector for creating new connections.
816 Connector
connector (this->thread_manager_
,
818 this->nested_upcalls_
);
820 // <server_handle> is a global variable. It will be used later by
821 // the Close_Socket_Thread.
823 connector
.connect (client_handle
,
825 this->run_receiver_thread_
);
826 ACE_TEST_ASSERT (result
== 0);
827 ACE_UNUSED_ARG (result
);
829 // Create a new sender.
831 new Sender (client_handle
,
832 this->connection_cache_
);
834 // Register it with the cache.
835 this->connection_cache_
.add_connection (sender
);
838 // There might be a race condition here. The sender has been added
839 // to the cache and is potentially available to other threads
840 // accessing the cache. Therefore, the other thread may use this
841 // sender and potentially close the sender before it even gets
842 // registered with the Reactor.
844 // This is resolved by marking the connection as busy when it is
845 // first added to the cache. And only once the thread creating the
846 // connection is done with it, it is marked a available in the
849 // This order of registration is important.
852 // Register the handle with the Reactor.
854 this->reactor_
.register_handler (client_handle
,
856 ACE_Event_Handler::READ_MASK
);
858 ACE_TEST_ASSERT (result
== 0);
859 ACE_UNUSED_ARG (result
);
862 ACE_ERROR ((LM_ERROR
, ACE_TEXT ("(%t) create_connection h %d, %p\n"),
864 ACE_TEXT ("register_handler")));
870 Invocation_Thread::svc (void)
872 int connection_counter
= 0;
873 ACE_DEBUG ((LM_DEBUG
,
874 ACE_TEXT("(%t) Invocation_Thread::svc commencing\n")));
876 disable_signal (SIGPIPE
, SIGPIPE
);
878 for (int message_counter
= 1;; ++message_counter
)
880 // Get a connection from the cache.
882 this->connection_cache_
.acquire_connection ();
884 // If no connection is available in the cache, create a new one.
887 if (connection_counter
< number_of_connections
)
889 sender
= this->create_connection ();
891 // This lets the Close_Socket_Thread know that the new
892 // connection has been created.
894 this->new_connection_event_
.signal ();
895 ACE_TEST_ASSERT (result
== 0);
896 ACE_UNUSED_ARG (result
);
898 ++connection_counter
;
902 // Stop the thread, if the maximum number of connections
903 // for the test has been reached.
907 // The reference count on the sender was increased by the cache
908 // before it was returned to us.
909 ACE_Event_Handler_var
safe_sender (sender
);
911 // If the test does not require making invocations, immediately
912 // release the connection.
913 if (!this->make_invocations_
)
915 this->connection_cache_
.release_connection (sender
);
917 // Sleep for a short while
918 ACE_OS::sleep (ACE_Time_Value (0, 10 * 1000));
924 sender
->send_message ();
926 // If successful, release connection.
927 if (result
== message_size
)
930 ACE_DEBUG ((LM_DEBUG
,
931 ACE_TEXT ("(%t) Message %d:%d delivered on handle %d\n"),
936 this->connection_cache_
.release_connection (sender
);
940 // If failure in making invocation, close the sender.
942 ACE_DEBUG ((LM_DEBUG
,
943 ACE_TEXT ("(%t) /*** Problem in delivering message ")
944 ACE_TEXT ("%d:%d on handle %d: shutting down ")
945 ACE_TEXT ("invocation thread ***/\n"),
950 ACE_DEBUG ((LM_DEBUG
,
951 ACE_TEXT ("(%t) Invocation thread calling ")
952 ACE_TEXT ("Sender::close() for handle %d\n"),
959 ACE_DEBUG ((LM_DEBUG
,
960 ACE_TEXT("(%t) Invocation_Thread::svc calling end_reactor_event_loop\n")));
962 // Close the Reactor event loop.
963 this->reactor_
.end_reactor_event_loop ();
964 ACE_DEBUG ((LM_DEBUG
,
965 ACE_TEXT("(%t) Invocation_Thread::svc terminating\n")));
970 class Close_Socket_Thread
: public ACE_Task_Base
974 Close_Socket_Thread (ACE_Thread_Manager
&thread_manager
,
975 ACE_Reactor
&reactor
,
976 ACE_Auto_Event
&new_connection_event
,
977 int make_invocations
,
978 int run_receiver_thread
);
982 ACE_Auto_Event
&new_connection_event_
;
984 ACE_Reactor
&reactor_
;
986 int make_invocations_
;
988 int run_receiver_thread_
;
992 Close_Socket_Thread::Close_Socket_Thread (ACE_Thread_Manager
&thread_manager
,
993 ACE_Reactor
&reactor
,
994 ACE_Auto_Event
&new_connection_event
,
995 int make_invocations
,
996 int run_receiver_thread
)
997 : ACE_Task_Base (&thread_manager
),
998 new_connection_event_ (new_connection_event
),
1000 make_invocations_ (make_invocations
),
1001 run_receiver_thread_ (run_receiver_thread
)
1006 Close_Socket_Thread::svc (void)
1008 unsigned int seed
= (unsigned int) ACE_OS::time ();
1009 ACE_Time_Value
timeout (0, close_timeout
* 1000);
1010 ACE_DEBUG ((LM_DEBUG
,
1011 ACE_TEXT("(%t) Close_Socket_Thread::svc commencing\n")));
1013 disable_signal (SIGPIPE
, SIGPIPE
);
1015 for (; !this->reactor_
.reactor_event_loop_done ();)
1017 // Wait for the new connection to be established.
1019 this->new_connection_event_
.wait (&timeout
,
1021 ACE_TEST_ASSERT (result
== 0 ||
1022 (result
== -1 && errno
== ETIME
));
1028 // Sleep for half a second.
1029 ACE_OS::sleep (timeout
);
1031 int close_client
= 0;
1033 // If the invocation thread is making invocations and if the
1034 // receiver is threaded, either socket can be closed.
1035 if (this->make_invocations_
&&
1036 this->run_receiver_thread_
)
1037 // Randomize which socket to close.
1038 close_client
= ACE_OS::rand_r (&seed
) % 2;
1040 // If the invocation thread is making invocations, only close
1041 // the client socket.
1042 else if (this->make_invocations_
)
1045 // If the receiver is threaded, only close the server socket.
1050 // Close the client socket.
1052 ACE_DEBUG ((LM_DEBUG
,
1053 ACE_TEXT ("(%t) Close socket thread closing client ")
1054 ACE_TEXT ("handle %d\n"),
1057 ACE_OS::shutdown (client_handle
, ACE_SHUTDOWN_BOTH
);
1058 ACE_OS::closesocket (client_handle
);
1062 // Close the server socket.
1064 ACE_DEBUG ((LM_DEBUG
,
1065 ACE_TEXT ("(%t) Close socket thread closing server ")
1066 ACE_TEXT ("handle %d\n"),
1068 ACE_OS::shutdown (server_handle
, ACE_SHUTDOWN_BOTH
);
1069 ACE_OS::closesocket (server_handle
);
1072 ACE_DEBUG ((LM_DEBUG
,
1073 ACE_TEXT("(%t) Close_Socket_Thread::svc terminating\n")));
1078 Event_Loop_Thread::Event_Loop_Thread (ACE_Thread_Manager
&thread_manager
,
1079 ACE_Reactor
&reactor
)
1080 : ACE_Task_Base (&thread_manager
),
1086 Event_Loop_Thread::svc (void)
1088 // Simply run the event loop.
1089 this->reactor_
.owner (ACE_Thread::self ());
1090 ACE_DEBUG ((LM_DEBUG
,
1091 ACE_TEXT("(%t) Event_Loop_Thread::svc commencing\n")));
1093 disable_signal (SIGPIPE
, SIGPIPE
);
1095 while (!this->reactor_
.reactor_event_loop_done ())
1097 this->reactor_
.handle_events ();
1099 ACE_DEBUG ((LM_DEBUG
,
1100 ACE_TEXT("(%t) Event_Loop_Thread::svc terminating\n")));
1105 class Purger_Thread
: public ACE_Task_Base
1109 Purger_Thread (ACE_Thread_Manager
&thread_manager
,
1110 ACE_Reactor
&reactor
,
1111 Connection_Cache
&connection_cache
);
1115 ACE_Reactor
&reactor_
;
1117 Connection_Cache
&connection_cache_
;
1121 Purger_Thread::Purger_Thread (ACE_Thread_Manager
&thread_manager
,
1122 ACE_Reactor
&reactor
,
1123 Connection_Cache
&connection_cache
)
1124 : ACE_Task_Base (&thread_manager
),
1126 connection_cache_ (connection_cache
)
1131 Purger_Thread::svc (void)
1133 ACE_DEBUG ((LM_DEBUG
,
1134 ACE_TEXT("(%t) Purger_Thread::svc commencing\n")));
1136 disable_signal (SIGPIPE
, SIGPIPE
);
1138 for (; !this->reactor_
.reactor_event_loop_done ();)
1140 // Get a connection from the cache.
1142 this->connection_cache_
.acquire_connection ();
1144 // If no connection is available in the cache, sleep for a while.
1146 ACE_OS::sleep (ACE_Time_Value (0, 10 * 1000));
1149 // The reference count on the sender was increased by the
1150 // cache before it was returned to us.
1151 ACE_Event_Handler_var
safe_sender (sender
);
1153 // Actively close the connection.
1154 ACE_DEBUG ((LM_DEBUG
,
1155 ACE_TEXT ("(%t) Purger thread calling Sender::close() ")
1156 ACE_TEXT ("for handle %d\n"),
1162 ACE_DEBUG ((LM_DEBUG
,
1163 ACE_TEXT("(%t) Purger_Thread::svc terminating\n")));
1169 testing (ACE_Reactor
*reactor
,
1170 int make_invocations
,
1171 int run_event_loop_thread
,
1172 int run_purger_thread
,
1173 int run_receiver_thread
,
1176 ACE_DEBUG ((LM_DEBUG
,
1177 ACE_TEXT ("\n(%t) Configuration:\n")
1178 ACE_TEXT ("\tInvocation thread = %d\n")
1179 ACE_TEXT ("\tEvent Loop thread = %d\n")
1180 ACE_TEXT ("\tPurger thread = %d\n")
1181 ACE_TEXT ("\tReceiver thread = %d\n")
1182 ACE_TEXT ("\tNested Upcalls = %d\n\n"),
1184 run_event_loop_thread
,
1186 run_receiver_thread
,
1189 ACE_Thread_Manager thread_manager
;
1193 // Create the connection cache.
1194 Connection_Cache connection_cache
;
1195 ACE_Auto_Event new_connection_event
;
1197 // Create the invocation thread.
1198 Invocation_Thread
invocation_thread (thread_manager
,
1201 new_connection_event
,
1203 run_receiver_thread
,
1207 invocation_thread
.activate ();
1208 ACE_TEST_ASSERT (result
== 0);
1210 // Create the thread for closing the server socket.
1211 Close_Socket_Thread
close_socket_thread (thread_manager
,
1213 new_connection_event
,
1215 run_receiver_thread
);
1217 close_socket_thread
.activate ();
1218 ACE_TEST_ASSERT (result
== 0);
1220 global_event_loop_thread_variable
= 0;
1222 // Create a thread to run the event loop.
1223 Event_Loop_Thread
event_loop_thread (thread_manager
,
1225 if (run_event_loop_thread
)
1227 global_event_loop_thread_variable
=
1231 event_loop_thread
.activate ();
1232 ACE_TEST_ASSERT (result
== 0);
1235 // Create a thread to run the purger.
1236 Purger_Thread
purger_thread (thread_manager
,
1239 if (run_purger_thread
)
1242 purger_thread
.activate ();
1243 ACE_TEST_ASSERT (result
== 0);
1246 // Wait for threads to exit.
1247 result
= thread_manager
.wait ();
1248 ACE_TEST_ASSERT (result
== 0);
1250 // Set the global variable to zero again because the
1251 // event_loop_thread exists on the stack and now
1253 global_event_loop_thread_variable
= 0;
1256 template <class REACTOR_IMPL
>
1260 test (int ignore_nested_upcalls
,
1261 int require_event_loop_thread
);
1264 template <class REACTOR_IMPL
>
1265 test
<REACTOR_IMPL
>::test (int ignore_nested_upcalls
,
1266 int require_event_loop_thread
)
1269 i
< (int) (sizeof test_configs
/ (sizeof (int) * number_of_options
));
1272 if ((make_invocations
== -1 ||
1273 make_invocations
== test_configs
[i
][0]) &&
1274 (run_event_loop_thread
== -1 ||
1275 run_event_loop_thread
== test_configs
[i
][1]) &&
1276 (run_purger_thread
== -1 ||
1277 run_purger_thread
== test_configs
[i
][2]) &&
1278 (run_receiver_thread
== -1 ||
1279 run_receiver_thread
== test_configs
[i
][3]) &&
1280 (nested_upcalls
== -1 ||
1281 nested_upcalls
== test_configs
[i
][4]))
1284 #if 0 /* defined (ACE_LINUX) */
1286 // @@ I am not sure why but when <make_invocations> is 0 and
1287 // there is no purger thread, the receiver thread does not
1288 // notice that the connection has been closed.
1289 if (!test_configs
[i
][0] && !test_configs
[i
][2])
1292 // @@ Linux also does not work correctly in the following
1293 // case: Invocation thread starts and sends messages filling
1294 // the socket buffer. It then blocks in write(). In the
1295 // meantime, the close connection thread closes the socket
1296 // used by invocation thread. However, the invocation thread
1297 // does not notice this as it does not return from write().
1298 // Meanwhile, the event loop thread notices that a socket in
1299 // it's wait set has been closed, and starts to spin in
1300 // handle_events() since the invocation thread is not taking
1301 // out the closed handle from the Reactor's wait set.
1302 if (test_configs
[i
][0] && test_configs
[i
][1] && !test_configs
[i
][3])
1307 if (test_configs
[i
][4] && ignore_nested_upcalls
)
1310 if (!test_configs
[i
][1] && require_event_loop_thread
)
1313 ACE_Reactor
reactor (new REACTOR_IMPL
, true);
1320 test_configs
[i
][4]);
1326 parse_args (int argc
, ACE_TCHAR
*argv
[])
1328 ACE_Get_Opt
get_opt (argc
, argv
, ACE_TEXT ("a:b:c:d:f:g:k:l:m:n:o:uz:"));
1331 while ((cc
= get_opt ()) != -1)
1336 test_select_reactor
= ACE_OS::atoi (get_opt
.opt_arg ());
1339 test_tp_reactor
= ACE_OS::atoi (get_opt
.opt_arg ());
1342 test_wfmo_reactor
= ACE_OS::atoi (get_opt
.opt_arg ());
1345 test_dev_poll_reactor
= ACE_OS::atoi (get_opt
.opt_arg ());
1348 number_of_connections
= ACE_OS::atoi (get_opt
.opt_arg ());
1351 close_timeout
= ACE_OS::atoi (get_opt
.opt_arg ());
1354 make_invocations
= ACE_OS::atoi (get_opt
.opt_arg ());
1357 run_event_loop_thread
= ACE_OS::atoi (get_opt
.opt_arg ());
1360 run_purger_thread
= ACE_OS::atoi (get_opt
.opt_arg ());
1363 run_receiver_thread
= ACE_OS::atoi (get_opt
.opt_arg ());
1366 nested_upcalls
= ACE_OS::atoi (get_opt
.opt_arg ());
1369 debug
= ACE_OS::atoi (get_opt
.opt_arg ());
1373 ACE_ERROR ((LM_ERROR
,
1374 ACE_TEXT ("\nusage: %s \n\n")
1375 ACE_TEXT ("\t[-a test Select Reactor] (defaults to %d)\n")
1376 ACE_TEXT ("\t[-b test TP Reactor] (defaults to %d)\n")
1377 ACE_TEXT ("\t[-c test WFMO Reactor] (defaults to %d)\n")
1378 ACE_TEXT ("\t[-d test Dev Poll Reactor] (defaults to %d)\n")
1379 ACE_TEXT ("\t[-f number of connections] (defaults to %d)\n")
1380 ACE_TEXT ("\t[-g close timeout] (defaults to %d)\n")
1381 ACE_TEXT ("\t[-k make invocations] (defaults to %d)\n")
1382 ACE_TEXT ("\t[-l run event loop thread] (defaults to %d)\n")
1383 ACE_TEXT ("\t[-m run purger thread] (defaults to %d)\n")
1384 ACE_TEXT ("\t[-n run receiver thread] (defaults to %d)\n")
1385 ACE_TEXT ("\t[-o nested upcalls] (defaults to %d)\n")
1386 ACE_TEXT ("\t[-z debug] (defaults to %d)\n")
1389 test_select_reactor
,
1392 test_dev_poll_reactor
,
1393 number_of_connections
,
1396 run_event_loop_thread
,
1398 run_receiver_thread
,
1409 run_main (int argc
, ACE_TCHAR
*argv
[])
1411 ACE_START_TEST (ACE_TEXT ("MT_Reference_Counted_Event_Handler_Test"));
1413 // Validate options.
1415 parse_args (argc
, argv
);
1419 disable_signal (SIGPIPE
, SIGPIPE
);
1421 int ignore_nested_upcalls
= 1;
1422 int perform_nested_upcalls
= 0;
1424 int event_loop_thread_required
= 1;
1425 int event_loop_thread_not_required
= 0;
1427 if (test_select_reactor
)
1429 ACE_DEBUG ((LM_DEBUG
,
1430 ACE_TEXT ("\n\n(%t) Testing Select Reactor....\n\n")));
1432 test
<ACE_Select_Reactor
> test (ignore_nested_upcalls
,
1433 event_loop_thread_not_required
);
1434 ACE_UNUSED_ARG (test
);
1437 if (test_tp_reactor
)
1439 ACE_DEBUG ((LM_DEBUG
,
1440 ACE_TEXT ("\n\n(%t) Testing TP Reactor....\n\n")));
1442 test
<ACE_TP_Reactor
> test (perform_nested_upcalls
,
1443 event_loop_thread_not_required
);
1444 ACE_UNUSED_ARG (test
);
1447 #if defined (ACE_HAS_EVENT_POLL)
1449 if (test_dev_poll_reactor
)
1451 ACE_DEBUG ((LM_DEBUG
,
1452 ACE_TEXT ("\n\n(%t) Testing Dev Poll Reactor....\n\n")));
1454 test
<ACE_Dev_Poll_Reactor
> test (perform_nested_upcalls
,
1455 event_loop_thread_not_required
);
1456 ACE_UNUSED_ARG (test
);
1461 #if defined (ACE_WIN32)
1463 if (test_wfmo_reactor
)
1465 ACE_DEBUG ((LM_DEBUG
,
1466 ACE_TEXT ("\n\n(%t) Testing WFMO Reactor....\n\n")));
1468 test
<ACE_WFMO_Reactor
> test (ignore_nested_upcalls
,
1469 event_loop_thread_required
);
1470 ACE_UNUSED_ARG (test
);
1473 #else /* ACE_WIN32 */
1475 ACE_UNUSED_ARG (event_loop_thread_required
);
1477 #endif /* ACE_WIN32 */
1484 #else /* ACE_HAS_THREADS && ! ACE_LACKS_ACCEPT */
1487 run_main (int, ACE_TCHAR
*[])
1489 ACE_START_TEST (ACE_TEXT ("MT_Reference_Counted_Event_Handler_Test"));
1491 ACE_ERROR ((LM_INFO
,
1492 ACE_TEXT ("threads/accept not supported on this platform\n")));
1499 #endif /* ACE_HAS_THREADS && ! ACE_LACKS_ACCEPT */