Merge pull request #2218 from jwillemsen/jwi-pthreadsigmask
[ACE_TAO.git] / TAO / tao / Leader_Follower.cpp
blob8ba45c8dab6bbb7f6f1ad0586cde1b2a405376f5
1 #include "ace/OS_NS_sys_time.h"
2 #include "ace/Reactor.h"
4 #include "tao/Leader_Follower.h"
5 #include "tao/LF_Follower_Auto_Ptr.h"
6 #include "tao/LF_Follower_Auto_Adder.h"
7 #include "tao/LF_Event_Binder.h"
8 #include "tao/debug.h"
9 #include "tao/Transport.h"
10 #include "tao/GUIResource_Factory.h"
11 #include "tao/ORB_Core.h"
12 #include "tao/ORB_Time_Policy.h"
14 #include <memory>
16 #if !defined (__ACE_INLINE__)
17 # include "tao/Leader_Follower.inl"
18 #endif /* ! __ACE_INLINE__ */
20 TAO_BEGIN_VERSIONED_NAMESPACE_DECL
22 TAO_Leader_Follower::Deferred_Event::Deferred_Event (ACE_Event_Handler* h)
23 : eh_ (h)
25 h->add_reference ();
29 ACE_Event_Handler* TAO_Leader_Follower::Deferred_Event::handler () const
31 return this->eh_.handler ();
34 TAO_Leader_Follower::~TAO_Leader_Follower ()
36 while (!this->deferred_event_set_.is_empty ())
38 Deferred_Event *event = this->deferred_event_set_.pop_front ();
39 delete event;
41 while (!this->follower_free_list_.is_empty ())
43 TAO_LF_Follower *follower = this->follower_free_list_.pop_front ();
44 delete follower;
46 // Hand the reactor back to the resource factory.
47 // use GUI reactor factory if available
48 if ( this->orb_core_->gui_resource_factory () )
49 this->orb_core_->gui_resource_factory ()->reclaim_reactor (this->reactor_);
50 else
51 this->orb_core_->resource_factory ()->reclaim_reactor (this->reactor_);
53 this->reactor_ = nullptr;
56 TAO_LF_Follower *
57 TAO_Leader_Follower::allocate_follower ()
59 if (!this->follower_free_list_.is_empty ())
60 return this->follower_free_list_.pop_front ();
62 TAO_LF_Follower* ptr = nullptr;
63 ACE_NEW_RETURN (ptr,
64 TAO_LF_Follower (*this),
65 nullptr);
66 return ptr;
69 void
70 TAO_Leader_Follower::release_follower (TAO_LF_Follower *follower)
72 this->follower_free_list_.push_front (follower);
75 int
76 TAO_Leader_Follower::elect_new_leader_i ()
78 TAO_LF_Follower* const follower = this->follower_set_.head ();
80 #if defined (TAO_DEBUG_LEADER_FOLLOWER)
81 TAOLIB_DEBUG ((LM_DEBUG,
82 ACE_TEXT ("TAO (%P|%t) - TAO_Leader_Follower::elect_new_leader_i - ")
83 ACE_TEXT ("follower is %@\n"),
84 follower));
85 #endif /* TAO_DEBUG_LEADER_FOLLOWER */
87 return follower->signal ();
90 int
91 TAO_Leader_Follower::wait_for_client_leader_to_complete (ACE_Time_Value *max_wait_time)
93 int result = 0;
94 TAO::ORB_Countdown_Time countdown (max_wait_time);
96 // Note that we are waiting.
97 ++this->event_loop_threads_waiting_;
99 while (this->client_thread_is_leader_ && result != -1)
101 if (max_wait_time == nullptr)
103 if (this->event_loop_threads_condition_.wait () == -1)
105 TAOLIB_ERROR ((LM_ERROR,
106 ACE_TEXT ("TAO (%P|%t) - TAO_Leader_Follower::")
107 ACE_TEXT ("wait_for_client_leader_to_complete - ")
108 ACE_TEXT ("Condition variable wait failed\n")));
110 result = -1;
113 else
115 countdown.update ();
116 ACE_Time_Value tv = ACE_OS::gettimeofday ();
117 tv += *max_wait_time;
118 if (this->event_loop_threads_condition_.wait (&tv) == -1)
120 if (errno != ETIME)
121 TAOLIB_ERROR ((LM_ERROR,
122 ACE_TEXT ("TAO (%P|%t) - TAO_Leader_Follower::")
123 ACE_TEXT ("wait_for_client_leader_to_complete - ")
124 ACE_TEXT ("Condition variable wait failed\n")));
126 result = -1;
131 // Reset waiting state.
132 --this->event_loop_threads_waiting_;
134 return result;
137 ACE_Reactor *
138 TAO_Leader_Follower::reactor ()
140 if (this->reactor_ == nullptr)
142 ACE_GUARD_RETURN (TAO_SYNCH_MUTEX, ace_mon, this->lock (), nullptr);
143 if (this->reactor_ == nullptr)
145 // use GUI reactor factory if available
146 if ( this->orb_core_->gui_resource_factory () )
147 this->reactor_ =
148 this->orb_core_->gui_resource_factory ()->get_reactor ();
149 else
150 this->reactor_ =
151 this->orb_core_->resource_factory ()->get_reactor ();
154 return this->reactor_;
157 void
158 TAO_Leader_Follower::set_client_thread ()
160 // If we were a leader thread or an event loop thread, give up
161 // leadership.
162 TAO_ORB_Core_TSS_Resources *tss = this->get_tss_resources ();
163 if (tss->event_loop_thread_ ||
164 tss->client_leader_thread_)
166 --this->leaders_;
169 if (this->clients_ == 0 &&
170 this->orb_core_->has_shutdown () &&
171 !this->orb_core_->resource_factory ()->drop_replies_during_shutdown ())
173 // The ORB has shutdown and we are the first client after
174 // that. This means that the reactor is disabled, we must
175 // re-enable it if we want to receive any replys...
176 this->orb_core_->reactor ()->reset_reactor_event_loop ();
178 ++this->clients_;
181 void
182 TAO_Leader_Follower::reset_client_thread ()
184 // If we were a leader thread or an event loop thread, take back
185 // leadership.
186 TAO_ORB_Core_TSS_Resources *tss = this->get_tss_resources ();
187 if (tss->event_loop_thread_ ||
188 tss->client_leader_thread_)
190 ++this->leaders_;
193 --this->clients_;
194 if (this->clients_ == 0 &&
195 this->orb_core_->has_shutdown ())
197 // The ORB has shutdown and we are the last client thread, we
198 // must stop the reactor to ensure that any server threads go
199 // away.
200 this->orb_core_->reactor ()->end_reactor_event_loop ();
205 TAO_Leader_Follower::defer_event (ACE_Event_Handler* eh)
207 // Obtain the lock.
208 ACE_GUARD_RETURN (TAO_SYNCH_MUTEX, ace_mon, this->lock (), -1);
210 if (TAO_debug_level > 7)
211 TAOLIB_DEBUG ((LM_DEBUG,
212 ACE_TEXT ("TAO (%P|%t) - TAO_Leader_Follower::defer_event, ")
213 ACE_TEXT ("deferring event handler[%d]\n"),
214 eh->get_handle ()));
215 Deferred_Event* ptr = nullptr;
216 ACE_NEW_RETURN (ptr,
217 Deferred_Event (eh),
218 -1);
219 this->deferred_event_set_.push_back (ptr);
220 return 0;
223 void
224 TAO_Leader_Follower::resume_events ()
226 // not need to obtain the lock, only called when holding the lock
227 while (!this->deferred_event_set_.is_empty ())
229 std::unique_ptr<Deferred_Event> event (this->deferred_event_set_.pop_front ());
230 // Send a notification to the reactor to cause the awakening of a new
231 // follower, if there is one already available.
232 ACE_Reactor *reactor = this->orb_core_->reactor ();
233 int const retval = reactor->notify (event->handler (), ACE_Event_Handler::READ_MASK);
234 if (TAO_debug_level > 2)
236 // @@todo: need to think about what is the action that
237 // we can take when we get here with an error?!
238 TAOLIB_DEBUG ((LM_DEBUG,
239 ACE_TEXT ("TAO (%P|%t) - TAO_Leader_Follower::resume_events, ")
240 ACE_TEXT ("an event handler[%d] has been resumed, ")
241 ACE_TEXT ("notified the reactor, retval=%d.\n"),
242 event->handler ()->get_handle (), retval));
248 TAO_Leader_Follower::wait_for_event (TAO_LF_Event *event,
249 TAO_Transport *transport,
250 ACE_Time_Value *max_wait_time)
252 // Obtain the lock.
253 ACE_GUARD_RETURN (TAO_SYNCH_MUTEX, ace_mon, this->lock (), -1);
255 TAO::ORB_Countdown_Time countdown (max_wait_time);
257 // Optimize the first iteration [no access to errno]
258 int result = 1;
260 // For some cases the transport may disappear like when waiting for
261 // connection to be initiated or closed. So cache the id.
262 // @@ NOTE: This is not completely safe either. We will be fine for
263 // cases that don't access the id ie. when debug level is off but
264 // with debugging level on we are on a sticky wicket. Hopefully none
265 // of our users should run TAO with debugging enabled like they did
266 // in PathFinder
267 size_t t_id = 0;
269 if (TAO_debug_level && transport != nullptr)
271 t_id = transport->id ();
274 { // Scope #1: All threads inside here are client threads
275 // Calls this->set_client_thread () on construction and
276 // this->reset_client_thread () on destruction.
277 TAO_LF_Client_Thread_Helper client_thread_helper (*this);
278 ACE_UNUSED_ARG (client_thread_helper);
280 // The loop here is for when we get elected (client) leader and
281 // then later relinquish the leader position and our event has
282 // still not completed (and we haven't run out of time).
283 // All the conditions below are basically the various ways the
284 // leader loop below can end, other than the event being complete
285 while (event->keep_waiting_i ()
286 && !(result == 0 &&
287 max_wait_time != nullptr &&
288 *max_wait_time == ACE_Time_Value::zero)
289 && result != -1)
290 { // Scope #2: threads here alternate between being leader/followers
292 // Check if there is a leader. Note that it cannot be us since we
293 // gave up our leadership when we became a client.
294 if (this->leader_available ())
295 { // Scope #3: threads here are followers
296 // = Wait as a follower.
298 // Grab a follower:
299 TAO_LF_Follower_Auto_Ptr follower (*this);
300 if (follower.get () == nullptr)
301 return -1;
303 if (TAO_debug_level >= 5)
304 TAOLIB_DEBUG ((LM_DEBUG,
305 ACE_TEXT ("TAO (%P|%t) - Leader_Follower[%d]::wait_for_event,")
306 ACE_TEXT (" (follower), cond <%@>\n"),
307 t_id, follower.get ()));
309 // Bound the follower and the LF_Event, this is important to
310 // get a signal when the event terminates
311 TAO_LF_Event_Binder event_binder (event, follower.get ());
313 while (event->keep_waiting_i () &&
314 this->leader_available ())
315 { // Scope #4: this loop handles spurious wake-ups
316 // Add ourselves to the list, do it everytime we wake up
317 // from the CV loop. Because:
319 // - The leader thread could have elected us as the new
320 // leader.
321 // - Before we can assume the role another thread becomes
322 // the leader
323 // - But our condition variable could have been removed
324 // already, if we don't add it again we will never wake
325 // up.
327 // Notice that we can have spurious wake ups, in that case
328 // adding the leader results in an error, that must be
329 // ignored.
330 // You may be thinking of not removing the condition
331 // variable in the code that sends the signal, but
332 // removing it here, that does not work either, in that
333 // case the condition variable may be used twice:
335 // - Wake up because its reply arrived
336 // - Wake up because it must become the leader
338 // but only the first one has any effect, so the leader is
339 // lost.
340 TAO_LF_Follower_Auto_Adder auto_adder (*this, follower);
342 if (max_wait_time == nullptr)
344 if (follower->wait (max_wait_time) == -1)
346 if (TAO_debug_level >= 5)
347 TAOLIB_DEBUG ((LM_DEBUG,
348 ACE_TEXT ("TAO (%P|%t) - Leader_Follower[%d]::wait_for_event, ")
349 ACE_TEXT (" (follower) [no timer, cond failed]\n"),
350 t_id));
352 // @@ Michael: What is our error handling in this case?
353 // We could be elected as leader and
354 // no leader would come in?
355 return -1;
358 else
360 countdown.update ();
361 ACE_Time_Value tv = ACE_OS::gettimeofday ();
362 tv += *max_wait_time;
363 if (follower->wait (&tv) == -1)
365 if (TAO_debug_level >= 5)
366 TAOLIB_DEBUG ((LM_DEBUG,
367 ACE_TEXT ("TAO (%P|%t) - Leader_Follower[%d]::wait, ")
368 ACE_TEXT ("(follower) [has timer, follower failed]\n"),
369 t_id ));
371 // If we have timedout set the state in the
372 // LF_Event. We call the non-locking,
373 // no-signalling method on LF_Event.
374 if (errno == ETIME)
375 // We have timedout
376 event->set_state (TAO_LF_Event::LFS_TIMEOUT);
378 if (!event->successful_i ())
380 // Remove follower can fail because either
381 // 1) the condition was satisfied (i.e. reply
382 // received or queue drained), or
383 // 2) somebody elected us as leader, or
384 // 3) the connection got closed.
386 // Therefore:
387 // If remove_follower fails and the condition
388 // was not satisfied, we know that we got
389 // elected as a leader.
390 // But we got a timeout, so we cannot become
391 // the leader, therefore, we have to select a
392 // new leader.
395 if (this->elect_new_leader () == -1
396 && TAO_debug_level > 0)
398 TAOLIB_ERROR ((LM_ERROR,
399 ACE_TEXT("TAO (%P|%t) - Leader_Follower[%d]::wait_for_event, ")
400 ACE_TEXT("elect_new_leader failed\n"),
401 t_id ));
406 return -1;
409 } // End Scope #4: loop to handle spurious wakeups
411 countdown.update ();
413 // @@ Michael: This is an old comment why we do not want to
414 // remove the follower here.
415 // We should not remove the follower here, we *must* remove it when
416 // we signal it so the same condition is not signalled for
417 // both wake up as a follower and as the next leader.
419 if (TAO_debug_level >= 5)
420 TAOLIB_DEBUG ((LM_DEBUG,
421 ACE_TEXT ("TAO (%P|%t) - Leader_Follower[%d]::wait_for_event,")
422 ACE_TEXT (" done (follower), successful %d\n"),
423 t_id,
424 event->successful_i ()));
426 // Now somebody woke us up to become a leader or to handle our
427 // input. We are already removed from the follower queue.
429 if (event->successful_i ())
430 return 0;
432 if (event->error_detected_i ())
433 return -1;
435 // FALLTHROUGH
436 // We only get here if we woke up but the reply is not
437 // complete yet, time to assume the leader role....
438 // i.e. ACE_ASSERT (event->successful () == 0);
439 } // End Scope #3: we are no longer a follower
441 // One last attempt to avoid becoming a client-leader
442 // If there is a new_leader_generator attached, give it a chance to
443 // create a leader thread before becoming a leader.
444 if (this->avoid_client_leader_ && this->no_leaders_available())
446 if (TAO_debug_level >= 5)
448 TAOLIB_DEBUG ((LM_DEBUG,
449 ACE_TEXT ("TAO (%P|%t) - Leader_Follower[%d]::wait_for_event, ")
450 ACE_TEXT ("Would become client leader, ")
451 ACE_TEXT ("but generating new thread\n"),
452 t_id));
455 // Yield, providing the event thread some time to grab leadership
456 ACE_GUARD_RETURN (ACE_Reverse_Lock<TAO_SYNCH_MUTEX>, rev_mon,
457 this->reverse_lock (), -1);
458 ACE_OS::thr_yield ();
459 continue;
461 else
463 if (TAO_debug_level >= 5)
465 TAOLIB_DEBUG ((LM_DEBUG,
466 ACE_TEXT ("TAO (%P|%t) - Leader_Follower[%d]::wait_for_event, ")
467 ACE_TEXT ("Becoming client leader.\n"),
468 t_id));
471 // = Leader Code.
473 // The only way to reach this point is if we must become the
474 // leader, because there is no leader or we have to update to a
475 // leader or we are doing nested upcalls in this case we do
476 // increase the refcount on the leader in TAO_ORB_Core.
478 // Calls this->set_client_leader_thread () on
479 // construction and this->reset_client_leader_thread ()
480 // on destruction. Note that this may increase the refcount of
481 // the leader.
482 { // Scope #5: We are now the client-leader
483 TAO_LF_Client_Leader_Thread_Helper client_leader_thread_helper (*this);
484 ACE_UNUSED_ARG (client_leader_thread_helper);
486 { // Scope #6: release the lock via a reverse lock
487 ACE_GUARD_RETURN (ACE_Reverse_Lock<TAO_SYNCH_MUTEX>, rev_mon,
488 this->reverse_lock (), -1);
490 // Become owner of the reactor.
491 ACE_Reactor *reactor = this->reactor_;
492 reactor->owner (ACE_Thread::self ());
494 // Run the reactor event loop.
495 if (TAO_debug_level >= 5)
496 TAOLIB_DEBUG ((LM_DEBUG,
497 ACE_TEXT ("TAO (%P|%t) - Leader_Follower[%d]::wait_for_event,")
498 ACE_TEXT (" (leader) enter reactor event loop\n"),
499 t_id));
501 // If we got our event, no need to run the event loop any
502 // further.
503 while (event->keep_waiting_i ())
505 // Run the event loop.
506 result = reactor->handle_events (max_wait_time);
508 // Did we timeout? If so, stop running the loop.
509 if (result == 0 &&
510 max_wait_time != nullptr &&
511 *max_wait_time == ACE_Time_Value::zero)
512 break;
514 // Other errors? If so, stop running the loop.
515 if (result == -1)
516 break;
518 // Has an event loop thread become available to take over?
519 // Yes, we are checking this without the lock, however, if
520 // we get a false reading we'll just circle around and
521 // become leader again...
522 if (this->event_loop_threads_waiting_)
523 break;
524 // Did we give up leadership?
525 if (!this->is_client_leader_thread ())
526 break;
527 // Otherwise, keep going...
530 if (TAO_debug_level >= 5)
531 TAOLIB_DEBUG ((LM_DEBUG,
532 ACE_TEXT ("TAO (%P|%t) - Leader_Follower[%d]::wait_for_event,")
533 ACE_TEXT (" (leader) exit reactor event loop\n"),
534 t_id));
535 } // End Scope #6: we should now hold the lock again
537 // End artificial scope for auto_ptr like helpers calling:
538 // this->reset_client_leader_thread ().
540 } // End Scope #5: we are no longer a client-leader
541 // We only get here if we were the client leader and either our
542 // event completed or an event loop thread has become available to
543 // become leader.
545 // resume any deferred events before we switch to a new leader thread
546 this->resume_events ();
548 // Wake and yield to any event loop threads that may be waiting to
549 // take leadership - otherwise we will just loop around and take
550 // leadership again (because we hold the lock).
551 if (this->event_loop_threads_waiting_ && !this->leader_available ())
553 if (TAO_debug_level >= 5)
554 TAOLIB_DEBUG ((LM_DEBUG,
555 ACE_TEXT ("TAO (%P|%t) - Leader_Follower[%d]::wait_for_event,")
556 ACE_TEXT (" (client) waking and yielding to allow event thread leadership\n"),
557 t_id));
559 // Wake up the next leader (in case not yet done)
560 if (this->elect_new_leader () == -1)
561 TAOLIB_ERROR_RETURN ((LM_ERROR,
562 ACE_TEXT ("TAO (%P|%t) - Leader_Follower[%d]::wait_for_event,")
563 ACE_TEXT (" failed to elect new leader\n"),
564 t_id),
565 -1);
567 // Yield, providing the event thread some time to grab leadership
568 ACE_GUARD_RETURN (ACE_Reverse_Lock<TAO_SYNCH_MUTEX>, rev_mon,
569 this->reverse_lock (), -1);
570 ACE_OS::thr_yield ();
573 } // End Scope #2: we loop here if our event is incomplete
576 // End artificial scope for auto_ptr like helpers calling:
577 // this->reset_client_thread ()
579 // We should only get here when our event is complete or timed-out
580 } // End Scope #1
582 // Wake up the next leader, we cannot do that in handle_input,
583 // because the woken up thread would try to get into handle_events,
584 // which is at the time in handle_input still occupied. But do it
585 // before checking the error in <result>, even if there is an error
586 // in our input we should continue running the loop in another
587 // thread.
589 if (this->elect_new_leader () == -1)
590 TAOLIB_ERROR_RETURN ((LM_ERROR,
591 ACE_TEXT ("TAO (%P|%t) - Leader_Follower[%d]::wait_for_event,")
592 ACE_TEXT (" failed to elect new leader\n"),
593 t_id),
594 -1);
596 if (result == -1 && !this->reactor_->reactor_event_loop_done ())
597 TAOLIB_ERROR_RETURN ((LM_ERROR,
598 ACE_TEXT ("TAO (%P|%t) - Leader_Follower[%d]::wait_for_event,")
599 ACE_TEXT (" handle_events failed\n"),
600 t_id),
601 -1);
603 // Return an error if there was a problem receiving the reply...
604 if (max_wait_time != nullptr)
606 if (!event->successful_i ()
607 && *max_wait_time == ACE_Time_Value::zero)
609 result = -1;
610 errno = ETIME;
612 else if (event->error_detected_i ())
614 // If the time did not expire yet, but we get a failure,
615 // e.g. the connections closed, we should still return an error.
616 result = -1;
619 else
622 * There should be no reason to reset the value of result
623 * here. If there was an error in handle_events () that the
624 * leader saw, I (Bala) believe it should be propagated to the
625 * clients.
626 * result = 0;
628 if (event->error_detected_i ())
630 result = -1;
634 return result;
637 TAO_END_VERSIONED_NAMESPACE_DECL