1 // Copyright 2015 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
5 #include "content/child/shared_memory_data_consumer_handle.h"
10 #include "base/bind.h"
11 #include "base/message_loop/message_loop.h"
12 #include "base/single_thread_task_runner.h"
13 #include "base/synchronization/lock.h"
14 #include "content/public/child/fixed_received_data.h"
20 class DelegateThreadSafeReceivedData final
21 : public RequestPeer::ThreadSafeReceivedData
{
23 explicit DelegateThreadSafeReceivedData(
24 scoped_ptr
<RequestPeer::ReceivedData
> data
)
26 task_runner_(base::MessageLoop::current()->task_runner()) {}
27 ~DelegateThreadSafeReceivedData() override
{
28 if (!task_runner_
->BelongsToCurrentThread()) {
29 // Delete the data on the original thread.
30 task_runner_
->DeleteSoon(FROM_HERE
, data_
.release());
34 const char* payload() const override
{ return data_
->payload(); }
35 int length() const override
{ return data_
->length(); }
36 int encoded_length() const override
{ return data_
->encoded_length(); }
39 scoped_ptr
<RequestPeer::ReceivedData
> data_
;
40 scoped_refptr
<base::SingleThreadTaskRunner
> task_runner_
;
42 DISALLOW_COPY_AND_ASSIGN(DelegateThreadSafeReceivedData
);
47 using Result
= blink::WebDataConsumerHandle::Result
;
49 class SharedMemoryDataConsumerHandle::Context final
50 : public base::RefCountedThreadSafe
<Context
> {
56 is_handle_active_(true) {}
58 bool IsEmpty() const { return queue_
.empty(); }
59 void ClearIfNecessary() {
60 if (!is_handle_locked() && !is_handle_active()) {
61 // No one is interested in the contents.
65 RequestPeer::ThreadSafeReceivedData
* Top() { return queue_
.front(); }
66 void Push(scoped_ptr
<RequestPeer::ThreadSafeReceivedData
> data
) {
67 queue_
.push_back(data
.release());
69 size_t first_offset() const { return first_offset_
; }
70 Result
result() const { return result_
; }
71 void set_result(Result r
) { result_
= r
; }
72 void AcquireReaderLock(Client
* client
) {
73 DCHECK(!notification_task_runner_
);
75 notification_task_runner_
= base::MessageLoop::current()->task_runner();
77 if (client
&& !(IsEmpty() && result() == Ok
)) {
78 // We cannot notify synchronously because the user doesn't have the reader
80 notification_task_runner_
->PostTask(
81 FROM_HERE
, base::Bind(&Context::NotifyInternal
, this, false));
84 void ReleaseReaderLock() {
85 DCHECK(notification_task_runner_
);
86 notification_task_runner_
= nullptr;
89 void Notify() { NotifyInternal(true); }
90 bool is_handle_locked() const { return notification_task_runner_
; }
91 bool IsReaderBoundToCurrentThread() const {
92 return notification_task_runner_
&&
93 notification_task_runner_
->BelongsToCurrentThread();
95 bool is_handle_active() const { return is_handle_active_
; }
96 void set_is_handle_active(bool b
) { is_handle_active_
= b
; }
97 void Consume(size_t s
) {
100 if (static_cast<size_t>(top
->length()) <= first_offset_
) {
106 base::Lock
& lock() { return lock_
; }
109 void NotifyInternal(bool repost
) {
110 // Note that this function is not protected by |lock_|.
112 auto runner
= notification_task_runner_
;
116 if (runner
->BelongsToCurrentThread()) {
117 // It is safe to access member variables without lock because |client_|
118 // is bound to the current thread.
120 client_
->didGetReadable();
124 // We don't re-post the task when the runner changes while waiting for
125 // this task because in this case a new reader is obtained and
126 // notification is already done at the reader creation time if necessary.
127 runner
->PostTask(FROM_HERE
,
128 base::Bind(&Context::NotifyInternal
, this, false));
132 for (auto& data
: queue_
) {
140 friend class base::RefCountedThreadSafe
<Context
>;
142 // This is necessary because the queue stores raw pointers.
147 // |result_| stores the ultimate state of this handle if it has. Otherwise,
150 // TODO(yhirano): Use std::deque<scoped_ptr<ThreadSafeReceivedData>>
151 // once it is allowed.
152 std::deque
<RequestPeer::ThreadSafeReceivedData
*> queue_
;
153 size_t first_offset_
;
155 scoped_refptr
<base::SingleThreadTaskRunner
> notification_task_runner_
;
156 bool is_handle_active_
;
158 DISALLOW_COPY_AND_ASSIGN(Context
);
161 SharedMemoryDataConsumerHandle::Writer::Writer(
162 const scoped_refptr
<Context
>& context
,
163 BackpressureMode mode
)
164 : context_(context
), mode_(mode
) {
167 SharedMemoryDataConsumerHandle::Writer::~Writer() {
171 void SharedMemoryDataConsumerHandle::Writer::AddData(
172 scoped_ptr
<RequestPeer::ReceivedData
> data
) {
173 if (!data
->length()) {
174 // We omit empty data.
178 bool needs_notification
= false;
180 base::AutoLock
lock(context_
->lock());
181 if (!context_
->is_handle_active() && !context_
->is_handle_locked()) {
182 // No one is interested in the data.
186 needs_notification
= context_
->IsEmpty();
187 scoped_ptr
<RequestPeer::ThreadSafeReceivedData
> data_to_pass
;
188 if (mode_
== kApplyBackpressure
) {
190 make_scoped_ptr(new DelegateThreadSafeReceivedData(data
.Pass()));
192 data_to_pass
= make_scoped_ptr(new FixedReceivedData(data
.get()));
194 context_
->Push(data_to_pass
.Pass());
197 if (needs_notification
)
201 void SharedMemoryDataConsumerHandle::Writer::Close() {
202 bool needs_notification
= false;
205 base::AutoLock
lock(context_
->lock());
206 if (context_
->result() == Ok
) {
207 context_
->set_result(Done
);
208 needs_notification
= context_
->IsEmpty();
211 if (needs_notification
)
215 SharedMemoryDataConsumerHandle::ReaderImpl::ReaderImpl(
216 scoped_refptr
<Context
> context
,
218 : context_(context
) {
219 base::AutoLock
lock(context_
->lock());
220 DCHECK(!context_
->is_handle_locked());
221 context_
->AcquireReaderLock(client
);
224 SharedMemoryDataConsumerHandle::ReaderImpl::~ReaderImpl() {
225 base::AutoLock
lock(context_
->lock());
226 context_
->ReleaseReaderLock();
227 context_
->ClearIfNecessary();
230 Result
SharedMemoryDataConsumerHandle::ReaderImpl::read(
234 size_t* read_size_to_return
) {
235 base::AutoLock
lock(context_
->lock());
237 size_t total_read_size
= 0;
238 *read_size_to_return
= 0;
239 if (context_
->result() != Ok
&& context_
->result() != Done
)
240 return context_
->result();
242 while (!context_
->IsEmpty() && total_read_size
< size
) {
243 const auto& top
= context_
->Top();
244 size_t readable
= top
->length() - context_
->first_offset();
245 size_t writable
= size
- total_read_size
;
246 size_t read_size
= std::min(readable
, writable
);
247 const char* begin
= top
->payload() + context_
->first_offset();
248 std::copy(begin
, begin
+ read_size
,
249 static_cast<char*>(data
) + total_read_size
);
250 total_read_size
+= read_size
;
251 context_
->Consume(read_size
);
253 *read_size_to_return
= total_read_size
;
254 return total_read_size
? Ok
: context_
->result() == Done
? Done
: ShouldWait
;
257 Result
SharedMemoryDataConsumerHandle::ReaderImpl::beginRead(
264 base::AutoLock
lock(context_
->lock());
266 if (context_
->result() != Ok
&& context_
->result() != Done
)
267 return context_
->result();
269 if (context_
->IsEmpty())
270 return context_
->result() == Done
? Done
: ShouldWait
;
272 const auto& top
= context_
->Top();
273 *buffer
= top
->payload() + context_
->first_offset();
274 *available
= top
->length() - context_
->first_offset();
279 Result
SharedMemoryDataConsumerHandle::ReaderImpl::endRead(size_t read_size
) {
280 base::AutoLock
lock(context_
->lock());
282 if (context_
->IsEmpty())
283 return UnexpectedError
;
285 context_
->Consume(read_size
);
289 SharedMemoryDataConsumerHandle::SharedMemoryDataConsumerHandle(
290 BackpressureMode mode
,
291 scoped_ptr
<Writer
>* writer
)
292 : context_(new Context
) {
293 writer
->reset(new Writer(context_
, mode
));
296 SharedMemoryDataConsumerHandle::~SharedMemoryDataConsumerHandle() {
297 base::AutoLock
lock(context_
->lock());
298 context_
->set_is_handle_active(false);
299 context_
->ClearIfNecessary();
302 scoped_ptr
<blink::WebDataConsumerHandle::Reader
>
303 SharedMemoryDataConsumerHandle::ObtainReader(Client
* client
) {
304 return make_scoped_ptr(obtainReaderInternal(client
));
307 SharedMemoryDataConsumerHandle::ReaderImpl
*
308 SharedMemoryDataConsumerHandle::obtainReaderInternal(Client
* client
) {
309 return new ReaderImpl(context_
, client
);
312 Result
SharedMemoryDataConsumerHandle::read(void* data
,
315 size_t* read_size_to_return
) {
316 // Note this (and below similar functions) is a bit racy. We don't care about
317 // it because this is a deprecated function and will be removed shortly.
319 return reader_
->read(data
, size
, flags
, read_size_to_return
);
322 Result
SharedMemoryDataConsumerHandle::beginRead(const void** buffer
,
326 return reader_
->beginRead(buffer
, flags
, available
);
329 Result
SharedMemoryDataConsumerHandle::endRead(size_t read_size
) {
331 return reader_
->endRead(read_size
);
334 void SharedMemoryDataConsumerHandle::registerClient(Client
* client
) {
336 reader_
= ObtainReader(client
);
339 void SharedMemoryDataConsumerHandle::unregisterClient() {
343 void SharedMemoryDataConsumerHandle::LockImplicitly() {
345 base::AutoLock
lock(context_
->lock());
347 DCHECK(context_
->IsReaderBoundToCurrentThread());
351 reader_
= ObtainReader(nullptr);
354 void SharedMemoryDataConsumerHandle::UnlockImplicitly() {
355 bool needs_unlock
= false;
357 base::AutoLock
lock(context_
->lock());
359 DCHECK(context_
->IsReaderBoundToCurrentThread());
368 } // namespace content