[Storage] Blob Storage Refactoring pt 1:
[chromium-blink-merge.git] / device / serial / data_sink_receiver.cc
blob170f8b2c5b052e80d1c9be8a945a2d3492da3326
1 // Copyright 2014 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
5 #include "device/serial/data_sink_receiver.h"
7 #include <limits>
9 #include "base/bind.h"
10 #include "base/message_loop/message_loop.h"
12 namespace device {
14 // A ReadOnlyBuffer implementation that provides a view of a buffer owned by a
15 // DataSinkReceiver.
16 class DataSinkReceiver::Buffer : public ReadOnlyBuffer {
17 public:
18 Buffer(scoped_refptr<DataSinkReceiver> receiver,
19 const char* buffer,
20 uint32_t buffer_size);
21 ~Buffer() override;
23 void Cancel(int32_t error);
25 // ReadOnlyBuffer overrides.
26 const char* GetData() override;
27 uint32_t GetSize() override;
28 void Done(uint32_t bytes_read) override;
29 void DoneWithError(uint32_t bytes_read, int32_t error) override;
31 private:
32 // The DataSinkReceiver of whose buffer we are providing a view.
33 scoped_refptr<DataSinkReceiver> receiver_;
35 const char* buffer_;
36 uint32_t buffer_size_;
38 // Whether this receive has been cancelled.
39 bool cancelled_;
41 // If |cancelled_|, contains the cancellation error to report.
42 int32_t cancellation_error_;
45 // A frame of data received from the client.
46 class DataSinkReceiver::DataFrame {
47 public:
48 explicit DataFrame(mojo::Array<uint8_t> data);
50 // Returns the number of uncomsumed bytes remaining of this data frame.
51 uint32_t GetRemainingBytes();
53 // Returns a pointer to the remaining data to be consumed.
54 const char* GetData();
56 // Reports that |bytes_read| bytes have been consumed.
57 void OnDataConsumed(uint32_t bytes_read);
59 private:
60 mojo::Array<uint8_t> data_;
61 uint32_t offset_;
64 DataSinkReceiver::DataSinkReceiver(const ReadyCallback& ready_callback,
65 const CancelCallback& cancel_callback,
66 const ErrorCallback& error_callback)
67 : ready_callback_(ready_callback),
68 cancel_callback_(cancel_callback),
69 error_callback_(error_callback),
70 flush_pending_(false),
71 buffer_in_use_(NULL),
72 initialized_(false),
73 available_buffer_capacity_(0),
74 shut_down_(false),
75 weak_factory_(this) {
78 void DataSinkReceiver::ShutDown() {
79 shut_down_ = true;
82 DataSinkReceiver::~DataSinkReceiver() {
85 void DataSinkReceiver::Init(uint32_t buffer_size) {
86 if (initialized_) {
87 ShutDown();
88 return;
90 initialized_ = true;
91 available_buffer_capacity_ = buffer_size;
94 void DataSinkReceiver::Cancel(int32_t error) {
95 // If we have sent a ReportBytesSentAndError but have not received the
96 // response, that ReportBytesSentAndError message will appear to the
97 // DataSinkClient to be caused by this Cancel message. In that case, we ignore
98 // the cancel.
99 if (flush_pending_)
100 return;
102 // If there is a buffer is in use, mark the buffer as cancelled and notify the
103 // client by calling |cancel_callback_|. The sink implementation may or may
104 // not take the cancellation into account when deciding what error (if any) to
105 // return. If the sink returns an error, we ignore the cancellation error.
106 // Otherwise, if the sink does not report an error, we override that with the
107 // cancellation error. Once a cancellation has been received, the next report
108 // sent to the client will always contain an error; the error returned by the
109 // sink or the cancellation error if the sink does not return an error.
110 if (buffer_in_use_) {
111 buffer_in_use_->Cancel(error);
112 if (!cancel_callback_.is_null())
113 cancel_callback_.Run(error);
114 return;
116 ReportBytesSentAndError(0, error);
119 void DataSinkReceiver::OnData(mojo::Array<uint8_t> data) {
120 if (!initialized_) {
121 ShutDown();
122 return;
124 if (data.size() > available_buffer_capacity_) {
125 ShutDown();
126 return;
128 available_buffer_capacity_ -= static_cast<uint32_t>(data.size());
129 pending_data_buffers_.push(linked_ptr<DataFrame>(new DataFrame(data.Pass())));
130 if (!buffer_in_use_ && !flush_pending_)
131 RunReadyCallback();
134 void DataSinkReceiver::OnConnectionError() {
135 DispatchFatalError();
138 void DataSinkReceiver::RunReadyCallback() {
139 DCHECK(!shut_down_ && !flush_pending_);
140 // If data arrives while a call to RunReadyCallback() is posted, we can be
141 // called with buffer_in_use_ already set.
142 if (buffer_in_use_)
143 return;
144 buffer_in_use_ =
145 new Buffer(this,
146 pending_data_buffers_.front()->GetData(),
147 pending_data_buffers_.front()->GetRemainingBytes());
148 ready_callback_.Run(scoped_ptr<ReadOnlyBuffer>(buffer_in_use_));
151 void DataSinkReceiver::Done(uint32_t bytes_read) {
152 if (!DoneInternal(bytes_read))
153 return;
154 client()->ReportBytesSent(bytes_read);
155 if (!pending_data_buffers_.empty()) {
156 base::MessageLoop::current()->PostTask(
157 FROM_HERE,
158 base::Bind(&DataSinkReceiver::RunReadyCallback,
159 weak_factory_.GetWeakPtr()));
163 void DataSinkReceiver::DoneWithError(uint32_t bytes_read, int32_t error) {
164 if (!DoneInternal(bytes_read))
165 return;
166 ReportBytesSentAndError(bytes_read, error);
169 bool DataSinkReceiver::DoneInternal(uint32_t bytes_read) {
170 if (shut_down_)
171 return false;
173 DCHECK(buffer_in_use_);
174 buffer_in_use_ = NULL;
175 available_buffer_capacity_ += bytes_read;
176 pending_data_buffers_.front()->OnDataConsumed(bytes_read);
177 if (pending_data_buffers_.front()->GetRemainingBytes() == 0)
178 pending_data_buffers_.pop();
179 return true;
182 void DataSinkReceiver::ReportBytesSentAndError(uint32_t bytes_read,
183 int32_t error) {
184 // When we encounter an error, we must discard the data from any send buffers
185 // transmitted by the DataSinkClient before it receives this error.
186 flush_pending_ = true;
187 client()->ReportBytesSentAndError(
188 bytes_read,
189 error,
190 base::Bind(&DataSinkReceiver::DoFlush, weak_factory_.GetWeakPtr()));
193 void DataSinkReceiver::DoFlush() {
194 DCHECK(flush_pending_);
195 flush_pending_ = false;
196 while (!pending_data_buffers_.empty()) {
197 available_buffer_capacity_ +=
198 pending_data_buffers_.front()->GetRemainingBytes();
199 pending_data_buffers_.pop();
203 void DataSinkReceiver::DispatchFatalError() {
204 if (shut_down_)
205 return;
207 ShutDown();
208 if (!error_callback_.is_null())
209 error_callback_.Run();
212 DataSinkReceiver::Buffer::Buffer(scoped_refptr<DataSinkReceiver> receiver,
213 const char* buffer,
214 uint32_t buffer_size)
215 : receiver_(receiver),
216 buffer_(buffer),
217 buffer_size_(buffer_size),
218 cancelled_(false),
219 cancellation_error_(0) {
222 DataSinkReceiver::Buffer::~Buffer() {
223 if (!receiver_.get())
224 return;
225 if (cancelled_)
226 receiver_->DoneWithError(0, cancellation_error_);
227 else
228 receiver_->Done(0);
231 void DataSinkReceiver::Buffer::Cancel(int32_t error) {
232 cancelled_ = true;
233 cancellation_error_ = error;
236 const char* DataSinkReceiver::Buffer::GetData() {
237 return buffer_;
240 uint32_t DataSinkReceiver::Buffer::GetSize() {
241 return buffer_size_;
244 void DataSinkReceiver::Buffer::Done(uint32_t bytes_read) {
245 scoped_refptr<DataSinkReceiver> receiver = receiver_;
246 receiver_ = nullptr;
247 if (cancelled_)
248 receiver->DoneWithError(bytes_read, cancellation_error_);
249 else
250 receiver->Done(bytes_read);
251 buffer_ = NULL;
252 buffer_size_ = 0;
255 void DataSinkReceiver::Buffer::DoneWithError(uint32_t bytes_read,
256 int32_t error) {
257 scoped_refptr<DataSinkReceiver> receiver = receiver_;
258 receiver_ = nullptr;
259 receiver->DoneWithError(bytes_read, error);
260 buffer_ = NULL;
261 buffer_size_ = 0;
264 DataSinkReceiver::DataFrame::DataFrame(mojo::Array<uint8_t> data)
265 : data_(data.Pass()), offset_(0) {
266 DCHECK_LT(0u, data_.size());
269 // Returns the number of uncomsumed bytes remaining of this data frame.
270 uint32_t DataSinkReceiver::DataFrame::GetRemainingBytes() {
271 return static_cast<uint32_t>(data_.size() - offset_);
274 // Returns a pointer to the remaining data to be consumed.
275 const char* DataSinkReceiver::DataFrame::GetData() {
276 DCHECK_LT(offset_, data_.size());
277 return reinterpret_cast<const char*>(&data_[0]) + offset_;
280 void DataSinkReceiver::DataFrame::OnDataConsumed(uint32_t bytes_read) {
281 offset_ += bytes_read;
282 DCHECK_LE(offset_, data_.size());
285 } // namespace device