GitHub Actions: Try MSVC builds with /std:c++17 and 20
[ACE_TAO.git] / ACE / ace / POSIX_Proactor.cpp
blobe3414396b3abbac16c694705f68a09238af95271
1 #include "ace/POSIX_Proactor.h"
3 #if defined (ACE_HAS_AIO_CALLS)
5 #if !defined (__ACE_INLINE__)
6 #include "ace/POSIX_Proactor.inl"
7 #endif /* __ACE_INLINE__ */
9 # if defined (ACE_HAS_SYS_SYSTEMINFO_H)
10 # include /**/ <sys/systeminfo.h>
11 # endif /* ACE_HAS_SYS_SYSTEMINFO_H */
13 #include "ace/ACE.h"
14 #include "ace/Flag_Manip.h"
15 #include "ace/Task_T.h"
16 #include "ace/Log_Category.h"
17 #include "ace/Object_Manager.h"
18 #include "ace/OS_NS_sys_socket.h"
19 #include "ace/OS_NS_signal.h"
20 #include "ace/OS_NS_unistd.h"
22 #if defined (sun)
23 # include "ace/OS_NS_strings.h"
24 #endif /* sun */
26 // *********************************************************************
28 ACE_BEGIN_VERSIONED_NAMESPACE_DECL
30 /**
31 * @class ACE_POSIX_Wakeup_Completion
33 * This result object is used by the <end_event_loop> of the
34 * ACE_Proactor interface to wake up all the threads blocking
35 * for completions.
37 class ACE_POSIX_Wakeup_Completion : public ACE_POSIX_Asynch_Result
39 public:
40 /// Constructor.
41 ACE_POSIX_Wakeup_Completion (const ACE_Handler::Proxy_Ptr &handler_proxy,
42 const void *act = 0,
43 ACE_HANDLE event = ACE_INVALID_HANDLE,
44 int priority = 0,
45 int signal_number = ACE_SIGRTMIN);
47 /// Destructor.
48 virtual ~ACE_POSIX_Wakeup_Completion (void);
51 /// This method calls the <handler>'s <handle_wakeup> method.
52 virtual void complete (size_t bytes_transferred = 0,
53 int success = 1,
54 const void *completion_key = 0,
55 u_long error = 0);
58 // *********************************************************************
59 ACE_POSIX_Proactor::ACE_POSIX_Proactor (void)
60 : os_id_ (ACE_OS_UNDEFINED)
62 #if defined(sun)
64 os_id_ = ACE_OS_SUN; // set family
66 char Buf [32];
68 ::memset(Buf,0,sizeof(Buf));
70 ACE_OS::sysinfo (SI_RELEASE , Buf, sizeof(Buf)-1);
72 if (ACE_OS::strcasecmp (Buf , "5.6") == 0)
73 os_id_ = ACE_OS_SUN_56;
74 else if (ACE_OS::strcasecmp (Buf , "5.7") == 0)
75 os_id_ = ACE_OS_SUN_57;
76 else if (ACE_OS::strcasecmp (Buf , "5.8") == 0)
77 os_id_ = ACE_OS_SUN_58;
79 #elif defined(HPUX)
81 os_id_ = ACE_OS_HPUX; // set family
83 #elif defined(__OpenBSD)
85 os_id_ = ACE_OS_OPENBSD; // set family
87 // do the same
89 //#else defined (LINUX, __FreeBSD__ ...)
90 //setup here os_id_
91 #endif
94 ACE_POSIX_Proactor::~ACE_POSIX_Proactor (void)
96 this->close ();
99 int
100 ACE_POSIX_Proactor::close (void)
102 return 0;
106 ACE_POSIX_Proactor::register_handle (ACE_HANDLE handle,
107 const void *completion_key)
109 ACE_UNUSED_ARG (handle);
110 ACE_UNUSED_ARG (completion_key);
111 return 0;
115 ACE_POSIX_Proactor::wake_up_dispatch_threads (void)
117 return 0;
121 ACE_POSIX_Proactor::close_dispatch_threads (int)
123 return 0;
126 size_t
127 ACE_POSIX_Proactor::number_of_threads (void) const
129 // @@ Implement it.
130 ACE_NOTSUP_RETURN (0);
133 void
134 ACE_POSIX_Proactor::number_of_threads (size_t threads)
136 // @@ Implement it.
137 ACE_UNUSED_ARG (threads);
140 ACE_HANDLE
141 ACE_POSIX_Proactor::get_handle (void) const
143 return ACE_INVALID_HANDLE;
146 ACE_Asynch_Read_Stream_Impl *
147 ACE_POSIX_Proactor::create_asynch_read_stream (void)
149 ACE_Asynch_Read_Stream_Impl *implementation = 0;
150 ACE_NEW_RETURN (implementation,
151 ACE_POSIX_Asynch_Read_Stream (this),
153 return implementation;
156 ACE_Asynch_Read_Stream_Result_Impl *
157 ACE_POSIX_Proactor::create_asynch_read_stream_result
158 (const ACE_Handler::Proxy_Ptr &handler_proxy,
159 ACE_HANDLE handle,
160 ACE_Message_Block &message_block,
161 size_t bytes_to_read,
162 const void* act,
163 ACE_HANDLE event,
164 int priority,
165 int signal_number)
167 ACE_Asynch_Read_Stream_Result_Impl *implementation;
168 ACE_NEW_RETURN (implementation,
169 ACE_POSIX_Asynch_Read_Stream_Result (handler_proxy,
170 handle,
171 message_block,
172 bytes_to_read,
173 act,
174 event,
175 priority,
176 signal_number),
178 return implementation;
182 ACE_Asynch_Write_Stream_Impl *
183 ACE_POSIX_Proactor::create_asynch_write_stream (void)
185 ACE_Asynch_Write_Stream_Impl *implementation = 0;
186 ACE_NEW_RETURN (implementation,
187 ACE_POSIX_Asynch_Write_Stream (this),
189 return implementation;
192 ACE_Asynch_Write_Stream_Result_Impl *
193 ACE_POSIX_Proactor::create_asynch_write_stream_result
194 (const ACE_Handler::Proxy_Ptr &handler_proxy,
195 ACE_HANDLE handle,
196 ACE_Message_Block &message_block,
197 size_t bytes_to_write,
198 const void* act,
199 ACE_HANDLE event,
200 int priority,
201 int signal_number)
203 ACE_Asynch_Write_Stream_Result_Impl *implementation;
204 ACE_NEW_RETURN (implementation,
205 ACE_POSIX_Asynch_Write_Stream_Result (handler_proxy,
206 handle,
207 message_block,
208 bytes_to_write,
209 act,
210 event,
211 priority,
212 signal_number),
214 return implementation;
218 ACE_Asynch_Read_File_Impl *
219 ACE_POSIX_Proactor::create_asynch_read_file (void)
221 ACE_Asynch_Read_File_Impl *implementation = 0;
222 ACE_NEW_RETURN (implementation,
223 ACE_POSIX_Asynch_Read_File (this),
225 return implementation;
228 ACE_Asynch_Read_File_Result_Impl *
229 ACE_POSIX_Proactor::create_asynch_read_file_result
230 (const ACE_Handler::Proxy_Ptr &handler_proxy,
231 ACE_HANDLE handle,
232 ACE_Message_Block &message_block,
233 size_t bytes_to_read,
234 const void* act,
235 u_long offset,
236 u_long offset_high,
237 ACE_HANDLE event,
238 int priority,
239 int signal_number)
241 ACE_Asynch_Read_File_Result_Impl *implementation;
242 ACE_NEW_RETURN (implementation,
243 ACE_POSIX_Asynch_Read_File_Result (handler_proxy,
244 handle,
245 message_block,
246 bytes_to_read,
247 act,
248 offset,
249 offset_high,
250 event,
251 priority,
252 signal_number),
254 return implementation;
258 ACE_Asynch_Write_File_Impl *
259 ACE_POSIX_Proactor::create_asynch_write_file (void)
261 ACE_Asynch_Write_File_Impl *implementation = 0;
262 ACE_NEW_RETURN (implementation,
263 ACE_POSIX_Asynch_Write_File (this),
265 return implementation;
268 ACE_Asynch_Write_File_Result_Impl *
269 ACE_POSIX_Proactor::create_asynch_write_file_result
270 (const ACE_Handler::Proxy_Ptr &handler_proxy,
271 ACE_HANDLE handle,
272 ACE_Message_Block &message_block,
273 size_t bytes_to_write,
274 const void* act,
275 u_long offset,
276 u_long offset_high,
277 ACE_HANDLE event,
278 int priority,
279 int signal_number)
281 ACE_Asynch_Write_File_Result_Impl *implementation;
282 ACE_NEW_RETURN (implementation,
283 ACE_POSIX_Asynch_Write_File_Result (handler_proxy,
284 handle,
285 message_block,
286 bytes_to_write,
287 act,
288 offset,
289 offset_high,
290 event,
291 priority,
292 signal_number),
294 return implementation;
298 ACE_Asynch_Read_Dgram_Impl *
299 ACE_POSIX_Proactor::create_asynch_read_dgram (void)
301 ACE_Asynch_Read_Dgram_Impl *implementation = 0;
302 ACE_NEW_RETURN (implementation,
303 ACE_POSIX_Asynch_Read_Dgram (this),
305 return implementation;
308 ACE_Asynch_Read_Dgram_Result_Impl *
309 ACE_POSIX_Proactor::create_asynch_read_dgram_result
310 (const ACE_Handler::Proxy_Ptr &handler_proxy,
311 ACE_HANDLE handle,
312 ACE_Message_Block *message_block,
313 size_t bytes_to_read,
314 int flags,
315 int protocol_family,
316 const void* act,
317 ACE_HANDLE event ,
318 int priority ,
319 int signal_number)
321 ACE_Asynch_Read_Dgram_Result_Impl *implementation=0;
322 ACE_NEW_RETURN (implementation,
323 ACE_POSIX_Asynch_Read_Dgram_Result(handler_proxy,
324 handle,
325 message_block,
326 bytes_to_read,
327 flags,
328 protocol_family,
329 act,
330 event,
331 priority,
332 signal_number),
335 return implementation;
339 ACE_Asynch_Write_Dgram_Impl *
340 ACE_POSIX_Proactor::create_asynch_write_dgram (void)
342 ACE_Asynch_Write_Dgram_Impl *implementation = 0;
343 ACE_NEW_RETURN (implementation,
344 ACE_POSIX_Asynch_Write_Dgram (this),
347 return implementation;
350 ACE_Asynch_Write_Dgram_Result_Impl *
351 ACE_POSIX_Proactor::create_asynch_write_dgram_result
352 (const ACE_Handler::Proxy_Ptr &handler_proxy,
353 ACE_HANDLE handle,
354 ACE_Message_Block *message_block,
355 size_t bytes_to_write,
356 int flags,
357 const void* act,
358 ACE_HANDLE event,
359 int priority ,
360 int signal_number)
362 ACE_Asynch_Write_Dgram_Result_Impl *implementation=0;
363 ACE_NEW_RETURN (implementation,
364 ACE_POSIX_Asynch_Write_Dgram_Result(handler_proxy,
365 handle,
366 message_block,
367 bytes_to_write,
368 flags,
369 act,
370 event,
371 priority,
372 signal_number),
375 return implementation;
379 ACE_Asynch_Accept_Impl *
380 ACE_POSIX_Proactor::create_asynch_accept (void)
382 ACE_Asynch_Accept_Impl *implementation = 0;
383 ACE_NEW_RETURN (implementation,
384 ACE_POSIX_Asynch_Accept (this),
387 return implementation;
390 ACE_Asynch_Accept_Result_Impl *
391 ACE_POSIX_Proactor::create_asynch_accept_result
392 (const ACE_Handler::Proxy_Ptr &handler_proxy,
393 ACE_HANDLE listen_handle,
394 ACE_HANDLE accept_handle,
395 ACE_Message_Block &message_block,
396 size_t bytes_to_read,
397 const void* act,
398 ACE_HANDLE event,
399 int priority,
400 int signal_number)
402 ACE_Asynch_Accept_Result_Impl *implementation;
403 ACE_NEW_RETURN (implementation,
404 ACE_POSIX_Asynch_Accept_Result (handler_proxy,
405 listen_handle,
406 accept_handle,
407 message_block,
408 bytes_to_read,
409 act,
410 event,
411 priority,
412 signal_number),
414 return implementation;
418 ACE_Asynch_Connect_Impl *
419 ACE_POSIX_Proactor::create_asynch_connect (void)
421 ACE_Asynch_Connect_Impl *implementation = 0;
422 ACE_NEW_RETURN (implementation,
423 ACE_POSIX_Asynch_Connect (this),
426 return implementation;
429 ACE_Asynch_Connect_Result_Impl *
430 ACE_POSIX_Proactor::create_asynch_connect_result
431 (const ACE_Handler::Proxy_Ptr &handler_proxy,
432 ACE_HANDLE connect_handle,
433 const void* act,
434 ACE_HANDLE event,
435 int priority,
436 int signal_number)
438 ACE_Asynch_Connect_Result_Impl *implementation;
439 ACE_NEW_RETURN (implementation,
440 ACE_POSIX_Asynch_Connect_Result (handler_proxy,
441 connect_handle,
442 act,
443 event,
444 priority,
445 signal_number),
447 return implementation;
451 ACE_Asynch_Transmit_File_Impl *
452 ACE_POSIX_Proactor::create_asynch_transmit_file (void)
454 ACE_Asynch_Transmit_File_Impl *implementation = 0;
455 ACE_NEW_RETURN (implementation,
456 ACE_POSIX_Asynch_Transmit_File (this),
458 return implementation;
461 ACE_Asynch_Transmit_File_Result_Impl *
462 ACE_POSIX_Proactor::create_asynch_transmit_file_result
463 (const ACE_Handler::Proxy_Ptr &handler_proxy,
464 ACE_HANDLE socket,
465 ACE_HANDLE file,
466 ACE_Asynch_Transmit_File::Header_And_Trailer *header_and_trailer,
467 size_t bytes_to_write,
468 u_long offset,
469 u_long offset_high,
470 size_t bytes_per_send,
471 u_long flags,
472 const void *act,
473 ACE_HANDLE event,
474 int priority,
475 int signal_number)
477 ACE_Asynch_Transmit_File_Result_Impl *implementation;
478 ACE_NEW_RETURN (implementation,
479 ACE_POSIX_Asynch_Transmit_File_Result (handler_proxy,
480 socket,
481 file,
482 header_and_trailer,
483 bytes_to_write,
484 offset,
485 offset_high,
486 bytes_per_send,
487 flags,
488 act,
489 event,
490 priority,
491 signal_number),
493 return implementation;
496 ACE_Asynch_Result_Impl *
497 ACE_POSIX_Proactor::create_asynch_timer
498 (const ACE_Handler::Proxy_Ptr &handler_proxy,
499 const void *act,
500 const ACE_Time_Value &tv,
501 ACE_HANDLE event,
502 int priority,
503 int signal_number)
505 ACE_POSIX_Asynch_Timer *implementation;
506 ACE_NEW_RETURN (implementation,
507 ACE_POSIX_Asynch_Timer (handler_proxy,
508 act,
510 event,
511 priority,
512 signal_number),
514 return implementation;
517 void
518 ACE_POSIX_Proactor::application_specific_code (ACE_POSIX_Asynch_Result *asynch_result,
519 size_t bytes_transferred,
520 const void */* completion_key*/,
521 u_long error)
523 ACE_SEH_TRY
525 // Call completion hook
526 asynch_result->complete (bytes_transferred,
527 error ? 0 : 1,
528 0, // No completion key.
529 error);
531 ACE_SEH_FINALLY
533 // This is crucial to prevent memory leaks
534 delete asynch_result;
539 ACE_POSIX_Proactor::post_wakeup_completions (int how_many)
541 ACE_POSIX_Wakeup_Completion *wakeup_completion = 0;
543 for (int ci = 0; ci < how_many; ci++)
545 ACE_NEW_RETURN
546 (wakeup_completion,
547 ACE_POSIX_Wakeup_Completion (this->wakeup_handler_.proxy ()),
548 -1);
549 if (this->post_completion (wakeup_completion) == -1)
550 return -1;
553 return 0;
556 ACE_POSIX_Proactor::Proactor_Type
557 ACE_POSIX_Proactor::get_impl_type (void)
559 return PROACTOR_POSIX;
564 * @class ACE_AIOCB_Notify_Pipe_Manager
566 * @brief This class manages the notify pipe of the AIOCB Proactor.
568 * This class acts as the Handler for the
569 * <Asynch_Read> operations issued on the notify pipe. This
570 * class is very useful in implementing <Asynch_Accept> operation
571 * class for the <AIOCB_Proactor>. This is also useful for
572 * implementing <post_completion> for <AIOCB_Proactor>.
574 * <AIOCB_Proactor> class issues a <Asynch_Read> on
575 * the pipe, using this class as the
576 * Handler. <POSIX_Asynch_Result *>'s are sent through the
577 * notify pipe. When <POSIX_Asynch_Result *>'s show up on the
578 * notify pipe, the <POSIX_AIOCB_Proactor> dispatches the
579 * completion of the <Asynch_Read_Stream> and calls the
580 * <handle_read_stream> of this class. This class calls
581 * <complete> on the <POSIX_Asynch_Result *> and thus calls the
582 * application handler.
583 * Handling the MessageBlock:
584 * We give this message block to read the result pointer through
585 * the notify pipe. We expect that to read 4 bytes from the
586 * notify pipe, for each <accept> call. Before giving this
587 * message block to another <accept>, we update <wr_ptr> and put
588 * it in its initial position.
590 class ACE_AIOCB_Notify_Pipe_Manager : public ACE_Handler
592 public:
593 /// Constructor. You need the posix proactor because you need to call
594 /// <application_specific_code>
595 ACE_AIOCB_Notify_Pipe_Manager (ACE_POSIX_AIOCB_Proactor *posix_aiocb_proactor);
597 /// Destructor.
598 virtual ~ACE_AIOCB_Notify_Pipe_Manager (void);
600 /// Send the result pointer through the notification pipe.
601 int notify ();
603 /// This is the call back method when <Asynch_Read> from the pipe is
604 /// complete.
605 virtual void handle_read_stream (const ACE_Asynch_Read_Stream::Result &result);
607 private:
608 /// The implementation proactor class.
609 ACE_POSIX_AIOCB_Proactor *posix_aiocb_proactor_;
611 /// Message block to get ACE_POSIX_Asynch_Result pointer from the pipe.
612 ACE_Message_Block message_block_;
614 /// Pipe for the communication between Proactor and the
615 /// Asynch_Accept/Asynch_Connect and other post_completions
616 ACE_Pipe pipe_;
618 /// To do asynch_read on the pipe.
619 ACE_POSIX_Asynch_Read_Stream read_stream_;
621 /// Default constructor. Shouldnt be called.
622 ACE_AIOCB_Notify_Pipe_Manager (void);
625 ACE_AIOCB_Notify_Pipe_Manager::ACE_AIOCB_Notify_Pipe_Manager (ACE_POSIX_AIOCB_Proactor *posix_aiocb_proactor)
626 : posix_aiocb_proactor_ (posix_aiocb_proactor),
627 message_block_ (sizeof (2)),
628 read_stream_ (posix_aiocb_proactor)
630 // Open the pipe.
631 if (this->pipe_.open () == -1)
632 ACELIB_ERROR ((LM_ERROR,
633 ACE_TEXT("%N:%l:%p\n"),
634 ACE_TEXT("ACE_AIOCB_Notify_Pipe_Manager::ACE_AIOCB_Notify_Pipe_Manager:")
635 ACE_TEXT("Open of pipe failed")));
637 // Set write side in NONBLOCK mode
638 ACE::set_flags (this->pipe_.write_handle (), ACE_NONBLOCK);
640 // Set read side in BLOCK mode
641 ACE::clr_flags (this->pipe_.read_handle (), ACE_NONBLOCK);
643 // Let AIOCB_Proactor know about our handle
644 posix_aiocb_proactor_->set_notify_handle (this->pipe_.read_handle ());
646 // Open the read stream.
647 if (this->read_stream_.open (this->proxy (),
648 this->pipe_.read_handle (),
649 0, // Completion Key
650 0) // Proactor
651 == -1)
652 ACELIB_ERROR ((LM_ERROR,
653 ACE_TEXT("%N:%l:%p\n"),
654 ACE_TEXT("ACE_AIOCB_Notify_Pipe_Manager::ACE_AIOCB_Notify_Pipe_Manager:")
655 ACE_TEXT("Open on Read Stream failed")));
657 // Issue an asynch_read on the read_stream of the notify pipe.
658 if (this->read_stream_.read (this->message_block_,
659 1, // enough to read 1 byte
660 0, // ACT
661 0) // Priority
662 == -1)
663 ACELIB_ERROR ((LM_ERROR,
664 ACE_TEXT("%N:%l:%p\n"),
665 ACE_TEXT("ACE_AIOCB_Notify_Pipe_Manager::ACE_AIOCB_Notify_Pipe_Manager:")
666 ACE_TEXT("Read from pipe failed")));
669 ACE_AIOCB_Notify_Pipe_Manager::~ACE_AIOCB_Notify_Pipe_Manager (void)
671 // 1. try to cancel pending aio
672 this->read_stream_.cancel ();
674 // 2. close both handles
675 // Destuctor of ACE_Pipe does not close handles.
676 // We can not use ACE_Pipe::close() as it
677 // closes read_handle and than write_handle.
678 // In some systems close() may wait for
679 // completion for all asynch. pending requests.
680 // So we should close write_handle firstly
681 // to force read completion ( if 1. does not help )
682 // and then read_handle and not vice versa
684 ACE_HANDLE h = this->pipe_.write_handle ();
685 if (h != ACE_INVALID_HANDLE)
686 ACE_OS::closesocket (h);
688 h = this->pipe_.read_handle ();
689 if ( h != ACE_INVALID_HANDLE)
690 ACE_OS::closesocket (h);
696 ACE_AIOCB_Notify_Pipe_Manager::notify ()
698 // Send the result pointer through the pipe.
699 char char_send = 0;
700 ssize_t ret_val = ACE::send (this->pipe_.write_handle (),
701 &char_send,
702 sizeof (char_send));
704 if (ret_val < 0)
706 if (errno != EWOULDBLOCK)
707 #if 0
708 ACELIB_ERROR ((LM_ERROR,
709 ACE_TEXT ("(%P %t):%p\n"),
710 ACE_TEXT ("ACE_AIOCB_Notify_Pipe_Manager::notify")
711 ACE_TEXT ("Error:Writing on to notify pipe failed")));
712 #endif /* 0 */
713 return -1;
716 return 0;
719 void
720 ACE_AIOCB_Notify_Pipe_Manager::handle_read_stream
721 (const ACE_Asynch_Read_Stream::Result & /*result*/)
723 // 1. Start new read to avoid pipe overflow
725 // Set the message block properly. Put the <wr_ptr> back in the
726 // initial position.
727 if (this->message_block_.length () > 0)
728 this->message_block_.wr_ptr (this->message_block_.rd_ptr ());
730 // One accept has completed. Issue a read to handle any
731 // <post_completion>s in the future.
732 if (-1 == this->read_stream_.read (this->message_block_,
733 1, // enough to read 1 byte
734 0, // ACT
735 0)) // Priority
736 ACELIB_ERROR ((LM_ERROR,
737 ACE_TEXT ("%N:%l:(%P | %t):%p\n"),
738 ACE_TEXT ("ACE_AIOCB_Notify_Pipe_Manager::handle_read_stream:")
739 ACE_TEXT ("Read from pipe failed")));
742 // 2. Do the upcalls
743 // this->posix_aiocb_proactor_->process_result_queue ();
746 // Public constructor for common use.
747 ACE_POSIX_AIOCB_Proactor::ACE_POSIX_AIOCB_Proactor (size_t max_aio_operations)
748 : aiocb_notify_pipe_manager_ (0),
749 aiocb_list_ (0),
750 result_list_ (0),
751 aiocb_list_max_size_ (max_aio_operations),
752 aiocb_list_cur_size_ (0),
753 notify_pipe_read_handle_ (ACE_INVALID_HANDLE),
754 num_deferred_aiocb_ (0),
755 num_started_aio_ (0)
757 // Check for correct value for max_aio_operations
758 check_max_aio_num ();
760 this->create_result_aiocb_list ();
762 this->create_notify_manager ();
764 // start pseudo-asynchronous accept task
765 // one per all future acceptors
766 this->get_asynch_pseudo_task().start ();
770 // Special protected constructor for ACE_SUN_Proactor
771 ACE_POSIX_AIOCB_Proactor::ACE_POSIX_AIOCB_Proactor (size_t max_aio_operations,
772 ACE_POSIX_Proactor::Proactor_Type)
773 : aiocb_notify_pipe_manager_ (0),
774 aiocb_list_ (0),
775 result_list_ (0),
776 aiocb_list_max_size_ (max_aio_operations),
777 aiocb_list_cur_size_ (0),
778 notify_pipe_read_handle_ (ACE_INVALID_HANDLE),
779 num_deferred_aiocb_ (0),
780 num_started_aio_ (0)
782 //check for correct value for max_aio_operations
783 this->check_max_aio_num ();
785 this->create_result_aiocb_list ();
787 // @@ We should create Notify_Pipe_Manager in the derived class to
788 // provide correct calls for virtual functions !!!
791 // Destructor.
792 ACE_POSIX_AIOCB_Proactor::~ACE_POSIX_AIOCB_Proactor (void)
794 this->close();
797 ACE_POSIX_Proactor::Proactor_Type
798 ACE_POSIX_AIOCB_Proactor::get_impl_type (void)
800 return PROACTOR_AIOCB;
805 ACE_POSIX_AIOCB_Proactor::close (void)
807 // stop asynch accept task
808 this->get_asynch_pseudo_task().stop ();
810 this->delete_notify_manager ();
812 this->clear_result_queue ();
814 return this->delete_result_aiocb_list ();
817 void ACE_POSIX_AIOCB_Proactor::set_notify_handle (ACE_HANDLE h)
819 notify_pipe_read_handle_ = h;
822 int ACE_POSIX_AIOCB_Proactor::create_result_aiocb_list (void)
824 if (aiocb_list_ != 0)
825 return 0;
827 ACE_NEW_RETURN (aiocb_list_, aiocb *[aiocb_list_max_size_], -1);
829 ACE_NEW_RETURN (result_list_,
830 ACE_POSIX_Asynch_Result *[aiocb_list_max_size_],
831 -1);
833 // Initialize the array.
834 for (size_t ai = 0; ai < this->aiocb_list_max_size_; ai++)
836 aiocb_list_[ai] = 0;
837 result_list_[ai] = 0;
840 return 0;
843 int ACE_POSIX_AIOCB_Proactor::delete_result_aiocb_list (void)
845 if (aiocb_list_ == 0) // already deleted
846 return 0;
848 size_t ai;
850 // Try to cancel all uncompleted operations; POSIX systems may have
851 // hidden system threads that still can work with our aiocbs!
852 for (ai = 0; ai < aiocb_list_max_size_; ai++)
853 if (this->aiocb_list_[ai] != 0) // active operation
854 this->cancel_aiocb (result_list_[ai]);
856 int num_pending = 0;
858 for (ai = 0; ai < aiocb_list_max_size_; ai++)
860 if (this->aiocb_list_[ai] == 0 ) // not active operation
861 continue;
863 // Get the error and return status of the aio_ operation.
864 int error_status = 0;
865 size_t transfer_count = 0;
866 int flg_completed = this->get_result_status (result_list_[ai],
867 error_status,
868 transfer_count);
870 //don't delete uncompleted AIOCB's
871 if (flg_completed == 0) // not completed !!!
873 num_pending++;
874 #if 0
875 char * errtxt = ACE_OS::strerror (error_status);
876 if (errtxt == 0)
877 errtxt ="?????????";
879 char * op = (aiocb_list_[ai]->aio_lio_opcode == LIO_WRITE )?
880 "WRITE":"READ" ;
883 ACELIB_ERROR ((LM_ERROR,
884 ACE_TEXT("slot=%d op=%s status=%d xfercnt=%d %s\n"),
887 error_status,
888 transfer_count,
889 errtxt));
890 #endif /* 0 */
892 else // completed , OK
894 delete this->result_list_[ai];
895 this->result_list_[ai] = 0;
896 this->aiocb_list_[ai] = 0;
900 // If it is not possible cancel some operation (num_pending > 0 ),
901 // we can do only one thing -report about this
902 // and complain about POSIX implementation.
903 // We know that we have memory leaks, but it is better than
904 // segmentation fault!
905 ACELIB_DEBUG
906 ((LM_DEBUG,
907 ACE_TEXT("ACE_POSIX_AIOCB_Proactor::delete_result_aiocb_list\n")
908 ACE_TEXT(" number pending AIO=%d\n"),
909 num_pending));
911 delete [] this->aiocb_list_;
912 this->aiocb_list_ = 0;
914 delete [] this->result_list_;
915 this->result_list_ = 0;
917 return (num_pending == 0 ? 0 : -1);
918 // ?? or just always return 0;
921 void ACE_POSIX_AIOCB_Proactor::check_max_aio_num ()
923 #if !defined (ACE_ANDROID)
924 // Android API 23 introduced a define _POSIX_AIO_MAX 1 which gets used by _SC_AIO_MAX.
925 // Previously, without the define, the value returned was -1, which got ignored.
926 // Officially, the Android OS does not support AIO so if ACE_HAS_AIO_CALLS is defined
927 // then a 3rd party library must be in use and this check is invalid.
929 long max_os_aio_num = ACE_OS::sysconf (_SC_AIO_MAX);
931 // Define max limit AIO's for concrete OS
932 // -1 means that there is no limit, but it is not true
933 // (example, SunOS 5.6)
935 if (max_os_aio_num > 0 &&
936 aiocb_list_max_size_ > (unsigned long) max_os_aio_num)
937 aiocb_list_max_size_ = max_os_aio_num;
938 #endif
940 #if defined (HPUX) || defined (__FreeBSD__)
941 // Although HPUX 11.00 allows to start 2048 AIO's for all process in
942 // system it has a limit 256 max elements for aio_suspend () It is a
943 // pity, but ...
945 long max_os_listio_num = ACE_OS::sysconf (_SC_AIO_LISTIO_MAX);
946 if (max_os_listio_num > 0
947 && aiocb_list_max_size_ > (unsigned long) max_os_listio_num)
948 aiocb_list_max_size_ = max_os_listio_num;
949 #endif /* HPUX || __FreeBSD__ */
951 // check for user-defined value
952 // ACE_AIO_MAX_SIZE if defined in POSIX_Proactor.h
954 if (aiocb_list_max_size_ <= 0
955 || aiocb_list_max_size_ > ACE_AIO_MAX_SIZE)
956 aiocb_list_max_size_ = ACE_AIO_MAX_SIZE;
958 // check for max number files to open
960 int max_num_files = ACE::max_handles ();
962 if (max_num_files > 0
963 && aiocb_list_max_size_ > (unsigned long) max_num_files)
965 ACE::set_handle_limit (aiocb_list_max_size_);
967 max_num_files = ACE::max_handles ();
970 if (max_num_files > 0
971 && aiocb_list_max_size_ > (unsigned long) max_num_files)
972 aiocb_list_max_size_ = (unsigned long) max_num_files;
974 ACELIB_DEBUG ((LM_DEBUG,
975 "(%P | %t) ACE_POSIX_AIOCB_Proactor::Max Number of AIOs=%d\n",
976 aiocb_list_max_size_));
979 void
980 ACE_POSIX_AIOCB_Proactor::create_notify_manager (void)
982 // Remember! this issues a Asynch_Read
983 // on the notify pipe for doing the Asynch_Accept/Connect.
985 if (aiocb_notify_pipe_manager_ == 0)
986 ACE_NEW (aiocb_notify_pipe_manager_,
987 ACE_AIOCB_Notify_Pipe_Manager (this));
990 void
991 ACE_POSIX_AIOCB_Proactor::delete_notify_manager (void)
993 // We are responsible for delete as all pointers set to 0 after
994 // delete, it is save to delete twice
995 delete aiocb_notify_pipe_manager_;
996 aiocb_notify_pipe_manager_ = 0;
1000 ACE_POSIX_AIOCB_Proactor::handle_events (ACE_Time_Value &wait_time)
1002 // Decrement <wait_time> with the amount of time spent in the method
1003 ACE_Countdown_Time countdown (&wait_time);
1004 return this->handle_events_i (wait_time.msec ());
1008 ACE_POSIX_AIOCB_Proactor::handle_events (void)
1010 return this->handle_events_i (ACE_INFINITE);
1014 ACE_POSIX_AIOCB_Proactor::notify_completion(int sig_num)
1016 ACE_UNUSED_ARG (sig_num);
1018 return this->aiocb_notify_pipe_manager_->notify ();
1022 ACE_POSIX_AIOCB_Proactor::post_completion (ACE_POSIX_Asynch_Result *result)
1024 ACE_MT (ACE_GUARD_RETURN (ACE_SYNCH_MUTEX, ace_mon, this->mutex_, -1));
1026 int ret_val = this->putq_result (result);
1028 return ret_val;
1032 ACE_POSIX_AIOCB_Proactor::putq_result (ACE_POSIX_Asynch_Result *result)
1034 // this protected method should be called with locked mutex_
1035 // we can't use GUARD as Proactor uses non-recursive mutex
1037 if (!result)
1038 return -1;
1040 int sig_num = result->signal_number ();
1041 int ret_val = this->result_queue_.enqueue_tail (result);
1043 if (ret_val == -1)
1044 ACELIB_ERROR_RETURN ((LM_ERROR,
1045 "%N:%l:ACE_POSIX_AIOCB_Proactor::putq_result failed\n"),
1046 -1);
1048 this->notify_completion (sig_num);
1050 return 0;
1053 ACE_POSIX_Asynch_Result * ACE_POSIX_AIOCB_Proactor::getq_result (void)
1055 ACE_MT (ACE_GUARD_RETURN (ACE_SYNCH_MUTEX, ace_mon, this->mutex_, 0));
1058 ACE_POSIX_Asynch_Result* result = 0;
1060 if (this->result_queue_.dequeue_head (result) != 0)
1061 return 0;
1063 // don't waste time if queue is empty - it is normal
1064 // or check queue size before dequeue_head
1065 // ACELIB_ERROR_RETURN ((LM_ERROR,
1066 // ACE_TEXT("%N:%l:(%P | %t):%p\n"),
1067 // ACE_TEXT("ACE_POSIX_AIOCB_Proactor::getq_result failed")),
1068 // 0);
1070 return result;
1073 int ACE_POSIX_AIOCB_Proactor::clear_result_queue (void)
1075 int ret_val = 0;
1076 ACE_POSIX_Asynch_Result* result = 0;
1078 while ((result = this->getq_result ()) != 0)
1080 delete result;
1081 ret_val++;
1084 return ret_val;
1087 int ACE_POSIX_AIOCB_Proactor::process_result_queue (void)
1089 int ret_val = 0;
1090 ACE_POSIX_Asynch_Result* result = 0;
1092 while ((result = this->getq_result ()) != 0)
1094 this->application_specific_code
1095 (result,
1096 result->bytes_transferred(), // 0, No bytes transferred.
1097 0, // No completion key.
1098 result->error()); //0, No error.
1100 ret_val++;
1103 return ret_val;
1107 ACE_POSIX_AIOCB_Proactor::handle_events_i (u_long milli_seconds)
1109 int result_suspend = 0;
1110 int retval= 0;
1112 if (milli_seconds == ACE_INFINITE)
1113 // Indefinite blocking.
1114 result_suspend = aio_suspend (aiocb_list_,
1115 aiocb_list_max_size_,
1117 else
1119 // Block on <aio_suspend> for <milli_seconds>
1120 timespec timeout;
1121 timeout.tv_sec = milli_seconds / 1000;
1122 timeout.tv_nsec = (milli_seconds - (timeout.tv_sec * 1000)) * 1000000;
1123 result_suspend = aio_suspend (aiocb_list_,
1124 aiocb_list_max_size_,
1125 &timeout);
1128 // Check for errors
1129 if (result_suspend == -1)
1131 if (errno != EAGAIN && // Timeout
1132 errno != EINTR ) // Interrupted call
1133 ACELIB_ERROR ((LM_ERROR,
1134 ACE_TEXT ("%N:%l:(%P|%t)::%p\n"),
1135 ACE_TEXT ("handle_events: aio_suspend failed")));
1136 // let continue work
1137 // we should check "post_completed" queue
1139 else
1141 size_t index = 0;
1142 size_t count = aiocb_list_max_size_; // max number to iterate
1143 int error_status = 0;
1144 size_t transfer_count = 0;
1146 for (;; retval++)
1148 ACE_POSIX_Asynch_Result *asynch_result =
1149 find_completed_aio (error_status,
1150 transfer_count,
1151 index,
1152 count);
1154 if (asynch_result == 0)
1155 break;
1157 // Call the application code.
1158 this->application_specific_code (asynch_result,
1159 transfer_count,
1160 0, // No completion key.
1161 error_status);
1165 // process post_completed results
1166 retval += this->process_result_queue ();
1168 return retval > 0 ? 1 : 0;
1172 ACE_POSIX_AIOCB_Proactor::get_result_status (ACE_POSIX_Asynch_Result *asynch_result,
1173 int &error_status,
1174 size_t &transfer_count)
1176 transfer_count = 0;
1178 // Get the error status of the aio_ operation.
1179 // The following aio_ptr anathema is required to work around a bug in an over-aggressive
1180 // optimizer in GCC 4.1.2.
1181 aiocb *aio_ptr (asynch_result);
1182 error_status = aio_error (aio_ptr);
1183 if (error_status == EINPROGRESS)
1184 return 0; // not completed
1186 ssize_t op_return = aio_return (aio_ptr);
1187 if (op_return > 0)
1188 transfer_count = static_cast<size_t> (op_return);
1189 // else transfer_count is already 0, error_status reports the error.
1190 return 1; // completed
1193 ACE_POSIX_Asynch_Result *
1194 ACE_POSIX_AIOCB_Proactor::find_completed_aio (int &error_status,
1195 size_t &transfer_count,
1196 size_t &index,
1197 size_t &count)
1199 // parameter index defines initial slot to scan
1200 // parameter count tells us how many slots should we scan
1202 ACE_MT (ACE_GUARD_RETURN (ACE_Thread_Mutex, ace_mon, this->mutex_, 0));
1204 ACE_POSIX_Asynch_Result *asynch_result = 0;
1206 if (num_started_aio_ == 0) // save time
1207 return 0;
1209 for (; count > 0; index++ , count--)
1211 if (index >= aiocb_list_max_size_) // like a wheel
1212 index = 0;
1214 if (aiocb_list_[index] == 0) // Dont process null blocks.
1215 continue;
1217 if (0 != this->get_result_status (result_list_[index],
1218 error_status,
1219 transfer_count)) // completed
1220 break;
1222 } // end for
1224 if (count == 0) // all processed , nothing found
1225 return 0;
1226 asynch_result = result_list_[index];
1228 aiocb_list_[index] = 0;
1229 result_list_[index] = 0;
1230 aiocb_list_cur_size_--;
1232 num_started_aio_--; // decrement count active aios
1233 index++; // for next iteration
1234 count--; // for next iteration
1236 this->start_deferred_aio ();
1237 //make attempt to start deferred AIO
1238 //It is safe as we are protected by mutex_
1240 return asynch_result;
1245 ACE_POSIX_AIOCB_Proactor::start_aio (ACE_POSIX_Asynch_Result *result,
1246 ACE_POSIX_Proactor::Opcode op)
1248 ACE_TRACE ("ACE_POSIX_AIOCB_Proactor::start_aio");
1250 ACE_MT (ACE_GUARD_RETURN (ACE_Thread_Mutex, ace_mon, this->mutex_, -1));
1252 int ret_val = (aiocb_list_cur_size_ >= aiocb_list_max_size_) ? -1 : 0;
1254 if (result == 0) // Just check the status of the list
1255 return ret_val;
1257 // Save operation code in the aiocb
1258 switch (op)
1260 case ACE_POSIX_Proactor::ACE_OPCODE_READ:
1261 result->aio_lio_opcode = LIO_READ;
1262 break;
1264 case ACE_POSIX_Proactor::ACE_OPCODE_WRITE:
1265 result->aio_lio_opcode = LIO_WRITE;
1266 break;
1268 default:
1269 ACELIB_ERROR_RETURN ((LM_ERROR,
1270 ACE_TEXT ("%N:%l:(%P|%t)::")
1271 ACE_TEXT ("start_aio: Invalid op code %d\n"),
1272 op),
1273 -1);
1276 if (ret_val != 0) // No free slot
1278 errno = EAGAIN;
1279 return -1;
1282 // Find a free slot and store.
1284 ssize_t slot = allocate_aio_slot (result);
1286 if (slot < 0)
1287 return -1;
1289 size_t index = static_cast<size_t> (slot);
1291 result_list_[index] = result; //Store result ptr anyway
1292 aiocb_list_cur_size_++;
1294 ret_val = start_aio_i (result);
1295 switch (ret_val)
1297 case 0: // started OK
1298 aiocb_list_[index] = result;
1299 return 0;
1301 case 1: // OS AIO queue overflow
1302 num_deferred_aiocb_ ++;
1303 return 0;
1305 default: // Invalid request, there is no point
1306 break; // to start it later
1309 result_list_[index] = 0;
1310 aiocb_list_cur_size_--;
1311 return -1;
1314 ssize_t
1315 ACE_POSIX_AIOCB_Proactor::allocate_aio_slot (ACE_POSIX_Asynch_Result *result)
1317 size_t i = 0;
1319 // we reserve zero slot for ACE_AIOCB_Notify_Pipe_Manager
1320 // so make check for ACE_AIOCB_Notify_Pipe_Manager request
1322 if (notify_pipe_read_handle_ == result->aio_fildes) // Notify_Pipe ?
1323 { // should be free,
1324 if (result_list_[i] != 0) // only 1 request
1325 { // is allowed
1326 errno = EAGAIN;
1327 ACELIB_ERROR_RETURN ((LM_ERROR,
1328 "%N:%l:(%P | %t)::\n"
1329 "ACE_POSIX_AIOCB_Proactor::allocate_aio_slot:"
1330 "internal Proactor error 0\n"),
1331 -1);
1334 else //try to find free slot as usual, but starting from 1
1336 for (i= 1; i < this->aiocb_list_max_size_; i++)
1337 if (result_list_[i] == 0)
1338 break;
1341 if (i >= this->aiocb_list_max_size_)
1342 ACELIB_ERROR_RETURN ((LM_ERROR,
1343 "%N:%l:(%P | %t)::\n"
1344 "ACE_POSIX_AIOCB_Proactor::allocate_aio_slot:"
1345 "internal Proactor error 1\n"),
1346 -1);
1348 //setup OS notification methods for this aio
1349 result->aio_sigevent.sigev_notify = SIGEV_NONE;
1351 return static_cast<ssize_t> (i);
1354 // start_aio_i has new return codes
1355 // 0 AIO was started successfully
1356 // 1 AIO was not started, OS AIO queue overflow
1357 // -1 AIO was not started, other errors
1360 ACE_POSIX_AIOCB_Proactor::start_aio_i (ACE_POSIX_Asynch_Result *result)
1362 ACE_TRACE ("ACE_POSIX_AIOCB_Proactor::start_aio_i");
1364 int ret_val;
1365 const ACE_TCHAR *ptype = 0;
1367 // Start IO
1368 // The following aio_ptr anathema is required to work around a bug in
1369 // the optimizer for GCC 4.1.2
1370 aiocb * aio_ptr (result);
1371 switch (result->aio_lio_opcode )
1373 case LIO_READ :
1374 ptype = ACE_TEXT ("read ");
1375 ret_val = aio_read (aio_ptr);
1376 break;
1377 case LIO_WRITE :
1378 ptype = ACE_TEXT ("write");
1379 ret_val = aio_write (aio_ptr);
1380 break;
1381 default:
1382 ptype = ACE_TEXT ("?????");
1383 ret_val = -1;
1384 break;
1387 if (ret_val == 0)
1389 ++this->num_started_aio_;
1391 else // if (ret_val == -1)
1393 if (errno == EAGAIN || errno == ENOMEM) //Ok, it will be deferred AIO
1394 ret_val = 1;
1395 else
1396 ACELIB_ERROR ((LM_ERROR,
1397 ACE_TEXT ("%N:%l:(%P | %t)::start_aio_i: aio_%s %p\n"),
1398 ptype,
1399 ACE_TEXT ("queueing failed")));
1402 return ret_val;
1407 ACE_POSIX_AIOCB_Proactor::start_deferred_aio ()
1409 ACE_TRACE ("ACE_POSIX_AIOCB_Proactor::start_deferred_aio");
1411 // This protected method is called from
1412 // find_completed_aio after any AIO completion
1413 // We should call this method always with locked
1414 // ACE_POSIX_AIOCB_Proactor::mutex_
1416 // It tries to start the first deferred AIO
1417 // if such exists
1419 if (num_deferred_aiocb_ == 0)
1420 return 0; // nothing to do
1422 size_t i = 0;
1424 for (i= 0; i < this->aiocb_list_max_size_; i++)
1425 if (result_list_[i] !=0 // check for
1426 && aiocb_list_[i] ==0) // deferred AIO
1427 break;
1429 if (i >= this->aiocb_list_max_size_)
1430 ACELIB_ERROR_RETURN ((LM_ERROR,
1431 "%N:%l:(%P | %t)::\n"
1432 "start_deferred_aio:"
1433 "internal Proactor error 3\n"),
1434 -1);
1436 ACE_POSIX_Asynch_Result *result = result_list_[i];
1438 int ret_val = start_aio_i (result);
1440 switch (ret_val)
1442 case 0 : //started OK , decrement count of deferred AIOs
1443 aiocb_list_[i] = result;
1444 num_deferred_aiocb_ --;
1445 return 0;
1447 case 1 :
1448 return 0; //try again later
1450 default : // Invalid Parameters , should never be
1451 break;
1454 //AL notify user
1456 result_list_[i] = 0;
1457 --aiocb_list_cur_size_;
1459 --num_deferred_aiocb_;
1461 result->set_error (errno);
1462 result->set_bytes_transferred (0);
1463 this->putq_result (result); // we are with locked mutex_ here !
1465 return -1;
1469 ACE_POSIX_AIOCB_Proactor::cancel_aio (ACE_HANDLE handle)
1471 // This new method should be called from
1472 // ACE_POSIX_Asynch_Operation instead of usual ::aio_cancel
1473 // It scans the result_list_ and defines all AIO requests
1474 // that were issued for handle "handle"
1476 // For all deferred AIO requests with handle "handle"
1477 // it removes its from the lists and notifies user
1479 // For all running AIO requests with handle "handle"
1480 // it calls ::aio_cancel. According to the POSIX standards
1481 // we will receive ECANCELED for all ::aio_canceled AIO requests
1482 // later on return from ::aio_suspend
1484 ACE_TRACE ("ACE_POSIX_AIOCB_Proactor::cancel_aio");
1486 int num_total = 0;
1487 int num_cancelled = 0;
1490 ACE_MT (ACE_GUARD_RETURN (ACE_Thread_Mutex, ace_mon, this->mutex_, -1));
1492 size_t ai = 0;
1494 for (ai = 0; ai < this->aiocb_list_max_size_; ai++)
1496 if (this->result_list_[ai] == 0) // Skip empty slot
1497 continue;
1499 if (this->result_list_[ai]->aio_fildes != handle) // Not ours
1500 continue;
1502 ++num_total;
1504 ACE_POSIX_Asynch_Result *asynch_result = this->result_list_[ai];
1506 if (this->aiocb_list_[ai] == 0) // Canceling a deferred operation
1508 num_cancelled++;
1509 this->num_deferred_aiocb_--;
1511 this->aiocb_list_[ai] = 0;
1512 this->result_list_[ai] = 0;
1513 this->aiocb_list_cur_size_--;
1515 asynch_result->set_error (ECANCELED);
1516 asynch_result->set_bytes_transferred (0);
1517 this->putq_result (asynch_result);
1518 // we are with locked mutex_ here !
1520 else // Cancel started aio
1522 int rc_cancel = this->cancel_aiocb (asynch_result);
1524 if (rc_cancel == 0) //notification in the future
1525 num_cancelled++; //it is OS responsiblity
1529 } // release mutex_
1531 if (num_total == 0)
1532 return 1; // ALLDONE
1534 if (num_cancelled == num_total)
1535 return 0; // CANCELLED
1537 return 2; // NOT CANCELLED
1541 ACE_POSIX_AIOCB_Proactor::cancel_aiocb (ACE_POSIX_Asynch_Result * result)
1543 // This method is called from cancel_aio
1544 // to cancel a previously submitted AIO request
1545 int rc = ::aio_cancel (0, result);
1547 // Check the return value and return 0/1/2 appropriately.
1548 if (rc == AIO_CANCELED)
1549 return 0;
1550 else if (rc == AIO_ALLDONE)
1551 return 1;
1552 else // (rc == AIO_NOTCANCELED)
1553 return 2;
1557 // *********************************************************************
1559 #if defined(ACE_HAS_POSIX_REALTIME_SIGNALS)
1561 ACE_POSIX_SIG_Proactor::ACE_POSIX_SIG_Proactor (size_t max_aio_operations)
1562 : ACE_POSIX_AIOCB_Proactor (max_aio_operations,
1563 ACE_POSIX_Proactor::PROACTOR_SIG)
1565 // = Set up the mask we'll use to block waiting for SIGRTMIN. Use that
1566 // to add it to the signal mask for this thread, and also set the process
1567 // signal action to pass signal information when we want it.
1569 // Clear the signal set.
1570 ACE_OS::sigemptyset (&this->RT_completion_signals_);
1572 // Add the signal number to the signal set.
1573 if (ACE_OS::sigaddset (&this->RT_completion_signals_, ACE_SIGRTMIN) == -1)
1574 ACELIB_ERROR ((LM_ERROR, ACE_TEXT ("ACE_POSIX_SIG_Proactor: %p\n"),
1575 ACE_TEXT ("sigaddset")));
1576 this->block_signals ();
1577 // Set up the signal action for SIGRTMIN.
1578 this->setup_signal_handler (ACE_SIGRTMIN);
1580 // we do not have to create notify manager
1581 // but we should start pseudo-asynchronous accept task
1582 // one per all future acceptors
1584 this->get_asynch_pseudo_task().start ();
1585 return;
1588 ACE_POSIX_SIG_Proactor::ACE_POSIX_SIG_Proactor (const sigset_t signal_set,
1589 size_t max_aio_operations)
1590 : ACE_POSIX_AIOCB_Proactor (max_aio_operations,
1591 ACE_POSIX_Proactor::PROACTOR_SIG)
1593 // = Keep <Signal_set> with the Proactor, mask all the signals and
1594 // setup signal actions for the signals in the <signal_set>.
1596 // = Keep <signal_set> with the Proactor.
1598 // Empty the signal set first.
1599 if (sigemptyset (&this->RT_completion_signals_) == -1)
1600 ACELIB_ERROR ((LM_ERROR,
1601 ACE_TEXT("Error:(%P | %t):%p\n"),
1602 ACE_TEXT("sigemptyset failed")));
1604 // For each signal number present in the <signal_set>, add it to
1605 // the signal set we use, and also set up its process signal action
1606 // to allow signal info to be passed into sigwait/sigtimedwait.
1607 int member = 0;
1608 for (int si = ACE_SIGRTMIN; si <= ACE_SIGRTMAX; si++)
1610 member = sigismember (&signal_set,
1611 si);
1612 if (member == -1)
1613 ACELIB_ERROR ((LM_ERROR,
1614 ACE_TEXT("%N:%l:(%P | %t)::%p\n"),
1615 ACE_TEXT("ACE_POSIX_SIG_Proactor::ACE_POSIX_SIG_Proactor:")
1616 ACE_TEXT("sigismember failed")));
1617 else if (member == 1)
1619 sigaddset (&this->RT_completion_signals_, si);
1620 this->setup_signal_handler (si);
1624 // Mask all the signals.
1625 this->block_signals ();
1627 // we do not have to create notify manager
1628 // but we should start pseudo-asynchronous accept task
1629 // one per all future acceptors
1631 this->get_asynch_pseudo_task().start ();
1632 return;
1635 ACE_POSIX_SIG_Proactor::~ACE_POSIX_SIG_Proactor (void)
1637 this->close ();
1639 // @@ Enable the masked signals again.
1642 ACE_POSIX_Proactor::Proactor_Type
1643 ACE_POSIX_SIG_Proactor::get_impl_type (void)
1645 return PROACTOR_SIG;
1649 ACE_POSIX_SIG_Proactor::handle_events (ACE_Time_Value &wait_time)
1651 // Decrement <wait_time> with the amount of time spent in the method
1652 ACE_Countdown_Time countdown (&wait_time);
1653 return this->handle_events_i (&wait_time);
1657 ACE_POSIX_SIG_Proactor::handle_events (void)
1659 return this->handle_events_i (0);
1663 ACE_POSIX_SIG_Proactor::notify_completion (int sig_num)
1665 // Get this process id.
1666 pid_t const pid = ACE_OS::getpid ();
1667 if (pid == (pid_t) -1)
1668 ACELIB_ERROR_RETURN ((LM_ERROR,
1669 ACE_TEXT("Error:%N:%l(%P | %t):%p"),
1670 ACE_TEXT("<getpid> failed")),
1671 -1);
1673 // Set the signal information.
1674 sigval value;
1675 #if defined (ACE_HAS_SIGVAL_SIGVAL_INT)
1676 value.sigval_int = -1;
1677 #else
1678 value.sival_int = -1;
1679 #endif /* ACE_HAS_SIGVAL_SIGVAL_INT */
1681 // Queue the signal.
1682 if (sigqueue (pid, sig_num, value) == 0)
1683 return 0;
1685 if (errno != EAGAIN)
1686 ACELIB_ERROR_RETURN ((LM_ERROR,
1687 ACE_TEXT("Error:%N:%l:(%P | %t):%p\n"),
1688 ACE_TEXT("<sigqueue> failed")),
1689 -1);
1690 return -1;
1693 ACE_Asynch_Result_Impl *
1694 ACE_POSIX_SIG_Proactor::create_asynch_timer
1695 (const ACE_Handler::Proxy_Ptr &handler_proxy,
1696 const void *act,
1697 const ACE_Time_Value &tv,
1698 ACE_HANDLE event,
1699 int priority,
1700 int signal_number)
1702 int is_member = 0;
1704 // Fix the signal number.
1705 if (signal_number == -1)
1707 int si;
1708 for (si = ACE_SIGRTMAX;
1709 (is_member == 0) && (si >= ACE_SIGRTMIN);
1710 si--)
1712 is_member = sigismember (&this->RT_completion_signals_,
1713 si);
1714 if (is_member == -1)
1715 ACELIB_ERROR_RETURN ((LM_ERROR,
1716 "%N:%l:(%P | %t)::%s\n",
1717 "ACE_POSIX_SIG_Proactor::create_asynch_timer:"
1718 "sigismember failed"),
1722 if (is_member == 0)
1723 ACELIB_ERROR_RETURN ((LM_ERROR,
1724 "Error:%N:%l:(%P | %t)::%s\n",
1725 "ACE_POSIX_SIG_Proactor::ACE_POSIX_SIG_Proactor:"
1726 "Signal mask set empty"),
1728 else
1729 // + 1 to nullify loop increment.
1730 signal_number = si + 1;
1733 ACE_Asynch_Result_Impl *implementation;
1734 ACE_NEW_RETURN (implementation,
1735 ACE_POSIX_Asynch_Timer (handler_proxy,
1736 act,
1738 event,
1739 priority,
1740 signal_number),
1742 return implementation;
#if 0
// Disabled handler referenced only by the disabled code in
// setup_signal_handler.  It should never be called; if it is, it just
// logs the number of the signal it received.
static void
sig_handler (int sig_num, siginfo_t *, ucontext_t *)
{
  ACELIB_DEBUG ((LM_DEBUG,
                 "%N:%l:(%P | %t)::sig_handler received signal: %d\n",
                 sig_num));
}
#endif /*if 0*/
1757 ACE_POSIX_SIG_Proactor::setup_signal_handler (int signal_number) const
1759 // Set up the specified signal so that signal information will be
1760 // passed to sigwaitinfo/sigtimedwait. Don't change the default
1761 // signal handler - having a handler and waiting for the signal can
1762 // produce undefined behavior.
1764 // But can not use SIG_DFL
1765 // With SIG_DFL after delivering the first signal
1766 // SIG_DFL handler resets SA_SIGINFO flags
1767 // and we will lose all information sig_info
1768 // At least all SunOS have such behavior
1769 #if 0
1770 struct sigaction reaction;
1771 sigemptyset (&reaction.sa_mask); // Nothing else to mask.
1772 reaction.sa_flags = SA_SIGINFO; // Realtime flag.
1773 reaction.sa_sigaction = ACE_SIGNAL_C_FUNC (sig_handler); // (SIG_DFL);
1774 int sigaction_return = ACE_OS::sigaction (signal_number,
1775 &reaction,
1777 if (sigaction_return == -1)
1778 ACELIB_ERROR_RETURN ((LM_ERROR,
1779 ACE_TEXT("Error:%p\n"),
1780 ACE_TEXT("Proactor couldnt do sigaction for the RT SIGNAL")),
1781 -1);
1782 #else
1783 ACE_UNUSED_ARG(signal_number);
1784 #endif
1785 return 0;
1790 ACE_POSIX_SIG_Proactor::block_signals (void) const
1792 return ACE_OS::pthread_sigmask (SIG_BLOCK, &this->RT_completion_signals_, 0);
1795 ssize_t
1796 ACE_POSIX_SIG_Proactor::allocate_aio_slot (ACE_POSIX_Asynch_Result *result)
1798 size_t i = 0;
1800 //try to find free slot as usual, starting from 0
1801 for (i = 0; i < this->aiocb_list_max_size_; i++)
1802 if (result_list_[i] == 0)
1803 break;
1805 if (i >= this->aiocb_list_max_size_)
1806 ACELIB_ERROR_RETURN ((LM_ERROR,
1807 "%N:%l:(%P | %t)::\n"
1808 "ACE_POSIX_SIG_Proactor::allocate_aio_slot "
1809 "internal Proactor error 1\n"),
1810 -1);
1812 // setup OS notification methods for this aio
1813 // store index!!, not pointer in signal info
1814 result->aio_sigevent.sigev_notify = SIGEV_SIGNAL;
1815 result->aio_sigevent.sigev_signo = result->signal_number ();
1816 #if defined (ACE_HAS_SIGVAL_SIGVAL_INT)
1817 result->aio_sigevent.sigev_value.sigval_int = static_cast<int> (i);
1818 #else
1819 result->aio_sigevent.sigev_value.sival_int = static_cast<int> (i);
1820 #endif /* ACE_HAS_SIGVAL_SIGVAL_INT */
1822 return static_cast<ssize_t> (i);
1826 ACE_POSIX_SIG_Proactor::handle_events_i (const ACE_Time_Value *timeout)
1828 int result_sigwait = 0;
1829 siginfo_t sig_info;
1833 // Wait for the signals.
1834 if (timeout == 0)
1836 result_sigwait = ACE_OS::sigwaitinfo (&this->RT_completion_signals_,
1837 &sig_info);
1839 else
1841 result_sigwait = ACE_OS::sigtimedwait (&this->RT_completion_signals_,
1842 &sig_info,
1843 timeout);
1844 if (result_sigwait == -1 && errno == EAGAIN)
1845 return 0;
1848 while (result_sigwait == -1 && errno == EINTR);
1850 if (result_sigwait == -1) // Not a timeout, not EINTR: tell caller of error
1851 return -1;
1853 // Decide what to do. We always check the completion queue since it's an
1854 // easy, quick check. What is decided here is whether to check for
1855 // I/O completions and, if so, how completely to scan.
1856 int flg_aio = 0; // 1 if AIO Completion possible
1858 size_t index = 0; // start index to scan aiocb list
1859 size_t count = 1; // max number of aiocbs to scan
1860 int error_status = 0;
1861 size_t transfer_count = 0;
1863 if (sig_info.si_code == SI_ASYNCIO || this->os_id_ == ACE_OS_SUN_56)
1865 flg_aio = 1; // AIO signal received
1866 // define index to start
1867 // nothing will happen if it contains garbage
1868 #if defined (ACE_HAS_SIGVAL_SIGVAL_INT)
1869 index = static_cast<size_t> (sig_info.si_value.sigval_int);
1870 #else
1871 index = static_cast<size_t> (sig_info.si_value.sival_int);
1872 #endif /* ACE_HAS_SIGVAL_SIGVAL_INT */
1873 // Assume we have a correctly-functioning implementation, and that
1874 // there is one I/O to process, and it's correctly specified in the
1875 // siginfo received. There are, however, some special situations
1876 // where this isn't true...
1877 if (os_id_ == ACE_OS_SUN_56) // Solaris 6
1879 // 1. Solaris 6 always loses any RT signal,
1880 // if it has more SIGQUEMAX=32 pending signals
1881 // so we should scan the whole aiocb list
1882 // 2. Moreover,it has one more bad habit
1883 // to notify aio completion
1884 // with SI_QUEUE code instead of SI_ASYNCIO, hence the
1885 // OS_SUN_56 addition to the si_code check, above.
1886 count = aiocb_list_max_size_;
1889 else if (sig_info.si_code != SI_QUEUE)
1891 // Unknown signal code.
1892 // may some other third-party libraries could send it
1893 // or message queue could also generate it !
1894 // So print the message and check our completions
1895 ACELIB_ERROR ((LM_DEBUG,
1896 ACE_TEXT ("%N:%l:(%P | %t): ")
1897 ACE_TEXT ("ACE_POSIX_SIG_Proactor::handle_events: ")
1898 ACE_TEXT ("Unexpected signal code (%d) returned ")
1899 ACE_TEXT ("from sigwait; expecting %d\n"),
1900 result_sigwait, sig_info.si_code));
1901 flg_aio = 1;
1904 int ret_aio = 0;
1905 int ret_que = 0;
1907 if (flg_aio)
1908 for (;; ret_aio++)
1910 ACE_POSIX_Asynch_Result *asynch_result =
1911 find_completed_aio (error_status,
1912 transfer_count,
1913 index,
1914 count);
1916 if (asynch_result == 0)
1917 break;
1919 // Call the application code.
1920 this->application_specific_code (asynch_result,
1921 transfer_count,
1922 0, // No completion key.
1923 error_status); // Error
1926 // process post_completed results
1927 ret_que = this->process_result_queue ();
1929 // Uncomment this if you want to test
1930 // and research the behavior of you system
1931 #if 0
1932 ACELIB_DEBUG ((LM_DEBUG,
1933 "(%t) NumAIO=%d NumQueue=%d\n",
1934 ret_aio, ret_que));
1935 #endif
1937 return ret_aio + ret_que > 0 ? 1 : 0;
1940 #endif /* ACE_HAS_POSIX_REALTIME_SIGNALS */
1942 // *********************************************************************
1944 ACE_POSIX_Asynch_Timer::ACE_POSIX_Asynch_Timer
1945 (const ACE_Handler::Proxy_Ptr &handler_proxy,
1946 const void *act,
1947 const ACE_Time_Value &tv,
1948 ACE_HANDLE event,
1949 int priority,
1950 int signal_number)
1951 : ACE_POSIX_Asynch_Result
1952 (handler_proxy, act, event, 0, 0, priority, signal_number),
1953 time_ (tv)
1957 void
1958 ACE_POSIX_Asynch_Timer::complete (size_t /* bytes_transferred */,
1959 int /* success */,
1960 const void * /* completion_key */,
1961 u_long /* error */)
1963 ACE_Handler *handler = this->handler_proxy_.get ()->handler ();
1964 if (handler != 0)
1965 handler->handle_time_out (this->time_, this->act ());
1969 // *********************************************************************
1971 ACE_POSIX_Wakeup_Completion::ACE_POSIX_Wakeup_Completion
1972 (const ACE_Handler::Proxy_Ptr &handler_proxy,
1973 const void *act,
1974 ACE_HANDLE event,
1975 int priority,
1976 int signal_number)
1977 : ACE_Asynch_Result_Impl (),
1978 ACE_POSIX_Asynch_Result (handler_proxy,
1979 act,
1980 event,
1983 priority,
1984 signal_number)
1988 ACE_POSIX_Wakeup_Completion::~ACE_POSIX_Wakeup_Completion (void)
1992 void
1993 ACE_POSIX_Wakeup_Completion::complete (size_t /* bytes_transferred */,
1994 int /* success */,
1995 const void * /* completion_key */,
1996 u_long /* error */)
1999 ACE_Handler *handler = this->handler_proxy_.get ()->handler ();
2000 if (handler != 0)
2001 handler->handle_wakeup ();
2004 ACE_END_VERSIONED_NAMESPACE_DECL
2006 #endif /* ACE_HAS_AIO_CALLS */