[Media Router] Add integration tests and e2e tests for media router and presentation...
[chromium-blink-merge.git] / chrome / browser / media_galleries / fileapi / readahead_file_stream_reader.cc
blobd91f39d720fa69f4684a0f5924b18eaf4d992354
1 // Copyright 2014 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
5 #include "chrome/browser/media_galleries/fileapi/readahead_file_stream_reader.h"
7 #include <algorithm>
9 #include "base/message_loop/message_loop.h"
10 #include "base/numerics/safe_conversions.h"
11 #include "net/base/io_buffer.h"
12 #include "net/base/net_errors.h"
14 using storage::FileStreamReader;
16 namespace {
18 const size_t kDesiredNumberOfBuffers = 2; // So we are always one buffer ahead.
19 const int kBufferSize = 1024*1024; // 1MB to minimize transaction costs.
21 } // namespace
23 ReadaheadFileStreamReader::ReadaheadFileStreamReader(FileStreamReader* source)
24 : source_(source),
25 source_error_(0),
26 source_has_pending_read_(false),
27 weak_factory_(this) {
30 ReadaheadFileStreamReader::~ReadaheadFileStreamReader() {}
32 int ReadaheadFileStreamReader::Read(
33 net::IOBuffer* buf, int buf_len, const net::CompletionCallback& callback) {
34 DCHECK(!pending_sink_buffer_.get());
35 DCHECK(pending_read_callback_.is_null());
37 ReadFromSourceIfNeeded();
39 scoped_refptr<net::DrainableIOBuffer> sink =
40 new net::DrainableIOBuffer(buf, buf_len);
41 int result = FinishReadFromCacheOrStoredError(sink.get());
43 // We are waiting for an source read to complete, so save the request.
44 if (result == net::ERR_IO_PENDING) {
45 DCHECK(!pending_sink_buffer_.get());
46 DCHECK(pending_read_callback_.is_null());
47 pending_sink_buffer_ = sink;
48 pending_read_callback_ = callback;
51 return result;
54 int64 ReadaheadFileStreamReader::GetLength(
55 const net::Int64CompletionCallback& callback) {
56 return source_->GetLength(callback);
59 int ReadaheadFileStreamReader::FinishReadFromCacheOrStoredError(
60 net::DrainableIOBuffer* sink) {
61 // If we don't have any ready cache, return the pending read code, or
62 // the stored error code.
63 if (buffers_.empty()) {
64 if (source_.get()) {
65 DCHECK(source_has_pending_read_);
66 return net::ERR_IO_PENDING;
67 } else {
68 return source_error_;
72 while (sink->BytesRemaining() > 0 && !buffers_.empty()) {
73 net::DrainableIOBuffer* source_buffer = buffers_.front().get();
75 DCHECK(source_buffer->BytesRemaining() > 0);
77 int copy_len = std::min(source_buffer->BytesRemaining(),
78 sink->BytesRemaining());
79 std::copy(source_buffer->data(), source_buffer->data() + copy_len,
80 sink->data());
82 source_buffer->DidConsume(copy_len);
83 sink->DidConsume(copy_len);
85 if (source_buffer->BytesRemaining() == 0) {
86 buffers_.pop();
88 // Get a new buffer to replace the one we just used up.
89 ReadFromSourceIfNeeded();
93 return sink->BytesConsumed();
96 void ReadaheadFileStreamReader::ReadFromSourceIfNeeded() {
97 if (!source_.get() || source_has_pending_read_ ||
98 buffers_.size() >= kDesiredNumberOfBuffers) {
99 return;
102 source_has_pending_read_ = true;
104 scoped_refptr<net::IOBuffer> buf(new net::IOBuffer(kBufferSize));
105 int result = source_->Read(
106 buf.get(),
107 kBufferSize,
108 base::Bind(&ReadaheadFileStreamReader::OnFinishReadFromSource,
109 weak_factory_.GetWeakPtr(),
110 buf));
112 if (result != net::ERR_IO_PENDING)
113 OnFinishReadFromSource(buf.get(), result);
116 void ReadaheadFileStreamReader::OnFinishReadFromSource(net::IOBuffer* buf,
117 int result) {
118 DCHECK(result != net::ERR_IO_PENDING);
119 DCHECK(source_has_pending_read_);
120 source_has_pending_read_ = false;
122 // Either store the data read from |source_|, or store the error code.
123 if (result > 0) {
124 scoped_refptr<net::DrainableIOBuffer> drainable_buffer(
125 new net::DrainableIOBuffer(buf, result));
126 buffers_.push(drainable_buffer);
127 ReadFromSourceIfNeeded();
128 } else {
129 source_.reset();
130 source_error_ = result;
133 // If there's a read request waiting for the source FileStreamReader to
134 // finish reading, fulfill that request now from the cache or stored error.
135 if (pending_sink_buffer_.get()) {
136 DCHECK(!pending_read_callback_.is_null());
138 // Free the pending callback before running it, as the callback often
139 // dispatches another read.
140 scoped_refptr<net::DrainableIOBuffer> sink = pending_sink_buffer_;
141 pending_sink_buffer_ = NULL;
142 net::CompletionCallback completion_callback = pending_read_callback_;
143 pending_read_callback_.Reset();
145 completion_callback.Run(FinishReadFromCacheOrStoredError(sink.get()));