1 // Copyright 2014 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
5 #include "media/filters/decoder_stream.h"
8 #include "base/callback_helpers.h"
9 #include "base/location.h"
10 #include "base/logging.h"
11 #include "base/single_thread_task_runner.h"
12 #include "base/trace_event/trace_event.h"
13 #include "media/base/bind_to_current_loop.h"
14 #include "media/base/decoder_buffer.h"
15 #include "media/base/video_decoder.h"
16 #include "media/filters/decrypting_demuxer_stream.h"
20 // TODO(rileya): Devise a better way of specifying trace/UMA/etc strings for
21 // templated classes such as this.
// Returns the name under which decode requests are traced for |StreamType|.
// Only declared here; the specializations for DemuxerStream::VIDEO and
// DemuxerStream::AUDIO further down supply the actual strings.
22 template <DemuxerStream::Type StreamType
>
23 static const char* GetTraceString();
// Starts a DVLOG(level) statement prefixed with the current function name
// followed by the stream type in angle brackets, as reported by
// DecoderStreamTraits<StreamType>::ToString(). The expansion refers to
// |StreamType|, so it can only be used where that template parameter is in
// scope. Callers append their own message with <<.
25 #define FUNCTION_DVLOG(level) \
26 DVLOG(level) << __FUNCTION__ << \
27 "<" << DecoderStreamTraits<StreamType>::ToString() << ">"
// VIDEO variant of GetTraceString(): the name used for the async trace events
// that Decode() opens and OnDecodeDone() closes.
30 const char* GetTraceString
<DemuxerStream::VIDEO
>() {
31 return "DecoderStream<VIDEO>::Decode";
// AUDIO variant of GetTraceString(): the name used for the async trace events
// that Decode() opens and OnDecodeDone() closes.
35 const char* GetTraceString
<DemuxerStream::AUDIO
>() {
36 return "DecoderStream<AUDIO>::Decode";
// Constructs a stream in STATE_UNINITIALIZED.
//   task_runner: runner on which the public methods DCHECK they are called and
//                onto which callbacks are posted.
//   decoders:    candidate decoders; passed on (via Pass()) to a newly created
//                DecoderSelector<StreamType>.
//   media_log:   log that receives the "_dds" / "_decoder" properties once a
//                decoder has been selected.
// The stream starts with no active splice and no pending decode requests.
39 template <DemuxerStream::Type StreamType
>
40 DecoderStream
<StreamType
>::DecoderStream(
41 const scoped_refptr
<base::SingleThreadTaskRunner
>& task_runner
,
42 ScopedVector
<Decoder
> decoders
,
43 const scoped_refptr
<MediaLog
>& media_log
)
44 : task_runner_(task_runner
),
45 media_log_(media_log
),
46 state_(STATE_UNINITIALIZED
),
49 new DecoderSelector
<StreamType
>(task_runner
, decoders
.Pass())),
50 active_splice_(false),
52 pending_decode_requests_(0),
// Tears the stream down; expected to run on |task_runner_|'s thread.
// The decoder selector is released first. Any callback that is still pending
// is then handed back by posting it to |task_runner_| rather than running it
// inline:
//   - |init_cb_|  is posted with false,
//   - |read_cb_|  is posted with ABORTED and a null output,
//   - |reset_cb_| is posted as is.
// |decrypting_demuxer_stream_| is released afterwards.
56 template <DemuxerStream::Type StreamType
>
57 DecoderStream
<StreamType
>::~DecoderStream() {
59 DCHECK(task_runner_
->BelongsToCurrentThread());
61 decoder_selector_
.reset();
// Fail an initialization that never completed.
63 if (!init_cb_
.is_null()) {
64 task_runner_
->PostTask(FROM_HERE
,
65 base::Bind(base::ResetAndReturn(&init_cb_
), false));
// Abort an outstanding read.
67 if (!read_cb_
.is_null()) {
68 task_runner_
->PostTask(FROM_HERE
, base::Bind(
69 base::ResetAndReturn(&read_cb_
), ABORTED
, scoped_refptr
<Output
>()));
// Complete an outstanding reset.
71 if (!reset_cb_
.is_null())
72 task_runner_
->PostTask(FROM_HERE
, base::ResetAndReturn(&reset_cb_
));
76 decrypting_demuxer_stream_
.reset();
// Begins initialization of the stream.
//   stream:                        demuxer stream to decode.
//   init_cb:                       completion callback; must be non-null. The
//                                  other methods in this file run it with
//                                  true on success and false on failure.
//   set_decryptor_ready_cb:        forwarded to decoder selection.
//   statistics_cb:                 stored for reporting decode statistics.
//   waiting_for_decryption_key_cb: stored and later handed to the decoder
//                                  selector.
// Preconditions (DCHECKed): called on |task_runner_|'s thread, the stream is
// in STATE_UNINITIALIZED, and no other initialization is pending.
// Moves the stream to STATE_INITIALIZING and starts decoder selection.
79 template <DemuxerStream::Type StreamType
>
80 void DecoderStream
<StreamType
>::Initialize(
81 DemuxerStream
* stream
,
82 const InitCB
& init_cb
,
83 const SetDecryptorReadyCB
& set_decryptor_ready_cb
,
84 const StatisticsCB
& statistics_cb
,
85 const base::Closure
& waiting_for_decryption_key_cb
) {
87 DCHECK(task_runner_
->BelongsToCurrentThread());
88 DCHECK_EQ(state_
, STATE_UNINITIALIZED
);
89 DCHECK(init_cb_
.is_null());
90 DCHECK(!init_cb
.is_null());
92 statistics_cb_
= statistics_cb
;
94 waiting_for_decryption_key_cb_
= waiting_for_decryption_key_cb
;
97 state_
= STATE_INITIALIZING
;
98 SelectDecoder(set_decryptor_ready_cb
);
// Requests one decoded output, delivered through |read_cb|.
// Preconditions (DCHECKed): called on |task_runner_|'s thread, the stream is
// past initialization, and neither a read nor a reset is outstanding.
// Cases visible here; each of these replies is posted to |task_runner_|
// instead of being run inline:
//   - STATE_ERROR: replies DECODE_ERROR with a null output.
//   - STATE_END_OF_STREAM with nothing queued: replies OK with an
//     end-of-stream output from StreamTraits::CreateEOSOutput().
//   - an output is already queued: replies OK with the front of
//     |ready_outputs_| and removes it from the queue.
// In STATE_NORMAL, when CanDecodeMore() allows it, a demuxer read is started
// so decoding keeps going.
101 template <DemuxerStream::Type StreamType
>
102 void DecoderStream
<StreamType
>::Read(const ReadCB
& read_cb
) {
104 DCHECK(task_runner_
->BelongsToCurrentThread());
105 DCHECK(state_
!= STATE_UNINITIALIZED
&& state_
!= STATE_INITIALIZING
)
107 // No two reads in flight at any time.
108 DCHECK(read_cb_
.is_null());
109 // No read during resetting or stopping process.
110 DCHECK(reset_cb_
.is_null());
// A stream that has already failed keeps reporting the error.
112 if (state_
== STATE_ERROR
) {
113 task_runner_
->PostTask(
114 FROM_HERE
, base::Bind(read_cb
, DECODE_ERROR
, scoped_refptr
<Output
>()));
// Everything has been decoded and handed out: report end of stream.
118 if (state_
== STATE_END_OF_STREAM
&& ready_outputs_
.empty()) {
119 task_runner_
->PostTask(
120 FROM_HERE
, base::Bind(read_cb
, OK
, StreamTraits::CreateEOSOutput()));
// Serve the oldest queued output.
124 if (!ready_outputs_
.empty()) {
125 task_runner_
->PostTask(FROM_HERE
,
126 base::Bind(read_cb
, OK
, ready_outputs_
.front()));
127 ready_outputs_
.pop_front();
// Keep the decoder fed while there is capacity.
132 if (state_
== STATE_NORMAL
&& CanDecodeMore())
133 ReadFromDemuxerStream();
// Resets the stream; |closure| is the completion callback for the reset.
// Preconditions (DCHECKed): called on |task_runner_|'s thread, the stream is
// not in STATE_UNINITIALIZED, and no other reset is pending.
// A pending read is aborted first (ABORTED with a null output, posted to
// |task_runner_|) and all queued outputs are discarded. The existing comments
// below describe two states that are handled specially. When a
// DecryptingDemuxerStream is in use, it is reset, with ResetDecoder() bound
// through a weak pointer as its completion callback.
136 template <DemuxerStream::Type StreamType
>
137 void DecoderStream
<StreamType
>::Reset(const base::Closure
& closure
) {
139 DCHECK(task_runner_
->BelongsToCurrentThread());
140 DCHECK_NE(state_
, STATE_UNINITIALIZED
);
141 DCHECK(reset_cb_
.is_null());
// Abort the read the caller is still waiting for, if any.
145 if (!read_cb_
.is_null()) {
146 task_runner_
->PostTask(FROM_HERE
, base::Bind(
147 base::ResetAndReturn(&read_cb_
), ABORTED
, scoped_refptr
<Output
>()));
// Outputs decoded before the reset are dropped.
150 ready_outputs_
.clear();
152 // During decoder reinitialization, the Decoder does not need to be and
153 // cannot be Reset(). |decrypting_demuxer_stream_| was reset before decoder
155 if (state_
== STATE_REINITIALIZING_DECODER
)
158 // During pending demuxer read and when not using DecryptingDemuxerStream,
159 // the Decoder will be reset after demuxer read is returned
160 // (in OnBufferReady()).
161 if (state_
== STATE_PENDING_DEMUXER_READ
&& !decrypting_demuxer_stream_
)
// With a DecryptingDemuxerStream, it is reset here and ResetDecoder() runs
// when that completes.
164 if (decrypting_demuxer_stream_
) {
165 decrypting_demuxer_stream_
->Reset(base::Bind(
166 &DecoderStream
<StreamType
>::ResetDecoder
, weak_factory_
.GetWeakPtr()));
// Returns true when a Read() can be answered without stalling: either an
// output is already queued in |ready_outputs_|, or the decoder itself reports
// that it can be read without stalling.
// Must be called on |task_runner_|'s thread (DCHECKed).
173 template <DemuxerStream::Type StreamType
>
174 bool DecoderStream
<StreamType
>::CanReadWithoutStalling() const {
175 DCHECK(task_runner_
->BelongsToCurrentThread());
176 return !ready_outputs_
.empty() || decoder_
->CanReadWithoutStalling();
// AUDIO specialization of CanReadWithoutStalling(). Like the generic version
// it must be called on |task_runner_|'s thread (DCHECKed).
// NOTE(review): the value returned by this specialization is not visible in
// this excerpt.
180 bool DecoderStream
<DemuxerStream::AUDIO
>::CanReadWithoutStalling() const {
181 DCHECK(task_runner_
->BelongsToCurrentThread());
// Returns the limit used by CanDecodeMore() and Decode(); the generic version
// simply forwards to the current decoder's GetMaxDecodeRequests().
185 template <DemuxerStream::Type StreamType
>
186 int DecoderStream
<StreamType
>::GetMaxDecodeRequests() const {
187 return decoder_
->GetMaxDecodeRequests();
// AUDIO specialization of GetMaxDecodeRequests().
// NOTE(review): the value returned by this specialization is not visible in
// this excerpt.
191 int DecoderStream
<DemuxerStream::AUDIO
>::GetMaxDecodeRequests() const {
// Returns whether another decode request may be issued right now.
// The sum of outputs waiting in |ready_outputs_| and decode requests still
// pending must stay below GetMaxDecodeRequests(), and nothing more is decoded
// while an end-of-stream buffer is being decoded (|decoding_eos_|).
// Must be called on |task_runner_|'s thread (DCHECKed).
195 template <DemuxerStream::Type StreamType
>
196 bool DecoderStream
<StreamType
>::CanDecodeMore() const {
197 DCHECK(task_runner_
->BelongsToCurrentThread());
199 // Limit total number of outputs stored in |ready_outputs_| and being decoded.
200 // It only makes sense to saturate decoder completely when output queue is
203 static_cast<int>(ready_outputs_
.size()) + pending_decode_requests_
;
204 return !decoding_eos_
&& num_decodes
< GetMaxDecodeRequests();
// Asks |decoder_selector_| to choose a decoder for |stream_|.
//   set_decryptor_ready_cb: forwarded unchanged to the selector.
// The selection result is delivered to OnDecoderSelected() and decoded outputs
// to OnDecodeOutputReady(); both are bound through weak pointers.
// |waiting_for_decryption_key_cb_| is passed along as the last argument.
207 template <DemuxerStream::Type StreamType
>
208 void DecoderStream
<StreamType
>::SelectDecoder(
209 const SetDecryptorReadyCB
& set_decryptor_ready_cb
) {
210 decoder_selector_
->SelectDecoder(
211 stream_
, set_decryptor_ready_cb
,
212 base::Bind(&DecoderStream
<StreamType
>::OnDecoderSelected
,
213 weak_factory_
.GetWeakPtr()),
214 base::Bind(&DecoderStream
<StreamType
>::OnDecodeOutputReady
,
215 weak_factory_
.GetWeakPtr()),
216 waiting_for_decryption_key_cb_
);
// Completion callback for SelectDecoder().
//   selected_decoder:         the chosen decoder; the log statement below
//                             treats a null value as "No decoder selected.".
//   decrypting_demuxer_stream: optional stream wrapper; when set, it is
//                             adopted and becomes the stream that is read
//                             from.
// Valid only in STATE_INITIALIZING or STATE_REINITIALIZING_DECODER.
// The decoder that was active is kept in |previous_decoder_| and replaced by
// |selected_decoder|.
// Failure handling visible here (presumably taken when no decoder was
// selected; the guarding condition is not visible in this excerpt):
//   - during initial initialization the state returns to STATE_UNINITIALIZED
//     and |init_cb_| runs with false;
//   - otherwise CompleteDecoderReinitialization(false) is called.
// On success the choice is recorded in |media_log_| and then either
// CompleteDecoderReinitialization(true) is called (reinitialization), or the
// stream enters STATE_NORMAL, enables the bitstream converter if the traits
// say the decoder needs it, and runs |init_cb_| with true.
219 template <DemuxerStream::Type StreamType
>
220 void DecoderStream
<StreamType
>::OnDecoderSelected(
221 scoped_ptr
<Decoder
> selected_decoder
,
222 scoped_ptr
<DecryptingDemuxerStream
> decrypting_demuxer_stream
) {
223 FUNCTION_DVLOG(2) << ": "
224 << (selected_decoder
? selected_decoder
->GetDisplayName()
225 : "No decoder selected.");
226 DCHECK(task_runner_
->BelongsToCurrentThread());
227 DCHECK(state_
== STATE_INITIALIZING
|| state_
== STATE_REINITIALIZING_DECODER
)
// During first-time initialization only the init callback may be pending.
229 if (state_
== STATE_INITIALIZING
) {
230 DCHECK(!init_cb_
.is_null());
231 DCHECK(read_cb_
.is_null());
232 DCHECK(reset_cb_
.is_null());
// Keep the old decoder in |previous_decoder_| while switching to the new one.
237 previous_decoder_
= decoder_
.Pass();
238 decoder_
= selected_decoder
.Pass();
// From now on read through the decrypting wrapper, if one was created.
239 if (decrypting_demuxer_stream
) {
240 decrypting_demuxer_stream_
= decrypting_demuxer_stream
.Pass();
241 stream_
= decrypting_demuxer_stream_
.get();
// Failure during first-time initialization: report it through |init_cb_|.
245 if (state_
== STATE_INITIALIZING
) {
246 state_
= STATE_UNINITIALIZED
;
247 base::ResetAndReturn(&init_cb_
).Run(false);
// Failure while reinitializing.
249 CompleteDecoderReinitialization(false);
// Record in the media log whether a DecryptingDemuxerStream is used and which
// decoder was chosen.
254 const std::string stream_type
= DecoderStreamTraits
<StreamType
>::ToString();
255 media_log_
->SetBooleanProperty(stream_type
+ "_dds",
256 decrypting_demuxer_stream_
);
257 media_log_
->SetStringProperty(stream_type
+ "_decoder",
258 decoder_
->GetDisplayName());
260 if (state_
== STATE_REINITIALIZING_DECODER
) {
261 CompleteDecoderReinitialization(true);
265 // Initialization succeeded.
266 state_
= STATE_NORMAL
;
267 if (StreamTraits::NeedsBitstreamConversion(decoder_
.get()))
268 stream_
->EnableBitstreamConverter();
269 base::ResetAndReturn(&init_cb_
).Run(true);
// Completes the outstanding read: clears |read_cb_| and runs it synchronously
// with |status| and |output|. A read must be pending (DCHECKed). |read_cb_| is
// reset before it runs, so it is already null while the callback executes.
272 template <DemuxerStream::Type StreamType
>
273 void DecoderStream
<StreamType
>::SatisfyRead(
275 const scoped_refptr
<Output
>& output
) {
276 DCHECK(!read_cb_
.is_null());
277 base::ResetAndReturn(&read_cb_
).Run(status
, output
);
// Submits |buffer| to the decoder.
// Preconditions (DCHECKed): the stream is in STATE_NORMAL or
// STATE_FLUSHING_DECODER, fewer than GetMaxDecodeRequests() decodes are
// pending, no reset is pending, and |buffer| is non-null.
// An async trace event named by GetTraceString<StreamType>() is opened for
// the request. An end-of-stream buffer sets |decoding_eos_|. The request is
// counted in |pending_decode_requests_|; OnDecodeDone() is bound through a
// weak pointer as the completion callback.
280 template <DemuxerStream::Type StreamType
>
281 void DecoderStream
<StreamType
>::Decode(
282 const scoped_refptr
<DecoderBuffer
>& buffer
) {
284 DCHECK(state_
== STATE_NORMAL
|| state_
== STATE_FLUSHING_DECODER
) << state_
;
285 DCHECK_LT(pending_decode_requests_
, GetMaxDecodeRequests());
286 DCHECK(reset_cb_
.is_null());
287 DCHECK(buffer
.get());
// End-of-stream buffers count as zero bytes.
289 int buffer_size
= buffer
->end_of_stream() ? 0 : buffer
->data_size();
// The matching TRACE_EVENT_ASYNC_END0 is emitted in OnDecodeDone().
291 TRACE_EVENT_ASYNC_BEGIN2(
292 "media", GetTraceString
<StreamType
>(), this, "key frame",
293 !buffer
->end_of_stream() && buffer
->is_key_frame(), "timestamp (ms)",
294 buffer
->timestamp().InMilliseconds());
// Cleared again in OnDecodeDone(); while set, CanDecodeMore() returns false.
296 if (buffer
->end_of_stream())
297 decoding_eos_
= true;
299 ++pending_decode_requests_
;
300 decoder_
->Decode(buffer
,
301 base::Bind(&DecoderStream
<StreamType
>::OnDecodeDone
,
302 weak_factory_
.GetWeakPtr(),
304 buffer
->end_of_stream()));
// Flushes the decoder by sending it an end-of-stream buffer through Decode().
// Decode()'s preconditions therefore apply here as well.
307 template <DemuxerStream::Type StreamType
>
308 void DecoderStream
<StreamType
>::FlushDecoder() {
309 Decode(DecoderBuffer::CreateEOSBuffer());
// Completion callback for a Decode() request.
//   buffer_size: size passed to StreamTraits::ReportStatistics() below.
//   status:      result reported by the decoder.
// Always: the request is removed from |pending_decode_requests_| and the
// async trace event opened in Decode() is closed.
// Then, as far as visible in this excerpt:
//   - |decoding_eos_| is cleared once no request is pending any more
//     (presumably only for the end-of-stream request).
//   - In STATE_ERROR no read can be pending (DCHECKed).
//   - Results are dropped while a Reset() is pending.
//   - Decoder::kDecodeError: the stream enters STATE_ERROR, queued outputs are
//     discarded and a pending read fails with DECODE_ERROR.
//   - Decoder::kAborted: see the comment at that case.
//   - Successful decode: statistics are reported. In STATE_NORMAL the stream
//     may move to STATE_END_OF_STREAM (answering a pending read with an EOS
//     output when nothing is queued) or continue reading from the demuxer. In
//     STATE_FLUSHING_DECODER, once the last request has completed, the
//     decoder is reinitialized.
312 template <DemuxerStream::Type StreamType
>
313 void DecoderStream
<StreamType
>::OnDecodeDone(int buffer_size
,
315 typename
Decoder::Status status
) {
316 FUNCTION_DVLOG(2) << ": " << status
;
317 DCHECK(state_
== STATE_NORMAL
|| state_
== STATE_FLUSHING_DECODER
||
318 state_
== STATE_PENDING_DEMUXER_READ
|| state_
== STATE_ERROR
)
320 DCHECK_GT(pending_decode_requests_
, 0);
322 --pending_decode_requests_
;
// Closes the event opened by TRACE_EVENT_ASYNC_BEGIN2 in Decode().
324 TRACE_EVENT_ASYNC_END0("media", GetTraceString
<StreamType
>(), this);
327 DCHECK(!pending_decode_requests_
);
328 decoding_eos_
= false;
331 if (state_
== STATE_ERROR
) {
332 DCHECK(read_cb_
.is_null());
336 // Drop decoding result if Reset() was called during decoding.
337 // The resetting process will be handled when the decoder is reset.
338 if (!reset_cb_
.is_null())
342 case Decoder::kDecodeError
:
343 state_
= STATE_ERROR
;
344 ready_outputs_
.clear();
345 if (!read_cb_
.is_null())
346 SatisfyRead(DECODE_ERROR
, NULL
);
349 case Decoder::kAborted
:
350 // Decoder can return kAborted during Reset() or during destruction.
354 // Any successful decode counts!
356 StreamTraits::ReportStatistics(statistics_cb_
, buffer_size
);
358 if (state_
== STATE_NORMAL
) {
360 state_
= STATE_END_OF_STREAM
;
// Only answer with EOS when no decoded output is still waiting to be read.
361 if (ready_outputs_
.empty() && !read_cb_
.is_null())
362 SatisfyRead(OK
, StreamTraits::CreateEOSOutput());
367 ReadFromDemuxerStream();
// The flush is complete once the last outstanding request has returned.
371 if (state_
== STATE_FLUSHING_DECODER
&& !pending_decode_requests_
)
372 ReinitializeDecoder();
// Receives one decoded |output| from the decoder.
// |output| must be non-null and must not be an end-of-stream marker
// (DCHECKed); end of stream is signalled through OnDecodeDone() instead.
// In STATE_ERROR no read can be pending (DCHECKed), and outputs are dropped
// while a Reset() is pending. Otherwise a pending read is satisfied directly
// with OK and |output|; outputs nobody is waiting for are appended to
// |ready_outputs_| for a later Read().
377 template <DemuxerStream::Type StreamType
>
378 void DecoderStream
<StreamType
>::OnDecodeOutputReady(
379 const scoped_refptr
<Output
>& output
) {
380 FUNCTION_DVLOG(2) << ": " << output
->timestamp().InMilliseconds() << " ms";
381 DCHECK(output
.get());
382 DCHECK(!output
->end_of_stream());
383 DCHECK(state_
== STATE_NORMAL
|| state_
== STATE_FLUSHING_DECODER
||
384 state_
== STATE_PENDING_DEMUXER_READ
|| state_
== STATE_ERROR
)
387 if (state_
== STATE_ERROR
) {
388 DCHECK(read_cb_
.is_null());
392 // Drop decoding result if Reset() was called during decoding.
393 // The resetting process will be handled when the decoder is reset.
394 if (!reset_cb_
.is_null())
397 if (!read_cb_
.is_null()) {
398 // If |ready_outputs_| was non-empty, the read would have already been
399 // satisfied by Read().
400 DCHECK(ready_outputs_
.empty());
401 SatisfyRead(OK
, output
);
405 // Store decoded output.
406 ready_outputs_
.push_back(output
);
// Requests the next buffer from |stream_|.
// Preconditions (DCHECKed): the stream is in STATE_NORMAL, CanDecodeMore()
// holds, and no reset is pending.
// The stream enters STATE_PENDING_DEMUXER_READ until OnBufferReady(), bound
// through a weak pointer, is called with the result.
409 template <DemuxerStream::Type StreamType
>
410 void DecoderStream
<StreamType
>::ReadFromDemuxerStream() {
412 DCHECK_EQ(state_
, STATE_NORMAL
);
413 DCHECK(CanDecodeMore());
414 DCHECK(reset_cb_
.is_null());
416 state_
= STATE_PENDING_DEMUXER_READ
;
417 stream_
->Read(base::Bind(&DecoderStream
<StreamType
>::OnBufferReady
,
418 weak_factory_
.GetWeakPtr()));
// Completion callback for the demuxer read started by ReadFromDemuxerStream().
//   status: result of the read.
//   buffer: the buffer that was read; non-null exactly when |status| is
//           DemuxerStream::kOk (DCHECKed).
// Expected in STATE_PENDING_DEMUXER_READ, or in STATE_ERROR when decoding
// failed while the read was outstanding; in the latter case no read callback
// can be pending (DCHECKed).
// Otherwise the state goes back to STATE_NORMAL and, as far as visible in
// this excerpt:
//   - kConfigChanged: the config-change observer (if any) is notified and the
//     stream enters STATE_FLUSHING_DECODER. If a Reset() is pending and no
//     DecryptingDemuxerStream is used, the reset is restarted from here.
//   - A pending Reset() without DecryptingDemuxerStream is likewise restarted
//     from here for the other statuses.
//   - kAborted: a pending read is answered with DEMUXER_READ_ABORTED.
//   - kOk: the splice observer (if any) is told about splice timestamps on
//     non-EOS buffers, and another demuxer read may be started.
421 template <DemuxerStream::Type StreamType
>
422 void DecoderStream
<StreamType
>::OnBufferReady(
423 DemuxerStream::Status status
,
424 const scoped_refptr
<DecoderBuffer
>& buffer
) {
425 FUNCTION_DVLOG(2) << ": " << status
<< ", "
426 << (buffer
.get() ? buffer
->AsHumanReadableString()
429 DCHECK(task_runner_
->BelongsToCurrentThread());
430 DCHECK(state_
== STATE_PENDING_DEMUXER_READ
|| state_
== STATE_ERROR
)
432 DCHECK_EQ(buffer
.get() != NULL
, status
== DemuxerStream::kOk
) << status
;
434 // Decoding has been stopped (e.g due to an error).
435 if (state_
!= STATE_PENDING_DEMUXER_READ
) {
436 DCHECK(state_
== STATE_ERROR
);
437 DCHECK(read_cb_
.is_null());
441 state_
= STATE_NORMAL
;
443 if (status
== DemuxerStream::kConfigChanged
) {
444 FUNCTION_DVLOG(2) << ": " << "ConfigChanged";
445 DCHECK(stream_
->SupportsConfigChanges());
447 if (!config_change_observer_cb_
.is_null())
448 config_change_observer_cb_
.Run();
450 state_
= STATE_FLUSHING_DECODER
;
451 if (!reset_cb_
.is_null()) {
452 // If we are using DecryptingDemuxerStream, we already called DDS::Reset()
453 // which will continue the resetting process in its callback.
454 if (!decrypting_demuxer_stream_
)
455 Reset(base::ResetAndReturn(&reset_cb_
));
456 // Reinitialization will continue after Reset() is done.
463 if (!reset_cb_
.is_null()) {
464 // If we are using DecryptingDemuxerStream, we already called DDS::Reset()
465 // which will continue the resetting process in its callback.
466 if (!decrypting_demuxer_stream_
)
467 Reset(base::ResetAndReturn(&reset_cb_
));
471 if (status
== DemuxerStream::kAborted
) {
472 if (!read_cb_
.is_null())
473 SatisfyRead(DEMUXER_READ_ABORTED
, NULL
);
// Notify the splice observer when a splice starts, continues or ends:
// |active_splice_| remembers whether the previous buffer carried a splice
// timestamp, so the first buffer without one is still reported.
477 if (!splice_observer_cb_
.is_null() && !buffer
->end_of_stream()) {
478 const bool has_splice_ts
= buffer
->splice_timestamp() != kNoTimestamp();
479 if (active_splice_
|| has_splice_ts
) {
480 splice_observer_cb_
.Run(buffer
->splice_timestamp());
481 active_splice_
= has_splice_ts
;
485 DCHECK(status
== DemuxerStream::kOk
) << status
;
488 // Read more data if the decoder supports multiple parallel decoding requests.
490 ReadFromDemuxerStream();
// Reinitializes the current decoder for |stream_|.
// Preconditions (DCHECKed): called on |task_runner_|'s thread, the stream is
// in STATE_FLUSHING_DECODER, and no decode request is pending.
// The stream enters STATE_REINITIALIZING_DECODER and
// DecoderStreamTraits<StreamType>::InitializeDecoder() is invoked. The result
// is delivered to OnDecoderReinitialized() and decoded outputs to
// OnDecodeOutputReady(), both bound through weak pointers.
493 template <DemuxerStream::Type StreamType
>
494 void DecoderStream
<StreamType
>::ReinitializeDecoder() {
496 DCHECK(task_runner_
->BelongsToCurrentThread());
497 DCHECK_EQ(state_
, STATE_FLUSHING_DECODER
);
498 DCHECK_EQ(pending_decode_requests_
, 0);
500 state_
= STATE_REINITIALIZING_DECODER
;
501 DecoderStreamTraits
<StreamType
>::InitializeDecoder(
502 decoder_
.get(), stream_
,
503 base::Bind(&DecoderStream
<StreamType
>::OnDecoderReinitialized
,
504 weak_factory_
.GetWeakPtr()),
505 base::Bind(&DecoderStream
<StreamType
>::OnDecodeOutputReady
,
506 weak_factory_
.GetWeakPtr()));
// Completion callback for the reinitialization started by
// ReinitializeDecoder().
//   status: PIPELINE_OK when the decoder was reinitialized successfully.
// Preconditions (DCHECKed): called on |task_runner_|'s thread while in
// STATE_REINITIALIZING_DECODER.
// When reinitialization failed, another decoder is selected through
// SelectDecoder() with a null SetDecryptorReadyCB; on success
// CompleteDecoderReinitialization(true) finishes the process.
509 template <DemuxerStream::Type StreamType
>
510 void DecoderStream
<StreamType
>::OnDecoderReinitialized(PipelineStatus status
) {
512 DCHECK(task_runner_
->BelongsToCurrentThread());
513 DCHECK_EQ(state_
, STATE_REINITIALIZING_DECODER
);
515 // ReinitializeDecoder() can be called in two cases:
516 // 1, Flushing decoder finished (see OnDecodeDone()).
517 // 2, Reset() was called during flushing decoder (see OnDecoderReset()).
518 // Also, Reset() can be called during pending ReinitializeDecoder().
519 // This function needs to handle them all!
521 if (status
!= PIPELINE_OK
) {
522 // Reinitialization failed. Try to fall back to one of the remaining
523 // decoders. This will consume at least one decoder so doing it more than
525 // For simplicity, don't attempt to fall back to a decryptor. Calling this
526 // with a null callback ensures that one won't be selected.
527 SelectDecoder(SetDecryptorReadyCB());
529 CompleteDecoderReinitialization(true);
// Finishes a decoder reinitialization.
//   success: whether a usable decoder is available again.
// Preconditions (DCHECKed): called on |task_runner_|'s thread while in
// STATE_REINITIALIZING_DECODER.
// The stream moves to STATE_NORMAL on success and to STATE_ERROR on failure.
// A Reset() that arrived in the meantime is completed by running |reset_cb_|.
// The remaining statements deal with a pending read: in STATE_ERROR it fails
// with DECODE_ERROR, otherwise reading from the demuxer resumes.
533 template <DemuxerStream::Type StreamType
>
534 void DecoderStream
<StreamType
>::CompleteDecoderReinitialization(bool success
) {
536 DCHECK(task_runner_
->BelongsToCurrentThread());
537 DCHECK_EQ(state_
, STATE_REINITIALIZING_DECODER
);
539 state_
= success
? STATE_NORMAL
: STATE_ERROR
;
// A reset requested during reinitialization completes now.
541 if (!reset_cb_
.is_null()) {
542 base::ResetAndReturn(&reset_cb_
).Run();
546 if (read_cb_
.is_null())
549 if (state_
== STATE_ERROR
) {
550 SatisfyRead(DECODE_ERROR
, NULL
);
554 ReadFromDemuxerStream();
// Resets the decoder as part of a Reset() that is in progress.
// Preconditions (DCHECKed): called on |task_runner_|'s thread, the stream is
// in STATE_NORMAL, STATE_FLUSHING_DECODER, STATE_ERROR or
// STATE_END_OF_STREAM, and |reset_cb_| is set.
// OnDecoderReset(), bound through a weak pointer, continues once the decoder
// has finished resetting.
557 template <DemuxerStream::Type StreamType
>
558 void DecoderStream
<StreamType
>::ResetDecoder() {
560 DCHECK(task_runner_
->BelongsToCurrentThread());
561 DCHECK(state_
== STATE_NORMAL
|| state_
== STATE_FLUSHING_DECODER
||
562 state_
== STATE_ERROR
|| state_
== STATE_END_OF_STREAM
) << state_
;
563 DCHECK(!reset_cb_
.is_null());
565 decoder_
->Reset(base::Bind(&DecoderStream
<StreamType
>::OnDecoderReset
,
566 weak_factory_
.GetWeakPtr()));
// Completion callback for the decoder reset started by ResetDecoder().
// Preconditions (DCHECKed): called on |task_runner_|'s thread, the stream is
// in STATE_NORMAL, STATE_FLUSHING_DECODER, STATE_ERROR or
// STATE_END_OF_STREAM, no read is pending, and |reset_cb_| is set.
// Unless a decoder flush was in progress, the reset is finished here: the
// stream returns to STATE_NORMAL, splice tracking is cleared and |reset_cb_|
// runs. When a flush was in progress, the decoder is reinitialized, as the
// comment at the end explains.
569 template <DemuxerStream::Type StreamType
>
570 void DecoderStream
<StreamType
>::OnDecoderReset() {
572 DCHECK(task_runner_
->BelongsToCurrentThread());
573 DCHECK(state_
== STATE_NORMAL
|| state_
== STATE_FLUSHING_DECODER
||
574 state_
== STATE_ERROR
|| state_
== STATE_END_OF_STREAM
) << state_
;
575 // If Reset() was called during pending read, read callback should be fired
576 // before the reset callback is fired.
577 DCHECK(read_cb_
.is_null());
578 DCHECK(!reset_cb_
.is_null());
580 if (state_
!= STATE_FLUSHING_DECODER
) {
581 state_
= STATE_NORMAL
;
582 active_splice_
= false;
583 base::ResetAndReturn(&reset_cb_
).Run();
587 // The resetting process will be continued in OnDecoderReinitialized().
588 ReinitializeDecoder();
// Explicit instantiations of DecoderStream for the VIDEO and AUDIO stream
// types, the two types specialized elsewhere in this file.
591 template class DecoderStream
<DemuxerStream::VIDEO
>;
592 template class DecoderStream
<DemuxerStream::AUDIO
>;