1 // Copyright (c) 2012 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
5 #include "content/browser/byte_stream.h"
8 #include "base/location.h"
9 #include "base/memory/ref_counted.h"
10 #include "base/memory/weak_ptr.h"
11 #include "base/sequenced_task_runner.h"
16 typedef std::deque
<std::pair
<scoped_refptr
<net::IOBuffer
>, size_t> >
19 class ByteStreamReaderImpl
;
21 // A poor man's weak pointer; a RefCountedThreadSafe boolean that can be
22 // cleared in an object destructor and accessed to check for object
23 // existence. We can't use weak pointers because they're tightly tied to
24 // threads rather than task runners.
25 // TODO(rdsmith): A better solution would be extending weak pointers
26 // to support SequencedTaskRunners.
27 struct LifetimeFlag
: public base::RefCountedThreadSafe
<LifetimeFlag
> {
29 LifetimeFlag() : is_alive(true) { }
33 friend class base::RefCountedThreadSafe
<LifetimeFlag
>;
34 virtual ~LifetimeFlag() { }
37 DISALLOW_COPY_AND_ASSIGN(LifetimeFlag
);
40 // For both ByteStreamWriterImpl and ByteStreamReaderImpl, Construction and
41 // SetPeer may happen anywhere; all other operations on each class must
42 // happen in the context of their SequencedTaskRunner.
43 class ByteStreamWriterImpl
: public ByteStreamWriter
{
45 ByteStreamWriterImpl(scoped_refptr
<base::SequencedTaskRunner
> task_runner
,
46 scoped_refptr
<LifetimeFlag
> lifetime_flag
,
48 virtual ~ByteStreamWriterImpl();
50 // Must be called before any operations are performed.
51 void SetPeer(ByteStreamReaderImpl
* peer
,
52 scoped_refptr
<base::SequencedTaskRunner
> peer_task_runner
,
53 scoped_refptr
<LifetimeFlag
> peer_lifetime_flag
);
55 // Overridden from ByteStreamWriter.
56 virtual bool Write(scoped_refptr
<net::IOBuffer
> buffer
,
57 size_t byte_count
) OVERRIDE
;
58 virtual void Close(DownloadInterruptReason status
) OVERRIDE
;
59 virtual void RegisterCallback(const base::Closure
& source_callback
) OVERRIDE
;
61 // PostTask target from |ByteStreamReaderImpl::MaybeUpdateInput|.
62 static void UpdateWindow(scoped_refptr
<LifetimeFlag
> lifetime_flag
,
63 ByteStreamWriterImpl
* target
,
64 size_t bytes_consumed
);
67 // Called from UpdateWindow when object existence has been validated.
68 void UpdateWindowInternal(size_t bytes_consumed
);
70 void PostToPeer(bool complete
, DownloadInterruptReason status
);
72 const size_t total_buffer_size_
;
74 // All data objects in this class are only valid to access on
75 // this task runner except as otherwise noted.
76 scoped_refptr
<base::SequencedTaskRunner
> my_task_runner_
;
78 // True while this object is alive.
79 scoped_refptr
<LifetimeFlag
> my_lifetime_flag_
;
81 base::Closure space_available_callback_
;
82 ContentVector input_contents_
;
83 size_t input_contents_size_
;
85 // ** Peer information.
87 scoped_refptr
<base::SequencedTaskRunner
> peer_task_runner_
;
89 // How much we've sent to the output that for flow control purposes we
90 // must assume hasn't been read yet.
91 size_t output_size_used_
;
93 // Only valid to access on peer_task_runner_.
94 scoped_refptr
<LifetimeFlag
> peer_lifetime_flag_
;
96 // Only valid to access on peer_task_runner_ if
97 // |*peer_lifetime_flag_ == true|
98 ByteStreamReaderImpl
* peer_
;
101 class ByteStreamReaderImpl
: public ByteStreamReader
{
103 ByteStreamReaderImpl(scoped_refptr
<base::SequencedTaskRunner
> task_runner
,
104 scoped_refptr
<LifetimeFlag
> lifetime_flag
,
106 virtual ~ByteStreamReaderImpl();
108 // Must be called before any operations are performed.
109 void SetPeer(ByteStreamWriterImpl
* peer
,
110 scoped_refptr
<base::SequencedTaskRunner
> peer_task_runner
,
111 scoped_refptr
<LifetimeFlag
> peer_lifetime_flag
);
113 // Overridden from ByteStreamReader.
114 virtual StreamState
Read(scoped_refptr
<net::IOBuffer
>* data
,
115 size_t* length
) OVERRIDE
;
116 virtual DownloadInterruptReason
GetStatus() const OVERRIDE
;
117 virtual void RegisterCallback(const base::Closure
& sink_callback
) OVERRIDE
;
119 // PostTask target from |ByteStreamWriterImpl::MaybePostToPeer| and
120 // |ByteStreamWriterImpl::Close|.
121 // Receive data from our peer.
122 // static because it may be called after the object it is targeting
123 // has been destroyed. It may not access |*target|
124 // if |*object_lifetime_flag| is false.
125 static void TransferData(
126 scoped_refptr
<LifetimeFlag
> object_lifetime_flag
,
127 ByteStreamReaderImpl
* target
,
128 scoped_ptr
<ContentVector
> transfer_buffer
,
129 size_t transfer_buffer_bytes
,
130 bool source_complete
,
131 DownloadInterruptReason status
);
134 // Called from TransferData once object existence has been validated.
135 void TransferDataInternal(
136 scoped_ptr
<ContentVector
> transfer_buffer
,
137 size_t transfer_buffer_bytes
,
138 bool source_complete
,
139 DownloadInterruptReason status
);
141 void MaybeUpdateInput();
143 const size_t total_buffer_size_
;
145 scoped_refptr
<base::SequencedTaskRunner
> my_task_runner_
;
147 // True while this object is alive.
148 scoped_refptr
<LifetimeFlag
> my_lifetime_flag_
;
150 ContentVector available_contents_
;
152 bool received_status_
;
153 DownloadInterruptReason status_
;
155 base::Closure data_available_callback_
;
157 // Time of last point at which data in stream transitioned from full
158 // to non-full. Nulled when a callback is sent.
159 base::Time last_non_full_time_
;
161 // ** Peer information
163 scoped_refptr
<base::SequencedTaskRunner
> peer_task_runner_
;
165 // How much has been removed from this class that we haven't told
166 // the input about yet.
167 size_t unreported_consumed_bytes_
;
169 // Only valid to access on peer_task_runner_.
170 scoped_refptr
<LifetimeFlag
> peer_lifetime_flag_
;
172 // Only valid to access on peer_task_runner_ if
173 // |*peer_lifetime_flag_ == true|
174 ByteStreamWriterImpl
* peer_
;
177 ByteStreamWriterImpl::ByteStreamWriterImpl(
178 scoped_refptr
<base::SequencedTaskRunner
> task_runner
,
179 scoped_refptr
<LifetimeFlag
> lifetime_flag
,
181 : total_buffer_size_(buffer_size
),
182 my_task_runner_(task_runner
),
183 my_lifetime_flag_(lifetime_flag
),
184 input_contents_size_(0),
185 output_size_used_(0),
187 DCHECK(my_lifetime_flag_
.get());
188 my_lifetime_flag_
->is_alive
= true;
191 ByteStreamWriterImpl::~ByteStreamWriterImpl() {
192 my_lifetime_flag_
->is_alive
= false;
195 void ByteStreamWriterImpl::SetPeer(
196 ByteStreamReaderImpl
* peer
,
197 scoped_refptr
<base::SequencedTaskRunner
> peer_task_runner
,
198 scoped_refptr
<LifetimeFlag
> peer_lifetime_flag
) {
200 peer_task_runner_
= peer_task_runner
;
201 peer_lifetime_flag_
= peer_lifetime_flag
;
204 bool ByteStreamWriterImpl::Write(
205 scoped_refptr
<net::IOBuffer
> buffer
, size_t byte_count
) {
206 DCHECK(my_task_runner_
->RunsTasksOnCurrentThread());
208 input_contents_
.push_back(std::make_pair(buffer
, byte_count
));
209 input_contents_size_
+= byte_count
;
211 // Arbitrarily, we buffer to a third of the total size before sending.
212 if (input_contents_size_
> total_buffer_size_
/ kFractionBufferBeforeSending
)
213 PostToPeer(false, DOWNLOAD_INTERRUPT_REASON_NONE
);
215 return (input_contents_size_
+ output_size_used_
<= total_buffer_size_
);
218 void ByteStreamWriterImpl::Close(
219 DownloadInterruptReason status
) {
220 DCHECK(my_task_runner_
->RunsTasksOnCurrentThread());
221 PostToPeer(true, status
);
224 void ByteStreamWriterImpl::RegisterCallback(
225 const base::Closure
& source_callback
) {
226 DCHECK(my_task_runner_
->RunsTasksOnCurrentThread());
227 space_available_callback_
= source_callback
;
231 void ByteStreamWriterImpl::UpdateWindow(
232 scoped_refptr
<LifetimeFlag
> lifetime_flag
, ByteStreamWriterImpl
* target
,
233 size_t bytes_consumed
) {
234 // If the target object isn't alive anymore, we do nothing.
235 if (!lifetime_flag
->is_alive
) return;
237 target
->UpdateWindowInternal(bytes_consumed
);
240 void ByteStreamWriterImpl::UpdateWindowInternal(size_t bytes_consumed
) {
241 DCHECK(my_task_runner_
->RunsTasksOnCurrentThread());
242 DCHECK_GE(output_size_used_
, bytes_consumed
);
243 output_size_used_
-= bytes_consumed
;
245 // Callback if we were above the limit and we're now <= to it.
246 size_t total_known_size_used
=
247 input_contents_size_
+ output_size_used_
;
249 if (total_known_size_used
<= total_buffer_size_
&&
250 (total_known_size_used
+ bytes_consumed
> total_buffer_size_
) &&
251 !space_available_callback_
.is_null())
252 space_available_callback_
.Run();
255 void ByteStreamWriterImpl::PostToPeer(
256 bool complete
, DownloadInterruptReason status
) {
257 DCHECK(my_task_runner_
->RunsTasksOnCurrentThread());
258 // Valid contexts in which to call.
259 DCHECK(complete
|| 0 != input_contents_size_
);
261 scoped_ptr
<ContentVector
> transfer_buffer(new ContentVector
);
262 size_t buffer_size
= 0;
263 if (0 != input_contents_size_
) {
264 transfer_buffer
.reset(new ContentVector
);
265 transfer_buffer
->swap(input_contents_
);
266 buffer_size
= input_contents_size_
;
267 output_size_used_
+= input_contents_size_
;
268 input_contents_size_
= 0;
270 peer_task_runner_
->PostTask(
271 FROM_HERE
, base::Bind(
272 &ByteStreamReaderImpl::TransferData
,
275 base::Passed(&transfer_buffer
),
281 ByteStreamReaderImpl::ByteStreamReaderImpl(
282 scoped_refptr
<base::SequencedTaskRunner
> task_runner
,
283 scoped_refptr
<LifetimeFlag
> lifetime_flag
,
285 : total_buffer_size_(buffer_size
),
286 my_task_runner_(task_runner
),
287 my_lifetime_flag_(lifetime_flag
),
288 received_status_(false),
289 status_(DOWNLOAD_INTERRUPT_REASON_NONE
),
290 unreported_consumed_bytes_(0),
292 DCHECK(my_lifetime_flag_
.get());
293 my_lifetime_flag_
->is_alive
= true;
296 ByteStreamReaderImpl::~ByteStreamReaderImpl() {
297 my_lifetime_flag_
->is_alive
= false;
300 void ByteStreamReaderImpl::SetPeer(
301 ByteStreamWriterImpl
* peer
,
302 scoped_refptr
<base::SequencedTaskRunner
> peer_task_runner
,
303 scoped_refptr
<LifetimeFlag
> peer_lifetime_flag
) {
305 peer_task_runner_
= peer_task_runner
;
306 peer_lifetime_flag_
= peer_lifetime_flag
;
309 ByteStreamReaderImpl::StreamState
310 ByteStreamReaderImpl::Read(scoped_refptr
<net::IOBuffer
>* data
,
312 DCHECK(my_task_runner_
->RunsTasksOnCurrentThread());
314 if (available_contents_
.size()) {
315 *data
= available_contents_
.front().first
;
316 *length
= available_contents_
.front().second
;
317 available_contents_
.pop_front();
318 unreported_consumed_bytes_
+= *length
;
321 return STREAM_HAS_DATA
;
323 if (received_status_
) {
324 return STREAM_COMPLETE
;
329 DownloadInterruptReason
ByteStreamReaderImpl::GetStatus() const {
330 DCHECK(my_task_runner_
->RunsTasksOnCurrentThread());
331 DCHECK(received_status_
);
335 void ByteStreamReaderImpl::RegisterCallback(
336 const base::Closure
& sink_callback
) {
337 DCHECK(my_task_runner_
->RunsTasksOnCurrentThread());
339 data_available_callback_
= sink_callback
;
343 void ByteStreamReaderImpl::TransferData(
344 scoped_refptr
<LifetimeFlag
> object_lifetime_flag
,
345 ByteStreamReaderImpl
* target
,
346 scoped_ptr
<ContentVector
> transfer_buffer
,
348 bool source_complete
,
349 DownloadInterruptReason status
) {
350 // If our target is no longer alive, do nothing.
351 if (!object_lifetime_flag
->is_alive
) return;
353 target
->TransferDataInternal(
354 transfer_buffer
.Pass(), buffer_size
, source_complete
, status
);
357 void ByteStreamReaderImpl::TransferDataInternal(
358 scoped_ptr
<ContentVector
> transfer_buffer
,
360 bool source_complete
,
361 DownloadInterruptReason status
) {
362 DCHECK(my_task_runner_
->RunsTasksOnCurrentThread());
364 bool was_empty
= available_contents_
.empty();
366 if (transfer_buffer
) {
367 available_contents_
.insert(available_contents_
.end(),
368 transfer_buffer
->begin(),
369 transfer_buffer
->end());
372 if (source_complete
) {
373 received_status_
= true;
377 // Callback on transition from empty to non-empty, or
379 if (((was_empty
&& !available_contents_
.empty()) ||
381 !data_available_callback_
.is_null())
382 data_available_callback_
.Run();
385 // Decide whether or not to send the input a window update.
386 // Currently we do that whenever we've got unreported consumption
387 // greater than 1/3 of total size.
388 void ByteStreamReaderImpl::MaybeUpdateInput() {
389 DCHECK(my_task_runner_
->RunsTasksOnCurrentThread());
391 if (unreported_consumed_bytes_
<=
392 total_buffer_size_
/ kFractionReadBeforeWindowUpdate
)
395 peer_task_runner_
->PostTask(
396 FROM_HERE
, base::Bind(
397 &ByteStreamWriterImpl::UpdateWindow
,
400 unreported_consumed_bytes_
));
401 unreported_consumed_bytes_
= 0;
407 const int ByteStreamWriter::kFractionBufferBeforeSending
= 3;
408 const int ByteStreamReader::kFractionReadBeforeWindowUpdate
= 3;
410 ByteStreamReader::~ByteStreamReader() { }
412 ByteStreamWriter::~ByteStreamWriter() { }
414 void CreateByteStream(
415 scoped_refptr
<base::SequencedTaskRunner
> input_task_runner
,
416 scoped_refptr
<base::SequencedTaskRunner
> output_task_runner
,
418 scoped_ptr
<ByteStreamWriter
>* input
,
419 scoped_ptr
<ByteStreamReader
>* output
) {
420 scoped_refptr
<LifetimeFlag
> input_flag(new LifetimeFlag());
421 scoped_refptr
<LifetimeFlag
> output_flag(new LifetimeFlag());
423 ByteStreamWriterImpl
* in
= new ByteStreamWriterImpl(
424 input_task_runner
, input_flag
, buffer_size
);
425 ByteStreamReaderImpl
* out
= new ByteStreamReaderImpl(
426 output_task_runner
, output_flag
, buffer_size
);
428 in
->SetPeer(out
, output_task_runner
, output_flag
);
429 out
->SetPeer(in
, input_task_runner
, input_flag
);
434 } // namespace content