Add in-memory caching of network quality estimates across network changes.
[chromium-blink-merge.git] / ipc / ipc_channel_proxy.cc
blob269fd6a8a6241d6ccc647e319ce155d568a2908d
1 // Copyright (c) 2012 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
5 #include "ipc/ipc_channel_proxy.h"
7 #include "base/bind.h"
8 #include "base/compiler_specific.h"
9 #include "base/location.h"
10 #include "base/memory/ref_counted.h"
11 #include "base/memory/scoped_ptr.h"
12 #include "base/profiler/scoped_tracker.h"
13 #include "base/single_thread_task_runner.h"
14 #include "base/thread_task_runner_handle.h"
15 #include "ipc/ipc_channel_factory.h"
16 #include "ipc/ipc_listener.h"
17 #include "ipc/ipc_logging.h"
18 #include "ipc/ipc_message_macros.h"
19 #include "ipc/message_filter.h"
20 #include "ipc/message_filter_router.h"
22 namespace IPC {
24 //------------------------------------------------------------------------------
26 ChannelProxy::Context::Context(
27 Listener* listener,
28 const scoped_refptr<base::SingleThreadTaskRunner>& ipc_task_runner)
29 : listener_task_runner_(base::ThreadTaskRunnerHandle::Get()),
30 listener_(listener),
31 ipc_task_runner_(ipc_task_runner),
32 channel_connected_called_(false),
33 channel_send_thread_safe_(false),
34 message_filter_router_(new MessageFilterRouter()),
35 peer_pid_(base::kNullProcessId) {
36 DCHECK(ipc_task_runner_.get());
37 // The Listener thread where Messages are handled must be a separate thread
38 // to avoid oversubscribing the IO thread. If you trigger this error, you
39 // need to either:
40 // 1) Create the ChannelProxy on a different thread, or
41 // 2) Just use Channel
42 // Note, we currently make an exception for a NULL listener. That usage
43 // basically works, but is outside the intent of ChannelProxy. This support
44 // will disappear, so please don't rely on it. See crbug.com/364241
45 DCHECK(!listener || (ipc_task_runner_.get() != listener_task_runner_.get()));
48 ChannelProxy::Context::~Context() {
51 void ChannelProxy::Context::ClearIPCTaskRunner() {
52 ipc_task_runner_ = NULL;
55 void ChannelProxy::Context::SetListenerTaskRunner(
56 scoped_refptr<base::SingleThreadTaskRunner> task_runner) {
57 DCHECK(ipc_task_runner_.get() != task_runner.get());
58 DCHECK(listener_task_runner_->BelongsToCurrentThread());
59 DCHECK(task_runner->BelongsToCurrentThread());
60 listener_task_runner_ = task_runner;
63 void ChannelProxy::Context::CreateChannel(scoped_ptr<ChannelFactory> factory) {
64 base::AutoLock l(channel_lifetime_lock_);
65 DCHECK(!channel_);
66 channel_id_ = factory->GetName();
67 channel_ = factory->BuildChannel(this);
68 channel_send_thread_safe_ = channel_->IsSendThreadSafe();
71 bool ChannelProxy::Context::TryFilters(const Message& message) {
72 DCHECK(message_filter_router_);
73 #ifdef IPC_MESSAGE_LOG_ENABLED
74 Logging* logger = Logging::GetInstance();
75 if (logger->Enabled())
76 logger->OnPreDispatchMessage(message);
77 #endif
79 if (message_filter_router_->TryFilters(message)) {
80 if (message.dispatch_error()) {
81 listener_task_runner_->PostTask(
82 FROM_HERE, base::Bind(&Context::OnDispatchBadMessage, this, message));
84 #ifdef IPC_MESSAGE_LOG_ENABLED
85 if (logger->Enabled())
86 logger->OnPostDispatchMessage(message, channel_id_);
87 #endif
88 return true;
90 return false;
93 // Called on the IPC::Channel thread
94 bool ChannelProxy::Context::OnMessageReceived(const Message& message) {
95 // First give a chance to the filters to process this message.
96 if (!TryFilters(message))
97 OnMessageReceivedNoFilter(message);
98 return true;
101 // Called on the IPC::Channel thread
102 bool ChannelProxy::Context::OnMessageReceivedNoFilter(const Message& message) {
103 listener_task_runner_->PostTask(
104 FROM_HERE, base::Bind(&Context::OnDispatchMessage, this, message));
105 return true;
108 // Called on the IPC::Channel thread
109 void ChannelProxy::Context::OnChannelConnected(int32 peer_pid) {
110 // We cache off the peer_pid so it can be safely accessed from both threads.
111 peer_pid_ = channel_->GetPeerPID();
113 // Add any pending filters. This avoids a race condition where someone
114 // creates a ChannelProxy, calls AddFilter, and then right after starts the
115 // peer process. The IO thread could receive a message before the task to add
116 // the filter is run on the IO thread.
117 OnAddFilter();
119 // See above comment about using listener_task_runner_ here.
120 listener_task_runner_->PostTask(
121 FROM_HERE, base::Bind(&Context::OnDispatchConnected, this));
124 // Called on the IPC::Channel thread
125 void ChannelProxy::Context::OnChannelError() {
126 for (size_t i = 0; i < filters_.size(); ++i)
127 filters_[i]->OnChannelError();
129 // See above comment about using listener_task_runner_ here.
130 listener_task_runner_->PostTask(
131 FROM_HERE, base::Bind(&Context::OnDispatchError, this));
134 // Called on the IPC::Channel thread
135 void ChannelProxy::Context::OnChannelOpened() {
136 DCHECK(channel_ != NULL);
138 // Assume a reference to ourselves on behalf of this thread. This reference
139 // will be released when we are closed.
140 AddRef();
142 if (!channel_->Connect()) {
143 OnChannelError();
144 return;
147 for (size_t i = 0; i < filters_.size(); ++i)
148 filters_[i]->OnFilterAdded(channel_.get());
151 // Called on the IPC::Channel thread
152 void ChannelProxy::Context::OnChannelClosed() {
153 // TODO(pkasting): Remove ScopedTracker below once crbug.com/477117 is fixed.
154 tracked_objects::ScopedTracker tracking_profile(
155 FROM_HERE_WITH_EXPLICIT_FUNCTION(
156 "477117 ChannelProxy::Context::OnChannelClosed"));
157 // It's okay for IPC::ChannelProxy::Close to be called more than once, which
158 // would result in this branch being taken.
159 if (!channel_)
160 return;
162 for (size_t i = 0; i < filters_.size(); ++i) {
163 filters_[i]->OnChannelClosing();
164 filters_[i]->OnFilterRemoved();
167 // We don't need the filters anymore.
168 message_filter_router_->Clear();
169 filters_.clear();
170 // We don't need the lock, because at this point, the listener thread can't
171 // access it any more.
172 pending_filters_.clear();
174 ClearChannel();
176 // Balance with the reference taken during startup. This may result in
177 // self-destruction.
178 Release();
181 void ChannelProxy::Context::Clear() {
182 listener_ = NULL;
185 // Called on the IPC::Channel thread
186 void ChannelProxy::Context::OnSendMessage(scoped_ptr<Message> message) {
187 // TODO(pkasting): Remove ScopedTracker below once crbug.com/477117 is fixed.
188 tracked_objects::ScopedTracker tracking_profile(
189 FROM_HERE_WITH_EXPLICIT_FUNCTION(
190 "477117 ChannelProxy::Context::OnSendMessage"));
191 if (!channel_) {
192 OnChannelClosed();
193 return;
196 if (!channel_->Send(message.release()))
197 OnChannelError();
200 // Called on the IPC::Channel thread
201 void ChannelProxy::Context::OnAddFilter() {
202 // Our OnChannelConnected method has not yet been called, so we can't be
203 // sure that channel_ is valid yet. When OnChannelConnected *is* called,
204 // it invokes OnAddFilter, so any pending filter(s) will be added at that
205 // time.
206 if (peer_pid_ == base::kNullProcessId)
207 return;
209 std::vector<scoped_refptr<MessageFilter> > new_filters;
211 base::AutoLock auto_lock(pending_filters_lock_);
212 new_filters.swap(pending_filters_);
215 for (size_t i = 0; i < new_filters.size(); ++i) {
216 filters_.push_back(new_filters[i]);
218 message_filter_router_->AddFilter(new_filters[i].get());
220 // The channel has already been created and connected, so we need to
221 // inform the filters right now.
222 new_filters[i]->OnFilterAdded(channel_.get());
223 new_filters[i]->OnChannelConnected(peer_pid_);
227 // Called on the IPC::Channel thread
228 void ChannelProxy::Context::OnRemoveFilter(MessageFilter* filter) {
229 if (peer_pid_ == base::kNullProcessId) {
230 // The channel is not yet connected, so any filters are still pending.
231 base::AutoLock auto_lock(pending_filters_lock_);
232 for (size_t i = 0; i < pending_filters_.size(); ++i) {
233 if (pending_filters_[i].get() == filter) {
234 filter->OnFilterRemoved();
235 pending_filters_.erase(pending_filters_.begin() + i);
236 return;
239 return;
241 if (!channel_)
242 return; // The filters have already been deleted.
244 message_filter_router_->RemoveFilter(filter);
246 for (size_t i = 0; i < filters_.size(); ++i) {
247 if (filters_[i].get() == filter) {
248 filter->OnFilterRemoved();
249 filters_.erase(filters_.begin() + i);
250 return;
254 NOTREACHED() << "filter to be removed not found";
257 // Called on the listener's thread
258 void ChannelProxy::Context::AddFilter(MessageFilter* filter) {
259 base::AutoLock auto_lock(pending_filters_lock_);
260 pending_filters_.push_back(make_scoped_refptr(filter));
261 ipc_task_runner_->PostTask(
262 FROM_HERE, base::Bind(&Context::OnAddFilter, this));
265 // Called on the listener's thread
266 void ChannelProxy::Context::OnDispatchMessage(const Message& message) {
267 #if defined(IPC_MESSAGE_LOG_ENABLED)
268 Logging* logger = Logging::GetInstance();
269 std::string name;
270 logger->GetMessageText(message.type(), &name, &message, NULL);
271 TRACE_EVENT1("ipc", "ChannelProxy::Context::OnDispatchMessage",
272 "name", name);
273 #else
274 TRACE_EVENT2("ipc", "ChannelProxy::Context::OnDispatchMessage",
275 "class", IPC_MESSAGE_ID_CLASS(message.type()),
276 "line", IPC_MESSAGE_ID_LINE(message.type()));
277 #endif
279 if (!listener_)
280 return;
282 OnDispatchConnected();
284 #ifdef IPC_MESSAGE_LOG_ENABLED
285 if (message.type() == IPC_LOGGING_ID) {
286 logger->OnReceivedLoggingMessage(message);
287 return;
290 if (logger->Enabled())
291 logger->OnPreDispatchMessage(message);
292 #endif
294 listener_->OnMessageReceived(message);
295 if (message.dispatch_error())
296 listener_->OnBadMessageReceived(message);
298 #ifdef IPC_MESSAGE_LOG_ENABLED
299 if (logger->Enabled())
300 logger->OnPostDispatchMessage(message, channel_id_);
301 #endif
304 // Called on the listener's thread
305 void ChannelProxy::Context::OnDispatchConnected() {
306 if (channel_connected_called_)
307 return;
309 channel_connected_called_ = true;
310 if (listener_)
311 listener_->OnChannelConnected(peer_pid_);
314 // Called on the listener's thread
315 void ChannelProxy::Context::OnDispatchError() {
316 if (listener_)
317 listener_->OnChannelError();
320 // Called on the listener's thread
321 void ChannelProxy::Context::OnDispatchBadMessage(const Message& message) {
322 if (listener_)
323 listener_->OnBadMessageReceived(message);
326 void ChannelProxy::Context::ClearChannel() {
327 base::AutoLock l(channel_lifetime_lock_);
328 channel_.reset();
331 void ChannelProxy::Context::SendFromThisThread(Message* message) {
332 base::AutoLock l(channel_lifetime_lock_);
333 if (!channel_)
334 return;
335 DCHECK(channel_->IsSendThreadSafe());
336 channel_->Send(message);
339 void ChannelProxy::Context::Send(Message* message) {
340 if (channel_send_thread_safe_) {
341 SendFromThisThread(message);
342 return;
345 ipc_task_runner()->PostTask(
346 FROM_HERE, base::Bind(&ChannelProxy::Context::OnSendMessage, this,
347 base::Passed(scoped_ptr<Message>(message))));
350 //-----------------------------------------------------------------------------
352 // static
353 scoped_ptr<ChannelProxy> ChannelProxy::Create(
354 const IPC::ChannelHandle& channel_handle,
355 Channel::Mode mode,
356 Listener* listener,
357 const scoped_refptr<base::SingleThreadTaskRunner>& ipc_task_runner,
358 AttachmentBroker* broker) {
359 scoped_ptr<ChannelProxy> channel(new ChannelProxy(listener, ipc_task_runner));
360 channel->Init(channel_handle, mode, true, broker);
361 return channel.Pass();
364 // static
365 scoped_ptr<ChannelProxy> ChannelProxy::Create(
366 scoped_ptr<ChannelFactory> factory,
367 Listener* listener,
368 const scoped_refptr<base::SingleThreadTaskRunner>& ipc_task_runner) {
369 scoped_ptr<ChannelProxy> channel(new ChannelProxy(listener, ipc_task_runner));
370 channel->Init(factory.Pass(), true);
371 return channel.Pass();
374 ChannelProxy::ChannelProxy(Context* context)
375 : context_(context),
376 did_init_(false) {
377 #if defined(ENABLE_IPC_FUZZER)
378 outgoing_message_filter_ = NULL;
379 #endif
382 ChannelProxy::ChannelProxy(
383 Listener* listener,
384 const scoped_refptr<base::SingleThreadTaskRunner>& ipc_task_runner)
385 : context_(new Context(listener, ipc_task_runner)), did_init_(false) {
386 #if defined(ENABLE_IPC_FUZZER)
387 outgoing_message_filter_ = NULL;
388 #endif
391 ChannelProxy::~ChannelProxy() {
392 DCHECK(CalledOnValidThread());
394 Close();
397 void ChannelProxy::Init(const IPC::ChannelHandle& channel_handle,
398 Channel::Mode mode,
399 bool create_pipe_now,
400 AttachmentBroker* broker) {
401 #if defined(OS_POSIX)
402 // When we are creating a server on POSIX, we need its file descriptor
403 // to be created immediately so that it can be accessed and passed
404 // to other processes. Forcing it to be created immediately avoids
405 // race conditions that may otherwise arise.
406 if (mode & Channel::MODE_SERVER_FLAG) {
407 create_pipe_now = true;
409 #endif // defined(OS_POSIX)
410 Init(ChannelFactory::Create(channel_handle, mode, broker), create_pipe_now);
413 void ChannelProxy::Init(scoped_ptr<ChannelFactory> factory,
414 bool create_pipe_now) {
415 DCHECK(CalledOnValidThread());
416 DCHECK(!did_init_);
418 if (create_pipe_now) {
419 // Create the channel immediately. This effectively sets up the
420 // low-level pipe so that the client can connect. Without creating
421 // the pipe immediately, it is possible for a listener to attempt
422 // to connect and get an error since the pipe doesn't exist yet.
423 context_->CreateChannel(factory.Pass());
424 } else {
425 context_->ipc_task_runner()->PostTask(
426 FROM_HERE, base::Bind(&Context::CreateChannel,
427 context_.get(), Passed(factory.Pass())));
430 // complete initialization on the background thread
431 context_->ipc_task_runner()->PostTask(
432 FROM_HERE, base::Bind(&Context::OnChannelOpened, context_.get()));
434 did_init_ = true;
437 void ChannelProxy::Close() {
438 DCHECK(CalledOnValidThread());
440 // Clear the backpointer to the listener so that any pending calls to
441 // Context::OnDispatchMessage or OnDispatchError will be ignored. It is
442 // possible that the channel could be closed while it is receiving messages!
443 context_->Clear();
445 if (context_->ipc_task_runner()) {
446 context_->ipc_task_runner()->PostTask(
447 FROM_HERE, base::Bind(&Context::OnChannelClosed, context_.get()));
451 bool ChannelProxy::Send(Message* message) {
452 DCHECK(did_init_);
454 // TODO(alexeypa): add DCHECK(CalledOnValidThread()) here. Currently there are
455 // tests that call Send() from a wrong thread. See http://crbug.com/163523.
457 #ifdef ENABLE_IPC_FUZZER
458 // In IPC fuzzing builds, it is possible to define a filter to apply to
459 // outgoing messages. It will either rewrite the message and return a new
460 // one, freeing the original, or return the message unchanged.
461 if (outgoing_message_filter())
462 message = outgoing_message_filter()->Rewrite(message);
463 #endif
465 #ifdef IPC_MESSAGE_LOG_ENABLED
466 Logging::GetInstance()->OnSendMessage(message, context_->channel_id());
467 #endif
469 context_->Send(message);
470 return true;
473 void ChannelProxy::AddFilter(MessageFilter* filter) {
474 DCHECK(CalledOnValidThread());
476 context_->AddFilter(filter);
479 void ChannelProxy::RemoveFilter(MessageFilter* filter) {
480 DCHECK(CalledOnValidThread());
482 context_->ipc_task_runner()->PostTask(
483 FROM_HERE, base::Bind(&Context::OnRemoveFilter, context_.get(),
484 make_scoped_refptr(filter)));
487 void ChannelProxy::SetListenerTaskRunner(
488 scoped_refptr<base::SingleThreadTaskRunner> task_runner) {
489 DCHECK(CalledOnValidThread());
491 context()->SetListenerTaskRunner(task_runner);
494 void ChannelProxy::ClearIPCTaskRunner() {
495 DCHECK(CalledOnValidThread());
497 context()->ClearIPCTaskRunner();
500 #if defined(OS_POSIX) && !defined(OS_NACL_SFI)
501 // See the TODO regarding lazy initialization of the channel in
502 // ChannelProxy::Init().
503 int ChannelProxy::GetClientFileDescriptor() {
504 DCHECK(CalledOnValidThread());
506 Channel* channel = context_.get()->channel_.get();
507 // Channel must have been created first.
508 DCHECK(channel) << context_.get()->channel_id_;
509 return channel->GetClientFileDescriptor();
512 base::ScopedFD ChannelProxy::TakeClientFileDescriptor() {
513 DCHECK(CalledOnValidThread());
515 Channel* channel = context_.get()->channel_.get();
516 // Channel must have been created first.
517 DCHECK(channel) << context_.get()->channel_id_;
518 return channel->TakeClientFileDescriptor();
520 #endif
522 //-----------------------------------------------------------------------------
524 } // namespace IPC