Roll src/third_party/skia de7665a:76033be
[chromium-blink-merge.git] / ipc / mojo / ipc_message_pipe_reader.cc
blob72e7ad7618f4c31b2587bd554f0f6ff381a73a2d
1 // Copyright 2014 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
5 #include "ipc/mojo/ipc_message_pipe_reader.h"
7 #include "base/bind.h"
8 #include "base/bind_helpers.h"
9 #include "base/location.h"
10 #include "base/logging.h"
11 #include "base/message_loop/message_loop_proxy.h"
12 #include "ipc/mojo/async_handle_waiter.h"
13 #include "ipc/mojo/ipc_channel_mojo.h"
15 namespace IPC {
16 namespace internal {
18 MessagePipeReader::MessagePipeReader(mojo::ScopedMessagePipeHandle handle,
19 MessagePipeReader::Delegate* delegate)
20 : pipe_(handle.Pass()),
21 delegate_(delegate),
22 async_waiter_(
23 new AsyncHandleWaiter(base::Bind(&MessagePipeReader::PipeIsReady,
24 base::Unretained(this)))) {
27 MessagePipeReader::~MessagePipeReader() {
28 CHECK(!IsValid());
31 void MessagePipeReader::Close() {
32 async_waiter_.reset();
33 pipe_.reset();
34 OnPipeClosed();
37 void MessagePipeReader::CloseWithError(MojoResult error) {
38 OnPipeError(error);
39 Close();
42 bool MessagePipeReader::Send(scoped_ptr<Message> message) {
43 DCHECK(IsValid());
45 message->TraceMessageBegin();
46 std::vector<MojoHandle> handles;
47 MojoResult result = MOJO_RESULT_OK;
48 #if defined(OS_POSIX) && !defined(OS_NACL)
49 result = ChannelMojo::ReadFromMessageAttachmentSet(message.get(), &handles);
50 #endif
51 if (result == MOJO_RESULT_OK) {
52 result = MojoWriteMessage(handle(),
53 message->data(),
54 static_cast<uint32>(message->size()),
55 handles.empty() ? nullptr : &handles[0],
56 static_cast<uint32>(handles.size()),
57 MOJO_WRITE_MESSAGE_FLAG_NONE);
60 if (result != MOJO_RESULT_OK) {
61 std::for_each(handles.begin(), handles.end(), &MojoClose);
62 CloseWithError(result);
63 return false;
66 return true;
69 void MessagePipeReader::OnMessageReceived() {
70 Message message(data_buffer().empty() ? "" : &data_buffer()[0],
71 static_cast<uint32>(data_buffer().size()));
73 std::vector<MojoHandle> handle_buffer;
74 TakeHandleBuffer(&handle_buffer);
75 #if defined(OS_POSIX) && !defined(OS_NACL)
76 MojoResult write_result =
77 ChannelMojo::WriteToMessageAttachmentSet(handle_buffer, &message);
78 if (write_result != MOJO_RESULT_OK) {
79 CloseWithError(write_result);
80 return;
82 #else
83 DCHECK(handle_buffer.empty());
84 #endif
86 message.TraceMessageEnd();
87 delegate_->OnMessageReceived(message);
90 void MessagePipeReader::OnPipeClosed() {
91 if (!delegate_)
92 return;
93 delegate_->OnPipeClosed(this);
94 delegate_ = nullptr;
97 void MessagePipeReader::OnPipeError(MojoResult error) {
98 if (!delegate_)
99 return;
100 delegate_->OnPipeError(this);
103 MojoResult MessagePipeReader::ReadMessageBytes() {
104 DCHECK(handle_buffer_.empty());
106 uint32_t num_bytes = static_cast<uint32_t>(data_buffer_.size());
107 uint32_t num_handles = 0;
108 MojoResult result = MojoReadMessage(pipe_.get().value(),
109 num_bytes ? &data_buffer_[0] : nullptr,
110 &num_bytes,
111 nullptr,
112 &num_handles,
113 MOJO_READ_MESSAGE_FLAG_NONE);
114 data_buffer_.resize(num_bytes);
115 handle_buffer_.resize(num_handles);
116 if (result == MOJO_RESULT_RESOURCE_EXHAUSTED) {
117 // MOJO_RESULT_RESOURCE_EXHAUSTED was asking the caller that
118 // it needs more bufer. So we re-read it with resized buffers.
119 result = MojoReadMessage(pipe_.get().value(),
120 num_bytes ? &data_buffer_[0] : nullptr,
121 &num_bytes,
122 num_handles ? &handle_buffer_[0] : nullptr,
123 &num_handles,
124 MOJO_READ_MESSAGE_FLAG_NONE);
127 DCHECK(0 == num_bytes || data_buffer_.size() == num_bytes);
128 DCHECK(0 == num_handles || handle_buffer_.size() == num_handles);
129 return result;
132 void MessagePipeReader::ReadAvailableMessages() {
133 while (pipe_.is_valid()) {
134 MojoResult read_result = ReadMessageBytes();
135 if (read_result == MOJO_RESULT_SHOULD_WAIT)
136 break;
137 if (read_result != MOJO_RESULT_OK) {
138 // FAILED_PRECONDITION means that all the received messages
139 // got consumed and the peer is already closed.
140 if (read_result != MOJO_RESULT_FAILED_PRECONDITION) {
141 DLOG(WARNING)
142 << "Pipe got error from ReadMessage(). Closing: " << read_result;
143 OnPipeError(read_result);
146 Close();
147 break;
150 OnMessageReceived();
155 void MessagePipeReader::ReadMessagesThenWait() {
156 while (true) {
157 ReadAvailableMessages();
158 if (!pipe_.is_valid())
159 break;
160 // |Wait()| is safe to call only after all messages are read.
161 // If can fail with |MOJO_RESULT_ALREADY_EXISTS| otherwise.
162 // Also, we don't use MOJO_HANDLE_SIGNAL_WRITABLE here, expecting buffer in
163 // MessagePipe.
164 MojoResult result =
165 async_waiter_->Wait(pipe_.get().value(), MOJO_HANDLE_SIGNAL_READABLE);
166 // If the result is |MOJO_RESULT_ALREADY_EXISTS|, there could be messages
167 // that have been arrived after the last |ReadAvailableMessages()|.
168 // We have to consume then and retry in that case.
169 if (result != MOJO_RESULT_ALREADY_EXISTS) {
170 if (result != MOJO_RESULT_OK) {
171 DLOG(ERROR) << "Result is " << result;
172 OnPipeError(result);
173 Close();
176 break;
181 void MessagePipeReader::PipeIsReady(MojoResult wait_result) {
182 if (wait_result != MOJO_RESULT_OK) {
183 if (wait_result != MOJO_RESULT_ABORTED) {
184 // FAILED_PRECONDITION happens every time the peer is dead so
185 // it isn't worth polluting the log message.
186 DLOG_IF(WARNING, wait_result != MOJO_RESULT_FAILED_PRECONDITION)
187 << "Pipe got error from the waiter. Closing: " << wait_result;
188 OnPipeError(wait_result);
191 Close();
192 return;
195 ReadMessagesThenWait();
198 void MessagePipeReader::DelayedDeleter::operator()(
199 MessagePipeReader* ptr) const {
200 ptr->Close();
201 base::MessageLoopProxy::current()->PostTask(
202 FROM_HERE, base::Bind(&DeleteNow, ptr));
205 } // namespace internal
206 } // namespace IPC