1 #include "ace/Get_Opt.h"
2 #include "ace/Global_Macros.h"
4 #include "tao/ORB_Core.h"
5 #include "tao/default_resource.h"
6 #include "tao/Leader_Follower.h"
7 #include "tao/LF_Event_Loop_Thread_Helper.h"
8 #include "tao/LF_Event.h"
9 #include "tao/Transport.h"
10 #include "ace/Task_T.h"
11 #include "tao/TAO_Export.h"
12 #include "ace/TP_Reactor.h"
15 #if defined (ACE_HAS_THREADS)
20 // TEST_ASSERT is exactly the same as ACE_ASSERT except it is active
21 // for both debug and *release* builds.
22 #define TEST_ASSERT(X) \
24 ACE_ERROR ((LM_ERROR, ACE_TEXT ("TEST_ASSERT: (%P|%t) file %N, line %l assertion failed for '%C'.%a\n"), \
28 #define TSS_ASSERT(TSS, LF, ELT, CLT, LA) \
29 TEST_ASSERT ((TSS->event_loop_thread_ == ELT)); \
30 TEST_ASSERT ((TSS->client_leader_thread_ == CLT)); \
31 TEST_ASSERT ((LF.leader_available () == LA));
36 parse_args (int argc
, ACE_TCHAR
*argv
[])
38 ACE_Get_Opt
get_opts (argc
, argv
, ACE_TEXT ("d"));
41 while ((c
= get_opts ()) != -1)
50 ACE_ERROR_RETURN ((LM_ERROR
,
57 // Indicates successful parsing of the command line
61 class Command
: public ACE_Message_Block
64 virtual int execute (Worker
*) = 0;
68 //////////////////////////////////////////////////////////////////////
69 // NOTE: Do *NOT* put the same msg into the msg queue more than once.
70 // This will confuse the msg queue and result in it dropping messages
71 //////////////////////////////////////////////////////////////////////
72 class Worker
: public ACE_Task
<ACE_SYNCH
>
80 virtual int close (u_long
= 0);
81 virtual int put (ACE_Message_Block
* mblk
, ACE_Time_Value
* tv
= 0);
82 int process_cmd (void);
83 void shutdown (bool do_shutdown
);
90 ACE_TSS
<Worker
> *workers_p
= 0;
91 #define workers (*workers_p)
93 int Worker::svc (void)
96 ACE_DEBUG ((LM_DEBUG
, ACE_TEXT ("(%P|%t) Worker thread starting up.\n")));
97 // Register this worker
98 workers
.ts_object (const_cast<Worker
*> (this));
100 while (!shutdown_
&& retval
!= -1)
102 retval
= this->process_cmd ();
105 ACE_DEBUG ((LM_DEBUG
, ACE_TEXT ("(%P|%t) Worker thread shutting down.\n")));
109 int Worker::close (u_long
)
111 // de-register this worker, otherwise the ACE_TSS will try to destroy it
112 workers
.ts_object (0);
116 int Worker::put (ACE_Message_Block
* mblk
, ACE_Time_Value
* tv
)
118 return this->putq (mblk
, tv
);
121 int Worker::process_cmd (void)
123 ACE_Message_Block
*mb
= 0;
124 if (this->getq (mb
, 0) == -1)
126 ACE_ERROR ((LM_ERROR
, ACE_TEXT ("Error calling getq: $!\n")));
127 // Strangely, message queues return this instead of ETIME
128 if (errno
== EWOULDBLOCK
|| errno
== ESHUTDOWN
)
132 Command
* cmd
= dynamic_cast <Command
*> (mb
);
133 ACE_ASSERT (cmd
!= 0);
139 void Worker::shutdown (bool do_shutdown
)
141 shutdown_
= do_shutdown
;
144 bool Worker::shutdown (void)
149 class Test_Reactor
: public ACE_TP_Reactor
152 Test_Reactor (size_t max_number_of_handles
,
153 bool restart
= false,
154 ACE_Sig_Handler
*sh
= 0,
155 ACE_Timer_Queue
*tq
= 0,
156 bool mask_signals
= true,
157 int s_queue
= ACE_Select_Reactor_Token::FIFO
)
158 : ACE_TP_Reactor(max_number_of_handles
, restart
, sh
, tq
, mask_signals
, s_queue
) {}
161 // This is the method that the Leader_Follower object calls.
162 virtual int handle_events (ACE_Time_Value
* = 0)
164 if (TAO_debug_level
> 10)
165 ACE_DEBUG ((LM_DEBUG
, ACE_TEXT ("(%P|%t) Executing Test_Reactor::handle_events\n")));
166 // This is called by client leader threads. Note, the loop here
167 // glosses over the fact that the Leader_Follower code does not
168 // work quite the way we want it to. Namely, this logic:
169 // 1 - client leader thread detects when there are event loop
171 // 2 - client leader wakes up event loop threads via broadcast
172 // 3 - client leader temporarily gives up lock to allow event loop
173 // threads to take over leadership
174 // 4 - client leader thread takes lock again and loops around to
176 // The problem is that the gap between 3 & 4 is often not long
177 // enough for the event loop threads to get switched in and take
178 // ownership of the lock, even though the client leader thread
180 // Thus this code, once shutdown, will continually return and thus
181 // give the leader follower multiple chances to hand off to an
182 // event loop thread. This is not ideal but it will have to do
183 // until the leader follower code is fixed (if possible)
184 while (!workers
->shutdown())
185 // call this thread's (worker's) process_cmd method
186 workers
->process_cmd ();
190 virtual int handle_events (ACE_Time_Value
&)
192 return this->handle_events ();
196 // Our own Resource_Factory for testing purposes. This just returns
197 // our Test_Reactor to the Leader_Follower object via the ORB_Core.
198 class Test_Resource_Factory
: public TAO_Default_Resource_Factory
201 Test_Resource_Factory ()
204 virtual ACE_Reactor_Impl
* allocate_reactor_impl () const
206 ACE_Reactor_Impl
*impl
= 0;
207 ACE_NEW_RETURN (impl
,
208 Test_Reactor (ACE::max_handles (),
212 this->reactor_mask_signals_
,
213 ACE_Select_Reactor_Token::LIFO
),
221 // force export flag otherwise Windows will complain
222 #define TAO_Test_Export ACE_Proper_Export_Flag
224 ACE_FACTORY_DEFINE (TAO_Test
, Test_Resource_Factory
)
225 ACE_STATIC_SVC_DEFINE (Test_Resource_Factory
,
226 ACE_TEXT ("Resource_Factory"),
228 &ACE_SVC_NAME (Test_Resource_Factory
),
229 ACE_Service_Type::DELETE_THIS
230 | ACE_Service_Type::DELETE_OBJ
,
232 ACE_STATIC_SVC_REQUIRE (Test_Resource_Factory
);
234 int load_test_resources
=
235 ACE_Service_Config::process_directive (ace_svc_desc_Test_Resource_Factory
);
237 class Test_LF_Event
: public TAO_LF_Event
243 void complete_event (TAO_Leader_Follower
&lf
)
246 ACE_DEBUG ((LM_DEBUG
, ACE_TEXT ("(%P|%t) Completing event\n")));
247 this->state_changed (TAO_LF_Event::LFS_SUCCESS
, lf
);
251 virtual bool successful_i () const
253 return this->state_
== TAO_LF_Event::LFS_SUCCESS
;
256 virtual bool error_detected_i () const
258 return (this->state_
== TAO_LF_Event::LFS_FAILURE
259 || this->state_
== TAO_LF_Event::LFS_TIMEOUT
260 || this->state_
== TAO_LF_Event::LFS_CONNECTION_CLOSED
);
262 virtual void state_changed_i (LFS_STATE new_state
)
264 this->state_
= new_state
;
267 virtual bool is_state_final () const
269 if (this->state_
== TAO_LF_Event::LFS_TIMEOUT
||
270 this->state_
== TAO_LF_Event::LFS_FAILURE
)
276 class Test_Transport
: public TAO_Transport
279 Test_Transport (CORBA::ULong tag
,
280 TAO_ORB_Core
*orb_core
)
281 : TAO_Transport (tag
, orb_core
)
284 virtual int send_message (TAO_OutputCDR
&,
286 TAO_ServerRequest
* = 0,
287 TAO_Message_Semantics
= TAO_Message_Semantics (),
288 ACE_Time_Value
* = 0)
293 virtual ssize_t
send (iovec
*, int ,
295 const ACE_Time_Value
* = 0)
300 virtual ssize_t
recv (char *,
302 const ACE_Time_Value
* = 0)
307 virtual int messaging_init (CORBA::Octet
,
313 virtual ACE_Event_Handler
* event_handler_i (void)
319 virtual TAO_Connection_Handler
* connection_handler_i (void)
324 virtual int send_request (TAO_Stub
*,
327 TAO_Message_Semantics
,
335 class Shutdown
: public Command
338 virtual int execute (Worker
* worker
)
341 ACE_DEBUG ((LM_DEBUG
, ACE_TEXT ("(%P|%t) Shutdown cmd\n")));
342 worker
->shutdown (true);
348 class TSS_Assert
: public Command
351 TSS_Assert (TAO_ORB_Core
* orb_core
,
354 bool leader_available
)
355 : orb_core_ (orb_core
),
356 elt_count_ (elt_count
),
357 clt_count_ (clt_count
),
358 leader_available_ (leader_available
)
361 virtual int execute (Worker
*)
364 ACE_DEBUG ((LM_DEBUG
, ACE_TEXT ("(%P|%t) Executing TSS_Assert(%d,%d,%d) cmd\n"),
365 elt_count_
, clt_count_
, leader_available_
));
366 TAO_Leader_Follower
&leader_follower
= orb_core_
->leader_follower ();
367 TAO_ORB_Core_TSS_Resources
* tss
= orb_core_
->get_tss_resources ();
368 TSS_ASSERT (tss
, leader_follower
,
369 elt_count_
, clt_count_
, leader_available_
);
373 TAO_ORB_Core
* orb_core_
;
374 const int elt_count_
;
375 const int clt_count_
;
376 const bool leader_available_
;
379 class Wait_For_Event
: public Command
382 Wait_For_Event (Test_LF_Event
& event
,
383 Test_Transport
& transport
,
384 TAO_Leader_Follower
& lf
)
386 transport_ (transport
),
389 virtual int execute (Worker
*)
392 ACE_DEBUG ((LM_DEBUG
, ACE_TEXT ("(%P|%t) Executing Wait_For_Event cmd\n")));
393 int retval
= lf_
.wait_for_event (&event_
, &transport_
, 0);
394 // The worker has probably been shutdown in order for the client
395 // leader event loop to exit - reactivate the worker so it can
396 // process msgs once we return
397 workers
->shutdown (false);
401 Test_LF_Event
& event_
;
402 Test_Transport
& transport_
;
403 TAO_Leader_Follower
& lf_
;
406 class Cond_Signal
: public Command
415 virtual int execute (Worker
*)
418 ACE_DEBUG ((LM_DEBUG
, ACE_TEXT ("(%P|%t) Executing Cond_Signal cmd\n")));
419 ACE_GUARD_RETURN (TAO_SYNCH_MUTEX
, guard
,this->cond_
.mutex (), 0);
420 return this->cond_
.signal ();
422 TAO_SYNCH_MUTEX
& lock ()
426 ACE_Condition_Thread_Mutex
& cond ()
430 virtual ACE_Message_Block
*release (void)
432 // we need to only release once both the main and worker thread
433 // are done with this object - each signals this by calling this
436 if (this->ref_count_
== 0)
437 return ACE_Message_Block::release ();
442 TAO_SYNCH_MUTEX lock_
;
443 ACE_Condition_Thread_Mutex cond_
;
447 class Event_Loop_Thread
: public Command
450 Event_Loop_Thread(TAO_Leader_Follower
& lf
,
451 TAO_LF_Strategy
& lf_strategy
)
452 : lf_ (lf
), lf_strategy_ (lf_strategy
)
455 virtual int execute (Worker
* worker
)
458 ACE_DEBUG ((LM_DEBUG
, ACE_TEXT ("(%P|%t) Executing Event_Loop_Thread cmd\n")));
459 TAO_LF_Event_Loop_Thread_Helper
elt (lf_
, lf_strategy_
, 0);
460 while (!worker
->shutdown())
461 worker
->process_cmd ();
462 // The worker has been shutdown in order for this event loop
463 // thread to exit - reactivate the worker so it can process msgs
465 worker
->shutdown (false);
469 TAO_Leader_Follower
& lf_
;
470 TAO_LF_Strategy
& lf_strategy_
;
473 class Set_Upcall_Thread
: public Command
476 Set_Upcall_Thread (TAO_Leader_Follower
& lf
)
480 virtual int execute (Worker
*)
483 ACE_DEBUG ((LM_DEBUG
, ACE_TEXT ("(%P|%t) Executing Set_Upcall_Thread cmd\n")));
484 lf_
.set_upcall_thread ();
489 TAO_Leader_Follower
& lf_
;
492 void synch_with_worker (Worker
& worker
)
494 // This object is released by the worker thread after it has
496 Cond_Signal
* cond
= new Cond_Signal
;
498 ACE_GUARD (TAO_SYNCH_MUTEX
, guard
, cond
->lock ());
500 ACE_Time_Value
tv (1, 0);
501 tv
+= ACE_OS::gettimeofday ();
502 TEST_ASSERT ((cond
->cond ().wait (&tv
) == 0));
506 // 1 - Simple event loop thread test
507 void Test_1 (TAO_ORB_Core
* orb_core
)
509 TAO_LF_Strategy
&lf_strategy
= orb_core
->lf_strategy ();
510 TAO_Leader_Follower
&leader_follower
= orb_core
->leader_follower ();
511 TAO_ORB_Core_TSS_Resources
* tss
= orb_core
->get_tss_resources ();
513 ACE_DEBUG ((LM_DEBUG
, ACE_TEXT ("==========\n")));
514 ACE_DEBUG ((LM_DEBUG
, ACE_TEXT ("TEST #1 - Simple Event Loop call\n")));
516 TSS_ASSERT (tss
, leader_follower
, 0, 0, false);
518 std::unique_ptr
<TAO_LF_Event_Loop_Thread_Helper
>
519 elt (new TAO_LF_Event_Loop_Thread_Helper(leader_follower
,
522 TSS_ASSERT (tss
, leader_follower
, 1, 0, true);
525 TSS_ASSERT (tss
, leader_follower
, 0, 0, false);
528 // 2 - Nested event loop threads - no set_upcall_thread call
529 void Test_2 (TAO_ORB_Core
* orb_core
)
531 TAO_LF_Strategy
&lf_strategy
= orb_core
->lf_strategy ();
532 TAO_Leader_Follower
&leader_follower
= orb_core
->leader_follower ();
533 TAO_ORB_Core_TSS_Resources
* tss
= orb_core
->get_tss_resources ();
534 ACE_DEBUG ((LM_DEBUG
, ACE_TEXT ("==========\n")));
535 ACE_DEBUG ((LM_DEBUG
, ACE_TEXT ("TEST #2 - 2 nested Event Loop calls\n")));
537 TSS_ASSERT (tss
, leader_follower
, 0, 0, false);
539 std::unique_ptr
<TAO_LF_Event_Loop_Thread_Helper
>
540 elt1 (new TAO_LF_Event_Loop_Thread_Helper(leader_follower
,
542 TSS_ASSERT (tss
, leader_follower
, 1, 0, true);
544 std::unique_ptr
<TAO_LF_Event_Loop_Thread_Helper
>
545 elt2 (new TAO_LF_Event_Loop_Thread_Helper(leader_follower
,
547 TSS_ASSERT (tss
, leader_follower
, 2, 0, true);
550 TSS_ASSERT (tss
, leader_follower
, 1, 0, true);
553 TSS_ASSERT (tss
, leader_follower
, 0, 0, false);
556 // 3 - Nested event loop threads - with set_upcall_thread call
557 void Test_3 (TAO_ORB_Core
* orb_core
)
559 TAO_LF_Strategy
&lf_strategy
= orb_core
->lf_strategy ();
560 TAO_Leader_Follower
&leader_follower
= orb_core
->leader_follower ();
561 TAO_ORB_Core_TSS_Resources
* tss
= orb_core
->get_tss_resources ();
562 ACE_DEBUG ((LM_DEBUG
, ACE_TEXT ("==========\n")));
563 ACE_DEBUG ((LM_DEBUG
, ACE_TEXT ("TEST #3 - 2 nested Event Loop calls with set_upcall_thread\n")));
565 TSS_ASSERT (tss
, leader_follower
, 0, 0, false);
567 std::unique_ptr
<TAO_LF_Event_Loop_Thread_Helper
>
568 elt1 (new TAO_LF_Event_Loop_Thread_Helper(leader_follower
,
570 TSS_ASSERT (tss
, leader_follower
, 1, 0, true);
572 leader_follower
.set_upcall_thread ();
573 TSS_ASSERT (tss
, leader_follower
, 0, 0, false);
575 std::unique_ptr
<TAO_LF_Event_Loop_Thread_Helper
>
576 elt2 (new TAO_LF_Event_Loop_Thread_Helper(leader_follower
,
578 TSS_ASSERT (tss
, leader_follower
, 1, 0, true);
581 TSS_ASSERT (tss
, leader_follower
, 0, 0, false);
584 TSS_ASSERT (tss
, leader_follower
, 0, 0, false);
587 // 4 - client leader thread
588 void Test_4 (TAO_ORB_Core
* orb_core
)
590 TAO_Leader_Follower
&leader_follower
= orb_core
->leader_follower ();
591 ACE_DEBUG ((LM_DEBUG
, ACE_TEXT ("==========\n")));
592 ACE_DEBUG ((LM_DEBUG
, ACE_TEXT ("TEST #4 - Simple Client Leader thread\n")));
598 // Test initial conditions
599 wrk1
.put (new TSS_Assert(orb_core
, 0, 0, false));
601 // Have the thread wait on an event
603 Test_Transport
transport (0, orb_core
);
604 wrk1
.put (new Wait_For_Event(event
, transport
, leader_follower
));
606 // The thread is still waiting on the event and thus should
607 // now be a client-leader thread
608 wrk1
.put (new TSS_Assert(orb_core
, 0, 1, true));
610 // Synchronise with the thread before we complete its event
611 synch_with_worker (wrk1
);
612 // Complete the event
613 event
.complete_event (leader_follower
);
615 // The thread is still inside handle_events - shutdown the
617 wrk1
.put (new Shutdown
);
619 // The thread should now return from being a client leader
620 wrk1
.put (new TSS_Assert(orb_core
, 0, 0, false));
622 wrk1
.put (new Shutdown
);
626 // 5 - nested client leader thread
627 void Test_5 (TAO_ORB_Core
* orb_core
)
629 TAO_Leader_Follower
&leader_follower
= orb_core
->leader_follower ();
630 ACE_DEBUG ((LM_DEBUG
, ACE_TEXT ("==========\n")));
631 ACE_DEBUG ((LM_DEBUG
, ACE_TEXT ("TEST #5 - 2 nested Client Leader calls\n")));
637 // Test initial conditions
638 wrk1
.put (new TSS_Assert(orb_core
, 0, 0, false));
640 // Have the thread wait on an event
642 Test_Transport
transport (0, orb_core
);
643 wrk1
.put (new Wait_For_Event(event
, transport
, leader_follower
));
645 // The thread is still waiting on the event and thus should
646 // now be a client-leader thread
647 wrk1
.put (new TSS_Assert(orb_core
, 0, 1, true));
649 // Wait for another event
650 Test_LF_Event event2
;
651 wrk1
.put (new Wait_For_Event(event2
, transport
, leader_follower
));
653 // The thread is still waiting on the event and thus should
654 // now be a client-leader thread
655 wrk1
.put (new TSS_Assert(orb_core
, 0, 2, true));
657 // Synchronise with the thread before we complete its event
658 synch_with_worker (wrk1
);
660 // Complete the first event - nothing should happen
661 event
.complete_event (leader_follower
);
663 wrk1
.put (new TSS_Assert(orb_core
, 0, 2, true));
665 // Complete the second event - everything should unwind
666 synch_with_worker (wrk1
);
667 event2
.complete_event (leader_follower
);
669 // The thread is still inside handle_events - shutdown the
670 // event processing for the inner client leader
671 wrk1
.put (new Shutdown
);
673 wrk1
.put (new TSS_Assert(orb_core
, 0, 1, true));
675 // The thread is now in the handle_events for the outer
676 // client-leader - the event is already complete so just
677 // shutdown the cmd processing.
678 wrk1
.put (new Shutdown
);
680 // We should now be back at our initial state.
681 wrk1
.put (new TSS_Assert(orb_core
, 0, 0, false));
683 wrk1
.put (new Shutdown
);
687 // 6 - nested client leader thread with set_upcall_thread
688 void Test_6 (TAO_ORB_Core
* orb_core
)
690 TAO_Leader_Follower
&leader_follower
= orb_core
->leader_follower ();
691 ACE_DEBUG ((LM_DEBUG
, ACE_TEXT ("==========\n")));
692 ACE_DEBUG ((LM_DEBUG
, ACE_TEXT ("TEST #6 - 2 nested Client Leader calls with set_upcall_thread\n")));
698 // Test initial conditions
699 wrk1
.put (new TSS_Assert(orb_core
, 0, 0, false));
701 // Have the thread wait on an event
703 Test_Transport
transport (0, orb_core
);
704 wrk1
.put (new Wait_For_Event(event
, transport
, leader_follower
));
706 // The thread is still waiting on the event and thus should
707 // now be a client-leader thread
708 wrk1
.put (new TSS_Assert(orb_core
, 0, 1, true));
710 // Call set_upcall_thread
711 wrk1
.put (new Set_Upcall_Thread (leader_follower
));
712 wrk1
.put (new TSS_Assert(orb_core
, 0, 0, false));
714 // Wait for another event
715 Test_LF_Event event2
;
716 wrk1
.put (new Wait_For_Event(event2
, transport
, leader_follower
));
718 // The thread is still waiting on the event and thus should
719 // now be a client-leader thread
720 wrk1
.put (new TSS_Assert(orb_core
, 0, 1, true));
722 // Synchronise with the thread before we complete its event
723 synch_with_worker (wrk1
);
725 // Complete the first event - nothing should happen
726 event
.complete_event (leader_follower
);
728 wrk1
.put (new TSS_Assert(orb_core
, 0, 1, true));
730 // Complete the second event - everything should unwind
731 synch_with_worker (wrk1
);
732 event2
.complete_event (leader_follower
);
734 // The thread is still inside handle_events - shutdown the
735 // event processing for the inner client leader
736 wrk1
.put (new Shutdown
);
738 wrk1
.put (new TSS_Assert(orb_core
, 0, 0, false));
740 // The thread is now in the handle_events for the outer
741 // client-leader - the event is already complete so just
742 // shutdown the cmd processing.
743 wrk1
.put (new Shutdown
);
745 // We should now be back at our initial state.
746 wrk1
.put (new TSS_Assert(orb_core
, 0, 0, false));
748 wrk1
.put (new Shutdown
);
752 // 7 - 2 client leader threads with set_upcall_thread
753 void Test_7 (TAO_ORB_Core
* orb_core
)
755 TAO_Leader_Follower
&leader_follower
= orb_core
->leader_follower ();
756 ACE_DEBUG ((LM_DEBUG
, ACE_TEXT ("==========\n")));
757 ACE_DEBUG ((LM_DEBUG
, ACE_TEXT ("TEST #7 - Client Leader yields to another client thread\n")));
763 // Test initial conditions
764 wrk1
.put (new TSS_Assert(orb_core
, 0, 0, false));
766 // Have the thread wait on an event
768 Test_Transport
transport (0, orb_core
);
769 wrk1
.put (new Wait_For_Event(event
, transport
, leader_follower
));
771 // The thread is still waiting on the event and thus should
772 // now be a client-leader thread
773 wrk1
.put (new TSS_Assert(orb_core
, 0, 1, true));
774 // Wait for the first thread to be the client leader before we start
776 synch_with_worker (wrk1
);
778 // Create another worker and have it do the same
781 wrk2
.put (new TSS_Assert(orb_core
, 0, 0, true));
782 // Make sure this test is complete before the Set_Upcall_Thread below
783 synch_with_worker (wrk2
);
784 Test_LF_Event event2
;
785 wrk2
.put (new Wait_For_Event(event2
, transport
, leader_follower
));
786 // Note, we can't test the new thread here - it is blocked waiting on
787 // the follower cond var
788 // wrk2.put (new TSS_Assert(orb_core, 0, 1, true));
790 // Call set_upcall_thread on the first thread
791 wrk1
.put (new Set_Upcall_Thread (leader_follower
));
792 // Our second thread should now be the client leader and the first
793 // thread should not. Note, we need to first synchronise with
794 // thread 2 (to make sure it is in handle_events) to avoid race
796 synch_with_worker (wrk2
);
797 wrk2
.put (new TSS_Assert(orb_core
, 0, 1, true));
798 wrk1
.put (new TSS_Assert(orb_core
, 0, 0, true));
800 // We should now be able to shutdown the first thread - have it
801 // return from handle_events and complete its event. If it has to
802 // wait it will just go back to being a follower
803 wrk1
.put (new Shutdown
);
804 event
.complete_event (leader_follower
);
805 synch_with_worker (wrk1
);
806 wrk1
.put (new TSS_Assert(orb_core
, 0, 0, true));
807 // We can now shut-down the thread
808 wrk1
.put (new Shutdown
);
811 // Now shut-down the second thread
812 event2
.complete_event (leader_follower
);
813 wrk2
.put (new Shutdown
);
814 synch_with_worker (wrk2
);
815 wrk2
.put (new TSS_Assert(orb_core
, 0, 0, false));
816 wrk2
.put (new Shutdown
);
820 // 8 - client becomes leader when event loop thread dispatched
821 void Test_8 (TAO_ORB_Core
* orb_core
)
823 TAO_LF_Strategy
&lf_strategy
= orb_core
->lf_strategy ();
824 TAO_Leader_Follower
&leader_follower
= orb_core
->leader_follower ();
825 ACE_DEBUG ((LM_DEBUG
, ACE_TEXT ("==========\n")));
826 ACE_DEBUG ((LM_DEBUG
, ACE_TEXT ("TEST #8 - client becomes leader when event thread dispatched\n")));
832 // Test initial conditions
833 wrk1
.put (new TSS_Assert(orb_core
, 0, 0, false));
835 // Have the thread become an event loop thread
836 wrk1
.put (new Event_Loop_Thread (leader_follower
,
838 wrk1
.put (new TSS_Assert(orb_core
, 1, 0, true));
840 // Before we start the next thread synchronise with the first
841 synch_with_worker (wrk1
);
843 // Start another thread and have it wait on an event
846 wrk2
.put (new TSS_Assert(orb_core
, 0, 0, true));
848 Test_Transport
transport (0, orb_core
);
849 synch_with_worker (wrk2
);
850 wrk2
.put (new Wait_For_Event(event
, transport
, leader_follower
));
852 // The new thread is a follower and thus is waiting on the follower
853 // cond var - we can't test this other than to check if the leader
854 // follower has clients, however, because we can't synchronise with
855 // that thread such a test would contain a race condition.
857 // Now dispatch the event loop thread by having it call set_upcall_thread
858 wrk1
.put (new Set_Upcall_Thread (leader_follower
));
860 // the first worker should have given up leadership and the second
861 // thread should have assumed leadership. We have to synchronise
862 // with both threads before we can test anything, otherwise we could
863 // catch the window where there is no leader.
864 synch_with_worker (wrk1
);
865 synch_with_worker (wrk2
);
866 wrk1
.put (new TSS_Assert (orb_core
, 0, 0, true));
867 wrk2
.put (new TSS_Assert (orb_core
, 0, 1, true));
869 // OK, now shut everything down - first the event loop thread
870 wrk1
.put (new Shutdown
);
871 wrk1
.put (new TSS_Assert (orb_core
, 0, 0, true));
872 wrk1
.put (new Shutdown
);
875 // Now the client thread
876 wrk2
.put (new TSS_Assert (orb_core
, 0, 1, true));
877 synch_with_worker (wrk2
);
878 event
.complete_event (leader_follower
);
879 wrk2
.put (new Shutdown
);
880 wrk2
.put (new TSS_Assert (orb_core
, 0, 0, false));
881 wrk2
.put (new Shutdown
);
885 // 9 - client leader thread then event loop thread
886 void Test_9 (TAO_ORB_Core
* orb_core
)
888 TAO_LF_Strategy
&lf_strategy
= orb_core
->lf_strategy ();
889 TAO_Leader_Follower
&leader_follower
= orb_core
->leader_follower ();
890 ACE_DEBUG ((LM_DEBUG
, ACE_TEXT ("==========\n")));
891 ACE_DEBUG ((LM_DEBUG
, ACE_TEXT ("TEST #9 - Client Leader thread yields to Event Loop thread\n")));
897 // Test initial conditions
898 wrk1
.put (new TSS_Assert(orb_core
, 0, 0, false));
900 // Have the thread wait on an event
902 Test_Transport
transport (0, orb_core
);
903 wrk1
.put (new Wait_For_Event(event
, transport
, leader_follower
));
905 // The thread is still waiting on the event and thus should
906 // now be a client-leader thread
907 wrk1
.put (new TSS_Assert(orb_core
, 0, 1, true));
908 // We now need to synchronise with the worker, to make sure it
909 // has processed all the cmds we have sent it
910 synch_with_worker (wrk1
);
912 // create an event loop thread - this means a new worker
915 wrk2
.put (new Event_Loop_Thread (leader_follower
,
918 // Unfortunately there is no way to test if the event loop thread is
919 // where we expect it to be (the
920 // wait_for_client_leader_to_complete() method). The only thing we
921 // could check is the event_loop_threads_waiting_ count, however,
922 // that is private to the TAO_Leader_Follower class.
924 // We need to get the client leader thread to return from
925 // process_cmd() and allow it to surrender leadership to the waiting
926 // event loop thread - send it a shutdown. The TAO_Leader_Follower
927 // code may call handle_events a few more times, however, since the
928 // cmd processing is shutdown (and won't be reactivated until the
929 // event is complete) handle_events will just return.
930 wrk1
.put (new Shutdown
);
932 // Now test the new event loop thread
933 wrk2
.put (new TSS_Assert(orb_core
, 1, 0, true));
934 // Wait until the event loop thread is running before we test
936 synch_with_worker (wrk2
);
938 // We can't test the client thread either - it is blocked in a call
939 // to the event's cond var's wait() method. All we can do is
940 // complete the event, which will signal the cond var
941 event
.complete_event (leader_follower
);
943 // The client thread should return from wait_for_event
944 wrk1
.put (new TSS_Assert(orb_core
, 0, 0, true));
945 // And the event loop thread should still be 'running'
946 wrk2
.put (new TSS_Assert(orb_core
, 1, 0, true));
948 // Some other misc checks
949 synch_with_worker (wrk1
);
950 TEST_ASSERT ((leader_follower
.has_clients () == 0));
952 // OK, lets shut everything down now - the event loop thread
953 // requires two shutdown cmds, one to exit the event loop thread cmd
954 // and the second to exit the main cmd processor
955 wrk2
.put (new Shutdown
);
956 // Incidentally there is now no leader
957 wrk2
.put (new TSS_Assert(orb_core
, 0, 0, false));
958 wrk2
.put (new Shutdown
);
961 // Shutdown the other worker
962 wrk1
.put (new Shutdown
);
966 // 10 - ET1437460 (second problem)
967 void Test_10 (TAO_ORB_Core
* orb_core
)
969 TAO_LF_Strategy
&lf_strategy
= orb_core
->lf_strategy ();
970 TAO_Leader_Follower
&leader_follower
= orb_core
->leader_follower ();
971 ACE_DEBUG ((LM_DEBUG
, ACE_TEXT ("==========\n")));
972 ACE_DEBUG ((LM_DEBUG
, ACE_TEXT ("TEST #10 - ET1437460\n")));
974 // This scenario involves:
975 // - an event loop thread
976 // - which calls set_upcall_thread
977 // - then becomes a client leader
978 // - is dispatched and then becomes a client leader again
979 // (without calling set_upcall_thread)
980 // - calls set_upcall_thread
983 // Originally this caused the leaders_ member to get set to -1
984 // (the inner client leader still decremented leaders_ even
985 // though set_upcall_thread was called)
991 // Test initial conditions
992 wrk1
.put (new TSS_Assert(orb_core
, 0, 0, false));
994 // Have the thread become an event loop thread
995 wrk1
.put (new Event_Loop_Thread (leader_follower
,
998 // The thread should be an event loop thread
999 wrk1
.put (new TSS_Assert(orb_core
, 1, 0, true));
1001 // call set_upcall_thread
1002 wrk1
.put (new Set_Upcall_Thread (leader_follower
));
1004 // The thread should no longer be an event loop thread
1005 wrk1
.put (new TSS_Assert(orb_core
, 0, 0, false));
1007 // Have the thread wait on an event
1008 Test_LF_Event event
;
1009 Test_Transport
transport (0, orb_core
);
1010 wrk1
.put (new Wait_For_Event(event
, transport
, leader_follower
));
1012 // The thread is still waiting on the event and thus should
1013 // now be a client-leader thread
1014 wrk1
.put (new TSS_Assert(orb_core
, 0, 1, true));
1016 // Have the thread wait on another event
1017 Test_LF_Event event2
;
1018 wrk1
.put (new Wait_For_Event(event2
, transport
, leader_follower
));
1020 // The thread is still waiting on the event and thus should now be a
1021 // client-leader thread (again)
1022 wrk1
.put (new TSS_Assert(orb_core
, 0, 2, true));
1024 // Call set_upcall_thread
1025 wrk1
.put (new Set_Upcall_Thread(leader_follower
));
1027 // We now need to synchronise with the worker, to make sure it
1028 // has processed all the cmds we have sent it
1029 synch_with_worker (wrk1
);
1031 // Now, complete the events, and then shutdown the cmd event loop
1032 event
.complete_event (leader_follower
);
1033 event2
.complete_event (leader_follower
);
1034 wrk1
.put (new Shutdown
);
1036 // The inner client has returned
1037 wrk1
.put (new TSS_Assert(orb_core
, 0, 1, true));
1039 // Shutdown the outer client thread
1040 wrk1
.put (new Shutdown
);
1042 // We should be back to the initial state
1043 wrk1
.put (new TSS_Assert(orb_core
, 0, 0, false));
1045 // Now shutdown the event loop thread
1046 wrk1
.put (new Shutdown
);
1047 wrk1
.put (new TSS_Assert(orb_core
, 0, 0, false));
1049 // Shutdown the worker (exit its main cmd processor)
1050 wrk1
.put (new Shutdown
);
1056 ACE_TMAIN(int argc
, ACE_TCHAR
*argv
[])
1058 // scope TSS holder within main scope
1059 // so we're certain it gets destroyed before the
1060 // ACE object manager
1061 ACE_TSS
<Worker
> workers_
;
1062 // provide global access
1063 workers_p
= &workers_
;
1067 CORBA::ORB_var orb
= CORBA::ORB_init (argc
, argv
);
1068 if (parse_args (argc
, argv
) != 0)
1071 // Make sure the reactor is initialised in the leader_follower
1072 ACE_Reactor
* reactor
= orb
->orb_core ()->leader_follower ().reactor ();
1073 TEST_ASSERT ((reactor
!= 0));
1076 Test_1 (orb
->orb_core ());
1077 Test_2 (orb
->orb_core ());
1078 Test_3 (orb
->orb_core ());
1079 Test_4 (orb
->orb_core ());
1080 Test_5 (orb
->orb_core ());
1081 Test_6 (orb
->orb_core ());
1082 Test_7 (orb
->orb_core ());
1083 Test_8 (orb
->orb_core ());
1084 Test_9 (orb
->orb_core ());
1085 Test_10 (orb
->orb_core ());
1087 ACE_DEBUG ((LM_DEBUG
, ACE_TEXT ("Testing complete.\n")));
1092 catch (const CORBA::Exception
& ex
)
1094 ex
._tao_print_exception ("Exception caught:");
1104 ACE_TMAIN(int /*argc*/, ACE_TCHAR
* /*argv*/ [])
1106 ACE_DEBUG ((LM_DEBUG
, ACE_TEXT ("This test only makes sense in an MT build.\n")));
1111 #endif // !ACE_HAS_THREADS
1113 // ****************************************************************