Fix "#if defined(DEBUG)" statements
[chromium-blink-merge.git] / device / serial / data_receiver.cc
blobd7a47fa1bc8d640792bf5071e05d94e3b62082fa
1 // Copyright 2014 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
5 #include "device/serial/data_receiver.h"
7 #include <limits>
9 #include "base/bind.h"
10 #include "base/message_loop/message_loop.h"
11 #include "device/serial/async_waiter.h"
13 namespace device {
15 // Represents a receive that is not yet fulfilled.
16 class DataReceiver::PendingReceive {
17 public:
18 PendingReceive(DataReceiver* receiver,
19 const ReceiveDataCallback& callback,
20 const ReceiveErrorCallback& error_callback,
21 int32_t fatal_error_value);
23 // Dispatches |data| to |receive_callback_|.
24 void DispatchData(const void* data, uint32_t num_bytes);
26 // Reports |error| to |receive_error_callback_| if it is an appropriate time.
27 // Returns whether it dispatched |error|.
28 bool DispatchError(DataReceiver::PendingError* error,
29 uint32_t bytes_received);
31 // Reports |fatal_error_value_| to |receive_error_callback_|.
32 void DispatchFatalError();
34 private:
35 class Buffer;
37 // Invoked when the user is finished with the ReadOnlyBuffer provided to
38 // |receive_callback_|.
39 void Done(uint32_t num_bytes);
41 // The DataReceiver that owns this.
42 DataReceiver* receiver_;
44 // The callback to dispatch data.
45 ReceiveDataCallback receive_callback_;
47 // The callback to report errors.
48 ReceiveErrorCallback receive_error_callback_;
50 // The error value to report when DispatchFatalError() is called.
51 const int32_t fatal_error_value_;
53 // True if the user owns a buffer passed to |receive_callback_| as part of
54 // DispatchData().
55 bool buffer_in_use_;
58 // A ReadOnlyBuffer implementation that provides a view of a data pipe owned by
59 // a DataReceiver.
60 class DataReceiver::PendingReceive::Buffer : public ReadOnlyBuffer {
61 public:
62 Buffer(scoped_refptr<DataReceiver> pipe,
63 PendingReceive* receive,
64 const char* buffer,
65 uint32_t buffer_size);
66 ~Buffer() override;
68 // ReadOnlyBuffer overrides.
69 const char* GetData() override;
70 uint32_t GetSize() override;
71 void Done(uint32_t bytes_consumed) override;
72 void DoneWithError(uint32_t bytes_consumed, int32_t error) override;
74 private:
75 // The DataReceiver whose data pipe we are providing a view.
76 scoped_refptr<DataReceiver> receiver_;
78 // The PendingReceive to which this buffer has been created in response.
79 PendingReceive* pending_receive_;
81 const char* buffer_;
82 uint32_t buffer_size_;
85 // Represents an error received from the DataSource.
86 struct DataReceiver::PendingError {
87 PendingError(uint32_t offset, int32_t error)
88 : offset(offset), error(error), dispatched(false) {}
90 // The location within the data stream where the error occurred.
91 const uint32_t offset;
93 // The value of the error that occurred.
94 const int32_t error;
96 // Whether the error has been dispatched to the user.
97 bool dispatched;
100 DataReceiver::DataReceiver(mojo::InterfacePtr<serial::DataSource> source,
101 uint32_t buffer_size,
102 int32_t fatal_error_value)
103 : source_(source.Pass()),
104 fatal_error_value_(fatal_error_value),
105 bytes_received_(0),
106 shut_down_(false),
107 weak_factory_(this) {
108 MojoCreateDataPipeOptions options = {
109 sizeof(options), MOJO_CREATE_DATA_PIPE_OPTIONS_FLAG_NONE, 1, buffer_size,
111 mojo::ScopedDataPipeProducerHandle remote_handle;
112 MojoResult result = mojo::CreateDataPipe(&options, &remote_handle, &handle_);
113 DCHECK_EQ(MOJO_RESULT_OK, result);
114 source_->Init(remote_handle.Pass());
115 source_.set_client(this);
118 bool DataReceiver::Receive(const ReceiveDataCallback& callback,
119 const ReceiveErrorCallback& error_callback) {
120 DCHECK(!callback.is_null() && !error_callback.is_null());
121 if (pending_receive_ || shut_down_)
122 return false;
123 // When the DataSource encounters an error, it pauses transmission. When the
124 // user starts a new receive following notification of the error (via
125 // |error_callback| of the previous Receive call) of the error we can tell the
126 // DataSource to resume transmission of data.
127 if (pending_error_ && pending_error_->dispatched) {
128 source_->Resume();
129 pending_error_.reset();
132 pending_receive_.reset(
133 new PendingReceive(this, callback, error_callback, fatal_error_value_));
134 base::MessageLoop::current()->PostTask(
135 FROM_HERE,
136 base::Bind(&DataReceiver::ReceiveInternal, weak_factory_.GetWeakPtr()));
137 return true;
140 DataReceiver::~DataReceiver() {
141 ShutDown();
144 void DataReceiver::OnError(uint32_t offset, int32_t error) {
145 if (shut_down_)
146 return;
148 if (pending_error_) {
149 // When OnError is called by the DataSource, transmission of data is
150 // suspended. Thus we shouldn't receive another call to OnError until we
151 // have fully dealt with the error and called Resume to resume transmission
152 // (see Receive()). Under normal operation we should never get here, but if
153 // we do (e.g. in the case of a hijacked service process) just shut down.
154 ShutDown();
155 return;
157 pending_error_.reset(new PendingError(offset, error));
158 if (pending_receive_ &&
159 pending_receive_->DispatchError(pending_error_.get(), bytes_received_)) {
160 pending_receive_.reset();
161 waiter_.reset();
165 void DataReceiver::OnConnectionError() {
166 ShutDown();
169 void DataReceiver::Done(uint32_t bytes_consumed) {
170 if (shut_down_)
171 return;
173 DCHECK(pending_receive_);
174 MojoResult result = mojo::EndReadDataRaw(handle_.get(), bytes_consumed);
175 DCHECK_EQ(MOJO_RESULT_OK, result);
176 pending_receive_.reset();
177 bytes_received_ += bytes_consumed;
180 void DataReceiver::OnDoneWaiting(MojoResult result) {
181 DCHECK(pending_receive_ && !shut_down_ && waiter_);
182 waiter_.reset();
183 if (result != MOJO_RESULT_OK) {
184 ShutDown();
185 return;
187 ReceiveInternal();
190 void DataReceiver::ReceiveInternal() {
191 if (shut_down_)
192 return;
193 DCHECK(pending_receive_);
194 if (pending_error_ &&
195 pending_receive_->DispatchError(pending_error_.get(), bytes_received_)) {
196 pending_receive_.reset();
197 waiter_.reset();
198 return;
201 const void* data;
202 uint32_t num_bytes = std::numeric_limits<uint32_t>::max();
203 MojoResult result = mojo::BeginReadDataRaw(
204 handle_.get(), &data, &num_bytes, MOJO_READ_DATA_FLAG_NONE);
205 if (result == MOJO_RESULT_OK) {
206 if (!CheckErrorNotInReadRange(num_bytes)) {
207 ShutDown();
208 return;
211 pending_receive_->DispatchData(data, num_bytes);
212 return;
214 if (result == MOJO_RESULT_SHOULD_WAIT) {
215 waiter_.reset(new AsyncWaiter(
216 handle_.get(),
217 MOJO_HANDLE_SIGNAL_READABLE,
218 base::Bind(&DataReceiver::OnDoneWaiting, weak_factory_.GetWeakPtr())));
219 return;
221 ShutDown();
224 bool DataReceiver::CheckErrorNotInReadRange(uint32_t num_bytes) {
225 DCHECK(pending_receive_);
226 if (!pending_error_)
227 return true;
229 DCHECK_NE(bytes_received_, pending_error_->offset);
230 DCHECK_NE(num_bytes, 0u);
231 uint32_t potential_bytes_received = bytes_received_ + num_bytes;
232 // bytes_received_ can overflow so we must consider two cases:
233 // 1. Both |bytes_received_| and |pending_error_->offset| have overflowed an
234 // equal number of times. In this case, |potential_bytes_received| must
235 // be in the range (|bytes_received|, |pending_error_->offset|]. Below
236 // this range can only occur if |bytes_received_| overflows before
237 // |pending_error_->offset|. Above can only occur if |bytes_received_|
238 // overtakes |pending_error_->offset|.
239 // 2. |pending_error_->offset| has overflowed once more than
240 // |bytes_received_|. In this case, |potential_bytes_received| must not
241 // be in the range (|pending_error_->offset|, |bytes_received_|].
242 if ((bytes_received_ < pending_error_->offset &&
243 (potential_bytes_received > pending_error_->offset ||
244 potential_bytes_received <= bytes_received_)) ||
245 (bytes_received_ > pending_error_->offset &&
246 potential_bytes_received > pending_error_->offset &&
247 potential_bytes_received <= bytes_received_)) {
248 return false;
250 return true;
253 void DataReceiver::ShutDown() {
254 shut_down_ = true;
255 if (pending_receive_)
256 pending_receive_->DispatchFatalError();
257 pending_error_.reset();
258 waiter_.reset();
261 DataReceiver::PendingReceive::PendingReceive(
262 DataReceiver* receiver,
263 const ReceiveDataCallback& callback,
264 const ReceiveErrorCallback& error_callback,
265 int32_t fatal_error_value)
266 : receiver_(receiver),
267 receive_callback_(callback),
268 receive_error_callback_(error_callback),
269 fatal_error_value_(fatal_error_value),
270 buffer_in_use_(false) {
273 void DataReceiver::PendingReceive::DispatchData(const void* data,
274 uint32_t num_bytes) {
275 DCHECK(!buffer_in_use_);
276 buffer_in_use_ = true;
277 receive_callback_.Run(scoped_ptr<ReadOnlyBuffer>(
278 new Buffer(receiver_, this, static_cast<const char*>(data), num_bytes)));
281 bool DataReceiver::PendingReceive::DispatchError(PendingError* error,
282 uint32_t bytes_received) {
283 DCHECK(!error->dispatched);
284 if (buffer_in_use_ || bytes_received != error->offset)
285 return false;
287 error->dispatched = true;
288 receive_error_callback_.Run(error->error);
289 return true;
292 void DataReceiver::PendingReceive::DispatchFatalError() {
293 receive_error_callback_.Run(fatal_error_value_);
296 void DataReceiver::PendingReceive::Done(uint32_t bytes_consumed) {
297 DCHECK(buffer_in_use_);
298 buffer_in_use_ = false;
299 receiver_->Done(bytes_consumed);
302 DataReceiver::PendingReceive::Buffer::Buffer(
303 scoped_refptr<DataReceiver> receiver,
304 PendingReceive* receive,
305 const char* buffer,
306 uint32_t buffer_size)
307 : receiver_(receiver),
308 pending_receive_(receive),
309 buffer_(buffer),
310 buffer_size_(buffer_size) {
313 DataReceiver::PendingReceive::Buffer::~Buffer() {
314 if (pending_receive_)
315 pending_receive_->Done(0);
318 const char* DataReceiver::PendingReceive::Buffer::GetData() {
319 return buffer_;
322 uint32_t DataReceiver::PendingReceive::Buffer::GetSize() {
323 return buffer_size_;
326 void DataReceiver::PendingReceive::Buffer::Done(uint32_t bytes_consumed) {
327 pending_receive_->Done(bytes_consumed);
328 pending_receive_ = NULL;
329 receiver_ = NULL;
330 buffer_ = NULL;
331 buffer_size_ = 0;
334 void DataReceiver::PendingReceive::Buffer::DoneWithError(
335 uint32_t bytes_consumed,
336 int32_t error) {
337 Done(bytes_consumed);
340 } // namespace device