tuple: update to make use of C++11
[chromium-blink-merge.git] / mojo / edk / system / message_pipe.cc
blob3e243b648419e5c25a7e74ecf69398e4db5aac82
1 // Copyright 2013 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
5 #include "mojo/edk/system/message_pipe.h"
7 #include "base/logging.h"
8 #include "mojo/edk/system/channel.h"
9 #include "mojo/edk/system/channel_endpoint.h"
10 #include "mojo/edk/system/channel_endpoint_id.h"
11 #include "mojo/edk/system/endpoint_relayer.h"
12 #include "mojo/edk/system/local_message_pipe_endpoint.h"
13 #include "mojo/edk/system/message_in_transit.h"
14 #include "mojo/edk/system/message_pipe_dispatcher.h"
15 #include "mojo/edk/system/message_pipe_endpoint.h"
16 #include "mojo/edk/system/proxy_message_pipe_endpoint.h"
18 namespace mojo {
19 namespace system {
21 namespace {
23 // TODO(vtl): Move this into |Channel| (and possible further).
24 struct SerializedMessagePipe {
25 // This is the endpoint ID on the receiving side, and should be a "remote ID".
26 // (The receiving side should already have had an endpoint attached and been
27 // run via the |Channel|s. This endpoint will have both IDs assigned, so this
28 // ID is only needed to associate that endpoint with a particular dispatcher.)
29 ChannelEndpointId receiver_endpoint_id;
32 } // namespace
34 // static
35 MessagePipe* MessagePipe::CreateLocalLocal() {
36 MessagePipe* message_pipe = new MessagePipe();
37 message_pipe->endpoints_[0].reset(new LocalMessagePipeEndpoint());
38 message_pipe->endpoints_[1].reset(new LocalMessagePipeEndpoint());
39 return message_pipe;
42 // static
43 MessagePipe* MessagePipe::CreateLocalProxy(
44 scoped_refptr<ChannelEndpoint>* channel_endpoint) {
45 DCHECK(!*channel_endpoint); // Not technically wrong, but unlikely.
46 MessagePipe* message_pipe = new MessagePipe();
47 message_pipe->endpoints_[0].reset(new LocalMessagePipeEndpoint());
48 *channel_endpoint = new ChannelEndpoint(message_pipe, 1);
49 message_pipe->endpoints_[1].reset(
50 new ProxyMessagePipeEndpoint(channel_endpoint->get()));
51 return message_pipe;
54 // static
55 MessagePipe* MessagePipe::CreateProxyLocal(
56 scoped_refptr<ChannelEndpoint>* channel_endpoint) {
57 DCHECK(!*channel_endpoint); // Not technically wrong, but unlikely.
58 MessagePipe* message_pipe = new MessagePipe();
59 *channel_endpoint = new ChannelEndpoint(message_pipe, 0);
60 message_pipe->endpoints_[0].reset(
61 new ProxyMessagePipeEndpoint(channel_endpoint->get()));
62 message_pipe->endpoints_[1].reset(new LocalMessagePipeEndpoint());
63 return message_pipe;
66 // static
67 unsigned MessagePipe::GetPeerPort(unsigned port) {
68 DCHECK(port == 0 || port == 1);
69 return port ^ 1;
72 // static
73 bool MessagePipe::Deserialize(Channel* channel,
74 const void* source,
75 size_t size,
76 scoped_refptr<MessagePipe>* message_pipe,
77 unsigned* port) {
78 DCHECK(!*message_pipe); // Not technically wrong, but unlikely.
80 if (size != sizeof(SerializedMessagePipe)) {
81 LOG(ERROR) << "Invalid serialized message pipe";
82 return false;
85 const SerializedMessagePipe* s =
86 static_cast<const SerializedMessagePipe*>(source);
87 *message_pipe = channel->PassIncomingMessagePipe(s->receiver_endpoint_id);
88 if (!*message_pipe) {
89 LOG(ERROR) << "Failed to deserialize message pipe (ID = "
90 << s->receiver_endpoint_id << ")";
91 return false;
94 DVLOG(2) << "Deserializing message pipe dispatcher (new local ID = "
95 << s->receiver_endpoint_id << ")";
96 *port = 0;
97 return true;
100 MessagePipeEndpoint::Type MessagePipe::GetType(unsigned port) {
101 DCHECK(port == 0 || port == 1);
102 base::AutoLock locker(lock_);
103 DCHECK(endpoints_[port]);
105 return endpoints_[port]->GetType();
108 void MessagePipe::CancelAllAwakables(unsigned port) {
109 DCHECK(port == 0 || port == 1);
111 base::AutoLock locker(lock_);
112 DCHECK(endpoints_[port]);
113 endpoints_[port]->CancelAllAwakables();
116 void MessagePipe::Close(unsigned port) {
117 DCHECK(port == 0 || port == 1);
119 unsigned peer_port = GetPeerPort(port);
121 base::AutoLock locker(lock_);
122 // The endpoint's |OnPeerClose()| may have been called first and returned
123 // false, which would have resulted in its destruction.
124 if (!endpoints_[port])
125 return;
127 endpoints_[port]->Close();
128 if (endpoints_[peer_port]) {
129 if (!endpoints_[peer_port]->OnPeerClose())
130 endpoints_[peer_port].reset();
132 endpoints_[port].reset();
135 // TODO(vtl): Handle flags.
136 MojoResult MessagePipe::WriteMessage(
137 unsigned port,
138 UserPointer<const void> bytes,
139 uint32_t num_bytes,
140 std::vector<DispatcherTransport>* transports,
141 MojoWriteMessageFlags flags) {
142 DCHECK(port == 0 || port == 1);
144 base::AutoLock locker(lock_);
145 return EnqueueMessageNoLock(
146 GetPeerPort(port),
147 make_scoped_ptr(new MessageInTransit(
148 MessageInTransit::kTypeEndpoint,
149 MessageInTransit::kSubtypeEndpointData, num_bytes, bytes)),
150 transports);
153 MojoResult MessagePipe::ReadMessage(unsigned port,
154 UserPointer<void> bytes,
155 UserPointer<uint32_t> num_bytes,
156 DispatcherVector* dispatchers,
157 uint32_t* num_dispatchers,
158 MojoReadMessageFlags flags) {
159 DCHECK(port == 0 || port == 1);
161 base::AutoLock locker(lock_);
162 DCHECK(endpoints_[port]);
164 return endpoints_[port]->ReadMessage(bytes, num_bytes, dispatchers,
165 num_dispatchers, flags);
168 HandleSignalsState MessagePipe::GetHandleSignalsState(unsigned port) const {
169 DCHECK(port == 0 || port == 1);
171 base::AutoLock locker(const_cast<base::Lock&>(lock_));
172 DCHECK(endpoints_[port]);
174 return endpoints_[port]->GetHandleSignalsState();
177 MojoResult MessagePipe::AddAwakable(unsigned port,
178 Awakable* awakable,
179 MojoHandleSignals signals,
180 uint32_t context,
181 HandleSignalsState* signals_state) {
182 DCHECK(port == 0 || port == 1);
184 base::AutoLock locker(lock_);
185 DCHECK(endpoints_[port]);
187 return endpoints_[port]->AddAwakable(awakable, signals, context,
188 signals_state);
191 void MessagePipe::RemoveAwakable(unsigned port,
192 Awakable* awakable,
193 HandleSignalsState* signals_state) {
194 DCHECK(port == 0 || port == 1);
196 base::AutoLock locker(lock_);
197 DCHECK(endpoints_[port]);
199 endpoints_[port]->RemoveAwakable(awakable, signals_state);
202 void MessagePipe::StartSerialize(unsigned /*port*/,
203 Channel* /*channel*/,
204 size_t* max_size,
205 size_t* max_platform_handles) {
206 *max_size = sizeof(SerializedMessagePipe);
207 *max_platform_handles = 0;
210 bool MessagePipe::EndSerialize(
211 unsigned port,
212 Channel* channel,
213 void* destination,
214 size_t* actual_size,
215 embedder::PlatformHandleVector* /*platform_handles*/) {
216 DCHECK(port == 0 || port == 1);
218 scoped_refptr<ChannelEndpoint> channel_endpoint;
220 base::AutoLock locker(lock_);
221 DCHECK(endpoints_[port]);
223 // The port being serialized must be local.
224 DCHECK_EQ(endpoints_[port]->GetType(), MessagePipeEndpoint::kTypeLocal);
226 // There are three possibilities for the peer port (below). In all cases, we
227 // pass the contents of |port|'s message queue to the channel, and it'll
228 // (presumably) make a |ChannelEndpoint| from it.
230 // 1. The peer port is (known to be) closed.
232 // There's no reason for us to continue to exist and no need for the
233 // channel to give us the |ChannelEndpoint|. It only remains for us to
234 // "close" |port|'s |LocalMessagePipeEndpoint| and prepare for
235 // destruction.
237 // 2. The peer port is local (the typical case).
239 // The channel gives us back a |ChannelEndpoint|, which we hook up to a
240 // |ProxyMessagePipeEndpoint| to replace |port|'s
241 // |LocalMessagePipeEndpoint|. We continue to exist, since the peer
242 // port's message pipe dispatcher will continue to hold a reference to
243 // us.
245 // 3. The peer port is remote.
247 // We also pass its |ChannelEndpoint| to the channel, which then decides
248 // what to do. We have no reason to continue to exist.
250 // TODO(vtl): Factor some of this out to |ChannelEndpoint| (or |Channel|).
252 unsigned peer_port = GetPeerPort(port);
253 if (!endpoints_[peer_port]) {
254 // Case 1.
255 channel_endpoint = new ChannelEndpoint(
256 nullptr, 0, static_cast<LocalMessagePipeEndpoint*>(
257 endpoints_[port].get())->message_queue());
258 endpoints_[port]->Close();
259 endpoints_[port].reset();
260 } else if (endpoints_[peer_port]->GetType() ==
261 MessagePipeEndpoint::kTypeLocal) {
262 // Case 2.
263 channel_endpoint = new ChannelEndpoint(
264 this, port, static_cast<LocalMessagePipeEndpoint*>(
265 endpoints_[port].get())->message_queue());
266 endpoints_[port]->Close();
267 endpoints_[port].reset(
268 new ProxyMessagePipeEndpoint(channel_endpoint.get()));
269 } else {
270 // Case 3.
271 DLOG(WARNING) << "Direct message pipe passing across multiple channels "
272 "not yet implemented; will proxy";
274 // Create an |EndpointRelayer| to replace ourselves (rather than having a
275 // |MessagePipe| object that exists solely to relay messages between two
276 // |ChannelEndpoint|s, owned by the |Channel| through them.
278 // This reduces overhead somewhat, and more importantly restores some
279 // invariants, e.g., that |MessagePipe|s are owned by dispatchers.
281 // TODO(vtl): If we get the |Channel| to own/track the relayer directly,
282 // then possibly we could make |ChannelEndpoint|'s |client_| pointer a raw
283 // pointer (and not have the |Channel| owning the relayer via its
284 // |ChannelEndpoint|s.
286 // TODO(vtl): This is not obviously the right place for (all of) this
287 // logic, nor is it obviously factored correctly.
289 DCHECK_EQ(endpoints_[peer_port]->GetType(),
290 MessagePipeEndpoint::kTypeProxy);
291 ProxyMessagePipeEndpoint* peer_endpoint =
292 static_cast<ProxyMessagePipeEndpoint*>(endpoints_[peer_port].get());
293 scoped_refptr<ChannelEndpoint> peer_channel_endpoint =
294 peer_endpoint->ReleaseChannelEndpoint();
296 scoped_refptr<EndpointRelayer> relayer(new EndpointRelayer());
297 // We'll assign our peer port's endpoint to the relayer's port 1, and this
298 // port's endpoint to the relayer's port 0.
299 channel_endpoint = new ChannelEndpoint(
300 relayer.get(), 0, static_cast<LocalMessagePipeEndpoint*>(
301 endpoints_[port].get())->message_queue());
302 relayer->Init(channel_endpoint.get(), peer_channel_endpoint.get());
303 peer_channel_endpoint->ReplaceClient(relayer.get(), 1);
305 endpoints_[port]->Close();
306 endpoints_[port].reset();
307 // No need to call |Close()| after |ReleaseChannelEndpoint()|.
308 endpoints_[peer_port].reset();
312 SerializedMessagePipe* s = static_cast<SerializedMessagePipe*>(destination);
314 // Convert the local endpoint to a proxy endpoint (moving the message queue)
315 // and attach it to the channel.
316 s->receiver_endpoint_id =
317 channel->AttachAndRunEndpoint(channel_endpoint, false);
318 DVLOG(2) << "Serializing message pipe (remote ID = "
319 << s->receiver_endpoint_id << ")";
320 *actual_size = sizeof(SerializedMessagePipe);
321 return true;
324 bool MessagePipe::OnReadMessage(unsigned port, MessageInTransit* message) {
325 base::AutoLock locker(lock_);
327 if (!endpoints_[port]) {
328 // This will happen only on the rare occasion that the call to
329 // |OnReadMessage()| is racing with us calling
330 // |ChannelEndpoint::ReplaceClient()|, in which case we reject the message,
331 // and the |ChannelEndpoint| can retry (calling the new client's
332 // |OnReadMessage()|).
333 return false;
336 // This is called when the |ChannelEndpoint| for the
337 // |ProxyMessagePipeEndpoint| |port| receives a message (from the |Channel|).
338 // We need to pass this message on to its peer port (typically a
339 // |LocalMessagePipeEndpoint|).
340 MojoResult result = EnqueueMessageNoLock(GetPeerPort(port),
341 make_scoped_ptr(message), nullptr);
342 DLOG_IF(WARNING, result != MOJO_RESULT_OK)
343 << "EnqueueMessageNoLock() failed (result = " << result << ")";
344 return true;
347 void MessagePipe::OnDetachFromChannel(unsigned port) {
348 Close(port);
351 MessagePipe::MessagePipe() {
354 MessagePipe::~MessagePipe() {
355 // Owned by the dispatchers. The owning dispatchers should only release us via
356 // their |Close()| method, which should inform us of being closed via our
357 // |Close()|. Thus these should already be null.
358 DCHECK(!endpoints_[0]);
359 DCHECK(!endpoints_[1]);
362 MojoResult MessagePipe::EnqueueMessageNoLock(
363 unsigned port,
364 scoped_ptr<MessageInTransit> message,
365 std::vector<DispatcherTransport>* transports) {
366 DCHECK(port == 0 || port == 1);
367 DCHECK(message);
369 DCHECK_EQ(message->type(), MessageInTransit::kTypeEndpoint);
370 DCHECK(endpoints_[GetPeerPort(port)]);
372 // The destination port need not be open, unlike the source port.
373 if (!endpoints_[port])
374 return MOJO_RESULT_FAILED_PRECONDITION;
376 if (transports) {
377 MojoResult result = AttachTransportsNoLock(port, message.get(), transports);
378 if (result != MOJO_RESULT_OK)
379 return result;
382 // The endpoint's |EnqueueMessage()| may not report failure.
383 endpoints_[port]->EnqueueMessage(message.Pass());
384 return MOJO_RESULT_OK;
387 MojoResult MessagePipe::AttachTransportsNoLock(
388 unsigned port,
389 MessageInTransit* message,
390 std::vector<DispatcherTransport>* transports) {
391 DCHECK(!message->has_dispatchers());
393 // You're not allowed to send either handle to a message pipe over the message
394 // pipe, so check for this. (The case of trying to write a handle to itself is
395 // taken care of by |Core|. That case kind of makes sense, but leads to
396 // complications if, e.g., both sides try to do the same thing with their
397 // respective handles simultaneously. The other case, of trying to write the
398 // peer handle to a handle, doesn't make sense -- since no handle will be
399 // available to read the message from.)
400 for (size_t i = 0; i < transports->size(); i++) {
401 if (!(*transports)[i].is_valid())
402 continue;
403 if ((*transports)[i].GetType() == Dispatcher::kTypeMessagePipe) {
404 MessagePipeDispatcherTransport mp_transport((*transports)[i]);
405 if (mp_transport.GetMessagePipe() == this) {
406 // The other case should have been disallowed by |Core|. (Note: |port|
407 // is the peer port of the handle given to |WriteMessage()|.)
408 DCHECK_EQ(mp_transport.GetPort(), port);
409 return MOJO_RESULT_INVALID_ARGUMENT;
414 // Clone the dispatchers and attach them to the message. (This must be done as
415 // a separate loop, since we want to leave the dispatchers alone on failure.)
416 scoped_ptr<DispatcherVector> dispatchers(new DispatcherVector());
417 dispatchers->reserve(transports->size());
418 for (size_t i = 0; i < transports->size(); i++) {
419 if ((*transports)[i].is_valid()) {
420 dispatchers->push_back(
421 (*transports)[i].CreateEquivalentDispatcherAndClose());
422 } else {
423 LOG(WARNING) << "Enqueueing null dispatcher";
424 dispatchers->push_back(nullptr);
427 message->SetDispatchers(dispatchers.Pass());
428 return MOJO_RESULT_OK;
431 } // namespace system
432 } // namespace mojo