1 // Copyright 2014 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
5 #include "media/cast/net/pacing/paced_sender.h"
7 #include "base/big_endian.h"
9 #include "base/debug/dump_without_crashing.h"
10 #include "base/message_loop/message_loop.h"
11 #include "media/cast/logging/logging_impl.h"
18 static const int64 kPacingIntervalMs
= 10;
19 // Each frame will be split into no more than kPacingMaxBurstsPerFrame
21 static const size_t kPacingMaxBurstsPerFrame
= 3;
22 static const size_t kMaxDedupeWindowMs
= 500;
24 // "Impossible" upper-bound on the maximum number of packets that should ever be
25 // enqueued in the pacer. This is used to detect bugs, reported as crash dumps.
26 static const size_t kHugeQueueLengthSeconds
= 10;
27 static const size_t kRidiculousNumberOfPackets
=
28 kHugeQueueLengthSeconds
* (kMaxBurstSize
* 1000 / kPacingIntervalMs
);
32 DedupInfo::DedupInfo() : last_byte_acked_for_audio(0) {}
35 PacketKey
PacedPacketSender::MakePacketKey(const base::TimeTicks
& ticks
,
38 return std::make_pair(ticks
, std::make_pair(ssrc
, packet_id
));
41 PacedSender::PacketSendRecord::PacketSendRecord()
42 : last_byte_sent(0), last_byte_sent_for_audio(0) {}
44 PacedSender::PacedSender(
45 size_t target_burst_size
,
46 size_t max_burst_size
,
47 base::TickClock
* clock
,
49 PacketSender
* transport
,
50 const scoped_refptr
<base::SingleThreadTaskRunner
>& transport_task_runner
)
53 transport_(transport
),
54 transport_task_runner_(transport_task_runner
),
57 target_burst_size_(target_burst_size
),
58 max_burst_size_(max_burst_size
),
59 current_max_burst_size_(target_burst_size_
),
60 next_max_burst_size_(target_burst_size_
),
61 next_next_max_burst_size_(target_burst_size_
),
62 current_burst_size_(0),
63 state_(State_Unblocked
),
64 has_reached_upper_bound_once_(false),
68 PacedSender::~PacedSender() {}
70 void PacedSender::RegisterAudioSsrc(uint32 audio_ssrc
) {
71 audio_ssrc_
= audio_ssrc
;
74 void PacedSender::RegisterVideoSsrc(uint32 video_ssrc
) {
75 video_ssrc_
= video_ssrc
;
78 void PacedSender::RegisterPrioritySsrc(uint32 ssrc
) {
79 priority_ssrcs_
.push_back(ssrc
);
82 int64
PacedSender::GetLastByteSentForPacket(const PacketKey
& packet_key
) {
83 PacketSendHistory::const_iterator it
= send_history_
.find(packet_key
);
84 if (it
== send_history_
.end())
86 return it
->second
.last_byte_sent
;
89 int64
PacedSender::GetLastByteSentForSsrc(uint32 ssrc
) {
90 std::map
<uint32
, int64
>::const_iterator it
= last_byte_sent_
.find(ssrc
);
91 if (it
== last_byte_sent_
.end())
96 bool PacedSender::SendPackets(const SendPacketVector
& packets
) {
97 if (packets
.empty()) {
100 const bool high_priority
= IsHighPriority(packets
.begin()->first
);
101 for (size_t i
= 0; i
< packets
.size(); i
++) {
102 DCHECK(IsHighPriority(packets
[i
].first
) == high_priority
);
104 priority_packet_list_
[packets
[i
].first
] =
105 make_pair(PacketType_Normal
, packets
[i
].second
);
107 packet_list_
[packets
[i
].first
] =
108 make_pair(PacketType_Normal
, packets
[i
].second
);
111 if (state_
== State_Unblocked
) {
117 bool PacedSender::ShouldResend(const PacketKey
& packet_key
,
118 const DedupInfo
& dedup_info
,
119 const base::TimeTicks
& now
) {
120 PacketSendHistory::const_iterator it
= send_history_
.find(packet_key
);
122 // No history of previous transmission. It might be sent too long ago.
123 if (it
== send_history_
.end())
126 // Suppose there is request to retransmit X and there is an audio
127 // packet Y sent just before X. Reject retransmission of X if ACK for
128 // Y has not been received.
129 // Only do this for video packets.
130 if (packet_key
.second
.first
== video_ssrc_
) {
131 if (dedup_info
.last_byte_acked_for_audio
&&
132 it
->second
.last_byte_sent_for_audio
&&
133 dedup_info
.last_byte_acked_for_audio
<
134 it
->second
.last_byte_sent_for_audio
) {
138 // Retransmission interval has to be greater than |resend_interval|.
139 if (now
- it
->second
.time
< dedup_info
.resend_interval
)
144 bool PacedSender::ResendPackets(const SendPacketVector
& packets
,
145 const DedupInfo
& dedup_info
) {
146 if (packets
.empty()) {
149 const bool high_priority
= IsHighPriority(packets
.begin()->first
);
150 const base::TimeTicks now
= clock_
->NowTicks();
151 for (size_t i
= 0; i
< packets
.size(); i
++) {
152 if (!ShouldResend(packets
[i
].first
, dedup_info
, now
)) {
153 LogPacketEvent(packets
[i
].second
->data
, PACKET_RTX_REJECTED
);
157 DCHECK(IsHighPriority(packets
[i
].first
) == high_priority
);
159 priority_packet_list_
[packets
[i
].first
] =
160 make_pair(PacketType_Resend
, packets
[i
].second
);
162 packet_list_
[packets
[i
].first
] =
163 make_pair(PacketType_Resend
, packets
[i
].second
);
166 if (state_
== State_Unblocked
) {
172 bool PacedSender::SendRtcpPacket(uint32 ssrc
, PacketRef packet
) {
173 if (state_
== State_TransportBlocked
) {
174 priority_packet_list_
[
175 PacedPacketSender::MakePacketKey(base::TimeTicks(), ssrc
, 0)] =
176 make_pair(PacketType_RTCP
, packet
);
178 // We pass the RTCP packets straight through.
179 if (!transport_
->SendPacket(
181 base::Bind(&PacedSender::SendStoredPackets
,
182 weak_factory_
.GetWeakPtr()))) {
183 state_
= State_TransportBlocked
;
189 void PacedSender::CancelSendingPacket(const PacketKey
& packet_key
) {
190 packet_list_
.erase(packet_key
);
191 priority_packet_list_
.erase(packet_key
);
194 PacketRef
PacedSender::PopNextPacket(PacketType
* packet_type
,
195 PacketKey
* packet_key
) {
196 PacketList
* list
= !priority_packet_list_
.empty() ?
197 &priority_packet_list_
: &packet_list_
;
198 DCHECK(!list
->empty());
199 PacketList::iterator i
= list
->begin();
200 *packet_type
= i
->second
.first
;
201 *packet_key
= i
->first
;
202 PacketRef ret
= i
->second
.second
;
207 bool PacedSender::IsHighPriority(const PacketKey
& packet_key
) const {
208 return std::find(priority_ssrcs_
.begin(), priority_ssrcs_
.end(),
209 packet_key
.second
.first
) != priority_ssrcs_
.end();
212 bool PacedSender::empty() const {
213 return packet_list_
.empty() && priority_packet_list_
.empty();
216 size_t PacedSender::size() const {
217 return packet_list_
.size() + priority_packet_list_
.size();
220 // This function can be called from three places:
221 // 1. User called one of the Send* functions and we were in an unblocked state.
222 // 2. state_ == State_TransportBlocked and the transport is calling us to
223 // let us know that it's ok to send again.
224 // 3. state_ == State_BurstFull and there are still packets to send. In this
225 // case we called PostDelayedTask on this function to start a new burst.
226 void PacedSender::SendStoredPackets() {
227 State previous_state
= state_
;
228 state_
= State_Unblocked
;
233 // If the queue ever becomes impossibly long, send a crash dump without
234 // actually crashing the process.
235 if (size() > kRidiculousNumberOfPackets
&& !has_reached_upper_bound_once_
) {
237 // Please use Cr=Internals-Cast label in bug reports:
238 base::debug::DumpWithoutCrashing();
239 has_reached_upper_bound_once_
= true;
242 base::TimeTicks now
= clock_
->NowTicks();
243 // I don't actually trust that PostDelayTask(x - now) will mean that
244 // now >= x when the call happens, so check if the previous state was
245 // State_BurstFull too.
246 if (now
>= burst_end_
|| previous_state
== State_BurstFull
) {
247 // Start a new burst.
248 current_burst_size_
= 0;
249 burst_end_
= now
+ base::TimeDelta::FromMilliseconds(kPacingIntervalMs
);
251 // The goal here is to try to send out the queued packets over the next
252 // three bursts, while trying to keep the burst size below 10 if possible.
253 // We have some evidence that sending more than 12 packets in a row doesn't
254 // work very well, but we don't actually know why yet. Sending out packets
255 // sooner is better than sending out packets later as that gives us more
256 // time to re-send them if needed. So if we have less than 30 packets, just
257 // send 10 at a time. If we have less than 60 packets, send n / 3 at a time.
258 // if we have more than 60, we send 20 at a time. 20 packets is ~24Mbit/s
259 // which is more bandwidth than the cast library should need, and sending
260 // out more data per second is unlikely to be helpful.
261 size_t max_burst_size
= std::min(
263 std::max(target_burst_size_
, size() / kPacingMaxBurstsPerFrame
));
264 current_max_burst_size_
= std::max(next_max_burst_size_
, max_burst_size
);
265 next_max_burst_size_
= std::max(next_next_max_burst_size_
, max_burst_size
);
266 next_next_max_burst_size_
= max_burst_size
;
269 base::Closure cb
= base::Bind(&PacedSender::SendStoredPackets
,
270 weak_factory_
.GetWeakPtr());
272 if (current_burst_size_
>= current_max_burst_size_
) {
273 transport_task_runner_
->PostDelayedTask(FROM_HERE
,
276 state_
= State_BurstFull
;
279 PacketType packet_type
;
280 PacketKey packet_key
;
281 PacketRef packet
= PopNextPacket(&packet_type
, &packet_key
);
282 PacketSendRecord send_record
;
283 send_record
.time
= now
;
285 switch (packet_type
) {
286 case PacketType_Resend
:
287 LogPacketEvent(packet
->data
, PACKET_RETRANSMITTED
);
289 case PacketType_Normal
:
290 LogPacketEvent(packet
->data
, PACKET_SENT_TO_NETWORK
);
292 case PacketType_RTCP
:
296 const bool socket_blocked
= !transport_
->SendPacket(packet
, cb
);
298 // Save the send record.
299 send_record
.last_byte_sent
= transport_
->GetBytesSent();
300 send_record
.last_byte_sent_for_audio
= GetLastByteSentForSsrc(audio_ssrc_
);
301 send_history_
[packet_key
] = send_record
;
302 send_history_buffer_
[packet_key
] = send_record
;
303 last_byte_sent_
[packet_key
.second
.first
] = send_record
.last_byte_sent
;
305 if (socket_blocked
) {
306 state_
= State_TransportBlocked
;
309 current_burst_size_
++;
312 // Keep ~0.5 seconds of data (1000 packets).
313 if (send_history_buffer_
.size() >=
314 max_burst_size_
* kMaxDedupeWindowMs
/ kPacingIntervalMs
) {
315 send_history_
.swap(send_history_buffer_
);
316 send_history_buffer_
.clear();
318 DCHECK_LE(send_history_buffer_
.size(),
319 max_burst_size_
* kMaxDedupeWindowMs
/ kPacingIntervalMs
);
320 state_
= State_Unblocked
;
323 void PacedSender::LogPacketEvent(const Packet
& packet
, CastLoggingEvent event
) {
324 // Get SSRC from packet and compare with the audio_ssrc / video_ssrc to see
325 // if the packet is audio or video.
326 DCHECK_GE(packet
.size(), 12u);
327 base::BigEndianReader
reader(reinterpret_cast<const char*>(&packet
[8]), 4);
329 bool success
= reader
.ReadU32(&ssrc
);
332 if (ssrc
== audio_ssrc_
) {
334 } else if (ssrc
== video_ssrc_
) {
337 DVLOG(3) << "Got unknown ssrc " << ssrc
<< " when logging packet event";
341 EventMediaType media_type
= is_audio
? AUDIO_EVENT
: VIDEO_EVENT
;
342 logging_
->InsertSinglePacketEvent(clock_
->NowTicks(), event
, media_type
,