Merge pull request #2309 from mitza-oci/warnings
[ACE_TAO.git] / ACE / tests / MT_Reference_Counted_Event_Handler_Test.cpp
blob42b229011702d80caa5cae8e130d26facb8f48ee
2 //=============================================================================
3 /**
4 * @file MT_Reference_Counted_Event_Handler_Test.cpp
6 * This test tries to represents what happens in the ORB wrt to
7 * event handlers, reactors, timer queues, threads, and connection
8 * caches, minus the other complexities. The following reactors
9 * are tested: Select, TP, WFMO, and Dev Poll (if enabled).
11 * The test checks proper use and shutting down of client-side
12 * event handlers when it is used by invocation threads and/or
13 * event loop threads. Server-side event handlers are either
14 * threaded or reactive. A purger thread is introduced to check the
15 * connection recycling and cache purging. Nested upcalls are also
16 * tested.
18 * @author Irfan Pyarali <irfan@oomworks.com>
20 //=============================================================================
22 #include "test_config.h"
23 #include "ace/Reactor.h"
24 #include "ace/Select_Reactor.h"
25 #include "ace/TP_Reactor.h"
26 #include "ace/WFMO_Reactor.h"
27 #include "ace/Dev_Poll_Reactor.h"
28 #include "ace/Get_Opt.h"
29 #include "ace/Task.h"
30 #include "ace/SOCK_Acceptor.h"
31 #include "ace/SOCK_Connector.h"
32 #include "ace/Auto_Event.h"
33 #include "ace/OS_NS_signal.h"
34 #include "ace/OS_NS_time.h"
35 #include "ace/OS_NS_sys_socket.h"
36 #include "ace/OS_NS_unistd.h"
38 #if defined (ACE_HAS_THREADS) && !defined ACE_LACKS_ACCEPT
40 static const char message[] = "abcdefghijklmnopqrstuvwxyz";
41 static const int message_size = 26;
42 static int test_select_reactor = 1;
43 static int test_tp_reactor = 1;
44 static int test_wfmo_reactor = 1;
45 static int test_dev_poll_reactor = 1;
46 static int debug = 0;
47 static int number_of_connections = 5;
48 static int max_nested_upcall_level = 10;
49 static int close_timeout = 500;
50 static int pipe_open_attempts = 10;
51 static int pipe_retry_timeout = 1;
52 static int make_invocations = -1;
53 static int run_event_loop_thread = -1;
54 static int run_purger_thread = -1;
55 static int run_receiver_thread = -1;
56 static int nested_upcalls = -1;
58 static ACE_HANDLE server_handle = ACE_INVALID_HANDLE;
59 static ACE_HANDLE client_handle = ACE_INVALID_HANDLE;
61 static int number_of_options = 5;
62 static int test_configs[][5] =
65 // make_invocations, run_event_loop_thread, run_purger_thread, run_receiver_thread, nested_upcalls
68 // { 0, 0, 0, 0, 0, }, // At least one thread should be running.
69 // { 0, 0, 0, 1, 0, }, // If event_loop_thread is not running and invocation_thread is not making invocations,
70 // no thread will know that the socket is closed.
71 // { 0, 0, 1, 0, 0, }, // If invocation_thread is not making invocations and if receiver is not threaded,
72 // we cannot decide which socket to close.
73 // { 0, 0, 1, 1, 0, }, // If event_loop_thread is not running and invocation_thread is not making invocations,
74 // no thread will know that the socket is closed.
75 // { 0, 1, 0, 0, 0, }, // If invocation_thread is not making invocations and if receiver is not threaded,
76 // we cannot decide which socket to close.
77 { 0, 1, 0, 1, 0, },
78 // { 0, 1, 0, 1, 1, }, // No need for nested upcalls without invocations.
79 // { 0, 1, 1, 0, 0, }, // If invocation_thread is not making invocations and if receiver is not threaded,
80 // we cannot decide which socket to close.
81 { 0, 1, 1, 1, 0, },
82 // { 0, 1, 1, 1, 1, }, // No need for nested upcalls without invocations.
83 // { 1, 0, 0, 0, 0, }, // If both event_loop_thread and receiver are not threaded,
84 // no thread can receive the messages.
85 { 1, 0, 0, 1, 0, },
86 // { 1, 0, 0, 1, 1, }, // No need for nested upcalls without event loop being used by the receiver.
87 // { 1, 0, 1, 0, 0, }, // If both event_loop_thread and receiver are not threaded,
88 // no thread can receive the messages.
89 { 1, 0, 1, 1, 0, },
90 // { 1, 0, 1, 1, 1, }, // No need for nested upcalls without event loop being used by the receiver.
91 { 1, 1, 0, 0, 0, },
92 { 1, 1, 0, 0, 1, },
93 { 1, 1, 0, 1, 0, },
94 // { 1, 1, 0, 1, 1, }, // No need for nested upcalls without event loop being used by the receiver.
95 { 1, 1, 1, 0, 0, },
96 { 1, 1, 1, 0, 1, },
97 { 1, 1, 1, 1, 0, },
98 // { 1, 1, 1, 1, 1, }, // No need for nested upcalls without event loop being used by the receiver.
101 static int
102 disable_signal (int sigmin, int sigmax)
104 #if !defined (ACE_LACKS_UNIX_SIGNALS)
105 sigset_t signal_set;
106 if (ACE_OS::sigemptyset (&signal_set) == - 1)
107 ACE_ERROR ((LM_ERROR,
108 ACE_TEXT ("Error: (%P|%t):%p\n"),
109 ACE_TEXT ("sigemptyset failed")));
111 for (int i = sigmin; i <= sigmax; i++)
112 ACE_OS::sigaddset (&signal_set, i);
114 // Put the <signal_set>.
115 # if defined (ACE_LACKS_PTHREAD_THR_SIGSETMASK)
116 // In multi-threaded application this is not POSIX compliant
117 // but let's leave it just in case.
118 if (ACE_OS::sigprocmask (SIG_BLOCK, &signal_set, 0) != 0)
119 # else
120 if (ACE_OS::thr_sigsetmask (SIG_BLOCK, &signal_set, 0) != 0)
121 # endif /* ACE_LACKS_PTHREAD_THR_SIGSETMASK */
122 ACE_ERROR_RETURN ((LM_ERROR,
123 ACE_TEXT ("Error: (%P|%t): %p\n"),
124 ACE_TEXT ("SIG_BLOCK failed")),
125 -1);
126 #else
127 ACE_UNUSED_ARG (sigmin);
128 ACE_UNUSED_ARG (sigmax);
129 #endif /* ACE_LACKS_UNIX_SIGNALS */
131 return 0;
134 /* Replication of the ACE_Pipe class. Only difference is that this
135 class always uses two sockets to create the pipe, even on platforms
136 that support pipes. */
138 class Pipe
140 public:
141 Pipe ();
143 //FUZZ: disable check_for_lack_ACE_OS
144 ///FUZZ: enable check_for_lack_ACE_OS
145 int open ();
147 ACE_HANDLE read_handle () const;
149 ACE_HANDLE write_handle () const;
151 private:
152 ACE_HANDLE handles_[2];
156 Pipe::open ()
158 ACE_INET_Addr my_addr;
159 ACE_SOCK_Acceptor acceptor;
160 ACE_SOCK_Connector connector;
161 ACE_SOCK_Stream reader;
162 ACE_SOCK_Stream writer;
163 int result = 0;
165 // Bind listener to any port and then find out what the port was.
166 if (acceptor.open (ACE_Addr::sap_any) == -1
167 || acceptor.get_local_addr (my_addr) == -1)
168 result = -1;
169 else
171 int af = my_addr.get_type ();
172 const ACE_TCHAR *local = ACE_LOCALHOST;
173 #if defined (ACE_HAS_IPV6)
174 if (af == AF_INET6)
175 local = ACE_IPV6_LOCALHOST;
176 #endif /* ACE_HAS_IPV6 */
177 ACE_INET_Addr sv_addr (my_addr.get_port_number (),
178 local,
179 af);
181 // Establish a connection within the same process.
182 if (connector.connect (writer, sv_addr) == -1)
183 result = -1;
184 else if (acceptor.accept (reader) == -1)
186 writer.close ();
187 result = -1;
191 // Close down the acceptor endpoint since we don't need it anymore.
192 acceptor.close ();
193 if (result == -1)
194 return -1;
196 this->handles_[0] = reader.get_handle ();
197 this->handles_[1] = writer.get_handle ();
199 return 0;
202 Pipe::Pipe ()
204 this->handles_[0] = ACE_INVALID_HANDLE;
205 this->handles_[1] = ACE_INVALID_HANDLE;
208 ACE_HANDLE
209 Pipe::read_handle () const
211 return this->handles_[0];
214 ACE_HANDLE
215 Pipe::write_handle () const
217 return this->handles_[1];
220 class Connection_Cache;
221 class Event_Loop_Thread;
223 static Event_Loop_Thread *global_event_loop_thread_variable = 0;
225 class Sender : public ACE_Event_Handler
227 public:
228 Sender (ACE_HANDLE handle,
229 Connection_Cache &connection_cache);
231 ~Sender () override;
233 int handle_input (ACE_HANDLE) override;
235 ssize_t send_message ();
237 //FUZZ: disable check_for_lack_ACE_OS
238 ///FUZZ: enable check_for_lack_ACE_OS
239 void close ();
241 ACE_HANDLE handle_;
243 Connection_Cache &connection_cache_;
246 class Connection_Cache
248 public:
249 Connection_Cache ();
251 ~Connection_Cache ();
253 void add_connection (Sender *sender);
255 void remove_connection (Sender *sender);
257 Sender *acquire_connection ();
259 void release_connection (Sender *sender);
261 int find (Sender *sender);
263 ACE_SYNCH_MUTEX &lock ();
265 enum State
267 IDLE,
268 BUSY,
269 NOT_IN_CACHE
272 struct Entry
274 Sender *sender_;
275 State state_;
278 Entry *entries_;
280 ACE_SYNCH_MUTEX lock_;
283 Sender::Sender (ACE_HANDLE handle,
284 Connection_Cache &connection_cache)
285 : handle_ (handle),
286 connection_cache_ (connection_cache)
288 // Enable reference counting.
289 this->reference_counting_policy ().value
290 (ACE_Event_Handler::Reference_Counting_Policy::ENABLED);
292 if (debug)
293 ACE_DEBUG ((LM_DEBUG,
294 ACE_TEXT ("(%t) Reference count in Sender::Sender() is %d\n"),
295 this->reference_count_.load ()));
298 Sender::~Sender ()
300 if (debug)
301 ACE_DEBUG ((LM_DEBUG,
302 ACE_TEXT ("(%t) Reference count in ~Sender::Sender() is %d\n"),
303 this->reference_count_.load ()));
305 // Close the socket that we are responsible for.
306 ACE_OS::closesocket (this->handle_);
310 Sender::handle_input (ACE_HANDLE)
312 if (debug)
313 ACE_DEBUG ((LM_DEBUG,
314 ACE_TEXT ("(%t) Reference count in Sender::handle_input() is %d\n"),
315 this->reference_count_.load ()));
318 // In this test, this method is only called when the connection has
319 // been closed. Remove self from Reactor.
322 ACE_DEBUG ((LM_DEBUG,
323 ACE_TEXT ("(%t) Event loop thread calling Sender::close() ")
324 ACE_TEXT ("for handle %d\n"),
325 this->handle_));
327 this->close ();
329 return 0;
332 void
333 Sender::close ()
335 // Remove socket from Reactor (may fail if another thread has already
336 // removed the handle from the Reactor).
337 if (this->reactor() != 0)
338 this->reactor ()->remove_handler (this->handle_,
339 ACE_Event_Handler::ALL_EVENTS_MASK);
341 // Remove self from connection cache (may fail if another thread has
342 // already removed "this" from the cache).
343 this->connection_cache_.remove_connection (this);
346 ssize_t
347 Sender::send_message ()
349 ACE_Time_Value timeout (0, close_timeout * 1000);
351 return ACE::send_n (this->handle_,
352 message,
353 message_size,
354 &timeout);
357 class Event_Loop_Thread : public ACE_Task_Base
359 public:
360 Event_Loop_Thread (ACE_Thread_Manager &thread_manager,
361 ACE_Reactor &reactor);
363 int svc () override;
365 ACE_Reactor &reactor_;
368 class Receiver : public ACE_Task_Base
370 public:
371 Receiver (ACE_Thread_Manager &thread_manager,
372 ACE_HANDLE handle,
373 int nested_upcalls);
375 ~Receiver () override;
377 int svc () override;
379 //FUZZ: disable check_for_lack_ACE_OS
380 ///FUZZ: enable check_for_lack_ACE_OS
381 int close (u_long flags) override;
383 int handle_input (ACE_HANDLE) override;
385 int resume_handler () override;
387 ACE_HANDLE handle_;
389 int counter_;
391 int nested_upcalls_;
393 int nested_upcalls_level_;
396 Receiver::Receiver (ACE_Thread_Manager &thread_manager,
397 ACE_HANDLE handle,
398 int nested_upcalls)
399 : ACE_Task_Base (&thread_manager),
400 handle_ (handle),
401 counter_ (1),
402 nested_upcalls_ (nested_upcalls),
403 nested_upcalls_level_ (0)
405 // Enable reference counting.
406 this->reference_counting_policy ().value
407 (ACE_Event_Handler::Reference_Counting_Policy::ENABLED);
409 if (debug)
410 ACE_DEBUG ((LM_DEBUG,
411 ACE_TEXT ("(%t) Reference count in Receiver::Receiver() is %d\n"),
412 this->reference_count_.load ()));
415 Receiver::~Receiver ()
417 if (debug)
418 ACE_DEBUG ((LM_DEBUG,
419 ACE_TEXT ("(%t) Reference count in ~Receiver::Receiver() is %d\n"),
420 this->reference_count_.load ()));
422 // Close the socket that we are responsible for.
423 ACE_OS::closesocket (this->handle_);
427 Receiver::svc ()
430 // Continuously receive messages from the Sender. On error, exit
431 // thread.
434 int result = 0;
435 ACE_DEBUG ((LM_DEBUG,
436 ACE_TEXT("(%t) Receiver::svc commencing, handle = %d\n"),
437 this->handle_));
439 disable_signal (SIGPIPE, SIGPIPE);
441 while (result != -1)
443 result =
444 this->handle_input (this->handle_);
446 ACE_DEBUG ((LM_DEBUG,
447 ACE_TEXT("(%t) Receiver::svc terminating, handle = %d\n"),
448 this->handle_));
449 return 0;
453 Receiver::handle_input (ACE_HANDLE handle)
455 char buf[message_size + 1];
457 ACE_Time_Value timeout (0, close_timeout * 1000);
459 // Receive message.
460 ssize_t result =
461 ACE::recv_n (handle,
462 buf,
463 message_size,
464 &timeout);
466 if (debug && result < 1)
467 ACE_DEBUG ((LM_DEBUG,
468 ACE_TEXT("(%t) Receiver::handle input, ")
469 ACE_TEXT("h = %d, result = %d %p\n"),
470 handle_, result, ACE_TEXT("ACE::recv_n")));
472 if (this->reactor ())
473 this->reactor ()->resume_handler (handle);
475 if (result == message_size)
477 if (debug)
478 ACE_DEBUG ((LM_DEBUG,
479 ACE_TEXT ("(%t) Message %d received on handle %d\n"),
480 this->counter_++,
481 handle));
483 if (this->thr_count () == 0 &&
484 this->nested_upcalls_)
486 this->nested_upcalls_level_++;
488 if (debug)
489 ACE_DEBUG ((LM_DEBUG,
490 ACE_TEXT ("(%t) Nesting level %d\n"),
491 this->nested_upcalls_level_));
493 if ((this->nested_upcalls_level_ != max_nested_upcall_level) &&
494 (global_event_loop_thread_variable != 0))
495 global_event_loop_thread_variable->svc ();
497 this->nested_upcalls_level_--;
498 return 0;
500 else
501 return 0;
503 else
505 if (debug)
506 ACE_DEBUG ((LM_DEBUG,
507 ACE_TEXT ("(%t) /*** Problem in receiving message %d on handle")
508 ACE_TEXT (" %d: shutting down receiving thread ***/\n"),
509 this->counter_,
510 handle));
512 return -1;
517 Receiver::resume_handler ()
519 /// The application takes responsibility of resuming the handler.
520 return ACE_APPLICATION_RESUMES_HANDLER;
524 Receiver::close (u_long)
526 // If threaded, we are responsible for deleting this instance when
527 // the thread completes. If not threaded, Reactor reference
528 // counting will handle the deletion of this instance.
529 delete this;
530 return 0;
533 class Connector
535 public:
536 Connector (ACE_Thread_Manager &thread_manager,
537 ACE_Reactor &reactor,
538 int nested_upcalls);
540 //FUZZ: disable check_for_lack_ACE_OS
541 ///FUZZ: enable check_for_lack_ACE_OS
542 int connect (ACE_HANDLE &client_handle,
543 ACE_HANDLE &server_handle,
544 int run_receiver_thread);
546 ACE_Thread_Manager &thread_manager_;
548 ACE_Reactor &reactor_;
550 int nested_upcalls_;
553 Connector::Connector (ACE_Thread_Manager &thread_manager,
554 ACE_Reactor &reactor,
555 int nested_upcalls)
556 : thread_manager_ (thread_manager),
557 reactor_ (reactor),
558 nested_upcalls_ (nested_upcalls)
563 Connector::connect (ACE_HANDLE &client_handle,
564 ACE_HANDLE &server_handle,
565 int run_receiver_thread)
568 // Create a connection and a receiver to receive messages on the
569 // connection.
572 Pipe pipe;
573 int result = 0;
575 for (int i = 0; i < pipe_open_attempts; ++i)
577 result =
578 pipe.open ();
580 if (result == 0)
581 break;
583 if (result == -1)
584 ACE_OS::sleep (pipe_retry_timeout);
587 ACE_TEST_ASSERT (result == 0);
588 ACE_UNUSED_ARG (result);
590 Receiver *receiver =
591 new Receiver (this->thread_manager_,
592 pipe.write_handle (),
593 this->nested_upcalls_);
595 // Either the receiver is threaded or register it with the Reactor.
596 if (run_receiver_thread)
597 result =
598 receiver->activate ();
599 else
601 result =
602 this->reactor_.register_handler (pipe.write_handle (),
603 receiver,
604 ACE_Event_Handler::READ_MASK);
606 // The reference count on the receiver was increased by the
607 // Reactor.
608 ACE_Event_Handler_var safe_receiver (receiver);
611 ACE_TEST_ASSERT (result == 0);
612 ACE_UNUSED_ARG (result);
614 client_handle =
615 pipe.read_handle ();
617 server_handle =
618 pipe.write_handle ();
620 if (debug)
621 ACE_DEBUG ((LM_DEBUG,
622 ACE_TEXT ("(%t) New connection: client handle = %d, ")
623 ACE_TEXT ("server handle = %d\n"),
624 client_handle, server_handle));
626 return 0;
629 Connection_Cache::Connection_Cache ()
631 // Initialize the connection cache.
632 this->entries_ =
633 new Entry[number_of_connections];
635 for (int i = 0; i < number_of_connections; ++i)
637 this->entries_[i].sender_ = 0;
638 this->entries_[i].state_ = NOT_IN_CACHE;
643 Connection_Cache::find (Sender *sender)
645 for (int i = 0; i < number_of_connections; ++i)
647 if (this->entries_[i].sender_ == sender)
648 return i;
651 return -1;
654 void
655 Connection_Cache::add_connection (Sender *sender)
657 ACE_GUARD (ACE_SYNCH_MUTEX, ace_mon, this->lock_);
659 // Make sure that the state of the connection cache is as
660 // expected. <sender> should not be already in the cache.
661 ACE_TEST_ASSERT (this->find (sender) == -1);
663 int empty_index =
664 this->find (0);
666 sender->add_reference ();
667 this->entries_[empty_index].sender_ = sender;
668 this->entries_[empty_index].state_ = BUSY;
671 void
672 Connection_Cache::remove_connection (Sender *sender)
674 ACE_GUARD (ACE_SYNCH_MUTEX, ace_mon, this->lock_);
676 // Make sure that the state of the connection cache is as expected.
677 // remove_connection() may already have been called.
678 int index =
679 this->find (sender);
681 if (index == -1)
682 return;
684 // If we still have the sender, remove it.
685 sender->remove_reference ();
686 this->entries_[index].sender_ = 0;
687 this->entries_[index].state_ = NOT_IN_CACHE;
690 Sender *
691 Connection_Cache::acquire_connection ()
693 ACE_GUARD_RETURN (ACE_SYNCH_MUTEX, ace_mon, this->lock_, 0);
695 // Find a valid and IDLE sender.
697 int index = -1;
699 for (int i = 0; i < number_of_connections; ++i)
701 if (this->entries_[i].sender_ &&
702 this->entries_[i].state_ == IDLE)
703 index = i;
706 if (index == -1)
707 return 0;
709 this->entries_[index].sender_->add_reference ();
710 this->entries_[index].state_ = BUSY;
712 return this->entries_[index].sender_;
715 void
716 Connection_Cache::release_connection (Sender *sender)
718 ACE_GUARD (ACE_SYNCH_MUTEX, ace_mon, this->lock_);
720 // Make sure that the state of the connection cache is as expected.
721 // remove_connection() may have already removed the connection from
722 // the cache.
723 int index =
724 this->find (sender);
726 if (index == -1)
727 return;
729 // If we still have the sender, idle it.
730 this->entries_[index].state_ = IDLE;
733 ACE_SYNCH_MUTEX &
734 Connection_Cache::lock ()
736 return this->lock_;
739 Connection_Cache::~Connection_Cache ()
741 for (int i = 0; i < number_of_connections; ++i)
743 if (this->entries_[i].sender_)
744 this->remove_connection (this->entries_[i].sender_);
747 delete[] this->entries_;
750 class Invocation_Thread : public ACE_Task_Base
752 public:
753 Invocation_Thread (ACE_Thread_Manager &thread_manager,
754 ACE_Reactor &reactor,
755 Connection_Cache &connection_cache,
756 ACE_Auto_Event &new_connection_event,
757 int make_invocations,
758 int run_receiver_thread,
759 int nested_upcalls);
761 int svc () override;
763 Sender *create_connection ();
765 Connection_Cache &connection_cache_;
767 ACE_Reactor &reactor_;
769 ACE_Thread_Manager &thread_manager_;
771 ACE_Auto_Event &new_connection_event_;
773 int make_invocations_;
775 int run_receiver_thread_;
777 int nested_upcalls_;
780 Invocation_Thread::Invocation_Thread (ACE_Thread_Manager &thread_manager,
781 ACE_Reactor &reactor,
782 Connection_Cache &connection_cache,
783 ACE_Auto_Event &new_connection_event,
784 int make_invocations,
785 int run_receiver_thread,
786 int nested_upcalls)
787 : ACE_Task_Base (&thread_manager),
788 connection_cache_ (connection_cache),
789 reactor_ (reactor),
790 thread_manager_ (thread_manager),
791 new_connection_event_ (new_connection_event),
792 make_invocations_ (make_invocations),
793 run_receiver_thread_ (run_receiver_thread),
794 nested_upcalls_ (nested_upcalls)
798 Sender *
799 Invocation_Thread::create_connection ()
801 int result = 0;
803 // Connector for creating new connections.
804 Connector connector (this->thread_manager_,
805 this->reactor_,
806 this->nested_upcalls_);
808 // <server_handle> is a global variable. It will be used later by
809 // the Close_Socket_Thread.
810 result =
811 connector.connect (client_handle,
812 server_handle,
813 this->run_receiver_thread_);
814 ACE_TEST_ASSERT (result == 0);
815 ACE_UNUSED_ARG (result);
817 // Create a new sender.
818 Sender *sender =
819 new Sender (client_handle,
820 this->connection_cache_);
822 // Register it with the cache.
823 this->connection_cache_.add_connection (sender);
826 // There might be a race condition here. The sender has been added
827 // to the cache and is potentially available to other threads
828 // accessing the cache. Therefore, the other thread may use this
829 // sender and potentially close the sender before it even gets
830 // registered with the Reactor.
832 // This is resolved by marking the connection as busy when it is
833 // first added to the cache. And only once the thread creating the
834 // connection is done with it, it is marked a available in the
835 // cache.
837 // This order of registration is important.
840 // Register the handle with the Reactor.
841 result =
842 this->reactor_.register_handler (client_handle,
843 sender,
844 ACE_Event_Handler::READ_MASK);
845 #if 0
846 ACE_TEST_ASSERT (result == 0);
847 ACE_UNUSED_ARG (result);
848 #else
849 if (result != 0)
850 ACE_ERROR ((LM_ERROR, ACE_TEXT ("(%t) create_connection h %d, %p\n"),
851 client_handle,
852 ACE_TEXT ("register_handler")));
853 #endif
854 return sender;
858 Invocation_Thread::svc ()
860 int connection_counter = 0;
861 ACE_DEBUG ((LM_DEBUG,
862 ACE_TEXT("(%t) Invocation_Thread::svc commencing\n")));
864 disable_signal (SIGPIPE, SIGPIPE);
866 for (int message_counter = 1;; ++message_counter)
868 // Get a connection from the cache.
869 Sender *sender =
870 this->connection_cache_.acquire_connection ();
872 // If no connection is available in the cache, create a new one.
873 if (sender == 0)
875 if (connection_counter < number_of_connections)
877 sender = this->create_connection ();
879 // This lets the Close_Socket_Thread know that the new
880 // connection has been created.
881 int result =
882 this->new_connection_event_.signal ();
883 ACE_TEST_ASSERT (result == 0);
884 ACE_UNUSED_ARG (result);
886 ++connection_counter;
887 message_counter = 1;
889 else
890 // Stop the thread, if the maximum number of connections
891 // for the test has been reached.
892 break;
895 // The reference count on the sender was increased by the cache
896 // before it was returned to us.
897 ACE_Event_Handler_var safe_sender (sender);
899 // If the test does not require making invocations, immediately
900 // release the connection.
901 if (!this->make_invocations_)
903 this->connection_cache_.release_connection (sender);
905 // Sleep for a short while
906 ACE_OS::sleep (ACE_Time_Value (0, 10 * 1000));
908 else
910 // Make invocation.
911 ssize_t result =
912 sender->send_message ();
914 // If successful, release connection.
915 if (result == message_size)
917 if (debug)
918 ACE_DEBUG ((LM_DEBUG,
919 ACE_TEXT ("(%t) Message %d:%d delivered on handle %d\n"),
920 connection_counter,
921 message_counter,
922 sender->handle_));
924 this->connection_cache_.release_connection (sender);
926 else
928 // If failure in making invocation, close the sender.
929 if (debug)
930 ACE_DEBUG ((LM_DEBUG,
931 ACE_TEXT ("(%t) /*** Problem in delivering message ")
932 ACE_TEXT ("%d:%d on handle %d: shutting down ")
933 ACE_TEXT ("invocation thread ***/\n"),
934 connection_counter,
935 message_counter,
936 sender->handle_));
938 ACE_DEBUG ((LM_DEBUG,
939 ACE_TEXT ("(%t) Invocation thread calling ")
940 ACE_TEXT ("Sender::close() for handle %d\n"),
941 sender->handle_));
943 sender->close ();
947 ACE_DEBUG ((LM_DEBUG,
948 ACE_TEXT("(%t) Invocation_Thread::svc calling end_reactor_event_loop\n")));
950 // Close the Reactor event loop.
951 this->reactor_.end_reactor_event_loop ();
952 ACE_DEBUG ((LM_DEBUG,
953 ACE_TEXT("(%t) Invocation_Thread::svc terminating\n")));
955 return 0;
958 class Close_Socket_Thread : public ACE_Task_Base
960 public:
961 Close_Socket_Thread (ACE_Thread_Manager &thread_manager,
962 ACE_Reactor &reactor,
963 ACE_Auto_Event &new_connection_event,
964 int make_invocations,
965 int run_receiver_thread);
967 int svc () override;
969 ACE_Auto_Event &new_connection_event_;
971 ACE_Reactor &reactor_;
973 int make_invocations_;
975 int run_receiver_thread_;
978 Close_Socket_Thread::Close_Socket_Thread (ACE_Thread_Manager &thread_manager,
979 ACE_Reactor &reactor,
980 ACE_Auto_Event &new_connection_event,
981 int make_invocations,
982 int run_receiver_thread)
983 : ACE_Task_Base (&thread_manager),
984 new_connection_event_ (new_connection_event),
985 reactor_ (reactor),
986 make_invocations_ (make_invocations),
987 run_receiver_thread_ (run_receiver_thread)
992 Close_Socket_Thread::svc ()
994 unsigned int seed = (unsigned int) ACE_OS::time ();
995 ACE_Time_Value timeout (0, close_timeout * 1000);
996 ACE_DEBUG ((LM_DEBUG,
997 ACE_TEXT("(%t) Close_Socket_Thread::svc commencing\n")));
999 disable_signal (SIGPIPE, SIGPIPE);
1001 for (; !this->reactor_.reactor_event_loop_done ();)
1003 // Wait for the new connection to be established.
1004 int result =
1005 this->new_connection_event_.wait (&timeout,
1007 ACE_TEST_ASSERT (result == 0 ||
1008 (result == -1 && errno == ETIME));
1010 if (result == -1 &&
1011 errno == ETIME)
1012 continue;
1014 // Sleep for half a second.
1015 ACE_OS::sleep (timeout);
1017 int close_client = 0;
1019 // If the invocation thread is making invocations and if the
1020 // receiver is threaded, either socket can be closed.
1021 if (this->make_invocations_ &&
1022 this->run_receiver_thread_)
1023 // Randomize which socket to close.
1024 close_client = ACE_OS::rand_r (&seed) % 2;
1026 // If the invocation thread is making invocations, only close
1027 // the client socket.
1028 else if (this->make_invocations_)
1029 close_client = 1;
1030 else
1031 // If the receiver is threaded, only close the server socket.
1032 close_client = 0;
1034 if (close_client)
1036 // Close the client socket.
1037 if (debug)
1038 ACE_DEBUG ((LM_DEBUG,
1039 ACE_TEXT ("(%t) Close socket thread closing client ")
1040 ACE_TEXT ("handle %d\n"),
1041 client_handle));
1043 ACE_OS::shutdown (client_handle, ACE_SHUTDOWN_BOTH);
1044 ACE_OS::closesocket (client_handle);
1046 else
1048 // Close the server socket.
1049 if (debug)
1050 ACE_DEBUG ((LM_DEBUG,
1051 ACE_TEXT ("(%t) Close socket thread closing server ")
1052 ACE_TEXT ("handle %d\n"),
1053 server_handle));
1054 ACE_OS::shutdown (server_handle, ACE_SHUTDOWN_BOTH);
1055 ACE_OS::closesocket (server_handle);
1058 ACE_DEBUG ((LM_DEBUG,
1059 ACE_TEXT("(%t) Close_Socket_Thread::svc terminating\n")));
1061 return 0;
1064 Event_Loop_Thread::Event_Loop_Thread (ACE_Thread_Manager &thread_manager,
1065 ACE_Reactor &reactor)
1066 : ACE_Task_Base (&thread_manager),
1067 reactor_ (reactor)
1072 Event_Loop_Thread::svc ()
1074 // Simply run the event loop.
1075 this->reactor_.owner (ACE_Thread::self ());
1076 ACE_DEBUG ((LM_DEBUG,
1077 ACE_TEXT("(%t) Event_Loop_Thread::svc commencing\n")));
1079 disable_signal (SIGPIPE, SIGPIPE);
1081 while (!this->reactor_.reactor_event_loop_done ())
1083 this->reactor_.handle_events ();
1085 ACE_DEBUG ((LM_DEBUG,
1086 ACE_TEXT("(%t) Event_Loop_Thread::svc terminating\n")));
1088 return 0;
1091 class Purger_Thread : public ACE_Task_Base
1093 public:
1094 Purger_Thread (ACE_Thread_Manager &thread_manager,
1095 ACE_Reactor &reactor,
1096 Connection_Cache &connection_cache);
1098 int svc () override;
1100 ACE_Reactor &reactor_;
1102 Connection_Cache &connection_cache_;
1105 Purger_Thread::Purger_Thread (ACE_Thread_Manager &thread_manager,
1106 ACE_Reactor &reactor,
1107 Connection_Cache &connection_cache)
1108 : ACE_Task_Base (&thread_manager),
1109 reactor_ (reactor),
1110 connection_cache_ (connection_cache)
1115 Purger_Thread::svc ()
1117 ACE_DEBUG ((LM_DEBUG,
1118 ACE_TEXT("(%t) Purger_Thread::svc commencing\n")));
1120 disable_signal (SIGPIPE, SIGPIPE);
1122 for (; !this->reactor_.reactor_event_loop_done ();)
1124 // Get a connection from the cache.
1125 Sender *sender =
1126 this->connection_cache_.acquire_connection ();
1128 // If no connection is available in the cache, sleep for a while.
1129 if (sender == 0)
1130 ACE_OS::sleep (ACE_Time_Value (0, 10 * 1000));
1131 else
1133 // The reference count on the sender was increased by the
1134 // cache before it was returned to us.
1135 ACE_Event_Handler_var safe_sender (sender);
1137 // Actively close the connection.
1138 ACE_DEBUG ((LM_DEBUG,
1139 ACE_TEXT ("(%t) Purger thread calling Sender::close() ")
1140 ACE_TEXT ("for handle %d\n"),
1141 sender->handle_));
1143 sender->close ();
1146 ACE_DEBUG ((LM_DEBUG,
1147 ACE_TEXT("(%t) Purger_Thread::svc terminating\n")));
1149 return 0;
1152 void
1153 testing (ACE_Reactor *reactor,
1154 int make_invocations,
1155 int run_event_loop_thread,
1156 int run_purger_thread,
1157 int run_receiver_thread,
1158 int nested_upcalls)
1160 ACE_DEBUG ((LM_DEBUG,
1161 ACE_TEXT ("\n(%t) Configuration:\n")
1162 ACE_TEXT ("\tInvocation thread = %d\n")
1163 ACE_TEXT ("\tEvent Loop thread = %d\n")
1164 ACE_TEXT ("\tPurger thread = %d\n")
1165 ACE_TEXT ("\tReceiver thread = %d\n")
1166 ACE_TEXT ("\tNested Upcalls = %d\n\n"),
1167 make_invocations,
1168 run_event_loop_thread,
1169 run_purger_thread,
1170 run_receiver_thread,
1171 nested_upcalls));
1173 ACE_Thread_Manager thread_manager;
1175 int result = 0;
1177 // Create the connection cache.
1178 Connection_Cache connection_cache;
1179 ACE_Auto_Event new_connection_event;
1181 // Create the invocation thread.
1182 Invocation_Thread invocation_thread (thread_manager,
1183 *reactor,
1184 connection_cache,
1185 new_connection_event,
1186 make_invocations,
1187 run_receiver_thread,
1188 nested_upcalls);
1190 result =
1191 invocation_thread.activate ();
1192 ACE_TEST_ASSERT (result == 0);
1194 // Create the thread for closing the server socket.
1195 Close_Socket_Thread close_socket_thread (thread_manager,
1196 *reactor,
1197 new_connection_event,
1198 make_invocations,
1199 run_receiver_thread);
1200 result =
1201 close_socket_thread.activate ();
1202 ACE_TEST_ASSERT (result == 0);
1204 global_event_loop_thread_variable = 0;
1206 // Create a thread to run the event loop.
1207 Event_Loop_Thread event_loop_thread (thread_manager,
1208 *reactor);
1209 if (run_event_loop_thread)
1211 global_event_loop_thread_variable =
1212 &event_loop_thread;
1214 result =
1215 event_loop_thread.activate ();
1216 ACE_TEST_ASSERT (result == 0);
1219 // Create a thread to run the purger.
1220 Purger_Thread purger_thread (thread_manager,
1221 *reactor,
1222 connection_cache);
1223 if (run_purger_thread)
1225 result =
1226 purger_thread.activate ();
1227 ACE_TEST_ASSERT (result == 0);
1230 // Wait for threads to exit.
1231 result = thread_manager.wait ();
1232 ACE_TEST_ASSERT (result == 0);
1234 // Set the global variable to zero again because the
1235 // event_loop_thread exists on the stack and now
1236 // gets destructed.
1237 global_event_loop_thread_variable = 0;
1240 template <class REACTOR_IMPL>
1241 class test
1243 public:
1244 test (int ignore_nested_upcalls,
1245 int require_event_loop_thread);
1248 template <class REACTOR_IMPL>
1249 test<REACTOR_IMPL>::test (int ignore_nested_upcalls,
1250 int require_event_loop_thread)
1252 for (int i = 0;
1253 i < (int) (sizeof test_configs / (sizeof (int) * number_of_options));
1254 i++)
1256 if ((make_invocations == -1 ||
1257 make_invocations == test_configs[i][0]) &&
1258 (run_event_loop_thread == -1 ||
1259 run_event_loop_thread == test_configs[i][1]) &&
1260 (run_purger_thread == -1 ||
1261 run_purger_thread == test_configs[i][2]) &&
1262 (run_receiver_thread == -1 ||
1263 run_receiver_thread == test_configs[i][3]) &&
1264 (nested_upcalls == -1 ||
1265 nested_upcalls == test_configs[i][4]))
1267 #if 0 /* defined (ACE_LINUX) */
1269 // @@ I am not sure why but when <make_invocations> is 0 and
1270 // there is no purger thread, the receiver thread does not
1271 // notice that the connection has been closed.
1272 if (!test_configs[i][0] && !test_configs[i][2])
1273 continue;
1275 // @@ Linux also does not work correctly in the following
1276 // case: Invocation thread starts and sends messages filling
1277 // the socket buffer. It then blocks in write(). In the
1278 // meantime, the close connection thread closes the socket
1279 // used by invocation thread. However, the invocation thread
1280 // does not notice this as it does not return from write().
1281 // Meanwhile, the event loop thread notices that a socket in
1282 // it's wait set has been closed, and starts to spin in
1283 // handle_events() since the invocation thread is not taking
1284 // out the closed handle from the Reactor's wait set.
1285 if (test_configs[i][0] && test_configs[i][1] && !test_configs[i][3])
1286 continue;
1288 #endif /* linux */
1290 if (test_configs[i][4] && ignore_nested_upcalls)
1291 continue;
1293 if (!test_configs[i][1] && require_event_loop_thread)
1294 continue;
1296 ACE_Reactor reactor (new REACTOR_IMPL, true);
1298 testing (&reactor,
1299 test_configs[i][0],
1300 test_configs[i][1],
1301 test_configs[i][2],
1302 test_configs[i][3],
1303 test_configs[i][4]);
1308 static int
1309 parse_args (int argc, ACE_TCHAR *argv[])
1311 ACE_Get_Opt get_opt (argc, argv, ACE_TEXT ("a:b:c:d:f:g:k:l:m:n:o:uz:"));
1313 int cc;
1314 while ((cc = get_opt ()) != -1)
1316 switch (cc)
1318 case 'a':
1319 test_select_reactor = ACE_OS::atoi (get_opt.opt_arg ());
1320 break;
1321 case 'b':
1322 test_tp_reactor = ACE_OS::atoi (get_opt.opt_arg ());
1323 break;
1324 case 'c':
1325 test_wfmo_reactor = ACE_OS::atoi (get_opt.opt_arg ());
1326 break;
1327 case 'd':
1328 test_dev_poll_reactor = ACE_OS::atoi (get_opt.opt_arg ());
1329 break;
1330 case 'f':
1331 number_of_connections = ACE_OS::atoi (get_opt.opt_arg ());
1332 break;
1333 case 'g':
1334 close_timeout = ACE_OS::atoi (get_opt.opt_arg ());
1335 break;
1336 case 'k':
1337 make_invocations = ACE_OS::atoi (get_opt.opt_arg ());
1338 break;
1339 case 'l':
1340 run_event_loop_thread = ACE_OS::atoi (get_opt.opt_arg ());
1341 break;
1342 case 'm':
1343 run_purger_thread = ACE_OS::atoi (get_opt.opt_arg ());
1344 break;
1345 case 'n':
1346 run_receiver_thread = ACE_OS::atoi (get_opt.opt_arg ());
1347 break;
1348 case 'o':
1349 nested_upcalls = ACE_OS::atoi (get_opt.opt_arg ());
1350 break;
1351 case 'z':
1352 debug = ACE_OS::atoi (get_opt.opt_arg ());
1353 break;
1354 case 'u':
1355 default:
1356 ACE_ERROR ((LM_ERROR,
1357 ACE_TEXT ("\nusage: %s \n\n")
1358 ACE_TEXT ("\t[-a test Select Reactor] (defaults to %d)\n")
1359 ACE_TEXT ("\t[-b test TP Reactor] (defaults to %d)\n")
1360 ACE_TEXT ("\t[-c test WFMO Reactor] (defaults to %d)\n")
1361 ACE_TEXT ("\t[-d test Dev Poll Reactor] (defaults to %d)\n")
1362 ACE_TEXT ("\t[-f number of connections] (defaults to %d)\n")
1363 ACE_TEXT ("\t[-g close timeout] (defaults to %d)\n")
1364 ACE_TEXT ("\t[-k make invocations] (defaults to %d)\n")
1365 ACE_TEXT ("\t[-l run event loop thread] (defaults to %d)\n")
1366 ACE_TEXT ("\t[-m run purger thread] (defaults to %d)\n")
1367 ACE_TEXT ("\t[-n run receiver thread] (defaults to %d)\n")
1368 ACE_TEXT ("\t[-o nested upcalls] (defaults to %d)\n")
1369 ACE_TEXT ("\t[-z debug] (defaults to %d)\n")
1370 ACE_TEXT ("\n"),
1371 argv[0],
1372 test_select_reactor,
1373 test_tp_reactor,
1374 test_wfmo_reactor,
1375 test_dev_poll_reactor,
1376 number_of_connections,
1377 close_timeout,
1378 make_invocations,
1379 run_event_loop_thread,
1380 run_purger_thread,
1381 run_receiver_thread,
1382 nested_upcalls,
1383 debug));
1384 return -1;
1388 return 0;
1392 run_main (int argc, ACE_TCHAR *argv[])
1394 ACE_START_TEST (ACE_TEXT ("MT_Reference_Counted_Event_Handler_Test"));
1396 // Validate options.
1397 int result =
1398 parse_args (argc, argv);
1399 if (result != 0)
1400 return result;
1402 disable_signal (SIGPIPE, SIGPIPE);
1404 int ignore_nested_upcalls = 1;
1405 int perform_nested_upcalls = 0;
1407 int event_loop_thread_required = 1;
1408 int event_loop_thread_not_required = 0;
1410 if (test_select_reactor)
1412 ACE_DEBUG ((LM_DEBUG,
1413 ACE_TEXT ("\n\n(%t) Testing Select Reactor....\n\n")));
1415 test<ACE_Select_Reactor> test (ignore_nested_upcalls,
1416 event_loop_thread_not_required);
1417 ACE_UNUSED_ARG (test);
1420 if (test_tp_reactor)
1422 ACE_DEBUG ((LM_DEBUG,
1423 ACE_TEXT ("\n\n(%t) Testing TP Reactor....\n\n")));
1425 test<ACE_TP_Reactor> test (perform_nested_upcalls,
1426 event_loop_thread_not_required);
1427 ACE_UNUSED_ARG (test);
1430 #if defined (ACE_HAS_EVENT_POLL)
1432 if (test_dev_poll_reactor)
1434 ACE_DEBUG ((LM_DEBUG,
1435 ACE_TEXT ("\n\n(%t) Testing Dev Poll Reactor....\n\n")));
1437 test<ACE_Dev_Poll_Reactor> test (perform_nested_upcalls,
1438 event_loop_thread_not_required);
1439 ACE_UNUSED_ARG (test);
1442 #endif
1444 #if defined (ACE_WIN32)
1446 if (test_wfmo_reactor)
1448 ACE_DEBUG ((LM_DEBUG,
1449 ACE_TEXT ("\n\n(%t) Testing WFMO Reactor....\n\n")));
1451 test<ACE_WFMO_Reactor> test (ignore_nested_upcalls,
1452 event_loop_thread_required);
1453 ACE_UNUSED_ARG (test);
1456 #else /* ACE_WIN32 */
1458 ACE_UNUSED_ARG (event_loop_thread_required);
1460 #endif /* ACE_WIN32 */
1462 ACE_END_TEST;
1464 return 0;
1467 #else /* ACE_HAS_THREADS && ! ACE_LACKS_ACCEPT */
1470 run_main (int, ACE_TCHAR *[])
1472 ACE_START_TEST (ACE_TEXT ("MT_Reference_Counted_Event_Handler_Test"));
1474 ACE_ERROR ((LM_INFO,
1475 ACE_TEXT ("threads/accept not supported on this platform\n")));
1477 ACE_END_TEST;
1479 return 0;
1482 #endif /* ACE_HAS_THREADS && ! ACE_LACKS_ACCEPT */