1 // Copyright 2013 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
5 #include "mojo/system/proxy_message_pipe_endpoint.h"
9 #include "base/logging.h"
10 #include "mojo/system/channel.h"
11 #include "mojo/system/local_message_pipe_endpoint.h"
12 #include "mojo/system/message_pipe_dispatcher.h"
17 ProxyMessagePipeEndpoint::ProxyMessagePipeEndpoint()
18 : local_id_(MessageInTransit::kInvalidEndpointId
),
19 remote_id_(MessageInTransit::kInvalidEndpointId
),
23 ProxyMessagePipeEndpoint::ProxyMessagePipeEndpoint(
24 LocalMessagePipeEndpoint
* local_message_pipe_endpoint
,
26 : local_id_(MessageInTransit::kInvalidEndpointId
),
27 remote_id_(MessageInTransit::kInvalidEndpointId
),
28 is_peer_open_(is_peer_open
),
29 paused_message_queue_(MessageInTransitQueue::PassContents(),
30 local_message_pipe_endpoint
->message_queue()) {
31 local_message_pipe_endpoint
->Close();
34 ProxyMessagePipeEndpoint::~ProxyMessagePipeEndpoint() {
35 DCHECK(!is_running());
36 DCHECK(!is_attached());
37 AssertConsistentState();
38 DCHECK(paused_message_queue_
.IsEmpty());
41 MessagePipeEndpoint::Type
ProxyMessagePipeEndpoint::GetType() const {
45 bool ProxyMessagePipeEndpoint::OnPeerClose() {
46 DCHECK(is_peer_open_
);
48 is_peer_open_
= false;
50 // If our outgoing message queue isn't empty, we shouldn't be destroyed yet.
51 if (!paused_message_queue_
.IsEmpty())
56 // If we're not running yet, we can't be destroyed yet, because we're
57 // still waiting for the "run" message from the other side.
67 // Note: We may have to enqueue messages even when our (local) peer isn't open
68 // -- it may have been written to and closed immediately, before we were ready.
69 // This case is handled in |Run()| (which will call us).
70 void ProxyMessagePipeEndpoint::EnqueueMessage(
71 scoped_ptr
<MessageInTransit
> message
) {
73 message
->SerializeAndCloseDispatchers(channel_
.get());
75 message
->set_source_id(local_id_
);
76 message
->set_destination_id(remote_id_
);
77 if (!channel_
->WriteMessage(message
.Pass()))
78 LOG(WARNING
) << "Failed to write message to channel";
80 paused_message_queue_
.AddMessage(message
.Pass());
84 void ProxyMessagePipeEndpoint::Attach(scoped_refptr
<Channel
> channel
,
85 MessageInTransit::EndpointId local_id
) {
87 DCHECK_NE(local_id
, MessageInTransit::kInvalidEndpointId
);
89 DCHECK(!is_attached());
91 AssertConsistentState();
94 AssertConsistentState();
97 bool ProxyMessagePipeEndpoint::Run(MessageInTransit::EndpointId remote_id
) {
98 // Assertions about arguments:
99 DCHECK_NE(remote_id
, MessageInTransit::kInvalidEndpointId
);
101 // Assertions about current state:
102 DCHECK(is_attached());
103 DCHECK(!is_running());
105 AssertConsistentState();
106 remote_id_
= remote_id
;
107 AssertConsistentState();
109 while (!paused_message_queue_
.IsEmpty())
110 EnqueueMessage(paused_message_queue_
.GetMessage());
113 return true; // Stay alive.
115 // We were just waiting to die.
120 void ProxyMessagePipeEndpoint::OnRemove() {
124 void ProxyMessagePipeEndpoint::Detach() {
125 DCHECK(is_attached());
127 AssertConsistentState();
128 channel_
->DetachMessagePipeEndpoint(local_id_
, remote_id_
);
130 local_id_
= MessageInTransit::kInvalidEndpointId
;
131 remote_id_
= MessageInTransit::kInvalidEndpointId
;
132 paused_message_queue_
.Clear();
133 AssertConsistentState();
137 void ProxyMessagePipeEndpoint::AssertConsistentState() const {
139 DCHECK_NE(local_id_
, MessageInTransit::kInvalidEndpointId
);
140 } else { // Not attached.
141 DCHECK_EQ(local_id_
, MessageInTransit::kInvalidEndpointId
);
142 DCHECK_EQ(remote_id_
, MessageInTransit::kInvalidEndpointId
);
147 } // namespace system