Merge Chromium + Blink git repositories
[chromium-blink-merge.git] / chrome / browser / chromeos / drive / drive_file_stream_reader.cc
blob1437236dd7806c8fd7176aafc0d6b32e38da7e95
1 // Copyright 2013 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
5 #include "chrome/browser/chromeos/drive/drive_file_stream_reader.h"
7 #include <algorithm>
8 #include <cstring>
10 #include "base/callback_helpers.h"
11 #include "base/logging.h"
12 #include "base/sequenced_task_runner.h"
13 #include "components/drive/drive.pb.h"
14 #include "components/drive/file_system_interface.h"
15 #include "components/drive/local_file_reader.h"
16 #include "content/public/browser/browser_thread.h"
17 #include "google_apis/drive/task_util.h"
18 #include "net/base/io_buffer.h"
19 #include "net/base/net_errors.h"
20 #include "net/http/http_byte_range.h"
22 using content::BrowserThread;
24 namespace drive {
25 namespace {
27 // Converts FileError code to net::Error code.
28 int FileErrorToNetError(FileError error) {
29 return net::FileErrorToNetError(FileErrorToBaseFileError(error));
32 // Computes the concrete |start| offset and the |length| of |range| in a file
33 // of |total| size.
35 // This is a thin wrapper of HttpByteRange::ComputeBounds, extended to allow
36 // an empty range at the end of the file, like "Range: bytes 0-" on a zero byte
37 // file. This is for convenience in unifying implementation with the seek
38 // operation of stream reader. HTTP doesn't allow such ranges but we want to
39 // treat such seeking as valid.
40 bool ComputeConcretePosition(net::HttpByteRange range, int64 total,
41 int64* start, int64* length) {
42 // The special case when empty range in the end of the file is selected.
43 if (range.HasFirstBytePosition() && range.first_byte_position() == total) {
44 *start = range.first_byte_position();
45 *length = 0;
46 return true;
49 // Otherwise forward to HttpByteRange::ComputeBounds.
50 if (!range.ComputeBounds(total))
51 return false;
52 *start = range.first_byte_position();
53 *length = range.last_byte_position() - range.first_byte_position() + 1;
54 return true;
57 } // namespace
59 namespace internal {
60 namespace {
62 // Copies the content in |pending_data| into |buffer| at most
63 // |buffer_length| bytes, and erases the copied data from
64 // |pending_data|. Returns the number of copied bytes.
65 int ReadInternal(ScopedVector<std::string>* pending_data,
66 net::IOBuffer* buffer, int buffer_length) {
67 size_t index = 0;
68 int offset = 0;
69 for (; index < pending_data->size() && offset < buffer_length; ++index) {
70 const std::string& chunk = *(*pending_data)[index];
71 DCHECK(!chunk.empty());
73 size_t bytes_to_read = std::min(
74 chunk.size(), static_cast<size_t>(buffer_length - offset));
75 std::memmove(buffer->data() + offset, chunk.data(), bytes_to_read);
76 offset += bytes_to_read;
77 if (bytes_to_read < chunk.size()) {
78 // The chunk still has some remaining data.
79 // So remove leading (copied) bytes, and quit the loop so that
80 // the remaining data won't be deleted in the following erase().
81 (*pending_data)[index]->erase(0, bytes_to_read);
82 break;
86 // Consume the copied data.
87 pending_data->erase(pending_data->begin(), pending_data->begin() + index);
89 return offset;
92 } // namespace
94 LocalReaderProxy::LocalReaderProxy(
95 scoped_ptr<util::LocalFileReader> file_reader, int64 length)
96 : file_reader_(file_reader.Pass()),
97 remaining_length_(length),
98 weak_ptr_factory_(this) {
99 DCHECK(file_reader_);
102 LocalReaderProxy::~LocalReaderProxy() {
105 int LocalReaderProxy::Read(net::IOBuffer* buffer, int buffer_length,
106 const net::CompletionCallback& callback) {
107 DCHECK(thread_checker_.CalledOnValidThread());
108 DCHECK(file_reader_);
110 if (buffer_length > remaining_length_) {
111 // Here, narrowing is safe.
112 buffer_length = static_cast<int>(remaining_length_);
115 if (!buffer_length)
116 return 0;
118 file_reader_->Read(buffer, buffer_length,
119 base::Bind(&LocalReaderProxy::OnReadCompleted,
120 weak_ptr_factory_.GetWeakPtr(), callback));
121 return net::ERR_IO_PENDING;
124 void LocalReaderProxy::OnGetContent(scoped_ptr<std::string> data) {
125 // This method should never be called, because no data should be received
126 // from the network during the reading of local-cache file.
127 NOTREACHED();
130 void LocalReaderProxy::OnCompleted(FileError error) {
131 // If this method is called, no network error should be happened.
132 DCHECK_EQ(FILE_ERROR_OK, error);
135 void LocalReaderProxy::OnReadCompleted(const net::CompletionCallback& callback,
136 int read_result) {
137 DCHECK(thread_checker_.CalledOnValidThread());
138 DCHECK(file_reader_);
140 if (read_result >= 0) {
141 // |read_result| bytes data is read.
142 DCHECK_LE(read_result, remaining_length_);
143 remaining_length_ -= read_result;
144 } else {
145 // An error occurs. Close the |file_reader_|.
146 file_reader_.reset();
148 callback.Run(read_result);
151 NetworkReaderProxy::NetworkReaderProxy(
152 int64 offset,
153 int64 content_length,
154 int64 full_content_length,
155 const base::Closure& job_canceller)
156 : remaining_offset_(offset),
157 remaining_content_length_(content_length),
158 is_full_download_(offset + content_length == full_content_length),
159 error_code_(net::OK),
160 buffer_length_(0),
161 job_canceller_(job_canceller) {
164 NetworkReaderProxy::~NetworkReaderProxy() {
165 if (!job_canceller_.is_null()) {
166 job_canceller_.Run();
170 int NetworkReaderProxy::Read(net::IOBuffer* buffer, int buffer_length,
171 const net::CompletionCallback& callback) {
172 DCHECK(thread_checker_.CalledOnValidThread());
173 // Check if there is no pending Read operation.
174 DCHECK(!buffer_.get());
175 DCHECK_EQ(buffer_length_, 0);
176 DCHECK(callback_.is_null());
177 // Validate the arguments.
178 DCHECK(buffer);
179 DCHECK_GT(buffer_length, 0);
180 DCHECK(!callback.is_null());
182 if (error_code_ != net::OK) {
183 // An error is already found. Return it immediately.
184 return error_code_;
187 if (remaining_content_length_ == 0) {
188 // If no more data, return immediately.
189 return 0;
192 if (buffer_length > remaining_content_length_) {
193 // Here, narrowing cast should be safe.
194 buffer_length = static_cast<int>(remaining_content_length_);
197 if (pending_data_.empty()) {
198 // No data is available. Keep the arguments, and return pending status.
199 buffer_ = buffer;
200 buffer_length_ = buffer_length;
201 callback_ = callback;
202 return net::ERR_IO_PENDING;
205 int result = ReadInternal(&pending_data_, buffer, buffer_length);
206 remaining_content_length_ -= result;
207 DCHECK_GE(remaining_content_length_, 0);
209 // Although OnCompleted() should reset |job_canceller_| when download is done,
210 // due to timing issues the ReaderProxy instance may be destructed before the
211 // notification. To fix the case we reset here earlier.
212 if (is_full_download_ && remaining_content_length_ == 0)
213 job_canceller_.Reset();
215 return result;
218 void NetworkReaderProxy::OnGetContent(scoped_ptr<std::string> data) {
219 DCHECK(thread_checker_.CalledOnValidThread());
220 DCHECK(data && !data->empty());
222 if (remaining_offset_ >= static_cast<int64>(data->length())) {
223 // Skip unneeded leading data.
224 remaining_offset_ -= data->length();
225 return;
228 if (remaining_offset_ > 0) {
229 // Erase unnecessary leading bytes.
230 data->erase(0, static_cast<size_t>(remaining_offset_));
231 remaining_offset_ = 0;
234 pending_data_.push_back(data.release());
235 if (!buffer_.get()) {
236 // No pending Read operation.
237 return;
240 int result = ReadInternal(&pending_data_, buffer_.get(), buffer_length_);
241 remaining_content_length_ -= result;
242 DCHECK_GE(remaining_content_length_, 0);
244 if (is_full_download_ && remaining_content_length_ == 0)
245 job_canceller_.Reset();
247 buffer_ = NULL;
248 buffer_length_ = 0;
249 DCHECK(!callback_.is_null());
250 base::ResetAndReturn(&callback_).Run(result);
253 void NetworkReaderProxy::OnCompleted(FileError error) {
254 DCHECK(thread_checker_.CalledOnValidThread());
255 // The downloading is completed, so we do not need to cancel the job
256 // in the destructor.
257 job_canceller_.Reset();
259 if (error == FILE_ERROR_OK) {
260 return;
263 error_code_ = FileErrorToNetError(error);
264 pending_data_.clear();
266 if (callback_.is_null()) {
267 // No pending Read operation.
268 return;
271 buffer_ = NULL;
272 buffer_length_ = 0;
273 base::ResetAndReturn(&callback_).Run(error_code_);
276 } // namespace internal
278 namespace {
280 // Calls FileSystemInterface::GetFileContent if the file system
281 // is available. If not, the |completion_callback| is invoked with
282 // FILE_ERROR_FAILED.
283 base::Closure GetFileContentOnUIThread(
284 const DriveFileStreamReader::FileSystemGetter& file_system_getter,
285 const base::FilePath& drive_file_path,
286 const GetFileContentInitializedCallback& initialized_callback,
287 const google_apis::GetContentCallback& get_content_callback,
288 const FileOperationCallback& completion_callback) {
289 DCHECK_CURRENTLY_ON(BrowserThread::UI);
291 FileSystemInterface* file_system = file_system_getter.Run();
292 if (!file_system) {
293 completion_callback.Run(FILE_ERROR_FAILED);
294 return base::Closure();
297 return google_apis::CreateRelayCallback(
298 file_system->GetFileContent(drive_file_path,
299 initialized_callback,
300 get_content_callback,
301 completion_callback));
304 // Helper to run FileSystemInterface::GetFileContent on UI thread.
305 void GetFileContent(
306 const DriveFileStreamReader::FileSystemGetter& file_system_getter,
307 const base::FilePath& drive_file_path,
308 const GetFileContentInitializedCallback& initialized_callback,
309 const google_apis::GetContentCallback& get_content_callback,
310 const FileOperationCallback& completion_callback,
311 const base::Callback<void(const base::Closure&)>& reply_callback) {
312 DCHECK_CURRENTLY_ON(BrowserThread::IO);
314 BrowserThread::PostTaskAndReplyWithResult(
315 BrowserThread::UI,
316 FROM_HERE,
317 base::Bind(&GetFileContentOnUIThread,
318 file_system_getter,
319 drive_file_path,
320 google_apis::CreateRelayCallback(initialized_callback),
321 google_apis::CreateRelayCallback(get_content_callback),
322 google_apis::CreateRelayCallback(completion_callback)),
323 reply_callback);
326 } // namespace
328 DriveFileStreamReader::DriveFileStreamReader(
329 const FileSystemGetter& file_system_getter,
330 base::SequencedTaskRunner* file_task_runner)
331 : file_system_getter_(file_system_getter),
332 file_task_runner_(file_task_runner),
333 weak_ptr_factory_(this) {
336 DriveFileStreamReader::~DriveFileStreamReader() {
339 bool DriveFileStreamReader::IsInitialized() const {
340 DCHECK(thread_checker_.CalledOnValidThread());
341 return reader_proxy_.get() != NULL;
344 void DriveFileStreamReader::Initialize(
345 const base::FilePath& drive_file_path,
346 const net::HttpByteRange& byte_range,
347 const InitializeCompletionCallback& callback) {
348 DCHECK(thread_checker_.CalledOnValidThread());
349 DCHECK(!callback.is_null());
351 GetFileContent(
352 file_system_getter_,
353 drive_file_path,
354 base::Bind(&DriveFileStreamReader
355 ::InitializeAfterGetFileContentInitialized,
356 weak_ptr_factory_.GetWeakPtr(),
357 byte_range,
358 callback),
359 base::Bind(&DriveFileStreamReader::OnGetContent,
360 weak_ptr_factory_.GetWeakPtr()),
361 base::Bind(&DriveFileStreamReader::OnGetFileContentCompletion,
362 weak_ptr_factory_.GetWeakPtr(),
363 callback),
364 base::Bind(&DriveFileStreamReader::StoreCancelDownloadClosure,
365 weak_ptr_factory_.GetWeakPtr()));
368 int DriveFileStreamReader::Read(net::IOBuffer* buffer, int buffer_length,
369 const net::CompletionCallback& callback) {
370 DCHECK(thread_checker_.CalledOnValidThread());
371 DCHECK(reader_proxy_);
372 DCHECK(buffer);
373 DCHECK(!callback.is_null());
374 return reader_proxy_->Read(buffer, buffer_length, callback);
377 void DriveFileStreamReader::StoreCancelDownloadClosure(
378 const base::Closure& cancel_download_closure) {
379 DCHECK(thread_checker_.CalledOnValidThread());
380 cancel_download_closure_ = cancel_download_closure;
383 void DriveFileStreamReader::InitializeAfterGetFileContentInitialized(
384 const net::HttpByteRange& byte_range,
385 const InitializeCompletionCallback& callback,
386 FileError error,
387 const base::FilePath& local_cache_file_path,
388 scoped_ptr<ResourceEntry> entry) {
389 DCHECK(thread_checker_.CalledOnValidThread());
390 // StoreCancelDownloadClosure() should be called before this function.
391 DCHECK(!cancel_download_closure_.is_null());
393 if (error != FILE_ERROR_OK) {
394 callback.Run(FileErrorToNetError(error), scoped_ptr<ResourceEntry>());
395 return;
397 DCHECK(entry);
399 int64 range_start = 0, range_length = 0;
400 if (!ComputeConcretePosition(byte_range, entry->file_info().size(),
401 &range_start, &range_length)) {
402 // If |byte_range| is invalid (e.g. out of bounds), return with an error.
403 // At the same time, we cancel the in-flight downloading operation if
404 // needed and and invalidate weak pointers so that we won't
405 // receive unwanted callbacks.
406 cancel_download_closure_.Run();
407 weak_ptr_factory_.InvalidateWeakPtrs();
408 callback.Run(
409 net::ERR_REQUEST_RANGE_NOT_SATISFIABLE, scoped_ptr<ResourceEntry>());
410 return;
413 if (local_cache_file_path.empty()) {
414 // The file is not cached, and being downloaded.
415 reader_proxy_.reset(
416 new internal::NetworkReaderProxy(
417 range_start, range_length,
418 entry->file_info().size(), cancel_download_closure_));
419 callback.Run(net::OK, entry.Pass());
420 return;
423 // Otherwise, open the stream for file.
424 scoped_ptr<util::LocalFileReader> file_reader(
425 new util::LocalFileReader(file_task_runner_.get()));
426 util::LocalFileReader* file_reader_ptr = file_reader.get();
427 file_reader_ptr->Open(
428 local_cache_file_path,
429 range_start,
430 base::Bind(
431 &DriveFileStreamReader::InitializeAfterLocalFileOpen,
432 weak_ptr_factory_.GetWeakPtr(),
433 range_length,
434 callback,
435 base::Passed(&entry),
436 base::Passed(&file_reader)));
439 void DriveFileStreamReader::InitializeAfterLocalFileOpen(
440 int64 length,
441 const InitializeCompletionCallback& callback,
442 scoped_ptr<ResourceEntry> entry,
443 scoped_ptr<util::LocalFileReader> file_reader,
444 int open_result) {
445 DCHECK(thread_checker_.CalledOnValidThread());
447 if (open_result != net::OK) {
448 callback.Run(net::ERR_FAILED, scoped_ptr<ResourceEntry>());
449 return;
452 reader_proxy_.reset(
453 new internal::LocalReaderProxy(file_reader.Pass(), length));
454 callback.Run(net::OK, entry.Pass());
457 void DriveFileStreamReader::OnGetContent(
458 google_apis::DriveApiErrorCode error_code,
459 scoped_ptr<std::string> data) {
460 DCHECK(thread_checker_.CalledOnValidThread());
461 DCHECK(reader_proxy_);
462 reader_proxy_->OnGetContent(data.Pass());
465 void DriveFileStreamReader::OnGetFileContentCompletion(
466 const InitializeCompletionCallback& callback,
467 FileError error) {
468 DCHECK(thread_checker_.CalledOnValidThread());
470 if (reader_proxy_) {
471 // If the proxy object available, send the error to it.
472 reader_proxy_->OnCompleted(error);
473 } else {
474 // Here the proxy object is not yet available.
475 // There are two cases. 1) Some error happens during the initialization.
476 // 2) the cache file is found, but the proxy object is not *yet*
477 // initialized because the file is being opened.
478 // We are interested in 1) only. The callback for 2) will be called
479 // after opening the file is completed.
480 // Note: due to the same reason, LocalReaderProxy::OnCompleted may
481 // or may not be called. This is timing issue, and it is difficult to avoid
482 // unfortunately.
483 if (error != FILE_ERROR_OK) {
484 callback.Run(FileErrorToNetError(error), scoped_ptr<ResourceEntry>());
489 } // namespace drive