1 #include "ace/Get_Opt.h"
2 #include "ace/Global_Macros.h"
4 #include "tao/ORB_Core.h"
5 #include "tao/default_resource.h"
6 #include "tao/Leader_Follower.h"
7 #include "tao/LF_Event_Loop_Thread_Helper.h"
8 #include "tao/LF_Event.h"
9 #include "tao/Transport.h"
10 #include "ace/Task_T.h"
11 #include "tao/TAO_Export.h"
12 #include "ace/TP_Reactor.h"
15 #if defined (ACE_HAS_THREADS)
20 // TEST_ASSERT is exactly the same as ACE_ASSERT except it is active
21 // for both debug and *release* builds.
22 #define TEST_ASSERT(X) \
24 ACE_ERROR ((LM_ERROR, ACE_TEXT ("TEST_ASSERT: (%P|%t) file %N, line %l assertion failed for '%C'.%a\n"), \
28 #define TSS_ASSERT(TSS, LF, ELT, CLT, LA) \
29 TEST_ASSERT ((TSS->event_loop_thread_ == ELT)); \
30 TEST_ASSERT ((TSS->client_leader_thread_ == CLT)); \
31 TEST_ASSERT ((LF.leader_available () == LA));
36 parse_args (int argc
, ACE_TCHAR
*argv
[])
38 ACE_Get_Opt
get_opts (argc
, argv
, ACE_TEXT ("d"));
41 while ((c
= get_opts ()) != -1)
50 ACE_ERROR_RETURN ((LM_ERROR
,
57 // Indicates successful parsing of the command line
61 class Command
: public ACE_Message_Block
64 virtual int execute (Worker
*) = 0;
68 //////////////////////////////////////////////////////////////////////
69 // NOTE: Do *NOT* put the same msg into the msg queue more than once.
70 // This will confuse the msg queue and result in it dropping messages
71 //////////////////////////////////////////////////////////////////////
72 class Worker
: public ACE_Task
<ACE_SYNCH
>
80 virtual int close (u_long
= 0);
81 virtual int put (ACE_Message_Block
* mblk
, ACE_Time_Value
* tv
= 0);
83 void shutdown (bool do_shutdown
);
90 ACE_TSS
<Worker
> *workers_p
= 0;
91 #define workers (*workers_p)
96 ACE_DEBUG ((LM_DEBUG
, ACE_TEXT ("(%P|%t) Worker thread starting up.\n")));
97 // Register this worker
98 workers
.ts_object (const_cast<Worker
*> (this));
100 while (!shutdown_
&& retval
!= -1)
102 retval
= this->process_cmd ();
105 ACE_DEBUG ((LM_DEBUG
, ACE_TEXT ("(%P|%t) Worker thread shutting down.\n")));
// Worker::close: resets the `workers` ACE_TSS object pointer to 0 so the
// TSS holder no longer refers to this Worker. The u_long argument is
// unnamed and unused in the visible code; the return statement is not
// visible in this excerpt.
109 int Worker::close (u_long
)
111 // De-register this worker, otherwise the ACE_TSS will try to destroy it
112 workers
.ts_object (0);
// Worker::put: hands a message block to this worker.
//   mblk - the message block to enqueue.
//   tv   - timeout forwarded unchanged to putq ().
// Returns whatever putq () returns.
116 int Worker::put (ACE_Message_Block
* mblk
, ACE_Time_Value
* tv
)
118 return this->putq (mblk
, tv
);
// Worker::process_cmd: dequeues one message block with getq (). If getq ()
// returns -1 an error is logged and errno is compared against EWOULDBLOCK
// and ESHUTDOWN. Otherwise the block is downcast to Command and the cast is
// asserted to have succeeded. What is then done with the command, and the
// value returned, are not visible in this excerpt.
121 int Worker::process_cmd ()
123 ACE_Message_Block
*mb
= 0;
124 if (this->getq (mb
, 0) == -1)
// NOTE(review): "$!" does not look like an ACE log format directive and is
// presumably printed literally - confirm whether an errno directive such
// as %p or %m was intended.
126 ACE_ERROR ((LM_ERROR
, ACE_TEXT ("Error calling getq: $!\n")));
127 // Strangely, message queues return this instead of ETIME
128 if (errno
== EWOULDBLOCK
|| errno
== ESHUTDOWN
)
// Every queued message is expected to be a Command; a failed dynamic_cast
// is only caught by ACE_ASSERT.
132 Command
* cmd
= dynamic_cast <Command
*> (mb
);
133 ACE_ASSERT (cmd
!= 0);
// Worker::shutdown (setter): stores do_shutdown in shutdown_, the flag that
// the worker's command loop tests (`while (!shutdown_ && ...)`). Passing
// false re-enables command processing after an earlier shutdown.
139 void Worker::shutdown (bool do_shutdown
)
141 shutdown_
= do_shutdown
;
144 bool Worker::shutdown ()
149 class Test_Reactor
: public ACE_TP_Reactor
152 Test_Reactor (size_t max_number_of_handles
,
153 bool restart
= false,
154 ACE_Sig_Handler
*sh
= 0,
155 ACE_Timer_Queue
*tq
= 0,
156 bool mask_signals
= true,
157 int s_queue
= ACE_Select_Reactor_Token::FIFO
)
158 : ACE_TP_Reactor(max_number_of_handles
, restart
, sh
, tq
, mask_signals
, s_queue
) {}
161 // This is the method that the Leader_Follower object calls.
162 virtual int handle_events (ACE_Time_Value
* = 0)
164 if (TAO_debug_level
> 10)
165 ACE_DEBUG ((LM_DEBUG
, ACE_TEXT ("(%P|%t) Executing Test_Reactor::handle_events\n")));
166 // This is called by client leader threads. Note, the loop here
167 // glosses over the fact that the Leader_Follower code does not
168 // work quite the way we want it to. Namely, this logic:
169 // 1 - client leader thread detects when there are event loop
171 // 2 - client leader wakes up event loop threads via broadcast
172 // 3 - client leader temporarily gives up lock to allow event loop
173 // threads to take over leadership
174 // 4 - client leader thread takes lock again and loops around to
176 // The problem is that the gap between 3 & 4 is often not long
177 // enough for the event loop threads to get switched in and take
178 // ownership of the lock, even though the client leader thread
180 // Thus this code, once shutdown, will continually return and thus
181 // give the leader follower multiple chances to hand off to an
182 // event loop thread. This is not ideal but it will have to do
183 // until the leader follower code is fixed (if possible)
184 while (!workers
->shutdown())
185 // call this thread's (worker's) process_cmd method
186 workers
->process_cmd ();
190 virtual int handle_events (ACE_Time_Value
&)
192 return this->handle_events ();
196 // Our own Resource_Factory for testing purposes. This just returns
197 // our Test_Reactor to the Leader_Follower object via the ORB_Core.
198 class Test_Resource_Factory
: public TAO_Default_Resource_Factory
201 Test_Resource_Factory ()
204 virtual ACE_Reactor_Impl
* allocate_reactor_impl () const
206 ACE_Reactor_Impl
*impl
= 0;
207 ACE_NEW_RETURN (impl
,
208 Test_Reactor (ACE::max_handles (),
212 this->reactor_mask_signals_
,
213 ACE_Select_Reactor_Token::LIFO
),
221 // force export flag otherwise Windows will complain
222 #define TAO_Test_Export ACE_Proper_Export_Flag
224 ACE_FACTORY_DEFINE (TAO_Test
, Test_Resource_Factory
)
225 ACE_STATIC_SVC_DEFINE (Test_Resource_Factory
,
226 ACE_TEXT ("Resource_Factory"),
228 &ACE_SVC_NAME (Test_Resource_Factory
),
229 ACE_Service_Type::DELETE_THIS
230 | ACE_Service_Type::DELETE_OBJ
,
232 ACE_STATIC_SVC_REQUIRE (Test_Resource_Factory
);
234 int load_test_resources
=
235 ACE_Service_Config::process_directive (ace_svc_desc_Test_Resource_Factory
);
237 class Test_LF_Event
: public TAO_LF_Event
243 void complete_event (TAO_Leader_Follower
&lf
)
246 ACE_DEBUG ((LM_DEBUG
, ACE_TEXT ("(%P|%t) Completing event\n")));
247 this->state_changed (TAO_LF_Event::LFS_SUCCESS
, lf
);
251 virtual bool successful_i () const
253 return this->state_
== TAO_LF_Event::LFS_SUCCESS
;
256 virtual bool error_detected_i () const
258 return (this->state_
== TAO_LF_Event::LFS_FAILURE
259 || this->state_
== TAO_LF_Event::LFS_TIMEOUT
260 || this->state_
== TAO_LF_Event::LFS_CONNECTION_CLOSED
);
262 virtual void state_changed_i (LFS_STATE new_state
)
264 this->state_
= new_state
;
267 virtual bool is_state_final () const
269 if (this->state_
== TAO_LF_Event::LFS_TIMEOUT
||
270 this->state_
== TAO_LF_Event::LFS_FAILURE
)
276 class Test_Transport
: public TAO_Transport
279 Test_Transport (CORBA::ULong tag
,
280 TAO_ORB_Core
*orb_core
)
281 : TAO_Transport (tag
, orb_core
)
284 virtual int send_message (TAO_OutputCDR
&,
286 TAO_ServerRequest
* = 0,
287 TAO_Message_Semantics
= TAO_Message_Semantics (),
288 ACE_Time_Value
* = 0)
293 virtual ssize_t
send (iovec
*, int ,
295 const ACE_Time_Value
* = 0)
300 virtual ssize_t
recv (char *,
302 const ACE_Time_Value
* = 0)
307 virtual int messaging_init (CORBA::Octet
,
313 virtual ACE_Event_Handler
* event_handler_i ()
319 virtual TAO_Connection_Handler
* connection_handler_i ()
324 virtual int send_request (TAO_Stub
*,
327 TAO_Message_Semantics
,
// Shutdown: a Command whose execute () logs a debug message and then calls
// worker->shutdown (true) on the worker it is given. The value returned by
// execute () is not visible in this excerpt.
334 class Shutdown
: public Command
337 virtual int execute (Worker
* worker
)
340 ACE_DEBUG ((LM_DEBUG
, ACE_TEXT ("(%P|%t) Shutdown cmd\n")));
341 worker
->shutdown (true);
346 class TSS_Assert
: public Command
349 TSS_Assert (TAO_ORB_Core
* orb_core
,
352 bool leader_available
)
353 : orb_core_ (orb_core
),
354 elt_count_ (elt_count
),
355 clt_count_ (clt_count
),
356 leader_available_ (leader_available
)
359 virtual int execute (Worker
*)
362 ACE_DEBUG ((LM_DEBUG
, ACE_TEXT ("(%P|%t) Executing TSS_Assert(%d,%d,%d) cmd\n"),
363 elt_count_
, clt_count_
, leader_available_
));
364 TAO_Leader_Follower
&leader_follower
= orb_core_
->leader_follower ();
365 TAO_ORB_Core_TSS_Resources
* tss
= orb_core_
->get_tss_resources ();
366 TSS_ASSERT (tss
, leader_follower
,
367 elt_count_
, clt_count_
, leader_available_
);
371 TAO_ORB_Core
* orb_core_
;
372 const int elt_count_
;
373 const int clt_count_
;
374 const bool leader_available_
;
377 class Wait_For_Event
: public Command
380 Wait_For_Event (Test_LF_Event
& event
,
381 Test_Transport
& transport
,
382 TAO_Leader_Follower
& lf
)
384 transport_ (transport
),
387 virtual int execute (Worker
*)
390 ACE_DEBUG ((LM_DEBUG
, ACE_TEXT ("(%P|%t) Executing Wait_For_Event cmd\n")));
391 int retval
= lf_
.wait_for_event (&event_
, &transport_
, 0);
392 // The worker has probably been shutdown in order for the client
393 // leader event loop to exit - reactivate the worker so it can
394 // process msgs once we return
395 workers
->shutdown (false);
399 Test_LF_Event
& event_
;
400 Test_Transport
& transport_
;
401 TAO_Leader_Follower
& lf_
;
404 class Cond_Signal
: public Command
413 virtual int execute (Worker
*)
416 ACE_DEBUG ((LM_DEBUG
, ACE_TEXT ("(%P|%t) Executing Cond_Signal cmd\n")));
417 ACE_GUARD_RETURN (TAO_SYNCH_MUTEX
, guard
,this->cond_
.mutex (), 0);
418 return this->cond_
.signal ();
420 TAO_SYNCH_MUTEX
& lock ()
424 ACE_Condition_Thread_Mutex
& cond ()
428 virtual ACE_Message_Block
*release ()
430 // we need to only release once both the main and worker thread
431 // are done with this object - each signals this by calling this
434 if (this->ref_count_
== 0)
435 return ACE_Message_Block::release ();
440 TAO_SYNCH_MUTEX lock_
;
441 ACE_Condition_Thread_Mutex cond_
;
445 class Event_Loop_Thread
: public Command
448 Event_Loop_Thread(TAO_Leader_Follower
& lf
,
449 TAO_LF_Strategy
& lf_strategy
)
450 : lf_ (lf
), lf_strategy_ (lf_strategy
)
453 virtual int execute (Worker
* worker
)
456 ACE_DEBUG ((LM_DEBUG
, ACE_TEXT ("(%P|%t) Executing Event_Loop_Thread cmd\n")));
457 TAO_LF_Event_Loop_Thread_Helper
elt (lf_
, lf_strategy_
, 0);
458 while (!worker
->shutdown())
459 worker
->process_cmd ();
460 // The worker has been shutdown in order for this event loop
461 // thread to exit - reactivate the worker so it can process msgs
463 worker
->shutdown (false);
467 TAO_Leader_Follower
& lf_
;
468 TAO_LF_Strategy
& lf_strategy_
;
// Set_Upcall_Thread: a Command that holds a reference to a
// TAO_Leader_Follower (lf_). When executed it logs a debug message and
// calls lf_.set_upcall_thread (). The Worker argument of execute () is
// unnamed and unused in the visible code.
471 class Set_Upcall_Thread
: public Command
474 Set_Upcall_Thread (TAO_Leader_Follower
& lf
)
478 virtual int execute (Worker
*)
481 ACE_DEBUG ((LM_DEBUG
, ACE_TEXT ("(%P|%t) Executing Set_Upcall_Thread cmd\n")));
482 lf_
.set_upcall_thread ();
// Leader/follower object on which set_upcall_thread () is invoked.
487 TAO_Leader_Follower
& lf_
;
// synch_with_worker: allocates a Cond_Signal, acquires its lock with
// ACE_GUARD, and then TEST_ASSERTs that waiting on its condition variable
// returns 0 before a deadline of "now + 1 second".
// NOTE(review): the statement that hands `cond` to `worker` is not visible
// in this excerpt - presumably it is queued with worker.put (); confirm.
490 void synch_with_worker (Worker
& worker
)
492 // This object is released by the worker thread after it has
494 Cond_Signal
* cond
= new Cond_Signal
;
496 ACE_GUARD (TAO_SYNCH_MUTEX
, guard
, cond
->lock ());
// Build the deadline handed to wait (): one second added to the current
// time of day.
498 ACE_Time_Value
tv (1, 0);
499 tv
+= ACE_OS::gettimeofday ();
500 TEST_ASSERT ((cond
->cond ().wait (&tv
) == 0));
504 // 1 - Simple event loop thread test
505 void Test_1 (TAO_ORB_Core
* orb_core
)
507 TAO_LF_Strategy
&lf_strategy
= orb_core
->lf_strategy ();
508 TAO_Leader_Follower
&leader_follower
= orb_core
->leader_follower ();
509 TAO_ORB_Core_TSS_Resources
* tss
= orb_core
->get_tss_resources ();
511 ACE_DEBUG ((LM_DEBUG
, ACE_TEXT ("==========\n")));
512 ACE_DEBUG ((LM_DEBUG
, ACE_TEXT ("TEST #1 - Simple Event Loop call\n")));
514 TSS_ASSERT (tss
, leader_follower
, 0, 0, false);
516 std::unique_ptr
<TAO_LF_Event_Loop_Thread_Helper
>
517 elt (new TAO_LF_Event_Loop_Thread_Helper(leader_follower
,
520 TSS_ASSERT (tss
, leader_follower
, 1, 0, true);
523 TSS_ASSERT (tss
, leader_follower
, 0, 0, false);
526 // 2 - Nested event loop threads - no set_upcall_thread call
527 void Test_2 (TAO_ORB_Core
* orb_core
)
529 TAO_LF_Strategy
&lf_strategy
= orb_core
->lf_strategy ();
530 TAO_Leader_Follower
&leader_follower
= orb_core
->leader_follower ();
531 TAO_ORB_Core_TSS_Resources
* tss
= orb_core
->get_tss_resources ();
532 ACE_DEBUG ((LM_DEBUG
, ACE_TEXT ("==========\n")));
533 ACE_DEBUG ((LM_DEBUG
, ACE_TEXT ("TEST #2 - 2 nested Event Loop calls\n")));
535 TSS_ASSERT (tss
, leader_follower
, 0, 0, false);
537 std::unique_ptr
<TAO_LF_Event_Loop_Thread_Helper
>
538 elt1 (new TAO_LF_Event_Loop_Thread_Helper(leader_follower
,
540 TSS_ASSERT (tss
, leader_follower
, 1, 0, true);
542 std::unique_ptr
<TAO_LF_Event_Loop_Thread_Helper
>
543 elt2 (new TAO_LF_Event_Loop_Thread_Helper(leader_follower
,
545 TSS_ASSERT (tss
, leader_follower
, 2, 0, true);
548 TSS_ASSERT (tss
, leader_follower
, 1, 0, true);
551 TSS_ASSERT (tss
, leader_follower
, 0, 0, false);
554 // 3 - Nested event loop threads - with set_upcall_thread call
555 void Test_3 (TAO_ORB_Core
* orb_core
)
557 TAO_LF_Strategy
&lf_strategy
= orb_core
->lf_strategy ();
558 TAO_Leader_Follower
&leader_follower
= orb_core
->leader_follower ();
559 TAO_ORB_Core_TSS_Resources
* tss
= orb_core
->get_tss_resources ();
560 ACE_DEBUG ((LM_DEBUG
, ACE_TEXT ("==========\n")));
561 ACE_DEBUG ((LM_DEBUG
, ACE_TEXT ("TEST #3 - 2 nested Event Loop calls with set_upcall_thread\n")));
563 TSS_ASSERT (tss
, leader_follower
, 0, 0, false);
565 std::unique_ptr
<TAO_LF_Event_Loop_Thread_Helper
>
566 elt1 (new TAO_LF_Event_Loop_Thread_Helper(leader_follower
,
568 TSS_ASSERT (tss
, leader_follower
, 1, 0, true);
570 leader_follower
.set_upcall_thread ();
571 TSS_ASSERT (tss
, leader_follower
, 0, 0, false);
573 std::unique_ptr
<TAO_LF_Event_Loop_Thread_Helper
>
574 elt2 (new TAO_LF_Event_Loop_Thread_Helper(leader_follower
,
576 TSS_ASSERT (tss
, leader_follower
, 1, 0, true);
579 TSS_ASSERT (tss
, leader_follower
, 0, 0, false);
582 TSS_ASSERT (tss
, leader_follower
, 0, 0, false);
585 // 4 - client leader thread
586 void Test_4 (TAO_ORB_Core
* orb_core
)
588 TAO_Leader_Follower
&leader_follower
= orb_core
->leader_follower ();
589 ACE_DEBUG ((LM_DEBUG
, ACE_TEXT ("==========\n")));
590 ACE_DEBUG ((LM_DEBUG
, ACE_TEXT ("TEST #4 - Simple Client Leader thread\n")));
596 // Test initial conditions
597 wrk1
.put (new TSS_Assert(orb_core
, 0, 0, false));
599 // Have the thread wait on an event
601 Test_Transport
transport (0, orb_core
);
602 wrk1
.put (new Wait_For_Event(event
, transport
, leader_follower
));
604 // The thread is still waiting on the event and thus should
605 // now be a client-leader thread
606 wrk1
.put (new TSS_Assert(orb_core
, 0, 1, true));
608 // Synchronise with the thread before we complete its event
609 synch_with_worker (wrk1
);
610 // Complete the event
611 event
.complete_event (leader_follower
);
613 // The thread is still inside handle_events - shutdown the
615 wrk1
.put (new Shutdown
);
617 // The thread should now return from being a client leader
618 wrk1
.put (new TSS_Assert(orb_core
, 0, 0, false));
620 wrk1
.put (new Shutdown
);
624 // 5 - nested client leader thread
625 void Test_5 (TAO_ORB_Core
* orb_core
)
627 TAO_Leader_Follower
&leader_follower
= orb_core
->leader_follower ();
628 ACE_DEBUG ((LM_DEBUG
, ACE_TEXT ("==========\n")));
629 ACE_DEBUG ((LM_DEBUG
, ACE_TEXT ("TEST #5 - 2 nested Client Leader calls\n")));
635 // Test initial conditions
636 wrk1
.put (new TSS_Assert(orb_core
, 0, 0, false));
638 // Have the thread wait on an event
640 Test_Transport
transport (0, orb_core
);
641 wrk1
.put (new Wait_For_Event(event
, transport
, leader_follower
));
643 // The thread is still waiting on the event and thus should
644 // now be a client-leader thread
645 wrk1
.put (new TSS_Assert(orb_core
, 0, 1, true));
647 // Wait for another event
648 Test_LF_Event event2
;
649 wrk1
.put (new Wait_For_Event(event2
, transport
, leader_follower
));
651 // The thread is still waiting on the event and thus should
652 // now be a client-leader thread
653 wrk1
.put (new TSS_Assert(orb_core
, 0, 2, true));
655 // Synchronise with the thread before we complete its event
656 synch_with_worker (wrk1
);
658 // Complete the first event - nothing should happen
659 event
.complete_event (leader_follower
);
661 wrk1
.put (new TSS_Assert(orb_core
, 0, 2, true));
663 // Complete the second event - everything should unwind
664 synch_with_worker (wrk1
);
665 event2
.complete_event (leader_follower
);
667 // The thread is still inside handle_events - shutdown the
668 // event processing for the inner client leader
669 wrk1
.put (new Shutdown
);
671 wrk1
.put (new TSS_Assert(orb_core
, 0, 1, true));
673 // The thread is now in the handle_events for the outer
674 // client-leader - the event is already complete so just
675 // shutdown the cmd processing.
676 wrk1
.put (new Shutdown
);
678 // We should now be back at our initial state.
679 wrk1
.put (new TSS_Assert(orb_core
, 0, 0, false));
681 wrk1
.put (new Shutdown
);
685 // 6 - nested client leader thread with set_upcall_thread
686 void Test_6 (TAO_ORB_Core
* orb_core
)
688 TAO_Leader_Follower
&leader_follower
= orb_core
->leader_follower ();
689 ACE_DEBUG ((LM_DEBUG
, ACE_TEXT ("==========\n")));
690 ACE_DEBUG ((LM_DEBUG
, ACE_TEXT ("TEST #6 - 2 nested Client Leader calls with set_upcall_thread\n")));
696 // Test initial conditions
697 wrk1
.put (new TSS_Assert(orb_core
, 0, 0, false));
699 // Have the thread wait on an event
701 Test_Transport
transport (0, orb_core
);
702 wrk1
.put (new Wait_For_Event(event
, transport
, leader_follower
));
704 // The thread is still waiting on the event and thus should
705 // now be a client-leader thread
706 wrk1
.put (new TSS_Assert(orb_core
, 0, 1, true));
708 // Call set_upcall_thread
709 wrk1
.put (new Set_Upcall_Thread (leader_follower
));
710 wrk1
.put (new TSS_Assert(orb_core
, 0, 0, false));
712 // Wait for another event
713 Test_LF_Event event2
;
714 wrk1
.put (new Wait_For_Event(event2
, transport
, leader_follower
));
716 // The thread is still waiting on the event and thus should
717 // now be a client-leader thread
718 wrk1
.put (new TSS_Assert(orb_core
, 0, 1, true));
720 // Synchronise with the thread before we complete its event
721 synch_with_worker (wrk1
);
723 // Complete the first event - nothing should happen
724 event
.complete_event (leader_follower
);
726 wrk1
.put (new TSS_Assert(orb_core
, 0, 1, true));
728 // Complete the second event - everything should unwind
729 synch_with_worker (wrk1
);
730 event2
.complete_event (leader_follower
);
732 // The thread is still inside handle_events - shutdown the
733 // event processing for the inner client leader
734 wrk1
.put (new Shutdown
);
736 wrk1
.put (new TSS_Assert(orb_core
, 0, 0, false));
738 // The thread is now in the handle_events for the outer
739 // client-leader - the event is already complete so just
740 // shutdown the cmd processing.
741 wrk1
.put (new Shutdown
);
743 // We should now be back at our initial state.
744 wrk1
.put (new TSS_Assert(orb_core
, 0, 0, false));
746 wrk1
.put (new Shutdown
);
750 // 7 - 2 client leader threads with set_upcall_thread
751 void Test_7 (TAO_ORB_Core
* orb_core
)
753 TAO_Leader_Follower
&leader_follower
= orb_core
->leader_follower ();
754 ACE_DEBUG ((LM_DEBUG
, ACE_TEXT ("==========\n")));
755 ACE_DEBUG ((LM_DEBUG
, ACE_TEXT ("TEST #7 - Client Leader yields to another client thread\n")));
761 // Test initial conditions
762 wrk1
.put (new TSS_Assert(orb_core
, 0, 0, false));
764 // Have the thread wait on an event
766 Test_Transport
transport (0, orb_core
);
767 wrk1
.put (new Wait_For_Event(event
, transport
, leader_follower
));
769 // The thread is still waiting on the event and thus should
770 // now be a client-leader thread
771 wrk1
.put (new TSS_Assert(orb_core
, 0, 1, true));
772 // Wait for the first thread to be the client leader before we start
774 synch_with_worker (wrk1
);
776 // Create another worker and have it do the same
779 wrk2
.put (new TSS_Assert(orb_core
, 0, 0, true));
780 // Make sure this test is complete before the Set_Upcall_Thread below
781 synch_with_worker (wrk2
);
782 Test_LF_Event event2
;
783 wrk2
.put (new Wait_For_Event(event2
, transport
, leader_follower
));
784 // Note, we can't test the new thread here - it is blocked waiting on
785 // the follower cond var
786 // wrk2.put (new TSS_Assert(orb_core, 0, 1, true));
788 // Call set_upcall_thread on the first thread
789 wrk1
.put (new Set_Upcall_Thread (leader_follower
));
790 // Our second thread should now be the client leader and the first
791 // thread should not. Note, we need to first synchronise with
792 // thread 2 (to make sure it is in handle_events) to avoid race
794 synch_with_worker (wrk2
);
795 wrk2
.put (new TSS_Assert(orb_core
, 0, 1, true));
796 wrk1
.put (new TSS_Assert(orb_core
, 0, 0, true));
798 // We should now be able to shutdown the first thread - have it
799 // return from handle_events and complete its event. If it has to
800 // wait it will just go back to being a follower
801 wrk1
.put (new Shutdown
);
802 event
.complete_event (leader_follower
);
803 synch_with_worker (wrk1
);
804 wrk1
.put (new TSS_Assert(orb_core
, 0, 0, true));
805 // We can now shut-down the thread
806 wrk1
.put (new Shutdown
);
809 // Now shut-down the second thread
810 event2
.complete_event (leader_follower
);
811 wrk2
.put (new Shutdown
);
812 synch_with_worker (wrk2
);
813 wrk2
.put (new TSS_Assert(orb_core
, 0, 0, false));
814 wrk2
.put (new Shutdown
);
818 // 8 - client becomes leader when event loop thread dispatched
819 void Test_8 (TAO_ORB_Core
* orb_core
)
821 TAO_LF_Strategy
&lf_strategy
= orb_core
->lf_strategy ();
822 TAO_Leader_Follower
&leader_follower
= orb_core
->leader_follower ();
823 ACE_DEBUG ((LM_DEBUG
, ACE_TEXT ("==========\n")));
824 ACE_DEBUG ((LM_DEBUG
, ACE_TEXT ("TEST #8 - client becomes leader when event thread dispatched\n")));
830 // Test initial conditions
831 wrk1
.put (new TSS_Assert(orb_core
, 0, 0, false));
833 // Have the thread become an event loop thread
834 wrk1
.put (new Event_Loop_Thread (leader_follower
,
836 wrk1
.put (new TSS_Assert(orb_core
, 1, 0, true));
838 // Before we start the next thread synchronise with the first
839 synch_with_worker (wrk1
);
841 // Start another thread and have it wait on an event
844 wrk2
.put (new TSS_Assert(orb_core
, 0, 0, true));
846 Test_Transport
transport (0, orb_core
);
847 synch_with_worker (wrk2
);
848 wrk2
.put (new Wait_For_Event(event
, transport
, leader_follower
));
850 // The new thread is a follower and thus is waiting on the follower
851 // cond var - we can't test this other than to check if the leader
852 // follower has clients, however, because we can't synchronise with
853 // that thread such a test would contain a race condition.
855 // Now dispatch the event loop thread by having it call set_upcall_thread
856 wrk1
.put (new Set_Upcall_Thread (leader_follower
));
858 // the first worker should have given up leadership and the second
859 // thread should have assumed leadership. We have to synchronise
860 // with both threads before we can test anything, otherwise we could
861 // catch the window where there is no leader.
862 synch_with_worker (wrk1
);
863 synch_with_worker (wrk2
);
864 wrk1
.put (new TSS_Assert (orb_core
, 0, 0, true));
865 wrk2
.put (new TSS_Assert (orb_core
, 0, 1, true));
867 // OK, now shut everything down - first the event loop thread
868 wrk1
.put (new Shutdown
);
869 wrk1
.put (new TSS_Assert (orb_core
, 0, 0, true));
870 wrk1
.put (new Shutdown
);
873 // Now the client thread
874 wrk2
.put (new TSS_Assert (orb_core
, 0, 1, true));
875 synch_with_worker (wrk2
);
876 event
.complete_event (leader_follower
);
877 wrk2
.put (new Shutdown
);
878 wrk2
.put (new TSS_Assert (orb_core
, 0, 0, false));
879 wrk2
.put (new Shutdown
);
883 // 9 - client leader thread then event loop thread
884 void Test_9 (TAO_ORB_Core
* orb_core
)
886 TAO_LF_Strategy
&lf_strategy
= orb_core
->lf_strategy ();
887 TAO_Leader_Follower
&leader_follower
= orb_core
->leader_follower ();
888 ACE_DEBUG ((LM_DEBUG
, ACE_TEXT ("==========\n")));
889 ACE_DEBUG ((LM_DEBUG
, ACE_TEXT ("TEST #9 - Client Leader thread yields to Event Loop thread\n")));
895 // Test initial conditions
896 wrk1
.put (new TSS_Assert(orb_core
, 0, 0, false));
898 // Have the thread wait on an event
900 Test_Transport
transport (0, orb_core
);
901 wrk1
.put (new Wait_For_Event(event
, transport
, leader_follower
));
903 // The thread is still waiting on the event and thus should
904 // now be a client-leader thread
905 wrk1
.put (new TSS_Assert(orb_core
, 0, 1, true));
906 // We now need to synchronise with the worker, to make sure it
907 // has processed all the cmds we have sent it
908 synch_with_worker (wrk1
);
910 // create an event loop thread - this means a new worker
913 wrk2
.put (new Event_Loop_Thread (leader_follower
,
916 // Unfortunately there is no way to test if the event loop thread is
917 // where we expect it to be (the
918 // wait_for_client_leader_to_complete() method). The only thing we
919 // could check is the event_loop_threads_waiting_ count, however,
920 // that is private to the TAO_Leader_Follower class.
922 // We need to get the client leader thread to return from
923 // process_cmd() and allow it to surrender leadership to the waiting
924 // event loop thread - send it a shutdown. The TAO_Leader_Follower
925 // code may call handle_events a few more times, however, since the
926 // cmd processing is shutdown (and won't be reactivated until the
927 // event is complete) handle_events will just return.
928 wrk1
.put (new Shutdown
);
930 // Now test the new event loop thread
931 wrk2
.put (new TSS_Assert(orb_core
, 1, 0, true));
932 // Wait until the event loop thread is running before we test
934 synch_with_worker (wrk2
);
936 // We can't test the client thread either - it is blocked in a call
937 // to the event's cond var's wait() method. All we can do is
938 // complete the event, which will signal the cond var
939 event
.complete_event (leader_follower
);
941 // The client thread should return from wait_for_event
942 wrk1
.put (new TSS_Assert(orb_core
, 0, 0, true));
943 // And the event loop thread should still be 'running'
944 wrk2
.put (new TSS_Assert(orb_core
, 1, 0, true));
946 // Some other misc checks
947 synch_with_worker (wrk1
);
948 TEST_ASSERT ((leader_follower
.has_clients () == 0));
950 // OK, let's shut everything down now - the event loop thread
951 // requires two shutdown cmds, one to exit the event loop thread cmd
952 // and the second to exit the main cmd processor
953 wrk2
.put (new Shutdown
);
954 // Incidentally there is now no leader
955 wrk2
.put (new TSS_Assert(orb_core
, 0, 0, false));
956 wrk2
.put (new Shutdown
);
959 // Shutdown the other worker
960 wrk1
.put (new Shutdown
);
964 // 10 - ET1437460 (second problem)
965 void Test_10 (TAO_ORB_Core
* orb_core
)
967 TAO_LF_Strategy
&lf_strategy
= orb_core
->lf_strategy ();
968 TAO_Leader_Follower
&leader_follower
= orb_core
->leader_follower ();
969 ACE_DEBUG ((LM_DEBUG
, ACE_TEXT ("==========\n")));
970 ACE_DEBUG ((LM_DEBUG
, ACE_TEXT ("TEST #10 - ET1437460\n")));
972 // This scenario involves:
973 // - an event loop thread
974 // - which calls set_upcall_thread
975 // - then becomes a client leader
976 // - is dispatched and then becomes a client leader again
977 // (without calling set_upcall_thread)
978 // - calls set_upcall_thread
981 // Originally this caused the leaders_ member to get set to -1
982 // (the inner client leader still decremented leaders_ even
983 // though set_upcall_thread was called)
989 // Test initial conditions
990 wrk1
.put (new TSS_Assert(orb_core
, 0, 0, false));
992 // Have the thread become an event loop thread
993 wrk1
.put (new Event_Loop_Thread (leader_follower
,
996 // The thread should be an event loop thread
997 wrk1
.put (new TSS_Assert(orb_core
, 1, 0, true));
999 // call set_upcall_thread
1000 wrk1
.put (new Set_Upcall_Thread (leader_follower
));
1002 // The thread should no longer be an event loop thread
1003 wrk1
.put (new TSS_Assert(orb_core
, 0, 0, false));
1005 // Have the thread wait on an event
1006 Test_LF_Event event
;
1007 Test_Transport
transport (0, orb_core
);
1008 wrk1
.put (new Wait_For_Event(event
, transport
, leader_follower
));
1010 // The thread is still waiting on the event and thus should
1011 // now be a client-leader thread
1012 wrk1
.put (new TSS_Assert(orb_core
, 0, 1, true));
1014 // Have the thread wait on another event
1015 Test_LF_Event event2
;
1016 wrk1
.put (new Wait_For_Event(event2
, transport
, leader_follower
));
1018 // The thread is still waiting on the event and thus should now be a
1019 // client-leader thread (again)
1020 wrk1
.put (new TSS_Assert(orb_core
, 0, 2, true));
1022 // Call set_upcall_thread
1023 wrk1
.put (new Set_Upcall_Thread(leader_follower
));
1025 // We now need to synchronise with the worker, to make sure it
1026 // has processed all the cmds we have sent it
1027 synch_with_worker (wrk1
);
1029 // Now, complete the events, and then shutdown the cmd event loop
1030 event
.complete_event (leader_follower
);
1031 event2
.complete_event (leader_follower
);
1032 wrk1
.put (new Shutdown
);
1034 // The inner client has returned
1035 wrk1
.put (new TSS_Assert(orb_core
, 0, 1, true));
1037 // Shutdown the outer client thread
1038 wrk1
.put (new Shutdown
);
1040 // We should be back to the initial state
1041 wrk1
.put (new TSS_Assert(orb_core
, 0, 0, false));
1043 // Now shutdown the event loop thread
1044 wrk1
.put (new Shutdown
);
1045 wrk1
.put (new TSS_Assert(orb_core
, 0, 0, false));
1047 // Shutdown the other worker
1048 wrk1
.put (new Shutdown
);
1054 ACE_TMAIN(int argc
, ACE_TCHAR
*argv
[])
1056 // scope TSS holder within main scope
1057 // so we're certain it gets destroyed before the
1058 // ACE object manager
1059 ACE_TSS
<Worker
> workers_
;
1060 // provide global access
1061 workers_p
= &workers_
;
1065 CORBA::ORB_var orb
= CORBA::ORB_init (argc
, argv
);
1066 if (parse_args (argc
, argv
) != 0)
1069 // Make sure the reactor is initialised in the leader_follower
1070 ACE_Reactor
* reactor
= orb
->orb_core ()->leader_follower ().reactor ();
1071 TEST_ASSERT ((reactor
!= 0));
1074 Test_1 (orb
->orb_core ());
1075 Test_2 (orb
->orb_core ());
1076 Test_3 (orb
->orb_core ());
1077 Test_4 (orb
->orb_core ());
1078 Test_5 (orb
->orb_core ());
1079 Test_6 (orb
->orb_core ());
1080 Test_7 (orb
->orb_core ());
1081 Test_8 (orb
->orb_core ());
1082 Test_9 (orb
->orb_core ());
1083 Test_10 (orb
->orb_core ());
1085 ACE_DEBUG ((LM_DEBUG
, ACE_TEXT ("Testing complete.\n")));
1090 catch (const CORBA::Exception
& ex
)
1092 ex
._tao_print_exception ("Exception caught:");
1102 ACE_TMAIN(int /*argc*/, ACE_TCHAR
* /*argv*/ [])
1104 ACE_DEBUG ((LM_DEBUG
, ACE_TEXT ("This test only makes sense in an MT build.\n")));
1109 #endif // !ACE_HAS_THREADS
1111 // ****************************************************************