1 // Copyright 2014 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
5 #include "media/cast/receiver/frame_receiver.h"
9 #include "base/big_endian.h"
10 #include "base/bind.h"
11 #include "base/logging.h"
12 #include "base/message_loop/message_loop.h"
13 #include "media/cast/cast_environment.h"
// Lower bound, in milliseconds, for the delay of the periodic tasks posted by
// ScheduleNextCastMessage() and ScheduleNextRtcpReport(): both clamp their
// computed delay to at least this value with std::max().
16 const int kMinSchedulingDelayMs
= 1;
// Constructs a FrameReceiver from |config|.
//
// The member initializers set up the packet parser (incoming SSRC and RTP
// payload type), receiver statistics, the RTCP event subscriber, the RTP
// timebase, the initial target playout delay (config.rtp_max_delay_ms), the
// expected duration of one frame (one second / config.max_frame_rate), the
// framer, the RTCP object, the lip-sync drift smoother and the RTCP interval.
// The visible body statements then sanity-check the configuration, initialize
// the decryptor with the AES key and IV mask, register |event_subscriber_|
// with the logging object, and zero |frame_id_to_rtp_timestamp_|.
//
// NOTE(review): this extract does not show every initializer argument (for
// example the remaining framer_/rtcp_ arguments), nor how |packet_sender| is
// used, nor the lines that open and close the constructor body — verify
// against the complete file.
22 FrameReceiver::FrameReceiver(
23 const scoped_refptr
<CastEnvironment
>& cast_environment
,
24 const FrameReceiverConfig
& config
,
25 EventMediaType event_media_type
,
26 PacedPacketSender
* const packet_sender
)
27 : cast_environment_(cast_environment
),
28 packet_parser_(config
.incoming_ssrc
, config
.rtp_payload_type
),
29 stats_(cast_environment
->Clock()),
30 event_media_type_(event_media_type
),
31 event_subscriber_(kReceiverRtcpEventHistorySize
, event_media_type
),
32 rtp_timebase_(config
.frequency
),
// Initial playout delay; EmitAvailableEncodedFrames() replaces it when a
// frame carries a non-zero new_playout_delay_ms.
33 target_playout_delay_(
34 base::TimeDelta::FromMilliseconds(config
.rtp_max_delay_ms
)),
// One second divided by the maximum frame rate.
35 expected_frame_duration_(
36 base::TimeDelta::FromSeconds(1) / config
.max_frame_rate
),
// Periodic reports start on the first call to ProcessPacket().
37 reports_are_scheduled_(false),
38 framer_(cast_environment
->Clock(),
// rtp_max_delay_ms * max_frame_rate / 1000 — presumably the number of frames
// that fit within the maximum delay; TODO confirm which framer_ parameter
// this argument feeds.
42 config
.rtp_max_delay_ms
* config
.max_frame_rate
/ 1000),
43 rtcp_(RtcpCastMessageCallback(),
45 RtcpLogMessageCallback(),
46 cast_environment_
->Clock(),
49 config
.incoming_ssrc
),
50 is_waiting_for_consecutive_frame_(false),
51 lip_sync_drift_(ClockDriftSmoother::GetDefaultTimeConstant()),
52 rtcp_interval_(base::TimeDelta::FromMilliseconds(config
.rtcp_interval
)),
// These checks are body statements, so they run only after the initializers
// above have already used both values (including the division by
// config.max_frame_rate).
54 DCHECK_GT(config
.rtp_max_delay_ms
, 0);
55 DCHECK_GT(config
.max_frame_rate
, 0);
56 decryptor_
.Initialize(config
.aes_key
, config
.aes_iv_mask
);
// Paired with RemoveRawEventSubscriber() in the destructor.
57 cast_environment_
->Logging()->AddRawEventSubscriber(&event_subscriber_
);
58 memset(frame_id_to_rtp_timestamp_
, 0, sizeof(frame_id_to_rtp_timestamp_
));
// Destructor. DCHECKs that it runs on CastEnvironment::MAIN, then unregisters
// |event_subscriber_| from the logging object, undoing the
// AddRawEventSubscriber() call made in the constructor.
61 FrameReceiver::~FrameReceiver() {
62 DCHECK(cast_environment_
->CurrentlyOn(CastEnvironment::MAIN
));
63 cast_environment_
->Logging()->RemoveRawEventSubscriber(&event_subscriber_
);
// Requests one encoded frame. |callback| is appended to
// |frame_request_queue_|, and EmitAvailableEncodedFrames() is called right
// away so that requests are served immediately when frames are already
// available. Must be called on CastEnvironment::MAIN (DCHECKed).
66 void FrameReceiver::RequestEncodedFrame(
67 const ReceiveEncodedFrameCallback
& callback
) {
68 DCHECK(cast_environment_
->CurrentlyOn(CastEnvironment::MAIN
));
69 frame_request_queue_
.push_back(callback
);
70 EmitAvailableEncodedFrames();
// Entry point for one incoming |packet|.
//
// Packets recognized by Rtcp::IsRtcpPacket() are handed to |rtcp_|. The
// remaining visible code parses the packet with |packet_parser_|, passes the
// parsed header and payload to ProcessParsedPacket(), and updates |stats_|.
// The first call also starts the periodic RTCP report and cast message tasks.
//
// NOTE(review): this extract does not show the branch structure between the
// RTCP and RTP handling, the declaration of |payload_size|, the remaining
// ParsePacket() arguments, or the return statements — verify the exact
// control flow and return values against the complete file.
73 bool FrameReceiver::ProcessPacket(scoped_ptr
<Packet
> packet
) {
74 DCHECK(cast_environment_
->CurrentlyOn(CastEnvironment::MAIN
));
76 if (Rtcp::IsRtcpPacket(&packet
->front(), packet
->size())) {
77 rtcp_
.IncomingRtcpPacket(&packet
->front(), packet
->size());
79 RtpCastHeader rtp_header
;
80 const uint8
* payload_data
;
82 if (!packet_parser_
.ParsePacket(&packet
->front(),
90 ProcessParsedPacket(rtp_header
, payload_data
, payload_size
);
91 stats_
.UpdateStatistics(rtp_header
);
// Start the periodic RTCP report and cast message tasks once; the flag is
// initialized to false in the constructor and set to true here.
94 if (!reports_are_scheduled_
) {
95 ScheduleNextRtcpReport();
96 ScheduleNextCastMessage();
97 reports_are_scheduled_
= true;
// Reads a 32-bit big-endian value located 8 bytes into |packet| and stores it
// through |ssrc|. Returns true only if both the 8-byte skip and the 32-bit
// read succeed.
//
// NOTE(review): the declarations of the |length| and |ssrc| parameters are
// not visible in this extract — confirm their types against the complete
// file.
104 bool FrameReceiver::ParseSenderSsrc(const uint8
* packet
,
107 base::BigEndianReader
big_endian_reader(
108 reinterpret_cast<const char*>(packet
), length
);
109 return big_endian_reader
.Skip(8) && big_endian_reader
.ReadU32(ssrc
);
// Handles one parsed RTP packet.
//
// Visible steps, in order:
//   1. Remember the packet's RTP timestamp in |frame_id_to_rtp_timestamp_|,
//      indexed by the low 8 bits of the frame id.
//   2. Log a PACKET_RECEIVED event.
//   3. Insert the payload into |framer_|, which reports whether the packet
//      was a duplicate and whether a frame is now complete.
//   4. On the first packet of a frame (packet_id == 0), or while no lip-sync
//      reference exists yet, refresh the lip-sync reference time, RTP
//      timestamp and drift estimate.
//   5. Call EmitAvailableEncodedFrames() to serve queued frame requests.
//
// NOTE(review): several lines are not visible in this extract (the tail of
// the InsertPacketEvent() call, the duplicate/complete early exits, the
// else-branch opener and the RtpDeltaToTimeDelta() timebase argument) —
// verify against the complete file.
112 void FrameReceiver::ProcessParsedPacket(const RtpCastHeader
& rtp_header
,
113 const uint8
* payload_data
,
114 size_t payload_size
) {
115 DCHECK(cast_environment_
->CurrentlyOn(CastEnvironment::MAIN
));
117 const base::TimeTicks now
= cast_environment_
->Clock()->NowTicks();
// Only the low 8 bits of the frame id select the slot, so an entry is
// overwritten by any later frame whose id has the same low 8 bits.
// CastFeedback() reads the table back with the same masking.
119 frame_id_to_rtp_timestamp_
[rtp_header
.frame_id
& 0xff] =
120 rtp_header
.rtp_timestamp
;
121 cast_environment_
->Logging()->InsertPacketEvent(
122 now
, PACKET_RECEIVED
, event_media_type_
, rtp_header
.rtp_timestamp
,
123 rtp_header
.frame_id
, rtp_header
.packet_id
, rtp_header
.max_packet_id
,
126 bool duplicate
= false;
127 const bool complete
=
128 framer_
.InsertPacket(payload_data
, payload_size
, rtp_header
, &duplicate
);
130 // Duplicate packets are ignored.
134 // Update lip-sync values upon receiving the first packet of each frame, or if
135 // they have never been set yet.
136 if (rtp_header
.packet_id
== 0 || lip_sync_reference_time_
.is_null()) {
137 RtpTimestamp fresh_sync_rtp
;
138 base::TimeTicks fresh_sync_reference
;
139 if (!rtcp_
.GetLatestLipSyncTimes(&fresh_sync_rtp
, &fresh_sync_reference
)) {
140 // HACK: The sender should have provided Sender Reports before the first
141 // frame was sent. However, the spec does not currently require this.
142 // Therefore, when the data is missing, the local clock is used to
143 // generate reference timestamps.
144 VLOG(2) << "Lip sync info missing. Falling-back to local clock.";
145 fresh_sync_rtp
= rtp_header
.rtp_timestamp
;
146 fresh_sync_reference
= now
;
148 // |lip_sync_reference_time_| is always incremented according to the time
149 // delta computed from the difference in RTP timestamps. Then,
150 // |lip_sync_drift_| accounts for clock drift and also smoothes-out any
151 // sudden/discontinuous shifts in the series of reference time values.
152 if (lip_sync_reference_time_
.is_null()) {
153 lip_sync_reference_time_
= fresh_sync_reference
;
// The RTP timestamp difference is converted to a signed 32-bit value before
// being turned into a time delta — presumably so that wrap-around and
// backward steps yield small signed deltas; TODO confirm.
155 lip_sync_reference_time_
+= RtpDeltaToTimeDelta(
156 static_cast<int32
>(fresh_sync_rtp
- lip_sync_rtp_timestamp_
),
159 lip_sync_rtp_timestamp_
= fresh_sync_rtp
;
// Feed the smoother the gap between the freshly reported reference time and
// the RTP-derived reference time maintained above.
160 lip_sync_drift_
.Update(
161 now
, fresh_sync_reference
- lip_sync_reference_time_
);
164 // Another frame is complete from a non-duplicate packet. Attempt to emit
165 // more frames to satisfy enqueued requests.
167 EmitAvailableEncodedFrames();
// Sends receiver feedback for |cast_message|.
//
// Looks up the RTP timestamp recorded for the acknowledged frame id (by its
// low 8 bits, matching how ProcessParsedPacket() stores it), logs a
// FRAME_ACK_SENT event, drains the pending RTCP events from
// |event_subscriber_|, and sends an RTCP packet carrying the cast message and
// the current target playout delay.
//
// NOTE(review): the remaining arguments of SendRtcpFromRtpReceiver() and the
// end of the function are not visible in this extract.
170 void FrameReceiver::CastFeedback(const RtcpCastMessage
& cast_message
) {
171 DCHECK(cast_environment_
->CurrentlyOn(CastEnvironment::MAIN
));
173 base::TimeTicks now
= cast_environment_
->Clock()->NowTicks();
// Because the table is indexed by only the low 8 bits of the frame id, this
// yields whichever timestamp was stored last in that slot.
174 RtpTimestamp rtp_timestamp
=
175 frame_id_to_rtp_timestamp_
[cast_message
.ack_frame_id
& 0xff];
176 cast_environment_
->Logging()->InsertFrameEvent(
177 now
, FRAME_ACK_SENT
, event_media_type_
,
178 rtp_timestamp
, cast_message
.ack_frame_id
);
180 ReceiverRtcpEventSubscriber::RtcpEventMultiMap rtcp_events
;
181 event_subscriber_
.GetRtcpEventsAndReset(&rtcp_events
);
182 rtcp_
.SendRtcpFromRtpReceiver(&cast_message
, target_playout_delay_
,
// Serves queued frame requests for as long as |framer_| can supply frames.
//
// For each pending request, the visible code:
//   - fetches the next frame from |framer_|, returning if none is ready;
//   - releases a frame whose playout time has already passed when other
//     complete frames are available;
//   - for an out-of-sequence frame, checks whether the missing frame could
//     still finish before this frame's playout time and, if so, posts a
//     delayed call to EmitAvailableEncodedFramesAfterWaiting() rather than
//     skipping ahead;
//   - ACKs the frame, decrypts its payload if the decryptor is active, stamps
//     it with its playout time, releases it from |framer_|, applies any new
//     playout delay it carries, and posts EmitOneFrame() with the oldest
//     queued callback.
//
// NOTE(review): many lines are not visible in this extract (the allocation
// of |encoded_frame|, the continue/return statements after the skip, wait
// and decrypt-failure paths, the PostDelayedTask() delay argument, and all
// closing braces) — verify the exact control flow against the complete file.
186 void FrameReceiver::EmitAvailableEncodedFrames() {
187 DCHECK(cast_environment_
->CurrentlyOn(CastEnvironment::MAIN
));
189 while (!frame_request_queue_
.empty()) {
190 // Attempt to peek at the next completed frame from the |framer_|.
191 // TODO(miu): We should only be peeking at the metadata, and not copying the
192 // payload yet! Or, at least, peek using a StringPiece instead of a copy.
193 scoped_ptr
<EncodedFrame
> encoded_frame(
195 bool is_consecutively_next_frame
= false;
196 bool have_multiple_complete_frames
= false;
197 if (!framer_
.GetEncodedFrame(encoded_frame
.get(),
198 &is_consecutively_next_frame
,
199 &have_multiple_complete_frames
)) {
200 VLOG(1) << "Wait for more packets to produce a completed frame.";
201 return; // ProcessParsedPacket() will invoke this method in the future.
204 const base::TimeTicks now
= cast_environment_
->Clock()->NowTicks();
205 const base::TimeTicks playout_time
= GetPlayoutTime(*encoded_frame
);
207 // If we have multiple decodable frames, and the current frame is
208 // too old, then skip it and decode the next frame instead.
209 if (have_multiple_complete_frames
&& now
> playout_time
) {
210 framer_
.ReleaseFrame(encoded_frame
->frame_id
);
214 // If |framer_| has a frame ready that is out of sequence, examine the
215 // playout time to determine whether it's acceptable to continue, thereby
216 // skipping one or more frames. Skip if the missing frame wouldn't complete
217 // playing before the start of playback of the available frame.
218 if (!is_consecutively_next_frame
) {
219 // This assumes that decoding takes as long as playing, which might
// Two expected frame durations from now.
221 const base::TimeTicks earliest_possible_end_time_of_missing_frame
=
222 now
+ expected_frame_duration_
* 2;
223 if (earliest_possible_end_time_of_missing_frame
< playout_time
) {
224 VLOG(1) << "Wait for next consecutive frame instead of skipping.";
// The flag keeps at most one wake-up task pending; it is cleared by
// EmitAvailableEncodedFramesAfterWaiting().
225 if (!is_waiting_for_consecutive_frame_
) {
226 is_waiting_for_consecutive_frame_
= true;
227 cast_environment_
->PostDelayedTask(
228 CastEnvironment::MAIN
,
230 base::Bind(&FrameReceiver::EmitAvailableEncodedFramesAfterWaiting
,
231 weak_factory_
.GetWeakPtr()),
238 // At this point, we have the complete next frame, or a decodable
239 // frame from somewhere later in the stream, AND we have given up
240 // on waiting for any frames in between, so now we can ACK the frame.
241 framer_
.AckFrame(encoded_frame
->frame_id
);
243 // Decrypt the payload data in the frame, if crypto is being used.
244 if (decryptor_
.is_activated()) {
245 std::string decrypted_data
;
246 if (!decryptor_
.Decrypt(encoded_frame
->frame_id
,
249 // Decryption failed. Give up on this frame.
250 framer_
.ReleaseFrame(encoded_frame
->frame_id
);
// Swap the decrypted bytes into the frame in place of the encrypted payload.
253 encoded_frame
->data
.swap(decrypted_data
);
256 // At this point, we have a decrypted EncodedFrame ready to be emitted.
257 encoded_frame
->reference_time
= playout_time
;
258 framer_
.ReleaseFrame(encoded_frame
->frame_id
);
// A non-zero new_playout_delay_ms replaces the receiver's target playout
// delay from this point on.
259 if (encoded_frame
->new_playout_delay_ms
) {
260 target_playout_delay_
= base::TimeDelta::FromMilliseconds(
261 encoded_frame
->new_playout_delay_ms
);
// The callback is run from a posted task, not directly from this loop; the
// task is bound to a weak pointer obtained from |weak_factory_|.
263 cast_environment_
->PostTask(CastEnvironment::MAIN
,
265 base::Bind(&FrameReceiver::EmitOneFrame
,
266 weak_factory_
.GetWeakPtr(),
267 frame_request_queue_
.front(),
268 base::Passed(&encoded_frame
)));
269 frame_request_queue_
.pop_front();
// Target of the delayed task that EmitAvailableEncodedFrames() posts when it
// decides to wait for the next consecutive frame. Clears
// |is_waiting_for_consecutive_frame_| (which must be set on entry, per the
// DCHECK) and retries emitting frames.
273 void FrameReceiver::EmitAvailableEncodedFramesAfterWaiting() {
274 DCHECK(cast_environment_
->CurrentlyOn(CastEnvironment::MAIN
));
275 DCHECK(is_waiting_for_consecutive_frame_
);
276 is_waiting_for_consecutive_frame_
= false;
277 EmitAvailableEncodedFrames();
// Delivers |encoded_frame| to |callback|. EmitAvailableEncodedFrames() posts
// this as a task rather than invoking the callback directly. A null
// |callback| is skipped, so nothing is run for it.
280 void FrameReceiver::EmitOneFrame(const ReceiveEncodedFrameCallback
& callback
,
281 scoped_ptr
<EncodedFrame
> encoded_frame
) const {
282 DCHECK(cast_environment_
->CurrentlyOn(CastEnvironment::MAIN
));
283 if (!callback
.is_null())
284 callback
.Run(encoded_frame
.Pass());
// Computes the playout time for |frame|.
//
// The visible terms of the result are |lip_sync_reference_time_|, the current
// drift estimate from |lip_sync_drift_|, a term derived from the signed
// 32-bit difference between the frame's RTP timestamp and
// |lip_sync_rtp_timestamp_|, and the playout delay. The playout delay is
// |target_playout_delay_| unless the frame carries a non-zero
// new_playout_delay_ms, in which case that value is used for this
// computation.
//
// NOTE(review): part of the return expression (the call that converts the
// RTP delta into a time delta, and its timebase argument) is not visible in
// this extract — presumably RtpDeltaToTimeDelta() as in
// ProcessParsedPacket(); confirm against the complete file.
287 base::TimeTicks
FrameReceiver::GetPlayoutTime(const EncodedFrame
& frame
) const {
288 base::TimeDelta target_playout_delay
= target_playout_delay_
;
// The override is local: this is a const method and |target_playout_delay_|
// itself is not modified here.
289 if (frame
.new_playout_delay_ms
) {
290 target_playout_delay
= base::TimeDelta::FromMilliseconds(
291 frame
.new_playout_delay_ms
);
293 return lip_sync_reference_time_
+
294 lip_sync_drift_
.Current() +
296 static_cast<int32
>(frame
.rtp_timestamp
- lip_sync_rtp_timestamp_
),
298 target_playout_delay
;
// Posts a delayed task that calls SendNextCastMessage().
//
// The delay is the time remaining until the send time reported by |framer_|,
// clamped to at least kMinSchedulingDelayMs milliseconds. The task is bound
// to a weak pointer obtained from |weak_factory_|.
//
// NOTE(review): the trailing PostDelayedTask() arguments are not visible in
// this extract; the delay is presumably |time_to_send| — confirm.
301 void FrameReceiver::ScheduleNextCastMessage() {
302 DCHECK(cast_environment_
->CurrentlyOn(CastEnvironment::MAIN
));
303 base::TimeTicks send_time
;
304 framer_
.TimeToSendNextCastMessage(&send_time
);
305 base::TimeDelta time_to_send
=
306 send_time
- cast_environment_
->Clock()->NowTicks();
// Raise the delay to the minimum; this also covers a send time that is
// already in the past (a negative difference).
307 time_to_send
= std::max(
308 time_to_send
, base::TimeDelta::FromMilliseconds(kMinSchedulingDelayMs
));
309 cast_environment_
->PostDelayedTask(
310 CastEnvironment::MAIN
,
312 base::Bind(&FrameReceiver::SendNextCastMessage
,
313 weak_factory_
.GetWeakPtr()),
// Periodic task body: asks |framer_| to send a cast message, then schedules
// the next run via ScheduleNextCastMessage(), so the task re-arms itself on
// every execution.
317 void FrameReceiver::SendNextCastMessage() {
318 DCHECK(cast_environment_
->CurrentlyOn(CastEnvironment::MAIN
));
319 framer_
.SendCastMessage(); // Will only send a message if it is time.
320 ScheduleNextCastMessage();
// Posts a delayed task that calls SendNextRtcpReport().
//
// The delay is |rtcp_interval_|, clamped to at least kMinSchedulingDelayMs
// milliseconds. The task is bound to a weak pointer obtained from
// |weak_factory_|.
//
// NOTE(review): the trailing PostDelayedTask() arguments are not visible in
// this extract; the delay is presumably |time_to_next| — confirm.
323 void FrameReceiver::ScheduleNextRtcpReport() {
324 DCHECK(cast_environment_
->CurrentlyOn(CastEnvironment::MAIN
));
325 base::TimeDelta time_to_next
= rtcp_interval_
;
326 time_to_next
= std::max(
327 time_to_next
, base::TimeDelta::FromMilliseconds(kMinSchedulingDelayMs
));
329 cast_environment_
->PostDelayedTask(
330 CastEnvironment::MAIN
,
332 base::Bind(&FrameReceiver::SendNextRtcpReport
,
333 weak_factory_
.GetWeakPtr()),
// Periodic task body: sends an RTCP packet that includes |stats_|, then
// schedules the next run via ScheduleNextRtcpReport().
//
// The first argument is NULL where CastFeedback() passes its cast message,
// and a default-constructed TimeDelta stands where CastFeedback() passes the
// target playout delay. The third NULL is presumably the RTCP events
// argument — TODO confirm against the SendRtcpFromRtpReceiver() declaration.
337 void FrameReceiver::SendNextRtcpReport() {
338 DCHECK(cast_environment_
->CurrentlyOn(CastEnvironment::MAIN
));
339 rtcp_
.SendRtcpFromRtpReceiver(NULL
, base::TimeDelta(), NULL
, &stats_
);
340 ScheduleNextRtcpReport();