1 // Copyright 2014 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
5 #include "ipc/mojo/ipc_message_pipe_reader.h"
10 #include "base/bind_helpers.h"
11 #include "base/location.h"
12 #include "base/logging.h"
13 #include "base/single_thread_task_runner.h"
14 #include "base/thread_task_runner_handle.h"
15 #include "ipc/mojo/async_handle_waiter.h"
16 #include "ipc/mojo/ipc_channel_mojo.h"
// Moves |handle| into |pipe_| and records the pipe's raw handle value in
// |handle_copy_|. The AsyncHandleWaiter created here is given a callback that
// invokes PipeIsReady() on this object (bound with base::Unretained), and
// |pending_send_error_| starts out as MOJO_RESULT_OK.
21 MessagePipeReader::MessagePipeReader(mojo::ScopedMessagePipeHandle handle
,
22 MessagePipeReader::Delegate
* delegate
)
23 : pipe_(handle
.Pass()),
24 handle_copy_(pipe_
.get().value()),
27 new AsyncHandleWaiter(base::Bind(&MessagePipeReader::PipeIsReady
,
28 base::Unretained(this)))),
29 pending_send_error_(MOJO_RESULT_OK
) {
// Destructor. DCHECKs that it runs on the thread |thread_checker_| accepts.
32 MessagePipeReader::~MessagePipeReader() {
33 DCHECK(thread_checker_
.CalledOnValidThread());
34 // The pipe should be closed before deletion.
// Shuts the reader down. DCHECKs the calling thread and releases the
// AsyncHandleWaiter held in |async_waiter_|.
38 void MessagePipeReader::Close() {
39 DCHECK(thread_checker_
.CalledOnValidThread());
40 async_waiter_
.reset();
// Error-path close for |error|. DCHECKs that it is called on the thread
// |thread_checker_| accepts.
45 void MessagePipeReader::CloseWithError(MojoResult error
) {
46 DCHECK(thread_checker_
.CalledOnValidThread());
// Reads |pending_send_error_| with a NoBarrier_Load and, when it holds
// something other than MOJO_RESULT_OK, passes that value to CloseWithError().
// Must be called on the thread |thread_checker_| accepts.
51 void MessagePipeReader::CloseWithErrorIfPending() {
52 DCHECK(thread_checker_
.CalledOnValidThread());
53 MojoResult pending_error
= base::subtle::NoBarrier_Load(&pending_send_error_
);
54 if (pending_error
== MOJO_RESULT_OK
)
56 // NOTE: This races with Send(), and therefore the value of
57 // pending_send_error() can change.
58 CloseWithError(pending_error
);
// Records |error| in |pending_send_error_| with a NoBarrier_Store so that it
// can be picked up later by CloseWithErrorIfPending(). |error| must not be
// MOJO_RESULT_OK (enforced with DCHECK_NE).
62 void MessagePipeReader::CloseWithErrorLater(MojoResult error
) {
63 DCHECK_NE(error
, MOJO_RESULT_OK
);
64 // NOTE: No assumptions about the value of |pending_send_error_| or whether or
65 // not the error has been signaled can be made. If Send() is called
66 // immediately before Close() and errors, it's possible for the error to not
68 base::subtle::NoBarrier_Store(&pending_send_error_
, error
);
// Writes |message| to the message pipe.
//
// Handles are first obtained from |message| through
// ChannelMojo::ReadFromMessageAttachmentSet(); if that succeeds, the message
// size and the handles are handed to MojoWriteMessage(). When either step
// fails, every collected handle is closed with MojoClose() and the error is
// deferred through CloseWithErrorLater() rather than handled inline (see the
// comment at the call site for why).
71 bool MessagePipeReader::Send(scoped_ptr
<Message
> message
) {
72 TRACE_EVENT_WITH_FLOW0(TRACE_DISABLED_BY_DEFAULT("ipc.flow"),
73 "MessagePipeReader::Send",
75 TRACE_EVENT_FLAG_FLOW_OUT
);
76 std::vector
<MojoHandle
> handles
;
77 MojoResult result
= MOJO_RESULT_OK
;
78 result
= ChannelMojo::ReadFromMessageAttachmentSet(message
.get(), &handles
);
79 if (result
== MOJO_RESULT_OK
) {
80 result
= MojoWriteMessage(handle(),
82 static_cast<uint32_t>(message
->size()),
83 handles
.empty() ? nullptr : &handles
[0],
84 static_cast<uint32_t>(handles
.size()),
85 MOJO_WRITE_MESSAGE_FLAG_NONE
);
88 if (result
!= MOJO_RESULT_OK
) {
89 std::for_each(handles
.begin(), handles
.end(), &MojoClose
);
90 // We cannot call CloseWithError() here as Send() is protected by
91 // ChannelMojo's lock and CloseWithError() could re-enter ChannelMojo. We
92 // cannot call CloseWithError() also because Send() can be called from
93 // non-UI thread while OnPipeError() expects to be called on IO thread.
94 CloseWithErrorLater(result
);
// Builds a Message from the bytes in data_buffer() (an empty string literal is
// used as the data pointer when the buffer is empty), attaches the handles
// taken via TakeHandleBuffer() using ChannelMojo::WriteToMessageAttachmentSet(),
// and passes the message to |delegate_|. A failure while attaching handles is
// reported through CloseWithError().
101 void MessagePipeReader::OnMessageReceived() {
102 Message
message(data_buffer().empty() ? "" : &data_buffer()[0],
103 static_cast<uint32_t>(data_buffer().size()));
105 std::vector
<MojoHandle
> handle_buffer
;
106 TakeHandleBuffer(&handle_buffer
);
107 MojoResult write_result
=
108 ChannelMojo::WriteToMessageAttachmentSet(handle_buffer
, &message
);
109 if (write_result
!= MOJO_RESULT_OK
) {
110 CloseWithError(write_result
);
114 TRACE_EVENT_WITH_FLOW0(TRACE_DISABLED_BY_DEFAULT("ipc.flow"),
115 "MessagePipeReader::OnMessageReceived",
117 TRACE_EVENT_FLAG_FLOW_IN
);
118 delegate_
->OnMessageReceived(message
);
// Notifies |delegate_| that this reader's pipe was closed, passing |this| so
// the delegate can identify the reader. Must be called on the thread
// |thread_checker_| accepts.
121 void MessagePipeReader::OnPipeClosed() {
122 DCHECK(thread_checker_
.CalledOnValidThread());
125 delegate_
->OnPipeClosed(this);
// Notifies |delegate_| that this reader's pipe hit an error, passing |this| so
// the delegate can identify the reader. Must be called on the thread
// |thread_checker_| accepts.
129 void MessagePipeReader::OnPipeError(MojoResult error
) {
130 DCHECK(thread_checker_
.CalledOnValidThread());
133 delegate_
->OnPipeError(this);
// Reads one message from |pipe_| into |data_buffer_| / |handle_buffer_|.
//
// The first MojoReadMessage() call uses the current size of |data_buffer_|.
// Both buffers are then resized to |num_bytes| / |num_handles|, and if the
// first call reported MOJO_RESULT_RESOURCE_EXHAUSTED the read is retried with
// the resized buffers. |handle_buffer_| must be empty on entry, and the call
// must be made on the thread |thread_checker_| accepts.
136 MojoResult
MessagePipeReader::ReadMessageBytes() {
137 DCHECK(thread_checker_
.CalledOnValidThread());
138 DCHECK(handle_buffer_
.empty());
140 uint32_t num_bytes
= static_cast<uint32_t>(data_buffer_
.size());
141 uint32_t num_handles
= 0;
142 MojoResult result
= MojoReadMessage(pipe_
.get().value(),
143 num_bytes
? &data_buffer_
[0] : nullptr,
147 MOJO_READ_MESSAGE_FLAG_NONE
);
148 data_buffer_
.resize(num_bytes
);
149 handle_buffer_
.resize(num_handles
);
150 if (result
== MOJO_RESULT_RESOURCE_EXHAUSTED
) {
151 // MOJO_RESULT_RESOURCE_EXHAUSTED tells the caller that it needs
152 // a bigger buffer, so we re-read the message with the resized buffers.
153 result
= MojoReadMessage(pipe_
.get().value(),
154 num_bytes
? &data_buffer_
[0] : nullptr,
156 num_handles
? &handle_buffer_
[0] : nullptr,
158 MOJO_READ_MESSAGE_FLAG_NONE
);
161 DCHECK(0 == num_bytes
|| data_buffer_
.size() == num_bytes
);
162 DCHECK(0 == num_handles
|| handle_buffer_
.size() == num_handles
);
// Calls ReadMessageBytes() repeatedly for as long as |pipe_| stays valid.
// MOJO_RESULT_SHOULD_WAIT is checked for separately from other failures; any
// other non-OK result is logged and reported through OnPipeError(). Must be
// called on the thread |thread_checker_| accepts.
166 void MessagePipeReader::ReadAvailableMessages() {
167 DCHECK(thread_checker_
.CalledOnValidThread());
168 while (pipe_
.is_valid()) {
169 MojoResult read_result
= ReadMessageBytes();
170 if (read_result
== MOJO_RESULT_SHOULD_WAIT
)
172 if (read_result
!= MOJO_RESULT_OK
) {
174 << "Pipe got error from ReadMessage(). Closing: " << read_result
;
175 OnPipeError(read_result
);
// Drains the pipe with ReadAvailableMessages() and then asks |async_waiter_|
// to wait for the pipe to become readable again. A wait result of
// MOJO_RESULT_ALREADY_EXISTS is treated differently from other results (see
// the comments below); other non-OK results are logged as errors. Must be
// called on the thread |thread_checker_| accepts.
185 void MessagePipeReader::ReadMessagesThenWait() {
186 DCHECK(thread_checker_
.CalledOnValidThread());
188 ReadAvailableMessages();
189 if (!pipe_
.is_valid())
191 // |Wait()| is safe to call only after all messages are read.
192 // It can fail with |MOJO_RESULT_ALREADY_EXISTS| otherwise.
193 // Also, we don't use MOJO_HANDLE_SIGNAL_WRITABLE here, expecting buffer in
196 async_waiter_
->Wait(pipe_
.get().value(), MOJO_HANDLE_SIGNAL_READABLE
);
197 // If the result is |MOJO_RESULT_ALREADY_EXISTS|, there could be messages
198 // that have arrived after the last |ReadAvailableMessages()|.
199 // We have to consume them and retry in that case.
200 if (result
!= MOJO_RESULT_ALREADY_EXISTS
) {
201 if (result
!= MOJO_RESULT_OK
) {
202 LOG(ERROR
) << "Failed to wait on the pipe. Result is " << result
;
// Callback run with the outcome of an asynchronous wait on the pipe.
//
// A pending send error is handled first through CloseWithErrorIfPending().
// When |wait_result| is neither MOJO_RESULT_OK nor MOJO_RESULT_ABORTED, the
// failure is reported through OnPipeError(); a warning is logged as well
// unless the result is MOJO_RESULT_FAILED_PRECONDITION. The normal path
// continues with ReadMessagesThenWait(). Must be called on the thread
// |thread_checker_| accepts.
212 void MessagePipeReader::PipeIsReady(MojoResult wait_result
) {
213 DCHECK(thread_checker_
.CalledOnValidThread());
214 CloseWithErrorIfPending();
216 // There was a pending error and it closed the pipe.
217 // We cannot do the work anymore.
221 if (wait_result
!= MOJO_RESULT_OK
) {
222 if (wait_result
!= MOJO_RESULT_ABORTED
) {
223 // FAILED_PRECONDITION happens every time the peer is dead so
224 // it isn't worth polluting the log message.
225 LOG_IF(WARNING
, wait_result
!= MOJO_RESULT_FAILED_PRECONDITION
)
226 << "Pipe got error from the waiter. Closing: " << wait_result
;
227 OnPipeError(wait_result
);
234 ReadMessagesThenWait();
// Deleter functor for MessagePipeReader. Posts a task that runs
// DeleteNow(ptr) to the task runner returned by
// base::ThreadTaskRunnerHandle::Get().
237 void MessagePipeReader::DelayedDeleter::operator()(
238 MessagePipeReader
* ptr
) const {
240 base::ThreadTaskRunnerHandle::Get()->PostTask(FROM_HERE
,
241 base::Bind(&DeleteNow
, ptr
));
244 } // namespace internal