1 // Copyright 2013 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
5 #include "mojo/common/handle_watcher.h"
9 #include "base/atomic_sequence_num.h"
10 #include "base/bind.h"
11 #include "base/lazy_instance.h"
12 #include "base/memory/weak_ptr.h"
13 #include "base/message_loop/message_loop.h"
14 #include "base/message_loop/message_loop_proxy.h"
15 #include "base/threading/thread.h"
16 #include "base/time/tick_clock.h"
17 #include "base/time/time.h"
18 #include "mojo/common/message_pump_mojo.h"
19 #include "mojo/common/message_pump_mojo_handler.h"
// Identifier for a single watch request. WatcherThreadManager assigns the
// ids from its |watcher_id_generator_|.
24 typedef int WatcherID
;
// Name given to the background thread that WatcherThreadManager starts.
28 const char kWatcherThreadName
[] = "handle-watcher-thread";
30 // TODO(sky): this should be unnecessary once MessageLoop has been refactored.
// Assigned by CreateMessagePumpMojo(); WatcherBackend uses it to add and
// remove handlers.
31 MessagePumpMojo
* message_pump_mojo
= NULL
;
// Message pump factory installed on the watcher thread by
// WatcherThreadManager's constructor. Allocates a MessagePumpMojo, records it
// in the file-level |message_pump_mojo| pointer, and returns it wrapped in a
// scoped_ptr<base::MessagePump>.
33 scoped_ptr
<base::MessagePump
> CreateMessagePumpMojo() {
34 message_pump_mojo
= new MessagePumpMojo
;
35 return scoped_ptr
<base::MessagePump
>(message_pump_mojo
).Pass();
38 // Tracks the data for a single call to Start().
// |wait_flags| starts out as MOJO_WAIT_FLAG_NONE.
42 wait_flags(MOJO_WAIT_FLAG_NONE
),
// Wait flags requested for the handle; passed to
// WatcherThreadManager::StartWatching().
47 MojoWaitFlags wait_flags
;
// Deadline for the wait, expressed as a base::TimeTicks.
48 base::TimeTicks deadline
;
// Run with the MojoResult; WatcherBackend::RemoveAndNotify() posts it to
// |message_loop|.
49 base::Callback
<void(MojoResult
)> callback
;
// Message loop of the thread that called
// WatcherThreadManager::StartWatching(); |callback| is posted here.
50 scoped_refptr
<base::MessageLoopProxy
> message_loop
;
53 // WatcherBackend --------------------------------------------------------------
55 // WatcherBackend is responsible for managing the requests and interacting with
56 // MessagePumpMojo. All access (outside of creation/destruction) is done on the
57 // thread WatcherThreadManager creates.
58 class WatcherBackend
: public MessagePumpMojoHandler
{
61 virtual ~WatcherBackend();
// Registers |data| for its handle with the message pump. A watch already
// registered for the same handle is first removed and its callback notified
// with MOJO_RESULT_CANCELLED.
63 void StartWatching(const WatchData
& data
);
// Stops watching the handle registered under |watcher_id|. No callback is
// posted by this path.
64 void StopWatching(WatcherID watcher_id
);
// Watched handles and their WatchData, keyed by Handle.
67 typedef std::map
<Handle
, WatchData
> HandleToWatchDataMap
;
69 // Invoked when a handle needs to be removed and notified.
// Posts the WatchData's callback with |result| to the WatchData's message
// loop.
70 void RemoveAndNotify(const Handle
& handle
, MojoResult result
);
72 // Searches through |handle_to_data_| for |watcher_id|. Returns true if found
73 // and sets |handle| to the Handle. Returns false if not a known id.
74 bool GetMojoHandleByWatcherID(WatcherID watcher_id
, Handle
* handle
) const;
76 // MessagePumpMojoHandler overrides:
// Both overrides forward to RemoveAndNotify(); OnHandleReady() reports
// MOJO_RESULT_OK and OnHandleError() reports the supplied |result|.
77 virtual void OnHandleReady(const Handle
& handle
) OVERRIDE
;
78 virtual void OnHandleError(const Handle
& handle
, MojoResult result
) OVERRIDE
;
80 // Maps from watched Handle to its WatchData.
81 HandleToWatchDataMap handle_to_data_
;
83 DISALLOW_COPY_AND_ASSIGN(WatcherBackend
);
// Constructs a WatcherBackend; takes no arguments.
86 WatcherBackend::WatcherBackend() {
// Destructor (declared virtual in the class).
89 WatcherBackend::~WatcherBackend() {
// Begins watching |data.handle|. WatcherThreadManager::StartWatching() posts
// this call to the watcher thread's message loop.
92 void WatcherBackend::StartWatching(const WatchData
& data
) {
// If this handle is already being watched, drop that watch and notify its
// callback with MOJO_RESULT_CANCELLED.
93 RemoveAndNotify(data
.handle
, MOJO_RESULT_CANCELLED
);
// After the removal above no entry may remain for this handle.
95 DCHECK_EQ(0u, handle_to_data_
.count(data
.handle
));
// Record the request, then register this object with the message pump as the
// handler for the handle.
97 handle_to_data_
[data
.handle
] = data
;
98 message_pump_mojo
->AddHandler(this, data
.handle
,
// Stops watching the handle registered under |watcher_id|: its entry is erased
// from |handle_to_data_| and its handler is removed from the message pump.
// Unlike RemoveAndNotify(), nothing is posted back to the requester.
103 void WatcherBackend::StopWatching(WatcherID watcher_id
) {
104 // Because of the thread hop it is entirely possible to get here and not
105 // have a valid handle registered for |watcher_id|.
// Resolve |watcher_id| to the handle it was registered with.
// NOTE(review): the statement guarded by this check is not visible in this
// view; presumably it returns early when the id is unknown -- confirm.
107 if (!GetMojoHandleByWatcherID(watcher_id
, &handle
))
110 handle_to_data_
.erase(handle
);
111 message_pump_mojo
->RemoveHandler(handle
);
// Removes the watch registered for |handle| and posts its callback, bound to
// |result|, to the message loop stored in the WatchData.
114 void WatcherBackend::RemoveAndNotify(const Handle
& handle
,
// Check whether |handle| is currently being watched.
116 if (handle_to_data_
.count(handle
) == 0)
// Copy the WatchData out before erasing the map entry so that its callback
// and message loop can still be used below.
119 const WatchData
data(handle_to_data_
[handle
]);
120 handle_to_data_
.erase(handle
);
121 message_pump_mojo
->RemoveHandler(handle
);
// Notify asynchronously on the message loop recorded with the request.
122 data
.message_loop
->PostTask(FROM_HERE
, base::Bind(data
.callback
, result
));
// Linear scan of |handle_to_data_| for the entry whose WatchData id equals
// |watcher_id|. On a match the entry's handle is written to |*handle|.
125 bool WatcherBackend::GetMojoHandleByWatcherID(WatcherID watcher_id
,
126 Handle
* handle
) const {
127 for (HandleToWatchDataMap::const_iterator i
= handle_to_data_
.begin();
128 i
!= handle_to_data_
.end(); ++i
) {
// The map is keyed by Handle, so the id has to be compared entry by entry.
129 if (i
->second
.id
== watcher_id
) {
130 *handle
= i
->second
.handle
;
// MessagePumpMojoHandler override. Removes the watch for |handle| and notifies
// its callback with MOJO_RESULT_OK.
137 void WatcherBackend::OnHandleReady(const Handle
& handle
) {
138 RemoveAndNotify(handle
, MOJO_RESULT_OK
);
// MessagePumpMojoHandler override. Removes the watch for |handle| and notifies
// its callback with the supplied |result|.
141 void WatcherBackend::OnHandleError(const Handle
& handle
, MojoResult result
) {
142 RemoveAndNotify(handle
, result
);
145 // WatcherThreadManager --------------------------------------------------------
147 // WatcherThreadManager manages the background thread that listens for handles
148 // to be ready. All requests are handled by WatcherBackend.
149 class WatcherThreadManager
{
151 // Returns the shared instance.
152 static WatcherThreadManager
* GetInstance();
154 // Starts watching the requested handle. Returns a unique ID that is used to
155 // stop watching the handle. When the handle is ready |callback| is notified
156 // on the thread StartWatching() was invoked on.
157 // This may be invoked on any thread.
158 WatcherID
StartWatching(const Handle
& handle
,
159 MojoWaitFlags wait_flags
,
160 base::TimeTicks deadline
,
161 const base::Callback
<void(MojoResult
)>& callback
);
163 // Stops watching a handle.
164 // This may be invoked on any thread.
165 void StopWatching(WatcherID watcher_id
);
// Grants base::LazyInstance's default traits access to the constructor and
// destructor declared below; GetInstance() creates the instance through a
// LazyInstance.
168 friend struct base::DefaultLazyInstanceTraits
<WatcherThreadManager
>;
170 WatcherThreadManager();
171 ~WatcherThreadManager();
// Background thread named kWatcherThreadName. The constructor starts it with
// CreateMessagePumpMojo as its message pump factory.
173 base::Thread thread_
;
// Source of the ids assigned to watch requests in StartWatching().
175 base::AtomicSequenceNumber watcher_id_generator_
;
// Receives the requests. StartWatching()/StopWatching() post tasks bound to
// it (via base::Unretained()) onto |thread_|'s message loop.
177 WatcherBackend backend_
;
179 DISALLOW_COPY_AND_ASSIGN(WatcherThreadManager
);
// Returns a pointer to the shared WatcherThreadManager, which is held in a
// function-local static base::LazyInstance.
182 WatcherThreadManager
* WatcherThreadManager::GetInstance() {
183 static base::LazyInstance
<WatcherThreadManager
> instance
=
184 LAZY_INSTANCE_INITIALIZER
;
185 return &instance
.Get();
// Packages the request into a WatchData and posts
// WatcherBackend::StartWatching to |thread_|'s message loop.
188 WatcherID
WatcherThreadManager::StartWatching(
189 const Handle
& handle
,
190 MojoWaitFlags wait_flags
,
191 base::TimeTicks deadline
,
192 const base::Callback
<void(MojoResult
)>& callback
) {
// Assign the next id from the sequence generator to this request.
194 data
.id
= watcher_id_generator_
.GetNext();
195 data
.handle
= handle
;
196 data
.callback
= callback
;
197 data
.wait_flags
= wait_flags
;
198 data
.deadline
= deadline
;
// Remember the calling thread's message loop; WatcherBackend posts |callback|
// back to it.
199 data
.message_loop
= base::MessageLoopProxy::current();
// The calling thread must have a message loop to receive the callback.
200 DCHECK_NE(static_cast<base::MessageLoopProxy
*>(NULL
), data
.message_loop
);
201 // We outlive |thread_|, so it's safe to use Unretained() here.
202 thread_
.message_loop()->PostTask(
204 base::Bind(&WatcherBackend::StartWatching
,
205 base::Unretained(&backend_
),
// Posts WatcherBackend::StopWatching to |thread_|'s message loop, bound to
// |backend_|.
210 void WatcherThreadManager::StopWatching(WatcherID watcher_id
) {
211 // We outlive |thread_|, so it's safe to use Unretained() here.
212 thread_
.message_loop()->PostTask(
214 base::Bind(&WatcherBackend::StopWatching
,
215 base::Unretained(&backend_
),
// Names the background thread kWatcherThreadName and starts it with
// CreateMessagePumpMojo as its message pump factory.
219 WatcherThreadManager::WatcherThreadManager()
220 : thread_(kWatcherThreadName
) {
221 base::Thread::Options thread_options
;
// Have the thread build its pump through CreateMessagePumpMojo(), which also
// records the pump in |message_pump_mojo|.
222 thread_options
.message_pump_factory
= base::Bind(&CreateMessagePumpMojo
);
223 thread_
.StartWithOptions(thread_options
);
// Destructor.
// NOTE(review): the body is not visible in this view; confirm how |thread_| is
// shut down relative to |backend_|.
226 WatcherThreadManager::~WatcherThreadManager() {
232 // HandleWatcher::StartState ---------------------------------------------------
234 // Contains the information passed to Start().
235 struct HandleWatcher::StartState
{
// |watcher| is the HandleWatcher that the weak pointers handed out by
// |weak_factory| refer to.
236 explicit StartState(HandleWatcher
* watcher
) : weak_factory(watcher
) {
242 // ID assigned by WatcherThreadManager.
243 WatcherID watcher_id
;
245 // Callback to notify when done.
246 base::Callback
<void(MojoResult
)> callback
;
248 // When Start() is invoked a callback is passed to WatcherThreadManager
249 // using a WeakPtr from |weak_factory|. The callback invokes
250 // OnHandleReady() (on the thread Start() is invoked from) which in turn
251 // notifies |callback|. Doing this allows us to reset state when the handle
252 // is ready, and then notify the callback. Doing this also means Stop()
253 // cancels any pending callbacks that may be in flight.
254 base::WeakPtrFactory
<HandleWatcher
> weak_factory
;
257 // HandleWatcher ---------------------------------------------------------------
// Optional clock consulted by NowTicks(). While it is NULL, NowTicks() falls
// back to base::TimeTicks::Now().
260 base::TickClock
* HandleWatcher::tick_clock_
= NULL
;
// Constructs a HandleWatcher; takes no arguments.
262 HandleWatcher::HandleWatcher() {
// Destructor.
// NOTE(review): the body is not visible in this view; confirm whether it stops
// a watch that is still in progress.
265 HandleWatcher::~HandleWatcher() {
// Starts watching |handle|. Creates a fresh StartState holding |callback|,
// then registers with WatcherThreadManager using a callback bound to
// OnHandleReady() through a weak pointer from the StartState's factory.
// |deadline| is converted with MojoDeadlineToTimeTicks().
269 void HandleWatcher::Start(const Handle
& handle
,
270 MojoWaitFlags wait_flags
,
271 MojoDeadline deadline
,
272 const base::Callback
<void(MojoResult
)>& callback
) {
// Preconditions: a valid handle and at least one wait flag.
273 DCHECK(handle
.is_valid());
274 DCHECK_NE(MOJO_WAIT_FLAG_NONE
, wait_flags
);
// Replace any previous state with a new StartState tied to this watcher.
278 start_state_
.reset(new StartState(this));
279 start_state_
->callback
= callback
;
// Keep the id returned by WatcherThreadManager so Stop() can cancel the watch.
280 start_state_
->watcher_id
=
281 WatcherThreadManager::GetInstance()->StartWatching(
284 MojoDeadlineToTimeTicks(deadline
),
285 base::Bind(&HandleWatcher::OnHandleReady
,
286 start_state_
->weak_factory
.GetWeakPtr()));
// Cancels the watch in progress: takes ownership of the current StartState
// (leaving |start_state_| empty) and asks WatcherThreadManager to stop
// watching its id.
289 void HandleWatcher::Stop() {
// Check whether a watch is currently in progress.
290 if (!start_state_
.get())
// Move the state into a local so that it is destroyed when this function
// returns.
293 scoped_ptr
<StartState
> old_state(start_state_
.Pass());
294 WatcherThreadManager::GetInstance()->StopWatching(old_state
->watcher_id
);
// Target of the callback registered in Start(). Clears |start_state_| and then
// runs the stored callback with |result|.
297 void HandleWatcher::OnHandleReady(MojoResult result
) {
298 DCHECK(start_state_
.get());
// Take the state out of |start_state_| before running the callback, so that
// the member is already empty while the callback executes.
299 scoped_ptr
<StartState
> old_state(start_state_
.Pass());
300 old_state
->callback
.Run(result
);
302 // NOTE: We may have been deleted during callback execution.
// Returns the current time from |tick_clock_| when one is set, otherwise from
// base::TimeTicks::Now().
306 base::TimeTicks
HandleWatcher::NowTicks() {
307 return tick_clock_
? tick_clock_
->NowTicks() : base::TimeTicks::Now();
// Converts a MojoDeadline to a base::TimeTicks. MOJO_DEADLINE_INDEFINITE maps
// to a default-constructed base::TimeTicks; any other value is treated as a
// number of microseconds added to NowTicks().
311 base::TimeTicks
HandleWatcher::MojoDeadlineToTimeTicks(MojoDeadline deadline
) {
312 return deadline
== MOJO_DEADLINE_INDEFINITE
? base::TimeTicks() :
313 NowTicks() + base::TimeDelta::FromMicroseconds(deadline
);
316 } // namespace common