Roll src/third_party/WebKit d9c6159:8139f33 (svn 201974:201975)
[chromium-blink-merge.git] / ipc / mojo / ipc_message_pipe_reader.cc
blob11aab8d6155bf27aaf24bc0b1ac9f9775a19f1ff
1 // Copyright 2014 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
5 #include "ipc/mojo/ipc_message_pipe_reader.h"
7 #include <stdint.h>
9 #include "base/bind.h"
10 #include "base/bind_helpers.h"
11 #include "base/location.h"
12 #include "base/logging.h"
13 #include "base/single_thread_task_runner.h"
14 #include "base/thread_task_runner_handle.h"
15 #include "ipc/mojo/async_handle_waiter.h"
16 #include "ipc/mojo/ipc_channel_mojo.h"
18 namespace IPC {
19 namespace internal {
21 MessagePipeReader::MessagePipeReader(mojo::ScopedMessagePipeHandle handle,
22 MessagePipeReader::Delegate* delegate)
23 : pipe_(handle.Pass()),
24 handle_copy_(pipe_.get().value()),
25 delegate_(delegate),
26 async_waiter_(
27 new AsyncHandleWaiter(base::Bind(&MessagePipeReader::PipeIsReady,
28 base::Unretained(this)))),
29 pending_send_error_(MOJO_RESULT_OK) {
32 MessagePipeReader::~MessagePipeReader() {
33 DCHECK(thread_checker_.CalledOnValidThread());
34 // The pipe should be closed before deletion.
35 CHECK(!IsValid());
38 void MessagePipeReader::Close() {
39 DCHECK(thread_checker_.CalledOnValidThread());
40 async_waiter_.reset();
41 pipe_.reset();
42 OnPipeClosed();
45 void MessagePipeReader::CloseWithError(MojoResult error) {
46 DCHECK(thread_checker_.CalledOnValidThread());
47 OnPipeError(error);
48 Close();
51 void MessagePipeReader::CloseWithErrorIfPending() {
52 DCHECK(thread_checker_.CalledOnValidThread());
53 MojoResult pending_error = base::subtle::NoBarrier_Load(&pending_send_error_);
54 if (pending_error == MOJO_RESULT_OK)
55 return;
56 // NOTE: This races with Send(), and therefore the value of
57 // pending_send_error() can change.
58 CloseWithError(pending_error);
59 return;
62 void MessagePipeReader::CloseWithErrorLater(MojoResult error) {
63 DCHECK_NE(error, MOJO_RESULT_OK);
64 // NOTE: No assumptions about the value of |pending_send_error_| or whether or
65 // not the error has been signaled can be made. If Send() is called
66 // immediately before Close() and errors, it's possible for the error to not
67 // be signaled.
68 base::subtle::NoBarrier_Store(&pending_send_error_, error);
71 bool MessagePipeReader::Send(scoped_ptr<Message> message) {
72 TRACE_EVENT_WITH_FLOW0(TRACE_DISABLED_BY_DEFAULT("ipc.flow"),
73 "MessagePipeReader::Send",
74 message->flags(),
75 TRACE_EVENT_FLAG_FLOW_OUT);
76 std::vector<MojoHandle> handles;
77 MojoResult result = MOJO_RESULT_OK;
78 result = ChannelMojo::ReadFromMessageAttachmentSet(message.get(), &handles);
79 if (result == MOJO_RESULT_OK) {
80 result = MojoWriteMessage(handle(),
81 message->data(),
82 static_cast<uint32_t>(message->size()),
83 handles.empty() ? nullptr : &handles[0],
84 static_cast<uint32_t>(handles.size()),
85 MOJO_WRITE_MESSAGE_FLAG_NONE);
88 if (result != MOJO_RESULT_OK) {
89 std::for_each(handles.begin(), handles.end(), &MojoClose);
90 // We cannot call CloseWithError() here as Send() is protected by
91 // ChannelMojo's lock and CloseWithError() could re-enter ChannelMojo. We
92 // cannot call CloseWithError() also because Send() can be called from
93 // non-UI thread while OnPipeError() expects to be called on IO thread.
94 CloseWithErrorLater(result);
95 return false;
98 return true;
101 void MessagePipeReader::OnMessageReceived() {
102 Message message(data_buffer().empty() ? "" : &data_buffer()[0],
103 static_cast<uint32_t>(data_buffer().size()));
105 std::vector<MojoHandle> handle_buffer;
106 TakeHandleBuffer(&handle_buffer);
107 MojoResult write_result =
108 ChannelMojo::WriteToMessageAttachmentSet(handle_buffer, &message);
109 if (write_result != MOJO_RESULT_OK) {
110 CloseWithError(write_result);
111 return;
114 TRACE_EVENT_WITH_FLOW0(TRACE_DISABLED_BY_DEFAULT("ipc.flow"),
115 "MessagePipeReader::OnMessageReceived",
116 message.flags(),
117 TRACE_EVENT_FLAG_FLOW_IN);
118 delegate_->OnMessageReceived(message);
121 void MessagePipeReader::OnPipeClosed() {
122 DCHECK(thread_checker_.CalledOnValidThread());
123 if (!delegate_)
124 return;
125 delegate_->OnPipeClosed(this);
126 delegate_ = nullptr;
129 void MessagePipeReader::OnPipeError(MojoResult error) {
130 DCHECK(thread_checker_.CalledOnValidThread());
131 if (!delegate_)
132 return;
133 delegate_->OnPipeError(this);
136 MojoResult MessagePipeReader::ReadMessageBytes() {
137 DCHECK(thread_checker_.CalledOnValidThread());
138 DCHECK(handle_buffer_.empty());
140 uint32_t num_bytes = static_cast<uint32_t>(data_buffer_.size());
141 uint32_t num_handles = 0;
142 MojoResult result = MojoReadMessage(pipe_.get().value(),
143 num_bytes ? &data_buffer_[0] : nullptr,
144 &num_bytes,
145 nullptr,
146 &num_handles,
147 MOJO_READ_MESSAGE_FLAG_NONE);
148 data_buffer_.resize(num_bytes);
149 handle_buffer_.resize(num_handles);
150 if (result == MOJO_RESULT_RESOURCE_EXHAUSTED) {
151 // MOJO_RESULT_RESOURCE_EXHAUSTED was asking the caller that
152 // it needs more bufer. So we re-read it with resized buffers.
153 result = MojoReadMessage(pipe_.get().value(),
154 num_bytes ? &data_buffer_[0] : nullptr,
155 &num_bytes,
156 num_handles ? &handle_buffer_[0] : nullptr,
157 &num_handles,
158 MOJO_READ_MESSAGE_FLAG_NONE);
161 DCHECK(0 == num_bytes || data_buffer_.size() == num_bytes);
162 DCHECK(0 == num_handles || handle_buffer_.size() == num_handles);
163 return result;
166 void MessagePipeReader::ReadAvailableMessages() {
167 DCHECK(thread_checker_.CalledOnValidThread());
168 while (pipe_.is_valid()) {
169 MojoResult read_result = ReadMessageBytes();
170 if (read_result == MOJO_RESULT_SHOULD_WAIT)
171 break;
172 if (read_result != MOJO_RESULT_OK) {
173 DLOG(WARNING)
174 << "Pipe got error from ReadMessage(). Closing: " << read_result;
175 OnPipeError(read_result);
176 Close();
177 break;
180 OnMessageReceived();
185 void MessagePipeReader::ReadMessagesThenWait() {
186 DCHECK(thread_checker_.CalledOnValidThread());
187 while (true) {
188 ReadAvailableMessages();
189 if (!pipe_.is_valid())
190 break;
191 // |Wait()| is safe to call only after all messages are read.
192 // If can fail with |MOJO_RESULT_ALREADY_EXISTS| otherwise.
193 // Also, we don't use MOJO_HANDLE_SIGNAL_WRITABLE here, expecting buffer in
194 // MessagePipe.
195 MojoResult result =
196 async_waiter_->Wait(pipe_.get().value(), MOJO_HANDLE_SIGNAL_READABLE);
197 // If the result is |MOJO_RESULT_ALREADY_EXISTS|, there could be messages
198 // that have been arrived after the last |ReadAvailableMessages()|.
199 // We have to consume then and retry in that case.
200 if (result != MOJO_RESULT_ALREADY_EXISTS) {
201 if (result != MOJO_RESULT_OK) {
202 LOG(ERROR) << "Failed to wait on the pipe. Result is " << result;
203 OnPipeError(result);
204 Close();
207 break;
212 void MessagePipeReader::PipeIsReady(MojoResult wait_result) {
213 DCHECK(thread_checker_.CalledOnValidThread());
214 CloseWithErrorIfPending();
215 if (!IsValid()) {
216 // There was a pending error and it closed the pipe.
217 // We cannot do the work anymore.
218 return;
221 if (wait_result != MOJO_RESULT_OK) {
222 if (wait_result != MOJO_RESULT_ABORTED) {
223 // FAILED_PRECONDITION happens every time the peer is dead so
224 // it isn't worth polluting the log message.
225 LOG_IF(WARNING, wait_result != MOJO_RESULT_FAILED_PRECONDITION)
226 << "Pipe got error from the waiter. Closing: " << wait_result;
227 OnPipeError(wait_result);
230 Close();
231 return;
234 ReadMessagesThenWait();
237 void MessagePipeReader::DelayedDeleter::operator()(
238 MessagePipeReader* ptr) const {
239 ptr->Close();
240 base::ThreadTaskRunnerHandle::Get()->PostTask(FROM_HERE,
241 base::Bind(&DeleteNow, ptr));
244 } // namespace internal
245 } // namespace IPC