1 // Copyright (c) 2011 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
5 #include "content/child/webmessageportchannel_impl.h"
8 #include "base/message_loop/message_loop_proxy.h"
9 #include "content/child/child_process.h"
10 #include "content/child/child_thread.h"
11 #include "content/common/message_port_messages.h"
12 #include "third_party/WebKit/public/platform/WebMessagePortChannelClient.h"
13 #include "third_party/WebKit/public/platform/WebString.h"
15 using blink::WebMessagePortChannel
;
16 using blink::WebMessagePortChannelArray
;
17 using blink::WebMessagePortChannelClient
;
18 using blink::WebString
;
// Constructs a channel that has no route and no message port assigned yet:
// route_id_ and message_port_id_ both start out as MSG_ROUTING_NONE, and the
// supplied child_thread_loop is stored in child_thread_loop_.
// NOTE(review): the start of the initializer list and the constructor body
// are not visible in this excerpt.
22 WebMessagePortChannelImpl::WebMessagePortChannelImpl(
23 base::MessageLoopProxy
* child_thread_loop
)
25 route_id_(MSG_ROUTING_NONE
),
26 message_port_id_(MSG_ROUTING_NONE
),
27 child_thread_loop_(child_thread_loop
) {
// Constructs a channel for an already existing message port:
// message_port_id_ is taken from the message_port_id argument and the supplied
// child_thread_loop is stored in child_thread_loop_.
// NOTE(review): only part of the parameter list and initializer list is
// visible here. OnMessage() calls this constructor with (routing id, message
// port id, loop), so a route id parameter presumably precedes the ones shown;
// confirm against the header.
32 WebMessagePortChannelImpl::WebMessagePortChannelImpl(
35 base::MessageLoopProxy
* child_thread_loop
)
38 message_port_id_(message_port_id
),
39 child_thread_loop_(child_thread_loop
) {
// Tears the channel down:
//  - calls destroy() on every port attached to a message still sitting in
//    message_queue_;
//  - if a message port id was assigned, sends
//    MessagePortHostMsg_DestroyMessagePort for it;
//  - if a route id was assigned, removes that route from the current
//    ChildThread.
// NOTE(review): the statement that advances message_queue_ inside the while
// loop is not visible in this excerpt.
44 WebMessagePortChannelImpl::~WebMessagePortChannelImpl() {
45 // If we have any queued messages with attached ports, manually destroy them.
46 while (!message_queue_
.empty()) {
47 const std::vector
<WebMessagePortChannelImpl
*>& channel_array
=
48 message_queue_
.front().ports
;
49 for (size_t i
= 0; i
< channel_array
.size(); i
++) {
50 channel_array
[i
]->destroy();
// Only tell the other side about ids that were actually assigned.
55 if (message_port_id_
!= MSG_ROUTING_NONE
)
56 Send(new MessagePortHostMsg_DestroyMessagePort(message_port_id_
));
58 if (route_id_
!= MSG_ROUTING_NONE
)
59 ChildThread::current()->RemoveRoute(route_id_
);
// Takes lock_ before touching the client pointer.
// NOTE(review): the statement that stores |client| is not visible in this
// excerpt; presumably it assigns client_ while the lock is held.
62 void WebMessagePortChannelImpl::setClient(WebMessagePortChannelClient
* client
) {
63 // Must lock here since client_ is called on the main thread.
64 base::AutoLock
auto_lock(lock_
);
// Gives up this object by handing it to child_thread_loop_->ReleaseSoon(), so
// that the release is performed by that loop rather than by the caller.
// NOTE(review): any statements preceding the ReleaseSoon() call are not
// visible in this excerpt.
68 void WebMessagePortChannelImpl::destroy() {
71 // Release the object on the main thread, since the destructor might want to
72 // send an IPC, and that has to happen on the main thread.
73 child_thread_loop_
->ReleaseSoon(FROM_HERE
, this);
// Public entry point for entangling this channel with |channel|. The argument
// is downcast to WebMessagePortChannelImpl and held in a scoped_refptr.
// NOTE(review): the statement that consumes |webchannel| is not visible in
// this excerpt; presumably it is passed on to Entangle().
76 void WebMessagePortChannelImpl::entangle(WebMessagePortChannel
* channel
) {
77 // The message port ids might not be set up yet, if this channel wasn't
78 // created on the main thread. So need to wait until we're on the main thread
79 // before getting the other message port id.
80 scoped_refptr
<WebMessagePortChannelImpl
> webchannel(
81 static_cast<WebMessagePortChannelImpl
*>(channel
));
// Posts |message|, together with the ports in |channels|, to the message port
// identified by message_port_id_.
//
// When called off the child thread loop, the call re-posts itself to
// child_thread_loop_ with the same arguments. On the loop it collects the
// message port id of every channel in |channels|, asks each of those channels
// to QueueMessages(), and builds a MessagePortHostMsg_PostMessage carrying
// message_port_id_, the message text and the collected ids.
//
// |channels| may be null as far as the sizing of message_port_ids is
// concerned (size 0 is used in that case).
// NOTE(review): the loop below dereferences |channels| unconditionally in the
// lines visible here; confirm that a null guard exists on the lines missing
// from this excerpt.
// NOTE(review): the statements that free |channels| and dispatch |msg| are
// likewise not visible here.
85 void WebMessagePortChannelImpl::postMessage(
86 const WebString
& message
,
87 WebMessagePortChannelArray
* channels
) {
// Hop to the child thread loop if we are not already on it.
88 if (!child_thread_loop_
->BelongsToCurrentThread()) {
89 child_thread_loop_
->PostTask(
92 &WebMessagePortChannelImpl::postMessage
, this, message
, channels
));
// One slot per transferred channel; empty when no channels were supplied.
96 std::vector
<int> message_port_ids(channels
? channels
->size() : 0);
98 // Extract the port IDs from the source array, then free it.
99 for (size_t i
= 0; i
< channels
->size(); ++i
) {
100 WebMessagePortChannelImpl
* webchannel
=
101 static_cast<WebMessagePortChannelImpl
*>((*channels
)[i
]);
102 message_port_ids
[i
] = webchannel
->message_port_id();
103 webchannel
->QueueMessages();
// A transferred port is expected to have been assigned an id already.
104 DCHECK(message_port_ids
[i
] != MSG_ROUTING_NONE
);
109 IPC::Message
* msg
= new MessagePortHostMsg_PostMessage(
110 message_port_id_
, message
, message_port_ids
);
// Pops the oldest entry of message_queue_, under lock_.
//
// The entry's text is written through |message|, and its attached ports are
// copied into a WebMessagePortChannelArray that is swapped into |channels|.
// NOTE(review): the |message| parameter declaration, the branch taken when the
// queue is empty, and the return statements are not visible in this excerpt;
// presumably the function returns false for an empty queue and true otherwise.
114 bool WebMessagePortChannelImpl::tryGetMessage(
116 WebMessagePortChannelArray
& channels
) {
117 base::AutoLock
auto_lock(lock_
);
118 if (message_queue_
.empty())
121 *message
= message_queue_
.front().message
;
122 const std::vector
<WebMessagePortChannelImpl
*>& channel_array
=
123 message_queue_
.front().ports
;
// Copy the port pointers element by element into the output array type.
124 WebMessagePortChannelArray
result_ports(channel_array
.size());
125 for (size_t i
= 0; i
< channel_array
.size(); i
++) {
126 result_ports
[i
] = channel_array
[i
];
129 channels
.swap(result_ports
);
130 message_queue_
.pop();
// Sets up routing for this channel on the child thread loop.
//
// When called from another thread, the call re-posts itself to
// child_thread_loop_. On the loop: if no route id has been assigned yet (in
// which case no message port id may have been assigned either), a
// MessagePortHostMsg_CreateMessagePort is sent with the addresses of route_id_
// and message_port_id_; afterwards this object is registered with the current
// ChildThread under route_id_.
// NOTE(review): passing output pointers suggests CreateMessagePort is a
// synchronous IPC that fills in both ids before AddRoute() runs; confirm
// against the message definition.
134 void WebMessagePortChannelImpl::Init() {
135 if (!child_thread_loop_
->BelongsToCurrentThread()) {
136 child_thread_loop_
->PostTask(
137 FROM_HERE
, base::Bind(&WebMessagePortChannelImpl::Init
, this));
141 if (route_id_
== MSG_ROUTING_NONE
) {
142 DCHECK(message_port_id_
== MSG_ROUTING_NONE
);
143 Send(new MessagePortHostMsg_CreateMessagePort(
144 &route_id_
, &message_port_id_
));
147 ChildThread::current()->AddRoute(route_id_
, this);
// Entangles this channel's message port with the port of |channel|.
//
// When called off the child thread loop, the call re-posts itself to
// child_thread_loop_ with the same |channel| reference. On the loop it sends
// MessagePortHostMsg_Entangle with this channel's message_port_id_ and the
// message port id read from |channel| at that point.
150 void WebMessagePortChannelImpl::Entangle(
151 scoped_refptr
<WebMessagePortChannelImpl
> channel
) {
152 if (!child_thread_loop_
->BelongsToCurrentThread()) {
153 child_thread_loop_
->PostTask(
155 base::Bind(&WebMessagePortChannelImpl::Entangle
, this, channel
));
159 Send(new MessagePortHostMsg_Entangle(
160 message_port_id_
, channel
->message_port_id()));
// Starts the hand-over of this port to a new endpoint.
//
// When called off the child thread loop, the call re-posts itself to
// child_thread_loop_. On the loop it sends MessagePortHostMsg_QueueMessages
// for message_port_id_ and takes a reference on the current ChildProcess; the
// matching ReleaseProcess() call is made in OnMessagesQueued().
163 void WebMessagePortChannelImpl::QueueMessages() {
164 if (!child_thread_loop_
->BelongsToCurrentThread()) {
165 child_thread_loop_
->PostTask(
166 FROM_HERE
, base::Bind(&WebMessagePortChannelImpl::QueueMessages
, this));
169 // This message port is being sent elsewhere (perhaps to another process).
170 // The new endpoint needs to receive the queued messages, including ones that
171 // could still be in-flight. So we tell the browser to queue messages, and it
172 // sends us an ack, whose receipt we know means that no more messages are
173 // in-flight. We then send the queued messages to the browser, which prepends
174 // them to the ones it queued and it sends them to the new endpoint.
175 Send(new MessagePortHostMsg_QueueMessages(message_port_id_
));
177 // The process could potentially go away while we're still waiting for
178 // in-flight messages. Ensure it stays alive.
179 ChildProcess::current()->AddRefProcess();
// Sends |message| through the current ChildThread.
//
// When called off the child thread loop, the call re-posts itself to
// child_thread_loop_ with the same |message| pointer; in that path the message
// is DCHECKed not to be synchronous. On the loop the message is passed to
// ChildThread::current()->Send().
// NOTE(review): the statement that ends the off-thread branch is not visible
// in this excerpt; presumably it returns before reaching the direct Send().
182 void WebMessagePortChannelImpl::Send(IPC::Message
* message
) {
183 if (!child_thread_loop_
->BelongsToCurrentThread()) {
184 DCHECK(!message
->is_sync());
185 child_thread_loop_
->PostTask(
187 base::Bind(&WebMessagePortChannelImpl::Send
, this, message
));
191 ChildThread::current()->Send(message
);
// Dispatches an incoming IPC message to its handler:
//   MessagePortMsg_Message        -> OnMessage()
//   MessagePortMsg_MessagesQueued -> OnMessagesQueued()
// Any other message type sets |handled| to false.
// NOTE(review): the declaration of |handled| and the return statement are not
// visible in this excerpt; presumably |handled| is what gets returned.
194 bool WebMessagePortChannelImpl::OnMessageReceived(const IPC::Message
& message
) {
196 IPC_BEGIN_MESSAGE_MAP(WebMessagePortChannelImpl
, message
)
197 IPC_MESSAGE_HANDLER(MessagePortMsg_Message
, OnMessage
)
198 IPC_MESSAGE_HANDLER(MessagePortMsg_MessagesQueued
, OnMessagesQueued
)
199 IPC_MESSAGE_UNHANDLED(handled
= false)
200 IPC_END_MESSAGE_MAP()
// Handles an incoming message for this port, under lock_.
//
// Builds a queue entry holding |message| and, when ports were sent along with
// it, one newly created WebMessagePortChannelImpl per port (constructed from
// new_routing_ids[i], sent_message_port_ids[i] and child_thread_loop_). The
// entry is appended to message_queue_. The client, if one is set, is notified
// through messageAvailable() only when the queue was empty before the append.
// NOTE(review): the declaration of |msg| is not visible in this excerpt.
// NOTE(review): new_routing_ids is indexed with the same index as
// sent_message_port_ids, so both vectors are assumed to have equal length;
// confirm that the sender guarantees this.
204 void WebMessagePortChannelImpl::OnMessage(
205 const base::string16
& message
,
206 const std::vector
<int>& sent_message_port_ids
,
207 const std::vector
<int>& new_routing_ids
) {
208 base::AutoLock
auto_lock(lock_
);
210 msg
.message
= message
;
211 if (!sent_message_port_ids
.empty()) {
212 msg
.ports
.resize(sent_message_port_ids
.size());
213 for (size_t i
= 0; i
< sent_message_port_ids
.size(); ++i
) {
214 msg
.ports
[i
] = new WebMessagePortChannelImpl(new_routing_ids
[i
],
215 sent_message_port_ids
[i
],
216 child_thread_loop_
.get());
// Remember the state before the push so the client is only poked on the
// empty -> non-empty transition.
220 bool was_empty
= message_queue_
.empty();
221 message_queue_
.push(msg
);
222 if (client_
&& was_empty
)
223 client_
->messageAvailable();
// Handles the MessagePortMsg_MessagesQueued notification (see
// OnMessageReceived()).
//
// Drains message_queue_ while holding lock_: every entry is converted into a
// (message text, attached port ids) pair, and QueueMessages() is called on
// each attached port. The collected pairs are then sent in a
// MessagePortHostMsg_SendQueuedMessages for message_port_id_, after which
// message_port_id_ is reset to MSG_ROUTING_NONE and the process reference
// taken in QueueMessages() is released.
// NOTE(review): the braces delimiting how long lock_ is held are not visible
// in this excerpt.
226 void WebMessagePortChannelImpl::OnMessagesQueued() {
227 std::vector
<QueuedMessage
> queued_messages
;
230 base::AutoLock
auto_lock(lock_
);
231 queued_messages
.reserve(message_queue_
.size());
232 while (!message_queue_
.empty()) {
233 base::string16 message
= message_queue_
.front().message
;
234 const std::vector
<WebMessagePortChannelImpl
*>& channel_array
=
235 message_queue_
.front().ports
;
236 std::vector
<int> port_ids(channel_array
.size());
237 for (size_t i
= 0; i
< channel_array
.size(); ++i
) {
238 port_ids
[i
] = channel_array
[i
]->message_port_id();
239 channel_array
[i
]->QueueMessages();
241 queued_messages
.push_back(std::make_pair(message
, port_ids
));
242 message_queue_
.pop();
246 Send(new MessagePortHostMsg_SendQueuedMessages(
247 message_port_id_
, queued_messages
));
// With the id cleared, the destructor will not send DestroyMessagePort for
// this port.
249 message_port_id_
= MSG_ROUTING_NONE
;
252 ChildProcess::current()->ReleaseProcess();
// Out-of-line constructor for the queue entry type; the body is empty.
255 WebMessagePortChannelImpl::Message::Message() {}
// Out-of-line destructor for the queue entry type; the body is empty, so it
// does not itself destroy the channels referenced by the entry's ports.
257 WebMessagePortChannelImpl::Message::~Message() {}
259 } // namespace content