1 // Copyright 2014 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
5 #include "media/cast/test/utility/udp_proxy.h"
7 #include "base/logging.h"
8 #include "base/memory/linked_ptr.h"
9 #include "base/rand_util.h"
10 #include "base/synchronization/waitable_event.h"
11 #include "base/threading/thread.h"
12 #include "net/base/io_buffer.h"
13 #include "net/base/net_errors.h"
14 #include "net/udp/udp_socket.h"
// Size used when allocating the transport::Packet that receives incoming
// socket data in UDPProxyImpl.
20 const size_t kMaxPacketSize
= 65536;
// Constructor and destructor; both bodies are empty.
22 PacketPipe::PacketPipe() {}
23 PacketPipe::~PacketPipe() {}
// Records |task_runner| in task_runner_ and forwards the same call to the
// downstream pipe held in pipe_.
24 void PacketPipe::InitOnIOThread(
25 const scoped_refptr
<base::SingleThreadTaskRunner
>& task_runner
) {
26 task_runner_
= task_runner
;
28 pipe_
->InitOnIOThread(task_runner
);
// Takes ownership of |pipe| and hands it on to pipe_->AppendToPipe().
31 void PacketPipe::AppendToPipe(scoped_ptr
<PacketPipe
> pipe
) {
33 pipe_
->AppendToPipe(pipe
.Pass());
39 // Roughly emulates a buffer inside a device.
40 // If the buffer is full, packets are dropped.
41 // Packets are output at a maximum bandwidth.
// NOTE(review): the pacing math below divides megabits by
// max_megabits_per_second_, so the constructor's second argument is a rate
// in megabits per second. Callers in this file pass values such as 5000000
// annotated "5mbit/s" -- confirm the intended unit.
42 class Buffer
: public PacketPipe
{
// |buffer_size| caps the sum of packet->size() over all queued packets;
// |max_megabits_per_second| sets the rate at which packets are released.
44 Buffer(size_t buffer_size
, double max_megabits_per_second
)
46 max_buffer_size_(buffer_size
),
47 max_megabits_per_second_(max_megabits_per_second
),
48 weak_factory_(this) {}
// Queues |packet| when its size still fits under max_buffer_size_. The
// size() == 1 test below detects the transition from empty to non-empty.
50 virtual void Send(scoped_ptr
<transport::Packet
> packet
) OVERRIDE
{
51 if (packet
->size() + buffer_size_
<= max_buffer_size_
) {
52 buffer_size_
+= packet
->size();
53 buffer_
.push_back(linked_ptr
<transport::Packet
>(packet
.release()));
54 if (buffer_
.size() == 1) {
// Release delay for the front packet: its size in megabits divided by the
// configured rate, converted to whole microseconds.
62 double megabits
= buffer_
.front()->size() * 8 / 1000000.0;
63 double seconds
= megabits
/ max_megabits_per_second_
;
64 int64 microseconds
= static_cast<int64
>(seconds
* 1E6
);
// The callback is bound through a weak pointer obtained from weak_factory_.
65 task_runner_
->PostDelayedTask(
67 base::Bind(&Buffer::ProcessBuffer
, weak_factory_
.GetWeakPtr()),
68 base::TimeDelta::FromMicroseconds(microseconds
));
// Delayed-task target: takes the front packet, subtracts its size from
// buffer_size_ and forwards it to pipe_. The buffer must not be empty.
71 void ProcessBuffer() {
72 CHECK(!buffer_
.empty());
73 scoped_ptr
<transport::Packet
> packet(buffer_
.front().release());
74 buffer_size_
-= packet
->size();
76 pipe_
->Send(packet
.Pass());
77 if (!buffer_
.empty()) {
// Queue of packets waiting to be released, oldest at the front.
82 std::deque
<linked_ptr
<transport::Packet
> > buffer_
;
// Upper bound on the summed size of queued packets.
84 size_t max_buffer_size_
;
85 double max_megabits_per_second_
; // megabits per second
86 base::WeakPtrFactory
<Buffer
> weak_factory_
;
// Factory: returns a newly allocated Buffer(buffer_size, bandwidth) as a
// scoped_ptr<PacketPipe>. |bandwidth| is forwarded as Buffer's
// max_megabits_per_second argument.
89 scoped_ptr
<PacketPipe
> NewBuffer(size_t buffer_size
, double bandwidth
) {
90 return scoped_ptr
<PacketPipe
>(new Buffer(buffer_size
, bandwidth
)).Pass();
// Pipe stage that forwards a packet only when a fresh random draw is at
// least drop_fraction_.
// NOTE(review): presumably base::RandDouble() is uniform in [0, 1), which
// would make drop_fraction_ the drop probability -- confirm.
93 class RandomDrop
: public PacketPipe
{
95 RandomDrop(double drop_fraction
) : drop_fraction_(drop_fraction
) {
// Forwards |packet| to pipe_ when base::RandDouble() >= drop_fraction_.
98 virtual void Send(scoped_ptr
<transport::Packet
> packet
) OVERRIDE
{
99 if (base::RandDouble() >= drop_fraction_
) {
100 pipe_
->Send(packet
.Pass());
// Threshold compared against the random draw in Send().
105 double drop_fraction_
;
// Factory: returns a newly allocated RandomDrop(drop_fraction) as a
// scoped_ptr<PacketPipe>.
108 scoped_ptr
<PacketPipe
> NewRandomDrop(double drop_fraction
) {
109 return scoped_ptr
<PacketPipe
>(new RandomDrop(drop_fraction
)).Pass();
// Base class for stages that delay every packet individually. Subclasses
// supply the delay through GetDelay().
112 class SimpleDelayBase
: public PacketPipe
{
114 SimpleDelayBase() : weak_factory_(this) {}
115 virtual ~SimpleDelayBase() {}
// Asks GetDelay() for a delay in seconds and posts a task that calls
// SendInternal() with |packet| after that delay (converted to whole
// microseconds). The callback is bound through a weak pointer.
117 virtual void Send(scoped_ptr
<transport::Packet
> packet
) OVERRIDE
{
118 double seconds
= GetDelay();
119 task_runner_
->PostDelayedTask(
121 base::Bind(&SimpleDelayBase::SendInternal
,
122 weak_factory_
.GetWeakPtr(),
123 base::Passed(&packet
)),
124 base::TimeDelta::FromMicroseconds(static_cast<int64
>(seconds
* 1E6
)));
// Returns the delay, in seconds, to apply to the packet being sent.
127 virtual double GetDelay() = 0;
// Delayed-task target: forwards |packet| to pipe_.
130 virtual void SendInternal(scoped_ptr
<transport::Packet
> packet
) {
131 pipe_
->Send(packet
.Pass());
134 base::WeakPtrFactory
<SimpleDelayBase
> weak_factory_
;
// SimpleDelayBase whose GetDelay() always returns the same value.
137 class ConstantDelay
: public SimpleDelayBase
{
139 ConstantDelay(double delay_seconds
) : delay_seconds_(delay_seconds
) {}
// Returns the fixed delay given at construction.
140 virtual double GetDelay() OVERRIDE
{
141 return delay_seconds_
;
// Delay applied to every packet, in seconds.
145 double delay_seconds_
;
// Factory: returns a newly allocated ConstantDelay(delay_seconds) as a
// scoped_ptr<PacketPipe>.
148 scoped_ptr
<PacketPipe
> NewConstantDelay(double delay_seconds
) {
149 return scoped_ptr
<PacketPipe
>(new ConstantDelay(delay_seconds
)).Pass();
// SimpleDelayBase whose delay is drawn afresh for every packet. Because each
// packet is scheduled independently, a later packet can be scheduled ahead
// of an earlier one.
152 class RandomUnsortedDelay
: public SimpleDelayBase
{
154 RandomUnsortedDelay(double random_delay
) : random_delay_(random_delay
) {}
// Returns random_delay_ scaled by a fresh base::RandDouble() draw.
156 virtual double GetDelay() OVERRIDE
{
157 return random_delay_
* base::RandDouble();
// Scale factor for the random delay, in seconds.
161 double random_delay_
;
// Factory: returns a newly allocated RandomUnsortedDelay(random_delay) as a
// scoped_ptr<PacketPipe>.
164 scoped_ptr
<PacketPipe
> NewRandomUnsortedDelay(double random_delay
) {
165 return scoped_ptr
<PacketPipe
>(new RandomUnsortedDelay(random_delay
)).Pass();
// Pipe stage that queues packets in arrival order and releases them from the
// front of the queue after a random delay. It also periodically sets
// block_until_ to a time extra_delay_ seconds in the future.
169 class RandomSortedDelay
: public PacketPipe
{
171 RandomSortedDelay(double random_delay
,
173 double seconds_between_extra_delay
)
174 : random_delay_(random_delay
),
175 extra_delay_(extra_delay
),
176 seconds_between_extra_delay_(seconds_between_extra_delay
),
177 weak_factory_(this) {}
// Appends |packet| to the queue. The size() == 1 test detects the transition
// from empty to non-empty.
179 virtual void Send(scoped_ptr
<transport::Packet
> packet
) OVERRIDE
{
180 buffer_
.push_back(linked_ptr
<transport::Packet
>(packet
.release()));
181 if (buffer_
.size() == 1) {
// Runs the base-class initialization, then schedules the first extra delay.
185 virtual void InitOnIOThread(
186 const scoped_refptr
<base::SingleThreadTaskRunner
>& task_runner
) OVERRIDE
{
187 PacketPipe::InitOnIOThread(task_runner
);
188 // As we start the stream, assume that we are in a random
189 // place between two extra delays, thus multiplier = 1.0;
190 ScheduleExtraDelay(1.0);
// Posts CauseExtraDelay() to run after
// seconds_between_extra_delay_ * |mult| * base::RandDouble() seconds,
// bound through a weak pointer.
194 void ScheduleExtraDelay(double mult
) {
195 double seconds
= seconds_between_extra_delay_
* mult
* base::RandDouble();
196 int64 microseconds
= static_cast<int64
>(seconds
* 1E6
);
197 task_runner_
->PostDelayedTask(
199 base::Bind(&RandomSortedDelay::CauseExtraDelay
,
200 weak_factory_
.GetWeakPtr()),
201 base::TimeDelta::FromMicroseconds(microseconds
));
// Sets block_until_ to now + extra_delay_ seconds and schedules the next
// extra delay.
204 void CauseExtraDelay() {
205 block_until_
= base::TimeTicks::Now() +
206 base::TimeDelta::FromMicroseconds(
207 static_cast<int64
>(extra_delay_
* 1E6
));
208 // An extra delay just happened, wait up to seconds_between_extra_delay_*2
209 // before scheduling another one to make the average equal to
210 // seconds_between_extra_delay_.
211 ScheduleExtraDelay(2.0);
// Computes a random per-packet delay (delay_time) and the time remaining
// until block_until_ (block_time), then posts ProcessBuffer().
// NOTE(review): the branch below overwrites block_time with delay_time when
// block_time is the larger of the two. If packets are meant to be held until
// block_until_, the assignment looks reversed (delay_time = block_time).
// The delay argument actually passed to PostDelayedTask is not visible
// here -- confirm.
215 double seconds
= base::RandDouble() * random_delay_
;
216 base::TimeDelta block_time
= block_until_
- base::TimeTicks::Now();
217 base::TimeDelta delay_time
=
218 base::TimeDelta::FromMicroseconds(
219 static_cast<int64
>(seconds
* 1E6
));
220 if (block_time
> delay_time
) {
221 block_time
= delay_time
;
224 task_runner_
->PostDelayedTask(FROM_HERE
,
225 base::Bind(&RandomSortedDelay::ProcessBuffer
,
226 weak_factory_
.GetWeakPtr()),
// Delayed-task target: takes the front packet and forwards it to pipe_.
// The queue must not be empty.
230 void ProcessBuffer() {
231 CHECK(!buffer_
.empty());
232 scoped_ptr
<transport::Packet
> packet(buffer_
.front().release());
233 pipe_
->Send(packet
.Pass());
235 if (!buffer_
.empty()) {
// Time set by CauseExtraDelay(): now + extra_delay_ seconds.
240 base::TimeTicks block_until_
;
// Packets waiting to be released, oldest at the front.
241 std::deque
<linked_ptr
<transport::Packet
> > buffer_
;
// Scale factor, in seconds, for the random per-packet delay.
242 double random_delay_
;
// Scale factor, in seconds, for the random gap between extra delays.
244 double seconds_between_extra_delay_
;
245 base::WeakPtrFactory
<RandomSortedDelay
> weak_factory_
;
// Factory: returns a newly allocated
// RandomSortedDelay(random_delay, extra_delay, seconds_between_extra_delay)
// as a scoped_ptr<PacketPipe>.
248 scoped_ptr
<PacketPipe
> NewRandomSortedDelay(
251 double seconds_between_extra_delay
) {
252 return scoped_ptr
<PacketPipe
>(
253 new RandomSortedDelay(
254 random_delay
, extra_delay
, seconds_between_extra_delay
))
// Pipe stage that alternates between a working state and an outage state
// (tracked by works_), flipping after random intervals.
258 class NetworkGlitchPipe
: public PacketPipe
{
// The maxima are twice the requested averages.
// NOTE(review): presumably so that a uniform draw in [0, max) averages to
// the requested value -- confirm base::RandDouble()'s range.
260 NetworkGlitchPipe(double average_work_time
, double average_outage_time
)
262 max_work_time_(average_work_time
* 2),
263 max_outage_time_(average_outage_time
* 2),
264 weak_factory_(this) {}
// Runs the base-class initialization on the IO thread.
266 virtual void InitOnIOThread(
267 const scoped_refptr
<base::SingleThreadTaskRunner
>& task_runner
) OVERRIDE
{
268 PacketPipe::InitOnIOThread(task_runner
);
// Forwards |packet| to pipe_.
272 virtual void Send(scoped_ptr
<transport::Packet
> packet
) OVERRIDE
{
274 pipe_
->Send(packet
.Pass());
// Picks how long the current state lasts: a random fraction of
// max_work_time_ while working, of max_outage_time_ otherwise. It then posts
// Flip() to run after that interval, bound through a weak pointer.
281 double seconds
= base::RandDouble() *
282 (works_
? max_work_time_
: max_outage_time_
);
283 int64 microseconds
= static_cast<int64
>(seconds
* 1E6
);
284 task_runner_
->PostDelayedTask(
286 base::Bind(&NetworkGlitchPipe::Flip
, weak_factory_
.GetWeakPtr()),
287 base::TimeDelta::FromMicroseconds(microseconds
));
// Upper bounds, in seconds, on the duration of each state.
291 double max_work_time_
;
292 double max_outage_time_
;
293 base::WeakPtrFactory
<NetworkGlitchPipe
> weak_factory_
;
// Factory: returns a newly allocated
// NetworkGlitchPipe(average_work_time, average_outage_time) as a
// scoped_ptr<PacketPipe>.
296 scoped_ptr
<PacketPipe
> NewNetworkGlitchPipe(double average_work_time
,
297 double average_outage_time
) {
298 return scoped_ptr
<PacketPipe
>(
299 new NetworkGlitchPipe(average_work_time
, average_outage_time
))
// Pipe stage that writes packets to a UDP socket, addressed to
// |*destination_|.
303 class PacketSender
: public PacketPipe
{
// Stores both pointers as given. |destination| is read again on every
// Send(), so later changes to the pointed-to endpoint are picked up.
305 PacketSender(net::UDPSocket
* udp_socket
,
306 const net::IPEndPoint
* destination
) :
308 udp_socket_(udp_socket
),
309 destination_(destination
),
310 weak_factory_(this) {
// Attempts to send |packet| to *destination_ through udp_socket_.
312 virtual void Send(scoped_ptr
<transport::Packet
> packet
) OVERRIDE
{
314 LOG(ERROR
) << "Cannot write packet right now: blocked";
318 VLOG(1) << "Sending packet, len = " << packet
->size();
319 // We ignore all problems, callbacks and errors.
320 // If it didn't work we just drop the packet and call it a day.
// The IO buffer wraps the address of the packet's first element. The packet
// itself is handed to the completion callback below via base::Passed.
321 scoped_refptr
<net::IOBuffer
> buf
=
322 new net::WrappedIOBuffer(reinterpret_cast<char*>(&packet
->front()));
323 size_t buf_size
= packet
->size();
// An empty destination address is reported as ERR_INVALID_ARGUMENT without
// touching the socket.
325 if (destination_
->address().empty()) {
326 VLOG(1) << "Destination has not been set yet.";
327 result
= net::ERR_INVALID_ARGUMENT
;
329 VLOG(1) << "Destination:" << destination_
->ToString();
330 result
= udp_socket_
->SendTo(buf
,
331 static_cast<int>(buf_size
),
333 base::Bind(&PacketSender::AllowWrite
,
334 weak_factory_
.GetWeakPtr(),
336 base::Passed(&packet
)));
// ERR_IO_PENDING and other negative results are handled separately; the
// latter are only logged.
338 if (result
== net::ERR_IO_PENDING
) {
340 } else if (result
< 0) {
341 LOG(ERROR
) << "Failed to write packet.";
344 virtual void AppendToPipe(scoped_ptr
<PacketPipe
> pipe
) OVERRIDE
{
// Callback bound as the SendTo() completion handler in Send().
349 void AllowWrite(scoped_refptr
<net::IOBuffer
> buf
,
350 scoped_ptr
<transport::Packet
> packet
,
// Socket used for SendTo(); stored as a raw pointer.
356 net::UDPSocket
* udp_socket_
;
357 const net::IPEndPoint
* destination_
; // not owned
358 base::WeakPtrFactory
<PacketSender
> weak_factory_
;
// Appends |next| to the chain rooted at |*pipe|, wrapping the raw pointer in
// a scoped_ptr so the chain takes ownership.
// NOTE(review): callers in this file pass an empty |*pipe| on the first
// call; the handling of that case is not visible here -- confirm.
362 void BuildPipe(scoped_ptr
<PacketPipe
>* pipe
, PacketPipe
* next
) {
364 (*pipe
)->AppendToPipe(scoped_ptr
<PacketPipe
>(next
).Pass());
// Builds a pipe chain from Buffer, RandomDrop, ConstantDelay and
// RandomSortedDelay stages, appended in the order listed below.
// NOTE(review): Buffer's second constructor argument is named
// max_megabits_per_second and is used as megabits per second in its pacing
// math, yet 5000000 is passed here. That reads like bits per second --
// confirm the intended unit.
371 scoped_ptr
<PacketPipe
> WifiNetwork() {
372 // This represents the buffer on the sender.
373 scoped_ptr
<PacketPipe
> pipe
;
374 BuildPipe(&pipe
, new Buffer(256 << 10, 5000000));
375 BuildPipe(&pipe
, new RandomDrop(0.005));
376 // This represents the buffer on the router.
377 BuildPipe(&pipe
, new ConstantDelay(1E-3));
378 BuildPipe(&pipe
, new RandomSortedDelay(1E-3, 20E-3, 3));
379 BuildPipe(&pipe
, new Buffer(256 << 10, 5000000));
380 BuildPipe(&pipe
, new ConstantDelay(1E-3));
381 BuildPipe(&pipe
, new RandomSortedDelay(1E-3, 20E-3, 3));
382 BuildPipe(&pipe
, new RandomDrop(0.005));
383 // This represents the buffer on the receiving device.
384 BuildPipe(&pipe
, new Buffer(256 << 10, 5000000));
// Builds a pipe chain with smaller buffers and a higher drop rate than
// WifiNetwork(), appended in the order listed below.
// NOTE(review): Buffer's second constructor argument is named
// max_megabits_per_second and is used as megabits per second in its pacing
// math, yet values such as 5000000 ("5mbit/s") are passed here. That reads
// like bits per second -- confirm the intended unit.
388 scoped_ptr
<PacketPipe
> BadNetwork() {
389 scoped_ptr
<PacketPipe
> pipe
;
390 // This represents the buffer on the sender.
391 BuildPipe(&pipe
, new Buffer(64 << 10, 5000000)); // 64 kb buf, 5mbit/s
392 BuildPipe(&pipe
, new RandomDrop(0.05)); // 5% packet drop
393 BuildPipe(&pipe
, new RandomSortedDelay(2E-3, 20E-3, 1));
394 // This represents the buffer on the router.
395 BuildPipe(&pipe
, new Buffer(64 << 10, 2000000)); // 64 kb buf, 2mbit/s
396 BuildPipe(&pipe
, new ConstantDelay(1E-3));
// NOTE(review): in the disabled line below, 40E-1 is 4 seconds, whereas the
// comment says 40ms (which would be 40E-3) -- confirm before re-enabling.
397 // Random 40ms every other second
398 // BuildPipe(&pipe, new NetworkGlitchPipe(2, 40E-1));
399 BuildPipe(&pipe
, new RandomUnsortedDelay(5E-3));
400 // This represents the buffer on the receiving device.
401 BuildPipe(&pipe
, new Buffer(64 << 10, 4000000)); // 64 kb buf, 4mbit/s
// Builds the harshest pipe chain in this file: 4 kb buffers, 10% drops,
// longer delays and a NetworkGlitchPipe, appended in the order listed below.
// NOTE(review): Buffer's second constructor argument is named
// max_megabits_per_second and is used as megabits per second in its pacing
// math, yet values such as 2000000 ("2mbit/s") are passed here. That reads
// like bits per second -- confirm the intended unit.
406 scoped_ptr
<PacketPipe
> EvilNetwork() {
407 // This represents the buffer on the sender.
408 scoped_ptr
<PacketPipe
> pipe
;
409 BuildPipe(&pipe
, new Buffer(4 << 10, 2000000));
410 // This represents the buffer on the router.
411 BuildPipe(&pipe
, new RandomDrop(0.1)); // 10% packet drop
412 BuildPipe(&pipe
, new RandomSortedDelay(20E-3, 60E-3, 1));
413 BuildPipe(&pipe
, new Buffer(4 << 10, 1000000)); // 4 kb buf, 1mbit/s
414 BuildPipe(&pipe
, new RandomDrop(0.1)); // 10% packet drop
415 BuildPipe(&pipe
, new ConstantDelay(1E-3));
416 BuildPipe(&pipe
, new NetworkGlitchPipe(2.0, 0.3));
417 BuildPipe(&pipe
, new RandomUnsortedDelay(20E-3));
418 // This represents the buffer on the receiving device.
419 BuildPipe(&pipe
, new Buffer(4 << 10, 2000000)); // 4 kb buf, 2mbit/s
// UDPProxy implementation. It owns a dedicated thread and a UDP socket, and
// routes each received packet through one of two pipes depending on the
// sender's address.
423 class UDPProxyImpl
: public UDPProxy
{
// Stores the endpoints and pipes, starts the proxy thread with an IO message
// loop and posts Start() to it. The task is bound with unretained pointers
// to this object and to |start_event|.
425 UDPProxyImpl(const net::IPEndPoint
& local_port
,
426 const net::IPEndPoint
& destination
,
427 scoped_ptr
<PacketPipe
> to_dest_pipe
,
428 scoped_ptr
<PacketPipe
> from_dest_pipe
,
429 net::NetLog
* net_log
) :
430 local_port_(local_port
),
431 destination_(destination
),
432 proxy_thread_("media::cast::test::UdpProxy Thread"),
433 to_dest_pipe_(to_dest_pipe
.Pass()),
434 from_dest_pipe_(from_dest_pipe
.Pass()) {
435 proxy_thread_
.StartWithOptions(
436 base::Thread::Options(base::MessageLoop::TYPE_IO
, 0));
437 base::WaitableEvent
start_event(false, false);
438 proxy_thread_
.message_loop_proxy()->PostTask(
440 base::Bind(&UDPProxyImpl::Start
,
441 base::Unretained(this),
442 base::Unretained(&start_event
),
// Posts Stop() to the proxy thread, then stops the thread.
447 virtual ~UDPProxyImpl() {
448 base::WaitableEvent
stop_event(false, false);
449 proxy_thread_
.message_loop_proxy()->PostTask(
451 base::Bind(&UDPProxyImpl::Stop
,
452 base::Unretained(this),
453 base::Unretained(&stop_event
)));
455 proxy_thread_
.Stop();
// Posted to the proxy thread by the constructor. Creates the socket,
// terminates each pipe with a PacketSender, initializes both pipes with the
// current message loop proxy, binds the socket and signals |start_event|.
459 void Start(base::WaitableEvent
* start_event
,
460 net::NetLog
* net_log
) {
461 socket_
.reset(new net::UDPSocket(net::DatagramSocket::DEFAULT_BIND
,
462 net::RandIntCallback(),
464 net::NetLog::Source()));
// Packets heading to the destination are sent to destination_; packets
// coming back are sent to return_address_, which ProcessPacket() updates.
465 BuildPipe(&to_dest_pipe_
, new PacketSender(socket_
.get(), &destination_
));
466 BuildPipe(&from_dest_pipe_
,
467 new PacketSender(socket_
.get(), &return_address_
));
468 to_dest_pipe_
->InitOnIOThread(base::MessageLoopProxy::current());
469 from_dest_pipe_
->InitOnIOThread(base::MessageLoopProxy::current());
471 VLOG(0) << "From:" << local_port_
.ToString();
472 VLOG(0) << "To:" << destination_
.ToString();
// A failed bind (negative result) is fatal.
474 CHECK_GE(socket_
->Bind(local_port_
), 0);
476 start_event
->Signal();
// Posted to the proxy thread by the destructor. Destroys both pipes and
// signals |stop_event|.
480 void Stop(base::WaitableEvent
* stop_event
) {
481 to_dest_pipe_
.reset(NULL
);
482 from_dest_pipe_
.reset(NULL
);
484 stop_event
->Signal();
// Handles a completed read of |len| bytes into packet_. A packet received
// from destination_ goes to from_dest_pipe_. Any other sender is recorded as
// the return address and its packet goes to to_dest_pipe_.
487 void ProcessPacket(scoped_refptr
<net::IOBuffer
> recv_buf
, int len
) {
488 DCHECK_NE(len
, net::ERR_IO_PENDING
);
489 VLOG(1) << "Got packet, len = " << len
;
491 LOG(WARNING
) << "Socket read error: " << len
;
// Shrink the packet to the number of bytes actually received.
494 packet_
->resize(len
);
495 if (recv_address_
== destination_
) {
496 from_dest_pipe_
->Send(packet_
.Pass());
498 VLOG(1) << "Return address = " << recv_address_
.ToString();
499 return_address_
= recv_address_
;
500 to_dest_pipe_
->Send(packet_
.Pass());
// Completion callback bound to the RecvFrom() call below; forwards to
// ProcessPacket().
504 void ReadCallback(scoped_refptr
<net::IOBuffer
> recv_buf
, int len
) {
505 ProcessPacket(recv_buf
, len
);
// Allocates a kMaxPacketSize packet, wraps the address of its first element
// in an IO buffer and issues a RecvFrom() on the socket. The ERR_IO_PENDING
// result is checked before the ProcessPacket() call below.
511 packet_
.reset(new transport::Packet(kMaxPacketSize
));
512 scoped_refptr
<net::IOBuffer
> recv_buf
=
513 new net::WrappedIOBuffer(reinterpret_cast<char*>(&packet_
->front()));
514 int len
= socket_
->RecvFrom(
518 base::Bind(&UDPProxyImpl::ReadCallback
,
519 base::Unretained(this),
521 if (len
== net::ERR_IO_PENDING
)
523 ProcessPacket(recv_buf
, len
);
// Address the socket is bound to.
528 net::IPEndPoint local_port_
;
// Remote endpoint; packets from this address take from_dest_pipe_.
529 net::IPEndPoint destination_
;
// Compared against destination_ in ProcessPacket().
530 net::IPEndPoint recv_address_
;
// Most recent non-destination sender; target of from_dest_pipe_'s sender.
531 net::IPEndPoint return_address_
;
532 base::Thread proxy_thread_
;
533 scoped_ptr
<net::UDPSocket
> socket_
;
534 scoped_ptr
<PacketPipe
> to_dest_pipe_
;
535 scoped_ptr
<PacketPipe
> from_dest_pipe_
;
// Packet currently being received into.
536 scoped_ptr
<transport::Packet
> packet_
;
539 scoped_ptr
<UDPProxy
> UDPProxy::Create(
540 const net::IPEndPoint
& local_port
,
541 const net::IPEndPoint
& destination
,
542 scoped_ptr
<PacketPipe
> to_dest_pipe
,
543 scoped_ptr
<PacketPipe
> from_dest_pipe
,
544 net::NetLog
* net_log
) {
545 scoped_ptr
<UDPProxy
> ret(new UDPProxyImpl(local_port
,
548 from_dest_pipe
.Pass(),