1 // Copyright 2013 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
5 #include "mojo/common/handle_watcher.h"
9 #include "base/atomic_sequence_num.h"
10 #include "base/bind.h"
11 #include "base/lazy_instance.h"
12 #include "base/memory/weak_ptr.h"
13 #include "base/message_loop/message_loop.h"
14 #include "base/message_loop/message_loop_proxy.h"
15 #include "base/synchronization/lock.h"
16 #include "base/threading/thread.h"
17 #include "base/time/time.h"
18 #include "mojo/common/environment_data.h"
19 #include "mojo/common/message_pump_mojo.h"
20 #include "mojo/common/message_pump_mojo_handler.h"
21 #include "mojo/common/time_helper.h"
26 typedef int WatcherID
;
30 const char kWatcherThreadName
[] = "handle-watcher-thread";
32 const char kWatcherThreadManagerKey
[] = "watcher-thread-manager";
34 // TODO(sky): this should be unnecessary once MessageLoop has been refactored.
35 MessagePumpMojo
* message_pump_mojo
= NULL
;
37 scoped_ptr
<base::MessagePump
> CreateMessagePumpMojo() {
38 message_pump_mojo
= new MessagePumpMojo
;
39 return scoped_ptr
<base::MessagePump
>(message_pump_mojo
).Pass();
42 base::TimeTicks
MojoDeadlineToTimeTicks(MojoDeadline deadline
) {
43 return deadline
== MOJO_DEADLINE_INDEFINITE
? base::TimeTicks() :
44 internal::NowTicks() + base::TimeDelta::FromMicroseconds(deadline
);
47 // Tracks the data for a single call to Start().
// NOTE(review): only part of this definition is visible in this view; the
// comments below describe the visible pieces only.
// Constructor initializer: |wait_flags| starts out as MOJO_WAIT_FLAG_NONE.
51 wait_flags(MOJO_WAIT_FLAG_NONE
),
// Wait flags supplied by the caller of StartWatching().
56 MojoWaitFlags wait_flags
;
// Deadline for the wait, expressed as a base::TimeTicks.
57 base::TimeTicks deadline
;
// Run with the resulting MojoResult when the watch is removed and notified.
58 base::Callback
<void(MojoResult
)> callback
;
// Message loop that was current when StartWatching() was called; |callback|
// is posted to it.
59 scoped_refptr
<base::MessageLoopProxy
> message_loop
;
62 // WatcherBackend --------------------------------------------------------------
64 // WatcherBackend is responsible for managing the requests and interacting with
65 // MessagePumpMojo. All access (outside of creation/destruction) is done on the
66 // thread WatcherThreadManager creates.
67 class WatcherBackend
: public MessagePumpMojoHandler
{
70 virtual ~WatcherBackend();
// Registers the watch described by |data|. Any watch already registered for
// the same handle is removed first and notified with MOJO_RESULT_CANCELLED.
72 void StartWatching(const WatchData
& data
);
// Removes the watch identified by |watcher_id|, if one is registered.
73 void StopWatching(WatcherID watcher_id
);
// Keyed by the watched Handle; the value is the request registered for it.
76 typedef std::map
<Handle
, WatchData
> HandleToWatchDataMap
;
78 // Invoked when a handle needs to be removed and notified.
79 void RemoveAndNotify(const Handle
& handle
, MojoResult result
);
81 // Searches through |handle_to_data_| for |watcher_id|. Returns true if found
82 // and sets |handle| to the Handle. Returns false if not a known id.
83 bool GetMojoHandleByWatcherID(WatcherID watcher_id
, Handle
* handle
) const;
85 // MessagePumpMojoHandler overrides:
86 virtual void OnHandleReady(const Handle
& handle
) OVERRIDE
;
87 virtual void OnHandleError(const Handle
& handle
, MojoResult result
) OVERRIDE
;
89 // Maps from a watched Handle to the WatchData registered for it.
90 HandleToWatchDataMap handle_to_data_
;
92 DISALLOW_COPY_AND_ASSIGN(WatcherBackend
);
95 WatcherBackend::WatcherBackend() {
98 WatcherBackend::~WatcherBackend() {
// Begins watching |data.handle| on behalf of the request described by |data|.
101 void WatcherBackend::StartWatching(const WatchData
& data
) {
// Cancel any watch already registered for this handle; its callback is
// notified with MOJO_RESULT_CANCELLED.
102 RemoveAndNotify(data
.handle
, MOJO_RESULT_CANCELLED
);
// After the removal above the handle must no longer be in the map.
104 DCHECK_EQ(0u, handle_to_data_
.count(data
.handle
));
// Record the request, then register this object as the pump's handler for
// the handle.
106 handle_to_data_
[data
.handle
] = data
;
// NOTE(review): the remaining AddHandler() arguments are not visible in this
// view.
107 message_pump_mojo
->AddHandler(this, data
.handle
,
// Removes the watch identified by |watcher_id|: looks up its handle, erases
// the bookkeeping entry and unregisters the handle from the message pump.
// NOTE(review): the declaration of |handle| and the statement guarded by the
// lookup check are not visible in this view; presumably an early return.
112 void WatcherBackend::StopWatching(WatcherID watcher_id
) {
113 // Because of the thread hop it is entirely possible to get here and not
114 // have a valid handle registered for |watcher_id|.
116 if (!GetMojoHandleByWatcherID(watcher_id
, &handle
))
119 handle_to_data_
.erase(handle
);
120 message_pump_mojo
->RemoveHandler(handle
);
// Removes the watch registered for |handle| and posts its callback, bound to
// |result|, to the message loop recorded in the WatchData.
// NOTE(review): the statement guarded by the count() check is not visible in
// this view; presumably an early return when nothing is registered.
123 void WatcherBackend::RemoveAndNotify(const Handle
& handle
,
125 if (handle_to_data_
.count(handle
) == 0)
// Copy the WatchData before erasing the entry so that the callback and message
// loop are still available for the notification below.
128 const WatchData
data(handle_to_data_
[handle
]);
129 handle_to_data_
.erase(handle
);
130 message_pump_mojo
->RemoveHandler(handle
);
// The callback is posted to the requester's message loop rather than run
// directly here.
131 data
.message_loop
->PostTask(FROM_HERE
, base::Bind(data
.callback
, result
));
// Walks every entry of |handle_to_data_| looking for one whose id equals
// |watcher_id|; on a match the entry's handle is written to |*handle|.
// NOTE(review): the return statements are not visible in this view.
134 bool WatcherBackend::GetMojoHandleByWatcherID(WatcherID watcher_id
,
135 Handle
* handle
) const {
136 for (HandleToWatchDataMap::const_iterator i
= handle_to_data_
.begin();
137 i
!= handle_to_data_
.end(); ++i
) {
138 if (i
->second
.id
== watcher_id
) {
139 *handle
= i
->second
.handle
;
146 void WatcherBackend::OnHandleReady(const Handle
& handle
) {
147 RemoveAndNotify(handle
, MOJO_RESULT_OK
);
150 void WatcherBackend::OnHandleError(const Handle
& handle
, MojoResult result
) {
151 RemoveAndNotify(handle
, result
);
154 // WatcherThreadManager --------------------------------------------------------
156 // WatcherThreadManager manages the background thread that listens for handles
157 // to be ready. All requests are handled by WatcherBackend.
158 class WatcherThreadManager
{
160 ~WatcherThreadManager();
162 // Returns the shared instance.
163 static WatcherThreadManager
* GetInstance();
165 // Starts watching the requested handle. Returns a unique ID that is used to
166 // stop watching the handle. When the handle is ready |callback| is notified
167 // on the thread StartWatching() was invoked on.
168 // This may be invoked on any thread.
169 WatcherID
StartWatching(const Handle
& handle
,
170 MojoWaitFlags wait_flags
,
171 base::TimeTicks deadline
,
172 const base::Callback
<void(MojoResult
)>& callback
);
174 // Stops watching a handle.
175 // This may be invoked on any thread.
176 void StopWatching(WatcherID watcher_id
);
179 WatcherThreadManager();
// Background thread; started in the constructor with CreateMessagePumpMojo as
// its message pump factory.
181 base::Thread thread_
;
// Source of the WatcherIDs returned by StartWatching().
183 base::AtomicSequenceNumber watcher_id_generator_
;
// Receives the StartWatching()/StopWatching() tasks posted to |thread_|.
185 WatcherBackend backend_
;
187 DISALLOW_COPY_AND_ASSIGN(WatcherThreadManager
);
190 struct WatcherThreadManagerData
: EnvironmentData::Data
{
191 scoped_ptr
<WatcherThreadManager
> thread_manager
;
// Destructor.
// NOTE(review): the body is not visible in this view. The base::Unretained()
// uses in StartWatching()/StopWatching() rely on this object outliving
// |thread_| -- confirm the teardown here upholds that.
194 WatcherThreadManager::~WatcherThreadManager() {
// Lock held by WatcherThreadManager::GetInstance() for the duration of the
// shared-instance lookup.
198 static base::LazyInstance
<base::Lock
> thread_lookup_lock
=
199 LAZY_INSTANCE_INITIALIZER
;
// Returns the WatcherThreadManager stored in EnvironmentData under
// kWatcherThreadManagerKey. The whole lookup runs under |thread_lookup_lock|.
// NOTE(review): the condition guarding the creation path below is not visible
// in this view; presumably the entry is created only when the lookup finds
// nothing.
201 WatcherThreadManager
* WatcherThreadManager::GetInstance() {
202 base::AutoLock
auto_lock(thread_lookup_lock
.Get());
// Look up the existing entry, if any.
203 WatcherThreadManagerData
* data
= static_cast<WatcherThreadManagerData
*>(
204 EnvironmentData::GetInstance()->GetData(kWatcherThreadManagerKey
));
// Creation path: build a new entry holding a new manager and hand ownership of
// the entry to EnvironmentData.
206 data
= new WatcherThreadManagerData
;
207 data
->thread_manager
.reset(new WatcherThreadManager
);
208 EnvironmentData::GetInstance()->SetData(
209 kWatcherThreadManagerKey
,
210 scoped_ptr
<EnvironmentData::Data
>(data
));
212 return data
->thread_manager
.get();
// Fills in a WatchData describing the request and posts a task to |thread_|
// that invokes WatcherBackend::StartWatching() on |backend_|.
// NOTE(review): the declaration of |data|, the remaining PostTask()/Bind()
// arguments and the return statement are not visible in this view.
215 WatcherID
WatcherThreadManager::StartWatching(
216 const Handle
& handle
,
217 MojoWaitFlags wait_flags
,
218 base::TimeTicks deadline
,
219 const base::Callback
<void(MojoResult
)>& callback
) {
// Each request gets the next id from the atomic sequence generator.
221 data
.id
= watcher_id_generator_
.GetNext();
222 data
.handle
= handle
;
223 data
.callback
= callback
;
224 data
.wait_flags
= wait_flags
;
225 data
.deadline
= deadline
;
// Remember the calling thread's loop so the callback can be posted back to it.
226 data
.message_loop
= base::MessageLoopProxy::current();
// The calling thread is expected to have a message loop proxy.
227 DCHECK_NE(static_cast<base::MessageLoopProxy
*>(NULL
),
228 data
.message_loop
.get());
229 // We outlive |thread_|, so it's safe to use Unretained() here.
230 thread_
.message_loop()->PostTask(
232 base::Bind(&WatcherBackend::StartWatching
,
233 base::Unretained(&backend_
),
// Posts a task to |thread_| that invokes WatcherBackend::StopWatching() on
// |backend_|.
// NOTE(review): the remaining PostTask()/Bind() arguments are not visible in
// this view.
238 void WatcherThreadManager::StopWatching(WatcherID watcher_id
) {
239 // We outlive |thread_|, so it's safe to use Unretained() here.
240 thread_
.message_loop()->PostTask(
242 base::Bind(&WatcherBackend::StopWatching
,
243 base::Unretained(&backend_
),
247 WatcherThreadManager::WatcherThreadManager()
248 : thread_(kWatcherThreadName
) {
249 base::Thread::Options thread_options
;
250 thread_options
.message_pump_factory
= base::Bind(&CreateMessagePumpMojo
);
251 thread_
.StartWithOptions(thread_options
);
256 // HandleWatcher::StartState ---------------------------------------------------
258 // Contains the information passed to Start().
259 struct HandleWatcher::StartState
{
// |watcher| is the HandleWatcher that |weak_factory| vends weak pointers to.
260 explicit StartState(HandleWatcher
* watcher
) : weak_factory(watcher
) {
266 // ID assigned by WatcherThreadManager.
267 WatcherID watcher_id
;
269 // Callback to notify when done.
270 base::Callback
<void(MojoResult
)> callback
;
272 // When Start() is invoked a callback is passed to WatcherThreadManager
273 // using a WeakPtr from |weak_factory|. The callback invokes
274 // OnHandleReady() (on the thread Start() is invoked from) which in turn
275 // notifies |callback|. Doing this allows us to reset state when the handle
276 // is ready, and then notify the callback. Doing this also means Stop()
277 // cancels any pending callbacks that may be inflight.
278 base::WeakPtrFactory
<HandleWatcher
> weak_factory
;
281 // HandleWatcher ---------------------------------------------------------------
283 HandleWatcher::HandleWatcher() {
// Destructor.
// NOTE(review): the body is not visible in this view -- confirm whether it
// stops an in-progress watch.
286 HandleWatcher::~HandleWatcher() {
// Starts watching |handle|. A fresh StartState is created, |callback| is
// stored in it, and the request is forwarded to the shared
// WatcherThreadManager with a callback bound to OnHandleReady() through a weak
// pointer.
// NOTE(review): some lines of this function (between the DCHECKs and the
// reset, and the leading StartWatching() arguments) are not visible in this
// view.
290 void HandleWatcher::Start(const Handle
& handle
,
291 MojoWaitFlags wait_flags
,
292 MojoDeadline deadline
,
293 const base::Callback
<void(MojoResult
)>& callback
) {
// Preconditions: a valid handle and at least one wait flag.
294 DCHECK(handle
.is_valid());
295 DCHECK_NE(MOJO_WAIT_FLAG_NONE
, wait_flags
);
299 start_state_
.reset(new StartState(this));
300 start_state_
->callback
= callback
;
// Keep the id returned by the manager so Stop() can cancel this watch.
301 start_state_
->watcher_id
=
302 WatcherThreadManager::GetInstance()->StartWatching(
// The MojoDeadline is converted to a base::TimeTicks before being passed on.
305 MojoDeadlineToTimeTicks(deadline
),
306 base::Bind(&HandleWatcher::OnHandleReady
,
307 start_state_
->weak_factory
.GetWeakPtr()));
// Cancels the current watch: takes ownership of the start state (leaving
// |start_state_| empty) and asks the shared WatcherThreadManager to stop
// watching the stored watcher id.
// NOTE(review): the statement guarded by the |start_state_| check is not
// visible in this view; presumably an early return when no watch is active.
310 void HandleWatcher::Stop() {
311 if (!start_state_
.get())
314 scoped_ptr
<StartState
> old_state(start_state_
.Pass());
315 WatcherThreadManager::GetInstance()->StopWatching(old_state
->watcher_id
);
318 void HandleWatcher::OnHandleReady(MojoResult result
) {
319 DCHECK(start_state_
.get());
320 scoped_ptr
<StartState
> old_state(start_state_
.Pass());
321 old_state
->callback
.Run(result
);
323 // NOTE: We may have been deleted during callback execution.
326 } // namespace common