[Android WebViewShell] Make WebViewLayoutTest runnable with test_runner.py
[chromium-blink-merge.git] / remoting / base / buffered_socket_writer.cc
blob8213f40a887ac70aabcd391798f6b2493e029958
1 // Copyright 2015 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
5 #include "remoting/base/buffered_socket_writer.h"
7 #include "base/bind.h"
8 #include "base/location.h"
9 #include "base/single_thread_task_runner.h"
10 #include "base/stl_util.h"
11 #include "base/thread_task_runner_handle.h"
12 #include "net/base/net_errors.h"
14 namespace remoting {
16 struct BufferedSocketWriterBase::PendingPacket {
17 PendingPacket(scoped_refptr<net::IOBufferWithSize> data,
18 const base::Closure& done_task)
19 : data(data),
20 done_task(done_task) {
23 scoped_refptr<net::IOBufferWithSize> data;
24 base::Closure done_task;
27 BufferedSocketWriterBase::BufferedSocketWriterBase()
28 : socket_(nullptr),
29 write_pending_(false),
30 closed_(false),
31 destroyed_flag_(nullptr) {
34 void BufferedSocketWriterBase::Init(net::Socket* socket,
35 const WriteFailedCallback& callback) {
36 DCHECK(CalledOnValidThread());
37 DCHECK(socket);
38 socket_ = socket;
39 write_failed_callback_ = callback;
42 bool BufferedSocketWriterBase::Write(
43 scoped_refptr<net::IOBufferWithSize> data, const base::Closure& done_task) {
44 DCHECK(CalledOnValidThread());
45 DCHECK(socket_);
46 DCHECK(data.get());
48 // Don't write after Close().
49 if (closed_)
50 return false;
52 queue_.push_back(new PendingPacket(data, done_task));
54 DoWrite();
56 // DoWrite() may trigger OnWriteError() to be called.
57 return !closed_;
60 void BufferedSocketWriterBase::DoWrite() {
61 DCHECK(CalledOnValidThread());
62 DCHECK(socket_);
64 // Don't try to write if there is another write pending.
65 if (write_pending_)
66 return;
68 // Don't write after Close().
69 if (closed_)
70 return;
72 while (true) {
73 net::IOBuffer* current_packet;
74 int current_packet_size;
75 GetNextPacket(&current_packet, &current_packet_size);
77 // Return if the queue is empty.
78 if (!current_packet)
79 return;
81 int result = socket_->Write(
82 current_packet, current_packet_size,
83 base::Bind(&BufferedSocketWriterBase::OnWritten,
84 base::Unretained(this)));
85 bool write_again = false;
86 HandleWriteResult(result, &write_again);
87 if (!write_again)
88 return;
92 void BufferedSocketWriterBase::HandleWriteResult(int result,
93 bool* write_again) {
94 *write_again = false;
95 if (result < 0) {
96 if (result == net::ERR_IO_PENDING) {
97 write_pending_ = true;
98 } else {
99 HandleError(result);
100 if (!write_failed_callback_.is_null())
101 write_failed_callback_.Run(result);
103 return;
106 base::Closure done_task = AdvanceBufferPosition(result);
107 if (!done_task.is_null()) {
108 bool destroyed = false;
109 destroyed_flag_ = &destroyed;
110 done_task.Run();
111 if (destroyed) {
112 // Stop doing anything if we've been destroyed by the callback.
113 return;
115 destroyed_flag_ = nullptr;
118 *write_again = true;
121 void BufferedSocketWriterBase::OnWritten(int result) {
122 DCHECK(CalledOnValidThread());
123 DCHECK(write_pending_);
124 write_pending_ = false;
126 bool write_again;
127 HandleWriteResult(result, &write_again);
128 if (write_again)
129 DoWrite();
132 void BufferedSocketWriterBase::HandleError(int result) {
133 DCHECK(CalledOnValidThread());
135 closed_ = true;
137 STLDeleteElements(&queue_);
139 // Notify subclass that an error is received.
140 OnError(result);
143 void BufferedSocketWriterBase::Close() {
144 DCHECK(CalledOnValidThread());
145 closed_ = true;
148 BufferedSocketWriterBase::~BufferedSocketWriterBase() {
149 if (destroyed_flag_)
150 *destroyed_flag_ = true;
152 STLDeleteElements(&queue_);
155 base::Closure BufferedSocketWriterBase::PopQueue() {
156 base::Closure result = queue_.front()->done_task;
157 delete queue_.front();
158 queue_.pop_front();
159 return result;
162 BufferedSocketWriter::BufferedSocketWriter() {
165 void BufferedSocketWriter::GetNextPacket(
166 net::IOBuffer** buffer, int* size) {
167 if (!current_buf_.get()) {
168 if (queue_.empty()) {
169 *buffer = nullptr;
170 return; // Nothing to write.
172 current_buf_ = new net::DrainableIOBuffer(queue_.front()->data.get(),
173 queue_.front()->data->size());
176 *buffer = current_buf_.get();
177 *size = current_buf_->BytesRemaining();
180 base::Closure BufferedSocketWriter::AdvanceBufferPosition(int written) {
181 current_buf_->DidConsume(written);
183 if (current_buf_->BytesRemaining() == 0) {
184 current_buf_ = nullptr;
185 return PopQueue();
187 return base::Closure();
190 void BufferedSocketWriter::OnError(int result) {
191 current_buf_ = nullptr;
194 BufferedSocketWriter::~BufferedSocketWriter() {
197 BufferedDatagramWriter::BufferedDatagramWriter() {
200 void BufferedDatagramWriter::GetNextPacket(
201 net::IOBuffer** buffer, int* size) {
202 if (queue_.empty()) {
203 *buffer = nullptr;
204 return; // Nothing to write.
206 *buffer = queue_.front()->data.get();
207 *size = queue_.front()->data->size();
210 base::Closure BufferedDatagramWriter::AdvanceBufferPosition(int written) {
211 DCHECK_EQ(written, queue_.front()->data->size());
212 return PopQueue();
215 void BufferedDatagramWriter::OnError(int result) {
216 // Nothing to do here.
219 BufferedDatagramWriter::~BufferedDatagramWriter() {
222 } // namespace remoting