1 // Copyright 2014 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
5 #include "media/filters/decoder_stream.h"
8 #include "base/callback_helpers.h"
9 #include "base/debug/trace_event.h"
10 #include "base/location.h"
11 #include "base/logging.h"
12 #include "base/single_thread_task_runner.h"
13 #include "media/base/audio_decoder.h"
14 #include "media/base/bind_to_current_loop.h"
15 #include "media/base/decoder_buffer.h"
16 #include "media/base/demuxer_stream.h"
17 #include "media/base/video_decoder.h"
18 #include "media/filters/decrypting_demuxer_stream.h"
22 // TODO(rileya): Devise a better way of specifying trace/UMA/etc strings for
23 // templated classes such as this.
24 template <DemuxerStream::Type StreamType
>
25 static const char* GetTraceString();
27 #define FUNCTION_DVLOG(level) \
28 DVLOG(level) << __FUNCTION__ << \
29 "<" << DecoderStreamTraits<StreamType>::ToString() << ">"
32 const char* GetTraceString
<DemuxerStream::VIDEO
>() {
33 return "DecoderStream<VIDEO>::Decode";
37 const char* GetTraceString
<DemuxerStream::AUDIO
>() {
38 return "DecoderStream<AUDIO>::Decode";
41 template <DemuxerStream::Type StreamType
>
42 DecoderStream
<StreamType
>::DecoderStream(
43 const scoped_refptr
<base::SingleThreadTaskRunner
>& task_runner
,
44 ScopedVector
<Decoder
> decoders
,
45 const SetDecryptorReadyCB
& set_decryptor_ready_cb
)
46 : task_runner_(task_runner
),
47 state_(STATE_UNINITIALIZED
),
50 new DecoderSelector
<StreamType
>(task_runner
,
52 set_decryptor_ready_cb
)),
53 active_splice_(false),
54 pending_decode_requests_(0),
55 weak_factory_(this) {}
57 template <DemuxerStream::Type StreamType
>
58 DecoderStream
<StreamType
>::~DecoderStream() {
59 DCHECK(state_
== STATE_UNINITIALIZED
|| state_
== STATE_STOPPED
) << state_
;
62 template <DemuxerStream::Type StreamType
>
63 void DecoderStream
<StreamType
>::Initialize(DemuxerStream
* stream
,
65 const StatisticsCB
& statistics_cb
,
66 const InitCB
& init_cb
) {
68 DCHECK(task_runner_
->BelongsToCurrentThread());
69 DCHECK_EQ(state_
, STATE_UNINITIALIZED
) << state_
;
70 DCHECK(init_cb_
.is_null());
71 DCHECK(!init_cb
.is_null());
73 statistics_cb_
= statistics_cb
;
76 low_delay_
= low_delay
;
78 state_
= STATE_INITIALIZING
;
79 // TODO(xhwang): DecoderSelector only needs a config to select a decoder.
80 decoder_selector_
->SelectDecoder(
82 base::Bind(&DecoderStream
<StreamType
>::OnDecoderSelected
,
83 weak_factory_
.GetWeakPtr()));
86 template <DemuxerStream::Type StreamType
>
87 void DecoderStream
<StreamType
>::Read(const ReadCB
& read_cb
) {
89 DCHECK(task_runner_
->BelongsToCurrentThread());
90 DCHECK(state_
== STATE_NORMAL
|| state_
== STATE_FLUSHING_DECODER
||
91 state_
== STATE_ERROR
|| state_
== STATE_REINITIALIZING_DECODER
||
92 state_
== STATE_PENDING_DEMUXER_READ
)
94 // No two reads in the flight at any time.
95 DCHECK(read_cb_
.is_null());
96 // No read during resetting or stopping process.
97 DCHECK(reset_cb_
.is_null());
98 DCHECK(stop_cb_
.is_null());
102 if (state_
== STATE_ERROR
) {
103 task_runner_
->PostTask(FROM_HERE
,
104 base::Bind(base::ResetAndReturn(&read_cb_
),
106 scoped_refptr
<Output
>()));
110 if (!ready_outputs_
.empty()) {
111 task_runner_
->PostTask(FROM_HERE
, base::Bind(
112 base::ResetAndReturn(&read_cb_
), OK
, ready_outputs_
.front()));
113 ready_outputs_
.pop_front();
116 // Decoder may be in reinitializing state as result of the previous Read().
117 if (state_
== STATE_REINITIALIZING_DECODER
)
120 if (!CanDecodeMore())
123 if (state_
== STATE_FLUSHING_DECODER
) {
128 if (state_
!= STATE_PENDING_DEMUXER_READ
)
129 ReadFromDemuxerStream();
132 template <DemuxerStream::Type StreamType
>
133 void DecoderStream
<StreamType
>::Reset(const base::Closure
& closure
) {
135 DCHECK(task_runner_
->BelongsToCurrentThread());
136 DCHECK(state_
!= STATE_UNINITIALIZED
&& state_
!= STATE_STOPPED
) << state_
;
137 DCHECK(reset_cb_
.is_null());
138 DCHECK(stop_cb_
.is_null());
142 if (!read_cb_
.is_null()) {
143 task_runner_
->PostTask(FROM_HERE
, base::Bind(
144 base::ResetAndReturn(&read_cb_
), ABORTED
, scoped_refptr
<Output
>()));
147 ready_outputs_
.clear();
149 // During decoder reinitialization, the Decoder does not need to be and
150 // cannot be Reset(). |decrypting_demuxer_stream_| was reset before decoder
152 if (state_
== STATE_REINITIALIZING_DECODER
)
155 // During pending demuxer read and when not using DecryptingDemuxerStream,
156 // the Decoder will be reset after demuxer read is returned
157 // (in OnBufferReady()).
158 if (state_
== STATE_PENDING_DEMUXER_READ
&& !decrypting_demuxer_stream_
)
161 if (decrypting_demuxer_stream_
) {
162 decrypting_demuxer_stream_
->Reset(base::Bind(
163 &DecoderStream
<StreamType
>::ResetDecoder
, weak_factory_
.GetWeakPtr()));
170 template <DemuxerStream::Type StreamType
>
171 void DecoderStream
<StreamType
>::Stop(const base::Closure
& closure
) {
173 DCHECK(task_runner_
->BelongsToCurrentThread());
174 DCHECK_NE(state_
, STATE_STOPPED
) << state_
;
175 DCHECK(stop_cb_
.is_null());
179 if (state_
== STATE_INITIALIZING
) {
180 decoder_selector_
->Abort();
184 DCHECK(init_cb_
.is_null());
186 // All pending callbacks will be dropped.
187 weak_factory_
.InvalidateWeakPtrs();
189 // Post callbacks to prevent reentrance into this object.
190 if (!read_cb_
.is_null()) {
191 task_runner_
->PostTask(FROM_HERE
, base::Bind(
192 base::ResetAndReturn(&read_cb_
), ABORTED
, scoped_refptr
<Output
>()));
194 if (!reset_cb_
.is_null())
195 task_runner_
->PostTask(FROM_HERE
, base::ResetAndReturn(&reset_cb_
));
197 if (decrypting_demuxer_stream_
) {
198 decrypting_demuxer_stream_
->Stop(base::Bind(
199 &DecoderStream
<StreamType
>::StopDecoder
, weak_factory_
.GetWeakPtr()));
203 // We may not have a |decoder_| if Stop() was called during initialization.
209 state_
= STATE_STOPPED
;
212 decrypting_demuxer_stream_
.reset();
213 task_runner_
->PostTask(FROM_HERE
, base::ResetAndReturn(&stop_cb_
));
216 template <DemuxerStream::Type StreamType
>
217 bool DecoderStream
<StreamType
>::CanReadWithoutStalling() const {
218 DCHECK(task_runner_
->BelongsToCurrentThread());
219 return decoder_
->CanReadWithoutStalling();
223 bool DecoderStream
<DemuxerStream::AUDIO
>::CanReadWithoutStalling() const {
224 DCHECK(task_runner_
->BelongsToCurrentThread());
228 template <DemuxerStream::Type StreamType
>
229 bool DecoderStream
<StreamType
>::CanDecodeMore() const {
230 DCHECK(task_runner_
->BelongsToCurrentThread());
232 // Limit total number of outputs stored in |ready_outputs_| and being decoded.
233 // It only makes sense to saturate decoder completely when output queue is
236 static_cast<int>(ready_outputs_
.size()) + pending_decode_requests_
;
237 return num_decodes
< decoder_
->GetMaxDecodeRequests();
241 bool DecoderStream
<DemuxerStream::AUDIO
>::CanDecodeMore() const {
242 DCHECK(task_runner_
->BelongsToCurrentThread());
243 return !pending_decode_requests_
&& ready_outputs_
.empty();
246 template <DemuxerStream::Type StreamType
>
247 void DecoderStream
<StreamType
>::OnDecoderSelected(
248 scoped_ptr
<Decoder
> selected_decoder
,
249 scoped_ptr
<DecryptingDemuxerStream
> decrypting_demuxer_stream
) {
251 DCHECK(task_runner_
->BelongsToCurrentThread());
252 DCHECK_EQ(state_
, STATE_INITIALIZING
) << state_
;
253 DCHECK(!init_cb_
.is_null());
254 DCHECK(read_cb_
.is_null());
255 DCHECK(reset_cb_
.is_null());
257 decoder_selector_
.reset();
258 if (decrypting_demuxer_stream
)
259 stream_
= decrypting_demuxer_stream
.get();
261 if (!selected_decoder
) {
262 state_
= STATE_UNINITIALIZED
;
263 StreamTraits::FinishInitialization(
264 base::ResetAndReturn(&init_cb_
), selected_decoder
.get(), stream_
);
266 state_
= STATE_NORMAL
;
267 decoder_
= selected_decoder
.Pass();
268 decrypting_demuxer_stream_
= decrypting_demuxer_stream
.Pass();
269 StreamTraits::FinishInitialization(
270 base::ResetAndReturn(&init_cb_
), decoder_
.get(), stream_
);
273 // Stop() called during initialization.
274 if (!stop_cb_
.is_null()) {
275 Stop(base::ResetAndReturn(&stop_cb_
));
280 template <DemuxerStream::Type StreamType
>
281 void DecoderStream
<StreamType
>::SatisfyRead(
283 const scoped_refptr
<Output
>& output
) {
284 DCHECK(!read_cb_
.is_null());
285 base::ResetAndReturn(&read_cb_
).Run(status
, output
);
288 template <DemuxerStream::Type StreamType
>
289 void DecoderStream
<StreamType
>::Decode(
290 const scoped_refptr
<DecoderBuffer
>& buffer
) {
292 DCHECK(state_
== STATE_NORMAL
|| state_
== STATE_FLUSHING_DECODER
) << state_
;
293 DCHECK(CanDecodeMore());
294 DCHECK(reset_cb_
.is_null());
295 DCHECK(stop_cb_
.is_null());
298 int buffer_size
= buffer
->end_of_stream() ? 0 : buffer
->data_size();
300 TRACE_EVENT_ASYNC_BEGIN0("media", GetTraceString
<StreamType
>(), this);
301 ++pending_decode_requests_
;
302 decoder_
->Decode(buffer
,
303 base::Bind(&DecoderStream
<StreamType
>::OnDecodeOutputReady
,
304 weak_factory_
.GetWeakPtr(),
308 template <DemuxerStream::Type StreamType
>
309 void DecoderStream
<StreamType
>::FlushDecoder() {
310 if (pending_decode_requests_
== 0)
311 Decode(DecoderBuffer::CreateEOSBuffer());
314 template <DemuxerStream::Type StreamType
>
315 void DecoderStream
<StreamType
>::OnDecodeOutputReady(
317 typename
Decoder::Status status
,
318 const scoped_refptr
<Output
>& output
) {
319 FUNCTION_DVLOG(2) << status
<< " " << output
;
320 DCHECK(state_
== STATE_NORMAL
|| state_
== STATE_FLUSHING_DECODER
||
321 state_
== STATE_PENDING_DEMUXER_READ
|| state_
== STATE_ERROR
)
323 DCHECK(stop_cb_
.is_null());
324 DCHECK_EQ(status
== Decoder::kOk
, output
!= NULL
);
325 DCHECK_GT(pending_decode_requests_
, 0);
327 --pending_decode_requests_
;
329 TRACE_EVENT_ASYNC_END0("media", GetTraceString
<StreamType
>(), this);
331 if (state_
== STATE_ERROR
) {
332 DCHECK(read_cb_
.is_null());
336 if (status
== Decoder::kDecodeError
) {
337 state_
= STATE_ERROR
;
338 ready_outputs_
.clear();
339 if (!read_cb_
.is_null())
340 SatisfyRead(DECODE_ERROR
, NULL
);
344 if (status
== Decoder::kDecryptError
) {
345 state_
= STATE_ERROR
;
346 ready_outputs_
.clear();
347 if (!read_cb_
.is_null())
348 SatisfyRead(DECRYPT_ERROR
, NULL
);
352 if (status
== Decoder::kAborted
) {
353 if (!read_cb_
.is_null())
354 SatisfyRead(ABORTED
, NULL
);
358 // Any successful decode counts!
359 if (buffer_size
> 0) {
360 StreamTraits::ReportStatistics(statistics_cb_
, buffer_size
);
363 // Drop decoding result if Reset() was called during decoding.
364 // The resetting process will be handled when the decoder is reset.
365 if (!reset_cb_
.is_null())
368 // Decoder flushed. Reinitialize the decoder.
369 if (state_
== STATE_FLUSHING_DECODER
&&
370 status
== Decoder::kOk
&& output
->end_of_stream()) {
371 ReinitializeDecoder();
375 if (status
== Decoder::kNotEnoughData
) {
376 if (state_
== STATE_NORMAL
)
377 ReadFromDemuxerStream();
378 else if (state_
== STATE_FLUSHING_DECODER
)
385 // Store decoded output.
386 ready_outputs_
.push_back(output
);
387 scoped_refptr
<Output
> extra_output
;
388 while ((extra_output
= decoder_
->GetDecodeOutput()) != NULL
) {
389 ready_outputs_
.push_back(extra_output
);
392 // Satisfy outstanding read request, if any.
393 if (!read_cb_
.is_null()) {
394 scoped_refptr
<Output
> read_result
= ready_outputs_
.front();
395 ready_outputs_
.pop_front();
396 SatisfyRead(OK
, output
);
400 template <DemuxerStream::Type StreamType
>
401 void DecoderStream
<StreamType
>::ReadFromDemuxerStream() {
403 DCHECK_EQ(state_
, STATE_NORMAL
) << state_
;
404 DCHECK(CanDecodeMore());
405 DCHECK(reset_cb_
.is_null());
406 DCHECK(stop_cb_
.is_null());
408 state_
= STATE_PENDING_DEMUXER_READ
;
409 stream_
->Read(base::Bind(&DecoderStream
<StreamType
>::OnBufferReady
,
410 weak_factory_
.GetWeakPtr()));
413 template <DemuxerStream::Type StreamType
>
414 void DecoderStream
<StreamType
>::OnBufferReady(
415 DemuxerStream::Status status
,
416 const scoped_refptr
<DecoderBuffer
>& buffer
) {
417 FUNCTION_DVLOG(2) << ": " << status
;
418 DCHECK(task_runner_
->BelongsToCurrentThread());
419 DCHECK(state_
== STATE_PENDING_DEMUXER_READ
|| state_
== STATE_ERROR
||
420 state_
== STATE_STOPPED
)
422 DCHECK_EQ(buffer
.get() != NULL
, status
== DemuxerStream::kOk
) << status
;
423 DCHECK(stop_cb_
.is_null());
425 // Decoding has been stopped (e.g due to an error).
426 if (state_
!= STATE_PENDING_DEMUXER_READ
) {
427 DCHECK(state_
== STATE_ERROR
|| state_
== STATE_STOPPED
);
428 DCHECK(read_cb_
.is_null());
432 state_
= STATE_NORMAL
;
434 if (status
== DemuxerStream::kConfigChanged
) {
435 FUNCTION_DVLOG(2) << ": " << "ConfigChanged";
436 DCHECK(stream_
->SupportsConfigChanges());
438 if (!config_change_observer_cb_
.is_null())
439 config_change_observer_cb_
.Run();
441 state_
= STATE_FLUSHING_DECODER
;
442 if (!reset_cb_
.is_null()) {
443 // If we are using DecryptingDemuxerStream, we already called DDS::Reset()
444 // which will continue the resetting process in it's callback.
445 if (!decrypting_demuxer_stream_
)
446 Reset(base::ResetAndReturn(&reset_cb_
));
447 // Reinitialization will continue after Reset() is done.
454 if (!reset_cb_
.is_null()) {
455 // If we are using DecryptingDemuxerStream, we already called DDS::Reset()
456 // which will continue the resetting process in it's callback.
457 if (!decrypting_demuxer_stream_
)
458 Reset(base::ResetAndReturn(&reset_cb_
));
462 if (status
== DemuxerStream::kAborted
) {
463 SatisfyRead(DEMUXER_READ_ABORTED
, NULL
);
467 if (!splice_observer_cb_
.is_null() && !buffer
->end_of_stream()) {
468 const bool has_splice_ts
= buffer
->splice_timestamp() != kNoTimestamp();
469 if (active_splice_
|| has_splice_ts
) {
470 splice_observer_cb_
.Run(buffer
->splice_timestamp());
471 active_splice_
= has_splice_ts
;
475 DCHECK(status
== DemuxerStream::kOk
) << status
;
478 // Read more data if the decoder supports multiple parallel decoding requests.
479 if (CanDecodeMore() && !buffer
->end_of_stream())
480 ReadFromDemuxerStream();
483 template <DemuxerStream::Type StreamType
>
484 void DecoderStream
<StreamType
>::ReinitializeDecoder() {
486 DCHECK(task_runner_
->BelongsToCurrentThread());
487 DCHECK_EQ(state_
, STATE_FLUSHING_DECODER
) << state_
;
488 DCHECK_EQ(pending_decode_requests_
, 0);
490 DCHECK(StreamTraits::GetDecoderConfig(*stream_
).IsValidConfig());
491 state_
= STATE_REINITIALIZING_DECODER
;
492 DecoderStreamTraits
<StreamType
>::Initialize(
494 StreamTraits::GetDecoderConfig(*stream_
),
496 base::Bind(&DecoderStream
<StreamType
>::OnDecoderReinitialized
,
497 weak_factory_
.GetWeakPtr()));
500 template <DemuxerStream::Type StreamType
>
501 void DecoderStream
<StreamType
>::OnDecoderReinitialized(PipelineStatus status
) {
503 DCHECK(task_runner_
->BelongsToCurrentThread());
504 DCHECK_EQ(state_
, STATE_REINITIALIZING_DECODER
) << state_
;
505 DCHECK(stop_cb_
.is_null());
507 // ReinitializeDecoder() can be called in two cases:
508 // 1, Flushing decoder finished (see OnDecodeOutputReady()).
509 // 2, Reset() was called during flushing decoder (see OnDecoderReset()).
510 // Also, Reset() can be called during pending ReinitializeDecoder().
511 // This function needs to handle them all!
513 state_
= (status
== PIPELINE_OK
) ? STATE_NORMAL
: STATE_ERROR
;
515 if (!reset_cb_
.is_null()) {
516 base::ResetAndReturn(&reset_cb_
).Run();
520 if (read_cb_
.is_null())
523 if (state_
== STATE_ERROR
) {
524 SatisfyRead(DECODE_ERROR
, NULL
);
528 ReadFromDemuxerStream();
531 template <DemuxerStream::Type StreamType
>
532 void DecoderStream
<StreamType
>::ResetDecoder() {
534 DCHECK(task_runner_
->BelongsToCurrentThread());
535 DCHECK(state_
== STATE_NORMAL
|| state_
== STATE_FLUSHING_DECODER
||
536 state_
== STATE_ERROR
) << state_
;
537 DCHECK(!reset_cb_
.is_null());
539 decoder_
->Reset(base::Bind(&DecoderStream
<StreamType
>::OnDecoderReset
,
540 weak_factory_
.GetWeakPtr()));
543 template <DemuxerStream::Type StreamType
>
544 void DecoderStream
<StreamType
>::OnDecoderReset() {
546 DCHECK(task_runner_
->BelongsToCurrentThread());
547 DCHECK(state_
== STATE_NORMAL
|| state_
== STATE_FLUSHING_DECODER
||
548 state_
== STATE_ERROR
) << state_
;
549 // If Reset() was called during pending read, read callback should be fired
550 // before the reset callback is fired.
551 DCHECK(read_cb_
.is_null());
552 DCHECK(!reset_cb_
.is_null());
553 DCHECK(stop_cb_
.is_null());
555 if (state_
!= STATE_FLUSHING_DECODER
) {
556 base::ResetAndReturn(&reset_cb_
).Run();
560 // The resetting process will be continued in OnDecoderReinitialized().
561 ReinitializeDecoder();
564 template <DemuxerStream::Type StreamType
>
565 void DecoderStream
<StreamType
>::StopDecoder() {
567 DCHECK(task_runner_
->BelongsToCurrentThread());
568 DCHECK(state_
!= STATE_UNINITIALIZED
&& state_
!= STATE_STOPPED
) << state_
;
569 DCHECK(!stop_cb_
.is_null());
571 state_
= STATE_STOPPED
;
575 decrypting_demuxer_stream_
.reset();
576 // Post |stop_cb_| because pending |read_cb_| and/or |reset_cb_| are also
578 task_runner_
->PostTask(FROM_HERE
, base::ResetAndReturn(&stop_cb_
));
581 template class DecoderStream
<DemuxerStream::VIDEO
>;
582 template class DecoderStream
<DemuxerStream::AUDIO
>;