1 #include "ace/POSIX_CB_Proactor.h"
3 #if defined (ACE_HAS_AIO_CALLS) && !defined (ACE_HAS_BROKEN_SIGEVENT_STRUCT)
5 #include "ace/Task_T.h"
6 #include "ace/Log_Category.h"
7 #include "ace/Object_Manager.h"
8 #include "ace/OS_NS_sys_time.h"
10 ACE_BEGIN_VERSIONED_NAMESPACE_DECL
// Constructor.  Forwards <max_aio_operations> together with the
// PROACTOR_CB type tag to the ACE_POSIX_AIOCB_Proactor base, initializes
// <sema_> with 0 (presumably the semaphore's initial count -- confirm
// against the declaration of sema_), and then calls start () on the
// object returned by get_asynch_pseudo_task ().
12 ACE_POSIX_CB_Proactor::ACE_POSIX_CB_Proactor (size_t max_aio_operations
)
13 : ACE_POSIX_AIOCB_Proactor (max_aio_operations
,
14 ACE_POSIX_Proactor::PROACTOR_CB
),
15 sema_ ((unsigned int) 0)
17 // We should start the pseudo-asynchronous accept task here;
18 // a single task serves all future acceptors.
20 this->get_asynch_pseudo_task ().start ();
// Destructor.
24 ACE_POSIX_CB_Proactor::~ACE_POSIX_CB_Proactor (void)
// Reports which proactor implementation this object is, expressed as an
// ACE_POSIX_Proactor::Proactor_Type value.
// NOTE(review): presumably PROACTOR_CB, the tag the constructor passes
// to the base class -- confirm.
29 ACE_POSIX_Proactor::Proactor_Type
30 ACE_POSIX_CB_Proactor::get_impl_type (void)
// Completion callback that allocate_aio_slot () installs as the
// <sigev_notify_function> of an aio request.
// @param cb_data  The sigval registered with the request; its
//                 <sival_ptr> is the proactor pointer stored by
//                 allocate_aio_slot ().
// The pointer is cast back to ACE_POSIX_CB_Proactor and
// notify_completion (0) is called on it (the 0 is the <sig_num>
// argument, which notify_completion () ignores).
35 void ACE_POSIX_CB_Proactor::aio_completion_func (sigval cb_data
)
37 ACE_POSIX_CB_Proactor
* impl
= static_cast<ACE_POSIX_CB_Proactor
*> (cb_data
.sival_ptr
);
39 impl
->notify_completion (0);
// Free-function wrapper around
// ACE_POSIX_CB_Proactor::aio_completion_func ().  It is compiled only
// when ACE_HAS_SIG_C_FUNC is defined, and allocate_aio_slot () registers
// it as the notify function under that same macro.
// NOTE(review): presumably exists because such platforms require a
// C-linkage callback -- confirm.
42 #if defined (ACE_HAS_SIG_C_FUNC)
44 ACE_POSIX_CB_Proactor_aio_completion (sigval cb_data
)
46 ACE_POSIX_CB_Proactor::aio_completion_func (cb_data
);
48 #endif /* ACE_HAS_SIG_C_FUNC */
// Timed variant of handle_events ().
// @param wait_time  Maximum time to wait; an ACE_Countdown_Time is
//                   constructed on it so that it is decremented by the
//                   time spent in this method.
// @return Whatever handle_events_i () returns for <wait_time>
//         converted to milliseconds.
51 ACE_POSIX_CB_Proactor::handle_events (ACE_Time_Value
&wait_time
)
53 // Decrement <wait_time> with the amount of time spent in the method
54 ACE_Countdown_Time
countdown (&wait_time
);
55 return this->handle_events_i (wait_time
.msec ());
// Untimed variant of handle_events (): passes ACE_INFINITE to
// handle_events_i (), which then acquires <sema_> without a deadline.
// @return Whatever handle_events_i () returns.
59 ACE_POSIX_CB_Proactor::handle_events (void)
61 return this->handle_events_i (ACE_INFINITE
);
// Announces a completion by releasing <sema_>, the semaphore that
// handle_events_i () acquires before scanning for finished operations.
// @param sig_num  Ignored.
// @return The result of sema_.release ().
65 ACE_POSIX_CB_Proactor::notify_completion (int sig_num
)
67 ACE_UNUSED_ARG (sig_num
);
69 return this->sema_
.release();
// Asks ACE_POSIX_AIOCB_Proactor::allocate_aio_slot () for a slot for
// <result> and then fills in <result>->aio_sigevent so that completion
// is reported through a SIGEV_THREAD callback that receives this
// proactor's pointer.
// @param result  The asynchronous result whose <aio_sigevent> is set up.
// NOTE(review): the returned value is presumably the slot obtained from
// the base class -- confirm.
74 ACE_POSIX_CB_Proactor::allocate_aio_slot (ACE_POSIX_Asynch_Result
*result
)
76 ssize_t slot
= ACE_POSIX_AIOCB_Proactor::allocate_aio_slot (result
);
80 // setup OS notification methods for this aio
81 // @@ TODO: This gets the completion method back to this proactor to
82 // find the completed aiocb. It would be so much better to not only get
83 // the proactor, but the aiocb as well.
// Request notification via SIGEV_THREAD, i.e. through the notify
// function assigned below.
84 result
->aio_sigevent
.sigev_notify
= SIGEV_THREAD
;
// With ACE_HAS_SIG_C_FUNC the free-function wrapper is registered.
85 # if defined (ACE_HAS_SIG_C_FUNC)
86 result
->aio_sigevent
.sigev_notify_function
=
87 ACE_POSIX_CB_Proactor_aio_completion
;
// NOTE(review): presumably the alternative used when ACE_HAS_SIG_C_FUNC
// is not defined -- confirm.
89 result
->aio_sigevent
.sigev_notify_function
= aio_completion_func
;
90 # endif /* ACE_HAS_SIG_C_FUNC */
91 result
->aio_sigevent
.sigev_notify_attributes
= 0;
// aio_completion_func () reads this pointer back from
// <cb_data.sival_ptr> to find the proactor to notify.
93 result
->aio_sigevent
.sigev_value
.sival_ptr
= this ;
// Worker behind both handle_events () overloads.
// Acquires <sema_> -- without a deadline when <milli_seconds> is
// ACE_INFINITE, otherwise with a deadline of the current time plus
// <milli_seconds> -- then looks for completed aio operations with
// find_completed_aio (), hands them to application_specific_code (),
// and finally runs process_result_queue ().
// @param milli_seconds  Wait limit in milliseconds, or ACE_INFINITE.
// @return 1 if (ret_aio + ret_que) is greater than zero, otherwise 0.
99 ACE_POSIX_CB_Proactor::handle_events_i (u_long milli_seconds
)
104 // Wait for the signals.
105 if (milli_seconds
== ACE_INFINITE
)
107 result_wait
= this->sema_
.acquire();
111 // Wait for <milli_seconds> amount of time.
// NOTE(review): <milli_seconds> * 1000 is computed in u_long before
// being passed as the microseconds argument; where u_long is 32 bits
// this can wrap for very long waits -- confirm whether that matters.
112 ACE_Time_Value abs_time
= ACE_OS::gettimeofday ()
113 + ACE_Time_Value (0, milli_seconds
* 1000);
115 result_wait
= this->sema_
.acquire(abs_time
);
119 // but continue working even if the wait failed:
120 // we should still check the "post_completed" queue
121 if (result_wait
== -1)
// Capture errno immediately so later calls cannot overwrite it.
123 int const lerror
= errno
;
// Only failures other than ETIME and EINTR are logged.
124 if (lerror
!= ETIME
&& // timeout
125 lerror
!= EINTR
) // interrupted system call
126 ACELIB_ERROR ((LM_ERROR
,
127 ACE_TEXT("%N:%l:(%P | %t)::%p\n"),
128 ACE_TEXT("ACE_POSIX_CB_Proactor::handle_events:")
129 ACE_TEXT("semaphore acquire failed")
133 size_t index
= 0; // start index to scan aiocb list
134 size_t count
= this->aiocb_list_max_size_
; // max number to iterate
// Filled in for each completed operation and passed on to
// application_specific_code () below.
136 int error_status
= 0;
137 size_t return_status
= 0;
144 ACE_POSIX_Asynch_Result
* asynch_result
=
145 this->find_completed_aio (error_status
,
150 if (asynch_result
== 0)
153 // Call the application code.
154 this->application_specific_code (asynch_result
,
155 return_status
, // Bytes transferred.
156 0, // No completion key.
157 error_status
); // Error
160 // process post_completed results
161 ret_que
= this->process_result_queue ();
163 // Uncomment this if you want to test
164 // and research the behavior of your system
165 // ACELIB_DEBUG ((LM_DEBUG,
166 // "(%t) NumAIO=%d NumQueue=%d\n",
167 // ret_aio, ret_que));
// Parsed as ((ret_aio + ret_que) > 0) ? 1 : 0.
169 return ret_aio
+ ret_que
> 0 ? 1 : 0;
172 ACE_END_VERSIONED_NAMESPACE_DECL
174 #endif /* ACE_HAS_AIO_CALLS && !ACE_HAS_BROKEN_SIGEVENT_STRUCT */