1 // Copyright 2014 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
5 #include "media/filters/decoder_stream.h"
8 #include "base/callback_helpers.h"
9 #include "base/location.h"
10 #include "base/logging.h"
11 #include "base/single_thread_task_runner.h"
12 #include "base/trace_event/trace_event.h"
13 #include "media/base/bind_to_current_loop.h"
14 #include "media/base/decoder_buffer.h"
15 #include "media/base/media_log.h"
16 #include "media/base/video_decoder.h"
17 #include "media/base/video_frame.h"
18 #include "media/filters/decrypting_demuxer_stream.h"
// Declares a per-stream-type helper that returns a C string. The VIDEO and
// AUDIO specializations in this file return the literals that Decode() and
// OnDecodeDone() pass as the trace event name.
22 // TODO(rileya): Devise a better way of specifying trace/UMA/etc strings for
23 // templated classes such as this.
24 template <DemuxerStream::Type StreamType
>
25 static const char* GetTraceString();
// Wraps DVLOG(level) and prefixes each message with the current function name
// followed by the GetStreamTypeString() result in angle brackets. Callers
// append the rest of the message with operator<<.
27 #define FUNCTION_DVLOG(level) \
28 DVLOG(level) << __FUNCTION__ << "<" << GetStreamTypeString() << ">"
// VIDEO version of GetTraceString(): returns the fixed trace event name used
// for video decodes.
31 const char* GetTraceString
<DemuxerStream::VIDEO
>() {
32 return "DecoderStream<VIDEO>::Decode";
// AUDIO version of GetTraceString(): returns the fixed trace event name used
// for audio decodes.
36 const char* GetTraceString
<DemuxerStream::AUDIO
>() {
37 return "DecoderStream<AUDIO>::Decode";
// Constructor. The visible initializers keep references to |task_runner| and
// |media_log|, start the state machine in STATE_UNINITIALIZED, allocate a
// DecoderSelector<StreamType> (its first argument is |task_runner|), and clear
// |active_splice_| and |pending_decode_requests_|.
// NOTE(review): several initializers and the constructor body are not visible
// in this extract; how |decoders| is consumed cannot be confirmed from here.
40 template <DemuxerStream::Type StreamType
>
41 DecoderStream
<StreamType
>::DecoderStream(
42 const scoped_refptr
<base::SingleThreadTaskRunner
>& task_runner
,
43 ScopedVector
<Decoder
> decoders
,
44 const scoped_refptr
<MediaLog
>& media_log
)
45 : task_runner_(task_runner
),
46 media_log_(media_log
),
47 state_(STATE_UNINITIALIZED
),
49 decoder_selector_(new DecoderSelector
<StreamType
>(task_runner
,
52 active_splice_(false),
54 pending_decode_requests_(0),
// Destructor. DCHECKs that it runs on |task_runner_|'s thread, releases the
// decoder selector, and then hands every callback that is still pending to
// |task_runner_| via PostTask so it is run rather than dropped:
//   - |init_cb_| is posted with false,
//   - |read_cb_| is posted with ABORTED and a null output,
//   - |reset_cb_| is posted as-is.
// Each callback member is cleared by base::ResetAndReturn() as it is posted.
// |decrypting_demuxer_stream_| is released last among the visible statements.
58 template <DemuxerStream::Type StreamType
>
59 DecoderStream
<StreamType
>::~DecoderStream() {
61 DCHECK(task_runner_
->BelongsToCurrentThread());
63 decoder_selector_
.reset();
// Fail a pending initialization.
65 if (!init_cb_
.is_null()) {
66 task_runner_
->PostTask(FROM_HERE
,
67 base::Bind(base::ResetAndReturn(&init_cb_
), false));
// Abort a pending read.
69 if (!read_cb_
.is_null()) {
70 task_runner_
->PostTask(FROM_HERE
, base::Bind(
71 base::ResetAndReturn(&read_cb_
), ABORTED
, scoped_refptr
<Output
>()));
// Complete a pending reset.
73 if (!reset_cb_
.is_null())
74 task_runner_
->PostTask(FROM_HERE
, base::ResetAndReturn(&reset_cb_
));
78 decrypting_demuxer_stream_
.reset();
// Returns the name of this stream type as reported by
// DecoderStreamTraits<StreamType>::ToString(). Used by FUNCTION_DVLOG and by
// the media log messages in this file.
81 template <DemuxerStream::Type StreamType
>
82 std::string DecoderStream
<StreamType
>::GetStreamTypeString() {
83 return DecoderStreamTraits
<StreamType
>::ToString();
// Begins initialization of the stream.
//
// Preconditions (all DCHECKed): called on |task_runner_|'s thread, |state_| is
// STATE_UNINITIALIZED, no initialization is already pending (|init_cb_| is
// null), and |init_cb| is non-null.
//
// The visible statements store |statistics_cb| and
// |waiting_for_decryption_key_cb|, move to STATE_INITIALIZING, and start
// decoder selection with |set_decryptor_ready_cb|. The result is reported
// later through |init_cb_| in OnDecoderSelected().
// NOTE(review): the statements that store |stream| and |init_cb| are not
// visible in this extract — confirm against the full file.
86 template <DemuxerStream::Type StreamType
>
87 void DecoderStream
<StreamType
>::Initialize(
88 DemuxerStream
* stream
,
89 const InitCB
& init_cb
,
90 const SetDecryptorReadyCB
& set_decryptor_ready_cb
,
91 const StatisticsCB
& statistics_cb
,
92 const base::Closure
& waiting_for_decryption_key_cb
) {
94 DCHECK(task_runner_
->BelongsToCurrentThread());
95 DCHECK_EQ(state_
, STATE_UNINITIALIZED
);
96 DCHECK(init_cb_
.is_null());
97 DCHECK(!init_cb
.is_null());
99 statistics_cb_
= statistics_cb
;
101 waiting_for_decryption_key_cb_
= waiting_for_decryption_key_cb
;
104 state_
= STATE_INITIALIZING
;
105 SelectDecoder(set_decryptor_ready_cb
);
// Requests one decoded output; |read_cb| receives a status and the output.
//
// Preconditions (DCHECKed): called on |task_runner_|'s thread, the stream is
// past initialization, and neither a read nor a reset is already pending.
//
// Cases handled by the visible code, in order:
//   - STATE_ERROR: posts |read_cb| with DECODE_ERROR and a null output.
//   - STATE_END_OF_STREAM with no queued outputs: posts |read_cb| with OK and
//     an end-of-stream output from StreamTraits::CreateEOSOutput().
//   - |ready_outputs_| non-empty: posts |read_cb| with OK and the front
//     output, then pops it from the queue.
//   - STATE_NORMAL and CanDecodeMore(): starts a demuxer read.
// Replies are always posted to |task_runner_| rather than run inline.
// NOTE(review): the early returns and the assignment of |read_cb_| are not
// visible in this extract.
108 template <DemuxerStream::Type StreamType
>
109 void DecoderStream
<StreamType
>::Read(const ReadCB
& read_cb
) {
111 DCHECK(task_runner_
->BelongsToCurrentThread());
112 DCHECK(state_
!= STATE_UNINITIALIZED
&& state_
!= STATE_INITIALIZING
)
114 // No two reads in flight at any time.
115 DCHECK(read_cb_
.is_null());
116 // No read during resetting or stopping process.
117 DCHECK(reset_cb_
.is_null());
119 if (state_
== STATE_ERROR
) {
120 task_runner_
->PostTask(
121 FROM_HERE
, base::Bind(read_cb
, DECODE_ERROR
, scoped_refptr
<Output
>()));
125 if (state_
== STATE_END_OF_STREAM
&& ready_outputs_
.empty()) {
126 task_runner_
->PostTask(
127 FROM_HERE
, base::Bind(read_cb
, OK
, StreamTraits::CreateEOSOutput()));
131 if (!ready_outputs_
.empty()) {
132 task_runner_
->PostTask(FROM_HERE
,
133 base::Bind(read_cb
, OK
, ready_outputs_
.front()));
134 ready_outputs_
.pop_front();
139 if (state_
== STATE_NORMAL
&& CanDecodeMore())
140 ReadFromDemuxerStream();
// Resets the stream; |closure| is the completion callback.
//
// Preconditions (DCHECKed): called on |task_runner_|'s thread, the stream is
// not STATE_UNINITIALIZED, and no reset is already pending.
//
// Visible behavior: a pending read is aborted by posting |read_cb_| with
// ABORTED and a null output, and all queued outputs are discarded. When a
// DecryptingDemuxerStream is in use it is reset first, with ResetDecoder()
// bound (through a weak pointer) as its completion callback.
// NOTE(review): the assignment of |reset_cb_|, the bodies of the two state
// checks below, and the path taken without a DecryptingDemuxerStream are not
// visible in this extract.
143 template <DemuxerStream::Type StreamType
>
144 void DecoderStream
<StreamType
>::Reset(const base::Closure
& closure
) {
146 DCHECK(task_runner_
->BelongsToCurrentThread());
147 DCHECK_NE(state_
, STATE_UNINITIALIZED
);
148 DCHECK(reset_cb_
.is_null());
152 if (!read_cb_
.is_null()) {
153 task_runner_
->PostTask(FROM_HERE
, base::Bind(
154 base::ResetAndReturn(&read_cb_
), ABORTED
, scoped_refptr
<Output
>()));
157 ready_outputs_
.clear();
159 // During decoder reinitialization, the Decoder does not need to be and
160 // cannot be Reset(). |decrypting_demuxer_stream_| was reset before decoder
162 if (state_
== STATE_REINITIALIZING_DECODER
)
165 // During pending demuxer read and when not using DecryptingDemuxerStream,
166 // the Decoder will be reset after demuxer read is returned
167 // (in OnBufferReady()).
168 if (state_
== STATE_PENDING_DEMUXER_READ
&& !decrypting_demuxer_stream_
)
171 if (decrypting_demuxer_stream_
) {
172 decrypting_demuxer_stream_
->Reset(base::Bind(
173 &DecoderStream
<StreamType
>::ResetDecoder
, weak_factory_
.GetWeakPtr()));
// Generic version. Returns true when at least one decoded output is already
// queued in |ready_outputs_|; otherwise defers to the decoder's own
// CanReadWithoutStalling(). Must be called on |task_runner_|'s thread.
180 template <DemuxerStream::Type StreamType
>
181 bool DecoderStream
<StreamType
>::CanReadWithoutStalling() const {
182 DCHECK(task_runner_
->BelongsToCurrentThread());
183 return !ready_outputs_
.empty() || decoder_
->CanReadWithoutStalling();
// AUDIO specialization of CanReadWithoutStalling(). Only the thread check is
// visible here.
// NOTE(review): the return statement is missing from this extract, so the
// value returned for audio cannot be stated — confirm against the full file.
187 bool DecoderStream
<DemuxerStream::AUDIO
>::CanReadWithoutStalling() const {
188 DCHECK(task_runner_
->BelongsToCurrentThread());
// Generic version. Forwards to the current decoder's GetMaxDecodeRequests();
// CanDecodeMore() uses the result as its upper bound.
192 template <DemuxerStream::Type StreamType
>
193 int DecoderStream
<StreamType
>::GetMaxDecodeRequests() const {
194 return decoder_
->GetMaxDecodeRequests();
// AUDIO specialization of GetMaxDecodeRequests().
// NOTE(review): the body is missing from this extract, so the limit used for
// audio cannot be stated — confirm against the full file.
198 int DecoderStream
<DemuxerStream::AUDIO
>::GetMaxDecodeRequests() const {
// Reports whether another decode may be started. Returns true only when no
// end-of-stream buffer is currently being decoded (|decoding_eos_| is false)
// and the number of queued outputs plus in-flight decode requests is below
// GetMaxDecodeRequests(). Must be called on |task_runner_|'s thread.
// NOTE(review): the declaration line of |num_decodes| is missing from this
// extract; only its initializer expression is visible.
202 template <DemuxerStream::Type StreamType
>
203 bool DecoderStream
<StreamType
>::CanDecodeMore() const {
204 DCHECK(task_runner_
->BelongsToCurrentThread());
206 // Limit total number of outputs stored in |ready_outputs_| and being decoded.
207 // It only makes sense to saturate decoder completely when output queue is
210 static_cast<int>(ready_outputs_
.size()) + pending_decode_requests_
;
211 return !decoding_eos_
&& num_decodes
< GetMaxDecodeRequests();
// Asks |decoder_selector_| to pick a decoder for |stream_|.
//
// Arguments passed to the selector, in order: the stream, the caller's
// |set_decryptor_ready_cb|, OnDecoderSelected() as the completion callback,
// OnDecodeOutputReady() as the output callback, and the stored
// |waiting_for_decryption_key_cb_|. Both member callbacks are bound through
// |weak_factory_| weak pointers.
214 template <DemuxerStream::Type StreamType
>
215 void DecoderStream
<StreamType
>::SelectDecoder(
216 const SetDecryptorReadyCB
& set_decryptor_ready_cb
) {
217 decoder_selector_
->SelectDecoder(
218 stream_
, set_decryptor_ready_cb
,
219 base::Bind(&DecoderStream
<StreamType
>::OnDecoderSelected
,
220 weak_factory_
.GetWeakPtr()),
221 base::Bind(&DecoderStream
<StreamType
>::OnDecodeOutputReady
,
222 weak_factory_
.GetWeakPtr()),
223 waiting_for_decryption_key_cb_
);
// Completion callback for SelectDecoder(). Runs in STATE_INITIALIZING or
// STATE_REINITIALIZING_DECODER (DCHECKed), on |task_runner_|'s thread.
//
// The current decoder is moved into |previous_decoder_| and
// |selected_decoder| becomes |decoder_|. If a DecryptingDemuxerStream was
// supplied, it is adopted and |stream_| is redirected to it.
//
// Failure path: during initial setup the state returns to
// STATE_UNINITIALIZED, an error is logged and |init_cb_| is run with false;
// otherwise CompleteDecoderReinitialization(false) is called.
// Success path: the "_dds" and "_decoder" media log properties are recorded;
// during reinitialization CompleteDecoderReinitialization(true) is called,
// otherwise the state becomes STATE_NORMAL, the bitstream converter is
// enabled if StreamTraits requires it, and |init_cb_| is run with true.
// NOTE(review): the condition guarding the failure path (presumably "no
// decoder selected") and the returns/else lines separating the paths are not
// visible in this extract.
226 template <DemuxerStream::Type StreamType
>
227 void DecoderStream
<StreamType
>::OnDecoderSelected(
228 scoped_ptr
<Decoder
> selected_decoder
,
229 scoped_ptr
<DecryptingDemuxerStream
> decrypting_demuxer_stream
) {
230 FUNCTION_DVLOG(2) << ": "
231 << (selected_decoder
? selected_decoder
->GetDisplayName()
232 : "No decoder selected.");
233 DCHECK(task_runner_
->BelongsToCurrentThread());
234 DCHECK(state_
== STATE_INITIALIZING
|| state_
== STATE_REINITIALIZING_DECODER
)
236 if (state_
== STATE_INITIALIZING
) {
237 DCHECK(!init_cb_
.is_null());
238 DCHECK(read_cb_
.is_null());
239 DCHECK(reset_cb_
.is_null());
// Keep the old decoder in |previous_decoder_| while switching to the new one.
244 previous_decoder_
= decoder_
.Pass();
245 decoder_
= selected_decoder
.Pass();
246 if (decrypting_demuxer_stream
) {
247 decrypting_demuxer_stream_
= decrypting_demuxer_stream
.Pass();
248 stream_
= decrypting_demuxer_stream_
.get();
252 if (state_
== STATE_INITIALIZING
) {
253 state_
= STATE_UNINITIALIZED
;
254 MEDIA_LOG(ERROR
, media_log_
) << GetStreamTypeString()
255 << " decoder initialization failed";
256 base::ResetAndReturn(&init_cb_
).Run(false);
258 CompleteDecoderReinitialization(false);
263 media_log_
->SetBooleanProperty(GetStreamTypeString() + "_dds",
264 decrypting_demuxer_stream_
);
265 media_log_
->SetStringProperty(GetStreamTypeString() + "_decoder",
266 decoder_
->GetDisplayName());
268 if (state_
== STATE_REINITIALIZING_DECODER
) {
269 CompleteDecoderReinitialization(true);
273 // Initialization succeeded.
274 state_
= STATE_NORMAL
;
275 if (StreamTraits::NeedsBitstreamConversion(decoder_
.get()))
276 stream_
->EnableBitstreamConverter();
277 base::ResetAndReturn(&init_cb_
).Run(true);
// Completes the pending read: clears |read_cb_| and runs it synchronously
// with |status| and |output|. A read must be pending (DCHECKed).
// NOTE(review): the declaration of the |status| parameter is missing from
// this extract; only its use in the body is visible.
280 template <DemuxerStream::Type StreamType
>
281 void DecoderStream
<StreamType
>::SatisfyRead(
283 const scoped_refptr
<Output
>& output
) {
284 DCHECK(!read_cb_
.is_null());
285 base::ResetAndReturn(&read_cb_
).Run(status
, output
);
// Submits |buffer| to the decoder.
//
// Preconditions (DCHECKed): state is STATE_NORMAL or STATE_FLUSHING_DECODER,
// fewer than GetMaxDecodeRequests() decodes are in flight, no reset is
// pending, and |buffer| is non-null.
//
// Opens an async "media" trace event named by GetTraceString<StreamType>()
// (closed in OnDecodeDone()), sets |decoding_eos_| for an end-of-stream
// buffer, increments |pending_decode_requests_|, and calls
// Decoder::Decode() with OnDecodeDone() bound through a weak pointer.
// |buffer_size| is 0 for an end-of-stream buffer, otherwise the buffer's
// data size.
// NOTE(review): one bound argument of the OnDecodeDone callback (between the
// weak pointer and the end-of-stream flag) is missing from this extract.
288 template <DemuxerStream::Type StreamType
>
289 void DecoderStream
<StreamType
>::Decode(
290 const scoped_refptr
<DecoderBuffer
>& buffer
) {
292 DCHECK(state_
== STATE_NORMAL
|| state_
== STATE_FLUSHING_DECODER
) << state_
;
293 DCHECK_LT(pending_decode_requests_
, GetMaxDecodeRequests());
294 DCHECK(reset_cb_
.is_null());
295 DCHECK(buffer
.get());
297 int buffer_size
= buffer
->end_of_stream() ? 0 : buffer
->data_size();
// End-of-stream buffers are traced as non-key-frame with timestamp 0.
299 TRACE_EVENT_ASYNC_BEGIN2(
300 "media", GetTraceString
<StreamType
>(), this, "key frame",
301 !buffer
->end_of_stream() && buffer
->is_key_frame(), "timestamp (ms)",
302 !buffer
->end_of_stream() ? buffer
->timestamp().InMilliseconds() : 0);
304 if (buffer
->end_of_stream())
305 decoding_eos_
= true;
307 ++pending_decode_requests_
;
308 decoder_
->Decode(buffer
,
309 base::Bind(&DecoderStream
<StreamType
>::OnDecodeDone
,
310 weak_factory_
.GetWeakPtr(),
312 buffer
->end_of_stream()));
// Sends an end-of-stream buffer through Decode(). Decode()'s preconditions
// therefore apply here as well.
315 template <DemuxerStream::Type StreamType
>
316 void DecoderStream
<StreamType
>::FlushDecoder() {
317 Decode(DecoderBuffer::CreateEOSBuffer());
// Completion callback for one Decoder::Decode() call issued by Decode().
//
// Always decrements |pending_decode_requests_| and closes the async trace
// event opened in Decode(). Visible handling after that:
//   - In STATE_ERROR no read may be pending (DCHECKed).
//   - If a reset is pending, the result is dropped (see comment below).
//   - Decoder::kDecodeError: enters STATE_ERROR, logs the error, discards
//     queued outputs and fails a pending read with DECODE_ERROR.
//   - Decoder::kAborted: expected during Reset() or destruction.
//   - Success: reports statistics with |buffer_size|. In STATE_NORMAL the
//     stream either becomes STATE_END_OF_STREAM (satisfying a pending read
//     with an EOS output if nothing is queued) or reads more from the
//     demuxer. In STATE_FLUSHING_DECODER, once no decodes are in flight,
//     ReinitializeDecoder() is called.
// NOTE(review): the end-of-stream parameter, the switch/if headers and the
// return statements are missing from this extract, so the exact branch
// conditions should be confirmed against the full file.
320 template <DemuxerStream::Type StreamType
>
321 void DecoderStream
<StreamType
>::OnDecodeDone(int buffer_size
,
323 typename
Decoder::Status status
) {
324 FUNCTION_DVLOG(2) << ": " << status
;
325 DCHECK(state_
== STATE_NORMAL
|| state_
== STATE_FLUSHING_DECODER
||
326 state_
== STATE_PENDING_DEMUXER_READ
|| state_
== STATE_ERROR
)
328 DCHECK_GT(pending_decode_requests_
, 0);
330 --pending_decode_requests_
;
332 TRACE_EVENT_ASYNC_END0("media", GetTraceString
<StreamType
>(), this);
335 DCHECK(!pending_decode_requests_
);
336 decoding_eos_
= false;
339 if (state_
== STATE_ERROR
) {
340 DCHECK(read_cb_
.is_null());
344 // Drop decoding result if Reset() was called during decoding.
345 // The resetting process will be handled when the decoder is reset.
346 if (!reset_cb_
.is_null())
350 case Decoder::kDecodeError
:
351 state_
= STATE_ERROR
;
352 MEDIA_LOG(ERROR
, media_log_
) << GetStreamTypeString() << " decode error";
353 ready_outputs_
.clear();
354 if (!read_cb_
.is_null())
355 SatisfyRead(DECODE_ERROR
, NULL
);
358 case Decoder::kAborted
:
359 // Decoder can return kAborted during Reset() or during destruction.
363 // Any successful decode counts!
365 StreamTraits::ReportStatistics(statistics_cb_
, buffer_size
);
367 if (state_
== STATE_NORMAL
) {
369 state_
= STATE_END_OF_STREAM
;
370 if (ready_outputs_
.empty() && !read_cb_
.is_null())
371 SatisfyRead(OK
, StreamTraits::CreateEOSOutput());
376 ReadFromDemuxerStream();
// Flushing is complete once the last in-flight decode has returned.
380 if (state_
== STATE_FLUSHING_DECODER
&& !pending_decode_requests_
)
381 ReinitializeDecoder();
// Output callback handed to the decoder (bound in SelectDecoder() and
// ReinitializeDecoder()); receives one decoded |output|, which must be
// non-null (DCHECKed).
//
// Visible handling: in STATE_ERROR no read may be pending; with a reset
// pending the output is dropped (see comment below); if a read is pending the
// output satisfies it immediately with OK; otherwise the output is appended
// to |ready_outputs_| for a later Read().
// NOTE(review): the return statements separating these cases are missing
// from this extract.
386 template <DemuxerStream::Type StreamType
>
387 void DecoderStream
<StreamType
>::OnDecodeOutputReady(
388 const scoped_refptr
<Output
>& output
) {
389 FUNCTION_DVLOG(2) << ": " << output
->timestamp().InMilliseconds() << " ms";
390 DCHECK(output
.get());
391 DCHECK(state_
== STATE_NORMAL
|| state_
== STATE_FLUSHING_DECODER
||
392 state_
== STATE_PENDING_DEMUXER_READ
|| state_
== STATE_ERROR
)
395 if (state_
== STATE_ERROR
) {
396 DCHECK(read_cb_
.is_null());
400 // Drop decoding result if Reset() was called during decoding.
401 // The resetting process will be handled when the decoder is reset.
402 if (!reset_cb_
.is_null())
405 if (!read_cb_
.is_null()) {
406 // If |ready_outputs_| was non-empty, the read would have already been
407 // satisfied by Read().
408 DCHECK(ready_outputs_
.empty());
409 SatisfyRead(OK
, output
);
413 // Store decoded output.
414 ready_outputs_
.push_back(output
);
// Issues one read on |stream_| and moves to STATE_PENDING_DEMUXER_READ.
// The result is delivered to OnBufferReady(), bound through a weak pointer.
//
// Preconditions (DCHECKed): state is STATE_NORMAL, CanDecodeMore() is true,
// and no reset is pending.
417 template <DemuxerStream::Type StreamType
>
418 void DecoderStream
<StreamType
>::ReadFromDemuxerStream() {
420 DCHECK_EQ(state_
, STATE_NORMAL
);
421 DCHECK(CanDecodeMore());
422 DCHECK(reset_cb_
.is_null());
424 state_
= STATE_PENDING_DEMUXER_READ
;
425 stream_
->Read(base::Bind(&DecoderStream
<StreamType
>::OnBufferReady
,
426 weak_factory_
.GetWeakPtr()));
// Completion callback for the demuxer read issued by
// ReadFromDemuxerStream(). |buffer| is non-null exactly when |status| is
// DemuxerStream::kOk (DCHECKed).
//
// Visible handling:
//   - If the state is no longer STATE_PENDING_DEMUXER_READ it must be
//     STATE_ERROR with no pending read.
//   - Otherwise the state returns to STATE_NORMAL.
//   - kConfigChanged: notifies |config_change_observer_cb_| if set and moves
//     to STATE_FLUSHING_DECODER. With a reset pending and no
//     DecryptingDemuxerStream, Reset() is re-entered with the stored
//     |reset_cb_|.
//   - A pending reset (any other status) is resumed the same way.
//   - kAborted: a pending read is completed with DEMUXER_READ_ABORTED.
//   - kOk: for non-EOS buffers, |splice_observer_cb_| (if set) is run with
//     the buffer's splice timestamp when a splice is active or starting, and
//     |active_splice_| is updated; then more data is read from the demuxer.
// NOTE(review): the tail of the log expression, the Decode()/FlushDecoder()
// calls, the condition guarding the final ReadFromDemuxerStream() and all
// return statements are missing from this extract.
429 template <DemuxerStream::Type StreamType
>
430 void DecoderStream
<StreamType
>::OnBufferReady(
431 DemuxerStream::Status status
,
432 const scoped_refptr
<DecoderBuffer
>& buffer
) {
433 FUNCTION_DVLOG(2) << ": " << status
<< ", "
434 << (buffer
.get() ? buffer
->AsHumanReadableString()
437 DCHECK(task_runner_
->BelongsToCurrentThread());
438 DCHECK(state_
== STATE_PENDING_DEMUXER_READ
|| state_
== STATE_ERROR
)
440 DCHECK_EQ(buffer
.get() != NULL
, status
== DemuxerStream::kOk
) << status
;
442 // Decoding has been stopped (e.g. due to an error).
443 if (state_
!= STATE_PENDING_DEMUXER_READ
) {
444 DCHECK(state_
== STATE_ERROR
);
445 DCHECK(read_cb_
.is_null());
449 state_
= STATE_NORMAL
;
451 if (status
== DemuxerStream::kConfigChanged
) {
452 FUNCTION_DVLOG(2) << ": " << "ConfigChanged";
453 DCHECK(stream_
->SupportsConfigChanges());
455 if (!config_change_observer_cb_
.is_null())
456 config_change_observer_cb_
.Run();
458 state_
= STATE_FLUSHING_DECODER
;
459 if (!reset_cb_
.is_null()) {
460 // If we are using DecryptingDemuxerStream, we already called DDS::Reset()
461 // which will continue the resetting process in its callback.
462 if (!decrypting_demuxer_stream_
)
463 Reset(base::ResetAndReturn(&reset_cb_
));
464 // Reinitialization will continue after Reset() is done.
471 if (!reset_cb_
.is_null()) {
472 // If we are using DecryptingDemuxerStream, we already called DDS::Reset()
473 // which will continue the resetting process in its callback.
474 if (!decrypting_demuxer_stream_
)
475 Reset(base::ResetAndReturn(&reset_cb_
));
479 if (status
== DemuxerStream::kAborted
) {
480 if (!read_cb_
.is_null())
481 SatisfyRead(DEMUXER_READ_ABORTED
, NULL
);
485 if (!splice_observer_cb_
.is_null() && !buffer
->end_of_stream()) {
486 const bool has_splice_ts
= buffer
->splice_timestamp() != kNoTimestamp();
// Also notifies on the first buffer after a splice ends, since
// |active_splice_| is still true at that point.
487 if (active_splice_
|| has_splice_ts
) {
488 splice_observer_cb_
.Run(buffer
->splice_timestamp());
489 active_splice_
= has_splice_ts
;
493 DCHECK(status
== DemuxerStream::kOk
) << status
;
496 // Read more data if the decoder supports multiple parallel decoding requests.
498 ReadFromDemuxerStream();
// Re-initializes the current |decoder_| against |stream_| after a flush.
//
// Preconditions (DCHECKed): called on |task_runner_|'s thread, state is
// STATE_FLUSHING_DECODER, and no decodes are in flight.
//
// Moves to STATE_REINITIALIZING_DECODER and calls
// DecoderStreamTraits<StreamType>::InitializeDecoder() with
// OnDecoderReinitialized() as the completion callback and
// OnDecodeOutputReady() as the output callback, both bound through weak
// pointers.
501 template <DemuxerStream::Type StreamType
>
502 void DecoderStream
<StreamType
>::ReinitializeDecoder() {
504 DCHECK(task_runner_
->BelongsToCurrentThread());
505 DCHECK_EQ(state_
, STATE_FLUSHING_DECODER
);
506 DCHECK_EQ(pending_decode_requests_
, 0);
508 state_
= STATE_REINITIALIZING_DECODER
;
509 DecoderStreamTraits
<StreamType
>::InitializeDecoder(
510 decoder_
.get(), stream_
,
511 base::Bind(&DecoderStream
<StreamType
>::OnDecoderReinitialized
,
512 weak_factory_
.GetWeakPtr()),
513 base::Bind(&DecoderStream
<StreamType
>::OnDecodeOutputReady
,
514 weak_factory_
.GetWeakPtr()));
// Completion callback for ReinitializeDecoder(); |success| reports whether
// the existing decoder could be re-initialized. Must run on |task_runner_|'s
// thread in STATE_REINITIALIZING_DECODER (both DCHECKed).
//
// Visible handling: one path starts a fresh decoder selection with a null
// SetDecryptorReadyCB (the result arrives in OnDecoderSelected()); the other
// calls CompleteDecoderReinitialization(true).
// NOTE(review): the condition on |success| and the return/else between the
// two paths are missing from this extract.
517 template <DemuxerStream::Type StreamType
>
518 void DecoderStream
<StreamType
>::OnDecoderReinitialized(bool success
) {
520 DCHECK(task_runner_
->BelongsToCurrentThread());
521 DCHECK_EQ(state_
, STATE_REINITIALIZING_DECODER
);
523 // ReinitializeDecoder() can be called in two cases:
524 // 1, Flushing decoder finished (see OnDecodeDone()).
525 // 2, Reset() was called during flushing decoder (see OnDecoderReset()).
526 // Also, Reset() can be called during pending ReinitializeDecoder().
527 // This function needs to handle them all!
530 // Reinitialization failed. Try to fall back to one of the remaining
531 // decoders. This will consume at least one decoder so doing it more than
533 // For simplicity, don't attempt to fall back to a decryptor. Calling this
534 // with a null callback ensures that one won't be selected.
535 SelectDecoder(SetDecryptorReadyCB());
537 CompleteDecoderReinitialization(true);
// Finishes a decoder reinitialization. Must run on |task_runner_|'s thread
// in STATE_REINITIALIZING_DECODER (both DCHECKed).
//
// Sets the state to STATE_NORMAL when |success| is true and STATE_ERROR
// otherwise. Visible follow-up, in order:
//   - a pending reset is completed by running |reset_cb_|;
//   - nothing more is done when no read is pending;
//   - in STATE_ERROR the failure is logged and the pending read is completed
//     with DECODE_ERROR;
//   - otherwise reading from the demuxer resumes.
// NOTE(review): the return statements between these steps are missing from
// this extract.
541 template <DemuxerStream::Type StreamType
>
542 void DecoderStream
<StreamType
>::CompleteDecoderReinitialization(bool success
) {
544 DCHECK(task_runner_
->BelongsToCurrentThread());
545 DCHECK_EQ(state_
, STATE_REINITIALIZING_DECODER
);
547 state_
= success
? STATE_NORMAL
: STATE_ERROR
;
549 if (!reset_cb_
.is_null()) {
550 base::ResetAndReturn(&reset_cb_
).Run();
554 if (read_cb_
.is_null())
557 if (state_
== STATE_ERROR
) {
558 MEDIA_LOG(ERROR
, media_log_
) << GetStreamTypeString()
559 << " decoder reinitialization failed";
560 SatisfyRead(DECODE_ERROR
, NULL
);
564 ReadFromDemuxerStream();
// Resets |decoder_| as part of a Reset() in progress; OnDecoderReset() is
// bound (through a weak pointer) as the completion callback.
//
// Preconditions (DCHECKed): called on |task_runner_|'s thread, state is one
// of STATE_NORMAL, STATE_FLUSHING_DECODER, STATE_ERROR or
// STATE_END_OF_STREAM, and a reset is pending (|reset_cb_| is set).
567 template <DemuxerStream::Type StreamType
>
568 void DecoderStream
<StreamType
>::ResetDecoder() {
570 DCHECK(task_runner_
->BelongsToCurrentThread());
571 DCHECK(state_
== STATE_NORMAL
|| state_
== STATE_FLUSHING_DECODER
||
572 state_
== STATE_ERROR
|| state_
== STATE_END_OF_STREAM
) << state_
;
573 DCHECK(!reset_cb_
.is_null());
575 decoder_
->Reset(base::Bind(&DecoderStream
<StreamType
>::OnDecoderReset
,
576 weak_factory_
.GetWeakPtr()));
// Completion callback for the Decoder::Reset() issued by ResetDecoder().
//
// Preconditions (DCHECKed): same thread and state set as ResetDecoder(), no
// read pending, and a reset pending.
//
// Unless the stream was flushing the decoder, the state becomes STATE_NORMAL,
// |active_splice_| is cleared and |reset_cb_| is run. When it was flushing,
// the decoder is reinitialized instead and |reset_cb_| stays pending.
// NOTE(review): the return that ends the non-flushing branch is missing from
// this extract.
579 template <DemuxerStream::Type StreamType
>
580 void DecoderStream
<StreamType
>::OnDecoderReset() {
582 DCHECK(task_runner_
->BelongsToCurrentThread());
583 DCHECK(state_
== STATE_NORMAL
|| state_
== STATE_FLUSHING_DECODER
||
584 state_
== STATE_ERROR
|| state_
== STATE_END_OF_STREAM
) << state_
;
585 // If Reset() was called during pending read, read callback should be fired
586 // before the reset callback is fired.
587 DCHECK(read_cb_
.is_null());
588 DCHECK(!reset_cb_
.is_null());
590 if (state_
!= STATE_FLUSHING_DECODER
) {
591 state_
= STATE_NORMAL
;
592 active_splice_
= false;
593 base::ResetAndReturn(&reset_cb_
).Run();
597 // The resetting process will be continued in OnDecoderReinitialized().
598 ReinitializeDecoder();
// Explicit instantiations of DecoderStream for the VIDEO and AUDIO stream
// types.
601 template class DecoderStream
<DemuxerStream::VIDEO
>;
602 template class DecoderStream
<DemuxerStream::AUDIO
>;