Change DtmfSenderHandler to handle events on the signaling thread.
[chromium-blink-merge.git] / mojo / common / message_pump_mojo.cc
blobc06927015945f36ba451498a8df9c5639a2ebd95
1 // Copyright 2013 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
5 #include "mojo/common/message_pump_mojo.h"
7 #include <algorithm>
8 #include <vector>
10 #include "base/debug/alias.h"
11 #include "base/lazy_instance.h"
12 #include "base/logging.h"
13 #include "base/threading/thread_local.h"
14 #include "base/time/time.h"
15 #include "mojo/common/message_pump_mojo_handler.h"
16 #include "mojo/common/time_helper.h"
18 namespace mojo {
19 namespace common {
20 namespace {
22 base::LazyInstance<base::ThreadLocalPointer<MessagePumpMojo> >::Leaky
23 g_tls_current_pump = LAZY_INSTANCE_INITIALIZER;
25 MojoDeadline TimeTicksToMojoDeadline(base::TimeTicks time_ticks,
26 base::TimeTicks now) {
27 // The is_null() check matches that of HandleWatcher as well as how
28 // |delayed_work_time| is used.
29 if (time_ticks.is_null())
30 return MOJO_DEADLINE_INDEFINITE;
31 const int64_t delta = (time_ticks - now).InMicroseconds();
32 return delta < 0 ? static_cast<MojoDeadline>(0) :
33 static_cast<MojoDeadline>(delta);
36 } // namespace
38 // State needed for one iteration of WaitMany. The first handle and flags
39 // corresponds to that of the control pipe.
40 struct MessagePumpMojo::WaitState {
41 std::vector<Handle> handles;
42 std::vector<MojoHandleSignals> wait_signals;
45 struct MessagePumpMojo::RunState {
46 RunState() : should_quit(false) {
47 CreateMessagePipe(NULL, &read_handle, &write_handle);
50 base::TimeTicks delayed_work_time;
52 // Used to wake up WaitForWork().
53 ScopedMessagePipeHandle read_handle;
54 ScopedMessagePipeHandle write_handle;
56 bool should_quit;
59 MessagePumpMojo::MessagePumpMojo() : run_state_(NULL), next_handler_id_(0) {
60 DCHECK(!current())
61 << "There is already a MessagePumpMojo instance on this thread.";
62 g_tls_current_pump.Pointer()->Set(this);
65 MessagePumpMojo::~MessagePumpMojo() {
66 DCHECK_EQ(this, current());
67 g_tls_current_pump.Pointer()->Set(NULL);
70 // static
71 scoped_ptr<base::MessagePump> MessagePumpMojo::Create() {
72 return scoped_ptr<MessagePump>(new MessagePumpMojo());
75 // static
76 MessagePumpMojo* MessagePumpMojo::current() {
77 return g_tls_current_pump.Pointer()->Get();
80 void MessagePumpMojo::AddHandler(MessagePumpMojoHandler* handler,
81 const Handle& handle,
82 MojoHandleSignals wait_signals,
83 base::TimeTicks deadline) {
84 CHECK(handler);
85 DCHECK(handle.is_valid());
86 // Assume it's an error if someone tries to reregister an existing handle.
87 CHECK_EQ(0u, handlers_.count(handle));
88 Handler handler_data;
89 handler_data.handler = handler;
90 handler_data.wait_signals = wait_signals;
91 handler_data.deadline = deadline;
92 handler_data.id = next_handler_id_++;
93 handlers_[handle] = handler_data;
96 void MessagePumpMojo::RemoveHandler(const Handle& handle) {
97 handlers_.erase(handle);
100 void MessagePumpMojo::AddObserver(Observer* observer) {
101 observers_.AddObserver(observer);
104 void MessagePumpMojo::RemoveObserver(Observer* observer) {
105 observers_.RemoveObserver(observer);
108 void MessagePumpMojo::Run(Delegate* delegate) {
109 RunState run_state;
110 // TODO: better deal with error handling.
111 CHECK(run_state.read_handle.is_valid());
112 CHECK(run_state.write_handle.is_valid());
113 RunState* old_state = NULL;
115 base::AutoLock auto_lock(run_state_lock_);
116 old_state = run_state_;
117 run_state_ = &run_state;
119 DoRunLoop(&run_state, delegate);
121 base::AutoLock auto_lock(run_state_lock_);
122 run_state_ = old_state;
126 void MessagePumpMojo::Quit() {
127 base::AutoLock auto_lock(run_state_lock_);
128 if (run_state_)
129 run_state_->should_quit = true;
132 void MessagePumpMojo::ScheduleWork() {
133 base::AutoLock auto_lock(run_state_lock_);
134 if (run_state_)
135 SignalControlPipe(*run_state_);
138 void MessagePumpMojo::ScheduleDelayedWork(
139 const base::TimeTicks& delayed_work_time) {
140 base::AutoLock auto_lock(run_state_lock_);
141 if (!run_state_)
142 return;
143 run_state_->delayed_work_time = delayed_work_time;
146 void MessagePumpMojo::DoRunLoop(RunState* run_state, Delegate* delegate) {
147 bool more_work_is_plausible = true;
148 for (;;) {
149 const bool block = !more_work_is_plausible;
150 more_work_is_plausible = DoInternalWork(*run_state, block);
152 if (run_state->should_quit)
153 break;
155 more_work_is_plausible |= delegate->DoWork();
156 if (run_state->should_quit)
157 break;
159 more_work_is_plausible |= delegate->DoDelayedWork(
160 &run_state->delayed_work_time);
161 if (run_state->should_quit)
162 break;
164 if (more_work_is_plausible)
165 continue;
167 more_work_is_plausible = delegate->DoIdleWork();
168 if (run_state->should_quit)
169 break;
173 bool MessagePumpMojo::DoInternalWork(const RunState& run_state, bool block) {
174 const MojoDeadline deadline = block ? GetDeadlineForWait(run_state) : 0;
175 const WaitState wait_state = GetWaitState(run_state);
176 const MojoResult result =
177 WaitMany(wait_state.handles, wait_state.wait_signals, deadline);
178 bool did_work = true;
179 if (result == 0) {
180 // Control pipe was written to.
181 ReadMessageRaw(run_state.read_handle.get(), NULL, NULL, NULL, NULL,
182 MOJO_READ_MESSAGE_FLAG_MAY_DISCARD);
183 } else if (result > 0) {
184 const size_t index = static_cast<size_t>(result);
185 DCHECK(handlers_.find(wait_state.handles[index]) != handlers_.end());
186 WillSignalHandler();
187 handlers_[wait_state.handles[index]].handler->OnHandleReady(
188 wait_state.handles[index]);
189 DidSignalHandler();
190 } else {
191 switch (result) {
192 case MOJO_RESULT_CANCELLED:
193 case MOJO_RESULT_FAILED_PRECONDITION:
194 RemoveFirstInvalidHandle(wait_state);
195 break;
196 case MOJO_RESULT_DEADLINE_EXCEEDED:
197 did_work = false;
198 break;
199 default:
200 base::debug::Alias(&result);
201 // Unexpected result is likely fatal, crash so we can determine cause.
202 CHECK(false);
206 // Notify and remove any handlers whose time has expired. Make a copy in case
207 // someone tries to add/remove new handlers from notification.
208 const HandleToHandler cloned_handlers(handlers_);
209 const base::TimeTicks now(internal::NowTicks());
210 for (HandleToHandler::const_iterator i = cloned_handlers.begin();
211 i != cloned_handlers.end(); ++i) {
212 // Since we're iterating over a clone of the handlers, verify the handler is
213 // still valid before notifying.
214 if (!i->second.deadline.is_null() && i->second.deadline < now &&
215 handlers_.find(i->first) != handlers_.end() &&
216 handlers_[i->first].id == i->second.id) {
217 WillSignalHandler();
218 i->second.handler->OnHandleError(i->first, MOJO_RESULT_DEADLINE_EXCEEDED);
219 DidSignalHandler();
220 handlers_.erase(i->first);
221 did_work = true;
224 return did_work;
227 void MessagePumpMojo::RemoveFirstInvalidHandle(const WaitState& wait_state) {
228 // TODO(sky): deal with control pipe going bad.
229 for (size_t i = 0; i < wait_state.handles.size(); ++i) {
230 const MojoResult result =
231 Wait(wait_state.handles[i], wait_state.wait_signals[i], 0);
232 if (result == MOJO_RESULT_INVALID_ARGUMENT) {
233 // We should never have an invalid argument. If we do it indicates
234 // RemoveHandler() was not invoked and is likely to cause problems else
235 // where in the stack if we ignore it.
236 CHECK(false);
237 } else if (result == MOJO_RESULT_FAILED_PRECONDITION ||
238 result == MOJO_RESULT_CANCELLED) {
239 CHECK_NE(i, 0u); // Indicates the control pipe went bad.
241 // Remove the handle first, this way if OnHandleError() tries to remove
242 // the handle our iterator isn't invalidated.
243 CHECK(handlers_.find(wait_state.handles[i]) != handlers_.end());
244 MessagePumpMojoHandler* handler =
245 handlers_[wait_state.handles[i]].handler;
246 handlers_.erase(wait_state.handles[i]);
247 WillSignalHandler();
248 handler->OnHandleError(wait_state.handles[i], result);
249 DidSignalHandler();
250 return;
255 void MessagePumpMojo::SignalControlPipe(const RunState& run_state) {
256 const MojoResult result =
257 WriteMessageRaw(run_state.write_handle.get(), NULL, 0, NULL, 0,
258 MOJO_WRITE_MESSAGE_FLAG_NONE);
259 // If we can't write we likely won't wake up the thread and there is a strong
260 // chance we'll deadlock.
261 CHECK_EQ(MOJO_RESULT_OK, result);
264 MessagePumpMojo::WaitState MessagePumpMojo::GetWaitState(
265 const RunState& run_state) const {
266 WaitState wait_state;
267 wait_state.handles.push_back(run_state.read_handle.get());
268 wait_state.wait_signals.push_back(MOJO_HANDLE_SIGNAL_READABLE);
270 for (HandleToHandler::const_iterator i = handlers_.begin();
271 i != handlers_.end(); ++i) {
272 wait_state.handles.push_back(i->first);
273 wait_state.wait_signals.push_back(i->second.wait_signals);
275 return wait_state;
278 MojoDeadline MessagePumpMojo::GetDeadlineForWait(
279 const RunState& run_state) const {
280 const base::TimeTicks now(internal::NowTicks());
281 MojoDeadline deadline = TimeTicksToMojoDeadline(run_state.delayed_work_time,
282 now);
283 for (HandleToHandler::const_iterator i = handlers_.begin();
284 i != handlers_.end(); ++i) {
285 deadline = std::min(
286 TimeTicksToMojoDeadline(i->second.deadline, now), deadline);
288 return deadline;
291 void MessagePumpMojo::WillSignalHandler() {
292 FOR_EACH_OBSERVER(Observer, observers_, WillSignalHandler());
295 void MessagePumpMojo::DidSignalHandler() {
296 FOR_EACH_OBSERVER(Observer, observers_, DidSignalHandler());
299 } // namespace common
300 } // namespace mojo