1 // Copyright 2014 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
5 #include "device/serial/data_source_sender.h"
10 #include "base/bind.h"
11 #include "base/message_loop/message_loop.h"
15 // Represents a send that is not yet fulfilled.
// GetData() resizes |data_| and hands a Buffer viewing it to |callback_|; the
// result comes back through Done() or DoneWithError(), which trim |data_| to
// the bytes written and forward it to |sender_|.
// NOTE(review): access specifiers and the closing of this class are not
// visible in this view of the file.
16 class DataSourceSender::PendingSend
{
// Stores |sender| and |callback|; the buffer starts out not in use.
18 PendingSend(DataSourceSender
* sender
, const ReadyCallback
& callback
);
20 // Asynchronously fills |data_| with up to |num_bytes| of data. Following
21 // this, one of Done() and DoneWithError() will be called with the result.
22 void GetData(uint32_t num_bytes
);
26 // Reports a successful write of |bytes_written|.
27 void Done(uint32_t bytes_written
);
29 // Reports a partially successful or unsuccessful write of |bytes_written|
30 // with an error of |error|.
31 void DoneWithError(uint32_t bytes_written
, int32_t error
);
33 // The DataSourceSender that owns this.
34 DataSourceSender
* sender_
;
36 // The callback to call to get data.
37 ReadyCallback callback_
;
39 // Whether the buffer specified by GetData() has been passed to |callback_|,
40 // but has not yet called Done() or DoneWithError().
// NOTE(review): the declaration this comment belongs to is not visible here;
// presumably it is |buffer_in_use_|, which the constructor initializes to
// false - confirm.
43 // The data obtained using |callback_| to be dispatched to the client.
44 std::vector
<char> data_
;
47 // A Writable implementation that provides a view of a buffer owned by a
// DataSourceSender.
// NOTE(review): access specifiers, the destructor declaration and the closing
// of this class are not visible in this view of the file.
49 class DataSourceSender::PendingSend::Buffer
: public WritableBuffer
{
// NOTE(review): only the |sender| and |buffer_size| parameters are visible
// here; the call site in PendingSend::GetData() passes four arguments (the
// sender, the pending send, a pointer into its data, and the size) - confirm
// the full signature.
51 Buffer(scoped_refptr
<DataSourceSender
> sender
,
54 uint32_t buffer_size
);
57 // WritableBuffer overrides.
58 char* GetData() override
;
59 uint32_t GetSize() override
;
// Clears |pending_send_| and forwards |bytes_written| to that PendingSend.
60 void Done(uint32_t bytes_written
) override
;
// Clears |pending_send_| and forwards |bytes_written| and |error| to that
// PendingSend.
61 void DoneWithError(uint32_t bytes_written
, int32_t error
) override
;
64 // The DataSourceSender of whose buffer we are providing a view.
65 scoped_refptr
<DataSourceSender
> sender_
;
67 // The PendingSend to which this buffer has been created in response.
// Set to nullptr by Done() and DoneWithError() before they report the result.
68 PendingSend
* pending_send_
;
// The |buffer_size| value passed to the constructor.
71 uint32_t buffer_size_
;
// Constructs a sender: binds |source| to this object, takes |client| via
// Pass(), stores copies of both callbacks and starts with zero available
// buffer capacity.
// NOTE(review): member initializers after |available_buffer_capacity_| and the
// opening/closing braces of the body are not visible in this view.
74 DataSourceSender::DataSourceSender(
75 mojo::InterfaceRequest
<serial::DataSource
> source
,
76 mojo::InterfacePtr
<serial::DataSourceClient
> client
,
77 const ReadyCallback
& ready_callback
,
78 const ErrorCallback
& error_callback
)
79 : binding_(this, source
.Pass()),
80 client_(client
.Pass()),
81 ready_callback_(ready_callback
),
82 error_callback_(error_callback
),
83 available_buffer_capacity_(0),
// Both callbacks are required to be non-null.
87 DCHECK(!ready_callback
.is_null() && !error_callback
.is_null());
// A connection error on either end is routed to OnConnectionError(). The
// handlers are bound with an unretained |this|.
88 binding_
.set_connection_error_handler(
89 base::Bind(&DataSourceSender::OnConnectionError
, base::Unretained(this)));
90 client_
.set_connection_error_handler(
91 base::Bind(&DataSourceSender::OnConnectionError
, base::Unretained(this)));
// Drops both stored callbacks by resetting them.
// NOTE(review): other statements of this function (for example whatever sets
// |shut_down_|, which GetMoreData() and Done() read) are not visible in this
// view.
94 void DataSourceSender::ShutDown() {
96 ready_callback_
.Reset();
97 error_callback_
.Reset();
// Destructor. NOTE(review): the body is not visible in this view.
100 DataSourceSender::~DataSourceSender() {
// Sets the available buffer capacity to |buffer_size|.
// NOTE(review): any statements following the assignment are not visible in
// this view.
104 void DataSourceSender::Init(uint32_t buffer_size
) {
105 available_buffer_capacity_
= buffer_size
;
// Resumes sending; per the comment in DoneWithError(), no additional data is
// sent after an error until this is called.
// NOTE(review): the only statement visible here is the DispatchFatalError()
// call; the condition guarding it, if any, and the rest of the body are not
// visible in this view - confirm before relying on this description.
109 void DataSourceSender::Resume() {
111 DispatchFatalError();
// Adds |bytes_sent| back to the available buffer capacity.
119 void DataSourceSender::ReportBytesReceived(uint32_t bytes_sent
) {
120 available_buffer_capacity_
+= bytes_sent
;
// True only when no send is pending and the sender is not paused.
// NOTE(review): the statement this condition guards is not visible in this
// view.
121 if (!pending_send_
&& !paused_
)
// Connection error handler installed on both |binding_| and |client_| by the
// constructor; a connection error is handled as a fatal error.
125 void DataSourceSender::OnConnectionError() {
126 DispatchFatalError();
// Creates a new PendingSend with |ready_callback_| and asks it for up to
// |available_buffer_capacity_| bytes.
129 void DataSourceSender::GetMoreData() {
// Tests for: shut down, paused, a send already pending, or no capacity left.
// NOTE(review): the statement guarded by this condition (presumably an early
// return) is not visible in this view - confirm.
130 if (shut_down_
|| paused_
|| pending_send_
|| !available_buffer_capacity_
)
133 pending_send_
.reset(new PendingSend(this, ready_callback_
));
134 pending_send_
->GetData(available_buffer_capacity_
);
// Called by PendingSend::Done() with the data that was obtained.
// NOTE(review): the statement(s) before the condition and the first PostTask
// argument are not visible in this view.
137 void DataSourceSender::Done(const std::vector
<char>& data
) {
// If not shut down and capacity remains, schedule another GetMoreData() on the
// current message loop; the task is bound to a weak pointer from
// |weak_factory_| rather than to |this| directly.
139 if (!shut_down_
&& available_buffer_capacity_
) {
140 base::MessageLoop::current()->PostTask(
142 base::Bind(&DataSourceSender::GetMoreData
, weak_factory_
.GetWeakPtr()));
// Called by PendingSend::DoneWithError() with the data obtained before the
// error. Reports |error| to the client through OnError().
// NOTE(review): the remaining parameter(s) of the signature and the statements
// before the OnError() call are not visible in this view.
146 void DataSourceSender::DoneWithError(const std::vector
<char>& data
,
150 client_
->OnError(error
);
152 // We don't call GetMoreData here so we don't send any additional data until
153 // Resume() is called.
// Sends |data| to the client and finishes the pending send. A pending send
// must exist (DCHECKed).
156 void DataSourceSender::DoneInternal(const std::vector
<char>& data
) {
157 DCHECK(pending_send_
);
// The bytes being sent consume that much of the available capacity.
161 available_buffer_capacity_
-= static_cast<uint32_t>(data
.size());
// Copy |data| into a mojo array of the same size and pass it to the client.
// NOTE(review): if |data| can be empty at this point, &data_to_send[0] indexes
// an empty array - confirm an empty check exists in the lines not visible in
// this view.
163 mojo::Array
<uint8_t> data_to_send(data
.size());
164 std::copy(data
.begin(), data
.end(), &data_to_send
[0]);
165 client_
->OnData(data_to_send
.Pass());
// Releases the PendingSend held by |pending_send_|.
167 pending_send_
.reset();
// Runs |error_callback_|.
// NOTE(review): any statements before or after the Run() call are not visible
// in this view.
170 void DataSourceSender::DispatchFatalError() {
174 error_callback_
.Run();
// Stores |sender| and a copy of |callback|; no buffer is in use initially.
178 DataSourceSender::PendingSend::PendingSend(DataSourceSender
* sender
,
179 const ReadyCallback
& callback
)
180 : sender_(sender
), callback_(callback
), buffer_in_use_(false) {
// Resizes |data_| to |num_bytes| and runs |callback_| with a new Buffer over
// that storage. Must not be called while a buffer is already in use
// (DCHECKed); marks the buffer as in use.
183 void DataSourceSender::PendingSend::GetData(uint32_t num_bytes
) {
185 DCHECK(!buffer_in_use_
);
186 buffer_in_use_
= true;
187 data_
.resize(num_bytes
);
// The Buffer is given the sender, this PendingSend, a pointer to the start of
// |data_| and the size.
// NOTE(review): assumes |num_bytes| is non-zero so that &data_[0] is valid -
// TODO confirm against the caller.
188 callback_
.Run(scoped_ptr
<WritableBuffer
>(
189 new Buffer(sender_
, this, &data_
[0], num_bytes
)));
// Completes the send successfully: marks the buffer as no longer in use, trims
// |data_| to the |bytes_written| bytes actually written and passes it to the
// sender's Done(). A buffer must be in use and |bytes_written| must not exceed
// the current size of |data_| (both DCHECKed).
192 void DataSourceSender::PendingSend::Done(uint32_t bytes_written
) {
193 DCHECK(buffer_in_use_
);
194 DCHECK_LE(bytes_written
, data_
.size());
195 buffer_in_use_
= false;
196 data_
.resize(bytes_written
);
197 sender_
->Done(data_
);
// Completes the send with an error: marks the buffer as no longer in use,
// trims |data_| to the |bytes_written| bytes actually written and passes it,
// together with |error|, to the sender's DoneWithError(). A buffer must be in
// use and |bytes_written| must not exceed the current size of |data_| (both
// DCHECKed).
// NOTE(review): the rest of the parameter list (where |error| is declared) is
// not visible in this view.
200 void DataSourceSender::PendingSend::DoneWithError(uint32_t bytes_written
,
202 DCHECK(buffer_in_use_
);
203 DCHECK_LE(bytes_written
, data_
.size());
204 buffer_in_use_
= false;
205 data_
.resize(bytes_written
);
206 sender_
->DoneWithError(data_
, error
);
// Records |buffer_size| in |buffer_size_|.
// NOTE(review): the other parameters and member initializers are not visible
// in this view; the call site in PendingSend::GetData() passes four arguments.
209 DataSourceSender::PendingSend::Buffer::Buffer(
210 scoped_refptr
<DataSourceSender
> sender
,
213 uint32_t buffer_size
)
217 buffer_size_(buffer_size
) {
// Reports a write of zero bytes to the pending send.
// NOTE(review): Done() and DoneWithError() set |pending_send_| to nullptr, so
// this call needs a null check; the line between the signature and the call is
// not visible in this view - confirm the guard is present.
220 DataSourceSender::PendingSend::Buffer::~Buffer() {
222 pending_send_
->Done(0);
// WritableBuffer override. NOTE(review): the body is not visible in this view.
225 char* DataSourceSender::PendingSend::Buffer::GetData() {
// WritableBuffer override. NOTE(review): the body is not visible in this view.
229 uint32_t DataSourceSender::PendingSend::Buffer::GetSize() {
// WritableBuffer override: reports a successful write of |bytes_written| to
// the PendingSend this buffer was created for.
233 void DataSourceSender::PendingSend::Buffer::Done(uint32_t bytes_written
) {
234 DCHECK(sender_
.get());
// |pending_send_| is copied to a local and cleared before the result is
// reported - presumably so that the destructor does not report a second
// completion; confirm.
235 PendingSend
* send
= pending_send_
;
236 pending_send_
= nullptr;
237 send
->Done(bytes_written
);
// WritableBuffer override: reports a write of |bytes_written| that ended with
// |error| to the PendingSend this buffer was created for.
// NOTE(review): the rest of the parameter list (where |error| is declared) is
// not visible in this view.
241 void DataSourceSender::PendingSend::Buffer::DoneWithError(
242 uint32_t bytes_written
,
244 DCHECK(sender_
.get());
// |pending_send_| is copied to a local and cleared before the result is
// reported - presumably so that the destructor does not report a second
// completion; confirm.
245 PendingSend
* send
= pending_send_
;
246 pending_send_
= nullptr;
247 send
->DoneWithError(bytes_written
, error
);
251 } // namespace device