Cleanup ACE_HAS_PTHREAD_SIGMASK_PROTOTYPE, all platforms support it so far as I can...
[ACE_TAO.git] / ACE / ace / TP_Reactor.cpp
blob4811e28289e63d71fad6d9689c81bd090ea9bef9
1 #include "ace/TP_Reactor.h"
2 #include "ace/Thread.h"
3 #include "ace/Timer_Queue.h"
4 #include "ace/Sig_Handler.h"
5 #include "ace/Log_Category.h"
6 #include "ace/Functor_T.h"
7 #include "ace/OS_NS_sys_time.h"
9 #if !defined (__ACE_INLINE__)
10 #include "ace/TP_Reactor.inl"
11 #endif /* __ACE_INLINE__ */
13 ACE_BEGIN_VERSIONED_NAMESPACE_DECL
15 ACE_ALLOC_HOOK_DEFINE (ACE_TP_Reactor)
17 int
18 ACE_TP_Token_Guard::acquire_read_token (ACE_Time_Value *max_wait_time)
20 ACE_TRACE ("ACE_TP_Token_Guard::acquire_read_token");
22 // The order of these events is very subtle, modify with care.
24 // Try to grab the lock. If someone if already there, don't wake
25 // them up, just queue up in the thread pool.
26 int result = 0;
28 if (max_wait_time)
30 ACE_Time_Value tv = ACE_OS::gettimeofday ();
31 tv += *max_wait_time;
33 ACE_MT (result = this->token_.acquire_read (&ACE_TP_Reactor::no_op_sleep_hook,
35 &tv));
37 else
39 ACE_MT (result = this->token_.acquire_read (&ACE_TP_Reactor::no_op_sleep_hook));
42 // Check for timeouts and errors.
43 if (result == -1)
45 if (errno == ETIME)
46 return 0;
47 else
48 return -1;
51 // We got the token and so let us mark ourselves as owner
52 this->owner_ = true;
54 return result;
57 int
58 ACE_TP_Token_Guard::acquire_token (ACE_Time_Value *max_wait_time)
60 ACE_TRACE ("ACE_TP_Token_Guard::acquire_token");
62 // Try to grab the lock. If someone if already there, don't wake
63 // them up, just queue up in the thread pool.
64 int result = 0;
66 if (max_wait_time)
68 ACE_Time_Value tv = ACE_OS::gettimeofday ();
69 tv += *max_wait_time;
71 ACE_MT (result = this->token_.acquire (0,
73 &tv));
75 else
77 ACE_MT (result = this->token_.acquire ());
80 // Check for timeouts and errors.
81 if (result == -1)
83 if (errno == ETIME)
84 return 0;
85 else
86 return -1;
89 // We got the token and so let us mark ourselves as owner
90 this->owner_ = true;
92 return result;
96 ACE_TP_Reactor::ACE_TP_Reactor (ACE_Sig_Handler *sh,
97 ACE_Timer_Queue *tq,
98 bool mask_signals,
99 int s_queue)
100 : ACE_Select_Reactor (sh, tq, ACE_DISABLE_NOTIFY_PIPE_DEFAULT, 0, mask_signals, s_queue)
102 ACE_TRACE ("ACE_TP_Reactor::ACE_TP_Reactor");
103 this->supress_notify_renew (true);
106 ACE_TP_Reactor::ACE_TP_Reactor (size_t max_number_of_handles,
107 bool restart,
108 ACE_Sig_Handler *sh,
109 ACE_Timer_Queue *tq,
110 bool mask_signals,
111 int s_queue)
112 : ACE_Select_Reactor (max_number_of_handles, restart, sh, tq, ACE_DISABLE_NOTIFY_PIPE_DEFAULT, 0, mask_signals, s_queue)
114 ACE_TRACE ("ACE_TP_Reactor::ACE_TP_Reactor");
115 this->supress_notify_renew (true);
119 ACE_TP_Reactor::owner (ACE_thread_t, ACE_thread_t *o_id)
121 ACE_TRACE ("ACE_TP_Reactor::owner");
122 if (o_id)
123 *o_id = ACE_Thread::self ();
125 return 0;
129 ACE_TP_Reactor::owner (ACE_thread_t *t_id)
131 ACE_TRACE ("ACE_TP_Reactor::owner");
132 *t_id = ACE_Thread::self ();
134 return 0;
138 ACE_TP_Reactor::handle_events (ACE_Time_Value *max_wait_time)
140 ACE_TRACE ("ACE_TP_Reactor::handle_events");
142 // Stash the current time -- the destructor of this object will
143 // automatically compute how much time elapsed since this method was
144 // called.
145 ACE_Countdown_Time countdown (max_wait_time);
148 // The order of these events is very subtle, modify with care.
151 // Instantiate the token guard which will try grabbing the token for
152 // this thread.
153 ACE_TP_Token_Guard guard (this->token_);
155 int const result = guard.acquire_read_token (max_wait_time);
157 // If the guard is NOT the owner just return the retval
158 if (!guard.is_owner ())
159 return result;
161 // After getting the lock just just for deactivation..
162 if (this->deactivated_)
164 errno = ESHUTDOWN;
165 return -1;
168 // Update the countdown to reflect time waiting for the token.
169 countdown.update ();
171 return this->dispatch_i (max_wait_time, guard);
175 ACE_TP_Reactor::dispatch_i (ACE_Time_Value *max_wait_time,
176 ACE_TP_Token_Guard &guard)
178 int event_count = this->get_event_for_dispatching (max_wait_time);
180 // We use this count to detect potential infinite loops as described
181 // in bug 2540.
182 int const initial_event_count = event_count;
184 int result = 0;
186 // Note: We are passing the <event_count> around, to have record of
187 // how many events still need processing. May be this could be
188 // useful in future.
190 #if 0
191 // @Ciju
192 // signal handling isn't in a production state yet.
193 // Commenting it out for now.
195 // Dispatch signals
196 if (event_count == -1)
198 // Looks like we dont do any upcalls in dispatch signals. If at
199 // a later point of time, we decide to handle signals we have to
200 // release the lock before we make any upcalls.. What is here
201 // now is not the right thing...
203 // @@ We need to do better..
204 return this->handle_signals (event_count, guard);
206 #endif // #if 0
208 // If there are no signals and if we had received a proper
209 // event_count then first look at dispatching timeouts. We need to
210 // handle timers early since they may have higher latency
211 // constraints than I/O handlers. Ideally, the order of dispatching
212 // should be a strategy...
214 // NOTE: The event count does not have the number of timers that
215 // needs dispatching. But we are still passing this along. We dont
216 // need to do that. In the future we *may* have the timers also
217 // returned through the <event_count>. Just passing that along for
218 // that day.
219 result = this->handle_timer_events (event_count, guard);
221 if (result > 0)
222 return result;
224 // Else just go ahead fall through for further handling.
226 if (event_count > 0)
228 // Next dispatch the notification handlers (if there are any to
229 // dispatch). These are required to handle multiple-threads
230 // that are trying to update the <Reactor>.
231 result = this->handle_notify_events (event_count, guard);
233 if (result > 0)
234 return result;
236 // Else just fall through for further handling
239 if (event_count > 0)
241 // Handle socket events
242 result = this->handle_socket_events (event_count, guard);
245 if (event_count != 0 && event_count == initial_event_count)
247 this->state_changed_ = true;
250 return result;
254 #if 0
// NOTE(review): everything from the `#if 0` above to the matching
// `#endif // #if 0` below is compiled out.
//
// handle_signals() takes two unnamed (unused) parameters.  As written
// it appears to clear the pending-signal flag and return 1 when
// ACE_Sig_Handler::sig_pending () is non-zero, and to return -1
// otherwise.  The inner disabled section refers to
// <active_handle_count> and <dispatch_set>, neither of which is
// declared in this function, so it cannot simply be switched back on.
255 // @Ciju
256 // signal handling isn't in a production state yet.
257 // Commenting it out for now.
260 ACE_TP_Reactor::handle_signals (int & /*event_count*/,
261 ACE_TP_Token_Guard & /*guard*/)
263 ACE_TRACE ("ACE_TP_Reactor::handle_signals");
267 * THIS METHOD SEEMS BROKEN
271 // First check for interrupts.
272 // Bail out -- we got here since <select> was interrupted.
273 if (ACE_Sig_Handler::sig_pending () != 0)
275 ACE_Sig_Handler::sig_pending (0);
277 // This piece of code comes from the old TP_Reactor. We did not
278 // handle signals at all then. If we happen to handle signals
279 // in the TP_Reactor, we should then start worrying about this
280 // - Bala 21-Aug- 01
281 if 0
282 // Not sure if this should be done in the TP_Reactor
283 // case... leave it out for now. -Steve Huston 22-Aug-00
285 // If any HANDLES in the <ready_set_> are activated as a
286 // result of signals they should be dispatched since
287 // they may be time critical...
288 active_handle_count = this->any_ready (dispatch_set);
289 else
290 // active_handle_count = 0;
291 endif
293 // Record the fact that the Reactor has dispatched a
294 // handle_signal() method. We need this to return the
295 // appropriate count.
296 return 1;
299 return -1;
301 #endif // #if 0
305 ACE_TP_Reactor::handle_timer_events (int & /*event_count*/,
306 ACE_TP_Token_Guard &guard)
308 using Guard_Release = ACE_Member_Function_Command<ACE_TP_Token_Guard>;
310 Guard_Release release(guard, &ACE_TP_Token_Guard::release_token);
311 return this->timer_queue_->expire_single(release);
315 ACE_TP_Reactor::handle_notify_events (int & /*event_count*/,
316 ACE_TP_Token_Guard &guard)
318 // Get the handle on which notify calls could have occurred
319 ACE_HANDLE notify_handle = this->get_notify_handle ();
321 int result = 0;
323 // The notify was not in the list returned by
324 // wait_for_multiple_events ().
325 if (notify_handle == ACE_INVALID_HANDLE)
326 return result;
328 // Now just do a read on the pipe..
329 ACE_Notification_Buffer buffer;
331 // Clear the handle of the read_mask of our <ready_set_>
332 this->ready_set_.rd_mask_.clr_bit (notify_handle);
334 // Keep reading notifies till we empty it or till we have a
335 // dispatchable buffer
336 while (this->notify_handler_->read_notify_pipe (notify_handle, buffer) > 0)
338 // Just figure out whether we can read any buffer that has
339 // dispatchable info. If not we have just been unblocked by
340 // another thread trying to update the reactor. If we get any
341 // buffer that needs dispatching we will dispatch that after
342 // releasing the lock
343 if (this->notify_handler_->is_dispatchable (buffer) > 0)
345 // Release the token before dispatching notifies...
346 guard.release_token ();
348 // Dispatch the upcall for the notify
349 this->notify_handler_->dispatch_notify (buffer);
351 // We had a successful dispatch.
352 result = 1;
354 // break out of the while loop
355 break;
359 // If we did some work, then we just return 1 which will allow us
360 // to get out of here. If we return 0, then we will be asked to do
361 // some work ie. dispacth socket events
362 return result;
366 ACE_TP_Reactor::handle_socket_events (int &event_count,
367 ACE_TP_Token_Guard &guard)
369 // We got the lock, lets handle some I/O events.
370 ACE_EH_Dispatch_Info dispatch_info;
372 this->get_socket_event_info (dispatch_info);
374 // If there is any event handler that is ready to be dispatched, the
375 // dispatch information is recorded in dispatch_info.
376 if (!dispatch_info.dispatch ())
378 // Check for removed handlers.
379 if (dispatch_info.event_handler_ == 0)
381 this->handler_rep_.unbind(dispatch_info.handle_,
382 dispatch_info.mask_);
386 return 0;
389 // Suspend the handler so that other threads don't start dispatching
390 // it, if we can't suspend then return directly
392 // NOTE: This check was performed in older versions of the
393 // TP_Reactor. Looks like it is a waste..
394 if (dispatch_info.event_handler_ != this->notify_handler_)
395 if (this->suspend_i (dispatch_info.handle_) == -1)
396 return 0;
398 // Call add_reference() if needed.
399 if (dispatch_info.reference_counting_required_)
400 dispatch_info.event_handler_->add_reference ();
402 // Release the lock. Others threads can start waiting.
403 guard.release_token ();
405 int result = 0;
407 // If there was an event handler ready, dispatch it.
408 // Decrement the event left
409 --event_count;
411 // Dispatched an event
412 if (this->dispatch_socket_event (dispatch_info) == 0)
413 ++result;
415 return result;
419 ACE_TP_Reactor::get_event_for_dispatching (ACE_Time_Value *max_wait_time)
421 // If the reactor handler state has changed, clear any remembered
422 // ready bits and re-scan from the master wait_set.
423 if (this->state_changed_)
425 this->ready_set_.rd_mask_.reset ();
426 this->ready_set_.wr_mask_.reset ();
427 this->ready_set_.ex_mask_.reset ();
429 this->state_changed_ = false;
431 else
433 // This is a hack... somewhere, under certain conditions (which
434 // I don't understand...) the mask will have all of its bits clear,
435 // yet have a size_ > 0. This is an attempt to remedy the affect,
436 // without knowing why it happens.
438 this->ready_set_.rd_mask_.sync (this->ready_set_.rd_mask_.max_set ());
439 this->ready_set_.wr_mask_.sync (this->ready_set_.wr_mask_.max_set ());
440 this->ready_set_.ex_mask_.sync (this->ready_set_.ex_mask_.max_set ());
443 return this->wait_for_multiple_events (this->ready_set_, max_wait_time);
447 ACE_TP_Reactor::get_socket_event_info (ACE_EH_Dispatch_Info &event)
449 // Check for dispatch in write, except, read. Only catch one, but if
450 // one is caught, be sure to clear the handle from each mask in case
451 // there is more than one mask set for it. This would cause problems
452 // if the handler is suspended for dispatching, but its set bit in
453 // another part of ready_set_ kept it from being dispatched.
454 int found_io = 0;
455 ACE_HANDLE handle;
457 // @@todo: We can do quite a bit of code reduction here. Let me get
458 // it to work before I do this.
460 ACE_Handle_Set_Iterator handle_iter (this->ready_set_.wr_mask_);
462 while (!found_io && (handle = handle_iter ()) != ACE_INVALID_HANDLE)
464 if (this->is_suspended_i (handle))
465 continue;
467 // Remember this info
468 event.set (handle,
469 this->handler_rep_.find (handle),
470 ACE_Event_Handler::WRITE_MASK,
471 &ACE_Event_Handler::handle_output);
473 this->clear_handle_read_set (handle);
474 found_io = 1;
478 if (!found_io)
480 ACE_Handle_Set_Iterator handle_iter (this->ready_set_.ex_mask_);
482 while (!found_io && (handle = handle_iter ()) != ACE_INVALID_HANDLE)
484 if (this->is_suspended_i (handle))
485 continue;
487 // Remember this info
488 event.set (handle,
489 this->handler_rep_.find (handle),
490 ACE_Event_Handler::EXCEPT_MASK,
491 &ACE_Event_Handler::handle_exception);
493 this->clear_handle_read_set (handle);
495 found_io = 1;
499 if (!found_io)
501 ACE_Handle_Set_Iterator handle_iter (this->ready_set_.rd_mask_);
503 while (!found_io && (handle = handle_iter ()) != ACE_INVALID_HANDLE)
505 if (this->is_suspended_i (handle))
506 continue;
508 // Remember this info
509 event.set (handle,
510 this->handler_rep_.find (handle),
511 ACE_Event_Handler::READ_MASK,
512 &ACE_Event_Handler::handle_input);
514 this->clear_handle_read_set (handle);
515 found_io = 1;
519 return found_io;
522 // Dispatches a single event handler
524 ACE_TP_Reactor::dispatch_socket_event (ACE_EH_Dispatch_Info &dispatch_info)
526 ACE_TRACE ("ACE_TP_Reactor::dispatch_socket_event");
528 ACE_Event_Handler * const event_handler = dispatch_info.event_handler_;
529 ACE_EH_PTMF const callback = dispatch_info.callback_;
531 // Check for removed handlers.
532 if (event_handler == 0)
533 return -1;
535 // Upcall. If the handler returns positive value (requesting a
536 // reactor callback) don't set the ready-bit because it will be
537 // ignored if the reactor state has changed. Just call back
538 // as many times as the handler requests it. Other threads are off
539 // handling other things.
540 int status = 1;
541 while (status > 0)
542 status = (event_handler->*callback) (dispatch_info.handle_);
544 // Post process socket event
545 return this->post_process_socket_event (dispatch_info, status);
549 ACE_TP_Reactor::post_process_socket_event (ACE_EH_Dispatch_Info &dispatch_info,
550 int status)
552 int result = 0;
554 // First check if we really have to post process something, if not, then
555 // we don't acquire the token which saves us a lot of time.
556 if (status < 0 ||
557 (dispatch_info.event_handler_ != this->notify_handler_ &&
558 dispatch_info.resume_flag_ ==
559 ACE_Event_Handler::ACE_REACTOR_RESUMES_HANDLER))
561 // Get the reactor token and with this token acquired remove first the
562 // handler and resume it at the same time. This must be atomic, see also
563 // bugzilla 2395. When this is not atomic it can be that we resume the
564 // handle after it is reused by the OS.
565 ACE_TP_Token_Guard guard (this->token_);
567 result = guard.acquire_token ();
569 // If the guard is NOT the owner just return the retval
570 if (!guard.is_owner ())
571 return result;
573 // A different event handler may have been registered during the
574 // upcall if the handle was closed and then reopened, for
575 // example. Make sure we're removing and/or resuming the event
576 // handler used during the upcall.
577 ACE_Event_Handler const * const eh =
578 this->handler_rep_.find (dispatch_info.handle_);
580 // Only remove or resume the event handler used during the
581 // upcall.
582 if (eh == dispatch_info.event_handler_)
584 if (status < 0)
586 result =
587 this->remove_handler_i (dispatch_info.handle_,
588 dispatch_info.mask_);
591 // Resume handler if required.
592 if (dispatch_info.event_handler_ != this->notify_handler_ &&
593 dispatch_info.resume_flag_ ==
594 ACE_Event_Handler::ACE_REACTOR_RESUMES_HANDLER)
595 this->resume_i (dispatch_info.handle_);
599 // Call remove_reference() if needed.
600 if (dispatch_info.reference_counting_required_)
601 dispatch_info.event_handler_->remove_reference ();
603 return result;
607 ACE_TP_Reactor::resumable_handler ()
609 return 1;
613 ACE_TP_Reactor::handle_events (ACE_Time_Value &max_wait_time)
615 return this->handle_events (&max_wait_time);
618 void
619 ACE_TP_Reactor::notify_handle (ACE_HANDLE,
620 ACE_Reactor_Mask,
621 ACE_Handle_Set &,
622 ACE_Event_Handler *eh,
623 ACE_EH_PTMF)
625 ACELIB_ERROR ((LM_ERROR,
626 ACE_TEXT ("ACE_TP_Reactor::notify_handle: ")
627 ACE_TEXT ("Wrong version of notify_handle() got called\n")));
629 ACE_ASSERT (eh == 0);
630 ACE_UNUSED_ARG (eh);
633 ACE_HANDLE
634 ACE_TP_Reactor::get_notify_handle ()
636 // Call the notify handler to get a handle on which we would have a
637 // notify waiting
638 ACE_HANDLE const read_handle =
639 this->notify_handler_->notify_handle ();
641 // Check whether the rd_mask has been set on that handle. If so
642 // return the handle.
643 if (read_handle != ACE_INVALID_HANDLE &&
644 this->ready_set_.rd_mask_.is_set (read_handle))
646 return read_handle;
649 // None found..
650 return ACE_INVALID_HANDLE;
653 ACE_END_VERSIONED_NAMESPACE_DECL