Process Alt-Svc headers.
[chromium-blink-merge.git] / device / serial / data_source_sender.cc
blob5b4299239560be45c0cc927f84e48aa57f7d3c1a
1 // Copyright 2014 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
5 #include "device/serial/data_source_sender.h"
7 #include <algorithm>
8 #include <limits>
10 #include "base/bind.h"
11 #include "base/message_loop/message_loop.h"
13 namespace device {
15 // Represents a send that is not yet fulfilled.
16 class DataSourceSender::PendingSend {
17 public:
18 PendingSend(DataSourceSender* sender, const ReadyCallback& callback);
20 // Asynchronously fills |data_| with up to |num_bytes| of data. Following
21 // this, one of Done() and DoneWithError() will be called with the result.
22 void GetData(uint32_t num_bytes);
24 private:
25 class Buffer;
26 // Reports a successful write of |bytes_written|.
27 void Done(uint32_t bytes_written);
29 // Reports a partially successful or unsuccessful write of |bytes_written|
30 // with an error of |error|.
31 void DoneWithError(uint32_t bytes_written, int32_t error);
33 // The DataSourceSender that owns this.
34 DataSourceSender* sender_;
36 // The callback to call to get data.
37 ReadyCallback callback_;
39 // Whether the buffer specified by GetData() has been passed to |callback_|,
40 // but has not yet called Done() or DoneWithError().
41 bool buffer_in_use_;
43 // The data obtained using |callback_| to be dispatched to the client.
44 std::vector<char> data_;
47 // A Writable implementation that provides a view of a buffer owned by a
48 // DataSourceSender.
49 class DataSourceSender::PendingSend::Buffer : public WritableBuffer {
50 public:
51 Buffer(scoped_refptr<DataSourceSender> sender,
52 PendingSend* send,
53 char* buffer,
54 uint32_t buffer_size);
55 ~Buffer() override;
57 // WritableBuffer overrides.
58 char* GetData() override;
59 uint32_t GetSize() override;
60 void Done(uint32_t bytes_written) override;
61 void DoneWithError(uint32_t bytes_written, int32_t error) override;
63 private:
64 // The DataSourceSender of whose buffer we are providing a view.
65 scoped_refptr<DataSourceSender> sender_;
67 // The PendingSend to which this buffer has been created in response.
68 PendingSend* pending_send_;
70 char* buffer_;
71 uint32_t buffer_size_;
74 DataSourceSender::DataSourceSender(
75 mojo::InterfaceRequest<serial::DataSource> source,
76 mojo::InterfacePtr<serial::DataSourceClient> client,
77 const ReadyCallback& ready_callback,
78 const ErrorCallback& error_callback)
79 : binding_(this, source.Pass()),
80 client_(client.Pass()),
81 ready_callback_(ready_callback),
82 error_callback_(error_callback),
83 available_buffer_capacity_(0),
84 paused_(false),
85 shut_down_(false),
86 weak_factory_(this) {
87 DCHECK(!ready_callback.is_null() && !error_callback.is_null());
88 binding_.set_connection_error_handler(
89 base::Bind(&DataSourceSender::OnConnectionError, base::Unretained(this)));
90 client_.set_connection_error_handler(
91 base::Bind(&DataSourceSender::OnConnectionError, base::Unretained(this)));
94 void DataSourceSender::ShutDown() {
95 shut_down_ = true;
96 ready_callback_.Reset();
97 error_callback_.Reset();
100 DataSourceSender::~DataSourceSender() {
101 DCHECK(shut_down_);
104 void DataSourceSender::Init(uint32_t buffer_size) {
105 available_buffer_capacity_ = buffer_size;
106 GetMoreData();
109 void DataSourceSender::Resume() {
110 if (pending_send_) {
111 DispatchFatalError();
112 return;
115 paused_ = false;
116 GetMoreData();
119 void DataSourceSender::ReportBytesReceived(uint32_t bytes_sent) {
120 available_buffer_capacity_ += bytes_sent;
121 if (!pending_send_ && !paused_)
122 GetMoreData();
125 void DataSourceSender::OnConnectionError() {
126 DispatchFatalError();
129 void DataSourceSender::GetMoreData() {
130 if (shut_down_ || paused_ || pending_send_ || !available_buffer_capacity_)
131 return;
133 pending_send_.reset(new PendingSend(this, ready_callback_));
134 pending_send_->GetData(available_buffer_capacity_);
137 void DataSourceSender::Done(const std::vector<char>& data) {
138 DoneInternal(data);
139 if (!shut_down_ && available_buffer_capacity_) {
140 base::MessageLoop::current()->PostTask(
141 FROM_HERE,
142 base::Bind(&DataSourceSender::GetMoreData, weak_factory_.GetWeakPtr()));
146 void DataSourceSender::DoneWithError(const std::vector<char>& data,
147 int32_t error) {
148 DoneInternal(data);
149 if (!shut_down_)
150 client_->OnError(error);
151 paused_ = true;
152 // We don't call GetMoreData here so we don't send any additional data until
153 // Resume() is called.
156 void DataSourceSender::DoneInternal(const std::vector<char>& data) {
157 DCHECK(pending_send_);
158 if (shut_down_)
159 return;
161 available_buffer_capacity_ -= static_cast<uint32_t>(data.size());
162 if (!data.empty()) {
163 mojo::Array<uint8_t> data_to_send(data.size());
164 std::copy(data.begin(), data.end(), &data_to_send[0]);
165 client_->OnData(data_to_send.Pass());
167 pending_send_.reset();
170 void DataSourceSender::DispatchFatalError() {
171 if (shut_down_)
172 return;
174 error_callback_.Run();
175 ShutDown();
178 DataSourceSender::PendingSend::PendingSend(DataSourceSender* sender,
179 const ReadyCallback& callback)
180 : sender_(sender), callback_(callback), buffer_in_use_(false) {
183 void DataSourceSender::PendingSend::GetData(uint32_t num_bytes) {
184 DCHECK(num_bytes);
185 DCHECK(!buffer_in_use_);
186 buffer_in_use_ = true;
187 data_.resize(num_bytes);
188 callback_.Run(scoped_ptr<WritableBuffer>(
189 new Buffer(sender_, this, &data_[0], num_bytes)));
192 void DataSourceSender::PendingSend::Done(uint32_t bytes_written) {
193 DCHECK(buffer_in_use_);
194 DCHECK_LE(bytes_written, data_.size());
195 buffer_in_use_ = false;
196 data_.resize(bytes_written);
197 sender_->Done(data_);
200 void DataSourceSender::PendingSend::DoneWithError(uint32_t bytes_written,
201 int32_t error) {
202 DCHECK(buffer_in_use_);
203 DCHECK_LE(bytes_written, data_.size());
204 buffer_in_use_ = false;
205 data_.resize(bytes_written);
206 sender_->DoneWithError(data_, error);
209 DataSourceSender::PendingSend::Buffer::Buffer(
210 scoped_refptr<DataSourceSender> sender,
211 PendingSend* send,
212 char* buffer,
213 uint32_t buffer_size)
214 : sender_(sender),
215 pending_send_(send),
216 buffer_(buffer),
217 buffer_size_(buffer_size) {
220 DataSourceSender::PendingSend::Buffer::~Buffer() {
221 if (pending_send_)
222 pending_send_->Done(0);
225 char* DataSourceSender::PendingSend::Buffer::GetData() {
226 return buffer_;
229 uint32_t DataSourceSender::PendingSend::Buffer::GetSize() {
230 return buffer_size_;
233 void DataSourceSender::PendingSend::Buffer::Done(uint32_t bytes_written) {
234 DCHECK(sender_.get());
235 PendingSend* send = pending_send_;
236 pending_send_ = nullptr;
237 send->Done(bytes_written);
238 sender_ = nullptr;
241 void DataSourceSender::PendingSend::Buffer::DoneWithError(
242 uint32_t bytes_written,
243 int32_t error) {
244 DCHECK(sender_.get());
245 PendingSend* send = pending_send_;
246 pending_send_ = nullptr;
247 send->DoneWithError(bytes_written, error);
248 sender_ = nullptr;
251 } // namespace device