ACE+TAO-7_0_8
[ACE_TAO.git] / TAO / tests / Bug_3531b_Regression / server.cpp
blob44221124f4098dffb48b54a0b4ff293842d22413
#include "ace/Get_Opt.h"
#include "ace/Global_Macros.h"
#include "ace/Task.h"
#include "tao/ORB_Core.h"
#include "tao/default_resource.h"
#include "tao/Leader_Follower.h"
#include "tao/LF_Event_Loop_Thread_Helper.h"
#include "tao/LF_Event.h"
#include "tao/Transport.h"
#include "ace/Task_T.h"
#include "tao/TAO_Export.h"
#include "ace/TP_Reactor.h"
#include <atomic>
#include <memory>

15 #if defined (ACE_HAS_THREADS)
// Thread-count setting.  It is initialised here but nothing in this
// file reads it.
int nthreads = 4;

// Verbose tracing switch: parse_args() turns it on for the -d option
// and the workers/commands consult it before emitting trace output.
bool debug = false;
// TEST_ASSERT plays the same role as ACE_ASSERT, except that it stays
// active in both debug and *release* builds.  When X evaluates to
// false the stringified expression is logged at LM_ERROR.
// NOTE(review): termination relies on the trailing %a directive (fed
// the -1 argument) - confirm against the ACE_Log_Msg documentation.
#define TEST_ASSERT(X) \
  do \
    { \
      if (!(X)) \
        { \
          ACE_ERROR ((LM_ERROR, \
                      ACE_TEXT ("TEST_ASSERT: (%P|%t) file %N, line %l assertion failed for '%C'.%a\n"), \
                      #X, \
                      -1)); \
        } \
    } \
  while (0)
// Check the leader/follower bookkeeping in one go:
//   TSS - pointer to the calling thread's TAO_ORB_Core_TSS_Resources
//   LF  - the TAO_Leader_Follower object
//   ELT - expected value of TSS->event_loop_thread_
//   CLT - expected value of TSS->client_leader_thread_
//   LA  - expected result of LF.leader_available ()
// The three checks are wrapped in do/while(0) so the macro expands to
// exactly one statement and is safe under an unbraced if/else.  The
// arguments are pasted verbatim (no extra parentheses) so that the
// expression text logged by TEST_ASSERT stays readable; pass simple
// identifiers or literals only.
#define TSS_ASSERT(TSS, LF, ELT, CLT, LA) \
  do \
    { \
      TEST_ASSERT ((TSS->event_loop_thread_ == ELT)); \
      TEST_ASSERT ((TSS->client_leader_thread_ == CLT)); \
      TEST_ASSERT ((LF.leader_available () == LA)); \
    } \
  while (0)
