1 // Copyright 2014 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
9 #include "media/cast/test/utility/udp_proxy.h"
11 #include "base/logging.h"
12 #include "base/rand_util.h"
13 #include "base/synchronization/waitable_event.h"
14 #include "base/thread_task_runner_handle.h"
15 #include "base/threading/thread.h"
16 #include "base/time/default_tick_clock.h"
17 #include "net/base/io_buffer.h"
18 #include "net/base/net_errors.h"
19 #include "net/udp/udp_server_socket.h"
25 const size_t kMaxPacketSize
= 65536;
27 PacketPipe::PacketPipe() {}
28 PacketPipe::~PacketPipe() {}
29 void PacketPipe::InitOnIOThread(
30 const scoped_refptr
<base::SingleThreadTaskRunner
>& task_runner
,
31 base::TickClock
* clock
) {
32 task_runner_
= task_runner
;
35 pipe_
->InitOnIOThread(task_runner
, clock
);
38 void PacketPipe::AppendToPipe(scoped_ptr
<PacketPipe
> pipe
) {
40 pipe_
->AppendToPipe(pipe
.Pass());
46 // Roughly emulates a buffer inside a device.
47 // If the buffer is full, packets are dropped.
48 // Packets are output at a maximum bandwidth.
49 class Buffer
: public PacketPipe
{
51 Buffer(size_t buffer_size
, double max_megabits_per_second
)
53 max_buffer_size_(buffer_size
),
54 max_megabits_per_second_(max_megabits_per_second
),
56 CHECK_GT(max_buffer_size_
, 0UL);
57 CHECK_GT(max_megabits_per_second
, 0);
60 void Send(scoped_ptr
<Packet
> packet
) final
{
61 if (packet
->size() + buffer_size_
<= max_buffer_size_
) {
62 buffer_size_
+= packet
->size();
63 buffer_
.push_back(linked_ptr
<Packet
>(packet
.release()));
64 if (buffer_
.size() == 1) {
72 last_schedule_
= clock_
->NowTicks();
73 double megabits
= buffer_
.front()->size() * 8 / 1000000.0;
74 double seconds
= megabits
/ max_megabits_per_second_
;
75 int64 microseconds
= static_cast<int64
>(seconds
* 1E6
);
76 task_runner_
->PostDelayedTask(
78 base::Bind(&Buffer::ProcessBuffer
, weak_factory_
.GetWeakPtr()),
79 base::TimeDelta::FromMicroseconds(microseconds
));
82 void ProcessBuffer() {
83 int64 bytes_to_send
= static_cast<int64
>(
84 (clock_
->NowTicks() - last_schedule_
).InSecondsF() *
85 max_megabits_per_second_
* 1E6
/ 8);
86 if (bytes_to_send
< static_cast<int64
>(buffer_
.front()->size())) {
87 bytes_to_send
= buffer_
.front()->size();
89 while (!buffer_
.empty() &&
90 static_cast<int64
>(buffer_
.front()->size()) <= bytes_to_send
) {
91 CHECK(!buffer_
.empty());
92 scoped_ptr
<Packet
> packet(buffer_
.front().release());
93 bytes_to_send
-= packet
->size();
94 buffer_size_
-= packet
->size();
96 pipe_
->Send(packet
.Pass());
98 if (!buffer_
.empty()) {
103 std::deque
<linked_ptr
<Packet
> > buffer_
;
104 base::TimeTicks last_schedule_
;
106 size_t max_buffer_size_
;
107 double max_megabits_per_second_
; // megabits per second
108 base::WeakPtrFactory
<Buffer
> weak_factory_
;
111 scoped_ptr
<PacketPipe
> NewBuffer(size_t buffer_size
, double bandwidth
) {
112 return scoped_ptr
<PacketPipe
>(new Buffer(buffer_size
, bandwidth
)).Pass();
115 class RandomDrop
: public PacketPipe
{
117 RandomDrop(double drop_fraction
)
118 : drop_fraction_(static_cast<int>(drop_fraction
* RAND_MAX
)) {}
120 void Send(scoped_ptr
<Packet
> packet
) final
{
121 if (rand() > drop_fraction_
) {
122 pipe_
->Send(packet
.Pass());
130 scoped_ptr
<PacketPipe
> NewRandomDrop(double drop_fraction
) {
131 return scoped_ptr
<PacketPipe
>(new RandomDrop(drop_fraction
)).Pass();
134 class SimpleDelayBase
: public PacketPipe
{
136 SimpleDelayBase() : weak_factory_(this) {}
137 ~SimpleDelayBase() override
{}
139 void Send(scoped_ptr
<Packet
> packet
) override
{
140 double seconds
= GetDelay();
141 task_runner_
->PostDelayedTask(
143 base::Bind(&SimpleDelayBase::SendInternal
,
144 weak_factory_
.GetWeakPtr(),
145 base::Passed(&packet
)),
146 base::TimeDelta::FromMicroseconds(static_cast<int64
>(seconds
* 1E6
)));
149 virtual double GetDelay() = 0;
152 virtual void SendInternal(scoped_ptr
<Packet
> packet
) {
153 pipe_
->Send(packet
.Pass());
156 base::WeakPtrFactory
<SimpleDelayBase
> weak_factory_
;
159 class ConstantDelay
: public SimpleDelayBase
{
161 ConstantDelay(double delay_seconds
) : delay_seconds_(delay_seconds
) {}
162 double GetDelay() final
{ return delay_seconds_
; }
165 double delay_seconds_
;
168 scoped_ptr
<PacketPipe
> NewConstantDelay(double delay_seconds
) {
169 return scoped_ptr
<PacketPipe
>(new ConstantDelay(delay_seconds
)).Pass();
172 class RandomUnsortedDelay
: public SimpleDelayBase
{
174 RandomUnsortedDelay(double random_delay
) : random_delay_(random_delay
) {}
176 double GetDelay() override
{ return random_delay_
* base::RandDouble(); }
179 double random_delay_
;
182 scoped_ptr
<PacketPipe
> NewRandomUnsortedDelay(double random_delay
) {
183 return scoped_ptr
<PacketPipe
>(new RandomUnsortedDelay(random_delay
)).Pass();
186 class DuplicateAndDelay
: public RandomUnsortedDelay
{
188 DuplicateAndDelay(double delay_min
,
189 double random_delay
) :
190 RandomUnsortedDelay(random_delay
),
191 delay_min_(delay_min
) {
193 void Send(scoped_ptr
<Packet
> packet
) final
{
194 pipe_
->Send(scoped_ptr
<Packet
>(new Packet(*packet
.get())));
195 RandomUnsortedDelay::Send(packet
.Pass());
197 double GetDelay() final
{
198 return RandomUnsortedDelay::GetDelay() + delay_min_
;
204 scoped_ptr
<PacketPipe
> NewDuplicateAndDelay(double delay_min
,
205 double random_delay
) {
206 return scoped_ptr
<PacketPipe
>(
207 new DuplicateAndDelay(delay_min
, random_delay
)).Pass();
210 class RandomSortedDelay
: public PacketPipe
{
212 RandomSortedDelay(double random_delay
,
214 double seconds_between_extra_delay
)
215 : random_delay_(random_delay
),
216 extra_delay_(extra_delay
),
217 seconds_between_extra_delay_(seconds_between_extra_delay
),
218 weak_factory_(this) {}
220 void Send(scoped_ptr
<Packet
> packet
) final
{
221 buffer_
.push_back(linked_ptr
<Packet
>(packet
.release()));
222 if (buffer_
.size() == 1) {
223 next_send_
= std::max(
225 base::TimeDelta::FromSecondsD(base::RandDouble() * random_delay_
),
231 const scoped_refptr
<base::SingleThreadTaskRunner
>& task_runner
,
232 base::TickClock
* clock
) final
{
233 PacketPipe::InitOnIOThread(task_runner
, clock
);
234 // As we start the stream, assume that we are in a random
235 // place between two extra delays, thus multiplier = 1.0;
236 ScheduleExtraDelay(1.0);
240 void ScheduleExtraDelay(double mult
) {
241 double seconds
= seconds_between_extra_delay_
* mult
* base::RandDouble();
242 int64 microseconds
= static_cast<int64
>(seconds
* 1E6
);
243 task_runner_
->PostDelayedTask(
245 base::Bind(&RandomSortedDelay::CauseExtraDelay
,
246 weak_factory_
.GetWeakPtr()),
247 base::TimeDelta::FromMicroseconds(microseconds
));
250 void CauseExtraDelay() {
251 next_send_
= std::max
<base::TimeTicks
>(
253 base::TimeDelta::FromMicroseconds(
254 static_cast<int64
>(extra_delay_
* 1E6
)),
256 // An extra delay just happened, wait up to seconds_between_extra_delay_*2
257 // before scheduling another one to make the average equal to
258 // seconds_between_extra_delay_.
259 ScheduleExtraDelay(2.0);
262 void ProcessBuffer() {
263 base::TimeTicks now
= clock_
->NowTicks();
264 while (!buffer_
.empty() && next_send_
<= now
) {
265 scoped_ptr
<Packet
> packet(buffer_
.front().release());
266 pipe_
->Send(packet
.Pass());
269 next_send_
+= base::TimeDelta::FromSecondsD(
270 base::RandDouble() * random_delay_
);
273 if (!buffer_
.empty()) {
274 task_runner_
->PostDelayedTask(
276 base::Bind(&RandomSortedDelay::ProcessBuffer
,
277 weak_factory_
.GetWeakPtr()),
282 base::TimeTicks block_until_
;
283 std::deque
<linked_ptr
<Packet
> > buffer_
;
284 double random_delay_
;
286 double seconds_between_extra_delay_
;
287 base::TimeTicks next_send_
;
288 base::WeakPtrFactory
<RandomSortedDelay
> weak_factory_
;
291 scoped_ptr
<PacketPipe
> NewRandomSortedDelay(
294 double seconds_between_extra_delay
) {
295 return scoped_ptr
<PacketPipe
>(
296 new RandomSortedDelay(
297 random_delay
, extra_delay
, seconds_between_extra_delay
))
301 class NetworkGlitchPipe
: public PacketPipe
{
303 NetworkGlitchPipe(double average_work_time
, double average_outage_time
)
305 max_work_time_(average_work_time
* 2),
306 max_outage_time_(average_outage_time
* 2),
307 weak_factory_(this) {}
310 const scoped_refptr
<base::SingleThreadTaskRunner
>& task_runner
,
311 base::TickClock
* clock
) final
{
312 PacketPipe::InitOnIOThread(task_runner
, clock
);
316 void Send(scoped_ptr
<Packet
> packet
) final
{
318 pipe_
->Send(packet
.Pass());
325 double seconds
= base::RandDouble() *
326 (works_
? max_work_time_
: max_outage_time_
);
327 int64 microseconds
= static_cast<int64
>(seconds
* 1E6
);
328 task_runner_
->PostDelayedTask(
330 base::Bind(&NetworkGlitchPipe::Flip
, weak_factory_
.GetWeakPtr()),
331 base::TimeDelta::FromMicroseconds(microseconds
));
335 double max_work_time_
;
336 double max_outage_time_
;
337 base::WeakPtrFactory
<NetworkGlitchPipe
> weak_factory_
;
340 scoped_ptr
<PacketPipe
> NewNetworkGlitchPipe(double average_work_time
,
341 double average_outage_time
) {
342 return scoped_ptr
<PacketPipe
>(
343 new NetworkGlitchPipe(average_work_time
, average_outage_time
))
348 // Internal buffer object for a client of the IPP model.
349 class InterruptedPoissonProcess::InternalBuffer
: public PacketPipe
{
351 InternalBuffer(base::WeakPtr
<InterruptedPoissonProcess
> ipp
,
357 weak_factory_(this) {
360 void Send(scoped_ptr
<Packet
> packet
) final
{
361 // Drop if buffer is full.
362 if (stored_size_
>= stored_limit_
)
364 stored_size_
+= packet
->size();
365 buffer_
.push_back(linked_ptr
<Packet
>(packet
.release()));
366 buffer_time_
.push_back(clock_
->NowTicks());
367 DCHECK(buffer_
.size() == buffer_time_
.size());
371 const scoped_refptr
<base::SingleThreadTaskRunner
>& task_runner
,
372 base::TickClock
* clock
) final
{
375 ipp_
->InitOnIOThread(task_runner
, clock
);
376 PacketPipe::InitOnIOThread(task_runner
, clock
);
379 void SendOnePacket() {
380 scoped_ptr
<Packet
> packet(buffer_
.front().release());
381 stored_size_
-= packet
->size();
383 buffer_time_
.pop_front();
384 pipe_
->Send(packet
.Pass());
385 DCHECK(buffer_
.size() == buffer_time_
.size());
389 return buffer_
.empty();
392 base::TimeTicks
FirstPacketTime() const {
393 DCHECK(!buffer_time_
.empty());
394 return buffer_time_
.front();
397 base::WeakPtr
<InternalBuffer
> GetWeakPtr() {
398 return weak_factory_
.GetWeakPtr();
403 const base::WeakPtr
<InterruptedPoissonProcess
> ipp_
;
405 const size_t stored_limit_
;
406 std::deque
<linked_ptr
<Packet
> > buffer_
;
407 std::deque
<base::TimeTicks
> buffer_time_
;
408 base::TickClock
* clock_
;
409 base::WeakPtrFactory
<InternalBuffer
> weak_factory_
;
411 DISALLOW_COPY_AND_ASSIGN(InternalBuffer
);
414 InterruptedPoissonProcess::InterruptedPoissonProcess(
415 const std::vector
<double>& average_rates
,
416 double coef_burstiness
,
417 double coef_variance
,
420 average_rates_(average_rates
),
421 coef_burstiness_(coef_burstiness
),
422 coef_variance_(coef_variance
),
425 weak_factory_(this) {
426 mt_rand_
.init_genrand(rand_seed
);
427 DCHECK(!average_rates
.empty());
431 InterruptedPoissonProcess::~InterruptedPoissonProcess() {
434 void InterruptedPoissonProcess::InitOnIOThread(
435 const scoped_refptr
<base::SingleThreadTaskRunner
>& task_runner
,
436 base::TickClock
* clock
) {
437 // Already initialized and started.
438 if (task_runner_
.get() && clock_
)
440 task_runner_
= task_runner
;
447 scoped_ptr
<PacketPipe
> InterruptedPoissonProcess::NewBuffer(size_t size
) {
448 scoped_ptr
<InternalBuffer
> buffer(
449 new InternalBuffer(weak_factory_
.GetWeakPtr(), size
));
450 send_buffers_
.push_back(buffer
->GetWeakPtr());
451 return buffer
.Pass();
454 base::TimeDelta
InterruptedPoissonProcess::NextEvent(double rate
) {
455 // Rate is per milliseconds.
456 // The time until next event is exponentially distributed to the
457 // inverse of |rate|.
458 return base::TimeDelta::FromMillisecondsD(
459 fabs(-log(1.0 - RandDouble()) / rate
));
462 double InterruptedPoissonProcess::RandDouble() {
463 // Generate a 64-bits random number from MT19937 and then convert
465 uint64 rand
= mt_rand_
.genrand_int32();
467 rand
|= mt_rand_
.genrand_int32();
468 return base::BitsToOpenEndedUnitInterval(rand
);
471 void InterruptedPoissonProcess::ComputeRates() {
472 double avg_rate
= average_rates_
[rate_index_
];
474 send_rate_
= avg_rate
/ coef_burstiness_
;
476 2 * avg_rate
* (1 - coef_burstiness_
) * (1 - coef_burstiness_
) /
477 coef_burstiness_
/ (coef_variance_
- 1);
479 2 * avg_rate
* (1 - coef_burstiness_
) / (coef_variance_
- 1);
482 void InterruptedPoissonProcess::UpdateRates() {
485 // Rates are updated once per second.
486 rate_index_
= (rate_index_
+ 1) % average_rates_
.size();
487 task_runner_
->PostDelayedTask(
489 base::Bind(&InterruptedPoissonProcess::UpdateRates
,
490 weak_factory_
.GetWeakPtr()),
491 base::TimeDelta::FromSeconds(1));
494 void InterruptedPoissonProcess::SwitchOff() {
496 task_runner_
->PostDelayedTask(
498 base::Bind(&InterruptedPoissonProcess::SwitchOn
,
499 weak_factory_
.GetWeakPtr()),
500 NextEvent(switch_on_rate_
));
503 void InterruptedPoissonProcess::SwitchOn() {
505 task_runner_
->PostDelayedTask(
507 base::Bind(&InterruptedPoissonProcess::SwitchOff
,
508 weak_factory_
.GetWeakPtr()),
509 NextEvent(switch_off_rate_
));
512 void InterruptedPoissonProcess::SendPacket() {
513 task_runner_
->PostDelayedTask(
515 base::Bind(&InterruptedPoissonProcess::SendPacket
,
516 weak_factory_
.GetWeakPtr()),
517 NextEvent(send_rate_
));
519 // If OFF then don't send.
523 // Find the earliest packet to send.
524 base::TimeTicks earliest_time
;
525 for (size_t i
= 0; i
< send_buffers_
.size(); ++i
) {
526 if (!send_buffers_
[i
])
528 if (send_buffers_
[i
]->Empty())
530 if (earliest_time
.is_null() ||
531 send_buffers_
[i
]->FirstPacketTime() < earliest_time
)
532 earliest_time
= send_buffers_
[i
]->FirstPacketTime();
534 for (size_t i
= 0; i
< send_buffers_
.size(); ++i
) {
535 if (!send_buffers_
[i
])
537 if (send_buffers_
[i
]->Empty())
539 if (send_buffers_
[i
]->FirstPacketTime() != earliest_time
)
541 send_buffers_
[i
]->SendOnePacket();
548 class PacketSender
: public PacketPipe
{
550 PacketSender(UDPProxyImpl
* udp_proxy
, const net::IPEndPoint
* destination
)
551 : udp_proxy_(udp_proxy
), destination_(destination
) {}
552 void Send(scoped_ptr
<Packet
> packet
) final
;
553 void AppendToPipe(scoped_ptr
<PacketPipe
> pipe
) final
{ NOTREACHED(); }
556 UDPProxyImpl
* udp_proxy_
;
557 const net::IPEndPoint
* destination_
; // not owned
561 void BuildPipe(scoped_ptr
<PacketPipe
>* pipe
, PacketPipe
* next
) {
563 (*pipe
)->AppendToPipe(scoped_ptr
<PacketPipe
>(next
).Pass());
570 scoped_ptr
<PacketPipe
> GoodNetwork() {
571 // This represents the buffer on the sender.
572 scoped_ptr
<PacketPipe
> pipe
;
573 BuildPipe(&pipe
, new Buffer(2 << 20, 50));
574 BuildPipe(&pipe
, new ConstantDelay(1E-3));
575 BuildPipe(&pipe
, new RandomSortedDelay(1E-3, 2E-3, 3));
576 // This represents the buffer on the receiving device.
577 BuildPipe(&pipe
, new Buffer(2 << 20, 50));
581 scoped_ptr
<PacketPipe
> WifiNetwork() {
582 // This represents the buffer on the sender.
583 scoped_ptr
<PacketPipe
> pipe
;
584 BuildPipe(&pipe
, new Buffer(256 << 10, 20));
585 BuildPipe(&pipe
, new RandomDrop(0.005));
586 // This represents the buffer on the router.
587 BuildPipe(&pipe
, new ConstantDelay(1E-3));
588 BuildPipe(&pipe
, new RandomSortedDelay(1E-3, 20E-3, 3));
589 BuildPipe(&pipe
, new Buffer(256 << 10, 20));
590 BuildPipe(&pipe
, new ConstantDelay(1E-3));
591 BuildPipe(&pipe
, new RandomSortedDelay(1E-3, 20E-3, 3));
592 BuildPipe(&pipe
, new RandomDrop(0.005));
593 // This represents the buffer on the receiving device.
594 BuildPipe(&pipe
, new Buffer(256 << 10, 20));
598 scoped_ptr
<PacketPipe
> BadNetwork() {
599 scoped_ptr
<PacketPipe
> pipe
;
600 // This represents the buffer on the sender.
601 BuildPipe(&pipe
, new Buffer(64 << 10, 5)); // 64 kb buf, 5mbit/s
602 BuildPipe(&pipe
, new RandomDrop(0.05)); // 5% packet drop
603 BuildPipe(&pipe
, new RandomSortedDelay(2E-3, 20E-3, 1));
604 // This represents the buffer on the router.
605 BuildPipe(&pipe
, new Buffer(64 << 10, 5)); // 64 kb buf, 4mbit/s
606 BuildPipe(&pipe
, new ConstantDelay(1E-3));
607 // Random 40ms every other second
608 // BuildPipe(&pipe, new NetworkGlitchPipe(2, 40E-1));
609 BuildPipe(&pipe
, new RandomUnsortedDelay(5E-3));
610 // This represents the buffer on the receiving device.
611 BuildPipe(&pipe
, new Buffer(64 << 10, 5)); // 64 kb buf, 5mbit/s
616 scoped_ptr
<PacketPipe
> EvilNetwork() {
617 // This represents the buffer on the sender.
618 scoped_ptr
<PacketPipe
> pipe
;
619 BuildPipe(&pipe
, new Buffer(4 << 10, 5)); // 4 kb buf, 2mbit/s
620 // This represents the buffer on the router.
621 BuildPipe(&pipe
, new RandomDrop(0.1)); // 10% packet drop
622 BuildPipe(&pipe
, new RandomSortedDelay(20E-3, 60E-3, 1));
623 BuildPipe(&pipe
, new Buffer(4 << 10, 2)); // 4 kb buf, 2mbit/s
624 BuildPipe(&pipe
, new RandomDrop(0.1)); // 10% packet drop
625 BuildPipe(&pipe
, new ConstantDelay(1E-3));
626 BuildPipe(&pipe
, new NetworkGlitchPipe(2.0, 0.3));
627 BuildPipe(&pipe
, new RandomUnsortedDelay(20E-3));
628 // This represents the buffer on the receiving device.
629 BuildPipe(&pipe
, new Buffer(4 << 10, 2)); // 4 kb buf, 2mbit/s
633 scoped_ptr
<InterruptedPoissonProcess
> DefaultInterruptedPoissonProcess() {
634 // The following values are taken from a session reported from a user.
635 // They are experimentally tested to demonstrate challenging network
636 // conditions. The average bitrate is about 2mbits/s.
638 // Each element in this vector is the average number of packets sent
639 // per millisecond. The average changes and rotates every second.
640 std::vector
<double> average_rates
;
641 average_rates
.push_back(0.609);
642 average_rates
.push_back(0.495);
643 average_rates
.push_back(0.561);
644 average_rates
.push_back(0.458);
645 average_rates
.push_back(0.538);
646 average_rates
.push_back(0.513);
647 average_rates
.push_back(0.585);
648 average_rates
.push_back(0.592);
649 average_rates
.push_back(0.658);
650 average_rates
.push_back(0.556);
651 average_rates
.push_back(0.371);
652 average_rates
.push_back(0.595);
653 average_rates
.push_back(0.490);
654 average_rates
.push_back(0.980);
655 average_rates
.push_back(0.781);
656 average_rates
.push_back(0.463);
658 const double burstiness
= 0.609;
659 const double variance
= 4.1;
661 scoped_ptr
<InterruptedPoissonProcess
> ipp(
662 new InterruptedPoissonProcess(
663 average_rates
, burstiness
, variance
, 0));
667 class UDPProxyImpl
: public UDPProxy
{
669 UDPProxyImpl(const net::IPEndPoint
& local_port
,
670 const net::IPEndPoint
& destination
,
671 scoped_ptr
<PacketPipe
> to_dest_pipe
,
672 scoped_ptr
<PacketPipe
> from_dest_pipe
,
673 net::NetLog
* net_log
)
674 : local_port_(local_port
),
675 destination_(destination
),
676 destination_is_mutable_(destination
.address().empty()),
677 proxy_thread_("media::cast::test::UdpProxy Thread"),
678 to_dest_pipe_(to_dest_pipe
.Pass()),
679 from_dest_pipe_(from_dest_pipe
.Pass()),
681 weak_factory_(this) {
682 proxy_thread_
.StartWithOptions(
683 base::Thread::Options(base::MessageLoop::TYPE_IO
, 0));
684 base::WaitableEvent
start_event(false, false);
685 proxy_thread_
.task_runner()->PostTask(
687 base::Bind(&UDPProxyImpl::Start
,
688 base::Unretained(this),
689 base::Unretained(&start_event
),
694 ~UDPProxyImpl() final
{
695 base::WaitableEvent
stop_event(false, false);
696 proxy_thread_
.task_runner()->PostTask(
698 base::Bind(&UDPProxyImpl::Stop
,
699 base::Unretained(this),
700 base::Unretained(&stop_event
)));
702 proxy_thread_
.Stop();
705 void Send(scoped_ptr
<Packet
> packet
,
706 const net::IPEndPoint
& destination
) {
708 LOG(ERROR
) << "Cannot write packet right now: blocked";
712 VLOG(1) << "Sending packet, len = " << packet
->size();
713 // We ignore all problems, callbacks and errors.
714 // If it didn't work we just drop the packet at and call it a day.
715 scoped_refptr
<net::IOBuffer
> buf
=
716 new net::WrappedIOBuffer(reinterpret_cast<char*>(&packet
->front()));
717 size_t buf_size
= packet
->size();
719 if (destination
.address().empty()) {
720 VLOG(1) << "Destination has not been set yet.";
721 result
= net::ERR_INVALID_ARGUMENT
;
723 VLOG(1) << "Destination:" << destination
.ToString();
724 result
= socket_
->SendTo(buf
.get(),
725 static_cast<int>(buf_size
),
727 base::Bind(&UDPProxyImpl::AllowWrite
,
728 weak_factory_
.GetWeakPtr(),
730 base::Passed(&packet
)));
732 if (result
== net::ERR_IO_PENDING
) {
734 } else if (result
< 0) {
735 LOG(ERROR
) << "Failed to write packet.";
740 void Start(base::WaitableEvent
* start_event
,
741 net::NetLog
* net_log
) {
742 socket_
.reset(new net::UDPServerSocket(net_log
, net::NetLog::Source()));
743 BuildPipe(&to_dest_pipe_
, new PacketSender(this, &destination_
));
744 BuildPipe(&from_dest_pipe_
, new PacketSender(this, &return_address_
));
745 to_dest_pipe_
->InitOnIOThread(base::ThreadTaskRunnerHandle::Get(),
747 from_dest_pipe_
->InitOnIOThread(base::ThreadTaskRunnerHandle::Get(),
750 VLOG(0) << "From:" << local_port_
.ToString();
751 if (!destination_is_mutable_
)
752 VLOG(0) << "To:" << destination_
.ToString();
754 CHECK_GE(socket_
->Listen(local_port_
), 0);
756 start_event
->Signal();
760 void Stop(base::WaitableEvent
* stop_event
) {
761 to_dest_pipe_
.reset(NULL
);
762 from_dest_pipe_
.reset(NULL
);
764 stop_event
->Signal();
767 void ProcessPacket(scoped_refptr
<net::IOBuffer
> recv_buf
, int len
) {
768 DCHECK_NE(len
, net::ERR_IO_PENDING
);
769 VLOG(1) << "Got packet, len = " << len
;
771 LOG(WARNING
) << "Socket read error: " << len
;
774 packet_
->resize(len
);
775 if (destination_is_mutable_
&& set_destination_next_
&&
776 !(recv_address_
== return_address_
) &&
777 !(recv_address_
== destination_
)) {
778 destination_
= recv_address_
;
780 if (recv_address_
== destination_
) {
781 set_destination_next_
= false;
782 from_dest_pipe_
->Send(packet_
.Pass());
784 set_destination_next_
= true;
785 VLOG(1) << "Return address = " << recv_address_
.ToString();
786 return_address_
= recv_address_
;
787 to_dest_pipe_
->Send(packet_
.Pass());
791 void ReadCallback(scoped_refptr
<net::IOBuffer
> recv_buf
, int len
) {
792 ProcessPacket(recv_buf
, len
);
798 packet_
.reset(new Packet(kMaxPacketSize
));
799 scoped_refptr
<net::IOBuffer
> recv_buf
=
800 new net::WrappedIOBuffer(reinterpret_cast<char*>(&packet_
->front()));
801 int len
= socket_
->RecvFrom(
806 &UDPProxyImpl::ReadCallback
, base::Unretained(this), recv_buf
));
807 if (len
== net::ERR_IO_PENDING
)
809 ProcessPacket(recv_buf
, len
);
813 void AllowWrite(scoped_refptr
<net::IOBuffer
> buf
,
814 scoped_ptr
<Packet
> packet
,
821 net::IPEndPoint local_port_
;
823 net::IPEndPoint destination_
;
824 bool destination_is_mutable_
;
826 net::IPEndPoint return_address_
;
827 bool set_destination_next_
;
829 base::DefaultTickClock tick_clock_
;
830 base::Thread proxy_thread_
;
831 scoped_ptr
<net::UDPServerSocket
> socket_
;
832 scoped_ptr
<PacketPipe
> to_dest_pipe_
;
833 scoped_ptr
<PacketPipe
> from_dest_pipe_
;
836 net::IPEndPoint recv_address_
;
837 scoped_ptr
<Packet
> packet_
;
842 base::WeakPtrFactory
<UDPProxyImpl
> weak_factory_
;
845 void PacketSender::Send(scoped_ptr
<Packet
> packet
) {
846 udp_proxy_
->Send(packet
.Pass(), *destination_
);
849 scoped_ptr
<UDPProxy
> UDPProxy::Create(
850 const net::IPEndPoint
& local_port
,
851 const net::IPEndPoint
& destination
,
852 scoped_ptr
<PacketPipe
> to_dest_pipe
,
853 scoped_ptr
<PacketPipe
> from_dest_pipe
,
854 net::NetLog
* net_log
) {
855 scoped_ptr
<UDPProxy
> ret(new UDPProxyImpl(local_port
,
858 from_dest_pipe
.Pass(),