Add ICU message format support
[chromium-blink-merge.git] / mojo / message_pump / message_pump_mojo.cc
blob a24349be3d721dcc418d50abd2645d959f593fbb
1 // Copyright 2013 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
5 #include "mojo/message_pump/message_pump_mojo.h"
7 #include <algorithm>
8 #include <vector>
10 #include "base/debug/alias.h"
11 #include "base/lazy_instance.h"
12 #include "base/logging.h"
13 #include "base/threading/thread_local.h"
14 #include "base/time/time.h"
15 #include "mojo/message_pump/message_pump_mojo_handler.h"
16 #include "mojo/message_pump/time_helper.h"
18 namespace mojo {
19 namespace common {
20 namespace {
22 base::LazyInstance<base::ThreadLocalPointer<MessagePumpMojo> >::Leaky
23 g_tls_current_pump = LAZY_INSTANCE_INITIALIZER;
25 MojoDeadline TimeTicksToMojoDeadline(base::TimeTicks time_ticks,
26 base::TimeTicks now) {
27 // The is_null() check matches that of HandleWatcher as well as how
28 // |delayed_work_time| is used.
29 if (time_ticks.is_null())
30 return MOJO_DEADLINE_INDEFINITE;
31 const int64_t delta = (time_ticks - now).InMicroseconds();
32 return delta < 0 ? static_cast<MojoDeadline>(0) :
33 static_cast<MojoDeadline>(delta);
36 } // namespace
38 // State needed for one iteration of WaitMany. The first handle and flags
39 // corresponds to that of the control pipe.
40 struct MessagePumpMojo::WaitState {
41 std::vector<Handle> handles;
42 std::vector<MojoHandleSignals> wait_signals;
45 struct MessagePumpMojo::RunState {
46 RunState() : should_quit(false) {
47 CreateMessagePipe(NULL, &read_handle, &write_handle);
50 base::TimeTicks delayed_work_time;
52 // Used to wake up WaitForWork().
53 ScopedMessagePipeHandle read_handle;
54 ScopedMessagePipeHandle write_handle;
56 bool should_quit;
59 MessagePumpMojo::MessagePumpMojo() : run_state_(NULL), next_handler_id_(0) {
60 DCHECK(!current())
61 << "There is already a MessagePumpMojo instance on this thread.";
62 g_tls_current_pump.Pointer()->Set(this);
65 MessagePumpMojo::~MessagePumpMojo() {
66 DCHECK_EQ(this, current());
67 g_tls_current_pump.Pointer()->Set(NULL);
70 // static
71 scoped_ptr<base::MessagePump> MessagePumpMojo::Create() {
72 return scoped_ptr<MessagePump>(new MessagePumpMojo());
75 // static
76 MessagePumpMojo* MessagePumpMojo::current() {
77 return g_tls_current_pump.Pointer()->Get();
80 void MessagePumpMojo::AddHandler(MessagePumpMojoHandler* handler,
81 const Handle& handle,
82 MojoHandleSignals wait_signals,
83 base::TimeTicks deadline) {
84 CHECK(handler);
85 DCHECK(handle.is_valid());
86 // Assume it's an error if someone tries to reregister an existing handle.
87 CHECK_EQ(0u, handlers_.count(handle));
88 Handler handler_data;
89 handler_data.handler = handler;
90 handler_data.wait_signals = wait_signals;
91 handler_data.deadline = deadline;
92 handler_data.id = next_handler_id_++;
93 handlers_[handle] = handler_data;
96 void MessagePumpMojo::RemoveHandler(const Handle& handle) {
97 handlers_.erase(handle);
100 void MessagePumpMojo::AddObserver(Observer* observer) {
101 observers_.AddObserver(observer);
104 void MessagePumpMojo::RemoveObserver(Observer* observer) {
105 observers_.RemoveObserver(observer);
108 void MessagePumpMojo::Run(Delegate* delegate) {
109 RunState run_state;
110 // TODO: better deal with error handling.
111 CHECK(run_state.read_handle.is_valid());
112 CHECK(run_state.write_handle.is_valid());
113 RunState* old_state = NULL;
115 base::AutoLock auto_lock(run_state_lock_);
116 old_state = run_state_;
117 run_state_ = &run_state;
119 DoRunLoop(&run_state, delegate);
121 base::AutoLock auto_lock(run_state_lock_);
122 run_state_ = old_state;
126 void MessagePumpMojo::Quit() {
127 base::AutoLock auto_lock(run_state_lock_);
128 if (run_state_)
129 run_state_->should_quit = true;
132 void MessagePumpMojo::ScheduleWork() {
133 base::AutoLock auto_lock(run_state_lock_);
134 if (run_state_)
135 SignalControlPipe(*run_state_);
138 void MessagePumpMojo::ScheduleDelayedWork(
139 const base::TimeTicks& delayed_work_time) {
140 base::AutoLock auto_lock(run_state_lock_);
141 if (!run_state_)
142 return;
143 run_state_->delayed_work_time = delayed_work_time;
146 void MessagePumpMojo::DoRunLoop(RunState* run_state, Delegate* delegate) {
147 bool more_work_is_plausible = true;
148 for (;;) {
149 const bool block = !more_work_is_plausible;
150 more_work_is_plausible = DoInternalWork(*run_state, block);
152 if (run_state->should_quit)
153 break;
155 more_work_is_plausible |= delegate->DoWork();
156 if (run_state->should_quit)
157 break;
159 more_work_is_plausible |= delegate->DoDelayedWork(
160 &run_state->delayed_work_time);
161 if (run_state->should_quit)
162 break;
164 if (more_work_is_plausible)
165 continue;
167 more_work_is_plausible = delegate->DoIdleWork();
168 if (run_state->should_quit)
169 break;
173 bool MessagePumpMojo::DoInternalWork(const RunState& run_state, bool block) {
174 const MojoDeadline deadline = block ? GetDeadlineForWait(run_state) : 0;
175 const WaitState wait_state = GetWaitState(run_state);
177 const WaitManyResult wait_many_result =
178 WaitMany(wait_state.handles, wait_state.wait_signals, deadline, nullptr);
179 const MojoResult result = wait_many_result.result;
180 bool did_work = true;
181 if (result == MOJO_RESULT_OK) {
182 if (wait_many_result.index == 0) {
183 // Control pipe was written to.
184 ReadMessageRaw(run_state.read_handle.get(), NULL, NULL, NULL, NULL,
185 MOJO_READ_MESSAGE_FLAG_MAY_DISCARD);
186 } else {
187 DCHECK(handlers_.find(wait_state.handles[wait_many_result.index]) !=
188 handlers_.end());
189 WillSignalHandler();
190 handlers_[wait_state.handles[wait_many_result.index]]
191 .handler->OnHandleReady(wait_state.handles[wait_many_result.index]);
192 DidSignalHandler();
194 } else {
195 switch (result) {
196 case MOJO_RESULT_CANCELLED:
197 case MOJO_RESULT_FAILED_PRECONDITION:
198 RemoveInvalidHandle(wait_state, result, wait_many_result.index);
199 break;
200 case MOJO_RESULT_DEADLINE_EXCEEDED:
201 did_work = false;
202 break;
203 default:
204 base::debug::Alias(&result);
205 // Unexpected result is likely fatal, crash so we can determine cause.
206 CHECK(false);
210 // Notify and remove any handlers whose time has expired. Make a copy in case
211 // someone tries to add/remove new handlers from notification.
212 const HandleToHandler cloned_handlers(handlers_);
213 const base::TimeTicks now(internal::NowTicks());
214 for (HandleToHandler::const_iterator i = cloned_handlers.begin();
215 i != cloned_handlers.end(); ++i) {
216 // Since we're iterating over a clone of the handlers, verify the handler is
217 // still valid before notifying.
218 if (!i->second.deadline.is_null() && i->second.deadline < now &&
219 handlers_.find(i->first) != handlers_.end() &&
220 handlers_[i->first].id == i->second.id) {
221 WillSignalHandler();
222 i->second.handler->OnHandleError(i->first, MOJO_RESULT_DEADLINE_EXCEEDED);
223 DidSignalHandler();
224 handlers_.erase(i->first);
225 did_work = true;
228 return did_work;
231 void MessagePumpMojo::RemoveInvalidHandle(const WaitState& wait_state,
232 MojoResult result,
233 uint32_t index) {
234 // TODO(sky): deal with control pipe going bad.
235 CHECK(result == MOJO_RESULT_FAILED_PRECONDITION ||
236 result == MOJO_RESULT_CANCELLED);
237 CHECK_NE(index, 0u); // Indicates the control pipe went bad.
239 // Remove the handle first, this way if OnHandleError() tries to remove the
240 // handle our iterator isn't invalidated.
241 CHECK(handlers_.find(wait_state.handles[index]) != handlers_.end());
242 MessagePumpMojoHandler* handler =
243 handlers_[wait_state.handles[index]].handler;
244 handlers_.erase(wait_state.handles[index]);
245 WillSignalHandler();
246 handler->OnHandleError(wait_state.handles[index], result);
247 DidSignalHandler();
250 void MessagePumpMojo::SignalControlPipe(const RunState& run_state) {
251 const MojoResult result =
252 WriteMessageRaw(run_state.write_handle.get(), NULL, 0, NULL, 0,
253 MOJO_WRITE_MESSAGE_FLAG_NONE);
254 // If we can't write we likely won't wake up the thread and there is a strong
255 // chance we'll deadlock.
256 CHECK_EQ(MOJO_RESULT_OK, result);
259 MessagePumpMojo::WaitState MessagePumpMojo::GetWaitState(
260 const RunState& run_state) const {
261 WaitState wait_state;
262 wait_state.handles.push_back(run_state.read_handle.get());
263 wait_state.wait_signals.push_back(MOJO_HANDLE_SIGNAL_READABLE);
265 for (HandleToHandler::const_iterator i = handlers_.begin();
266 i != handlers_.end(); ++i) {
267 wait_state.handles.push_back(i->first);
268 wait_state.wait_signals.push_back(i->second.wait_signals);
270 return wait_state;
273 MojoDeadline MessagePumpMojo::GetDeadlineForWait(
274 const RunState& run_state) const {
275 const base::TimeTicks now(internal::NowTicks());
276 MojoDeadline deadline = TimeTicksToMojoDeadline(run_state.delayed_work_time,
277 now);
278 for (HandleToHandler::const_iterator i = handlers_.begin();
279 i != handlers_.end(); ++i) {
280 deadline = std::min(
281 TimeTicksToMojoDeadline(i->second.deadline, now), deadline);
283 return deadline;
286 void MessagePumpMojo::WillSignalHandler() {
287 FOR_EACH_OBSERVER(Observer, observers_, WillSignalHandler());
290 void MessagePumpMojo::DidSignalHandler() {
291 FOR_EACH_OBSERVER(Observer, observers_, DidSignalHandler());
294 } // namespace common
295 } // namespace mojo