1 // Copyright 2014 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
5 #include "device/serial/data_receiver.h"
10 #include "base/message_loop/message_loop.h"
14 // Represents a receive that is not yet fulfilled.
// Bundles the user's data and error callbacks with the fatal error value and
// tracks whether a buffer handed to the user is still outstanding.
15 class DataReceiver::PendingReceive
{
// Stores |receiver|, the two callbacks and |fatal_error_value| for use when
// data, an error, or a fatal error is later dispatched.
17 PendingReceive(DataReceiver
* receiver
,
18 const ReceiveDataCallback
& callback
,
19 const ReceiveErrorCallback
& error_callback
,
20 int32_t fatal_error_value
);
22 // Dispatches |data| to |receive_callback_|. Returns whether this
23 // PendingReceive is finished by this call.
24 bool DispatchDataFrame(DataReceiver::DataFrame
* data
);
26 // Reports |fatal_error_value_| to |receive_error_callback_|.
27 void DispatchFatalError();
// Returns |buffer_in_use_|, which DispatchDataFrame() sets when it hands a
// buffer to the user and Done() clears.
29 bool buffer_in_use() { return buffer_in_use_
; }
34 // Invoked when the user is finished with the ReadOnlyBuffer provided to
35 // |receive_callback_|.
36 void Done(uint32_t num_bytes
);
38 // The DataReceiver that owns this.
39 DataReceiver
* receiver_
;
41 // The callback to dispatch data.
42 ReceiveDataCallback receive_callback_
;
44 // The callback to report errors.
45 ReceiveErrorCallback receive_error_callback_
;
47 // The error value to report when DispatchFatalError() is called.
48 const int32_t fatal_error_value_
;
50 // True if the user owns a buffer passed to |receive_callback_| as part of
51 // DispatchDataFrame().
55 // A ReadOnlyBuffer implementation that provides a view of a buffer owned by a
// DataReceiver.
57 class DataReceiver::PendingReceive::Buffer
: public ReadOnlyBuffer
{
// |pipe| is the DataReceiver whose buffer is viewed (held by reference count),
// |receive| is the PendingReceive to notify on completion, and |buffer_size|
// is the number of bytes exposed through this view.
59 Buffer(scoped_refptr
<DataReceiver
> pipe
,
60 PendingReceive
* receive
,
62 uint32_t buffer_size
);
65 // ReadOnlyBuffer overrides.
66 const char* GetData() override
;
67 uint32_t GetSize() override
;
68 void Done(uint32_t bytes_consumed
) override
;
69 void DoneWithError(uint32_t bytes_consumed
, int32_t error
) override
;
72 // The DataReceiver of whose buffer we are providing a view.
73 scoped_refptr
<DataReceiver
> receiver_
;
75 // The PendingReceive to which this buffer has been created in response.
// Cleared once Done() has been reported so the destructor does not report
// completion a second time.
76 PendingReceive
* pending_receive_
;
// The size, in bytes, of the viewed region.
79 uint32_t buffer_size_
;
82 // A buffer of data or an error received from the DataSource.
// Frames are queued in |pending_data_frames_| and consumed from the front.
83 struct DataReceiver::DataFrame
{
// Constructs a frame carrying received bytes.
84 explicit DataFrame(mojo::Array
<uint8_t> data
)
// Constructs a frame carrying |error|; it starts out not yet dispatched and
// with a zero read offset.
91 explicit DataFrame(int32_t error
)
92 : is_error(true), offset(0), error(error
), dispatched(false) {}
94 // Whether this DataFrame represents an error.
97 // The data received from the DataSource.
98 mojo::Array
<uint8_t> data
;
100 // The offset within |data| at which the next read should begin.
103 // The value of the error that occurred.
106 // Whether the error has been dispatched to the user.
// Takes ownership of the |source| and |client| endpoints, records
// |fatal_error_value| for later fatal-error reporting, and initializes the
// DataSource with |buffer_size|.
110 DataReceiver::DataReceiver(
111 mojo::InterfacePtr
<serial::DataSource
> source
,
112 mojo::InterfaceRequest
<serial::DataSourceClient
> client
,
113 uint32_t buffer_size
,
114 int32_t fatal_error_value
)
115 : source_(source
.Pass()),
116 client_(this, client
.Pass()),
117 fatal_error_value_(fatal_error_value
),
119 weak_factory_(this) {
// A connection error on either endpoint is routed to OnConnectionError().
// The handlers are bound with base::Unretained(this), so they hold a raw,
// non-owning pointer to this object.
120 source_
.set_connection_error_handler(
121 base::Bind(&DataReceiver::OnConnectionError
, base::Unretained(this)));
// Tell the DataSource the buffer size to use.
122 source_
->Init(buffer_size
);
123 client_
.set_connection_error_handler(
124 base::Bind(&DataReceiver::OnConnectionError
, base::Unretained(this)));
// Starts a new receive that reports data to |callback| and errors to
// |error_callback|. Both callbacks must be non-null. Only one receive may be
// pending at a time, and none may be started after shutdown.
127 bool DataReceiver::Receive(const ReceiveDataCallback
& callback
,
128 const ReceiveErrorCallback
& error_callback
) {
129 DCHECK(!callback
.is_null() && !error_callback
.is_null());
// A receive is already in progress or this receiver has been shut down.
130 if (pending_receive_
|| shut_down_
)
132 // When the DataSource encounters an error, it pauses transmission. When the
133 // user starts a new receive following notification of the error (via
134 // |error_callback| of the previous Receive call), we can tell the
135 // DataSource to resume transmission of data.
136 if (!pending_data_frames_
.empty() && pending_data_frames_
.front()->is_error
&&
137 pending_data_frames_
.front()->dispatched
) {
// The error at the front of the queue has already been reported; discard it.
139 pending_data_frames_
.pop();
// Record the new receive; it carries |fatal_error_value_| so it can report a
// fatal error on its own.
142 pending_receive_
.reset(
143 new PendingReceive(this, callback
, error_callback
, fatal_error_value_
));
// Destroys the DataReceiver.
148 DataReceiver::~DataReceiver() {
// Handles an error notification: queues |error| as an error DataFrame behind
// any frames that are already pending.
152 void DataReceiver::OnError(int32_t error
) {
156 pending_data_frames_
.push(linked_ptr
<DataFrame
>(new DataFrame(error
)));
// Further work is only needed if a receive is currently waiting.
157 if (pending_receive_
)
// Handles newly received bytes: moves |data| into a DataFrame and queues it
// behind any frames that are already pending.
161 void DataReceiver::OnData(mojo::Array
<uint8_t> data
) {
162 pending_data_frames_
.push(linked_ptr
<DataFrame
>(new DataFrame(data
.Pass())));
// Further work is only needed if a receive is currently waiting.
163 if (pending_receive_
)
// Connection error handler; the constructor registers it on both |source_|
// and |client_|.
167 void DataReceiver::OnConnectionError() {
// Called when the user has finished with the buffer for the current receive,
// having consumed |bytes_consumed| bytes of the frame at the front of the
// queue. Requires a pending receive.
171 void DataReceiver::Done(uint32_t bytes_consumed
) {
175 DCHECK(pending_receive_
);
176 DataFrame
& pending_data
= *pending_data_frames_
.front();
// Advance the read position; it must never run past the end of the frame.
177 pending_data
.offset
+= bytes_consumed
;
178 DCHECK_LE(pending_data
.offset
, pending_data
.data
.size());
// Once the whole frame has been consumed, report its full size back to the
// DataSource and remove it from the queue.
179 if (pending_data
.offset
== pending_data
.data
.size()) {
180 source_
->ReportBytesReceived(
181 static_cast<uint32_t>(pending_data
.data
.size()));
182 pending_data_frames_
.pop();
// Drop the pending receive.
184 pending_receive_
.reset();
// Tries to satisfy the pending receive from the front of the frame queue.
// Requires a pending receive.
187 void DataReceiver::ReceiveInternal() {
190 DCHECK(pending_receive_
);
// Checked first: the user may still hold a buffer from an earlier dispatch.
191 if (pending_receive_
->buffer_in_use())
// Dispatch the oldest frame, if any. DispatchDataFrame() returns true when
// the receive is finished by the dispatch, in which case it is dropped.
194 if (!pending_data_frames_
.empty() &&
195 pending_receive_
->DispatchDataFrame(pending_data_frames_
.front().get())) {
196 pending_receive_
.reset();
// Shuts the receiver down; a receive that is still pending is told about the
// fatal error.
200 void DataReceiver::ShutDown() {
202 if (pending_receive_
)
203 pending_receive_
->DispatchFatalError();
// Copies the callbacks, remembers the owning |receiver| and
// |fatal_error_value|, and starts with no buffer handed out to the user.
206 DataReceiver::PendingReceive::PendingReceive(
207 DataReceiver
* receiver
,
208 const ReceiveDataCallback
& callback
,
209 const ReceiveErrorCallback
& error_callback
,
210 int32_t fatal_error_value
)
211 : receiver_(receiver
),
212 receive_callback_(callback
),
213 receive_error_callback_(error_callback
),
214 fatal_error_value_(fatal_error_value
),
215 buffer_in_use_(false) {
// Delivers |data| to the user. Must not be called while a previously handed
// out buffer is still in use, nor for a frame already marked as dispatched.
218 bool DataReceiver::PendingReceive::DispatchDataFrame(
219 DataReceiver::DataFrame
* data
) {
220 DCHECK(!buffer_in_use_
);
221 DCHECK(!data
->dispatched
);
// Error frame: mark it as dispatched and post the error callback to the
// current message loop instead of running it inline.
223 if (data
->is_error
) {
224 data
->dispatched
= true;
225 base::MessageLoop::current()->PostTask(
226 FROM_HERE
, base::Bind(receive_error_callback_
, data
->error
));
// Data frame: note that the user now holds a buffer, then post a task that
// passes a Buffer covering the unread part of the frame, which starts at
// |data->offset| and runs to the end of |data->data|.
229 buffer_in_use_
= true;
230 base::MessageLoop::current()->PostTask(
234 base::Passed(scoped_ptr
<ReadOnlyBuffer
>(new Buffer(
237 reinterpret_cast<char*>(&data
->data
[0]) + data
->offset
,
238 static_cast<uint32_t>(data
->data
.size() - data
->offset
))))));
// Runs |receive_error_callback_| directly with |fatal_error_value_|. Unlike
// errors reported from DispatchDataFrame(), this is not posted as a task.
242 void DataReceiver::PendingReceive::DispatchFatalError() {
243 receive_error_callback_
.Run(fatal_error_value_
);
// Called when the user has finished with the buffer: clears the in-use flag
// and forwards |bytes_consumed| to the owning DataReceiver. Only valid while
// a buffer is in use.
246 void DataReceiver::PendingReceive::Done(uint32_t bytes_consumed
) {
247 DCHECK(buffer_in_use_
);
248 buffer_in_use_
= false;
249 receiver_
->Done(bytes_consumed
);
// Records the viewed |receiver|, the |receive| to notify when the user is
// done, and the size of the view.
252 DataReceiver::PendingReceive::Buffer::Buffer(
253 scoped_refptr
<DataReceiver
> receiver
,
254 PendingReceive
* receive
,
256 uint32_t buffer_size
)
257 : receiver_(receiver
),
258 pending_receive_(receive
),
260 buffer_size_(buffer_size
) {
// If the buffer is destroyed while |pending_receive_| is still set (Done()
// clears it), report completion with zero bytes consumed.
263 DataReceiver::PendingReceive::Buffer::~Buffer() {
264 if (pending_receive_
)
265 pending_receive_
->Done(0);
// ReadOnlyBuffer override. Presumably returns the start of the viewed data;
// confirm against the full source.
268 const char* DataReceiver::PendingReceive::Buffer::GetData() {
// ReadOnlyBuffer override. Presumably returns |buffer_size_|; confirm against
// the full source.
272 uint32_t DataReceiver::PendingReceive::Buffer::GetSize() {
// ReadOnlyBuffer override: reports |bytes_consumed| to the PendingReceive and
// then detaches from it, so the destructor will not report completion again.
// NOTE(review): |pending_receive_| is dereferenced without a null check, so a
// second call on the same Buffer would dereference NULL.
276 void DataReceiver::PendingReceive::Buffer::Done(uint32_t bytes_consumed
) {
277 pending_receive_
->Done(bytes_consumed
);
278 pending_receive_
= NULL
;
// ReadOnlyBuffer override: completes the buffer the same way as Done(),
// forwarding |bytes_consumed|.
284 void DataReceiver::PendingReceive::Buffer::DoneWithError(
285 uint32_t bytes_consumed
,
287 Done(bytes_consumed
);
290 } // namespace device