1 // $Id: POSIX_Proactor.cpp 81697 2008-05-14 18:33:11Z johnnyw $
3 #include "ace/POSIX_Proactor.h"
5 #if defined (ACE_HAS_AIO_CALLS)
7 #if !defined (__ACE_INLINE__)
8 #include "ace/POSIX_Proactor.inl"
9 #endif /* __ACE_INLINE__ */
11 # if defined (ACE_HAS_SYS_SYSTEMINFO_H)
12 # include /**/ <sys/systeminfo.h>
13 # endif /* ACE_HAS_SYS_SYSTEMINFO_H */
16 #include "ace/Flag_Manip.h"
17 #include "ace/Task_T.h"
18 #include "ace/Log_Msg.h"
19 #include "ace/Object_Manager.h"
20 #include "ace/OS_NS_sys_socket.h"
21 #include "ace/OS_NS_signal.h"
22 #include "ace/OS_NS_unistd.h"
25 # include "ace/OS_NS_strings.h"
28 // *********************************************************************
30 ACE_BEGIN_VERSIONED_NAMESPACE_DECL
33 * @class ACE_POSIX_Wakeup_Completion
35 * This result object is used by the <end_event_loop> of the
36 * ACE_Proactor interface to wake up all the threads blocking
39 class ACE_POSIX_Wakeup_Completion
: public ACE_POSIX_Asynch_Result
43 ACE_POSIX_Wakeup_Completion (const ACE_Handler::Proxy_Ptr
&handler_proxy
,
45 ACE_HANDLE event
= ACE_INVALID_HANDLE
,
47 int signal_number
= ACE_SIGRTMIN
);
50 virtual ~ACE_POSIX_Wakeup_Completion (void);
53 /// This method calls the <handler>'s <handle_wakeup> method.
54 virtual void complete (size_t bytes_transferred
= 0,
56 const void *completion_key
= 0,
60 // *********************************************************************
61 ACE_POSIX_Proactor::ACE_POSIX_Proactor (void)
62 : os_id_ (ACE_OS_UNDEFINED
)
66 os_id_
= ACE_OS_SUN
; // set family
70 ::memset(Buf
,0,sizeof(Buf
));
72 ACE_OS::sysinfo (SI_RELEASE
, Buf
, sizeof(Buf
)-1);
74 if (ACE_OS::strcasecmp (Buf
, "5.6") == 0)
75 os_id_
= ACE_OS_SUN_56
;
76 else if (ACE_OS::strcasecmp (Buf
, "5.7") == 0)
77 os_id_
= ACE_OS_SUN_57
;
78 else if (ACE_OS::strcasecmp (Buf
, "5.8") == 0)
79 os_id_
= ACE_OS_SUN_58
;
83 os_id_
= ACE_OS_HPUX
; // set family
87 os_id_
= ACE_OS_IRIX
; // set family
89 #elif defined(__OpenBSD)
91 os_id_
= ACE_OS_OPENBSD
; // set family
95 //#else defined (LINUX, __FreeBSD__ ...)
100 ACE_POSIX_Proactor::~ACE_POSIX_Proactor (void)
106 ACE_POSIX_Proactor::close (void)
112 ACE_POSIX_Proactor::register_handle (ACE_HANDLE handle
,
113 const void *completion_key
)
115 ACE_UNUSED_ARG (handle
);
116 ACE_UNUSED_ARG (completion_key
);
121 ACE_POSIX_Proactor::wake_up_dispatch_threads (void)
127 ACE_POSIX_Proactor::close_dispatch_threads (int)
133 ACE_POSIX_Proactor::number_of_threads (void) const
136 ACE_NOTSUP_RETURN (0);
140 ACE_POSIX_Proactor::number_of_threads (size_t threads
)
143 ACE_UNUSED_ARG (threads
);
147 ACE_POSIX_Proactor::get_handle (void) const
149 return ACE_INVALID_HANDLE
;
152 ACE_Asynch_Read_Stream_Impl
*
153 ACE_POSIX_Proactor::create_asynch_read_stream (void)
155 ACE_Asynch_Read_Stream_Impl
*implementation
= 0;
156 ACE_NEW_RETURN (implementation
,
157 ACE_POSIX_Asynch_Read_Stream (this),
159 return implementation
;
162 ACE_Asynch_Read_Stream_Result_Impl
*
163 ACE_POSIX_Proactor::create_asynch_read_stream_result
164 (const ACE_Handler::Proxy_Ptr
&handler_proxy
,
166 ACE_Message_Block
&message_block
,
167 size_t bytes_to_read
,
173 ACE_Asynch_Read_Stream_Result_Impl
*implementation
;
174 ACE_NEW_RETURN (implementation
,
175 ACE_POSIX_Asynch_Read_Stream_Result (handler_proxy
,
184 return implementation
;
188 ACE_Asynch_Write_Stream_Impl
*
189 ACE_POSIX_Proactor::create_asynch_write_stream (void)
191 ACE_Asynch_Write_Stream_Impl
*implementation
= 0;
192 ACE_NEW_RETURN (implementation
,
193 ACE_POSIX_Asynch_Write_Stream (this),
195 return implementation
;
198 ACE_Asynch_Write_Stream_Result_Impl
*
199 ACE_POSIX_Proactor::create_asynch_write_stream_result
200 (const ACE_Handler::Proxy_Ptr
&handler_proxy
,
202 ACE_Message_Block
&message_block
,
203 size_t bytes_to_write
,
209 ACE_Asynch_Write_Stream_Result_Impl
*implementation
;
210 ACE_NEW_RETURN (implementation
,
211 ACE_POSIX_Asynch_Write_Stream_Result (handler_proxy
,
220 return implementation
;
224 ACE_Asynch_Read_File_Impl
*
225 ACE_POSIX_Proactor::create_asynch_read_file (void)
227 ACE_Asynch_Read_File_Impl
*implementation
= 0;
228 ACE_NEW_RETURN (implementation
,
229 ACE_POSIX_Asynch_Read_File (this),
231 return implementation
;
234 ACE_Asynch_Read_File_Result_Impl
*
235 ACE_POSIX_Proactor::create_asynch_read_file_result
236 (const ACE_Handler::Proxy_Ptr
&handler_proxy
,
238 ACE_Message_Block
&message_block
,
239 size_t bytes_to_read
,
247 ACE_Asynch_Read_File_Result_Impl
*implementation
;
248 ACE_NEW_RETURN (implementation
,
249 ACE_POSIX_Asynch_Read_File_Result (handler_proxy
,
260 return implementation
;
264 ACE_Asynch_Write_File_Impl
*
265 ACE_POSIX_Proactor::create_asynch_write_file (void)
267 ACE_Asynch_Write_File_Impl
*implementation
= 0;
268 ACE_NEW_RETURN (implementation
,
269 ACE_POSIX_Asynch_Write_File (this),
271 return implementation
;
274 ACE_Asynch_Write_File_Result_Impl
*
275 ACE_POSIX_Proactor::create_asynch_write_file_result
276 (const ACE_Handler::Proxy_Ptr
&handler_proxy
,
278 ACE_Message_Block
&message_block
,
279 size_t bytes_to_write
,
287 ACE_Asynch_Write_File_Result_Impl
*implementation
;
288 ACE_NEW_RETURN (implementation
,
289 ACE_POSIX_Asynch_Write_File_Result (handler_proxy
,
300 return implementation
;
304 ACE_Asynch_Read_Dgram_Impl
*
305 ACE_POSIX_Proactor::create_asynch_read_dgram (void)
307 ACE_Asynch_Read_Dgram_Impl
*implementation
= 0;
308 ACE_NEW_RETURN (implementation
,
309 ACE_POSIX_Asynch_Read_Dgram (this),
311 return implementation
;
314 ACE_Asynch_Read_Dgram_Result_Impl
*
315 ACE_POSIX_Proactor::create_asynch_read_dgram_result
316 (const ACE_Handler::Proxy_Ptr
&handler_proxy
,
318 ACE_Message_Block
*message_block
,
319 size_t bytes_to_read
,
327 ACE_Asynch_Read_Dgram_Result_Impl
*implementation
=0;
328 ACE_NEW_RETURN (implementation
,
329 ACE_POSIX_Asynch_Read_Dgram_Result(handler_proxy
,
341 return implementation
;
345 ACE_Asynch_Write_Dgram_Impl
*
346 ACE_POSIX_Proactor::create_asynch_write_dgram (void)
348 ACE_Asynch_Write_Dgram_Impl
*implementation
= 0;
349 ACE_NEW_RETURN (implementation
,
350 ACE_POSIX_Asynch_Write_Dgram (this),
353 return implementation
;
356 ACE_Asynch_Write_Dgram_Result_Impl
*
357 ACE_POSIX_Proactor::create_asynch_write_dgram_result
358 (const ACE_Handler::Proxy_Ptr
&handler_proxy
,
360 ACE_Message_Block
*message_block
,
361 size_t bytes_to_write
,
368 ACE_Asynch_Write_Dgram_Result_Impl
*implementation
=0;
369 ACE_NEW_RETURN (implementation
,
370 ACE_POSIX_Asynch_Write_Dgram_Result(handler_proxy
,
381 return implementation
;
385 ACE_Asynch_Accept_Impl
*
386 ACE_POSIX_Proactor::create_asynch_accept (void)
388 ACE_Asynch_Accept_Impl
*implementation
= 0;
389 ACE_NEW_RETURN (implementation
,
390 ACE_POSIX_Asynch_Accept (this),
393 return implementation
;
396 ACE_Asynch_Accept_Result_Impl
*
397 ACE_POSIX_Proactor::create_asynch_accept_result
398 (const ACE_Handler::Proxy_Ptr
&handler_proxy
,
399 ACE_HANDLE listen_handle
,
400 ACE_HANDLE accept_handle
,
401 ACE_Message_Block
&message_block
,
402 size_t bytes_to_read
,
408 ACE_Asynch_Accept_Result_Impl
*implementation
;
409 ACE_NEW_RETURN (implementation
,
410 ACE_POSIX_Asynch_Accept_Result (handler_proxy
,
420 return implementation
;
424 ACE_Asynch_Connect_Impl
*
425 ACE_POSIX_Proactor::create_asynch_connect (void)
427 ACE_Asynch_Connect_Impl
*implementation
= 0;
428 ACE_NEW_RETURN (implementation
,
429 ACE_POSIX_Asynch_Connect (this),
432 return implementation
;
435 ACE_Asynch_Connect_Result_Impl
*
436 ACE_POSIX_Proactor::create_asynch_connect_result
437 (const ACE_Handler::Proxy_Ptr
&handler_proxy
,
438 ACE_HANDLE connect_handle
,
444 ACE_Asynch_Connect_Result_Impl
*implementation
;
445 ACE_NEW_RETURN (implementation
,
446 ACE_POSIX_Asynch_Connect_Result (handler_proxy
,
453 return implementation
;
457 ACE_Asynch_Transmit_File_Impl
*
458 ACE_POSIX_Proactor::create_asynch_transmit_file (void)
460 ACE_Asynch_Transmit_File_Impl
*implementation
= 0;
461 ACE_NEW_RETURN (implementation
,
462 ACE_POSIX_Asynch_Transmit_File (this),
464 return implementation
;
467 ACE_Asynch_Transmit_File_Result_Impl
*
468 ACE_POSIX_Proactor::create_asynch_transmit_file_result
469 (const ACE_Handler::Proxy_Ptr
&handler_proxy
,
472 ACE_Asynch_Transmit_File::Header_And_Trailer
*header_and_trailer
,
473 size_t bytes_to_write
,
476 size_t bytes_per_send
,
483 ACE_Asynch_Transmit_File_Result_Impl
*implementation
;
484 ACE_NEW_RETURN (implementation
,
485 ACE_POSIX_Asynch_Transmit_File_Result (handler_proxy
,
499 return implementation
;
502 ACE_Asynch_Result_Impl
*
503 ACE_POSIX_Proactor::create_asynch_timer
504 (const ACE_Handler::Proxy_Ptr
&handler_proxy
,
506 const ACE_Time_Value
&tv
,
511 ACE_POSIX_Asynch_Timer
*implementation
;
512 ACE_NEW_RETURN (implementation
,
513 ACE_POSIX_Asynch_Timer (handler_proxy
,
520 return implementation
;
525 ACE_POSIX_Proactor::handle_signal (int, siginfo_t
*, ucontext_t
*)
527 // Perform a non-blocking "poll" for all the I/O events that have
528 // completed in the I/O completion queue.
530 ACE_Time_Value
timeout (0, 0);
535 result
= this->handle_events (timeout
);
536 if (result
!= 0 || errno
== ETIME
)
540 // If our handle_events failed, we'll report a failure to the
542 return result
== -1 ? -1 : 0;
546 ACE_POSIX_Proactor::handle_close (ACE_HANDLE handle
,
547 ACE_Reactor_Mask close_mask
)
549 ACE_UNUSED_ARG (close_mask
);
550 ACE_UNUSED_ARG (handle
);
552 return this->close ();
557 ACE_POSIX_Proactor::application_specific_code (ACE_POSIX_Asynch_Result
*asynch_result
,
558 size_t bytes_transferred
,
559 const void */
* completion_key*/
,
564 // Call completion hook
565 asynch_result
->complete (bytes_transferred
,
567 0, // No completion key.
572 // This is crucial to prevent memory leaks
573 delete asynch_result
;
578 ACE_POSIX_Proactor::post_wakeup_completions (int how_many
)
580 ACE_POSIX_Wakeup_Completion
*wakeup_completion
= 0;
582 for (int ci
= 0; ci
< how_many
; ci
++)
586 ACE_POSIX_Wakeup_Completion (this->wakeup_handler_
.proxy ()),
588 if (this->post_completion (wakeup_completion
) == -1)
595 ACE_POSIX_Proactor::Proactor_Type
596 ACE_POSIX_Proactor::get_impl_type (void)
598 return PROACTOR_POSIX
;
603 * @class ACE_AIOCB_Notify_Pipe_Manager
605 * @brief This class manages the notify pipe of the AIOCB Proactor.
607 * This class acts as the Handler for the
608 * <Asynch_Read> operations issued on the notify pipe. This
609 * class is very useful in implementing <Asynch_Accept> operation
610 * class for the <AIOCB_Proactor>. This is also useful for
611 * implementing <post_completion> for <AIOCB_Proactor>.
613 * <AIOCB_Proactor> class issues a <Asynch_Read> on
614 * the pipe, using this class as the
615 * Handler. <POSIX_Asynch_Result *>'s are sent through the
616 * notify pipe. When <POSIX_Asynch_Result *>'s show up on the
617 * notify pipe, the <POSIX_AIOCB_Proactor> dispatches the
618 * completion of the <Asynch_Read_Stream> and calls the
619 * <handle_read_stream> of this class. This class calls
620 * <complete> on the <POSIX_Asynch_Result *> and thus calls the
621 * application handler.
622 * Handling the MessageBlock:
623 * We give this message block to read the result pointer through
624 * the notify pipe. We expect that to read 4 bytes from the
625 * notify pipe, for each <accept> call. Before giving this
626 * message block to another <accept>, we update <wr_ptr> and put
627 * it in its initial position.
629 class ACE_AIOCB_Notify_Pipe_Manager
: public ACE_Handler
632 /// Constructor. You need the posix proactor because you need to call
633 /// <application_specific_code>
634 ACE_AIOCB_Notify_Pipe_Manager (ACE_POSIX_AIOCB_Proactor
*posix_aiocb_proactor
);
637 virtual ~ACE_AIOCB_Notify_Pipe_Manager (void);
639 /// Send the result pointer through the notification pipe.
642 /// This is the call back method when <Asynch_Read> from the pipe is
644 virtual void handle_read_stream (const ACE_Asynch_Read_Stream::Result
&result
);
647 /// The implementation proactor class.
648 ACE_POSIX_AIOCB_Proactor
*posix_aiocb_proactor_
;
650 /// Message block to get ACE_POSIX_Asynch_Result pointer from the pipe.
651 ACE_Message_Block message_block_
;
653 /// Pipe for the communication between Proactor and the
654 /// Asynch_Accept/Asynch_Connect and other post_completions
657 /// To do asynch_read on the pipe.
658 ACE_POSIX_Asynch_Read_Stream read_stream_
;
660 /// Default constructor. Shouldnt be called.
661 ACE_AIOCB_Notify_Pipe_Manager (void);
664 ACE_AIOCB_Notify_Pipe_Manager::ACE_AIOCB_Notify_Pipe_Manager (ACE_POSIX_AIOCB_Proactor
*posix_aiocb_proactor
)
665 : posix_aiocb_proactor_ (posix_aiocb_proactor
),
666 message_block_ (sizeof (2)),
667 read_stream_ (posix_aiocb_proactor
)
672 // Set write side in NONBLOCK mode
673 ACE::set_flags (this->pipe_
.write_handle (), ACE_NONBLOCK
);
675 // Set read side in BLOCK mode
676 ACE::clr_flags (this->pipe_
.read_handle (), ACE_NONBLOCK
);
678 // Let AIOCB_Proactor know about our handle
679 posix_aiocb_proactor_
->set_notify_handle (this->pipe_
.read_handle ());
681 // Open the read stream.
682 if (this->read_stream_
.open (this->proxy (),
683 this->pipe_
.read_handle (),
687 ACE_ERROR ((LM_ERROR
,
689 "ACE_AIOCB_Notify_Pipe_Manager::ACE_AIOCB_Notify_Pipe_Manager:"
690 "Open on Read Stream failed"));
692 // Issue an asynch_read on the read_stream of the notify pipe.
693 if (this->read_stream_
.read (this->message_block_
,
694 1, // enough to read 1 byte
698 ACE_ERROR ((LM_ERROR
,
700 "ACE_AIOCB_Notify_Pipe_Manager::ACE_AIOCB_Notify_Pipe_Manager:"
701 "Read from pipe failed"));
704 ACE_AIOCB_Notify_Pipe_Manager::~ACE_AIOCB_Notify_Pipe_Manager (void)
706 // 1. try to cancel pending aio
707 this->read_stream_
.cancel ();
709 // 2. close both handles
710 // Destuctor of ACE_Pipe does not close handles.
711 // We can not use ACE_Pipe::close() as it
712 // closes read_handle and than write_handle.
713 // In some systems close() may wait for
714 // completion for all asynch. pending requests.
715 // So we should close write_handle firstly
716 // to force read completion ( if 1. does not help )
717 // and then read_handle and not vice versa
719 ACE_HANDLE h
= this->pipe_
.write_handle ();
720 if (h
!= ACE_INVALID_HANDLE
)
721 ACE_OS::closesocket (h
);
723 h
= this->pipe_
.read_handle ();
724 if ( h
!= ACE_INVALID_HANDLE
)
725 ACE_OS::closesocket (h
);
731 ACE_AIOCB_Notify_Pipe_Manager::notify ()
733 // Send the result pointer through the pipe.
735 ssize_t ret_val
= ACE::send (this->pipe_
.write_handle (),
741 if (errno
!= EWOULDBLOCK
)
743 ACE_ERROR ((LM_ERROR
,
744 ACE_TEXT ("(%P %t):%p\n"),
745 ACE_TEXT ("ACE_AIOCB_Notify_Pipe_Manager::notify")
746 ACE_TEXT ("Error:Writing on to notify pipe failed")));
755 ACE_AIOCB_Notify_Pipe_Manager::handle_read_stream
756 (const ACE_Asynch_Read_Stream::Result
& /*result*/)
758 // 1. Start new read to avoid pipe overflow
760 // Set the message block properly. Put the <wr_ptr> back in the
762 if (this->message_block_
.length () > 0)
763 this->message_block_
.wr_ptr (this->message_block_
.rd_ptr ());
765 // One accept has completed. Issue a read to handle any
766 // <post_completion>s in the future.
767 if (-1 == this->read_stream_
.read (this->message_block_
,
768 1, // enough to read 1 byte
771 ACE_ERROR ((LM_ERROR
,
772 ACE_TEXT ("%N:%l:(%P | %t):%p\n"),
773 ACE_TEXT ("ACE_AIOCB_Notify_Pipe_Manager::handle_read_stream:")
774 ACE_TEXT ("Read from pipe failed")));
778 // this->posix_aiocb_proactor_->process_result_queue ();
781 // Public constructor for common use.
782 ACE_POSIX_AIOCB_Proactor::ACE_POSIX_AIOCB_Proactor (size_t max_aio_operations
)
783 : aiocb_notify_pipe_manager_ (0),
786 aiocb_list_max_size_ (max_aio_operations
),
787 aiocb_list_cur_size_ (0),
788 notify_pipe_read_handle_ (ACE_INVALID_HANDLE
),
789 num_deferred_aiocb_ (0),
792 // Check for correct value for max_aio_operations
793 check_max_aio_num ();
795 this->create_result_aiocb_list ();
797 this->create_notify_manager ();
799 // start pseudo-asynchronous accept task
800 // one per all future acceptors
801 this->get_asynch_pseudo_task().start ();
805 // Special protected constructor for ACE_SUN_Proactor
806 ACE_POSIX_AIOCB_Proactor::ACE_POSIX_AIOCB_Proactor (size_t max_aio_operations
,
807 ACE_POSIX_Proactor::Proactor_Type
)
808 : aiocb_notify_pipe_manager_ (0),
811 aiocb_list_max_size_ (max_aio_operations
),
812 aiocb_list_cur_size_ (0),
813 notify_pipe_read_handle_ (ACE_INVALID_HANDLE
),
814 num_deferred_aiocb_ (0),
817 //check for correct value for max_aio_operations
818 this->check_max_aio_num ();
820 this->create_result_aiocb_list ();
822 // @@ We should create Notify_Pipe_Manager in the derived class to
823 // provide correct calls for virtual functions !!!
827 ACE_POSIX_AIOCB_Proactor::~ACE_POSIX_AIOCB_Proactor (void)
832 ACE_POSIX_Proactor::Proactor_Type
833 ACE_POSIX_AIOCB_Proactor::get_impl_type (void)
835 return PROACTOR_AIOCB
;
840 ACE_POSIX_AIOCB_Proactor::close (void)
842 // stop asynch accept task
843 this->get_asynch_pseudo_task().stop ();
845 this->delete_notify_manager ();
847 this->clear_result_queue ();
849 return this->delete_result_aiocb_list ();
852 void ACE_POSIX_AIOCB_Proactor::set_notify_handle (ACE_HANDLE h
)
854 notify_pipe_read_handle_
= h
;
857 int ACE_POSIX_AIOCB_Proactor::create_result_aiocb_list (void)
859 if (aiocb_list_
!= 0)
862 ACE_NEW_RETURN (aiocb_list_
, aiocb
*[aiocb_list_max_size_
], -1);
864 ACE_NEW_RETURN (result_list_
,
865 ACE_POSIX_Asynch_Result
*[aiocb_list_max_size_
],
868 // Initialize the array.
869 for (size_t ai
= 0; ai
< this->aiocb_list_max_size_
; ai
++)
872 result_list_
[ai
] = 0;
878 int ACE_POSIX_AIOCB_Proactor::delete_result_aiocb_list (void)
880 if (aiocb_list_
== 0) // already deleted
885 // Try to cancel all uncompleted operations; POSIX systems may have
886 // hidden system threads that still can work with our aiocbs!
887 for (ai
= 0; ai
< aiocb_list_max_size_
; ai
++)
888 if (this->aiocb_list_
[ai
] != 0) // active operation
889 this->cancel_aiocb (result_list_
[ai
]);
893 for (ai
= 0; ai
< aiocb_list_max_size_
; ai
++)
895 if (this->aiocb_list_
[ai
] == 0 ) // not active operation
898 // Get the error and return status of the aio_ operation.
899 int error_status
= 0;
900 size_t transfer_count
= 0;
901 int flg_completed
= this->get_result_status (result_list_
[ai
],
905 //don't delete uncompleted AIOCB's
906 if (flg_completed
== 0) // not completed !!!
910 char * errtxt
= ACE_OS::strerror (error_status
);
914 char * op
= (aiocb_list_
[ai
]->aio_lio_opcode
== LIO_WRITE
)?
918 ACE_ERROR ((LM_ERROR
,
919 ACE_TEXT("slot=%d op=%s status=%d xfercnt=%d %s\n"),
927 else // completed , OK
929 delete this->result_list_
[ai
];
930 this->result_list_
[ai
] = 0;
931 this->aiocb_list_
[ai
] = 0;
935 // If it is not possible cancel some operation (num_pending > 0 ),
936 // we can do only one thing -report about this
937 // and complain about POSIX implementation.
938 // We know that we have memory leaks, but it is better than
939 // segmentation fault!
942 ACE_TEXT("ACE_POSIX_AIOCB_Proactor::delete_result_aiocb_list\n")
943 ACE_TEXT(" number pending AIO=%d\n"),
946 delete [] this->aiocb_list_
;
947 this->aiocb_list_
= 0;
949 delete [] this->result_list_
;
950 this->result_list_
= 0;
952 return (num_pending
== 0 ? 0 : -1);
953 // ?? or just always return 0;
956 void ACE_POSIX_AIOCB_Proactor::check_max_aio_num ()
958 long max_os_aio_num
= ACE_OS::sysconf (_SC_AIO_MAX
);
960 // Define max limit AIO's for concrete OS
961 // -1 means that there is no limit, but it is not true
962 // (example, SunOS 5.6)
964 if (max_os_aio_num
> 0 &&
965 aiocb_list_max_size_
> (unsigned long) max_os_aio_num
)
966 aiocb_list_max_size_
= max_os_aio_num
;
968 #if defined (HPUX) || defined (__FreeBSD__)
969 // Although HPUX 11.00 allows to start 2048 AIO's for all process in
970 // system it has a limit 256 max elements for aio_suspend () It is a
973 long max_os_listio_num
= ACE_OS::sysconf (_SC_AIO_LISTIO_MAX
);
974 if (max_os_listio_num
> 0
975 && aiocb_list_max_size_
> (unsigned long) max_os_listio_num
)
976 aiocb_list_max_size_
= max_os_listio_num
;
977 #endif /* HPUX || __FreeBSD__ */
979 // check for user-defined value
980 // ACE_AIO_MAX_SIZE if defined in POSIX_Proactor.h
982 if (aiocb_list_max_size_
<= 0
983 || aiocb_list_max_size_
> ACE_AIO_MAX_SIZE
)
984 aiocb_list_max_size_
= ACE_AIO_MAX_SIZE
;
986 // check for max number files to open
988 int max_num_files
= ACE::max_handles ();
990 if (max_num_files
> 0
991 && aiocb_list_max_size_
> (unsigned long) max_num_files
)
993 ACE::set_handle_limit (aiocb_list_max_size_
);
995 max_num_files
= ACE::max_handles ();
998 if (max_num_files
> 0
999 && aiocb_list_max_size_
> (unsigned long) max_num_files
)
1000 aiocb_list_max_size_
= (unsigned long) max_num_files
;
1002 ACE_DEBUG ((LM_DEBUG
,
1003 "(%P | %t) ACE_POSIX_AIOCB_Proactor::Max Number of AIOs=%d\n",
1004 aiocb_list_max_size_
));
1008 ACE_DEBUG((LM_DEBUG
,
1009 ACE_TEXT( "SGI IRIX specific: aio_init!\n")));
1011 //typedef struct aioinit {
1012 // int aio_threads; /* The number of aio threads to start (5) */
1013 // int aio_locks; /* Initial number of preallocated locks (3) */
1014 // int aio_num; /* estimated total simultanious aiobc structs (1000) */
1015 // int aio_usedba; /* Try to use DBA for raw I/O in lio_listio (0) */
1016 // int aio_debug; /* turn on debugging (0) */
1017 // int aio_numusers; /* max number of user sprocs making aio_* calls (5) */
1018 // int aio_reserved[3];
1023 aioinit
.aio_threads
= 10; /* The number of aio threads to start (5) */
1024 aioinit
.aio_locks
= 20; /* Initial number of preallocated locks (3) */
1025 /* estimated total simultaneous aiobc structs (1000) */
1026 aioinit
.aio_num
= aiocb_list_max_size_
;
1027 aioinit
.aio_usedba
= 0; /* Try to use DBA for raw IO in lio_listio (0) */
1028 aioinit
.aio_debug
= 0; /* turn on debugging (0) */
1029 aioinit
.aio_numusers
= 100; /* max number of user sprocs making aio_* calls (5) */
1030 aioinit
.aio_reserved
[0] = 0;
1031 aioinit
.aio_reserved
[1] = 0;
1032 aioinit
.aio_reserved
[2] = 0;
1034 aio_sgi_init (&aioinit
);
1042 ACE_POSIX_AIOCB_Proactor::create_notify_manager (void)
1044 // Remember! this issues a Asynch_Read
1045 // on the notify pipe for doing the Asynch_Accept/Connect.
1047 if (aiocb_notify_pipe_manager_
== 0)
1048 ACE_NEW (aiocb_notify_pipe_manager_
,
1049 ACE_AIOCB_Notify_Pipe_Manager (this));
1053 ACE_POSIX_AIOCB_Proactor::delete_notify_manager (void)
1055 // We are responsible for delete as all pointers set to 0 after
1056 // delete, it is save to delete twice
1058 delete aiocb_notify_pipe_manager_
;
1059 aiocb_notify_pipe_manager_
= 0;
1063 ACE_POSIX_AIOCB_Proactor::handle_events (ACE_Time_Value
&wait_time
)
1065 // Decrement <wait_time> with the amount of time spent in the method
1066 ACE_Countdown_Time
countdown (&wait_time
);
1067 return this->handle_events_i (wait_time
.msec ());
1071 ACE_POSIX_AIOCB_Proactor::handle_events (void)
1073 return this->handle_events_i (ACE_INFINITE
);
1077 ACE_POSIX_AIOCB_Proactor::notify_completion(int sig_num
)
1079 ACE_UNUSED_ARG (sig_num
);
1081 return this->aiocb_notify_pipe_manager_
->notify ();
1085 ACE_POSIX_AIOCB_Proactor::post_completion (ACE_POSIX_Asynch_Result
*result
)
1087 ACE_MT (ACE_GUARD_RETURN (ACE_SYNCH_MUTEX
, ace_mon
, this->mutex_
, -1));
1089 int ret_val
= this->putq_result (result
);
1095 ACE_POSIX_AIOCB_Proactor::putq_result (ACE_POSIX_Asynch_Result
*result
)
1097 // this protected method should be called with locked mutex_
1098 // we can't use GUARD as Proactor uses non-recursive mutex
1103 int sig_num
= result
->signal_number ();
1104 int ret_val
= this->result_queue_
.enqueue_tail (result
);
1107 ACE_ERROR_RETURN ((LM_ERROR
,
1108 "%N:%l:ACE_POSIX_AIOCB_Proactor::putq_result failed\n"),
1111 this->notify_completion (sig_num
);
1116 ACE_POSIX_Asynch_Result
* ACE_POSIX_AIOCB_Proactor::getq_result (void)
1118 ACE_MT (ACE_GUARD_RETURN (ACE_SYNCH_MUTEX
, ace_mon
, this->mutex_
, 0));
1121 ACE_POSIX_Asynch_Result
* result
= 0;
1123 if (this->result_queue_
.dequeue_head (result
) != 0)
1126 // don't waste time if queue is empty - it is normal
1127 // or check queue size before dequeue_head
1128 // ACE_ERROR_RETURN ((LM_ERROR,
1129 // "%N:%l:(%P | %t):%p\n",
1130 // "ACE_POSIX_AIOCB_Proactor::getq_result failed"),
1136 int ACE_POSIX_AIOCB_Proactor::clear_result_queue (void)
1139 ACE_POSIX_Asynch_Result
* result
= 0;
1141 while ((result
= this->getq_result ()) != 0)
1150 int ACE_POSIX_AIOCB_Proactor::process_result_queue (void)
1153 ACE_POSIX_Asynch_Result
* result
= 0;
1155 while ((result
= this->getq_result ()) != 0)
1157 this->application_specific_code
1159 result
->bytes_transferred(), // 0, No bytes transferred.
1160 0, // No completion key.
1161 result
->error()); //0, No error.
1170 ACE_POSIX_AIOCB_Proactor::handle_events_i (u_long milli_seconds
)
1172 int result_suspend
= 0;
1175 if (milli_seconds
== ACE_INFINITE
)
1176 // Indefinite blocking.
1177 result_suspend
= aio_suspend (aiocb_list_
,
1178 aiocb_list_max_size_
,
1182 // Block on <aio_suspend> for <milli_seconds>
1184 timeout
.tv_sec
= milli_seconds
/ 1000;
1185 timeout
.tv_nsec
= (milli_seconds
- (timeout
.tv_sec
* 1000)) * 1000000;
1186 result_suspend
= aio_suspend (aiocb_list_
,
1187 aiocb_list_max_size_
,
1192 if (result_suspend
== -1)
1194 if (errno
!= EAGAIN
&& // Timeout
1195 errno
!= EINTR
) // Interrupted call
1196 ACE_ERROR ((LM_ERROR
,
1197 ACE_TEXT ("%N:%l:(%P|%t)::%p\n"),
1198 ACE_TEXT ("handle_events: aio_suspend failed")));
1199 // let continue work
1200 // we should check "post_completed" queue
1205 size_t count
= aiocb_list_max_size_
; // max number to iterate
1206 int error_status
= 0;
1207 size_t transfer_count
= 0;
1211 ACE_POSIX_Asynch_Result
*asynch_result
=
1212 find_completed_aio (error_status
,
1217 if (asynch_result
== 0)
1220 // Call the application code.
1221 this->application_specific_code (asynch_result
,
1223 0, // No completion key.
1228 // process post_completed results
1229 retval
+= this->process_result_queue ();
1231 return retval
> 0 ? 1 : 0;
1235 ACE_POSIX_AIOCB_Proactor::get_result_status (ACE_POSIX_Asynch_Result
*asynch_result
,
1237 size_t &transfer_count
)
1241 // Get the error status of the aio_ operation.
1242 error_status
= aio_error (asynch_result
);
1243 if (error_status
== EINPROGRESS
)
1244 return 0; // not completed
1246 ssize_t op_return
= aio_return (asynch_result
);
1248 transfer_count
= static_cast<size_t> (op_return
);
1249 // else transfer_count is already 0, error_status reports the error.
1250 return 1; // completed
1253 ACE_POSIX_Asynch_Result
*
1254 ACE_POSIX_AIOCB_Proactor::find_completed_aio (int &error_status
,
1255 size_t &transfer_count
,
1259 // parameter index defines initial slot to scan
1260 // parameter count tells us how many slots should we scan
1262 ACE_MT (ACE_GUARD_RETURN (ACE_Thread_Mutex
, ace_mon
, this->mutex_
, 0));
1264 ACE_POSIX_Asynch_Result
*asynch_result
= 0;
1266 if (num_started_aio_
== 0) // save time
1269 for (; count
> 0; index
++ , count
--)
1271 if (index
>= aiocb_list_max_size_
) // like a wheel
1274 if (aiocb_list_
[index
] == 0) // Dont process null blocks.
1277 if (0 != this->get_result_status (result_list_
[index
],
1279 transfer_count
)) // completed
1284 if (count
== 0) // all processed , nothing found
1286 asynch_result
= result_list_
[index
];
1288 aiocb_list_
[index
] = 0;
1289 result_list_
[index
] = 0;
1290 aiocb_list_cur_size_
--;
1292 num_started_aio_
--; // decrement count active aios
1293 index
++; // for next iteration
1294 count
--; // for next iteration
1296 this->start_deferred_aio ();
1297 //make attempt to start deferred AIO
1298 //It is safe as we are protected by mutex_
1300 return asynch_result
;
1305 ACE_POSIX_AIOCB_Proactor::start_aio (ACE_POSIX_Asynch_Result
*result
,
1306 ACE_POSIX_Proactor::Opcode op
)
1308 ACE_TRACE ("ACE_POSIX_AIOCB_Proactor::start_aio");
1310 ACE_MT (ACE_GUARD_RETURN (ACE_Thread_Mutex
, ace_mon
, this->mutex_
, -1));
1312 int ret_val
= (aiocb_list_cur_size_
>= aiocb_list_max_size_
) ? -1 : 0;
1314 if (result
== 0) // Just check the status of the list
1317 // Save operation code in the aiocb
1320 case ACE_POSIX_Proactor::ACE_OPCODE_READ
:
1321 result
->aio_lio_opcode
= LIO_READ
;
1324 case ACE_POSIX_Proactor::ACE_OPCODE_WRITE
:
1325 result
->aio_lio_opcode
= LIO_WRITE
;
1329 ACE_ERROR_RETURN ((LM_ERROR
,
1330 ACE_TEXT ("%N:%l:(%P|%t)::")
1331 ACE_TEXT ("start_aio: Invalid op code %d\n"),
1336 if (ret_val
!= 0) // No free slot
1342 // Find a free slot and store.
1344 ssize_t slot
= allocate_aio_slot (result
);
1349 size_t index
= static_cast<size_t> (slot
);
1351 result_list_
[index
] = result
; //Store result ptr anyway
1352 aiocb_list_cur_size_
++;
1354 ret_val
= start_aio_i (result
);
1357 case 0: // started OK
1358 aiocb_list_
[index
] = result
;
1361 case 1: // OS AIO queue overflow
1362 num_deferred_aiocb_
++;
1365 default: // Invalid request, there is no point
1366 break; // to start it later
1369 result_list_
[index
] = 0;
1370 aiocb_list_cur_size_
--;
1375 ACE_POSIX_AIOCB_Proactor::allocate_aio_slot (ACE_POSIX_Asynch_Result
*result
)
1379 // we reserve zero slot for ACE_AIOCB_Notify_Pipe_Manager
1380 // so make check for ACE_AIOCB_Notify_Pipe_Manager request
1382 if (notify_pipe_read_handle_
== result
->aio_fildes
) // Notify_Pipe ?
1383 { // should be free,
1384 if (result_list_
[i
] != 0) // only 1 request
1387 ACE_ERROR_RETURN ((LM_ERROR
,
1388 "%N:%l:(%P | %t)::\n"
1389 "ACE_POSIX_AIOCB_Proactor::allocate_aio_slot:"
1390 "internal Proactor error 0\n"),
1394 else //try to find free slot as usual, but starting from 1
1396 for (i
= 1; i
< this->aiocb_list_max_size_
; i
++)
1397 if (result_list_
[i
] == 0)
1401 if (i
>= this->aiocb_list_max_size_
)
1402 ACE_ERROR_RETURN ((LM_ERROR
,
1403 "%N:%l:(%P | %t)::\n"
1404 "ACE_POSIX_AIOCB_Proactor::allocate_aio_slot:"
1405 "internal Proactor error 1\n"),
1408 //setup OS notification methods for this aio
1409 result
->aio_sigevent
.sigev_notify
= SIGEV_NONE
;
1411 return static_cast<ssize_t
> (i
);
1414 // start_aio_i has new return codes
1415 // 0 AIO was started successfully
1416 // 1 AIO was not started, OS AIO queue overflow
1417 // -1 AIO was not started, other errors
1420 ACE_POSIX_AIOCB_Proactor::start_aio_i (ACE_POSIX_Asynch_Result
*result
)
1422 ACE_TRACE ("ACE_POSIX_AIOCB_Proactor::start_aio_i");
1425 const ACE_TCHAR
*ptype
= 0;
1429 switch (result
->aio_lio_opcode
)
1432 ptype
= ACE_TEXT ("read ");
1433 ret_val
= aio_read (result
);
1436 ptype
= ACE_TEXT ("write");
1437 ret_val
= aio_write (result
);
1440 ptype
= ACE_TEXT ("?????");
1447 ++this->num_started_aio_
;
1449 else // if (ret_val == -1)
1451 if (errno
== EAGAIN
|| errno
== ENOMEM
) //Ok, it will be deferred AIO
1454 ACE_ERROR ((LM_ERROR
,
1455 ACE_TEXT ("%N:%l:(%P | %t)::start_aio_i: aio_%s %p\n"),
1457 ACE_TEXT ("queueing failed")));
1465 ACE_POSIX_AIOCB_Proactor::start_deferred_aio ()
1467 ACE_TRACE ("ACE_POSIX_AIOCB_Proactor::start_deferred_aio");
1469 // This protected method is called from
1470 // find_completed_aio after any AIO completion
1471 // We should call this method always with locked
1472 // ACE_POSIX_AIOCB_Proactor::mutex_
1474 // It tries to start the first deferred AIO
1477 if (num_deferred_aiocb_
== 0)
1478 return 0; // nothing to do
1482 for (i
= 0; i
< this->aiocb_list_max_size_
; i
++)
1483 if (result_list_
[i
] !=0 // check for
1484 && aiocb_list_
[i
] ==0) // deferred AIO
1487 if (i
>= this->aiocb_list_max_size_
)
1488 ACE_ERROR_RETURN ((LM_ERROR
,
1489 "%N:%l:(%P | %t)::\n"
1490 "start_deferred_aio:"
1491 "internal Proactor error 3\n"),
1494 ACE_POSIX_Asynch_Result
*result
= result_list_
[i
];
1496 int ret_val
= start_aio_i (result
);
1500 case 0 : //started OK , decrement count of deferred AIOs
1501 aiocb_list_
[i
] = result
;
1502 num_deferred_aiocb_
--;
1506 return 0; //try again later
1508 default : // Invalid Parameters , should never be
1514 result_list_
[i
] = 0;
1515 --aiocb_list_cur_size_
;
1517 --num_deferred_aiocb_
;
1519 result
->set_error (errno
);
1520 result
->set_bytes_transferred (0);
1521 this->putq_result (result
); // we are with locked mutex_ here !
1527 ACE_POSIX_AIOCB_Proactor::cancel_aio (ACE_HANDLE handle
)
1529 // This new method should be called from
1530 // ACE_POSIX_Asynch_Operation instead of usual ::aio_cancel
1531 // It scans the result_list_ and defines all AIO requests
1532 // that were issued for handle "handle"
1534 // For all deferred AIO requests with handle "handle"
1535 // it removes its from the lists and notifies user
1537 // For all running AIO requests with handle "handle"
1538 // it calls ::aio_cancel. According to the POSIX standards
1539 // we will receive ECANCELED for all ::aio_canceled AIO requests
1540 // later on return from ::aio_suspend
1542 ACE_TRACE ("ACE_POSIX_AIOCB_Proactor::cancel_aio");
1545 int num_cancelled
= 0;
1548 ACE_MT (ACE_GUARD_RETURN (ACE_Thread_Mutex
, ace_mon
, this->mutex_
, -1));
1552 for (ai
= 0; ai
< this->aiocb_list_max_size_
; ai
++)
1554 if (this->result_list_
[ai
] == 0) // Skip empty slot
1557 if (this->result_list_
[ai
]->aio_fildes
!= handle
) // Not ours
1562 ACE_POSIX_Asynch_Result
*asynch_result
= this->result_list_
[ai
];
1564 if (this->aiocb_list_
[ai
] == 0) // Canceling a deferred operation
1567 this->num_deferred_aiocb_
--;
1569 this->aiocb_list_
[ai
] = 0;
1570 this->result_list_
[ai
] = 0;
1571 this->aiocb_list_cur_size_
--;
1573 asynch_result
->set_error (ECANCELED
);
1574 asynch_result
->set_bytes_transferred (0);
1575 this->putq_result (asynch_result
);
1576 // we are with locked mutex_ here !
1578 else // Cancel started aio
1580 int rc_cancel
= this->cancel_aiocb (asynch_result
);
1582 if (rc_cancel
== 0) //notification in the future
1583 num_cancelled
++; //it is OS responsiblity
1590 return 1; // ALLDONE
1592 if (num_cancelled
== num_total
)
1593 return 0; // CANCELLED
1595 return 2; // NOT CANCELLED
1599 ACE_POSIX_AIOCB_Proactor::cancel_aiocb (ACE_POSIX_Asynch_Result
* result
)
1601 // This method is called from cancel_aio
1602 // to cancel a previously submitted AIO request
1603 int rc
= ::aio_cancel (0, result
);
1605 // Check the return value and return 0/1/2 appropriately.
1606 if (rc
== AIO_CANCELED
)
1608 else if (rc
== AIO_ALLDONE
)
1610 else // (rc == AIO_NOTCANCELED)
1615 // *********************************************************************
1617 #if defined(ACE_HAS_POSIX_REALTIME_SIGNALS)
1619 ACE_POSIX_SIG_Proactor::ACE_POSIX_SIG_Proactor (size_t max_aio_operations
)
1620 : ACE_POSIX_AIOCB_Proactor (max_aio_operations
,
1621 ACE_POSIX_Proactor::PROACTOR_SIG
)
1623 // = Set up the mask we'll use to block waiting for SIGRTMIN. Use that
1624 // to add it to the signal mask for this thread, and also set the process
1625 // signal action to pass signal information when we want it.
1627 // Clear the signal set.
1628 ACE_OS::sigemptyset (&this->RT_completion_signals_
);
1630 // Add the signal number to the signal set.
1631 if (ACE_OS::sigaddset (&this->RT_completion_signals_
, ACE_SIGRTMIN
) == -1)
1632 ACE_ERROR ((LM_ERROR
, ACE_TEXT ("ACE_POSIX_SIG_Proactor: %p\n"),
1633 ACE_TEXT ("sigaddset")));
1634 this->block_signals ();
1635 // Set up the signal action for SIGRTMIN.
1636 this->setup_signal_handler (ACE_SIGRTMIN
);
1638 // we do not have to create notify manager
1639 // but we should start pseudo-asynchronous accept task
1640 // one per all future acceptors
1642 this->get_asynch_pseudo_task().start ();
1646 ACE_POSIX_SIG_Proactor::ACE_POSIX_SIG_Proactor (const sigset_t signal_set
,
1647 size_t max_aio_operations
)
1648 : ACE_POSIX_AIOCB_Proactor (max_aio_operations
,
1649 ACE_POSIX_Proactor::PROACTOR_SIG
)
1651 // = Keep <Signal_set> with the Proactor, mask all the signals and
1652 // setup signal actions for the signals in the <signal_set>.
1654 // = Keep <signal_set> with the Proactor.
1656 // Empty the signal set first.
1657 if (sigemptyset (&this->RT_completion_signals_
) == -1)
1658 ACE_ERROR ((LM_ERROR
,
1659 "Error:(%P | %t):%p\n",
1660 "sigemptyset failed"));
1662 // For each signal number present in the <signal_set>, add it to
1663 // the signal set we use, and also set up its process signal action
1664 // to allow signal info to be passed into sigwait/sigtimedwait.
1666 for (int si
= ACE_SIGRTMIN
; si
<= ACE_SIGRTMAX
; si
++)
1668 member
= sigismember (&signal_set
,
1671 ACE_ERROR ((LM_ERROR
,
1672 "%N:%l:(%P | %t)::%p\n",
1673 "ACE_POSIX_SIG_Proactor::ACE_POSIX_SIG_Proactor:"
1674 "sigismember failed"));
1675 else if (member
== 1)
1677 sigaddset (&this->RT_completion_signals_
, si
);
1678 this->setup_signal_handler (si
);
1682 // Mask all the signals.
1683 this->block_signals ();
1685 // we do not have to create notify manager
1686 // but we should start pseudo-asynchronous accept task
1687 // one per all future acceptors
1689 this->get_asynch_pseudo_task().start ();
1693 ACE_POSIX_SIG_Proactor::~ACE_POSIX_SIG_Proactor (void)
1697 // @@ Enable the masked signals again.
1700 ACE_POSIX_Proactor::Proactor_Type
1701 ACE_POSIX_SIG_Proactor::get_impl_type (void)
1703 return PROACTOR_SIG
;
1707 ACE_POSIX_SIG_Proactor::handle_events (ACE_Time_Value
&wait_time
)
1709 // Decrement <wait_time> with the amount of time spent in the method
1710 ACE_Countdown_Time
countdown (&wait_time
);
1711 return this->handle_events_i (&wait_time
);
1715 ACE_POSIX_SIG_Proactor::handle_events (void)
1717 return this->handle_events_i (0);
1721 ACE_POSIX_SIG_Proactor::notify_completion (int sig_num
)
1723 // Get this process id.
1724 pid_t
const pid
= ACE_OS::getpid ();
1725 if (pid
== (pid_t
) -1)
1726 ACE_ERROR_RETURN ((LM_ERROR
,
1727 "Error:%N:%l(%P | %t):%p",
1731 // Set the signal information.
1733 #if defined (ACE_HAS_SIGVAL_SIGVAL_INT)
1734 value
.sigval_int
= -1;
1736 value
.sival_int
= -1;
1737 #endif /* ACE_HAS_SIGVAL_SIGVAL_INT */
1739 // Queue the signal.
1740 if (sigqueue (pid
, sig_num
, value
) == 0)
1743 if (errno
!= EAGAIN
)
1744 ACE_ERROR_RETURN ((LM_ERROR
,
1745 "Error:%N:%l:(%P | %t):%p\n",
1746 "<sigqueue> failed"),
1751 ACE_Asynch_Result_Impl
*
1752 ACE_POSIX_SIG_Proactor::create_asynch_timer
1753 (const ACE_Handler::Proxy_Ptr
&handler_proxy
,
1755 const ACE_Time_Value
&tv
,
1762 // Fix the signal number.
1763 if (signal_number
== -1)
1766 for (si
= ACE_SIGRTMAX
;
1767 (is_member
== 0) && (si
>= ACE_SIGRTMIN
);
1770 is_member
= sigismember (&this->RT_completion_signals_
,
1772 if (is_member
== -1)
1773 ACE_ERROR_RETURN ((LM_ERROR
,
1774 "%N:%l:(%P | %t)::%s\n",
1775 "ACE_POSIX_SIG_Proactor::create_asynch_timer:"
1776 "sigismember failed"),
1781 ACE_ERROR_RETURN ((LM_ERROR
,
1782 "Error:%N:%l:(%P | %t)::%s\n",
1783 "ACE_POSIX_SIG_Proactor::ACE_POSIX_SIG_Proactor:"
1784 "Signal mask set empty"),
1787 // + 1 to nullify loop increment.
1788 signal_number
= si
+ 1;
1791 ACE_Asynch_Result_Impl
*implementation
;
1792 ACE_NEW_RETURN (implementation
,
1793 ACE_POSIX_Asynch_Timer (handler_proxy
,
1800 return implementation
;
1805 sig_handler (int sig_num
, siginfo_t
*, ucontext_t
*)
1807 // Should never be called
1808 ACE_DEBUG ((LM_DEBUG
,
1809 "%N:%l:(%P | %t)::sig_handler received signal: %d\n",
1815 ACE_POSIX_SIG_Proactor::setup_signal_handler (int signal_number
) const
1817 // Set up the specified signal so that signal information will be
1818 // passed to sigwaitinfo/sigtimedwait. Don't change the default
1819 // signal handler - having a handler and waiting for the signal can
1820 // produce undefined behavior.
1822 // But can not use SIG_DFL
1823 // With SIG_DFL after delivering the first signal
1824 // SIG_DFL handler resets SA_SIGINFO flags
1825 // and we will lose all information sig_info
1826 // At least all SunOS have such behavior
1828 struct sigaction reaction
;
1829 sigemptyset (&reaction
.sa_mask
); // Nothing else to mask.
1830 reaction
.sa_flags
= SA_SIGINFO
; // Realtime flag.
1831 reaction
.sa_sigaction
= ACE_SIGNAL_C_FUNC (sig_handler
); // (SIG_DFL);
1832 int sigaction_return
= ACE_OS::sigaction (signal_number
,
1835 if (sigaction_return
== -1)
1836 ACE_ERROR_RETURN ((LM_ERROR
,
1838 "Proactor couldnt do sigaction for the RT SIGNAL"),
1841 ACE_UNUSED_ARG(signal_number
);
1848 ACE_POSIX_SIG_Proactor::block_signals (void) const
1850 return ACE_OS::pthread_sigmask (SIG_BLOCK
, &this->RT_completion_signals_
, 0);
1854 ACE_POSIX_SIG_Proactor::allocate_aio_slot (ACE_POSIX_Asynch_Result
*result
)
1858 //try to find free slot as usual, starting from 0
1859 for (i
= 0; i
< this->aiocb_list_max_size_
; i
++)
1860 if (result_list_
[i
] == 0)
1863 if (i
>= this->aiocb_list_max_size_
)
1864 ACE_ERROR_RETURN ((LM_ERROR
,
1865 "%N:%l:(%P | %t)::\n"
1866 "ACE_POSIX_SIG_Proactor::allocate_aio_slot "
1867 "internal Proactor error 1\n"),
1870 // setup OS notification methods for this aio
1871 // store index!!, not pointer in signal info
1872 result
->aio_sigevent
.sigev_notify
= SIGEV_SIGNAL
;
1873 result
->aio_sigevent
.sigev_signo
= result
->signal_number ();
1874 #if defined (ACE_HAS_SIGVAL_SIGVAL_INT)
1875 result
->aio_sigevent
.sigev_value
.sigval_int
= static_cast<int> (i
);
1877 result
->aio_sigevent
.sigev_value
.sival_int
= static_cast<int> (i
);
1878 #endif /* ACE_HAS_SIGVAL_SIGVAL_INT */
1880 return static_cast<ssize_t
> (i
);
1884 ACE_POSIX_SIG_Proactor::handle_events_i (const ACE_Time_Value
*timeout
)
1886 int result_sigwait
= 0;
1891 // Wait for the signals.
1894 result_sigwait
= ACE_OS::sigwaitinfo (&this->RT_completion_signals_
,
1899 result_sigwait
= ACE_OS::sigtimedwait (&this->RT_completion_signals_
,
1902 if (result_sigwait
== -1 && errno
== EAGAIN
)
1906 while (result_sigwait
== -1 && errno
== EINTR
);
1908 if (result_sigwait
== -1) // Not a timeout, not EINTR: tell caller of error
1911 // Decide what to do. We always check the completion queue since it's an
1912 // easy, quick check. What is decided here is whether to check for
1913 // I/O completions and, if so, how completely to scan.
1914 int flg_aio
= 0; // 1 if AIO Completion possible
1916 size_t index
= 0; // start index to scan aiocb list
1917 size_t count
= 1; // max number of aiocbs to scan
1918 int error_status
= 0;
1919 size_t transfer_count
= 0;
1921 if (sig_info
.si_code
== SI_ASYNCIO
|| this->os_id_
== ACE_OS_SUN_56
)
1923 flg_aio
= 1; // AIO signal received
1924 // define index to start
1925 // nothing will happen if it contains garbage
1926 #if defined (ACE_HAS_SIGVAL_SIGVAL_INT)
1927 index
= static_cast<size_t> (sig_info
.si_value
.sigval_int
);
1929 index
= static_cast<size_t> (sig_info
.si_value
.sival_int
);
1930 #endif /* ACE_HAS_SIGVAL_SIGVAL_INT */
1931 // Assume we have a correctly-functioning implementation, and that
1932 // there is one I/O to process, and it's correctly specified in the
1933 // siginfo received. There are, however, some special situations
1934 // where this isn't true...
1935 if (os_id_
== ACE_OS_SUN_56
) // Solaris 6
1937 // 1. Solaris 6 always loses any RT signal,
1938 // if it has more SIGQUEMAX=32 pending signals
1939 // so we should scan the whole aiocb list
1940 // 2. Moreover,it has one more bad habit
1941 // to notify aio completion
1942 // with SI_QUEUE code instead of SI_ASYNCIO, hence the
1943 // OS_SUN_56 addition to the si_code check, above.
1944 count
= aiocb_list_max_size_
;
1947 else if (sig_info
.si_code
!= SI_QUEUE
)
1949 // Unknown signal code.
1950 // may some other third-party libraries could send it
1951 // or message queue could also generate it !
1952 // So print the message and check our completions
1953 ACE_ERROR ((LM_DEBUG
,
1954 ACE_TEXT ("%N:%l:(%P | %t): ")
1955 ACE_TEXT ("ACE_POSIX_SIG_Proactor::handle_events: ")
1956 ACE_TEXT ("Unexpected signal code (%d) returned ")
1957 ACE_TEXT ("from sigwait; expecting %d\n"),
1958 result_sigwait
, sig_info
.si_code
));
1968 ACE_POSIX_Asynch_Result
*asynch_result
=
1969 find_completed_aio (error_status
,
1974 if (asynch_result
== 0)
1977 // Call the application code.
1978 this->application_specific_code (asynch_result
,
1980 0, // No completion key.
1981 error_status
); // Error
1984 // process post_completed results
1985 ret_que
= this->process_result_queue ();
1987 // Uncomment this if you want to test
1988 // and research the behavior of you system
1990 ACE_DEBUG ((LM_DEBUG
,
1991 "(%t) NumAIO=%d NumQueue=%d\n",
1995 return ret_aio
+ ret_que
> 0 ? 1 : 0;
1998 #endif /* ACE_HAS_POSIX_REALTIME_SIGNALS */
2000 // *********************************************************************
2002 ACE_POSIX_Asynch_Timer::ACE_POSIX_Asynch_Timer
2003 (const ACE_Handler::Proxy_Ptr
&handler_proxy
,
2005 const ACE_Time_Value
&tv
,
2009 : ACE_POSIX_Asynch_Result
2010 (handler_proxy
, act
, event
, 0, 0, priority
, signal_number
),
2016 ACE_POSIX_Asynch_Timer::complete (size_t /* bytes_transferred */,
2018 const void * /* completion_key */,
2021 ACE_Handler
*handler
= this->handler_proxy_
.get ()->handler ();
2023 handler
->handle_time_out (this->time_
, this->act ());
2027 // *********************************************************************
2029 ACE_POSIX_Wakeup_Completion::ACE_POSIX_Wakeup_Completion
2030 (const ACE_Handler::Proxy_Ptr
&handler_proxy
,
2035 : ACE_Asynch_Result_Impl (),
2036 ACE_POSIX_Asynch_Result (handler_proxy
,
2046 ACE_POSIX_Wakeup_Completion::~ACE_POSIX_Wakeup_Completion (void)
2051 ACE_POSIX_Wakeup_Completion::complete (size_t /* bytes_transferred */,
2053 const void * /* completion_key */,
2057 ACE_Handler
*handler
= this->handler_proxy_
.get ()->handler ();
2059 handler
->handle_wakeup ();
2062 ACE_END_VERSIONED_NAMESPACE_DECL
2064 #endif /* ACE_HAS_AIO_CALLS */