1 // Copyright 2015 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
5 #include "content/child/shared_memory_data_consumer_handle.h"
10 #include "base/bind.h"
11 #include "base/message_loop/message_loop.h"
12 #include "base/single_thread_task_runner.h"
13 #include "base/synchronization/lock.h"
14 #include "base/thread_task_runner_handle.h"
15 #include "content/public/child/fixed_received_data.h"
21 class DelegateThreadSafeReceivedData final
22 : public RequestPeer::ThreadSafeReceivedData
{
24 explicit DelegateThreadSafeReceivedData(
25 scoped_ptr
<RequestPeer::ReceivedData
> data
)
26 : data_(data
.Pass()), task_runner_(base::ThreadTaskRunnerHandle::Get()) {}
27 ~DelegateThreadSafeReceivedData() override
{
28 if (!task_runner_
->BelongsToCurrentThread()) {
29 // Delete the data on the original thread.
30 task_runner_
->DeleteSoon(FROM_HERE
, data_
.release());
34 const char* payload() const override
{ return data_
->payload(); }
35 int length() const override
{ return data_
->length(); }
36 int encoded_length() const override
{ return data_
->encoded_length(); }
39 scoped_ptr
<RequestPeer::ReceivedData
> data_
;
40 scoped_refptr
<base::SingleThreadTaskRunner
> task_runner_
;
42 DISALLOW_COPY_AND_ASSIGN(DelegateThreadSafeReceivedData
);
47 using Result
= blink::WebDataConsumerHandle::Result
;
49 class SharedMemoryDataConsumerHandle::Context final
50 : public base::RefCountedThreadSafe
<Context
> {
52 explicit Context(const base::Closure
& on_reader_detached
)
56 writer_task_runner_(base::ThreadTaskRunnerHandle::Get()),
57 on_reader_detached_(on_reader_detached
),
58 is_on_reader_detached_valid_(!on_reader_detached_
.is_null()),
59 is_handle_active_(true),
60 is_two_phase_read_in_progress_(false) {}
62 bool IsEmpty() const { return queue_
.empty(); }
63 void ClearIfNecessary() {
64 if (!is_handle_locked() && !is_handle_active()) {
65 // No one is interested in the contents.
66 if (is_on_reader_detached_valid_
) {
67 // We post a task even in the writer thread in order to avoid a
68 // reentrance problem as calling |on_reader_detached_| may manipulate
69 // the context synchronously.
70 writer_task_runner_
->PostTask(FROM_HERE
, on_reader_detached_
);
76 for (auto& data
: queue_
) {
82 RequestPeer::ThreadSafeReceivedData
* Top() { return queue_
.front(); }
83 void Push(scoped_ptr
<RequestPeer::ThreadSafeReceivedData
> data
) {
84 queue_
.push_back(data
.release());
86 size_t first_offset() const { return first_offset_
; }
87 Result
result() const { return result_
; }
88 void set_result(Result r
) { result_
= r
; }
89 void AcquireReaderLock(Client
* client
) {
90 DCHECK(!notification_task_runner_
);
92 notification_task_runner_
= base::ThreadTaskRunnerHandle::Get();
94 if (client
&& !(IsEmpty() && result() == Ok
)) {
95 // We cannot notify synchronously because the user doesn't have the reader
97 notification_task_runner_
->PostTask(
98 FROM_HERE
, base::Bind(&Context::NotifyInternal
, this, false));
101 void ReleaseReaderLock() {
102 DCHECK(notification_task_runner_
);
103 notification_task_runner_
= nullptr;
107 auto runner
= notification_task_runner_
;
110 // We don't re-post the task when the runner changes while waiting for
111 // this task because in this case a new reader is obtained and
112 // notification is already done at the reader creation time if necessary.
113 runner
->PostTask(FROM_HERE
,
114 base::Bind(&Context::NotifyInternal
, this, false));
116 void Notify() { NotifyInternal(true); }
117 // This function doesn't work in the destructor if |on_reader_detached_| is
119 void ResetOnReaderDetached() {
120 if (on_reader_detached_
.is_null()) {
121 DCHECK(!is_on_reader_detached_valid_
);
124 is_on_reader_detached_valid_
= false;
125 if (writer_task_runner_
->BelongsToCurrentThread()) {
126 // We can reset the closure immediately.
127 on_reader_detached_
.Reset();
129 // We need to reset |on_reader_detached_| on the right thread because it
130 // might lead to the object destruction.
131 writer_task_runner_
->PostTask(
132 FROM_HERE
, base::Bind(&Context::ResetOnReaderDetachedWithLock
, this));
135 bool is_handle_locked() const { return notification_task_runner_
; }
136 bool IsReaderBoundToCurrentThread() const {
137 return notification_task_runner_
&&
138 notification_task_runner_
->BelongsToCurrentThread();
140 bool is_handle_active() const { return is_handle_active_
; }
141 void set_is_handle_active(bool b
) { is_handle_active_
= b
; }
142 void Consume(size_t s
) {
145 if (static_cast<size_t>(top
->length()) <= first_offset_
) {
151 bool is_two_phase_read_in_progress() const {
152 return is_two_phase_read_in_progress_
;
154 void set_is_two_phase_read_in_progress(bool b
) {
155 is_two_phase_read_in_progress_
= b
;
157 base::Lock
& lock() { return lock_
; }
160 void NotifyInternal(bool repost
) {
161 // Note that this function is not protected by |lock_|.
163 auto runner
= notification_task_runner_
;
167 if (runner
->BelongsToCurrentThread()) {
168 // It is safe to access member variables without lock because |client_|
169 // is bound to the current thread.
171 client_
->didGetReadable();
175 // We don't re-post the task when the runner changes while waiting for
176 // this task because in this case a new reader is obtained and
177 // notification is already done at the reader creation time if necessary.
178 runner
->PostTask(FROM_HERE
,
179 base::Bind(&Context::NotifyInternal
, this, false));
183 for (auto& data
: queue_
) {
189 // Note this doesn't work in the destructor if |on_reader_detached_| is not
190 // null. We have an assert in the destructor.
191 ResetOnReaderDetached();
193 void ResetOnReaderDetachedWithLock() {
194 base::AutoLock
lock(lock_
);
195 ResetOnReaderDetached();
198 friend class base::RefCountedThreadSafe
<Context
>;
200 DCHECK(on_reader_detached_
.is_null());
202 // This is necessary because the queue stores raw pointers.
207 // |result_| stores the ultimate state of this handle if it has. Otherwise,
210 // TODO(yhirano): Use std::deque<scoped_ptr<ThreadSafeReceivedData>>
211 // once it is allowed.
212 std::deque
<RequestPeer::ThreadSafeReceivedData
*> queue_
;
213 size_t first_offset_
;
215 scoped_refptr
<base::SingleThreadTaskRunner
> notification_task_runner_
;
216 scoped_refptr
<base::SingleThreadTaskRunner
> writer_task_runner_
;
217 base::Closure on_reader_detached_
;
218 // We need this boolean variable to remember if |on_reader_detached_| is
219 // callable because we need to reset |on_reader_detached_| only on the writer
220 // thread and hence |on_reader_detached_.is_null()| is untrustworthy on
222 bool is_on_reader_detached_valid_
;
223 bool is_handle_active_
;
224 bool is_two_phase_read_in_progress_
;
226 DISALLOW_COPY_AND_ASSIGN(Context
);
229 SharedMemoryDataConsumerHandle::Writer::Writer(
230 const scoped_refptr
<Context
>& context
,
231 BackpressureMode mode
)
232 : context_(context
), mode_(mode
) {
// Writer destructor. With the context lock held, it calls
// Context::ResetOnReaderDetached() on the shared context.
// NOTE(review): the embedded numbering jumps from 235 to 237 and no closing
// brace is visible, so a short statement may have been lost from the top of
// this body -- confirm against the upstream file before relying on this text.
235 SharedMemoryDataConsumerHandle::Writer::~Writer() {
237 base::AutoLock
lock(context_
->lock());
238 context_
->ResetOnReaderDetached();
241 void SharedMemoryDataConsumerHandle::Writer::AddData(
242 scoped_ptr
<RequestPeer::ReceivedData
> data
) {
243 if (!data
->length()) {
244 // We omit empty data.
248 bool needs_notification
= false;
250 base::AutoLock
lock(context_
->lock());
251 if (!context_
->is_handle_active() && !context_
->is_handle_locked()) {
252 // No one is interested in the data.
256 needs_notification
= context_
->IsEmpty();
257 scoped_ptr
<RequestPeer::ThreadSafeReceivedData
> data_to_pass
;
258 if (mode_
== kApplyBackpressure
) {
260 make_scoped_ptr(new DelegateThreadSafeReceivedData(data
.Pass()));
262 data_to_pass
= make_scoped_ptr(new FixedReceivedData(data
.get()));
264 context_
->Push(data_to_pass
.Pass());
267 if (needs_notification
) {
268 // We CAN issue the notification synchronously if the associated reader
269 // lives in this thread, because this function cannot be called in the
270 // client's callback.
275 void SharedMemoryDataConsumerHandle::Writer::Close() {
276 bool needs_notification
= false;
279 base::AutoLock
lock(context_
->lock());
280 if (context_
->result() == Ok
) {
281 context_
->set_result(Done
);
282 context_
->ResetOnReaderDetached();
283 needs_notification
= context_
->IsEmpty();
286 if (needs_notification
) {
287 // We cannot issue the notification synchronously because this function can
288 // be called in the client's callback.
289 context_
->PostNotify();
293 void SharedMemoryDataConsumerHandle::Writer::Fail() {
294 bool needs_notification
= false;
296 base::AutoLock
lock(context_
->lock());
297 if (context_
->result() == Ok
) {
298 // TODO(yhirano): Use an appropriate error code other than
300 context_
->set_result(UnexpectedError
);
302 if (context_
->is_two_phase_read_in_progress()) {
303 // If we are in two-phase read session, we cannot discard the data. We
304 // will clear the queue at the end of the session.
306 context_
->ClearQueue();
309 context_
->ResetOnReaderDetached();
310 needs_notification
= true;
313 if (needs_notification
) {
314 // We cannot issue the notification synchronously because this function can
315 // be called in the client's callback.
316 context_
->PostNotify();
320 SharedMemoryDataConsumerHandle::ReaderImpl::ReaderImpl(
321 scoped_refptr
<Context
> context
,
323 : context_(context
) {
324 base::AutoLock
lock(context_
->lock());
325 DCHECK(!context_
->is_handle_locked());
326 context_
->AcquireReaderLock(client
);
329 SharedMemoryDataConsumerHandle::ReaderImpl::~ReaderImpl() {
330 base::AutoLock
lock(context_
->lock());
331 context_
->ReleaseReaderLock();
332 context_
->ClearIfNecessary();
335 Result
SharedMemoryDataConsumerHandle::ReaderImpl::read(
339 size_t* read_size_to_return
) {
340 base::AutoLock
lock(context_
->lock());
342 size_t total_read_size
= 0;
343 *read_size_to_return
= 0;
345 if (context_
->result() == Ok
&& context_
->is_two_phase_read_in_progress())
346 context_
->set_result(UnexpectedError
);
348 if (context_
->result() != Ok
&& context_
->result() != Done
)
349 return context_
->result();
351 while (!context_
->IsEmpty() && total_read_size
< size
) {
352 const auto& top
= context_
->Top();
353 size_t readable
= top
->length() - context_
->first_offset();
354 size_t writable
= size
- total_read_size
;
355 size_t read_size
= std::min(readable
, writable
);
356 const char* begin
= top
->payload() + context_
->first_offset();
357 std::copy(begin
, begin
+ read_size
,
358 static_cast<char*>(data
) + total_read_size
);
359 total_read_size
+= read_size
;
360 context_
->Consume(read_size
);
362 *read_size_to_return
= total_read_size
;
363 if (total_read_size
|| !context_
->IsEmpty())
365 if (context_
->result() == Done
)
370 Result
SharedMemoryDataConsumerHandle::ReaderImpl::beginRead(
377 base::AutoLock
lock(context_
->lock());
379 if (context_
->result() == Ok
&& context_
->is_two_phase_read_in_progress())
380 context_
->set_result(UnexpectedError
);
382 if (context_
->result() != Ok
&& context_
->result() != Done
)
383 return context_
->result();
385 if (context_
->IsEmpty())
386 return context_
->result() == Done
? Done
: ShouldWait
;
388 context_
->set_is_two_phase_read_in_progress(true);
389 const auto& top
= context_
->Top();
390 *buffer
= top
->payload() + context_
->first_offset();
391 *available
= top
->length() - context_
->first_offset();
396 Result
SharedMemoryDataConsumerHandle::ReaderImpl::endRead(size_t read_size
) {
397 base::AutoLock
lock(context_
->lock());
399 if (!context_
->is_two_phase_read_in_progress())
400 return UnexpectedError
;
402 context_
->set_is_two_phase_read_in_progress(false);
403 if (context_
->result() != Ok
&& context_
->result() != Done
) {
404 // We have an error, so we can discard the stored data.
405 context_
->ClearQueue();
407 context_
->Consume(read_size
);
413 SharedMemoryDataConsumerHandle::SharedMemoryDataConsumerHandle(
414 BackpressureMode mode
,
415 scoped_ptr
<Writer
>* writer
)
416 : SharedMemoryDataConsumerHandle(mode
, base::Closure(), writer
) {
419 SharedMemoryDataConsumerHandle::SharedMemoryDataConsumerHandle(
420 BackpressureMode mode
,
421 const base::Closure
& on_reader_detached
,
422 scoped_ptr
<Writer
>* writer
)
423 : context_(new Context(on_reader_detached
)) {
424 writer
->reset(new Writer(context_
, mode
));
427 SharedMemoryDataConsumerHandle::~SharedMemoryDataConsumerHandle() {
428 base::AutoLock
lock(context_
->lock());
429 context_
->set_is_handle_active(false);
430 context_
->ClearIfNecessary();
433 scoped_ptr
<blink::WebDataConsumerHandle::Reader
>
434 SharedMemoryDataConsumerHandle::ObtainReader(Client
* client
) {
435 return make_scoped_ptr(obtainReaderInternal(client
));
438 SharedMemoryDataConsumerHandle::ReaderImpl
*
439 SharedMemoryDataConsumerHandle::obtainReaderInternal(Client
* client
) {
440 return new ReaderImpl(context_
, client
);
443 const char* SharedMemoryDataConsumerHandle::debugName() const {
444 return "SharedMemoryDataConsumerHandle";
447 } // namespace content