1 // Copyright 2013 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
5 #include "mojo/common/handle_watcher.h"
9 #include "base/atomic_sequence_num.h"
10 #include "base/bind.h"
11 #include "base/lazy_instance.h"
12 #include "base/memory/weak_ptr.h"
13 #include "base/message_loop/message_loop.h"
14 #include "base/message_loop/message_loop_proxy.h"
15 #include "base/threading/thread.h"
16 #include "base/time/time.h"
17 #include "mojo/common/message_pump_mojo.h"
18 #include "mojo/common/message_pump_mojo_handler.h"
19 #include "mojo/common/time_helper.h"
24 typedef int WatcherID
;
28 const char kWatcherThreadName
[] = "handle-watcher-thread";
30 // TODO(sky): this should be unnecessary once MessageLoop has been refactored.
31 MessagePumpMojo
* message_pump_mojo
= NULL
;
33 scoped_ptr
<base::MessagePump
> CreateMessagePumpMojo() {
34 message_pump_mojo
= new MessagePumpMojo
;
35 return scoped_ptr
<base::MessagePump
>(message_pump_mojo
).Pass();
38 base::TimeTicks
MojoDeadlineToTimeTicks(MojoDeadline deadline
) {
39 return deadline
== MOJO_DEADLINE_INDEFINITE
? base::TimeTicks() :
40 internal::NowTicks() + base::TimeDelta::FromMicroseconds(deadline
);
43 // Tracks the data for a single call to Start().
47 wait_flags(MOJO_WAIT_FLAG_NONE
),
52 MojoWaitFlags wait_flags
;
53 base::TimeTicks deadline
;
54 base::Callback
<void(MojoResult
)> callback
;
55 scoped_refptr
<base::MessageLoopProxy
> message_loop
;
58 // WatcherBackend --------------------------------------------------------------
60 // WatcherBackend is responsible for managing the requests and interacting with
61 // MessagePumpMojo. All access (outside of creation/destruction) is done on the
62 // thread WatcherThreadManager creates.
63 class WatcherBackend
: public MessagePumpMojoHandler
{
66 virtual ~WatcherBackend();
68 void StartWatching(const WatchData
& data
);
69 void StopWatching(WatcherID watcher_id
);
72 typedef std::map
<Handle
, WatchData
> HandleToWatchDataMap
;
74 // Invoked when a handle needs to be removed and notified.
75 void RemoveAndNotify(const Handle
& handle
, MojoResult result
);
77 // Searches through |handle_to_data_| for |watcher_id|. Returns true if found
78 // and sets |handle| to the Handle. Returns false if not a known id.
79 bool GetMojoHandleByWatcherID(WatcherID watcher_id
, Handle
* handle
) const;
81 // MessagePumpMojoHandler overrides:
82 virtual void OnHandleReady(const Handle
& handle
) OVERRIDE
;
83 virtual void OnHandleError(const Handle
& handle
, MojoResult result
) OVERRIDE
;
85 // Maps from assigned id to WatchData.
86 HandleToWatchDataMap handle_to_data_
;
88 DISALLOW_COPY_AND_ASSIGN(WatcherBackend
);
91 WatcherBackend::WatcherBackend() {
94 WatcherBackend::~WatcherBackend() {
97 void WatcherBackend::StartWatching(const WatchData
& data
) {
98 RemoveAndNotify(data
.handle
, MOJO_RESULT_CANCELLED
);
100 DCHECK_EQ(0u, handle_to_data_
.count(data
.handle
));
102 handle_to_data_
[data
.handle
] = data
;
103 message_pump_mojo
->AddHandler(this, data
.handle
,
108 void WatcherBackend::StopWatching(WatcherID watcher_id
) {
109 // Because of the thread hop it is entirely possible to get here and not
110 // have a valid handle registered for |watcher_id|.
112 if (!GetMojoHandleByWatcherID(watcher_id
, &handle
))
115 handle_to_data_
.erase(handle
);
116 message_pump_mojo
->RemoveHandler(handle
);
119 void WatcherBackend::RemoveAndNotify(const Handle
& handle
,
121 if (handle_to_data_
.count(handle
) == 0)
124 const WatchData
data(handle_to_data_
[handle
]);
125 handle_to_data_
.erase(handle
);
126 message_pump_mojo
->RemoveHandler(handle
);
127 data
.message_loop
->PostTask(FROM_HERE
, base::Bind(data
.callback
, result
));
130 bool WatcherBackend::GetMojoHandleByWatcherID(WatcherID watcher_id
,
131 Handle
* handle
) const {
132 for (HandleToWatchDataMap::const_iterator i
= handle_to_data_
.begin();
133 i
!= handle_to_data_
.end(); ++i
) {
134 if (i
->second
.id
== watcher_id
) {
135 *handle
= i
->second
.handle
;
142 void WatcherBackend::OnHandleReady(const Handle
& handle
) {
143 RemoveAndNotify(handle
, MOJO_RESULT_OK
);
146 void WatcherBackend::OnHandleError(const Handle
& handle
, MojoResult result
) {
147 RemoveAndNotify(handle
, result
);
150 // WatcherThreadManager --------------------------------------------------------
152 // WatcherThreadManager manages the background thread that listens for handles
153 // to be ready. All requests are handled by WatcherBackend.
154 class WatcherThreadManager
{
156 // Returns the shared instance.
157 static WatcherThreadManager
* GetInstance();
159 // Starts watching the requested handle. Returns a unique ID that is used to
160 // stop watching the handle. When the handle is ready |callback| is notified
161 // on the thread StartWatching() was invoked on.
162 // This may be invoked on any thread.
163 WatcherID
StartWatching(const Handle
& handle
,
164 MojoWaitFlags wait_flags
,
165 base::TimeTicks deadline
,
166 const base::Callback
<void(MojoResult
)>& callback
);
168 // Stops watching a handle.
169 // This may be invoked on any thread.
170 void StopWatching(WatcherID watcher_id
);
173 friend struct base::DefaultLazyInstanceTraits
<WatcherThreadManager
>;
175 WatcherThreadManager();
176 ~WatcherThreadManager();
178 base::Thread thread_
;
180 base::AtomicSequenceNumber watcher_id_generator_
;
182 WatcherBackend backend_
;
184 DISALLOW_COPY_AND_ASSIGN(WatcherThreadManager
);
187 WatcherThreadManager
* WatcherThreadManager::GetInstance() {
188 static base::LazyInstance
<WatcherThreadManager
> instance
=
189 LAZY_INSTANCE_INITIALIZER
;
190 return &instance
.Get();
193 WatcherID
WatcherThreadManager::StartWatching(
194 const Handle
& handle
,
195 MojoWaitFlags wait_flags
,
196 base::TimeTicks deadline
,
197 const base::Callback
<void(MojoResult
)>& callback
) {
199 data
.id
= watcher_id_generator_
.GetNext();
200 data
.handle
= handle
;
201 data
.callback
= callback
;
202 data
.wait_flags
= wait_flags
;
203 data
.deadline
= deadline
;
204 data
.message_loop
= base::MessageLoopProxy::current();
205 DCHECK_NE(static_cast<base::MessageLoopProxy
*>(NULL
),
206 data
.message_loop
.get());
207 // We outlive |thread_|, so it's safe to use Unretained() here.
208 thread_
.message_loop()->PostTask(
210 base::Bind(&WatcherBackend::StartWatching
,
211 base::Unretained(&backend_
),
216 void WatcherThreadManager::StopWatching(WatcherID watcher_id
) {
217 // We outlive |thread_|, so it's safe to use Unretained() here.
218 thread_
.message_loop()->PostTask(
220 base::Bind(&WatcherBackend::StopWatching
,
221 base::Unretained(&backend_
),
225 WatcherThreadManager::WatcherThreadManager()
226 : thread_(kWatcherThreadName
) {
227 base::Thread::Options thread_options
;
228 thread_options
.message_pump_factory
= base::Bind(&CreateMessagePumpMojo
);
229 thread_
.StartWithOptions(thread_options
);
232 WatcherThreadManager::~WatcherThreadManager() {
238 // HandleWatcher::StartState ---------------------------------------------------
240 // Contains the information passed to Start().
241 struct HandleWatcher::StartState
{
242 explicit StartState(HandleWatcher
* watcher
) : weak_factory(watcher
) {
248 // ID assigned by WatcherThreadManager.
249 WatcherID watcher_id
;
251 // Callback to notify when done.
252 base::Callback
<void(MojoResult
)> callback
;
254 // When Start() is invoked a callback is passed to WatcherThreadManager
255 // using a WeakRef from |weak_refactory_|. The callback invokes
256 // OnHandleReady() (on the thread Start() is invoked from) which in turn
257 // notifies |callback_|. Doing this allows us to reset state when the handle
258 // is ready, and then notify the callback. Doing this also means Stop()
259 // cancels any pending callbacks that may be inflight.
260 base::WeakPtrFactory
<HandleWatcher
> weak_factory
;
263 // HandleWatcher ---------------------------------------------------------------
265 HandleWatcher::HandleWatcher() {
268 HandleWatcher::~HandleWatcher() {
272 void HandleWatcher::Start(const Handle
& handle
,
273 MojoWaitFlags wait_flags
,
274 MojoDeadline deadline
,
275 const base::Callback
<void(MojoResult
)>& callback
) {
276 DCHECK(handle
.is_valid());
277 DCHECK_NE(MOJO_WAIT_FLAG_NONE
, wait_flags
);
281 start_state_
.reset(new StartState(this));
282 start_state_
->callback
= callback
;
283 start_state_
->watcher_id
=
284 WatcherThreadManager::GetInstance()->StartWatching(
287 MojoDeadlineToTimeTicks(deadline
),
288 base::Bind(&HandleWatcher::OnHandleReady
,
289 start_state_
->weak_factory
.GetWeakPtr()));
292 void HandleWatcher::Stop() {
293 if (!start_state_
.get())
296 scoped_ptr
<StartState
> old_state(start_state_
.Pass());
297 WatcherThreadManager::GetInstance()->StopWatching(old_state
->watcher_id
);
300 void HandleWatcher::OnHandleReady(MojoResult result
) {
301 DCHECK(start_state_
.get());
302 scoped_ptr
<StartState
> old_state(start_state_
.Pass());
303 old_state
->callback
.Run(result
);
305 // NOTE: We may have been deleted during callback execution.
308 } // namespace common