Updating trunk VERSION from 2139.0 to 2140.0
[chromium-blink-merge.git] / mojo / system / message_pipe.cc
blob13776d3d76558f6261f90bd964c1ad3dfa6190d4
1 // Copyright 2013 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
5 #include "mojo/system/message_pipe.h"
7 #include "base/logging.h"
8 #include "mojo/system/channel.h"
9 #include "mojo/system/local_message_pipe_endpoint.h"
10 #include "mojo/system/message_in_transit.h"
11 #include "mojo/system/message_pipe_dispatcher.h"
12 #include "mojo/system/message_pipe_endpoint.h"
13 #include "mojo/system/proxy_message_pipe_endpoint.h"
15 namespace mojo {
16 namespace system {
18 MessagePipe::MessagePipe(scoped_ptr<MessagePipeEndpoint> endpoint0,
19 scoped_ptr<MessagePipeEndpoint> endpoint1) {
20 endpoints_[0].reset(endpoint0.release());
21 endpoints_[1].reset(endpoint1.release());
24 // static
25 MessagePipe* MessagePipe::CreateLocalLocal() {
26 return new MessagePipe(
27 scoped_ptr<MessagePipeEndpoint>(new LocalMessagePipeEndpoint),
28 scoped_ptr<MessagePipeEndpoint>(new LocalMessagePipeEndpoint));
31 // static
32 MessagePipe* MessagePipe::CreateLocalProxy() {
33 return new MessagePipe(
34 scoped_ptr<MessagePipeEndpoint>(new LocalMessagePipeEndpoint),
35 scoped_ptr<MessagePipeEndpoint>(new ProxyMessagePipeEndpoint));
38 // static
39 MessagePipe* MessagePipe::CreateProxyLocal() {
40 return new MessagePipe(
41 scoped_ptr<MessagePipeEndpoint>(new ProxyMessagePipeEndpoint),
42 scoped_ptr<MessagePipeEndpoint>(new LocalMessagePipeEndpoint));
45 // static
46 unsigned MessagePipe::GetPeerPort(unsigned port) {
47 DCHECK(port == 0 || port == 1);
48 return port ^ 1;
51 MessagePipeEndpoint::Type MessagePipe::GetType(unsigned port) {
52 DCHECK(port == 0 || port == 1);
53 base::AutoLock locker(lock_);
54 DCHECK(endpoints_[port]);
56 return endpoints_[port]->GetType();
59 void MessagePipe::CancelAllWaiters(unsigned port) {
60 DCHECK(port == 0 || port == 1);
62 base::AutoLock locker(lock_);
63 DCHECK(endpoints_[port]);
64 endpoints_[port]->CancelAllWaiters();
67 void MessagePipe::Close(unsigned port) {
68 DCHECK(port == 0 || port == 1);
70 unsigned destination_port = GetPeerPort(port);
72 base::AutoLock locker(lock_);
73 DCHECK(endpoints_[port]);
75 endpoints_[port]->Close();
76 if (endpoints_[destination_port]) {
77 if (!endpoints_[destination_port]->OnPeerClose())
78 endpoints_[destination_port].reset();
80 endpoints_[port].reset();
83 // TODO(vtl): Handle flags.
84 MojoResult MessagePipe::WriteMessage(
85 unsigned port,
86 UserPointer<const void> bytes,
87 uint32_t num_bytes,
88 std::vector<DispatcherTransport>* transports,
89 MojoWriteMessageFlags flags) {
90 DCHECK(port == 0 || port == 1);
91 return EnqueueMessageInternal(
92 GetPeerPort(port),
93 make_scoped_ptr(new MessageInTransit(
94 MessageInTransit::kTypeMessagePipeEndpoint,
95 MessageInTransit::kSubtypeMessagePipeEndpointData,
96 num_bytes,
97 bytes)),
98 transports);
101 MojoResult MessagePipe::ReadMessage(unsigned port,
102 UserPointer<void> bytes,
103 UserPointer<uint32_t> num_bytes,
104 DispatcherVector* dispatchers,
105 uint32_t* num_dispatchers,
106 MojoReadMessageFlags flags) {
107 DCHECK(port == 0 || port == 1);
109 base::AutoLock locker(lock_);
110 DCHECK(endpoints_[port]);
112 return endpoints_[port]->ReadMessage(
113 bytes, num_bytes, dispatchers, num_dispatchers, flags);
116 HandleSignalsState MessagePipe::GetHandleSignalsState(unsigned port) const {
117 DCHECK(port == 0 || port == 1);
119 base::AutoLock locker(const_cast<base::Lock&>(lock_));
120 DCHECK(endpoints_[port]);
122 return endpoints_[port]->GetHandleSignalsState();
125 MojoResult MessagePipe::AddWaiter(unsigned port,
126 Waiter* waiter,
127 MojoHandleSignals signals,
128 uint32_t context,
129 HandleSignalsState* signals_state) {
130 DCHECK(port == 0 || port == 1);
132 base::AutoLock locker(lock_);
133 DCHECK(endpoints_[port]);
135 return endpoints_[port]->AddWaiter(waiter, signals, context, signals_state);
138 void MessagePipe::RemoveWaiter(unsigned port,
139 Waiter* waiter,
140 HandleSignalsState* signals_state) {
141 DCHECK(port == 0 || port == 1);
143 base::AutoLock locker(lock_);
144 DCHECK(endpoints_[port]);
146 endpoints_[port]->RemoveWaiter(waiter, signals_state);
149 void MessagePipe::ConvertLocalToProxy(unsigned port) {
150 DCHECK(port == 0 || port == 1);
152 base::AutoLock locker(lock_);
153 DCHECK(endpoints_[port]);
154 DCHECK_EQ(endpoints_[port]->GetType(), MessagePipeEndpoint::kTypeLocal);
156 bool is_peer_open = !!endpoints_[GetPeerPort(port)];
158 // TODO(vtl): Hopefully this will work if the peer has been closed and when
159 // the peer is local. If the peer is remote, we should do something more
160 // sophisticated.
161 DCHECK(!is_peer_open ||
162 endpoints_[GetPeerPort(port)]->GetType() ==
163 MessagePipeEndpoint::kTypeLocal);
165 scoped_ptr<MessagePipeEndpoint> replacement_endpoint(
166 new ProxyMessagePipeEndpoint(
167 static_cast<LocalMessagePipeEndpoint*>(endpoints_[port].get()),
168 is_peer_open));
169 endpoints_[port].swap(replacement_endpoint);
172 MojoResult MessagePipe::EnqueueMessage(unsigned port,
173 scoped_ptr<MessageInTransit> message) {
174 return EnqueueMessageInternal(port, message.Pass(), NULL);
177 bool MessagePipe::Attach(unsigned port,
178 scoped_refptr<Channel> channel,
179 MessageInTransit::EndpointId local_id) {
180 DCHECK(port == 0 || port == 1);
181 DCHECK(channel.get());
182 DCHECK_NE(local_id, MessageInTransit::kInvalidEndpointId);
184 base::AutoLock locker(lock_);
185 if (!endpoints_[port])
186 return false;
188 DCHECK_EQ(endpoints_[port]->GetType(), MessagePipeEndpoint::kTypeProxy);
189 endpoints_[port]->Attach(channel, local_id);
190 return true;
193 void MessagePipe::Run(unsigned port, MessageInTransit::EndpointId remote_id) {
194 DCHECK(port == 0 || port == 1);
195 DCHECK_NE(remote_id, MessageInTransit::kInvalidEndpointId);
197 base::AutoLock locker(lock_);
198 DCHECK(endpoints_[port]);
199 if (!endpoints_[port]->Run(remote_id))
200 endpoints_[port].reset();
203 void MessagePipe::OnRemove(unsigned port) {
204 unsigned destination_port = GetPeerPort(port);
206 base::AutoLock locker(lock_);
207 // A |OnPeerClose()| can come in first, before |OnRemove()| gets called.
208 if (!endpoints_[port])
209 return;
211 endpoints_[port]->OnRemove();
212 if (endpoints_[destination_port]) {
213 if (!endpoints_[destination_port]->OnPeerClose())
214 endpoints_[destination_port].reset();
216 endpoints_[port].reset();
219 MessagePipe::~MessagePipe() {
220 // Owned by the dispatchers. The owning dispatchers should only release us via
221 // their |Close()| method, which should inform us of being closed via our
222 // |Close()|. Thus these should already be null.
223 DCHECK(!endpoints_[0]);
224 DCHECK(!endpoints_[1]);
227 MojoResult MessagePipe::EnqueueMessageInternal(
228 unsigned port,
229 scoped_ptr<MessageInTransit> message,
230 std::vector<DispatcherTransport>* transports) {
231 DCHECK(port == 0 || port == 1);
232 DCHECK(message);
234 if (message->type() == MessageInTransit::kTypeMessagePipe) {
235 DCHECK(!transports);
236 return HandleControlMessage(port, message.Pass());
239 DCHECK_EQ(message->type(), MessageInTransit::kTypeMessagePipeEndpoint);
241 base::AutoLock locker(lock_);
242 DCHECK(endpoints_[GetPeerPort(port)]);
244 // The destination port need not be open, unlike the source port.
245 if (!endpoints_[port])
246 return MOJO_RESULT_FAILED_PRECONDITION;
248 if (transports) {
249 MojoResult result = AttachTransportsNoLock(port, message.get(), transports);
250 if (result != MOJO_RESULT_OK)
251 return result;
254 // The endpoint's |EnqueueMessage()| may not report failure.
255 endpoints_[port]->EnqueueMessage(message.Pass());
256 return MOJO_RESULT_OK;
259 MojoResult MessagePipe::AttachTransportsNoLock(
260 unsigned port,
261 MessageInTransit* message,
262 std::vector<DispatcherTransport>* transports) {
263 DCHECK(!message->has_dispatchers());
265 // You're not allowed to send either handle to a message pipe over the message
266 // pipe, so check for this. (The case of trying to write a handle to itself is
267 // taken care of by |Core|. That case kind of makes sense, but leads to
268 // complications if, e.g., both sides try to do the same thing with their
269 // respective handles simultaneously. The other case, of trying to write the
270 // peer handle to a handle, doesn't make sense -- since no handle will be
271 // available to read the message from.)
272 for (size_t i = 0; i < transports->size(); i++) {
273 if (!(*transports)[i].is_valid())
274 continue;
275 if ((*transports)[i].GetType() == Dispatcher::kTypeMessagePipe) {
276 MessagePipeDispatcherTransport mp_transport((*transports)[i]);
277 if (mp_transport.GetMessagePipe() == this) {
278 // The other case should have been disallowed by |Core|. (Note: |port|
279 // is the peer port of the handle given to |WriteMessage()|.)
280 DCHECK_EQ(mp_transport.GetPort(), port);
281 return MOJO_RESULT_INVALID_ARGUMENT;
286 // Clone the dispatchers and attach them to the message. (This must be done as
287 // a separate loop, since we want to leave the dispatchers alone on failure.)
288 scoped_ptr<DispatcherVector> dispatchers(new DispatcherVector());
289 dispatchers->reserve(transports->size());
290 for (size_t i = 0; i < transports->size(); i++) {
291 if ((*transports)[i].is_valid()) {
292 dispatchers->push_back(
293 (*transports)[i].CreateEquivalentDispatcherAndClose());
294 } else {
295 LOG(WARNING) << "Enqueueing null dispatcher";
296 dispatchers->push_back(scoped_refptr<Dispatcher>());
299 message->SetDispatchers(dispatchers.Pass());
300 return MOJO_RESULT_OK;
303 MojoResult MessagePipe::HandleControlMessage(
304 unsigned /*port*/,
305 scoped_ptr<MessageInTransit> message) {
306 LOG(WARNING) << "Unrecognized MessagePipe control message subtype "
307 << message->subtype();
308 return MOJO_RESULT_UNKNOWN;
311 } // namespace system
312 } // namespace mojo