1 // Copyright 2014 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
5 #include "device/serial/data_sender.h"
10 #include "base/message_loop/message_loop.h"
14 // Represents a send that is not yet fulfilled.
15 class DataSender::PendingSend
{
17 PendingSend(const base::StringPiece
& data
,
18 const DataSentCallback
& callback
,
19 const SendErrorCallback
& error_callback
,
20 int32_t fatal_error_value
);
22 // Invoked to report that |num_bytes| of data have been sent. Subtracts the
23 // number of bytes that were part of this send from |num_bytes|. Returns
24 // whether this send has been completed. If this send has been completed, this
26 bool ReportBytesSent(uint32_t* num_bytes
);
28 // Invoked to report that |num_bytes| of data have been sent and then an
29 // error, |error| was encountered. Subtracts the number of bytes that were
30 // part of this send from |num_bytes|. If this send was not completed before
31 // the error, this calls |error_callback_| to report the error. Otherwise,
32 // this calls |callback_|. Returns the number of bytes sent but not acked.
33 uint32_t ReportBytesSentAndError(uint32_t* num_bytes
, int32_t error
);
35 // Reports |fatal_error_value_| to |receive_error_callback_|.
36 void DispatchFatalError();
38 // Attempts to send any data not yet sent to |sink|.
39 bool SendData(serial::DataSink
* sink
, uint32_t* available_buffer_size
);
42 // Invoked to update |bytes_acked_| and |num_bytes|.
43 void ReportBytesSentInternal(uint32_t* num_bytes
);
46 const base::StringPiece data_
;
48 // The callback to report success.
49 const DataSentCallback callback_
;
51 // The callback to report errors.
52 const SendErrorCallback error_callback_
;
54 // The error value to report when DispatchFatalError() is called.
55 const int32_t fatal_error_value_
;
57 // The number of bytes sent to the DataSink.
60 // The number of bytes acked.
61 uint32_t bytes_acked_
;
64 DataSender::DataSender(mojo::InterfacePtr
<serial::DataSink
> sink
,
66 int32_t fatal_error_value
)
68 fatal_error_value_(fatal_error_value
),
69 available_buffer_capacity_(buffer_size
),
71 sink_
.set_error_handler(this);
72 sink_
.set_client(this);
73 sink_
->Init(buffer_size
);
76 DataSender::~DataSender() {
80 bool DataSender::Send(const base::StringPiece
& data
,
81 const DataSentCallback
& callback
,
82 const SendErrorCallback
& error_callback
) {
83 DCHECK(!callback
.is_null() && !error_callback
.is_null());
84 if (!pending_cancel_
.is_null() || shut_down_
)
87 pending_sends_
.push(linked_ptr
<PendingSend
>(
88 new PendingSend(data
, callback
, error_callback
, fatal_error_value_
)));
93 bool DataSender::Cancel(int32_t error
, const CancelCallback
& callback
) {
94 DCHECK(!callback
.is_null());
95 if (!pending_cancel_
.is_null() || shut_down_
)
97 if (pending_sends_
.empty() && sends_awaiting_ack_
.empty()) {
98 base::MessageLoop::current()->PostTask(FROM_HERE
, callback
);
102 pending_cancel_
= callback
;
103 sink_
->Cancel(error
);
107 void DataSender::ReportBytesSent(uint32_t bytes_sent
) {
111 available_buffer_capacity_
+= bytes_sent
;
112 while (bytes_sent
!= 0 && !sends_awaiting_ack_
.empty() &&
113 sends_awaiting_ack_
.front()->ReportBytesSent(&bytes_sent
)) {
114 sends_awaiting_ack_
.pop();
116 if (bytes_sent
> 0 && !pending_sends_
.empty()) {
117 bool finished
= pending_sends_
.front()->ReportBytesSent(&bytes_sent
);
124 if (bytes_sent
!= 0) {
128 if (pending_sends_
.empty() && sends_awaiting_ack_
.empty())
133 void DataSender::ReportBytesSentAndError(
136 const mojo::Callback
<void()>& callback
) {
140 available_buffer_capacity_
+= bytes_sent
;
141 while (!sends_awaiting_ack_
.empty()) {
142 available_buffer_capacity_
+=
143 sends_awaiting_ack_
.front()->ReportBytesSentAndError(&bytes_sent
,
145 sends_awaiting_ack_
.pop();
147 while (!pending_sends_
.empty()) {
148 available_buffer_capacity_
+=
149 pending_sends_
.front()->ReportBytesSentAndError(&bytes_sent
, error
);
150 pending_sends_
.pop();
156 void DataSender::OnConnectionError() {
160 void DataSender::SendInternal() {
161 while (!pending_sends_
.empty() && available_buffer_capacity_
) {
162 if (pending_sends_
.front()->SendData(sink_
.get(),
163 &available_buffer_capacity_
)) {
164 sends_awaiting_ack_
.push(pending_sends_
.front());
165 pending_sends_
.pop();
170 void DataSender::RunCancelCallback() {
171 DCHECK(pending_sends_
.empty() && sends_awaiting_ack_
.empty());
172 if (pending_cancel_
.is_null())
175 base::MessageLoop::current()->PostTask(FROM_HERE
,
176 base::Bind(pending_cancel_
));
177 pending_cancel_
.Reset();
180 void DataSender::ShutDown() {
182 while (!pending_sends_
.empty()) {
183 pending_sends_
.front()->DispatchFatalError();
184 pending_sends_
.pop();
186 while (!sends_awaiting_ack_
.empty()) {
187 sends_awaiting_ack_
.front()->DispatchFatalError();
188 sends_awaiting_ack_
.pop();
193 DataSender::PendingSend::PendingSend(const base::StringPiece
& data
,
194 const DataSentCallback
& callback
,
195 const SendErrorCallback
& error_callback
,
196 int32_t fatal_error_value
)
199 error_callback_(error_callback
),
200 fatal_error_value_(fatal_error_value
),
205 bool DataSender::PendingSend::ReportBytesSent(uint32_t* num_bytes
) {
206 ReportBytesSentInternal(num_bytes
);
207 if (bytes_acked_
< data_
.size())
210 base::MessageLoop::current()->PostTask(FROM_HERE
,
211 base::Bind(callback_
, bytes_acked_
));
215 uint32_t DataSender::PendingSend::ReportBytesSentAndError(uint32_t* num_bytes
,
217 ReportBytesSentInternal(num_bytes
);
218 if (*num_bytes
> 0) {
219 base::MessageLoop::current()->PostTask(FROM_HERE
,
220 base::Bind(callback_
, bytes_acked_
));
223 base::MessageLoop::current()->PostTask(
224 FROM_HERE
, base::Bind(error_callback_
, bytes_acked_
, error
));
225 return bytes_sent_
- bytes_acked_
;
228 void DataSender::PendingSend::DispatchFatalError() {
229 base::MessageLoop::current()->PostTask(
230 FROM_HERE
, base::Bind(error_callback_
, 0, fatal_error_value_
));
233 bool DataSender::PendingSend::SendData(serial::DataSink
* sink
,
234 uint32_t* available_buffer_size
) {
235 uint32_t num_bytes_to_send
=
236 std::min(static_cast<uint32_t>(data_
.size() - bytes_sent_
),
237 *available_buffer_size
);
238 mojo::Array
<uint8_t> bytes(num_bytes_to_send
);
239 memcpy(&bytes
[0], data_
.data() + bytes_sent_
, num_bytes_to_send
);
240 bytes_sent_
+= num_bytes_to_send
;
241 *available_buffer_size
-= num_bytes_to_send
;
242 sink
->OnData(bytes
.Pass());
243 return bytes_sent_
== data_
.size();
246 void DataSender::PendingSend::ReportBytesSentInternal(uint32_t* num_bytes
) {
247 bytes_acked_
+= *num_bytes
;
248 if (bytes_acked_
> bytes_sent_
) {
249 *num_bytes
= bytes_acked_
- bytes_sent_
;
250 bytes_acked_
= bytes_sent_
;
256 } // namespace device