1 // Copyright (c) 2011 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
5 #include "content/common/webmessageportchannel_impl.h"
8 #include "content/common/child_process.h"
9 #include "content/common/child_thread.h"
10 #include "content/common/worker_messages.h"
11 #include "third_party/WebKit/Source/WebKit/chromium/public/platform/WebString.h"
12 #include "third_party/WebKit/Source/WebKit/chromium/public/WebMessagePortChannelClient.h"
14 using WebKit::WebMessagePortChannel
;
15 using WebKit::WebMessagePortChannelArray
;
16 using WebKit::WebMessagePortChannelClient
;
17 using WebKit::WebString
;
21 WebMessagePortChannelImpl::WebMessagePortChannelImpl()
23 route_id_(MSG_ROUTING_NONE
),
24 message_port_id_(MSG_ROUTING_NONE
) {
29 WebMessagePortChannelImpl::WebMessagePortChannelImpl(
34 message_port_id_(message_port_id
) {
39 WebMessagePortChannelImpl::~WebMessagePortChannelImpl() {
40 // If we have any queued messages with attached ports, manually destroy them.
41 while (!message_queue_
.empty()) {
42 const std::vector
<WebMessagePortChannelImpl
*>& channel_array
=
43 message_queue_
.front().ports
;
44 for (size_t i
= 0; i
< channel_array
.size(); i
++) {
45 channel_array
[i
]->destroy();
50 if (message_port_id_
!= MSG_ROUTING_NONE
)
51 Send(new WorkerProcessHostMsg_DestroyMessagePort(message_port_id_
));
53 if (route_id_
!= MSG_ROUTING_NONE
)
54 ChildThread::current()->RemoveRoute(route_id_
);
57 void WebMessagePortChannelImpl::setClient(WebMessagePortChannelClient
* client
) {
58 // Must lock here since client_ is called on the main thread.
59 base::AutoLock
auto_lock(lock_
);
63 void WebMessagePortChannelImpl::destroy() {
66 // Release the object on the main thread, since the destructor might want to
67 // send an IPC, and that has to happen on the main thread.
68 ChildThread::current()->message_loop()->ReleaseSoon(FROM_HERE
, this);
71 void WebMessagePortChannelImpl::entangle(WebMessagePortChannel
* channel
) {
72 // The message port ids might not be set up yet, if this channel wasn't
73 // created on the main thread. So need to wait until we're on the main thread
74 // before getting the other message port id.
75 scoped_refptr
<WebMessagePortChannelImpl
> webchannel(
76 static_cast<WebMessagePortChannelImpl
*>(channel
));
80 void WebMessagePortChannelImpl::postMessage(
81 const WebString
& message
,
82 WebMessagePortChannelArray
* channels
) {
83 if (MessageLoop::current() != ChildThread::current()->message_loop()) {
84 ChildThread::current()->message_loop()->PostTask(
86 base::Bind(&WebMessagePortChannelImpl::postMessage
, this,
91 std::vector
<int> message_port_ids(channels
? channels
->size() : 0);
93 // Extract the port IDs from the source array, then free it.
94 for (size_t i
= 0; i
< channels
->size(); ++i
) {
95 WebMessagePortChannelImpl
* webchannel
=
96 static_cast<WebMessagePortChannelImpl
*>((*channels
)[i
]);
97 message_port_ids
[i
] = webchannel
->message_port_id();
98 webchannel
->QueueMessages();
99 DCHECK(message_port_ids
[i
] != MSG_ROUTING_NONE
);
104 IPC::Message
* msg
= new WorkerProcessHostMsg_PostMessage(
105 message_port_id_
, message
, message_port_ids
);
109 bool WebMessagePortChannelImpl::tryGetMessage(
111 WebMessagePortChannelArray
& channels
) {
112 base::AutoLock
auto_lock(lock_
);
113 if (message_queue_
.empty())
116 *message
= message_queue_
.front().message
;
117 const std::vector
<WebMessagePortChannelImpl
*>& channel_array
=
118 message_queue_
.front().ports
;
119 WebMessagePortChannelArray
result_ports(channel_array
.size());
120 for (size_t i
= 0; i
< channel_array
.size(); i
++) {
121 result_ports
[i
] = channel_array
[i
];
124 channels
.swap(result_ports
);
125 message_queue_
.pop();
129 void WebMessagePortChannelImpl::Init() {
130 if (MessageLoop::current() != ChildThread::current()->message_loop()) {
131 ChildThread::current()->message_loop()->PostTask(
133 base::Bind(&WebMessagePortChannelImpl::Init
, this));
137 if (route_id_
== MSG_ROUTING_NONE
) {
138 DCHECK(message_port_id_
== MSG_ROUTING_NONE
);
139 Send(new WorkerProcessHostMsg_CreateMessagePort(
140 &route_id_
, &message_port_id_
));
143 ChildThread::current()->AddRoute(route_id_
, this);
146 void WebMessagePortChannelImpl::Entangle(
147 scoped_refptr
<WebMessagePortChannelImpl
> channel
) {
148 if (MessageLoop::current() != ChildThread::current()->message_loop()) {
149 ChildThread::current()->message_loop()->PostTask(
151 base::Bind(&WebMessagePortChannelImpl::Entangle
, this, channel
));
155 Send(new WorkerProcessHostMsg_Entangle(
156 message_port_id_
, channel
->message_port_id()));
159 void WebMessagePortChannelImpl::QueueMessages() {
160 if (MessageLoop::current() != ChildThread::current()->message_loop()) {
161 ChildThread::current()->message_loop()->PostTask(
163 base::Bind(&WebMessagePortChannelImpl::QueueMessages
, this));
166 // This message port is being sent elsewhere (perhaps to another process).
167 // The new endpoint needs to receive the queued messages, including ones that
168 // could still be in-flight. So we tell the browser to queue messages, and it
169 // sends us an ack, whose receipt we know means that no more messages are
170 // in-flight. We then send the queued messages to the browser, which prepends
171 // them to the ones it queued and it sends them to the new endpoint.
172 Send(new WorkerProcessHostMsg_QueueMessages(message_port_id_
));
174 // The process could potentially go away while we're still waiting for
175 // in-flight messages. Ensure it stays alive.
176 ChildProcess::current()->AddRefProcess();
179 void WebMessagePortChannelImpl::Send(IPC::Message
* message
) {
180 if (MessageLoop::current() != ChildThread::current()->message_loop()) {
181 DCHECK(!message
->is_sync());
182 ChildThread::current()->message_loop()->PostTask(
184 base::Bind(&WebMessagePortChannelImpl::Send
, this, message
));
188 ChildThread::current()->Send(message
);
191 bool WebMessagePortChannelImpl::OnMessageReceived(const IPC::Message
& message
) {
193 IPC_BEGIN_MESSAGE_MAP(WebMessagePortChannelImpl
, message
)
194 IPC_MESSAGE_HANDLER(WorkerProcessMsg_Message
, OnMessage
)
195 IPC_MESSAGE_HANDLER(WorkerProcessMsg_MessagesQueued
, OnMessagedQueued
)
196 IPC_MESSAGE_UNHANDLED(handled
= false)
197 IPC_END_MESSAGE_MAP()
201 void WebMessagePortChannelImpl::OnMessage(
202 const string16
& message
,
203 const std::vector
<int>& sent_message_port_ids
,
204 const std::vector
<int>& new_routing_ids
) {
205 base::AutoLock
auto_lock(lock_
);
207 msg
.message
= message
;
208 if (!sent_message_port_ids
.empty()) {
209 msg
.ports
.resize(sent_message_port_ids
.size());
210 for (size_t i
= 0; i
< sent_message_port_ids
.size(); ++i
) {
211 msg
.ports
[i
] = new WebMessagePortChannelImpl(
212 new_routing_ids
[i
], sent_message_port_ids
[i
]);
216 bool was_empty
= message_queue_
.empty();
217 message_queue_
.push(msg
);
218 if (client_
&& was_empty
)
219 client_
->messageAvailable();
222 void WebMessagePortChannelImpl::OnMessagedQueued() {
223 std::vector
<QueuedMessage
> queued_messages
;
226 base::AutoLock
auto_lock(lock_
);
227 queued_messages
.reserve(message_queue_
.size());
228 while (!message_queue_
.empty()) {
229 string16 message
= message_queue_
.front().message
;
230 const std::vector
<WebMessagePortChannelImpl
*>& channel_array
=
231 message_queue_
.front().ports
;
232 std::vector
<int> port_ids(channel_array
.size());
233 for (size_t i
= 0; i
< channel_array
.size(); ++i
) {
234 port_ids
[i
] = channel_array
[i
]->message_port_id();
236 queued_messages
.push_back(std::make_pair(message
, port_ids
));
237 message_queue_
.pop();
241 Send(new WorkerProcessHostMsg_SendQueuedMessages(
242 message_port_id_
, queued_messages
));
244 message_port_id_
= MSG_ROUTING_NONE
;
247 ChildProcess::current()->ReleaseProcess();
250 WebMessagePortChannelImpl::Message::Message() {}
252 WebMessagePortChannelImpl::Message::~Message() {}
254 } // namespace content