1 // Copyright (c) 2012 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
5 #include "remoting/protocol/buffered_socket_writer.h"
8 #include "base/location.h"
9 #include "base/single_thread_task_runner.h"
10 #include "base/stl_util.h"
11 #include "base/thread_task_runner_handle.h"
12 #include "net/base/net_errors.h"
// One entry of the write queue: the buffer to send (|data|) together with the
// closure (|done_task|) associated with it. Entries are heap-allocated in
// Write() and released by PopQueue() or STLDeleteElements().
// NOTE(review): the initializer for |data| is not visible in this listing;
// presumably the constructor copies both arguments into the members - confirm.
17 struct BufferedSocketWriterBase::PendingPacket
{
18 PendingPacket(scoped_refptr
<net::IOBufferWithSize
> data
,
19 const base::Closure
& done_task
)
21 done_task(done_task
) {
24 scoped_refptr
<net::IOBufferWithSize
> data
;
25 base::Closure done_task
;
// Constructs a writer with no write in flight (|write_pending_| is false) and
// no destruction flag registered (|destroyed_flag_| is null).
// NOTE(review): other member initializers may exist but are not visible here.
28 BufferedSocketWriterBase::BufferedSocketWriterBase()
31 write_pending_(false),
33 destroyed_flag_(nullptr) {
// Prepares the writer for use. Must be called on the thread that owns this
// object (checked with CalledOnValidThread()).
//   socket:   the socket to write to. Its storage is not visible in this
//             listing; presumably kept in |socket_|, which DoWrite() uses -
//             TODO confirm, including who owns the socket.
//   callback: saved in |write_failed_callback_|; HandleWriteResult() runs it
//             with the write result.
36 void BufferedSocketWriterBase::Init(net::Socket
* socket
,
37 const WriteFailedCallback
& callback
) {
38 DCHECK(CalledOnValidThread());
41 write_failed_callback_
= callback
;
// Queues |data| for writing. A new PendingPacket holding |data| and
// |done_task| is appended to |queue_| and |buffer_size_| grows by
// data->size(). Must be called on the owning thread.
// The packet is allocated with a raw new; it is freed later by PopQueue() or
// by STLDeleteElements() in HandleError() and the destructor.
// NOTE(review): the return statements and the call that starts the write are
// not visible in this listing. Judging by the comments, the function refuses
// to queue after Close() and reports whether the writer is still usable after
// DoWrite() - confirm.
44 bool BufferedSocketWriterBase::Write(
45 scoped_refptr
<net::IOBufferWithSize
> data
, const base::Closure
& done_task
) {
46 DCHECK(CalledOnValidThread());
50 // Don't write after Close().
54 queue_
.push_back(new PendingPacket(data
, done_task
));
55 buffer_size_
+= data
->size();
59 // DoWrite() may trigger OnWriteError() to be called.
// Issues a write on |socket_|. The subclass supplies the buffer and size via
// GetNextPacket(); OnWritten() is bound as the completion callback, and the
// immediate return value of Write() is passed to HandleWriteResult().
// Must be called on the owning thread.
// NOTE(review): the early-return guards described by the comments below, and
// any loop driven by |write_again|, are not visible in this listing.
// NOTE(review): "¤t_packet" / "¤t_packet_size" look like mis-encoded
// "&current_packet" / "&current_packet_size" (the two locals declared just
// above the call) - restore from the upstream file.
// NOTE(review): base::Unretained(this) hands the socket a raw pointer to this
// object; nothing visible here cancels the callback on destruction - TODO
// confirm the socket cannot invoke it after this object is deleted.
63 void BufferedSocketWriterBase::DoWrite() {
64 DCHECK(CalledOnValidThread());
67 // Don't try to write if there is another write pending.
71 // Don't write after Close().
76 net::IOBuffer
* current_packet
;
77 int current_packet_size
;
78 GetNextPacket(¤t_packet
, ¤t_packet_size
);
80 // Return if the queue is empty.
84 int result
= socket_
->Write(
85 current_packet
, current_packet_size
,
86 base::Bind(&BufferedSocketWriterBase::OnWritten
,
87 base::Unretained(this)));
88 bool write_again
= false;
89 HandleWriteResult(result
, &write_again
);
// Processes the value returned by a socket write (|result|).
//  - net::ERR_IO_PENDING: marks a write as in flight by setting
//    |write_pending_|; OnWritten() clears it again.
//  - |write_failed_callback_|, when set, is run with |result|.
//  - AdvanceBufferPosition(result) is asked for a completion closure. When it
//    returns a non-null one, |destroyed_flag_| is pointed at a local bool so
//    that the destructor (which writes true through |destroyed_flag_|) can
//    signal that a callback deleted this object. |destroyed_flag_| is reset
//    to null afterwards.
// NOTE(review): the second parameter (callers pass &write_again), the
// conditions selecting between these branches, and the invocation of
// |done_task| are not visible in this listing - confirm against upstream.
95 void BufferedSocketWriterBase::HandleWriteResult(int result
,
99 if (result
== net::ERR_IO_PENDING
) {
100 write_pending_
= true;
103 if (!write_failed_callback_
.is_null())
104 write_failed_callback_
.Run(result
);
109 base::Closure done_task
= AdvanceBufferPosition(result
);
110 if (!done_task
.is_null()) {
111 bool destroyed
= false;
112 destroyed_flag_
= &destroyed
;
115 // Stop doing anything if we've been destroyed by the callback.
118 destroyed_flag_
= nullptr;
// Completion callback bound in DoWrite(). Expects a write to be in flight
// (DCHECK on |write_pending_|), clears the flag, and forwards |result| to
// HandleWriteResult(). Must run on the owning thread.
// NOTE(review): the declaration of |write_again| and whatever is done with
// it afterwards are not visible in this listing.
124 void BufferedSocketWriterBase::OnWritten(int result
) {
125 DCHECK(CalledOnValidThread());
126 DCHECK(write_pending_
);
127 write_pending_
= false;
130 HandleWriteResult(result
, &write_again
);
// Reacts to a failure code |result|: deletes every packet still held in
// |queue_| and then, per the comment below, notifies the subclass.
// Must be called on the owning thread.
// NOTE(review): the notification call itself is not visible here; presumably
// OnError(result) - confirm.
135 void BufferedSocketWriterBase::HandleError(int result
) {
136 DCHECK(CalledOnValidThread());
140 STLDeleteElements(&queue_
);
142 // Notify subclass that an error is received.
// Returns a size as an int.
// NOTE(review): the body is not visible in this listing; presumably it
// returns |buffer_size_|, which Write() increases by each packet's size and
// AdvanceBufferPosition() decreases - confirm.
146 int BufferedSocketWriterBase::GetBufferSize() {
// Returns the number of packets currently held in |queue_|. The container's
// size() result is implicitly converted to int.
150 int BufferedSocketWriterBase::GetBufferChunks() {
151 return queue_
.size();
// Closes the writer. Must be called on the owning thread.
// NOTE(review): the statement that records the closed state is not visible
// in this listing; comments in Write() and DoWrite() say no writes happen
// after Close() - confirm.
154 void BufferedSocketWriterBase::Close() {
155 DCHECK(CalledOnValidThread());
// Signals destruction through |destroyed_flag_| (HandleWriteResult() points
// it at a local bool while a completion closure is being handled) and
// deletes every packet still held in |queue_|.
// NOTE(review): |destroyed_flag_| is null outside of HandleWriteResult(), so
// the dereference below must be guarded by a null check; the guard is not
// visible in this listing - confirm it exists upstream.
159 BufferedSocketWriterBase::~BufferedSocketWriterBase() {
161 *destroyed_flag_
= true;
163 STLDeleteElements(&queue_
);
// Copies the front packet's |done_task| into a local and deletes the front
// packet object. Requires a non-empty |queue_| (front() is used unchecked).
// NOTE(review): removing the dangling front pointer from |queue_| and
// returning the copied closure are not visible in this listing - confirm.
166 base::Closure
BufferedSocketWriterBase::PopQueue() {
167 base::Closure result
= queue_
.front()->done_task
;
168 delete queue_
.front();
// Constructs the stream-oriented writer; no member initializers are written
// on the visible line.
173 BufferedSocketWriter::BufferedSocketWriter() {
// Reports what to write next for a stream socket.
// When no packet is in progress (|current_buf_| is null) and |queue_| is
// non-empty, the front packet's data is wrapped in a net::DrainableIOBuffer
// sized to the whole packet. |*buffer| and |*size| are then set to that
// buffer and its BytesRemaining(), so a partially written packet resumes
// where it left off.
// NOTE(review): what is stored in the out-parameters on the empty-queue path
// is not visible in this listing; DoWrite()'s comment suggests a null
// |*buffer| signals "nothing to write" - confirm.
176 void BufferedSocketWriter::GetNextPacket(
177 net::IOBuffer
** buffer
, int* size
) {
178 if (!current_buf_
.get()) {
179 if (queue_
.empty()) {
181 return; // Nothing to write.
183 current_buf_
= new net::DrainableIOBuffer(queue_
.front()->data
.get(),
184 queue_
.front()->data
->size());
187 *buffer
= current_buf_
.get();
188 *size
= current_buf_
->BytesRemaining();
// Accounts for |written| bytes having been sent from the current packet:
// |buffer_size_| shrinks by |written| and |current_buf_| consumes that many
// bytes. Once nothing remains, |current_buf_| is reset so the next
// GetNextPacket() call starts a new packet. The last visible statement
// returns an empty closure.
// NOTE(review): the return taken when the packet is fully written is not
// visible in this listing; presumably it returns PopQueue() - confirm.
191 base::Closure
BufferedSocketWriter::AdvanceBufferPosition(int written
) {
192 buffer_size_
-= written
;
193 current_buf_
->DidConsume(written
);
195 if (current_buf_
->BytesRemaining() == 0) {
196 current_buf_
= nullptr;
199 return base::Closure();
// Error hook for the stream writer: drops the reference to the packet that
// was being written by resetting |current_buf_|. |result| is not used.
202 void BufferedSocketWriter::OnError(int result
) {
203 current_buf_
= nullptr;
// Destructor of the stream-oriented writer; no statements are visible in its
// body in this listing.
206 BufferedSocketWriter::~BufferedSocketWriter() {
// Constructs the datagram-oriented writer; no member initializers are written
// on the visible line.
209 BufferedDatagramWriter::BufferedDatagramWriter() {
// Reports what to write next for a datagram socket: the front packet of
// |queue_| in full. |*buffer| receives its data pointer and |*size| its
// size; there is no partial-write state.
// NOTE(review): what is stored in the out-parameters on the empty-queue path
// is not visible in this listing; DoWrite()'s comment suggests a null
// |*buffer| signals "nothing to write" - confirm.
212 void BufferedDatagramWriter::GetNextPacket(
213 net::IOBuffer
** buffer
, int* size
) {
214 if (queue_
.empty()) {
216 return; // Nothing to write.
218 *buffer
= queue_
.front()->data
.get();
219 *size
= queue_
.front()->data
->size();
// Accounts for a datagram having been sent. The DCHECK_EQ requires |written|
// to equal the front packet's size, and |buffer_size_| shrinks by that size.
// NOTE(review): the return statement is not visible in this listing;
// presumably it returns PopQueue() so the packet leaves the queue - confirm.
222 base::Closure
BufferedDatagramWriter::AdvanceBufferPosition(int written
) {
223 DCHECK_EQ(written
, queue_
.front()->data
->size());
224 buffer_size_
-= queue_
.front()->data
->size();
// Error hook for the datagram writer. Intentionally empty; |result| is not
// used.
228 void BufferedDatagramWriter::OnError(int result
) {
229 // Nothing to do here.
// Destructor of the datagram-oriented writer; no statements are visible in
// its body in this listing.
232 BufferedDatagramWriter::~BufferedDatagramWriter() {
235 } // namespace protocol
236 } // namespace remoting