exclude PluginsFieldTrialTest.NoPrefLeftBehind from valgrind bot
[chromium-blink-merge.git] / remoting / base / buffered_socket_writer.cc
blob827abca9df9376d2057430087f701ce3eaf63d22
1 // Copyright 2015 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
5 #include "remoting/base/buffered_socket_writer.h"
7 #include "base/bind.h"
8 #include "base/stl_util.h"
9 #include "net/base/io_buffer.h"
10 #include "net/base/net_errors.h"
11 #include "net/socket/socket.h"
13 namespace remoting {
15 namespace {
17 int WriteNetSocket(net::Socket* socket,
18 const scoped_refptr<net::IOBuffer>& buf,
19 int buf_len,
20 const net::CompletionCallback& callback) {
21 return socket->Write(buf.get(), buf_len, callback);
24 } // namespace
26 struct BufferedSocketWriter::PendingPacket {
27 PendingPacket(scoped_refptr<net::DrainableIOBuffer> data,
28 const base::Closure& done_task)
29 : data(data),
30 done_task(done_task) {
33 scoped_refptr<net::DrainableIOBuffer> data;
34 base::Closure done_task;
37 // static
38 scoped_ptr<BufferedSocketWriter> BufferedSocketWriter::CreateForSocket(
39 net::Socket* socket,
40 const WriteFailedCallback& write_failed_callback) {
41 scoped_ptr<BufferedSocketWriter> result(new BufferedSocketWriter());
42 result->Init(base::Bind(&WriteNetSocket, socket), write_failed_callback);
43 return result.Pass();
46 BufferedSocketWriter::BufferedSocketWriter() : weak_factory_(this) {
49 BufferedSocketWriter::~BufferedSocketWriter() {
50 STLDeleteElements(&queue_);
53 void BufferedSocketWriter::Init(
54 const WriteCallback& write_callback,
55 const WriteFailedCallback& write_failed_callback) {
56 write_callback_ = write_callback;
57 write_failed_callback_ = write_failed_callback;
60 void BufferedSocketWriter::Write(
61 const scoped_refptr<net::IOBufferWithSize>& data,
62 const base::Closure& done_task) {
63 DCHECK(thread_checker_.CalledOnValidThread());
64 DCHECK(data.get());
66 // Don't write after error.
67 if (is_closed())
68 return;
70 queue_.push_back(new PendingPacket(
71 new net::DrainableIOBuffer(data.get(), data->size()), done_task));
73 DoWrite();
76 bool BufferedSocketWriter::is_closed() {
77 return write_callback_.is_null();
80 void BufferedSocketWriter::DoWrite() {
81 DCHECK(thread_checker_.CalledOnValidThread());
83 base::WeakPtr<BufferedSocketWriter> self = weak_factory_.GetWeakPtr();
84 while (self && !write_pending_ && !is_closed() && !queue_.empty()) {
85 int result = write_callback_.Run(
86 queue_.front()->data.get(), queue_.front()->data->BytesRemaining(),
87 base::Bind(&BufferedSocketWriter::OnWritten,
88 weak_factory_.GetWeakPtr()));
89 HandleWriteResult(result);
93 void BufferedSocketWriter::HandleWriteResult(int result) {
94 if (result < 0) {
95 if (result == net::ERR_IO_PENDING) {
96 write_pending_ = true;
97 } else {
98 write_callback_.Reset();
99 if (!write_failed_callback_.is_null()) {
100 WriteFailedCallback callback = write_failed_callback_;
101 callback.Run(result);
104 return;
107 DCHECK(!queue_.empty());
109 queue_.front()->data->DidConsume(result);
111 if (queue_.front()->data->BytesRemaining() == 0) {
112 base::Closure done_task = queue_.front()->done_task;
113 delete queue_.front();
114 queue_.pop_front();
116 if (!done_task.is_null())
117 done_task.Run();
121 void BufferedSocketWriter::OnWritten(int result) {
122 DCHECK(thread_checker_.CalledOnValidThread());
123 DCHECK(write_pending_);
124 write_pending_ = false;
126 base::WeakPtr<BufferedSocketWriter> self = weak_factory_.GetWeakPtr();
127 HandleWriteResult(result);
128 if (self)
129 DoWrite();
132 } // namespace remoting