1 // Copyright (c) 2012 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
5 #include "net/base/upload_data_stream.h"
7 #include "base/logging.h"
8 #include "net/base/io_buffer.h"
9 #include "net/base/net_errors.h"
10 #include "net/base/upload_data.h"
11 #include "net/base/upload_element_reader.h"
15 bool UploadDataStream::merge_chunks_
= true;
18 void UploadDataStream::ResetMergeChunks() {
19 // WARNING: merge_chunks_ must match the above initializer.
23 UploadDataStream::UploadDataStream(UploadData
* upload_data
)
24 : upload_data_(upload_data
),
28 initialized_successfully_(false),
29 weak_ptr_factory_(ALLOW_THIS_IN_INITIALIZER_LIST(this)),
30 weak_ptr_factory_for_chunks_(ALLOW_THIS_IN_INITIALIZER_LIST(this)) {
31 const ScopedVector
<UploadElement
>& elements
= upload_data_
->elements();
32 for (size_t i
= 0; i
< elements
.size(); ++i
)
33 element_readers_
.push_back(UploadElementReader::Create(*elements
[i
]));
35 upload_data_
->set_chunk_callback(
36 base::Bind(&UploadDataStream::OnChunkAvailable
,
37 weak_ptr_factory_for_chunks_
.GetWeakPtr()));
40 UploadDataStream::~UploadDataStream() {
43 int UploadDataStream::Init(const CompletionCallback
& callback
) {
45 // Use fast path when initialization can be done synchronously.
49 return InitInternal(0, callback
);
52 int UploadDataStream::InitSync() {
54 // Initialize all readers synchronously.
55 for (size_t i
= 0; i
< element_readers_
.size(); ++i
) {
56 UploadElementReader
* reader
= element_readers_
[i
];
57 const int result
= reader
->InitSync();
62 FinalizeInitialization();
66 int UploadDataStream::Read(IOBuffer
* buf
,
68 const CompletionCallback
& callback
) {
69 DCHECK(initialized_successfully_
);
70 DCHECK(!callback
.is_null());
71 DCHECK_GT(buf_len
, 0);
72 return ReadInternal(new DrainableIOBuffer(buf
, buf_len
), callback
);
75 int UploadDataStream::ReadSync(IOBuffer
* buf
, int buf_len
) {
76 DCHECK(initialized_successfully_
);
77 DCHECK_GT(buf_len
, 0);
78 return ReadInternal(new DrainableIOBuffer(buf
, buf_len
),
79 CompletionCallback());
82 int64
UploadDataStream::identifier() const {
83 return upload_data_
->identifier();
86 bool UploadDataStream::is_chunked() const {
87 return upload_data_
->is_chunked();
90 bool UploadDataStream::IsEOF() const {
91 DCHECK(initialized_successfully_
);
92 // Check if all elements are consumed.
93 if (element_index_
== element_readers_
.size()) {
94 // If the upload data is chunked, check if the last chunk is appended.
95 if (!is_chunked() || upload_data_
->last_chunk_appended())
101 bool UploadDataStream::IsInMemory() const {
102 // Chunks are in memory, but UploadData does not have all the chunks at
103 // once. Chunks are provided progressively with AppendChunk() as chunks
104 // are ready. Check is_chunked_ here, rather than relying on the loop
105 // below, as there is a case that is_chunked_ is set to true, but the
106 // first chunk is not yet delivered.
110 for (size_t i
= 0; i
< element_readers_
.size(); ++i
) {
111 if (!element_readers_
[i
]->IsInMemory())
117 void UploadDataStream::Reset() {
118 weak_ptr_factory_
.InvalidateWeakPtrs();
119 pending_chunked_read_callback_
.Reset();
120 initialized_successfully_
= false;
121 current_position_
= 0;
126 int UploadDataStream::InitInternal(int start_index
,
127 const CompletionCallback
& callback
) {
128 DCHECK(!initialized_successfully_
);
130 // Call Init() for all elements.
131 for (size_t i
= start_index
; i
< element_readers_
.size(); ++i
) {
132 UploadElementReader
* reader
= element_readers_
[i
];
133 // When new_result is ERR_IO_PENDING, InitInternal() will be called
134 // with start_index == i + 1 when reader->Init() finishes.
135 const int result
= reader
->Init(
136 base::Bind(&UploadDataStream::ResumePendingInit
,
137 weak_ptr_factory_
.GetWeakPtr(),
144 // Finalize initialization.
145 FinalizeInitialization();
149 void UploadDataStream::ResumePendingInit(int start_index
,
150 const CompletionCallback
& callback
,
151 int previous_result
) {
152 DCHECK(!initialized_successfully_
);
153 DCHECK(!callback
.is_null());
154 DCHECK_NE(ERR_IO_PENDING
, previous_result
);
156 // Check the last result.
157 if (previous_result
!= OK
) {
158 callback
.Run(previous_result
);
162 const int result
= InitInternal(start_index
, callback
);
163 if (result
!= ERR_IO_PENDING
)
164 callback
.Run(result
);
167 void UploadDataStream::FinalizeInitialization() {
168 DCHECK(!initialized_successfully_
);
170 uint64 total_size
= 0;
171 for (size_t i
= 0; i
< element_readers_
.size(); ++i
) {
172 UploadElementReader
* reader
= element_readers_
[i
];
173 total_size
+= reader
->GetContentLength();
175 total_size_
= total_size
;
177 initialized_successfully_
= true;
180 int UploadDataStream::ReadInternal(scoped_refptr
<DrainableIOBuffer
> buf
,
181 const CompletionCallback
& callback
) {
182 DCHECK(initialized_successfully_
);
184 while (element_index_
< element_readers_
.size()) {
185 UploadElementReader
* reader
= element_readers_
[element_index_
];
187 if (reader
->BytesRemaining() == 0) {
192 if (buf
->BytesRemaining() == 0)
195 // Some tests need chunks to be kept unmerged.
196 if (!merge_chunks_
&& is_chunked() && buf
->BytesConsumed())
200 if (!callback
.is_null()) {
201 result
= reader
->Read(
203 buf
->BytesRemaining(),
204 base::Bind(base::IgnoreResult(&UploadDataStream::ResumePendingRead
),
205 weak_ptr_factory_
.GetWeakPtr(),
209 result
= reader
->ReadSync(buf
, buf
->BytesRemaining());
210 DCHECK_NE(ERR_IO_PENDING
, result
);
212 if (result
== ERR_IO_PENDING
)
213 return ERR_IO_PENDING
;
214 DCHECK_GE(result
, 0);
215 buf
->DidConsume(result
);
218 const int bytes_copied
= buf
->BytesConsumed();
219 current_position_
+= bytes_copied
;
221 if (is_chunked() && !IsEOF() && bytes_copied
== 0) {
222 DCHECK(!callback
.is_null());
223 DCHECK(pending_chunked_read_callback_
.is_null());
224 pending_chunked_read_callback_
=
225 base::Bind(&UploadDataStream::ResumePendingRead
,
226 weak_ptr_factory_
.GetWeakPtr(),
230 return ERR_IO_PENDING
;
235 void UploadDataStream::ResumePendingRead(scoped_refptr
<DrainableIOBuffer
> buf
,
236 const CompletionCallback
& callback
,
237 int previous_result
) {
238 DCHECK_GE(previous_result
, 0);
240 // Add the last result.
241 buf
->DidConsume(previous_result
);
243 DCHECK(!callback
.is_null());
244 const int result
= ReadInternal(buf
, callback
);
245 if (result
!= ERR_IO_PENDING
)
246 callback
.Run(result
);
249 void UploadDataStream::OnChunkAvailable() {
250 DCHECK(is_chunked());
252 // Initialize a reader for the newly appended chunk.
253 const ScopedVector
<UploadElement
>& elements
= upload_data_
->elements();
254 DCHECK_EQ(elements
.size(), element_readers_
.size() + 1);
256 // We can initialize the reader synchronously here because only bytes can be
257 // appended for chunked data. We leave |total_size_| at zero, since for
258 // chunked uploads, we may not know the total size.
259 const UploadElement
& element
= *elements
.back();
260 DCHECK_EQ(UploadElement::TYPE_BYTES
, element
.type());
261 UploadElementReader
* reader
= UploadElementReader::Create(element
);
262 const int rv
= reader
->InitSync();
264 element_readers_
.push_back(reader
);
266 // Resume pending read.
267 if (!pending_chunked_read_callback_
.is_null()) {
268 base::Closure callback
= pending_chunked_read_callback_
;
269 pending_chunked_read_callback_
.Reset();