Mojo: Remove MessagePipe::Attach().
[chromium-blink-merge.git] / mojo / system / channel.cc
blob3114a59106c9e2b5f5bb3d02ab2ae06cb5dd1ca9
1 // Copyright 2013 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
5 #include "mojo/system/channel.h"
7 #include <algorithm>
9 #include "base/bind.h"
10 #include "base/compiler_specific.h"
11 #include "base/logging.h"
12 #include "base/macros.h"
13 #include "base/strings/stringprintf.h"
14 #include "mojo/embedder/platform_handle_vector.h"
15 #include "mojo/system/message_pipe_endpoint.h"
16 #include "mojo/system/transport_data.h"
18 namespace mojo {
19 namespace system {
21 COMPILE_ASSERT(Channel::kBootstrapEndpointId !=
22 MessageInTransit::kInvalidEndpointId,
23 kBootstrapEndpointId_is_invalid);
25 STATIC_CONST_MEMBER_DEFINITION const MessageInTransit::EndpointId
26 Channel::kBootstrapEndpointId;
28 Channel::Channel(embedder::PlatformSupport* platform_support)
29 : platform_support_(platform_support),
30 is_running_(false),
31 is_shutting_down_(false),
32 next_local_id_(kBootstrapEndpointId) {
35 bool Channel::Init(scoped_ptr<RawChannel> raw_channel) {
36 DCHECK(creation_thread_checker_.CalledOnValidThread());
37 DCHECK(raw_channel);
39 // No need to take |lock_|, since this must be called before this object
40 // becomes thread-safe.
41 DCHECK(!is_running_);
42 raw_channel_ = raw_channel.Pass();
44 if (!raw_channel_->Init(this)) {
45 raw_channel_.reset();
46 return false;
49 is_running_ = true;
50 return true;
53 void Channel::Shutdown() {
54 DCHECK(creation_thread_checker_.CalledOnValidThread());
56 IdToEndpointMap to_destroy;
58 base::AutoLock locker(lock_);
59 if (!is_running_)
60 return;
62 // Note: Don't reset |raw_channel_|, in case we're being called from within
63 // |OnReadMessage()| or |OnError()|.
64 raw_channel_->Shutdown();
65 is_running_ = false;
67 // We need to deal with it outside the lock.
68 std::swap(to_destroy, local_id_to_endpoint_map_);
71 size_t num_live = 0;
72 size_t num_zombies = 0;
73 for (IdToEndpointMap::iterator it = to_destroy.begin();
74 it != to_destroy.end();
75 ++it) {
76 if (it->second->state_ == ChannelEndpoint::STATE_NORMAL) {
77 it->second->message_pipe_->OnRemove(it->second->port_);
78 num_live++;
79 } else {
80 DCHECK(!it->second->message_pipe_.get());
81 num_zombies++;
83 it->second->DetachFromChannel();
85 DVLOG_IF(2, num_live || num_zombies) << "Shut down Channel with " << num_live
86 << " live endpoints and " << num_zombies
87 << " zombies";
90 void Channel::WillShutdownSoon() {
91 base::AutoLock locker(lock_);
92 is_shutting_down_ = true;
95 // Note: |endpoint| being a |scoped_refptr| makes this function safe, since it
96 // keeps the endpoint alive even after the lock is released. Otherwise, there's
97 // the temptation to simply pass the result of |new ChannelEndpoint(...)|
98 // directly to this function, which wouldn't be sufficient for safety.
99 MessageInTransit::EndpointId Channel::AttachEndpoint(
100 scoped_refptr<ChannelEndpoint> endpoint) {
101 DCHECK(endpoint.get());
103 MessageInTransit::EndpointId local_id;
105 base::AutoLock locker(lock_);
107 DLOG_IF(WARNING, is_shutting_down_)
108 << "AttachEndpoint() while shutting down";
110 while (next_local_id_ == MessageInTransit::kInvalidEndpointId ||
111 local_id_to_endpoint_map_.find(next_local_id_) !=
112 local_id_to_endpoint_map_.end())
113 next_local_id_++;
115 local_id = next_local_id_;
116 next_local_id_++;
117 local_id_to_endpoint_map_[local_id] = endpoint;
120 endpoint->AttachToChannel(this, local_id);
121 return local_id;
124 bool Channel::RunMessagePipeEndpoint(MessageInTransit::EndpointId local_id,
125 MessageInTransit::EndpointId remote_id) {
126 scoped_refptr<ChannelEndpoint> endpoint;
127 ChannelEndpoint::State state;
128 scoped_refptr<MessagePipe> message_pipe;
129 unsigned port;
131 base::AutoLock locker(lock_);
133 DLOG_IF(WARNING, is_shutting_down_)
134 << "RunMessagePipeEndpoint() while shutting down";
136 IdToEndpointMap::const_iterator it =
137 local_id_to_endpoint_map_.find(local_id);
138 if (it == local_id_to_endpoint_map_.end())
139 return false;
140 endpoint = it->second;
141 state = it->second->state_;
142 message_pipe = it->second->message_pipe_;
143 port = it->second->port_;
146 // Assume that this was in response to |kSubtypeChannelRunMessagePipeEndpoint|
147 // and ignore it.
148 if (state != ChannelEndpoint::STATE_NORMAL) {
149 DVLOG(2) << "Ignoring run message pipe endpoint for zombie endpoint "
150 "(local ID " << local_id << ", remote ID " << remote_id << ")";
151 return true;
154 // TODO(vtl): FIXME -- We need to handle the case that message pipe is already
155 // running when we're here due to |kSubtypeChannelRunMessagePipeEndpoint|).
156 endpoint->Run(remote_id);
157 // TODO(vtl): Get rid of this.
158 message_pipe->Run(port);
159 return true;
162 void Channel::RunRemoteMessagePipeEndpoint(
163 MessageInTransit::EndpointId local_id,
164 MessageInTransit::EndpointId remote_id) {
165 #if DCHECK_IS_ON
167 base::AutoLock locker(lock_);
168 DCHECK(local_id_to_endpoint_map_.find(local_id) !=
169 local_id_to_endpoint_map_.end());
171 #endif
173 if (!SendControlMessage(
174 MessageInTransit::kSubtypeChannelRunMessagePipeEndpoint,
175 local_id,
176 remote_id)) {
177 HandleLocalError(base::StringPrintf(
178 "Failed to send message to run remote message pipe endpoint (local ID "
179 "%u, remote ID %u)",
180 static_cast<unsigned>(local_id),
181 static_cast<unsigned>(remote_id)));
185 bool Channel::WriteMessage(scoped_ptr<MessageInTransit> message) {
186 base::AutoLock locker(lock_);
187 if (!is_running_) {
188 // TODO(vtl): I think this is probably not an error condition, but I should
189 // think about it (and the shutdown sequence) more carefully.
190 LOG(WARNING) << "WriteMessage() after shutdown";
191 return false;
194 DLOG_IF(WARNING, is_shutting_down_) << "WriteMessage() while shutting down";
195 return raw_channel_->WriteMessage(message.Pass());
198 bool Channel::IsWriteBufferEmpty() {
199 base::AutoLock locker(lock_);
200 if (!is_running_)
201 return true;
202 return raw_channel_->IsWriteBufferEmpty();
205 void Channel::DetachMessagePipeEndpoint(
206 MessageInTransit::EndpointId local_id,
207 MessageInTransit::EndpointId remote_id) {
208 DCHECK_NE(local_id, MessageInTransit::kInvalidEndpointId);
210 // If this is non-null after the locked block, the endpoint should be detached
211 // (and no remove message sent).
212 scoped_refptr<ChannelEndpoint> endpoint_to_detach;
214 base::AutoLock locker_(lock_);
215 if (!is_running_)
216 return;
218 IdToEndpointMap::iterator it = local_id_to_endpoint_map_.find(local_id);
219 DCHECK(it != local_id_to_endpoint_map_.end());
221 switch (it->second->state_) {
222 case ChannelEndpoint::STATE_NORMAL:
223 it->second->state_ = ChannelEndpoint::STATE_WAIT_REMOTE_REMOVE_ACK;
224 it->second->message_pipe_ = nullptr;
225 if (remote_id == MessageInTransit::kInvalidEndpointId)
226 return;
227 // We have to send a remove message (outside the lock).
228 break;
229 case ChannelEndpoint::STATE_WAIT_LOCAL_DETACH:
230 endpoint_to_detach = it->second;
231 local_id_to_endpoint_map_.erase(it);
232 // We have to detach (outside the lock).
233 break;
234 case ChannelEndpoint::STATE_WAIT_REMOTE_REMOVE_ACK:
235 NOTREACHED();
236 return;
239 if (endpoint_to_detach.get()) {
240 endpoint_to_detach->DetachFromChannel();
241 return;
244 if (!SendControlMessage(
245 MessageInTransit::kSubtypeChannelRemoveMessagePipeEndpoint,
246 local_id,
247 remote_id)) {
248 HandleLocalError(base::StringPrintf(
249 "Failed to send message to remove remote message pipe endpoint (local "
250 "ID %u, remote ID %u)",
251 static_cast<unsigned>(local_id),
252 static_cast<unsigned>(remote_id)));
256 size_t Channel::GetSerializedPlatformHandleSize() const {
257 return raw_channel_->GetSerializedPlatformHandleSize();
260 Channel::~Channel() {
261 // The channel should have been shut down first.
262 DCHECK(!is_running_);
265 void Channel::OnReadMessage(
266 const MessageInTransit::View& message_view,
267 embedder::ScopedPlatformHandleVectorPtr platform_handles) {
268 DCHECK(creation_thread_checker_.CalledOnValidThread());
270 switch (message_view.type()) {
271 case MessageInTransit::kTypeMessagePipeEndpoint:
272 case MessageInTransit::kTypeMessagePipe:
273 OnReadMessageForDownstream(message_view, platform_handles.Pass());
274 break;
275 case MessageInTransit::kTypeChannel:
276 OnReadMessageForChannel(message_view, platform_handles.Pass());
277 break;
278 default:
279 HandleRemoteError(
280 base::StringPrintf("Received message of invalid type %u",
281 static_cast<unsigned>(message_view.type())));
282 break;
286 void Channel::OnError(Error error) {
287 DCHECK(creation_thread_checker_.CalledOnValidThread());
289 switch (error) {
290 case ERROR_READ_SHUTDOWN:
291 // The other side was cleanly closed, so this isn't actually an error.
292 DVLOG(1) << "RawChannel read error (shutdown)";
293 break;
294 case ERROR_READ_BROKEN: {
295 base::AutoLock locker(lock_);
296 LOG_IF(ERROR, !is_shutting_down_)
297 << "RawChannel read error (connection broken)";
298 break;
300 case ERROR_READ_BAD_MESSAGE:
301 // Receiving a bad message means either a bug, data corruption, or
302 // malicious attack (probably due to some other bug).
303 LOG(ERROR) << "RawChannel read error (received bad message)";
304 break;
305 case ERROR_READ_UNKNOWN:
306 LOG(ERROR) << "RawChannel read error (unknown)";
307 break;
308 case ERROR_WRITE:
309 // Write errors are slightly notable: they probably shouldn't happen under
310 // normal operation (but maybe the other side crashed).
311 LOG(WARNING) << "RawChannel write error";
312 break;
314 Shutdown();
317 void Channel::OnReadMessageForDownstream(
318 const MessageInTransit::View& message_view,
319 embedder::ScopedPlatformHandleVectorPtr platform_handles) {
320 DCHECK(creation_thread_checker_.CalledOnValidThread());
321 DCHECK(message_view.type() == MessageInTransit::kTypeMessagePipeEndpoint ||
322 message_view.type() == MessageInTransit::kTypeMessagePipe);
324 MessageInTransit::EndpointId local_id = message_view.destination_id();
325 if (local_id == MessageInTransit::kInvalidEndpointId) {
326 HandleRemoteError("Received message with no destination ID");
327 return;
330 ChannelEndpoint::State state = ChannelEndpoint::STATE_NORMAL;
331 scoped_refptr<MessagePipe> message_pipe;
332 unsigned port = ~0u;
333 bool nonexistent_local_id_error = false;
335 base::AutoLock locker(lock_);
337 // Since we own |raw_channel_|, and this method and |Shutdown()| should only
338 // be called from the creation thread, |raw_channel_| should never be null
339 // here.
340 DCHECK(is_running_);
342 IdToEndpointMap::const_iterator it =
343 local_id_to_endpoint_map_.find(local_id);
344 if (it == local_id_to_endpoint_map_.end()) {
345 nonexistent_local_id_error = true;
346 } else {
347 state = it->second->state_;
348 message_pipe = it->second->message_pipe_;
349 port = it->second->port_;
352 if (nonexistent_local_id_error) {
353 HandleRemoteError(base::StringPrintf(
354 "Received a message for nonexistent local destination ID %u",
355 static_cast<unsigned>(local_id)));
356 // This is strongly indicative of some problem. However, it's not a fatal
357 // error, since it may indicate a buggy (or hostile) remote process. Don't
358 // die even for Debug builds, since handling this properly needs to be
359 // tested (TODO(vtl)).
360 DLOG(ERROR) << "This should not happen under normal operation.";
361 return;
364 // Ignore messages for zombie endpoints (not an error).
365 if (state != ChannelEndpoint::STATE_NORMAL) {
366 DVLOG(2) << "Ignoring downstream message for zombie endpoint (local ID = "
367 << local_id << ", remote ID = " << message_view.source_id() << ")";
368 return;
371 // We need to duplicate the message (data), because |EnqueueMessage()| will
372 // take ownership of it.
373 scoped_ptr<MessageInTransit> message(new MessageInTransit(message_view));
374 if (message_view.transport_data_buffer_size() > 0) {
375 DCHECK(message_view.transport_data_buffer());
376 message->SetDispatchers(TransportData::DeserializeDispatchers(
377 message_view.transport_data_buffer(),
378 message_view.transport_data_buffer_size(),
379 platform_handles.Pass(),
380 this));
382 MojoResult result = message_pipe->EnqueueMessage(
383 MessagePipe::GetPeerPort(port), message.Pass());
384 if (result != MOJO_RESULT_OK) {
385 // TODO(vtl): This might be a "non-error", e.g., if the destination endpoint
386 // has been closed (in an unavoidable race). This might also be a "remote"
387 // error, e.g., if the remote side is sending invalid control messages (to
388 // the message pipe).
389 HandleLocalError(base::StringPrintf(
390 "Failed to enqueue message to local ID %u (result %d)",
391 static_cast<unsigned>(local_id),
392 static_cast<int>(result)));
393 return;
397 void Channel::OnReadMessageForChannel(
398 const MessageInTransit::View& message_view,
399 embedder::ScopedPlatformHandleVectorPtr platform_handles) {
400 DCHECK(creation_thread_checker_.CalledOnValidThread());
401 DCHECK_EQ(message_view.type(), MessageInTransit::kTypeChannel);
403 // Currently, no channel messages take platform handles.
404 if (platform_handles) {
405 HandleRemoteError(
406 "Received invalid channel message (has platform handles)");
407 NOTREACHED();
408 return;
411 switch (message_view.subtype()) {
412 case MessageInTransit::kSubtypeChannelRunMessagePipeEndpoint:
413 DVLOG(2) << "Handling channel message to run message pipe (local ID "
414 << message_view.destination_id() << ", remote ID "
415 << message_view.source_id() << ")";
416 if (!RunMessagePipeEndpoint(message_view.destination_id(),
417 message_view.source_id())) {
418 HandleRemoteError(
419 "Received invalid channel message to run message pipe");
421 break;
422 case MessageInTransit::kSubtypeChannelRemoveMessagePipeEndpoint:
423 DVLOG(2) << "Handling channel message to remove message pipe (local ID "
424 << message_view.destination_id() << ", remote ID "
425 << message_view.source_id() << ")";
426 if (!RemoveMessagePipeEndpoint(message_view.destination_id(),
427 message_view.source_id())) {
428 HandleRemoteError(
429 "Received invalid channel message to remove message pipe");
431 break;
432 case MessageInTransit::kSubtypeChannelRemoveMessagePipeEndpointAck:
433 DVLOG(2) << "Handling channel message to ack remove message pipe (local "
434 "ID " << message_view.destination_id() << ", remote ID "
435 << message_view.source_id() << ")";
436 if (!RemoveMessagePipeEndpoint(message_view.destination_id(),
437 message_view.source_id())) {
438 HandleRemoteError(
439 "Received invalid channel message to ack remove message pipe");
441 break;
442 default:
443 HandleRemoteError("Received invalid channel message");
444 NOTREACHED();
445 break;
449 bool Channel::RemoveMessagePipeEndpoint(
450 MessageInTransit::EndpointId local_id,
451 MessageInTransit::EndpointId remote_id) {
452 DCHECK(creation_thread_checker_.CalledOnValidThread());
454 // If this is non-null after the locked block, the endpoint should be detached
455 // (and no remove ack message sent).
456 scoped_refptr<ChannelEndpoint> endpoint_to_detach;
457 scoped_refptr<MessagePipe> message_pipe;
458 unsigned port = ~0u;
460 base::AutoLock locker(lock_);
462 IdToEndpointMap::iterator it = local_id_to_endpoint_map_.find(local_id);
463 if (it == local_id_to_endpoint_map_.end()) {
464 DVLOG(2) << "Remove message pipe error: not found";
465 return false;
468 switch (it->second->state_) {
469 case ChannelEndpoint::STATE_NORMAL:
470 it->second->state_ = ChannelEndpoint::STATE_WAIT_LOCAL_DETACH;
471 message_pipe = it->second->message_pipe_;
472 port = it->second->port_;
473 it->second->message_pipe_ = nullptr;
474 // We have to send a remove ack message (outside the lock).
475 break;
476 case ChannelEndpoint::STATE_WAIT_LOCAL_DETACH:
477 DVLOG(2) << "Remove message pipe error: wrong state";
478 return false;
479 case ChannelEndpoint::STATE_WAIT_REMOTE_REMOVE_ACK:
480 endpoint_to_detach = it->second;
481 local_id_to_endpoint_map_.erase(it);
482 // We have to detach (outside the lock).
483 break;
486 if (endpoint_to_detach.get()) {
487 endpoint_to_detach->DetachFromChannel();
488 return true;
491 if (!SendControlMessage(
492 MessageInTransit::kSubtypeChannelRemoveMessagePipeEndpointAck,
493 local_id,
494 remote_id)) {
495 HandleLocalError(base::StringPrintf(
496 "Failed to send message to remove remote message pipe endpoint ack "
497 "(local ID %u, remote ID %u)",
498 static_cast<unsigned>(local_id),
499 static_cast<unsigned>(remote_id)));
502 message_pipe->OnRemove(port);
504 return true;
507 bool Channel::SendControlMessage(MessageInTransit::Subtype subtype,
508 MessageInTransit::EndpointId local_id,
509 MessageInTransit::EndpointId remote_id) {
510 DVLOG(2) << "Sending channel control message: subtype " << subtype
511 << ", local ID " << local_id << ", remote ID " << remote_id;
512 scoped_ptr<MessageInTransit> message(new MessageInTransit(
513 MessageInTransit::kTypeChannel, subtype, 0, nullptr));
514 message->set_source_id(local_id);
515 message->set_destination_id(remote_id);
516 return WriteMessage(message.Pass());
519 void Channel::HandleRemoteError(const base::StringPiece& error_message) {
520 // TODO(vtl): Is this how we really want to handle this? Probably we want to
521 // terminate the connection, since it's spewing invalid stuff.
522 LOG(WARNING) << error_message;
525 void Channel::HandleLocalError(const base::StringPiece& error_message) {
526 // TODO(vtl): Is this how we really want to handle this?
527 // Sometimes we'll want to propagate the error back to the message pipe
528 // (endpoint), and notify it that the remote is (effectively) closed.
529 // Sometimes we'll want to kill the channel (and notify all the endpoints that
530 // their remotes are dead.
531 LOG(WARNING) << error_message;
534 } // namespace system
535 } // namespace mojo