Disable rei.com on oopif benchmarks.
[chromium-blink-merge.git] / mojo / message_pump / handle_watcher.cc
blob964f1e2d3348d80b1edef376538ce2eac0862639
1 // Copyright 2013 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
5 #include "mojo/message_pump/handle_watcher.h"
7 #include <map>
9 #include "base/atomic_sequence_num.h"
10 #include "base/bind.h"
11 #include "base/lazy_instance.h"
12 #include "base/logging.h"
13 #include "base/macros.h"
14 #include "base/memory/singleton.h"
15 #include "base/memory/weak_ptr.h"
16 #include "base/message_loop/message_loop.h"
17 #include "base/single_thread_task_runner.h"
18 #include "base/synchronization/lock.h"
19 #include "base/synchronization/waitable_event.h"
20 #include "base/thread_task_runner_handle.h"
21 #include "base/threading/thread.h"
22 #include "base/threading/thread_restrictions.h"
23 #include "base/time/time.h"
24 #include "mojo/message_pump/message_pump_mojo.h"
25 #include "mojo/message_pump/message_pump_mojo_handler.h"
26 #include "mojo/message_pump/time_helper.h"
28 namespace mojo {
29 namespace common {
// Identifier returned for each watch request; used later to stop the watch.
using WatcherID = int;
33 namespace {
// Name given to the background thread owned by WatcherThreadManager.
const char kWatcherThreadName[] = "handle-watcher-thread";
37 base::TimeTicks MojoDeadlineToTimeTicks(MojoDeadline deadline) {
38 return deadline == MOJO_DEADLINE_INDEFINITE ? base::TimeTicks() :
39 internal::NowTicks() + base::TimeDelta::FromMicroseconds(deadline);
42 // Tracks the data for a single call to Start().
43 struct WatchData {
44 WatchData()
45 : id(0), handle_signals(MOJO_HANDLE_SIGNAL_NONE), task_runner(NULL) {}
47 WatcherID id;
48 Handle handle;
49 MojoHandleSignals handle_signals;
50 base::TimeTicks deadline;
51 base::Callback<void(MojoResult)> callback;
52 scoped_refptr<base::SingleThreadTaskRunner> task_runner;
55 // WatcherBackend --------------------------------------------------------------
57 // WatcherBackend is responsible for managing the requests and interacting with
58 // MessagePumpMojo. All access (outside of creation/destruction) is done on the
59 // thread WatcherThreadManager creates.
60 class WatcherBackend : public MessagePumpMojoHandler {
61 public:
62 WatcherBackend();
63 ~WatcherBackend() override;
65 void StartWatching(const WatchData& data);
67 // Cancels a previously scheduled request to start a watch.
68 void StopWatching(WatcherID watcher_id);
70 private:
71 typedef std::map<Handle, WatchData> HandleToWatchDataMap;
73 // Invoked when a handle needs to be removed and notified.
74 void RemoveAndNotify(const Handle& handle, MojoResult result);
76 // Searches through |handle_to_data_| for |watcher_id|. Returns true if found
77 // and sets |handle| to the Handle. Returns false if not a known id.
78 bool GetMojoHandleByWatcherID(WatcherID watcher_id, Handle* handle) const;
80 // MessagePumpMojoHandler overrides:
81 void OnHandleReady(const Handle& handle) override;
82 void OnHandleError(const Handle& handle, MojoResult result) override;
84 // Maps from assigned id to WatchData.
85 HandleToWatchDataMap handle_to_data_;
87 DISALLOW_COPY_AND_ASSIGN(WatcherBackend);
90 WatcherBackend::WatcherBackend() {
93 WatcherBackend::~WatcherBackend() {
96 void WatcherBackend::StartWatching(const WatchData& data) {
97 RemoveAndNotify(data.handle, MOJO_RESULT_CANCELLED);
99 DCHECK_EQ(0u, handle_to_data_.count(data.handle));
101 handle_to_data_[data.handle] = data;
102 MessagePumpMojo::current()->AddHandler(this, data.handle,
103 data.handle_signals,
104 data.deadline);
107 void WatcherBackend::StopWatching(WatcherID watcher_id) {
108 // Because of the thread hop it is entirely possible to get here and not
109 // have a valid handle registered for |watcher_id|.
110 Handle handle;
111 if (GetMojoHandleByWatcherID(watcher_id, &handle)) {
112 handle_to_data_.erase(handle);
113 MessagePumpMojo::current()->RemoveHandler(handle);
117 void WatcherBackend::RemoveAndNotify(const Handle& handle,
118 MojoResult result) {
119 if (handle_to_data_.count(handle) == 0)
120 return;
122 const WatchData data(handle_to_data_[handle]);
123 handle_to_data_.erase(handle);
124 MessagePumpMojo::current()->RemoveHandler(handle);
126 data.task_runner->PostTask(FROM_HERE, base::Bind(data.callback, result));
129 bool WatcherBackend::GetMojoHandleByWatcherID(WatcherID watcher_id,
130 Handle* handle) const {
131 for (HandleToWatchDataMap::const_iterator i = handle_to_data_.begin();
132 i != handle_to_data_.end(); ++i) {
133 if (i->second.id == watcher_id) {
134 *handle = i->second.handle;
135 return true;
138 return false;
141 void WatcherBackend::OnHandleReady(const Handle& handle) {
142 RemoveAndNotify(handle, MOJO_RESULT_OK);
145 void WatcherBackend::OnHandleError(const Handle& handle, MojoResult result) {
146 RemoveAndNotify(handle, result);
149 // WatcherThreadManager --------------------------------------------------------
151 // WatcherThreadManager manages the background thread that listens for handles
152 // to be ready. All requests are handled by WatcherBackend.
153 } // namespace
155 class WatcherThreadManager {
156 public:
157 ~WatcherThreadManager();
159 // Returns the shared instance.
160 static WatcherThreadManager* GetInstance();
162 // Starts watching the requested handle. Returns a unique ID that is used to
163 // stop watching the handle. When the handle is ready |callback| is notified
164 // on the thread StartWatching() was invoked on.
165 // This may be invoked on any thread.
166 WatcherID StartWatching(const Handle& handle,
167 MojoHandleSignals handle_signals,
168 base::TimeTicks deadline,
169 const base::Callback<void(MojoResult)>& callback);
171 // Stops watching a handle.
172 // This may be invoked on any thread.
173 void StopWatching(WatcherID watcher_id);
175 private:
176 enum RequestType {
177 REQUEST_START,
178 REQUEST_STOP,
181 // See description of |requests_| for details.
182 struct RequestData {
183 RequestData() : type(REQUEST_START), stop_id(0), stop_event(NULL) {}
185 RequestType type;
186 WatchData start_data;
187 WatcherID stop_id;
188 base::WaitableEvent* stop_event;
191 typedef std::vector<RequestData> Requests;
193 friend struct base::DefaultSingletonTraits<WatcherThreadManager>;
195 WatcherThreadManager();
197 // Schedules a request on the background thread. See |requests_| for details.
198 void AddRequest(const RequestData& data);
200 // Processes requests added to |requests_|. This is invoked on the backend
201 // thread.
202 void ProcessRequestsOnBackendThread();
204 base::Thread thread_;
206 base::AtomicSequenceNumber watcher_id_generator_;
208 WatcherBackend backend_;
210 // Protects |requests_|.
211 base::Lock lock_;
213 // Start/Stop result in adding a RequestData to |requests_| (protected by
214 // |lock_|). When the background thread wakes up it processes the requests.
215 Requests requests_;
217 DISALLOW_COPY_AND_ASSIGN(WatcherThreadManager);
220 WatcherThreadManager::~WatcherThreadManager() {
221 thread_.Stop();
224 WatcherThreadManager* WatcherThreadManager::GetInstance() {
225 return base::Singleton<WatcherThreadManager>::get();
228 WatcherID WatcherThreadManager::StartWatching(
229 const Handle& handle,
230 MojoHandleSignals handle_signals,
231 base::TimeTicks deadline,
232 const base::Callback<void(MojoResult)>& callback) {
233 RequestData request_data;
234 request_data.type = REQUEST_START;
235 request_data.start_data.id = watcher_id_generator_.GetNext();
236 request_data.start_data.handle = handle;
237 request_data.start_data.callback = callback;
238 request_data.start_data.handle_signals = handle_signals;
239 request_data.start_data.deadline = deadline;
240 request_data.start_data.task_runner = base::ThreadTaskRunnerHandle::Get();
241 AddRequest(request_data);
242 return request_data.start_data.id;
245 void WatcherThreadManager::StopWatching(WatcherID watcher_id) {
246 // Handle the case of StartWatching() followed by StopWatching() before
247 // |thread_| woke up.
249 base::AutoLock auto_lock(lock_);
250 for (Requests::iterator i = requests_.begin(); i != requests_.end(); ++i) {
251 if (i->type == REQUEST_START && i->start_data.id == watcher_id) {
252 // Watcher ids are not reused, so if we find it we can stop.
253 requests_.erase(i);
254 return;
259 base::ThreadRestrictions::ScopedAllowWait allow_wait;
260 base::WaitableEvent event(true, false);
261 RequestData request_data;
262 request_data.type = REQUEST_STOP;
263 request_data.stop_id = watcher_id;
264 request_data.stop_event = &event;
265 AddRequest(request_data);
267 // We need to block until the handle is actually removed.
268 event.Wait();
271 void WatcherThreadManager::AddRequest(const RequestData& data) {
273 base::AutoLock auto_lock(lock_);
274 const bool was_empty = requests_.empty();
275 requests_.push_back(data);
276 if (!was_empty)
277 return;
279 // We own |thread_|, so it's safe to use Unretained() here.
280 thread_.task_runner()->PostTask(
281 FROM_HERE,
282 base::Bind(&WatcherThreadManager::ProcessRequestsOnBackendThread,
283 base::Unretained(this)));
286 void WatcherThreadManager::ProcessRequestsOnBackendThread() {
287 DCHECK_EQ(thread_.message_loop(), base::MessageLoop::current());
289 Requests requests;
291 base::AutoLock auto_lock(lock_);
292 requests_.swap(requests);
294 for (size_t i = 0; i < requests.size(); ++i) {
295 if (requests[i].type == REQUEST_START) {
296 backend_.StartWatching(requests[i].start_data);
297 } else {
298 backend_.StopWatching(requests[i].stop_id);
299 requests[i].stop_event->Signal();
304 WatcherThreadManager::WatcherThreadManager()
305 : thread_(kWatcherThreadName) {
306 base::Thread::Options thread_options;
307 thread_options.message_pump_factory = base::Bind(&MessagePumpMojo::Create);
308 thread_.StartWithOptions(thread_options);
311 // HandleWatcher::StateBase and subclasses -------------------------------------
313 // The base class of HandleWatcher's state. Owns the user's callback and
314 // monitors the current thread's MessageLoop to know when to force the callback
315 // to run (with an error) even though the pipe hasn't been signaled yet.
316 class HandleWatcher::StateBase : public base::MessageLoop::DestructionObserver {
317 public:
318 StateBase(HandleWatcher* watcher,
319 const base::Callback<void(MojoResult)>& callback)
320 : watcher_(watcher),
321 callback_(callback),
322 got_ready_(false) {
323 base::MessageLoop::current()->AddDestructionObserver(this);
326 ~StateBase() override {
327 base::MessageLoop::current()->RemoveDestructionObserver(this);
330 protected:
331 void NotifyHandleReady(MojoResult result) {
332 got_ready_ = true;
333 NotifyAndDestroy(result);
336 bool got_ready() const { return got_ready_; }
338 private:
339 void WillDestroyCurrentMessageLoop() override {
340 // The current thread is exiting. Simulate a watch error.
341 NotifyAndDestroy(MOJO_RESULT_ABORTED);
344 void NotifyAndDestroy(MojoResult result) {
345 base::Callback<void(MojoResult)> callback = callback_;
346 watcher_->Stop(); // Destroys |this|.
348 callback.Run(result);
351 HandleWatcher* watcher_;
352 base::Callback<void(MojoResult)> callback_;
354 // Have we been notified that the handle is ready?
355 bool got_ready_;
357 DISALLOW_COPY_AND_ASSIGN(StateBase);
360 // If the thread on which HandleWatcher is used runs MessagePumpMojo,
361 // SameThreadWatchingState is used to directly watch the handle on the same
362 // thread.
363 class HandleWatcher::SameThreadWatchingState : public StateBase,
364 public MessagePumpMojoHandler {
365 public:
366 SameThreadWatchingState(HandleWatcher* watcher,
367 const Handle& handle,
368 MojoHandleSignals handle_signals,
369 MojoDeadline deadline,
370 const base::Callback<void(MojoResult)>& callback)
371 : StateBase(watcher, callback),
372 handle_(handle) {
373 DCHECK(MessagePumpMojo::IsCurrent());
375 MessagePumpMojo::current()->AddHandler(
376 this, handle, handle_signals, MojoDeadlineToTimeTicks(deadline));
379 ~SameThreadWatchingState() override {
380 if (!got_ready())
381 MessagePumpMojo::current()->RemoveHandler(handle_);
384 private:
385 // MessagePumpMojoHandler overrides:
386 void OnHandleReady(const Handle& handle) override {
387 StopWatchingAndNotifyReady(handle, MOJO_RESULT_OK);
390 void OnHandleError(const Handle& handle, MojoResult result) override {
391 StopWatchingAndNotifyReady(handle, result);
394 void StopWatchingAndNotifyReady(const Handle& handle, MojoResult result) {
395 DCHECK_EQ(handle.value(), handle_.value());
396 MessagePumpMojo::current()->RemoveHandler(handle_);
397 NotifyHandleReady(result);
400 Handle handle_;
402 DISALLOW_COPY_AND_ASSIGN(SameThreadWatchingState);
405 // If the thread on which HandleWatcher is used runs a message pump different
406 // from MessagePumpMojo, SecondaryThreadWatchingState is used to watch the
407 // handle on the handle watcher thread.
408 class HandleWatcher::SecondaryThreadWatchingState : public StateBase {
409 public:
410 SecondaryThreadWatchingState(HandleWatcher* watcher,
411 const Handle& handle,
412 MojoHandleSignals handle_signals,
413 MojoDeadline deadline,
414 const base::Callback<void(MojoResult)>& callback)
415 : StateBase(watcher, callback),
416 weak_factory_(this) {
417 watcher_id_ = WatcherThreadManager::GetInstance()->StartWatching(
418 handle,
419 handle_signals,
420 MojoDeadlineToTimeTicks(deadline),
421 base::Bind(&SecondaryThreadWatchingState::NotifyHandleReady,
422 weak_factory_.GetWeakPtr()));
425 ~SecondaryThreadWatchingState() override {
426 // If we've been notified the handle is ready (|got_ready()| is true) then
427 // the watch has been implicitly removed by
428 // WatcherThreadManager/MessagePumpMojo and we don't have to call
429 // StopWatching(). To do so would needlessly entail posting a task and
430 // blocking until the background thread services it.
431 if (!got_ready())
432 WatcherThreadManager::GetInstance()->StopWatching(watcher_id_);
435 private:
436 WatcherID watcher_id_;
438 // Used to weakly bind |this| to the WatcherThreadManager.
439 base::WeakPtrFactory<SecondaryThreadWatchingState> weak_factory_;
441 DISALLOW_COPY_AND_ASSIGN(SecondaryThreadWatchingState);
444 // HandleWatcher ---------------------------------------------------------------
446 HandleWatcher::HandleWatcher() {
449 HandleWatcher::~HandleWatcher() {
452 void HandleWatcher::Start(const Handle& handle,
453 MojoHandleSignals handle_signals,
454 MojoDeadline deadline,
455 const base::Callback<void(MojoResult)>& callback) {
456 DCHECK(handle.is_valid());
457 DCHECK_NE(MOJO_HANDLE_SIGNAL_NONE, handle_signals);
459 // Need to clear the state before creating a new one.
460 state_.reset();
461 if (MessagePumpMojo::IsCurrent()) {
462 state_.reset(new SameThreadWatchingState(
463 this, handle, handle_signals, deadline, callback));
464 } else {
465 state_.reset(new SecondaryThreadWatchingState(
466 this, handle, handle_signals, deadline, callback));
470 void HandleWatcher::Stop() {
471 state_.reset();
474 } // namespace common
475 } // namespace mojo