1 // Copyright 2014 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
5 #include "media/cast/test/utility/udp_proxy.h"
7 #include "base/logging.h"
8 #include "base/memory/linked_ptr.h"
9 #include "base/rand_util.h"
10 #include "base/synchronization/waitable_event.h"
11 #include "base/threading/thread.h"
12 #include "base/time/default_tick_clock.h"
13 #include "net/base/io_buffer.h"
14 #include "net/base/net_errors.h"
15 #include "net/udp/udp_socket.h"
// Size given to the transport::Packet that is allocated to receive incoming
// datagrams (see `new transport::Packet(kMaxPacketSize)` later in this file).
21 const size_t kMaxPacketSize
= 65536;
// Constructor and destructor; both bodies are intentionally empty.
23 PacketPipe::PacketPipe() {}
24 PacketPipe::~PacketPipe() {}
// Records |task_runner| in task_runner_ and hands the same |task_runner| and
// |clock| on to pipe_->InitOnIOThread().
25 void PacketPipe::InitOnIOThread(
26 const scoped_refptr
<base::SingleThreadTaskRunner
>& task_runner
,
27 base::TickClock
* clock
) {
28 task_runner_
= task_runner
;
// Propagate the initialization to the pipe held in pipe_.
31 pipe_
->InitOnIOThread(task_runner
, clock
);
// Takes ownership of |pipe| and passes it on to pipe_->AppendToPipe(), moving
// the pointer with Pass().
34 void PacketPipe::AppendToPipe(scoped_ptr
<PacketPipe
> pipe
) {
36 pipe_
->AppendToPipe(pipe
.Pass());
42 // Roughly emulates a buffer inside a device.
43 // If the buffer is full, packets are dropped.
44 // Packets are output at a maximum bandwidth.
// PacketPipe stage with a byte budget (max_buffer_size_) and an output rate
// (max_megabits_per_second_). Queued packets are kept in buffer_.
45 class Buffer
: public PacketPipe
{
// |buffer_size| becomes max_buffer_size_, which Send() compares against sums
// of packet->size(). |max_megabits_per_second| is the rate used to compute the
// delay before a queued packet is processed.
47 Buffer(size_t buffer_size
, double max_megabits_per_second
)
49 max_buffer_size_(buffer_size
),
50 max_megabits_per_second_(max_megabits_per_second
),
51 weak_factory_(this) {}
// Queues |packet| only if its size plus the bytes already accounted for in
// buffer_size_ does not exceed max_buffer_size_; a packet that does not fit is
// not queued.
53 virtual void Send(scoped_ptr
<transport::Packet
> packet
) OVERRIDE
{
54 if (packet
->size() + buffer_size_
<= max_buffer_size_
) {
55 buffer_size_
+= packet
->size();
// Ownership moves from the scoped_ptr into a linked_ptr stored in the deque.
56 buffer_
.push_back(linked_ptr
<transport::Packet
>(packet
.release()));
// True only when the queue was empty before the push above.
57 if (buffer_
.size() == 1) {
// Delay for the packet at the front of the queue: its size in bytes times 8,
// scaled to megabits, divided by the configured rate, gives seconds.
65 double megabits
= buffer_
.front()->size() * 8 / 1000000.0;
66 double seconds
= megabits
/ max_megabits_per_second_
;
67 int64 microseconds
= static_cast<int64
>(seconds
* 1E6
);
// ProcessBuffer is bound to a weak pointer obtained from weak_factory_.
68 task_runner_
->PostDelayedTask(
70 base::Bind(&Buffer::ProcessBuffer
, weak_factory_
.GetWeakPtr()),
71 base::TimeDelta::FromMicroseconds(microseconds
));
// Takes the packet at the front of buffer_, removes its size from
// buffer_size_ and sends it to pipe_. Requires a non-empty queue (CHECK).
74 void ProcessBuffer() {
75 CHECK(!buffer_
.empty());
// release() transfers ownership out of the linked_ptr stored in the deque.
76 scoped_ptr
<transport::Packet
> packet(buffer_
.front().release());
77 buffer_size_
-= packet
->size();
79 pipe_
->Send(packet
.Pass());
80 if (!buffer_
.empty()) {
// Queued packets, oldest at the front.
85 std::deque
<linked_ptr
<transport::Packet
> > buffer_
;
// Upper bound for buffer_size_, in the same unit as packet->size().
87 size_t max_buffer_size_
;
88 double max_megabits_per_second_
; // megabits per second
89 base::WeakPtrFactory
<Buffer
> weak_factory_
;
// Factory: returns a newly allocated Buffer as a scoped_ptr<PacketPipe>.
// |buffer_size| and |bandwidth| are passed to the Buffer constructor as its
// buffer_size and max_megabits_per_second arguments respectively.
92 scoped_ptr
<PacketPipe
> NewBuffer(size_t buffer_size
, double bandwidth
) {
93 return scoped_ptr
<PacketPipe
>(new Buffer(buffer_size
, bandwidth
)).Pass();
// PacketPipe stage that forwards a packet only when a random draw is at least
// drop_fraction_.
96 class RandomDrop
: public PacketPipe
{
// |drop_fraction| is the threshold compared against base::RandDouble() in
// Send().
98 RandomDrop(double drop_fraction
) : drop_fraction_(drop_fraction
) {
// Forwards |packet| to pipe_ when base::RandDouble() >= drop_fraction_.
101 virtual void Send(scoped_ptr
<transport::Packet
> packet
) OVERRIDE
{
102 if (base::RandDouble() >= drop_fraction_
) {
103 pipe_
->Send(packet
.Pass());
// Threshold for the random draw; callers in this file pass values such as
// 0.005, 0.05 and 0.1.
108 double drop_fraction_
;
// Factory: returns a newly allocated RandomDrop(drop_fraction) as a
// scoped_ptr<PacketPipe>.
111 scoped_ptr
<PacketPipe
> NewRandomDrop(double drop_fraction
) {
112 return scoped_ptr
<PacketPipe
>(new RandomDrop(drop_fraction
)).Pass();
// Base class for stages that delay each packet independently. Subclasses
// supply the delay, in seconds, through GetDelay().
115 class SimpleDelayBase
: public PacketPipe
{
117 SimpleDelayBase() : weak_factory_(this) {}
118 virtual ~SimpleDelayBase() {}
// Asks GetDelay() for a delay and posts SendInternal() with |packet| after
// that delay. The task is bound to a weak pointer from weak_factory_.
120 virtual void Send(scoped_ptr
<transport::Packet
> packet
) OVERRIDE
{
121 double seconds
= GetDelay();
122 task_runner_
->PostDelayedTask(
124 base::Bind(&SimpleDelayBase::SendInternal
,
125 weak_factory_
.GetWeakPtr(),
126 base::Passed(&packet
)),
// Seconds are converted to whole microseconds for the TimeDelta.
127 base::TimeDelta::FromMicroseconds(static_cast<int64
>(seconds
* 1E6
)));
// Returns the delay to apply to the next packet, in seconds (the caller
// multiplies the result by 1E6 to obtain microseconds).
130 virtual double GetDelay() = 0;
// Runs as the delayed task: forwards |packet| to pipe_.
133 virtual void SendInternal(scoped_ptr
<transport::Packet
> packet
) {
134 pipe_
->Send(packet
.Pass());
137 base::WeakPtrFactory
<SimpleDelayBase
> weak_factory_
;
// SimpleDelayBase whose GetDelay() always returns the same value.
140 class ConstantDelay
: public SimpleDelayBase
{
// |delay_seconds| is stored and returned unchanged by GetDelay().
142 ConstantDelay(double delay_seconds
) : delay_seconds_(delay_seconds
) {}
143 virtual double GetDelay() OVERRIDE
{
144 return delay_seconds_
;
// Fixed delay, in seconds.
148 double delay_seconds_
;
// Factory: returns a newly allocated ConstantDelay(delay_seconds) as a
// scoped_ptr<PacketPipe>.
151 scoped_ptr
<PacketPipe
> NewConstantDelay(double delay_seconds
) {
152 return scoped_ptr
<PacketPipe
>(new ConstantDelay(delay_seconds
)).Pass();
// SimpleDelayBase whose GetDelay() returns random_delay_ scaled by a fresh
// base::RandDouble() value on every call. Each packet is delayed by its own
// posted task (see SimpleDelayBase::Send), so this class itself does nothing
// to keep packets in their original order.
155 class RandomUnsortedDelay
: public SimpleDelayBase
{
157 RandomUnsortedDelay(double random_delay
) : random_delay_(random_delay
) {}
159 virtual double GetDelay() OVERRIDE
{
160 return random_delay_
* base::RandDouble();
// Scale factor for the random delay, in seconds.
164 double random_delay_
;
// Factory: returns a newly allocated RandomUnsortedDelay(random_delay) as a
// scoped_ptr<PacketPipe>.
167 scoped_ptr
<PacketPipe
> NewRandomUnsortedDelay(double random_delay
) {
168 return scoped_ptr
<PacketPipe
>(new RandomUnsortedDelay(random_delay
)).Pass();
// PacketPipe stage that queues packets in buffer_ and releases them from the
// front of that queue after a random delay. In addition, CauseExtraDelay()
// periodically sets block_until_ to a point extra_delay_ seconds in the
// future.
172 class RandomSortedDelay
: public PacketPipe
{
// |random_delay| scales the per-packet random delay (seconds);
// |seconds_between_extra_delay| scales the random interval between calls to
// CauseExtraDelay().
174 RandomSortedDelay(double random_delay
,
176 double seconds_between_extra_delay
)
177 : random_delay_(random_delay
),
178 extra_delay_(extra_delay
),
179 seconds_between_extra_delay_(seconds_between_extra_delay
),
180 weak_factory_(this) {}
// Appends |packet| to buffer_, moving ownership into a linked_ptr.
182 virtual void Send(scoped_ptr
<transport::Packet
> packet
) OVERRIDE
{
183 buffer_
.push_back(linked_ptr
<transport::Packet
>(packet
.release()));
// True only when the queue was empty before the push above.
184 if (buffer_
.size() == 1) {
// Runs the base-class initialization, then schedules the first extra delay.
188 virtual void InitOnIOThread(
189 const scoped_refptr
<base::SingleThreadTaskRunner
>& task_runner
,
190 base::TickClock
* clock
) OVERRIDE
{
191 PacketPipe::InitOnIOThread(task_runner
, clock
);
192 // As we start the stream, assume that we are in a random
193 // place between two extra delays, thus multiplier = 1.0;
194 ScheduleExtraDelay(1.0);
// Posts CauseExtraDelay() after
// seconds_between_extra_delay_ * |mult| * base::RandDouble() seconds.
198 void ScheduleExtraDelay(double mult
) {
199 double seconds
= seconds_between_extra_delay_
* mult
* base::RandDouble();
200 int64 microseconds
= static_cast<int64
>(seconds
* 1E6
);
201 task_runner_
->PostDelayedTask(
203 base::Bind(&RandomSortedDelay::CauseExtraDelay
,
204 weak_factory_
.GetWeakPtr()),
205 base::TimeDelta::FromMicroseconds(microseconds
));
// Sets block_until_ to clock_->NowTicks() plus extra_delay_ seconds, then
// schedules the next call to itself.
208 void CauseExtraDelay() {
209 block_until_
= clock_
->NowTicks() +
210 base::TimeDelta::FromMicroseconds(
211 static_cast<int64
>(extra_delay_
* 1E6
));
212 // An extra delay just happened, wait up to seconds_between_extra_delay_*2
213 // before scheduling another one to make the average equal to
214 // seconds_between_extra_delay_.
215 ScheduleExtraDelay(2.0);
// Picks a random delay of up to random_delay_ seconds and computes how long
// remains until block_until_.
219 double seconds
= base::RandDouble() * random_delay_
;
// NOTE(review): this reads base::TimeTicks::Now() whereas CauseExtraDelay()
// computes block_until_ from clock_->NowTicks(). If |clock| is not the
// default clock the two time sources may disagree; confirm which is intended.
220 base::TimeDelta block_time
= block_until_
- base::TimeTicks::Now();
221 base::TimeDelta delay_time
=
222 base::TimeDelta::FromMicroseconds(
223 static_cast<int64
>(seconds
* 1E6
));
// NOTE(review): block_time is overwritten here when it exceeds delay_time; no
// later read of block_time is visible in this listing. Confirm the assignment
// direction matches the intended "wait until block_until_" behaviour.
224 if (block_time
> delay_time
) {
225 block_time
= delay_time
;
// ProcessBuffer is bound to a weak pointer obtained from weak_factory_.
228 task_runner_
->PostDelayedTask(FROM_HERE
,
229 base::Bind(&RandomSortedDelay::ProcessBuffer
,
230 weak_factory_
.GetWeakPtr()),
// Takes the packet at the front of buffer_ and sends it to pipe_. Requires a
// non-empty queue (CHECK).
234 void ProcessBuffer() {
235 CHECK(!buffer_
.empty());
236 scoped_ptr
<transport::Packet
> packet(buffer_
.front().release());
237 pipe_
->Send(packet
.Pass());
239 if (!buffer_
.empty()) {
// Time set by CauseExtraDelay(): "now" plus extra_delay_ seconds.
244 base::TimeTicks block_until_
;
// Queued packets, oldest at the front.
245 std::deque
<linked_ptr
<transport::Packet
> > buffer_
;
// Scale of the per-packet random delay, in seconds.
246 double random_delay_
;
// Scale of the random interval between extra delays, in seconds.
248 double seconds_between_extra_delay_
;
249 base::WeakPtrFactory
<RandomSortedDelay
> weak_factory_
;
// Factory: returns a newly allocated
// RandomSortedDelay(random_delay, extra_delay, seconds_between_extra_delay)
// as a scoped_ptr<PacketPipe>.
252 scoped_ptr
<PacketPipe
> NewRandomSortedDelay(
255 double seconds_between_extra_delay
) {
256 return scoped_ptr
<PacketPipe
>(
257 new RandomSortedDelay(
258 random_delay
, extra_delay
, seconds_between_extra_delay
))
// PacketPipe stage driven by a works_ flag: a Flip() task is posted after a
// random interval whose upper bound depends on the current value of works_.
262 class NetworkGlitchPipe
: public PacketPipe
{
// Stores twice each average as the maximum for the corresponding interval.
// Presumably this makes the mean of base::RandDouble() * max equal to the
// requested average; verify against base::RandDouble()'s documented range.
264 NetworkGlitchPipe(double average_work_time
, double average_outage_time
)
266 max_work_time_(average_work_time
* 2),
267 max_outage_time_(average_outage_time
* 2),
268 weak_factory_(this) {}
// Runs the base-class initialization first.
270 virtual void InitOnIOThread(
271 const scoped_refptr
<base::SingleThreadTaskRunner
>& task_runner
,
272 base::TickClock
* clock
) OVERRIDE
{
273 PacketPipe::InitOnIOThread(task_runner
, clock
);
// Forwards |packet| to pipe_.
277 virtual void Send(scoped_ptr
<transport::Packet
> packet
) OVERRIDE
{
279 pipe_
->Send(packet
.Pass());
// Length of the current period, in seconds: a random fraction of
// max_work_time_ while works_ is true, of max_outage_time_ otherwise.
286 double seconds
= base::RandDouble() *
287 (works_
? max_work_time_
: max_outage_time_
);
288 int64 microseconds
= static_cast<int64
>(seconds
* 1E6
);
// Flip is bound to a weak pointer obtained from weak_factory_.
289 task_runner_
->PostDelayedTask(
291 base::Bind(&NetworkGlitchPipe::Flip
, weak_factory_
.GetWeakPtr()),
292 base::TimeDelta::FromMicroseconds(microseconds
));
// Upper bounds, in seconds, for the two kinds of period.
296 double max_work_time_
;
297 double max_outage_time_
;
298 base::WeakPtrFactory
<NetworkGlitchPipe
> weak_factory_
;
// Factory: returns a newly allocated
// NetworkGlitchPipe(average_work_time, average_outage_time) as a
// scoped_ptr<PacketPipe>.
301 scoped_ptr
<PacketPipe
> NewNetworkGlitchPipe(double average_work_time
,
302 double average_outage_time
) {
303 return scoped_ptr
<PacketPipe
>(
304 new NetworkGlitchPipe(average_work_time
, average_outage_time
))
// Pipe stage that hands packets to a UDPProxyImpl together with a destination
// endpoint. Send() is defined out of line, after UDPProxyImpl.
310 class PacketSender
: public PacketPipe
{
// Stores both pointers as given; |destination| is dereferenced on every
// Send(), so it must stay valid for as long as this object is used.
312 PacketSender(UDPProxyImpl
* udp_proxy
, const net::IPEndPoint
* destination
)
313 : udp_proxy_(udp_proxy
), destination_(destination
) {}
314 virtual void Send(scoped_ptr
<transport::Packet
> packet
) OVERRIDE
;
315 virtual void AppendToPipe(scoped_ptr
<PacketPipe
> pipe
) OVERRIDE
{
// Proxy whose Send(packet, destination) is called for every packet.
320 UDPProxyImpl
* udp_proxy_
;
321 const net::IPEndPoint
* destination_
; // not owned
// Helper used to assemble pipelines: wraps the raw |next| pointer in a
// scoped_ptr (taking ownership of it) and appends it to the pipe that |pipe|
// points at.
325 void BuildPipe(scoped_ptr
<PacketPipe
>* pipe
, PacketPipe
* next
) {
327 (*pipe
)->AppendToPipe(scoped_ptr
<PacketPipe
>(next
).Pass());
// Assembles a pipeline from the stages below, in order, using BuildPipe().
// Argument meanings, from the constructors in this file:
//   Buffer(bytes, megabits per second)
//   RandomDrop(drop fraction)
//   ConstantDelay(seconds)
//   RandomSortedDelay(random delay, extra delay, seconds between extra delays)
334 scoped_ptr
<PacketPipe
> WifiNetwork() {
335 // This represents the buffer on the sender.
336 scoped_ptr
<PacketPipe
> pipe
;
337 BuildPipe(&pipe
, new Buffer(256 << 10, 20));
338 BuildPipe(&pipe
, new RandomDrop(0.005));
339 // This represents the buffer on the router.
340 BuildPipe(&pipe
, new ConstantDelay(1E-3));
341 BuildPipe(&pipe
, new RandomSortedDelay(1E-3, 20E-3, 3));
342 BuildPipe(&pipe
, new Buffer(256 << 10, 20));
343 BuildPipe(&pipe
, new ConstantDelay(1E-3));
344 BuildPipe(&pipe
, new RandomSortedDelay(1E-3, 20E-3, 3));
345 BuildPipe(&pipe
, new RandomDrop(0.005));
346 // This represents the buffer on the receiving device.
347 BuildPipe(&pipe
, new Buffer(256 << 10, 20));
// Assembles a pipeline with smaller buffers, a lower rate and a higher drop
// fraction than WifiNetwork(). Argument meanings, from the constructors in
// this file: Buffer(bytes, megabits per second), RandomDrop(drop fraction),
// ConstantDelay(seconds), RandomUnsortedDelay(max random delay in seconds).
351 scoped_ptr
<PacketPipe
> BadNetwork() {
352 scoped_ptr
<PacketPipe
> pipe
;
353 // This represents the buffer on the sender.
354 BuildPipe(&pipe
, new Buffer(64 << 10, 5)); // 64 kb buf, 5mbit/s
355 BuildPipe(&pipe
, new RandomDrop(0.05)); // 5% packet drop
356 BuildPipe(&pipe
, new RandomSortedDelay(2E-3, 20E-3, 1));
357 // This represents the buffer on the router.
358 BuildPipe(&pipe
, new Buffer(64 << 10, 5)); // 64 kb buf, 5mbit/s
359 BuildPipe(&pipe
, new ConstantDelay(1E-3));
360 // Random 40ms every other second
// NOTE(review): the disabled line below passes 40E-1, i.e. 4 seconds, as the
// average outage time; 40ms would be 40E-3. Fix before re-enabling.
361 // BuildPipe(&pipe, new NetworkGlitchPipe(2, 40E-1));
362 BuildPipe(&pipe
, new RandomUnsortedDelay(5E-3));
363 // This represents the buffer on the receiving device.
364 BuildPipe(&pipe
, new Buffer(64 << 10, 5)); // 64 kb buf, 5mbit/s
// Assembles the harshest of the three pipelines in this file: 4 << 10 byte
// buffers, a 0.1 drop fraction applied twice, and a NetworkGlitchPipe.
// Argument meanings, from the constructors in this file:
// Buffer(bytes, megabits per second), RandomDrop(drop fraction),
// NetworkGlitchPipe(average work time, average outage time).
369 scoped_ptr
<PacketPipe
> EvilNetwork() {
370 // This represents the buffer on the sender.
371 scoped_ptr
<PacketPipe
> pipe
;
// NOTE(review): this buffer is created with a rate of 5 while the two later
// buffers in this function use 2; confirm that 5 is intended here.
372 BuildPipe(&pipe
, new Buffer(4 << 10, 5)); // 4 kb buf, 5mbit/s
373 // This represents the buffer on the router.
374 BuildPipe(&pipe
, new RandomDrop(0.1)); // 10% packet drop
375 BuildPipe(&pipe
, new RandomSortedDelay(20E-3, 60E-3, 1));
376 BuildPipe(&pipe
, new Buffer(4 << 10, 2)); // 4 kb buf, 2mbit/s
377 BuildPipe(&pipe
, new RandomDrop(0.1)); // 10% packet drop
378 BuildPipe(&pipe
, new ConstantDelay(1E-3));
379 BuildPipe(&pipe
, new NetworkGlitchPipe(2.0, 0.3));
380 BuildPipe(&pipe
, new RandomUnsortedDelay(20E-3));
381 // This represents the buffer on the receiving device.
382 BuildPipe(&pipe
, new Buffer(4 << 10, 2)); // 4 kb buf, 2mbit/s
// UDPProxy implementation. Owns a thread (proxy_thread_) and a UDP socket
// bound to local_port_. Received packets are routed through one of two
// PacketPipe chains: from_dest_pipe_ for packets whose sender equals
// destination_, to_dest_pipe_ for the others. Each chain ends in a
// PacketSender that calls back into Send().
386 class UDPProxyImpl
: public UDPProxy
{
// Stores the endpoints and pipes, starts proxy_thread_ with an IO message
// loop and posts Start() to it. The destination is treated as mutable when
// the supplied |destination| has an empty address.
388 UDPProxyImpl(const net::IPEndPoint
& local_port
,
389 const net::IPEndPoint
& destination
,
390 scoped_ptr
<PacketPipe
> to_dest_pipe
,
391 scoped_ptr
<PacketPipe
> from_dest_pipe
,
392 net::NetLog
* net_log
)
393 : local_port_(local_port
),
394 destination_(destination
),
395 destination_is_mutable_(destination
.address().empty()),
396 proxy_thread_("media::cast::test::UdpProxy Thread"),
397 to_dest_pipe_(to_dest_pipe
.Pass()),
398 from_dest_pipe_(from_dest_pipe
.Pass()),
400 weak_factory_(this) {
401 proxy_thread_
.StartWithOptions(
402 base::Thread::Options(base::MessageLoop::TYPE_IO
, 0));
// Start() receives a pointer to this event and signals it when it is done.
403 base::WaitableEvent
start_event(false, false);
404 proxy_thread_
.message_loop_proxy()->PostTask(
406 base::Bind(&UDPProxyImpl::Start
,
407 base::Unretained(this),
408 base::Unretained(&start_event
),
// Posts Stop() to the proxy thread, then stops the thread.
413 virtual ~UDPProxyImpl() {
414 base::WaitableEvent
stop_event(false, false);
415 proxy_thread_
.message_loop_proxy()->PostTask(
417 base::Bind(&UDPProxyImpl::Stop
,
418 base::Unretained(this),
419 base::Unretained(&stop_event
)));
421 proxy_thread_
.Stop();
// Writes |packet| to the socket, addressed to |destination|. Failures are
// logged and otherwise ignored.
424 void Send(scoped_ptr
<transport::Packet
> packet
,
425 const net::IPEndPoint
& destination
) {
427 LOG(ERROR
) << "Cannot write packet right now: blocked";
431 VLOG(1) << "Sending packet, len = " << packet
->size();
432 // We ignore all problems, callbacks and errors.
433 // If it didn't work we just drop the packet and call it a day.
// The IOBuffer wraps the packet's own storage starting at its first byte.
434 scoped_refptr
<net::IOBuffer
> buf
=
435 new net::WrappedIOBuffer(reinterpret_cast<char*>(&packet
->front()));
436 size_t buf_size
= packet
->size();
// An empty destination address is reported as ERR_INVALID_ARGUMENT without
// calling SendTo().
438 if (destination
.address().empty()) {
439 VLOG(1) << "Destination has not been set yet.";
440 result
= net::ERR_INVALID_ARGUMENT
;
442 VLOG(1) << "Destination:" << destination
.ToString();
443 result
= socket_
->SendTo(buf
,
444 static_cast<int>(buf_size
),
// |packet| is moved into the completion callback; presumably this keeps the
// wrapped storage alive until the write finishes (confirm against
// net::WrappedIOBuffer's ownership rules).
446 base::Bind(&UDPProxyImpl::AllowWrite
,
447 weak_factory_
.GetWeakPtr(),
449 base::Passed(&packet
)));
451 if (result
== net::ERR_IO_PENDING
) {
453 } else if (result
< 0) {
454 LOG(ERROR
) << "Failed to write packet.";
// Runs on the proxy thread: creates the socket, terminates both pipe chains
// with a PacketSender, initializes the chains, binds the socket to
// local_port_ and finally signals |start_event|.
459 void Start(base::WaitableEvent
* start_event
,
460 net::NetLog
* net_log
) {
461 socket_
.reset(new net::UDPSocket(net::DatagramSocket::DEFAULT_BIND
,
462 net::RandIntCallback(),
464 net::NetLog::Source()));
// The senders hold pointers to the destination_ and return_address_ members,
// so later changes to those members are seen by the senders.
465 BuildPipe(&to_dest_pipe_
, new PacketSender(this, &destination_
));
466 BuildPipe(&from_dest_pipe_
, new PacketSender(this, &return_address_
));
467 to_dest_pipe_
->InitOnIOThread(base::MessageLoopProxy::current(),
469 from_dest_pipe_
->InitOnIOThread(base::MessageLoopProxy::current(),
472 VLOG(0) << "From:" << local_port_
.ToString();
473 if (!destination_is_mutable_
)
474 VLOG(0) << "To:" << destination_
.ToString();
// A negative result from Bind() aborts the process.
476 CHECK_GE(socket_
->Bind(local_port_
), 0);
478 start_event
->Signal();
// Runs on the proxy thread: destroys both pipe chains and signals
// |stop_event|.
482 void Stop(base::WaitableEvent
* stop_event
) {
483 to_dest_pipe_
.reset(NULL
);
484 from_dest_pipe_
.reset(NULL
);
486 stop_event
->Signal();
// Routes one received datagram of |len| bytes, held in packet_, into the
// appropriate pipe based on recv_address_.
489 void ProcessPacket(scoped_refptr
<net::IOBuffer
> recv_buf
, int len
) {
490 DCHECK_NE(len
, net::ERR_IO_PENDING
);
491 VLOG(1) << "Got packet, len = " << len
;
493 LOG(WARNING
) << "Socket read error: " << len
;
// Shrink the receive packet to the number of bytes actually read.
496 packet_
->resize(len
);
// With a mutable destination, a sender that is neither the current return
// address nor the current destination becomes the new destination, but only
// while set_destination_next_ is true.
497 if (destination_is_mutable_
&& set_destination_next_
&&
498 !(recv_address_
== return_address_
) &&
499 !(recv_address_
== destination_
)) {
500 destination_
= recv_address_
;
// Packets from the destination travel through from_dest_pipe_.
502 if (recv_address_
== destination_
) {
503 set_destination_next_
= false;
504 from_dest_pipe_
->Send(packet_
.Pass());
// The sender is remembered as the return address and the packet travels
// through to_dest_pipe_.
506 set_destination_next_
= true;
507 VLOG(1) << "Return address = " << recv_address_
.ToString();
508 return_address_
= recv_address_
;
509 to_dest_pipe_
->Send(packet_
.Pass());
// Completion callback for an asynchronous RecvFrom(): processes the packet.
513 void ReadCallback(scoped_refptr
<net::IOBuffer
> recv_buf
, int len
) {
514 ProcessPacket(recv_buf
, len
);
// Allocates a fresh kMaxPacketSize packet, wraps its storage in an IOBuffer
// and starts a RecvFrom() on the socket.
520 packet_
.reset(new transport::Packet(kMaxPacketSize
));
521 scoped_refptr
<net::IOBuffer
> recv_buf
=
522 new net::WrappedIOBuffer(reinterpret_cast<char*>(&packet_
->front()));
523 int len
= socket_
->RecvFrom(
527 base::Bind(&UDPProxyImpl::ReadCallback
,
528 base::Unretained(this),
530 if (len
== net::ERR_IO_PENDING
)
532 ProcessPacket(recv_buf
, len
);
// Completion callback bound in Send() for socket_->SendTo().
536 void AllowWrite(scoped_refptr
<net::IOBuffer
> buf
,
537 scoped_ptr
<transport::Packet
> packet
,
// Endpoint the socket is bound to.
544 net::IPEndPoint local_port_
;
// Current destination; may be replaced in ProcessPacket() when
// destination_is_mutable_ is true.
546 net::IPEndPoint destination_
;
547 bool destination_is_mutable_
;
// Address of the most recent sender that was not the destination.
549 net::IPEndPoint return_address_
;
550 bool set_destination_next_
;
552 base::DefaultTickClock tick_clock_
;
553 base::Thread proxy_thread_
;
554 scoped_ptr
<net::UDPSocket
> socket_
;
555 scoped_ptr
<PacketPipe
> to_dest_pipe_
;
556 scoped_ptr
<PacketPipe
> from_dest_pipe_
;
// Sender address of the datagram being processed (compared against
// destination_ and return_address_ in ProcessPacket()).
559 net::IPEndPoint recv_address_
;
// Packet that the pending or most recent read writes into.
560 scoped_ptr
<transport::Packet
> packet_
;
565 base::WeakPtrFactory
<UDPProxyImpl
> weak_factory_
;
// Hands |packet| to the owning proxy, addressed to the endpoint that
// destination_ currently points at. The endpoint is read at call time.
568 void PacketSender::Send(scoped_ptr
<transport::Packet
> packet
) {
569 udp_proxy_
->Send(packet
.Pass(), *destination_
);
572 scoped_ptr
<UDPProxy
> UDPProxy::Create(
573 const net::IPEndPoint
& local_port
,
574 const net::IPEndPoint
& destination
,
575 scoped_ptr
<PacketPipe
> to_dest_pipe
,
576 scoped_ptr
<PacketPipe
> from_dest_pipe
,
577 net::NetLog
* net_log
) {
578 scoped_ptr
<UDPProxy
> ret(new UDPProxyImpl(local_port
,
581 from_dest_pipe
.Pass(),