1 // Copyright (c) 2012 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
5 #include "content/browser/byte_stream.h"
11 #include "base/bind.h"
12 #include "base/location.h"
13 #include "base/memory/ref_counted.h"
14 #include "base/sequenced_task_runner.h"
19 typedef std::deque
<std::pair
<scoped_refptr
<net::IOBuffer
>, size_t> >
22 class ByteStreamReaderImpl
;
24 // A poor man's weak pointer; a RefCountedThreadSafe boolean that can be
25 // cleared in an object destructor and accessed to check for object
26 // existence. We can't use weak pointers because they're tightly tied to
27 // threads rather than task runners.
28 // TODO(rdsmith): A better solution would be extending weak pointers
29 // to support SequencedTaskRunners.
30 struct LifetimeFlag
: public base::RefCountedThreadSafe
<LifetimeFlag
> {
32 LifetimeFlag() : is_alive(true) { }
36 friend class base::RefCountedThreadSafe
<LifetimeFlag
>;
37 virtual ~LifetimeFlag() { }
40 DISALLOW_COPY_AND_ASSIGN(LifetimeFlag
);
43 // For both ByteStreamWriterImpl and ByteStreamReaderImpl, Construction and
44 // SetPeer may happen anywhere; all other operations on each class must
45 // happen in the context of their SequencedTaskRunner.
46 class ByteStreamWriterImpl
: public ByteStreamWriter
{
48 ByteStreamWriterImpl(scoped_refptr
<base::SequencedTaskRunner
> task_runner
,
49 scoped_refptr
<LifetimeFlag
> lifetime_flag
,
51 virtual ~ByteStreamWriterImpl();
53 // Must be called before any operations are performed.
54 void SetPeer(ByteStreamReaderImpl
* peer
,
55 scoped_refptr
<base::SequencedTaskRunner
> peer_task_runner
,
56 scoped_refptr
<LifetimeFlag
> peer_lifetime_flag
);
58 // Overridden from ByteStreamWriter.
59 virtual bool Write(scoped_refptr
<net::IOBuffer
> buffer
,
60 size_t byte_count
) OVERRIDE
;
61 virtual void Flush() OVERRIDE
;
62 virtual void Close(int status
) OVERRIDE
;
63 virtual void RegisterCallback(const base::Closure
& source_callback
) OVERRIDE
;
64 virtual size_t GetTotalBufferedBytes() const OVERRIDE
;
66 // PostTask target from |ByteStreamReaderImpl::MaybeUpdateInput|.
67 static void UpdateWindow(scoped_refptr
<LifetimeFlag
> lifetime_flag
,
68 ByteStreamWriterImpl
* target
,
69 size_t bytes_consumed
);
72 // Called from UpdateWindow when object existence has been validated.
73 void UpdateWindowInternal(size_t bytes_consumed
);
75 void PostToPeer(bool complete
, int status
);
77 const size_t total_buffer_size_
;
79 // All data objects in this class are only valid to access on
80 // this task runner except as otherwise noted.
81 scoped_refptr
<base::SequencedTaskRunner
> my_task_runner_
;
83 // True while this object is alive.
84 scoped_refptr
<LifetimeFlag
> my_lifetime_flag_
;
86 base::Closure space_available_callback_
;
87 ContentVector input_contents_
;
88 size_t input_contents_size_
;
90 // ** Peer information.
92 scoped_refptr
<base::SequencedTaskRunner
> peer_task_runner_
;
94 // How much we've sent to the output that for flow control purposes we
95 // must assume hasn't been read yet.
96 size_t output_size_used_
;
98 // Only valid to access on peer_task_runner_.
99 scoped_refptr
<LifetimeFlag
> peer_lifetime_flag_
;
101 // Only valid to access on peer_task_runner_ if
102 // |*peer_lifetime_flag_ == true|
103 ByteStreamReaderImpl
* peer_
;
106 class ByteStreamReaderImpl
: public ByteStreamReader
{
108 ByteStreamReaderImpl(scoped_refptr
<base::SequencedTaskRunner
> task_runner
,
109 scoped_refptr
<LifetimeFlag
> lifetime_flag
,
111 virtual ~ByteStreamReaderImpl();
113 // Must be called before any operations are performed.
114 void SetPeer(ByteStreamWriterImpl
* peer
,
115 scoped_refptr
<base::SequencedTaskRunner
> peer_task_runner
,
116 scoped_refptr
<LifetimeFlag
> peer_lifetime_flag
);
118 // Overridden from ByteStreamReader.
119 virtual StreamState
Read(scoped_refptr
<net::IOBuffer
>* data
,
120 size_t* length
) OVERRIDE
;
121 virtual int GetStatus() const OVERRIDE
;
122 virtual void RegisterCallback(const base::Closure
& sink_callback
) OVERRIDE
;
124 // PostTask target from |ByteStreamWriterImpl::Write| and
125 // |ByteStreamWriterImpl::Close|.
126 // Receive data from our peer.
127 // static because it may be called after the object it is targeting
128 // has been destroyed. It may not access |*target|
129 // if |*object_lifetime_flag| is false.
130 static void TransferData(
131 scoped_refptr
<LifetimeFlag
> object_lifetime_flag
,
132 ByteStreamReaderImpl
* target
,
133 scoped_ptr
<ContentVector
> transfer_buffer
,
134 size_t transfer_buffer_bytes
,
135 bool source_complete
,
139 // Called from TransferData once object existence has been validated.
140 void TransferDataInternal(
141 scoped_ptr
<ContentVector
> transfer_buffer
,
142 size_t transfer_buffer_bytes
,
143 bool source_complete
,
146 void MaybeUpdateInput();
148 const size_t total_buffer_size_
;
150 scoped_refptr
<base::SequencedTaskRunner
> my_task_runner_
;
152 // True while this object is alive.
153 scoped_refptr
<LifetimeFlag
> my_lifetime_flag_
;
155 ContentVector available_contents_
;
157 bool received_status_
;
160 base::Closure data_available_callback_
;
162 // Time of last point at which data in stream transitioned from full
163 // to non-full. Nulled when a callback is sent.
164 base::Time last_non_full_time_
;
166 // ** Peer information
168 scoped_refptr
<base::SequencedTaskRunner
> peer_task_runner_
;
170 // How much has been removed from this class that we haven't told
171 // the input about yet.
172 size_t unreported_consumed_bytes_
;
174 // Only valid to access on peer_task_runner_.
175 scoped_refptr
<LifetimeFlag
> peer_lifetime_flag_
;
177 // Only valid to access on peer_task_runner_ if
178 // |*peer_lifetime_flag_ == true|
179 ByteStreamWriterImpl
* peer_
;
182 ByteStreamWriterImpl::ByteStreamWriterImpl(
183 scoped_refptr
<base::SequencedTaskRunner
> task_runner
,
184 scoped_refptr
<LifetimeFlag
> lifetime_flag
,
186 : total_buffer_size_(buffer_size
),
187 my_task_runner_(task_runner
),
188 my_lifetime_flag_(lifetime_flag
),
189 input_contents_size_(0),
190 output_size_used_(0),
192 DCHECK(my_lifetime_flag_
.get());
193 my_lifetime_flag_
->is_alive
= true;
196 ByteStreamWriterImpl::~ByteStreamWriterImpl() {
197 // No RunsTasksOnCurrentThread() check to allow deleting a created writer
198 // before we start using it. Once started, should be deleted on the specified
200 my_lifetime_flag_
->is_alive
= false;
203 void ByteStreamWriterImpl::SetPeer(
204 ByteStreamReaderImpl
* peer
,
205 scoped_refptr
<base::SequencedTaskRunner
> peer_task_runner
,
206 scoped_refptr
<LifetimeFlag
> peer_lifetime_flag
) {
208 peer_task_runner_
= peer_task_runner
;
209 peer_lifetime_flag_
= peer_lifetime_flag
;
212 bool ByteStreamWriterImpl::Write(
213 scoped_refptr
<net::IOBuffer
> buffer
, size_t byte_count
) {
214 DCHECK(my_task_runner_
->RunsTasksOnCurrentThread());
218 // TODO(tyoshino): Discuss with content/browser/download developer and if
219 // they're fine with, set smaller limit and make it configurable.
220 size_t space_limit
= std::numeric_limits
<size_t>::max() -
221 GetTotalBufferedBytes();
222 if (byte_count
> space_limit
) {
223 // TODO(tyoshino): Tell the user that Write() failed.
228 input_contents_
.push_back(std::make_pair(buffer
, byte_count
));
229 input_contents_size_
+= byte_count
;
231 // Arbitrarily, we buffer to a third of the total size before sending.
232 if (input_contents_size_
> total_buffer_size_
/ kFractionBufferBeforeSending
)
233 PostToPeer(false, 0);
235 return GetTotalBufferedBytes() <= total_buffer_size_
;
238 void ByteStreamWriterImpl::Flush() {
239 DCHECK(my_task_runner_
->RunsTasksOnCurrentThread());
240 if (input_contents_size_
> 0)
241 PostToPeer(false, 0);
244 void ByteStreamWriterImpl::Close(int status
) {
245 DCHECK(my_task_runner_
->RunsTasksOnCurrentThread());
246 PostToPeer(true, status
);
249 void ByteStreamWriterImpl::RegisterCallback(
250 const base::Closure
& source_callback
) {
251 DCHECK(my_task_runner_
->RunsTasksOnCurrentThread());
252 space_available_callback_
= source_callback
;
255 size_t ByteStreamWriterImpl::GetTotalBufferedBytes() const {
256 DCHECK(my_task_runner_
->RunsTasksOnCurrentThread());
257 // This sum doesn't overflow since Write() fails if this sum is going to
259 return input_contents_size_
+ output_size_used_
;
263 void ByteStreamWriterImpl::UpdateWindow(
264 scoped_refptr
<LifetimeFlag
> lifetime_flag
, ByteStreamWriterImpl
* target
,
265 size_t bytes_consumed
) {
266 // If the target object isn't alive anymore, we do nothing.
267 if (!lifetime_flag
->is_alive
) return;
269 target
->UpdateWindowInternal(bytes_consumed
);
272 void ByteStreamWriterImpl::UpdateWindowInternal(size_t bytes_consumed
) {
273 DCHECK(my_task_runner_
->RunsTasksOnCurrentThread());
275 bool was_above_limit
= GetTotalBufferedBytes() > total_buffer_size_
;
277 DCHECK_GE(output_size_used_
, bytes_consumed
);
278 output_size_used_
-= bytes_consumed
;
280 // Callback if we were above the limit and we're now <= to it.
281 bool no_longer_above_limit
= GetTotalBufferedBytes() <= total_buffer_size_
;
283 if (no_longer_above_limit
&& was_above_limit
&&
284 !space_available_callback_
.is_null())
285 space_available_callback_
.Run();
288 void ByteStreamWriterImpl::PostToPeer(bool complete
, int status
) {
289 DCHECK(my_task_runner_
->RunsTasksOnCurrentThread());
290 // Valid contexts in which to call.
291 DCHECK(complete
|| 0 != input_contents_size_
);
293 scoped_ptr
<ContentVector
> transfer_buffer
;
294 size_t buffer_size
= 0;
295 if (0 != input_contents_size_
) {
296 transfer_buffer
.reset(new ContentVector
);
297 transfer_buffer
->swap(input_contents_
);
298 buffer_size
= input_contents_size_
;
299 output_size_used_
+= input_contents_size_
;
300 input_contents_size_
= 0;
302 peer_task_runner_
->PostTask(
303 FROM_HERE
, base::Bind(
304 &ByteStreamReaderImpl::TransferData
,
307 base::Passed(&transfer_buffer
),
313 ByteStreamReaderImpl::ByteStreamReaderImpl(
314 scoped_refptr
<base::SequencedTaskRunner
> task_runner
,
315 scoped_refptr
<LifetimeFlag
> lifetime_flag
,
317 : total_buffer_size_(buffer_size
),
318 my_task_runner_(task_runner
),
319 my_lifetime_flag_(lifetime_flag
),
320 received_status_(false),
322 unreported_consumed_bytes_(0),
324 DCHECK(my_lifetime_flag_
.get());
325 my_lifetime_flag_
->is_alive
= true;
328 ByteStreamReaderImpl::~ByteStreamReaderImpl() {
329 // No RunsTasksOnCurrentThread() check to allow deleting a created writer
330 // before we start using it. Once started, should be deleted on the specified
332 my_lifetime_flag_
->is_alive
= false;
335 void ByteStreamReaderImpl::SetPeer(
336 ByteStreamWriterImpl
* peer
,
337 scoped_refptr
<base::SequencedTaskRunner
> peer_task_runner
,
338 scoped_refptr
<LifetimeFlag
> peer_lifetime_flag
) {
340 peer_task_runner_
= peer_task_runner
;
341 peer_lifetime_flag_
= peer_lifetime_flag
;
344 ByteStreamReaderImpl::StreamState
345 ByteStreamReaderImpl::Read(scoped_refptr
<net::IOBuffer
>* data
,
347 DCHECK(my_task_runner_
->RunsTasksOnCurrentThread());
349 if (available_contents_
.size()) {
350 *data
= available_contents_
.front().first
;
351 *length
= available_contents_
.front().second
;
352 available_contents_
.pop_front();
353 unreported_consumed_bytes_
+= *length
;
356 return STREAM_HAS_DATA
;
358 if (received_status_
) {
359 return STREAM_COMPLETE
;
364 int ByteStreamReaderImpl::GetStatus() const {
365 DCHECK(my_task_runner_
->RunsTasksOnCurrentThread());
366 DCHECK(received_status_
);
370 void ByteStreamReaderImpl::RegisterCallback(
371 const base::Closure
& sink_callback
) {
372 DCHECK(my_task_runner_
->RunsTasksOnCurrentThread());
374 data_available_callback_
= sink_callback
;
378 void ByteStreamReaderImpl::TransferData(
379 scoped_refptr
<LifetimeFlag
> object_lifetime_flag
,
380 ByteStreamReaderImpl
* target
,
381 scoped_ptr
<ContentVector
> transfer_buffer
,
383 bool source_complete
,
385 // If our target is no longer alive, do nothing.
386 if (!object_lifetime_flag
->is_alive
) return;
388 target
->TransferDataInternal(
389 transfer_buffer
.Pass(), buffer_size
, source_complete
, status
);
392 void ByteStreamReaderImpl::TransferDataInternal(
393 scoped_ptr
<ContentVector
> transfer_buffer
,
395 bool source_complete
,
397 DCHECK(my_task_runner_
->RunsTasksOnCurrentThread());
399 bool was_empty
= available_contents_
.empty();
401 if (transfer_buffer
) {
402 available_contents_
.insert(available_contents_
.end(),
403 transfer_buffer
->begin(),
404 transfer_buffer
->end());
407 if (source_complete
) {
408 received_status_
= true;
412 // Callback on transition from empty to non-empty, or
414 if (((was_empty
&& !available_contents_
.empty()) ||
416 !data_available_callback_
.is_null())
417 data_available_callback_
.Run();
420 // Decide whether or not to send the input a window update.
421 // Currently we do that whenever we've got unreported consumption
422 // greater than 1/3 of total size.
423 void ByteStreamReaderImpl::MaybeUpdateInput() {
424 DCHECK(my_task_runner_
->RunsTasksOnCurrentThread());
426 if (unreported_consumed_bytes_
<=
427 total_buffer_size_
/ kFractionReadBeforeWindowUpdate
)
430 peer_task_runner_
->PostTask(
431 FROM_HERE
, base::Bind(
432 &ByteStreamWriterImpl::UpdateWindow
,
435 unreported_consumed_bytes_
));
436 unreported_consumed_bytes_
= 0;
441 const int ByteStreamWriter::kFractionBufferBeforeSending
= 3;
442 const int ByteStreamReader::kFractionReadBeforeWindowUpdate
= 3;
444 ByteStreamReader::~ByteStreamReader() { }
446 ByteStreamWriter::~ByteStreamWriter() { }
448 void CreateByteStream(
449 scoped_refptr
<base::SequencedTaskRunner
> input_task_runner
,
450 scoped_refptr
<base::SequencedTaskRunner
> output_task_runner
,
452 scoped_ptr
<ByteStreamWriter
>* input
,
453 scoped_ptr
<ByteStreamReader
>* output
) {
454 scoped_refptr
<LifetimeFlag
> input_flag(new LifetimeFlag());
455 scoped_refptr
<LifetimeFlag
> output_flag(new LifetimeFlag());
457 ByteStreamWriterImpl
* in
= new ByteStreamWriterImpl(
458 input_task_runner
, input_flag
, buffer_size
);
459 ByteStreamReaderImpl
* out
= new ByteStreamReaderImpl(
460 output_task_runner
, output_flag
, buffer_size
);
462 in
->SetPeer(out
, output_task_runner
, output_flag
);
463 out
->SetPeer(in
, input_task_runner
, input_flag
);
468 } // namespace content