rAc - revert invalid suggestions to edit mode
[chromium-blink-merge.git] / mojo / common / handle_watcher.cc
blobaf26b96cb6467b349ce23272b8dad9416c7c6b3d
1 // Copyright 2013 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
5 #include "mojo/common/handle_watcher.h"
7 #include <map>
9 #include "base/atomic_sequence_num.h"
10 #include "base/bind.h"
11 #include "base/lazy_instance.h"
12 #include "base/memory/weak_ptr.h"
13 #include "base/message_loop/message_loop.h"
14 #include "base/message_loop/message_loop_proxy.h"
15 #include "base/threading/thread.h"
16 #include "base/time/tick_clock.h"
17 #include "base/time/time.h"
18 #include "mojo/common/message_pump_mojo.h"
19 #include "mojo/common/message_pump_mojo_handler.h"
21 namespace mojo {
22 namespace common {
24 typedef int WatcherID;
26 namespace {
28 const char kWatcherThreadName[] = "handle-watcher-thread";
30 // TODO(sky): this should be unnecessary once MessageLoop has been refactored.
31 MessagePumpMojo* message_pump_mojo = NULL;
33 scoped_ptr<base::MessagePump> CreateMessagePumpMojo() {
34 message_pump_mojo = new MessagePumpMojo;
35 return scoped_ptr<base::MessagePump>(message_pump_mojo).Pass();
38 // Tracks the data for a single call to Start().
39 struct WatchData {
40 WatchData()
41 : id(0),
42 wait_flags(MOJO_WAIT_FLAG_NONE),
43 message_loop(NULL) {}
45 WatcherID id;
46 Handle handle;
47 MojoWaitFlags wait_flags;
48 base::TimeTicks deadline;
49 base::Callback<void(MojoResult)> callback;
50 scoped_refptr<base::MessageLoopProxy> message_loop;
53 // WatcherBackend --------------------------------------------------------------
55 // WatcherBackend is responsible for managing the requests and interacting with
56 // MessagePumpMojo. All access (outside of creation/destruction) is done on the
57 // thread WatcherThreadManager creates.
58 class WatcherBackend : public MessagePumpMojoHandler {
59 public:
60 WatcherBackend();
61 virtual ~WatcherBackend();
63 void StartWatching(const WatchData& data);
64 void StopWatching(WatcherID watcher_id);
66 private:
67 typedef std::map<Handle, WatchData> HandleToWatchDataMap;
69 // Invoked when a handle needs to be removed and notified.
70 void RemoveAndNotify(const Handle& handle, MojoResult result);
72 // Searches through |handle_to_data_| for |watcher_id|. Returns true if found
73 // and sets |handle| to the Handle. Returns false if not a known id.
74 bool GetMojoHandleByWatcherID(WatcherID watcher_id, Handle* handle) const;
76 // MessagePumpMojoHandler overrides:
77 virtual void OnHandleReady(const Handle& handle) OVERRIDE;
78 virtual void OnHandleError(const Handle& handle, MojoResult result) OVERRIDE;
80 // Maps from assigned id to WatchData.
81 HandleToWatchDataMap handle_to_data_;
83 DISALLOW_COPY_AND_ASSIGN(WatcherBackend);
86 WatcherBackend::WatcherBackend() {
89 WatcherBackend::~WatcherBackend() {
92 void WatcherBackend::StartWatching(const WatchData& data) {
93 RemoveAndNotify(data.handle, MOJO_RESULT_CANCELLED);
95 DCHECK_EQ(0u, handle_to_data_.count(data.handle));
97 handle_to_data_[data.handle] = data;
98 message_pump_mojo->AddHandler(this, data.handle,
99 data.wait_flags,
100 data.deadline);
103 void WatcherBackend::StopWatching(WatcherID watcher_id) {
104 // Because of the thread hop it is entirely possible to get here and not
105 // have a valid handle registered for |watcher_id|.
106 Handle handle;
107 if (!GetMojoHandleByWatcherID(watcher_id, &handle))
108 return;
110 handle_to_data_.erase(handle);
111 message_pump_mojo->RemoveHandler(handle);
114 void WatcherBackend::RemoveAndNotify(const Handle& handle,
115 MojoResult result) {
116 if (handle_to_data_.count(handle) == 0)
117 return;
119 const WatchData data(handle_to_data_[handle]);
120 handle_to_data_.erase(handle);
121 message_pump_mojo->RemoveHandler(handle);
122 data.message_loop->PostTask(FROM_HERE, base::Bind(data.callback, result));
125 bool WatcherBackend::GetMojoHandleByWatcherID(WatcherID watcher_id,
126 Handle* handle) const {
127 for (HandleToWatchDataMap::const_iterator i = handle_to_data_.begin();
128 i != handle_to_data_.end(); ++i) {
129 if (i->second.id == watcher_id) {
130 *handle = i->second.handle;
131 return true;
134 return false;
137 void WatcherBackend::OnHandleReady(const Handle& handle) {
138 RemoveAndNotify(handle, MOJO_RESULT_OK);
141 void WatcherBackend::OnHandleError(const Handle& handle, MojoResult result) {
142 RemoveAndNotify(handle, result);
145 // WatcherThreadManager --------------------------------------------------------
147 // WatcherThreadManager manages the background thread that listens for handles
148 // to be ready. All requests are handled by WatcherBackend.
149 class WatcherThreadManager {
150 public:
151 // Returns the shared instance.
152 static WatcherThreadManager* GetInstance();
154 // Starts watching the requested handle. Returns a unique ID that is used to
155 // stop watching the handle. When the handle is ready |callback| is notified
156 // on the thread StartWatching() was invoked on.
157 // This may be invoked on any thread.
158 WatcherID StartWatching(const Handle& handle,
159 MojoWaitFlags wait_flags,
160 base::TimeTicks deadline,
161 const base::Callback<void(MojoResult)>& callback);
163 // Stops watching a handle.
164 // This may be invoked on any thread.
165 void StopWatching(WatcherID watcher_id);
167 private:
168 friend struct base::DefaultLazyInstanceTraits<WatcherThreadManager>;
170 WatcherThreadManager();
171 ~WatcherThreadManager();
173 base::Thread thread_;
175 base::AtomicSequenceNumber watcher_id_generator_;
177 WatcherBackend backend_;
179 DISALLOW_COPY_AND_ASSIGN(WatcherThreadManager);
182 WatcherThreadManager* WatcherThreadManager::GetInstance() {
183 static base::LazyInstance<WatcherThreadManager> instance =
184 LAZY_INSTANCE_INITIALIZER;
185 return &instance.Get();
188 WatcherID WatcherThreadManager::StartWatching(
189 const Handle& handle,
190 MojoWaitFlags wait_flags,
191 base::TimeTicks deadline,
192 const base::Callback<void(MojoResult)>& callback) {
193 WatchData data;
194 data.id = watcher_id_generator_.GetNext();
195 data.handle = handle;
196 data.callback = callback;
197 data.wait_flags = wait_flags;
198 data.deadline = deadline;
199 data.message_loop = base::MessageLoopProxy::current();
200 DCHECK_NE(static_cast<base::MessageLoopProxy*>(NULL), data.message_loop);
201 // We outlive |thread_|, so it's safe to use Unretained() here.
202 thread_.message_loop()->PostTask(
203 FROM_HERE,
204 base::Bind(&WatcherBackend::StartWatching,
205 base::Unretained(&backend_),
206 data));
207 return data.id;
210 void WatcherThreadManager::StopWatching(WatcherID watcher_id) {
211 // We outlive |thread_|, so it's safe to use Unretained() here.
212 thread_.message_loop()->PostTask(
213 FROM_HERE,
214 base::Bind(&WatcherBackend::StopWatching,
215 base::Unretained(&backend_),
216 watcher_id));
219 WatcherThreadManager::WatcherThreadManager()
220 : thread_(kWatcherThreadName) {
221 base::Thread::Options thread_options;
222 thread_options.message_pump_factory = base::Bind(&CreateMessagePumpMojo);
223 thread_.StartWithOptions(thread_options);
226 WatcherThreadManager::~WatcherThreadManager() {
227 thread_.Stop();
230 } // namespace
232 // HandleWatcher::StartState ---------------------------------------------------
234 // Contains the information passed to Start().
235 struct HandleWatcher::StartState {
236 explicit StartState(HandleWatcher* watcher) : weak_factory(watcher) {
239 ~StartState() {
242 // ID assigned by WatcherThreadManager.
243 WatcherID watcher_id;
245 // Callback to notify when done.
246 base::Callback<void(MojoResult)> callback;
248 // When Start() is invoked a callback is passed to WatcherThreadManager
249 // using a WeakRef from |weak_refactory_|. The callback invokes
250 // OnHandleReady() (on the thread Start() is invoked from) which in turn
251 // notifies |callback_|. Doing this allows us to reset state when the handle
252 // is ready, and then notify the callback. Doing this also means Stop()
253 // cancels any pending callbacks that may be inflight.
254 base::WeakPtrFactory<HandleWatcher> weak_factory;
257 // HandleWatcher ---------------------------------------------------------------
259 // static
260 base::TickClock* HandleWatcher::tick_clock_ = NULL;
262 HandleWatcher::HandleWatcher() {
265 HandleWatcher::~HandleWatcher() {
266 Stop();
269 void HandleWatcher::Start(const Handle& handle,
270 MojoWaitFlags wait_flags,
271 MojoDeadline deadline,
272 const base::Callback<void(MojoResult)>& callback) {
273 DCHECK(handle.is_valid());
274 DCHECK_NE(MOJO_WAIT_FLAG_NONE, wait_flags);
276 Stop();
278 start_state_.reset(new StartState(this));
279 start_state_->callback = callback;
280 start_state_->watcher_id =
281 WatcherThreadManager::GetInstance()->StartWatching(
282 handle,
283 wait_flags,
284 MojoDeadlineToTimeTicks(deadline),
285 base::Bind(&HandleWatcher::OnHandleReady,
286 start_state_->weak_factory.GetWeakPtr()));
289 void HandleWatcher::Stop() {
290 if (!start_state_.get())
291 return;
293 scoped_ptr<StartState> old_state(start_state_.Pass());
294 WatcherThreadManager::GetInstance()->StopWatching(old_state->watcher_id);
297 void HandleWatcher::OnHandleReady(MojoResult result) {
298 DCHECK(start_state_.get());
299 scoped_ptr<StartState> old_state(start_state_.Pass());
300 old_state->callback.Run(result);
302 // NOTE: We may have been deleted during callback execution.
305 // static
306 base::TimeTicks HandleWatcher::NowTicks() {
307 return tick_clock_ ? tick_clock_->NowTicks() : base::TimeTicks::Now();
310 // static
311 base::TimeTicks HandleWatcher::MojoDeadlineToTimeTicks(MojoDeadline deadline) {
312 return deadline == MOJO_DEADLINE_INDEFINITE ? base::TimeTicks() :
313 NowTicks() + base::TimeDelta::FromMicroseconds(deadline);
316 } // namespace common
317 } // namespace mojo