1 // Copyright (c) 2011 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
5 #include "content/child/webmessageportchannel_impl.h"
8 #include "base/message_loop/message_loop_proxy.h"
9 #include "content/child/child_process.h"
10 #include "content/child/child_thread.h"
11 #include "content/common/message_port_messages.h"
12 #include "third_party/WebKit/public/platform/WebMessagePortChannelClient.h"
13 #include "third_party/WebKit/public/platform/WebString.h"
15 using blink::WebMessagePortChannel
;
16 using blink::WebMessagePortChannelArray
;
17 using blink::WebMessagePortChannelClient
;
18 using blink::WebString
;
22 WebMessagePortChannelImpl::WebMessagePortChannelImpl(
23 base::MessageLoopProxy
* child_thread_loop
)
25 route_id_(MSG_ROUTING_NONE
),
26 message_port_id_(MSG_ROUTING_NONE
),
27 child_thread_loop_(child_thread_loop
) {
32 WebMessagePortChannelImpl::WebMessagePortChannelImpl(
35 base::MessageLoopProxy
* child_thread_loop
)
38 message_port_id_(message_port_id
),
39 child_thread_loop_(child_thread_loop
) {
44 WebMessagePortChannelImpl::~WebMessagePortChannelImpl() {
45 // If we have any queued messages with attached ports, manually destroy them.
46 while (!message_queue_
.empty()) {
47 const std::vector
<WebMessagePortChannelImpl
*>& channel_array
=
48 message_queue_
.front().ports
;
49 for (size_t i
= 0; i
< channel_array
.size(); i
++) {
50 channel_array
[i
]->destroy();
55 if (message_port_id_
!= MSG_ROUTING_NONE
)
56 Send(new MessagePortHostMsg_DestroyMessagePort(message_port_id_
));
58 if (route_id_
!= MSG_ROUTING_NONE
)
59 ChildThread::current()->GetRouter()->RemoveRoute(route_id_
);
63 void WebMessagePortChannelImpl::CreatePair(
64 base::MessageLoopProxy
* child_thread_loop
,
65 blink::WebMessagePortChannel
** channel1
,
66 blink::WebMessagePortChannel
** channel2
) {
67 WebMessagePortChannelImpl
* impl1
=
68 new WebMessagePortChannelImpl(child_thread_loop
);
69 WebMessagePortChannelImpl
* impl2
=
70 new WebMessagePortChannelImpl(child_thread_loop
);
72 impl1
->Entangle(impl2
);
73 impl2
->Entangle(impl1
);
80 std::vector
<int> WebMessagePortChannelImpl::ExtractMessagePortIDs(
81 WebMessagePortChannelArray
* channels
) {
82 std::vector
<int> message_port_ids
;
84 message_port_ids
.resize(channels
->size());
85 // Extract the port IDs from the source array, then free it.
86 for (size_t i
= 0; i
< channels
->size(); ++i
) {
87 WebMessagePortChannelImpl
* webchannel
=
88 static_cast<WebMessagePortChannelImpl
*>((*channels
)[i
]);
89 // The message port ids might not be set up yet if this channel
90 // wasn't created on the main thread.
91 DCHECK(webchannel
->child_thread_loop_
->BelongsToCurrentThread());
92 message_port_ids
[i
] = webchannel
->message_port_id();
93 webchannel
->QueueMessages();
94 DCHECK(message_port_ids
[i
] != MSG_ROUTING_NONE
);
98 return message_port_ids
;
101 void WebMessagePortChannelImpl::setClient(WebMessagePortChannelClient
* client
) {
102 // Must lock here since client_ is called on the main thread.
103 base::AutoLock
auto_lock(lock_
);
107 void WebMessagePortChannelImpl::destroy() {
110 // Release the object on the main thread, since the destructor might want to
111 // send an IPC, and that has to happen on the main thread.
112 child_thread_loop_
->ReleaseSoon(FROM_HERE
, this);
115 void WebMessagePortChannelImpl::postMessage(
116 const WebString
& message
,
117 WebMessagePortChannelArray
* channels
) {
118 if (!child_thread_loop_
->BelongsToCurrentThread()) {
119 child_thread_loop_
->PostTask(
122 &WebMessagePortChannelImpl::PostMessage
, this,
123 static_cast<base::string16
>(message
), channels
));
125 PostMessage(message
, channels
);
129 void WebMessagePortChannelImpl::PostMessage(
130 const base::string16
& message
,
131 WebMessagePortChannelArray
* channels
) {
132 IPC::Message
* msg
= new MessagePortHostMsg_PostMessage(
133 message_port_id_
, message
, ExtractMessagePortIDs(channels
));
137 bool WebMessagePortChannelImpl::tryGetMessage(
139 WebMessagePortChannelArray
& channels
) {
140 base::AutoLock
auto_lock(lock_
);
141 if (message_queue_
.empty())
144 *message
= message_queue_
.front().message
;
145 const std::vector
<WebMessagePortChannelImpl
*>& channel_array
=
146 message_queue_
.front().ports
;
147 WebMessagePortChannelArray
result_ports(channel_array
.size());
148 for (size_t i
= 0; i
< channel_array
.size(); i
++) {
149 result_ports
[i
] = channel_array
[i
];
152 channels
.swap(result_ports
);
153 message_queue_
.pop();
157 void WebMessagePortChannelImpl::Init() {
158 if (!child_thread_loop_
->BelongsToCurrentThread()) {
159 child_thread_loop_
->PostTask(
160 FROM_HERE
, base::Bind(&WebMessagePortChannelImpl::Init
, this));
164 if (route_id_
== MSG_ROUTING_NONE
) {
165 DCHECK(message_port_id_
== MSG_ROUTING_NONE
);
166 Send(new MessagePortHostMsg_CreateMessagePort(
167 &route_id_
, &message_port_id_
));
170 ChildThread::current()->GetRouter()->AddRoute(route_id_
, this);
173 void WebMessagePortChannelImpl::Entangle(
174 scoped_refptr
<WebMessagePortChannelImpl
> channel
) {
175 // The message port ids might not be set up yet, if this channel wasn't
176 // created on the main thread. So need to wait until we're on the main thread
177 // before getting the other message port id.
178 if (!child_thread_loop_
->BelongsToCurrentThread()) {
179 child_thread_loop_
->PostTask(
181 base::Bind(&WebMessagePortChannelImpl::Entangle
, this, channel
));
185 Send(new MessagePortHostMsg_Entangle(
186 message_port_id_
, channel
->message_port_id()));
189 void WebMessagePortChannelImpl::QueueMessages() {
190 if (!child_thread_loop_
->BelongsToCurrentThread()) {
191 child_thread_loop_
->PostTask(
192 FROM_HERE
, base::Bind(&WebMessagePortChannelImpl::QueueMessages
, this));
195 // This message port is being sent elsewhere (perhaps to another process).
196 // The new endpoint needs to receive the queued messages, including ones that
197 // could still be in-flight. So we tell the browser to queue messages, and it
198 // sends us an ack, whose receipt we know means that no more messages are
199 // in-flight. We then send the queued messages to the browser, which prepends
200 // them to the ones it queued and it sends them to the new endpoint.
201 Send(new MessagePortHostMsg_QueueMessages(message_port_id_
));
203 // The process could potentially go away while we're still waiting for
204 // in-flight messages. Ensure it stays alive.
205 ChildProcess::current()->AddRefProcess();
208 void WebMessagePortChannelImpl::Send(IPC::Message
* message
) {
209 if (!child_thread_loop_
->BelongsToCurrentThread()) {
210 DCHECK(!message
->is_sync());
211 child_thread_loop_
->PostTask(
213 base::Bind(&WebMessagePortChannelImpl::Send
, this, message
));
217 ChildThread::current()->GetRouter()->Send(message
);
220 bool WebMessagePortChannelImpl::OnMessageReceived(const IPC::Message
& message
) {
222 IPC_BEGIN_MESSAGE_MAP(WebMessagePortChannelImpl
, message
)
223 IPC_MESSAGE_HANDLER(MessagePortMsg_Message
, OnMessage
)
224 IPC_MESSAGE_HANDLER(MessagePortMsg_MessagesQueued
, OnMessagesQueued
)
225 IPC_MESSAGE_UNHANDLED(handled
= false)
226 IPC_END_MESSAGE_MAP()
230 void WebMessagePortChannelImpl::OnMessage(
231 const base::string16
& message
,
232 const std::vector
<int>& sent_message_port_ids
,
233 const std::vector
<int>& new_routing_ids
) {
234 base::AutoLock
auto_lock(lock_
);
236 msg
.message
= message
;
237 if (!sent_message_port_ids
.empty()) {
238 msg
.ports
.resize(sent_message_port_ids
.size());
239 for (size_t i
= 0; i
< sent_message_port_ids
.size(); ++i
) {
240 msg
.ports
[i
] = new WebMessagePortChannelImpl(new_routing_ids
[i
],
241 sent_message_port_ids
[i
],
242 child_thread_loop_
.get());
246 bool was_empty
= message_queue_
.empty();
247 message_queue_
.push(msg
);
248 if (client_
&& was_empty
)
249 client_
->messageAvailable();
252 void WebMessagePortChannelImpl::OnMessagesQueued() {
253 std::vector
<QueuedMessage
> queued_messages
;
256 base::AutoLock
auto_lock(lock_
);
257 queued_messages
.reserve(message_queue_
.size());
258 while (!message_queue_
.empty()) {
259 base::string16 message
= message_queue_
.front().message
;
260 const std::vector
<WebMessagePortChannelImpl
*>& channel_array
=
261 message_queue_
.front().ports
;
262 std::vector
<int> port_ids(channel_array
.size());
263 for (size_t i
= 0; i
< channel_array
.size(); ++i
) {
264 port_ids
[i
] = channel_array
[i
]->message_port_id();
265 channel_array
[i
]->QueueMessages();
267 queued_messages
.push_back(std::make_pair(message
, port_ids
));
268 message_queue_
.pop();
272 Send(new MessagePortHostMsg_SendQueuedMessages(
273 message_port_id_
, queued_messages
));
275 message_port_id_
= MSG_ROUTING_NONE
;
278 ChildProcess::current()->ReleaseProcess();
281 WebMessagePortChannelImpl::Message::Message() {}
283 WebMessagePortChannelImpl::Message::~Message() {}
285 } // namespace content