Extension syncing: Introduce a NeedsSync pref
[chromium-blink-merge.git] / ipc / mojo / ipc_message_pipe_reader.cc
blob44bd10a8f7f99dd5c4eeff69cd7db728bd42e1b0
1 // Copyright 2014 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
5 #include "ipc/mojo/ipc_message_pipe_reader.h"
7 #include "base/bind.h"
8 #include "base/bind_helpers.h"
9 #include "base/location.h"
10 #include "base/logging.h"
11 #include "base/single_thread_task_runner.h"
12 #include "base/thread_task_runner_handle.h"
13 #include "ipc/mojo/async_handle_waiter.h"
14 #include "ipc/mojo/ipc_channel_mojo.h"
16 namespace IPC {
17 namespace internal {
19 MessagePipeReader::MessagePipeReader(mojo::ScopedMessagePipeHandle handle,
20 MessagePipeReader::Delegate* delegate)
21 : pipe_(handle.Pass()),
22 delegate_(delegate),
23 async_waiter_(
24 new AsyncHandleWaiter(base::Bind(&MessagePipeReader::PipeIsReady,
25 base::Unretained(this)))),
26 pending_send_error_(MOJO_RESULT_OK) {
29 MessagePipeReader::~MessagePipeReader() {
30 // The pipe should be closed before deletion.
31 CHECK(!IsValid());
32 DCHECK_EQ(pending_send_error_, MOJO_RESULT_OK);
35 void MessagePipeReader::Close() {
36 // All pending errors should be signaled before Close().
37 DCHECK_EQ(pending_send_error_, MOJO_RESULT_OK);
38 async_waiter_.reset();
39 pipe_.reset();
40 OnPipeClosed();
43 void MessagePipeReader::CloseWithError(MojoResult error) {
44 OnPipeError(error);
45 Close();
48 void MessagePipeReader::CloseWithErrorIfPending() {
49 if (pending_send_error_ == MOJO_RESULT_OK)
50 return;
51 MojoResult error = pending_send_error_;
52 pending_send_error_ = MOJO_RESULT_OK;
53 CloseWithError(error);
54 return;
57 void MessagePipeReader::CloseWithErrorLater(MojoResult error) {
58 pending_send_error_ = error;
61 bool MessagePipeReader::Send(scoped_ptr<Message> message) {
62 DCHECK(IsValid());
64 message->TraceMessageBegin();
65 std::vector<MojoHandle> handles;
66 MojoResult result = MOJO_RESULT_OK;
67 result = ChannelMojo::ReadFromMessageAttachmentSet(message.get(), &handles);
68 if (result == MOJO_RESULT_OK) {
69 result = MojoWriteMessage(handle(),
70 message->data(),
71 static_cast<uint32>(message->size()),
72 handles.empty() ? nullptr : &handles[0],
73 static_cast<uint32>(handles.size()),
74 MOJO_WRITE_MESSAGE_FLAG_NONE);
77 if (result != MOJO_RESULT_OK) {
78 std::for_each(handles.begin(), handles.end(), &MojoClose);
79 // We cannot call CloseWithError() here as Send() is protected by
80 // ChannelMojo's lock and CloseWithError() could re-enter ChannelMojo. We
81 // cannot call CloseWithError() also because Send() can be called from
82 // non-UI thread while OnPipeError() expects to be called on IO thread.
83 CloseWithErrorLater(result);
84 return false;
87 return true;
90 void MessagePipeReader::OnMessageReceived() {
91 Message message(data_buffer().empty() ? "" : &data_buffer()[0],
92 static_cast<uint32>(data_buffer().size()));
94 std::vector<MojoHandle> handle_buffer;
95 TakeHandleBuffer(&handle_buffer);
96 MojoResult write_result =
97 ChannelMojo::WriteToMessageAttachmentSet(handle_buffer, &message);
98 if (write_result != MOJO_RESULT_OK) {
99 CloseWithError(write_result);
100 return;
103 message.TraceMessageEnd();
104 delegate_->OnMessageReceived(message);
107 void MessagePipeReader::OnPipeClosed() {
108 if (!delegate_)
109 return;
110 delegate_->OnPipeClosed(this);
111 delegate_ = nullptr;
114 void MessagePipeReader::OnPipeError(MojoResult error) {
115 if (!delegate_)
116 return;
117 delegate_->OnPipeError(this);
120 MojoResult MessagePipeReader::ReadMessageBytes() {
121 DCHECK(handle_buffer_.empty());
123 uint32_t num_bytes = static_cast<uint32_t>(data_buffer_.size());
124 uint32_t num_handles = 0;
125 MojoResult result = MojoReadMessage(pipe_.get().value(),
126 num_bytes ? &data_buffer_[0] : nullptr,
127 &num_bytes,
128 nullptr,
129 &num_handles,
130 MOJO_READ_MESSAGE_FLAG_NONE);
131 data_buffer_.resize(num_bytes);
132 handle_buffer_.resize(num_handles);
133 if (result == MOJO_RESULT_RESOURCE_EXHAUSTED) {
134 // MOJO_RESULT_RESOURCE_EXHAUSTED was asking the caller that
135 // it needs more bufer. So we re-read it with resized buffers.
136 result = MojoReadMessage(pipe_.get().value(),
137 num_bytes ? &data_buffer_[0] : nullptr,
138 &num_bytes,
139 num_handles ? &handle_buffer_[0] : nullptr,
140 &num_handles,
141 MOJO_READ_MESSAGE_FLAG_NONE);
144 DCHECK(0 == num_bytes || data_buffer_.size() == num_bytes);
145 DCHECK(0 == num_handles || handle_buffer_.size() == num_handles);
146 return result;
149 void MessagePipeReader::ReadAvailableMessages() {
150 while (pipe_.is_valid()) {
151 MojoResult read_result = ReadMessageBytes();
152 if (read_result == MOJO_RESULT_SHOULD_WAIT)
153 break;
154 if (read_result != MOJO_RESULT_OK) {
155 DLOG(WARNING)
156 << "Pipe got error from ReadMessage(). Closing: " << read_result;
157 OnPipeError(read_result);
158 Close();
159 break;
162 OnMessageReceived();
167 void MessagePipeReader::ReadMessagesThenWait() {
168 while (true) {
169 ReadAvailableMessages();
170 if (!pipe_.is_valid())
171 break;
172 // |Wait()| is safe to call only after all messages are read.
173 // If can fail with |MOJO_RESULT_ALREADY_EXISTS| otherwise.
174 // Also, we don't use MOJO_HANDLE_SIGNAL_WRITABLE here, expecting buffer in
175 // MessagePipe.
176 MojoResult result =
177 async_waiter_->Wait(pipe_.get().value(), MOJO_HANDLE_SIGNAL_READABLE);
178 // If the result is |MOJO_RESULT_ALREADY_EXISTS|, there could be messages
179 // that have been arrived after the last |ReadAvailableMessages()|.
180 // We have to consume then and retry in that case.
181 if (result != MOJO_RESULT_ALREADY_EXISTS) {
182 if (result != MOJO_RESULT_OK) {
183 LOG(ERROR) << "Failed to wait on the pipe. Result is " << result;
184 OnPipeError(result);
185 Close();
188 break;
193 void MessagePipeReader::PipeIsReady(MojoResult wait_result) {
194 CloseWithErrorIfPending();
195 if (!IsValid()) {
196 // There was a pending error and it closed the pipe.
197 // We cannot do the work anymore.
198 return;
201 if (wait_result != MOJO_RESULT_OK) {
202 if (wait_result != MOJO_RESULT_ABORTED) {
203 // FAILED_PRECONDITION happens every time the peer is dead so
204 // it isn't worth polluting the log message.
205 LOG_IF(WARNING, wait_result != MOJO_RESULT_FAILED_PRECONDITION)
206 << "Pipe got error from the waiter. Closing: " << wait_result;
207 OnPipeError(wait_result);
210 Close();
211 return;
214 ReadMessagesThenWait();
217 void MessagePipeReader::DelayedDeleter::operator()(
218 MessagePipeReader* ptr) const {
219 ptr->Close();
220 base::ThreadTaskRunnerHandle::Get()->PostTask(FROM_HERE,
221 base::Bind(&DeleteNow, ptr));
224 } // namespace internal
225 } // namespace IPC