1 // Copyright 2014 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
11 #include "base/bind.h"
12 #include "base/file_util.h"
13 #include "base/files/file_path.h"
14 #include "base/files/scoped_file.h"
15 #include "base/files/scoped_temp_dir.h"
16 #include "base/location.h"
17 #include "base/logging.h"
18 #include "base/macros.h"
19 #include "base/message_loop/message_loop.h"
20 #include "base/threading/platform_thread.h" // For |Sleep()|.
21 #include "build/build_config.h" // TODO(vtl): Remove this.
22 #include "mojo/common/test/test_utils.h"
23 #include "mojo/embedder/platform_channel_pair.h"
24 #include "mojo/embedder/platform_shared_buffer.h"
25 #include "mojo/embedder/scoped_platform_handle.h"
26 #include "mojo/embedder/simple_platform_support.h"
27 #include "mojo/system/channel.h"
28 #include "mojo/system/message_pipe.h"
29 #include "mojo/system/message_pipe_dispatcher.h"
30 #include "mojo/system/platform_handle_dispatcher.h"
31 #include "mojo/system/raw_channel.h"
32 #include "mojo/system/shared_buffer_dispatcher.h"
33 #include "mojo/system/test_utils.h"
34 #include "mojo/system/waiter.h"
35 #include "testing/gtest/include/gtest/gtest.h"
// Test fixture for message pipes whose proxy endpoints are attached to a pair
// of |Channel|s (see |channels_| and |platform_handles_| below). Channel work
// is posted to |io_thread_|.
41 class RemoteMessagePipeTest
: public testing::Test
{
// |io_thread_| is constructed with |test::TestIOThread::kAutoStart|.
43 RemoteMessagePipeTest() : io_thread_(test::TestIOThread::kAutoStart
) {}
44 virtual ~RemoteMessagePipeTest() {}
// gtest set-up hook: hands |SetUpOnIOThread()| to |io_thread_| through
// |PostTaskAndWait()|. |base::Unretained(this)| binds the raw fixture pointer.
// NOTE(review): the first argument of |PostTaskAndWait()| and the closing
// brace are not visible in this view.
46 virtual void SetUp() OVERRIDE
{
47 io_thread_
.PostTaskAndWait(
49 base::Bind(&RemoteMessagePipeTest::SetUpOnIOThread
,
50 base::Unretained(this)));
// gtest tear-down hook: hands |TearDownOnIOThread()| to |io_thread_| through
// |PostTaskAndWait()|, mirroring |SetUp()|.
// NOTE(review): the first argument of |PostTaskAndWait()| and the closing
// brace are not visible in this view.
53 virtual void TearDown() OVERRIDE
{
54 io_thread_
.PostTaskAndWait(
56 base::Bind(&RemoteMessagePipeTest::TearDownOnIOThread
,
57 base::Unretained(this)));
61 // This connects MP 0, port 1 and MP 1, port 0 (leaving MP 0, port 0 and MP 1,
62 // port 1 as the user-visible endpoints) to channel 0 and 1, respectively. MP
63 // 0, port 1 and MP 1, port 0 must have |ProxyMessagePipeEndpoint|s.
// The actual work is done by |ConnectMessagePipesOnIOThread()|, which is
// handed to |io_thread_| through |PostTaskAndWait()|.
// NOTE(review): the bound arguments after |base::Unretained(this)| and the
// closing brace are not visible in this view.
64 void ConnectMessagePipes(scoped_refptr
<MessagePipe
> mp0
,
65 scoped_refptr
<MessagePipe
> mp1
) {
66 io_thread_
.PostTaskAndWait(
68 base::Bind(&RemoteMessagePipeTest::ConnectMessagePipesOnIOThread
,
69 base::Unretained(this),
74 // This connects |mp|'s port |channel_index ^ 1| to channel |channel_index|.
75 // It assumes/requires that this is the bootstrap case, i.e., that the
76 // endpoint IDs are both/will both be |Channel::kBootstrapEndpointId|. This
77 // returns *without* waiting for it to finish connecting.
// The bound task is |BootstrapMessagePipeOnIOThread()|.
// NOTE(review): the call that posts the bound task, the remaining bound
// arguments and the closing brace are not visible in this view.
78 void BootstrapMessagePipeNoWait(unsigned channel_index
,
79 scoped_refptr
<MessagePipe
> mp
) {
82 base::Bind(&RemoteMessagePipeTest::BootstrapMessagePipeOnIOThread
,
83 base::Unretained(this),
// Hands |RestoreInitialStateOnIOThread()| to |io_thread_| through
// |PostTaskAndWait()|. Used by the stress test between iterations.
// NOTE(review): the first argument of |PostTaskAndWait()| and the closing
// brace are not visible in this view.
88 void RestoreInitialState() {
89 io_thread_
.PostTaskAndWait(
91 base::Bind(&RemoteMessagePipeTest::RestoreInitialStateOnIOThread
,
92 base::Unretained(this)));
// Accessor: returns a pointer to the fixture-owned |platform_support_|.
95 embedder::PlatformSupport
* platform_support() { return &platform_support_
; }
// Accessor: returns a pointer to the fixture-owned |io_thread_|.
96 test::TestIOThread
* io_thread() { return &io_thread_
; }
// Must run on the I/O thread (enforced by the |CHECK_EQ| below). Creates a
// |PlatformChannelPair| and stores its server handle in |platform_handles_[0]|
// and its client handle in |platform_handles_[1]|.
// NOTE(review): the closing brace is not visible in this view.
99 void SetUpOnIOThread() {
100 CHECK_EQ(base::MessageLoop::current(), io_thread()->message_loop());
102 embedder::PlatformChannelPair channel_pair
;
103 platform_handles_
[0] = channel_pair
.PassServerHandle();
104 platform_handles_
[1] = channel_pair
.PassClientHandle();
// Must run on the I/O thread (enforced by the |CHECK_EQ| below). Calls
// |Shutdown()| on each of the two channels that has been created; a channel
// slot that is still null is left alone.
// NOTE(review): the statements following each |Shutdown()| call and the
// closing braces are not visible in this view.
107 void TearDownOnIOThread() {
108 CHECK_EQ(base::MessageLoop::current(), io_thread()->message_loop());
110 if (channels_
[0].get()) {
111 channels_
[0]->Shutdown();
114 if (channels_
[1].get()) {
115 channels_
[1]->Shutdown();
// Creates and initializes |channels_[channel_index]|.
// Preconditions (all |CHECK|ed): runs on the I/O thread, |channel_index| is 0
// or 1, and that channel slot is still empty.
// NOTE(review): the closing brace is not visible in this view.
120 void CreateAndInitChannel(unsigned channel_index
) {
121 CHECK_EQ(base::MessageLoop::current(), io_thread()->message_loop());
122 CHECK(channel_index
== 0 || channel_index
== 1);
123 CHECK(!channels_
[channel_index
].get());
125 channels_
[channel_index
] = new Channel(&platform_support_
);
// The matching platform handle is handed over via |Pass()| to the
// |RawChannel| that the new channel is initialized with; |Init()| must
// succeed.
126 CHECK(channels_
[channel_index
]->Init(
127 RawChannel::Create(platform_handles_
[channel_index
].Pass())));
// I/O-thread half of |ConnectMessagePipes()| (thread enforced by |CHECK_EQ|).
// Attaches |mp0|'s port 1 to channel 0 and |mp1|'s port 0 to channel 1, then
// runs both endpoints against each other.
// NOTE(review): the closing brace is not visible in this view.
130 void ConnectMessagePipesOnIOThread(scoped_refptr
<MessagePipe
> mp0
,
131 scoped_refptr
<MessagePipe
> mp1
) {
132 CHECK_EQ(base::MessageLoop::current(), io_thread()->message_loop());
// Channels are created lazily, so a second call reuses the existing pair.
134 if (!channels_
[0].get())
135 CreateAndInitChannel(0);
136 if (!channels_
[1].get())
137 CreateAndInitChannel(1);
139 MessageInTransit::EndpointId local_id0
=
140 channels_
[0]->AttachMessagePipeEndpoint(mp0
, 1);
141 MessageInTransit::EndpointId local_id1
=
142 channels_
[1]->AttachMessagePipeEndpoint(mp1
, 0);
// Each channel is told its own local ID first and the other channel's ID
// second.
144 CHECK(channels_
[0]->RunMessagePipeEndpoint(local_id0
, local_id1
));
145 CHECK(channels_
[1]->RunMessagePipeEndpoint(local_id1
, local_id0
));
// I/O-thread half of |BootstrapMessagePipeNoWait()| (thread and index range
// are |CHECK|ed). Creates channel |channel_index|, attaches |mp|'s port
// |channel_index ^ 1| to it, and runs the endpoint with
// |Channel::kBootstrapEndpointId| as both the local and the remote ID.
// NOTE(review): the body of the |kInvalidEndpointId| branch and the closing
// brace are not visible in this view.
148 void BootstrapMessagePipeOnIOThread(unsigned channel_index
,
149 scoped_refptr
<MessagePipe
> mp
) {
150 CHECK_EQ(base::MessageLoop::current(), io_thread()->message_loop());
151 CHECK(channel_index
== 0 || channel_index
== 1);
// Channel 0 carries port 1 and channel 1 carries port 0.
153 unsigned port
= channel_index
^ 1u;
155 CreateAndInitChannel(channel_index
);
156 MessageInTransit::EndpointId endpoint_id
=
157 channels_
[channel_index
]->AttachMessagePipeEndpoint(mp
, port
);
158 if (endpoint_id
== MessageInTransit::kInvalidEndpointId
)
161 CHECK_EQ(endpoint_id
, Channel::kBootstrapEndpointId
);
162 CHECK(channels_
[channel_index
]->RunMessagePipeEndpoint(
163 Channel::kBootstrapEndpointId
, Channel::kBootstrapEndpointId
));
// I/O-thread half of |RestoreInitialState()| (thread enforced by |CHECK_EQ|).
// Starts by running |TearDownOnIOThread()|.
// NOTE(review): whatever follows the tear-down call (presumably a fresh
// set-up -- confirm) and the closing brace are not visible in this view.
166 void RestoreInitialStateOnIOThread() {
167 CHECK_EQ(base::MessageLoop::current(), io_thread()->message_loop());
169 TearDownOnIOThread();
// Fixture state.
// Platform support object handed to each |Channel| on construction.
173 embedder::SimplePlatformSupport platform_support_
;
// Thread on which all channel set-up/tear-down is run.
174 test::TestIOThread io_thread_
;
// Two ends of the platform channel pair: [0] is the server handle, [1] the
// client handle (see |SetUpOnIOThread()|). Each is consumed when the matching
// channel is created.
175 embedder::ScopedPlatformHandle platform_handles_
[2];
// Channels 0 and 1; null until created.
176 scoped_refptr
<Channel
> channels_
[2];
178 DISALLOW_COPY_AND_ASSIGN(RemoteMessagePipeTest
);
// Round-trips one message in each direction between the two user-facing ports
// (MP 0, port 0 and MP 1, port 1), checking the waiter context (123, then 456)
// and the reported signal state after each wait. It then checks that waiting
// on MP 1, port 1 after the peer has been closed fails with
// |MOJO_RESULT_FAILED_PRECONDITION| and reports no satisfied or satisfiable
// signals.
// NOTE(review): several original lines are not visible in this view (the
// |waiter| and |result| declarations, the heads of the write/read calls, the
// close calls and the closing braces).
181 TEST_F(RemoteMessagePipeTest
, Basic
) {
182 static const char kHello
[] = "hello";
183 static const char kWorld
[] = "world!!!1!!!1!";
184 char buffer
[100] = {0};
185 uint32_t buffer_size
= static_cast<uint32_t>(sizeof(buffer
));
187 HandleSignalsState hss
;
188 uint32_t context
= 0;
190 // Connect message pipes. MP 0, port 1 will be attached to channel 0 and
191 // connected to MP 1, port 0, which will be attached to channel 1. This leaves
192 // MP 0, port 0 and MP 1, port 1 as the "user-facing" endpoints.
194 scoped_refptr
<MessagePipe
> mp0(MessagePipe::CreateLocalProxy());
195 scoped_refptr
<MessagePipe
> mp1(MessagePipe::CreateProxyLocal());
196 ConnectMessagePipes(mp0
, mp1
);
198 // Write in one direction: MP 0, port 0 -> ... -> MP 1, port 1.
200 // Prepare to wait on MP 1, port 1. (Add the waiter now. Otherwise, if we do
201 // it later, it might already be readable.)
203 ASSERT_EQ(MOJO_RESULT_OK
,
204 mp1
->AddWaiter(1, &waiter
, MOJO_HANDLE_SIGNAL_READABLE
, 123, NULL
));
206 // Write to MP 0, port 0.
207 EXPECT_EQ(MOJO_RESULT_OK
,
209 UserPointer
<const void>(kHello
),
212 MOJO_WRITE_MESSAGE_FLAG_NONE
));
// The wait must complete with the context value given to |AddWaiter()| above.
215 EXPECT_EQ(MOJO_RESULT_OK
, waiter
.Wait(MOJO_DEADLINE_INDEFINITE
, &context
));
216 EXPECT_EQ(123u, context
);
217 hss
= HandleSignalsState();
218 mp1
->RemoveWaiter(1, &waiter
, &hss
);
219 EXPECT_EQ(MOJO_HANDLE_SIGNAL_READABLE
| MOJO_HANDLE_SIGNAL_WRITABLE
,
220 hss
.satisfied_signals
);
221 EXPECT_EQ(MOJO_HANDLE_SIGNAL_READABLE
| MOJO_HANDLE_SIGNAL_WRITABLE
,
222 hss
.satisfiable_signals
);
224 // Read from MP 1, port 1.
225 EXPECT_EQ(MOJO_RESULT_OK
,
227 UserPointer
<void>(buffer
),
228 MakeUserPointer(&buffer_size
),
231 MOJO_READ_MESSAGE_FLAG_NONE
));
// The expected size is |sizeof(kHello)|, i.e. it counts the terminating NUL,
// so |buffer| can be compared as a C string.
232 EXPECT_EQ(sizeof(kHello
), static_cast<size_t>(buffer_size
));
233 EXPECT_STREQ(kHello
, buffer
);
235 // Write in the other direction: MP 1, port 1 -> ... -> MP 0, port 0.
238 ASSERT_EQ(MOJO_RESULT_OK
,
239 mp0
->AddWaiter(0, &waiter
, MOJO_HANDLE_SIGNAL_READABLE
, 456, NULL
));
241 EXPECT_EQ(MOJO_RESULT_OK
,
243 UserPointer
<const void>(kWorld
),
246 MOJO_WRITE_MESSAGE_FLAG_NONE
));
248 EXPECT_EQ(MOJO_RESULT_OK
, waiter
.Wait(MOJO_DEADLINE_INDEFINITE
, &context
));
249 EXPECT_EQ(456u, context
);
250 hss
= HandleSignalsState();
251 mp0
->RemoveWaiter(0, &waiter
, &hss
);
252 EXPECT_EQ(MOJO_HANDLE_SIGNAL_READABLE
| MOJO_HANDLE_SIGNAL_WRITABLE
,
253 hss
.satisfied_signals
);
254 EXPECT_EQ(MOJO_HANDLE_SIGNAL_READABLE
| MOJO_HANDLE_SIGNAL_WRITABLE
,
255 hss
.satisfiable_signals
);
// Reset the in/out size before reusing |buffer| for the next read.
257 buffer_size
= static_cast<uint32_t>(sizeof(buffer
));
258 EXPECT_EQ(MOJO_RESULT_OK
,
260 UserPointer
<void>(buffer
),
261 MakeUserPointer(&buffer_size
),
264 MOJO_READ_MESSAGE_FLAG_NONE
));
265 EXPECT_EQ(sizeof(kWorld
), static_cast<size_t>(buffer_size
));
266 EXPECT_STREQ(kWorld
, buffer
);
268 // Close MP 0, port 0.
271 // Try to wait for MP 1, port 1 to become readable. This will eventually fail
272 // when it realizes that MP 0, port 0 has been closed. (It may also fail
// immediately, which is why |result| is checked before waiting.)
275 hss
= HandleSignalsState();
277 mp1
->AddWaiter(1, &waiter
, MOJO_HANDLE_SIGNAL_READABLE
, 789, &hss
);
278 if (result
== MOJO_RESULT_OK
) {
279 EXPECT_EQ(MOJO_RESULT_FAILED_PRECONDITION
,
280 waiter
.Wait(MOJO_DEADLINE_INDEFINITE
, &context
));
281 EXPECT_EQ(789u, context
);
282 hss
= HandleSignalsState();
283 mp1
->RemoveWaiter(1, &waiter
, &hss
);
// Either way, no signal may be satisfied or satisfiable any more.
285 EXPECT_EQ(0u, hss
.satisfied_signals
);
286 EXPECT_EQ(0u, hss
.satisfiable_signals
);
// Puts two message pipe pairs (mp0/mp1 and mp2/mp3) on the same channel pair
// and checks that a message written on one pair becomes readable only on that
// pair's far end: reads on the other user-facing ports are expected to return
// |MOJO_RESULT_SHOULD_WAIT|.
// NOTE(review): several original lines are not visible in this view (the
// |waiter| declarations, the heads of the write/read calls, the close calls
// and the closing brace).
292 TEST_F(RemoteMessagePipeTest
, Multiplex
) {
293 static const char kHello
[] = "hello";
294 static const char kWorld
[] = "world!!!1!!!1!";
295 char buffer
[100] = {0};
296 uint32_t buffer_size
= static_cast<uint32_t>(sizeof(buffer
));
298 HandleSignalsState hss
;
299 uint32_t context
= 0;
301 // Connect message pipes as in the |Basic| test.
303 scoped_refptr
<MessagePipe
> mp0(MessagePipe::CreateLocalProxy());
304 scoped_refptr
<MessagePipe
> mp1(MessagePipe::CreateProxyLocal());
305 ConnectMessagePipes(mp0
, mp1
);
307 // Now put another message pipe on the channel.
309 scoped_refptr
<MessagePipe
> mp2(MessagePipe::CreateLocalProxy());
310 scoped_refptr
<MessagePipe
> mp3(MessagePipe::CreateProxyLocal());
311 ConnectMessagePipes(mp2
, mp3
);
313 // Write: MP 2, port 0 -> MP 3, port 1.
316 ASSERT_EQ(MOJO_RESULT_OK
,
317 mp3
->AddWaiter(1, &waiter
, MOJO_HANDLE_SIGNAL_READABLE
, 789, NULL
));
319 EXPECT_EQ(MOJO_RESULT_OK
,
321 UserPointer
<const void>(kHello
),
324 MOJO_WRITE_MESSAGE_FLAG_NONE
));
326 EXPECT_EQ(MOJO_RESULT_OK
, waiter
.Wait(MOJO_DEADLINE_INDEFINITE
, &context
));
327 EXPECT_EQ(789u, context
);
328 hss
= HandleSignalsState();
329 mp3
->RemoveWaiter(1, &waiter
, &hss
);
330 EXPECT_EQ(MOJO_HANDLE_SIGNAL_READABLE
| MOJO_HANDLE_SIGNAL_WRITABLE
,
331 hss
.satisfied_signals
);
332 EXPECT_EQ(MOJO_HANDLE_SIGNAL_READABLE
| MOJO_HANDLE_SIGNAL_WRITABLE
,
333 hss
.satisfiable_signals
);
335 // Make sure there's nothing on MP 0, port 0 or MP 1, port 1 or MP 2, port 0.
// |buffer_size| is reset before every read attempt because it is passed as an
// in/out parameter.
336 buffer_size
= static_cast<uint32_t>(sizeof(buffer
));
337 EXPECT_EQ(MOJO_RESULT_SHOULD_WAIT
,
339 UserPointer
<void>(buffer
),
340 MakeUserPointer(&buffer_size
),
343 MOJO_READ_MESSAGE_FLAG_NONE
));
344 buffer_size
= static_cast<uint32_t>(sizeof(buffer
));
345 EXPECT_EQ(MOJO_RESULT_SHOULD_WAIT
,
347 UserPointer
<void>(buffer
),
348 MakeUserPointer(&buffer_size
),
351 MOJO_READ_MESSAGE_FLAG_NONE
));
352 buffer_size
= static_cast<uint32_t>(sizeof(buffer
));
353 EXPECT_EQ(MOJO_RESULT_SHOULD_WAIT
,
355 UserPointer
<void>(buffer
),
356 MakeUserPointer(&buffer_size
),
359 MOJO_READ_MESSAGE_FLAG_NONE
));
361 // Read from MP 3, port 1.
362 buffer_size
= static_cast<uint32_t>(sizeof(buffer
));
363 EXPECT_EQ(MOJO_RESULT_OK
,
365 UserPointer
<void>(buffer
),
366 MakeUserPointer(&buffer_size
),
369 MOJO_READ_MESSAGE_FLAG_NONE
));
370 EXPECT_EQ(sizeof(kHello
), static_cast<size_t>(buffer_size
));
371 EXPECT_STREQ(kHello
, buffer
);
373 // Write: MP 0, port 0 -> MP 1, port 1 again.
376 ASSERT_EQ(MOJO_RESULT_OK
,
377 mp1
->AddWaiter(1, &waiter
, MOJO_HANDLE_SIGNAL_READABLE
, 123, NULL
));
379 EXPECT_EQ(MOJO_RESULT_OK
,
381 UserPointer
<const void>(kWorld
),
384 MOJO_WRITE_MESSAGE_FLAG_NONE
));
386 EXPECT_EQ(MOJO_RESULT_OK
, waiter
.Wait(MOJO_DEADLINE_INDEFINITE
, &context
));
387 EXPECT_EQ(123u, context
);
388 hss
= HandleSignalsState();
389 mp1
->RemoveWaiter(1, &waiter
, &hss
);
390 EXPECT_EQ(MOJO_HANDLE_SIGNAL_READABLE
| MOJO_HANDLE_SIGNAL_WRITABLE
,
391 hss
.satisfied_signals
);
392 EXPECT_EQ(MOJO_HANDLE_SIGNAL_READABLE
| MOJO_HANDLE_SIGNAL_WRITABLE
,
393 hss
.satisfiable_signals
);
395 // Make sure there's nothing on the other ports.
396 buffer_size
= static_cast<uint32_t>(sizeof(buffer
));
397 EXPECT_EQ(MOJO_RESULT_SHOULD_WAIT
,
399 UserPointer
<void>(buffer
),
400 MakeUserPointer(&buffer_size
),
403 MOJO_READ_MESSAGE_FLAG_NONE
));
404 buffer_size
= static_cast<uint32_t>(sizeof(buffer
));
405 EXPECT_EQ(MOJO_RESULT_SHOULD_WAIT
,
407 UserPointer
<void>(buffer
),
408 MakeUserPointer(&buffer_size
),
411 MOJO_READ_MESSAGE_FLAG_NONE
));
412 buffer_size
= static_cast<uint32_t>(sizeof(buffer
));
413 EXPECT_EQ(MOJO_RESULT_SHOULD_WAIT
,
415 UserPointer
<void>(buffer
),
416 MakeUserPointer(&buffer_size
),
419 MOJO_READ_MESSAGE_FLAG_NONE
));
// Finally read the message itself; it must be |kWorld|.
421 buffer_size
= static_cast<uint32_t>(sizeof(buffer
));
422 EXPECT_EQ(MOJO_RESULT_OK
,
424 UserPointer
<void>(buffer
),
425 MakeUserPointer(&buffer_size
),
428 MOJO_READ_MESSAGE_FLAG_NONE
));
429 EXPECT_EQ(sizeof(kWorld
), static_cast<size_t>(buffer_size
));
430 EXPECT_STREQ(kWorld
, buffer
);
// Writes a message on MP 0, port 0 and bootstraps that pipe on channel 0
// before the peer pipe (|mp1|, bootstrapped on channel 1) even exists. Per the
// comments below, MP 0, port 0 is also closed before channel 1 is connected.
// The message must still become readable on MP 1, port 1.
// NOTE(review): several original lines are not visible in this view (the
// |waiter| declaration, the heads of the write/read calls, the close calls and
// the closing brace).
438 TEST_F(RemoteMessagePipeTest
, CloseBeforeConnect
) {
439 static const char kHello
[] = "hello";
440 char buffer
[100] = {0};
441 uint32_t buffer_size
= static_cast<uint32_t>(sizeof(buffer
));
443 HandleSignalsState hss
;
444 uint32_t context
= 0;
446 // Connect message pipes. MP 0, port 1 will be attached to channel 0 and
447 // connected to MP 1, port 0, which will be attached to channel 1. This leaves
448 // MP 0, port 0 and MP 1, port 1 as the "user-facing" endpoints.
450 scoped_refptr
<MessagePipe
> mp0(MessagePipe::CreateLocalProxy());
452 // Write to MP 0, port 0.
453 EXPECT_EQ(MOJO_RESULT_OK
,
455 UserPointer
<const void>(kHello
),
458 MOJO_WRITE_MESSAGE_FLAG_NONE
));
460 BootstrapMessagePipeNoWait(0, mp0
);
462 // Close MP 0, port 0 before channel 1 is even connected.
465 scoped_refptr
<MessagePipe
> mp1(MessagePipe::CreateProxyLocal());
467 // Prepare to wait on MP 1, port 1. (Add the waiter now. Otherwise, if we do
468 // it later, it might already be readable.)
470 ASSERT_EQ(MOJO_RESULT_OK
,
471 mp1
->AddWaiter(1, &waiter
, MOJO_HANDLE_SIGNAL_READABLE
, 123, NULL
));
473 BootstrapMessagePipeNoWait(1, mp1
);
476 EXPECT_EQ(MOJO_RESULT_OK
, waiter
.Wait(MOJO_DEADLINE_INDEFINITE
, &context
));
477 EXPECT_EQ(123u, context
);
478 hss
= HandleSignalsState();
479 // Note: MP 1, port 1 should definitely be readable, but it may or may
480 // not appear as writable (there's a race, and it may not have noticed that
481 // the other side was closed yet -- e.g., inserting a sleep here would make it
482 // much more likely to notice that it's no longer writable).
483 mp1
->RemoveWaiter(1, &waiter
, &hss
);
// Because of that race only the READABLE bit is checked here, not the exact
// signal masks.
484 EXPECT_TRUE((hss
.satisfied_signals
& MOJO_HANDLE_SIGNAL_READABLE
));
485 EXPECT_TRUE((hss
.satisfiable_signals
& MOJO_HANDLE_SIGNAL_READABLE
));
487 // Read from MP 1, port 1.
488 EXPECT_EQ(MOJO_RESULT_OK
,
490 UserPointer
<void>(buffer
),
491 MakeUserPointer(&buffer_size
),
494 MOJO_READ_MESSAGE_FLAG_NONE
));
495 EXPECT_EQ(sizeof(kHello
), static_cast<size_t>(buffer_size
));
496 EXPECT_STREQ(kHello
, buffer
);
// Sends a |MessagePipeDispatcher| (wrapping port 0 of a local pipe,
// |local_mp|) across the remote message pipe together with a "hello" payload.
// The dispatcher read on the far side must be of type |kTypeMessagePipe|, and
// messages must flow in both directions between it and |local_mp|'s port 1.
// NOTE(review): several original lines are not visible in this view (the
// |waiter| declarations, the heads of several calls, scope braces, the close
// calls and the closing brace).
502 TEST_F(RemoteMessagePipeTest
, HandlePassing
) {
503 static const char kHello
[] = "hello";
505 HandleSignalsState hss
;
506 uint32_t context
= 0;
508 scoped_refptr
<MessagePipe
> mp0(MessagePipe::CreateLocalProxy());
509 scoped_refptr
<MessagePipe
> mp1(MessagePipe::CreateProxyLocal());
510 ConnectMessagePipes(mp0
, mp1
);
512 // We'll try to pass this dispatcher.
513 scoped_refptr
<MessagePipeDispatcher
> dispatcher(
514 new MessagePipeDispatcher(MessagePipeDispatcher::kDefaultCreateOptions
));
515 scoped_refptr
<MessagePipe
> local_mp(MessagePipe::CreateLocalLocal());
516 dispatcher
->Init(local_mp
, 0);
518 // Prepare to wait on MP 1, port 1. (Add the waiter now. Otherwise, if we do
519 // it later, it might already be readable.)
521 ASSERT_EQ(MOJO_RESULT_OK
,
522 mp1
->AddWaiter(1, &waiter
, MOJO_HANDLE_SIGNAL_READABLE
, 123, NULL
));
524 // Write to MP 0, port 0.
526 DispatcherTransport
transport(
527 test::DispatcherTryStartTransport(dispatcher
.get()));
528 EXPECT_TRUE(transport
.is_valid());
530 std::vector
<DispatcherTransport
> transports
;
531 transports
.push_back(transport
);
532 EXPECT_EQ(MOJO_RESULT_OK
,
534 UserPointer
<const void>(kHello
),
537 MOJO_WRITE_MESSAGE_FLAG_NONE
));
540 // |dispatcher| should have been closed. This is |DCHECK()|ed when the
541 // |dispatcher| is destroyed.
542 EXPECT_TRUE(dispatcher
->HasOneRef());
547 EXPECT_EQ(MOJO_RESULT_OK
, waiter
.Wait(MOJO_DEADLINE_INDEFINITE
, &context
));
548 EXPECT_EQ(123u, context
);
549 hss
= HandleSignalsState();
550 mp1
->RemoveWaiter(1, &waiter
, &hss
);
551 EXPECT_EQ(MOJO_HANDLE_SIGNAL_READABLE
| MOJO_HANDLE_SIGNAL_WRITABLE
,
552 hss
.satisfied_signals
);
553 EXPECT_EQ(MOJO_HANDLE_SIGNAL_READABLE
| MOJO_HANDLE_SIGNAL_WRITABLE
,
554 hss
.satisfiable_signals
);
556 // Read from MP 1, port 1.
557 char read_buffer
[100] = {0};
558 uint32_t read_buffer_size
= static_cast<uint32_t>(sizeof(read_buffer
));
559 DispatcherVector read_dispatchers
;
560 uint32_t read_num_dispatchers
= 10; // Maximum to get.
561 EXPECT_EQ(MOJO_RESULT_OK
,
563 UserPointer
<void>(read_buffer
),
564 MakeUserPointer(&read_buffer_size
),
566 &read_num_dispatchers
,
567 MOJO_READ_MESSAGE_FLAG_NONE
));
568 EXPECT_EQ(sizeof(kHello
), static_cast<size_t>(read_buffer_size
));
569 EXPECT_STREQ(kHello
, read_buffer
);
// Exactly one dispatcher must have arrived; |ASSERT_TRUE| guards the
// dereferences that follow.
570 EXPECT_EQ(1u, read_dispatchers
.size());
571 EXPECT_EQ(1u, read_num_dispatchers
);
572 ASSERT_TRUE(read_dispatchers
[0].get());
573 EXPECT_TRUE(read_dispatchers
[0]->HasOneRef());
// The downcast below relies on the type check on the preceding line.
575 EXPECT_EQ(Dispatcher::kTypeMessagePipe
, read_dispatchers
[0]->GetType());
576 dispatcher
= static_cast<MessagePipeDispatcher
*>(read_dispatchers
[0].get());
578 // Add the waiter now, before it becomes readable to avoid a race.
582 dispatcher
->AddWaiter(&waiter
, MOJO_HANDLE_SIGNAL_READABLE
, 456, NULL
));
584 // Write to "local_mp", port 1.
585 EXPECT_EQ(MOJO_RESULT_OK
,
586 local_mp
->WriteMessage(1,
587 UserPointer
<const void>(kHello
),
590 MOJO_WRITE_MESSAGE_FLAG_NONE
));
592 // TODO(vtl): FIXME -- We (racily) crash if I close |dispatcher| immediately
593 // here. (We don't crash if I sleep and then close.)
595 // Wait for the dispatcher to become readable.
596 EXPECT_EQ(MOJO_RESULT_OK
, waiter
.Wait(MOJO_DEADLINE_INDEFINITE
, &context
));
597 EXPECT_EQ(456u, context
);
598 hss
= HandleSignalsState();
599 dispatcher
->RemoveWaiter(&waiter
, &hss
);
600 EXPECT_EQ(MOJO_HANDLE_SIGNAL_READABLE
| MOJO_HANDLE_SIGNAL_WRITABLE
,
601 hss
.satisfied_signals
);
602 EXPECT_EQ(MOJO_HANDLE_SIGNAL_READABLE
| MOJO_HANDLE_SIGNAL_WRITABLE
,
603 hss
.satisfiable_signals
);
605 // Read from the dispatcher.
// The buffer is zeroed first so a stale "hello" from the previous read cannot
// satisfy the string comparison.
606 memset(read_buffer
, 0, sizeof(read_buffer
));
607 read_buffer_size
= static_cast<uint32_t>(sizeof(read_buffer
));
608 EXPECT_EQ(MOJO_RESULT_OK
,
609 dispatcher
->ReadMessage(UserPointer
<void>(read_buffer
),
610 MakeUserPointer(&read_buffer_size
),
613 MOJO_READ_MESSAGE_FLAG_NONE
));
614 EXPECT_EQ(sizeof(kHello
), static_cast<size_t>(read_buffer_size
));
615 EXPECT_STREQ(kHello
, read_buffer
);
617 // Prepare to wait on "local_mp", port 1.
621 local_mp
->AddWaiter(1, &waiter
, MOJO_HANDLE_SIGNAL_READABLE
, 789, NULL
));
623 // Write to the dispatcher.
624 EXPECT_EQ(MOJO_RESULT_OK
,
625 dispatcher
->WriteMessage(UserPointer
<const void>(kHello
),
628 MOJO_WRITE_MESSAGE_FLAG_NONE
));
631 EXPECT_EQ(MOJO_RESULT_OK
, waiter
.Wait(MOJO_DEADLINE_INDEFINITE
, &context
));
632 EXPECT_EQ(789u, context
);
633 hss
= HandleSignalsState();
634 local_mp
->RemoveWaiter(1, &waiter
, &hss
);
635 EXPECT_EQ(MOJO_HANDLE_SIGNAL_READABLE
| MOJO_HANDLE_SIGNAL_WRITABLE
,
636 hss
.satisfied_signals
);
637 EXPECT_EQ(MOJO_HANDLE_SIGNAL_READABLE
| MOJO_HANDLE_SIGNAL_WRITABLE
,
638 hss
.satisfiable_signals
);
640 // Read from "local_mp", port 1.
641 memset(read_buffer
, 0, sizeof(read_buffer
));
642 read_buffer_size
= static_cast<uint32_t>(sizeof(read_buffer
));
643 EXPECT_EQ(MOJO_RESULT_OK
,
644 local_mp
->ReadMessage(1,
645 UserPointer
<void>(read_buffer
),
646 MakeUserPointer(&read_buffer_size
),
649 MOJO_READ_MESSAGE_FLAG_NONE
));
650 EXPECT_EQ(sizeof(kHello
), static_cast<size_t>(read_buffer_size
));
651 EXPECT_STREQ(kHello
, read_buffer
);
653 // TODO(vtl): Also test that messages queued up before the handle was sent are
654 // delivered properly.
656 // Close everything that belongs to us.
659 EXPECT_EQ(MOJO_RESULT_OK
, dispatcher
->Close());
660 // Note that |local_mp|'s port 0 belongs to |dispatcher|, which was closed.
// Test-name selection: on POSIX the test runs under its real name; otherwise
// it gets gtest's DISABLED_ prefix.
// NOTE(review): the |#else| and |#endif| lines are not visible in this view.
664 #if defined(OS_POSIX)
665 #define MAYBE_SharedBufferPassing SharedBufferPassing
667 // Not yet implemented (on Windows).
668 #define MAYBE_SharedBufferPassing DISABLED_SharedBufferPassing
// Sends a |SharedBufferDispatcher| across the remote message pipe together
// with a "hello" payload. The dispatcher read on the far side must be of type
// |kTypeSharedBuffer|, and a mapping made from it must see bytes written
// through the sender's mapping (and vice versa).
// NOTE(review): several original lines are not visible in this view (the
// |waiter| declaration, the heads of several calls, the remaining arguments of
// |SharedBufferDispatcher::Create()|, the close calls and the closing brace).
670 TEST_F(RemoteMessagePipeTest
, MAYBE_SharedBufferPassing
) {
671 static const char kHello
[] = "hello";
673 HandleSignalsState hss
;
674 uint32_t context
= 0;
676 scoped_refptr
<MessagePipe
> mp0(MessagePipe::CreateLocalProxy());
677 scoped_refptr
<MessagePipe
> mp1(MessagePipe::CreateProxyLocal());
678 ConnectMessagePipes(mp0
, mp1
);
680 // We'll try to pass this dispatcher.
681 scoped_refptr
<SharedBufferDispatcher
> dispatcher
;
682 EXPECT_EQ(MOJO_RESULT_OK
,
683 SharedBufferDispatcher::Create(
685 SharedBufferDispatcher::kDefaultCreateOptions
,
688 ASSERT_TRUE(dispatcher
.get());
// Map the first 100 bytes and write marker bytes at the start, middle and end
// of the mapping.
691 scoped_ptr
<embedder::PlatformSharedBufferMapping
> mapping0
;
694 dispatcher
->MapBuffer(0, 100, MOJO_MAP_BUFFER_FLAG_NONE
, &mapping0
));
695 ASSERT_TRUE(mapping0
);
696 ASSERT_TRUE(mapping0
->GetBase());
697 ASSERT_EQ(100u, mapping0
->GetLength());
698 static_cast<char*>(mapping0
->GetBase())[0] = 'A';
699 static_cast<char*>(mapping0
->GetBase())[50] = 'B';
700 static_cast<char*>(mapping0
->GetBase())[99] = 'C';
702 // Prepare to wait on MP 1, port 1. (Add the waiter now. Otherwise, if we do
703 // it later, it might already be readable.)
705 ASSERT_EQ(MOJO_RESULT_OK
,
706 mp1
->AddWaiter(1, &waiter
, MOJO_HANDLE_SIGNAL_READABLE
, 123, NULL
));
708 // Write to MP 0, port 0.
710 DispatcherTransport
transport(
711 test::DispatcherTryStartTransport(dispatcher
.get()));
712 EXPECT_TRUE(transport
.is_valid());
714 std::vector
<DispatcherTransport
> transports
;
715 transports
.push_back(transport
);
716 EXPECT_EQ(MOJO_RESULT_OK
,
718 UserPointer
<const void>(kHello
),
721 MOJO_WRITE_MESSAGE_FLAG_NONE
));
724 // |dispatcher| should have been closed. This is |DCHECK()|ed when the
725 // |dispatcher| is destroyed.
726 EXPECT_TRUE(dispatcher
->HasOneRef());
731 EXPECT_EQ(MOJO_RESULT_OK
, waiter
.Wait(MOJO_DEADLINE_INDEFINITE
, &context
));
732 EXPECT_EQ(123u, context
);
733 hss
= HandleSignalsState();
734 mp1
->RemoveWaiter(1, &waiter
, &hss
);
735 EXPECT_EQ(MOJO_HANDLE_SIGNAL_READABLE
| MOJO_HANDLE_SIGNAL_WRITABLE
,
736 hss
.satisfied_signals
);
737 EXPECT_EQ(MOJO_HANDLE_SIGNAL_READABLE
| MOJO_HANDLE_SIGNAL_WRITABLE
,
738 hss
.satisfiable_signals
);
740 // Read from MP 1, port 1.
741 char read_buffer
[100] = {0};
742 uint32_t read_buffer_size
= static_cast<uint32_t>(sizeof(read_buffer
));
743 DispatcherVector read_dispatchers
;
744 uint32_t read_num_dispatchers
= 10; // Maximum to get.
745 EXPECT_EQ(MOJO_RESULT_OK
,
747 UserPointer
<void>(read_buffer
),
748 MakeUserPointer(&read_buffer_size
),
750 &read_num_dispatchers
,
751 MOJO_READ_MESSAGE_FLAG_NONE
));
752 EXPECT_EQ(sizeof(kHello
), static_cast<size_t>(read_buffer_size
));
753 EXPECT_STREQ(kHello
, read_buffer
);
754 EXPECT_EQ(1u, read_dispatchers
.size());
755 EXPECT_EQ(1u, read_num_dispatchers
);
756 ASSERT_TRUE(read_dispatchers
[0].get());
757 EXPECT_TRUE(read_dispatchers
[0]->HasOneRef());
// The downcast below relies on the type check on the preceding line.
759 EXPECT_EQ(Dispatcher::kTypeSharedBuffer
, read_dispatchers
[0]->GetType());
760 dispatcher
= static_cast<SharedBufferDispatcher
*>(read_dispatchers
[0].get());
762 // Make another mapping.
763 scoped_ptr
<embedder::PlatformSharedBufferMapping
> mapping1
;
766 dispatcher
->MapBuffer(0, 100, MOJO_MAP_BUFFER_FLAG_NONE
, &mapping1
));
767 ASSERT_TRUE(mapping1
);
768 ASSERT_TRUE(mapping1
->GetBase());
769 ASSERT_EQ(100u, mapping1
->GetLength());
// The second mapping has a different base address but must show the marker
// bytes written through the first one.
770 EXPECT_NE(mapping1
->GetBase(), mapping0
->GetBase());
771 EXPECT_EQ('A', static_cast<char*>(mapping1
->GetBase())[0]);
772 EXPECT_EQ('B', static_cast<char*>(mapping1
->GetBase())[50]);
773 EXPECT_EQ('C', static_cast<char*>(mapping1
->GetBase())[99]);
775 // Write stuff either way.
776 static_cast<char*>(mapping1
->GetBase())[1] = 'x';
777 EXPECT_EQ('x', static_cast<char*>(mapping0
->GetBase())[1]);
778 static_cast<char*>(mapping0
->GetBase())[2] = 'y';
779 EXPECT_EQ('y', static_cast<char*>(mapping1
->GetBase())[2]);
781 // Kill the first mapping; the second should still be valid.
783 EXPECT_EQ('A', static_cast<char*>(mapping1
->GetBase())[0]);
785 // Close everything that belongs to us.
788 EXPECT_EQ(MOJO_RESULT_OK
, dispatcher
->Close());
790 // The second mapping should still be good.
791 EXPECT_EQ('x', static_cast<char*>(mapping1
->GetBase())[1]);
// Test-name selection: on POSIX the test runs under its real name; otherwise
// it gets gtest's DISABLED_ prefix.
// NOTE(review): the |#else| and |#endif| lines are not visible in this view.
794 #if defined(OS_POSIX)
795 #define MAYBE_PlatformHandlePassing PlatformHandlePassing
797 // Not yet implemented (on Windows).
798 #define MAYBE_PlatformHandlePassing DISABLED_PlatformHandlePassing
// Writes |kHello| into a temporary file, wraps the file in a
// |PlatformHandleDispatcher| and sends that dispatcher across the remote
// message pipe together with the |kWorld| payload. On the far side the
// received dispatcher must be of type |kTypePlatformHandle|; its platform
// handle is turned back into a FILE (opened "rb") from which |kHello| is
// expected to be read.
// NOTE(review): several original lines are not visible in this view (the
// |fp| and |waiter| declarations, the heads of several calls, the statements
// between the |FILEFromPlatformHandle()| call and the |fread()|, the close
// calls and the closing brace).
800 TEST_F(RemoteMessagePipeTest
, MAYBE_PlatformHandlePassing
) {
801 base::ScopedTempDir temp_dir
;
802 ASSERT_TRUE(temp_dir
.CreateUniqueTempDir());
804 static const char kHello
[] = "hello";
805 static const char kWorld
[] = "world";
807 uint32_t context
= 0;
808 HandleSignalsState hss
;
810 scoped_refptr
<MessagePipe
> mp0(MessagePipe::CreateLocalProxy());
811 scoped_refptr
<MessagePipe
> mp1(MessagePipe::CreateProxyLocal());
812 ConnectMessagePipes(mp0
, mp1
);
// The path of the temporary file is not needed, hence the name |unused|.
814 base::FilePath unused
;
816 CreateAndOpenTemporaryFileInDir(temp_dir
.path(), &unused
));
817 EXPECT_EQ(sizeof(kHello
), fwrite(kHello
, 1, sizeof(kHello
), fp
.get()));
818 // We'll try to pass this dispatcher, which will cause a |PlatformHandle| to
// be passed. NOTE(review): the end of the sentence above is cut off in this
// view; "be passed" is the presumed wording -- confirm.
820 scoped_refptr
<PlatformHandleDispatcher
> dispatcher(
821 new PlatformHandleDispatcher(
822 mojo::test::PlatformHandleFromFILE(fp
.Pass())));
824 // Prepare to wait on MP 1, port 1. (Add the waiter now. Otherwise, if we do
825 // it later, it might already be readable.)
827 ASSERT_EQ(MOJO_RESULT_OK
,
828 mp1
->AddWaiter(1, &waiter
, MOJO_HANDLE_SIGNAL_READABLE
, 123, NULL
));
830 // Write to MP 0, port 0.
832 DispatcherTransport
transport(
833 test::DispatcherTryStartTransport(dispatcher
.get()));
834 EXPECT_TRUE(transport
.is_valid());
836 std::vector
<DispatcherTransport
> transports
;
837 transports
.push_back(transport
);
838 EXPECT_EQ(MOJO_RESULT_OK
,
840 UserPointer
<const void>(kWorld
),
843 MOJO_WRITE_MESSAGE_FLAG_NONE
));
846 // |dispatcher| should have been closed. This is |DCHECK()|ed when the
847 // |dispatcher| is destroyed.
848 EXPECT_TRUE(dispatcher
->HasOneRef());
853 EXPECT_EQ(MOJO_RESULT_OK
, waiter
.Wait(MOJO_DEADLINE_INDEFINITE
, &context
));
854 EXPECT_EQ(123u, context
);
855 hss
= HandleSignalsState();
856 mp1
->RemoveWaiter(1, &waiter
, &hss
);
857 EXPECT_EQ(MOJO_HANDLE_SIGNAL_READABLE
| MOJO_HANDLE_SIGNAL_WRITABLE
,
858 hss
.satisfied_signals
);
859 EXPECT_EQ(MOJO_HANDLE_SIGNAL_READABLE
| MOJO_HANDLE_SIGNAL_WRITABLE
,
860 hss
.satisfiable_signals
);
862 // Read from MP 1, port 1.
863 char read_buffer
[100] = {0};
864 uint32_t read_buffer_size
= static_cast<uint32_t>(sizeof(read_buffer
));
865 DispatcherVector read_dispatchers
;
866 uint32_t read_num_dispatchers
= 10; // Maximum to get.
867 EXPECT_EQ(MOJO_RESULT_OK
,
869 UserPointer
<void>(read_buffer
),
870 MakeUserPointer(&read_buffer_size
),
872 &read_num_dispatchers
,
873 MOJO_READ_MESSAGE_FLAG_NONE
));
874 EXPECT_EQ(sizeof(kWorld
), static_cast<size_t>(read_buffer_size
));
875 EXPECT_STREQ(kWorld
, read_buffer
);
876 EXPECT_EQ(1u, read_dispatchers
.size());
877 EXPECT_EQ(1u, read_num_dispatchers
);
878 ASSERT_TRUE(read_dispatchers
[0].get());
879 EXPECT_TRUE(read_dispatchers
[0]->HasOneRef());
// The downcast below relies on the type check on the preceding line.
881 EXPECT_EQ(Dispatcher::kTypePlatformHandle
, read_dispatchers
[0]->GetType());
883 static_cast<PlatformHandleDispatcher
*>(read_dispatchers
[0].get());
// Take the platform handle out of the received dispatcher and turn it back
// into a FILE; |h| must be invalid after it has been passed on.
885 embedder::ScopedPlatformHandle h
= dispatcher
->PassPlatformHandle().Pass();
886 EXPECT_TRUE(h
.is_valid());
888 fp
= mojo::test::FILEFromPlatformHandle(h
.Pass(), "rb").Pass();
889 EXPECT_FALSE(h
.is_valid());
// The buffer is zeroed first so the earlier |kWorld| contents cannot leak
// into the comparison; the file is expected to hold |kHello|.
893 memset(read_buffer
, 0, sizeof(read_buffer
));
894 EXPECT_EQ(sizeof(kHello
),
895 fread(read_buffer
, 1, sizeof(read_buffer
), fp
.get()));
896 EXPECT_STREQ(kHello
, read_buffer
);
898 // Close everything that belongs to us.
901 EXPECT_EQ(MOJO_RESULT_OK
, dispatcher
->Close());
904 // Test racing closes (on each end).
905 // Note: A flaky failure would almost certainly indicate a problem in the code
906 // itself (not in the test). Also, any logged warnings/errors would also
907 // probably be indicative of bugs.
// Each of the 256 iterations bootstraps a fresh |mp0| on channel 0 and a fresh
// |mp1| on channel 1 without waiting for the connection. Sleeps of |delay|
// (5 ms) are both posted to the I/O thread and run on the test thread, and
// the fixture is reset with |RestoreInitialState()|.
// NOTE(review): the close calls being raced, any conditions around the sleeps
// and the closing braces are not visible in this view; the reset is presumably
// done once per iteration -- confirm.
908 TEST_F(RemoteMessagePipeTest
, RacingClosesStress
) {
909 base::TimeDelta delay
= base::TimeDelta::FromMilliseconds(5);
911 for (unsigned i
= 0; i
< 256; i
++) {
912 DVLOG(2) << "---------------------------------------- " << i
;
913 scoped_refptr
<MessagePipe
> mp0(MessagePipe::CreateLocalProxy());
914 BootstrapMessagePipeNoWait(0, mp0
);
916 scoped_refptr
<MessagePipe
> mp1(MessagePipe::CreateProxyLocal());
917 BootstrapMessagePipeNoWait(1, mp1
);
// Posting a sleep stalls the I/O thread; the direct |Sleep()| call stalls this
// (the test) thread.
920 io_thread()->task_runner()->PostTask(
921 FROM_HERE
, base::Bind(&base::PlatformThread::Sleep
, delay
));
924 base::PlatformThread::Sleep(delay
);
929 io_thread()->task_runner()->PostTask(
930 FROM_HERE
, base::Bind(&base::PlatformThread::Sleep
, delay
));
933 base::PlatformThread::Sleep(delay
);
937 RestoreInitialState();
942 } // namespace system