1 // Copyright 2014 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
5 #include "mojo/system/raw_channel.h"
11 #include "base/bind.h"
12 #include "base/location.h"
13 #include "base/logging.h"
14 #include "base/message_loop/message_loop.h"
15 #include "base/stl_util.h"
16 #include "mojo/system/message_in_transit.h"
17 #include "mojo/system/transport_data.h"
// Read granularity in bytes. ReadBuffer::GetBuffer() DCHECKs that at least this
// much space follows the valid data, and OnReadCompleted() uses it both to
// size buffer growth and to decide whether a read "maxed out".
22 const size_t kReadSize
= 4096;
24 // RawChannel::ReadBuffer ------------------------------------------------------
// Constructor for the read-side buffer state.
// NOTE(review): the initializer list and body are not visible in this copy;
// confirm the initial |buffer_| size satisfies the DCHECK in GetBuffer().
26 RawChannel::ReadBuffer::ReadBuffer()
// Destructor for the read-side buffer state.
// NOTE(review): the body is not visible in this copy.
31 RawChannel::ReadBuffer::~ReadBuffer() {
// Hands out the writable tail of the read buffer.
//   addr: receives a pointer to the first byte after the currently valid data
//         (start of |buffer_| plus |num_valid_bytes_|).
//   size: out-parameter. NOTE(review): the assignment to |*size| is not
//         visible in this copy; presumably |kReadSize| -- confirm.
// The DCHECK asserts that |buffer_| already holds at least |kReadSize| bytes
// beyond the valid data.
34 void RawChannel::ReadBuffer::GetBuffer(char** addr
, size_t* size
) {
35 DCHECK_GE(buffer_
.size(), num_valid_bytes_
+ kReadSize
);
36 *addr
= &buffer_
[0] + num_valid_bytes_
;
40 // RawChannel::WriteBuffer -----------------------------------------------------
// Stores the per-handle serialized size supplied by the caller and starts with
// a platform-handle offset of zero.
// NOTE(review): any further initializers and the body are not visible in this
// copy.
42 RawChannel::WriteBuffer::WriteBuffer(size_t serialized_platform_handle_size
)
43 : serialized_platform_handle_size_(serialized_platform_handle_size
),
44 platform_handles_offset_(0),
// Deletes every message still held in |message_queue_|. The queue stores raw
// pointers that EnqueueMessageNoLock() release()s into it, so they have to be
// freed explicitly.
48 RawChannel::WriteBuffer::~WriteBuffer() {
49 STLDeleteElements(&message_queue_
);
// Inspects the message at the front of |message_queue_| to decide whether it
// still has platform handles to send. The visible checks are, in order:
//   1. the queue is empty;
//   2. the front message's transport data (fetched here; the check on it is
//      not visible);
//   3. the transport data has no platform-handle vector (then
//      |platform_handles_offset_| must be 0);
//   4. |platform_handles_offset_| has reached the end of the vector (then it
//      must equal the vector size exactly).
// NOTE(review): the return statements are not visible in this copy;
// presumably each check above yields false and falling through yields true --
// confirm.
52 bool RawChannel::WriteBuffer::HavePlatformHandlesToSend() const {
53 if (message_queue_
.empty())
56 const TransportData
* transport_data
=
57 message_queue_
.front()->transport_data();
61 const embedder::PlatformHandleVector
* all_platform_handles
=
62 transport_data
->platform_handles();
63 if (!all_platform_handles
) {
64 DCHECK_EQ(platform_handles_offset_
, 0u);
67 if (platform_handles_offset_
>= all_platform_handles
->size()) {
68 DCHECK_EQ(platform_handles_offset_
, all_platform_handles
->size());
// Describes the platform handles of the front message that have not been sent.
// Requires HavePlatformHandlesToSend() (DCHECKed).
//   num_platform_handles: receives the vector size minus
//                         |platform_handles_offset_|.
//   platform_handles:     receives the address of the handle at index
//                         |platform_handles_offset_|.
//   serialization_data:   NOTE(review): the assignment is cut off in this
//                         copy; presumably receives the transport data buffer
//                         address plus the offset computed below -- confirm.
75 void RawChannel::WriteBuffer::GetPlatformHandlesToSend(
76 size_t* num_platform_handles
,
77 embedder::PlatformHandle
** platform_handles
,
78 void** serialization_data
) {
79 DCHECK(HavePlatformHandlesToSend());
81 TransportData
* transport_data
= message_queue_
.front()->transport_data();
82 embedder::PlatformHandleVector
* all_platform_handles
=
83 transport_data
->platform_handles();
84 *num_platform_handles
=
85 all_platform_handles
->size() - platform_handles_offset_
;
86 *platform_handles
= &(*all_platform_handles
)[platform_handles_offset_
];
// Start at the handle table's offset in the transport data buffer, then skip
// one |serialized_platform_handle_size_| slot per handle already accounted for
// by |platform_handles_offset_|.
87 size_t serialization_data_offset
=
88 transport_data
->platform_handle_table_offset();
89 DCHECK_GT(serialization_data_offset
, 0u);
90 serialization_data_offset
+=
91 platform_handles_offset_
* serialized_platform_handle_size_
;
93 static_cast<char*>(transport_data
->buffer()) + serialization_data_offset
;
// Appends to |buffers| the region(s) of the front message that still have to
// be written, chosen from |data_offset_|:
//   - no transport data buffer: a region starting at |data_offset_| in the
//     main buffer;
//   - |data_offset_| at or past the end of the main buffer: a region starting
//     at the matching position in the transport data buffer;
//   - otherwise: the rest of the main buffer, followed by the whole transport
//     data buffer.
// Only the front message is considered.
// NOTE(review): the empty-queue handling, the returns between the cases and
// parts of the |Buffer| initializers are not visible in this copy -- confirm
// against the full source.
96 void RawChannel::WriteBuffer::GetBuffers(std::vector
<Buffer
>* buffers
) const {
99 if (message_queue_
.empty())
102 MessageInTransit
* message
= message_queue_
.front();
103 DCHECK_LT(data_offset_
, message
->total_size());
104 size_t bytes_to_write
= message
->total_size() - data_offset_
;
// A message without transport data is treated as having a zero-sized
// transport data buffer.
106 size_t transport_data_buffer_size
= message
->transport_data() ?
107 message
->transport_data()->buffer_size() : 0;
109 if (!transport_data_buffer_size
) {
110 // Only write from the main buffer.
111 DCHECK_LT(data_offset_
, message
->main_buffer_size());
112 DCHECK_LE(bytes_to_write
, message
->main_buffer_size());
114 static_cast<const char*>(message
->main_buffer()) + data_offset_
,
116 buffers
->push_back(buffer
);
120 if (data_offset_
>= message
->main_buffer_size()) {
121 // Only write from the transport data buffer.
122 DCHECK_LT(data_offset_
- message
->main_buffer_size(),
123 transport_data_buffer_size
);
124 DCHECK_LE(bytes_to_write
, transport_data_buffer_size
);
126 static_cast<const char*>(message
->transport_data()->buffer()) +
127 (data_offset_
- message
->main_buffer_size()),
129 buffers
->push_back(buffer
);
133 // TODO(vtl): We could actually send out buffers from multiple messages, with
134 // the "stopping" condition being reaching a message with platform handles
137 // Write from both buffers.
138 DCHECK_EQ(bytes_to_write
, message
->main_buffer_size() - data_offset_
+
139 transport_data_buffer_size
);
141 static_cast<const char*>(message
->main_buffer()) + data_offset_
,
142 message
->main_buffer_size() - data_offset_
144 buffers
->push_back(buffer1
);
146 static_cast<const char*>(message
->transport_data()->buffer()),
147 transport_data_buffer_size
149 buffers
->push_back(buffer2
);
152 // RawChannel ------------------------------------------------------------------
// Creates a channel that is not yet initialized: no I/O message loop, reading
// and writing not stopped, and a weak-pointer factory bound to this object.
// Init() does the actual setup.
// NOTE(review): at least one initializer is missing from this copy.
154 RawChannel::RawChannel()
155 : message_loop_for_io_(NULL
),
157 read_stopped_(false),
158 write_stopped_(false),
159 weak_ptr_factory_(this) {
// Asserts (debug builds) that both buffers are already gone and that no weak
// pointers remain. Shutdown() passes both buffers on and invalidates the weak
// pointers, so it is expected to have run first.
162 RawChannel::~RawChannel() {
163 DCHECK(!read_buffer_
);
164 DCHECK(!write_buffer_
);
166 // No need to take the |write_lock_| here -- if there are still weak pointers
167 // outstanding, then we're hosed anyway (since we wouldn't be able to
168 // invalidate them cleanly, since we might not be on the I/O thread).
169 DCHECK(!weak_ptr_factory_
.HasWeakPtrs());
// Sets the channel up on the current thread, whose message loop must be of
// type TYPE_IO (CHECKed).
//   delegate: stored in |delegate_|; CallOnFatalError() and OnReadCompleted()
//             call into it.
// Steps visible here: record the delegate and the I/O message loop, create the
// read and write buffers, then schedule the first read.
// NOTE(review): the condition guarding the rollback block and all return
// statements are missing from this copy; presumably the rollback runs when a
// platform-specific init step fails -- confirm.
172 bool RawChannel::Init(Delegate
* delegate
) {
176 delegate_
= delegate
;
178 CHECK_EQ(base::MessageLoop::current()->type(), base::MessageLoop::TYPE_IO
);
179 DCHECK(!message_loop_for_io_
);
180 message_loop_for_io_
=
181 static_cast<base::MessageLoopForIO
*>(base::MessageLoop::current());
183 // No need to take the lock. No one should be using us yet.
184 DCHECK(!read_buffer_
);
185 read_buffer_
.reset(new ReadBuffer
);
186 DCHECK(!write_buffer_
);
187 write_buffer_
.reset(new WriteBuffer(GetSerializedPlatformHandleSize()));
// Rollback: undo the message loop and buffer setup done above.
191 message_loop_for_io_
= NULL
;
192 read_buffer_
.reset();
193 write_buffer_
.reset();
197 if (ScheduleRead() != IO_PENDING
) {
198 // This will notify the delegate about the read failure. Although we're on
199 // the I/O thread, don't call it in the nested context.
200 message_loop_for_io_
->PostTask(
202 base::Bind(&RawChannel::OnReadCompleted
, weak_ptr_factory_
.GetWeakPtr(),
206 // ScheduleRead() failure is treated as a read failure (by notifying the
207 // delegate), not as an init failure.
// Tears the channel down. Must run on the I/O thread (DCHECKed) and takes
// |write_lock_|.
// Logs a warning if messages are still queued, marks reading and writing as
// stopped, invalidates outstanding weak pointers, and hands ownership of both
// buffers to OnShutdownNoLock().
// NOTE(review): the statement that resets the delegate (announced by the
// comment below) is missing from this copy.
211 void RawChannel::Shutdown() {
212 DCHECK_EQ(base::MessageLoop::current(), message_loop_for_io_
);
214 base::AutoLock
locker(write_lock_
);
216 LOG_IF(WARNING
, !write_buffer_
->message_queue_
.empty())
217 << "Shutting down RawChannel with write buffer nonempty";
219 // Reset the delegate so that it won't receive further calls.
221 read_stopped_
= true;
222 write_stopped_
= true;
223 weak_ptr_factory_
.InvalidateWeakPtrs();
225 OnShutdownNoLock(read_buffer_
.Pass(), write_buffer_
.Pass());
228 // Reminder: This must be thread-safe.
// Queues |message| for writing, taking ownership of it, under |write_lock_|.
//   - If other messages are already queued, the message is only appended.
//   - Otherwise it is appended and a write is attempted immediately with
//     WriteNoLock(). A pending result needs nothing more here; any other
//     result is passed to OnWriteCompletedNoLock().
// A fatal write error is reported through a task posted to the I/O message
// loop rather than by a direct call.
// NOTE(review): the |write_stopped_| check, the return statements and the
// condition guarding the PostTask are missing from this copy -- confirm the
// return value for each path.
229 bool RawChannel::WriteMessage(scoped_ptr
<MessageInTransit
> message
) {
232 base::AutoLock
locker(write_lock_
);
236 if (!write_buffer_
->message_queue_
.empty()) {
237 EnqueueMessageNoLock(message
.Pass());
241 EnqueueMessageNoLock(message
.Pass());
242 DCHECK_EQ(write_buffer_
->data_offset_
, 0u);
244 size_t platform_handles_written
= 0;
245 size_t bytes_written
= 0;
246 IOResult io_result
= WriteNoLock(&platform_handles_written
, &bytes_written
);
247 if (io_result
== IO_PENDING
)
250 bool result
= OnWriteCompletedNoLock(io_result
== IO_SUCCEEDED
,
251 platform_handles_written
,
254 // Even if we're on the I/O thread, don't call |OnFatalError()| in the
// nested context.
256 message_loop_for_io_
->PostTask(
258 base::Bind(&RawChannel::CallOnFatalError
,
259 weak_ptr_factory_
.GetWeakPtr(),
260 Delegate::FATAL_ERROR_WRITE
));
266 // Reminder: This must be thread-safe.
// Returns true when no messages are queued for writing. The queue is read
// while holding |write_lock_|.
267 bool RawChannel::IsWriteBufferEmpty() {
268 base::AutoLock
locker(write_lock_
);
269 return write_buffer_
->message_queue_
.empty();
// Completion handler for a read; must run on the I/O thread (DCHECKed).
//   result:     whether the read succeeded (mapped to IO_SUCCEEDED/IO_FAILED).
//   bytes_read: number of bytes added after the previously valid data.
// Each loop iteration:
//   1. on a failed read, sets |read_stopped_| and reports FATAL_ERROR_READ;
//   2. counts the new bytes as valid and dispatches every complete message
//      found in the buffer, validating each one first;
//   3. moves any leftover partial message to the start of the buffer;
//   4. grows the buffer if fewer than the required free bytes remain;
//   5. issues the next read via ScheduleRead() or Read().
// The loop repeats until that next read reports IO_PENDING.
// NOTE(review): the early returns after each fatal error, the |do {| opener
// and the |else| separating control messages from normal dispatch are missing
// from this copy -- confirm against the full source.
272 void RawChannel::OnReadCompleted(bool result
, size_t bytes_read
) {
273 DCHECK_EQ(base::MessageLoop::current(), message_loop_for_io_
);
280 IOResult io_result
= result
? IO_SUCCEEDED
: IO_FAILED
;
282 // Keep reading data in a loop, and dispatch messages if enough data is
283 // received. Exit the loop if any of the following happens:
284 // - one or more messages were dispatched;
285 // - the last read failed, was a partial read or would block;
286 // - |Shutdown()| was called.
288 if (io_result
!= IO_SUCCEEDED
) {
289 read_stopped_
= true;
290 CallOnFatalError(Delegate::FATAL_ERROR_READ
);
294 read_buffer_
->num_valid_bytes_
+= bytes_read
;
296 // Dispatch all the messages that we can.
297 bool did_dispatch_message
= false;
298 // Tracks the offset of the first undispatched message in |read_buffer_|.
299 // Currently, we copy data to ensure that this is zero at the beginning.
300 size_t read_buffer_start
= 0;
301 size_t remaining_bytes
= read_buffer_
->num_valid_bytes_
;
303 // Note that we rely on short-circuit evaluation here:
304 // - |read_buffer_start| may be an invalid index into
305 // |read_buffer_->buffer_| if |remaining_bytes| is zero.
306 // - |message_size| is only valid if |GetNextMessageSize()| returns true.
307 // TODO(vtl): Use |message_size| more intelligently (e.g., to request the
309 // TODO(vtl): Validate that |message_size| is sane.
310 while (remaining_bytes
> 0 &&
311 MessageInTransit::GetNextMessageSize(
312 &read_buffer_
->buffer_
[read_buffer_start
], remaining_bytes
,
314 remaining_bytes
>= message_size
) {
// View the message in place inside the read buffer.
315 MessageInTransit::View
316 message_view(message_size
, &read_buffer_
->buffer_
[read_buffer_start
]);
317 DCHECK_EQ(message_view
.total_size(), message_size
);
// An invalid message stops reading and is reported as a fatal read error.
319 const char* error_message
= NULL
;
320 if (!message_view
.IsValid(GetSerializedPlatformHandleSize(),
322 DCHECK(error_message
);
323 LOG(WARNING
) << "Received invalid message: " << error_message
;
324 read_stopped_
= true;
325 CallOnFatalError(Delegate::FATAL_ERROR_READ
);
// Messages of type |kTypeRawChannel| go to OnReadMessageForRawChannel(); a
// false result from it is a fatal read error.
329 if (message_view
.type() == MessageInTransit::kTypeRawChannel
) {
330 if (!OnReadMessageForRawChannel(message_view
)) {
331 read_stopped_
= true;
332 CallOnFatalError(Delegate::FATAL_ERROR_READ
);
// Collect the platform handles announced by the message's transport data, if
// any, before handing the message to the delegate.
336 embedder::ScopedPlatformHandleVectorPtr platform_handles
;
337 if (message_view
.transport_data_buffer()) {
338 size_t num_platform_handles
;
339 const void* platform_handle_table
;
340 TransportData::GetPlatformHandleTable(
341 message_view
.transport_data_buffer(),
342 &num_platform_handles
,
343 &platform_handle_table
);
345 if (num_platform_handles
> 0) {
347 GetReadPlatformHandles(num_platform_handles
,
348 platform_handle_table
).Pass();
349 if (!platform_handles
) {
350 LOG(WARNING
) << "Invalid number of platform handles received";
351 read_stopped_
= true;
352 CallOnFatalError(Delegate::FATAL_ERROR_READ
);
358 // TODO(vtl): In the case that we aren't expecting any platform handles,
359 // for the POSIX implementation, we should confirm that none are stored.
361 // Dispatch the message.
363 delegate_
->OnReadMessage(message_view
, platform_handles
.Pass());
365 // |Shutdown()| was called in |OnReadMessage()|.
366 // TODO(vtl): Add test for this case.
371 did_dispatch_message
= true;
// Advance past the message that was just handled.
374 read_buffer_start
+= message_size
;
375 remaining_bytes
-= message_size
;
378 if (read_buffer_start
> 0) {
379 // Move data back to start.
380 read_buffer_
->num_valid_bytes_
= remaining_bytes
;
381 if (read_buffer_
->num_valid_bytes_
> 0) {
382 memmove(&read_buffer_
->buffer_
[0],
383 &read_buffer_
->buffer_
[read_buffer_start
], remaining_bytes
);
385 read_buffer_start
= 0;
// Grow the buffer when too little free space remains after the valid bytes.
388 if (read_buffer_
->buffer_
.size() - read_buffer_
->num_valid_bytes_
<
390 // Use power-of-2 buffer sizes.
391 // TODO(vtl): Make sure the buffer doesn't get too large (and enforce the
392 // maximum message size to whatever extent necessary).
393 // TODO(vtl): We may often be able to peek at the header and get the real
394 // required extra space (which may be much bigger than |kReadSize|).
395 size_t new_size
= std::max(read_buffer_
->buffer_
.size(), kReadSize
);
396 while (new_size
< read_buffer_
->num_valid_bytes_
+ kReadSize
)
399 // TODO(vtl): It's suboptimal to zero out the fresh memory.
400 read_buffer_
->buffer_
.resize(new_size
, 0);
403 // (1) If we dispatched any messages, stop reading for now (and let the
404 // message loop do its thing for another round).
405 // TODO(vtl): Is this the behavior we want? (Alternatives: i. Dispatch only
406 // a single message. Risks: slower, more complex if we want to avoid lots of
407 // copying. ii. Keep reading until there's no more data and dispatch all the
408 // messages we can. Risks: starvation of other users of the message loop.)
409 // (2) If we didn't max out |kReadSize|, stop reading for now.
410 bool schedule_for_later
= did_dispatch_message
|| bytes_read
< kReadSize
;
412 io_result
= schedule_for_later
? ScheduleRead() : Read(&bytes_read
);
413 } while (io_result
!= IO_PENDING
);
// Completion handler for a write; must run on the I/O thread (DCHECKed).
//   result:                   whether the write succeeded.
//   platform_handles_written: platform handles consumed by the write.
//   bytes_written:            bytes consumed by the write.
// Under |write_lock_|, forwards the result to OnWriteCompletedNoLock() and
// records a failure in |did_fail|; FATAL_ERROR_WRITE is then reported through
// CallOnFatalError().
// NOTE(review): the scope of the lock, the body of the |write_stopped_| branch
// and the |did_fail| test guarding the error call are missing from this copy;
// presumably the error is reported only on failure and after the lock is
// released -- confirm.
416 void RawChannel::OnWriteCompleted(bool result
,
417 size_t platform_handles_written
,
418 size_t bytes_written
) {
419 DCHECK_EQ(base::MessageLoop::current(), message_loop_for_io_
);
421 bool did_fail
= false;
423 base::AutoLock
locker(write_lock_
);
// Invariant: writing is stopped exactly when the queue is empty.
424 DCHECK_EQ(write_stopped_
, write_buffer_
->message_queue_
.empty());
426 if (write_stopped_
) {
431 did_fail
= !OnWriteCompletedNoLock(result
,
432 platform_handles_written
,
437 CallOnFatalError(Delegate::FATAL_ERROR_WRITE
);
// Appends |message| to the write queue. The caller must hold |write_lock_|
// (asserted). Ownership moves into the queue as a raw pointer via release(),
// so whoever removes the entry is responsible for deleting it.
440 void RawChannel::EnqueueMessageNoLock(scoped_ptr
<MessageInTransit
> message
) {
441 write_lock_
.AssertAcquired();
442 write_buffer_
->message_queue_
.push_back(message
.release());
// Handler for messages of type |kTypeRawChannel| in this base implementation.
// No such control message is expected here, so it logs an error that includes
// the message's subtype.
// NOTE(review): the end of the log statement and the return are missing from
// this copy; presumably returns false, which OnReadCompleted() treats as a
// fatal read error -- confirm.
445 bool RawChannel::OnReadMessageForRawChannel(
446 const MessageInTransit::View
& message_view
) {
447 // No non-implementation specific |RawChannel| control messages.
448 LOG(ERROR
) << "Invalid control message (subtype " << message_view
.subtype()
// Forwards |fatal_error| to the delegate's OnFatalError(). Must run on the
// I/O thread (DCHECKed).
// NOTE(review): a line is missing before the delegate call in this copy;
// Shutdown() is documented as resetting the delegate, so confirm |delegate_|
// is guarded against null here.
453 void RawChannel::CallOnFatalError(Delegate::FatalError fatal_error
) {
454 DCHECK_EQ(base::MessageLoop::current(), message_loop_for_io_
);
455 // TODO(vtl): Add a "write_lock_.AssertNotAcquired()"?
457 delegate_
->OnFatalError(fatal_error
);
// Updates the write state after a write finished. The caller must hold
// |write_lock_| (asserted); writing must not be stopped and the queue must be
// non-empty (DCHECKed).
//   result:                   whether the write succeeded.
//   platform_handles_written: added to |platform_handles_offset_|.
//   bytes_written:            added to |data_offset_|.
// Progress path: once |data_offset_| reaches the front message's total size,
// the message is popped and both offsets are reset; if messages remain, the
// next write is scheduled with ScheduleWriteNoLock().
// Failure path: sets |write_stopped_|, deletes all queued messages and resets
// both offsets.
// NOTE(review): the |if (result)| guard, the return statements and the delete
// of the popped message are missing from this copy. The queue holds raw
// owning pointers, so the popped message has to be deleted explicitly --
// confirm against the full source.
460 bool RawChannel::OnWriteCompletedNoLock(bool result
,
461 size_t platform_handles_written
,
462 size_t bytes_written
) {
463 write_lock_
.AssertAcquired();
465 DCHECK(!write_stopped_
);
466 DCHECK(!write_buffer_
->message_queue_
.empty());
469 write_buffer_
->platform_handles_offset_
+= platform_handles_written
;
470 write_buffer_
->data_offset_
+= bytes_written
;
472 MessageInTransit
* message
= write_buffer_
->message_queue_
.front();
473 if (write_buffer_
->data_offset_
>= message
->total_size()) {
475 DCHECK_EQ(write_buffer_
->data_offset_
, message
->total_size());
476 write_buffer_
->message_queue_
.pop_front();
478 write_buffer_
->platform_handles_offset_
= 0;
479 write_buffer_
->data_offset_
= 0;
481 if (write_buffer_
->message_queue_
.empty())
485 // Schedule the next write.
486 IOResult io_result
= ScheduleWriteNoLock();
487 if (io_result
== IO_PENDING
)
// Scheduling a write is expected to yield only IO_PENDING or IO_FAILED.
489 DCHECK_EQ(io_result
, IO_FAILED
);
// Failure: stop writing and drop everything that is still queued.
492 write_stopped_
= true;
493 STLDeleteElements(&write_buffer_
->message_queue_
);
494 write_buffer_
->platform_handles_offset_
= 0;
495 write_buffer_
->data_offset_
= 0;
499 } // namespace system