1 // Copyright (c) 2013 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
5 #include "content/browser/streams/stream.h"
8 #include "base/location.h"
9 #include "base/message_loop/message_loop_proxy.h"
10 #include "base/values.h"
11 #include "content/browser/streams/stream_handle_impl.h"
12 #include "content/browser/streams/stream_read_observer.h"
13 #include "content/browser/streams/stream_registry.h"
14 #include "content/browser/streams/stream_write_observer.h"
15 #include "net/base/io_buffer.h"
16 #include "net/http/http_response_headers.h"
19 // Start throttling the connection at about 1MB.
20 const size_t kDeferSizeThreshold
= 40 * 32768;
25 Stream::Stream(StreamRegistry
* registry
,
26 StreamWriteObserver
* write_observer
,
28 : can_add_data_(true),
32 last_total_buffered_bytes_(0),
35 write_observer_(write_observer
),
37 weak_ptr_factory_(this) {
38 CreateByteStream(base::MessageLoopProxy::current(),
39 base::MessageLoopProxy::current(),
44 // Setup callback for writing.
45 writer_
->RegisterCallback(base::Bind(&Stream::OnSpaceAvailable
,
46 weak_ptr_factory_
.GetWeakPtr()));
47 reader_
->RegisterCallback(base::Bind(&Stream::OnDataAvailable
,
48 weak_ptr_factory_
.GetWeakPtr()));
50 registry_
->RegisterStream(this);
56 bool Stream::SetReadObserver(StreamReadObserver
* observer
) {
59 read_observer_
= observer
;
63 void Stream::RemoveReadObserver(StreamReadObserver
* observer
) {
64 DCHECK(observer
== read_observer_
);
65 read_observer_
= NULL
;
68 void Stream::RemoveWriteObserver(StreamWriteObserver
* observer
) {
69 DCHECK(observer
== write_observer_
);
70 write_observer_
= NULL
;
73 void Stream::Abort() {
74 // Clear all buffer. It's safe to clear reader_ here since the same thread
75 // is used for both input and output operation.
79 can_add_data_
= false;
80 registry_
->UnregisterStream(url());
83 void Stream::AddData(scoped_refptr
<net::IOBuffer
> buffer
, size_t size
) {
87 size_t current_buffered_bytes
= writer_
->GetTotalBufferedBytes();
88 if (!registry_
->UpdateMemoryUsage(url(), current_buffered_bytes
, size
)) {
93 // Now it's guaranteed that this doesn't overflow. This must be done before
94 // Write() since GetTotalBufferedBytes() may return different value after
95 // Write() call, so if we use the new value, information in this instance and
96 // one in |registry_| become inconsistent.
97 last_total_buffered_bytes_
= current_buffered_bytes
+ size
;
99 can_add_data_
= writer_
->Write(buffer
, size
);
102 void Stream::AddData(const char* data
, size_t size
) {
106 scoped_refptr
<net::IOBuffer
> io_buffer(new net::IOBuffer(size
));
107 memcpy(io_buffer
->data(), data
, size
);
108 AddData(io_buffer
, size
);
111 void Stream::Flush() {
117 void Stream::Finalize() {
124 // Continue asynchronously.
125 base::MessageLoopProxy::current()->PostTask(
127 base::Bind(&Stream::OnDataAvailable
, weak_ptr_factory_
.GetWeakPtr()));
130 Stream::StreamState
Stream::ReadRawData(net::IOBuffer
* buf
,
138 DCHECK(!data_length_
);
139 DCHECK(!data_bytes_read_
);
142 return STREAM_ABORTED
;
144 ByteStreamReader::StreamState state
= reader_
->Read(&data_
, &data_length_
);
146 case ByteStreamReader::STREAM_HAS_DATA
:
148 case ByteStreamReader::STREAM_COMPLETE
:
149 registry_
->UnregisterStream(url());
150 return STREAM_COMPLETE
;
151 case ByteStreamReader::STREAM_EMPTY
:
156 const size_t remaining_bytes
= data_length_
- data_bytes_read_
;
158 static_cast<size_t>(buf_size
) < remaining_bytes
?
159 buf_size
: remaining_bytes
;
160 memcpy(buf
->data(), data_
->data() + data_bytes_read_
, to_read
);
161 data_bytes_read_
+= to_read
;
162 if (data_bytes_read_
>= data_length_
)
165 *bytes_read
= to_read
;
166 return STREAM_HAS_DATA
;
169 scoped_ptr
<StreamHandle
> Stream::CreateHandle() {
170 CHECK(!stream_handle_
);
171 stream_handle_
= new StreamHandleImpl(weak_ptr_factory_
.GetWeakPtr());
172 return scoped_ptr
<StreamHandle
>(stream_handle_
).Pass();
175 void Stream::CloseHandle() {
176 // Prevent deletion until this function ends.
177 scoped_refptr
<Stream
> ref(this);
179 CHECK(stream_handle_
);
180 stream_handle_
= NULL
;
181 registry_
->UnregisterStream(url());
183 write_observer_
->OnClose(this);
186 void Stream::OnSpaceAvailable() {
187 can_add_data_
= true;
189 write_observer_
->OnSpaceAvailable(this);
192 void Stream::OnDataAvailable() {
194 read_observer_
->OnDataAvailable(this);
197 void Stream::ClearBuffer() {
200 data_bytes_read_
= 0;
203 } // namespace content