Roll src/third_party/WebKit eac3800:0237a66 (svn 202606:202607)
[chromium-blink-merge.git] / content / child / shared_memory_data_consumer_handle.cc
blob7bf404a5281e2884dc031534fb89343cb0e4db89
1 // Copyright 2015 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
5 #include "content/child/shared_memory_data_consumer_handle.h"
7 #include <algorithm>
8 #include <deque>
10 #include "base/bind.h"
11 #include "base/message_loop/message_loop.h"
12 #include "base/single_thread_task_runner.h"
13 #include "base/synchronization/lock.h"
14 #include "base/thread_task_runner_handle.h"
15 #include "content/public/child/fixed_received_data.h"
17 namespace content {
19 namespace {
21 class DelegateThreadSafeReceivedData final
22 : public RequestPeer::ThreadSafeReceivedData {
23 public:
24 explicit DelegateThreadSafeReceivedData(
25 scoped_ptr<RequestPeer::ReceivedData> data)
26 : data_(data.Pass()), task_runner_(base::ThreadTaskRunnerHandle::Get()) {}
27 ~DelegateThreadSafeReceivedData() override {
28 if (!task_runner_->BelongsToCurrentThread()) {
29 // Delete the data on the original thread.
30 task_runner_->DeleteSoon(FROM_HERE, data_.release());
34 const char* payload() const override { return data_->payload(); }
35 int length() const override { return data_->length(); }
36 int encoded_length() const override { return data_->encoded_length(); }
38 private:
39 scoped_ptr<RequestPeer::ReceivedData> data_;
40 scoped_refptr<base::SingleThreadTaskRunner> task_runner_;
42 DISALLOW_COPY_AND_ASSIGN(DelegateThreadSafeReceivedData);
45 } // namespace
47 using Result = blink::WebDataConsumerHandle::Result;
49 // All methods (except for ctor/dtor) must be called with |lock_| aquired
50 // unless otherwise stated.
51 class SharedMemoryDataConsumerHandle::Context final
52 : public base::RefCountedThreadSafe<Context> {
53 public:
54 explicit Context(const base::Closure& on_reader_detached)
55 : result_(Ok),
56 first_offset_(0),
57 client_(nullptr),
58 writer_task_runner_(base::ThreadTaskRunnerHandle::Get()),
59 on_reader_detached_(on_reader_detached),
60 is_on_reader_detached_valid_(!on_reader_detached_.is_null()),
61 is_handle_active_(true),
62 is_two_phase_read_in_progress_(false) {}
64 bool IsEmpty() const {
65 lock_.AssertAcquired();
66 return queue_.empty();
68 void ClearIfNecessary() {
69 lock_.AssertAcquired();
70 if (!is_handle_locked() && !is_handle_active()) {
71 // No one is interested in the contents.
72 if (is_on_reader_detached_valid_) {
73 // We post a task even in the writer thread in order to avoid a
74 // reentrance problem as calling |on_reader_detached_| may manipulate
75 // the context synchronously.
76 writer_task_runner_->PostTask(FROM_HERE, on_reader_detached_);
78 Clear();
81 void ClearQueue() {
82 lock_.AssertAcquired();
83 for (auto& data : queue_) {
84 delete data;
86 queue_.clear();
87 first_offset_ = 0;
89 RequestPeer::ThreadSafeReceivedData* Top() {
90 lock_.AssertAcquired();
91 return queue_.front();
93 void Push(scoped_ptr<RequestPeer::ThreadSafeReceivedData> data) {
94 lock_.AssertAcquired();
95 queue_.push_back(data.release());
97 size_t first_offset() const {
98 lock_.AssertAcquired();
99 return first_offset_;
101 Result result() const {
102 lock_.AssertAcquired();
103 return result_;
105 void set_result(Result r) {
106 lock_.AssertAcquired();
107 result_ = r;
109 void AcquireReaderLock(Client* client) {
110 lock_.AssertAcquired();
111 DCHECK(!notification_task_runner_);
112 DCHECK(!client_);
113 notification_task_runner_ = base::ThreadTaskRunnerHandle::Get();
114 client_ = client;
115 if (client && !(IsEmpty() && result() == Ok)) {
116 // We cannot notify synchronously because the user doesn't have the reader
117 // yet.
118 notification_task_runner_->PostTask(
119 FROM_HERE, base::Bind(&Context::NotifyInternal, this, false));
122 void ReleaseReaderLock() {
123 lock_.AssertAcquired();
124 DCHECK(notification_task_runner_);
125 notification_task_runner_ = nullptr;
126 client_ = nullptr;
128 void PostNotify() {
129 lock_.AssertAcquired();
130 auto runner = notification_task_runner_;
131 if (!runner)
132 return;
133 // We don't re-post the task when the runner changes while waiting for
134 // this task because in this case a new reader is obtained and
135 // notification is already done at the reader creation time if necessary.
136 runner->PostTask(FROM_HERE,
137 base::Bind(&Context::NotifyInternal, this, false));
139 // Must be called with |lock_| not aquired.
140 void Notify() { NotifyInternal(true); }
141 // This function doesn't work in the destructor if |on_reader_detached_| is
142 // not null.
143 void ResetOnReaderDetached() {
144 lock_.AssertAcquired();
145 if (on_reader_detached_.is_null()) {
146 DCHECK(!is_on_reader_detached_valid_);
147 return;
149 is_on_reader_detached_valid_ = false;
150 if (writer_task_runner_->BelongsToCurrentThread()) {
151 // We can reset the closure immediately.
152 on_reader_detached_.Reset();
153 } else {
154 // We need to reset |on_reader_detached_| on the right thread because it
155 // might lead to the object destruction.
156 writer_task_runner_->PostTask(
157 FROM_HERE, base::Bind(&Context::ResetOnReaderDetachedWithLock, this));
160 bool is_handle_locked() const {
161 lock_.AssertAcquired();
162 return notification_task_runner_;
164 bool IsReaderBoundToCurrentThread() const {
165 lock_.AssertAcquired();
166 return notification_task_runner_ &&
167 notification_task_runner_->BelongsToCurrentThread();
169 bool is_handle_active() const {
170 lock_.AssertAcquired();
171 return is_handle_active_;
173 void set_is_handle_active(bool b) {
174 lock_.AssertAcquired();
175 is_handle_active_ = b;
177 void Consume(size_t s) {
178 lock_.AssertAcquired();
179 first_offset_ += s;
180 auto top = Top();
181 if (static_cast<size_t>(top->length()) <= first_offset_) {
182 delete top;
183 queue_.pop_front();
184 first_offset_ = 0;
187 bool is_two_phase_read_in_progress() const {
188 lock_.AssertAcquired();
189 return is_two_phase_read_in_progress_;
191 void set_is_two_phase_read_in_progress(bool b) {
192 lock_.AssertAcquired();
193 is_two_phase_read_in_progress_ = b;
195 // Can be called with |lock_| not aquired.
196 base::Lock& lock() { return lock_; }
198 private:
199 // Must be called with |lock_| not aquired.
200 void NotifyInternal(bool repost) {
201 scoped_refptr<base::SingleThreadTaskRunner> runner;
203 base::AutoLock lock(lock_);
204 runner = notification_task_runner_;
206 if (!runner)
207 return;
209 if (runner->BelongsToCurrentThread()) {
210 // It is safe to access member variables without lock because |client_|
211 // is bound to the current thread.
212 if (client_)
213 client_->didGetReadable();
214 return;
216 if (repost) {
217 // We don't re-post the task when the runner changes while waiting for
218 // this task because in this case a new reader is obtained and
219 // notification is already done at the reader creation time if necessary.
220 runner->PostTask(FROM_HERE,
221 base::Bind(&Context::NotifyInternal, this, false));
224 void Clear() {
225 lock_.AssertAcquired();
226 for (auto& data : queue_) {
227 delete data;
229 queue_.clear();
230 first_offset_ = 0;
231 client_ = nullptr;
232 // Note this doesn't work in the destructor if |on_reader_detached_| is not
233 // null. We have an assert in the destructor.
234 ResetOnReaderDetached();
236 // Must be called with |lock_| not aquired.
237 void ResetOnReaderDetachedWithLock() {
238 base::AutoLock lock(lock_);
239 ResetOnReaderDetached();
242 friend class base::RefCountedThreadSafe<Context>;
243 ~Context() {
244 base::AutoLock lock(lock_);
245 DCHECK(on_reader_detached_.is_null());
247 // This is necessary because the queue stores raw pointers.
248 Clear();
251 base::Lock lock_;
252 // |result_| stores the ultimate state of this handle if it has. Otherwise,
253 // |Ok| is set.
254 Result result_;
255 // TODO(yhirano): Use std::deque<scoped_ptr<ThreadSafeReceivedData>>
256 // once it is allowed.
257 std::deque<RequestPeer::ThreadSafeReceivedData*> queue_;
258 size_t first_offset_;
259 Client* client_;
260 scoped_refptr<base::SingleThreadTaskRunner> notification_task_runner_;
261 scoped_refptr<base::SingleThreadTaskRunner> writer_task_runner_;
262 base::Closure on_reader_detached_;
263 // We need this boolean variable to remember if |on_reader_detached_| is
264 // callable because we need to reset |on_reader_detached_| only on the writer
265 // thread and hence |on_reader_detached_.is_null()| is untrustworthy on
266 // other threads.
267 bool is_on_reader_detached_valid_;
268 bool is_handle_active_;
269 bool is_two_phase_read_in_progress_;
271 DISALLOW_COPY_AND_ASSIGN(Context);
274 SharedMemoryDataConsumerHandle::Writer::Writer(
275 const scoped_refptr<Context>& context,
276 BackpressureMode mode)
277 : context_(context), mode_(mode) {
280 SharedMemoryDataConsumerHandle::Writer::~Writer() {
281 Close();
282 base::AutoLock lock(context_->lock());
283 context_->ResetOnReaderDetached();
286 void SharedMemoryDataConsumerHandle::Writer::AddData(
287 scoped_ptr<RequestPeer::ReceivedData> data) {
288 if (!data->length()) {
289 // We omit empty data.
290 return;
293 bool needs_notification = false;
295 base::AutoLock lock(context_->lock());
296 if (!context_->is_handle_active() && !context_->is_handle_locked()) {
297 // No one is interested in the data.
298 return;
301 needs_notification = context_->IsEmpty();
302 scoped_ptr<RequestPeer::ThreadSafeReceivedData> data_to_pass;
303 if (mode_ == kApplyBackpressure) {
304 data_to_pass =
305 make_scoped_ptr(new DelegateThreadSafeReceivedData(data.Pass()));
306 } else {
307 data_to_pass = make_scoped_ptr(new FixedReceivedData(data.get()));
309 context_->Push(data_to_pass.Pass());
312 if (needs_notification) {
313 // We CAN issue the notification synchronously if the associated reader
314 // lives in this thread, because this function cannot be called in the
315 // client's callback.
316 context_->Notify();
320 void SharedMemoryDataConsumerHandle::Writer::Close() {
321 base::AutoLock lock(context_->lock());
322 if (context_->result() == Ok) {
323 context_->set_result(Done);
324 context_->ResetOnReaderDetached();
325 if (context_->IsEmpty()) {
326 // We cannot issue the notification synchronously because this function
327 // can be called in the client's callback.
328 context_->PostNotify();
333 void SharedMemoryDataConsumerHandle::Writer::Fail() {
334 base::AutoLock lock(context_->lock());
335 if (context_->result() == Ok) {
336 // TODO(yhirano): Use an appropriate error code other than
337 // UnexpectedError.
338 context_->set_result(UnexpectedError);
340 if (context_->is_two_phase_read_in_progress()) {
341 // If we are in two-phase read session, we cannot discard the data. We
342 // will clear the queue at the end of the session.
343 } else {
344 context_->ClearQueue();
347 context_->ResetOnReaderDetached();
348 // We cannot issue the notification synchronously because this function can
349 // be called in the client's callback.
350 context_->PostNotify();
354 SharedMemoryDataConsumerHandle::ReaderImpl::ReaderImpl(
355 scoped_refptr<Context> context,
356 Client* client)
357 : context_(context) {
358 base::AutoLock lock(context_->lock());
359 DCHECK(!context_->is_handle_locked());
360 context_->AcquireReaderLock(client);
363 SharedMemoryDataConsumerHandle::ReaderImpl::~ReaderImpl() {
364 base::AutoLock lock(context_->lock());
365 context_->ReleaseReaderLock();
366 context_->ClearIfNecessary();
369 Result SharedMemoryDataConsumerHandle::ReaderImpl::read(
370 void* data,
371 size_t size,
372 Flags flags,
373 size_t* read_size_to_return) {
374 base::AutoLock lock(context_->lock());
376 size_t total_read_size = 0;
377 *read_size_to_return = 0;
379 if (context_->result() == Ok && context_->is_two_phase_read_in_progress())
380 context_->set_result(UnexpectedError);
382 if (context_->result() != Ok && context_->result() != Done)
383 return context_->result();
385 while (!context_->IsEmpty() && total_read_size < size) {
386 const auto& top = context_->Top();
387 size_t readable = top->length() - context_->first_offset();
388 size_t writable = size - total_read_size;
389 size_t read_size = std::min(readable, writable);
390 const char* begin = top->payload() + context_->first_offset();
391 std::copy(begin, begin + read_size,
392 static_cast<char*>(data) + total_read_size);
393 total_read_size += read_size;
394 context_->Consume(read_size);
396 *read_size_to_return = total_read_size;
397 if (total_read_size || !context_->IsEmpty())
398 return Ok;
399 if (context_->result() == Done)
400 return Done;
401 return ShouldWait;
404 Result SharedMemoryDataConsumerHandle::ReaderImpl::beginRead(
405 const void** buffer,
406 Flags flags,
407 size_t* available) {
408 *buffer = nullptr;
409 *available = 0;
411 base::AutoLock lock(context_->lock());
413 if (context_->result() == Ok && context_->is_two_phase_read_in_progress())
414 context_->set_result(UnexpectedError);
416 if (context_->result() != Ok && context_->result() != Done)
417 return context_->result();
419 if (context_->IsEmpty())
420 return context_->result() == Done ? Done : ShouldWait;
422 context_->set_is_two_phase_read_in_progress(true);
423 const auto& top = context_->Top();
424 *buffer = top->payload() + context_->first_offset();
425 *available = top->length() - context_->first_offset();
427 return Ok;
430 Result SharedMemoryDataConsumerHandle::ReaderImpl::endRead(size_t read_size) {
431 base::AutoLock lock(context_->lock());
433 if (!context_->is_two_phase_read_in_progress())
434 return UnexpectedError;
436 context_->set_is_two_phase_read_in_progress(false);
437 if (context_->result() != Ok && context_->result() != Done) {
438 // We have an error, so we can discard the stored data.
439 context_->ClearQueue();
440 } else {
441 context_->Consume(read_size);
444 return Ok;
447 SharedMemoryDataConsumerHandle::SharedMemoryDataConsumerHandle(
448 BackpressureMode mode,
449 scoped_ptr<Writer>* writer)
450 : SharedMemoryDataConsumerHandle(mode, base::Closure(), writer) {
453 SharedMemoryDataConsumerHandle::SharedMemoryDataConsumerHandle(
454 BackpressureMode mode,
455 const base::Closure& on_reader_detached,
456 scoped_ptr<Writer>* writer)
457 : context_(new Context(on_reader_detached)) {
458 writer->reset(new Writer(context_, mode));
461 SharedMemoryDataConsumerHandle::~SharedMemoryDataConsumerHandle() {
462 base::AutoLock lock(context_->lock());
463 context_->set_is_handle_active(false);
464 context_->ClearIfNecessary();
467 scoped_ptr<blink::WebDataConsumerHandle::Reader>
468 SharedMemoryDataConsumerHandle::ObtainReader(Client* client) {
469 return make_scoped_ptr(obtainReaderInternal(client));
472 SharedMemoryDataConsumerHandle::ReaderImpl*
473 SharedMemoryDataConsumerHandle::obtainReaderInternal(Client* client) {
474 return new ReaderImpl(context_, client);
477 const char* SharedMemoryDataConsumerHandle::debugName() const {
478 return "SharedMemoryDataConsumerHandle";
481 } // namespace content