1 // Copyright 2013 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
5 #include "mojo/system/message_pipe_dispatcher.h"
7 #include "base/logging.h"
8 #include "mojo/system/channel.h"
9 #include "mojo/system/constants.h"
10 #include "mojo/system/local_message_pipe_endpoint.h"
11 #include "mojo/system/memory.h"
12 #include "mojo/system/message_in_transit.h"
13 #include "mojo/system/message_pipe.h"
14 #include "mojo/system/options_validation.h"
15 #include "mojo/system/proxy_message_pipe_endpoint.h"
// Sentinel meaning "no port": the value obtained by casting -1 to |unsigned|.
// |port_| holds this value at construction and again after the dispatcher gives
// up its message pipe.
22 const unsigned kInvalidPort
= static_cast<unsigned>(-1);
// Serialized form of a message pipe dispatcher. Deserialize() requires the
// incoming buffer to be exactly sizeof this struct, and
// EndSerializeAndCloseImplNoLock() assigns |endpoint_id| in the outgoing buffer.
24 struct SerializedMessagePipeDispatcher
{
// Endpoint ID; Deserialize() reads this back and treats it as the remote ID.
25 MessageInTransit::EndpointId endpoint_id
;
30 // MessagePipeDispatcher -------------------------------------------------------
// Options used when the caller supplies none. The first member is initialized
// to the size of the options struct, followed by
// MOJO_CREATE_MESSAGE_PIPE_OPTIONS_FLAG_NONE. ValidateCreateOptions() copies
// this into its output before applying any caller-provided values.
33 const MojoCreateMessagePipeOptions
34 MessagePipeDispatcher::kDefaultCreateOptions
= {
35 static_cast<uint32_t>(sizeof(MojoCreateMessagePipeOptions
)),
36 MOJO_CREATE_MESSAGE_PIPE_OPTIONS_FLAG_NONE
};
// Constructs a dispatcher that is not yet attached to a port: |port_| starts
// out as |kInvalidPort|. The options argument is accepted but unused (its name
// is commented out in the signature).
38 MessagePipeDispatcher::MessagePipeDispatcher(
39 const MojoCreateMessagePipeOptions
& /*validated_options*/)
40 : port_(kInvalidPort
) {
// Validates caller-supplied creation options and writes the effective options
// to |*out_options|.
//
// |*out_options| always starts as a copy of |kDefaultCreateOptions|; its |flags|
// member is then overwritten from |in_options| if that member is present and
// contains only known bits.
//
// Returns:
//   MOJO_RESULT_OK               - |in_options| is null; or the reader is valid
//                                  and either OPTIONS_STRUCT_HAS_MEMBER reports
//                                  |flags| as absent or |flags| has no unknown
//                                  bits.
//   MOJO_RESULT_INVALID_ARGUMENT - the options reader reports !is_valid().
//   MOJO_RESULT_UNIMPLEMENTED    - |flags| has a bit outside |kKnownFlags|.
44 MojoResult
MessagePipeDispatcher::ValidateCreateOptions(
45 UserPointer
<const MojoCreateMessagePipeOptions
> in_options
,
46 MojoCreateMessagePipeOptions
* out_options
) {
// The set of flag bits this implementation accepts; anything else is rejected
// below as unimplemented.
47 const MojoCreateMessagePipeOptionsFlags kKnownFlags
=
48 MOJO_CREATE_MESSAGE_PIPE_OPTIONS_FLAG_NONE
;
// Start from the defaults so every early "OK" return leaves a usable result.
50 *out_options
= kDefaultCreateOptions
;
51 if (in_options
.IsNull())
52 return MOJO_RESULT_OK
;
54 UserOptionsReader
<MojoCreateMessagePipeOptions
> reader(in_options
);
55 if (!reader
.is_valid())
56 return MOJO_RESULT_INVALID_ARGUMENT
;
// No |flags| member in the caller's struct: keep the default flags.
58 if (!OPTIONS_STRUCT_HAS_MEMBER(MojoCreateMessagePipeOptions
, flags
, reader
))
59 return MOJO_RESULT_OK
;
// Reject any bit that is not in |kKnownFlags|.
60 if ((reader
.options().flags
& ~kKnownFlags
))
61 return MOJO_RESULT_UNIMPLEMENTED
;
62 out_options
->flags
= reader
.options().flags
;
64 // Checks for fields beyond |flags|:
66 // (Nothing here yet.)
68 return MOJO_RESULT_OK
;
// Attaches |message_pipe| to this dispatcher by storing it in |message_pipe_|.
// DCHECKs that the pipe is non-null and that |port| is 0 or 1.
// NOTE(review): the |port| parameter declaration and any store into |port_| are
// not visible in this copy of the file — confirm against the full source.
71 void MessagePipeDispatcher::Init(scoped_refptr
<MessagePipe
> message_pipe
,
73 DCHECK(message_pipe
.get());
74 DCHECK(port
== 0 || port
== 1);
76 message_pipe_
= message_pipe
;
// Identifies this dispatcher's type; always returns |kTypeMessagePipe|.
80 Dispatcher::Type
MessagePipeDispatcher::GetType() const {
81 return kTypeMessagePipe
;
// Creates a message pipe via MessagePipe::CreateLocalProxy() together with a
// default-options dispatcher initialized on port 0 of that pipe.
// Returns the pair (dispatcher, message pipe).
85 std::pair
<scoped_refptr
<MessagePipeDispatcher
>, scoped_refptr
<MessagePipe
> >
86 MessagePipeDispatcher::CreateRemoteMessagePipe() {
87 scoped_refptr
<MessagePipe
> message_pipe(MessagePipe::CreateLocalProxy());
88 scoped_refptr
<MessagePipeDispatcher
> dispatcher(
89 new MessagePipeDispatcher(MessagePipeDispatcher::kDefaultCreateOptions
));
// The dispatcher takes port 0; Deserialize() attaches port 1 to the channel.
90 dispatcher
->Init(message_pipe
, 0);
92 return std::make_pair(dispatcher
, message_pipe
);
// Reconstructs a dispatcher from the serialized record at |source|.
//
// Returns a null scoped_refptr when |size| does not equal
// sizeof(SerializedMessagePipeDispatcher), when the recorded endpoint ID is
// MessageInTransit::kInvalidEndpointId, when attaching to |channel| yields
// kInvalidEndpointId, or when RunMessagePipeEndpoint() fails. Otherwise returns
// the dispatcher half of a pipe obtained from CreateRemoteMessagePipe().
// NOTE(review): the parameter list is not visible in this copy; |channel|,
// |source| and |size| are named here as the body uses them.
96 scoped_refptr
<MessagePipeDispatcher
> MessagePipeDispatcher::Deserialize(
// The record has a fixed size; anything else is rejected outright.
100 if (size
!= sizeof(SerializedMessagePipeDispatcher
)) {
101 LOG(ERROR
) << "Invalid serialized message pipe dispatcher";
102 return scoped_refptr
<MessagePipeDispatcher
>();
// |first| is the dispatcher returned to the caller; |second| is the pipe whose
// port 1 gets attached to |channel| below.
105 std::pair
<scoped_refptr
<MessagePipeDispatcher
>, scoped_refptr
<MessagePipe
> >
106 remote_message_pipe
= CreateRemoteMessagePipe();
108 MessageInTransit::EndpointId remote_id
=
109 static_cast<const SerializedMessagePipeDispatcher
*>(source
)->endpoint_id
;
110 if (remote_id
== MessageInTransit::kInvalidEndpointId
) {
111 // This means that the other end was closed, and there were no messages
113 // TODO(vtl): This is wrong. We should produce a "dead" message pipe
116 return scoped_refptr
<MessagePipeDispatcher
>();
118 MessageInTransit::EndpointId local_id
=
119 channel
->AttachMessagePipeEndpoint(remote_message_pipe
.second
, 1);
120 if (local_id
== MessageInTransit::kInvalidEndpointId
) {
121 LOG(ERROR
) << "Failed to deserialize message pipe dispatcher (failed to "
122 "attach; remote ID = " << remote_id
<< ")";
123 return scoped_refptr
<MessagePipeDispatcher
>();
125 DVLOG(2) << "Deserializing message pipe dispatcher (remote ID = " << remote_id
126 << ", new local ID = " << local_id
<< ")";
128 if (!channel
->RunMessagePipeEndpoint(local_id
, remote_id
)) {
129 // In general, this shouldn't fail, since we generated |local_id| locally.
131 return scoped_refptr
<MessagePipeDispatcher
>();
134 // TODO(vtl): FIXME -- Need some error handling here.
// The result of RunRemoteMessagePipeEndpoint() is not checked (see TODO above).
135 channel
->RunRemoteMessagePipeEndpoint(local_id
, remote_id
);
136 return remote_message_pipe
.first
;
// Destructor. DCHECKs that |message_pipe_| has already been cleared, i.e. that
// the dispatcher was closed (or otherwise gave up its pipe) before destruction.
139 MessagePipeDispatcher::~MessagePipeDispatcher() {
140 // |Close()|/|CloseImplNoLock()| should have taken care of the pipe.
141 DCHECK(!message_pipe_
.get());
// Returns the raw pointer held by |message_pipe_|. Asserts that the
// dispatcher's lock is already held by the caller.
144 MessagePipe
* MessagePipeDispatcher::GetMessagePipeNoLock() const {
145 lock().AssertAcquired();
146 return message_pipe_
.get();
// Port accessor. Asserts that the dispatcher's lock is already held.
// NOTE(review): the return statement is not visible in this copy — presumably
// it returns |port_|; confirm against the full source.
149 unsigned MessagePipeDispatcher::GetPortNoLock() const {
150 lock().AssertAcquired();
// Forwards to MessagePipe::CancelAllWaiters() for this dispatcher's port.
// Asserts that the dispatcher's lock is already held.
154 void MessagePipeDispatcher::CancelAllWaitersNoLock() {
155 lock().AssertAcquired();
156 message_pipe_
->CancelAllWaiters(port_
);
// Closes this dispatcher's port on the message pipe, then drops the pipe
// reference and resets |port_| to |kInvalidPort| (the state the destructor
// DCHECKs for). Asserts that the dispatcher's lock is already held.
159 void MessagePipeDispatcher::CloseImplNoLock() {
160 lock().AssertAcquired();
161 message_pipe_
->Close(port_
);
162 message_pipe_
= NULL
;
163 port_
= kInvalidPort
;
// Creates a new MessagePipeDispatcher initialized with this dispatcher's pipe
// and port, then clears |message_pipe_| and resets |port_| on this one without
// calling Close() on the pipe. Returns the new dispatcher as a
// scoped_refptr<Dispatcher>. Asserts that the dispatcher's lock is already held.
166 scoped_refptr
<Dispatcher
>
167 MessagePipeDispatcher::CreateEquivalentDispatcherAndCloseImplNoLock() {
168 lock().AssertAcquired();
170 // TODO(vtl): Currently, there are no options, so we just use
171 // |kDefaultCreateOptions|. Eventually, we'll have to duplicate the options
173 scoped_refptr
<MessagePipeDispatcher
> rv
=
174 new MessagePipeDispatcher(kDefaultCreateOptions
);
// Hand the pipe and port to the replacement before clearing our own copies.
175 rv
->Init(message_pipe_
, port_
);
176 message_pipe_
= NULL
;
177 port_
= kInvalidPort
;
178 return scoped_refptr
<Dispatcher
>(rv
.get());
// Writes a message to this dispatcher's port on the message pipe.
//
// DCHECKs that |transports| is either null or holds between 1 and
// |kMaxMessageNumHandles| entries, and asserts that the lock is held.
// Returns MOJO_RESULT_RESOURCE_EXHAUSTED if |num_bytes| exceeds
// |kMaxMessageNumBytes|; otherwise returns whatever
// MessagePipe::WriteMessage() returns.
// NOTE(review): the |num_bytes| parameter declaration is not visible in this
// copy of the file.
181 MojoResult
MessagePipeDispatcher::WriteMessageImplNoLock(
182 UserPointer
<const void> bytes
,
184 std::vector
<DispatcherTransport
>* transports
,
185 MojoWriteMessageFlags flags
) {
186 DCHECK(!transports
|| (transports
->size() > 0 &&
187 transports
->size() <= kMaxMessageNumHandles
));
189 lock().AssertAcquired();
// Size limit is enforced here, before the pipe is involved.
191 if (num_bytes
> kMaxMessageNumBytes
)
192 return MOJO_RESULT_RESOURCE_EXHAUSTED
;
194 return message_pipe_
->WriteMessage(
195 port_
, bytes
, num_bytes
, transports
, flags
);
// Reads a message from this dispatcher's port by forwarding all arguments,
// unchanged, to MessagePipe::ReadMessage() and returning its result.
// Asserts that the dispatcher's lock is already held.
198 MojoResult
MessagePipeDispatcher::ReadMessageImplNoLock(
199 UserPointer
<void> bytes
,
200 UserPointer
<uint32_t> num_bytes
,
201 DispatcherVector
* dispatchers
,
202 uint32_t* num_dispatchers
,
203 MojoReadMessageFlags flags
) {
204 lock().AssertAcquired();
205 return message_pipe_
->ReadMessage(
206 port_
, bytes
, num_bytes
, dispatchers
, num_dispatchers
, flags
);
// Returns the result of MessagePipe::GetHandleSignalsState() for this
// dispatcher's port. Asserts that the dispatcher's lock is already held.
209 HandleSignalsState
MessagePipeDispatcher::GetHandleSignalsStateImplNoLock()
211 lock().AssertAcquired();
212 return message_pipe_
->GetHandleSignalsState(port_
);
// Registers a waiter on this dispatcher's port by forwarding to
// MessagePipe::AddWaiter() and returning its result. Asserts that the
// dispatcher's lock is already held.
// NOTE(review): the |waiter| and |context| parameter declarations are not
// visible in this copy of the file.
215 MojoResult
MessagePipeDispatcher::AddWaiterImplNoLock(
217 MojoHandleSignals signals
,
219 HandleSignalsState
* signals_state
) {
220 lock().AssertAcquired();
221 return message_pipe_
->AddWaiter(
222 port_
, waiter
, signals
, context
, signals_state
);
// Removes a waiter from this dispatcher's port by forwarding to
// MessagePipe::RemoveWaiter(). Asserts that the dispatcher's lock is already
// held.
// NOTE(review): the |waiter| parameter declaration is not visible in this copy
// of the file.
225 void MessagePipeDispatcher::RemoveWaiterImplNoLock(
227 HandleSignalsState
* signals_state
) {
228 lock().AssertAcquired();
229 message_pipe_
->RemoveWaiter(port_
, waiter
, signals_state
);
// Reports the space serialization will need: |*max_size| is set to
// sizeof(SerializedMessagePipeDispatcher) and |*max_platform_handles| to 0.
// The channel argument is unused (its name is commented out). DCHECKs that this
// object has exactly one reference instead of asserting the lock.
232 void MessagePipeDispatcher::StartSerializeImplNoLock(
233 Channel
* /*channel*/,
235 size_t* max_platform_handles
) {
236 DCHECK(HasOneRef()); // Only one ref => no need to take the lock.
237 *max_size
= sizeof(SerializedMessagePipeDispatcher
);
238 *max_platform_handles
= 0;
// Serializes this dispatcher for transfer over |channel| and gives up the pipe.
//
// Converts this dispatcher's endpoint from local to proxy, attaches it to
// |channel|, assigns |endpoint_id| in the SerializedMessagePipeDispatcher at
// |destination|, clears |message_pipe_| and |port_|, and reports
// sizeof(SerializedMessagePipeDispatcher) through |*actual_size|.
// The platform-handles argument is unused (its name is commented out).
// NOTE(review): most of the parameter list, the value assigned to
// |endpoint_id| and the return statement are not visible in this copy —
// presumably the assigned value is the ID returned by
// AttachMessagePipeEndpoint(); confirm against the full source.
241 bool MessagePipeDispatcher::EndSerializeAndCloseImplNoLock(
245 embedder::PlatformHandleVector
* /*platform_handles*/) {
246 DCHECK(HasOneRef()); // Only one ref => no need to take the lock.
248 // Convert the local endpoint to a proxy endpoint (moving the message queue).
249 message_pipe_
->ConvertLocalToProxy(port_
);
251 // Attach the new proxy endpoint to the channel.
252 MessageInTransit::EndpointId endpoint_id
=
253 channel
->AttachMessagePipeEndpoint(message_pipe_
, port_
);
254 // Note: It's okay to get an endpoint ID of |kInvalidEndpointId|. (It's
255 // possible that the other endpoint -- the one that we're not sending -- was
256 // closed in the intervening time.) In that case, we need to deserialize a
257 // "dead" message pipe dispatcher on the other end. (Note that this is
258 // different from just producing |MOJO_HANDLE_INVALID|.)
259 DVLOG(2) << "Serializing message pipe dispatcher (local ID = " << endpoint_id
262 // We now have a local ID. Before we can run the proxy endpoint, we need to
263 // get an ack back from the other side with the remote ID.
264 static_cast<SerializedMessagePipeDispatcher
*>(destination
)->endpoint_id
=
// This dispatcher no longer owns the pipe: same cleared state as after
// CloseImplNoLock(), but without calling Close() on the pipe.
267 message_pipe_
= NULL
;
268 port_
= kInvalidPort
;
270 *actual_size
= sizeof(SerializedMessagePipeDispatcher
);
274 // MessagePipeDispatcherTransport ----------------------------------------------
// Wraps an existing DispatcherTransport by copy-constructing the base from
// |transport|. DCHECKs that the dispatcher returned by
// message_pipe_dispatcher() reports Dispatcher::kTypeMessagePipe.
276 MessagePipeDispatcherTransport::MessagePipeDispatcherTransport(
277 DispatcherTransport transport
)
278 : DispatcherTransport(transport
) {
279 DCHECK_EQ(message_pipe_dispatcher()->GetType(), Dispatcher::kTypeMessagePipe
);
282 } // namespace system