3 #include "ace/WIN32_Proactor.h"
5 #if defined (ACE_WIN32) && defined (ACE_HAS_WIN32_OVERLAPPED_IO)
6 // WIN implementation of the Proactor.
8 #include "ace/Log_Category.h"
9 #include "ace/Object_Manager.h"
10 #include "ace/OS_NS_errno.h"
11 #include "ace/OS_NS_unistd.h"
13 ACE_BEGIN_VERSIONED_NAMESPACE_DECL
16 * @class ACE_WIN32_Wakeup_Completion
18 * This result object is used by the <end_event_loop> of the
19 * ACE_Proactor interface to wake up all the threads blocking
22 class ACE_WIN32_Wakeup_Completion
: public ACE_WIN32_Asynch_Result
26 ACE_WIN32_Wakeup_Completion (ACE_Handler::Proxy_Ptr
&handler_proxy
,
28 ACE_HANDLE event
= ACE_INVALID_HANDLE
,
30 int signal_number
= ACE_SIGRTMIN
);
33 virtual ~ACE_WIN32_Wakeup_Completion ();
35 /// This method calls the <handler>'s <handle_wakeup> method.
36 virtual void complete (size_t bytes_transferred
= 0,
38 const void *completion_key
= 0,
42 ACE_WIN32_Proactor::ACE_WIN32_Proactor (size_t number_of_threads
,
43 bool used_with_reactor_event_loop
)
44 : completion_port_ (0),
45 // This *MUST* be 0, *NOT* ACE_INVALID_HANDLE !!!
46 number_of_threads_ (static_cast<DWORD
> (number_of_threads
)),
47 used_with_reactor_event_loop_ (used_with_reactor_event_loop
)
49 // Create the completion port.
50 this->completion_port_
= ::CreateIoCompletionPort (INVALID_HANDLE_VALUE
,
53 this->number_of_threads_
);
54 if (this->completion_port_
== 0)
55 ACELIB_ERROR ((LM_ERROR
,
57 ACE_TEXT ("CreateIoCompletionPort")));
59 this->get_asynch_pseudo_task ().start ();
62 ACE_WIN32_Proactor::~ACE_WIN32_Proactor ()
64 this->get_asynch_pseudo_task ().stop ();
69 ACE_Asynch_Pseudo_Task
&
70 ACE_WIN32_Proactor::get_asynch_pseudo_task ()
72 return this->pseudo_task_
;
76 ACE_WIN32_Proactor::close ()
78 // Close the completion port
79 if (this->completion_port_
!= 0)
81 // To avoid memory leaks we should delete all results from queue.
85 ACE_OVERLAPPED
*overlapped
= 0;
86 u_long bytes_transferred
= 0;
87 ULONG_PTR completion_key
= 0;
89 // Get the next asynchronous operation that completes
90 BOOL res
= ::GetQueuedCompletionStatus
91 (this->completion_port_
,
97 if (overlapped
== 0 || res
== FALSE
)
100 ACE_WIN32_Asynch_Result
*asynch_result
=
101 (ACE_WIN32_Asynch_Result
*) overlapped
;
103 delete asynch_result
;
106 int result
= ACE_OS::close (this->completion_port_
);
107 this->completion_port_
= 0;
115 ACE_WIN32_Proactor::register_handle (ACE_HANDLE handle
,
116 const void *completion_key
)
118 ULONG_PTR
comp_key (reinterpret_cast<ULONG_PTR
> (completion_key
));
120 // No locking is needed here as no state changes.
121 ACE_HANDLE cp
= ::CreateIoCompletionPort (handle
,
122 this->completion_port_
,
124 this->number_of_threads_
);
127 ACE_OS::set_errno_to_last_error ();
128 // If errno == ERROR_INVALID_PARAMETER, then this handle was
129 // already registered.
130 if (errno
!= ERROR_INVALID_PARAMETER
)
134 ACELIB_DEBUG ((LM_ERROR
,
136 ACE_TEXT ("CreateIoCompletionPort")));
144 ACE_Asynch_Read_Stream_Impl
*
145 ACE_WIN32_Proactor::create_asynch_read_stream ()
147 ACE_Asynch_Read_Stream_Impl
*implementation
= 0;
148 ACE_NEW_RETURN (implementation
,
149 ACE_WIN32_Asynch_Read_Stream (this),
151 return implementation
;
154 ACE_Asynch_Write_Stream_Impl
*
155 ACE_WIN32_Proactor::create_asynch_write_stream ()
157 ACE_Asynch_Write_Stream_Impl
*implementation
= 0;
158 ACE_NEW_RETURN (implementation
,
159 ACE_WIN32_Asynch_Write_Stream (this),
161 return implementation
;
164 ACE_Asynch_Read_Dgram_Impl
*
165 ACE_WIN32_Proactor::create_asynch_read_dgram ()
167 ACE_Asynch_Read_Dgram_Impl
*implementation
= 0;
168 ACE_NEW_RETURN (implementation
,
169 ACE_WIN32_Asynch_Read_Dgram (this),
171 return implementation
;
174 ACE_Asynch_Write_Dgram_Impl
*
175 ACE_WIN32_Proactor::create_asynch_write_dgram ()
177 ACE_Asynch_Write_Dgram_Impl
*implementation
= 0;
178 ACE_NEW_RETURN (implementation
,
179 ACE_WIN32_Asynch_Write_Dgram (this),
181 return implementation
;
184 ACE_Asynch_Read_File_Impl
*
185 ACE_WIN32_Proactor::create_asynch_read_file ()
187 ACE_Asynch_Read_File_Impl
*implementation
= 0;
188 ACE_NEW_RETURN (implementation
,
189 ACE_WIN32_Asynch_Read_File (this),
191 return implementation
;
194 ACE_Asynch_Write_File_Impl
*
195 ACE_WIN32_Proactor::create_asynch_write_file ()
197 ACE_Asynch_Write_File_Impl
*implementation
= 0;
198 ACE_NEW_RETURN (implementation
,
199 ACE_WIN32_Asynch_Write_File (this),
201 return implementation
;
204 ACE_Asynch_Accept_Impl
*
205 ACE_WIN32_Proactor::create_asynch_accept ()
207 ACE_Asynch_Accept_Impl
*implementation
= 0;
208 ACE_NEW_RETURN (implementation
,
209 ACE_WIN32_Asynch_Accept (this),
211 return implementation
;
214 ACE_Asynch_Connect_Impl
*
215 ACE_WIN32_Proactor::create_asynch_connect ()
217 ACE_Asynch_Connect_Impl
*implementation
= 0;
218 ACE_NEW_RETURN (implementation
,
219 ACE_WIN32_Asynch_Connect (this),
221 return implementation
;
224 ACE_Asynch_Transmit_File_Impl
*
225 ACE_WIN32_Proactor::create_asynch_transmit_file ()
227 ACE_Asynch_Transmit_File_Impl
*implementation
= 0;
228 ACE_NEW_RETURN (implementation
,
229 ACE_WIN32_Asynch_Transmit_File (this),
231 return implementation
;
234 ACE_Asynch_Read_Stream_Result_Impl
*
235 ACE_WIN32_Proactor::create_asynch_read_stream_result
236 (const ACE_Handler::Proxy_Ptr
&handler_proxy
,
238 ACE_Message_Block
&message_block
,
239 size_t bytes_to_read
,
245 ACE_Asynch_Read_Stream_Result_Impl
*implementation
= 0;
246 ACE_NEW_RETURN (implementation
,
247 ACE_WIN32_Asynch_Read_Stream_Result (handler_proxy
,
256 return implementation
;
259 ACE_Asynch_Write_Stream_Result_Impl
*
260 ACE_WIN32_Proactor::create_asynch_write_stream_result
261 (const ACE_Handler::Proxy_Ptr
&handler_proxy
,
263 ACE_Message_Block
&message_block
,
264 size_t bytes_to_write
,
270 ACE_Asynch_Write_Stream_Result_Impl
*implementation
= 0;
271 ACE_NEW_RETURN (implementation
,
272 ACE_WIN32_Asynch_Write_Stream_Result (handler_proxy
,
281 return implementation
;
284 ACE_Asynch_Read_File_Result_Impl
*
285 ACE_WIN32_Proactor::create_asynch_read_file_result
286 (const ACE_Handler::Proxy_Ptr
&handler_proxy
,
288 ACE_Message_Block
&message_block
,
289 size_t bytes_to_read
,
297 ACE_Asynch_Read_File_Result_Impl
*implementation
= 0;
298 ACE_NEW_RETURN (implementation
,
299 ACE_WIN32_Asynch_Read_File_Result (handler_proxy
,
310 return implementation
;
313 ACE_Asynch_Write_File_Result_Impl
*
314 ACE_WIN32_Proactor::create_asynch_write_file_result
315 (const ACE_Handler::Proxy_Ptr
&handler_proxy
,
317 ACE_Message_Block
&message_block
,
318 size_t bytes_to_write
,
326 ACE_Asynch_Write_File_Result_Impl
*implementation
= 0;
327 ACE_NEW_RETURN (implementation
,
328 ACE_WIN32_Asynch_Write_File_Result (handler_proxy
,
339 return implementation
;
342 ACE_Asynch_Read_Dgram_Result_Impl
*
343 ACE_WIN32_Proactor::create_asynch_read_dgram_result
344 (const ACE_Handler::Proxy_Ptr
&handler_proxy
,
346 ACE_Message_Block
*message_block
,
347 size_t bytes_to_read
,
355 ACE_Asynch_Read_Dgram_Result_Impl
*implementation
= 0;
356 ACE_NEW_RETURN (implementation
,
357 ACE_WIN32_Asynch_Read_Dgram_Result (handler_proxy
,
368 return implementation
;
371 ACE_Asynch_Write_Dgram_Result_Impl
*
372 ACE_WIN32_Proactor::create_asynch_write_dgram_result
373 (const ACE_Handler::Proxy_Ptr
&handler_proxy
,
375 ACE_Message_Block
*message_block
,
376 size_t bytes_to_read
,
383 ACE_Asynch_Write_Dgram_Result_Impl
*implementation
= 0;
384 ACE_NEW_RETURN (implementation
,
385 ACE_WIN32_Asynch_Write_Dgram_Result(handler_proxy
,
395 return implementation
;
398 ACE_Asynch_Accept_Result_Impl
*
399 ACE_WIN32_Proactor::create_asynch_accept_result
400 (const ACE_Handler::Proxy_Ptr
&handler_proxy
,
401 ACE_HANDLE listen_handle
,
402 ACE_HANDLE accept_handle
,
403 ACE_Message_Block
&message_block
,
404 size_t bytes_to_read
,
410 ACE_Asynch_Accept_Result_Impl
*implementation
= 0;
411 ACE_NEW_RETURN (implementation
,
412 ACE_WIN32_Asynch_Accept_Result (handler_proxy
,
422 return implementation
;
425 ACE_Asynch_Connect_Result_Impl
*
426 ACE_WIN32_Proactor::create_asynch_connect_result
427 (const ACE_Handler::Proxy_Ptr
&handler_proxy
,
428 ACE_HANDLE connect_handle
,
434 ACE_Asynch_Connect_Result_Impl
*implementation
= 0;
435 ACE_NEW_RETURN (implementation
,
436 ACE_WIN32_Asynch_Connect_Result (handler_proxy
,
443 return implementation
;
446 ACE_Asynch_Transmit_File_Result_Impl
*
447 ACE_WIN32_Proactor::create_asynch_transmit_file_result
448 (const ACE_Handler::Proxy_Ptr
&handler_proxy
,
451 ACE_Asynch_Transmit_File::Header_And_Trailer
*header_and_trailer
,
452 size_t bytes_to_write
,
455 size_t bytes_per_send
,
462 ACE_Asynch_Transmit_File_Result_Impl
*implementation
= 0;
463 ACE_NEW_RETURN (implementation
,
464 ACE_WIN32_Asynch_Transmit_File_Result (handler_proxy
,
478 return implementation
;
481 ACE_Asynch_Result_Impl
*
482 ACE_WIN32_Proactor::create_asynch_timer (const ACE_Handler::Proxy_Ptr
&handler_proxy
,
484 const ACE_Time_Value
&tv
,
489 ACE_Asynch_Result_Impl
*implementation
= 0;
490 ACE_NEW_RETURN (implementation
,
491 ACE_WIN32_Asynch_Timer (handler_proxy
,
498 return implementation
;
502 ACE_WIN32_Proactor::handle_signal (int, siginfo_t
*, ucontext_t
*)
504 // Perform a non-blocking "poll" for all the I/O events that have
505 // completed in the I/O completion queue.
509 for (ACE_Time_Value
timeout (0, 0);
513 result
= this->handle_events (timeout
);
519 // If our handle_events failed, we'll report a failure to the
521 return result
== -1 ? -1 : 0;
525 ACE_WIN32_Proactor::handle_close (ACE_HANDLE handle
,
526 ACE_Reactor_Mask close_mask
)
528 ACE_UNUSED_ARG (close_mask
);
529 ACE_UNUSED_ARG (handle
);
531 return this->close ();
535 ACE_WIN32_Proactor::get_handle () const
537 if (this->used_with_reactor_event_loop_
)
538 return this->event_
.handle ();
544 ACE_WIN32_Proactor::handle_events (ACE_Time_Value
&wait_time
)
546 // Decrement <wait_time> with the amount of time spent in the method
547 ACE_Countdown_Time
countdown (&wait_time
);
548 return this->handle_events (wait_time
.msec ());
552 ACE_WIN32_Proactor::handle_events ()
554 return this->handle_events (ACE_INFINITE
);
558 ACE_WIN32_Proactor::handle_events (unsigned long milli_seconds
)
560 ACE_OVERLAPPED
*overlapped
= 0;
561 u_long bytes_transferred
= 0;
562 ULONG_PTR completion_key
= 0;
564 // Get the next asynchronous operation that completes
565 BOOL result
= ::GetQueuedCompletionStatus (this->completion_port_
,
570 if (result
== FALSE
&& overlapped
== 0)
572 ACE_OS::set_errno_to_last_error ();
581 // Calling GetQueuedCompletionStatus with timeout value 0
582 // returns FALSE with extended errno "ERROR_SUCCESS" errno =
583 // ETIME; ?? I don't know if this has to be done !!
588 ACELIB_DEBUG ((LM_ERROR
,
590 ACE_TEXT ("GetQueuedCompletionStatus")));
594 else if (overlapped
!= 0)
596 // Narrow the result.
597 ACE_WIN32_Asynch_Result
*asynch_result
= (ACE_WIN32_Asynch_Result
*) overlapped
;
599 // If errors happen, grab the error.
601 ACE_OS::set_errno_to_last_error ();
605 u_long result_err
= asynch_result
->error ();
607 // if "result_err" is 0 then
608 // It is normal OS/WIN32 AIO completion.
609 // We have cleared asynch_result->error_
610 // during shared_read/shared_write.
611 // The real error code is already stored in "errno",
612 // so copy "errno" value to the "result_err"
613 // and pass this "result_err" code
614 // to the application_specific_code ()
616 // "result_err" non zero
617 // it means we have "post_completed" result
618 // so pass this "result_err" code
619 // to the application_specific_code ()
624 this->application_specific_code (asynch_result
,
625 static_cast<size_t> (bytes_transferred
),
626 (void *) completion_key
,
633 ACE_WIN32_Proactor::application_specific_code (ACE_WIN32_Asynch_Result
*asynch_result
,
634 size_t bytes_transferred
,
635 const void *completion_key
,
640 // Call completion hook
641 asynch_result
->complete (bytes_transferred
,
643 (void *) completion_key
,
648 // This is crucial to prevent memory leaks
649 delete asynch_result
;
654 ACE_WIN32_Proactor::post_completion (ACE_WIN32_Asynch_Result
*result
)
656 // Grab the event associated with the Proactor
657 HANDLE handle
= this->get_handle ();
662 // to the ::PostQueuedCompletionStatus()
663 // error will be extracted later in handle_events()
665 DWORD bytes_transferred
= 0;
666 const void * completion_key
= 0 ;
670 // This cast is ok since the original API calls restricted the transfer
671 // counts to DWORD range.
672 bytes_transferred
= static_cast<DWORD
> (result
->bytes_transferred ());
673 completion_key
= result
->completion_key();
676 ULONG_PTR
comp_key (reinterpret_cast<ULONG_PTR
> (completion_key
));
679 if (::PostQueuedCompletionStatus (this->completion_port_
, // completion port
680 bytes_transferred
, // xfer count
681 comp_key
, // completion key
689 ACELIB_DEBUG ((LM_ERROR
,
691 ACE_TEXT ("PostQueuedCompletionStatus failed")));
696 // If Proactor event is valid, signal it
697 if (handle
!= ACE_INVALID_HANDLE
699 ACE_OS::event_signal (&handle
);
705 ACE_WIN32_Proactor::post_wakeup_completions (int how_many
)
707 ACE_WIN32_Wakeup_Completion
*wakeup_completion
= 0;
709 for (ssize_t ci
= 0; ci
< how_many
; ci
++)
713 ACE_WIN32_Wakeup_Completion (this->wakeup_handler_
.proxy ()),
716 if (wakeup_completion
->post_completion (this) == -1)
724 ACE_WIN32_Proactor::wake_up_dispatch_threads ()
730 ACE_WIN32_Proactor::close_dispatch_threads (int)
736 ACE_WIN32_Proactor::number_of_threads () const
738 return static_cast<size_t> (this->number_of_threads_
);
742 ACE_WIN32_Proactor::number_of_threads (size_t threads
)
744 this->number_of_threads_
= static_cast<DWORD
> (threads
);
747 ACE_WIN32_Asynch_Timer::ACE_WIN32_Asynch_Timer
748 (const ACE_Handler::Proxy_Ptr
&handler_proxy
,
750 const ACE_Time_Value
&tv
,
754 : ACE_Asynch_Result_Impl (),
755 ACE_WIN32_Asynch_Result (handler_proxy
, act
, event
, 0, 0, priority
,
762 ACE_WIN32_Asynch_Timer::complete (size_t,
767 ACE_Handler
*handler
= this->handler_proxy_
.get ()->handler ();
769 handler
->handle_time_out (this->time_
, this->act ());
772 ACE_WIN32_Wakeup_Completion::ACE_WIN32_Wakeup_Completion
773 (ACE_Handler::Proxy_Ptr
&handler_proxy
,
778 : ACE_Asynch_Result_Impl (),
779 ACE_WIN32_Asynch_Result
780 (handler_proxy
, act
, event
, 0, 0, priority
, signal_number
)
784 ACE_WIN32_Wakeup_Completion::~ACE_WIN32_Wakeup_Completion ()
789 ACE_WIN32_Wakeup_Completion::complete (size_t /* bytes_transferred */,
791 const void * /* completion_key */,
794 ACE_Handler
*handler
= this->handler_proxy_
.get ()->handler ();
796 handler
->handle_wakeup ();
799 ACE_END_VERSIONED_NAMESPACE_DECL
801 #endif /* ACE_WIN32 */