[refactor] More post-NSS WebCrypto cleanups (utility functions).
[chromium-blink-merge.git] / content / browser / streams / stream.cc
blob3f4ae68083eb3424f0389d40b247280daf126b0f
1 // Copyright (c) 2013 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
5 #include "content/browser/streams/stream.h"
7 #include "base/bind.h"
8 #include "base/location.h"
9 #include "base/single_thread_task_runner.h"
10 #include "base/thread_task_runner_handle.h"
11 #include "base/values.h"
12 #include "content/browser/streams/stream_handle_impl.h"
13 #include "content/browser/streams/stream_read_observer.h"
14 #include "content/browser/streams/stream_registry.h"
15 #include "content/browser/streams/stream_write_observer.h"
16 #include "net/base/io_buffer.h"
17 #include "net/http/http_response_headers.h"
namespace {

// Buffered-byte threshold handed to CreateByteStream() when a Stream is
// constructed: 40 * 32 KiB = 1,310,720 bytes (1.25 MiB).
const size_t kDeferSizeThreshold = 40 * 32 * 1024;

}  // namespace
24 namespace content {
26 Stream::Stream(StreamRegistry* registry,
27 StreamWriteObserver* write_observer,
28 const GURL& url)
29 : can_add_data_(true),
30 url_(url),
31 data_length_(0),
32 data_bytes_read_(0),
33 last_total_buffered_bytes_(0),
34 registry_(registry),
35 read_observer_(NULL),
36 write_observer_(write_observer),
37 stream_handle_(NULL),
38 weak_ptr_factory_(this) {
39 CreateByteStream(base::ThreadTaskRunnerHandle::Get(),
40 base::ThreadTaskRunnerHandle::Get(), kDeferSizeThreshold,
41 &writer_, &reader_);
43 // Setup callback for writing.
44 writer_->RegisterCallback(base::Bind(&Stream::OnSpaceAvailable,
45 weak_ptr_factory_.GetWeakPtr()));
46 reader_->RegisterCallback(base::Bind(&Stream::OnDataAvailable,
47 weak_ptr_factory_.GetWeakPtr()));
49 registry_->RegisterStream(this);
52 Stream::~Stream() {
55 bool Stream::SetReadObserver(StreamReadObserver* observer) {
56 if (read_observer_)
57 return false;
58 read_observer_ = observer;
59 return true;
62 void Stream::RemoveReadObserver(StreamReadObserver* observer) {
63 DCHECK(observer == read_observer_);
64 read_observer_ = NULL;
67 void Stream::RemoveWriteObserver(StreamWriteObserver* observer) {
68 DCHECK(observer == write_observer_);
69 write_observer_ = NULL;
72 void Stream::Abort() {
73 // Clear all buffer. It's safe to clear reader_ here since the same thread
74 // is used for both input and output operation.
75 writer_.reset();
76 reader_.reset();
77 ClearBuffer();
78 can_add_data_ = false;
79 registry_->UnregisterStream(url());
82 void Stream::AddData(scoped_refptr<net::IOBuffer> buffer, size_t size) {
83 if (!writer_.get())
84 return;
86 size_t current_buffered_bytes = writer_->GetTotalBufferedBytes();
87 if (!registry_->UpdateMemoryUsage(url(), current_buffered_bytes, size)) {
88 Abort();
89 return;
92 // Now it's guaranteed that this doesn't overflow. This must be done before
93 // Write() since GetTotalBufferedBytes() may return different value after
94 // Write() call, so if we use the new value, information in this instance and
95 // one in |registry_| become inconsistent.
96 last_total_buffered_bytes_ = current_buffered_bytes + size;
98 can_add_data_ = writer_->Write(buffer, size);
101 void Stream::AddData(const char* data, size_t size) {
102 if (!writer_.get())
103 return;
105 scoped_refptr<net::IOBuffer> io_buffer(new net::IOBuffer(size));
106 memcpy(io_buffer->data(), data, size);
107 AddData(io_buffer, size);
110 void Stream::Flush() {
111 if (!writer_.get())
112 return;
113 writer_->Flush();
116 void Stream::Finalize() {
117 if (!writer_.get())
118 return;
120 writer_->Close(0);
121 writer_.reset();
123 // Continue asynchronously.
124 base::ThreadTaskRunnerHandle::Get()->PostTask(
125 FROM_HERE,
126 base::Bind(&Stream::OnDataAvailable, weak_ptr_factory_.GetWeakPtr()));
129 Stream::StreamState Stream::ReadRawData(net::IOBuffer* buf,
130 int buf_size,
131 int* bytes_read) {
132 DCHECK(buf);
133 DCHECK(bytes_read);
135 *bytes_read = 0;
136 if (!data_.get()) {
137 DCHECK(!data_length_);
138 DCHECK(!data_bytes_read_);
140 if (!reader_.get())
141 return STREAM_ABORTED;
143 ByteStreamReader::StreamState state = reader_->Read(&data_, &data_length_);
144 switch (state) {
145 case ByteStreamReader::STREAM_HAS_DATA:
146 break;
147 case ByteStreamReader::STREAM_COMPLETE:
148 registry_->UnregisterStream(url());
149 return STREAM_COMPLETE;
150 case ByteStreamReader::STREAM_EMPTY:
151 return STREAM_EMPTY;
155 const size_t remaining_bytes = data_length_ - data_bytes_read_;
156 size_t to_read =
157 static_cast<size_t>(buf_size) < remaining_bytes ?
158 buf_size : remaining_bytes;
159 memcpy(buf->data(), data_->data() + data_bytes_read_, to_read);
160 data_bytes_read_ += to_read;
161 if (data_bytes_read_ >= data_length_)
162 ClearBuffer();
164 *bytes_read = to_read;
165 return STREAM_HAS_DATA;
168 scoped_ptr<StreamHandle> Stream::CreateHandle() {
169 CHECK(!stream_handle_);
170 stream_handle_ = new StreamHandleImpl(weak_ptr_factory_.GetWeakPtr());
171 return scoped_ptr<StreamHandle>(stream_handle_).Pass();
174 void Stream::CloseHandle() {
175 // Prevent deletion until this function ends.
176 scoped_refptr<Stream> ref(this);
178 CHECK(stream_handle_);
179 stream_handle_ = NULL;
180 registry_->UnregisterStream(url());
181 if (write_observer_)
182 write_observer_->OnClose(this);
185 void Stream::OnSpaceAvailable() {
186 can_add_data_ = true;
187 if (write_observer_)
188 write_observer_->OnSpaceAvailable(this);
191 void Stream::OnDataAvailable() {
192 if (read_observer_)
193 read_observer_->OnDataAvailable(this);
196 void Stream::ClearBuffer() {
197 data_ = NULL;
198 data_length_ = 0;
199 data_bytes_read_ = 0;
202 } // namespace content