1 // Copyright 2014 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
5 #include "media/filters/decoder_stream.h"
8 #include "base/callback_helpers.h"
9 #include "base/location.h"
10 #include "base/logging.h"
11 #include "base/single_thread_task_runner.h"
12 #include "base/trace_event/trace_event.h"
13 #include "media/base/bind_to_current_loop.h"
14 #include "media/base/decoder_buffer.h"
15 #include "media/base/media_log.h"
16 #include "media/base/video_decoder.h"
17 #include "media/filters/decrypting_demuxer_stream.h"
21 // TODO(rileya): Devise a better way of specifying trace/UMA/etc strings for
22 // templated classes such as this.
// Returns the trace-event name string for |StreamType|. Declared here; the
// VIDEO and AUDIO versions are defined below.
23 template <DemuxerStream::Type StreamType
>
24 static const char* GetTraceString();
// Debug-log helper: starts a DVLOG(level) line with the current function name
// followed by the stream type string in angle brackets. Callers may append
// further text with <<.
26 #define FUNCTION_DVLOG(level) \
27 DVLOG(level) << __FUNCTION__ << "<" << GetStreamTypeString() << ">"
// VIDEO version: the name under which video decodes are traced (see the
// TRACE_EVENT_ASYNC_* calls in Decode() and OnDecodeDone()).
30 const char* GetTraceString
<DemuxerStream::VIDEO
>() {
31 return "DecoderStream<VIDEO>::Decode";
// AUDIO version: the name under which audio decodes are traced (see the
// TRACE_EVENT_ASYNC_* calls in Decode() and OnDecodeDone()).
35 const char* GetTraceString
<DemuxerStream::AUDIO
>() {
36 return "DecoderStream<AUDIO>::Decode";
// Constructs a stream in STATE_UNINITIALIZED. Stores |task_runner| and
// |media_log|, creates the DecoderSelector, and starts with no active splice
// and zero pending decode requests.
// NOTE(review): |decoders| is presumably handed to the DecoderSelector; the
// remaining constructor arguments are not visible here -- confirm.
39 template <DemuxerStream::Type StreamType
>
40 DecoderStream
<StreamType
>::DecoderStream(
41 const scoped_refptr
<base::SingleThreadTaskRunner
>& task_runner
,
42 ScopedVector
<Decoder
> decoders
,
43 const scoped_refptr
<MediaLog
>& media_log
)
44 : task_runner_(task_runner
),
45 media_log_(media_log
),
46 state_(STATE_UNINITIALIZED
),
48 decoder_selector_(new DecoderSelector
<StreamType
>(task_runner
,
51 active_splice_(false),
53 pending_decode_requests_(0),
// Destructor. Must run on |task_runner_|'s thread. Drops the decoder selector,
// then posts (rather than runs synchronously) every callback that is still
// pending so that no caller is left waiting, and finally releases the
// DecryptingDemuxerStream.
57 template <DemuxerStream::Type StreamType
>
58 DecoderStream
<StreamType
>::~DecoderStream() {
60 DCHECK(task_runner_
->BelongsToCurrentThread());
62 decoder_selector_
.reset();
// A pending initialization is reported as failed.
64 if (!init_cb_
.is_null()) {
65 task_runner_
->PostTask(FROM_HERE
,
66 base::Bind(base::ResetAndReturn(&init_cb_
), false));
// A pending read is completed with ABORTED and a null output.
68 if (!read_cb_
.is_null()) {
69 task_runner_
->PostTask(FROM_HERE
, base::Bind(
70 base::ResetAndReturn(&read_cb_
), ABORTED
, scoped_refptr
<Output
>()));
// A pending reset closure is posted as-is.
72 if (!reset_cb_
.is_null())
73 task_runner_
->PostTask(FROM_HERE
, base::ResetAndReturn(&reset_cb_
));
77 decrypting_demuxer_stream_
.reset();
// Returns the stream type name supplied by
// DecoderStreamTraits<StreamType>::ToString(). Used for logging and as a
// prefix for media-log property names.
80 template <DemuxerStream::Type StreamType
>
81 std::string DecoderStream
<StreamType
>::GetStreamTypeString() {
82 return DecoderStreamTraits
<StreamType
>::ToString();
// Starts initialization. Must be called on |task_runner_|'s thread while in
// STATE_UNINITIALIZED, with no initialization already in flight and a non-null
// |init_cb|. Stores the statistics and waiting-for-key callbacks, moves to
// STATE_INITIALIZING and kicks off decoder selection with
// |set_decryptor_ready_cb|.
// NOTE(review): |stream| and |init_cb| are presumably saved to |stream_| and
// |init_cb_| before SelectDecoder() runs; those assignments are not visible
// here -- confirm.
85 template <DemuxerStream::Type StreamType
>
86 void DecoderStream
<StreamType
>::Initialize(
87 DemuxerStream
* stream
,
88 const InitCB
& init_cb
,
89 const SetDecryptorReadyCB
& set_decryptor_ready_cb
,
90 const StatisticsCB
& statistics_cb
,
91 const base::Closure
& waiting_for_decryption_key_cb
) {
93 DCHECK(task_runner_
->BelongsToCurrentThread());
94 DCHECK_EQ(state_
, STATE_UNINITIALIZED
);
95 DCHECK(init_cb_
.is_null());
96 DCHECK(!init_cb
.is_null());
98 statistics_cb_
= statistics_cb
;
100 waiting_for_decryption_key_cb_
= waiting_for_decryption_key_cb
;
103 state_
= STATE_INITIALIZING
;
104 SelectDecoder(set_decryptor_ready_cb
);
// Requests one decoded output. Must be called on |task_runner_|'s thread after
// initialization has finished, with no other read and no reset pending.
// |read_cb| is always invoked via a posted task in the cases shown below:
//   - STATE_ERROR: DECODE_ERROR with a null output.
//   - STATE_END_OF_STREAM with nothing queued: OK with an end-of-stream output.
//   - An output is already queued: OK with the oldest queued output.
// When the stream is in STATE_NORMAL and more can be decoded, a demuxer read is
// started.
107 template <DemuxerStream::Type StreamType
>
108 void DecoderStream
<StreamType
>::Read(const ReadCB
& read_cb
) {
110 DCHECK(task_runner_
->BelongsToCurrentThread());
111 DCHECK(state_
!= STATE_UNINITIALIZED
&& state_
!= STATE_INITIALIZING
)
113 // No two reads may be in flight at any time.
114 DCHECK(read_cb_
.is_null());
115 // No read during resetting or stopping process.
116 DCHECK(reset_cb_
.is_null());
118 if (state_
== STATE_ERROR
) {
119 task_runner_
->PostTask(
120 FROM_HERE
, base::Bind(read_cb
, DECODE_ERROR
, scoped_refptr
<Output
>()));
124 if (state_
== STATE_END_OF_STREAM
&& ready_outputs_
.empty()) {
125 task_runner_
->PostTask(
126 FROM_HERE
, base::Bind(read_cb
, OK
, StreamTraits::CreateEOSOutput()));
// Serve the read from the queue of already-decoded outputs, oldest first.
130 if (!ready_outputs_
.empty()) {
131 task_runner_
->PostTask(FROM_HERE
,
132 base::Bind(read_cb
, OK
, ready_outputs_
.front()));
133 ready_outputs_
.pop_front();
138 if (state_
== STATE_NORMAL
&& CanDecodeMore())
139 ReadFromDemuxerStream();
// Resets the stream. Must be called on |task_runner_|'s thread, after
// Initialize() has been called, with no other reset pending. Any pending read
// is aborted (ABORTED, null output, via a posted task) and all queued outputs
// are dropped. When a DecryptingDemuxerStream is in use it is reset first and
// ResetDecoder() continues the process from its callback.
// NOTE(review): |closure| is presumably stored in |reset_cb_|; that assignment
// and the bodies of the two state checks below are not visible here -- confirm.
142 template <DemuxerStream::Type StreamType
>
143 void DecoderStream
<StreamType
>::Reset(const base::Closure
& closure
) {
145 DCHECK(task_runner_
->BelongsToCurrentThread());
146 DCHECK_NE(state_
, STATE_UNINITIALIZED
);
147 DCHECK(reset_cb_
.is_null());
151 if (!read_cb_
.is_null()) {
152 task_runner_
->PostTask(FROM_HERE
, base::Bind(
153 base::ResetAndReturn(&read_cb_
), ABORTED
, scoped_refptr
<Output
>()));
156 ready_outputs_
.clear();
158 // During decoder reinitialization, the Decoder does not need to be and
159 // cannot be Reset(). |decrypting_demuxer_stream_| was reset before decoder
161 if (state_
== STATE_REINITIALIZING_DECODER
)
164 // During pending demuxer read and when not using DecryptingDemuxerStream,
165 // the Decoder will be reset after demuxer read is returned
166 // (in OnBufferReady()).
167 if (state_
== STATE_PENDING_DEMUXER_READ
&& !decrypting_demuxer_stream_
)
170 if (decrypting_demuxer_stream_
) {
171 decrypting_demuxer_stream_
->Reset(base::Bind(
172 &DecoderStream
<StreamType
>::ResetDecoder
, weak_factory_
.GetWeakPtr()));
// Returns true if a Read() can be served without stalling: either decoded
// outputs are already queued, or the decoder itself reports that it can be
// read without stalling. Must be called on |task_runner_|'s thread.
179 template <DemuxerStream::Type StreamType
>
180 bool DecoderStream
<StreamType
>::CanReadWithoutStalling() const {
181 DCHECK(task_runner_
->BelongsToCurrentThread());
182 return !ready_outputs_
.empty() || decoder_
->CanReadWithoutStalling();
// AUDIO-specific version of CanReadWithoutStalling(). Must be called on
// |task_runner_|'s thread.
// NOTE(review): the returned value is not visible here -- confirm.
186 bool DecoderStream
<DemuxerStream::AUDIO
>::CanReadWithoutStalling() const {
187 DCHECK(task_runner_
->BelongsToCurrentThread());
// Returns the limit on outstanding decode requests, as reported by the current
// decoder's GetMaxDecodeRequests().
191 template <DemuxerStream::Type StreamType
>
192 int DecoderStream
<StreamType
>::GetMaxDecodeRequests() const {
193 return decoder_
->GetMaxDecodeRequests();
// AUDIO-specific version of GetMaxDecodeRequests().
// NOTE(review): the returned value is not visible here -- confirm.
197 int DecoderStream
<DemuxerStream::AUDIO
>::GetMaxDecodeRequests() const {
// Returns true if another buffer may be sent to the decoder: no end-of-stream
// buffer is currently being decoded, and the number of queued outputs plus
// in-flight decode requests is below GetMaxDecodeRequests(). Must be called on
// |task_runner_|'s thread.
201 template <DemuxerStream::Type StreamType
>
202 bool DecoderStream
<StreamType
>::CanDecodeMore() const {
203 DCHECK(task_runner_
->BelongsToCurrentThread());
205 // Limit total number of outputs stored in |ready_outputs_| and being decoded.
206 // It only makes sense to saturate decoder completely when output queue is
209 static_cast<int>(ready_outputs_
.size()) + pending_decode_requests_
;
210 return !decoding_eos_
&& num_decodes
< GetMaxDecodeRequests();
// Asks |decoder_selector_| to choose a decoder for |stream_|. The selector is
// given |set_decryptor_ready_cb|, weak-bound OnDecoderSelected() and
// OnDecodeOutputReady() callbacks, and |waiting_for_decryption_key_cb_|.
// Binding through |weak_factory_| means the callbacks become no-ops once this
// object is destroyed.
213 template <DemuxerStream::Type StreamType
>
214 void DecoderStream
<StreamType
>::SelectDecoder(
215 const SetDecryptorReadyCB
& set_decryptor_ready_cb
) {
216 decoder_selector_
->SelectDecoder(
217 stream_
, set_decryptor_ready_cb
,
218 base::Bind(&DecoderStream
<StreamType
>::OnDecoderSelected
,
219 weak_factory_
.GetWeakPtr()),
220 base::Bind(&DecoderStream
<StreamType
>::OnDecodeOutputReady
,
221 weak_factory_
.GetWeakPtr()),
222 waiting_for_decryption_key_cb_
);
// Callback from the decoder selector (bound in SelectDecoder()). Runs on
// |task_runner_|'s thread while in STATE_INITIALIZING or
// STATE_REINITIALIZING_DECODER. Adopts |selected_decoder| (keeping the old one
// in |previous_decoder_|) and, if provided, |decrypting_demuxer_stream|, which
// then also becomes |stream_|. Depending on the state, the outcome is reported
// through |init_cb_| or CompleteDecoderReinitialization().
225 template <DemuxerStream::Type StreamType
>
226 void DecoderStream
<StreamType
>::OnDecoderSelected(
227 scoped_ptr
<Decoder
> selected_decoder
,
228 scoped_ptr
<DecryptingDemuxerStream
> decrypting_demuxer_stream
) {
229 FUNCTION_DVLOG(2) << ": "
230 << (selected_decoder
? selected_decoder
->GetDisplayName()
231 : "No decoder selected.");
232 DCHECK(task_runner_
->BelongsToCurrentThread());
233 DCHECK(state_
== STATE_INITIALIZING
|| state_
== STATE_REINITIALIZING_DECODER
)
// During first-time initialization only the init callback may be pending.
235 if (state_
== STATE_INITIALIZING
) {
236 DCHECK(!init_cb_
.is_null());
237 DCHECK(read_cb_
.is_null());
238 DCHECK(reset_cb_
.is_null());
// Keep the outgoing decoder in |previous_decoder_| and switch to the new one.
243 previous_decoder_
= decoder_
.Pass();
244 decoder_
= selected_decoder
.Pass();
245 if (decrypting_demuxer_stream
) {
246 decrypting_demuxer_stream_
= decrypting_demuxer_stream
.Pass();
247 stream_
= decrypting_demuxer_stream_
.get();
// Failure path.
// NOTE(review): the condition guarding this path is not visible here;
// presumably it is taken when no decoder was selected -- confirm.
251 if (state_
== STATE_INITIALIZING
) {
252 state_
= STATE_UNINITIALIZED
;
253 MEDIA_LOG(ERROR
, media_log_
) << GetStreamTypeString()
254 << " decoder initialization failed";
255 base::ResetAndReturn(&init_cb_
).Run(false);
257 CompleteDecoderReinitialization(false);
// Record in the media log whether a DecryptingDemuxerStream is in use and
// which decoder was chosen.
262 media_log_
->SetBooleanProperty(GetStreamTypeString() + "_dds",
263 decrypting_demuxer_stream_
);
264 media_log_
->SetStringProperty(GetStreamTypeString() + "_decoder",
265 decoder_
->GetDisplayName());
267 if (state_
== STATE_REINITIALIZING_DECODER
) {
268 CompleteDecoderReinitialization(true);
272 // Initialization succeeded.
273 state_
= STATE_NORMAL
;
274 if (StreamTraits::NeedsBitstreamConversion(decoder_
.get()))
275 stream_
->EnableBitstreamConverter();
276 base::ResetAndReturn(&init_cb_
).Run(true);
// Completes the pending read: clears |read_cb_| and runs it synchronously with
// |status| and |output|. A read must be pending when this is called.
279 template <DemuxerStream::Type StreamType
>
280 void DecoderStream
<StreamType
>::SatisfyRead(
282 const scoped_refptr
<Output
>& output
) {
283 DCHECK(!read_cb_
.is_null());
284 base::ResetAndReturn(&read_cb_
).Run(status
, output
);
// Sends |buffer| (non-null) to the decoder. Requires STATE_NORMAL or
// STATE_FLUSHING_DECODER, no pending reset, and fewer than
// GetMaxDecodeRequests() decodes in flight. Opens an async trace event, notes
// when an end-of-stream buffer is being decoded, bumps the pending-request
// count, and binds OnDecodeDone() (through a weak pointer) as the completion
// callback.
287 template <DemuxerStream::Type StreamType
>
288 void DecoderStream
<StreamType
>::Decode(
289 const scoped_refptr
<DecoderBuffer
>& buffer
) {
291 DCHECK(state_
== STATE_NORMAL
|| state_
== STATE_FLUSHING_DECODER
) << state_
;
292 DCHECK_LT(pending_decode_requests_
, GetMaxDecodeRequests());
293 DCHECK(reset_cb_
.is_null());
294 DCHECK(buffer
.get());
// End-of-stream buffers count as zero bytes.
296 int buffer_size
= buffer
->end_of_stream() ? 0 : buffer
->data_size();
298 TRACE_EVENT_ASYNC_BEGIN2(
299 "media", GetTraceString
<StreamType
>(), this, "key frame",
300 !buffer
->end_of_stream() && buffer
->is_key_frame(), "timestamp (ms)",
301 buffer
->timestamp().InMilliseconds());
303 if (buffer
->end_of_stream())
304 decoding_eos_
= true;
306 ++pending_decode_requests_
;
307 decoder_
->Decode(buffer
,
308 base::Bind(&DecoderStream
<StreamType
>::OnDecodeDone
,
309 weak_factory_
.GetWeakPtr(),
311 buffer
->end_of_stream()));
// Submits an end-of-stream buffer to the decoder via Decode().
314 template <DemuxerStream::Type StreamType
>
315 void DecoderStream
<StreamType
>::FlushDecoder() {
316 Decode(DecoderBuffer::CreateEOSBuffer());
// Completion callback for a Decode() request (bound in Decode()). Decrements
// the pending-request count and closes the async trace event opened by
// Decode(). On a decode error the stream enters STATE_ERROR, queued outputs
// are dropped and any pending read fails with DECODE_ERROR. Otherwise
// statistics are reported for |buffer_size| and, in STATE_NORMAL, the stream
// may move to STATE_END_OF_STREAM or read more from the demuxer. Once a flush
// has no requests left in flight, decoder reinitialization is started.
// NOTE(review): several guards (the end-of-stream checks, the switch on
// |status| and the early returns) are not visible here -- confirm.
319 template <DemuxerStream::Type StreamType
>
320 void DecoderStream
<StreamType
>::OnDecodeDone(int buffer_size
,
322 typename
Decoder::Status status
) {
323 FUNCTION_DVLOG(2) << ": " << status
;
324 DCHECK(state_
== STATE_NORMAL
|| state_
== STATE_FLUSHING_DECODER
||
325 state_
== STATE_PENDING_DEMUXER_READ
|| state_
== STATE_ERROR
)
327 DCHECK_GT(pending_decode_requests_
, 0);
329 --pending_decode_requests_
;
331 TRACE_EVENT_ASYNC_END0("media", GetTraceString
<StreamType
>(), this);
// Presumably reached only when the completed buffer was end-of-stream: no
// other decode may still be pending, and EOS decoding is over.
334 DCHECK(!pending_decode_requests_
);
335 decoding_eos_
= false;
338 if (state_
== STATE_ERROR
) {
339 DCHECK(read_cb_
.is_null());
343 // Drop decoding result if Reset() was called during decoding.
344 // The resetting process will be handled when the decoder is reset.
345 if (!reset_cb_
.is_null())
349 case Decoder::kDecodeError
:
350 state_
= STATE_ERROR
;
351 MEDIA_LOG(ERROR
, media_log_
) << GetStreamTypeString() << " decode error";
352 ready_outputs_
.clear();
353 if (!read_cb_
.is_null())
354 SatisfyRead(DECODE_ERROR
, NULL
);
357 case Decoder::kAborted
:
358 // Decoder can return kAborted during Reset() or during destruction.
362 // Any successful decode counts!
364 StreamTraits::ReportStatistics(statistics_cb_
, buffer_size
);
366 if (state_
== STATE_NORMAL
) {
368 state_
= STATE_END_OF_STREAM
;
// A pending read with nothing queued is answered with an EOS output.
369 if (ready_outputs_
.empty() && !read_cb_
.is_null())
370 SatisfyRead(OK
, StreamTraits::CreateEOSOutput());
375 ReadFromDemuxerStream();
// The flush is complete once no decode requests remain in flight.
379 if (state_
== STATE_FLUSHING_DECODER
&& !pending_decode_requests_
)
380 ReinitializeDecoder();
// Output callback handed to the decoder (bound in SelectDecoder() and
// ReinitializeDecoder()). |output| must be non-null. If a read is pending it
// is satisfied immediately with |output|; otherwise the output is appended to
// |ready_outputs_| for a later Read().
// NOTE(review): the early returns after the STATE_ERROR and pending-reset
// checks are not visible here -- confirm.
385 template <DemuxerStream::Type StreamType
>
386 void DecoderStream
<StreamType
>::OnDecodeOutputReady(
387 const scoped_refptr
<Output
>& output
) {
388 FUNCTION_DVLOG(2) << ": " << output
->timestamp().InMilliseconds() << " ms";
389 DCHECK(output
.get());
390 DCHECK(state_
== STATE_NORMAL
|| state_
== STATE_FLUSHING_DECODER
||
391 state_
== STATE_PENDING_DEMUXER_READ
|| state_
== STATE_ERROR
)
394 if (state_
== STATE_ERROR
) {
395 DCHECK(read_cb_
.is_null());
399 // Drop decoding result if Reset() was called during decoding.
400 // The resetting process will be handled when the decoder is reset.
401 if (!reset_cb_
.is_null())
404 if (!read_cb_
.is_null()) {
405 // If |ready_outputs_| was non-empty, the read would have already been
406 // satisfied by Read().
407 DCHECK(ready_outputs_
.empty());
408 SatisfyRead(OK
, output
);
412 // Store decoded output.
413 ready_outputs_
.push_back(output
);
// Requests the next buffer from |stream_|. Requires STATE_NORMAL, room for
// another decode (CanDecodeMore()) and no pending reset. Moves to
// STATE_PENDING_DEMUXER_READ; the result is delivered to OnBufferReady()
// through a weak pointer.
416 template <DemuxerStream::Type StreamType
>
417 void DecoderStream
<StreamType
>::ReadFromDemuxerStream() {
419 DCHECK_EQ(state_
, STATE_NORMAL
);
420 DCHECK(CanDecodeMore());
421 DCHECK(reset_cb_
.is_null());
423 state_
= STATE_PENDING_DEMUXER_READ
;
424 stream_
->Read(base::Bind(&DecoderStream
<StreamType
>::OnBufferReady
,
425 weak_factory_
.GetWeakPtr()));
// Callback for the demuxer read started in ReadFromDemuxerStream(). Runs on
// |task_runner_|'s thread. |buffer| is non-null exactly when |status| is kOk.
// Handles, in order: a stream already in STATE_ERROR, a config change (the
// observer is notified and the stream enters STATE_FLUSHING_DECODER), a reset
// requested while the read was pending, an aborted read (a pending read is
// completed with DEMUXER_READ_ABORTED), and finally a normal buffer, for which
// the splice observer is notified when relevant.
// NOTE(review): the early returns, the non-reset branch of the config-change
// path and the call that forwards |buffer| to the decoder are not visible
// here -- confirm.
428 template <DemuxerStream::Type StreamType
>
429 void DecoderStream
<StreamType
>::OnBufferReady(
430 DemuxerStream::Status status
,
431 const scoped_refptr
<DecoderBuffer
>& buffer
) {
432 FUNCTION_DVLOG(2) << ": " << status
<< ", "
433 << (buffer
.get() ? buffer
->AsHumanReadableString()
436 DCHECK(task_runner_
->BelongsToCurrentThread());
437 DCHECK(state_
== STATE_PENDING_DEMUXER_READ
|| state_
== STATE_ERROR
)
439 DCHECK_EQ(buffer
.get() != NULL
, status
== DemuxerStream::kOk
) << status
;
441 // Decoding has been stopped (e.g due to an error).
442 if (state_
!= STATE_PENDING_DEMUXER_READ
) {
443 DCHECK(state_
== STATE_ERROR
);
444 DCHECK(read_cb_
.is_null());
448 state_
= STATE_NORMAL
;
450 if (status
== DemuxerStream::kConfigChanged
) {
451 FUNCTION_DVLOG(2) << ": " << "ConfigChanged";
452 DCHECK(stream_
->SupportsConfigChanges());
454 if (!config_change_observer_cb_
.is_null())
455 config_change_observer_cb_
.Run();
457 state_
= STATE_FLUSHING_DECODER
;
458 if (!reset_cb_
.is_null()) {
459 // If we are using DecryptingDemuxerStream, we already called DDS::Reset()
460 // which will continue the resetting process in its callback.
461 if (!decrypting_demuxer_stream_
)
462 Reset(base::ResetAndReturn(&reset_cb_
));
463 // Reinitialization will continue after Reset() is done.
470 if (!reset_cb_
.is_null()) {
471 // If we are using DecryptingDemuxerStream, we already called DDS::Reset()
472 // which will continue the resetting process in its callback.
473 if (!decrypting_demuxer_stream_
)
474 Reset(base::ResetAndReturn(&reset_cb_
));
478 if (status
== DemuxerStream::kAborted
) {
479 if (!read_cb_
.is_null())
480 SatisfyRead(DEMUXER_READ_ABORTED
, NULL
);
// Notify the splice observer when a splice starts, continues or ends;
// |active_splice_| remembers whether the last buffer carried a splice
// timestamp.
484 if (!splice_observer_cb_
.is_null() && !buffer
->end_of_stream()) {
485 const bool has_splice_ts
= buffer
->splice_timestamp() != kNoTimestamp();
486 if (active_splice_
|| has_splice_ts
) {
487 splice_observer_cb_
.Run(buffer
->splice_timestamp());
488 active_splice_
= has_splice_ts
;
492 DCHECK(status
== DemuxerStream::kOk
) << status
;
495 // Read more data if the decoder supports multiple parallel decoding requests.
497 ReadFromDemuxerStream();
// Reinitializes the current decoder for |stream_|. Must be called on
// |task_runner_|'s thread in STATE_FLUSHING_DECODER with no decode requests in
// flight. Moves to STATE_REINITIALIZING_DECODER and asks
// DecoderStreamTraits to initialize |decoder_|, passing weak-bound
// OnDecoderReinitialized() and OnDecodeOutputReady() callbacks.
500 template <DemuxerStream::Type StreamType
>
501 void DecoderStream
<StreamType
>::ReinitializeDecoder() {
503 DCHECK(task_runner_
->BelongsToCurrentThread());
504 DCHECK_EQ(state_
, STATE_FLUSHING_DECODER
);
505 DCHECK_EQ(pending_decode_requests_
, 0);
507 state_
= STATE_REINITIALIZING_DECODER
;
508 DecoderStreamTraits
<StreamType
>::InitializeDecoder(
509 decoder_
.get(), stream_
,
510 base::Bind(&DecoderStream
<StreamType
>::OnDecoderReinitialized
,
511 weak_factory_
.GetWeakPtr()),
512 base::Bind(&DecoderStream
<StreamType
>::OnDecodeOutputReady
,
513 weak_factory_
.GetWeakPtr()));
// Callback for the decoder initialization started in ReinitializeDecoder().
// Runs on |task_runner_|'s thread in STATE_REINITIALIZING_DECODER. On failure
// it falls back to selecting another decoder; on success it finishes through
// CompleteDecoderReinitialization(true).
// NOTE(review): the test of |success| that separates the two paths below is
// not visible here -- confirm.
516 template <DemuxerStream::Type StreamType
>
517 void DecoderStream
<StreamType
>::OnDecoderReinitialized(bool success
) {
519 DCHECK(task_runner_
->BelongsToCurrentThread());
520 DCHECK_EQ(state_
, STATE_REINITIALIZING_DECODER
);
522 // ReinitializeDecoder() can be called in two cases:
523 // 1, Flushing decoder finished (see OnDecodeDone()).
524 // 2, Reset() was called during flushing decoder (see OnDecoderReset()).
525 // Also, Reset() can be called during pending ReinitializeDecoder().
526 // This function needs to handle them all!
529 // Reinitialization failed. Try to fall back to one of the remaining
530 // decoders. This will consume at least one decoder so doing it more than
532 // For simplicity, don't attempt to fall back to a decryptor. Calling this
533 // with a null callback ensures that one won't be selected.
534 SelectDecoder(SetDecryptorReadyCB());
536 CompleteDecoderReinitialization(true);
// Finishes a decoder reinitialization. Must be called on |task_runner_|'s
// thread in STATE_REINITIALIZING_DECODER. The stream moves to STATE_NORMAL on
// |success| and to STATE_ERROR otherwise. A pending reset closure is run; a
// pending read is either failed with DECODE_ERROR (error state) or resumed by
// reading from the demuxer.
// NOTE(review): the early returns between the steps below are not visible
// here -- confirm.
540 template <DemuxerStream::Type StreamType
>
541 void DecoderStream
<StreamType
>::CompleteDecoderReinitialization(bool success
) {
543 DCHECK(task_runner_
->BelongsToCurrentThread());
544 DCHECK_EQ(state_
, STATE_REINITIALIZING_DECODER
);
546 state_
= success
? STATE_NORMAL
: STATE_ERROR
;
548 if (!reset_cb_
.is_null()) {
549 base::ResetAndReturn(&reset_cb_
).Run();
553 if (read_cb_
.is_null())
556 if (state_
== STATE_ERROR
) {
557 MEDIA_LOG(ERROR
, media_log_
) << GetStreamTypeString()
558 << " decoder reinitialization failed";
559 SatisfyRead(DECODE_ERROR
, NULL
);
563 ReadFromDemuxerStream();
// Resets the decoder as part of a Reset() in progress (|reset_cb_| must be
// set). Must be called on |task_runner_|'s thread in STATE_NORMAL,
// STATE_FLUSHING_DECODER, STATE_ERROR or STATE_END_OF_STREAM. Completion is
// delivered to OnDecoderReset() through a weak pointer.
566 template <DemuxerStream::Type StreamType
>
567 void DecoderStream
<StreamType
>::ResetDecoder() {
569 DCHECK(task_runner_
->BelongsToCurrentThread());
570 DCHECK(state_
== STATE_NORMAL
|| state_
== STATE_FLUSHING_DECODER
||
571 state_
== STATE_ERROR
|| state_
== STATE_END_OF_STREAM
) << state_
;
572 DCHECK(!reset_cb_
.is_null());
574 decoder_
->Reset(base::Bind(&DecoderStream
<StreamType
>::OnDecoderReset
,
575 weak_factory_
.GetWeakPtr()));
// Callback for the decoder reset started in ResetDecoder(). Runs on
// |task_runner_|'s thread with a reset pending and no read pending. Unless a
// flush was in progress, the stream returns to STATE_NORMAL, clears the
// active-splice flag and runs the reset closure. If the decoder was being
// flushed, the reset instead continues with ReinitializeDecoder().
// NOTE(review): the early return that separates the two paths is not visible
// here -- confirm.
578 template <DemuxerStream::Type StreamType
>
579 void DecoderStream
<StreamType
>::OnDecoderReset() {
581 DCHECK(task_runner_
->BelongsToCurrentThread());
582 DCHECK(state_
== STATE_NORMAL
|| state_
== STATE_FLUSHING_DECODER
||
583 state_
== STATE_ERROR
|| state_
== STATE_END_OF_STREAM
) << state_
;
584 // If Reset() was called during pending read, read callback should be fired
585 // before the reset callback is fired.
586 DCHECK(read_cb_
.is_null());
587 DCHECK(!reset_cb_
.is_null());
589 if (state_
!= STATE_FLUSHING_DECODER
) {
590 state_
= STATE_NORMAL
;
591 active_splice_
= false;
592 base::ResetAndReturn(&reset_cb_
).Run();
596 // The resetting process will be continued in OnDecoderReinitialized().
597 ReinitializeDecoder();
// Explicit instantiations: the member definitions above live in this file, so
// the VIDEO and AUDIO versions of DecoderStream are instantiated here.
600 template class DecoderStream
<DemuxerStream::VIDEO
>;
601 template class DecoderStream
<DemuxerStream::AUDIO
>;