Add ICU message format support
[chromium-blink-merge.git] / ipc / mojo / ipc_channel_mojo.cc
blob08c3458e8f7f18c7c9e04dff3e84dd8f2348fde5
1 // Copyright 2014 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
5 #include "ipc/mojo/ipc_channel_mojo.h"
7 #include "base/bind.h"
8 #include "base/bind_helpers.h"
9 #include "base/lazy_instance.h"
10 #include "base/thread_task_runner_handle.h"
11 #include "ipc/ipc_listener.h"
12 #include "ipc/ipc_logging.h"
13 #include "ipc/ipc_message_attachment_set.h"
14 #include "ipc/ipc_message_macros.h"
15 #include "ipc/mojo/client_channel.mojom.h"
16 #include "ipc/mojo/ipc_mojo_bootstrap.h"
17 #include "ipc/mojo/ipc_mojo_handle_attachment.h"
18 #include "third_party/mojo/src/mojo/edk/embedder/embedder.h"
20 #if defined(OS_POSIX) && !defined(OS_NACL)
21 #include "ipc/ipc_platform_file_attachment_posix.h"
22 #endif
24 namespace IPC {
26 namespace {
28 class MojoChannelFactory : public ChannelFactory {
29 public:
30 MojoChannelFactory(scoped_refptr<base::TaskRunner> io_runner,
31 ChannelHandle channel_handle,
32 Channel::Mode mode,
33 AttachmentBroker* broker)
34 : io_runner_(io_runner),
35 channel_handle_(channel_handle),
36 mode_(mode),
37 broker_(broker) {}
39 std::string GetName() const override {
40 return channel_handle_.name;
43 scoped_ptr<Channel> BuildChannel(Listener* listener) override {
44 return ChannelMojo::Create(io_runner_, channel_handle_, mode_, listener,
45 broker_);
48 private:
49 scoped_refptr<base::TaskRunner> io_runner_;
50 ChannelHandle channel_handle_;
51 Channel::Mode mode_;
52 AttachmentBroker* broker_;
55 //------------------------------------------------------------------------------
57 class ClientChannelMojo : public ChannelMojo, public ClientChannel {
58 public:
59 ClientChannelMojo(scoped_refptr<base::TaskRunner> io_runner,
60 const ChannelHandle& handle,
61 Listener* listener,
62 AttachmentBroker* broker);
63 ~ClientChannelMojo() override;
64 // MojoBootstrap::Delegate implementation
65 void OnPipeAvailable(mojo::embedder::ScopedPlatformHandle handle) override;
67 // ClientChannel implementation
68 void Init(
69 mojo::ScopedMessagePipeHandle pipe,
70 int32_t peer_pid,
71 const mojo::Callback<void(int32_t)>& callback) override;
73 private:
74 void BindPipe(mojo::ScopedMessagePipeHandle handle);
75 void OnConnectionError();
77 mojo::Binding<ClientChannel> binding_;
78 base::WeakPtrFactory<ClientChannelMojo> weak_factory_;
80 DISALLOW_COPY_AND_ASSIGN(ClientChannelMojo);
83 ClientChannelMojo::ClientChannelMojo(scoped_refptr<base::TaskRunner> io_runner,
84 const ChannelHandle& handle,
85 Listener* listener,
86 AttachmentBroker* broker)
87 : ChannelMojo(io_runner, handle, Channel::MODE_CLIENT, listener, broker),
88 binding_(this),
89 weak_factory_(this) {
92 ClientChannelMojo::~ClientChannelMojo() {
95 void ClientChannelMojo::OnPipeAvailable(
96 mojo::embedder::ScopedPlatformHandle handle) {
97 CreateMessagingPipe(handle.Pass(), base::Bind(&ClientChannelMojo::BindPipe,
98 weak_factory_.GetWeakPtr()));
101 void ClientChannelMojo::Init(
102 mojo::ScopedMessagePipeHandle pipe,
103 int32_t peer_pid,
104 const mojo::Callback<void(int32_t)>& callback) {
105 InitMessageReader(pipe.Pass(), static_cast<base::ProcessId>(peer_pid));
106 callback.Run(GetSelfPID());
109 void ClientChannelMojo::BindPipe(mojo::ScopedMessagePipeHandle handle) {
110 binding_.Bind(handle.Pass());
113 void ClientChannelMojo::OnConnectionError() {
114 listener()->OnChannelError();
117 //------------------------------------------------------------------------------
119 class ServerChannelMojo : public ChannelMojo {
120 public:
121 ServerChannelMojo(scoped_refptr<base::TaskRunner> io_runner,
122 const ChannelHandle& handle,
123 Listener* listener,
124 AttachmentBroker* broker);
125 ~ServerChannelMojo() override;
127 // MojoBootstrap::Delegate implementation
128 void OnPipeAvailable(mojo::embedder::ScopedPlatformHandle handle) override;
129 // Channel override
130 void Close() override;
132 private:
133 void InitClientChannel(mojo::ScopedMessagePipeHandle peer_handle,
134 mojo::ScopedMessagePipeHandle handle);
135 void OnConnectionError();
137 // ClientChannelClient implementation
138 void ClientChannelWasInitialized(int32_t peer_pid);
140 mojo::InterfacePtr<ClientChannel> client_channel_;
141 mojo::ScopedMessagePipeHandle message_pipe_;
142 base::WeakPtrFactory<ServerChannelMojo> weak_factory_;
144 DISALLOW_COPY_AND_ASSIGN(ServerChannelMojo);
147 ServerChannelMojo::ServerChannelMojo(scoped_refptr<base::TaskRunner> io_runner,
148 const ChannelHandle& handle,
149 Listener* listener,
150 AttachmentBroker* broker)
151 : ChannelMojo(io_runner, handle, Channel::MODE_SERVER, listener, broker),
152 weak_factory_(this) {
155 ServerChannelMojo::~ServerChannelMojo() {
156 Close();
159 void ServerChannelMojo::OnPipeAvailable(
160 mojo::embedder::ScopedPlatformHandle handle) {
161 mojo::ScopedMessagePipeHandle peer;
162 MojoResult create_result =
163 mojo::CreateMessagePipe(nullptr, &message_pipe_, &peer);
164 if (create_result != MOJO_RESULT_OK) {
165 LOG(WARNING) << "mojo::CreateMessagePipe failed: " << create_result;
166 listener()->OnChannelError();
167 return;
169 CreateMessagingPipe(
170 handle.Pass(),
171 base::Bind(&ServerChannelMojo::InitClientChannel,
172 weak_factory_.GetWeakPtr(), base::Passed(&peer)));
175 void ServerChannelMojo::Close() {
176 client_channel_.reset();
177 message_pipe_.reset();
178 ChannelMojo::Close();
181 void ServerChannelMojo::InitClientChannel(
182 mojo::ScopedMessagePipeHandle peer_handle,
183 mojo::ScopedMessagePipeHandle handle) {
184 client_channel_.Bind(
185 mojo::InterfacePtrInfo<ClientChannel>(handle.Pass(), 0u));
186 client_channel_.set_connection_error_handler(base::Bind(
187 &ServerChannelMojo::OnConnectionError, base::Unretained(this)));
188 client_channel_->Init(
189 peer_handle.Pass(), static_cast<int32_t>(GetSelfPID()),
190 base::Bind(&ServerChannelMojo::ClientChannelWasInitialized,
191 base::Unretained(this)));
194 void ServerChannelMojo::OnConnectionError() {
195 listener()->OnChannelError();
198 void ServerChannelMojo::ClientChannelWasInitialized(int32_t peer_pid) {
199 InitMessageReader(message_pipe_.Pass(), peer_pid);
202 #if defined(OS_POSIX) && !defined(OS_NACL)
204 base::ScopedFD TakeOrDupFile(internal::PlatformFileAttachment* attachment) {
205 return attachment->Owns() ? base::ScopedFD(attachment->TakePlatformFile())
206 : base::ScopedFD(dup(attachment->file()));
209 #endif
211 } // namespace
213 //------------------------------------------------------------------------------
215 ChannelMojo::ChannelInfoDeleter::ChannelInfoDeleter(
216 scoped_refptr<base::TaskRunner> io_runner)
217 : io_runner(io_runner) {
220 ChannelMojo::ChannelInfoDeleter::~ChannelInfoDeleter() {
223 void ChannelMojo::ChannelInfoDeleter::operator()(
224 mojo::embedder::ChannelInfo* ptr) const {
225 if (base::ThreadTaskRunnerHandle::Get() == io_runner) {
226 mojo::embedder::DestroyChannelOnIOThread(ptr);
227 } else {
228 io_runner->PostTask(
229 FROM_HERE, base::Bind(&mojo::embedder::DestroyChannelOnIOThread, ptr));
233 //------------------------------------------------------------------------------
235 // static
236 bool ChannelMojo::ShouldBeUsed() {
237 // TODO(rockot): Investigate performance bottlenecks and hopefully reenable
238 // this at some point. http://crbug.com/500019
239 return false;
242 // static
243 scoped_ptr<ChannelMojo> ChannelMojo::Create(
244 scoped_refptr<base::TaskRunner> io_runner,
245 const ChannelHandle& channel_handle,
246 Mode mode,
247 Listener* listener,
248 AttachmentBroker* broker) {
249 switch (mode) {
250 case Channel::MODE_CLIENT:
251 return make_scoped_ptr(
252 new ClientChannelMojo(io_runner, channel_handle, listener, broker));
253 case Channel::MODE_SERVER:
254 return make_scoped_ptr(
255 new ServerChannelMojo(io_runner, channel_handle, listener, broker));
256 default:
257 NOTREACHED();
258 return nullptr;
262 // static
263 scoped_ptr<ChannelFactory> ChannelMojo::CreateServerFactory(
264 scoped_refptr<base::TaskRunner> io_runner,
265 const ChannelHandle& channel_handle,
266 AttachmentBroker* broker) {
267 return make_scoped_ptr(new MojoChannelFactory(io_runner, channel_handle,
268 Channel::MODE_SERVER, broker));
271 // static
272 scoped_ptr<ChannelFactory> ChannelMojo::CreateClientFactory(
273 scoped_refptr<base::TaskRunner> io_runner,
274 const ChannelHandle& channel_handle,
275 AttachmentBroker* broker) {
276 return make_scoped_ptr(new MojoChannelFactory(io_runner, channel_handle,
277 Channel::MODE_CLIENT, broker));
280 ChannelMojo::ChannelMojo(scoped_refptr<base::TaskRunner> io_runner,
281 const ChannelHandle& handle,
282 Mode mode,
283 Listener* listener,
284 AttachmentBroker* broker)
285 : listener_(listener),
286 peer_pid_(base::kNullProcessId),
287 io_runner_(io_runner),
288 channel_info_(nullptr, ChannelInfoDeleter(nullptr)),
289 waiting_connect_(true),
290 weak_factory_(this) {
291 // Create MojoBootstrap after all members are set as it touches
292 // ChannelMojo from a different thread.
293 bootstrap_ = MojoBootstrap::Create(handle, mode, this, broker);
294 if (io_runner == base::MessageLoop::current()->task_runner()) {
295 InitOnIOThread();
296 } else {
297 io_runner->PostTask(FROM_HERE, base::Bind(&ChannelMojo::InitOnIOThread,
298 base::Unretained(this)));
302 ChannelMojo::~ChannelMojo() {
303 Close();
306 void ChannelMojo::InitOnIOThread() {
307 ipc_support_.reset(
308 new ScopedIPCSupport(base::MessageLoop::current()->task_runner()));
311 void ChannelMojo::CreateMessagingPipe(
312 mojo::embedder::ScopedPlatformHandle handle,
313 const CreateMessagingPipeCallback& callback) {
314 auto return_callback = base::Bind(&ChannelMojo::OnMessagingPipeCreated,
315 weak_factory_.GetWeakPtr(), callback);
316 if (base::ThreadTaskRunnerHandle::Get() == io_runner_) {
317 CreateMessagingPipeOnIOThread(
318 handle.Pass(), base::ThreadTaskRunnerHandle::Get(), return_callback);
319 } else {
320 io_runner_->PostTask(
321 FROM_HERE,
322 base::Bind(&ChannelMojo::CreateMessagingPipeOnIOThread,
323 base::Passed(&handle), base::ThreadTaskRunnerHandle::Get(),
324 return_callback));
328 // static
329 void ChannelMojo::CreateMessagingPipeOnIOThread(
330 mojo::embedder::ScopedPlatformHandle handle,
331 scoped_refptr<base::TaskRunner> callback_runner,
332 const CreateMessagingPipeOnIOThreadCallback& callback) {
333 mojo::embedder::ChannelInfo* channel_info;
334 mojo::ScopedMessagePipeHandle pipe =
335 mojo::embedder::CreateChannelOnIOThread(handle.Pass(), &channel_info);
336 if (base::ThreadTaskRunnerHandle::Get() == callback_runner) {
337 callback.Run(pipe.Pass(), channel_info);
338 } else {
339 callback_runner->PostTask(
340 FROM_HERE, base::Bind(callback, base::Passed(&pipe), channel_info));
344 void ChannelMojo::OnMessagingPipeCreated(
345 const CreateMessagingPipeCallback& callback,
346 mojo::ScopedMessagePipeHandle handle,
347 mojo::embedder::ChannelInfo* channel_info) {
348 DCHECK(!channel_info_.get());
349 channel_info_ = scoped_ptr<mojo::embedder::ChannelInfo, ChannelInfoDeleter>(
350 channel_info, ChannelInfoDeleter(io_runner_));
351 callback.Run(handle.Pass());
354 bool ChannelMojo::Connect() {
355 DCHECK(!message_reader_);
356 return bootstrap_->Connect();
359 void ChannelMojo::Close() {
360 scoped_ptr<internal::MessagePipeReader, ReaderDeleter> to_be_deleted;
363 // |message_reader_| has to be cleared inside the lock,
364 // but the instance has to be deleted outside.
365 base::AutoLock l(lock_);
366 to_be_deleted = message_reader_.Pass();
367 // We might Close() before we Connect().
368 waiting_connect_ = false;
371 channel_info_.reset();
372 ipc_support_.reset();
373 to_be_deleted.reset();
376 void ChannelMojo::OnBootstrapError() {
377 listener_->OnChannelError();
380 namespace {
382 // ClosingDeleter calls |CloseWithErrorIfPending| before deleting the
383 // |MessagePipeReader|.
384 struct ClosingDeleter {
385 typedef base::DefaultDeleter<internal::MessagePipeReader> DefaultType;
387 void operator()(internal::MessagePipeReader* ptr) const {
388 ptr->CloseWithErrorIfPending();
389 delete ptr;
393 } // namespace
395 void ChannelMojo::InitMessageReader(mojo::ScopedMessagePipeHandle pipe,
396 int32_t peer_pid) {
397 scoped_ptr<internal::MessagePipeReader, ClosingDeleter> reader(
398 new internal::MessagePipeReader(pipe.Pass(), this));
401 base::AutoLock l(lock_);
402 for (size_t i = 0; i < pending_messages_.size(); ++i) {
403 bool sent = reader->Send(make_scoped_ptr(pending_messages_[i]));
404 pending_messages_[i] = nullptr;
405 if (!sent) {
406 // OnChannelError() is notified through ClosingDeleter.
407 pending_messages_.clear();
408 LOG(ERROR) << "Failed to flush pending messages";
409 return;
413 // We set |message_reader_| here and won't get any |pending_messages_|
414 // hereafter. Although we might have some if there is an error, we don't
415 // care. They cannot be sent anyway.
416 message_reader_.reset(reader.release());
417 pending_messages_.clear();
418 waiting_connect_ = false;
421 set_peer_pid(peer_pid);
422 listener_->OnChannelConnected(static_cast<int32_t>(GetPeerPID()));
423 if (message_reader_)
424 message_reader_->ReadMessagesThenWait();
427 void ChannelMojo::OnPipeClosed(internal::MessagePipeReader* reader) {
428 Close();
431 void ChannelMojo::OnPipeError(internal::MessagePipeReader* reader) {
432 listener_->OnChannelError();
436 // Warning: Keep the implementation thread-safe.
437 bool ChannelMojo::Send(Message* message) {
438 base::AutoLock l(lock_);
439 if (!message_reader_) {
440 pending_messages_.push_back(message);
441 // Counts as OK before the connection is established, but it's an
442 // error otherwise.
443 return waiting_connect_;
446 return message_reader_->Send(make_scoped_ptr(message));
449 bool ChannelMojo::IsSendThreadSafe() const {
450 return true;
453 base::ProcessId ChannelMojo::GetPeerPID() const {
454 return peer_pid_;
457 base::ProcessId ChannelMojo::GetSelfPID() const {
458 return bootstrap_->GetSelfPID();
461 void ChannelMojo::OnMessageReceived(Message& message) {
462 TRACE_EVENT2("ipc,toplevel", "ChannelMojo::OnMessageReceived",
463 "class", IPC_MESSAGE_ID_CLASS(message.type()),
464 "line", IPC_MESSAGE_ID_LINE(message.type()));
465 listener_->OnMessageReceived(message);
466 if (message.dispatch_error())
467 listener_->OnBadMessageReceived(message);
470 #if defined(OS_POSIX) && !defined(OS_NACL)
471 int ChannelMojo::GetClientFileDescriptor() const {
472 return bootstrap_->GetClientFileDescriptor();
475 base::ScopedFD ChannelMojo::TakeClientFileDescriptor() {
476 return bootstrap_->TakeClientFileDescriptor();
478 #endif // defined(OS_POSIX) && !defined(OS_NACL)
480 // static
481 MojoResult ChannelMojo::ReadFromMessageAttachmentSet(
482 Message* message,
483 std::vector<MojoHandle>* handles) {
484 // We dup() the handles in IPC::Message to transmit.
485 // IPC::MessageAttachmentSet has intricate lifecycle semantics
486 // of FDs, so just to dup()-and-own them is the safest option.
487 if (message->HasAttachments()) {
488 MessageAttachmentSet* set = message->attachment_set();
489 for (unsigned i = 0; i < set->size(); ++i) {
490 scoped_refptr<MessageAttachment> attachment = set->GetAttachmentAt(i);
491 switch (attachment->GetType()) {
492 case MessageAttachment::TYPE_PLATFORM_FILE:
493 #if defined(OS_POSIX) && !defined(OS_NACL)
495 base::ScopedFD file =
496 TakeOrDupFile(static_cast<IPC::internal::PlatformFileAttachment*>(
497 attachment.get()));
498 if (!file.is_valid()) {
499 DPLOG(WARNING) << "Failed to dup FD to transmit.";
500 set->CommitAll();
501 return MOJO_RESULT_UNKNOWN;
504 MojoHandle wrapped_handle;
505 MojoResult wrap_result = CreatePlatformHandleWrapper(
506 mojo::embedder::ScopedPlatformHandle(
507 mojo::embedder::PlatformHandle(file.release())),
508 &wrapped_handle);
509 if (MOJO_RESULT_OK != wrap_result) {
510 LOG(WARNING) << "Pipe failed to wrap handles. Closing: "
511 << wrap_result;
512 set->CommitAll();
513 return wrap_result;
516 handles->push_back(wrapped_handle);
518 #else
519 NOTREACHED();
520 #endif // defined(OS_POSIX) && !defined(OS_NACL)
521 break;
522 case MessageAttachment::TYPE_MOJO_HANDLE: {
523 mojo::ScopedHandle handle =
524 static_cast<IPC::internal::MojoHandleAttachment*>(
525 attachment.get())->TakeHandle();
526 handles->push_back(handle.release().value());
527 } break;
528 case MessageAttachment::TYPE_BROKERABLE_ATTACHMENT:
529 // Brokerable attachments are handled by the AttachmentBroker so
530 // there's no need to do anything here.
531 NOTREACHED();
532 break;
536 set->CommitAll();
539 return MOJO_RESULT_OK;
542 // static
543 MojoResult ChannelMojo::WriteToMessageAttachmentSet(
544 const std::vector<MojoHandle>& handle_buffer,
545 Message* message) {
546 for (size_t i = 0; i < handle_buffer.size(); ++i) {
547 bool ok = message->attachment_set()->AddAttachment(
548 new IPC::internal::MojoHandleAttachment(
549 mojo::MakeScopedHandle(mojo::Handle(handle_buffer[i]))));
550 DCHECK(ok);
551 if (!ok) {
552 LOG(ERROR) << "Failed to add new Mojo handle.";
553 return MOJO_RESULT_UNKNOWN;
557 return MOJO_RESULT_OK;
560 } // namespace IPC