1 #include "ace/WFMO_Reactor.h"
3 #if defined (ACE_WIN32)
5 #include "ace/Handle_Set.h"
6 #include "ace/Timer_Heap.h"
7 #include "ace/Thread.h"
8 #include "ace/OS_NS_errno.h"
9 #include "ace/Null_Condition.h"
11 #if !defined (__ACE_INLINE__)
12 #include "ace/WFMO_Reactor.inl"
13 #endif /* __ACE_INLINE__ */
17 ACE_BEGIN_VERSIONED_NAMESPACE_DECL
// Constructor.  Stores a reference to the ACE_WFMO_Reactor passed in
// as <wfmo_reactor> in the <wfmo_reactor_> member.
19 ACE_WFMO_Reactor_Handler_Repository::ACE_WFMO_Reactor_Handler_Repository (ACE_WFMO_Reactor
&wfmo_reactor
)
20 : wfmo_reactor_ (wfmo_reactor
)
// Prepares the repository for up to <size> entries.  A <size> larger
// than MAXIMUM_WAIT_OBJECTS is reported at LM_ERROR through
// ACELIB_ERROR_RETURN.  The handle and info tables are allocated with
// ACE_NEW_RETURN, every bookkeeping counter is zeroed, and each slot
// of <current_handles_> is set to ACE_INVALID_HANDLE.
25 ACE_WFMO_Reactor_Handler_Repository::open (size_t size
)
27 if (size
> MAXIMUM_WAIT_OBJECTS
)
28 ACELIB_ERROR_RETURN ((LM_ERROR
,
29 ACE_TEXT ("%d exceeds MAXIMUM_WAIT_OBJECTS (%d)\n"),
31 MAXIMUM_WAIT_OBJECTS
),
// One allocation per table.  Only the element type of
// <to_be_added_info_> (To_Be_Added_Info) is visible in this excerpt.
35 ACE_NEW_RETURN (this->current_handles_
,
38 ACE_NEW_RETURN (this->current_info_
,
41 ACE_NEW_RETURN (this->current_suspended_info_
,
44 ACE_NEW_RETURN (this->to_be_added_info_
,
45 To_Be_Added_Info
[size
],
// Record the capacity and start with empty tables and no pending
// changes.
49 this->max_size_
= size
;
50 this->max_handlep1_
= 0;
51 this->suspended_handles_
= 0;
52 this->handles_to_be_added_
= 0;
53 this->handles_to_be_deleted_
= 0;
54 this->handles_to_be_suspended_
= 0;
55 this->handles_to_be_resumed_
= 0;
// No slot holds a valid handle yet.
57 for (size_t i
= 0; i
< size
; ++i
)
58 this->current_handles_
[i
] = ACE_INVALID_HANDLE
;
63 ACE_WFMO_Reactor_Handler_Repository::~ACE_WFMO_Reactor_Handler_Repository ()
65 // Free up dynamically allocated space
66 delete [] this->current_handles_
;
67 delete [] this->current_info_
;
68 delete [] this->current_suspended_info_
;
69 delete [] this->to_be_added_info_
;
// Maps between ACE_Reactor_Mask bits and the FD_* network-event bits
// stored in <existing_masks>.
//
// The function first derives <old_masks> from the FD_* bits currently
// set in <existing_masks>.  The visible case labels then handle:
//   - ACE_Reactor::CLR_MASK: clear the FD_* bits selected by
//     <change_masks>;
//   - ACE_Reactor::SET_MASK / ACE_Reactor::ADD_MASK: set the FD_* bits
//     selected by <change_masks>;
//   - ACE_Reactor::GET_MASK: no additional work.
// READ_MASK corresponds to both FD_READ and FD_CLOSE in either
// direction.
//
// NOTE(review): the remaining parameter(s), the switch header, the
// statements between the SET_MASK and ADD_MASK labels, and the use of
// <old_masks> after the switch are not visible in this excerpt;
// presumably <old_masks> is the return value -- confirm.
73 ACE_WFMO_Reactor_Handler_Repository::bit_ops (long &existing_masks
,
74 ACE_Reactor_Mask change_masks
,
77 // Find the old reactor masks.  This automatically does the work of
78 // the GET_MASK operation.
80 ACE_Reactor_Mask old_masks
= ACE_Event_Handler::NULL_MASK
;
// Either FD_READ or FD_CLOSE being set is enough to report READ_MASK.
82 if (ACE_BIT_ENABLED (existing_masks
, FD_READ
)
83 || ACE_BIT_ENABLED (existing_masks
, FD_CLOSE
))
84 ACE_SET_BITS (old_masks
, ACE_Event_Handler::READ_MASK
);
86 if (ACE_BIT_ENABLED (existing_masks
, FD_WRITE
))
87 ACE_SET_BITS (old_masks
, ACE_Event_Handler::WRITE_MASK
);
89 if (ACE_BIT_ENABLED (existing_masks
, FD_OOB
))
90 ACE_SET_BITS (old_masks
, ACE_Event_Handler::EXCEPT_MASK
);
92 if (ACE_BIT_ENABLED (existing_masks
, FD_ACCEPT
))
93 ACE_SET_BITS (old_masks
, ACE_Event_Handler::ACCEPT_MASK
);
95 if (ACE_BIT_ENABLED (existing_masks
, FD_CONNECT
))
96 ACE_SET_BITS (old_masks
, ACE_Event_Handler::CONNECT_MASK
);
98 if (ACE_BIT_ENABLED (existing_masks
, FD_QOS
))
99 ACE_SET_BITS (old_masks
, ACE_Event_Handler::QOS_MASK
);
101 if (ACE_BIT_ENABLED (existing_masks
, FD_GROUP_QOS
))
102 ACE_SET_BITS (old_masks
, ACE_Event_Handler::GROUP_QOS_MASK
);
106 case ACE_Reactor::CLR_MASK
:
107 // For the CLR_MASK operation, clear only the specific masks.
// READ_MASK clears both FD_READ and FD_CLOSE.
109 if (ACE_BIT_ENABLED (change_masks
, ACE_Event_Handler::READ_MASK
))
111 ACE_CLR_BITS (existing_masks
, FD_READ
);
112 ACE_CLR_BITS (existing_masks
, FD_CLOSE
);
115 if (ACE_BIT_ENABLED (change_masks
, ACE_Event_Handler::WRITE_MASK
))
116 ACE_CLR_BITS (existing_masks
, FD_WRITE
);
118 if (ACE_BIT_ENABLED (change_masks
, ACE_Event_Handler::EXCEPT_MASK
))
119 ACE_CLR_BITS (existing_masks
, FD_OOB
);
121 if (ACE_BIT_ENABLED (change_masks
, ACE_Event_Handler::ACCEPT_MASK
))
122 ACE_CLR_BITS (existing_masks
, FD_ACCEPT
);
124 if (ACE_BIT_ENABLED (change_masks
, ACE_Event_Handler::CONNECT_MASK
))
125 ACE_CLR_BITS (existing_masks
, FD_CONNECT
);
127 if (ACE_BIT_ENABLED (change_masks
, ACE_Event_Handler::QOS_MASK
))
128 ACE_CLR_BITS (existing_masks
, FD_QOS
);
130 if (ACE_BIT_ENABLED (change_masks
, ACE_Event_Handler::GROUP_QOS_MASK
))
131 ACE_CLR_BITS (existing_masks
, FD_GROUP_QOS
);
135 case ACE_Reactor::SET_MASK
:
136 // If the operation is a set, first reset any existing masks
141 case ACE_Reactor::ADD_MASK
:
142 // For the ADD_MASK and the SET_MASK operation, add only the specific masks.
// READ_MASK sets both FD_READ and FD_CLOSE.
145 if (ACE_BIT_ENABLED (change_masks
, ACE_Event_Handler::READ_MASK
))
147 ACE_SET_BITS (existing_masks
, FD_READ
);
148 ACE_SET_BITS (existing_masks
, FD_CLOSE
);
151 if (ACE_BIT_ENABLED (change_masks
, ACE_Event_Handler::WRITE_MASK
))
152 ACE_SET_BITS (existing_masks
, FD_WRITE
);
154 if (ACE_BIT_ENABLED (change_masks
, ACE_Event_Handler::EXCEPT_MASK
))
155 ACE_SET_BITS (existing_masks
, FD_OOB
);
157 if (ACE_BIT_ENABLED (change_masks
, ACE_Event_Handler::ACCEPT_MASK
))
158 ACE_SET_BITS (existing_masks
, FD_ACCEPT
);
160 if (ACE_BIT_ENABLED (change_masks
, ACE_Event_Handler::CONNECT_MASK
))
161 ACE_SET_BITS (existing_masks
, FD_CONNECT
);
163 if (ACE_BIT_ENABLED (change_masks
, ACE_Event_Handler::QOS_MASK
))
164 ACE_SET_BITS (existing_masks
, FD_QOS
);
166 if (ACE_BIT_ENABLED (change_masks
, ACE_Event_Handler::GROUP_QOS_MASK
))
167 ACE_SET_BITS (existing_masks
, FD_GROUP_QOS
);
171 case ACE_Reactor::GET_MASK
:
173 // The work for this operation is done in all cases at the
174 // beginning of the function.
176 ACE_UNUSED_ARG (change_masks
);
// Removes <mask> from every entry whose event handle or I/O handle
// equals <handle>.  The current, suspended and to-be-added tables are
// searched in that order; entries already marked for deletion are
// skipped, and each loop stops once <error> is set.  The per-table
// work is delegated to remove_handler_i, remove_suspended_handler_i
// and remove_to_be_added_handler_i.
//
// <changes_required> is set to true when <handles_to_be_deleted_>
// grew during the call.  Returns -1 if <error> is set, otherwise 0.
185 ACE_WFMO_Reactor_Handler_Repository::unbind_i (ACE_HANDLE handle
,
186 ACE_Reactor_Mask mask
,
187 bool &changes_required
)
191 // Remember this value; only if it changes do we need to wake up the other threads
193 size_t const original_handle_count
= this->handles_to_be_deleted_
;
196 // Go through all the handles looking for <handle>.  Even if we find
197 // it, we continue through the rest of the list since <handle> could
198 // appear multiple times. All handles are checked.
200 // First check the current entries
201 for (i
= 0; i
< this->max_handlep1_
&& !error
; ++i
)
202 // Since the handle can either be the event or the I/O handle,
203 // we have to check both
204 if ((this->current_handles_
[i
] == handle
205 || this->current_info_
[i
].io_handle_
== handle
)
206 && // Make sure that it is not already marked for deletion
207 !this->current_info_
[i
].delete_entry_
)
209 if (this->remove_handler_i (i
, mask
) == -1)
213 // Then check the suspended entries
214 for (i
= 0; i
< this->suspended_handles_
&& !error
; ++i
)
215 // Since the handle can either be the event or the I/O handle, we
216 // have to check both
217 if ((this->current_suspended_info_
[i
].io_handle_
== handle
218 || this->current_suspended_info_
[i
].event_handle_
== handle
)
220 // Make sure that it is not already marked for deletion
221 !this->current_suspended_info_
[i
].delete_entry_
)
223 if (this->remove_suspended_handler_i (i
, mask
) == -1)
227 // Then check the to_be_added entries
228 for (i
= 0; i
< this->handles_to_be_added_
&& !error
; ++i
)
229 // Since the handle can either be the event or the I/O handle,
230 // we have to check both
231 if ((this->to_be_added_info_
[i
].io_handle_
== handle
232 || this->to_be_added_info_
[i
].event_handle_
== handle
)
234 // Make sure that it is not already marked for deletion
235 !this->to_be_added_info_
[i
].delete_entry_
)
237 if (this->remove_to_be_added_handler_i (i
, mask
) == -1)
241 // Only if the number of handlers to be deleted changes do we need
242 // to wakeup the other threads
243 if (original_handle_count
< this->handles_to_be_deleted_
)
244 changes_required
= true;
246 return error
? -1 : 0;
// Removes <to_be_removed_masks> from the entry at <slot> of the
// current table.
//
// For an I/O entry the masks are cleared from <network_events_> via
// bit_ops (CLR_MASK) and the event is re-associated with the I/O
// handle through WSAEventSelect using the remaining events.  For a
// non-I/O entry the mask is reduced to DONT_CALL when that bit was
// passed in; the NULL_MASK assignment is presumably the else branch
// (the `else` line is not visible here).
//
// A pending suspension on the entry is cancelled.  When no network
// events remain the entry is marked for deletion, <close_masks_> is
// recorded and <handles_to_be_deleted_> is incremented; otherwise,
// unless DONT_CALL is set, handle_close is called with the I/O handle
// and the removed masks.
250 ACE_WFMO_Reactor_Handler_Repository::remove_handler_i (size_t slot
,
251 ACE_Reactor_Mask to_be_removed_masks
)
254 if (this->current_info_
[slot
].io_entry_
)
256 // See if there are other events that the <Event_Handler> is interested in
258 this->bit_ops (this->current_info_
[slot
].network_events_
,
260 ACE_Reactor::CLR_MASK
);
262 // Disassociate/Reassociate the event from/with the I/O handle.
263 // This will depend on the value of remaining set of network
264 // events that the <event_handler> is interested in.  I don't
265 // think we can do anything about errors here, so the result of the call is not checked
267 ::WSAEventSelect ((SOCKET
) this->current_info_
[slot
].io_handle_
,
268 this->current_handles_
[slot
],
269 this->current_info_
[slot
].network_events_
);
271 // Normal event entries.
272 else if (ACE_BIT_ENABLED (to_be_removed_masks
, ACE_Event_Handler::DONT_CALL
))
273 // Preserve DONT_CALL
274 to_be_removed_masks
= ACE_Event_Handler::DONT_CALL
;
276 // Make sure that the <to_be_removed_masks> is the NULL_MASK
277 to_be_removed_masks
= ACE_Event_Handler::NULL_MASK
;
279 // If this event was marked for suspension, undo the suspension flag
280 // and reduce the to be suspended count.
281 if (this->current_info_
[slot
].suspend_entry_
)
284 this->current_info_
[slot
].suspend_entry_
= false;
285 // Decrement the handle count
286 --this->handles_to_be_suspended_
;
289 // If there are no more events that the <Event_Handler> is
290 // interested in, or this is a non-I/O entry, schedule the
291 // <Event_Handler> for removal
292 if (this->current_info_
[slot
].network_events_
== 0)
294 // Mark to be deleted
295 this->current_info_
[slot
].delete_entry_
= true;
// Remember the masks so they are available when the entry is closed
297 this->current_info_
[slot
].close_masks_
= to_be_removed_masks
;
298 // Increment the handle count
299 ++this->handles_to_be_deleted_
;
302 // Since it is not a complete removal, we'll call handle_close
303 // for all the masks that were removed.  This does not change
304 // the internal state of the reactor.
306 // Note: this condition only applies to I/O entries
307 else if (ACE_BIT_ENABLED (to_be_removed_masks
, ACE_Event_Handler::DONT_CALL
) == 0)
309 ACE_HANDLE handle
= this->current_info_
[slot
].io_handle_
;
310 this->current_info_
[slot
].event_handler_
->handle_close (handle
,
311 to_be_removed_masks
);
// Removes <to_be_removed_masks> from the entry at <slot> of the
// suspended table.
//
// For an I/O entry the masks are cleared from <network_events_> via
// bit_ops (CLR_MASK) and the entry's <event_handle_> is re-associated
// with the I/O handle through WSAEventSelect.  For a non-I/O entry the
// mask is reduced to DONT_CALL when that bit was passed in; the
// NULL_MASK assignment is presumably the else branch (the `else` line
// is not visible here).
//
// A pending resumption on the entry is cancelled.  When no network
// events remain the entry is marked for deletion, <close_masks_> is
// recorded and <handles_to_be_deleted_> is incremented; otherwise,
// unless DONT_CALL is set, handle_close is called with the I/O handle
// and the removed masks.
318 ACE_WFMO_Reactor_Handler_Repository::remove_suspended_handler_i (size_t slot
,
319 ACE_Reactor_Mask to_be_removed_masks
)
322 if (this->current_suspended_info_
[slot
].io_entry_
)
324 // See if there are other events that the <Event_Handler> is interested in
326 this->bit_ops (this->current_suspended_info_
[slot
].network_events_
,
328 ACE_Reactor::CLR_MASK
);
330 // Disassociate/Reassociate the event from/with the I/O handle.
331 // This will depend on the value of remaining set of network
332 // events that the <event_handler> is interested in.  I don't
333 // think we can do anything about errors here, so the result of the call is not checked
335 ::WSAEventSelect ((SOCKET
) this->current_suspended_info_
[slot
].io_handle_
,
336 this->current_suspended_info_
[slot
].event_handle_
,
337 this->current_suspended_info_
[slot
].network_events_
);
339 // Normal event entries.
340 else if (ACE_BIT_ENABLED (to_be_removed_masks
, ACE_Event_Handler::DONT_CALL
))
341 // Preserve DONT_CALL
342 to_be_removed_masks
= ACE_Event_Handler::DONT_CALL
;
344 // Make sure that the <to_be_removed_masks> is the NULL_MASK
345 to_be_removed_masks
= ACE_Event_Handler::NULL_MASK
;
347 // If this event was marked for resumption, undo the resumption flag
348 // and reduce the to be resumed count.
349 if (this->current_suspended_info_
[slot
].resume_entry_
)
352 this->current_suspended_info_
[slot
].resume_entry_
= false;
353 // Decrement the handle count
354 --this->handles_to_be_resumed_
;
357 // If there are no more events that the <Event_Handler> is
358 // interested in, or this is a non-I/O entry, schedule the
359 // <Event_Handler> for removal
360 if (this->current_suspended_info_
[slot
].network_events_
== 0)
362 // Mark to be deleted
363 this->current_suspended_info_
[slot
].delete_entry_
= true;
// Remember the masks so they are available when the entry is closed
365 this->current_suspended_info_
[slot
].close_masks_
= to_be_removed_masks
;
366 // Increment the handle count
367 ++this->handles_to_be_deleted_
;
369 // Since it is not a complete removal, we'll call handle_close for
370 // all the masks that were removed.  This does not change the
371 // internal state of the reactor.
373 // Note: this condition only applies to I/O entries
374 else if (ACE_BIT_ENABLED (to_be_removed_masks
, ACE_Event_Handler::DONT_CALL
) == 0)
376 ACE_HANDLE handle
= this->current_suspended_info_
[slot
].io_handle_
;
377 this->current_suspended_info_
[slot
].event_handler_
->handle_close (handle
,
378 to_be_removed_masks
);
// Removes <to_be_removed_masks> from the entry at <slot> of the
// to-be-added table.
//
// For an I/O entry the masks are cleared from <network_events_> via
// bit_ops (CLR_MASK) and the entry's <event_handle_> is re-associated
// with the I/O handle through WSAEventSelect.  For a non-I/O entry the
// mask is reduced to DONT_CALL when that bit was passed in; the
// NULL_MASK assignment is presumably the else branch (the `else` line
// is not visible here).
//
// A pending suspension on the entry is cancelled.  When no network
// events remain the entry is marked for deletion, <close_masks_> is
// recorded and <handles_to_be_deleted_> is incremented; otherwise,
// unless DONT_CALL is set, handle_close is called with the I/O handle
// and the removed masks.
385 ACE_WFMO_Reactor_Handler_Repository::remove_to_be_added_handler_i (size_t slot
,
386 ACE_Reactor_Mask to_be_removed_masks
)
389 if (this->to_be_added_info_
[slot
].io_entry_
)
391 // See if there are other events that the <Event_Handler> is interested in
393 this->bit_ops (this->to_be_added_info_
[slot
].network_events_
,
395 ACE_Reactor::CLR_MASK
);
397 // Disassociate/Reassociate the event from/with the I/O handle.
398 // This will depend on the value of remaining set of network
399 // events that the <event_handler> is interested in.  I don't
400 // think we can do anything about errors here, so the result of the call is not checked
402 ::WSAEventSelect ((SOCKET
) this->to_be_added_info_
[slot
].io_handle_
,
403 this->to_be_added_info_
[slot
].event_handle_
,
404 this->to_be_added_info_
[slot
].network_events_
);
406 // Normal event entries.
407 else if (ACE_BIT_ENABLED (to_be_removed_masks
, ACE_Event_Handler::DONT_CALL
))
408 // Preserve DONT_CALL
409 to_be_removed_masks
= ACE_Event_Handler::DONT_CALL
;
411 // Make sure that the <to_be_removed_masks> is the NULL_MASK
412 to_be_removed_masks
= ACE_Event_Handler::NULL_MASK
;
414 // If this event was marked for suspension, undo the suspension flag
415 // and reduce the to be suspended count.
416 if (this->to_be_added_info_
[slot
].suspend_entry_
)
419 this->to_be_added_info_
[slot
].suspend_entry_
= false;
420 // Decrement the handle count
421 --this->handles_to_be_suspended_
;
424 // If there are no more events that the <Event_Handler> is
425 // interested in, or this is a non-I/O entry, schedule the
426 // <Event_Handler> for removal
427 if (this->to_be_added_info_
[slot
].network_events_
== 0)
429 // Mark to be deleted
430 this->to_be_added_info_
[slot
].delete_entry_
= true;
// Remember the masks so they are available when the entry is closed
432 this->to_be_added_info_
[slot
].close_masks_
= to_be_removed_masks
;
433 // Increment the handle count
434 ++this->handles_to_be_deleted_
;
436 // Since it is not a complete removal, we'll call handle_close
437 // for all the masks that were removed.  This does not change
438 // the internal state of the reactor.
440 // Note: this condition only applies to I/O entries
441 else if (ACE_BIT_ENABLED (to_be_removed_masks
, ACE_Event_Handler::DONT_CALL
) == 0)
443 ACE_HANDLE handle
= this->to_be_added_info_
[slot
].io_handle_
;
444 this->to_be_added_info_
[slot
].event_handler_
->handle_close (handle
,
445 to_be_removed_masks
);
// Schedules suspension of every entry whose event handle or I/O
// handle equals <handle>.
//   - current entries not yet marked: set <suspend_entry_> and
//     increment <handles_to_be_suspended_>;
//   - suspended entries marked for resumption: clear <resume_entry_>
//     and decrement <handles_to_be_resumed_>;
//   - to-be-added entries not yet marked: set <suspend_entry_> and
//     increment <handles_to_be_suspended_>.
// <changes_required> is set to true whenever any entry is changed.
452 ACE_WFMO_Reactor_Handler_Repository::suspend_handler_i (ACE_HANDLE handle
,
453 bool &changes_required
)
457 // Go through all the handles looking for <handle>.  Even if we find
458 // it, we continue through the rest of the list since <handle> could
459 // appear multiple times. All handles are checked.
461 // Check the current entries first.
462 for (i
= 0; i
< this->max_handlep1_
; ++i
)
463 // Since the handle can either be the event or the I/O handle,
464 // we have to check both
465 if ((this->current_handles_
[i
] == handle
||
466 this->current_info_
[i
].io_handle_
== handle
) &&
467 // Make sure that it is not already marked for suspension
468 !this->current_info_
[i
].suspend_entry_
)
470 // Mark to be suspended
471 this->current_info_
[i
].suspend_entry_
= true;
472 // Increment the handle count
473 ++this->handles_to_be_suspended_
;
474 // Changes will be required
475 changes_required
= true;
478 // Then check the suspended entries.
479 for (i
= 0; i
< this->suspended_handles_
; ++i
)
480 // Since the handle can either be the event or the I/O handle,
481 // we have to check both
482 if ((this->current_suspended_info_
[i
].event_handle_
== handle
||
483 this->current_suspended_info_
[i
].io_handle_
== handle
) &&
484 // Make sure that the resumption is not already undone
485 this->current_suspended_info_
[i
].resume_entry_
)
// Undo the pending resumption so the entry stays suspended
488 this->current_suspended_info_
[i
].resume_entry_
= false;
489 // Decrement the handle count
490 --this->handles_to_be_resumed_
;
491 // Changes will be required
492 changes_required
= true;
495 // Then check the to_be_added entries.
496 for (i
= 0; i
< this->handles_to_be_added_
; ++i
)
497 // Since the handle can either be the event or the I/O handle,
498 // we have to check both
499 if ((this->to_be_added_info_
[i
].io_handle_
== handle
||
500 this->to_be_added_info_
[i
].event_handle_
== handle
) &&
501 // Make sure that it is not already marked for suspension
502 !this->to_be_added_info_
[i
].suspend_entry_
)
504 // Mark to be suspended
505 this->to_be_added_info_
[i
].suspend_entry_
= true;
506 // Increment the handle count
507 ++this->handles_to_be_suspended_
;
508 // Changes will be required
509 changes_required
= true;
// Schedules resumption of every entry whose event handle or I/O
// handle equals <handle>.  This is the mirror image of
// suspend_handler_i:
//   - current entries marked for suspension: clear <suspend_entry_>
//     and decrement <handles_to_be_suspended_>;
//   - suspended entries not yet marked: set <resume_entry_> and
//     increment <handles_to_be_resumed_>;
//   - to-be-added entries marked for suspension: clear
//     <suspend_entry_> and decrement <handles_to_be_suspended_>.
// <changes_required> is set to true whenever any entry is changed.
516 ACE_WFMO_Reactor_Handler_Repository::resume_handler_i (ACE_HANDLE handle
,
517 bool &changes_required
)
521 // Go through all the handles looking for <handle>.  Even if we find
522 // it, we continue through the rest of the list since <handle> could
523 // appear multiple times. All handles are checked.
525 // Check the current entries first.
526 for (i
= 0; i
< this->max_handlep1_
; ++i
)
527 // Since the handle can either be the event or the I/O handle,
528 // we have to check both
529 if ((this->current_handles_
[i
] == handle
||
530 this->current_info_
[i
].io_handle_
== handle
) &&
531 // Make sure that the suspension is not already undone
532 this->current_info_
[i
].suspend_entry_
)
// Undo the pending suspension so the entry stays active
535 this->current_info_
[i
].suspend_entry_
= false;
536 // Decrement the handle count
537 --this->handles_to_be_suspended_
;
538 // Changes will be required
539 changes_required
= true;
542 // Then check the suspended entries.
543 for (i
= 0; i
< this->suspended_handles_
; ++i
)
544 // Since the handle can either be the event or the I/O handle,
545 // we have to check both
546 if ((this->current_suspended_info_
[i
].event_handle_
== handle
||
547 this->current_suspended_info_
[i
].io_handle_
== handle
) &&
548 // Make sure that it is not already marked for resumption
549 !this->current_suspended_info_
[i
].resume_entry_
)
551 // Mark to be resumed
552 this->current_suspended_info_
[i
].resume_entry_
= true;
553 // Increment the handle count
554 ++this->handles_to_be_resumed_
;
555 // Changes will be required
556 changes_required
= true;
559 // Then check the to_be_added entries.
560 for (i
= 0; i
< this->handles_to_be_added_
; ++i
)
561 // Since the handle can either be the event or the I/O handle,
562 // we have to check both
563 if ((this->to_be_added_info_
[i
].io_handle_
== handle
||
564 this->to_be_added_info_
[i
].event_handle_
== handle
) &&
565 // Make sure that the suspension is not already undone
566 this->to_be_added_info_
[i
].suspend_entry_
)
// Undo the pending suspension on the not-yet-added entry
569 this->to_be_added_info_
[i
].suspend_entry_
= false;
570 // Decrement the handle count
571 --this->handles_to_be_suspended_
;
572 // Changes will be required
573 changes_required
= true;
// Unbinds every handler in the repository.  With the reactor's
// <lock_> held through ACE_GUARD, unbind_i is called with
// ALL_EVENTS_MASK for each event handle in the current, suspended and
// to-be-added tables.  Afterwards the reactor's wakeup_all_threads is
// called.  The last argument passed to unbind_i is not visible in
// this excerpt.
580 ACE_WFMO_Reactor_Handler_Repository::unbind_all ()
583 ACE_GUARD (ACE_Process_Mutex
, ace_mon
, this->wfmo_reactor_
.lock_
);
588 // Remove all the current handlers
589 for (i
= 0; i
< this->max_handlep1_
; ++i
)
590 this->unbind_i (this->current_handles_
[i
],
591 ACE_Event_Handler::ALL_EVENTS_MASK
,
594 // Remove all the suspended handlers
595 for (i
= 0; i
< this->suspended_handles_
; ++i
)
596 this->unbind_i (this->current_suspended_info_
[i
].event_handle_
,
597 ACE_Event_Handler::ALL_EVENTS_MASK
,
600 // Remove all the to_be_added handlers
601 for (i
= 0; i
< this->handles_to_be_added_
; ++i
)
602 this->unbind_i (this->to_be_added_info_
[i
].event_handle_
,
603 ACE_Event_Handler::ALL_EVENTS_MASK
,
607 // The guard is released here
609 // Wake up all threads in WaitForMultipleObjects so that they can
610 // reconsult the handle set
611 this->wfmo_reactor_
.wakeup_all_threads ();
// Queues a new registration in <to_be_added_info_>.
//
// A null <event_handler> is tested for first.  If <event_handle> is
// ACE_INVALID_HANDLE it is replaced with event_handler->get_handle (),
// and the result is checked with invalid_handle ().  When there is
// room, the entry is stored at index <handles_to_be_added_>, the
// counter is incremented, a reference is added to the handler and the
// reactor's threads are woken up.  The other visible path sets errno
// to EMFILE.
//
// NOTE(review): some parameters, the arguments to set () and the
// return statements are not visible in this excerpt.
615 ACE_WFMO_Reactor_Handler_Repository::bind_i (bool io_entry
,
616 ACE_Event_Handler
*event_handler
,
618 ACE_HANDLE io_handle
,
619 ACE_HANDLE event_handle
,
622 if (event_handler
== 0)
625 // Make sure that the <handle> is valid
626 if (event_handle
== ACE_INVALID_HANDLE
)
627 event_handle
= event_handler
->get_handle ();
628 if (this->invalid_handle (event_handle
))
// Entries in use: current plus pending additions plus suspended,
// minus those already scheduled for deletion.
631 size_t current_size
= this->max_handlep1_
+
632 this->handles_to_be_added_
-
633 this->handles_to_be_deleted_
+
634 this->suspended_handles_
;
636 // Make sure that there's room in the table and that total pending
637 // additions should not exceed what the <to_be_added_info_> array can hold
639 if (current_size
< this->max_size_
&&
640 this->handles_to_be_added_
< this->max_size_
)
642 // Cache this set into the <to_be_added_info_>, till we come
643 // around to actually adding this to the <current_info_>
644 this->to_be_added_info_
[this->handles_to_be_added_
].set (event_handle
,
651 ++this->handles_to_be_added_
;
// The repository now refers to the handler
653 event_handler
->add_reference ();
655 // Wake up all threads in WaitForMultipleObjects so that they can
656 // reconsult the handle set
657 this->wfmo_reactor_
.wakeup_all_threads ();
661 errno
= EMFILE
;   // File descriptor table is full (better than nothing)
// Applies pending deletions and suspensions to the current table.
// Work is done only when <handles_to_be_deleted_> or
// <handles_to_be_suspended_> is non-zero.
//
// For each slot below <max_handlep1_>:
//   - a delete-marked entry has its close masks read and DONT_CALL
//     tested; the handle to report (I/O handle for I/O entries, event
//     handle otherwise) and the event handler are recorded for the
//     upcall; the event is destroyed when <delete_event_> is set; and
//     <handles_to_be_deleted_> is decremented;
//   - a suspend-marked entry is copied to the end of
//     <current_suspended_info_> and the counters are adjusted.
// A marked entry is then removed by copying the last valid entry over
// it, resetting the last slot and decrementing <max_handlep1_>.
// Finally, if an event handler was recorded, handle_close is called
// and, when reference counting is enabled, remove_reference.
669 ACE_WFMO_Reactor_Handler_Repository::make_changes_in_current_infos ()
671 // Go through the entire valid array and check for all handles that
672 // have been scheduled for deletion
673 if (this->handles_to_be_deleted_
> 0 || this->handles_to_be_suspended_
> 0)
676 while (i
< this->max_handlep1_
)
678 // This stuff is necessary here, since we should not make
679 // the upcall until all the internal data structures have
680 // been updated.  This is to protect against upcalls that
681 // try to deregister again.
682 ACE_HANDLE handle
= ACE_INVALID_HANDLE
;
683 ACE_Reactor_Mask masks
= ACE_Event_Handler::NULL_MASK
;
684 ACE_Event_Handler
*event_handler
= 0;
686 // See if this entry is scheduled for deletion
687 if (this->current_info_
[i
].delete_entry_
)
689 // Calling the <handle_close> method here will ensure that we
690 // will only call it once per deregistering <Event_Handler>.
691 // This is essential in the case when the <Event_Handler> will
692 // do something like delete itself and we have multiple
693 // threads in WFMO_Reactor.
695 // Make sure that the DONT_CALL mask is not set
696 masks
= this->current_info_
[i
].close_masks_
;
697 if (ACE_BIT_ENABLED (masks
, ACE_Event_Handler::DONT_CALL
) == 0)
699 // Grab the correct handle depending on the type entry
700 if (this->current_info_
[i
].io_entry_
)
701 handle
= this->current_info_
[i
].io_handle_
;
703 handle
= this->current_handles_
[i
];
// Remember the handler; a non-null value triggers the upcall below
706 event_handler
= this->current_info_
[i
].event_handler_
;
709 // If <WFMO_Reactor> created the event, we need to clean it up
710 if (this->current_info_
[i
].delete_event_
)
711 ACE_OS::event_destroy (&this->current_handles_
[i
]);
713 // Reduce count by one
714 --this->handles_to_be_deleted_
;
717 // See if this entry is scheduled for suspension
718 else if (this->current_info_
[i
].suspend_entry_
)
// Copy the entry to the end of the suspended table
720 this->current_suspended_info_
[this->suspended_handles_
].set (this->current_handles_
[i
],
721 this->current_info_
[i
]);
722 // Increase number of suspended handles
723 ++this->suspended_handles_
;
725 // Reduce count by one
726 --this->handles_to_be_suspended_
;
729 // See if this entry is scheduled for deletion or suspension
730 // If so we need to clean up
731 if (this->current_info_
[i
].delete_entry_
||
732 this->current_info_
[i
].suspend_entry_
)
734 size_t last_valid_slot
= this->max_handlep1_
- 1;
735 // If this is the last handle in the set, no need to swap
736 // places. Simply remove it.
737 if (i
< last_valid_slot
)
738 // Overwrite this slot with the last valid handle and its info
741 this->current_info_
[i
] =
742 this->current_info_
[last_valid_slot
];
743 this->current_handles_
[i
] =
744 this->current_handles_
[last_valid_slot
];
746 // Reset the info in the last valid slot
747 this->current_info_
[last_valid_slot
].reset ();
748 this->current_handles_
[last_valid_slot
] = ACE_INVALID_HANDLE
;
749 --this->max_handlep1_
;
753 // This current entry is not up for deletion or
754 // suspension.  Proceed to the next entry in the current handles.
759 // Now that all internal structures have been updated, make the handle_close upcall
761 if (event_handler
!= 0)
// Read the policy before the upcall and release the reference after it
763 bool const requires_reference_counting
=
764 event_handler
->reference_counting_policy ().value () ==
765 ACE_Event_Handler::Reference_Counting_Policy::ENABLED
;
767 event_handler
->handle_close (handle
, masks
);
769 if (requires_reference_counting
)
771 event_handler
->remove_reference ();
// Applies pending deletions and resumptions to the suspended table.
// Work is done only when <handles_to_be_deleted_> or
// <handles_to_be_resumed_> is non-zero.
//
// For each slot below <suspended_handles_>:
//   - a delete-marked entry has its close masks read and DONT_CALL
//     tested; the handle to report (I/O handle for I/O entries, event
//     handle otherwise) and the event handler are recorded for the
//     upcall; the event is destroyed when <delete_event_> is set; and
//     <handles_to_be_deleted_> is decremented;
//   - a resume-marked entry is appended to the current table at index
//     <max_handlep1_> and the counters are adjusted.
// A marked entry is then removed by copying the last valid suspended
// entry over it, resetting the last slot and decrementing
// <suspended_handles_>.  Finally, if an event handler was recorded,
// handle_close is called and, when reference counting is enabled,
// remove_reference.
781 ACE_WFMO_Reactor_Handler_Repository::make_changes_in_suspension_infos ()
783 // Go through the <suspended_handle> array
784 if (this->handles_to_be_deleted_
> 0 || this->handles_to_be_resumed_
> 0)
787 while (i
< this->suspended_handles_
)
789 // This stuff is necessary here, since we should not make
790 // the upcall until all the internal data structures have
791 // been updated.  This is to protect against upcalls that
792 // try to deregister again.
793 ACE_HANDLE handle
= ACE_INVALID_HANDLE
;
794 ACE_Reactor_Mask masks
= ACE_Event_Handler::NULL_MASK
;
795 ACE_Event_Handler
*event_handler
= 0;
797 // See if this entry is scheduled for deletion
798 if (this->current_suspended_info_
[i
].delete_entry_
)
800 // Calling the <handle_close> method here will ensure that we
801 // will only call it once per deregistering <Event_Handler>.
802 // This is essential in the case when the <Event_Handler> will
803 // do something like delete itself and we have multiple
804 // threads in WFMO_Reactor.
806 // Make sure that the DONT_CALL mask is not set
807 masks
= this->current_suspended_info_
[i
].close_masks_
;
808 if (ACE_BIT_ENABLED (masks
, ACE_Event_Handler::DONT_CALL
) == 0)
810 // Grab the correct handle depending on the type entry
811 if (this->current_suspended_info_
[i
].io_entry_
)
812 handle
= this->current_suspended_info_
[i
].io_handle_
;
814 handle
= this->current_suspended_info_
[i
].event_handle_
;
// Remember the handler; a non-null value triggers the upcall below
817 event_handler
= this->current_suspended_info_
[i
].event_handler_
;
820 // If <WFMO_Reactor> created the event, we need to clean it up
821 if (this->current_suspended_info_
[i
].delete_event_
)
822 ACE_OS::event_destroy (&this->current_suspended_info_
[i
].event_handle_
);
824 // Reduce count by one
825 --this->handles_to_be_deleted_
;
// See if this entry is scheduled for resumption
828 else if (this->current_suspended_info_
[i
].resume_entry_
)
830 // Add to the end of the current handles set
831 this->current_handles_
[this->max_handlep1_
] = this->current_suspended_info_
[i
].event_handle_
;
833 this->current_info_
[this->max_handlep1_
].set (this->current_suspended_info_
[i
]);
834 ++this->max_handlep1_
;
836 // Reduce count by one
837 --this->handles_to_be_resumed_
;
840 // If an entry needs to be removed, either because it
841 // was deleted or resumed, remove it now before making the upcall
843 if (this->current_suspended_info_
[i
].resume_entry_
||
844 this->current_suspended_info_
[i
].delete_entry_
)
846 size_t last_valid_slot
= this->suspended_handles_
- 1;
847 // Net effect is that we're removing an entry and
848 // compressing the list from the end. So, if removing
849 // an entry from the middle, copy the last valid one to the
850 // removed slot. Reset the end and decrement the number
851 // of suspended handles.
852 if (i
< last_valid_slot
)
854 this->current_suspended_info_
[i
] =
855 this->current_suspended_info_
[last_valid_slot
];
856 this->current_suspended_info_
[last_valid_slot
].reset ();
857 --this->suspended_handles_
;
861 // This current entry is not up for deletion or
862 // resumption.  Proceed to the next entry in the
863 // suspended handles.
867 // Now that all internal structures have been updated, make the handle_close upcall
869 if (event_handler
!= 0)
// NOTE(review): make_changes_in_current_infos declares this flag as
// `bool const`; consider using the same type here.
871 int requires_reference_counting
=
872 event_handler
->reference_counting_policy ().value () ==
873 ACE_Event_Handler::Reference_Counting_Policy::ENABLED
;
875 event_handler
->handle_close (handle
, masks
);
877 if (requires_reference_counting
)
879 event_handler
->remove_reference ();
// Drains the to-be-added table.  For each of the first
// <handles_to_be_added_> entries:
//   - a delete-marked entry has its close masks read and DONT_CALL
//     tested; the handle to report (I/O handle for I/O entries, event
//     handle otherwise) and the event handler are recorded for the
//     upcall; the event is destroyed when <delete_event_> is set; and
//     <handles_to_be_deleted_> is decremented;
//   - a suspend-marked entry is appended to <current_suspended_info_>;
//   - the remaining code appends the entry to the current table at
//     index <max_handlep1_>.
// The to-be-added slot is then reset and, if an event handler was
// recorded, handle_close is called followed by remove_reference when
// reference counting is enabled.  At the end <handles_to_be_added_>
// is set back to zero.
889 ACE_WFMO_Reactor_Handler_Repository::make_changes_in_to_be_added_infos ()
891 // Go through the <to_be_added_*> arrays
892 for (size_t i
= 0; i
< this->handles_to_be_added_
; ++i
)
894 // This stuff is necessary here, since we should not make
895 // the upcall until all the internal data structures have
896 // been updated.  This is to protect against upcalls that
897 // try to deregister again.
898 ACE_HANDLE handle
= ACE_INVALID_HANDLE
;
899 ACE_Reactor_Mask masks
= ACE_Event_Handler::NULL_MASK
;
900 ACE_Event_Handler
*event_handler
= 0;
902 // See if this entry is scheduled for deletion
903 if (this->to_be_added_info_
[i
].delete_entry_
)
905 // Calling the <handle_close> method here will ensure that we
906 // will only call it once per deregistering <Event_Handler>.
907 // This is essential in the case when the <Event_Handler> will
908 // do something like delete itself and we have multiple
909 // threads in WFMO_Reactor.
911 // Make sure that the DONT_CALL mask is not set
912 masks
= this->to_be_added_info_
[i
].close_masks_
;
913 if (ACE_BIT_ENABLED (masks
, ACE_Event_Handler::DONT_CALL
) == 0)
915 // Grab the correct handle depending on the type entry
916 if (this->to_be_added_info_
[i
].io_entry_
)
917 handle
= this->to_be_added_info_
[i
].io_handle_
;
919 handle
= this->to_be_added_info_
[i
].event_handle_
;
// Remember the handler; a non-null value triggers the upcall below
922 event_handler
= this->to_be_added_info_
[i
].event_handler_
;
925 // If <WFMO_Reactor> created the event, we need to clean it up
926 if (this->to_be_added_info_
[i
].delete_event_
)
927 ACE_OS::event_destroy (&this->to_be_added_info_
[i
].event_handle_
);
929 // Reduce count by one
930 --this->handles_to_be_deleted_
;
933 // See if this entry is scheduled for suspension
934 else if (this->to_be_added_info_
[i
].suspend_entry_
)
// Copy the entry to the end of the suspended table
936 this->current_suspended_info_
[this->suspended_handles_
].set (this->to_be_added_info_
[i
].event_handle_
,
937 this->to_be_added_info_
[i
]);
938 // Increase number of suspended handles
939 ++this->suspended_handles_
;
941 // Reduce count by one
942 --this->handles_to_be_suspended_
;
945 // If neither of the two flags are on, add to current
948 // Add to the end of the current handles set
949 this->current_handles_
[this->max_handlep1_
] = this->to_be_added_info_
[i
].event_handle_
;
951 this->current_info_
[this->max_handlep1_
].set (this->to_be_added_info_
[i
]);
952 ++this->max_handlep1_
;
955 // Reset the <to_be_added_info_>
956 this->to_be_added_info_
[i
].reset ();
958 // Now that all internal structures have been updated, make the handle_close upcall
960 if (event_handler
!= 0)
// NOTE(review): make_changes_in_current_infos declares this flag as
// `bool const`; consider using the same type here.
962 int requires_reference_counting
=
963 event_handler
->reference_counting_policy ().value () ==
964 ACE_Event_Handler::Reference_Counting_Policy::ENABLED
;
966 event_handler
->handle_close (handle
, masks
);
968 if (requires_reference_counting
)
970 event_handler
->remove_reference ();
975 // Since all to be added handles have been taken care of, reset the counter
977 this->handles_to_be_added_
= 0;
// Writes the repository state to the ACE log at LM_DEBUG.  The body
// is compiled only when ACE_HAS_DUMP is defined.  It prints the sizes
// and pending-change counters of the current, to-be-added and
// suspended tables, and calls dump () on every entry of each table.
//
// NOTE(review): the format strings use %d, which expects an int-sized
// argument; confirm the declared types of the counters passed here.
983 ACE_WFMO_Reactor_Handler_Repository::dump () const
985 #if defined (ACE_HAS_DUMP)
988 ACE_TRACE ("ACE_WFMO_Reactor_Handler_Repository::dump");
990 ACELIB_DEBUG ((LM_DEBUG
, ACE_BEGIN_DUMP
, this));
992 ACELIB_DEBUG ((LM_DEBUG
,
993 ACE_TEXT ("Max size = %d\n"),
// Current table
996 ACELIB_DEBUG ((LM_DEBUG
,
997 ACE_TEXT ("Current info table\n\n")));
998 ACELIB_DEBUG ((LM_DEBUG
,
999 ACE_TEXT ("\tSize = %d\n"),
1000 this->max_handlep1_
));
1001 ACELIB_DEBUG ((LM_DEBUG
,
1002 ACE_TEXT ("\tHandles to be suspended = %d\n"),
1003 this->handles_to_be_suspended_
));
1005 for (i
= 0; i
< this->max_handlep1_
; ++i
)
1006 this->current_info_
[i
].dump (this->current_handles_
[i
]);
1008 ACELIB_DEBUG ((LM_DEBUG
,
// To-be-added table
1011 ACELIB_DEBUG ((LM_DEBUG
,
1012 ACE_TEXT ("To-be-added info table\n\n")));
1013 ACELIB_DEBUG ((LM_DEBUG
,
1014 ACE_TEXT ("\tSize = %d\n"),
1015 this->handles_to_be_added_
));
1017 for (i
= 0; i
< this->handles_to_be_added_
; ++i
)
1018 this->to_be_added_info_
[i
].dump ();
1020 ACELIB_DEBUG ((LM_DEBUG
,
// Suspended table
1023 ACELIB_DEBUG ((LM_DEBUG
,
1024 ACE_TEXT ("Suspended info table\n\n")));
1025 ACELIB_DEBUG ((LM_DEBUG
,
1026 ACE_TEXT ("\tSize = %d\n"),
1027 this->suspended_handles_
));
1028 ACELIB_DEBUG ((LM_DEBUG
,
1029 ACE_TEXT ("\tHandles to be resumed = %d\n"),
1030 this->handles_to_be_resumed_
));
1032 for (i
= 0; i
< this->suspended_handles_
; ++i
)
1033 this->current_suspended_info_
[i
].dump ();
1035 ACELIB_DEBUG ((LM_DEBUG
,
// The deletion counter covers all three tables
1038 ACELIB_DEBUG ((LM_DEBUG
,
1039 ACE_TEXT ("Total handles to be deleted = %d\n"),
1040 this->handles_to_be_deleted_
));
1042 ACELIB_DEBUG ((LM_DEBUG
,
1044 #endif /* ACE_HAS_DUMP */
1047 /************************************************************/
// Not implemented for this reactor: the time-value argument is unnamed
// (unused) and the body reports "not supported" through
// ACE_NOTSUP_RETURN with -1.
1050 ACE_WFMO_Reactor::work_pending (const ACE_Time_Value
&)
1052 ACE_NOTSUP_RETURN (-1);
1055 #if defined (_MSC_VER)
1056 # pragma warning (push)
1057 # pragma warning (disable:4355) /* Use of 'this' in initializer list */
// Default-sized constructor.
//
// Initializes the handler pointers to 0 and the ownership flags to false,
// records the constructing thread as the owner, and then calls open ()
// with ACE_WFMO_Reactor::DEFAULT_SIZE, forwarding @a sh, @a tq and
// @a notify. A failing open () is logged with LM_ERROR.
// NOTE(review): some initializers are on lines omitted from this listing
// (the numbering has gaps), so the list below is not complete.
1059 ACE_WFMO_Reactor::ACE_WFMO_Reactor (ACE_Sig_Handler
*sh
,
1060 ACE_Timer_Queue
*tq
,
1061 ACE_Reactor_Notify
*notify
)
1062 : signal_handler_ (0),
1063 delete_signal_handler_ (false),
1065 delete_timer_queue_ (false),
1066 delete_handler_rep_ (false),
1067 notify_handler_ (0),
1068 delete_notify_handler_ (false),
1069 lock_adapter_ (lock_
),
1070 handler_rep_ (*this),
1071 // this event is initially signaled
1073 // this event is initially unsignaled
1074 wakeup_all_threads_ (0),
1075 // this event is initially unsignaled
1076 waiting_to_change_state_ (0),
1077 active_threads_ (0),
// The thread that constructs the reactor starts out as its owner.
1078 owner_ (ACE_Thread::self ()),
1080 change_state_thread_ (0),
1081 open_for_business_ (false),
// Real initialization happens in open (); a failure is only logged here.
1084 if (this->open (ACE_WFMO_Reactor::DEFAULT_SIZE
, 0, sh
, tq
, 0, notify
) == -1)
1085 ACELIB_ERROR ((LM_ERROR
,
1087 ACE_TEXT ("WFMO_Reactor")));
// Explicitly-sized constructor.
//
// Same member initialization as the default-sized constructor, but open ()
// is called with the caller-supplied @a size. A parameter named `unused`
// is explicitly ignored via ACE_UNUSED_ARG. A failing open () is logged
// with LM_ERROR.
// NOTE(review): the declaration of `unused` and some initializers are on
// lines omitted from this listing.
1090 ACE_WFMO_Reactor::ACE_WFMO_Reactor (size_t size
,
1092 ACE_Sig_Handler
*sh
,
1093 ACE_Timer_Queue
*tq
,
1094 ACE_Reactor_Notify
*notify
)
1095 : signal_handler_ (0),
1096 delete_signal_handler_ (false),
1098 delete_timer_queue_ (false),
1099 delete_handler_rep_ (false),
1100 notify_handler_ (0),
1101 delete_notify_handler_ (false),
1102 lock_adapter_ (lock_
),
1103 handler_rep_ (*this),
1104 // this event is initially signaled
1106 // this event is initially unsignaled
1107 wakeup_all_threads_ (0),
1108 // this event is initially unsignaled
1109 waiting_to_change_state_ (0),
1110 active_threads_ (0),
// The thread that constructs the reactor starts out as its owner.
1111 owner_ (ACE_Thread::self ()),
1113 change_state_thread_ (0),
1114 open_for_business_ (false),
1117 ACE_UNUSED_ARG (unused
);
// Real initialization happens in open (); a failure is only logged here.
1119 if (this->open (size
, 0, sh
, tq
, 0, notify
) == -1)
1120 ACELIB_ERROR ((LM_ERROR
,
1122 ACE_TEXT ("WFMO_Reactor")));
1124 #if defined (_MSC_VER)
1125 # pragma warning (pop)
1129 ACE_WFMO_Reactor::current_info (ACE_HANDLE
, size_t &)
// Opens (or re-opens) the reactor while holding lock_.
//
// Visible steps, in order:
//   1. refuse to proceed if the reactor is already open;
//   2. release any previous timer queue and signal handler, then install
//      either a newly allocated one (owned) or the caller's (not owned);
//   3. fill the two-entry atomic wait array with the lock's mutex and the
//      ok_to_wait_ event handle;
//   4. if a repository was opened before, flush its pending changes and
//      destroy it, then open it with @a size + 2 slots;
//   5. install and open the notify handler, and register the
//      wakeup_all_threads_ event;
//   6. apply the resulting repository changes and mark the reactor open.
//
// @param size    Number of user handles; two more slots are requested
//                for internal use.
// @param sh      Signal handler to use.
// @param tq      Timer queue to use.
// @param notify  Notify handler to use; an ACE_WFMO_Reactor_Notify is
//                allocated when this is 0.
// NOTE(review): the conditions selecting "allocate" versus "use the
// caller's object" for @a sh and @a tq, and the error return values, are
// on lines omitted from this listing.
1135 ACE_WFMO_Reactor::open (size_t size
,
1137 ACE_Sig_Handler
*sh
,
1138 ACE_Timer_Queue
*tq
,
1140 ACE_Reactor_Notify
*notify
)
1142 // This GUARD is necessary since we are updating shared state.
1143 ACE_GUARD_RETURN (ACE_Process_Mutex
, ace_mon
, this->lock_
, -1);
1145 // If we are already open, return -1
1146 if (this->open_for_business_
)
// Drop the timer queue left over from a previous open: delete it when
// this reactor owns it, otherwise just close it.
1150 if (this->delete_timer_queue_
)
1151 delete this->timer_queue_
;
1152 else if (this->timer_queue_
)
1153 this->timer_queue_
->close ();
// A queue allocated here is owned by the reactor ...
1157 ACE_NEW_RETURN (this->timer_queue_
,
1160 this->delete_timer_queue_
= true;
// ... while a caller-supplied queue is not.
1164 this->timer_queue_
= tq
;
1165 this->delete_timer_queue_
= false;
// Same ownership handling for the signal handler.
1169 if (this->delete_signal_handler_
)
1170 delete this->signal_handler_
;
1174 ACE_NEW_RETURN (this->signal_handler_
,
1177 this->delete_signal_handler_
= true;
1181 this->signal_handler_
= sh
;
1182 this->delete_signal_handler_
= false;
1185 // Setup the atomic wait array (used later in <handle_events>)
1186 this->atomic_wait_array_
[0] = this->lock_
.lock ().proc_mutex_
;
1187 this->atomic_wait_array_
[1] = this->ok_to_wait_
.handle ();
1189 // Prevent memory leaks when the ACE_WFMO_Reactor is reopened.
1190 if (this->delete_handler_rep_
)
1192 if (this->handler_rep_
.changes_required ())
1194 // Make necessary changes to the handler repository
1195 this->handler_rep_
.make_changes ();
1196 // Turn off <wakeup_all_threads_> since all necessary changes
1198 this->wakeup_all_threads_
.reset ();
// The repository is a by-value member, so its destructor is invoked
// explicitly before open () re-initializes it below.
1201 this->handler_rep_
.~ACE_WFMO_Reactor_Handler_Repository ();
1204 // Open the handle repository. Two additional handles for internal
1206 if (this->handler_rep_
.open (size
+ 2) == -1)
1207 ACELIB_ERROR_RETURN ((LM_ERROR
, ACE_TEXT ("%p\n"),
1208 ACE_TEXT ("opening handler repository")),
// Remember that the repository now holds allocations to release on reopen.
1211 this->delete_handler_rep_
= true;
// Release a previously owned notify handler before taking the new one.
1213 if (this->notify_handler_
!= 0 && this->delete_notify_handler_
)
1214 delete this->notify_handler_
;
1216 this->notify_handler_
= notify
;
// No notify handler supplied: allocate the default implementation.
1218 if (this->notify_handler_
== 0)
1220 ACE_NEW_RETURN (this->notify_handler_
,
1221 ACE_WFMO_Reactor_Notify
,
1224 if (this->notify_handler_
== 0)
1227 this->delete_notify_handler_
= true;
1231 // The order of the following two registrations is very important
1233 // Open the notification handler
1234 if (this->notify_handler_
->open (this, this->timer_queue_
) == -1)
1235 ACELIB_ERROR_RETURN ((LM_ERROR
,
1237 ACE_TEXT ("opening notify handler ")),
1240 // Register for <wakeup_all_threads> event
1241 if (this->register_handler (&this->wakeup_all_threads_handler_
,
1242 this->wakeup_all_threads_
.handle ()) == -1)
1243 ACELIB_ERROR_RETURN ((LM_ERROR
,
1245 ACE_TEXT ("registering thread wakeup handler")),
1248 // Since we have added two handles into the handler repository,
1249 // update the <handler_repository_>
1250 if (this->handler_rep_
.changes_required ())
1252 // Make necessary changes to the handler repository
1253 this->handler_rep_
.make_changes ();
1254 // Turn off <wakeup_all_threads_> since all necessary changes
1256 this->wakeup_all_threads_
.reset ();
1259 // We are open for business
1260 this->open_for_business_
= true;
// Replaces the reactor's signal handler with @a signal_handler.
// The previous handler is deleted only when it is non-null and owned by
// the reactor (delete_signal_handler_); the new one is recorded as not
// owned, so this reactor will not delete it.
1266 ACE_WFMO_Reactor::set_sig_handler (ACE_Sig_Handler
*signal_handler
)
1268 if (this->signal_handler_
!= 0 && this->delete_signal_handler_
)
1269 delete this->signal_handler_
;
1270 this->signal_handler_
= signal_handler
;
1271 this->delete_signal_handler_
= false;
// Accessor: returns the timer queue the reactor is currently using
// (the stored timer_queue_ member, unchanged).
1276 ACE_WFMO_Reactor::timer_queue () const
1278 return this->timer_queue_
;
// Mutator: installs @a tq as the reactor's timer queue.
// The old queue is deleted when the reactor owns it; otherwise, if one
// exists, it is only closed. The new queue is recorded as not owned.
1282 ACE_WFMO_Reactor::timer_queue (ACE_Timer_Queue
*tq
)
1284 if (this->delete_timer_queue_
)
1286 delete this->timer_queue_
;
1288 else if (this->timer_queue_
)
1290 this->timer_queue_
->close ();
1292 this->timer_queue_
= tq
;
1293 this->delete_timer_queue_
= false;
// Shuts the reactor down while holding lock_.
//
// Treats "already closed" as an error, clears open_for_business_, closes
// the handler repository and applies the resulting changes immediately.
// It then releases the timer queue, signal handler and notify handler:
// each owned object is deleted and its pointer and ownership flag are
// reset. A timer queue that is not owned is closed and forgotten instead
// of deleted.
// NOTE(review): the return statements are on lines omitted from this
// listing.
1298 ACE_WFMO_Reactor::close ()
1300 // This GUARD is necessary since we are updating shared state.
1301 ACE_GUARD_RETURN (ACE_Process_Mutex
, ace_mon
, this->lock_
, -1);
1303 // If we are already closed, return error
1304 if (!this->open_for_business_
)
1307 // We are now closed
1308 this->open_for_business_
= false;
1309 // This will unregister all handles
1310 this->handler_rep_
.close ();
1312 // Make necessary changes to the handler repository that we caused
1313 // by the above actions. Someone who called close() is expecting that
1314 // things will be tidied up upon return.
1315 this->handler_rep_
.make_changes ();
// Timer queue: delete if owned, otherwise close it; forget it either way.
1317 if (this->delete_timer_queue_
)
1319 delete this->timer_queue_
;
1320 this->timer_queue_
= 0;
1321 this->delete_timer_queue_
= false;
1323 else if (this->timer_queue_
)
1325 this->timer_queue_
->close ();
1326 this->timer_queue_
= 0;
// Signal handler: only touched when owned.
1329 if (this->delete_signal_handler_
)
1331 delete this->signal_handler_
;
1332 this->signal_handler_
= 0;
1333 this->delete_signal_handler_
= false;
// Notify handler: only touched when owned.
1336 if (this->delete_notify_handler_
)
1338 delete this->notify_handler_
;
1339 this->notify_handler_
= 0;
1340 this->delete_notify_handler_
= false;
1346 ACE_WFMO_Reactor::~ACE_WFMO_Reactor ()
1348 // Assumption: No threads are left in the Reactor when this method
1349 // is called (i.e., active_threads_ == 0)
// Registers @a event_handler for socket events on @a io_handle.
//
// Without Winsock 2 (ACE_HAS_WINSOCK2 undefined or 0) all arguments are
// ignored and the call reports "not supported". Otherwise:
//   - an invalid @a io_handle is replaced by event_handler->get_handle ();
//   - the repository is asked to merge @a new_masks into any existing
//     registration (ACE_Reactor::ADD_MASK);
//   - when the caller passed no event, an ACE_Auto_Event is created and
//     its handle is used;
//   - ::WSAEventSelect associates the socket with the event and the
//     computed network events;
//   - for a new registration the entry is bound into the repository, and
//     the temporary ACE_Auto_Event is detached from its handle so its
//     destructor does not close a handle the reactor now uses.
//
// @param event_handle  Event to associate with the socket, or
//                      ACE_INVALID_HANDLE to have one created.
// @param io_handle     Socket handle, or ACE_INVALID_HANDLE to take it
//                      from @a event_handler.
// @param event_handler Handler to dispatch to.
// @param new_masks     Reactor masks to add.
// NOTE(review): several argument lines, the "found" branch and the return
// statements are on lines omitted from this listing.
1356 ACE_WFMO_Reactor::register_handler_i (ACE_HANDLE event_handle
,
1357 ACE_HANDLE io_handle
,
1358 ACE_Event_Handler
*event_handler
,
1359 ACE_Reactor_Mask new_masks
)
1361 // If this is a Winsock 1 system, the underlying event assignment will
1362 // not work, so don't try. Winsock 1 must use ACE_Select_Reactor for
1363 // reacting to socket activity.
1365 #if !defined (ACE_HAS_WINSOCK2) || (ACE_HAS_WINSOCK2 == 0)
1367 ACE_UNUSED_ARG (event_handle
);
1368 ACE_UNUSED_ARG (io_handle
);
1369 ACE_UNUSED_ARG (event_handler
);
1370 ACE_UNUSED_ARG (new_masks
);
1371 ACE_NOTSUP_RETURN (-1);
1375 // Make sure that the <handle> is valid
1376 if (io_handle
== ACE_INVALID_HANDLE
)
1377 io_handle
= event_handler
->get_handle ();
// Still not a usable handle: flag the error through errno.
1379 if (this->handler_rep_
.invalid_handle (io_handle
))
1381 errno
= ERROR_INVALID_HANDLE
;
1385 long new_network_events
= 0;
1386 bool delete_event
= false;
// Owns the ACE_Auto_Event created below (if any) until the function ends.
1387 std::unique_ptr
<ACE_Auto_Event
> event
;
1389 // Look up the repository to see if the <event_handler> is already
1391 ACE_Reactor_Mask old_masks
;
1392 int found
= this->handler_rep_
.modify_network_events_i (io_handle
,
1398 ACE_Reactor::ADD_MASK
);
1400 // Check to see if the user passed us a valid event; If not then we
1401 // need to create one
1402 if (event_handle
== ACE_INVALID_HANDLE
)
1404 std::unique_ptr
<ACE_Auto_Event
> tmp (new ACE_Auto_Event
);
1405 event
= std::move(tmp
);
1406 event_handle
= event
->handle ();
// Mark the event as created here so the registration records that fact.
1407 delete_event
= true;
1410 int result
= ::WSAEventSelect ((SOCKET
) io_handle
,
1412 new_network_events
);
1413 // If we had found the <Event_Handler> there is nothing more to do
1416 else if (result
!= SOCKET_ERROR
&&
1417 this->handler_rep_
.bind_i (1,
1422 delete_event
) != -1)
1424 // The <event_handler> was not found in the repository, add to
1428 // Clear out the handle in the ACE_Auto_Event so that when
1429 // it is destroyed, the handle isn't closed out from under
1430 // the reactor. After setting it, running down the event
1431 // (via the std::unique_ptr<> event, above) at function return will
1432 // cause an error because it'll try to close an invalid handle.
1433 // To avoid that smashing the errno value, save the errno
1434 // here, explicitly remove the event so the dtor won't do it
1435 // again, then restore errno.
1436 ACE_Errno_Guard
guard (errno
);
1437 event
->handle (ACE_INVALID_HANDLE
);
1445 #endif /* ACE_HAS_WINSOCK2 || ACE_HAS_WINSOCK2 == 0 */
// Changes the reactor masks of an existing registration for @a io_handle.
//
// After validating the handle, asks the repository to recompute the
// network events for the registration, then re-issues ::WSAEventSelect
// for the socket with the new network events.
// NOTE(review): the third parameter, most arguments of
// modify_network_events_i, the event argument of ::WSAEventSelect and the
// return statements are on lines omitted from this listing.
1450 ACE_WFMO_Reactor::mask_ops_i (ACE_HANDLE io_handle
,
1451 ACE_Reactor_Mask new_masks
,
1454 // Make sure that the <handle> is valid
1455 if (this->handler_rep_
.invalid_handle (io_handle
))
// Outputs filled in by the repository lookup below.
1458 long new_network_events
= 0;
1459 bool delete_event
= false;
1460 ACE_HANDLE event_handle
= ACE_INVALID_HANDLE
;
1462 // Look up the repository to see if the <Event_Handler> is already
1464 ACE_Reactor_Mask old_masks
;
1465 int found
= this->handler_rep_
.modify_network_events_i (io_handle
,
1474 int result
= ::WSAEventSelect ((SOCKET
) io_handle
,
1476 new_network_events
);
// Locates the registration for @a io_handle and updates its network
// events.
//
// The current, suspended and to-be-added tables are searched in that
// order, skipping entries scheduled for deletion and stopping at the
// first match. On a match, `modified_network_events` is redirected to the
// entry's own network_events_, and the entry's delete_event_ flag and
// event handle are copied out. With no match, `modified_network_events`
// keeps pointing at @a new_network_events, so the masks are applied to
// the caller's variable instead.
//
// @param io_handle           Socket handle to look up.
// @param new_masks           Reactor masks for the bit operation.
// @param old_masks           Receives the value returned by bit_ops ().
// @param new_network_events  Receives the resulting network events.
// @param event_handle        Receives the matching entry's event handle.
// NOTE(review): the remaining parameters, the statements that set `found`
// and the return statement are on lines omitted from this listing.
1488 ACE_WFMO_Reactor_Handler_Repository::modify_network_events_i (ACE_HANDLE io_handle
,
1489 ACE_Reactor_Mask new_masks
,
1490 ACE_Reactor_Mask
&old_masks
,
1491 long &new_network_events
,
1492 ACE_HANDLE
&event_handle
,
// Default target: the caller's variable (used when nothing is found).
1496 long *modified_network_events
= &new_network_events
;
1500 // First go through the current entries
1502 // Look for all entries in the current handles for matching handle
1503 // (except those that have been scheduled for deletion)
1504 for (i
= 0; i
< this->max_handlep1_
&& !found
; ++i
)
1505 if (io_handle
== this->current_info_
[i
].io_handle_
&&
1506 !this->current_info_
[i
].delete_entry_
)
1509 modified_network_events
= &this->current_info_
[i
].network_events_
;
1510 delete_event
= this->current_info_
[i
].delete_event_
;
// For current entries the event handle lives in the parallel
// current_handles_ array, not in the info record.
1511 event_handle
= this->current_handles_
[i
];
1514 // Then pass through the suspended handles
1516 // Look for all entries in the suspended handles for matching handle
1517 // (except those that have been scheduled for deletion)
1518 for (i
= 0; i
< this->suspended_handles_
&& !found
; ++i
)
1519 if (io_handle
== this->current_suspended_info_
[i
].io_handle_
&&
1520 !this->current_suspended_info_
[i
].delete_entry_
)
1523 modified_network_events
= &this->current_suspended_info_
[i
].network_events_
;
1524 delete_event
= this->current_suspended_info_
[i
].delete_event_
;
1525 event_handle
= this->current_suspended_info_
[i
].event_handle_
;
1528 // Then check the to_be_added handles
1530 // Look for all entries in the to_be_added handles for matching
1531 // handle (except those that have been scheduled for deletion)
1532 for (i
= 0; i
< this->handles_to_be_added_
&& !found
; ++i
)
1533 if (io_handle
== this->to_be_added_info_
[i
].io_handle_
&&
1534 !this->to_be_added_info_
[i
].delete_entry_
)
1537 modified_network_events
= &this->to_be_added_info_
[i
].network_events_
;
1538 delete_event
= this->to_be_added_info_
[i
].delete_event_
;
1539 event_handle
= this->to_be_added_info_
[i
].event_handle_
;
// Apply the mask operation in place on whichever target was selected ...
1542 old_masks
= this->bit_ops (*modified_network_events
,
// ... and report the resulting network events back to the caller.
1546 new_network_events
= *modified_network_events
;
// Convenience lookup: returns whatever handler (handle, masks) returns
// for @a handle, discarding the network-event masks that call reports.
1552 ACE_WFMO_Reactor_Handler_Repository::find_handler (ACE_HANDLE handle
)
1554 long existing_masks_ignored
= 0;
1555 return this->handler (handle
, existing_masks_ignored
);
// Finds the event handler registered for @a handle.
//
// @a handle may be either the I/O handle or the event handle of a
// registration; both are compared. The current, suspended and
// to-be-added tables are searched in that order, skipping entries
// scheduled for deletion and stopping at the first match.
//
// @param handle          I/O or event handle to look up.
// @param existing_masks  Receives the matching entry's network events.
// @return The matching entry's event handler; the local starts out as 0.
// NOTE(review): the statements that set `found`, and whatever condition
// guards the add_reference () call near the end, are on lines omitted
// from this listing.
1559 ACE_WFMO_Reactor_Handler_Repository::handler (ACE_HANDLE handle
,
1560 long &existing_masks
)
1564 ACE_Event_Handler
*event_handler
= 0;
1567 // Look for the handle first
1569 // First go through the current entries
1571 // Look for all entries in the current handles for matching handle
1572 // (except those that have been scheduled for deletion)
1573 for (i
= 0; i
< this->max_handlep1_
&& !found
; ++i
)
1574 if ((handle
== this->current_info_
[i
].io_handle_
||
1575 handle
== this->current_handles_
[i
]) &&
1576 !this->current_info_
[i
].delete_entry_
)
1579 event_handler
= this->current_info_
[i
].event_handler_
;
1580 existing_masks
= this->current_info_
[i
].network_events_
;
1583 // Then pass through the suspended handles
1585 // Look for all entries in the suspended handles for matching handle
1586 // (except those that have been scheduled for deletion)
1587 for (i
= 0; i
< this->suspended_handles_
&& !found
; ++i
)
1588 if ((handle
== this->current_suspended_info_
[i
].io_handle_
||
1589 handle
== this->current_suspended_info_
[i
].event_handle_
) &&
1590 !this->current_suspended_info_
[i
].delete_entry_
)
1593 event_handler
= this->current_suspended_info_
[i
].event_handler_
;
1594 existing_masks
= this->current_suspended_info_
[i
].network_events_
;
1597 // Then check the to_be_added handles
1599 // Look for all entries in the to_be_added handles for matching
1600 // handle (except those that have been scheduled for deletion)
1601 for (i
= 0; i
< this->handles_to_be_added_
&& !found
; ++i
)
1602 if ((handle
== this->to_be_added_info_
[i
].io_handle_
||
1603 handle
== this->to_be_added_info_
[i
].event_handle_
) &&
1604 !this->to_be_added_info_
[i
].delete_entry_
)
1607 event_handler
= this->to_be_added_info_
[i
].event_handler_
;
1608 existing_masks
= this->to_be_added_info_
[i
].network_events_
;
// A reference is added on behalf of the caller before returning.
1612 event_handler
->add_reference ();
1614 return event_handler
;
// Checks whether a handler is registered for @a handle with the masks in
// @a user_masks.
//
// The handler is looked up with handler (handle, existing_masks) and kept
// in an ACE_Event_Handler_var. Each reactor mask the caller asks about is
// then compared against the corresponding Winsock network-event bit:
//   READ_MASK      -> FD_READ or FD_CLOSE
//   WRITE_MASK     -> FD_WRITE
//   EXCEPT_MASK    -> FD_OOB
//   ACCEPT_MASK    -> FD_ACCEPT
//   CONNECT_MASK   -> FD_CONNECT
//   QOS_MASK       -> FD_QOS
//   GROUP_QOS_MASK -> FD_GROUP_QOS
// Finally the handler is released from the _var into
// *user_event_handler.
// NOTE(review): what happens when a bit is missing, the guard around the
// final assignment and the return values are on lines omitted from this
// listing. Presumably the _var drops the lookup's reference unless
// release () is reached — confirm against ACE_Event_Handler_var.
1618 ACE_WFMO_Reactor_Handler_Repository::handler (ACE_HANDLE handle
,
1619 ACE_Reactor_Mask user_masks
,
1620 ACE_Event_Handler
**user_event_handler
)
1622 long existing_masks
= 0;
1625 ACE_Event_Handler_var safe_event_handler
=
1626 this->handler (handle
, existing_masks
);
1628 if (safe_event_handler
.handler ())
1634 // Otherwise, make sure that the masks that the user is looking for
1637 ACE_BIT_ENABLED (user_masks
, ACE_Event_Handler::READ_MASK
))
// Either FD_READ or FD_CLOSE satisfies a READ_MASK query.
1638 if (!ACE_BIT_ENABLED (existing_masks
, FD_READ
) &&
1639 !ACE_BIT_ENABLED (existing_masks
, FD_CLOSE
))
1643 ACE_BIT_ENABLED (user_masks
, ACE_Event_Handler::WRITE_MASK
))
1644 if (!ACE_BIT_ENABLED (existing_masks
, FD_WRITE
))
1648 ACE_BIT_ENABLED (user_masks
, ACE_Event_Handler::EXCEPT_MASK
))
1649 if (!ACE_BIT_ENABLED (existing_masks
, FD_OOB
))
1653 ACE_BIT_ENABLED (user_masks
, ACE_Event_Handler::ACCEPT_MASK
))
1654 if (!ACE_BIT_ENABLED (existing_masks
, FD_ACCEPT
))
1658 ACE_BIT_ENABLED (user_masks
, ACE_Event_Handler::CONNECT_MASK
))
1659 if (!ACE_BIT_ENABLED (existing_masks
, FD_CONNECT
))
1663 ACE_BIT_ENABLED (user_masks
, ACE_Event_Handler::QOS_MASK
))
1664 if (!ACE_BIT_ENABLED (existing_masks
, FD_QOS
))
1667 ACE_BIT_ENABLED (user_masks
, ACE_Event_Handler::GROUP_QOS_MASK
))
1668 if (!ACE_BIT_ENABLED (existing_masks
, FD_GROUP_QOS
))
// Hand the handler over to the caller through the out-parameter.
1673 *user_event_handler
= safe_event_handler
.release ();
1681 // Waits for and dispatches all events. Returns -1 on error, 0 if
1682 // max_wait_time expired, or the number of events that were dispatched.
//
// Each pass of the loop: obtains permission to wait (ok_to_wait), counts
// this thread as active and releases lock_, computes the timeout, waits
// on the registered handles and dispatches the result through
// safe_dispatch (). The loop repeats while the dispatch result is 0.
// A result of 0 ends the call only when @a max_wait_time is null or its
// usec () reads 0.
// @a max_wait_time is wrapped in an ACE_Countdown_Time, so it is reduced
// by the time spent here.
// NOTE(review): the second parameter (used below as `alertable`), the
// `do` keyword and the return statements are on lines omitted from this
// listing.
1684 ACE_WFMO_Reactor::event_handling (ACE_Time_Value
*max_wait_time
,
1687 ACE_TRACE ("ACE_WFMO_Reactor::event_handling");
1689 // Make sure we are not closed
1690 if (!this->open_for_business_
|| this->deactivated_
)
1696 // Stash the current time -- the destructor of this object will
1697 // automatically compute how much time elapsed since this method was
1699 ACE_Countdown_Time
countdown (max_wait_time
);
1704 // Check to see if it is ok to enter ::WaitForMultipleObjects
1705 // This will acquire <this->lock_> on success. On failure, the
1706 // lock will not be acquired
1707 result
= this->ok_to_wait (max_wait_time
, alertable
);
1711 // Increment the number of active threads
1712 ++this->active_threads_
;
1714 // Release the <lock_>
1715 this->lock_
.release ();
1717 // Update the countdown to reflect time waiting to play with the
1719 countdown
.update ();
1721 // Calculate timeout
1722 int timeout
= this->calculate_timeout (max_wait_time
);
1724 // Wait for event to happen
1725 DWORD wait_status
= this->wait_for_multiple_events (timeout
,
1729 result
= this->safe_dispatch (wait_status
);
1732 // wait_for_multiple_events timed out without dispatching
1733 // anything. Because of rounding and conversion errors and
1734 // such, it could be that the wait loop timed out, but
1735 // the timer queue said it wasn't quite ready to expire a
1736 // timer. In this case, max_wait_time won't have quite been
1737 // reduced to 0, and we need to go around again. If max_wait_time
1738 // is all the way to 0, just return, as the entire time the
1739 // caller wanted to wait has been used up.
1740 countdown
.update (); // Reflect time waiting for events
1741 if (0 == max_wait_time
|| max_wait_time
->usec () == 0)
1745 while (result
== 0);
// Blocks until this thread may enter the main event wait.
//
// Waits on the handles in atomic_wait_array_ with
// ::WaitForMultipleObjectsEx. A null @a max_wait_time means wait forever
// (INFINITE); otherwise the wait is bounded by max_wait_time->msec ().
// A WAIT_IO_COMPLETION result is tested separately, and for
// WAIT_ABANDONED_0 errno is set from the last OS error.
// NOTE(review): the second parameter, the remaining
// ::WaitForMultipleObjectsEx arguments, the other switch cases and the
// return statements are on lines omitted from this listing.
1751 ACE_WFMO_Reactor::ok_to_wait (ACE_Time_Value
*max_wait_time
,
1754 // Calculate the max time we should spend here
1756 // Note: There is really no need to involve the <timer_queue_> here
1757 // because even if a timeout in the <timer_queue_> does expire we
1758 // will not be able to dispatch it
1760 // We need to wait for both the <lock_> and <ok_to_wait_> event.
1761 // Use WaitForMultipleObjects() to wait for both atomically.
1762 int timeout
= max_wait_time
== 0 ? INFINITE
: max_wait_time
->msec ();
// The handle count is derived from the array size rather than hard-coded.
1766 result
= ::WaitForMultipleObjectsEx (sizeof this->atomic_wait_array_
/ sizeof (ACE_HANDLE
),
1767 this->atomic_wait_array_
,
1772 if (result
!= WAIT_IO_COMPLETION
)
1782 case WAIT_ABANDONED_0
:
1783 ACE_OS::set_errno_to_last_error ();
1789 // It is ok to enter ::WaitForMultipleObjects
// Performs the main wait: calls ::WaitForMultipleObjectsEx over the
// repository's handle array (max_handlep1 () entries) and returns its
// result unchanged.
// NOTE(review): the second parameter and the remaining call arguments are
// on lines omitted from this listing.
1794 ACE_WFMO_Reactor::wait_for_multiple_events (int timeout
,
1797 // Wait for any of handles_ to be active, or until timeout expires.
1798 // If <alertable> is enabled allow asynchronous completion of
1799 // ReadFile and WriteFile operations.
1800 return ::WaitForMultipleObjectsEx (this->handler_rep_
.max_handlep1 (),
1801 this->handler_rep_
.handles (),
// Re-checks only the tail of the handle array: waits on the handles from
// index @a slot up to max_handlep1 () with ::WaitForMultipleObjects and
// returns its result unchanged.
// NOTE(review): the wait-all and timeout arguments are on lines omitted
// from this listing.
1808 ACE_WFMO_Reactor::poll_remaining_handles (DWORD slot
)
1810 return ::WaitForMultipleObjects (this->handler_rep_
.max_handlep1 () - slot
,
1811 this->handler_rep_
.handles () + slot
,
// Chooses the timeout for the next wait.
//
// Only the owner thread consults the timer queue, through
// timer_queue_->calculate_timeout (max_wait_time). Every other thread
// uses @a max_wait_time as given. The chosen value is returned in
// milliseconds.
// NOTE(review): the `else` and whatever is returned when the chosen time
// is null are on lines omitted from this listing.
1817 ACE_WFMO_Reactor::calculate_timeout (ACE_Time_Value
*max_wait_time
)
1819 ACE_Time_Value
*time
= 0;
1820 if (this->owner_
== ACE_Thread::self ())
1821 time
= this->timer_queue_
->calculate_timeout (max_wait_time
);
1823 time
= max_wait_time
;
1828 return time
->msec ();
// Expires pending timers, but only when called on the owner thread; in
// that case the result of timer_queue_->expire () is returned.
// NOTE(review): the value returned on non-owner threads is on a line
// omitted from this listing.
1832 ACE_WFMO_Reactor::expire_timers ()
1834 // If "owner" thread
1835 if (ACE_Thread::self () == this->owner_
)
1837 // expire all pending timers.
1838 return this->timer_queue_
->expire ();
1841 // Nothing to expire
// Turns the result of the main wait into upcalls.
//
// Timers are expired first, and their count seeds the return value.
// Then, depending on @a wait_status:
//   WAIT_FAILED        - errno is set from the last OS error;
//   WAIT_TIMEOUT,
//   WAIT_IO_COMPLETION - only the timer count is returned;
//   anything else      - dispatch_handles () is run and its result is
//                        added to the timer count.
// NOTE(review): the return statement of the WAIT_FAILED case is on a line
// omitted from this listing.
1846 ACE_WFMO_Reactor::dispatch (DWORD wait_status
)
1849 int handlers_dispatched
= this->expire_timers ();
1851 switch (wait_status
)
1853 case WAIT_FAILED
: // Failure.
1854 ACE_OS::set_errno_to_last_error ();
1856 case WAIT_TIMEOUT
: // Timeout.
1858 return handlers_dispatched
;
1859 case WAIT_IO_COMPLETION
: // APC.
1860 return handlers_dispatched
;
1861 default: // Dispatch.
1862 // We'll let dispatch worry about abandoned mutexes.
1863 handlers_dispatched
+= this->dispatch_handles (wait_status
);
1864 return handlers_dispatched
;
1868 // Dispatches any active handles from <handles_[slot]> to
1869 // <handles_[max_handlep1_]>, polling through our handle set looking
1870 // for active handles.
//
// @a wait_status is converted to an absolute slot by subtracting
// WAIT_OBJECT_0 (signaled) or WAIT_ABANDONED_0 (abandoned) and adding the
// remainder to the running dispatch_slot. After dispatching that slot the
// remaining handles (from dispatch_slot onward) are polled again. The
// loop counter starts at 1 and is the value returned once the slot runs
// past the end or no further handle is ready.
// NOTE(review): the loop condition, the increment of dispatch_slot after
// a dispatch, the if/else keywords around the two `+=` lines and some
// return statements are on lines omitted from this listing.
1872 ACE_WFMO_Reactor::dispatch_handles (DWORD wait_status
)
1874 // dispatch_slot is the absolute slot. Only += is used to
1876 DWORD dispatch_slot
= 0;
1878 // Cache this value, this is the absolute value.
1879 DWORD
const max_handlep1
= this->handler_rep_
.max_handlep1 ();
1881 // nCount starts off at <max_handlep1>, this is a transient count of
1882 // handles last waited on.
1883 DWORD nCount
= max_handlep1
;
1885 for (int number_of_handlers_dispatched
= 1;
1887 ++number_of_handlers_dispatched
)
1890 #if ! defined(__BORLANDC__) \
1891 && !defined (__MINGW32__) \
1892 && !defined (_MSC_VER)
1893 // wait_status is unsigned in Borland, Green Hills,
1894 // mingw32 and MSVC++
1895 // This >= is always true, with a warning.
1896 wait_status
>= WAIT_OBJECT_0
&&
1898 wait_status
<= (WAIT_OBJECT_0
+ nCount
));
// wait_status is relative to the sub-array last waited on, so the
// offset is accumulated onto the absolute slot.
1901 dispatch_slot
+= wait_status
- WAIT_OBJECT_0
;
1903 // Otherwise, a handle was abandoned.
1904 dispatch_slot
+= wait_status
- WAIT_ABANDONED_0
;
1907 if (this->dispatch_handler (dispatch_slot
, max_handlep1
) == -1)
// Nothing left to poll once the slot is past the last handle.
1914 if (dispatch_slot
>= max_handlep1
)
1915 return number_of_handlers_dispatched
;
1918 nCount
= max_handlep1
- dispatch_slot
;
1920 // Check the remaining handles
1921 wait_status
= this->poll_remaining_handles (dispatch_slot
);
1922 switch (wait_status
)
1924 case WAIT_FAILED
: // Failure.
1925 ACE_OS::set_errno_to_last_error ();
1928 // There are no more handles ready, we can return.
1929 return number_of_handlers_dispatched
;
// Dispatches the single handle at @a slot.
//
// A slot equal to max_handlep1 (one past the last handle) means window
// messages are pending, and dispatch_window_messages () is returned.
// Otherwise, unless the slot is scheduled for deletion, the entry's
// io_entry_ flag selects the dispatch routine:
// complex_dispatch_handler () for I/O entries and
// simple_dispatch_handler () for the rest.
// NOTE(review): the second parameter's declaration, the second argument
// of both dispatch calls, the `else` between them and the return for the
// skipped case are on lines omitted from this listing.
1935 ACE_WFMO_Reactor::dispatch_handler (DWORD slot
,
1938 // Check if there are window messages that need to be dispatched
1939 if (slot
== max_handlep1
)
1940 return this->dispatch_window_messages ();
1942 // Dispatch the handler if it has not been scheduled for deletion.
1943 // Note that this is a very weak test if there are multiple threads
1944 // dispatching this slot as no locks are held here. Generally, you
1945 // do not want to do something like deleting the this pointer in
1946 // handle_close() if you have registered multiple times and there is
1947 // more than one thread in WFMO_Reactor->handle_events().
1948 else if (!this->handler_rep_
.scheduled_for_deletion (slot
))
// The event handle waited on for this slot.
1950 ACE_HANDLE event_handle
= *(this->handler_rep_
.handles () + slot
);
1952 if (this->handler_rep_
.current_info ()[slot
].io_entry_
)
1953 return this->complex_dispatch_handler (slot
,
1956 return this->simple_dispatch_handler (slot
,
1960 // The handle was scheduled for deletion, so we will skip it.
// Dispatches a non-I/O entry by calling the handler's handle_signal ().
//
// The signaled @a event_handle is passed to the handler inside a
// siginfo_t. If handle_signal () returns -1 the handle is unbound from
// the repository with ACE_Event_Handler::NULL_MASK. When the handler's
// reference-counting policy is ENABLED, a reference is added before the
// upcall and removed after it.
// NOTE(review): the return statement is on a line omitted from this
// listing.
1965 ACE_WFMO_Reactor::simple_dispatch_handler (DWORD slot
,
1966 ACE_HANDLE event_handle
)
1968 // This dispatch is used for non-I/O entries
1970 // Assign the ``signaled'' HANDLE so that callers can get it.
1971 // siginfo_t is an ACE - specific fabrication. Constructor exists.
1972 siginfo_t
sig (event_handle
);
1974 ACE_Event_Handler
*event_handler
=
1975 this->handler_rep_
.current_info ()[slot
].event_handler_
;
1977 int requires_reference_counting
=
1978 event_handler
->reference_counting_policy ().value () ==
1979 ACE_Event_Handler::Reference_Counting_Policy::ENABLED
;
// Call add_reference() if needed.
1981 if (requires_reference_counting
)
1983 event_handler
->add_reference ();
// Upcall; a -1 result unregisters the handle.
1987 if (event_handler
->handle_signal (0, &sig
) == -1)
1988 this->handler_rep_
.unbind (event_handle
,
1989 ACE_Event_Handler::NULL_MASK
);
1991 // Call remove_reference() if needed.
1992 if (requires_reference_counting
)
1994 event_handler
->remove_reference ();
// Dispatches an I/O (socket) entry.
//
// ::WSAEnumNetworkEvents is queried for the socket of the entry at
// @a slot. If that fails, every event mask is treated as a problem.
// Otherwise the reported events are restricted to those the registration
// asked for, and upcall () is invoked repeatedly while any event bits
// remain; upcall () rewrites events.lNetworkEvents with the bits that
// requested a repeat callback. When the handler's reference-counting
// policy is ENABLED, a reference is held around every upcall. The
// scheduled-for-deletion state of the slot is re-checked after each
// upcall. Finally, if problems were reported and the slot is not already
// scheduled for deletion, the handle is unbound with the problem masks.
//
// @param slot          Index of the entry in the current info table.
// @param event_handle  Event handle that was signaled for this slot.
// NOTE(review): the second ::WSAEnumNetworkEvents argument, the `else`,
// the third upcall () argument, the action taken when the slot turns out
// to be scheduled for deletion, and the return statement are on lines
// omitted from this listing.
2001 ACE_WFMO_Reactor::complex_dispatch_handler (DWORD slot
,
2002 ACE_HANDLE event_handle
)
2004 // This dispatch is used for I/O entries.
// Reference to the repository entry (not a copy), so fields changed by
// the upcalls are seen here.
2006 ACE_WFMO_Reactor_Handler_Repository::Current_Info
&current_info
=
2007 this->handler_rep_
.current_info ()[slot
];
2009 WSANETWORKEVENTS events
;
2010 ACE_Reactor_Mask problems
= ACE_Event_Handler::NULL_MASK
;
2011 if (::WSAEnumNetworkEvents ((SOCKET
) current_info
.io_handle_
,
2013 &events
) == SOCKET_ERROR
)
2014 problems
= ACE_Event_Handler::ALL_EVENTS_MASK
;
2017 // Prepare for upcalls. Clear the bits from <events> representing
2018 // events the handler is not interested in. If there are any left,
2019 // do the upcall(s). upcall will replace events.lNetworkEvents
2020 // with bits representing any functions that requested a repeat
2021 // callback before checking handles again. In this case, continue
2022 // to call back unless the handler is unregistered as a result of
2023 // one of the upcalls. The way this is written, the upcalls will
2024 // keep being done even if one or more upcalls reported problems.
2025 // In practice this may turn out not so good, but let's see. If any
2026 // problems, please notify Steve Huston <shuston@riverace.com>
2027 // before or after you change this code.
2028 events
.lNetworkEvents
&= current_info
.network_events_
;
2029 while (events
.lNetworkEvents
!= 0)
2031 ACE_Event_Handler
*event_handler
=
2032 current_info
.event_handler_
;
2034 int reference_counting_required
=
2035 event_handler
->reference_counting_policy ().value () ==
2036 ACE_Event_Handler::Reference_Counting_Policy::ENABLED
;
2038 // Call add_reference() if needed.
2039 if (reference_counting_required
)
2041 event_handler
->add_reference ();
2045 problems
|= this->upcall (current_info
.event_handler_
,
2046 current_info
.io_handle_
,
2049 // Call remove_reference() if needed.
2050 if (reference_counting_required
)
2052 event_handler
->remove_reference ();
2055 if (this->handler_rep_
.scheduled_for_deletion (slot
))
// Unregister the masks that reported problems, unless the entry is
// already on its way out.
2060 if (problems
!= ACE_Event_Handler::NULL_MASK
2061 && !this->handler_rep_
.scheduled_for_deletion (slot
) )
2062 this->handler_rep_
.unbind (event_handle
, problems
);
// Performs the handle_* upcalls for one socket.
//
// Each network-event bit set in events.lNetworkEvents is mapped to a
// handler hook:
//   FD_WRITE      -> handle_output ()
//   FD_CONNECT    -> handle_output () when iErrorCode[FD_CONNECT_BIT] is
//                    0 (success), otherwise handle_input ()
//   FD_OOB        -> handle_exception ()
//   FD_READ       -> handle_input ()
//   FD_CLOSE      -> handle_input (), but only if READ_MASK has not
//                    already been recorded as a problem
//   FD_ACCEPT     -> handle_input ()
//   FD_QOS        -> handle_qos ()
//   FD_GROUP_QOS  -> handle_group_qos ()
// Depending on each hook's result, the event bit is cleared from the
// working copy and/or the matching reactor mask is added to `problems`.
// The working copy is written back to events.lNetworkEvents at the end,
// which is how a repeat callback is requested.
//
// @param event_handler  Handler to call.
// @param io_handle      Socket handle passed to every hook.
// @param events         In: events that occurred. Out: events for which a
//                       repeat callback was requested.
// NOTE(review): the declaration of `action`, the tests on its value that
// choose between clearing a bit and recording a problem, and the return
// statement are on lines omitted from this listing.
2068 ACE_WFMO_Reactor::upcall (ACE_Event_Handler
*event_handler
,
2069 ACE_HANDLE io_handle
,
2070 WSANETWORKEVENTS
&events
)
2072 // This method figures out what exactly has happened to the socket
2073 // and then calls appropriate methods.
2074 ACE_Reactor_Mask problems
= ACE_Event_Handler::NULL_MASK
;
2076 // Go through the events and do the indicated upcalls. If the handler
2077 // doesn't want to be called back, clear the bit for that event.
2078 // At the end, set the bits back to <events> to request a repeat call.
2080 long actual_events
= events
.lNetworkEvents
;
2083 if (ACE_BIT_ENABLED (actual_events
, FD_WRITE
))
2085 action
= event_handler
->handle_output (io_handle
);
2088 ACE_CLR_BITS (actual_events
, FD_WRITE
);
2090 ACE_SET_BITS (problems
, ACE_Event_Handler::WRITE_MASK
);
2094 if (ACE_BIT_ENABLED (actual_events
, FD_CONNECT
))
2096 if (events
.iErrorCode
[FD_CONNECT_BIT
] == 0)
2098 // Successful connect
2099 action
= event_handler
->handle_output (io_handle
);
2102 ACE_CLR_BITS (actual_events
, FD_CONNECT
);
2104 ACE_SET_BITS (problems
,
2105 ACE_Event_Handler::CONNECT_MASK
);
2108 // Unsuccessful connect
2111 action
= event_handler
->handle_input (io_handle
);
2114 ACE_CLR_BITS (actual_events
, FD_CONNECT
);
2116 ACE_SET_BITS (problems
,
2117 ACE_Event_Handler::CONNECT_MASK
);
2122 if (ACE_BIT_ENABLED (actual_events
, FD_OOB
))
2124 action
= event_handler
->handle_exception (io_handle
);
2127 ACE_CLR_BITS (actual_events
, FD_OOB
);
2129 ACE_SET_BITS (problems
, ACE_Event_Handler::EXCEPT_MASK
);
2133 if (ACE_BIT_ENABLED (actual_events
, FD_READ
))
2135 action
= event_handler
->handle_input (io_handle
);
2138 ACE_CLR_BITS (actual_events
, FD_READ
);
2140 ACE_SET_BITS (problems
, ACE_Event_Handler::READ_MASK
);
// FD_CLOSE also goes to handle_input (), but is skipped when the read
// path has already been flagged as a problem above.
2144 if (ACE_BIT_ENABLED (actual_events
, FD_CLOSE
)
2145 && ACE_BIT_DISABLED (problems
, ACE_Event_Handler::READ_MASK
))
2147 action
= event_handler
->handle_input (io_handle
);
2150 ACE_CLR_BITS (actual_events
, FD_CLOSE
);
2152 ACE_SET_BITS (problems
, ACE_Event_Handler::READ_MASK
);
2156 if (ACE_BIT_ENABLED (actual_events
, FD_ACCEPT
))
2158 action
= event_handler
->handle_input (io_handle
);
2161 ACE_CLR_BITS (actual_events
, FD_ACCEPT
);
2163 ACE_SET_BITS (problems
, ACE_Event_Handler::ACCEPT_MASK
);
2167 if (ACE_BIT_ENABLED (actual_events
, FD_QOS
))
2169 action
= event_handler
->handle_qos (io_handle
);
2172 ACE_CLR_BITS (actual_events
, FD_QOS
);
2174 ACE_SET_BITS (problems
, ACE_Event_Handler::QOS_MASK
);
2178 if (ACE_BIT_ENABLED (actual_events
, FD_GROUP_QOS
))
2180 action
= event_handler
->handle_group_qos (io_handle
);
2183 ACE_CLR_BITS (actual_events
, FD_GROUP_QOS
);
2185 ACE_SET_BITS (problems
, ACE_Event_Handler::GROUP_QOS_MASK
);
// Hand the still-set bits back so the caller can request a repeat.
2189 events
.lNetworkEvents
= actual_events
;
// Called by a thread leaving the event wait; runs under lock_.
//
// The thread first removes itself from active_threads_. If the handler
// repository has pending changes or a new owner has been requested:
//   - the first thread to find change_state_thread_ == 0 becomes the
//     "change thread": it resets ok_to_wait_ so no new thread enters. If
//     other threads are still active it signals wakeup_all_threads_ and
//     waits on waiting_to_change_state_. It then applies repository
//     changes until none remain, changes the owner if requested, resets
//     wakeup_all_threads_, signals ok_to_wait_ and gives up the
//     change-thread role;
//   - any other thread that is the last one out signals
//     waiting_to_change_state_ to release the change thread.
// With no state change pending, the last thread out just resets
// wakeup_all_threads_.
// NOTE(review): the statements that release and re-acquire lock_ around
// the wait, and the return statement, are on lines omitted from this
// listing.
2195 ACE_WFMO_Reactor::update_state ()
2197 // This GUARD is necessary since we are updating shared state.
2198 ACE_GUARD_RETURN (ACE_Process_Mutex
, monitor
, this->lock_
, -1);
2200 // Decrement active threads
2201 --this->active_threads_
;
2203 // Check if the state of the handler repository has changed or new
2204 // owner has to be set
2205 if (this->handler_rep_
.changes_required () || this->new_owner ())
2207 if (this->change_state_thread_
== 0)
2208 // Try to become the thread which will be responsible for the
2211 this->change_state_thread_
= ACE_Thread::self ();
2212 // Make sure no new threads are allowed to enter
2213 this->ok_to_wait_
.reset ();
2215 if (this->active_threads_
> 0)
2216 // Check for other active threads
2218 // Wake up all other threads
2219 this->wakeup_all_threads_
.signal ();
2222 // Go to sleep waiting for all other threads to get done
2223 this->waiting_to_change_state_
.wait ();
2224 // Re-acquire <lock_> again
2228 // Note that make_changes() calls into user code which can
2229 // request other changes. So keep looping until all
2230 // requested changes are completed.
2231 while (this->handler_rep_
.changes_required ())
2232 // Make necessary changes to the handler repository
2233 this->handler_rep_
.make_changes ();
2234 if (this->new_owner ())
2236 this->change_owner ();
2237 // Turn off <wakeup_all_threads_>
2238 this->wakeup_all_threads_
.reset ();
2239 // Let everyone know that it is ok to go ahead
2240 this->ok_to_wait_
.signal ();
// Give up the change-thread role so a later update can claim it.
2242 this->change_state_thread_
= 0;
2244 else if (this->active_threads_
== 0)
2245 // This thread did not get a chance to become the change
2246 // thread. If it is the last one out, it will wakeup the
2248 this->waiting_to_change_state_
.signal ();
2250 // This is if we were woken up explicitly by the user and there are
2251 // no state changes required.
2252 else if (this->active_threads_
== 0)
2253 // Turn off <wakeup_all_threads_>
2254 this->wakeup_all_threads_
.reset ();
// Debug aid. When ACE_HAS_DUMP is defined, logs the active-thread count
// and the owner thread id, then delegates to the dump () methods of the
// handler repository, the signal handler and the timer queue.
// NOTE(review): signal_handler_ and timer_queue_ are dereferenced without
// a null check in the visible lines, and close () sets both to 0 —
// confirm dump () is never called on a closed reactor.
2260 ACE_WFMO_Reactor::dump () const
2262 #if defined (ACE_HAS_DUMP)
2263 ACE_TRACE ("ACE_WFMO_Reactor::dump");
2265 ACELIB_DEBUG ((LM_DEBUG
, ACE_BEGIN_DUMP
, this));
2267 ACELIB_DEBUG ((LM_DEBUG
,
2268 ACE_TEXT ("Count of currently active threads = %d\n"),
2269 this->active_threads_
));
2271 ACELIB_DEBUG ((LM_DEBUG
,
2272 ACE_TEXT ("ID of owner thread = %d\n"),
// Let each component describe itself.
2275 this->handler_rep_
.dump ();
2276 this->signal_handler_
->dump ();
2277 this->timer_queue_
->dump ();
2279 ACELIB_DEBUG ((LM_DEBUG
, ACE_END_DUMP
));
2280 #endif /* ACE_HAS_DUMP */
2284 ACE_WFMO_Reactor_Notify::dispatch_notifications (int & /*number_of_active_handles*/,
2285 ACE_Handle_Set
& /*rd_mask*/)
2291 ACE_WFMO_Reactor_Notify::is_dispatchable (ACE_Notification_Buffer
& /*buffer*/)
// Always returns ACE_INVALID_HANDLE: this notify implementation exposes
// no notification handle through this accessor.
2297 ACE_WFMO_Reactor_Notify::notify_handle ()
2299 return ACE_INVALID_HANDLE
;
2303 ACE_WFMO_Reactor_Notify::read_notify_pipe (ACE_HANDLE
,
2304 ACE_Notification_Buffer
&)
2310 ACE_WFMO_Reactor_Notify::dispatch_notify (ACE_Notification_Buffer
&)
2316 ACE_WFMO_Reactor_Notify::close ()
// Constructs the notify handler.
//
// The internal message queue is constructed with two size arguments, both
// equal to @a max_notifies * sizeof (ACE_Notification_Buffer), so its
// limits are expressed in bytes for @a max_notifies buffers (presumably
// the high and low water marks — confirm against the queue's
// constructor). max_notify_iterations_ starts at -1.
// NOTE(review): the first member initializer is on a line omitted from
// this listing.
2321 ACE_WFMO_Reactor_Notify::ACE_WFMO_Reactor_Notify (size_t max_notifies
)
2323 message_queue_ (max_notifies
* sizeof (ACE_Notification_Buffer
),
2324 max_notifies
* sizeof (ACE_Notification_Buffer
)),
2325 max_notify_iterations_ (-1)
// Attaches the notify handler to a reactor.
//
// Remembers @a timer_queue and registers this object with
// @a wfmo_reactor as an event handler, returning the result of that
// registration. The `ignore_notify` parameter is explicitly unused.
// NOTE(review): the declaration of `ignore_notify` is on a line omitted
// from this listing.
2330 ACE_WFMO_Reactor_Notify::open (ACE_Reactor_Impl
*wfmo_reactor
,
2331 ACE_Timer_Queue
*timer_queue
,
2334 ACE_UNUSED_ARG (ignore_notify
);
2335 timer_queue_
= timer_queue
;
2336 return wfmo_reactor
->register_handler (this);
// Returns the handle of the wakeup_one_thread_ event, i.e. the event that
// notify() signals and that handle_signal() checks against.
2340 ACE_WFMO_Reactor_Notify::get_handle () const
2342 return this->wakeup_one_thread_
.handle ();
2345 // Handle all pending notifications.
//
// Drains message_queue_: each queued ACE_Notification_Buffer names an
// event handler (eh_) and a mask (mask_); the mask selects which
// handle_* callback is invoked on the handler, always with
// ACE_INVALID_HANDLE as the handle argument.  @a signum is unused and the
// ucontext_t argument is unnamed.
2348 ACE_WFMO_Reactor_Notify::handle_signal (int signum
, siginfo_t
*siginfo
, ucontext_t
*)
2350 ACE_UNUSED_ARG (signum
);
2352 // Just check for sanity...
// The handle reported in siginfo must be our own wakeup event.
2353 if (siginfo
->si_handle_
!= this->wakeup_one_thread_
.handle ())
2356 // This will get called when <WFMO_Reactor->wakeup_one_thread_> event
2358 // ACELIB_DEBUG ((LM_DEBUG,
2359 // ACE_TEXT ("(%t) waking up to handle internal notifications\n")));
// The counter is 1-based so that it can be compared for equality with
// max_notify_iterations_ at the bottom of the loop.
2361 for (int i
= 1; ; ++i
)
2363 ACE_Message_Block
*mb
= 0;
2364 // Copy ACE_Time_Value::zero since dequeue_head will modify it.
2365 ACE_Time_Value
zero_timeout (ACE_Time_Value::zero
);
// Dequeue with a zero timeout; EWOULDBLOCK is the "queue is empty"
// outcome, any other failure is reported as an error.
2366 if (this->message_queue_
.dequeue_head (mb
, &zero_timeout
) == -1)
2368 if (errno
== EWOULDBLOCK
)
2369 // We've reached the end of the processing, return
2373 return -1; // Something weird happened...
// The payload of the message block is the notification buffer that
// notify() filled in.
2377 ACE_Notification_Buffer
*buffer
=
2378 reinterpret_cast <ACE_Notification_Buffer
*> (mb
->base ());
2380 // If eh == 0 then we've got major problems! Otherwise, we
2381 // need to dispatch the appropriate handle_* method on the
2382 // ACE_Event_Handler pointer we've been passed.
2384 if (buffer
->eh_
!= 0)
2386 ACE_Event_Handler
*event_handler
=
// Sample the reference counting policy before the upcall; the flag is
// consulted after the callback to decide whether to drop a reference.
2389 bool const requires_reference_counting
=
2390 event_handler
->reference_counting_policy ().value () ==
2391 ACE_Event_Handler::Reference_Counting_Policy::ENABLED
;
// Exact-match dispatch on the mask: READ and ACCEPT share handle_input();
// a value that matches no case is logged as "invalid mask".
2395 switch (buffer
->mask_
)
2397 case ACE_Event_Handler::READ_MASK
:
2398 case ACE_Event_Handler::ACCEPT_MASK
:
2399 result
= event_handler
->handle_input (ACE_INVALID_HANDLE
);
2401 case ACE_Event_Handler::WRITE_MASK
:
2402 result
= event_handler
->handle_output (ACE_INVALID_HANDLE
);
2404 case ACE_Event_Handler::EXCEPT_MASK
:
2405 result
= event_handler
->handle_exception (ACE_INVALID_HANDLE
);
2407 case ACE_Event_Handler::QOS_MASK
:
2408 result
= event_handler
->handle_qos (ACE_INVALID_HANDLE
);
2410 case ACE_Event_Handler::GROUP_QOS_MASK
:
2411 result
= event_handler
->handle_group_qos (ACE_INVALID_HANDLE
);
2414 ACELIB_ERROR ((LM_ERROR
,
2415 ACE_TEXT ("invalid mask = %d\n"),
2421 event_handler
->handle_close (ACE_INVALID_HANDLE
,
2422 ACE_Event_Handler::EXCEPT_MASK
);
// Presumably the counterpart of the add_reference() call made in
// notify() when the notification was queued.
2424 if (requires_reference_counting
)
2426 event_handler
->remove_reference ();
2430 // Make sure to delete the memory regardless of success or
2434 // Bail out if we've reached the <max_notify_iterations_>.
2435 // Note that by default <max_notify_iterations_> is -1, so
2436 // we'll loop until we're done.
2437 if (i
== this->max_notify_iterations_
)
2439 // If there are still notifications in the queue, we need
// Re-signal the wakeup event so the leftover entries get another
// handle_signal() call.
2441 if (!this->message_queue_
.is_empty ())
2442 this->wakeup_one_thread_
.signal ();
2444 // Break the loop as we have reached max_notify_iterations_
2451 // Notify the WFMO_Reactor, potentially enqueueing the
2452 // <ACE_Event_Handler> for subsequent processing in the WFMO_Reactor
2453 // thread of control.
//
// @param event_handler  Handler to call back later; when it is non-null a
//                       message block carrying {eh_, mask_} is queued.
// @param mask           Stored in the buffer; handle_signal() uses it to
//                       pick the handle_* callback.
// @param timeout        Passed to enqueue_tail().  Note that the caller's
//                       ACE_Time_Value is modified in place by the
//                       relative-to-absolute conversion below.
// @return The result of signaling wakeup_one_thread_ on the path that
//         reaches the final statement.
2456 ACE_WFMO_Reactor_Notify::notify (ACE_Event_Handler
*event_handler
,
2457 ACE_Reactor_Mask mask
,
2458 ACE_Time_Value
*timeout
)
2460 if (event_handler
!= 0)
2462 ACE_Message_Block
*mb
= 0;
// The block is sized to hold exactly one ACE_Notification_Buffer.
2464 ACE_Message_Block (sizeof (ACE_Notification_Buffer
)),
// Treat the block's storage as the notification buffer and fill it in.
2467 ACE_Notification_Buffer
*buffer
=
2468 (ACE_Notification_Buffer
*) mb
->base ();
2469 buffer
->eh_
= event_handler
;
2470 buffer
->mask_
= mask
;
2472 // Convert from relative time to absolute time by adding the
2473 // current time of day. This is what <ACE_Message_Queue>
2476 *timeout
+= timer_queue_
->gettimeofday ();
2478 if (this->message_queue_
.enqueue_tail
2479 (mb
, timeout
) == -1)
// The queued notification refers to the handler; handle_signal() and
// purge_pending_notifications() contain the matching remove_reference()
// calls.
2485 event_handler
->add_reference ();
// Signal the wakeup event so that handle_signal() gets invoked.
2488 return this->wakeup_one_thread_
.signal ();
// Sets the limit on how many notifications one handle_signal() call
// dispatches.  handle_signal() compares a counter that starts at 1 for
// equality with this value, which is why zero gets special treatment
// below: a stored 0 could never match and would not act as a limit.
2492 ACE_WFMO_Reactor_Notify::max_notify_iterations (int iterations
)
2494 ACE_TRACE ("ACE_WFMO_Reactor_Notify::max_notify_iterations");
2495 // Must always be > 0 or < 0 to optimize the loop exit condition.
2496 if (iterations
== 0)
2499 this->max_notify_iterations_
= iterations
;
// Returns the current per-call dispatch limit (max_notify_iterations_).
// The constructor initializes it to -1.
2503 ACE_WFMO_Reactor_Notify::max_notify_iterations ()
2505 ACE_TRACE ("ACE_WFMO_Reactor_Notify::max_notify_iterations");
2506 return this->max_notify_iterations_
;
// Removes queued notifications that target @a eh (or any handler when
// @a eh is 0) and whose mask is fully covered by @a mask.  Matching
// notifications that are only partly covered stay queued with the bits of
// @a mask cleared.  Returns number_purged on success; -1 is returned when
// the queue lock cannot be taken or a queue operation fails.
2510 ACE_WFMO_Reactor_Notify::purge_pending_notifications (ACE_Event_Handler
*eh
,
2511 ACE_Reactor_Mask mask
)
2513 ACE_TRACE ("ACE_WFMO_Reactor_Notify::purge_pending_notifications");
2515 // Go over message queue and take out all the matching event
2516 // handlers. If eh == 0, purge all. Note that reactor notifies (no
2517 // handler specified) are never purged, as this may lose a needed
2518 // notify the reactor queued for itself.
// Cheap early check made before the queue lock is taken.
2520 if (this->message_queue_
.is_empty ())
2523 // Guard against new and/or delivered notifications while purging.
2524 // WARNING!!! The use of the notification queue's lock object for
2525 // this guard makes use of the knowledge that on Win32, the mutex
2526 // protecting the queue is really a CriticalSection, which is
2527 // recursive. This is how we can get away with locking it down here
2528 // and still calling member functions on the queue object.
2529 ACE_GUARD_RETURN (ACE_SYNCH_MUTEX
, monitor
, this->message_queue_
.lock(), -1);
2531 // first, copy all to our own local queue. Since we've locked everyone out
2532 // of here, there's no need to use any synchronization on this queue.
2533 ACE_Message_Queue
<ACE_NULL_SYNCH
> local_queue
;
// Snapshot the count once; the loop below removes exactly that many
// entries from the class queue.
2535 size_t queue_size
= this->message_queue_
.message_count ();
2536 int number_purged
= 0;
2540 for (index
= 0; index
< queue_size
; ++index
)
2542 ACE_Message_Block
*mb
= 0;
2543 if (-1 == this->message_queue_
.dequeue_head (mb
))
2544 return -1; // This shouldn't happen...
2546 ACE_Notification_Buffer
*buffer
=
2547 reinterpret_cast<ACE_Notification_Buffer
*> (mb
->base ());
2549 // If this is not a Reactor notify (it is for a particular handler),
2550 // and it matches the specified handler (or purging all),
2551 // and applying the mask would totally eliminate the notification, then
2552 // release it and count the number purged.
2553 if ((0 != buffer
->eh_
) &&
2554 (0 == eh
|| eh
== buffer
->eh_
) &&
2555 ACE_BIT_DISABLED (buffer
->mask_
, ~mask
)) // the existing notification mask
2556 // is left with nothing when
2557 // applying the mask
2559 ACE_Event_Handler
*event_handler
= buffer
->eh_
;
// Presumably balances the add_reference() made in notify() when this
// notification was queued.
2561 event_handler
->remove_reference ();
2568 // To preserve it, move it to the local_queue. But first, if
2569 // this is not a Reactor notify (it is for a
2570 // particular handler), and it matches the specified handler
2571 // (or purging all), then apply the mask
2572 if ((0 != buffer
->eh_
) &&
2573 (0 == eh
|| eh
== buffer
->eh_
))
2574 ACE_CLR_BITS(buffer
->mask_
, mask
);
// Inserting at the head reverses the order inside local_queue; the
// copy-back loop further down inserts at the head again, which restores
// the original order.
2575 if (-1 == local_queue
.enqueue_head (mb
))
2580 if (this->message_queue_
.message_count ())
2581 { // Should be empty!
2586 // Now copy back from the local queue to the class queue, taking
2587 // care to preserve the original order...
2588 queue_size
= local_queue
.message_count ();
2589 for (index
= 0; index
< queue_size
; ++index
)
2591 ACE_Message_Block
*mb
= 0;
2592 if (-1 == local_queue
.dequeue_head (mb
))
2598 if (-1 == this->message_queue_
.enqueue_head (mb
))
2605 return number_purged
;
// Debug aid.  When ACE_HAS_DUMP is defined this dumps the timer queue and
// logs max_notify_iterations_; otherwise nothing between the #if/#endif
// pair is compiled.
2609 ACE_WFMO_Reactor_Notify::dump () const
2611 #if defined (ACE_HAS_DUMP)
2612 ACE_TRACE ("ACE_WFMO_Reactor_Notify::dump");
2613 ACELIB_DEBUG ((LM_DEBUG
, ACE_BEGIN_DUMP
, this));
2614 this->timer_queue_
->dump ();
2615 ACELIB_DEBUG ((LM_DEBUG
,
2616 ACE_TEXT ("Max. iteration: %d\n"),
2617 this->max_notify_iterations_
));
2618 ACELIB_DEBUG ((LM_DEBUG
, ACE_END_DUMP
));
2619 #endif /* ACE_HAS_DUMP */
// Reactor-level setter: takes lock_ and forwards @a iterations to the
// notify handler's max_notify_iterations().
2623 ACE_WFMO_Reactor::max_notify_iterations (int iterations
)
2625 ACE_TRACE ("ACE_WFMO_Reactor::max_notify_iterations");
2626 ACE_GUARD (ACE_Process_Mutex
, monitor
, this->lock_
);
2628 // Must always be > 0 or < 0 to optimize the loop exit condition.
2629 this->notify_handler_
->max_notify_iterations (iterations
);
// Reactor-level getter: takes lock_ and returns the notify handler's
// current limit.  -1 is the value handed to ACE_GUARD_RETURN for the case
// where the guard does not obtain the lock.
2633 ACE_WFMO_Reactor::max_notify_iterations ()
2635 ACE_TRACE ("ACE_WFMO_Reactor::max_notify_iterations");
2636 ACE_GUARD_RETURN (ACE_Process_Mutex
, monitor
, this->lock_
, -1);
2638 return this->notify_handler_
->max_notify_iterations ();
// Forwards the purge request to the notify handler and returns its
// result.  A null notify_handler_ is checked for first, so the forwarding
// call is only reached with a valid pointer.
2642 ACE_WFMO_Reactor::purge_pending_notifications (ACE_Event_Handler
*eh
,
2643 ACE_Reactor_Mask mask
)
2645 ACE_TRACE ("ACE_WFMO_Reactor::purge_pending_notifications");
2646 if (this->notify_handler_
== 0)
2649 return this->notify_handler_
->purge_pending_notifications (eh
, mask
);
// NOTE(review): presumably reports whether this reactor implementation
// relies on handlers being resumed explicitly after dispatch -- confirm
// against the ACE_Reactor_Impl interface.
2653 ACE_WFMO_Reactor::resumable_handler ()
2655 ACE_TRACE ("ACE_WFMO_Reactor::resumable_handler");
2660 // No-op WinSOCK2 methods to help WFMO_Reactor compile
2661 #if !defined (ACE_HAS_WINSOCK2) || (ACE_HAS_WINSOCK2 == 0)
// Stand-in for the WinSock2 call, compiled only when ACE_HAS_WINSOCK2 is
// undefined or 0.  All three parameters are unnamed and therefore unused.
2663 WSAEventSelect (SOCKET
/* s */,
2664 WSAEVENT
/* hEventObject */,
2665 long /* lNetworkEvents */)
// Stand-in for the WinSock2 call, compiled only when ACE_HAS_WINSOCK2 is
// undefined or 0.  All three parameters are unnamed and therefore unused.
2671 WSAEnumNetworkEvents (SOCKET
/* s */,
2672 WSAEVENT
/* hEventObject */,
2673 LPWSANETWORKEVENTS
/* lpNetworkEvents */)
2677 #endif /* !defined ACE_HAS_WINSOCK2 */
2679 ACE_END_VERSIONED_NAMESPACE_DECL
2681 #endif /* ACE_WIN32 */