1 // Copyright (c) 2011 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
5 #include "content/child/webmessageportchannel_impl.h"
8 #include "base/message_loop/message_loop_proxy.h"
9 #include "content/child/child_process.h"
10 #include "content/child/child_thread.h"
11 #include "content/common/message_port_messages.h"
12 #include "third_party/WebKit/public/platform/WebMessagePortChannelClient.h"
13 #include "third_party/WebKit/public/platform/WebString.h"
15 using blink::WebMessagePortChannel
;
16 using blink::WebMessagePortChannelArray
;
17 using blink::WebMessagePortChannelClient
;
18 using blink::WebString
;
// Constructs a channel that has no route or message port assigned yet:
// route_id_ and message_port_id_ both start as MSG_ROUTING_NONE, and the
// supplied |child_thread_loop| is stored in child_thread_loop_.
// NOTE(review): this listing skips embedded line 24 and the constructor body
// (embedded lines 28-30), so any further initializers or body statements are
// not visible here -- confirm against the full file.
22 WebMessagePortChannelImpl::WebMessagePortChannelImpl(
23 base::MessageLoopProxy
* child_thread_loop
)
25 route_id_(MSG_ROUTING_NONE
),
26 message_port_id_(MSG_ROUTING_NONE
),
27 child_thread_loop_(child_thread_loop
) {
// Constructs a channel around an already-known message port:
// message_port_id_ is initialized from |message_port_id| and
// child_thread_loop_ from |child_thread_loop|.
// NOTE(review): embedded lines 33-34, 36-37 and the body are missing from this
// listing, so the declaration of |message_port_id| and any route-id parameter
// are not visible. OnMessage() calls this constructor with three arguments
// (routing id, port id, loop), so presumably a route id is taken first and
// stored in route_id_ -- confirm against the full file.
32 WebMessagePortChannelImpl::WebMessagePortChannelImpl(
35 base::MessageLoopProxy
* child_thread_loop
)
38 message_port_id_(message_port_id
),
39 child_thread_loop_(child_thread_loop
) {
// Destructor. Calls destroy() on every channel attached to messages that are
// still queued, then tells the host to destroy this message port (if one is
// set) and removes this object's route from the child thread's router (if one
// is set).
// NOTE(review): embedded lines 51-54 are missing from this listing, so the end
// of the drain loop -- including whatever advances message_queue_ -- is not
// visible here; confirm against the full file.
44 WebMessagePortChannelImpl::~WebMessagePortChannelImpl() {
45 // If we have any queued messages with attached ports, manually destroy them.
46 while (!message_queue_
.empty()) {
47 const std::vector
<WebMessagePortChannelImpl
*>& channel_array
=
48 message_queue_
.front().ports
;
49 for (size_t i
= 0; i
< channel_array
.size(); i
++) {
50 channel_array
[i
]->destroy();
// Only notify the host when a message port id was actually assigned.
55 if (message_port_id_
!= MSG_ROUTING_NONE
)
56 Send(new MessagePortHostMsg_DestroyMessagePort(message_port_id_
));
// Only unregister from the router when a route id was actually assigned.
58 if (route_id_
!= MSG_ROUTING_NONE
)
59 ChildThread::current()->GetRouter()->RemoveRoute(route_id_
);
// Creates two new channels bound to |child_thread_loop| and entangles each one
// with the other.
// NOTE(review): embedded lines 74-78 are missing from this listing; presumably
// that is where the new objects are handed back through |channel1| and
// |channel2| -- confirm against the full file.
63 void WebMessagePortChannelImpl::CreatePair(
64 base::MessageLoopProxy
* child_thread_loop
,
65 blink::WebMessagePortChannel
** channel1
,
66 blink::WebMessagePortChannel
** channel2
) {
67 WebMessagePortChannelImpl
* impl1
=
68 new WebMessagePortChannelImpl(child_thread_loop
);
69 WebMessagePortChannelImpl
* impl2
=
70 new WebMessagePortChannelImpl(child_thread_loop
);
// Entanglement is requested in both directions.
72 impl1
->Entangle(impl2
);
73 impl2
->Entangle(impl1
);
// Returns the message port id of every channel in |channels|, in array order.
// Each entry is cast to WebMessagePortChannelImpl, its id is recorded, and
// QueueMessages() is called on it; a DCHECK verifies that each recorded id is
// not MSG_ROUTING_NONE.
// NOTE(review): embedded lines 83 and 92-94 are missing from this listing, so
// any guard on |channels| (e.g. a null check) and the "free it" step mentioned
// in the comment below are not visible here -- confirm against the full file.
80 std::vector
<int> WebMessagePortChannelImpl::ExtractMessagePortIDs(
81 WebMessagePortChannelArray
* channels
) {
82 std::vector
<int> message_port_ids
;
84 message_port_ids
.resize(channels
->size());
85 // Extract the port IDs from the source array, then free it.
86 for (size_t i
= 0; i
< channels
->size(); ++i
) {
87 WebMessagePortChannelImpl
* webchannel
=
88 static_cast<WebMessagePortChannelImpl
*>((*channels
)[i
]);
89 message_port_ids
[i
] = webchannel
->message_port_id();
90 webchannel
->QueueMessages();
91 DCHECK(message_port_ids
[i
] != MSG_ROUTING_NONE
);
95 return message_port_ids
;
// Acquires lock_ for the remainder of the function before touching the client.
// NOTE(review): the statement after the lock (embedded line 101) is missing
// from this listing; presumably it stores |client| in client_ -- confirm
// against the full file.
98 void WebMessagePortChannelImpl::setClient(WebMessagePortChannelClient
* client
) {
99 // Must lock here since client_ is called on the main thread.
100 base::AutoLock
auto_lock(lock_
);
// Schedules the release of this object on child_thread_loop_ via ReleaseSoon()
// instead of releasing it on the calling thread.
// NOTE(review): embedded lines 105-106 are missing from this listing, so any
// statement that runs before the release is not visible here -- confirm
// against the full file.
104 void WebMessagePortChannelImpl::destroy() {
107 // Release the object on the main thread, since the destructor might want to
108 // send an IPC, and that has to happen on the main thread.
109 child_thread_loop_
->ReleaseSoon(FROM_HERE
, this);
// Posts |message| together with |channels|. When the caller is not on the
// child thread loop, a task targeting PostMessage() with the same arguments is
// posted to child_thread_loop_; the visible direct PostMessage() call handles
// the remaining case.
// NOTE(review): embedded lines 117-118 and 120 are missing from this listing
// (presumably FROM_HERE, the base::Bind( opener and an else); as shown, it
// cannot be confirmed that the direct call is skipped after posting the task
// -- confirm against the full file.
112 void WebMessagePortChannelImpl::postMessage(
113 const WebString
& message
,
114 WebMessagePortChannelArray
* channels
) {
115 if (!child_thread_loop_
->BelongsToCurrentThread()) {
116 child_thread_loop_
->PostTask(
119 &WebMessagePortChannelImpl::PostMessage
, this, message
, channels
));
121 PostMessage(message
, channels
);
// Builds a MessagePortHostMsg_PostMessage carrying this channel's
// message_port_id_, |message|, and the port ids obtained by running
// ExtractMessagePortIDs() on |channels|.
// NOTE(review): embedded lines 130-131 are missing from this listing, so what
// is done with |msg| after it is allocated (presumably it is passed to Send())
// is not visible here -- confirm against the full file.
125 void WebMessagePortChannelImpl::PostMessage(
126 const base::string16
& message
,
127 WebMessagePortChannelArray
* channels
) {
128 IPC::Message
* msg
= new MessagePortHostMsg_PostMessage(
129 message_port_id_
, message
, ExtractMessagePortIDs(channels
));
// Dequeues the oldest queued message while holding lock_. The message text is
// written through |message|, the attached ports are copied into a fresh
// WebMessagePortChannelArray that is swapped into |channels|, and the entry is
// popped from message_queue_.
// NOTE(review): embedded lines 134, 138-139 and 150-151 are missing from this
// listing, so the declaration of |message|, the body of the empty-queue check
// and the return statements are not visible here; presumably false is returned
// for an empty queue and true otherwise -- confirm against the full file.
133 bool WebMessagePortChannelImpl::tryGetMessage(
135 WebMessagePortChannelArray
& channels
) {
136 base::AutoLock
auto_lock(lock_
);
137 if (message_queue_
.empty())
140 *message
= message_queue_
.front().message
;
141 const std::vector
<WebMessagePortChannelImpl
*>& channel_array
=
142 message_queue_
.front().ports
;
143 WebMessagePortChannelArray
result_ports(channel_array
.size());
144 for (size_t i
= 0; i
< channel_array
.size(); i
++) {
145 result_ports
[i
] = channel_array
[i
];
148 channels
.swap(result_ports
);
149 message_queue_
.pop();
// Sets up routing for this channel. When called off the child thread loop, a
// task re-invoking Init() is posted to child_thread_loop_. When no route id
// has been assigned yet, MessagePortHostMsg_CreateMessagePort is sent with
// pointers to route_id_ and message_port_id_ as its arguments. The object is
// then registered with the child thread's router under route_id_.
// NOTE(review): embedded lines 157-159 and 164-165 are missing from this
// listing (presumably an early return after PostTask and closing braces), so
// the exact control flow cannot be confirmed here -- check the full file.
153 void WebMessagePortChannelImpl::Init() {
154 if (!child_thread_loop_
->BelongsToCurrentThread()) {
155 child_thread_loop_
->PostTask(
156 FROM_HERE
, base::Bind(&WebMessagePortChannelImpl::Init
, this));
160 if (route_id_
== MSG_ROUTING_NONE
) {
// A channel without a route is expected to have no message port id either.
161 DCHECK(message_port_id_
== MSG_ROUTING_NONE
);
162 Send(new MessagePortHostMsg_CreateMessagePort(
163 &route_id_
, &message_port_id_
));
166 ChildThread::current()->GetRouter()->AddRoute(route_id_
, this);
// Sends MessagePortHostMsg_Entangle with this channel's message_port_id_ and
// the message port id of |channel|. When called off the child thread loop, a
// task re-invoking Entangle() with the same |channel| is posted to
// child_thread_loop_.
// NOTE(review): embedded lines 176 and 178-180 are missing from this listing
// (presumably FROM_HERE and an early return), so it cannot be confirmed here
// that the Send() below is skipped after posting -- check the full file.
169 void WebMessagePortChannelImpl::Entangle(
170 scoped_refptr
<WebMessagePortChannelImpl
> channel
) {
171 // The message port ids might not be set up yet, if this channel wasn't
172 // created on the main thread. So need to wait until we're on the main thread
173 // before getting the other message port id.
174 if (!child_thread_loop_
->BelongsToCurrentThread()) {
175 child_thread_loop_
->PostTask(
177 base::Bind(&WebMessagePortChannelImpl::Entangle
, this, channel
));
181 Send(new MessagePortHostMsg_Entangle(
182 message_port_id_
, channel
->message_port_id()));
// Sends MessagePortHostMsg_QueueMessages for this channel's message port and
// takes a reference on the child process via AddRefProcess(). When called off
// the child thread loop, a task re-invoking QueueMessages() is posted to
// child_thread_loop_.
// NOTE(review): embedded lines 189-190 are missing from this listing
// (presumably an early return after PostTask) -- confirm against the full
// file. The matching ReleaseProcess() call appears in OnMessagesQueued().
185 void WebMessagePortChannelImpl::QueueMessages() {
186 if (!child_thread_loop_
->BelongsToCurrentThread()) {
187 child_thread_loop_
->PostTask(
188 FROM_HERE
, base::Bind(&WebMessagePortChannelImpl::QueueMessages
, this));
191 // This message port is being sent elsewhere (perhaps to another process).
192 // The new endpoint needs to receive the queued messages, including ones that
193 // could still be in-flight. So we tell the browser to queue messages, and it
194 // sends us an ack, whose receipt we know means that no more messages are
195 // in-flight. We then send the queued messages to the browser, which prepends
196 // them to the ones it queued and it sends them to the new endpoint.
197 Send(new MessagePortHostMsg_QueueMessages(message_port_id_
));
199 // The process could potentially go away while we're still waiting for
200 // in-flight messages. Ensure it stays alive.
201 ChildProcess::current()->AddRefProcess();
// Sends |message| through the child thread's router. When called off the child
// thread loop, the message is DCHECKed to be non-synchronous and a task
// re-invoking Send() with the same |message| is posted to child_thread_loop_.
// NOTE(review): embedded lines 208 and 210-212 are missing from this listing
// (presumably FROM_HERE and an early return), so it cannot be confirmed here
// that the router Send() below is skipped after posting -- check the full file.
204 void WebMessagePortChannelImpl::Send(IPC::Message
* message
) {
205 if (!child_thread_loop_
->BelongsToCurrentThread()) {
206 DCHECK(!message
->is_sync());
207 child_thread_loop_
->PostTask(
209 base::Bind(&WebMessagePortChannelImpl::Send
, this, message
));
213 ChildThread::current()->GetRouter()->Send(message
);
// Dispatches an incoming IPC message: MessagePortMsg_Message is routed to
// OnMessage() and MessagePortMsg_MessagesQueued to OnMessagesQueued(). For any
// other message type, |handled| is set to false.
// NOTE(review): embedded lines 217 and 223-224 are missing from this listing,
// so the declaration of |handled| and the return statement are not visible
// here; presumably |handled| is what gets returned -- confirm against the full
// file.
216 bool WebMessagePortChannelImpl::OnMessageReceived(const IPC::Message
& message
) {
218 IPC_BEGIN_MESSAGE_MAP(WebMessagePortChannelImpl
, message
)
219 IPC_MESSAGE_HANDLER(MessagePortMsg_Message
, OnMessage
)
220 IPC_MESSAGE_HANDLER(MessagePortMsg_MessagesQueued
, OnMessagesQueued
)
221 IPC_MESSAGE_UNHANDLED(handled
= false)
222 IPC_END_MESSAGE_MAP()
// Handles a message delivered to this port. While holding lock_, the text is
// stored in |msg|, one WebMessagePortChannelImpl is created per entry of
// |sent_message_port_ids| (paired by index with |new_routing_ids| and sharing
// this object's child_thread_loop_), and |msg| is pushed onto message_queue_.
// client_->messageAvailable() is called only when a client is set and the
// queue was empty before the push.
// NOTE(review): embedded lines 231 and 239-241 are missing from this listing,
// so the declaration of |msg| is not visible here. |new_routing_ids| is
// indexed without a visible size check against |sent_message_port_ids| --
// presumably the sender guarantees equal lengths; confirm.
226 void WebMessagePortChannelImpl::OnMessage(
227 const base::string16
& message
,
228 const std::vector
<int>& sent_message_port_ids
,
229 const std::vector
<int>& new_routing_ids
) {
230 base::AutoLock
auto_lock(lock_
);
232 msg
.message
= message
;
233 if (!sent_message_port_ids
.empty()) {
234 msg
.ports
.resize(sent_message_port_ids
.size());
235 for (size_t i
= 0; i
< sent_message_port_ids
.size(); ++i
) {
236 msg
.ports
[i
] = new WebMessagePortChannelImpl(new_routing_ids
[i
],
237 sent_message_port_ids
[i
],
238 child_thread_loop_
.get());
// Capture emptiness before pushing so the client is only notified on the
// empty -> non-empty transition.
242 bool was_empty
= message_queue_
.empty();
243 message_queue_
.push(msg
);
244 if (client_
&& was_empty
)
245 client_
->messageAvailable();
// Drains message_queue_ into |queued_messages| as (message text, port ids)
// pairs, calling QueueMessages() on every channel attached to a drained
// message. The collected messages are sent to the host in
// MessagePortHostMsg_SendQueuedMessages, message_port_id_ is reset to
// MSG_ROUTING_NONE, and a child process reference is released.
// NOTE(review): embedded lines 250-251, 262, 265-267, 270 and 272-273 are
// missing from this listing, so the extent of the lock_ scope (in particular
// whether Send() runs while the lock is held) is not visible here -- confirm
// against the full file.
248 void WebMessagePortChannelImpl::OnMessagesQueued() {
249 std::vector
<QueuedMessage
> queued_messages
;
252 base::AutoLock
auto_lock(lock_
);
253 queued_messages
.reserve(message_queue_
.size());
254 while (!message_queue_
.empty()) {
255 base::string16 message
= message_queue_
.front().message
;
256 const std::vector
<WebMessagePortChannelImpl
*>& channel_array
=
257 message_queue_
.front().ports
;
258 std::vector
<int> port_ids(channel_array
.size());
259 for (size_t i
= 0; i
< channel_array
.size(); ++i
) {
260 port_ids
[i
] = channel_array
[i
]->message_port_id();
261 channel_array
[i
]->QueueMessages();
263 queued_messages
.push_back(std::make_pair(message
, port_ids
));
264 message_queue_
.pop();
268 Send(new MessagePortHostMsg_SendQueuedMessages(
269 message_port_id_
, queued_messages
));
271 message_port_id_
= MSG_ROUTING_NONE
;
// Presumably balances the AddRefProcess() taken in QueueMessages() -- confirm.
274 ChildProcess::current()->ReleaseProcess();
// Out-of-line default constructor for the queued-message record; the body is
// empty, so members are default-initialized.
277 WebMessagePortChannelImpl::Message::Message() {}
// Out-of-line destructor with an empty body. It does not call destroy() on the
// channels held in |ports|; the enclosing class's destructor does that for
// messages still queued.
279 WebMessagePortChannelImpl::Message::~Message() {}
281 } // namespace content