Revert 273599 "Cast: Synthetic benchmark tool."
[chromium-blink-merge.git] / media / cast / test / utility / udp_proxy.cc
blob05c3b93891aaf93a46fd52e0d490949f12ccf172
1 // Copyright 2014 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
5 #include "media/cast/test/utility/udp_proxy.h"
7 #include "base/logging.h"
8 #include "base/memory/linked_ptr.h"
9 #include "base/rand_util.h"
10 #include "base/synchronization/waitable_event.h"
11 #include "base/threading/thread.h"
12 #include "base/time/default_tick_clock.h"
13 #include "net/base/io_buffer.h"
14 #include "net/base/net_errors.h"
15 #include "net/udp/udp_socket.h"
17 namespace media {
18 namespace cast {
19 namespace test {
// Size, in bytes, of the receive buffer allocated for each incoming datagram.
const size_t kMaxPacketSize = 64 * 1024;
23 PacketPipe::PacketPipe() {}
24 PacketPipe::~PacketPipe() {}
25 void PacketPipe::InitOnIOThread(
26 const scoped_refptr<base::SingleThreadTaskRunner>& task_runner,
27 base::TickClock* clock) {
28 task_runner_ = task_runner;
29 clock_ = clock;
30 if (pipe_) {
31 pipe_->InitOnIOThread(task_runner, clock);
34 void PacketPipe::AppendToPipe(scoped_ptr<PacketPipe> pipe) {
35 if (pipe_) {
36 pipe_->AppendToPipe(pipe.Pass());
37 } else {
38 pipe_ = pipe.Pass();
42 // Roughly emulates a buffer inside a device.
43 // If the buffer is full, packets are dropped.
44 // Packets are output at a maximum bandwidth.
45 class Buffer : public PacketPipe {
46 public:
47 Buffer(size_t buffer_size, double max_megabits_per_second)
48 : buffer_size_(0),
49 max_buffer_size_(buffer_size),
50 max_megabits_per_second_(max_megabits_per_second),
51 weak_factory_(this) {}
53 virtual void Send(scoped_ptr<transport::Packet> packet) OVERRIDE {
54 if (packet->size() + buffer_size_ <= max_buffer_size_) {
55 buffer_size_ += packet->size();
56 buffer_.push_back(linked_ptr<transport::Packet>(packet.release()));
57 if (buffer_.size() == 1) {
58 Schedule();
63 private:
64 void Schedule() {
65 double megabits = buffer_.front()->size() * 8 / 1000000.0;
66 double seconds = megabits / max_megabits_per_second_;
67 int64 microseconds = static_cast<int64>(seconds * 1E6);
68 task_runner_->PostDelayedTask(
69 FROM_HERE,
70 base::Bind(&Buffer::ProcessBuffer, weak_factory_.GetWeakPtr()),
71 base::TimeDelta::FromMicroseconds(microseconds));
74 void ProcessBuffer() {
75 CHECK(!buffer_.empty());
76 scoped_ptr<transport::Packet> packet(buffer_.front().release());
77 buffer_size_ -= packet->size();
78 buffer_.pop_front();
79 pipe_->Send(packet.Pass());
80 if (!buffer_.empty()) {
81 Schedule();
85 std::deque<linked_ptr<transport::Packet> > buffer_;
86 size_t buffer_size_;
87 size_t max_buffer_size_;
88 double max_megabits_per_second_; // megabits per second
89 base::WeakPtrFactory<Buffer> weak_factory_;
92 scoped_ptr<PacketPipe> NewBuffer(size_t buffer_size, double bandwidth) {
93 return scoped_ptr<PacketPipe>(new Buffer(buffer_size, bandwidth)).Pass();
96 class RandomDrop : public PacketPipe {
97 public:
98 RandomDrop(double drop_fraction) : drop_fraction_(drop_fraction) {
101 virtual void Send(scoped_ptr<transport::Packet> packet) OVERRIDE {
102 if (base::RandDouble() >= drop_fraction_) {
103 pipe_->Send(packet.Pass());
107 private:
108 double drop_fraction_;
111 scoped_ptr<PacketPipe> NewRandomDrop(double drop_fraction) {
112 return scoped_ptr<PacketPipe>(new RandomDrop(drop_fraction)).Pass();
115 class SimpleDelayBase : public PacketPipe {
116 public:
117 SimpleDelayBase() : weak_factory_(this) {}
118 virtual ~SimpleDelayBase() {}
120 virtual void Send(scoped_ptr<transport::Packet> packet) OVERRIDE {
121 double seconds = GetDelay();
122 task_runner_->PostDelayedTask(
123 FROM_HERE,
124 base::Bind(&SimpleDelayBase::SendInternal,
125 weak_factory_.GetWeakPtr(),
126 base::Passed(&packet)),
127 base::TimeDelta::FromMicroseconds(static_cast<int64>(seconds * 1E6)));
129 protected:
130 virtual double GetDelay() = 0;
132 private:
133 virtual void SendInternal(scoped_ptr<transport::Packet> packet) {
134 pipe_->Send(packet.Pass());
137 base::WeakPtrFactory<SimpleDelayBase> weak_factory_;
140 class ConstantDelay : public SimpleDelayBase {
141 public:
142 ConstantDelay(double delay_seconds) : delay_seconds_(delay_seconds) {}
143 virtual double GetDelay() OVERRIDE {
144 return delay_seconds_;
147 private:
148 double delay_seconds_;
151 scoped_ptr<PacketPipe> NewConstantDelay(double delay_seconds) {
152 return scoped_ptr<PacketPipe>(new ConstantDelay(delay_seconds)).Pass();
155 class RandomUnsortedDelay : public SimpleDelayBase {
156 public:
157 RandomUnsortedDelay(double random_delay) : random_delay_(random_delay) {}
159 virtual double GetDelay() OVERRIDE {
160 return random_delay_ * base::RandDouble();
163 private:
164 double random_delay_;
167 scoped_ptr<PacketPipe> NewRandomUnsortedDelay(double random_delay) {
168 return scoped_ptr<PacketPipe>(new RandomUnsortedDelay(random_delay)).Pass();
172 class RandomSortedDelay : public PacketPipe {
173 public:
174 RandomSortedDelay(double random_delay,
175 double extra_delay,
176 double seconds_between_extra_delay)
177 : random_delay_(random_delay),
178 extra_delay_(extra_delay),
179 seconds_between_extra_delay_(seconds_between_extra_delay),
180 weak_factory_(this) {}
182 virtual void Send(scoped_ptr<transport::Packet> packet) OVERRIDE {
183 buffer_.push_back(linked_ptr<transport::Packet>(packet.release()));
184 if (buffer_.size() == 1) {
185 Schedule();
188 virtual void InitOnIOThread(
189 const scoped_refptr<base::SingleThreadTaskRunner>& task_runner,
190 base::TickClock* clock) OVERRIDE {
191 PacketPipe::InitOnIOThread(task_runner, clock);
192 // As we start the stream, assume that we are in a random
193 // place between two extra delays, thus multiplier = 1.0;
194 ScheduleExtraDelay(1.0);
197 private:
198 void ScheduleExtraDelay(double mult) {
199 double seconds = seconds_between_extra_delay_ * mult * base::RandDouble();
200 int64 microseconds = static_cast<int64>(seconds * 1E6);
201 task_runner_->PostDelayedTask(
202 FROM_HERE,
203 base::Bind(&RandomSortedDelay::CauseExtraDelay,
204 weak_factory_.GetWeakPtr()),
205 base::TimeDelta::FromMicroseconds(microseconds));
208 void CauseExtraDelay() {
209 block_until_ = clock_->NowTicks() +
210 base::TimeDelta::FromMicroseconds(
211 static_cast<int64>(extra_delay_ * 1E6));
212 // An extra delay just happened, wait up to seconds_between_extra_delay_*2
213 // before scheduling another one to make the average equal to
214 // seconds_between_extra_delay_.
215 ScheduleExtraDelay(2.0);
218 void Schedule() {
219 double seconds = base::RandDouble() * random_delay_;
220 base::TimeDelta block_time = block_until_ - base::TimeTicks::Now();
221 base::TimeDelta delay_time =
222 base::TimeDelta::FromMicroseconds(
223 static_cast<int64>(seconds * 1E6));
224 if (block_time > delay_time) {
225 block_time = delay_time;
228 task_runner_->PostDelayedTask(FROM_HERE,
229 base::Bind(&RandomSortedDelay::ProcessBuffer,
230 weak_factory_.GetWeakPtr()),
231 delay_time);
234 void ProcessBuffer() {
235 CHECK(!buffer_.empty());
236 scoped_ptr<transport::Packet> packet(buffer_.front().release());
237 pipe_->Send(packet.Pass());
238 buffer_.pop_front();
239 if (!buffer_.empty()) {
240 Schedule();
244 base::TimeTicks block_until_;
245 std::deque<linked_ptr<transport::Packet> > buffer_;
246 double random_delay_;
247 double extra_delay_;
248 double seconds_between_extra_delay_;
249 base::WeakPtrFactory<RandomSortedDelay> weak_factory_;
252 scoped_ptr<PacketPipe> NewRandomSortedDelay(
253 double random_delay,
254 double extra_delay,
255 double seconds_between_extra_delay) {
256 return scoped_ptr<PacketPipe>(
257 new RandomSortedDelay(
258 random_delay, extra_delay, seconds_between_extra_delay))
259 .Pass();
262 class NetworkGlitchPipe : public PacketPipe {
263 public:
264 NetworkGlitchPipe(double average_work_time, double average_outage_time)
265 : works_(false),
266 max_work_time_(average_work_time * 2),
267 max_outage_time_(average_outage_time * 2),
268 weak_factory_(this) {}
270 virtual void InitOnIOThread(
271 const scoped_refptr<base::SingleThreadTaskRunner>& task_runner,
272 base::TickClock* clock) OVERRIDE {
273 PacketPipe::InitOnIOThread(task_runner, clock);
274 Flip();
277 virtual void Send(scoped_ptr<transport::Packet> packet) OVERRIDE {
278 if (works_) {
279 pipe_->Send(packet.Pass());
283 private:
284 void Flip() {
285 works_ = !works_;
286 double seconds = base::RandDouble() *
287 (works_ ? max_work_time_ : max_outage_time_);
288 int64 microseconds = static_cast<int64>(seconds * 1E6);
289 task_runner_->PostDelayedTask(
290 FROM_HERE,
291 base::Bind(&NetworkGlitchPipe::Flip, weak_factory_.GetWeakPtr()),
292 base::TimeDelta::FromMicroseconds(microseconds));
295 bool works_;
296 double max_work_time_;
297 double max_outage_time_;
298 base::WeakPtrFactory<NetworkGlitchPipe> weak_factory_;
301 scoped_ptr<PacketPipe> NewNetworkGlitchPipe(double average_work_time,
302 double average_outage_time) {
303 return scoped_ptr<PacketPipe>(
304 new NetworkGlitchPipe(average_work_time, average_outage_time))
305 .Pass();
308 class UDPProxyImpl;
310 class PacketSender : public PacketPipe {
311 public:
312 PacketSender(UDPProxyImpl* udp_proxy, const net::IPEndPoint* destination)
313 : udp_proxy_(udp_proxy), destination_(destination) {}
314 virtual void Send(scoped_ptr<transport::Packet> packet) OVERRIDE;
315 virtual void AppendToPipe(scoped_ptr<PacketPipe> pipe) OVERRIDE {
316 NOTREACHED();
319 private:
320 UDPProxyImpl* udp_proxy_;
321 const net::IPEndPoint* destination_; // not owned
324 namespace {
325 void BuildPipe(scoped_ptr<PacketPipe>* pipe, PacketPipe* next) {
326 if (*pipe) {
327 (*pipe)->AppendToPipe(scoped_ptr<PacketPipe>(next).Pass());
328 } else {
329 pipe->reset(next);
332 } // namespace
334 scoped_ptr<PacketPipe> WifiNetwork() {
335 // This represents the buffer on the sender.
336 scoped_ptr<PacketPipe> pipe;
337 BuildPipe(&pipe, new Buffer(256 << 10, 20));
338 BuildPipe(&pipe, new RandomDrop(0.005));
339 // This represents the buffer on the router.
340 BuildPipe(&pipe, new ConstantDelay(1E-3));
341 BuildPipe(&pipe, new RandomSortedDelay(1E-3, 20E-3, 3));
342 BuildPipe(&pipe, new Buffer(256 << 10, 20));
343 BuildPipe(&pipe, new ConstantDelay(1E-3));
344 BuildPipe(&pipe, new RandomSortedDelay(1E-3, 20E-3, 3));
345 BuildPipe(&pipe, new RandomDrop(0.005));
346 // This represents the buffer on the receiving device.
347 BuildPipe(&pipe, new Buffer(256 << 10, 20));
348 return pipe.Pass();
351 scoped_ptr<PacketPipe> BadNetwork() {
352 scoped_ptr<PacketPipe> pipe;
353 // This represents the buffer on the sender.
354 BuildPipe(&pipe, new Buffer(64 << 10, 5)); // 64 kb buf, 5mbit/s
355 BuildPipe(&pipe, new RandomDrop(0.05)); // 5% packet drop
356 BuildPipe(&pipe, new RandomSortedDelay(2E-3, 20E-3, 1));
357 // This represents the buffer on the router.
358 BuildPipe(&pipe, new Buffer(64 << 10, 5)); // 64 kb buf, 4mbit/s
359 BuildPipe(&pipe, new ConstantDelay(1E-3));
360 // Random 40ms every other second
361 // BuildPipe(&pipe, new NetworkGlitchPipe(2, 40E-1));
362 BuildPipe(&pipe, new RandomUnsortedDelay(5E-3));
363 // This represents the buffer on the receiving device.
364 BuildPipe(&pipe, new Buffer(64 << 10, 5)); // 64 kb buf, 5mbit/s
365 return pipe.Pass();
369 scoped_ptr<PacketPipe> EvilNetwork() {
370 // This represents the buffer on the sender.
371 scoped_ptr<PacketPipe> pipe;
372 BuildPipe(&pipe, new Buffer(4 << 10, 5)); // 4 kb buf, 2mbit/s
373 // This represents the buffer on the router.
374 BuildPipe(&pipe, new RandomDrop(0.1)); // 10% packet drop
375 BuildPipe(&pipe, new RandomSortedDelay(20E-3, 60E-3, 1));
376 BuildPipe(&pipe, new Buffer(4 << 10, 2)); // 4 kb buf, 2mbit/s
377 BuildPipe(&pipe, new RandomDrop(0.1)); // 10% packet drop
378 BuildPipe(&pipe, new ConstantDelay(1E-3));
379 BuildPipe(&pipe, new NetworkGlitchPipe(2.0, 0.3));
380 BuildPipe(&pipe, new RandomUnsortedDelay(20E-3));
381 // This represents the buffer on the receiving device.
382 BuildPipe(&pipe, new Buffer(4 << 10, 2)); // 4 kb buf, 2mbit/s
383 return pipe.Pass();
386 class UDPProxyImpl : public UDPProxy {
387 public:
388 UDPProxyImpl(const net::IPEndPoint& local_port,
389 const net::IPEndPoint& destination,
390 scoped_ptr<PacketPipe> to_dest_pipe,
391 scoped_ptr<PacketPipe> from_dest_pipe,
392 net::NetLog* net_log)
393 : local_port_(local_port),
394 destination_(destination),
395 destination_is_mutable_(destination.address().empty()),
396 proxy_thread_("media::cast::test::UdpProxy Thread"),
397 to_dest_pipe_(to_dest_pipe.Pass()),
398 from_dest_pipe_(from_dest_pipe.Pass()),
399 blocked_(false),
400 weak_factory_(this) {
401 proxy_thread_.StartWithOptions(
402 base::Thread::Options(base::MessageLoop::TYPE_IO, 0));
403 base::WaitableEvent start_event(false, false);
404 proxy_thread_.message_loop_proxy()->PostTask(
405 FROM_HERE,
406 base::Bind(&UDPProxyImpl::Start,
407 base::Unretained(this),
408 base::Unretained(&start_event),
409 net_log));
410 start_event.Wait();
413 virtual ~UDPProxyImpl() {
414 base::WaitableEvent stop_event(false, false);
415 proxy_thread_.message_loop_proxy()->PostTask(
416 FROM_HERE,
417 base::Bind(&UDPProxyImpl::Stop,
418 base::Unretained(this),
419 base::Unretained(&stop_event)));
420 stop_event.Wait();
421 proxy_thread_.Stop();
424 void Send(scoped_ptr<transport::Packet> packet,
425 const net::IPEndPoint& destination) {
426 if (blocked_) {
427 LOG(ERROR) << "Cannot write packet right now: blocked";
428 return;
431 VLOG(1) << "Sending packet, len = " << packet->size();
432 // We ignore all problems, callbacks and errors.
433 // If it didn't work we just drop the packet at and call it a day.
434 scoped_refptr<net::IOBuffer> buf =
435 new net::WrappedIOBuffer(reinterpret_cast<char*>(&packet->front()));
436 size_t buf_size = packet->size();
437 int result;
438 if (destination.address().empty()) {
439 VLOG(1) << "Destination has not been set yet.";
440 result = net::ERR_INVALID_ARGUMENT;
441 } else {
442 VLOG(1) << "Destination:" << destination.ToString();
443 result = socket_->SendTo(buf,
444 static_cast<int>(buf_size),
445 destination,
446 base::Bind(&UDPProxyImpl::AllowWrite,
447 weak_factory_.GetWeakPtr(),
448 buf,
449 base::Passed(&packet)));
451 if (result == net::ERR_IO_PENDING) {
452 blocked_ = true;
453 } else if (result < 0) {
454 LOG(ERROR) << "Failed to write packet.";
458 private:
459 void Start(base::WaitableEvent* start_event,
460 net::NetLog* net_log) {
461 socket_.reset(new net::UDPSocket(net::DatagramSocket::DEFAULT_BIND,
462 net::RandIntCallback(),
463 net_log,
464 net::NetLog::Source()));
465 BuildPipe(&to_dest_pipe_, new PacketSender(this, &destination_));
466 BuildPipe(&from_dest_pipe_, new PacketSender(this, &return_address_));
467 to_dest_pipe_->InitOnIOThread(base::MessageLoopProxy::current(),
468 &tick_clock_);
469 from_dest_pipe_->InitOnIOThread(base::MessageLoopProxy::current(),
470 &tick_clock_);
472 VLOG(0) << "From:" << local_port_.ToString();
473 if (!destination_is_mutable_)
474 VLOG(0) << "To:" << destination_.ToString();
476 CHECK_GE(socket_->Bind(local_port_), 0);
478 start_event->Signal();
479 PollRead();
482 void Stop(base::WaitableEvent* stop_event) {
483 to_dest_pipe_.reset(NULL);
484 from_dest_pipe_.reset(NULL);
485 socket_.reset(NULL);
486 stop_event->Signal();
489 void ProcessPacket(scoped_refptr<net::IOBuffer> recv_buf, int len) {
490 DCHECK_NE(len, net::ERR_IO_PENDING);
491 VLOG(1) << "Got packet, len = " << len;
492 if (len < 0) {
493 LOG(WARNING) << "Socket read error: " << len;
494 return;
496 packet_->resize(len);
497 if (destination_is_mutable_ && set_destination_next_ &&
498 !(recv_address_ == return_address_) &&
499 !(recv_address_ == destination_)) {
500 destination_ = recv_address_;
502 if (recv_address_ == destination_) {
503 set_destination_next_ = false;
504 from_dest_pipe_->Send(packet_.Pass());
505 } else {
506 set_destination_next_ = true;
507 VLOG(1) << "Return address = " << recv_address_.ToString();
508 return_address_ = recv_address_;
509 to_dest_pipe_->Send(packet_.Pass());
513 void ReadCallback(scoped_refptr<net::IOBuffer> recv_buf, int len) {
514 ProcessPacket(recv_buf, len);
515 PollRead();
518 void PollRead() {
519 while (true) {
520 packet_.reset(new transport::Packet(kMaxPacketSize));
521 scoped_refptr<net::IOBuffer> recv_buf =
522 new net::WrappedIOBuffer(reinterpret_cast<char*>(&packet_->front()));
523 int len = socket_->RecvFrom(
524 recv_buf,
525 kMaxPacketSize,
526 &recv_address_,
527 base::Bind(&UDPProxyImpl::ReadCallback,
528 base::Unretained(this),
529 recv_buf));
530 if (len == net::ERR_IO_PENDING)
531 break;
532 ProcessPacket(recv_buf, len);
536 void AllowWrite(scoped_refptr<net::IOBuffer> buf,
537 scoped_ptr<transport::Packet> packet,
538 int unused_len) {
539 DCHECK(blocked_);
540 blocked_ = false;
543 // Input
544 net::IPEndPoint local_port_;
546 net::IPEndPoint destination_;
547 bool destination_is_mutable_;
549 net::IPEndPoint return_address_;
550 bool set_destination_next_;
552 base::DefaultTickClock tick_clock_;
553 base::Thread proxy_thread_;
554 scoped_ptr<net::UDPSocket> socket_;
555 scoped_ptr<PacketPipe> to_dest_pipe_;
556 scoped_ptr<PacketPipe> from_dest_pipe_;
558 // For receiving.
559 net::IPEndPoint recv_address_;
560 scoped_ptr<transport::Packet> packet_;
562 // For sending.
563 bool blocked_;
565 base::WeakPtrFactory<UDPProxyImpl> weak_factory_;
568 void PacketSender::Send(scoped_ptr<transport::Packet> packet) {
569 udp_proxy_->Send(packet.Pass(), *destination_);
572 scoped_ptr<UDPProxy> UDPProxy::Create(
573 const net::IPEndPoint& local_port,
574 const net::IPEndPoint& destination,
575 scoped_ptr<PacketPipe> to_dest_pipe,
576 scoped_ptr<PacketPipe> from_dest_pipe,
577 net::NetLog* net_log) {
578 scoped_ptr<UDPProxy> ret(new UDPProxyImpl(local_port,
579 destination,
580 to_dest_pipe.Pass(),
581 from_dest_pipe.Pass(),
582 net_log));
583 return ret.Pass();
586 } // namespace test
587 } // namespace cast
588 } // namespace media