Merge Chromium + Blink git repositories
[chromium-blink-merge.git] / chromecast / media / cma / ipc / media_message_fifo.cc
blobbd68f3f8673e14683bed6022318866615580e199
1 // Copyright 2014 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
5 #include "chromecast/media/cma/ipc/media_message_fifo.h"
7 #include "base/atomicops.h"
8 #include "base/bind.h"
9 #include "base/location.h"
10 #include "base/logging.h"
11 #include "base/single_thread_task_runner.h"
12 #include "base/thread_task_runner_handle.h"
13 #include "chromecast/media/cma/base/cma_logging.h"
14 #include "chromecast/media/cma/ipc/media_memory_chunk.h"
15 #include "chromecast/media/cma/ipc/media_message.h"
16 #include "chromecast/media/cma/ipc/media_message_type.h"
18 namespace chromecast {
19 namespace media {
21 class MediaMessageFlag
22 : public base::RefCountedThreadSafe<MediaMessageFlag> {
23 public:
24 // |offset| is the offset in the fifo of the media message.
25 explicit MediaMessageFlag(size_t offset);
27 bool IsValid() const;
29 void Invalidate();
31 size_t offset() const { return offset_; }
33 private:
34 friend class base::RefCountedThreadSafe<MediaMessageFlag>;
35 virtual ~MediaMessageFlag();
37 const size_t offset_;
38 bool flag_;
40 DISALLOW_COPY_AND_ASSIGN(MediaMessageFlag);
43 MediaMessageFlag::MediaMessageFlag(size_t offset)
44 : offset_(offset),
45 flag_(true) {
48 MediaMessageFlag::~MediaMessageFlag() {
51 bool MediaMessageFlag::IsValid() const {
52 return flag_;
55 void MediaMessageFlag::Invalidate() {
56 flag_ = false;
59 class FifoOwnedMemory : public MediaMemoryChunk {
60 public:
61 FifoOwnedMemory(void* data, size_t size,
62 const scoped_refptr<MediaMessageFlag>& flag,
63 const base::Closure& release_msg_cb);
64 ~FifoOwnedMemory() override;
66 // MediaMemoryChunk implementation.
67 void* data() const override { return data_; }
68 size_t size() const override { return size_; }
69 bool valid() const override { return flag_->IsValid(); }
71 private:
72 scoped_refptr<base::SingleThreadTaskRunner> task_runner_;
73 base::Closure release_msg_cb_;
75 void* const data_;
76 const size_t size_;
77 scoped_refptr<MediaMessageFlag> flag_;
79 DISALLOW_COPY_AND_ASSIGN(FifoOwnedMemory);
82 FifoOwnedMemory::FifoOwnedMemory(void* data,
83 size_t size,
84 const scoped_refptr<MediaMessageFlag>& flag,
85 const base::Closure& release_msg_cb)
86 : task_runner_(base::ThreadTaskRunnerHandle::Get()),
87 release_msg_cb_(release_msg_cb),
88 data_(data),
89 size_(size),
90 flag_(flag) {
93 FifoOwnedMemory::~FifoOwnedMemory() {
94 // Release the flag before notifying that the message has been released.
95 flag_ = scoped_refptr<MediaMessageFlag>();
96 if (!release_msg_cb_.is_null()) {
97 if (task_runner_->BelongsToCurrentThread()) {
98 release_msg_cb_.Run();
99 } else {
100 task_runner_->PostTask(FROM_HERE, release_msg_cb_);
105 MediaMessageFifo::MediaMessageFifo(
106 scoped_ptr<MediaMemoryChunk> mem, bool init)
107 : mem_(mem.Pass()),
108 weak_factory_(this) {
109 CHECK_EQ(reinterpret_cast<uintptr_t>(mem_->data()) % ALIGNOF(Descriptor),
110 0u);
111 CHECK_GE(mem_->size(), sizeof(Descriptor));
112 Descriptor* desc = static_cast<Descriptor*>(mem_->data());
113 base_ = static_cast<void*>(&desc->first_item);
115 // TODO(damienv): remove cast when atomic size_t is defined in Chrome.
116 // Currently, the sign differs.
117 rd_offset_ = reinterpret_cast<AtomicSize*>(&(desc->rd_offset));
118 wr_offset_ = reinterpret_cast<AtomicSize*>(&(desc->wr_offset));
120 size_t max_size = mem_->size() -
121 (static_cast<char*>(base_) - static_cast<char*>(mem_->data()));
122 if (init) {
123 size_ = max_size;
124 desc->size = size_;
125 internal_rd_offset_ = 0;
126 internal_wr_offset_ = 0;
127 base::subtle::Release_Store(rd_offset_, 0);
128 base::subtle::Release_Store(wr_offset_, 0);
129 } else {
130 size_ = desc->size;
131 CHECK_LE(size_, max_size);
132 internal_rd_offset_ = current_rd_offset();
133 internal_wr_offset_ = current_wr_offset();
135 CMALOG(kLogControl)
136 << "MediaMessageFifo:" << " init=" << init << " size=" << size_;
137 CHECK_GT(size_, 0u) << size_;
139 weak_this_ = weak_factory_.GetWeakPtr();
140 thread_checker_.DetachFromThread();
143 MediaMessageFifo::~MediaMessageFifo() {
144 DCHECK(thread_checker_.CalledOnValidThread());
147 void MediaMessageFifo::ObserveReadActivity(
148 const base::Closure& read_event_cb) {
149 read_event_cb_ = read_event_cb;
152 void MediaMessageFifo::ObserveWriteActivity(
153 const base::Closure& write_event_cb) {
154 write_event_cb_ = write_event_cb;
157 scoped_ptr<MediaMemoryChunk> MediaMessageFifo::ReserveMemory(
158 size_t size_to_reserve) {
159 DCHECK(thread_checker_.CalledOnValidThread());
161 // Capture first both the read and write offsets.
162 // and exit right away if not enough free space.
163 size_t wr_offset = internal_wr_offset();
164 size_t rd_offset = current_rd_offset();
165 size_t allocated_size = (size_ + wr_offset - rd_offset) % size_;
166 size_t free_size = size_ - 1 - allocated_size;
167 if (free_size < size_to_reserve)
168 return scoped_ptr<MediaMemoryChunk>();
169 CHECK_LE(MediaMessage::minimum_msg_size(), size_to_reserve);
171 // Note: in the next 2 conditions, we have:
172 // trailing_byte_count < size_to_reserve
173 // and since at this stage: size_to_reserve <= free_size
174 // we also have trailing_byte_count <= free_size
175 // which means that all the trailing bytes are free space in the fifo.
176 size_t trailing_byte_count = size_ - wr_offset;
177 if (trailing_byte_count < MediaMessage::minimum_msg_size()) {
178 // If there is no space to even write the smallest message,
179 // skip the trailing bytes and come back to the beginning of the fifo.
180 // (no way to insert a padding message).
181 if (free_size < trailing_byte_count)
182 return scoped_ptr<MediaMemoryChunk>();
183 wr_offset = 0;
184 CommitInternalWrite(wr_offset);
186 } else if (trailing_byte_count < size_to_reserve) {
187 // At this point, we know we have at least the space to write a message.
188 // However, to avoid splitting a message, a padding message is needed.
189 scoped_ptr<MediaMemoryChunk> mem(
190 ReserveMemoryNoCheck(trailing_byte_count));
191 scoped_ptr<MediaMessage> padding_message(
192 MediaMessage::CreateMessage(PaddingMediaMsg, mem.Pass()));
195 // Recalculate the free size and exit if not enough free space.
196 wr_offset = internal_wr_offset();
197 allocated_size = (size_ + wr_offset - rd_offset) % size_;
198 free_size = size_ - 1 - allocated_size;
199 if (free_size < size_to_reserve)
200 return scoped_ptr<MediaMemoryChunk>();
202 return ReserveMemoryNoCheck(size_to_reserve);
205 scoped_ptr<MediaMessage> MediaMessageFifo::Pop() {
206 DCHECK(thread_checker_.CalledOnValidThread());
208 // Capture the read and write offsets.
209 size_t rd_offset = internal_rd_offset();
210 size_t wr_offset = current_wr_offset();
211 size_t allocated_size = (size_ + wr_offset - rd_offset) % size_;
213 if (allocated_size < MediaMessage::minimum_msg_size())
214 return scoped_ptr<MediaMessage>();
216 size_t trailing_byte_count = size_ - rd_offset;
217 if (trailing_byte_count < MediaMessage::minimum_msg_size()) {
218 // If there is no space to even have the smallest message,
219 // skip the trailing bytes and come back to the beginning of the fifo.
220 // Note: all the trailing bytes correspond to allocated bytes since:
221 // trailing_byte_count < MediaMessage::minimum_msg_size() <= allocated_size
222 rd_offset = 0;
223 allocated_size -= trailing_byte_count;
224 trailing_byte_count = size_;
225 CommitInternalRead(rd_offset);
228 // The message should not be longer than the allocated size
229 // but since a message is a contiguous area of memory, it should also be
230 // smaller than |trailing_byte_count|.
231 size_t max_msg_size = std::min(allocated_size, trailing_byte_count);
232 if (max_msg_size < MediaMessage::minimum_msg_size())
233 return scoped_ptr<MediaMessage>();
234 void* msg_src = static_cast<uint8*>(base_) + rd_offset;
236 // Create a flag to protect the serialized structure of the message
237 // from being overwritten.
238 // The serialized structure starts at offset |rd_offset|.
239 scoped_refptr<MediaMessageFlag> rd_flag(new MediaMessageFlag(rd_offset));
240 rd_flags_.push_back(rd_flag);
241 scoped_ptr<MediaMemoryChunk> mem(
242 new FifoOwnedMemory(
243 msg_src, max_msg_size, rd_flag,
244 base::Bind(&MediaMessageFifo::OnRdMemoryReleased, weak_this_)));
246 // Create the message which wraps its the serialized structure.
247 scoped_ptr<MediaMessage> message(MediaMessage::MapMessage(mem.Pass()));
248 CHECK(message);
250 // Update the internal read pointer.
251 rd_offset = (rd_offset + message->size()) % size_;
252 CommitInternalRead(rd_offset);
254 return message.Pass();
257 void MediaMessageFifo::Flush() {
258 DCHECK(thread_checker_.CalledOnValidThread());
260 size_t wr_offset = current_wr_offset();
262 // Invalidate every memory region before flushing.
263 while (!rd_flags_.empty()) {
264 CMALOG(kLogControl) << "Invalidate flag";
265 rd_flags_.front()->Invalidate();
266 rd_flags_.pop_front();
269 // Flush by setting the read pointer to the value of the write pointer.
270 // Update first the internal read pointer then the public one.
271 CommitInternalRead(wr_offset);
272 CommitRead(wr_offset);
275 scoped_ptr<MediaMemoryChunk> MediaMessageFifo::ReserveMemoryNoCheck(
276 size_t size_to_reserve) {
277 size_t wr_offset = internal_wr_offset();
279 // Memory block corresponding to the serialized structure of the message.
280 void* msg_start = static_cast<uint8*>(base_) + wr_offset;
281 scoped_refptr<MediaMessageFlag> wr_flag(new MediaMessageFlag(wr_offset));
282 wr_flags_.push_back(wr_flag);
283 scoped_ptr<MediaMemoryChunk> mem(
284 new FifoOwnedMemory(
285 msg_start, size_to_reserve, wr_flag,
286 base::Bind(&MediaMessageFifo::OnWrMemoryReleased, weak_this_)));
288 // Update the internal write pointer.
289 wr_offset = (wr_offset + size_to_reserve) % size_;
290 CommitInternalWrite(wr_offset);
292 return mem.Pass();
295 void MediaMessageFifo::OnWrMemoryReleased() {
296 DCHECK(thread_checker_.CalledOnValidThread());
298 if (wr_flags_.empty()) {
299 // Sanity check: when there is no protected memory area,
300 // the external write offset has no reason to be different from
301 // the internal write offset.
302 DCHECK_EQ(current_wr_offset(), internal_wr_offset());
303 return;
306 // Update the external write offset.
307 while (!wr_flags_.empty() &&
308 (!wr_flags_.front()->IsValid() || wr_flags_.front()->HasOneRef())) {
309 // TODO(damienv): Could add a sanity check to make sure the offset is
310 // between the external write offset and the read offset (not included).
311 wr_flags_.pop_front();
314 // Update the read offset to the first locked memory area
315 // or to the internal read pointer if nothing prevents it.
316 size_t external_wr_offset = internal_wr_offset();
317 if (!wr_flags_.empty())
318 external_wr_offset = wr_flags_.front()->offset();
319 CommitWrite(external_wr_offset);
322 void MediaMessageFifo::OnRdMemoryReleased() {
323 DCHECK(thread_checker_.CalledOnValidThread());
325 if (rd_flags_.empty()) {
326 // Sanity check: when there is no protected memory area,
327 // the external read offset has no reason to be different from
328 // the internal read offset.
329 DCHECK_EQ(current_rd_offset(), internal_rd_offset());
330 return;
333 // Update the external read offset.
334 while (!rd_flags_.empty() &&
335 (!rd_flags_.front()->IsValid() || rd_flags_.front()->HasOneRef())) {
336 // TODO(damienv): Could add a sanity check to make sure the offset is
337 // between the external read offset and the write offset.
338 rd_flags_.pop_front();
341 // Update the read offset to the first locked memory area
342 // or to the internal read pointer if nothing prevents it.
343 size_t external_rd_offset = internal_rd_offset();
344 if (!rd_flags_.empty())
345 external_rd_offset = rd_flags_.front()->offset();
346 CommitRead(external_rd_offset);
349 size_t MediaMessageFifo::current_rd_offset() const {
350 DCHECK_EQ(sizeof(size_t), sizeof(AtomicSize));
351 size_t rd_offset = base::subtle::Acquire_Load(rd_offset_);
352 CHECK_LT(rd_offset, size_);
353 return rd_offset;
356 size_t MediaMessageFifo::current_wr_offset() const {
357 DCHECK_EQ(sizeof(size_t), sizeof(AtomicSize));
359 // When the fifo consumer acquires the write offset,
360 // we have to make sure that any possible following reads are actually
361 // returning results at least inline with the memory snapshot taken
362 // when the write offset was sampled.
363 // That's why an Acquire_Load is used here.
364 size_t wr_offset = base::subtle::Acquire_Load(wr_offset_);
365 CHECK_LT(wr_offset, size_);
366 return wr_offset;
369 void MediaMessageFifo::CommitRead(size_t new_rd_offset) {
370 // Add a memory fence to ensure the message content is completely read
371 // before updating the read offset.
372 base::subtle::Release_Store(rd_offset_, new_rd_offset);
374 // Since rd_offset_ is updated by a release_store above, any thread that
375 // does acquire_load is guaranteed to see the new rd_offset_ set above.
376 // So it is safe to send the notification.
377 if (!read_event_cb_.is_null()) {
378 read_event_cb_.Run();
382 void MediaMessageFifo::CommitWrite(size_t new_wr_offset) {
383 // Add a memory fence to ensure the message content is written
384 // before updating the write offset.
385 base::subtle::Release_Store(wr_offset_, new_wr_offset);
387 // Since wr_offset_ is updated by a release_store above, any thread that
388 // does acquire_load is guaranteed to see the new wr_offset_ set above.
389 // So it is safe to send the notification.
390 if (!write_event_cb_.is_null()) {
391 write_event_cb_.Run();
395 void MediaMessageFifo::CommitInternalRead(size_t new_rd_offset) {
396 internal_rd_offset_ = new_rd_offset;
399 void MediaMessageFifo::CommitInternalWrite(size_t new_wr_offset) {
400 internal_wr_offset_ = new_wr_offset;
403 } // namespace media
404 } // namespace chromecast