1 // Copyright (c) 2012 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
5 #include "content/browser/byte_stream.h"
11 #include "base/bind.h"
12 #include "base/location.h"
13 #include "base/memory/ref_counted.h"
14 #include "base/sequenced_task_runner.h"
// Double-ended queue of (ref-counted net::IOBuffer, size_t) pairs.
// NOTE(review): the typedef's name is not visible in this extract; presumably
// this is the ContentVector type used further down -- confirm.
19 typedef std::deque
<std::pair
<scoped_refptr
<net::IOBuffer
>, size_t> >
// Forward declaration: ByteStreamWriterImpl takes and stores a pointer to
// this class before its definition appears.
22 class ByteStreamReaderImpl
;
24 // A poor man's weak pointer; a RefCountedThreadSafe boolean that can be
25 // cleared in an object destructor and accessed to check for object
26 // existence. We can't use weak pointers because they're tightly tied to
27 // threads rather than task runners.
28 // TODO(rdsmith): A better solution would be extending weak pointers
29 // to support SequencedTaskRunners.
// In this file the stream objects set |is_alive| to true in their
// constructors and to false in their destructors; the static PostTask targets
// test it before dereferencing their target pointer.
30 struct LifetimeFlag
: public base::RefCountedThreadSafe
<LifetimeFlag
> {
// A freshly created flag reports its object as alive.
32 LifetimeFlag() : is_alive(true) { }
// NOTE(review): the access specifiers and the declaration of the |is_alive|
// member are not visible in this extract -- confirm against the full file.
36 friend class base::RefCountedThreadSafe
<LifetimeFlag
>;
37 virtual ~LifetimeFlag() { }
40 DISALLOW_COPY_AND_ASSIGN(LifetimeFlag
);
43 // For both ByteStreamWriterImpl and ByteStreamReaderImpl, Construction and
44 // SetPeer may happen anywhere; all other operations on each class must
45 // happen in the context of their SequencedTaskRunner.
// Writer end of a byte stream. Buffers passed to Write() accumulate in
// |input_contents_| and are handed to the peer ByteStreamReaderImpl by
// posting ByteStreamReaderImpl::TransferData to |peer_task_runner_|.
46 class ByteStreamWriterImpl
: public ByteStreamWriter
{
// NOTE(review): the access specifiers and the end of this constructor's
// parameter list are not visible in this extract; the out-of-line definition
// also reads a |buffer_size| value.
48 ByteStreamWriterImpl(scoped_refptr
<base::SequencedTaskRunner
> task_runner
,
49 scoped_refptr
<LifetimeFlag
> lifetime_flag
,
51 virtual ~ByteStreamWriterImpl();
53 // Must be called before any operations are performed.
54 void SetPeer(ByteStreamReaderImpl
* peer
,
55 scoped_refptr
<base::SequencedTaskRunner
> peer_task_runner
,
56 scoped_refptr
<LifetimeFlag
> peer_lifetime_flag
);
58 // Overridden from ByteStreamWriter.
59 virtual bool Write(scoped_refptr
<net::IOBuffer
> buffer
,
60 size_t byte_count
) OVERRIDE
;
61 virtual void Flush() OVERRIDE
;
62 virtual void Close(int status
) OVERRIDE
;
63 virtual void RegisterCallback(const base::Closure
& source_callback
) OVERRIDE
;
64 virtual size_t GetTotalBufferedBytes() const OVERRIDE
;
66 // PostTask target from |ByteStreamReaderImpl::MaybeUpdateInput|.
67 static void UpdateWindow(scoped_refptr
<LifetimeFlag
> lifetime_flag
,
68 ByteStreamWriterImpl
* target
,
69 size_t bytes_consumed
);
72 // Called from UpdateWindow when object existence has been validated.
73 void UpdateWindowInternal(size_t bytes_consumed
);
// Hands any pending input to the peer; |complete| and |status| are passed by
// Close() to signal the end of the stream.
75 void PostToPeer(bool complete
, int status
);
// Buffered-byte limit: Write() returns false once GetTotalBufferedBytes()
// exceeds this value.
77 const size_t total_buffer_size_
;
79 // All data objects in this class are only valid to access on
80 // this task runner except as otherwise noted.
81 scoped_refptr
<base::SequencedTaskRunner
> my_task_runner_
;
83 // True while this object is alive.
84 scoped_refptr
<LifetimeFlag
> my_lifetime_flag_
;
// Set by RegisterCallback(); run by UpdateWindowInternal() when the buffered
// total drops from above |total_buffer_size_| to at or below it.
86 base::Closure space_available_callback_
;
// Buffers accepted by Write() but not yet posted to the peer, followed by
// their combined byte count.
87 ContentVector input_contents_
;
88 size_t input_contents_size_
;
90 // ** Peer information.
92 scoped_refptr
<base::SequencedTaskRunner
> peer_task_runner_
;
94 // How much we've sent to the output that for flow control purposes we
95 // must assume hasn't been read yet.
96 size_t output_size_used_
;
98 // Only valid to access on peer_task_runner_.
99 scoped_refptr
<LifetimeFlag
> peer_lifetime_flag_
;
101 // Only valid to access on peer_task_runner_ if
102 // |*peer_lifetime_flag_ == true|
103 ByteStreamReaderImpl
* peer_
;
// Reader end of a byte stream. Buffers arrive from the peer writer through
// TransferData(), are queued in |available_contents_|, and are handed out one
// at a time by Read().
106 class ByteStreamReaderImpl
: public ByteStreamReader
{
// NOTE(review): the access specifiers and the end of this constructor's
// parameter list are not visible in this extract; the out-of-line definition
// also reads a |buffer_size| value.
108 ByteStreamReaderImpl(scoped_refptr
<base::SequencedTaskRunner
> task_runner
,
109 scoped_refptr
<LifetimeFlag
> lifetime_flag
,
111 virtual ~ByteStreamReaderImpl();
113 // Must be called before any operations are performed.
114 void SetPeer(ByteStreamWriterImpl
* peer
,
115 scoped_refptr
<base::SequencedTaskRunner
> peer_task_runner
,
116 scoped_refptr
<LifetimeFlag
> peer_lifetime_flag
);
118 // Overridden from ByteStreamReader.
119 virtual StreamState
Read(scoped_refptr
<net::IOBuffer
>* data
,
120 size_t* length
) OVERRIDE
;
121 virtual int GetStatus() const OVERRIDE
;
122 virtual void RegisterCallback(const base::Closure
& sink_callback
) OVERRIDE
;
124 // PostTask target from |ByteStreamWriterImpl::Write| and
125 // |ByteStreamWriterImpl::Close|.
126 // Receive data from our peer.
127 // static because it may be called after the object it is targeting
128 // has been destroyed. It may not access |*target|
129 // if |*object_lifetime_flag| is false.
// NOTE(review): the final parameter(s) of the two declarations below are not
// visible in this extract.
130 static void TransferData(
131 scoped_refptr
<LifetimeFlag
> object_lifetime_flag
,
132 ByteStreamReaderImpl
* target
,
133 scoped_ptr
<ContentVector
> transfer_buffer
,
134 size_t transfer_buffer_bytes
,
135 bool source_complete
,
139 // Called from TransferData once object existence has been validated.
140 void TransferDataInternal(
141 scoped_ptr
<ContentVector
> transfer_buffer
,
142 size_t transfer_buffer_bytes
,
143 bool source_complete
,
// Posts a window update to the peer writer once enough consumed bytes have
// accumulated in |unreported_consumed_bytes_|.
146 void MaybeUpdateInput();
148 const size_t total_buffer_size_
;
150 scoped_refptr
<base::SequencedTaskRunner
> my_task_runner_
;
152 // True while this object is alive.
153 scoped_refptr
<LifetimeFlag
> my_lifetime_flag_
;
// Buffers received from the peer that Read() has not yet returned.
155 ContentVector available_contents_
;
// Set to true by TransferDataInternal() when the peer reports completion.
157 bool received_status_
;
// Set by RegisterCallback(); run from TransferDataInternal().
160 base::Closure data_available_callback_
;
162 // Time of last point at which data in stream transitioned from full
163 // to non-full. Nulled when a callback is sent.
164 base::Time last_non_full_time_
;
166 // ** Peer information
168 scoped_refptr
<base::SequencedTaskRunner
> peer_task_runner_
;
170 // How much has been removed from this class that we haven't told
171 // the input about yet.
172 size_t unreported_consumed_bytes_
;
174 // Only valid to access on peer_task_runner_.
175 scoped_refptr
<LifetimeFlag
> peer_lifetime_flag_
;
177 // Only valid to access on peer_task_runner_ if
178 // |*peer_lifetime_flag_ == true|
179 ByteStreamWriterImpl
* peer_
;
// Stores the task runner and lifetime flag, zeroes both byte counters, and
// marks the lifetime flag alive (the flag must be non-null).
// NOTE(review): part of the parameter list and of the initializer list is
// missing from this extract; |buffer_size| is used but its declaration is not
// shown.
182 ByteStreamWriterImpl::ByteStreamWriterImpl(
183 scoped_refptr
<base::SequencedTaskRunner
> task_runner
,
184 scoped_refptr
<LifetimeFlag
> lifetime_flag
,
186 : total_buffer_size_(buffer_size
),
187 my_task_runner_(task_runner
),
188 my_lifetime_flag_(lifetime_flag
),
189 input_contents_size_(0),
190 output_size_used_(0),
192 DCHECK(my_lifetime_flag_
.get());
193 my_lifetime_flag_
->is_alive
= true;
// Clears the shared lifetime flag so that UpdateWindow() tasks that run after
// this object is gone return without touching it.
196 ByteStreamWriterImpl::~ByteStreamWriterImpl() {
197 my_lifetime_flag_
->is_alive
= false;
// Records the peer reader's task runner and lifetime flag.
// NOTE(review): no assignment of |peer| to |peer_| is visible in this
// extract -- confirm against the full file.
200 void ByteStreamWriterImpl::SetPeer(
201 ByteStreamReaderImpl
* peer
,
202 scoped_refptr
<base::SequencedTaskRunner
> peer_task_runner
,
203 scoped_refptr
<LifetimeFlag
> peer_lifetime_flag
) {
205 peer_task_runner_
= peer_task_runner
;
206 peer_lifetime_flag_
= peer_lifetime_flag
;
// Queues |buffer| (|byte_count| bytes) as pending input and posts the pending
// input to the peer once it exceeds
// |total_buffer_size_| / kFractionBufferBeforeSending.
// Returns true while the total buffered byte count is at or below
// |total_buffer_size_|.
// NOTE(review): the body of the |byte_count > space_limit| branch is not
// visible in this extract.
209 bool ByteStreamWriterImpl::Write(
210 scoped_refptr
<net::IOBuffer
> buffer
, size_t byte_count
) {
211 DCHECK(my_task_runner_
->RunsTasksOnCurrentThread());
215 // TODO(tyoshino): Discuss with content/browser/download developer and if
216 // they're fine with, set smaller limit and make it configurable.
// |space_limit| is the largest |byte_count| that can be added to the
// buffered total without overflowing size_t.
217 size_t space_limit
= std::numeric_limits
<size_t>::max() -
218 GetTotalBufferedBytes();
219 if (byte_count
> space_limit
) {
220 // TODO(tyoshino): Tell the user that Write() failed.
225 input_contents_
.push_back(std::make_pair(buffer
, byte_count
));
226 input_contents_size_
+= byte_count
;
228 // Arbitrarily, we buffer to a third of the total size before sending.
229 if (input_contents_size_
> total_buffer_size_
/ kFractionBufferBeforeSending
)
230 PostToPeer(false, 0);
232 return GetTotalBufferedBytes() <= total_buffer_size_
;
// Posts any pending input to the peer immediately; does nothing when no
// bytes are pending.
235 void ByteStreamWriterImpl::Flush() {
236 DCHECK(my_task_runner_
->RunsTasksOnCurrentThread());
237 if (input_contents_size_
> 0)
238 PostToPeer(false, 0);
// Posts to the peer with |complete| set to true, passing |status| along.
241 void ByteStreamWriterImpl::Close(int status
) {
242 DCHECK(my_task_runner_
->RunsTasksOnCurrentThread());
243 PostToPeer(true, status
);
// Stores |source_callback| as the space-available callback, replacing any
// previously stored one.
246 void ByteStreamWriterImpl::RegisterCallback(
247 const base::Closure
& source_callback
) {
248 DCHECK(my_task_runner_
->RunsTasksOnCurrentThread());
249 space_available_callback_
= source_callback
;
// Returns the bytes still pending in this writer plus the bytes already sent
// to the peer that have not been reported back as consumed.
252 size_t ByteStreamWriterImpl::GetTotalBufferedBytes() const {
253 DCHECK(my_task_runner_
->RunsTasksOnCurrentThread());
254 // This sum doesn't overflow since Write() fails if this sum is going to
256 return input_contents_size_
+ output_size_used_
;
// Static PostTask target. Forwards |bytes_consumed| to
// |target->UpdateWindowInternal()| only while |lifetime_flag| still reports
// the target as alive; |target| is not dereferenced otherwise.
260 void ByteStreamWriterImpl::UpdateWindow(
261 scoped_refptr
<LifetimeFlag
> lifetime_flag
, ByteStreamWriterImpl
* target
,
262 size_t bytes_consumed
) {
263 // If the target object isn't alive anymore, we do nothing.
264 if (!lifetime_flag
->is_alive
) return;
266 target
->UpdateWindowInternal(bytes_consumed
);
// Subtracts |bytes_consumed| from the in-flight output count and runs the
// space-available callback (if one is registered) when that moves the
// buffered total from above |total_buffer_size_| to at or below it.
269 void ByteStreamWriterImpl::UpdateWindowInternal(size_t bytes_consumed
) {
270 DCHECK(my_task_runner_
->RunsTasksOnCurrentThread());
// Sample the limit state before adjusting the counter so the transition can
// be detected afterwards.
272 bool was_above_limit
= GetTotalBufferedBytes() > total_buffer_size_
;
274 DCHECK_GE(output_size_used_
, bytes_consumed
);
275 output_size_used_
-= bytes_consumed
;
277 // Callback if we were above the limit and we're now <= to it.
278 bool no_longer_above_limit
= GetTotalBufferedBytes() <= total_buffer_size_
;
280 if (no_longer_above_limit
&& was_above_limit
&&
281 !space_available_callback_
.is_null())
282 space_available_callback_
.Run();
// Moves any pending input into a newly allocated ContentVector, adds its byte
// count to |output_size_used_|, resets the pending count, and posts
// ByteStreamReaderImpl::TransferData to |peer_task_runner_|.
// Must be called either with |complete| true or with pending input.
// NOTE(review): most of the arguments bound into the posted task are not
// visible in this extract.
285 void ByteStreamWriterImpl::PostToPeer(bool complete
, int status
) {
286 DCHECK(my_task_runner_
->RunsTasksOnCurrentThread());
287 // Valid contexts in which to call.
288 DCHECK(complete
|| 0 != input_contents_size_
);
// |transfer_buffer| stays null when there is no pending input.
290 scoped_ptr
<ContentVector
> transfer_buffer
;
291 size_t buffer_size
= 0;
292 if (0 != input_contents_size_
) {
293 transfer_buffer
.reset(new ContentVector
);
// swap() hands the queued buffers over and leaves |input_contents_| empty.
294 transfer_buffer
->swap(input_contents_
);
295 buffer_size
= input_contents_size_
;
296 output_size_used_
+= input_contents_size_
;
297 input_contents_size_
= 0;
299 peer_task_runner_
->PostTask(
300 FROM_HERE
, base::Bind(
301 &ByteStreamReaderImpl::TransferData
,
304 base::Passed(&transfer_buffer
),
// Stores the task runner and lifetime flag, starts with no status received
// and no unreported consumption, and marks the lifetime flag alive (the flag
// must be non-null).
// NOTE(review): part of the parameter list and of the initializer list is
// missing from this extract; |buffer_size| is used but its declaration is not
// shown.
310 ByteStreamReaderImpl::ByteStreamReaderImpl(
311 scoped_refptr
<base::SequencedTaskRunner
> task_runner
,
312 scoped_refptr
<LifetimeFlag
> lifetime_flag
,
314 : total_buffer_size_(buffer_size
),
315 my_task_runner_(task_runner
),
316 my_lifetime_flag_(lifetime_flag
),
317 received_status_(false),
319 unreported_consumed_bytes_(0),
321 DCHECK(my_lifetime_flag_
.get());
322 my_lifetime_flag_
->is_alive
= true;
// Clears the shared lifetime flag so that TransferData() tasks that run after
// this object is gone return without touching it.
325 ByteStreamReaderImpl::~ByteStreamReaderImpl() {
326 my_lifetime_flag_
->is_alive
= false;
// Records the peer writer's task runner and lifetime flag.
// NOTE(review): no assignment of |peer| to |peer_| is visible in this
// extract -- confirm against the full file.
329 void ByteStreamReaderImpl::SetPeer(
330 ByteStreamWriterImpl
* peer
,
331 scoped_refptr
<base::SequencedTaskRunner
> peer_task_runner
,
332 scoped_refptr
<LifetimeFlag
> peer_lifetime_flag
) {
334 peer_task_runner_
= peer_task_runner
;
335 peer_lifetime_flag_
= peer_lifetime_flag
;
// If a buffer is queued, pops the front one into |*data| / |*length|, adds
// its length to |unreported_consumed_bytes_| and returns STREAM_HAS_DATA.
// With nothing queued, returns STREAM_COMPLETE once completion has been
// received from the peer.
// NOTE(review): the end of the signature, any statement between the counter
// update and the first return, and the final return path are not visible in
// this extract.
338 ByteStreamReaderImpl::StreamState
339 ByteStreamReaderImpl::Read(scoped_refptr
<net::IOBuffer
>* data
,
341 DCHECK(my_task_runner_
->RunsTasksOnCurrentThread());
343 if (available_contents_
.size()) {
344 *data
= available_contents_
.front().first
;
345 *length
= available_contents_
.front().second
;
346 available_contents_
.pop_front();
347 unreported_consumed_bytes_
+= *length
;
350 return STREAM_HAS_DATA
;
352 if (received_status_
) {
353 return STREAM_COMPLETE
;
// Only valid once completion has been received from the peer (enforced by
// the DCHECK on |received_status_|).
// NOTE(review): the return statement is not visible in this extract.
358 int ByteStreamReaderImpl::GetStatus() const {
359 DCHECK(my_task_runner_
->RunsTasksOnCurrentThread());
360 DCHECK(received_status_
);
// Stores |sink_callback| as the data-available callback, replacing any
// previously stored one.
364 void ByteStreamReaderImpl::RegisterCallback(
365 const base::Closure
& sink_callback
) {
366 DCHECK(my_task_runner_
->RunsTasksOnCurrentThread());
368 data_available_callback_
= sink_callback
;
// Static PostTask target. Forwards its arguments to
// |target->TransferDataInternal()| only while |object_lifetime_flag| still
// reports the target as alive; |target| is not dereferenced otherwise.
// NOTE(review): the |buffer_size| and |status| parameters used in the call
// below are not declared in the visible part of the signature.
372 void ByteStreamReaderImpl::TransferData(
373 scoped_refptr
<LifetimeFlag
> object_lifetime_flag
,
374 ByteStreamReaderImpl
* target
,
375 scoped_ptr
<ContentVector
> transfer_buffer
,
377 bool source_complete
,
379 // If our target is no longer alive, do nothing.
380 if (!object_lifetime_flag
->is_alive
) return;
382 target
->TransferDataInternal(
383 transfer_buffer
.Pass(), buffer_size
, source_complete
, status
);
// Appends the received buffers (if any) to |available_contents_|, records
// completion when |source_complete| is set, and may run the data-available
// callback when one is registered.
// NOTE(review): part of the signature, the rest of the |source_complete|
// branch, and the second half of the callback condition are not visible in
// this extract.
386 void ByteStreamReaderImpl::TransferDataInternal(
387 scoped_ptr
<ContentVector
> transfer_buffer
,
389 bool source_complete
,
391 DCHECK(my_task_runner_
->RunsTasksOnCurrentThread());
// Sampled before appending so the empty -> non-empty transition can be
// detected below.
393 bool was_empty
= available_contents_
.empty();
// |transfer_buffer| is null when the writer posted with no pending input.
395 if (transfer_buffer
) {
396 available_contents_
.insert(available_contents_
.end(),
397 transfer_buffer
->begin(),
398 transfer_buffer
->end());
401 if (source_complete
) {
402 received_status_
= true;
406 // Callback on transition from empty to non-empty, or
408 if (((was_empty
&& !available_contents_
.empty()) ||
410 !data_available_callback_
.is_null())
411 data_available_callback_
.Run();
414 // Decide whether or not to send the input a window update.
415 // Currently we do that whenever we've got unreported consumption
416 // greater than 1/3 of total size.
// Posts ByteStreamWriterImpl::UpdateWindow to |peer_task_runner_| with the
// current |unreported_consumed_bytes_| and then resets that counter to zero.
// NOTE(review): the statement guarded by the threshold test (presumably an
// early return) and some of the bound arguments are not visible in this
// extract -- confirm against the full file.
417 void ByteStreamReaderImpl::MaybeUpdateInput() {
418 DCHECK(my_task_runner_
->RunsTasksOnCurrentThread());
420 if (unreported_consumed_bytes_
<=
421 total_buffer_size_
/ kFractionReadBeforeWindowUpdate
)
424 peer_task_runner_
->PostTask(
425 FROM_HERE
, base::Bind(
426 &ByteStreamWriterImpl::UpdateWindow
,
429 unreported_consumed_bytes_
));
430 unreported_consumed_bytes_
= 0;
// Divisors applied to |total_buffer_size_|. The writer posts pending input
// once it exceeds total/kFractionBufferBeforeSending; the reader compares its
// unreported consumption against total/kFractionReadBeforeWindowUpdate.
435 const int ByteStreamWriter::kFractionBufferBeforeSending
= 3;
436 const int ByteStreamReader::kFractionReadBeforeWindowUpdate
= 3;
// Out-of-line empty destructor for the ByteStreamReader base class.
438 ByteStreamReader::~ByteStreamReader() { }
// Out-of-line empty destructor for the ByteStreamWriter base class.
440 ByteStreamWriter::~ByteStreamWriter() { }
// Creates a writer bound to |input_task_runner| and a reader bound to
// |output_task_runner|, each with its own LifetimeFlag, and registers each as
// the other's peer.
// NOTE(review): the |buffer_size| parameter used below is not declared in the
// visible part of the signature, and the statements that hand |in| / |out| to
// |*input| / |*output| are not visible in this extract.
442 void CreateByteStream(
443 scoped_refptr
<base::SequencedTaskRunner
> input_task_runner
,
444 scoped_refptr
<base::SequencedTaskRunner
> output_task_runner
,
446 scoped_ptr
<ByteStreamWriter
>* input
,
447 scoped_ptr
<ByteStreamReader
>* output
) {
// One flag per endpoint; each endpoint is also given the other's flag through
// SetPeer() below.
448 scoped_refptr
<LifetimeFlag
> input_flag(new LifetimeFlag());
449 scoped_refptr
<LifetimeFlag
> output_flag(new LifetimeFlag());
451 ByteStreamWriterImpl
* in
= new ByteStreamWriterImpl(
452 input_task_runner
, input_flag
, buffer_size
);
453 ByteStreamReaderImpl
* out
= new ByteStreamReaderImpl(
454 output_task_runner
, output_flag
, buffer_size
);
456 in
->SetPeer(out
, output_task_runner
, output_flag
);
457 out
->SetPeer(in
, input_task_runner
, input_flag
);
462 } // namespace content