Changes to attempt to silence bcc64x
[ACE_TAO.git] / ACE / ace / Select_Reactor_Base.cpp
blob42d38b87a13a8b7da9250e3fea5aed6b67cafd1c
1 #include "ace/Select_Reactor_Base.h"
2 #include "ace/Reactor.h"
3 #include "ace/Thread.h"
4 #include "ace/SOCK_Acceptor.h"
5 #include "ace/SOCK_Connector.h"
6 #include "ace/Timer_Queue.h"
7 #include "ace/Log_Category.h"
8 #include "ace/Signal.h"
9 #include "ace/OS_NS_fcntl.h"
11 #if !defined (__ACE_INLINE__)
12 #include "ace/Select_Reactor_Base.inl"
13 #endif /* __ACE_INLINE__ */
15 #ifndef ACE_SELECT_REACTOR_BASE_USES_HASH_MAP
16 # include <algorithm>
17 #endif /* !ACE_SELECT_REACTOR_BASE_USES_HASH_MAP */
19 ACE_BEGIN_VERSIONED_NAMESPACE_DECL
21 ACE_ALLOC_HOOK_DEFINE(ACE_Select_Reactor_Notify)
22 ACE_ALLOC_HOOK_DEFINE(ACE_Select_Reactor_Handler_Repository)
23 ACE_ALLOC_HOOK_DEFINE(ACE_Select_Reactor_Handler_Repository_Iterator)
25 template<typename iterator>
26 inline ACE_Event_Handler *
27 ACE_SELECT_REACTOR_EVENT_HANDLER (iterator i)
29 #ifdef ACE_SELECT_REACTOR_BASE_USES_HASH_MAP
30 return (*i).item ();
31 #else
32 return (*i);
33 #endif /* ACE_SELECT_REACTOR_BASE_USES_HASH_MAP */
36 // Performs sanity checking on the ACE_HANDLE.
38 bool
39 ACE_Select_Reactor_Handler_Repository::invalid_handle (ACE_HANDLE handle)
41 ACE_TRACE ("ACE_Select_Reactor_Handler_Repository::invalid_handle");
42 #if defined (ACE_SELECT_REACTOR_BASE_USES_HASH_MAP)
43 // It's too expensive to perform more exhaustive validity checks on
44 // Win32 due to the way that they implement SOCKET HANDLEs.
45 if (handle == ACE_INVALID_HANDLE)
46 #else /* !ACE_SELECT_REACTOR_BASE_USES_HASH_MAP */
47 if (handle < 0
48 || static_cast<size_type> (handle) >= this->event_handlers_.size ())
49 #endif /* ACE_SELECT_REACTOR_BASE_USES_HASH_MAP */
51 errno = EINVAL;
52 return true;
55 return false;
58 // Performs sanity checking on the ACE_HANDLE.
60 bool
61 ACE_Select_Reactor_Handler_Repository::handle_in_range (ACE_HANDLE handle)
63 ACE_TRACE ("ACE_Select_Reactor_Handler_Repository::handle_in_range");
64 #if defined (ACE_SELECT_REACTOR_BASE_USES_HASH_MAP)
65 // It's too expensive to perform more exhaustive validity checks on
66 // Win32 due to the way that they implement SOCKET HANDLEs.
67 if (handle != ACE_INVALID_HANDLE)
68 #else /* !ACE_SELECT_REACTOR_BASE_USES_HASH_MAP */
69 if (handle >= 0 && handle < this->max_handlep1_)
70 #endif /* ACE_SELECT_REACTOR_BASE_USES_HASH_MAP */
72 return true;
75 // Don't bother setting errno. It isn't used in the select()-based
76 // reactors and incurs a TSS access.
77 // errno = EINVAL;
79 return false;
82 int
83 ACE_Select_Reactor_Handler_Repository::open (size_type size)
85 ACE_TRACE ("ACE_Select_Reactor_Handler_Repository::open");
87 #if defined (ACE_SELECT_REACTOR_BASE_USES_HASH_MAP)
88 if (this->event_handlers_.open (size) == -1)
89 return -1;
90 #else
91 if (this->event_handlers_.size (size) == -1)
92 return -1;
94 // Initialize the ACE_Event_Handler pointers to 0.
95 std::fill (this->event_handlers_.begin (),
96 this->event_handlers_.end (),
97 static_cast<ACE_Event_Handler *> (0));
99 this->max_handlep1_ = 0;
100 #endif /* ACE_SELECT_REACTOR_BASE_USES_HASH_MAP */
102 // Try to increase the number of handles if <size> is greater than
103 // the current limit.
104 return ACE::set_handle_limit (static_cast<int> (size), 1);
107 // Initialize a repository of the appropriate <size>.
109 ACE_Select_Reactor_Handler_Repository::ACE_Select_Reactor_Handler_Repository (ACE_Select_Reactor_Impl &select_reactor)
110 : select_reactor_ (select_reactor)
111 #ifndef ACE_SELECT_REACTOR_BASE_USES_HASH_MAP
112 , max_handlep1_ (0)
113 #endif /* !ACE_SELECT_REACTOR_BASE_USES_HASH_MAP */
115 ACE_TRACE ("ACE_Select_Reactor_Handler_Repository::ACE_Select_Reactor_Handler_Repository");
119 ACE_Select_Reactor_Handler_Repository::unbind_all ()
121 // Unbind all of the <handle, ACE_Event_Handler>s.
122 #ifdef ACE_SELECT_REACTOR_BASE_USES_HASH_MAP
123 map_type::iterator const end = this->event_handlers_.end ();
124 for (map_type::iterator pos = this->event_handlers_.begin ();
125 pos != end;
128 // Post-increment (*not* pre-increment) before unbind()ing since
129 // the current iterator will be invalidated during the unbind()
130 // operation.
131 map_type::iterator const the_pos (pos++);
133 ACE_HANDLE const handle = (*the_pos).key ();
134 (void) this->unbind (handle,
135 the_pos,
136 ACE_Event_Handler::ALL_EVENTS_MASK);
138 #else
139 // We could use the "end()" iterator but leveraging max_handlep1_
140 // allows us to optimize away unnecessary accesses of nil event
141 // handler pointers.
142 map_type::iterator pos =
143 this->event_handlers_.begin (); // iterator == ACE_Event_Handler*
145 for (ACE_HANDLE handle = 0;
146 handle < this->max_handlep1_;
147 ++handle)
149 (void) this->unbind (handle,
150 pos,
151 ACE_Event_Handler::ALL_EVENTS_MASK);
152 ++pos;
154 #endif /* ACE_SELECT_REACTOR_BASE_USES_HASH_MAP */
156 return 0;
160 ACE_Select_Reactor_Handler_Repository::close ()
162 ACE_TRACE ("ACE_Select_Reactor_Handler_Repository::close");
164 return this->unbind_all ();
167 ACE_Select_Reactor_Handler_Repository::map_type::iterator
168 ACE_Select_Reactor_Handler_Repository::find_eh (ACE_HANDLE handle)
170 ACE_TRACE ("ACE_Select_Reactor_Handler_Repository::find_eh");
172 map_type::iterator pos (this->event_handlers_.end ());
174 // this code assumes the handle is in range.
175 #if defined (ACE_SELECT_REACTOR_BASE_USES_HASH_MAP)
176 this->event_handlers_.find (handle, pos);
177 #else
178 map_type::iterator const tmp = &this->event_handlers_[handle];
180 if (*tmp != 0)
181 pos = tmp;
182 #endif /* ACE_SELECT_REACTOR_BASE_USES_HASH_MAP */
184 return pos;
187 // Bind the <ACE_Event_Handler *> to the <ACE_HANDLE>.
189 ACE_Select_Reactor_Handler_Repository::bind (ACE_HANDLE handle,
190 ACE_Event_Handler *event_handler,
191 ACE_Reactor_Mask mask)
193 ACE_TRACE ("ACE_Select_Reactor_Handler_Repository::bind");
195 if (event_handler == 0)
196 return -1;
198 if (handle == ACE_INVALID_HANDLE)
199 handle = event_handler->get_handle ();
201 if (this->invalid_handle (handle))
202 return -1;
204 // Is this handle already in the Reactor?
205 bool existing_handle = false;
207 #if defined (ACE_SELECT_REACTOR_BASE_USES_HASH_MAP)
209 map_type::ENTRY * entry = 0;
211 int const result =
212 this->event_handlers_.bind (handle, event_handler, entry);
214 if (result == -1)
216 return -1;
218 else if (result == 1) // Entry already exists.
220 // Cannot use a different handler for an existing handle.
221 if (event_handler != entry->item ())
223 return -1;
225 else
227 // Remember that this handle is already registered in the
228 // Reactor.
229 existing_handle = true;
233 #else
235 // Check if this handle is already registered.
236 ACE_Event_Handler * const current_handler =
237 this->event_handlers_[handle];
239 if (current_handler)
241 // Cannot use a different handler for an existing handle.
242 if (current_handler != event_handler)
243 return -1;
245 // Remember that this handle is already registered in the
246 // Reactor.
247 existing_handle = true;
250 this->event_handlers_[handle] = event_handler;
252 if (this->max_handlep1_ < handle + 1)
253 this->max_handlep1_ = handle + 1;
255 #endif /* ACE_SELECT_REACTOR_BASE_USES_HASH_MAP */
257 if (this->select_reactor_.is_suspended_i (handle))
259 this->select_reactor_.bit_ops (handle,
260 mask,
261 this->select_reactor_.suspend_set_,
262 ACE_Reactor::ADD_MASK);
264 else
266 this->select_reactor_.bit_ops (handle,
267 mask,
268 this->select_reactor_.wait_set_,
269 ACE_Reactor::ADD_MASK);
271 // Note the fact that we've changed the state of the <wait_set_>,
272 // which is used by the dispatching loop to determine whether it can
273 // keep going or if it needs to reconsult select().
274 // this->select_reactor_.state_changed_ = 1;
277 // If new entry, call add_reference() if needed.
278 if (!existing_handle)
279 event_handler->add_reference ();
281 return 0;
284 // Remove the binding of <ACE_HANDLE>.
287 ACE_Select_Reactor_Handler_Repository::unbind (
288 ACE_HANDLE handle,
289 map_type::iterator pos,
290 ACE_Reactor_Mask mask)
292 ACE_TRACE ("ACE_Select_Reactor_Handler_Repository::unbind");
294 // Retrieve event handler before unbinding it from the map. The
295 // iterator pointing to it will no longer be valid once the handler
296 // is unbound.
297 ACE_Event_Handler * const event_handler =
298 (pos == this->event_handlers_.end ()
300 : ACE_SELECT_REACTOR_EVENT_HANDLER (pos));
302 // Clear out the <mask> bits in the Select_Reactor's wait_set.
303 this->select_reactor_.bit_ops (handle,
304 mask,
305 this->select_reactor_.wait_set_,
306 ACE_Reactor::CLR_MASK);
308 // And suspend_set.
309 this->select_reactor_.bit_ops (handle,
310 mask,
311 this->select_reactor_.suspend_set_,
312 ACE_Reactor::CLR_MASK);
314 // Note the fact that we've changed the state of the <wait_set_>,
315 // which is used by the dispatching loop to determine whether it can
316 // keep going or if it needs to reconsult select().
317 // this->select_reactor_.state_changed_ = 1;
319 // If there are no longer any outstanding events on this <handle>
320 // then we can totally shut down the Event_Handler.
322 bool const has_any_wait_mask =
323 (this->select_reactor_.wait_set_.rd_mask_.is_set (handle)
324 || this->select_reactor_.wait_set_.wr_mask_.is_set (handle)
325 || this->select_reactor_.wait_set_.ex_mask_.is_set (handle));
326 bool const has_any_suspend_mask =
327 (this->select_reactor_.suspend_set_.rd_mask_.is_set (handle)
328 || this->select_reactor_.suspend_set_.wr_mask_.is_set (handle)
329 || this->select_reactor_.suspend_set_.ex_mask_.is_set (handle));
331 bool complete_removal = false;
333 if (!has_any_wait_mask && !has_any_suspend_mask)
335 #if defined (ACE_SELECT_REACTOR_BASE_USES_HASH_MAP)
336 if (event_handler != 0 && this->event_handlers_.unbind (pos) == -1)
337 return -1; // Should not happen!
338 #else
339 this->event_handlers_[handle] = 0;
341 if (this->max_handlep1_ == handle + 1)
343 // We've deleted the last entry, so we need to figure out
344 // the last valid place in the array that is worth looking
345 // at.
346 ACE_HANDLE const wait_rd_max =
347 this->select_reactor_.wait_set_.rd_mask_.max_set ();
348 ACE_HANDLE const wait_wr_max =
349 this->select_reactor_.wait_set_.wr_mask_.max_set ();
350 ACE_HANDLE const wait_ex_max =
351 this->select_reactor_.wait_set_.ex_mask_.max_set ();
353 ACE_HANDLE const suspend_rd_max =
354 this->select_reactor_.suspend_set_.rd_mask_.max_set ();
355 ACE_HANDLE const suspend_wr_max =
356 this->select_reactor_.suspend_set_.wr_mask_.max_set ();
357 ACE_HANDLE const suspend_ex_max =
358 this->select_reactor_.suspend_set_.ex_mask_.max_set ();
360 // Compute the maximum of six values.
361 this->max_handlep1_ = wait_rd_max;
362 if (this->max_handlep1_ < wait_wr_max)
363 this->max_handlep1_ = wait_wr_max;
364 if (this->max_handlep1_ < wait_ex_max)
365 this->max_handlep1_ = wait_ex_max;
367 if (this->max_handlep1_ < suspend_rd_max)
368 this->max_handlep1_ = suspend_rd_max;
369 if (this->max_handlep1_ < suspend_wr_max)
370 this->max_handlep1_ = suspend_wr_max;
371 if (this->max_handlep1_ < suspend_ex_max)
372 this->max_handlep1_ = suspend_ex_max;
374 ++this->max_handlep1_;
377 #endif /* ACE_SELECT_REACTOR_BASE_USES_HASH_MAP */
379 // The handle has been completely removed.
380 complete_removal = true;
383 if (event_handler == 0)
384 return -1;
386 bool const requires_reference_counting =
387 event_handler->reference_counting_policy ().value () ==
388 ACE_Event_Handler::Reference_Counting_Policy::ENABLED;
390 // Close down the <Event_Handler> unless we've been instructed not
391 // to.
392 if (ACE_BIT_ENABLED (mask, ACE_Event_Handler::DONT_CALL) == 0)
393 (void) event_handler->handle_close (handle, mask);
395 // Call remove_reference() if the removal is complete and reference
396 // counting is needed.
397 if (complete_removal && requires_reference_counting)
399 (void) event_handler->remove_reference ();
402 return 0;
405 ACE_Select_Reactor_Handler_Repository_Iterator::ACE_Select_Reactor_Handler_Repository_Iterator
406 (ACE_Select_Reactor_Handler_Repository const * s)
407 : rep_ (s),
408 current_ (s->event_handlers_.begin ())
410 #ifndef ACE_SELECT_REACTOR_BASE_USES_HASH_MAP
411 // Don't use ACE_Array_Base::end() since it may be larger than
412 // event_handlers[max_handlep1_].
413 const_base_iterator const end =
414 &this->rep_->event_handlers_[this->rep_->max_handlep1 ()];
416 // Advance to the next element containing a non-zero event handler.
417 // There's no need to do this for the Windows case since the hash
418 // map will only contain non-zero event handlers.
419 while (this->current_ != end && (*(this->current_) == 0))
420 ++this->current_;
421 #endif
424 // Pass back the <next_item> that hasn't been seen in the Set.
425 // Returns 0 when all items have been seen, else 1.
427 bool
428 ACE_Select_Reactor_Handler_Repository_Iterator::next (
429 ACE_Event_Handler *&next_item)
431 bool result = true;
433 if (this->done ())
434 result = false;
435 else
436 next_item = ACE_SELECT_REACTOR_EVENT_HANDLER (this->current_);
438 return result;
441 // Move forward by one element in the set.
443 bool
444 ACE_Select_Reactor_Handler_Repository_Iterator::advance ()
446 #ifdef ACE_SELECT_REACTOR_BASE_USES_HASH_MAP
447 // No need to explicitly limit search to "current" to
448 // max_handlep1_ range.
449 const_base_iterator const end = this->rep_->event_handlers_.end ();
450 #else
451 // Don't use ACE_Array_Base::end() since it may be larger than
452 // event_handlers[max_handlep1_].
453 const_base_iterator const end =
454 &this->rep_->event_handlers_[this->rep_->max_handlep1 ()];
455 #endif /* ACE_SELECT_REACTOR_BASE_USES_HASH_MAP */
457 if (this->current_ != end)
458 ++this->current_;
460 #ifndef ACE_SELECT_REACTOR_BASE_USES_HASH_MAP
461 // Advance to the next element containing a non-zero event handler.
462 // There's no need to do this for the Windows case since the hash
463 // map will only contain non-zero event handlers.
464 while (this->current_ != end && (*(this->current_) == 0))
465 ++this->current_;
466 #endif /* !ACE_SELECT_REACTOR_BASE_USES_HASH_MAP */
468 return this->current_ != end;
471 // Dump the state of an object.
473 void
474 ACE_Select_Reactor_Handler_Repository_Iterator::dump () const
476 #if defined (ACE_HAS_DUMP)
477 ACE_TRACE ("ACE_Select_Reactor_Handler_Repository_Iterator::dump");
479 ACELIB_DEBUG ((LM_DEBUG, ACE_BEGIN_DUMP, this));
480 ACELIB_DEBUG ((LM_DEBUG, ACE_TEXT ("rep_ = %u"), this->rep_));
481 # ifdef ACE_SELECT_REACTOR_BASE_USES_HASH_MAP
482 ACELIB_DEBUG ((LM_DEBUG, ACE_TEXT ("current_ = ")));
483 this->current_.dump ();
484 # else
485 ACELIB_DEBUG ((LM_DEBUG, ACE_TEXT ("current_ = %@"), this->current_));
486 # endif /* ACE_SELECT_REACTOR_BASE_USES_HASH_MAP */
487 ACELIB_DEBUG ((LM_DEBUG, ACE_END_DUMP));
488 #endif /* ACE_HAS_DUMP */
491 void
492 ACE_Select_Reactor_Handler_Repository::dump () const
494 #if defined (ACE_HAS_DUMP)
495 ACE_TRACE ("ACE_Select_Reactor_Handler_Repository::dump");
497 # ifdef ACE_SELECT_REACTOR_BASE_USES_HASH_MAP
498 # define ACE_HANDLE_FORMAT_SPECIFIER ACE_TEXT("%@")
499 # define ACE_MAX_HANDLEP1_FORMAT_SPECIFIER ACE_TEXT("%u")
500 # else
501 # define ACE_HANDLE_FORMAT_SPECIFIER ACE_TEXT("%d")
502 # define ACE_MAX_HANDLEP1_FORMAT_SPECIFIER ACE_TEXT("%d")
503 # endif /* ACE_SELECT_REACTOR_BASE_USES_HASH_MAP */
505 ACELIB_DEBUG ((LM_DEBUG, ACE_BEGIN_DUMP, this));
506 ACELIB_DEBUG ((LM_DEBUG,
507 ACE_TEXT ("max_handlep1_ = ")
508 ACE_MAX_HANDLEP1_FORMAT_SPECIFIER
509 ACE_TEXT ("\n"),
510 this->max_handlep1 ()));
511 ACELIB_DEBUG ((LM_DEBUG, ACE_TEXT ("[")));
513 ACE_Event_Handler *event_handler = 0;
515 for (ACE_Select_Reactor_Handler_Repository_Iterator iter (this);
516 iter.next (event_handler) != 0;
517 iter.advance ())
518 ACELIB_DEBUG ((LM_DEBUG,
519 ACE_TEXT (" (event_handler = %@,")
520 ACE_TEXT (" event_handler->handle_ = ")
521 ACE_HANDLE_FORMAT_SPECIFIER
522 ACE_TEXT ("\n"),
523 event_handler,
524 event_handler->get_handle ()));
526 ACELIB_DEBUG ((LM_DEBUG, ACE_TEXT (" ]\n")));
527 ACELIB_DEBUG ((LM_DEBUG, ACE_END_DUMP));
528 #endif /* ACE_HAS_DUMP */
531 ACE_Select_Reactor_Notify::ACE_Select_Reactor_Notify ()
532 : select_reactor_ (0)
533 , max_notify_iterations_ (-1)
537 ACE_Select_Reactor_Notify::~ACE_Select_Reactor_Notify ()
541 void
542 ACE_Select_Reactor_Notify::max_notify_iterations (int iterations)
544 // Must always be > 0 or < 0 to optimize the loop exit condition.
545 if (iterations == 0)
546 iterations = 1;
548 this->max_notify_iterations_ = iterations;
552 ACE_Select_Reactor_Notify::max_notify_iterations ()
554 return this->max_notify_iterations_;
557 // purge_pending_notifications
558 // Removes all entries from the notify_queue_ and each one that
559 // matches <eh> is put on the free_queue_. The rest are saved on a
560 // local queue and copied back to the notify_queue_ at the end.
561 // Returns the number of entries removed. Returns -1 on error.
562 // ACE_NOTSUP_RETURN if ACE_HAS_REACTOR_NOTIFICATION_QUEUE is not defined.
564 ACE_Select_Reactor_Notify::purge_pending_notifications (ACE_Event_Handler *eh,
565 ACE_Reactor_Mask mask )
567 ACE_TRACE ("ACE_Select_Reactor_Notify::purge_pending_notifications");
569 #if defined (ACE_HAS_REACTOR_NOTIFICATION_QUEUE)
571 return notification_queue_.purge_pending_notifications(eh, mask);
573 #else /* defined (ACE_HAS_REACTOR_NOTIFICATION_QUEUE) */
574 ACE_UNUSED_ARG (eh);
575 ACE_UNUSED_ARG (mask);
576 ACE_NOTSUP_RETURN (-1);
577 #endif /* defined (ACE_HAS_REACTOR_NOTIFICATION_QUEUE) */
580 void
581 ACE_Select_Reactor_Notify::dump () const
583 #if defined (ACE_HAS_DUMP)
584 ACE_TRACE ("ACE_Select_Reactor_Notify::dump");
586 ACELIB_DEBUG ((LM_DEBUG, ACE_BEGIN_DUMP, this));
587 ACELIB_DEBUG ((LM_DEBUG, ACE_TEXT ("select_reactor_ = %x"), this->select_reactor_));
588 this->notification_pipe_.dump ();
589 ACELIB_DEBUG ((LM_DEBUG, ACE_END_DUMP));
590 #endif /* ACE_HAS_DUMP */
594 ACE_Select_Reactor_Notify::open (ACE_Reactor_Impl *r,
595 ACE_Timer_Queue *,
596 int disable_notify_pipe)
598 ACE_TRACE ("ACE_Select_Reactor_Notify::open");
600 if (disable_notify_pipe == 0)
602 this->select_reactor_ = dynamic_cast<ACE_Select_Reactor_Impl *> (r);
604 if (select_reactor_ == 0)
606 errno = EINVAL;
607 return -1;
610 if (this->notification_pipe_.open () == -1)
611 return -1;
612 #if defined (F_SETFD) && !defined (ACE_LACKS_FCNTL)
613 if (ACE_OS::fcntl (this->notification_pipe_.read_handle (), F_SETFD, 1) == -1)
615 return -1;
618 if (ACE_OS::fcntl (this->notification_pipe_.write_handle (), F_SETFD, 1) == -1)
620 return -1;
622 #endif /* F_SETFD */
624 #if defined (ACE_HAS_REACTOR_NOTIFICATION_QUEUE)
625 if (notification_queue_.open() == -1)
627 return -1;
630 # if defined (ACE_LACKS_LISTEN) && defined (ACE_LACKS_SOCKETPAIR) \
631 && !defined (ACE_HAS_STREAM_PIPES)
632 if (ACE::set_flags (this->notification_pipe_.write_handle (),
633 ACE_NONBLOCK) == -1)
634 return -1;
635 # endif
636 #endif /* ACE_HAS_REACTOR_NOTIFICATION_QUEUE */
638 // There seems to be a Win32 bug with this... Set this into
639 // non-blocking mode.
640 if (ACE::set_flags (this->notification_pipe_.read_handle (),
641 ACE_NONBLOCK) == -1)
642 return -1;
643 else
644 return this->select_reactor_->register_handler
645 (this->notification_pipe_.read_handle (),
646 this,
647 ACE_Event_Handler::READ_MASK);
649 else
651 this->select_reactor_ = 0;
652 return 0;
657 ACE_Select_Reactor_Notify::close ()
659 ACE_TRACE ("ACE_Select_Reactor_Notify::close");
661 #if defined (ACE_HAS_REACTOR_NOTIFICATION_QUEUE)
662 notification_queue_.reset();
663 #else
664 if (this->notification_pipe_.read_handle() != ACE_INVALID_HANDLE)
666 // Please see Bug 2820, if we just close the pipe then we break
667 // the reference counting rules. Basically, all the event
668 // handlers "stored" in the pipe had their reference counts
669 // increased. We need to decrease them before closing the
670 // pipe....
671 ACE_Notification_Buffer b;
672 for (int r = read_notify_pipe(notification_pipe_.read_handle(), b);
673 r > 0;
674 r = read_notify_pipe(notification_pipe_.read_handle(), b))
676 if (b.eh_ != 0)
678 b.eh_->remove_reference();
682 #endif /* ACE_HAS_REACTOR_NOTIFICATION_QUEUE */
684 return this->notification_pipe_.close ();
688 ACE_Select_Reactor_Notify::notify (ACE_Event_Handler *event_handler,
689 ACE_Reactor_Mask mask,
690 ACE_Time_Value *timeout)
692 ACE_TRACE ("ACE_Select_Reactor_Notify::notify");
694 // Just consider this method a "no-op" if there's no
695 // <ACE_Select_Reactor> configured.
696 if (this->select_reactor_ == 0)
697 return 0;
699 ACE_Event_Handler_var safe_handler (event_handler);
701 if (event_handler)
703 event_handler->add_reference ();
706 ACE_Notification_Buffer buffer (event_handler, mask);
708 #if defined (ACE_HAS_REACTOR_NOTIFICATION_QUEUE)
709 int const notification_required =
710 notification_queue_.push_new_notification(buffer);
712 if (notification_required == -1)
714 return -1;
717 if (notification_required == 0)
719 // No failures, the handler is now owned by the notification queue
720 safe_handler.release ();
722 return 0;
724 #endif /* ACE_HAS_REACTOR_NOTIFICATION_QUEUE */
726 ssize_t const n = ACE::send (this->notification_pipe_.write_handle (),
727 (char *) &buffer,
728 sizeof buffer,
729 timeout);
730 if (n == -1)
732 return -1;
735 // No failures.
736 safe_handler.release ();
738 return 0;
741 // Handles pending threads (if any) that are waiting to unblock the
742 // Select_Reactor.
745 ACE_Select_Reactor_Notify::dispatch_notifications (int &number_of_active_handles,
746 ACE_Handle_Set &rd_mask)
748 ACE_TRACE ("ACE_Select_Reactor_Notify::dispatch_notifications");
750 ACE_HANDLE const read_handle =
751 this->notification_pipe_.read_handle ();
753 if (read_handle != ACE_INVALID_HANDLE
754 && rd_mask.is_set (read_handle))
756 --number_of_active_handles;
757 rd_mask.clr_bit (read_handle);
758 return this->handle_input (read_handle);
760 else
761 return 0;
765 ACE_HANDLE
766 ACE_Select_Reactor_Notify::notify_handle ()
768 ACE_TRACE ("ACE_Select_Reactor_Notify::notify_handle");
770 return this->notification_pipe_.read_handle ();
775 ACE_Select_Reactor_Notify::is_dispatchable (ACE_Notification_Buffer &buffer)
777 #if defined (ACE_HAS_REACTOR_NOTIFICATION_QUEUE)
778 ACE_UNUSED_ARG(buffer);
779 return 1;
780 #else
781 // If eh == 0 then another thread is unblocking the
782 // <ACE_Select_Reactor> to update the <ACE_Select_Reactor>'s
783 // internal structures. Otherwise, we need to dispatch the
784 // appropriate handle_* method on the <ACE_Event_Handler>
785 // pointer we've been passed.
786 if (buffer.eh_ != 0)
788 return 1;
790 else
792 // has no dispatchable buffer
793 return 0;
795 #endif /*ACE_HAS_REACTOR_NOTIFICATION_QUEUE */
799 ACE_Select_Reactor_Notify::dispatch_notify (ACE_Notification_Buffer &buffer)
801 int result = 0;
803 #if defined (ACE_HAS_REACTOR_NOTIFICATION_QUEUE)
804 // Dispatch one message from the notify queue, and put another in
805 // the pipe if one is available. Remember, the idea is to keep
806 // exactly one message in the pipe at a time.
808 bool more_messages_queued = false;
809 ACE_Notification_Buffer next;
811 result = notification_queue_.pop_next_notification(buffer,
812 more_messages_queued,
813 next);
815 if (result == 0 || result == -1)
817 return result;
820 if(more_messages_queued)
822 (void) ACE::send(this->notification_pipe_.write_handle(),
823 (char *)&next, sizeof(ACE_Notification_Buffer));
825 #endif /* ACE_HAS_REACTOR_NOTIFICATION_QUEUE */
827 // If eh == 0 then another thread is unblocking the
828 // <ACE_Select_Reactor> to update the <ACE_Select_Reactor>'s
829 // internal structures. Otherwise, we need to dispatch the
830 // appropriate handle_* method on the <ACE_Event_Handler> pointer
831 // we've been passed.
832 if (buffer.eh_ != 0)
834 ACE_Event_Handler *event_handler = buffer.eh_;
836 bool const requires_reference_counting =
837 event_handler->reference_counting_policy ().value () ==
838 ACE_Event_Handler::Reference_Counting_Policy::ENABLED;
840 switch (buffer.mask_)
842 case ACE_Event_Handler::READ_MASK:
843 case ACE_Event_Handler::ACCEPT_MASK:
844 result = event_handler->handle_input (ACE_INVALID_HANDLE);
845 break;
846 case ACE_Event_Handler::WRITE_MASK:
847 result = event_handler->handle_output (ACE_INVALID_HANDLE);
848 break;
849 case ACE_Event_Handler::EXCEPT_MASK:
850 result = event_handler->handle_exception (ACE_INVALID_HANDLE);
851 break;
852 case ACE_Event_Handler::QOS_MASK:
853 result = event_handler->handle_qos (ACE_INVALID_HANDLE);
854 break;
855 case ACE_Event_Handler::GROUP_QOS_MASK:
856 result = event_handler->handle_group_qos (ACE_INVALID_HANDLE);
857 break;
858 default:
859 // Should we bail out if we get an invalid mask?
860 ACELIB_ERROR ((LM_ERROR,
861 ACE_TEXT ("invalid mask = %d\n"),
862 buffer.mask_));
865 if (result == -1)
866 event_handler->handle_close (ACE_INVALID_HANDLE,
867 ACE_Event_Handler::EXCEPT_MASK);
869 if (requires_reference_counting)
871 event_handler->remove_reference ();
875 return 1;
879 ACE_Select_Reactor_Notify::read_notify_pipe (ACE_HANDLE handle,
880 ACE_Notification_Buffer &buffer)
882 ACE_TRACE ("ACE_Select_Reactor_Notify::read_notify_pipe");
884 // This is kind of a weird, fragile beast. We first read with a
885 // regular read. The read side of this socket is non-blocking, so
886 // the read may end up being short.
888 // If the read is short, then we do a recv_n to insure that we block
889 // and read the rest of the buffer.
891 // Now, you might be tempted to say, "why don't we just replace the
892 // first recv with a recv_n?" I was, too. But that doesn't work
893 // because of how the calling code in handle_input() works. In
894 // handle_input, the event will only be dispatched if the return
895 // value from read_notify_pipe() is > 0. That means that we can't
896 // return zero from this func unless it's an EOF condition.
898 // Thus, the return value semantics for this are:
899 // -1: nothing read, fatal, unrecoverable error
900 // 0: nothing read at all
901 // 1: complete buffer read
903 ssize_t const n = ACE::recv (handle, (char *) &buffer, sizeof buffer);
905 if (n > 0)
907 // Check to see if we've got a short read.
908 if ((size_t)n != sizeof buffer)
910 ssize_t const remainder = sizeof buffer - n;
912 // If so, try to recover by reading the remainder. If this
913 // doesn't work we're in big trouble since the input stream
914 // won't be aligned correctly. I'm not sure quite what to
915 // do at this point. It's probably best just to return -1.
916 if (ACE::recv_n (handle,
917 ((char *) &buffer) + n,
918 remainder) != remainder)
919 return -1;
922 return 1;
925 // Return -1 if things have gone seriously wrong.
926 if (n <= 0 && (errno != EWOULDBLOCK && errno != EAGAIN))
927 return -1;
929 return 0;
934 ACE_Select_Reactor_Notify::handle_input (ACE_HANDLE handle)
936 ACE_TRACE ("ACE_Select_Reactor_Notify::handle_input");
937 // Precondition: this->select_reactor_.token_.current_owner () ==
938 // ACE_Thread::self ();
940 int number_dispatched = 0;
941 int result = 0;
942 ACE_Notification_Buffer buffer;
944 // If there is only one buffer in the pipe, this will loop and call
945 // read_notify_pipe() twice. The first time will read the buffer, and
946 // the second will read the fact that the pipe is empty.
947 while ((result = this->read_notify_pipe (handle, buffer)) > 0)
949 // Dispatch the buffer
950 // NOTE: We count only if we made any dispatches ie. upcalls.
951 if (this->dispatch_notify (buffer) > 0)
952 ++number_dispatched;
954 // Bail out if we've reached the <notify_threshold_>. Note that
955 // by default <notify_threshold_> is -1, so we'll loop until all
956 // the notifications in the pipe have been dispatched.
957 if (number_dispatched == this->max_notify_iterations_)
958 break;
961 // Reassign number_dispatched to -1 if things have gone seriously
962 // wrong.
963 if (result < 0)
964 number_dispatched = -1;
966 // Enqueue ourselves into the list of waiting threads. When we
967 // reacquire the token we'll be off and running again with ownership
968 // of the token. The postcondition of this call is that
969 // <select_reactor_.token_.current_owner> == <ACE_Thread::self>.
970 this->select_reactor_->renew ();
971 return number_dispatched;
974 // -------------------------------------------
977 ACE_Select_Reactor_Impl::purge_pending_notifications (ACE_Event_Handler *eh,
978 ACE_Reactor_Mask mask)
980 if (this->notify_handler_ == 0)
981 return 0;
982 else
983 return this->notify_handler_->purge_pending_notifications (eh, mask);
987 // Perform GET, CLR, SET, and ADD operations on the Handle_Sets.
989 // GET = 1, Retrieve current value
990 // SET = 2, Set value of bits to new mask (changes the entire mask)
991 // ADD = 3, Bitwise "or" the value into the mask (only changes
992 // enabled bits)
993 // CLR = 4 Bitwise "and" the negation of the value out of the mask
994 // (only changes enabled bits)
996 // Returns the original mask. Must be called with locks held.
998 ACE_Select_Reactor_Impl::bit_ops (ACE_HANDLE handle,
999 ACE_Reactor_Mask mask,
1000 ACE_Select_Reactor_Handle_Set &handle_set,
1001 int ops)
1003 ACE_TRACE ("ACE_Select_Reactor_Impl::bit_ops");
1004 if (this->handler_rep_.handle_in_range (handle) == 0)
1005 return -1;
1007 #if !defined (ACE_WIN32)
1008 ACE_Sig_Guard sb (0,
1009 this->mask_signals_); // Block out all signals until method returns.
1010 #endif /* ACE_WIN32 */
1012 ACE_FDS_PTMF ptmf = &ACE_Handle_Set::set_bit;
1013 u_long omask = ACE_Event_Handler::NULL_MASK;
1015 // Find the old reactor masks. This automatically does the work of
1016 // the GET_MASK operation.
1017 if (handle_set.rd_mask_.is_set (handle))
1018 ACE_SET_BITS (omask, ACE_Event_Handler::READ_MASK);
1019 if (handle_set.wr_mask_.is_set (handle))
1020 ACE_SET_BITS (omask, ACE_Event_Handler::WRITE_MASK);
1021 if (handle_set.ex_mask_.is_set (handle))
1022 ACE_SET_BITS (omask, ACE_Event_Handler::EXCEPT_MASK);
1024 switch (ops)
1026 case ACE_Reactor::GET_MASK:
1027 // The work for this operation is done in all cases at the
1028 // beginning of the function.
1029 break;
1030 case ACE_Reactor::CLR_MASK:
1031 ptmf = &ACE_Handle_Set::clr_bit;
1032 // State was changed. we need to reflect that change in the
1033 // dispatch_mask I assume that only ACE_Reactor::CLR_MASK should
1034 // be treated here which means we need to clear the handle|mask
1035 // from the current dispatch handler
1036 this->clear_dispatch_mask (handle, mask);
1037 ACE_FALLTHROUGH;
1038 case ACE_Reactor::SET_MASK:
1039 ACE_FALLTHROUGH;
1040 case ACE_Reactor::ADD_MASK:
1042 // The following code is rather subtle... Note that if we are
1043 // doing a ACE_Reactor::SET_MASK then if the bit is not enabled
1044 // in the mask we need to clear the bit from the ACE_Handle_Set.
1045 // On the other hand, if we are doing a ACE_Reactor::CLR_MASK or
1046 // a ACE_Reactor::ADD_MASK we just carry out the operations
1047 // specified by the mask.
1049 // READ, ACCEPT, and CONNECT flag will place the handle in the
1050 // read set.
1051 if (ACE_BIT_ENABLED (mask, ACE_Event_Handler::READ_MASK)
1052 || ACE_BIT_ENABLED (mask, ACE_Event_Handler::ACCEPT_MASK)
1053 || ACE_BIT_ENABLED (mask, ACE_Event_Handler::CONNECT_MASK))
1055 (handle_set.rd_mask_.*ptmf) (handle);
1057 else if (ops == ACE_Reactor::SET_MASK)
1058 handle_set.rd_mask_.clr_bit (handle);
1060 // WRITE and CONNECT flag will place the handle in the write set
1061 if (ACE_BIT_ENABLED (mask,
1062 ACE_Event_Handler::WRITE_MASK)
1063 || ACE_BIT_ENABLED (mask,
1064 ACE_Event_Handler::CONNECT_MASK))
1066 (handle_set.wr_mask_.*ptmf) (handle);
1068 else if (ops == ACE_Reactor::SET_MASK)
1069 handle_set.wr_mask_.clr_bit (handle);
1071 // EXCEPT (and CONNECT on Win32) flag will place the handle in
1072 // the except set.
1073 if (ACE_BIT_ENABLED (mask, ACE_Event_Handler::EXCEPT_MASK)
1074 #if defined (ACE_WIN32)
1075 || ACE_BIT_ENABLED (mask, ACE_Event_Handler::CONNECT_MASK)
1076 #endif /* ACE_WIN32 */
1079 (handle_set.ex_mask_.*ptmf) (handle);
1081 else if (ops == ACE_Reactor::SET_MASK)
1082 handle_set.ex_mask_.clr_bit (handle);
1083 break;
1084 default:
1085 return -1;
1087 return omask;
1090 void
1091 ACE_Select_Reactor_Impl::clear_dispatch_mask (ACE_HANDLE handle,
1092 ACE_Reactor_Mask mask)
1094 ACE_TRACE ("ACE_Select_Reactor_Impl::clear_dispatch_mask");
1096 // Use handle and mask in order to modify the sets
1097 // (wait/suspend/ready/dispatch), that way, the dispatch_io_set loop
1098 // will not be interrupt, and there will no reason to rescan the
1099 // wait_set and re-calling select function, which is *very*
1100 // expensive. It seems that wait/suspend/ready sets are getting
1101 // updated in register/remove bind/unbind etc functions. The only
1102 // thing need to be updated is the dispatch_set (also can be found
1103 // in that file code as dispatch_mask). Because of that, we need
1104 // that dispatch_set to be member of the ACE_Select_Reactor_impl in
1105 // Select_Reactor_Base.h file That way we will have access to that
1106 // member in that function.
1108 // We kind of invalidate the iterator in dispatch_io_set because its
1109 // an array and index built from the original dispatch-set. Take a
1110 // look at dispatch_io_set for more details.
1112 // We only need to clr_bit, because we are interested in clearing the
1113 // handles that was removed, so no dispatching to these handles will
1114 // occur.
1115 if (ACE_BIT_ENABLED (mask, ACE_Event_Handler::READ_MASK) ||
1116 ACE_BIT_ENABLED (mask, ACE_Event_Handler::ACCEPT_MASK))
1118 this->dispatch_set_.rd_mask_.clr_bit (handle);
1120 if (ACE_BIT_ENABLED (mask, ACE_Event_Handler::WRITE_MASK))
1122 this->dispatch_set_.wr_mask_.clr_bit (handle);
1124 if (ACE_BIT_ENABLED (mask, ACE_Event_Handler::EXCEPT_MASK))
1126 this->dispatch_set_.ex_mask_.clr_bit (handle);
1129 // That will make the dispatch_io_set iterator re-start and rescan
1130 // the dispatch set.
1131 this->state_changed_ = true;
1136 ACE_Select_Reactor_Impl::resumable_handler ()
1138 // The select reactor has no handlers that can be resumed by the
1139 // application. So return 0;
1141 return 0;
1144 ACE_END_VERSIONED_NAMESPACE_DECL