Ignore title parameter for navigator.registerProtocolHandler
[chromium-blink-merge.git] / media / cast / test / utility / udp_proxy.cc
blob 113281528d8c81345dfa386cec4d5077d78c6864
1 // Copyright 2014 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
5 #include "media/cast/test/utility/udp_proxy.h"
7 #include "base/logging.h"
8 #include "base/memory/linked_ptr.h"
9 #include "base/rand_util.h"
10 #include "base/synchronization/waitable_event.h"
11 #include "base/threading/thread.h"
12 #include "net/base/io_buffer.h"
13 #include "net/base/net_errors.h"
14 #include "net/udp/udp_socket.h"
16 namespace media {
17 namespace cast {
18 namespace test {
// Size, in bytes, of the buffer allocated for every datagram read, and the
// maximum length requested from the socket in UDPProxyImpl::PollRead().
const size_t kMaxPacketSize = 1 << 16;
22 PacketPipe::PacketPipe() {}
23 PacketPipe::~PacketPipe() {}
24 void PacketPipe::InitOnIOThread(
25 const scoped_refptr<base::SingleThreadTaskRunner>& task_runner) {
26 task_runner_ = task_runner;
27 if (pipe_) {
28 pipe_->InitOnIOThread(task_runner);
31 void PacketPipe::AppendToPipe(scoped_ptr<PacketPipe> pipe) {
32 if (pipe_) {
33 pipe_->AppendToPipe(pipe.Pass());
34 } else {
35 pipe_ = pipe.Pass();
39 // Roughly emulates a buffer inside a device.
40 // If the buffer is full, packets are dropped.
41 // Packets are output at a maximum bandwidth.
42 class Buffer : public PacketPipe {
43 public:
44 Buffer(size_t buffer_size, double max_megabits_per_second)
45 : buffer_size_(0),
46 max_buffer_size_(buffer_size),
47 max_megabits_per_second_(max_megabits_per_second),
48 weak_factory_(this) {}
50 virtual void Send(scoped_ptr<transport::Packet> packet) OVERRIDE {
51 if (packet->size() + buffer_size_ <= max_buffer_size_) {
52 buffer_size_ += packet->size();
53 buffer_.push_back(linked_ptr<transport::Packet>(packet.release()));
54 if (buffer_.size() == 1) {
55 Schedule();
60 private:
61 void Schedule() {
62 double megabits = buffer_.front()->size() * 8 / 1000000.0;
63 double seconds = megabits / max_megabits_per_second_;
64 int64 microseconds = static_cast<int64>(seconds * 1E6);
65 task_runner_->PostDelayedTask(
66 FROM_HERE,
67 base::Bind(&Buffer::ProcessBuffer, weak_factory_.GetWeakPtr()),
68 base::TimeDelta::FromMicroseconds(microseconds));
71 void ProcessBuffer() {
72 CHECK(!buffer_.empty());
73 scoped_ptr<transport::Packet> packet(buffer_.front().release());
74 buffer_size_ -= packet->size();
75 buffer_.pop_front();
76 pipe_->Send(packet.Pass());
77 if (!buffer_.empty()) {
78 Schedule();
82 std::deque<linked_ptr<transport::Packet> > buffer_;
83 size_t buffer_size_;
84 size_t max_buffer_size_;
85 double max_megabits_per_second_; // megabits per second
86 base::WeakPtrFactory<Buffer> weak_factory_;
89 scoped_ptr<PacketPipe> NewBuffer(size_t buffer_size, double bandwidth) {
90 return scoped_ptr<PacketPipe>(new Buffer(buffer_size, bandwidth)).Pass();
93 class RandomDrop : public PacketPipe {
94 public:
95 RandomDrop(double drop_fraction) : drop_fraction_(drop_fraction) {
98 virtual void Send(scoped_ptr<transport::Packet> packet) OVERRIDE {
99 if (base::RandDouble() >= drop_fraction_) {
100 pipe_->Send(packet.Pass());
104 private:
105 double drop_fraction_;
108 scoped_ptr<PacketPipe> NewRandomDrop(double drop_fraction) {
109 return scoped_ptr<PacketPipe>(new RandomDrop(drop_fraction)).Pass();
112 class SimpleDelayBase : public PacketPipe {
113 public:
114 SimpleDelayBase() : weak_factory_(this) {}
115 virtual ~SimpleDelayBase() {}
117 virtual void Send(scoped_ptr<transport::Packet> packet) OVERRIDE {
118 double seconds = GetDelay();
119 task_runner_->PostDelayedTask(
120 FROM_HERE,
121 base::Bind(&SimpleDelayBase::SendInternal,
122 weak_factory_.GetWeakPtr(),
123 base::Passed(&packet)),
124 base::TimeDelta::FromMicroseconds(static_cast<int64>(seconds * 1E6)));
126 protected:
127 virtual double GetDelay() = 0;
129 private:
130 virtual void SendInternal(scoped_ptr<transport::Packet> packet) {
131 pipe_->Send(packet.Pass());
134 base::WeakPtrFactory<SimpleDelayBase> weak_factory_;
137 class ConstantDelay : public SimpleDelayBase {
138 public:
139 ConstantDelay(double delay_seconds) : delay_seconds_(delay_seconds) {}
140 virtual double GetDelay() OVERRIDE {
141 return delay_seconds_;
144 private:
145 double delay_seconds_;
148 scoped_ptr<PacketPipe> NewConstantDelay(double delay_seconds) {
149 return scoped_ptr<PacketPipe>(new ConstantDelay(delay_seconds)).Pass();
152 class RandomUnsortedDelay : public SimpleDelayBase {
153 public:
154 RandomUnsortedDelay(double random_delay) : random_delay_(random_delay) {}
156 virtual double GetDelay() OVERRIDE {
157 return random_delay_ * base::RandDouble();
160 private:
161 double random_delay_;
164 scoped_ptr<PacketPipe> NewRandomUnsortedDelay(double random_delay) {
165 return scoped_ptr<PacketPipe>(new RandomUnsortedDelay(random_delay)).Pass();
169 class RandomSortedDelay : public PacketPipe {
170 public:
171 RandomSortedDelay(double random_delay,
172 double extra_delay,
173 double seconds_between_extra_delay)
174 : random_delay_(random_delay),
175 extra_delay_(extra_delay),
176 seconds_between_extra_delay_(seconds_between_extra_delay),
177 weak_factory_(this) {}
179 virtual void Send(scoped_ptr<transport::Packet> packet) OVERRIDE {
180 buffer_.push_back(linked_ptr<transport::Packet>(packet.release()));
181 if (buffer_.size() == 1) {
182 Schedule();
185 virtual void InitOnIOThread(
186 const scoped_refptr<base::SingleThreadTaskRunner>& task_runner) OVERRIDE {
187 PacketPipe::InitOnIOThread(task_runner);
188 // As we start the stream, assume that we are in a random
189 // place between two extra delays, thus multiplier = 1.0;
190 ScheduleExtraDelay(1.0);
193 private:
194 void ScheduleExtraDelay(double mult) {
195 double seconds = seconds_between_extra_delay_ * mult * base::RandDouble();
196 int64 microseconds = static_cast<int64>(seconds * 1E6);
197 task_runner_->PostDelayedTask(
198 FROM_HERE,
199 base::Bind(&RandomSortedDelay::CauseExtraDelay,
200 weak_factory_.GetWeakPtr()),
201 base::TimeDelta::FromMicroseconds(microseconds));
204 void CauseExtraDelay() {
205 block_until_ = base::TimeTicks::Now() +
206 base::TimeDelta::FromMicroseconds(
207 static_cast<int64>(extra_delay_ * 1E6));
208 // An extra delay just happened, wait up to seconds_between_extra_delay_*2
209 // before scheduling another one to make the average equal to
210 // seconds_between_extra_delay_.
211 ScheduleExtraDelay(2.0);
214 void Schedule() {
215 double seconds = base::RandDouble() * random_delay_;
216 base::TimeDelta block_time = block_until_ - base::TimeTicks::Now();
217 base::TimeDelta delay_time =
218 base::TimeDelta::FromMicroseconds(
219 static_cast<int64>(seconds * 1E6));
220 if (block_time > delay_time) {
221 block_time = delay_time;
224 task_runner_->PostDelayedTask(FROM_HERE,
225 base::Bind(&RandomSortedDelay::ProcessBuffer,
226 weak_factory_.GetWeakPtr()),
227 delay_time);
230 void ProcessBuffer() {
231 CHECK(!buffer_.empty());
232 scoped_ptr<transport::Packet> packet(buffer_.front().release());
233 pipe_->Send(packet.Pass());
234 buffer_.pop_front();
235 if (!buffer_.empty()) {
236 Schedule();
240 base::TimeTicks block_until_;
241 std::deque<linked_ptr<transport::Packet> > buffer_;
242 double random_delay_;
243 double extra_delay_;
244 double seconds_between_extra_delay_;
245 base::WeakPtrFactory<RandomSortedDelay> weak_factory_;
248 scoped_ptr<PacketPipe> NewRandomSortedDelay(
249 double random_delay,
250 double extra_delay,
251 double seconds_between_extra_delay) {
252 return scoped_ptr<PacketPipe>(
253 new RandomSortedDelay(
254 random_delay, extra_delay, seconds_between_extra_delay))
255 .Pass();
258 class NetworkGlitchPipe : public PacketPipe {
259 public:
260 NetworkGlitchPipe(double average_work_time, double average_outage_time)
261 : works_(false),
262 max_work_time_(average_work_time * 2),
263 max_outage_time_(average_outage_time * 2),
264 weak_factory_(this) {}
266 virtual void InitOnIOThread(
267 const scoped_refptr<base::SingleThreadTaskRunner>& task_runner) OVERRIDE {
268 PacketPipe::InitOnIOThread(task_runner);
269 Flip();
272 virtual void Send(scoped_ptr<transport::Packet> packet) OVERRIDE {
273 if (works_) {
274 pipe_->Send(packet.Pass());
278 private:
279 void Flip() {
280 works_ = !works_;
281 double seconds = base::RandDouble() *
282 (works_ ? max_work_time_ : max_outage_time_);
283 int64 microseconds = static_cast<int64>(seconds * 1E6);
284 task_runner_->PostDelayedTask(
285 FROM_HERE,
286 base::Bind(&NetworkGlitchPipe::Flip, weak_factory_.GetWeakPtr()),
287 base::TimeDelta::FromMicroseconds(microseconds));
290 bool works_;
291 double max_work_time_;
292 double max_outage_time_;
293 base::WeakPtrFactory<NetworkGlitchPipe> weak_factory_;
296 scoped_ptr<PacketPipe> NewNetworkGlitchPipe(double average_work_time,
297 double average_outage_time) {
298 return scoped_ptr<PacketPipe>(
299 new NetworkGlitchPipe(average_work_time, average_outage_time))
300 .Pass();
303 class PacketSender : public PacketPipe {
304 public:
305 PacketSender(net::UDPSocket* udp_socket,
306 const net::IPEndPoint* destination) :
307 blocked_(false),
308 udp_socket_(udp_socket),
309 destination_(destination),
310 weak_factory_(this) {
312 virtual void Send(scoped_ptr<transport::Packet> packet) OVERRIDE {
313 if (blocked_) {
314 LOG(ERROR) << "Cannot write packet right now: blocked";
315 return;
318 VLOG(1) << "Sending packet, len = " << packet->size();
319 // We ignore all problems, callbacks and errors.
320 // If it didn't work we just drop the packet at and call it a day.
321 scoped_refptr<net::IOBuffer> buf =
322 new net::WrappedIOBuffer(reinterpret_cast<char*>(&packet->front()));
323 size_t buf_size = packet->size();
324 int result;
325 if (destination_->address().empty()) {
326 VLOG(1) << "Destination has not been set yet.";
327 result = net::ERR_INVALID_ARGUMENT;
328 } else {
329 VLOG(1) << "Destination:" << destination_->ToString();
330 result = udp_socket_->SendTo(buf,
331 static_cast<int>(buf_size),
332 *destination_,
333 base::Bind(&PacketSender::AllowWrite,
334 weak_factory_.GetWeakPtr(),
335 buf,
336 base::Passed(&packet)));
338 if (result == net::ERR_IO_PENDING) {
339 blocked_ = true;
340 } else if (result < 0) {
341 LOG(ERROR) << "Failed to write packet.";
344 virtual void AppendToPipe(scoped_ptr<PacketPipe> pipe) OVERRIDE {
345 NOTREACHED();
348 private:
349 void AllowWrite(scoped_refptr<net::IOBuffer> buf,
350 scoped_ptr<transport::Packet> packet,
351 int unused_len) {
352 DCHECK(blocked_);
353 blocked_ = false;
355 bool blocked_;
356 net::UDPSocket* udp_socket_;
357 const net::IPEndPoint* destination_; // not owned
358 base::WeakPtrFactory<PacketSender> weak_factory_;
361 namespace {
362 void BuildPipe(scoped_ptr<PacketPipe>* pipe, PacketPipe* next) {
363 if (*pipe) {
364 (*pipe)->AppendToPipe(scoped_ptr<PacketPipe>(next).Pass());
365 } else {
366 pipe->reset(next);
369 } // namespace
371 scoped_ptr<PacketPipe> WifiNetwork() {
372 // This represents the buffer on the sender.
373 scoped_ptr<PacketPipe> pipe;
374 BuildPipe(&pipe, new Buffer(256 << 10, 5000000));
375 BuildPipe(&pipe, new RandomDrop(0.005));
376 // This represents the buffer on the router.
377 BuildPipe(&pipe, new ConstantDelay(1E-3));
378 BuildPipe(&pipe, new RandomSortedDelay(1E-3, 20E-3, 3));
379 BuildPipe(&pipe, new Buffer(256 << 10, 5000000));
380 BuildPipe(&pipe, new ConstantDelay(1E-3));
381 BuildPipe(&pipe, new RandomSortedDelay(1E-3, 20E-3, 3));
382 BuildPipe(&pipe, new RandomDrop(0.005));
383 // This represents the buffer on the receiving device.
384 BuildPipe(&pipe, new Buffer(256 << 10, 5000000));
385 return pipe.Pass();
388 scoped_ptr<PacketPipe> BadNetwork() {
389 scoped_ptr<PacketPipe> pipe;
390 // This represents the buffer on the sender.
391 BuildPipe(&pipe, new Buffer(64 << 10, 5000000)); // 64 kb buf, 5mbit/s
392 BuildPipe(&pipe, new RandomDrop(0.05)); // 5% packet drop
393 BuildPipe(&pipe, new RandomSortedDelay(2E-3, 20E-3, 1));
394 // This represents the buffer on the router.
395 BuildPipe(&pipe, new Buffer(64 << 10, 2000000)); // 64 kb buf, 2mbit/s
396 BuildPipe(&pipe, new ConstantDelay(1E-3));
397 // Random 40ms every other second
398 // BuildPipe(&pipe, new NetworkGlitchPipe(2, 40E-1));
399 BuildPipe(&pipe, new RandomUnsortedDelay(5E-3));
400 // This represents the buffer on the receiving device.
401 BuildPipe(&pipe, new Buffer(64 << 10, 4000000)); // 64 kb buf, 4mbit/s
402 return pipe.Pass();
406 scoped_ptr<PacketPipe> EvilNetwork() {
407 // This represents the buffer on the sender.
408 scoped_ptr<PacketPipe> pipe;
409 BuildPipe(&pipe, new Buffer(4 << 10, 2000000));
410 // This represents the buffer on the router.
411 BuildPipe(&pipe, new RandomDrop(0.1)); // 10% packet drop
412 BuildPipe(&pipe, new RandomSortedDelay(20E-3, 60E-3, 1));
413 BuildPipe(&pipe, new Buffer(4 << 10, 1000000)); // 4 kb buf, 1mbit/s
414 BuildPipe(&pipe, new RandomDrop(0.1)); // 10% packet drop
415 BuildPipe(&pipe, new ConstantDelay(1E-3));
416 BuildPipe(&pipe, new NetworkGlitchPipe(2.0, 0.3));
417 BuildPipe(&pipe, new RandomUnsortedDelay(20E-3));
418 // This represents the buffer on the receiving device.
419 BuildPipe(&pipe, new Buffer(4 << 10, 2000000)); // 4 kb buf, 2mbit/s
420 return pipe.Pass();
423 class UDPProxyImpl : public UDPProxy {
424 public:
425 UDPProxyImpl(const net::IPEndPoint& local_port,
426 const net::IPEndPoint& destination,
427 scoped_ptr<PacketPipe> to_dest_pipe,
428 scoped_ptr<PacketPipe> from_dest_pipe,
429 net::NetLog* net_log) :
430 local_port_(local_port),
431 destination_(destination),
432 proxy_thread_("media::cast::test::UdpProxy Thread"),
433 to_dest_pipe_(to_dest_pipe.Pass()),
434 from_dest_pipe_(from_dest_pipe.Pass()) {
435 proxy_thread_.StartWithOptions(
436 base::Thread::Options(base::MessageLoop::TYPE_IO, 0));
437 base::WaitableEvent start_event(false, false);
438 proxy_thread_.message_loop_proxy()->PostTask(
439 FROM_HERE,
440 base::Bind(&UDPProxyImpl::Start,
441 base::Unretained(this),
442 base::Unretained(&start_event),
443 net_log));
444 start_event.Wait();
447 virtual ~UDPProxyImpl() {
448 base::WaitableEvent stop_event(false, false);
449 proxy_thread_.message_loop_proxy()->PostTask(
450 FROM_HERE,
451 base::Bind(&UDPProxyImpl::Stop,
452 base::Unretained(this),
453 base::Unretained(&stop_event)));
454 stop_event.Wait();
455 proxy_thread_.Stop();
458 private:
459 void Start(base::WaitableEvent* start_event,
460 net::NetLog* net_log) {
461 socket_.reset(new net::UDPSocket(net::DatagramSocket::DEFAULT_BIND,
462 net::RandIntCallback(),
463 net_log,
464 net::NetLog::Source()));
465 BuildPipe(&to_dest_pipe_, new PacketSender(socket_.get(), &destination_));
466 BuildPipe(&from_dest_pipe_,
467 new PacketSender(socket_.get(), &return_address_));
468 to_dest_pipe_->InitOnIOThread(base::MessageLoopProxy::current());
469 from_dest_pipe_->InitOnIOThread(base::MessageLoopProxy::current());
471 VLOG(0) << "From:" << local_port_.ToString();
472 VLOG(0) << "To:" << destination_.ToString();
474 CHECK_GE(socket_->Bind(local_port_), 0);
476 start_event->Signal();
477 PollRead();
480 void Stop(base::WaitableEvent* stop_event) {
481 to_dest_pipe_.reset(NULL);
482 from_dest_pipe_.reset(NULL);
483 socket_.reset(NULL);
484 stop_event->Signal();
487 void ProcessPacket(scoped_refptr<net::IOBuffer> recv_buf, int len) {
488 DCHECK_NE(len, net::ERR_IO_PENDING);
489 VLOG(1) << "Got packet, len = " << len;
490 if (len < 0) {
491 LOG(WARNING) << "Socket read error: " << len;
492 return;
494 packet_->resize(len);
495 if (recv_address_ == destination_) {
496 from_dest_pipe_->Send(packet_.Pass());
497 } else {
498 VLOG(1) << "Return address = " << recv_address_.ToString();
499 return_address_ = recv_address_;
500 to_dest_pipe_->Send(packet_.Pass());
504 void ReadCallback(scoped_refptr<net::IOBuffer> recv_buf, int len) {
505 ProcessPacket(recv_buf, len);
506 PollRead();
509 void PollRead() {
510 while (true) {
511 packet_.reset(new transport::Packet(kMaxPacketSize));
512 scoped_refptr<net::IOBuffer> recv_buf =
513 new net::WrappedIOBuffer(reinterpret_cast<char*>(&packet_->front()));
514 int len = socket_->RecvFrom(
515 recv_buf,
516 kMaxPacketSize,
517 &recv_address_,
518 base::Bind(&UDPProxyImpl::ReadCallback,
519 base::Unretained(this),
520 recv_buf));
521 if (len == net::ERR_IO_PENDING)
522 break;
523 ProcessPacket(recv_buf, len);
528 net::IPEndPoint local_port_;
529 net::IPEndPoint destination_;
530 net::IPEndPoint recv_address_;
531 net::IPEndPoint return_address_;
532 base::Thread proxy_thread_;
533 scoped_ptr<net::UDPSocket> socket_;
534 scoped_ptr<PacketPipe> to_dest_pipe_;
535 scoped_ptr<PacketPipe> from_dest_pipe_;
536 scoped_ptr<transport::Packet> packet_;
539 scoped_ptr<UDPProxy> UDPProxy::Create(
540 const net::IPEndPoint& local_port,
541 const net::IPEndPoint& destination,
542 scoped_ptr<PacketPipe> to_dest_pipe,
543 scoped_ptr<PacketPipe> from_dest_pipe,
544 net::NetLog* net_log) {
545 scoped_ptr<UDPProxy> ret(new UDPProxyImpl(local_port,
546 destination,
547 to_dest_pipe.Pass(),
548 from_dest_pipe.Pass(),
549 net_log));
550 return ret.Pass();
553 } // namespace test
554 } // namespace cast
555 } // namespace media