Add DriveAppRegistryObserver.
[chromium-blink-merge.git] / mojo / system / remote_message_pipe_unittest.cc
blob4dee5772ab8d018f52cc933ab5184584cf4b7a42
1 // Copyright 2014 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
5 #include <stdint.h>
6 #include <string.h>
8 #include <vector>
10 #include "base/basictypes.h"
11 #include "base/bind.h"
12 #include "base/location.h"
13 #include "base/logging.h"
14 #include "base/message_loop/message_loop.h"
15 #include "base/threading/platform_thread.h" // For |Sleep()|.
16 #include "mojo/embedder/platform_channel_pair.h"
17 #include "mojo/embedder/scoped_platform_handle.h"
18 #include "mojo/system/channel.h"
19 #include "mojo/system/local_message_pipe_endpoint.h"
20 #include "mojo/system/message_pipe.h"
21 #include "mojo/system/message_pipe_dispatcher.h"
22 #include "mojo/system/proxy_message_pipe_endpoint.h"
23 #include "mojo/system/raw_channel.h"
24 #include "mojo/system/test_utils.h"
25 #include "mojo/system/waiter.h"
26 #include "testing/gtest/include/gtest/gtest.h"
28 namespace mojo {
29 namespace system {
30 namespace {
32 class RemoteMessagePipeTest : public testing::Test {
33 public:
34 RemoteMessagePipeTest() : io_thread_(test::TestIOThread::kAutoStart) {}
35 virtual ~RemoteMessagePipeTest() {}
37 virtual void SetUp() OVERRIDE {
38 io_thread_.PostTaskAndWait(
39 FROM_HERE,
40 base::Bind(&RemoteMessagePipeTest::SetUpOnIOThread,
41 base::Unretained(this)));
44 virtual void TearDown() OVERRIDE {
45 io_thread_.PostTaskAndWait(
46 FROM_HERE,
47 base::Bind(&RemoteMessagePipeTest::TearDownOnIOThread,
48 base::Unretained(this)));
51 protected:
52 // This connects MP 0, port 1 and MP 1, port 0 (leaving MP 0, port 0 and MP 1,
53 // port 1 as the user-visible endpoints) to channel 0 and 1, respectively. MP
54 // 0, port 1 and MP 1, port 0 must have |ProxyMessagePipeEndpoint|s.
55 void ConnectMessagePipes(scoped_refptr<MessagePipe> mp0,
56 scoped_refptr<MessagePipe> mp1) {
57 io_thread_.PostTaskAndWait(
58 FROM_HERE,
59 base::Bind(&RemoteMessagePipeTest::ConnectMessagePipesOnIOThread,
60 base::Unretained(this), mp0, mp1));
63 // This connects |mp|'s port |channel_index ^ 1| to channel |channel_index|.
64 // It assumes/requires that this is the bootstrap case, i.e., that the
65 // endpoint IDs are both/will both be |Channel::kBootstrapEndpointId|. This
66 // returns *without* waiting for it to finish connecting.
67 void BootstrapMessagePipeNoWait(unsigned channel_index,
68 scoped_refptr<MessagePipe> mp) {
69 io_thread_.PostTask(
70 FROM_HERE,
71 base::Bind(&RemoteMessagePipeTest::BootstrapMessagePipeOnIOThread,
72 base::Unretained(this), channel_index, mp));
75 void RestoreInitialState() {
76 io_thread_.PostTaskAndWait(
77 FROM_HERE,
78 base::Bind(&RemoteMessagePipeTest::RestoreInitialStateOnIOThread,
79 base::Unretained(this)));
82 test::TestIOThread* io_thread() { return &io_thread_; }
84 private:
85 void SetUpOnIOThread() {
86 CHECK_EQ(base::MessageLoop::current(), io_thread()->message_loop());
88 embedder::PlatformChannelPair channel_pair;
89 platform_handles_[0] = channel_pair.PassServerHandle();
90 platform_handles_[1] = channel_pair.PassClientHandle();
93 void TearDownOnIOThread() {
94 CHECK_EQ(base::MessageLoop::current(), io_thread()->message_loop());
96 if (channels_[0].get()) {
97 channels_[0]->Shutdown();
98 channels_[0] = NULL;
100 if (channels_[1].get()) {
101 channels_[1]->Shutdown();
102 channels_[1] = NULL;
106 void CreateAndInitChannel(unsigned channel_index) {
107 CHECK_EQ(base::MessageLoop::current(), io_thread()->message_loop());
108 CHECK(channel_index == 0 || channel_index == 1);
109 CHECK(!channels_[channel_index].get());
111 channels_[channel_index] = new Channel();
112 CHECK(channels_[channel_index]->Init(
113 RawChannel::Create(platform_handles_[channel_index].Pass())));
116 void ConnectMessagePipesOnIOThread(scoped_refptr<MessagePipe> mp0,
117 scoped_refptr<MessagePipe> mp1) {
118 CHECK_EQ(base::MessageLoop::current(), io_thread()->message_loop());
120 if (!channels_[0].get())
121 CreateAndInitChannel(0);
122 if (!channels_[1].get())
123 CreateAndInitChannel(1);
125 MessageInTransit::EndpointId local_id0 =
126 channels_[0]->AttachMessagePipeEndpoint(mp0, 1);
127 MessageInTransit::EndpointId local_id1 =
128 channels_[1]->AttachMessagePipeEndpoint(mp1, 0);
130 CHECK(channels_[0]->RunMessagePipeEndpoint(local_id0, local_id1));
131 CHECK(channels_[1]->RunMessagePipeEndpoint(local_id1, local_id0));
134 void BootstrapMessagePipeOnIOThread(unsigned channel_index,
135 scoped_refptr<MessagePipe> mp) {
136 CHECK_EQ(base::MessageLoop::current(), io_thread()->message_loop());
137 CHECK(channel_index == 0 || channel_index == 1);
139 unsigned port = channel_index ^ 1u;
141 // Important: If we don't boot
142 CreateAndInitChannel(channel_index);
143 CHECK_EQ(channels_[channel_index]->AttachMessagePipeEndpoint(mp, port),
144 Channel::kBootstrapEndpointId);
145 CHECK(channels_[channel_index]->RunMessagePipeEndpoint(
146 Channel::kBootstrapEndpointId, Channel::kBootstrapEndpointId));
149 void RestoreInitialStateOnIOThread() {
150 CHECK_EQ(base::MessageLoop::current(), io_thread()->message_loop());
152 TearDownOnIOThread();
153 SetUpOnIOThread();
156 test::TestIOThread io_thread_;
157 embedder::ScopedPlatformHandle platform_handles_[2];
158 scoped_refptr<Channel> channels_[2];
160 DISALLOW_COPY_AND_ASSIGN(RemoteMessagePipeTest);
163 TEST_F(RemoteMessagePipeTest, Basic) {
164 const char hello[] = "hello";
165 const char world[] = "world!!!1!!!1!";
166 char buffer[100] = { 0 };
167 uint32_t buffer_size = static_cast<uint32_t>(sizeof(buffer));
168 Waiter waiter;
170 // Connect message pipes. MP 0, port 1 will be attached to channel 0 and
171 // connected to MP 1, port 0, which will be attached to channel 1. This leaves
172 // MP 0, port 0 and MP 1, port 1 as the "user-facing" endpoints.
174 scoped_refptr<MessagePipe> mp0(new MessagePipe(
175 scoped_ptr<MessagePipeEndpoint>(new LocalMessagePipeEndpoint()),
176 scoped_ptr<MessagePipeEndpoint>(new ProxyMessagePipeEndpoint())));
177 scoped_refptr<MessagePipe> mp1(new MessagePipe(
178 scoped_ptr<MessagePipeEndpoint>(new ProxyMessagePipeEndpoint()),
179 scoped_ptr<MessagePipeEndpoint>(new LocalMessagePipeEndpoint())));
180 ConnectMessagePipes(mp0, mp1);
182 // Write in one direction: MP 0, port 0 -> ... -> MP 1, port 1.
184 // Prepare to wait on MP 1, port 1. (Add the waiter now. Otherwise, if we do
185 // it later, it might already be readable.)
186 waiter.Init();
187 EXPECT_EQ(MOJO_RESULT_OK,
188 mp1->AddWaiter(1, &waiter, MOJO_WAIT_FLAG_READABLE, 123));
190 // Write to MP 0, port 0.
191 EXPECT_EQ(MOJO_RESULT_OK,
192 mp0->WriteMessage(0,
193 hello, sizeof(hello),
194 NULL,
195 MOJO_WRITE_MESSAGE_FLAG_NONE));
197 // Wait.
198 EXPECT_EQ(123, waiter.Wait(MOJO_DEADLINE_INDEFINITE));
199 mp1->RemoveWaiter(1, &waiter);
201 // Read from MP 1, port 1.
202 EXPECT_EQ(MOJO_RESULT_OK,
203 mp1->ReadMessage(1,
204 buffer, &buffer_size,
205 NULL, NULL,
206 MOJO_READ_MESSAGE_FLAG_NONE));
207 EXPECT_EQ(sizeof(hello), static_cast<size_t>(buffer_size));
208 EXPECT_STREQ(hello, buffer);
210 // Write in the other direction: MP 1, port 1 -> ... -> MP 0, port 0.
212 waiter.Init();
213 EXPECT_EQ(MOJO_RESULT_OK,
214 mp0->AddWaiter(0, &waiter, MOJO_WAIT_FLAG_READABLE, 456));
216 EXPECT_EQ(MOJO_RESULT_OK,
217 mp1->WriteMessage(1,
218 world, sizeof(world),
219 NULL,
220 MOJO_WRITE_MESSAGE_FLAG_NONE));
222 EXPECT_EQ(456, waiter.Wait(MOJO_DEADLINE_INDEFINITE));
223 mp0->RemoveWaiter(0, &waiter);
225 buffer_size = static_cast<uint32_t>(sizeof(buffer));
226 EXPECT_EQ(MOJO_RESULT_OK,
227 mp0->ReadMessage(0,
228 buffer, &buffer_size,
229 NULL, NULL,
230 MOJO_READ_MESSAGE_FLAG_NONE));
231 EXPECT_EQ(sizeof(world), static_cast<size_t>(buffer_size));
232 EXPECT_STREQ(world, buffer);
234 // Close MP 0, port 0.
235 mp0->Close(0);
237 // Try to wait for MP 1, port 1 to become readable. This will eventually fail
238 // when it realizes that MP 0, port 0 has been closed. (It may also fail
239 // immediately.)
240 waiter.Init();
241 MojoResult result = mp1->AddWaiter(1, &waiter, MOJO_WAIT_FLAG_READABLE, 789);
242 if (result == MOJO_RESULT_OK) {
243 EXPECT_EQ(MOJO_RESULT_FAILED_PRECONDITION,
244 waiter.Wait(MOJO_DEADLINE_INDEFINITE));
245 mp1->RemoveWaiter(1, &waiter);
246 } else {
247 EXPECT_EQ(MOJO_RESULT_FAILED_PRECONDITION, result);
250 // And MP 1, port 1.
251 mp1->Close(1);
254 TEST_F(RemoteMessagePipeTest, Multiplex) {
255 const char hello[] = "hello";
256 const char world[] = "world!!!1!!!1!";
257 char buffer[100] = { 0 };
258 uint32_t buffer_size = static_cast<uint32_t>(sizeof(buffer));
259 Waiter waiter;
261 // Connect message pipes as in the |Basic| test.
263 scoped_refptr<MessagePipe> mp0(new MessagePipe(
264 scoped_ptr<MessagePipeEndpoint>(new LocalMessagePipeEndpoint()),
265 scoped_ptr<MessagePipeEndpoint>(new ProxyMessagePipeEndpoint())));
266 scoped_refptr<MessagePipe> mp1(new MessagePipe(
267 scoped_ptr<MessagePipeEndpoint>(new ProxyMessagePipeEndpoint()),
268 scoped_ptr<MessagePipeEndpoint>(new LocalMessagePipeEndpoint())));
269 ConnectMessagePipes(mp0, mp1);
271 // Now put another message pipe on the channel.
273 scoped_refptr<MessagePipe> mp2(new MessagePipe(
274 scoped_ptr<MessagePipeEndpoint>(new LocalMessagePipeEndpoint()),
275 scoped_ptr<MessagePipeEndpoint>(new ProxyMessagePipeEndpoint())));
276 scoped_refptr<MessagePipe> mp3(new MessagePipe(
277 scoped_ptr<MessagePipeEndpoint>(new ProxyMessagePipeEndpoint()),
278 scoped_ptr<MessagePipeEndpoint>(new LocalMessagePipeEndpoint())));
279 ConnectMessagePipes(mp2, mp3);
281 // Write: MP 2, port 0 -> MP 3, port 1.
283 waiter.Init();
284 EXPECT_EQ(MOJO_RESULT_OK,
285 mp3->AddWaiter(1, &waiter, MOJO_WAIT_FLAG_READABLE, 789));
287 EXPECT_EQ(MOJO_RESULT_OK,
288 mp2->WriteMessage(0,
289 hello, sizeof(hello),
290 NULL,
291 MOJO_WRITE_MESSAGE_FLAG_NONE));
293 EXPECT_EQ(789, waiter.Wait(MOJO_DEADLINE_INDEFINITE));
294 mp3->RemoveWaiter(1, &waiter);
296 // Make sure there's nothing on MP 0, port 0 or MP 1, port 1 or MP 2, port 0.
297 buffer_size = static_cast<uint32_t>(sizeof(buffer));
298 EXPECT_EQ(MOJO_RESULT_SHOULD_WAIT,
299 mp0->ReadMessage(0,
300 buffer, &buffer_size,
301 NULL, NULL,
302 MOJO_READ_MESSAGE_FLAG_NONE));
303 buffer_size = static_cast<uint32_t>(sizeof(buffer));
304 EXPECT_EQ(MOJO_RESULT_SHOULD_WAIT,
305 mp1->ReadMessage(1,
306 buffer, &buffer_size,
307 NULL, NULL,
308 MOJO_READ_MESSAGE_FLAG_NONE));
309 buffer_size = static_cast<uint32_t>(sizeof(buffer));
310 EXPECT_EQ(MOJO_RESULT_SHOULD_WAIT,
311 mp2->ReadMessage(0,
312 buffer, &buffer_size,
313 NULL, NULL,
314 MOJO_READ_MESSAGE_FLAG_NONE));
316 // Read from MP 3, port 1.
317 buffer_size = static_cast<uint32_t>(sizeof(buffer));
318 EXPECT_EQ(MOJO_RESULT_OK,
319 mp3->ReadMessage(1,
320 buffer, &buffer_size,
321 NULL, NULL,
322 MOJO_READ_MESSAGE_FLAG_NONE));
323 EXPECT_EQ(sizeof(hello), static_cast<size_t>(buffer_size));
324 EXPECT_STREQ(hello, buffer);
326 // Write: MP 0, port 0 -> MP 1, port 1 again.
328 waiter.Init();
329 EXPECT_EQ(MOJO_RESULT_OK,
330 mp1->AddWaiter(1, &waiter, MOJO_WAIT_FLAG_READABLE, 123));
332 EXPECT_EQ(MOJO_RESULT_OK,
333 mp0->WriteMessage(0,
334 world, sizeof(world),
335 NULL,
336 MOJO_WRITE_MESSAGE_FLAG_NONE));
338 EXPECT_EQ(123, waiter.Wait(MOJO_DEADLINE_INDEFINITE));
339 mp1->RemoveWaiter(1, &waiter);
341 // Make sure there's nothing on the other ports.
342 buffer_size = static_cast<uint32_t>(sizeof(buffer));
343 EXPECT_EQ(MOJO_RESULT_SHOULD_WAIT,
344 mp0->ReadMessage(0,
345 buffer, &buffer_size,
346 NULL, NULL,
347 MOJO_READ_MESSAGE_FLAG_NONE));
348 buffer_size = static_cast<uint32_t>(sizeof(buffer));
349 EXPECT_EQ(MOJO_RESULT_SHOULD_WAIT,
350 mp2->ReadMessage(0,
351 buffer, &buffer_size,
352 NULL, NULL,
353 MOJO_READ_MESSAGE_FLAG_NONE));
354 buffer_size = static_cast<uint32_t>(sizeof(buffer));
355 EXPECT_EQ(MOJO_RESULT_SHOULD_WAIT,
356 mp3->ReadMessage(1,
357 buffer, &buffer_size,
358 NULL, NULL,
359 MOJO_READ_MESSAGE_FLAG_NONE));
361 buffer_size = static_cast<uint32_t>(sizeof(buffer));
362 EXPECT_EQ(MOJO_RESULT_OK,
363 mp1->ReadMessage(1,
364 buffer, &buffer_size,
365 NULL, NULL,
366 MOJO_READ_MESSAGE_FLAG_NONE));
367 EXPECT_EQ(sizeof(world), static_cast<size_t>(buffer_size));
368 EXPECT_STREQ(world, buffer);
370 mp0->Close(0);
371 mp1->Close(1);
372 mp2->Close(0);
373 mp3->Close(1);
376 TEST_F(RemoteMessagePipeTest, CloseBeforeConnect) {
377 const char hello[] = "hello";
378 char buffer[100] = { 0 };
379 uint32_t buffer_size = static_cast<uint32_t>(sizeof(buffer));
380 Waiter waiter;
382 // Connect message pipes. MP 0, port 1 will be attached to channel 0 and
383 // connected to MP 1, port 0, which will be attached to channel 1. This leaves
384 // MP 0, port 0 and MP 1, port 1 as the "user-facing" endpoints.
386 scoped_refptr<MessagePipe> mp0(new MessagePipe(
387 scoped_ptr<MessagePipeEndpoint>(new LocalMessagePipeEndpoint()),
388 scoped_ptr<MessagePipeEndpoint>(new ProxyMessagePipeEndpoint())));
390 // Write to MP 0, port 0.
391 EXPECT_EQ(MOJO_RESULT_OK,
392 mp0->WriteMessage(0,
393 hello, sizeof(hello),
394 NULL,
395 MOJO_WRITE_MESSAGE_FLAG_NONE));
397 BootstrapMessagePipeNoWait(0, mp0);
400 // Close MP 0, port 0 before channel 1 is even connected.
401 mp0->Close(0);
403 scoped_refptr<MessagePipe> mp1(new MessagePipe(
404 scoped_ptr<MessagePipeEndpoint>(new ProxyMessagePipeEndpoint()),
405 scoped_ptr<MessagePipeEndpoint>(new LocalMessagePipeEndpoint())));
407 // Prepare to wait on MP 1, port 1. (Add the waiter now. Otherwise, if we do
408 // it later, it might already be readable.)
409 waiter.Init();
410 EXPECT_EQ(MOJO_RESULT_OK,
411 mp1->AddWaiter(1, &waiter, MOJO_WAIT_FLAG_READABLE, 123));
413 BootstrapMessagePipeNoWait(1, mp1);
415 // Wait.
416 EXPECT_EQ(123, waiter.Wait(MOJO_DEADLINE_INDEFINITE));
417 mp1->RemoveWaiter(1, &waiter);
419 // Read from MP 1, port 1.
420 EXPECT_EQ(MOJO_RESULT_OK,
421 mp1->ReadMessage(1,
422 buffer, &buffer_size,
423 NULL, NULL,
424 MOJO_READ_MESSAGE_FLAG_NONE));
425 EXPECT_EQ(sizeof(hello), static_cast<size_t>(buffer_size));
426 EXPECT_STREQ(hello, buffer);
428 // And MP 1, port 1.
429 mp1->Close(1);
432 // TODO(vtl): Handle-passing isn't actually implemented yet. For now, this tests
433 // things leading up to it.
434 TEST_F(RemoteMessagePipeTest, HandlePassing) {
435 const char hello[] = "hello";
436 Waiter waiter;
438 scoped_refptr<MessagePipe> mp0(new MessagePipe(
439 scoped_ptr<MessagePipeEndpoint>(new LocalMessagePipeEndpoint()),
440 scoped_ptr<MessagePipeEndpoint>(new ProxyMessagePipeEndpoint())));
441 scoped_refptr<MessagePipe> mp1(new MessagePipe(
442 scoped_ptr<MessagePipeEndpoint>(new ProxyMessagePipeEndpoint()),
443 scoped_ptr<MessagePipeEndpoint>(new LocalMessagePipeEndpoint())));
444 ConnectMessagePipes(mp0, mp1);
446 // We'll try to pass this dispatcher.
447 scoped_refptr<MessagePipeDispatcher> dispatcher(new MessagePipeDispatcher());
448 scoped_refptr<MessagePipe> local_mp(new MessagePipe());
449 dispatcher->Init(local_mp, 0);
451 // Prepare to wait on MP 1, port 1. (Add the waiter now. Otherwise, if we do
452 // it later, it might already be readable.)
453 waiter.Init();
454 EXPECT_EQ(MOJO_RESULT_OK,
455 mp1->AddWaiter(1, &waiter, MOJO_WAIT_FLAG_READABLE, 123));
457 // Write to MP 0, port 0.
459 DispatcherTransport
460 transport(test::DispatcherTryStartTransport(dispatcher.get()));
461 EXPECT_TRUE(transport.is_valid());
463 std::vector<DispatcherTransport> transports;
464 transports.push_back(transport);
465 EXPECT_EQ(MOJO_RESULT_OK,
466 mp0->WriteMessage(0, hello, sizeof(hello), &transports,
467 MOJO_WRITE_MESSAGE_FLAG_NONE));
468 transport.End();
470 // |dispatcher| should have been closed. This is |DCHECK()|ed when the
471 // |dispatcher| is destroyed.
472 EXPECT_TRUE(dispatcher->HasOneRef());
473 dispatcher = NULL;
476 // Wait.
477 EXPECT_EQ(123, waiter.Wait(MOJO_DEADLINE_INDEFINITE));
478 mp1->RemoveWaiter(1, &waiter);
480 // Read from MP 1, port 1.
481 char read_buffer[100] = { 0 };
482 uint32_t read_buffer_size = static_cast<uint32_t>(sizeof(read_buffer));
483 std::vector<scoped_refptr<Dispatcher> > read_dispatchers;
484 uint32_t read_num_dispatchers = 10; // Maximum to get.
485 EXPECT_EQ(MOJO_RESULT_OK,
486 mp1->ReadMessage(1, read_buffer, &read_buffer_size,
487 &read_dispatchers, &read_num_dispatchers,
488 MOJO_READ_MESSAGE_FLAG_NONE));
489 EXPECT_EQ(sizeof(hello), static_cast<size_t>(read_buffer_size));
490 EXPECT_STREQ(hello, read_buffer);
491 EXPECT_EQ(1u, read_dispatchers.size());
492 EXPECT_EQ(1u, read_num_dispatchers);
493 ASSERT_TRUE(read_dispatchers[0].get());
494 EXPECT_TRUE(read_dispatchers[0]->HasOneRef());
496 EXPECT_EQ(Dispatcher::kTypeMessagePipe, read_dispatchers[0]->GetType());
497 dispatcher = static_cast<MessagePipeDispatcher*>(read_dispatchers[0].get());
499 // Write to "local_mp", port 1.
500 EXPECT_EQ(MOJO_RESULT_OK,
501 local_mp->WriteMessage(1, hello, sizeof(hello), NULL,
502 MOJO_WRITE_MESSAGE_FLAG_NONE));
504 // TODO(vtl): FIXME -- We (racily) crash if I close |dispatcher| immediately
505 // here. (We don't crash if I sleep and then close.)
507 // Wait for the dispatcher to become readable.
508 waiter.Init();
509 EXPECT_EQ(MOJO_RESULT_OK,
510 dispatcher->AddWaiter(&waiter, MOJO_WAIT_FLAG_READABLE, 456));
511 EXPECT_EQ(456, waiter.Wait(MOJO_DEADLINE_INDEFINITE));
512 dispatcher->RemoveWaiter(&waiter);
514 // Read from the dispatcher.
515 memset(read_buffer, 0, sizeof(read_buffer));
516 read_buffer_size = static_cast<uint32_t>(sizeof(read_buffer));
517 EXPECT_EQ(MOJO_RESULT_OK,
518 dispatcher->ReadMessage(read_buffer, &read_buffer_size, 0, NULL,
519 MOJO_READ_MESSAGE_FLAG_NONE));
520 EXPECT_EQ(sizeof(hello), static_cast<size_t>(read_buffer_size));
521 EXPECT_STREQ(hello, read_buffer);
523 // Prepare to wait on "local_mp", port 1.
524 waiter.Init();
525 EXPECT_EQ(MOJO_RESULT_OK,
526 local_mp->AddWaiter(1, &waiter, MOJO_WAIT_FLAG_READABLE, 789));
528 // Write to the dispatcher.
529 EXPECT_EQ(MOJO_RESULT_OK,
530 dispatcher->WriteMessage(hello, sizeof(hello), NULL,
531 MOJO_WRITE_MESSAGE_FLAG_NONE));
533 // Wait.
534 EXPECT_EQ(789, waiter.Wait(MOJO_DEADLINE_INDEFINITE));
535 local_mp->RemoveWaiter(1, &waiter);
537 // Read from "local_mp", port 1.
538 memset(read_buffer, 0, sizeof(read_buffer));
539 read_buffer_size = static_cast<uint32_t>(sizeof(read_buffer));
540 EXPECT_EQ(MOJO_RESULT_OK,
541 local_mp->ReadMessage(1, read_buffer, &read_buffer_size, NULL, NULL,
542 MOJO_READ_MESSAGE_FLAG_NONE));
543 EXPECT_EQ(sizeof(hello), static_cast<size_t>(read_buffer_size));
544 EXPECT_STREQ(hello, read_buffer);
546 // TODO(vtl): Also test that messages queued up before the handle was sent are
547 // delivered properly.
549 // Close everything that belongs to us.
550 mp0->Close(0);
551 mp1->Close(1);
552 EXPECT_EQ(MOJO_RESULT_OK, dispatcher->Close());
553 // Note that |local_mp|'s port 0 belong to |dispatcher|, which was closed.
554 local_mp->Close(1);
557 // Test racing closes (on each end).
558 // Note: A flaky failure would almost certainly indicate a problem in the code
559 // itself (not in the test). Also, any logged warnings/errors would also
560 // probably be indicative of bugs.
561 TEST_F(RemoteMessagePipeTest, RacingClosesStress) {
562 base::TimeDelta delay = base::TimeDelta::FromMilliseconds(5);
564 for (unsigned i = 0u; i < 256u; i++) {
565 scoped_refptr<MessagePipe> mp0(new MessagePipe(
566 scoped_ptr<MessagePipeEndpoint>(new LocalMessagePipeEndpoint()),
567 scoped_ptr<MessagePipeEndpoint>(new ProxyMessagePipeEndpoint())));
568 BootstrapMessagePipeNoWait(0, mp0);
570 scoped_refptr<MessagePipe> mp1(new MessagePipe(
571 scoped_ptr<MessagePipeEndpoint>(new ProxyMessagePipeEndpoint()),
572 scoped_ptr<MessagePipeEndpoint>(new LocalMessagePipeEndpoint())));
573 BootstrapMessagePipeNoWait(1, mp1);
575 if (i & 1u) {
576 io_thread()->task_runner()->PostTask(
577 FROM_HERE, base::Bind(&base::PlatformThread::Sleep, delay));
579 if (i & 2u)
580 base::PlatformThread::Sleep(delay);
582 mp0->Close(0);
584 if (i & 4u) {
585 io_thread()->task_runner()->PostTask(
586 FROM_HERE, base::Bind(&base::PlatformThread::Sleep, delay));
588 if (i & 8u)
589 base::PlatformThread::Sleep(delay);
591 mp1->Close(1);
593 RestoreInitialState();
597 } // namespace
598 } // namespace system
599 } // namespace mojo