Ignore non-active fullscreen windows for shelf state.
[chromium-blink-merge.git] / content / browser / byte_stream.cc
blob7b0f9fb0f9a792d5e1b6c21714879fba80743115
1 // Copyright (c) 2012 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
5 #include "content/browser/byte_stream.h"
7 #include <deque>
8 #include <set>
9 #include <utility>
11 #include "base/bind.h"
12 #include "base/location.h"
13 #include "base/memory/ref_counted.h"
14 #include "base/sequenced_task_runner.h"
16 namespace content {
17 namespace {
19 typedef std::deque<std::pair<scoped_refptr<net::IOBuffer>, size_t> >
20 ContentVector;
22 class ByteStreamReaderImpl;
24 // A poor man's weak pointer; a RefCountedThreadSafe boolean that can be
25 // cleared in an object destructor and accessed to check for object
26 // existence. We can't use weak pointers because they're tightly tied to
27 // threads rather than task runners.
28 // TODO(rdsmith): A better solution would be extending weak pointers
29 // to support SequencedTaskRunners.
30 struct LifetimeFlag : public base::RefCountedThreadSafe<LifetimeFlag> {
31 public:
32 LifetimeFlag() : is_alive(true) { }
33 bool is_alive;
35 protected:
36 friend class base::RefCountedThreadSafe<LifetimeFlag>;
37 virtual ~LifetimeFlag() { }
39 private:
40 DISALLOW_COPY_AND_ASSIGN(LifetimeFlag);
43 // For both ByteStreamWriterImpl and ByteStreamReaderImpl, Construction and
44 // SetPeer may happen anywhere; all other operations on each class must
45 // happen in the context of their SequencedTaskRunner.
46 class ByteStreamWriterImpl : public ByteStreamWriter {
47 public:
48 ByteStreamWriterImpl(scoped_refptr<base::SequencedTaskRunner> task_runner,
49 scoped_refptr<LifetimeFlag> lifetime_flag,
50 size_t buffer_size);
51 virtual ~ByteStreamWriterImpl();
53 // Must be called before any operations are performed.
54 void SetPeer(ByteStreamReaderImpl* peer,
55 scoped_refptr<base::SequencedTaskRunner> peer_task_runner,
56 scoped_refptr<LifetimeFlag> peer_lifetime_flag);
58 // Overridden from ByteStreamWriter.
59 virtual bool Write(scoped_refptr<net::IOBuffer> buffer,
60 size_t byte_count) OVERRIDE;
61 virtual void Flush() OVERRIDE;
62 virtual void Close(int status) OVERRIDE;
63 virtual void RegisterCallback(const base::Closure& source_callback) OVERRIDE;
64 virtual size_t GetTotalBufferedBytes() const OVERRIDE;
66 // PostTask target from |ByteStreamReaderImpl::MaybeUpdateInput|.
67 static void UpdateWindow(scoped_refptr<LifetimeFlag> lifetime_flag,
68 ByteStreamWriterImpl* target,
69 size_t bytes_consumed);
71 private:
72 // Called from UpdateWindow when object existence has been validated.
73 void UpdateWindowInternal(size_t bytes_consumed);
75 void PostToPeer(bool complete, int status);
77 const size_t total_buffer_size_;
79 // All data objects in this class are only valid to access on
80 // this task runner except as otherwise noted.
81 scoped_refptr<base::SequencedTaskRunner> my_task_runner_;
83 // True while this object is alive.
84 scoped_refptr<LifetimeFlag> my_lifetime_flag_;
86 base::Closure space_available_callback_;
87 ContentVector input_contents_;
88 size_t input_contents_size_;
90 // ** Peer information.
92 scoped_refptr<base::SequencedTaskRunner> peer_task_runner_;
94 // How much we've sent to the output that for flow control purposes we
95 // must assume hasn't been read yet.
96 size_t output_size_used_;
98 // Only valid to access on peer_task_runner_.
99 scoped_refptr<LifetimeFlag> peer_lifetime_flag_;
101 // Only valid to access on peer_task_runner_ if
102 // |*peer_lifetime_flag_ == true|
103 ByteStreamReaderImpl* peer_;
106 class ByteStreamReaderImpl : public ByteStreamReader {
107 public:
108 ByteStreamReaderImpl(scoped_refptr<base::SequencedTaskRunner> task_runner,
109 scoped_refptr<LifetimeFlag> lifetime_flag,
110 size_t buffer_size);
111 virtual ~ByteStreamReaderImpl();
113 // Must be called before any operations are performed.
114 void SetPeer(ByteStreamWriterImpl* peer,
115 scoped_refptr<base::SequencedTaskRunner> peer_task_runner,
116 scoped_refptr<LifetimeFlag> peer_lifetime_flag);
118 // Overridden from ByteStreamReader.
119 virtual StreamState Read(scoped_refptr<net::IOBuffer>* data,
120 size_t* length) OVERRIDE;
121 virtual int GetStatus() const OVERRIDE;
122 virtual void RegisterCallback(const base::Closure& sink_callback) OVERRIDE;
124 // PostTask target from |ByteStreamWriterImpl::Write| and
125 // |ByteStreamWriterImpl::Close|.
126 // Receive data from our peer.
127 // static because it may be called after the object it is targeting
128 // has been destroyed. It may not access |*target|
129 // if |*object_lifetime_flag| is false.
130 static void TransferData(
131 scoped_refptr<LifetimeFlag> object_lifetime_flag,
132 ByteStreamReaderImpl* target,
133 scoped_ptr<ContentVector> transfer_buffer,
134 size_t transfer_buffer_bytes,
135 bool source_complete,
136 int status);
138 private:
139 // Called from TransferData once object existence has been validated.
140 void TransferDataInternal(
141 scoped_ptr<ContentVector> transfer_buffer,
142 size_t transfer_buffer_bytes,
143 bool source_complete,
144 int status);
146 void MaybeUpdateInput();
148 const size_t total_buffer_size_;
150 scoped_refptr<base::SequencedTaskRunner> my_task_runner_;
152 // True while this object is alive.
153 scoped_refptr<LifetimeFlag> my_lifetime_flag_;
155 ContentVector available_contents_;
157 bool received_status_;
158 int status_;
160 base::Closure data_available_callback_;
162 // Time of last point at which data in stream transitioned from full
163 // to non-full. Nulled when a callback is sent.
164 base::Time last_non_full_time_;
166 // ** Peer information
168 scoped_refptr<base::SequencedTaskRunner> peer_task_runner_;
170 // How much has been removed from this class that we haven't told
171 // the input about yet.
172 size_t unreported_consumed_bytes_;
174 // Only valid to access on peer_task_runner_.
175 scoped_refptr<LifetimeFlag> peer_lifetime_flag_;
177 // Only valid to access on peer_task_runner_ if
178 // |*peer_lifetime_flag_ == true|
179 ByteStreamWriterImpl* peer_;
182 ByteStreamWriterImpl::ByteStreamWriterImpl(
183 scoped_refptr<base::SequencedTaskRunner> task_runner,
184 scoped_refptr<LifetimeFlag> lifetime_flag,
185 size_t buffer_size)
186 : total_buffer_size_(buffer_size),
187 my_task_runner_(task_runner),
188 my_lifetime_flag_(lifetime_flag),
189 input_contents_size_(0),
190 output_size_used_(0),
191 peer_(NULL) {
192 DCHECK(my_lifetime_flag_.get());
193 my_lifetime_flag_->is_alive = true;
196 ByteStreamWriterImpl::~ByteStreamWriterImpl() {
197 my_lifetime_flag_->is_alive = false;
200 void ByteStreamWriterImpl::SetPeer(
201 ByteStreamReaderImpl* peer,
202 scoped_refptr<base::SequencedTaskRunner> peer_task_runner,
203 scoped_refptr<LifetimeFlag> peer_lifetime_flag) {
204 peer_ = peer;
205 peer_task_runner_ = peer_task_runner;
206 peer_lifetime_flag_ = peer_lifetime_flag;
209 bool ByteStreamWriterImpl::Write(
210 scoped_refptr<net::IOBuffer> buffer, size_t byte_count) {
211 DCHECK(my_task_runner_->RunsTasksOnCurrentThread());
213 // Check overflow.
215 // TODO(tyoshino): Discuss with content/browser/download developer and if
216 // they're fine with, set smaller limit and make it configurable.
217 size_t space_limit = std::numeric_limits<size_t>::max() -
218 GetTotalBufferedBytes();
219 if (byte_count > space_limit) {
220 // TODO(tyoshino): Tell the user that Write() failed.
221 // Ignore input.
222 return false;
225 input_contents_.push_back(std::make_pair(buffer, byte_count));
226 input_contents_size_ += byte_count;
228 // Arbitrarily, we buffer to a third of the total size before sending.
229 if (input_contents_size_ > total_buffer_size_ / kFractionBufferBeforeSending)
230 PostToPeer(false, 0);
232 return GetTotalBufferedBytes() <= total_buffer_size_;
235 void ByteStreamWriterImpl::Flush() {
236 DCHECK(my_task_runner_->RunsTasksOnCurrentThread());
237 if (input_contents_size_ > 0)
238 PostToPeer(false, 0);
241 void ByteStreamWriterImpl::Close(int status) {
242 DCHECK(my_task_runner_->RunsTasksOnCurrentThread());
243 PostToPeer(true, status);
246 void ByteStreamWriterImpl::RegisterCallback(
247 const base::Closure& source_callback) {
248 DCHECK(my_task_runner_->RunsTasksOnCurrentThread());
249 space_available_callback_ = source_callback;
252 size_t ByteStreamWriterImpl::GetTotalBufferedBytes() const {
253 DCHECK(my_task_runner_->RunsTasksOnCurrentThread());
254 // This sum doesn't overflow since Write() fails if this sum is going to
255 // overflow.
256 return input_contents_size_ + output_size_used_;
259 // static
260 void ByteStreamWriterImpl::UpdateWindow(
261 scoped_refptr<LifetimeFlag> lifetime_flag, ByteStreamWriterImpl* target,
262 size_t bytes_consumed) {
263 // If the target object isn't alive anymore, we do nothing.
264 if (!lifetime_flag->is_alive) return;
266 target->UpdateWindowInternal(bytes_consumed);
269 void ByteStreamWriterImpl::UpdateWindowInternal(size_t bytes_consumed) {
270 DCHECK(my_task_runner_->RunsTasksOnCurrentThread());
272 bool was_above_limit = GetTotalBufferedBytes() > total_buffer_size_;
274 DCHECK_GE(output_size_used_, bytes_consumed);
275 output_size_used_ -= bytes_consumed;
277 // Callback if we were above the limit and we're now <= to it.
278 bool no_longer_above_limit = GetTotalBufferedBytes() <= total_buffer_size_;
280 if (no_longer_above_limit && was_above_limit &&
281 !space_available_callback_.is_null())
282 space_available_callback_.Run();
285 void ByteStreamWriterImpl::PostToPeer(bool complete, int status) {
286 DCHECK(my_task_runner_->RunsTasksOnCurrentThread());
287 // Valid contexts in which to call.
288 DCHECK(complete || 0 != input_contents_size_);
290 scoped_ptr<ContentVector> transfer_buffer;
291 size_t buffer_size = 0;
292 if (0 != input_contents_size_) {
293 transfer_buffer.reset(new ContentVector);
294 transfer_buffer->swap(input_contents_);
295 buffer_size = input_contents_size_;
296 output_size_used_ += input_contents_size_;
297 input_contents_size_ = 0;
299 peer_task_runner_->PostTask(
300 FROM_HERE, base::Bind(
301 &ByteStreamReaderImpl::TransferData,
302 peer_lifetime_flag_,
303 peer_,
304 base::Passed(&transfer_buffer),
305 buffer_size,
306 complete,
307 status));
310 ByteStreamReaderImpl::ByteStreamReaderImpl(
311 scoped_refptr<base::SequencedTaskRunner> task_runner,
312 scoped_refptr<LifetimeFlag> lifetime_flag,
313 size_t buffer_size)
314 : total_buffer_size_(buffer_size),
315 my_task_runner_(task_runner),
316 my_lifetime_flag_(lifetime_flag),
317 received_status_(false),
318 status_(0),
319 unreported_consumed_bytes_(0),
320 peer_(NULL) {
321 DCHECK(my_lifetime_flag_.get());
322 my_lifetime_flag_->is_alive = true;
325 ByteStreamReaderImpl::~ByteStreamReaderImpl() {
326 my_lifetime_flag_->is_alive = false;
329 void ByteStreamReaderImpl::SetPeer(
330 ByteStreamWriterImpl* peer,
331 scoped_refptr<base::SequencedTaskRunner> peer_task_runner,
332 scoped_refptr<LifetimeFlag> peer_lifetime_flag) {
333 peer_ = peer;
334 peer_task_runner_ = peer_task_runner;
335 peer_lifetime_flag_ = peer_lifetime_flag;
338 ByteStreamReaderImpl::StreamState
339 ByteStreamReaderImpl::Read(scoped_refptr<net::IOBuffer>* data,
340 size_t* length) {
341 DCHECK(my_task_runner_->RunsTasksOnCurrentThread());
343 if (available_contents_.size()) {
344 *data = available_contents_.front().first;
345 *length = available_contents_.front().second;
346 available_contents_.pop_front();
347 unreported_consumed_bytes_ += *length;
349 MaybeUpdateInput();
350 return STREAM_HAS_DATA;
352 if (received_status_) {
353 return STREAM_COMPLETE;
355 return STREAM_EMPTY;
358 int ByteStreamReaderImpl::GetStatus() const {
359 DCHECK(my_task_runner_->RunsTasksOnCurrentThread());
360 DCHECK(received_status_);
361 return status_;
364 void ByteStreamReaderImpl::RegisterCallback(
365 const base::Closure& sink_callback) {
366 DCHECK(my_task_runner_->RunsTasksOnCurrentThread());
368 data_available_callback_ = sink_callback;
371 // static
372 void ByteStreamReaderImpl::TransferData(
373 scoped_refptr<LifetimeFlag> object_lifetime_flag,
374 ByteStreamReaderImpl* target,
375 scoped_ptr<ContentVector> transfer_buffer,
376 size_t buffer_size,
377 bool source_complete,
378 int status) {
379 // If our target is no longer alive, do nothing.
380 if (!object_lifetime_flag->is_alive) return;
382 target->TransferDataInternal(
383 transfer_buffer.Pass(), buffer_size, source_complete, status);
386 void ByteStreamReaderImpl::TransferDataInternal(
387 scoped_ptr<ContentVector> transfer_buffer,
388 size_t buffer_size,
389 bool source_complete,
390 int status) {
391 DCHECK(my_task_runner_->RunsTasksOnCurrentThread());
393 bool was_empty = available_contents_.empty();
395 if (transfer_buffer) {
396 available_contents_.insert(available_contents_.end(),
397 transfer_buffer->begin(),
398 transfer_buffer->end());
401 if (source_complete) {
402 received_status_ = true;
403 status_ = status;
406 // Callback on transition from empty to non-empty, or
407 // source complete.
408 if (((was_empty && !available_contents_.empty()) ||
409 source_complete) &&
410 !data_available_callback_.is_null())
411 data_available_callback_.Run();
414 // Decide whether or not to send the input a window update.
415 // Currently we do that whenever we've got unreported consumption
416 // greater than 1/3 of total size.
417 void ByteStreamReaderImpl::MaybeUpdateInput() {
418 DCHECK(my_task_runner_->RunsTasksOnCurrentThread());
420 if (unreported_consumed_bytes_ <=
421 total_buffer_size_ / kFractionReadBeforeWindowUpdate)
422 return;
424 peer_task_runner_->PostTask(
425 FROM_HERE, base::Bind(
426 &ByteStreamWriterImpl::UpdateWindow,
427 peer_lifetime_flag_,
428 peer_,
429 unreported_consumed_bytes_));
430 unreported_consumed_bytes_ = 0;
433 } // namespace
435 const int ByteStreamWriter::kFractionBufferBeforeSending = 3;
436 const int ByteStreamReader::kFractionReadBeforeWindowUpdate = 3;
438 ByteStreamReader::~ByteStreamReader() { }
440 ByteStreamWriter::~ByteStreamWriter() { }
442 void CreateByteStream(
443 scoped_refptr<base::SequencedTaskRunner> input_task_runner,
444 scoped_refptr<base::SequencedTaskRunner> output_task_runner,
445 size_t buffer_size,
446 scoped_ptr<ByteStreamWriter>* input,
447 scoped_ptr<ByteStreamReader>* output) {
448 scoped_refptr<LifetimeFlag> input_flag(new LifetimeFlag());
449 scoped_refptr<LifetimeFlag> output_flag(new LifetimeFlag());
451 ByteStreamWriterImpl* in = new ByteStreamWriterImpl(
452 input_task_runner, input_flag, buffer_size);
453 ByteStreamReaderImpl* out = new ByteStreamReaderImpl(
454 output_task_runner, output_flag, buffer_size);
456 in->SetPeer(out, output_task_runner, output_flag);
457 out->SetPeer(in, input_task_runner, input_flag);
458 input->reset(in);
459 output->reset(out);
462 } // namespace content