ipc: Make sure that ChannelReader is destroyed correctly.
[chromium-blink-merge.git] / ipc / ipc_channel_proxy.cc
blobe48a71ccb57da8cd5f9bd52f28da061225ac48a7
1 // Copyright (c) 2012 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
5 #include "ipc/ipc_channel_proxy.h"
7 #include "base/bind.h"
8 #include "base/compiler_specific.h"
9 #include "base/location.h"
10 #include "base/memory/ref_counted.h"
11 #include "base/memory/scoped_ptr.h"
12 #include "base/profiler/scoped_tracker.h"
13 #include "base/single_thread_task_runner.h"
14 #include "base/thread_task_runner_handle.h"
15 #include "ipc/ipc_channel_factory.h"
16 #include "ipc/ipc_listener.h"
17 #include "ipc/ipc_logging.h"
18 #include "ipc/ipc_message_macros.h"
19 #include "ipc/message_filter.h"
20 #include "ipc/message_filter_router.h"
22 namespace IPC {
24 //------------------------------------------------------------------------------
26 ChannelProxy::Context::Context(
27 Listener* listener,
28 const scoped_refptr<base::SingleThreadTaskRunner>& ipc_task_runner)
29 : listener_task_runner_(base::ThreadTaskRunnerHandle::Get()),
30 listener_(listener),
31 ipc_task_runner_(ipc_task_runner),
32 channel_connected_called_(false),
33 channel_send_thread_safe_(false),
34 message_filter_router_(new MessageFilterRouter()),
35 peer_pid_(base::kNullProcessId),
36 attachment_broker_endpoint_(false) {
37 DCHECK(ipc_task_runner_.get());
38 // The Listener thread where Messages are handled must be a separate thread
39 // to avoid oversubscribing the IO thread. If you trigger this error, you
40 // need to either:
41 // 1) Create the ChannelProxy on a different thread, or
42 // 2) Just use Channel
43 // Note, we currently make an exception for a NULL listener. That usage
44 // basically works, but is outside the intent of ChannelProxy. This support
45 // will disappear, so please don't rely on it. See crbug.com/364241
46 DCHECK(!listener || (ipc_task_runner_.get() != listener_task_runner_.get()));
49 ChannelProxy::Context::~Context() {
52 void ChannelProxy::Context::ClearIPCTaskRunner() {
53 ipc_task_runner_ = NULL;
56 void ChannelProxy::Context::CreateChannel(scoped_ptr<ChannelFactory> factory) {
57 base::AutoLock l(channel_lifetime_lock_);
58 DCHECK(!channel_);
59 channel_id_ = factory->GetName();
60 channel_ = factory->BuildChannel(this);
61 channel_send_thread_safe_ = channel_->IsSendThreadSafe();
62 channel_->SetAttachmentBrokerEndpoint(attachment_broker_endpoint_);
65 bool ChannelProxy::Context::TryFilters(const Message& message) {
66 DCHECK(message_filter_router_);
67 #ifdef IPC_MESSAGE_LOG_ENABLED
68 Logging* logger = Logging::GetInstance();
69 if (logger->Enabled())
70 logger->OnPreDispatchMessage(message);
71 #endif
73 if (message_filter_router_->TryFilters(message)) {
74 if (message.dispatch_error()) {
75 listener_task_runner_->PostTask(
76 FROM_HERE, base::Bind(&Context::OnDispatchBadMessage, this, message));
78 #ifdef IPC_MESSAGE_LOG_ENABLED
79 if (logger->Enabled())
80 logger->OnPostDispatchMessage(message, channel_id_);
81 #endif
82 return true;
84 return false;
87 // Called on the IPC::Channel thread
88 bool ChannelProxy::Context::OnMessageReceived(const Message& message) {
89 // First give a chance to the filters to process this message.
90 if (!TryFilters(message))
91 OnMessageReceivedNoFilter(message);
92 return true;
95 // Called on the IPC::Channel thread
96 bool ChannelProxy::Context::OnMessageReceivedNoFilter(const Message& message) {
97 listener_task_runner_->PostTask(
98 FROM_HERE, base::Bind(&Context::OnDispatchMessage, this, message));
99 return true;
102 // Called on the IPC::Channel thread
103 void ChannelProxy::Context::OnChannelConnected(int32_t peer_pid) {
104 // We cache off the peer_pid so it can be safely accessed from both threads.
105 peer_pid_ = channel_->GetPeerPID();
107 // Add any pending filters. This avoids a race condition where someone
108 // creates a ChannelProxy, calls AddFilter, and then right after starts the
109 // peer process. The IO thread could receive a message before the task to add
110 // the filter is run on the IO thread.
111 OnAddFilter();
113 // See above comment about using listener_task_runner_ here.
114 listener_task_runner_->PostTask(
115 FROM_HERE, base::Bind(&Context::OnDispatchConnected, this));
118 // Called on the IPC::Channel thread
119 void ChannelProxy::Context::OnChannelError() {
120 for (size_t i = 0; i < filters_.size(); ++i)
121 filters_[i]->OnChannelError();
123 // See above comment about using listener_task_runner_ here.
124 listener_task_runner_->PostTask(
125 FROM_HERE, base::Bind(&Context::OnDispatchError, this));
128 // Called on the IPC::Channel thread
129 void ChannelProxy::Context::OnChannelOpened() {
130 DCHECK(channel_ != NULL);
132 // Assume a reference to ourselves on behalf of this thread. This reference
133 // will be released when we are closed.
134 AddRef();
136 if (!channel_->Connect()) {
137 OnChannelError();
138 return;
141 for (size_t i = 0; i < filters_.size(); ++i)
142 filters_[i]->OnFilterAdded(channel_.get());
145 // Called on the IPC::Channel thread
146 void ChannelProxy::Context::OnChannelClosed() {
147 // TODO(pkasting): Remove ScopedTracker below once crbug.com/477117 is fixed.
148 tracked_objects::ScopedTracker tracking_profile(
149 FROM_HERE_WITH_EXPLICIT_FUNCTION(
150 "477117 ChannelProxy::Context::OnChannelClosed"));
151 // It's okay for IPC::ChannelProxy::Close to be called more than once, which
152 // would result in this branch being taken.
153 if (!channel_)
154 return;
156 for (size_t i = 0; i < filters_.size(); ++i) {
157 filters_[i]->OnChannelClosing();
158 filters_[i]->OnFilterRemoved();
161 // We don't need the filters anymore.
162 message_filter_router_->Clear();
163 filters_.clear();
164 // We don't need the lock, because at this point, the listener thread can't
165 // access it any more.
166 pending_filters_.clear();
168 ClearChannel();
170 // Balance with the reference taken during startup. This may result in
171 // self-destruction.
172 Release();
175 void ChannelProxy::Context::Clear() {
176 listener_ = NULL;
179 // Called on the IPC::Channel thread
180 void ChannelProxy::Context::OnSendMessage(scoped_ptr<Message> message) {
181 // TODO(pkasting): Remove ScopedTracker below once crbug.com/477117 is fixed.
182 tracked_objects::ScopedTracker tracking_profile(
183 FROM_HERE_WITH_EXPLICIT_FUNCTION(
184 "477117 ChannelProxy::Context::OnSendMessage"));
185 if (!channel_) {
186 OnChannelClosed();
187 return;
190 if (!channel_->Send(message.release()))
191 OnChannelError();
194 // Called on the IPC::Channel thread
195 void ChannelProxy::Context::OnAddFilter() {
196 // Our OnChannelConnected method has not yet been called, so we can't be
197 // sure that channel_ is valid yet. When OnChannelConnected *is* called,
198 // it invokes OnAddFilter, so any pending filter(s) will be added at that
199 // time.
200 if (peer_pid_ == base::kNullProcessId)
201 return;
203 std::vector<scoped_refptr<MessageFilter> > new_filters;
205 base::AutoLock auto_lock(pending_filters_lock_);
206 new_filters.swap(pending_filters_);
209 for (size_t i = 0; i < new_filters.size(); ++i) {
210 filters_.push_back(new_filters[i]);
212 message_filter_router_->AddFilter(new_filters[i].get());
214 // The channel has already been created and connected, so we need to
215 // inform the filters right now.
216 new_filters[i]->OnFilterAdded(channel_.get());
217 new_filters[i]->OnChannelConnected(peer_pid_);
221 // Called on the IPC::Channel thread
222 void ChannelProxy::Context::OnRemoveFilter(MessageFilter* filter) {
223 if (peer_pid_ == base::kNullProcessId) {
224 // The channel is not yet connected, so any filters are still pending.
225 base::AutoLock auto_lock(pending_filters_lock_);
226 for (size_t i = 0; i < pending_filters_.size(); ++i) {
227 if (pending_filters_[i].get() == filter) {
228 filter->OnFilterRemoved();
229 pending_filters_.erase(pending_filters_.begin() + i);
230 return;
233 return;
235 if (!channel_)
236 return; // The filters have already been deleted.
238 message_filter_router_->RemoveFilter(filter);
240 for (size_t i = 0; i < filters_.size(); ++i) {
241 if (filters_[i].get() == filter) {
242 filter->OnFilterRemoved();
243 filters_.erase(filters_.begin() + i);
244 return;
248 NOTREACHED() << "filter to be removed not found";
251 // Called on the listener's thread
252 void ChannelProxy::Context::AddFilter(MessageFilter* filter) {
253 base::AutoLock auto_lock(pending_filters_lock_);
254 pending_filters_.push_back(make_scoped_refptr(filter));
255 ipc_task_runner_->PostTask(
256 FROM_HERE, base::Bind(&Context::OnAddFilter, this));
259 // Called on the listener's thread
260 void ChannelProxy::Context::OnDispatchMessage(const Message& message) {
261 #if defined(IPC_MESSAGE_LOG_ENABLED)
262 Logging* logger = Logging::GetInstance();
263 std::string name;
264 logger->GetMessageText(message.type(), &name, &message, NULL);
265 TRACE_EVENT1("ipc", "ChannelProxy::Context::OnDispatchMessage",
266 "name", name);
267 #else
268 TRACE_EVENT2("ipc", "ChannelProxy::Context::OnDispatchMessage",
269 "class", IPC_MESSAGE_ID_CLASS(message.type()),
270 "line", IPC_MESSAGE_ID_LINE(message.type()));
271 #endif
273 if (!listener_)
274 return;
276 OnDispatchConnected();
278 #ifdef IPC_MESSAGE_LOG_ENABLED
279 if (message.type() == IPC_LOGGING_ID) {
280 logger->OnReceivedLoggingMessage(message);
281 return;
284 if (logger->Enabled())
285 logger->OnPreDispatchMessage(message);
286 #endif
288 listener_->OnMessageReceived(message);
289 if (message.dispatch_error())
290 listener_->OnBadMessageReceived(message);
292 #ifdef IPC_MESSAGE_LOG_ENABLED
293 if (logger->Enabled())
294 logger->OnPostDispatchMessage(message, channel_id_);
295 #endif
298 // Called on the listener's thread
299 void ChannelProxy::Context::OnDispatchConnected() {
300 if (channel_connected_called_)
301 return;
303 channel_connected_called_ = true;
304 if (listener_)
305 listener_->OnChannelConnected(peer_pid_);
308 // Called on the listener's thread
309 void ChannelProxy::Context::OnDispatchError() {
310 if (listener_)
311 listener_->OnChannelError();
314 // Called on the listener's thread
315 void ChannelProxy::Context::OnDispatchBadMessage(const Message& message) {
316 if (listener_)
317 listener_->OnBadMessageReceived(message);
320 void ChannelProxy::Context::ClearChannel() {
321 base::AutoLock l(channel_lifetime_lock_);
322 channel_.reset();
325 void ChannelProxy::Context::SendFromThisThread(Message* message) {
326 base::AutoLock l(channel_lifetime_lock_);
327 if (!channel_)
328 return;
329 DCHECK(channel_->IsSendThreadSafe());
330 channel_->Send(message);
333 void ChannelProxy::Context::Send(Message* message) {
334 if (channel_send_thread_safe_) {
335 SendFromThisThread(message);
336 return;
339 ipc_task_runner()->PostTask(
340 FROM_HERE, base::Bind(&ChannelProxy::Context::OnSendMessage, this,
341 base::Passed(scoped_ptr<Message>(message))));
344 bool ChannelProxy::Context::IsChannelSendThreadSafe() const {
345 return channel_send_thread_safe_;
348 //-----------------------------------------------------------------------------
350 // static
351 scoped_ptr<ChannelProxy> ChannelProxy::Create(
352 const IPC::ChannelHandle& channel_handle,
353 Channel::Mode mode,
354 Listener* listener,
355 const scoped_refptr<base::SingleThreadTaskRunner>& ipc_task_runner,
356 AttachmentBroker* broker) {
357 scoped_ptr<ChannelProxy> channel(new ChannelProxy(listener, ipc_task_runner));
358 channel->Init(channel_handle, mode, true, broker);
359 return channel.Pass();
362 // static
363 scoped_ptr<ChannelProxy> ChannelProxy::Create(
364 scoped_ptr<ChannelFactory> factory,
365 Listener* listener,
366 const scoped_refptr<base::SingleThreadTaskRunner>& ipc_task_runner) {
367 scoped_ptr<ChannelProxy> channel(new ChannelProxy(listener, ipc_task_runner));
368 channel->Init(factory.Pass(), true);
369 return channel.Pass();
372 ChannelProxy::ChannelProxy(Context* context)
373 : context_(context),
374 did_init_(false) {
375 #if defined(ENABLE_IPC_FUZZER)
376 outgoing_message_filter_ = NULL;
377 #endif
380 ChannelProxy::ChannelProxy(
381 Listener* listener,
382 const scoped_refptr<base::SingleThreadTaskRunner>& ipc_task_runner)
383 : context_(new Context(listener, ipc_task_runner)), did_init_(false) {
384 #if defined(ENABLE_IPC_FUZZER)
385 outgoing_message_filter_ = NULL;
386 #endif
389 ChannelProxy::~ChannelProxy() {
390 DCHECK(CalledOnValidThread());
392 Close();
395 void ChannelProxy::Init(const IPC::ChannelHandle& channel_handle,
396 Channel::Mode mode,
397 bool create_pipe_now,
398 AttachmentBroker* broker) {
399 #if defined(OS_POSIX)
400 // When we are creating a server on POSIX, we need its file descriptor
401 // to be created immediately so that it can be accessed and passed
402 // to other processes. Forcing it to be created immediately avoids
403 // race conditions that may otherwise arise.
404 if (mode & Channel::MODE_SERVER_FLAG) {
405 create_pipe_now = true;
407 #endif // defined(OS_POSIX)
408 Init(ChannelFactory::Create(channel_handle, mode, broker), create_pipe_now);
411 void ChannelProxy::Init(scoped_ptr<ChannelFactory> factory,
412 bool create_pipe_now) {
413 DCHECK(CalledOnValidThread());
414 DCHECK(!did_init_);
416 if (create_pipe_now) {
417 // Create the channel immediately. This effectively sets up the
418 // low-level pipe so that the client can connect. Without creating
419 // the pipe immediately, it is possible for a listener to attempt
420 // to connect and get an error since the pipe doesn't exist yet.
421 context_->CreateChannel(factory.Pass());
422 } else {
423 context_->ipc_task_runner()->PostTask(
424 FROM_HERE, base::Bind(&Context::CreateChannel,
425 context_.get(), Passed(factory.Pass())));
428 // complete initialization on the background thread
429 context_->ipc_task_runner()->PostTask(
430 FROM_HERE, base::Bind(&Context::OnChannelOpened, context_.get()));
432 did_init_ = true;
433 OnChannelInit();
436 void ChannelProxy::Close() {
437 DCHECK(CalledOnValidThread());
439 // Clear the backpointer to the listener so that any pending calls to
440 // Context::OnDispatchMessage or OnDispatchError will be ignored. It is
441 // possible that the channel could be closed while it is receiving messages!
442 context_->Clear();
444 if (context_->ipc_task_runner()) {
445 context_->ipc_task_runner()->PostTask(
446 FROM_HERE, base::Bind(&Context::OnChannelClosed, context_.get()));
450 bool ChannelProxy::Send(Message* message) {
451 DCHECK(did_init_);
453 // TODO(alexeypa): add DCHECK(CalledOnValidThread()) here. Currently there are
454 // tests that call Send() from a wrong thread. See http://crbug.com/163523.
456 #ifdef ENABLE_IPC_FUZZER
457 // In IPC fuzzing builds, it is possible to define a filter to apply to
458 // outgoing messages. It will either rewrite the message and return a new
459 // one, freeing the original, or return the message unchanged.
460 if (outgoing_message_filter())
461 message = outgoing_message_filter()->Rewrite(message);
462 #endif
464 #ifdef IPC_MESSAGE_LOG_ENABLED
465 Logging::GetInstance()->OnSendMessage(message, context_->channel_id());
466 #endif
468 context_->Send(message);
469 return true;
472 void ChannelProxy::AddFilter(MessageFilter* filter) {
473 DCHECK(CalledOnValidThread());
475 context_->AddFilter(filter);
478 void ChannelProxy::RemoveFilter(MessageFilter* filter) {
479 DCHECK(CalledOnValidThread());
481 context_->ipc_task_runner()->PostTask(
482 FROM_HERE, base::Bind(&Context::OnRemoveFilter, context_.get(),
483 make_scoped_refptr(filter)));
486 void ChannelProxy::ClearIPCTaskRunner() {
487 DCHECK(CalledOnValidThread());
489 context()->ClearIPCTaskRunner();
492 base::ProcessId ChannelProxy::GetPeerPID() const {
493 return context_->peer_pid_;
496 void ChannelProxy::OnSetAttachmentBrokerEndpoint() {
497 context()->set_attachment_broker_endpoint(is_attachment_broker_endpoint());
#if defined(OS_POSIX) && !defined(OS_NACL_SFI)
// See the TODO regarding lazy initialization of the channel in
// ChannelProxy::Init().
//
// Returns the client file descriptor of the underlying channel. The channel
// must already exist.
int ChannelProxy::GetClientFileDescriptor() {
  DCHECK(CalledOnValidThread());

  Channel* underlying_channel = context_.get()->channel_.get();
  // Channel must have been created first.
  DCHECK(underlying_channel) << context_.get()->channel_id_;
  return underlying_channel->GetClientFileDescriptor();
}

// Returns the result of TakeClientFileDescriptor() on the underlying channel.
// The channel must already exist.
base::ScopedFD ChannelProxy::TakeClientFileDescriptor() {
  DCHECK(CalledOnValidThread());

  Channel* underlying_channel = context_.get()->channel_.get();
  // Channel must have been created first.
  DCHECK(underlying_channel) << context_.get()->channel_id_;
  return underlying_channel->TakeClientFileDescriptor();
}
#endif
522 void ChannelProxy::OnChannelInit() {
525 //-----------------------------------------------------------------------------
527 } // namespace IPC