1 // Copyright 2015 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
5 #include "content/child/shared_memory_data_consumer_handle.h"
11 #include "base/bind.h"
12 #include "base/message_loop/message_loop.h"
13 #include "base/single_thread_task_runner.h"
14 #include "base/synchronization/lock.h"
15 #include "content/public/child/fixed_received_data.h"
21 class DelegateThreadSafeReceivedData final
22 : public RequestPeer::ThreadSafeReceivedData
{
24 explicit DelegateThreadSafeReceivedData(
25 scoped_ptr
<RequestPeer::ReceivedData
> data
)
27 task_runner_(base::MessageLoop::current()->task_runner()) {}
28 ~DelegateThreadSafeReceivedData() override
{
29 if (!task_runner_
->BelongsToCurrentThread()) {
30 // Delete the data on the original thread.
31 task_runner_
->DeleteSoon(FROM_HERE
, data_
.release());
35 const char* payload() const override
{ return data_
->payload(); }
36 int length() const override
{ return data_
->length(); }
37 int encoded_length() const override
{ return data_
->encoded_length(); }
40 scoped_ptr
<RequestPeer::ReceivedData
> data_
;
41 scoped_refptr
<base::SingleThreadTaskRunner
> task_runner_
;
43 DISALLOW_COPY_AND_ASSIGN(DelegateThreadSafeReceivedData
);
48 using Result
= blink::WebDataConsumerHandle::Result
;
50 class SharedMemoryDataConsumerHandle::Context final
51 : public base::RefCountedThreadSafe
<Context
> {
57 is_reader_active_(true) {}
59 bool IsEmpty() const { return queue_
.empty(); }
61 for (auto& data
: queue_
) {
69 // Note that this function is not protected by |lock_| (actually it
70 // shouldn't be) but |notification_task_runner_| is thread-safe.
72 if (notification_task_runner_
->BelongsToCurrentThread()) {
75 notification_task_runner_
->PostTask(
76 FROM_HERE
, base::Bind(&Context::NotifyImmediately
, this));
80 RequestPeer::ThreadSafeReceivedData
* Top() { return queue_
.front(); }
81 void Push(scoped_ptr
<RequestPeer::ThreadSafeReceivedData
> data
) {
82 queue_
.push_back(data
.release());
84 size_t first_offset() const { return first_offset_
; }
85 Result
result() const { return result_
; }
86 void set_result(Result r
) { result_
= r
; }
87 Client
* client() { return client_
; }
88 void SetClient(Client
* client
) {
90 notification_task_runner_
= base::MessageLoop::current()->task_runner();
93 notification_task_runner_
= nullptr;
97 bool is_reader_active() const { return is_reader_active_
; }
98 void set_is_reader_active(bool b
) { is_reader_active_
= b
; }
99 void Consume(size_t s
) {
102 if (static_cast<size_t>(top
->length()) <= first_offset_
) {
108 base::Lock
& lock() { return lock_
; }
111 friend class base::RefCountedThreadSafe
<Context
>;
113 // This is necessary because the queue stores raw pointers.
117 void NotifyImmediately() {
118 // As we can assume that all reader-side methods are called on this
119 // thread (see WebDataConsumerHandle comments), we don't need to lock.
121 client_
->didGetReadable();
125 // |result_| stores the ultimate state of this handle if it has. Otherwise,
128 // TODO(yhirano): Use std::deque<scoped_ptr<ThreadSafeReceivedData>>
129 // once it is allowed.
130 std::deque
<RequestPeer::ThreadSafeReceivedData
*> queue_
;
131 size_t first_offset_
;
133 scoped_refptr
<base::SingleThreadTaskRunner
> notification_task_runner_
;
134 bool is_reader_active_
;
136 DISALLOW_COPY_AND_ASSIGN(Context
);
139 SharedMemoryDataConsumerHandle::Writer::Writer(
140 const scoped_refptr
<Context
>& context
,
141 BackpressureMode mode
)
142 : context_(context
), mode_(mode
) {
145 SharedMemoryDataConsumerHandle::Writer::~Writer() {
149 void SharedMemoryDataConsumerHandle::Writer::AddData(
150 scoped_ptr
<RequestPeer::ReceivedData
> data
) {
151 if (!data
->length()) {
152 // We omit empty data.
156 bool needs_notification
= false;
158 base::AutoLock
lock(context_
->lock());
159 if (!context_
->is_reader_active()) {
160 // No one is interested in the data.
164 needs_notification
= context_
->client() && context_
->IsEmpty();
165 scoped_ptr
<RequestPeer::ThreadSafeReceivedData
> data_to_pass
;
166 if (mode_
== kApplyBackpressure
) {
168 make_scoped_ptr(new DelegateThreadSafeReceivedData(data
.Pass()));
170 data_to_pass
= make_scoped_ptr(new FixedReceivedData(data
.get()));
172 context_
->Push(data_to_pass
.Pass());
175 if (needs_notification
)
179 void SharedMemoryDataConsumerHandle::Writer::Close() {
180 bool needs_notification
= false;
183 base::AutoLock
lock(context_
->lock());
184 if (context_
->result() == Ok
) {
185 context_
->set_result(Done
);
186 needs_notification
= context_
->client() && context_
->IsEmpty();
189 if (needs_notification
)
193 SharedMemoryDataConsumerHandle::SharedMemoryDataConsumerHandle(
194 BackpressureMode mode
,
195 scoped_ptr
<Writer
>* writer
)
196 : context_(new Context
) {
197 writer
->reset(new Writer(context_
, mode
));
200 SharedMemoryDataConsumerHandle::~SharedMemoryDataConsumerHandle() {
201 base::AutoLock
lock(context_
->lock());
202 context_
->set_is_reader_active(false);
206 Result
SharedMemoryDataConsumerHandle::read(void* data
,
209 size_t* read_size_to_return
) {
210 base::AutoLock
lock(context_
->lock());
212 size_t total_read_size
= 0;
213 *read_size_to_return
= 0;
214 if (context_
->result() != Ok
&& context_
->result() != Done
)
215 return context_
->result();
217 while (!context_
->IsEmpty() && total_read_size
< size
) {
218 const auto& top
= context_
->Top();
219 size_t readable
= top
->length() - context_
->first_offset();
220 size_t writable
= size
- total_read_size
;
221 size_t read_size
= std::min(readable
, writable
);
222 const char* begin
= top
->payload() + context_
->first_offset();
223 std::copy(begin
, begin
+ read_size
,
224 static_cast<char*>(data
) + total_read_size
);
225 total_read_size
+= read_size
;
226 context_
->Consume(read_size
);
228 *read_size_to_return
= total_read_size
;
229 return total_read_size
? Ok
: context_
->result() == Done
? Done
: ShouldWait
;
232 Result
SharedMemoryDataConsumerHandle::beginRead(const void** buffer
,
238 base::AutoLock
lock(context_
->lock());
240 if (context_
->result() != Ok
&& context_
->result() != Done
)
241 return context_
->result();
243 if (context_
->IsEmpty())
244 return context_
->result() == Done
? Done
: ShouldWait
;
246 const auto& top
= context_
->Top();
247 *buffer
= top
->payload() + context_
->first_offset();
248 *available
= top
->length() - context_
->first_offset();
253 Result
SharedMemoryDataConsumerHandle::endRead(size_t read_size
) {
254 base::AutoLock
lock(context_
->lock());
256 if (context_
->IsEmpty())
257 return UnexpectedError
;
259 context_
->Consume(read_size
);
263 void SharedMemoryDataConsumerHandle::registerClient(Client
* client
) {
264 bool needs_notification
= false;
266 base::AutoLock
lock(context_
->lock());
268 context_
->SetClient(client
);
269 needs_notification
= !context_
->IsEmpty();
271 if (needs_notification
)
275 void SharedMemoryDataConsumerHandle::unregisterClient() {
276 base::AutoLock
lock(context_
->lock());
278 context_
->SetClient(nullptr);
281 } // namespace content