2 //=============================================================================
4 * @file MT_Reference_Counted_Event_Handler_Test.cpp
6 * This test tries to represents what happens in the ORB wrt to
7 * event handlers, reactors, timer queues, threads, and connection
8 * caches, minus the other complexities. The following reactors
9 * are tested: Select, TP, WFMO, and Dev Poll (if enabled).
11 * The test checks proper use and shutting down of client-side
12 * event handlers when it is used by invocation threads and/or
13 * event loop threads. Server-side event handlers are either
14 * threaded or reactive. A purger thread is introduced to check the
15 * connection recycling and cache purging. Nested upcalls are also
18 * @author Irfan Pyarali <irfan@oomworks.com>
20 //=============================================================================
22 #include "test_config.h"
23 #include "ace/Reactor.h"
24 #include "ace/Select_Reactor.h"
25 #include "ace/TP_Reactor.h"
26 #include "ace/WFMO_Reactor.h"
27 #include "ace/Dev_Poll_Reactor.h"
28 #include "ace/Get_Opt.h"
30 #include "ace/SOCK_Acceptor.h"
31 #include "ace/SOCK_Connector.h"
32 #include "ace/Auto_Event.h"
33 #include "ace/OS_NS_signal.h"
34 #include "ace/OS_NS_time.h"
35 #include "ace/OS_NS_sys_socket.h"
36 #include "ace/OS_NS_unistd.h"
38 #if defined (ACE_HAS_THREADS) && !defined ACE_LACKS_ACCEPT
40 static const char message
[] = "abcdefghijklmnopqrstuvwxyz";
41 static const int message_size
= 26;
42 static int test_select_reactor
= 1;
43 static int test_tp_reactor
= 1;
44 static int test_wfmo_reactor
= 1;
45 static int test_dev_poll_reactor
= 1;
47 static int number_of_connections
= 5;
48 static int max_nested_upcall_level
= 10;
49 static int close_timeout
= 500;
50 static int pipe_open_attempts
= 10;
51 static int pipe_retry_timeout
= 1;
52 static int make_invocations
= -1;
53 static int run_event_loop_thread
= -1;
54 static int run_purger_thread
= -1;
55 static int run_receiver_thread
= -1;
56 static int nested_upcalls
= -1;
58 static ACE_HANDLE server_handle
= ACE_INVALID_HANDLE
;
59 static ACE_HANDLE client_handle
= ACE_INVALID_HANDLE
;
61 static int number_of_options
= 5;
62 static int test_configs
[][5] =
65 // make_invocations, run_event_loop_thread, run_purger_thread, run_receiver_thread, nested_upcalls
68 // { 0, 0, 0, 0, 0, }, // At least one thread should be running.
69 // { 0, 0, 0, 1, 0, }, // If event_loop_thread is not running and invocation_thread is not making invocations,
70 // no thread will know that the socket is closed.
71 // { 0, 0, 1, 0, 0, }, // If invocation_thread is not making invocations and if receiver is not threaded,
72 // we cannot decide which socket to close.
73 // { 0, 0, 1, 1, 0, }, // If event_loop_thread is not running and invocation_thread is not making invocations,
74 // no thread will know that the socket is closed.
75 // { 0, 1, 0, 0, 0, }, // If invocation_thread is not making invocations and if receiver is not threaded,
76 // we cannot decide which socket to close.
78 // { 0, 1, 0, 1, 1, }, // No need for nested upcalls without invocations.
79 // { 0, 1, 1, 0, 0, }, // If invocation_thread is not making invocations and if receiver is not threaded,
80 // we cannot decide which socket to close.
82 // { 0, 1, 1, 1, 1, }, // No need for nested upcalls without invocations.
83 // { 1, 0, 0, 0, 0, }, // If both event_loop_thread and receiver are not threaded,
84 // no thread can receive the messages.
86 // { 1, 0, 0, 1, 1, }, // No need for nested upcalls without event loop being used by the receiver.
87 // { 1, 0, 1, 0, 0, }, // If both event_loop_thread and receiver are not threaded,
88 // no thread can receive the messages.
90 // { 1, 0, 1, 1, 1, }, // No need for nested upcalls without event loop being used by the receiver.
94 // { 1, 1, 0, 1, 1, }, // No need for nested upcalls without event loop being used by the receiver.
98 // { 1, 1, 1, 1, 1, }, // No need for nested upcalls without event loop being used by the receiver.
102 disable_signal (int sigmin
, int sigmax
)
104 #if !defined (ACE_LACKS_UNIX_SIGNALS)
106 if (ACE_OS::sigemptyset (&signal_set
) == - 1)
107 ACE_ERROR ((LM_ERROR
,
108 ACE_TEXT ("Error: (%P|%t):%p\n"),
109 ACE_TEXT ("sigemptyset failed")));
111 for (int i
= sigmin
; i
<= sigmax
; i
++)
112 ACE_OS::sigaddset (&signal_set
, i
);
114 // Put the <signal_set>.
115 # if defined (ACE_LACKS_PTHREAD_THR_SIGSETMASK)
116 // In multi-threaded application this is not POSIX compliant
117 // but let's leave it just in case.
118 if (ACE_OS::sigprocmask (SIG_BLOCK
, &signal_set
, 0) != 0)
120 if (ACE_OS::thr_sigsetmask (SIG_BLOCK
, &signal_set
, 0) != 0)
121 # endif /* ACE_LACKS_PTHREAD_THR_SIGSETMASK */
122 ACE_ERROR_RETURN ((LM_ERROR
,
123 ACE_TEXT ("Error: (%P|%t): %p\n"),
124 ACE_TEXT ("SIG_BLOCK failed")),
127 ACE_UNUSED_ARG (sigmin
);
128 ACE_UNUSED_ARG (sigmax
);
129 #endif /* ACE_LACKS_UNIX_SIGNALS */
134 /* Replication of the ACE_Pipe class. Only difference is that this
135 class always uses two sockets to create the pipe, even on platforms
136 that support pipes. */
143 //FUZZ: disable check_for_lack_ACE_OS
144 ///FUZZ: enable check_for_lack_ACE_OS
147 ACE_HANDLE
read_handle () const;
149 ACE_HANDLE
write_handle () const;
152 ACE_HANDLE handles_
[2];
158 ACE_INET_Addr my_addr
;
159 ACE_SOCK_Acceptor acceptor
;
160 ACE_SOCK_Connector connector
;
161 ACE_SOCK_Stream reader
;
162 ACE_SOCK_Stream writer
;
165 // Bind listener to any port and then find out what the port was.
166 if (acceptor
.open (ACE_Addr::sap_any
) == -1
167 || acceptor
.get_local_addr (my_addr
) == -1)
171 int af
= my_addr
.get_type ();
172 const ACE_TCHAR
*local
= ACE_LOCALHOST
;
173 #if defined (ACE_HAS_IPV6)
175 local
= ACE_IPV6_LOCALHOST
;
176 #endif /* ACE_HAS_IPV6 */
177 ACE_INET_Addr
sv_addr (my_addr
.get_port_number (),
181 // Establish a connection within the same process.
182 if (connector
.connect (writer
, sv_addr
) == -1)
184 else if (acceptor
.accept (reader
) == -1)
191 // Close down the acceptor endpoint since we don't need it anymore.
196 this->handles_
[0] = reader
.get_handle ();
197 this->handles_
[1] = writer
.get_handle ();
204 this->handles_
[0] = ACE_INVALID_HANDLE
;
205 this->handles_
[1] = ACE_INVALID_HANDLE
;
209 Pipe::read_handle () const
211 return this->handles_
[0];
215 Pipe::write_handle () const
217 return this->handles_
[1];
220 class Connection_Cache
;
221 class Event_Loop_Thread
;
223 static Event_Loop_Thread
*global_event_loop_thread_variable
= 0;
225 class Sender
: public ACE_Event_Handler
228 Sender (ACE_HANDLE handle
,
229 Connection_Cache
&connection_cache
);
233 int handle_input (ACE_HANDLE
) override
;
235 ssize_t
send_message ();
237 //FUZZ: disable check_for_lack_ACE_OS
238 ///FUZZ: enable check_for_lack_ACE_OS
243 Connection_Cache
&connection_cache_
;
246 class Connection_Cache
251 ~Connection_Cache ();
253 void add_connection (Sender
*sender
);
255 void remove_connection (Sender
*sender
);
257 Sender
*acquire_connection ();
259 void release_connection (Sender
*sender
);
261 int find (Sender
*sender
);
263 ACE_SYNCH_MUTEX
&lock ();
280 ACE_SYNCH_MUTEX lock_
;
283 Sender::Sender (ACE_HANDLE handle
,
284 Connection_Cache
&connection_cache
)
286 connection_cache_ (connection_cache
)
288 // Enable reference counting.
289 this->reference_counting_policy ().value
290 (ACE_Event_Handler::Reference_Counting_Policy::ENABLED
);
293 ACE_DEBUG ((LM_DEBUG
,
294 ACE_TEXT ("(%t) Reference count in Sender::Sender() is %d\n"),
295 this->reference_count_
.load ()));
301 ACE_DEBUG ((LM_DEBUG
,
302 ACE_TEXT ("(%t) Reference count in ~Sender::Sender() is %d\n"),
303 this->reference_count_
.load ()));
305 // Close the socket that we are responsible for.
306 ACE_OS::closesocket (this->handle_
);
310 Sender::handle_input (ACE_HANDLE
)
313 ACE_DEBUG ((LM_DEBUG
,
314 ACE_TEXT ("(%t) Reference count in Sender::handle_input() is %d\n"),
315 this->reference_count_
.load ()));
318 // In this test, this method is only called when the connection has
319 // been closed. Remove self from Reactor.
322 ACE_DEBUG ((LM_DEBUG
,
323 ACE_TEXT ("(%t) Event loop thread calling Sender::close() ")
324 ACE_TEXT ("for handle %d\n"),
335 // Remove socket from Reactor (may fail if another thread has already
336 // removed the handle from the Reactor).
337 if (this->reactor() != 0)
338 this->reactor ()->remove_handler (this->handle_
,
339 ACE_Event_Handler::ALL_EVENTS_MASK
);
341 // Remove self from connection cache (may fail if another thread has
342 // already removed "this" from the cache).
343 this->connection_cache_
.remove_connection (this);
347 Sender::send_message ()
349 ACE_Time_Value
timeout (0, close_timeout
* 1000);
351 return ACE::send_n (this->handle_
,
357 class Event_Loop_Thread
: public ACE_Task_Base
360 Event_Loop_Thread (ACE_Thread_Manager
&thread_manager
,
361 ACE_Reactor
&reactor
);
365 ACE_Reactor
&reactor_
;
368 class Receiver
: public ACE_Task_Base
371 Receiver (ACE_Thread_Manager
&thread_manager
,
375 ~Receiver () override
;
379 //FUZZ: disable check_for_lack_ACE_OS
380 ///FUZZ: enable check_for_lack_ACE_OS
381 int close (u_long flags
) override
;
383 int handle_input (ACE_HANDLE
) override
;
385 int resume_handler () override
;
393 int nested_upcalls_level_
;
396 Receiver::Receiver (ACE_Thread_Manager
&thread_manager
,
399 : ACE_Task_Base (&thread_manager
),
402 nested_upcalls_ (nested_upcalls
),
403 nested_upcalls_level_ (0)
405 // Enable reference counting.
406 this->reference_counting_policy ().value
407 (ACE_Event_Handler::Reference_Counting_Policy::ENABLED
);
410 ACE_DEBUG ((LM_DEBUG
,
411 ACE_TEXT ("(%t) Reference count in Receiver::Receiver() is %d\n"),
412 this->reference_count_
.load ()));
415 Receiver::~Receiver ()
418 ACE_DEBUG ((LM_DEBUG
,
419 ACE_TEXT ("(%t) Reference count in ~Receiver::Receiver() is %d\n"),
420 this->reference_count_
.load ()));
422 // Close the socket that we are responsible for.
423 ACE_OS::closesocket (this->handle_
);
430 // Continuously receive messages from the Sender. On error, exit
435 ACE_DEBUG ((LM_DEBUG
,
436 ACE_TEXT("(%t) Receiver::svc commencing, handle = %d\n"),
439 disable_signal (SIGPIPE
, SIGPIPE
);
444 this->handle_input (this->handle_
);
446 ACE_DEBUG ((LM_DEBUG
,
447 ACE_TEXT("(%t) Receiver::svc terminating, handle = %d\n"),
453 Receiver::handle_input (ACE_HANDLE handle
)
455 char buf
[message_size
+ 1];
457 ACE_Time_Value
timeout (0, close_timeout
* 1000);
466 if (debug
&& result
< 1)
467 ACE_DEBUG ((LM_DEBUG
,
468 ACE_TEXT("(%t) Receiver::handle input, ")
469 ACE_TEXT("h = %d, result = %d %p\n"),
470 handle_
, result
, ACE_TEXT("ACE::recv_n")));
472 if (this->reactor ())
473 this->reactor ()->resume_handler (handle
);
475 if (result
== message_size
)
478 ACE_DEBUG ((LM_DEBUG
,
479 ACE_TEXT ("(%t) Message %d received on handle %d\n"),
483 if (this->thr_count () == 0 &&
484 this->nested_upcalls_
)
486 this->nested_upcalls_level_
++;
489 ACE_DEBUG ((LM_DEBUG
,
490 ACE_TEXT ("(%t) Nesting level %d\n"),
491 this->nested_upcalls_level_
));
493 if ((this->nested_upcalls_level_
!= max_nested_upcall_level
) &&
494 (global_event_loop_thread_variable
!= 0))
495 global_event_loop_thread_variable
->svc ();
497 this->nested_upcalls_level_
--;
506 ACE_DEBUG ((LM_DEBUG
,
507 ACE_TEXT ("(%t) /*** Problem in receiving message %d on handle")
508 ACE_TEXT (" %d: shutting down receiving thread ***/\n"),
517 Receiver::resume_handler ()
519 /// The application takes responsibility of resuming the handler.
520 return ACE_APPLICATION_RESUMES_HANDLER
;
524 Receiver::close (u_long
)
526 // If threaded, we are responsible for deleting this instance when
527 // the thread completes. If not threaded, Reactor reference
528 // counting will handle the deletion of this instance.
536 Connector (ACE_Thread_Manager
&thread_manager
,
537 ACE_Reactor
&reactor
,
540 //FUZZ: disable check_for_lack_ACE_OS
541 ///FUZZ: enable check_for_lack_ACE_OS
542 int connect (ACE_HANDLE
&client_handle
,
543 ACE_HANDLE
&server_handle
,
544 int run_receiver_thread
);
546 ACE_Thread_Manager
&thread_manager_
;
548 ACE_Reactor
&reactor_
;
553 Connector::Connector (ACE_Thread_Manager
&thread_manager
,
554 ACE_Reactor
&reactor
,
556 : thread_manager_ (thread_manager
),
558 nested_upcalls_ (nested_upcalls
)
563 Connector::connect (ACE_HANDLE
&client_handle
,
564 ACE_HANDLE
&server_handle
,
565 int run_receiver_thread
)
568 // Create a connection and a receiver to receive messages on the
575 for (int i
= 0; i
< pipe_open_attempts
; ++i
)
584 ACE_OS::sleep (pipe_retry_timeout
);
587 ACE_TEST_ASSERT (result
== 0);
588 ACE_UNUSED_ARG (result
);
591 new Receiver (this->thread_manager_
,
592 pipe
.write_handle (),
593 this->nested_upcalls_
);
595 // Either the receiver is threaded or register it with the Reactor.
596 if (run_receiver_thread
)
598 receiver
->activate ();
602 this->reactor_
.register_handler (pipe
.write_handle (),
604 ACE_Event_Handler::READ_MASK
);
606 // The reference count on the receiver was increased by the
608 ACE_Event_Handler_var
safe_receiver (receiver
);
611 ACE_TEST_ASSERT (result
== 0);
612 ACE_UNUSED_ARG (result
);
618 pipe
.write_handle ();
621 ACE_DEBUG ((LM_DEBUG
,
622 ACE_TEXT ("(%t) New connection: client handle = %d, ")
623 ACE_TEXT ("server handle = %d\n"),
624 client_handle
, server_handle
));
629 Connection_Cache::Connection_Cache ()
631 // Initialize the connection cache.
633 new Entry
[number_of_connections
];
635 for (int i
= 0; i
< number_of_connections
; ++i
)
637 this->entries_
[i
].sender_
= 0;
638 this->entries_
[i
].state_
= NOT_IN_CACHE
;
643 Connection_Cache::find (Sender
*sender
)
645 for (int i
= 0; i
< number_of_connections
; ++i
)
647 if (this->entries_
[i
].sender_
== sender
)
655 Connection_Cache::add_connection (Sender
*sender
)
657 ACE_GUARD (ACE_SYNCH_MUTEX
, ace_mon
, this->lock_
);
659 // Make sure that the state of the connection cache is as
660 // expected. <sender> should not be already in the cache.
661 ACE_TEST_ASSERT (this->find (sender
) == -1);
666 sender
->add_reference ();
667 this->entries_
[empty_index
].sender_
= sender
;
668 this->entries_
[empty_index
].state_
= BUSY
;
672 Connection_Cache::remove_connection (Sender
*sender
)
674 ACE_GUARD (ACE_SYNCH_MUTEX
, ace_mon
, this->lock_
);
676 // Make sure that the state of the connection cache is as expected.
677 // remove_connection() may already have been called.
684 // If we still have the sender, remove it.
685 sender
->remove_reference ();
686 this->entries_
[index
].sender_
= 0;
687 this->entries_
[index
].state_
= NOT_IN_CACHE
;
691 Connection_Cache::acquire_connection ()
693 ACE_GUARD_RETURN (ACE_SYNCH_MUTEX
, ace_mon
, this->lock_
, 0);
695 // Find a valid and IDLE sender.
699 for (int i
= 0; i
< number_of_connections
; ++i
)
701 if (this->entries_
[i
].sender_
&&
702 this->entries_
[i
].state_
== IDLE
)
709 this->entries_
[index
].sender_
->add_reference ();
710 this->entries_
[index
].state_
= BUSY
;
712 return this->entries_
[index
].sender_
;
716 Connection_Cache::release_connection (Sender
*sender
)
718 ACE_GUARD (ACE_SYNCH_MUTEX
, ace_mon
, this->lock_
);
720 // Make sure that the state of the connection cache is as expected.
721 // remove_connection() may have already removed the connection from
729 // If we still have the sender, idle it.
730 this->entries_
[index
].state_
= IDLE
;
734 Connection_Cache::lock ()
739 Connection_Cache::~Connection_Cache ()
741 for (int i
= 0; i
< number_of_connections
; ++i
)
743 if (this->entries_
[i
].sender_
)
744 this->remove_connection (this->entries_
[i
].sender_
);
747 delete[] this->entries_
;
750 class Invocation_Thread
: public ACE_Task_Base
753 Invocation_Thread (ACE_Thread_Manager
&thread_manager
,
754 ACE_Reactor
&reactor
,
755 Connection_Cache
&connection_cache
,
756 ACE_Auto_Event
&new_connection_event
,
757 int make_invocations
,
758 int run_receiver_thread
,
763 Sender
*create_connection ();
765 Connection_Cache
&connection_cache_
;
767 ACE_Reactor
&reactor_
;
769 ACE_Thread_Manager
&thread_manager_
;
771 ACE_Auto_Event
&new_connection_event_
;
773 int make_invocations_
;
775 int run_receiver_thread_
;
780 Invocation_Thread::Invocation_Thread (ACE_Thread_Manager
&thread_manager
,
781 ACE_Reactor
&reactor
,
782 Connection_Cache
&connection_cache
,
783 ACE_Auto_Event
&new_connection_event
,
784 int make_invocations
,
785 int run_receiver_thread
,
787 : ACE_Task_Base (&thread_manager
),
788 connection_cache_ (connection_cache
),
790 thread_manager_ (thread_manager
),
791 new_connection_event_ (new_connection_event
),
792 make_invocations_ (make_invocations
),
793 run_receiver_thread_ (run_receiver_thread
),
794 nested_upcalls_ (nested_upcalls
)
799 Invocation_Thread::create_connection ()
803 // Connector for creating new connections.
804 Connector
connector (this->thread_manager_
,
806 this->nested_upcalls_
);
808 // <server_handle> is a global variable. It will be used later by
809 // the Close_Socket_Thread.
811 connector
.connect (client_handle
,
813 this->run_receiver_thread_
);
814 ACE_TEST_ASSERT (result
== 0);
815 ACE_UNUSED_ARG (result
);
817 // Create a new sender.
819 new Sender (client_handle
,
820 this->connection_cache_
);
822 // Register it with the cache.
823 this->connection_cache_
.add_connection (sender
);
826 // There might be a race condition here. The sender has been added
827 // to the cache and is potentially available to other threads
828 // accessing the cache. Therefore, the other thread may use this
829 // sender and potentially close the sender before it even gets
830 // registered with the Reactor.
832 // This is resolved by marking the connection as busy when it is
833 // first added to the cache. And only once the thread creating the
834 // connection is done with it, it is marked a available in the
837 // This order of registration is important.
840 // Register the handle with the Reactor.
842 this->reactor_
.register_handler (client_handle
,
844 ACE_Event_Handler::READ_MASK
);
846 ACE_TEST_ASSERT (result
== 0);
847 ACE_UNUSED_ARG (result
);
850 ACE_ERROR ((LM_ERROR
, ACE_TEXT ("(%t) create_connection h %d, %p\n"),
852 ACE_TEXT ("register_handler")));
858 Invocation_Thread::svc ()
860 int connection_counter
= 0;
861 ACE_DEBUG ((LM_DEBUG
,
862 ACE_TEXT("(%t) Invocation_Thread::svc commencing\n")));
864 disable_signal (SIGPIPE
, SIGPIPE
);
866 for (int message_counter
= 1;; ++message_counter
)
868 // Get a connection from the cache.
870 this->connection_cache_
.acquire_connection ();
872 // If no connection is available in the cache, create a new one.
875 if (connection_counter
< number_of_connections
)
877 sender
= this->create_connection ();
879 // This lets the Close_Socket_Thread know that the new
880 // connection has been created.
882 this->new_connection_event_
.signal ();
883 ACE_TEST_ASSERT (result
== 0);
884 ACE_UNUSED_ARG (result
);
886 ++connection_counter
;
890 // Stop the thread, if the maximum number of connections
891 // for the test has been reached.
895 // The reference count on the sender was increased by the cache
896 // before it was returned to us.
897 ACE_Event_Handler_var
safe_sender (sender
);
899 // If the test does not require making invocations, immediately
900 // release the connection.
901 if (!this->make_invocations_
)
903 this->connection_cache_
.release_connection (sender
);
905 // Sleep for a short while
906 ACE_OS::sleep (ACE_Time_Value (0, 10 * 1000));
912 sender
->send_message ();
914 // If successful, release connection.
915 if (result
== message_size
)
918 ACE_DEBUG ((LM_DEBUG
,
919 ACE_TEXT ("(%t) Message %d:%d delivered on handle %d\n"),
924 this->connection_cache_
.release_connection (sender
);
928 // If failure in making invocation, close the sender.
930 ACE_DEBUG ((LM_DEBUG
,
931 ACE_TEXT ("(%t) /*** Problem in delivering message ")
932 ACE_TEXT ("%d:%d on handle %d: shutting down ")
933 ACE_TEXT ("invocation thread ***/\n"),
938 ACE_DEBUG ((LM_DEBUG
,
939 ACE_TEXT ("(%t) Invocation thread calling ")
940 ACE_TEXT ("Sender::close() for handle %d\n"),
947 ACE_DEBUG ((LM_DEBUG
,
948 ACE_TEXT("(%t) Invocation_Thread::svc calling end_reactor_event_loop\n")));
950 // Close the Reactor event loop.
951 this->reactor_
.end_reactor_event_loop ();
952 ACE_DEBUG ((LM_DEBUG
,
953 ACE_TEXT("(%t) Invocation_Thread::svc terminating\n")));
958 class Close_Socket_Thread
: public ACE_Task_Base
961 Close_Socket_Thread (ACE_Thread_Manager
&thread_manager
,
962 ACE_Reactor
&reactor
,
963 ACE_Auto_Event
&new_connection_event
,
964 int make_invocations
,
965 int run_receiver_thread
);
969 ACE_Auto_Event
&new_connection_event_
;
971 ACE_Reactor
&reactor_
;
973 int make_invocations_
;
975 int run_receiver_thread_
;
978 Close_Socket_Thread::Close_Socket_Thread (ACE_Thread_Manager
&thread_manager
,
979 ACE_Reactor
&reactor
,
980 ACE_Auto_Event
&new_connection_event
,
981 int make_invocations
,
982 int run_receiver_thread
)
983 : ACE_Task_Base (&thread_manager
),
984 new_connection_event_ (new_connection_event
),
986 make_invocations_ (make_invocations
),
987 run_receiver_thread_ (run_receiver_thread
)
992 Close_Socket_Thread::svc ()
994 unsigned int seed
= (unsigned int) ACE_OS::time ();
995 ACE_Time_Value
timeout (0, close_timeout
* 1000);
996 ACE_DEBUG ((LM_DEBUG
,
997 ACE_TEXT("(%t) Close_Socket_Thread::svc commencing\n")));
999 disable_signal (SIGPIPE
, SIGPIPE
);
1001 for (; !this->reactor_
.reactor_event_loop_done ();)
1003 // Wait for the new connection to be established.
1005 this->new_connection_event_
.wait (&timeout
,
1007 ACE_TEST_ASSERT (result
== 0 ||
1008 (result
== -1 && errno
== ETIME
));
1014 // Sleep for half a second.
1015 ACE_OS::sleep (timeout
);
1017 int close_client
= 0;
1019 // If the invocation thread is making invocations and if the
1020 // receiver is threaded, either socket can be closed.
1021 if (this->make_invocations_
&&
1022 this->run_receiver_thread_
)
1023 // Randomize which socket to close.
1024 close_client
= ACE_OS::rand_r (&seed
) % 2;
1026 // If the invocation thread is making invocations, only close
1027 // the client socket.
1028 else if (this->make_invocations_
)
1031 // If the receiver is threaded, only close the server socket.
1036 // Close the client socket.
1038 ACE_DEBUG ((LM_DEBUG
,
1039 ACE_TEXT ("(%t) Close socket thread closing client ")
1040 ACE_TEXT ("handle %d\n"),
1043 ACE_OS::shutdown (client_handle
, ACE_SHUTDOWN_BOTH
);
1044 ACE_OS::closesocket (client_handle
);
1048 // Close the server socket.
1050 ACE_DEBUG ((LM_DEBUG
,
1051 ACE_TEXT ("(%t) Close socket thread closing server ")
1052 ACE_TEXT ("handle %d\n"),
1054 ACE_OS::shutdown (server_handle
, ACE_SHUTDOWN_BOTH
);
1055 ACE_OS::closesocket (server_handle
);
1058 ACE_DEBUG ((LM_DEBUG
,
1059 ACE_TEXT("(%t) Close_Socket_Thread::svc terminating\n")));
1064 Event_Loop_Thread::Event_Loop_Thread (ACE_Thread_Manager
&thread_manager
,
1065 ACE_Reactor
&reactor
)
1066 : ACE_Task_Base (&thread_manager
),
1072 Event_Loop_Thread::svc ()
1074 // Simply run the event loop.
1075 this->reactor_
.owner (ACE_Thread::self ());
1076 ACE_DEBUG ((LM_DEBUG
,
1077 ACE_TEXT("(%t) Event_Loop_Thread::svc commencing\n")));
1079 disable_signal (SIGPIPE
, SIGPIPE
);
1081 while (!this->reactor_
.reactor_event_loop_done ())
1083 this->reactor_
.handle_events ();
1085 ACE_DEBUG ((LM_DEBUG
,
1086 ACE_TEXT("(%t) Event_Loop_Thread::svc terminating\n")));
1091 class Purger_Thread
: public ACE_Task_Base
1094 Purger_Thread (ACE_Thread_Manager
&thread_manager
,
1095 ACE_Reactor
&reactor
,
1096 Connection_Cache
&connection_cache
);
1098 int svc () override
;
1100 ACE_Reactor
&reactor_
;
1102 Connection_Cache
&connection_cache_
;
1105 Purger_Thread::Purger_Thread (ACE_Thread_Manager
&thread_manager
,
1106 ACE_Reactor
&reactor
,
1107 Connection_Cache
&connection_cache
)
1108 : ACE_Task_Base (&thread_manager
),
1110 connection_cache_ (connection_cache
)
1115 Purger_Thread::svc ()
1117 ACE_DEBUG ((LM_DEBUG
,
1118 ACE_TEXT("(%t) Purger_Thread::svc commencing\n")));
1120 disable_signal (SIGPIPE
, SIGPIPE
);
1122 for (; !this->reactor_
.reactor_event_loop_done ();)
1124 // Get a connection from the cache.
1126 this->connection_cache_
.acquire_connection ();
1128 // If no connection is available in the cache, sleep for a while.
1130 ACE_OS::sleep (ACE_Time_Value (0, 10 * 1000));
1133 // The reference count on the sender was increased by the
1134 // cache before it was returned to us.
1135 ACE_Event_Handler_var
safe_sender (sender
);
1137 // Actively close the connection.
1138 ACE_DEBUG ((LM_DEBUG
,
1139 ACE_TEXT ("(%t) Purger thread calling Sender::close() ")
1140 ACE_TEXT ("for handle %d\n"),
1146 ACE_DEBUG ((LM_DEBUG
,
1147 ACE_TEXT("(%t) Purger_Thread::svc terminating\n")));
1153 testing (ACE_Reactor
*reactor
,
1154 int make_invocations
,
1155 int run_event_loop_thread
,
1156 int run_purger_thread
,
1157 int run_receiver_thread
,
1160 ACE_DEBUG ((LM_DEBUG
,
1161 ACE_TEXT ("\n(%t) Configuration:\n")
1162 ACE_TEXT ("\tInvocation thread = %d\n")
1163 ACE_TEXT ("\tEvent Loop thread = %d\n")
1164 ACE_TEXT ("\tPurger thread = %d\n")
1165 ACE_TEXT ("\tReceiver thread = %d\n")
1166 ACE_TEXT ("\tNested Upcalls = %d\n\n"),
1168 run_event_loop_thread
,
1170 run_receiver_thread
,
1173 ACE_Thread_Manager thread_manager
;
1177 // Create the connection cache.
1178 Connection_Cache connection_cache
;
1179 ACE_Auto_Event new_connection_event
;
1181 // Create the invocation thread.
1182 Invocation_Thread
invocation_thread (thread_manager
,
1185 new_connection_event
,
1187 run_receiver_thread
,
1191 invocation_thread
.activate ();
1192 ACE_TEST_ASSERT (result
== 0);
1194 // Create the thread for closing the server socket.
1195 Close_Socket_Thread
close_socket_thread (thread_manager
,
1197 new_connection_event
,
1199 run_receiver_thread
);
1201 close_socket_thread
.activate ();
1202 ACE_TEST_ASSERT (result
== 0);
1204 global_event_loop_thread_variable
= 0;
1206 // Create a thread to run the event loop.
1207 Event_Loop_Thread
event_loop_thread (thread_manager
,
1209 if (run_event_loop_thread
)
1211 global_event_loop_thread_variable
=
1215 event_loop_thread
.activate ();
1216 ACE_TEST_ASSERT (result
== 0);
1219 // Create a thread to run the purger.
1220 Purger_Thread
purger_thread (thread_manager
,
1223 if (run_purger_thread
)
1226 purger_thread
.activate ();
1227 ACE_TEST_ASSERT (result
== 0);
1230 // Wait for threads to exit.
1231 result
= thread_manager
.wait ();
1232 ACE_TEST_ASSERT (result
== 0);
1234 // Set the global variable to zero again because the
1235 // event_loop_thread exists on the stack and now
1237 global_event_loop_thread_variable
= 0;
1240 template <class REACTOR_IMPL
>
1244 test (int ignore_nested_upcalls
,
1245 int require_event_loop_thread
);
1248 template <class REACTOR_IMPL
>
1249 test
<REACTOR_IMPL
>::test (int ignore_nested_upcalls
,
1250 int require_event_loop_thread
)
1253 i
< (int) (sizeof test_configs
/ (sizeof (int) * number_of_options
));
1256 if ((make_invocations
== -1 ||
1257 make_invocations
== test_configs
[i
][0]) &&
1258 (run_event_loop_thread
== -1 ||
1259 run_event_loop_thread
== test_configs
[i
][1]) &&
1260 (run_purger_thread
== -1 ||
1261 run_purger_thread
== test_configs
[i
][2]) &&
1262 (run_receiver_thread
== -1 ||
1263 run_receiver_thread
== test_configs
[i
][3]) &&
1264 (nested_upcalls
== -1 ||
1265 nested_upcalls
== test_configs
[i
][4]))
1267 #if 0 /* defined (ACE_LINUX) */
1269 // @@ I am not sure why but when <make_invocations> is 0 and
1270 // there is no purger thread, the receiver thread does not
1271 // notice that the connection has been closed.
1272 if (!test_configs
[i
][0] && !test_configs
[i
][2])
1275 // @@ Linux also does not work correctly in the following
1276 // case: Invocation thread starts and sends messages filling
1277 // the socket buffer. It then blocks in write(). In the
1278 // meantime, the close connection thread closes the socket
1279 // used by invocation thread. However, the invocation thread
1280 // does not notice this as it does not return from write().
1281 // Meanwhile, the event loop thread notices that a socket in
1282 // it's wait set has been closed, and starts to spin in
1283 // handle_events() since the invocation thread is not taking
1284 // out the closed handle from the Reactor's wait set.
1285 if (test_configs
[i
][0] && test_configs
[i
][1] && !test_configs
[i
][3])
1290 if (test_configs
[i
][4] && ignore_nested_upcalls
)
1293 if (!test_configs
[i
][1] && require_event_loop_thread
)
1296 ACE_Reactor
reactor (new REACTOR_IMPL
, true);
1303 test_configs
[i
][4]);
1309 parse_args (int argc
, ACE_TCHAR
*argv
[])
1311 ACE_Get_Opt
get_opt (argc
, argv
, ACE_TEXT ("a:b:c:d:f:g:k:l:m:n:o:uz:"));
1314 while ((cc
= get_opt ()) != -1)
1319 test_select_reactor
= ACE_OS::atoi (get_opt
.opt_arg ());
1322 test_tp_reactor
= ACE_OS::atoi (get_opt
.opt_arg ());
1325 test_wfmo_reactor
= ACE_OS::atoi (get_opt
.opt_arg ());
1328 test_dev_poll_reactor
= ACE_OS::atoi (get_opt
.opt_arg ());
1331 number_of_connections
= ACE_OS::atoi (get_opt
.opt_arg ());
1334 close_timeout
= ACE_OS::atoi (get_opt
.opt_arg ());
1337 make_invocations
= ACE_OS::atoi (get_opt
.opt_arg ());
1340 run_event_loop_thread
= ACE_OS::atoi (get_opt
.opt_arg ());
1343 run_purger_thread
= ACE_OS::atoi (get_opt
.opt_arg ());
1346 run_receiver_thread
= ACE_OS::atoi (get_opt
.opt_arg ());
1349 nested_upcalls
= ACE_OS::atoi (get_opt
.opt_arg ());
1352 debug
= ACE_OS::atoi (get_opt
.opt_arg ());
1356 ACE_ERROR ((LM_ERROR
,
1357 ACE_TEXT ("\nusage: %s \n\n")
1358 ACE_TEXT ("\t[-a test Select Reactor] (defaults to %d)\n")
1359 ACE_TEXT ("\t[-b test TP Reactor] (defaults to %d)\n")
1360 ACE_TEXT ("\t[-c test WFMO Reactor] (defaults to %d)\n")
1361 ACE_TEXT ("\t[-d test Dev Poll Reactor] (defaults to %d)\n")
1362 ACE_TEXT ("\t[-f number of connections] (defaults to %d)\n")
1363 ACE_TEXT ("\t[-g close timeout] (defaults to %d)\n")
1364 ACE_TEXT ("\t[-k make invocations] (defaults to %d)\n")
1365 ACE_TEXT ("\t[-l run event loop thread] (defaults to %d)\n")
1366 ACE_TEXT ("\t[-m run purger thread] (defaults to %d)\n")
1367 ACE_TEXT ("\t[-n run receiver thread] (defaults to %d)\n")
1368 ACE_TEXT ("\t[-o nested upcalls] (defaults to %d)\n")
1369 ACE_TEXT ("\t[-z debug] (defaults to %d)\n")
1372 test_select_reactor
,
1375 test_dev_poll_reactor
,
1376 number_of_connections
,
1379 run_event_loop_thread
,
1381 run_receiver_thread
,
1392 run_main (int argc
, ACE_TCHAR
*argv
[])
1394 ACE_START_TEST (ACE_TEXT ("MT_Reference_Counted_Event_Handler_Test"));
1396 // Validate options.
1398 parse_args (argc
, argv
);
1402 disable_signal (SIGPIPE
, SIGPIPE
);
1404 int ignore_nested_upcalls
= 1;
1405 int perform_nested_upcalls
= 0;
1407 int event_loop_thread_required
= 1;
1408 int event_loop_thread_not_required
= 0;
1410 if (test_select_reactor
)
1412 ACE_DEBUG ((LM_DEBUG
,
1413 ACE_TEXT ("\n\n(%t) Testing Select Reactor....\n\n")));
1415 test
<ACE_Select_Reactor
> test (ignore_nested_upcalls
,
1416 event_loop_thread_not_required
);
1417 ACE_UNUSED_ARG (test
);
1420 if (test_tp_reactor
)
1422 ACE_DEBUG ((LM_DEBUG
,
1423 ACE_TEXT ("\n\n(%t) Testing TP Reactor....\n\n")));
1425 test
<ACE_TP_Reactor
> test (perform_nested_upcalls
,
1426 event_loop_thread_not_required
);
1427 ACE_UNUSED_ARG (test
);
1430 #if defined (ACE_HAS_EVENT_POLL)
1432 if (test_dev_poll_reactor
)
1434 ACE_DEBUG ((LM_DEBUG
,
1435 ACE_TEXT ("\n\n(%t) Testing Dev Poll Reactor....\n\n")));
1437 test
<ACE_Dev_Poll_Reactor
> test (perform_nested_upcalls
,
1438 event_loop_thread_not_required
);
1439 ACE_UNUSED_ARG (test
);
1444 #if defined (ACE_WIN32)
1446 if (test_wfmo_reactor
)
1448 ACE_DEBUG ((LM_DEBUG
,
1449 ACE_TEXT ("\n\n(%t) Testing WFMO Reactor....\n\n")));
1451 test
<ACE_WFMO_Reactor
> test (ignore_nested_upcalls
,
1452 event_loop_thread_required
);
1453 ACE_UNUSED_ARG (test
);
1456 #else /* ACE_WIN32 */
1458 ACE_UNUSED_ARG (event_loop_thread_required
);
1460 #endif /* ACE_WIN32 */
1467 #else /* ACE_HAS_THREADS && ! ACE_LACKS_ACCEPT */
1470 run_main (int, ACE_TCHAR
*[])
1472 ACE_START_TEST (ACE_TEXT ("MT_Reference_Counted_Event_Handler_Test"));
1474 ACE_ERROR ((LM_INFO
,
1475 ACE_TEXT ("threads/accept not supported on this platform\n")));
1482 #endif /* ACE_HAS_THREADS && ! ACE_LACKS_ACCEPT */