Merge Chromium + Blink git repositories
[chromium-blink-merge.git] / media / cast / net / pacing / paced_sender.cc
blob6b39f07408ccfbebd7e197b2d8d834a377949f4b
1 // Copyright 2014 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
5 #include "media/cast/net/pacing/paced_sender.h"
7 #include "base/big_endian.h"
8 #include "base/bind.h"
9 #include "base/debug/dump_without_crashing.h"
10 #include "base/message_loop/message_loop.h"
11 #include "media/cast/logging/logging_impl.h"
13 namespace media {
14 namespace cast {
16 namespace {
18 static const int64 kPacingIntervalMs = 10;
19 // Each frame will be split into no more than kPacingMaxBurstsPerFrame
20 // bursts of packets.
21 static const size_t kPacingMaxBurstsPerFrame = 3;
22 static const size_t kMaxDedupeWindowMs = 500;
24 // "Impossible" upper-bound on the maximum number of packets that should ever be
25 // enqueued in the pacer. This is used to detect bugs, reported as crash dumps.
26 static const size_t kHugeQueueLengthSeconds = 10;
27 static const size_t kRidiculousNumberOfPackets =
28 kHugeQueueLengthSeconds * (kMaxBurstSize * 1000 / kPacingIntervalMs);
30 } // namespace
32 DedupInfo::DedupInfo() : last_byte_acked_for_audio(0) {}
34 // static
35 PacketKey PacedPacketSender::MakePacketKey(const base::TimeTicks& ticks,
36 uint32 ssrc,
37 uint16 packet_id) {
38 return std::make_pair(ticks, std::make_pair(ssrc, packet_id));
41 PacedSender::PacketSendRecord::PacketSendRecord()
42 : last_byte_sent(0), last_byte_sent_for_audio(0) {}
44 PacedSender::PacedSender(
45 size_t target_burst_size,
46 size_t max_burst_size,
47 base::TickClock* clock,
48 LoggingImpl* logging,
49 PacketSender* transport,
50 const scoped_refptr<base::SingleThreadTaskRunner>& transport_task_runner)
51 : clock_(clock),
52 logging_(logging),
53 transport_(transport),
54 transport_task_runner_(transport_task_runner),
55 audio_ssrc_(0),
56 video_ssrc_(0),
57 target_burst_size_(target_burst_size),
58 max_burst_size_(max_burst_size),
59 current_max_burst_size_(target_burst_size_),
60 next_max_burst_size_(target_burst_size_),
61 next_next_max_burst_size_(target_burst_size_),
62 current_burst_size_(0),
63 state_(State_Unblocked),
64 has_reached_upper_bound_once_(false),
65 weak_factory_(this) {
68 PacedSender::~PacedSender() {}
70 void PacedSender::RegisterAudioSsrc(uint32 audio_ssrc) {
71 audio_ssrc_ = audio_ssrc;
74 void PacedSender::RegisterVideoSsrc(uint32 video_ssrc) {
75 video_ssrc_ = video_ssrc;
78 void PacedSender::RegisterPrioritySsrc(uint32 ssrc) {
79 priority_ssrcs_.push_back(ssrc);
82 int64 PacedSender::GetLastByteSentForPacket(const PacketKey& packet_key) {
83 PacketSendHistory::const_iterator it = send_history_.find(packet_key);
84 if (it == send_history_.end())
85 return 0;
86 return it->second.last_byte_sent;
89 int64 PacedSender::GetLastByteSentForSsrc(uint32 ssrc) {
90 std::map<uint32, int64>::const_iterator it = last_byte_sent_.find(ssrc);
91 if (it == last_byte_sent_.end())
92 return 0;
93 return it->second;
96 bool PacedSender::SendPackets(const SendPacketVector& packets) {
97 if (packets.empty()) {
98 return true;
100 const bool high_priority = IsHighPriority(packets.begin()->first);
101 for (size_t i = 0; i < packets.size(); i++) {
102 DCHECK(IsHighPriority(packets[i].first) == high_priority);
103 if (high_priority) {
104 priority_packet_list_[packets[i].first] =
105 make_pair(PacketType_Normal, packets[i].second);
106 } else {
107 packet_list_[packets[i].first] =
108 make_pair(PacketType_Normal, packets[i].second);
111 if (state_ == State_Unblocked) {
112 SendStoredPackets();
114 return true;
117 bool PacedSender::ShouldResend(const PacketKey& packet_key,
118 const DedupInfo& dedup_info,
119 const base::TimeTicks& now) {
120 PacketSendHistory::const_iterator it = send_history_.find(packet_key);
122 // No history of previous transmission. It might be sent too long ago.
123 if (it == send_history_.end())
124 return true;
126 // Suppose there is request to retransmit X and there is an audio
127 // packet Y sent just before X. Reject retransmission of X if ACK for
128 // Y has not been received.
129 // Only do this for video packets.
130 if (packet_key.second.first == video_ssrc_) {
131 if (dedup_info.last_byte_acked_for_audio &&
132 it->second.last_byte_sent_for_audio &&
133 dedup_info.last_byte_acked_for_audio <
134 it->second.last_byte_sent_for_audio) {
135 return false;
138 // Retransmission interval has to be greater than |resend_interval|.
139 if (now - it->second.time < dedup_info.resend_interval)
140 return false;
141 return true;
144 bool PacedSender::ResendPackets(const SendPacketVector& packets,
145 const DedupInfo& dedup_info) {
146 if (packets.empty()) {
147 return true;
149 const bool high_priority = IsHighPriority(packets.begin()->first);
150 const base::TimeTicks now = clock_->NowTicks();
151 for (size_t i = 0; i < packets.size(); i++) {
152 if (!ShouldResend(packets[i].first, dedup_info, now)) {
153 LogPacketEvent(packets[i].second->data, PACKET_RTX_REJECTED);
154 continue;
157 DCHECK(IsHighPriority(packets[i].first) == high_priority);
158 if (high_priority) {
159 priority_packet_list_[packets[i].first] =
160 make_pair(PacketType_Resend, packets[i].second);
161 } else {
162 packet_list_[packets[i].first] =
163 make_pair(PacketType_Resend, packets[i].second);
166 if (state_ == State_Unblocked) {
167 SendStoredPackets();
169 return true;
172 bool PacedSender::SendRtcpPacket(uint32 ssrc, PacketRef packet) {
173 if (state_ == State_TransportBlocked) {
174 priority_packet_list_[
175 PacedPacketSender::MakePacketKey(base::TimeTicks(), ssrc, 0)] =
176 make_pair(PacketType_RTCP, packet);
177 } else {
178 // We pass the RTCP packets straight through.
179 if (!transport_->SendPacket(
180 packet,
181 base::Bind(&PacedSender::SendStoredPackets,
182 weak_factory_.GetWeakPtr()))) {
183 state_ = State_TransportBlocked;
186 return true;
189 void PacedSender::CancelSendingPacket(const PacketKey& packet_key) {
190 packet_list_.erase(packet_key);
191 priority_packet_list_.erase(packet_key);
194 PacketRef PacedSender::PopNextPacket(PacketType* packet_type,
195 PacketKey* packet_key) {
196 PacketList* list = !priority_packet_list_.empty() ?
197 &priority_packet_list_ : &packet_list_;
198 DCHECK(!list->empty());
199 PacketList::iterator i = list->begin();
200 *packet_type = i->second.first;
201 *packet_key = i->first;
202 PacketRef ret = i->second.second;
203 list->erase(i);
204 return ret;
207 bool PacedSender::IsHighPriority(const PacketKey& packet_key) const {
208 return std::find(priority_ssrcs_.begin(), priority_ssrcs_.end(),
209 packet_key.second.first) != priority_ssrcs_.end();
212 bool PacedSender::empty() const {
213 return packet_list_.empty() && priority_packet_list_.empty();
216 size_t PacedSender::size() const {
217 return packet_list_.size() + priority_packet_list_.size();
220 // This function can be called from three places:
221 // 1. User called one of the Send* functions and we were in an unblocked state.
222 // 2. state_ == State_TransportBlocked and the transport is calling us to
223 // let us know that it's ok to send again.
224 // 3. state_ == State_BurstFull and there are still packets to send. In this
225 // case we called PostDelayedTask on this function to start a new burst.
226 void PacedSender::SendStoredPackets() {
227 State previous_state = state_;
228 state_ = State_Unblocked;
229 if (empty()) {
230 return;
233 // If the queue ever becomes impossibly long, send a crash dump without
234 // actually crashing the process.
235 if (size() > kRidiculousNumberOfPackets && !has_reached_upper_bound_once_) {
236 NOTREACHED();
237 // Please use Cr=Internals-Cast label in bug reports:
238 base::debug::DumpWithoutCrashing();
239 has_reached_upper_bound_once_ = true;
242 base::TimeTicks now = clock_->NowTicks();
243 // I don't actually trust that PostDelayTask(x - now) will mean that
244 // now >= x when the call happens, so check if the previous state was
245 // State_BurstFull too.
246 if (now >= burst_end_ || previous_state == State_BurstFull) {
247 // Start a new burst.
248 current_burst_size_ = 0;
249 burst_end_ = now + base::TimeDelta::FromMilliseconds(kPacingIntervalMs);
251 // The goal here is to try to send out the queued packets over the next
252 // three bursts, while trying to keep the burst size below 10 if possible.
253 // We have some evidence that sending more than 12 packets in a row doesn't
254 // work very well, but we don't actually know why yet. Sending out packets
255 // sooner is better than sending out packets later as that gives us more
256 // time to re-send them if needed. So if we have less than 30 packets, just
257 // send 10 at a time. If we have less than 60 packets, send n / 3 at a time.
258 // if we have more than 60, we send 20 at a time. 20 packets is ~24Mbit/s
259 // which is more bandwidth than the cast library should need, and sending
260 // out more data per second is unlikely to be helpful.
261 size_t max_burst_size = std::min(
262 max_burst_size_,
263 std::max(target_burst_size_, size() / kPacingMaxBurstsPerFrame));
264 current_max_burst_size_ = std::max(next_max_burst_size_, max_burst_size);
265 next_max_burst_size_ = std::max(next_next_max_burst_size_, max_burst_size);
266 next_next_max_burst_size_ = max_burst_size;
269 base::Closure cb = base::Bind(&PacedSender::SendStoredPackets,
270 weak_factory_.GetWeakPtr());
271 while (!empty()) {
272 if (current_burst_size_ >= current_max_burst_size_) {
273 transport_task_runner_->PostDelayedTask(FROM_HERE,
275 burst_end_ - now);
276 state_ = State_BurstFull;
277 return;
279 PacketType packet_type;
280 PacketKey packet_key;
281 PacketRef packet = PopNextPacket(&packet_type, &packet_key);
282 PacketSendRecord send_record;
283 send_record.time = now;
285 switch (packet_type) {
286 case PacketType_Resend:
287 LogPacketEvent(packet->data, PACKET_RETRANSMITTED);
288 break;
289 case PacketType_Normal:
290 LogPacketEvent(packet->data, PACKET_SENT_TO_NETWORK);
291 break;
292 case PacketType_RTCP:
293 break;
296 const bool socket_blocked = !transport_->SendPacket(packet, cb);
298 // Save the send record.
299 send_record.last_byte_sent = transport_->GetBytesSent();
300 send_record.last_byte_sent_for_audio = GetLastByteSentForSsrc(audio_ssrc_);
301 send_history_[packet_key] = send_record;
302 send_history_buffer_[packet_key] = send_record;
303 last_byte_sent_[packet_key.second.first] = send_record.last_byte_sent;
305 if (socket_blocked) {
306 state_ = State_TransportBlocked;
307 return;
309 current_burst_size_++;
312 // Keep ~0.5 seconds of data (1000 packets).
313 if (send_history_buffer_.size() >=
314 max_burst_size_ * kMaxDedupeWindowMs / kPacingIntervalMs) {
315 send_history_.swap(send_history_buffer_);
316 send_history_buffer_.clear();
318 DCHECK_LE(send_history_buffer_.size(),
319 max_burst_size_ * kMaxDedupeWindowMs / kPacingIntervalMs);
320 state_ = State_Unblocked;
323 void PacedSender::LogPacketEvent(const Packet& packet, CastLoggingEvent event) {
324 // Get SSRC from packet and compare with the audio_ssrc / video_ssrc to see
325 // if the packet is audio or video.
326 DCHECK_GE(packet.size(), 12u);
327 base::BigEndianReader reader(reinterpret_cast<const char*>(&packet[8]), 4);
328 uint32 ssrc;
329 bool success = reader.ReadU32(&ssrc);
330 DCHECK(success);
331 bool is_audio;
332 if (ssrc == audio_ssrc_) {
333 is_audio = true;
334 } else if (ssrc == video_ssrc_) {
335 is_audio = false;
336 } else {
337 DVLOG(3) << "Got unknown ssrc " << ssrc << " when logging packet event";
338 return;
341 EventMediaType media_type = is_audio ? AUDIO_EVENT : VIDEO_EVENT;
342 logging_->InsertSinglePacketEvent(clock_->NowTicks(), event, media_type,
343 packet);
346 } // namespace cast
347 } // namespace media