1 // Copyright 2014 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
5 #include "media/filters/decoder_stream.h"
8 #include "base/callback_helpers.h"
9 #include "base/debug/trace_event.h"
10 #include "base/location.h"
11 #include "base/logging.h"
12 #include "base/single_thread_task_runner.h"
13 #include "media/base/audio_decoder.h"
14 #include "media/base/bind_to_current_loop.h"
15 #include "media/base/decoder_buffer.h"
16 #include "media/base/demuxer_stream.h"
17 #include "media/base/video_decoder.h"
18 #include "media/filters/decrypting_demuxer_stream.h"
// Declaration of the per-stream-type trace label helper. Only the declaration
// appears here; VIDEO and AUDIO versions are defined further down in this file
// and the returned string is passed to the TRACE_EVENT_ASYNC_* macros.
22 // TODO(rileya): Devise a better way of specifying trace/UMA/etc strings for
23 // templated classes such as this.
24 template <DemuxerStream::Type StreamType
>
25 static const char* GetTraceString();
// Debug-logging helper: streams __FUNCTION__ followed by
// "<" + DecoderStreamTraits<StreamType>::ToString() + ">" into DVLOG(level).
// The expansion names |StreamType|, so it can only be used where that template
// parameter is in scope.
27 #define FUNCTION_DVLOG(level) \
28 DVLOG(level) << __FUNCTION__ << \
29 "<" << DecoderStreamTraits<StreamType>::ToString() << ">"
// Trace label for video streams: returns a fixed string literal.
32 const char* GetTraceString
<DemuxerStream::VIDEO
>() {
33 return "DecoderStream<VIDEO>::Decode";
// Trace label for audio streams: returns a fixed string literal.
37 const char* GetTraceString
<DemuxerStream::AUDIO
>() {
38 return "DecoderStream<AUDIO>::Decode";
// Constructs the stream in STATE_UNINITIALIZED with no active splice.
// |task_runner| is stored in |task_runner_| and is also passed, together with
// |set_decryptor_ready_cb|, to a newly allocated DecoderSelector<StreamType>.
// |weak_factory_| is bound to this object.
// NOTE(review): this listing skips some initializer-list lines, so which
// member receives the new DecoderSelector and how |decoders| is consumed are
// not visible here -- confirm against the full file.
41 template <DemuxerStream::Type StreamType
>
42 DecoderStream
<StreamType
>::DecoderStream(
43 const scoped_refptr
<base::SingleThreadTaskRunner
>& task_runner
,
44 ScopedVector
<Decoder
> decoders
,
45 const SetDecryptorReadyCB
& set_decryptor_ready_cb
)
46 : task_runner_(task_runner
),
47 state_(STATE_UNINITIALIZED
),
50 new DecoderSelector
<StreamType
>(task_runner
,
52 set_decryptor_ready_cb
)),
53 active_splice_(false),
54 weak_factory_(this) {}
// Destructor. Debug builds assert that the stream is either still
// uninitialized or already stopped; |state_| is streamed into the failure
// message.
56 template <DemuxerStream::Type StreamType
>
57 DecoderStream
<StreamType
>::~DecoderStream() {
58 DCHECK(state_
== STATE_UNINITIALIZED
|| state_
== STATE_STOPPED
) << state_
;
// Starts initialization. Preconditions (DCHECKed): called on |task_runner_|,
// the stream is in STATE_UNINITIALIZED, no init callback is already stored,
// and |init_cb| is non-null.
//
// Records |statistics_cb|, moves to STATE_INITIALIZING and asks
// |decoder_selector_| to select a decoder; the selection result is delivered
// to OnDecoderSelected() through a weak pointer to this object.
// NOTE(review): the statements that store |init_cb| and |stream|, and the
// first argument to SelectDecoder(), are not visible in this listing.
61 template <DemuxerStream::Type StreamType
>
62 void DecoderStream
<StreamType
>::Initialize(DemuxerStream
* stream
,
63 const StatisticsCB
& statistics_cb
,
64 const InitCB
& init_cb
) {
66 DCHECK(task_runner_
->BelongsToCurrentThread());
67 DCHECK_EQ(state_
, STATE_UNINITIALIZED
) << state_
;
68 DCHECK(init_cb_
.is_null());
69 DCHECK(!init_cb
.is_null());
71 statistics_cb_
= statistics_cb
;
75 state_
= STATE_INITIALIZING
;
76 // TODO(xhwang): DecoderSelector only needs a config to select a decoder.
77 decoder_selector_
->SelectDecoder(
79 base::Bind(&DecoderStream
<StreamType
>::OnDecoderSelected
,
80 weak_factory_
.GetWeakPtr()));
// Requests one decoded output, to be delivered through |read_cb|.
//
// Preconditions (DCHECKed): called on |task_runner_|; state is STATE_NORMAL,
// STATE_FLUSHING_DECODER or STATE_ERROR; no read, reset or stop is pending.
//
// Visible behaviour:
//  - In STATE_ERROR, |read_cb| is posted with DECODE_ERROR and a null output.
//  - Output fetched from |decoder_|->GetDecodeOutput() is posted with OK via
//    the stored |read_cb_|.
//  - ReadFromDemuxerStream() is called on the remaining path.
// All completions here are posted to |task_runner_| rather than run inline.
// NOTE(review): this listing omits the statement that stores |read_cb| into
// |read_cb_|, the body of the STATE_FLUSHING_DECODER branch, the condition
// guarding the queued-output path, and the early returns -- confirm against
// the full file.
83 template <DemuxerStream::Type StreamType
>
84 void DecoderStream
<StreamType
>::Read(const ReadCB
& read_cb
) {
86 DCHECK(task_runner_
->BelongsToCurrentThread());
87 DCHECK(state_
== STATE_NORMAL
|| state_
== STATE_FLUSHING_DECODER
||
88 state_
== STATE_ERROR
) << state_
;
89 // No two reads may be in flight at any time.
90 DCHECK(read_cb_
.is_null());
91 // No read during resetting or stopping process.
92 DCHECK(reset_cb_
.is_null());
93 DCHECK(stop_cb_
.is_null());
// A stream that has already failed reports DECODE_ERROR asynchronously.
95 if (state_
== STATE_ERROR
) {
96 task_runner_
->PostTask(FROM_HERE
, base::Bind(
97 read_cb
, DECODE_ERROR
, scoped_refptr
<Output
>()));
103 if (state_
== STATE_FLUSHING_DECODER
) {
108 scoped_refptr
<Output
> output
= decoder_
->GetDecodeOutput();
110 // If the decoder has queued output ready to go we don't need a demuxer read.
112 task_runner_
->PostTask(
113 FROM_HERE
, base::Bind(base::ResetAndReturn(&read_cb_
), OK
, output
));
117 ReadFromDemuxerStream();
// Starts an asynchronous reset of the stream.
//
// Preconditions (DCHECKed): called on |task_runner_|; state is neither
// STATE_UNINITIALIZED nor STATE_STOPPED; no reset or stop is already pending.
//
// Two states are special-cased, as described by the comments below:
// STATE_REINITIALIZING_DECODER, and STATE_PENDING_DEMUXER_READ when no
// DecryptingDemuxerStream is in use. When |decrypting_demuxer_stream_|
// exists, it is reset and ResetDecoder() is bound (via weak pointer) as its
// completion callback.
// NOTE(review): the bodies of the two special cases, the handling of
// |closure|, and the path taken without a DecryptingDemuxerStream are not
// visible in this listing.
120 template <DemuxerStream::Type StreamType
>
121 void DecoderStream
<StreamType
>::Reset(const base::Closure
& closure
) {
123 DCHECK(task_runner_
->BelongsToCurrentThread());
124 DCHECK(state_
!= STATE_UNINITIALIZED
&& state_
!= STATE_STOPPED
) << state_
;
125 DCHECK(reset_cb_
.is_null());
126 DCHECK(stop_cb_
.is_null());
130 // During decoder reinitialization, the Decoder does not need to be and
131 // cannot be Reset(). |decrypting_demuxer_stream_| was reset before decoder
133 if (state_
== STATE_REINITIALIZING_DECODER
)
136 // During pending demuxer read and when not using DecryptingDemuxerStream,
137 // the Decoder will be reset after demuxer read is returned
138 // (in OnBufferReady()).
139 if (state_
== STATE_PENDING_DEMUXER_READ
&& !decrypting_demuxer_stream_
)
142 // The Decoder API guarantees that if Decoder::Reset() is called during
143 // a pending decode, the decode callback must be fired before the reset
144 // callback is fired. Therefore, we can call Decoder::Reset() regardless
145 // of if we have a pending decode and always satisfy the reset callback when
146 // the decoder reset is finished.
147 if (decrypting_demuxer_stream_
) {
148 decrypting_demuxer_stream_
->Reset(base::Bind(
149 &DecoderStream
<StreamType
>::ResetDecoder
, weak_factory_
.GetWeakPtr()));
// Stops the stream.
//
// Preconditions (DCHECKed): called on |task_runner_|; not already in
// STATE_STOPPED; no stop is already pending.
//
// Visible steps:
//  - While STATE_INITIALIZING, decoder selection is aborted.
//  - Weak pointers are invalidated so callbacks bound earlier are dropped.
//  - A pending |read_cb_| is posted with ABORTED and a null output; a pending
//    |reset_cb_| is posted as-is. Both are posted, not run inline, to avoid
//    re-entering this object.
//  - With a DecryptingDemuxerStream, it is stopped and StopDecoder() is bound
//    as its completion callback (using a weak pointer obtained after the
//    invalidation above).
//  - Otherwise the stream moves to STATE_STOPPED, releases
//    |decrypting_demuxer_stream_| and posts |stop_cb_|.
// NOTE(review): this listing omits the statement that stores |closure|, the
// decoder shutdown code and the branch boundaries/returns, so the exact
// sequencing between the last two steps should be confirmed in the full file.
156 template <DemuxerStream::Type StreamType
>
157 void DecoderStream
<StreamType
>::Stop(const base::Closure
& closure
) {
159 DCHECK(task_runner_
->BelongsToCurrentThread());
160 DCHECK_NE(state_
, STATE_STOPPED
) << state_
;
161 DCHECK(stop_cb_
.is_null());
165 if (state_
== STATE_INITIALIZING
) {
166 decoder_selector_
->Abort();
170 DCHECK(init_cb_
.is_null());
172 // All pending callbacks will be dropped.
173 weak_factory_
.InvalidateWeakPtrs();
175 // Post callbacks to prevent reentrance into this object.
176 if (!read_cb_
.is_null())
177 task_runner_
->PostTask(FROM_HERE
, base::Bind(
178 base::ResetAndReturn(&read_cb_
), ABORTED
, scoped_refptr
<Output
>()));
179 if (!reset_cb_
.is_null())
180 task_runner_
->PostTask(FROM_HERE
, base::ResetAndReturn(&reset_cb_
));
182 if (decrypting_demuxer_stream_
) {
183 decrypting_demuxer_stream_
->Stop(base::Bind(
184 &DecoderStream
<StreamType
>::StopDecoder
, weak_factory_
.GetWeakPtr()));
188 // We may not have a |decoder_| if Stop() was called during initialization.
194 state_
= STATE_STOPPED
;
197 decrypting_demuxer_stream_
.reset();
198 task_runner_
->PostTask(FROM_HERE
, base::ResetAndReturn(&stop_cb_
));
// Generic version: forwards the query to |decoder_| and returns its answer.
// Must be called on |task_runner_| (DCHECKed).
201 template <DemuxerStream::Type StreamType
>
202 bool DecoderStream
<StreamType
>::CanReadWithoutStalling() const {
203 DCHECK(task_runner_
->BelongsToCurrentThread());
204 return decoder_
->CanReadWithoutStalling();
// Audio-specific version. Must be called on |task_runner_| (DCHECKed).
// NOTE(review): only the thread check is visible in this listing; the value
// returned for audio streams is not shown here.
208 bool DecoderStream
<DemuxerStream::AUDIO
>::CanReadWithoutStalling() const {
209 DCHECK(task_runner_
->BelongsToCurrentThread());
// Completion callback for decoder selection started in Initialize().
//
// Preconditions (DCHECKed): on |task_runner_|, in STATE_INITIALIZING, with an
// init callback stored and no read or reset pending.
//
// Releases |decoder_selector_|. If a DecryptingDemuxerStream was produced,
// |stream_| is redirected to it. Then:
//  - no decoder selected: state goes back to STATE_UNINITIALIZED and
//    |init_cb_| is consumed via StreamTraits::FinishInitialization() with a
//    null decoder;
//  - decoder selected: state becomes STATE_NORMAL, ownership of the decoder
//    and of the DecryptingDemuxerStream moves into the members, and
//    |init_cb_| is consumed via StreamTraits::FinishInitialization().
// Finally, a Stop() that was requested during initialization is replayed.
// NOTE(review): the else/closing lines between the two outcomes are not
// visible in this listing.
213 template <DemuxerStream::Type StreamType
>
214 void DecoderStream
<StreamType
>::OnDecoderSelected(
215 scoped_ptr
<Decoder
> selected_decoder
,
216 scoped_ptr
<DecryptingDemuxerStream
> decrypting_demuxer_stream
) {
218 DCHECK(task_runner_
->BelongsToCurrentThread());
219 DCHECK_EQ(state_
, STATE_INITIALIZING
) << state_
;
220 DCHECK(!init_cb_
.is_null());
221 DCHECK(read_cb_
.is_null());
222 DCHECK(reset_cb_
.is_null());
224 decoder_selector_
.reset();
// From here on, reads go through the decrypting stream when one exists.
225 if (decrypting_demuxer_stream
)
226 stream_
= decrypting_demuxer_stream
.get();
228 if (!selected_decoder
) {
229 state_
= STATE_UNINITIALIZED
;
230 StreamTraits::FinishInitialization(
231 base::ResetAndReturn(&init_cb_
), selected_decoder
.get(), stream_
);
233 state_
= STATE_NORMAL
;
234 decoder_
= selected_decoder
.Pass();
235 decrypting_demuxer_stream_
= decrypting_demuxer_stream
.Pass();
236 StreamTraits::FinishInitialization(
237 base::ResetAndReturn(&init_cb_
), decoder_
.get(), stream_
);
240 // Stop() called during initialization.
241 if (!stop_cb_
.is_null()) {
242 Stop(base::ResetAndReturn(&stop_cb_
));
// Completes the pending read: clears |read_cb_| and runs it synchronously
// with |status| and |output|. A read must be pending (DCHECKed).
// NOTE(review): the declaration of the |status| parameter falls on a line not
// visible in this listing.
247 template <DemuxerStream::Type StreamType
>
248 void DecoderStream
<StreamType
>::SatisfyRead(
250 const scoped_refptr
<Output
>& output
) {
251 DCHECK(!read_cb_
.is_null());
252 base::ResetAndReturn(&read_cb_
).Run(status
, output
);
// Completes the pending read with ABORTED and a null output. Only valid while
// a reset is pending (DCHECKed on |reset_cb_|).
255 template <DemuxerStream::Type StreamType
>
256 void DecoderStream
<StreamType
>::AbortRead() {
257 // Abort read during pending reset. It is safe to fire the |read_cb_| directly
258 // instead of posting it because the renderer won't call into this class
259 // again when it's in kFlushing state.
260 // TODO(xhwang): Improve the resetting process to avoid this dependency on the
262 DCHECK(!reset_cb_
.is_null());
263 SatisfyRead(ABORTED
, NULL
);
// Hands |buffer| to |decoder_| for decoding.
//
// Preconditions (DCHECKed): state is STATE_NORMAL or STATE_FLUSHING_DECODER,
// a read is pending, and no reset or stop is pending.
//
// |buffer_size| is 0 for an end-of-stream buffer and the buffer's data size
// otherwise. An async trace event keyed on |this| is opened before the decode
// is issued; the decoder's result is routed to OnDecodeOutputReady() through a
// weak pointer.
// NOTE(review): the remaining arguments bound into the callback are not
// visible in this listing.
266 template <DemuxerStream::Type StreamType
>
267 void DecoderStream
<StreamType
>::Decode(
268 const scoped_refptr
<DecoderBuffer
>& buffer
) {
270 DCHECK(state_
== STATE_NORMAL
|| state_
== STATE_FLUSHING_DECODER
) << state_
;
271 DCHECK(!read_cb_
.is_null());
272 DCHECK(reset_cb_
.is_null());
273 DCHECK(stop_cb_
.is_null());
276 int buffer_size
= buffer
->end_of_stream() ? 0 : buffer
->data_size();
278 TRACE_EVENT_ASYNC_BEGIN0("media", GetTraceString
<StreamType
>(), this);
279 decoder_
->Decode(buffer
,
280 base::Bind(&DecoderStream
<StreamType
>::OnDecodeOutputReady
,
281 weak_factory_
.GetWeakPtr(),
// Feeds a freshly created end-of-stream buffer to Decode().
285 template <DemuxerStream::Type StreamType
>
286 void DecoderStream
<StreamType
>::FlushDecoder() {
287 Decode(DecoderBuffer::CreateEOSBuffer());
// Callback for the decode issued in Decode().
//
// Preconditions (DCHECKed): state is STATE_NORMAL or STATE_FLUSHING_DECODER,
// a read is pending, no stop is pending, and |output| is non-null exactly
// when |status| is Decoder::kOk.
//
// Visible handling, in order:
//  - closes the async trace event opened by Decode();
//  - kDecodeError / kDecryptError: enter STATE_ERROR and satisfy the read
//    with DECODE_ERROR / DECRYPT_ERROR;
//  - kAborted: satisfy the read with ABORTED;
//  - if |buffer_size| > 0, report it through
//    StreamTraits::ReportStatistics();
//  - a pending reset gets its own branch (body not visible here);
//  - end-of-stream output while STATE_FLUSHING_DECODER: ReinitializeDecoder();
//  - kNotEnoughData in STATE_NORMAL: issue another demuxer read;
//  - otherwise: satisfy the read with OK and |output|.
// NOTE(review): the |buffer_size| parameter declaration, the early returns
// after each branch, and the bodies of the pending-reset and
// flushing/kNotEnoughData branches are not visible in this listing.
290 template <DemuxerStream::Type StreamType
>
291 void DecoderStream
<StreamType
>::OnDecodeOutputReady(
293 typename
Decoder::Status status
,
294 const scoped_refptr
<Output
>& output
) {
296 DCHECK(state_
== STATE_NORMAL
|| state_
== STATE_FLUSHING_DECODER
) << state_
;
297 DCHECK(!read_cb_
.is_null());
298 DCHECK(stop_cb_
.is_null());
// Output must be present if and only if the decode succeeded.
299 DCHECK_EQ(status
== Decoder::kOk
, output
!= NULL
);
301 TRACE_EVENT_ASYNC_END0("media", GetTraceString
<StreamType
>(), this);
303 if (status
== Decoder::kDecodeError
) {
304 state_
= STATE_ERROR
;
305 SatisfyRead(DECODE_ERROR
, NULL
);
309 if (status
== Decoder::kDecryptError
) {
310 state_
= STATE_ERROR
;
311 SatisfyRead(DECRYPT_ERROR
, NULL
);
315 if (status
== Decoder::kAborted
) {
316 SatisfyRead(ABORTED
, NULL
);
320 // Any successful decode counts!
321 if (buffer_size
> 0) {
322 StreamTraits::ReportStatistics(statistics_cb_
, buffer_size
);
325 // Drop decoding result if Reset() was called during decoding.
326 // The resetting process will be handled when the decoder is reset.
327 if (!reset_cb_
.is_null()) {
332 // Decoder flushed. Reinitialize the decoder.
333 if (state_
== STATE_FLUSHING_DECODER
&&
334 status
== Decoder::kOk
&& output
->end_of_stream()) {
335 ReinitializeDecoder();
339 if (status
== Decoder::kNotEnoughData
) {
340 if (state_
== STATE_NORMAL
)
341 ReadFromDemuxerStream();
342 else if (state_
== STATE_FLUSHING_DECODER
)
348 SatisfyRead(OK
, output
);
// Requests the next buffer from |stream_|.
//
// Preconditions (DCHECKed): state is STATE_NORMAL, a read is pending, and no
// reset or stop is pending. Moves to STATE_PENDING_DEMUXER_READ before issuing
// the read; the result is delivered to OnBufferReady() through a weak pointer.
351 template <DemuxerStream::Type StreamType
>
352 void DecoderStream
<StreamType
>::ReadFromDemuxerStream() {
354 DCHECK_EQ(state_
, STATE_NORMAL
) << state_
;
355 DCHECK(!read_cb_
.is_null());
356 DCHECK(reset_cb_
.is_null());
357 DCHECK(stop_cb_
.is_null());
359 state_
= STATE_PENDING_DEMUXER_READ
;
360 stream_
->Read(base::Bind(&DecoderStream
<StreamType
>::OnBufferReady
,
361 weak_factory_
.GetWeakPtr()));
// Callback for the demuxer read issued by ReadFromDemuxerStream().
//
// Preconditions (DCHECKed): on |task_runner_|, in STATE_PENDING_DEMUXER_READ,
// |buffer| is non-null exactly when |status| is DemuxerStream::kOk, a read is
// pending and no stop is pending.
//
// The state first returns to STATE_NORMAL. Visible handling after that:
//  - kConfigChanged: the stream must support config changes (DCHECKed);
//    |config_change_observer_cb_| is run if set; state becomes
//    STATE_FLUSHING_DECODER; if a reset is pending and no
//    DecryptingDemuxerStream is in use, Reset() is re-entered with the stored
//    reset callback.
//  - Any other status with a reset pending: same Reset() re-entry rule.
//  - kAborted: the read is satisfied with DEMUXER_READ_ABORTED.
//  - For a non-end-of-stream buffer with a splice observer installed:
//    the observer is run with the buffer's splice timestamp when a splice is
//    already active or the buffer carries a splice timestamp, and
//    |active_splice_| is updated to match.
// NOTE(review): the returns between branches, the read-abort calls inside the
// pending-reset branches, and the final hand-off of |buffer| after the closing
// DCHECK are not visible in this listing -- confirm against the full file.
364 template <DemuxerStream::Type StreamType
>
365 void DecoderStream
<StreamType
>::OnBufferReady(
366 DemuxerStream::Status status
,
367 const scoped_refptr
<DecoderBuffer
>& buffer
) {
368 FUNCTION_DVLOG(2) << ": " << status
;
369 DCHECK(task_runner_
->BelongsToCurrentThread());
370 DCHECK_EQ(state_
, STATE_PENDING_DEMUXER_READ
) << state_
;
371 DCHECK_EQ(buffer
.get() != NULL
, status
== DemuxerStream::kOk
) << status
;
372 DCHECK(!read_cb_
.is_null());
373 DCHECK(stop_cb_
.is_null());
375 state_
= STATE_NORMAL
;
377 if (status
== DemuxerStream::kConfigChanged
) {
378 FUNCTION_DVLOG(2) << ": " << "ConfigChanged";
379 DCHECK(stream_
->SupportsConfigChanges());
381 if (!config_change_observer_cb_
.is_null())
382 config_change_observer_cb_
.Run();
384 state_
= STATE_FLUSHING_DECODER
;
385 if (!reset_cb_
.is_null()) {
387 // If we are using DecryptingDemuxerStream, we already called DDS::Reset()
388 // which will continue the resetting process in its callback.
389 if (!decrypting_demuxer_stream_
)
390 Reset(base::ResetAndReturn(&reset_cb_
));
391 // Reinitialization will continue after Reset() is done.
398 if (!reset_cb_
.is_null()) {
400 // If we are using DecryptingDemuxerStream, we already called DDS::Reset()
401 // which will continue the resetting process in its callback.
402 if (!decrypting_demuxer_stream_
)
403 Reset(base::ResetAndReturn(&reset_cb_
));
407 if (status
== DemuxerStream::kAborted
) {
408 SatisfyRead(DEMUXER_READ_ABORTED
, NULL
);
// Splice notifications are only considered for real (non-EOS) buffers.
412 if (!splice_observer_cb_
.is_null() && !buffer
->end_of_stream()) {
413 const bool has_splice_ts
= buffer
->splice_timestamp() != kNoTimestamp();
414 if (active_splice_
|| has_splice_ts
) {
415 splice_observer_cb_
.Run(buffer
->splice_timestamp());
416 active_splice_
= has_splice_ts
;
420 DCHECK(status
== DemuxerStream::kOk
) << status
;
// Re-initializes |decoder_| with the decoder config that StreamTraits reads
// from |stream_|.
//
// Preconditions (DCHECKed): on |task_runner_|, in STATE_FLUSHING_DECODER, and
// the config obtained from |stream_| is valid. Moves to
// STATE_REINITIALIZING_DECODER; completion is delivered to
// OnDecoderReinitialized() through a weak pointer.
424 template <DemuxerStream::Type StreamType
>
425 void DecoderStream
<StreamType
>::ReinitializeDecoder() {
427 DCHECK(task_runner_
->BelongsToCurrentThread());
428 DCHECK_EQ(state_
, STATE_FLUSHING_DECODER
) << state_
;
430 DCHECK(StreamTraits::GetDecoderConfig(*stream_
).IsValidConfig());
431 state_
= STATE_REINITIALIZING_DECODER
;
432 decoder_
->Initialize(
433 StreamTraits::GetDecoderConfig(*stream_
),
434 base::Bind(&DecoderStream
<StreamType
>::OnDecoderReinitialized
,
435 weak_factory_
.GetWeakPtr()));
// Completion callback for ReinitializeDecoder().
//
// Preconditions (DCHECKed): on |task_runner_|, in
// STATE_REINITIALIZING_DECODER, with no stop pending.
//
// The state becomes STATE_NORMAL when |status| is PIPELINE_OK and STATE_ERROR
// otherwise. If a reset is pending, |reset_cb_| is cleared and run
// synchronously. When a read is pending and the state is STATE_ERROR, the read
// is satisfied with DECODE_ERROR; the remaining path issues a demuxer read.
// NOTE(review): the statements under `if (!read_cb_.is_null())` and
// `if (read_cb_.is_null())`, and the returns between branches, are not visible
// in this listing.
438 template <DemuxerStream::Type StreamType
>
439 void DecoderStream
<StreamType
>::OnDecoderReinitialized(PipelineStatus status
) {
441 DCHECK(task_runner_
->BelongsToCurrentThread());
442 DCHECK_EQ(state_
, STATE_REINITIALIZING_DECODER
) << state_
;
443 DCHECK(stop_cb_
.is_null());
445 // ReinitializeDecoder() can be called in two cases:
446 // 1, Flushing decoder finished (see OnDecodeOutputReady()).
447 // 2, Reset() was called during flushing decoder (see OnDecoderReset()).
448 // Also, Reset() can be called during pending ReinitializeDecoder().
449 // This function needs to handle them all!
451 state_
= (status
== PIPELINE_OK
) ? STATE_NORMAL
: STATE_ERROR
;
453 if (!reset_cb_
.is_null()) {
454 if (!read_cb_
.is_null())
456 base::ResetAndReturn(&reset_cb_
).Run();
459 if (read_cb_
.is_null())
462 if (state_
== STATE_ERROR
) {
463 SatisfyRead(DECODE_ERROR
, NULL
);
467 ReadFromDemuxerStream();
// Resets |decoder_| as part of a pending stream reset.
//
// Preconditions (DCHECKed): on |task_runner_|; state is STATE_NORMAL,
// STATE_FLUSHING_DECODER or STATE_ERROR; a reset callback is stored.
// Completion is delivered to OnDecoderReset() through a weak pointer.
470 template <DemuxerStream::Type StreamType
>
471 void DecoderStream
<StreamType
>::ResetDecoder() {
473 DCHECK(task_runner_
->BelongsToCurrentThread());
474 DCHECK(state_
== STATE_NORMAL
|| state_
== STATE_FLUSHING_DECODER
||
475 state_
== STATE_ERROR
) << state_
;
476 DCHECK(!reset_cb_
.is_null());
478 decoder_
->Reset(base::Bind(&DecoderStream
<StreamType
>::OnDecoderReset
,
479 weak_factory_
.GetWeakPtr()));
// Completion callback for the decoder reset issued by ResetDecoder().
//
// Preconditions (DCHECKed): on |task_runner_|; state is STATE_NORMAL,
// STATE_FLUSHING_DECODER or STATE_ERROR; no read is pending; a reset callback
// is stored; no stop is pending.
//
// Outside STATE_FLUSHING_DECODER the reset callback is cleared and run
// synchronously. While flushing, the decoder is reinitialized instead and the
// reset is finished later in OnDecoderReinitialized().
// NOTE(review): the return that separates the two paths is not visible in this
// listing.
482 template <DemuxerStream::Type StreamType
>
483 void DecoderStream
<StreamType
>::OnDecoderReset() {
485 DCHECK(task_runner_
->BelongsToCurrentThread());
486 DCHECK(state_
== STATE_NORMAL
|| state_
== STATE_FLUSHING_DECODER
||
487 state_
== STATE_ERROR
) << state_
;
488 // If Reset() was called during pending read, read callback should be fired
489 // before the reset callback is fired.
490 DCHECK(read_cb_
.is_null());
491 DCHECK(!reset_cb_
.is_null());
492 DCHECK(stop_cb_
.is_null());
494 if (state_
!= STATE_FLUSHING_DECODER
) {
495 base::ResetAndReturn(&reset_cb_
).Run();
499 // The resetting process will be continued in OnDecoderReinitialized().
500 ReinitializeDecoder();
// Continuation of Stop(), bound there as the completion callback of
// DecryptingDemuxerStream::Stop().
//
// Preconditions (DCHECKed): on |task_runner_|; state is neither
// STATE_UNINITIALIZED nor STATE_STOPPED; a stop callback is stored.
// Moves to STATE_STOPPED, releases |decrypting_demuxer_stream_| and posts
// |stop_cb_| to |task_runner_| (posted rather than run inline).
// NOTE(review): the decoder shutdown statements between the state change and
// the release are not visible in this listing.
503 template <DemuxerStream::Type StreamType
>
504 void DecoderStream
<StreamType
>::StopDecoder() {
506 DCHECK(task_runner_
->BelongsToCurrentThread());
507 DCHECK(state_
!= STATE_UNINITIALIZED
&& state_
!= STATE_STOPPED
) << state_
;
508 DCHECK(!stop_cb_
.is_null());
510 state_
= STATE_STOPPED
;
514 decrypting_demuxer_stream_
.reset();
515 // Post |stop_cb_| because pending |read_cb_| and/or |reset_cb_| are also
517 task_runner_
->PostTask(FROM_HERE
, base::ResetAndReturn(&stop_cb_
));
// Explicit instantiations of the class template for the VIDEO and AUDIO
// stream types, so the member definitions in this file are emitted for both.
520 template class DecoderStream
<DemuxerStream::VIDEO
>;
521 template class DecoderStream
<DemuxerStream::AUDIO
>;