Re-subimission of https://codereview.chromium.org/1041213003/
[chromium-blink-merge.git] / media / filters / decoder_stream.cc
blob3a366d8986198ff9058a94aa9bbd9d5e1a701d4c
1 // Copyright 2014 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
5 #include "media/filters/decoder_stream.h"
7 #include "base/bind.h"
8 #include "base/callback_helpers.h"
9 #include "base/location.h"
10 #include "base/logging.h"
11 #include "base/single_thread_task_runner.h"
12 #include "base/trace_event/trace_event.h"
13 #include "media/base/audio_decoder.h"
14 #include "media/base/bind_to_current_loop.h"
15 #include "media/base/decoder_buffer.h"
16 #include "media/base/demuxer_stream.h"
17 #include "media/base/video_decoder.h"
18 #include "media/filters/decrypting_demuxer_stream.h"
20 namespace media {
22 // TODO(rileya): Devise a better way of specifying trace/UMA/etc strings for
23 // templated classes such as this.
24 template <DemuxerStream::Type StreamType>
25 static const char* GetTraceString();
27 #define FUNCTION_DVLOG(level) \
28 DVLOG(level) << __FUNCTION__ << \
29 "<" << DecoderStreamTraits<StreamType>::ToString() << ">"
31 template <>
32 const char* GetTraceString<DemuxerStream::VIDEO>() {
33 return "DecoderStream<VIDEO>::Decode";
36 template <>
37 const char* GetTraceString<DemuxerStream::AUDIO>() {
38 return "DecoderStream<AUDIO>::Decode";
41 template <DemuxerStream::Type StreamType>
42 DecoderStream<StreamType>::DecoderStream(
43 const scoped_refptr<base::SingleThreadTaskRunner>& task_runner,
44 ScopedVector<Decoder> decoders,
45 const scoped_refptr<MediaLog>& media_log)
46 : task_runner_(task_runner),
47 media_log_(media_log),
48 state_(STATE_UNINITIALIZED),
49 stream_(NULL),
50 decoder_selector_(
51 new DecoderSelector<StreamType>(task_runner, decoders.Pass())),
52 active_splice_(false),
53 decoding_eos_(false),
54 pending_decode_requests_(0),
55 weak_factory_(this) {
58 template <DemuxerStream::Type StreamType>
59 DecoderStream<StreamType>::~DecoderStream() {
60 FUNCTION_DVLOG(2);
61 DCHECK(task_runner_->BelongsToCurrentThread());
63 decoder_selector_.reset();
65 if (!init_cb_.is_null()) {
66 task_runner_->PostTask(FROM_HERE,
67 base::Bind(base::ResetAndReturn(&init_cb_), false));
69 if (!read_cb_.is_null()) {
70 task_runner_->PostTask(FROM_HERE, base::Bind(
71 base::ResetAndReturn(&read_cb_), ABORTED, scoped_refptr<Output>()));
73 if (!reset_cb_.is_null())
74 task_runner_->PostTask(FROM_HERE, base::ResetAndReturn(&reset_cb_));
76 stream_ = NULL;
77 decoder_.reset();
78 decrypting_demuxer_stream_.reset();
81 template <DemuxerStream::Type StreamType>
82 void DecoderStream<StreamType>::Initialize(
83 DemuxerStream* stream,
84 const InitCB& init_cb,
85 const SetDecryptorReadyCB& set_decryptor_ready_cb,
86 const StatisticsCB& statistics_cb,
87 const base::Closure& waiting_for_decryption_key_cb) {
88 FUNCTION_DVLOG(2);
89 DCHECK(task_runner_->BelongsToCurrentThread());
90 DCHECK_EQ(state_, STATE_UNINITIALIZED);
91 DCHECK(init_cb_.is_null());
92 DCHECK(!init_cb.is_null());
94 statistics_cb_ = statistics_cb;
95 init_cb_ = init_cb;
96 waiting_for_decryption_key_cb_ = waiting_for_decryption_key_cb;
97 stream_ = stream;
99 state_ = STATE_INITIALIZING;
100 SelectDecoder(set_decryptor_ready_cb);
103 template <DemuxerStream::Type StreamType>
104 void DecoderStream<StreamType>::Read(const ReadCB& read_cb) {
105 FUNCTION_DVLOG(2);
106 DCHECK(task_runner_->BelongsToCurrentThread());
107 DCHECK(state_ != STATE_UNINITIALIZED && state_ != STATE_INITIALIZING)
108 << state_;
109 // No two reads in the flight at any time.
110 DCHECK(read_cb_.is_null());
111 // No read during resetting or stopping process.
112 DCHECK(reset_cb_.is_null());
114 if (state_ == STATE_ERROR) {
115 task_runner_->PostTask(
116 FROM_HERE, base::Bind(read_cb, DECODE_ERROR, scoped_refptr<Output>()));
117 return;
120 if (state_ == STATE_END_OF_STREAM && ready_outputs_.empty()) {
121 task_runner_->PostTask(
122 FROM_HERE, base::Bind(read_cb, OK, StreamTraits::CreateEOSOutput()));
123 return;
126 if (!ready_outputs_.empty()) {
127 task_runner_->PostTask(FROM_HERE,
128 base::Bind(read_cb, OK, ready_outputs_.front()));
129 ready_outputs_.pop_front();
130 } else {
131 read_cb_ = read_cb;
134 if (state_ == STATE_NORMAL && CanDecodeMore())
135 ReadFromDemuxerStream();
138 template <DemuxerStream::Type StreamType>
139 void DecoderStream<StreamType>::Reset(const base::Closure& closure) {
140 FUNCTION_DVLOG(2);
141 DCHECK(task_runner_->BelongsToCurrentThread());
142 DCHECK_NE(state_, STATE_UNINITIALIZED);
143 DCHECK(reset_cb_.is_null());
145 reset_cb_ = closure;
147 if (!read_cb_.is_null()) {
148 task_runner_->PostTask(FROM_HERE, base::Bind(
149 base::ResetAndReturn(&read_cb_), ABORTED, scoped_refptr<Output>()));
152 ready_outputs_.clear();
154 // During decoder reinitialization, the Decoder does not need to be and
155 // cannot be Reset(). |decrypting_demuxer_stream_| was reset before decoder
156 // reinitialization.
157 if (state_ == STATE_REINITIALIZING_DECODER)
158 return;
160 // During pending demuxer read and when not using DecryptingDemuxerStream,
161 // the Decoder will be reset after demuxer read is returned
162 // (in OnBufferReady()).
163 if (state_ == STATE_PENDING_DEMUXER_READ && !decrypting_demuxer_stream_)
164 return;
166 if (decrypting_demuxer_stream_) {
167 decrypting_demuxer_stream_->Reset(base::Bind(
168 &DecoderStream<StreamType>::ResetDecoder, weak_factory_.GetWeakPtr()));
169 return;
172 ResetDecoder();
175 template <DemuxerStream::Type StreamType>
176 bool DecoderStream<StreamType>::CanReadWithoutStalling() const {
177 DCHECK(task_runner_->BelongsToCurrentThread());
178 return !ready_outputs_.empty() || decoder_->CanReadWithoutStalling();
181 template <>
182 bool DecoderStream<DemuxerStream::AUDIO>::CanReadWithoutStalling() const {
183 DCHECK(task_runner_->BelongsToCurrentThread());
184 return true;
187 template <DemuxerStream::Type StreamType>
188 int DecoderStream<StreamType>::GetMaxDecodeRequests() const {
189 return decoder_->GetMaxDecodeRequests();
192 template <>
193 int DecoderStream<DemuxerStream::AUDIO>::GetMaxDecodeRequests() const {
194 return 1;
197 template <DemuxerStream::Type StreamType>
198 bool DecoderStream<StreamType>::CanDecodeMore() const {
199 DCHECK(task_runner_->BelongsToCurrentThread());
201 // Limit total number of outputs stored in |ready_outputs_| and being decoded.
202 // It only makes sense to saturate decoder completely when output queue is
203 // empty.
204 int num_decodes =
205 static_cast<int>(ready_outputs_.size()) + pending_decode_requests_;
206 return !decoding_eos_ && num_decodes < GetMaxDecodeRequests();
209 template <DemuxerStream::Type StreamType>
210 void DecoderStream<StreamType>::SelectDecoder(
211 const SetDecryptorReadyCB& set_decryptor_ready_cb) {
212 decoder_selector_->SelectDecoder(
213 stream_, set_decryptor_ready_cb,
214 base::Bind(&DecoderStream<StreamType>::OnDecoderSelected,
215 weak_factory_.GetWeakPtr()),
216 base::Bind(&DecoderStream<StreamType>::OnDecodeOutputReady,
217 weak_factory_.GetWeakPtr()),
218 waiting_for_decryption_key_cb_);
221 template <DemuxerStream::Type StreamType>
222 void DecoderStream<StreamType>::OnDecoderSelected(
223 scoped_ptr<Decoder> selected_decoder,
224 scoped_ptr<DecryptingDemuxerStream> decrypting_demuxer_stream) {
225 FUNCTION_DVLOG(2) << ": "
226 << (selected_decoder ? selected_decoder->GetDisplayName()
227 : "No decoder selected.");
228 DCHECK(task_runner_->BelongsToCurrentThread());
229 DCHECK(state_ == STATE_INITIALIZING || state_ == STATE_REINITIALIZING_DECODER)
230 << state_;
231 if (state_ == STATE_INITIALIZING) {
232 DCHECK(!init_cb_.is_null());
233 DCHECK(read_cb_.is_null());
234 DCHECK(reset_cb_.is_null());
235 } else {
236 DCHECK(decoder_);
239 previous_decoder_ = decoder_.Pass();
240 decoder_ = selected_decoder.Pass();
241 if (decrypting_demuxer_stream) {
242 decrypting_demuxer_stream_ = decrypting_demuxer_stream.Pass();
243 stream_ = decrypting_demuxer_stream_.get();
246 if (!decoder_) {
247 if (state_ == STATE_INITIALIZING) {
248 state_ = STATE_UNINITIALIZED;
249 base::ResetAndReturn(&init_cb_).Run(false);
250 } else {
251 CompleteDecoderReinitialization(false);
253 return;
256 const std::string stream_type = DecoderStreamTraits<StreamType>::ToString();
257 media_log_->SetBooleanProperty(stream_type + "_dds",
258 decrypting_demuxer_stream_);
259 media_log_->SetStringProperty(stream_type + "_decoder",
260 decoder_->GetDisplayName());
262 if (state_ == STATE_REINITIALIZING_DECODER) {
263 CompleteDecoderReinitialization(true);
264 return;
267 // Initialization succeeded.
268 state_ = STATE_NORMAL;
269 if (StreamTraits::NeedsBitstreamConversion(decoder_.get()))
270 stream_->EnableBitstreamConverter();
271 base::ResetAndReturn(&init_cb_).Run(true);
274 template <DemuxerStream::Type StreamType>
275 void DecoderStream<StreamType>::SatisfyRead(
276 Status status,
277 const scoped_refptr<Output>& output) {
278 DCHECK(!read_cb_.is_null());
279 base::ResetAndReturn(&read_cb_).Run(status, output);
282 template <DemuxerStream::Type StreamType>
283 void DecoderStream<StreamType>::Decode(
284 const scoped_refptr<DecoderBuffer>& buffer) {
285 FUNCTION_DVLOG(2);
286 DCHECK(state_ == STATE_NORMAL || state_ == STATE_FLUSHING_DECODER) << state_;
287 DCHECK_LT(pending_decode_requests_, GetMaxDecodeRequests());
288 DCHECK(reset_cb_.is_null());
289 DCHECK(buffer.get());
291 int buffer_size = buffer->end_of_stream() ? 0 : buffer->data_size();
293 TRACE_EVENT_ASYNC_BEGIN0("media", GetTraceString<StreamType>(), this);
295 if (buffer->end_of_stream())
296 decoding_eos_ = true;
298 ++pending_decode_requests_;
299 decoder_->Decode(buffer,
300 base::Bind(&DecoderStream<StreamType>::OnDecodeDone,
301 weak_factory_.GetWeakPtr(),
302 buffer_size,
303 buffer->end_of_stream()));
306 template <DemuxerStream::Type StreamType>
307 void DecoderStream<StreamType>::FlushDecoder() {
308 Decode(DecoderBuffer::CreateEOSBuffer());
311 template <DemuxerStream::Type StreamType>
312 void DecoderStream<StreamType>::OnDecodeDone(int buffer_size,
313 bool end_of_stream,
314 typename Decoder::Status status) {
315 FUNCTION_DVLOG(2) << ": " << status;
316 DCHECK(state_ == STATE_NORMAL || state_ == STATE_FLUSHING_DECODER ||
317 state_ == STATE_PENDING_DEMUXER_READ || state_ == STATE_ERROR)
318 << state_;
319 DCHECK_GT(pending_decode_requests_, 0);
321 --pending_decode_requests_;
323 TRACE_EVENT_ASYNC_END0("media", GetTraceString<StreamType>(), this);
325 if (end_of_stream) {
326 DCHECK(!pending_decode_requests_);
327 decoding_eos_ = false;
330 if (state_ == STATE_ERROR) {
331 DCHECK(read_cb_.is_null());
332 return;
335 // Drop decoding result if Reset() was called during decoding.
336 // The resetting process will be handled when the decoder is reset.
337 if (!reset_cb_.is_null())
338 return;
340 switch (status) {
341 case Decoder::kDecodeError:
342 case Decoder::kDecryptError:
343 state_ = STATE_ERROR;
344 ready_outputs_.clear();
345 if (!read_cb_.is_null())
346 SatisfyRead(DECODE_ERROR, NULL);
347 return;
349 case Decoder::kAborted:
350 // Decoder can return kAborted during Reset() or during destruction.
351 return;
353 case Decoder::kOk:
354 // Any successful decode counts!
355 if (buffer_size > 0)
356 StreamTraits::ReportStatistics(statistics_cb_, buffer_size);
358 if (state_ == STATE_NORMAL) {
359 if (end_of_stream) {
360 state_ = STATE_END_OF_STREAM;
361 if (ready_outputs_.empty() && !read_cb_.is_null())
362 SatisfyRead(OK, StreamTraits::CreateEOSOutput());
363 return;
366 if (CanDecodeMore())
367 ReadFromDemuxerStream();
368 return;
371 if (state_ == STATE_FLUSHING_DECODER && !pending_decode_requests_)
372 ReinitializeDecoder();
373 return;
377 template <DemuxerStream::Type StreamType>
378 void DecoderStream<StreamType>::OnDecodeOutputReady(
379 const scoped_refptr<Output>& output) {
380 FUNCTION_DVLOG(2) << ": " << output->timestamp().InMilliseconds() << " ms";
381 DCHECK(output.get());
382 DCHECK(!output->end_of_stream());
383 DCHECK(state_ == STATE_NORMAL || state_ == STATE_FLUSHING_DECODER ||
384 state_ == STATE_PENDING_DEMUXER_READ || state_ == STATE_ERROR)
385 << state_;
387 if (state_ == STATE_ERROR) {
388 DCHECK(read_cb_.is_null());
389 return;
392 // Drop decoding result if Reset() was called during decoding.
393 // The resetting process will be handled when the decoder is reset.
394 if (!reset_cb_.is_null())
395 return;
397 if (!read_cb_.is_null()) {
398 // If |ready_outputs_| was non-empty, the read would have already been
399 // satisifed by Read().
400 DCHECK(ready_outputs_.empty());
401 SatisfyRead(OK, output);
402 return;
405 // Store decoded output.
406 ready_outputs_.push_back(output);
409 template <DemuxerStream::Type StreamType>
410 void DecoderStream<StreamType>::ReadFromDemuxerStream() {
411 FUNCTION_DVLOG(2);
412 DCHECK_EQ(state_, STATE_NORMAL);
413 DCHECK(CanDecodeMore());
414 DCHECK(reset_cb_.is_null());
416 state_ = STATE_PENDING_DEMUXER_READ;
417 stream_->Read(base::Bind(&DecoderStream<StreamType>::OnBufferReady,
418 weak_factory_.GetWeakPtr()));
421 template <DemuxerStream::Type StreamType>
422 void DecoderStream<StreamType>::OnBufferReady(
423 DemuxerStream::Status status,
424 const scoped_refptr<DecoderBuffer>& buffer) {
425 FUNCTION_DVLOG(2) << ": " << status << ", "
426 << (buffer.get() ? buffer->AsHumanReadableString()
427 : "NULL");
429 DCHECK(task_runner_->BelongsToCurrentThread());
430 DCHECK(state_ == STATE_PENDING_DEMUXER_READ || state_ == STATE_ERROR)
431 << state_;
432 DCHECK_EQ(buffer.get() != NULL, status == DemuxerStream::kOk) << status;
434 // Decoding has been stopped (e.g due to an error).
435 if (state_ != STATE_PENDING_DEMUXER_READ) {
436 DCHECK(state_ == STATE_ERROR);
437 DCHECK(read_cb_.is_null());
438 return;
441 state_ = STATE_NORMAL;
443 if (status == DemuxerStream::kConfigChanged) {
444 FUNCTION_DVLOG(2) << ": " << "ConfigChanged";
445 DCHECK(stream_->SupportsConfigChanges());
447 if (!config_change_observer_cb_.is_null())
448 config_change_observer_cb_.Run();
450 state_ = STATE_FLUSHING_DECODER;
451 if (!reset_cb_.is_null()) {
452 // If we are using DecryptingDemuxerStream, we already called DDS::Reset()
453 // which will continue the resetting process in it's callback.
454 if (!decrypting_demuxer_stream_)
455 Reset(base::ResetAndReturn(&reset_cb_));
456 // Reinitialization will continue after Reset() is done.
457 } else {
458 FlushDecoder();
460 return;
463 if (!reset_cb_.is_null()) {
464 // If we are using DecryptingDemuxerStream, we already called DDS::Reset()
465 // which will continue the resetting process in it's callback.
466 if (!decrypting_demuxer_stream_)
467 Reset(base::ResetAndReturn(&reset_cb_));
468 return;
471 if (status == DemuxerStream::kAborted) {
472 if (!read_cb_.is_null())
473 SatisfyRead(DEMUXER_READ_ABORTED, NULL);
474 return;
477 if (!splice_observer_cb_.is_null() && !buffer->end_of_stream()) {
478 const bool has_splice_ts = buffer->splice_timestamp() != kNoTimestamp();
479 if (active_splice_ || has_splice_ts) {
480 splice_observer_cb_.Run(buffer->splice_timestamp());
481 active_splice_ = has_splice_ts;
485 DCHECK(status == DemuxerStream::kOk) << status;
486 Decode(buffer);
488 // Read more data if the decoder supports multiple parallel decoding requests.
489 if (CanDecodeMore())
490 ReadFromDemuxerStream();
493 template <DemuxerStream::Type StreamType>
494 void DecoderStream<StreamType>::ReinitializeDecoder() {
495 FUNCTION_DVLOG(2);
496 DCHECK(task_runner_->BelongsToCurrentThread());
497 DCHECK_EQ(state_, STATE_FLUSHING_DECODER);
498 DCHECK_EQ(pending_decode_requests_, 0);
500 state_ = STATE_REINITIALIZING_DECODER;
501 DecoderStreamTraits<StreamType>::InitializeDecoder(
502 decoder_.get(), stream_,
503 base::Bind(&DecoderStream<StreamType>::OnDecoderReinitialized,
504 weak_factory_.GetWeakPtr()),
505 base::Bind(&DecoderStream<StreamType>::OnDecodeOutputReady,
506 weak_factory_.GetWeakPtr()));
509 template <DemuxerStream::Type StreamType>
510 void DecoderStream<StreamType>::OnDecoderReinitialized(PipelineStatus status) {
511 FUNCTION_DVLOG(2);
512 DCHECK(task_runner_->BelongsToCurrentThread());
513 DCHECK_EQ(state_, STATE_REINITIALIZING_DECODER);
515 // ReinitializeDecoder() can be called in two cases:
516 // 1, Flushing decoder finished (see OnDecodeOutputReady()).
517 // 2, Reset() was called during flushing decoder (see OnDecoderReset()).
518 // Also, Reset() can be called during pending ReinitializeDecoder().
519 // This function needs to handle them all!
521 if (status != PIPELINE_OK) {
522 // Reinitialization failed. Try to fall back to one of the remaining
523 // decoders. This will consume at least one decoder so doing it more than
524 // once is safe.
525 // For simplicity, don't attempt to fall back to a decryptor. Calling this
526 // with a null callback ensures that one won't be selected.
527 SelectDecoder(SetDecryptorReadyCB());
528 } else {
529 CompleteDecoderReinitialization(true);
533 template <DemuxerStream::Type StreamType>
534 void DecoderStream<StreamType>::CompleteDecoderReinitialization(bool success) {
535 FUNCTION_DVLOG(2);
536 DCHECK(task_runner_->BelongsToCurrentThread());
537 DCHECK_EQ(state_, STATE_REINITIALIZING_DECODER);
539 state_ = success ? STATE_NORMAL : STATE_ERROR;
541 if (!reset_cb_.is_null()) {
542 base::ResetAndReturn(&reset_cb_).Run();
543 return;
546 if (read_cb_.is_null())
547 return;
549 if (state_ == STATE_ERROR) {
550 SatisfyRead(DECODE_ERROR, NULL);
551 return;
554 ReadFromDemuxerStream();
557 template <DemuxerStream::Type StreamType>
558 void DecoderStream<StreamType>::ResetDecoder() {
559 FUNCTION_DVLOG(2);
560 DCHECK(task_runner_->BelongsToCurrentThread());
561 DCHECK(state_ == STATE_NORMAL || state_ == STATE_FLUSHING_DECODER ||
562 state_ == STATE_ERROR || state_ == STATE_END_OF_STREAM) << state_;
563 DCHECK(!reset_cb_.is_null());
565 decoder_->Reset(base::Bind(&DecoderStream<StreamType>::OnDecoderReset,
566 weak_factory_.GetWeakPtr()));
569 template <DemuxerStream::Type StreamType>
570 void DecoderStream<StreamType>::OnDecoderReset() {
571 FUNCTION_DVLOG(2);
572 DCHECK(task_runner_->BelongsToCurrentThread());
573 DCHECK(state_ == STATE_NORMAL || state_ == STATE_FLUSHING_DECODER ||
574 state_ == STATE_ERROR || state_ == STATE_END_OF_STREAM) << state_;
575 // If Reset() was called during pending read, read callback should be fired
576 // before the reset callback is fired.
577 DCHECK(read_cb_.is_null());
578 DCHECK(!reset_cb_.is_null());
580 if (state_ != STATE_FLUSHING_DECODER) {
581 state_ = STATE_NORMAL;
582 active_splice_ = false;
583 base::ResetAndReturn(&reset_cb_).Run();
584 return;
587 // The resetting process will be continued in OnDecoderReinitialized().
588 ReinitializeDecoder();
591 template class DecoderStream<DemuxerStream::VIDEO>;
592 template class DecoderStream<DemuxerStream::AUDIO>;
594 } // namespace media