Also use Objects as part of an operation but as a result don't generate Any operation...
[ACE_TAO.git] / ACE / ace / WFMO_Reactor.cpp
blob5f901b14108ddc933ca69af94d447411296f9e22
1 #include "ace/WFMO_Reactor.h"
3 #if defined (ACE_WIN32)
5 #include "ace/Handle_Set.h"
6 #include "ace/Timer_Heap.h"
7 #include "ace/Thread.h"
8 #include "ace/OS_NS_errno.h"
9 #include "ace/Null_Condition.h"
11 #if !defined (__ACE_INLINE__)
12 #include "ace/WFMO_Reactor.inl"
13 #endif /* __ACE_INLINE__ */
15 #include "ace/Auto_Ptr.h"
17 ACE_BEGIN_VERSIONED_NAMESPACE_DECL
19 ACE_WFMO_Reactor_Handler_Repository::ACE_WFMO_Reactor_Handler_Repository (ACE_WFMO_Reactor &wfmo_reactor)
20 : wfmo_reactor_ (wfmo_reactor)
24 int
25 ACE_WFMO_Reactor_Handler_Repository::open (size_t size)
27 if (size > MAXIMUM_WAIT_OBJECTS)
28 ACELIB_ERROR_RETURN ((LM_ERROR,
29 ACE_TEXT ("%d exceeds MAXIMUM_WAIT_OBJECTS (%d)\n"),
30 size,
31 MAXIMUM_WAIT_OBJECTS),
32 -1);
34 // Dynamic allocation
35 ACE_NEW_RETURN (this->current_handles_,
36 ACE_HANDLE[size],
37 -1);
38 ACE_NEW_RETURN (this->current_info_,
39 Current_Info[size],
40 -1);
41 ACE_NEW_RETURN (this->current_suspended_info_,
42 Suspended_Info[size],
43 -1);
44 ACE_NEW_RETURN (this->to_be_added_info_,
45 To_Be_Added_Info[size],
46 -1);
48 // Initialization
49 this->max_size_ = size;
50 this->max_handlep1_ = 0;
51 this->suspended_handles_ = 0;
52 this->handles_to_be_added_ = 0;
53 this->handles_to_be_deleted_ = 0;
54 this->handles_to_be_suspended_ = 0;
55 this->handles_to_be_resumed_ = 0;
57 for (size_t i = 0; i < size; ++i)
58 this->current_handles_[i] = ACE_INVALID_HANDLE;
60 return 0;
63 ACE_WFMO_Reactor_Handler_Repository::~ACE_WFMO_Reactor_Handler_Repository (void)
65 // Free up dynamically allocated space
66 delete [] this->current_handles_;
67 delete [] this->current_info_;
68 delete [] this->current_suspended_info_;
69 delete [] this->to_be_added_info_;
72 ACE_Reactor_Mask
73 ACE_WFMO_Reactor_Handler_Repository::bit_ops (long &existing_masks,
74 ACE_Reactor_Mask change_masks,
75 int operation)
77 // Find the old reactor masks. This automatically does the work of
78 // the GET_MASK operation.
80 ACE_Reactor_Mask old_masks = ACE_Event_Handler::NULL_MASK;
82 if (ACE_BIT_ENABLED (existing_masks, FD_READ)
83 || ACE_BIT_ENABLED (existing_masks, FD_CLOSE))
84 ACE_SET_BITS (old_masks, ACE_Event_Handler::READ_MASK);
86 if (ACE_BIT_ENABLED (existing_masks, FD_WRITE))
87 ACE_SET_BITS (old_masks, ACE_Event_Handler::WRITE_MASK);
89 if (ACE_BIT_ENABLED (existing_masks, FD_OOB))
90 ACE_SET_BITS (old_masks, ACE_Event_Handler::EXCEPT_MASK);
92 if (ACE_BIT_ENABLED (existing_masks, FD_ACCEPT))
93 ACE_SET_BITS (old_masks, ACE_Event_Handler::ACCEPT_MASK);
95 if (ACE_BIT_ENABLED (existing_masks, FD_CONNECT))
96 ACE_SET_BITS (old_masks, ACE_Event_Handler::CONNECT_MASK);
98 if (ACE_BIT_ENABLED (existing_masks, FD_QOS))
99 ACE_SET_BITS (old_masks, ACE_Event_Handler::QOS_MASK);
101 if (ACE_BIT_ENABLED (existing_masks, FD_GROUP_QOS))
102 ACE_SET_BITS (old_masks, ACE_Event_Handler::GROUP_QOS_MASK);
104 switch (operation)
106 case ACE_Reactor::CLR_MASK:
107 // For the CLR_MASK operation, clear only the specific masks.
109 if (ACE_BIT_ENABLED (change_masks, ACE_Event_Handler::READ_MASK))
111 ACE_CLR_BITS (existing_masks, FD_READ);
112 ACE_CLR_BITS (existing_masks, FD_CLOSE);
115 if (ACE_BIT_ENABLED (change_masks, ACE_Event_Handler::WRITE_MASK))
116 ACE_CLR_BITS (existing_masks, FD_WRITE);
118 if (ACE_BIT_ENABLED (change_masks, ACE_Event_Handler::EXCEPT_MASK))
119 ACE_CLR_BITS (existing_masks, FD_OOB);
121 if (ACE_BIT_ENABLED (change_masks, ACE_Event_Handler::ACCEPT_MASK))
122 ACE_CLR_BITS (existing_masks, FD_ACCEPT);
124 if (ACE_BIT_ENABLED (change_masks, ACE_Event_Handler::CONNECT_MASK))
125 ACE_CLR_BITS (existing_masks, FD_CONNECT);
127 if (ACE_BIT_ENABLED (change_masks, ACE_Event_Handler::QOS_MASK))
128 ACE_CLR_BITS (existing_masks, FD_QOS);
130 if (ACE_BIT_ENABLED (change_masks, ACE_Event_Handler::GROUP_QOS_MASK))
131 ACE_CLR_BITS (existing_masks, FD_GROUP_QOS);
133 break;
135 case ACE_Reactor::SET_MASK:
136 // If the operation is a set, first reset any existing masks
138 existing_masks = 0;
139 /* FALLTHRU */
141 case ACE_Reactor::ADD_MASK:
142 // For the ADD_MASK and the SET_MASK operation, add only the
143 // specific masks.
145 if (ACE_BIT_ENABLED (change_masks, ACE_Event_Handler::READ_MASK))
147 ACE_SET_BITS (existing_masks, FD_READ);
148 ACE_SET_BITS (existing_masks, FD_CLOSE);
151 if (ACE_BIT_ENABLED (change_masks, ACE_Event_Handler::WRITE_MASK))
152 ACE_SET_BITS (existing_masks, FD_WRITE);
154 if (ACE_BIT_ENABLED (change_masks, ACE_Event_Handler::EXCEPT_MASK))
155 ACE_SET_BITS (existing_masks, FD_OOB);
157 if (ACE_BIT_ENABLED (change_masks, ACE_Event_Handler::ACCEPT_MASK))
158 ACE_SET_BITS (existing_masks, FD_ACCEPT);
160 if (ACE_BIT_ENABLED (change_masks, ACE_Event_Handler::CONNECT_MASK))
161 ACE_SET_BITS (existing_masks, FD_CONNECT);
163 if (ACE_BIT_ENABLED (change_masks, ACE_Event_Handler::QOS_MASK))
164 ACE_SET_BITS (existing_masks, FD_QOS);
166 if (ACE_BIT_ENABLED (change_masks, ACE_Event_Handler::GROUP_QOS_MASK))
167 ACE_SET_BITS (existing_masks, FD_GROUP_QOS);
169 break;
171 case ACE_Reactor::GET_MASK:
173 // The work for this operation is done in all cases at the
174 // beginning of the function.
176 ACE_UNUSED_ARG (change_masks);
178 break;
181 return old_masks;
185 ACE_WFMO_Reactor_Handler_Repository::unbind_i (ACE_HANDLE handle,
186 ACE_Reactor_Mask mask,
187 bool &changes_required)
189 int error = 0;
191 // Remember this value; only if it changes do we need to wakeup
192 // the other threads
193 size_t const original_handle_count = this->handles_to_be_deleted_;
194 size_t i;
196 // Go through all the handles looking for <handle>. Even if we find
197 // it, we continue through the rest of the list since <handle> could
198 // appear multiple times. All handles are checked.
200 // First check the current entries
201 for (i = 0; i < this->max_handlep1_ && error == 0; ++i)
202 // Since the handle can either be the event or the I/O handle,
203 // we have to check both
204 if ((this->current_handles_[i] == handle
205 || this->current_info_[i].io_handle_ == handle)
206 && // Make sure that it is not already marked for deleted
207 !this->current_info_[i].delete_entry_)
209 if (this->remove_handler_i (i, mask) == -1)
210 error = 1;
213 // Then check the suspended entries
214 for (i = 0; i < this->suspended_handles_ && error == 0; ++i)
215 // Since the handle can either be the event or the I/O handle, we
216 // have to check both
217 if ((this->current_suspended_info_[i].io_handle_ == handle
218 || this->current_suspended_info_[i].event_handle_ == handle)
220 // Make sure that it is not already marked for deleted
221 !this->current_suspended_info_[i].delete_entry_)
223 if (this->remove_suspended_handler_i (i, mask) == -1)
224 error = 1;
227 // Then check the to_be_added entries
228 for (i = 0; i < this->handles_to_be_added_ && error == 0; ++i)
229 // Since the handle can either be the event or the I/O handle,
230 // we have to check both
231 if ((this->to_be_added_info_[i].io_handle_ == handle
232 || this->to_be_added_info_[i].event_handle_ == handle)
234 // Make sure that it is not already marked for deleted
235 !this->to_be_added_info_[i].delete_entry_)
237 if (this->remove_to_be_added_handler_i (i, mask) == -1)
238 error = 1;
241 // Only if the number of handlers to be deleted changes do we need
242 // to wakeup the other threads
243 if (original_handle_count < this->handles_to_be_deleted_)
244 changes_required = true;
246 return error ? -1 : 0;
250 ACE_WFMO_Reactor_Handler_Repository::remove_handler_i (size_t slot,
251 ACE_Reactor_Mask to_be_removed_masks)
253 // I/O entries
254 if (this->current_info_[slot].io_entry_)
256 // See if there are other events that the <Event_Handler> is
257 // interested in
258 this->bit_ops (this->current_info_[slot].network_events_,
259 to_be_removed_masks,
260 ACE_Reactor::CLR_MASK);
262 // Disassociate/Reassociate the event from/with the I/O handle.
263 // This will depend on the value of remaining set of network
264 // events that the <event_handler> is interested in. I don't
265 // think we can do anything about errors here, so I will not
266 // check this.
267 ::WSAEventSelect ((SOCKET) this->current_info_[slot].io_handle_,
268 this->current_handles_[slot],
269 this->current_info_[slot].network_events_);
271 // Normal event entries.
272 else if (ACE_BIT_ENABLED (to_be_removed_masks, ACE_Event_Handler::DONT_CALL))
273 // Preserve DONT_CALL
274 to_be_removed_masks = ACE_Event_Handler::DONT_CALL;
275 else
276 // Make sure that the <to_be_removed_masks> is the NULL_MASK
277 to_be_removed_masks = ACE_Event_Handler::NULL_MASK;
279 // If this event was marked for suspension, undo the suspension flag
280 // and reduce the to be suspended count.
281 if (this->current_info_[slot].suspend_entry_)
283 // Undo suspension
284 this->current_info_[slot].suspend_entry_ = false;
285 // Decrement the handle count
286 --this->handles_to_be_suspended_;
289 // If there are no more events that the <Event_Handler> is
290 // interested in, or this is a non-I/O entry, schedule the
291 // <Event_Handler> for removal
292 if (this->current_info_[slot].network_events_ == 0)
294 // Mark to be deleted
295 this->current_info_[slot].delete_entry_ = true;
296 // Remember the mask
297 this->current_info_[slot].close_masks_ = to_be_removed_masks;
298 // Increment the handle count
299 ++this->handles_to_be_deleted_;
302 // Since it is not a complete removal, we'll call handle_close
303 // for all the masks that were removed. This does not change
304 // the internal state of the reactor.
306 // Note: this condition only applies to I/O entries
307 else if (ACE_BIT_ENABLED (to_be_removed_masks, ACE_Event_Handler::DONT_CALL) == 0)
309 ACE_HANDLE handle = this->current_info_[slot].io_handle_;
310 this->current_info_[slot].event_handler_->handle_close (handle,
311 to_be_removed_masks);
314 return 0;
318 ACE_WFMO_Reactor_Handler_Repository::remove_suspended_handler_i (size_t slot,
319 ACE_Reactor_Mask to_be_removed_masks)
321 // I/O entries
322 if (this->current_suspended_info_[slot].io_entry_)
324 // See if there are other events that the <Event_Handler> is
325 // interested in
326 this->bit_ops (this->current_suspended_info_[slot].network_events_,
327 to_be_removed_masks,
328 ACE_Reactor::CLR_MASK);
330 // Disassociate/Reassociate the event from/with the I/O handle.
331 // This will depend on the value of remaining set of network
332 // events that the <event_handler> is interested in. I don't
333 // think we can do anything about errors here, so I will not
334 // check this.
335 ::WSAEventSelect ((SOCKET) this->current_suspended_info_[slot].io_handle_,
336 this->current_suspended_info_[slot].event_handle_,
337 this->current_suspended_info_[slot].network_events_);
339 // Normal event entries.
340 else if (ACE_BIT_ENABLED (to_be_removed_masks, ACE_Event_Handler::DONT_CALL))
341 // Preserve DONT_CALL
342 to_be_removed_masks = ACE_Event_Handler::DONT_CALL;
343 else
344 // Make sure that the <to_be_removed_masks> is the NULL_MASK
345 to_be_removed_masks = ACE_Event_Handler::NULL_MASK;
347 // If this event was marked for resumption, undo the resumption flag
348 // and reduce the to be resumed count.
349 if (this->current_suspended_info_[slot].resume_entry_)
351 // Undo resumption
352 this->current_suspended_info_[slot].resume_entry_ = false;
353 // Decrement the handle count
354 --this->handles_to_be_resumed_;
357 // If there are no more events that the <Event_Handler> is
358 // interested in, or this is a non-I/O entry, schedule the
359 // <Event_Handler> for removal
360 if (this->current_suspended_info_[slot].network_events_ == 0)
362 // Mark to be deleted
363 this->current_suspended_info_[slot].delete_entry_ = true;
364 // Remember the mask
365 this->current_suspended_info_[slot].close_masks_ = to_be_removed_masks;
366 // Increment the handle count
367 ++this->handles_to_be_deleted_;
369 // Since it is not a complete removal, we'll call handle_close for
370 // all the masks that were removed. This does not change the
371 // internal state of the reactor.
373 // Note: this condition only applies to I/O entries
374 else if (ACE_BIT_ENABLED (to_be_removed_masks, ACE_Event_Handler::DONT_CALL) == 0)
376 ACE_HANDLE handle = this->current_suspended_info_[slot].io_handle_;
377 this->current_suspended_info_[slot].event_handler_->handle_close (handle,
378 to_be_removed_masks);
381 return 0;
385 ACE_WFMO_Reactor_Handler_Repository::remove_to_be_added_handler_i (size_t slot,
386 ACE_Reactor_Mask to_be_removed_masks)
388 // I/O entries
389 if (this->to_be_added_info_[slot].io_entry_)
391 // See if there are other events that the <Event_Handler> is
392 // interested in
393 this->bit_ops (this->to_be_added_info_[slot].network_events_,
394 to_be_removed_masks,
395 ACE_Reactor::CLR_MASK);
397 // Disassociate/Reassociate the event from/with the I/O handle.
398 // This will depend on the value of remaining set of network
399 // events that the <event_handler> is interested in. I don't
400 // think we can do anything about errors here, so I will not
401 // check this.
402 ::WSAEventSelect ((SOCKET) this->to_be_added_info_[slot].io_handle_,
403 this->to_be_added_info_[slot].event_handle_,
404 this->to_be_added_info_[slot].network_events_);
406 // Normal event entries.
407 else if (ACE_BIT_ENABLED (to_be_removed_masks, ACE_Event_Handler::DONT_CALL))
408 // Preserve DONT_CALL
409 to_be_removed_masks = ACE_Event_Handler::DONT_CALL;
410 else
411 // Make sure that the <to_be_removed_masks> is the NULL_MASK
412 to_be_removed_masks = ACE_Event_Handler::NULL_MASK;
414 // If this event was marked for suspension, undo the suspension flag
415 // and reduce the to be suspended count.
416 if (this->to_be_added_info_[slot].suspend_entry_)
418 // Undo suspension
419 this->to_be_added_info_[slot].suspend_entry_ = false;
420 // Decrement the handle count
421 --this->handles_to_be_suspended_;
424 // If there are no more events that the <Event_Handler> is
425 // interested in, or this is a non-I/O entry, schedule the
426 // <Event_Handler> for removal
427 if (this->to_be_added_info_[slot].network_events_ == 0)
429 // Mark to be deleted
430 this->to_be_added_info_[slot].delete_entry_ = true;
431 // Remember the mask
432 this->to_be_added_info_[slot].close_masks_ = to_be_removed_masks;
433 // Increment the handle count
434 ++this->handles_to_be_deleted_;
436 // Since it is not a complete removal, we'll call handle_close
437 // for all the masks that were removed. This does not change
438 // the internal state of the reactor.
440 // Note: this condition only applies to I/O entries
441 else if (ACE_BIT_ENABLED (to_be_removed_masks, ACE_Event_Handler::DONT_CALL) == 0)
443 ACE_HANDLE handle = this->to_be_added_info_[slot].io_handle_;
444 this->to_be_added_info_[slot].event_handler_->handle_close (handle,
445 to_be_removed_masks);
448 return 0;
452 ACE_WFMO_Reactor_Handler_Repository::suspend_handler_i (ACE_HANDLE handle,
453 bool &changes_required)
455 size_t i = 0;
457 // Go through all the handles looking for <handle>. Even if we find
458 // it, we continue through the rest of the list since <handle> could
459 // appear multiple times. All handles are checked.
461 // Check the current entries first.
462 for (i = 0; i < this->max_handlep1_; ++i)
463 // Since the handle can either be the event or the I/O handle,
464 // we have to check both
465 if ((this->current_handles_[i] == handle ||
466 this->current_info_[i].io_handle_ == handle) &&
467 // Make sure that it is not already marked for suspension
468 !this->current_info_[i].suspend_entry_)
470 // Mark to be suspended
471 this->current_info_[i].suspend_entry_ = true;
472 // Increment the handle count
473 ++this->handles_to_be_suspended_;
474 // Changes will be required
475 changes_required = true;
478 // Then check the suspended entries.
479 for (i = 0; i < this->suspended_handles_; ++i)
480 // Since the handle can either be the event or the I/O handle,
481 // we have to check both
482 if ((this->current_suspended_info_[i].event_handle_ == handle ||
483 this->current_suspended_info_[i].io_handle_ == handle) &&
484 // Make sure that the resumption is not already undone
485 this->current_suspended_info_[i].resume_entry_)
487 // Undo resumption
488 this->current_suspended_info_[i].resume_entry_ = false;
489 // Decrement the handle count
490 --this->handles_to_be_resumed_;
491 // Changes will be required
492 changes_required = true;
495 // Then check the to_be_added entries.
496 for (i = 0; i < this->handles_to_be_added_; ++i)
497 // Since the handle can either be the event or the I/O handle,
498 // we have to check both
499 if ((this->to_be_added_info_[i].io_handle_ == handle ||
500 this->to_be_added_info_[i].event_handle_ == handle) &&
501 // Make sure that it is not already marked for suspension
502 !this->to_be_added_info_[i].suspend_entry_)
504 // Mark to be suspended
505 this->to_be_added_info_[i].suspend_entry_ = true;
506 // Increment the handle count
507 ++this->handles_to_be_suspended_;
508 // Changes will be required
509 changes_required = true;
512 return 0;
516 ACE_WFMO_Reactor_Handler_Repository::resume_handler_i (ACE_HANDLE handle,
517 bool &changes_required)
519 size_t i = 0;
521 // Go through all the handles looking for <handle>. Even if we find
522 // it, we continue through the rest of the list since <handle> could
523 // appear multiple times. All handles are checked.
525 // Check the current entries first.
526 for (i = 0; i < this->max_handlep1_; ++i)
527 // Since the handle can either be the event or the I/O handle,
528 // we have to check both
529 if ((this->current_handles_[i] == handle ||
530 this->current_info_[i].io_handle_ == handle) &&
531 // Make sure that the suspension is not already undone
532 this->current_info_[i].suspend_entry_)
534 // Undo suspension
535 this->current_info_[i].suspend_entry_ = false;
536 // Decrement the handle count
537 --this->handles_to_be_suspended_;
538 // Changes will be required
539 changes_required = true;
542 // Then check the suspended entries.
543 for (i = 0; i < this->suspended_handles_; ++i)
544 // Since the handle can either be the event or the I/O handle,
545 // we have to check both
546 if ((this->current_suspended_info_[i].event_handle_ == handle ||
547 this->current_suspended_info_[i].io_handle_ == handle) &&
548 // Make sure that it is not already marked for resumption
549 !this->current_suspended_info_[i].resume_entry_)
551 // Mark to be resumed
552 this->current_suspended_info_[i].resume_entry_ = true;
553 // Increment the handle count
554 ++this->handles_to_be_resumed_;
555 // Changes will be required
556 changes_required = true;
559 // Then check the to_be_added entries.
560 for (i = 0; i < this->handles_to_be_added_; ++i)
561 // Since the handle can either be the event or the I/O handle,
562 // we have to check both
563 if ((this->to_be_added_info_[i].io_handle_ == handle ||
564 this->to_be_added_info_[i].event_handle_ == handle) &&
565 // Make sure that the suspension is not already undone
566 this->to_be_added_info_[i].suspend_entry_)
568 // Undo suspension
569 this->to_be_added_info_[i].suspend_entry_ = false;
570 // Decrement the handle count
571 --this->handles_to_be_suspended_;
572 // Changes will be required
573 changes_required = true;
576 return 0;
579 void
580 ACE_WFMO_Reactor_Handler_Repository::unbind_all (void)
583 ACE_GUARD (ACE_Process_Mutex, ace_mon, this->wfmo_reactor_.lock_);
585 bool dummy;
586 size_t i;
588 // Remove all the current handlers
589 for (i = 0; i < this->max_handlep1_; ++i)
590 this->unbind_i (this->current_handles_[i],
591 ACE_Event_Handler::ALL_EVENTS_MASK,
592 dummy);
594 // Remove all the suspended handlers
595 for (i = 0; i < this->suspended_handles_; ++i)
596 this->unbind_i (this->current_suspended_info_[i].event_handle_,
597 ACE_Event_Handler::ALL_EVENTS_MASK,
598 dummy);
600 // Remove all the to_be_added handlers
601 for (i = 0; i < this->handles_to_be_added_; ++i)
602 this->unbind_i (this->to_be_added_info_[i].event_handle_,
603 ACE_Event_Handler::ALL_EVENTS_MASK,
604 dummy);
607 // The guard is released here
609 // Wake up all threads in WaitForMultipleObjects so that they can
610 // reconsult the handle set
611 this->wfmo_reactor_.wakeup_all_threads ();
615 ACE_WFMO_Reactor_Handler_Repository::bind_i (bool io_entry,
616 ACE_Event_Handler *event_handler,
617 long network_events,
618 ACE_HANDLE io_handle,
619 ACE_HANDLE event_handle,
620 bool delete_event)
622 if (event_handler == 0)
623 return -1;
625 // Make sure that the <handle> is valid
626 if (event_handle == ACE_INVALID_HANDLE)
627 event_handle = event_handler->get_handle ();
628 if (this->invalid_handle (event_handle))
629 return -1;
631 size_t current_size = this->max_handlep1_ +
632 this->handles_to_be_added_ -
633 this->handles_to_be_deleted_ +
634 this->suspended_handles_;
636 // Make sure that there's room in the table and that total pending
637 // additions should not exceed what the <to_be_added_info_> array
638 // can hold.
639 if (current_size < this->max_size_ &&
640 this->handles_to_be_added_ < this->max_size_)
642 // Cache this set into the <to_be_added_info_>, till we come
643 // around to actually adding this to the <current_info_>
644 this->to_be_added_info_[this->handles_to_be_added_].set (event_handle,
645 io_entry,
646 event_handler,
647 io_handle,
648 network_events,
649 delete_event);
651 ++this->handles_to_be_added_;
653 event_handler->add_reference ();
655 // Wake up all threads in WaitForMultipleObjects so that they can
656 // reconsult the handle set
657 this->wfmo_reactor_.wakeup_all_threads ();
659 else
661 errno = EMFILE; // File descriptor table is full (better than nothing)
662 return -1;
665 return 0;
669 ACE_WFMO_Reactor_Handler_Repository::make_changes_in_current_infos (void)
671 // Go through the entire valid array and check for all handles that
672 // have been schedule for deletion
673 if (this->handles_to_be_deleted_ > 0 || this->handles_to_be_suspended_ > 0)
675 size_t i = 0;
676 while (i < this->max_handlep1_)
678 // This stuff is necessary here, since we should not make
679 // the upcall until all the internal data structures have
680 // been updated. This is to protect against upcalls that
681 // try to deregister again.
682 ACE_HANDLE handle = ACE_INVALID_HANDLE;
683 ACE_Reactor_Mask masks = ACE_Event_Handler::NULL_MASK;
684 ACE_Event_Handler *event_handler = 0;
686 // See if this entry is scheduled for deletion
687 if (this->current_info_[i].delete_entry_)
689 // Calling the <handle_close> method here will ensure that we
690 // will only call it once per deregistering <Event_Handler>.
691 // This is essential in the case when the <Event_Handler> will
692 // do something like delete itself and we have multiple
693 // threads in WFMO_Reactor.
695 // Make sure that the DONT_CALL mask is not set
696 masks = this->current_info_[i].close_masks_;
697 if (ACE_BIT_ENABLED (masks, ACE_Event_Handler::DONT_CALL) == 0)
699 // Grab the correct handle depending on the type entry
700 if (this->current_info_[i].io_entry_)
701 handle = this->current_info_[i].io_handle_;
702 else
703 handle = this->current_handles_[i];
705 // Event handler
706 event_handler = this->current_info_[i].event_handler_;
709 // If <WFMO_Reactor> created the event, we need to clean it up
710 if (this->current_info_[i].delete_event_)
711 ACE_OS::event_destroy (&this->current_handles_[i]);
713 // Reduce count by one
714 --this->handles_to_be_deleted_;
717 // See if this entry is scheduled for suspension
718 else if (this->current_info_[i].suspend_entry_)
720 this->current_suspended_info_ [this->suspended_handles_].set (this->current_handles_[i],
721 this->current_info_[i]);
722 // Increase number of suspended handles
723 ++this->suspended_handles_;
725 // Reduce count by one
726 --this->handles_to_be_suspended_;
729 // See if this entry is scheduled for deletion or suspension
730 // If so we need to clean up
731 if (this->current_info_[i].delete_entry_ ||
732 this->current_info_[i].suspend_entry_ )
734 size_t last_valid_slot = this->max_handlep1_ - 1;
735 // If this is the last handle in the set, no need to swap
736 // places. Simply remove it.
737 if (i < last_valid_slot)
738 // Swap this handle with the last valid handle
740 // Struct copy
741 this->current_info_[i] =
742 this->current_info_[last_valid_slot];
743 this->current_handles_[i] =
744 this->current_handles_[last_valid_slot];
746 // Reset the info in this slot
747 this->current_info_[last_valid_slot].reset ();
748 this->current_handles_[last_valid_slot] = ACE_INVALID_HANDLE;
749 --this->max_handlep1_;
751 else
753 // This current entry is not up for deletion or
754 // suspension. Proceed to the next entry in the current
755 // handles.
756 ++i;
759 // Now that all internal structures have been updated, make
760 // the upcall.
761 if (event_handler != 0)
763 bool const requires_reference_counting =
764 event_handler->reference_counting_policy ().value () ==
765 ACE_Event_Handler::Reference_Counting_Policy::ENABLED;
767 event_handler->handle_close (handle, masks);
769 if (requires_reference_counting)
771 event_handler->remove_reference ();
777 return 0;
781 ACE_WFMO_Reactor_Handler_Repository::make_changes_in_suspension_infos (void)
783 // Go through the <suspended_handle> array
784 if (this->handles_to_be_deleted_ > 0 || this->handles_to_be_resumed_ > 0)
786 size_t i = 0;
787 while (i < this->suspended_handles_)
789 // This stuff is necessary here, since we should not make
790 // the upcall until all the internal data structures have
791 // been updated. This is to protect against upcalls that
792 // try to deregister again.
793 ACE_HANDLE handle = ACE_INVALID_HANDLE;
794 ACE_Reactor_Mask masks = ACE_Event_Handler::NULL_MASK;
795 ACE_Event_Handler *event_handler = 0;
797 // See if this entry is scheduled for deletion
798 if (this->current_suspended_info_[i].delete_entry_)
800 // Calling the <handle_close> method here will ensure that we
801 // will only call it once per deregistering <Event_Handler>.
802 // This is essential in the case when the <Event_Handler> will
803 // do something like delete itself and we have multiple
804 // threads in WFMO_Reactor.
806 // Make sure that the DONT_CALL mask is not set
807 masks = this->current_suspended_info_[i].close_masks_;
808 if (ACE_BIT_ENABLED (masks, ACE_Event_Handler::DONT_CALL) == 0)
810 // Grab the correct handle depending on the type entry
811 if (this->current_suspended_info_[i].io_entry_)
812 handle = this->current_suspended_info_[i].io_handle_;
813 else
814 handle = this->current_suspended_info_[i].event_handle_;
816 // Upcall
817 event_handler = this->current_suspended_info_[i].event_handler_;
820 // If <WFMO_Reactor> created the event, we need to clean it up
821 if (this->current_suspended_info_[i].delete_event_)
822 ACE_OS::event_destroy (&this->current_suspended_info_[i].event_handle_);
824 // Reduce count by one
825 --this->handles_to_be_deleted_;
828 else if (this->current_suspended_info_[i].resume_entry_)
830 // Add to the end of the current handles set
831 this->current_handles_[this->max_handlep1_] = this->current_suspended_info_[i].event_handle_;
832 // Struct copy
833 this->current_info_[this->max_handlep1_].set (this->current_suspended_info_[i]);
834 ++this->max_handlep1_;
836 // Reduce count by one
837 --this->handles_to_be_resumed_;
840 // If an entry needs to be removed, either because it
841 // was deleted or resumed, remove it now before doing
842 // the upcall.
843 if (this->current_suspended_info_[i].resume_entry_ ||
844 this->current_suspended_info_[i].delete_entry_)
846 size_t last_valid_slot = this->suspended_handles_ - 1;
847 // Net effect is that we're removing an entry and
848 // compressing the list from the end. So, if removing
849 // an entry from the middle, copy the last valid one to the
850 // removed slot. Reset the end and decrement the number
851 // of suspended handles.
852 if (i < last_valid_slot)
853 // Struct copy
854 this->current_suspended_info_[i] =
855 this->current_suspended_info_[last_valid_slot];
856 this->current_suspended_info_[last_valid_slot].reset ();
857 --this->suspended_handles_;
859 else
861 // This current entry is not up for deletion or
862 // resumption. Proceed to the next entry in the
863 // suspended handles.
864 ++i;
867 // Now that all internal structures have been updated, make
868 // the upcall.
869 if (event_handler != 0)
871 int requires_reference_counting =
872 event_handler->reference_counting_policy ().value () ==
873 ACE_Event_Handler::Reference_Counting_Policy::ENABLED;
875 event_handler->handle_close (handle, masks);
877 if (requires_reference_counting)
879 event_handler->remove_reference ();
885 return 0;
889 ACE_WFMO_Reactor_Handler_Repository::make_changes_in_to_be_added_infos (void)
891 // Go through the <to_be_added_*> arrays
892 for (size_t i = 0; i < this->handles_to_be_added_; ++i)
894 // This stuff is necessary here, since we should not make
895 // the upcall until all the internal data structures have
896 // been updated. This is to protect against upcalls that
897 // try to deregister again.
898 ACE_HANDLE handle = ACE_INVALID_HANDLE;
899 ACE_Reactor_Mask masks = ACE_Event_Handler::NULL_MASK;
900 ACE_Event_Handler *event_handler = 0;
902 // See if this entry is scheduled for deletion
903 if (this->to_be_added_info_[i].delete_entry_)
905 // Calling the <handle_close> method here will ensure that we
906 // will only call it once per deregistering <Event_Handler>.
907 // This is essential in the case when the <Event_Handler> will
908 // do something like delete itself and we have multiple
909 // threads in WFMO_Reactor.
911 // Make sure that the DONT_CALL mask is not set
912 masks = this->to_be_added_info_[i].close_masks_;
913 if (ACE_BIT_ENABLED (masks, ACE_Event_Handler::DONT_CALL) == 0)
915 // Grab the correct handle depending on the type entry
916 if (this->to_be_added_info_[i].io_entry_)
917 handle = this->to_be_added_info_[i].io_handle_;
918 else
919 handle = this->to_be_added_info_[i].event_handle_;
921 // Upcall
922 event_handler = this->to_be_added_info_[i].event_handler_;
925 // If <WFMO_Reactor> created the event, we need to clean it up
926 if (this->to_be_added_info_[i].delete_event_)
927 ACE_OS::event_destroy (&this->to_be_added_info_[i].event_handle_);
929 // Reduce count by one
930 --this->handles_to_be_deleted_;
933 // See if this entry is scheduled for suspension
934 else if (this->to_be_added_info_[i].suspend_entry_)
936 this->current_suspended_info_ [this->suspended_handles_].set (this->to_be_added_info_[i].event_handle_,
937 this->to_be_added_info_[i]);
938 // Increase number of suspended handles
939 ++this->suspended_handles_;
941 // Reduce count by one
942 --this->handles_to_be_suspended_;
945 // If neither of the two flags are on, add to current
946 else
948 // Add to the end of the current handles set
949 this->current_handles_[this->max_handlep1_] = this->to_be_added_info_[i].event_handle_;
950 // Struct copy
951 this->current_info_[this->max_handlep1_].set (this->to_be_added_info_[i]);
952 ++this->max_handlep1_;
955 // Reset the <to_be_added_info_>
956 this->to_be_added_info_[i].reset ();
958 // Now that all internal structures have been updated, make the
959 // upcall.
960 if (event_handler != 0)
962 int requires_reference_counting =
963 event_handler->reference_counting_policy ().value () ==
964 ACE_Event_Handler::Reference_Counting_Policy::ENABLED;
966 event_handler->handle_close (handle, masks);
968 if (requires_reference_counting)
970 event_handler->remove_reference ();
975 // Since all to be added handles have been taken care of, reset the
976 // counter
977 this->handles_to_be_added_ = 0;
979 return 0;
982 void
983 ACE_WFMO_Reactor_Handler_Repository::dump (void) const
985 #if defined (ACE_HAS_DUMP)
986 size_t i = 0;
988 ACE_TRACE ("ACE_WFMO_Reactor_Handler_Repository::dump");
990 ACELIB_DEBUG ((LM_DEBUG, ACE_BEGIN_DUMP, this));
992 ACELIB_DEBUG ((LM_DEBUG,
993 ACE_TEXT ("Max size = %d\n"),
994 this->max_size_));
996 ACELIB_DEBUG ((LM_DEBUG,
997 ACE_TEXT ("Current info table\n\n")));
998 ACELIB_DEBUG ((LM_DEBUG,
999 ACE_TEXT ("\tSize = %d\n"),
1000 this->max_handlep1_));
1001 ACELIB_DEBUG ((LM_DEBUG,
1002 ACE_TEXT ("\tHandles to be suspended = %d\n"),
1003 this->handles_to_be_suspended_));
1005 for (i = 0; i < this->max_handlep1_; ++i)
1006 this->current_info_[i].dump (this->current_handles_[i]);
1008 ACELIB_DEBUG ((LM_DEBUG,
1009 ACE_TEXT ("\n")));
1011 ACELIB_DEBUG ((LM_DEBUG,
1012 ACE_TEXT ("To-be-added info table\n\n")));
1013 ACELIB_DEBUG ((LM_DEBUG,
1014 ACE_TEXT ("\tSize = %d\n"),
1015 this->handles_to_be_added_));
1017 for (i = 0; i < this->handles_to_be_added_; ++i)
1018 this->to_be_added_info_[i].dump ();
1020 ACELIB_DEBUG ((LM_DEBUG,
1021 ACE_TEXT ("\n")));
1023 ACELIB_DEBUG ((LM_DEBUG,
1024 ACE_TEXT ("Suspended info table\n\n")));
1025 ACELIB_DEBUG ((LM_DEBUG,
1026 ACE_TEXT ("\tSize = %d\n"),
1027 this->suspended_handles_));
1028 ACELIB_DEBUG ((LM_DEBUG,
1029 ACE_TEXT ("\tHandles to be resumed = %d\n"),
1030 this->handles_to_be_resumed_));
1032 for (i = 0; i < this->suspended_handles_; ++i)
1033 this->current_suspended_info_[i].dump ();
1035 ACELIB_DEBUG ((LM_DEBUG,
1036 ACE_TEXT ("\n")));
1038 ACELIB_DEBUG ((LM_DEBUG,
1039 ACE_TEXT ("Total handles to be deleted = %d\n"),
1040 this->handles_to_be_deleted_));
1042 ACELIB_DEBUG ((LM_DEBUG,
1043 ACE_END_DUMP));
1044 #endif /* ACE_HAS_DUMP */
1047 /************************************************************/
1050 ACE_WFMO_Reactor::work_pending (const ACE_Time_Value &)
1052 ACE_NOTSUP_RETURN (-1);
1055 #if defined (ACE_WIN32_VC8)
1056 # pragma warning (push)
1057 # pragma warning (disable:4355) /* Use of 'this' in initializer list */
1058 # endif
1059 ACE_WFMO_Reactor::ACE_WFMO_Reactor (ACE_Sig_Handler *sh,
1060 ACE_Timer_Queue *tq,
1061 ACE_Reactor_Notify *notify)
1062 : signal_handler_ (0),
1063 delete_signal_handler_ (false),
1064 timer_queue_ (0),
1065 delete_timer_queue_ (false),
1066 delete_handler_rep_ (false),
1067 notify_handler_ (0),
1068 delete_notify_handler_ (false),
1069 lock_adapter_ (lock_),
1070 handler_rep_ (*this),
1071 // this event is initially signaled
1072 ok_to_wait_ (1),
1073 // this event is initially unsignaled
1074 wakeup_all_threads_ (0),
1075 // this event is initially unsignaled
1076 waiting_to_change_state_ (0),
1077 active_threads_ (0),
1078 owner_ (ACE_Thread::self ()),
1079 new_owner_ (0),
1080 change_state_thread_ (0),
1081 open_for_business_ (false),
1082 deactivated_ (0)
1084 if (this->open (ACE_WFMO_Reactor::DEFAULT_SIZE, 0, sh, tq, 0, notify) == -1)
1085 ACELIB_ERROR ((LM_ERROR,
1086 ACE_TEXT ("%p\n"),
1087 ACE_TEXT ("WFMO_Reactor")));
1090 ACE_WFMO_Reactor::ACE_WFMO_Reactor (size_t size,
1091 int unused,
1092 ACE_Sig_Handler *sh,
1093 ACE_Timer_Queue *tq,
1094 ACE_Reactor_Notify *notify)
1095 : signal_handler_ (0),
1096 delete_signal_handler_ (false),
1097 timer_queue_ (0),
1098 delete_timer_queue_ (false),
1099 delete_handler_rep_ (false),
1100 notify_handler_ (0),
1101 delete_notify_handler_ (false),
1102 lock_adapter_ (lock_),
1103 handler_rep_ (*this),
1104 // this event is initially signaled
1105 ok_to_wait_ (1),
1106 // this event is initially unsignaled
1107 wakeup_all_threads_ (0),
1108 // this event is initially unsignaled
1109 waiting_to_change_state_ (0),
1110 active_threads_ (0),
1111 owner_ (ACE_Thread::self ()),
1112 new_owner_ (0),
1113 change_state_thread_ (0),
1114 open_for_business_ (false),
1115 deactivated_ (0)
1117 ACE_UNUSED_ARG (unused);
1119 if (this->open (size, 0, sh, tq, 0, notify) == -1)
1120 ACELIB_ERROR ((LM_ERROR,
1121 ACE_TEXT ("%p\n"),
1122 ACE_TEXT ("WFMO_Reactor")));
1124 #if defined (ACE_WIN32_VC8)
1125 # pragma warning (pop)
1126 #endif
1129 ACE_WFMO_Reactor::current_info (ACE_HANDLE, size_t &)
1131 return -1;
1135 ACE_WFMO_Reactor::open (size_t size,
1136 bool,
1137 ACE_Sig_Handler *sh,
1138 ACE_Timer_Queue *tq,
1139 int,
1140 ACE_Reactor_Notify *notify)
1142 // This GUARD is necessary since we are updating shared state.
1143 ACE_GUARD_RETURN (ACE_Process_Mutex, ace_mon, this->lock_, -1);
1145 // If we are already open, return -1
1146 if (this->open_for_business_)
1147 return -1;
1149 // Timer Queue
1150 if (this->delete_timer_queue_)
1151 delete this->timer_queue_;
1152 else if (this->timer_queue_)
1153 this->timer_queue_->close ();
1155 if (tq == 0)
1157 ACE_NEW_RETURN (this->timer_queue_,
1158 ACE_Timer_Heap,
1159 -1);
1160 this->delete_timer_queue_ = true;
1162 else
1164 this->timer_queue_ = tq;
1165 this->delete_timer_queue_ = false;
1168 // Signal Handler
1169 if (this->delete_signal_handler_)
1170 delete this->signal_handler_;
1172 if (sh == 0)
1174 ACE_NEW_RETURN (this->signal_handler_,
1175 ACE_Sig_Handler,
1176 -1);
1177 this->delete_signal_handler_ = true;
1179 else
1181 this->signal_handler_ = sh;
1182 this->delete_signal_handler_ = false;
1185 // Setup the atomic wait array (used later in <handle_events>)
1186 this->atomic_wait_array_[0] = this->lock_.lock ().proc_mutex_;
1187 this->atomic_wait_array_[1] = this->ok_to_wait_.handle ();
1189 // Prevent memory leaks when the ACE_WFMO_Reactor is reopened.
1190 if (this->delete_handler_rep_)
1192 if (this->handler_rep_.changes_required ())
1194 // Make necessary changes to the handler repository
1195 this->handler_rep_.make_changes ();
1196 // Turn off <wakeup_all_threads_> since all necessary changes
1197 // have completed
1198 this->wakeup_all_threads_.reset ();
1201 this->handler_rep_.~ACE_WFMO_Reactor_Handler_Repository ();
1204 // Open the handle repository. Two additional handles for internal
1205 // purposes
1206 if (this->handler_rep_.open (size + 2) == -1)
1207 ACELIB_ERROR_RETURN ((LM_ERROR, ACE_TEXT ("%p\n"),
1208 ACE_TEXT ("opening handler repository")),
1209 -1);
1210 else
1211 this->delete_handler_rep_ = true;
1213 if (this->notify_handler_ != 0 && this->delete_notify_handler_)
1214 delete this->notify_handler_;
1216 this->notify_handler_ = notify;
1218 if (this->notify_handler_ == 0)
1220 ACE_NEW_RETURN (this->notify_handler_,
1221 ACE_WFMO_Reactor_Notify,
1222 -1);
1224 if (this->notify_handler_ == 0)
1225 return -1;
1226 else
1227 this->delete_notify_handler_ = true;
1230 /* NOTE */
1231 // The order of the following two registrations is very important
1233 // Open the notification handler
1234 if (this->notify_handler_->open (this, this->timer_queue_) == -1)
1235 ACELIB_ERROR_RETURN ((LM_ERROR,
1236 ACE_TEXT ("%p\n"),
1237 ACE_TEXT ("opening notify handler ")),
1238 -1);
1240 // Register for <wakeup_all_threads> event
1241 if (this->register_handler (&this->wakeup_all_threads_handler_,
1242 this->wakeup_all_threads_.handle ()) == -1)
1243 ACELIB_ERROR_RETURN ((LM_ERROR,
1244 ACE_TEXT ("%p\n"),
1245 ACE_TEXT ("registering thread wakeup handler")),
1246 -1);
1248 // Since we have added two handles into the handler repository,
1249 // update the <handler_repository_>
1250 if (this->handler_rep_.changes_required ())
1252 // Make necessary changes to the handler repository
1253 this->handler_rep_.make_changes ();
1254 // Turn off <wakeup_all_threads_> since all necessary changes
1255 // have completed
1256 this->wakeup_all_threads_.reset ();
1259 // We are open for business
1260 this->open_for_business_ = true;
1262 return 0;
1266 ACE_WFMO_Reactor::set_sig_handler (ACE_Sig_Handler *signal_handler)
1268 if (this->signal_handler_ != 0 && this->delete_signal_handler_)
1269 delete this->signal_handler_;
1270 this->signal_handler_ = signal_handler;
1271 this->delete_signal_handler_ = false;
1272 return 0;
1275 ACE_Timer_Queue *
1276 ACE_WFMO_Reactor::timer_queue (void) const
1278 return this->timer_queue_;
1282 ACE_WFMO_Reactor::timer_queue (ACE_Timer_Queue *tq)
1284 if (this->delete_timer_queue_)
1286 delete this->timer_queue_;
1288 else if (this->timer_queue_)
1290 this->timer_queue_->close ();
1292 this->timer_queue_ = tq;
1293 this->delete_timer_queue_ = false;
1294 return 0;
1298 ACE_WFMO_Reactor::close (void)
1300 // This GUARD is necessary since we are updating shared state.
1301 ACE_GUARD_RETURN (ACE_Process_Mutex, ace_mon, this->lock_, -1);
1303 // If we are already closed, return error
1304 if (!this->open_for_business_)
1305 return -1;
1307 // We are now closed
1308 this->open_for_business_ = false;
1309 // This will unregister all handles
1310 this->handler_rep_.close ();
1312 // Make necessary changes to the handler repository that we caused
1313 // by the above actions. Someone who called close() is expecting that
1314 // things will be tidied up upon return.
1315 this->handler_rep_.make_changes ();
1317 if (this->delete_timer_queue_)
1319 delete this->timer_queue_;
1320 this->timer_queue_ = 0;
1321 this->delete_timer_queue_ = false;
1323 else if (this->timer_queue_)
1325 this->timer_queue_->close ();
1326 this->timer_queue_ = 0;
1329 if (this->delete_signal_handler_)
1331 delete this->signal_handler_;
1332 this->signal_handler_ = 0;
1333 this->delete_signal_handler_ = false;
1336 if (this->delete_notify_handler_)
1338 delete this->notify_handler_;
1339 this->notify_handler_ = 0;
1340 this->delete_notify_handler_ = false;
1343 return 0;
1346 ACE_WFMO_Reactor::~ACE_WFMO_Reactor (void)
1348 // Assumption: No threads are left in the Reactor when this method
1349 // is called (i.e., active_threads_ == 0)
1351 // Close down
1352 this->close ();
1356 ACE_WFMO_Reactor::register_handler_i (ACE_HANDLE event_handle,
1357 ACE_HANDLE io_handle,
1358 ACE_Event_Handler *event_handler,
1359 ACE_Reactor_Mask new_masks)
1361 // If this is a Winsock 1 system, the underlying event assignment will
1362 // not work, so don't try. Winsock 1 must use ACE_Select_Reactor for
1363 // reacting to socket activity.
1365 #if !defined (ACE_HAS_WINSOCK2) || (ACE_HAS_WINSOCK2 == 0)
1367 ACE_UNUSED_ARG (event_handle);
1368 ACE_UNUSED_ARG (io_handle);
1369 ACE_UNUSED_ARG (event_handler);
1370 ACE_UNUSED_ARG (new_masks);
1371 ACE_NOTSUP_RETURN (-1);
1373 #else
1375 // Make sure that the <handle> is valid
1376 if (io_handle == ACE_INVALID_HANDLE)
1377 io_handle = event_handler->get_handle ();
1379 if (this->handler_rep_.invalid_handle (io_handle))
1381 errno = ERROR_INVALID_HANDLE;
1382 return -1;
1385 long new_network_events = 0;
1386 bool delete_event = false;
1387 #if defined (ACE_HAS_CPP11)
1388 std::unique_ptr <ACE_Auto_Event> event;
1389 #else
1390 auto_ptr <ACE_Auto_Event> event;
1391 #endif /* ACE_HAS_CPP11 */
1393 // Look up the repository to see if the <event_handler> is already
1394 // there.
1395 ACE_Reactor_Mask old_masks;
1396 int found = this->handler_rep_.modify_network_events_i (io_handle,
1397 new_masks,
1398 old_masks,
1399 new_network_events,
1400 event_handle,
1401 delete_event,
1402 ACE_Reactor::ADD_MASK);
1404 // Check to see if the user passed us a valid event; If not then we
1405 // need to create one
1406 if (event_handle == ACE_INVALID_HANDLE)
1408 #if defined (ACE_HAS_CPP11)
1409 std::unique_ptr<ACE_Auto_Event> tmp (new ACE_Auto_Event);
1410 event = std::move(tmp);
1411 #else
1412 // Note: don't change this since some C++ compilers have
1413 // <auto_ptr>s that don't work properly...
1414 auto_ptr<ACE_Auto_Event> tmp (new ACE_Auto_Event);
1415 event = tmp;
1416 #endif /* ACE_HAS_CPP11 */
1417 event_handle = event->handle ();
1418 delete_event = true;
1421 int result = ::WSAEventSelect ((SOCKET) io_handle,
1422 event_handle,
1423 new_network_events);
1424 // If we had found the <Event_Handler> there is nothing more to do
1425 if (found)
1426 return result;
1427 else if (result != SOCKET_ERROR &&
1428 this->handler_rep_.bind_i (1,
1429 event_handler,
1430 new_network_events,
1431 io_handle,
1432 event_handle,
1433 delete_event) != -1)
1435 // The <event_handler> was not found in the repository, add to
1436 // the repository.
1437 if (delete_event)
1439 // Clear out the handle in the ACE_Auto_Event so that when
1440 // it is destroyed, the handle isn't closed out from under
1441 // the reactor. After setting it, running down the event
1442 // (via auto_ptr<> event, above) at function return will
1443 // cause an error because it'll try to close an invalid handle.
1444 // To avoid that smashing the errno value, save the errno
1445 // here, explicitly remove the event so the dtor won't do it
1446 // again, then restore errno.
1447 ACE_Errno_Guard guard (errno);
1448 event->handle (ACE_INVALID_HANDLE);
1449 event->remove ();
1451 return 0;
1453 else
1454 return -1;
1456 #endif /* ACE_HAS_WINSOCK2 || ACE_HAS_WINSOCK2 == 0 */
1461 ACE_WFMO_Reactor::mask_ops_i (ACE_HANDLE io_handle,
1462 ACE_Reactor_Mask new_masks,
1463 int operation)
1465 // Make sure that the <handle> is valid
1466 if (this->handler_rep_.invalid_handle (io_handle))
1467 return -1;
1469 long new_network_events = 0;
1470 bool delete_event = false;
1471 ACE_HANDLE event_handle = ACE_INVALID_HANDLE;
1473 // Look up the repository to see if the <Event_Handler> is already
1474 // there.
1475 ACE_Reactor_Mask old_masks;
1476 int found = this->handler_rep_.modify_network_events_i (io_handle,
1477 new_masks,
1478 old_masks,
1479 new_network_events,
1480 event_handle,
1481 delete_event,
1482 operation);
1483 if (found)
1485 int result = ::WSAEventSelect ((SOCKET) io_handle,
1486 event_handle,
1487 new_network_events);
1488 if (result == 0)
1489 return old_masks;
1490 else
1491 return result;
1493 else
1494 return -1;
1500 ACE_WFMO_Reactor_Handler_Repository::modify_network_events_i (ACE_HANDLE io_handle,
1501 ACE_Reactor_Mask new_masks,
1502 ACE_Reactor_Mask &old_masks,
1503 long &new_network_events,
1504 ACE_HANDLE &event_handle,
1505 bool &delete_event,
1506 int operation)
1508 long *modified_network_events = &new_network_events;
1509 int found = 0;
1510 size_t i;
1512 // First go through the current entries
1514 // Look for all entries in the current handles for matching handle
1515 // (except those that have been scheduled for deletion)
1516 for (i = 0; i < this->max_handlep1_ && !found; ++i)
1517 if (io_handle == this->current_info_[i].io_handle_ &&
1518 !this->current_info_[i].delete_entry_)
1520 found = 1;
1521 modified_network_events = &this->current_info_[i].network_events_;
1522 delete_event = this->current_info_[i].delete_event_;
1523 event_handle = this->current_handles_[i];
1526 // Then pass through the suspended handles
1528 // Look for all entries in the suspended handles for matching handle
1529 // (except those that have been scheduled for deletion)
1530 for (i = 0; i < this->suspended_handles_ && !found; ++i)
1531 if (io_handle == this->current_suspended_info_[i].io_handle_ &&
1532 !this->current_suspended_info_[i].delete_entry_)
1534 found = 1;
1535 modified_network_events = &this->current_suspended_info_[i].network_events_;
1536 delete_event = this->current_suspended_info_[i].delete_event_;
1537 event_handle = this->current_suspended_info_[i].event_handle_;
1540 // Then check the to_be_added handles
1542 // Look for all entries in the to_be_added handles for matching
1543 // handle (except those that have been scheduled for deletion)
1544 for (i = 0; i < this->handles_to_be_added_ && !found; ++i)
1545 if (io_handle == this->to_be_added_info_[i].io_handle_ &&
1546 !this->to_be_added_info_[i].delete_entry_)
1548 found = 1;
1549 modified_network_events = &this->to_be_added_info_[i].network_events_;
1550 delete_event = this->to_be_added_info_[i].delete_event_;
1551 event_handle = this->to_be_added_info_[i].event_handle_;
1554 old_masks = this->bit_ops (*modified_network_events,
1555 new_masks,
1556 operation);
1558 new_network_events = *modified_network_events;
1560 return found;
1563 ACE_Event_Handler *
1564 ACE_WFMO_Reactor_Handler_Repository::find_handler (ACE_HANDLE handle)
1566 long existing_masks_ignored = 0;
1567 return this->handler (handle, existing_masks_ignored);
1570 ACE_Event_Handler *
1571 ACE_WFMO_Reactor_Handler_Repository::handler (ACE_HANDLE handle,
1572 long &existing_masks)
1574 int found = 0;
1575 size_t i = 0;
1576 ACE_Event_Handler *event_handler = 0;
1577 existing_masks = 0;
1579 // Look for the handle first
1581 // First go through the current entries
1583 // Look for all entries in the current handles for matching handle
1584 // (except those that have been scheduled for deletion)
1585 for (i = 0; i < this->max_handlep1_ && !found; ++i)
1586 if ((handle == this->current_info_[i].io_handle_ ||
1587 handle == this->current_handles_[i]) &&
1588 !this->current_info_[i].delete_entry_)
1590 found = 1;
1591 event_handler = this->current_info_[i].event_handler_;
1592 existing_masks = this->current_info_[i].network_events_;
1595 // Then pass through the suspended handles
1597 // Look for all entries in the suspended handles for matching handle
1598 // (except those that have been scheduled for deletion)
1599 for (i = 0; i < this->suspended_handles_ && !found; ++i)
1600 if ((handle == this->current_suspended_info_[i].io_handle_ ||
1601 handle == this->current_suspended_info_[i].event_handle_) &&
1602 !this->current_suspended_info_[i].delete_entry_)
1604 found = 1;
1605 event_handler = this->current_suspended_info_[i].event_handler_;
1606 existing_masks = this->current_suspended_info_[i].network_events_;
1609 // Then check the to_be_added handles
1611 // Look for all entries in the to_be_added handles for matching
1612 // handle (except those that have been scheduled for deletion)
1613 for (i = 0; i < this->handles_to_be_added_ && !found; ++i)
1614 if ((handle == this->to_be_added_info_[i].io_handle_ ||
1615 handle == this->to_be_added_info_[i].event_handle_) &&
1616 !this->to_be_added_info_[i].delete_entry_)
1618 found = 1;
1619 event_handler = this->to_be_added_info_[i].event_handler_;
1620 existing_masks = this->to_be_added_info_[i].network_events_;
1623 if (event_handler)
1624 event_handler->add_reference ();
1626 return event_handler;
1630 ACE_WFMO_Reactor_Handler_Repository::handler (ACE_HANDLE handle,
1631 ACE_Reactor_Mask user_masks,
1632 ACE_Event_Handler **user_event_handler)
1634 long existing_masks = 0;
1635 int found = 0;
1637 ACE_Event_Handler_var safe_event_handler =
1638 this->handler (handle,
1639 existing_masks);
1641 if (safe_event_handler.handler ())
1642 found = 1;
1644 if (!found)
1645 return -1;
1647 // Otherwise, make sure that the masks that the user is looking for
1648 // are on.
1649 if (found &&
1650 ACE_BIT_ENABLED (user_masks, ACE_Event_Handler::READ_MASK))
1651 if (!ACE_BIT_ENABLED (existing_masks, FD_READ) &&
1652 !ACE_BIT_ENABLED (existing_masks, FD_CLOSE))
1653 found = 0;
1655 if (found &&
1656 ACE_BIT_ENABLED (user_masks, ACE_Event_Handler::WRITE_MASK))
1657 if (!ACE_BIT_ENABLED (existing_masks, FD_WRITE))
1658 found = 0;
1660 if (found &&
1661 ACE_BIT_ENABLED (user_masks, ACE_Event_Handler::EXCEPT_MASK))
1662 if (!ACE_BIT_ENABLED (existing_masks, FD_OOB))
1663 found = 0;
1665 if (found &&
1666 ACE_BIT_ENABLED (user_masks, ACE_Event_Handler::ACCEPT_MASK))
1667 if (!ACE_BIT_ENABLED (existing_masks, FD_ACCEPT))
1668 found = 0;
1670 if (found &&
1671 ACE_BIT_ENABLED (user_masks, ACE_Event_Handler::CONNECT_MASK))
1672 if (!ACE_BIT_ENABLED (existing_masks, FD_CONNECT))
1673 found = 0;
1675 if (found &&
1676 ACE_BIT_ENABLED (user_masks, ACE_Event_Handler::QOS_MASK))
1677 if (!ACE_BIT_ENABLED (existing_masks, FD_QOS))
1678 found = 0;
1680 if (found &&
1681 ACE_BIT_ENABLED (user_masks, ACE_Event_Handler::GROUP_QOS_MASK))
1682 if (!ACE_BIT_ENABLED (existing_masks, FD_GROUP_QOS))
1683 found = 0;
1685 if (found &&
1686 user_event_handler)
1687 *user_event_handler = safe_event_handler.release ();
1689 if (found)
1690 return 0;
1691 else
1692 return -1;
1695 // Waits for and dispatches all events. Returns -1 on error, 0 if
1696 // max_wait_time expired, or the number of events that were dispatched.
1698 ACE_WFMO_Reactor::event_handling (ACE_Time_Value *max_wait_time,
1699 int alertable)
1701 ACE_TRACE ("ACE_WFMO_Reactor::event_handling");
1703 // Make sure we are not closed
1704 if (!this->open_for_business_ || this->deactivated_)
1706 errno = ESHUTDOWN;
1707 return -1;
1710 // Stash the current time -- the destructor of this object will
1711 // automatically compute how much time elapsed since this method was
1712 // called.
1713 ACE_Countdown_Time countdown (max_wait_time);
1715 int result;
1718 // Check to see if it is ok to enter ::WaitForMultipleObjects
1719 // This will acquire <this->lock_> on success On failure, the
1720 // lock will not be acquired
1721 result = this->ok_to_wait (max_wait_time, alertable);
1722 if (result != 1)
1723 return result;
1725 // Increment the number of active threads
1726 ++this->active_threads_;
1728 // Release the <lock_>
1729 this->lock_.release ();
1731 // Update the countdown to reflect time waiting to play with the
1732 // mut and event.
1733 countdown.update ();
1735 // Calculate timeout
1736 int timeout = this->calculate_timeout (max_wait_time);
1738 // Wait for event to happen
1739 DWORD wait_status = this->wait_for_multiple_events (timeout,
1740 alertable);
1742 // Upcall
1743 result = this->safe_dispatch (wait_status);
1744 if (0 == result)
1746 // wait_for_multiple_events timed out without dispatching
1747 // anything. Because of rounding and conversion errors and
1748 // such, it could be that the wait loop timed out, but
1749 // the timer queue said it wasn't quite ready to expire a
1750 // timer. In this case, max_wait_time won't have quite been
1751 // reduced to 0, and we need to go around again. If max_wait_time
1752 // is all the way to 0, just return, as the entire time the
1753 // caller wanted to wait has been used up.
1754 countdown.update (); // Reflect time waiting for events
1755 if (0 == max_wait_time || max_wait_time->usec () == 0)
1756 break;
1759 while (result == 0);
1761 return result;
1765 ACE_WFMO_Reactor::ok_to_wait (ACE_Time_Value *max_wait_time,
1766 int alertable)
1768 // Calculate the max time we should spend here
1770 // Note: There is really no need to involve the <timer_queue_> here
1771 // because even if a timeout in the <timer_queue_> does expire we
1772 // will not be able to dispatch it
1774 // We need to wait for both the <lock_> and <ok_to_wait_> event.
1775 // If not on WinCE, use WaitForMultipleObjects() to wait for both atomically.
1776 // On WinCE, the waitAll arg to WFMO must be false, so wait for the
1777 // ok_to_wait_ event first (since that's likely to take the longest) then
1778 // grab the lock and recheck the ok_to_wait_ event. When we can get them
1779 // both, or there's an error/timeout, return.
1780 #if defined (ACE_HAS_WINCE)
1781 ACE_UNUSED_ARG (alertable);
1782 ACE_Time_Value timeout;
1783 if (max_wait_time != 0)
1785 timeout = ACE_OS::gettimeofday ();
1786 timeout += *max_wait_time;
1788 while (1)
1790 int status;
1791 if (max_wait_time == 0)
1792 status = this->ok_to_wait_.wait ();
1793 else
1794 status = this->ok_to_wait_.wait (&timeout);
1795 if (status == -1)
1796 return -1;
1797 // The event is signaled, so it's ok to wait; grab the lock and
1798 // recheck the event. If something has changed, restart the wait.
1799 if (max_wait_time == 0)
1800 status = this->lock_.acquire ();
1801 else
1803 status = this->lock_.acquire (timeout);
1805 if (status == -1)
1806 return -1;
1808 // Have the lock_, now re-check the event. If it's not signaled,
1809 // another thread changed something so go back and wait again.
1810 if (this->ok_to_wait_.wait (&ACE_Time_Value::zero, 0) == 0)
1811 break;
1812 this->lock_.release ();
1814 return 1;
1816 #else
1817 int timeout = max_wait_time == 0 ? INFINITE : max_wait_time->msec ();
1818 DWORD result = 0;
1819 while (1)
1821 # if defined (ACE_HAS_PHARLAP)
1822 // PharLap doesn't implement WaitForMultipleObjectsEx, and doesn't
1823 // do async I/O, so it's not needed in this case anyway.
1824 result = ::WaitForMultipleObjects (sizeof this->atomic_wait_array_ / sizeof (ACE_HANDLE),
1825 this->atomic_wait_array_,
1826 TRUE,
1827 timeout);
1829 if (result != WAIT_IO_COMPLETION)
1830 break;
1832 # else
1833 result = ::WaitForMultipleObjectsEx (sizeof this->atomic_wait_array_ / sizeof (ACE_HANDLE),
1834 this->atomic_wait_array_,
1835 TRUE,
1836 timeout,
1837 alertable);
1839 if (result != WAIT_IO_COMPLETION)
1840 break;
1842 # endif /* ACE_HAS_PHARLAP */
1845 switch (result)
1847 case WAIT_TIMEOUT:
1848 errno = ETIME;
1849 return 0;
1850 case WAIT_FAILED:
1851 case WAIT_ABANDONED_0:
1852 ACE_OS::set_errno_to_last_error ();
1853 return -1;
1854 default:
1855 break;
1858 // It is ok to enter ::WaitForMultipleObjects
1859 return 1;
1860 #endif /* ACE_HAS_WINCE */
1863 DWORD
1864 ACE_WFMO_Reactor::wait_for_multiple_events (int timeout,
1865 int alertable)
1867 // Wait for any of handles_ to be active, or until timeout expires.
1868 // If <alertable> is enabled allow asynchronous completion of
1869 // ReadFile and WriteFile operations.
1871 #if defined (ACE_HAS_PHARLAP) || defined (ACE_HAS_WINCE)
1872 // PharLap doesn't do async I/O and doesn't implement
1873 // WaitForMultipleObjectsEx, so use WaitForMultipleObjects.
1874 ACE_UNUSED_ARG (alertable);
1875 return ::WaitForMultipleObjects (this->handler_rep_.max_handlep1 (),
1876 this->handler_rep_.handles (),
1877 FALSE,
1878 timeout);
1879 #else
1880 return ::WaitForMultipleObjectsEx (this->handler_rep_.max_handlep1 (),
1881 this->handler_rep_.handles (),
1882 FALSE,
1883 timeout,
1884 alertable);
1885 #endif /* ACE_HAS_PHARLAP */
1888 DWORD
1889 ACE_WFMO_Reactor::poll_remaining_handles (DWORD slot)
1891 return ::WaitForMultipleObjects (this->handler_rep_.max_handlep1 () - slot,
1892 this->handler_rep_.handles () + slot,
1893 FALSE,
1898 ACE_WFMO_Reactor::calculate_timeout (ACE_Time_Value *max_wait_time)
1900 ACE_Time_Value *time = 0;
1901 if (this->owner_ == ACE_Thread::self ())
1902 time = this->timer_queue_->calculate_timeout (max_wait_time);
1903 else
1904 time = max_wait_time;
1906 if (time == 0)
1907 return INFINITE;
1908 else
1909 return time->msec ();
1914 ACE_WFMO_Reactor::expire_timers (void)
1916 // If "owner" thread
1917 if (ACE_Thread::self () == this->owner_)
1918 // expire all pending timers.
1919 return this->timer_queue_->expire ();
1921 else
1922 // Nothing to expire
1923 return 0;
1927 ACE_WFMO_Reactor::dispatch (DWORD wait_status)
1929 // Expire timers
1930 int handlers_dispatched = this->expire_timers ();
1932 switch (wait_status)
1934 case WAIT_FAILED: // Failure.
1935 ACE_OS::set_errno_to_last_error ();
1936 return -1;
1938 case WAIT_TIMEOUT: // Timeout.
1939 errno = ETIME;
1940 return handlers_dispatched;
1942 #ifndef ACE_HAS_WINCE
1943 case WAIT_IO_COMPLETION: // APC.
1944 return handlers_dispatched;
1945 #endif // ACE_HAS_WINCE
1947 default: // Dispatch.
1948 // We'll let dispatch worry about abandoned mutes.
1949 handlers_dispatched += this->dispatch_handles (wait_status);
1950 return handlers_dispatched;
1954 // Dispatches any active handles from <handles_[slot]> to
1955 // <handles_[max_handlep1_]>, polling through our handle set looking
1956 // for active handles.
1958 ACE_WFMO_Reactor::dispatch_handles (DWORD wait_status)
1960 // dispatch_slot is the absolute slot. Only += is used to
1961 // increment it.
1962 DWORD dispatch_slot = 0;
1964 // Cache this value, this is the absolute value.
1965 DWORD const max_handlep1 = this->handler_rep_.max_handlep1 ();
1967 // nCount starts off at <max_handlep1>, this is a transient count of
1968 // handles last waited on.
1969 DWORD nCount = max_handlep1;
1971 for (int number_of_handlers_dispatched = 1;
1973 ++number_of_handlers_dispatched)
1975 const bool ok = (
1976 #if ! defined(__BORLANDC__) \
1977 && !defined (__MINGW32__) \
1978 && !defined (_MSC_VER)
1979 // wait_status is unsigned in Borland, Green Hills,
1980 // mingw32 and MSVC++
1981 // This >= is always true, with a warning.
1982 wait_status >= WAIT_OBJECT_0 &&
1983 #endif
1984 wait_status <= (WAIT_OBJECT_0 + nCount));
1986 if (ok)
1987 dispatch_slot += wait_status - WAIT_OBJECT_0;
1988 else
1989 // Otherwise, a handle was abandoned.
1990 dispatch_slot += wait_status - WAIT_ABANDONED_0;
1992 // Dispatch handler
1993 if (this->dispatch_handler (dispatch_slot, max_handlep1) == -1)
1994 return -1;
1996 // Increment slot
1997 ++dispatch_slot;
1999 // We're done.
2000 if (dispatch_slot >= max_handlep1)
2001 return number_of_handlers_dispatched;
2003 // Readjust nCount
2004 nCount = max_handlep1 - dispatch_slot;
2006 // Check the remaining handles
2007 wait_status = this->poll_remaining_handles (dispatch_slot);
2008 switch (wait_status)
2010 case WAIT_FAILED: // Failure.
2011 ACE_OS::set_errno_to_last_error ();
2012 /* FALLTHRU */
2013 case WAIT_TIMEOUT:
2014 // There are no more handles ready, we can return.
2015 return number_of_handlers_dispatched;
2021 ACE_WFMO_Reactor::dispatch_handler (DWORD slot,
2022 DWORD max_handlep1)
2024 // Check if there are window messages that need to be dispatched
2025 if (slot == max_handlep1)
2026 return this->dispatch_window_messages ();
2028 // Dispatch the handler if it has not been scheduled for deletion.
2029 // Note that this is a very week test if there are multiple threads
2030 // dispatching this slot as no locks are held here. Generally, you
2031 // do not want to do something like deleting the this pointer in
2032 // handle_close() if you have registered multiple times and there is
2033 // more than one thread in WFMO_Reactor->handle_events().
2034 else if (!this->handler_rep_.scheduled_for_deletion (slot))
2036 ACE_HANDLE event_handle = *(this->handler_rep_.handles () + slot);
2038 if (this->handler_rep_.current_info ()[slot].io_entry_)
2039 return this->complex_dispatch_handler (slot,
2040 event_handle);
2041 else
2042 return this->simple_dispatch_handler (slot,
2043 event_handle);
2045 else
2046 // The handle was scheduled for deletion, so we will skip it.
2047 return 0;
2051 ACE_WFMO_Reactor::simple_dispatch_handler (DWORD slot,
2052 ACE_HANDLE event_handle)
2054 // This dispatch is used for non-I/O entires
2056 // Assign the ``signaled'' HANDLE so that callers can get it.
2057 // siginfo_t is an ACE - specific fabrication. Constructor exists.
2058 siginfo_t sig (event_handle);
2060 ACE_Event_Handler *event_handler =
2061 this->handler_rep_.current_info ()[slot].event_handler_;
2063 int requires_reference_counting =
2064 event_handler->reference_counting_policy ().value () ==
2065 ACE_Event_Handler::Reference_Counting_Policy::ENABLED;
2067 if (requires_reference_counting)
2069 event_handler->add_reference ();
2072 // Upcall
2073 if (event_handler->handle_signal (0, &sig) == -1)
2074 this->handler_rep_.unbind (event_handle,
2075 ACE_Event_Handler::NULL_MASK);
2077 // Call remove_reference() if needed.
2078 if (requires_reference_counting)
2080 event_handler->remove_reference ();
2083 return 0;
2087 ACE_WFMO_Reactor::complex_dispatch_handler (DWORD slot,
2088 ACE_HANDLE event_handle)
2090 // This dispatch is used for I/O entires.
2092 ACE_WFMO_Reactor_Handler_Repository::Current_Info &current_info =
2093 this->handler_rep_.current_info ()[slot];
2095 WSANETWORKEVENTS events;
2096 ACE_Reactor_Mask problems = ACE_Event_Handler::NULL_MASK;
2097 if (::WSAEnumNetworkEvents ((SOCKET) current_info.io_handle_,
2098 event_handle,
2099 &events) == SOCKET_ERROR)
2100 problems = ACE_Event_Handler::ALL_EVENTS_MASK;
2101 else
2103 // Prepare for upcalls. Clear the bits from <events> representing
2104 // events the handler is not interested in. If there are any left,
2105 // do the upcall(s). upcall will replace events.lNetworkEvents
2106 // with bits representing any functions that requested a repeat
2107 // callback before checking handles again. In this case, continue
2108 // to call back unless the handler is unregistered as a result of
2109 // one of the upcalls. The way this is written, the upcalls will
2110 // keep being done even if one or more upcalls reported problems.
2111 // In practice this may turn out not so good, but let's see. If any
2112 // problems, please notify Steve Huston <shuston@riverace.com>
2113 // before or after you change this code.
2114 events.lNetworkEvents &= current_info.network_events_;
2115 while (events.lNetworkEvents != 0)
2117 ACE_Event_Handler *event_handler =
2118 current_info.event_handler_;
2120 int reference_counting_required =
2121 event_handler->reference_counting_policy ().value () ==
2122 ACE_Event_Handler::Reference_Counting_Policy::ENABLED;
2124 // Call add_reference() if needed.
2125 if (reference_counting_required)
2127 event_handler->add_reference ();
2130 // Upcall
2131 problems |= this->upcall (current_info.event_handler_,
2132 current_info.io_handle_,
2133 events);
2135 // Call remove_reference() if needed.
2136 if (reference_counting_required)
2138 event_handler->remove_reference ();
2141 if (this->handler_rep_.scheduled_for_deletion (slot))
2142 break;
2146 if (problems != ACE_Event_Handler::NULL_MASK
2147 && !this->handler_rep_.scheduled_for_deletion (slot) )
2148 this->handler_rep_.unbind (event_handle, problems);
2150 return 0;
2153 ACE_Reactor_Mask
2154 ACE_WFMO_Reactor::upcall (ACE_Event_Handler *event_handler,
2155 ACE_HANDLE io_handle,
2156 WSANETWORKEVENTS &events)
2158 // This method figures out what exactly has happened to the socket
2159 // and then calls appropriate methods.
2160 ACE_Reactor_Mask problems = ACE_Event_Handler::NULL_MASK;
2162 // Go through the events and do the indicated upcalls. If the handler
2163 // doesn't want to be called back, clear the bit for that event.
2164 // At the end, set the bits back to <events> to request a repeat call.
2166 long actual_events = events.lNetworkEvents;
2167 int action;
2169 if (ACE_BIT_ENABLED (actual_events, FD_WRITE))
2171 action = event_handler->handle_output (io_handle);
2172 if (action <= 0)
2174 ACE_CLR_BITS (actual_events, FD_WRITE);
2175 if (action == -1)
2176 ACE_SET_BITS (problems, ACE_Event_Handler::WRITE_MASK);
2180 if (ACE_BIT_ENABLED (actual_events, FD_CONNECT))
2182 if (events.iErrorCode[FD_CONNECT_BIT] == 0)
2184 // Successful connect
2185 action = event_handler->handle_output (io_handle);
2186 if (action <= 0)
2188 ACE_CLR_BITS (actual_events, FD_CONNECT);
2189 if (action == -1)
2190 ACE_SET_BITS (problems,
2191 ACE_Event_Handler::CONNECT_MASK);
2194 // Unsuccessful connect
2195 else
2197 action = event_handler->handle_input (io_handle);
2198 if (action <= 0)
2200 ACE_CLR_BITS (actual_events, FD_CONNECT);
2201 if (action == -1)
2202 ACE_SET_BITS (problems,
2203 ACE_Event_Handler::CONNECT_MASK);
2208 if (ACE_BIT_ENABLED (actual_events, FD_OOB))
2210 action = event_handler->handle_exception (io_handle);
2211 if (action <= 0)
2213 ACE_CLR_BITS (actual_events, FD_OOB);
2214 if (action == -1)
2215 ACE_SET_BITS (problems, ACE_Event_Handler::EXCEPT_MASK);
2219 if (ACE_BIT_ENABLED (actual_events, FD_READ))
2221 action = event_handler->handle_input (io_handle);
2222 if (action <= 0)
2224 ACE_CLR_BITS (actual_events, FD_READ);
2225 if (action == -1)
2226 ACE_SET_BITS (problems, ACE_Event_Handler::READ_MASK);
2230 if (ACE_BIT_ENABLED (actual_events, FD_CLOSE)
2231 && ACE_BIT_DISABLED (problems, ACE_Event_Handler::READ_MASK))
2233 action = event_handler->handle_input (io_handle);
2234 if (action <= 0)
2236 ACE_CLR_BITS (actual_events, FD_CLOSE);
2237 if (action == -1)
2238 ACE_SET_BITS (problems, ACE_Event_Handler::READ_MASK);
2242 if (ACE_BIT_ENABLED (actual_events, FD_ACCEPT))
2244 action = event_handler->handle_input (io_handle);
2245 if (action <= 0)
2247 ACE_CLR_BITS (actual_events, FD_ACCEPT);
2248 if (action == -1)
2249 ACE_SET_BITS (problems, ACE_Event_Handler::ACCEPT_MASK);
2253 if (ACE_BIT_ENABLED (actual_events, FD_QOS))
2255 action = event_handler->handle_qos (io_handle);
2256 if (action <= 0)
2258 ACE_CLR_BITS (actual_events, FD_QOS);
2259 if (action == -1)
2260 ACE_SET_BITS (problems, ACE_Event_Handler::QOS_MASK);
2264 if (ACE_BIT_ENABLED (actual_events, FD_GROUP_QOS))
2266 action = event_handler->handle_group_qos (io_handle);
2267 if (action <= 0)
2269 ACE_CLR_BITS (actual_events, FD_GROUP_QOS);
2270 if (action == -1)
2271 ACE_SET_BITS (problems, ACE_Event_Handler::GROUP_QOS_MASK);
2275 events.lNetworkEvents = actual_events;
2276 return problems;
2281 ACE_WFMO_Reactor::update_state (void)
2283 // This GUARD is necessary since we are updating shared state.
2284 ACE_GUARD_RETURN (ACE_Process_Mutex, monitor, this->lock_, -1);
2286 // Decrement active threads
2287 --this->active_threads_;
2289 // Check if the state of the handler repository has changed or new
2290 // owner has to be set
2291 if (this->handler_rep_.changes_required () || this->new_owner ())
2293 if (this->change_state_thread_ == 0)
2294 // Try to become the thread which will be responsible for the
2295 // changes
2297 this->change_state_thread_ = ACE_Thread::self ();
2298 // Make sure no new threads are allowed to enter
2299 this->ok_to_wait_.reset ();
2301 if (this->active_threads_ > 0)
2302 // Check for other active threads
2304 // Wake up all other threads
2305 this->wakeup_all_threads_.signal ();
2306 // Release <lock_>
2307 monitor.release ();
2308 // Go to sleep waiting for all other threads to get done
2309 this->waiting_to_change_state_.wait ();
2310 // Re-acquire <lock_> again
2311 monitor.acquire ();
2314 // Note that make_changes() calls into user code which can
2315 // request other changes. So keep looping until all
2316 // requested changes are completed.
2317 while (this->handler_rep_.changes_required ())
2318 // Make necessary changes to the handler repository
2319 this->handler_rep_.make_changes ();
2320 if (this->new_owner ())
2321 // Update the owner
2322 this->change_owner ();
2323 // Turn off <wakeup_all_threads_>
2324 this->wakeup_all_threads_.reset ();
2325 // Let everyone know that it is ok to go ahead
2326 this->ok_to_wait_.signal ();
2327 // Reset this flag
2328 this->change_state_thread_ = 0;
2330 else if (this->active_threads_ == 0)
2331 // This thread did not get a chance to become the change
2332 // thread. If it is the last one out, it will wakeup the
2333 // change thread
2334 this->waiting_to_change_state_.signal ();
2336 // This is if we were woken up explicitily by the user and there are
2337 // no state changes required.
2338 else if (this->active_threads_ == 0)
2339 // Turn off <wakeup_all_threads_>
2340 this->wakeup_all_threads_.reset ();
2342 return 0;
2345 void
2346 ACE_WFMO_Reactor::dump (void) const
2348 #if defined (ACE_HAS_DUMP)
2349 ACE_TRACE ("ACE_WFMO_Reactor::dump");
2351 ACELIB_DEBUG ((LM_DEBUG, ACE_BEGIN_DUMP, this));
2353 ACELIB_DEBUG ((LM_DEBUG,
2354 ACE_TEXT ("Count of currently active threads = %d\n"),
2355 this->active_threads_));
2357 ACELIB_DEBUG ((LM_DEBUG,
2358 ACE_TEXT ("ID of owner thread = %d\n"),
2359 this->owner_));
2361 this->handler_rep_.dump ();
2362 this->signal_handler_->dump ();
2363 this->timer_queue_->dump ();
2365 ACELIB_DEBUG ((LM_DEBUG, ACE_END_DUMP));
2366 #endif /* ACE_HAS_DUMP */
2370 ACE_WFMO_Reactor_Notify::dispatch_notifications (int & /*number_of_active_handles*/,
2371 ACE_Handle_Set & /*rd_mask*/)
2373 return -1;
2377 ACE_WFMO_Reactor_Notify::is_dispatchable (ACE_Notification_Buffer & /*buffer*/)
2379 return 0;
2382 ACE_HANDLE
2383 ACE_WFMO_Reactor_Notify::notify_handle (void)
2385 return ACE_INVALID_HANDLE;
2389 ACE_WFMO_Reactor_Notify::read_notify_pipe (ACE_HANDLE ,
2390 ACE_Notification_Buffer &)
2392 return 0;
2396 ACE_WFMO_Reactor_Notify::dispatch_notify (ACE_Notification_Buffer &)
2398 return 0;
2402 ACE_WFMO_Reactor_Notify::close (void)
2404 return -1;
2407 ACE_WFMO_Reactor_Notify::ACE_WFMO_Reactor_Notify (size_t max_notifies)
2408 : timer_queue_ (0),
2409 message_queue_ (max_notifies * sizeof (ACE_Notification_Buffer),
2410 max_notifies * sizeof (ACE_Notification_Buffer)),
2411 max_notify_iterations_ (-1)
2416 ACE_WFMO_Reactor_Notify::open (ACE_Reactor_Impl *wfmo_reactor,
2417 ACE_Timer_Queue *timer_queue,
2418 int ignore_notify)
2420 ACE_UNUSED_ARG (ignore_notify);
2421 timer_queue_ = timer_queue;
2422 return wfmo_reactor->register_handler (this);
2425 ACE_HANDLE
2426 ACE_WFMO_Reactor_Notify::get_handle (void) const
2428 return this->wakeup_one_thread_.handle ();
2431 // Handle all pending notifications.
2434 ACE_WFMO_Reactor_Notify::handle_signal (int signum,
2435 siginfo_t *siginfo,
2436 ucontext_t *)
2438 ACE_UNUSED_ARG (signum);
2440 // Just check for sanity...
2441 if (siginfo->si_handle_ != this->wakeup_one_thread_.handle ())
2442 return -1;
2444 // This will get called when <WFMO_Reactor->wakeup_one_thread_> event
2445 // is signaled.
2446 // ACELIB_DEBUG ((LM_DEBUG,
2447 // ACE_TEXT ("(%t) waking up to handle internal notifications\n")));
2449 for (int i = 1; ; ++i)
2451 ACE_Message_Block *mb = 0;
2452 // Copy ACE_Time_Value::zero since dequeue_head will modify it.
2453 ACE_Time_Value zero_timeout (ACE_Time_Value::zero);
2454 if (this->message_queue_.dequeue_head (mb, &zero_timeout) == -1)
2456 if (errno == EWOULDBLOCK)
2457 // We've reached the end of the processing, return
2458 // normally.
2459 return 0;
2460 else
2461 return -1; // Something weird happened...
2463 else
2465 ACE_Notification_Buffer *buffer =
2466 reinterpret_cast <ACE_Notification_Buffer *> (mb->base ());
2468 // If eh == 0 then we've got major problems! Otherwise, we
2469 // need to dispatch the appropriate handle_* method on the
2470 // ACE_Event_Handler pointer we've been passed.
2472 if (buffer->eh_ != 0)
2474 ACE_Event_Handler *event_handler =
2475 buffer->eh_;
2477 bool const requires_reference_counting =
2478 event_handler->reference_counting_policy ().value () ==
2479 ACE_Event_Handler::Reference_Counting_Policy::ENABLED;
2481 int result = 0;
2483 switch (buffer->mask_)
2485 case ACE_Event_Handler::READ_MASK:
2486 case ACE_Event_Handler::ACCEPT_MASK:
2487 result = event_handler->handle_input (ACE_INVALID_HANDLE);
2488 break;
2489 case ACE_Event_Handler::WRITE_MASK:
2490 result = event_handler->handle_output (ACE_INVALID_HANDLE);
2491 break;
2492 case ACE_Event_Handler::EXCEPT_MASK:
2493 result = event_handler->handle_exception (ACE_INVALID_HANDLE);
2494 break;
2495 case ACE_Event_Handler::QOS_MASK:
2496 result = event_handler->handle_qos (ACE_INVALID_HANDLE);
2497 break;
2498 case ACE_Event_Handler::GROUP_QOS_MASK:
2499 result = event_handler->handle_group_qos (ACE_INVALID_HANDLE);
2500 break;
2501 default:
2502 ACELIB_ERROR ((LM_ERROR,
2503 ACE_TEXT ("invalid mask = %d\n"),
2504 buffer->mask_));
2505 break;
2508 if (result == -1)
2509 event_handler->handle_close (ACE_INVALID_HANDLE,
2510 ACE_Event_Handler::EXCEPT_MASK);
2512 if (requires_reference_counting)
2514 event_handler->remove_reference ();
2518 // Make sure to delete the memory regardless of success or
2519 // failure!
2520 mb->release ();
2522 // Bail out if we've reached the <max_notify_iterations_>.
2523 // Note that by default <max_notify_iterations_> is -1, so
2524 // we'll loop until we're done.
2525 if (i == this->max_notify_iterations_)
2527 // If there are still notification in the queue, we need
2528 // to wake up again
2529 if (!this->message_queue_.is_empty ())
2530 this->wakeup_one_thread_.signal ();
2532 // Break the loop as we have reached max_notify_iterations_
2533 return 0;
2539 // Notify the WFMO_Reactor, potentially enqueueing the
2540 // <ACE_Event_Handler> for subsequent processing in the WFMO_Reactor
2541 // thread of control.
2544 ACE_WFMO_Reactor_Notify::notify (ACE_Event_Handler *event_handler,
2545 ACE_Reactor_Mask mask,
2546 ACE_Time_Value *timeout)
2548 if (event_handler != 0)
2550 ACE_Message_Block *mb = 0;
2551 ACE_NEW_RETURN (mb,
2552 ACE_Message_Block (sizeof (ACE_Notification_Buffer)),
2553 -1);
2555 ACE_Notification_Buffer *buffer =
2556 (ACE_Notification_Buffer *) mb->base ();
2557 buffer->eh_ = event_handler;
2558 buffer->mask_ = mask;
2560 // Convert from relative time to absolute time by adding the
2561 // current time of day. This is what <ACE_Message_Queue>
2562 // expects.
2563 if (timeout != 0)
2564 *timeout += timer_queue_->gettimeofday ();
2566 if (this->message_queue_.enqueue_tail
2567 (mb, timeout) == -1)
2569 mb->release ();
2570 return -1;
2573 event_handler->add_reference ();
2576 return this->wakeup_one_thread_.signal ();
2579 void
2580 ACE_WFMO_Reactor_Notify::max_notify_iterations (int iterations)
2582 ACE_TRACE ("ACE_WFMO_Reactor_Notify::max_notify_iterations");
2583 // Must always be > 0 or < 0 to optimize the loop exit condition.
2584 if (iterations == 0)
2585 iterations = 1;
2587 this->max_notify_iterations_ = iterations;
2591 ACE_WFMO_Reactor_Notify::max_notify_iterations (void)
2593 ACE_TRACE ("ACE_WFMO_Reactor_Notify::max_notify_iterations");
2594 return this->max_notify_iterations_;
2598 ACE_WFMO_Reactor_Notify::purge_pending_notifications (ACE_Event_Handler *eh,
2599 ACE_Reactor_Mask mask)
2601 ACE_TRACE ("ACE_WFMO_Reactor_Notify::purge_pending_notifications");
2603 // Go over message queue and take out all the matching event
2604 // handlers. If eh == 0, purge all. Note that reactor notifies (no
2605 // handler specified) are never purged, as this may lose a needed
2606 // notify the reactor queued for itself.
2608 if (this->message_queue_.is_empty ())
2609 return 0;
2611 // Guard against new and/or delivered notifications while purging.
2612 // WARNING!!! The use of the notification queue's lock object for
2613 // this guard makes use of the knowledge that on Win32, the mutex
2614 // protecting the queue is really a CriticalSection, which is
2615 // recursive. This is how we can get away with locking it down here
2616 // and still calling member functions on the queue object.
2617 ACE_GUARD_RETURN (ACE_SYNCH_MUTEX, monitor, this->message_queue_.lock(), -1);
2619 // first, copy all to our own local queue. Since we've locked everyone out
2620 // of here, there's no need to use any synchronization on this queue.
2621 ACE_Message_Queue<ACE_NULL_SYNCH> local_queue;
2623 size_t queue_size = this->message_queue_.message_count ();
2624 int number_purged = 0;
2626 size_t index;
2628 for (index = 0; index < queue_size; ++index)
2630 ACE_Message_Block *mb = 0;
2631 if (-1 == this->message_queue_.dequeue_head (mb))
2632 return -1; // This shouldn't happen...
2634 ACE_Notification_Buffer *buffer =
2635 reinterpret_cast<ACE_Notification_Buffer *> (mb->base ());
2637 // If this is not a Reactor notify (it is for a particular handler),
2638 // and it matches the specified handler (or purging all),
2639 // and applying the mask would totally eliminate the notification, then
2640 // release it and count the number purged.
2641 if ((0 != buffer->eh_) &&
2642 (0 == eh || eh == buffer->eh_) &&
2643 ACE_BIT_DISABLED (buffer->mask_, ~mask)) // the existing notification mask
2644 // is left with nothing when
2645 // applying the mask
2647 ACE_Event_Handler *event_handler = buffer->eh_;
2649 event_handler->remove_reference ();
2651 mb->release ();
2652 ++number_purged;
2654 else
2656 // To preserve it, move it to the local_queue. But first, if
2657 // this is not a Reactor notify (it is for a
2658 // particularhandler), and it matches the specified handler
2659 // (or purging all), then apply the mask
2660 if ((0 != buffer->eh_) &&
2661 (0 == eh || eh == buffer->eh_))
2662 ACE_CLR_BITS(buffer->mask_, mask);
2663 if (-1 == local_queue.enqueue_head (mb))
2664 return -1;
2668 if (this->message_queue_.message_count ())
2669 { // Should be empty!
2670 ACE_ASSERT (0);
2671 return -1;
2674 // Now copy back from the local queue to the class queue, taking
2675 // care to preserve the original order...
2676 queue_size = local_queue.message_count ();
2677 for (index = 0; index < queue_size; ++index)
2679 ACE_Message_Block *mb = 0;
2680 if (-1 == local_queue.dequeue_head (mb))
2682 ACE_ASSERT (0);
2683 return -1;
2686 if (-1 == this->message_queue_.enqueue_head (mb))
2688 ACE_ASSERT (0);
2689 return -1;
2693 return number_purged;
2696 void
2697 ACE_WFMO_Reactor_Notify::dump (void) const
2699 #if defined (ACE_HAS_DUMP)
2700 ACE_TRACE ("ACE_WFMO_Reactor_Notify::dump");
2701 ACELIB_DEBUG ((LM_DEBUG, ACE_BEGIN_DUMP, this));
2702 this->timer_queue_->dump ();
2703 ACELIB_DEBUG ((LM_DEBUG,
2704 ACE_TEXT ("Max. iteration: %d\n"),
2705 this->max_notify_iterations_));
2706 ACELIB_DEBUG ((LM_DEBUG, ACE_END_DUMP));
2707 #endif /* ACE_HAS_DUMP */
2710 void
2711 ACE_WFMO_Reactor::max_notify_iterations (int iterations)
2713 ACE_TRACE ("ACE_WFMO_Reactor::max_notify_iterations");
2714 ACE_GUARD (ACE_Process_Mutex, monitor, this->lock_);
2716 // Must always be > 0 or < 0 to optimize the loop exit condition.
2717 this->notify_handler_->max_notify_iterations (iterations);
2721 ACE_WFMO_Reactor::max_notify_iterations (void)
2723 ACE_TRACE ("ACE_WFMO_Reactor::max_notify_iterations");
2724 ACE_GUARD_RETURN (ACE_Process_Mutex, monitor, this->lock_, -1);
2726 return this->notify_handler_->max_notify_iterations ();
2730 ACE_WFMO_Reactor::purge_pending_notifications (ACE_Event_Handler *eh,
2731 ACE_Reactor_Mask mask)
2733 ACE_TRACE ("ACE_WFMO_Reactor::purge_pending_notifications");
2734 if (this->notify_handler_ == 0)
2735 return 0;
2736 else
2737 return this->notify_handler_->purge_pending_notifications (eh, mask);
2741 ACE_WFMO_Reactor::resumable_handler (void)
2743 ACE_TRACE ("ACE_WFMO_Reactor::resumable_handler");
2744 return 0;
2748 // No-op WinSOCK2 methods to help WFMO_Reactor compile
2749 #if !defined (ACE_HAS_WINSOCK2) || (ACE_HAS_WINSOCK2 == 0)
2751 WSAEventSelect (SOCKET /* s */,
2752 WSAEVENT /* hEventObject */,
2753 long /* lNetworkEvents */)
2755 return -1;
2759 WSAEnumNetworkEvents (SOCKET /* s */,
2760 WSAEVENT /* hEventObject */,
2761 LPWSANETWORKEVENTS /* lpNetworkEvents */)
2763 return -1;
2765 #endif /* !defined ACE_HAS_WINSOCK2 */
2767 ACE_END_VERSIONED_NAMESPACE_DECL
2769 #endif /* ACE_WIN32 */