Updating trunk VERSION from 2139.0 to 2140.0
[chromium-blink-merge.git] / mojo / system / message_pipe_dispatcher_unittest.cc
blob2acb8cb6ceda2d1ed0b99d0028c1d2b749e7d2aa
1 // Copyright 2013 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
5 // NOTE(vtl): Some of these tests are inherently flaky (e.g., if run on a
6 // heavily-loaded system). Sorry. |test::EpsilonTimeout()| may be increased to
7 // increase tolerance and reduce observed flakiness (though doing so reduces the
8 // meaningfulness of the test).
10 #include "mojo/system/message_pipe_dispatcher.h"
12 #include <string.h>
14 #include <limits>
16 #include "base/memory/ref_counted.h"
17 #include "base/memory/scoped_vector.h"
18 #include "base/rand_util.h"
19 #include "base/threading/platform_thread.h" // For |Sleep()|.
20 #include "base/threading/simple_thread.h"
21 #include "base/time/time.h"
22 #include "mojo/system/message_pipe.h"
23 #include "mojo/system/test_utils.h"
24 #include "mojo/system/waiter.h"
25 #include "mojo/system/waiter_test_utils.h"
26 #include "testing/gtest/include/gtest/gtest.h"
28 namespace mojo {
29 namespace system {
30 namespace {
32 TEST(MessagePipeDispatcherTest, Basic) {
33 test::Stopwatch stopwatch;
34 int32_t buffer[1];
35 const uint32_t kBufferSize = static_cast<uint32_t>(sizeof(buffer));
36 uint32_t buffer_size;
38 // Run this test both with |d0| as port 0, |d1| as port 1 and vice versa.
39 for (unsigned i = 0; i < 2; i++) {
40 scoped_refptr<MessagePipeDispatcher> d0(new MessagePipeDispatcher(
41 MessagePipeDispatcher::kDefaultCreateOptions));
42 EXPECT_EQ(Dispatcher::kTypeMessagePipe, d0->GetType());
43 scoped_refptr<MessagePipeDispatcher> d1(new MessagePipeDispatcher(
44 MessagePipeDispatcher::kDefaultCreateOptions));
46 scoped_refptr<MessagePipe> mp(MessagePipe::CreateLocalLocal());
47 d0->Init(mp, i); // 0, 1.
48 d1->Init(mp, i ^ 1); // 1, 0.
50 Waiter w;
51 uint32_t context = 0;
52 HandleSignalsState hss;
54 // Try adding a writable waiter when already writable.
55 w.Init();
56 hss = HandleSignalsState();
57 EXPECT_EQ(MOJO_RESULT_ALREADY_EXISTS,
58 d0->AddWaiter(&w, MOJO_HANDLE_SIGNAL_WRITABLE, 0, &hss));
59 EXPECT_EQ(MOJO_HANDLE_SIGNAL_WRITABLE, hss.satisfied_signals);
60 EXPECT_EQ(MOJO_HANDLE_SIGNAL_READABLE | MOJO_HANDLE_SIGNAL_WRITABLE,
61 hss.satisfiable_signals);
62 // Shouldn't need to remove the waiter (it was not added).
64 // Add a readable waiter to |d0|, then make it readable (by writing to
65 // |d1|), then wait.
66 w.Init();
67 ASSERT_EQ(MOJO_RESULT_OK,
68 d0->AddWaiter(&w, MOJO_HANDLE_SIGNAL_READABLE, 1, NULL));
69 buffer[0] = 123456789;
70 EXPECT_EQ(MOJO_RESULT_OK,
71 d1->WriteMessage(UserPointer<const void>(buffer),
72 kBufferSize,
73 NULL,
74 MOJO_WRITE_MESSAGE_FLAG_NONE));
75 stopwatch.Start();
76 EXPECT_EQ(MOJO_RESULT_OK, w.Wait(MOJO_DEADLINE_INDEFINITE, &context));
77 EXPECT_EQ(1u, context);
78 EXPECT_LT(stopwatch.Elapsed(), test::EpsilonTimeout());
79 hss = HandleSignalsState();
80 d0->RemoveWaiter(&w, &hss);
81 EXPECT_EQ(MOJO_HANDLE_SIGNAL_READABLE | MOJO_HANDLE_SIGNAL_WRITABLE,
82 hss.satisfied_signals);
83 EXPECT_EQ(MOJO_HANDLE_SIGNAL_READABLE | MOJO_HANDLE_SIGNAL_WRITABLE,
84 hss.satisfiable_signals);
86 // Try adding a readable waiter when already readable (from above).
87 w.Init();
88 hss = HandleSignalsState();
89 EXPECT_EQ(MOJO_RESULT_ALREADY_EXISTS,
90 d0->AddWaiter(&w, MOJO_HANDLE_SIGNAL_READABLE, 2, &hss));
91 EXPECT_EQ(MOJO_HANDLE_SIGNAL_READABLE | MOJO_HANDLE_SIGNAL_WRITABLE,
92 hss.satisfied_signals);
93 EXPECT_EQ(MOJO_HANDLE_SIGNAL_READABLE | MOJO_HANDLE_SIGNAL_WRITABLE,
94 hss.satisfiable_signals);
95 // Shouldn't need to remove the waiter (it was not added).
97 // Make |d0| no longer readable (by reading from it).
98 buffer[0] = 0;
99 buffer_size = kBufferSize;
100 EXPECT_EQ(MOJO_RESULT_OK,
101 d0->ReadMessage(UserPointer<void>(buffer),
102 MakeUserPointer(&buffer_size),
104 NULL,
105 MOJO_READ_MESSAGE_FLAG_NONE));
106 EXPECT_EQ(kBufferSize, buffer_size);
107 EXPECT_EQ(123456789, buffer[0]);
109 // Wait for zero time for readability on |d0| (will time out).
110 w.Init();
111 ASSERT_EQ(MOJO_RESULT_OK,
112 d0->AddWaiter(&w, MOJO_HANDLE_SIGNAL_READABLE, 3, NULL));
113 stopwatch.Start();
114 EXPECT_EQ(MOJO_RESULT_DEADLINE_EXCEEDED, w.Wait(0, NULL));
115 EXPECT_LT(stopwatch.Elapsed(), test::EpsilonTimeout());
116 hss = HandleSignalsState();
117 d0->RemoveWaiter(&w, &hss);
118 EXPECT_EQ(MOJO_HANDLE_SIGNAL_WRITABLE, hss.satisfied_signals);
119 EXPECT_EQ(MOJO_HANDLE_SIGNAL_READABLE | MOJO_HANDLE_SIGNAL_WRITABLE,
120 hss.satisfiable_signals);
122 // Wait for non-zero, finite time for readability on |d0| (will time out).
123 w.Init();
124 ASSERT_EQ(MOJO_RESULT_OK,
125 d0->AddWaiter(&w, MOJO_HANDLE_SIGNAL_READABLE, 3, NULL));
126 stopwatch.Start();
127 EXPECT_EQ(MOJO_RESULT_DEADLINE_EXCEEDED,
128 w.Wait(2 * test::EpsilonTimeout().InMicroseconds(), NULL));
129 base::TimeDelta elapsed = stopwatch.Elapsed();
130 EXPECT_GT(elapsed, (2 - 1) * test::EpsilonTimeout());
131 EXPECT_LT(elapsed, (2 + 1) * test::EpsilonTimeout());
132 hss = HandleSignalsState();
133 d0->RemoveWaiter(&w, &hss);
134 EXPECT_EQ(MOJO_HANDLE_SIGNAL_WRITABLE, hss.satisfied_signals);
135 EXPECT_EQ(MOJO_HANDLE_SIGNAL_READABLE | MOJO_HANDLE_SIGNAL_WRITABLE,
136 hss.satisfiable_signals);
138 EXPECT_EQ(MOJO_RESULT_OK, d0->Close());
139 EXPECT_EQ(MOJO_RESULT_OK, d1->Close());
143 TEST(MessagePipeDispatcherTest, InvalidParams) {
144 char buffer[1];
146 scoped_refptr<MessagePipeDispatcher> d0(
147 new MessagePipeDispatcher(MessagePipeDispatcher::kDefaultCreateOptions));
148 scoped_refptr<MessagePipeDispatcher> d1(
149 new MessagePipeDispatcher(MessagePipeDispatcher::kDefaultCreateOptions));
151 scoped_refptr<MessagePipe> mp(MessagePipe::CreateLocalLocal());
152 d0->Init(mp, 0);
153 d1->Init(mp, 1);
156 // |WriteMessage|:
157 // Huge buffer size.
158 EXPECT_EQ(MOJO_RESULT_RESOURCE_EXHAUSTED,
159 d0->WriteMessage(UserPointer<const void>(buffer),
160 std::numeric_limits<uint32_t>::max(),
161 NULL,
162 MOJO_WRITE_MESSAGE_FLAG_NONE));
164 EXPECT_EQ(MOJO_RESULT_OK, d0->Close());
165 EXPECT_EQ(MOJO_RESULT_OK, d1->Close());
168 // These test invalid arguments that should cause death if we're being paranoid
169 // about checking arguments (which we would want to do if, e.g., we were in a
170 // true "kernel" situation, but we might not want to do otherwise for
171 // performance reasons). Probably blatant errors like passing in null pointers
172 // (for required pointer arguments) will still cause death, but perhaps not
173 // predictably.
174 TEST(MessagePipeDispatcherTest, InvalidParamsDeath) {
175 const char kMemoryCheckFailedRegex[] = "Check failed";
177 scoped_refptr<MessagePipeDispatcher> d0(
178 new MessagePipeDispatcher(MessagePipeDispatcher::kDefaultCreateOptions));
179 scoped_refptr<MessagePipeDispatcher> d1(
180 new MessagePipeDispatcher(MessagePipeDispatcher::kDefaultCreateOptions));
182 scoped_refptr<MessagePipe> mp(MessagePipe::CreateLocalLocal());
183 d0->Init(mp, 0);
184 d1->Init(mp, 1);
187 // |WriteMessage|:
188 // Null buffer with nonzero buffer size.
189 EXPECT_DEATH_IF_SUPPORTED(
190 d0->WriteMessage(
191 NullUserPointer(), 1, NULL, MOJO_WRITE_MESSAGE_FLAG_NONE),
192 kMemoryCheckFailedRegex);
194 // |ReadMessage|:
195 // Null buffer with nonzero buffer size.
196 // First write something so that we actually have something to read.
197 EXPECT_EQ(
198 MOJO_RESULT_OK,
199 d1->WriteMessage(
200 UserPointer<const void>("x"), 1, NULL, MOJO_WRITE_MESSAGE_FLAG_NONE));
201 uint32_t buffer_size = 1;
202 EXPECT_DEATH_IF_SUPPORTED(d0->ReadMessage(NullUserPointer(),
203 MakeUserPointer(&buffer_size),
205 NULL,
206 MOJO_READ_MESSAGE_FLAG_NONE),
207 kMemoryCheckFailedRegex);
209 EXPECT_EQ(MOJO_RESULT_OK, d0->Close());
210 EXPECT_EQ(MOJO_RESULT_OK, d1->Close());
213 // Test what happens when one end is closed (single-threaded test).
214 TEST(MessagePipeDispatcherTest, BasicClosed) {
215 int32_t buffer[1];
216 const uint32_t kBufferSize = static_cast<uint32_t>(sizeof(buffer));
217 uint32_t buffer_size;
219 // Run this test both with |d0| as port 0, |d1| as port 1 and vice versa.
220 for (unsigned i = 0; i < 2; i++) {
221 scoped_refptr<MessagePipeDispatcher> d0(new MessagePipeDispatcher(
222 MessagePipeDispatcher::kDefaultCreateOptions));
223 scoped_refptr<MessagePipeDispatcher> d1(new MessagePipeDispatcher(
224 MessagePipeDispatcher::kDefaultCreateOptions));
226 scoped_refptr<MessagePipe> mp(MessagePipe::CreateLocalLocal());
227 d0->Init(mp, i); // 0, 1.
228 d1->Init(mp, i ^ 1); // 1, 0.
230 Waiter w;
231 HandleSignalsState hss;
233 // Write (twice) to |d1|.
234 buffer[0] = 123456789;
235 EXPECT_EQ(MOJO_RESULT_OK,
236 d1->WriteMessage(UserPointer<const void>(buffer),
237 kBufferSize,
238 NULL,
239 MOJO_WRITE_MESSAGE_FLAG_NONE));
240 buffer[0] = 234567890;
241 EXPECT_EQ(MOJO_RESULT_OK,
242 d1->WriteMessage(UserPointer<const void>(buffer),
243 kBufferSize,
244 NULL,
245 MOJO_WRITE_MESSAGE_FLAG_NONE));
247 // Try waiting for readable on |d0|; should fail (already satisfied).
248 w.Init();
249 hss = HandleSignalsState();
250 EXPECT_EQ(MOJO_RESULT_ALREADY_EXISTS,
251 d0->AddWaiter(&w, MOJO_HANDLE_SIGNAL_READABLE, 0, &hss));
252 EXPECT_EQ(MOJO_HANDLE_SIGNAL_READABLE | MOJO_HANDLE_SIGNAL_WRITABLE,
253 hss.satisfied_signals);
254 EXPECT_EQ(MOJO_HANDLE_SIGNAL_READABLE | MOJO_HANDLE_SIGNAL_WRITABLE,
255 hss.satisfiable_signals);
257 // Try reading from |d1|; should fail (nothing to read).
258 buffer[0] = 0;
259 buffer_size = kBufferSize;
260 EXPECT_EQ(MOJO_RESULT_SHOULD_WAIT,
261 d1->ReadMessage(UserPointer<void>(buffer),
262 MakeUserPointer(&buffer_size),
264 NULL,
265 MOJO_READ_MESSAGE_FLAG_NONE));
267 // Close |d1|.
268 EXPECT_EQ(MOJO_RESULT_OK, d1->Close());
270 // Try waiting for readable on |d0|; should fail (already satisfied).
271 w.Init();
272 hss = HandleSignalsState();
273 EXPECT_EQ(MOJO_RESULT_ALREADY_EXISTS,
274 d0->AddWaiter(&w, MOJO_HANDLE_SIGNAL_READABLE, 1, &hss));
275 EXPECT_EQ(MOJO_HANDLE_SIGNAL_READABLE, hss.satisfied_signals);
276 EXPECT_EQ(MOJO_HANDLE_SIGNAL_READABLE, hss.satisfiable_signals);
278 // Read from |d0|.
279 buffer[0] = 0;
280 buffer_size = kBufferSize;
281 EXPECT_EQ(MOJO_RESULT_OK,
282 d0->ReadMessage(UserPointer<void>(buffer),
283 MakeUserPointer(&buffer_size),
285 NULL,
286 MOJO_READ_MESSAGE_FLAG_NONE));
287 EXPECT_EQ(kBufferSize, buffer_size);
288 EXPECT_EQ(123456789, buffer[0]);
290 // Try waiting for readable on |d0|; should fail (already satisfied).
291 w.Init();
292 hss = HandleSignalsState();
293 EXPECT_EQ(MOJO_RESULT_ALREADY_EXISTS,
294 d0->AddWaiter(&w, MOJO_HANDLE_SIGNAL_READABLE, 2, &hss));
295 EXPECT_EQ(MOJO_HANDLE_SIGNAL_READABLE, hss.satisfied_signals);
296 EXPECT_EQ(MOJO_HANDLE_SIGNAL_READABLE, hss.satisfiable_signals);
298 // Read again from |d0|.
299 buffer[0] = 0;
300 buffer_size = kBufferSize;
301 EXPECT_EQ(MOJO_RESULT_OK,
302 d0->ReadMessage(UserPointer<void>(buffer),
303 MakeUserPointer(&buffer_size),
305 NULL,
306 MOJO_READ_MESSAGE_FLAG_NONE));
307 EXPECT_EQ(kBufferSize, buffer_size);
308 EXPECT_EQ(234567890, buffer[0]);
310 // Try waiting for readable on |d0|; should fail (unsatisfiable).
311 w.Init();
312 hss = HandleSignalsState();
313 EXPECT_EQ(MOJO_RESULT_FAILED_PRECONDITION,
314 d0->AddWaiter(&w, MOJO_HANDLE_SIGNAL_READABLE, 3, &hss));
315 EXPECT_EQ(0u, hss.satisfied_signals);
316 EXPECT_EQ(0u, hss.satisfiable_signals);
318 // Try waiting for writable on |d0|; should fail (unsatisfiable).
319 w.Init();
320 hss = HandleSignalsState();
321 EXPECT_EQ(MOJO_RESULT_FAILED_PRECONDITION,
322 d0->AddWaiter(&w, MOJO_HANDLE_SIGNAL_WRITABLE, 4, &hss));
323 EXPECT_EQ(0u, hss.satisfied_signals);
324 EXPECT_EQ(0u, hss.satisfiable_signals);
326 // Try reading from |d0|; should fail (nothing to read and other end
327 // closed).
328 buffer[0] = 0;
329 buffer_size = kBufferSize;
330 EXPECT_EQ(MOJO_RESULT_FAILED_PRECONDITION,
331 d0->ReadMessage(UserPointer<void>(buffer),
332 MakeUserPointer(&buffer_size),
334 NULL,
335 MOJO_READ_MESSAGE_FLAG_NONE));
337 // Try writing to |d0|; should fail (other end closed).
338 buffer[0] = 345678901;
339 EXPECT_EQ(MOJO_RESULT_FAILED_PRECONDITION,
340 d0->WriteMessage(UserPointer<const void>(buffer),
341 kBufferSize,
342 NULL,
343 MOJO_WRITE_MESSAGE_FLAG_NONE));
345 EXPECT_EQ(MOJO_RESULT_OK, d0->Close());
349 #if defined(OS_WIN)
350 // http://crbug.com/396386
351 #define MAYBE_BasicThreaded DISABLED_BasicThreaded
352 #else
353 #define MAYBE_BasicThreaded BasicThreaded
354 #endif
355 TEST(MessagePipeDispatcherTest, MAYBE_BasicThreaded) {
356 test::Stopwatch stopwatch;
357 int32_t buffer[1];
358 const uint32_t kBufferSize = static_cast<uint32_t>(sizeof(buffer));
359 uint32_t buffer_size;
360 base::TimeDelta elapsed;
361 bool did_wait;
362 MojoResult result;
363 uint32_t context;
364 HandleSignalsState hss;
366 // Run this test both with |d0| as port 0, |d1| as port 1 and vice versa.
367 for (unsigned i = 0; i < 2; i++) {
368 scoped_refptr<MessagePipeDispatcher> d0(new MessagePipeDispatcher(
369 MessagePipeDispatcher::kDefaultCreateOptions));
370 scoped_refptr<MessagePipeDispatcher> d1(new MessagePipeDispatcher(
371 MessagePipeDispatcher::kDefaultCreateOptions));
373 scoped_refptr<MessagePipe> mp(MessagePipe::CreateLocalLocal());
374 d0->Init(mp, i); // 0, 1.
375 d1->Init(mp, i ^ 1); // 1, 0.
378 // Wait for readable on |d1|, which will become readable after some time.
380 test::WaiterThread thread(d1,
381 MOJO_HANDLE_SIGNAL_READABLE,
382 MOJO_DEADLINE_INDEFINITE,
384 &did_wait,
385 &result,
386 &context,
387 &hss);
388 stopwatch.Start();
389 thread.Start();
390 base::PlatformThread::Sleep(2 * test::EpsilonTimeout());
391 // Wake it up by writing to |d0|.
392 buffer[0] = 123456789;
393 EXPECT_EQ(MOJO_RESULT_OK,
394 d0->WriteMessage(UserPointer<const void>(buffer),
395 kBufferSize,
396 NULL,
397 MOJO_WRITE_MESSAGE_FLAG_NONE));
398 } // Joins the thread.
399 elapsed = stopwatch.Elapsed();
400 EXPECT_GT(elapsed, (2 - 1) * test::EpsilonTimeout());
401 EXPECT_LT(elapsed, (2 + 1) * test::EpsilonTimeout());
402 EXPECT_TRUE(did_wait);
403 EXPECT_EQ(MOJO_RESULT_OK, result);
404 EXPECT_EQ(1u, context);
405 EXPECT_EQ(MOJO_HANDLE_SIGNAL_READABLE | MOJO_HANDLE_SIGNAL_WRITABLE,
406 hss.satisfied_signals);
407 EXPECT_EQ(MOJO_HANDLE_SIGNAL_READABLE | MOJO_HANDLE_SIGNAL_WRITABLE,
408 hss.satisfiable_signals);
410 // Now |d1| is already readable. Try waiting for it again.
412 test::WaiterThread thread(d1,
413 MOJO_HANDLE_SIGNAL_READABLE,
414 MOJO_DEADLINE_INDEFINITE,
416 &did_wait,
417 &result,
418 &context,
419 &hss);
420 stopwatch.Start();
421 thread.Start();
422 } // Joins the thread.
423 EXPECT_LT(stopwatch.Elapsed(), test::EpsilonTimeout());
424 EXPECT_FALSE(did_wait);
425 EXPECT_EQ(MOJO_RESULT_ALREADY_EXISTS, result);
426 EXPECT_EQ(MOJO_HANDLE_SIGNAL_READABLE | MOJO_HANDLE_SIGNAL_WRITABLE,
427 hss.satisfied_signals);
428 EXPECT_EQ(MOJO_HANDLE_SIGNAL_READABLE | MOJO_HANDLE_SIGNAL_WRITABLE,
429 hss.satisfiable_signals);
431 // Consume what we wrote to |d0|.
432 buffer[0] = 0;
433 buffer_size = kBufferSize;
434 EXPECT_EQ(MOJO_RESULT_OK,
435 d1->ReadMessage(UserPointer<void>(buffer),
436 MakeUserPointer(&buffer_size),
438 NULL,
439 MOJO_READ_MESSAGE_FLAG_NONE));
440 EXPECT_EQ(kBufferSize, buffer_size);
441 EXPECT_EQ(123456789, buffer[0]);
443 // Wait for readable on |d1| and close |d0| after some time, which should
444 // cancel that wait.
446 test::WaiterThread thread(d1,
447 MOJO_HANDLE_SIGNAL_READABLE,
448 MOJO_DEADLINE_INDEFINITE,
450 &did_wait,
451 &result,
452 &context,
453 &hss);
454 stopwatch.Start();
455 thread.Start();
456 base::PlatformThread::Sleep(2 * test::EpsilonTimeout());
457 EXPECT_EQ(MOJO_RESULT_OK, d0->Close());
458 } // Joins the thread.
459 elapsed = stopwatch.Elapsed();
460 EXPECT_GT(elapsed, (2 - 1) * test::EpsilonTimeout());
461 EXPECT_LT(elapsed, (2 + 1) * test::EpsilonTimeout());
462 EXPECT_TRUE(did_wait);
463 EXPECT_EQ(MOJO_RESULT_FAILED_PRECONDITION, result);
464 EXPECT_EQ(3u, context);
465 EXPECT_EQ(0u, hss.satisfied_signals);
466 EXPECT_EQ(0u, hss.satisfiable_signals);
468 EXPECT_EQ(MOJO_RESULT_OK, d1->Close());
471 for (unsigned i = 0; i < 2; i++) {
472 scoped_refptr<MessagePipeDispatcher> d0(new MessagePipeDispatcher(
473 MessagePipeDispatcher::kDefaultCreateOptions));
474 scoped_refptr<MessagePipeDispatcher> d1(new MessagePipeDispatcher(
475 MessagePipeDispatcher::kDefaultCreateOptions));
477 scoped_refptr<MessagePipe> mp(MessagePipe::CreateLocalLocal());
478 d0->Init(mp, i); // 0, 1.
479 d1->Init(mp, i ^ 1); // 1, 0.
482 // Wait for readable on |d1| and close |d1| after some time, which should
483 // cancel that wait.
485 test::WaiterThread thread(d1,
486 MOJO_HANDLE_SIGNAL_READABLE,
487 MOJO_DEADLINE_INDEFINITE,
489 &did_wait,
490 &result,
491 &context,
492 &hss);
493 stopwatch.Start();
494 thread.Start();
495 base::PlatformThread::Sleep(2 * test::EpsilonTimeout());
496 EXPECT_EQ(MOJO_RESULT_OK, d1->Close());
497 } // Joins the thread.
498 elapsed = stopwatch.Elapsed();
499 EXPECT_GT(elapsed, (2 - 1) * test::EpsilonTimeout());
500 EXPECT_LT(elapsed, (2 + 1) * test::EpsilonTimeout());
501 EXPECT_TRUE(did_wait);
502 EXPECT_EQ(MOJO_RESULT_CANCELLED, result);
503 EXPECT_EQ(4u, context);
504 EXPECT_EQ(0u, hss.satisfied_signals);
505 EXPECT_EQ(0u, hss.satisfiable_signals);
507 EXPECT_EQ(MOJO_RESULT_OK, d0->Close());
511 // Stress test -----------------------------------------------------------------
513 const size_t kMaxMessageSize = 2000;
515 class WriterThread : public base::SimpleThread {
516 public:
517 // |*messages_written| and |*bytes_written| belong to the thread while it's
518 // alive.
519 WriterThread(scoped_refptr<Dispatcher> write_dispatcher,
520 size_t* messages_written,
521 size_t* bytes_written)
522 : base::SimpleThread("writer_thread"),
523 write_dispatcher_(write_dispatcher),
524 messages_written_(messages_written),
525 bytes_written_(bytes_written) {
526 *messages_written_ = 0;
527 *bytes_written_ = 0;
530 virtual ~WriterThread() { Join(); }
532 private:
533 virtual void Run() OVERRIDE {
534 // Make some data to write.
535 unsigned char buffer[kMaxMessageSize];
536 for (size_t i = 0; i < kMaxMessageSize; i++)
537 buffer[i] = static_cast<unsigned char>(i);
539 // Number of messages to write.
540 *messages_written_ = static_cast<size_t>(base::RandInt(1000, 6000));
542 // Write messages.
543 for (size_t i = 0; i < *messages_written_; i++) {
544 uint32_t bytes_to_write = static_cast<uint32_t>(
545 base::RandInt(1, static_cast<int>(kMaxMessageSize)));
546 EXPECT_EQ(MOJO_RESULT_OK,
547 write_dispatcher_->WriteMessage(UserPointer<const void>(buffer),
548 bytes_to_write,
549 NULL,
550 MOJO_WRITE_MESSAGE_FLAG_NONE));
551 *bytes_written_ += bytes_to_write;
554 // Write one last "quit" message.
555 EXPECT_EQ(MOJO_RESULT_OK,
556 write_dispatcher_->WriteMessage(UserPointer<const void>("quit"),
558 NULL,
559 MOJO_WRITE_MESSAGE_FLAG_NONE));
562 const scoped_refptr<Dispatcher> write_dispatcher_;
563 size_t* const messages_written_;
564 size_t* const bytes_written_;
566 DISALLOW_COPY_AND_ASSIGN(WriterThread);
569 class ReaderThread : public base::SimpleThread {
570 public:
571 // |*messages_read| and |*bytes_read| belong to the thread while it's alive.
572 ReaderThread(scoped_refptr<Dispatcher> read_dispatcher,
573 size_t* messages_read,
574 size_t* bytes_read)
575 : base::SimpleThread("reader_thread"),
576 read_dispatcher_(read_dispatcher),
577 messages_read_(messages_read),
578 bytes_read_(bytes_read) {
579 *messages_read_ = 0;
580 *bytes_read_ = 0;
583 virtual ~ReaderThread() { Join(); }
585 private:
586 virtual void Run() OVERRIDE {
587 unsigned char buffer[kMaxMessageSize];
588 Waiter w;
589 HandleSignalsState hss;
590 MojoResult result;
592 // Read messages.
593 for (;;) {
594 // Wait for it to be readable.
595 w.Init();
596 hss = HandleSignalsState();
597 result =
598 read_dispatcher_->AddWaiter(&w, MOJO_HANDLE_SIGNAL_READABLE, 0, &hss);
599 EXPECT_TRUE(result == MOJO_RESULT_OK ||
600 result == MOJO_RESULT_ALREADY_EXISTS)
601 << "result: " << result;
602 if (result == MOJO_RESULT_OK) {
603 // Actually need to wait.
604 EXPECT_EQ(MOJO_RESULT_OK, w.Wait(MOJO_DEADLINE_INDEFINITE, NULL));
605 read_dispatcher_->RemoveWaiter(&w, &hss);
607 // We may not actually be readable, since we're racing with other threads.
608 EXPECT_TRUE((hss.satisfiable_signals & MOJO_HANDLE_SIGNAL_READABLE));
610 // Now, try to do the read.
611 // Clear the buffer so that we can check the result.
612 memset(buffer, 0, sizeof(buffer));
613 uint32_t buffer_size = static_cast<uint32_t>(sizeof(buffer));
614 result = read_dispatcher_->ReadMessage(UserPointer<void>(buffer),
615 MakeUserPointer(&buffer_size),
617 NULL,
618 MOJO_READ_MESSAGE_FLAG_NONE);
619 EXPECT_TRUE(result == MOJO_RESULT_OK || result == MOJO_RESULT_SHOULD_WAIT)
620 << "result: " << result;
621 // We're racing with others to read, so maybe we failed.
622 if (result == MOJO_RESULT_SHOULD_WAIT)
623 continue; // In which case, try again.
624 // Check for quit.
625 if (buffer_size == 4 && memcmp("quit", buffer, 4) == 0)
626 return;
627 EXPECT_GE(buffer_size, 1u);
628 EXPECT_LE(buffer_size, kMaxMessageSize);
629 EXPECT_TRUE(IsValidMessage(buffer, buffer_size));
631 (*messages_read_)++;
632 *bytes_read_ += buffer_size;
636 static bool IsValidMessage(const unsigned char* buffer,
637 uint32_t message_size) {
638 size_t i;
639 for (i = 0; i < message_size; i++) {
640 if (buffer[i] != static_cast<unsigned char>(i))
641 return false;
643 // Check that the remaining bytes weren't stomped on.
644 for (; i < kMaxMessageSize; i++) {
645 if (buffer[i] != 0)
646 return false;
648 return true;
651 const scoped_refptr<Dispatcher> read_dispatcher_;
652 size_t* const messages_read_;
653 size_t* const bytes_read_;
655 DISALLOW_COPY_AND_ASSIGN(ReaderThread);
658 TEST(MessagePipeDispatcherTest, Stress) {
659 static const size_t kNumWriters = 30;
660 static const size_t kNumReaders = kNumWriters;
662 scoped_refptr<MessagePipeDispatcher> d_write(
663 new MessagePipeDispatcher(MessagePipeDispatcher::kDefaultCreateOptions));
664 scoped_refptr<MessagePipeDispatcher> d_read(
665 new MessagePipeDispatcher(MessagePipeDispatcher::kDefaultCreateOptions));
667 scoped_refptr<MessagePipe> mp(MessagePipe::CreateLocalLocal());
668 d_write->Init(mp, 0);
669 d_read->Init(mp, 1);
672 size_t messages_written[kNumWriters];
673 size_t bytes_written[kNumWriters];
674 size_t messages_read[kNumReaders];
675 size_t bytes_read[kNumReaders];
677 // Make writers.
678 ScopedVector<WriterThread> writers;
679 for (size_t i = 0; i < kNumWriters; i++) {
680 writers.push_back(
681 new WriterThread(d_write, &messages_written[i], &bytes_written[i]));
684 // Make readers.
685 ScopedVector<ReaderThread> readers;
686 for (size_t i = 0; i < kNumReaders; i++) {
687 readers.push_back(
688 new ReaderThread(d_read, &messages_read[i], &bytes_read[i]));
691 // Start writers.
692 for (size_t i = 0; i < kNumWriters; i++)
693 writers[i]->Start();
695 // Start readers.
696 for (size_t i = 0; i < kNumReaders; i++)
697 readers[i]->Start();
699 // TODO(vtl): Maybe I should have an event that triggers all the threads to
700 // start doing stuff for real (so that the first ones created/started aren't
701 // advantaged).
702 } // Joins all the threads.
704 size_t total_messages_written = 0;
705 size_t total_bytes_written = 0;
706 for (size_t i = 0; i < kNumWriters; i++) {
707 total_messages_written += messages_written[i];
708 total_bytes_written += bytes_written[i];
710 size_t total_messages_read = 0;
711 size_t total_bytes_read = 0;
712 for (size_t i = 0; i < kNumReaders; i++) {
713 total_messages_read += messages_read[i];
714 total_bytes_read += bytes_read[i];
715 // We'd have to be really unlucky to have read no messages on a thread.
716 EXPECT_GT(messages_read[i], 0u) << "reader: " << i;
717 EXPECT_GE(bytes_read[i], messages_read[i]) << "reader: " << i;
719 EXPECT_EQ(total_messages_written, total_messages_read);
720 EXPECT_EQ(total_bytes_written, total_bytes_read);
722 EXPECT_EQ(MOJO_RESULT_OK, d_write->Close());
723 EXPECT_EQ(MOJO_RESULT_OK, d_read->Close());
726 } // namespace
727 } // namespace system
728 } // namespace mojo