Merge pull request #1844 from jrw972/monterey
[ACE_TAO.git] / ACE / tests / MT_Reference_Counted_Event_Handler_Test.cpp
blobbae36ea9c0ee06a1417044e4114ce211eb066400
2 //=============================================================================
3 /**
4 * @file MT_Reference_Counted_Event_Handler_Test.cpp
6 * This test tries to represents what happens in the ORB wrt to
7 * event handlers, reactors, timer queues, threads, and connection
8 * caches, minus the other complexities. The following reactors
9 * are tested: Select, TP, WFMO, and Dev Poll (if enabled).
11 * The test checks proper use and shutting down of client-side
12 * event handlers when it is used by invocation threads and/or
13 * event loop threads. Server-side event handlers are either
14 * threaded or reactive. A purger thread is introduced to check the
15 * connection recycling and cache purging. Nested upcalls are also
16 * tested.
18 * @author Irfan Pyarali <irfan@oomworks.com>
20 //=============================================================================
22 #include "test_config.h"
23 #include "ace/Reactor.h"
24 #include "ace/Select_Reactor.h"
25 #include "ace/TP_Reactor.h"
26 #include "ace/WFMO_Reactor.h"
27 #include "ace/Dev_Poll_Reactor.h"
28 #include "ace/Get_Opt.h"
29 #include "ace/Task.h"
30 #include "ace/SOCK_Acceptor.h"
31 #include "ace/SOCK_Connector.h"
32 #include "ace/Auto_Event.h"
33 #include "ace/OS_NS_signal.h"
34 #include "ace/OS_NS_time.h"
35 #include "ace/OS_NS_sys_socket.h"
36 #include "ace/OS_NS_unistd.h"
38 #if defined (ACE_HAS_THREADS) && !defined ACE_LACKS_ACCEPT
40 static const char message[] = "abcdefghijklmnopqrstuvwxyz";
41 static const int message_size = 26;
42 static int test_select_reactor = 1;
43 static int test_tp_reactor = 1;
44 static int test_wfmo_reactor = 1;
45 static int test_dev_poll_reactor = 1;
46 static int debug = 0;
47 static int number_of_connections = 5;
48 static int max_nested_upcall_level = 10;
49 static int close_timeout = 500;
50 static int pipe_open_attempts = 10;
51 static int pipe_retry_timeout = 1;
52 static int make_invocations = -1;
53 static int run_event_loop_thread = -1;
54 static int run_purger_thread = -1;
55 static int run_receiver_thread = -1;
56 static int nested_upcalls = -1;
58 static ACE_HANDLE server_handle = ACE_INVALID_HANDLE;
59 static ACE_HANDLE client_handle = ACE_INVALID_HANDLE;
61 static int number_of_options = 5;
62 static int test_configs[][5] =
65 // make_invocations, run_event_loop_thread, run_purger_thread, run_receiver_thread, nested_upcalls
68 // { 0, 0, 0, 0, 0, }, // At least one thread should be running.
69 // { 0, 0, 0, 1, 0, }, // If event_loop_thread is not running and invocation_thread is not making invocations,
70 // no thread will know that the socket is closed.
71 // { 0, 0, 1, 0, 0, }, // If invocation_thread is not making invocations and if receiver is not threaded,
72 // we cannot decide which socket to close.
73 // { 0, 0, 1, 1, 0, }, // If event_loop_thread is not running and invocation_thread is not making invocations,
74 // no thread will know that the socket is closed.
75 // { 0, 1, 0, 0, 0, }, // If invocation_thread is not making invocations and if receiver is not threaded,
76 // we cannot decide which socket to close.
77 { 0, 1, 0, 1, 0, },
78 // { 0, 1, 0, 1, 1, }, // No need for nested upcalls without invocations.
79 // { 0, 1, 1, 0, 0, }, // If invocation_thread is not making invocations and if receiver is not threaded,
80 // we cannot decide which socket to close.
81 { 0, 1, 1, 1, 0, },
82 // { 0, 1, 1, 1, 1, }, // No need for nested upcalls without invocations.
83 // { 1, 0, 0, 0, 0, }, // If both event_loop_thread and receiver are not threaded,
84 // no thread can receive the messages.
85 { 1, 0, 0, 1, 0, },
86 // { 1, 0, 0, 1, 1, }, // No need for nested upcalls without event loop being used by the receiver.
87 // { 1, 0, 1, 0, 0, }, // If both event_loop_thread and receiver are not threaded,
88 // no thread can receive the messages.
89 { 1, 0, 1, 1, 0, },
90 // { 1, 0, 1, 1, 1, }, // No need for nested upcalls without event loop being used by the receiver.
91 { 1, 1, 0, 0, 0, },
92 { 1, 1, 0, 0, 1, },
93 { 1, 1, 0, 1, 0, },
94 // { 1, 1, 0, 1, 1, }, // No need for nested upcalls without event loop being used by the receiver.
95 { 1, 1, 1, 0, 0, },
96 { 1, 1, 1, 0, 1, },
97 { 1, 1, 1, 1, 0, },
98 // { 1, 1, 1, 1, 1, }, // No need for nested upcalls without event loop being used by the receiver.
101 static int
102 disable_signal (int sigmin, int sigmax)
104 #if !defined (ACE_LACKS_UNIX_SIGNALS)
105 sigset_t signal_set;
106 if (ACE_OS::sigemptyset (&signal_set) == - 1)
107 ACE_ERROR ((LM_ERROR,
108 ACE_TEXT ("Error: (%P|%t):%p\n"),
109 ACE_TEXT ("sigemptyset failed")));
111 for (int i = sigmin; i <= sigmax; i++)
112 ACE_OS::sigaddset (&signal_set, i);
114 // Put the <signal_set>.
115 # if defined (ACE_LACKS_PTHREAD_THR_SIGSETMASK)
116 // In multi-threaded application this is not POSIX compliant
117 // but let's leave it just in case.
118 if (ACE_OS::sigprocmask (SIG_BLOCK, &signal_set, 0) != 0)
119 # else
120 if (ACE_OS::thr_sigsetmask (SIG_BLOCK, &signal_set, 0) != 0)
121 # endif /* ACE_LACKS_PTHREAD_THR_SIGSETMASK */
122 ACE_ERROR_RETURN ((LM_ERROR,
123 ACE_TEXT ("Error: (%P|%t): %p\n"),
124 ACE_TEXT ("SIG_BLOCK failed")),
125 -1);
126 #else
127 ACE_UNUSED_ARG (sigmin);
128 ACE_UNUSED_ARG (sigmax);
129 #endif /* ACE_LACKS_UNIX_SIGNALS */
131 return 0;
134 /* Replication of the ACE_Pipe class. Only difference is that this
135 class always uses two sockets to create the pipe, even on platforms
136 that support pipes. */
138 class Pipe
140 public:
142 Pipe (void);
144 //FUZZ: disable check_for_lack_ACE_OS
145 ///FUZZ: enable check_for_lack_ACE_OS
146 int open (void);
148 ACE_HANDLE read_handle (void) const;
150 ACE_HANDLE write_handle (void) const;
152 private:
153 ACE_HANDLE handles_[2];
157 Pipe::open (void)
159 ACE_INET_Addr my_addr;
160 ACE_SOCK_Acceptor acceptor;
161 ACE_SOCK_Connector connector;
162 ACE_SOCK_Stream reader;
163 ACE_SOCK_Stream writer;
164 int result = 0;
166 // Bind listener to any port and then find out what the port was.
167 if (acceptor.open (ACE_Addr::sap_any) == -1
168 || acceptor.get_local_addr (my_addr) == -1)
169 result = -1;
170 else
172 int af = my_addr.get_type ();
173 const ACE_TCHAR *local = ACE_LOCALHOST;
174 #if defined (ACE_HAS_IPV6)
175 if (af == AF_INET6)
176 local = ACE_IPV6_LOCALHOST;
177 #endif /* ACE_HAS_IPV6 */
178 ACE_INET_Addr sv_addr (my_addr.get_port_number (),
179 local,
180 af);
182 // Establish a connection within the same process.
183 if (connector.connect (writer, sv_addr) == -1)
184 result = -1;
185 else if (acceptor.accept (reader) == -1)
187 writer.close ();
188 result = -1;
192 // Close down the acceptor endpoint since we don't need it anymore.
193 acceptor.close ();
194 if (result == -1)
195 return -1;
197 this->handles_[0] = reader.get_handle ();
198 this->handles_[1] = writer.get_handle ();
200 return 0;
203 Pipe::Pipe (void)
205 this->handles_[0] = ACE_INVALID_HANDLE;
206 this->handles_[1] = ACE_INVALID_HANDLE;
209 ACE_HANDLE
210 Pipe::read_handle (void) const
212 return this->handles_[0];
215 ACE_HANDLE
216 Pipe::write_handle (void) const
218 return this->handles_[1];
221 class Connection_Cache;
222 class Event_Loop_Thread;
224 static Event_Loop_Thread *global_event_loop_thread_variable = 0;
226 class Sender : public ACE_Event_Handler
228 public:
230 Sender (ACE_HANDLE handle,
231 Connection_Cache &connection_cache);
233 ~Sender (void);
235 int handle_input (ACE_HANDLE);
237 ssize_t send_message (void);
239 //FUZZ: disable check_for_lack_ACE_OS
240 ///FUZZ: enable check_for_lack_ACE_OS
241 void close (void);
243 ACE_HANDLE handle_;
245 Connection_Cache &connection_cache_;
249 class Connection_Cache
251 public:
253 Connection_Cache (void);
255 ~Connection_Cache (void);
257 void add_connection (Sender *sender);
259 void remove_connection (Sender *sender);
261 Sender *acquire_connection (void);
263 void release_connection (Sender *sender);
265 int find (Sender *sender);
267 ACE_SYNCH_MUTEX &lock (void);
269 enum State
271 IDLE,
272 BUSY,
273 NOT_IN_CACHE
276 struct Entry
278 Sender *sender_;
279 State state_;
282 Entry *entries_;
284 ACE_SYNCH_MUTEX lock_;
287 Sender::Sender (ACE_HANDLE handle,
288 Connection_Cache &connection_cache)
289 : handle_ (handle),
290 connection_cache_ (connection_cache)
292 // Enable reference counting.
293 this->reference_counting_policy ().value
294 (ACE_Event_Handler::Reference_Counting_Policy::ENABLED);
296 if (debug)
297 ACE_DEBUG ((LM_DEBUG,
298 ACE_TEXT ("(%t) Reference count in Sender::Sender() is %d\n"),
299 this->reference_count_.value ()));
302 Sender::~Sender (void)
304 if (debug)
305 ACE_DEBUG ((LM_DEBUG,
306 ACE_TEXT ("(%t) Reference count in ~Sender::Sender() is %d\n"),
307 this->reference_count_.value ()));
309 // Close the socket that we are responsible for.
310 ACE_OS::closesocket (this->handle_);
314 Sender::handle_input (ACE_HANDLE)
316 if (debug)
317 ACE_DEBUG ((LM_DEBUG,
318 ACE_TEXT ("(%t) Reference count in Sender::handle_input() is %d\n"),
319 this->reference_count_.value ()));
322 // In this test, this method is only called when the connection has
323 // been closed. Remove self from Reactor.
326 ACE_DEBUG ((LM_DEBUG,
327 ACE_TEXT ("(%t) Event loop thread calling Sender::close() ")
328 ACE_TEXT ("for handle %d\n"),
329 this->handle_));
331 this->close ();
333 return 0;
336 void
337 Sender::close (void)
339 // Remove socket from Reactor (may fail if another thread has already
340 // removed the handle from the Reactor).
341 if (this->reactor() != 0)
342 this->reactor ()->remove_handler (this->handle_,
343 ACE_Event_Handler::ALL_EVENTS_MASK);
345 // Remove self from connection cache (may fail if another thread has
346 // already removed "this" from the cache).
347 this->connection_cache_.remove_connection (this);
350 ssize_t
351 Sender::send_message (void)
353 ACE_Time_Value timeout (0, close_timeout * 1000);
355 return ACE::send_n (this->handle_,
356 message,
357 message_size,
358 &timeout);
361 class Event_Loop_Thread : public ACE_Task_Base
363 public:
365 Event_Loop_Thread (ACE_Thread_Manager &thread_manager,
366 ACE_Reactor &reactor);
368 int svc (void);
370 ACE_Reactor &reactor_;
374 class Receiver : public ACE_Task_Base
376 public:
378 Receiver (ACE_Thread_Manager &thread_manager,
379 ACE_HANDLE handle,
380 int nested_upcalls);
382 ~Receiver (void);
384 int svc (void);
386 //FUZZ: disable check_for_lack_ACE_OS
387 ///FUZZ: enable check_for_lack_ACE_OS
388 int close (u_long flags);
390 int handle_input (ACE_HANDLE);
392 int resume_handler (void);
394 ACE_HANDLE handle_;
396 int counter_;
398 int nested_upcalls_;
400 int nested_upcalls_level_;
404 Receiver::Receiver (ACE_Thread_Manager &thread_manager,
405 ACE_HANDLE handle,
406 int nested_upcalls)
407 : ACE_Task_Base (&thread_manager),
408 handle_ (handle),
409 counter_ (1),
410 nested_upcalls_ (nested_upcalls),
411 nested_upcalls_level_ (0)
413 // Enable reference counting.
414 this->reference_counting_policy ().value
415 (ACE_Event_Handler::Reference_Counting_Policy::ENABLED);
417 if (debug)
418 ACE_DEBUG ((LM_DEBUG,
419 ACE_TEXT ("(%t) Reference count in Receiver::Receiver() is %d\n"),
420 this->reference_count_.value ()));
423 Receiver::~Receiver (void)
425 if (debug)
426 ACE_DEBUG ((LM_DEBUG,
427 ACE_TEXT ("(%t) Reference count in ~Receiver::Receiver() is %d\n"),
428 this->reference_count_.value ()));
430 // Close the socket that we are responsible for.
431 ACE_OS::closesocket (this->handle_);
435 Receiver::svc (void)
438 // Continuously receive messages from the Sender. On error, exit
439 // thread.
442 int result = 0;
443 ACE_DEBUG ((LM_DEBUG,
444 ACE_TEXT("(%t) Receiver::svc commencing, handle = %d\n"),
445 this->handle_));
447 disable_signal (SIGPIPE, SIGPIPE);
449 while (result != -1)
451 result =
452 this->handle_input (this->handle_);
454 ACE_DEBUG ((LM_DEBUG,
455 ACE_TEXT("(%t) Receiver::svc terminating, handle = %d\n"),
456 this->handle_));
457 return 0;
461 Receiver::handle_input (ACE_HANDLE handle)
463 char buf[message_size + 1];
465 ACE_Time_Value timeout (0, close_timeout * 1000);
467 // Receive message.
468 ssize_t result =
469 ACE::recv_n (handle,
470 buf,
471 message_size,
472 &timeout);
474 if (debug && result < 1)
475 ACE_DEBUG ((LM_DEBUG,
476 ACE_TEXT("(%t) Receiver::handle input, ")
477 ACE_TEXT("h = %d, result = %d %p\n"),
478 handle_, result, ACE_TEXT("ACE::recv_n")));
480 if (this->reactor ())
481 this->reactor ()->resume_handler (handle);
483 if (result == message_size)
485 if (debug)
486 ACE_DEBUG ((LM_DEBUG,
487 ACE_TEXT ("(%t) Message %d received on handle %d\n"),
488 this->counter_++,
489 handle));
491 if (this->thr_count () == 0 &&
492 this->nested_upcalls_)
494 this->nested_upcalls_level_++;
496 if (debug)
497 ACE_DEBUG ((LM_DEBUG,
498 ACE_TEXT ("(%t) Nesting level %d\n"),
499 this->nested_upcalls_level_));
501 if ((this->nested_upcalls_level_ != max_nested_upcall_level) &&
502 (global_event_loop_thread_variable != 0))
503 global_event_loop_thread_variable->svc ();
505 this->nested_upcalls_level_--;
506 return 0;
508 else
509 return 0;
511 else
513 if (debug)
514 ACE_DEBUG ((LM_DEBUG,
515 ACE_TEXT ("(%t) /*** Problem in receiving message %d on handle")
516 ACE_TEXT (" %d: shutting down receiving thread ***/\n"),
517 this->counter_,
518 handle));
520 return -1;
525 Receiver::resume_handler (void)
527 /// The application takes responsibility of resuming the handler.
528 return ACE_APPLICATION_RESUMES_HANDLER;
532 Receiver::close (u_long)
534 // If threaded, we are responsible for deleting this instance when
535 // the thread completes. If not threaded, Reactor reference
536 // counting will handle the deletion of this instance.
537 delete this;
538 return 0;
541 class Connector
543 public:
545 Connector (ACE_Thread_Manager &thread_manager,
546 ACE_Reactor &reactor,
547 int nested_upcalls);
549 //FUZZ: disable check_for_lack_ACE_OS
550 ///FUZZ: enable check_for_lack_ACE_OS
551 int connect (ACE_HANDLE &client_handle,
552 ACE_HANDLE &server_handle,
553 int run_receiver_thread);
555 ACE_Thread_Manager &thread_manager_;
557 ACE_Reactor &reactor_;
559 int nested_upcalls_;
563 Connector::Connector (ACE_Thread_Manager &thread_manager,
564 ACE_Reactor &reactor,
565 int nested_upcalls)
566 : thread_manager_ (thread_manager),
567 reactor_ (reactor),
568 nested_upcalls_ (nested_upcalls)
573 Connector::connect (ACE_HANDLE &client_handle,
574 ACE_HANDLE &server_handle,
575 int run_receiver_thread)
578 // Create a connection and a receiver to receive messages on the
579 // connection.
582 Pipe pipe;
583 int result = 0;
585 for (int i = 0; i < pipe_open_attempts; ++i)
587 result =
588 pipe.open ();
590 if (result == 0)
591 break;
593 if (result == -1)
594 ACE_OS::sleep (pipe_retry_timeout);
597 ACE_TEST_ASSERT (result == 0);
598 ACE_UNUSED_ARG (result);
600 Receiver *receiver =
601 new Receiver (this->thread_manager_,
602 pipe.write_handle (),
603 this->nested_upcalls_);
605 // Either the receiver is threaded or register it with the Reactor.
606 if (run_receiver_thread)
607 result =
608 receiver->activate ();
609 else
611 result =
612 this->reactor_.register_handler (pipe.write_handle (),
613 receiver,
614 ACE_Event_Handler::READ_MASK);
616 // The reference count on the receiver was increased by the
617 // Reactor.
618 ACE_Event_Handler_var safe_receiver (receiver);
621 ACE_TEST_ASSERT (result == 0);
622 ACE_UNUSED_ARG (result);
624 client_handle =
625 pipe.read_handle ();
627 server_handle =
628 pipe.write_handle ();
630 if (debug)
631 ACE_DEBUG ((LM_DEBUG,
632 ACE_TEXT ("(%t) New connection: client handle = %d, ")
633 ACE_TEXT ("server handle = %d\n"),
634 client_handle, server_handle));
636 return 0;
639 Connection_Cache::Connection_Cache (void)
641 // Initialize the connection cache.
642 this->entries_ =
643 new Entry[number_of_connections];
645 for (int i = 0; i < number_of_connections; ++i)
647 this->entries_[i].sender_ = 0;
648 this->entries_[i].state_ = NOT_IN_CACHE;
653 Connection_Cache::find (Sender *sender)
655 for (int i = 0; i < number_of_connections; ++i)
657 if (this->entries_[i].sender_ == sender)
658 return i;
661 return -1;
664 void
665 Connection_Cache::add_connection (Sender *sender)
667 ACE_GUARD (ACE_SYNCH_MUTEX, ace_mon, this->lock_);
669 // Make sure that the state of the connection cache is as
670 // expected. <sender> should not be already in the cache.
671 ACE_TEST_ASSERT (this->find (sender) == -1);
673 int empty_index =
674 this->find (0);
676 sender->add_reference ();
677 this->entries_[empty_index].sender_ = sender;
678 this->entries_[empty_index].state_ = BUSY;
681 void
682 Connection_Cache::remove_connection (Sender *sender)
684 ACE_GUARD (ACE_SYNCH_MUTEX, ace_mon, this->lock_);
686 // Make sure that the state of the connection cache is as expected.
687 // remove_connection() may already have been called.
688 int index =
689 this->find (sender);
691 if (index == -1)
692 return;
694 // If we still have the sender, remove it.
695 sender->remove_reference ();
696 this->entries_[index].sender_ = 0;
697 this->entries_[index].state_ = NOT_IN_CACHE;
700 Sender *
701 Connection_Cache::acquire_connection (void)
703 ACE_GUARD_RETURN (ACE_SYNCH_MUTEX, ace_mon, this->lock_, 0);
705 // Find a valid and IDLE sender.
707 int index = -1;
709 for (int i = 0; i < number_of_connections; ++i)
711 if (this->entries_[i].sender_ &&
712 this->entries_[i].state_ == IDLE)
713 index = i;
716 if (index == -1)
717 return 0;
719 this->entries_[index].sender_->add_reference ();
720 this->entries_[index].state_ = BUSY;
722 return this->entries_[index].sender_;
725 void
726 Connection_Cache::release_connection (Sender *sender)
728 ACE_GUARD (ACE_SYNCH_MUTEX, ace_mon, this->lock_);
730 // Make sure that the state of the connection cache is as expected.
731 // remove_connection() may have already removed the connection from
732 // the cache.
733 int index =
734 this->find (sender);
736 if (index == -1)
737 return;
739 // If we still have the sender, idle it.
740 this->entries_[index].state_ = IDLE;
743 ACE_SYNCH_MUTEX &
744 Connection_Cache::lock (void)
746 return this->lock_;
749 Connection_Cache::~Connection_Cache (void)
751 for (int i = 0; i < number_of_connections; ++i)
753 if (this->entries_[i].sender_)
754 this->remove_connection (this->entries_[i].sender_);
757 delete[] this->entries_;
760 class Invocation_Thread : public ACE_Task_Base
762 public:
764 Invocation_Thread (ACE_Thread_Manager &thread_manager,
765 ACE_Reactor &reactor,
766 Connection_Cache &connection_cache,
767 ACE_Auto_Event &new_connection_event,
768 int make_invocations,
769 int run_receiver_thread,
770 int nested_upcalls);
772 int svc (void);
774 Sender *create_connection (void);
776 Connection_Cache &connection_cache_;
778 ACE_Reactor &reactor_;
780 ACE_Thread_Manager &thread_manager_;
782 ACE_Auto_Event &new_connection_event_;
784 int make_invocations_;
786 int run_receiver_thread_;
788 int nested_upcalls_;
792 Invocation_Thread::Invocation_Thread (ACE_Thread_Manager &thread_manager,
793 ACE_Reactor &reactor,
794 Connection_Cache &connection_cache,
795 ACE_Auto_Event &new_connection_event,
796 int make_invocations,
797 int run_receiver_thread,
798 int nested_upcalls)
799 : ACE_Task_Base (&thread_manager),
800 connection_cache_ (connection_cache),
801 reactor_ (reactor),
802 thread_manager_ (thread_manager),
803 new_connection_event_ (new_connection_event),
804 make_invocations_ (make_invocations),
805 run_receiver_thread_ (run_receiver_thread),
806 nested_upcalls_ (nested_upcalls)
810 Sender *
811 Invocation_Thread::create_connection (void)
813 int result = 0;
815 // Connector for creating new connections.
816 Connector connector (this->thread_manager_,
817 this->reactor_,
818 this->nested_upcalls_);
820 // <server_handle> is a global variable. It will be used later by
821 // the Close_Socket_Thread.
822 result =
823 connector.connect (client_handle,
824 server_handle,
825 this->run_receiver_thread_);
826 ACE_TEST_ASSERT (result == 0);
827 ACE_UNUSED_ARG (result);
829 // Create a new sender.
830 Sender *sender =
831 new Sender (client_handle,
832 this->connection_cache_);
834 // Register it with the cache.
835 this->connection_cache_.add_connection (sender);
838 // There might be a race condition here. The sender has been added
839 // to the cache and is potentially available to other threads
840 // accessing the cache. Therefore, the other thread may use this
841 // sender and potentially close the sender before it even gets
842 // registered with the Reactor.
844 // This is resolved by marking the connection as busy when it is
845 // first added to the cache. And only once the thread creating the
846 // connection is done with it, it is marked a available in the
847 // cache.
849 // This order of registration is important.
852 // Register the handle with the Reactor.
853 result =
854 this->reactor_.register_handler (client_handle,
855 sender,
856 ACE_Event_Handler::READ_MASK);
857 #if 0
858 ACE_TEST_ASSERT (result == 0);
859 ACE_UNUSED_ARG (result);
860 #else
861 if (result != 0)
862 ACE_ERROR ((LM_ERROR, ACE_TEXT ("(%t) create_connection h %d, %p\n"),
863 client_handle,
864 ACE_TEXT ("register_handler")));
865 #endif
866 return sender;
870 Invocation_Thread::svc (void)
872 int connection_counter = 0;
873 ACE_DEBUG ((LM_DEBUG,
874 ACE_TEXT("(%t) Invocation_Thread::svc commencing\n")));
876 disable_signal (SIGPIPE, SIGPIPE);
878 for (int message_counter = 1;; ++message_counter)
880 // Get a connection from the cache.
881 Sender *sender =
882 this->connection_cache_.acquire_connection ();
884 // If no connection is available in the cache, create a new one.
885 if (sender == 0)
887 if (connection_counter < number_of_connections)
889 sender = this->create_connection ();
891 // This lets the Close_Socket_Thread know that the new
892 // connection has been created.
893 int result =
894 this->new_connection_event_.signal ();
895 ACE_TEST_ASSERT (result == 0);
896 ACE_UNUSED_ARG (result);
898 ++connection_counter;
899 message_counter = 1;
901 else
902 // Stop the thread, if the maximum number of connections
903 // for the test has been reached.
904 break;
907 // The reference count on the sender was increased by the cache
908 // before it was returned to us.
909 ACE_Event_Handler_var safe_sender (sender);
911 // If the test does not require making invocations, immediately
912 // release the connection.
913 if (!this->make_invocations_)
915 this->connection_cache_.release_connection (sender);
917 // Sleep for a short while
918 ACE_OS::sleep (ACE_Time_Value (0, 10 * 1000));
920 else
922 // Make invocation.
923 ssize_t result =
924 sender->send_message ();
926 // If successful, release connection.
927 if (result == message_size)
929 if (debug)
930 ACE_DEBUG ((LM_DEBUG,
931 ACE_TEXT ("(%t) Message %d:%d delivered on handle %d\n"),
932 connection_counter,
933 message_counter,
934 sender->handle_));
936 this->connection_cache_.release_connection (sender);
938 else
940 // If failure in making invocation, close the sender.
941 if (debug)
942 ACE_DEBUG ((LM_DEBUG,
943 ACE_TEXT ("(%t) /*** Problem in delivering message ")
944 ACE_TEXT ("%d:%d on handle %d: shutting down ")
945 ACE_TEXT ("invocation thread ***/\n"),
946 connection_counter,
947 message_counter,
948 sender->handle_));
950 ACE_DEBUG ((LM_DEBUG,
951 ACE_TEXT ("(%t) Invocation thread calling ")
952 ACE_TEXT ("Sender::close() for handle %d\n"),
953 sender->handle_));
955 sender->close ();
959 ACE_DEBUG ((LM_DEBUG,
960 ACE_TEXT("(%t) Invocation_Thread::svc calling end_reactor_event_loop\n")));
962 // Close the Reactor event loop.
963 this->reactor_.end_reactor_event_loop ();
964 ACE_DEBUG ((LM_DEBUG,
965 ACE_TEXT("(%t) Invocation_Thread::svc terminating\n")));
967 return 0;
970 class Close_Socket_Thread : public ACE_Task_Base
972 public:
974 Close_Socket_Thread (ACE_Thread_Manager &thread_manager,
975 ACE_Reactor &reactor,
976 ACE_Auto_Event &new_connection_event,
977 int make_invocations,
978 int run_receiver_thread);
980 int svc (void);
982 ACE_Auto_Event &new_connection_event_;
984 ACE_Reactor &reactor_;
986 int make_invocations_;
988 int run_receiver_thread_;
992 Close_Socket_Thread::Close_Socket_Thread (ACE_Thread_Manager &thread_manager,
993 ACE_Reactor &reactor,
994 ACE_Auto_Event &new_connection_event,
995 int make_invocations,
996 int run_receiver_thread)
997 : ACE_Task_Base (&thread_manager),
998 new_connection_event_ (new_connection_event),
999 reactor_ (reactor),
1000 make_invocations_ (make_invocations),
1001 run_receiver_thread_ (run_receiver_thread)
1006 Close_Socket_Thread::svc (void)
1008 unsigned int seed = (unsigned int) ACE_OS::time ();
1009 ACE_Time_Value timeout (0, close_timeout * 1000);
1010 ACE_DEBUG ((LM_DEBUG,
1011 ACE_TEXT("(%t) Close_Socket_Thread::svc commencing\n")));
1013 disable_signal (SIGPIPE, SIGPIPE);
1015 for (; !this->reactor_.reactor_event_loop_done ();)
1017 // Wait for the new connection to be established.
1018 int result =
1019 this->new_connection_event_.wait (&timeout,
1021 ACE_TEST_ASSERT (result == 0 ||
1022 (result == -1 && errno == ETIME));
1024 if (result == -1 &&
1025 errno == ETIME)
1026 continue;
1028 // Sleep for half a second.
1029 ACE_OS::sleep (timeout);
1031 int close_client = 0;
1033 // If the invocation thread is making invocations and if the
1034 // receiver is threaded, either socket can be closed.
1035 if (this->make_invocations_ &&
1036 this->run_receiver_thread_)
1037 // Randomize which socket to close.
1038 close_client = ACE_OS::rand_r (&seed) % 2;
1040 // If the invocation thread is making invocations, only close
1041 // the client socket.
1042 else if (this->make_invocations_)
1043 close_client = 1;
1044 else
1045 // If the receiver is threaded, only close the server socket.
1046 close_client = 0;
1048 if (close_client)
1050 // Close the client socket.
1051 if (debug)
1052 ACE_DEBUG ((LM_DEBUG,
1053 ACE_TEXT ("(%t) Close socket thread closing client ")
1054 ACE_TEXT ("handle %d\n"),
1055 client_handle));
1057 ACE_OS::shutdown (client_handle, ACE_SHUTDOWN_BOTH);
1058 ACE_OS::closesocket (client_handle);
1060 else
1062 // Close the server socket.
1063 if (debug)
1064 ACE_DEBUG ((LM_DEBUG,
1065 ACE_TEXT ("(%t) Close socket thread closing server ")
1066 ACE_TEXT ("handle %d\n"),
1067 server_handle));
1068 ACE_OS::shutdown (server_handle, ACE_SHUTDOWN_BOTH);
1069 ACE_OS::closesocket (server_handle);
1072 ACE_DEBUG ((LM_DEBUG,
1073 ACE_TEXT("(%t) Close_Socket_Thread::svc terminating\n")));
1075 return 0;
1078 Event_Loop_Thread::Event_Loop_Thread (ACE_Thread_Manager &thread_manager,
1079 ACE_Reactor &reactor)
1080 : ACE_Task_Base (&thread_manager),
1081 reactor_ (reactor)
1086 Event_Loop_Thread::svc (void)
1088 // Simply run the event loop.
1089 this->reactor_.owner (ACE_Thread::self ());
1090 ACE_DEBUG ((LM_DEBUG,
1091 ACE_TEXT("(%t) Event_Loop_Thread::svc commencing\n")));
1093 disable_signal (SIGPIPE, SIGPIPE);
1095 while (!this->reactor_.reactor_event_loop_done ())
1097 this->reactor_.handle_events ();
1099 ACE_DEBUG ((LM_DEBUG,
1100 ACE_TEXT("(%t) Event_Loop_Thread::svc terminating\n")));
1102 return 0;
1105 class Purger_Thread : public ACE_Task_Base
1107 public:
1109 Purger_Thread (ACE_Thread_Manager &thread_manager,
1110 ACE_Reactor &reactor,
1111 Connection_Cache &connection_cache);
1113 int svc (void);
1115 ACE_Reactor &reactor_;
1117 Connection_Cache &connection_cache_;
1121 Purger_Thread::Purger_Thread (ACE_Thread_Manager &thread_manager,
1122 ACE_Reactor &reactor,
1123 Connection_Cache &connection_cache)
1124 : ACE_Task_Base (&thread_manager),
1125 reactor_ (reactor),
1126 connection_cache_ (connection_cache)
1131 Purger_Thread::svc (void)
1133 ACE_DEBUG ((LM_DEBUG,
1134 ACE_TEXT("(%t) Purger_Thread::svc commencing\n")));
1136 disable_signal (SIGPIPE, SIGPIPE);
1138 for (; !this->reactor_.reactor_event_loop_done ();)
1140 // Get a connection from the cache.
1141 Sender *sender =
1142 this->connection_cache_.acquire_connection ();
1144 // If no connection is available in the cache, sleep for a while.
1145 if (sender == 0)
1146 ACE_OS::sleep (ACE_Time_Value (0, 10 * 1000));
1147 else
1149 // The reference count on the sender was increased by the
1150 // cache before it was returned to us.
1151 ACE_Event_Handler_var safe_sender (sender);
1153 // Actively close the connection.
1154 ACE_DEBUG ((LM_DEBUG,
1155 ACE_TEXT ("(%t) Purger thread calling Sender::close() ")
1156 ACE_TEXT ("for handle %d\n"),
1157 sender->handle_));
1159 sender->close ();
1162 ACE_DEBUG ((LM_DEBUG,
1163 ACE_TEXT("(%t) Purger_Thread::svc terminating\n")));
1165 return 0;
1168 void
1169 testing (ACE_Reactor *reactor,
1170 int make_invocations,
1171 int run_event_loop_thread,
1172 int run_purger_thread,
1173 int run_receiver_thread,
1174 int nested_upcalls)
1176 ACE_DEBUG ((LM_DEBUG,
1177 ACE_TEXT ("\n(%t) Configuration:\n")
1178 ACE_TEXT ("\tInvocation thread = %d\n")
1179 ACE_TEXT ("\tEvent Loop thread = %d\n")
1180 ACE_TEXT ("\tPurger thread = %d\n")
1181 ACE_TEXT ("\tReceiver thread = %d\n")
1182 ACE_TEXT ("\tNested Upcalls = %d\n\n"),
1183 make_invocations,
1184 run_event_loop_thread,
1185 run_purger_thread,
1186 run_receiver_thread,
1187 nested_upcalls));
1189 ACE_Thread_Manager thread_manager;
1191 int result = 0;
1193 // Create the connection cache.
1194 Connection_Cache connection_cache;
1195 ACE_Auto_Event new_connection_event;
1197 // Create the invocation thread.
1198 Invocation_Thread invocation_thread (thread_manager,
1199 *reactor,
1200 connection_cache,
1201 new_connection_event,
1202 make_invocations,
1203 run_receiver_thread,
1204 nested_upcalls);
1206 result =
1207 invocation_thread.activate ();
1208 ACE_TEST_ASSERT (result == 0);
1210 // Create the thread for closing the server socket.
1211 Close_Socket_Thread close_socket_thread (thread_manager,
1212 *reactor,
1213 new_connection_event,
1214 make_invocations,
1215 run_receiver_thread);
1216 result =
1217 close_socket_thread.activate ();
1218 ACE_TEST_ASSERT (result == 0);
1220 global_event_loop_thread_variable = 0;
1222 // Create a thread to run the event loop.
1223 Event_Loop_Thread event_loop_thread (thread_manager,
1224 *reactor);
1225 if (run_event_loop_thread)
1227 global_event_loop_thread_variable =
1228 &event_loop_thread;
1230 result =
1231 event_loop_thread.activate ();
1232 ACE_TEST_ASSERT (result == 0);
1235 // Create a thread to run the purger.
1236 Purger_Thread purger_thread (thread_manager,
1237 *reactor,
1238 connection_cache);
1239 if (run_purger_thread)
1241 result =
1242 purger_thread.activate ();
1243 ACE_TEST_ASSERT (result == 0);
1246 // Wait for threads to exit.
1247 result = thread_manager.wait ();
1248 ACE_TEST_ASSERT (result == 0);
1250 // Set the global variable to zero again because the
1251 // event_loop_thread exists on the stack and now
1252 // gets destructed.
1253 global_event_loop_thread_variable = 0;
1256 template <class REACTOR_IMPL>
1257 class test
1259 public:
1260 test (int ignore_nested_upcalls,
1261 int require_event_loop_thread);
1264 template <class REACTOR_IMPL>
1265 test<REACTOR_IMPL>::test (int ignore_nested_upcalls,
1266 int require_event_loop_thread)
1268 for (int i = 0;
1269 i < (int) (sizeof test_configs / (sizeof (int) * number_of_options));
1270 i++)
1272 if ((make_invocations == -1 ||
1273 make_invocations == test_configs[i][0]) &&
1274 (run_event_loop_thread == -1 ||
1275 run_event_loop_thread == test_configs[i][1]) &&
1276 (run_purger_thread == -1 ||
1277 run_purger_thread == test_configs[i][2]) &&
1278 (run_receiver_thread == -1 ||
1279 run_receiver_thread == test_configs[i][3]) &&
1280 (nested_upcalls == -1 ||
1281 nested_upcalls == test_configs[i][4]))
1284 #if 0 /* defined (ACE_LINUX) */
1286 // @@ I am not sure why but when <make_invocations> is 0 and
1287 // there is no purger thread, the receiver thread does not
1288 // notice that the connection has been closed.
1289 if (!test_configs[i][0] && !test_configs[i][2])
1290 continue;
1292 // @@ Linux also does not work correctly in the following
1293 // case: Invocation thread starts and sends messages filling
1294 // the socket buffer. It then blocks in write(). In the
1295 // meantime, the close connection thread closes the socket
1296 // used by invocation thread. However, the invocation thread
1297 // does not notice this as it does not return from write().
1298 // Meanwhile, the event loop thread notices that a socket in
1299 // it's wait set has been closed, and starts to spin in
1300 // handle_events() since the invocation thread is not taking
1301 // out the closed handle from the Reactor's wait set.
1302 if (test_configs[i][0] && test_configs[i][1] && !test_configs[i][3])
1303 continue;
1305 #endif /* linux */
1307 if (test_configs[i][4] && ignore_nested_upcalls)
1308 continue;
1310 if (!test_configs[i][1] && require_event_loop_thread)
1311 continue;
1313 ACE_Reactor reactor (new REACTOR_IMPL, true);
1315 testing (&reactor,
1316 test_configs[i][0],
1317 test_configs[i][1],
1318 test_configs[i][2],
1319 test_configs[i][3],
1320 test_configs[i][4]);
1325 static int
1326 parse_args (int argc, ACE_TCHAR *argv[])
1328 ACE_Get_Opt get_opt (argc, argv, ACE_TEXT ("a:b:c:d:f:g:k:l:m:n:o:uz:"));
1330 int cc;
1331 while ((cc = get_opt ()) != -1)
1333 switch (cc)
1335 case 'a':
1336 test_select_reactor = ACE_OS::atoi (get_opt.opt_arg ());
1337 break;
1338 case 'b':
1339 test_tp_reactor = ACE_OS::atoi (get_opt.opt_arg ());
1340 break;
1341 case 'c':
1342 test_wfmo_reactor = ACE_OS::atoi (get_opt.opt_arg ());
1343 break;
1344 case 'd':
1345 test_dev_poll_reactor = ACE_OS::atoi (get_opt.opt_arg ());
1346 break;
1347 case 'f':
1348 number_of_connections = ACE_OS::atoi (get_opt.opt_arg ());
1349 break;
1350 case 'g':
1351 close_timeout = ACE_OS::atoi (get_opt.opt_arg ());
1352 break;
1353 case 'k':
1354 make_invocations = ACE_OS::atoi (get_opt.opt_arg ());
1355 break;
1356 case 'l':
1357 run_event_loop_thread = ACE_OS::atoi (get_opt.opt_arg ());
1358 break;
1359 case 'm':
1360 run_purger_thread = ACE_OS::atoi (get_opt.opt_arg ());
1361 break;
1362 case 'n':
1363 run_receiver_thread = ACE_OS::atoi (get_opt.opt_arg ());
1364 break;
1365 case 'o':
1366 nested_upcalls = ACE_OS::atoi (get_opt.opt_arg ());
1367 break;
1368 case 'z':
1369 debug = ACE_OS::atoi (get_opt.opt_arg ());
1370 break;
1371 case 'u':
1372 default:
1373 ACE_ERROR ((LM_ERROR,
1374 ACE_TEXT ("\nusage: %s \n\n")
1375 ACE_TEXT ("\t[-a test Select Reactor] (defaults to %d)\n")
1376 ACE_TEXT ("\t[-b test TP Reactor] (defaults to %d)\n")
1377 ACE_TEXT ("\t[-c test WFMO Reactor] (defaults to %d)\n")
1378 ACE_TEXT ("\t[-d test Dev Poll Reactor] (defaults to %d)\n")
1379 ACE_TEXT ("\t[-f number of connections] (defaults to %d)\n")
1380 ACE_TEXT ("\t[-g close timeout] (defaults to %d)\n")
1381 ACE_TEXT ("\t[-k make invocations] (defaults to %d)\n")
1382 ACE_TEXT ("\t[-l run event loop thread] (defaults to %d)\n")
1383 ACE_TEXT ("\t[-m run purger thread] (defaults to %d)\n")
1384 ACE_TEXT ("\t[-n run receiver thread] (defaults to %d)\n")
1385 ACE_TEXT ("\t[-o nested upcalls] (defaults to %d)\n")
1386 ACE_TEXT ("\t[-z debug] (defaults to %d)\n")
1387 ACE_TEXT ("\n"),
1388 argv[0],
1389 test_select_reactor,
1390 test_tp_reactor,
1391 test_wfmo_reactor,
1392 test_dev_poll_reactor,
1393 number_of_connections,
1394 close_timeout,
1395 make_invocations,
1396 run_event_loop_thread,
1397 run_purger_thread,
1398 run_receiver_thread,
1399 nested_upcalls,
1400 debug));
1401 return -1;
1405 return 0;
1409 run_main (int argc, ACE_TCHAR *argv[])
1411 ACE_START_TEST (ACE_TEXT ("MT_Reference_Counted_Event_Handler_Test"));
1413 // Validate options.
1414 int result =
1415 parse_args (argc, argv);
1416 if (result != 0)
1417 return result;
1419 disable_signal (SIGPIPE, SIGPIPE);
1421 int ignore_nested_upcalls = 1;
1422 int perform_nested_upcalls = 0;
1424 int event_loop_thread_required = 1;
1425 int event_loop_thread_not_required = 0;
1427 if (test_select_reactor)
1429 ACE_DEBUG ((LM_DEBUG,
1430 ACE_TEXT ("\n\n(%t) Testing Select Reactor....\n\n")));
1432 test<ACE_Select_Reactor> test (ignore_nested_upcalls,
1433 event_loop_thread_not_required);
1434 ACE_UNUSED_ARG (test);
1437 if (test_tp_reactor)
1439 ACE_DEBUG ((LM_DEBUG,
1440 ACE_TEXT ("\n\n(%t) Testing TP Reactor....\n\n")));
1442 test<ACE_TP_Reactor> test (perform_nested_upcalls,
1443 event_loop_thread_not_required);
1444 ACE_UNUSED_ARG (test);
1447 #if defined (ACE_HAS_EVENT_POLL)
1449 if (test_dev_poll_reactor)
1451 ACE_DEBUG ((LM_DEBUG,
1452 ACE_TEXT ("\n\n(%t) Testing Dev Poll Reactor....\n\n")));
1454 test<ACE_Dev_Poll_Reactor> test (perform_nested_upcalls,
1455 event_loop_thread_not_required);
1456 ACE_UNUSED_ARG (test);
1459 #endif
1461 #if defined (ACE_WIN32)
1463 if (test_wfmo_reactor)
1465 ACE_DEBUG ((LM_DEBUG,
1466 ACE_TEXT ("\n\n(%t) Testing WFMO Reactor....\n\n")));
1468 test<ACE_WFMO_Reactor> test (ignore_nested_upcalls,
1469 event_loop_thread_required);
1470 ACE_UNUSED_ARG (test);
1473 #else /* ACE_WIN32 */
1475 ACE_UNUSED_ARG (event_loop_thread_required);
1477 #endif /* ACE_WIN32 */
1479 ACE_END_TEST;
1481 return 0;
1484 #else /* ACE_HAS_THREADS && ! ACE_LACKS_ACCEPT */
1487 run_main (int, ACE_TCHAR *[])
1489 ACE_START_TEST (ACE_TEXT ("MT_Reference_Counted_Event_Handler_Test"));
1491 ACE_ERROR ((LM_INFO,
1492 ACE_TEXT ("threads/accept not supported on this platform\n")));
1494 ACE_END_TEST;
1496 return 0;
1499 #endif /* ACE_HAS_THREADS && ! ACE_LACKS_ACCEPT */