Drop frames less than a millisecond apart in media time.
[chromium-blink-merge.git] / ipc / ipc_channel_nacl.cc
blobf5b33cea41adc83a4035eb0d3d64db124a3a972f
1 // Copyright (c) 2012 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
5 #include "ipc/ipc_channel_nacl.h"
7 #include <errno.h>
8 #include <stddef.h>
9 #include <sys/types.h>
11 #include <algorithm>
13 #include "base/bind.h"
14 #include "base/logging.h"
15 #include "base/single_thread_task_runner.h"
16 #include "base/synchronization/lock.h"
17 #include "base/task_runner_util.h"
18 #include "base/thread_task_runner_handle.h"
19 #include "base/threading/simple_thread.h"
20 #include "ipc/ipc_listener.h"
21 #include "ipc/ipc_logging.h"
22 #include "ipc/ipc_message_attachment_set.h"
23 #include "native_client/src/public/imc_syscalls.h"
24 #include "native_client/src/public/imc_types.h"
26 namespace IPC {
28 struct MessageContents {
29 std::vector<char> data;
30 std::vector<int> fds;
33 namespace {
35 bool ReadDataOnReaderThread(int pipe, MessageContents* contents) {
36 DCHECK(pipe >= 0);
37 if (pipe < 0)
38 return false;
40 contents->data.resize(Channel::kReadBufferSize);
41 contents->fds.resize(NACL_ABI_IMC_DESC_MAX);
43 NaClAbiNaClImcMsgIoVec iov = { &contents->data[0], contents->data.size() };
44 NaClAbiNaClImcMsgHdr msg = {
45 &iov, 1, &contents->fds[0], contents->fds.size()
48 int bytes_read = imc_recvmsg(pipe, &msg, 0);
50 if (bytes_read <= 0) {
51 // NaClIPCAdapter::BlockingReceive returns -1 when the pipe closes (either
52 // due to error or for regular shutdown).
53 contents->data.clear();
54 contents->fds.clear();
55 return false;
57 DCHECK(bytes_read);
58 // Resize the buffers down to the number of bytes and fds we actually read.
59 contents->data.resize(bytes_read);
60 contents->fds.resize(msg.desc_length);
61 return true;
64 } // namespace
66 class ChannelNacl::ReaderThreadRunner
67 : public base::DelegateSimpleThread::Delegate {
68 public:
69 // |pipe|: A file descriptor from which we will read using imc_recvmsg.
70 // |data_read_callback|: A callback we invoke (on the main thread) when we
71 // have read data.
72 // |failure_callback|: A callback we invoke when we have a failure reading
73 // from |pipe|.
74 // |main_message_loop|: A proxy for the main thread, where we will invoke the
75 // above callbacks.
76 ReaderThreadRunner(
77 int pipe,
78 base::Callback<void(scoped_ptr<MessageContents>)> data_read_callback,
79 base::Callback<void()> failure_callback,
80 scoped_refptr<base::SingleThreadTaskRunner> main_task_runner);
82 // DelegateSimpleThread implementation. Reads data from the pipe in a loop
83 // until either we are told to quit or a read fails.
84 void Run() override;
86 private:
87 int pipe_;
88 base::Callback<void (scoped_ptr<MessageContents>)> data_read_callback_;
89 base::Callback<void ()> failure_callback_;
90 scoped_refptr<base::SingleThreadTaskRunner> main_task_runner_;
92 DISALLOW_COPY_AND_ASSIGN(ReaderThreadRunner);
95 ChannelNacl::ReaderThreadRunner::ReaderThreadRunner(
96 int pipe,
97 base::Callback<void(scoped_ptr<MessageContents>)> data_read_callback,
98 base::Callback<void()> failure_callback,
99 scoped_refptr<base::SingleThreadTaskRunner> main_task_runner)
100 : pipe_(pipe),
101 data_read_callback_(data_read_callback),
102 failure_callback_(failure_callback),
103 main_task_runner_(main_task_runner) {
106 void ChannelNacl::ReaderThreadRunner::Run() {
107 while (true) {
108 scoped_ptr<MessageContents> msg_contents(new MessageContents);
109 bool success = ReadDataOnReaderThread(pipe_, msg_contents.get());
110 if (success) {
111 main_task_runner_->PostTask(
112 FROM_HERE,
113 base::Bind(data_read_callback_, base::Passed(&msg_contents)));
114 } else {
115 main_task_runner_->PostTask(FROM_HERE, failure_callback_);
116 // Because the read failed, we know we're going to quit. Don't bother
117 // trying to read again.
118 return;
123 ChannelNacl::ChannelNacl(const IPC::ChannelHandle& channel_handle,
124 Mode mode,
125 Listener* listener)
126 : ChannelReader(listener),
127 mode_(mode),
128 waiting_connect_(true),
129 pipe_(-1),
130 pipe_name_(channel_handle.name),
131 weak_ptr_factory_(this) {
132 if (!CreatePipe(channel_handle)) {
133 // The pipe may have been closed already.
134 const char *modestr = (mode_ & MODE_SERVER_FLAG) ? "server" : "client";
135 LOG(WARNING) << "Unable to create pipe named \"" << channel_handle.name
136 << "\" in " << modestr << " mode";
140 ChannelNacl::~ChannelNacl() {
141 Close();
144 base::ProcessId ChannelNacl::GetPeerPID() const {
145 // This shouldn't actually get used in the untrusted side of the proxy, and we
146 // don't have the real pid anyway.
147 return -1;
150 base::ProcessId ChannelNacl::GetSelfPID() const {
151 return -1;
154 bool ChannelNacl::Connect() {
155 if (pipe_ == -1) {
156 DLOG(WARNING) << "Channel creation failed: " << pipe_name_;
157 return false;
160 // Note that Connect is called on the "Channel" thread (i.e., the same thread
161 // where Channel::Send will be called, and the same thread that should receive
162 // messages). The constructor might be invoked on another thread (see
163 // ChannelProxy for an example of that). Therefore, we must wait until Connect
164 // is called to decide which SingleThreadTaskRunner to pass to
165 // ReaderThreadRunner.
166 reader_thread_runner_.reset(new ReaderThreadRunner(
167 pipe_,
168 base::Bind(&ChannelNacl::DidRecvMsg, weak_ptr_factory_.GetWeakPtr()),
169 base::Bind(&ChannelNacl::ReadDidFail, weak_ptr_factory_.GetWeakPtr()),
170 base::ThreadTaskRunnerHandle::Get()));
171 reader_thread_.reset(
172 new base::DelegateSimpleThread(reader_thread_runner_.get(),
173 "ipc_channel_nacl reader thread"));
174 reader_thread_->Start();
175 waiting_connect_ = false;
176 // If there were any messages queued before connection, send them.
177 ProcessOutgoingMessages();
178 base::ThreadTaskRunnerHandle::Get()->PostTask(
179 FROM_HERE, base::Bind(&ChannelNacl::CallOnChannelConnected,
180 weak_ptr_factory_.GetWeakPtr()));
182 return true;
185 void ChannelNacl::Close() {
186 // For now, we assume that at shutdown, the reader thread will be woken with
187 // a failure (see NaClIPCAdapter::BlockingRead and CloseChannel). Or... we
188 // might simply be killed with no chance to clean up anyway :-).
189 // If untrusted code tries to close the channel prior to shutdown, it's likely
190 // to hang.
191 // TODO(dmichael): Can we do anything smarter here to make sure the reader
192 // thread wakes up and quits?
193 reader_thread_->Join();
194 close(pipe_);
195 pipe_ = -1;
196 reader_thread_runner_.reset();
197 reader_thread_.reset();
198 read_queue_.clear();
199 output_queue_.clear();
202 bool ChannelNacl::Send(Message* message) {
203 DCHECK(!message->HasAttachments());
204 DVLOG(2) << "sending message @" << message << " on channel @" << this
205 << " with type " << message->type();
206 scoped_ptr<Message> message_ptr(message);
208 #ifdef IPC_MESSAGE_LOG_ENABLED
209 Logging::GetInstance()->OnSendMessage(message_ptr.get(), "");
210 #endif // IPC_MESSAGE_LOG_ENABLED
212 message->TraceMessageBegin();
213 output_queue_.push_back(linked_ptr<Message>(message_ptr.release()));
214 if (!waiting_connect_)
215 return ProcessOutgoingMessages();
217 return true;
220 void ChannelNacl::DidRecvMsg(scoped_ptr<MessageContents> contents) {
221 // Close sets the pipe to -1. It's possible we'll get a buffer sent to us from
222 // the reader thread after Close is called. If so, we ignore it.
223 if (pipe_ == -1)
224 return;
226 linked_ptr<std::vector<char> > data(new std::vector<char>);
227 data->swap(contents->data);
228 read_queue_.push_back(data);
230 input_fds_.insert(input_fds_.end(),
231 contents->fds.begin(), contents->fds.end());
232 contents->fds.clear();
234 // In POSIX, we would be told when there are bytes to read by implementing
235 // OnFileCanReadWithoutBlocking in MessageLoopForIO::Watcher. In NaCl, we
236 // instead know at this point because the reader thread posted some data to
237 // us.
238 ProcessIncomingMessages();
241 void ChannelNacl::ReadDidFail() {
242 Close();
245 bool ChannelNacl::CreatePipe(
246 const IPC::ChannelHandle& channel_handle) {
247 DCHECK(pipe_ == -1);
249 // There's one possible case in NaCl:
250 // 1) It's a channel wrapping a pipe that is given to us.
251 // We don't support these:
252 // 2) It's for a named channel.
253 // 3) It's for a client that we implement ourself.
254 // 4) It's the initial IPC channel.
256 if (channel_handle.socket.fd == -1) {
257 NOTIMPLEMENTED();
258 return false;
260 pipe_ = channel_handle.socket.fd;
261 return true;
264 bool ChannelNacl::ProcessOutgoingMessages() {
265 DCHECK(!waiting_connect_); // Why are we trying to send messages if there's
266 // no connection?
267 if (output_queue_.empty())
268 return true;
270 if (pipe_ == -1)
271 return false;
273 // Write out all the messages. The trusted implementation is guaranteed to not
274 // block. See NaClIPCAdapter::Send for the implementation of imc_sendmsg.
275 while (!output_queue_.empty()) {
276 linked_ptr<Message> msg = output_queue_.front();
277 output_queue_.pop_front();
279 int fds[MessageAttachmentSet::kMaxDescriptorsPerMessage];
280 const size_t num_fds = msg->attachment_set()->size();
281 DCHECK(num_fds <= MessageAttachmentSet::kMaxDescriptorsPerMessage);
282 msg->attachment_set()->PeekDescriptors(fds);
284 NaClAbiNaClImcMsgIoVec iov = {
285 const_cast<void*>(msg->data()), msg->size()
287 NaClAbiNaClImcMsgHdr msgh = { &iov, 1, fds, num_fds };
288 ssize_t bytes_written = imc_sendmsg(pipe_, &msgh, 0);
290 DCHECK(bytes_written); // The trusted side shouldn't return 0.
291 if (bytes_written < 0) {
292 // The trusted side should only ever give us an error of EPIPE. We
293 // should never be interrupted, nor should we get EAGAIN.
294 DCHECK(errno == EPIPE);
295 Close();
296 PLOG(ERROR) << "pipe_ error on "
297 << pipe_
298 << " Currently writing message of size: "
299 << msg->size();
300 return false;
301 } else {
302 msg->attachment_set()->CommitAll();
305 // Message sent OK!
306 DVLOG(2) << "sent message @" << msg.get() << " with type " << msg->type()
307 << " on fd " << pipe_;
309 return true;
312 void ChannelNacl::CallOnChannelConnected() {
313 listener()->OnChannelConnected(GetPeerPID());
316 ChannelNacl::ReadState ChannelNacl::ReadData(
317 char* buffer,
318 int buffer_len,
319 int* bytes_read) {
320 *bytes_read = 0;
321 if (pipe_ == -1)
322 return READ_FAILED;
323 if (read_queue_.empty())
324 return READ_PENDING;
325 while (!read_queue_.empty() && *bytes_read < buffer_len) {
326 linked_ptr<std::vector<char> > vec(read_queue_.front());
327 size_t bytes_to_read = buffer_len - *bytes_read;
328 if (vec->size() <= bytes_to_read) {
329 // We can read and discard the entire vector.
330 std::copy(vec->begin(), vec->end(), buffer + *bytes_read);
331 *bytes_read += vec->size();
332 read_queue_.pop_front();
333 } else {
334 // Read all the bytes we can and discard them from the front of the
335 // vector. (This can be slowish, since erase has to move the back of the
336 // vector to the front, but it's hopefully a temporary hack and it keeps
337 // the code simple).
338 std::copy(vec->begin(), vec->begin() + bytes_to_read,
339 buffer + *bytes_read);
340 vec->erase(vec->begin(), vec->begin() + bytes_to_read);
341 *bytes_read += bytes_to_read;
344 return READ_SUCCEEDED;
347 bool ChannelNacl::WillDispatchInputMessage(Message* msg) {
348 uint16 header_fds = msg->header()->num_fds;
349 CHECK(header_fds == input_fds_.size());
350 if (header_fds == 0)
351 return true; // Nothing to do.
353 // The shenaniganery below with &foo.front() requires input_fds_ to have
354 // contiguous underlying storage (such as a simple array or a std::vector).
355 // This is why the header warns not to make input_fds_ a deque<>.
356 msg->attachment_set()->AddDescriptorsToOwn(&input_fds_.front(), header_fds);
357 input_fds_.clear();
358 return true;
361 bool ChannelNacl::DidEmptyInputBuffers() {
362 // When the input data buffer is empty, the fds should be too.
363 return input_fds_.empty();
366 void ChannelNacl::HandleInternalMessage(const Message& msg) {
367 // The trusted side IPC::Channel should handle the "hello" handshake; we
368 // should not receive the "Hello" message.
369 NOTREACHED();
372 // Channel's methods
374 // static
375 scoped_ptr<Channel> Channel::Create(
376 const IPC::ChannelHandle &channel_handle, Mode mode, Listener* listener) {
377 return scoped_ptr<Channel>(
378 new ChannelNacl(channel_handle, mode, listener));
381 } // namespace IPC