Updating trunk VERSION from 2139.0 to 2140.0
[chromium-blink-merge.git] / content / browser / streams / stream.cc
blobd10770c42d2d6f5b0dd6498e420eaa04807cbeec
1 // Copyright (c) 2013 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
5 #include "content/browser/streams/stream.h"
7 #include "base/bind.h"
8 #include "base/location.h"
9 #include "base/message_loop/message_loop_proxy.h"
10 #include "base/values.h"
11 #include "content/browser/streams/stream_handle_impl.h"
12 #include "content/browser/streams/stream_read_observer.h"
13 #include "content/browser/streams/stream_registry.h"
14 #include "content/browser/streams/stream_write_observer.h"
15 #include "net/base/io_buffer.h"
16 #include "net/http/http_response_headers.h"
namespace {

// Buffer size, in bytes, passed to CreateByteStream() by the Stream
// constructor: 40 blocks of 32 KiB, i.e. 1,310,720 bytes (1.25 MiB). This is
// the "about 1MB" level at which the connection starts being throttled.
const size_t kDeferSizeThreshold = 40 * 32 * 1024;

}  // namespace
23 namespace content {
25 Stream::Stream(StreamRegistry* registry,
26 StreamWriteObserver* write_observer,
27 const GURL& url)
28 : can_add_data_(true),
29 url_(url),
30 data_length_(0),
31 data_bytes_read_(0),
32 last_total_buffered_bytes_(0),
33 registry_(registry),
34 read_observer_(NULL),
35 write_observer_(write_observer),
36 stream_handle_(NULL),
37 weak_ptr_factory_(this) {
38 CreateByteStream(base::MessageLoopProxy::current(),
39 base::MessageLoopProxy::current(),
40 kDeferSizeThreshold,
41 &writer_,
42 &reader_);
44 // Setup callback for writing.
45 writer_->RegisterCallback(base::Bind(&Stream::OnSpaceAvailable,
46 weak_ptr_factory_.GetWeakPtr()));
47 reader_->RegisterCallback(base::Bind(&Stream::OnDataAvailable,
48 weak_ptr_factory_.GetWeakPtr()));
50 registry_->RegisterStream(this);
53 Stream::~Stream() {
56 bool Stream::SetReadObserver(StreamReadObserver* observer) {
57 if (read_observer_)
58 return false;
59 read_observer_ = observer;
60 return true;
63 void Stream::RemoveReadObserver(StreamReadObserver* observer) {
64 DCHECK(observer == read_observer_);
65 read_observer_ = NULL;
68 void Stream::RemoveWriteObserver(StreamWriteObserver* observer) {
69 DCHECK(observer == write_observer_);
70 write_observer_ = NULL;
73 void Stream::Abort() {
74 // Clear all buffer. It's safe to clear reader_ here since the same thread
75 // is used for both input and output operation.
76 writer_.reset();
77 reader_.reset();
78 ClearBuffer();
79 can_add_data_ = false;
80 registry_->UnregisterStream(url());
83 void Stream::AddData(scoped_refptr<net::IOBuffer> buffer, size_t size) {
84 if (!writer_.get())
85 return;
87 size_t current_buffered_bytes = writer_->GetTotalBufferedBytes();
88 if (!registry_->UpdateMemoryUsage(url(), current_buffered_bytes, size)) {
89 Abort();
90 return;
93 // Now it's guaranteed that this doesn't overflow. This must be done before
94 // Write() since GetTotalBufferedBytes() may return different value after
95 // Write() call, so if we use the new value, information in this instance and
96 // one in |registry_| become inconsistent.
97 last_total_buffered_bytes_ = current_buffered_bytes + size;
99 can_add_data_ = writer_->Write(buffer, size);
102 void Stream::AddData(const char* data, size_t size) {
103 if (!writer_.get())
104 return;
106 scoped_refptr<net::IOBuffer> io_buffer(new net::IOBuffer(size));
107 memcpy(io_buffer->data(), data, size);
108 AddData(io_buffer, size);
111 void Stream::Finalize() {
112 if (!writer_.get())
113 return;
115 writer_->Close(0);
116 writer_.reset();
118 // Continue asynchronously.
119 base::MessageLoopProxy::current()->PostTask(
120 FROM_HERE,
121 base::Bind(&Stream::OnDataAvailable, weak_ptr_factory_.GetWeakPtr()));
124 Stream::StreamState Stream::ReadRawData(net::IOBuffer* buf,
125 int buf_size,
126 int* bytes_read) {
127 DCHECK(buf);
128 DCHECK(bytes_read);
130 *bytes_read = 0;
131 if (!data_.get()) {
132 DCHECK(!data_length_);
133 DCHECK(!data_bytes_read_);
135 if (!reader_.get())
136 return STREAM_ABORTED;
138 ByteStreamReader::StreamState state = reader_->Read(&data_, &data_length_);
139 switch (state) {
140 case ByteStreamReader::STREAM_HAS_DATA:
141 break;
142 case ByteStreamReader::STREAM_COMPLETE:
143 registry_->UnregisterStream(url());
144 return STREAM_COMPLETE;
145 case ByteStreamReader::STREAM_EMPTY:
146 return STREAM_EMPTY;
150 const size_t remaining_bytes = data_length_ - data_bytes_read_;
151 size_t to_read =
152 static_cast<size_t>(buf_size) < remaining_bytes ?
153 buf_size : remaining_bytes;
154 memcpy(buf->data(), data_->data() + data_bytes_read_, to_read);
155 data_bytes_read_ += to_read;
156 if (data_bytes_read_ >= data_length_)
157 ClearBuffer();
159 *bytes_read = to_read;
160 return STREAM_HAS_DATA;
163 scoped_ptr<StreamHandle> Stream::CreateHandle(
164 const GURL& original_url,
165 const std::string& mime_type,
166 scoped_refptr<net::HttpResponseHeaders> response_headers) {
167 CHECK(!stream_handle_);
168 stream_handle_ = new StreamHandleImpl(weak_ptr_factory_.GetWeakPtr(),
169 original_url,
170 mime_type,
171 response_headers);
172 return scoped_ptr<StreamHandle>(stream_handle_).Pass();
175 void Stream::CloseHandle() {
176 // Prevent deletion until this function ends.
177 scoped_refptr<Stream> ref(this);
179 CHECK(stream_handle_);
180 stream_handle_ = NULL;
181 registry_->UnregisterStream(url());
182 if (write_observer_)
183 write_observer_->OnClose(this);
186 void Stream::OnSpaceAvailable() {
187 can_add_data_ = true;
188 if (write_observer_)
189 write_observer_->OnSpaceAvailable(this);
192 void Stream::OnDataAvailable() {
193 if (read_observer_)
194 read_observer_->OnDataAvailable(this);
197 void Stream::ClearBuffer() {
198 data_ = NULL;
199 data_length_ = 0;
200 data_bytes_read_ = 0;
203 } // namespace content