Pin Chrome's shortcut to the Win10 Start menu on install and OS upgrade.
[chromium-blink-merge.git] / media / cast / test / utility / udp_proxy.cc
blob19191ede44ff48213450d66a49a0173ebba5c67d
1 // Copyright 2014 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
5 #include <math.h>
6 #include <stdlib.h>
7 #include <vector>
9 #include "media/cast/test/utility/udp_proxy.h"
11 #include "base/logging.h"
12 #include "base/rand_util.h"
13 #include "base/synchronization/waitable_event.h"
14 #include "base/thread_task_runner_handle.h"
15 #include "base/threading/thread.h"
16 #include "base/time/default_tick_clock.h"
17 #include "net/base/io_buffer.h"
18 #include "net/base/net_errors.h"
19 #include "net/udp/udp_server_socket.h"
21 namespace media {
22 namespace cast {
23 namespace test {
25 const size_t kMaxPacketSize = 65536;
27 PacketPipe::PacketPipe() {}
28 PacketPipe::~PacketPipe() {}
29 void PacketPipe::InitOnIOThread(
30 const scoped_refptr<base::SingleThreadTaskRunner>& task_runner,
31 base::TickClock* clock) {
32 task_runner_ = task_runner;
33 clock_ = clock;
34 if (pipe_) {
35 pipe_->InitOnIOThread(task_runner, clock);
38 void PacketPipe::AppendToPipe(scoped_ptr<PacketPipe> pipe) {
39 if (pipe_) {
40 pipe_->AppendToPipe(pipe.Pass());
41 } else {
42 pipe_ = pipe.Pass();
46 // Roughly emulates a buffer inside a device.
47 // If the buffer is full, packets are dropped.
48 // Packets are output at a maximum bandwidth.
49 class Buffer : public PacketPipe {
50 public:
51 Buffer(size_t buffer_size, double max_megabits_per_second)
52 : buffer_size_(0),
53 max_buffer_size_(buffer_size),
54 max_megabits_per_second_(max_megabits_per_second),
55 weak_factory_(this) {
56 CHECK_GT(max_buffer_size_, 0UL);
57 CHECK_GT(max_megabits_per_second, 0);
60 void Send(scoped_ptr<Packet> packet) final {
61 if (packet->size() + buffer_size_ <= max_buffer_size_) {
62 buffer_size_ += packet->size();
63 buffer_.push_back(linked_ptr<Packet>(packet.release()));
64 if (buffer_.size() == 1) {
65 Schedule();
70 private:
71 void Schedule() {
72 last_schedule_ = clock_->NowTicks();
73 double megabits = buffer_.front()->size() * 8 / 1000000.0;
74 double seconds = megabits / max_megabits_per_second_;
75 int64 microseconds = static_cast<int64>(seconds * 1E6);
76 task_runner_->PostDelayedTask(
77 FROM_HERE,
78 base::Bind(&Buffer::ProcessBuffer, weak_factory_.GetWeakPtr()),
79 base::TimeDelta::FromMicroseconds(microseconds));
82 void ProcessBuffer() {
83 int64 bytes_to_send = static_cast<int64>(
84 (clock_->NowTicks() - last_schedule_).InSecondsF() *
85 max_megabits_per_second_ * 1E6 / 8);
86 if (bytes_to_send < static_cast<int64>(buffer_.front()->size())) {
87 bytes_to_send = buffer_.front()->size();
89 while (!buffer_.empty() &&
90 static_cast<int64>(buffer_.front()->size()) <= bytes_to_send) {
91 CHECK(!buffer_.empty());
92 scoped_ptr<Packet> packet(buffer_.front().release());
93 bytes_to_send -= packet->size();
94 buffer_size_ -= packet->size();
95 buffer_.pop_front();
96 pipe_->Send(packet.Pass());
98 if (!buffer_.empty()) {
99 Schedule();
103 std::deque<linked_ptr<Packet> > buffer_;
104 base::TimeTicks last_schedule_;
105 size_t buffer_size_;
106 size_t max_buffer_size_;
107 double max_megabits_per_second_; // megabits per second
108 base::WeakPtrFactory<Buffer> weak_factory_;
111 scoped_ptr<PacketPipe> NewBuffer(size_t buffer_size, double bandwidth) {
112 return scoped_ptr<PacketPipe>(new Buffer(buffer_size, bandwidth)).Pass();
115 class RandomDrop : public PacketPipe {
116 public:
117 RandomDrop(double drop_fraction)
118 : drop_fraction_(static_cast<int>(drop_fraction * RAND_MAX)) {}
120 void Send(scoped_ptr<Packet> packet) final {
121 if (rand() > drop_fraction_) {
122 pipe_->Send(packet.Pass());
126 private:
127 int drop_fraction_;
130 scoped_ptr<PacketPipe> NewRandomDrop(double drop_fraction) {
131 return scoped_ptr<PacketPipe>(new RandomDrop(drop_fraction)).Pass();
134 class SimpleDelayBase : public PacketPipe {
135 public:
136 SimpleDelayBase() : weak_factory_(this) {}
137 ~SimpleDelayBase() override {}
139 void Send(scoped_ptr<Packet> packet) override {
140 double seconds = GetDelay();
141 task_runner_->PostDelayedTask(
142 FROM_HERE,
143 base::Bind(&SimpleDelayBase::SendInternal,
144 weak_factory_.GetWeakPtr(),
145 base::Passed(&packet)),
146 base::TimeDelta::FromMicroseconds(static_cast<int64>(seconds * 1E6)));
148 protected:
149 virtual double GetDelay() = 0;
151 private:
152 virtual void SendInternal(scoped_ptr<Packet> packet) {
153 pipe_->Send(packet.Pass());
156 base::WeakPtrFactory<SimpleDelayBase> weak_factory_;
159 class ConstantDelay : public SimpleDelayBase {
160 public:
161 ConstantDelay(double delay_seconds) : delay_seconds_(delay_seconds) {}
162 double GetDelay() final { return delay_seconds_; }
164 private:
165 double delay_seconds_;
168 scoped_ptr<PacketPipe> NewConstantDelay(double delay_seconds) {
169 return scoped_ptr<PacketPipe>(new ConstantDelay(delay_seconds)).Pass();
172 class RandomUnsortedDelay : public SimpleDelayBase {
173 public:
174 RandomUnsortedDelay(double random_delay) : random_delay_(random_delay) {}
176 double GetDelay() override { return random_delay_ * base::RandDouble(); }
178 private:
179 double random_delay_;
182 scoped_ptr<PacketPipe> NewRandomUnsortedDelay(double random_delay) {
183 return scoped_ptr<PacketPipe>(new RandomUnsortedDelay(random_delay)).Pass();
186 class DuplicateAndDelay : public RandomUnsortedDelay {
187 public:
188 DuplicateAndDelay(double delay_min,
189 double random_delay) :
190 RandomUnsortedDelay(random_delay),
191 delay_min_(delay_min) {
193 void Send(scoped_ptr<Packet> packet) final {
194 pipe_->Send(scoped_ptr<Packet>(new Packet(*packet.get())));
195 RandomUnsortedDelay::Send(packet.Pass());
197 double GetDelay() final {
198 return RandomUnsortedDelay::GetDelay() + delay_min_;
200 private:
201 double delay_min_;
204 scoped_ptr<PacketPipe> NewDuplicateAndDelay(double delay_min,
205 double random_delay) {
206 return scoped_ptr<PacketPipe>(
207 new DuplicateAndDelay(delay_min, random_delay)).Pass();
210 class RandomSortedDelay : public PacketPipe {
211 public:
212 RandomSortedDelay(double random_delay,
213 double extra_delay,
214 double seconds_between_extra_delay)
215 : random_delay_(random_delay),
216 extra_delay_(extra_delay),
217 seconds_between_extra_delay_(seconds_between_extra_delay),
218 weak_factory_(this) {}
220 void Send(scoped_ptr<Packet> packet) final {
221 buffer_.push_back(linked_ptr<Packet>(packet.release()));
222 if (buffer_.size() == 1) {
223 next_send_ = std::max(
224 clock_->NowTicks() +
225 base::TimeDelta::FromSecondsD(base::RandDouble() * random_delay_),
226 next_send_);
227 ProcessBuffer();
230 void InitOnIOThread(
231 const scoped_refptr<base::SingleThreadTaskRunner>& task_runner,
232 base::TickClock* clock) final {
233 PacketPipe::InitOnIOThread(task_runner, clock);
234 // As we start the stream, assume that we are in a random
235 // place between two extra delays, thus multiplier = 1.0;
236 ScheduleExtraDelay(1.0);
239 private:
240 void ScheduleExtraDelay(double mult) {
241 double seconds = seconds_between_extra_delay_ * mult * base::RandDouble();
242 int64 microseconds = static_cast<int64>(seconds * 1E6);
243 task_runner_->PostDelayedTask(
244 FROM_HERE,
245 base::Bind(&RandomSortedDelay::CauseExtraDelay,
246 weak_factory_.GetWeakPtr()),
247 base::TimeDelta::FromMicroseconds(microseconds));
250 void CauseExtraDelay() {
251 next_send_ = std::max<base::TimeTicks>(
252 clock_->NowTicks() +
253 base::TimeDelta::FromMicroseconds(
254 static_cast<int64>(extra_delay_ * 1E6)),
255 next_send_);
256 // An extra delay just happened, wait up to seconds_between_extra_delay_*2
257 // before scheduling another one to make the average equal to
258 // seconds_between_extra_delay_.
259 ScheduleExtraDelay(2.0);
262 void ProcessBuffer() {
263 base::TimeTicks now = clock_->NowTicks();
264 while (!buffer_.empty() && next_send_ <= now) {
265 scoped_ptr<Packet> packet(buffer_.front().release());
266 pipe_->Send(packet.Pass());
267 buffer_.pop_front();
269 next_send_ += base::TimeDelta::FromSecondsD(
270 base::RandDouble() * random_delay_);
273 if (!buffer_.empty()) {
274 task_runner_->PostDelayedTask(
275 FROM_HERE,
276 base::Bind(&RandomSortedDelay::ProcessBuffer,
277 weak_factory_.GetWeakPtr()),
278 next_send_ - now);
282 base::TimeTicks block_until_;
283 std::deque<linked_ptr<Packet> > buffer_;
284 double random_delay_;
285 double extra_delay_;
286 double seconds_between_extra_delay_;
287 base::TimeTicks next_send_;
288 base::WeakPtrFactory<RandomSortedDelay> weak_factory_;
291 scoped_ptr<PacketPipe> NewRandomSortedDelay(
292 double random_delay,
293 double extra_delay,
294 double seconds_between_extra_delay) {
295 return scoped_ptr<PacketPipe>(
296 new RandomSortedDelay(
297 random_delay, extra_delay, seconds_between_extra_delay))
298 .Pass();
301 class NetworkGlitchPipe : public PacketPipe {
302 public:
303 NetworkGlitchPipe(double average_work_time, double average_outage_time)
304 : works_(false),
305 max_work_time_(average_work_time * 2),
306 max_outage_time_(average_outage_time * 2),
307 weak_factory_(this) {}
309 void InitOnIOThread(
310 const scoped_refptr<base::SingleThreadTaskRunner>& task_runner,
311 base::TickClock* clock) final {
312 PacketPipe::InitOnIOThread(task_runner, clock);
313 Flip();
316 void Send(scoped_ptr<Packet> packet) final {
317 if (works_) {
318 pipe_->Send(packet.Pass());
322 private:
323 void Flip() {
324 works_ = !works_;
325 double seconds = base::RandDouble() *
326 (works_ ? max_work_time_ : max_outage_time_);
327 int64 microseconds = static_cast<int64>(seconds * 1E6);
328 task_runner_->PostDelayedTask(
329 FROM_HERE,
330 base::Bind(&NetworkGlitchPipe::Flip, weak_factory_.GetWeakPtr()),
331 base::TimeDelta::FromMicroseconds(microseconds));
334 bool works_;
335 double max_work_time_;
336 double max_outage_time_;
337 base::WeakPtrFactory<NetworkGlitchPipe> weak_factory_;
340 scoped_ptr<PacketPipe> NewNetworkGlitchPipe(double average_work_time,
341 double average_outage_time) {
342 return scoped_ptr<PacketPipe>(
343 new NetworkGlitchPipe(average_work_time, average_outage_time))
344 .Pass();
348 // Internal buffer object for a client of the IPP model.
349 class InterruptedPoissonProcess::InternalBuffer : public PacketPipe {
350 public:
351 InternalBuffer(base::WeakPtr<InterruptedPoissonProcess> ipp,
352 size_t size)
353 : ipp_(ipp),
354 stored_size_(0),
355 stored_limit_(size),
356 clock_(NULL),
357 weak_factory_(this) {
360 void Send(scoped_ptr<Packet> packet) final {
361 // Drop if buffer is full.
362 if (stored_size_ >= stored_limit_)
363 return;
364 stored_size_ += packet->size();
365 buffer_.push_back(linked_ptr<Packet>(packet.release()));
366 buffer_time_.push_back(clock_->NowTicks());
367 DCHECK(buffer_.size() == buffer_time_.size());
370 void InitOnIOThread(
371 const scoped_refptr<base::SingleThreadTaskRunner>& task_runner,
372 base::TickClock* clock) final {
373 clock_ = clock;
374 if (ipp_)
375 ipp_->InitOnIOThread(task_runner, clock);
376 PacketPipe::InitOnIOThread(task_runner, clock);
379 void SendOnePacket() {
380 scoped_ptr<Packet> packet(buffer_.front().release());
381 stored_size_ -= packet->size();
382 buffer_.pop_front();
383 buffer_time_.pop_front();
384 pipe_->Send(packet.Pass());
385 DCHECK(buffer_.size() == buffer_time_.size());
388 bool Empty() const {
389 return buffer_.empty();
392 base::TimeTicks FirstPacketTime() const {
393 DCHECK(!buffer_time_.empty());
394 return buffer_time_.front();
397 base::WeakPtr<InternalBuffer> GetWeakPtr() {
398 return weak_factory_.GetWeakPtr();
402 private:
403 const base::WeakPtr<InterruptedPoissonProcess> ipp_;
404 size_t stored_size_;
405 const size_t stored_limit_;
406 std::deque<linked_ptr<Packet> > buffer_;
407 std::deque<base::TimeTicks> buffer_time_;
408 base::TickClock* clock_;
409 base::WeakPtrFactory<InternalBuffer> weak_factory_;
411 DISALLOW_COPY_AND_ASSIGN(InternalBuffer);
414 InterruptedPoissonProcess::InterruptedPoissonProcess(
415 const std::vector<double>& average_rates,
416 double coef_burstiness,
417 double coef_variance,
418 uint32 rand_seed)
419 : clock_(NULL),
420 average_rates_(average_rates),
421 coef_burstiness_(coef_burstiness),
422 coef_variance_(coef_variance),
423 rate_index_(0),
424 on_state_(true),
425 weak_factory_(this) {
426 mt_rand_.init_genrand(rand_seed);
427 DCHECK(!average_rates.empty());
428 ComputeRates();
431 InterruptedPoissonProcess::~InterruptedPoissonProcess() {
434 void InterruptedPoissonProcess::InitOnIOThread(
435 const scoped_refptr<base::SingleThreadTaskRunner>& task_runner,
436 base::TickClock* clock) {
437 // Already initialized and started.
438 if (task_runner_.get() && clock_)
439 return;
440 task_runner_ = task_runner;
441 clock_ = clock;
442 UpdateRates();
443 SwitchOn();
444 SendPacket();
447 scoped_ptr<PacketPipe> InterruptedPoissonProcess::NewBuffer(size_t size) {
448 scoped_ptr<InternalBuffer> buffer(
449 new InternalBuffer(weak_factory_.GetWeakPtr(), size));
450 send_buffers_.push_back(buffer->GetWeakPtr());
451 return buffer.Pass();
454 base::TimeDelta InterruptedPoissonProcess::NextEvent(double rate) {
455 // Rate is per milliseconds.
456 // The time until next event is exponentially distributed to the
457 // inverse of |rate|.
458 return base::TimeDelta::FromMillisecondsD(
459 fabs(-log(1.0 - RandDouble()) / rate));
462 double InterruptedPoissonProcess::RandDouble() {
463 // Generate a 64-bits random number from MT19937 and then convert
464 // it to double.
465 uint64 rand = mt_rand_.genrand_int32();
466 rand <<= 32;
467 rand |= mt_rand_.genrand_int32();
468 return base::BitsToOpenEndedUnitInterval(rand);
471 void InterruptedPoissonProcess::ComputeRates() {
472 double avg_rate = average_rates_[rate_index_];
474 send_rate_ = avg_rate / coef_burstiness_;
475 switch_off_rate_ =
476 2 * avg_rate * (1 - coef_burstiness_) * (1 - coef_burstiness_) /
477 coef_burstiness_ / (coef_variance_ - 1);
478 switch_on_rate_ =
479 2 * avg_rate * (1 - coef_burstiness_) / (coef_variance_ - 1);
482 void InterruptedPoissonProcess::UpdateRates() {
483 ComputeRates();
485 // Rates are updated once per second.
486 rate_index_ = (rate_index_ + 1) % average_rates_.size();
487 task_runner_->PostDelayedTask(
488 FROM_HERE,
489 base::Bind(&InterruptedPoissonProcess::UpdateRates,
490 weak_factory_.GetWeakPtr()),
491 base::TimeDelta::FromSeconds(1));
494 void InterruptedPoissonProcess::SwitchOff() {
495 on_state_ = false;
496 task_runner_->PostDelayedTask(
497 FROM_HERE,
498 base::Bind(&InterruptedPoissonProcess::SwitchOn,
499 weak_factory_.GetWeakPtr()),
500 NextEvent(switch_on_rate_));
503 void InterruptedPoissonProcess::SwitchOn() {
504 on_state_ = true;
505 task_runner_->PostDelayedTask(
506 FROM_HERE,
507 base::Bind(&InterruptedPoissonProcess::SwitchOff,
508 weak_factory_.GetWeakPtr()),
509 NextEvent(switch_off_rate_));
512 void InterruptedPoissonProcess::SendPacket() {
513 task_runner_->PostDelayedTask(
514 FROM_HERE,
515 base::Bind(&InterruptedPoissonProcess::SendPacket,
516 weak_factory_.GetWeakPtr()),
517 NextEvent(send_rate_));
519 // If OFF then don't send.
520 if (!on_state_)
521 return;
523 // Find the earliest packet to send.
524 base::TimeTicks earliest_time;
525 for (size_t i = 0; i < send_buffers_.size(); ++i) {
526 if (!send_buffers_[i])
527 continue;
528 if (send_buffers_[i]->Empty())
529 continue;
530 if (earliest_time.is_null() ||
531 send_buffers_[i]->FirstPacketTime() < earliest_time)
532 earliest_time = send_buffers_[i]->FirstPacketTime();
534 for (size_t i = 0; i < send_buffers_.size(); ++i) {
535 if (!send_buffers_[i])
536 continue;
537 if (send_buffers_[i]->Empty())
538 continue;
539 if (send_buffers_[i]->FirstPacketTime() != earliest_time)
540 continue;
541 send_buffers_[i]->SendOnePacket();
542 break;
546 class UDPProxyImpl;
548 class PacketSender : public PacketPipe {
549 public:
550 PacketSender(UDPProxyImpl* udp_proxy, const net::IPEndPoint* destination)
551 : udp_proxy_(udp_proxy), destination_(destination) {}
552 void Send(scoped_ptr<Packet> packet) final;
553 void AppendToPipe(scoped_ptr<PacketPipe> pipe) final { NOTREACHED(); }
555 private:
556 UDPProxyImpl* udp_proxy_;
557 const net::IPEndPoint* destination_; // not owned
560 namespace {
561 void BuildPipe(scoped_ptr<PacketPipe>* pipe, PacketPipe* next) {
562 if (*pipe) {
563 (*pipe)->AppendToPipe(scoped_ptr<PacketPipe>(next).Pass());
564 } else {
565 pipe->reset(next);
568 } // namespace
570 scoped_ptr<PacketPipe> GoodNetwork() {
571 // This represents the buffer on the sender.
572 scoped_ptr<PacketPipe> pipe;
573 BuildPipe(&pipe, new Buffer(2 << 20, 50));
574 BuildPipe(&pipe, new ConstantDelay(1E-3));
575 BuildPipe(&pipe, new RandomSortedDelay(1E-3, 2E-3, 3));
576 // This represents the buffer on the receiving device.
577 BuildPipe(&pipe, new Buffer(2 << 20, 50));
578 return pipe.Pass();
581 scoped_ptr<PacketPipe> WifiNetwork() {
582 // This represents the buffer on the sender.
583 scoped_ptr<PacketPipe> pipe;
584 BuildPipe(&pipe, new Buffer(256 << 10, 20));
585 BuildPipe(&pipe, new RandomDrop(0.005));
586 // This represents the buffer on the router.
587 BuildPipe(&pipe, new ConstantDelay(1E-3));
588 BuildPipe(&pipe, new RandomSortedDelay(1E-3, 20E-3, 3));
589 BuildPipe(&pipe, new Buffer(256 << 10, 20));
590 BuildPipe(&pipe, new ConstantDelay(1E-3));
591 BuildPipe(&pipe, new RandomSortedDelay(1E-3, 20E-3, 3));
592 BuildPipe(&pipe, new RandomDrop(0.005));
593 // This represents the buffer on the receiving device.
594 BuildPipe(&pipe, new Buffer(256 << 10, 20));
595 return pipe.Pass();
598 scoped_ptr<PacketPipe> BadNetwork() {
599 scoped_ptr<PacketPipe> pipe;
600 // This represents the buffer on the sender.
601 BuildPipe(&pipe, new Buffer(64 << 10, 5)); // 64 kb buf, 5mbit/s
602 BuildPipe(&pipe, new RandomDrop(0.05)); // 5% packet drop
603 BuildPipe(&pipe, new RandomSortedDelay(2E-3, 20E-3, 1));
604 // This represents the buffer on the router.
605 BuildPipe(&pipe, new Buffer(64 << 10, 5)); // 64 kb buf, 4mbit/s
606 BuildPipe(&pipe, new ConstantDelay(1E-3));
607 // Random 40ms every other second
608 // BuildPipe(&pipe, new NetworkGlitchPipe(2, 40E-1));
609 BuildPipe(&pipe, new RandomUnsortedDelay(5E-3));
610 // This represents the buffer on the receiving device.
611 BuildPipe(&pipe, new Buffer(64 << 10, 5)); // 64 kb buf, 5mbit/s
612 return pipe.Pass();
616 scoped_ptr<PacketPipe> EvilNetwork() {
617 // This represents the buffer on the sender.
618 scoped_ptr<PacketPipe> pipe;
619 BuildPipe(&pipe, new Buffer(4 << 10, 5)); // 4 kb buf, 2mbit/s
620 // This represents the buffer on the router.
621 BuildPipe(&pipe, new RandomDrop(0.1)); // 10% packet drop
622 BuildPipe(&pipe, new RandomSortedDelay(20E-3, 60E-3, 1));
623 BuildPipe(&pipe, new Buffer(4 << 10, 2)); // 4 kb buf, 2mbit/s
624 BuildPipe(&pipe, new RandomDrop(0.1)); // 10% packet drop
625 BuildPipe(&pipe, new ConstantDelay(1E-3));
626 BuildPipe(&pipe, new NetworkGlitchPipe(2.0, 0.3));
627 BuildPipe(&pipe, new RandomUnsortedDelay(20E-3));
628 // This represents the buffer on the receiving device.
629 BuildPipe(&pipe, new Buffer(4 << 10, 2)); // 4 kb buf, 2mbit/s
630 return pipe.Pass();
633 scoped_ptr<InterruptedPoissonProcess> DefaultInterruptedPoissonProcess() {
634 // The following values are taken from a session reported from a user.
635 // They are experimentally tested to demonstrate challenging network
636 // conditions. The average bitrate is about 2mbits/s.
638 // Each element in this vector is the average number of packets sent
639 // per millisecond. The average changes and rotates every second.
640 std::vector<double> average_rates;
641 average_rates.push_back(0.609);
642 average_rates.push_back(0.495);
643 average_rates.push_back(0.561);
644 average_rates.push_back(0.458);
645 average_rates.push_back(0.538);
646 average_rates.push_back(0.513);
647 average_rates.push_back(0.585);
648 average_rates.push_back(0.592);
649 average_rates.push_back(0.658);
650 average_rates.push_back(0.556);
651 average_rates.push_back(0.371);
652 average_rates.push_back(0.595);
653 average_rates.push_back(0.490);
654 average_rates.push_back(0.980);
655 average_rates.push_back(0.781);
656 average_rates.push_back(0.463);
658 const double burstiness = 0.609;
659 const double variance = 4.1;
661 scoped_ptr<InterruptedPoissonProcess> ipp(
662 new InterruptedPoissonProcess(
663 average_rates, burstiness, variance, 0));
664 return ipp.Pass();
667 class UDPProxyImpl : public UDPProxy {
668 public:
669 UDPProxyImpl(const net::IPEndPoint& local_port,
670 const net::IPEndPoint& destination,
671 scoped_ptr<PacketPipe> to_dest_pipe,
672 scoped_ptr<PacketPipe> from_dest_pipe,
673 net::NetLog* net_log)
674 : local_port_(local_port),
675 destination_(destination),
676 destination_is_mutable_(destination.address().empty()),
677 proxy_thread_("media::cast::test::UdpProxy Thread"),
678 to_dest_pipe_(to_dest_pipe.Pass()),
679 from_dest_pipe_(from_dest_pipe.Pass()),
680 blocked_(false),
681 weak_factory_(this) {
682 proxy_thread_.StartWithOptions(
683 base::Thread::Options(base::MessageLoop::TYPE_IO, 0));
684 base::WaitableEvent start_event(false, false);
685 proxy_thread_.task_runner()->PostTask(
686 FROM_HERE,
687 base::Bind(&UDPProxyImpl::Start,
688 base::Unretained(this),
689 base::Unretained(&start_event),
690 net_log));
691 start_event.Wait();
694 ~UDPProxyImpl() final {
695 base::WaitableEvent stop_event(false, false);
696 proxy_thread_.task_runner()->PostTask(
697 FROM_HERE,
698 base::Bind(&UDPProxyImpl::Stop,
699 base::Unretained(this),
700 base::Unretained(&stop_event)));
701 stop_event.Wait();
702 proxy_thread_.Stop();
705 void Send(scoped_ptr<Packet> packet,
706 const net::IPEndPoint& destination) {
707 if (blocked_) {
708 LOG(ERROR) << "Cannot write packet right now: blocked";
709 return;
712 VLOG(1) << "Sending packet, len = " << packet->size();
713 // We ignore all problems, callbacks and errors.
714 // If it didn't work we just drop the packet at and call it a day.
715 scoped_refptr<net::IOBuffer> buf =
716 new net::WrappedIOBuffer(reinterpret_cast<char*>(&packet->front()));
717 size_t buf_size = packet->size();
718 int result;
719 if (destination.address().empty()) {
720 VLOG(1) << "Destination has not been set yet.";
721 result = net::ERR_INVALID_ARGUMENT;
722 } else {
723 VLOG(1) << "Destination:" << destination.ToString();
724 result = socket_->SendTo(buf.get(),
725 static_cast<int>(buf_size),
726 destination,
727 base::Bind(&UDPProxyImpl::AllowWrite,
728 weak_factory_.GetWeakPtr(),
729 buf,
730 base::Passed(&packet)));
732 if (result == net::ERR_IO_PENDING) {
733 blocked_ = true;
734 } else if (result < 0) {
735 LOG(ERROR) << "Failed to write packet.";
739 private:
740 void Start(base::WaitableEvent* start_event,
741 net::NetLog* net_log) {
742 socket_.reset(new net::UDPServerSocket(net_log, net::NetLog::Source()));
743 BuildPipe(&to_dest_pipe_, new PacketSender(this, &destination_));
744 BuildPipe(&from_dest_pipe_, new PacketSender(this, &return_address_));
745 to_dest_pipe_->InitOnIOThread(base::ThreadTaskRunnerHandle::Get(),
746 &tick_clock_);
747 from_dest_pipe_->InitOnIOThread(base::ThreadTaskRunnerHandle::Get(),
748 &tick_clock_);
750 VLOG(0) << "From:" << local_port_.ToString();
751 if (!destination_is_mutable_)
752 VLOG(0) << "To:" << destination_.ToString();
754 CHECK_GE(socket_->Listen(local_port_), 0);
756 start_event->Signal();
757 PollRead();
760 void Stop(base::WaitableEvent* stop_event) {
761 to_dest_pipe_.reset(NULL);
762 from_dest_pipe_.reset(NULL);
763 socket_.reset(NULL);
764 stop_event->Signal();
767 void ProcessPacket(scoped_refptr<net::IOBuffer> recv_buf, int len) {
768 DCHECK_NE(len, net::ERR_IO_PENDING);
769 VLOG(1) << "Got packet, len = " << len;
770 if (len < 0) {
771 LOG(WARNING) << "Socket read error: " << len;
772 return;
774 packet_->resize(len);
775 if (destination_is_mutable_ && set_destination_next_ &&
776 !(recv_address_ == return_address_) &&
777 !(recv_address_ == destination_)) {
778 destination_ = recv_address_;
780 if (recv_address_ == destination_) {
781 set_destination_next_ = false;
782 from_dest_pipe_->Send(packet_.Pass());
783 } else {
784 set_destination_next_ = true;
785 VLOG(1) << "Return address = " << recv_address_.ToString();
786 return_address_ = recv_address_;
787 to_dest_pipe_->Send(packet_.Pass());
791 void ReadCallback(scoped_refptr<net::IOBuffer> recv_buf, int len) {
792 ProcessPacket(recv_buf, len);
793 PollRead();
796 void PollRead() {
797 while (true) {
798 packet_.reset(new Packet(kMaxPacketSize));
799 scoped_refptr<net::IOBuffer> recv_buf =
800 new net::WrappedIOBuffer(reinterpret_cast<char*>(&packet_->front()));
801 int len = socket_->RecvFrom(
802 recv_buf.get(),
803 kMaxPacketSize,
804 &recv_address_,
805 base::Bind(
806 &UDPProxyImpl::ReadCallback, base::Unretained(this), recv_buf));
807 if (len == net::ERR_IO_PENDING)
808 break;
809 ProcessPacket(recv_buf, len);
813 void AllowWrite(scoped_refptr<net::IOBuffer> buf,
814 scoped_ptr<Packet> packet,
815 int unused_len) {
816 DCHECK(blocked_);
817 blocked_ = false;
820 // Input
821 net::IPEndPoint local_port_;
823 net::IPEndPoint destination_;
824 bool destination_is_mutable_;
826 net::IPEndPoint return_address_;
827 bool set_destination_next_;
829 base::DefaultTickClock tick_clock_;
830 base::Thread proxy_thread_;
831 scoped_ptr<net::UDPServerSocket> socket_;
832 scoped_ptr<PacketPipe> to_dest_pipe_;
833 scoped_ptr<PacketPipe> from_dest_pipe_;
835 // For receiving.
836 net::IPEndPoint recv_address_;
837 scoped_ptr<Packet> packet_;
839 // For sending.
840 bool blocked_;
842 base::WeakPtrFactory<UDPProxyImpl> weak_factory_;
845 void PacketSender::Send(scoped_ptr<Packet> packet) {
846 udp_proxy_->Send(packet.Pass(), *destination_);
849 scoped_ptr<UDPProxy> UDPProxy::Create(
850 const net::IPEndPoint& local_port,
851 const net::IPEndPoint& destination,
852 scoped_ptr<PacketPipe> to_dest_pipe,
853 scoped_ptr<PacketPipe> from_dest_pipe,
854 net::NetLog* net_log) {
855 scoped_ptr<UDPProxy> ret(new UDPProxyImpl(local_port,
856 destination,
857 to_dest_pipe.Pass(),
858 from_dest_pipe.Pass(),
859 net_log));
860 return ret.Pass();
863 } // namespace test
864 } // namespace cast
865 } // namespace media