1 // Copyright (c) 2012 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
5 #include "ipc/ipc_channel_proxy.h"
8 #include "base/compiler_specific.h"
9 #include "base/location.h"
10 #include "base/memory/ref_counted.h"
11 #include "base/memory/scoped_ptr.h"
12 #include "base/profiler/scoped_tracker.h"
13 #include "base/single_thread_task_runner.h"
14 #include "base/thread_task_runner_handle.h"
15 #include "ipc/ipc_channel_factory.h"
16 #include "ipc/ipc_listener.h"
17 #include "ipc/ipc_logging.h"
18 #include "ipc/ipc_message_macros.h"
19 #include "ipc/message_filter.h"
20 #include "ipc/message_filter_router.h"
24 //------------------------------------------------------------------------------
26 ChannelProxy::Context::Context(
28 const scoped_refptr
<base::SingleThreadTaskRunner
>& ipc_task_runner
)
29 : listener_task_runner_(base::ThreadTaskRunnerHandle::Get()),
31 ipc_task_runner_(ipc_task_runner
),
32 channel_connected_called_(false),
33 message_filter_router_(new MessageFilterRouter()),
34 peer_pid_(base::kNullProcessId
) {
35 DCHECK(ipc_task_runner_
.get());
36 // The Listener thread where Messages are handled must be a separate thread
37 // to avoid oversubscribing the IO thread. If you trigger this error, you
39 // 1) Create the ChannelProxy on a different thread, or
40 // 2) Just use Channel
41 // Note, we currently make an exception for a NULL listener. That usage
42 // basically works, but is outside the intent of ChannelProxy. This support
43 // will disappear, so please don't rely on it. See crbug.com/364241
44 DCHECK(!listener
|| (ipc_task_runner_
.get() != listener_task_runner_
.get()));
47 ChannelProxy::Context::~Context() {
50 void ChannelProxy::Context::ClearIPCTaskRunner() {
51 ipc_task_runner_
= NULL
;
54 void ChannelProxy::Context::SetListenerTaskRunner(
55 scoped_refptr
<base::SingleThreadTaskRunner
> task_runner
) {
56 DCHECK(ipc_task_runner_
.get() != task_runner
.get());
57 DCHECK(listener_task_runner_
->BelongsToCurrentThread());
58 DCHECK(task_runner
->BelongsToCurrentThread());
59 listener_task_runner_
= task_runner
;
62 void ChannelProxy::Context::CreateChannel(scoped_ptr
<ChannelFactory
> factory
) {
64 channel_id_
= factory
->GetName();
65 channel_
= factory
->BuildChannel(this);
68 bool ChannelProxy::Context::TryFilters(const Message
& message
) {
69 DCHECK(message_filter_router_
);
70 #ifdef IPC_MESSAGE_LOG_ENABLED
71 Logging
* logger
= Logging::GetInstance();
72 if (logger
->Enabled())
73 logger
->OnPreDispatchMessage(message
);
76 if (message_filter_router_
->TryFilters(message
)) {
77 if (message
.dispatch_error()) {
78 listener_task_runner_
->PostTask(
79 FROM_HERE
, base::Bind(&Context::OnDispatchBadMessage
, this, message
));
81 #ifdef IPC_MESSAGE_LOG_ENABLED
82 if (logger
->Enabled())
83 logger
->OnPostDispatchMessage(message
, channel_id_
);
90 // Called on the IPC::Channel thread
91 bool ChannelProxy::Context::OnMessageReceived(const Message
& message
) {
92 // First give a chance to the filters to process this message.
93 if (!TryFilters(message
))
94 OnMessageReceivedNoFilter(message
);
98 // Called on the IPC::Channel thread
99 bool ChannelProxy::Context::OnMessageReceivedNoFilter(const Message
& message
) {
100 listener_task_runner_
->PostTask(
101 FROM_HERE
, base::Bind(&Context::OnDispatchMessage
, this, message
));
105 // Called on the IPC::Channel thread
106 void ChannelProxy::Context::OnChannelConnected(int32 peer_pid
) {
107 // We cache off the peer_pid so it can be safely accessed from both threads.
108 peer_pid_
= channel_
->GetPeerPID();
110 // Add any pending filters. This avoids a race condition where someone
111 // creates a ChannelProxy, calls AddFilter, and then right after starts the
112 // peer process. The IO thread could receive a message before the task to add
113 // the filter is run on the IO thread.
116 // See above comment about using listener_task_runner_ here.
117 listener_task_runner_
->PostTask(
118 FROM_HERE
, base::Bind(&Context::OnDispatchConnected
, this));
121 // Called on the IPC::Channel thread
122 void ChannelProxy::Context::OnChannelError() {
123 for (size_t i
= 0; i
< filters_
.size(); ++i
)
124 filters_
[i
]->OnChannelError();
126 // See above comment about using listener_task_runner_ here.
127 listener_task_runner_
->PostTask(
128 FROM_HERE
, base::Bind(&Context::OnDispatchError
, this));
131 // Called on the IPC::Channel thread
132 void ChannelProxy::Context::OnChannelOpened() {
133 DCHECK(channel_
!= NULL
);
135 // Assume a reference to ourselves on behalf of this thread. This reference
136 // will be released when we are closed.
139 if (!channel_
->Connect()) {
144 for (size_t i
= 0; i
< filters_
.size(); ++i
)
145 filters_
[i
]->OnFilterAdded(channel_
.get());
148 // Called on the IPC::Channel thread
149 void ChannelProxy::Context::OnChannelClosed() {
150 // TODO(pkasting): Remove ScopedTracker below once crbug.com/477117 is fixed.
151 tracked_objects::ScopedTracker
tracking_profile(
152 FROM_HERE_WITH_EXPLICIT_FUNCTION(
153 "477117 ChannelProxy::Context::OnChannelClosed"));
154 // It's okay for IPC::ChannelProxy::Close to be called more than once, which
155 // would result in this branch being taken.
159 for (size_t i
= 0; i
< filters_
.size(); ++i
) {
160 filters_
[i
]->OnChannelClosing();
161 filters_
[i
]->OnFilterRemoved();
164 // We don't need the filters anymore.
165 message_filter_router_
->Clear();
167 // We don't need the lock, because at this point, the listener thread can't
168 // access it any more.
169 pending_filters_
.clear();
173 // Balance with the reference taken during startup. This may result in
178 void ChannelProxy::Context::Clear() {
182 // Called on the IPC::Channel thread
183 void ChannelProxy::Context::OnSendMessage(scoped_ptr
<Message
> message
) {
184 // TODO(pkasting): Remove ScopedTracker below once crbug.com/477117 is fixed.
185 tracked_objects::ScopedTracker
tracking_profile(
186 FROM_HERE_WITH_EXPLICIT_FUNCTION(
187 "477117 ChannelProxy::Context::OnSendMessage"));
193 if (!channel_
->Send(message
.release()))
197 // Called on the IPC::Channel thread
198 void ChannelProxy::Context::OnAddFilter() {
199 // Our OnChannelConnected method has not yet been called, so we can't be
200 // sure that channel_ is valid yet. When OnChannelConnected *is* called,
201 // it invokes OnAddFilter, so any pending filter(s) will be added at that
203 if (peer_pid_
== base::kNullProcessId
)
206 std::vector
<scoped_refptr
<MessageFilter
> > new_filters
;
208 base::AutoLock
auto_lock(pending_filters_lock_
);
209 new_filters
.swap(pending_filters_
);
212 for (size_t i
= 0; i
< new_filters
.size(); ++i
) {
213 filters_
.push_back(new_filters
[i
]);
215 message_filter_router_
->AddFilter(new_filters
[i
].get());
217 // The channel has already been created and connected, so we need to
218 // inform the filters right now.
219 new_filters
[i
]->OnFilterAdded(channel_
.get());
220 new_filters
[i
]->OnChannelConnected(peer_pid_
);
224 // Called on the IPC::Channel thread
225 void ChannelProxy::Context::OnRemoveFilter(MessageFilter
* filter
) {
226 if (peer_pid_
== base::kNullProcessId
) {
227 // The channel is not yet connected, so any filters are still pending.
228 base::AutoLock
auto_lock(pending_filters_lock_
);
229 for (size_t i
= 0; i
< pending_filters_
.size(); ++i
) {
230 if (pending_filters_
[i
].get() == filter
) {
231 filter
->OnFilterRemoved();
232 pending_filters_
.erase(pending_filters_
.begin() + i
);
239 return; // The filters have already been deleted.
241 message_filter_router_
->RemoveFilter(filter
);
243 for (size_t i
= 0; i
< filters_
.size(); ++i
) {
244 if (filters_
[i
].get() == filter
) {
245 filter
->OnFilterRemoved();
246 filters_
.erase(filters_
.begin() + i
);
251 NOTREACHED() << "filter to be removed not found";
254 // Called on the listener's thread
255 void ChannelProxy::Context::AddFilter(MessageFilter
* filter
) {
256 base::AutoLock
auto_lock(pending_filters_lock_
);
257 pending_filters_
.push_back(make_scoped_refptr(filter
));
258 ipc_task_runner_
->PostTask(
259 FROM_HERE
, base::Bind(&Context::OnAddFilter
, this));
262 // Called on the listener's thread
263 void ChannelProxy::Context::OnDispatchMessage(const Message
& message
) {
264 #if defined(IPC_MESSAGE_LOG_ENABLED)
265 Logging
* logger
= Logging::GetInstance();
267 logger
->GetMessageText(message
.type(), &name
, &message
, NULL
);
268 TRACE_EVENT1("ipc", "ChannelProxy::Context::OnDispatchMessage",
271 TRACE_EVENT2("ipc", "ChannelProxy::Context::OnDispatchMessage",
272 "class", IPC_MESSAGE_ID_CLASS(message
.type()),
273 "line", IPC_MESSAGE_ID_LINE(message
.type()));
279 OnDispatchConnected();
281 #ifdef IPC_MESSAGE_LOG_ENABLED
282 if (message
.type() == IPC_LOGGING_ID
) {
283 logger
->OnReceivedLoggingMessage(message
);
287 if (logger
->Enabled())
288 logger
->OnPreDispatchMessage(message
);
291 listener_
->OnMessageReceived(message
);
292 if (message
.dispatch_error())
293 listener_
->OnBadMessageReceived(message
);
295 #ifdef IPC_MESSAGE_LOG_ENABLED
296 if (logger
->Enabled())
297 logger
->OnPostDispatchMessage(message
, channel_id_
);
301 // Called on the listener's thread
302 void ChannelProxy::Context::OnDispatchConnected() {
303 if (channel_connected_called_
)
306 channel_connected_called_
= true;
308 listener_
->OnChannelConnected(peer_pid_
);
311 // Called on the listener's thread
312 void ChannelProxy::Context::OnDispatchError() {
314 listener_
->OnChannelError();
317 // Called on the listener's thread
318 void ChannelProxy::Context::OnDispatchBadMessage(const Message
& message
) {
320 listener_
->OnBadMessageReceived(message
);
323 //-----------------------------------------------------------------------------
326 scoped_ptr
<ChannelProxy
> ChannelProxy::Create(
327 const IPC::ChannelHandle
& channel_handle
,
330 const scoped_refptr
<base::SingleThreadTaskRunner
>& ipc_task_runner
) {
331 scoped_ptr
<ChannelProxy
> channel(new ChannelProxy(listener
, ipc_task_runner
));
332 channel
->Init(channel_handle
, mode
, true);
333 return channel
.Pass();
337 scoped_ptr
<ChannelProxy
> ChannelProxy::Create(
338 scoped_ptr
<ChannelFactory
> factory
,
340 const scoped_refptr
<base::SingleThreadTaskRunner
>& ipc_task_runner
) {
341 scoped_ptr
<ChannelProxy
> channel(new ChannelProxy(listener
, ipc_task_runner
));
342 channel
->Init(factory
.Pass(), true);
343 return channel
.Pass();
346 ChannelProxy::ChannelProxy(Context
* context
)
349 #if defined(ENABLE_IPC_FUZZER)
350 outgoing_message_filter_
= NULL
;
354 ChannelProxy::ChannelProxy(
356 const scoped_refptr
<base::SingleThreadTaskRunner
>& ipc_task_runner
)
357 : context_(new Context(listener
, ipc_task_runner
)), did_init_(false) {
358 #if defined(ENABLE_IPC_FUZZER)
359 outgoing_message_filter_
= NULL
;
363 ChannelProxy::~ChannelProxy() {
364 DCHECK(CalledOnValidThread());
369 void ChannelProxy::Init(const IPC::ChannelHandle
& channel_handle
,
371 bool create_pipe_now
) {
372 #if defined(OS_POSIX)
373 // When we are creating a server on POSIX, we need its file descriptor
374 // to be created immediately so that it can be accessed and passed
375 // to other processes. Forcing it to be created immediately avoids
376 // race conditions that may otherwise arise.
377 if (mode
& Channel::MODE_SERVER_FLAG
) {
378 create_pipe_now
= true;
380 #endif // defined(OS_POSIX)
381 Init(ChannelFactory::Create(channel_handle
, mode
),
385 void ChannelProxy::Init(scoped_ptr
<ChannelFactory
> factory
,
386 bool create_pipe_now
) {
387 DCHECK(CalledOnValidThread());
390 if (create_pipe_now
) {
391 // Create the channel immediately. This effectively sets up the
392 // low-level pipe so that the client can connect. Without creating
393 // the pipe immediately, it is possible for a listener to attempt
394 // to connect and get an error since the pipe doesn't exist yet.
395 context_
->CreateChannel(factory
.Pass());
397 context_
->ipc_task_runner()->PostTask(
398 FROM_HERE
, base::Bind(&Context::CreateChannel
,
399 context_
.get(), Passed(factory
.Pass())));
402 // complete initialization on the background thread
403 context_
->ipc_task_runner()->PostTask(
404 FROM_HERE
, base::Bind(&Context::OnChannelOpened
, context_
.get()));
409 void ChannelProxy::Close() {
410 DCHECK(CalledOnValidThread());
412 // Clear the backpointer to the listener so that any pending calls to
413 // Context::OnDispatchMessage or OnDispatchError will be ignored. It is
414 // possible that the channel could be closed while it is receiving messages!
417 if (context_
->ipc_task_runner()) {
418 context_
->ipc_task_runner()->PostTask(
419 FROM_HERE
, base::Bind(&Context::OnChannelClosed
, context_
.get()));
423 bool ChannelProxy::Send(Message
* message
) {
426 // TODO(alexeypa): add DCHECK(CalledOnValidThread()) here. Currently there are
427 // tests that call Send() from a wrong thread. See http://crbug.com/163523.
429 #ifdef ENABLE_IPC_FUZZER
430 // In IPC fuzzing builds, it is possible to define a filter to apply to
431 // outgoing messages. It will either rewrite the message and return a new
432 // one, freeing the original, or return the message unchanged.
433 if (outgoing_message_filter())
434 message
= outgoing_message_filter()->Rewrite(message
);
437 #ifdef IPC_MESSAGE_LOG_ENABLED
438 Logging::GetInstance()->OnSendMessage(message
, context_
->channel_id());
441 context_
->ipc_task_runner()->PostTask(
443 base::Bind(&ChannelProxy::Context::OnSendMessage
,
444 context_
, base::Passed(scoped_ptr
<Message
>(message
))));
448 void ChannelProxy::AddFilter(MessageFilter
* filter
) {
449 DCHECK(CalledOnValidThread());
451 context_
->AddFilter(filter
);
454 void ChannelProxy::RemoveFilter(MessageFilter
* filter
) {
455 DCHECK(CalledOnValidThread());
457 context_
->ipc_task_runner()->PostTask(
458 FROM_HERE
, base::Bind(&Context::OnRemoveFilter
, context_
.get(),
459 make_scoped_refptr(filter
)));
462 void ChannelProxy::SetListenerTaskRunner(
463 scoped_refptr
<base::SingleThreadTaskRunner
> task_runner
) {
464 DCHECK(CalledOnValidThread());
466 context()->SetListenerTaskRunner(task_runner
);
469 void ChannelProxy::ClearIPCTaskRunner() {
470 DCHECK(CalledOnValidThread());
472 context()->ClearIPCTaskRunner();
475 #if defined(OS_POSIX) && !defined(OS_NACL_SFI)
476 // See the TODO regarding lazy initialization of the channel in
477 // ChannelProxy::Init().
478 int ChannelProxy::GetClientFileDescriptor() {
479 DCHECK(CalledOnValidThread());
481 Channel
* channel
= context_
.get()->channel_
.get();
482 // Channel must have been created first.
483 DCHECK(channel
) << context_
.get()->channel_id_
;
484 return channel
->GetClientFileDescriptor();
487 base::ScopedFD
ChannelProxy::TakeClientFileDescriptor() {
488 DCHECK(CalledOnValidThread());
490 Channel
* channel
= context_
.get()->channel_
.get();
491 // Channel must have been created first.
492 DCHECK(channel
) << context_
.get()->channel_id_
;
493 return channel
->TakeClientFileDescriptor();
497 //-----------------------------------------------------------------------------