1 // Copyright 2014 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
5 #include "ipc/mojo/ipc_message_pipe_reader.h"
8 #include "base/bind_helpers.h"
9 #include "base/location.h"
10 #include "base/logging.h"
11 #include "base/message_loop/message_loop_proxy.h"
12 #include "ipc/mojo/async_handle_waiter.h"
13 #include "ipc/mojo/ipc_channel_mojo.h"
18 MessagePipeReader::MessagePipeReader(mojo::ScopedMessagePipeHandle handle
,
19 MessagePipeReader::Delegate
* delegate
)
20 : pipe_(handle
.Pass()),
23 new AsyncHandleWaiter(base::Bind(&MessagePipeReader::PipeIsReady
,
24 base::Unretained(this)))) {
27 MessagePipeReader::~MessagePipeReader() {
31 void MessagePipeReader::Close() {
32 async_waiter_
.reset();
37 void MessagePipeReader::CloseWithError(MojoResult error
) {
42 bool MessagePipeReader::Send(scoped_ptr
<Message
> message
) {
45 message
->TraceMessageBegin();
46 std::vector
<MojoHandle
> handles
;
47 MojoResult result
= MOJO_RESULT_OK
;
48 #if defined(OS_POSIX) && !defined(OS_NACL)
49 result
= ChannelMojo::ReadFromMessageAttachmentSet(message
.get(), &handles
);
51 if (result
== MOJO_RESULT_OK
) {
52 result
= MojoWriteMessage(handle(),
54 static_cast<uint32
>(message
->size()),
55 handles
.empty() ? nullptr : &handles
[0],
56 static_cast<uint32
>(handles
.size()),
57 MOJO_WRITE_MESSAGE_FLAG_NONE
);
60 if (result
!= MOJO_RESULT_OK
) {
61 std::for_each(handles
.begin(), handles
.end(), &MojoClose
);
62 CloseWithError(result
);
69 void MessagePipeReader::OnMessageReceived() {
70 Message
message(data_buffer().empty() ? "" : &data_buffer()[0],
71 static_cast<uint32
>(data_buffer().size()));
73 std::vector
<MojoHandle
> handle_buffer
;
74 TakeHandleBuffer(&handle_buffer
);
75 #if defined(OS_POSIX) && !defined(OS_NACL)
76 MojoResult write_result
=
77 ChannelMojo::WriteToMessageAttachmentSet(handle_buffer
, &message
);
78 if (write_result
!= MOJO_RESULT_OK
) {
79 CloseWithError(write_result
);
83 DCHECK(handle_buffer
.empty());
86 message
.TraceMessageEnd();
87 delegate_
->OnMessageReceived(message
);
90 void MessagePipeReader::OnPipeClosed() {
93 delegate_
->OnPipeClosed(this);
97 void MessagePipeReader::OnPipeError(MojoResult error
) {
100 delegate_
->OnPipeError(this);
103 MojoResult
MessagePipeReader::ReadMessageBytes() {
104 DCHECK(handle_buffer_
.empty());
106 uint32_t num_bytes
= static_cast<uint32_t>(data_buffer_
.size());
107 uint32_t num_handles
= 0;
108 MojoResult result
= MojoReadMessage(pipe_
.get().value(),
109 num_bytes
? &data_buffer_
[0] : nullptr,
113 MOJO_READ_MESSAGE_FLAG_NONE
);
114 data_buffer_
.resize(num_bytes
);
115 handle_buffer_
.resize(num_handles
);
116 if (result
== MOJO_RESULT_RESOURCE_EXHAUSTED
) {
117 // MOJO_RESULT_RESOURCE_EXHAUSTED was asking the caller that
118 // it needs more bufer. So we re-read it with resized buffers.
119 result
= MojoReadMessage(pipe_
.get().value(),
120 num_bytes
? &data_buffer_
[0] : nullptr,
122 num_handles
? &handle_buffer_
[0] : nullptr,
124 MOJO_READ_MESSAGE_FLAG_NONE
);
127 DCHECK(0 == num_bytes
|| data_buffer_
.size() == num_bytes
);
128 DCHECK(0 == num_handles
|| handle_buffer_
.size() == num_handles
);
132 void MessagePipeReader::ReadAvailableMessages() {
133 while (pipe_
.is_valid()) {
134 MojoResult read_result
= ReadMessageBytes();
135 if (read_result
== MOJO_RESULT_SHOULD_WAIT
)
137 if (read_result
!= MOJO_RESULT_OK
) {
138 // FAILED_PRECONDITION means that all the received messages
139 // got consumed and the peer is already closed.
140 if (read_result
!= MOJO_RESULT_FAILED_PRECONDITION
) {
142 << "Pipe got error from ReadMessage(). Closing: " << read_result
;
143 OnPipeError(read_result
);
155 void MessagePipeReader::ReadMessagesThenWait() {
157 ReadAvailableMessages();
158 if (!pipe_
.is_valid())
160 // |Wait()| is safe to call only after all messages are read.
161 // If can fail with |MOJO_RESULT_ALREADY_EXISTS| otherwise.
162 // Also, we don't use MOJO_HANDLE_SIGNAL_WRITABLE here, expecting buffer in
165 async_waiter_
->Wait(pipe_
.get().value(), MOJO_HANDLE_SIGNAL_READABLE
);
166 // If the result is |MOJO_RESULT_ALREADY_EXISTS|, there could be messages
167 // that have been arrived after the last |ReadAvailableMessages()|.
168 // We have to consume then and retry in that case.
169 if (result
!= MOJO_RESULT_ALREADY_EXISTS
) {
170 if (result
!= MOJO_RESULT_OK
) {
171 LOG(ERROR
) << "Failed to wait on the pipe. Result is " << result
;
181 void MessagePipeReader::PipeIsReady(MojoResult wait_result
) {
182 if (wait_result
!= MOJO_RESULT_OK
) {
183 if (wait_result
!= MOJO_RESULT_ABORTED
) {
184 // FAILED_PRECONDITION happens every time the peer is dead so
185 // it isn't worth polluting the log message.
186 LOG_IF(WARNING
, wait_result
!= MOJO_RESULT_FAILED_PRECONDITION
)
187 << "Pipe got error from the waiter. Closing: " << wait_result
;
188 OnPipeError(wait_result
);
195 ReadMessagesThenWait();
198 void MessagePipeReader::DelayedDeleter::operator()(
199 MessagePipeReader
* ptr
) const {
201 base::MessageLoopProxy::current()->PostTask(
202 FROM_HERE
, base::Bind(&DeleteNow
, ptr
));
205 } // namespace internal