1 // Copyright 2015 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
5 #include "remoting/base/buffered_socket_writer.h"
8 #include "base/location.h"
9 #include "base/single_thread_task_runner.h"
10 #include "base/stl_util.h"
11 #include "base/thread_task_runner_handle.h"
12 #include "net/base/net_errors.h"
// One entry of the write queue: the buffer to send (`data`) together with the
// closure stored alongside it (`done_task`). Instances are created with `new`
// in Write() and released in PopQueue() / STLDeleteElements().
// NOTE(review): the leading numbers are residue from the original file's line
// numbering; they skip 19, 21-22 and 25, so part of the constructor's
// initializer list and the closing braces are not present in this excerpt.
16 struct BufferedSocketWriterBase::PendingPacket
{
// Constructor taking the buffer and the closure to store.
17 PendingPacket(scoped_refptr
<net::IOBufferWithSize
> data
,
18 const base::Closure
& done_task
)
20 done_task(done_task
) {
// Ref-counted buffer (with size) holding the bytes of this packet.
23 scoped_refptr
<net::IOBufferWithSize
> data
;
// Closure handed back to the caller of PopQueue() for this packet.
24 base::Closure done_task
;
// Constructs a writer with no write in flight (write_pending_ == false) and no
// destruction flag registered (destroyed_flag_ == nullptr).
// NOTE(review): the embedded numbering skips 28 and 30, so further member
// initializers may be missing from this excerpt - confirm against the
// canonical file.
27 BufferedSocketWriterBase::BufferedSocketWriterBase()
29 write_pending_(false),
31 destroyed_flag_(nullptr) {
// Initializes the writer.
//   socket   - socket to write to.
//   callback - stored in write_failed_callback_; HandleWriteResult() runs it
//              with the result code when it is non-null.
// NOTE(review): the embedded numbering skips 37-38, so what is done with
// `socket` is not visible in this excerpt (presumably it is stored in
// socket_, which DoWrite() uses - confirm).
34 void BufferedSocketWriterBase::Init(net::Socket
* socket
,
35 const WriteFailedCallback
& callback
) {
// Must be called on the thread this object is bound to.
36 DCHECK(CalledOnValidThread());
39 write_failed_callback_
= callback
;
// Queues `data` for writing, paired with `done_task`.
//   data      - buffer to send.
//   done_task - closure stored with the packet and later returned by
//               PopQueue().
// Returns a bool; the return statements themselves are not present in this
// excerpt (embedded numbering skips 45-47, 49-51, 53-55, 57-59), nor are the
// guard behind the "Don't write after Close()" comment or the call that starts
// the write.
42 bool BufferedSocketWriterBase::Write(
43 scoped_refptr
<net::IOBufferWithSize
> data
, const base::Closure
& done_task
) {
44 DCHECK(CalledOnValidThread());
48 // Don't write after Close().
// The queue holds raw owning pointers; the packet allocated here is deleted
// in PopQueue() or by STLDeleteElements(&queue_).
52 queue_
.push_back(new PendingPacket(data
, done_task
));
// NOTE(review): no OnWriteError() is defined in the visible file; the error
// path shown here is HandleError() - confirm the name in the comment below.
56 // DoWrite() may trigger OnWriteError() to be called.
// Writes the next queued packet to the socket. GetNextPacket() supplies the
// buffer and size, socket_->Write() is called with OnWritten() bound as its
// completion callback, and the immediate result is passed to
// HandleWriteResult().
// NOTE(review): the leading numbers are residue from the original file's line
// numbering and have gaps (62-63, 65-67, 69-72, 76, 78-80, 87-91). The
// statements behind the three guard comments below, and whatever consumes
// `write_again`, are therefore not present in this excerpt.
60 void BufferedSocketWriterBase::DoWrite() {
61 DCHECK(CalledOnValidThread());
64 // Don't try to write if there is another write pending.
68 // Don't write after Close().
73 net::IOBuffer
* current_packet
;
74 int current_packet_size
;
// Pass the addresses of the two locals; GetNextPacket() takes
// (net::IOBuffer** buffer, int* size) and fills them in.
75 GetNextPacket(&current_packet
, &current_packet_size
);
77 // Return if the queue is empty.
81 int result
= socket_
->Write(
82 current_packet
, current_packet_size
,
// The completion callback is bound to this object via base::Unretained(this).
83 base::Bind(&BufferedSocketWriterBase::OnWritten
,
84 base::Unretained(this)));
// HandleWriteResult() receives a pointer to this flag as its second argument.
85 bool write_again
= false;
86 HandleWriteResult(result
, &write_again
);
// Processes the result code of a socket write.
//   result - value returned by socket_->Write() or delivered to OnWritten().
// A second parameter (callers pass `&write_again`) is declared on lines that
// are not present in this excerpt.
// NOTE(review): the embedded numbering skips 93-95, 98-99, 102-105, 110-111,
// 113-114 and 116-120, so the conditions that select between the branches
// below, and the invocation of `done_task`, cannot be read from this excerpt.
92 void BufferedSocketWriterBase::HandleWriteResult(int result
,
// An ERR_IO_PENDING result marks a write as pending; OnWritten() later clears
// the flag.
96 if (result
== net::ERR_IO_PENDING
) {
97 write_pending_
= true;
// Run the failure callback with the result code, if one was set in Init().
100 if (!write_failed_callback_
.is_null())
101 write_failed_callback_
.Run(result
);
// Let the subclass account for `result` written bytes; it returns a closure,
// which may be null.
106 base::Closure done_task
= AdvanceBufferPosition(result
);
107 if (!done_task
.is_null()) {
// Publish the address of a stack flag through destroyed_flag_; the destructor
// writes `true` through that pointer, which lets this function detect that
// the object was destroyed in the meantime.
108 bool destroyed
= false;
109 destroyed_flag_
= &destroyed
;
112 // Stop doing anything if we've been destroyed by the callback.
// Unregister the stack flag again.
115 destroyed_flag_
= nullptr;
// Completion callback bound in DoWrite() and passed to socket_->Write().
//   result - result code of the completed write.
// Clears write_pending_ and forwards `result` to HandleWriteResult().
// NOTE(review): the embedded numbering skips 125-126 and 128-131; the
// declaration of `write_again` and anything done after HandleWriteResult()
// are not present in this excerpt.
121 void BufferedSocketWriterBase::OnWritten(int result
) {
122 DCHECK(CalledOnValidThread());
// A completion is only expected while a write is marked as pending.
123 DCHECK(write_pending_
);
124 write_pending_
= false;
127 HandleWriteResult(result
, &write_again
);
// Error path: discards every packet still in queue_.
//   result - error code.
// NOTE(review): the embedded numbering skips 134-136, 138 and 140-142, so the
// remaining statements - including the subclass notification announced by the
// comment below (presumably OnError(result); confirm) - are not present in
// this excerpt.
132 void BufferedSocketWriterBase::HandleError(int result
) {
133 DCHECK(CalledOnValidThread());
// Deletes all queued PendingPacket objects.
137 STLDeleteElements(&queue_
);
139 // Notify subclass that an error is received.
// Closes the writer; comments in Write() and DoWrite() state that nothing is
// written after Close().
// NOTE(review): only the thread check is present in this excerpt (embedded
// numbering skips 145-147), so how the closed state is recorded is not
// visible here.
143 void BufferedSocketWriterBase::Close() {
144 DCHECK(CalledOnValidThread());
// Destructor: signals destruction through destroyed_flag_ (the stack flag
// registered in HandleWriteResult()) and deletes any packets still queued.
// NOTE(review): the embedded numbering skips 149 and 151; as shown, the write
// through destroyed_flag_ is unguarded even though the pointer is initialized
// to nullptr - confirm against the canonical file that a null check precedes
// it.
148 BufferedSocketWriterBase::~BufferedSocketWriterBase() {
150 *destroyed_flag_
= true;
// Release the PendingPacket objects owned through raw pointers in queue_.
152 STLDeleteElements(&queue_
);
// Releases the packet at the front of queue_ and yields its done_task.
// Returns a base::Closure.
// NOTE(review): the embedded numbering skips 158-161; removing the (now
// dangling) front pointer from queue_ and the return statement are not
// present in this excerpt.
155 base::Closure
BufferedSocketWriterBase::PopQueue() {
// Copy the closure out before the packet that owns it is deleted.
156 base::Closure result
= queue_
.front()->done_task
;
157 delete queue_
.front();
// Constructor of the stream-oriented writer. No body statements are present
// in this excerpt (embedded numbering skips 163-164).
162 BufferedSocketWriter::BufferedSocketWriter() {
// Provides the next chunk to write for the stream-oriented writer.
//   buffer - out: set to the current drainable buffer.
//   size   - out: set to the number of bytes of that buffer not yet consumed.
// When no buffer is in progress, a DrainableIOBuffer is created over the data
// of the packet at the front of queue_; if the queue is empty the function
// returns early.
// NOTE(review): the embedded numbering skips 169, 171 and 174-175; what the
// out-parameters are set to on the empty-queue path is not present in this
// excerpt.
165 void BufferedSocketWriter::GetNextPacket(
166 net::IOBuffer
** buffer
, int* size
) {
// No partially written packet: start on the next queued one, if any.
167 if (!current_buf_
.get()) {
168 if (queue_
.empty()) {
170 return; // Nothing to write.
// Wrap the front packet's data and full size so consumption can be tracked.
172 current_buf_
= new net::DrainableIOBuffer(queue_
.front()->data
.get(),
173 queue_
.front()->data
->size());
// Hand out the current buffer and the bytes still remaining in it.
176 *buffer
= current_buf_
.get();
177 *size
= current_buf_
->BytesRemaining();
// Records that `written` bytes of the current buffer have been sent.
//   written - number of bytes consumed by the last write.
// Returns a base::Closure; the visible return yields a null closure.
// NOTE(review): the embedded numbering skips 182, 185-186 and 188-189; what is
// returned once the buffer is fully consumed (presumably the packet's
// done_task via PopQueue() - confirm) is not present in this excerpt.
180 base::Closure
BufferedSocketWriter::AdvanceBufferPosition(int written
) {
181 current_buf_
->DidConsume(written
);
// Drop the drainable buffer once nothing remains to be written from it.
183 if (current_buf_
->BytesRemaining() == 0) {
184 current_buf_
= nullptr;
// Null closure.
187 return base::Closure();
// Error hook of the stream-oriented writer: drops the partially written
// buffer.
//   result - error code; not used by the visible lines.
190 void BufferedSocketWriter::OnError(int result
) {
191 current_buf_
= nullptr;
// Destructor of the stream-oriented writer. No body statements are present in
// this excerpt (embedded numbering skips 195-196).
194 BufferedSocketWriter::~BufferedSocketWriter() {
// Constructor of the datagram writer. No body statements are present in this
// excerpt (embedded numbering skips 198-199).
197 BufferedDatagramWriter::BufferedDatagramWriter() {
// Provides the next packet to write for the datagram writer.
//   buffer - out: set to the data buffer of the packet at the front of queue_.
//   size   - out: set to the full size of that buffer.
// Returns early when queue_ is empty.
// NOTE(review): the embedded numbering skips 203, 205 and 208-209; what the
// out-parameters are set to on the empty-queue path is not present in this
// excerpt.
200 void BufferedDatagramWriter::GetNextPacket(
201 net::IOBuffer
** buffer
, int* size
) {
202 if (queue_
.empty()) {
204 return; // Nothing to write.
// The whole packet is handed out at once; no partial-write state is kept.
206 *buffer
= queue_
.front()->data
.get();
207 *size
= queue_
.front()->data
->size();
// Accounts for a completed datagram write.
//   written - number of bytes reported as written.
// Returns a base::Closure.
// NOTE(review): the embedded numbering skips 212-214; the return statement
// (presumably the packet's done_task via PopQueue() - confirm) is not present
// in this excerpt.
210 base::Closure
BufferedDatagramWriter::AdvanceBufferPosition(int written
) {
// The byte count is expected to equal the size of the front packet.
211 DCHECK_EQ(written
, queue_
.front()->data
->size());
// Error hook of the datagram writer; intentionally empty.
//   result - error code; unused.
215 void BufferedDatagramWriter::OnError(int result
) {
216 // Nothing to do here.
// Destructor of the datagram writer. No body statements are present in this
// excerpt (embedded numbering skips 220-221).
219 BufferedDatagramWriter::~BufferedDatagramWriter() {
222 } // namespace remoting