1 // Copyright 2013 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
5 #include "mojo/system/message_pipe.h"
7 #include "base/logging.h"
8 #include "mojo/system/channel.h"
9 #include "mojo/system/local_message_pipe_endpoint.h"
10 #include "mojo/system/message_in_transit.h"
11 #include "mojo/system/message_pipe_dispatcher.h"
12 #include "mojo/system/message_pipe_endpoint.h"
13 #include "mojo/system/proxy_message_pipe_endpoint.h"
// Builds a pipe from two caller-supplied endpoints. Each argument is released
// and stored into the matching |endpoints_| slot (|endpoint0| -> index 0,
// |endpoint1| -> index 1), so the pipe holds both endpoints afterwards.
18 MessagePipe::MessagePipe(scoped_ptr
<MessagePipeEndpoint
> endpoint0
,
19 scoped_ptr
<MessagePipeEndpoint
> endpoint1
) {
20 endpoints_
[0].reset(endpoint0
.release());
21 endpoints_
[1].reset(endpoint1
.release());
// Default constructor: fills both |endpoints_| slots with freshly allocated
// |LocalMessagePipeEndpoint| objects.
24 MessagePipe::MessagePipe() {
25 endpoints_
[0].reset(new LocalMessagePipeEndpoint());
26 endpoints_
[1].reset(new LocalMessagePipeEndpoint());
// Maps a port index to the index of its peer. |port| must be 0 or 1 (checked
// with a DCHECK).
// NOTE(review): the return statement is not visible in this extract. Callers
// in this file use the result to index |endpoints_| for the "destination"/peer
// endpoint, so presumably it yields the other of {0, 1} -- confirm.
30 unsigned MessagePipe::GetPeerPort(unsigned port
) {
31 DCHECK(port
== 0 || port
== 1);
// Returns the type reported by the endpoint stored at |port|.
// |port| must be 0 or 1 and the endpoint at that index must still be present
// (both are DCHECKed). The lookup is done while holding |lock_|.
35 MessagePipeEndpoint::Type
MessagePipe::GetType(unsigned port
) {
36 DCHECK(port
== 0 || port
== 1);
37 base::AutoLock
locker(lock_
);
38 DCHECK(endpoints_
[port
]);
40 return endpoints_
[port
]->GetType();
// Forwards to |CancelAllWaiters()| on the endpoint stored at |port|.
// |port| must be 0 or 1 and the endpoint at that index must still be present
// (both are DCHECKed). The call is made while holding |lock_|.
43 void MessagePipe::CancelAllWaiters(unsigned port
) {
44 DCHECK(port
== 0 || port
== 1);
46 base::AutoLock
locker(lock_
);
47 DCHECK(endpoints_
[port
]);
48 endpoints_
[port
]->CancelAllWaiters();
// Closes the endpoint at |port| and tells the peer endpoint (if it is still
// present) that its peer has closed.
// |port| must be 0 or 1 and its endpoint must still be present (DCHECKed).
// Sequence, all under |lock_|:
//   1. |Close()| is called on the endpoint at |port|.
//   2. If the peer slot is non-null, |OnPeerClose()| is called on it; when that
//      returns false the peer slot is reset as well.
//   3. The slot for |port| is reset.
51 void MessagePipe::Close(unsigned port
) {
52 DCHECK(port
== 0 || port
== 1);
// The peer index is computed before |lock_| is acquired below.
54 unsigned destination_port
= GetPeerPort(port
);
56 base::AutoLock
locker(lock_
);
57 DCHECK(endpoints_
[port
]);
59 endpoints_
[port
]->Close();
60 if (endpoints_
[destination_port
]) {
// A false return from |OnPeerClose()| causes the peer slot to be reset too.
61 if (!endpoints_
[destination_port
]->OnPeerClose())
62 endpoints_
[destination_port
].reset();
64 endpoints_
[port
].reset();
// Wraps the outgoing data in a newly allocated |MessageInTransit| (type
// |kTypeMessagePipeEndpoint|, subtype |kSubtypeMessagePipeEndpointData|) and
// returns whatever |EnqueueMessageInternal()| returns for it.
// |port| must be 0 or 1 (DCHECKed).
// NOTE(review): several parameter lines and call-argument lines are missing
// from this extract, so which port the message is enqueued on and how
// |transports| and |flags| are used cannot be confirmed from here.
67 // TODO(vtl): Handle flags.
68 MojoResult
MessagePipe::WriteMessage(
72 std::vector
<DispatcherTransport
>* transports
,
73 MojoWriteMessageFlags flags
) {
74 DCHECK(port
== 0 || port
== 1);
75 return EnqueueMessageInternal(
77 make_scoped_ptr(new MessageInTransit(
78 MessageInTransit::kTypeMessagePipeEndpoint
,
79 MessageInTransit::kSubtypeMessagePipeEndpointData
,
// Forwards a read request to the endpoint stored at |port| and returns that
// endpoint's |ReadMessage()| result unchanged. The arguments passed through
// are |bytes|, |num_bytes|, |dispatchers|, |num_dispatchers| and |flags|.
// |port| must be 0 or 1 and its endpoint must still be present (DCHECKed).
// The call is made while holding |lock_|.
// NOTE(review): the declarations of |bytes| and |num_bytes| are not visible in
// this extract.
85 MojoResult
MessagePipe::ReadMessage(unsigned port
,
88 DispatcherVector
* dispatchers
,
89 uint32_t* num_dispatchers
,
90 MojoReadMessageFlags flags
) {
91 DCHECK(port
== 0 || port
== 1);
93 base::AutoLock
locker(lock_
);
94 DCHECK(endpoints_
[port
]);
96 return endpoints_
[port
]->ReadMessage(bytes
, num_bytes
, dispatchers
,
97 num_dispatchers
, flags
);
// Forwards to |AddWaiter(waiter, flags, wake_result)| on the endpoint stored at
// |port| and returns that call's result unchanged.
// |port| must be 0 or 1 and its endpoint must still be present (DCHECKed).
// The call is made while holding |lock_|.
// NOTE(review): the declarations of |waiter| and |flags| are not visible in
// this extract.
100 MojoResult
MessagePipe::AddWaiter(unsigned port
,
103 MojoResult wake_result
) {
104 DCHECK(port
== 0 || port
== 1);
106 base::AutoLock
locker(lock_
);
107 DCHECK(endpoints_
[port
]);
109 return endpoints_
[port
]->AddWaiter(waiter
, flags
, wake_result
);
// Forwards to |RemoveWaiter(waiter)| on the endpoint stored at |port|.
// |port| must be 0 or 1 and its endpoint must still be present (DCHECKed).
// The call is made while holding |lock_|.
112 void MessagePipe::RemoveWaiter(unsigned port
, Waiter
* waiter
) {
113 DCHECK(port
== 0 || port
== 1);
115 base::AutoLock
locker(lock_
);
116 DCHECK(endpoints_
[port
]);
118 endpoints_
[port
]->RemoveWaiter(waiter
);
// Replaces the local endpoint at |port| with a |ProxyMessagePipeEndpoint| that
// is constructed from that local endpoint.
// Preconditions (all DCHECKed): |port| is 0 or 1, the endpoint at |port| is
// present and reports |kTypeLocal|, and the peer endpoint -- if still present
// -- also reports |kTypeLocal|. Runs under |lock_|.
// NOTE(review): the remaining constructor arguments of the proxy endpoint are
// not visible in this extract.
121 void MessagePipe::ConvertLocalToProxy(unsigned port
) {
122 DCHECK(port
== 0 || port
== 1);
124 base::AutoLock
locker(lock_
);
125 DCHECK(endpoints_
[port
]);
126 DCHECK_EQ(endpoints_
[port
]->GetType(), MessagePipeEndpoint::kTypeLocal
);
// True when the peer's slot is still non-null.
128 bool is_peer_open
= !!endpoints_
[GetPeerPort(port
)];
130 // TODO(vtl): Hopefully this will work if the peer has been closed and when
131 // the peer is local. If the peer is remote, we should do something more
133 DCHECK(!is_peer_open
||
134 endpoints_
[GetPeerPort(port
)]->GetType() ==
135 MessagePipeEndpoint::kTypeLocal
);
// The existing endpoint was DCHECKed to be local above, hence the downcast.
137 scoped_ptr
<MessagePipeEndpoint
> replacement_endpoint(
138 new ProxyMessagePipeEndpoint(
139 static_cast<LocalMessagePipeEndpoint
*>(endpoints_
[port
].get()),
// After the swap the slot holds the proxy and |replacement_endpoint| holds the
// former local endpoint.
141 endpoints_
[port
].swap(replacement_endpoint
);
// Enqueues an already-built |message| by delegating to
// |EnqueueMessageInternal()| with |port|, the message (ownership passed on via
// |Pass()|) and NULL for the transports argument. Returns the delegate's
// result unchanged.
144 MojoResult
MessagePipe::EnqueueMessage(
146 scoped_ptr
<MessageInTransit
> message
) {
147 return EnqueueMessageInternal(port
, message
.Pass(), NULL
);
// Attaches the endpoint at |port| to |channel| under the identifier |local_id|
// by calling the endpoint's |Attach(channel, local_id)|.
// |port| must be 0 or 1 and |local_id| must not be |kInvalidEndpointId| (both
// DCHECKed). Under |lock_|, the endpoint slot is first tested for null; when
// present, the endpoint is DCHECKed to be of type |kTypeProxy|.
// NOTE(review): the statement guarded by the null check and the function's
// return values are not visible in this extract.
150 bool MessagePipe::Attach(unsigned port
,
151 scoped_refptr
<Channel
> channel
,
152 MessageInTransit::EndpointId local_id
) {
153 DCHECK(port
== 0 || port
== 1);
155 DCHECK_NE(local_id
, MessageInTransit::kInvalidEndpointId
);
157 base::AutoLock
locker(lock_
);
158 if (!endpoints_
[port
])
161 DCHECK_EQ(endpoints_
[port
]->GetType(), MessagePipeEndpoint::kTypeProxy
);
162 endpoints_
[port
]->Attach(channel
, local_id
);
// Calls |Run(remote_id)| on the endpoint stored at |port|; if that call
// returns false, the endpoint slot is reset.
// |port| must be 0 or 1, |remote_id| must not be |kInvalidEndpointId|, and the
// endpoint at |port| must still be present (all DCHECKed). Runs under |lock_|.
166 void MessagePipe::Run(unsigned port
, MessageInTransit::EndpointId remote_id
) {
167 DCHECK(port
== 0 || port
== 1);
168 DCHECK_NE(remote_id
, MessageInTransit::kInvalidEndpointId
);
170 base::AutoLock
locker(lock_
);
171 DCHECK(endpoints_
[port
]);
172 if (!endpoints_
[port
]->Run(remote_id
))
173 endpoints_
[port
].reset();
// Handles removal of the endpoint at |port|: calls the endpoint's
// |OnRemove()|, notifies the peer endpoint (if still present) through
// |OnPeerClose()| -- resetting the peer slot when that returns false -- and
// finally resets the slot for |port|. Runs under |lock_|.
// NOTE(review): the statement guarded by the initial null check is not visible
// in this extract; given the existing comment it presumably exits early --
// confirm.
176 void MessagePipe::OnRemove(unsigned port
) {
// The peer index is computed before |lock_| is acquired below.
177 unsigned destination_port
= GetPeerPort(port
);
179 base::AutoLock
locker(lock_
);
180 // A |OnPeerClose()| can come in first, before |OnRemove()| gets called.
181 if (!endpoints_
[port
])
184 endpoints_
[port
]->OnRemove();
185 if (endpoints_
[destination_port
]) {
186 if (!endpoints_
[destination_port
]->OnPeerClose())
187 endpoints_
[destination_port
].reset();
189 endpoints_
[port
].reset();
// Destructor. Performs no cleanup in the visible code; it only DCHECKs that
// both |endpoints_| slots are already null by the time the pipe is destroyed.
192 MessagePipe::~MessagePipe() {
193 // Owned by the dispatchers. The owning dispatchers should only release us via
194 // their |Close()| method, which should inform us of being closed via our
195 // |Close()|. Thus these should already be null.
196 DCHECK(!endpoints_
[0]);
197 DCHECK(!endpoints_
[1]);
// Common enqueue path. |port| is the index of the endpoint that receives
// |message| (the message ends up in |endpoints_[port]->EnqueueMessage()|).
//
// Behaviour visible here:
//  - Messages of type |kTypeMessagePipe| are handed to
//    |HandleControlMessage()| and its result is returned; this happens before
//    |lock_| is taken.
//  - Any other message is DCHECKed to be of type |kTypeMessagePipeEndpoint|.
//  - Under |lock_|: the peer of |port| is DCHECKed to be present; if the
//    endpoint at |port| is absent, MOJO_RESULT_FAILED_PRECONDITION is returned.
//  - |AttachTransportsNoLock()| is called with |transports| and its result is
//    compared against MOJO_RESULT_OK.
//  - Finally the message is passed to the endpoint and MOJO_RESULT_OK is
//    returned.
// NOTE(review): some lines are missing from this extract, including the
// declaration of |port|, any guard around the |AttachTransportsNoLock()| call
// (|EnqueueMessage()| passes NULL for |transports|), and the statement executed
// when the attach result is not MOJO_RESULT_OK (presumably it returns
// |result|) -- confirm against the full file.
200 MojoResult
MessagePipe::EnqueueMessageInternal(
202 scoped_ptr
<MessageInTransit
> message
,
203 std::vector
<DispatcherTransport
>* transports
) {
204 DCHECK(port
== 0 || port
== 1);
207 if (message
->type() == MessageInTransit::kTypeMessagePipe
) {
209 return HandleControlMessage(port
, message
.Pass());
212 DCHECK_EQ(message
->type(), MessageInTransit::kTypeMessagePipeEndpoint
);
214 base::AutoLock
locker(lock_
);
215 DCHECK(endpoints_
[GetPeerPort(port
)]);
217 // The destination port need not be open, unlike the source port.
218 if (!endpoints_
[port
])
219 return MOJO_RESULT_FAILED_PRECONDITION
;
222 MojoResult result
= AttachTransportsNoLock(port
, message
.get(), transports
);
223 if (result
!= MOJO_RESULT_OK
)
227 // The endpoint's |EnqueueMessage()| may not report failure.
228 endpoints_
[port
]->EnqueueMessage(message
.Pass());
229 return MOJO_RESULT_OK
;
// Validates |transports| and, if acceptable, attaches equivalent dispatchers
// to |message|. In this file it is called from |EnqueueMessageInternal()|
// after that function has acquired |lock_|.
//
// |message| must not already have dispatchers (DCHECKed).
//
// Pass 1 (validation): for each transport, if it is a message-pipe dispatcher
// whose pipe is this object, MOJO_RESULT_INVALID_ARGUMENT is returned (after
// DCHECKing that its port equals |port|).
// Pass 2 (attachment): builds a |DispatcherVector| with one entry per
// transport -- the result of |CreateEquivalentDispatcherAndClose()| for valid
// transports, or a null |scoped_refptr<Dispatcher>| (with a warning logged)
// otherwise -- then hands the vector to |message->SetDispatchers()| and
// returns MOJO_RESULT_OK.
// NOTE(review): this extract is missing the declaration of |port|, the
// statement guarded by the |!is_valid()| check in pass 1 (presumably it skips
// that entry), and the |else| line separating the two branches in pass 2 --
// confirm against the full file.
232 MojoResult
MessagePipe::AttachTransportsNoLock(
234 MessageInTransit
* message
,
235 std::vector
<DispatcherTransport
>* transports
) {
236 DCHECK(!message
->has_dispatchers());
238 // You're not allowed to send either handle to a message pipe over the message
239 // pipe, so check for this. (The case of trying to write a handle to itself is
240 // taken care of by |Core|. That case kind of makes sense, but leads to
241 // complications if, e.g., both sides try to do the same thing with their
242 // respective handles simultaneously. The other case, of trying to write the
243 // peer handle to a handle, doesn't make sense -- since no handle will be
244 // available to read the message from.)
245 for (size_t i
= 0; i
< transports
->size(); i
++) {
246 if (!(*transports
)[i
].is_valid())
248 if ((*transports
)[i
].GetType() == Dispatcher::kTypeMessagePipe
) {
249 MessagePipeDispatcherTransport
mp_transport((*transports
)[i
]);
250 if (mp_transport
.GetMessagePipe() == this) {
251 // The other case should have been disallowed by |Core|. (Note: |port|
252 // is the peer port of the handle given to |WriteMessage()|.)
253 DCHECK_EQ(mp_transport
.GetPort(), port
);
254 return MOJO_RESULT_INVALID_ARGUMENT
;
259 // Clone the dispatchers and attach them to the message. (This must be done as
260 // a separate loop, since we want to leave the dispatchers alone on failure.)
261 scoped_ptr
<DispatcherVector
> dispatchers(new DispatcherVector());
262 dispatchers
->reserve(transports
->size());
263 for (size_t i
= 0; i
< transports
->size(); i
++) {
264 if ((*transports
)[i
].is_valid()) {
265 dispatchers
->push_back(
266 (*transports
)[i
].CreateEquivalentDispatcherAndClose());
268 LOG(WARNING
) << "Enqueueing null dispatcher";
269 dispatchers
->push_back(scoped_refptr
<Dispatcher
>());
272 message
->SetDispatchers(dispatchers
.Pass());
273 return MOJO_RESULT_OK
;
// Handler for control messages addressed to the pipe itself. In the visible
// code no subtype is handled: it logs a warning containing the message's
// subtype and returns MOJO_RESULT_UNKNOWN.
276 MojoResult
MessagePipe::HandleControlMessage(
278 scoped_ptr
<MessageInTransit
> message
) {
279 LOG(WARNING
) << "Unrecognized MessagePipe control message subtype "
280 << message
->subtype();
281 return MOJO_RESULT_UNKNOWN
;
284 } // namespace system