1 // Copyright 2013 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
5 #include "mojo/message_pump/handle_watcher.h"
9 #include "base/atomic_sequence_num.h"
10 #include "base/bind.h"
11 #include "base/lazy_instance.h"
12 #include "base/logging.h"
13 #include "base/macros.h"
14 #include "base/memory/singleton.h"
15 #include "base/memory/weak_ptr.h"
16 #include "base/message_loop/message_loop.h"
17 #include "base/single_thread_task_runner.h"
18 #include "base/synchronization/lock.h"
19 #include "base/synchronization/waitable_event.h"
20 #include "base/thread_task_runner_handle.h"
21 #include "base/threading/thread.h"
22 #include "base/threading/thread_restrictions.h"
23 #include "base/time/time.h"
24 #include "mojo/message_pump/message_pump_mojo.h"
25 #include "mojo/message_pump/message_pump_mojo_handler.h"
26 #include "mojo/message_pump/time_helper.h"
// Identifier for a single watch. WatcherThreadManager::StartWatching() hands
// one out and WatcherThreadManager::StopWatching() takes it to stop the watch.
using WatcherID = int;
// Name passed to the base::Thread that WatcherThreadManager owns.
constexpr char kWatcherThreadName[] = "handle-watcher-thread";
37 base::TimeTicks
MojoDeadlineToTimeTicks(MojoDeadline deadline
) {
38 return deadline
== MOJO_DEADLINE_INDEFINITE
? base::TimeTicks() :
39 internal::NowTicks() + base::TimeDelta::FromMicroseconds(deadline
);
42 // Tracks the data for a single call to Start().
// NOTE(review): the struct header and the declarations of the |id| and
// |handle| members (both are read and written elsewhere in this file) are not
// visible in this excerpt — confirm against the complete file.
//
// Constructor initializer list: id 0, no signals requested, no task runner.
45 : id(0), handle_signals(MOJO_HANDLE_SIGNAL_NONE
), task_runner(NULL
) {}
// Signals being watched for on the handle.
49 MojoHandleSignals handle_signals
;
// Deadline of the watch, expressed as a base::TimeTicks.
50 base::TimeTicks deadline
;
// Run with the MojoResult of the watch. WatcherBackend::RemoveAndNotify()
// posts it to |task_runner|.
51 base::Callback
<void(MojoResult
)> callback
;
// Task runner |callback| is posted to. WatcherThreadManager::StartWatching()
// sets it to base::ThreadTaskRunnerHandle::Get() of the calling thread.
52 scoped_refptr
<base::SingleThreadTaskRunner
> task_runner
;
55 // WatcherBackend --------------------------------------------------------------
57 // WatcherBackend is responsible for managing the requests and interacting with
58 // MessagePumpMojo. All access (outside of creation/destruction) is done on the
59 // thread WatcherThreadManager creates.
// NOTE(review): the access specifiers, the constructor declaration and the
// closing "};" of this class are not visible in this excerpt.
60 class WatcherBackend
: public MessagePumpMojoHandler
{
63 ~WatcherBackend() override
;
// Starts watching |data.handle|. Any watch already registered for the same
// handle is removed first and its callback is notified with
// MOJO_RESULT_CANCELLED.
65 void StartWatching(const WatchData
& data
);
67 // Cancels a previously scheduled request to start a watch.
// If |watcher_id| maps to a registered handle, the handle is erased from
// |handle_to_data_| and removed from MessagePumpMojo; the callback is not run.
68 void StopWatching(WatcherID watcher_id
);
// Keyed by the watched Handle; the value holds everything needed to notify
// the requester.
71 typedef std::map
<Handle
, WatchData
> HandleToWatchDataMap
;
73 // Invoked when a handle needs to be removed and notified.
// Posts the stored callback, bound to |result|, to the stored task runner.
74 void RemoveAndNotify(const Handle
& handle
, MojoResult result
);
76 // Searches through |handle_to_data_| for |watcher_id|. Returns true if found
77 // and sets |handle| to the Handle. Returns false if not a known id.
78 bool GetMojoHandleByWatcherID(WatcherID watcher_id
, Handle
* handle
) const;
80 // MessagePumpMojoHandler overrides:
// Both forward to RemoveAndNotify(); OnHandleReady() reports MOJO_RESULT_OK
// and OnHandleError() reports the |result| it was given.
81 void OnHandleReady(const Handle
& handle
) override
;
82 void OnHandleError(const Handle
& handle
, MojoResult result
) override
;
84 // Maps from watched Handle to its WatchData. (The map is keyed by Handle, not
// by watcher id; lookups by id go through GetMojoHandleByWatcherID().)
85 HandleToWatchDataMap handle_to_data_
;
87 DISALLOW_COPY_AND_ASSIGN(WatcherBackend
);
90 WatcherBackend::WatcherBackend() {
93 WatcherBackend::~WatcherBackend() {
// Registers |data| as the watch for |data.handle| and adds this object as the
// handler for that handle on the current MessagePumpMojo.
96 void WatcherBackend::StartWatching(const WatchData
& data
) {
// Only one watch per handle is kept: an existing one is removed and its
// callback notified with MOJO_RESULT_CANCELLED before the new one is stored.
97 RemoveAndNotify(data
.handle
, MOJO_RESULT_CANCELLED
);
// After the removal above no entry for the handle may remain.
99 DCHECK_EQ(0u, handle_to_data_
.count(data
.handle
));
101 handle_to_data_
[data
.handle
] = data
;
102 MessagePumpMojo::current()->AddHandler(this, data
.handle
,
// NOTE(review): the remaining AddHandler() arguments and the closing brace of
// this function are not visible in this excerpt.
// Stops the watch identified by |watcher_id|, if one is registered. The
// stored callback is not run.
// NOTE(review): the declaration of the local |handle| and the closing braces
// are not visible in this excerpt.
107 void WatcherBackend::StopWatching(WatcherID watcher_id
) {
108 // Because of the thread hop it is entirely possible to get here and not
109 // have a valid handle registered for |watcher_id|.
111 if (GetMojoHandleByWatcherID(watcher_id
, &handle
)) {
// Drop the bookkeeping entry, then stop MessagePumpMojo from watching it.
112 handle_to_data_
.erase(handle
);
113 MessagePumpMojo::current()->RemoveHandler(handle
);
// Removes the watch registered for |handle| and posts its callback, bound to
// |result|, to the task runner stored with the watch.
// NOTE(review): the end of the parameter list, the statement controlled by
// the |count() == 0| check below (presumably an early return when |handle| is
// not being watched) and the closing brace are not visible in this excerpt.
117 void WatcherBackend::RemoveAndNotify(const Handle
& handle
,
119 if (handle_to_data_
.count(handle
) == 0)
// Copy the entry before erasing it; the copy is still needed to post the
// callback afterwards.
122 const WatchData
data(handle_to_data_
[handle
]);
123 handle_to_data_
.erase(handle
);
124 MessagePumpMojo::current()->RemoveHandler(handle
);
// The callback is posted to |data.task_runner| rather than run inline here.
126 data
.task_runner
->PostTask(FROM_HERE
, base::Bind(data
.callback
, result
));
// Linear search of |handle_to_data_| for the entry whose WatchData::id equals
// |watcher_id|; on a match the entry's handle is written to |*handle|.
// NOTE(review): the return statements and closing braces are not visible in
// this excerpt. The class declaration documents the contract as "returns true
// if found, false if not a known id".
129 bool WatcherBackend::GetMojoHandleByWatcherID(WatcherID watcher_id
,
130 Handle
* handle
) const {
131 for (HandleToWatchDataMap::const_iterator i
= handle_to_data_
.begin();
132 i
!= handle_to_data_
.end(); ++i
) {
133 if (i
->second
.id
== watcher_id
) {
134 *handle
= i
->second
.handle
;
141 void WatcherBackend::OnHandleReady(const Handle
& handle
) {
142 RemoveAndNotify(handle
, MOJO_RESULT_OK
);
145 void WatcherBackend::OnHandleError(const Handle
& handle
, MojoResult result
) {
146 RemoveAndNotify(handle
, result
);
149 // WatcherThreadManager --------------------------------------------------------
151 // WatcherThreadManager manages the background thread that listens for handles
152 // to be ready. All requests are handled by WatcherBackend.
// NOTE(review): several lines of this class are not visible in this excerpt:
// the access specifiers, the request-type enum (REQUEST_START / REQUEST_STOP
// are used below), the RequestData struct header with its |type| and
// |stop_id| members, and the |lock_| and |requests_| member declarations.
155 class WatcherThreadManager
{
157 ~WatcherThreadManager();
159 // Returns the shared instance.
160 static WatcherThreadManager
* GetInstance();
162 // Starts watching the requested handle. Returns a unique ID that is used to
163 // stop watching the handle. When the handle is ready |callback| is notified
164 // on the thread StartWatching() was invoked on.
165 // This may be invoked on any thread.
166 WatcherID
StartWatching(const Handle
& handle
,
167 MojoHandleSignals handle_signals
,
168 base::TimeTicks deadline
,
169 const base::Callback
<void(MojoResult
)>& callback
);
171 // Stops watching a handle.
172 // This may be invoked on any thread.
// Per the comments in the implementation, this blocks until the handle is
// actually removed.
173 void StopWatching(WatcherID watcher_id
);
181 // See description of |requests_| for details.
// A default-constructed request is a start request with no stop id and no
// stop event.
183 RequestData() : type(REQUEST_START
), stop_id(0), stop_event(NULL
) {}
// Watch parameters handed to WatcherBackend::StartWatching() for start
// requests.
186 WatchData start_data
;
// Signaled by ProcessRequestsOnBackendThread() once a stop request has been
// handed to the backend.
188 base::WaitableEvent
* stop_event
;
191 typedef std::vector
<RequestData
> Requests
;
193 friend struct base::DefaultSingletonTraits
<WatcherThreadManager
>;
195 WatcherThreadManager();
197 // Schedules a request on the background thread. See |requests_| for details.
198 void AddRequest(const RequestData
& data
);
200 // Processes requests added to |requests_|. This is invoked on the backend
// thread.
202 void ProcessRequestsOnBackendThread();
// Background thread; started in the constructor with a message pump factory
// bound to MessagePumpMojo::Create.
204 base::Thread thread_
;
// Source of the ids returned by StartWatching().
206 base::AtomicSequenceNumber watcher_id_generator_
;
// Performs the actual watching; driven from ProcessRequestsOnBackendThread().
208 WatcherBackend backend_
;
210 // Protects |requests_|.
213 // Start/Stop result in adding a RequestData to |requests_| (protected by
214 // |lock_|). When the background thread wakes up it processes the requests.
217 DISALLOW_COPY_AND_ASSIGN(WatcherThreadManager
);
// Destructor.
// NOTE(review): the body and closing brace are not visible in this excerpt.
220 WatcherThreadManager::~WatcherThreadManager() {
224 WatcherThreadManager
* WatcherThreadManager::GetInstance() {
225 return base::Singleton
<WatcherThreadManager
>::get();
228 WatcherID
WatcherThreadManager::StartWatching(
229 const Handle
& handle
,
230 MojoHandleSignals handle_signals
,
231 base::TimeTicks deadline
,
232 const base::Callback
<void(MojoResult
)>& callback
) {
233 RequestData request_data
;
234 request_data
.type
= REQUEST_START
;
235 request_data
.start_data
.id
= watcher_id_generator_
.GetNext();
236 request_data
.start_data
.handle
= handle
;
237 request_data
.start_data
.callback
= callback
;
238 request_data
.start_data
.handle_signals
= handle_signals
;
239 request_data
.start_data
.deadline
= deadline
;
240 request_data
.start_data
.task_runner
= base::ThreadTaskRunnerHandle::Get();
241 AddRequest(request_data
);
242 return request_data
.start_data
.id
;
// Stops the watch identified by |watcher_id|.
// First, under |lock_|, the pending |requests_| are scanned for a not yet
// processed REQUEST_START with the same id. Otherwise a REQUEST_STOP carrying
// a WaitableEvent is queued for the backend thread.
// NOTE(review): the scope braces, the action taken when the pending start
// request is found, the wait on |event| and the closing brace are not visible
// in this excerpt.
245 void WatcherThreadManager::StopWatching(WatcherID watcher_id
) {
246 // Handle the case of StartWatching() followed by StopWatching() before
247 // |thread_| woke up.
249 base::AutoLock
auto_lock(lock_
);
250 for (Requests::iterator i
= requests_
.begin(); i
!= requests_
.end(); ++i
) {
251 if (i
->type
== REQUEST_START
&& i
->start_data
.id
== watcher_id
) {
252 // Watcher ids are not reused, so if we find it we can stop.
// Waiting is explicitly allowed on this thread for the rest of the function.
259 base::ThreadRestrictions::ScopedAllowWait allow_wait
;
260 base::WaitableEvent
event(true, false);
261 RequestData request_data
;
262 request_data
.type
= REQUEST_STOP
;
263 request_data
.stop_id
= watcher_id
;
// ProcessRequestsOnBackendThread() signals this event after handing the stop
// request to the backend.
264 request_data
.stop_event
= &event
;
265 AddRequest(request_data
);
267 // We need to block until the handle is actually removed.
// Appends |data| to |requests_| under |lock_| and posts a task to |thread_|
// that runs ProcessRequestsOnBackendThread().
// NOTE(review): the scope braces around the lock, the use of |was_empty|
// (presumably to skip posting when a task is already pending), the first
// PostTask() argument and the closing brace are not visible in this excerpt.
271 void WatcherThreadManager::AddRequest(const RequestData
& data
) {
273 base::AutoLock
auto_lock(lock_
);
// Sampled before the push so it reflects the queue state seen on entry.
274 const bool was_empty
= requests_
.empty();
275 requests_
.push_back(data
);
279 // We own |thread_|, so it's safe to use Unretained() here.
280 thread_
.task_runner()->PostTask(
282 base::Bind(&WatcherThreadManager::ProcessRequestsOnBackendThread
,
283 base::Unretained(this)));
// Takes the queued requests and hands each one to |backend_|: start requests
// go to WatcherBackend::StartWatching(), stop requests to
// WatcherBackend::StopWatching().
// NOTE(review): the declaration of the local |requests|, the scope braces
// around the lock, the else branch header and the closing braces are not
// visible in this excerpt.
286 void WatcherThreadManager::ProcessRequestsOnBackendThread() {
// Must run on the message loop of |thread_|.
287 DCHECK_EQ(thread_
.message_loop(), base::MessageLoop::current());
// Swap the shared queue with the local vector while holding |lock_|.
291 base::AutoLock
auto_lock(lock_
);
292 requests_
.swap(requests
);
294 for (size_t i
= 0; i
< requests
.size(); ++i
) {
295 if (requests
[i
].type
== REQUEST_START
) {
296 backend_
.StartWatching(requests
[i
].start_data
);
298 backend_
.StopWatching(requests
[i
].stop_id
);
// Signals the event that the StopWatching() caller attached to the request.
299 requests
[i
].stop_event
->Signal();
304 WatcherThreadManager::WatcherThreadManager()
305 : thread_(kWatcherThreadName
) {
306 base::Thread::Options thread_options
;
307 thread_options
.message_pump_factory
= base::Bind(&MessagePumpMojo::Create
);
308 thread_
.StartWithOptions(thread_options
);
311 // HandleWatcher::StateBase and subclasses -------------------------------------
313 // The base class of HandleWatcher's state. Owns the user's callback and
314 // monitors the current thread's MessageLoop to know when to force the callback
315 // to run (with an error) even though the pipe hasn't been signaled yet.
// NOTE(review): the access specifiers, the constructor's initializer list,
// part of NotifyHandleReady(), the |got_ready_| member declaration and the
// closing braces are not visible in this excerpt.
316 class HandleWatcher::StateBase
: public base::MessageLoop::DestructionObserver
{
// Registers this object as a destruction observer of the current MessageLoop.
318 StateBase(HandleWatcher
* watcher
,
319 const base::Callback
<void(MojoResult
)>& callback
)
323 base::MessageLoop::current()->AddDestructionObserver(this);
// Unregisters the destruction observer added by the constructor.
326 ~StateBase() override
{
327 base::MessageLoop::current()->RemoveDestructionObserver(this);
// Reports |result| to the user's callback via NotifyAndDestroy().
331 void NotifyHandleReady(MojoResult result
) {
333 NotifyAndDestroy(result
);
336 bool got_ready() const { return got_ready_
; }
// base::MessageLoop::DestructionObserver override.
339 void WillDestroyCurrentMessageLoop() override
{
340 // The current thread is exiting. Simulate a watch error.
341 NotifyAndDestroy(MOJO_RESULT_ABORTED
);
// Stops the owning HandleWatcher, then runs the user's callback with
// |result|.
344 void NotifyAndDestroy(MojoResult result
) {
// Copy the callback to the stack first: Stop() below destroys |this|, so
// |callback_| cannot be touched afterwards.
345 base::Callback
<void(MojoResult
)> callback
= callback_
;
346 watcher_
->Stop(); // Destroys |this|.
348 callback
.Run(result
);
// The HandleWatcher this state belongs to; Stop() is called on it when the
// watch completes.
351 HandleWatcher
* watcher_
;
// The user's callback.
352 base::Callback
<void(MojoResult
)> callback_
;
354 // Have we been notified that the handle is ready?
357 DISALLOW_COPY_AND_ASSIGN(StateBase
);
360 // If the thread on which HandleWatcher is used runs MessagePumpMojo,
361 // SameThreadWatchingState is used to directly watch the handle on the same
// thread.
// NOTE(review): the access specifiers, the initializer for |handle_|, any
// condition guarding the destructor's RemoveHandler() call, the |handle_|
// member declaration and the closing braces are not visible in this excerpt.
363 class HandleWatcher::SameThreadWatchingState
: public StateBase
,
364 public MessagePumpMojoHandler
{
// Registers this object with the current MessagePumpMojo as the handler for
// |handle|, converting |deadline| with MojoDeadlineToTimeTicks().
366 SameThreadWatchingState(HandleWatcher
* watcher
,
367 const Handle
& handle
,
368 MojoHandleSignals handle_signals
,
369 MojoDeadline deadline
,
370 const base::Callback
<void(MojoResult
)>& callback
)
371 : StateBase(watcher
, callback
),
// This state is only valid on a thread whose message pump is MessagePumpMojo.
373 DCHECK(MessagePumpMojo::IsCurrent());
375 MessagePumpMojo::current()->AddHandler(
376 this, handle
, handle_signals
, MojoDeadlineToTimeTicks(deadline
));
379 ~SameThreadWatchingState() override
{
381 MessagePumpMojo::current()->RemoveHandler(handle_
);
385 // MessagePumpMojoHandler overrides:
386 void OnHandleReady(const Handle
& handle
) override
{
387 StopWatchingAndNotifyReady(handle
, MOJO_RESULT_OK
);
390 void OnHandleError(const Handle
& handle
, MojoResult result
) override
{
391 StopWatchingAndNotifyReady(handle
, result
);
// Shared tail of both overrides: removes the handler for |handle_| and then
// reports |result| through StateBase::NotifyHandleReady().
394 void StopWatchingAndNotifyReady(const Handle
& handle
, MojoResult result
) {
// The notification must be for the handle this state is watching.
395 DCHECK_EQ(handle
.value(), handle_
.value());
396 MessagePumpMojo::current()->RemoveHandler(handle_
);
397 NotifyHandleReady(result
);
402 DISALLOW_COPY_AND_ASSIGN(SameThreadWatchingState
);
405 // If the thread on which HandleWatcher is used runs a message pump different
406 // from MessagePumpMojo, SecondaryThreadWatchingState is used to watch the
407 // handle on the handle watcher thread.
// NOTE(review): the access specifiers, the first arguments of the
// StartWatching() call, the condition guarding StopWatching() in the
// destructor and the closing braces are not visible in this excerpt.
408 class HandleWatcher::SecondaryThreadWatchingState
: public StateBase
{
// Asks WatcherThreadManager to watch the handle and stores the returned id in
// |watcher_id_|. The callback is bound to NotifyHandleReady() through a weak
// pointer to this object.
410 SecondaryThreadWatchingState(HandleWatcher
* watcher
,
411 const Handle
& handle
,
412 MojoHandleSignals handle_signals
,
413 MojoDeadline deadline
,
414 const base::Callback
<void(MojoResult
)>& callback
)
415 : StateBase(watcher
, callback
),
416 weak_factory_(this) {
417 watcher_id_
= WatcherThreadManager::GetInstance()->StartWatching(
420 MojoDeadlineToTimeTicks(deadline
),
421 base::Bind(&SecondaryThreadWatchingState::NotifyHandleReady
,
422 weak_factory_
.GetWeakPtr()));
425 ~SecondaryThreadWatchingState() override
{
426 // If we've been notified the handle is ready (|got_ready()| is true) then
427 // the watch has been implicitly removed by
428 // WatcherThreadManager/MessagePumpMojo and we don't have to call
429 // StopWatching(). To do so would needlessly entail posting a task and
430 // blocking until the background thread services it.
432 WatcherThreadManager::GetInstance()->StopWatching(watcher_id_
);
// Id returned by WatcherThreadManager::StartWatching(); passed back to
// StopWatching().
436 WatcherID watcher_id_
;
438 // Used to weakly bind |this| to the WatcherThreadManager.
439 base::WeakPtrFactory
<SecondaryThreadWatchingState
> weak_factory_
;
441 DISALLOW_COPY_AND_ASSIGN(SecondaryThreadWatchingState
);
444 // HandleWatcher ---------------------------------------------------------------
446 HandleWatcher::HandleWatcher() {
449 HandleWatcher::~HandleWatcher() {
// Starts watching |handle| for |handle_signals|; |callback| receives the
// result. The state object stored in |state_| depends on the current thread:
// SameThreadWatchingState when it runs MessagePumpMojo, otherwise
// SecondaryThreadWatchingState.
// NOTE(review): the statement that clears the previous state, the else branch
// header and the closing braces are not visible in this excerpt.
452 void HandleWatcher::Start(const Handle
& handle
,
453 MojoHandleSignals handle_signals
,
454 MojoDeadline deadline
,
455 const base::Callback
<void(MojoResult
)>& callback
) {
// Preconditions: a valid handle and at least one signal to watch for.
456 DCHECK(handle
.is_valid());
457 DCHECK_NE(MOJO_HANDLE_SIGNAL_NONE
, handle_signals
);
459 // Need to clear the state before creating a new one.
461 if (MessagePumpMojo::IsCurrent()) {
462 state_
.reset(new SameThreadWatchingState(
463 this, handle
, handle_signals
, deadline
, callback
));
465 state_
.reset(new SecondaryThreadWatchingState(
466 this, handle
, handle_signals
, deadline
, callback
));
// Stops the current watch. StateBase::NotifyAndDestroy() relies on this call
// destroying the active state object.
// NOTE(review): the body and closing brace are not visible in this excerpt.
470 void HandleWatcher::Stop() {
474 } // namespace common