Re-submission of https://codereview.chromium.org/1041213003/
[chromium-blink-merge.git] / content / child / webmessageportchannel_impl.cc
blob86764afb268d47f893c89a85bee919fdf63eb04a
1 // Copyright (c) 2011 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
5 #include "content/child/webmessageportchannel_impl.h"
7 #include "base/bind.h"
8 #include "base/message_loop/message_loop_proxy.h"
9 #include "base/values.h"
10 #include "content/child/child_process.h"
11 #include "content/child/child_thread_impl.h"
12 #include "content/common/message_port_messages.h"
13 #include "content/public/child/v8_value_converter.h"
14 #include "third_party/WebKit/public/platform/WebMessagePortChannelClient.h"
15 #include "third_party/WebKit/public/platform/WebString.h"
16 #include "third_party/WebKit/public/web/WebSerializedScriptValue.h"
17 #include "v8/include/v8.h"
19 using blink::WebMessagePortChannel;
20 using blink::WebMessagePortChannelArray;
21 using blink::WebMessagePortChannelClient;
22 using blink::WebString;
24 namespace content {
26 WebMessagePortChannelImpl::WebMessagePortChannelImpl(
27 const scoped_refptr<base::SingleThreadTaskRunner>& main_thread_task_runner)
28 : client_(NULL),
29 route_id_(MSG_ROUTING_NONE),
30 message_port_id_(MSG_ROUTING_NONE),
31 send_messages_as_values_(false),
32 main_thread_task_runner_(main_thread_task_runner) {
33 AddRef();
34 Init();
37 WebMessagePortChannelImpl::WebMessagePortChannelImpl(
38 int route_id,
39 const TransferredMessagePort& port,
40 const scoped_refptr<base::SingleThreadTaskRunner>& main_thread_task_runner)
41 : client_(NULL),
42 route_id_(route_id),
43 message_port_id_(port.id),
44 send_messages_as_values_(port.send_messages_as_values),
45 main_thread_task_runner_(main_thread_task_runner) {
46 AddRef();
47 Init();
50 WebMessagePortChannelImpl::~WebMessagePortChannelImpl() {
51 // If we have any queued messages with attached ports, manually destroy them.
52 while (!message_queue_.empty()) {
53 const WebMessagePortChannelArray& channel_array =
54 message_queue_.front().ports;
55 for (size_t i = 0; i < channel_array.size(); i++) {
56 channel_array[i]->destroy();
58 message_queue_.pop();
61 if (message_port_id_ != MSG_ROUTING_NONE)
62 Send(new MessagePortHostMsg_DestroyMessagePort(message_port_id_));
64 if (route_id_ != MSG_ROUTING_NONE)
65 ChildThreadImpl::current()->GetRouter()->RemoveRoute(route_id_);
68 // static
69 void WebMessagePortChannelImpl::CreatePair(
70 const scoped_refptr<base::SingleThreadTaskRunner>& main_thread_task_runner,
71 blink::WebMessagePortChannel** channel1,
72 blink::WebMessagePortChannel** channel2) {
73 WebMessagePortChannelImpl* impl1 =
74 new WebMessagePortChannelImpl(main_thread_task_runner);
75 WebMessagePortChannelImpl* impl2 =
76 new WebMessagePortChannelImpl(main_thread_task_runner);
78 impl1->Entangle(impl2);
79 impl2->Entangle(impl1);
81 *channel1 = impl1;
82 *channel2 = impl2;
85 // static
86 std::vector<TransferredMessagePort>
87 WebMessagePortChannelImpl::ExtractMessagePortIDs(
88 WebMessagePortChannelArray* channels) {
89 std::vector<TransferredMessagePort> message_ports;
90 if (channels) {
91 // Extract the port IDs from the source array, then free it.
92 message_ports = ExtractMessagePortIDs(*channels);
93 delete channels;
95 return message_ports;
98 // static
99 std::vector<TransferredMessagePort>
100 WebMessagePortChannelImpl::ExtractMessagePortIDs(
101 const WebMessagePortChannelArray& channels) {
102 std::vector<TransferredMessagePort> message_ports(channels.size());
103 for (size_t i = 0; i < channels.size(); ++i) {
104 WebMessagePortChannelImpl* webchannel =
105 static_cast<WebMessagePortChannelImpl*>(channels[i]);
106 // The message port ids might not be set up yet if this channel
107 // wasn't created on the main thread.
108 DCHECK(webchannel->main_thread_task_runner_->BelongsToCurrentThread());
109 message_ports[i].id = webchannel->message_port_id();
110 message_ports[i].send_messages_as_values =
111 webchannel->send_messages_as_values_;
112 webchannel->QueueMessages();
113 DCHECK(message_ports[i].id != MSG_ROUTING_NONE);
115 return message_ports;
118 // static
119 WebMessagePortChannelArray WebMessagePortChannelImpl::CreatePorts(
120 const std::vector<TransferredMessagePort>& message_ports,
121 const std::vector<int>& new_routing_ids,
122 const scoped_refptr<base::SingleThreadTaskRunner>&
123 main_thread_task_runner) {
124 DCHECK_EQ(message_ports.size(), new_routing_ids.size());
125 WebMessagePortChannelArray channels(message_ports.size());
126 for (size_t i = 0; i < message_ports.size() && i < new_routing_ids.size();
127 ++i) {
128 channels[i] = new WebMessagePortChannelImpl(
129 new_routing_ids[i], message_ports[i],
130 main_thread_task_runner);
132 return channels;
135 void WebMessagePortChannelImpl::setClient(WebMessagePortChannelClient* client) {
136 // Must lock here since client_ is called on the main thread.
137 base::AutoLock auto_lock(lock_);
138 client_ = client;
141 void WebMessagePortChannelImpl::destroy() {
142 setClient(NULL);
144 // Release the object on the main thread, since the destructor might want to
145 // send an IPC, and that has to happen on the main thread.
146 main_thread_task_runner_->ReleaseSoon(FROM_HERE, this);
149 void WebMessagePortChannelImpl::postMessage(
150 const WebString& message_as_string,
151 WebMessagePortChannelArray* channels) {
152 MessagePortMessage message(message_as_string);
153 if (send_messages_as_values_) {
154 blink::WebSerializedScriptValue serialized_value =
155 blink::WebSerializedScriptValue::fromString(message_as_string);
156 v8::Handle<v8::Value> v8_value = serialized_value.deserialize();
157 scoped_ptr<V8ValueConverter> converter(V8ValueConverter::create());
158 converter->SetDateAllowed(true);
159 converter->SetRegExpAllowed(true);
160 scoped_ptr<base::Value> message_as_value(converter->FromV8Value(
161 v8_value, v8::Isolate::GetCurrent()->GetCurrentContext()));
162 message = MessagePortMessage(message_as_value.Pass());
164 if (!main_thread_task_runner_->BelongsToCurrentThread()) {
165 main_thread_task_runner_->PostTask(
166 FROM_HERE, base::Bind(&WebMessagePortChannelImpl::PostMessage, this,
167 message, channels));
168 } else {
169 PostMessage(message, channels);
173 void WebMessagePortChannelImpl::PostMessage(
174 const MessagePortMessage& message,
175 WebMessagePortChannelArray* channels) {
176 IPC::Message* msg = new MessagePortHostMsg_PostMessage(
177 message_port_id_, message, ExtractMessagePortIDs(channels));
178 Send(msg);
181 bool WebMessagePortChannelImpl::tryGetMessage(
182 WebString* message,
183 WebMessagePortChannelArray& channels) {
184 base::AutoLock auto_lock(lock_);
185 if (message_queue_.empty())
186 return false;
188 const MessagePortMessage& data = message_queue_.front().message;
189 DCHECK(data.is_string() != data.is_value());
190 if (data.is_value()) {
191 v8::HandleScope handle_scope(client_->scriptIsolate());
192 v8::Context::Scope context_scope(
193 client_->scriptContextForMessageConversion());
194 scoped_ptr<V8ValueConverter> converter(V8ValueConverter::create());
195 converter->SetDateAllowed(true);
196 converter->SetRegExpAllowed(true);
197 v8::Handle<v8::Value> v8_value = converter->ToV8Value(
198 data.as_value(), client_->scriptContextForMessageConversion());
199 blink::WebSerializedScriptValue serialized_value =
200 blink::WebSerializedScriptValue::serialize(v8_value);
201 *message = serialized_value.toString();
202 } else {
203 *message = message_queue_.front().message.message_as_string;
205 channels = message_queue_.front().ports;
206 message_queue_.pop();
207 return true;
210 void WebMessagePortChannelImpl::Init() {
211 if (!main_thread_task_runner_->BelongsToCurrentThread()) {
212 main_thread_task_runner_->PostTask(
213 FROM_HERE, base::Bind(&WebMessagePortChannelImpl::Init, this));
214 return;
217 if (route_id_ == MSG_ROUTING_NONE) {
218 DCHECK(message_port_id_ == MSG_ROUTING_NONE);
219 Send(new MessagePortHostMsg_CreateMessagePort(
220 &route_id_, &message_port_id_));
221 } else if (message_port_id_ != MSG_ROUTING_NONE) {
222 Send(new MessagePortHostMsg_ReleaseMessages(message_port_id_));
225 ChildThreadImpl::current()->GetRouter()->AddRoute(route_id_, this);
228 void WebMessagePortChannelImpl::Entangle(
229 scoped_refptr<WebMessagePortChannelImpl> channel) {
230 // The message port ids might not be set up yet, if this channel wasn't
231 // created on the main thread. So need to wait until we're on the main thread
232 // before getting the other message port id.
233 if (!main_thread_task_runner_->BelongsToCurrentThread()) {
234 main_thread_task_runner_->PostTask(
235 FROM_HERE,
236 base::Bind(&WebMessagePortChannelImpl::Entangle, this, channel));
237 return;
240 Send(new MessagePortHostMsg_Entangle(
241 message_port_id_, channel->message_port_id()));
244 void WebMessagePortChannelImpl::QueueMessages() {
245 if (!main_thread_task_runner_->BelongsToCurrentThread()) {
246 main_thread_task_runner_->PostTask(
247 FROM_HERE, base::Bind(&WebMessagePortChannelImpl::QueueMessages, this));
248 return;
250 // This message port is being sent elsewhere (perhaps to another process).
251 // The new endpoint needs to receive the queued messages, including ones that
252 // could still be in-flight. So we tell the browser to queue messages, and it
253 // sends us an ack, whose receipt we know means that no more messages are
254 // in-flight. We then send the queued messages to the browser, which prepends
255 // them to the ones it queued and it sends them to the new endpoint.
256 Send(new MessagePortHostMsg_QueueMessages(message_port_id_));
258 // The process could potentially go away while we're still waiting for
259 // in-flight messages. Ensure it stays alive.
260 ChildProcess::current()->AddRefProcess();
263 void WebMessagePortChannelImpl::Send(IPC::Message* message) {
264 if (!main_thread_task_runner_->BelongsToCurrentThread()) {
265 DCHECK(!message->is_sync());
266 main_thread_task_runner_->PostTask(
267 FROM_HERE,
268 base::Bind(&WebMessagePortChannelImpl::Send, this, message));
269 return;
272 ChildThreadImpl::current()->GetRouter()->Send(message);
275 bool WebMessagePortChannelImpl::OnMessageReceived(const IPC::Message& message) {
276 bool handled = true;
277 IPC_BEGIN_MESSAGE_MAP(WebMessagePortChannelImpl, message)
278 IPC_MESSAGE_HANDLER(MessagePortMsg_Message, OnMessage)
279 IPC_MESSAGE_HANDLER(MessagePortMsg_MessagesQueued, OnMessagesQueued)
280 IPC_MESSAGE_UNHANDLED(handled = false)
281 IPC_END_MESSAGE_MAP()
282 return handled;
285 void WebMessagePortChannelImpl::OnMessage(
286 const MessagePortMessage& message,
287 const std::vector<TransferredMessagePort>& sent_message_ports,
288 const std::vector<int>& new_routing_ids) {
289 base::AutoLock auto_lock(lock_);
290 Message msg;
291 msg.message = message;
292 msg.ports = CreatePorts(sent_message_ports, new_routing_ids,
293 main_thread_task_runner_.get());
295 bool was_empty = message_queue_.empty();
296 message_queue_.push(msg);
297 if (client_ && was_empty)
298 client_->messageAvailable();
301 void WebMessagePortChannelImpl::OnMessagesQueued() {
302 std::vector<QueuedMessage> queued_messages;
305 base::AutoLock auto_lock(lock_);
306 queued_messages.reserve(message_queue_.size());
307 while (!message_queue_.empty()) {
308 MessagePortMessage message = message_queue_.front().message;
309 std::vector<TransferredMessagePort> ports =
310 ExtractMessagePortIDs(message_queue_.front().ports);
311 queued_messages.push_back(std::make_pair(message, ports));
312 message_queue_.pop();
316 Send(new MessagePortHostMsg_SendQueuedMessages(
317 message_port_id_, queued_messages));
319 message_port_id_ = MSG_ROUTING_NONE;
321 Release();
322 ChildProcess::current()->ReleaseProcess();
325 WebMessagePortChannelImpl::Message::Message() {}
327 WebMessagePortChannelImpl::Message::~Message() {}
329 } // namespace content