1 // Copyright 2013 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
5 // NOTE(vtl): Some of these tests are inherently flaky (e.g., if run on a
6 // heavily-loaded system). Sorry. |test::EpsilonTimeout()| may be increased to
7 // increase tolerance and reduce observed flakiness (though doing so reduces the
8 // meaningfulness of the test).
10 #include "mojo/system/message_pipe_dispatcher.h"
16 #include "base/memory/ref_counted.h"
17 #include "base/memory/scoped_vector.h"
18 #include "base/rand_util.h"
19 #include "base/threading/platform_thread.h" // For |Sleep()|.
20 #include "base/threading/simple_thread.h"
21 #include "base/time/time.h"
22 #include "mojo/system/message_pipe.h"
23 #include "mojo/system/test_utils.h"
24 #include "mojo/system/waiter.h"
25 #include "mojo/system/waiter_test_utils.h"
26 #include "testing/gtest/include/gtest/gtest.h"
32 TEST(MessagePipeDispatcherTest
, Basic
) {
33 test::Stopwatch stopwatch
;
35 const uint32_t kBufferSize
= static_cast<uint32_t>(sizeof(buffer
));
38 // Run this test both with |d0| as port 0, |d1| as port 1 and vice versa.
39 for (unsigned i
= 0; i
< 2; i
++) {
40 scoped_refptr
<MessagePipeDispatcher
> d0(new MessagePipeDispatcher(
41 MessagePipeDispatcher::kDefaultCreateOptions
));
42 EXPECT_EQ(Dispatcher::kTypeMessagePipe
, d0
->GetType());
43 scoped_refptr
<MessagePipeDispatcher
> d1(new MessagePipeDispatcher(
44 MessagePipeDispatcher::kDefaultCreateOptions
));
46 scoped_refptr
<MessagePipe
> mp(MessagePipe::CreateLocalLocal());
47 d0
->Init(mp
, i
); // 0, 1.
48 d1
->Init(mp
, i
^ 1); // 1, 0.
52 HandleSignalsState hss
;
54 // Try adding a writable waiter when already writable.
56 hss
= HandleSignalsState();
57 EXPECT_EQ(MOJO_RESULT_ALREADY_EXISTS
,
58 d0
->AddWaiter(&w
, MOJO_HANDLE_SIGNAL_WRITABLE
, 0, &hss
));
59 EXPECT_EQ(MOJO_HANDLE_SIGNAL_WRITABLE
, hss
.satisfied_signals
);
60 EXPECT_EQ(MOJO_HANDLE_SIGNAL_READABLE
| MOJO_HANDLE_SIGNAL_WRITABLE
,
61 hss
.satisfiable_signals
);
62 // Shouldn't need to remove the waiter (it was not added).
64 // Add a readable waiter to |d0|, then make it readable (by writing to
67 ASSERT_EQ(MOJO_RESULT_OK
,
68 d0
->AddWaiter(&w
, MOJO_HANDLE_SIGNAL_READABLE
, 1, NULL
));
69 buffer
[0] = 123456789;
70 EXPECT_EQ(MOJO_RESULT_OK
,
71 d1
->WriteMessage(UserPointer
<const void>(buffer
),
74 MOJO_WRITE_MESSAGE_FLAG_NONE
));
76 EXPECT_EQ(MOJO_RESULT_OK
, w
.Wait(MOJO_DEADLINE_INDEFINITE
, &context
));
77 EXPECT_EQ(1u, context
);
78 EXPECT_LT(stopwatch
.Elapsed(), test::EpsilonTimeout());
79 hss
= HandleSignalsState();
80 d0
->RemoveWaiter(&w
, &hss
);
81 EXPECT_EQ(MOJO_HANDLE_SIGNAL_READABLE
| MOJO_HANDLE_SIGNAL_WRITABLE
,
82 hss
.satisfied_signals
);
83 EXPECT_EQ(MOJO_HANDLE_SIGNAL_READABLE
| MOJO_HANDLE_SIGNAL_WRITABLE
,
84 hss
.satisfiable_signals
);
86 // Try adding a readable waiter when already readable (from above).
88 hss
= HandleSignalsState();
89 EXPECT_EQ(MOJO_RESULT_ALREADY_EXISTS
,
90 d0
->AddWaiter(&w
, MOJO_HANDLE_SIGNAL_READABLE
, 2, &hss
));
91 EXPECT_EQ(MOJO_HANDLE_SIGNAL_READABLE
| MOJO_HANDLE_SIGNAL_WRITABLE
,
92 hss
.satisfied_signals
);
93 EXPECT_EQ(MOJO_HANDLE_SIGNAL_READABLE
| MOJO_HANDLE_SIGNAL_WRITABLE
,
94 hss
.satisfiable_signals
);
95 // Shouldn't need to remove the waiter (it was not added).
97 // Make |d0| no longer readable (by reading from it).
99 buffer_size
= kBufferSize
;
100 EXPECT_EQ(MOJO_RESULT_OK
,
101 d0
->ReadMessage(UserPointer
<void>(buffer
),
102 MakeUserPointer(&buffer_size
),
105 MOJO_READ_MESSAGE_FLAG_NONE
));
106 EXPECT_EQ(kBufferSize
, buffer_size
);
107 EXPECT_EQ(123456789, buffer
[0]);
109 // Wait for zero time for readability on |d0| (will time out).
111 ASSERT_EQ(MOJO_RESULT_OK
,
112 d0
->AddWaiter(&w
, MOJO_HANDLE_SIGNAL_READABLE
, 3, NULL
));
114 EXPECT_EQ(MOJO_RESULT_DEADLINE_EXCEEDED
, w
.Wait(0, NULL
));
115 EXPECT_LT(stopwatch
.Elapsed(), test::EpsilonTimeout());
116 hss
= HandleSignalsState();
117 d0
->RemoveWaiter(&w
, &hss
);
118 EXPECT_EQ(MOJO_HANDLE_SIGNAL_WRITABLE
, hss
.satisfied_signals
);
119 EXPECT_EQ(MOJO_HANDLE_SIGNAL_READABLE
| MOJO_HANDLE_SIGNAL_WRITABLE
,
120 hss
.satisfiable_signals
);
122 // Wait for non-zero, finite time for readability on |d0| (will time out).
124 ASSERT_EQ(MOJO_RESULT_OK
,
125 d0
->AddWaiter(&w
, MOJO_HANDLE_SIGNAL_READABLE
, 3, NULL
));
127 EXPECT_EQ(MOJO_RESULT_DEADLINE_EXCEEDED
,
128 w
.Wait(2 * test::EpsilonTimeout().InMicroseconds(), NULL
));
129 base::TimeDelta elapsed
= stopwatch
.Elapsed();
130 EXPECT_GT(elapsed
, (2 - 1) * test::EpsilonTimeout());
131 EXPECT_LT(elapsed
, (2 + 1) * test::EpsilonTimeout());
132 hss
= HandleSignalsState();
133 d0
->RemoveWaiter(&w
, &hss
);
134 EXPECT_EQ(MOJO_HANDLE_SIGNAL_WRITABLE
, hss
.satisfied_signals
);
135 EXPECT_EQ(MOJO_HANDLE_SIGNAL_READABLE
| MOJO_HANDLE_SIGNAL_WRITABLE
,
136 hss
.satisfiable_signals
);
138 EXPECT_EQ(MOJO_RESULT_OK
, d0
->Close());
139 EXPECT_EQ(MOJO_RESULT_OK
, d1
->Close());
143 TEST(MessagePipeDispatcherTest
, InvalidParams
) {
146 scoped_refptr
<MessagePipeDispatcher
> d0(
147 new MessagePipeDispatcher(MessagePipeDispatcher::kDefaultCreateOptions
));
148 scoped_refptr
<MessagePipeDispatcher
> d1(
149 new MessagePipeDispatcher(MessagePipeDispatcher::kDefaultCreateOptions
));
151 scoped_refptr
<MessagePipe
> mp(MessagePipe::CreateLocalLocal());
158 EXPECT_EQ(MOJO_RESULT_RESOURCE_EXHAUSTED
,
159 d0
->WriteMessage(UserPointer
<const void>(buffer
),
160 std::numeric_limits
<uint32_t>::max(),
162 MOJO_WRITE_MESSAGE_FLAG_NONE
));
164 EXPECT_EQ(MOJO_RESULT_OK
, d0
->Close());
165 EXPECT_EQ(MOJO_RESULT_OK
, d1
->Close());
168 // These test invalid arguments that should cause death if we're being paranoid
169 // about checking arguments (which we would want to do if, e.g., we were in a
170 // true "kernel" situation, but we might not want to do otherwise for
171 // performance reasons). Probably blatant errors like passing in null pointers
172 // (for required pointer arguments) will still cause death, but perhaps not
174 TEST(MessagePipeDispatcherTest
, InvalidParamsDeath
) {
175 const char kMemoryCheckFailedRegex
[] = "Check failed";
177 scoped_refptr
<MessagePipeDispatcher
> d0(
178 new MessagePipeDispatcher(MessagePipeDispatcher::kDefaultCreateOptions
));
179 scoped_refptr
<MessagePipeDispatcher
> d1(
180 new MessagePipeDispatcher(MessagePipeDispatcher::kDefaultCreateOptions
));
182 scoped_refptr
<MessagePipe
> mp(MessagePipe::CreateLocalLocal());
188 // Null buffer with nonzero buffer size.
189 EXPECT_DEATH_IF_SUPPORTED(
191 NullUserPointer(), 1, NULL
, MOJO_WRITE_MESSAGE_FLAG_NONE
),
192 kMemoryCheckFailedRegex
);
195 // Null buffer with nonzero buffer size.
196 // First write something so that we actually have something to read.
200 UserPointer
<const void>("x"), 1, NULL
, MOJO_WRITE_MESSAGE_FLAG_NONE
));
201 uint32_t buffer_size
= 1;
202 EXPECT_DEATH_IF_SUPPORTED(d0
->ReadMessage(NullUserPointer(),
203 MakeUserPointer(&buffer_size
),
206 MOJO_READ_MESSAGE_FLAG_NONE
),
207 kMemoryCheckFailedRegex
);
209 EXPECT_EQ(MOJO_RESULT_OK
, d0
->Close());
210 EXPECT_EQ(MOJO_RESULT_OK
, d1
->Close());
213 // Test what happens when one end is closed (single-threaded test).
214 TEST(MessagePipeDispatcherTest
, BasicClosed
) {
216 const uint32_t kBufferSize
= static_cast<uint32_t>(sizeof(buffer
));
217 uint32_t buffer_size
;
219 // Run this test both with |d0| as port 0, |d1| as port 1 and vice versa.
220 for (unsigned i
= 0; i
< 2; i
++) {
221 scoped_refptr
<MessagePipeDispatcher
> d0(new MessagePipeDispatcher(
222 MessagePipeDispatcher::kDefaultCreateOptions
));
223 scoped_refptr
<MessagePipeDispatcher
> d1(new MessagePipeDispatcher(
224 MessagePipeDispatcher::kDefaultCreateOptions
));
226 scoped_refptr
<MessagePipe
> mp(MessagePipe::CreateLocalLocal());
227 d0
->Init(mp
, i
); // 0, 1.
228 d1
->Init(mp
, i
^ 1); // 1, 0.
231 HandleSignalsState hss
;
233 // Write (twice) to |d1|.
234 buffer
[0] = 123456789;
235 EXPECT_EQ(MOJO_RESULT_OK
,
236 d1
->WriteMessage(UserPointer
<const void>(buffer
),
239 MOJO_WRITE_MESSAGE_FLAG_NONE
));
240 buffer
[0] = 234567890;
241 EXPECT_EQ(MOJO_RESULT_OK
,
242 d1
->WriteMessage(UserPointer
<const void>(buffer
),
245 MOJO_WRITE_MESSAGE_FLAG_NONE
));
247 // Try waiting for readable on |d0|; should fail (already satisfied).
249 hss
= HandleSignalsState();
250 EXPECT_EQ(MOJO_RESULT_ALREADY_EXISTS
,
251 d0
->AddWaiter(&w
, MOJO_HANDLE_SIGNAL_READABLE
, 0, &hss
));
252 EXPECT_EQ(MOJO_HANDLE_SIGNAL_READABLE
| MOJO_HANDLE_SIGNAL_WRITABLE
,
253 hss
.satisfied_signals
);
254 EXPECT_EQ(MOJO_HANDLE_SIGNAL_READABLE
| MOJO_HANDLE_SIGNAL_WRITABLE
,
255 hss
.satisfiable_signals
);
257 // Try reading from |d1|; should fail (nothing to read).
259 buffer_size
= kBufferSize
;
260 EXPECT_EQ(MOJO_RESULT_SHOULD_WAIT
,
261 d1
->ReadMessage(UserPointer
<void>(buffer
),
262 MakeUserPointer(&buffer_size
),
265 MOJO_READ_MESSAGE_FLAG_NONE
));
268 EXPECT_EQ(MOJO_RESULT_OK
, d1
->Close());
270 // Try waiting for readable on |d0|; should fail (already satisfied).
272 hss
= HandleSignalsState();
273 EXPECT_EQ(MOJO_RESULT_ALREADY_EXISTS
,
274 d0
->AddWaiter(&w
, MOJO_HANDLE_SIGNAL_READABLE
, 1, &hss
));
275 EXPECT_EQ(MOJO_HANDLE_SIGNAL_READABLE
, hss
.satisfied_signals
);
276 EXPECT_EQ(MOJO_HANDLE_SIGNAL_READABLE
, hss
.satisfiable_signals
);
280 buffer_size
= kBufferSize
;
281 EXPECT_EQ(MOJO_RESULT_OK
,
282 d0
->ReadMessage(UserPointer
<void>(buffer
),
283 MakeUserPointer(&buffer_size
),
286 MOJO_READ_MESSAGE_FLAG_NONE
));
287 EXPECT_EQ(kBufferSize
, buffer_size
);
288 EXPECT_EQ(123456789, buffer
[0]);
290 // Try waiting for readable on |d0|; should fail (already satisfied).
292 hss
= HandleSignalsState();
293 EXPECT_EQ(MOJO_RESULT_ALREADY_EXISTS
,
294 d0
->AddWaiter(&w
, MOJO_HANDLE_SIGNAL_READABLE
, 2, &hss
));
295 EXPECT_EQ(MOJO_HANDLE_SIGNAL_READABLE
, hss
.satisfied_signals
);
296 EXPECT_EQ(MOJO_HANDLE_SIGNAL_READABLE
, hss
.satisfiable_signals
);
298 // Read again from |d0|.
300 buffer_size
= kBufferSize
;
301 EXPECT_EQ(MOJO_RESULT_OK
,
302 d0
->ReadMessage(UserPointer
<void>(buffer
),
303 MakeUserPointer(&buffer_size
),
306 MOJO_READ_MESSAGE_FLAG_NONE
));
307 EXPECT_EQ(kBufferSize
, buffer_size
);
308 EXPECT_EQ(234567890, buffer
[0]);
310 // Try waiting for readable on |d0|; should fail (unsatisfiable).
312 hss
= HandleSignalsState();
313 EXPECT_EQ(MOJO_RESULT_FAILED_PRECONDITION
,
314 d0
->AddWaiter(&w
, MOJO_HANDLE_SIGNAL_READABLE
, 3, &hss
));
315 EXPECT_EQ(0u, hss
.satisfied_signals
);
316 EXPECT_EQ(0u, hss
.satisfiable_signals
);
318 // Try waiting for writable on |d0|; should fail (unsatisfiable).
320 hss
= HandleSignalsState();
321 EXPECT_EQ(MOJO_RESULT_FAILED_PRECONDITION
,
322 d0
->AddWaiter(&w
, MOJO_HANDLE_SIGNAL_WRITABLE
, 4, &hss
));
323 EXPECT_EQ(0u, hss
.satisfied_signals
);
324 EXPECT_EQ(0u, hss
.satisfiable_signals
);
326 // Try reading from |d0|; should fail (nothing to read and other end
329 buffer_size
= kBufferSize
;
330 EXPECT_EQ(MOJO_RESULT_FAILED_PRECONDITION
,
331 d0
->ReadMessage(UserPointer
<void>(buffer
),
332 MakeUserPointer(&buffer_size
),
335 MOJO_READ_MESSAGE_FLAG_NONE
));
337 // Try writing to |d0|; should fail (other end closed).
338 buffer
[0] = 345678901;
339 EXPECT_EQ(MOJO_RESULT_FAILED_PRECONDITION
,
340 d0
->WriteMessage(UserPointer
<const void>(buffer
),
343 MOJO_WRITE_MESSAGE_FLAG_NONE
));
345 EXPECT_EQ(MOJO_RESULT_OK
, d0
->Close());
350 // http://crbug.com/396386
351 #define MAYBE_BasicThreaded DISABLED_BasicThreaded
353 #define MAYBE_BasicThreaded BasicThreaded
355 TEST(MessagePipeDispatcherTest
, MAYBE_BasicThreaded
) {
356 test::Stopwatch stopwatch
;
358 const uint32_t kBufferSize
= static_cast<uint32_t>(sizeof(buffer
));
359 uint32_t buffer_size
;
360 base::TimeDelta elapsed
;
364 HandleSignalsState hss
;
366 // Run this test both with |d0| as port 0, |d1| as port 1 and vice versa.
367 for (unsigned i
= 0; i
< 2; i
++) {
368 scoped_refptr
<MessagePipeDispatcher
> d0(new MessagePipeDispatcher(
369 MessagePipeDispatcher::kDefaultCreateOptions
));
370 scoped_refptr
<MessagePipeDispatcher
> d1(new MessagePipeDispatcher(
371 MessagePipeDispatcher::kDefaultCreateOptions
));
373 scoped_refptr
<MessagePipe
> mp(MessagePipe::CreateLocalLocal());
374 d0
->Init(mp
, i
); // 0, 1.
375 d1
->Init(mp
, i
^ 1); // 1, 0.
378 // Wait for readable on |d1|, which will become readable after some time.
380 test::WaiterThread
thread(d1
,
381 MOJO_HANDLE_SIGNAL_READABLE
,
382 MOJO_DEADLINE_INDEFINITE
,
390 base::PlatformThread::Sleep(2 * test::EpsilonTimeout());
391 // Wake it up by writing to |d0|.
392 buffer
[0] = 123456789;
393 EXPECT_EQ(MOJO_RESULT_OK
,
394 d0
->WriteMessage(UserPointer
<const void>(buffer
),
397 MOJO_WRITE_MESSAGE_FLAG_NONE
));
398 } // Joins the thread.
399 elapsed
= stopwatch
.Elapsed();
400 EXPECT_GT(elapsed
, (2 - 1) * test::EpsilonTimeout());
401 EXPECT_LT(elapsed
, (2 + 1) * test::EpsilonTimeout());
402 EXPECT_TRUE(did_wait
);
403 EXPECT_EQ(MOJO_RESULT_OK
, result
);
404 EXPECT_EQ(1u, context
);
405 EXPECT_EQ(MOJO_HANDLE_SIGNAL_READABLE
| MOJO_HANDLE_SIGNAL_WRITABLE
,
406 hss
.satisfied_signals
);
407 EXPECT_EQ(MOJO_HANDLE_SIGNAL_READABLE
| MOJO_HANDLE_SIGNAL_WRITABLE
,
408 hss
.satisfiable_signals
);
410 // Now |d1| is already readable. Try waiting for it again.
412 test::WaiterThread
thread(d1
,
413 MOJO_HANDLE_SIGNAL_READABLE
,
414 MOJO_DEADLINE_INDEFINITE
,
422 } // Joins the thread.
423 EXPECT_LT(stopwatch
.Elapsed(), test::EpsilonTimeout());
424 EXPECT_FALSE(did_wait
);
425 EXPECT_EQ(MOJO_RESULT_ALREADY_EXISTS
, result
);
426 EXPECT_EQ(MOJO_HANDLE_SIGNAL_READABLE
| MOJO_HANDLE_SIGNAL_WRITABLE
,
427 hss
.satisfied_signals
);
428 EXPECT_EQ(MOJO_HANDLE_SIGNAL_READABLE
| MOJO_HANDLE_SIGNAL_WRITABLE
,
429 hss
.satisfiable_signals
);
431 // Consume what we wrote to |d0|.
433 buffer_size
= kBufferSize
;
434 EXPECT_EQ(MOJO_RESULT_OK
,
435 d1
->ReadMessage(UserPointer
<void>(buffer
),
436 MakeUserPointer(&buffer_size
),
439 MOJO_READ_MESSAGE_FLAG_NONE
));
440 EXPECT_EQ(kBufferSize
, buffer_size
);
441 EXPECT_EQ(123456789, buffer
[0]);
443 // Wait for readable on |d1| and close |d0| after some time, which should
446 test::WaiterThread
thread(d1
,
447 MOJO_HANDLE_SIGNAL_READABLE
,
448 MOJO_DEADLINE_INDEFINITE
,
456 base::PlatformThread::Sleep(2 * test::EpsilonTimeout());
457 EXPECT_EQ(MOJO_RESULT_OK
, d0
->Close());
458 } // Joins the thread.
459 elapsed
= stopwatch
.Elapsed();
460 EXPECT_GT(elapsed
, (2 - 1) * test::EpsilonTimeout());
461 EXPECT_LT(elapsed
, (2 + 1) * test::EpsilonTimeout());
462 EXPECT_TRUE(did_wait
);
463 EXPECT_EQ(MOJO_RESULT_FAILED_PRECONDITION
, result
);
464 EXPECT_EQ(3u, context
);
465 EXPECT_EQ(0u, hss
.satisfied_signals
);
466 EXPECT_EQ(0u, hss
.satisfiable_signals
);
468 EXPECT_EQ(MOJO_RESULT_OK
, d1
->Close());
471 for (unsigned i
= 0; i
< 2; i
++) {
472 scoped_refptr
<MessagePipeDispatcher
> d0(new MessagePipeDispatcher(
473 MessagePipeDispatcher::kDefaultCreateOptions
));
474 scoped_refptr
<MessagePipeDispatcher
> d1(new MessagePipeDispatcher(
475 MessagePipeDispatcher::kDefaultCreateOptions
));
477 scoped_refptr
<MessagePipe
> mp(MessagePipe::CreateLocalLocal());
478 d0
->Init(mp
, i
); // 0, 1.
479 d1
->Init(mp
, i
^ 1); // 1, 0.
482 // Wait for readable on |d1| and close |d1| after some time, which should
485 test::WaiterThread
thread(d1
,
486 MOJO_HANDLE_SIGNAL_READABLE
,
487 MOJO_DEADLINE_INDEFINITE
,
495 base::PlatformThread::Sleep(2 * test::EpsilonTimeout());
496 EXPECT_EQ(MOJO_RESULT_OK
, d1
->Close());
497 } // Joins the thread.
498 elapsed
= stopwatch
.Elapsed();
499 EXPECT_GT(elapsed
, (2 - 1) * test::EpsilonTimeout());
500 EXPECT_LT(elapsed
, (2 + 1) * test::EpsilonTimeout());
501 EXPECT_TRUE(did_wait
);
502 EXPECT_EQ(MOJO_RESULT_CANCELLED
, result
);
503 EXPECT_EQ(4u, context
);
504 EXPECT_EQ(0u, hss
.satisfied_signals
);
505 EXPECT_EQ(0u, hss
.satisfiable_signals
);
507 EXPECT_EQ(MOJO_RESULT_OK
, d0
->Close());
511 // Stress test -----------------------------------------------------------------
513 const size_t kMaxMessageSize
= 2000;
515 class WriterThread
: public base::SimpleThread
{
517 // |*messages_written| and |*bytes_written| belong to the thread while it's
519 WriterThread(scoped_refptr
<Dispatcher
> write_dispatcher
,
520 size_t* messages_written
,
521 size_t* bytes_written
)
522 : base::SimpleThread("writer_thread"),
523 write_dispatcher_(write_dispatcher
),
524 messages_written_(messages_written
),
525 bytes_written_(bytes_written
) {
526 *messages_written_
= 0;
530 virtual ~WriterThread() { Join(); }
533 virtual void Run() OVERRIDE
{
534 // Make some data to write.
535 unsigned char buffer
[kMaxMessageSize
];
536 for (size_t i
= 0; i
< kMaxMessageSize
; i
++)
537 buffer
[i
] = static_cast<unsigned char>(i
);
539 // Number of messages to write.
540 *messages_written_
= static_cast<size_t>(base::RandInt(1000, 6000));
543 for (size_t i
= 0; i
< *messages_written_
; i
++) {
544 uint32_t bytes_to_write
= static_cast<uint32_t>(
545 base::RandInt(1, static_cast<int>(kMaxMessageSize
)));
546 EXPECT_EQ(MOJO_RESULT_OK
,
547 write_dispatcher_
->WriteMessage(UserPointer
<const void>(buffer
),
550 MOJO_WRITE_MESSAGE_FLAG_NONE
));
551 *bytes_written_
+= bytes_to_write
;
554 // Write one last "quit" message.
555 EXPECT_EQ(MOJO_RESULT_OK
,
556 write_dispatcher_
->WriteMessage(UserPointer
<const void>("quit"),
559 MOJO_WRITE_MESSAGE_FLAG_NONE
));
562 const scoped_refptr
<Dispatcher
> write_dispatcher_
;
563 size_t* const messages_written_
;
564 size_t* const bytes_written_
;
566 DISALLOW_COPY_AND_ASSIGN(WriterThread
);
569 class ReaderThread
: public base::SimpleThread
{
571 // |*messages_read| and |*bytes_read| belong to the thread while it's alive.
572 ReaderThread(scoped_refptr
<Dispatcher
> read_dispatcher
,
573 size_t* messages_read
,
575 : base::SimpleThread("reader_thread"),
576 read_dispatcher_(read_dispatcher
),
577 messages_read_(messages_read
),
578 bytes_read_(bytes_read
) {
583 virtual ~ReaderThread() { Join(); }
586 virtual void Run() OVERRIDE
{
587 unsigned char buffer
[kMaxMessageSize
];
589 HandleSignalsState hss
;
594 // Wait for it to be readable.
596 hss
= HandleSignalsState();
598 read_dispatcher_
->AddWaiter(&w
, MOJO_HANDLE_SIGNAL_READABLE
, 0, &hss
);
599 EXPECT_TRUE(result
== MOJO_RESULT_OK
||
600 result
== MOJO_RESULT_ALREADY_EXISTS
)
601 << "result: " << result
;
602 if (result
== MOJO_RESULT_OK
) {
603 // Actually need to wait.
604 EXPECT_EQ(MOJO_RESULT_OK
, w
.Wait(MOJO_DEADLINE_INDEFINITE
, NULL
));
605 read_dispatcher_
->RemoveWaiter(&w
, &hss
);
607 // We may not actually be readable, since we're racing with other threads.
608 EXPECT_TRUE((hss
.satisfiable_signals
& MOJO_HANDLE_SIGNAL_READABLE
));
610 // Now, try to do the read.
611 // Clear the buffer so that we can check the result.
612 memset(buffer
, 0, sizeof(buffer
));
613 uint32_t buffer_size
= static_cast<uint32_t>(sizeof(buffer
));
614 result
= read_dispatcher_
->ReadMessage(UserPointer
<void>(buffer
),
615 MakeUserPointer(&buffer_size
),
618 MOJO_READ_MESSAGE_FLAG_NONE
);
619 EXPECT_TRUE(result
== MOJO_RESULT_OK
|| result
== MOJO_RESULT_SHOULD_WAIT
)
620 << "result: " << result
;
621 // We're racing with others to read, so maybe we failed.
622 if (result
== MOJO_RESULT_SHOULD_WAIT
)
623 continue; // In which case, try again.
625 if (buffer_size
== 4 && memcmp("quit", buffer
, 4) == 0)
627 EXPECT_GE(buffer_size
, 1u);
628 EXPECT_LE(buffer_size
, kMaxMessageSize
);
629 EXPECT_TRUE(IsValidMessage(buffer
, buffer_size
));
632 *bytes_read_
+= buffer_size
;
636 static bool IsValidMessage(const unsigned char* buffer
,
637 uint32_t message_size
) {
639 for (i
= 0; i
< message_size
; i
++) {
640 if (buffer
[i
] != static_cast<unsigned char>(i
))
643 // Check that the remaining bytes weren't stomped on.
644 for (; i
< kMaxMessageSize
; i
++) {
651 const scoped_refptr
<Dispatcher
> read_dispatcher_
;
652 size_t* const messages_read_
;
653 size_t* const bytes_read_
;
655 DISALLOW_COPY_AND_ASSIGN(ReaderThread
);
658 TEST(MessagePipeDispatcherTest
, Stress
) {
659 static const size_t kNumWriters
= 30;
660 static const size_t kNumReaders
= kNumWriters
;
662 scoped_refptr
<MessagePipeDispatcher
> d_write(
663 new MessagePipeDispatcher(MessagePipeDispatcher::kDefaultCreateOptions
));
664 scoped_refptr
<MessagePipeDispatcher
> d_read(
665 new MessagePipeDispatcher(MessagePipeDispatcher::kDefaultCreateOptions
));
667 scoped_refptr
<MessagePipe
> mp(MessagePipe::CreateLocalLocal());
668 d_write
->Init(mp
, 0);
672 size_t messages_written
[kNumWriters
];
673 size_t bytes_written
[kNumWriters
];
674 size_t messages_read
[kNumReaders
];
675 size_t bytes_read
[kNumReaders
];
678 ScopedVector
<WriterThread
> writers
;
679 for (size_t i
= 0; i
< kNumWriters
; i
++) {
681 new WriterThread(d_write
, &messages_written
[i
], &bytes_written
[i
]));
685 ScopedVector
<ReaderThread
> readers
;
686 for (size_t i
= 0; i
< kNumReaders
; i
++) {
688 new ReaderThread(d_read
, &messages_read
[i
], &bytes_read
[i
]));
692 for (size_t i
= 0; i
< kNumWriters
; i
++)
696 for (size_t i
= 0; i
< kNumReaders
; i
++)
699 // TODO(vtl): Maybe I should have an event that triggers all the threads to
700 // start doing stuff for real (so that the first ones created/started aren't
702 } // Joins all the threads.
704 size_t total_messages_written
= 0;
705 size_t total_bytes_written
= 0;
706 for (size_t i
= 0; i
< kNumWriters
; i
++) {
707 total_messages_written
+= messages_written
[i
];
708 total_bytes_written
+= bytes_written
[i
];
710 size_t total_messages_read
= 0;
711 size_t total_bytes_read
= 0;
712 for (size_t i
= 0; i
< kNumReaders
; i
++) {
713 total_messages_read
+= messages_read
[i
];
714 total_bytes_read
+= bytes_read
[i
];
715 // We'd have to be really unlucky to have read no messages on a thread.
716 EXPECT_GT(messages_read
[i
], 0u) << "reader: " << i
;
717 EXPECT_GE(bytes_read
[i
], messages_read
[i
]) << "reader: " << i
;
719 EXPECT_EQ(total_messages_written
, total_messages_read
);
720 EXPECT_EQ(total_bytes_written
, total_bytes_read
);
722 EXPECT_EQ(MOJO_RESULT_OK
, d_write
->Close());
723 EXPECT_EQ(MOJO_RESULT_OK
, d_read
->Close());
727 } // namespace system