1 // Copyright 2014 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
5 #include "device/serial/data_sink_receiver.h"
10 #include "base/message_loop/message_loop.h"
14 // A ReadOnlyBuffer implementation that provides a view of a buffer owned by a
16 class DataSinkReceiver::Buffer
: public ReadOnlyBuffer
{
18 Buffer(scoped_refptr
<DataSinkReceiver
> receiver
,
20 uint32_t buffer_size
);
23 void Cancel(int32_t error
);
25 // ReadOnlyBuffer overrides.
26 const char* GetData() override
;
27 uint32_t GetSize() override
;
28 void Done(uint32_t bytes_read
) override
;
29 void DoneWithError(uint32_t bytes_read
, int32_t error
) override
;
32 // The DataSinkReceiver of whose buffer we are providing a view.
33 scoped_refptr
<DataSinkReceiver
> receiver_
;
36 uint32_t buffer_size_
;
38 // Whether this receive has been cancelled.
41 // If |cancelled_|, contains the cancellation error to report.
42 int32_t cancellation_error_
;
45 // A frame of data received from the client.
46 class DataSinkReceiver::DataFrame
{
48 explicit DataFrame(mojo::Array
<uint8_t> data
,
49 const mojo::Callback
<void(uint32_t, int32_t)>& callback
);
51 // Returns the number of unconsumed bytes remaining of this data frame.
52 uint32_t GetRemainingBytes();
54 // Returns a pointer to the remaining data to be consumed.
55 const char* GetData();
57 // Reports that |bytes_read| bytes have been consumed.
58 void OnDataConsumed(uint32_t bytes_read
);
60 // Reports that an error occurred.
61 void ReportError(uint32_t bytes_read
, int32_t error
);
64 mojo::Array
<uint8_t> data_
;
66 const mojo::Callback
<void(uint32_t, int32_t)> callback_
;
69 DataSinkReceiver::DataSinkReceiver(
70 mojo::InterfaceRequest
<serial::DataSink
> request
,
71 const ReadyCallback
& ready_callback
,
72 const CancelCallback
& cancel_callback
,
73 const ErrorCallback
& error_callback
)
74 : binding_(this, request
.Pass()),
75 ready_callback_(ready_callback
),
76 cancel_callback_(cancel_callback
),
77 error_callback_(error_callback
),
82 binding_
.set_error_handler(this);
85 void DataSinkReceiver::ShutDown() {
89 DataSinkReceiver::~DataSinkReceiver() {
92 void DataSinkReceiver::Cancel(int32_t error
) {
93 // If we have sent a ReportBytesSentAndError but have not received the
94 // response, that ReportBytesSentAndError message will appear to the
95 // DataSinkClient to be caused by this Cancel message. In that case, we ignore
100 // If there is a buffer is in use, mark the buffer as cancelled and notify the
101 // client by calling |cancel_callback_|. The sink implementation may or may
102 // not take the cancellation into account when deciding what error (if any) to
103 // return. If the sink returns an error, we ignore the cancellation error.
104 // Otherwise, if the sink does not report an error, we override that with the
105 // cancellation error. Once a cancellation has been received, the next report
106 // sent to the client will always contain an error; the error returned by the
107 // sink or the cancellation error if the sink does not return an error.
108 if (buffer_in_use_
) {
109 buffer_in_use_
->Cancel(error
);
110 if (!cancel_callback_
.is_null())
111 cancel_callback_
.Run(error
);
114 ReportError(0, error
);
117 void DataSinkReceiver::OnData(
118 mojo::Array
<uint8_t> data
,
119 const mojo::Callback
<void(uint32_t, int32_t)>& callback
) {
120 if (current_error_
) {
121 callback
.Run(0, current_error_
);
124 pending_data_buffers_
.push(
125 linked_ptr
<DataFrame
>(new DataFrame(data
.Pass(), callback
)));
130 void DataSinkReceiver::OnConnectionError() {
131 DispatchFatalError();
134 void DataSinkReceiver::RunReadyCallback() {
135 DCHECK(!shut_down_
&& !current_error_
);
136 // If data arrives while a call to RunReadyCallback() is posted, we can be
137 // called with buffer_in_use_ already set.
142 pending_data_buffers_
.front()->GetData(),
143 pending_data_buffers_
.front()->GetRemainingBytes());
144 ready_callback_
.Run(scoped_ptr
<ReadOnlyBuffer
>(buffer_in_use_
));
147 void DataSinkReceiver::Done(uint32_t bytes_read
) {
148 if (!DoneInternal(bytes_read
))
150 pending_data_buffers_
.front()->OnDataConsumed(bytes_read
);
151 if (pending_data_buffers_
.front()->GetRemainingBytes() == 0)
152 pending_data_buffers_
.pop();
153 if (!pending_data_buffers_
.empty()) {
154 base::MessageLoop::current()->PostTask(
156 base::Bind(&DataSinkReceiver::RunReadyCallback
,
157 weak_factory_
.GetWeakPtr()));
161 void DataSinkReceiver::DoneWithError(uint32_t bytes_read
, int32_t error
) {
162 if (!DoneInternal(bytes_read
))
164 ReportError(bytes_read
, error
);
167 bool DataSinkReceiver::DoneInternal(uint32_t bytes_read
) {
171 DCHECK(buffer_in_use_
);
172 buffer_in_use_
= NULL
;
176 void DataSinkReceiver::ReportError(uint32_t bytes_read
, int32_t error
) {
177 // When we encounter an error, we must discard the data from any send buffers
178 // transmitted by the DataSink client before it receives this error.
180 current_error_
= error
;
181 while (!pending_data_buffers_
.empty()) {
182 pending_data_buffers_
.front()->ReportError(bytes_read
, error
);
183 pending_data_buffers_
.pop();
188 void DataSinkReceiver::ClearError() {
192 void DataSinkReceiver::DispatchFatalError() {
197 if (!error_callback_
.is_null())
198 error_callback_
.Run();
201 DataSinkReceiver::Buffer::Buffer(scoped_refptr
<DataSinkReceiver
> receiver
,
203 uint32_t buffer_size
)
204 : receiver_(receiver
),
206 buffer_size_(buffer_size
),
208 cancellation_error_(0) {
211 DataSinkReceiver::Buffer::~Buffer() {
212 if (!receiver_
.get())
215 receiver_
->DoneWithError(0, cancellation_error_
);
220 void DataSinkReceiver::Buffer::Cancel(int32_t error
) {
222 cancellation_error_
= error
;
225 const char* DataSinkReceiver::Buffer::GetData() {
229 uint32_t DataSinkReceiver::Buffer::GetSize() {
233 void DataSinkReceiver::Buffer::Done(uint32_t bytes_read
) {
234 scoped_refptr
<DataSinkReceiver
> receiver
= receiver_
;
237 receiver
->DoneWithError(bytes_read
, cancellation_error_
);
239 receiver
->Done(bytes_read
);
244 void DataSinkReceiver::Buffer::DoneWithError(uint32_t bytes_read
,
246 scoped_refptr
<DataSinkReceiver
> receiver
= receiver_
;
248 receiver
->DoneWithError(bytes_read
, error
);
253 DataSinkReceiver::DataFrame::DataFrame(
254 mojo::Array
<uint8_t> data
,
255 const mojo::Callback
<void(uint32_t, int32_t)>& callback
)
256 : data_(data
.Pass()), offset_(0), callback_(callback
) {
257 DCHECK_LT(0u, data_
.size());
260 // Returns the number of uncomsumed bytes remaining of this data frame.
261 uint32_t DataSinkReceiver::DataFrame::GetRemainingBytes() {
262 return static_cast<uint32_t>(data_
.size() - offset_
);
265 // Returns a pointer to the remaining data to be consumed.
266 const char* DataSinkReceiver::DataFrame::GetData() {
267 DCHECK_LT(offset_
, data_
.size());
268 return reinterpret_cast<const char*>(&data_
[0]) + offset_
;
271 void DataSinkReceiver::DataFrame::OnDataConsumed(uint32_t bytes_read
) {
272 offset_
+= bytes_read
;
273 DCHECK_LE(offset_
, data_
.size());
274 if (offset_
== data_
.size())
275 callback_
.Run(offset_
, 0);
277 void DataSinkReceiver::DataFrame::ReportError(uint32_t bytes_read
,
279 offset_
+= bytes_read
;
280 DCHECK_LE(offset_
, data_
.size());
281 callback_
.Run(offset_
, error
);
284 } // namespace device