[Android WebView] Fix webview perf bot switchover to use org.chromium.webview_shell...
[chromium-blink-merge.git] / content / child / shared_memory_data_consumer_handle.cc
blob24db0590a39ba9ab05ae041478950666de398a2a
1 // Copyright 2015 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
5 #include "content/child/shared_memory_data_consumer_handle.h"
7 #include <algorithm>
8 #include <deque>
10 #include "base/bind.h"
11 #include "base/message_loop/message_loop.h"
12 #include "base/single_thread_task_runner.h"
13 #include "base/synchronization/lock.h"
14 #include "content/public/child/fixed_received_data.h"
16 namespace content {
18 namespace {
20 class DelegateThreadSafeReceivedData final
21 : public RequestPeer::ThreadSafeReceivedData {
22 public:
23 explicit DelegateThreadSafeReceivedData(
24 scoped_ptr<RequestPeer::ReceivedData> data)
25 : data_(data.Pass()),
26 task_runner_(base::MessageLoop::current()->task_runner()) {}
27 ~DelegateThreadSafeReceivedData() override {
28 if (!task_runner_->BelongsToCurrentThread()) {
29 // Delete the data on the original thread.
30 task_runner_->DeleteSoon(FROM_HERE, data_.release());
34 const char* payload() const override { return data_->payload(); }
35 int length() const override { return data_->length(); }
36 int encoded_length() const override { return data_->encoded_length(); }
38 private:
39 scoped_ptr<RequestPeer::ReceivedData> data_;
40 scoped_refptr<base::SingleThreadTaskRunner> task_runner_;
42 DISALLOW_COPY_AND_ASSIGN(DelegateThreadSafeReceivedData);
45 } // namespace
47 using Result = blink::WebDataConsumerHandle::Result;
49 class SharedMemoryDataConsumerHandle::Context final
50 : public base::RefCountedThreadSafe<Context> {
51 public:
52 Context()
53 : result_(Ok),
54 first_offset_(0),
55 client_(nullptr),
56 is_handle_active_(true) {}
58 bool IsEmpty() const { return queue_.empty(); }
59 void ClearIfNecessary() {
60 if (!is_handle_locked() && !is_handle_active()) {
61 // No one is interested in the contents.
62 Clear();
65 RequestPeer::ThreadSafeReceivedData* Top() { return queue_.front(); }
66 void Push(scoped_ptr<RequestPeer::ThreadSafeReceivedData> data) {
67 queue_.push_back(data.release());
69 size_t first_offset() const { return first_offset_; }
70 Result result() const { return result_; }
71 void set_result(Result r) { result_ = r; }
72 void AcquireReaderLock(Client* client) {
73 DCHECK(!notification_task_runner_);
74 DCHECK(!client_);
75 notification_task_runner_ = base::MessageLoop::current()->task_runner();
76 client_ = client;
77 if (client && !(IsEmpty() && result() == Ok)) {
78 // We cannot notify synchronously because the user doesn't have the reader
79 // yet.
80 notification_task_runner_->PostTask(
81 FROM_HERE, base::Bind(&Context::NotifyInternal, this, false));
84 void ReleaseReaderLock() {
85 DCHECK(notification_task_runner_);
86 notification_task_runner_ = nullptr;
87 client_ = nullptr;
89 void Notify() { NotifyInternal(true); }
90 bool is_handle_locked() const { return notification_task_runner_; }
91 bool IsReaderBoundToCurrentThread() const {
92 return notification_task_runner_ &&
93 notification_task_runner_->BelongsToCurrentThread();
95 bool is_handle_active() const { return is_handle_active_; }
96 void set_is_handle_active(bool b) { is_handle_active_ = b; }
97 void Consume(size_t s) {
98 first_offset_ += s;
99 auto top = Top();
100 if (static_cast<size_t>(top->length()) <= first_offset_) {
101 delete top;
102 queue_.pop_front();
103 first_offset_ = 0;
106 base::Lock& lock() { return lock_; }
108 private:
109 void NotifyInternal(bool repost) {
110 // Note that this function is not protected by |lock_|.
112 auto runner = notification_task_runner_;
113 if (!runner)
114 return;
116 if (runner->BelongsToCurrentThread()) {
117 // It is safe to access member variables without lock because |client_|
118 // is bound to the current thread.
119 if (client_)
120 client_->didGetReadable();
121 return;
123 if (repost) {
124 // We don't re-post the task when the runner changes while waiting for
125 // this task because in this case a new reader is obtained and
126 // notification is already done at the reader creation time if necessary.
127 runner->PostTask(FROM_HERE,
128 base::Bind(&Context::NotifyInternal, this, false));
131 void Clear() {
132 for (auto& data : queue_) {
133 delete data;
135 queue_.clear();
136 first_offset_ = 0;
137 client_ = nullptr;
140 friend class base::RefCountedThreadSafe<Context>;
141 ~Context() {
142 // This is necessary because the queue stores raw pointers.
143 Clear();
146 base::Lock lock_;
147 // |result_| stores the ultimate state of this handle if it has. Otherwise,
148 // |Ok| is set.
149 Result result_;
150 // TODO(yhirano): Use std::deque<scoped_ptr<ThreadSafeReceivedData>>
151 // once it is allowed.
152 std::deque<RequestPeer::ThreadSafeReceivedData*> queue_;
153 size_t first_offset_;
154 Client* client_;
155 scoped_refptr<base::SingleThreadTaskRunner> notification_task_runner_;
156 bool is_handle_active_;
158 DISALLOW_COPY_AND_ASSIGN(Context);
161 SharedMemoryDataConsumerHandle::Writer::Writer(
162 const scoped_refptr<Context>& context,
163 BackpressureMode mode)
164 : context_(context), mode_(mode) {
167 SharedMemoryDataConsumerHandle::Writer::~Writer() {
168 Close();
171 void SharedMemoryDataConsumerHandle::Writer::AddData(
172 scoped_ptr<RequestPeer::ReceivedData> data) {
173 if (!data->length()) {
174 // We omit empty data.
175 return;
178 bool needs_notification = false;
180 base::AutoLock lock(context_->lock());
181 if (!context_->is_handle_active() && !context_->is_handle_locked()) {
182 // No one is interested in the data.
183 return;
186 needs_notification = context_->IsEmpty();
187 scoped_ptr<RequestPeer::ThreadSafeReceivedData> data_to_pass;
188 if (mode_ == kApplyBackpressure) {
189 data_to_pass =
190 make_scoped_ptr(new DelegateThreadSafeReceivedData(data.Pass()));
191 } else {
192 data_to_pass = make_scoped_ptr(new FixedReceivedData(data.get()));
194 context_->Push(data_to_pass.Pass());
197 if (needs_notification)
198 context_->Notify();
201 void SharedMemoryDataConsumerHandle::Writer::Close() {
202 bool needs_notification = false;
205 base::AutoLock lock(context_->lock());
206 if (context_->result() == Ok) {
207 context_->set_result(Done);
208 needs_notification = context_->IsEmpty();
211 if (needs_notification)
212 context_->Notify();
215 SharedMemoryDataConsumerHandle::ReaderImpl::ReaderImpl(
216 scoped_refptr<Context> context,
217 Client* client)
218 : context_(context) {
219 base::AutoLock lock(context_->lock());
220 DCHECK(!context_->is_handle_locked());
221 context_->AcquireReaderLock(client);
224 SharedMemoryDataConsumerHandle::ReaderImpl::~ReaderImpl() {
225 base::AutoLock lock(context_->lock());
226 context_->ReleaseReaderLock();
227 context_->ClearIfNecessary();
230 Result SharedMemoryDataConsumerHandle::ReaderImpl::read(
231 void* data,
232 size_t size,
233 Flags flags,
234 size_t* read_size_to_return) {
235 base::AutoLock lock(context_->lock());
237 size_t total_read_size = 0;
238 *read_size_to_return = 0;
239 if (context_->result() != Ok && context_->result() != Done)
240 return context_->result();
242 while (!context_->IsEmpty() && total_read_size < size) {
243 const auto& top = context_->Top();
244 size_t readable = top->length() - context_->first_offset();
245 size_t writable = size - total_read_size;
246 size_t read_size = std::min(readable, writable);
247 const char* begin = top->payload() + context_->first_offset();
248 std::copy(begin, begin + read_size,
249 static_cast<char*>(data) + total_read_size);
250 total_read_size += read_size;
251 context_->Consume(read_size);
253 *read_size_to_return = total_read_size;
254 return total_read_size ? Ok : context_->result() == Done ? Done : ShouldWait;
257 Result SharedMemoryDataConsumerHandle::ReaderImpl::beginRead(
258 const void** buffer,
259 Flags flags,
260 size_t* available) {
261 *buffer = nullptr;
262 *available = 0;
264 base::AutoLock lock(context_->lock());
266 if (context_->result() != Ok && context_->result() != Done)
267 return context_->result();
269 if (context_->IsEmpty())
270 return context_->result() == Done ? Done : ShouldWait;
272 const auto& top = context_->Top();
273 *buffer = top->payload() + context_->first_offset();
274 *available = top->length() - context_->first_offset();
276 return Ok;
279 Result SharedMemoryDataConsumerHandle::ReaderImpl::endRead(size_t read_size) {
280 base::AutoLock lock(context_->lock());
282 if (context_->IsEmpty())
283 return UnexpectedError;
285 context_->Consume(read_size);
286 return Ok;
289 SharedMemoryDataConsumerHandle::SharedMemoryDataConsumerHandle(
290 BackpressureMode mode,
291 scoped_ptr<Writer>* writer)
292 : context_(new Context) {
293 writer->reset(new Writer(context_, mode));
296 SharedMemoryDataConsumerHandle::~SharedMemoryDataConsumerHandle() {
297 base::AutoLock lock(context_->lock());
298 context_->set_is_handle_active(false);
299 context_->ClearIfNecessary();
302 scoped_ptr<blink::WebDataConsumerHandle::Reader>
303 SharedMemoryDataConsumerHandle::ObtainReader(Client* client) {
304 return make_scoped_ptr(obtainReaderInternal(client));
307 SharedMemoryDataConsumerHandle::ReaderImpl*
308 SharedMemoryDataConsumerHandle::obtainReaderInternal(Client* client) {
309 return new ReaderImpl(context_, client);
312 Result SharedMemoryDataConsumerHandle::read(void* data,
313 size_t size,
314 Flags flags,
315 size_t* read_size_to_return) {
316 // Note this (and below similar functions) is a bit racy. We don't care about
317 // it because this is a deprecated function and will be removed shortly.
318 LockImplicitly();
319 return reader_->read(data, size, flags, read_size_to_return);
322 Result SharedMemoryDataConsumerHandle::beginRead(const void** buffer,
323 Flags flags,
324 size_t* available) {
325 LockImplicitly();
326 return reader_->beginRead(buffer, flags, available);
329 Result SharedMemoryDataConsumerHandle::endRead(size_t read_size) {
330 LockImplicitly();
331 return reader_->endRead(read_size);
334 void SharedMemoryDataConsumerHandle::registerClient(Client* client) {
335 UnlockImplicitly();
336 reader_ = ObtainReader(client);
339 void SharedMemoryDataConsumerHandle::unregisterClient() {
340 reader_.reset();
343 void SharedMemoryDataConsumerHandle::LockImplicitly() {
345 base::AutoLock lock(context_->lock());
346 if (reader_) {
347 DCHECK(context_->IsReaderBoundToCurrentThread());
348 return;
351 reader_ = ObtainReader(nullptr);
354 void SharedMemoryDataConsumerHandle::UnlockImplicitly() {
355 bool needs_unlock = false;
357 base::AutoLock lock(context_->lock());
358 if (reader_) {
359 DCHECK(context_->IsReaderBoundToCurrentThread());
360 needs_unlock = true;
363 if (needs_unlock) {
364 reader_.reset();
368 } // namespace content