1 // Copyright (c) 2011 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
5 #include "content/child/webmessageportchannel_impl.h"
8 #include "base/message_loop/message_loop_proxy.h"
9 #include "content/child/child_process.h"
10 #include "content/child/child_thread_impl.h"
11 #include "content/common/message_port_messages.h"
12 #include "third_party/WebKit/public/platform/WebMessagePortChannelClient.h"
13 #include "third_party/WebKit/public/platform/WebString.h"
15 using blink::WebMessagePortChannel
;
16 using blink::WebMessagePortChannelArray
;
17 using blink::WebMessagePortChannelClient
;
18 using blink::WebString
;
// Constructs a channel that has neither a route nor a message port yet:
// route_id_ and message_port_id_ both start out as MSG_ROUTING_NONE. The
// supplied task runner is stored in main_thread_task_runner_.
// NOTE(review): the constructor body is not visible in this listing.
22 WebMessagePortChannelImpl::WebMessagePortChannelImpl(
23 const scoped_refptr
<base::SingleThreadTaskRunner
>& main_thread_task_runner
)
25 route_id_(MSG_ROUTING_NONE
),
26 message_port_id_(MSG_ROUTING_NONE
),
27 main_thread_task_runner_(main_thread_task_runner
) {
// Constructs a channel around an already existing message port:
// message_port_id_ is taken from the message_port_id argument and the supplied
// task runner is stored in main_thread_task_runner_.
// NOTE(review): only part of the parameter list and initializer list is
// visible here; presumably a route id is passed and stored as well -- confirm
// against the class declaration.
32 WebMessagePortChannelImpl::WebMessagePortChannelImpl(
35 const scoped_refptr
<base::SingleThreadTaskRunner
>& main_thread_task_runner
)
38 message_port_id_(message_port_id
),
39 main_thread_task_runner_(main_thread_task_runner
) {
// Destructor. Calls destroy() on every port attached to a message that is
// still sitting in message_queue_, then sends
// MessagePortHostMsg_DestroyMessagePort if a message port id was assigned and
// removes this object's route from the child thread's router if a route id was
// assigned.
44 WebMessagePortChannelImpl::~WebMessagePortChannelImpl() {
45 // If we have any queued messages with attached ports, manually destroy them.
46 while (!message_queue_
.empty()) {
47 const std::vector
<WebMessagePortChannelImpl
*>& channel_array
=
48 message_queue_
.front().ports
;
49 for (size_t i
= 0; i
< channel_array
.size(); i
++) {
50 channel_array
[i
]->destroy();
// MSG_ROUTING_NONE marks an id that was never assigned (or was reset), so
// there is nothing to tear down in that case.
55 if (message_port_id_
!= MSG_ROUTING_NONE
)
56 Send(new MessagePortHostMsg_DestroyMessagePort(message_port_id_
));
58 if (route_id_
!= MSG_ROUTING_NONE
)
59 ChildThreadImpl::current()->GetRouter()->RemoveRoute(route_id_
);
// Creates two new channels that share |main_thread_task_runner| and entangles
// each one with the other.
// NOTE(review): presumably the two channels are handed back through
// |channel1| and |channel2|; that assignment is not visible in this listing.
63 void WebMessagePortChannelImpl::CreatePair(
64 const scoped_refptr
<base::SingleThreadTaskRunner
>& main_thread_task_runner
,
65 blink::WebMessagePortChannel
** channel1
,
66 blink::WebMessagePortChannel
** channel2
) {
67 WebMessagePortChannelImpl
* impl1
=
68 new WebMessagePortChannelImpl(main_thread_task_runner
);
69 WebMessagePortChannelImpl
* impl2
=
70 new WebMessagePortChannelImpl(main_thread_task_runner
);
// Entangle in both directions so each end knows its peer.
72 impl1
->Entangle(impl2
);
73 impl2
->Entangle(impl1
);
// Returns the message port id of every channel in |channels|, in the same
// order. As a side effect QueueMessages() is called on each channel. The
// DCHECKs require that the call happens on each channel's main thread and
// that every extracted id is already assigned (not MSG_ROUTING_NONE).
80 std::vector
<int> WebMessagePortChannelImpl::ExtractMessagePortIDs(
81 WebMessagePortChannelArray
* channels
) {
82 std::vector
<int> message_port_ids
;
84 message_port_ids
.resize(channels
->size());
85 // Extract the port IDs from the source array, then free it.
86 for (size_t i
= 0; i
< channels
->size(); ++i
) {
// The array holds the blink interface type; downcast to the implementation
// to reach message_port_id() and QueueMessages().
87 WebMessagePortChannelImpl
* webchannel
=
88 static_cast<WebMessagePortChannelImpl
*>((*channels
)[i
]);
89 // The message port ids might not be set up yet if this channel
90 // wasn't created on the main thread.
91 DCHECK(webchannel
->main_thread_task_runner_
->BelongsToCurrentThread());
92 message_port_ids
[i
] = webchannel
->message_port_id();
93 webchannel
->QueueMessages();
94 DCHECK(message_port_ids
[i
] != MSG_ROUTING_NONE
);
98 return message_port_ids
;
// Sets the client for this channel while holding lock_.
// NOTE(review): the statement that stores |client| is not visible in this
// listing; presumably it assigns client_ -- confirm.
101 void WebMessagePortChannelImpl::setClient(WebMessagePortChannelClient
* client
) {
102 // Must lock here since client_ is called on the main thread.
103 base::AutoLock
auto_lock(lock_
);
// Schedules this object to be released on the main thread via
// main_thread_task_runner_->ReleaseSoon().
107 void WebMessagePortChannelImpl::destroy() {
110 // Release the object on the main thread, since the destructor might want to
111 // send an IPC, and that has to happen on the main thread.
112 main_thread_task_runner_
->ReleaseSoon(FROM_HERE
, this);
// Entry point for posting |message| together with the ports in |channels|.
// The work is done by PostMessage(); when the caller is not on the main
// thread, a task bound to PostMessage is posted to main_thread_task_runner_
// instead, with |message| converted to base::string16 for the bound call.
115 void WebMessagePortChannelImpl::postMessage(
116 const WebString
& message
,
117 WebMessagePortChannelArray
* channels
) {
118 if (!main_thread_task_runner_
->BelongsToCurrentThread()) {
119 main_thread_task_runner_
->PostTask(
122 &WebMessagePortChannelImpl::PostMessage
, this,
123 static_cast<base::string16
>(message
), channels
));
// Direct call to PostMessage() (the branch structure around it is not fully
// visible in this listing).
125 PostMessage(message
, channels
);
// Builds a MessagePortHostMsg_PostMessage for this port carrying |message| and
// the port ids extracted from |channels| via ExtractMessagePortIDs().
// NOTE(review): what happens to |msg| afterwards is not visible in this
// listing; presumably it is passed to Send() -- confirm.
129 void WebMessagePortChannelImpl::PostMessage(
130 const base::string16
& message
,
131 WebMessagePortChannelArray
* channels
) {
132 IPC::Message
* msg
= new MessagePortHostMsg_PostMessage(
133 message_port_id_
, message
, ExtractMessagePortIDs(channels
));
// Takes the oldest entry off message_queue_ while holding lock_: its text is
// written to |*message|, its attached ports are copied into a fresh array that
// is swapped into |channels|, and the entry is then popped.
// NOTE(review): the |message| parameter declaration and the return statements
// (including the one guarded by the empty-queue check) are not visible in this
// listing.
137 bool WebMessagePortChannelImpl::tryGetMessage(
139 WebMessagePortChannelArray
& channels
) {
140 base::AutoLock
auto_lock(lock_
);
141 if (message_queue_
.empty())
144 *message
= message_queue_
.front().message
;
145 const std::vector
<WebMessagePortChannelImpl
*>& channel_array
=
146 message_queue_
.front().ports
;
// Copy the implementation pointers element by element into an array of the
// type the caller expects.
147 WebMessagePortChannelArray
result_ports(channel_array
.size());
148 for (size_t i
= 0; i
< channel_array
.size(); i
++) {
149 result_ports
[i
] = channel_array
[i
];
152 channels
.swap(result_ports
);
153 message_queue_
.pop();
// Sets up the channel on the main thread. When called from another thread it
// re-posts itself to main_thread_task_runner_. If no route exists yet, a
// MessagePortHostMsg_CreateMessagePort is sent with pointers to route_id_ and
// message_port_id_ as its arguments; otherwise, if a message port id is
// already set, MessagePortHostMsg_ReleaseMessages is sent for it. The object
// is then registered with the child thread's router under route_id_.
157 void WebMessagePortChannelImpl::Init() {
158 if (!main_thread_task_runner_
->BelongsToCurrentThread()) {
159 main_thread_task_runner_
->PostTask(
160 FROM_HERE
, base::Bind(&WebMessagePortChannelImpl::Init
, this));
164 if (route_id_
== MSG_ROUTING_NONE
) {
// Without a route there must not be a port id either.
165 DCHECK(message_port_id_
== MSG_ROUTING_NONE
);
166 Send(new MessagePortHostMsg_CreateMessagePort(
167 &route_id_
, &message_port_id_
));
168 } else if (message_port_id_
!= MSG_ROUTING_NONE
) {
169 Send(new MessagePortHostMsg_ReleaseMessages(message_port_id_
));
172 ChildThreadImpl::current()->GetRouter()->AddRoute(route_id_
, this);
// Sends MessagePortHostMsg_Entangle with this port's id and |channel|'s port
// id. When called off the main thread, the call is re-posted to
// main_thread_task_runner_ with |channel| bound as the argument.
175 void WebMessagePortChannelImpl::Entangle(
176 scoped_refptr
<WebMessagePortChannelImpl
> channel
) {
177 // The message port ids might not be set up yet, if this channel wasn't
178 // created on the main thread. So need to wait until we're on the main thread
179 // before getting the other message port id.
180 if (!main_thread_task_runner_
->BelongsToCurrentThread()) {
181 main_thread_task_runner_
->PostTask(
183 base::Bind(&WebMessagePortChannelImpl::Entangle
, this, channel
));
187 Send(new MessagePortHostMsg_Entangle(
188 message_port_id_
, channel
->message_port_id()));
// Sends MessagePortHostMsg_QueueMessages for this port and takes a reference
// on the child process. When called off the main thread, the call is
// re-posted to main_thread_task_runner_.
191 void WebMessagePortChannelImpl::QueueMessages() {
192 if (!main_thread_task_runner_
->BelongsToCurrentThread()) {
193 main_thread_task_runner_
->PostTask(
194 FROM_HERE
, base::Bind(&WebMessagePortChannelImpl::QueueMessages
, this));
197 // This message port is being sent elsewhere (perhaps to another process).
198 // The new endpoint needs to receive the queued messages, including ones that
199 // could still be in-flight. So we tell the browser to queue messages, and it
200 // sends us an ack, whose receipt we know means that no more messages are
201 // in-flight. We then send the queued messages to the browser, which prepends
202 // them to the ones it queued and it sends them to the new endpoint.
203 Send(new MessagePortHostMsg_QueueMessages(message_port_id_
));
205 // The process could potentially go away while we're still waiting for
206 // in-flight messages. Ensure it stays alive.
// The matching ReleaseProcess() call is in OnMessagesQueued().
207 ChildProcess::current()->AddRefProcess();
// Sends |message| through the child thread's router. When called off the main
// thread, the call is re-posted to main_thread_task_runner_; the DCHECK
// requires that messages taking that path are not synchronous.
210 void WebMessagePortChannelImpl::Send(IPC::Message
* message
) {
211 if (!main_thread_task_runner_
->BelongsToCurrentThread()) {
212 DCHECK(!message
->is_sync());
213 main_thread_task_runner_
->PostTask(
215 base::Bind(&WebMessagePortChannelImpl::Send
, this, message
));
219 ChildThreadImpl::current()->GetRouter()->Send(message
);
// Dispatches an incoming IPC message: MessagePortMsg_Message goes to
// OnMessage() and MessagePortMsg_MessagesQueued goes to OnMessagesQueued().
// Any other message sets |handled| to false.
// NOTE(review): the declaration of |handled| and the return statement are not
// visible in this listing; presumably |handled| is what gets returned.
222 bool WebMessagePortChannelImpl::OnMessageReceived(const IPC::Message
& message
) {
224 IPC_BEGIN_MESSAGE_MAP(WebMessagePortChannelImpl
, message
)
225 IPC_MESSAGE_HANDLER(MessagePortMsg_Message
, OnMessage
)
226 IPC_MESSAGE_HANDLER(MessagePortMsg_MessagesQueued
, OnMessagesQueued
)
227 IPC_MESSAGE_UNHANDLED(handled
= false)
228 IPC_END_MESSAGE_MAP()
// Handler for MessagePortMsg_Message. While holding lock_, it builds a queue
// entry holding |message| plus one newly created WebMessagePortChannelImpl per
// entry of |sent_message_port_ids|, pushes it onto message_queue_, and calls
// client_->messageAvailable() when a client is set and the queue was empty
// before the push.
// NOTE(review): the declaration of |msg| and the first constructor argument
// for the new channels are not visible in this listing; presumably the latter
// comes from |new_routing_ids| -- confirm.
232 void WebMessagePortChannelImpl::OnMessage(
233 const base::string16
& message
,
234 const std::vector
<int>& sent_message_port_ids
,
235 const std::vector
<int>& new_routing_ids
) {
236 base::AutoLock
auto_lock(lock_
);
238 msg
.message
= message
;
239 if (!sent_message_port_ids
.empty()) {
240 msg
.ports
.resize(sent_message_port_ids
.size());
241 for (size_t i
= 0; i
< sent_message_port_ids
.size(); ++i
) {
242 msg
.ports
[i
] = new WebMessagePortChannelImpl(
244 sent_message_port_ids
[i
],
245 main_thread_task_runner_
.get());
// Record emptiness before pushing so the client is only notified on the
// empty -> non-empty transition.
249 bool was_empty
= message_queue_
.empty();
250 message_queue_
.push(msg
);
251 if (client_
&& was_empty
)
252 client_
->messageAvailable();
// Handler for MessagePortMsg_MessagesQueued. Drains message_queue_ under
// lock_ into a list of (message text, attached port ids) pairs, calling
// QueueMessages() on every attached port along the way. It then sends the list
// in a MessagePortHostMsg_SendQueuedMessages for this port, resets
// message_port_id_ to MSG_ROUTING_NONE and releases a reference on the child
// process.
255 void WebMessagePortChannelImpl::OnMessagesQueued() {
256 std::vector
<QueuedMessage
> queued_messages
;
259 base::AutoLock
auto_lock(lock_
);
260 queued_messages
.reserve(message_queue_
.size());
261 while (!message_queue_
.empty()) {
262 base::string16 message
= message_queue_
.front().message
;
263 const std::vector
<WebMessagePortChannelImpl
*>& channel_array
=
264 message_queue_
.front().ports
;
// Replace each attached port object by its id for the outgoing IPC.
265 std::vector
<int> port_ids(channel_array
.size());
266 for (size_t i
= 0; i
< channel_array
.size(); ++i
) {
267 port_ids
[i
] = channel_array
[i
]->message_port_id();
268 channel_array
[i
]->QueueMessages();
270 queued_messages
.push_back(std::make_pair(message
, port_ids
));
271 message_queue_
.pop();
275 Send(new MessagePortHostMsg_SendQueuedMessages(
276 message_port_id_
, queued_messages
));
// With the id reset, the destructor will not send a DestroyMessagePort for
// this port.
278 message_port_id_
= MSG_ROUTING_NONE
;
// Counterpart of the AddRefProcess() call in QueueMessages().
281 ChildProcess::current()->ReleaseProcess();
// Out-of-line constructor and destructor of the nested Message type; both
// have empty bodies.
284 WebMessagePortChannelImpl::Message::Message() {}
286 WebMessagePortChannelImpl::Message::~Message() {}
288 } // namespace content