Use =default for skeleton copy constructor
[ACE_TAO.git] / ACE / ace / POSIX_Proactor.cpp
blobe720c06dbe75f5d91d9c490f3aedcb6c0b7e10cc
1 #include "ace/POSIX_Proactor.h"
3 #if defined (ACE_HAS_AIO_CALLS)
5 #if !defined (__ACE_INLINE__)
6 #include "ace/POSIX_Proactor.inl"
7 #endif /* __ACE_INLINE__ */
9 #include "ace/ACE.h"
10 #include "ace/Flag_Manip.h"
11 #include "ace/Task_T.h"
12 #include "ace/Log_Category.h"
13 #include "ace/Object_Manager.h"
14 #include "ace/OS_NS_sys_socket.h"
15 #include "ace/OS_NS_signal.h"
16 #include "ace/OS_NS_unistd.h"
18 // *********************************************************************
20 ACE_BEGIN_VERSIONED_NAMESPACE_DECL
22 /**
23 * @class ACE_POSIX_Wakeup_Completion
25 * This result object is used by the <end_event_loop> of the
26 * ACE_Proactor interface to wake up all the threads blocking
27 * for completions.
29 class ACE_POSIX_Wakeup_Completion : public ACE_POSIX_Asynch_Result
31 public:
32 /// Constructor.
33 ACE_POSIX_Wakeup_Completion (const ACE_Handler::Proxy_Ptr &handler_proxy,
34 const void *act = 0,
35 ACE_HANDLE event = ACE_INVALID_HANDLE,
36 int priority = 0,
37 int signal_number = ACE_SIGRTMIN);
39 /// Destructor.
40 ~ACE_POSIX_Wakeup_Completion () override;
43 /// This method calls the <handler>'s <handle_wakeup> method.
44 void complete (size_t bytes_transferred = 0,
45 int success = 1,
46 const void *completion_key = 0,
47 u_long error = 0) override;
50 // *********************************************************************
51 ACE_POSIX_Proactor::ACE_POSIX_Proactor ()
52 : os_id_ (ACE_OS_UNDEFINED)
54 #if defined(__OpenBSD)
55 os_id_ = ACE_OS_OPENBSD; // set family
56 // do the same
58 //#else defined (LINUX, __FreeBSD__ ...)
59 //setup here os_id_
60 #endif
63 ACE_POSIX_Proactor::~ACE_POSIX_Proactor ()
65 this->close ();
68 int
69 ACE_POSIX_Proactor::close ()
71 return 0;
74 int
75 ACE_POSIX_Proactor::register_handle (ACE_HANDLE handle,
76 const void *completion_key)
78 ACE_UNUSED_ARG (handle);
79 ACE_UNUSED_ARG (completion_key);
80 return 0;
83 int
84 ACE_POSIX_Proactor::wake_up_dispatch_threads ()
86 return 0;
89 int
90 ACE_POSIX_Proactor::close_dispatch_threads (int)
92 return 0;
95 size_t
96 ACE_POSIX_Proactor::number_of_threads () const
98 // @@ Implement it.
99 ACE_NOTSUP_RETURN (0);
102 void
103 ACE_POSIX_Proactor::number_of_threads (size_t threads)
105 // @@ Implement it.
106 ACE_UNUSED_ARG (threads);
109 ACE_HANDLE
110 ACE_POSIX_Proactor::get_handle () const
112 return ACE_INVALID_HANDLE;
115 ACE_Asynch_Read_Stream_Impl *
116 ACE_POSIX_Proactor::create_asynch_read_stream ()
118 ACE_Asynch_Read_Stream_Impl *implementation = 0;
119 ACE_NEW_RETURN (implementation,
120 ACE_POSIX_Asynch_Read_Stream (this),
122 return implementation;
125 ACE_Asynch_Read_Stream_Result_Impl *
126 ACE_POSIX_Proactor::create_asynch_read_stream_result
127 (const ACE_Handler::Proxy_Ptr &handler_proxy,
128 ACE_HANDLE handle,
129 ACE_Message_Block &message_block,
130 size_t bytes_to_read,
131 const void* act,
132 ACE_HANDLE event,
133 int priority,
134 int signal_number)
136 ACE_Asynch_Read_Stream_Result_Impl *implementation;
137 ACE_NEW_RETURN (implementation,
138 ACE_POSIX_Asynch_Read_Stream_Result (handler_proxy,
139 handle,
140 message_block,
141 bytes_to_read,
142 act,
143 event,
144 priority,
145 signal_number),
147 return implementation;
151 ACE_Asynch_Write_Stream_Impl *
152 ACE_POSIX_Proactor::create_asynch_write_stream ()
154 ACE_Asynch_Write_Stream_Impl *implementation = 0;
155 ACE_NEW_RETURN (implementation,
156 ACE_POSIX_Asynch_Write_Stream (this),
158 return implementation;
161 ACE_Asynch_Write_Stream_Result_Impl *
162 ACE_POSIX_Proactor::create_asynch_write_stream_result
163 (const ACE_Handler::Proxy_Ptr &handler_proxy,
164 ACE_HANDLE handle,
165 ACE_Message_Block &message_block,
166 size_t bytes_to_write,
167 const void* act,
168 ACE_HANDLE event,
169 int priority,
170 int signal_number)
172 ACE_Asynch_Write_Stream_Result_Impl *implementation;
173 ACE_NEW_RETURN (implementation,
174 ACE_POSIX_Asynch_Write_Stream_Result (handler_proxy,
175 handle,
176 message_block,
177 bytes_to_write,
178 act,
179 event,
180 priority,
181 signal_number),
183 return implementation;
187 ACE_Asynch_Read_File_Impl *
188 ACE_POSIX_Proactor::create_asynch_read_file ()
190 ACE_Asynch_Read_File_Impl *implementation = 0;
191 ACE_NEW_RETURN (implementation,
192 ACE_POSIX_Asynch_Read_File (this),
194 return implementation;
197 ACE_Asynch_Read_File_Result_Impl *
198 ACE_POSIX_Proactor::create_asynch_read_file_result
199 (const ACE_Handler::Proxy_Ptr &handler_proxy,
200 ACE_HANDLE handle,
201 ACE_Message_Block &message_block,
202 size_t bytes_to_read,
203 const void* act,
204 u_long offset,
205 u_long offset_high,
206 ACE_HANDLE event,
207 int priority,
208 int signal_number)
210 ACE_Asynch_Read_File_Result_Impl *implementation;
211 ACE_NEW_RETURN (implementation,
212 ACE_POSIX_Asynch_Read_File_Result (handler_proxy,
213 handle,
214 message_block,
215 bytes_to_read,
216 act,
217 offset,
218 offset_high,
219 event,
220 priority,
221 signal_number),
223 return implementation;
227 ACE_Asynch_Write_File_Impl *
228 ACE_POSIX_Proactor::create_asynch_write_file ()
230 ACE_Asynch_Write_File_Impl *implementation = 0;
231 ACE_NEW_RETURN (implementation,
232 ACE_POSIX_Asynch_Write_File (this),
234 return implementation;
237 ACE_Asynch_Write_File_Result_Impl *
238 ACE_POSIX_Proactor::create_asynch_write_file_result
239 (const ACE_Handler::Proxy_Ptr &handler_proxy,
240 ACE_HANDLE handle,
241 ACE_Message_Block &message_block,
242 size_t bytes_to_write,
243 const void* act,
244 u_long offset,
245 u_long offset_high,
246 ACE_HANDLE event,
247 int priority,
248 int signal_number)
250 ACE_Asynch_Write_File_Result_Impl *implementation;
251 ACE_NEW_RETURN (implementation,
252 ACE_POSIX_Asynch_Write_File_Result (handler_proxy,
253 handle,
254 message_block,
255 bytes_to_write,
256 act,
257 offset,
258 offset_high,
259 event,
260 priority,
261 signal_number),
263 return implementation;
267 ACE_Asynch_Read_Dgram_Impl *
268 ACE_POSIX_Proactor::create_asynch_read_dgram ()
270 ACE_Asynch_Read_Dgram_Impl *implementation = 0;
271 ACE_NEW_RETURN (implementation,
272 ACE_POSIX_Asynch_Read_Dgram (this),
274 return implementation;
277 ACE_Asynch_Read_Dgram_Result_Impl *
278 ACE_POSIX_Proactor::create_asynch_read_dgram_result
279 (const ACE_Handler::Proxy_Ptr &handler_proxy,
280 ACE_HANDLE handle,
281 ACE_Message_Block *message_block,
282 size_t bytes_to_read,
283 int flags,
284 int protocol_family,
285 const void* act,
286 ACE_HANDLE event ,
287 int priority ,
288 int signal_number)
290 ACE_Asynch_Read_Dgram_Result_Impl *implementation=0;
291 ACE_NEW_RETURN (implementation,
292 ACE_POSIX_Asynch_Read_Dgram_Result(handler_proxy,
293 handle,
294 message_block,
295 bytes_to_read,
296 flags,
297 protocol_family,
298 act,
299 event,
300 priority,
301 signal_number),
304 return implementation;
308 ACE_Asynch_Write_Dgram_Impl *
309 ACE_POSIX_Proactor::create_asynch_write_dgram ()
311 ACE_Asynch_Write_Dgram_Impl *implementation = 0;
312 ACE_NEW_RETURN (implementation,
313 ACE_POSIX_Asynch_Write_Dgram (this),
316 return implementation;
319 ACE_Asynch_Write_Dgram_Result_Impl *
320 ACE_POSIX_Proactor::create_asynch_write_dgram_result
321 (const ACE_Handler::Proxy_Ptr &handler_proxy,
322 ACE_HANDLE handle,
323 ACE_Message_Block *message_block,
324 size_t bytes_to_write,
325 int flags,
326 const void* act,
327 ACE_HANDLE event,
328 int priority ,
329 int signal_number)
331 ACE_Asynch_Write_Dgram_Result_Impl *implementation=0;
332 ACE_NEW_RETURN (implementation,
333 ACE_POSIX_Asynch_Write_Dgram_Result(handler_proxy,
334 handle,
335 message_block,
336 bytes_to_write,
337 flags,
338 act,
339 event,
340 priority,
341 signal_number),
344 return implementation;
348 ACE_Asynch_Accept_Impl *
349 ACE_POSIX_Proactor::create_asynch_accept ()
351 ACE_Asynch_Accept_Impl *implementation = 0;
352 ACE_NEW_RETURN (implementation,
353 ACE_POSIX_Asynch_Accept (this),
356 return implementation;
359 ACE_Asynch_Accept_Result_Impl *
360 ACE_POSIX_Proactor::create_asynch_accept_result
361 (const ACE_Handler::Proxy_Ptr &handler_proxy,
362 ACE_HANDLE listen_handle,
363 ACE_HANDLE accept_handle,
364 ACE_Message_Block &message_block,
365 size_t bytes_to_read,
366 const void* act,
367 ACE_HANDLE event,
368 int priority,
369 int signal_number)
371 ACE_Asynch_Accept_Result_Impl *implementation;
372 ACE_NEW_RETURN (implementation,
373 ACE_POSIX_Asynch_Accept_Result (handler_proxy,
374 listen_handle,
375 accept_handle,
376 message_block,
377 bytes_to_read,
378 act,
379 event,
380 priority,
381 signal_number),
383 return implementation;
387 ACE_Asynch_Connect_Impl *
388 ACE_POSIX_Proactor::create_asynch_connect ()
390 ACE_Asynch_Connect_Impl *implementation = 0;
391 ACE_NEW_RETURN (implementation,
392 ACE_POSIX_Asynch_Connect (this),
395 return implementation;
398 ACE_Asynch_Connect_Result_Impl *
399 ACE_POSIX_Proactor::create_asynch_connect_result
400 (const ACE_Handler::Proxy_Ptr &handler_proxy,
401 ACE_HANDLE connect_handle,
402 const void* act,
403 ACE_HANDLE event,
404 int priority,
405 int signal_number)
407 ACE_Asynch_Connect_Result_Impl *implementation;
408 ACE_NEW_RETURN (implementation,
409 ACE_POSIX_Asynch_Connect_Result (handler_proxy,
410 connect_handle,
411 act,
412 event,
413 priority,
414 signal_number),
416 return implementation;
420 ACE_Asynch_Transmit_File_Impl *
421 ACE_POSIX_Proactor::create_asynch_transmit_file ()
423 ACE_Asynch_Transmit_File_Impl *implementation = 0;
424 ACE_NEW_RETURN (implementation,
425 ACE_POSIX_Asynch_Transmit_File (this),
427 return implementation;
430 ACE_Asynch_Transmit_File_Result_Impl *
431 ACE_POSIX_Proactor::create_asynch_transmit_file_result
432 (const ACE_Handler::Proxy_Ptr &handler_proxy,
433 ACE_HANDLE socket,
434 ACE_HANDLE file,
435 ACE_Asynch_Transmit_File::Header_And_Trailer *header_and_trailer,
436 size_t bytes_to_write,
437 u_long offset,
438 u_long offset_high,
439 size_t bytes_per_send,
440 u_long flags,
441 const void *act,
442 ACE_HANDLE event,
443 int priority,
444 int signal_number)
446 ACE_Asynch_Transmit_File_Result_Impl *implementation;
447 ACE_NEW_RETURN (implementation,
448 ACE_POSIX_Asynch_Transmit_File_Result (handler_proxy,
449 socket,
450 file,
451 header_and_trailer,
452 bytes_to_write,
453 offset,
454 offset_high,
455 bytes_per_send,
456 flags,
457 act,
458 event,
459 priority,
460 signal_number),
462 return implementation;
465 ACE_Asynch_Result_Impl *
466 ACE_POSIX_Proactor::create_asynch_timer
467 (const ACE_Handler::Proxy_Ptr &handler_proxy,
468 const void *act,
469 const ACE_Time_Value &tv,
470 ACE_HANDLE event,
471 int priority,
472 int signal_number)
474 ACE_POSIX_Asynch_Timer *implementation;
475 ACE_NEW_RETURN (implementation,
476 ACE_POSIX_Asynch_Timer (handler_proxy,
477 act,
479 event,
480 priority,
481 signal_number),
483 return implementation;
486 void
487 ACE_POSIX_Proactor::application_specific_code (ACE_POSIX_Asynch_Result *asynch_result,
488 size_t bytes_transferred,
489 const void */* completion_key*/,
490 u_long error)
492 ACE_SEH_TRY
494 // Call completion hook
495 asynch_result->complete (bytes_transferred,
496 error ? 0 : 1,
497 0, // No completion key.
498 error);
500 ACE_SEH_FINALLY
502 // This is crucial to prevent memory leaks
503 delete asynch_result;
508 ACE_POSIX_Proactor::post_wakeup_completions (int how_many)
510 ACE_POSIX_Wakeup_Completion *wakeup_completion = 0;
512 for (int ci = 0; ci < how_many; ci++)
514 ACE_NEW_RETURN
515 (wakeup_completion,
516 ACE_POSIX_Wakeup_Completion (this->wakeup_handler_.proxy ()),
517 -1);
518 if (this->post_completion (wakeup_completion) == -1)
519 return -1;
522 return 0;
525 ACE_POSIX_Proactor::Proactor_Type
526 ACE_POSIX_Proactor::get_impl_type ()
528 return PROACTOR_POSIX;
533 * @class ACE_AIOCB_Notify_Pipe_Manager
535 * @brief This class manages the notify pipe of the AIOCB Proactor.
537 * This class acts as the Handler for the
538 * <Asynch_Read> operations issued on the notify pipe. This
539 * class is very useful in implementing <Asynch_Accept> operation
540 * class for the <AIOCB_Proactor>. This is also useful for
541 * implementing <post_completion> for <AIOCB_Proactor>.
543 * <AIOCB_Proactor> class issues a <Asynch_Read> on
544 * the pipe, using this class as the
545 * Handler. <POSIX_Asynch_Result *>'s are sent through the
546 * notify pipe. When <POSIX_Asynch_Result *>'s show up on the
547 * notify pipe, the <POSIX_AIOCB_Proactor> dispatches the
548 * completion of the <Asynch_Read_Stream> and calls the
549 * <handle_read_stream> of this class. This class calls
550 * <complete> on the <POSIX_Asynch_Result *> and thus calls the
551 * application handler.
552 * Handling the MessageBlock:
553 * We give this message block to read the result pointer through
554 * the notify pipe. We expect that to read 4 bytes from the
555 * notify pipe, for each <accept> call. Before giving this
556 * message block to another <accept>, we update <wr_ptr> and put
557 * it in its initial position.
559 class ACE_AIOCB_Notify_Pipe_Manager : public ACE_Handler
561 public:
562 /// Constructor. You need the posix proactor because you need to call
563 /// <application_specific_code>
564 ACE_AIOCB_Notify_Pipe_Manager (ACE_POSIX_AIOCB_Proactor *posix_aiocb_proactor);
566 /// Destructor.
567 ~ACE_AIOCB_Notify_Pipe_Manager () override;
569 /// Send the result pointer through the notification pipe.
570 int notify ();
572 /// This is the call back method when <Asynch_Read> from the pipe is
573 /// complete.
574 void handle_read_stream (const ACE_Asynch_Read_Stream::Result &result) override;
576 private:
577 /// The implementation proactor class.
578 ACE_POSIX_AIOCB_Proactor *posix_aiocb_proactor_;
580 /// Message block to get ACE_POSIX_Asynch_Result pointer from the pipe.
581 ACE_Message_Block message_block_;
583 /// Pipe for the communication between Proactor and the
584 /// Asynch_Accept/Asynch_Connect and other post_completions
585 ACE_Pipe pipe_;
587 /// To do asynch_read on the pipe.
588 ACE_POSIX_Asynch_Read_Stream read_stream_;
590 /// Default constructor. Shouldnt be called.
591 ACE_AIOCB_Notify_Pipe_Manager ();
594 ACE_AIOCB_Notify_Pipe_Manager::ACE_AIOCB_Notify_Pipe_Manager (ACE_POSIX_AIOCB_Proactor *posix_aiocb_proactor)
595 : posix_aiocb_proactor_ (posix_aiocb_proactor),
596 message_block_ (sizeof (2)),
597 read_stream_ (posix_aiocb_proactor)
599 // Open the pipe.
600 if (this->pipe_.open () == -1)
601 ACELIB_ERROR ((LM_ERROR,
602 ACE_TEXT("%N:%l:%p\n"),
603 ACE_TEXT("ACE_AIOCB_Notify_Pipe_Manager::ACE_AIOCB_Notify_Pipe_Manager:")
604 ACE_TEXT("Open of pipe failed")));
606 // Set write side in NONBLOCK mode
607 ACE::set_flags (this->pipe_.write_handle (), ACE_NONBLOCK);
609 // Set read side in BLOCK mode
610 ACE::clr_flags (this->pipe_.read_handle (), ACE_NONBLOCK);
612 // Let AIOCB_Proactor know about our handle
613 posix_aiocb_proactor_->set_notify_handle (this->pipe_.read_handle ());
615 // Open the read stream.
616 if (this->read_stream_.open (this->proxy (),
617 this->pipe_.read_handle (),
618 0, // Completion Key
619 0) // Proactor
620 == -1)
621 ACELIB_ERROR ((LM_ERROR,
622 ACE_TEXT("%N:%l:%p\n"),
623 ACE_TEXT("ACE_AIOCB_Notify_Pipe_Manager::ACE_AIOCB_Notify_Pipe_Manager:")
624 ACE_TEXT("Open on Read Stream failed")));
626 // Issue an asynch_read on the read_stream of the notify pipe.
627 if (this->read_stream_.read (this->message_block_,
628 1, // enough to read 1 byte
629 0, // ACT
630 0) // Priority
631 == -1)
632 ACELIB_ERROR ((LM_ERROR,
633 ACE_TEXT("%N:%l:%p\n"),
634 ACE_TEXT("ACE_AIOCB_Notify_Pipe_Manager::ACE_AIOCB_Notify_Pipe_Manager:")
635 ACE_TEXT("Read from pipe failed")));
638 ACE_AIOCB_Notify_Pipe_Manager::~ACE_AIOCB_Notify_Pipe_Manager ()
640 // 1. try to cancel pending aio
641 this->read_stream_.cancel ();
643 // 2. close both handles
644 // Destuctor of ACE_Pipe does not close handles.
645 // We can not use ACE_Pipe::close() as it
646 // closes read_handle and than write_handle.
647 // In some systems close() may wait for
648 // completion for all asynch. pending requests.
649 // So we should close write_handle firstly
650 // to force read completion ( if 1. does not help )
651 // and then read_handle and not vice versa
653 ACE_HANDLE h = this->pipe_.write_handle ();
654 if (h != ACE_INVALID_HANDLE)
655 ACE_OS::closesocket (h);
657 h = this->pipe_.read_handle ();
658 if ( h != ACE_INVALID_HANDLE)
659 ACE_OS::closesocket (h);
664 ACE_AIOCB_Notify_Pipe_Manager::notify ()
666 // Send the result pointer through the pipe.
667 char char_send = 0;
668 ssize_t ret_val = ACE::send (this->pipe_.write_handle (),
669 &char_send,
670 sizeof (char_send));
672 if (ret_val < 0)
674 if (errno != EWOULDBLOCK)
675 #if 0
676 ACELIB_ERROR ((LM_ERROR,
677 ACE_TEXT ("(%P %t):%p\n"),
678 ACE_TEXT ("ACE_AIOCB_Notify_Pipe_Manager::notify")
679 ACE_TEXT ("Error:Writing on to notify pipe failed")));
680 #endif /* 0 */
681 return -1;
684 return 0;
687 void
688 ACE_AIOCB_Notify_Pipe_Manager::handle_read_stream
689 (const ACE_Asynch_Read_Stream::Result & /*result*/)
691 // 1. Start new read to avoid pipe overflow
693 // Set the message block properly. Put the <wr_ptr> back in the
694 // initial position.
695 if (this->message_block_.length () > 0)
696 this->message_block_.wr_ptr (this->message_block_.rd_ptr ());
698 // One accept has completed. Issue a read to handle any
699 // <post_completion>s in the future.
700 if (-1 == this->read_stream_.read (this->message_block_,
701 1, // enough to read 1 byte
702 0, // ACT
703 0)) // Priority
704 ACELIB_ERROR ((LM_ERROR,
705 ACE_TEXT ("%N:%l:(%P | %t):%p\n"),
706 ACE_TEXT ("ACE_AIOCB_Notify_Pipe_Manager::handle_read_stream:")
707 ACE_TEXT ("Read from pipe failed")));
710 // 2. Do the upcalls
711 // this->posix_aiocb_proactor_->process_result_queue ();
714 // Public constructor for common use.
715 ACE_POSIX_AIOCB_Proactor::ACE_POSIX_AIOCB_Proactor (size_t max_aio_operations)
716 : aiocb_notify_pipe_manager_ (0),
717 aiocb_list_ (0),
718 result_list_ (0),
719 aiocb_list_max_size_ (max_aio_operations),
720 aiocb_list_cur_size_ (0),
721 notify_pipe_read_handle_ (ACE_INVALID_HANDLE),
722 num_deferred_aiocb_ (0),
723 num_started_aio_ (0)
725 // Check for correct value for max_aio_operations
726 check_max_aio_num ();
728 this->create_result_aiocb_list ();
730 this->create_notify_manager ();
732 // start pseudo-asynchronous accept task
733 // one per all future acceptors
734 this->get_asynch_pseudo_task().start ();
737 // Special protected constructor for ACE_SUN_Proactor
738 ACE_POSIX_AIOCB_Proactor::ACE_POSIX_AIOCB_Proactor (size_t max_aio_operations,
739 ACE_POSIX_Proactor::Proactor_Type)
740 : aiocb_notify_pipe_manager_ (0),
741 aiocb_list_ (0),
742 result_list_ (0),
743 aiocb_list_max_size_ (max_aio_operations),
744 aiocb_list_cur_size_ (0),
745 notify_pipe_read_handle_ (ACE_INVALID_HANDLE),
746 num_deferred_aiocb_ (0),
747 num_started_aio_ (0)
749 //check for correct value for max_aio_operations
750 this->check_max_aio_num ();
752 this->create_result_aiocb_list ();
754 // @@ We should create Notify_Pipe_Manager in the derived class to
755 // provide correct calls for virtual functions !!!
758 // Destructor.
759 ACE_POSIX_AIOCB_Proactor::~ACE_POSIX_AIOCB_Proactor ()
761 this->close();
764 ACE_POSIX_Proactor::Proactor_Type
765 ACE_POSIX_AIOCB_Proactor::get_impl_type ()
767 return PROACTOR_AIOCB;
772 ACE_POSIX_AIOCB_Proactor::close ()
774 // stop asynch accept task
775 this->get_asynch_pseudo_task().stop ();
777 this->delete_notify_manager ();
779 this->clear_result_queue ();
781 return this->delete_result_aiocb_list ();
784 void ACE_POSIX_AIOCB_Proactor::set_notify_handle (ACE_HANDLE h)
786 notify_pipe_read_handle_ = h;
789 int ACE_POSIX_AIOCB_Proactor::create_result_aiocb_list ()
791 if (aiocb_list_ != 0)
792 return 0;
794 ACE_NEW_RETURN (aiocb_list_, aiocb *[aiocb_list_max_size_], -1);
796 ACE_NEW_RETURN (result_list_,
797 ACE_POSIX_Asynch_Result *[aiocb_list_max_size_],
798 -1);
800 // Initialize the array.
801 for (size_t ai = 0; ai < this->aiocb_list_max_size_; ai++)
803 aiocb_list_[ai] = 0;
804 result_list_[ai] = 0;
807 return 0;
810 int ACE_POSIX_AIOCB_Proactor::delete_result_aiocb_list ()
812 if (aiocb_list_ == 0) // already deleted
813 return 0;
815 size_t ai;
817 // Try to cancel all uncompleted operations; POSIX systems may have
818 // hidden system threads that still can work with our aiocbs!
819 for (ai = 0; ai < aiocb_list_max_size_; ai++)
820 if (this->aiocb_list_[ai] != 0) // active operation
821 this->cancel_aiocb (result_list_[ai]);
823 int num_pending = 0;
825 for (ai = 0; ai < aiocb_list_max_size_; ai++)
827 if (this->aiocb_list_[ai] == 0 ) // not active operation
828 continue;
830 // Get the error and return status of the aio_ operation.
831 int error_status = 0;
832 size_t transfer_count = 0;
833 int flg_completed = this->get_result_status (result_list_[ai],
834 error_status,
835 transfer_count);
837 //don't delete uncompleted AIOCB's
838 if (flg_completed == 0) // not completed !!!
840 num_pending++;
841 #if 0
842 char * errtxt = ACE_OS::strerror (error_status);
843 if (errtxt == 0)
844 errtxt ="?????????";
846 char * op = (aiocb_list_[ai]->aio_lio_opcode == LIO_WRITE )?
847 "WRITE":"READ" ;
850 ACELIB_ERROR ((LM_ERROR,
851 ACE_TEXT("slot=%d op=%s status=%d xfercnt=%d %s\n"),
854 error_status,
855 transfer_count,
856 errtxt));
857 #endif /* 0 */
859 else // completed , OK
861 delete this->result_list_[ai];
862 this->result_list_[ai] = 0;
863 this->aiocb_list_[ai] = 0;
867 // If it is not possible cancel some operation (num_pending > 0 ),
868 // we can do only one thing -report about this
869 // and complain about POSIX implementation.
870 // We know that we have memory leaks, but it is better than
871 // segmentation fault!
872 ACELIB_DEBUG
873 ((LM_DEBUG,
874 ACE_TEXT("ACE_POSIX_AIOCB_Proactor::delete_result_aiocb_list\n")
875 ACE_TEXT(" number pending AIO=%d\n"),
876 num_pending));
878 delete [] this->aiocb_list_;
879 this->aiocb_list_ = 0;
881 delete [] this->result_list_;
882 this->result_list_ = 0;
884 return (num_pending == 0 ? 0 : -1);
885 // ?? or just always return 0;
888 void ACE_POSIX_AIOCB_Proactor::check_max_aio_num ()
890 #if !defined (ACE_ANDROID)
891 // Android API 23 introduced a define _POSIX_AIO_MAX 1 which gets used by _SC_AIO_MAX.
892 // Previously, without the define, the value returned was -1, which got ignored.
893 // Officially, the Android OS does not support AIO so if ACE_HAS_AIO_CALLS is defined
894 // then a 3rd party library must be in use and this check is invalid.
896 long max_os_aio_num = ACE_OS::sysconf (_SC_AIO_MAX);
898 // Define max limit AIO's for concrete OS
899 // -1 means that there is no limit, but it is not true
900 // (example, SunOS 5.6)
902 if (max_os_aio_num > 0 &&
903 aiocb_list_max_size_ > (unsigned long) max_os_aio_num)
904 aiocb_list_max_size_ = max_os_aio_num;
905 #endif
907 #if defined (__FreeBSD__)
908 long max_os_listio_num = ACE_OS::sysconf (_SC_AIO_LISTIO_MAX);
909 if (max_os_listio_num > 0
910 && aiocb_list_max_size_ > (unsigned long) max_os_listio_num)
911 aiocb_list_max_size_ = max_os_listio_num;
912 #endif /* __FreeBSD__ */
914 // check for user-defined value
915 // ACE_AIO_MAX_SIZE if defined in POSIX_Proactor.h
917 if (aiocb_list_max_size_ <= 0
918 || aiocb_list_max_size_ > ACE_AIO_MAX_SIZE)
919 aiocb_list_max_size_ = ACE_AIO_MAX_SIZE;
921 // check for max number files to open
923 int max_num_files = ACE::max_handles ();
925 if (max_num_files > 0
926 && aiocb_list_max_size_ > (unsigned long) max_num_files)
928 ACE::set_handle_limit (aiocb_list_max_size_);
930 max_num_files = ACE::max_handles ();
933 if (max_num_files > 0
934 && aiocb_list_max_size_ > (unsigned long) max_num_files)
935 aiocb_list_max_size_ = (unsigned long) max_num_files;
937 ACELIB_DEBUG ((LM_DEBUG,
938 "(%P | %t) ACE_POSIX_AIOCB_Proactor::Max Number of AIOs=%d\n",
939 aiocb_list_max_size_));
942 void
943 ACE_POSIX_AIOCB_Proactor::create_notify_manager ()
945 // Remember! this issues a Asynch_Read
946 // on the notify pipe for doing the Asynch_Accept/Connect.
948 if (aiocb_notify_pipe_manager_ == 0)
949 ACE_NEW (aiocb_notify_pipe_manager_,
950 ACE_AIOCB_Notify_Pipe_Manager (this));
953 void
954 ACE_POSIX_AIOCB_Proactor::delete_notify_manager ()
956 // We are responsible for delete as all pointers set to 0 after
957 // delete, it is save to delete twice
958 delete aiocb_notify_pipe_manager_;
959 aiocb_notify_pipe_manager_ = 0;
963 ACE_POSIX_AIOCB_Proactor::handle_events (ACE_Time_Value &wait_time)
965 // Decrement <wait_time> with the amount of time spent in the method
966 ACE_Countdown_Time countdown (&wait_time);
967 return this->handle_events_i (wait_time.msec ());
971 ACE_POSIX_AIOCB_Proactor::handle_events ()
973 return this->handle_events_i (ACE_INFINITE);
977 ACE_POSIX_AIOCB_Proactor::notify_completion(int sig_num)
979 ACE_UNUSED_ARG (sig_num);
981 return this->aiocb_notify_pipe_manager_->notify ();
985 ACE_POSIX_AIOCB_Proactor::post_completion (ACE_POSIX_Asynch_Result *result)
987 ACE_MT (ACE_GUARD_RETURN (ACE_SYNCH_MUTEX, ace_mon, this->mutex_, -1));
989 int ret_val = this->putq_result (result);
991 return ret_val;
995 ACE_POSIX_AIOCB_Proactor::putq_result (ACE_POSIX_Asynch_Result *result)
997 // this protected method should be called with locked mutex_
998 // we can't use GUARD as Proactor uses non-recursive mutex
1000 if (!result)
1001 return -1;
1003 int sig_num = result->signal_number ();
1004 int ret_val = this->result_queue_.enqueue_tail (result);
1006 if (ret_val == -1)
1007 ACELIB_ERROR_RETURN ((LM_ERROR,
1008 "%N:%l:ACE_POSIX_AIOCB_Proactor::putq_result failed\n"),
1009 -1);
1011 this->notify_completion (sig_num);
1013 return 0;
1016 ACE_POSIX_Asynch_Result * ACE_POSIX_AIOCB_Proactor::getq_result ()
1018 ACE_MT (ACE_GUARD_RETURN (ACE_SYNCH_MUTEX, ace_mon, this->mutex_, 0));
1021 ACE_POSIX_Asynch_Result* result = 0;
1023 if (this->result_queue_.dequeue_head (result) != 0)
1024 return 0;
1026 // don't waste time if queue is empty - it is normal
1027 // or check queue size before dequeue_head
1028 // ACELIB_ERROR_RETURN ((LM_ERROR,
1029 // ACE_TEXT("%N:%l:(%P | %t):%p\n"),
1030 // ACE_TEXT("ACE_POSIX_AIOCB_Proactor::getq_result failed")),
1031 // 0);
1033 return result;
1036 int ACE_POSIX_AIOCB_Proactor::clear_result_queue ()
1038 int ret_val = 0;
1039 ACE_POSIX_Asynch_Result* result = 0;
1041 while ((result = this->getq_result ()) != 0)
1043 delete result;
1044 ret_val++;
1047 return ret_val;
1050 int ACE_POSIX_AIOCB_Proactor::process_result_queue ()
1052 int ret_val = 0;
1053 ACE_POSIX_Asynch_Result* result = 0;
1055 while ((result = this->getq_result ()) != 0)
1057 this->application_specific_code
1058 (result,
1059 result->bytes_transferred(), // 0, No bytes transferred.
1060 0, // No completion key.
1061 result->error()); //0, No error.
1063 ret_val++;
1066 return ret_val;
1070 ACE_POSIX_AIOCB_Proactor::handle_events_i (u_long milli_seconds)
1072 int result_suspend = 0;
1073 int retval= 0;
1075 if (milli_seconds == ACE_INFINITE)
1076 // Indefinite blocking.
1077 result_suspend = aio_suspend (aiocb_list_,
1078 aiocb_list_max_size_,
1080 else
1082 // Block on <aio_suspend> for <milli_seconds>
1083 timespec timeout;
1084 timeout.tv_sec = milli_seconds / 1000;
1085 timeout.tv_nsec = (milli_seconds - (timeout.tv_sec * 1000)) * 1000000;
1086 result_suspend = aio_suspend (aiocb_list_,
1087 aiocb_list_max_size_,
1088 &timeout);
1091 // Check for errors
1092 if (result_suspend == -1)
1094 if (errno != EAGAIN && // Timeout
1095 errno != EINTR ) // Interrupted call
1096 ACELIB_ERROR ((LM_ERROR,
1097 ACE_TEXT ("%N:%l:(%P|%t)::%p\n"),
1098 ACE_TEXT ("handle_events: aio_suspend failed")));
1099 // let continue work
1100 // we should check "post_completed" queue
1102 else
1104 size_t index = 0;
1105 size_t count = aiocb_list_max_size_; // max number to iterate
1106 int error_status = 0;
1107 size_t transfer_count = 0;
1109 for (;; retval++)
1111 ACE_POSIX_Asynch_Result *asynch_result =
1112 find_completed_aio (error_status,
1113 transfer_count,
1114 index,
1115 count);
1117 if (asynch_result == 0)
1118 break;
1120 // Call the application code.
1121 this->application_specific_code (asynch_result,
1122 transfer_count,
1123 0, // No completion key.
1124 error_status);
1128 // process post_completed results
1129 retval += this->process_result_queue ();
1131 return retval > 0 ? 1 : 0;
1135 ACE_POSIX_AIOCB_Proactor::get_result_status (ACE_POSIX_Asynch_Result *asynch_result,
1136 int &error_status,
1137 size_t &transfer_count)
1139 transfer_count = 0;
1141 // Get the error status of the aio_ operation.
1142 // The following aio_ptr anathema is required to work around a bug in an over-aggressive
1143 // optimizer in GCC 4.1.2.
1144 aiocb *aio_ptr (asynch_result);
1145 error_status = aio_error (aio_ptr);
1146 if (error_status == EINPROGRESS)
1147 return 0; // not completed
1149 ssize_t op_return = aio_return (aio_ptr);
1150 if (op_return > 0)
1151 transfer_count = static_cast<size_t> (op_return);
1152 // else transfer_count is already 0, error_status reports the error.
1153 return 1; // completed
1156 ACE_POSIX_Asynch_Result *
1157 ACE_POSIX_AIOCB_Proactor::find_completed_aio (int &error_status,
1158 size_t &transfer_count,
1159 size_t &index,
1160 size_t &count)
1162 // parameter index defines initial slot to scan
1163 // parameter count tells us how many slots should we scan
1165 ACE_MT (ACE_GUARD_RETURN (ACE_Thread_Mutex, ace_mon, this->mutex_, 0));
1167 ACE_POSIX_Asynch_Result *asynch_result = 0;
1169 if (num_started_aio_ == 0) // save time
1170 return 0;
1172 for (; count > 0; index++ , count--)
1174 if (index >= aiocb_list_max_size_) // like a wheel
1175 index = 0;
1177 if (aiocb_list_[index] == 0) // Dont process null blocks.
1178 continue;
1180 if (0 != this->get_result_status (result_list_[index],
1181 error_status,
1182 transfer_count)) // completed
1183 break;
1184 } // end for
1186 if (count == 0) // all processed , nothing found
1187 return 0;
1188 asynch_result = result_list_[index];
1190 aiocb_list_[index] = 0;
1191 result_list_[index] = 0;
1192 aiocb_list_cur_size_--;
1194 num_started_aio_--; // decrement count active aios
1195 index++; // for next iteration
1196 count--; // for next iteration
1198 this->start_deferred_aio ();
1199 //make attempt to start deferred AIO
1200 //It is safe as we are protected by mutex_
1202 return asynch_result;
1207 ACE_POSIX_AIOCB_Proactor::start_aio (ACE_POSIX_Asynch_Result *result,
1208 ACE_POSIX_Proactor::Opcode op)
1210 ACE_TRACE ("ACE_POSIX_AIOCB_Proactor::start_aio");
1212 ACE_MT (ACE_GUARD_RETURN (ACE_Thread_Mutex, ace_mon, this->mutex_, -1));
1214 int ret_val = (aiocb_list_cur_size_ >= aiocb_list_max_size_) ? -1 : 0;
1216 if (result == 0) // Just check the status of the list
1217 return ret_val;
1219 // Save operation code in the aiocb
1220 switch (op)
1222 case ACE_POSIX_Proactor::ACE_OPCODE_READ:
1223 result->aio_lio_opcode = LIO_READ;
1224 break;
1226 case ACE_POSIX_Proactor::ACE_OPCODE_WRITE:
1227 result->aio_lio_opcode = LIO_WRITE;
1228 break;
1230 default:
1231 ACELIB_ERROR_RETURN ((LM_ERROR,
1232 ACE_TEXT ("%N:%l:(%P|%t)::")
1233 ACE_TEXT ("start_aio: Invalid op code %d\n"),
1234 op),
1235 -1);
1238 if (ret_val != 0) // No free slot
1240 errno = EAGAIN;
1241 return -1;
1244 // Find a free slot and store.
1246 ssize_t slot = allocate_aio_slot (result);
1248 if (slot < 0)
1249 return -1;
1251 size_t index = static_cast<size_t> (slot);
1253 result_list_[index] = result; //Store result ptr anyway
1254 aiocb_list_cur_size_++;
1256 ret_val = start_aio_i (result);
1257 switch (ret_val)
1259 case 0: // started OK
1260 aiocb_list_[index] = result;
1261 return 0;
1263 case 1: // OS AIO queue overflow
1264 num_deferred_aiocb_ ++;
1265 return 0;
1267 default: // Invalid request, there is no point
1268 break; // to start it later
1271 result_list_[index] = 0;
1272 aiocb_list_cur_size_--;
1273 return -1;
1276 ssize_t
1277 ACE_POSIX_AIOCB_Proactor::allocate_aio_slot (ACE_POSIX_Asynch_Result *result)
1279 size_t i = 0;
1281 // we reserve zero slot for ACE_AIOCB_Notify_Pipe_Manager
1282 // so make check for ACE_AIOCB_Notify_Pipe_Manager request
1284 if (notify_pipe_read_handle_ == result->aio_fildes) // Notify_Pipe ?
1285 { // should be free,
1286 if (result_list_[i] != 0) // only 1 request
1287 { // is allowed
1288 errno = EAGAIN;
1289 ACELIB_ERROR_RETURN ((LM_ERROR,
1290 "%N:%l:(%P | %t)::\n"
1291 "ACE_POSIX_AIOCB_Proactor::allocate_aio_slot:"
1292 "internal Proactor error 0\n"),
1293 -1);
1296 else //try to find free slot as usual, but starting from 1
1298 for (i= 1; i < this->aiocb_list_max_size_; i++)
1299 if (result_list_[i] == 0)
1300 break;
1303 if (i >= this->aiocb_list_max_size_)
1304 ACELIB_ERROR_RETURN ((LM_ERROR,
1305 "%N:%l:(%P | %t)::\n"
1306 "ACE_POSIX_AIOCB_Proactor::allocate_aio_slot:"
1307 "internal Proactor error 1\n"),
1308 -1);
1310 //setup OS notification methods for this aio
1311 result->aio_sigevent.sigev_notify = SIGEV_NONE;
1313 return static_cast<ssize_t> (i);
1316 // start_aio_i has new return codes
1317 // 0 AIO was started successfully
1318 // 1 AIO was not started, OS AIO queue overflow
1319 // -1 AIO was not started, other errors
1322 ACE_POSIX_AIOCB_Proactor::start_aio_i (ACE_POSIX_Asynch_Result *result)
1324 ACE_TRACE ("ACE_POSIX_AIOCB_Proactor::start_aio_i");
1326 int ret_val;
1327 const ACE_TCHAR *ptype = 0;
1329 // Start IO
1330 // The following aio_ptr anathema is required to work around a bug in
1331 // the optimizer for GCC 4.1.2
1332 aiocb * aio_ptr (result);
1333 switch (result->aio_lio_opcode )
1335 case LIO_READ :
1336 ptype = ACE_TEXT ("read ");
1337 ret_val = aio_read (aio_ptr);
1338 break;
1339 case LIO_WRITE :
1340 ptype = ACE_TEXT ("write");
1341 ret_val = aio_write (aio_ptr);
1342 break;
1343 default:
1344 ptype = ACE_TEXT ("?????");
1345 ret_val = -1;
1346 break;
1349 if (ret_val == 0)
1351 ++this->num_started_aio_;
1353 else // if (ret_val == -1)
1355 if (errno == EAGAIN || errno == ENOMEM) //Ok, it will be deferred AIO
1356 ret_val = 1;
1357 else
1358 ACELIB_ERROR ((LM_ERROR,
1359 ACE_TEXT ("%N:%l:(%P | %t)::start_aio_i: aio_%s %p\n"),
1360 ptype,
1361 ACE_TEXT ("queueing failed")));
1364 return ret_val;
1369 ACE_POSIX_AIOCB_Proactor::start_deferred_aio ()
1371 ACE_TRACE ("ACE_POSIX_AIOCB_Proactor::start_deferred_aio");
1373 // This protected method is called from
1374 // find_completed_aio after any AIO completion
1375 // We should call this method always with locked
1376 // ACE_POSIX_AIOCB_Proactor::mutex_
1378 // It tries to start the first deferred AIO
1379 // if such exists
1381 if (num_deferred_aiocb_ == 0)
1382 return 0; // nothing to do
1384 size_t i = 0;
1386 for (i= 0; i < this->aiocb_list_max_size_; i++)
1387 if (result_list_[i] !=0 // check for
1388 && aiocb_list_[i] ==0) // deferred AIO
1389 break;
1391 if (i >= this->aiocb_list_max_size_)
1392 ACELIB_ERROR_RETURN ((LM_ERROR,
1393 "%N:%l:(%P | %t)::\n"
1394 "start_deferred_aio:"
1395 "internal Proactor error 3\n"),
1396 -1);
1398 ACE_POSIX_Asynch_Result *result = result_list_[i];
1400 int ret_val = start_aio_i (result);
1402 switch (ret_val)
1404 case 0 : //started OK , decrement count of deferred AIOs
1405 aiocb_list_[i] = result;
1406 num_deferred_aiocb_ --;
1407 return 0;
1409 case 1 :
1410 return 0; //try again later
1412 default : // Invalid Parameters , should never be
1413 break;
1416 //AL notify user
1418 result_list_[i] = 0;
1419 --aiocb_list_cur_size_;
1421 --num_deferred_aiocb_;
1423 result->set_error (errno);
1424 result->set_bytes_transferred (0);
1425 this->putq_result (result); // we are with locked mutex_ here !
1427 return -1;
1431 ACE_POSIX_AIOCB_Proactor::cancel_aio (ACE_HANDLE handle)
1433 // This new method should be called from
1434 // ACE_POSIX_Asynch_Operation instead of usual ::aio_cancel
1435 // It scans the result_list_ and defines all AIO requests
1436 // that were issued for handle "handle"
1438 // For all deferred AIO requests with handle "handle"
1439 // it removes its from the lists and notifies user
1441 // For all running AIO requests with handle "handle"
1442 // it calls ::aio_cancel. According to the POSIX standards
1443 // we will receive ECANCELED for all ::aio_canceled AIO requests
1444 // later on return from ::aio_suspend
1446 ACE_TRACE ("ACE_POSIX_AIOCB_Proactor::cancel_aio");
1448 int num_total = 0;
1449 int num_cancelled = 0;
1452 ACE_MT (ACE_GUARD_RETURN (ACE_Thread_Mutex, ace_mon, this->mutex_, -1));
1454 size_t ai = 0;
1456 for (ai = 0; ai < this->aiocb_list_max_size_; ai++)
1458 if (this->result_list_[ai] == 0) // Skip empty slot
1459 continue;
1461 if (this->result_list_[ai]->aio_fildes != handle) // Not ours
1462 continue;
1464 ++num_total;
1466 ACE_POSIX_Asynch_Result *asynch_result = this->result_list_[ai];
1468 if (this->aiocb_list_[ai] == 0) // Canceling a deferred operation
1470 num_cancelled++;
1471 this->num_deferred_aiocb_--;
1473 this->aiocb_list_[ai] = 0;
1474 this->result_list_[ai] = 0;
1475 this->aiocb_list_cur_size_--;
1477 asynch_result->set_error (ECANCELED);
1478 asynch_result->set_bytes_transferred (0);
1479 this->putq_result (asynch_result);
1480 // we are with locked mutex_ here !
1482 else // Cancel started aio
1484 int rc_cancel = this->cancel_aiocb (asynch_result);
1486 if (rc_cancel == 0) //notification in the future
1487 num_cancelled++; //it is OS responsiblity
1491 } // release mutex_
1493 if (num_total == 0)
1494 return 1; // ALLDONE
1496 if (num_cancelled == num_total)
1497 return 0; // CANCELLED
1499 return 2; // NOT CANCELLED
1503 ACE_POSIX_AIOCB_Proactor::cancel_aiocb (ACE_POSIX_Asynch_Result * result)
1505 // This method is called from cancel_aio
1506 // to cancel a previously submitted AIO request
1507 int rc = ::aio_cancel (0, result);
1509 // Check the return value and return 0/1/2 appropriately.
1510 if (rc == AIO_CANCELED)
1511 return 0;
1512 else if (rc == AIO_ALLDONE)
1513 return 1;
1514 else // (rc == AIO_NOTCANCELED)
1515 return 2;
1519 // *********************************************************************
1521 #if defined(ACE_HAS_POSIX_REALTIME_SIGNALS)
1523 ACE_POSIX_SIG_Proactor::ACE_POSIX_SIG_Proactor (size_t max_aio_operations)
1524 : ACE_POSIX_AIOCB_Proactor (max_aio_operations,
1525 ACE_POSIX_Proactor::PROACTOR_SIG)
1527 // = Set up the mask we'll use to block waiting for SIGRTMIN. Use that
1528 // to add it to the signal mask for this thread, and also set the process
1529 // signal action to pass signal information when we want it.
1531 // Clear the signal set.
1532 ACE_OS::sigemptyset (&this->RT_completion_signals_);
1534 // Add the signal number to the signal set.
1535 if (ACE_OS::sigaddset (&this->RT_completion_signals_, ACE_SIGRTMIN) == -1)
1536 ACELIB_ERROR ((LM_ERROR, ACE_TEXT ("ACE_POSIX_SIG_Proactor: %p\n"),
1537 ACE_TEXT ("sigaddset")));
1538 this->block_signals ();
1539 // Set up the signal action for SIGRTMIN.
1540 this->setup_signal_handler (ACE_SIGRTMIN);
1542 // we do not have to create notify manager
1543 // but we should start pseudo-asynchronous accept task
1544 // one per all future acceptors
1546 this->get_asynch_pseudo_task().start ();
1547 return;
1550 ACE_POSIX_SIG_Proactor::ACE_POSIX_SIG_Proactor (const sigset_t signal_set,
1551 size_t max_aio_operations)
1552 : ACE_POSIX_AIOCB_Proactor (max_aio_operations,
1553 ACE_POSIX_Proactor::PROACTOR_SIG)
1555 // = Keep <Signal_set> with the Proactor, mask all the signals and
1556 // setup signal actions for the signals in the <signal_set>.
1558 // = Keep <signal_set> with the Proactor.
1560 // Empty the signal set first.
1561 if (sigemptyset (&this->RT_completion_signals_) == -1)
1562 ACELIB_ERROR ((LM_ERROR,
1563 ACE_TEXT("Error:(%P | %t):%p\n"),
1564 ACE_TEXT("sigemptyset failed")));
1566 // For each signal number present in the <signal_set>, add it to
1567 // the signal set we use, and also set up its process signal action
1568 // to allow signal info to be passed into sigwait/sigtimedwait.
1569 int member = 0;
1570 for (int si = ACE_SIGRTMIN; si <= ACE_SIGRTMAX; si++)
1572 member = sigismember (&signal_set,
1573 si);
1574 if (member == -1)
1575 ACELIB_ERROR ((LM_ERROR,
1576 ACE_TEXT("%N:%l:(%P | %t)::%p\n"),
1577 ACE_TEXT("ACE_POSIX_SIG_Proactor::ACE_POSIX_SIG_Proactor:")
1578 ACE_TEXT("sigismember failed")));
1579 else if (member == 1)
1581 sigaddset (&this->RT_completion_signals_, si);
1582 this->setup_signal_handler (si);
1586 // Mask all the signals.
1587 this->block_signals ();
1589 // we do not have to create notify manager
1590 // but we should start pseudo-asynchronous accept task
1591 // one per all future acceptors
1593 this->get_asynch_pseudo_task().start ();
1594 return;
1597 ACE_POSIX_SIG_Proactor::~ACE_POSIX_SIG_Proactor ()
1599 this->close ();
1601 // @@ Enable the masked signals again.
1604 ACE_POSIX_Proactor::Proactor_Type
1605 ACE_POSIX_SIG_Proactor::get_impl_type ()
1607 return PROACTOR_SIG;
1611 ACE_POSIX_SIG_Proactor::handle_events (ACE_Time_Value &wait_time)
1613 // Decrement <wait_time> with the amount of time spent in the method
1614 ACE_Countdown_Time countdown (&wait_time);
1615 return this->handle_events_i (&wait_time);
1619 ACE_POSIX_SIG_Proactor::handle_events ()
1621 return this->handle_events_i (0);
1625 ACE_POSIX_SIG_Proactor::notify_completion (int sig_num)
1627 // Get this process id.
1628 pid_t const pid = ACE_OS::getpid ();
1629 if (pid == (pid_t) -1)
1630 ACELIB_ERROR_RETURN ((LM_ERROR,
1631 ACE_TEXT("Error:%N:%l(%P | %t):%p"),
1632 ACE_TEXT("<getpid> failed")),
1633 -1);
1635 // Set the signal information.
1636 sigval value;
1637 #if defined (ACE_HAS_SIGVAL_SIGVAL_INT)
1638 value.sigval_int = -1;
1639 #else
1640 value.sival_int = -1;
1641 #endif /* ACE_HAS_SIGVAL_SIGVAL_INT */
1643 // Queue the signal.
1644 if (sigqueue (pid, sig_num, value) == 0)
1645 return 0;
1647 if (errno != EAGAIN)
1648 ACELIB_ERROR_RETURN ((LM_ERROR,
1649 ACE_TEXT("Error:%N:%l:(%P | %t):%p\n"),
1650 ACE_TEXT("<sigqueue> failed")),
1651 -1);
1652 return -1;
1655 ACE_Asynch_Result_Impl *
1656 ACE_POSIX_SIG_Proactor::create_asynch_timer
1657 (const ACE_Handler::Proxy_Ptr &handler_proxy,
1658 const void *act,
1659 const ACE_Time_Value &tv,
1660 ACE_HANDLE event,
1661 int priority,
1662 int signal_number)
1664 int is_member = 0;
1666 // Fix the signal number.
1667 if (signal_number == -1)
1669 int si;
1670 for (si = ACE_SIGRTMAX;
1671 (is_member == 0) && (si >= ACE_SIGRTMIN);
1672 si--)
1674 is_member = sigismember (&this->RT_completion_signals_,
1675 si);
1676 if (is_member == -1)
1677 ACELIB_ERROR_RETURN ((LM_ERROR,
1678 "%N:%l:(%P | %t)::%s\n",
1679 "ACE_POSIX_SIG_Proactor::create_asynch_timer:"
1680 "sigismember failed"),
1684 if (is_member == 0)
1685 ACELIB_ERROR_RETURN ((LM_ERROR,
1686 "Error:%N:%l:(%P | %t)::%s\n",
1687 "ACE_POSIX_SIG_Proactor::ACE_POSIX_SIG_Proactor:"
1688 "Signal mask set empty"),
1690 else
1691 // + 1 to nullify loop increment.
1692 signal_number = si + 1;
1695 ACE_Asynch_Result_Impl *implementation;
1696 ACE_NEW_RETURN (implementation,
1697 ACE_POSIX_Asynch_Timer (handler_proxy,
1698 act,
1700 event,
1701 priority,
1702 signal_number),
1704 return implementation;
#if 0
// Disabled placeholder handler.  It is only referenced by the disabled
// code in setup_signal_handler and should never be called.
static void
sig_handler (int sig_num, siginfo_t *, ucontext_t *)
{
  ACELIB_DEBUG ((LM_DEBUG,
                 "%N:%l:(%P | %t)::sig_handler received signal: %d\n",
                 sig_num));
}
#endif /*if 0*/
1719 ACE_POSIX_SIG_Proactor::setup_signal_handler (int signal_number) const
1721 // Set up the specified signal so that signal information will be
1722 // passed to sigwaitinfo/sigtimedwait. Don't change the default
1723 // signal handler - having a handler and waiting for the signal can
1724 // produce undefined behavior.
1726 // But can not use SIG_DFL
1727 // With SIG_DFL after delivering the first signal
1728 // SIG_DFL handler resets SA_SIGINFO flags
1729 // and we will lose all information sig_info
1730 // At least all SunOS have such behavior
1731 #if 0
1732 struct sigaction reaction;
1733 sigemptyset (&reaction.sa_mask); // Nothing else to mask.
1734 reaction.sa_flags = SA_SIGINFO; // Realtime flag.
1735 reaction.sa_sigaction = ACE_SIGNAL_C_FUNC (sig_handler); // (SIG_DFL);
1736 int sigaction_return = ACE_OS::sigaction (signal_number,
1737 &reaction,
1739 if (sigaction_return == -1)
1740 ACELIB_ERROR_RETURN ((LM_ERROR,
1741 ACE_TEXT("Error:%p\n"),
1742 ACE_TEXT("Proactor couldnt do sigaction for the RT SIGNAL")),
1743 -1);
1744 #else
1745 ACE_UNUSED_ARG(signal_number);
1746 #endif
1747 return 0;
1752 ACE_POSIX_SIG_Proactor::block_signals () const
1754 return ACE_OS::pthread_sigmask (SIG_BLOCK, &this->RT_completion_signals_, 0);
1757 ssize_t
1758 ACE_POSIX_SIG_Proactor::allocate_aio_slot (ACE_POSIX_Asynch_Result *result)
1760 size_t i = 0;
1762 //try to find free slot as usual, starting from 0
1763 for (i = 0; i < this->aiocb_list_max_size_; i++)
1764 if (result_list_[i] == 0)
1765 break;
1767 if (i >= this->aiocb_list_max_size_)
1768 ACELIB_ERROR_RETURN ((LM_ERROR,
1769 "%N:%l:(%P | %t)::\n"
1770 "ACE_POSIX_SIG_Proactor::allocate_aio_slot "
1771 "internal Proactor error 1\n"),
1772 -1);
1774 // setup OS notification methods for this aio
1775 // store index!!, not pointer in signal info
1776 result->aio_sigevent.sigev_notify = SIGEV_SIGNAL;
1777 result->aio_sigevent.sigev_signo = result->signal_number ();
1778 #if defined (ACE_HAS_SIGVAL_SIGVAL_INT)
1779 result->aio_sigevent.sigev_value.sigval_int = static_cast<int> (i);
1780 #else
1781 result->aio_sigevent.sigev_value.sival_int = static_cast<int> (i);
1782 #endif /* ACE_HAS_SIGVAL_SIGVAL_INT */
1784 return static_cast<ssize_t> (i);
1788 ACE_POSIX_SIG_Proactor::handle_events_i (const ACE_Time_Value *timeout)
1790 int result_sigwait = 0;
1791 siginfo_t sig_info;
1795 // Wait for the signals.
1796 if (timeout == 0)
1798 result_sigwait = ACE_OS::sigwaitinfo (&this->RT_completion_signals_,
1799 &sig_info);
1801 else
1803 result_sigwait = ACE_OS::sigtimedwait (&this->RT_completion_signals_,
1804 &sig_info,
1805 timeout);
1806 if (result_sigwait == -1 && errno == EAGAIN)
1807 return 0;
1810 while (result_sigwait == -1 && errno == EINTR);
1812 if (result_sigwait == -1) // Not a timeout, not EINTR: tell caller of error
1813 return -1;
1815 // Decide what to do. We always check the completion queue since it's an
1816 // easy, quick check. What is decided here is whether to check for
1817 // I/O completions and, if so, how completely to scan.
1818 int flg_aio = 0; // 1 if AIO Completion possible
1820 size_t index = 0; // start index to scan aiocb list
1821 size_t count = 1; // max number of aiocbs to scan
1822 int error_status = 0;
1823 size_t transfer_count = 0;
1825 if (sig_info.si_code == SI_ASYNCIO)
1827 flg_aio = 1; // AIO signal received
1828 // define index to start
1829 // nothing will happen if it contains garbage
1830 #if defined (ACE_HAS_SIGVAL_SIGVAL_INT)
1831 index = static_cast<size_t> (sig_info.si_value.sigval_int);
1832 #else
1833 index = static_cast<size_t> (sig_info.si_value.sival_int);
1834 #endif /* ACE_HAS_SIGVAL_SIGVAL_INT */
1836 else if (sig_info.si_code != SI_QUEUE)
1838 // Unknown signal code.
1839 // may some other third-party libraries could send it
1840 // or message queue could also generate it !
1841 // So print the message and check our completions
1842 ACELIB_ERROR ((LM_DEBUG,
1843 ACE_TEXT ("%N:%l:(%P | %t): ")
1844 ACE_TEXT ("ACE_POSIX_SIG_Proactor::handle_events: ")
1845 ACE_TEXT ("Unexpected signal code (%d) returned ")
1846 ACE_TEXT ("from sigwait; expecting %d\n"),
1847 result_sigwait, sig_info.si_code));
1848 flg_aio = 1;
1851 int ret_aio = 0;
1852 int ret_que = 0;
1854 if (flg_aio)
1855 for (;; ret_aio++)
1857 ACE_POSIX_Asynch_Result *asynch_result =
1858 find_completed_aio (error_status,
1859 transfer_count,
1860 index,
1861 count);
1863 if (asynch_result == 0)
1864 break;
1866 // Call the application code.
1867 this->application_specific_code (asynch_result,
1868 transfer_count,
1869 0, // No completion key.
1870 error_status); // Error
1873 // process post_completed results
1874 ret_que = this->process_result_queue ();
1876 // Uncomment this if you want to test
1877 // and research the behavior of you system
1878 #if 0
1879 ACELIB_DEBUG ((LM_DEBUG,
1880 "(%t) NumAIO=%d NumQueue=%d\n",
1881 ret_aio, ret_que));
1882 #endif
1884 return ret_aio + ret_que > 0 ? 1 : 0;
1887 #endif /* ACE_HAS_POSIX_REALTIME_SIGNALS */
1889 // *********************************************************************
1891 ACE_POSIX_Asynch_Timer::ACE_POSIX_Asynch_Timer
1892 (const ACE_Handler::Proxy_Ptr &handler_proxy,
1893 const void *act,
1894 const ACE_Time_Value &tv,
1895 ACE_HANDLE event,
1896 int priority,
1897 int signal_number)
1898 : ACE_POSIX_Asynch_Result
1899 (handler_proxy, act, event, 0, 0, priority, signal_number),
1900 time_ (tv)
1904 void
1905 ACE_POSIX_Asynch_Timer::complete (size_t /* bytes_transferred */,
1906 int /* success */,
1907 const void * /* completion_key */,
1908 u_long /* error */)
1910 ACE_Handler *handler = this->handler_proxy_.get ()->handler ();
1911 if (handler != 0)
1912 handler->handle_time_out (this->time_, this->act ());
1916 // *********************************************************************
1918 ACE_POSIX_Wakeup_Completion::ACE_POSIX_Wakeup_Completion
1919 (const ACE_Handler::Proxy_Ptr &handler_proxy,
1920 const void *act,
1921 ACE_HANDLE event,
1922 int priority,
1923 int signal_number)
1924 : ACE_Asynch_Result_Impl (),
1925 ACE_POSIX_Asynch_Result (handler_proxy,
1926 act,
1927 event,
1930 priority,
1931 signal_number)
1935 ACE_POSIX_Wakeup_Completion::~ACE_POSIX_Wakeup_Completion ()
1939 void
1940 ACE_POSIX_Wakeup_Completion::complete (size_t /* bytes_transferred */,
1941 int /* success */,
1942 const void * /* completion_key */,
1943 u_long /* error */)
1945 ACE_Handler *handler = this->handler_proxy_.get ()->handler ();
1946 if (handler != 0)
1947 handler->handle_wakeup ();
1950 ACE_END_VERSIONED_NAMESPACE_DECL
1952 #endif /* ACE_HAS_AIO_CALLS */