5 // Copyright (c) 2003-2008 Christopher M. Kohlhoff (chris at kohlhoff dot com)
7 // Distributed under the Boost Software License, Version 1.0. (See accompanying
8 // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
11 #ifndef BOOST_ASIO_DETAIL_SELECT_REACTOR_HPP
12 #define BOOST_ASIO_DETAIL_SELECT_REACTOR_HPP
14 #if defined(_MSC_VER) && (_MSC_VER >= 1200)
16 #endif // defined(_MSC_VER) && (_MSC_VER >= 1200)
18 #include <boost/asio/detail/push_options.hpp>
20 #include <boost/asio/detail/socket_types.hpp> // Must come before posix_time.
22 #include <boost/asio/detail/push_options.hpp>
24 #include <boost/config.hpp>
25 #include <boost/date_time/posix_time/posix_time_types.hpp>
26 #include <boost/shared_ptr.hpp>
28 #include <boost/asio/detail/pop_options.hpp>
30 #include <boost/asio/io_service.hpp>
31 #include <boost/asio/detail/bind_handler.hpp>
32 #include <boost/asio/detail/fd_set_adapter.hpp>
33 #include <boost/asio/detail/mutex.hpp>
34 #include <boost/asio/detail/noncopyable.hpp>
35 #include <boost/asio/detail/reactor_op_queue.hpp>
36 #include <boost/asio/detail/select_interrupter.hpp>
37 #include <boost/asio/detail/select_reactor_fwd.hpp>
38 #include <boost/asio/detail/service_base.hpp>
39 #include <boost/asio/detail/signal_blocker.hpp>
40 #include <boost/asio/detail/socket_ops.hpp>
41 #include <boost/asio/detail/socket_types.hpp>
42 #include <boost/asio/detail/task_io_service.hpp>
43 #include <boost/asio/detail/thread.hpp>
44 #include <boost/asio/detail/timer_queue.hpp>
50 template <bool Own_Thread
>
52 : public boost::asio::detail::service_base
<select_reactor
<Own_Thread
> >
55 // Per-descriptor data.
56 struct per_descriptor_data
61 select_reactor(boost::asio::io_service
& io_service
)
62 : boost::asio::detail::service_base
<
63 select_reactor
<Own_Thread
> >(io_service
),
65 select_in_progress_(false),
70 pending_cancellations_(),
77 boost::asio::detail::signal_blocker sb
;
78 thread_
= new boost::asio::detail::thread(
79 bind_handler(&select_reactor::call_run_thread
, this));
89 // Destroy all user-defined handler objects owned by the service.
90 void shutdown_service()
92 boost::asio::detail::mutex::scoped_lock
lock(mutex_
);
99 interrupter_
.interrupt();
105 read_op_queue_
.destroy_operations();
106 write_op_queue_
.destroy_operations();
107 except_op_queue_
.destroy_operations();
109 for (std::size_t i
= 0; i
< timer_queues_
.size(); ++i
)
110 timer_queues_
[i
]->destroy_timers();
111 timer_queues_
.clear();
114 // Initialise the task, but only if the reactor is not in its own thread.
119 typedef task_io_service
<select_reactor
<Own_Thread
> > task_io_service_type
;
120 use_service
<task_io_service_type
>(this->get_io_service()).init_task();
124 // Register a socket with the reactor. Returns 0 on success, system error
126 int register_descriptor(socket_type
, per_descriptor_data
&)
131 // Start a new read operation. The handler object will be invoked when the
132 // given descriptor is ready to be read, or an error has occurred.
133 template <typename Handler
>
134 void start_read_op(socket_type descriptor
, per_descriptor_data
&,
135 Handler handler
, bool /*allow_speculative_read*/ = true)
137 boost::asio::detail::mutex::scoped_lock
lock(mutex_
);
139 if (read_op_queue_
.enqueue_operation(descriptor
, handler
))
140 interrupter_
.interrupt();
143 // Start a new write operation. The handler object will be invoked when the
144 // given descriptor is ready to be written, or an error has occurred.
145 template <typename Handler
>
146 void start_write_op(socket_type descriptor
, per_descriptor_data
&,
147 Handler handler
, bool /*allow_speculative_write*/ = true)
149 boost::asio::detail::mutex::scoped_lock
lock(mutex_
);
151 if (write_op_queue_
.enqueue_operation(descriptor
, handler
))
152 interrupter_
.interrupt();
155 // Start a new exception operation. The handler object will be invoked when
156 // the given descriptor has exception information, or an error has occurred.
157 template <typename Handler
>
158 void start_except_op(socket_type descriptor
,
159 per_descriptor_data
&, Handler handler
)
161 boost::asio::detail::mutex::scoped_lock
lock(mutex_
);
163 if (except_op_queue_
.enqueue_operation(descriptor
, handler
))
164 interrupter_
.interrupt();
167 // Wrapper for connect handlers to enable the handler object to be placed
168 // in both the write and the except operation queues, but ensure that only
169 // one of the handlers is called.
170 template <typename Handler
>
171 class connect_handler_wrapper
174 connect_handler_wrapper(socket_type descriptor
,
175 boost::shared_ptr
<bool> completed
,
176 select_reactor
<Own_Thread
>& reactor
, Handler handler
)
177 : descriptor_(descriptor
),
178 completed_(completed
),
184 bool perform(boost::system::error_code
& ec
,
185 std::size_t& bytes_transferred
)
187 // Check whether one of the handlers has already been called. If it has,
188 // then we don't want to do anything in this handler.
191 completed_
.reset(); // Indicate that this handler should not complete.
195 // Cancel the other reactor operation for the connection.
197 reactor_
.enqueue_cancel_ops_unlocked(descriptor_
);
199 // Call the contained handler.
200 return handler_
.perform(ec
, bytes_transferred
);
203 void complete(const boost::system::error_code
& ec
,
204 std::size_t bytes_transferred
)
206 if (completed_
.get())
207 handler_
.complete(ec
, bytes_transferred
);
211 socket_type descriptor_
;
212 boost::shared_ptr
<bool> completed_
;
213 select_reactor
<Own_Thread
>& reactor_
;
217 // Start new write and exception operations. The handler object will be
218 // invoked when the given descriptor is ready for writing or has exception
219 // information available, or an error has occurred. The handler will be called
221 template <typename Handler
>
222 void start_connect_op(socket_type descriptor
,
223 per_descriptor_data
&, Handler handler
)
225 boost::asio::detail::mutex::scoped_lock
lock(mutex_
);
228 boost::shared_ptr
<bool> completed(new bool(false));
229 connect_handler_wrapper
<Handler
> wrapped_handler(
230 descriptor
, completed
, *this, handler
);
231 bool interrupt
= write_op_queue_
.enqueue_operation(
232 descriptor
, wrapped_handler
);
233 interrupt
= except_op_queue_
.enqueue_operation(
234 descriptor
, wrapped_handler
) || interrupt
;
236 interrupter_
.interrupt();
240 // Cancel all operations associated with the given descriptor. The
241 // handlers associated with the descriptor will be invoked with the
242 // operation_aborted error.
243 void cancel_ops(socket_type descriptor
, per_descriptor_data
&)
245 boost::asio::detail::mutex::scoped_lock
lock(mutex_
);
246 cancel_ops_unlocked(descriptor
);
249 // Enqueue cancellation of all operations associated with the given
250 // descriptor. The handlers associated with the descriptor will be invoked
251 // with the operation_aborted error. This function does not acquire the
252 // select_reactor's mutex, and so should only be used when the reactor lock is
254 void enqueue_cancel_ops_unlocked(socket_type descriptor
)
256 pending_cancellations_
.push_back(descriptor
);
259 // Cancel any operations that are running against the descriptor and remove
260 // its registration from the reactor.
261 void close_descriptor(socket_type descriptor
, per_descriptor_data
&)
263 boost::asio::detail::mutex::scoped_lock
lock(mutex_
);
264 cancel_ops_unlocked(descriptor
);
267 // Add a new timer queue to the reactor.
268 template <typename Time_Traits
>
269 void add_timer_queue(timer_queue
<Time_Traits
>& timer_queue
)
271 boost::asio::detail::mutex::scoped_lock
lock(mutex_
);
272 timer_queues_
.push_back(&timer_queue
);
275 // Remove a timer queue from the reactor.
276 template <typename Time_Traits
>
277 void remove_timer_queue(timer_queue
<Time_Traits
>& timer_queue
)
279 boost::asio::detail::mutex::scoped_lock
lock(mutex_
);
280 for (std::size_t i
= 0; i
< timer_queues_
.size(); ++i
)
282 if (timer_queues_
[i
] == &timer_queue
)
284 timer_queues_
.erase(timer_queues_
.begin() + i
);
290 // Schedule a timer in the given timer queue to expire at the specified
291 // absolute time. The handler object will be invoked when the timer expires.
292 template <typename Time_Traits
, typename Handler
>
293 void schedule_timer(timer_queue
<Time_Traits
>& timer_queue
,
294 const typename
Time_Traits::time_type
& time
, Handler handler
, void* token
)
296 boost::asio::detail::mutex::scoped_lock
lock(mutex_
);
298 if (timer_queue
.enqueue_timer(time
, handler
, token
))
299 interrupter_
.interrupt();
302 // Cancel the timer associated with the given token. Returns the number of
303 // handlers that have been posted or dispatched.
304 template <typename Time_Traits
>
305 std::size_t cancel_timer(timer_queue
<Time_Traits
>& timer_queue
, void* token
)
307 boost::asio::detail::mutex::scoped_lock
lock(mutex_
);
308 std::size_t n
= timer_queue
.cancel_timer(token
);
310 interrupter_
.interrupt();
315 friend class task_io_service
<select_reactor
<Own_Thread
> >;
317 // Run select once until interrupted or events are ready to be dispatched.
320 boost::asio::detail::mutex::scoped_lock
lock(mutex_
);
322 // Dispatch any operation cancellations that were made while the select
323 // loop was not running.
324 read_op_queue_
.perform_cancellations();
325 write_op_queue_
.perform_cancellations();
326 except_op_queue_
.perform_cancellations();
327 for (std::size_t i
= 0; i
< timer_queues_
.size(); ++i
)
328 timer_queues_
[i
]->dispatch_cancellations();
330 // Check if the thread is supposed to stop.
333 complete_operations_and_timers(lock
);
337 // We can return immediately if there's no work to do and the reactor is
338 // not supposed to block.
339 if (!block
&& read_op_queue_
.empty() && write_op_queue_
.empty()
340 && except_op_queue_
.empty() && all_timer_queues_are_empty())
342 complete_operations_and_timers(lock
);
346 // Set up the descriptor sets.
347 fd_set_adapter read_fds
;
348 read_fds
.set(interrupter_
.read_descriptor());
349 read_op_queue_
.get_descriptors(read_fds
);
350 fd_set_adapter write_fds
;
351 write_op_queue_
.get_descriptors(write_fds
);
352 fd_set_adapter except_fds
;
353 except_op_queue_
.get_descriptors(except_fds
);
354 socket_type max_fd
= read_fds
.max_descriptor();
355 if (write_fds
.max_descriptor() > max_fd
)
356 max_fd
= write_fds
.max_descriptor();
357 if (except_fds
.max_descriptor() > max_fd
)
358 max_fd
= except_fds
.max_descriptor();
360 // Block on the select call without holding the lock so that new
361 // operations can be started while the call is executing.
362 timeval tv_buf
= { 0, 0 };
363 timeval
* tv
= block
? get_timeout(tv_buf
) : &tv_buf
;
364 select_in_progress_
= true;
366 boost::system::error_code ec
;
367 int retval
= socket_ops::select(static_cast<int>(max_fd
+ 1),
368 read_fds
, write_fds
, except_fds
, tv
, ec
);
370 select_in_progress_
= false;
372 // Block signals while dispatching operations.
373 boost::asio::detail::signal_blocker sb
;
375 // Reset the interrupter.
376 if (retval
> 0 && read_fds
.is_set(interrupter_
.read_descriptor()))
377 interrupter_
.reset();
379 // Dispatch all ready operations.
382 // Exception operations must be processed first to ensure that any
383 // out-of-band data is read before normal data.
384 except_op_queue_
.perform_operations_for_descriptors(
385 except_fds
, boost::system::error_code());
386 read_op_queue_
.perform_operations_for_descriptors(
387 read_fds
, boost::system::error_code());
388 write_op_queue_
.perform_operations_for_descriptors(
389 write_fds
, boost::system::error_code());
390 except_op_queue_
.perform_cancellations();
391 read_op_queue_
.perform_cancellations();
392 write_op_queue_
.perform_cancellations();
394 for (std::size_t i
= 0; i
< timer_queues_
.size(); ++i
)
396 timer_queues_
[i
]->dispatch_timers();
397 timer_queues_
[i
]->dispatch_cancellations();
400 // Issue any pending cancellations.
401 for (size_t i
= 0; i
< pending_cancellations_
.size(); ++i
)
402 cancel_ops_unlocked(pending_cancellations_
[i
]);
403 pending_cancellations_
.clear();
405 complete_operations_and_timers(lock
);
408 // Run the select loop in the thread.
411 boost::asio::detail::mutex::scoped_lock
lock(mutex_
);
412 while (!stop_thread_
)
420 // Entry point for the select loop thread.
421 static void call_run_thread(select_reactor
* reactor
)
423 reactor
->run_thread();
426 // Interrupt the select loop.
429 interrupter_
.interrupt();
432 // Check if all timer queues are empty.
433 bool all_timer_queues_are_empty() const
435 for (std::size_t i
= 0; i
< timer_queues_
.size(); ++i
)
436 if (!timer_queues_
[i
]->empty())
441 // Get the timeout value for the select call.
442 timeval
* get_timeout(timeval
& tv
)
444 if (all_timer_queues_are_empty())
447 // By default we will wait no longer than 5 minutes. This will ensure that
448 // any changes to the system clock are detected after no longer than this.
449 boost::posix_time::time_duration minimum_wait_duration
450 = boost::posix_time::minutes(5);
452 for (std::size_t i
= 0; i
< timer_queues_
.size(); ++i
)
454 boost::posix_time::time_duration wait_duration
455 = timer_queues_
[i
]->wait_duration();
456 if (wait_duration
< minimum_wait_duration
)
457 minimum_wait_duration
= wait_duration
;
460 if (minimum_wait_duration
> boost::posix_time::time_duration())
462 tv
.tv_sec
= minimum_wait_duration
.total_seconds();
463 tv
.tv_usec
= minimum_wait_duration
.total_microseconds() % 1000000;
474 // Cancel all operations associated with the given descriptor. The do_cancel
475 // function of the handler objects will be invoked. This function does not
476 // acquire the select_reactor's mutex.
477 void cancel_ops_unlocked(socket_type descriptor
)
479 bool interrupt
= read_op_queue_
.cancel_operations(descriptor
);
480 interrupt
= write_op_queue_
.cancel_operations(descriptor
) || interrupt
;
481 interrupt
= except_op_queue_
.cancel_operations(descriptor
) || interrupt
;
483 interrupter_
.interrupt();
486 // Clean up operations and timers. We must not hold the lock since the
487 // destructors may make calls back into this reactor. We make a copy of the
488 // vector of timer queues since the original may be modified while the lock
490 void complete_operations_and_timers(
491 boost::asio::detail::mutex::scoped_lock
& lock
)
493 timer_queues_for_cleanup_
= timer_queues_
;
495 read_op_queue_
.complete_operations();
496 write_op_queue_
.complete_operations();
497 except_op_queue_
.complete_operations();
498 for (std::size_t i
= 0; i
< timer_queues_for_cleanup_
.size(); ++i
)
499 timer_queues_for_cleanup_
[i
]->complete_timers();
502 // Mutex to protect access to internal data.
503 boost::asio::detail::mutex mutex_
;
505 // Whether the select loop is currently running or not.
506 bool select_in_progress_
;
508 // The interrupter is used to break a blocking select call.
509 select_interrupter interrupter_
;
511 // The queue of read operations.
512 reactor_op_queue
<socket_type
> read_op_queue_
;
514 // The queue of write operations.
515 reactor_op_queue
<socket_type
> write_op_queue_
;
517 // The queue of exception operations.
518 reactor_op_queue
<socket_type
> except_op_queue_
;
521 std::vector
<timer_queue_base
*> timer_queues_
;
523 // A copy of the timer queues, used when cleaning up timers. The copy is
524 // stored as a class data member to avoid unnecessary memory allocation.
525 std::vector
<timer_queue_base
*> timer_queues_for_cleanup_
;
527 // The descriptors that are pending cancellation.
528 std::vector
<socket_type
> pending_cancellations_
;
530 // Does the reactor loop thread need to stop.
533 // The thread that is running the reactor loop.
534 boost::asio::detail::thread
* thread_
;
536 // Whether the service has been shut down.
540 } // namespace detail
544 #include <boost/asio/detail/pop_options.hpp>
546 #endif // BOOST_ASIO_DETAIL_SELECT_REACTOR_HPP