3 #include "ace/WIN32_Proactor.h"
5 #if defined (ACE_WIN32) && defined (ACE_HAS_WIN32_OVERLAPPED_IO)
6 // WIN implemenatation of the Proactor.
8 #include "ace/Log_Category.h"
9 #include "ace/Object_Manager.h"
10 #include "ace/OS_NS_errno.h"
11 #include "ace/OS_NS_unistd.h"
13 ACE_BEGIN_VERSIONED_NAMESPACE_DECL
16 * @class ACE_WIN32_Wakeup_Completion
18 * This is result object is used by the <end_event_loop> of the
19 * ACE_Proactor interface to wake up all the threads blocking
22 class ACE_WIN32_Wakeup_Completion
: public ACE_WIN32_Asynch_Result
27 ACE_WIN32_Wakeup_Completion (ACE_Handler::Proxy_Ptr
&handler_proxy
,
29 ACE_HANDLE event
= ACE_INVALID_HANDLE
,
31 int signal_number
= ACE_SIGRTMIN
);
34 virtual ~ACE_WIN32_Wakeup_Completion (void);
36 /// This method calls the <handler>'s <handle_wakeup> method.
37 virtual void complete (size_t bytes_transferred
= 0,
39 const void *completion_key
= 0,
43 ACE_WIN32_Proactor::ACE_WIN32_Proactor (size_t number_of_threads
,
44 bool used_with_reactor_event_loop
)
45 : completion_port_ (0),
46 // This *MUST* be 0, *NOT* ACE_INVALID_HANDLE !!!
47 number_of_threads_ (static_cast<DWORD
> (number_of_threads
)),
48 used_with_reactor_event_loop_ (used_with_reactor_event_loop
)
50 // Create the completion port.
51 this->completion_port_
= ::CreateIoCompletionPort (INVALID_HANDLE_VALUE
,
54 this->number_of_threads_
);
55 if (this->completion_port_
== 0)
56 ACELIB_ERROR ((LM_ERROR
,
58 ACE_TEXT ("CreateIoCompletionPort")));
60 this->get_asynch_pseudo_task ().start ();
63 ACE_WIN32_Proactor::~ACE_WIN32_Proactor (void)
65 this->get_asynch_pseudo_task ().stop ();
70 ACE_Asynch_Pseudo_Task
&
71 ACE_WIN32_Proactor::get_asynch_pseudo_task ()
73 return this->pseudo_task_
;
77 ACE_WIN32_Proactor::close (void)
79 // Close the completion port
80 if (this->completion_port_
!= 0)
82 // To avoid memory leaks we should delete all results from queue.
86 ACE_OVERLAPPED
*overlapped
= 0;
87 u_long bytes_transferred
= 0;
88 ULONG_PTR completion_key
= 0;
90 // Get the next asynchronous operation that completes
91 BOOL res
= ::GetQueuedCompletionStatus
92 (this->completion_port_
,
98 if (overlapped
== 0 || res
== FALSE
)
101 ACE_WIN32_Asynch_Result
*asynch_result
=
102 (ACE_WIN32_Asynch_Result
*) overlapped
;
104 delete asynch_result
;
107 int result
= ACE_OS::close (this->completion_port_
);
108 this->completion_port_
= 0;
116 ACE_WIN32_Proactor::register_handle (ACE_HANDLE handle
,
117 const void *completion_key
)
119 ULONG_PTR
comp_key (reinterpret_cast<ULONG_PTR
> (completion_key
));
121 // No locking is needed here as no state changes.
122 ACE_HANDLE cp
= ::CreateIoCompletionPort (handle
,
123 this->completion_port_
,
125 this->number_of_threads_
);
128 ACE_OS::set_errno_to_last_error ();
129 // If errno == ERROR_INVALID_PARAMETER, then this handle was
130 // already registered.
131 if (errno
!= ERROR_INVALID_PARAMETER
)
135 ACELIB_DEBUG ((LM_ERROR
,
137 ACE_TEXT ("CreateIoCompletionPort")));
145 ACE_Asynch_Read_Stream_Impl
*
146 ACE_WIN32_Proactor::create_asynch_read_stream (void)
148 ACE_Asynch_Read_Stream_Impl
*implementation
= 0;
149 ACE_NEW_RETURN (implementation
,
150 ACE_WIN32_Asynch_Read_Stream (this),
152 return implementation
;
155 ACE_Asynch_Write_Stream_Impl
*
156 ACE_WIN32_Proactor::create_asynch_write_stream (void)
158 ACE_Asynch_Write_Stream_Impl
*implementation
= 0;
159 ACE_NEW_RETURN (implementation
,
160 ACE_WIN32_Asynch_Write_Stream (this),
162 return implementation
;
165 ACE_Asynch_Read_Dgram_Impl
*
166 ACE_WIN32_Proactor::create_asynch_read_dgram (void)
168 ACE_Asynch_Read_Dgram_Impl
*implementation
= 0;
169 ACE_NEW_RETURN (implementation
,
170 ACE_WIN32_Asynch_Read_Dgram (this),
172 return implementation
;
175 ACE_Asynch_Write_Dgram_Impl
*
176 ACE_WIN32_Proactor::create_asynch_write_dgram (void)
178 ACE_Asynch_Write_Dgram_Impl
*implementation
= 0;
179 ACE_NEW_RETURN (implementation
,
180 ACE_WIN32_Asynch_Write_Dgram (this),
182 return implementation
;
185 ACE_Asynch_Read_File_Impl
*
186 ACE_WIN32_Proactor::create_asynch_read_file (void)
188 ACE_Asynch_Read_File_Impl
*implementation
= 0;
189 ACE_NEW_RETURN (implementation
,
190 ACE_WIN32_Asynch_Read_File (this),
192 return implementation
;
195 ACE_Asynch_Write_File_Impl
*
196 ACE_WIN32_Proactor::create_asynch_write_file (void)
198 ACE_Asynch_Write_File_Impl
*implementation
= 0;
199 ACE_NEW_RETURN (implementation
,
200 ACE_WIN32_Asynch_Write_File (this),
202 return implementation
;
205 ACE_Asynch_Accept_Impl
*
206 ACE_WIN32_Proactor::create_asynch_accept (void)
208 ACE_Asynch_Accept_Impl
*implementation
= 0;
209 ACE_NEW_RETURN (implementation
,
210 ACE_WIN32_Asynch_Accept (this),
212 return implementation
;
215 ACE_Asynch_Connect_Impl
*
216 ACE_WIN32_Proactor::create_asynch_connect (void)
218 ACE_Asynch_Connect_Impl
*implementation
= 0;
219 ACE_NEW_RETURN (implementation
,
220 ACE_WIN32_Asynch_Connect (this),
222 return implementation
;
225 ACE_Asynch_Transmit_File_Impl
*
226 ACE_WIN32_Proactor::create_asynch_transmit_file (void)
228 ACE_Asynch_Transmit_File_Impl
*implementation
= 0;
229 ACE_NEW_RETURN (implementation
,
230 ACE_WIN32_Asynch_Transmit_File (this),
232 return implementation
;
235 ACE_Asynch_Read_Stream_Result_Impl
*
236 ACE_WIN32_Proactor::create_asynch_read_stream_result
237 (const ACE_Handler::Proxy_Ptr
&handler_proxy
,
239 ACE_Message_Block
&message_block
,
240 size_t bytes_to_read
,
246 ACE_Asynch_Read_Stream_Result_Impl
*implementation
= 0;
247 ACE_NEW_RETURN (implementation
,
248 ACE_WIN32_Asynch_Read_Stream_Result (handler_proxy
,
257 return implementation
;
260 ACE_Asynch_Write_Stream_Result_Impl
*
261 ACE_WIN32_Proactor::create_asynch_write_stream_result
262 (const ACE_Handler::Proxy_Ptr
&handler_proxy
,
264 ACE_Message_Block
&message_block
,
265 size_t bytes_to_write
,
271 ACE_Asynch_Write_Stream_Result_Impl
*implementation
= 0;
272 ACE_NEW_RETURN (implementation
,
273 ACE_WIN32_Asynch_Write_Stream_Result (handler_proxy
,
282 return implementation
;
285 ACE_Asynch_Read_File_Result_Impl
*
286 ACE_WIN32_Proactor::create_asynch_read_file_result
287 (const ACE_Handler::Proxy_Ptr
&handler_proxy
,
289 ACE_Message_Block
&message_block
,
290 size_t bytes_to_read
,
298 ACE_Asynch_Read_File_Result_Impl
*implementation
= 0;
299 ACE_NEW_RETURN (implementation
,
300 ACE_WIN32_Asynch_Read_File_Result (handler_proxy
,
311 return implementation
;
314 ACE_Asynch_Write_File_Result_Impl
*
315 ACE_WIN32_Proactor::create_asynch_write_file_result
316 (const ACE_Handler::Proxy_Ptr
&handler_proxy
,
318 ACE_Message_Block
&message_block
,
319 size_t bytes_to_write
,
327 ACE_Asynch_Write_File_Result_Impl
*implementation
= 0;
328 ACE_NEW_RETURN (implementation
,
329 ACE_WIN32_Asynch_Write_File_Result (handler_proxy
,
340 return implementation
;
343 ACE_Asynch_Read_Dgram_Result_Impl
*
344 ACE_WIN32_Proactor::create_asynch_read_dgram_result
345 (const ACE_Handler::Proxy_Ptr
&handler_proxy
,
347 ACE_Message_Block
*message_block
,
348 size_t bytes_to_read
,
356 ACE_Asynch_Read_Dgram_Result_Impl
*implementation
= 0;
357 ACE_NEW_RETURN (implementation
,
358 ACE_WIN32_Asynch_Read_Dgram_Result (handler_proxy
,
369 return implementation
;
372 ACE_Asynch_Write_Dgram_Result_Impl
*
373 ACE_WIN32_Proactor::create_asynch_write_dgram_result
374 (const ACE_Handler::Proxy_Ptr
&handler_proxy
,
376 ACE_Message_Block
*message_block
,
377 size_t bytes_to_read
,
384 ACE_Asynch_Write_Dgram_Result_Impl
*implementation
= 0;
385 ACE_NEW_RETURN (implementation
,
386 ACE_WIN32_Asynch_Write_Dgram_Result(handler_proxy
,
396 return implementation
;
399 ACE_Asynch_Accept_Result_Impl
*
400 ACE_WIN32_Proactor::create_asynch_accept_result
401 (const ACE_Handler::Proxy_Ptr
&handler_proxy
,
402 ACE_HANDLE listen_handle
,
403 ACE_HANDLE accept_handle
,
404 ACE_Message_Block
&message_block
,
405 size_t bytes_to_read
,
411 ACE_Asynch_Accept_Result_Impl
*implementation
= 0;
412 ACE_NEW_RETURN (implementation
,
413 ACE_WIN32_Asynch_Accept_Result (handler_proxy
,
423 return implementation
;
426 ACE_Asynch_Connect_Result_Impl
*
427 ACE_WIN32_Proactor::create_asynch_connect_result
428 (const ACE_Handler::Proxy_Ptr
&handler_proxy
,
429 ACE_HANDLE connect_handle
,
435 ACE_Asynch_Connect_Result_Impl
*implementation
= 0;
436 ACE_NEW_RETURN (implementation
,
437 ACE_WIN32_Asynch_Connect_Result (handler_proxy
,
444 return implementation
;
447 ACE_Asynch_Transmit_File_Result_Impl
*
448 ACE_WIN32_Proactor::create_asynch_transmit_file_result
449 (const ACE_Handler::Proxy_Ptr
&handler_proxy
,
452 ACE_Asynch_Transmit_File::Header_And_Trailer
*header_and_trailer
,
453 size_t bytes_to_write
,
456 size_t bytes_per_send
,
463 ACE_Asynch_Transmit_File_Result_Impl
*implementation
= 0;
464 ACE_NEW_RETURN (implementation
,
465 ACE_WIN32_Asynch_Transmit_File_Result (handler_proxy
,
479 return implementation
;
482 ACE_Asynch_Result_Impl
*
483 ACE_WIN32_Proactor::create_asynch_timer (const ACE_Handler::Proxy_Ptr
&handler_proxy
,
485 const ACE_Time_Value
&tv
,
490 ACE_Asynch_Result_Impl
*implementation
= 0;
491 ACE_NEW_RETURN (implementation
,
492 ACE_WIN32_Asynch_Timer (handler_proxy
,
499 return implementation
;
503 ACE_WIN32_Proactor::handle_signal (int, siginfo_t
*, ucontext_t
*)
505 // Perform a non-blocking "poll" for all the I/O events that have
506 // completed in the I/O completion queue.
510 for (ACE_Time_Value
timeout (0, 0);
514 result
= this->handle_events (timeout
);
520 // If our handle_events failed, we'll report a failure to the
522 return result
== -1 ? -1 : 0;
526 ACE_WIN32_Proactor::handle_close (ACE_HANDLE handle
,
527 ACE_Reactor_Mask close_mask
)
529 ACE_UNUSED_ARG (close_mask
);
530 ACE_UNUSED_ARG (handle
);
532 return this->close ();
536 ACE_WIN32_Proactor::get_handle (void) const
538 if (this->used_with_reactor_event_loop_
)
539 return this->event_
.handle ();
545 ACE_WIN32_Proactor::handle_events (ACE_Time_Value
&wait_time
)
547 // Decrement <wait_time> with the amount of time spent in the method
548 ACE_Countdown_Time
countdown (&wait_time
);
549 return this->handle_events (wait_time
.msec ());
553 ACE_WIN32_Proactor::handle_events (void)
555 return this->handle_events (ACE_INFINITE
);
559 ACE_WIN32_Proactor::handle_events (unsigned long milli_seconds
)
561 ACE_OVERLAPPED
*overlapped
= 0;
562 u_long bytes_transferred
= 0;
563 ULONG_PTR completion_key
= 0;
565 // Get the next asynchronous operation that completes
566 BOOL result
= ::GetQueuedCompletionStatus (this->completion_port_
,
571 if (result
== FALSE
&& overlapped
== 0)
573 ACE_OS::set_errno_to_last_error ();
582 // Calling GetQueuedCompletionStatus with timeout value 0
583 // returns FALSE with extended errno "ERROR_SUCCESS" errno =
584 // ETIME; ?? I don't know if this has to be done !!
589 ACELIB_DEBUG ((LM_ERROR
,
591 ACE_TEXT ("GetQueuedCompletionStatus")));
595 else if (overlapped
!= 0)
597 // Narrow the result.
598 ACE_WIN32_Asynch_Result
*asynch_result
= (ACE_WIN32_Asynch_Result
*) overlapped
;
600 // If errors happen, grab the error.
602 ACE_OS::set_errno_to_last_error ();
606 u_long result_err
= asynch_result
->error ();
608 // if "result_err" is 0 than
609 // It is normal OS/WIN32 AIO completion.
610 // We have cleared asynch_result->error_
611 // during shared_read/shared_write.
612 // The real error code is already stored in "errno",
613 // so copy "errno" value to the "result_err"
614 // and pass this "result_err" code
615 // to the application_specific_code ()
617 // "result_err" non zero
618 // it means we have "post_completed" result
619 // so pass this "result_err" code
620 // to the application_specific_code ()
625 this->application_specific_code (asynch_result
,
626 static_cast<size_t> (bytes_transferred
),
627 (void *) completion_key
,
634 ACE_WIN32_Proactor::application_specific_code (ACE_WIN32_Asynch_Result
*asynch_result
,
635 size_t bytes_transferred
,
636 const void *completion_key
,
641 // Call completion hook
642 asynch_result
->complete (bytes_transferred
,
644 (void *) completion_key
,
649 // This is crucial to prevent memory leaks
650 delete asynch_result
;
655 ACE_WIN32_Proactor::post_completion (ACE_WIN32_Asynch_Result
*result
)
657 // Grab the event associated with the Proactor
658 HANDLE handle
= this->get_handle ();
663 // to the ::PostQueuedCompletionStatus()
664 // error will be extracted later in handle_events()
666 DWORD bytes_transferred
= 0;
667 const void * completion_key
= 0 ;
671 // This cast is ok since the original API calls restricted the transfer
672 // counts to DWORD range.
673 bytes_transferred
= static_cast<DWORD
> (result
->bytes_transferred ());
674 completion_key
= result
->completion_key();
677 ULONG_PTR
comp_key (reinterpret_cast<ULONG_PTR
> (completion_key
));
680 if (::PostQueuedCompletionStatus (this->completion_port_
, // completion port
681 bytes_transferred
, // xfer count
682 comp_key
, // completion key
690 ACELIB_DEBUG ((LM_ERROR
,
692 ACE_TEXT ("PostQueuedCompletionStatus failed")));
697 // If Proactor event is valid, signal it
698 if (handle
!= ACE_INVALID_HANDLE
700 ACE_OS::event_signal (&handle
);
706 ACE_WIN32_Proactor::post_wakeup_completions (int how_many
)
708 ACE_WIN32_Wakeup_Completion
*wakeup_completion
= 0;
710 for (ssize_t ci
= 0; ci
< how_many
; ci
++)
714 ACE_WIN32_Wakeup_Completion (this->wakeup_handler_
.proxy ()),
717 if (wakeup_completion
->post_completion (this) == -1)
725 ACE_WIN32_Proactor::wake_up_dispatch_threads (void)
731 ACE_WIN32_Proactor::close_dispatch_threads (int)
737 ACE_WIN32_Proactor::number_of_threads (void) const
739 return static_cast<size_t> (this->number_of_threads_
);
743 ACE_WIN32_Proactor::number_of_threads (size_t threads
)
745 this->number_of_threads_
= static_cast<DWORD
> (threads
);
748 ACE_WIN32_Asynch_Timer::ACE_WIN32_Asynch_Timer
749 (const ACE_Handler::Proxy_Ptr
&handler_proxy
,
751 const ACE_Time_Value
&tv
,
755 : ACE_Asynch_Result_Impl (),
756 ACE_WIN32_Asynch_Result (handler_proxy
, act
, event
, 0, 0, priority
,
763 ACE_WIN32_Asynch_Timer::complete (size_t,
768 ACE_Handler
*handler
= this->handler_proxy_
.get ()->handler ();
770 handler
->handle_time_out (this->time_
, this->act ());
773 ACE_WIN32_Wakeup_Completion::ACE_WIN32_Wakeup_Completion
774 (ACE_Handler::Proxy_Ptr
&handler_proxy
,
779 : ACE_Asynch_Result_Impl (),
780 ACE_WIN32_Asynch_Result
781 (handler_proxy
, act
, event
, 0, 0, priority
, signal_number
)
785 ACE_WIN32_Wakeup_Completion::~ACE_WIN32_Wakeup_Completion (void)
790 ACE_WIN32_Wakeup_Completion::complete (size_t /* bytes_transferred */,
792 const void * /* completion_key */,
795 ACE_Handler
*handler
= this->handler_proxy_
.get ()->handler ();
797 handler
->handle_wakeup ();
800 ACE_END_VERSIONED_NAMESPACE_DECL
802 #endif /* ACE_WIN32 */