1 // Copyright (c) 2013 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
5 #include "content/browser/streams/stream.h"
8 #include "base/location.h"
9 #include "base/single_thread_task_runner.h"
10 #include "base/thread_task_runner_handle.h"
11 #include "base/values.h"
12 #include "content/browser/streams/stream_handle_impl.h"
13 #include "content/browser/streams/stream_read_observer.h"
14 #include "content/browser/streams/stream_registry.h"
15 #include "content/browser/streams/stream_write_observer.h"
16 #include "net/base/io_buffer.h"
17 #include "net/http/http_response_headers.h"
20 // Start throttling the connection at about 1MB.
// The exact value is 40 * 32768 = 1,310,720 bytes (1.25 MiB). It is passed to
// CreateByteStream() when a Stream is constructed.
21 const size_t kDeferSizeThreshold
= 40 * 32768;
// Constructs a Stream. The visible code:
//   * starts with can_add_data_ == true and last_total_buffered_bytes_ == 0,
//   * stores |write_observer| in write_observer_,
//   * calls CreateByteStream() with the current thread's task runner for both
//     task-runner arguments and kDeferSizeThreshold as the size argument,
//   * registers OnSpaceAvailable() on writer_ and OnDataAvailable() on
//     reader_, both bound through a weak pointer to this object,
//   * registers this stream with registry_.
// NOTE(review): part of the parameter list and some member initializers are
// not visible in this excerpt; presumably registry_ is initialized from
// |registry| -- confirm against the full source.
26 Stream::Stream(StreamRegistry
* registry
,
27 StreamWriteObserver
* write_observer
,
29 : can_add_data_(true),
33 last_total_buffered_bytes_(0),
36 write_observer_(write_observer
),
38 weak_ptr_factory_(this) {
39 CreateByteStream(base::ThreadTaskRunnerHandle::Get(),
40 base::ThreadTaskRunnerHandle::Get(), kDeferSizeThreshold
,
43 // Setup callback for writing.
44 writer_
->RegisterCallback(base::Bind(&Stream::OnSpaceAvailable
,
45 weak_ptr_factory_
.GetWeakPtr()));
// Same pattern for the reading side.
46 reader_
->RegisterCallback(base::Bind(&Stream::OnDataAvailable
,
47 weak_ptr_factory_
.GetWeakPtr()));
49 registry_
->RegisterStream(this);
// Stores |observer| in read_observer_; OnDataAvailable() forwards to it.
// NOTE(review): the statements that decide the bool return value are not
// visible in this excerpt -- confirm the success/failure conditions against
// the full source.
55 bool Stream::SetReadObserver(StreamReadObserver
* observer
) {
58 read_observer_
= observer
;
// Clears read_observer_. |observer| must be the observer that is currently
// set; this is checked with DCHECK.
62 void Stream::RemoveReadObserver(StreamReadObserver
* observer
) {
63 DCHECK(observer
== read_observer_
);
64 read_observer_
= NULL
;
// Clears write_observer_. |observer| must be the observer that is currently
// set; this is checked with DCHECK.
67 void Stream::RemoveWriteObserver(StreamWriteObserver
* observer
) {
68 DCHECK(observer
== write_observer_
);
69 write_observer_
= NULL
;
// Aborts the stream: sets can_add_data_ to false and unregisters the stream
// (keyed by url()) from registry_.
// NOTE(review): the statements that actually clear the buffers, which the
// comment below refers to, are not visible in this excerpt.
72 void Stream::Abort() {
73 // Clear all buffers. It's safe to clear reader_ here since the same thread
74 // is used for both input and output operations.
78 can_add_data_
= false;
79 registry_
->UnregisterStream(url());
// Writes |size| bytes held in |buffer| to writer_.
//
// Before writing, the number of bytes currently buffered in writer_ is
// reported to registry_->UpdateMemoryUsage() together with |size|.
// NOTE(review): what happens when UpdateMemoryUsage() returns false is not
// visible in this excerpt -- confirm against the full source.
//
// can_add_data_ is set to the result of writer_->Write().
82 void Stream::AddData(scoped_refptr
<net::IOBuffer
> buffer
, size_t size
) {
// Snapshot taken before Write(); see the comment further down for why.
86 size_t current_buffered_bytes
= writer_
->GetTotalBufferedBytes();
87 if (!registry_
->UpdateMemoryUsage(url(), current_buffered_bytes
, size
)) {
92 // Now it's guaranteed that this doesn't overflow. This must be done before
93 // Write() since GetTotalBufferedBytes() may return a different value after
94 // the Write() call; using the new value would make the information in this
95 // instance inconsistent with the information in |registry_|.
96 last_total_buffered_bytes_
= current_buffered_bytes
+ size
;
98 can_add_data_
= writer_
->Write(buffer
, size
);
// Convenience overload: copies |size| bytes from |data| into a newly
// allocated net::IOBuffer of |size| bytes and forwards it to the
// AddData(scoped_refptr<net::IOBuffer>, size_t) overload. The caller keeps
// ownership of |data|.
// NOTE(review): any early-out before the allocation is not visible in this
// excerpt.
101 void Stream::AddData(const char* data
, size_t size
) {
105 scoped_refptr
<net::IOBuffer
> io_buffer(new net::IOBuffer(size
));
106 memcpy(io_buffer
->data(), data
, size
);
107 AddData(io_buffer
, size
);
// NOTE(review): only the signature of Flush() is visible in this excerpt;
// presumably it flushes writer_ -- confirm against the full source.
110 void Stream::Flush() {
// Finishes the write side of the stream and then posts OnDataAvailable() to
// the current thread's task runner. The posted callback is bound through a
// weak pointer to this object.
// NOTE(review): the statements that precede the PostTask() call are not
// visible in this excerpt.
116 void Stream::Finalize() {
123 // Continue asynchronously.
124 base::ThreadTaskRunnerHandle::Get()->PostTask(
126 base::Bind(&Stream::OnDataAvailable
, weak_ptr_factory_
.GetWeakPtr()));
// Copies buffered stream data into |buf|.
//
// Visible behaviour:
//   * A new chunk is fetched with reader_->Read(&data_, &data_length_). On
//     that path data_length_ and data_bytes_read_ are DCHECKed to be zero.
//   * ByteStreamReader::STREAM_COMPLETE unregisters the stream from
//     registry_ and returns STREAM_COMPLETE.
//   * Otherwise up to |buf_size| bytes of the unread part of data_ are
//     copied into |buf|, data_bytes_read_ is advanced, |*bytes_read| is set
//     to the number of bytes copied and STREAM_HAS_DATA is returned.
//   * STREAM_ABORTED is returned on a path whose condition is not visible.
// NOTE(review): the remaining parameters, the guarding conditions, the
// switch scaffolding and the STREAM_EMPTY handling are not visible in this
// excerpt -- confirm against the full source.
129 Stream::StreamState
Stream::ReadRawData(net::IOBuffer
* buf
,
137 DCHECK(!data_length_
);
138 DCHECK(!data_bytes_read_
);
141 return STREAM_ABORTED
;
143 ByteStreamReader::StreamState state
= reader_
->Read(&data_
, &data_length_
);
145 case ByteStreamReader::STREAM_HAS_DATA
:
147 case ByteStreamReader::STREAM_COMPLETE
:
148 registry_
->UnregisterStream(url());
149 return STREAM_COMPLETE
;
150 case ByteStreamReader::STREAM_EMPTY
:
// Bytes of the current chunk (data_) that have not been returned yet.
155 const size_t remaining_bytes
= data_length_
- data_bytes_read_
;
// Copy at most |buf_size| bytes: the smaller of |buf_size| and the
// remaining bytes.
157 static_cast<size_t>(buf_size
) < remaining_bytes
?
158 buf_size
: remaining_bytes
;
159 memcpy(buf
->data(), data_
->data() + data_bytes_read_
, to_read
);
160 data_bytes_read_
+= to_read
;
// True once the whole chunk has been consumed.
161 if (data_bytes_read_
>= data_length_
)
164 *bytes_read
= to_read
;
165 return STREAM_HAS_DATA
;
// Creates a StreamHandleImpl bound to a weak pointer to this stream and
// returns it to the caller wrapped in a scoped_ptr<StreamHandle>.
// CHECK-fails if stream_handle_ is already set. The new handle is also
// stored in stream_handle_ until CloseHandle() resets it.
// NOTE(review): stream_handle_ appears to be a non-owning raw pointer, given
// the assignment from `new` and the scoped_ptr built from it -- confirm
// against the header.
168 scoped_ptr
<StreamHandle
> Stream::CreateHandle() {
169 CHECK(!stream_handle_
);
170 stream_handle_
= new StreamHandleImpl(weak_ptr_factory_
.GetWeakPtr());
171 return scoped_ptr
<StreamHandle
>(stream_handle_
).Pass();
// Counterpart of CreateHandle(): CHECKs that a handle exists, resets
// stream_handle_ to NULL, unregisters the stream from registry_ and calls
// OnClose() on write_observer_.
// NOTE(review): whether the write_observer_ call is guarded against a null
// observer is not visible in this excerpt; RemoveWriteObserver() can set it
// to NULL.
174 void Stream::CloseHandle() {
175 // Prevent deletion until this function ends.
176 scoped_refptr
<Stream
> ref(this);
178 CHECK(stream_handle_
);
179 stream_handle_
= NULL
;
180 registry_
->UnregisterStream(url());
182 write_observer_
->OnClose(this);
// Callback registered on writer_ in the constructor. Sets can_add_data_ back
// to true and forwards the notification to write_observer_.
// NOTE(review): any null check on write_observer_ is not visible in this
// excerpt.
185 void Stream::OnSpaceAvailable() {
186 can_add_data_
= true;
188 write_observer_
->OnSpaceAvailable(this);
// Callback registered on reader_ in the constructor and also posted by
// Finalize(). Forwards the notification to read_observer_.
// NOTE(review): any null check on read_observer_ is not visible in this
// excerpt.
191 void Stream::OnDataAvailable() {
193 read_observer_
->OnDataAvailable(this);
// Resets the read position within the current chunk to zero.
// NOTE(review): only this one statement is visible; presumably data_ and
// data_length_ are reset here as well -- confirm against the full source.
196 void Stream::ClearBuffer() {
199 data_bytes_read_
= 0;
202 } // namespace content