Revert to Current Include Style
[ACE_TAO.git] / ACE / ace / WIN32_Proactor.cpp
blob25bf4f31b324ed9d8e3c3a44568a5f04699d714d
1 //
3 #include "ace/WIN32_Proactor.h"
5 #if defined (ACE_WIN32) && defined (ACE_HAS_WIN32_OVERLAPPED_IO)
6 // WIN implementation of the Proactor.
8 #include "ace/Log_Category.h"
9 #include "ace/Object_Manager.h"
10 #include "ace/OS_NS_errno.h"
11 #include "ace/OS_NS_unistd.h"
13 ACE_BEGIN_VERSIONED_NAMESPACE_DECL
15 /**
16 * @class ACE_WIN32_Wakeup_Completion
18 * This is result object is used by the <end_event_loop> of the
19 * ACE_Proactor interface to wake up all the threads blocking
20 * for completions.
22 class ACE_WIN32_Wakeup_Completion : public ACE_WIN32_Asynch_Result
25 public:
26 /// Constructor.
27 ACE_WIN32_Wakeup_Completion (ACE_Handler::Proxy_Ptr &handler_proxy,
28 const void *act = 0,
29 ACE_HANDLE event = ACE_INVALID_HANDLE,
30 int priority = 0,
31 int signal_number = ACE_SIGRTMIN);
33 /// Destructor.
34 virtual ~ACE_WIN32_Wakeup_Completion (void);
36 /// This method calls the <handler>'s <handle_wakeup> method.
37 virtual void complete (size_t bytes_transferred = 0,
38 int success = 1,
39 const void *completion_key = 0,
40 u_long error = 0);
43 ACE_WIN32_Proactor::ACE_WIN32_Proactor (size_t number_of_threads,
44 bool used_with_reactor_event_loop)
45 : completion_port_ (0),
46 // This *MUST* be 0, *NOT* ACE_INVALID_HANDLE !!!
47 number_of_threads_ (static_cast<DWORD> (number_of_threads)),
48 used_with_reactor_event_loop_ (used_with_reactor_event_loop)
50 // Create the completion port.
51 this->completion_port_ = ::CreateIoCompletionPort (INVALID_HANDLE_VALUE,
54 this->number_of_threads_);
55 if (this->completion_port_ == 0)
56 ACELIB_ERROR ((LM_ERROR,
57 ACE_TEXT ("%p\n"),
58 ACE_TEXT ("CreateIoCompletionPort")));
60 this->get_asynch_pseudo_task ().start ();
63 ACE_WIN32_Proactor::~ACE_WIN32_Proactor (void)
65 this->get_asynch_pseudo_task ().stop ();
67 this->close ();
70 ACE_Asynch_Pseudo_Task &
71 ACE_WIN32_Proactor::get_asynch_pseudo_task ()
73 return this->pseudo_task_;
76 int
77 ACE_WIN32_Proactor::close (void)
79 // Close the completion port
80 if (this->completion_port_ != 0)
82 // To avoid memory leaks we should delete all results from queue.
84 for (;;)
86 ACE_OVERLAPPED *overlapped = 0;
87 u_long bytes_transferred = 0;
88 ULONG_PTR completion_key = 0;
90 // Get the next asynchronous operation that completes
91 BOOL res = ::GetQueuedCompletionStatus
92 (this->completion_port_,
93 &bytes_transferred,
94 &completion_key,
95 &overlapped,
96 0); // poll
98 if (overlapped == 0 || res == FALSE)
99 break;
101 ACE_WIN32_Asynch_Result *asynch_result =
102 (ACE_WIN32_Asynch_Result *) overlapped;
104 delete asynch_result;
107 int result = ACE_OS::close (this->completion_port_);
108 this->completion_port_ = 0;
109 return result;
112 return 0;
116 ACE_WIN32_Proactor::register_handle (ACE_HANDLE handle,
117 const void *completion_key)
119 ULONG_PTR comp_key (reinterpret_cast<ULONG_PTR> (completion_key));
121 // No locking is needed here as no state changes.
122 ACE_HANDLE cp = ::CreateIoCompletionPort (handle,
123 this->completion_port_,
124 comp_key,
125 this->number_of_threads_);
126 if (cp == 0)
128 ACE_OS::set_errno_to_last_error ();
129 // If errno == ERROR_INVALID_PARAMETER, then this handle was
130 // already registered.
131 if (errno != ERROR_INVALID_PARAMETER)
133 if (ACE::debug ())
135 ACELIB_DEBUG ((LM_ERROR,
136 ACE_TEXT ("%p\n"),
137 ACE_TEXT ("CreateIoCompletionPort")));
139 return -1;
142 return 0;
145 ACE_Asynch_Read_Stream_Impl *
146 ACE_WIN32_Proactor::create_asynch_read_stream (void)
148 ACE_Asynch_Read_Stream_Impl *implementation = 0;
149 ACE_NEW_RETURN (implementation,
150 ACE_WIN32_Asynch_Read_Stream (this),
152 return implementation;
155 ACE_Asynch_Write_Stream_Impl *
156 ACE_WIN32_Proactor::create_asynch_write_stream (void)
158 ACE_Asynch_Write_Stream_Impl *implementation = 0;
159 ACE_NEW_RETURN (implementation,
160 ACE_WIN32_Asynch_Write_Stream (this),
162 return implementation;
165 ACE_Asynch_Read_Dgram_Impl *
166 ACE_WIN32_Proactor::create_asynch_read_dgram (void)
168 ACE_Asynch_Read_Dgram_Impl *implementation = 0;
169 ACE_NEW_RETURN (implementation,
170 ACE_WIN32_Asynch_Read_Dgram (this),
172 return implementation;
175 ACE_Asynch_Write_Dgram_Impl *
176 ACE_WIN32_Proactor::create_asynch_write_dgram (void)
178 ACE_Asynch_Write_Dgram_Impl *implementation = 0;
179 ACE_NEW_RETURN (implementation,
180 ACE_WIN32_Asynch_Write_Dgram (this),
182 return implementation;
185 ACE_Asynch_Read_File_Impl *
186 ACE_WIN32_Proactor::create_asynch_read_file (void)
188 ACE_Asynch_Read_File_Impl *implementation = 0;
189 ACE_NEW_RETURN (implementation,
190 ACE_WIN32_Asynch_Read_File (this),
192 return implementation;
195 ACE_Asynch_Write_File_Impl *
196 ACE_WIN32_Proactor::create_asynch_write_file (void)
198 ACE_Asynch_Write_File_Impl *implementation = 0;
199 ACE_NEW_RETURN (implementation,
200 ACE_WIN32_Asynch_Write_File (this),
202 return implementation;
205 ACE_Asynch_Accept_Impl *
206 ACE_WIN32_Proactor::create_asynch_accept (void)
208 ACE_Asynch_Accept_Impl *implementation = 0;
209 ACE_NEW_RETURN (implementation,
210 ACE_WIN32_Asynch_Accept (this),
212 return implementation;
215 ACE_Asynch_Connect_Impl *
216 ACE_WIN32_Proactor::create_asynch_connect (void)
218 ACE_Asynch_Connect_Impl *implementation = 0;
219 ACE_NEW_RETURN (implementation,
220 ACE_WIN32_Asynch_Connect (this),
222 return implementation;
225 ACE_Asynch_Transmit_File_Impl *
226 ACE_WIN32_Proactor::create_asynch_transmit_file (void)
228 ACE_Asynch_Transmit_File_Impl *implementation = 0;
229 ACE_NEW_RETURN (implementation,
230 ACE_WIN32_Asynch_Transmit_File (this),
232 return implementation;
235 ACE_Asynch_Read_Stream_Result_Impl *
236 ACE_WIN32_Proactor::create_asynch_read_stream_result
237 (const ACE_Handler::Proxy_Ptr &handler_proxy,
238 ACE_HANDLE handle,
239 ACE_Message_Block &message_block,
240 size_t bytes_to_read,
241 const void* act,
242 ACE_HANDLE event,
243 int priority,
244 int signal_number)
246 ACE_Asynch_Read_Stream_Result_Impl *implementation = 0;
247 ACE_NEW_RETURN (implementation,
248 ACE_WIN32_Asynch_Read_Stream_Result (handler_proxy,
249 handle,
250 message_block,
251 bytes_to_read,
252 act,
253 event,
254 priority,
255 signal_number),
257 return implementation;
260 ACE_Asynch_Write_Stream_Result_Impl *
261 ACE_WIN32_Proactor::create_asynch_write_stream_result
262 (const ACE_Handler::Proxy_Ptr &handler_proxy,
263 ACE_HANDLE handle,
264 ACE_Message_Block &message_block,
265 size_t bytes_to_write,
266 const void* act,
267 ACE_HANDLE event,
268 int priority,
269 int signal_number)
271 ACE_Asynch_Write_Stream_Result_Impl *implementation = 0;
272 ACE_NEW_RETURN (implementation,
273 ACE_WIN32_Asynch_Write_Stream_Result (handler_proxy,
274 handle,
275 message_block,
276 bytes_to_write,
277 act,
278 event,
279 priority,
280 signal_number),
282 return implementation;
285 ACE_Asynch_Read_File_Result_Impl *
286 ACE_WIN32_Proactor::create_asynch_read_file_result
287 (const ACE_Handler::Proxy_Ptr &handler_proxy,
288 ACE_HANDLE handle,
289 ACE_Message_Block &message_block,
290 size_t bytes_to_read,
291 const void* act,
292 u_long offset,
293 u_long offset_high,
294 ACE_HANDLE event,
295 int priority,
296 int signal_number)
298 ACE_Asynch_Read_File_Result_Impl *implementation = 0;
299 ACE_NEW_RETURN (implementation,
300 ACE_WIN32_Asynch_Read_File_Result (handler_proxy,
301 handle,
302 message_block,
303 bytes_to_read,
304 act,
305 offset,
306 offset_high,
307 event,
308 priority,
309 signal_number),
311 return implementation;
314 ACE_Asynch_Write_File_Result_Impl *
315 ACE_WIN32_Proactor::create_asynch_write_file_result
316 (const ACE_Handler::Proxy_Ptr &handler_proxy,
317 ACE_HANDLE handle,
318 ACE_Message_Block &message_block,
319 size_t bytes_to_write,
320 const void* act,
321 u_long offset,
322 u_long offset_high,
323 ACE_HANDLE event,
324 int priority,
325 int signal_number)
327 ACE_Asynch_Write_File_Result_Impl *implementation = 0;
328 ACE_NEW_RETURN (implementation,
329 ACE_WIN32_Asynch_Write_File_Result (handler_proxy,
330 handle,
331 message_block,
332 bytes_to_write,
333 act,
334 offset,
335 offset_high,
336 event,
337 priority,
338 signal_number),
340 return implementation;
343 ACE_Asynch_Read_Dgram_Result_Impl *
344 ACE_WIN32_Proactor::create_asynch_read_dgram_result
345 (const ACE_Handler::Proxy_Ptr &handler_proxy,
346 ACE_HANDLE handle,
347 ACE_Message_Block *message_block,
348 size_t bytes_to_read,
349 int flags,
350 int protocol_family,
351 const void* act,
352 ACE_HANDLE event,
353 int priority,
354 int signal_number)
356 ACE_Asynch_Read_Dgram_Result_Impl *implementation = 0;
357 ACE_NEW_RETURN (implementation,
358 ACE_WIN32_Asynch_Read_Dgram_Result (handler_proxy,
359 handle,
360 message_block,
361 bytes_to_read,
362 flags,
363 protocol_family,
364 act,
365 event,
366 priority,
367 signal_number),
369 return implementation;
372 ACE_Asynch_Write_Dgram_Result_Impl *
373 ACE_WIN32_Proactor::create_asynch_write_dgram_result
374 (const ACE_Handler::Proxy_Ptr &handler_proxy,
375 ACE_HANDLE handle,
376 ACE_Message_Block *message_block,
377 size_t bytes_to_read,
378 int flags,
379 const void* act,
380 ACE_HANDLE event,
381 int priority,
382 int signal_number)
384 ACE_Asynch_Write_Dgram_Result_Impl *implementation = 0;
385 ACE_NEW_RETURN (implementation,
386 ACE_WIN32_Asynch_Write_Dgram_Result(handler_proxy,
387 handle,
388 message_block,
389 bytes_to_read,
390 flags,
391 act,
392 event,
393 priority,
394 signal_number),
396 return implementation;
399 ACE_Asynch_Accept_Result_Impl *
400 ACE_WIN32_Proactor::create_asynch_accept_result
401 (const ACE_Handler::Proxy_Ptr &handler_proxy,
402 ACE_HANDLE listen_handle,
403 ACE_HANDLE accept_handle,
404 ACE_Message_Block &message_block,
405 size_t bytes_to_read,
406 const void* act,
407 ACE_HANDLE event,
408 int priority,
409 int signal_number)
411 ACE_Asynch_Accept_Result_Impl *implementation = 0;
412 ACE_NEW_RETURN (implementation,
413 ACE_WIN32_Asynch_Accept_Result (handler_proxy,
414 listen_handle,
415 accept_handle,
416 message_block,
417 bytes_to_read,
418 act,
419 event,
420 priority,
421 signal_number),
423 return implementation;
426 ACE_Asynch_Connect_Result_Impl *
427 ACE_WIN32_Proactor::create_asynch_connect_result
428 (const ACE_Handler::Proxy_Ptr &handler_proxy,
429 ACE_HANDLE connect_handle,
430 const void *act,
431 ACE_HANDLE event,
432 int priority,
433 int signal_number)
435 ACE_Asynch_Connect_Result_Impl *implementation = 0;
436 ACE_NEW_RETURN (implementation,
437 ACE_WIN32_Asynch_Connect_Result (handler_proxy,
438 connect_handle,
439 act,
440 event,
441 priority,
442 signal_number),
444 return implementation;
447 ACE_Asynch_Transmit_File_Result_Impl *
448 ACE_WIN32_Proactor::create_asynch_transmit_file_result
449 (const ACE_Handler::Proxy_Ptr &handler_proxy,
450 ACE_HANDLE socket,
451 ACE_HANDLE file,
452 ACE_Asynch_Transmit_File::Header_And_Trailer *header_and_trailer,
453 size_t bytes_to_write,
454 u_long offset,
455 u_long offset_high,
456 size_t bytes_per_send,
457 u_long flags,
458 const void *act,
459 ACE_HANDLE event,
460 int priority,
461 int signal_number)
463 ACE_Asynch_Transmit_File_Result_Impl *implementation = 0;
464 ACE_NEW_RETURN (implementation,
465 ACE_WIN32_Asynch_Transmit_File_Result (handler_proxy,
466 socket,
467 file,
468 header_and_trailer,
469 bytes_to_write,
470 offset,
471 offset_high,
472 bytes_per_send,
473 flags,
474 act,
475 event,
476 priority,
477 signal_number),
479 return implementation;
482 ACE_Asynch_Result_Impl *
483 ACE_WIN32_Proactor::create_asynch_timer (const ACE_Handler::Proxy_Ptr &handler_proxy,
484 const void *act,
485 const ACE_Time_Value &tv,
486 ACE_HANDLE event,
487 int priority,
488 int signal_number)
490 ACE_Asynch_Result_Impl *implementation = 0;
491 ACE_NEW_RETURN (implementation,
492 ACE_WIN32_Asynch_Timer (handler_proxy,
493 act,
495 event,
496 priority,
497 signal_number),
499 return implementation;
503 ACE_WIN32_Proactor::handle_signal (int, siginfo_t *, ucontext_t *)
505 // Perform a non-blocking "poll" for all the I/O events that have
506 // completed in the I/O completion queue.
508 int result = 0;
510 for (ACE_Time_Value timeout (0, 0);
514 result = this->handle_events (timeout);
516 if (result != 1)
517 break;
520 // If our handle_events failed, we'll report a failure to the
521 // Reactor.
522 return result == -1 ? -1 : 0;
526 ACE_WIN32_Proactor::handle_close (ACE_HANDLE handle,
527 ACE_Reactor_Mask close_mask)
529 ACE_UNUSED_ARG (close_mask);
530 ACE_UNUSED_ARG (handle);
532 return this->close ();
535 ACE_HANDLE
536 ACE_WIN32_Proactor::get_handle (void) const
538 if (this->used_with_reactor_event_loop_)
539 return this->event_.handle ();
540 else
541 return 0;
545 ACE_WIN32_Proactor::handle_events (ACE_Time_Value &wait_time)
547 // Decrement <wait_time> with the amount of time spent in the method
548 ACE_Countdown_Time countdown (&wait_time);
549 return this->handle_events (wait_time.msec ());
553 ACE_WIN32_Proactor::handle_events (void)
555 return this->handle_events (ACE_INFINITE);
559 ACE_WIN32_Proactor::handle_events (unsigned long milli_seconds)
561 ACE_OVERLAPPED *overlapped = 0;
562 u_long bytes_transferred = 0;
563 ULONG_PTR completion_key = 0;
565 // Get the next asynchronous operation that completes
566 BOOL result = ::GetQueuedCompletionStatus (this->completion_port_,
567 &bytes_transferred,
568 &completion_key,
569 &overlapped,
570 milli_seconds);
571 if (result == FALSE && overlapped == 0)
573 ACE_OS::set_errno_to_last_error ();
575 switch (errno)
577 case WAIT_TIMEOUT:
578 errno = ETIME;
579 return 0;
581 case ERROR_SUCCESS:
582 // Calling GetQueuedCompletionStatus with timeout value 0
583 // returns FALSE with extended errno "ERROR_SUCCESS" errno =
584 // ETIME; ?? I don't know if this has to be done !!
585 return 0;
587 default:
588 if (ACE::debug ())
589 ACELIB_DEBUG ((LM_ERROR,
590 ACE_TEXT ("%p\n"),
591 ACE_TEXT ("GetQueuedCompletionStatus")));
592 return -1;
595 else if (overlapped != 0)
597 // Narrow the result.
598 ACE_WIN32_Asynch_Result *asynch_result = (ACE_WIN32_Asynch_Result *) overlapped;
600 // If errors happen, grab the error.
601 if (result == FALSE)
602 ACE_OS::set_errno_to_last_error ();
603 else
604 errno = 0;
606 u_long result_err = asynch_result->error ();
608 // if "result_err" is 0 than
609 // It is normal OS/WIN32 AIO completion.
610 // We have cleared asynch_result->error_
611 // during shared_read/shared_write.
612 // The real error code is already stored in "errno",
613 // so copy "errno" value to the "result_err"
614 // and pass this "result_err" code
615 // to the application_specific_code ()
616 // else
617 // "result_err" non zero
618 // it means we have "post_completed" result
619 // so pass this "result_err" code
620 // to the application_specific_code ()
622 if (result_err == 0)
623 result_err = errno ;
625 this->application_specific_code (asynch_result,
626 static_cast<size_t> (bytes_transferred),
627 (void *) completion_key,
628 result_err);
630 return 1;
633 void
634 ACE_WIN32_Proactor::application_specific_code (ACE_WIN32_Asynch_Result *asynch_result,
635 size_t bytes_transferred,
636 const void *completion_key,
637 u_long error)
639 ACE_SEH_TRY
641 // Call completion hook
642 asynch_result->complete (bytes_transferred,
643 error ? 0 : 1,
644 (void *) completion_key,
645 error);
647 ACE_SEH_FINALLY
649 // This is crucial to prevent memory leaks
650 delete asynch_result;
655 ACE_WIN32_Proactor::post_completion (ACE_WIN32_Asynch_Result *result)
657 // Grab the event associated with the Proactor
658 HANDLE handle = this->get_handle ();
660 // pass
661 // bytes_transferred
662 // completion_key
663 // to the ::PostQueuedCompletionStatus()
664 // error will be extracted later in handle_events()
666 DWORD bytes_transferred = 0;
667 const void * completion_key = 0 ;
669 if (result != 0)
671 // This cast is ok since the original API calls restricted the transfer
672 // counts to DWORD range.
673 bytes_transferred = static_cast<DWORD> (result->bytes_transferred ());
674 completion_key = result->completion_key();
677 ULONG_PTR comp_key (reinterpret_cast<ULONG_PTR> (completion_key));
679 // Post a completion
680 if (::PostQueuedCompletionStatus (this->completion_port_, // completion port
681 bytes_transferred, // xfer count
682 comp_key, // completion key
683 result // overlapped
684 ) == FALSE)
686 delete result;
688 if (ACE::debug ())
690 ACELIB_DEBUG ((LM_ERROR,
691 ACE_TEXT ("%p\n"),
692 ACE_TEXT ("PostQueuedCompletionStatus failed")));
694 return -1;
697 // If Proactor event is valid, signal it
698 if (handle != ACE_INVALID_HANDLE
699 && handle != 0)
700 ACE_OS::event_signal (&handle);
702 return 0;
706 ACE_WIN32_Proactor::post_wakeup_completions (int how_many)
708 ACE_WIN32_Wakeup_Completion *wakeup_completion = 0;
710 for (ssize_t ci = 0; ci < how_many; ci++)
712 ACE_NEW_RETURN
713 (wakeup_completion,
714 ACE_WIN32_Wakeup_Completion (this->wakeup_handler_.proxy ()),
715 -1);
717 if (wakeup_completion->post_completion (this) == -1)
718 return -1;
721 return 0;
725 ACE_WIN32_Proactor::wake_up_dispatch_threads (void)
727 return 0;
731 ACE_WIN32_Proactor::close_dispatch_threads (int)
733 return 0;
736 size_t
737 ACE_WIN32_Proactor::number_of_threads (void) const
739 return static_cast<size_t> (this->number_of_threads_);
742 void
743 ACE_WIN32_Proactor::number_of_threads (size_t threads)
745 this->number_of_threads_ = static_cast<DWORD> (threads);
748 ACE_WIN32_Asynch_Timer::ACE_WIN32_Asynch_Timer
749 (const ACE_Handler::Proxy_Ptr &handler_proxy,
750 const void *act,
751 const ACE_Time_Value &tv,
752 ACE_HANDLE event,
753 int priority,
754 int signal_number)
755 : ACE_Asynch_Result_Impl (),
756 ACE_WIN32_Asynch_Result (handler_proxy, act, event, 0, 0, priority,
757 signal_number),
758 time_ (tv)
762 void
763 ACE_WIN32_Asynch_Timer::complete (size_t,
764 int,
765 const void *,
766 u_long)
768 ACE_Handler *handler = this->handler_proxy_.get ()->handler ();
769 if (handler != 0)
770 handler->handle_time_out (this->time_, this->act ());
773 ACE_WIN32_Wakeup_Completion::ACE_WIN32_Wakeup_Completion
774 (ACE_Handler::Proxy_Ptr &handler_proxy,
775 const void *act,
776 ACE_HANDLE event,
777 int priority,
778 int signal_number)
779 : ACE_Asynch_Result_Impl (),
780 ACE_WIN32_Asynch_Result
781 (handler_proxy, act, event, 0, 0, priority, signal_number)
785 ACE_WIN32_Wakeup_Completion::~ACE_WIN32_Wakeup_Completion (void)
789 void
790 ACE_WIN32_Wakeup_Completion::complete (size_t /* bytes_transferred */,
791 int /* success */,
792 const void * /* completion_key */,
793 u_long /* error */)
795 ACE_Handler *handler = this->handler_proxy_.get ()->handler ();
796 if (handler != 0)
797 handler->handle_wakeup ();
800 ACE_END_VERSIONED_NAMESPACE_DECL
802 #endif /* ACE_WIN32 */