1 // Copyright (c) 2012 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
5 #include "jingle/glue/pseudotcp_adapter.h"
10 #include "base/bind_helpers.h"
11 #include "base/compiler_specific.h"
12 #include "jingle/glue/thread_wrapper.h"
13 #include "net/base/io_buffer.h"
14 #include "net/base/net_errors.h"
15 #include "net/base/test_completion_callback.h"
16 #include "net/udp/udp_socket.h"
17 #include "testing/gmock/include/gmock/gmock.h"
18 #include "testing/gtest/include/gtest/gtest.h"
21 namespace jingle_glue
{
25 } // namespace jingle_glue
27 namespace jingle_glue
{
31 const int kMessageSize
= 1024;
32 const int kMessages
= 100;
33 const int kTestDataSize
= kMessages
* kMessageSize
;
37 virtual ~RateLimiter() { };
38 // Returns true if the new packet needs to be dropped, false otherwise.
39 virtual bool DropNextPacket() = 0;
42 class LeakyBucket
: public RateLimiter
{
44 // |rate| is in drops per second.
45 LeakyBucket(double volume
, double rate
)
49 last_update_(base::TimeTicks::HighResNow()) {
52 virtual ~LeakyBucket() { }
54 virtual bool DropNextPacket() OVERRIDE
{
55 base::TimeTicks now
= base::TimeTicks::HighResNow();
56 double interval
= (now
- last_update_
).InSecondsF();
58 level_
= level_
+ 1.0 - interval
* rate_
;
59 if (level_
> volume_
) {
62 } else if (level_
< 0.0) {
72 base::TimeTicks last_update_
;
75 class FakeSocket
: public net::Socket
{
78 : rate_limiter_(NULL
),
81 virtual ~FakeSocket() { }
83 void AppendInputPacket(const std::vector
<char>& data
) {
84 if (rate_limiter_
&& rate_limiter_
->DropNextPacket())
85 return; // Lose the packet.
87 if (!read_callback_
.is_null()) {
88 int size
= std::min(read_buffer_size_
, static_cast<int>(data
.size()));
89 memcpy(read_buffer_
->data(), &data
[0], data
.size());
90 net::CompletionCallback cb
= read_callback_
;
91 read_callback_
.Reset();
95 incoming_packets_
.push_back(data
);
99 void Connect(FakeSocket
* peer_socket
) {
100 peer_socket_
= peer_socket
;
103 void set_rate_limiter(RateLimiter
* rate_limiter
) {
104 rate_limiter_
= rate_limiter
;
107 void set_latency(int latency_ms
) { latency_ms_
= latency_ms
; };
109 // net::Socket interface.
110 virtual int Read(net::IOBuffer
* buf
, int buf_len
,
111 const net::CompletionCallback
& callback
) OVERRIDE
{
112 CHECK(read_callback_
.is_null());
115 if (incoming_packets_
.size() > 0) {
116 scoped_refptr
<net::IOBuffer
> buffer(buf
);
118 static_cast<int>(incoming_packets_
.front().size()), buf_len
);
119 memcpy(buffer
->data(), &*incoming_packets_
.front().begin(), size
);
120 incoming_packets_
.pop_front();
123 read_callback_
= callback
;
125 read_buffer_size_
= buf_len
;
126 return net::ERR_IO_PENDING
;
130 virtual int Write(net::IOBuffer
* buf
, int buf_len
,
131 const net::CompletionCallback
& callback
) OVERRIDE
{
134 base::MessageLoop::current()->PostDelayedTask(
136 base::Bind(&FakeSocket::AppendInputPacket
,
137 base::Unretained(peer_socket_
),
138 std::vector
<char>(buf
->data(), buf
->data() + buf_len
)),
139 base::TimeDelta::FromMilliseconds(latency_ms_
));
145 virtual bool SetReceiveBufferSize(int32 size
) OVERRIDE
{
149 virtual bool SetSendBufferSize(int32 size
) OVERRIDE
{
155 scoped_refptr
<net::IOBuffer
> read_buffer_
;
156 int read_buffer_size_
;
157 net::CompletionCallback read_callback_
;
159 std::deque
<std::vector
<char> > incoming_packets_
;
161 FakeSocket
* peer_socket_
;
162 RateLimiter
* rate_limiter_
;
166 class TCPChannelTester
: public base::RefCountedThreadSafe
<TCPChannelTester
> {
168 TCPChannelTester(base::MessageLoop
* message_loop
,
169 net::Socket
* client_socket
,
170 net::Socket
* host_socket
)
171 : message_loop_(message_loop
),
172 host_socket_(host_socket
),
173 client_socket_(client_socket
),
179 message_loop_
->PostTask(
180 FROM_HERE
, base::Bind(&TCPChannelTester::DoStart
, this));
183 void CheckResults() {
184 EXPECT_EQ(0, write_errors_
);
185 EXPECT_EQ(0, read_errors_
);
187 ASSERT_EQ(kTestDataSize
+ kMessageSize
, input_buffer_
->capacity());
189 output_buffer_
->SetOffset(0);
190 ASSERT_EQ(kTestDataSize
, output_buffer_
->size());
192 EXPECT_EQ(0, memcmp(output_buffer_
->data(),
193 input_buffer_
->StartOfBuffer(), kTestDataSize
));
197 virtual ~TCPChannelTester() {}
201 message_loop_
->PostTask(FROM_HERE
, base::MessageLoop::QuitClosure());
211 output_buffer_
= new net::DrainableIOBuffer(
212 new net::IOBuffer(kTestDataSize
), kTestDataSize
);
213 memset(output_buffer_
->data(), 123, kTestDataSize
);
215 input_buffer_
= new net::GrowableIOBuffer();
216 // Always keep kMessageSize bytes available at the end of the input buffer.
217 input_buffer_
->SetCapacity(kMessageSize
);
223 if (output_buffer_
->BytesRemaining() == 0)
226 int bytes_to_write
= std::min(output_buffer_
->BytesRemaining(),
228 result
= client_socket_
->Write(
229 output_buffer_
.get(),
231 base::Bind(&TCPChannelTester::OnWritten
, base::Unretained(this)));
232 HandleWriteResult(result
);
236 void OnWritten(int result
) {
237 HandleWriteResult(result
);
241 void HandleWriteResult(int result
) {
242 if (result
<= 0 && result
!= net::ERR_IO_PENDING
) {
243 LOG(ERROR
) << "Received error " << result
<< " when trying to write";
246 } else if (result
> 0) {
247 output_buffer_
->DidConsume(result
);
254 input_buffer_
->set_offset(input_buffer_
->capacity() - kMessageSize
);
256 result
= host_socket_
->Read(
259 base::Bind(&TCPChannelTester::OnRead
, base::Unretained(this)));
260 HandleReadResult(result
);
264 void OnRead(int result
) {
265 HandleReadResult(result
);
269 void HandleReadResult(int result
) {
270 if (result
<= 0 && result
!= net::ERR_IO_PENDING
) {
272 LOG(ERROR
) << "Received error " << result
<< " when trying to read";
276 } else if (result
> 0) {
277 // Allocate memory for the next read.
278 input_buffer_
->SetCapacity(input_buffer_
->capacity() + result
);
279 if (input_buffer_
->capacity() == kTestDataSize
+ kMessageSize
)
285 friend class base::RefCountedThreadSafe
<TCPChannelTester
>;
287 base::MessageLoop
* message_loop_
;
288 net::Socket
* host_socket_
;
289 net::Socket
* client_socket_
;
292 scoped_refptr
<net::DrainableIOBuffer
> output_buffer_
;
293 scoped_refptr
<net::GrowableIOBuffer
> input_buffer_
;
299 class PseudoTcpAdapterTest
: public testing::Test
{
301 virtual void SetUp() OVERRIDE
{
302 JingleThreadWrapper::EnsureForCurrentMessageLoop();
304 host_socket_
= new FakeSocket();
305 client_socket_
= new FakeSocket();
307 host_socket_
->Connect(client_socket_
);
308 client_socket_
->Connect(host_socket_
);
310 host_pseudotcp_
.reset(new PseudoTcpAdapter(host_socket_
));
311 client_pseudotcp_
.reset(new PseudoTcpAdapter(client_socket_
));
314 FakeSocket
* host_socket_
;
315 FakeSocket
* client_socket_
;
317 scoped_ptr
<PseudoTcpAdapter
> host_pseudotcp_
;
318 scoped_ptr
<PseudoTcpAdapter
> client_pseudotcp_
;
319 base::MessageLoop message_loop_
;
322 TEST_F(PseudoTcpAdapterTest
, DataTransfer
) {
323 net::TestCompletionCallback host_connect_cb
;
324 net::TestCompletionCallback client_connect_cb
;
326 int rv1
= host_pseudotcp_
->Connect(host_connect_cb
.callback());
327 int rv2
= client_pseudotcp_
->Connect(client_connect_cb
.callback());
329 if (rv1
== net::ERR_IO_PENDING
)
330 rv1
= host_connect_cb
.WaitForResult();
331 if (rv2
== net::ERR_IO_PENDING
)
332 rv2
= client_connect_cb
.WaitForResult();
333 ASSERT_EQ(net::OK
, rv1
);
334 ASSERT_EQ(net::OK
, rv2
);
336 scoped_refptr
<TCPChannelTester
> tester
=
337 new TCPChannelTester(&message_loop_
, host_pseudotcp_
.get(),
338 client_pseudotcp_
.get());
342 tester
->CheckResults();
345 TEST_F(PseudoTcpAdapterTest
, LimitedChannel
) {
346 const int kLatencyMs
= 20;
347 const int kPacketsPerSecond
= 400;
348 const int kBurstPackets
= 10;
350 LeakyBucket
host_limiter(kBurstPackets
, kPacketsPerSecond
);
351 host_socket_
->set_latency(kLatencyMs
);
352 host_socket_
->set_rate_limiter(&host_limiter
);
354 LeakyBucket
client_limiter(kBurstPackets
, kPacketsPerSecond
);
355 host_socket_
->set_latency(kLatencyMs
);
356 client_socket_
->set_rate_limiter(&client_limiter
);
358 net::TestCompletionCallback host_connect_cb
;
359 net::TestCompletionCallback client_connect_cb
;
361 int rv1
= host_pseudotcp_
->Connect(host_connect_cb
.callback());
362 int rv2
= client_pseudotcp_
->Connect(client_connect_cb
.callback());
364 if (rv1
== net::ERR_IO_PENDING
)
365 rv1
= host_connect_cb
.WaitForResult();
366 if (rv2
== net::ERR_IO_PENDING
)
367 rv2
= client_connect_cb
.WaitForResult();
368 ASSERT_EQ(net::OK
, rv1
);
369 ASSERT_EQ(net::OK
, rv2
);
371 scoped_refptr
<TCPChannelTester
> tester
=
372 new TCPChannelTester(&message_loop_
, host_pseudotcp_
.get(),
373 client_pseudotcp_
.get());
377 tester
->CheckResults();
380 class DeleteOnConnected
{
382 DeleteOnConnected(base::MessageLoop
* message_loop
,
383 scoped_ptr
<PseudoTcpAdapter
>* adapter
)
384 : message_loop_(message_loop
), adapter_(adapter
) {}
385 void OnConnected(int error
) {
387 message_loop_
->PostTask(FROM_HERE
, base::MessageLoop::QuitClosure());
389 base::MessageLoop
* message_loop_
;
390 scoped_ptr
<PseudoTcpAdapter
>* adapter_
;
393 TEST_F(PseudoTcpAdapterTest
, DeleteOnConnected
) {
394 // This test verifies that deleting the adapter mid-callback doesn't lead
395 // to deleted structures being touched as the stack unrolls, so the failure
396 // mode is a crash rather than a normal test failure.
397 net::TestCompletionCallback client_connect_cb
;
398 DeleteOnConnected
host_delete(&message_loop_
, &host_pseudotcp_
);
400 host_pseudotcp_
->Connect(base::Bind(&DeleteOnConnected::OnConnected
,
401 base::Unretained(&host_delete
)));
402 client_pseudotcp_
->Connect(client_connect_cb
.callback());
405 ASSERT_EQ(NULL
, host_pseudotcp_
.get());
408 // Verify that we can send/receive data with the write-waits-for-send
410 TEST_F(PseudoTcpAdapterTest
, WriteWaitsForSendLetsDataThrough
) {
411 net::TestCompletionCallback host_connect_cb
;
412 net::TestCompletionCallback client_connect_cb
;
414 host_pseudotcp_
->SetWriteWaitsForSend(true);
415 client_pseudotcp_
->SetWriteWaitsForSend(true);
417 // Disable Nagle's algorithm because the test is slow when it is
419 host_pseudotcp_
->SetNoDelay(true);
421 int rv1
= host_pseudotcp_
->Connect(host_connect_cb
.callback());
422 int rv2
= client_pseudotcp_
->Connect(client_connect_cb
.callback());
424 if (rv1
== net::ERR_IO_PENDING
)
425 rv1
= host_connect_cb
.WaitForResult();
426 if (rv2
== net::ERR_IO_PENDING
)
427 rv2
= client_connect_cb
.WaitForResult();
428 ASSERT_EQ(net::OK
, rv1
);
429 ASSERT_EQ(net::OK
, rv2
);
431 scoped_refptr
<TCPChannelTester
> tester
=
432 new TCPChannelTester(&message_loop_
, host_pseudotcp_
.get(),
433 client_pseudotcp_
.get());
437 tester
->CheckResults();
442 } // namespace jingle_glue