Changes to attempt to silence bcc64x
[ACE_TAO.git] / ACE / ace / WFMO_Reactor.cpp
blobcf935f9dd45b5bf83ad20165d29410cfa5d8e2ce
1 #include "ace/WFMO_Reactor.h"
3 #if defined (ACE_WIN32)
5 #include "ace/Handle_Set.h"
6 #include "ace/Timer_Heap.h"
7 #include "ace/Thread.h"
8 #include "ace/OS_NS_errno.h"
9 #include "ace/Null_Condition.h"
11 #if !defined (__ACE_INLINE__)
12 #include "ace/WFMO_Reactor.inl"
13 #endif /* __ACE_INLINE__ */
15 #include <memory>
17 ACE_BEGIN_VERSIONED_NAMESPACE_DECL
19 ACE_WFMO_Reactor_Handler_Repository::ACE_WFMO_Reactor_Handler_Repository (ACE_WFMO_Reactor &wfmo_reactor)
20 : wfmo_reactor_ (wfmo_reactor)
24 int
25 ACE_WFMO_Reactor_Handler_Repository::open (size_t size)
27 if (size > MAXIMUM_WAIT_OBJECTS)
28 ACELIB_ERROR_RETURN ((LM_ERROR,
29 ACE_TEXT ("%d exceeds MAXIMUM_WAIT_OBJECTS (%d)\n"),
30 size,
31 MAXIMUM_WAIT_OBJECTS),
32 -1);
34 // Dynamic allocation
35 ACE_NEW_RETURN (this->current_handles_,
36 ACE_HANDLE[size],
37 -1);
38 ACE_NEW_RETURN (this->current_info_,
39 Current_Info[size],
40 -1);
41 ACE_NEW_RETURN (this->current_suspended_info_,
42 Suspended_Info[size],
43 -1);
44 ACE_NEW_RETURN (this->to_be_added_info_,
45 To_Be_Added_Info[size],
46 -1);
48 // Initialization
49 this->max_size_ = size;
50 this->max_handlep1_ = 0;
51 this->suspended_handles_ = 0;
52 this->handles_to_be_added_ = 0;
53 this->handles_to_be_deleted_ = 0;
54 this->handles_to_be_suspended_ = 0;
55 this->handles_to_be_resumed_ = 0;
57 for (size_t i = 0; i < size; ++i)
58 this->current_handles_[i] = ACE_INVALID_HANDLE;
60 return 0;
63 ACE_WFMO_Reactor_Handler_Repository::~ACE_WFMO_Reactor_Handler_Repository ()
65 // Free up dynamically allocated space
66 delete [] this->current_handles_;
67 delete [] this->current_info_;
68 delete [] this->current_suspended_info_;
69 delete [] this->to_be_added_info_;
72 ACE_Reactor_Mask
73 ACE_WFMO_Reactor_Handler_Repository::bit_ops (long &existing_masks,
74 ACE_Reactor_Mask change_masks,
75 int operation)
77 // Find the old reactor masks. This automatically does the work of
78 // the GET_MASK operation.
80 ACE_Reactor_Mask old_masks = ACE_Event_Handler::NULL_MASK;
82 if (ACE_BIT_ENABLED (existing_masks, FD_READ)
83 || ACE_BIT_ENABLED (existing_masks, FD_CLOSE))
84 ACE_SET_BITS (old_masks, ACE_Event_Handler::READ_MASK);
86 if (ACE_BIT_ENABLED (existing_masks, FD_WRITE))
87 ACE_SET_BITS (old_masks, ACE_Event_Handler::WRITE_MASK);
89 if (ACE_BIT_ENABLED (existing_masks, FD_OOB))
90 ACE_SET_BITS (old_masks, ACE_Event_Handler::EXCEPT_MASK);
92 if (ACE_BIT_ENABLED (existing_masks, FD_ACCEPT))
93 ACE_SET_BITS (old_masks, ACE_Event_Handler::ACCEPT_MASK);
95 if (ACE_BIT_ENABLED (existing_masks, FD_CONNECT))
96 ACE_SET_BITS (old_masks, ACE_Event_Handler::CONNECT_MASK);
98 if (ACE_BIT_ENABLED (existing_masks, FD_QOS))
99 ACE_SET_BITS (old_masks, ACE_Event_Handler::QOS_MASK);
101 if (ACE_BIT_ENABLED (existing_masks, FD_GROUP_QOS))
102 ACE_SET_BITS (old_masks, ACE_Event_Handler::GROUP_QOS_MASK);
104 switch (operation)
106 case ACE_Reactor::CLR_MASK:
107 // For the CLR_MASK operation, clear only the specific masks.
109 if (ACE_BIT_ENABLED (change_masks, ACE_Event_Handler::READ_MASK))
111 ACE_CLR_BITS (existing_masks, FD_READ);
112 ACE_CLR_BITS (existing_masks, FD_CLOSE);
115 if (ACE_BIT_ENABLED (change_masks, ACE_Event_Handler::WRITE_MASK))
116 ACE_CLR_BITS (existing_masks, FD_WRITE);
118 if (ACE_BIT_ENABLED (change_masks, ACE_Event_Handler::EXCEPT_MASK))
119 ACE_CLR_BITS (existing_masks, FD_OOB);
121 if (ACE_BIT_ENABLED (change_masks, ACE_Event_Handler::ACCEPT_MASK))
122 ACE_CLR_BITS (existing_masks, FD_ACCEPT);
124 if (ACE_BIT_ENABLED (change_masks, ACE_Event_Handler::CONNECT_MASK))
125 ACE_CLR_BITS (existing_masks, FD_CONNECT);
127 if (ACE_BIT_ENABLED (change_masks, ACE_Event_Handler::QOS_MASK))
128 ACE_CLR_BITS (existing_masks, FD_QOS);
130 if (ACE_BIT_ENABLED (change_masks, ACE_Event_Handler::GROUP_QOS_MASK))
131 ACE_CLR_BITS (existing_masks, FD_GROUP_QOS);
133 break;
135 case ACE_Reactor::SET_MASK:
136 // If the operation is a set, first reset any existing masks
138 existing_masks = 0;
139 /* FALLTHRU */
141 case ACE_Reactor::ADD_MASK:
142 // For the ADD_MASK and the SET_MASK operation, add only the
143 // specific masks.
145 if (ACE_BIT_ENABLED (change_masks, ACE_Event_Handler::READ_MASK))
147 ACE_SET_BITS (existing_masks, FD_READ);
148 ACE_SET_BITS (existing_masks, FD_CLOSE);
151 if (ACE_BIT_ENABLED (change_masks, ACE_Event_Handler::WRITE_MASK))
152 ACE_SET_BITS (existing_masks, FD_WRITE);
154 if (ACE_BIT_ENABLED (change_masks, ACE_Event_Handler::EXCEPT_MASK))
155 ACE_SET_BITS (existing_masks, FD_OOB);
157 if (ACE_BIT_ENABLED (change_masks, ACE_Event_Handler::ACCEPT_MASK))
158 ACE_SET_BITS (existing_masks, FD_ACCEPT);
160 if (ACE_BIT_ENABLED (change_masks, ACE_Event_Handler::CONNECT_MASK))
161 ACE_SET_BITS (existing_masks, FD_CONNECT);
163 if (ACE_BIT_ENABLED (change_masks, ACE_Event_Handler::QOS_MASK))
164 ACE_SET_BITS (existing_masks, FD_QOS);
166 if (ACE_BIT_ENABLED (change_masks, ACE_Event_Handler::GROUP_QOS_MASK))
167 ACE_SET_BITS (existing_masks, FD_GROUP_QOS);
169 break;
171 case ACE_Reactor::GET_MASK:
173 // The work for this operation is done in all cases at the
174 // beginning of the function.
176 ACE_UNUSED_ARG (change_masks);
178 break;
181 return old_masks;
185 ACE_WFMO_Reactor_Handler_Repository::unbind_i (ACE_HANDLE handle,
186 ACE_Reactor_Mask mask,
187 bool &changes_required)
189 bool error = false;
191 // Remember this value; only if it changes do we need to wakeup
192 // the other threads
193 size_t const original_handle_count = this->handles_to_be_deleted_;
194 size_t i;
196 // Go through all the handles looking for <handle>. Even if we find
197 // it, we continue through the rest of the list since <handle> could
198 // appear multiple times. All handles are checked.
200 // First check the current entries
201 for (i = 0; i < this->max_handlep1_ && !error; ++i)
202 // Since the handle can either be the event or the I/O handle,
203 // we have to check both
204 if ((this->current_handles_[i] == handle
205 || this->current_info_[i].io_handle_ == handle)
206 && // Make sure that it is not already marked for deleted
207 !this->current_info_[i].delete_entry_)
209 if (this->remove_handler_i (i, mask) == -1)
210 error = true;
213 // Then check the suspended entries
214 for (i = 0; i < this->suspended_handles_ && !error; ++i)
215 // Since the handle can either be the event or the I/O handle, we
216 // have to check both
217 if ((this->current_suspended_info_[i].io_handle_ == handle
218 || this->current_suspended_info_[i].event_handle_ == handle)
220 // Make sure that it is not already marked for deleted
221 !this->current_suspended_info_[i].delete_entry_)
223 if (this->remove_suspended_handler_i (i, mask) == -1)
224 error = true;
227 // Then check the to_be_added entries
228 for (i = 0; i < this->handles_to_be_added_ && !error; ++i)
229 // Since the handle can either be the event or the I/O handle,
230 // we have to check both
231 if ((this->to_be_added_info_[i].io_handle_ == handle
232 || this->to_be_added_info_[i].event_handle_ == handle)
234 // Make sure that it is not already marked for deleted
235 !this->to_be_added_info_[i].delete_entry_)
237 if (this->remove_to_be_added_handler_i (i, mask) == -1)
238 error = true;
241 // Only if the number of handlers to be deleted changes do we need
242 // to wakeup the other threads
243 if (original_handle_count < this->handles_to_be_deleted_)
244 changes_required = true;
246 return error ? -1 : 0;
250 ACE_WFMO_Reactor_Handler_Repository::remove_handler_i (size_t slot,
251 ACE_Reactor_Mask to_be_removed_masks)
253 // I/O entries
254 if (this->current_info_[slot].io_entry_)
256 // See if there are other events that the <Event_Handler> is
257 // interested in
258 this->bit_ops (this->current_info_[slot].network_events_,
259 to_be_removed_masks,
260 ACE_Reactor::CLR_MASK);
262 // Disassociate/Reassociate the event from/with the I/O handle.
263 // This will depend on the value of remaining set of network
264 // events that the <event_handler> is interested in. I don't
265 // think we can do anything about errors here, so I will not
266 // check this.
267 ::WSAEventSelect ((SOCKET) this->current_info_[slot].io_handle_,
268 this->current_handles_[slot],
269 this->current_info_[slot].network_events_);
271 // Normal event entries.
272 else if (ACE_BIT_ENABLED (to_be_removed_masks, ACE_Event_Handler::DONT_CALL))
273 // Preserve DONT_CALL
274 to_be_removed_masks = ACE_Event_Handler::DONT_CALL;
275 else
276 // Make sure that the <to_be_removed_masks> is the NULL_MASK
277 to_be_removed_masks = ACE_Event_Handler::NULL_MASK;
279 // If this event was marked for suspension, undo the suspension flag
280 // and reduce the to be suspended count.
281 if (this->current_info_[slot].suspend_entry_)
283 // Undo suspension
284 this->current_info_[slot].suspend_entry_ = false;
285 // Decrement the handle count
286 --this->handles_to_be_suspended_;
289 // If there are no more events that the <Event_Handler> is
290 // interested in, or this is a non-I/O entry, schedule the
291 // <Event_Handler> for removal
292 if (this->current_info_[slot].network_events_ == 0)
294 // Mark to be deleted
295 this->current_info_[slot].delete_entry_ = true;
296 // Remember the mask
297 this->current_info_[slot].close_masks_ = to_be_removed_masks;
298 // Increment the handle count
299 ++this->handles_to_be_deleted_;
302 // Since it is not a complete removal, we'll call handle_close
303 // for all the masks that were removed. This does not change
304 // the internal state of the reactor.
306 // Note: this condition only applies to I/O entries
307 else if (ACE_BIT_ENABLED (to_be_removed_masks, ACE_Event_Handler::DONT_CALL) == 0)
309 ACE_HANDLE handle = this->current_info_[slot].io_handle_;
310 this->current_info_[slot].event_handler_->handle_close (handle,
311 to_be_removed_masks);
314 return 0;
318 ACE_WFMO_Reactor_Handler_Repository::remove_suspended_handler_i (size_t slot,
319 ACE_Reactor_Mask to_be_removed_masks)
321 // I/O entries
322 if (this->current_suspended_info_[slot].io_entry_)
324 // See if there are other events that the <Event_Handler> is
325 // interested in
326 this->bit_ops (this->current_suspended_info_[slot].network_events_,
327 to_be_removed_masks,
328 ACE_Reactor::CLR_MASK);
330 // Disassociate/Reassociate the event from/with the I/O handle.
331 // This will depend on the value of remaining set of network
332 // events that the <event_handler> is interested in. I don't
333 // think we can do anything about errors here, so I will not
334 // check this.
335 ::WSAEventSelect ((SOCKET) this->current_suspended_info_[slot].io_handle_,
336 this->current_suspended_info_[slot].event_handle_,
337 this->current_suspended_info_[slot].network_events_);
339 // Normal event entries.
340 else if (ACE_BIT_ENABLED (to_be_removed_masks, ACE_Event_Handler::DONT_CALL))
341 // Preserve DONT_CALL
342 to_be_removed_masks = ACE_Event_Handler::DONT_CALL;
343 else
344 // Make sure that the <to_be_removed_masks> is the NULL_MASK
345 to_be_removed_masks = ACE_Event_Handler::NULL_MASK;
347 // If this event was marked for resumption, undo the resumption flag
348 // and reduce the to be resumed count.
349 if (this->current_suspended_info_[slot].resume_entry_)
351 // Undo resumption
352 this->current_suspended_info_[slot].resume_entry_ = false;
353 // Decrement the handle count
354 --this->handles_to_be_resumed_;
357 // If there are no more events that the <Event_Handler> is
358 // interested in, or this is a non-I/O entry, schedule the
359 // <Event_Handler> for removal
360 if (this->current_suspended_info_[slot].network_events_ == 0)
362 // Mark to be deleted
363 this->current_suspended_info_[slot].delete_entry_ = true;
364 // Remember the mask
365 this->current_suspended_info_[slot].close_masks_ = to_be_removed_masks;
366 // Increment the handle count
367 ++this->handles_to_be_deleted_;
369 // Since it is not a complete removal, we'll call handle_close for
370 // all the masks that were removed. This does not change the
371 // internal state of the reactor.
373 // Note: this condition only applies to I/O entries
374 else if (ACE_BIT_ENABLED (to_be_removed_masks, ACE_Event_Handler::DONT_CALL) == 0)
376 ACE_HANDLE handle = this->current_suspended_info_[slot].io_handle_;
377 this->current_suspended_info_[slot].event_handler_->handle_close (handle,
378 to_be_removed_masks);
381 return 0;
385 ACE_WFMO_Reactor_Handler_Repository::remove_to_be_added_handler_i (size_t slot,
386 ACE_Reactor_Mask to_be_removed_masks)
388 // I/O entries
389 if (this->to_be_added_info_[slot].io_entry_)
391 // See if there are other events that the <Event_Handler> is
392 // interested in
393 this->bit_ops (this->to_be_added_info_[slot].network_events_,
394 to_be_removed_masks,
395 ACE_Reactor::CLR_MASK);
397 // Disassociate/Reassociate the event from/with the I/O handle.
398 // This will depend on the value of remaining set of network
399 // events that the <event_handler> is interested in. I don't
400 // think we can do anything about errors here, so I will not
401 // check this.
402 ::WSAEventSelect ((SOCKET) this->to_be_added_info_[slot].io_handle_,
403 this->to_be_added_info_[slot].event_handle_,
404 this->to_be_added_info_[slot].network_events_);
406 // Normal event entries.
407 else if (ACE_BIT_ENABLED (to_be_removed_masks, ACE_Event_Handler::DONT_CALL))
408 // Preserve DONT_CALL
409 to_be_removed_masks = ACE_Event_Handler::DONT_CALL;
410 else
411 // Make sure that the <to_be_removed_masks> is the NULL_MASK
412 to_be_removed_masks = ACE_Event_Handler::NULL_MASK;
414 // If this event was marked for suspension, undo the suspension flag
415 // and reduce the to be suspended count.
416 if (this->to_be_added_info_[slot].suspend_entry_)
418 // Undo suspension
419 this->to_be_added_info_[slot].suspend_entry_ = false;
420 // Decrement the handle count
421 --this->handles_to_be_suspended_;
424 // If there are no more events that the <Event_Handler> is
425 // interested in, or this is a non-I/O entry, schedule the
426 // <Event_Handler> for removal
427 if (this->to_be_added_info_[slot].network_events_ == 0)
429 // Mark to be deleted
430 this->to_be_added_info_[slot].delete_entry_ = true;
431 // Remember the mask
432 this->to_be_added_info_[slot].close_masks_ = to_be_removed_masks;
433 // Increment the handle count
434 ++this->handles_to_be_deleted_;
436 // Since it is not a complete removal, we'll call handle_close
437 // for all the masks that were removed. This does not change
438 // the internal state of the reactor.
440 // Note: this condition only applies to I/O entries
441 else if (ACE_BIT_ENABLED (to_be_removed_masks, ACE_Event_Handler::DONT_CALL) == 0)
443 ACE_HANDLE handle = this->to_be_added_info_[slot].io_handle_;
444 this->to_be_added_info_[slot].event_handler_->handle_close (handle,
445 to_be_removed_masks);
448 return 0;
452 ACE_WFMO_Reactor_Handler_Repository::suspend_handler_i (ACE_HANDLE handle,
453 bool &changes_required)
455 size_t i = 0;
457 // Go through all the handles looking for <handle>. Even if we find
458 // it, we continue through the rest of the list since <handle> could
459 // appear multiple times. All handles are checked.
461 // Check the current entries first.
462 for (i = 0; i < this->max_handlep1_; ++i)
463 // Since the handle can either be the event or the I/O handle,
464 // we have to check both
465 if ((this->current_handles_[i] == handle ||
466 this->current_info_[i].io_handle_ == handle) &&
467 // Make sure that it is not already marked for suspension
468 !this->current_info_[i].suspend_entry_)
470 // Mark to be suspended
471 this->current_info_[i].suspend_entry_ = true;
472 // Increment the handle count
473 ++this->handles_to_be_suspended_;
474 // Changes will be required
475 changes_required = true;
478 // Then check the suspended entries.
479 for (i = 0; i < this->suspended_handles_; ++i)
480 // Since the handle can either be the event or the I/O handle,
481 // we have to check both
482 if ((this->current_suspended_info_[i].event_handle_ == handle ||
483 this->current_suspended_info_[i].io_handle_ == handle) &&
484 // Make sure that the resumption is not already undone
485 this->current_suspended_info_[i].resume_entry_)
487 // Undo resumption
488 this->current_suspended_info_[i].resume_entry_ = false;
489 // Decrement the handle count
490 --this->handles_to_be_resumed_;
491 // Changes will be required
492 changes_required = true;
495 // Then check the to_be_added entries.
496 for (i = 0; i < this->handles_to_be_added_; ++i)
497 // Since the handle can either be the event or the I/O handle,
498 // we have to check both
499 if ((this->to_be_added_info_[i].io_handle_ == handle ||
500 this->to_be_added_info_[i].event_handle_ == handle) &&
501 // Make sure that it is not already marked for suspension
502 !this->to_be_added_info_[i].suspend_entry_)
504 // Mark to be suspended
505 this->to_be_added_info_[i].suspend_entry_ = true;
506 // Increment the handle count
507 ++this->handles_to_be_suspended_;
508 // Changes will be required
509 changes_required = true;
512 return 0;
516 ACE_WFMO_Reactor_Handler_Repository::resume_handler_i (ACE_HANDLE handle,
517 bool &changes_required)
519 size_t i = 0;
521 // Go through all the handles looking for <handle>. Even if we find
522 // it, we continue through the rest of the list since <handle> could
523 // appear multiple times. All handles are checked.
525 // Check the current entries first.
526 for (i = 0; i < this->max_handlep1_; ++i)
527 // Since the handle can either be the event or the I/O handle,
528 // we have to check both
529 if ((this->current_handles_[i] == handle ||
530 this->current_info_[i].io_handle_ == handle) &&
531 // Make sure that the suspension is not already undone
532 this->current_info_[i].suspend_entry_)
534 // Undo suspension
535 this->current_info_[i].suspend_entry_ = false;
536 // Decrement the handle count
537 --this->handles_to_be_suspended_;
538 // Changes will be required
539 changes_required = true;
542 // Then check the suspended entries.
543 for (i = 0; i < this->suspended_handles_; ++i)
544 // Since the handle can either be the event or the I/O handle,
545 // we have to check both
546 if ((this->current_suspended_info_[i].event_handle_ == handle ||
547 this->current_suspended_info_[i].io_handle_ == handle) &&
548 // Make sure that it is not already marked for resumption
549 !this->current_suspended_info_[i].resume_entry_)
551 // Mark to be resumed
552 this->current_suspended_info_[i].resume_entry_ = true;
553 // Increment the handle count
554 ++this->handles_to_be_resumed_;
555 // Changes will be required
556 changes_required = true;
559 // Then check the to_be_added entries.
560 for (i = 0; i < this->handles_to_be_added_; ++i)
561 // Since the handle can either be the event or the I/O handle,
562 // we have to check both
563 if ((this->to_be_added_info_[i].io_handle_ == handle ||
564 this->to_be_added_info_[i].event_handle_ == handle) &&
565 // Make sure that the suspension is not already undone
566 this->to_be_added_info_[i].suspend_entry_)
568 // Undo suspension
569 this->to_be_added_info_[i].suspend_entry_ = false;
570 // Decrement the handle count
571 --this->handles_to_be_suspended_;
572 // Changes will be required
573 changes_required = true;
576 return 0;
579 void
580 ACE_WFMO_Reactor_Handler_Repository::unbind_all ()
583 ACE_GUARD (ACE_Process_Mutex, ace_mon, this->wfmo_reactor_.lock_);
585 bool dummy;
586 size_t i;
588 // Remove all the current handlers
589 for (i = 0; i < this->max_handlep1_; ++i)
590 this->unbind_i (this->current_handles_[i],
591 ACE_Event_Handler::ALL_EVENTS_MASK,
592 dummy);
594 // Remove all the suspended handlers
595 for (i = 0; i < this->suspended_handles_; ++i)
596 this->unbind_i (this->current_suspended_info_[i].event_handle_,
597 ACE_Event_Handler::ALL_EVENTS_MASK,
598 dummy);
600 // Remove all the to_be_added handlers
601 for (i = 0; i < this->handles_to_be_added_; ++i)
602 this->unbind_i (this->to_be_added_info_[i].event_handle_,
603 ACE_Event_Handler::ALL_EVENTS_MASK,
604 dummy);
607 // The guard is released here
609 // Wake up all threads in WaitForMultipleObjects so that they can
610 // reconsult the handle set
611 this->wfmo_reactor_.wakeup_all_threads ();
615 ACE_WFMO_Reactor_Handler_Repository::bind_i (bool io_entry,
616 ACE_Event_Handler *event_handler,
617 long network_events,
618 ACE_HANDLE io_handle,
619 ACE_HANDLE event_handle,
620 bool delete_event)
622 if (event_handler == 0)
623 return -1;
625 // Make sure that the <handle> is valid
626 if (event_handle == ACE_INVALID_HANDLE)
627 event_handle = event_handler->get_handle ();
628 if (this->invalid_handle (event_handle))
629 return -1;
631 size_t current_size = this->max_handlep1_ +
632 this->handles_to_be_added_ -
633 this->handles_to_be_deleted_ +
634 this->suspended_handles_;
636 // Make sure that there's room in the table and that total pending
637 // additions should not exceed what the <to_be_added_info_> array
638 // can hold.
639 if (current_size < this->max_size_ &&
640 this->handles_to_be_added_ < this->max_size_)
642 // Cache this set into the <to_be_added_info_>, till we come
643 // around to actually adding this to the <current_info_>
644 this->to_be_added_info_[this->handles_to_be_added_].set (event_handle,
645 io_entry,
646 event_handler,
647 io_handle,
648 network_events,
649 delete_event);
651 ++this->handles_to_be_added_;
653 event_handler->add_reference ();
655 // Wake up all threads in WaitForMultipleObjects so that they can
656 // reconsult the handle set
657 this->wfmo_reactor_.wakeup_all_threads ();
659 else
661 errno = EMFILE; // File descriptor table is full (better than nothing)
662 return -1;
665 return 0;
669 ACE_WFMO_Reactor_Handler_Repository::make_changes_in_current_infos ()
671 // Go through the entire valid array and check for all handles that
672 // have been schedule for deletion
673 if (this->handles_to_be_deleted_ > 0 || this->handles_to_be_suspended_ > 0)
675 size_t i = 0;
676 while (i < this->max_handlep1_)
678 // This stuff is necessary here, since we should not make
679 // the upcall until all the internal data structures have
680 // been updated. This is to protect against upcalls that
681 // try to deregister again.
682 ACE_HANDLE handle = ACE_INVALID_HANDLE;
683 ACE_Reactor_Mask masks = ACE_Event_Handler::NULL_MASK;
684 ACE_Event_Handler *event_handler = 0;
686 // See if this entry is scheduled for deletion
687 if (this->current_info_[i].delete_entry_)
689 // Calling the <handle_close> method here will ensure that we
690 // will only call it once per deregistering <Event_Handler>.
691 // This is essential in the case when the <Event_Handler> will
692 // do something like delete itself and we have multiple
693 // threads in WFMO_Reactor.
695 // Make sure that the DONT_CALL mask is not set
696 masks = this->current_info_[i].close_masks_;
697 if (ACE_BIT_ENABLED (masks, ACE_Event_Handler::DONT_CALL) == 0)
699 // Grab the correct handle depending on the type entry
700 if (this->current_info_[i].io_entry_)
701 handle = this->current_info_[i].io_handle_;
702 else
703 handle = this->current_handles_[i];
705 // Event handler
706 event_handler = this->current_info_[i].event_handler_;
709 // If <WFMO_Reactor> created the event, we need to clean it up
710 if (this->current_info_[i].delete_event_)
711 ACE_OS::event_destroy (&this->current_handles_[i]);
713 // Reduce count by one
714 --this->handles_to_be_deleted_;
717 // See if this entry is scheduled for suspension
718 else if (this->current_info_[i].suspend_entry_)
720 this->current_suspended_info_ [this->suspended_handles_].set (this->current_handles_[i],
721 this->current_info_[i]);
722 // Increase number of suspended handles
723 ++this->suspended_handles_;
725 // Reduce count by one
726 --this->handles_to_be_suspended_;
729 // See if this entry is scheduled for deletion or suspension
730 // If so we need to clean up
731 if (this->current_info_[i].delete_entry_ ||
732 this->current_info_[i].suspend_entry_ )
734 size_t last_valid_slot = this->max_handlep1_ - 1;
735 // If this is the last handle in the set, no need to swap
736 // places. Simply remove it.
737 if (i < last_valid_slot)
738 // Swap this handle with the last valid handle
740 // Struct copy
741 this->current_info_[i] =
742 this->current_info_[last_valid_slot];
743 this->current_handles_[i] =
744 this->current_handles_[last_valid_slot];
746 // Reset the info in this slot
747 this->current_info_[last_valid_slot].reset ();
748 this->current_handles_[last_valid_slot] = ACE_INVALID_HANDLE;
749 --this->max_handlep1_;
751 else
753 // This current entry is not up for deletion or
754 // suspension. Proceed to the next entry in the current
755 // handles.
756 ++i;
759 // Now that all internal structures have been updated, make
760 // the upcall.
761 if (event_handler != 0)
763 bool const requires_reference_counting =
764 event_handler->reference_counting_policy ().value () ==
765 ACE_Event_Handler::Reference_Counting_Policy::ENABLED;
767 event_handler->handle_close (handle, masks);
769 if (requires_reference_counting)
771 event_handler->remove_reference ();
777 return 0;
781 ACE_WFMO_Reactor_Handler_Repository::make_changes_in_suspension_infos ()
783 // Go through the <suspended_handle> array
784 if (this->handles_to_be_deleted_ > 0 || this->handles_to_be_resumed_ > 0)
786 size_t i = 0;
787 while (i < this->suspended_handles_)
789 // This stuff is necessary here, since we should not make
790 // the upcall until all the internal data structures have
791 // been updated. This is to protect against upcalls that
792 // try to deregister again.
793 ACE_HANDLE handle = ACE_INVALID_HANDLE;
794 ACE_Reactor_Mask masks = ACE_Event_Handler::NULL_MASK;
795 ACE_Event_Handler *event_handler = 0;
797 // See if this entry is scheduled for deletion
798 if (this->current_suspended_info_[i].delete_entry_)
800 // Calling the <handle_close> method here will ensure that we
801 // will only call it once per deregistering <Event_Handler>.
802 // This is essential in the case when the <Event_Handler> will
803 // do something like delete itself and we have multiple
804 // threads in WFMO_Reactor.
806 // Make sure that the DONT_CALL mask is not set
807 masks = this->current_suspended_info_[i].close_masks_;
808 if (ACE_BIT_ENABLED (masks, ACE_Event_Handler::DONT_CALL) == 0)
810 // Grab the correct handle depending on the type entry
811 if (this->current_suspended_info_[i].io_entry_)
812 handle = this->current_suspended_info_[i].io_handle_;
813 else
814 handle = this->current_suspended_info_[i].event_handle_;
816 // Upcall
817 event_handler = this->current_suspended_info_[i].event_handler_;
820 // If <WFMO_Reactor> created the event, we need to clean it up
821 if (this->current_suspended_info_[i].delete_event_)
822 ACE_OS::event_destroy (&this->current_suspended_info_[i].event_handle_);
824 // Reduce count by one
825 --this->handles_to_be_deleted_;
828 else if (this->current_suspended_info_[i].resume_entry_)
830 // Add to the end of the current handles set
831 this->current_handles_[this->max_handlep1_] = this->current_suspended_info_[i].event_handle_;
832 // Struct copy
833 this->current_info_[this->max_handlep1_].set (this->current_suspended_info_[i]);
834 ++this->max_handlep1_;
836 // Reduce count by one
837 --this->handles_to_be_resumed_;
840 // If an entry needs to be removed, either because it
841 // was deleted or resumed, remove it now before doing
842 // the upcall.
843 if (this->current_suspended_info_[i].resume_entry_ ||
844 this->current_suspended_info_[i].delete_entry_)
846 size_t last_valid_slot = this->suspended_handles_ - 1;
847 // Net effect is that we're removing an entry and
848 // compressing the list from the end. So, if removing
849 // an entry from the middle, copy the last valid one to the
850 // removed slot. Reset the end and decrement the number
851 // of suspended handles.
852 if (i < last_valid_slot)
853 // Struct copy
854 this->current_suspended_info_[i] =
855 this->current_suspended_info_[last_valid_slot];
856 this->current_suspended_info_[last_valid_slot].reset ();
857 --this->suspended_handles_;
859 else
861 // This current entry is not up for deletion or
862 // resumption. Proceed to the next entry in the
863 // suspended handles.
864 ++i;
867 // Now that all internal structures have been updated, make
868 // the upcall.
869 if (event_handler != 0)
871 int requires_reference_counting =
872 event_handler->reference_counting_policy ().value () ==
873 ACE_Event_Handler::Reference_Counting_Policy::ENABLED;
875 event_handler->handle_close (handle, masks);
877 if (requires_reference_counting)
879 event_handler->remove_reference ();
885 return 0;
889 ACE_WFMO_Reactor_Handler_Repository::make_changes_in_to_be_added_infos ()
891 // Go through the <to_be_added_*> arrays
892 for (size_t i = 0; i < this->handles_to_be_added_; ++i)
894 // This stuff is necessary here, since we should not make
895 // the upcall until all the internal data structures have
896 // been updated. This is to protect against upcalls that
897 // try to deregister again.
898 ACE_HANDLE handle = ACE_INVALID_HANDLE;
899 ACE_Reactor_Mask masks = ACE_Event_Handler::NULL_MASK;
900 ACE_Event_Handler *event_handler = 0;
902 // See if this entry is scheduled for deletion
903 if (this->to_be_added_info_[i].delete_entry_)
905 // Calling the <handle_close> method here will ensure that we
906 // will only call it once per deregistering <Event_Handler>.
907 // This is essential in the case when the <Event_Handler> will
908 // do something like delete itself and we have multiple
909 // threads in WFMO_Reactor.
911 // Make sure that the DONT_CALL mask is not set
912 masks = this->to_be_added_info_[i].close_masks_;
913 if (ACE_BIT_ENABLED (masks, ACE_Event_Handler::DONT_CALL) == 0)
915 // Grab the correct handle depending on the type entry
916 if (this->to_be_added_info_[i].io_entry_)
917 handle = this->to_be_added_info_[i].io_handle_;
918 else
919 handle = this->to_be_added_info_[i].event_handle_;
921 // Upcall
922 event_handler = this->to_be_added_info_[i].event_handler_;
925 // If <WFMO_Reactor> created the event, we need to clean it up
926 if (this->to_be_added_info_[i].delete_event_)
927 ACE_OS::event_destroy (&this->to_be_added_info_[i].event_handle_);
929 // Reduce count by one
930 --this->handles_to_be_deleted_;
933 // See if this entry is scheduled for suspension
934 else if (this->to_be_added_info_[i].suspend_entry_)
936 this->current_suspended_info_ [this->suspended_handles_].set (this->to_be_added_info_[i].event_handle_,
937 this->to_be_added_info_[i]);
938 // Increase number of suspended handles
939 ++this->suspended_handles_;
941 // Reduce count by one
942 --this->handles_to_be_suspended_;
945 // If neither of the two flags are on, add to current
946 else
948 // Add to the end of the current handles set
949 this->current_handles_[this->max_handlep1_] = this->to_be_added_info_[i].event_handle_;
950 // Struct copy
951 this->current_info_[this->max_handlep1_].set (this->to_be_added_info_[i]);
952 ++this->max_handlep1_;
955 // Reset the <to_be_added_info_>
956 this->to_be_added_info_[i].reset ();
958 // Now that all internal structures have been updated, make the
959 // upcall.
960 if (event_handler != 0)
962 int requires_reference_counting =
963 event_handler->reference_counting_policy ().value () ==
964 ACE_Event_Handler::Reference_Counting_Policy::ENABLED;
966 event_handler->handle_close (handle, masks);
968 if (requires_reference_counting)
970 event_handler->remove_reference ();
975 // Since all to be added handles have been taken care of, reset the
976 // counter
977 this->handles_to_be_added_ = 0;
979 return 0;
982 void
983 ACE_WFMO_Reactor_Handler_Repository::dump () const
985 #if defined (ACE_HAS_DUMP)
986 size_t i = 0;
988 ACE_TRACE ("ACE_WFMO_Reactor_Handler_Repository::dump");
990 ACELIB_DEBUG ((LM_DEBUG, ACE_BEGIN_DUMP, this));
992 ACELIB_DEBUG ((LM_DEBUG,
993 ACE_TEXT ("Max size = %d\n"),
994 this->max_size_));
996 ACELIB_DEBUG ((LM_DEBUG,
997 ACE_TEXT ("Current info table\n\n")));
998 ACELIB_DEBUG ((LM_DEBUG,
999 ACE_TEXT ("\tSize = %d\n"),
1000 this->max_handlep1_));
1001 ACELIB_DEBUG ((LM_DEBUG,
1002 ACE_TEXT ("\tHandles to be suspended = %d\n"),
1003 this->handles_to_be_suspended_));
1005 for (i = 0; i < this->max_handlep1_; ++i)
1006 this->current_info_[i].dump (this->current_handles_[i]);
1008 ACELIB_DEBUG ((LM_DEBUG,
1009 ACE_TEXT ("\n")));
1011 ACELIB_DEBUG ((LM_DEBUG,
1012 ACE_TEXT ("To-be-added info table\n\n")));
1013 ACELIB_DEBUG ((LM_DEBUG,
1014 ACE_TEXT ("\tSize = %d\n"),
1015 this->handles_to_be_added_));
1017 for (i = 0; i < this->handles_to_be_added_; ++i)
1018 this->to_be_added_info_[i].dump ();
1020 ACELIB_DEBUG ((LM_DEBUG,
1021 ACE_TEXT ("\n")));
1023 ACELIB_DEBUG ((LM_DEBUG,
1024 ACE_TEXT ("Suspended info table\n\n")));
1025 ACELIB_DEBUG ((LM_DEBUG,
1026 ACE_TEXT ("\tSize = %d\n"),
1027 this->suspended_handles_));
1028 ACELIB_DEBUG ((LM_DEBUG,
1029 ACE_TEXT ("\tHandles to be resumed = %d\n"),
1030 this->handles_to_be_resumed_));
1032 for (i = 0; i < this->suspended_handles_; ++i)
1033 this->current_suspended_info_[i].dump ();
1035 ACELIB_DEBUG ((LM_DEBUG,
1036 ACE_TEXT ("\n")));
1038 ACELIB_DEBUG ((LM_DEBUG,
1039 ACE_TEXT ("Total handles to be deleted = %d\n"),
1040 this->handles_to_be_deleted_));
1042 ACELIB_DEBUG ((LM_DEBUG,
1043 ACE_END_DUMP));
1044 #endif /* ACE_HAS_DUMP */
1047 /************************************************************/
1050 ACE_WFMO_Reactor::work_pending (const ACE_Time_Value &)
1052 ACE_NOTSUP_RETURN (-1);
1055 #if defined (_MSC_VER)
1056 # pragma warning (push)
1057 # pragma warning (disable:4355) /* Use of 'this' in initializer list */
1058 # endif
1059 ACE_WFMO_Reactor::ACE_WFMO_Reactor (ACE_Sig_Handler *sh,
1060 ACE_Timer_Queue *tq,
1061 ACE_Reactor_Notify *notify)
1062 : signal_handler_ (0),
1063 delete_signal_handler_ (false),
1064 timer_queue_ (0),
1065 delete_timer_queue_ (false),
1066 delete_handler_rep_ (false),
1067 notify_handler_ (0),
1068 delete_notify_handler_ (false),
1069 lock_adapter_ (lock_),
1070 handler_rep_ (*this),
1071 // this event is initially signaled
1072 ok_to_wait_ (1),
1073 // this event is initially unsignaled
1074 wakeup_all_threads_ (0),
1075 // this event is initially unsignaled
1076 waiting_to_change_state_ (0),
1077 active_threads_ (0),
1078 owner_ (ACE_Thread::self ()),
1079 new_owner_ (0),
1080 change_state_thread_ (0),
1081 open_for_business_ (false),
1082 deactivated_ (0)
1084 if (this->open (ACE_WFMO_Reactor::DEFAULT_SIZE, 0, sh, tq, 0, notify) == -1)
1085 ACELIB_ERROR ((LM_ERROR,
1086 ACE_TEXT ("%p\n"),
1087 ACE_TEXT ("WFMO_Reactor")));
1090 ACE_WFMO_Reactor::ACE_WFMO_Reactor (size_t size,
1091 int unused,
1092 ACE_Sig_Handler *sh,
1093 ACE_Timer_Queue *tq,
1094 ACE_Reactor_Notify *notify)
1095 : signal_handler_ (0),
1096 delete_signal_handler_ (false),
1097 timer_queue_ (0),
1098 delete_timer_queue_ (false),
1099 delete_handler_rep_ (false),
1100 notify_handler_ (0),
1101 delete_notify_handler_ (false),
1102 lock_adapter_ (lock_),
1103 handler_rep_ (*this),
1104 // this event is initially signaled
1105 ok_to_wait_ (1),
1106 // this event is initially unsignaled
1107 wakeup_all_threads_ (0),
1108 // this event is initially unsignaled
1109 waiting_to_change_state_ (0),
1110 active_threads_ (0),
1111 owner_ (ACE_Thread::self ()),
1112 new_owner_ (0),
1113 change_state_thread_ (0),
1114 open_for_business_ (false),
1115 deactivated_ (0)
1117 ACE_UNUSED_ARG (unused);
1119 if (this->open (size, 0, sh, tq, 0, notify) == -1)
1120 ACELIB_ERROR ((LM_ERROR,
1121 ACE_TEXT ("%p\n"),
1122 ACE_TEXT ("WFMO_Reactor")));
1124 #if defined (_MSC_VER)
1125 # pragma warning (pop)
1126 #endif
1129 ACE_WFMO_Reactor::current_info (ACE_HANDLE, size_t &)
1131 return -1;
1135 ACE_WFMO_Reactor::open (size_t size,
1136 bool,
1137 ACE_Sig_Handler *sh,
1138 ACE_Timer_Queue *tq,
1139 int,
1140 ACE_Reactor_Notify *notify)
1142 // This GUARD is necessary since we are updating shared state.
1143 ACE_GUARD_RETURN (ACE_Process_Mutex, ace_mon, this->lock_, -1);
1145 // If we are already open, return -1
1146 if (this->open_for_business_)
1147 return -1;
1149 // Timer Queue
1150 if (this->delete_timer_queue_)
1151 delete this->timer_queue_;
1152 else if (this->timer_queue_)
1153 this->timer_queue_->close ();
1155 if (tq == 0)
1157 ACE_NEW_RETURN (this->timer_queue_,
1158 ACE_Timer_Heap,
1159 -1);
1160 this->delete_timer_queue_ = true;
1162 else
1164 this->timer_queue_ = tq;
1165 this->delete_timer_queue_ = false;
1168 // Signal Handler
1169 if (this->delete_signal_handler_)
1170 delete this->signal_handler_;
1172 if (sh == 0)
1174 ACE_NEW_RETURN (this->signal_handler_,
1175 ACE_Sig_Handler,
1176 -1);
1177 this->delete_signal_handler_ = true;
1179 else
1181 this->signal_handler_ = sh;
1182 this->delete_signal_handler_ = false;
1185 // Setup the atomic wait array (used later in <handle_events>)
1186 this->atomic_wait_array_[0] = this->lock_.lock ().proc_mutex_;
1187 this->atomic_wait_array_[1] = this->ok_to_wait_.handle ();
1189 // Prevent memory leaks when the ACE_WFMO_Reactor is reopened.
1190 if (this->delete_handler_rep_)
1192 if (this->handler_rep_.changes_required ())
1194 // Make necessary changes to the handler repository
1195 this->handler_rep_.make_changes ();
1196 // Turn off <wakeup_all_threads_> since all necessary changes
1197 // have completed
1198 this->wakeup_all_threads_.reset ();
1201 this->handler_rep_.~ACE_WFMO_Reactor_Handler_Repository ();
1204 // Open the handle repository. Two additional handles for internal
1205 // purposes
1206 if (this->handler_rep_.open (size + 2) == -1)
1207 ACELIB_ERROR_RETURN ((LM_ERROR, ACE_TEXT ("%p\n"),
1208 ACE_TEXT ("opening handler repository")),
1209 -1);
1210 else
1211 this->delete_handler_rep_ = true;
1213 if (this->notify_handler_ != 0 && this->delete_notify_handler_)
1214 delete this->notify_handler_;
1216 this->notify_handler_ = notify;
1218 if (this->notify_handler_ == 0)
1220 ACE_NEW_RETURN (this->notify_handler_,
1221 ACE_WFMO_Reactor_Notify,
1222 -1);
1224 if (this->notify_handler_ == 0)
1225 return -1;
1226 else
1227 this->delete_notify_handler_ = true;
1230 /* NOTE */
1231 // The order of the following two registrations is very important
1233 // Open the notification handler
1234 if (this->notify_handler_->open (this, this->timer_queue_) == -1)
1235 ACELIB_ERROR_RETURN ((LM_ERROR,
1236 ACE_TEXT ("%p\n"),
1237 ACE_TEXT ("opening notify handler ")),
1238 -1);
1240 // Register for <wakeup_all_threads> event
1241 if (this->register_handler (&this->wakeup_all_threads_handler_,
1242 this->wakeup_all_threads_.handle ()) == -1)
1243 ACELIB_ERROR_RETURN ((LM_ERROR,
1244 ACE_TEXT ("%p\n"),
1245 ACE_TEXT ("registering thread wakeup handler")),
1246 -1);
1248 // Since we have added two handles into the handler repository,
1249 // update the <handler_repository_>
1250 if (this->handler_rep_.changes_required ())
1252 // Make necessary changes to the handler repository
1253 this->handler_rep_.make_changes ();
1254 // Turn off <wakeup_all_threads_> since all necessary changes
1255 // have completed
1256 this->wakeup_all_threads_.reset ();
1259 // We are open for business
1260 this->open_for_business_ = true;
1262 return 0;
1266 ACE_WFMO_Reactor::set_sig_handler (ACE_Sig_Handler *signal_handler)
1268 if (this->signal_handler_ != 0 && this->delete_signal_handler_)
1269 delete this->signal_handler_;
1270 this->signal_handler_ = signal_handler;
1271 this->delete_signal_handler_ = false;
1272 return 0;
1275 ACE_Timer_Queue *
1276 ACE_WFMO_Reactor::timer_queue () const
1278 return this->timer_queue_;
1282 ACE_WFMO_Reactor::timer_queue (ACE_Timer_Queue *tq)
1284 if (this->delete_timer_queue_)
1286 delete this->timer_queue_;
1288 else if (this->timer_queue_)
1290 this->timer_queue_->close ();
1292 this->timer_queue_ = tq;
1293 this->delete_timer_queue_ = false;
1294 return 0;
1298 ACE_WFMO_Reactor::close ()
1300 // This GUARD is necessary since we are updating shared state.
1301 ACE_GUARD_RETURN (ACE_Process_Mutex, ace_mon, this->lock_, -1);
1303 // If we are already closed, return error
1304 if (!this->open_for_business_)
1305 return -1;
1307 // We are now closed
1308 this->open_for_business_ = false;
1309 // This will unregister all handles
1310 this->handler_rep_.close ();
1312 // Make necessary changes to the handler repository that we caused
1313 // by the above actions. Someone who called close() is expecting that
1314 // things will be tidied up upon return.
1315 this->handler_rep_.make_changes ();
1317 if (this->delete_timer_queue_)
1319 delete this->timer_queue_;
1320 this->timer_queue_ = 0;
1321 this->delete_timer_queue_ = false;
1323 else if (this->timer_queue_)
1325 this->timer_queue_->close ();
1326 this->timer_queue_ = 0;
1329 if (this->delete_signal_handler_)
1331 delete this->signal_handler_;
1332 this->signal_handler_ = 0;
1333 this->delete_signal_handler_ = false;
1336 if (this->delete_notify_handler_)
1338 delete this->notify_handler_;
1339 this->notify_handler_ = 0;
1340 this->delete_notify_handler_ = false;
1343 return 0;
1346 ACE_WFMO_Reactor::~ACE_WFMO_Reactor ()
1348 // Assumption: No threads are left in the Reactor when this method
1349 // is called (i.e., active_threads_ == 0)
1351 // Close down
1352 this->close ();
1356 ACE_WFMO_Reactor::register_handler_i (ACE_HANDLE event_handle,
1357 ACE_HANDLE io_handle,
1358 ACE_Event_Handler *event_handler,
1359 ACE_Reactor_Mask new_masks)
1361 // If this is a Winsock 1 system, the underlying event assignment will
1362 // not work, so don't try. Winsock 1 must use ACE_Select_Reactor for
1363 // reacting to socket activity.
1365 #if !defined (ACE_HAS_WINSOCK2) || (ACE_HAS_WINSOCK2 == 0)
1367 ACE_UNUSED_ARG (event_handle);
1368 ACE_UNUSED_ARG (io_handle);
1369 ACE_UNUSED_ARG (event_handler);
1370 ACE_UNUSED_ARG (new_masks);
1371 ACE_NOTSUP_RETURN (-1);
1373 #else
1375 // Make sure that the <handle> is valid
1376 if (io_handle == ACE_INVALID_HANDLE)
1377 io_handle = event_handler->get_handle ();
1379 if (this->handler_rep_.invalid_handle (io_handle))
1381 errno = ERROR_INVALID_HANDLE;
1382 return -1;
1385 long new_network_events = 0;
1386 bool delete_event = false;
1387 std::unique_ptr <ACE_Auto_Event> event;
1389 // Look up the repository to see if the <event_handler> is already
1390 // there.
1391 ACE_Reactor_Mask old_masks;
1392 int found = this->handler_rep_.modify_network_events_i (io_handle,
1393 new_masks,
1394 old_masks,
1395 new_network_events,
1396 event_handle,
1397 delete_event,
1398 ACE_Reactor::ADD_MASK);
1400 // Check to see if the user passed us a valid event; If not then we
1401 // need to create one
1402 if (event_handle == ACE_INVALID_HANDLE)
1404 std::unique_ptr<ACE_Auto_Event> tmp (new ACE_Auto_Event);
1405 event = std::move(tmp);
1406 event_handle = event->handle ();
1407 delete_event = true;
1410 int result = ::WSAEventSelect ((SOCKET) io_handle,
1411 event_handle,
1412 new_network_events);
1413 // If we had found the <Event_Handler> there is nothing more to do
1414 if (found)
1415 return result;
1416 else if (result != SOCKET_ERROR &&
1417 this->handler_rep_.bind_i (1,
1418 event_handler,
1419 new_network_events,
1420 io_handle,
1421 event_handle,
1422 delete_event) != -1)
1424 // The <event_handler> was not found in the repository, add to
1425 // the repository.
1426 if (delete_event)
1428 // Clear out the handle in the ACE_Auto_Event so that when
1429 // it is destroyed, the handle isn't closed out from under
1430 // the reactor. After setting it, running down the event
1431 // (via auto_ptr<> event, above) at function return will
1432 // cause an error because it'll try to close an invalid handle.
1433 // To avoid that smashing the errno value, save the errno
1434 // here, explicitly remove the event so the dtor won't do it
1435 // again, then restore errno.
1436 ACE_Errno_Guard guard (errno);
1437 event->handle (ACE_INVALID_HANDLE);
1438 event->remove ();
1440 return 0;
1442 else
1443 return -1;
1445 #endif /* ACE_HAS_WINSOCK2 || ACE_HAS_WINSOCK2 == 0 */
1450 ACE_WFMO_Reactor::mask_ops_i (ACE_HANDLE io_handle,
1451 ACE_Reactor_Mask new_masks,
1452 int operation)
1454 // Make sure that the <handle> is valid
1455 if (this->handler_rep_.invalid_handle (io_handle))
1456 return -1;
1458 long new_network_events = 0;
1459 bool delete_event = false;
1460 ACE_HANDLE event_handle = ACE_INVALID_HANDLE;
1462 // Look up the repository to see if the <Event_Handler> is already
1463 // there.
1464 ACE_Reactor_Mask old_masks;
1465 int found = this->handler_rep_.modify_network_events_i (io_handle,
1466 new_masks,
1467 old_masks,
1468 new_network_events,
1469 event_handle,
1470 delete_event,
1471 operation);
1472 if (found)
1474 int result = ::WSAEventSelect ((SOCKET) io_handle,
1475 event_handle,
1476 new_network_events);
1477 if (result == 0)
1478 return old_masks;
1479 else
1480 return result;
1482 else
1483 return -1;
1488 ACE_WFMO_Reactor_Handler_Repository::modify_network_events_i (ACE_HANDLE io_handle,
1489 ACE_Reactor_Mask new_masks,
1490 ACE_Reactor_Mask &old_masks,
1491 long &new_network_events,
1492 ACE_HANDLE &event_handle,
1493 bool &delete_event,
1494 int operation)
1496 long *modified_network_events = &new_network_events;
1497 int found = 0;
1498 size_t i;
1500 // First go through the current entries
1502 // Look for all entries in the current handles for matching handle
1503 // (except those that have been scheduled for deletion)
1504 for (i = 0; i < this->max_handlep1_ && !found; ++i)
1505 if (io_handle == this->current_info_[i].io_handle_ &&
1506 !this->current_info_[i].delete_entry_)
1508 found = 1;
1509 modified_network_events = &this->current_info_[i].network_events_;
1510 delete_event = this->current_info_[i].delete_event_;
1511 event_handle = this->current_handles_[i];
1514 // Then pass through the suspended handles
1516 // Look for all entries in the suspended handles for matching handle
1517 // (except those that have been scheduled for deletion)
1518 for (i = 0; i < this->suspended_handles_ && !found; ++i)
1519 if (io_handle == this->current_suspended_info_[i].io_handle_ &&
1520 !this->current_suspended_info_[i].delete_entry_)
1522 found = 1;
1523 modified_network_events = &this->current_suspended_info_[i].network_events_;
1524 delete_event = this->current_suspended_info_[i].delete_event_;
1525 event_handle = this->current_suspended_info_[i].event_handle_;
1528 // Then check the to_be_added handles
1530 // Look for all entries in the to_be_added handles for matching
1531 // handle (except those that have been scheduled for deletion)
1532 for (i = 0; i < this->handles_to_be_added_ && !found; ++i)
1533 if (io_handle == this->to_be_added_info_[i].io_handle_ &&
1534 !this->to_be_added_info_[i].delete_entry_)
1536 found = 1;
1537 modified_network_events = &this->to_be_added_info_[i].network_events_;
1538 delete_event = this->to_be_added_info_[i].delete_event_;
1539 event_handle = this->to_be_added_info_[i].event_handle_;
1542 old_masks = this->bit_ops (*modified_network_events,
1543 new_masks,
1544 operation);
1546 new_network_events = *modified_network_events;
1548 return found;
1551 ACE_Event_Handler *
1552 ACE_WFMO_Reactor_Handler_Repository::find_handler (ACE_HANDLE handle)
1554 long existing_masks_ignored = 0;
1555 return this->handler (handle, existing_masks_ignored);
1558 ACE_Event_Handler *
1559 ACE_WFMO_Reactor_Handler_Repository::handler (ACE_HANDLE handle,
1560 long &existing_masks)
1562 int found = 0;
1563 size_t i = 0;
1564 ACE_Event_Handler *event_handler = 0;
1565 existing_masks = 0;
1567 // Look for the handle first
1569 // First go through the current entries
1571 // Look for all entries in the current handles for matching handle
1572 // (except those that have been scheduled for deletion)
1573 for (i = 0; i < this->max_handlep1_ && !found; ++i)
1574 if ((handle == this->current_info_[i].io_handle_ ||
1575 handle == this->current_handles_[i]) &&
1576 !this->current_info_[i].delete_entry_)
1578 found = 1;
1579 event_handler = this->current_info_[i].event_handler_;
1580 existing_masks = this->current_info_[i].network_events_;
1583 // Then pass through the suspended handles
1585 // Look for all entries in the suspended handles for matching handle
1586 // (except those that have been scheduled for deletion)
1587 for (i = 0; i < this->suspended_handles_ && !found; ++i)
1588 if ((handle == this->current_suspended_info_[i].io_handle_ ||
1589 handle == this->current_suspended_info_[i].event_handle_) &&
1590 !this->current_suspended_info_[i].delete_entry_)
1592 found = 1;
1593 event_handler = this->current_suspended_info_[i].event_handler_;
1594 existing_masks = this->current_suspended_info_[i].network_events_;
1597 // Then check the to_be_added handles
1599 // Look for all entries in the to_be_added handles for matching
1600 // handle (except those that have been scheduled for deletion)
1601 for (i = 0; i < this->handles_to_be_added_ && !found; ++i)
1602 if ((handle == this->to_be_added_info_[i].io_handle_ ||
1603 handle == this->to_be_added_info_[i].event_handle_) &&
1604 !this->to_be_added_info_[i].delete_entry_)
1606 found = 1;
1607 event_handler = this->to_be_added_info_[i].event_handler_;
1608 existing_masks = this->to_be_added_info_[i].network_events_;
1611 if (event_handler)
1612 event_handler->add_reference ();
1614 return event_handler;
1618 ACE_WFMO_Reactor_Handler_Repository::handler (ACE_HANDLE handle,
1619 ACE_Reactor_Mask user_masks,
1620 ACE_Event_Handler **user_event_handler)
1622 long existing_masks = 0;
1623 bool found = false;
1625 ACE_Event_Handler_var safe_event_handler =
1626 this->handler (handle, existing_masks);
1628 if (safe_event_handler.handler ())
1629 found = true;
1631 if (!found)
1632 return -1;
1634 // Otherwise, make sure that the masks that the user is looking for
1635 // are on.
1636 if (found &&
1637 ACE_BIT_ENABLED (user_masks, ACE_Event_Handler::READ_MASK))
1638 if (!ACE_BIT_ENABLED (existing_masks, FD_READ) &&
1639 !ACE_BIT_ENABLED (existing_masks, FD_CLOSE))
1640 found = false;
1642 if (found &&
1643 ACE_BIT_ENABLED (user_masks, ACE_Event_Handler::WRITE_MASK))
1644 if (!ACE_BIT_ENABLED (existing_masks, FD_WRITE))
1645 found = false;
1647 if (found &&
1648 ACE_BIT_ENABLED (user_masks, ACE_Event_Handler::EXCEPT_MASK))
1649 if (!ACE_BIT_ENABLED (existing_masks, FD_OOB))
1650 found = false;
1652 if (found &&
1653 ACE_BIT_ENABLED (user_masks, ACE_Event_Handler::ACCEPT_MASK))
1654 if (!ACE_BIT_ENABLED (existing_masks, FD_ACCEPT))
1655 found = false;
1657 if (found &&
1658 ACE_BIT_ENABLED (user_masks, ACE_Event_Handler::CONNECT_MASK))
1659 if (!ACE_BIT_ENABLED (existing_masks, FD_CONNECT))
1660 found = false;
1662 if (found &&
1663 ACE_BIT_ENABLED (user_masks, ACE_Event_Handler::QOS_MASK))
1664 if (!ACE_BIT_ENABLED (existing_masks, FD_QOS))
1665 found = false;
1666 if (found &&
1667 ACE_BIT_ENABLED (user_masks, ACE_Event_Handler::GROUP_QOS_MASK))
1668 if (!ACE_BIT_ENABLED (existing_masks, FD_GROUP_QOS))
1669 found = false;
1671 if (found &&
1672 user_event_handler)
1673 *user_event_handler = safe_event_handler.release ();
1675 if (found)
1676 return 0;
1677 else
1678 return -1;
1681 // Waits for and dispatches all events. Returns -1 on error, 0 if
1682 // max_wait_time expired, or the number of events that were dispatched.
1684 ACE_WFMO_Reactor::event_handling (ACE_Time_Value *max_wait_time,
1685 int alertable)
1687 ACE_TRACE ("ACE_WFMO_Reactor::event_handling");
1689 // Make sure we are not closed
1690 if (!this->open_for_business_ || this->deactivated_)
1692 errno = ESHUTDOWN;
1693 return -1;
1696 // Stash the current time -- the destructor of this object will
1697 // automatically compute how much time elapsed since this method was
1698 // called.
1699 ACE_Countdown_Time countdown (max_wait_time);
1701 int result;
1704 // Check to see if it is ok to enter ::WaitForMultipleObjects
1705 // This will acquire <this->lock_> on success On failure, the
1706 // lock will not be acquired
1707 result = this->ok_to_wait (max_wait_time, alertable);
1708 if (result != 1)
1709 return result;
1711 // Increment the number of active threads
1712 ++this->active_threads_;
1714 // Release the <lock_>
1715 this->lock_.release ();
1717 // Update the countdown to reflect time waiting to play with the
1718 // mut and event.
1719 countdown.update ();
1721 // Calculate timeout
1722 int timeout = this->calculate_timeout (max_wait_time);
1724 // Wait for event to happen
1725 DWORD wait_status = this->wait_for_multiple_events (timeout,
1726 alertable);
1728 // Upcall
1729 result = this->safe_dispatch (wait_status);
1730 if (0 == result)
1732 // wait_for_multiple_events timed out without dispatching
1733 // anything. Because of rounding and conversion errors and
1734 // such, it could be that the wait loop timed out, but
1735 // the timer queue said it wasn't quite ready to expire a
1736 // timer. In this case, max_wait_time won't have quite been
1737 // reduced to 0, and we need to go around again. If max_wait_time
1738 // is all the way to 0, just return, as the entire time the
1739 // caller wanted to wait has been used up.
1740 countdown.update (); // Reflect time waiting for events
1741 if (0 == max_wait_time || max_wait_time->usec () == 0)
1742 break;
1745 while (result == 0);
1747 return result;
1751 ACE_WFMO_Reactor::ok_to_wait (ACE_Time_Value *max_wait_time,
1752 int alertable)
1754 // Calculate the max time we should spend here
1756 // Note: There is really no need to involve the <timer_queue_> here
1757 // because even if a timeout in the <timer_queue_> does expire we
1758 // will not be able to dispatch it
1760 // We need to wait for both the <lock_> and <ok_to_wait_> event.
1761 // Use WaitForMultipleObjects() to wait for both atomically.
1762 int timeout = max_wait_time == 0 ? INFINITE : max_wait_time->msec ();
1763 DWORD result = 0;
1764 while (1)
1766 result = ::WaitForMultipleObjectsEx (sizeof this->atomic_wait_array_ / sizeof (ACE_HANDLE),
1767 this->atomic_wait_array_,
1768 TRUE,
1769 timeout,
1770 alertable);
1772 if (result != WAIT_IO_COMPLETION)
1773 break;
1776 switch (result)
1778 case WAIT_TIMEOUT:
1779 errno = ETIME;
1780 return 0;
1781 case WAIT_FAILED:
1782 case WAIT_ABANDONED_0:
1783 ACE_OS::set_errno_to_last_error ();
1784 return -1;
1785 default:
1786 break;
1789 // It is ok to enter ::WaitForMultipleObjects
1790 return 1;
1793 DWORD
1794 ACE_WFMO_Reactor::wait_for_multiple_events (int timeout,
1795 int alertable)
1797 // Wait for any of handles_ to be active, or until timeout expires.
1798 // If <alertable> is enabled allow asynchronous completion of
1799 // ReadFile and WriteFile operations.
1800 return ::WaitForMultipleObjectsEx (this->handler_rep_.max_handlep1 (),
1801 this->handler_rep_.handles (),
1802 FALSE,
1803 timeout,
1804 alertable);
1807 DWORD
1808 ACE_WFMO_Reactor::poll_remaining_handles (DWORD slot)
1810 return ::WaitForMultipleObjects (this->handler_rep_.max_handlep1 () - slot,
1811 this->handler_rep_.handles () + slot,
1812 FALSE,
1817 ACE_WFMO_Reactor::calculate_timeout (ACE_Time_Value *max_wait_time)
1819 ACE_Time_Value *time = 0;
1820 if (this->owner_ == ACE_Thread::self ())
1821 time = this->timer_queue_->calculate_timeout (max_wait_time);
1822 else
1823 time = max_wait_time;
1825 if (time == 0)
1826 return INFINITE;
1827 else
1828 return time->msec ();
1832 ACE_WFMO_Reactor::expire_timers ()
1834 // If "owner" thread
1835 if (ACE_Thread::self () == this->owner_)
1837 // expire all pending timers.
1838 return this->timer_queue_->expire ();
1840 else
1841 // Nothing to expire
1842 return 0;
1846 ACE_WFMO_Reactor::dispatch (DWORD wait_status)
1848 // Expire timers
1849 int handlers_dispatched = this->expire_timers ();
1851 switch (wait_status)
1853 case WAIT_FAILED: // Failure.
1854 ACE_OS::set_errno_to_last_error ();
1855 return -1;
1856 case WAIT_TIMEOUT: // Timeout.
1857 errno = ETIME;
1858 return handlers_dispatched;
1859 case WAIT_IO_COMPLETION: // APC.
1860 return handlers_dispatched;
1861 default: // Dispatch.
1862 // We'll let dispatch worry about abandoned mutes.
1863 handlers_dispatched += this->dispatch_handles (wait_status);
1864 return handlers_dispatched;
1868 // Dispatches any active handles from <handles_[slot]> to
1869 // <handles_[max_handlep1_]>, polling through our handle set looking
1870 // for active handles.
1872 ACE_WFMO_Reactor::dispatch_handles (DWORD wait_status)
1874 // dispatch_slot is the absolute slot. Only += is used to
1875 // increment it.
1876 DWORD dispatch_slot = 0;
1878 // Cache this value, this is the absolute value.
1879 DWORD const max_handlep1 = this->handler_rep_.max_handlep1 ();
1881 // nCount starts off at <max_handlep1>, this is a transient count of
1882 // handles last waited on.
1883 DWORD nCount = max_handlep1;
1885 for (int number_of_handlers_dispatched = 1;
1887 ++number_of_handlers_dispatched)
1889 const bool ok = (
1890 #if ! defined(__BORLANDC__) \
1891 && !defined (__MINGW32__) \
1892 && !defined (_MSC_VER)
1893 // wait_status is unsigned in Borland, Green Hills,
1894 // mingw32 and MSVC++
1895 // This >= is always true, with a warning.
1896 wait_status >= WAIT_OBJECT_0 &&
1897 #endif
1898 wait_status <= (WAIT_OBJECT_0 + nCount));
1900 if (ok)
1901 dispatch_slot += wait_status - WAIT_OBJECT_0;
1902 else
1903 // Otherwise, a handle was abandoned.
1904 dispatch_slot += wait_status - WAIT_ABANDONED_0;
1906 // Dispatch handler
1907 if (this->dispatch_handler (dispatch_slot, max_handlep1) == -1)
1908 return -1;
1910 // Increment slot
1911 ++dispatch_slot;
1913 // We're done.
1914 if (dispatch_slot >= max_handlep1)
1915 return number_of_handlers_dispatched;
1917 // Readjust nCount
1918 nCount = max_handlep1 - dispatch_slot;
1920 // Check the remaining handles
1921 wait_status = this->poll_remaining_handles (dispatch_slot);
1922 switch (wait_status)
1924 case WAIT_FAILED: // Failure.
1925 ACE_OS::set_errno_to_last_error ();
1926 /* FALLTHRU */
1927 case WAIT_TIMEOUT:
1928 // There are no more handles ready, we can return.
1929 return number_of_handlers_dispatched;
1935 ACE_WFMO_Reactor::dispatch_handler (DWORD slot,
1936 DWORD max_handlep1)
1938 // Check if there are window messages that need to be dispatched
1939 if (slot == max_handlep1)
1940 return this->dispatch_window_messages ();
1942 // Dispatch the handler if it has not been scheduled for deletion.
1943 // Note that this is a very week test if there are multiple threads
1944 // dispatching this slot as no locks are held here. Generally, you
1945 // do not want to do something like deleting the this pointer in
1946 // handle_close() if you have registered multiple times and there is
1947 // more than one thread in WFMO_Reactor->handle_events().
1948 else if (!this->handler_rep_.scheduled_for_deletion (slot))
1950 ACE_HANDLE event_handle = *(this->handler_rep_.handles () + slot);
1952 if (this->handler_rep_.current_info ()[slot].io_entry_)
1953 return this->complex_dispatch_handler (slot,
1954 event_handle);
1955 else
1956 return this->simple_dispatch_handler (slot,
1957 event_handle);
1959 else
1960 // The handle was scheduled for deletion, so we will skip it.
1961 return 0;
1965 ACE_WFMO_Reactor::simple_dispatch_handler (DWORD slot,
1966 ACE_HANDLE event_handle)
1968 // This dispatch is used for non-I/O entires
1970 // Assign the ``signaled'' HANDLE so that callers can get it.
1971 // siginfo_t is an ACE - specific fabrication. Constructor exists.
1972 siginfo_t sig (event_handle);
1974 ACE_Event_Handler *event_handler =
1975 this->handler_rep_.current_info ()[slot].event_handler_;
1977 int requires_reference_counting =
1978 event_handler->reference_counting_policy ().value () ==
1979 ACE_Event_Handler::Reference_Counting_Policy::ENABLED;
1981 if (requires_reference_counting)
1983 event_handler->add_reference ();
1986 // Upcall
1987 if (event_handler->handle_signal (0, &sig) == -1)
1988 this->handler_rep_.unbind (event_handle,
1989 ACE_Event_Handler::NULL_MASK);
1991 // Call remove_reference() if needed.
1992 if (requires_reference_counting)
1994 event_handler->remove_reference ();
1997 return 0;
2001 ACE_WFMO_Reactor::complex_dispatch_handler (DWORD slot,
2002 ACE_HANDLE event_handle)
2004 // This dispatch is used for I/O entires.
2006 ACE_WFMO_Reactor_Handler_Repository::Current_Info &current_info =
2007 this->handler_rep_.current_info ()[slot];
2009 WSANETWORKEVENTS events;
2010 ACE_Reactor_Mask problems = ACE_Event_Handler::NULL_MASK;
2011 if (::WSAEnumNetworkEvents ((SOCKET) current_info.io_handle_,
2012 event_handle,
2013 &events) == SOCKET_ERROR)
2014 problems = ACE_Event_Handler::ALL_EVENTS_MASK;
2015 else
2017 // Prepare for upcalls. Clear the bits from <events> representing
2018 // events the handler is not interested in. If there are any left,
2019 // do the upcall(s). upcall will replace events.lNetworkEvents
2020 // with bits representing any functions that requested a repeat
2021 // callback before checking handles again. In this case, continue
2022 // to call back unless the handler is unregistered as a result of
2023 // one of the upcalls. The way this is written, the upcalls will
2024 // keep being done even if one or more upcalls reported problems.
2025 // In practice this may turn out not so good, but let's see. If any
2026 // problems, please notify Steve Huston <shuston@riverace.com>
2027 // before or after you change this code.
2028 events.lNetworkEvents &= current_info.network_events_;
2029 while (events.lNetworkEvents != 0)
2031 ACE_Event_Handler *event_handler =
2032 current_info.event_handler_;
2034 int reference_counting_required =
2035 event_handler->reference_counting_policy ().value () ==
2036 ACE_Event_Handler::Reference_Counting_Policy::ENABLED;
2038 // Call add_reference() if needed.
2039 if (reference_counting_required)
2041 event_handler->add_reference ();
2044 // Upcall
2045 problems |= this->upcall (current_info.event_handler_,
2046 current_info.io_handle_,
2047 events);
2049 // Call remove_reference() if needed.
2050 if (reference_counting_required)
2052 event_handler->remove_reference ();
2055 if (this->handler_rep_.scheduled_for_deletion (slot))
2056 break;
2060 if (problems != ACE_Event_Handler::NULL_MASK
2061 && !this->handler_rep_.scheduled_for_deletion (slot) )
2062 this->handler_rep_.unbind (event_handle, problems);
2064 return 0;
2067 ACE_Reactor_Mask
2068 ACE_WFMO_Reactor::upcall (ACE_Event_Handler *event_handler,
2069 ACE_HANDLE io_handle,
2070 WSANETWORKEVENTS &events)
2072 // This method figures out what exactly has happened to the socket
2073 // and then calls appropriate methods.
2074 ACE_Reactor_Mask problems = ACE_Event_Handler::NULL_MASK;
2076 // Go through the events and do the indicated upcalls. If the handler
2077 // doesn't want to be called back, clear the bit for that event.
2078 // At the end, set the bits back to <events> to request a repeat call.
2080 long actual_events = events.lNetworkEvents;
2081 int action;
2083 if (ACE_BIT_ENABLED (actual_events, FD_WRITE))
2085 action = event_handler->handle_output (io_handle);
2086 if (action <= 0)
2088 ACE_CLR_BITS (actual_events, FD_WRITE);
2089 if (action == -1)
2090 ACE_SET_BITS (problems, ACE_Event_Handler::WRITE_MASK);
2094 if (ACE_BIT_ENABLED (actual_events, FD_CONNECT))
2096 if (events.iErrorCode[FD_CONNECT_BIT] == 0)
2098 // Successful connect
2099 action = event_handler->handle_output (io_handle);
2100 if (action <= 0)
2102 ACE_CLR_BITS (actual_events, FD_CONNECT);
2103 if (action == -1)
2104 ACE_SET_BITS (problems,
2105 ACE_Event_Handler::CONNECT_MASK);
2108 // Unsuccessful connect
2109 else
2111 action = event_handler->handle_input (io_handle);
2112 if (action <= 0)
2114 ACE_CLR_BITS (actual_events, FD_CONNECT);
2115 if (action == -1)
2116 ACE_SET_BITS (problems,
2117 ACE_Event_Handler::CONNECT_MASK);
2122 if (ACE_BIT_ENABLED (actual_events, FD_OOB))
2124 action = event_handler->handle_exception (io_handle);
2125 if (action <= 0)
2127 ACE_CLR_BITS (actual_events, FD_OOB);
2128 if (action == -1)
2129 ACE_SET_BITS (problems, ACE_Event_Handler::EXCEPT_MASK);
2133 if (ACE_BIT_ENABLED (actual_events, FD_READ))
2135 action = event_handler->handle_input (io_handle);
2136 if (action <= 0)
2138 ACE_CLR_BITS (actual_events, FD_READ);
2139 if (action == -1)
2140 ACE_SET_BITS (problems, ACE_Event_Handler::READ_MASK);
2144 if (ACE_BIT_ENABLED (actual_events, FD_CLOSE)
2145 && ACE_BIT_DISABLED (problems, ACE_Event_Handler::READ_MASK))
2147 action = event_handler->handle_input (io_handle);
2148 if (action <= 0)
2150 ACE_CLR_BITS (actual_events, FD_CLOSE);
2151 if (action == -1)
2152 ACE_SET_BITS (problems, ACE_Event_Handler::READ_MASK);
2156 if (ACE_BIT_ENABLED (actual_events, FD_ACCEPT))
2158 action = event_handler->handle_input (io_handle);
2159 if (action <= 0)
2161 ACE_CLR_BITS (actual_events, FD_ACCEPT);
2162 if (action == -1)
2163 ACE_SET_BITS (problems, ACE_Event_Handler::ACCEPT_MASK);
2167 if (ACE_BIT_ENABLED (actual_events, FD_QOS))
2169 action = event_handler->handle_qos (io_handle);
2170 if (action <= 0)
2172 ACE_CLR_BITS (actual_events, FD_QOS);
2173 if (action == -1)
2174 ACE_SET_BITS (problems, ACE_Event_Handler::QOS_MASK);
2178 if (ACE_BIT_ENABLED (actual_events, FD_GROUP_QOS))
2180 action = event_handler->handle_group_qos (io_handle);
2181 if (action <= 0)
2183 ACE_CLR_BITS (actual_events, FD_GROUP_QOS);
2184 if (action == -1)
2185 ACE_SET_BITS (problems, ACE_Event_Handler::GROUP_QOS_MASK);
2189 events.lNetworkEvents = actual_events;
2190 return problems;
2195 ACE_WFMO_Reactor::update_state ()
2197 // This GUARD is necessary since we are updating shared state.
2198 ACE_GUARD_RETURN (ACE_Process_Mutex, monitor, this->lock_, -1);
2200 // Decrement active threads
2201 --this->active_threads_;
2203 // Check if the state of the handler repository has changed or new
2204 // owner has to be set
2205 if (this->handler_rep_.changes_required () || this->new_owner ())
2207 if (this->change_state_thread_ == 0)
2208 // Try to become the thread which will be responsible for the
2209 // changes
2211 this->change_state_thread_ = ACE_Thread::self ();
2212 // Make sure no new threads are allowed to enter
2213 this->ok_to_wait_.reset ();
2215 if (this->active_threads_ > 0)
2216 // Check for other active threads
2218 // Wake up all other threads
2219 this->wakeup_all_threads_.signal ();
2220 // Release <lock_>
2221 monitor.release ();
2222 // Go to sleep waiting for all other threads to get done
2223 this->waiting_to_change_state_.wait ();
2224 // Re-acquire <lock_> again
2225 monitor.acquire ();
2228 // Note that make_changes() calls into user code which can
2229 // request other changes. So keep looping until all
2230 // requested changes are completed.
2231 while (this->handler_rep_.changes_required ())
2232 // Make necessary changes to the handler repository
2233 this->handler_rep_.make_changes ();
2234 if (this->new_owner ())
2235 // Update the owner
2236 this->change_owner ();
2237 // Turn off <wakeup_all_threads_>
2238 this->wakeup_all_threads_.reset ();
2239 // Let everyone know that it is ok to go ahead
2240 this->ok_to_wait_.signal ();
2241 // Reset this flag
2242 this->change_state_thread_ = 0;
2244 else if (this->active_threads_ == 0)
2245 // This thread did not get a chance to become the change
2246 // thread. If it is the last one out, it will wakeup the
2247 // change thread
2248 this->waiting_to_change_state_.signal ();
2250 // This is if we were woken up explicitily by the user and there are
2251 // no state changes required.
2252 else if (this->active_threads_ == 0)
2253 // Turn off <wakeup_all_threads_>
2254 this->wakeup_all_threads_.reset ();
2256 return 0;
2259 void
2260 ACE_WFMO_Reactor::dump () const
2262 #if defined (ACE_HAS_DUMP)
2263 ACE_TRACE ("ACE_WFMO_Reactor::dump");
2265 ACELIB_DEBUG ((LM_DEBUG, ACE_BEGIN_DUMP, this));
2267 ACELIB_DEBUG ((LM_DEBUG,
2268 ACE_TEXT ("Count of currently active threads = %d\n"),
2269 this->active_threads_));
2271 ACELIB_DEBUG ((LM_DEBUG,
2272 ACE_TEXT ("ID of owner thread = %d\n"),
2273 this->owner_));
2275 this->handler_rep_.dump ();
2276 this->signal_handler_->dump ();
2277 this->timer_queue_->dump ();
2279 ACELIB_DEBUG ((LM_DEBUG, ACE_END_DUMP));
2280 #endif /* ACE_HAS_DUMP */
2284 ACE_WFMO_Reactor_Notify::dispatch_notifications (int & /*number_of_active_handles*/,
2285 ACE_Handle_Set & /*rd_mask*/)
2287 return -1;
2291 ACE_WFMO_Reactor_Notify::is_dispatchable (ACE_Notification_Buffer & /*buffer*/)
2293 return 0;
2296 ACE_HANDLE
2297 ACE_WFMO_Reactor_Notify::notify_handle ()
2299 return ACE_INVALID_HANDLE;
2303 ACE_WFMO_Reactor_Notify::read_notify_pipe (ACE_HANDLE ,
2304 ACE_Notification_Buffer &)
2306 return 0;
2310 ACE_WFMO_Reactor_Notify::dispatch_notify (ACE_Notification_Buffer &)
2312 return 0;
2316 ACE_WFMO_Reactor_Notify::close ()
2318 return -1;
2321 ACE_WFMO_Reactor_Notify::ACE_WFMO_Reactor_Notify (size_t max_notifies)
2322 : timer_queue_ (0),
2323 message_queue_ (max_notifies * sizeof (ACE_Notification_Buffer),
2324 max_notifies * sizeof (ACE_Notification_Buffer)),
2325 max_notify_iterations_ (-1)
2330 ACE_WFMO_Reactor_Notify::open (ACE_Reactor_Impl *wfmo_reactor,
2331 ACE_Timer_Queue *timer_queue,
2332 int ignore_notify)
2334 ACE_UNUSED_ARG (ignore_notify);
2335 timer_queue_ = timer_queue;
2336 return wfmo_reactor->register_handler (this);
2339 ACE_HANDLE
2340 ACE_WFMO_Reactor_Notify::get_handle () const
2342 return this->wakeup_one_thread_.handle ();
2345 // Handle all pending notifications.
2348 ACE_WFMO_Reactor_Notify::handle_signal (int signum, siginfo_t *siginfo, ucontext_t *)
2350 ACE_UNUSED_ARG (signum);
2352 // Just check for sanity...
2353 if (siginfo->si_handle_ != this->wakeup_one_thread_.handle ())
2354 return -1;
2356 // This will get called when <WFMO_Reactor->wakeup_one_thread_> event
2357 // is signaled.
2358 // ACELIB_DEBUG ((LM_DEBUG,
2359 // ACE_TEXT ("(%t) waking up to handle internal notifications\n")));
2361 for (int i = 1; ; ++i)
2363 ACE_Message_Block *mb = 0;
2364 // Copy ACE_Time_Value::zero since dequeue_head will modify it.
2365 ACE_Time_Value zero_timeout (ACE_Time_Value::zero);
2366 if (this->message_queue_.dequeue_head (mb, &zero_timeout) == -1)
2368 if (errno == EWOULDBLOCK)
2369 // We've reached the end of the processing, return
2370 // normally.
2371 return 0;
2372 else
2373 return -1; // Something weird happened...
2375 else
2377 ACE_Notification_Buffer *buffer =
2378 reinterpret_cast <ACE_Notification_Buffer *> (mb->base ());
2380 // If eh == 0 then we've got major problems! Otherwise, we
2381 // need to dispatch the appropriate handle_* method on the
2382 // ACE_Event_Handler pointer we've been passed.
2384 if (buffer->eh_ != 0)
2386 ACE_Event_Handler *event_handler =
2387 buffer->eh_;
2389 bool const requires_reference_counting =
2390 event_handler->reference_counting_policy ().value () ==
2391 ACE_Event_Handler::Reference_Counting_Policy::ENABLED;
2393 int result = 0;
2395 switch (buffer->mask_)
2397 case ACE_Event_Handler::READ_MASK:
2398 case ACE_Event_Handler::ACCEPT_MASK:
2399 result = event_handler->handle_input (ACE_INVALID_HANDLE);
2400 break;
2401 case ACE_Event_Handler::WRITE_MASK:
2402 result = event_handler->handle_output (ACE_INVALID_HANDLE);
2403 break;
2404 case ACE_Event_Handler::EXCEPT_MASK:
2405 result = event_handler->handle_exception (ACE_INVALID_HANDLE);
2406 break;
2407 case ACE_Event_Handler::QOS_MASK:
2408 result = event_handler->handle_qos (ACE_INVALID_HANDLE);
2409 break;
2410 case ACE_Event_Handler::GROUP_QOS_MASK:
2411 result = event_handler->handle_group_qos (ACE_INVALID_HANDLE);
2412 break;
2413 default:
2414 ACELIB_ERROR ((LM_ERROR,
2415 ACE_TEXT ("invalid mask = %d\n"),
2416 buffer->mask_));
2417 break;
2420 if (result == -1)
2421 event_handler->handle_close (ACE_INVALID_HANDLE,
2422 ACE_Event_Handler::EXCEPT_MASK);
2424 if (requires_reference_counting)
2426 event_handler->remove_reference ();
2430 // Make sure to delete the memory regardless of success or
2431 // failure!
2432 mb->release ();
2434 // Bail out if we've reached the <max_notify_iterations_>.
2435 // Note that by default <max_notify_iterations_> is -1, so
2436 // we'll loop until we're done.
2437 if (i == this->max_notify_iterations_)
2439 // If there are still notification in the queue, we need
2440 // to wake up again
2441 if (!this->message_queue_.is_empty ())
2442 this->wakeup_one_thread_.signal ();
2444 // Break the loop as we have reached max_notify_iterations_
2445 return 0;
2451 // Notify the WFMO_Reactor, potentially enqueueing the
2452 // <ACE_Event_Handler> for subsequent processing in the WFMO_Reactor
2453 // thread of control.
2456 ACE_WFMO_Reactor_Notify::notify (ACE_Event_Handler *event_handler,
2457 ACE_Reactor_Mask mask,
2458 ACE_Time_Value *timeout)
2460 if (event_handler != 0)
2462 ACE_Message_Block *mb = 0;
2463 ACE_NEW_RETURN (mb,
2464 ACE_Message_Block (sizeof (ACE_Notification_Buffer)),
2465 -1);
2467 ACE_Notification_Buffer *buffer =
2468 (ACE_Notification_Buffer *) mb->base ();
2469 buffer->eh_ = event_handler;
2470 buffer->mask_ = mask;
2472 // Convert from relative time to absolute time by adding the
2473 // current time of day. This is what <ACE_Message_Queue>
2474 // expects.
2475 if (timeout != 0)
2476 *timeout += timer_queue_->gettimeofday ();
2478 if (this->message_queue_.enqueue_tail
2479 (mb, timeout) == -1)
2481 mb->release ();
2482 return -1;
2485 event_handler->add_reference ();
2488 return this->wakeup_one_thread_.signal ();
2491 void
2492 ACE_WFMO_Reactor_Notify::max_notify_iterations (int iterations)
2494 ACE_TRACE ("ACE_WFMO_Reactor_Notify::max_notify_iterations");
2495 // Must always be > 0 or < 0 to optimize the loop exit condition.
2496 if (iterations == 0)
2497 iterations = 1;
2499 this->max_notify_iterations_ = iterations;
2503 ACE_WFMO_Reactor_Notify::max_notify_iterations ()
2505 ACE_TRACE ("ACE_WFMO_Reactor_Notify::max_notify_iterations");
2506 return this->max_notify_iterations_;
2510 ACE_WFMO_Reactor_Notify::purge_pending_notifications (ACE_Event_Handler *eh,
2511 ACE_Reactor_Mask mask)
2513 ACE_TRACE ("ACE_WFMO_Reactor_Notify::purge_pending_notifications");
2515 // Go over message queue and take out all the matching event
2516 // handlers. If eh == 0, purge all. Note that reactor notifies (no
2517 // handler specified) are never purged, as this may lose a needed
2518 // notify the reactor queued for itself.
2520 if (this->message_queue_.is_empty ())
2521 return 0;
2523 // Guard against new and/or delivered notifications while purging.
2524 // WARNING!!! The use of the notification queue's lock object for
2525 // this guard makes use of the knowledge that on Win32, the mutex
2526 // protecting the queue is really a CriticalSection, which is
2527 // recursive. This is how we can get away with locking it down here
2528 // and still calling member functions on the queue object.
2529 ACE_GUARD_RETURN (ACE_SYNCH_MUTEX, monitor, this->message_queue_.lock(), -1);
2531 // first, copy all to our own local queue. Since we've locked everyone out
2532 // of here, there's no need to use any synchronization on this queue.
2533 ACE_Message_Queue<ACE_NULL_SYNCH> local_queue;
2535 size_t queue_size = this->message_queue_.message_count ();
2536 int number_purged = 0;
2538 size_t index;
2540 for (index = 0; index < queue_size; ++index)
2542 ACE_Message_Block *mb = 0;
2543 if (-1 == this->message_queue_.dequeue_head (mb))
2544 return -1; // This shouldn't happen...
2546 ACE_Notification_Buffer *buffer =
2547 reinterpret_cast<ACE_Notification_Buffer *> (mb->base ());
2549 // If this is not a Reactor notify (it is for a particular handler),
2550 // and it matches the specified handler (or purging all),
2551 // and applying the mask would totally eliminate the notification, then
2552 // release it and count the number purged.
2553 if ((0 != buffer->eh_) &&
2554 (0 == eh || eh == buffer->eh_) &&
2555 ACE_BIT_DISABLED (buffer->mask_, ~mask)) // the existing notification mask
2556 // is left with nothing when
2557 // applying the mask
2559 ACE_Event_Handler *event_handler = buffer->eh_;
2561 event_handler->remove_reference ();
2563 mb->release ();
2564 ++number_purged;
2566 else
2568 // To preserve it, move it to the local_queue. But first, if
2569 // this is not a Reactor notify (it is for a
2570 // particularhandler), and it matches the specified handler
2571 // (or purging all), then apply the mask
2572 if ((0 != buffer->eh_) &&
2573 (0 == eh || eh == buffer->eh_))
2574 ACE_CLR_BITS(buffer->mask_, mask);
2575 if (-1 == local_queue.enqueue_head (mb))
2576 return -1;
2580 if (this->message_queue_.message_count ())
2581 { // Should be empty!
2582 ACE_ASSERT (0);
2583 return -1;
2586 // Now copy back from the local queue to the class queue, taking
2587 // care to preserve the original order...
2588 queue_size = local_queue.message_count ();
2589 for (index = 0; index < queue_size; ++index)
2591 ACE_Message_Block *mb = 0;
2592 if (-1 == local_queue.dequeue_head (mb))
2594 ACE_ASSERT (0);
2595 return -1;
2598 if (-1 == this->message_queue_.enqueue_head (mb))
2600 ACE_ASSERT (0);
2601 return -1;
2605 return number_purged;
2608 void
2609 ACE_WFMO_Reactor_Notify::dump () const
2611 #if defined (ACE_HAS_DUMP)
2612 ACE_TRACE ("ACE_WFMO_Reactor_Notify::dump");
2613 ACELIB_DEBUG ((LM_DEBUG, ACE_BEGIN_DUMP, this));
2614 this->timer_queue_->dump ();
2615 ACELIB_DEBUG ((LM_DEBUG,
2616 ACE_TEXT ("Max. iteration: %d\n"),
2617 this->max_notify_iterations_));
2618 ACELIB_DEBUG ((LM_DEBUG, ACE_END_DUMP));
2619 #endif /* ACE_HAS_DUMP */
2622 void
2623 ACE_WFMO_Reactor::max_notify_iterations (int iterations)
2625 ACE_TRACE ("ACE_WFMO_Reactor::max_notify_iterations");
2626 ACE_GUARD (ACE_Process_Mutex, monitor, this->lock_);
2628 // Must always be > 0 or < 0 to optimize the loop exit condition.
2629 this->notify_handler_->max_notify_iterations (iterations);
2633 ACE_WFMO_Reactor::max_notify_iterations ()
2635 ACE_TRACE ("ACE_WFMO_Reactor::max_notify_iterations");
2636 ACE_GUARD_RETURN (ACE_Process_Mutex, monitor, this->lock_, -1);
2638 return this->notify_handler_->max_notify_iterations ();
2642 ACE_WFMO_Reactor::purge_pending_notifications (ACE_Event_Handler *eh,
2643 ACE_Reactor_Mask mask)
2645 ACE_TRACE ("ACE_WFMO_Reactor::purge_pending_notifications");
2646 if (this->notify_handler_ == 0)
2647 return 0;
2648 else
2649 return this->notify_handler_->purge_pending_notifications (eh, mask);
2653 ACE_WFMO_Reactor::resumable_handler ()
2655 ACE_TRACE ("ACE_WFMO_Reactor::resumable_handler");
2656 return 0;
2660 // No-op WinSOCK2 methods to help WFMO_Reactor compile
2661 #if !defined (ACE_HAS_WINSOCK2) || (ACE_HAS_WINSOCK2 == 0)
2663 WSAEventSelect (SOCKET /* s */,
2664 WSAEVENT /* hEventObject */,
2665 long /* lNetworkEvents */)
2667 return -1;
2671 WSAEnumNetworkEvents (SOCKET /* s */,
2672 WSAEVENT /* hEventObject */,
2673 LPWSANETWORKEVENTS /* lpNetworkEvents */)
2675 return -1;
2677 #endif /* !defined ACE_HAS_WINSOCK2 */
2679 ACE_END_VERSIONED_NAMESPACE_DECL
2681 #endif /* ACE_WIN32 */