Document return values
[ACE_TAO.git] / ACE / ace / POSIX_Asynch_IO.cpp
blobc032bca87b93203d89e49765d9c6ef5761965ed8
1 #include "ace/POSIX_Asynch_IO.h"
3 #if defined (ACE_HAS_AIO_CALLS)
5 #include "ace/Flag_Manip.h"
6 #include "ace/Proactor.h"
7 #include "ace/Message_Block.h"
8 #include "ace/INET_Addr.h"
9 #include "ace/Asynch_Pseudo_Task.h"
10 #include "ace/POSIX_Proactor.h"
11 #include "ace/OS_NS_errno.h"
12 #include "ace/OS_NS_sys_socket.h"
13 #include "ace/OS_NS_sys_stat.h"
15 ACE_BEGIN_VERSIONED_NAMESPACE_DECL
17 size_t
18 ACE_POSIX_Asynch_Result::bytes_transferred () const
20 return this->bytes_transferred_;
23 void
24 ACE_POSIX_Asynch_Result::set_bytes_transferred (size_t nbytes)
26 this->bytes_transferred_= nbytes;
29 const void *
30 ACE_POSIX_Asynch_Result::act () const
32 return this->act_;
35 int
36 ACE_POSIX_Asynch_Result::success () const
38 return this->success_;
41 const void *
42 ACE_POSIX_Asynch_Result::completion_key () const
44 return this->completion_key_;
47 u_long
48 ACE_POSIX_Asynch_Result::error () const
50 return this->error_;
53 void
54 ACE_POSIX_Asynch_Result::set_error (u_long errcode)
56 this->error_=errcode;
58 ACE_HANDLE
59 ACE_POSIX_Asynch_Result::event () const
61 return ACE_INVALID_HANDLE;
64 u_long
65 ACE_POSIX_Asynch_Result::offset () const
67 return this->aio_offset;
70 u_long
71 ACE_POSIX_Asynch_Result::offset_high () const
74 // @@ Support aiocb64??
76 ACE_NOTSUP_RETURN (0);
79 int
80 ACE_POSIX_Asynch_Result::priority () const
82 return this->aio_reqprio;
85 int
86 ACE_POSIX_Asynch_Result::signal_number () const
88 return this->aio_sigevent.sigev_signo;
91 int
92 ACE_POSIX_Asynch_Result::post_completion (ACE_Proactor_Impl *proactor_impl)
94 // Get to the platform specific implementation.
95 ACE_POSIX_Proactor *posix_proactor = dynamic_cast<ACE_POSIX_Proactor *> (proactor_impl);
97 if (posix_proactor == 0)
98 ACELIB_ERROR_RETURN ((LM_ERROR, "Dynamic cast to POSIX Proactor failed\n"), -1);
100 // Post myself.
101 return posix_proactor->post_completion (this);
104 ACE_POSIX_Asynch_Result::~ACE_POSIX_Asynch_Result ()
108 ACE_POSIX_Asynch_Result::ACE_POSIX_Asynch_Result
109 (const ACE_Handler::Proxy_Ptr &handler_proxy,
110 const void* act,
111 ACE_HANDLE /* event */, // Event is not used on POSIX.
112 u_long offset,
113 u_long offset_high,
114 int priority,
115 int signal_number)
116 : handler_proxy_ (handler_proxy),
117 act_ (act),
118 bytes_transferred_ (0),
119 success_ (0),
120 completion_key_ (0),
121 error_ (0)
123 aio_offset = offset;
124 aio_reqprio = priority;
125 aio_sigevent.sigev_signo = signal_number;
128 // @@ Support offset_high with aiocb64.
130 ACE_UNUSED_ARG (offset_high);
132 // Other fields in the <aiocb> will be initialized by the
133 // subclasses.
136 // ****************************************************************
139 ACE_POSIX_Asynch_Operation::open (const ACE_Handler::Proxy_Ptr &handler_proxy,
140 ACE_HANDLE handle,
141 const void * /* completion_key */,
142 ACE_Proactor *proactor)
144 this->proactor_ = proactor;
145 this->handler_proxy_ = handler_proxy;
146 this->handle_ = handle;
148 // Grab the handle from the <handler> if <handle> is invalid
149 if (this->handle_ == ACE_INVALID_HANDLE)
151 ACE_Handler *handler = handler_proxy.get ()->handler ();
152 if (handler != 0)
153 this->handle_ = handler->handle ();
155 if (this->handle_ == ACE_INVALID_HANDLE)
156 return -1;
158 #if 0
159 // @@ If <proactor> is 0, let us not bother about getting this
160 // Proactor, we have already got the specific implementation
161 // Proactor.
163 // If no proactor was passed
164 if (this->proactor_ == 0)
166 // Grab the proactor from the <Service_Config> if
167 // <handler->proactor> is zero
168 this->proactor_ = this->handler_->proactor ();
169 if (this->proactor_ == 0)
170 this->proactor_ = ACE_Proactor::instance();
172 #endif /* 0 */
174 return 0;
178 ACE_POSIX_Asynch_Operation::cancel ()
180 if (!posix_proactor_)
181 return -1;
182 return posix_proactor_->cancel_aio (this->handle_);
185 ACE_Proactor *
186 ACE_POSIX_Asynch_Operation::proactor () const
188 return this->proactor_;
191 ACE_POSIX_Proactor *
192 ACE_POSIX_Asynch_Operation::posix_proactor () const
194 return this->posix_proactor_;
197 ACE_POSIX_Asynch_Operation::~ACE_POSIX_Asynch_Operation ()
201 ACE_POSIX_Asynch_Operation::ACE_POSIX_Asynch_Operation (ACE_POSIX_Proactor *posix_proactor)
202 : posix_proactor_ (posix_proactor),
203 handle_ (ACE_INVALID_HANDLE)
207 // *********************************************************************
209 size_t
210 ACE_POSIX_Asynch_Read_Stream_Result::bytes_to_read () const
212 return this->aio_nbytes;
215 ACE_Message_Block &
216 ACE_POSIX_Asynch_Read_Stream_Result::message_block () const
218 return this->message_block_;
221 ACE_HANDLE
222 ACE_POSIX_Asynch_Read_Stream_Result::handle () const
224 return this->aio_fildes;
227 ACE_POSIX_Asynch_Read_Stream_Result::ACE_POSIX_Asynch_Read_Stream_Result
228 (const ACE_Handler::Proxy_Ptr &handler_proxy,
229 ACE_HANDLE handle,
230 ACE_Message_Block &message_block,
231 size_t bytes_to_read,
232 const void* act,
233 ACE_HANDLE event,
234 int priority,
235 int signal_number)
236 : ACE_POSIX_Asynch_Result
237 (handler_proxy, act, event, 0, 0, priority, signal_number),
238 message_block_ (message_block)
240 this->aio_fildes = handle;
241 this->aio_buf = message_block.wr_ptr ();
242 this->aio_nbytes = bytes_to_read;
245 void
246 ACE_POSIX_Asynch_Read_Stream_Result::complete (size_t bytes_transferred,
247 int success,
248 const void *completion_key,
249 u_long error)
251 this->bytes_transferred_ = bytes_transferred;
252 this->success_ = success;
253 this->completion_key_ = completion_key;
254 this->error_ = error;
256 // <errno> is available in the aiocb.
257 ACE_UNUSED_ARG (error);
259 // Appropriately move the pointers in the message block.
260 this->message_block_.wr_ptr (bytes_transferred);
262 // Create the interface result class.
263 ACE_Asynch_Read_Stream::Result result (this);
265 // Call the application handler.
266 ACE_Handler *handler = this->handler_proxy_.get ()->handler ();
267 if (handler != 0)
268 handler->handle_read_stream (result);
271 ACE_POSIX_Asynch_Read_Stream_Result::~ACE_POSIX_Asynch_Read_Stream_Result ()
275 // ************************************************************
277 ACE_POSIX_Asynch_Read_Stream::ACE_POSIX_Asynch_Read_Stream (ACE_POSIX_Proactor *posix_proactor)
278 : ACE_POSIX_Asynch_Operation (posix_proactor)
283 ACE_POSIX_Asynch_Read_Stream::read (ACE_Message_Block &message_block,
284 size_t bytes_to_read,
285 const void *act,
286 int priority,
287 int signal_number)
289 size_t space = message_block.space ();
290 if (bytes_to_read > space)
291 bytes_to_read=space;
293 if (bytes_to_read == 0)
295 errno = ENOSPC;
296 return -1;
299 // Create the Asynch_Result.
300 ACE_POSIX_Asynch_Read_Stream_Result *result = 0;
301 ACE_POSIX_Proactor *proactor = this->posix_proactor ();
302 ACE_NEW_RETURN (result,
303 ACE_POSIX_Asynch_Read_Stream_Result (this->handler_proxy_,
304 this->handle_,
305 message_block,
306 bytes_to_read,
307 act,
308 proactor->get_handle (),
309 priority,
310 signal_number),
311 -1);
313 int return_val = proactor->start_aio (result, ACE_POSIX_Proactor::ACE_OPCODE_READ);
314 if (return_val == -1)
315 delete result;
317 return return_val;
320 ACE_POSIX_Asynch_Read_Stream::~ACE_POSIX_Asynch_Read_Stream ()
324 // *********************************************************************
326 size_t
327 ACE_POSIX_Asynch_Write_Stream_Result::bytes_to_write () const
329 return this->aio_nbytes;
332 ACE_Message_Block &
333 ACE_POSIX_Asynch_Write_Stream_Result::message_block () const
335 return this->message_block_;
338 ACE_HANDLE
339 ACE_POSIX_Asynch_Write_Stream_Result::handle () const
341 return this->aio_fildes;
344 ACE_POSIX_Asynch_Write_Stream_Result::ACE_POSIX_Asynch_Write_Stream_Result
345 (const ACE_Handler::Proxy_Ptr &handler_proxy,
346 ACE_HANDLE handle,
347 ACE_Message_Block &message_block,
348 size_t bytes_to_write,
349 const void* act,
350 ACE_HANDLE event,
351 int priority,
352 int signal_number)
353 : ACE_POSIX_Asynch_Result
354 (handler_proxy, act, event, 0, 0, priority, signal_number),
355 message_block_ (message_block)
357 this->aio_fildes = handle;
358 this->aio_buf = message_block.rd_ptr ();
359 this->aio_nbytes = bytes_to_write;
362 void
363 ACE_POSIX_Asynch_Write_Stream_Result::complete (size_t bytes_transferred,
364 int success,
365 const void *completion_key,
366 u_long error)
368 // Get all the data copied.
369 this->bytes_transferred_ = bytes_transferred;
370 this->success_ = success;
371 this->completion_key_ = completion_key;
372 this->error_ = error;
374 // <errno> is available in the aiocb.
375 ACE_UNUSED_ARG (error);
377 // Appropriately move the pointers in the message block.
378 this->message_block_.rd_ptr (bytes_transferred);
380 // Create the interface result class.
381 ACE_Asynch_Write_Stream::Result result (this);
383 // Call the application handler.
384 ACE_Handler *handler = this->handler_proxy_.get ()->handler ();
385 if (handler != 0)
386 handler->handle_write_stream (result);
389 ACE_POSIX_Asynch_Write_Stream_Result::~ACE_POSIX_Asynch_Write_Stream_Result ()
393 // *********************************************************************
395 ACE_POSIX_Asynch_Write_Stream::ACE_POSIX_Asynch_Write_Stream (ACE_POSIX_Proactor *posix_proactor)
396 : ACE_POSIX_Asynch_Operation (posix_proactor)
401 ACE_POSIX_Asynch_Write_Stream::write (ACE_Message_Block &message_block,
402 size_t bytes_to_write,
403 const void *act,
404 int priority,
405 int signal_number)
407 size_t len = message_block.length ();
408 if (bytes_to_write > len)
409 bytes_to_write = len;
411 if (bytes_to_write == 0)
412 ACELIB_ERROR_RETURN
413 ((LM_ERROR,
414 ACE_TEXT ("ACE_POSIX_Asynch_Write_Stream::write:")
415 ACE_TEXT ("Attempt to write 0 bytes\n")),
416 -1);
418 ACE_POSIX_Asynch_Write_Stream_Result *result = 0;
419 ACE_POSIX_Proactor *proactor = this->posix_proactor ();
420 ACE_NEW_RETURN (result,
421 ACE_POSIX_Asynch_Write_Stream_Result (this->handler_proxy_,
422 this->handle_,
423 message_block,
424 bytes_to_write,
425 act,
426 proactor->get_handle (),
427 priority,
428 signal_number),
429 -1);
431 int return_val = proactor->start_aio (result, ACE_POSIX_Proactor::ACE_OPCODE_WRITE);
432 if (return_val == -1)
433 delete result;
435 return return_val;
438 ACE_POSIX_Asynch_Write_Stream::~ACE_POSIX_Asynch_Write_Stream ()
442 // *********************************************************************
444 ACE_POSIX_Asynch_Read_File_Result::ACE_POSIX_Asynch_Read_File_Result
445 (const ACE_Handler::Proxy_Ptr &handler_proxy,
446 ACE_HANDLE handle,
447 ACE_Message_Block &message_block,
448 size_t bytes_to_read,
449 const void* act,
450 u_long offset,
451 u_long offset_high,
452 ACE_HANDLE event,
453 int priority,
454 int signal_number)
455 : ACE_POSIX_Asynch_Read_Stream_Result (handler_proxy,
456 handle,
457 message_block,
458 bytes_to_read,
459 act,
460 event,
461 priority,
462 signal_number)
464 this->aio_offset = offset;
466 // @@ Use aiocb64??
468 ACE_UNUSED_ARG (offset_high);
471 void
472 ACE_POSIX_Asynch_Read_File_Result::complete (size_t bytes_transferred,
473 int success,
474 const void *completion_key,
475 u_long error)
477 // Copy all the data.
478 this->bytes_transferred_ = bytes_transferred;
479 this->success_ = success;
480 this->completion_key_ = completion_key;
481 this->error_ = error;
483 // <errno> is available in the aiocb.
484 ACE_UNUSED_ARG (error);
486 // Appropriately move the pointers in the message block.
487 this->message_block_.wr_ptr (bytes_transferred);
489 // Create the interface result class.
490 ACE_Asynch_Read_File::Result result (this);
492 // Call the application handler.
493 ACE_Handler *handler = this->handler_proxy_.get ()->handler ();
494 if (handler != 0)
495 handler->handle_read_file (result);
498 ACE_POSIX_Asynch_Read_File_Result::~ACE_POSIX_Asynch_Read_File_Result ()
502 // *********************************************************************
504 ACE_POSIX_Asynch_Read_File::ACE_POSIX_Asynch_Read_File (ACE_POSIX_Proactor *posix_proactor)
505 : ACE_POSIX_Asynch_Read_Stream (posix_proactor)
510 ACE_POSIX_Asynch_Read_File::read (ACE_Message_Block &message_block,
511 size_t bytes_to_read,
512 u_long offset,
513 u_long offset_high,
514 const void *act,
515 int priority,
516 int signal_number)
518 size_t space = message_block.space ();
519 if ( bytes_to_read > space )
520 bytes_to_read=space;
522 if ( bytes_to_read == 0 )
523 ACELIB_ERROR_RETURN
524 ((LM_ERROR,
525 ACE_TEXT ("ACE_POSIX_Asynch_Read_File::read:")
526 ACE_TEXT ("Attempt to read 0 bytes or no space in the message block\n")),
527 -1);
529 ACE_POSIX_Asynch_Read_File_Result *result = 0;
530 ACE_POSIX_Proactor *proactor = this->posix_proactor ();
531 ACE_NEW_RETURN (result,
532 ACE_POSIX_Asynch_Read_File_Result (this->handler_proxy_,
533 this->handle_,
534 message_block,
535 bytes_to_read,
536 act,
537 offset,
538 offset_high,
539 posix_proactor ()->get_handle (),
540 priority,
541 signal_number),
542 -1);
544 int return_val = proactor->start_aio (result, ACE_POSIX_Proactor::ACE_OPCODE_READ);
545 if (return_val == -1)
546 delete result;
548 return return_val;
551 ACE_POSIX_Asynch_Read_File::~ACE_POSIX_Asynch_Read_File ()
556 ACE_POSIX_Asynch_Read_File::read (ACE_Message_Block &message_block,
557 size_t bytes_to_read,
558 const void *act,
559 int priority,
560 int signal_number)
562 return ACE_POSIX_Asynch_Read_Stream::read (message_block,
563 bytes_to_read,
564 act,
565 priority,
566 signal_number);
569 // ************************************************************
571 ACE_POSIX_Asynch_Write_File_Result::ACE_POSIX_Asynch_Write_File_Result
572 (const ACE_Handler::Proxy_Ptr &handler_proxy,
573 ACE_HANDLE handle,
574 ACE_Message_Block &message_block,
575 size_t bytes_to_write,
576 const void* act,
577 u_long offset,
578 u_long offset_high,
579 ACE_HANDLE event,
580 int priority,
581 int signal_number)
582 : ACE_POSIX_Asynch_Write_Stream_Result (handler_proxy,
583 handle,
584 message_block,
585 bytes_to_write,
586 act,
587 event,
588 priority,
589 signal_number)
591 this->aio_offset = offset;
593 // @@ Support offset_high with aiocb64.
595 ACE_UNUSED_ARG (offset_high);
598 void
599 ACE_POSIX_Asynch_Write_File_Result::complete (size_t bytes_transferred,
600 int success,
601 const void *completion_key,
602 u_long error)
604 // Copy the data.
605 this->bytes_transferred_ = bytes_transferred;
606 this->success_ = success;
607 this->completion_key_ = completion_key;
608 this->error_ = error;
610 // <error> is available in <aio_resultp.aio_error>
611 ACE_UNUSED_ARG (error);
613 // Appropriately move the pointers in the message block.
614 this->message_block_.rd_ptr (bytes_transferred);
616 // Create the interface result class.
617 ACE_Asynch_Write_File::Result result (this);
619 // Call the application handler.
620 ACE_Handler *handler = this->handler_proxy_.get ()->handler ();
621 if (handler != 0)
622 handler->handle_write_file (result);
625 ACE_POSIX_Asynch_Write_File_Result::~ACE_POSIX_Asynch_Write_File_Result ()
629 // *********************************************************************
631 ACE_POSIX_Asynch_Write_File::ACE_POSIX_Asynch_Write_File (ACE_POSIX_Proactor *posix_proactor)
632 : ACE_POSIX_Asynch_Write_Stream (posix_proactor)
637 ACE_POSIX_Asynch_Write_File::write (ACE_Message_Block &message_block,
638 size_t bytes_to_write,
639 u_long offset,
640 u_long offset_high,
641 const void *act,
642 int priority,
643 int signal_number)
645 size_t len = message_block.length ();
646 if (bytes_to_write > len)
647 bytes_to_write = len;
649 if (bytes_to_write == 0)
650 ACELIB_ERROR_RETURN
651 ((LM_ERROR,
652 ACE_TEXT ("ACE_POSIX_Asynch_Write_File::write:")
653 ACE_TEXT ("Attempt to write 0 bytes\n")),
654 -1);
656 ACE_POSIX_Asynch_Write_File_Result *result = 0;
657 ACE_POSIX_Proactor *proactor = this->posix_proactor ();
658 ACE_NEW_RETURN (result,
659 ACE_POSIX_Asynch_Write_File_Result (this->handler_proxy_,
660 this->handle_,
661 message_block,
662 bytes_to_write,
663 act,
664 offset,
665 offset_high,
666 proactor->get_handle (),
667 priority,
668 signal_number),
669 -1);
671 int return_val = proactor->start_aio (result, ACE_POSIX_Proactor::ACE_OPCODE_WRITE);
672 if (return_val == -1)
673 delete result;
675 return return_val;
678 ACE_POSIX_Asynch_Write_File::~ACE_POSIX_Asynch_Write_File ()
683 ACE_POSIX_Asynch_Write_File::write (ACE_Message_Block &message_block,
684 size_t bytes_to_write,
685 const void *act,
686 int priority,
687 int signal_number)
689 return ACE_POSIX_Asynch_Write_Stream::write (message_block,
690 bytes_to_write,
691 act,
692 priority,
693 signal_number);
696 // *********************************************************************
699 size_t
700 ACE_POSIX_Asynch_Accept_Result::bytes_to_read () const
702 return this->aio_nbytes;
705 ACE_Message_Block &
706 ACE_POSIX_Asynch_Accept_Result::message_block () const
708 return this->message_block_;
711 ACE_HANDLE
712 ACE_POSIX_Asynch_Accept_Result::listen_handle () const
714 return this->listen_handle_;
717 ACE_HANDLE
718 ACE_POSIX_Asynch_Accept_Result::accept_handle () const
720 return this->aio_fildes;
723 ACE_POSIX_Asynch_Accept_Result::ACE_POSIX_Asynch_Accept_Result
724 (const ACE_Handler::Proxy_Ptr &handler_proxy,
725 ACE_HANDLE listen_handle,
726 ACE_HANDLE accept_handle,
727 ACE_Message_Block &message_block,
728 size_t bytes_to_read,
729 const void* act,
730 ACE_HANDLE event,
731 int priority,
732 int signal_number)
734 : ACE_POSIX_Asynch_Result
735 (handler_proxy, act, event, 0, 0, priority, signal_number),
736 message_block_ (message_block),
737 listen_handle_ (listen_handle)
739 this->aio_fildes = accept_handle;
740 this->aio_nbytes = bytes_to_read;
743 void
744 ACE_POSIX_Asynch_Accept_Result::complete (size_t bytes_transferred,
745 int success,
746 const void *completion_key,
747 u_long error)
749 // Copy the data.
750 this->bytes_transferred_ = bytes_transferred;
751 this->success_ = success;
752 this->completion_key_ = completion_key;
753 this->error_ = error;
755 // Appropriately move the pointers in the message block.
756 this->message_block_.wr_ptr (bytes_transferred);
758 // Create the interface result class.
759 ACE_Asynch_Accept::Result result (this);
761 // Call the application handler.
762 ACE_Handler *handler = this->handler_proxy_.get ()->handler ();
763 if (handler != 0)
764 handler->handle_accept (result);
767 ACE_POSIX_Asynch_Accept_Result::~ACE_POSIX_Asynch_Accept_Result ()
771 // *********************************************************************
773 ACE_POSIX_Asynch_Accept::ACE_POSIX_Asynch_Accept (ACE_POSIX_Proactor * posix_proactor)
774 : ACE_POSIX_Asynch_Operation (posix_proactor),
775 flg_open_ (false)
779 ACE_POSIX_Asynch_Accept::~ACE_POSIX_Asynch_Accept ()
781 this->close ();
782 this->reactor (0); // to avoid purge_pending_notifications
785 ACE_HANDLE
786 ACE_POSIX_Asynch_Accept::get_handle () const
788 return this->handle_;
791 void
792 ACE_POSIX_Asynch_Accept::set_handle (ACE_HANDLE handle)
794 ACE_ASSERT (handle_ == ACE_INVALID_HANDLE);
795 this->handle_ = handle;
799 ACE_POSIX_Asynch_Accept::open (const ACE_Handler::Proxy_Ptr &handler_proxy,
800 ACE_HANDLE handle,
801 const void *completion_key,
802 ACE_Proactor *proactor)
804 ACE_TRACE ("ACE_POSIX_Asynch_Accept::open");
806 // if we are already opened,
807 // we could not create a new handler without closing the previous
808 if (this->flg_open_)
809 ACELIB_ERROR_RETURN ((LM_ERROR,
810 ACE_TEXT("%N:%l:ACE_POSIX_Asynch_Accept::open:")
811 ACE_TEXT("acceptor already open\n")),
812 -1);
814 if (-1 == ACE_POSIX_Asynch_Operation::open (handler_proxy,
815 handle,
816 completion_key,
817 proactor))
818 return -1;
820 flg_open_ = true;
822 ACE_Asynch_Pseudo_Task & task =
823 this->posix_proactor ()->get_asynch_pseudo_task ();
825 if (-1 == task.register_io_handler (this->get_handle(),
826 this,
827 ACE_Event_Handler::ACCEPT_MASK,
828 1)) // suspend after register
830 this->flg_open_= false;
831 this->handle_ = ACE_INVALID_HANDLE;
832 return -1 ;
835 return 0;
839 ACE_POSIX_Asynch_Accept::accept (ACE_Message_Block &message_block,
840 size_t bytes_to_read,
841 ACE_HANDLE accept_handle,
842 const void *act,
843 int priority,
844 int signal_number,
845 int addr_family)
847 ACE_TRACE ("ACE_POSIX_Asynch_Accept::accept");
849 if (!this->flg_open_)
850 ACELIB_ERROR_RETURN ((LM_ERROR,
851 ACE_TEXT("%N:%l:ACE_POSIX_Asynch_Accept::accept")
852 ACE_TEXT("acceptor was not opened before\n")),
853 -1);
855 // Sanity check: make sure that enough space has been allocated by
856 // the caller.
857 size_t address_size = sizeof (sockaddr_in);
858 #if defined (ACE_HAS_IPV6)
859 if (addr_family == AF_INET6)
860 address_size = sizeof (sockaddr_in6);
861 #else
862 ACE_UNUSED_ARG (addr_family);
863 #endif
864 size_t available_space = message_block.space ();
865 size_t space_needed = bytes_to_read + 2 * address_size;
867 if (available_space < space_needed)
869 ACE_OS::last_error (ENOBUFS);
870 return -1;
873 // Common code for both WIN and POSIX.
874 // Create future Asynch_Accept_Result
875 ACE_POSIX_Asynch_Accept_Result *result = 0;
876 ACE_NEW_RETURN (result,
877 ACE_POSIX_Asynch_Accept_Result (this->handler_proxy_,
878 this->handle_,
879 accept_handle,
880 message_block,
881 bytes_to_read,
882 act,
883 this->posix_proactor()->get_handle (),
884 priority,
885 signal_number),
886 -1);
888 // Enqueue result
890 ACE_MT (ACE_GUARD_RETURN (ACE_SYNCH_MUTEX, ace_mon, this->lock_, -1));
891 if (this->result_queue_.enqueue_tail (result) == -1)
893 ACELIB_ERROR ((LM_ERROR,
894 ACE_TEXT ("ACE_POSIX_Asynch_Accept::accept: %p\n")
895 ACE_TEXT ("enqueue_tail")));
896 delete result; // to avoid memory leak
897 return -1;
900 if (this->result_queue_.size () > 1)
901 return 0;
904 // If this is the only item, then it means there the set was empty
905 // before. So enable the accept handle in the reactor.
907 ACE_Asynch_Pseudo_Task & task =
908 this->posix_proactor ()->get_asynch_pseudo_task ();
910 return task.resume_io_handler (this->get_handle ());
913 //@@ New method cancel_uncompleted
914 // It performs cancellation of all pending requests
916 // Parameter flg_notify can be
917 // 0 - don't send notifications about canceled accepts
918 // !0 - notify user about canceled accepts
919 // according POSIX standards we should receive notifications
920 // on canceled AIO requests
922 // Return value : number of cancelled requests
926 ACE_POSIX_Asynch_Accept::cancel_uncompleted (int flg_notify)
928 ACE_TRACE ("ACE_POSIX_Asynch_Accept::cancel_uncompleted");
930 int retval = 0;
932 for (; ; retval++)
934 ACE_POSIX_Asynch_Accept_Result* result = 0;
936 this->result_queue_.dequeue_head (result);
938 if (result == 0)
939 break;
941 if (this->flg_open_ == 0 || flg_notify == 0) //if we should not notify
942 delete result ; // we have to delete result
943 else //else notify as any cancelled AIO
945 // Store the new handle.
946 result->aio_fildes = ACE_INVALID_HANDLE ;
947 result->set_bytes_transferred (0);
948 result->set_error (ECANCELED);
950 if (this->posix_proactor ()->post_completion (result) == -1)
951 ACELIB_ERROR ((LM_ERROR,
952 ACE_TEXT("(%P | %t):%p\n"),
953 ACE_TEXT("ACE_POSIX_Asynch_Accept::")
954 ACE_TEXT("cancel_uncompleted")
958 return retval;
962 ACE_POSIX_Asynch_Accept::cancel ()
964 ACE_TRACE ("ACE_POSIX_Asynch_Accept::cancel");
966 // Since this is not a real POSIX asynch I/O operation, we can't
967 // call ::aiocancel () or ACE_POSIX_Asynch_Operation::cancel ().
968 // We delegate real cancelation to cancel_uncompleted (1)
970 int rc = -1 ; // ERRORS
973 ACE_MT (ACE_GUARD_RETURN (ACE_SYNCH_MUTEX, ace_mon, this->lock_, -1));
975 int num_cancelled = cancel_uncompleted (flg_open_);
977 if (num_cancelled == 0)
978 rc = 1 ; // AIO_ALLDONE
979 else if (num_cancelled > 0)
980 rc = 0 ; // AIO_CANCELED
982 if (!this->flg_open_)
983 return rc ;
986 ACE_Asynch_Pseudo_Task & task =
987 this->posix_proactor ()->get_asynch_pseudo_task ();
989 task.suspend_io_handler (this->get_handle());
990 return 0;
994 ACE_POSIX_Asynch_Accept::close ()
996 ACE_TRACE ("ACE_POSIX_Asynch_Accept::close");
998 // 1. It performs cancellation of all pending requests
999 // 2. Removes itself from Reactor ( ACE_Asynch_Pseudo_Task)
1000 // 3. close the socket
1002 // Parameter flg_notify can be
1003 // 0 - don't send notifications about canceled accepts
1004 // !0 - notify user about canceled accepts
1005 // according POSIX standards we should receive notifications
1006 // on canceled AIO requests
1008 // Return codes : 0 - OK ,
1009 // -1 - Errors
1012 ACE_MT (ACE_GUARD_RETURN (ACE_SYNCH_MUTEX, ace_mon, this->lock_, -1));
1013 this->cancel_uncompleted (flg_open_);
1016 if (!this->flg_open_)
1018 if (this->handle_ != ACE_INVALID_HANDLE)
1020 ACE_OS::closesocket (this->handle_);
1021 this->handle_ = ACE_INVALID_HANDLE;
1023 return 0;
1026 if (this->handle_ == ACE_INVALID_HANDLE)
1027 return 0;
1029 ACE_Asynch_Pseudo_Task & task =
1030 this->posix_proactor ()->get_asynch_pseudo_task ();
1032 task.remove_io_handler (this->get_handle ());
1033 if (this->handle_ != ACE_INVALID_HANDLE)
1035 ACE_OS::closesocket (this->handle_);
1036 this->handle_ = ACE_INVALID_HANDLE;
1039 this->flg_open_ = false;
1041 return 0;
1045 ACE_POSIX_Asynch_Accept::handle_close (ACE_HANDLE, ACE_Reactor_Mask)
1047 ACE_TRACE ("ACE_POSIX_Asynch_Accept::handle_close");
1049 ACE_MT (ACE_GUARD_RETURN (ACE_SYNCH_MUTEX, ace_mon, this->lock_, 0));
1051 // handle_close is called in two cases:
1052 // 1. Pseudo task is closing (i.e. proactor destructor)
1053 // 2. The listen handle is closed (we don't have exclusive access to this)
1055 this->cancel_uncompleted (0);
1057 this->flg_open_ = false;
1058 this->handle_ = ACE_INVALID_HANDLE;
1059 return 0;
1063 ACE_POSIX_Asynch_Accept::handle_input (ACE_HANDLE /* fd */)
1065 ACE_TRACE ("ACE_POSIX_Asynch_Accept::handle_input");
1067 // An <accept> has been sensed on the <listen_handle>. We should be
1068 // able to just go ahead and do the <accept> now on this <fd>. This
1069 // should be the same as the <listen_handle>.
1071 ACE_POSIX_Asynch_Accept_Result* result = 0;
1074 ACE_MT (ACE_GUARD_RETURN (ACE_SYNCH_MUTEX, ace_mon, this->lock_, 0));
1076 // Deregister this info pertaining to this accept call.
1077 if (this->result_queue_.dequeue_head (result) != 0)
1078 ACELIB_ERROR ((LM_ERROR,
1079 ACE_TEXT("%N:%l:(%P | %t):%p\n"),
1080 ACE_TEXT("ACE_POSIX_Asynch_Accept::handle_input:")
1081 ACE_TEXT( " dequeueing failed")));
1083 // Disable the handle in the reactor if no more accepts are pending.
1084 if (this->result_queue_.size () == 0)
1086 ACE_Asynch_Pseudo_Task & task =
1087 this->posix_proactor ()->get_asynch_pseudo_task ();
1089 task.suspend_io_handler (this->get_handle());
1093 // Issue <accept> now.
1094 // @@ We shouldnt block here since we have already done poll/select
1095 // thru reactor. But are we sure?
1097 ACE_HANDLE new_handle = ACE_OS::accept (this->handle_, 0, 0);
1099 if (result == 0) // there is nobody to notify
1101 ACE_OS::closesocket (new_handle);
1102 return 0;
1105 if (new_handle == ACE_INVALID_HANDLE)
1107 result->set_error (errno);
1108 ACELIB_ERROR ((LM_ERROR,
1109 ACE_TEXT("%N:%l:(%P | %t):%p\n"),
1110 ACE_TEXT("ACE_POSIX_Asynch_Accept::handle_input: ")
1111 ACE_TEXT("accept")));
1113 // Notify client as usual, "AIO" finished with errors
1116 // Store the new handle.
1117 result->aio_fildes = new_handle;
1119 // Notify the main process about this completion
1120 // Send the Result through the notification pipe.
1121 if (this->posix_proactor ()->post_completion (result) == -1)
1122 ACELIB_ERROR ((LM_ERROR,
1123 ACE_TEXT("Error:(%P | %t):%p\n"),
1124 ACE_TEXT("ACE_POSIX_Asynch_Accept::handle_input: ")
1125 ACE_TEXT(" <post_completion> failed")));
1127 return 0;
1130 // *********************************************************************
1132 ACE_HANDLE
1133 ACE_POSIX_Asynch_Connect_Result::connect_handle () const
1135 return this->aio_fildes;
1138 void ACE_POSIX_Asynch_Connect_Result::connect_handle (ACE_HANDLE handle)
1140 this->aio_fildes = handle;
1144 ACE_POSIX_Asynch_Connect_Result::ACE_POSIX_Asynch_Connect_Result
1145 (const ACE_Handler::Proxy_Ptr &handler_proxy,
1146 ACE_HANDLE connect_handle,
1147 const void* act,
1148 ACE_HANDLE event,
1149 int priority,
1150 int signal_number)
1151 : ACE_POSIX_Asynch_Result
1152 (handler_proxy, act, event, 0, 0, priority, signal_number)
1154 this->aio_fildes = connect_handle;
1155 this->aio_nbytes = 0;
1158 void
1159 ACE_POSIX_Asynch_Connect_Result::complete (size_t bytes_transferred,
1160 int success,
1161 const void *completion_key,
1162 u_long error)
1164 // Copy the data.
1165 this->bytes_transferred_ = bytes_transferred;
1166 this->success_ = success;
1167 this->completion_key_ = completion_key;
1168 this->error_ = error;
1170 // Create the interface result class.
1171 ACE_Asynch_Connect::Result result (this);
1173 // Call the application handler.
1174 ACE_Handler *handler = this->handler_proxy_.get ()->handler ();
1175 if (handler != 0)
1176 handler->handle_connect (result);
1179 ACE_POSIX_Asynch_Connect_Result::~ACE_POSIX_Asynch_Connect_Result ()
1183 // *********************************************************************
1185 ACE_POSIX_Asynch_Connect::ACE_POSIX_Asynch_Connect (ACE_POSIX_Proactor * posix_proactor)
1186 : ACE_POSIX_Asynch_Operation (posix_proactor),
1187 flg_open_ (false)
1191 ACE_POSIX_Asynch_Connect::~ACE_POSIX_Asynch_Connect ()
1193 this->close ();
1194 this->reactor(0); // to avoid purge_pending_notifications
1197 ACE_HANDLE
1198 ACE_POSIX_Asynch_Connect::get_handle () const
1200 ACE_ASSERT (0);
1201 return ACE_INVALID_HANDLE;
1204 void
1205 ACE_POSIX_Asynch_Connect::set_handle (ACE_HANDLE)
1207 ACE_ASSERT (0) ;
1211 ACE_POSIX_Asynch_Connect::open (const ACE_Handler::Proxy_Ptr &handler_proxy,
1212 ACE_HANDLE handle,
1213 const void *completion_key,
1214 ACE_Proactor *proactor)
1216 ACE_TRACE ("ACE_POSIX_Asynch_Connect::open");
1218 if (this->flg_open_)
1219 return -1;
1221 //int result =
1222 ACE_POSIX_Asynch_Operation::open (handler_proxy,
1223 handle,
1224 completion_key,
1225 proactor);
1227 // Ignore result as we pass ACE_INVALID_HANDLE
1228 //if (result == -1)
1229 // return result;
1231 this->flg_open_ = true;
1233 return 0;
1237 ACE_POSIX_Asynch_Connect::connect (ACE_HANDLE connect_handle,
1238 const ACE_Addr & remote_sap,
1239 const ACE_Addr & local_sap,
1240 int reuse_addr,
1241 const void *act,
1242 int priority,
1243 int signal_number)
1245 ACE_TRACE ("ACE_POSIX_Asynch_Connect::connect");
1247 if (this->flg_open_ == 0)
1248 ACELIB_ERROR_RETURN ((LM_ERROR,
1249 ACE_TEXT("%N:%l:ACE_POSIX_Asynch_Connect::connect")
1250 ACE_TEXT("connector was not opened before\n")),
1251 -1);
1253 // Common code for both WIN and POSIX.
1254 // Create future Asynch_Connect_Result
1255 ACE_POSIX_Asynch_Connect_Result *result = 0;
1256 ACE_NEW_RETURN (result,
1257 ACE_POSIX_Asynch_Connect_Result (this->handler_proxy_,
1258 connect_handle,
1259 act,
1260 this->posix_proactor ()->get_handle (),
1261 priority,
1262 signal_number),
1263 -1);
1265 int rc = connect_i (result,
1266 remote_sap,
1267 local_sap,
1268 reuse_addr);
1270 // update handle
1271 connect_handle = result->connect_handle ();
1273 if (rc != 0)
1274 return post_result (result, true);
1276 // Enqueue result we will wait for completion
1278 ACE_MT (ACE_GUARD_RETURN (ACE_SYNCH_MUTEX, ace_mon, this->lock_, -1));
1280 if (this->result_map_.bind (connect_handle, result) == -1)
1282 ACELIB_ERROR ((LM_ERROR,
1283 ACE_TEXT ("%N:%l:%p\n"),
1284 ACE_TEXT ("ACE_POSIX_Asynch_Connect::connect:")
1285 ACE_TEXT ("bind")));
1287 result->set_error (EFAULT);
1288 return post_result (result, true);
1292 ACE_Asynch_Pseudo_Task & task =
1293 this->posix_proactor ()->get_asynch_pseudo_task ();
1295 rc = task.register_io_handler (connect_handle,
1296 this,
1297 ACE_Event_Handler::CONNECT_MASK,
1298 0); // don't suspend after register
1299 if (rc < 0)
1302 ACE_MT (ACE_GUARD_RETURN (ACE_SYNCH_MUTEX, ace_mon, this->lock_, -1));
1304 this->result_map_.unbind (connect_handle, result);
1306 if (result != 0)
1308 result->set_error (EFAULT);
1309 this->post_result (result, true);
1311 return -1;
1313 else
1314 result = 0;
1317 return 0;
1320 int ACE_POSIX_Asynch_Connect::post_result (ACE_POSIX_Asynch_Connect_Result * result,
1321 bool post_enable)
1323 if (this->flg_open_ && post_enable != 0)
1325 if (this->posix_proactor ()->post_completion (result) == 0)
1326 return 0;
1328 ACELIB_ERROR ((LM_ERROR,
1329 ACE_TEXT("Error:(%P | %t):%p\n"),
1330 ACE_TEXT("ACE_POSIX_Asynch_Connect::post_result: ")
1331 ACE_TEXT(" <post_completion> failed")));
1334 ACE_HANDLE handle = result->connect_handle ();
1336 if (handle != ACE_INVALID_HANDLE)
1337 ACE_OS::closesocket (handle);
1339 delete result;
1341 return -1;
1344 //connect_i
1345 // return code :
1346 // -1 errors before attempt to connect
1347 // 0 connect started
1348 // 1 connect finished ( may be unsuccessfully)
1351 ACE_POSIX_Asynch_Connect::connect_i (ACE_POSIX_Asynch_Connect_Result *result,
1352 const ACE_Addr & remote_sap,
1353 const ACE_Addr & local_sap,
1354 int reuse_addr)
1356 result->set_bytes_transferred (0);
1358 ACE_HANDLE handle = result->connect_handle ();
1360 if (handle == ACE_INVALID_HANDLE)
1362 int protocol_family = remote_sap.get_type ();
1364 handle = ACE_OS::socket (protocol_family,
1365 SOCK_STREAM,
1367 // save it
1368 result->connect_handle (handle);
1369 if (handle == ACE_INVALID_HANDLE)
1371 result->set_error (errno);
1372 ACELIB_ERROR_RETURN
1373 ((LM_ERROR,
1374 ACE_TEXT("ACE_POSIX_Asynch_Connect::connect_i: %p\n"),
1375 ACE_TEXT("socket")),
1376 -1);
1379 // Reuse the address
1380 int one = 1;
1381 if (protocol_family != PF_UNIX &&
1382 reuse_addr != 0 &&
1383 ACE_OS::setsockopt (handle,
1384 SOL_SOCKET,
1385 SO_REUSEADDR,
1386 (const char*) &one,
1387 sizeof one) == -1 )
1389 result->set_error (errno);
1390 ACELIB_ERROR_RETURN
1391 ((LM_ERROR,
1392 ACE_TEXT("ACE_POSIX_Asynch_Connect::connect_i: %p\n"),
1393 ACE_TEXT("setsockopt")),
1394 -1);
1398 if (local_sap != ACE_Addr::sap_any)
1400 sockaddr * laddr = reinterpret_cast<sockaddr *> (local_sap.get_addr ());
1401 size_t size = local_sap.get_size ();
1403 if (ACE_OS::bind (handle, laddr, size) == -1)
1405 result->set_error (errno);
1406 ACELIB_ERROR_RETURN
1407 ((LM_ERROR,
1408 ACE_TEXT("ACE_POSIX_Asynch_Connect::connect_i: %p\n"),
1409 ACE_TEXT("bind")),
1410 -1);
1414 // set non blocking mode
1415 if (ACE::set_flags (handle, ACE_NONBLOCK) != 0)
1417 result->set_error (errno);
1418 ACELIB_ERROR_RETURN
1419 ((LM_ERROR,
1420 ACE_TEXT("ACE_POSIX_Asynch_Connect::connect_i: %p\n")
1421 ACE_TEXT("set_flags")),
1422 -1);
1425 for (;;)
1427 int rc = ACE_OS::connect
1428 (handle,
1429 reinterpret_cast<sockaddr *> (remote_sap.get_addr ()),
1430 remote_sap.get_size ());
1431 if (rc < 0) // failure
1433 if (errno == EWOULDBLOCK || errno == EINPROGRESS)
1434 return 0; // connect started
1436 if (errno == EINTR)
1437 continue;
1439 result->set_error (errno);
1442 return 1 ; // connect finished
1445 ACE_NOTREACHED (return 0);
1449 //@@ New method cancel_uncompleted
1450 // It performs cancellation of all pending requests
1452 // Parameter flg_notify can be
1453 // 0 - don't send notifications about canceled accepts
1454 // !0 - notify user about canceled accepts
1455 // according POSIX standards we should receive notifications
1456 // on canceled AIO requests
1458 // Return value : number of cancelled requests
1462 ACE_POSIX_Asynch_Connect::cancel_uncompleted (bool flg_notify,
1463 ACE_Handle_Set & set)
1465 ACE_TRACE ("ACE_POSIX_Asynch_Connect::cancel_uncompleted");
1467 int retval = 0;
1469 MAP_MANAGER::ITERATOR iter (result_map_);
1470 MAP_MANAGER::ENTRY * me = 0;
1472 set.reset ();
1474 for (; iter.next (me) != 0; retval++ , iter.advance ())
1476 ACE_HANDLE handle = me->ext_id_;
1477 ACE_POSIX_Asynch_Connect_Result* result = me->int_id_ ;
1479 set.set_bit (handle);
1481 result->set_bytes_transferred (0);
1482 result->set_error (ECANCELED);
1483 this->post_result (result, flg_notify);
1486 result_map_.unbind_all ();
1488 return retval;
1492 ACE_POSIX_Asynch_Connect::cancel ()
1494 ACE_TRACE ("ACE_POSIX_Asynch_Connect::cancel");
1496 // Since this is not a real asynch I/O operation, we can't just call
1497 // ::aiocancel () or ACE_POSIX_Asynch_Operation::cancel ().
1498 // Delegate real cancelation to cancel_uncompleted (1)
1500 int rc = -1 ; // ERRORS
1502 ACE_Handle_Set set;
1503 int num_cancelled = 0;
1505 ACE_MT (ACE_GUARD_RETURN (ACE_SYNCH_MUTEX, ace_mon, this->lock_, -1));
1506 num_cancelled = cancel_uncompleted (flg_open_, set);
1508 if (num_cancelled == 0)
1509 rc = 1 ; // AIO_ALLDONE
1510 else if (num_cancelled > 0)
1511 rc = 0 ; // AIO_CANCELED
1513 if (!this->flg_open_)
1514 return rc ;
1516 ACE_Asynch_Pseudo_Task & task =
1517 this->posix_proactor ()->get_asynch_pseudo_task ();
1519 task.remove_io_handler (set);
1520 return rc;
1524 ACE_POSIX_Asynch_Connect::close ()
1526 ACE_TRACE ("ACE_POSIX_Asynch_Connect::close");
1528 ACE_Handle_Set set ;
1529 int num_cancelled = 0;
1531 ACE_MT (ACE_GUARD_RETURN (ACE_SYNCH_MUTEX, ace_mon, this->lock_, -1));
1532 num_cancelled = cancel_uncompleted (flg_open_, set);
1535 if (num_cancelled == 0 || !this->flg_open_)
1537 this->flg_open_ = false;
1538 return 0;
1541 ACE_Asynch_Pseudo_Task & task =
1542 this->posix_proactor ()->get_asynch_pseudo_task ();
1544 task.remove_io_handler (set);
1545 this->flg_open_ = false;
1547 return 0;
1551 ACE_POSIX_Asynch_Connect::handle_output (ACE_HANDLE fd)
1553 ACE_TRACE ("ACE_POSIX_Asynch_Connect::handle_output");
1555 ACE_POSIX_Asynch_Connect_Result* result = 0;
1558 ACE_MT (ACE_GUARD_RETURN (ACE_SYNCH_MUTEX, ace_mon, this->lock_, 0));
1559 if (this->result_map_.unbind (fd, result) != 0) // not found
1560 return -1;
1563 int sockerror = 0 ;
1564 int lsockerror = sizeof sockerror;
1566 ACE_OS::getsockopt (fd,
1567 SOL_SOCKET,
1568 SO_ERROR,
1569 (char*) &sockerror,
1570 &lsockerror);
1572 result->set_bytes_transferred (0);
1573 result->set_error (sockerror);
1575 // This previously just did a "return -1" and let handle_close() clean
1576 // things up. However, this entire object may be gone as a result of
1577 // the application's completion handler, so don't count on 'this' being
1578 // legitimate on return from post_result().
1579 // remove_io_handler() contains flag DONT_CALL
1580 this->posix_proactor ()->get_asynch_pseudo_task ().remove_io_handler (fd);
1581 this->post_result (result, this->flg_open_);
1582 return 0;
1587 ACE_POSIX_Asynch_Connect::handle_close (ACE_HANDLE fd, ACE_Reactor_Mask)
1589 ACE_TRACE ("ACE_POSIX_Asynch_Connect::handle_close");
1591 ACE_Asynch_Pseudo_Task &task =
1592 this->posix_proactor ()->get_asynch_pseudo_task ();
1594 task.remove_io_handler (fd);
1596 ACE_POSIX_Asynch_Connect_Result* result = 0;
1599 ACE_MT (ACE_GUARD_RETURN (ACE_SYNCH_MUTEX, ace_mon, this->lock_, 0));
1600 if (this->result_map_.unbind (fd, result) != 0) // not found
1601 return -1;
1604 result->set_bytes_transferred (0);
1605 result->set_error (ECANCELED);
1606 this->post_result (result, this->flg_open_);
1608 return 0;
1611 // *********************************************************************
1613 ACE_HANDLE
1614 ACE_POSIX_Asynch_Transmit_File_Result::socket () const
1616 return this->socket_;
1619 ACE_HANDLE
1620 ACE_POSIX_Asynch_Transmit_File_Result::file () const
1622 return this->aio_fildes;
1625 ACE_Asynch_Transmit_File::Header_And_Trailer *
1626 ACE_POSIX_Asynch_Transmit_File_Result::header_and_trailer () const
1628 return this->header_and_trailer_;
1631 size_t
1632 ACE_POSIX_Asynch_Transmit_File_Result::bytes_to_write () const
1634 return this->aio_nbytes;
1637 size_t
1638 ACE_POSIX_Asynch_Transmit_File_Result::bytes_per_send () const
1640 return this->bytes_per_send_;
1643 u_long
1644 ACE_POSIX_Asynch_Transmit_File_Result::flags () const
1646 return this->flags_;
1649 ACE_POSIX_Asynch_Transmit_File_Result::ACE_POSIX_Asynch_Transmit_File_Result
1650 (const ACE_Handler::Proxy_Ptr &handler_proxy,
1651 ACE_HANDLE socket,
1652 ACE_HANDLE file,
1653 ACE_Asynch_Transmit_File::Header_And_Trailer *header_and_trailer,
1654 size_t bytes_to_write,
1655 u_long offset,
1656 u_long offset_high,
1657 size_t bytes_per_send,
1658 u_long flags,
1659 const void *act,
1660 ACE_HANDLE event,
1661 int priority,
1662 int signal_number)
1663 : ACE_POSIX_Asynch_Result
1664 (handler_proxy, act, event, offset, offset_high, priority, signal_number),
1665 socket_ (socket),
1666 header_and_trailer_ (header_and_trailer),
1667 bytes_per_send_ (bytes_per_send),
1668 flags_ (flags)
1670 this->aio_fildes = file;
1671 this->aio_nbytes = bytes_to_write;
1674 void
1675 ACE_POSIX_Asynch_Transmit_File_Result::complete (size_t bytes_transferred,
1676 int success,
1677 const void *completion_key,
1678 u_long error)
1680 // Copy the data.
1681 this->bytes_transferred_ = bytes_transferred;
1682 this->success_ = success;
1683 this->completion_key_ = completion_key;
1684 this->error_ = error;
1686 // We will not do this because (a) the header and trailer blocks may
1687 // be the same message_blocks and (b) in cases of failures we have
1688 // no idea how much of what (header, data, trailer) was sent.
1690 if (this->success_ && this->header_and_trailer_ != 0)
1692 ACE_Message_Block *header = this->header_and_trailer_->header ();
1693 if (header != 0)
1694 header->rd_ptr (this->header_and_trailer_->header_bytes ());
1696 ACE_Message_Block *trailer = this->header_and_trailer_->trailer ();
1697 if (trailer != 0)
1698 trailer->rd_ptr (this->header_and_trailer_->trailer_bytes ());
1702 // Create the interface result class.
1703 ACE_Asynch_Transmit_File::Result result (this);
1705 // Call the application handler.
1706 ACE_Handler *handler = this->handler_proxy_.get ()->handler ();
1707 if (handler != 0)
1708 handler->handle_transmit_file (result);
1711 ACE_POSIX_Asynch_Transmit_File_Result::~ACE_POSIX_Asynch_Transmit_File_Result ()
1716 // *********************************************************************
1719 * @class ACE_POSIX_Asynch_Transmit_Handler
1721 * @brief Auxillary handler for doing <Asynch_Transmit_File> in
1722 * Unix. <ACE_POSIX_Asynch_Transmit_File> internally uses this.
1724 * This is a helper class for implementing
1725 * <ACE_POSIX_Asynch_Transmit_File> in Unix systems.
1727 class ACE_Export ACE_POSIX_Asynch_Transmit_Handler : public ACE_Handler
1729 public:
1730 /// Constructor. Result pointer will have all the information to do
1731 /// the file transmission (socket, file, application handler, bytes
1732 /// to write).
1733 ACE_POSIX_Asynch_Transmit_Handler (ACE_POSIX_Proactor *posix_proactor,
1734 ACE_POSIX_Asynch_Transmit_File_Result *result);
1736 /// Destructor.
1737 ~ACE_POSIX_Asynch_Transmit_Handler () override;
1739 /// Do the transmission. All the info to do the transmission is in
1740 /// the <result> member.
1741 int transmit ();
1743 protected:
1744 /// The asynch result pointer made from the initial transmit file
1745 /// request.
1746 ACE_POSIX_Asynch_Transmit_File_Result *result_;
1748 /// Message bloack used to do the transmission.
1749 ACE_Message_Block *mb_;
1751 enum ACT
1753 HEADER_ACT = 1,
1754 DATA_ACT = 2,
1755 TRAILER_ACT = 3
1758 /// ACT to transmit header.
1759 ACT header_act_;
1761 /// ACT to transmit data.
1762 ACT data_act_;
1764 /// ACT to transmit trailer.
1765 ACT trailer_act_;
1767 /// Current offset of the file being transmitted.
1768 size_t file_offset_;
1770 /// Total size of the file.
1771 size_t file_size_;
1773 /// Number of bytes transferred on the stream.
1774 size_t bytes_transferred_;
1776 /// This is called when asynchronous writes from the socket complete.
1777 void handle_write_stream (const ACE_Asynch_Write_Stream::Result &result) override;
1779 /// This is called when asynchronous reads from the file complete.
1780 void handle_read_file (const ACE_Asynch_Read_File::Result &result) override;
1782 /// Issue asynch read from the file.
1783 int initiate_read_file ();
1785 /// To read from the file to be transmitted.
1786 ACE_POSIX_Asynch_Read_File rf_;
1788 /// Write stream to write the header, trailer and the data.
1789 ACE_POSIX_Asynch_Write_Stream ws_;
1792 // ************************************************************
1794 // Constructor.
1795 ACE_POSIX_Asynch_Transmit_Handler::ACE_POSIX_Asynch_Transmit_Handler
1796 (ACE_POSIX_Proactor *posix_proactor,
1797 ACE_POSIX_Asynch_Transmit_File_Result *result)
1798 : result_ (result),
1799 mb_ (0),
1800 header_act_ (this->HEADER_ACT),
1801 data_act_ (this->DATA_ACT),
1802 trailer_act_ (this->TRAILER_ACT),
1803 file_offset_ (result->offset ()),
1804 file_size_ (0),
1805 bytes_transferred_ (0),
1806 rf_ (posix_proactor),
1807 ws_ (posix_proactor)
1809 // Allocate memory for the message block.
1810 ACE_NEW (this->mb_,
1811 ACE_Message_Block (this->result_->bytes_per_send ()
1812 + 1));
1813 // Init the file size.
1814 file_size_ = ACE_OS::filesize (this->result_->file ());
1817 // Destructor.
1818 ACE_POSIX_Asynch_Transmit_Handler::~ACE_POSIX_Asynch_Transmit_Handler ()
1820 delete result_;
1821 mb_->release ();
1825 // Do the transmission.
1826 // Initiate transmitting the header. When that completes
1827 // handle_write_stream will be called, there start transmitting the file.
1829 ACE_POSIX_Asynch_Transmit_Handler::transmit ()
1831 // No proactor is given for the <open>'s. Because we are using the
1832 // concrete implementations of the Asynch_Operations, and we have
1833 // already given them the specific proactor, so they wont need the
1834 // general <proactor> interface pointer.
1836 // Open Asynch_Read_File.
1837 if (this->rf_.open (this->proxy (),
1838 this->result_->file (),
1840 0) == -1)
1841 ACELIB_ERROR_RETURN ((LM_ERROR,
1842 "ACE_Asynch_Transmit_Handler:read_file open failed\n"),
1843 -1);
1845 // Open Asynch_Write_Stream.
1846 if (this->ws_.open (this->proxy (),
1847 this->result_->socket (),
1849 0) == -1)
1850 ACELIB_ERROR_RETURN ((LM_ERROR,
1851 "ACE_Asynch_Transmit_Handler:write_stream open failed\n"),
1852 -1);
1854 // Transmit the header.
1855 if (this->ws_.write (*this->result_->header_and_trailer ()->header (),
1856 this->result_->header_and_trailer ()->header_bytes (),
1857 reinterpret_cast<void *> (&this->header_act_),
1858 0) == -1)
1859 ACELIB_ERROR_RETURN ((LM_ERROR,
1860 "Asynch_Transmit_Handler:transmitting header:write_stream failed\n"),
1861 -1);
1862 return 0;
1865 void
1866 ACE_POSIX_Asynch_Transmit_Handler::handle_write_stream (const ACE_Asynch_Write_Stream::Result &result)
1868 // Update bytes transferred so far.
1869 this->bytes_transferred_ += result.bytes_transferred ();
1871 // Check the success parameter.
1872 if (result.success () == 0)
1874 // Failure.
1876 ACELIB_ERROR ((LM_ERROR,
1877 "Asynch_Transmit_File failed.\n"));
1879 ACE_SEH_TRY
1881 this->result_->complete (this->bytes_transferred_,
1882 0, // Failure.
1883 0, // @@ Completion key.
1884 0); // @@ Error no.
1886 ACE_SEH_FINALLY
1888 // This is crucial to prevent memory leaks. This deletes
1889 // the result pointer also.
1890 delete this;
1894 // Write stream successful.
1896 // Partial write to socket.
1897 size_t unsent_data = result.bytes_to_write () - result.bytes_transferred ();
1898 if (unsent_data != 0)
1900 ACELIB_DEBUG ((LM_DEBUG,
1901 "%N:%l:Partial write to socket: Asynch_write called again\n"));
1903 // Duplicate the message block and retry remaining data
1904 if (this->ws_.write (*result.message_block ().duplicate (),
1905 unsent_data,
1906 result.act (),
1907 this->result_->priority (),
1908 this->result_->signal_number ()) == -1)
1910 // @@ Handle this error.
1911 ACELIB_ERROR ((LM_ERROR,
1912 "Asynch_Transmit_Handler:write_stream failed\n"));
1913 return;
1916 // @@ Handling *partial write* to a socket. Let us not continue
1917 // further before this write finishes. Because proceeding with
1918 // another read and then write might change the order of the
1919 // file transmission, because partial write to the stream is
1920 // always possible.
1921 return;
1924 // Not a partial write. A full write.
1926 // Check ACT to see what was sent.
1927 ACT act = * (ACT *) result.act ();
1929 switch (act)
1931 case TRAILER_ACT:
1932 // If it is the "trailer" that is just sent, then transmit file
1933 // is complete.
1934 // Call the application handler.
1935 ACE_SEH_TRY
1937 this->result_->complete (this->bytes_transferred_,
1938 1, // @@ Success.
1939 0, // @@ Completion key.
1940 0); // @@ Errno.
1942 ACE_SEH_FINALLY
1944 delete this;
1946 break;
1948 case HEADER_ACT:
1949 case DATA_ACT:
1950 // If header/data was sent, initiate the file data transmission.
1951 if (this->initiate_read_file () == -1)
1952 // @@ Handle this error.
1953 ACELIB_ERROR ((LM_ERROR,
1954 "Error:Asynch_Transmit_Handler:read_file couldnt be initiated\n"));
1955 break;
1957 default:
1958 // @@ Handle this error.
1959 ACELIB_ERROR ((LM_ERROR,
1960 "Error:ACE_Asynch_Transmit_Handler::handle_write_stream::Unexpected act\n"));
1964 void
1965 ACE_POSIX_Asynch_Transmit_Handler::handle_read_file (const ACE_Asynch_Read_File::Result &result)
1967 // Failure.
1968 if (result.success () == 0)
1970 ACE_SEH_TRY
1972 this->result_->complete (this->bytes_transferred_,
1973 0, // Failure.
1974 0, // @@ Completion key.
1975 errno); // Error no.
1977 ACE_SEH_FINALLY
1979 delete this;
1981 return;
1984 // Read successful.
1985 if (result.bytes_transferred () == 0)
1986 return;
1988 // Increment offset.
1989 this->file_offset_ += result.bytes_transferred ();
1991 // Write data to network.
1992 if (this->ws_.write (result.message_block (),
1993 result.bytes_transferred (),
1994 (void *)&this->data_act_,
1995 this->result_->priority (),
1996 this->result_->signal_number ()) == -1)
1998 // @@ Handle this error.
1999 ACELIB_ERROR ((LM_ERROR,
2000 "Error:ACE_Asynch_Transmit_File : write to the stream failed\n"));
2001 return;
2006 ACE_POSIX_Asynch_Transmit_Handler::initiate_read_file ()
2008 // Is there something to read.
2009 if (this->file_offset_ >= this->file_size_)
2011 // File is sent. Send the trailer.
2012 if (this->ws_.write (*this->result_->header_and_trailer ()->trailer (),
2013 this->result_->header_and_trailer ()->trailer_bytes (),
2014 (void *)&this->trailer_act_,
2015 this->result_->priority (),
2016 this->result_->signal_number ()) == -1)
2017 ACELIB_ERROR_RETURN ((LM_ERROR,
2018 "Error:Asynch_Transmit_Handler:write_stream writing trailer failed\n"),
2019 -1);
2020 return 0;
2022 else
2024 // @@ Is this right??
2025 // Previous reads and writes are over. For the new read, adjust
2026 // the wr_ptr and the rd_ptr to the beginning.
2027 this->mb_->rd_ptr (this->mb_->base ());
2028 this->mb_->wr_ptr (this->mb_->base ());
2030 // Inititiate an asynchronous read from the file.
2031 if (this->rf_.read (*this->mb_,
2032 this->mb_->size () - 1,
2033 this->file_offset_,
2034 0, // @@ offset_high !!! if aiocb64 is used.
2035 0, // Act
2036 this->result_->priority (),
2037 this->result_->signal_number ()) == -1)
2038 ACELIB_ERROR_RETURN ((LM_ERROR,
2039 "Error:Asynch_Transmit_Handler::read from file failed\n"),
2040 -1);
2041 return 0;
2045 // *********************************************************************
2047 ACE_POSIX_Asynch_Transmit_File::ACE_POSIX_Asynch_Transmit_File (ACE_POSIX_Proactor *posix_proactor)
2048 : ACE_POSIX_Asynch_Operation (posix_proactor)
2053 ACE_POSIX_Asynch_Transmit_File::transmit_file (ACE_HANDLE file,
2054 ACE_Asynch_Transmit_File::Header_And_Trailer *header_and_trailer,
2055 size_t bytes_to_write,
2056 u_long offset,
2057 u_long offset_high,
2058 size_t bytes_per_send,
2059 u_long flags,
2060 const void *act,
2061 int priority,
2062 int signal_number)
2064 // Adjust these parameters if there are default values specified.
2065 ssize_t file_size = ACE_OS::filesize (file);
2067 if (file_size == -1)
2068 ACELIB_ERROR_RETURN ((LM_ERROR,
2069 ACE_TEXT("Error:%N:%l:%p\n"),
2070 ACE_TEXT("POSIX_Asynch_Transmit_File:filesize failed")),
2071 -1);
2073 if (bytes_to_write == 0)
2074 bytes_to_write = file_size;
2076 if (offset > (size_t) file_size)
2077 ACELIB_ERROR_RETURN ((LM_ERROR,
2078 ACE_TEXT("Error:%p\n"),
2079 ACE_TEXT("Asynch_Transmit_File:File size is less than offset")),
2080 -1);
2082 if (offset != 0)
2083 bytes_to_write = file_size - offset + 1;
2085 if (bytes_per_send == 0)
2086 bytes_per_send = bytes_to_write;
2088 // Configure the result parameter.
2089 ACE_POSIX_Asynch_Transmit_File_Result *result = 0;
2091 ACE_NEW_RETURN (result,
2092 ACE_POSIX_Asynch_Transmit_File_Result (this->handler_proxy_,
2093 this->handle_,
2094 file,
2095 header_and_trailer,
2096 bytes_to_write,
2097 offset,
2098 offset_high,
2099 bytes_per_send,
2100 flags,
2101 act,
2102 this->posix_proactor ()->get_handle (),
2103 priority,
2104 signal_number),
2105 -1);
2107 // Make the auxillary handler and initiate transmit.
2108 ACE_POSIX_Asynch_Transmit_Handler *transmit_handler = 0;
2110 ACE_NEW_RETURN (transmit_handler,
2111 ACE_POSIX_Asynch_Transmit_Handler (this->posix_proactor (),
2112 result),
2113 -1);
2115 ssize_t return_val = transmit_handler->transmit ();
2117 if (return_val == -1)
2118 // This deletes the <result> in it too.
2119 delete transmit_handler;
2121 return 0;
2124 ACE_POSIX_Asynch_Transmit_File::~ACE_POSIX_Asynch_Transmit_File ()
2128 // *********************************************************************
2129 size_t
2130 ACE_POSIX_Asynch_Read_Dgram_Result::bytes_to_read () const
2132 return this->bytes_to_read_;
2136 ACE_POSIX_Asynch_Read_Dgram_Result::remote_address (ACE_Addr& addr) const
2138 int retVal = -1; // failure
2140 // make sure the addresses are of the same type
2141 if (addr.get_type () == this->remote_address_->get_type ())
2142 { // copy the remote_address_ into addr
2143 addr.set_addr (this->remote_address_->get_addr (),
2144 this->remote_address_->get_size ());
2145 retVal = 0; // success
2148 return retVal;
2151 sockaddr *
2152 ACE_POSIX_Asynch_Read_Dgram_Result::saddr () const
2154 return (sockaddr *) this->remote_address_->get_addr ();
2159 ACE_POSIX_Asynch_Read_Dgram_Result::flags () const
2161 return this->flags_;
2164 ACE_HANDLE
2165 ACE_POSIX_Asynch_Read_Dgram_Result::handle () const
2167 return this->handle_;
2170 ACE_Message_Block*
2171 ACE_POSIX_Asynch_Read_Dgram_Result::message_block () const
2173 return this->message_block_;
2176 ACE_POSIX_Asynch_Read_Dgram_Result::ACE_POSIX_Asynch_Read_Dgram_Result
2177 (const ACE_Handler::Proxy_Ptr &handler_proxy,
2178 ACE_HANDLE handle,
2179 ACE_Message_Block *message_block,
2180 size_t bytes_to_read,
2181 int flags,
2182 int protocol_family,
2183 const void* act,
2184 ACE_HANDLE event,
2185 int priority,
2186 int signal_number)
2187 : ACE_POSIX_Asynch_Result
2188 (handler_proxy, act, event, 0, 0, priority, signal_number),
2189 bytes_to_read_ (bytes_to_read),
2190 message_block_ (message_block),
2191 remote_address_ (0),
2192 addr_len_ (0),
2193 flags_ (flags),
2194 handle_ (handle)
2196 ACE_UNUSED_ARG (protocol_family);
2197 this->aio_fildes = handle;
2198 this->aio_buf = message_block->wr_ptr ();
2199 this->aio_nbytes = bytes_to_read;
2200 ACE_NEW (this->remote_address_, ACE_INET_Addr);
2203 void
2204 ACE_POSIX_Asynch_Read_Dgram_Result::complete (size_t bytes_transferred,
2205 int success,
2206 const void *completion_key,
2207 u_long error)
2209 // Copy the data which was returned by GetQueuedCompletionStatus
2210 this->bytes_transferred_ = bytes_transferred;
2211 this->success_ = success;
2212 this->completion_key_ = completion_key;
2213 this->error_ = error;
2215 // Appropriately move the pointers in the message block.
2216 this->message_block_->wr_ptr (bytes_transferred);
2218 // <errno> is available in the aiocb.
2219 ACE_UNUSED_ARG (error);
2221 this->remote_address_->set_size(this->addr_len_);
2223 // Create the interface result class.
2224 ACE_Asynch_Read_Dgram::Result result (this);
2226 // Call the application handler.
2227 ACE_Handler *handler = this->handler_proxy_.get ()->handler ();
2228 if (handler != 0)
2229 handler->handle_read_dgram (result);
2232 ACE_POSIX_Asynch_Read_Dgram_Result::~ACE_POSIX_Asynch_Read_Dgram_Result ()
2234 delete this->remote_address_;
2237 //***************************************************************************
2238 size_t
2239 ACE_POSIX_Asynch_Write_Dgram_Result::bytes_to_write () const
2241 return this->bytes_to_write_;
2245 ACE_POSIX_Asynch_Write_Dgram_Result::flags () const
2247 return this->flags_;
2250 ACE_HANDLE
2251 ACE_POSIX_Asynch_Write_Dgram_Result::handle () const
2253 return this->handle_;
2257 ACE_Message_Block*
2258 ACE_POSIX_Asynch_Write_Dgram_Result::message_block () const
2260 return this->message_block_;
2263 ACE_POSIX_Asynch_Write_Dgram_Result::ACE_POSIX_Asynch_Write_Dgram_Result
2264 (const ACE_Handler::Proxy_Ptr &handler_proxy,
2265 ACE_HANDLE handle,
2266 ACE_Message_Block *message_block,
2267 size_t bytes_to_write,
2268 int flags,
2269 const void* act,
2270 ACE_HANDLE event,
2271 int priority,
2272 int signal_number)
2273 : ACE_POSIX_Asynch_Result
2274 (handler_proxy, act, event, 0, 0, priority, signal_number),
2275 bytes_to_write_ (bytes_to_write),
2276 message_block_ (message_block),
2277 flags_ (flags),
2278 handle_ (handle)
2281 this->aio_fildes = handle;
2282 this->aio_buf = message_block->rd_ptr ();
2283 this->aio_nbytes = bytes_to_write;
2286 void
2287 ACE_POSIX_Asynch_Write_Dgram_Result::complete (size_t bytes_transferred,
2288 int success,
2289 const void *completion_key,
2290 u_long error)
2292 // Copy the data which was returned by GetQueuedCompletionStatus
2293 this->bytes_transferred_ = bytes_transferred;
2294 this->success_ = success;
2295 this->completion_key_ = completion_key;
2296 this->error_ = error;
2298 // <errno> is available in the aiocb.
2299 ACE_UNUSED_ARG (error);
2301 // Appropriately move the pointers in the message block.
2302 this->message_block_->rd_ptr (bytes_transferred);
2304 // Create the interface result class.
2305 ACE_Asynch_Write_Dgram::Result result (this);
2307 // Call the application handler.
2308 ACE_Handler *handler = this->handler_proxy_.get ()->handler ();
2309 if (handler != 0)
2310 handler->handle_write_dgram (result);
2313 ACE_POSIX_Asynch_Write_Dgram_Result::~ACE_POSIX_Asynch_Write_Dgram_Result ()
2317 /***************************************************************************/
2318 ACE_POSIX_Asynch_Read_Dgram::~ACE_POSIX_Asynch_Read_Dgram ()
2322 ssize_t
2323 ACE_POSIX_Asynch_Read_Dgram::recv (ACE_Message_Block *message_block,
2324 size_t & /*number_of_bytes_recvd*/,
2325 int flags,
2326 int protocol_family,
2327 const void *act,
2328 int priority,
2329 int signal_number)
2331 size_t space = message_block->space ();
2332 // Create the Asynch_Result.
2333 ACE_POSIX_Asynch_Read_Dgram_Result *result = 0;
2334 ACE_POSIX_Proactor *proactor = this->posix_proactor ();
2335 ACE_NEW_RETURN (result,
2336 ACE_POSIX_Asynch_Read_Dgram_Result (this->handler_proxy_,
2337 this->handle_,
2338 message_block,
2339 space,
2340 flags,
2341 protocol_family,
2342 act,
2343 proactor->get_handle (),
2344 priority,
2345 signal_number),
2346 -1);
2348 int return_val = proactor->start_aio (result, ACE_POSIX_Proactor::ACE_OPCODE_READ);
2349 if (return_val == -1)
2350 delete result;
2352 return return_val;
2355 ACE_POSIX_Asynch_Read_Dgram::ACE_POSIX_Asynch_Read_Dgram (ACE_POSIX_Proactor *posix_proactor)
2356 : ACE_POSIX_Asynch_Operation (posix_proactor)
2360 //***************************************************************************
2362 ACE_POSIX_Asynch_Write_Dgram::~ACE_POSIX_Asynch_Write_Dgram ()
2366 ssize_t
2367 ACE_POSIX_Asynch_Write_Dgram::send (ACE_Message_Block *message_block,
2368 size_t &/*number_of_bytes_sent*/,
2369 int flags,
2370 const ACE_Addr &/*addr*/,
2371 const void *act,
2372 int priority,
2373 int signal_number)
2375 size_t len = message_block->length ();
2376 if (len == 0)
2377 ACELIB_ERROR_RETURN
2378 ((LM_ERROR,
2379 ACE_TEXT ("ACE_POSIX_Asynch_Write_Stream::write:")
2380 ACE_TEXT ("Attempt to write 0 bytes\n")),
2381 -1);
2383 ACE_POSIX_Asynch_Write_Dgram_Result *result = 0;
2384 ACE_POSIX_Proactor *proactor = this->posix_proactor ();
2385 ACE_NEW_RETURN (result,
2386 ACE_POSIX_Asynch_Write_Dgram_Result (this->handler_proxy_,
2387 this->handle_,
2388 message_block,
2389 len,
2390 flags,
2391 act,
2392 proactor->get_handle (),
2393 priority,
2394 signal_number),
2395 -1);
2397 int return_val = proactor->start_aio (result, ACE_POSIX_Proactor::ACE_OPCODE_WRITE);
2398 if (return_val == -1)
2399 delete result;
2401 return return_val;
2404 ACE_POSIX_Asynch_Write_Dgram::ACE_POSIX_Asynch_Write_Dgram
2405 (ACE_POSIX_Proactor *posix_proactor)
2406 : ACE_POSIX_Asynch_Operation (posix_proactor)
2410 ACE_END_VERSIONED_NAMESPACE_DECL
2412 #endif /* ACE_HAS_AIO_CALLS */