Updating trunk VERSION from 2139.0 to 2140.0
[chromium-blink-merge.git] / mojo / system / remote_message_pipe_unittest.cc
blobfdd5c4c9fa7f584ed0fff74ceb9de4d959507bdb
1 // Copyright 2014 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
5 #include <stdint.h>
6 #include <stdio.h>
7 #include <string.h>
9 #include <vector>
11 #include "base/bind.h"
12 #include "base/file_util.h"
13 #include "base/files/file_path.h"
14 #include "base/files/scoped_file.h"
15 #include "base/files/scoped_temp_dir.h"
16 #include "base/location.h"
17 #include "base/logging.h"
18 #include "base/macros.h"
19 #include "base/message_loop/message_loop.h"
20 #include "base/threading/platform_thread.h" // For |Sleep()|.
21 #include "build/build_config.h" // TODO(vtl): Remove this.
22 #include "mojo/common/test/test_utils.h"
23 #include "mojo/embedder/platform_channel_pair.h"
24 #include "mojo/embedder/platform_shared_buffer.h"
25 #include "mojo/embedder/scoped_platform_handle.h"
26 #include "mojo/embedder/simple_platform_support.h"
27 #include "mojo/system/channel.h"
28 #include "mojo/system/message_pipe.h"
29 #include "mojo/system/message_pipe_dispatcher.h"
30 #include "mojo/system/platform_handle_dispatcher.h"
31 #include "mojo/system/raw_channel.h"
32 #include "mojo/system/shared_buffer_dispatcher.h"
33 #include "mojo/system/test_utils.h"
34 #include "mojo/system/waiter.h"
35 #include "testing/gtest/include/gtest/gtest.h"
37 namespace mojo {
38 namespace system {
39 namespace {
41 class RemoteMessagePipeTest : public testing::Test {
42 public:
43 RemoteMessagePipeTest() : io_thread_(test::TestIOThread::kAutoStart) {}
44 virtual ~RemoteMessagePipeTest() {}
46 virtual void SetUp() OVERRIDE {
47 io_thread_.PostTaskAndWait(
48 FROM_HERE,
49 base::Bind(&RemoteMessagePipeTest::SetUpOnIOThread,
50 base::Unretained(this)));
53 virtual void TearDown() OVERRIDE {
54 io_thread_.PostTaskAndWait(
55 FROM_HERE,
56 base::Bind(&RemoteMessagePipeTest::TearDownOnIOThread,
57 base::Unretained(this)));
60 protected:
61 // This connects MP 0, port 1 and MP 1, port 0 (leaving MP 0, port 0 and MP 1,
62 // port 1 as the user-visible endpoints) to channel 0 and 1, respectively. MP
63 // 0, port 1 and MP 1, port 0 must have |ProxyMessagePipeEndpoint|s.
64 void ConnectMessagePipes(scoped_refptr<MessagePipe> mp0,
65 scoped_refptr<MessagePipe> mp1) {
66 io_thread_.PostTaskAndWait(
67 FROM_HERE,
68 base::Bind(&RemoteMessagePipeTest::ConnectMessagePipesOnIOThread,
69 base::Unretained(this),
70 mp0,
71 mp1));
74 // This connects |mp|'s port |channel_index ^ 1| to channel |channel_index|.
75 // It assumes/requires that this is the bootstrap case, i.e., that the
76 // endpoint IDs are both/will both be |Channel::kBootstrapEndpointId|. This
77 // returns *without* waiting for it to finish connecting.
78 void BootstrapMessagePipeNoWait(unsigned channel_index,
79 scoped_refptr<MessagePipe> mp) {
80 io_thread_.PostTask(
81 FROM_HERE,
82 base::Bind(&RemoteMessagePipeTest::BootstrapMessagePipeOnIOThread,
83 base::Unretained(this),
84 channel_index,
85 mp));
88 void RestoreInitialState() {
89 io_thread_.PostTaskAndWait(
90 FROM_HERE,
91 base::Bind(&RemoteMessagePipeTest::RestoreInitialStateOnIOThread,
92 base::Unretained(this)));
95 embedder::PlatformSupport* platform_support() { return &platform_support_; }
96 test::TestIOThread* io_thread() { return &io_thread_; }
98 private:
99 void SetUpOnIOThread() {
100 CHECK_EQ(base::MessageLoop::current(), io_thread()->message_loop());
102 embedder::PlatformChannelPair channel_pair;
103 platform_handles_[0] = channel_pair.PassServerHandle();
104 platform_handles_[1] = channel_pair.PassClientHandle();
107 void TearDownOnIOThread() {
108 CHECK_EQ(base::MessageLoop::current(), io_thread()->message_loop());
110 if (channels_[0].get()) {
111 channels_[0]->Shutdown();
112 channels_[0] = NULL;
114 if (channels_[1].get()) {
115 channels_[1]->Shutdown();
116 channels_[1] = NULL;
120 void CreateAndInitChannel(unsigned channel_index) {
121 CHECK_EQ(base::MessageLoop::current(), io_thread()->message_loop());
122 CHECK(channel_index == 0 || channel_index == 1);
123 CHECK(!channels_[channel_index].get());
125 channels_[channel_index] = new Channel(&platform_support_);
126 CHECK(channels_[channel_index]->Init(
127 RawChannel::Create(platform_handles_[channel_index].Pass())));
130 void ConnectMessagePipesOnIOThread(scoped_refptr<MessagePipe> mp0,
131 scoped_refptr<MessagePipe> mp1) {
132 CHECK_EQ(base::MessageLoop::current(), io_thread()->message_loop());
134 if (!channels_[0].get())
135 CreateAndInitChannel(0);
136 if (!channels_[1].get())
137 CreateAndInitChannel(1);
139 MessageInTransit::EndpointId local_id0 =
140 channels_[0]->AttachMessagePipeEndpoint(mp0, 1);
141 MessageInTransit::EndpointId local_id1 =
142 channels_[1]->AttachMessagePipeEndpoint(mp1, 0);
144 CHECK(channels_[0]->RunMessagePipeEndpoint(local_id0, local_id1));
145 CHECK(channels_[1]->RunMessagePipeEndpoint(local_id1, local_id0));
148 void BootstrapMessagePipeOnIOThread(unsigned channel_index,
149 scoped_refptr<MessagePipe> mp) {
150 CHECK_EQ(base::MessageLoop::current(), io_thread()->message_loop());
151 CHECK(channel_index == 0 || channel_index == 1);
153 unsigned port = channel_index ^ 1u;
155 CreateAndInitChannel(channel_index);
156 MessageInTransit::EndpointId endpoint_id =
157 channels_[channel_index]->AttachMessagePipeEndpoint(mp, port);
158 if (endpoint_id == MessageInTransit::kInvalidEndpointId)
159 return;
161 CHECK_EQ(endpoint_id, Channel::kBootstrapEndpointId);
162 CHECK(channels_[channel_index]->RunMessagePipeEndpoint(
163 Channel::kBootstrapEndpointId, Channel::kBootstrapEndpointId));
166 void RestoreInitialStateOnIOThread() {
167 CHECK_EQ(base::MessageLoop::current(), io_thread()->message_loop());
169 TearDownOnIOThread();
170 SetUpOnIOThread();
173 embedder::SimplePlatformSupport platform_support_;
174 test::TestIOThread io_thread_;
175 embedder::ScopedPlatformHandle platform_handles_[2];
176 scoped_refptr<Channel> channels_[2];
178 DISALLOW_COPY_AND_ASSIGN(RemoteMessagePipeTest);
181 TEST_F(RemoteMessagePipeTest, Basic) {
182 static const char kHello[] = "hello";
183 static const char kWorld[] = "world!!!1!!!1!";
184 char buffer[100] = {0};
185 uint32_t buffer_size = static_cast<uint32_t>(sizeof(buffer));
186 Waiter waiter;
187 HandleSignalsState hss;
188 uint32_t context = 0;
190 // Connect message pipes. MP 0, port 1 will be attached to channel 0 and
191 // connected to MP 1, port 0, which will be attached to channel 1. This leaves
192 // MP 0, port 0 and MP 1, port 1 as the "user-facing" endpoints.
194 scoped_refptr<MessagePipe> mp0(MessagePipe::CreateLocalProxy());
195 scoped_refptr<MessagePipe> mp1(MessagePipe::CreateProxyLocal());
196 ConnectMessagePipes(mp0, mp1);
198 // Write in one direction: MP 0, port 0 -> ... -> MP 1, port 1.
200 // Prepare to wait on MP 1, port 1. (Add the waiter now. Otherwise, if we do
201 // it later, it might already be readable.)
202 waiter.Init();
203 ASSERT_EQ(MOJO_RESULT_OK,
204 mp1->AddWaiter(1, &waiter, MOJO_HANDLE_SIGNAL_READABLE, 123, NULL));
206 // Write to MP 0, port 0.
207 EXPECT_EQ(MOJO_RESULT_OK,
208 mp0->WriteMessage(0,
209 UserPointer<const void>(kHello),
210 sizeof(kHello),
211 NULL,
212 MOJO_WRITE_MESSAGE_FLAG_NONE));
214 // Wait.
215 EXPECT_EQ(MOJO_RESULT_OK, waiter.Wait(MOJO_DEADLINE_INDEFINITE, &context));
216 EXPECT_EQ(123u, context);
217 hss = HandleSignalsState();
218 mp1->RemoveWaiter(1, &waiter, &hss);
219 EXPECT_EQ(MOJO_HANDLE_SIGNAL_READABLE | MOJO_HANDLE_SIGNAL_WRITABLE,
220 hss.satisfied_signals);
221 EXPECT_EQ(MOJO_HANDLE_SIGNAL_READABLE | MOJO_HANDLE_SIGNAL_WRITABLE,
222 hss.satisfiable_signals);
224 // Read from MP 1, port 1.
225 EXPECT_EQ(MOJO_RESULT_OK,
226 mp1->ReadMessage(1,
227 UserPointer<void>(buffer),
228 MakeUserPointer(&buffer_size),
229 NULL,
230 NULL,
231 MOJO_READ_MESSAGE_FLAG_NONE));
232 EXPECT_EQ(sizeof(kHello), static_cast<size_t>(buffer_size));
233 EXPECT_STREQ(kHello, buffer);
235 // Write in the other direction: MP 1, port 1 -> ... -> MP 0, port 0.
237 waiter.Init();
238 ASSERT_EQ(MOJO_RESULT_OK,
239 mp0->AddWaiter(0, &waiter, MOJO_HANDLE_SIGNAL_READABLE, 456, NULL));
241 EXPECT_EQ(MOJO_RESULT_OK,
242 mp1->WriteMessage(1,
243 UserPointer<const void>(kWorld),
244 sizeof(kWorld),
245 NULL,
246 MOJO_WRITE_MESSAGE_FLAG_NONE));
248 EXPECT_EQ(MOJO_RESULT_OK, waiter.Wait(MOJO_DEADLINE_INDEFINITE, &context));
249 EXPECT_EQ(456u, context);
250 hss = HandleSignalsState();
251 mp0->RemoveWaiter(0, &waiter, &hss);
252 EXPECT_EQ(MOJO_HANDLE_SIGNAL_READABLE | MOJO_HANDLE_SIGNAL_WRITABLE,
253 hss.satisfied_signals);
254 EXPECT_EQ(MOJO_HANDLE_SIGNAL_READABLE | MOJO_HANDLE_SIGNAL_WRITABLE,
255 hss.satisfiable_signals);
257 buffer_size = static_cast<uint32_t>(sizeof(buffer));
258 EXPECT_EQ(MOJO_RESULT_OK,
259 mp0->ReadMessage(0,
260 UserPointer<void>(buffer),
261 MakeUserPointer(&buffer_size),
262 NULL,
263 NULL,
264 MOJO_READ_MESSAGE_FLAG_NONE));
265 EXPECT_EQ(sizeof(kWorld), static_cast<size_t>(buffer_size));
266 EXPECT_STREQ(kWorld, buffer);
268 // Close MP 0, port 0.
269 mp0->Close(0);
271 // Try to wait for MP 1, port 1 to become readable. This will eventually fail
272 // when it realizes that MP 0, port 0 has been closed. (It may also fail
273 // immediately.)
274 waiter.Init();
275 hss = HandleSignalsState();
276 MojoResult result =
277 mp1->AddWaiter(1, &waiter, MOJO_HANDLE_SIGNAL_READABLE, 789, &hss);
278 if (result == MOJO_RESULT_OK) {
279 EXPECT_EQ(MOJO_RESULT_FAILED_PRECONDITION,
280 waiter.Wait(MOJO_DEADLINE_INDEFINITE, &context));
281 EXPECT_EQ(789u, context);
282 hss = HandleSignalsState();
283 mp1->RemoveWaiter(1, &waiter, &hss);
285 EXPECT_EQ(0u, hss.satisfied_signals);
286 EXPECT_EQ(0u, hss.satisfiable_signals);
288 // And MP 1, port 1.
289 mp1->Close(1);
292 TEST_F(RemoteMessagePipeTest, Multiplex) {
293 static const char kHello[] = "hello";
294 static const char kWorld[] = "world!!!1!!!1!";
295 char buffer[100] = {0};
296 uint32_t buffer_size = static_cast<uint32_t>(sizeof(buffer));
297 Waiter waiter;
298 HandleSignalsState hss;
299 uint32_t context = 0;
301 // Connect message pipes as in the |Basic| test.
303 scoped_refptr<MessagePipe> mp0(MessagePipe::CreateLocalProxy());
304 scoped_refptr<MessagePipe> mp1(MessagePipe::CreateProxyLocal());
305 ConnectMessagePipes(mp0, mp1);
307 // Now put another message pipe on the channel.
309 scoped_refptr<MessagePipe> mp2(MessagePipe::CreateLocalProxy());
310 scoped_refptr<MessagePipe> mp3(MessagePipe::CreateProxyLocal());
311 ConnectMessagePipes(mp2, mp3);
313 // Write: MP 2, port 0 -> MP 3, port 1.
315 waiter.Init();
316 ASSERT_EQ(MOJO_RESULT_OK,
317 mp3->AddWaiter(1, &waiter, MOJO_HANDLE_SIGNAL_READABLE, 789, NULL));
319 EXPECT_EQ(MOJO_RESULT_OK,
320 mp2->WriteMessage(0,
321 UserPointer<const void>(kHello),
322 sizeof(kHello),
323 NULL,
324 MOJO_WRITE_MESSAGE_FLAG_NONE));
326 EXPECT_EQ(MOJO_RESULT_OK, waiter.Wait(MOJO_DEADLINE_INDEFINITE, &context));
327 EXPECT_EQ(789u, context);
328 hss = HandleSignalsState();
329 mp3->RemoveWaiter(1, &waiter, &hss);
330 EXPECT_EQ(MOJO_HANDLE_SIGNAL_READABLE | MOJO_HANDLE_SIGNAL_WRITABLE,
331 hss.satisfied_signals);
332 EXPECT_EQ(MOJO_HANDLE_SIGNAL_READABLE | MOJO_HANDLE_SIGNAL_WRITABLE,
333 hss.satisfiable_signals);
335 // Make sure there's nothing on MP 0, port 0 or MP 1, port 1 or MP 2, port 0.
336 buffer_size = static_cast<uint32_t>(sizeof(buffer));
337 EXPECT_EQ(MOJO_RESULT_SHOULD_WAIT,
338 mp0->ReadMessage(0,
339 UserPointer<void>(buffer),
340 MakeUserPointer(&buffer_size),
341 NULL,
342 NULL,
343 MOJO_READ_MESSAGE_FLAG_NONE));
344 buffer_size = static_cast<uint32_t>(sizeof(buffer));
345 EXPECT_EQ(MOJO_RESULT_SHOULD_WAIT,
346 mp1->ReadMessage(1,
347 UserPointer<void>(buffer),
348 MakeUserPointer(&buffer_size),
349 NULL,
350 NULL,
351 MOJO_READ_MESSAGE_FLAG_NONE));
352 buffer_size = static_cast<uint32_t>(sizeof(buffer));
353 EXPECT_EQ(MOJO_RESULT_SHOULD_WAIT,
354 mp2->ReadMessage(0,
355 UserPointer<void>(buffer),
356 MakeUserPointer(&buffer_size),
357 NULL,
358 NULL,
359 MOJO_READ_MESSAGE_FLAG_NONE));
361 // Read from MP 3, port 1.
362 buffer_size = static_cast<uint32_t>(sizeof(buffer));
363 EXPECT_EQ(MOJO_RESULT_OK,
364 mp3->ReadMessage(1,
365 UserPointer<void>(buffer),
366 MakeUserPointer(&buffer_size),
367 NULL,
368 NULL,
369 MOJO_READ_MESSAGE_FLAG_NONE));
370 EXPECT_EQ(sizeof(kHello), static_cast<size_t>(buffer_size));
371 EXPECT_STREQ(kHello, buffer);
373 // Write: MP 0, port 0 -> MP 1, port 1 again.
375 waiter.Init();
376 ASSERT_EQ(MOJO_RESULT_OK,
377 mp1->AddWaiter(1, &waiter, MOJO_HANDLE_SIGNAL_READABLE, 123, NULL));
379 EXPECT_EQ(MOJO_RESULT_OK,
380 mp0->WriteMessage(0,
381 UserPointer<const void>(kWorld),
382 sizeof(kWorld),
383 NULL,
384 MOJO_WRITE_MESSAGE_FLAG_NONE));
386 EXPECT_EQ(MOJO_RESULT_OK, waiter.Wait(MOJO_DEADLINE_INDEFINITE, &context));
387 EXPECT_EQ(123u, context);
388 hss = HandleSignalsState();
389 mp1->RemoveWaiter(1, &waiter, &hss);
390 EXPECT_EQ(MOJO_HANDLE_SIGNAL_READABLE | MOJO_HANDLE_SIGNAL_WRITABLE,
391 hss.satisfied_signals);
392 EXPECT_EQ(MOJO_HANDLE_SIGNAL_READABLE | MOJO_HANDLE_SIGNAL_WRITABLE,
393 hss.satisfiable_signals);
395 // Make sure there's nothing on the other ports.
396 buffer_size = static_cast<uint32_t>(sizeof(buffer));
397 EXPECT_EQ(MOJO_RESULT_SHOULD_WAIT,
398 mp0->ReadMessage(0,
399 UserPointer<void>(buffer),
400 MakeUserPointer(&buffer_size),
401 NULL,
402 NULL,
403 MOJO_READ_MESSAGE_FLAG_NONE));
404 buffer_size = static_cast<uint32_t>(sizeof(buffer));
405 EXPECT_EQ(MOJO_RESULT_SHOULD_WAIT,
406 mp2->ReadMessage(0,
407 UserPointer<void>(buffer),
408 MakeUserPointer(&buffer_size),
409 NULL,
410 NULL,
411 MOJO_READ_MESSAGE_FLAG_NONE));
412 buffer_size = static_cast<uint32_t>(sizeof(buffer));
413 EXPECT_EQ(MOJO_RESULT_SHOULD_WAIT,
414 mp3->ReadMessage(1,
415 UserPointer<void>(buffer),
416 MakeUserPointer(&buffer_size),
417 NULL,
418 NULL,
419 MOJO_READ_MESSAGE_FLAG_NONE));
421 buffer_size = static_cast<uint32_t>(sizeof(buffer));
422 EXPECT_EQ(MOJO_RESULT_OK,
423 mp1->ReadMessage(1,
424 UserPointer<void>(buffer),
425 MakeUserPointer(&buffer_size),
426 NULL,
427 NULL,
428 MOJO_READ_MESSAGE_FLAG_NONE));
429 EXPECT_EQ(sizeof(kWorld), static_cast<size_t>(buffer_size));
430 EXPECT_STREQ(kWorld, buffer);
432 mp0->Close(0);
433 mp1->Close(1);
434 mp2->Close(0);
435 mp3->Close(1);
438 TEST_F(RemoteMessagePipeTest, CloseBeforeConnect) {
439 static const char kHello[] = "hello";
440 char buffer[100] = {0};
441 uint32_t buffer_size = static_cast<uint32_t>(sizeof(buffer));
442 Waiter waiter;
443 HandleSignalsState hss;
444 uint32_t context = 0;
446 // Connect message pipes. MP 0, port 1 will be attached to channel 0 and
447 // connected to MP 1, port 0, which will be attached to channel 1. This leaves
448 // MP 0, port 0 and MP 1, port 1 as the "user-facing" endpoints.
450 scoped_refptr<MessagePipe> mp0(MessagePipe::CreateLocalProxy());
452 // Write to MP 0, port 0.
453 EXPECT_EQ(MOJO_RESULT_OK,
454 mp0->WriteMessage(0,
455 UserPointer<const void>(kHello),
456 sizeof(kHello),
457 NULL,
458 MOJO_WRITE_MESSAGE_FLAG_NONE));
460 BootstrapMessagePipeNoWait(0, mp0);
462 // Close MP 0, port 0 before channel 1 is even connected.
463 mp0->Close(0);
465 scoped_refptr<MessagePipe> mp1(MessagePipe::CreateProxyLocal());
467 // Prepare to wait on MP 1, port 1. (Add the waiter now. Otherwise, if we do
468 // it later, it might already be readable.)
469 waiter.Init();
470 ASSERT_EQ(MOJO_RESULT_OK,
471 mp1->AddWaiter(1, &waiter, MOJO_HANDLE_SIGNAL_READABLE, 123, NULL));
473 BootstrapMessagePipeNoWait(1, mp1);
475 // Wait.
476 EXPECT_EQ(MOJO_RESULT_OK, waiter.Wait(MOJO_DEADLINE_INDEFINITE, &context));
477 EXPECT_EQ(123u, context);
478 hss = HandleSignalsState();
479 // Note: MP 1, port 1 should definitely should be readable, but it may or may
480 // not appear as writable (there's a race, and it may not have noticed that
481 // the other side was closed yet -- e.g., inserting a sleep here would make it
482 // much more likely to notice that it's no longer writable).
483 mp1->RemoveWaiter(1, &waiter, &hss);
484 EXPECT_TRUE((hss.satisfied_signals & MOJO_HANDLE_SIGNAL_READABLE));
485 EXPECT_TRUE((hss.satisfiable_signals & MOJO_HANDLE_SIGNAL_READABLE));
487 // Read from MP 1, port 1.
488 EXPECT_EQ(MOJO_RESULT_OK,
489 mp1->ReadMessage(1,
490 UserPointer<void>(buffer),
491 MakeUserPointer(&buffer_size),
492 NULL,
493 NULL,
494 MOJO_READ_MESSAGE_FLAG_NONE));
495 EXPECT_EQ(sizeof(kHello), static_cast<size_t>(buffer_size));
496 EXPECT_STREQ(kHello, buffer);
498 // And MP 1, port 1.
499 mp1->Close(1);
502 TEST_F(RemoteMessagePipeTest, HandlePassing) {
503 static const char kHello[] = "hello";
504 Waiter waiter;
505 HandleSignalsState hss;
506 uint32_t context = 0;
508 scoped_refptr<MessagePipe> mp0(MessagePipe::CreateLocalProxy());
509 scoped_refptr<MessagePipe> mp1(MessagePipe::CreateProxyLocal());
510 ConnectMessagePipes(mp0, mp1);
512 // We'll try to pass this dispatcher.
513 scoped_refptr<MessagePipeDispatcher> dispatcher(
514 new MessagePipeDispatcher(MessagePipeDispatcher::kDefaultCreateOptions));
515 scoped_refptr<MessagePipe> local_mp(MessagePipe::CreateLocalLocal());
516 dispatcher->Init(local_mp, 0);
518 // Prepare to wait on MP 1, port 1. (Add the waiter now. Otherwise, if we do
519 // it later, it might already be readable.)
520 waiter.Init();
521 ASSERT_EQ(MOJO_RESULT_OK,
522 mp1->AddWaiter(1, &waiter, MOJO_HANDLE_SIGNAL_READABLE, 123, NULL));
524 // Write to MP 0, port 0.
526 DispatcherTransport transport(
527 test::DispatcherTryStartTransport(dispatcher.get()));
528 EXPECT_TRUE(transport.is_valid());
530 std::vector<DispatcherTransport> transports;
531 transports.push_back(transport);
532 EXPECT_EQ(MOJO_RESULT_OK,
533 mp0->WriteMessage(0,
534 UserPointer<const void>(kHello),
535 sizeof(kHello),
536 &transports,
537 MOJO_WRITE_MESSAGE_FLAG_NONE));
538 transport.End();
540 // |dispatcher| should have been closed. This is |DCHECK()|ed when the
541 // |dispatcher| is destroyed.
542 EXPECT_TRUE(dispatcher->HasOneRef());
543 dispatcher = NULL;
546 // Wait.
547 EXPECT_EQ(MOJO_RESULT_OK, waiter.Wait(MOJO_DEADLINE_INDEFINITE, &context));
548 EXPECT_EQ(123u, context);
549 hss = HandleSignalsState();
550 mp1->RemoveWaiter(1, &waiter, &hss);
551 EXPECT_EQ(MOJO_HANDLE_SIGNAL_READABLE | MOJO_HANDLE_SIGNAL_WRITABLE,
552 hss.satisfied_signals);
553 EXPECT_EQ(MOJO_HANDLE_SIGNAL_READABLE | MOJO_HANDLE_SIGNAL_WRITABLE,
554 hss.satisfiable_signals);
556 // Read from MP 1, port 1.
557 char read_buffer[100] = {0};
558 uint32_t read_buffer_size = static_cast<uint32_t>(sizeof(read_buffer));
559 DispatcherVector read_dispatchers;
560 uint32_t read_num_dispatchers = 10; // Maximum to get.
561 EXPECT_EQ(MOJO_RESULT_OK,
562 mp1->ReadMessage(1,
563 UserPointer<void>(read_buffer),
564 MakeUserPointer(&read_buffer_size),
565 &read_dispatchers,
566 &read_num_dispatchers,
567 MOJO_READ_MESSAGE_FLAG_NONE));
568 EXPECT_EQ(sizeof(kHello), static_cast<size_t>(read_buffer_size));
569 EXPECT_STREQ(kHello, read_buffer);
570 EXPECT_EQ(1u, read_dispatchers.size());
571 EXPECT_EQ(1u, read_num_dispatchers);
572 ASSERT_TRUE(read_dispatchers[0].get());
573 EXPECT_TRUE(read_dispatchers[0]->HasOneRef());
575 EXPECT_EQ(Dispatcher::kTypeMessagePipe, read_dispatchers[0]->GetType());
576 dispatcher = static_cast<MessagePipeDispatcher*>(read_dispatchers[0].get());
578 // Add the waiter now, before it becomes readable to avoid a race.
579 waiter.Init();
580 ASSERT_EQ(
581 MOJO_RESULT_OK,
582 dispatcher->AddWaiter(&waiter, MOJO_HANDLE_SIGNAL_READABLE, 456, NULL));
584 // Write to "local_mp", port 1.
585 EXPECT_EQ(MOJO_RESULT_OK,
586 local_mp->WriteMessage(1,
587 UserPointer<const void>(kHello),
588 sizeof(kHello),
589 NULL,
590 MOJO_WRITE_MESSAGE_FLAG_NONE));
592 // TODO(vtl): FIXME -- We (racily) crash if I close |dispatcher| immediately
593 // here. (We don't crash if I sleep and then close.)
595 // Wait for the dispatcher to become readable.
596 EXPECT_EQ(MOJO_RESULT_OK, waiter.Wait(MOJO_DEADLINE_INDEFINITE, &context));
597 EXPECT_EQ(456u, context);
598 hss = HandleSignalsState();
599 dispatcher->RemoveWaiter(&waiter, &hss);
600 EXPECT_EQ(MOJO_HANDLE_SIGNAL_READABLE | MOJO_HANDLE_SIGNAL_WRITABLE,
601 hss.satisfied_signals);
602 EXPECT_EQ(MOJO_HANDLE_SIGNAL_READABLE | MOJO_HANDLE_SIGNAL_WRITABLE,
603 hss.satisfiable_signals);
605 // Read from the dispatcher.
606 memset(read_buffer, 0, sizeof(read_buffer));
607 read_buffer_size = static_cast<uint32_t>(sizeof(read_buffer));
608 EXPECT_EQ(MOJO_RESULT_OK,
609 dispatcher->ReadMessage(UserPointer<void>(read_buffer),
610 MakeUserPointer(&read_buffer_size),
612 NULL,
613 MOJO_READ_MESSAGE_FLAG_NONE));
614 EXPECT_EQ(sizeof(kHello), static_cast<size_t>(read_buffer_size));
615 EXPECT_STREQ(kHello, read_buffer);
617 // Prepare to wait on "local_mp", port 1.
618 waiter.Init();
619 ASSERT_EQ(
620 MOJO_RESULT_OK,
621 local_mp->AddWaiter(1, &waiter, MOJO_HANDLE_SIGNAL_READABLE, 789, NULL));
623 // Write to the dispatcher.
624 EXPECT_EQ(MOJO_RESULT_OK,
625 dispatcher->WriteMessage(UserPointer<const void>(kHello),
626 sizeof(kHello),
627 NULL,
628 MOJO_WRITE_MESSAGE_FLAG_NONE));
630 // Wait.
631 EXPECT_EQ(MOJO_RESULT_OK, waiter.Wait(MOJO_DEADLINE_INDEFINITE, &context));
632 EXPECT_EQ(789u, context);
633 hss = HandleSignalsState();
634 local_mp->RemoveWaiter(1, &waiter, &hss);
635 EXPECT_EQ(MOJO_HANDLE_SIGNAL_READABLE | MOJO_HANDLE_SIGNAL_WRITABLE,
636 hss.satisfied_signals);
637 EXPECT_EQ(MOJO_HANDLE_SIGNAL_READABLE | MOJO_HANDLE_SIGNAL_WRITABLE,
638 hss.satisfiable_signals);
640 // Read from "local_mp", port 1.
641 memset(read_buffer, 0, sizeof(read_buffer));
642 read_buffer_size = static_cast<uint32_t>(sizeof(read_buffer));
643 EXPECT_EQ(MOJO_RESULT_OK,
644 local_mp->ReadMessage(1,
645 UserPointer<void>(read_buffer),
646 MakeUserPointer(&read_buffer_size),
647 NULL,
648 NULL,
649 MOJO_READ_MESSAGE_FLAG_NONE));
650 EXPECT_EQ(sizeof(kHello), static_cast<size_t>(read_buffer_size));
651 EXPECT_STREQ(kHello, read_buffer);
653 // TODO(vtl): Also test that messages queued up before the handle was sent are
654 // delivered properly.
656 // Close everything that belongs to us.
657 mp0->Close(0);
658 mp1->Close(1);
659 EXPECT_EQ(MOJO_RESULT_OK, dispatcher->Close());
660 // Note that |local_mp|'s port 0 belong to |dispatcher|, which was closed.
661 local_mp->Close(1);
664 #if defined(OS_POSIX)
665 #define MAYBE_SharedBufferPassing SharedBufferPassing
666 #else
667 // Not yet implemented (on Windows).
668 #define MAYBE_SharedBufferPassing DISABLED_SharedBufferPassing
669 #endif
670 TEST_F(RemoteMessagePipeTest, MAYBE_SharedBufferPassing) {
671 static const char kHello[] = "hello";
672 Waiter waiter;
673 HandleSignalsState hss;
674 uint32_t context = 0;
676 scoped_refptr<MessagePipe> mp0(MessagePipe::CreateLocalProxy());
677 scoped_refptr<MessagePipe> mp1(MessagePipe::CreateProxyLocal());
678 ConnectMessagePipes(mp0, mp1);
680 // We'll try to pass this dispatcher.
681 scoped_refptr<SharedBufferDispatcher> dispatcher;
682 EXPECT_EQ(MOJO_RESULT_OK,
683 SharedBufferDispatcher::Create(
684 platform_support(),
685 SharedBufferDispatcher::kDefaultCreateOptions,
686 100,
687 &dispatcher));
688 ASSERT_TRUE(dispatcher.get());
690 // Make a mapping.
691 scoped_ptr<embedder::PlatformSharedBufferMapping> mapping0;
692 EXPECT_EQ(
693 MOJO_RESULT_OK,
694 dispatcher->MapBuffer(0, 100, MOJO_MAP_BUFFER_FLAG_NONE, &mapping0));
695 ASSERT_TRUE(mapping0);
696 ASSERT_TRUE(mapping0->GetBase());
697 ASSERT_EQ(100u, mapping0->GetLength());
698 static_cast<char*>(mapping0->GetBase())[0] = 'A';
699 static_cast<char*>(mapping0->GetBase())[50] = 'B';
700 static_cast<char*>(mapping0->GetBase())[99] = 'C';
702 // Prepare to wait on MP 1, port 1. (Add the waiter now. Otherwise, if we do
703 // it later, it might already be readable.)
704 waiter.Init();
705 ASSERT_EQ(MOJO_RESULT_OK,
706 mp1->AddWaiter(1, &waiter, MOJO_HANDLE_SIGNAL_READABLE, 123, NULL));
708 // Write to MP 0, port 0.
710 DispatcherTransport transport(
711 test::DispatcherTryStartTransport(dispatcher.get()));
712 EXPECT_TRUE(transport.is_valid());
714 std::vector<DispatcherTransport> transports;
715 transports.push_back(transport);
716 EXPECT_EQ(MOJO_RESULT_OK,
717 mp0->WriteMessage(0,
718 UserPointer<const void>(kHello),
719 sizeof(kHello),
720 &transports,
721 MOJO_WRITE_MESSAGE_FLAG_NONE));
722 transport.End();
724 // |dispatcher| should have been closed. This is |DCHECK()|ed when the
725 // |dispatcher| is destroyed.
726 EXPECT_TRUE(dispatcher->HasOneRef());
727 dispatcher = NULL;
730 // Wait.
731 EXPECT_EQ(MOJO_RESULT_OK, waiter.Wait(MOJO_DEADLINE_INDEFINITE, &context));
732 EXPECT_EQ(123u, context);
733 hss = HandleSignalsState();
734 mp1->RemoveWaiter(1, &waiter, &hss);
735 EXPECT_EQ(MOJO_HANDLE_SIGNAL_READABLE | MOJO_HANDLE_SIGNAL_WRITABLE,
736 hss.satisfied_signals);
737 EXPECT_EQ(MOJO_HANDLE_SIGNAL_READABLE | MOJO_HANDLE_SIGNAL_WRITABLE,
738 hss.satisfiable_signals);
740 // Read from MP 1, port 1.
741 char read_buffer[100] = {0};
742 uint32_t read_buffer_size = static_cast<uint32_t>(sizeof(read_buffer));
743 DispatcherVector read_dispatchers;
744 uint32_t read_num_dispatchers = 10; // Maximum to get.
745 EXPECT_EQ(MOJO_RESULT_OK,
746 mp1->ReadMessage(1,
747 UserPointer<void>(read_buffer),
748 MakeUserPointer(&read_buffer_size),
749 &read_dispatchers,
750 &read_num_dispatchers,
751 MOJO_READ_MESSAGE_FLAG_NONE));
752 EXPECT_EQ(sizeof(kHello), static_cast<size_t>(read_buffer_size));
753 EXPECT_STREQ(kHello, read_buffer);
754 EXPECT_EQ(1u, read_dispatchers.size());
755 EXPECT_EQ(1u, read_num_dispatchers);
756 ASSERT_TRUE(read_dispatchers[0].get());
757 EXPECT_TRUE(read_dispatchers[0]->HasOneRef());
759 EXPECT_EQ(Dispatcher::kTypeSharedBuffer, read_dispatchers[0]->GetType());
760 dispatcher = static_cast<SharedBufferDispatcher*>(read_dispatchers[0].get());
762 // Make another mapping.
763 scoped_ptr<embedder::PlatformSharedBufferMapping> mapping1;
764 EXPECT_EQ(
765 MOJO_RESULT_OK,
766 dispatcher->MapBuffer(0, 100, MOJO_MAP_BUFFER_FLAG_NONE, &mapping1));
767 ASSERT_TRUE(mapping1);
768 ASSERT_TRUE(mapping1->GetBase());
769 ASSERT_EQ(100u, mapping1->GetLength());
770 EXPECT_NE(mapping1->GetBase(), mapping0->GetBase());
771 EXPECT_EQ('A', static_cast<char*>(mapping1->GetBase())[0]);
772 EXPECT_EQ('B', static_cast<char*>(mapping1->GetBase())[50]);
773 EXPECT_EQ('C', static_cast<char*>(mapping1->GetBase())[99]);
775 // Write stuff either way.
776 static_cast<char*>(mapping1->GetBase())[1] = 'x';
777 EXPECT_EQ('x', static_cast<char*>(mapping0->GetBase())[1]);
778 static_cast<char*>(mapping0->GetBase())[2] = 'y';
779 EXPECT_EQ('y', static_cast<char*>(mapping1->GetBase())[2]);
781 // Kill the first mapping; the second should still be valid.
782 mapping0.reset();
783 EXPECT_EQ('A', static_cast<char*>(mapping1->GetBase())[0]);
785 // Close everything that belongs to us.
786 mp0->Close(0);
787 mp1->Close(1);
788 EXPECT_EQ(MOJO_RESULT_OK, dispatcher->Close());
790 // The second mapping should still be good.
791 EXPECT_EQ('x', static_cast<char*>(mapping1->GetBase())[1]);
794 #if defined(OS_POSIX)
795 #define MAYBE_PlatformHandlePassing PlatformHandlePassing
796 #else
797 // Not yet implemented (on Windows).
798 #define MAYBE_PlatformHandlePassing DISABLED_PlatformHandlePassing
799 #endif
800 TEST_F(RemoteMessagePipeTest, MAYBE_PlatformHandlePassing) {
801 base::ScopedTempDir temp_dir;
802 ASSERT_TRUE(temp_dir.CreateUniqueTempDir());
804 static const char kHello[] = "hello";
805 static const char kWorld[] = "world";
806 Waiter waiter;
807 uint32_t context = 0;
808 HandleSignalsState hss;
810 scoped_refptr<MessagePipe> mp0(MessagePipe::CreateLocalProxy());
811 scoped_refptr<MessagePipe> mp1(MessagePipe::CreateProxyLocal());
812 ConnectMessagePipes(mp0, mp1);
814 base::FilePath unused;
815 base::ScopedFILE fp(
816 CreateAndOpenTemporaryFileInDir(temp_dir.path(), &unused));
817 EXPECT_EQ(sizeof(kHello), fwrite(kHello, 1, sizeof(kHello), fp.get()));
818 // We'll try to pass this dispatcher, which will cause a |PlatformHandle| to
819 // be passed.
820 scoped_refptr<PlatformHandleDispatcher> dispatcher(
821 new PlatformHandleDispatcher(
822 mojo::test::PlatformHandleFromFILE(fp.Pass())));
824 // Prepare to wait on MP 1, port 1. (Add the waiter now. Otherwise, if we do
825 // it later, it might already be readable.)
826 waiter.Init();
827 ASSERT_EQ(MOJO_RESULT_OK,
828 mp1->AddWaiter(1, &waiter, MOJO_HANDLE_SIGNAL_READABLE, 123, NULL));
830 // Write to MP 0, port 0.
832 DispatcherTransport transport(
833 test::DispatcherTryStartTransport(dispatcher.get()));
834 EXPECT_TRUE(transport.is_valid());
836 std::vector<DispatcherTransport> transports;
837 transports.push_back(transport);
838 EXPECT_EQ(MOJO_RESULT_OK,
839 mp0->WriteMessage(0,
840 UserPointer<const void>(kWorld),
841 sizeof(kWorld),
842 &transports,
843 MOJO_WRITE_MESSAGE_FLAG_NONE));
844 transport.End();
846 // |dispatcher| should have been closed. This is |DCHECK()|ed when the
847 // |dispatcher| is destroyed.
848 EXPECT_TRUE(dispatcher->HasOneRef());
849 dispatcher = NULL;
852 // Wait.
853 EXPECT_EQ(MOJO_RESULT_OK, waiter.Wait(MOJO_DEADLINE_INDEFINITE, &context));
854 EXPECT_EQ(123u, context);
855 hss = HandleSignalsState();
856 mp1->RemoveWaiter(1, &waiter, &hss);
857 EXPECT_EQ(MOJO_HANDLE_SIGNAL_READABLE | MOJO_HANDLE_SIGNAL_WRITABLE,
858 hss.satisfied_signals);
859 EXPECT_EQ(MOJO_HANDLE_SIGNAL_READABLE | MOJO_HANDLE_SIGNAL_WRITABLE,
860 hss.satisfiable_signals);
862 // Read from MP 1, port 1.
863 char read_buffer[100] = {0};
864 uint32_t read_buffer_size = static_cast<uint32_t>(sizeof(read_buffer));
865 DispatcherVector read_dispatchers;
866 uint32_t read_num_dispatchers = 10; // Maximum to get.
867 EXPECT_EQ(MOJO_RESULT_OK,
868 mp1->ReadMessage(1,
869 UserPointer<void>(read_buffer),
870 MakeUserPointer(&read_buffer_size),
871 &read_dispatchers,
872 &read_num_dispatchers,
873 MOJO_READ_MESSAGE_FLAG_NONE));
874 EXPECT_EQ(sizeof(kWorld), static_cast<size_t>(read_buffer_size));
875 EXPECT_STREQ(kWorld, read_buffer);
876 EXPECT_EQ(1u, read_dispatchers.size());
877 EXPECT_EQ(1u, read_num_dispatchers);
878 ASSERT_TRUE(read_dispatchers[0].get());
879 EXPECT_TRUE(read_dispatchers[0]->HasOneRef());
881 EXPECT_EQ(Dispatcher::kTypePlatformHandle, read_dispatchers[0]->GetType());
882 dispatcher =
883 static_cast<PlatformHandleDispatcher*>(read_dispatchers[0].get());
885 embedder::ScopedPlatformHandle h = dispatcher->PassPlatformHandle().Pass();
886 EXPECT_TRUE(h.is_valid());
888 fp = mojo::test::FILEFromPlatformHandle(h.Pass(), "rb").Pass();
889 EXPECT_FALSE(h.is_valid());
890 EXPECT_TRUE(fp);
892 rewind(fp.get());
893 memset(read_buffer, 0, sizeof(read_buffer));
894 EXPECT_EQ(sizeof(kHello),
895 fread(read_buffer, 1, sizeof(read_buffer), fp.get()));
896 EXPECT_STREQ(kHello, read_buffer);
898 // Close everything that belongs to us.
899 mp0->Close(0);
900 mp1->Close(1);
901 EXPECT_EQ(MOJO_RESULT_OK, dispatcher->Close());
904 // Test racing closes (on each end).
905 // Note: A flaky failure would almost certainly indicate a problem in the code
906 // itself (not in the test). Also, any logged warnings/errors would also
907 // probably be indicative of bugs.
908 TEST_F(RemoteMessagePipeTest, RacingClosesStress) {
909 base::TimeDelta delay = base::TimeDelta::FromMilliseconds(5);
911 for (unsigned i = 0; i < 256; i++) {
912 DVLOG(2) << "---------------------------------------- " << i;
913 scoped_refptr<MessagePipe> mp0(MessagePipe::CreateLocalProxy());
914 BootstrapMessagePipeNoWait(0, mp0);
916 scoped_refptr<MessagePipe> mp1(MessagePipe::CreateProxyLocal());
917 BootstrapMessagePipeNoWait(1, mp1);
919 if (i & 1u) {
920 io_thread()->task_runner()->PostTask(
921 FROM_HERE, base::Bind(&base::PlatformThread::Sleep, delay));
923 if (i & 2u)
924 base::PlatformThread::Sleep(delay);
926 mp0->Close(0);
928 if (i & 4u) {
929 io_thread()->task_runner()->PostTask(
930 FROM_HERE, base::Bind(&base::PlatformThread::Sleep, delay));
932 if (i & 8u)
933 base::PlatformThread::Sleep(delay);
935 mp1->Close(1);
937 RestoreInitialState();
941 } // namespace
942 } // namespace system
943 } // namespace mojo