[Storage] Blob Storage Refactoring pt 1:
[chromium-blink-merge.git] / device / serial / data_sender.cc
blob18b1a43ef3357543a32a99bd16d95843262e0a35
1 // Copyright 2014 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
5 #include "device/serial/data_sender.h"
7 #include <algorithm>
9 #include "base/bind.h"
10 #include "base/message_loop/message_loop.h"
12 namespace device {
14 // Represents a send that is not yet fulfilled.
// Tracks how many bytes of |data_| have been handed to the DataSink
// (|bytes_sent_|) and how many of those have been acknowledged
// (|bytes_acked_|). Callbacks are not run directly; they are posted as tasks
// to the current base::MessageLoop.
15 class DataSender::PendingSend {
16 public:
// NOTE(review): |data| is kept as a base::StringPiece; presumably the bytes it
// refers to are not copied and must outlive this object - confirm with callers.
17 PendingSend(const base::StringPiece& data,
18 const DataSentCallback& callback,
19 const SendErrorCallback& error_callback,
20 int32_t fatal_error_value);
22 // Invoked to report that |num_bytes| of data have been sent. Subtracts the
23 // number of bytes that were part of this send from |num_bytes|. Returns
24 // whether this send has been completed. If this send has been completed, this
25 // posts |callback_| with the number of bytes acked.
26 bool ReportBytesSent(uint32_t* num_bytes);
28 // Invoked to report that |num_bytes| of data have been sent and then an
29 // error, |error| was encountered. Subtracts the number of bytes that were
30 // part of this send from |num_bytes|. If no bytes are left in |num_bytes|
31 // afterwards, this posts |error_callback_| with the bytes acked and |error|.
32 // Otherwise, this posts |callback_|. Returns the number of bytes sent but not
// acked (0 when |callback_| was posted).
33 uint32_t ReportBytesSentAndError(uint32_t* num_bytes, int32_t error);
35 // Posts |error_callback_| with 0 bytes sent and |fatal_error_value_|.
36 void DispatchFatalError();
38 // Attempts to send any data not yet sent to |sink|.
// Sends at most |*available_buffer_size| bytes and reduces
// |*available_buffer_size| by the number of bytes sent. Returns true once all
// of |data_| has been handed to |sink|.
39 bool SendData(serial::DataSink* sink, uint32_t* available_buffer_size);
41 private:
42 // Invoked to update |bytes_acked_| and |num_bytes|.
// |bytes_acked_| grows by |*num_bytes| but never beyond |bytes_sent_|; on
// return |*num_bytes| holds whatever could not be attributed to this send.
43 void ReportBytesSentInternal(uint32_t* num_bytes);
45 // The data to send.
46 const base::StringPiece data_;
48 // The callback to report success.
49 const DataSentCallback callback_;
51 // The callback to report errors.
52 const SendErrorCallback error_callback_;
54 // The error value to report when DispatchFatalError() is called.
55 const int32_t fatal_error_value_;
57 // The number of bytes sent to the DataSink.
58 uint32_t bytes_sent_;
60 // The number of bytes acked.
61 uint32_t bytes_acked_;
64 DataSender::DataSender(mojo::InterfacePtr<serial::DataSink> sink,
65 uint32_t buffer_size,
66 int32_t fatal_error_value)
67 : sink_(sink.Pass()),
68 fatal_error_value_(fatal_error_value),
69 available_buffer_capacity_(buffer_size),
70 shut_down_(false) {
71 sink_.set_error_handler(this);
72 sink_.set_client(this);
73 sink_->Init(buffer_size);
76 DataSender::~DataSender() {
77 ShutDown();
80 bool DataSender::Send(const base::StringPiece& data,
81 const DataSentCallback& callback,
82 const SendErrorCallback& error_callback) {
83 DCHECK(!callback.is_null() && !error_callback.is_null());
84 if (!pending_cancel_.is_null() || shut_down_)
85 return false;
87 pending_sends_.push(linked_ptr<PendingSend>(
88 new PendingSend(data, callback, error_callback, fatal_error_value_)));
89 SendInternal();
90 return true;
93 bool DataSender::Cancel(int32_t error, const CancelCallback& callback) {
94 DCHECK(!callback.is_null());
95 if (!pending_cancel_.is_null() || shut_down_)
96 return false;
97 if (pending_sends_.empty() && sends_awaiting_ack_.empty()) {
98 base::MessageLoop::current()->PostTask(FROM_HERE, callback);
99 return true;
102 pending_cancel_ = callback;
103 sink_->Cancel(error);
104 return true;
107 void DataSender::ReportBytesSent(uint32_t bytes_sent) {
108 if (shut_down_)
109 return;
111 available_buffer_capacity_ += bytes_sent;
112 while (bytes_sent != 0 && !sends_awaiting_ack_.empty() &&
113 sends_awaiting_ack_.front()->ReportBytesSent(&bytes_sent)) {
114 sends_awaiting_ack_.pop();
116 if (bytes_sent > 0 && !pending_sends_.empty()) {
117 bool finished = pending_sends_.front()->ReportBytesSent(&bytes_sent);
118 DCHECK(!finished);
119 if (finished) {
120 ShutDown();
121 return;
124 if (bytes_sent != 0) {
125 ShutDown();
126 return;
128 if (pending_sends_.empty() && sends_awaiting_ack_.empty())
129 RunCancelCallback();
130 SendInternal();
133 void DataSender::ReportBytesSentAndError(
134 uint32_t bytes_sent,
135 int32_t error,
136 const mojo::Callback<void()>& callback) {
137 if (shut_down_)
138 return;
140 available_buffer_capacity_ += bytes_sent;
141 while (!sends_awaiting_ack_.empty()) {
142 available_buffer_capacity_ +=
143 sends_awaiting_ack_.front()->ReportBytesSentAndError(&bytes_sent,
144 error);
145 sends_awaiting_ack_.pop();
147 while (!pending_sends_.empty()) {
148 available_buffer_capacity_ +=
149 pending_sends_.front()->ReportBytesSentAndError(&bytes_sent, error);
150 pending_sends_.pop();
152 callback.Run();
153 RunCancelCallback();
156 void DataSender::OnConnectionError() {
157 ShutDown();
160 void DataSender::SendInternal() {
161 while (!pending_sends_.empty() && available_buffer_capacity_) {
162 if (pending_sends_.front()->SendData(sink_.get(),
163 &available_buffer_capacity_)) {
164 sends_awaiting_ack_.push(pending_sends_.front());
165 pending_sends_.pop();
170 void DataSender::RunCancelCallback() {
171 DCHECK(pending_sends_.empty() && sends_awaiting_ack_.empty());
172 if (pending_cancel_.is_null())
173 return;
175 base::MessageLoop::current()->PostTask(FROM_HERE,
176 base::Bind(pending_cancel_));
177 pending_cancel_.Reset();
180 void DataSender::ShutDown() {
181 shut_down_ = true;
182 while (!pending_sends_.empty()) {
183 pending_sends_.front()->DispatchFatalError();
184 pending_sends_.pop();
186 while (!sends_awaiting_ack_.empty()) {
187 sends_awaiting_ack_.front()->DispatchFatalError();
188 sends_awaiting_ack_.pop();
190 RunCancelCallback();
193 DataSender::PendingSend::PendingSend(const base::StringPiece& data,
194 const DataSentCallback& callback,
195 const SendErrorCallback& error_callback,
196 int32_t fatal_error_value)
197 : data_(data),
198 callback_(callback),
199 error_callback_(error_callback),
200 fatal_error_value_(fatal_error_value),
201 bytes_sent_(0),
202 bytes_acked_(0) {
205 bool DataSender::PendingSend::ReportBytesSent(uint32_t* num_bytes) {
206 ReportBytesSentInternal(num_bytes);
207 if (bytes_acked_ < data_.size())
208 return false;
210 base::MessageLoop::current()->PostTask(FROM_HERE,
211 base::Bind(callback_, bytes_acked_));
212 return true;
215 uint32_t DataSender::PendingSend::ReportBytesSentAndError(uint32_t* num_bytes,
216 int32_t error) {
217 ReportBytesSentInternal(num_bytes);
218 if (*num_bytes > 0) {
219 base::MessageLoop::current()->PostTask(FROM_HERE,
220 base::Bind(callback_, bytes_acked_));
221 return 0;
223 base::MessageLoop::current()->PostTask(
224 FROM_HERE, base::Bind(error_callback_, bytes_acked_, error));
225 return bytes_sent_ - bytes_acked_;
228 void DataSender::PendingSend::DispatchFatalError() {
229 base::MessageLoop::current()->PostTask(
230 FROM_HERE, base::Bind(error_callback_, 0, fatal_error_value_));
233 bool DataSender::PendingSend::SendData(serial::DataSink* sink,
234 uint32_t* available_buffer_size) {
235 uint32_t num_bytes_to_send =
236 std::min(static_cast<uint32_t>(data_.size() - bytes_sent_),
237 *available_buffer_size);
238 mojo::Array<uint8_t> bytes(num_bytes_to_send);
239 memcpy(&bytes[0], data_.data() + bytes_sent_, num_bytes_to_send);
240 bytes_sent_ += num_bytes_to_send;
241 *available_buffer_size -= num_bytes_to_send;
242 sink->OnData(bytes.Pass());
243 return bytes_sent_ == data_.size();
246 void DataSender::PendingSend::ReportBytesSentInternal(uint32_t* num_bytes) {
247 bytes_acked_ += *num_bytes;
248 if (bytes_acked_ > bytes_sent_) {
249 *num_bytes = bytes_acked_ - bytes_sent_;
250 bytes_acked_ = bytes_sent_;
251 } else {
252 *num_bytes = 0;
256 } // namespace device