1 // Copyright 2014 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
5 #include "ipc/mojo/ipc_channel_mojo.h"
8 #include "base/bind_helpers.h"
9 #include "base/lazy_instance.h"
10 #include "base/thread_task_runner_handle.h"
11 #include "ipc/ipc_listener.h"
12 #include "ipc/ipc_logging.h"
13 #include "ipc/ipc_message_attachment_set.h"
14 #include "ipc/ipc_message_macros.h"
15 #include "ipc/mojo/client_channel.mojom.h"
16 #include "ipc/mojo/ipc_mojo_bootstrap.h"
17 #include "ipc/mojo/ipc_mojo_handle_attachment.h"
18 #include "third_party/mojo/src/mojo/edk/embedder/embedder.h"
19 #include "third_party/mojo/src/mojo/public/cpp/bindings/binding.h"
21 #if defined(OS_POSIX) && !defined(OS_NACL)
22 #include "ipc/ipc_platform_file_attachment_posix.h"
29 class MojoChannelFactory
: public ChannelFactory
{
31 MojoChannelFactory(scoped_refptr
<base::TaskRunner
> io_runner
,
32 ChannelHandle channel_handle
,
34 AttachmentBroker
* broker
)
35 : io_runner_(io_runner
),
36 channel_handle_(channel_handle
),
40 std::string
GetName() const override
{
41 return channel_handle_
.name
;
44 scoped_ptr
<Channel
> BuildChannel(Listener
* listener
) override
{
45 return ChannelMojo::Create(io_runner_
, channel_handle_
, mode_
, listener
,
50 scoped_refptr
<base::TaskRunner
> io_runner_
;
51 ChannelHandle channel_handle_
;
53 AttachmentBroker
* broker_
;
56 //------------------------------------------------------------------------------
58 class ClientChannelMojo
: public ChannelMojo
, public ClientChannel
{
60 ClientChannelMojo(scoped_refptr
<base::TaskRunner
> io_runner
,
61 const ChannelHandle
& handle
,
63 AttachmentBroker
* broker
);
64 ~ClientChannelMojo() override
;
65 // MojoBootstrap::Delegate implementation
66 void OnPipeAvailable(mojo::embedder::ScopedPlatformHandle handle
) override
;
68 // ClientChannel implementation
70 mojo::ScopedMessagePipeHandle pipe
,
72 const mojo::Callback
<void(int32_t)>& callback
) override
;
75 void BindPipe(mojo::ScopedMessagePipeHandle handle
);
76 void OnConnectionError();
78 mojo::Binding
<ClientChannel
> binding_
;
79 base::WeakPtrFactory
<ClientChannelMojo
> weak_factory_
;
81 DISALLOW_COPY_AND_ASSIGN(ClientChannelMojo
);
84 ClientChannelMojo::ClientChannelMojo(scoped_refptr
<base::TaskRunner
> io_runner
,
85 const ChannelHandle
& handle
,
87 AttachmentBroker
* broker
)
88 : ChannelMojo(io_runner
, handle
, Channel::MODE_CLIENT
, listener
, broker
),
93 ClientChannelMojo::~ClientChannelMojo() {
96 void ClientChannelMojo::OnPipeAvailable(
97 mojo::embedder::ScopedPlatformHandle handle
) {
98 CreateMessagingPipe(handle
.Pass(), base::Bind(&ClientChannelMojo::BindPipe
,
99 weak_factory_
.GetWeakPtr()));
102 void ClientChannelMojo::Init(
103 mojo::ScopedMessagePipeHandle pipe
,
105 const mojo::Callback
<void(int32_t)>& callback
) {
106 InitMessageReader(pipe
.Pass(), static_cast<base::ProcessId
>(peer_pid
));
107 callback
.Run(GetSelfPID());
110 void ClientChannelMojo::BindPipe(mojo::ScopedMessagePipeHandle handle
) {
111 binding_
.Bind(handle
.Pass());
114 void ClientChannelMojo::OnConnectionError() {
115 listener()->OnChannelError();
118 //------------------------------------------------------------------------------
120 class ServerChannelMojo
: public ChannelMojo
{
122 ServerChannelMojo(scoped_refptr
<base::TaskRunner
> io_runner
,
123 const ChannelHandle
& handle
,
125 AttachmentBroker
* broker
);
126 ~ServerChannelMojo() override
;
128 // MojoBootstrap::Delegate implementation
129 void OnPipeAvailable(mojo::embedder::ScopedPlatformHandle handle
) override
;
131 void Close() override
;
134 void InitClientChannel(mojo::ScopedMessagePipeHandle peer_handle
,
135 mojo::ScopedMessagePipeHandle handle
);
136 void OnConnectionError();
138 // ClientChannelClient implementation
139 void ClientChannelWasInitialized(int32_t peer_pid
);
141 mojo::InterfacePtr
<ClientChannel
> client_channel_
;
142 mojo::ScopedMessagePipeHandle message_pipe_
;
143 base::WeakPtrFactory
<ServerChannelMojo
> weak_factory_
;
145 DISALLOW_COPY_AND_ASSIGN(ServerChannelMojo
);
148 ServerChannelMojo::ServerChannelMojo(scoped_refptr
<base::TaskRunner
> io_runner
,
149 const ChannelHandle
& handle
,
151 AttachmentBroker
* broker
)
152 : ChannelMojo(io_runner
, handle
, Channel::MODE_SERVER
, listener
, broker
),
153 weak_factory_(this) {
156 ServerChannelMojo::~ServerChannelMojo() {
160 void ServerChannelMojo::OnPipeAvailable(
161 mojo::embedder::ScopedPlatformHandle handle
) {
162 mojo::ScopedMessagePipeHandle peer
;
163 MojoResult create_result
=
164 mojo::CreateMessagePipe(nullptr, &message_pipe_
, &peer
);
165 if (create_result
!= MOJO_RESULT_OK
) {
166 LOG(WARNING
) << "mojo::CreateMessagePipe failed: " << create_result
;
167 listener()->OnChannelError();
172 base::Bind(&ServerChannelMojo::InitClientChannel
,
173 weak_factory_
.GetWeakPtr(), base::Passed(&peer
)));
176 void ServerChannelMojo::Close() {
177 client_channel_
.reset();
178 message_pipe_
.reset();
179 ChannelMojo::Close();
182 void ServerChannelMojo::InitClientChannel(
183 mojo::ScopedMessagePipeHandle peer_handle
,
184 mojo::ScopedMessagePipeHandle handle
) {
185 client_channel_
.Bind(
186 mojo::InterfacePtrInfo
<ClientChannel
>(handle
.Pass(), 0u));
187 client_channel_
.set_connection_error_handler(base::Bind(
188 &ServerChannelMojo::OnConnectionError
, base::Unretained(this)));
189 client_channel_
->Init(
190 peer_handle
.Pass(), static_cast<int32_t>(GetSelfPID()),
191 base::Bind(&ServerChannelMojo::ClientChannelWasInitialized
,
192 base::Unretained(this)));
195 void ServerChannelMojo::OnConnectionError() {
196 listener()->OnChannelError();
199 void ServerChannelMojo::ClientChannelWasInitialized(int32_t peer_pid
) {
200 InitMessageReader(message_pipe_
.Pass(), peer_pid
);
203 #if defined(OS_POSIX) && !defined(OS_NACL)
205 base::ScopedFD
TakeOrDupFile(internal::PlatformFileAttachment
* attachment
) {
206 return attachment
->Owns() ? base::ScopedFD(attachment
->TakePlatformFile())
207 : base::ScopedFD(dup(attachment
->file()));
214 //------------------------------------------------------------------------------
216 ChannelMojo::ChannelInfoDeleter::ChannelInfoDeleter(
217 scoped_refptr
<base::TaskRunner
> io_runner
)
218 : io_runner(io_runner
) {
221 ChannelMojo::ChannelInfoDeleter::~ChannelInfoDeleter() {
224 void ChannelMojo::ChannelInfoDeleter::operator()(
225 mojo::embedder::ChannelInfo
* ptr
) const {
226 if (base::ThreadTaskRunnerHandle::Get() == io_runner
) {
227 mojo::embedder::DestroyChannelOnIOThread(ptr
);
230 FROM_HERE
, base::Bind(&mojo::embedder::DestroyChannelOnIOThread
, ptr
));
234 //------------------------------------------------------------------------------
237 bool ChannelMojo::ShouldBeUsed() {
238 // TODO(rockot): Investigate performance bottlenecks and hopefully reenable
239 // this at some point. http://crbug.com/500019
244 scoped_ptr
<ChannelMojo
> ChannelMojo::Create(
245 scoped_refptr
<base::TaskRunner
> io_runner
,
246 const ChannelHandle
& channel_handle
,
249 AttachmentBroker
* broker
) {
251 case Channel::MODE_CLIENT
:
252 return make_scoped_ptr(
253 new ClientChannelMojo(io_runner
, channel_handle
, listener
, broker
));
254 case Channel::MODE_SERVER
:
255 return make_scoped_ptr(
256 new ServerChannelMojo(io_runner
, channel_handle
, listener
, broker
));
264 scoped_ptr
<ChannelFactory
> ChannelMojo::CreateServerFactory(
265 scoped_refptr
<base::TaskRunner
> io_runner
,
266 const ChannelHandle
& channel_handle
,
267 AttachmentBroker
* broker
) {
268 return make_scoped_ptr(new MojoChannelFactory(io_runner
, channel_handle
,
269 Channel::MODE_SERVER
, broker
));
273 scoped_ptr
<ChannelFactory
> ChannelMojo::CreateClientFactory(
274 scoped_refptr
<base::TaskRunner
> io_runner
,
275 const ChannelHandle
& channel_handle
,
276 AttachmentBroker
* broker
) {
277 return make_scoped_ptr(new MojoChannelFactory(io_runner
, channel_handle
,
278 Channel::MODE_CLIENT
, broker
));
281 ChannelMojo::ChannelMojo(scoped_refptr
<base::TaskRunner
> io_runner
,
282 const ChannelHandle
& handle
,
285 AttachmentBroker
* broker
)
286 : listener_(listener
),
287 peer_pid_(base::kNullProcessId
),
288 io_runner_(io_runner
),
289 channel_info_(nullptr, ChannelInfoDeleter(nullptr)),
290 waiting_connect_(true),
291 weak_factory_(this) {
292 // Create MojoBootstrap after all members are set as it touches
293 // ChannelMojo from a different thread.
294 bootstrap_
= MojoBootstrap::Create(handle
, mode
, this, broker
);
295 if (io_runner
== base::MessageLoop::current()->task_runner()) {
298 io_runner
->PostTask(FROM_HERE
, base::Bind(&ChannelMojo::InitOnIOThread
,
299 base::Unretained(this)));
303 ChannelMojo::~ChannelMojo() {
307 void ChannelMojo::InitOnIOThread() {
309 new ScopedIPCSupport(base::MessageLoop::current()->task_runner()));
312 void ChannelMojo::CreateMessagingPipe(
313 mojo::embedder::ScopedPlatformHandle handle
,
314 const CreateMessagingPipeCallback
& callback
) {
315 auto return_callback
= base::Bind(&ChannelMojo::OnMessagingPipeCreated
,
316 weak_factory_
.GetWeakPtr(), callback
);
317 if (base::ThreadTaskRunnerHandle::Get() == io_runner_
) {
318 CreateMessagingPipeOnIOThread(
319 handle
.Pass(), base::ThreadTaskRunnerHandle::Get(), return_callback
);
321 io_runner_
->PostTask(
323 base::Bind(&ChannelMojo::CreateMessagingPipeOnIOThread
,
324 base::Passed(&handle
), base::ThreadTaskRunnerHandle::Get(),
330 void ChannelMojo::CreateMessagingPipeOnIOThread(
331 mojo::embedder::ScopedPlatformHandle handle
,
332 scoped_refptr
<base::TaskRunner
> callback_runner
,
333 const CreateMessagingPipeOnIOThreadCallback
& callback
) {
334 mojo::embedder::ChannelInfo
* channel_info
;
335 mojo::ScopedMessagePipeHandle pipe
=
336 mojo::embedder::CreateChannelOnIOThread(handle
.Pass(), &channel_info
);
337 if (base::ThreadTaskRunnerHandle::Get() == callback_runner
) {
338 callback
.Run(pipe
.Pass(), channel_info
);
340 callback_runner
->PostTask(
341 FROM_HERE
, base::Bind(callback
, base::Passed(&pipe
), channel_info
));
345 void ChannelMojo::OnMessagingPipeCreated(
346 const CreateMessagingPipeCallback
& callback
,
347 mojo::ScopedMessagePipeHandle handle
,
348 mojo::embedder::ChannelInfo
* channel_info
) {
349 DCHECK(!channel_info_
.get());
350 channel_info_
= scoped_ptr
<mojo::embedder::ChannelInfo
, ChannelInfoDeleter
>(
351 channel_info
, ChannelInfoDeleter(io_runner_
));
352 callback
.Run(handle
.Pass());
355 bool ChannelMojo::Connect() {
356 DCHECK(!message_reader_
);
357 return bootstrap_
->Connect();
360 void ChannelMojo::Close() {
361 scoped_ptr
<internal::MessagePipeReader
, ReaderDeleter
> to_be_deleted
;
364 // |message_reader_| has to be cleared inside the lock,
365 // but the instance has to be deleted outside.
366 base::AutoLock
l(lock_
);
367 to_be_deleted
= message_reader_
.Pass();
368 // We might Close() before we Connect().
369 waiting_connect_
= false;
372 channel_info_
.reset();
373 ipc_support_
.reset();
374 to_be_deleted
.reset();
377 void ChannelMojo::OnBootstrapError() {
378 listener_
->OnChannelError();
383 // ClosingDeleter calls |CloseWithErrorIfPending| before deleting the
384 // |MessagePipeReader|.
385 struct ClosingDeleter
{
386 typedef base::DefaultDeleter
<internal::MessagePipeReader
> DefaultType
;
388 void operator()(internal::MessagePipeReader
* ptr
) const {
389 ptr
->CloseWithErrorIfPending();
396 void ChannelMojo::InitMessageReader(mojo::ScopedMessagePipeHandle pipe
,
398 scoped_ptr
<internal::MessagePipeReader
, ClosingDeleter
> reader(
399 new internal::MessagePipeReader(pipe
.Pass(), this));
402 base::AutoLock
l(lock_
);
403 for (size_t i
= 0; i
< pending_messages_
.size(); ++i
) {
404 bool sent
= reader
->Send(make_scoped_ptr(pending_messages_
[i
]));
405 pending_messages_
[i
] = nullptr;
407 // OnChannelError() is notified through ClosingDeleter.
408 pending_messages_
.clear();
409 LOG(ERROR
) << "Failed to flush pending messages";
414 // We set |message_reader_| here and won't get any |pending_messages_|
415 // hereafter. Although we might have some if there is an error, we don't
416 // care. They cannot be sent anyway.
417 message_reader_
.reset(reader
.release());
418 pending_messages_
.clear();
419 waiting_connect_
= false;
422 set_peer_pid(peer_pid
);
423 listener_
->OnChannelConnected(static_cast<int32_t>(GetPeerPID()));
425 message_reader_
->ReadMessagesThenWait();
428 void ChannelMojo::OnPipeClosed(internal::MessagePipeReader
* reader
) {
432 void ChannelMojo::OnPipeError(internal::MessagePipeReader
* reader
) {
433 listener_
->OnChannelError();
437 // Warning: Keep the implementation thread-safe.
438 bool ChannelMojo::Send(Message
* message
) {
439 base::AutoLock
l(lock_
);
440 if (!message_reader_
) {
441 pending_messages_
.push_back(message
);
442 // Counts as OK before the connection is established, but it's an
444 return waiting_connect_
;
447 return message_reader_
->Send(make_scoped_ptr(message
));
450 bool ChannelMojo::IsSendThreadSafe() const {
454 base::ProcessId
ChannelMojo::GetPeerPID() const {
458 base::ProcessId
ChannelMojo::GetSelfPID() const {
459 return bootstrap_
->GetSelfPID();
462 void ChannelMojo::OnMessageReceived(Message
& message
) {
463 TRACE_EVENT2("ipc,toplevel", "ChannelMojo::OnMessageReceived",
464 "class", IPC_MESSAGE_ID_CLASS(message
.type()),
465 "line", IPC_MESSAGE_ID_LINE(message
.type()));
466 listener_
->OnMessageReceived(message
);
467 if (message
.dispatch_error())
468 listener_
->OnBadMessageReceived(message
);
471 #if defined(OS_POSIX) && !defined(OS_NACL)
472 int ChannelMojo::GetClientFileDescriptor() const {
473 return bootstrap_
->GetClientFileDescriptor();
476 base::ScopedFD
ChannelMojo::TakeClientFileDescriptor() {
477 return bootstrap_
->TakeClientFileDescriptor();
479 #endif // defined(OS_POSIX) && !defined(OS_NACL)
482 MojoResult
ChannelMojo::ReadFromMessageAttachmentSet(
484 std::vector
<MojoHandle
>* handles
) {
485 // We dup() the handles in IPC::Message to transmit.
486 // IPC::MessageAttachmentSet has intricate lifecycle semantics
487 // of FDs, so just to dup()-and-own them is the safest option.
488 if (message
->HasAttachments()) {
489 MessageAttachmentSet
* set
= message
->attachment_set();
490 for (unsigned i
= 0; i
< set
->size(); ++i
) {
491 scoped_refptr
<MessageAttachment
> attachment
= set
->GetAttachmentAt(i
);
492 switch (attachment
->GetType()) {
493 case MessageAttachment::TYPE_PLATFORM_FILE
:
494 #if defined(OS_POSIX) && !defined(OS_NACL)
496 base::ScopedFD file
=
497 TakeOrDupFile(static_cast<IPC::internal::PlatformFileAttachment
*>(
499 if (!file
.is_valid()) {
500 DPLOG(WARNING
) << "Failed to dup FD to transmit.";
502 return MOJO_RESULT_UNKNOWN
;
505 MojoHandle wrapped_handle
;
506 MojoResult wrap_result
= CreatePlatformHandleWrapper(
507 mojo::embedder::ScopedPlatformHandle(
508 mojo::embedder::PlatformHandle(file
.release())),
510 if (MOJO_RESULT_OK
!= wrap_result
) {
511 LOG(WARNING
) << "Pipe failed to wrap handles. Closing: "
517 handles
->push_back(wrapped_handle
);
521 #endif // defined(OS_POSIX) && !defined(OS_NACL)
523 case MessageAttachment::TYPE_MOJO_HANDLE
: {
524 mojo::ScopedHandle handle
=
525 static_cast<IPC::internal::MojoHandleAttachment
*>(
526 attachment
.get())->TakeHandle();
527 handles
->push_back(handle
.release().value());
529 case MessageAttachment::TYPE_BROKERABLE_ATTACHMENT
:
530 // Brokerable attachments are handled by the AttachmentBroker so
531 // there's no need to do anything here.
540 return MOJO_RESULT_OK
;
544 MojoResult
ChannelMojo::WriteToMessageAttachmentSet(
545 const std::vector
<MojoHandle
>& handle_buffer
,
547 for (size_t i
= 0; i
< handle_buffer
.size(); ++i
) {
548 bool ok
= message
->attachment_set()->AddAttachment(
549 new IPC::internal::MojoHandleAttachment(
550 mojo::MakeScopedHandle(mojo::Handle(handle_buffer
[i
]))));
553 LOG(ERROR
) << "Failed to add new Mojo handle.";
554 return MOJO_RESULT_UNKNOWN
;
558 return MOJO_RESULT_OK
;