1 // Copyright (c) 2012 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
5 #include "ipc/ipc_channel_nacl.h"
13 #include "base/bind.h"
14 #include "base/logging.h"
15 #include "base/message_loop/message_loop_proxy.h"
16 #include "base/synchronization/lock.h"
17 #include "base/task_runner_util.h"
18 #include "base/threading/simple_thread.h"
19 #include "ipc/file_descriptor_set_posix.h"
20 #include "ipc/ipc_listener.h"
21 #include "ipc/ipc_logging.h"
22 #include "native_client/src/public/imc_syscalls.h"
23 #include "native_client/src/public/imc_types.h"
// Result of a single read from the pipe, handed from the reader thread to the
// channel. |data| holds the raw bytes. The functions below also use a |fds|
// member (descriptors received alongside the bytes).
// NOTE(review): the declaration of |fds| and the closing brace of this struct
// are not visible in this copy of the file.
27 struct MessageContents
{
28 std::vector
<char> data
;
// Performs one imc_recvmsg() on |pipe| and stores the result in |contents|.
// Both buffers are first grown to their maximum sizes so the call has room to
// write into, then shrunk to what was actually received. If imc_recvmsg()
// returns <= 0, both buffers are cleared instead.
// NOTE(review): the return statements are not visible in this copy of the
// file. Run() stores the result in a variable named |success|, so presumably
// true means data was read -- confirm against the full file.
34 bool ReadDataOnReaderThread(int pipe
, MessageContents
* contents
) {
// Make room for the largest read we are prepared to accept.
39 contents
->data
.resize(Channel::kReadBufferSize
);
40 contents
->fds
.resize(FileDescriptorSet::kMaxDescriptorsPerMessage
);
// A single I/O vector spanning the whole data buffer.
42 NaClAbiNaClImcMsgIoVec iov
= { &contents
->data
[0], contents
->data
.size() };
// Message header: the one iovec above, plus the descriptor buffer and its
// capacity.
43 NaClAbiNaClImcMsgHdr msg
= {
44 &iov
, 1, &contents
->fds
[0], contents
->fds
.size()
47 int bytes_read
= imc_recvmsg(pipe
, &msg
, 0);
49 if (bytes_read
<= 0) {
50 // NaClIPCAdapter::BlockingReceive returns -1 when the pipe closes (either
51 // due to error or for regular shutdown).
// Leave |contents| empty so no stale or uninitialized bytes/fds remain in it.
52 contents
->data
.clear();
53 contents
->fds
.clear();
57 // Resize the buffers down to the number of bytes and fds we actually read.
58 contents
->data
.resize(bytes_read
);
// |msg.desc_length| is read back here as the number of descriptors received.
59 contents
->fds
.resize(msg
.desc_length
);
// Delegate for a base::DelegateSimpleThread. Run() reads from the pipe and
// reports back by posting one of the two callbacks stored here to
// |main_message_loop_|.
// NOTE(review): the access specifiers, the constructor's name and first
// parameter, and the |pipe_| member that Run() reads are not visible in this
// copy of the file.
65 class Channel::ChannelImpl::ReaderThreadRunner
66 : public base::DelegateSimpleThread::Delegate
{
68 // |pipe|: A file descriptor from which we will read using imc_recvmsg.
69 // |data_read_callback|: A callback we invoke (on the main thread) when we
//   have read data from the pipe; Run() passes it ownership of the
//   MessageContents that was read.
71 // |failure_callback|: A callback we invoke when we have a failure reading
//   from the pipe. Run() posts it to |main_message_loop| as well.
73 // |main_message_loop|: A proxy for the main thread, where we will invoke the
//   callbacks above.
77 base::Callback
<void (scoped_ptr
<MessageContents
>)> data_read_callback
,
78 base::Callback
<void ()> failure_callback
,
79 scoped_refptr
<base::MessageLoopProxy
> main_message_loop
);
81 // DelegateSimpleThread implementation. Reads data from the pipe in a loop
82 // until either we are told to quit or a read fails.
83 virtual void Run() OVERRIDE
;
// Copies of the constructor arguments, used by Run().
87 base::Callback
<void (scoped_ptr
<MessageContents
>)> data_read_callback_
;
88 base::Callback
<void ()> failure_callback_
;
89 scoped_refptr
<base::MessageLoopProxy
> main_message_loop_
;
91 DISALLOW_COPY_AND_ASSIGN(ReaderThreadRunner
);
// Stores the two callbacks and the message-loop proxy for later use by Run().
// No other work is visible in the constructor.
// NOTE(review): the first parameter and the initializer for |pipe_| (which
// Run() reads) are not visible in this copy of the file.
94 Channel::ChannelImpl::ReaderThreadRunner::ReaderThreadRunner(
96 base::Callback
<void (scoped_ptr
<MessageContents
>)> data_read_callback
,
97 base::Callback
<void ()> failure_callback
,
98 scoped_refptr
<base::MessageLoopProxy
> main_message_loop
)
100 data_read_callback_(data_read_callback
),
101 failure_callback_(failure_callback
),
102 main_message_loop_(main_message_loop
) {
// Thread entry point. Allocates a fresh MessageContents, fills it with
// ReadDataOnReaderThread(pipe_, ...), and posts the outcome to
// |main_message_loop_|: either |data_read_callback_| with the contents, or
// |failure_callback_|.
// NOTE(review): the enclosing loop and the branch on |success| are not
// visible in this copy of the file; that structure is inferred from the
// "in a loop ... until a read fails" comment on the declaration and from the
// "read failed" comment below.
105 void Channel::ChannelImpl::ReaderThreadRunner::Run() {
// A new buffer per read, because ownership is handed to the callback below.
107 scoped_ptr
<MessageContents
> msg_contents(new MessageContents
);
108 bool success
= ReadDataOnReaderThread(pipe_
, msg_contents
.get());
// base::Passed transfers ownership of |msg_contents| into the bound callback.
110 main_message_loop_
->PostTask(FROM_HERE
,
111 base::Bind(data_read_callback_
, base::Passed(&msg_contents
)));
113 main_message_loop_
->PostTask(FROM_HERE
, failure_callback_
);
114 // Because the read failed, we know we're going to quit. Don't bother
115 // trying to read again.
// Constructs the channel in the "waiting for Connect()" state
// (|waiting_connect_| is true), records the handle's name as |pipe_name_|,
// and tries to adopt the pipe via CreatePipe(). A CreatePipe() failure is
// logged with the handle name and server/client mode; no other failure
// handling is visible here.
// NOTE(review): the remaining parameters and some member initializers (for
// example those for |mode_| and |pipe_|) are not visible in this copy of the
// file.
121 Channel::ChannelImpl::ChannelImpl(const IPC::ChannelHandle
& channel_handle
,
124 : ChannelReader(listener
),
126 waiting_connect_(true),
128 pipe_name_(channel_handle
.name
),
129 weak_ptr_factory_(this) {
130 if (!CreatePipe(channel_handle
)) {
131 // The pipe may have been closed already.
132 const char *modestr
= (mode_
& MODE_SERVER_FLAG
) ? "server" : "client";
133 LOG(WARNING
) << "Unable to create pipe named \"" << channel_handle
.name
134 << "\" in " << modestr
<< " mode";
// Destructor.
// NOTE(review): the body is not visible in this copy of the file.
138 Channel::ChannelImpl::~ChannelImpl() {
// Returns the process id reported for the peer. CallOnChannelConnected()
// passes this value to the listener.
// NOTE(review): the return statement is not visible in this copy of the file.
// Per the comment below, the value is not a real pid.
142 base::ProcessId
Channel::ChannelImpl::peer_pid() const {
143 // This shouldn't actually get used in the untrusted side of the proxy, and we
144 // don't have the real pid anyway.
// Starts receiving on the channel. In the order visible here, it:
//   1. creates a ReaderThreadRunner whose callbacks are DidRecvMsg() and
//      ReadDidFail(), bound through weak pointers to this object and targeted
//      at the current thread's MessageLoopProxy;
//   2. starts a DelegateSimpleThread that runs that delegate;
//   3. clears |waiting_connect_| and flushes any messages queued by Send();
//   4. posts CallOnChannelConnected() to the current message loop.
// NOTE(review): the condition guarding the "Channel creation failed" warning,
// the first constructor argument of ReaderThreadRunner and the return
// statements are not visible in this copy of the file.
148 bool Channel::ChannelImpl::Connect() {
150 DLOG(WARNING
) << "Channel creation failed: " << pipe_name_
;
154 // Note that Connect is called on the "Channel" thread (i.e., the same thread
155 // where Channel::Send will be called, and the same thread that should receive
156 // messages). The constructor might be invoked on another thread (see
157 // ChannelProxy for an example of that). Therefore, we must wait until Connect
158 // is called to decide which MessageLoopProxy to pass to ReaderThreadRunner.
159 reader_thread_runner_
.reset(
160 new ReaderThreadRunner(
162 base::Bind(&Channel::ChannelImpl::DidRecvMsg
,
163 weak_ptr_factory_
.GetWeakPtr()),
164 base::Bind(&Channel::ChannelImpl::ReadDidFail
,
165 weak_ptr_factory_
.GetWeakPtr()),
166 base::MessageLoopProxy::current()));
// The thread is given a raw pointer to the runner; |reader_thread_runner_|
// keeps ownership of it.
167 reader_thread_
.reset(
168 new base::DelegateSimpleThread(reader_thread_runner_
.get(),
169 "ipc_channel_nacl reader thread"));
170 reader_thread_
->Start();
// This must be cleared before flushing: ProcessOutgoingMessages() DCHECKs
// that the channel is no longer waiting to connect.
171 waiting_connect_
= false;
172 // If there were any messages queued before connection, send them.
173 ProcessOutgoingMessages();
// The listener is notified through a posted task, not from inside this call.
174 base::MessageLoopProxy::current()->PostTask(FROM_HERE
,
175 base::Bind(&Channel::ChannelImpl::CallOnChannelConnected
,
176 weak_ptr_factory_
.GetWeakPtr()));
// Shuts the channel down: waits for the reader thread to finish, releases the
// runner and thread objects, and discards any messages still queued for
// sending.
// NOTE(review): some lines of this function are not visible in this copy of
// the file. DidRecvMsg() mentions that Close sets the pipe to -1, but that
// assignment is not shown here.
181 void Channel::ChannelImpl::Close() {
182 // For now, we assume that at shutdown, the reader thread will be woken with
183 // a failure (see NaClIPCAdapter::BlockingRead and CloseChannel). Or... we
184 // might simply be killed with no chance to clean up anyway :-).
185 // If untrusted code tries to close the channel prior to shutdown, it's likely
// (The sentence above is truncated in this copy. NOTE(review): Join() below
// waits for the reader thread to exit, so the concern is presumably that this
// call blocks if the thread is never woken.)
187 // TODO(dmichael): Can we do anything smarter here to make sure the reader
188 // thread wakes up and quits?
189 reader_thread_
->Join();
// Release the runner and the thread only after Join() has returned.
192 reader_thread_runner_
.reset();
193 reader_thread_
.reset();
// Drop anything that was queued but never written.
195 output_queue_
.clear();
// Queues |message| for sending and takes ownership of it: the pointer is
// wrapped in a scoped_ptr, then released into a linked_ptr stored in
// |output_queue_|. If the channel is already connected, the queue is flushed
// immediately and the result of ProcessOutgoingMessages() is returned.
// NOTE(review): the return value used while still waiting for Connect() is
// not visible in this copy of the file.
198 bool Channel::ChannelImpl::Send(Message
* message
) {
199 DVLOG(2) << "sending message @" << message
<< " on channel @" << this
200 << " with type " << message
->type();
201 scoped_ptr
<Message
> message_ptr(message
);
203 #ifdef IPC_MESSAGE_LOG_ENABLED
204 Logging::GetInstance()->OnSendMessage(message_ptr
.get(), "");
205 #endif // IPC_MESSAGE_LOG_ENABLED
207 message
->TraceMessageBegin();
// Ownership moves from |message_ptr| to the queue entry.
208 output_queue_
.push_back(linked_ptr
<Message
>(message_ptr
.release()));
// Before Connect() the message just stays queued; Connect() flushes it.
209 if (!waiting_connect_
)
210 return ProcessOutgoingMessages();
// Callback bound in Connect() and posted by the reader thread with one read's
// worth of data. It moves the bytes into |read_queue_|, appends the received
// descriptors to |input_fds_|, and then calls ProcessIncomingMessages().
// NOTE(review): the check described by the first comment below (ignoring
// data that arrives after Close) is not visible in this copy of the file.
215 void Channel::ChannelImpl::DidRecvMsg(scoped_ptr
<MessageContents
> contents
) {
216 // Close sets the pipe to -1. It's possible we'll get a buffer sent to us from
217 // the reader thread after Close is called. If so, we ignore it.
// swap() takes over the byte buffer without copying it; |contents->data| is
// left holding the new vector's (empty) storage.
221 linked_ptr
<std::vector
<char> > data(new std::vector
<char>);
222 data
->swap(contents
->data
);
223 read_queue_
.push_back(data
);
// Descriptors are accumulated separately from the bytes and matched up with
// messages later, in WillDispatchInputMessage().
225 input_fds_
.insert(input_fds_
.end(),
226 contents
->fds
.begin(), contents
->fds
.end());
227 contents
->fds
.clear();
229 // In POSIX, we would be told when there are bytes to read by implementing
230 // OnFileCanReadWithoutBlocking in MessageLoopForIO::Watcher. In NaCl, we
231 // instead know at this point because the reader thread posted some data to
233 ProcessIncomingMessages();
// Failure callback bound in Connect(). The reader thread posts it when a read
// from the pipe fails.
// NOTE(review): the body is not visible in this copy of the file.
236 void Channel::ChannelImpl::ReadDidFail() {
// Adopts the descriptor carried in |channel_handle| as |pipe_|. Only handles
// that wrap an existing descriptor are supported (case 1 in the list below).
// A descriptor value of -1 is checked for separately.
// NOTE(review): the body of the -1 branch and the return statements are not
// visible in this copy of the file. The constructor treats a false result as
// "unable to create pipe".
240 bool Channel::ChannelImpl::CreatePipe(
241 const IPC::ChannelHandle
& channel_handle
) {
244 // There's one possible case in NaCl:
245 // 1) It's a channel wrapping a pipe that is given to us.
246 // We don't support these:
247 // 2) It's for a named channel.
248 // 3) It's for a client that we implement ourself.
249 // 4) It's the initial IPC channel.
251 if (channel_handle
.socket
.fd
== -1) {
255 pipe_
= channel_handle
.socket
.fd
;
// Drains |output_queue_|, writing each message to |pipe_| with imc_sendmsg()
// together with the descriptors from its FileDescriptorSet. It must only be
// called once the channel is connected (see the DCHECK below). A negative
// result from imc_sendmsg() is logged with PLOG; CommitAll() is called on the
// message's descriptor set as part of the per-message handling.
// NOTE(review): the return statements, the tail of the error message, and the
// exact scope of the error branch (including whether CommitAll() is reached
// after an error) are not visible in this copy of the file.
259 bool Channel::ChannelImpl::ProcessOutgoingMessages() {
260 DCHECK(!waiting_connect_
); // Why are we trying to send messages if there's
262 if (output_queue_
.empty())
268 // Write out all the messages. The trusted implementation is guaranteed to not
269 // block. See NaClIPCAdapter::Send for the implementation of imc_sendmsg.
270 while (!output_queue_
.empty()) {
// Take the message off the queue before sending; |msg| keeps it alive for the
// rest of this iteration.
271 linked_ptr
<Message
> msg
= output_queue_
.front();
272 output_queue_
.pop_front();
// Fixed-size scratch array for the descriptors. The DCHECK below is the only
// visible guard on the message's descriptor count before GetDescriptors()
// writes into it.
274 int fds
[FileDescriptorSet::kMaxDescriptorsPerMessage
];
275 const size_t num_fds
= msg
->file_descriptor_set()->size();
276 DCHECK(num_fds
<= FileDescriptorSet::kMaxDescriptorsPerMessage
);
277 msg
->file_descriptor_set()->GetDescriptors(fds
);
// The const_cast is needed because the iovec is initialized with a non-const
// void*, while msg->data() is const.
279 NaClAbiNaClImcMsgIoVec iov
= {
280 const_cast<void*>(msg
->data()), msg
->size()
282 NaClAbiNaClImcMsgHdr msgh
= { &iov
, 1, fds
, num_fds
};
283 ssize_t bytes_written
= imc_sendmsg(pipe_
, &msgh
, 0);
285 DCHECK(bytes_written
); // The trusted side shouldn't return 0.
286 if (bytes_written
< 0) {
287 // The trusted side should only ever give us an error of EPIPE. We
288 // should never be interrupted, nor should we get EAGAIN.
289 DCHECK(errno
== EPIPE
);
291 PLOG(ERROR
) << "pipe_ error on "
293 << " Currently writing message of size: "
297 msg
->file_descriptor_set()->CommitAll();
301 DVLOG(2) << "sent message @" << msg
.get() << " with type " << msg
->type()
302 << " on fd " << pipe_
;
// Notifies the listener that the channel is connected, passing peer_pid().
// Connect() posts this as a task instead of calling it directly.
307 void Channel::ChannelImpl::CallOnChannelConnected() {
308 listener()->OnChannelConnected(peer_pid());
// Copies bytes previously queued by DidRecvMsg() out of |read_queue_| into
// |buffer|, stopping when the queue is empty or |*bytes_read| reaches
// |buffer_len|. A front vector that fits entirely is copied and popped.
// Otherwise only the part that fits is copied and that part is erased from
// the front vector, which stays queued. Returns READ_SUCCEEDED after the loop.
// NOTE(review): the parameter list (the body uses |buffer|, |buffer_len| and
// |bytes_read|), the initialization of |*bytes_read|, and the value returned
// when the queue is empty on entry are not visible in this copy of the file.
311 Channel::ChannelImpl::ReadState
Channel::ChannelImpl::ReadData(
318 if (read_queue_
.empty())
320 while (!read_queue_
.empty() && *bytes_read
< buffer_len
) {
321 linked_ptr
<std::vector
<char> > vec(read_queue_
.front());
// Space left in the caller's buffer.
322 size_t bytes_to_read
= buffer_len
- *bytes_read
;
323 if (vec
->size() <= bytes_to_read
) {
324 // We can read and discard the entire vector.
325 std::copy(vec
->begin(), vec
->end(), buffer
+ *bytes_read
);
326 *bytes_read
+= vec
->size();
327 read_queue_
.pop_front();
329 // Read all the bytes we can and discard them from the front of the
330 // vector. (This can be slowish, since erase has to move the back of the
331 // vector to the front, but it's hopefully a temporary hack and it keeps
333 std::copy(vec
->begin(), vec
->begin() + bytes_to_read
,
334 buffer
+ *bytes_read
);
335 vec
->erase(vec
->begin(), vec
->begin() + bytes_to_read
);
336 *bytes_read
+= bytes_to_read
;
339 return READ_SUCCEEDED
;
// Attaches the received descriptors to |msg| before it is dispatched. It
// first CHECKs that the descriptor count in the message header equals the
// number of descriptors currently held in |input_fds_|, then passes them to
// the message's FileDescriptorSet.
// NOTE(review): the condition guarding the early "Nothing to do" return, the
// remaining arguments of SetDescriptors() and everything after that call are
// not visible in this copy of the file.
342 bool Channel::ChannelImpl::WillDispatchInputMessage(Message
* msg
) {
343 uint16 header_fds
= msg
->header()->num_fds
;
// A mismatch is fatal (CHECK, not DCHECK).
344 CHECK(header_fds
== input_fds_
.size());
346 return true; // Nothing to do.
348 // The shenaniganery below with &foo.front() requires input_fds_ to have
349 // contiguous underlying storage (such as a simple array or a std::vector).
350 // This is why the header warns not to make input_fds_ a deque<>.
351 msg
->file_descriptor_set()->SetDescriptors(&input_fds_
.front(),
// Returns true if no received descriptors remain in |input_fds_|. Per the
// comment below, descriptors are not expected to be left over once the input
// data buffer has been emptied.
357 bool Channel::ChannelImpl::DidEmptyInputBuffers() {
358 // When the input data buffer is empty, the fds should be too.
359 return input_fds_
.empty();
// Handler for internal (channel-level) messages. Per the comment below, none
// are expected to arrive here.
// NOTE(review): the body is not visible in this copy of the file.
362 void Channel::ChannelImpl::HandleInternalMessage(const Message
& msg
) {
363 // The trusted side IPC::Channel should handle the "hello" handshake; we
364 // should not receive the "Hello" message.
368 //------------------------------------------------------------------------------
369 // Channel's methods simply call through to ChannelImpl.
// Creates the ChannelImpl that does the real work, forwarding all three
// arguments. The impl is allocated with new and deleted in ~Channel().
// NOTE(review): the declarations of the |mode| and |listener| parameters are
// not visible in this copy of the file.
371 Channel::Channel(const IPC::ChannelHandle
& channel_handle
,
374 : channel_impl_(new ChannelImpl(channel_handle
, mode
, listener
)) {
// Destroys the ChannelImpl allocated by the constructor.
377 Channel::~Channel() {
378 delete channel_impl_
;
// Forwards to ChannelImpl::Connect() and returns its result.
381 bool Channel::Connect() {
382 return channel_impl_
->Connect();
// Forwards to ChannelImpl::Close(), which waits for the reader thread to
// finish before returning.
385 void Channel::Close() {
386 channel_impl_
->Close();
// Forwards to ChannelImpl::peer_pid() and returns its result.
389 base::ProcessId
Channel::peer_pid() const {
390 return channel_impl_
->peer_pid();
// Forwards to ChannelImpl::Send() and returns its result. ChannelImpl::Send()
// takes ownership of |message|.
393 bool Channel::Send(Message
* message
) {
394 return channel_impl_
->Send(message
);