Roll tools/swarming_client/ to 41036ec833891e4cf59050e4ac0c5d4986e48a75.
[chromium-blink-merge.git] / ipc / ipc_channel_proxy.cc
blob64fab8fca56db0067e909aa5f96d2fe04443e912
1 // Copyright (c) 2012 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
5 #include "ipc/ipc_channel_proxy.h"
7 #include "base/bind.h"
8 #include "base/compiler_specific.h"
9 #include "base/location.h"
10 #include "base/memory/ref_counted.h"
11 #include "base/memory/scoped_ptr.h"
12 #include "base/single_thread_task_runner.h"
13 #include "base/thread_task_runner_handle.h"
14 #include "ipc/ipc_channel_factory.h"
15 #include "ipc/ipc_listener.h"
16 #include "ipc/ipc_logging.h"
17 #include "ipc/ipc_message_macros.h"
18 #include "ipc/message_filter.h"
19 #include "ipc/message_filter_router.h"
21 namespace IPC {
23 //------------------------------------------------------------------------------
25 ChannelProxy::Context::Context(
26 Listener* listener,
27 const scoped_refptr<base::SingleThreadTaskRunner>& ipc_task_runner)
28 : listener_task_runner_(base::ThreadTaskRunnerHandle::Get()),
29 listener_(listener),
30 ipc_task_runner_(ipc_task_runner),
31 channel_connected_called_(false),
32 message_filter_router_(new MessageFilterRouter()),
33 peer_pid_(base::kNullProcessId) {
34 DCHECK(ipc_task_runner_.get());
35 // The Listener thread where Messages are handled must be a separate thread
36 // to avoid oversubscribing the IO thread. If you trigger this error, you
37 // need to either:
38 // 1) Create the ChannelProxy on a different thread, or
39 // 2) Just use Channel
40 // Note, we currently make an exception for a NULL listener. That usage
41 // basically works, but is outside the intent of ChannelProxy. This support
42 // will disappear, so please don't rely on it. See crbug.com/364241
43 DCHECK(!listener || (ipc_task_runner_.get() != listener_task_runner_.get()));
46 ChannelProxy::Context::~Context() {
49 void ChannelProxy::Context::ClearIPCTaskRunner() {
50 ipc_task_runner_ = NULL;
53 void ChannelProxy::Context::CreateChannel(scoped_ptr<ChannelFactory> factory) {
54 DCHECK(!channel_);
55 channel_id_ = factory->GetName();
56 channel_ = factory->BuildChannel(this);
59 bool ChannelProxy::Context::TryFilters(const Message& message) {
60 DCHECK(message_filter_router_);
61 #ifdef IPC_MESSAGE_LOG_ENABLED
62 Logging* logger = Logging::GetInstance();
63 if (logger->Enabled())
64 logger->OnPreDispatchMessage(message);
65 #endif
67 if (message_filter_router_->TryFilters(message)) {
68 if (message.dispatch_error()) {
69 listener_task_runner_->PostTask(
70 FROM_HERE, base::Bind(&Context::OnDispatchBadMessage, this, message));
72 #ifdef IPC_MESSAGE_LOG_ENABLED
73 if (logger->Enabled())
74 logger->OnPostDispatchMessage(message, channel_id_);
75 #endif
76 return true;
78 return false;
81 // Called on the IPC::Channel thread
82 bool ChannelProxy::Context::OnMessageReceived(const Message& message) {
83 // First give a chance to the filters to process this message.
84 if (!TryFilters(message))
85 OnMessageReceivedNoFilter(message);
86 return true;
89 // Called on the IPC::Channel thread
90 bool ChannelProxy::Context::OnMessageReceivedNoFilter(const Message& message) {
91 listener_task_runner_->PostTask(
92 FROM_HERE, base::Bind(&Context::OnDispatchMessage, this, message));
93 return true;
96 // Called on the IPC::Channel thread
97 void ChannelProxy::Context::OnChannelConnected(int32 peer_pid) {
98 // We cache off the peer_pid so it can be safely accessed from both threads.
99 peer_pid_ = channel_->GetPeerPID();
101 // Add any pending filters. This avoids a race condition where someone
102 // creates a ChannelProxy, calls AddFilter, and then right after starts the
103 // peer process. The IO thread could receive a message before the task to add
104 // the filter is run on the IO thread.
105 OnAddFilter();
107 // See above comment about using listener_task_runner_ here.
108 listener_task_runner_->PostTask(
109 FROM_HERE, base::Bind(&Context::OnDispatchConnected, this));
112 // Called on the IPC::Channel thread
113 void ChannelProxy::Context::OnChannelError() {
114 for (size_t i = 0; i < filters_.size(); ++i)
115 filters_[i]->OnChannelError();
117 // See above comment about using listener_task_runner_ here.
118 listener_task_runner_->PostTask(
119 FROM_HERE, base::Bind(&Context::OnDispatchError, this));
122 // Called on the IPC::Channel thread
123 void ChannelProxy::Context::OnChannelOpened() {
124 DCHECK(channel_ != NULL);
126 // Assume a reference to ourselves on behalf of this thread. This reference
127 // will be released when we are closed.
128 AddRef();
130 if (!channel_->Connect()) {
131 OnChannelError();
132 return;
135 for (size_t i = 0; i < filters_.size(); ++i)
136 filters_[i]->OnFilterAdded(channel_.get());
139 // Called on the IPC::Channel thread
140 void ChannelProxy::Context::OnChannelClosed() {
141 // It's okay for IPC::ChannelProxy::Close to be called more than once, which
142 // would result in this branch being taken.
143 if (!channel_)
144 return;
146 for (size_t i = 0; i < filters_.size(); ++i) {
147 filters_[i]->OnChannelClosing();
148 filters_[i]->OnFilterRemoved();
151 // We don't need the filters anymore.
152 message_filter_router_->Clear();
153 filters_.clear();
154 // We don't need the lock, because at this point, the listener thread can't
155 // access it any more.
156 pending_filters_.clear();
158 channel_.reset();
160 // Balance with the reference taken during startup. This may result in
161 // self-destruction.
162 Release();
165 void ChannelProxy::Context::Clear() {
166 listener_ = NULL;
169 // Called on the IPC::Channel thread
170 void ChannelProxy::Context::OnSendMessage(scoped_ptr<Message> message) {
171 if (!channel_) {
172 OnChannelClosed();
173 return;
176 if (!channel_->Send(message.release()))
177 OnChannelError();
180 // Called on the IPC::Channel thread
181 void ChannelProxy::Context::OnAddFilter() {
182 // Our OnChannelConnected method has not yet been called, so we can't be
183 // sure that channel_ is valid yet. When OnChannelConnected *is* called,
184 // it invokes OnAddFilter, so any pending filter(s) will be added at that
185 // time.
186 if (peer_pid_ == base::kNullProcessId)
187 return;
189 std::vector<scoped_refptr<MessageFilter> > new_filters;
191 base::AutoLock auto_lock(pending_filters_lock_);
192 new_filters.swap(pending_filters_);
195 for (size_t i = 0; i < new_filters.size(); ++i) {
196 filters_.push_back(new_filters[i]);
198 message_filter_router_->AddFilter(new_filters[i].get());
200 // The channel has already been created and connected, so we need to
201 // inform the filters right now.
202 new_filters[i]->OnFilterAdded(channel_.get());
203 new_filters[i]->OnChannelConnected(peer_pid_);
207 // Called on the IPC::Channel thread
208 void ChannelProxy::Context::OnRemoveFilter(MessageFilter* filter) {
209 if (peer_pid_ == base::kNullProcessId) {
210 // The channel is not yet connected, so any filters are still pending.
211 base::AutoLock auto_lock(pending_filters_lock_);
212 for (size_t i = 0; i < pending_filters_.size(); ++i) {
213 if (pending_filters_[i].get() == filter) {
214 filter->OnFilterRemoved();
215 pending_filters_.erase(pending_filters_.begin() + i);
216 return;
219 return;
221 if (!channel_)
222 return; // The filters have already been deleted.
224 message_filter_router_->RemoveFilter(filter);
226 for (size_t i = 0; i < filters_.size(); ++i) {
227 if (filters_[i].get() == filter) {
228 filter->OnFilterRemoved();
229 filters_.erase(filters_.begin() + i);
230 return;
234 NOTREACHED() << "filter to be removed not found";
237 // Called on the listener's thread
238 void ChannelProxy::Context::AddFilter(MessageFilter* filter) {
239 base::AutoLock auto_lock(pending_filters_lock_);
240 pending_filters_.push_back(make_scoped_refptr(filter));
241 ipc_task_runner_->PostTask(
242 FROM_HERE, base::Bind(&Context::OnAddFilter, this));
245 // Called on the listener's thread
246 void ChannelProxy::Context::OnDispatchMessage(const Message& message) {
247 #ifdef IPC_MESSAGE_LOG_ENABLED
248 Logging* logger = Logging::GetInstance();
249 std::string name;
250 logger->GetMessageText(message.type(), &name, &message, NULL);
251 TRACE_EVENT1("ipc", "ChannelProxy::Context::OnDispatchMessage",
252 "name", name);
253 #else
254 TRACE_EVENT2("ipc", "ChannelProxy::Context::OnDispatchMessage",
255 "class", IPC_MESSAGE_ID_CLASS(message.type()),
256 "line", IPC_MESSAGE_ID_LINE(message.type()));
257 #endif
259 if (!listener_)
260 return;
262 OnDispatchConnected();
264 #ifdef IPC_MESSAGE_LOG_ENABLED
265 if (message.type() == IPC_LOGGING_ID) {
266 logger->OnReceivedLoggingMessage(message);
267 return;
270 if (logger->Enabled())
271 logger->OnPreDispatchMessage(message);
272 #endif
274 listener_->OnMessageReceived(message);
275 if (message.dispatch_error())
276 listener_->OnBadMessageReceived(message);
278 #ifdef IPC_MESSAGE_LOG_ENABLED
279 if (logger->Enabled())
280 logger->OnPostDispatchMessage(message, channel_id_);
281 #endif
284 // Called on the listener's thread
285 void ChannelProxy::Context::OnDispatchConnected() {
286 if (channel_connected_called_)
287 return;
289 channel_connected_called_ = true;
290 if (listener_)
291 listener_->OnChannelConnected(peer_pid_);
294 // Called on the listener's thread
295 void ChannelProxy::Context::OnDispatchError() {
296 if (listener_)
297 listener_->OnChannelError();
300 // Called on the listener's thread
301 void ChannelProxy::Context::OnDispatchBadMessage(const Message& message) {
302 if (listener_)
303 listener_->OnBadMessageReceived(message);
306 //-----------------------------------------------------------------------------
308 // static
309 scoped_ptr<ChannelProxy> ChannelProxy::Create(
310 const IPC::ChannelHandle& channel_handle,
311 Channel::Mode mode,
312 Listener* listener,
313 const scoped_refptr<base::SingleThreadTaskRunner>& ipc_task_runner) {
314 scoped_ptr<ChannelProxy> channel(new ChannelProxy(listener, ipc_task_runner));
315 channel->Init(channel_handle, mode, true);
316 return channel.Pass();
319 // static
320 scoped_ptr<ChannelProxy> ChannelProxy::Create(
321 scoped_ptr<ChannelFactory> factory,
322 Listener* listener,
323 const scoped_refptr<base::SingleThreadTaskRunner>& ipc_task_runner) {
324 scoped_ptr<ChannelProxy> channel(new ChannelProxy(listener, ipc_task_runner));
325 channel->Init(factory.Pass(), true);
326 return channel.Pass();
329 ChannelProxy::ChannelProxy(Context* context)
330 : context_(context),
331 did_init_(false) {
334 ChannelProxy::ChannelProxy(
335 Listener* listener,
336 const scoped_refptr<base::SingleThreadTaskRunner>& ipc_task_runner)
337 : context_(new Context(listener, ipc_task_runner)), did_init_(false) {
340 ChannelProxy::~ChannelProxy() {
341 DCHECK(CalledOnValidThread());
343 Close();
346 void ChannelProxy::Init(const IPC::ChannelHandle& channel_handle,
347 Channel::Mode mode,
348 bool create_pipe_now) {
349 #if defined(OS_POSIX)
350 // When we are creating a server on POSIX, we need its file descriptor
351 // to be created immediately so that it can be accessed and passed
352 // to other processes. Forcing it to be created immediately avoids
353 // race conditions that may otherwise arise.
354 if (mode & Channel::MODE_SERVER_FLAG) {
355 create_pipe_now = true;
357 #endif // defined(OS_POSIX)
358 Init(ChannelFactory::Create(channel_handle, mode),
359 create_pipe_now);
362 void ChannelProxy::Init(scoped_ptr<ChannelFactory> factory,
363 bool create_pipe_now) {
364 DCHECK(CalledOnValidThread());
365 DCHECK(!did_init_);
367 if (create_pipe_now) {
368 // Create the channel immediately. This effectively sets up the
369 // low-level pipe so that the client can connect. Without creating
370 // the pipe immediately, it is possible for a listener to attempt
371 // to connect and get an error since the pipe doesn't exist yet.
372 context_->CreateChannel(factory.Pass());
373 } else {
374 context_->ipc_task_runner()->PostTask(
375 FROM_HERE, base::Bind(&Context::CreateChannel,
376 context_.get(), Passed(factory.Pass())));
379 // complete initialization on the background thread
380 context_->ipc_task_runner()->PostTask(
381 FROM_HERE, base::Bind(&Context::OnChannelOpened, context_.get()));
383 did_init_ = true;
386 void ChannelProxy::Close() {
387 DCHECK(CalledOnValidThread());
389 // Clear the backpointer to the listener so that any pending calls to
390 // Context::OnDispatchMessage or OnDispatchError will be ignored. It is
391 // possible that the channel could be closed while it is receiving messages!
392 context_->Clear();
394 if (context_->ipc_task_runner()) {
395 context_->ipc_task_runner()->PostTask(
396 FROM_HERE, base::Bind(&Context::OnChannelClosed, context_.get()));
400 bool ChannelProxy::Send(Message* message) {
401 DCHECK(did_init_);
403 // TODO(alexeypa): add DCHECK(CalledOnValidThread()) here. Currently there are
404 // tests that call Send() from a wrong thread. See http://crbug.com/163523.
406 #ifdef IPC_MESSAGE_LOG_ENABLED
407 Logging::GetInstance()->OnSendMessage(message, context_->channel_id());
408 #endif
410 context_->ipc_task_runner()->PostTask(
411 FROM_HERE,
412 base::Bind(&ChannelProxy::Context::OnSendMessage,
413 context_, base::Passed(scoped_ptr<Message>(message))));
414 return true;
417 void ChannelProxy::AddFilter(MessageFilter* filter) {
418 DCHECK(CalledOnValidThread());
420 context_->AddFilter(filter);
423 void ChannelProxy::RemoveFilter(MessageFilter* filter) {
424 DCHECK(CalledOnValidThread());
426 context_->ipc_task_runner()->PostTask(
427 FROM_HERE, base::Bind(&Context::OnRemoveFilter, context_.get(),
428 make_scoped_refptr(filter)));
431 void ChannelProxy::ClearIPCTaskRunner() {
432 DCHECK(CalledOnValidThread());
434 context()->ClearIPCTaskRunner();
437 #if defined(OS_POSIX) && !defined(OS_NACL_SFI)
438 // See the TODO regarding lazy initialization of the channel in
439 // ChannelProxy::Init().
440 int ChannelProxy::GetClientFileDescriptor() {
441 DCHECK(CalledOnValidThread());
443 Channel* channel = context_.get()->channel_.get();
444 // Channel must have been created first.
445 DCHECK(channel) << context_.get()->channel_id_;
446 return channel->GetClientFileDescriptor();
449 base::ScopedFD ChannelProxy::TakeClientFileDescriptor() {
450 DCHECK(CalledOnValidThread());
452 Channel* channel = context_.get()->channel_.get();
453 // Channel must have been created first.
454 DCHECK(channel) << context_.get()->channel_id_;
455 return channel->TakeClientFileDescriptor();
457 #endif
459 //-----------------------------------------------------------------------------
461 } // namespace IPC