1 // Copyright (c) 2012 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
5 #include "ipc/ipc_channel_proxy.h"
8 #include "base/compiler_specific.h"
9 #include "base/location.h"
10 #include "base/memory/ref_counted.h"
11 #include "base/memory/scoped_ptr.h"
12 #include "base/single_thread_task_runner.h"
13 #include "base/thread_task_runner_handle.h"
14 #include "ipc/ipc_listener.h"
15 #include "ipc/ipc_logging.h"
16 #include "ipc/ipc_message_macros.h"
17 #include "ipc/message_filter.h"
18 #include "ipc/message_filter_router.h"
22 //------------------------------------------------------------------------------
// Constructs a Context. |listener_task_runner_| is initialized from
// base::ThreadTaskRunnerHandle::Get(), |ipc_task_runner| is stored in
// |ipc_task_runner_|, |channel_connected_called_| starts out false, a fresh
// MessageFilterRouter is allocated, and |peer_pid_| starts as
// base::kNullProcessId. |ipc_task_runner| must be non-NULL (DCHECKed).
24 ChannelProxy::Context::Context(Listener
* listener
,
25 base::SingleThreadTaskRunner
* ipc_task_runner
)
26 : listener_task_runner_(base::ThreadTaskRunnerHandle::Get()),
28 ipc_task_runner_(ipc_task_runner
),
29 channel_connected_called_(false),
30 message_filter_router_(new MessageFilterRouter()),
31 peer_pid_(base::kNullProcessId
) {
32 DCHECK(ipc_task_runner_
.get());
33 // The Listener thread where Messages are handled must be a separate thread
34 // to avoid oversubscribing the IO thread. If you trigger this error, you
// need to either:
36 // 1) Create the ChannelProxy on a different thread, or
37 // 2) Just use Channel
38 // Note, we currently make an exception for a NULL listener. That usage
39 // basically works, but is outside the intent of ChannelProxy. This support
40 // will disappear, so please don't rely on it. See crbug.com/364241
41 DCHECK(!listener
|| (ipc_task_runner_
.get() != listener_task_runner_
.get()));
44 ChannelProxy::Context::~Context() {
// Drops this Context's IPC task runner by resetting |ipc_task_runner_| to
// NULL.
47 void ChannelProxy::Context::ClearIPCTaskRunner() {
48 ipc_task_runner_
= NULL
;
// Records |handle.name| in |channel_id_| and creates |channel_| through
// Channel::Create(), with this Context passed as the final argument.
51 void ChannelProxy::Context::CreateChannel(const IPC::ChannelHandle
& handle
,
52 const Channel::Mode
& mode
) {
54 channel_id_
= handle
.name
;
55 channel_
= Channel::Create(handle
, mode
, this);
// Offers |message| to |message_filter_router_| (which must exist; DCHECKed).
// When the router reports the message as handled and the message carries a
// dispatch error, OnDispatchBadMessage() is posted to |listener_task_runner_|.
// With IPC_MESSAGE_LOG_ENABLED defined, the logger's pre- and post-dispatch
// hooks are invoked if logging is enabled.
58 bool ChannelProxy::Context::TryFilters(const Message
& message
) {
59 DCHECK(message_filter_router_
);
60 #ifdef IPC_MESSAGE_LOG_ENABLED
61 Logging
* logger
= Logging::GetInstance();
62 if (logger
->Enabled())
63 logger
->OnPreDispatchMessage(message
);
66 if (message_filter_router_
->TryFilters(message
)) {
67 if (message
.dispatch_error()) {
68 listener_task_runner_
->PostTask(
69 FROM_HERE
, base::Bind(&Context::OnDispatchBadMessage
, this, message
));
71 #ifdef IPC_MESSAGE_LOG_ENABLED
72 if (logger
->Enabled())
73 logger
->OnPostDispatchMessage(message
, channel_id_
);
// Entry point for an incoming |message|: the filters get the first chance via
// TryFilters(); a message they do not take is handed to
// OnMessageReceivedNoFilter().
80 // Called on the IPC::Channel thread
81 bool ChannelProxy::Context::OnMessageReceived(const Message
& message
) {
82 // First give a chance to the filters to process this message.
83 if (!TryFilters(message
))
84 OnMessageReceivedNoFilter(message
);
// Bypasses the filters: posts a task to |listener_task_runner_| that runs
// OnDispatchMessage() with |message|.
88 // Called on the IPC::Channel thread
89 bool ChannelProxy::Context::OnMessageReceivedNoFilter(const Message
& message
) {
90 listener_task_runner_
->PostTask(
91 FROM_HERE
, base::Bind(&Context::OnDispatchMessage
, this, message
));
// Caches the peer process id in |peer_pid_| and posts OnDispatchConnected() to
// |listener_task_runner_|. Note that the cached value is read from
// channel_->GetPeerPID(); the |peer_pid| argument is not used by the
// statements shown here.
95 // Called on the IPC::Channel thread
96 void ChannelProxy::Context::OnChannelConnected(int32 peer_pid
) {
97 // We cache off the peer_pid so it can be safely accessed from both threads.
98 peer_pid_
= channel_
->GetPeerPID();
100 // Add any pending filters. This avoids a race condition where someone
101 // creates a ChannelProxy, calls AddFilter, and then right after starts the
102 // peer process. The IO thread could receive a message before the task to add
103 // the filter is run on the IO thread.
106 // See above comment about using listener_task_runner_ here.
107 listener_task_runner_
->PostTask(
108 FROM_HERE
, base::Bind(&Context::OnDispatchConnected
, this));
// Tells every filter in |filters_| about the channel error, then posts
// OnDispatchError() to |listener_task_runner_|.
111 // Called on the IPC::Channel thread
112 void ChannelProxy::Context::OnChannelError() {
113 for (size_t i
= 0; i
< filters_
.size(); ++i
)
114 filters_
[i
]->OnChannelError();
116 // See above comment about using listener_task_runner_ here.
117 listener_task_runner_
->PostTask(
118 FROM_HERE
, base::Bind(&Context::OnDispatchError
, this));
// Requires |channel_| to exist (DCHECKed). Checks the result of
// channel_->Connect() and calls OnFilterAdded(channel_.get()) on each filter
// in |filters_|.
121 // Called on the IPC::Channel thread
122 void ChannelProxy::Context::OnChannelOpened() {
123 DCHECK(channel_
!= NULL
);
125 // Assume a reference to ourselves on behalf of this thread. This reference
126 // will be released when we are closed.
129 if (!channel_
->Connect()) {
134 for (size_t i
= 0; i
< filters_
.size(); ++i
)
135 filters_
[i
]->OnFilterAdded(channel_
.get());
// Shuts down filter state: each filter in |filters_| receives
// OnChannelClosing() followed by OnFilterRemoved(), after which
// |message_filter_router_| and |pending_filters_| are cleared.
138 // Called on the IPC::Channel thread
139 void ChannelProxy::Context::OnChannelClosed() {
140 // It's okay for IPC::ChannelProxy::Close to be called more than once, which
141 // would result in this branch being taken.
145 for (size_t i
= 0; i
< filters_
.size(); ++i
) {
146 filters_
[i
]->OnChannelClosing();
147 filters_
[i
]->OnFilterRemoved();
150 // We don't need the filters anymore.
151 message_filter_router_
->Clear();
153 // We don't need the lock, because at this point, the listener thread can't
154 // access it any more.
155 pending_filters_
.clear();
159 // Balance with the reference taken during startup. This may result in
164 void ChannelProxy::Context::Clear() {
// Releases ownership of |message| to channel_->Send() and checks whether the
// send succeeded.
168 // Called on the IPC::Channel thread
169 void ChannelProxy::Context::OnSendMessage(scoped_ptr
<Message
> message
) {
175 if (!channel_
->Send(message
.release()))
// Activates the filters queued by AddFilter(). The queue in
// |pending_filters_| is swapped into a local vector while holding
// |pending_filters_lock_|; each filter is then appended to |filters_|,
// registered with |message_filter_router_|, and notified through
// OnFilterAdded(channel_.get()) and OnChannelConnected(peer_pid_).
// |peer_pid_| is first compared against base::kNullProcessId to detect that
// OnChannelConnected() has not run yet.
179 // Called on the IPC::Channel thread
180 void ChannelProxy::Context::OnAddFilter() {
181 // Our OnChannelConnected method has not yet been called, so we can't be
182 // sure that channel_ is valid yet. When OnChannelConnected *is* called,
183 // it invokes OnAddFilter, so any pending filter(s) will be added at that
// time.
185 if (peer_pid_
== base::kNullProcessId
)
188 std::vector
<scoped_refptr
<MessageFilter
> > new_filters
;
190 base::AutoLock
auto_lock(pending_filters_lock_
);
191 new_filters
.swap(pending_filters_
);
194 for (size_t i
= 0; i
< new_filters
.size(); ++i
) {
195 filters_
.push_back(new_filters
[i
]);
197 message_filter_router_
->AddFilter(new_filters
[i
].get());
199 // The channel has already been created and connected, so we need to
200 // inform the filters right now.
201 new_filters
[i
]->OnFilterAdded(channel_
.get());
202 new_filters
[i
]->OnChannelConnected(peer_pid_
);
// Removes |filter|. While |peer_pid_| is still base::kNullProcessId the filter
// is searched for in |pending_filters_| under |pending_filters_lock_|. The
// connected path unregisters it from |message_filter_router_| and searches
// |filters_|. A match in either list receives OnFilterRemoved() before being
// erased; NOTREACHED() flags a filter that could not be found.
206 // Called on the IPC::Channel thread
207 void ChannelProxy::Context::OnRemoveFilter(MessageFilter
* filter
) {
208 if (peer_pid_
== base::kNullProcessId
) {
209 // The channel is not yet connected, so any filters are still pending.
210 base::AutoLock
auto_lock(pending_filters_lock_
);
211 for (size_t i
= 0; i
< pending_filters_
.size(); ++i
) {
212 if (pending_filters_
[i
].get() == filter
) {
213 filter
->OnFilterRemoved();
214 pending_filters_
.erase(pending_filters_
.begin() + i
);
221 return; // The filters have already been deleted.
223 message_filter_router_
->RemoveFilter(filter
);
225 for (size_t i
= 0; i
< filters_
.size(); ++i
) {
226 if (filters_
[i
].get() == filter
) {
227 filter
->OnFilterRemoved();
228 filters_
.erase(filters_
.begin() + i
);
233 NOTREACHED() << "filter to be removed not found";
// Queues |filter| on |pending_filters_| while holding |pending_filters_lock_|,
// then posts OnAddFilter() to |ipc_task_runner_| so the queue gets processed
// there.
236 // Called on the listener's thread
237 void ChannelProxy::Context::AddFilter(MessageFilter
* filter
) {
238 base::AutoLock
auto_lock(pending_filters_lock_
);
239 pending_filters_
.push_back(make_scoped_refptr(filter
));
240 ipc_task_runner_
->PostTask(
241 FROM_HERE
, base::Bind(&Context::OnAddFilter
, this));
// Delivers |message| to |listener_|. OnDispatchConnected() is called before
// delivery; the message then goes to listener_->OnMessageReceived(), and to
// listener_->OnBadMessageReceived() as well if it carries a dispatch error.
// An "ipc" trace event is emitted for the dispatch. With
// IPC_MESSAGE_LOG_ENABLED defined, messages of type IPC_LOGGING_ID are passed
// to logger->OnReceivedLoggingMessage(), and the logger's pre- and
// post-dispatch hooks run when logging is enabled.
244 // Called on the listener's thread
245 void ChannelProxy::Context::OnDispatchMessage(const Message
& message
) {
246 #ifdef IPC_MESSAGE_LOG_ENABLED
247 Logging
* logger
= Logging::GetInstance();
249 logger
->GetMessageText(message
.type(), &name
, &message
, NULL
);
250 TRACE_EVENT1("ipc", "ChannelProxy::Context::OnDispatchMessage",
253 TRACE_EVENT2("ipc", "ChannelProxy::Context::OnDispatchMessage",
254 "class", IPC_MESSAGE_ID_CLASS(message
.type()),
255 "line", IPC_MESSAGE_ID_LINE(message
.type()));
261 OnDispatchConnected();
263 #ifdef IPC_MESSAGE_LOG_ENABLED
264 if (message
.type() == IPC_LOGGING_ID
) {
265 logger
->OnReceivedLoggingMessage(message
);
269 if (logger
->Enabled())
270 logger
->OnPreDispatchMessage(message
);
273 listener_
->OnMessageReceived(message
);
274 if (message
.dispatch_error())
275 listener_
->OnBadMessageReceived(message
);
277 #ifdef IPC_MESSAGE_LOG_ENABLED
278 if (logger
->Enabled())
279 logger
->OnPostDispatchMessage(message
, channel_id_
);
// Tests |channel_connected_called_|, marks it true, and notifies |listener_|
// through OnChannelConnected(peer_pid_).
283 // Called on the listener's thread
284 void ChannelProxy::Context::OnDispatchConnected() {
285 if (channel_connected_called_
)
288 channel_connected_called_
= true;
290 listener_
->OnChannelConnected(peer_pid_
);
// Forwards the channel error to |listener_| via OnChannelError().
293 // Called on the listener's thread
294 void ChannelProxy::Context::OnDispatchError() {
296 listener_
->OnChannelError();
// Reports |message| to |listener_| via OnBadMessageReceived().
299 // Called on the listener's thread
300 void ChannelProxy::Context::OnDispatchBadMessage(const Message
& message
) {
302 listener_
->OnBadMessageReceived(message
);
305 //-----------------------------------------------------------------------------
// Factory: builds a ChannelProxy from |listener| and |ipc_task_runner|, calls
// Init(channel_handle, mode, true) on it, and returns ownership of the proxy
// to the caller.
308 scoped_ptr
<ChannelProxy
> ChannelProxy::Create(
309 const IPC::ChannelHandle
& channel_handle
,
312 base::SingleThreadTaskRunner
* ipc_task_runner
) {
313 scoped_ptr
<ChannelProxy
> channel(new ChannelProxy(listener
, ipc_task_runner
));
314 channel
->Init(channel_handle
, mode
, true);
315 return channel
.Pass();
318 ChannelProxy::ChannelProxy(Context
* context
)
// Constructs a proxy with a newly allocated Context built from |listener| and
// |ipc_task_runner|; |did_init_| starts out false.
323 ChannelProxy::ChannelProxy(Listener
* listener
,
324 base::SingleThreadTaskRunner
* ipc_task_runner
)
325 : context_(new Context(listener
, ipc_task_runner
)), did_init_(false) {
// Destructor. DCHECKs CalledOnValidThread().
328 ChannelProxy::~ChannelProxy() {
329 DCHECK(CalledOnValidThread());
// Sets up the underlying channel for |channel_handle|. DCHECKs
// CalledOnValidThread(). On POSIX, |create_pipe_now| is forced to true when
// |mode| has Channel::MODE_SERVER_FLAG set. When |create_pipe_now| is set the
// channel is created synchronously through context_->CreateChannel(); the
// posted Context::CreateChannel task presumably covers the deferred case
// (TODO confirm). Finally, Context::OnChannelOpened is posted to the IPC task
// runner to complete initialization.
334 void ChannelProxy::Init(const IPC::ChannelHandle
& channel_handle
,
336 bool create_pipe_now
) {
337 DCHECK(CalledOnValidThread());
339 #if defined(OS_POSIX)
340 // When we are creating a server on POSIX, we need its file descriptor
341 // to be created immediately so that it can be accessed and passed
342 // to other processes. Forcing it to be created immediately avoids
343 // race conditions that may otherwise arise.
344 if (mode
& Channel::MODE_SERVER_FLAG
) {
345 create_pipe_now
= true;
347 #endif // defined(OS_POSIX)
349 if (create_pipe_now
) {
350 // Create the channel immediately. This effectively sets up the
351 // low-level pipe so that the client can connect. Without creating
352 // the pipe immediately, it is possible for a listener to attempt
353 // to connect and get an error since the pipe doesn't exist yet.
354 context_
->CreateChannel(channel_handle
, mode
);
356 context_
->ipc_task_runner()->PostTask(
357 FROM_HERE
, base::Bind(&Context::CreateChannel
, context_
.get(),
358 channel_handle
, mode
));
361 // complete initialization on the background thread
362 context_
->ipc_task_runner()->PostTask(
363 FROM_HERE
, base::Bind(&Context::OnChannelOpened
, context_
.get()));
// Begins closing the channel. DCHECKs CalledOnValidThread(). If the context
// still has an IPC task runner, Context::OnChannelClosed is posted to it.
368 void ChannelProxy::Close() {
369 DCHECK(CalledOnValidThread());
371 // Clear the backpointer to the listener so that any pending calls to
372 // Context::OnDispatchMessage or OnDispatchError will be ignored. It is
373 // possible that the channel could be closed while it is receiving messages!
376 if (context_
->ipc_task_runner()) {
377 context_
->ipc_task_runner()->PostTask(
378 FROM_HERE
, base::Bind(&Context::OnChannelClosed
, context_
.get()));
// Queues |message| for sending: the raw pointer is wrapped in a scoped_ptr and
// handed, via base::Passed(), to a Context::OnSendMessage task posted to the
// IPC task runner. With IPC_MESSAGE_LOG_ENABLED defined, the send is first
// reported to Logging::GetInstance()->OnSendMessage() together with the
// context's channel id.
382 bool ChannelProxy::Send(Message
* message
) {
385 // TODO(alexeypa): add DCHECK(CalledOnValidThread()) here. Currently there are
386 // tests that call Send() from a wrong thread. See http://crbug.com/163523.
388 #ifdef IPC_MESSAGE_LOG_ENABLED
389 Logging::GetInstance()->OnSendMessage(message
, context_
->channel_id());
392 context_
->ipc_task_runner()->PostTask(
394 base::Bind(&ChannelProxy::Context::OnSendMessage
,
395 context_
, base::Passed(scoped_ptr
<Message
>(message
))));
// Adds |filter| by delegating to context_->AddFilter(). DCHECKs
// CalledOnValidThread().
399 void ChannelProxy::AddFilter(MessageFilter
* filter
) {
400 DCHECK(CalledOnValidThread());
402 context_
->AddFilter(filter
);
// Requests removal of |filter|: posts Context::OnRemoveFilter to the IPC task
// runner, binding a scoped_refptr to |filter| as its argument. DCHECKs
// CalledOnValidThread().
405 void ChannelProxy::RemoveFilter(MessageFilter
* filter
) {
406 DCHECK(CalledOnValidThread());
408 context_
->ipc_task_runner()->PostTask(
409 FROM_HERE
, base::Bind(&Context::OnRemoveFilter
, context_
.get(),
410 make_scoped_refptr(filter
)));
// Delegates to context()->ClearIPCTaskRunner(). DCHECKs CalledOnValidThread().
413 void ChannelProxy::ClearIPCTaskRunner() {
414 DCHECK(CalledOnValidThread());
416 context()->ClearIPCTaskRunner();
419 #if defined(OS_POSIX) && !defined(OS_NACL)
// Returns GetClientFileDescriptor() of the context's channel. The channel must
// already exist; this is DCHECKed, streaming the channel id into the failure
// message. DCHECKs CalledOnValidThread().
420 // See the TODO regarding lazy initialization of the channel in
421 // ChannelProxy::Init().
422 int ChannelProxy::GetClientFileDescriptor() {
423 DCHECK(CalledOnValidThread());
425 Channel
* channel
= context_
.get()->channel_
.get();
426 // Channel must have been created first.
427 DCHECK(channel
) << context_
.get()->channel_id_
;
428 return channel
->GetClientFileDescriptor();
// Returns TakeClientFileDescriptor() of the context's channel. The channel
// must already exist; this is DCHECKed, streaming the channel id into the
// failure message. DCHECKs CalledOnValidThread().
431 int ChannelProxy::TakeClientFileDescriptor() {
432 DCHECK(CalledOnValidThread());
434 Channel
* channel
= context_
.get()->channel_
.get();
435 // Channel must have been created first.
436 DCHECK(channel
) << context_
.get()->channel_id_
;
437 return channel
->TakeClientFileDescriptor();
441 //-----------------------------------------------------------------------------