1 // Copyright 2013 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
5 #include "mojo/common/handle_watcher.h"
9 #include "base/atomic_sequence_num.h"
10 #include "base/bind.h"
11 #include "base/lazy_instance.h"
12 #include "base/logging.h"
13 #include "base/macros.h"
14 #include "base/memory/singleton.h"
15 #include "base/memory/weak_ptr.h"
16 #include "base/message_loop/message_loop.h"
17 #include "base/message_loop/message_loop_proxy.h"
18 #include "base/synchronization/lock.h"
19 #include "base/synchronization/waitable_event.h"
20 #include "base/threading/thread.h"
21 #include "base/threading/thread_restrictions.h"
22 #include "base/time/time.h"
23 #include "mojo/common/message_pump_mojo.h"
24 #include "mojo/common/message_pump_mojo_handler.h"
25 #include "mojo/common/time_helper.h"
30 typedef int WatcherID
;
34 const char kWatcherThreadName
[] = "handle-watcher-thread";
36 base::TimeTicks
MojoDeadlineToTimeTicks(MojoDeadline deadline
) {
37 return deadline
== MOJO_DEADLINE_INDEFINITE
? base::TimeTicks() :
38 internal::NowTicks() + base::TimeDelta::FromMicroseconds(deadline
);
41 // Tracks the data for a single call to Start().
45 handle_signals(MOJO_HANDLE_SIGNAL_NONE
),
50 MojoHandleSignals handle_signals
;
51 base::TimeTicks deadline
;
52 base::Callback
<void(MojoResult
)> callback
;
53 scoped_refptr
<base::MessageLoopProxy
> message_loop
;
56 // WatcherBackend --------------------------------------------------------------
58 // WatcherBackend is responsible for managing the requests and interacting with
59 // MessagePumpMojo. All access (outside of creation/destruction) is done on the
60 // thread WatcherThreadManager creates.
61 class WatcherBackend
: public MessagePumpMojoHandler
{
64 ~WatcherBackend() override
;
66 void StartWatching(const WatchData
& data
);
68 // Cancels a previously scheduled request to start a watch.
69 void StopWatching(WatcherID watcher_id
);
72 typedef std::map
<Handle
, WatchData
> HandleToWatchDataMap
;
74 // Invoked when a handle needs to be removed and notified.
75 void RemoveAndNotify(const Handle
& handle
, MojoResult result
);
77 // Searches through |handle_to_data_| for |watcher_id|. Returns true if found
78 // and sets |handle| to the Handle. Returns false if not a known id.
79 bool GetMojoHandleByWatcherID(WatcherID watcher_id
, Handle
* handle
) const;
81 // MessagePumpMojoHandler overrides:
82 void OnHandleReady(const Handle
& handle
) override
;
83 void OnHandleError(const Handle
& handle
, MojoResult result
) override
;
85 // Maps from assigned id to WatchData.
86 HandleToWatchDataMap handle_to_data_
;
88 DISALLOW_COPY_AND_ASSIGN(WatcherBackend
);
91 WatcherBackend::WatcherBackend() {
94 WatcherBackend::~WatcherBackend() {
97 void WatcherBackend::StartWatching(const WatchData
& data
) {
98 RemoveAndNotify(data
.handle
, MOJO_RESULT_CANCELLED
);
100 DCHECK_EQ(0u, handle_to_data_
.count(data
.handle
));
102 handle_to_data_
[data
.handle
] = data
;
103 MessagePumpMojo::current()->AddHandler(this, data
.handle
,
108 void WatcherBackend::StopWatching(WatcherID watcher_id
) {
109 // Because of the thread hop it is entirely possible to get here and not
110 // have a valid handle registered for |watcher_id|.
112 if (GetMojoHandleByWatcherID(watcher_id
, &handle
)) {
113 handle_to_data_
.erase(handle
);
114 MessagePumpMojo::current()->RemoveHandler(handle
);
118 void WatcherBackend::RemoveAndNotify(const Handle
& handle
,
120 if (handle_to_data_
.count(handle
) == 0)
123 const WatchData
data(handle_to_data_
[handle
]);
124 handle_to_data_
.erase(handle
);
125 MessagePumpMojo::current()->RemoveHandler(handle
);
127 data
.message_loop
->PostTask(FROM_HERE
, base::Bind(data
.callback
, result
));
130 bool WatcherBackend::GetMojoHandleByWatcherID(WatcherID watcher_id
,
131 Handle
* handle
) const {
132 for (HandleToWatchDataMap::const_iterator i
= handle_to_data_
.begin();
133 i
!= handle_to_data_
.end(); ++i
) {
134 if (i
->second
.id
== watcher_id
) {
135 *handle
= i
->second
.handle
;
142 void WatcherBackend::OnHandleReady(const Handle
& handle
) {
143 RemoveAndNotify(handle
, MOJO_RESULT_OK
);
146 void WatcherBackend::OnHandleError(const Handle
& handle
, MojoResult result
) {
147 RemoveAndNotify(handle
, result
);
150 // WatcherThreadManager --------------------------------------------------------
152 // WatcherThreadManager manages the background thread that listens for handles
153 // to be ready. All requests are handled by WatcherBackend.
156 class WatcherThreadManager
{
158 ~WatcherThreadManager();
160 // Returns the shared instance.
161 static WatcherThreadManager
* GetInstance();
163 // Starts watching the requested handle. Returns a unique ID that is used to
164 // stop watching the handle. When the handle is ready |callback| is notified
165 // on the thread StartWatching() was invoked on.
166 // This may be invoked on any thread.
167 WatcherID
StartWatching(const Handle
& handle
,
168 MojoHandleSignals handle_signals
,
169 base::TimeTicks deadline
,
170 const base::Callback
<void(MojoResult
)>& callback
);
172 // Stops watching a handle.
173 // This may be invoked on any thread.
174 void StopWatching(WatcherID watcher_id
);
182 // See description of |requests_| for details.
184 RequestData() : type(REQUEST_START
), stop_id(0), stop_event(NULL
) {}
187 WatchData start_data
;
189 base::WaitableEvent
* stop_event
;
192 typedef std::vector
<RequestData
> Requests
;
194 friend struct DefaultSingletonTraits
<WatcherThreadManager
>;
196 WatcherThreadManager();
198 // Schedules a request on the background thread. See |requests_| for details.
199 void AddRequest(const RequestData
& data
);
201 // Processes requests added to |requests_|. This is invoked on the backend
203 void ProcessRequestsOnBackendThread();
205 base::Thread thread_
;
207 base::AtomicSequenceNumber watcher_id_generator_
;
209 WatcherBackend backend_
;
211 // Protects |requests_|.
214 // Start/Stop result in adding a RequestData to |requests_| (protected by
215 // |lock_|). When the background thread wakes up it processes the requests.
218 DISALLOW_COPY_AND_ASSIGN(WatcherThreadManager
);
221 WatcherThreadManager::~WatcherThreadManager() {
225 WatcherThreadManager
* WatcherThreadManager::GetInstance() {
226 return Singleton
<WatcherThreadManager
>::get();
229 WatcherID
WatcherThreadManager::StartWatching(
230 const Handle
& handle
,
231 MojoHandleSignals handle_signals
,
232 base::TimeTicks deadline
,
233 const base::Callback
<void(MojoResult
)>& callback
) {
234 RequestData request_data
;
235 request_data
.type
= REQUEST_START
;
236 request_data
.start_data
.id
= watcher_id_generator_
.GetNext();
237 request_data
.start_data
.handle
= handle
;
238 request_data
.start_data
.callback
= callback
;
239 request_data
.start_data
.handle_signals
= handle_signals
;
240 request_data
.start_data
.deadline
= deadline
;
241 request_data
.start_data
.message_loop
= base::MessageLoopProxy::current();
242 DCHECK_NE(static_cast<base::MessageLoopProxy
*>(NULL
),
243 request_data
.start_data
.message_loop
.get());
244 AddRequest(request_data
);
245 return request_data
.start_data
.id
;
248 void WatcherThreadManager::StopWatching(WatcherID watcher_id
) {
249 // Handle the case of StartWatching() followed by StopWatching() before
250 // |thread_| woke up.
252 base::AutoLock
auto_lock(lock_
);
253 for (Requests::iterator i
= requests_
.begin(); i
!= requests_
.end(); ++i
) {
254 if (i
->type
== REQUEST_START
&& i
->start_data
.id
== watcher_id
) {
255 // Watcher ids are not reused, so if we find it we can stop.
262 base::ThreadRestrictions::ScopedAllowWait allow_wait
;
263 base::WaitableEvent
event(true, false);
264 RequestData request_data
;
265 request_data
.type
= REQUEST_STOP
;
266 request_data
.stop_id
= watcher_id
;
267 request_data
.stop_event
= &event
;
268 AddRequest(request_data
);
270 // We need to block until the handle is actually removed.
274 void WatcherThreadManager::AddRequest(const RequestData
& data
) {
276 base::AutoLock
auto_lock(lock_
);
277 const bool was_empty
= requests_
.empty();
278 requests_
.push_back(data
);
282 // We own |thread_|, so it's safe to use Unretained() here.
283 thread_
.message_loop()->PostTask(
285 base::Bind(&WatcherThreadManager::ProcessRequestsOnBackendThread
,
286 base::Unretained(this)));
289 void WatcherThreadManager::ProcessRequestsOnBackendThread() {
290 DCHECK_EQ(thread_
.message_loop(), base::MessageLoop::current());
294 base::AutoLock
auto_lock(lock_
);
295 requests_
.swap(requests
);
297 for (size_t i
= 0; i
< requests
.size(); ++i
) {
298 if (requests
[i
].type
== REQUEST_START
) {
299 backend_
.StartWatching(requests
[i
].start_data
);
301 backend_
.StopWatching(requests
[i
].stop_id
);
302 requests
[i
].stop_event
->Signal();
307 WatcherThreadManager::WatcherThreadManager()
308 : thread_(kWatcherThreadName
) {
309 base::Thread::Options thread_options
;
310 thread_options
.message_pump_factory
= base::Bind(&MessagePumpMojo::Create
);
311 thread_
.StartWithOptions(thread_options
);
314 // HandleWatcher::StateBase and subclasses -------------------------------------
316 // The base class of HandleWatcher's state. Owns the user's callback and
317 // monitors the current thread's MessageLoop to know when to force the callback
318 // to run (with an error) even though the pipe hasn't been signaled yet.
319 class HandleWatcher::StateBase
: public base::MessageLoop::DestructionObserver
{
321 StateBase(HandleWatcher
* watcher
,
322 const base::Callback
<void(MojoResult
)>& callback
)
326 base::MessageLoop::current()->AddDestructionObserver(this);
329 ~StateBase() override
{
330 base::MessageLoop::current()->RemoveDestructionObserver(this);
334 void NotifyHandleReady(MojoResult result
) {
336 NotifyAndDestroy(result
);
339 bool got_ready() const { return got_ready_
; }
342 void WillDestroyCurrentMessageLoop() override
{
343 // The current thread is exiting. Simulate a watch error.
344 NotifyAndDestroy(MOJO_RESULT_ABORTED
);
347 void NotifyAndDestroy(MojoResult result
) {
348 base::Callback
<void(MojoResult
)> callback
= callback_
;
349 watcher_
->Stop(); // Destroys |this|.
351 callback
.Run(result
);
354 HandleWatcher
* watcher_
;
355 base::Callback
<void(MojoResult
)> callback_
;
357 // Have we been notified that the handle is ready?
360 DISALLOW_COPY_AND_ASSIGN(StateBase
);
363 // If the thread on which HandleWatcher is used runs MessagePumpMojo,
364 // SameThreadWatchingState is used to directly watch the handle on the same
366 class HandleWatcher::SameThreadWatchingState
: public StateBase
,
367 public MessagePumpMojoHandler
{
369 SameThreadWatchingState(HandleWatcher
* watcher
,
370 const Handle
& handle
,
371 MojoHandleSignals handle_signals
,
372 MojoDeadline deadline
,
373 const base::Callback
<void(MojoResult
)>& callback
)
374 : StateBase(watcher
, callback
),
376 DCHECK(MessagePumpMojo::IsCurrent());
378 MessagePumpMojo::current()->AddHandler(
379 this, handle
, handle_signals
, MojoDeadlineToTimeTicks(deadline
));
382 ~SameThreadWatchingState() override
{
384 MessagePumpMojo::current()->RemoveHandler(handle_
);
388 // MessagePumpMojoHandler overrides:
389 void OnHandleReady(const Handle
& handle
) override
{
390 StopWatchingAndNotifyReady(handle
, MOJO_RESULT_OK
);
393 void OnHandleError(const Handle
& handle
, MojoResult result
) override
{
394 StopWatchingAndNotifyReady(handle
, result
);
397 void StopWatchingAndNotifyReady(const Handle
& handle
, MojoResult result
) {
398 DCHECK_EQ(handle
.value(), handle_
.value());
399 MessagePumpMojo::current()->RemoveHandler(handle_
);
400 NotifyHandleReady(result
);
405 DISALLOW_COPY_AND_ASSIGN(SameThreadWatchingState
);
408 // If the thread on which HandleWatcher is used runs a message pump different
409 // from MessagePumpMojo, SecondaryThreadWatchingState is used to watch the
410 // handle on the handle watcher thread.
411 class HandleWatcher::SecondaryThreadWatchingState
: public StateBase
{
413 SecondaryThreadWatchingState(HandleWatcher
* watcher
,
414 const Handle
& handle
,
415 MojoHandleSignals handle_signals
,
416 MojoDeadline deadline
,
417 const base::Callback
<void(MojoResult
)>& callback
)
418 : StateBase(watcher
, callback
),
419 weak_factory_(this) {
420 watcher_id_
= WatcherThreadManager::GetInstance()->StartWatching(
423 MojoDeadlineToTimeTicks(deadline
),
424 base::Bind(&SecondaryThreadWatchingState::NotifyHandleReady
,
425 weak_factory_
.GetWeakPtr()));
428 ~SecondaryThreadWatchingState() override
{
429 // If we've been notified the handle is ready (|got_ready()| is true) then
430 // the watch has been implicitly removed by
431 // WatcherThreadManager/MessagePumpMojo and we don't have to call
432 // StopWatching(). To do so would needlessly entail posting a task and
433 // blocking until the background thread services it.
435 WatcherThreadManager::GetInstance()->StopWatching(watcher_id_
);
439 WatcherID watcher_id_
;
441 // Used to weakly bind |this| to the WatcherThreadManager.
442 base::WeakPtrFactory
<SecondaryThreadWatchingState
> weak_factory_
;
444 DISALLOW_COPY_AND_ASSIGN(SecondaryThreadWatchingState
);
447 // HandleWatcher ---------------------------------------------------------------
449 HandleWatcher::HandleWatcher() {
452 HandleWatcher::~HandleWatcher() {
455 void HandleWatcher::Start(const Handle
& handle
,
456 MojoHandleSignals handle_signals
,
457 MojoDeadline deadline
,
458 const base::Callback
<void(MojoResult
)>& callback
) {
459 DCHECK(handle
.is_valid());
460 DCHECK_NE(MOJO_HANDLE_SIGNAL_NONE
, handle_signals
);
462 if (MessagePumpMojo::IsCurrent()) {
463 state_
.reset(new SameThreadWatchingState(
464 this, handle
, handle_signals
, deadline
, callback
));
466 state_
.reset(new SecondaryThreadWatchingState(
467 this, handle
, handle_signals
, deadline
, callback
));
471 void HandleWatcher::Stop() {
475 } // namespace common