1 // Copyright 2013 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
5 #include "mojo/system/message_pipe.h"
7 #include "base/logging.h"
8 #include "mojo/system/channel.h"
9 #include "mojo/system/local_message_pipe_endpoint.h"
10 #include "mojo/system/message_in_transit.h"
11 #include "mojo/system/message_pipe_dispatcher.h"
12 #include "mojo/system/message_pipe_endpoint.h"
13 #include "mojo/system/proxy_message_pipe_endpoint.h"
18 MessagePipe::MessagePipe(scoped_ptr
<MessagePipeEndpoint
> endpoint0
,
19 scoped_ptr
<MessagePipeEndpoint
> endpoint1
) {
20 endpoints_
[0].reset(endpoint0
.release());
21 endpoints_
[1].reset(endpoint1
.release());
25 MessagePipe
* MessagePipe::CreateLocalLocal() {
26 return new MessagePipe(
27 scoped_ptr
<MessagePipeEndpoint
>(new LocalMessagePipeEndpoint
),
28 scoped_ptr
<MessagePipeEndpoint
>(new LocalMessagePipeEndpoint
));
32 MessagePipe
* MessagePipe::CreateLocalProxy() {
33 return new MessagePipe(
34 scoped_ptr
<MessagePipeEndpoint
>(new LocalMessagePipeEndpoint
),
35 scoped_ptr
<MessagePipeEndpoint
>(new ProxyMessagePipeEndpoint
));
39 MessagePipe
* MessagePipe::CreateProxyLocal() {
40 return new MessagePipe(
41 scoped_ptr
<MessagePipeEndpoint
>(new ProxyMessagePipeEndpoint
),
42 scoped_ptr
<MessagePipeEndpoint
>(new LocalMessagePipeEndpoint
));
// Given one port of the pipe, presumably yields the port at the opposite end
// (callers in this file store the result as the "destination" port).
// |port| must be 0 or 1; this is enforced by the DCHECK.
// NOTE(review): the statement that produces the return value is not visible in
// this excerpt -- confirm against the complete file.
46 unsigned MessagePipe::GetPeerPort(unsigned port
) {
47 DCHECK(port
== 0 || port
== 1);
51 MessagePipeEndpoint::Type
MessagePipe::GetType(unsigned port
) {
52 DCHECK(port
== 0 || port
== 1);
53 base::AutoLock
locker(lock_
);
54 DCHECK(endpoints_
[port
]);
56 return endpoints_
[port
]->GetType();
59 void MessagePipe::CancelAllWaiters(unsigned port
) {
60 DCHECK(port
== 0 || port
== 1);
62 base::AutoLock
locker(lock_
);
63 DCHECK(endpoints_
[port
]);
64 endpoints_
[port
]->CancelAllWaiters();
67 void MessagePipe::Close(unsigned port
) {
68 DCHECK(port
== 0 || port
== 1);
70 unsigned destination_port
= GetPeerPort(port
);
72 base::AutoLock
locker(lock_
);
73 DCHECK(endpoints_
[port
]);
75 endpoints_
[port
]->Close();
76 if (endpoints_
[destination_port
]) {
77 if (!endpoints_
[destination_port
]->OnPeerClose())
78 endpoints_
[destination_port
].reset();
80 endpoints_
[port
].reset();
// Writes a message on behalf of the handle at |port|: a |MessageInTransit| of
// type |kTypeMessagePipeEndpoint| / subtype |kSubtypeMessagePipeEndpointData|
// is created and handed to |EnqueueMessageInternal()|, whose result is
// returned. |port| must be 0 or 1 (DCHECK).
// NOTE(review): several lines of this definition are not visible in this
// excerpt (the declaration of |port|, at least one further parameter, and the
// remaining call arguments) -- confirm against the complete file.
83 // TODO(vtl): Handle flags.
84 MojoResult
MessagePipe::WriteMessage(
86 UserPointer
<const void> bytes
,
88 std::vector
<DispatcherTransport
>* transports
,
89 MojoWriteMessageFlags flags
) {
90 DCHECK(port
== 0 || port
== 1);
91 return EnqueueMessageInternal(
93 make_scoped_ptr(new MessageInTransit(
94 MessageInTransit::kTypeMessagePipeEndpoint
,
95 MessageInTransit::kSubtypeMessagePipeEndpointData
,
101 MojoResult
MessagePipe::ReadMessage(unsigned port
,
102 UserPointer
<void> bytes
,
103 UserPointer
<uint32_t> num_bytes
,
104 DispatcherVector
* dispatchers
,
105 uint32_t* num_dispatchers
,
106 MojoReadMessageFlags flags
) {
107 DCHECK(port
== 0 || port
== 1);
109 base::AutoLock
locker(lock_
);
110 DCHECK(endpoints_
[port
]);
112 return endpoints_
[port
]->ReadMessage(
113 bytes
, num_bytes
, dispatchers
, num_dispatchers
, flags
);
116 HandleSignalsState
MessagePipe::GetHandleSignalsState(unsigned port
) const {
117 DCHECK(port
== 0 || port
== 1);
119 base::AutoLock
locker(const_cast<base::Lock
&>(lock_
));
120 DCHECK(endpoints_
[port
]);
122 return endpoints_
[port
]->GetHandleSignalsState();
// Registers a waiter on the endpoint at |port|: with |lock_| held, forwards
// |waiter|, |signals|, |context| and |signals_state| to that endpoint's
// |AddWaiter()| and returns its result. |port| must be 0 or 1 and its endpoint
// must still exist (DCHECKs).
// NOTE(review): the parameter lines declaring |waiter| and |context| are not
// visible in this excerpt -- confirm their types against the complete file.
125 MojoResult
MessagePipe::AddWaiter(unsigned port
,
127 MojoHandleSignals signals
,
129 HandleSignalsState
* signals_state
) {
130 DCHECK(port
== 0 || port
== 1);
132 base::AutoLock
locker(lock_
);
133 DCHECK(endpoints_
[port
]);
135 return endpoints_
[port
]->AddWaiter(waiter
, signals
, context
, signals_state
);
// Removes a waiter from the endpoint at |port|: with |lock_| held, forwards
// |waiter| and |signals_state| to that endpoint's |RemoveWaiter()|. |port|
// must be 0 or 1 and its endpoint must still exist (DCHECKs).
// NOTE(review): the parameter line declaring |waiter| is not visible in this
// excerpt -- confirm its type against the complete file.
138 void MessagePipe::RemoveWaiter(unsigned port
,
140 HandleSignalsState
* signals_state
) {
141 DCHECK(port
== 0 || port
== 1);
143 base::AutoLock
locker(lock_
);
144 DCHECK(endpoints_
[port
]);
146 endpoints_
[port
]->RemoveWaiter(waiter
, signals_state
);
// Replaces the local endpoint at |port| with a newly created
// |ProxyMessagePipeEndpoint| constructed from the existing
// |LocalMessagePipeEndpoint|; the two are exchanged via |swap()|, so the old
// endpoint ends up owned by |replacement_endpoint|.
// Preconditions (DCHECKs): |port| is 0 or 1, its endpoint exists and is of type
// |kTypeLocal|, and the peer endpoint, if it still exists, is also |kTypeLocal|.
// NOTE(review): the end of the constructor call (its remaining argument(s)) and
// the last line of the TODO comment are not visible in this excerpt.
149 void MessagePipe::ConvertLocalToProxy(unsigned port
) {
150 DCHECK(port
== 0 || port
== 1);
152 base::AutoLock
locker(lock_
);
153 DCHECK(endpoints_
[port
]);
154 DCHECK_EQ(endpoints_
[port
]->GetType(), MessagePipeEndpoint::kTypeLocal
);
156 bool is_peer_open
= !!endpoints_
[GetPeerPort(port
)];
158 // TODO(vtl): Hopefully this will work if the peer has been closed and when
159 // the peer is local. If the peer is remote, we should do something more
161 DCHECK(!is_peer_open
||
162 endpoints_
[GetPeerPort(port
)]->GetType() ==
163 MessagePipeEndpoint::kTypeLocal
);
165 scoped_ptr
<MessagePipeEndpoint
> replacement_endpoint(
166 new ProxyMessagePipeEndpoint(
167 static_cast<LocalMessagePipeEndpoint
*>(endpoints_
[port
].get()),
169 endpoints_
[port
].swap(replacement_endpoint
);
172 MojoResult
MessagePipe::EnqueueMessage(unsigned port
,
173 scoped_ptr
<MessageInTransit
> message
) {
174 return EnqueueMessageInternal(port
, message
.Pass(), NULL
);
// Attaches the endpoint at |port| to |channel| under the endpoint id
// |local_id|, by calling the endpoint's |Attach()| with |lock_| held.
// Preconditions (DCHECKs): |port| is 0 or 1, |channel| is non-null, |local_id|
// is not |kInvalidEndpointId|, and the endpoint is of type |kTypeProxy|.
// NOTE(review): the statement guarded by the |!endpoints_[port]| check and the
// final return statement are not visible in this excerpt; presumably the
// function returns early when the endpoint is already gone -- confirm against
// the complete file.
177 bool MessagePipe::Attach(unsigned port
,
178 scoped_refptr
<Channel
> channel
,
179 MessageInTransit::EndpointId local_id
) {
180 DCHECK(port
== 0 || port
== 1);
181 DCHECK(channel
.get());
182 DCHECK_NE(local_id
, MessageInTransit::kInvalidEndpointId
);
184 base::AutoLock
locker(lock_
);
185 if (!endpoints_
[port
])
188 DCHECK_EQ(endpoints_
[port
]->GetType(), MessagePipeEndpoint::kTypeProxy
);
189 endpoints_
[port
]->Attach(channel
, local_id
);
193 void MessagePipe::Run(unsigned port
, MessageInTransit::EndpointId remote_id
) {
194 DCHECK(port
== 0 || port
== 1);
195 DCHECK_NE(remote_id
, MessageInTransit::kInvalidEndpointId
);
197 base::AutoLock
locker(lock_
);
198 DCHECK(endpoints_
[port
]);
199 if (!endpoints_
[port
]->Run(remote_id
))
200 endpoints_
[port
].reset();
// Handles removal of the endpoint at |port|: with |lock_| held, calls the
// endpoint's |OnRemove()|, then notifies the peer endpoint (if it still exists)
// via |OnPeerClose()| -- destroying the peer when that returns false -- and
// finally destroys the endpoint at |port|.
// NOTE(review): the statement guarded by the |!endpoints_[port]| check is not
// visible in this excerpt; presumably it is an early return for the case
// described in the comment below -- confirm against the complete file.
203 void MessagePipe::OnRemove(unsigned port
) {
204 unsigned destination_port
= GetPeerPort(port
);
206 base::AutoLock
locker(lock_
);
207 // A |OnPeerClose()| can come in first, before |OnRemove()| gets called.
208 if (!endpoints_
[port
])
211 endpoints_
[port
]->OnRemove();
212 if (endpoints_
[destination_port
]) {
213 if (!endpoints_
[destination_port
]->OnPeerClose())
214 endpoints_
[destination_port
].reset();
216 endpoints_
[port
].reset();
219 MessagePipe::~MessagePipe() {
220 // Owned by the dispatchers. The owning dispatchers should only release us via
221 // their |Close()| method, which should inform us of being closed via our
222 // |Close()|. Thus these should already be null.
223 DCHECK(!endpoints_
[0]);
224 DCHECK(!endpoints_
[1]);
// Common enqueue path. |port| is the destination port here.
//  - A message of type |kTypeMessagePipe| is routed to
//    |HandleControlMessage()| and that result is returned.
//  - Otherwise the message must be of type |kTypeMessagePipeEndpoint|. With
//    |lock_| held: the peer of |port| (the source) must still exist (DCHECK);
//    if the destination endpoint is gone, MOJO_RESULT_FAILED_PRECONDITION is
//    returned; |AttachTransportsNoLock()| is called and its result checked;
//    finally the message is handed to the destination endpoint and
//    MOJO_RESULT_OK is returned.
// NOTE(review): several lines are not visible in this excerpt (the declaration
// of |port|, the statement guarded by |result != MOJO_RESULT_OK|, and possibly
// a condition around the |AttachTransportsNoLock()| call) -- confirm against
// the complete file.
227 MojoResult
MessagePipe::EnqueueMessageInternal(
229 scoped_ptr
<MessageInTransit
> message
,
230 std::vector
<DispatcherTransport
>* transports
) {
231 DCHECK(port
== 0 || port
== 1);
234 if (message
->type() == MessageInTransit::kTypeMessagePipe
) {
236 return HandleControlMessage(port
, message
.Pass());
239 DCHECK_EQ(message
->type(), MessageInTransit::kTypeMessagePipeEndpoint
);
241 base::AutoLock
locker(lock_
);
242 DCHECK(endpoints_
[GetPeerPort(port
)]);
244 // The destination port need not be open, unlike the source port.
245 if (!endpoints_
[port
])
246 return MOJO_RESULT_FAILED_PRECONDITION
;
249 MojoResult result
= AttachTransportsNoLock(port
, message
.get(), transports
);
250 if (result
!= MOJO_RESULT_OK
)
254 // The endpoint's |EnqueueMessage()| may not report failure.
255 endpoints_
[port
]->EnqueueMessage(message
.Pass());
256 return MOJO_RESULT_OK
;
// Validates |transports| and attaches the corresponding dispatchers to
// |message|, which must not already have dispatchers (DCHECK).
//  1. First loop: any message-pipe transport that refers to this pipe causes
//     MOJO_RESULT_INVALID_ARGUMENT to be returned, before anything is modified.
//  2. Second loop: each valid transport is converted with
//     |CreateEquivalentDispatcherAndClose()| and collected; a null dispatcher
//     is collected (with a warning logged) otherwise.
//  3. The collected vector is given to |message| via |SetDispatchers()| and
//     MOJO_RESULT_OK is returned.
// NOTE(review): several lines are not visible in this excerpt (the declaration
// of |port|, the statement guarded by the |!is_valid()| check in the first
// loop, and the |else| that presumably introduces the null-dispatcher branch)
// -- confirm against the complete file.
259 MojoResult
MessagePipe::AttachTransportsNoLock(
261 MessageInTransit
* message
,
262 std::vector
<DispatcherTransport
>* transports
) {
263 DCHECK(!message
->has_dispatchers());
265 // You're not allowed to send either handle to a message pipe over the message
266 // pipe, so check for this. (The case of trying to write a handle to itself is
267 // taken care of by |Core|. That case kind of makes sense, but leads to
268 // complications if, e.g., both sides try to do the same thing with their
269 // respective handles simultaneously. The other case, of trying to write the
270 // peer handle to a handle, doesn't make sense -- since no handle will be
271 // available to read the message from.)
272 for (size_t i
= 0; i
< transports
->size(); i
++) {
273 if (!(*transports
)[i
].is_valid())
275 if ((*transports
)[i
].GetType() == Dispatcher::kTypeMessagePipe
) {
276 MessagePipeDispatcherTransport
mp_transport((*transports
)[i
]);
277 if (mp_transport
.GetMessagePipe() == this) {
278 // The other case should have been disallowed by |Core|. (Note: |port|
279 // is the peer port of the handle given to |WriteMessage()|.)
280 DCHECK_EQ(mp_transport
.GetPort(), port
);
281 return MOJO_RESULT_INVALID_ARGUMENT
;
286 // Clone the dispatchers and attach them to the message. (This must be done as
287 // a separate loop, since we want to leave the dispatchers alone on failure.)
288 scoped_ptr
<DispatcherVector
> dispatchers(new DispatcherVector());
289 dispatchers
->reserve(transports
->size());
290 for (size_t i
= 0; i
< transports
->size(); i
++) {
291 if ((*transports
)[i
].is_valid()) {
292 dispatchers
->push_back(
293 (*transports
)[i
].CreateEquivalentDispatcherAndClose());
295 LOG(WARNING
) << "Enqueueing null dispatcher";
296 dispatchers
->push_back(scoped_refptr
<Dispatcher
>());
299 message
->SetDispatchers(dispatchers
.Pass());
300 return MOJO_RESULT_OK
;
// Handles a control message addressed to the pipe itself. In the code shown,
// no subtype is acted upon: a warning containing the message's subtype is
// logged and MOJO_RESULT_UNKNOWN is returned.
// NOTE(review): the first parameter line is not visible in this excerpt (the
// caller in this file passes a port as the first argument) -- confirm against
// the complete file.
303 MojoResult
MessagePipe::HandleControlMessage(
305 scoped_ptr
<MessageInTransit
> message
) {
306 LOG(WARNING
) << "Unrecognized MessagePipe control message subtype "
307 << message
->subtype();
308 return MOJO_RESULT_UNKNOWN
;
311 } // namespace system