Adding yfriedman@ as owner for the enhanced_bookmarks component.
[chromium-blink-merge.git] / ipc / ipc_channel_proxy.cc
blob2ea722df79b086f1cd1ba6f728a53052026bae07
1 // Copyright (c) 2012 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
5 #include "ipc/ipc_channel_proxy.h"
7 #include "base/bind.h"
8 #include "base/compiler_specific.h"
9 #include "base/location.h"
10 #include "base/memory/ref_counted.h"
11 #include "base/memory/scoped_ptr.h"
12 #include "base/single_thread_task_runner.h"
13 #include "base/thread_task_runner_handle.h"
14 #include "ipc/ipc_channel_factory.h"
15 #include "ipc/ipc_listener.h"
16 #include "ipc/ipc_logging.h"
17 #include "ipc/ipc_message_macros.h"
18 #include "ipc/message_filter.h"
19 #include "ipc/message_filter_router.h"
21 namespace IPC {
23 //------------------------------------------------------------------------------
25 ChannelProxy::Context::Context(Listener* listener,
26 base::SingleThreadTaskRunner* ipc_task_runner)
27 : listener_task_runner_(base::ThreadTaskRunnerHandle::Get()),
28 listener_(listener),
29 ipc_task_runner_(ipc_task_runner),
30 channel_connected_called_(false),
31 message_filter_router_(new MessageFilterRouter()),
32 peer_pid_(base::kNullProcessId) {
33 DCHECK(ipc_task_runner_.get());
34 // The Listener thread where Messages are handled must be a separate thread
35 // to avoid oversubscribing the IO thread. If you trigger this error, you
36 // need to either:
37 // 1) Create the ChannelProxy on a different thread, or
38 // 2) Just use Channel
39 // Note, we currently make an exception for a NULL listener. That usage
40 // basically works, but is outside the intent of ChannelProxy. This support
41 // will disappear, so please don't rely on it. See crbug.com/364241
42 DCHECK(!listener || (ipc_task_runner_.get() != listener_task_runner_.get()));
45 ChannelProxy::Context::~Context() {
48 void ChannelProxy::Context::ClearIPCTaskRunner() {
49 ipc_task_runner_ = NULL;
52 void ChannelProxy::Context::CreateChannel(scoped_ptr<ChannelFactory> factory) {
53 DCHECK(!channel_);
54 channel_id_ = factory->GetName();
55 channel_ = factory->BuildChannel(this);
58 bool ChannelProxy::Context::TryFilters(const Message& message) {
59 DCHECK(message_filter_router_);
60 #ifdef IPC_MESSAGE_LOG_ENABLED
61 Logging* logger = Logging::GetInstance();
62 if (logger->Enabled())
63 logger->OnPreDispatchMessage(message);
64 #endif
66 if (message_filter_router_->TryFilters(message)) {
67 if (message.dispatch_error()) {
68 listener_task_runner_->PostTask(
69 FROM_HERE, base::Bind(&Context::OnDispatchBadMessage, this, message));
71 #ifdef IPC_MESSAGE_LOG_ENABLED
72 if (logger->Enabled())
73 logger->OnPostDispatchMessage(message, channel_id_);
74 #endif
75 return true;
77 return false;
80 // Called on the IPC::Channel thread
81 bool ChannelProxy::Context::OnMessageReceived(const Message& message) {
82 // First give a chance to the filters to process this message.
83 if (!TryFilters(message))
84 OnMessageReceivedNoFilter(message);
85 return true;
88 // Called on the IPC::Channel thread
89 bool ChannelProxy::Context::OnMessageReceivedNoFilter(const Message& message) {
90 listener_task_runner_->PostTask(
91 FROM_HERE, base::Bind(&Context::OnDispatchMessage, this, message));
92 return true;
95 // Called on the IPC::Channel thread
96 void ChannelProxy::Context::OnChannelConnected(int32 peer_pid) {
97 // We cache off the peer_pid so it can be safely accessed from both threads.
98 peer_pid_ = channel_->GetPeerPID();
100 // Add any pending filters. This avoids a race condition where someone
101 // creates a ChannelProxy, calls AddFilter, and then right after starts the
102 // peer process. The IO thread could receive a message before the task to add
103 // the filter is run on the IO thread.
104 OnAddFilter();
106 // See above comment about using listener_task_runner_ here.
107 listener_task_runner_->PostTask(
108 FROM_HERE, base::Bind(&Context::OnDispatchConnected, this));
111 // Called on the IPC::Channel thread
112 void ChannelProxy::Context::OnChannelError() {
113 for (size_t i = 0; i < filters_.size(); ++i)
114 filters_[i]->OnChannelError();
116 // See above comment about using listener_task_runner_ here.
117 listener_task_runner_->PostTask(
118 FROM_HERE, base::Bind(&Context::OnDispatchError, this));
121 // Called on the IPC::Channel thread
122 void ChannelProxy::Context::OnChannelOpened() {
123 DCHECK(channel_ != NULL);
125 // Assume a reference to ourselves on behalf of this thread. This reference
126 // will be released when we are closed.
127 AddRef();
129 if (!channel_->Connect()) {
130 OnChannelError();
131 return;
134 for (size_t i = 0; i < filters_.size(); ++i)
135 filters_[i]->OnFilterAdded(channel_.get());
138 // Called on the IPC::Channel thread
139 void ChannelProxy::Context::OnChannelClosed() {
140 // It's okay for IPC::ChannelProxy::Close to be called more than once, which
141 // would result in this branch being taken.
142 if (!channel_)
143 return;
145 for (size_t i = 0; i < filters_.size(); ++i) {
146 filters_[i]->OnChannelClosing();
147 filters_[i]->OnFilterRemoved();
150 // We don't need the filters anymore.
151 message_filter_router_->Clear();
152 filters_.clear();
153 // We don't need the lock, because at this point, the listener thread can't
154 // access it any more.
155 pending_filters_.clear();
157 channel_.reset();
159 // Balance with the reference taken during startup. This may result in
160 // self-destruction.
161 Release();
164 void ChannelProxy::Context::Clear() {
165 listener_ = NULL;
168 // Called on the IPC::Channel thread
169 void ChannelProxy::Context::OnSendMessage(scoped_ptr<Message> message) {
170 if (!channel_) {
171 OnChannelClosed();
172 return;
175 if (!channel_->Send(message.release()))
176 OnChannelError();
179 // Called on the IPC::Channel thread
180 void ChannelProxy::Context::OnAddFilter() {
181 // Our OnChannelConnected method has not yet been called, so we can't be
182 // sure that channel_ is valid yet. When OnChannelConnected *is* called,
183 // it invokes OnAddFilter, so any pending filter(s) will be added at that
184 // time.
185 if (peer_pid_ == base::kNullProcessId)
186 return;
188 std::vector<scoped_refptr<MessageFilter> > new_filters;
190 base::AutoLock auto_lock(pending_filters_lock_);
191 new_filters.swap(pending_filters_);
194 for (size_t i = 0; i < new_filters.size(); ++i) {
195 filters_.push_back(new_filters[i]);
197 message_filter_router_->AddFilter(new_filters[i].get());
199 // The channel has already been created and connected, so we need to
200 // inform the filters right now.
201 new_filters[i]->OnFilterAdded(channel_.get());
202 new_filters[i]->OnChannelConnected(peer_pid_);
206 // Called on the IPC::Channel thread
207 void ChannelProxy::Context::OnRemoveFilter(MessageFilter* filter) {
208 if (peer_pid_ == base::kNullProcessId) {
209 // The channel is not yet connected, so any filters are still pending.
210 base::AutoLock auto_lock(pending_filters_lock_);
211 for (size_t i = 0; i < pending_filters_.size(); ++i) {
212 if (pending_filters_[i].get() == filter) {
213 filter->OnFilterRemoved();
214 pending_filters_.erase(pending_filters_.begin() + i);
215 return;
218 return;
220 if (!channel_)
221 return; // The filters have already been deleted.
223 message_filter_router_->RemoveFilter(filter);
225 for (size_t i = 0; i < filters_.size(); ++i) {
226 if (filters_[i].get() == filter) {
227 filter->OnFilterRemoved();
228 filters_.erase(filters_.begin() + i);
229 return;
233 NOTREACHED() << "filter to be removed not found";
236 // Called on the listener's thread
237 void ChannelProxy::Context::AddFilter(MessageFilter* filter) {
238 base::AutoLock auto_lock(pending_filters_lock_);
239 pending_filters_.push_back(make_scoped_refptr(filter));
240 ipc_task_runner_->PostTask(
241 FROM_HERE, base::Bind(&Context::OnAddFilter, this));
244 // Called on the listener's thread
245 void ChannelProxy::Context::OnDispatchMessage(const Message& message) {
246 #ifdef IPC_MESSAGE_LOG_ENABLED
247 Logging* logger = Logging::GetInstance();
248 std::string name;
249 logger->GetMessageText(message.type(), &name, &message, NULL);
250 TRACE_EVENT1("ipc", "ChannelProxy::Context::OnDispatchMessage",
251 "name", name);
252 #else
253 TRACE_EVENT2("ipc", "ChannelProxy::Context::OnDispatchMessage",
254 "class", IPC_MESSAGE_ID_CLASS(message.type()),
255 "line", IPC_MESSAGE_ID_LINE(message.type()));
256 #endif
258 if (!listener_)
259 return;
261 OnDispatchConnected();
263 #ifdef IPC_MESSAGE_LOG_ENABLED
264 if (message.type() == IPC_LOGGING_ID) {
265 logger->OnReceivedLoggingMessage(message);
266 return;
269 if (logger->Enabled())
270 logger->OnPreDispatchMessage(message);
271 #endif
273 listener_->OnMessageReceived(message);
274 if (message.dispatch_error())
275 listener_->OnBadMessageReceived(message);
277 #ifdef IPC_MESSAGE_LOG_ENABLED
278 if (logger->Enabled())
279 logger->OnPostDispatchMessage(message, channel_id_);
280 #endif
283 // Called on the listener's thread
284 void ChannelProxy::Context::OnDispatchConnected() {
285 if (channel_connected_called_)
286 return;
288 channel_connected_called_ = true;
289 if (listener_)
290 listener_->OnChannelConnected(peer_pid_);
293 // Called on the listener's thread
294 void ChannelProxy::Context::OnDispatchError() {
295 if (listener_)
296 listener_->OnChannelError();
299 // Called on the listener's thread
300 void ChannelProxy::Context::OnDispatchBadMessage(const Message& message) {
301 if (listener_)
302 listener_->OnBadMessageReceived(message);
305 //-----------------------------------------------------------------------------
307 // static
308 scoped_ptr<ChannelProxy> ChannelProxy::Create(
309 const IPC::ChannelHandle& channel_handle,
310 Channel::Mode mode,
311 Listener* listener,
312 base::SingleThreadTaskRunner* ipc_task_runner) {
313 scoped_ptr<ChannelProxy> channel(new ChannelProxy(listener, ipc_task_runner));
314 channel->Init(channel_handle, mode, true);
315 return channel.Pass();
318 // static
319 scoped_ptr<ChannelProxy> ChannelProxy::Create(
320 scoped_ptr<ChannelFactory> factory,
321 Listener* listener,
322 base::SingleThreadTaskRunner* ipc_task_runner) {
323 scoped_ptr<ChannelProxy> channel(new ChannelProxy(listener, ipc_task_runner));
324 channel->Init(factory.Pass(), true);
325 return channel.Pass();
328 ChannelProxy::ChannelProxy(Context* context)
329 : context_(context),
330 did_init_(false) {
333 ChannelProxy::ChannelProxy(Listener* listener,
334 base::SingleThreadTaskRunner* ipc_task_runner)
335 : context_(new Context(listener, ipc_task_runner)), did_init_(false) {
338 ChannelProxy::~ChannelProxy() {
339 DCHECK(CalledOnValidThread());
341 Close();
344 void ChannelProxy::Init(const IPC::ChannelHandle& channel_handle,
345 Channel::Mode mode,
346 bool create_pipe_now) {
347 #if defined(OS_POSIX)
348 // When we are creating a server on POSIX, we need its file descriptor
349 // to be created immediately so that it can be accessed and passed
350 // to other processes. Forcing it to be created immediately avoids
351 // race conditions that may otherwise arise.
352 if (mode & Channel::MODE_SERVER_FLAG) {
353 create_pipe_now = true;
355 #endif // defined(OS_POSIX)
356 Init(ChannelFactory::Create(channel_handle, mode),
357 create_pipe_now);
360 void ChannelProxy::Init(scoped_ptr<ChannelFactory> factory,
361 bool create_pipe_now) {
362 DCHECK(CalledOnValidThread());
363 DCHECK(!did_init_);
365 if (create_pipe_now) {
366 // Create the channel immediately. This effectively sets up the
367 // low-level pipe so that the client can connect. Without creating
368 // the pipe immediately, it is possible for a listener to attempt
369 // to connect and get an error since the pipe doesn't exist yet.
370 context_->CreateChannel(factory.Pass());
371 } else {
372 context_->ipc_task_runner()->PostTask(
373 FROM_HERE, base::Bind(&Context::CreateChannel,
374 context_.get(), Passed(factory.Pass())));
377 // complete initialization on the background thread
378 context_->ipc_task_runner()->PostTask(
379 FROM_HERE, base::Bind(&Context::OnChannelOpened, context_.get()));
381 did_init_ = true;
384 void ChannelProxy::Close() {
385 DCHECK(CalledOnValidThread());
387 // Clear the backpointer to the listener so that any pending calls to
388 // Context::OnDispatchMessage or OnDispatchError will be ignored. It is
389 // possible that the channel could be closed while it is receiving messages!
390 context_->Clear();
392 if (context_->ipc_task_runner()) {
393 context_->ipc_task_runner()->PostTask(
394 FROM_HERE, base::Bind(&Context::OnChannelClosed, context_.get()));
398 bool ChannelProxy::Send(Message* message) {
399 DCHECK(did_init_);
401 // TODO(alexeypa): add DCHECK(CalledOnValidThread()) here. Currently there are
402 // tests that call Send() from a wrong thread. See http://crbug.com/163523.
404 #ifdef IPC_MESSAGE_LOG_ENABLED
405 Logging::GetInstance()->OnSendMessage(message, context_->channel_id());
406 #endif
408 context_->ipc_task_runner()->PostTask(
409 FROM_HERE,
410 base::Bind(&ChannelProxy::Context::OnSendMessage,
411 context_, base::Passed(scoped_ptr<Message>(message))));
412 return true;
415 void ChannelProxy::AddFilter(MessageFilter* filter) {
416 DCHECK(CalledOnValidThread());
418 context_->AddFilter(filter);
421 void ChannelProxy::RemoveFilter(MessageFilter* filter) {
422 DCHECK(CalledOnValidThread());
424 context_->ipc_task_runner()->PostTask(
425 FROM_HERE, base::Bind(&Context::OnRemoveFilter, context_.get(),
426 make_scoped_refptr(filter)));
429 void ChannelProxy::ClearIPCTaskRunner() {
430 DCHECK(CalledOnValidThread());
432 context()->ClearIPCTaskRunner();
#if defined(OS_POSIX) && !defined(OS_NACL)
// Returns the channel's client file descriptor as reported by
// Channel::GetClientFileDescriptor(). The channel must already exist, which
// Init() guarantees when create_pipe_now is true.
int ChannelProxy::GetClientFileDescriptor() {
  DCHECK(CalledOnValidThread());

  Context* const ctx = context_.get();
  Channel* channel = ctx->channel_.get();
  // Channel must have been created first.
  DCHECK(channel) << ctx->channel_id_;
  return channel->GetClientFileDescriptor();
}

// Returns the result of Channel::TakeClientFileDescriptor(). The channel must
// already exist, as for GetClientFileDescriptor().
int ChannelProxy::TakeClientFileDescriptor() {
  DCHECK(CalledOnValidThread());

  Context* const ctx = context_.get();
  Channel* channel = ctx->channel_.get();
  // Channel must have been created first.
  DCHECK(channel) << ctx->channel_id_;
  return channel->TakeClientFileDescriptor();
}
#endif
457 //-----------------------------------------------------------------------------
459 } // namespace IPC