1 // Copyright 2015 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
5 #include "content/child/shared_memory_data_consumer_handle.h"
10 #include "base/bind.h"
11 #include "base/message_loop/message_loop.h"
12 #include "base/single_thread_task_runner.h"
13 #include "base/synchronization/lock.h"
14 #include "base/thread_task_runner_handle.h"
15 #include "content/public/child/fixed_received_data.h"
// ThreadSafeReceivedData implementation that wraps a
// RequestPeer::ReceivedData and forwards payload(), length() and
// encoded_length() to it. The task runner current at construction time is
// remembered so that the wrapped data can be deleted through it.
21 class DelegateThreadSafeReceivedData final
22 : public RequestPeer::ThreadSafeReceivedData
{
// Takes ownership of |data| and captures the task runner returned by
// base::ThreadTaskRunnerHandle::Get().
24 explicit DelegateThreadSafeReceivedData(
25 scoped_ptr
<RequestPeer::ReceivedData
> data
)
26 : data_(data
.Pass()), task_runner_(base::ThreadTaskRunnerHandle::Get()) {}
// When destroyed on a thread that |task_runner_| does not belong to,
// ownership of |data_| is released and handed to DeleteSoon() on
// |task_runner_| rather than being destroyed here.
27 ~DelegateThreadSafeReceivedData() override
{
28 if (!task_runner_
->BelongsToCurrentThread()) {
29 // Delete the data on the original thread.
30 task_runner_
->DeleteSoon(FROM_HERE
, data_
.release());
// The three accessors below simply forward to the wrapped |data_|.
34 const char* payload() const override
{ return data_
->payload(); }
35 int length() const override
{ return data_
->length(); }
36 int encoded_length() const override
{ return data_
->encoded_length(); }
// The wrapped data; owned by this object until released in the destructor.
39 scoped_ptr
<RequestPeer::ReceivedData
> data_
;
// Task runner captured in the constructor; used to decide where |data_| is
// deleted.
40 scoped_refptr
<base::SingleThreadTaskRunner
> task_runner_
;
42 DISALLOW_COPY_AND_ASSIGN(DelegateThreadSafeReceivedData
);
// File-local shorthand for blink::WebDataConsumerHandle::Result, the type
// returned by the reader methods and stored in the context below.
47 using Result
= blink::WebDataConsumerHandle::Result
;
49 // All methods (except for ctor/dtor) must be called with |lock_| acquired
50 // unless otherwise stated.
51 class SharedMemoryDataConsumerHandle::Context final
52 : public base::RefCountedThreadSafe
<Context
> {
// Builds the shared context. The task runner current at construction time is
// stored as |writer_task_runner_|, and |on_reader_detached| is copied so it
// can later be posted to that runner. |is_on_reader_detached_valid_| records
// whether a non-null closure was supplied. The handle starts out active with
// no two-phase read in progress.
// NOTE(review): the first entries of the initializer list are not visible in
// this excerpt.
54 explicit Context(const base::Closure
& on_reader_detached
)
58 writer_task_runner_(base::ThreadTaskRunnerHandle::Get()),
59 on_reader_detached_(on_reader_detached
),
60 is_on_reader_detached_valid_(!on_reader_detached_
.is_null()),
61 is_handle_active_(true),
62 is_two_phase_read_in_progress_(false) {}
// Returns true when |queue_| holds no data. Requires |lock_| to be held.
64 bool IsEmpty() const {
65 lock_
.AssertAcquired();
66 return queue_
.empty();
// Called when a reader or the handle goes away. If the handle is neither
// locked by a reader nor active, and a detach closure is still valid, that
// closure is posted to |writer_task_runner_|. Requires |lock_| to be held.
// NOTE(review): the remainder of this function is not visible in this
// excerpt; presumably it also discards the queued data - confirm.
68 void ClearIfNecessary() {
69 lock_
.AssertAcquired();
70 if (!is_handle_locked() && !is_handle_active()) {
71 // No one is interested in the contents.
72 if (is_on_reader_detached_valid_
) {
73 // We post a task even in the writer thread in order to avoid a
74 // reentrance problem as calling |on_reader_detached_| may manipulate
75 // the context synchronously.
76 writer_task_runner_
->PostTask(FROM_HERE
, on_reader_detached_
);
82 lock_
.AssertAcquired();
83 for (auto& data
: queue_
) {
// Returns the front element of |queue_| without removing it. Requires
// |lock_| to be held. No emptiness check is done here; the visible callers
// (read() and beginRead()) test IsEmpty() before calling.
89 RequestPeer::ThreadSafeReceivedData
* Top() {
90 lock_
.AssertAcquired();
91 return queue_
.front();
// Appends |data| to the back of |queue_|. Ownership moves into the queue as
// a raw pointer (the queue stores raw pointers), so whoever removes the
// element is responsible for deleting it. Requires |lock_| to be held.
93 void Push(scoped_ptr
<RequestPeer::ThreadSafeReceivedData
> data
) {
94 lock_
.AssertAcquired();
95 queue_
.push_back(data
.release());
// Accessor for the offset into the front queue element; read() and
// beginRead() add it to the element's payload pointer and subtract it from
// its length. Requires |lock_| to be held.
// NOTE(review): the return statement is not visible in this excerpt;
// presumably it returns |first_offset_|.
97 size_t first_offset() const {
98 lock_
.AssertAcquired();
// Accessor for the handle's current Result state (callers compare it with
// Ok and Done). Requires |lock_| to be held.
// NOTE(review): the return statement is not visible in this excerpt.
101 Result
result() const {
102 lock_
.AssertAcquired();
// Setter counterpart of result(). Requires |lock_| to be held.
// NOTE(review): the assignment itself is not visible in this excerpt.
105 void set_result(Result r
) {
106 lock_
.AssertAcquired();
// Marks the handle as locked by a reader by recording the task runner
// returned by base::ThreadTaskRunnerHandle::Get() as
// |notification_task_runner_|; it must be unset on entry. If |client| is
// non-null and there is already something to report (queued data, or a
// result other than Ok), a NotifyInternal(false) task is posted to that
// runner. Requires |lock_| to be held.
109 void AcquireReaderLock(Client
* client
) {
110 lock_
.AssertAcquired();
111 DCHECK(!notification_task_runner_
);
113 notification_task_runner_
= base::ThreadTaskRunnerHandle::Get();
// Skip the initial notification only when the queue is empty and the result
// is still Ok.
115 if (client
&& !(IsEmpty() && result() == Ok
)) {
116 // We cannot notify synchronously because the user doesn't have the reader
118 notification_task_runner_
->PostTask(
119 FROM_HERE
, base::Bind(&Context::NotifyInternal
, this, false));
// Undoes AcquireReaderLock(): clears |notification_task_runner_|, which is
// what is_handle_locked() tests. A reader lock must currently be held.
// Requires |lock_| to be held.
122 void ReleaseReaderLock() {
123 lock_
.AssertAcquired();
124 DCHECK(notification_task_runner_
);
125 notification_task_runner_
= nullptr;
129 lock_
.AssertAcquired();
130 auto runner
= notification_task_runner_
;
133 // We don't re-post the task when the runner changes while waiting for
134 // this task because in this case a new reader is obtained and
135 // notification is already done at the reader creation time if necessary.
136 runner
->PostTask(FROM_HERE
,
137 base::Bind(&Context::NotifyInternal
, this, false));
139 // Must be called with |lock_| not acquired.
// Entry point for notifications; calls NotifyInternal() with |repost| set to
// true.
140 void Notify() { NotifyInternal(true); }
// Invalidates the detach closure. |is_on_reader_detached_valid_| is cleared
// first; the closure object itself is reset immediately when running on the
// writer thread, otherwise the reset is delegated to a task posted to
// |writer_task_runner_|. Requires |lock_| to be held.
141 // This function doesn't work in the destructor if |on_reader_detached_| is
143 void ResetOnReaderDetached() {
144 lock_
.AssertAcquired();
// An already-null closure must also have been marked invalid.
145 if (on_reader_detached_
.is_null()) {
146 DCHECK(!is_on_reader_detached_valid_
);
149 is_on_reader_detached_valid_
= false;
150 if (writer_task_runner_
->BelongsToCurrentThread()) {
151 // We can reset the closure immediately.
152 on_reader_detached_
.Reset();
154 // We need to reset |on_reader_detached_| on the right thread because it
155 // might lead to the object destruction.
156 writer_task_runner_
->PostTask(
157 FROM_HERE
, base::Bind(&Context::ResetOnReaderDetachedWithLock
, this));
// Returns true while a reader holds the handle, i.e. while
// |notification_task_runner_| is non-null. Requires |lock_| to be held.
160 bool is_handle_locked() const {
161 lock_
.AssertAcquired();
162 return notification_task_runner_
;
// Returns true when a reader holds the handle and its notification task
// runner belongs to the calling thread. Requires |lock_| to be held.
164 bool IsReaderBoundToCurrentThread() const {
165 lock_
.AssertAcquired();
166 return notification_task_runner_
&&
167 notification_task_runner_
->BelongsToCurrentThread();
// Returns |is_handle_active_|, which starts as true and is cleared by the
// SharedMemoryDataConsumerHandle destructor. Requires |lock_| to be held.
169 bool is_handle_active() const {
170 lock_
.AssertAcquired();
171 return is_handle_active_
;
// Sets |is_handle_active_| to |b|. Requires |lock_| to be held.
173 void set_is_handle_active(bool b
) {
174 lock_
.AssertAcquired();
175 is_handle_active_
= b
;
// Marks |s| bytes of the front queue element as consumed. Requires |lock_|
// to be held.
// NOTE(review): only part of the body is visible in this excerpt. The
// visible condition checks whether |first_offset_| has reached the length of
// |top|; presumably the fully consumed element is then removed and the
// offset reset - confirm against the full source.
177 void Consume(size_t s
) {
178 lock_
.AssertAcquired();
181 if (static_cast<size_t>(top
->length()) <= first_offset_
) {
// Returns whether a two-phase read is currently open; the flag is set in
// beginRead() and cleared in endRead(). Requires |lock_| to be held.
187 bool is_two_phase_read_in_progress() const {
188 lock_
.AssertAcquired();
189 return is_two_phase_read_in_progress_
;
// Sets the two-phase-read flag to |b|. Requires |lock_| to be held.
191 void set_is_two_phase_read_in_progress(bool b
) {
192 lock_
.AssertAcquired();
193 is_two_phase_read_in_progress_
= b
;
195 // Can be called with |lock_| not acquired.
// Exposes the context's lock so that callers can take it (e.g. with
// base::AutoLock) before using the methods that require it.
196 base::Lock
& lock() { return lock_
; }
199 // Must be called with |lock_| not acquired.
// Delivers a didGetReadable() notification to |client_|. The current
// |notification_task_runner_| is copied under |lock_|; if that runner
// belongs to the calling thread the client is invoked directly, otherwise a
// NotifyInternal(false) task is posted to the runner.
// NOTE(review): several guarding lines are not visible in this excerpt;
// presumably they handle a null runner / null client and consult |repost|
// before re-posting - confirm against the full source.
200 void NotifyInternal(bool repost
) {
201 scoped_refptr
<base::SingleThreadTaskRunner
> runner
;
203 base::AutoLock
lock(lock_
);
204 runner
= notification_task_runner_
;
209 if (runner
->BelongsToCurrentThread()) {
210 // It is safe to access member variables without lock because |client_|
211 // is bound to the current thread.
213 client_
->didGetReadable();
217 // We don't re-post the task when the runner changes while waiting for
218 // this task because in this case a new reader is obtained and
219 // notification is already done at the reader creation time if necessary.
220 runner
->PostTask(FROM_HERE
,
221 base::Bind(&Context::NotifyInternal
, this, false));
225 lock_
.AssertAcquired();
226 for (auto& data
: queue_
) {
232 // Note this doesn't work in the destructor if |on_reader_detached_| is not
233 // null. We have an assert in the destructor.
234 ResetOnReaderDetached();
236 // Must be called with |lock_| not acquired.
// Convenience wrapper that takes |lock_| and then calls
// ResetOnReaderDetached(). It is the target of the task that
// ResetOnReaderDetached() posts to |writer_task_runner_|.
237 void ResetOnReaderDetachedWithLock() {
238 base::AutoLock
lock(lock_
);
239 ResetOnReaderDetached();
242 friend class base::RefCountedThreadSafe
<Context
>;
244 base::AutoLock
lock(lock_
);
245 DCHECK(on_reader_detached_
.is_null());
247 // This is necessary because the queue stores raw pointers.
252 // |result_| stores the ultimate state of this handle if it has. Otherwise,
255 // TODO(yhirano): Use std::deque<scoped_ptr<ThreadSafeReceivedData>>
256 // once it is allowed.
// Pending data in arrival order. Elements are owning raw pointers: Push()
// stores the result of scoped_ptr::release().
257 std::deque
<RequestPeer::ThreadSafeReceivedData
*> queue_
;
// Offset into the front element of |queue_|; Consume() compares it with that
// element's length.
258 size_t first_offset_
;
// Set by AcquireReaderLock() and cleared by ReleaseReaderLock(); non-null
// means a reader currently holds the handle.
260 scoped_refptr
<base::SingleThreadTaskRunner
> notification_task_runner_
;
// Captured in the constructor; |on_reader_detached_| is posted to and reset
// on this runner.
261 scoped_refptr
<base::SingleThreadTaskRunner
> writer_task_runner_
;
262 base::Closure on_reader_detached_
;
263 // We need this boolean variable to remember if |on_reader_detached_| is
264 // callable because we need to reset |on_reader_detached_| only on the writer
265 // thread and hence |on_reader_detached_.is_null()| is untrustworthy on
267 bool is_on_reader_detached_valid_
;
// Initialized to true; set to false by ~SharedMemoryDataConsumerHandle().
268 bool is_handle_active_
;
// True between a successful beginRead() and the matching endRead().
269 bool is_two_phase_read_in_progress_
;
271 DISALLOW_COPY_AND_ASSIGN(Context
);
// Creates a writer bound to the shared |context|. |mode| is stored and later
// decides how AddData() wraps incoming data.
274 SharedMemoryDataConsumerHandle::Writer::Writer(
275 const scoped_refptr
<Context
>& context
,
276 BackpressureMode mode
)
277 : context_(context
), mode_(mode
) {
// On destruction, takes the context lock and invalidates the detach closure
// so that it is no longer posted once the writer is gone.
// NOTE(review): one statement preceding the lock is not visible in this
// excerpt.
280 SharedMemoryDataConsumerHandle::Writer::~Writer() {
282 base::AutoLock
lock(context_
->lock());
283 context_
->ResetOnReaderDetached();
// Queues |data| for the reader. Zero-length data is skipped, and so is any
// data that arrives when the handle is neither active nor locked by a
// reader. Otherwise the data is wrapped according to |mode_| and pushed onto
// the context's queue under its lock; a notification is issued only if the
// queue was empty before the push.
286 void SharedMemoryDataConsumerHandle::Writer::AddData(
287 scoped_ptr
<RequestPeer::ReceivedData
> data
) {
288 if (!data
->length()) {
289 // We omit empty data.
293 bool needs_notification
= false;
295 base::AutoLock
lock(context_
->lock());
296 if (!context_
->is_handle_active() && !context_
->is_handle_locked()) {
297 // No one is interested in the data.
// Readers only need waking when the queue goes from empty to non-empty.
301 needs_notification
= context_
->IsEmpty();
302 scoped_ptr
<RequestPeer::ThreadSafeReceivedData
> data_to_pass
;
// With backpressure, ownership of the original object moves into a
// DelegateThreadSafeReceivedData wrapper; otherwise a FixedReceivedData is
// constructed from the (still caller-owned) pointer, presumably copying it.
303 if (mode_
== kApplyBackpressure
) {
305 make_scoped_ptr(new DelegateThreadSafeReceivedData(data
.Pass()));
307 data_to_pass
= make_scoped_ptr(new FixedReceivedData(data
.get()));
309 context_
->Push(data_to_pass
.Pass());
312 if (needs_notification
) {
313 // We CAN issue the notification synchronously if the associated reader
314 // lives in this thread, because this function cannot be called in the
315 // client's callback.
// Signals normal end of data. Only acts while the result is still Ok: the
// result becomes Done, the detach closure is invalidated, and if nothing is
// left in the queue a notification is posted so the reader can observe the
// new state. Repeated calls, or calls after a failure, do nothing.
320 void SharedMemoryDataConsumerHandle::Writer::Close() {
321 base::AutoLock
lock(context_
->lock());
322 if (context_
->result() == Ok
) {
323 context_
->set_result(Done
);
324 context_
->ResetOnReaderDetached();
325 if (context_
->IsEmpty()) {
326 // We cannot issue the notification synchronously because this function
327 // can be called in the client's callback.
328 context_
->PostNotify();
// Signals an error. Only acts while the result is still Ok: the result
// becomes UnexpectedError, queued data is dropped unless a two-phase read is
// open (endRead() clears it in that case), the detach closure is
// invalidated, and a notification is posted.
333 void SharedMemoryDataConsumerHandle::Writer::Fail() {
334 base::AutoLock
lock(context_
->lock());
335 if (context_
->result() == Ok
) {
336 // TODO(yhirano): Use an appropriate error code other than
338 context_
->set_result(UnexpectedError
);
340 if (context_
->is_two_phase_read_in_progress()) {
341 // If we are in two-phase read session, we cannot discard the data. We
342 // will clear the queue at the end of the session.
344 context_
->ClearQueue();
347 context_
->ResetOnReaderDetached();
348 // We cannot issue the notification synchronously because this function can
349 // be called in the client's callback.
350 context_
->PostNotify();
// Creates a reader on the shared |context| and acquires the context's reader
// lock for |client| while holding the context lock. At most one reader may
// hold the handle at a time (checked with a DCHECK).
// NOTE(review): the |client| parameter declaration is not visible in this
// excerpt.
354 SharedMemoryDataConsumerHandle::ReaderImpl::ReaderImpl(
355 scoped_refptr
<Context
> context
,
357 : context_(context
) {
358 base::AutoLock
lock(context_
->lock());
359 DCHECK(!context_
->is_handle_locked());
360 context_
->AcquireReaderLock(client
);
// Releases the reader lock under the context lock, then lets the context
// decide via ClearIfNecessary() whether anyone is still interested in it.
363 SharedMemoryDataConsumerHandle::ReaderImpl::~ReaderImpl() {
364 base::AutoLock
lock(context_
->lock());
365 context_
->ReleaseReaderLock();
366 context_
->ClearIfNecessary();
// Copies up to |size| bytes of queued data into |data| and reports the
// number of bytes copied through |read_size_to_return|. Runs entirely under
// the context lock.
// NOTE(review): the leading parameter declarations and the final return
// statements are not visible in this excerpt.
369 Result
SharedMemoryDataConsumerHandle::ReaderImpl::read(
373 size_t* read_size_to_return
) {
374 base::AutoLock
lock(context_
->lock());
376 size_t total_read_size
= 0;
377 *read_size_to_return
= 0;
// Calling read() while a two-phase read is open turns an Ok result into
// UnexpectedError.
379 if (context_
->result() == Ok
&& context_
->is_two_phase_read_in_progress())
380 context_
->set_result(UnexpectedError
);
// Any result other than Ok or Done is returned to the caller as is.
382 if (context_
->result() != Ok
&& context_
->result() != Done
)
383 return context_
->result();
// Drain from the front of the queue until |size| bytes have been copied or
// the queue is empty. Each step copies the smaller of what remains in the
// front element and what still fits in the destination.
385 while (!context_
->IsEmpty() && total_read_size
< size
) {
386 const auto& top
= context_
->Top();
387 size_t readable
= top
->length() - context_
->first_offset();
388 size_t writable
= size
- total_read_size
;
389 size_t read_size
= std::min(readable
, writable
);
390 const char* begin
= top
->payload() + context_
->first_offset();
391 std::copy(begin
, begin
+ read_size
,
392 static_cast<char*>(data
) + total_read_size
);
393 total_read_size
+= read_size
;
394 context_
->Consume(read_size
);
396 *read_size_to_return
= total_read_size
;
397 if (total_read_size
|| !context_
->IsEmpty())
399 if (context_
->result() == Done
)
// Opens a two-phase read: on success |*buffer| points at the unread part of
// the front queue element and |*available| holds its size. With an empty
// queue it returns Done if the writer has finished and ShouldWait otherwise.
// Runs under the context lock.
// NOTE(review): the parameter list and the final return are not visible in
// this excerpt.
404 Result
SharedMemoryDataConsumerHandle::ReaderImpl::beginRead(
411 base::AutoLock
lock(context_
->lock());
// Starting a second two-phase read while one is open turns an Ok result into
// UnexpectedError.
413 if (context_
->result() == Ok
&& context_
->is_two_phase_read_in_progress())
414 context_
->set_result(UnexpectedError
);
416 if (context_
->result() != Ok
&& context_
->result() != Done
)
417 return context_
->result();
419 if (context_
->IsEmpty())
420 return context_
->result() == Done
? Done
: ShouldWait
;
// Expose the front element in place; nothing is copied or consumed until
// endRead().
422 context_
->set_is_two_phase_read_in_progress(true);
423 const auto& top
= context_
->Top();
424 *buffer
= top
->payload() + context_
->first_offset();
425 *available
= top
->length() - context_
->first_offset();
// Closes a two-phase read opened by beginRead(). Returns UnexpectedError if
// no such read is open. Otherwise the in-progress flag is cleared; if the
// handle has entered an error state in the meantime the queue is discarded,
// else |read_size| bytes are consumed from the front element. Runs under the
// context lock.
// NOTE(review): the final return statement is not visible in this excerpt.
430 Result
SharedMemoryDataConsumerHandle::ReaderImpl::endRead(size_t read_size
) {
431 base::AutoLock
lock(context_
->lock());
433 if (!context_
->is_two_phase_read_in_progress())
434 return UnexpectedError
;
436 context_
->set_is_two_phase_read_in_progress(false);
437 if (context_
->result() != Ok
&& context_
->result() != Done
) {
438 // We have an error, so we can discard the stored data.
439 context_
->ClearQueue();
441 context_
->Consume(read_size
);
// Convenience constructor without a detach callback: delegates to the
// three-argument constructor with a null base::Closure.
447 SharedMemoryDataConsumerHandle::SharedMemoryDataConsumerHandle(
448 BackpressureMode mode
,
449 scoped_ptr
<Writer
>* writer
)
450 : SharedMemoryDataConsumerHandle(mode
, base::Closure(), writer
) {
// Creates the shared Context with |on_reader_detached| and hands the caller
// a new Writer for it through the |writer| out-parameter. |writer| must be
// non-null because it is dereferenced unconditionally.
453 SharedMemoryDataConsumerHandle::SharedMemoryDataConsumerHandle(
454 BackpressureMode mode
,
455 const base::Closure
& on_reader_detached
,
456 scoped_ptr
<Writer
>* writer
)
457 : context_(new Context(on_reader_detached
)) {
458 writer
->reset(new Writer(context_
, mode
));
// Marks the handle inactive under the context lock, then lets the context
// decide via ClearIfNecessary() whether anyone is still interested in it.
461 SharedMemoryDataConsumerHandle::~SharedMemoryDataConsumerHandle() {
462 base::AutoLock
lock(context_
->lock());
463 context_
->set_is_handle_active(false);
464 context_
->ClearIfNecessary();
// Returns a newly created reader for this handle, wrapped in a scoped_ptr.
// |client| is forwarded unchanged to obtainReaderInternal().
467 scoped_ptr
<blink::WebDataConsumerHandle::Reader
>
468 SharedMemoryDataConsumerHandle::ObtainReader(Client
* client
) {
469 return make_scoped_ptr(obtainReaderInternal(client
));
// Allocates a ReaderImpl sharing this handle's context. The result is a raw
// pointer created with new; the caller takes ownership.
472 SharedMemoryDataConsumerHandle::ReaderImpl
*
473 SharedMemoryDataConsumerHandle::obtainReaderInternal(Client
* client
) {
474 return new ReaderImpl(context_
, client
);
// Returns a fixed, human-readable name for this handle type.
477 const char* SharedMemoryDataConsumerHandle::debugName() const {
478 return "SharedMemoryDataConsumerHandle";
481 } // namespace content