Supervised user import: Listen for profile creation/deletion
[chromium-blink-merge.git] / remoting / base / buffered_socket_writer.cc
blobff2c47f822473a2dfbd5dc0b0bcf03db67d995e1
1 // Copyright 2015 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
5 #include "remoting/base/buffered_socket_writer.h"
7 #include "base/bind.h"
8 #include "base/location.h"
9 #include "base/single_thread_task_runner.h"
10 #include "base/stl_util.h"
11 #include "base/thread_task_runner_handle.h"
12 #include "net/base/net_errors.h"
14 namespace remoting {
16 struct BufferedSocketWriterBase::PendingPacket {
17 PendingPacket(scoped_refptr<net::IOBufferWithSize> data,
18 const base::Closure& done_task)
19 : data(data),
20 done_task(done_task) {
23 scoped_refptr<net::IOBufferWithSize> data;
24 base::Closure done_task;
27 BufferedSocketWriterBase::BufferedSocketWriterBase()
28 : buffer_size_(0),
29 socket_(nullptr),
30 write_pending_(false),
31 closed_(false),
32 destroyed_flag_(nullptr) {
35 void BufferedSocketWriterBase::Init(net::Socket* socket,
36 const WriteFailedCallback& callback) {
37 DCHECK(CalledOnValidThread());
38 DCHECK(socket);
39 socket_ = socket;
40 write_failed_callback_ = callback;
43 bool BufferedSocketWriterBase::Write(
44 scoped_refptr<net::IOBufferWithSize> data, const base::Closure& done_task) {
45 DCHECK(CalledOnValidThread());
46 DCHECK(socket_);
47 DCHECK(data.get());
49 // Don't write after Close().
50 if (closed_)
51 return false;
53 queue_.push_back(new PendingPacket(data, done_task));
54 buffer_size_ += data->size();
56 DoWrite();
58 // DoWrite() may trigger OnWriteError() to be called.
59 return !closed_;
62 void BufferedSocketWriterBase::DoWrite() {
63 DCHECK(CalledOnValidThread());
64 DCHECK(socket_);
66 // Don't try to write if there is another write pending.
67 if (write_pending_)
68 return;
70 // Don't write after Close().
71 if (closed_)
72 return;
74 while (true) {
75 net::IOBuffer* current_packet;
76 int current_packet_size;
77 GetNextPacket(&current_packet, &current_packet_size);
79 // Return if the queue is empty.
80 if (!current_packet)
81 return;
83 int result = socket_->Write(
84 current_packet, current_packet_size,
85 base::Bind(&BufferedSocketWriterBase::OnWritten,
86 base::Unretained(this)));
87 bool write_again = false;
88 HandleWriteResult(result, &write_again);
89 if (!write_again)
90 return;
94 void BufferedSocketWriterBase::HandleWriteResult(int result,
95 bool* write_again) {
96 *write_again = false;
97 if (result < 0) {
98 if (result == net::ERR_IO_PENDING) {
99 write_pending_ = true;
100 } else {
101 HandleError(result);
102 if (!write_failed_callback_.is_null())
103 write_failed_callback_.Run(result);
105 return;
108 base::Closure done_task = AdvanceBufferPosition(result);
109 if (!done_task.is_null()) {
110 bool destroyed = false;
111 destroyed_flag_ = &destroyed;
112 done_task.Run();
113 if (destroyed) {
114 // Stop doing anything if we've been destroyed by the callback.
115 return;
117 destroyed_flag_ = nullptr;
120 *write_again = true;
123 void BufferedSocketWriterBase::OnWritten(int result) {
124 DCHECK(CalledOnValidThread());
125 DCHECK(write_pending_);
126 write_pending_ = false;
128 bool write_again;
129 HandleWriteResult(result, &write_again);
130 if (write_again)
131 DoWrite();
134 void BufferedSocketWriterBase::HandleError(int result) {
135 DCHECK(CalledOnValidThread());
137 closed_ = true;
139 STLDeleteElements(&queue_);
141 // Notify subclass that an error is received.
142 OnError(result);
145 int BufferedSocketWriterBase::GetBufferSize() {
146 return buffer_size_;
149 int BufferedSocketWriterBase::GetBufferChunks() {
150 return queue_.size();
153 void BufferedSocketWriterBase::Close() {
154 DCHECK(CalledOnValidThread());
155 closed_ = true;
158 BufferedSocketWriterBase::~BufferedSocketWriterBase() {
159 if (destroyed_flag_)
160 *destroyed_flag_ = true;
162 STLDeleteElements(&queue_);
165 base::Closure BufferedSocketWriterBase::PopQueue() {
166 base::Closure result = queue_.front()->done_task;
167 delete queue_.front();
168 queue_.pop_front();
169 return result;
172 BufferedSocketWriter::BufferedSocketWriter() {
175 void BufferedSocketWriter::GetNextPacket(
176 net::IOBuffer** buffer, int* size) {
177 if (!current_buf_.get()) {
178 if (queue_.empty()) {
179 *buffer = nullptr;
180 return; // Nothing to write.
182 current_buf_ = new net::DrainableIOBuffer(queue_.front()->data.get(),
183 queue_.front()->data->size());
186 *buffer = current_buf_.get();
187 *size = current_buf_->BytesRemaining();
190 base::Closure BufferedSocketWriter::AdvanceBufferPosition(int written) {
191 buffer_size_ -= written;
192 current_buf_->DidConsume(written);
194 if (current_buf_->BytesRemaining() == 0) {
195 current_buf_ = nullptr;
196 return PopQueue();
198 return base::Closure();
201 void BufferedSocketWriter::OnError(int result) {
202 current_buf_ = nullptr;
205 BufferedSocketWriter::~BufferedSocketWriter() {
208 BufferedDatagramWriter::BufferedDatagramWriter() {
211 void BufferedDatagramWriter::GetNextPacket(
212 net::IOBuffer** buffer, int* size) {
213 if (queue_.empty()) {
214 *buffer = nullptr;
215 return; // Nothing to write.
217 *buffer = queue_.front()->data.get();
218 *size = queue_.front()->data->size();
221 base::Closure BufferedDatagramWriter::AdvanceBufferPosition(int written) {
222 DCHECK_EQ(written, queue_.front()->data->size());
223 buffer_size_ -= queue_.front()->data->size();
224 return PopQueue();
227 void BufferedDatagramWriter::OnError(int result) {
228 // Nothing to do here.
231 BufferedDatagramWriter::~BufferedDatagramWriter() {
234 } // namespace remoting