IDB: Make readonly transactions wait for earlier readwrite transactions
[chromium-blink-merge.git] / jingle / glue / pseudotcp_adapter_unittest.cc
blob1b2674f6789bd4cce6067edeff2d01798ddca389
1 // Copyright (c) 2012 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
5 #include "jingle/glue/pseudotcp_adapter.h"
7 #include <vector>
9 #include "base/bind.h"
10 #include "base/bind_helpers.h"
11 #include "base/compiler_specific.h"
12 #include "jingle/glue/thread_wrapper.h"
13 #include "net/base/io_buffer.h"
14 #include "net/base/net_errors.h"
15 #include "net/base/test_completion_callback.h"
16 #include "net/udp/udp_socket.h"
17 #include "testing/gmock/include/gmock/gmock.h"
18 #include "testing/gtest/include/gtest/gtest.h"
namespace jingle_glue {
namespace {

// Forward declaration of the fake socket class that is defined further down
// in this file.
class FakeSocket;

}  // namespace
}  // namespace jingle_glue
27 namespace jingle_glue {
29 namespace {
// Size, in bytes, of the largest chunk handed to a single Write() or Read()
// call by the tester below.
const int kMessageSize = 1024;

// Number of kMessageSize-sized chunks that make up the test payload.
const int kMessages = 100;

// Total number of payload bytes transferred by each data-transfer test.
const int kTestDataSize = kMessageSize * kMessages;
// Interface for objects that decide, packet by packet, whether an incoming
// packet should be discarded.
class RateLimiter {
 public:
  virtual ~RateLimiter() {}

  // Returns true if the new packet needs to be dropped, false otherwise.
  virtual bool DropNextPacket() = 0;
};
42 class LeakyBucket : public RateLimiter {
43 public:
44 // |rate| is in drops per second.
45 LeakyBucket(double volume, double rate)
46 : volume_(volume),
47 rate_(rate),
48 level_(0.0),
49 last_update_(base::TimeTicks::HighResNow()) {
52 ~LeakyBucket() override {}
54 bool DropNextPacket() override {
55 base::TimeTicks now = base::TimeTicks::HighResNow();
56 double interval = (now - last_update_).InSecondsF();
57 last_update_ = now;
58 level_ = level_ + 1.0 - interval * rate_;
59 if (level_ > volume_) {
60 level_ = volume_;
61 return true;
62 } else if (level_ < 0.0) {
63 level_ = 0.0;
65 return false;
68 private:
69 double volume_;
70 double rate_;
71 double level_;
72 base::TimeTicks last_update_;
75 class FakeSocket : public net::Socket {
76 public:
77 FakeSocket()
78 : rate_limiter_(NULL),
79 latency_ms_(0) {
81 ~FakeSocket() override {}
83 void AppendInputPacket(const std::vector<char>& data) {
84 if (rate_limiter_ && rate_limiter_->DropNextPacket())
85 return; // Lose the packet.
87 if (!read_callback_.is_null()) {
88 int size = std::min(read_buffer_size_, static_cast<int>(data.size()));
89 memcpy(read_buffer_->data(), &data[0], data.size());
90 net::CompletionCallback cb = read_callback_;
91 read_callback_.Reset();
92 read_buffer_ = NULL;
93 cb.Run(size);
94 } else {
95 incoming_packets_.push_back(data);
99 void Connect(FakeSocket* peer_socket) {
100 peer_socket_ = peer_socket;
103 void set_rate_limiter(RateLimiter* rate_limiter) {
104 rate_limiter_ = rate_limiter;
107 void set_latency(int latency_ms) { latency_ms_ = latency_ms; };
109 // net::Socket interface.
110 int Read(net::IOBuffer* buf,
111 int buf_len,
112 const net::CompletionCallback& callback) override {
113 CHECK(read_callback_.is_null());
114 CHECK(buf);
116 if (incoming_packets_.size() > 0) {
117 scoped_refptr<net::IOBuffer> buffer(buf);
118 int size = std::min(
119 static_cast<int>(incoming_packets_.front().size()), buf_len);
120 memcpy(buffer->data(), &*incoming_packets_.front().begin(), size);
121 incoming_packets_.pop_front();
122 return size;
123 } else {
124 read_callback_ = callback;
125 read_buffer_ = buf;
126 read_buffer_size_ = buf_len;
127 return net::ERR_IO_PENDING;
131 int Write(net::IOBuffer* buf,
132 int buf_len,
133 const net::CompletionCallback& callback) override {
134 DCHECK(buf);
135 if (peer_socket_) {
136 base::MessageLoop::current()->PostDelayedTask(
137 FROM_HERE,
138 base::Bind(&FakeSocket::AppendInputPacket,
139 base::Unretained(peer_socket_),
140 std::vector<char>(buf->data(), buf->data() + buf_len)),
141 base::TimeDelta::FromMilliseconds(latency_ms_));
144 return buf_len;
147 int SetReceiveBufferSize(int32 size) override {
148 NOTIMPLEMENTED();
149 return net::ERR_NOT_IMPLEMENTED;
151 int SetSendBufferSize(int32 size) override {
152 NOTIMPLEMENTED();
153 return net::ERR_NOT_IMPLEMENTED;
156 private:
157 scoped_refptr<net::IOBuffer> read_buffer_;
158 int read_buffer_size_;
159 net::CompletionCallback read_callback_;
161 std::deque<std::vector<char> > incoming_packets_;
163 FakeSocket* peer_socket_;
164 RateLimiter* rate_limiter_;
165 int latency_ms_;
168 class TCPChannelTester : public base::RefCountedThreadSafe<TCPChannelTester> {
169 public:
170 TCPChannelTester(base::MessageLoop* message_loop,
171 net::Socket* client_socket,
172 net::Socket* host_socket)
173 : message_loop_(message_loop),
174 host_socket_(host_socket),
175 client_socket_(client_socket),
176 done_(false),
177 write_errors_(0),
178 read_errors_(0) {}
180 void Start() {
181 message_loop_->PostTask(
182 FROM_HERE, base::Bind(&TCPChannelTester::DoStart, this));
185 void CheckResults() {
186 EXPECT_EQ(0, write_errors_);
187 EXPECT_EQ(0, read_errors_);
189 ASSERT_EQ(kTestDataSize + kMessageSize, input_buffer_->capacity());
191 output_buffer_->SetOffset(0);
192 ASSERT_EQ(kTestDataSize, output_buffer_->size());
194 EXPECT_EQ(0, memcmp(output_buffer_->data(),
195 input_buffer_->StartOfBuffer(), kTestDataSize));
198 protected:
199 virtual ~TCPChannelTester() {}
201 void Done() {
202 done_ = true;
203 message_loop_->PostTask(FROM_HERE, base::MessageLoop::QuitClosure());
206 void DoStart() {
207 InitBuffers();
208 DoRead();
209 DoWrite();
212 void InitBuffers() {
213 output_buffer_ = new net::DrainableIOBuffer(
214 new net::IOBuffer(kTestDataSize), kTestDataSize);
215 memset(output_buffer_->data(), 123, kTestDataSize);
217 input_buffer_ = new net::GrowableIOBuffer();
218 // Always keep kMessageSize bytes available at the end of the input buffer.
219 input_buffer_->SetCapacity(kMessageSize);
222 void DoWrite() {
223 int result = 1;
224 while (result > 0) {
225 if (output_buffer_->BytesRemaining() == 0)
226 break;
228 int bytes_to_write = std::min(output_buffer_->BytesRemaining(),
229 kMessageSize);
230 result = client_socket_->Write(
231 output_buffer_.get(),
232 bytes_to_write,
233 base::Bind(&TCPChannelTester::OnWritten, base::Unretained(this)));
234 HandleWriteResult(result);
238 void OnWritten(int result) {
239 HandleWriteResult(result);
240 DoWrite();
243 void HandleWriteResult(int result) {
244 if (result <= 0 && result != net::ERR_IO_PENDING) {
245 LOG(ERROR) << "Received error " << result << " when trying to write";
246 write_errors_++;
247 Done();
248 } else if (result > 0) {
249 output_buffer_->DidConsume(result);
253 void DoRead() {
254 int result = 1;
255 while (result > 0) {
256 input_buffer_->set_offset(input_buffer_->capacity() - kMessageSize);
258 result = host_socket_->Read(
259 input_buffer_.get(),
260 kMessageSize,
261 base::Bind(&TCPChannelTester::OnRead, base::Unretained(this)));
262 HandleReadResult(result);
266 void OnRead(int result) {
267 HandleReadResult(result);
268 DoRead();
271 void HandleReadResult(int result) {
272 if (result <= 0 && result != net::ERR_IO_PENDING) {
273 if (!done_) {
274 LOG(ERROR) << "Received error " << result << " when trying to read";
275 read_errors_++;
276 Done();
278 } else if (result > 0) {
279 // Allocate memory for the next read.
280 input_buffer_->SetCapacity(input_buffer_->capacity() + result);
281 if (input_buffer_->capacity() == kTestDataSize + kMessageSize)
282 Done();
286 private:
287 friend class base::RefCountedThreadSafe<TCPChannelTester>;
289 base::MessageLoop* message_loop_;
290 net::Socket* host_socket_;
291 net::Socket* client_socket_;
292 bool done_;
294 scoped_refptr<net::DrainableIOBuffer> output_buffer_;
295 scoped_refptr<net::GrowableIOBuffer> input_buffer_;
297 int write_errors_;
298 int read_errors_;
301 class PseudoTcpAdapterTest : public testing::Test {
302 protected:
303 virtual void SetUp() override {
304 JingleThreadWrapper::EnsureForCurrentMessageLoop();
306 host_socket_ = new FakeSocket();
307 client_socket_ = new FakeSocket();
309 host_socket_->Connect(client_socket_);
310 client_socket_->Connect(host_socket_);
312 host_pseudotcp_.reset(new PseudoTcpAdapter(host_socket_));
313 client_pseudotcp_.reset(new PseudoTcpAdapter(client_socket_));
316 FakeSocket* host_socket_;
317 FakeSocket* client_socket_;
319 scoped_ptr<PseudoTcpAdapter> host_pseudotcp_;
320 scoped_ptr<PseudoTcpAdapter> client_pseudotcp_;
321 base::MessageLoop message_loop_;
324 TEST_F(PseudoTcpAdapterTest, DataTransfer) {
325 net::TestCompletionCallback host_connect_cb;
326 net::TestCompletionCallback client_connect_cb;
328 int rv1 = host_pseudotcp_->Connect(host_connect_cb.callback());
329 int rv2 = client_pseudotcp_->Connect(client_connect_cb.callback());
331 if (rv1 == net::ERR_IO_PENDING)
332 rv1 = host_connect_cb.WaitForResult();
333 if (rv2 == net::ERR_IO_PENDING)
334 rv2 = client_connect_cb.WaitForResult();
335 ASSERT_EQ(net::OK, rv1);
336 ASSERT_EQ(net::OK, rv2);
338 scoped_refptr<TCPChannelTester> tester =
339 new TCPChannelTester(&message_loop_, host_pseudotcp_.get(),
340 client_pseudotcp_.get());
342 tester->Start();
343 message_loop_.Run();
344 tester->CheckResults();
347 TEST_F(PseudoTcpAdapterTest, LimitedChannel) {
348 const int kLatencyMs = 20;
349 const int kPacketsPerSecond = 400;
350 const int kBurstPackets = 10;
352 LeakyBucket host_limiter(kBurstPackets, kPacketsPerSecond);
353 host_socket_->set_latency(kLatencyMs);
354 host_socket_->set_rate_limiter(&host_limiter);
356 LeakyBucket client_limiter(kBurstPackets, kPacketsPerSecond);
357 host_socket_->set_latency(kLatencyMs);
358 client_socket_->set_rate_limiter(&client_limiter);
360 net::TestCompletionCallback host_connect_cb;
361 net::TestCompletionCallback client_connect_cb;
363 int rv1 = host_pseudotcp_->Connect(host_connect_cb.callback());
364 int rv2 = client_pseudotcp_->Connect(client_connect_cb.callback());
366 if (rv1 == net::ERR_IO_PENDING)
367 rv1 = host_connect_cb.WaitForResult();
368 if (rv2 == net::ERR_IO_PENDING)
369 rv2 = client_connect_cb.WaitForResult();
370 ASSERT_EQ(net::OK, rv1);
371 ASSERT_EQ(net::OK, rv2);
373 scoped_refptr<TCPChannelTester> tester =
374 new TCPChannelTester(&message_loop_, host_pseudotcp_.get(),
375 client_pseudotcp_.get());
377 tester->Start();
378 message_loop_.Run();
379 tester->CheckResults();
382 class DeleteOnConnected {
383 public:
384 DeleteOnConnected(base::MessageLoop* message_loop,
385 scoped_ptr<PseudoTcpAdapter>* adapter)
386 : message_loop_(message_loop), adapter_(adapter) {}
387 void OnConnected(int error) {
388 adapter_->reset();
389 message_loop_->PostTask(FROM_HERE, base::MessageLoop::QuitClosure());
391 base::MessageLoop* message_loop_;
392 scoped_ptr<PseudoTcpAdapter>* adapter_;
395 TEST_F(PseudoTcpAdapterTest, DeleteOnConnected) {
396 // This test verifies that deleting the adapter mid-callback doesn't lead
397 // to deleted structures being touched as the stack unrolls, so the failure
398 // mode is a crash rather than a normal test failure.
399 net::TestCompletionCallback client_connect_cb;
400 DeleteOnConnected host_delete(&message_loop_, &host_pseudotcp_);
402 host_pseudotcp_->Connect(base::Bind(&DeleteOnConnected::OnConnected,
403 base::Unretained(&host_delete)));
404 client_pseudotcp_->Connect(client_connect_cb.callback());
405 message_loop_.Run();
407 ASSERT_EQ(NULL, host_pseudotcp_.get());
410 // Verify that we can send/receive data with the write-waits-for-send
411 // flag set.
412 TEST_F(PseudoTcpAdapterTest, WriteWaitsForSendLetsDataThrough) {
413 net::TestCompletionCallback host_connect_cb;
414 net::TestCompletionCallback client_connect_cb;
416 host_pseudotcp_->SetWriteWaitsForSend(true);
417 client_pseudotcp_->SetWriteWaitsForSend(true);
419 // Disable Nagle's algorithm because the test is slow when it is
420 // enabled.
421 host_pseudotcp_->SetNoDelay(true);
423 int rv1 = host_pseudotcp_->Connect(host_connect_cb.callback());
424 int rv2 = client_pseudotcp_->Connect(client_connect_cb.callback());
426 if (rv1 == net::ERR_IO_PENDING)
427 rv1 = host_connect_cb.WaitForResult();
428 if (rv2 == net::ERR_IO_PENDING)
429 rv2 = client_connect_cb.WaitForResult();
430 ASSERT_EQ(net::OK, rv1);
431 ASSERT_EQ(net::OK, rv2);
433 scoped_refptr<TCPChannelTester> tester =
434 new TCPChannelTester(&message_loop_, host_pseudotcp_.get(),
435 client_pseudotcp_.get());
437 tester->Start();
438 message_loop_.Run();
439 tester->CheckResults();
442 } // namespace
444 } // namespace jingle_glue