ozone: drm: Fix graphics buffer handle leak in DrmConsoleBuffer
[chromium-blink-merge.git] / ipc / ipc_channel_proxy.cc
blob28fcbfd3f1757a236e5c37feeac3421ade4cb63e
1 // Copyright (c) 2012 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
5 #include "ipc/ipc_channel_proxy.h"
7 #include "base/bind.h"
8 #include "base/compiler_specific.h"
9 #include "base/location.h"
10 #include "base/memory/ref_counted.h"
11 #include "base/memory/scoped_ptr.h"
12 #include "base/profiler/scoped_tracker.h"
13 #include "base/single_thread_task_runner.h"
14 #include "base/thread_task_runner_handle.h"
15 #include "ipc/ipc_channel_factory.h"
16 #include "ipc/ipc_listener.h"
17 #include "ipc/ipc_logging.h"
18 #include "ipc/ipc_message_macros.h"
19 #include "ipc/message_filter.h"
20 #include "ipc/message_filter_router.h"
22 namespace IPC {
24 //------------------------------------------------------------------------------
26 ChannelProxy::Context::Context(
27 Listener* listener,
28 const scoped_refptr<base::SingleThreadTaskRunner>& ipc_task_runner)
29 : listener_task_runner_(base::ThreadTaskRunnerHandle::Get()),
30 listener_(listener),
31 ipc_task_runner_(ipc_task_runner),
32 channel_connected_called_(false),
33 message_filter_router_(new MessageFilterRouter()),
34 peer_pid_(base::kNullProcessId) {
35 DCHECK(ipc_task_runner_.get());
36 // The Listener thread where Messages are handled must be a separate thread
37 // to avoid oversubscribing the IO thread. If you trigger this error, you
38 // need to either:
39 // 1) Create the ChannelProxy on a different thread, or
40 // 2) Just use Channel
41 // Note, we currently make an exception for a NULL listener. That usage
42 // basically works, but is outside the intent of ChannelProxy. This support
43 // will disappear, so please don't rely on it. See crbug.com/364241
44 DCHECK(!listener || (ipc_task_runner_.get() != listener_task_runner_.get()));
47 ChannelProxy::Context::~Context() {
50 void ChannelProxy::Context::ClearIPCTaskRunner() {
51 ipc_task_runner_ = NULL;
54 void ChannelProxy::Context::SetListenerTaskRunner(
55 scoped_refptr<base::SingleThreadTaskRunner> task_runner) {
56 DCHECK(ipc_task_runner_.get() != task_runner.get());
57 DCHECK(listener_task_runner_->BelongsToCurrentThread());
58 DCHECK(task_runner->BelongsToCurrentThread());
59 listener_task_runner_ = task_runner;
62 void ChannelProxy::Context::CreateChannel(scoped_ptr<ChannelFactory> factory) {
63 DCHECK(!channel_);
64 channel_id_ = factory->GetName();
65 channel_ = factory->BuildChannel(this);
68 bool ChannelProxy::Context::TryFilters(const Message& message) {
69 DCHECK(message_filter_router_);
70 #ifdef IPC_MESSAGE_LOG_ENABLED
71 Logging* logger = Logging::GetInstance();
72 if (logger->Enabled())
73 logger->OnPreDispatchMessage(message);
74 #endif
76 if (message_filter_router_->TryFilters(message)) {
77 if (message.dispatch_error()) {
78 listener_task_runner_->PostTask(
79 FROM_HERE, base::Bind(&Context::OnDispatchBadMessage, this, message));
81 #ifdef IPC_MESSAGE_LOG_ENABLED
82 if (logger->Enabled())
83 logger->OnPostDispatchMessage(message, channel_id_);
84 #endif
85 return true;
87 return false;
90 // Called on the IPC::Channel thread
91 bool ChannelProxy::Context::OnMessageReceived(const Message& message) {
92 // First give a chance to the filters to process this message.
93 if (!TryFilters(message))
94 OnMessageReceivedNoFilter(message);
95 return true;
98 // Called on the IPC::Channel thread
99 bool ChannelProxy::Context::OnMessageReceivedNoFilter(const Message& message) {
100 listener_task_runner_->PostTask(
101 FROM_HERE, base::Bind(&Context::OnDispatchMessage, this, message));
102 return true;
105 // Called on the IPC::Channel thread
106 void ChannelProxy::Context::OnChannelConnected(int32 peer_pid) {
107 // We cache off the peer_pid so it can be safely accessed from both threads.
108 peer_pid_ = channel_->GetPeerPID();
110 // Add any pending filters. This avoids a race condition where someone
111 // creates a ChannelProxy, calls AddFilter, and then right after starts the
112 // peer process. The IO thread could receive a message before the task to add
113 // the filter is run on the IO thread.
114 OnAddFilter();
116 // See above comment about using listener_task_runner_ here.
117 listener_task_runner_->PostTask(
118 FROM_HERE, base::Bind(&Context::OnDispatchConnected, this));
121 // Called on the IPC::Channel thread
122 void ChannelProxy::Context::OnChannelError() {
123 for (size_t i = 0; i < filters_.size(); ++i)
124 filters_[i]->OnChannelError();
126 // See above comment about using listener_task_runner_ here.
127 listener_task_runner_->PostTask(
128 FROM_HERE, base::Bind(&Context::OnDispatchError, this));
131 // Called on the IPC::Channel thread
132 void ChannelProxy::Context::OnChannelOpened() {
133 DCHECK(channel_ != NULL);
135 // Assume a reference to ourselves on behalf of this thread. This reference
136 // will be released when we are closed.
137 AddRef();
139 if (!channel_->Connect()) {
140 OnChannelError();
141 return;
144 for (size_t i = 0; i < filters_.size(); ++i)
145 filters_[i]->OnFilterAdded(channel_.get());
148 // Called on the IPC::Channel thread
149 void ChannelProxy::Context::OnChannelClosed() {
150 // TODO(pkasting): Remove ScopedTracker below once crbug.com/477117 is fixed.
151 tracked_objects::ScopedTracker tracking_profile(
152 FROM_HERE_WITH_EXPLICIT_FUNCTION(
153 "477117 ChannelProxy::Context::OnChannelClosed"));
154 // It's okay for IPC::ChannelProxy::Close to be called more than once, which
155 // would result in this branch being taken.
156 if (!channel_)
157 return;
159 for (size_t i = 0; i < filters_.size(); ++i) {
160 filters_[i]->OnChannelClosing();
161 filters_[i]->OnFilterRemoved();
164 // We don't need the filters anymore.
165 message_filter_router_->Clear();
166 filters_.clear();
167 // We don't need the lock, because at this point, the listener thread can't
168 // access it any more.
169 pending_filters_.clear();
171 channel_.reset();
173 // Balance with the reference taken during startup. This may result in
174 // self-destruction.
175 Release();
178 void ChannelProxy::Context::Clear() {
179 listener_ = NULL;
182 // Called on the IPC::Channel thread
183 void ChannelProxy::Context::OnSendMessage(scoped_ptr<Message> message) {
184 // TODO(pkasting): Remove ScopedTracker below once crbug.com/477117 is fixed.
185 tracked_objects::ScopedTracker tracking_profile(
186 FROM_HERE_WITH_EXPLICIT_FUNCTION(
187 "477117 ChannelProxy::Context::OnSendMessage"));
188 if (!channel_) {
189 OnChannelClosed();
190 return;
193 if (!channel_->Send(message.release()))
194 OnChannelError();
197 // Called on the IPC::Channel thread
198 void ChannelProxy::Context::OnAddFilter() {
199 // Our OnChannelConnected method has not yet been called, so we can't be
200 // sure that channel_ is valid yet. When OnChannelConnected *is* called,
201 // it invokes OnAddFilter, so any pending filter(s) will be added at that
202 // time.
203 if (peer_pid_ == base::kNullProcessId)
204 return;
206 std::vector<scoped_refptr<MessageFilter> > new_filters;
208 base::AutoLock auto_lock(pending_filters_lock_);
209 new_filters.swap(pending_filters_);
212 for (size_t i = 0; i < new_filters.size(); ++i) {
213 filters_.push_back(new_filters[i]);
215 message_filter_router_->AddFilter(new_filters[i].get());
217 // The channel has already been created and connected, so we need to
218 // inform the filters right now.
219 new_filters[i]->OnFilterAdded(channel_.get());
220 new_filters[i]->OnChannelConnected(peer_pid_);
224 // Called on the IPC::Channel thread
225 void ChannelProxy::Context::OnRemoveFilter(MessageFilter* filter) {
226 if (peer_pid_ == base::kNullProcessId) {
227 // The channel is not yet connected, so any filters are still pending.
228 base::AutoLock auto_lock(pending_filters_lock_);
229 for (size_t i = 0; i < pending_filters_.size(); ++i) {
230 if (pending_filters_[i].get() == filter) {
231 filter->OnFilterRemoved();
232 pending_filters_.erase(pending_filters_.begin() + i);
233 return;
236 return;
238 if (!channel_)
239 return; // The filters have already been deleted.
241 message_filter_router_->RemoveFilter(filter);
243 for (size_t i = 0; i < filters_.size(); ++i) {
244 if (filters_[i].get() == filter) {
245 filter->OnFilterRemoved();
246 filters_.erase(filters_.begin() + i);
247 return;
251 NOTREACHED() << "filter to be removed not found";
254 // Called on the listener's thread
255 void ChannelProxy::Context::AddFilter(MessageFilter* filter) {
256 base::AutoLock auto_lock(pending_filters_lock_);
257 pending_filters_.push_back(make_scoped_refptr(filter));
258 ipc_task_runner_->PostTask(
259 FROM_HERE, base::Bind(&Context::OnAddFilter, this));
262 // Called on the listener's thread
263 void ChannelProxy::Context::OnDispatchMessage(const Message& message) {
264 #if defined(IPC_MESSAGE_LOG_ENABLED)
265 Logging* logger = Logging::GetInstance();
266 std::string name;
267 logger->GetMessageText(message.type(), &name, &message, NULL);
268 TRACE_EVENT1("ipc", "ChannelProxy::Context::OnDispatchMessage",
269 "name", name);
270 #else
271 TRACE_EVENT2("ipc", "ChannelProxy::Context::OnDispatchMessage",
272 "class", IPC_MESSAGE_ID_CLASS(message.type()),
273 "line", IPC_MESSAGE_ID_LINE(message.type()));
274 #endif
276 if (!listener_)
277 return;
279 OnDispatchConnected();
281 #ifdef IPC_MESSAGE_LOG_ENABLED
282 if (message.type() == IPC_LOGGING_ID) {
283 logger->OnReceivedLoggingMessage(message);
284 return;
287 if (logger->Enabled())
288 logger->OnPreDispatchMessage(message);
289 #endif
291 listener_->OnMessageReceived(message);
292 if (message.dispatch_error())
293 listener_->OnBadMessageReceived(message);
295 #ifdef IPC_MESSAGE_LOG_ENABLED
296 if (logger->Enabled())
297 logger->OnPostDispatchMessage(message, channel_id_);
298 #endif
301 // Called on the listener's thread
302 void ChannelProxy::Context::OnDispatchConnected() {
303 if (channel_connected_called_)
304 return;
306 channel_connected_called_ = true;
307 if (listener_)
308 listener_->OnChannelConnected(peer_pid_);
311 // Called on the listener's thread
312 void ChannelProxy::Context::OnDispatchError() {
313 if (listener_)
314 listener_->OnChannelError();
317 // Called on the listener's thread
318 void ChannelProxy::Context::OnDispatchBadMessage(const Message& message) {
319 if (listener_)
320 listener_->OnBadMessageReceived(message);
323 //-----------------------------------------------------------------------------
325 // static
326 scoped_ptr<ChannelProxy> ChannelProxy::Create(
327 const IPC::ChannelHandle& channel_handle,
328 Channel::Mode mode,
329 Listener* listener,
330 const scoped_refptr<base::SingleThreadTaskRunner>& ipc_task_runner) {
331 scoped_ptr<ChannelProxy> channel(new ChannelProxy(listener, ipc_task_runner));
332 channel->Init(channel_handle, mode, true);
333 return channel.Pass();
336 // static
337 scoped_ptr<ChannelProxy> ChannelProxy::Create(
338 scoped_ptr<ChannelFactory> factory,
339 Listener* listener,
340 const scoped_refptr<base::SingleThreadTaskRunner>& ipc_task_runner) {
341 scoped_ptr<ChannelProxy> channel(new ChannelProxy(listener, ipc_task_runner));
342 channel->Init(factory.Pass(), true);
343 return channel.Pass();
346 ChannelProxy::ChannelProxy(Context* context)
347 : context_(context),
348 did_init_(false) {
349 #if defined(ENABLE_IPC_FUZZER)
350 outgoing_message_filter_ = NULL;
351 #endif
354 ChannelProxy::ChannelProxy(
355 Listener* listener,
356 const scoped_refptr<base::SingleThreadTaskRunner>& ipc_task_runner)
357 : context_(new Context(listener, ipc_task_runner)), did_init_(false) {
358 #if defined(ENABLE_IPC_FUZZER)
359 outgoing_message_filter_ = NULL;
360 #endif
363 ChannelProxy::~ChannelProxy() {
364 DCHECK(CalledOnValidThread());
366 Close();
369 void ChannelProxy::Init(const IPC::ChannelHandle& channel_handle,
370 Channel::Mode mode,
371 bool create_pipe_now) {
372 #if defined(OS_POSIX)
373 // When we are creating a server on POSIX, we need its file descriptor
374 // to be created immediately so that it can be accessed and passed
375 // to other processes. Forcing it to be created immediately avoids
376 // race conditions that may otherwise arise.
377 if (mode & Channel::MODE_SERVER_FLAG) {
378 create_pipe_now = true;
380 #endif // defined(OS_POSIX)
381 Init(ChannelFactory::Create(channel_handle, mode),
382 create_pipe_now);
385 void ChannelProxy::Init(scoped_ptr<ChannelFactory> factory,
386 bool create_pipe_now) {
387 DCHECK(CalledOnValidThread());
388 DCHECK(!did_init_);
390 if (create_pipe_now) {
391 // Create the channel immediately. This effectively sets up the
392 // low-level pipe so that the client can connect. Without creating
393 // the pipe immediately, it is possible for a listener to attempt
394 // to connect and get an error since the pipe doesn't exist yet.
395 context_->CreateChannel(factory.Pass());
396 } else {
397 context_->ipc_task_runner()->PostTask(
398 FROM_HERE, base::Bind(&Context::CreateChannel,
399 context_.get(), Passed(factory.Pass())));
402 // complete initialization on the background thread
403 context_->ipc_task_runner()->PostTask(
404 FROM_HERE, base::Bind(&Context::OnChannelOpened, context_.get()));
406 did_init_ = true;
409 void ChannelProxy::Close() {
410 DCHECK(CalledOnValidThread());
412 // Clear the backpointer to the listener so that any pending calls to
413 // Context::OnDispatchMessage or OnDispatchError will be ignored. It is
414 // possible that the channel could be closed while it is receiving messages!
415 context_->Clear();
417 if (context_->ipc_task_runner()) {
418 context_->ipc_task_runner()->PostTask(
419 FROM_HERE, base::Bind(&Context::OnChannelClosed, context_.get()));
423 bool ChannelProxy::Send(Message* message) {
424 DCHECK(did_init_);
426 // TODO(alexeypa): add DCHECK(CalledOnValidThread()) here. Currently there are
427 // tests that call Send() from a wrong thread. See http://crbug.com/163523.
429 #ifdef ENABLE_IPC_FUZZER
430 // In IPC fuzzing builds, it is possible to define a filter to apply to
431 // outgoing messages. It will either rewrite the message and return a new
432 // one, freeing the original, or return the message unchanged.
433 if (outgoing_message_filter())
434 message = outgoing_message_filter()->Rewrite(message);
435 #endif
437 #ifdef IPC_MESSAGE_LOG_ENABLED
438 Logging::GetInstance()->OnSendMessage(message, context_->channel_id());
439 #endif
441 context_->ipc_task_runner()->PostTask(
442 FROM_HERE,
443 base::Bind(&ChannelProxy::Context::OnSendMessage,
444 context_, base::Passed(scoped_ptr<Message>(message))));
445 return true;
448 void ChannelProxy::AddFilter(MessageFilter* filter) {
449 DCHECK(CalledOnValidThread());
451 context_->AddFilter(filter);
454 void ChannelProxy::RemoveFilter(MessageFilter* filter) {
455 DCHECK(CalledOnValidThread());
457 context_->ipc_task_runner()->PostTask(
458 FROM_HERE, base::Bind(&Context::OnRemoveFilter, context_.get(),
459 make_scoped_refptr(filter)));
462 void ChannelProxy::SetListenerTaskRunner(
463 scoped_refptr<base::SingleThreadTaskRunner> task_runner) {
464 DCHECK(CalledOnValidThread());
466 context()->SetListenerTaskRunner(task_runner);
469 void ChannelProxy::ClearIPCTaskRunner() {
470 DCHECK(CalledOnValidThread());
472 context()->ClearIPCTaskRunner();
475 #if defined(OS_POSIX) && !defined(OS_NACL_SFI)
476 // See the TODO regarding lazy initialization of the channel in
477 // ChannelProxy::Init().
478 int ChannelProxy::GetClientFileDescriptor() {
479 DCHECK(CalledOnValidThread());
481 Channel* channel = context_.get()->channel_.get();
482 // Channel must have been created first.
483 DCHECK(channel) << context_.get()->channel_id_;
484 return channel->GetClientFileDescriptor();
487 base::ScopedFD ChannelProxy::TakeClientFileDescriptor() {
488 DCHECK(CalledOnValidThread());
490 Channel* channel = context_.get()->channel_.get();
491 // Channel must have been created first.
492 DCHECK(channel) << context_.get()->channel_id_;
493 return channel->TakeClientFileDescriptor();
495 #endif
497 //-----------------------------------------------------------------------------
499 } // namespace IPC