Cleanup ACE_HAS_PTHREAD_SIGMASK_PROTOTYPE, all platforms support it so far as I can...
[ACE_TAO.git] / ACE / ace / WIN32_Proactor.cpp
blobf697edf8f6bcd83d74f6d3be904e2e4ce487914d
1 //
3 #include "ace/WIN32_Proactor.h"
5 #if defined (ACE_WIN32) && defined (ACE_HAS_WIN32_OVERLAPPED_IO)
// Windows implementation of the Proactor.
8 #include "ace/Log_Category.h"
9 #include "ace/Object_Manager.h"
10 #include "ace/OS_NS_errno.h"
11 #include "ace/OS_NS_unistd.h"
13 ACE_BEGIN_VERSIONED_NAMESPACE_DECL
15 /**
16 * @class ACE_WIN32_Wakeup_Completion
18 * This is result object is used by the <end_event_loop> of the
19 * ACE_Proactor interface to wake up all the threads blocking
20 * for completions.
22 class ACE_WIN32_Wakeup_Completion : public ACE_WIN32_Asynch_Result
24 public:
25 /// Constructor.
26 ACE_WIN32_Wakeup_Completion (ACE_Handler::Proxy_Ptr &handler_proxy,
27 const void *act = 0,
28 ACE_HANDLE event = ACE_INVALID_HANDLE,
29 int priority = 0,
30 int signal_number = ACE_SIGRTMIN);
32 /// Destructor.
33 virtual ~ACE_WIN32_Wakeup_Completion ();
35 /// This method calls the <handler>'s <handle_wakeup> method.
36 virtual void complete (size_t bytes_transferred = 0,
37 int success = 1,
38 const void *completion_key = 0,
39 u_long error = 0);
42 ACE_WIN32_Proactor::ACE_WIN32_Proactor (size_t number_of_threads,
43 bool used_with_reactor_event_loop)
44 : completion_port_ (0),
45 // This *MUST* be 0, *NOT* ACE_INVALID_HANDLE !!!
46 number_of_threads_ (static_cast<DWORD> (number_of_threads)),
47 used_with_reactor_event_loop_ (used_with_reactor_event_loop)
49 // Create the completion port.
50 this->completion_port_ = ::CreateIoCompletionPort (INVALID_HANDLE_VALUE,
53 this->number_of_threads_);
54 if (this->completion_port_ == 0)
55 ACELIB_ERROR ((LM_ERROR,
56 ACE_TEXT ("%p\n"),
57 ACE_TEXT ("CreateIoCompletionPort")));
59 this->get_asynch_pseudo_task ().start ();
62 ACE_WIN32_Proactor::~ACE_WIN32_Proactor ()
64 this->get_asynch_pseudo_task ().stop ();
66 this->close ();
69 ACE_Asynch_Pseudo_Task &
70 ACE_WIN32_Proactor::get_asynch_pseudo_task ()
72 return this->pseudo_task_;
75 int
76 ACE_WIN32_Proactor::close ()
78 // Close the completion port
79 if (this->completion_port_ != 0)
81 // To avoid memory leaks we should delete all results from queue.
83 for (;;)
85 ACE_OVERLAPPED *overlapped = 0;
86 u_long bytes_transferred = 0;
87 ULONG_PTR completion_key = 0;
89 // Get the next asynchronous operation that completes
90 BOOL res = ::GetQueuedCompletionStatus
91 (this->completion_port_,
92 &bytes_transferred,
93 &completion_key,
94 &overlapped,
95 0); // poll
97 if (overlapped == 0 || res == FALSE)
98 break;
100 ACE_WIN32_Asynch_Result *asynch_result =
101 (ACE_WIN32_Asynch_Result *) overlapped;
103 delete asynch_result;
106 int result = ACE_OS::close (this->completion_port_);
107 this->completion_port_ = 0;
108 return result;
111 return 0;
115 ACE_WIN32_Proactor::register_handle (ACE_HANDLE handle,
116 const void *completion_key)
118 ULONG_PTR comp_key (reinterpret_cast<ULONG_PTR> (completion_key));
120 // No locking is needed here as no state changes.
121 ACE_HANDLE cp = ::CreateIoCompletionPort (handle,
122 this->completion_port_,
123 comp_key,
124 this->number_of_threads_);
125 if (cp == 0)
127 ACE_OS::set_errno_to_last_error ();
128 // If errno == ERROR_INVALID_PARAMETER, then this handle was
129 // already registered.
130 if (errno != ERROR_INVALID_PARAMETER)
132 if (ACE::debug ())
134 ACELIB_DEBUG ((LM_ERROR,
135 ACE_TEXT ("%p\n"),
136 ACE_TEXT ("CreateIoCompletionPort")));
138 return -1;
141 return 0;
144 ACE_Asynch_Read_Stream_Impl *
145 ACE_WIN32_Proactor::create_asynch_read_stream ()
147 ACE_Asynch_Read_Stream_Impl *implementation = 0;
148 ACE_NEW_RETURN (implementation,
149 ACE_WIN32_Asynch_Read_Stream (this),
151 return implementation;
154 ACE_Asynch_Write_Stream_Impl *
155 ACE_WIN32_Proactor::create_asynch_write_stream ()
157 ACE_Asynch_Write_Stream_Impl *implementation = 0;
158 ACE_NEW_RETURN (implementation,
159 ACE_WIN32_Asynch_Write_Stream (this),
161 return implementation;
164 ACE_Asynch_Read_Dgram_Impl *
165 ACE_WIN32_Proactor::create_asynch_read_dgram ()
167 ACE_Asynch_Read_Dgram_Impl *implementation = 0;
168 ACE_NEW_RETURN (implementation,
169 ACE_WIN32_Asynch_Read_Dgram (this),
171 return implementation;
174 ACE_Asynch_Write_Dgram_Impl *
175 ACE_WIN32_Proactor::create_asynch_write_dgram ()
177 ACE_Asynch_Write_Dgram_Impl *implementation = 0;
178 ACE_NEW_RETURN (implementation,
179 ACE_WIN32_Asynch_Write_Dgram (this),
181 return implementation;
184 ACE_Asynch_Read_File_Impl *
185 ACE_WIN32_Proactor::create_asynch_read_file ()
187 ACE_Asynch_Read_File_Impl *implementation = 0;
188 ACE_NEW_RETURN (implementation,
189 ACE_WIN32_Asynch_Read_File (this),
191 return implementation;
194 ACE_Asynch_Write_File_Impl *
195 ACE_WIN32_Proactor::create_asynch_write_file ()
197 ACE_Asynch_Write_File_Impl *implementation = 0;
198 ACE_NEW_RETURN (implementation,
199 ACE_WIN32_Asynch_Write_File (this),
201 return implementation;
204 ACE_Asynch_Accept_Impl *
205 ACE_WIN32_Proactor::create_asynch_accept ()
207 ACE_Asynch_Accept_Impl *implementation = 0;
208 ACE_NEW_RETURN (implementation,
209 ACE_WIN32_Asynch_Accept (this),
211 return implementation;
214 ACE_Asynch_Connect_Impl *
215 ACE_WIN32_Proactor::create_asynch_connect ()
217 ACE_Asynch_Connect_Impl *implementation = 0;
218 ACE_NEW_RETURN (implementation,
219 ACE_WIN32_Asynch_Connect (this),
221 return implementation;
224 ACE_Asynch_Transmit_File_Impl *
225 ACE_WIN32_Proactor::create_asynch_transmit_file ()
227 ACE_Asynch_Transmit_File_Impl *implementation = 0;
228 ACE_NEW_RETURN (implementation,
229 ACE_WIN32_Asynch_Transmit_File (this),
231 return implementation;
234 ACE_Asynch_Read_Stream_Result_Impl *
235 ACE_WIN32_Proactor::create_asynch_read_stream_result
236 (const ACE_Handler::Proxy_Ptr &handler_proxy,
237 ACE_HANDLE handle,
238 ACE_Message_Block &message_block,
239 size_t bytes_to_read,
240 const void* act,
241 ACE_HANDLE event,
242 int priority,
243 int signal_number)
245 ACE_Asynch_Read_Stream_Result_Impl *implementation = 0;
246 ACE_NEW_RETURN (implementation,
247 ACE_WIN32_Asynch_Read_Stream_Result (handler_proxy,
248 handle,
249 message_block,
250 bytes_to_read,
251 act,
252 event,
253 priority,
254 signal_number),
256 return implementation;
259 ACE_Asynch_Write_Stream_Result_Impl *
260 ACE_WIN32_Proactor::create_asynch_write_stream_result
261 (const ACE_Handler::Proxy_Ptr &handler_proxy,
262 ACE_HANDLE handle,
263 ACE_Message_Block &message_block,
264 size_t bytes_to_write,
265 const void* act,
266 ACE_HANDLE event,
267 int priority,
268 int signal_number)
270 ACE_Asynch_Write_Stream_Result_Impl *implementation = 0;
271 ACE_NEW_RETURN (implementation,
272 ACE_WIN32_Asynch_Write_Stream_Result (handler_proxy,
273 handle,
274 message_block,
275 bytes_to_write,
276 act,
277 event,
278 priority,
279 signal_number),
281 return implementation;
284 ACE_Asynch_Read_File_Result_Impl *
285 ACE_WIN32_Proactor::create_asynch_read_file_result
286 (const ACE_Handler::Proxy_Ptr &handler_proxy,
287 ACE_HANDLE handle,
288 ACE_Message_Block &message_block,
289 size_t bytes_to_read,
290 const void* act,
291 u_long offset,
292 u_long offset_high,
293 ACE_HANDLE event,
294 int priority,
295 int signal_number)
297 ACE_Asynch_Read_File_Result_Impl *implementation = 0;
298 ACE_NEW_RETURN (implementation,
299 ACE_WIN32_Asynch_Read_File_Result (handler_proxy,
300 handle,
301 message_block,
302 bytes_to_read,
303 act,
304 offset,
305 offset_high,
306 event,
307 priority,
308 signal_number),
310 return implementation;
313 ACE_Asynch_Write_File_Result_Impl *
314 ACE_WIN32_Proactor::create_asynch_write_file_result
315 (const ACE_Handler::Proxy_Ptr &handler_proxy,
316 ACE_HANDLE handle,
317 ACE_Message_Block &message_block,
318 size_t bytes_to_write,
319 const void* act,
320 u_long offset,
321 u_long offset_high,
322 ACE_HANDLE event,
323 int priority,
324 int signal_number)
326 ACE_Asynch_Write_File_Result_Impl *implementation = 0;
327 ACE_NEW_RETURN (implementation,
328 ACE_WIN32_Asynch_Write_File_Result (handler_proxy,
329 handle,
330 message_block,
331 bytes_to_write,
332 act,
333 offset,
334 offset_high,
335 event,
336 priority,
337 signal_number),
339 return implementation;
342 ACE_Asynch_Read_Dgram_Result_Impl *
343 ACE_WIN32_Proactor::create_asynch_read_dgram_result
344 (const ACE_Handler::Proxy_Ptr &handler_proxy,
345 ACE_HANDLE handle,
346 ACE_Message_Block *message_block,
347 size_t bytes_to_read,
348 int flags,
349 int protocol_family,
350 const void* act,
351 ACE_HANDLE event,
352 int priority,
353 int signal_number)
355 ACE_Asynch_Read_Dgram_Result_Impl *implementation = 0;
356 ACE_NEW_RETURN (implementation,
357 ACE_WIN32_Asynch_Read_Dgram_Result (handler_proxy,
358 handle,
359 message_block,
360 bytes_to_read,
361 flags,
362 protocol_family,
363 act,
364 event,
365 priority,
366 signal_number),
368 return implementation;
371 ACE_Asynch_Write_Dgram_Result_Impl *
372 ACE_WIN32_Proactor::create_asynch_write_dgram_result
373 (const ACE_Handler::Proxy_Ptr &handler_proxy,
374 ACE_HANDLE handle,
375 ACE_Message_Block *message_block,
376 size_t bytes_to_read,
377 int flags,
378 const void* act,
379 ACE_HANDLE event,
380 int priority,
381 int signal_number)
383 ACE_Asynch_Write_Dgram_Result_Impl *implementation = 0;
384 ACE_NEW_RETURN (implementation,
385 ACE_WIN32_Asynch_Write_Dgram_Result(handler_proxy,
386 handle,
387 message_block,
388 bytes_to_read,
389 flags,
390 act,
391 event,
392 priority,
393 signal_number),
395 return implementation;
398 ACE_Asynch_Accept_Result_Impl *
399 ACE_WIN32_Proactor::create_asynch_accept_result
400 (const ACE_Handler::Proxy_Ptr &handler_proxy,
401 ACE_HANDLE listen_handle,
402 ACE_HANDLE accept_handle,
403 ACE_Message_Block &message_block,
404 size_t bytes_to_read,
405 const void* act,
406 ACE_HANDLE event,
407 int priority,
408 int signal_number)
410 ACE_Asynch_Accept_Result_Impl *implementation = 0;
411 ACE_NEW_RETURN (implementation,
412 ACE_WIN32_Asynch_Accept_Result (handler_proxy,
413 listen_handle,
414 accept_handle,
415 message_block,
416 bytes_to_read,
417 act,
418 event,
419 priority,
420 signal_number),
422 return implementation;
425 ACE_Asynch_Connect_Result_Impl *
426 ACE_WIN32_Proactor::create_asynch_connect_result
427 (const ACE_Handler::Proxy_Ptr &handler_proxy,
428 ACE_HANDLE connect_handle,
429 const void *act,
430 ACE_HANDLE event,
431 int priority,
432 int signal_number)
434 ACE_Asynch_Connect_Result_Impl *implementation = 0;
435 ACE_NEW_RETURN (implementation,
436 ACE_WIN32_Asynch_Connect_Result (handler_proxy,
437 connect_handle,
438 act,
439 event,
440 priority,
441 signal_number),
443 return implementation;
446 ACE_Asynch_Transmit_File_Result_Impl *
447 ACE_WIN32_Proactor::create_asynch_transmit_file_result
448 (const ACE_Handler::Proxy_Ptr &handler_proxy,
449 ACE_HANDLE socket,
450 ACE_HANDLE file,
451 ACE_Asynch_Transmit_File::Header_And_Trailer *header_and_trailer,
452 size_t bytes_to_write,
453 u_long offset,
454 u_long offset_high,
455 size_t bytes_per_send,
456 u_long flags,
457 const void *act,
458 ACE_HANDLE event,
459 int priority,
460 int signal_number)
462 ACE_Asynch_Transmit_File_Result_Impl *implementation = 0;
463 ACE_NEW_RETURN (implementation,
464 ACE_WIN32_Asynch_Transmit_File_Result (handler_proxy,
465 socket,
466 file,
467 header_and_trailer,
468 bytes_to_write,
469 offset,
470 offset_high,
471 bytes_per_send,
472 flags,
473 act,
474 event,
475 priority,
476 signal_number),
478 return implementation;
481 ACE_Asynch_Result_Impl *
482 ACE_WIN32_Proactor::create_asynch_timer (const ACE_Handler::Proxy_Ptr &handler_proxy,
483 const void *act,
484 const ACE_Time_Value &tv,
485 ACE_HANDLE event,
486 int priority,
487 int signal_number)
489 ACE_Asynch_Result_Impl *implementation = 0;
490 ACE_NEW_RETURN (implementation,
491 ACE_WIN32_Asynch_Timer (handler_proxy,
492 act,
494 event,
495 priority,
496 signal_number),
498 return implementation;
502 ACE_WIN32_Proactor::handle_signal (int, siginfo_t *, ucontext_t *)
504 // Perform a non-blocking "poll" for all the I/O events that have
505 // completed in the I/O completion queue.
507 int result = 0;
509 for (ACE_Time_Value timeout (0, 0);
513 result = this->handle_events (timeout);
515 if (result != 1)
516 break;
519 // If our handle_events failed, we'll report a failure to the
520 // Reactor.
521 return result == -1 ? -1 : 0;
525 ACE_WIN32_Proactor::handle_close (ACE_HANDLE handle,
526 ACE_Reactor_Mask close_mask)
528 ACE_UNUSED_ARG (close_mask);
529 ACE_UNUSED_ARG (handle);
531 return this->close ();
534 ACE_HANDLE
535 ACE_WIN32_Proactor::get_handle () const
537 if (this->used_with_reactor_event_loop_)
538 return this->event_.handle ();
539 else
540 return 0;
544 ACE_WIN32_Proactor::handle_events (ACE_Time_Value &wait_time)
546 // Decrement <wait_time> with the amount of time spent in the method
547 ACE_Countdown_Time countdown (&wait_time);
548 return this->handle_events (wait_time.msec ());
552 ACE_WIN32_Proactor::handle_events ()
554 return this->handle_events (ACE_INFINITE);
558 ACE_WIN32_Proactor::handle_events (unsigned long milli_seconds)
560 ACE_OVERLAPPED *overlapped = 0;
561 u_long bytes_transferred = 0;
562 ULONG_PTR completion_key = 0;
564 // Get the next asynchronous operation that completes
565 BOOL result = ::GetQueuedCompletionStatus (this->completion_port_,
566 &bytes_transferred,
567 &completion_key,
568 &overlapped,
569 milli_seconds);
570 if (result == FALSE && overlapped == 0)
572 ACE_OS::set_errno_to_last_error ();
574 switch (errno)
576 case WAIT_TIMEOUT:
577 errno = ETIME;
578 return 0;
580 case ERROR_SUCCESS:
581 // Calling GetQueuedCompletionStatus with timeout value 0
582 // returns FALSE with extended errno "ERROR_SUCCESS" errno =
583 // ETIME; ?? I don't know if this has to be done !!
584 return 0;
586 default:
587 if (ACE::debug ())
588 ACELIB_DEBUG ((LM_ERROR,
589 ACE_TEXT ("%p\n"),
590 ACE_TEXT ("GetQueuedCompletionStatus")));
591 return -1;
594 else if (overlapped != 0)
596 // Narrow the result.
597 ACE_WIN32_Asynch_Result *asynch_result = (ACE_WIN32_Asynch_Result *) overlapped;
599 // If errors happen, grab the error.
600 if (result == FALSE)
601 ACE_OS::set_errno_to_last_error ();
602 else
603 errno = 0;
605 u_long result_err = asynch_result->error ();
607 // if "result_err" is 0 than
608 // It is normal OS/WIN32 AIO completion.
609 // We have cleared asynch_result->error_
610 // during shared_read/shared_write.
611 // The real error code is already stored in "errno",
612 // so copy "errno" value to the "result_err"
613 // and pass this "result_err" code
614 // to the application_specific_code ()
615 // else
616 // "result_err" non zero
617 // it means we have "post_completed" result
618 // so pass this "result_err" code
619 // to the application_specific_code ()
621 if (result_err == 0)
622 result_err = errno ;
624 this->application_specific_code (asynch_result,
625 static_cast<size_t> (bytes_transferred),
626 (void *) completion_key,
627 result_err);
629 return 1;
632 void
633 ACE_WIN32_Proactor::application_specific_code (ACE_WIN32_Asynch_Result *asynch_result,
634 size_t bytes_transferred,
635 const void *completion_key,
636 u_long error)
638 ACE_SEH_TRY
640 // Call completion hook
641 asynch_result->complete (bytes_transferred,
642 error ? 0 : 1,
643 (void *) completion_key,
644 error);
646 ACE_SEH_FINALLY
648 // This is crucial to prevent memory leaks
649 delete asynch_result;
654 ACE_WIN32_Proactor::post_completion (ACE_WIN32_Asynch_Result *result)
656 // Grab the event associated with the Proactor
657 HANDLE handle = this->get_handle ();
659 // pass
660 // bytes_transferred
661 // completion_key
662 // to the ::PostQueuedCompletionStatus()
663 // error will be extracted later in handle_events()
665 DWORD bytes_transferred = 0;
666 const void * completion_key = 0 ;
668 if (result != 0)
670 // This cast is ok since the original API calls restricted the transfer
671 // counts to DWORD range.
672 bytes_transferred = static_cast<DWORD> (result->bytes_transferred ());
673 completion_key = result->completion_key();
676 ULONG_PTR comp_key (reinterpret_cast<ULONG_PTR> (completion_key));
678 // Post a completion
679 if (::PostQueuedCompletionStatus (this->completion_port_, // completion port
680 bytes_transferred, // xfer count
681 comp_key, // completion key
682 result // overlapped
683 ) == FALSE)
685 delete result;
687 if (ACE::debug ())
689 ACELIB_DEBUG ((LM_ERROR,
690 ACE_TEXT ("%p\n"),
691 ACE_TEXT ("PostQueuedCompletionStatus failed")));
693 return -1;
696 // If Proactor event is valid, signal it
697 if (handle != ACE_INVALID_HANDLE
698 && handle != 0)
699 ACE_OS::event_signal (&handle);
701 return 0;
705 ACE_WIN32_Proactor::post_wakeup_completions (int how_many)
707 ACE_WIN32_Wakeup_Completion *wakeup_completion = 0;
709 for (ssize_t ci = 0; ci < how_many; ci++)
711 ACE_NEW_RETURN
712 (wakeup_completion,
713 ACE_WIN32_Wakeup_Completion (this->wakeup_handler_.proxy ()),
714 -1);
716 if (wakeup_completion->post_completion (this) == -1)
717 return -1;
720 return 0;
724 ACE_WIN32_Proactor::wake_up_dispatch_threads ()
726 return 0;
730 ACE_WIN32_Proactor::close_dispatch_threads (int)
732 return 0;
735 size_t
736 ACE_WIN32_Proactor::number_of_threads () const
738 return static_cast<size_t> (this->number_of_threads_);
741 void
742 ACE_WIN32_Proactor::number_of_threads (size_t threads)
744 this->number_of_threads_ = static_cast<DWORD> (threads);
747 ACE_WIN32_Asynch_Timer::ACE_WIN32_Asynch_Timer
748 (const ACE_Handler::Proxy_Ptr &handler_proxy,
749 const void *act,
750 const ACE_Time_Value &tv,
751 ACE_HANDLE event,
752 int priority,
753 int signal_number)
754 : ACE_Asynch_Result_Impl (),
755 ACE_WIN32_Asynch_Result (handler_proxy, act, event, 0, 0, priority,
756 signal_number),
757 time_ (tv)
761 void
762 ACE_WIN32_Asynch_Timer::complete (size_t,
763 int,
764 const void *,
765 u_long)
767 ACE_Handler *handler = this->handler_proxy_.get ()->handler ();
768 if (handler != 0)
769 handler->handle_time_out (this->time_, this->act ());
772 ACE_WIN32_Wakeup_Completion::ACE_WIN32_Wakeup_Completion
773 (ACE_Handler::Proxy_Ptr &handler_proxy,
774 const void *act,
775 ACE_HANDLE event,
776 int priority,
777 int signal_number)
778 : ACE_Asynch_Result_Impl (),
779 ACE_WIN32_Asynch_Result
780 (handler_proxy, act, event, 0, 0, priority, signal_number)
784 ACE_WIN32_Wakeup_Completion::~ACE_WIN32_Wakeup_Completion ()
788 void
789 ACE_WIN32_Wakeup_Completion::complete (size_t /* bytes_transferred */,
790 int /* success */,
791 const void * /* completion_key */,
792 u_long /* error */)
794 ACE_Handler *handler = this->handler_proxy_.get ()->handler ();
795 if (handler != 0)
796 handler->handle_wakeup ();
799 ACE_END_VERSIONED_NAMESPACE_DECL
801 #endif /* ACE_WIN32 */