service_resolver_64: Correctly check all the bytes of the service code.
[chromium-blink-merge.git] / content / child / shared_memory_data_consumer_handle.cc
blob f11435ada87c22572379c59ce9b80f2e50c85b44
1 // Copyright 2015 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
5 #include "content/child/shared_memory_data_consumer_handle.h"
7 #include <algorithm>
8 #include <deque>
10 #include "base/bind.h"
11 #include "base/message_loop/message_loop.h"
12 #include "base/single_thread_task_runner.h"
13 #include "base/synchronization/lock.h"
14 #include "base/thread_task_runner_handle.h"
15 #include "content/public/child/fixed_received_data.h"
17 namespace content {
19 namespace {
21 class DelegateThreadSafeReceivedData final
22 : public RequestPeer::ThreadSafeReceivedData {
23 public:
24 explicit DelegateThreadSafeReceivedData(
25 scoped_ptr<RequestPeer::ReceivedData> data)
26 : data_(data.Pass()), task_runner_(base::ThreadTaskRunnerHandle::Get()) {}
27 ~DelegateThreadSafeReceivedData() override {
28 if (!task_runner_->BelongsToCurrentThread()) {
29 // Delete the data on the original thread.
30 task_runner_->DeleteSoon(FROM_HERE, data_.release());
34 const char* payload() const override { return data_->payload(); }
35 int length() const override { return data_->length(); }
36 int encoded_length() const override { return data_->encoded_length(); }
38 private:
39 scoped_ptr<RequestPeer::ReceivedData> data_;
40 scoped_refptr<base::SingleThreadTaskRunner> task_runner_;
42 DISALLOW_COPY_AND_ASSIGN(DelegateThreadSafeReceivedData);
45 } // namespace
47 using Result = blink::WebDataConsumerHandle::Result;
49 class SharedMemoryDataConsumerHandle::Context final
50 : public base::RefCountedThreadSafe<Context> {
51 public:
52 explicit Context(const base::Closure& on_reader_detached)
53 : result_(Ok),
54 first_offset_(0),
55 client_(nullptr),
56 writer_task_runner_(base::ThreadTaskRunnerHandle::Get()),
57 on_reader_detached_(on_reader_detached),
58 is_on_reader_detached_valid_(!on_reader_detached_.is_null()),
59 is_handle_active_(true),
60 is_two_phase_read_in_progress_(false) {}
62 bool IsEmpty() const { return queue_.empty(); }
63 void ClearIfNecessary() {
64 if (!is_handle_locked() && !is_handle_active()) {
65 // No one is interested in the contents.
66 if (is_on_reader_detached_valid_) {
67 // We post a task even in the writer thread in order to avoid a
68 // reentrance problem as calling |on_reader_detached_| may manipulate
69 // the context synchronously.
70 writer_task_runner_->PostTask(FROM_HERE, on_reader_detached_);
72 Clear();
75 void ClearQueue() {
76 for (auto& data : queue_) {
77 delete data;
79 queue_.clear();
80 first_offset_ = 0;
82 RequestPeer::ThreadSafeReceivedData* Top() { return queue_.front(); }
83 void Push(scoped_ptr<RequestPeer::ThreadSafeReceivedData> data) {
84 queue_.push_back(data.release());
86 size_t first_offset() const { return first_offset_; }
87 Result result() const { return result_; }
88 void set_result(Result r) { result_ = r; }
89 void AcquireReaderLock(Client* client) {
90 DCHECK(!notification_task_runner_);
91 DCHECK(!client_);
92 notification_task_runner_ = base::ThreadTaskRunnerHandle::Get();
93 client_ = client;
94 if (client && !(IsEmpty() && result() == Ok)) {
95 // We cannot notify synchronously because the user doesn't have the reader
96 // yet.
97 notification_task_runner_->PostTask(
98 FROM_HERE, base::Bind(&Context::NotifyInternal, this, false));
101 void ReleaseReaderLock() {
102 DCHECK(notification_task_runner_);
103 notification_task_runner_ = nullptr;
104 client_ = nullptr;
106 void PostNotify() {
107 auto runner = notification_task_runner_;
108 if (!runner)
109 return;
110 // We don't re-post the task when the runner changes while waiting for
111 // this task because in this case a new reader is obtained and
112 // notification is already done at the reader creation time if necessary.
113 runner->PostTask(FROM_HERE,
114 base::Bind(&Context::NotifyInternal, this, false));
116 void Notify() { NotifyInternal(true); }
117 // This function doesn't work in the destructor if |on_reader_detached_| is
118 // not null.
119 void ResetOnReaderDetached() {
120 if (on_reader_detached_.is_null()) {
121 DCHECK(!is_on_reader_detached_valid_);
122 return;
124 is_on_reader_detached_valid_ = false;
125 if (writer_task_runner_->BelongsToCurrentThread()) {
126 // We can reset the closure immediately.
127 on_reader_detached_.Reset();
128 } else {
129 // We need to reset |on_reader_detached_| on the right thread because it
130 // might lead to the object destruction.
131 writer_task_runner_->PostTask(
132 FROM_HERE, base::Bind(&Context::ResetOnReaderDetachedWithLock, this));
135 bool is_handle_locked() const { return notification_task_runner_; }
136 bool IsReaderBoundToCurrentThread() const {
137 return notification_task_runner_ &&
138 notification_task_runner_->BelongsToCurrentThread();
140 bool is_handle_active() const { return is_handle_active_; }
141 void set_is_handle_active(bool b) { is_handle_active_ = b; }
142 void Consume(size_t s) {
143 first_offset_ += s;
144 auto top = Top();
145 if (static_cast<size_t>(top->length()) <= first_offset_) {
146 delete top;
147 queue_.pop_front();
148 first_offset_ = 0;
151 bool is_two_phase_read_in_progress() const {
152 return is_two_phase_read_in_progress_;
154 void set_is_two_phase_read_in_progress(bool b) {
155 is_two_phase_read_in_progress_ = b;
157 base::Lock& lock() { return lock_; }
159 private:
160 void NotifyInternal(bool repost) {
161 // Note that this function is not protected by |lock_|.
163 auto runner = notification_task_runner_;
164 if (!runner)
165 return;
167 if (runner->BelongsToCurrentThread()) {
168 // It is safe to access member variables without lock because |client_|
169 // is bound to the current thread.
170 if (client_)
171 client_->didGetReadable();
172 return;
174 if (repost) {
175 // We don't re-post the task when the runner changes while waiting for
176 // this task because in this case a new reader is obtained and
177 // notification is already done at the reader creation time if necessary.
178 runner->PostTask(FROM_HERE,
179 base::Bind(&Context::NotifyInternal, this, false));
182 void Clear() {
183 for (auto& data : queue_) {
184 delete data;
186 queue_.clear();
187 first_offset_ = 0;
188 client_ = nullptr;
189 // Note this doesn't work in the destructor if |on_reader_detached_| is not
190 // null. We have an assert in the destructor.
191 ResetOnReaderDetached();
193 void ResetOnReaderDetachedWithLock() {
194 base::AutoLock lock(lock_);
195 ResetOnReaderDetached();
198 friend class base::RefCountedThreadSafe<Context>;
199 ~Context() {
200 DCHECK(on_reader_detached_.is_null());
202 // This is necessary because the queue stores raw pointers.
203 Clear();
206 base::Lock lock_;
207 // |result_| stores the ultimate state of this handle if it has. Otherwise,
208 // |Ok| is set.
209 Result result_;
210 // TODO(yhirano): Use std::deque<scoped_ptr<ThreadSafeReceivedData>>
211 // once it is allowed.
212 std::deque<RequestPeer::ThreadSafeReceivedData*> queue_;
213 size_t first_offset_;
214 Client* client_;
215 scoped_refptr<base::SingleThreadTaskRunner> notification_task_runner_;
216 scoped_refptr<base::SingleThreadTaskRunner> writer_task_runner_;
217 base::Closure on_reader_detached_;
218 // We need this boolean variable to remember if |on_reader_detached_| is
219 // callable because we need to reset |on_reader_detached_| only on the writer
220 // thread and hence |on_reader_detached_.is_null()| is untrustworthy on
221 // other threads.
222 bool is_on_reader_detached_valid_;
223 bool is_handle_active_;
224 bool is_two_phase_read_in_progress_;
226 DISALLOW_COPY_AND_ASSIGN(Context);
229 SharedMemoryDataConsumerHandle::Writer::Writer(
230 const scoped_refptr<Context>& context,
231 BackpressureMode mode)
232 : context_(context), mode_(mode) {
235 SharedMemoryDataConsumerHandle::Writer::~Writer() {
236 Close();
237 base::AutoLock lock(context_->lock());
238 context_->ResetOnReaderDetached();
241 void SharedMemoryDataConsumerHandle::Writer::AddData(
242 scoped_ptr<RequestPeer::ReceivedData> data) {
243 if (!data->length()) {
244 // We omit empty data.
245 return;
248 bool needs_notification = false;
250 base::AutoLock lock(context_->lock());
251 if (!context_->is_handle_active() && !context_->is_handle_locked()) {
252 // No one is interested in the data.
253 return;
256 needs_notification = context_->IsEmpty();
257 scoped_ptr<RequestPeer::ThreadSafeReceivedData> data_to_pass;
258 if (mode_ == kApplyBackpressure) {
259 data_to_pass =
260 make_scoped_ptr(new DelegateThreadSafeReceivedData(data.Pass()));
261 } else {
262 data_to_pass = make_scoped_ptr(new FixedReceivedData(data.get()));
264 context_->Push(data_to_pass.Pass());
267 if (needs_notification) {
268 // We CAN issue the notification synchronously if the associated reader
269 // lives in this thread, because this function cannot be called in the
270 // client's callback.
271 context_->Notify();
275 void SharedMemoryDataConsumerHandle::Writer::Close() {
276 bool needs_notification = false;
279 base::AutoLock lock(context_->lock());
280 if (context_->result() == Ok) {
281 context_->set_result(Done);
282 context_->ResetOnReaderDetached();
283 needs_notification = context_->IsEmpty();
286 if (needs_notification) {
287 // We cannot issue the notification synchronously because this function can
288 // be called in the client's callback.
289 context_->PostNotify();
293 void SharedMemoryDataConsumerHandle::Writer::Fail() {
294 bool needs_notification = false;
296 base::AutoLock lock(context_->lock());
297 if (context_->result() == Ok) {
298 // TODO(yhirano): Use an appropriate error code other than
299 // UnexpectedError.
300 context_->set_result(UnexpectedError);
302 if (context_->is_two_phase_read_in_progress()) {
303 // If we are in two-phase read session, we cannot discard the data. We
304 // will clear the queue at the end of the session.
305 } else {
306 context_->ClearQueue();
309 context_->ResetOnReaderDetached();
310 needs_notification = true;
313 if (needs_notification) {
314 // We cannot issue the notification synchronously because this function can
315 // be called in the client's callback.
316 context_->PostNotify();
320 SharedMemoryDataConsumerHandle::ReaderImpl::ReaderImpl(
321 scoped_refptr<Context> context,
322 Client* client)
323 : context_(context) {
324 base::AutoLock lock(context_->lock());
325 DCHECK(!context_->is_handle_locked());
326 context_->AcquireReaderLock(client);
329 SharedMemoryDataConsumerHandle::ReaderImpl::~ReaderImpl() {
330 base::AutoLock lock(context_->lock());
331 context_->ReleaseReaderLock();
332 context_->ClearIfNecessary();
335 Result SharedMemoryDataConsumerHandle::ReaderImpl::read(
336 void* data,
337 size_t size,
338 Flags flags,
339 size_t* read_size_to_return) {
340 base::AutoLock lock(context_->lock());
342 size_t total_read_size = 0;
343 *read_size_to_return = 0;
345 if (context_->result() == Ok && context_->is_two_phase_read_in_progress())
346 context_->set_result(UnexpectedError);
348 if (context_->result() != Ok && context_->result() != Done)
349 return context_->result();
351 while (!context_->IsEmpty() && total_read_size < size) {
352 const auto& top = context_->Top();
353 size_t readable = top->length() - context_->first_offset();
354 size_t writable = size - total_read_size;
355 size_t read_size = std::min(readable, writable);
356 const char* begin = top->payload() + context_->first_offset();
357 std::copy(begin, begin + read_size,
358 static_cast<char*>(data) + total_read_size);
359 total_read_size += read_size;
360 context_->Consume(read_size);
362 *read_size_to_return = total_read_size;
363 if (total_read_size || !context_->IsEmpty())
364 return Ok;
365 if (context_->result() == Done)
366 return Done;
367 return ShouldWait;
370 Result SharedMemoryDataConsumerHandle::ReaderImpl::beginRead(
371 const void** buffer,
372 Flags flags,
373 size_t* available) {
374 *buffer = nullptr;
375 *available = 0;
377 base::AutoLock lock(context_->lock());
379 if (context_->result() == Ok && context_->is_two_phase_read_in_progress())
380 context_->set_result(UnexpectedError);
382 if (context_->result() != Ok && context_->result() != Done)
383 return context_->result();
385 if (context_->IsEmpty())
386 return context_->result() == Done ? Done : ShouldWait;
388 context_->set_is_two_phase_read_in_progress(true);
389 const auto& top = context_->Top();
390 *buffer = top->payload() + context_->first_offset();
391 *available = top->length() - context_->first_offset();
393 return Ok;
396 Result SharedMemoryDataConsumerHandle::ReaderImpl::endRead(size_t read_size) {
397 base::AutoLock lock(context_->lock());
399 if (!context_->is_two_phase_read_in_progress())
400 return UnexpectedError;
402 context_->set_is_two_phase_read_in_progress(false);
403 if (context_->result() != Ok && context_->result() != Done) {
404 // We have an error, so we can discard the stored data.
405 context_->ClearQueue();
406 } else {
407 context_->Consume(read_size);
410 return Ok;
413 SharedMemoryDataConsumerHandle::SharedMemoryDataConsumerHandle(
414 BackpressureMode mode,
415 scoped_ptr<Writer>* writer)
416 : SharedMemoryDataConsumerHandle(mode, base::Closure(), writer) {
419 SharedMemoryDataConsumerHandle::SharedMemoryDataConsumerHandle(
420 BackpressureMode mode,
421 const base::Closure& on_reader_detached,
422 scoped_ptr<Writer>* writer)
423 : context_(new Context(on_reader_detached)) {
424 writer->reset(new Writer(context_, mode));
427 SharedMemoryDataConsumerHandle::~SharedMemoryDataConsumerHandle() {
428 base::AutoLock lock(context_->lock());
429 context_->set_is_handle_active(false);
430 context_->ClearIfNecessary();
433 scoped_ptr<blink::WebDataConsumerHandle::Reader>
434 SharedMemoryDataConsumerHandle::ObtainReader(Client* client) {
435 return make_scoped_ptr(obtainReaderInternal(client));
438 SharedMemoryDataConsumerHandle::ReaderImpl*
439 SharedMemoryDataConsumerHandle::obtainReaderInternal(Client* client) {
440 return new ReaderImpl(context_, client);
443 const char* SharedMemoryDataConsumerHandle::debugName() const {
444 return "SharedMemoryDataConsumerHandle";
447 } // namespace content