1 // Copyright (c) 2012 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
5 #include "jingle/glue/pseudotcp_adapter.h"
10 #include "base/bind_helpers.h"
11 #include "base/compiler_specific.h"
12 #include "jingle/glue/thread_wrapper.h"
13 #include "net/base/io_buffer.h"
14 #include "net/base/net_errors.h"
15 #include "net/base/test_completion_callback.h"
16 #include "net/udp/udp_socket.h"
17 #include "testing/gmock/include/gmock/gmock.h"
18 #include "testing/gtest/include/gtest/gtest.h"
21 namespace jingle_glue
{
25 } // namespace jingle_glue
27 namespace jingle_glue
{
// Size, in bytes, of a single message.
const int kMessageSize = 1024;

// How many kMessageSize-sized messages make up the test payload.
const int kMessages = 100;

// Total number of payload bytes pushed through the channel by a test.
const int kTestDataSize = kMessages * kMessageSize;
37 virtual ~RateLimiter() { };
38 // Returns true if the new packet needs to be dropped, false otherwise.
39 virtual bool DropNextPacket() = 0;
42 class LeakyBucket
: public RateLimiter
{
44 // |rate| is in drops per second.
45 LeakyBucket(double volume
, double rate
)
49 last_update_(base::TimeTicks::Now()) {
52 ~LeakyBucket() override
{}
54 bool DropNextPacket() override
{
55 base::TimeTicks now
= base::TimeTicks::Now();
56 double interval
= (now
- last_update_
).InSecondsF();
58 level_
= level_
+ 1.0 - interval
* rate_
;
59 if (level_
> volume_
) {
62 } else if (level_
< 0.0) {
72 base::TimeTicks last_update_
;
75 class FakeSocket
: public net::Socket
{
78 : rate_limiter_(NULL
),
81 ~FakeSocket() override
{}
83 void AppendInputPacket(const std::vector
<char>& data
) {
84 if (rate_limiter_
&& rate_limiter_
->DropNextPacket())
85 return; // Lose the packet.
87 if (!read_callback_
.is_null()) {
88 int size
= std::min(read_buffer_size_
, static_cast<int>(data
.size()));
89 memcpy(read_buffer_
->data(), &data
[0], data
.size());
90 net::CompletionCallback cb
= read_callback_
;
91 read_callback_
.Reset();
95 incoming_packets_
.push_back(data
);
99 void Connect(FakeSocket
* peer_socket
) {
100 peer_socket_
= peer_socket
;
103 void set_rate_limiter(RateLimiter
* rate_limiter
) {
104 rate_limiter_
= rate_limiter
;
107 void set_latency(int latency_ms
) { latency_ms_
= latency_ms
; };
109 // net::Socket interface.
110 int Read(net::IOBuffer
* buf
,
112 const net::CompletionCallback
& callback
) override
{
113 CHECK(read_callback_
.is_null());
116 if (incoming_packets_
.size() > 0) {
117 scoped_refptr
<net::IOBuffer
> buffer(buf
);
119 static_cast<int>(incoming_packets_
.front().size()), buf_len
);
120 memcpy(buffer
->data(), &*incoming_packets_
.front().begin(), size
);
121 incoming_packets_
.pop_front();
124 read_callback_
= callback
;
126 read_buffer_size_
= buf_len
;
127 return net::ERR_IO_PENDING
;
131 int Write(net::IOBuffer
* buf
,
133 const net::CompletionCallback
& callback
) override
{
136 base::MessageLoop::current()->PostDelayedTask(
138 base::Bind(&FakeSocket::AppendInputPacket
,
139 base::Unretained(peer_socket_
),
140 std::vector
<char>(buf
->data(), buf
->data() + buf_len
)),
141 base::TimeDelta::FromMilliseconds(latency_ms_
));
147 int SetReceiveBufferSize(int32 size
) override
{
149 return net::ERR_NOT_IMPLEMENTED
;
151 int SetSendBufferSize(int32 size
) override
{
153 return net::ERR_NOT_IMPLEMENTED
;
157 scoped_refptr
<net::IOBuffer
> read_buffer_
;
158 int read_buffer_size_
;
159 net::CompletionCallback read_callback_
;
161 std::deque
<std::vector
<char> > incoming_packets_
;
163 FakeSocket
* peer_socket_
;
164 RateLimiter
* rate_limiter_
;
168 class TCPChannelTester
: public base::RefCountedThreadSafe
<TCPChannelTester
> {
170 TCPChannelTester(base::MessageLoop
* message_loop
,
171 net::Socket
* client_socket
,
172 net::Socket
* host_socket
)
173 : message_loop_(message_loop
),
174 host_socket_(host_socket
),
175 client_socket_(client_socket
),
181 message_loop_
->PostTask(
182 FROM_HERE
, base::Bind(&TCPChannelTester::DoStart
, this));
185 void CheckResults() {
186 EXPECT_EQ(0, write_errors_
);
187 EXPECT_EQ(0, read_errors_
);
189 ASSERT_EQ(kTestDataSize
+ kMessageSize
, input_buffer_
->capacity());
191 output_buffer_
->SetOffset(0);
192 ASSERT_EQ(kTestDataSize
, output_buffer_
->size());
194 EXPECT_EQ(0, memcmp(output_buffer_
->data(),
195 input_buffer_
->StartOfBuffer(), kTestDataSize
));
199 virtual ~TCPChannelTester() {}
203 message_loop_
->PostTask(FROM_HERE
, base::MessageLoop::QuitClosure());
213 output_buffer_
= new net::DrainableIOBuffer(
214 new net::IOBuffer(kTestDataSize
), kTestDataSize
);
215 memset(output_buffer_
->data(), 123, kTestDataSize
);
217 input_buffer_
= new net::GrowableIOBuffer();
218 // Always keep kMessageSize bytes available at the end of the input buffer.
219 input_buffer_
->SetCapacity(kMessageSize
);
225 if (output_buffer_
->BytesRemaining() == 0)
228 int bytes_to_write
= std::min(output_buffer_
->BytesRemaining(),
230 result
= client_socket_
->Write(
231 output_buffer_
.get(),
233 base::Bind(&TCPChannelTester::OnWritten
, base::Unretained(this)));
234 HandleWriteResult(result
);
238 void OnWritten(int result
) {
239 HandleWriteResult(result
);
243 void HandleWriteResult(int result
) {
244 if (result
<= 0 && result
!= net::ERR_IO_PENDING
) {
245 LOG(ERROR
) << "Received error " << result
<< " when trying to write";
248 } else if (result
> 0) {
249 output_buffer_
->DidConsume(result
);
256 input_buffer_
->set_offset(input_buffer_
->capacity() - kMessageSize
);
258 result
= host_socket_
->Read(
261 base::Bind(&TCPChannelTester::OnRead
, base::Unretained(this)));
262 HandleReadResult(result
);
266 void OnRead(int result
) {
267 HandleReadResult(result
);
271 void HandleReadResult(int result
) {
272 if (result
<= 0 && result
!= net::ERR_IO_PENDING
) {
274 LOG(ERROR
) << "Received error " << result
<< " when trying to read";
278 } else if (result
> 0) {
279 // Allocate memory for the next read.
280 input_buffer_
->SetCapacity(input_buffer_
->capacity() + result
);
281 if (input_buffer_
->capacity() == kTestDataSize
+ kMessageSize
)
287 friend class base::RefCountedThreadSafe
<TCPChannelTester
>;
289 base::MessageLoop
* message_loop_
;
290 net::Socket
* host_socket_
;
291 net::Socket
* client_socket_
;
294 scoped_refptr
<net::DrainableIOBuffer
> output_buffer_
;
295 scoped_refptr
<net::GrowableIOBuffer
> input_buffer_
;
301 class PseudoTcpAdapterTest
: public testing::Test
{
303 void SetUp() override
{
304 JingleThreadWrapper::EnsureForCurrentMessageLoop();
306 host_socket_
= new FakeSocket();
307 client_socket_
= new FakeSocket();
309 host_socket_
->Connect(client_socket_
);
310 client_socket_
->Connect(host_socket_
);
312 host_pseudotcp_
.reset(new PseudoTcpAdapter(host_socket_
));
313 client_pseudotcp_
.reset(new PseudoTcpAdapter(client_socket_
));
316 FakeSocket
* host_socket_
;
317 FakeSocket
* client_socket_
;
319 scoped_ptr
<PseudoTcpAdapter
> host_pseudotcp_
;
320 scoped_ptr
<PseudoTcpAdapter
> client_pseudotcp_
;
321 base::MessageLoop message_loop_
;
324 TEST_F(PseudoTcpAdapterTest
, DataTransfer
) {
325 net::TestCompletionCallback host_connect_cb
;
326 net::TestCompletionCallback client_connect_cb
;
328 int rv1
= host_pseudotcp_
->Connect(host_connect_cb
.callback());
329 int rv2
= client_pseudotcp_
->Connect(client_connect_cb
.callback());
331 if (rv1
== net::ERR_IO_PENDING
)
332 rv1
= host_connect_cb
.WaitForResult();
333 if (rv2
== net::ERR_IO_PENDING
)
334 rv2
= client_connect_cb
.WaitForResult();
335 ASSERT_EQ(net::OK
, rv1
);
336 ASSERT_EQ(net::OK
, rv2
);
338 scoped_refptr
<TCPChannelTester
> tester
=
339 new TCPChannelTester(&message_loop_
, host_pseudotcp_
.get(),
340 client_pseudotcp_
.get());
344 tester
->CheckResults();
347 TEST_F(PseudoTcpAdapterTest
, LimitedChannel
) {
348 const int kLatencyMs
= 20;
349 const int kPacketsPerSecond
= 400;
350 const int kBurstPackets
= 10;
352 LeakyBucket
host_limiter(kBurstPackets
, kPacketsPerSecond
);
353 host_socket_
->set_latency(kLatencyMs
);
354 host_socket_
->set_rate_limiter(&host_limiter
);
356 LeakyBucket
client_limiter(kBurstPackets
, kPacketsPerSecond
);
357 host_socket_
->set_latency(kLatencyMs
);
358 client_socket_
->set_rate_limiter(&client_limiter
);
360 net::TestCompletionCallback host_connect_cb
;
361 net::TestCompletionCallback client_connect_cb
;
363 int rv1
= host_pseudotcp_
->Connect(host_connect_cb
.callback());
364 int rv2
= client_pseudotcp_
->Connect(client_connect_cb
.callback());
366 if (rv1
== net::ERR_IO_PENDING
)
367 rv1
= host_connect_cb
.WaitForResult();
368 if (rv2
== net::ERR_IO_PENDING
)
369 rv2
= client_connect_cb
.WaitForResult();
370 ASSERT_EQ(net::OK
, rv1
);
371 ASSERT_EQ(net::OK
, rv2
);
373 scoped_refptr
<TCPChannelTester
> tester
=
374 new TCPChannelTester(&message_loop_
, host_pseudotcp_
.get(),
375 client_pseudotcp_
.get());
379 tester
->CheckResults();
382 class DeleteOnConnected
{
384 DeleteOnConnected(base::MessageLoop
* message_loop
,
385 scoped_ptr
<PseudoTcpAdapter
>* adapter
)
386 : message_loop_(message_loop
), adapter_(adapter
) {}
387 void OnConnected(int error
) {
389 message_loop_
->PostTask(FROM_HERE
, base::MessageLoop::QuitClosure());
391 base::MessageLoop
* message_loop_
;
392 scoped_ptr
<PseudoTcpAdapter
>* adapter_
;
395 TEST_F(PseudoTcpAdapterTest
, DeleteOnConnected
) {
396 // This test verifies that deleting the adapter mid-callback doesn't lead
397 // to deleted structures being touched as the stack unrolls, so the failure
398 // mode is a crash rather than a normal test failure.
399 net::TestCompletionCallback client_connect_cb
;
400 DeleteOnConnected
host_delete(&message_loop_
, &host_pseudotcp_
);
402 host_pseudotcp_
->Connect(base::Bind(&DeleteOnConnected::OnConnected
,
403 base::Unretained(&host_delete
)));
404 client_pseudotcp_
->Connect(client_connect_cb
.callback());
407 ASSERT_EQ(NULL
, host_pseudotcp_
.get());
// Verify that we can send/receive data with the write-waits-for-send flag
// enabled on both adapters.
412 TEST_F(PseudoTcpAdapterTest
, WriteWaitsForSendLetsDataThrough
) {
413 net::TestCompletionCallback host_connect_cb
;
414 net::TestCompletionCallback client_connect_cb
;
416 host_pseudotcp_
->SetWriteWaitsForSend(true);
417 client_pseudotcp_
->SetWriteWaitsForSend(true);
// Disable Nagle's algorithm because the test is slow when it is enabled.
421 host_pseudotcp_
->SetNoDelay(true);
423 int rv1
= host_pseudotcp_
->Connect(host_connect_cb
.callback());
424 int rv2
= client_pseudotcp_
->Connect(client_connect_cb
.callback());
426 if (rv1
== net::ERR_IO_PENDING
)
427 rv1
= host_connect_cb
.WaitForResult();
428 if (rv2
== net::ERR_IO_PENDING
)
429 rv2
= client_connect_cb
.WaitForResult();
430 ASSERT_EQ(net::OK
, rv1
);
431 ASSERT_EQ(net::OK
, rv2
);
433 scoped_refptr
<TCPChannelTester
> tester
=
434 new TCPChannelTester(&message_loop_
, host_pseudotcp_
.get(),
435 client_pseudotcp_
.get());
439 tester
->CheckResults();
444 } // namespace jingle_glue