1 // Copyright (c) 2012 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
5 #include "net/base/upload_data_stream.h"
7 #include "base/logging.h"
8 #include "net/base/io_buffer.h"
9 #include "net/base/net_errors.h"
10 #include "net/base/upload_bytes_element_reader.h"
11 #include "net/base/upload_element_reader.h"
// Builds a stream over a fixed set of element readers. The readers are handed
// to element_readers_ via Pass(), |identifier| is stored in identifier_, and
// the stream starts with last_chunk_appended_ and initialized_successfully_
// both false. weak_ptr_factory_ is bound to |this|.
15 UploadDataStream::UploadDataStream(
16 ScopedVector
<UploadElementReader
> element_readers
,
18 : element_readers_(element_readers
.Pass()),
22 identifier_(identifier
),
24 last_chunk_appended_(false),
26 initialized_successfully_(false),
27 weak_ptr_factory_(this) {
// Constructor overload selected by the Chunked tag; the tag parameter is left
// unnamed because its value is not used. Stores |identifier| and starts with
// last_chunk_appended_ and initialized_successfully_ both false.
// weak_ptr_factory_ is bound to |this|.
30 UploadDataStream::UploadDataStream(Chunked
/*chunked*/, int64 identifier
)
34 identifier_(identifier
),
36 last_chunk_appended_(false),
38 initialized_successfully_(false),
39 weak_ptr_factory_(this) {
// Destructor for UploadDataStream.
42 UploadDataStream::~UploadDataStream() {
// Convenience factory for a stream backed by a single reader. |reader| is
// released into a one-element ScopedVector, which is then passed (together
// with |identifier|) to the element-reader constructor. The result is
// allocated with new and returned as a raw pointer; presumably the caller
// takes ownership -- confirm against the header.
45 UploadDataStream
* UploadDataStream::CreateWithReader(
46 scoped_ptr
<UploadElementReader
> reader
,
48 ScopedVector
<UploadElementReader
> readers
;
49 readers
.push_back(reader
.release());
50 return new UploadDataStream(readers
.Pass(), identifier
);
// Starts initialization at the first element reader by delegating to
// InitInternal(0, callback) and returning its result. |callback| is forwarded
// unchanged.
53 int UploadDataStream::Init(const CompletionCallback
& callback
) {
55 return InitInternal(0, callback
);
// Public read entry point. Requires a successfully initialized stream and a
// positive |buf_len| (both DCHECKed). Wraps |buf| in a DrainableIOBuffer of
// |buf_len| bytes and returns whatever ReadInternal() returns for it.
58 int UploadDataStream::Read(IOBuffer
* buf
,
60 const CompletionCallback
& callback
) {
61 DCHECK(initialized_successfully_
);
62 DCHECK_GT(buf_len
, 0);
63 return ReadInternal(new DrainableIOBuffer(buf
, buf_len
), callback
);
// Reports whether the whole upload body has been consumed. Requires a
// successfully initialized stream (DCHECK). Two end-of-stream tests are used:
// current_position_ reaching total_size_, and -- for chunked uploads, per the
// comment below -- every element consumed with the last chunk appended.
66 bool UploadDataStream::IsEOF() const {
67 DCHECK(initialized_successfully_
);
69 return current_position_
== total_size_
;
71 // If the upload data is chunked, check if the last chunk is appended and all
72 // elements are consumed.
73 return element_index_
== element_readers_
.size() && last_chunk_appended_
;
// Reports whether the upload body is held entirely in memory. The loop asks
// each element reader via IsInMemory(); the comment below explains why
// chunked uploads are decided from is_chunked_ instead of from the loop.
76 bool UploadDataStream::IsInMemory() const {
77 // Chunks are in memory, but UploadData does not have all the chunks at
78 // once. Chunks are provided progressively with AppendChunk() as chunks
79 // are ready. Check is_chunked_ here, rather than relying on the loop
80 // below, as there is a case that is_chunked_ is set to true, but the
81 // first chunk is not yet delivered.
85 for (size_t i
= 0; i
< element_readers_
.size(); ++i
) {
86 if (!element_readers_
[i
]->IsInMemory())
// Appends one chunk of upload data. Must not be called once the last chunk
// has been appended (DCHECK); last_chunk_appended_ is updated from
// |is_last_chunk|. The range [bytes, bytes + bytes_len) is copied into a local
// vector that is given to a new UploadOwnedBytesElementReader. That reader is
// initialized with a null completion callback and added to element_readers_.
// If a chunked read was waiting for data, its stored closure is copied out
// and the stored member is cleared.
92 void UploadDataStream::AppendChunk(const char* bytes
,
96 DCHECK(!last_chunk_appended_
);
97 last_chunk_appended_
= is_last_chunk
;
99 // Initialize a reader for the newly appended chunk. We leave |total_size_| at
100 // zero, since for chunked uploads, we may not know the total size.
101 std::vector
<char> data(bytes
, bytes
+ bytes_len
);
102 UploadElementReader
* reader
= new UploadOwnedBytesElementReader(&data
);
103 const int rv
= reader
->Init(net::CompletionCallback());
105 element_readers_
.push_back(reader
);
107 // Resume pending read.
108 if (!pending_chunked_read_callback_
.is_null()) {
109 base::Closure callback
= pending_chunked_read_callback_
;
110 pending_chunked_read_callback_
.Reset();
// Puts the stream back into its uninitialized state: invalidates the weak
// pointers handed out by weak_ptr_factory_, drops any pending chunked-read
// callback, and clears initialized_successfully_, read_failed_ and
// current_position_.
115 void UploadDataStream::Reset() {
116 weak_ptr_factory_
.InvalidateWeakPtrs();
117 pending_chunked_read_callback_
.Reset();
118 initialized_successfully_
= false;
119 read_failed_
= false;
120 current_position_
= 0;
// Initializes the element readers from |start_index| onward, then finalizes
// the stream. Each reader's Init() receives a callback bound to
// ResumePendingInit through a weak pointer. An ERR_IO_PENDING result is only
// legal when |callback| is non-null (DCHECK). Finalization sums
// GetContentLength() over all readers into total_size_ and sets
// initialized_successfully_. Must not be called on an already initialized
// stream (DCHECK).
125 int UploadDataStream::InitInternal(int start_index
,
126 const CompletionCallback
& callback
) {
127 DCHECK(!initialized_successfully_
);
129 // Call Init() for all elements.
130 for (size_t i
= start_index
; i
< element_readers_
.size(); ++i
) {
131 UploadElementReader
* reader
= element_readers_
[i
];
132 // When result is ERR_IO_PENDING, InitInternal() will be called
133 // with start_index == i + 1 when reader->Init() finishes.
134 const int result
= reader
->Init(
135 base::Bind(&UploadDataStream::ResumePendingInit
,
136 weak_ptr_factory_
.GetWeakPtr(),
140 DCHECK(result
!= ERR_IO_PENDING
|| !callback
.is_null());
145 // Finalize initialization.
147 uint64 total_size
= 0;
148 for (size_t i
= 0; i
< element_readers_
.size(); ++i
) {
149 UploadElementReader
* reader
= element_readers_
[i
];
150 total_size
+= reader
->GetContentLength();
152 total_size_
= total_size
;
154 initialized_successfully_
= true;
// Completion handler for a reader Init() that finished asynchronously.
// Preconditions (DCHECKed): the stream is not yet initialized, |callback| is
// non-null, and |previous_result| is not ERR_IO_PENDING. A non-OK
// |previous_result| is reported by running |callback| with it. Initialization
// is continued through InitInternal(start_index, callback), and |callback| is
// run with that result unless it is ERR_IO_PENDING.
158 void UploadDataStream::ResumePendingInit(int start_index
,
159 const CompletionCallback
& callback
,
160 int previous_result
) {
161 DCHECK(!initialized_successfully_
);
162 DCHECK(!callback
.is_null());
163 DCHECK_NE(ERR_IO_PENDING
, previous_result
);
165 // Check the last result.
166 if (previous_result
!= OK
) {
167 callback
.Run(previous_result
);
171 const int result
= InitInternal(start_index
, callback
);
172 if (result
!= ERR_IO_PENDING
)
173 callback
.Run(result
);
// Fills |buf| from the element readers, starting at element_index_ and
// looping while no read has failed and readers remain. Each reader Read() is
// given a callback bound to ResumePendingRead through a weak pointer. An
// ERR_IO_PENDING result is returned to the caller at once (|callback| must be
// non-null); any other result is handed to ProcessReadResult(). In the read
// error path the buffer is padded with zeros, bounded by the bytes still
// expected in the upload. current_position_ then advances by the bytes
// consumed from |buf|. For a chunked upload that produced no bytes and is not
// at EOF, a resume closure is stored in pending_chunked_read_callback_ and
// ERR_IO_PENDING is returned.
176 int UploadDataStream::ReadInternal(scoped_refptr
<DrainableIOBuffer
> buf
,
177 const CompletionCallback
& callback
) {
178 DCHECK(initialized_successfully_
);
180 while (!read_failed_
&& element_index_
< element_readers_
.size()) {
181 UploadElementReader
* reader
= element_readers_
[element_index_
];
183 if (reader
->BytesRemaining() == 0) {
188 if (buf
->BytesRemaining() == 0)
191 int result
= reader
->Read(
193 buf
->BytesRemaining(),
194 base::Bind(base::IgnoreResult(&UploadDataStream::ResumePendingRead
),
195 weak_ptr_factory_
.GetWeakPtr(),
198 if (result
== ERR_IO_PENDING
) {
199 DCHECK(!callback
.is_null());
200 return ERR_IO_PENDING
;
202 ProcessReadResult(buf
, result
);
206 // Chunked transfers may only contain byte readers, so cannot have read
208 DCHECK(!is_chunked_
);
210 // If an error occurred during read operation, then pad with zero.
211 // Otherwise the server will hang waiting for the rest of the data.
212 const int num_bytes_to_fill
=
213 std::min(static_cast<uint64
>(buf
->BytesRemaining()),
214 size() - position() - buf
->BytesConsumed());
215 DCHECK_LE(0, num_bytes_to_fill
);
216 memset(buf
->data(), 0, num_bytes_to_fill
);
217 buf
->DidConsume(num_bytes_to_fill
);
220 const int bytes_copied
= buf
->BytesConsumed();
221 current_position_
+= bytes_copied
;
222 DCHECK(is_chunked_
|| total_size_
>= current_position_
);
224 if (is_chunked_
&& !IsEOF() && bytes_copied
== 0) {
225 DCHECK(!callback
.is_null());
226 DCHECK(pending_chunked_read_callback_
.is_null());
227 pending_chunked_read_callback_
=
228 base::Bind(&UploadDataStream::ResumePendingRead
,
229 weak_ptr_factory_
.GetWeakPtr(),
233 return ERR_IO_PENDING
;
236 // Returning 0 is allowed only when IsEOF() == true.
237 DCHECK(bytes_copied
!= 0 || IsEOF());
// Continuation for a read that could not finish synchronously. |callback|
// must be non-null (DCHECK). |previous_result| is first applied to |buf| via
// ProcessReadResult(), then reading resumes in ReadInternal(). |callback| is
// run with the outcome unless the read is pending again (ERR_IO_PENDING).
241 void UploadDataStream::ResumePendingRead(scoped_refptr
<DrainableIOBuffer
> buf
,
242 const CompletionCallback
& callback
,
243 int previous_result
) {
244 DCHECK(!callback
.is_null());
246 ProcessReadResult(buf
, previous_result
);
248 const int result
= ReadInternal(buf
, callback
);
249 if (result
!= ERR_IO_PENDING
)
250 callback
.Run(result
);
// Applies the outcome of a reader Read() to |buf|. |result| must not be
// ERR_IO_PENDING and no earlier read may have failed (both DCHECKed). The
// statement shown advances |buf| by |result| bytes via DidConsume().
253 void UploadDataStream::ProcessReadResult(scoped_refptr
<DrainableIOBuffer
> buf
,
255 DCHECK_NE(ERR_IO_PENDING
, result
);
256 DCHECK(!read_failed_
);
259 buf
->DidConsume(result
);