1 // Copyright 2014 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
5 #include "device/serial/data_receiver.h"
10 #include "base/message_loop/message_loop.h"
11 #include "device/serial/async_waiter.h"
15 // Represents a receive that is not yet fulfilled.
16 class DataReceiver::PendingReceive
{
18 PendingReceive(DataReceiver
* receiver
,
19 const ReceiveDataCallback
& callback
,
20 const ReceiveErrorCallback
& error_callback
,
21 int32_t fatal_error_value
);
23 // Dispatches |data| to |receive_callback_|.
24 void DispatchData(const void* data
, uint32_t num_bytes
);
26 // Reports |error| to |receive_error_callback_| if it is an appropriate time.
27 // Returns whether it dispatched |error|.
28 bool DispatchError(DataReceiver::PendingError
* error
,
29 uint32_t bytes_received
);
31 // Reports |fatal_error_value_| to |receive_error_callback_|.
32 void DispatchFatalError();
37 // Invoked when the user is finished with the ReadOnlyBuffer provided to
38 // |receive_callback_|.
39 void Done(uint32_t num_bytes
);
41 // The DataReceiver that owns this.
42 DataReceiver
* receiver_
;
44 // The callback to dispatch data.
45 ReceiveDataCallback receive_callback_
;
47 // The callback to report errors.
48 ReceiveErrorCallback receive_error_callback_
;
50 // The error value to report when DispatchFatalError() is called.
51 const int32_t fatal_error_value_
;
53 // True if the user owns a buffer passed to |receive_callback_| as part of
58 // A ReadOnlyBuffer implementation that provides a view of a data pipe owned by
60 class DataReceiver::PendingReceive::Buffer
: public ReadOnlyBuffer
{
62 Buffer(scoped_refptr
<DataReceiver
> pipe
,
63 PendingReceive
* receive
,
65 uint32_t buffer_size
);
68 // ReadOnlyBuffer overrides.
69 const char* GetData() override
;
70 uint32_t GetSize() override
;
71 void Done(uint32_t bytes_consumed
) override
;
72 void DoneWithError(uint32_t bytes_consumed
, int32_t error
) override
;
75 // The DataReceiver whose data pipe we are providing a view of.
76 scoped_refptr
<DataReceiver
> receiver_
;
78 // The PendingReceive in response to which this buffer was created.
79 PendingReceive
* pending_receive_
;
82 uint32_t buffer_size_
;
85 // Represents an error received from the DataSource.
86 struct DataReceiver::PendingError
{
87 PendingError(uint32_t offset
, int32_t error
)
88 : offset(offset
), error(error
), dispatched(false) {}
90 // The location within the data stream where the error occurred.
91 const uint32_t offset
;
93 // The value of the error that occurred.
96 // Whether the error has been dispatched to the user.
100 DataReceiver::DataReceiver(mojo::InterfacePtr
<serial::DataSource
> source
,
101 uint32_t buffer_size
,
102 int32_t fatal_error_value
)
103 : source_(source
.Pass()),
104 fatal_error_value_(fatal_error_value
),
107 weak_factory_(this) {
108 MojoCreateDataPipeOptions options
= {
109 sizeof(options
), MOJO_CREATE_DATA_PIPE_OPTIONS_FLAG_NONE
, 1, buffer_size
,
111 mojo::ScopedDataPipeProducerHandle remote_handle
;
112 MojoResult result
= mojo::CreateDataPipe(&options
, &remote_handle
, &handle_
);
113 DCHECK_EQ(MOJO_RESULT_OK
, result
);
114 source_
->Init(remote_handle
.Pass());
115 source_
.set_client(this);
118 bool DataReceiver::Receive(const ReceiveDataCallback
& callback
,
119 const ReceiveErrorCallback
& error_callback
) {
120 DCHECK(!callback
.is_null() && !error_callback
.is_null());
121 if (pending_receive_
|| shut_down_
)
123 // When the DataSource encounters an error, it pauses transmission. Once the
124 // user starts a new receive after being notified of the error (via
125 // |error_callback| of the previous Receive call), we can tell the
126 // DataSource to resume transmission of data.
127 if (pending_error_
&& pending_error_
->dispatched
) {
129 pending_error_
.reset();
132 pending_receive_
.reset(
133 new PendingReceive(this, callback
, error_callback
, fatal_error_value_
));
134 base::MessageLoop::current()->PostTask(
136 base::Bind(&DataReceiver::ReceiveInternal
, weak_factory_
.GetWeakPtr()));
140 DataReceiver::~DataReceiver() {
144 void DataReceiver::OnError(uint32_t offset
, int32_t error
) {
148 if (pending_error_
) {
149 // When OnError is called by the DataSource, transmission of data is
150 // suspended. Thus we shouldn't receive another call to OnError until we
151 // have fully dealt with the error and called Resume to resume transmission
152 // (see Receive()). Under normal operation we should never get here, but if
153 // we do (e.g. in the case of a hijacked service process) just shut down.
157 pending_error_
.reset(new PendingError(offset
, error
));
158 if (pending_receive_
&&
159 pending_receive_
->DispatchError(pending_error_
.get(), bytes_received_
)) {
160 pending_receive_
.reset();
165 void DataReceiver::OnConnectionError() {
169 void DataReceiver::Done(uint32_t bytes_consumed
) {
173 DCHECK(pending_receive_
);
174 MojoResult result
= mojo::EndReadDataRaw(handle_
.get(), bytes_consumed
);
175 DCHECK_EQ(MOJO_RESULT_OK
, result
);
176 pending_receive_
.reset();
177 bytes_received_
+= bytes_consumed
;
180 void DataReceiver::OnDoneWaiting(MojoResult result
) {
181 DCHECK(pending_receive_
&& !shut_down_
&& waiter_
);
183 if (result
!= MOJO_RESULT_OK
) {
190 void DataReceiver::ReceiveInternal() {
193 DCHECK(pending_receive_
);
194 if (pending_error_
&&
195 pending_receive_
->DispatchError(pending_error_
.get(), bytes_received_
)) {
196 pending_receive_
.reset();
202 uint32_t num_bytes
= std::numeric_limits
<uint32_t>::max();
203 MojoResult result
= mojo::BeginReadDataRaw(
204 handle_
.get(), &data
, &num_bytes
, MOJO_READ_DATA_FLAG_NONE
);
205 if (result
== MOJO_RESULT_OK
) {
206 if (!CheckErrorNotInReadRange(num_bytes
)) {
211 pending_receive_
->DispatchData(data
, num_bytes
);
214 if (result
== MOJO_RESULT_SHOULD_WAIT
) {
215 waiter_
.reset(new AsyncWaiter(
217 MOJO_HANDLE_SIGNAL_READABLE
,
218 base::Bind(&DataReceiver::OnDoneWaiting
, weak_factory_
.GetWeakPtr())));
224 bool DataReceiver::CheckErrorNotInReadRange(uint32_t num_bytes
) {
225 DCHECK(pending_receive_
);
229 DCHECK_NE(bytes_received_
, pending_error_
->offset
);
230 DCHECK_NE(num_bytes
, 0u);
231 uint32_t potential_bytes_received
= bytes_received_
+ num_bytes
;
232 // bytes_received_ can overflow so we must consider two cases:
233 // 1. Both |bytes_received_| and |pending_error_->offset| have overflowed an
234 // equal number of times. In this case, |potential_bytes_received| must
235 // be in the range (|bytes_received_|, |pending_error_->offset|]. Below
236 // this range can only occur if |bytes_received_| overflows before
237 // |pending_error_->offset|. Above can only occur if |bytes_received_|
238 // overtakes |pending_error_->offset|.
239 // 2. |pending_error_->offset| has overflowed once more than
240 // |bytes_received_|. In this case, |potential_bytes_received| must not
241 // be in the range (|pending_error_->offset|, |bytes_received_|].
242 if ((bytes_received_
< pending_error_
->offset
&&
243 (potential_bytes_received
> pending_error_
->offset
||
244 potential_bytes_received
<= bytes_received_
)) ||
245 (bytes_received_
> pending_error_
->offset
&&
246 potential_bytes_received
> pending_error_
->offset
&&
247 potential_bytes_received
<= bytes_received_
)) {
253 void DataReceiver::ShutDown() {
255 if (pending_receive_
)
256 pending_receive_
->DispatchFatalError();
257 pending_error_
.reset();
261 DataReceiver::PendingReceive::PendingReceive(
262 DataReceiver
* receiver
,
263 const ReceiveDataCallback
& callback
,
264 const ReceiveErrorCallback
& error_callback
,
265 int32_t fatal_error_value
)
266 : receiver_(receiver
),
267 receive_callback_(callback
),
268 receive_error_callback_(error_callback
),
269 fatal_error_value_(fatal_error_value
),
270 buffer_in_use_(false) {
273 void DataReceiver::PendingReceive::DispatchData(const void* data
,
274 uint32_t num_bytes
) {
275 DCHECK(!buffer_in_use_
);
276 buffer_in_use_
= true;
277 receive_callback_
.Run(scoped_ptr
<ReadOnlyBuffer
>(
278 new Buffer(receiver_
, this, static_cast<const char*>(data
), num_bytes
)));
281 bool DataReceiver::PendingReceive::DispatchError(PendingError
* error
,
282 uint32_t bytes_received
) {
283 DCHECK(!error
->dispatched
);
284 if (buffer_in_use_
|| bytes_received
!= error
->offset
)
287 error
->dispatched
= true;
288 receive_error_callback_
.Run(error
->error
);
292 void DataReceiver::PendingReceive::DispatchFatalError() {
293 receive_error_callback_
.Run(fatal_error_value_
);
296 void DataReceiver::PendingReceive::Done(uint32_t bytes_consumed
) {
297 DCHECK(buffer_in_use_
);
298 buffer_in_use_
= false;
299 receiver_
->Done(bytes_consumed
);
302 DataReceiver::PendingReceive::Buffer::Buffer(
303 scoped_refptr
<DataReceiver
> receiver
,
304 PendingReceive
* receive
,
306 uint32_t buffer_size
)
307 : receiver_(receiver
),
308 pending_receive_(receive
),
310 buffer_size_(buffer_size
) {
313 DataReceiver::PendingReceive::Buffer::~Buffer() {
314 if (pending_receive_
)
315 pending_receive_
->Done(0);
318 const char* DataReceiver::PendingReceive::Buffer::GetData() {
322 uint32_t DataReceiver::PendingReceive::Buffer::GetSize() {
326 void DataReceiver::PendingReceive::Buffer::Done(uint32_t bytes_consumed
) {
327 pending_receive_
->Done(bytes_consumed
);
328 pending_receive_
= NULL
;
334 void DataReceiver::PendingReceive::Buffer::DoneWithError(
335 uint32_t bytes_consumed
,
337 Done(bytes_consumed
);
340 } // namespace device