1 // Copyright 2013 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
5 #include "mojo/system/channel.h"
10 #include "base/compiler_specific.h"
11 #include "base/logging.h"
12 #include "base/macros.h"
13 #include "base/strings/stringprintf.h"
14 #include "mojo/embedder/platform_handle_vector.h"
15 #include "mojo/system/message_pipe_endpoint.h"
16 #include "mojo/system/transport_data.h"
21 COMPILE_ASSERT(Channel::kBootstrapEndpointId
!=
22 MessageInTransit::kInvalidEndpointId
,
23 kBootstrapEndpointId_is_invalid
);
25 STATIC_CONST_MEMBER_DEFINITION
const MessageInTransit::EndpointId
26 Channel::kBootstrapEndpointId
;
// Constructor. Stores |platform_support| in |platform_support_|, starts with
// |is_shutting_down_| set to false, and seeds |next_local_id_| with
// |kBootstrapEndpointId|.
// NOTE(review): the initializer list/body looks truncated in this view (no
// closing brace is visible, and other members may be initialized on lines
// that are not shown) -- confirm against the full file.
28 Channel::Channel(embedder::PlatformSupport
* platform_support
)
29 : platform_support_(platform_support
),
31 is_shutting_down_(false),
32 next_local_id_(kBootstrapEndpointId
) {
// Initializes this channel with |raw_channel|. Ownership of |raw_channel| is
// moved into |raw_channel_| (via |Pass()|), and the raw channel is then
// initialized with |this| as its argument. DCHECKs that the call happens on
// the creation thread.
// NOTE(review): the handling of a failed |raw_channel_->Init()| and the
// function's return statements are not visible in this view; presumably it
// returns false on failure and true on success -- confirm.
35 bool Channel::Init(scoped_ptr
<RawChannel
> raw_channel
) {
36 DCHECK(creation_thread_checker_
.CalledOnValidThread());
39 // No need to take |lock_|, since this must be called before this object
40 // becomes thread-safe.
// Take ownership of the raw channel.
42 raw_channel_
= raw_channel
.Pass();
// Initialize the raw channel, passing this object to it.
44 if (!raw_channel_
->Init(this)) {
// Shuts the channel down: shuts down |raw_channel_|, takes every endpoint out
// of |local_id_to_endpoint_map_|, notifies/detaches those endpoints, and logs
// how many there were. DCHECKs that the call happens on the creation thread.
// NOTE(review): several lines of this function are not visible in this view
// (e.g., scope braces, the declaration of |num_live|, the loop increment and
// the counter updates) -- confirm details against the full file.
53 void Channel::Shutdown() {
54 DCHECK(creation_thread_checker_
.CalledOnValidThread());
// Receives the endpoints swapped out of |local_id_to_endpoint_map_| below.
56 IdToEndpointMap to_destroy
;
// |lock_| is taken here; presumably it is scoped to a block that ends before
// the loop below (braces not visible) -- see "outside the lock" comment.
58 base::AutoLock
locker(lock_
);
62 // Note: Don't reset |raw_channel_|, in case we're being called from within
63 // |OnReadMessage()| or |OnError()|.
64 raw_channel_
->Shutdown();
67 // We need to deal with it outside the lock.
68 std::swap(to_destroy
, local_id_to_endpoint_map_
);
// Walk the endpoints that were registered at the time of the swap.
72 size_t num_zombies
= 0;
73 for (IdToEndpointMap::iterator it
= to_destroy
.begin();
74 it
!= to_destroy
.end();
// An endpoint in STATE_NORMAL has its message pipe told that |port_| was
// removed.
76 if (it
->second
->state_
== ChannelEndpoint::STATE_NORMAL
) {
77 it
->second
->message_pipe_
->OnRemove(it
->second
->port_
);
// Presumably the non-normal ("zombie") branch: such an endpoint is expected
// to have no message pipe.
80 DCHECK(!it
->second
->message_pipe_
.get());
// Detach the endpoint from this channel.
83 it
->second
->DetachFromChannel();
// Verbose log, emitted only when at least one endpoint was live or zombie.
85 DVLOG_IF(2, num_live
|| num_zombies
) << "Shut down Channel with " << num_live
86 << " live endpoints and " << num_zombies
90 void Channel::WillShutdownSoon() {
91 base::AutoLock
locker(lock_
);
92 is_shutting_down_
= true;
// Registers |endpoint| with this channel under a local ID taken from
// |next_local_id_| (candidates that are the invalid ID or already present in
// |local_id_to_endpoint_map_| are rejected by the loop condition), then
// informs the endpoint via |AttachToChannel(this, local_id)|. |endpoint| must
// be non-null (DCHECKed). Logs a debug warning if called while shutting down.
// NOTE(review): the loop body that advances |next_local_id_|, the scope of
// the lock, and the return statement are not visible in this view;
// presumably the chosen |local_id| is returned -- confirm.
95 // Note: |endpoint| being a |scoped_refptr| makes this function safe, since it
96 // keeps the endpoint alive even after the lock is released. Otherwise, there's
97 // the temptation to simply pass the result of |new ChannelEndpoint(...)|
98 // directly to this function, which wouldn't be sufficient for safety.
99 MessageInTransit::EndpointId
Channel::AttachEndpoint(
100 scoped_refptr
<ChannelEndpoint
> endpoint
) {
101 DCHECK(endpoint
.get());
103 MessageInTransit::EndpointId local_id
;
105 base::AutoLock
locker(lock_
);
107 DLOG_IF(WARNING
, is_shutting_down_
)
108 << "AttachEndpoint() while shutting down";
// Keep looking while the candidate ID is invalid or already in use.
110 while (next_local_id_
== MessageInTransit::kInvalidEndpointId
||
111 local_id_to_endpoint_map_
.find(next_local_id_
) !=
112 local_id_to_endpoint_map_
.end())
115 local_id
= next_local_id_
;
// Record the endpoint under the chosen ID.
117 local_id_to_endpoint_map_
[local_id
] = endpoint
;
// Tell the endpoint which channel and local ID it is now attached to.
120 endpoint
->AttachToChannel(this, local_id
);
// Runs the endpoint registered under |local_id|, associating it with
// |remote_id|. Under |lock_|, looks the endpoint up in
// |local_id_to_endpoint_map_| and copies out the endpoint, its state, its
// message pipe and its port; afterwards calls |endpoint->Run(remote_id)| and
// |message_pipe->Run(port)|. An endpoint that is not in STATE_NORMAL (a
// "zombie") is only logged.
// NOTE(review): the return statements, the declaration of |port|, and the
// scope of the lock are not visible in this view; presumably the not-found
// case returns false and the zombie case returns early -- confirm.
124 bool Channel::RunMessagePipeEndpoint(MessageInTransit::EndpointId local_id
,
125 MessageInTransit::EndpointId remote_id
) {
126 scoped_refptr
<ChannelEndpoint
> endpoint
;
127 ChannelEndpoint::State state
;
128 scoped_refptr
<MessagePipe
> message_pipe
;
131 base::AutoLock
locker(lock_
);
133 DLOG_IF(WARNING
, is_shutting_down_
)
134 << "RunMessagePipeEndpoint() while shutting down";
// Look up the endpoint for |local_id|.
136 IdToEndpointMap::const_iterator it
=
137 local_id_to_endpoint_map_
.find(local_id
);
138 if (it
== local_id_to_endpoint_map_
.end())
// Copy out what is needed from the map entry.
140 endpoint
= it
->second
;
141 state
= it
->second
->state_
;
142 message_pipe
= it
->second
->message_pipe_
;
143 port
= it
->second
->port_
;
146 // Assume that this was in response to |kSubtypeChannelRunMessagePipeEndpoint|
148 if (state
!= ChannelEndpoint::STATE_NORMAL
) {
149 DVLOG(2) << "Ignoring run message pipe endpoint for zombie endpoint "
150 "(local ID " << local_id
<< ", remote ID " << remote_id
<< ")";
154 // TODO(vtl): FIXME -- We need to handle the case that message pipe is already
155 // running when we're here due to |kSubtypeChannelRunMessagePipeEndpoint|).
156 endpoint
->Run(remote_id
);
157 // TODO(vtl): Get rid of this.
158 message_pipe
->Run(port
);
// Asks the remote side to run its message pipe endpoint by sending a
// |kSubtypeChannelRunMessagePipeEndpoint| control message; a failed send is
// reported through |HandleLocalError()|.
// NOTE(review): the remaining arguments of |SendControlMessage()| and part of
// the error format string are not visible in this view.
162 void Channel::RunRemoteMessagePipeEndpoint(
163 MessageInTransit::EndpointId local_id
,
164 MessageInTransit::EndpointId remote_id
) {
// Sanity check, under |lock_|, that |local_id| is registered.
// NOTE(review): |SendControlMessage()| ends in |WriteMessage()|, which takes
// |lock_| itself, so this lock is presumably scoped to a block that closes
// before the send (braces not visible) -- confirm.
167 base::AutoLock
locker(lock_
);
168 DCHECK(local_id_to_endpoint_map_
.find(local_id
) !=
169 local_id_to_endpoint_map_
.end());
// Send the "run" control message; report a send failure as a local error.
173 if (!SendControlMessage(
174 MessageInTransit::kSubtypeChannelRunMessagePipeEndpoint
,
177 HandleLocalError(base::StringPrintf(
178 "Failed to send message to run remote message pipe endpoint (local ID "
180 static_cast<unsigned>(local_id
),
181 static_cast<unsigned>(remote_id
)));
// Writes |message| to the raw channel. While holding |lock_|, passes
// ownership of |message| to |raw_channel_->WriteMessage()| and returns that
// call's result. Logs a debug warning if called while shutting down.
// NOTE(review): the condition guarding the "after shutdown" warning (and
// whatever follows it) is not visible in this view; presumably it is a
// not-running check that returns early -- confirm.
185 bool Channel::WriteMessage(scoped_ptr
<MessageInTransit
> message
) {
186 base::AutoLock
locker(lock_
);
188 // TODO(vtl): I think this is probably not an error condition, but I should
189 // think about it (and the shutdown sequence) more carefully.
190 LOG(WARNING
) << "WriteMessage() after shutdown";
194 DLOG_IF(WARNING
, is_shutting_down_
) << "WriteMessage() while shutting down";
// Hand the message (and its ownership) to the raw channel.
195 return raw_channel_
->WriteMessage(message
.Pass());
// Returns the result of |raw_channel_->IsWriteBufferEmpty()|, queried while
// holding |lock_|.
// NOTE(review): lines between the lock acquisition and the return are not
// visible in this view (possibly an early return) -- confirm.
198 bool Channel::IsWriteBufferEmpty() {
199 base::AutoLock
locker(lock_
);
202 return raw_channel_
->IsWriteBufferEmpty();
// Handles the local side detaching the endpoint registered under |local_id|
// (which must not be the invalid ID; DCHECKed). Under |lock_| the endpoint's
// state decides what happens:
//   - STATE_NORMAL: the state becomes STATE_WAIT_REMOTE_REMOVE_ACK and the
//     endpoint's message pipe reference is cleared; a "remove" control
//     message is to be sent outside the lock.
//   - STATE_WAIT_LOCAL_DETACH: the endpoint is erased from the map and is to
//     be detached outside the lock.
//   - STATE_WAIT_REMOTE_REMOVE_ACK: handling not visible in this view.
// After the locked section, either |DetachFromChannel()| is called on the
// erased endpoint or a |kSubtypeChannelRemoveMessagePipeEndpoint| message is
// sent, with send failures reported through |HandleLocalError()|.
// NOTE(review): breaks/returns, the lock scope braces and the remaining
// |SendControlMessage()| arguments are not visible here. Also, the lock
// object is named |locker_| here but |locker| elsewhere in this file.
205 void Channel::DetachMessagePipeEndpoint(
206 MessageInTransit::EndpointId local_id
,
207 MessageInTransit::EndpointId remote_id
) {
208 DCHECK_NE(local_id
, MessageInTransit::kInvalidEndpointId
);
210 // If this is non-null after the locked block, the endpoint should be detached
211 // (and no remove message sent).
212 scoped_refptr
<ChannelEndpoint
> endpoint_to_detach
;
214 base::AutoLock
locker_(lock_
);
// The endpoint is expected to be registered.
218 IdToEndpointMap::iterator it
= local_id_to_endpoint_map_
.find(local_id
);
219 DCHECK(it
!= local_id_to_endpoint_map_
.end());
221 switch (it
->second
->state_
) {
222 case ChannelEndpoint::STATE_NORMAL
:
223 it
->second
->state_
= ChannelEndpoint::STATE_WAIT_REMOTE_REMOVE_ACK
;
224 it
->second
->message_pipe_
= nullptr;
// NOTE(review): the statement controlled by this check is not visible;
// presumably nothing is sent when there is no valid remote ID -- confirm.
225 if (remote_id
== MessageInTransit::kInvalidEndpointId
)
227 // We have to send a remove message (outside the lock).
229 case ChannelEndpoint::STATE_WAIT_LOCAL_DETACH
:
230 endpoint_to_detach
= it
->second
;
231 local_id_to_endpoint_map_
.erase(it
);
232 // We have to detach (outside the lock).
234 case ChannelEndpoint::STATE_WAIT_REMOTE_REMOVE_ACK
:
// An endpoint that was erased above gets detached from the channel.
239 if (endpoint_to_detach
.get()) {
240 endpoint_to_detach
->DetachFromChannel();
// Otherwise tell the remote side to remove its endpoint.
244 if (!SendControlMessage(
245 MessageInTransit::kSubtypeChannelRemoveMessagePipeEndpoint
,
248 HandleLocalError(base::StringPrintf(
249 "Failed to send message to remove remote message pipe endpoint (local "
250 "ID %u, remote ID %u)",
251 static_cast<unsigned>(local_id
),
252 static_cast<unsigned>(remote_id
)));
256 size_t Channel::GetSerializedPlatformHandleSize() const {
257 return raw_channel_
->GetSerializedPlatformHandleSize();
260 Channel::~Channel() {
261 // The channel should have been shut down first.
262 DCHECK(!is_running_
);
// Dispatches an incoming message according to |message_view.type()|:
// message-pipe-endpoint and message-pipe messages go to
// |OnReadMessageForDownstream()|, channel messages go to
// |OnReadMessageForChannel()|. In both cases ownership of |platform_handles|
// is passed along. DCHECKs that the call happens on the creation thread.
// NOTE(review): the |break| statements and the branch that uses the
// "invalid type" message are not visible in this view; presumably that
// string is passed to an error handler in a default case -- confirm.
265 void Channel::OnReadMessage(
266 const MessageInTransit::View
& message_view
,
267 embedder::ScopedPlatformHandleVectorPtr platform_handles
) {
268 DCHECK(creation_thread_checker_
.CalledOnValidThread());
270 switch (message_view
.type()) {
// Messages addressed to a message pipe (endpoint).
271 case MessageInTransit::kTypeMessagePipeEndpoint
:
272 case MessageInTransit::kTypeMessagePipe
:
273 OnReadMessageForDownstream(message_view
, platform_handles
.Pass());
// Control messages addressed to the channel itself.
275 case MessageInTransit::kTypeChannel
:
276 OnReadMessageForChannel(message_view
, platform_handles
.Pass());
280 base::StringPrintf("Received message of invalid type %u",
281 static_cast<unsigned>(message_view
.type())));
// Handles an error reported for the raw channel by logging it, with the
// severity depending on |error|:
//   - ERROR_READ_SHUTDOWN: verbose log only.
//   - ERROR_READ_BROKEN: error log, suppressed when |is_shutting_down_| is
//     set (the flag is read under |lock_|).
//   - ERROR_READ_BAD_MESSAGE / ERROR_READ_UNKNOWN: error log.
//   - write errors: warning log.
// DCHECKs that the call happens on the creation thread.
// NOTE(review): the |switch| header, the case label(s) for write errors, the
// |break| statements and anything after the switch are not visible in this
// view -- confirm against the full file.
286 void Channel::OnError(Error error
) {
287 DCHECK(creation_thread_checker_
.CalledOnValidThread());
290 case ERROR_READ_SHUTDOWN
:
291 // The other side was cleanly closed, so this isn't actually an error.
292 DVLOG(1) << "RawChannel read error (shutdown)";
294 case ERROR_READ_BROKEN
: {
// Hold |lock_| while reading |is_shutting_down_|.
295 base::AutoLock
locker(lock_
);
296 LOG_IF(ERROR
, !is_shutting_down_
)
297 << "RawChannel read error (connection broken)";
300 case ERROR_READ_BAD_MESSAGE
:
301 // Receiving a bad message means either a bug, data corruption, or
302 // malicious attack (probably due to some other bug).
303 LOG(ERROR
) << "RawChannel read error (received bad message)";
305 case ERROR_READ_UNKNOWN
:
306 LOG(ERROR
) << "RawChannel read error (unknown)";
309 // Write errors are slightly notable: they probably shouldn't happen under
310 // normal operation (but maybe the other side crashed).
311 LOG(WARNING
) << "RawChannel write error";
// Delivers an incoming message-pipe(-endpoint) message to the local message
// pipe registered under the message's destination ID:
//   1. Rejects messages whose destination ID is the invalid ID (remote
//      error).
//   2. Under |lock_|, looks up the destination in
//      |local_id_to_endpoint_map_| and copies out its state, message pipe
//      and port; an unknown ID is reported as a remote error.
//   3. Messages for endpoints not in STATE_NORMAL ("zombies") are only
//      logged.
//   4. Otherwise a new |MessageInTransit| is created from |message_view|,
//      its dispatchers are deserialized from the transport data (if any),
//      and it is enqueued on the message pipe; a result other than
//      |MOJO_RESULT_OK| is reported as a local error.
// DCHECKs the calling thread and the message type.
// NOTE(review): the early returns, the |else| branch structure, the lock
// scope, the declaration of |port| and the last argument(s) of
// |DeserializeDispatchers()| are not visible in this view -- confirm.
317 void Channel::OnReadMessageForDownstream(
318 const MessageInTransit::View
& message_view
,
319 embedder::ScopedPlatformHandleVectorPtr platform_handles
) {
320 DCHECK(creation_thread_checker_
.CalledOnValidThread());
321 DCHECK(message_view
.type() == MessageInTransit::kTypeMessagePipeEndpoint
||
322 message_view
.type() == MessageInTransit::kTypeMessagePipe
);
// The destination ID of the message is the local endpoint ID.
324 MessageInTransit::EndpointId local_id
= message_view
.destination_id();
325 if (local_id
== MessageInTransit::kInvalidEndpointId
) {
326 HandleRemoteError("Received message with no destination ID");
330 ChannelEndpoint::State state
= ChannelEndpoint::STATE_NORMAL
;
331 scoped_refptr
<MessagePipe
> message_pipe
;
// Set under the lock; the error itself is reported further down.
333 bool nonexistent_local_id_error
= false;
335 base::AutoLock
locker(lock_
);
337 // Since we own |raw_channel_|, and this method and |Shutdown()| should only
338 // be called from the creation thread, |raw_channel_| should never be null
342 IdToEndpointMap::const_iterator it
=
343 local_id_to_endpoint_map_
.find(local_id
);
344 if (it
== local_id_to_endpoint_map_
.end()) {
345 nonexistent_local_id_error
= true;
// Presumably the found branch: copy out what is needed from the map entry.
347 state
= it
->second
->state_
;
348 message_pipe
= it
->second
->message_pipe_
;
349 port
= it
->second
->port_
;
352 if (nonexistent_local_id_error
) {
353 HandleRemoteError(base::StringPrintf(
354 "Received a message for nonexistent local destination ID %u",
355 static_cast<unsigned>(local_id
)));
356 // This is strongly indicative of some problem. However, it's not a fatal
357 // error, since it may indicate a buggy (or hostile) remote process. Don't
358 // die even for Debug builds, since handling this properly needs to be
359 // tested (TODO(vtl)).
360 DLOG(ERROR
) << "This should not happen under normal operation.";
364 // Ignore messages for zombie endpoints (not an error).
365 if (state
!= ChannelEndpoint::STATE_NORMAL
) {
366 DVLOG(2) << "Ignoring downstream message for zombie endpoint (local ID = "
367 << local_id
<< ", remote ID = " << message_view
.source_id() << ")";
371 // We need to duplicate the message (data), because |EnqueueMessage()| will
372 // take ownership of it.
373 scoped_ptr
<MessageInTransit
> message(new MessageInTransit(message_view
));
// Only messages that carry transport data get dispatchers attached;
// |platform_handles| is passed on to the deserializer.
374 if (message_view
.transport_data_buffer_size() > 0) {
375 DCHECK(message_view
.transport_data_buffer());
376 message
->SetDispatchers(TransportData::DeserializeDispatchers(
377 message_view
.transport_data_buffer(),
378 message_view
.transport_data_buffer_size(),
379 platform_handles
.Pass(),
// Enqueue on the port returned by |MessagePipe::GetPeerPort(port)|.
382 MojoResult result
= message_pipe
->EnqueueMessage(
383 MessagePipe::GetPeerPort(port
), message
.Pass());
384 if (result
!= MOJO_RESULT_OK
) {
385 // TODO(vtl): This might be a "non-error", e.g., if the destination endpoint
386 // has been closed (in an unavoidable race). This might also be a "remote"
387 // error, e.g., if the remote side is sending invalid control messages (to
388 // the message pipe).
389 HandleLocalError(base::StringPrintf(
390 "Failed to enqueue message to local ID %u (result %d)",
391 static_cast<unsigned>(local_id
),
392 static_cast<int>(result
)));
// Handles a control message addressed to the channel itself (type
// |kTypeChannel|, DCHECKed), dispatching on |message_view.subtype()|:
//   - run message pipe endpoint  -> |RunMessagePipeEndpoint()|
//   - remove message pipe endpoint and its ack
//                                -> |RemoveMessagePipeEndpoint()|
// The message's destination ID is used as the local ID and its source ID as
// the remote ID. Channel messages must not carry platform handles.
// NOTE(review): the statements that use the bare error strings below
// (presumably |HandleRemoteError(| calls), the |break| statements and the
// default label are not visible in this view -- confirm.
397 void Channel::OnReadMessageForChannel(
398 const MessageInTransit::View
& message_view
,
399 embedder::ScopedPlatformHandleVectorPtr platform_handles
) {
400 DCHECK(creation_thread_checker_
.CalledOnValidThread());
401 DCHECK_EQ(message_view
.type(), MessageInTransit::kTypeChannel
);
403 // Currently, no channel messages take platform handles.
404 if (platform_handles
) {
406 "Received invalid channel message (has platform handles)");
411 switch (message_view
.subtype()) {
412 case MessageInTransit::kSubtypeChannelRunMessagePipeEndpoint
:
413 DVLOG(2) << "Handling channel message to run message pipe (local ID "
414 << message_view
.destination_id() << ", remote ID "
415 << message_view
.source_id() << ")";
416 if (!RunMessagePipeEndpoint(message_view
.destination_id(),
417 message_view
.source_id())) {
419 "Received invalid channel message to run message pipe");
422 case MessageInTransit::kSubtypeChannelRemoveMessagePipeEndpoint
:
423 DVLOG(2) << "Handling channel message to remove message pipe (local ID "
424 << message_view
.destination_id() << ", remote ID "
425 << message_view
.source_id() << ")";
426 if (!RemoveMessagePipeEndpoint(message_view
.destination_id(),
427 message_view
.source_id())) {
429 "Received invalid channel message to remove message pipe");
// The ack is routed to the same |RemoveMessagePipeEndpoint()|, which tells
// the cases apart by the endpoint's current state.
432 case MessageInTransit::kSubtypeChannelRemoveMessagePipeEndpointAck
:
433 DVLOG(2) << "Handling channel message to ack remove message pipe (local "
434 "ID " << message_view
.destination_id() << ", remote ID "
435 << message_view
.source_id() << ")";
436 if (!RemoveMessagePipeEndpoint(message_view
.destination_id(),
437 message_view
.source_id())) {
439 "Received invalid channel message to ack remove message pipe");
// Presumably the default case for unrecognized subtypes.
443 HandleRemoteError("Received invalid channel message");
// Handles a remote request (or ack) to remove the endpoint registered under
// |local_id|. Under |lock_| the endpoint's state decides what happens:
//   - not found: logged as an error.
//   - STATE_NORMAL: the state becomes STATE_WAIT_LOCAL_DETACH, the message
//     pipe and port are copied out and the endpoint's message pipe reference
//     is cleared; a "remove ack" message is to be sent outside the lock.
//   - STATE_WAIT_LOCAL_DETACH: logged as a wrong-state error.
//   - STATE_WAIT_REMOTE_REMOVE_ACK: the endpoint is erased from the map and
//     is to be detached outside the lock.
// After the locked section, either |DetachFromChannel()| is called on the
// erased endpoint, or a |kSubtypeChannelRemoveMessagePipeEndpointAck| message
// is sent (failures go to |HandleLocalError()|) and the message pipe is told
// via |OnRemove(port)|. DCHECKs that the call is on the creation thread.
// NOTE(review): return statements, |break|s, the lock scope, the declaration
// of |port| and the remaining |SendControlMessage()| arguments are not
// visible in this view; presumably the error cases return false -- confirm.
449 bool Channel::RemoveMessagePipeEndpoint(
450 MessageInTransit::EndpointId local_id
,
451 MessageInTransit::EndpointId remote_id
) {
452 DCHECK(creation_thread_checker_
.CalledOnValidThread());
454 // If this is non-null after the locked block, the endpoint should be detached
455 // (and no remove ack message sent).
456 scoped_refptr
<ChannelEndpoint
> endpoint_to_detach
;
457 scoped_refptr
<MessagePipe
> message_pipe
;
460 base::AutoLock
locker(lock_
);
462 IdToEndpointMap::iterator it
= local_id_to_endpoint_map_
.find(local_id
);
463 if (it
== local_id_to_endpoint_map_
.end()) {
464 DVLOG(2) << "Remove message pipe error: not found";
468 switch (it
->second
->state_
) {
469 case ChannelEndpoint::STATE_NORMAL
:
470 it
->second
->state_
= ChannelEndpoint::STATE_WAIT_LOCAL_DETACH
;
// Keep the pipe and port for the |OnRemove()| call made later, then drop
// the endpoint's own reference to the pipe.
471 message_pipe
= it
->second
->message_pipe_
;
472 port
= it
->second
->port_
;
473 it
->second
->message_pipe_
= nullptr;
474 // We have to send a remove ack message (outside the lock).
476 case ChannelEndpoint::STATE_WAIT_LOCAL_DETACH
:
477 DVLOG(2) << "Remove message pipe error: wrong state";
479 case ChannelEndpoint::STATE_WAIT_REMOTE_REMOVE_ACK
:
480 endpoint_to_detach
= it
->second
;
481 local_id_to_endpoint_map_
.erase(it
);
482 // We have to detach (outside the lock).
// An endpoint that was erased above gets detached from the channel.
486 if (endpoint_to_detach
.get()) {
487 endpoint_to_detach
->DetachFromChannel();
// Otherwise acknowledge the removal to the remote side.
491 if (!SendControlMessage(
492 MessageInTransit::kSubtypeChannelRemoveMessagePipeEndpointAck
,
495 HandleLocalError(base::StringPrintf(
496 "Failed to send message to remove remote message pipe endpoint ack "
497 "(local ID %u, remote ID %u)",
498 static_cast<unsigned>(local_id
),
499 static_cast<unsigned>(remote_id
)));
// Tell the message pipe that its port was removed.
502 message_pipe
->OnRemove(port
);
507 bool Channel::SendControlMessage(MessageInTransit::Subtype subtype
,
508 MessageInTransit::EndpointId local_id
,
509 MessageInTransit::EndpointId remote_id
) {
510 DVLOG(2) << "Sending channel control message: subtype " << subtype
511 << ", local ID " << local_id
<< ", remote ID " << remote_id
;
512 scoped_ptr
<MessageInTransit
> message(new MessageInTransit(
513 MessageInTransit::kTypeChannel
, subtype
, 0, nullptr));
514 message
->set_source_id(local_id
);
515 message
->set_destination_id(remote_id
);
516 return WriteMessage(message
.Pass());
519 void Channel::HandleRemoteError(const base::StringPiece
& error_message
) {
520 // TODO(vtl): Is this how we really want to handle this? Probably we want to
521 // terminate the connection, since it's spewing invalid stuff.
522 LOG(WARNING
) << error_message
;
525 void Channel::HandleLocalError(const base::StringPiece
& error_message
) {
526 // TODO(vtl): Is this how we really want to handle this?
527 // Sometimes we'll want to propagate the error back to the message pipe
528 // (endpoint), and notify it that the remote is (effectively) closed.
529 // Sometimes we'll want to kill the channel (and notify all the endpoints that
530 // their remotes are dead.
531 LOG(WARNING
) << error_message
;
534 } // namespace system