fix doc example typo
[boost.git] / boost / asio / detail / select_reactor.hpp
blob77caf5466d313b90e2233ae26cbe20905c592129
1 //
2 // select_reactor.hpp
3 // ~~~~~~~~~~~~~~~~~~
4 //
5 // Copyright (c) 2003-2008 Christopher M. Kohlhoff (chris at kohlhoff dot com)
6 //
7 // Distributed under the Boost Software License, Version 1.0. (See accompanying
8 // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
9 //
11 #ifndef BOOST_ASIO_DETAIL_SELECT_REACTOR_HPP
12 #define BOOST_ASIO_DETAIL_SELECT_REACTOR_HPP
14 #if defined(_MSC_VER) && (_MSC_VER >= 1200)
15 # pragma once
16 #endif // defined(_MSC_VER) && (_MSC_VER >= 1200)
18 #include <boost/asio/detail/push_options.hpp>
20 #include <boost/asio/detail/socket_types.hpp> // Must come before posix_time.
22 #include <boost/asio/detail/push_options.hpp>
23 #include <cstddef>
24 #include <boost/config.hpp>
25 #include <boost/date_time/posix_time/posix_time_types.hpp>
26 #include <boost/shared_ptr.hpp>
27 #include <vector>
28 #include <boost/asio/detail/pop_options.hpp>
30 #include <boost/asio/io_service.hpp>
31 #include <boost/asio/detail/bind_handler.hpp>
32 #include <boost/asio/detail/fd_set_adapter.hpp>
33 #include <boost/asio/detail/mutex.hpp>
34 #include <boost/asio/detail/noncopyable.hpp>
35 #include <boost/asio/detail/reactor_op_queue.hpp>
36 #include <boost/asio/detail/select_interrupter.hpp>
37 #include <boost/asio/detail/select_reactor_fwd.hpp>
38 #include <boost/asio/detail/service_base.hpp>
39 #include <boost/asio/detail/signal_blocker.hpp>
40 #include <boost/asio/detail/socket_ops.hpp>
41 #include <boost/asio/detail/socket_types.hpp>
42 #include <boost/asio/detail/task_io_service.hpp>
43 #include <boost/asio/detail/thread.hpp>
44 #include <boost/asio/detail/timer_queue.hpp>
46 namespace boost {
47 namespace asio {
48 namespace detail {
50 template <bool Own_Thread>
51 class select_reactor
52 : public boost::asio::detail::service_base<select_reactor<Own_Thread> >
54 public:
// Per-descriptor data. This reactor stores nothing per descriptor, so the
// type is empty; the reactor's operations accept a reference to it but do not
// name or use the argument.
struct per_descriptor_data
{
};
60 // Constructor.
61 select_reactor(boost::asio::io_service& io_service)
62 : boost::asio::detail::service_base<
63 select_reactor<Own_Thread> >(io_service),
64 mutex_(),
65 select_in_progress_(false),
66 interrupter_(),
67 read_op_queue_(),
68 write_op_queue_(),
69 except_op_queue_(),
70 pending_cancellations_(),
71 stop_thread_(false),
72 thread_(0),
73 shutdown_(false)
75 if (Own_Thread)
77 boost::asio::detail::signal_blocker sb;
78 thread_ = new boost::asio::detail::thread(
79 bind_handler(&select_reactor::call_run_thread, this));
83 // Destructor.
84 ~select_reactor()
86 shutdown_service();
89 // Destroy all user-defined handler objects owned by the service.
90 void shutdown_service()
92 boost::asio::detail::mutex::scoped_lock lock(mutex_);
93 shutdown_ = true;
94 stop_thread_ = true;
95 lock.unlock();
97 if (thread_)
99 interrupter_.interrupt();
100 thread_->join();
101 delete thread_;
102 thread_ = 0;
105 read_op_queue_.destroy_operations();
106 write_op_queue_.destroy_operations();
107 except_op_queue_.destroy_operations();
109 for (std::size_t i = 0; i < timer_queues_.size(); ++i)
110 timer_queues_[i]->destroy_timers();
111 timer_queues_.clear();
114 // Initialise the task, but only if the reactor is not in its own thread.
115 void init_task()
117 if (!Own_Thread)
119 typedef task_io_service<select_reactor<Own_Thread> > task_io_service_type;
120 use_service<task_io_service_type>(this->get_io_service()).init_task();
124 // Register a socket with the reactor. Returns 0 on success, system error
125 // code on failure.
126 int register_descriptor(socket_type, per_descriptor_data&)
128 return 0;
131 // Start a new read operation. The handler object will be invoked when the
132 // given descriptor is ready to be read, or an error has occurred.
133 template <typename Handler>
134 void start_read_op(socket_type descriptor, per_descriptor_data&,
135 Handler handler, bool /*allow_speculative_read*/ = true)
137 boost::asio::detail::mutex::scoped_lock lock(mutex_);
138 if (!shutdown_)
139 if (read_op_queue_.enqueue_operation(descriptor, handler))
140 interrupter_.interrupt();
143 // Start a new write operation. The handler object will be invoked when the
144 // given descriptor is ready to be written, or an error has occurred.
145 template <typename Handler>
146 void start_write_op(socket_type descriptor, per_descriptor_data&,
147 Handler handler, bool /*allow_speculative_write*/ = true)
149 boost::asio::detail::mutex::scoped_lock lock(mutex_);
150 if (!shutdown_)
151 if (write_op_queue_.enqueue_operation(descriptor, handler))
152 interrupter_.interrupt();
155 // Start a new exception operation. The handler object will be invoked when
156 // the given descriptor has exception information, or an error has occurred.
157 template <typename Handler>
158 void start_except_op(socket_type descriptor,
159 per_descriptor_data&, Handler handler)
161 boost::asio::detail::mutex::scoped_lock lock(mutex_);
162 if (!shutdown_)
163 if (except_op_queue_.enqueue_operation(descriptor, handler))
164 interrupter_.interrupt();
167 // Wrapper for connect handlers to enable the handler object to be placed
168 // in both the write and the except operation queues, but ensure that only
169 // one of the handlers is called.
170 template <typename Handler>
171 class connect_handler_wrapper
173 public:
174 connect_handler_wrapper(socket_type descriptor,
175 boost::shared_ptr<bool> completed,
176 select_reactor<Own_Thread>& reactor, Handler handler)
177 : descriptor_(descriptor),
178 completed_(completed),
179 reactor_(reactor),
180 handler_(handler)
184 bool perform(boost::system::error_code& ec,
185 std::size_t& bytes_transferred)
187 // Check whether one of the handlers has already been called. If it has,
188 // then we don't want to do anything in this handler.
189 if (*completed_)
191 completed_.reset(); // Indicate that this handler should not complete.
192 return true;
195 // Cancel the other reactor operation for the connection.
196 *completed_ = true;
197 reactor_.enqueue_cancel_ops_unlocked(descriptor_);
199 // Call the contained handler.
200 return handler_.perform(ec, bytes_transferred);
203 void complete(const boost::system::error_code& ec,
204 std::size_t bytes_transferred)
206 if (completed_.get())
207 handler_.complete(ec, bytes_transferred);
210 private:
211 socket_type descriptor_;
212 boost::shared_ptr<bool> completed_;
213 select_reactor<Own_Thread>& reactor_;
214 Handler handler_;
217 // Start new write and exception operations. The handler object will be
218 // invoked when the given descriptor is ready for writing or has exception
219 // information available, or an error has occurred. The handler will be called
220 // only once.
221 template <typename Handler>
222 void start_connect_op(socket_type descriptor,
223 per_descriptor_data&, Handler handler)
225 boost::asio::detail::mutex::scoped_lock lock(mutex_);
226 if (!shutdown_)
228 boost::shared_ptr<bool> completed(new bool(false));
229 connect_handler_wrapper<Handler> wrapped_handler(
230 descriptor, completed, *this, handler);
231 bool interrupt = write_op_queue_.enqueue_operation(
232 descriptor, wrapped_handler);
233 interrupt = except_op_queue_.enqueue_operation(
234 descriptor, wrapped_handler) || interrupt;
235 if (interrupt)
236 interrupter_.interrupt();
240 // Cancel all operations associated with the given descriptor. The
241 // handlers associated with the descriptor will be invoked with the
242 // operation_aborted error.
243 void cancel_ops(socket_type descriptor, per_descriptor_data&)
245 boost::asio::detail::mutex::scoped_lock lock(mutex_);
246 cancel_ops_unlocked(descriptor);
249 // Enqueue cancellation of all operations associated with the given
250 // descriptor. The handlers associated with the descriptor will be invoked
251 // with the operation_aborted error. This function does not acquire the
252 // select_reactor's mutex, and so should only be used when the reactor lock is
253 // already held.
254 void enqueue_cancel_ops_unlocked(socket_type descriptor)
256 pending_cancellations_.push_back(descriptor);
259 // Cancel any operations that are running against the descriptor and remove
260 // its registration from the reactor.
261 void close_descriptor(socket_type descriptor, per_descriptor_data&)
263 boost::asio::detail::mutex::scoped_lock lock(mutex_);
264 cancel_ops_unlocked(descriptor);
267 // Add a new timer queue to the reactor.
268 template <typename Time_Traits>
269 void add_timer_queue(timer_queue<Time_Traits>& timer_queue)
271 boost::asio::detail::mutex::scoped_lock lock(mutex_);
272 timer_queues_.push_back(&timer_queue);
275 // Remove a timer queue from the reactor.
276 template <typename Time_Traits>
277 void remove_timer_queue(timer_queue<Time_Traits>& timer_queue)
279 boost::asio::detail::mutex::scoped_lock lock(mutex_);
280 for (std::size_t i = 0; i < timer_queues_.size(); ++i)
282 if (timer_queues_[i] == &timer_queue)
284 timer_queues_.erase(timer_queues_.begin() + i);
285 return;
290 // Schedule a timer in the given timer queue to expire at the specified
291 // absolute time. The handler object will be invoked when the timer expires.
292 template <typename Time_Traits, typename Handler>
293 void schedule_timer(timer_queue<Time_Traits>& timer_queue,
294 const typename Time_Traits::time_type& time, Handler handler, void* token)
296 boost::asio::detail::mutex::scoped_lock lock(mutex_);
297 if (!shutdown_)
298 if (timer_queue.enqueue_timer(time, handler, token))
299 interrupter_.interrupt();
302 // Cancel the timer associated with the given token. Returns the number of
303 // handlers that have been posted or dispatched.
304 template <typename Time_Traits>
305 std::size_t cancel_timer(timer_queue<Time_Traits>& timer_queue, void* token)
307 boost::asio::detail::mutex::scoped_lock lock(mutex_);
308 std::size_t n = timer_queue.cancel_timer(token);
309 if (n > 0)
310 interrupter_.interrupt();
311 return n;
314 private:
315 friend class task_io_service<select_reactor<Own_Thread> >;
317 // Run select once until interrupted or events are ready to be dispatched.
318 void run(bool block)
320 boost::asio::detail::mutex::scoped_lock lock(mutex_);
322 // Dispatch any operation cancellations that were made while the select
323 // loop was not running.
324 read_op_queue_.perform_cancellations();
325 write_op_queue_.perform_cancellations();
326 except_op_queue_.perform_cancellations();
327 for (std::size_t i = 0; i < timer_queues_.size(); ++i)
328 timer_queues_[i]->dispatch_cancellations();
330 // Check if the thread is supposed to stop.
331 if (stop_thread_)
333 complete_operations_and_timers(lock);
334 return;
337 // We can return immediately if there's no work to do and the reactor is
338 // not supposed to block.
339 if (!block && read_op_queue_.empty() && write_op_queue_.empty()
340 && except_op_queue_.empty() && all_timer_queues_are_empty())
342 complete_operations_and_timers(lock);
343 return;
346 // Set up the descriptor sets.
347 fd_set_adapter read_fds;
348 read_fds.set(interrupter_.read_descriptor());
349 read_op_queue_.get_descriptors(read_fds);
350 fd_set_adapter write_fds;
351 write_op_queue_.get_descriptors(write_fds);
352 fd_set_adapter except_fds;
353 except_op_queue_.get_descriptors(except_fds);
354 socket_type max_fd = read_fds.max_descriptor();
355 if (write_fds.max_descriptor() > max_fd)
356 max_fd = write_fds.max_descriptor();
357 if (except_fds.max_descriptor() > max_fd)
358 max_fd = except_fds.max_descriptor();
360 // Block on the select call without holding the lock so that new
361 // operations can be started while the call is executing.
362 timeval tv_buf = { 0, 0 };
363 timeval* tv = block ? get_timeout(tv_buf) : &tv_buf;
364 select_in_progress_ = true;
365 lock.unlock();
366 boost::system::error_code ec;
367 int retval = socket_ops::select(static_cast<int>(max_fd + 1),
368 read_fds, write_fds, except_fds, tv, ec);
369 lock.lock();
370 select_in_progress_ = false;
372 // Block signals while dispatching operations.
373 boost::asio::detail::signal_blocker sb;
375 // Reset the interrupter.
376 if (retval > 0 && read_fds.is_set(interrupter_.read_descriptor()))
377 interrupter_.reset();
379 // Dispatch all ready operations.
380 if (retval > 0)
382 // Exception operations must be processed first to ensure that any
383 // out-of-band data is read before normal data.
384 except_op_queue_.perform_operations_for_descriptors(
385 except_fds, boost::system::error_code());
386 read_op_queue_.perform_operations_for_descriptors(
387 read_fds, boost::system::error_code());
388 write_op_queue_.perform_operations_for_descriptors(
389 write_fds, boost::system::error_code());
390 except_op_queue_.perform_cancellations();
391 read_op_queue_.perform_cancellations();
392 write_op_queue_.perform_cancellations();
394 for (std::size_t i = 0; i < timer_queues_.size(); ++i)
396 timer_queues_[i]->dispatch_timers();
397 timer_queues_[i]->dispatch_cancellations();
400 // Issue any pending cancellations.
401 for (size_t i = 0; i < pending_cancellations_.size(); ++i)
402 cancel_ops_unlocked(pending_cancellations_[i]);
403 pending_cancellations_.clear();
405 complete_operations_and_timers(lock);
408 // Run the select loop in the thread.
409 void run_thread()
411 boost::asio::detail::mutex::scoped_lock lock(mutex_);
412 while (!stop_thread_)
414 lock.unlock();
415 run(true);
416 lock.lock();
420 // Entry point for the select loop thread.
421 static void call_run_thread(select_reactor* reactor)
423 reactor->run_thread();
426 // Interrupt the select loop.
427 void interrupt()
429 interrupter_.interrupt();
432 // Check if all timer queues are empty.
433 bool all_timer_queues_are_empty() const
435 for (std::size_t i = 0; i < timer_queues_.size(); ++i)
436 if (!timer_queues_[i]->empty())
437 return false;
438 return true;
441 // Get the timeout value for the select call.
442 timeval* get_timeout(timeval& tv)
444 if (all_timer_queues_are_empty())
445 return 0;
447 // By default we will wait no longer than 5 minutes. This will ensure that
448 // any changes to the system clock are detected after no longer than this.
449 boost::posix_time::time_duration minimum_wait_duration
450 = boost::posix_time::minutes(5);
452 for (std::size_t i = 0; i < timer_queues_.size(); ++i)
454 boost::posix_time::time_duration wait_duration
455 = timer_queues_[i]->wait_duration();
456 if (wait_duration < minimum_wait_duration)
457 minimum_wait_duration = wait_duration;
460 if (minimum_wait_duration > boost::posix_time::time_duration())
462 tv.tv_sec = minimum_wait_duration.total_seconds();
463 tv.tv_usec = minimum_wait_duration.total_microseconds() % 1000000;
465 else
467 tv.tv_sec = 0;
468 tv.tv_usec = 0;
471 return &tv;
474 // Cancel all operations associated with the given descriptor. The do_cancel
475 // function of the handler objects will be invoked. This function does not
476 // acquire the select_reactor's mutex.
477 void cancel_ops_unlocked(socket_type descriptor)
479 bool interrupt = read_op_queue_.cancel_operations(descriptor);
480 interrupt = write_op_queue_.cancel_operations(descriptor) || interrupt;
481 interrupt = except_op_queue_.cancel_operations(descriptor) || interrupt;
482 if (interrupt)
483 interrupter_.interrupt();
486 // Clean up operations and timers. We must not hold the lock since the
487 // destructors may make calls back into this reactor. We make a copy of the
488 // vector of timer queues since the original may be modified while the lock
489 // is not held.
490 void complete_operations_and_timers(
491 boost::asio::detail::mutex::scoped_lock& lock)
493 timer_queues_for_cleanup_ = timer_queues_;
494 lock.unlock();
495 read_op_queue_.complete_operations();
496 write_op_queue_.complete_operations();
497 except_op_queue_.complete_operations();
498 for (std::size_t i = 0; i < timer_queues_for_cleanup_.size(); ++i)
499 timer_queues_for_cleanup_[i]->complete_timers();
502 // Mutex to protect access to internal data.
503 boost::asio::detail::mutex mutex_;
505 // Whether the select loop is currently running or not.
506 bool select_in_progress_;
508 // The interrupter is used to break a blocking select call.
509 select_interrupter interrupter_;
511 // The queue of read operations.
512 reactor_op_queue<socket_type> read_op_queue_;
514 // The queue of write operations.
515 reactor_op_queue<socket_type> write_op_queue_;
517 // The queue of exception operations.
518 reactor_op_queue<socket_type> except_op_queue_;
520 // The timer queues.
521 std::vector<timer_queue_base*> timer_queues_;
523 // A copy of the timer queues, used when cleaning up timers. The copy is
524 // stored as a class data member to avoid unnecessary memory allocation.
525 std::vector<timer_queue_base*> timer_queues_for_cleanup_;
527 // The descriptors that are pending cancellation.
528 std::vector<socket_type> pending_cancellations_;
530 // Does the reactor loop thread need to stop.
531 bool stop_thread_;
533 // The thread that is running the reactor loop.
534 boost::asio::detail::thread* thread_;
536 // Whether the service has been shut down.
537 bool shutdown_;
540 } // namespace detail
541 } // namespace asio
542 } // namespace boost
544 #include <boost/asio/detail/pop_options.hpp>
546 #endif // BOOST_ASIO_DETAIL_SELECT_REACTOR_HPP