Add OWNERS to content/browser/quota
[chromium-blink-merge.git] / device / serial / data_receiver.cc
blob82d0153fe287dfcca74b0cdd025e7631c751e9f3
1 // Copyright 2014 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
5 #include "device/serial/data_receiver.h"
7 #include <limits>
9 #include "base/bind.h"
10 #include "base/message_loop/message_loop.h"
12 namespace device {
14 // Represents a receive that is not yet fulfilled.
15 class DataReceiver::PendingReceive {
16 public:
17 PendingReceive(DataReceiver* receiver,
18 const ReceiveDataCallback& callback,
19 const ReceiveErrorCallback& error_callback,
20 int32_t fatal_error_value);
22 // Dispatches |data| to |receive_callback_|. Returns whether this
23 // PendingReceive is finished by this call.
24 bool DispatchDataFrame(DataReceiver::DataFrame* data);
26 // Reports |fatal_error_value_| to |receive_error_callback_|.
27 void DispatchFatalError();
29 bool buffer_in_use() { return buffer_in_use_; }
31 private:
32 class Buffer;
34 // Invoked when the user is finished with the ReadOnlyBuffer provided to
35 // |receive_callback_|.
36 void Done(uint32_t num_bytes);
38 // The DataReceiver that owns this.
39 DataReceiver* receiver_;
41 // The callback to dispatch data.
42 ReceiveDataCallback receive_callback_;
44 // The callback to report errors.
45 ReceiveErrorCallback receive_error_callback_;
47 // The error value to report when DispatchFatalError() is called.
48 const int32_t fatal_error_value_;
50 // True if the user owns a buffer passed to |receive_callback_| as part of
51 // DispatchDataFrame().
52 bool buffer_in_use_;
55 // A ReadOnlyBuffer implementation that provides a view of a buffer owned by a
56 // DataReceiver.
57 class DataReceiver::PendingReceive::Buffer : public ReadOnlyBuffer {
58 public:
59 Buffer(scoped_refptr<DataReceiver> pipe,
60 PendingReceive* receive,
61 const char* buffer,
62 uint32_t buffer_size);
63 ~Buffer() override;
65 // ReadOnlyBuffer overrides.
66 const char* GetData() override;
67 uint32_t GetSize() override;
68 void Done(uint32_t bytes_consumed) override;
69 void DoneWithError(uint32_t bytes_consumed, int32_t error) override;
71 private:
72 // The DataReceiver of whose buffer we are providing a view.
73 scoped_refptr<DataReceiver> receiver_;
75 // The PendingReceive to which this buffer has been created in response.
76 PendingReceive* pending_receive_;
78 const char* buffer_;
79 uint32_t buffer_size_;
82 // A buffer of data or an error received from the DataSource.
83 struct DataReceiver::DataFrame {
84 explicit DataFrame(mojo::Array<uint8_t> data)
85 : is_error(false),
86 data(data.Pass()),
87 offset(0),
88 error(0),
89 dispatched(false) {}
91 explicit DataFrame(int32_t error)
92 : is_error(true), offset(0), error(error), dispatched(false) {}
94 // Whether this DataFrame represents an error.
95 bool is_error;
97 // The data received from the DataSource.
98 mojo::Array<uint8_t> data;
100 // The offset within |data| at which the next read should begin.
101 uint32_t offset;
103 // The value of the error that occurred.
104 const int32_t error;
106 // Whether the error has been dispatched to the user.
107 bool dispatched;
110 DataReceiver::DataReceiver(
111 mojo::InterfacePtr<serial::DataSource> source,
112 mojo::InterfaceRequest<serial::DataSourceClient> client,
113 uint32_t buffer_size,
114 int32_t fatal_error_value)
115 : source_(source.Pass()),
116 client_(this, client.Pass()),
117 fatal_error_value_(fatal_error_value),
118 shut_down_(false),
119 weak_factory_(this) {
120 source_.set_connection_error_handler(
121 base::Bind(&DataReceiver::OnConnectionError, base::Unretained(this)));
122 source_->Init(buffer_size);
123 client_.set_connection_error_handler(
124 base::Bind(&DataReceiver::OnConnectionError, base::Unretained(this)));
127 bool DataReceiver::Receive(const ReceiveDataCallback& callback,
128 const ReceiveErrorCallback& error_callback) {
129 DCHECK(!callback.is_null() && !error_callback.is_null());
130 if (pending_receive_ || shut_down_)
131 return false;
132 // When the DataSource encounters an error, it pauses transmission. When the
133 // user starts a new receive following notification of the error (via
134 // |error_callback| of the previous Receive call) of the error we can tell the
135 // DataSource to resume transmission of data.
136 if (!pending_data_frames_.empty() && pending_data_frames_.front()->is_error &&
137 pending_data_frames_.front()->dispatched) {
138 source_->Resume();
139 pending_data_frames_.pop();
142 pending_receive_.reset(
143 new PendingReceive(this, callback, error_callback, fatal_error_value_));
144 ReceiveInternal();
145 return true;
148 DataReceiver::~DataReceiver() {
149 ShutDown();
152 void DataReceiver::OnError(int32_t error) {
153 if (shut_down_)
154 return;
156 pending_data_frames_.push(linked_ptr<DataFrame>(new DataFrame(error)));
157 if (pending_receive_)
158 ReceiveInternal();
161 void DataReceiver::OnData(mojo::Array<uint8_t> data) {
162 pending_data_frames_.push(linked_ptr<DataFrame>(new DataFrame(data.Pass())));
163 if (pending_receive_)
164 ReceiveInternal();
167 void DataReceiver::OnConnectionError() {
168 ShutDown();
171 void DataReceiver::Done(uint32_t bytes_consumed) {
172 if (shut_down_)
173 return;
175 DCHECK(pending_receive_);
176 DataFrame& pending_data = *pending_data_frames_.front();
177 pending_data.offset += bytes_consumed;
178 DCHECK_LE(pending_data.offset, pending_data.data.size());
179 if (pending_data.offset == pending_data.data.size()) {
180 source_->ReportBytesReceived(
181 static_cast<uint32_t>(pending_data.data.size()));
182 pending_data_frames_.pop();
184 pending_receive_.reset();
187 void DataReceiver::ReceiveInternal() {
188 if (shut_down_)
189 return;
190 DCHECK(pending_receive_);
191 if (pending_receive_->buffer_in_use())
192 return;
194 if (!pending_data_frames_.empty() &&
195 pending_receive_->DispatchDataFrame(pending_data_frames_.front().get())) {
196 pending_receive_.reset();
200 void DataReceiver::ShutDown() {
201 shut_down_ = true;
202 if (pending_receive_)
203 pending_receive_->DispatchFatalError();
206 DataReceiver::PendingReceive::PendingReceive(
207 DataReceiver* receiver,
208 const ReceiveDataCallback& callback,
209 const ReceiveErrorCallback& error_callback,
210 int32_t fatal_error_value)
211 : receiver_(receiver),
212 receive_callback_(callback),
213 receive_error_callback_(error_callback),
214 fatal_error_value_(fatal_error_value),
215 buffer_in_use_(false) {
218 bool DataReceiver::PendingReceive::DispatchDataFrame(
219 DataReceiver::DataFrame* data) {
220 DCHECK(!buffer_in_use_);
221 DCHECK(!data->dispatched);
223 if (data->is_error) {
224 data->dispatched = true;
225 base::MessageLoop::current()->PostTask(
226 FROM_HERE, base::Bind(receive_error_callback_, data->error));
227 return true;
229 buffer_in_use_ = true;
230 base::MessageLoop::current()->PostTask(
231 FROM_HERE,
232 base::Bind(
233 receive_callback_,
234 base::Passed(scoped_ptr<ReadOnlyBuffer>(new Buffer(
235 receiver_,
236 this,
237 reinterpret_cast<char*>(&data->data[0]) + data->offset,
238 static_cast<uint32_t>(data->data.size() - data->offset))))));
239 return false;
242 void DataReceiver::PendingReceive::DispatchFatalError() {
243 receive_error_callback_.Run(fatal_error_value_);
246 void DataReceiver::PendingReceive::Done(uint32_t bytes_consumed) {
247 DCHECK(buffer_in_use_);
248 buffer_in_use_ = false;
249 receiver_->Done(bytes_consumed);
252 DataReceiver::PendingReceive::Buffer::Buffer(
253 scoped_refptr<DataReceiver> receiver,
254 PendingReceive* receive,
255 const char* buffer,
256 uint32_t buffer_size)
257 : receiver_(receiver),
258 pending_receive_(receive),
259 buffer_(buffer),
260 buffer_size_(buffer_size) {
263 DataReceiver::PendingReceive::Buffer::~Buffer() {
264 if (pending_receive_)
265 pending_receive_->Done(0);
268 const char* DataReceiver::PendingReceive::Buffer::GetData() {
269 return buffer_;
272 uint32_t DataReceiver::PendingReceive::Buffer::GetSize() {
273 return buffer_size_;
276 void DataReceiver::PendingReceive::Buffer::Done(uint32_t bytes_consumed) {
277 pending_receive_->Done(bytes_consumed);
278 pending_receive_ = NULL;
279 receiver_ = NULL;
280 buffer_ = NULL;
281 buffer_size_ = 0;
284 void DataReceiver::PendingReceive::Buffer::DoneWithError(
285 uint32_t bytes_consumed,
286 int32_t error) {
287 Done(bytes_consumed);
290 } // namespace device