33 class Worker;
35 int
36 parse_args (int argc, ACE_TCHAR *argv[])
38 ACE_Get_Opt get_opts (argc, argv, ACE_TEXT ("d"));
39 int c;
41 while ((c = get_opts ()) != -1)
42 switch (c)
44 case 'd':
45 debug = true;
46 break;
48 case '?':
49 default:
50 ACE_ERROR_RETURN ((LM_ERROR,
51 "usage: %s "
52 "-d"
53 "\n",
54 argv [0]),
55 -1);
57 // Indicates sucessful parsing of the command line
58 return 0;
61 class Command: public ACE_Message_Block
63 public:
64 virtual int execute (Worker*) = 0;
68 //////////////////////////////////////////////////////////////////////
69 // NOTE: Do *NOT* put the same msg into the msg queue more than once.
70 // This will confuse the msg queue and result it in dropping messages
71 //////////////////////////////////////////////////////////////////////
72 class Worker: public ACE_Task<ACE_SYNCH>
74 public:
75 Worker (void)
76 : shutdown_ (false)
79 virtual int svc ();
80 virtual int close (u_long = 0);
81 virtual int put (ACE_Message_Block * mblk, ACE_Time_Value * tv = 0);
82 int process_cmd (void);
83 void shutdown (bool do_shutdown);
84 bool shutdown (void);
86 private:
87 bool shutdown_;
90 ACE_TSS<Worker> *workers_p = 0;
91 #define workers (*workers_p)
93 int Worker::svc (void)
95 if (debug)
96 ACE_DEBUG ((LM_DEBUG, ACE_TEXT ("(%P|%t) Worker thread starting up.\n")));
97 // Register this worker
98 workers.ts_object (const_cast<Worker*> (this));
99 int retval = 0;
100 while (!shutdown_ && retval != -1)
102 retval = this->process_cmd ();
104 if (debug)
105 ACE_DEBUG ((LM_DEBUG, ACE_TEXT ("(%P|%t) Worker thread shutting down.\n")));
106 return retval;
109 int Worker::close (u_long)
111 // de-register this worker, otherwise the ACE_TSS will try to destroy it
112 workers.ts_object (0);
113 return 0;
116 int Worker::put (ACE_Message_Block * mblk, ACE_Time_Value * tv)
118 return this->putq (mblk, tv);
121 int Worker::process_cmd (void)
123 ACE_Message_Block *mb = 0;
124 if (this->getq (mb, 0) == -1)
126 ACE_ERROR ((LM_ERROR, ACE_TEXT ("Error calling getq: $!\n")));
127 // Strangely, message queues return this instead of ETIME
128 if (errno == EWOULDBLOCK || errno == ESHUTDOWN)
129 return 0;
130 return -1;
132 Command* cmd = dynamic_cast <Command*> (mb);
133 ACE_ASSERT (cmd != 0);
134 cmd->execute (this);
135 cmd->release ();
136 return 0;
139 void Worker::shutdown (bool do_shutdown)
141 shutdown_ = do_shutdown;
144 bool Worker::shutdown (void)
146 return shutdown_;
149 class Test_Reactor: public ACE_TP_Reactor
151 public:
152 Test_Reactor (size_t max_number_of_handles,
153 bool restart = false,
154 ACE_Sig_Handler *sh = 0,
155 ACE_Timer_Queue *tq = 0,
156 bool mask_signals = true,
157 int s_queue = ACE_Select_Reactor_Token::FIFO)
158 : ACE_TP_Reactor(max_number_of_handles, restart, sh, tq, mask_signals, s_queue) {}
161 // This is the method that the Leader_Follower object calls.
162 virtual int handle_events (ACE_Time_Value * = 0)
164 if (TAO_debug_level > 10)
165 ACE_DEBUG ((LM_DEBUG, ACE_TEXT ("(%P|%t) Executing Test_Reactor::handle_events\n")));
166 // This is called by client leader threads. Note, the loop here
167 // glosses over the fact that the Leader_Follower code does not
168 // work quite the way we want it to. Namely, this logic:
169 // 1 - client leader thread detects when there are event loop
170 // threads waiting
171 // 2 - client leader wakes up event loop threads via broadcast
172 // 3 - client leader temporarily gives up lock to allow event loop
173 // threads to take over leadership
174 // 4 - client leader thread takes lock again and loops around to
175 // become follower
176 // The problem is that the gap between 3 & 4 is often not long
177 // enough for the event loop threads to get switched in and take
178 // ovwnership of the lock, even though the client leader thread
179 // does a thr_yield!
180 // Thus this code, once shutdown, will continuely return and thus
181 // give the leader follower multiple chances to hand off to an
182 // event loop thread. This is not ideal but it will have to do
183 // until the leader follower code is fixed (if possible)
184 while (!workers->shutdown())
185 // call this thread's (worker's) process_cmd method
186 workers->process_cmd ();
187 return 0;
190 virtual int handle_events (ACE_Time_Value &)
192 return this->handle_events ();
// NOTE(review): this listing still carries the web viewer's line
// numbers and has lost its brace-only lines as well as two short
// argument lines (original lines 209 and 214, i.e. the second
// Test_Reactor constructor argument and the last ACE_NEW_RETURN
// argument).  Restore them from the upstream file before compiling.
196 // Our own Resource_Factory for testing purposes. This just returns
197 // our Test_Reactor to the Leader_Follower object via the ORB_Core.
198 class Test_Resource_Factory: public TAO_Default_Resource_Factory
200 public:
201 Test_Resource_Factory ()
// Allocate the reactor implementation used by the test: a
// Test_Reactor sized by ACE::max_handles (), with null signal handler
// and timer queue, this factory's reactor_mask_signals_ setting and
// the LIFO token policy.  The new object is returned to the caller.
204 virtual ACE_Reactor_Impl* allocate_reactor_impl () const
206 ACE_Reactor_Impl *impl = 0;
207 ACE_NEW_RETURN (impl,
208 Test_Reactor (ACE::max_handles (),
210 (ACE_Sig_Handler*)0,
211 (ACE_Timer_Queue*)0,
212 this->reactor_mask_signals_,
213 ACE_Select_Reactor_Token::LIFO),
215 return impl;
218 private:
// Registers Test_Resource_Factory under the service name
// "Resource_Factory" and feeds the resulting static service
// descriptor to the service configurator during static
// initialisation.
// NOTE(review): presumably this makes the ORB pick up
// Test_Resource_Factory instead of the default resource factory -
// confirm against the ACE Service Configurator documentation.
// NOTE(review): original line 231 (the final ACE_STATIC_SVC_DEFINE
// argument and closing parenthesis) is missing from this listing.
221 // force export flag otherwise Windows will complain
222 #define TAO_Test_Export ACE_Proper_Export_Flag
224 ACE_FACTORY_DEFINE (TAO_Test, Test_Resource_Factory)
225 ACE_STATIC_SVC_DEFINE (Test_Resource_Factory,
226 ACE_TEXT ("Resource_Factory"),
227 ACE_SVC_OBJ_T,
228 &ACE_SVC_NAME (Test_Resource_Factory),
229 ACE_Service_Type::DELETE_THIS
230 | ACE_Service_Type::DELETE_OBJ,
232 ACE_STATIC_SVC_REQUIRE (Test_Resource_Factory);
// Holds the result of processing the static service descriptor above.
234 int load_test_resources =
235 ACE_Service_Config::process_directive (ace_svc_desc_Test_Resource_Factory);
237 class Test_LF_Event: public TAO_LF_Event
239 public:
240 Test_LF_Event()
243 void complete_event (TAO_Leader_Follower &lf)
245 if (debug)
246 ACE_DEBUG ((LM_DEBUG, ACE_TEXT ("(%P|%t) Completing event\n")));
247 this->state_changed (TAO_LF_Event::LFS_SUCCESS, lf);
250 protected:
251 virtual bool successful_i () const
253 return this->state_ == TAO_LF_Event::LFS_SUCCESS;
256 virtual bool error_detected_i () const
258 return (this->state_ == TAO_LF_Event::LFS_FAILURE
259 || this->state_ == TAO_LF_Event::LFS_TIMEOUT
260 || this->state_ == TAO_LF_Event::LFS_CONNECTION_CLOSED);
262 virtual void state_changed_i (LFS_STATE new_state)
264 this->state_ = new_state;
267 virtual bool is_state_final () const
269 if (this->state_ == TAO_LF_Event::LFS_TIMEOUT ||
270 this->state_ == TAO_LF_Event::LFS_FAILURE)
271 return true;
272 return false;
276 class Test_Transport : public TAO_Transport
278 public:
279 Test_Transport (CORBA::ULong tag,
280 TAO_ORB_Core *orb_core)
281 : TAO_Transport (tag, orb_core)
284 virtual int send_message (TAO_OutputCDR &,
285 TAO_Stub * = 0,
286 TAO_ServerRequest * = 0,
287 TAO_Message_Semantics = TAO_Message_Semantics (),
288 ACE_Time_Value * = 0)
290 return 0;
293 virtual ssize_t send (iovec *, int ,
294 size_t &,
295 const ACE_Time_Value * = 0)
297 return 0;
300 virtual ssize_t recv (char *,
301 size_t,
302 const ACE_Time_Value * = 0)
304 return 0;
307 virtual int messaging_init (CORBA::Octet,
308 CORBA::Octet)
310 return 0;
313 virtual ACE_Event_Handler * event_handler_i (void)
315 return 0;
318 protected:
319 virtual TAO_Connection_Handler * connection_handler_i (void)
321 return 0;
324 virtual int send_request (TAO_Stub *,
325 TAO_ORB_Core *,
326 TAO_OutputCDR &,
327 TAO_Message_Semantics,
328 ACE_Time_Value *)
330 return 0;
335 class Shutdown: public Command
337 public:
338 virtual int execute (Worker* worker)
340 if (debug)
341 ACE_DEBUG ((LM_DEBUG, ACE_TEXT ("(%P|%t) Shutdown cmd\n")));
342 worker->shutdown (true);
343 return 0;
348 class TSS_Assert: public Command
350 public:
351 TSS_Assert (TAO_ORB_Core* orb_core,
352 int elt_count,
353 int clt_count,
354 bool leader_available)
355 : orb_core_ (orb_core),
356 elt_count_ (elt_count),
357 clt_count_ (clt_count),
358 leader_available_ (leader_available)
361 virtual int execute (Worker*)
363 if (debug)
364 ACE_DEBUG ((LM_DEBUG, ACE_TEXT ("(%P|%t) Executing TSS_Assert(%d,%d,%d) cmd\n"),
365 elt_count_, clt_count_, leader_available_));
366 TAO_Leader_Follower &leader_follower = orb_core_->leader_follower ();
367 TAO_ORB_Core_TSS_Resources* tss = orb_core_->get_tss_resources ();
368 TSS_ASSERT (tss, leader_follower,
369 elt_count_, clt_count_, leader_available_);
370 return 0;
372 private:
373 TAO_ORB_Core* orb_core_;
374 const int elt_count_;
375 const int clt_count_;
376 const bool leader_available_;
379 class Wait_For_Event: public Command
381 public:
382 Wait_For_Event (Test_LF_Event& event,
383 Test_Transport& transport,
384 TAO_Leader_Follower& lf)
385 : event_ (event),
386 transport_ (transport),
387 lf_ (lf)
389 virtual int execute (Worker*)
391 if (debug)
392 ACE_DEBUG ((LM_DEBUG, ACE_TEXT ("(%P|%t) Executing Wait_For_Event cmd\n")));
393 int retval = lf_.wait_for_event (&event_, &transport_, 0);
394 // The worker has probably been shutdown in order for the client
395 // leader event loop to exit - reactivate the worker so it from
396 // process msgs once we return
397 workers->shutdown (false);
398 return retval;
400 private:
401 Test_LF_Event& event_;
402 Test_Transport& transport_;
403 TAO_Leader_Follower& lf_;
406 class Cond_Signal: public Command
408 public:
409 Cond_Signal ()
410 : lock_ (),
411 cond_ (lock_),
412 ref_count_ (2)
415 virtual int execute (Worker*)
417 if (debug)
418 ACE_DEBUG ((LM_DEBUG, ACE_TEXT ("(%P|%t) Executing Cond_Signal cmd\n")));
419 ACE_GUARD_RETURN (TAO_SYNCH_MUTEX, guard ,this->cond_.mutex (), 0);
420 return this->cond_.signal ();
422 TAO_SYNCH_MUTEX& lock ()
424 return lock_;
426 ACE_Condition_Thread_Mutex& cond ()
428 return cond_;
430 virtual ACE_Message_Block *release (void)
432 // we need to only release once both the main and worker thread
433 // are done with this object - each signals this by calling this
434 // method
435 --this->ref_count_;
436 if (this->ref_count_ == 0)
437 return ACE_Message_Block::release ();
438 return this;
441 private:
442 TAO_SYNCH_MUTEX lock_;
443 ACE_Condition_Thread_Mutex cond_;
444 int ref_count_;
447 class Event_Loop_Thread: public Command
449 public:
450 Event_Loop_Thread(TAO_Leader_Follower& lf,
451 TAO_LF_Strategy& lf_strategy)
452 : lf_ (lf), lf_strategy_ (lf_strategy)
455 virtual int execute (Worker* worker)
457 if (debug)
458 ACE_DEBUG ((LM_DEBUG, ACE_TEXT ("(%P|%t) Executing Event_Loop_Thread cmd\n")));
459 TAO_LF_Event_Loop_Thread_Helper elt (lf_, lf_strategy_, 0);
460 while (!worker->shutdown())
461 worker->process_cmd ();
462 // The worker has been shutdown in order for this event loop
463 // thread to exit - reactivate the worker so it from process msgs
464 // once we return
465 worker->shutdown (false);
466 return 0;
468 private:
469 TAO_Leader_Follower& lf_;
470 TAO_LF_Strategy& lf_strategy_;
473 class Set_Upcall_Thread: public Command
475 public:
476 Set_Upcall_Thread (TAO_Leader_Follower& lf)
477 : lf_ (lf)
480 virtual int execute (Worker*)
482 if (debug)
483 ACE_DEBUG ((LM_DEBUG, ACE_TEXT ("(%P|%t) Executing Set_Upcall_Thread cmd\n")));
484 lf_.set_upcall_thread ();
485 return 0;
488 private:
489 TAO_Leader_Follower& lf_;
492 void synch_with_worker (Worker& worker)
494 // This object is released by the worker thread after it has
495 // executed the cmd
496 Cond_Signal* cond = new Cond_Signal;
498 ACE_GUARD (TAO_SYNCH_MUTEX, guard, cond->lock ());
499 worker.put (cond);
500 ACE_Time_Value tv (1, 0);
501 tv += ACE_OS::gettimeofday ();
502 TEST_ASSERT ((cond->cond ().wait (&tv) == 0));
506 // 1 - Simple event loop thread test
507 void Test_1 (TAO_ORB_Core* orb_core)
509 TAO_LF_Strategy &lf_strategy = orb_core->lf_strategy ();
510 TAO_Leader_Follower &leader_follower = orb_core->leader_follower ();
511 TAO_ORB_Core_TSS_Resources* tss = orb_core->get_tss_resources ();
513 ACE_DEBUG ((LM_DEBUG, ACE_TEXT ("==========\n")));
514 ACE_DEBUG ((LM_DEBUG, ACE_TEXT ("TEST #1 - Simple Event Loop call\n")));
516 TSS_ASSERT (tss, leader_follower, 0, 0, false);
518 std::unique_ptr<TAO_LF_Event_Loop_Thread_Helper>
519 elt (new TAO_LF_Event_Loop_Thread_Helper(leader_follower,
520 lf_strategy,
521 0));
522 TSS_ASSERT (tss, leader_follower, 1, 0, true);
524 elt.reset (0);
525 TSS_ASSERT (tss, leader_follower, 0, 0, false);
528 // 2 - Nested event loop threads - no set_upcall_thread call
529 void Test_2 (TAO_ORB_Core* orb_core)
531 TAO_LF_Strategy &lf_strategy = orb_core->lf_strategy ();
532 TAO_Leader_Follower &leader_follower = orb_core->leader_follower ();
533 TAO_ORB_Core_TSS_Resources* tss = orb_core->get_tss_resources ();
534 ACE_DEBUG ((LM_DEBUG, ACE_TEXT ("==========\n")));
535 ACE_DEBUG ((LM_DEBUG, ACE_TEXT ("TEST #2 - 2 nested Event Loop calls\n")));
537 TSS_ASSERT (tss, leader_follower, 0, 0, false);
539 std::unique_ptr<TAO_LF_Event_Loop_Thread_Helper>
540 elt1 (new TAO_LF_Event_Loop_Thread_Helper(leader_follower,
541 lf_strategy, 0));
542 TSS_ASSERT (tss, leader_follower, 1, 0, true);
544 std::unique_ptr<TAO_LF_Event_Loop_Thread_Helper>
545 elt2 (new TAO_LF_Event_Loop_Thread_Helper(leader_follower,
546 lf_strategy, 0));
547 TSS_ASSERT (tss, leader_follower, 2, 0, true);
549 elt2.reset (0);
550 TSS_ASSERT (tss, leader_follower, 1, 0, true);
552 elt1.reset (0);
553 TSS_ASSERT (tss, leader_follower, 0, 0, false);
556 // 3 - Nested event loop threads - with set_upcall_thread call
557 void Test_3 (TAO_ORB_Core* orb_core)
559 TAO_LF_Strategy &lf_strategy = orb_core->lf_strategy ();
560 TAO_Leader_Follower &leader_follower = orb_core->leader_follower ();
561 TAO_ORB_Core_TSS_Resources* tss = orb_core->get_tss_resources ();
562 ACE_DEBUG ((LM_DEBUG, ACE_TEXT ("==========\n")));
563 ACE_DEBUG ((LM_DEBUG, ACE_TEXT ("TEST #3 - 2 nested Event Loop calls with set_upcall_thread\n")));
565 TSS_ASSERT (tss, leader_follower, 0, 0, false);
567 std::unique_ptr<TAO_LF_Event_Loop_Thread_Helper>
568 elt1 (new TAO_LF_Event_Loop_Thread_Helper(leader_follower,
569 lf_strategy, 0));
570 TSS_ASSERT (tss, leader_follower, 1, 0, true);
572 leader_follower.set_upcall_thread ();
573 TSS_ASSERT (tss, leader_follower, 0, 0, false);
575 std::unique_ptr<TAO_LF_Event_Loop_Thread_Helper>
576 elt2 (new TAO_LF_Event_Loop_Thread_Helper(leader_follower,
577 lf_strategy, 0));
578 TSS_ASSERT (tss, leader_follower, 1, 0, true);
580 elt2.reset (0);
581 TSS_ASSERT (tss, leader_follower, 0, 0, false);
583 elt1.reset (0);
584 TSS_ASSERT (tss, leader_follower, 0, 0, false);
587 // 4 - client leader thread
588 void Test_4 (TAO_ORB_Core* orb_core)
590 TAO_Leader_Follower &leader_follower = orb_core->leader_follower ();
591 ACE_DEBUG ((LM_DEBUG, ACE_TEXT ("==========\n")));
592 ACE_DEBUG ((LM_DEBUG, ACE_TEXT ("TEST #4 - Simple Client Leader thread\n")));
594 // Activate a thread
595 Worker wrk1;
596 wrk1.activate ();
598 // Test initial conditions
599 wrk1.put (new TSS_Assert(orb_core, 0, 0, false));
601 // Have the thread wait on an event
602 Test_LF_Event event;
603 Test_Transport transport (0, orb_core);
604 wrk1.put (new Wait_For_Event(event, transport, leader_follower));
606 // The thread is still waiting on the event and thus should
607 // now be a client-leader thread
608 wrk1.put (new TSS_Assert(orb_core, 0, 1, true));
610 // Synchronise with the thread before we complete its event
611 synch_with_worker (wrk1);
612 // Complete the event
613 event.complete_event (leader_follower);
615 // The thread is still inside handle_events - shutdown the
616 // event processing
617 wrk1.put (new Shutdown);
619 // The thread should now return from being a client leader
620 wrk1.put (new TSS_Assert(orb_core, 0, 0, false));
622 wrk1.put (new Shutdown);
623 wrk1.wait ();
626 // 5 - nested client leader thread
627 void Test_5 (TAO_ORB_Core* orb_core)
629 TAO_Leader_Follower &leader_follower = orb_core->leader_follower ();
630 ACE_DEBUG ((LM_DEBUG, ACE_TEXT ("==========\n")));
631 ACE_DEBUG ((LM_DEBUG, ACE_TEXT ("TEST #5 - 2 nested Client Leader calls\n")));
633 // Activate a thread
634 Worker wrk1;
635 wrk1.activate ();
637 // Test initial conditions
638 wrk1.put (new TSS_Assert(orb_core, 0, 0, false));
640 // Have the thread wait on an event
641 Test_LF_Event event;
642 Test_Transport transport (0, orb_core);
643 wrk1.put (new Wait_For_Event(event, transport, leader_follower));
645 // The thread is still waiting on the event and thus should
646 // now be a client-leader thread
647 wrk1.put (new TSS_Assert(orb_core, 0, 1, true));
649 // Wait for another event
650 Test_LF_Event event2;
651 wrk1.put (new Wait_For_Event(event2, transport, leader_follower));
653 // The thread is still waiting on the event and thus should
654 // now be a client-leader thread
655 wrk1.put (new TSS_Assert(orb_core, 0, 2, true));
657 // Synchronise with the thread before we complete its event
658 synch_with_worker (wrk1);
660 // Complete the first event - nothing should happen
661 event.complete_event (leader_follower);
663 wrk1.put (new TSS_Assert(orb_core, 0, 2, true));
665 // Complete the second event - everything should unwind
666 synch_with_worker (wrk1);
667 event2.complete_event (leader_follower);
669 // The thread is still inside handle_events - shutdown the
670 // event processing for the inner client leader
671 wrk1.put (new Shutdown);
673 wrk1.put (new TSS_Assert(orb_core, 0, 1, true));
675 // The thread is now in the handle_events for the outter
676 // client-leader - the event is already complete so just
677 // shutdown the cmd processing.
678 wrk1.put (new Shutdown);
680 // We should now we back at our initial state.
681 wrk1.put (new TSS_Assert(orb_core, 0, 0, false));
683 wrk1.put (new Shutdown);
684 wrk1.wait ();
687 // 6 - nested client leader thread with set_upcall_thread
688 void Test_6 (TAO_ORB_Core* orb_core)
690 TAO_Leader_Follower &leader_follower = orb_core->leader_follower ();
691 ACE_DEBUG ((LM_DEBUG, ACE_TEXT ("==========\n")));
692 ACE_DEBUG ((LM_DEBUG, ACE_TEXT ("TEST #6 - 2 nested Client Leader calls with set_upcall_thread\n")));
694 // Activate a thread
695 Worker wrk1;
696 wrk1.activate ();
698 // Test initial conditions
699 wrk1.put (new TSS_Assert(orb_core, 0, 0, false));
701 // Have the thread wait on an event
702 Test_LF_Event event;
703 Test_Transport transport (0, orb_core);
704 wrk1.put (new Wait_For_Event(event, transport, leader_follower));
706 // The thread is still waiting on the event and thus should
707 // now be a client-leader thread
708 wrk1.put (new TSS_Assert(orb_core, 0, 1, true));
710 // Call set_upcall_thread
711 wrk1.put (new Set_Upcall_Thread (leader_follower));
712 wrk1.put (new TSS_Assert(orb_core, 0, 0, false));
714 // Wait for another event
715 Test_LF_Event event2;
716 wrk1.put (new Wait_For_Event(event2, transport, leader_follower));
718 // The thread is still waiting on the event and thus should
719 // now be a client-leader thread
720 wrk1.put (new TSS_Assert(orb_core, 0, 1, true));
722 // Synchronise with the thread before we complete its event
723 synch_with_worker (wrk1);
725 // Complete the first event - nothing should happen
726 event.complete_event (leader_follower);
728 wrk1.put (new TSS_Assert(orb_core, 0, 1, true));
730 // Complete the second event - everything should unwind
731 synch_with_worker (wrk1);
732 event2.complete_event (leader_follower);
734 // The thread is still inside handle_events - shutdown the
735 // event processing for the inner client leader
736 wrk1.put (new Shutdown);
738 wrk1.put (new TSS_Assert(orb_core, 0, 0, false));
740 // The thread is now in the handle_events for the outter
741 // client-leader - the event is already complete so just
742 // shutdown the cmd processing.
743 wrk1.put (new Shutdown);
745 // We should now we back at our initial state.
746 wrk1.put (new TSS_Assert(orb_core, 0, 0, false));
748 wrk1.put (new Shutdown);
749 wrk1.wait ();
752 // 7 - 2 client leader threads with set_upcall_thread
753 void Test_7 (TAO_ORB_Core* orb_core)
755 TAO_Leader_Follower &leader_follower = orb_core->leader_follower ();
756 ACE_DEBUG ((LM_DEBUG, ACE_TEXT ("==========\n")));
757 ACE_DEBUG ((LM_DEBUG, ACE_TEXT ("TEST #7 - Client Leader yields to another client thread\n")));
759 // Activate a thread
760 Worker wrk1;
761 wrk1.activate ();
763 // Test initial conditions
764 wrk1.put (new TSS_Assert(orb_core, 0, 0, false));
766 // Have the thread wait on an event
767 Test_LF_Event event;
768 Test_Transport transport (0, orb_core);
769 wrk1.put (new Wait_For_Event(event, transport, leader_follower));
771 // The thread is still waiting on the event and thus should
772 // now be a client-leader thread
773 wrk1.put (new TSS_Assert(orb_core, 0, 1, true));
774 // Wait for the firs thread to be the client leader before we start
775 // the second thread
776 synch_with_worker (wrk1);
778 // Create another worker and have it do the same
779 Worker wrk2;
780 wrk2.activate ();
781 wrk2.put (new TSS_Assert(orb_core, 0, 0, true));
782 // Make sure this test is complete before the Set_Upcall_Thread below
783 synch_with_worker (wrk2);
784 Test_LF_Event event2;
785 wrk2.put (new Wait_For_Event(event2, transport, leader_follower));
786 // Note, we can't test the new thread here - it is block waiting on
787 // the follower cond var
788 // wrk2.put (new TSS_Assert(orb_core, 0, 1, true));
790 // Call set_upcall_thread on the first thread
791 wrk1.put (new Set_Upcall_Thread (leader_follower));
792 // Our second thread should now be the client leader and the first
793 // thread should not. Note, we need to first synchronise with
794 // thread 2 (to make sure it is in handle_events) to avoid race
795 // conditions.
796 synch_with_worker (wrk2);
797 wrk2.put (new TSS_Assert(orb_core, 0, 1, true));
798 wrk1.put (new TSS_Assert(orb_core, 0, 0, true));
800 // We should now be able to shutdown the first thread - have it
801 // return from handle_events and complete its event. If it has to
802 // wait it will just go back to being a follower
803 wrk1.put (new Shutdown);
804 event.complete_event (leader_follower);
805 synch_with_worker (wrk1);
806 wrk1.put (new TSS_Assert(orb_core, 0, 0, true));
807 // We can now shut-down the thread
808 wrk1.put (new Shutdown);
809 wrk1.wait ();
811 // Now shut-down the second thread
812 event2.complete_event (leader_follower);
813 wrk2.put (new Shutdown);
814 synch_with_worker (wrk2);
815 wrk2.put (new TSS_Assert(orb_core, 0, 0, false));
816 wrk2.put (new Shutdown);
817 wrk2.wait ();
820 // 8 - client becomes leader when event loop thread dispatched
821 void Test_8 (TAO_ORB_Core* orb_core)
823 TAO_LF_Strategy &lf_strategy = orb_core->lf_strategy ();
824 TAO_Leader_Follower &leader_follower = orb_core->leader_follower ();
825 ACE_DEBUG ((LM_DEBUG, ACE_TEXT ("==========\n")));
826 ACE_DEBUG ((LM_DEBUG, ACE_TEXT ("TEST #8 - client becomes leader when event thread dispatched\n")));
828 // Activate a thread
829 Worker wrk1;
830 wrk1.activate ();
832 // Test initial conditions
833 wrk1.put (new TSS_Assert(orb_core, 0, 0, false));
835 // Have the thread become an event loop thread
836 wrk1.put (new Event_Loop_Thread (leader_follower,
837 lf_strategy));
838 wrk1.put (new TSS_Assert(orb_core, 1, 0, true));
840 // Before we start the next thread synchronise with the first
841 synch_with_worker (wrk1);
843 // Start another thread and have it wait on an event
844 Worker wrk2;
845 wrk2.activate ();
846 wrk2.put (new TSS_Assert(orb_core, 0, 0, true));
847 Test_LF_Event event;
848 Test_Transport transport (0, orb_core);
849 synch_with_worker (wrk2);
850 wrk2.put (new Wait_For_Event(event, transport, leader_follower));
852 // The new thread is a follower and thus is waiting on the follower
853 // cond var - we can't test this other than to check if the leader
854 // follower has clients, however, because we can't synchronise with
855 // that thread such a test would contain a race condition.
857 // Now dispatch the event loop thread by having it call set_upcall_thread
858 wrk1.put (new Set_Upcall_Thread (leader_follower));
860 // the first worker should have given up leadership and the second
861 // thread should have assumed leadership. We have to synchronise
862 // with both threads before we can test anything, otherwise we could
863 // catch the window where there is no leader.
864 synch_with_worker (wrk1);
865 synch_with_worker (wrk2);
866 wrk1.put (new TSS_Assert (orb_core, 0, 0, true));
867 wrk2.put (new TSS_Assert (orb_core, 0, 1, true));
869 // OK, now shut everything down - first the event loop thread
870 wrk1.put (new Shutdown);
871 wrk1.put (new TSS_Assert (orb_core, 0, 0, true));
872 wrk1.put (new Shutdown);
873 wrk1.wait ();
875 // Now the client thread
876 wrk2.put (new TSS_Assert (orb_core, 0, 1, true));
877 synch_with_worker (wrk2);
878 event.complete_event (leader_follower);
879 wrk2.put (new Shutdown);
880 wrk2.put (new TSS_Assert (orb_core, 0, 0, false));
881 wrk2.put (new Shutdown);
882 wrk2.wait ();
885 // 9 - client leader thread then event loop thread
886 void Test_9 (TAO_ORB_Core* orb_core)
888 TAO_LF_Strategy &lf_strategy = orb_core->lf_strategy ();
889 TAO_Leader_Follower &leader_follower = orb_core->leader_follower ();
890 ACE_DEBUG ((LM_DEBUG, ACE_TEXT ("==========\n")));
891 ACE_DEBUG ((LM_DEBUG, ACE_TEXT ("TEST #9 - Client Leader thread yields to Event Loop thread\n")));
893 // Activate a thread
894 Worker wrk1;
895 wrk1.activate ();
897 // Test initial conditions
898 wrk1.put (new TSS_Assert(orb_core, 0, 0, false));
900 // Have the thread wait on an event
901 Test_LF_Event event;
902 Test_Transport transport (0, orb_core);
903 wrk1.put (new Wait_For_Event(event, transport, leader_follower));
905 // The thread is still waiting on the event and thus should
906 // now be a client-leader thread
907 wrk1.put (new TSS_Assert(orb_core, 0, 1, true));
908 // We now need to synchronise with the worker, to make sure it
909 // has processed all the cmds we have sent it
910 synch_with_worker (wrk1);
912 // create an event loop thread - this means a new worker
913 Worker wrk2;
914 wrk2.activate ();
915 wrk2.put (new Event_Loop_Thread (leader_follower,
916 lf_strategy));
918 // Unfortunately there is no way to test if the event loop thread is
919 // where we expect it to be (the
920 // wait_for_client_leader_to_complete() method). The only thing we
921 // could check is the event_loop_threads_waiting_ count, however,
922 // that is private to the TAO_Leader_Follower class.
924 // We need to get the client leader thread to return from
925 // process_cmd() and allow it to surrender leadership to the waiting
926 // event loop thread - send it a shutdown. The TAO_Leader_Follower
927 // code may call handle_events a few more times, however, since the
928 // cmd processing is shutdown (and won't be reactivated until the
929 // event is complete) handle_events will just return.
930 wrk1.put (new Shutdown);
932 // Now test the new event loop thread
933 wrk2.put (new TSS_Assert(orb_core, 1, 0, true));
934 // Wait until the event loop thread is running before we test
935 // the client thread
936 synch_with_worker (wrk2);
938 // We can't test the client thread either - it is blocked in a call
939 // to the event's cond var's wait() method. All we can do is
940 // complete the event, which will signal the cond var
941 event.complete_event (leader_follower);
943 // The client thread should return from wait_for_event
944 wrk1.put (new TSS_Assert(orb_core, 0, 0, true));
945 // And the event loop thread should still be 'running'
946 wrk2.put (new TSS_Assert(orb_core, 1, 0, true));
948 // Some other misc checks
949 synch_with_worker (wrk1);
950 TEST_ASSERT ((leader_follower.has_clients () == 0));
952 // OK, lets shut everything down now - the event loop thread
953 // requires two shutdown cmds, one to exit the event loop thread cmd
954 // and the second to exit the main cmd processor
955 wrk2.put (new Shutdown);
956 // Incidently there is now no leader
957 wrk2.put (new TSS_Assert(orb_core, 0, 0, false));
958 wrk2.put (new Shutdown);
959 wrk2.wait ();
961 // Shutdown the other worker
962 wrk1.put (new Shutdown);
963 wrk1.wait ();
966 // 10 - ET1437460 (second problem)
967 void Test_10 (TAO_ORB_Core* orb_core )
969 TAO_LF_Strategy &lf_strategy = orb_core->lf_strategy ();
970 TAO_Leader_Follower &leader_follower = orb_core->leader_follower ();
971 ACE_DEBUG ((LM_DEBUG, ACE_TEXT ("==========\n")));
972 ACE_DEBUG ((LM_DEBUG, ACE_TEXT ("TEST #10 - ET1437460\n")));
974 // This scenario involves:
975 // - an event loop thread
976 // - which calls set_upcall_thread
977 // - then becomes a client leader
978 // - is dispatched and then becomes a client leader again
979 // (without calling set_upcall_thread)
980 // - calls set_upcall_thread
981 // - unwinds
983 // Originally this caused the leaders_ member to get set to -1
984 // (the inner client leader still decremented leaders_ even
985 // though set_upcall_thread was called)
987 // Activate a thread
988 Worker wrk1;
989 wrk1.activate ();
991 // Test initial conditions
992 wrk1.put (new TSS_Assert(orb_core, 0, 0, false));
994 // Have the thread become an event loop thread
995 wrk1.put (new Event_Loop_Thread (leader_follower,
996 lf_strategy));
998 // The thread should be an event loop thread
999 wrk1.put (new TSS_Assert(orb_core, 1, 0, true));
1001 // call set_upcall_thread
1002 wrk1.put (new Set_Upcall_Thread (leader_follower));
1004 // The thread should no longer be an event loop thread
1005 wrk1.put (new TSS_Assert(orb_core, 0, 0, false));
1007 // Have the thread wait on an event
1008 Test_LF_Event event;
1009 Test_Transport transport (0, orb_core);
1010 wrk1.put (new Wait_For_Event(event, transport, leader_follower));
1012 // The thread is still waiting on the event and thus should
1013 // now be a client-leader thread
1014 wrk1.put (new TSS_Assert(orb_core, 0, 1, true));
1016 // Have the thread wait on another event
1017 Test_LF_Event event2;
1018 wrk1.put (new Wait_For_Event(event2, transport, leader_follower));
1020 // The thread is still waiting on the event and thus should now be a
1021 // client-leader thread (again)
1022 wrk1.put (new TSS_Assert(orb_core, 0, 2, true));
1024 // Call set_upcall_thread
1025 wrk1.put (new Set_Upcall_Thread(leader_follower));
1027 // We now need to synchronise with the worker, to make sure it
1028 // has processed all the cmds we have sent it
1029 synch_with_worker (wrk1);
1031 // Now, complete the events, and then shutdown the cmd event loop
1032 event.complete_event (leader_follower);
1033 event2.complete_event (leader_follower);
1034 wrk1.put (new Shutdown);
1036 // The inner client has returned
1037 wrk1.put (new TSS_Assert(orb_core, 0, 1, true));
1039 // Shutdown the outter client thread
1040 wrk1.put (new Shutdown);
1042 // We should be back to the initial state
1043 wrk1.put (new TSS_Assert(orb_core, 0, 0, false));
1045 // Now shutdown the event loop thread
1046 wrk1.put (new Shutdown);
1047 wrk1.put (new TSS_Assert(orb_core, 0, 0, false));
1049 // Shutdown the other worker
1050 wrk1.put (new Shutdown);
1051 wrk1.wait ();
1056 ACE_TMAIN(int argc, ACE_TCHAR *argv[])
1058 // scope TSS holder within main scope
1059 // so we're certain it gets destroyed before the
1060 // ACE object manager
1061 ACE_TSS<Worker> workers_;
1062 // provide global access
1063 workers_p = &workers_;
1067 CORBA::ORB_var orb = CORBA::ORB_init (argc, argv);
1068 if (parse_args (argc, argv) != 0)
1069 return 1;
1071 // Make sure the reactor is initialised in the leader_follower
1072 ACE_Reactor* reactor = orb->orb_core ()->leader_follower ().reactor ();
1073 TEST_ASSERT ((reactor != 0));
1075 // Ready to go
1076 Test_1 (orb->orb_core ());
1077 Test_2 (orb->orb_core ());
1078 Test_3 (orb->orb_core ());
1079 Test_4 (orb->orb_core ());
1080 Test_5 (orb->orb_core ());
1081 Test_6 (orb->orb_core ());
1082 Test_7 (orb->orb_core ());
1083 Test_8 (orb->orb_core ());
1084 Test_9 (orb->orb_core ());
1085 Test_10 (orb->orb_core ());
1087 ACE_DEBUG ((LM_DEBUG, ACE_TEXT ("Testing complete.\n")));
1089 // cleanup
1090 orb->destroy ();
1092 catch (const CORBA::Exception& ex)
1094 ex._tao_print_exception ("Exception caught:");
1095 return 1;
1098 return 0;
1101 #else
1104 ACE_TMAIN(int /*argc*/, ACE_TCHAR * /*argv*/ [])
1106 ACE_DEBUG ((LM_DEBUG, ACE_TEXT ("This test only makes sense in an MT build.\n")));
1108 return 0;
1111 #endif // !ACE_HAS_THREADS
1113 // ****************************************************************