Switch global error menu icon to vectorized MD asset
[chromium-blink-merge.git] / ipc / mojo / ipc_message_pipe_reader.cc
blob6609f26bd0fbe305e0189f3ca3011e597f861a23
1 // Copyright 2014 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
5 #include "ipc/mojo/ipc_message_pipe_reader.h"
7 #include "base/bind.h"
8 #include "base/bind_helpers.h"
9 #include "base/location.h"
10 #include "base/logging.h"
11 #include "base/single_thread_task_runner.h"
12 #include "base/thread_task_runner_handle.h"
13 #include "ipc/mojo/async_handle_waiter.h"
14 #include "ipc/mojo/ipc_channel_mojo.h"
16 namespace IPC {
17 namespace internal {
19 MessagePipeReader::MessagePipeReader(mojo::ScopedMessagePipeHandle handle,
20 MessagePipeReader::Delegate* delegate)
21 : pipe_(handle.Pass()),
22 handle_copy_(pipe_.get().value()),
23 delegate_(delegate),
24 async_waiter_(
25 new AsyncHandleWaiter(base::Bind(&MessagePipeReader::PipeIsReady,
26 base::Unretained(this)))),
27 pending_send_error_(MOJO_RESULT_OK) {
30 MessagePipeReader::~MessagePipeReader() {
31 DCHECK(thread_checker_.CalledOnValidThread());
32 // The pipe should be closed before deletion.
33 CHECK(!IsValid());
36 void MessagePipeReader::Close() {
37 DCHECK(thread_checker_.CalledOnValidThread());
38 async_waiter_.reset();
39 pipe_.reset();
40 OnPipeClosed();
43 void MessagePipeReader::CloseWithError(MojoResult error) {
44 DCHECK(thread_checker_.CalledOnValidThread());
45 OnPipeError(error);
46 Close();
49 void MessagePipeReader::CloseWithErrorIfPending() {
50 DCHECK(thread_checker_.CalledOnValidThread());
51 MojoResult pending_error = base::subtle::NoBarrier_Load(&pending_send_error_);
52 if (pending_error == MOJO_RESULT_OK)
53 return;
54 // NOTE: This races with Send(), and therefore the value of
55 // pending_send_error() can change.
56 CloseWithError(pending_error);
57 return;
60 void MessagePipeReader::CloseWithErrorLater(MojoResult error) {
61 DCHECK_NE(error, MOJO_RESULT_OK);
62 // NOTE: No assumptions about the value of |pending_send_error_| or whether or
63 // not the error has been signaled can be made. If Send() is called
64 // immediately before Close() and errors, it's possible for the error to not
65 // be signaled.
66 base::subtle::NoBarrier_Store(&pending_send_error_, error);
69 bool MessagePipeReader::Send(scoped_ptr<Message> message) {
70 TRACE_EVENT_WITH_FLOW0(TRACE_DISABLED_BY_DEFAULT("ipc.flow"),
71 "MessagePipeReader::Send",
72 message->flags(),
73 TRACE_EVENT_FLAG_FLOW_OUT);
74 std::vector<MojoHandle> handles;
75 MojoResult result = MOJO_RESULT_OK;
76 result = ChannelMojo::ReadFromMessageAttachmentSet(message.get(), &handles);
77 if (result == MOJO_RESULT_OK) {
78 result = MojoWriteMessage(handle(),
79 message->data(),
80 static_cast<uint32>(message->size()),
81 handles.empty() ? nullptr : &handles[0],
82 static_cast<uint32>(handles.size()),
83 MOJO_WRITE_MESSAGE_FLAG_NONE);
86 if (result != MOJO_RESULT_OK) {
87 std::for_each(handles.begin(), handles.end(), &MojoClose);
88 // We cannot call CloseWithError() here as Send() is protected by
89 // ChannelMojo's lock and CloseWithError() could re-enter ChannelMojo. We
90 // cannot call CloseWithError() also because Send() can be called from
91 // non-UI thread while OnPipeError() expects to be called on IO thread.
92 CloseWithErrorLater(result);
93 return false;
96 return true;
99 void MessagePipeReader::OnMessageReceived() {
100 Message message(data_buffer().empty() ? "" : &data_buffer()[0],
101 static_cast<uint32>(data_buffer().size()));
103 std::vector<MojoHandle> handle_buffer;
104 TakeHandleBuffer(&handle_buffer);
105 MojoResult write_result =
106 ChannelMojo::WriteToMessageAttachmentSet(handle_buffer, &message);
107 if (write_result != MOJO_RESULT_OK) {
108 CloseWithError(write_result);
109 return;
112 TRACE_EVENT_WITH_FLOW0(TRACE_DISABLED_BY_DEFAULT("ipc.flow"),
113 "MessagePipeReader::OnMessageReceived",
114 message.flags(),
115 TRACE_EVENT_FLAG_FLOW_IN);
116 delegate_->OnMessageReceived(message);
119 void MessagePipeReader::OnPipeClosed() {
120 DCHECK(thread_checker_.CalledOnValidThread());
121 if (!delegate_)
122 return;
123 delegate_->OnPipeClosed(this);
124 delegate_ = nullptr;
127 void MessagePipeReader::OnPipeError(MojoResult error) {
128 DCHECK(thread_checker_.CalledOnValidThread());
129 if (!delegate_)
130 return;
131 delegate_->OnPipeError(this);
134 MojoResult MessagePipeReader::ReadMessageBytes() {
135 DCHECK(thread_checker_.CalledOnValidThread());
136 DCHECK(handle_buffer_.empty());
138 uint32_t num_bytes = static_cast<uint32_t>(data_buffer_.size());
139 uint32_t num_handles = 0;
140 MojoResult result = MojoReadMessage(pipe_.get().value(),
141 num_bytes ? &data_buffer_[0] : nullptr,
142 &num_bytes,
143 nullptr,
144 &num_handles,
145 MOJO_READ_MESSAGE_FLAG_NONE);
146 data_buffer_.resize(num_bytes);
147 handle_buffer_.resize(num_handles);
148 if (result == MOJO_RESULT_RESOURCE_EXHAUSTED) {
149 // MOJO_RESULT_RESOURCE_EXHAUSTED was asking the caller that
150 // it needs more bufer. So we re-read it with resized buffers.
151 result = MojoReadMessage(pipe_.get().value(),
152 num_bytes ? &data_buffer_[0] : nullptr,
153 &num_bytes,
154 num_handles ? &handle_buffer_[0] : nullptr,
155 &num_handles,
156 MOJO_READ_MESSAGE_FLAG_NONE);
159 DCHECK(0 == num_bytes || data_buffer_.size() == num_bytes);
160 DCHECK(0 == num_handles || handle_buffer_.size() == num_handles);
161 return result;
164 void MessagePipeReader::ReadAvailableMessages() {
165 DCHECK(thread_checker_.CalledOnValidThread());
166 while (pipe_.is_valid()) {
167 MojoResult read_result = ReadMessageBytes();
168 if (read_result == MOJO_RESULT_SHOULD_WAIT)
169 break;
170 if (read_result != MOJO_RESULT_OK) {
171 DLOG(WARNING)
172 << "Pipe got error from ReadMessage(). Closing: " << read_result;
173 OnPipeError(read_result);
174 Close();
175 break;
178 OnMessageReceived();
183 void MessagePipeReader::ReadMessagesThenWait() {
184 DCHECK(thread_checker_.CalledOnValidThread());
185 while (true) {
186 ReadAvailableMessages();
187 if (!pipe_.is_valid())
188 break;
189 // |Wait()| is safe to call only after all messages are read.
190 // If can fail with |MOJO_RESULT_ALREADY_EXISTS| otherwise.
191 // Also, we don't use MOJO_HANDLE_SIGNAL_WRITABLE here, expecting buffer in
192 // MessagePipe.
193 MojoResult result =
194 async_waiter_->Wait(pipe_.get().value(), MOJO_HANDLE_SIGNAL_READABLE);
195 // If the result is |MOJO_RESULT_ALREADY_EXISTS|, there could be messages
196 // that have been arrived after the last |ReadAvailableMessages()|.
197 // We have to consume then and retry in that case.
198 if (result != MOJO_RESULT_ALREADY_EXISTS) {
199 if (result != MOJO_RESULT_OK) {
200 LOG(ERROR) << "Failed to wait on the pipe. Result is " << result;
201 OnPipeError(result);
202 Close();
205 break;
210 void MessagePipeReader::PipeIsReady(MojoResult wait_result) {
211 DCHECK(thread_checker_.CalledOnValidThread());
212 CloseWithErrorIfPending();
213 if (!IsValid()) {
214 // There was a pending error and it closed the pipe.
215 // We cannot do the work anymore.
216 return;
219 if (wait_result != MOJO_RESULT_OK) {
220 if (wait_result != MOJO_RESULT_ABORTED) {
221 // FAILED_PRECONDITION happens every time the peer is dead so
222 // it isn't worth polluting the log message.
223 LOG_IF(WARNING, wait_result != MOJO_RESULT_FAILED_PRECONDITION)
224 << "Pipe got error from the waiter. Closing: " << wait_result;
225 OnPipeError(wait_result);
228 Close();
229 return;
232 ReadMessagesThenWait();
235 void MessagePipeReader::DelayedDeleter::operator()(
236 MessagePipeReader* ptr) const {
237 ptr->Close();
238 base::ThreadTaskRunnerHandle::Get()->PostTask(FROM_HERE,
239 base::Bind(&DeleteNow, ptr));
242 } // namespace internal
243 } // namespace IPC