1 // Copyright (c) 2011 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
5 #include "content/child/webmessageportchannel_impl.h"
8 #include "base/message_loop/message_loop_proxy.h"
9 #include "base/values.h"
10 #include "content/child/child_process.h"
11 #include "content/child/child_thread_impl.h"
12 #include "content/common/message_port_messages.h"
13 #include "content/public/child/v8_value_converter.h"
14 #include "third_party/WebKit/public/platform/WebMessagePortChannelClient.h"
15 #include "third_party/WebKit/public/platform/WebString.h"
16 #include "third_party/WebKit/public/web/WebSerializedScriptValue.h"
17 #include "v8/include/v8.h"
19 using blink::WebMessagePortChannel
;
20 using blink::WebMessagePortChannelArray
;
21 using blink::WebMessagePortChannelClient
;
22 using blink::WebString
;
// Constructs a channel that has no route and no message port yet: route_id_
// and message_port_id_ both start as MSG_ROUTING_NONE,
// send_messages_as_values_ starts false, and |main_thread_task_runner| is
// stored for later thread checks and task posting.
// NOTE(review): source lines 28 and 33-36 are missing from this copy (the
// embedded numbering jumps 27 -> 29 and 32 -> 37), so the start of the
// initializer list and the constructor body are not shown.
26 WebMessagePortChannelImpl::WebMessagePortChannelImpl(
27 const scoped_refptr
<base::SingleThreadTaskRunner
>& main_thread_task_runner
)
29 route_id_(MSG_ROUTING_NONE
),
30 message_port_id_(MSG_ROUTING_NONE
),
31 send_messages_as_values_(false),
32 main_thread_task_runner_(main_thread_task_runner
) {
// Constructs a channel for an already existing port described by |port|: the
// message port id and the send_messages_as_values flag are copied from it,
// and |main_thread_task_runner| is stored.
// NOTE(review): source lines 38, 41-42 and 46-49 are missing from this copy.
// CreatePorts() calls this constructor with a routing id as its first
// argument, so that parameter and the route_id_ initializer presumably sit on
// the missing lines -- confirm against the header.
37 WebMessagePortChannelImpl::WebMessagePortChannelImpl(
39 const TransferredMessagePort
& port
,
40 const scoped_refptr
<base::SingleThreadTaskRunner
>& main_thread_task_runner
)
43 message_port_id_(port
.id
),
44 send_messages_as_values_(port
.send_messages_as_values
),
45 main_thread_task_runner_(main_thread_task_runner
) {
// Tears the channel down in three steps: destroys the ports attached to any
// messages still waiting in message_queue_, sends
// MessagePortHostMsg_DestroyMessagePort if this channel still owns a message
// port id, and removes the route from the child thread's router if a route was
// assigned.
// NOTE(review): source lines 57-60 are missing from this copy; the statement
// that pops the queue inside the while loop is presumably among them.
50 WebMessagePortChannelImpl::~WebMessagePortChannelImpl() {
51 // If we have any queued messages with attached ports, manually destroy them.
52 while (!message_queue_
.empty()) {
53 const WebMessagePortChannelArray
& channel_array
=
54 message_queue_
.front().ports
;
55 for (size_t i
= 0; i
< channel_array
.size(); i
++) {
56 channel_array
[i
]->destroy();
// message_port_id_ is reset to MSG_ROUTING_NONE by OnMessagesQueued(), in
// which case no destroy message is sent from here.
61 if (message_port_id_
!= MSG_ROUTING_NONE
)
62 Send(new MessagePortHostMsg_DestroyMessagePort(message_port_id_
));
64 if (route_id_
!= MSG_ROUTING_NONE
)
65 ChildThreadImpl::current()->GetRouter()->RemoveRoute(route_id_
);
// Creates two heap-allocated channels that share |main_thread_task_runner|
// and entangles each one with the other.
// NOTE(review): source lines 80-85 are missing from this copy; the
// out-parameters |channel1| and |channel2| are presumably assigned there --
// confirm.
69 void WebMessagePortChannelImpl::CreatePair(
70 const scoped_refptr
<base::SingleThreadTaskRunner
>& main_thread_task_runner
,
71 blink::WebMessagePortChannel
** channel1
,
72 blink::WebMessagePortChannel
** channel2
) {
73 WebMessagePortChannelImpl
* impl1
=
74 new WebMessagePortChannelImpl(main_thread_task_runner
);
75 WebMessagePortChannelImpl
* impl2
=
76 new WebMessagePortChannelImpl(main_thread_task_runner
);
// Entangle() is called in both directions, once on each endpoint.
78 impl1
->Entangle(impl2
);
79 impl2
->Entangle(impl1
);
// Pointer overload: fills |message_ports| by delegating to the
// const-reference overload on |*channels|.
// NOTE(review): source lines 90 and 93-98 are missing from this copy. The
// comment below says the source array is freed afterwards; that step, any
// null check on |channels|, and the return statement are not shown here.
86 std::vector
<TransferredMessagePort
>
87 WebMessagePortChannelImpl::ExtractMessagePortIDs(
88 WebMessagePortChannelArray
* channels
) {
89 std::vector
<TransferredMessagePort
> message_ports
;
91 // Extract the port IDs from the source array, then free it.
92 message_ports
= ExtractMessagePortIDs(*channels
);
// Const-reference overload: returns one TransferredMessagePort per entry of
// |channels|, carrying that channel's message port id and its
// send_messages_as_values flag. As a side effect QueueMessages() is called on
// every channel in the array.
99 std::vector
<TransferredMessagePort
>
100 WebMessagePortChannelImpl::ExtractMessagePortIDs(
101 const WebMessagePortChannelArray
& channels
) {
// Pre-sized so the loop can assign by index.
102 std::vector
<TransferredMessagePort
> message_ports(channels
.size());
103 for (size_t i
= 0; i
< channels
.size(); ++i
) {
// Every element is treated as a WebMessagePortChannelImpl (unchecked
// static_cast).
104 WebMessagePortChannelImpl
* webchannel
=
105 static_cast<WebMessagePortChannelImpl
*>(channels
[i
]);
106 // The message port ids might not be set up yet if this channel
107 // wasn't created on the main thread.
108 DCHECK(webchannel
->main_thread_task_runner_
->BelongsToCurrentThread());
109 message_ports
[i
].id
= webchannel
->message_port_id();
110 message_ports
[i
].send_messages_as_values
=
111 webchannel
->send_messages_as_values_
;
112 webchannel
->QueueMessages();
// Debug-only check that the id read above was valid.
113 DCHECK(message_ports
[i
].id
!= MSG_ROUTING_NONE
);
115 return message_ports
;
// Builds a channel array with one new WebMessagePortChannelImpl per entry of
// |message_ports|, pairing message_ports[i] with new_routing_ids[i]; every
// new channel uses |main_thread_task_runner|.
// NOTE(review): source lines 127 and 131-134 are missing from this copy; the
// loop increment and the return of |channels| are not shown.
119 WebMessagePortChannelArray
WebMessagePortChannelImpl::CreatePorts(
120 const std::vector
<TransferredMessagePort
>& message_ports
,
121 const std::vector
<int>& new_routing_ids
,
122 const scoped_refptr
<base::SingleThreadTaskRunner
>&
123 main_thread_task_runner
) {
// The two vectors are expected to be parallel (debug-only check).
124 DCHECK_EQ(message_ports
.size(), new_routing_ids
.size());
125 WebMessagePortChannelArray
channels(message_ports
.size());
// The loop condition bounds i by both sizes, so a size mismatch cannot index
// past the end of either vector.
126 for (size_t i
= 0; i
< message_ports
.size() && i
< new_routing_ids
.size();
128 channels
[i
] = new WebMessagePortChannelImpl(
129 new_routing_ids
[i
], message_ports
[i
],
130 main_thread_task_runner
);
// Sets the client for this channel; lock_ is taken first.
// NOTE(review): source lines 138-139 are missing from this copy; the actual
// store of |client| into client_ is presumably there.
135 void WebMessagePortChannelImpl::setClient(WebMessagePortChannelClient
* client
) {
136 // Must lock here since client_ is called on the main thread.
137 base::AutoLock
auto_lock(lock_
);
// Schedules the release of this object on the main-thread task runner rather
// than releasing it on the calling thread.
// NOTE(review): source lines 142-143 are missing from this copy; anything done
// before the release is not shown.
141 void WebMessagePortChannelImpl::destroy() {
144 // Release the object on the main thread, since the destructor might want to
145 // send an IPC, and that has to happen on the main thread.
146 main_thread_task_runner_
->ReleaseSoon(FROM_HERE
, this);
// Posts |message_as_string| together with the attached |channels|.
// The message starts as a string MessagePortMessage. When
// send_messages_as_values_ is set, the string is deserialized to a V8 value,
// converted to a base::Value, and the message is replaced by one that carries
// that value. The work is then handed to PostMessage(): via a posted task
// when called off the main thread, otherwise by a direct call.
// NOTE(review): source lines 163 and 167-168 are missing from this copy; the
// remaining base::Bind arguments and the else branch are not shown.
149 void WebMessagePortChannelImpl::postMessage(
150 const WebString
& message_as_string
,
151 WebMessagePortChannelArray
* channels
) {
152 MessagePortMessage
message(message_as_string
);
153 if (send_messages_as_values_
) {
154 blink::WebSerializedScriptValue serialized_value
=
155 blink::WebSerializedScriptValue::fromString(message_as_string
);
156 v8::Handle
<v8::Value
> v8_value
= serialized_value
.deserialize();
157 scoped_ptr
<V8ValueConverter
> converter(V8ValueConverter::create());
// Dates and regular expressions are explicitly enabled on the converter.
158 converter
->SetDateAllowed(true);
159 converter
->SetRegExpAllowed(true);
// Conversion uses the current context of the current isolate.
160 scoped_ptr
<base::Value
> message_as_value(converter
->FromV8Value(
161 v8_value
, v8::Isolate::GetCurrent()->GetCurrentContext()));
// Pass() hands the converted value over to the new message.
162 message
= MessagePortMessage(message_as_value
.Pass());
164 if (!main_thread_task_runner_
->BelongsToCurrentThread()) {
165 main_thread_task_runner_
->PostTask(
166 FROM_HERE
, base::Bind(&WebMessagePortChannelImpl::PostMessage
, this,
169 PostMessage(message
, channels
);
// Builds the MessagePortHostMsg_PostMessage IPC for this port from |message|
// and the port ids extracted from |channels| (pointer overload of
// ExtractMessagePortIDs).
// NOTE(review): source lines 178-179 are missing from this copy; the call
// that sends |msg| is presumably there.
173 void WebMessagePortChannelImpl::PostMessage(
174 const MessagePortMessage
& message
,
175 WebMessagePortChannelArray
* channels
) {
176 IPC::Message
* msg
= new MessagePortHostMsg_PostMessage(
177 message_port_id_
, message
, ExtractMessagePortIDs(channels
));
// Takes the message at the front of message_queue_, if there is one, under
// lock_. A value message is converted to a V8 value in the client's
// conversion context and re-serialized to a string; a string message is
// copied as is. The ports attached to the message are copied into |channels|
// and the entry is popped.
// NOTE(review): source lines 182, 186-187, 202, 204 and 207-208 are missing
// from this copy: the declaration of the |message| out-parameter, the
// statement guarded by the empty-queue check, the else that separates the two
// assignments to *message, and the final return are not shown.
181 bool WebMessagePortChannelImpl::tryGetMessage(
183 WebMessagePortChannelArray
& channels
) {
184 base::AutoLock
auto_lock(lock_
);
185 if (message_queue_
.empty())
188 const MessagePortMessage
& data
= message_queue_
.front().message
;
// A message is expected to be exactly one of string or value.
189 DCHECK(data
.is_string() != data
.is_value());
190 if (data
.is_value()) {
// V8 handle and context scopes come from client_; client_ is dereferenced
// here without a null check.
191 v8::HandleScope
handle_scope(client_
->scriptIsolate());
192 v8::Context::Scope
context_scope(
193 client_
->scriptContextForMessageConversion());
194 scoped_ptr
<V8ValueConverter
> converter(V8ValueConverter::create());
// Same converter settings as on the sending side in postMessage().
195 converter
->SetDateAllowed(true);
196 converter
->SetRegExpAllowed(true);
197 v8::Handle
<v8::Value
> v8_value
= converter
->ToV8Value(
198 data
.as_value(), client_
->scriptContextForMessageConversion());
199 blink::WebSerializedScriptValue serialized_value
=
200 blink::WebSerializedScriptValue::serialize(v8_value
);
201 *message
= serialized_value
.toString();
203 *message
= message_queue_
.front().message
.message_as_string
;
205 channels
= message_queue_
.front().ports
;
206 message_queue_
.pop();
// Sets up the channel's port and route.
// When called off the main thread it re-posts itself to the main-thread task
// runner. Without a route id it sends MessagePortHostMsg_CreateMessagePort,
// passing the addresses of route_id_ and message_port_id_; with a route id
// and a valid port id it sends MessagePortHostMsg_ReleaseMessages instead.
// Finally the channel is registered with the child thread's router under
// route_id_.
// NOTE(review): source lines 214-216 are missing from this copy; the early
// return after the PostTask is presumably there.
210 void WebMessagePortChannelImpl::Init() {
211 if (!main_thread_task_runner_
->BelongsToCurrentThread()) {
212 main_thread_task_runner_
->PostTask(
213 FROM_HERE
, base::Bind(&WebMessagePortChannelImpl::Init
, this));
217 if (route_id_
== MSG_ROUTING_NONE
) {
// With no route, no port id is expected either.
218 DCHECK(message_port_id_
== MSG_ROUTING_NONE
);
219 Send(new MessagePortHostMsg_CreateMessagePort(
220 &route_id_
, &message_port_id_
));
221 } else if (message_port_id_
!= MSG_ROUTING_NONE
) {
222 Send(new MessagePortHostMsg_ReleaseMessages(message_port_id_
));
225 ChildThreadImpl::current()->GetRouter()->AddRoute(route_id_
, this);
// Sends MessagePortHostMsg_Entangle with this channel's port id and
// |channel|'s port id. When called off the main thread the call is re-posted
// to the main-thread task runner, with |channel| bound into the task.
// NOTE(review): source lines 235 and 237-239 are missing from this copy; the
// first PostTask argument and the early return are not shown.
228 void WebMessagePortChannelImpl::Entangle(
229 scoped_refptr
<WebMessagePortChannelImpl
> channel
) {
230 // The message port ids might not be set up yet, if this channel wasn't
231 // created on the main thread. So need to wait until we're on the main thread
232 // before getting the other message port id.
233 if (!main_thread_task_runner_
->BelongsToCurrentThread()) {
234 main_thread_task_runner_
->PostTask(
236 base::Bind(&WebMessagePortChannelImpl::Entangle
, this, channel
));
240 Send(new MessagePortHostMsg_Entangle(
241 message_port_id_
, channel
->message_port_id()));
// Sends MessagePortHostMsg_QueueMessages for this port and takes a reference
// on the child process. When called off the main thread the call is re-posted
// to the main-thread task runner. OnMessagesQueued() calls ReleaseProcess(),
// which presumably balances the AddRefProcess() made here.
// NOTE(review): source lines 248-249 are missing from this copy; the early
// return after the PostTask is presumably there.
244 void WebMessagePortChannelImpl::QueueMessages() {
245 if (!main_thread_task_runner_
->BelongsToCurrentThread()) {
246 main_thread_task_runner_
->PostTask(
247 FROM_HERE
, base::Bind(&WebMessagePortChannelImpl::QueueMessages
, this));
250 // This message port is being sent elsewhere (perhaps to another process).
251 // The new endpoint needs to receive the queued messages, including ones that
252 // could still be in-flight. So we tell the browser to queue messages, and it
253 // sends us an ack, whose receipt we know means that no more messages are
254 // in-flight. We then send the queued messages to the browser, which prepends
255 // them to the ones it queued and it sends them to the new endpoint.
256 Send(new MessagePortHostMsg_QueueMessages(message_port_id_
));
258 // The process could potentially go away while we're still waiting for
259 // in-flight messages. Ensure it stays alive.
260 ChildProcess::current()->AddRefProcess();
// Sends |message| through the child thread's router. When called off the main
// thread the call is re-posted to the main-thread task runner with the raw
// |message| pointer bound into the task.
// NOTE(review): source lines 267 and 269-271 are missing from this copy; the
// first PostTask argument and the early return are not shown.
263 void WebMessagePortChannelImpl::Send(IPC::Message
* message
) {
264 if (!main_thread_task_runner_
->BelongsToCurrentThread()) {
// Only non-sync messages are expected on the re-posting path.
265 DCHECK(!message
->is_sync());
266 main_thread_task_runner_
->PostTask(
268 base::Bind(&WebMessagePortChannelImpl::Send
, this, message
));
272 ChildThreadImpl::current()->GetRouter()->Send(message
);
// Dispatches an incoming IPC: MessagePortMsg_Message goes to OnMessage() and
// MessagePortMsg_MessagesQueued goes to OnMessagesQueued(). Any other message
// sets |handled| to false.
// NOTE(review): source lines 276 and 282-283 are missing from this copy; the
// declaration of |handled| and the return statement are not shown.
275 bool WebMessagePortChannelImpl::OnMessageReceived(const IPC::Message
& message
) {
277 IPC_BEGIN_MESSAGE_MAP(WebMessagePortChannelImpl
, message
)
278 IPC_MESSAGE_HANDLER(MessagePortMsg_Message
, OnMessage
)
279 IPC_MESSAGE_HANDLER(MessagePortMsg_MessagesQueued
, OnMessagesQueued
)
280 IPC_MESSAGE_UNHANDLED(handled
= false)
281 IPC_END_MESSAGE_MAP()
// Handles a message delivered to this port. Under lock_, it stores |message|
// and the channels created for |sent_message_ports| / |new_routing_ids| in a
// queue entry, pushes the entry onto message_queue_, and notifies client_.
// NOTE(review): source line 290 is missing from this copy; the declaration of
// |msg| is presumably there.
285 void WebMessagePortChannelImpl::OnMessage(
286 const MessagePortMessage
& message
,
287 const std::vector
<TransferredMessagePort
>& sent_message_ports
,
288 const std::vector
<int>& new_routing_ids
) {
289 base::AutoLock
auto_lock(lock_
);
291 msg
.message
= message
;
292 msg
.ports
= CreatePorts(sent_message_ports
, new_routing_ids
,
293 main_thread_task_runner_
.get());
// The client is notified only on the empty -> non-empty transition, and only
// when a client is set.
295 bool was_empty
= message_queue_
.empty();
296 message_queue_
.push(msg
);
297 if (client_
&& was_empty
)
298 client_
->messageAvailable();
// Handles MessagePortMsg_MessagesQueued. It takes lock_ and drains
// message_queue_ into |queued_messages|, turning each message's attached
// channels back into TransferredMessagePort entries. The result is sent in
// MessagePortHostMsg_SendQueuedMessages, message_port_id_ is reset to
// MSG_ROUTING_NONE, and the child-process reference is released.
// NOTE(review): source lines 303-304 and 313-315 are missing from this copy;
// any braces that limit the scope of the lock are not shown.
301 void WebMessagePortChannelImpl::OnMessagesQueued() {
302 std::vector
<QueuedMessage
> queued_messages
;
305 base::AutoLock
auto_lock(lock_
);
// Reserve once up front; the loop appends one entry per queued message.
306 queued_messages
.reserve(message_queue_
.size());
307 while (!message_queue_
.empty()) {
308 MessagePortMessage message
= message_queue_
.front().message
;
// The const-reference overload is used here; it also calls QueueMessages() on
// every attached channel.
309 std::vector
<TransferredMessagePort
> ports
=
310 ExtractMessagePortIDs(message_queue_
.front().ports
);
311 queued_messages
.push_back(std::make_pair(message
, ports
));
312 message_queue_
.pop();
316 Send(new MessagePortHostMsg_SendQueuedMessages(
317 message_port_id_
, queued_messages
));
// With the id cleared, the destructor skips its
// MessagePortHostMsg_DestroyMessagePort send.
319 message_port_id_
= MSG_ROUTING_NONE
;
322 ChildProcess::current()->ReleaseProcess();
// Out-of-line constructor with an empty body; members are default-initialized.
325 WebMessagePortChannelImpl::Message::Message() {}
// Out-of-line destructor with an empty body.
327 WebMessagePortChannelImpl::Message::~Message() {}
329 } // namespace content