1 #include "ace/POSIX_Asynch_IO.h"
3 #if defined (ACE_HAS_AIO_CALLS)
5 #include "ace/Flag_Manip.h"
6 #include "ace/Proactor.h"
7 #include "ace/Message_Block.h"
8 #include "ace/INET_Addr.h"
9 #include "ace/Asynch_Pseudo_Task.h"
10 #include "ace/POSIX_Proactor.h"
11 #include "ace/OS_NS_errno.h"
12 #include "ace/OS_NS_sys_socket.h"
13 #include "ace/OS_NS_sys_stat.h"
15 ACE_BEGIN_VERSIONED_NAMESPACE_DECL
18 ACE_POSIX_Asynch_Result::bytes_transferred () const
20 return this->bytes_transferred_
;
24 ACE_POSIX_Asynch_Result::set_bytes_transferred (size_t nbytes
)
26 this->bytes_transferred_
= nbytes
;
30 ACE_POSIX_Asynch_Result::act () const
36 ACE_POSIX_Asynch_Result::success () const
38 return this->success_
;
42 ACE_POSIX_Asynch_Result::completion_key () const
44 return this->completion_key_
;
48 ACE_POSIX_Asynch_Result::error () const
54 ACE_POSIX_Asynch_Result::set_error (u_long errcode
)
59 ACE_POSIX_Asynch_Result::event () const
61 return ACE_INVALID_HANDLE
;
65 ACE_POSIX_Asynch_Result::offset () const
67 return this->aio_offset
;
71 ACE_POSIX_Asynch_Result::offset_high () const
74 // @@ Support aiocb64??
76 ACE_NOTSUP_RETURN (0);
80 ACE_POSIX_Asynch_Result::priority () const
82 return this->aio_reqprio
;
86 ACE_POSIX_Asynch_Result::signal_number () const
88 return this->aio_sigevent
.sigev_signo
;
// Hand this result object to the POSIX proactor's post_completion().
//
// @param proactor_impl  Generic proactor implementation; it is
//                       dynamic_cast to ACE_POSIX_Proactor before use.
// @return The value returned by ACE_POSIX_Proactor::post_completion (this).
//         When the cast yields 0, the failure is logged through
//         ACELIB_ERROR_RETURN with -1 as the return value.
92 ACE_POSIX_Asynch_Result::post_completion (ACE_Proactor_Impl
*proactor_impl
)
94 // Get to the platform specific implementation.
95 ACE_POSIX_Proactor
*posix_proactor
= dynamic_cast<ACE_POSIX_Proactor
*> (proactor_impl
);
// A proactor that is not an ACE_POSIX_Proactor cannot accept this result.
97 if (posix_proactor
== 0)
98 ACELIB_ERROR_RETURN ((LM_ERROR
, "Dynamic cast to POSIX Proactor failed\n"), -1);
101 return posix_proactor
->post_completion (this);
104 ACE_POSIX_Asynch_Result::~ACE_POSIX_Asynch_Result ()
108 ACE_POSIX_Asynch_Result::ACE_POSIX_Asynch_Result
109 (const ACE_Handler::Proxy_Ptr
&handler_proxy
,
111 ACE_HANDLE
/* event */, // Event is not used on POSIX.
116 : handler_proxy_ (handler_proxy
),
118 bytes_transferred_ (0),
124 aio_reqprio
= priority
;
125 aio_sigevent
.sigev_signo
= signal_number
;
128 // @@ Support offset_high with aiocb64.
130 ACE_UNUSED_ARG (offset_high
);
132 // Other fields in the <aiocb> will be initialized by the
136 // ****************************************************************
139 ACE_POSIX_Asynch_Operation::open (const ACE_Handler::Proxy_Ptr
&handler_proxy
,
141 const void * /* completion_key */,
142 ACE_Proactor
*proactor
)
144 this->proactor_
= proactor
;
145 this->handler_proxy_
= handler_proxy
;
146 this->handle_
= handle
;
148 // Grab the handle from the <handler> if <handle> is invalid
149 if (this->handle_
== ACE_INVALID_HANDLE
)
151 ACE_Handler
*handler
= handler_proxy
.get ()->handler ();
153 this->handle_
= handler
->handle ();
155 if (this->handle_
== ACE_INVALID_HANDLE
)
159 // @@ If <proactor> is 0, let us not bother about getting this
160 // Proactor, we have already got the specific implementation
163 // If no proactor was passed
164 if (this->proactor_
== 0)
166 // Grab the proactor from the <Service_Config> if
167 // <handler->proactor> is zero
168 this->proactor_
= this->handler_
->proactor ();
169 if (this->proactor_
== 0)
170 this->proactor_
= ACE_Proactor::instance();
178 ACE_POSIX_Asynch_Operation::cancel ()
180 if (!posix_proactor_
)
182 return posix_proactor_
->cancel_aio (this->handle_
);
186 ACE_POSIX_Asynch_Operation::proactor () const
188 return this->proactor_
;
192 ACE_POSIX_Asynch_Operation::posix_proactor () const
194 return this->posix_proactor_
;
197 ACE_POSIX_Asynch_Operation::~ACE_POSIX_Asynch_Operation ()
201 ACE_POSIX_Asynch_Operation::ACE_POSIX_Asynch_Operation (ACE_POSIX_Proactor
*posix_proactor
)
202 : posix_proactor_ (posix_proactor
),
203 handle_ (ACE_INVALID_HANDLE
)
207 // *********************************************************************
210 ACE_POSIX_Asynch_Read_Stream_Result::bytes_to_read () const
212 return this->aio_nbytes
;
216 ACE_POSIX_Asynch_Read_Stream_Result::message_block () const
218 return this->message_block_
;
222 ACE_POSIX_Asynch_Read_Stream_Result::handle () const
224 return this->aio_fildes
;
227 ACE_POSIX_Asynch_Read_Stream_Result::ACE_POSIX_Asynch_Read_Stream_Result
228 (const ACE_Handler::Proxy_Ptr
&handler_proxy
,
230 ACE_Message_Block
&message_block
,
231 size_t bytes_to_read
,
236 : ACE_POSIX_Asynch_Result
237 (handler_proxy
, act
, event
, 0, 0, priority
, signal_number
),
238 message_block_ (message_block
)
240 this->aio_fildes
= handle
;
241 this->aio_buf
= message_block
.wr_ptr ();
242 this->aio_nbytes
= bytes_to_read
;
246 ACE_POSIX_Asynch_Read_Stream_Result::complete (size_t bytes_transferred
,
248 const void *completion_key
,
251 this->bytes_transferred_
= bytes_transferred
;
252 this->success_
= success
;
253 this->completion_key_
= completion_key
;
254 this->error_
= error
;
256 // <errno> is available in the aiocb.
257 ACE_UNUSED_ARG (error
);
259 // Appropriately move the pointers in the message block.
260 this->message_block_
.wr_ptr (bytes_transferred
);
262 // Create the interface result class.
263 ACE_Asynch_Read_Stream::Result
result (this);
265 // Call the application handler.
266 ACE_Handler
*handler
= this->handler_proxy_
.get ()->handler ();
268 handler
->handle_read_stream (result
);
271 ACE_POSIX_Asynch_Read_Stream_Result::~ACE_POSIX_Asynch_Read_Stream_Result ()
275 // ************************************************************
277 ACE_POSIX_Asynch_Read_Stream::ACE_POSIX_Asynch_Read_Stream (ACE_POSIX_Proactor
*posix_proactor
)
278 : ACE_POSIX_Asynch_Operation (posix_proactor
)
283 ACE_POSIX_Asynch_Read_Stream::read (ACE_Message_Block
&message_block
,
284 size_t bytes_to_read
,
289 size_t space
= message_block
.space ();
290 if (bytes_to_read
> space
)
293 if (bytes_to_read
== 0)
299 // Create the Asynch_Result.
300 ACE_POSIX_Asynch_Read_Stream_Result
*result
= 0;
301 ACE_POSIX_Proactor
*proactor
= this->posix_proactor ();
302 ACE_NEW_RETURN (result
,
303 ACE_POSIX_Asynch_Read_Stream_Result (this->handler_proxy_
,
308 proactor
->get_handle (),
313 int return_val
= proactor
->start_aio (result
, ACE_POSIX_Proactor::ACE_OPCODE_READ
);
314 if (return_val
== -1)
320 ACE_POSIX_Asynch_Read_Stream::~ACE_POSIX_Asynch_Read_Stream ()
324 // *********************************************************************
327 ACE_POSIX_Asynch_Write_Stream_Result::bytes_to_write () const
329 return this->aio_nbytes
;
333 ACE_POSIX_Asynch_Write_Stream_Result::message_block () const
335 return this->message_block_
;
339 ACE_POSIX_Asynch_Write_Stream_Result::handle () const
341 return this->aio_fildes
;
344 ACE_POSIX_Asynch_Write_Stream_Result::ACE_POSIX_Asynch_Write_Stream_Result
345 (const ACE_Handler::Proxy_Ptr
&handler_proxy
,
347 ACE_Message_Block
&message_block
,
348 size_t bytes_to_write
,
353 : ACE_POSIX_Asynch_Result
354 (handler_proxy
, act
, event
, 0, 0, priority
, signal_number
),
355 message_block_ (message_block
)
357 this->aio_fildes
= handle
;
358 this->aio_buf
= message_block
.rd_ptr ();
359 this->aio_nbytes
= bytes_to_write
;
363 ACE_POSIX_Asynch_Write_Stream_Result::complete (size_t bytes_transferred
,
365 const void *completion_key
,
368 // Get all the data copied.
369 this->bytes_transferred_
= bytes_transferred
;
370 this->success_
= success
;
371 this->completion_key_
= completion_key
;
372 this->error_
= error
;
374 // <errno> is available in the aiocb.
375 ACE_UNUSED_ARG (error
);
377 // Appropriately move the pointers in the message block.
378 this->message_block_
.rd_ptr (bytes_transferred
);
380 // Create the interface result class.
381 ACE_Asynch_Write_Stream::Result
result (this);
383 // Call the application handler.
384 ACE_Handler
*handler
= this->handler_proxy_
.get ()->handler ();
386 handler
->handle_write_stream (result
);
389 ACE_POSIX_Asynch_Write_Stream_Result::~ACE_POSIX_Asynch_Write_Stream_Result ()
393 // *********************************************************************
395 ACE_POSIX_Asynch_Write_Stream::ACE_POSIX_Asynch_Write_Stream (ACE_POSIX_Proactor
*posix_proactor
)
396 : ACE_POSIX_Asynch_Operation (posix_proactor
)
401 ACE_POSIX_Asynch_Write_Stream::write (ACE_Message_Block
&message_block
,
402 size_t bytes_to_write
,
407 size_t len
= message_block
.length ();
408 if (bytes_to_write
> len
)
409 bytes_to_write
= len
;
411 if (bytes_to_write
== 0)
414 ACE_TEXT ("ACE_POSIX_Asynch_Write_Stream::write:")
415 ACE_TEXT ("Attempt to write 0 bytes\n")),
418 ACE_POSIX_Asynch_Write_Stream_Result
*result
= 0;
419 ACE_POSIX_Proactor
*proactor
= this->posix_proactor ();
420 ACE_NEW_RETURN (result
,
421 ACE_POSIX_Asynch_Write_Stream_Result (this->handler_proxy_
,
426 proactor
->get_handle (),
431 int return_val
= proactor
->start_aio (result
, ACE_POSIX_Proactor::ACE_OPCODE_WRITE
);
432 if (return_val
== -1)
438 ACE_POSIX_Asynch_Write_Stream::~ACE_POSIX_Asynch_Write_Stream ()
442 // *********************************************************************
444 ACE_POSIX_Asynch_Read_File_Result::ACE_POSIX_Asynch_Read_File_Result
445 (const ACE_Handler::Proxy_Ptr
&handler_proxy
,
447 ACE_Message_Block
&message_block
,
448 size_t bytes_to_read
,
455 : ACE_POSIX_Asynch_Read_Stream_Result (handler_proxy
,
464 this->aio_offset
= offset
;
468 ACE_UNUSED_ARG (offset_high
);
472 ACE_POSIX_Asynch_Read_File_Result::complete (size_t bytes_transferred
,
474 const void *completion_key
,
477 // Copy all the data.
478 this->bytes_transferred_
= bytes_transferred
;
479 this->success_
= success
;
480 this->completion_key_
= completion_key
;
481 this->error_
= error
;
483 // <errno> is available in the aiocb.
484 ACE_UNUSED_ARG (error
);
486 // Appropriately move the pointers in the message block.
487 this->message_block_
.wr_ptr (bytes_transferred
);
489 // Create the interface result class.
490 ACE_Asynch_Read_File::Result
result (this);
492 // Call the application handler.
493 ACE_Handler
*handler
= this->handler_proxy_
.get ()->handler ();
495 handler
->handle_read_file (result
);
498 ACE_POSIX_Asynch_Read_File_Result::~ACE_POSIX_Asynch_Read_File_Result ()
502 // *********************************************************************
504 ACE_POSIX_Asynch_Read_File::ACE_POSIX_Asynch_Read_File (ACE_POSIX_Proactor
*posix_proactor
)
505 : ACE_POSIX_Asynch_Read_Stream (posix_proactor
)
510 ACE_POSIX_Asynch_Read_File::read (ACE_Message_Block
&message_block
,
511 size_t bytes_to_read
,
518 size_t space
= message_block
.space ();
519 if ( bytes_to_read
> space
)
522 if ( bytes_to_read
== 0 )
525 ACE_TEXT ("ACE_POSIX_Asynch_Read_File::read:")
526 ACE_TEXT ("Attempt to read 0 bytes or no space in the message block\n")),
529 ACE_POSIX_Asynch_Read_File_Result
*result
= 0;
530 ACE_POSIX_Proactor
*proactor
= this->posix_proactor ();
531 ACE_NEW_RETURN (result
,
532 ACE_POSIX_Asynch_Read_File_Result (this->handler_proxy_
,
539 posix_proactor ()->get_handle (),
544 int return_val
= proactor
->start_aio (result
, ACE_POSIX_Proactor::ACE_OPCODE_READ
);
545 if (return_val
== -1)
551 ACE_POSIX_Asynch_Read_File::~ACE_POSIX_Asynch_Read_File ()
556 ACE_POSIX_Asynch_Read_File::read (ACE_Message_Block
&message_block
,
557 size_t bytes_to_read
,
562 return ACE_POSIX_Asynch_Read_Stream::read (message_block
,
569 // ************************************************************
571 ACE_POSIX_Asynch_Write_File_Result::ACE_POSIX_Asynch_Write_File_Result
572 (const ACE_Handler::Proxy_Ptr
&handler_proxy
,
574 ACE_Message_Block
&message_block
,
575 size_t bytes_to_write
,
582 : ACE_POSIX_Asynch_Write_Stream_Result (handler_proxy
,
591 this->aio_offset
= offset
;
593 // @@ Support offset_high with aiocb64.
595 ACE_UNUSED_ARG (offset_high
);
599 ACE_POSIX_Asynch_Write_File_Result::complete (size_t bytes_transferred
,
601 const void *completion_key
,
605 this->bytes_transferred_
= bytes_transferred
;
606 this->success_
= success
;
607 this->completion_key_
= completion_key
;
608 this->error_
= error
;
610 // <error> is available in <aio_resultp.aio_error>
611 ACE_UNUSED_ARG (error
);
613 // Appropriately move the pointers in the message block.
614 this->message_block_
.rd_ptr (bytes_transferred
);
616 // Create the interface result class.
617 ACE_Asynch_Write_File::Result
result (this);
619 // Call the application handler.
620 ACE_Handler
*handler
= this->handler_proxy_
.get ()->handler ();
622 handler
->handle_write_file (result
);
625 ACE_POSIX_Asynch_Write_File_Result::~ACE_POSIX_Asynch_Write_File_Result ()
629 // *********************************************************************
631 ACE_POSIX_Asynch_Write_File::ACE_POSIX_Asynch_Write_File (ACE_POSIX_Proactor
*posix_proactor
)
632 : ACE_POSIX_Asynch_Write_Stream (posix_proactor
)
637 ACE_POSIX_Asynch_Write_File::write (ACE_Message_Block
&message_block
,
638 size_t bytes_to_write
,
645 size_t len
= message_block
.length ();
646 if (bytes_to_write
> len
)
647 bytes_to_write
= len
;
649 if (bytes_to_write
== 0)
652 ACE_TEXT ("ACE_POSIX_Asynch_Write_File::write:")
653 ACE_TEXT ("Attempt to write 0 bytes\n")),
656 ACE_POSIX_Asynch_Write_File_Result
*result
= 0;
657 ACE_POSIX_Proactor
*proactor
= this->posix_proactor ();
658 ACE_NEW_RETURN (result
,
659 ACE_POSIX_Asynch_Write_File_Result (this->handler_proxy_
,
666 proactor
->get_handle (),
671 int return_val
= proactor
->start_aio (result
, ACE_POSIX_Proactor::ACE_OPCODE_WRITE
);
672 if (return_val
== -1)
678 ACE_POSIX_Asynch_Write_File::~ACE_POSIX_Asynch_Write_File ()
683 ACE_POSIX_Asynch_Write_File::write (ACE_Message_Block
&message_block
,
684 size_t bytes_to_write
,
689 return ACE_POSIX_Asynch_Write_Stream::write (message_block
,
696 // *********************************************************************
700 ACE_POSIX_Asynch_Accept_Result::bytes_to_read () const
702 return this->aio_nbytes
;
706 ACE_POSIX_Asynch_Accept_Result::message_block () const
708 return this->message_block_
;
712 ACE_POSIX_Asynch_Accept_Result::listen_handle () const
714 return this->listen_handle_
;
718 ACE_POSIX_Asynch_Accept_Result::accept_handle () const
720 return this->aio_fildes
;
723 ACE_POSIX_Asynch_Accept_Result::ACE_POSIX_Asynch_Accept_Result
724 (const ACE_Handler::Proxy_Ptr
&handler_proxy
,
725 ACE_HANDLE listen_handle
,
726 ACE_HANDLE accept_handle
,
727 ACE_Message_Block
&message_block
,
728 size_t bytes_to_read
,
734 : ACE_POSIX_Asynch_Result
735 (handler_proxy
, act
, event
, 0, 0, priority
, signal_number
),
736 message_block_ (message_block
),
737 listen_handle_ (listen_handle
)
739 this->aio_fildes
= accept_handle
;
740 this->aio_nbytes
= bytes_to_read
;
744 ACE_POSIX_Asynch_Accept_Result::complete (size_t bytes_transferred
,
746 const void *completion_key
,
750 this->bytes_transferred_
= bytes_transferred
;
751 this->success_
= success
;
752 this->completion_key_
= completion_key
;
753 this->error_
= error
;
755 // Appropriately move the pointers in the message block.
756 this->message_block_
.wr_ptr (bytes_transferred
);
758 // Create the interface result class.
759 ACE_Asynch_Accept::Result
result (this);
761 // Call the application handler.
762 ACE_Handler
*handler
= this->handler_proxy_
.get ()->handler ();
764 handler
->handle_accept (result
);
767 ACE_POSIX_Asynch_Accept_Result::~ACE_POSIX_Asynch_Accept_Result ()
771 // *********************************************************************
773 ACE_POSIX_Asynch_Accept::ACE_POSIX_Asynch_Accept (ACE_POSIX_Proactor
* posix_proactor
)
774 : ACE_POSIX_Asynch_Operation (posix_proactor
),
779 ACE_POSIX_Asynch_Accept::~ACE_POSIX_Asynch_Accept ()
782 this->reactor (0); // to avoid purge_pending_notifications
786 ACE_POSIX_Asynch_Accept::get_handle () const
788 return this->handle_
;
792 ACE_POSIX_Asynch_Accept::set_handle (ACE_HANDLE handle
)
794 ACE_ASSERT (handle_
== ACE_INVALID_HANDLE
);
795 this->handle_
= handle
;
799 ACE_POSIX_Asynch_Accept::open (const ACE_Handler::Proxy_Ptr
&handler_proxy
,
801 const void *completion_key
,
802 ACE_Proactor
*proactor
)
804 ACE_TRACE ("ACE_POSIX_Asynch_Accept::open");
806 // if we are already opened,
807 // we could not create a new handler without closing the previous
809 ACELIB_ERROR_RETURN ((LM_ERROR
,
810 ACE_TEXT("%N:%l:ACE_POSIX_Asynch_Accept::open:")
811 ACE_TEXT("acceptor already open\n")),
814 if (-1 == ACE_POSIX_Asynch_Operation::open (handler_proxy
,
822 ACE_Asynch_Pseudo_Task
& task
=
823 this->posix_proactor ()->get_asynch_pseudo_task ();
825 if (-1 == task
.register_io_handler (this->get_handle(),
827 ACE_Event_Handler::ACCEPT_MASK
,
828 1)) // suspend after register
830 this->flg_open_
= false;
831 this->handle_
= ACE_INVALID_HANDLE
;
// Queue an asynchronous accept request on the listen handle.
//
// Flow visible in this block:
//  1. Refuse the request (logging an error) when the acceptor is not open.
//  2. Require <message_block> to have room for <bytes_to_read> plus two
//     socket addresses (sizeof (sockaddr_in), or sizeof (sockaddr_in6) for
//     AF_INET6 when ACE_HAS_IPV6 is defined); otherwise ENOBUFS is recorded
//     with ACE_OS::last_error ().
//  3. Allocate an ACE_POSIX_Asynch_Accept_Result and append it to
//     result_queue_ while holding lock_ (ACE_GUARD_RETURN yields -1 when the
//     lock cannot be taken).
//  4. Resume the listen handle in the pseudo task and return the value of
//     resume_io_handler (); the size () > 1 test separates this from the
//     case where other requests were already queued.
839 ACE_POSIX_Asynch_Accept::accept (ACE_Message_Block
&message_block
,
840 size_t bytes_to_read
,
841 ACE_HANDLE accept_handle
,
847 ACE_TRACE ("ACE_POSIX_Asynch_Accept::accept");
849 if (!this->flg_open_
)
850 ACELIB_ERROR_RETURN ((LM_ERROR
,
851 ACE_TEXT("%N:%l:ACE_POSIX_Asynch_Accept::accept:")
852 ACE_TEXT("acceptor was not opened before\n")),
855 // Sanity check: make sure that enough space has been allocated by
857 size_t address_size
= sizeof (sockaddr_in
);
858 #if defined (ACE_HAS_IPV6)
859 if (addr_family
== AF_INET6
)
860 address_size
= sizeof (sockaddr_in6
);
862 ACE_UNUSED_ARG (addr_family
);
// Room is needed for the payload plus two addresses.
864 size_t available_space
= message_block
.space ();
865 size_t space_needed
= bytes_to_read
+ 2 * address_size
;
867 if (available_space
< space_needed
)
869 ACE_OS::last_error (ENOBUFS
);
873 // Common code for both WIN and POSIX.
874 // Create future Asynch_Accept_Result
875 ACE_POSIX_Asynch_Accept_Result
*result
= 0;
876 ACE_NEW_RETURN (result
,
877 ACE_POSIX_Asynch_Accept_Result (this->handler_proxy_
,
883 this->posix_proactor()->get_handle (),
890 ACE_MT (ACE_GUARD_RETURN (ACE_SYNCH_MUTEX
, ace_mon
, this->lock_
, -1));
891 if (this->result_queue_
.enqueue_tail (result
) == -1)
// The format string and the text consumed by %p must be two separate
// arguments; without the comma the adjacent literals are concatenated and
// %p is left without an argument.
893 ACELIB_ERROR ((LM_ERROR
,
894 ACE_TEXT ("ACE_POSIX_Asynch_Accept::accept: %p\n"),
895 ACE_TEXT ("enqueue_tail")));
896 delete result
; // to avoid memory leak
900 if (this->result_queue_
.size () > 1)
904 // If this is the only item, then it means the queue was empty
905 // before. So enable the accept handle in the reactor.
907 ACE_Asynch_Pseudo_Task
& task
=
908 this->posix_proactor ()->get_asynch_pseudo_task ();
910 return task
.resume_io_handler (this->get_handle ());
913 //@@ New method cancel_uncompleted
914 // Cancels all pending accept requests.
916 // Parameter flg_notify can be
917 // 0 - don't send notifications about canceled accepts
918 // !0 - notify the user about canceled accepts;
919 // according to POSIX standards we should receive notifications
920 // on canceled AIO requests
922 // Return value: number of canceled requests
// Dispose of accept results taken from the head of result_queue_.
//
// @param flg_notify  0: delete the dequeued result without telling the
//                    handler; non-zero: report it as a cancelled operation.
//
// A dequeued result is deleted outright when the acceptor is not open or
// flg_notify is 0. Otherwise it is marked as cancelled (invalid handle,
// 0 bytes transferred, error ECANCELED) and posted to the proactor.
// NOTE(review): the surrounding loop and the return statement are not
// visible in this view — presumably this runs once per queued result and
// returns the count; confirm against the full source.
926 ACE_POSIX_Asynch_Accept::cancel_uncompleted (int flg_notify
)
928 ACE_TRACE ("ACE_POSIX_Asynch_Accept::cancel_uncompleted");
934 ACE_POSIX_Asynch_Accept_Result
* result
= 0;
936 this->result_queue_
.dequeue_head (result
);
941 if (this->flg_open_
== 0 || flg_notify
== 0) // if we should not notify
942 delete result
; // we have to delete result
943 else // else notify as for any cancelled AIO
945 // No connection was accepted, so the result carries an invalid handle.
946 result
->aio_fildes
= ACE_INVALID_HANDLE
;
947 result
->set_bytes_transferred (0);
948 result
->set_error (ECANCELED
);
// A failed post is only logged here.
950 if (this->posix_proactor ()->post_completion (result
) == -1)
951 ACELIB_ERROR ((LM_ERROR
,
952 ACE_TEXT("(%P | %t):%p\n"),
953 ACE_TEXT("ACE_POSIX_Asynch_Accept::")
954 ACE_TEXT("cancel_uncompleted")
// Cancel all pending accept requests.
//
// The work is delegated to cancel_uncompleted (flg_open_), called while
// holding lock_ (ACE_GUARD_RETURN yields -1 when the lock cannot be taken).
// The count it returns is mapped onto rc:
//   0 cancelled  -> rc = 1 (AIO_ALLDONE)
//   >0 cancelled -> rc = 0 (AIO_CANCELED)
//   otherwise    -> rc stays -1 (error)
// The listen handle is then suspended in the pseudo task.
// NOTE(review): the statement guarded by "if (!this->flg_open_)" and the
// final return are not visible in this view.
962 ACE_POSIX_Asynch_Accept::cancel ()
964 ACE_TRACE ("ACE_POSIX_Asynch_Accept::cancel");
966 // Since this is not a real POSIX asynch I/O operation, we can't
967 // call ::aiocancel () or ACE_POSIX_Asynch_Operation::cancel ().
968 // We delegate real cancellation to cancel_uncompleted (flg_open_)
970 int rc
= -1 ; // ERRORS
973 ACE_MT (ACE_GUARD_RETURN (ACE_SYNCH_MUTEX
, ace_mon
, this->lock_
, -1));
975 int num_cancelled
= cancel_uncompleted (flg_open_
);
977 if (num_cancelled
== 0)
978 rc
= 1 ; // AIO_ALLDONE
979 else if (num_cancelled
> 0)
980 rc
= 0 ; // AIO_CANCELED
982 if (!this->flg_open_
)
986 ACE_Asynch_Pseudo_Task
& task
=
987 this->posix_proactor ()->get_asynch_pseudo_task ();
989 task
.suspend_io_handler (this->get_handle());
994 ACE_POSIX_Asynch_Accept::close ()
996 ACE_TRACE ("ACE_POSIX_Asynch_Accept::close");
998 // 1. It performs cancellation of all pending requests
999 // 2. Removes itself from Reactor ( ACE_Asynch_Pseudo_Task)
1000 // 3. close the socket
1002 // Parameter flg_notify can be
1003 // 0 - don't send notifications about canceled accepts
1004 // !0 - notify user about canceled accepts
1005 // according POSIX standards we should receive notifications
1006 // on canceled AIO requests
1008 // Return codes : 0 - OK ,
1012 ACE_MT (ACE_GUARD_RETURN (ACE_SYNCH_MUTEX
, ace_mon
, this->lock_
, -1));
1013 this->cancel_uncompleted (flg_open_
);
1016 if (!this->flg_open_
)
1018 if (this->handle_
!= ACE_INVALID_HANDLE
)
1020 ACE_OS::closesocket (this->handle_
);
1021 this->handle_
= ACE_INVALID_HANDLE
;
1026 if (this->handle_
== ACE_INVALID_HANDLE
)
1029 ACE_Asynch_Pseudo_Task
& task
=
1030 this->posix_proactor ()->get_asynch_pseudo_task ();
1032 task
.remove_io_handler (this->get_handle ());
1033 if (this->handle_
!= ACE_INVALID_HANDLE
)
1035 ACE_OS::closesocket (this->handle_
);
1036 this->handle_
= ACE_INVALID_HANDLE
;
1039 this->flg_open_
= false;
// Close hook for the listen handle; both parameters are unnamed and unused.
//
// While holding lock_ (ACE_GUARD_RETURN yields 0 when the lock cannot be
// taken), pending accepts are dropped without notifying the handler
// (cancel_uncompleted (0)), the acceptor is marked as not open and handle_
// is reset to ACE_INVALID_HANDLE. No closesocket () call appears in the
// visible lines of this block.
1045 ACE_POSIX_Asynch_Accept::handle_close (ACE_HANDLE
, ACE_Reactor_Mask
)
1047 ACE_TRACE ("ACE_POSIX_Asynch_Accept::handle_close");
1049 ACE_MT (ACE_GUARD_RETURN (ACE_SYNCH_MUTEX
, ace_mon
, this->lock_
, 0));
1051 // handle_close is called in two cases:
1052 // 1. Pseudo task is closing (i.e. proactor destructor)
1053 // 2. The listen handle is closed (we don't have exclusive access to this)
1055 this->cancel_uncompleted (0);
1057 this->flg_open_
= false;
1058 this->handle_
= ACE_INVALID_HANDLE
;
// Input callback for the listen handle: perform the real accept () and
// complete the oldest queued accept request with it.
//
// Visible flow:
//  1. Under lock_, dequeue the head of result_queue_ (a failure is logged)
//     and suspend the listen handle when the queue has become empty.
//  2. Call ACE_OS::accept () on handle_.
//  3. With no queued result the new socket is closed, since nobody can be
//     notified.
//  4. On accept failure errno is stored in the result and logged; the
//     result is still delivered so the handler sees the error.
//  5. Store the new handle in the result and post it to the proactor; a
//     failed post is logged.
// The fd argument is unnamed and unused; handle_ is used instead.
1063 ACE_POSIX_Asynch_Accept::handle_input (ACE_HANDLE
/* fd */)
1065 ACE_TRACE ("ACE_POSIX_Asynch_Accept::handle_input");
1067 // An <accept> has been sensed on the <listen_handle>. We should be
1068 // able to just go ahead and do the <accept> now on this <fd>. This
1069 // should be the same as the <listen_handle>.
1071 ACE_POSIX_Asynch_Accept_Result
* result
= 0;
1074 ACE_MT (ACE_GUARD_RETURN (ACE_SYNCH_MUTEX
, ace_mon
, this->lock_
, 0));
1076 // Deregister this info pertaining to this accept call.
1077 if (this->result_queue_
.dequeue_head (result
) != 0)
1078 ACELIB_ERROR ((LM_ERROR
,
1079 ACE_TEXT("%N:%l:(%P | %t):%p\n"),
1080 ACE_TEXT("ACE_POSIX_Asynch_Accept::handle_input:")
1081 ACE_TEXT( " dequeueing failed")));
1083 // Disable the handle in the reactor if no more accepts are pending.
1084 if (this->result_queue_
.size () == 0)
1086 ACE_Asynch_Pseudo_Task
& task
=
1087 this->posix_proactor ()->get_asynch_pseudo_task ();
1089 task
.suspend_io_handler (this->get_handle());
1093 // Issue <accept> now.
1094 // @@ We shouldn't block here since we have already done poll/select
1095 // through the reactor. But are we sure?
1097 ACE_HANDLE new_handle
= ACE_OS::accept (this->handle_
, 0, 0);
// NOTE(review): closesocket () below is reached before new_handle is
// compared with ACE_INVALID_HANDLE — confirm a failed accept cannot pass
// an invalid handle to it.
1099 if (result
== 0) // there is nobody to notify
1101 ACE_OS::closesocket (new_handle
);
1105 if (new_handle
== ACE_INVALID_HANDLE
)
1107 result
->set_error (errno
);
1108 ACELIB_ERROR ((LM_ERROR
,
1109 ACE_TEXT("%N:%l:(%P | %t):%p\n"),
1110 ACE_TEXT("ACE_POSIX_Asynch_Accept::handle_input: ")
1111 ACE_TEXT("accept")));
1113 // Notify client as usual, "AIO" finished with errors
1116 // Store the new handle.
1117 result
->aio_fildes
= new_handle
;
1119 // Notify the main process about this completion
1120 // Send the Result through the notification pipe.
1121 if (this->posix_proactor ()->post_completion (result
) == -1)
1122 ACELIB_ERROR ((LM_ERROR
,
1123 ACE_TEXT("Error:(%P | %t):%p\n"),
1124 ACE_TEXT("ACE_POSIX_Asynch_Accept::handle_input: ")
1125 ACE_TEXT(" <post_completion> failed")));
1130 // *********************************************************************
1133 ACE_POSIX_Asynch_Connect_Result::connect_handle () const
1135 return this->aio_fildes
;
1138 void ACE_POSIX_Asynch_Connect_Result::connect_handle (ACE_HANDLE handle
)
1140 this->aio_fildes
= handle
;
1144 ACE_POSIX_Asynch_Connect_Result::ACE_POSIX_Asynch_Connect_Result
1145 (const ACE_Handler::Proxy_Ptr
&handler_proxy
,
1146 ACE_HANDLE connect_handle
,
1151 : ACE_POSIX_Asynch_Result
1152 (handler_proxy
, act
, event
, 0, 0, priority
, signal_number
)
1154 this->aio_fildes
= connect_handle
;
1155 this->aio_nbytes
= 0;
1159 ACE_POSIX_Asynch_Connect_Result::complete (size_t bytes_transferred
,
1161 const void *completion_key
,
1165 this->bytes_transferred_
= bytes_transferred
;
1166 this->success_
= success
;
1167 this->completion_key_
= completion_key
;
1168 this->error_
= error
;
1170 // Create the interface result class.
1171 ACE_Asynch_Connect::Result
result (this);
1173 // Call the application handler.
1174 ACE_Handler
*handler
= this->handler_proxy_
.get ()->handler ();
1176 handler
->handle_connect (result
);
1179 ACE_POSIX_Asynch_Connect_Result::~ACE_POSIX_Asynch_Connect_Result ()
1183 // *********************************************************************
1185 ACE_POSIX_Asynch_Connect::ACE_POSIX_Asynch_Connect (ACE_POSIX_Proactor
* posix_proactor
)
1186 : ACE_POSIX_Asynch_Operation (posix_proactor
),
1191 ACE_POSIX_Asynch_Connect::~ACE_POSIX_Asynch_Connect ()
1194 this->reactor(0); // to avoid purge_pending_notifications
1198 ACE_POSIX_Asynch_Connect::get_handle () const
1201 return ACE_INVALID_HANDLE
;
1205 ACE_POSIX_Asynch_Connect::set_handle (ACE_HANDLE
)
1211 ACE_POSIX_Asynch_Connect::open (const ACE_Handler::Proxy_Ptr
&handler_proxy
,
1213 const void *completion_key
,
1214 ACE_Proactor
*proactor
)
1216 ACE_TRACE ("ACE_POSIX_Asynch_Connect::open");
1218 if (this->flg_open_
)
1222 ACE_POSIX_Asynch_Operation::open (handler_proxy
,
1227 // Ignore result as we pass ACE_INVALID_HANDLE
1231 this->flg_open_
= true;
// Start an asynchronous connect to <remote_sap>.
//
// Flow visible in this block:
//  1. Refuse the request (logging an error) when the connector is not open.
//  2. Allocate an ACE_POSIX_Asynch_Connect_Result and let connect_i () set
//     up the socket and start the connect; the handle actually used is read
//     back from the result afterwards.
//  3. One outcome of connect_i () is reported at once through
//     post_result (result, true) (the condition selecting it is not visible
//     in this view).
//  4. Otherwise the result is stored in result_map_ under lock_, keyed by
//     the connect handle, and the handle is registered with the pseudo task
//     for CONNECT_MASK events without suspending it.
//  5. If the bind into the map fails, the result is posted with EFAULT.
//     The trailing unbind + EFAULT + post_result sequence follows the
//     register_io_handler () call, presumably as its failure path.
1237 ACE_POSIX_Asynch_Connect::connect (ACE_HANDLE connect_handle
,
1238 const ACE_Addr
& remote_sap
,
1239 const ACE_Addr
& local_sap
,
1245 ACE_TRACE ("ACE_POSIX_Asynch_Connect::connect");
1247 if (this->flg_open_
== 0)
1248 ACELIB_ERROR_RETURN ((LM_ERROR
,
1249 ACE_TEXT("%N:%l:ACE_POSIX_Asynch_Connect::connect:")
1250 ACE_TEXT("connector was not opened before\n")),
1253 // Common code for both WIN and POSIX.
1254 // Create future Asynch_Connect_Result
1255 ACE_POSIX_Asynch_Connect_Result
*result
= 0;
1256 ACE_NEW_RETURN (result
,
1257 ACE_POSIX_Asynch_Connect_Result (this->handler_proxy_
,
1260 this->posix_proactor ()->get_handle (),
1265 int rc
= connect_i (result
,
// connect_i () may have created the socket itself, so re-read the handle.
1271 connect_handle
= result
->connect_handle ();
1274 return post_result (result
, true);
1276 // Enqueue the result; we will wait for its completion.
1278 ACE_MT (ACE_GUARD_RETURN (ACE_SYNCH_MUTEX
, ace_mon
, this->lock_
, -1));
1280 if (this->result_map_
.bind (connect_handle
, result
) == -1)
1282 ACELIB_ERROR ((LM_ERROR
,
1283 ACE_TEXT ("%N:%l:%p\n"),
1284 ACE_TEXT ("ACE_POSIX_Asynch_Connect::connect:")
1285 ACE_TEXT ("bind")));
1287 result
->set_error (EFAULT
);
1288 return post_result (result
, true);
1292 ACE_Asynch_Pseudo_Task
& task
=
1293 this->posix_proactor ()->get_asynch_pseudo_task ();
1295 rc
= task
.register_io_handler (connect_handle
,
1297 ACE_Event_Handler::CONNECT_MASK
,
1298 0); // don't suspend after register
1302 ACE_MT (ACE_GUARD_RETURN (ACE_SYNCH_MUTEX
, ace_mon
, this->lock_
, -1));
1304 this->result_map_
.unbind (connect_handle
, result
);
1308 result
->set_error (EFAULT
);
1309 this->post_result (result
, true);
// Deliver a finished connect result, or dispose of its socket.
//
// @param result  Completed (or cancelled) connect result.
//
// When the connector is open and posting is enabled, the result is handed
// to the proactor's post_completion (); a non-zero return from that call is
// logged. The connect handle stored in the result is closed when it is
// valid.
// NOTE(review): the statement taken when post_completion () returns 0 and
// the final statements of the function are not visible in this view —
// presumably the success path returns before the socket is closed; confirm
// against the full source.
1320 int ACE_POSIX_Asynch_Connect::post_result (ACE_POSIX_Asynch_Connect_Result
* result
,
1323 if (this->flg_open_
&& post_enable
!= 0)
1325 if (this->posix_proactor ()->post_completion (result
) == 0)
1328 ACELIB_ERROR ((LM_ERROR
,
1329 ACE_TEXT("Error:(%P | %t):%p\n"),
1330 ACE_TEXT("ACE_POSIX_Asynch_Connect::post_result: ")
1331 ACE_TEXT(" <post_completion> failed")));
// Close the socket held by the result, if there is one.
1334 ACE_HANDLE handle
= result
->connect_handle ();
1336 if (handle
!= ACE_INVALID_HANDLE
)
1337 ACE_OS::closesocket (handle
);
1346 // Return values: -1 - error before the connect attempt was made
1347 //                 0 - connect started (still in progress)
1348 //                 1 - connect finished (possibly unsuccessfully)
// Prepare the socket stored in <result> and start a non-blocking connect
// to <remote_sap>.
//
// Steps visible in this block:
//  1. Reset the result's byte count to 0.
//  2. If the result has no handle yet, create a socket for the remote
//     address' protocol family, store it in the result and, for families
//     other than PF_UNIX, apply a socket option (see "Reuse the address").
//  3. Bind to <local_sap> unless it is ACE_Addr::sap_any.
//  4. Switch the handle to non-blocking mode.
//  5. Call ACE_OS::connect ().
// Each failing step stores errno in the result and logs it.
//
// Return statements visible here: 0 when connect () reports EWOULDBLOCK or
// EINPROGRESS (connect started), 1 when the connect attempt has finished.
1351 ACE_POSIX_Asynch_Connect::connect_i (ACE_POSIX_Asynch_Connect_Result
*result
,
1352 const ACE_Addr
& remote_sap
,
1353 const ACE_Addr
& local_sap
,
1356 result
->set_bytes_transferred (0);
1358 ACE_HANDLE handle
= result
->connect_handle ();
// No socket supplied by the caller: create one for the remote family.
1360 if (handle
== ACE_INVALID_HANDLE
)
1362 int protocol_family
= remote_sap
.get_type ();
1364 handle
= ACE_OS::socket (protocol_family
,
1368 result
->connect_handle (handle
);
1369 if (handle
== ACE_INVALID_HANDLE
)
1371 result
->set_error (errno
);
1374 ACE_TEXT("ACE_POSIX_Asynch_Connect::connect_i: %p\n"),
1375 ACE_TEXT("socket")),
1379 // Reuse the address
1381 if (protocol_family
!= PF_UNIX
&&
1383 ACE_OS::setsockopt (handle
,
1389 result
->set_error (errno
);
1392 ACE_TEXT("ACE_POSIX_Asynch_Connect::connect_i: %p\n"),
1393 ACE_TEXT("setsockopt")),
// Bind only when the caller asked for a specific local address.
1398 if (local_sap
!= ACE_Addr::sap_any
)
1400 sockaddr
* laddr
= reinterpret_cast<sockaddr
*> (local_sap
.get_addr ());
1401 size_t size
= local_sap
.get_size ();
1403 if (ACE_OS::bind (handle
, laddr
, size
) == -1)
1405 result
->set_error (errno
);
1408 ACE_TEXT("ACE_POSIX_Asynch_Connect::connect_i: %p\n"),
1414 // set non blocking mode
1415 if (ACE::set_flags (handle
, ACE_NONBLOCK
) != 0)
1417 result
->set_error (errno
);
// The format string and the text consumed by %p must be two separate
// arguments, as in the socket/setsockopt cases above; without the comma
// the adjacent literals are concatenated and %p has no argument.
1420 ACE_TEXT("ACE_POSIX_Asynch_Connect::connect_i: %p\n"),
1421 ACE_TEXT("set_flags")),
1427 int rc
= ACE_OS::connect
1429 reinterpret_cast<sockaddr
*> (remote_sap
.get_addr ()),
1430 remote_sap
.get_size ());
1431 if (rc
< 0) // failure
// On a non-blocking socket these two codes mean the connect is underway.
1433 if (errno
== EWOULDBLOCK
|| errno
== EINPROGRESS
)
1434 return 0; // connect started
1439 result
->set_error (errno
);
1442 return 1 ; // connect finished
1445 ACE_NOTREACHED (return 0);
1449 //@@ New method cancel_uncompleted
1450 // Cancels all pending connect requests.
1452 // Parameter flg_notify can be
1453 // 0 - don't send notifications about canceled connects
1454 // !0 - notify the user about canceled connects;
1455 // according to POSIX standards we should receive notifications
1456 // on canceled AIO requests
1458 // Return value: number of canceled requests
// Cancel every connect request currently stored in result_map_.
//
// @param flg_notify  Forwarded to post_result () as its second argument
//                    for every cancelled result.
// @param set         Receives the handle of every cancelled request.
//
// For each map entry the handle is added to <set>, the result is marked as
// cancelled (0 bytes transferred, error ECANCELED) and passed to
// post_result (). retval is incremented once per entry. The map is emptied
// afterwards with unbind_all ().
// NOTE(review): the declaration of retval and the return statement are not
// visible in this view — presumably retval starts at 0 and is returned.
1462 ACE_POSIX_Asynch_Connect::cancel_uncompleted (bool flg_notify
,
1463 ACE_Handle_Set
& set
)
1465 ACE_TRACE ("ACE_POSIX_Asynch_Connect::cancel_uncompleted");
1469 MAP_MANAGER::ITERATOR
iter (result_map_
);
1470 MAP_MANAGER::ENTRY
* me
= 0;
// Walk all entries; the map itself is only cleared after the loop.
1474 for (; iter
.next (me
) != 0; retval
++ , iter
.advance ())
1476 ACE_HANDLE handle
= me
->ext_id_
;
1477 ACE_POSIX_Asynch_Connect_Result
* result
= me
->int_id_
;
1479 set
.set_bit (handle
);
1481 result
->set_bytes_transferred (0);
1482 result
->set_error (ECANCELED
);
1483 this->post_result (result
, flg_notify
);
1486 result_map_
.unbind_all ();
1492 ACE_POSIX_Asynch_Connect::cancel ()
1494 ACE_TRACE ("ACE_POSIX_Asynch_Connect::cancel");
1496 // Since this is not a real asynch I/O operation, we can't just call
1497 // ::aiocancel () or ACE_POSIX_Asynch_Operation::cancel ().
1498 // Delegate real cancelation to cancel_uncompleted (1)
1500 int rc
= -1 ; // ERRORS
1503 int num_cancelled
= 0;
1505 ACE_MT (ACE_GUARD_RETURN (ACE_SYNCH_MUTEX
, ace_mon
, this->lock_
, -1));
1506 num_cancelled
= cancel_uncompleted (flg_open_
, set
);
1508 if (num_cancelled
== 0)
1509 rc
= 1 ; // AIO_ALLDONE
1510 else if (num_cancelled
> 0)
1511 rc
= 0 ; // AIO_CANCELED
1513 if (!this->flg_open_
)
1516 ACE_Asynch_Pseudo_Task
& task
=
1517 this->posix_proactor ()->get_asynch_pseudo_task ();
1519 task
.remove_io_handler (set
);
1524 ACE_POSIX_Asynch_Connect::close ()
1526 ACE_TRACE ("ACE_POSIX_Asynch_Connect::close");
1528 ACE_Handle_Set set
;
1529 int num_cancelled
= 0;
1531 ACE_MT (ACE_GUARD_RETURN (ACE_SYNCH_MUTEX
, ace_mon
, this->lock_
, -1));
1532 num_cancelled
= cancel_uncompleted (flg_open_
, set
);
1535 if (num_cancelled
== 0 || !this->flg_open_
)
1537 this->flg_open_
= false;
1541 ACE_Asynch_Pseudo_Task
& task
=
1542 this->posix_proactor ()->get_asynch_pseudo_task ();
1544 task
.remove_io_handler (set
);
1545 this->flg_open_
= false;
// Output callback for a connecting socket: the connect attempt on <fd> has
// finished, successfully or not.
//
// Visible flow:
//  1. Under lock_ (ACE_GUARD_RETURN yields 0 when the lock cannot be
//     taken), remove the result registered for <fd> from result_map_; the
//     "not found" branch body is not visible in this view.
//  2. Query the socket with getsockopt () and store sockerror in the result
//     together with a byte count of 0.
//  3. Remove <fd> from the pseudo task, then deliver the result through
//     post_result (), passing flg_open_ as its second argument.
1551 ACE_POSIX_Asynch_Connect::handle_output (ACE_HANDLE fd
)
1553 ACE_TRACE ("ACE_POSIX_Asynch_Connect::handle_output");
1555 ACE_POSIX_Asynch_Connect_Result
* result
= 0;
1558 ACE_MT (ACE_GUARD_RETURN (ACE_SYNCH_MUTEX
, ace_mon
, this->lock_
, 0));
1559 if (this->result_map_
.unbind (fd
, result
) != 0) // not found
1564 int lsockerror
= sizeof sockerror
;
1566 ACE_OS::getsockopt (fd
,
1572 result
->set_bytes_transferred (0);
1573 result
->set_error (sockerror
);
1575 // This previously just did a "return -1" and let handle_close() clean
1576 // things up. However, this entire object may be gone as a result of
1577 // the application's completion handler, so don't count on 'this' being
1578 // legitimate on return from post_result().
1579 // remove_io_handler() contains flag DONT_CALL
1580 this->posix_proactor ()->get_asynch_pseudo_task ().remove_io_handler (fd
);
1581 this->post_result (result
, this->flg_open_
);
1587 ACE_POSIX_Asynch_Connect::handle_close (ACE_HANDLE fd
, ACE_Reactor_Mask
)
1589 ACE_TRACE ("ACE_POSIX_Asynch_Connect::handle_close");
1591 ACE_Asynch_Pseudo_Task
&task
=
1592 this->posix_proactor ()->get_asynch_pseudo_task ();
1594 task
.remove_io_handler (fd
);
1596 ACE_POSIX_Asynch_Connect_Result
* result
= 0;
1599 ACE_MT (ACE_GUARD_RETURN (ACE_SYNCH_MUTEX
, ace_mon
, this->lock_
, 0));
1600 if (this->result_map_
.unbind (fd
, result
) != 0) // not found
1604 result
->set_bytes_transferred (0);
1605 result
->set_error (ECANCELED
);
1606 this->post_result (result
, this->flg_open_
);
1611 // *********************************************************************
1614 ACE_POSIX_Asynch_Transmit_File_Result::socket () const
1616 return this->socket_
;
1620 ACE_POSIX_Asynch_Transmit_File_Result::file () const
1622 return this->aio_fildes
;
1625 ACE_Asynch_Transmit_File::Header_And_Trailer
*
1626 ACE_POSIX_Asynch_Transmit_File_Result::header_and_trailer () const
1628 return this->header_and_trailer_
;
1632 ACE_POSIX_Asynch_Transmit_File_Result::bytes_to_write () const
1634 return this->aio_nbytes
;
1638 ACE_POSIX_Asynch_Transmit_File_Result::bytes_per_send () const
1640 return this->bytes_per_send_
;
1644 ACE_POSIX_Asynch_Transmit_File_Result::flags () const
1646 return this->flags_
;
1649 ACE_POSIX_Asynch_Transmit_File_Result::ACE_POSIX_Asynch_Transmit_File_Result
1650 (const ACE_Handler::Proxy_Ptr
&handler_proxy
,
1653 ACE_Asynch_Transmit_File::Header_And_Trailer
*header_and_trailer
,
1654 size_t bytes_to_write
,
1657 size_t bytes_per_send
,
1663 : ACE_POSIX_Asynch_Result
1664 (handler_proxy
, act
, event
, offset
, offset_high
, priority
, signal_number
),
1666 header_and_trailer_ (header_and_trailer
),
1667 bytes_per_send_ (bytes_per_send
),
1670 this->aio_fildes
= file
;
1671 this->aio_nbytes
= bytes_to_write
;
1675 ACE_POSIX_Asynch_Transmit_File_Result::complete (size_t bytes_transferred
,
1677 const void *completion_key
,
1681 this->bytes_transferred_
= bytes_transferred
;
1682 this->success_
= success
;
1683 this->completion_key_
= completion_key
;
1684 this->error_
= error
;
1686 // We will not do this because (a) the header and trailer blocks may
1687 // be the same message_blocks and (b) in cases of failures we have
1688 // no idea how much of what (header, data, trailer) was sent.
1690 if (this->success_ && this->header_and_trailer_ != 0)
1692 ACE_Message_Block *header = this->header_and_trailer_->header ();
1694 header->rd_ptr (this->header_and_trailer_->header_bytes ());
1696 ACE_Message_Block *trailer = this->header_and_trailer_->trailer ();
1698 trailer->rd_ptr (this->header_and_trailer_->trailer_bytes ());
1702 // Create the interface result class.
1703 ACE_Asynch_Transmit_File::Result
result (this);
1705 // Call the application handler.
1706 ACE_Handler
*handler
= this->handler_proxy_
.get ()->handler ();
1708 handler
->handle_transmit_file (result
);
// Destructor.  Nothing to release here.
ACE_POSIX_Asynch_Transmit_File_Result::~ACE_POSIX_Asynch_Transmit_File_Result ()
{
}
1716 // *********************************************************************
/**
 * @class ACE_POSIX_Asynch_Transmit_Handler
 *
 * @brief Auxiliary handler for doing <Asynch_Transmit_File> in
 * Unix. <ACE_POSIX_Asynch_Transmit_File> internally uses this.
 *
 * This is a helper class for implementing
 * <ACE_POSIX_Asynch_Transmit_File> in Unix systems.  It sends the
 * header, then alternates file reads and socket writes, and finally
 * sends the trailer.
 */
class ACE_Export ACE_POSIX_Asynch_Transmit_Handler : public ACE_Handler
{
public:
  /// Constructor. Result pointer will have all the information to do
  /// the file transmission (socket, file, application handler, bytes
  /// to write).
  ACE_POSIX_Asynch_Transmit_Handler (ACE_POSIX_Proactor *posix_proactor,
                                     ACE_POSIX_Asynch_Transmit_File_Result *result);

  /// Destructor.
  ~ACE_POSIX_Asynch_Transmit_Handler () override;

  /// Do the transmission. All the info to do the transmission is in
  /// the <result> member.
  int transmit ();

protected:
  /// The asynch result pointer made from the initial transmit file
  /// request.
  ACE_POSIX_Asynch_Transmit_File_Result *result_;

  /// Message block used to do the transmission.
  ACE_Message_Block *mb_;

  /// Tags passed as ACT with every write so that the completion
  /// callback can tell which part of the transmission has finished.
  enum ACT
  {
    HEADER_ACT  = 1,
    DATA_ACT    = 2,
    TRAILER_ACT = 3
  };

  /// ACT to transmit header.
  ACT header_act_;

  /// ACT to transmit data.
  ACT data_act_;

  /// ACT to transmit trailer.
  ACT trailer_act_;

  /// Current offset of the file being transmitted.
  size_t file_offset_;

  /// Total size of the file.
  size_t file_size_;

  /// Number of bytes transferred on the stream.
  size_t bytes_transferred_;

  /// This is called when asynchronous writes from the socket complete.
  void handle_write_stream (const ACE_Asynch_Write_Stream::Result &result) override;

  /// This is called when asynchronous reads from the file complete.
  void handle_read_file (const ACE_Asynch_Read_File::Result &result) override;

  /// Issue asynch read from the file.
  int initiate_read_file ();

  /// To read from the file to be transmitted.
  ACE_POSIX_Asynch_Read_File rf_;

  /// Write stream to write the header, trailer and the data.
  ACE_POSIX_Asynch_Write_Stream ws_;
};
1792 // ************************************************************
1795 ACE_POSIX_Asynch_Transmit_Handler::ACE_POSIX_Asynch_Transmit_Handler
1796 (ACE_POSIX_Proactor
*posix_proactor
,
1797 ACE_POSIX_Asynch_Transmit_File_Result
*result
)
1800 header_act_ (this->HEADER_ACT
),
1801 data_act_ (this->DATA_ACT
),
1802 trailer_act_ (this->TRAILER_ACT
),
1803 file_offset_ (result
->offset ()),
1805 bytes_transferred_ (0),
1806 rf_ (posix_proactor
),
1807 ws_ (posix_proactor
)
1809 // Allocate memory for the message block.
1811 ACE_Message_Block (this->result_
->bytes_per_send ()
1813 // Init the file size.
1814 file_size_
= ACE_OS::filesize (this->result_
->file ());
// Destructor.  The handler owns the result made from the initial
// transmit file request, so deleting the handler deletes the result too;
// the transfer buffer is released as well.
ACE_POSIX_Asynch_Transmit_Handler::~ACE_POSIX_Asynch_Transmit_Handler ()
{
  delete result_;
  mb_->release ();
}
1825 // Do the transmission.
1826 // Initiate transmitting the header. When that completes
1827 // handle_write_stream will be called, there start transmitting the file.
1829 ACE_POSIX_Asynch_Transmit_Handler::transmit ()
1831 // No proactor is given for the <open>'s. Because we are using the
1832 // concrete implementations of the Asynch_Operations, and we have
1833 // already given them the specific proactor, so they wont need the
1834 // general <proactor> interface pointer.
1836 // Open Asynch_Read_File.
1837 if (this->rf_
.open (this->proxy (),
1838 this->result_
->file (),
1841 ACELIB_ERROR_RETURN ((LM_ERROR
,
1842 "ACE_Asynch_Transmit_Handler:read_file open failed\n"),
1845 // Open Asynch_Write_Stream.
1846 if (this->ws_
.open (this->proxy (),
1847 this->result_
->socket (),
1850 ACELIB_ERROR_RETURN ((LM_ERROR
,
1851 "ACE_Asynch_Transmit_Handler:write_stream open failed\n"),
1854 // Transmit the header.
1855 if (this->ws_
.write (*this->result_
->header_and_trailer ()->header (),
1856 this->result_
->header_and_trailer ()->header_bytes (),
1857 reinterpret_cast<void *> (&this->header_act_
),
1859 ACELIB_ERROR_RETURN ((LM_ERROR
,
1860 "Asynch_Transmit_Handler:transmitting header:write_stream failed\n"),
1866 ACE_POSIX_Asynch_Transmit_Handler::handle_write_stream (const ACE_Asynch_Write_Stream::Result
&result
)
1868 // Update bytes transferred so far.
1869 this->bytes_transferred_
+= result
.bytes_transferred ();
1871 // Check the success parameter.
1872 if (result
.success () == 0)
1876 ACELIB_ERROR ((LM_ERROR
,
1877 "Asynch_Transmit_File failed.\n"));
1881 this->result_
->complete (this->bytes_transferred_
,
1883 0, // @@ Completion key.
1888 // This is crucial to prevent memory leaks. This deletes
1889 // the result pointer also.
1894 // Write stream successful.
1896 // Partial write to socket.
1897 size_t unsent_data
= result
.bytes_to_write () - result
.bytes_transferred ();
1898 if (unsent_data
!= 0)
1900 ACELIB_DEBUG ((LM_DEBUG
,
1901 "%N:%l:Partial write to socket: Asynch_write called again\n"));
1903 // Duplicate the message block and retry remaining data
1904 if (this->ws_
.write (*result
.message_block ().duplicate (),
1907 this->result_
->priority (),
1908 this->result_
->signal_number ()) == -1)
1910 // @@ Handle this error.
1911 ACELIB_ERROR ((LM_ERROR
,
1912 "Asynch_Transmit_Handler:write_stream failed\n"));
1916 // @@ Handling *partial write* to a socket. Let us not continue
1917 // further before this write finishes. Because proceeding with
1918 // another read and then write might change the order of the
1919 // file transmission, because partial write to the stream is
1924 // Not a partial write. A full write.
1926 // Check ACT to see what was sent.
1927 ACT act
= * (ACT
*) result
.act ();
1932 // If it is the "trailer" that is just sent, then transmit file
1934 // Call the application handler.
1937 this->result_
->complete (this->bytes_transferred_
,
1939 0, // @@ Completion key.
1950 // If header/data was sent, initiate the file data transmission.
1951 if (this->initiate_read_file () == -1)
1952 // @@ Handle this error.
1953 ACELIB_ERROR ((LM_ERROR
,
1954 "Error:Asynch_Transmit_Handler:read_file couldnt be initiated\n"));
1958 // @@ Handle this error.
1959 ACELIB_ERROR ((LM_ERROR
,
1960 "Error:ACE_Asynch_Transmit_Handler::handle_write_stream::Unexpected act\n"));
1965 ACE_POSIX_Asynch_Transmit_Handler::handle_read_file (const ACE_Asynch_Read_File::Result
&result
)
1968 if (result
.success () == 0)
1972 this->result_
->complete (this->bytes_transferred_
,
1974 0, // @@ Completion key.
1975 errno
); // Error no.
1985 if (result
.bytes_transferred () == 0)
1988 // Increment offset.
1989 this->file_offset_
+= result
.bytes_transferred ();
1991 // Write data to network.
1992 if (this->ws_
.write (result
.message_block (),
1993 result
.bytes_transferred (),
1994 (void *)&this->data_act_
,
1995 this->result_
->priority (),
1996 this->result_
->signal_number ()) == -1)
1998 // @@ Handle this error.
1999 ACELIB_ERROR ((LM_ERROR
,
2000 "Error:ACE_Asynch_Transmit_File : write to the stream failed\n"));
2006 ACE_POSIX_Asynch_Transmit_Handler::initiate_read_file ()
2008 // Is there something to read.
2009 if (this->file_offset_
>= this->file_size_
)
2011 // File is sent. Send the trailer.
2012 if (this->ws_
.write (*this->result_
->header_and_trailer ()->trailer (),
2013 this->result_
->header_and_trailer ()->trailer_bytes (),
2014 (void *)&this->trailer_act_
,
2015 this->result_
->priority (),
2016 this->result_
->signal_number ()) == -1)
2017 ACELIB_ERROR_RETURN ((LM_ERROR
,
2018 "Error:Asynch_Transmit_Handler:write_stream writing trailer failed\n"),
2024 // @@ Is this right??
2025 // Previous reads and writes are over. For the new read, adjust
2026 // the wr_ptr and the rd_ptr to the beginning.
2027 this->mb_
->rd_ptr (this->mb_
->base ());
2028 this->mb_
->wr_ptr (this->mb_
->base ());
2030 // Inititiate an asynchronous read from the file.
2031 if (this->rf_
.read (*this->mb_
,
2032 this->mb_
->size () - 1,
2034 0, // @@ offset_high !!! if aiocb64 is used.
2036 this->result_
->priority (),
2037 this->result_
->signal_number ()) == -1)
2038 ACELIB_ERROR_RETURN ((LM_ERROR
,
2039 "Error:Asynch_Transmit_Handler::read from file failed\n"),
2045 // *********************************************************************
// Constructor.  Only forwards the proactor to the
// ACE_POSIX_Asynch_Operation base.
ACE_POSIX_Asynch_Transmit_File::ACE_POSIX_Asynch_Transmit_File (ACE_POSIX_Proactor *posix_proactor)
  : ACE_POSIX_Asynch_Operation (posix_proactor)
{
}
2053 ACE_POSIX_Asynch_Transmit_File::transmit_file (ACE_HANDLE file
,
2054 ACE_Asynch_Transmit_File::Header_And_Trailer
*header_and_trailer
,
2055 size_t bytes_to_write
,
2058 size_t bytes_per_send
,
2064 // Adjust these parameters if there are default values specified.
2065 ssize_t file_size
= ACE_OS::filesize (file
);
2067 if (file_size
== -1)
2068 ACELIB_ERROR_RETURN ((LM_ERROR
,
2069 ACE_TEXT("Error:%N:%l:%p\n"),
2070 ACE_TEXT("POSIX_Asynch_Transmit_File:filesize failed")),
2073 if (bytes_to_write
== 0)
2074 bytes_to_write
= file_size
;
2076 if (offset
> (size_t) file_size
)
2077 ACELIB_ERROR_RETURN ((LM_ERROR
,
2078 ACE_TEXT("Error:%p\n"),
2079 ACE_TEXT("Asynch_Transmit_File:File size is less than offset")),
2083 bytes_to_write
= file_size
- offset
+ 1;
2085 if (bytes_per_send
== 0)
2086 bytes_per_send
= bytes_to_write
;
2088 // Configure the result parameter.
2089 ACE_POSIX_Asynch_Transmit_File_Result
*result
= 0;
2091 ACE_NEW_RETURN (result
,
2092 ACE_POSIX_Asynch_Transmit_File_Result (this->handler_proxy_
,
2102 this->posix_proactor ()->get_handle (),
2107 // Make the auxillary handler and initiate transmit.
2108 ACE_POSIX_Asynch_Transmit_Handler
*transmit_handler
= 0;
2110 ACE_NEW_RETURN (transmit_handler
,
2111 ACE_POSIX_Asynch_Transmit_Handler (this->posix_proactor (),
2115 ssize_t return_val
= transmit_handler
->transmit ();
2117 if (return_val
== -1)
2118 // This deletes the <result> in it too.
2119 delete transmit_handler
;
// Destructor.  Nothing to release here.
ACE_POSIX_Asynch_Transmit_File::~ACE_POSIX_Asynch_Transmit_File ()
{
}
2128 // *********************************************************************
2130 ACE_POSIX_Asynch_Read_Dgram_Result::bytes_to_read () const
2132 return this->bytes_to_read_
;
2136 ACE_POSIX_Asynch_Read_Dgram_Result::remote_address (ACE_Addr
& addr
) const
2138 int retVal
= -1; // failure
2140 // make sure the addresses are of the same type
2141 if (addr
.get_type () == this->remote_address_
->get_type ())
2142 { // copy the remote_address_ into addr
2143 addr
.set_addr (this->remote_address_
->get_addr (),
2144 this->remote_address_
->get_size ());
2145 retVal
= 0; // success
2152 ACE_POSIX_Asynch_Read_Dgram_Result::saddr () const
2154 return (sockaddr
*) this->remote_address_
->get_addr ();
2159 ACE_POSIX_Asynch_Read_Dgram_Result::flags () const
2161 return this->flags_
;
2165 ACE_POSIX_Asynch_Read_Dgram_Result::handle () const
2167 return this->handle_
;
2171 ACE_POSIX_Asynch_Read_Dgram_Result::message_block () const
2173 return this->message_block_
;
2176 ACE_POSIX_Asynch_Read_Dgram_Result::ACE_POSIX_Asynch_Read_Dgram_Result
2177 (const ACE_Handler::Proxy_Ptr
&handler_proxy
,
2179 ACE_Message_Block
*message_block
,
2180 size_t bytes_to_read
,
2182 int protocol_family
,
2187 : ACE_POSIX_Asynch_Result
2188 (handler_proxy
, act
, event
, 0, 0, priority
, signal_number
),
2189 bytes_to_read_ (bytes_to_read
),
2190 message_block_ (message_block
),
2191 remote_address_ (0),
2196 ACE_UNUSED_ARG (protocol_family
);
2197 this->aio_fildes
= handle
;
2198 this->aio_buf
= message_block
->wr_ptr ();
2199 this->aio_nbytes
= bytes_to_read
;
2200 ACE_NEW (this->remote_address_
, ACE_INET_Addr
);
2204 ACE_POSIX_Asynch_Read_Dgram_Result::complete (size_t bytes_transferred
,
2206 const void *completion_key
,
2209 // Copy the data which was returned by GetQueuedCompletionStatus
2210 this->bytes_transferred_
= bytes_transferred
;
2211 this->success_
= success
;
2212 this->completion_key_
= completion_key
;
2213 this->error_
= error
;
2215 // Appropriately move the pointers in the message block.
2216 this->message_block_
->wr_ptr (bytes_transferred
);
2218 // <errno> is available in the aiocb.
2219 ACE_UNUSED_ARG (error
);
2221 this->remote_address_
->set_size(this->addr_len_
);
2223 // Create the interface result class.
2224 ACE_Asynch_Read_Dgram::Result
result (this);
2226 // Call the application handler.
2227 ACE_Handler
*handler
= this->handler_proxy_
.get ()->handler ();
2229 handler
->handle_read_dgram (result
);
2232 ACE_POSIX_Asynch_Read_Dgram_Result::~ACE_POSIX_Asynch_Read_Dgram_Result ()
2234 delete this->remote_address_
;
2237 //***************************************************************************
2239 ACE_POSIX_Asynch_Write_Dgram_Result::bytes_to_write () const
2241 return this->bytes_to_write_
;
2245 ACE_POSIX_Asynch_Write_Dgram_Result::flags () const
2247 return this->flags_
;
2251 ACE_POSIX_Asynch_Write_Dgram_Result::handle () const
2253 return this->handle_
;
2258 ACE_POSIX_Asynch_Write_Dgram_Result::message_block () const
2260 return this->message_block_
;
2263 ACE_POSIX_Asynch_Write_Dgram_Result::ACE_POSIX_Asynch_Write_Dgram_Result
2264 (const ACE_Handler::Proxy_Ptr
&handler_proxy
,
2266 ACE_Message_Block
*message_block
,
2267 size_t bytes_to_write
,
2273 : ACE_POSIX_Asynch_Result
2274 (handler_proxy
, act
, event
, 0, 0, priority
, signal_number
),
2275 bytes_to_write_ (bytes_to_write
),
2276 message_block_ (message_block
),
2281 this->aio_fildes
= handle
;
2282 this->aio_buf
= message_block
->rd_ptr ();
2283 this->aio_nbytes
= bytes_to_write
;
2287 ACE_POSIX_Asynch_Write_Dgram_Result::complete (size_t bytes_transferred
,
2289 const void *completion_key
,
2292 // Copy the data which was returned by GetQueuedCompletionStatus
2293 this->bytes_transferred_
= bytes_transferred
;
2294 this->success_
= success
;
2295 this->completion_key_
= completion_key
;
2296 this->error_
= error
;
2298 // <errno> is available in the aiocb.
2299 ACE_UNUSED_ARG (error
);
2301 // Appropriately move the pointers in the message block.
2302 this->message_block_
->rd_ptr (bytes_transferred
);
2304 // Create the interface result class.
2305 ACE_Asynch_Write_Dgram::Result
result (this);
2307 // Call the application handler.
2308 ACE_Handler
*handler
= this->handler_proxy_
.get ()->handler ();
2310 handler
->handle_write_dgram (result
);
// Destructor.  Nothing to release here.
ACE_POSIX_Asynch_Write_Dgram_Result::~ACE_POSIX_Asynch_Write_Dgram_Result ()
{
}
2317 /***************************************************************************/
// Destructor.  Nothing to release here.
ACE_POSIX_Asynch_Read_Dgram::~ACE_POSIX_Asynch_Read_Dgram ()
{
}
2323 ACE_POSIX_Asynch_Read_Dgram::recv (ACE_Message_Block
*message_block
,
2324 size_t & /*number_of_bytes_recvd*/,
2326 int protocol_family
,
2331 size_t space
= message_block
->space ();
2332 // Create the Asynch_Result.
2333 ACE_POSIX_Asynch_Read_Dgram_Result
*result
= 0;
2334 ACE_POSIX_Proactor
*proactor
= this->posix_proactor ();
2335 ACE_NEW_RETURN (result
,
2336 ACE_POSIX_Asynch_Read_Dgram_Result (this->handler_proxy_
,
2343 proactor
->get_handle (),
2348 int return_val
= proactor
->start_aio (result
, ACE_POSIX_Proactor::ACE_OPCODE_READ
);
2349 if (return_val
== -1)
// Constructor.  Only forwards the proactor to the
// ACE_POSIX_Asynch_Operation base.
ACE_POSIX_Asynch_Read_Dgram::ACE_POSIX_Asynch_Read_Dgram (ACE_POSIX_Proactor *posix_proactor)
  : ACE_POSIX_Asynch_Operation (posix_proactor)
{
}
2360 //***************************************************************************
// Destructor.  Nothing to release here.
ACE_POSIX_Asynch_Write_Dgram::~ACE_POSIX_Asynch_Write_Dgram ()
{
}
2367 ACE_POSIX_Asynch_Write_Dgram::send (ACE_Message_Block
*message_block
,
2368 size_t &/*number_of_bytes_sent*/,
2370 const ACE_Addr
&/*addr*/,
2375 size_t len
= message_block
->length ();
2379 ACE_TEXT ("ACE_POSIX_Asynch_Write_Stream::write:")
2380 ACE_TEXT ("Attempt to write 0 bytes\n")),
2383 ACE_POSIX_Asynch_Write_Dgram_Result
*result
= 0;
2384 ACE_POSIX_Proactor
*proactor
= this->posix_proactor ();
2385 ACE_NEW_RETURN (result
,
2386 ACE_POSIX_Asynch_Write_Dgram_Result (this->handler_proxy_
,
2392 proactor
->get_handle (),
2397 int return_val
= proactor
->start_aio (result
, ACE_POSIX_Proactor::ACE_OPCODE_WRITE
);
2398 if (return_val
== -1)
// Constructor.  Only forwards the proactor to the
// ACE_POSIX_Asynch_Operation base.
ACE_POSIX_Asynch_Write_Dgram::ACE_POSIX_Asynch_Write_Dgram
  (ACE_POSIX_Proactor *posix_proactor)
  : ACE_POSIX_Asynch_Operation (posix_proactor)
{
}
2410 ACE_END_VERSIONED_NAMESPACE_DECL
2412 #endif /* ACE_HAS_AIO_CALLS */