1 // Copyright 2013 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
5 #include "mojo/edk/system/message_pipe.h"
7 #include "base/logging.h"
8 #include "mojo/edk/system/channel.h"
9 #include "mojo/edk/system/channel_endpoint.h"
10 #include "mojo/edk/system/channel_endpoint_id.h"
11 #include "mojo/edk/system/endpoint_relayer.h"
12 #include "mojo/edk/system/local_message_pipe_endpoint.h"
13 #include "mojo/edk/system/message_in_transit.h"
14 #include "mojo/edk/system/message_pipe_dispatcher.h"
15 #include "mojo/edk/system/message_pipe_endpoint.h"
16 #include "mojo/edk/system/proxy_message_pipe_endpoint.h"
// Serialized form of a message pipe endpoint: |EndSerialize()| writes one of
// these into its destination buffer and |Deserialize()| reads it back from its
// source buffer. Its size is also what |StartSerialize()| reports as the
// maximum serialized size.
23 // TODO(vtl): Move this into |Channel| (and possibly further).
24 struct SerializedMessagePipe
{
25 // This is the endpoint ID on the receiving side, and should be a "remote ID".
26 // (The receiving side should already have had an endpoint attached and been
27 // run via the |Channel|s. This endpoint will have both IDs assigned, so this
28 // ID is only needed to associate that endpoint with a particular dispatcher.)
29 ChannelEndpointId receiver_endpoint_id
;
// Factory: allocates a new |MessagePipe| and installs a
// |LocalMessagePipeEndpoint| on each of its two ports (0 and 1).
35 MessagePipe
* MessagePipe::CreateLocalLocal() {
36 MessagePipe
* message_pipe
= new MessagePipe();
37 message_pipe
->endpoints_
[0].reset(new LocalMessagePipeEndpoint());
38 message_pipe
->endpoints_
[1].reset(new LocalMessagePipeEndpoint());
// Factory: allocates a new |MessagePipe| whose port 0 is a
// |LocalMessagePipeEndpoint| and whose port 1 is a |ProxyMessagePipeEndpoint|.
//
// |channel_endpoint| is an out-parameter: it receives the newly created
// |ChannelEndpoint| (constructed with the new pipe and port 1) that backs the
// proxy endpoint. It is expected to be empty on entry.
43 MessagePipe
* MessagePipe::CreateLocalProxy(
44 scoped_refptr
<ChannelEndpoint
>* channel_endpoint
) {
45 DCHECK(!*channel_endpoint
); // Not technically wrong, but unlikely.
46 MessagePipe
* message_pipe
= new MessagePipe();
47 message_pipe
->endpoints_
[0].reset(new LocalMessagePipeEndpoint());
// The channel endpoint must exist before the proxy endpoint, since the proxy
// is constructed from the raw |ChannelEndpoint| pointer.
48 *channel_endpoint
= new ChannelEndpoint(message_pipe
, 1);
49 message_pipe
->endpoints_
[1].reset(
50 new ProxyMessagePipeEndpoint(channel_endpoint
->get()));
// Factory: mirror image of |CreateLocalProxy()|. Port 0 becomes a
// |ProxyMessagePipeEndpoint| and port 1 a |LocalMessagePipeEndpoint|.
//
// |channel_endpoint| is an out-parameter: it receives the newly created
// |ChannelEndpoint| (constructed with the new pipe and port 0) that backs the
// proxy endpoint. It is expected to be empty on entry.
55 MessagePipe
* MessagePipe::CreateProxyLocal(
56 scoped_refptr
<ChannelEndpoint
>* channel_endpoint
) {
57 DCHECK(!*channel_endpoint
); // Not technically wrong, but unlikely.
58 MessagePipe
* message_pipe
= new MessagePipe();
// The channel endpoint must exist before the proxy endpoint, since the proxy
// is constructed from the raw |ChannelEndpoint| pointer.
59 *channel_endpoint
= new ChannelEndpoint(message_pipe
, 0);
60 message_pipe
->endpoints_
[0].reset(
61 new ProxyMessagePipeEndpoint(channel_endpoint
->get()));
62 message_pipe
->endpoints_
[1].reset(new LocalMessagePipeEndpoint());
// Returns the port number of |port|'s peer (callers in this file store the
// result as |peer_port|). |port| must be 0 or 1.
// NOTE(review): only the argument check is visible in this view; the actual
// mapping is not shown -- confirm against the full source.
67 unsigned MessagePipe::GetPeerPort(unsigned port
) {
68 DCHECK(port
== 0 || port
== 1);
// Reconstructs a message pipe from a |SerializedMessagePipe| received over
// |channel|.
//
// The serialized buffer (|source|, of length |size|) must be exactly
// |sizeof(SerializedMessagePipe)| bytes; otherwise an error is logged. The
// pipe is obtained from the channel via |PassIncomingMessagePipe()| using the
// serialized |receiver_endpoint_id| and stored into the out-parameter
// |message_pipe|, which is expected to be empty on entry.
//
// Returns a bool. NOTE(review): the return statements and part of the
// parameter list are not visible in this view; presumably false on the two
// logged error paths and true otherwise -- confirm.
73 bool MessagePipe::Deserialize(Channel
* channel
,
76 scoped_refptr
<MessagePipe
>* message_pipe
,
78 DCHECK(!*message_pipe
); // Not technically wrong, but unlikely.
// Reject buffers of the wrong size before reinterpreting them.
80 if (size
!= sizeof(SerializedMessagePipe
)) {
81 LOG(ERROR
) << "Invalid serialized message pipe";
85 const SerializedMessagePipe
* s
=
86 static_cast<const SerializedMessagePipe
*>(source
);
// Ask the channel for the pipe it already set up for this endpoint ID.
87 *message_pipe
= channel
->PassIncomingMessagePipe(s
->receiver_endpoint_id
);
89 LOG(ERROR
) << "Failed to deserialize message pipe (ID = "
90 << s
->receiver_endpoint_id
<< ")";
94 DVLOG(2) << "Deserializing message pipe dispatcher (new local ID = "
95 << s
->receiver_endpoint_id
<< ")";
// Returns the endpoint type reported by the endpoint currently installed on
// |port| (0 or 1). Takes |lock_| for the duration of the call; the endpoint
// must still exist.
100 MessagePipeEndpoint::Type
MessagePipe::GetType(unsigned port
) {
101 DCHECK(port
== 0 || port
== 1);
102 base::AutoLock
locker(lock_
);
103 DCHECK(endpoints_
[port
]);
105 return endpoints_
[port
]->GetType();
// Forwards to |CancelAllAwakables()| on the endpoint installed on |port|
// (0 or 1), under |lock_|. The endpoint must still exist.
108 void MessagePipe::CancelAllAwakables(unsigned port
) {
109 DCHECK(port
== 0 || port
== 1);
111 base::AutoLock
locker(lock_
);
112 DCHECK(endpoints_
[port
]);
113 endpoints_
[port
]->CancelAllAwakables();
// Closes the endpoint on |port| (0 or 1) and notifies its peer.
//
// Under |lock_|: closes |port|'s endpoint, calls |OnPeerClose()| on the peer
// endpoint if one is still present (destroying the peer when that call
// returns false), and finally destroys |port|'s own endpoint.
116 void MessagePipe::Close(unsigned port
) {
117 DCHECK(port
== 0 || port
== 1);
119 unsigned peer_port
= GetPeerPort(port
);
121 base::AutoLock
locker(lock_
);
122 // The endpoint's |OnPeerClose()| may have been called first and returned
123 // false, which would have resulted in its destruction.
// NOTE(review): the statement guarded by this check is not visible in this
// view (presumably an early return) -- confirm.
124 if (!endpoints_
[port
])
127 endpoints_
[port
]->Close();
// Tell the peer (if any) that this side is gone; a false result means the
// peer endpoint no longer needs to be kept alive.
128 if (endpoints_
[peer_port
]) {
129 if (!endpoints_
[peer_port
]->OnPeerClose())
130 endpoints_
[peer_port
].reset();
132 endpoints_
[port
].reset();
// Writes a message from |port| (0 or 1).
//
// Under |lock_|, wraps |bytes|/|num_bytes| in a new |MessageInTransit| of type
// |kTypeEndpoint| / subtype |kSubtypeEndpointData| and hands it, together with
// |transports|, to |EnqueueMessageNoLock()|, whose result is returned.
// |flags| is currently not acted upon (see the TODO below).
// NOTE(review): some parameter lines and the first and last arguments of the
// |EnqueueMessageNoLock()| call are not visible in this view.
135 // TODO(vtl): Handle flags.
136 MojoResult
MessagePipe::WriteMessage(
138 UserPointer
<const void> bytes
,
140 std::vector
<DispatcherTransport
>* transports
,
141 MojoWriteMessageFlags flags
) {
142 DCHECK(port
== 0 || port
== 1);
144 base::AutoLock
locker(lock_
);
145 return EnqueueMessageNoLock(
147 make_scoped_ptr(new MessageInTransit(
148 MessageInTransit::kTypeEndpoint
,
149 MessageInTransit::kSubtypeEndpointData
, num_bytes
, bytes
)),
// Reads a message from |port| (0 or 1) by forwarding all arguments to
// |ReadMessage()| on the endpoint installed there, under |lock_|. The endpoint
// must still exist. Returns that endpoint's result unchanged.
153 MojoResult
MessagePipe::ReadMessage(unsigned port
,
154 UserPointer
<void> bytes
,
155 UserPointer
<uint32_t> num_bytes
,
156 DispatcherVector
* dispatchers
,
157 uint32_t* num_dispatchers
,
158 MojoReadMessageFlags flags
) {
159 DCHECK(port
== 0 || port
== 1);
161 base::AutoLock
locker(lock_
);
162 DCHECK(endpoints_
[port
]);
164 return endpoints_
[port
]->ReadMessage(bytes
, num_bytes
, dispatchers
,
165 num_dispatchers
, flags
);
// Returns the |HandleSignalsState| reported by the endpoint installed on
// |port| (0 or 1). The endpoint must still exist.
168 HandleSignalsState
MessagePipe::GetHandleSignalsState(unsigned port
) const {
169 DCHECK(port
== 0 || port
== 1);
// This is a const method, so constness is cast away from |lock_| in order to
// acquire it.
171 base::AutoLock
locker(const_cast<base::Lock
&>(lock_
));
172 DCHECK(endpoints_
[port
]);
174 return endpoints_
[port
]->GetHandleSignalsState();
// Registers |awakable| on the endpoint installed on |port| (0 or 1) by
// forwarding |awakable|, |signals| and |context| to the endpoint's
// |AddAwakable()|, under |lock_|. The endpoint must still exist. Returns the
// endpoint's result unchanged.
// NOTE(review): some parameter lines and the tail of the forwarded argument
// list are not visible in this view (presumably |signals_state| is forwarded
// as well -- confirm).
177 MojoResult
MessagePipe::AddAwakable(unsigned port
,
179 MojoHandleSignals signals
,
181 HandleSignalsState
* signals_state
) {
182 DCHECK(port
== 0 || port
== 1);
184 base::AutoLock
locker(lock_
);
185 DCHECK(endpoints_
[port
]);
187 return endpoints_
[port
]->AddAwakable(awakable
, signals
, context
,
// Unregisters |awakable| from the endpoint installed on |port| (0 or 1) by
// forwarding |awakable| and |signals_state| to the endpoint's
// |RemoveAwakable()|, under |lock_|. The endpoint must still exist.
191 void MessagePipe::RemoveAwakable(unsigned port
,
193 HandleSignalsState
* signals_state
) {
194 DCHECK(port
== 0 || port
== 1);
196 base::AutoLock
locker(lock_
);
197 DCHECK(endpoints_
[port
]);
199 endpoints_
[port
]->RemoveAwakable(awakable
, signals_state
);
// Reports the space needed to serialize an endpoint: |*max_size| is set to
// |sizeof(SerializedMessagePipe)| and |*max_platform_handles| to 0. The port
// and channel arguments are unnamed, i.e., unused.
202 void MessagePipe::StartSerialize(unsigned /*port*/,
203 Channel
* /*channel*/,
205 size_t* max_platform_handles
) {
206 *max_size
= sizeof(SerializedMessagePipe
);
207 *max_platform_handles
= 0;
// Serializes the local endpoint on |port| so it can be sent over |channel|.
//
// Under |lock_|, builds a |ChannelEndpoint| that takes |port|'s message queue,
// then closes |port|'s local endpoint and either destroys it, replaces it with
// a proxy endpoint, or hands both sides to an |EndpointRelayer|, depending on
// the state of the peer port (the three cases are described below). The
// channel endpoint is then attached to |channel| via
// |AttachAndRunEndpoint()|; the returned ID is written into |destination| as a
// |SerializedMessagePipe| and |*actual_size| is set to its size. The platform
// handle vector argument is unnamed, i.e., unused.
// NOTE(review): most of the parameter list, the return statement and several
// brace/comment lines are not visible in this view.
210 bool MessagePipe::EndSerialize(
215 embedder::PlatformHandleVector
* /*platform_handles*/) {
216 DCHECK(port
== 0 || port
== 1);
218 scoped_refptr
<ChannelEndpoint
> channel_endpoint
;
220 base::AutoLock
locker(lock_
);
221 DCHECK(endpoints_
[port
]);
223 // The port being serialized must be local.
224 DCHECK_EQ(endpoints_
[port
]->GetType(), MessagePipeEndpoint::kTypeLocal
);
226 // There are three possibilities for the peer port (below). In all cases, we
227 // pass the contents of |port|'s message queue to the channel, and it'll
228 // (presumably) make a |ChannelEndpoint| from it.
230 // 1. The peer port is (known to be) closed.
232 // There's no reason for us to continue to exist and no need for the
233 // channel to give us the |ChannelEndpoint|. It only remains for us to
234 // "close" |port|'s |LocalMessagePipeEndpoint| and prepare for
237 // 2. The peer port is local (the typical case).
239 // The channel gives us back a |ChannelEndpoint|, which we hook up to a
240 // |ProxyMessagePipeEndpoint| to replace |port|'s
241 // |LocalMessagePipeEndpoint|. We continue to exist, since the peer
242 // port's message pipe dispatcher will continue to hold a reference to
245 // 3. The peer port is remote.
247 // We also pass its |ChannelEndpoint| to the channel, which then decides
248 // what to do. We have no reason to continue to exist.
250 // TODO(vtl): Factor some of this out to |ChannelEndpoint| (or |Channel|).
252 unsigned peer_port
= GetPeerPort(port
);
// Case 1: no peer endpoint. The channel endpoint gets no client (null).
253 if (!endpoints_
[peer_port
]) {
255 channel_endpoint
= new ChannelEndpoint(
256 nullptr, 0, static_cast<LocalMessagePipeEndpoint
*>(
257 endpoints_
[port
].get())->message_queue());
258 endpoints_
[port
]->Close();
259 endpoints_
[port
].reset();
// Case 2: local peer. This pipe stays the client of the new channel endpoint
// and |port| becomes a proxy endpoint.
260 } else if (endpoints_
[peer_port
]->GetType() ==
261 MessagePipeEndpoint::kTypeLocal
) {
263 channel_endpoint
= new ChannelEndpoint(
264 this, port
, static_cast<LocalMessagePipeEndpoint
*>(
265 endpoints_
[port
].get())->message_queue());
266 endpoints_
[port
]->Close();
267 endpoints_
[port
].reset(
268 new ProxyMessagePipeEndpoint(channel_endpoint
.get()));
// Case 3: proxy (remote) peer.
271 DLOG(WARNING
) << "Direct message pipe passing across multiple channels "
272 "not yet implemented; will proxy";
274 // Create an |EndpointRelayer| to replace ourselves (rather than having a
275 // |MessagePipe| object that exists solely to relay messages between two
276 // |ChannelEndpoint|s, owned by the |Channel| through them).
278 // This reduces overhead somewhat, and more importantly restores some
279 // invariants, e.g., that |MessagePipe|s are owned by dispatchers.
281 // TODO(vtl): If we get the |Channel| to own/track the relayer directly,
282 // then possibly we could make |ChannelEndpoint|'s |client_| pointer a raw
283 // pointer (and not have the |Channel| owning the relayer via its
284 // |ChannelEndpoint|s).
286 // TODO(vtl): This is not obviously the right place for (all of) this
287 // logic, nor is it obviously factored correctly.
289 DCHECK_EQ(endpoints_
[peer_port
]->GetType(),
290 MessagePipeEndpoint::kTypeProxy
);
291 ProxyMessagePipeEndpoint
* peer_endpoint
=
292 static_cast<ProxyMessagePipeEndpoint
*>(endpoints_
[peer_port
].get());
293 scoped_refptr
<ChannelEndpoint
> peer_channel_endpoint
=
294 peer_endpoint
->ReleaseChannelEndpoint();
296 scoped_refptr
<EndpointRelayer
> relayer(new EndpointRelayer());
297 // We'll assign our peer port's endpoint to the relayer's port 1, and this
298 // port's endpoint to the relayer's port 0.
299 channel_endpoint
= new ChannelEndpoint(
300 relayer
.get(), 0, static_cast<LocalMessagePipeEndpoint
*>(
301 endpoints_
[port
].get())->message_queue());
302 relayer
->Init(channel_endpoint
.get(), peer_channel_endpoint
.get());
303 peer_channel_endpoint
->ReplaceClient(relayer
.get(), 1);
305 endpoints_
[port
]->Close();
306 endpoints_
[port
].reset();
307 // No need to call |Close()| after |ReleaseChannelEndpoint()|.
308 endpoints_
[peer_port
].reset();
312 SerializedMessagePipe
* s
= static_cast<SerializedMessagePipe
*>(destination
);
314 // Convert the local endpoint to a proxy endpoint (moving the message queue)
315 // and attach it to the channel.
316 s
->receiver_endpoint_id
=
317 channel
->AttachAndRunEndpoint(channel_endpoint
, false);
318 DVLOG(2) << "Serializing message pipe (remote ID = "
319 << s
->receiver_endpoint_id
<< ")";
320 *actual_size
= sizeof(SerializedMessagePipe
);
// Handles |message| arriving for |port| by enqueueing it (with no transports)
// on |port|'s peer, under |lock_|. A failed enqueue is only logged as a debug
// warning.
//
// If |port| no longer has an endpoint, the message is rejected (see the
// comment in the body).
// NOTE(review): the return statements are not visible in this view; confirm
// which bool value signals rejection vs. acceptance.
324 bool MessagePipe::OnReadMessage(unsigned port
, MessageInTransit
* message
) {
325 base::AutoLock
locker(lock_
);
327 if (!endpoints_
[port
]) {
328 // This will happen only on the rare occasion that the call to
329 // |OnReadMessage()| is racing with us calling
330 // |ChannelEndpoint::ReplaceClient()|, in which case we reject the message,
331 // and the |ChannelEndpoint| can retry (calling the new client's
332 // |OnReadMessage()|).
336 // This is called when the |ChannelEndpoint| for the
337 // |ProxyMessagePipeEndpoint| |port| receives a message (from the |Channel|).
338 // We need to pass this message on to its peer port (typically a
339 // |LocalMessagePipeEndpoint|).
// |make_scoped_ptr()| takes ownership of the raw |message| pointer here.
340 MojoResult result
= EnqueueMessageNoLock(GetPeerPort(port
),
341 make_scoped_ptr(message
), nullptr);
342 DLOG_IF(WARNING
, result
!= MOJO_RESULT_OK
)
343 << "EnqueueMessageNoLock() failed (result = " << result
<< ")";
// Presumably the notification that |port|'s endpoint has been detached from
// its channel.
// NOTE(review): the body is not visible in this view -- confirm against the
// full source.
347 void MessagePipe::OnDetachFromChannel(unsigned port
) {
// Constructor; takes no arguments. The endpoints are installed afterwards by
// the |Create*()| factory functions in this file.
// NOTE(review): the body is not visible in this view.
351 MessagePipe::MessagePipe() {
// Destructor: DCHECKs that both endpoints have already been released.
354 MessagePipe::~MessagePipe() {
355 // Owned by the dispatchers. The owning dispatchers should only release us via
356 // their |Close()| method, which should inform us of being closed via our
357 // |Close()|. Thus these should already be null.
358 DCHECK(!endpoints_
[0]);
359 DCHECK(!endpoints_
[1]);
// Enqueues |message| on the endpoint installed on destination |port| (0 or 1).
// The callers in this file hold |lock_| when calling this.
//
// |message| must be of type |kTypeEndpoint|, and the source side (|port|'s
// peer) must still have an endpoint. Returns
// |MOJO_RESULT_FAILED_PRECONDITION| if the destination endpoint is gone;
// otherwise attaches |transports| to the message via
// |AttachTransportsNoLock()|, passes ownership of the message to the
// endpoint's |EnqueueMessage()| and returns |MOJO_RESULT_OK|.
// NOTE(review): the |port| parameter line, the statement taken when attaching
// transports fails (presumably returning |result|), and any guard around the
// attach call are not visible in this view -- confirm.
362 MojoResult
MessagePipe::EnqueueMessageNoLock(
364 scoped_ptr
<MessageInTransit
> message
,
365 std::vector
<DispatcherTransport
>* transports
) {
366 DCHECK(port
== 0 || port
== 1);
369 DCHECK_EQ(message
->type(), MessageInTransit::kTypeEndpoint
);
370 DCHECK(endpoints_
[GetPeerPort(port
)]);
372 // The destination port need not be open, unlike the source port.
373 if (!endpoints_
[port
])
374 return MOJO_RESULT_FAILED_PRECONDITION
;
377 MojoResult result
= AttachTransportsNoLock(port
, message
.get(), transports
);
378 if (result
!= MOJO_RESULT_OK
)
382 // The endpoint's |EnqueueMessage()| may not report failure.
383 endpoints_
[port
]->EnqueueMessage(message
.Pass());
384 return MOJO_RESULT_OK
;
// Converts |transports| into dispatchers and attaches them to |message|, which
// must not already have dispatchers.
//
// Works in two passes so that nothing is modified if validation fails:
//   1. Validation: returns |MOJO_RESULT_INVALID_ARGUMENT| if any valid
//      transport is a message pipe dispatcher that refers to this very
//      |MessagePipe|.
//   2. Conversion: each valid transport is replaced by the result of
//      |CreateEquivalentDispatcherAndClose()|; invalid transports are recorded
//      as null dispatchers (with a warning).
// Returns |MOJO_RESULT_OK| once the dispatchers are set on |message|.
// NOTE(review): the |port| parameter line and some brace/else/continue lines
// are not visible in this view.
387 MojoResult
MessagePipe::AttachTransportsNoLock(
389 MessageInTransit
* message
,
390 std::vector
<DispatcherTransport
>* transports
) {
391 DCHECK(!message
->has_dispatchers());
393 // You're not allowed to send either handle to a message pipe over the message
394 // pipe, so check for this. (The case of trying to write a handle to itself is
395 // taken care of by |Core|. That case kind of makes sense, but leads to
396 // complications if, e.g., both sides try to do the same thing with their
397 // respective handles simultaneously. The other case, of trying to write the
398 // peer handle to a handle, doesn't make sense -- since no handle will be
399 // available to read the message from.)
400 for (size_t i
= 0; i
< transports
->size(); i
++) {
// Invalid transports are not inspected in this pass; the second loop turns
// them into null dispatchers.
401 if (!(*transports
)[i
].is_valid())
403 if ((*transports
)[i
].GetType() == Dispatcher::kTypeMessagePipe
) {
404 MessagePipeDispatcherTransport
mp_transport((*transports
)[i
]);
405 if (mp_transport
.GetMessagePipe() == this) {
406 // The other case should have been disallowed by |Core|. (Note: |port|
407 // is the peer port of the handle given to |WriteMessage()|.)
408 DCHECK_EQ(mp_transport
.GetPort(), port
);
409 return MOJO_RESULT_INVALID_ARGUMENT
;
414 // Clone the dispatchers and attach them to the message. (This must be done as
415 // a separate loop, since we want to leave the dispatchers alone on failure.)
416 scoped_ptr
<DispatcherVector
> dispatchers(new DispatcherVector());
417 dispatchers
->reserve(transports
->size());
418 for (size_t i
= 0; i
< transports
->size(); i
++) {
419 if ((*transports
)[i
].is_valid()) {
420 dispatchers
->push_back(
421 (*transports
)[i
].CreateEquivalentDispatcherAndClose());
423 LOG(WARNING
) << "Enqueueing null dispatcher";
424 dispatchers
->push_back(nullptr);
// Ownership of the dispatcher vector passes to |message|.
427 message
->SetDispatchers(dispatchers
.Pass());
428 return MOJO_RESULT_OK
;
431 } // namespace system