1 #include "ace/POSIX_CB_Proactor.h"
3 #if defined (ACE_HAS_AIO_CALLS) && !defined (ACE_HAS_BROKEN_SIGEVENT_STRUCT)
5 #include "ace/Task_T.h"
6 #include "ace/Log_Category.h"
7 #include "ace/Object_Manager.h"
8 #include "ace/OS_NS_sys_time.h"
10 ACE_BEGIN_VERSIONED_NAMESPACE_DECL
12 ACE_POSIX_CB_Proactor::ACE_POSIX_CB_Proactor (size_t max_aio_operations
)
13 : ACE_POSIX_AIOCB_Proactor (max_aio_operations
,
14 ACE_POSIX_Proactor::PROACTOR_CB
),
15 sema_ ((unsigned int) 0)
17 // we should start pseudo-asynchronous accept task
18 // one per all future acceptors
20 this->get_asynch_pseudo_task ().start ();
24 ACE_POSIX_CB_Proactor::~ACE_POSIX_CB_Proactor ()
29 ACE_POSIX_Proactor::Proactor_Type
30 ACE_POSIX_CB_Proactor::get_impl_type ()
35 void ACE_POSIX_CB_Proactor::aio_completion_func (sigval cb_data
)
37 ACE_POSIX_CB_Proactor
* impl
= static_cast<ACE_POSIX_CB_Proactor
*> (cb_data
.sival_ptr
);
39 impl
->notify_completion (0);
42 #if defined (ACE_HAS_SIG_C_FUNC)
44 ACE_POSIX_CB_Proactor_aio_completion (sigval cb_data
)
46 ACE_POSIX_CB_Proactor::aio_completion_func (cb_data
);
48 #endif /* ACE_HAS_SIG_C_FUNC */
51 ACE_POSIX_CB_Proactor::handle_events (ACE_Time_Value
&wait_time
)
53 // Decrement <wait_time> with the amount of time spent in the method
54 ACE_Countdown_Time
countdown (&wait_time
);
55 return this->handle_events_i (wait_time
.msec ());
59 ACE_POSIX_CB_Proactor::handle_events ()
61 return this->handle_events_i (ACE_INFINITE
);
65 ACE_POSIX_CB_Proactor::notify_completion (int sig_num
)
67 ACE_UNUSED_ARG (sig_num
);
69 return this->sema_
.release();
74 ACE_POSIX_CB_Proactor::allocate_aio_slot (ACE_POSIX_Asynch_Result
*result
)
76 ssize_t
const slot
= ACE_POSIX_AIOCB_Proactor::allocate_aio_slot (result
);
80 // setup OS notification methods for this aio
81 // @@ TODO: This gets the completion method back to this proactor to
82 // find the completed aiocb. It would be so much better to not only get
83 // the proactor, but the aiocb as well.
84 result
->aio_sigevent
.sigev_notify
= SIGEV_THREAD
;
85 # if defined (ACE_HAS_SIG_C_FUNC)
86 result
->aio_sigevent
.sigev_notify_function
= ACE_POSIX_CB_Proactor_aio_completion
;
88 result
->aio_sigevent
.sigev_notify_function
= aio_completion_func
;
89 # endif /* ACE_HAS_SIG_C_FUNC */
90 result
->aio_sigevent
.sigev_notify_attributes
= 0;
92 result
->aio_sigevent
.sigev_value
.sival_ptr
= this ;
98 ACE_POSIX_CB_Proactor::handle_events_i (u_long milli_seconds
)
102 // Wait for the signals.
103 if (milli_seconds
== ACE_INFINITE
)
105 result_wait
= this->sema_
.acquire();
109 // Wait for <milli_seconds> amount of time.
110 ACE_Time_Value abs_time
= ACE_OS::gettimeofday ()
111 + ACE_Time_Value (0, milli_seconds
* 1000);
113 result_wait
= this->sema_
.acquire(abs_time
);
117 // but let continue work in case of errors
118 // we should check "post_completed" queue
119 if (result_wait
== -1)
121 int const lerror
= errno
;
122 if (lerror
!= ETIME
&& // timeout
123 lerror
!= EINTR
) // interrupted system call
124 ACELIB_ERROR ((LM_ERROR
,
125 ACE_TEXT("%N:%l:(%P | %t)::%p\n"),
126 ACE_TEXT("ACE_POSIX_CB_Proactor::handle_events:")
127 ACE_TEXT("semaphore acquire failed")
131 size_t index
= 0; // start index to scan aiocb list
132 size_t count
= this->aiocb_list_max_size_
; // max number to iterate
134 int error_status
= 0;
135 size_t return_status
= 0;
142 ACE_POSIX_Asynch_Result
* asynch_result
=
143 this->find_completed_aio (error_status
,
148 if (asynch_result
== 0)
151 // Call the application code.
152 this->application_specific_code (asynch_result
,
153 return_status
, // Bytes transferred.
154 0, // No completion key.
155 error_status
); // Error
158 // process post_completed results
159 ret_que
= this->process_result_queue ();
161 // Uncomment this if you want to test
162 // and research the behavior of you system
163 // ACELIB_DEBUG ((LM_DEBUG,
164 // "(%t) NumAIO=%d NumQueue=%d\n",
165 // ret_aio, ret_que));
167 return ret_aio
+ ret_que
> 0 ? 1 : 0;
170 ACE_END_VERSIONED_NAMESPACE_DECL
172 #endif /* ACE_HAS_AIO_CALLS && !ACE_HAS_BROKEN_SIGEVENT_STRUCT */