1 // Copyright 2013 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
5 #include "mojo/edk/system/raw_channel.h"
14 #include "base/bind.h"
15 #include "base/location.h"
16 #include "base/logging.h"
17 #include "base/macros.h"
18 #include "base/memory/scoped_ptr.h"
19 #include "base/memory/weak_ptr.h"
20 #include "base/message_loop/message_loop.h"
21 #include "base/synchronization/lock.h"
22 #include "mojo/edk/embedder/platform_channel_utils_posix.h"
23 #include "mojo/edk/embedder/platform_handle.h"
24 #include "mojo/edk/embedder/platform_handle_vector.h"
25 #include "mojo/edk/system/transport_data.h"
// |RawChannel| implementation for POSIX that transfers data and platform
// handles (file descriptors) over |fd_|. It also implements
// |base::MessageLoopForIO::Watcher| so that it receives
// |OnFileCanReadWithoutBlocking()| / |OnFileCanWriteWithoutBlocking()|
// notifications for |fd_|.
// NOTE(review): in this view of the file the access specifiers, the
// declarations of |pending_read_|, |pending_write_| and |WaitToWrite()|, and
// the closing of the class are not visible, although later code uses them.
32 class RawChannelPosix
: public RawChannel
,
33 public base::MessageLoopForIO::Watcher
{
// The constructor DCHECKs that |fd_| is valid (see the definition below).
35 explicit RawChannelPosix(embedder::ScopedPlatformHandle handle
);
// Closes any platform handles still held in |read_platform_handles_|.
36 ~RawChannelPosix() override
;
38 // |RawChannel| public methods:
39 size_t GetSerializedPlatformHandleSize() const override
;
42 // |RawChannel| protected methods:
43 // Actually override this so that we can send multiple messages with (only)
// (The remainder of the sentence above is not visible in this view; the
// definition splits off handle-only messages when a message carries more than
// |embedder::kPlatformChannelMaxNumHandles| platform handles.)
45 void EnqueueMessageNoLock(scoped_ptr
<MessageInTransit
> message
) override
;
46 // Override this to handle those extra FD-only messages.
47 bool OnReadMessageForRawChannel(
48 const MessageInTransit::View
& message_view
) override
;
// Reads via |ReadImpl()| and resets |read_watcher_| on failure.
49 IOResult
Read(size_t* bytes_read
) override
;
// Marks a read as pending (sets |pending_read_|).
50 IOResult
ScheduleRead() override
;
// Hands out the first |num_platform_handles| entries of
// |read_platform_handles_|; |platform_handle_table| is unused by this
// implementation (its name is commented out in the definition).
51 embedder::ScopedPlatformHandleVectorPtr
GetReadPlatformHandles(
52 size_t num_platform_handles
,
53 const void* platform_handle_table
) override
;
// Attempts a write on |fd_|; asserts that |write_lock()| is held.
54 IOResult
WriteNoLock(size_t* platform_handles_written
,
55 size_t* bytes_written
) override
;
// Arranges to be notified when |fd_| becomes writable; asserts that
// |write_lock()| is held.
56 IOResult
ScheduleWriteNoLock() override
;
// Creates the watchers and starts watching |fd_| for readability.
57 bool OnInit() override
;
// Destroys the watchers, clears the pending flags and invalidates weak
// pointers. Both buffer arguments are unused by this implementation.
58 void OnShutdownNoLock(scoped_ptr
<ReadBuffer
> read_buffer
,
59 scoped_ptr
<WriteBuffer
> write_buffer
) override
;
61 // |base::MessageLoopForIO::Watcher| implementation:
62 void OnFileCanReadWithoutBlocking(int fd
) override
;
63 void OnFileCanWriteWithoutBlocking(int fd
) override
;
65 // Implements most of |Read()| (except for a bit of clean-up):
66 IOResult
ReadImpl(size_t* bytes_read
);
68 // Watches for |fd_| to become writable. Must be called on the I/O thread.
// NOTE(review): the declaration this comment documents is not visible here;
// presumably it is |WaitToWrite()|, which is defined later in this file.
// The platform handle used for all reads and writes of this channel.
71 embedder::ScopedPlatformHandle fd_
;
73 // The following members are only used on the I/O thread:
74 scoped_ptr
<base::MessageLoopForIO::FileDescriptorWatcher
> read_watcher_
;
75 scoped_ptr
<base::MessageLoopForIO::FileDescriptorWatcher
> write_watcher_
;
// Platform handles received by |ReadImpl()| that have not yet been handed out
// by |GetReadPlatformHandles()|.
79 std::deque
<embedder::PlatformHandle
> read_platform_handles_
;
81 // The following members are used on multiple threads and protected by
// (The end of the sentence above is not visible in this view; presumably the
// write lock -- TODO confirm.)
85 // This is used for posting tasks from write threads to the I/O thread. It
86 // must only be accessed under |write_lock_|. The weak pointers it produces
87 // are only used/invalidated on the I/O thread.
88 base::WeakPtrFactory
<RawChannelPosix
> weak_ptr_factory_
;
90 DISALLOW_COPY_AND_ASSIGN(RawChannelPosix
);
// Constructs the channel. Initializes |pending_write_| to false, binds
// |weak_ptr_factory_| to this object and DCHECKs that |fd_| holds a valid
// handle.
// NOTE(review): the first part of the initializer list is not visible in this
// view; presumably it moves |handle| into |fd_| and initializes
// |pending_read_| -- confirm against the full file.
93 RawChannelPosix::RawChannelPosix(embedder::ScopedPlatformHandle handle
)
96 pending_write_(false),
97 weak_ptr_factory_(this) {
98 DCHECK(fd_
.is_valid());
// Destructor. DCHECKs that the channel was shut down first (no pending read or
// write, no outstanding weak pointers, both watchers already destroyed), then
// closes any received platform handles that were never handed out.
101 RawChannelPosix::~RawChannelPosix() {
102 DCHECK(!pending_read_
);
103 DCHECK(!pending_write_
);
105 // No need to take the |write_lock()| here -- if there are still weak pointers
106 // outstanding, then we're hosed anyway (since we wouldn't be able to
107 // invalidate them cleanly, since we might not be on the I/O thread).
108 DCHECK(!weak_ptr_factory_
.HasWeakPtrs());
110 // These must have been shut down/destroyed on the I/O thread.
111 DCHECK(!read_watcher_
);
112 DCHECK(!write_watcher_
);
// Avoid leaking handles that were received but never claimed via
// |GetReadPlatformHandles()|.
114 embedder::CloseAllPlatformHandles(&read_platform_handles_
);
// Returns the size of the serialized data needed per platform handle.
// NOTE(review): the return statement is not visible in this view; given the
// comment below, the value is presumably 0 -- confirm.
117 size_t RawChannelPosix::GetSerializedPlatformHandleSize() const {
118 // We don't actually need any space on POSIX (since we just send FDs).
// Enqueues |message| for writing. If |message| has transport data with more
// platform handles than |embedder::kPlatformChannelMaxNumHandles|, it first
// enqueues extra messages of type |kTypeRawChannel| and subtype
// |kSubtypeRawChannelPosixExtraPlatformHandles|, each carrying
// |kPlatformChannelMaxNumHandles| of the handles, then erases those handles
// from |message|. In all cases |message| itself is finally passed to
// |RawChannel::EnqueueMessageNoLock()|.
122 void RawChannelPosix::EnqueueMessageNoLock(
123 scoped_ptr
<MessageInTransit
> message
) {
124 if (message
->transport_data()) {
125 embedder::PlatformHandleVector
* const platform_handles
=
126 message
->transport_data()->platform_handles();
127 if (platform_handles
&&
128 platform_handles
->size() > embedder::kPlatformChannelMaxNumHandles
) {
129 // We can't attach all the FDs to a single message, so we have to "split"
130 // the message. Send as many control messages as needed first with FDs
131 // attached (and no data).
// NOTE(review): the declaration of |i| is not visible in this view
// (presumably it starts at 0). The strict '>' in the loop condition means the
// loop stops once at most |kPlatformChannelMaxNumHandles| handles remain, so
// the remainder stays attached to the original |message|.
133 for (; platform_handles
->size() - i
>
134 embedder::kPlatformChannelMaxNumHandles
;
135 i
+= embedder::kPlatformChannelMaxNumHandles
) {
// NOTE(review): the final argument(s) of this constructor call are not
// visible in this view.
136 scoped_ptr
<MessageInTransit
> fd_message(new MessageInTransit(
137 MessageInTransit::kTypeRawChannel
,
138 MessageInTransit::kSubtypeRawChannelPosixExtraPlatformHandles
, 0,
// Copy the next |kPlatformChannelMaxNumHandles| handles, starting at index
// |i|, into a new vector for the extra message.
140 embedder::ScopedPlatformHandleVectorPtr
fds(
141 new embedder::PlatformHandleVector(
142 platform_handles
->begin() + i
,
143 platform_handles
->begin() + i
+
144 embedder::kPlatformChannelMaxNumHandles
));
145 fd_message
->SetTransportData(
146 make_scoped_ptr(new TransportData(fds
.Pass())));
147 RawChannel::EnqueueMessageNoLock(fd_message
.Pass());
150 // Remove the handles that we "moved" into the other messages.
151 platform_handles
->erase(platform_handles
->begin(),
152 platform_handles
->begin() + i
);
// Enqueue the original message (with whatever handles remain attached).
156 RawChannel::EnqueueMessageNoLock(message
.Pass());
// Handles a received message of type |kTypeRawChannel| (DCHECKed). Messages
// with subtype |kSubtypeRawChannelPosixExtraPlatformHandles| need no work
// here; all other subtypes are forwarded to
// |RawChannel::OnReadMessageForRawChannel()|.
// NOTE(review): the statement that ends the extra-handles branch is not
// visible in this view (presumably an early |return true| -- confirm).
159 bool RawChannelPosix::OnReadMessageForRawChannel(
160 const MessageInTransit::View
& message_view
) {
161 DCHECK_EQ(message_view
.type(), MessageInTransit::kTypeRawChannel
);
163 if (message_view
.subtype() ==
164 MessageInTransit::kSubtypeRawChannelPosixExtraPlatformHandles
) {
165 // We don't need to do anything. |RawChannel| won't extract the platform
166 // handles, and they'll be accumulated in |Read()|.
170 return RawChannel::OnReadMessageForRawChannel(message_view
);
// Performs a read by delegating to |ReadImpl()|. Must be called on the I/O
// thread while no read is pending (both DCHECKed). If the result is neither
// |IO_SUCCEEDED| nor |IO_PENDING|, |read_watcher_| is destroyed so that no
// further read notifications arrive.
// NOTE(review): the final return is not visible in this view (presumably
// |rv|).
173 RawChannel::IOResult
RawChannelPosix::Read(size_t* bytes_read
) {
174 DCHECK_EQ(base::MessageLoop::current(), message_loop_for_io());
175 DCHECK(!pending_read_
);
177 IOResult rv
= ReadImpl(bytes_read
);
178 if (rv
!= IO_SUCCEEDED
&& rv
!= IO_PENDING
) {
179 // Make sure that |OnFileCanReadWithoutBlocking()| won't be called again.
180 read_watcher_
.reset();
// Records that a read is pending by setting |pending_read_|. Must be called on
// the I/O thread while no read is already pending (both DCHECKed).
// NOTE(review): the return statement is not visible in this view (presumably
// |IO_PENDING| -- confirm).
185 RawChannel::IOResult
RawChannelPosix::ScheduleRead() {
186 DCHECK_EQ(base::MessageLoop::current(), message_loop_for_io());
187 DCHECK(!pending_read_
);
189 pending_read_
= true;
// Hands out the first |num_platform_handles| (DCHECKed to be > 0) handles
// accumulated in |read_platform_handles_|. The second parameter is unused.
// If fewer handles than requested are available, all accumulated handles are
// closed and discarded and a null vector pointer is returned. Otherwise the
// handles are copied into a newly allocated vector and removed from the front
// of |read_platform_handles_|.
// NOTE(review): the final return is not visible in this view (presumably
// |rv|).
194 embedder::ScopedPlatformHandleVectorPtr
RawChannelPosix::GetReadPlatformHandles(
195 size_t num_platform_handles
,
196 const void* /*platform_handle_table*/) {
197 DCHECK_GT(num_platform_handles
, 0u);
// Not enough handles were received: drop everything and return null.
199 if (read_platform_handles_
.size() < num_platform_handles
) {
200 embedder::CloseAllPlatformHandles(&read_platform_handles_
);
201 read_platform_handles_
.clear();
202 return embedder::ScopedPlatformHandleVectorPtr();
205 embedder::ScopedPlatformHandleVectorPtr
rv(
206 new embedder::PlatformHandleVector(num_platform_handles
));
// |assign()| replaces the vector's contents with the first
// |num_platform_handles| entries of the deque.
207 rv
->assign(read_platform_handles_
.begin(),
208 read_platform_handles_
.begin() + num_platform_handles
);
209 read_platform_handles_
.erase(
210 read_platform_handles_
.begin(),
211 read_platform_handles_
.begin() + num_platform_handles
);
// Attempts to write the data currently exposed by the write buffer to |fd_|.
// The write lock must be held (asserted) and no write may be pending
// (DCHECKed).
//  - If the write buffer has platform handles to send, up to |kMaxBufferCount|
//    buffers are gathered into an iovec array and sent together with the
//    handles via |PlatformChannelSendmsgWithHandles()|; each handle then has
//    |CloseIfNecessary()| called on it.
//  - Otherwise a single buffer is written with |PlatformChannelWrite()| and
//    multiple buffers with |PlatformChannelWritev()|.
// When |write_result| is non-negative, the number of handles and bytes written
// are stored through the out-parameters. When |errno| is neither EAGAIN nor
// EWOULDBLOCK the error is logged and |IO_FAILED_UNKNOWN| is returned; the
// remaining case falls through to |ScheduleWriteNoLock()|.
// NOTE(review): several lines are not visible in this view, including the
// |else| branches, the tail of the |PlatformChannelWrite()| call, the
// assignment target of |PlatformChannelWritev()|, the success return and the
// condition guarding |IO_FAILED_SHUTDOWN| -- confirm against the full file.
215 RawChannel::IOResult
RawChannelPosix::WriteNoLock(
216 size_t* platform_handles_written
,
217 size_t* bytes_written
) {
218 write_lock().AssertAcquired();
220 DCHECK(!pending_write_
);
222 size_t num_platform_handles
= 0;
223 ssize_t write_result
;
224 if (write_buffer_no_lock()->HavePlatformHandlesToSend()) {
225 embedder::PlatformHandle
* platform_handles
;
226 void* serialization_data
; // Actually unused.
227 write_buffer_no_lock()->GetPlatformHandlesToSend(
228 &num_platform_handles
, &platform_handles
, &serialization_data
);
229 DCHECK_GT(num_platform_handles
, 0u);
230 DCHECK_LE(num_platform_handles
, embedder::kPlatformChannelMaxNumHandles
);
231 DCHECK(platform_handles
);
233 // TODO(vtl): Reduce code duplication. (This is duplicated from below.)
234 std::vector
<WriteBuffer::Buffer
> buffers
;
235 write_buffer_no_lock()->GetBuffers(&buffers
);
236 DCHECK(!buffers
.empty());
// At most |kMaxBufferCount| buffers are sent per call; any further buffers
// are not included in this send.
237 const size_t kMaxBufferCount
= 10;
238 iovec iov
[kMaxBufferCount
];
239 size_t buffer_count
= std::min(buffers
.size(), kMaxBufferCount
);
240 for (size_t i
= 0; i
< buffer_count
; ++i
) {
// |iovec::iov_base| is a non-const pointer, hence the |const_cast|.
241 iov
[i
].iov_base
= const_cast<char*>(buffers
[i
].addr
);
242 iov
[i
].iov_len
= buffers
[i
].size
;
245 write_result
= embedder::PlatformChannelSendmsgWithHandles(
246 fd_
.get(), iov
, buffer_count
, platform_handles
, num_platform_handles
);
// The handles are closed right after the send call.
247 for (size_t i
= 0; i
< num_platform_handles
; i
++)
248 platform_handles
[i
].CloseIfNecessary();
// No platform handles to send: plain data write.
250 std::vector
<WriteBuffer::Buffer
> buffers
;
251 write_buffer_no_lock()->GetBuffers(&buffers
);
252 DCHECK(!buffers
.empty());
254 if (buffers
.size() == 1) {
255 write_result
= embedder::PlatformChannelWrite(fd_
.get(), buffers
[0].addr
,
258 const size_t kMaxBufferCount
= 10;
259 iovec iov
[kMaxBufferCount
];
260 size_t buffer_count
= std::min(buffers
.size(), kMaxBufferCount
);
261 for (size_t i
= 0; i
< buffer_count
; ++i
) {
262 iov
[i
].iov_base
= const_cast<char*>(buffers
[i
].addr
);
263 iov
[i
].iov_len
= buffers
[i
].size
;
267 embedder::PlatformChannelWritev(fd_
.get(), iov
, buffer_count
);
// A non-negative result is reported to the caller through the out-parameters.
271 if (write_result
>= 0) {
272 *platform_handles_written
= num_platform_handles
;
273 *bytes_written
= static_cast<size_t>(write_result
);
278 return IO_FAILED_SHUTDOWN
;
// Any error other than "would block" is treated as an unknown failure.
280 if (errno
!= EAGAIN
&& errno
!= EWOULDBLOCK
) {
281 PLOG(WARNING
) << "sendmsg/write/writev";
282 return IO_FAILED_UNKNOWN
;
// Remaining case: wait for |fd_| to become writable.
285 return ScheduleWriteNoLock();
// Arranges for this object to be notified when |fd_| becomes writable. The
// write lock must be held (asserted) and no write may be pending (DCHECKed).
// When called off the I/O thread, a |WaitToWrite()| task bound to a weak
// pointer is posted to the I/O thread and |pending_write_| is set. On the I/O
// thread, |WatchFileDescriptor()| is called directly for |WATCH_WRITE|;
// |pending_write_| is set if that succeeds, otherwise |IO_FAILED_UNKNOWN| is
// returned.
// NOTE(review): the return statements of the two successful paths are not
// visible in this view (presumably |IO_PENDING| -- confirm).
288 RawChannel::IOResult
RawChannelPosix::ScheduleWriteNoLock() {
289 write_lock().AssertAcquired();
291 DCHECK(!pending_write_
);
293 // Set up to wait for the FD to become writable.
294 // If we're not on the I/O thread, we have to post a task to do this.
295 if (base::MessageLoop::current() != message_loop_for_io()) {
296 message_loop_for_io()->PostTask(FROM_HERE
,
297 base::Bind(&RawChannelPosix::WaitToWrite
,
298 weak_ptr_factory_
.GetWeakPtr()));
299 pending_write_
= true;
// Already on the I/O thread: start watching immediately.
303 if (message_loop_for_io()->WatchFileDescriptor(
304 fd_
.get().fd
, false, base::MessageLoopForIO::WATCH_WRITE
,
305 write_watcher_
.get(), this)) {
306 pending_write_
= true;
310 return IO_FAILED_UNKNOWN
;
// Initializes the channel on the I/O thread (DCHECKed): creates
// |read_watcher_| and |write_watcher_| (both must be null beforehand) and
// starts watching |fd_| for |WATCH_READ|. If starting the watch fails, both
// watchers are destroyed again.
// NOTE(review): the return statements are not visible in this view
// (presumably false on failure and true on success -- confirm).
313 bool RawChannelPosix::OnInit() {
314 DCHECK_EQ(base::MessageLoop::current(), message_loop_for_io());
316 DCHECK(!read_watcher_
);
317 read_watcher_
.reset(new base::MessageLoopForIO::FileDescriptorWatcher());
318 DCHECK(!write_watcher_
);
319 write_watcher_
.reset(new base::MessageLoopForIO::FileDescriptorWatcher());
// Unlike the write watches elsewhere in this file (which pass |false|), the
// read watch passes |true| as its second argument -- presumably the
// "persistent" flag of |WatchFileDescriptor()|; confirm against its
// declaration.
321 if (!message_loop_for_io()->WatchFileDescriptor(
322 fd_
.get().fd
, true, base::MessageLoopForIO::WATCH_READ
,
323 read_watcher_
.get(), this)) {
324 // TODO(vtl): I'm not sure |WatchFileDescriptor()| actually fails cleanly
325 // (in the sense of returning the message loop's state to what it was before
327 read_watcher_
.reset();
328 write_watcher_
.reset();
// Shuts the channel down. Must run on the I/O thread (DCHECKed) with the write
// lock held (asserted). Destroys both watchers, clears |pending_read_| and
// |pending_write_|, DCHECKs that |fd_| is still valid, and invalidates all
// weak pointers so that any posted |WaitToWrite()| task becomes a no-op.
// Neither buffer argument is used.
// NOTE(review): the lines between the |fd_| check and the weak-pointer
// invalidation are not visible in this view (presumably |fd_| is released
// there -- confirm).
335 void RawChannelPosix::OnShutdownNoLock(
336 scoped_ptr
<ReadBuffer
> /*read_buffer*/,
337 scoped_ptr
<WriteBuffer
> /*write_buffer*/) {
338 DCHECK_EQ(base::MessageLoop::current(), message_loop_for_io());
339 write_lock().AssertAcquired();
341 read_watcher_
.reset(); // This will stop watching (if necessary).
342 write_watcher_
.reset(); // This will stop watching (if necessary).
344 pending_read_
= false;
345 pending_write_
= false;
347 DCHECK(fd_
.is_valid());
350 weak_ptr_factory_
.InvalidateWeakPtrs();
// |base::MessageLoopForIO::Watcher| callback invoked when |fd| (which must be
// |fd_|) becomes readable. Runs on the I/O thread (DCHECKed). Clears
// |pending_read_|, performs a |Read()| and, unless the result is
// |IO_PENDING|, reports it through |OnReadCompleted()|.
// NOTE(review): the body of the |!pending_read_| branch is not visible in this
// view.
353 void RawChannelPosix::OnFileCanReadWithoutBlocking(int fd
) {
354 DCHECK_EQ(fd
, fd_
.get().fd
);
355 DCHECK_EQ(base::MessageLoop::current(), message_loop_for_io());
357 if (!pending_read_
) {
362 pending_read_
= false;
363 size_t bytes_read
= 0;
364 IOResult io_result
= Read(&bytes_read
);
365 if (io_result
!= IO_PENDING
)
366 OnReadCompleted(io_result
, bytes_read
);
368 // On failure, |read_watcher_| must have been reset; on success,
369 // we assume that |OnReadCompleted()| always schedules another read.
370 // Otherwise, we could end up spinning -- getting
371 // |OnFileCanReadWithoutBlocking()| again and again but not doing any actual
373 // TODO(yzshen): An alternative is to stop watching if RawChannel doesn't
374 // schedule a new read. But that code won't be reached under the current
375 // RawChannel implementation.
376 DCHECK(!read_watcher_
|| pending_read_
);
// |base::MessageLoopForIO::Watcher| callback invoked when |fd| (which must be
// |fd_|) becomes writable. Runs on the I/O thread (DCHECKed). Under the write
// lock it clears |pending_write_| (DCHECKed to be set) and retries the write
// via |WriteNoLock()|; unless the result is |IO_PENDING|, it is reported
// through |OnWriteCompleted()|.
// NOTE(review): the declaration of |io_result| and the extent of the locked
// scope are not visible in this view.
379 void RawChannelPosix::OnFileCanWriteWithoutBlocking(int fd
) {
380 DCHECK_EQ(fd
, fd_
.get().fd
);
381 DCHECK_EQ(base::MessageLoop::current(), message_loop_for_io());
384 size_t platform_handles_written
= 0;
385 size_t bytes_written
= 0;
387 base::AutoLock
locker(write_lock());
389 DCHECK(pending_write_
);
391 pending_write_
= false;
392 io_result
= WriteNoLock(&platform_handles_written
, &bytes_written
);
395 if (io_result
!= IO_PENDING
)
396 OnWriteCompleted(io_result
, platform_handles_written
, bytes_written
);
// Does the actual work of |Read()|: receives data (and any attached platform
// handles) from |fd_| into the buffer supplied by |read_buffer()|.
// Newly received handles are appended to |read_platform_handles_|; if the
// accumulated count exceeds |TransportData::GetMaxPlatformHandles() +
// embedder::kPlatformChannelMaxNumHandles|, all handles are closed and
// |IO_FAILED_UNKNOWN| is returned.
// Result handling:
//  - |read_result > 0|: stores the byte count in |*bytes_read|.
//  - |read_result == 0|: returns |IO_FAILED_SHUTDOWN| (end of file).
//  - |errno| is EAGAIN or EWOULDBLOCK: returns |ScheduleRead()|.
//  - |errno| is ECONNRESET: returns |IO_FAILED_BROKEN|.
//  - anything else: logs a warning and returns |IO_FAILED_UNKNOWN|.
// NOTE(review): the return statement of the |read_result > 0| branch is not
// visible in this view (presumably |IO_SUCCEEDED| -- confirm).
399 RawChannel::IOResult
RawChannelPosix::ReadImpl(size_t* bytes_read
) {
400 char* buffer
= nullptr;
401 size_t bytes_to_read
= 0;
402 read_buffer()->GetBuffer(&buffer
, &bytes_to_read
);
// Remember the previous handle count so newly received handles can be
// detected after the receive call.
404 size_t old_num_platform_handles
= read_platform_handles_
.size();
405 ssize_t read_result
= embedder::PlatformChannelRecvmsg(
406 fd_
.get(), buffer
, bytes_to_read
, &read_platform_handles_
);
407 if (read_platform_handles_
.size() > old_num_platform_handles
) {
408 DCHECK_LE(read_platform_handles_
.size() - old_num_platform_handles
,
409 embedder::kPlatformChannelMaxNumHandles
);
411 // We should never accumulate more than |TransportData::kMaxPlatformHandles
412 // + embedder::kPlatformChannelMaxNumHandles| handles. (The latter part is
413 // possible because we could have accumulated all the handles for a message,
414 // then received the message data plus the first set of handles for the next
415 // message in the subsequent |recvmsg()|.)
416 if (read_platform_handles_
.size() >
417 (TransportData::GetMaxPlatformHandles() +
418 embedder::kPlatformChannelMaxNumHandles
)) {
419 LOG(ERROR
) << "Received too many platform handles";
420 embedder::CloseAllPlatformHandles(&read_platform_handles_
);
421 read_platform_handles_
.clear();
422 return IO_FAILED_UNKNOWN
;
426 if (read_result
> 0) {
427 *bytes_read
= static_cast<size_t>(read_result
);
431 // |read_result == 0| means "end of file".
432 if (read_result
== 0)
433 return IO_FAILED_SHUTDOWN
;
// No data available right now: wait for the next readability notification.
435 if (errno
== EAGAIN
|| errno
== EWOULDBLOCK
)
436 return ScheduleRead();
438 if (errno
== ECONNRESET
)
439 return IO_FAILED_BROKEN
;
441 PLOG(WARNING
) << "recvmsg";
442 return IO_FAILED_UNKNOWN
;
// Starts watching |fd_| for writability. Must run on the I/O thread (DCHECKed)
// and requires |write_watcher_| to exist. This is the task that
// |ScheduleWriteNoLock()| posts when it is called off the I/O thread.
// If |WatchFileDescriptor()| fails, |pending_write_| (DCHECKed to be set) is
// cleared under the write lock and the failure is reported via
// |OnWriteCompleted(IO_FAILED_UNKNOWN, 0, 0)|.
// NOTE(review): the scope braces around the lock are not visible in this view,
// so it cannot be told from here whether the lock is released before
// |OnWriteCompleted()| is called.
445 void RawChannelPosix::WaitToWrite() {
446 DCHECK_EQ(base::MessageLoop::current(), message_loop_for_io());
448 DCHECK(write_watcher_
);
450 if (!message_loop_for_io()->WatchFileDescriptor(
451 fd_
.get().fd
, false, base::MessageLoopForIO::WATCH_WRITE
,
452 write_watcher_
.get(), this)) {
454 base::AutoLock
locker(write_lock());
456 DCHECK(pending_write_
);
457 pending_write_
= false;
459 OnWriteCompleted(IO_FAILED_UNKNOWN
, 0, 0);
465 // -----------------------------------------------------------------------------
467 // Static factory method declared in raw_channel.h.
// Creates the POSIX implementation of |RawChannel|: returns a new
// |RawChannelPosix| that is given |handle| (ownership is passed via |Pass()|).
469 scoped_ptr
<RawChannel
> RawChannel::Create(
470 embedder::ScopedPlatformHandle handle
) {
471 return make_scoped_ptr(new RawChannelPosix(handle
.Pass()));
474 } // namespace system