make use of media_use_ffmpeg in BUILD.gn
[chromium-blink-merge.git] / ipc / mojo / ipc_message_pipe_reader.cc
blob2202a575307a6d1cb1ee0e35e76e84af7b505fb8
1 // Copyright 2014 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
5 #include "ipc/mojo/ipc_message_pipe_reader.h"
7 #include "base/bind.h"
8 #include "base/bind_helpers.h"
9 #include "base/location.h"
10 #include "base/logging.h"
11 #include "base/single_thread_task_runner.h"
12 #include "base/thread_task_runner_handle.h"
13 #include "ipc/mojo/async_handle_waiter.h"
14 #include "ipc/mojo/ipc_channel_mojo.h"
16 namespace IPC {
17 namespace internal {
19 MessagePipeReader::MessagePipeReader(mojo::ScopedMessagePipeHandle handle,
20 MessagePipeReader::Delegate* delegate)
21 : pipe_(handle.Pass()),
22 delegate_(delegate),
23 async_waiter_(
24 new AsyncHandleWaiter(base::Bind(&MessagePipeReader::PipeIsReady,
25 base::Unretained(this)))),
26 pending_send_error_(MOJO_RESULT_OK) {
29 MessagePipeReader::~MessagePipeReader() {
30 // The pipe should be closed before deletion.
31 CHECK(!IsValid());
32 DCHECK_EQ(pending_send_error_, MOJO_RESULT_OK);
35 void MessagePipeReader::Close() {
36 // All pending errors should be signaled before Close().
37 DCHECK_EQ(pending_send_error_, MOJO_RESULT_OK);
38 async_waiter_.reset();
39 pipe_.reset();
40 OnPipeClosed();
43 void MessagePipeReader::CloseWithError(MojoResult error) {
44 OnPipeError(error);
45 Close();
48 void MessagePipeReader::CloseWithErrorIfPending() {
49 if (pending_send_error_ == MOJO_RESULT_OK)
50 return;
51 MojoResult error = pending_send_error_;
52 pending_send_error_ = MOJO_RESULT_OK;
53 CloseWithError(error);
54 return;
57 void MessagePipeReader::CloseWithErrorLater(MojoResult error) {
58 pending_send_error_ = error;
61 bool MessagePipeReader::Send(scoped_ptr<Message> message) {
62 DCHECK(IsValid());
64 TRACE_EVENT_WITH_FLOW0(TRACE_DISABLED_BY_DEFAULT("ipc.flow"),
65 "MessagePipeReader::Send",
66 message->flags(),
67 TRACE_EVENT_FLAG_FLOW_OUT);
68 std::vector<MojoHandle> handles;
69 MojoResult result = MOJO_RESULT_OK;
70 result = ChannelMojo::ReadFromMessageAttachmentSet(message.get(), &handles);
71 if (result == MOJO_RESULT_OK) {
72 result = MojoWriteMessage(handle(),
73 message->data(),
74 static_cast<uint32>(message->size()),
75 handles.empty() ? nullptr : &handles[0],
76 static_cast<uint32>(handles.size()),
77 MOJO_WRITE_MESSAGE_FLAG_NONE);
80 if (result != MOJO_RESULT_OK) {
81 std::for_each(handles.begin(), handles.end(), &MojoClose);
82 // We cannot call CloseWithError() here as Send() is protected by
83 // ChannelMojo's lock and CloseWithError() could re-enter ChannelMojo. We
84 // cannot call CloseWithError() also because Send() can be called from
85 // non-UI thread while OnPipeError() expects to be called on IO thread.
86 CloseWithErrorLater(result);
87 return false;
90 return true;
93 void MessagePipeReader::OnMessageReceived() {
94 Message message(data_buffer().empty() ? "" : &data_buffer()[0],
95 static_cast<uint32>(data_buffer().size()));
97 std::vector<MojoHandle> handle_buffer;
98 TakeHandleBuffer(&handle_buffer);
99 MojoResult write_result =
100 ChannelMojo::WriteToMessageAttachmentSet(handle_buffer, &message);
101 if (write_result != MOJO_RESULT_OK) {
102 CloseWithError(write_result);
103 return;
106 TRACE_EVENT_WITH_FLOW0(TRACE_DISABLED_BY_DEFAULT("ipc.flow"),
107 "MessagePipeReader::OnMessageReceived",
108 message.flags(),
109 TRACE_EVENT_FLAG_FLOW_IN);
110 delegate_->OnMessageReceived(message);
113 void MessagePipeReader::OnPipeClosed() {
114 if (!delegate_)
115 return;
116 delegate_->OnPipeClosed(this);
117 delegate_ = nullptr;
120 void MessagePipeReader::OnPipeError(MojoResult error) {
121 if (!delegate_)
122 return;
123 delegate_->OnPipeError(this);
126 MojoResult MessagePipeReader::ReadMessageBytes() {
127 DCHECK(handle_buffer_.empty());
129 uint32_t num_bytes = static_cast<uint32_t>(data_buffer_.size());
130 uint32_t num_handles = 0;
131 MojoResult result = MojoReadMessage(pipe_.get().value(),
132 num_bytes ? &data_buffer_[0] : nullptr,
133 &num_bytes,
134 nullptr,
135 &num_handles,
136 MOJO_READ_MESSAGE_FLAG_NONE);
137 data_buffer_.resize(num_bytes);
138 handle_buffer_.resize(num_handles);
139 if (result == MOJO_RESULT_RESOURCE_EXHAUSTED) {
140 // MOJO_RESULT_RESOURCE_EXHAUSTED was asking the caller that
141 // it needs more bufer. So we re-read it with resized buffers.
142 result = MojoReadMessage(pipe_.get().value(),
143 num_bytes ? &data_buffer_[0] : nullptr,
144 &num_bytes,
145 num_handles ? &handle_buffer_[0] : nullptr,
146 &num_handles,
147 MOJO_READ_MESSAGE_FLAG_NONE);
150 DCHECK(0 == num_bytes || data_buffer_.size() == num_bytes);
151 DCHECK(0 == num_handles || handle_buffer_.size() == num_handles);
152 return result;
155 void MessagePipeReader::ReadAvailableMessages() {
156 while (pipe_.is_valid()) {
157 MojoResult read_result = ReadMessageBytes();
158 if (read_result == MOJO_RESULT_SHOULD_WAIT)
159 break;
160 if (read_result != MOJO_RESULT_OK) {
161 DLOG(WARNING)
162 << "Pipe got error from ReadMessage(). Closing: " << read_result;
163 OnPipeError(read_result);
164 Close();
165 break;
168 OnMessageReceived();
173 void MessagePipeReader::ReadMessagesThenWait() {
174 while (true) {
175 ReadAvailableMessages();
176 if (!pipe_.is_valid())
177 break;
178 // |Wait()| is safe to call only after all messages are read.
179 // If can fail with |MOJO_RESULT_ALREADY_EXISTS| otherwise.
180 // Also, we don't use MOJO_HANDLE_SIGNAL_WRITABLE here, expecting buffer in
181 // MessagePipe.
182 MojoResult result =
183 async_waiter_->Wait(pipe_.get().value(), MOJO_HANDLE_SIGNAL_READABLE);
184 // If the result is |MOJO_RESULT_ALREADY_EXISTS|, there could be messages
185 // that have been arrived after the last |ReadAvailableMessages()|.
186 // We have to consume then and retry in that case.
187 if (result != MOJO_RESULT_ALREADY_EXISTS) {
188 if (result != MOJO_RESULT_OK) {
189 LOG(ERROR) << "Failed to wait on the pipe. Result is " << result;
190 OnPipeError(result);
191 Close();
194 break;
199 void MessagePipeReader::PipeIsReady(MojoResult wait_result) {
200 CloseWithErrorIfPending();
201 if (!IsValid()) {
202 // There was a pending error and it closed the pipe.
203 // We cannot do the work anymore.
204 return;
207 if (wait_result != MOJO_RESULT_OK) {
208 if (wait_result != MOJO_RESULT_ABORTED) {
209 // FAILED_PRECONDITION happens every time the peer is dead so
210 // it isn't worth polluting the log message.
211 LOG_IF(WARNING, wait_result != MOJO_RESULT_FAILED_PRECONDITION)
212 << "Pipe got error from the waiter. Closing: " << wait_result;
213 OnPipeError(wait_result);
216 Close();
217 return;
220 ReadMessagesThenWait();
223 void MessagePipeReader::DelayedDeleter::operator()(
224 MessagePipeReader* ptr) const {
225 ptr->Close();
226 base::ThreadTaskRunnerHandle::Get()->PostTask(FROM_HERE,
227 base::Bind(&DeleteNow, ptr));
230 } // namespace internal
231 } // namespace IPC