Extension syncing: Introduce a NeedsSync pref
[chromium-blink-merge.git] / ipc / ipc_channel_proxy.cc
blobe8e3d289aceadefa9ce9d0b17c17aff07b6a7895
1 // Copyright (c) 2012 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
5 #include "ipc/ipc_channel_proxy.h"
7 #include "base/bind.h"
8 #include "base/compiler_specific.h"
9 #include "base/location.h"
10 #include "base/memory/ref_counted.h"
11 #include "base/memory/scoped_ptr.h"
12 #include "base/profiler/scoped_tracker.h"
13 #include "base/single_thread_task_runner.h"
14 #include "base/thread_task_runner_handle.h"
15 #include "ipc/ipc_channel_factory.h"
16 #include "ipc/ipc_listener.h"
17 #include "ipc/ipc_logging.h"
18 #include "ipc/ipc_message_macros.h"
19 #include "ipc/message_filter.h"
20 #include "ipc/message_filter_router.h"
22 namespace IPC {
24 //------------------------------------------------------------------------------
26 ChannelProxy::Context::Context(
27 Listener* listener,
28 const scoped_refptr<base::SingleThreadTaskRunner>& ipc_task_runner)
29 : listener_task_runner_(base::ThreadTaskRunnerHandle::Get()),
30 listener_(listener),
31 ipc_task_runner_(ipc_task_runner),
32 channel_connected_called_(false),
33 channel_send_thread_safe_(false),
34 message_filter_router_(new MessageFilterRouter()),
35 peer_pid_(base::kNullProcessId) {
36 DCHECK(ipc_task_runner_.get());
37 // The Listener thread where Messages are handled must be a separate thread
38 // to avoid oversubscribing the IO thread. If you trigger this error, you
39 // need to either:
40 // 1) Create the ChannelProxy on a different thread, or
41 // 2) Just use Channel
42 // Note, we currently make an exception for a NULL listener. That usage
43 // basically works, but is outside the intent of ChannelProxy. This support
44 // will disappear, so please don't rely on it. See crbug.com/364241
45 DCHECK(!listener || (ipc_task_runner_.get() != listener_task_runner_.get()));
48 ChannelProxy::Context::~Context() {
51 void ChannelProxy::Context::ClearIPCTaskRunner() {
52 ipc_task_runner_ = NULL;
55 void ChannelProxy::Context::CreateChannel(scoped_ptr<ChannelFactory> factory) {
56 base::AutoLock l(channel_lifetime_lock_);
57 DCHECK(!channel_);
58 channel_id_ = factory->GetName();
59 channel_ = factory->BuildChannel(this);
60 channel_send_thread_safe_ = channel_->IsSendThreadSafe();
63 bool ChannelProxy::Context::TryFilters(const Message& message) {
64 DCHECK(message_filter_router_);
65 #ifdef IPC_MESSAGE_LOG_ENABLED
66 Logging* logger = Logging::GetInstance();
67 if (logger->Enabled())
68 logger->OnPreDispatchMessage(message);
69 #endif
71 if (message_filter_router_->TryFilters(message)) {
72 if (message.dispatch_error()) {
73 listener_task_runner_->PostTask(
74 FROM_HERE, base::Bind(&Context::OnDispatchBadMessage, this, message));
76 #ifdef IPC_MESSAGE_LOG_ENABLED
77 if (logger->Enabled())
78 logger->OnPostDispatchMessage(message, channel_id_);
79 #endif
80 return true;
82 return false;
85 // Called on the IPC::Channel thread
86 bool ChannelProxy::Context::OnMessageReceived(const Message& message) {
87 // First give a chance to the filters to process this message.
88 if (!TryFilters(message))
89 OnMessageReceivedNoFilter(message);
90 return true;
93 // Called on the IPC::Channel thread
94 bool ChannelProxy::Context::OnMessageReceivedNoFilter(const Message& message) {
95 listener_task_runner_->PostTask(
96 FROM_HERE, base::Bind(&Context::OnDispatchMessage, this, message));
97 return true;
100 // Called on the IPC::Channel thread
101 void ChannelProxy::Context::OnChannelConnected(int32 peer_pid) {
102 // We cache off the peer_pid so it can be safely accessed from both threads.
103 peer_pid_ = channel_->GetPeerPID();
105 // Add any pending filters. This avoids a race condition where someone
106 // creates a ChannelProxy, calls AddFilter, and then right after starts the
107 // peer process. The IO thread could receive a message before the task to add
108 // the filter is run on the IO thread.
109 OnAddFilter();
111 // See above comment about using listener_task_runner_ here.
112 listener_task_runner_->PostTask(
113 FROM_HERE, base::Bind(&Context::OnDispatchConnected, this));
116 // Called on the IPC::Channel thread
117 void ChannelProxy::Context::OnChannelError() {
118 for (size_t i = 0; i < filters_.size(); ++i)
119 filters_[i]->OnChannelError();
121 // See above comment about using listener_task_runner_ here.
122 listener_task_runner_->PostTask(
123 FROM_HERE, base::Bind(&Context::OnDispatchError, this));
126 // Called on the IPC::Channel thread
127 void ChannelProxy::Context::OnChannelOpened() {
128 DCHECK(channel_ != NULL);
130 // Assume a reference to ourselves on behalf of this thread. This reference
131 // will be released when we are closed.
132 AddRef();
134 if (!channel_->Connect()) {
135 OnChannelError();
136 return;
139 for (size_t i = 0; i < filters_.size(); ++i)
140 filters_[i]->OnFilterAdded(channel_.get());
143 // Called on the IPC::Channel thread
144 void ChannelProxy::Context::OnChannelClosed() {
145 // TODO(pkasting): Remove ScopedTracker below once crbug.com/477117 is fixed.
146 tracked_objects::ScopedTracker tracking_profile(
147 FROM_HERE_WITH_EXPLICIT_FUNCTION(
148 "477117 ChannelProxy::Context::OnChannelClosed"));
149 // It's okay for IPC::ChannelProxy::Close to be called more than once, which
150 // would result in this branch being taken.
151 if (!channel_)
152 return;
154 for (size_t i = 0; i < filters_.size(); ++i) {
155 filters_[i]->OnChannelClosing();
156 filters_[i]->OnFilterRemoved();
159 // We don't need the filters anymore.
160 message_filter_router_->Clear();
161 filters_.clear();
162 // We don't need the lock, because at this point, the listener thread can't
163 // access it any more.
164 pending_filters_.clear();
166 ClearChannel();
168 // Balance with the reference taken during startup. This may result in
169 // self-destruction.
170 Release();
173 void ChannelProxy::Context::Clear() {
174 listener_ = NULL;
177 // Called on the IPC::Channel thread
178 void ChannelProxy::Context::OnSendMessage(scoped_ptr<Message> message) {
179 // TODO(pkasting): Remove ScopedTracker below once crbug.com/477117 is fixed.
180 tracked_objects::ScopedTracker tracking_profile(
181 FROM_HERE_WITH_EXPLICIT_FUNCTION(
182 "477117 ChannelProxy::Context::OnSendMessage"));
183 if (!channel_) {
184 OnChannelClosed();
185 return;
188 if (!channel_->Send(message.release()))
189 OnChannelError();
192 // Called on the IPC::Channel thread
193 void ChannelProxy::Context::OnAddFilter() {
194 // Our OnChannelConnected method has not yet been called, so we can't be
195 // sure that channel_ is valid yet. When OnChannelConnected *is* called,
196 // it invokes OnAddFilter, so any pending filter(s) will be added at that
197 // time.
198 if (peer_pid_ == base::kNullProcessId)
199 return;
201 std::vector<scoped_refptr<MessageFilter> > new_filters;
203 base::AutoLock auto_lock(pending_filters_lock_);
204 new_filters.swap(pending_filters_);
207 for (size_t i = 0; i < new_filters.size(); ++i) {
208 filters_.push_back(new_filters[i]);
210 message_filter_router_->AddFilter(new_filters[i].get());
212 // The channel has already been created and connected, so we need to
213 // inform the filters right now.
214 new_filters[i]->OnFilterAdded(channel_.get());
215 new_filters[i]->OnChannelConnected(peer_pid_);
219 // Called on the IPC::Channel thread
220 void ChannelProxy::Context::OnRemoveFilter(MessageFilter* filter) {
221 if (peer_pid_ == base::kNullProcessId) {
222 // The channel is not yet connected, so any filters are still pending.
223 base::AutoLock auto_lock(pending_filters_lock_);
224 for (size_t i = 0; i < pending_filters_.size(); ++i) {
225 if (pending_filters_[i].get() == filter) {
226 filter->OnFilterRemoved();
227 pending_filters_.erase(pending_filters_.begin() + i);
228 return;
231 return;
233 if (!channel_)
234 return; // The filters have already been deleted.
236 message_filter_router_->RemoveFilter(filter);
238 for (size_t i = 0; i < filters_.size(); ++i) {
239 if (filters_[i].get() == filter) {
240 filter->OnFilterRemoved();
241 filters_.erase(filters_.begin() + i);
242 return;
246 NOTREACHED() << "filter to be removed not found";
249 // Called on the listener's thread
250 void ChannelProxy::Context::AddFilter(MessageFilter* filter) {
251 base::AutoLock auto_lock(pending_filters_lock_);
252 pending_filters_.push_back(make_scoped_refptr(filter));
253 ipc_task_runner_->PostTask(
254 FROM_HERE, base::Bind(&Context::OnAddFilter, this));
257 // Called on the listener's thread
258 void ChannelProxy::Context::OnDispatchMessage(const Message& message) {
259 #if defined(IPC_MESSAGE_LOG_ENABLED)
260 Logging* logger = Logging::GetInstance();
261 std::string name;
262 logger->GetMessageText(message.type(), &name, &message, NULL);
263 TRACE_EVENT1("ipc", "ChannelProxy::Context::OnDispatchMessage",
264 "name", name);
265 #else
266 TRACE_EVENT2("ipc", "ChannelProxy::Context::OnDispatchMessage",
267 "class", IPC_MESSAGE_ID_CLASS(message.type()),
268 "line", IPC_MESSAGE_ID_LINE(message.type()));
269 #endif
271 if (!listener_)
272 return;
274 OnDispatchConnected();
276 #ifdef IPC_MESSAGE_LOG_ENABLED
277 if (message.type() == IPC_LOGGING_ID) {
278 logger->OnReceivedLoggingMessage(message);
279 return;
282 if (logger->Enabled())
283 logger->OnPreDispatchMessage(message);
284 #endif
286 listener_->OnMessageReceived(message);
287 if (message.dispatch_error())
288 listener_->OnBadMessageReceived(message);
290 #ifdef IPC_MESSAGE_LOG_ENABLED
291 if (logger->Enabled())
292 logger->OnPostDispatchMessage(message, channel_id_);
293 #endif
296 // Called on the listener's thread
297 void ChannelProxy::Context::OnDispatchConnected() {
298 if (channel_connected_called_)
299 return;
301 channel_connected_called_ = true;
302 if (listener_)
303 listener_->OnChannelConnected(peer_pid_);
306 // Called on the listener's thread
307 void ChannelProxy::Context::OnDispatchError() {
308 if (listener_)
309 listener_->OnChannelError();
312 // Called on the listener's thread
313 void ChannelProxy::Context::OnDispatchBadMessage(const Message& message) {
314 if (listener_)
315 listener_->OnBadMessageReceived(message);
318 void ChannelProxy::Context::ClearChannel() {
319 base::AutoLock l(channel_lifetime_lock_);
320 channel_.reset();
323 void ChannelProxy::Context::SendFromThisThread(Message* message) {
324 base::AutoLock l(channel_lifetime_lock_);
325 if (!channel_)
326 return;
327 DCHECK(channel_->IsSendThreadSafe());
328 channel_->Send(message);
331 void ChannelProxy::Context::Send(Message* message) {
332 if (channel_send_thread_safe_) {
333 SendFromThisThread(message);
334 return;
337 ipc_task_runner()->PostTask(
338 FROM_HERE, base::Bind(&ChannelProxy::Context::OnSendMessage, this,
339 base::Passed(scoped_ptr<Message>(message))));
342 //-----------------------------------------------------------------------------
344 // static
345 scoped_ptr<ChannelProxy> ChannelProxy::Create(
346 const IPC::ChannelHandle& channel_handle,
347 Channel::Mode mode,
348 Listener* listener,
349 const scoped_refptr<base::SingleThreadTaskRunner>& ipc_task_runner,
350 AttachmentBroker* broker) {
351 scoped_ptr<ChannelProxy> channel(new ChannelProxy(listener, ipc_task_runner));
352 channel->Init(channel_handle, mode, true, broker);
353 return channel.Pass();
356 // static
357 scoped_ptr<ChannelProxy> ChannelProxy::Create(
358 scoped_ptr<ChannelFactory> factory,
359 Listener* listener,
360 const scoped_refptr<base::SingleThreadTaskRunner>& ipc_task_runner) {
361 scoped_ptr<ChannelProxy> channel(new ChannelProxy(listener, ipc_task_runner));
362 channel->Init(factory.Pass(), true);
363 return channel.Pass();
366 ChannelProxy::ChannelProxy(Context* context)
367 : context_(context),
368 did_init_(false) {
369 #if defined(ENABLE_IPC_FUZZER)
370 outgoing_message_filter_ = NULL;
371 #endif
374 ChannelProxy::ChannelProxy(
375 Listener* listener,
376 const scoped_refptr<base::SingleThreadTaskRunner>& ipc_task_runner)
377 : context_(new Context(listener, ipc_task_runner)), did_init_(false) {
378 #if defined(ENABLE_IPC_FUZZER)
379 outgoing_message_filter_ = NULL;
380 #endif
383 ChannelProxy::~ChannelProxy() {
384 DCHECK(CalledOnValidThread());
386 Close();
389 void ChannelProxy::Init(const IPC::ChannelHandle& channel_handle,
390 Channel::Mode mode,
391 bool create_pipe_now,
392 AttachmentBroker* broker) {
393 #if defined(OS_POSIX)
394 // When we are creating a server on POSIX, we need its file descriptor
395 // to be created immediately so that it can be accessed and passed
396 // to other processes. Forcing it to be created immediately avoids
397 // race conditions that may otherwise arise.
398 if (mode & Channel::MODE_SERVER_FLAG) {
399 create_pipe_now = true;
401 #endif // defined(OS_POSIX)
402 Init(ChannelFactory::Create(channel_handle, mode, broker), create_pipe_now);
405 void ChannelProxy::Init(scoped_ptr<ChannelFactory> factory,
406 bool create_pipe_now) {
407 DCHECK(CalledOnValidThread());
408 DCHECK(!did_init_);
410 if (create_pipe_now) {
411 // Create the channel immediately. This effectively sets up the
412 // low-level pipe so that the client can connect. Without creating
413 // the pipe immediately, it is possible for a listener to attempt
414 // to connect and get an error since the pipe doesn't exist yet.
415 context_->CreateChannel(factory.Pass());
416 } else {
417 context_->ipc_task_runner()->PostTask(
418 FROM_HERE, base::Bind(&Context::CreateChannel,
419 context_.get(), Passed(factory.Pass())));
422 // complete initialization on the background thread
423 context_->ipc_task_runner()->PostTask(
424 FROM_HERE, base::Bind(&Context::OnChannelOpened, context_.get()));
426 did_init_ = true;
429 void ChannelProxy::Close() {
430 DCHECK(CalledOnValidThread());
432 // Clear the backpointer to the listener so that any pending calls to
433 // Context::OnDispatchMessage or OnDispatchError will be ignored. It is
434 // possible that the channel could be closed while it is receiving messages!
435 context_->Clear();
437 if (context_->ipc_task_runner()) {
438 context_->ipc_task_runner()->PostTask(
439 FROM_HERE, base::Bind(&Context::OnChannelClosed, context_.get()));
443 bool ChannelProxy::Send(Message* message) {
444 DCHECK(did_init_);
446 // TODO(alexeypa): add DCHECK(CalledOnValidThread()) here. Currently there are
447 // tests that call Send() from a wrong thread. See http://crbug.com/163523.
449 #ifdef ENABLE_IPC_FUZZER
450 // In IPC fuzzing builds, it is possible to define a filter to apply to
451 // outgoing messages. It will either rewrite the message and return a new
452 // one, freeing the original, or return the message unchanged.
453 if (outgoing_message_filter())
454 message = outgoing_message_filter()->Rewrite(message);
455 #endif
457 #ifdef IPC_MESSAGE_LOG_ENABLED
458 Logging::GetInstance()->OnSendMessage(message, context_->channel_id());
459 #endif
461 context_->Send(message);
462 return true;
465 void ChannelProxy::AddFilter(MessageFilter* filter) {
466 DCHECK(CalledOnValidThread());
468 context_->AddFilter(filter);
471 void ChannelProxy::RemoveFilter(MessageFilter* filter) {
472 DCHECK(CalledOnValidThread());
474 context_->ipc_task_runner()->PostTask(
475 FROM_HERE, base::Bind(&Context::OnRemoveFilter, context_.get(),
476 make_scoped_refptr(filter)));
479 void ChannelProxy::ClearIPCTaskRunner() {
480 DCHECK(CalledOnValidThread());
482 context()->ClearIPCTaskRunner();
485 #if defined(OS_POSIX) && !defined(OS_NACL_SFI)
486 // See the TODO regarding lazy initialization of the channel in
487 // ChannelProxy::Init().
488 int ChannelProxy::GetClientFileDescriptor() {
489 DCHECK(CalledOnValidThread());
491 Channel* channel = context_.get()->channel_.get();
492 // Channel must have been created first.
493 DCHECK(channel) << context_.get()->channel_id_;
494 return channel->GetClientFileDescriptor();
497 base::ScopedFD ChannelProxy::TakeClientFileDescriptor() {
498 DCHECK(CalledOnValidThread());
500 Channel* channel = context_.get()->channel_.get();
501 // Channel must have been created first.
502 DCHECK(channel) << context_.get()->channel_id_;
503 return channel->TakeClientFileDescriptor();
505 #endif
507 //-----------------------------------------------------------------------------
509 } // namespace IPC