Fix leak when a PeerConnection offer or answer is created.
[chromium-blink-merge.git] / media / filters / decoder_stream.cc
blob4f712be2f2227883e37710fd9ae5a1b6b9bd9751
1 // Copyright 2014 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
5 #include "media/filters/decoder_stream.h"
7 #include "base/bind.h"
8 #include "base/callback_helpers.h"
9 #include "base/debug/trace_event.h"
10 #include "base/location.h"
11 #include "base/logging.h"
12 #include "base/single_thread_task_runner.h"
13 #include "media/base/audio_decoder.h"
14 #include "media/base/bind_to_current_loop.h"
15 #include "media/base/decoder_buffer.h"
16 #include "media/base/demuxer_stream.h"
17 #include "media/base/video_decoder.h"
18 #include "media/filters/decrypting_demuxer_stream.h"
20 namespace media {
22 // TODO(rileya): Devise a better way of specifying trace/UMA/etc strings for
23 // templated classes such as this.
24 template <DemuxerStream::Type StreamType>
25 static const char* GetTraceString();
// Starts a DVLOG statement at |level| that is prefixed with the name of the
// enclosing function followed by "<", the stream type string and ">". The
// expansion references |StreamType|, so it can only be used where that
// template parameter is in scope.
#define FUNCTION_DVLOG(level)         \
  DVLOG(level) << __FUNCTION__ << "<" \
               << DecoderStreamTraits<StreamType>::ToString() << ">"
31 template <>
32 const char* GetTraceString<DemuxerStream::VIDEO>() {
33 return "DecoderStream<VIDEO>::Decode";
36 template <>
37 const char* GetTraceString<DemuxerStream::AUDIO>() {
38 return "DecoderStream<AUDIO>::Decode";
41 template <DemuxerStream::Type StreamType>
42 DecoderStream<StreamType>::DecoderStream(
43 const scoped_refptr<base::SingleThreadTaskRunner>& task_runner,
44 ScopedVector<Decoder> decoders,
45 const SetDecryptorReadyCB& set_decryptor_ready_cb)
46 : task_runner_(task_runner),
47 state_(STATE_UNINITIALIZED),
48 stream_(NULL),
49 decoder_selector_(
50 new DecoderSelector<StreamType>(task_runner,
51 decoders.Pass(),
52 set_decryptor_ready_cb)),
53 active_splice_(false),
54 pending_decode_requests_(0),
55 weak_factory_(this) {}
57 template <DemuxerStream::Type StreamType>
58 DecoderStream<StreamType>::~DecoderStream() {
59 DCHECK(state_ == STATE_UNINITIALIZED || state_ == STATE_STOPPED) << state_;
62 template <DemuxerStream::Type StreamType>
63 void DecoderStream<StreamType>::Initialize(DemuxerStream* stream,
64 bool low_delay,
65 const StatisticsCB& statistics_cb,
66 const InitCB& init_cb) {
67 FUNCTION_DVLOG(2);
68 DCHECK(task_runner_->BelongsToCurrentThread());
69 DCHECK_EQ(state_, STATE_UNINITIALIZED) << state_;
70 DCHECK(init_cb_.is_null());
71 DCHECK(!init_cb.is_null());
73 statistics_cb_ = statistics_cb;
74 init_cb_ = init_cb;
75 stream_ = stream;
76 low_delay_ = low_delay;
78 state_ = STATE_INITIALIZING;
79 // TODO(xhwang): DecoderSelector only needs a config to select a decoder.
80 decoder_selector_->SelectDecoder(
81 stream, low_delay,
82 base::Bind(&DecoderStream<StreamType>::OnDecoderSelected,
83 weak_factory_.GetWeakPtr()));
86 template <DemuxerStream::Type StreamType>
87 void DecoderStream<StreamType>::Read(const ReadCB& read_cb) {
88 FUNCTION_DVLOG(2);
89 DCHECK(task_runner_->BelongsToCurrentThread());
90 DCHECK(state_ == STATE_NORMAL || state_ == STATE_FLUSHING_DECODER ||
91 state_ == STATE_ERROR || state_ == STATE_REINITIALIZING_DECODER ||
92 state_ == STATE_PENDING_DEMUXER_READ)
93 << state_;
94 // No two reads in the flight at any time.
95 DCHECK(read_cb_.is_null());
96 // No read during resetting or stopping process.
97 DCHECK(reset_cb_.is_null());
98 DCHECK(stop_cb_.is_null());
100 read_cb_ = read_cb;
102 if (state_ == STATE_ERROR) {
103 task_runner_->PostTask(FROM_HERE,
104 base::Bind(base::ResetAndReturn(&read_cb_),
105 DECODE_ERROR,
106 scoped_refptr<Output>()));
107 return;
110 if (!ready_outputs_.empty()) {
111 task_runner_->PostTask(FROM_HERE, base::Bind(
112 base::ResetAndReturn(&read_cb_), OK, ready_outputs_.front()));
113 ready_outputs_.pop_front();
116 // Decoder may be in reinitializing state as result of the previous Read().
117 if (state_ == STATE_REINITIALIZING_DECODER)
118 return;
120 if (!CanDecodeMore())
121 return;
123 if (state_ == STATE_FLUSHING_DECODER) {
124 FlushDecoder();
125 return;
128 if (state_ != STATE_PENDING_DEMUXER_READ)
129 ReadFromDemuxerStream();
132 template <DemuxerStream::Type StreamType>
133 void DecoderStream<StreamType>::Reset(const base::Closure& closure) {
134 FUNCTION_DVLOG(2);
135 DCHECK(task_runner_->BelongsToCurrentThread());
136 DCHECK(state_ != STATE_UNINITIALIZED && state_ != STATE_STOPPED) << state_;
137 DCHECK(reset_cb_.is_null());
138 DCHECK(stop_cb_.is_null());
140 reset_cb_ = closure;
142 if (!read_cb_.is_null()) {
143 task_runner_->PostTask(FROM_HERE, base::Bind(
144 base::ResetAndReturn(&read_cb_), ABORTED, scoped_refptr<Output>()));
147 ready_outputs_.clear();
149 // During decoder reinitialization, the Decoder does not need to be and
150 // cannot be Reset(). |decrypting_demuxer_stream_| was reset before decoder
151 // reinitialization.
152 if (state_ == STATE_REINITIALIZING_DECODER)
153 return;
155 // During pending demuxer read and when not using DecryptingDemuxerStream,
156 // the Decoder will be reset after demuxer read is returned
157 // (in OnBufferReady()).
158 if (state_ == STATE_PENDING_DEMUXER_READ && !decrypting_demuxer_stream_)
159 return;
161 if (decrypting_demuxer_stream_) {
162 decrypting_demuxer_stream_->Reset(base::Bind(
163 &DecoderStream<StreamType>::ResetDecoder, weak_factory_.GetWeakPtr()));
164 return;
167 ResetDecoder();
170 template <DemuxerStream::Type StreamType>
171 void DecoderStream<StreamType>::Stop(const base::Closure& closure) {
172 FUNCTION_DVLOG(2);
173 DCHECK(task_runner_->BelongsToCurrentThread());
174 DCHECK_NE(state_, STATE_STOPPED) << state_;
175 DCHECK(stop_cb_.is_null());
177 stop_cb_ = closure;
179 if (state_ == STATE_INITIALIZING) {
180 decoder_selector_->Abort();
181 return;
184 DCHECK(init_cb_.is_null());
186 // All pending callbacks will be dropped.
187 weak_factory_.InvalidateWeakPtrs();
189 // Post callbacks to prevent reentrance into this object.
190 if (!read_cb_.is_null()) {
191 task_runner_->PostTask(FROM_HERE, base::Bind(
192 base::ResetAndReturn(&read_cb_), ABORTED, scoped_refptr<Output>()));
194 if (!reset_cb_.is_null())
195 task_runner_->PostTask(FROM_HERE, base::ResetAndReturn(&reset_cb_));
197 if (decrypting_demuxer_stream_) {
198 decrypting_demuxer_stream_->Stop(base::Bind(
199 &DecoderStream<StreamType>::StopDecoder, weak_factory_.GetWeakPtr()));
200 return;
203 // We may not have a |decoder_| if Stop() was called during initialization.
204 if (decoder_) {
205 StopDecoder();
206 return;
209 state_ = STATE_STOPPED;
210 stream_ = NULL;
211 decoder_.reset();
212 decrypting_demuxer_stream_.reset();
213 task_runner_->PostTask(FROM_HERE, base::ResetAndReturn(&stop_cb_));
216 template <DemuxerStream::Type StreamType>
217 bool DecoderStream<StreamType>::CanReadWithoutStalling() const {
218 DCHECK(task_runner_->BelongsToCurrentThread());
219 return decoder_->CanReadWithoutStalling();
222 template <>
223 bool DecoderStream<DemuxerStream::AUDIO>::CanReadWithoutStalling() const {
224 DCHECK(task_runner_->BelongsToCurrentThread());
225 return true;
228 template <DemuxerStream::Type StreamType>
229 bool DecoderStream<StreamType>::CanDecodeMore() const {
230 DCHECK(task_runner_->BelongsToCurrentThread());
232 // Limit total number of outputs stored in |ready_outputs_| and being decoded.
233 // It only makes sense to saturate decoder completely when output queue is
234 // empty.
235 int num_decodes =
236 static_cast<int>(ready_outputs_.size()) + pending_decode_requests_;
237 return num_decodes < decoder_->GetMaxDecodeRequests();
240 template <>
241 bool DecoderStream<DemuxerStream::AUDIO>::CanDecodeMore() const {
242 DCHECK(task_runner_->BelongsToCurrentThread());
243 return !pending_decode_requests_ && ready_outputs_.empty();
246 template <DemuxerStream::Type StreamType>
247 void DecoderStream<StreamType>::OnDecoderSelected(
248 scoped_ptr<Decoder> selected_decoder,
249 scoped_ptr<DecryptingDemuxerStream> decrypting_demuxer_stream) {
250 FUNCTION_DVLOG(2);
251 DCHECK(task_runner_->BelongsToCurrentThread());
252 DCHECK_EQ(state_, STATE_INITIALIZING) << state_;
253 DCHECK(!init_cb_.is_null());
254 DCHECK(read_cb_.is_null());
255 DCHECK(reset_cb_.is_null());
257 decoder_selector_.reset();
258 if (decrypting_demuxer_stream)
259 stream_ = decrypting_demuxer_stream.get();
261 if (!selected_decoder) {
262 state_ = STATE_UNINITIALIZED;
263 StreamTraits::FinishInitialization(
264 base::ResetAndReturn(&init_cb_), selected_decoder.get(), stream_);
265 } else {
266 state_ = STATE_NORMAL;
267 decoder_ = selected_decoder.Pass();
268 decrypting_demuxer_stream_ = decrypting_demuxer_stream.Pass();
269 StreamTraits::FinishInitialization(
270 base::ResetAndReturn(&init_cb_), decoder_.get(), stream_);
273 // Stop() called during initialization.
274 if (!stop_cb_.is_null()) {
275 Stop(base::ResetAndReturn(&stop_cb_));
276 return;
280 template <DemuxerStream::Type StreamType>
281 void DecoderStream<StreamType>::SatisfyRead(
282 Status status,
283 const scoped_refptr<Output>& output) {
284 DCHECK(!read_cb_.is_null());
285 base::ResetAndReturn(&read_cb_).Run(status, output);
288 template <DemuxerStream::Type StreamType>
289 void DecoderStream<StreamType>::Decode(
290 const scoped_refptr<DecoderBuffer>& buffer) {
291 FUNCTION_DVLOG(2);
292 DCHECK(state_ == STATE_NORMAL || state_ == STATE_FLUSHING_DECODER) << state_;
293 DCHECK(CanDecodeMore());
294 DCHECK(reset_cb_.is_null());
295 DCHECK(stop_cb_.is_null());
296 DCHECK(buffer);
298 int buffer_size = buffer->end_of_stream() ? 0 : buffer->data_size();
300 TRACE_EVENT_ASYNC_BEGIN0("media", GetTraceString<StreamType>(), this);
301 ++pending_decode_requests_;
302 decoder_->Decode(buffer,
303 base::Bind(&DecoderStream<StreamType>::OnDecodeOutputReady,
304 weak_factory_.GetWeakPtr(),
305 buffer_size));
308 template <DemuxerStream::Type StreamType>
309 void DecoderStream<StreamType>::FlushDecoder() {
310 if (pending_decode_requests_ == 0)
311 Decode(DecoderBuffer::CreateEOSBuffer());
314 template <DemuxerStream::Type StreamType>
315 void DecoderStream<StreamType>::OnDecodeOutputReady(
316 int buffer_size,
317 typename Decoder::Status status,
318 const scoped_refptr<Output>& output) {
319 FUNCTION_DVLOG(2) << status << " " << output;
320 DCHECK(state_ == STATE_NORMAL || state_ == STATE_FLUSHING_DECODER ||
321 state_ == STATE_PENDING_DEMUXER_READ || state_ == STATE_ERROR)
322 << state_;
323 DCHECK(stop_cb_.is_null());
324 DCHECK_EQ(status == Decoder::kOk, output != NULL);
325 DCHECK_GT(pending_decode_requests_, 0);
327 --pending_decode_requests_;
329 TRACE_EVENT_ASYNC_END0("media", GetTraceString<StreamType>(), this);
331 if (state_ == STATE_ERROR) {
332 DCHECK(read_cb_.is_null());
333 return;
336 if (status == Decoder::kDecodeError) {
337 state_ = STATE_ERROR;
338 ready_outputs_.clear();
339 if (!read_cb_.is_null())
340 SatisfyRead(DECODE_ERROR, NULL);
341 return;
344 if (status == Decoder::kDecryptError) {
345 state_ = STATE_ERROR;
346 ready_outputs_.clear();
347 if (!read_cb_.is_null())
348 SatisfyRead(DECRYPT_ERROR, NULL);
349 return;
352 if (status == Decoder::kAborted) {
353 if (!read_cb_.is_null())
354 SatisfyRead(ABORTED, NULL);
355 return;
358 // Any successful decode counts!
359 if (buffer_size > 0) {
360 StreamTraits::ReportStatistics(statistics_cb_, buffer_size);
363 // Drop decoding result if Reset() was called during decoding.
364 // The resetting process will be handled when the decoder is reset.
365 if (!reset_cb_.is_null())
366 return;
368 // Decoder flushed. Reinitialize the decoder.
369 if (state_ == STATE_FLUSHING_DECODER &&
370 status == Decoder::kOk && output->end_of_stream()) {
371 ReinitializeDecoder();
372 return;
375 if (status == Decoder::kNotEnoughData) {
376 if (state_ == STATE_NORMAL)
377 ReadFromDemuxerStream();
378 else if (state_ == STATE_FLUSHING_DECODER)
379 FlushDecoder();
380 return;
383 DCHECK(output);
385 // Store decoded output.
386 ready_outputs_.push_back(output);
387 scoped_refptr<Output> extra_output;
388 while ((extra_output = decoder_->GetDecodeOutput()) != NULL) {
389 ready_outputs_.push_back(extra_output);
392 // Satisfy outstanding read request, if any.
393 if (!read_cb_.is_null()) {
394 scoped_refptr<Output> read_result = ready_outputs_.front();
395 ready_outputs_.pop_front();
396 SatisfyRead(OK, output);
400 template <DemuxerStream::Type StreamType>
401 void DecoderStream<StreamType>::ReadFromDemuxerStream() {
402 FUNCTION_DVLOG(2);
403 DCHECK_EQ(state_, STATE_NORMAL) << state_;
404 DCHECK(CanDecodeMore());
405 DCHECK(reset_cb_.is_null());
406 DCHECK(stop_cb_.is_null());
408 state_ = STATE_PENDING_DEMUXER_READ;
409 stream_->Read(base::Bind(&DecoderStream<StreamType>::OnBufferReady,
410 weak_factory_.GetWeakPtr()));
413 template <DemuxerStream::Type StreamType>
414 void DecoderStream<StreamType>::OnBufferReady(
415 DemuxerStream::Status status,
416 const scoped_refptr<DecoderBuffer>& buffer) {
417 FUNCTION_DVLOG(2) << ": " << status;
418 DCHECK(task_runner_->BelongsToCurrentThread());
419 DCHECK(state_ == STATE_PENDING_DEMUXER_READ || state_ == STATE_ERROR ||
420 state_ == STATE_STOPPED)
421 << state_;
422 DCHECK_EQ(buffer.get() != NULL, status == DemuxerStream::kOk) << status;
423 DCHECK(stop_cb_.is_null());
425 // Decoding has been stopped (e.g due to an error).
426 if (state_ != STATE_PENDING_DEMUXER_READ) {
427 DCHECK(state_ == STATE_ERROR || state_ == STATE_STOPPED);
428 DCHECK(read_cb_.is_null());
429 return;
432 state_ = STATE_NORMAL;
434 if (status == DemuxerStream::kConfigChanged) {
435 FUNCTION_DVLOG(2) << ": " << "ConfigChanged";
436 DCHECK(stream_->SupportsConfigChanges());
438 if (!config_change_observer_cb_.is_null())
439 config_change_observer_cb_.Run();
441 state_ = STATE_FLUSHING_DECODER;
442 if (!reset_cb_.is_null()) {
443 // If we are using DecryptingDemuxerStream, we already called DDS::Reset()
444 // which will continue the resetting process in it's callback.
445 if (!decrypting_demuxer_stream_)
446 Reset(base::ResetAndReturn(&reset_cb_));
447 // Reinitialization will continue after Reset() is done.
448 } else {
449 FlushDecoder();
451 return;
454 if (!reset_cb_.is_null()) {
455 // If we are using DecryptingDemuxerStream, we already called DDS::Reset()
456 // which will continue the resetting process in it's callback.
457 if (!decrypting_demuxer_stream_)
458 Reset(base::ResetAndReturn(&reset_cb_));
459 return;
462 if (status == DemuxerStream::kAborted) {
463 SatisfyRead(DEMUXER_READ_ABORTED, NULL);
464 return;
467 if (!splice_observer_cb_.is_null() && !buffer->end_of_stream()) {
468 const bool has_splice_ts = buffer->splice_timestamp() != kNoTimestamp();
469 if (active_splice_ || has_splice_ts) {
470 splice_observer_cb_.Run(buffer->splice_timestamp());
471 active_splice_ = has_splice_ts;
475 DCHECK(status == DemuxerStream::kOk) << status;
476 Decode(buffer);
478 // Read more data if the decoder supports multiple parallel decoding requests.
479 if (CanDecodeMore() && !buffer->end_of_stream())
480 ReadFromDemuxerStream();
483 template <DemuxerStream::Type StreamType>
484 void DecoderStream<StreamType>::ReinitializeDecoder() {
485 FUNCTION_DVLOG(2);
486 DCHECK(task_runner_->BelongsToCurrentThread());
487 DCHECK_EQ(state_, STATE_FLUSHING_DECODER) << state_;
488 DCHECK_EQ(pending_decode_requests_, 0);
490 DCHECK(StreamTraits::GetDecoderConfig(*stream_).IsValidConfig());
491 state_ = STATE_REINITIALIZING_DECODER;
492 DecoderStreamTraits<StreamType>::Initialize(
493 decoder_.get(),
494 StreamTraits::GetDecoderConfig(*stream_),
495 low_delay_,
496 base::Bind(&DecoderStream<StreamType>::OnDecoderReinitialized,
497 weak_factory_.GetWeakPtr()));
500 template <DemuxerStream::Type StreamType>
501 void DecoderStream<StreamType>::OnDecoderReinitialized(PipelineStatus status) {
502 FUNCTION_DVLOG(2);
503 DCHECK(task_runner_->BelongsToCurrentThread());
504 DCHECK_EQ(state_, STATE_REINITIALIZING_DECODER) << state_;
505 DCHECK(stop_cb_.is_null());
507 // ReinitializeDecoder() can be called in two cases:
508 // 1, Flushing decoder finished (see OnDecodeOutputReady()).
509 // 2, Reset() was called during flushing decoder (see OnDecoderReset()).
510 // Also, Reset() can be called during pending ReinitializeDecoder().
511 // This function needs to handle them all!
513 state_ = (status == PIPELINE_OK) ? STATE_NORMAL : STATE_ERROR;
515 if (!reset_cb_.is_null()) {
516 base::ResetAndReturn(&reset_cb_).Run();
517 return;
520 if (read_cb_.is_null())
521 return;
523 if (state_ == STATE_ERROR) {
524 SatisfyRead(DECODE_ERROR, NULL);
525 return;
528 ReadFromDemuxerStream();
531 template <DemuxerStream::Type StreamType>
532 void DecoderStream<StreamType>::ResetDecoder() {
533 FUNCTION_DVLOG(2);
534 DCHECK(task_runner_->BelongsToCurrentThread());
535 DCHECK(state_ == STATE_NORMAL || state_ == STATE_FLUSHING_DECODER ||
536 state_ == STATE_ERROR) << state_;
537 DCHECK(!reset_cb_.is_null());
539 decoder_->Reset(base::Bind(&DecoderStream<StreamType>::OnDecoderReset,
540 weak_factory_.GetWeakPtr()));
543 template <DemuxerStream::Type StreamType>
544 void DecoderStream<StreamType>::OnDecoderReset() {
545 FUNCTION_DVLOG(2);
546 DCHECK(task_runner_->BelongsToCurrentThread());
547 DCHECK(state_ == STATE_NORMAL || state_ == STATE_FLUSHING_DECODER ||
548 state_ == STATE_ERROR) << state_;
549 // If Reset() was called during pending read, read callback should be fired
550 // before the reset callback is fired.
551 DCHECK(read_cb_.is_null());
552 DCHECK(!reset_cb_.is_null());
553 DCHECK(stop_cb_.is_null());
555 if (state_ != STATE_FLUSHING_DECODER) {
556 base::ResetAndReturn(&reset_cb_).Run();
557 return;
560 // The resetting process will be continued in OnDecoderReinitialized().
561 ReinitializeDecoder();
564 template <DemuxerStream::Type StreamType>
565 void DecoderStream<StreamType>::StopDecoder() {
566 FUNCTION_DVLOG(2);
567 DCHECK(task_runner_->BelongsToCurrentThread());
568 DCHECK(state_ != STATE_UNINITIALIZED && state_ != STATE_STOPPED) << state_;
569 DCHECK(!stop_cb_.is_null());
571 state_ = STATE_STOPPED;
572 decoder_->Stop();
573 stream_ = NULL;
574 decoder_.reset();
575 decrypting_demuxer_stream_.reset();
576 // Post |stop_cb_| because pending |read_cb_| and/or |reset_cb_| are also
577 // posted in Stop().
578 task_runner_->PostTask(FROM_HERE, base::ResetAndReturn(&stop_cb_));
581 template class DecoderStream<DemuxerStream::VIDEO>;
582 template class DecoderStream<DemuxerStream::AUDIO>;
584 } // namespace media