1 // Copyright 2015 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
5 #include "remoting/base/buffered_socket_writer.h"
8 #include "base/stl_util.h"
9 #include "net/base/io_buffer.h"
10 #include "net/base/net_errors.h"
11 #include "net/socket/socket.h"
17 int WriteNetSocket(net::Socket
* socket
,
18 const scoped_refptr
<net::IOBuffer
>& buf
,
20 const net::CompletionCallback
& callback
) {
21 return socket
->Write(buf
.get(), buf_len
, callback
);
26 struct BufferedSocketWriter::PendingPacket
{
27 PendingPacket(scoped_refptr
<net::DrainableIOBuffer
> data
,
28 const base::Closure
& done_task
)
30 done_task(done_task
) {
33 scoped_refptr
<net::DrainableIOBuffer
> data
;
34 base::Closure done_task
;
38 scoped_ptr
<BufferedSocketWriter
> BufferedSocketWriter::CreateForSocket(
40 const WriteFailedCallback
& write_failed_callback
) {
41 scoped_ptr
<BufferedSocketWriter
> result(new BufferedSocketWriter());
42 result
->Init(base::Bind(&WriteNetSocket
, socket
), write_failed_callback
);
46 BufferedSocketWriter::BufferedSocketWriter() : weak_factory_(this) {
49 BufferedSocketWriter::~BufferedSocketWriter() {
50 STLDeleteElements(&queue_
);
53 void BufferedSocketWriter::Init(
54 const WriteCallback
& write_callback
,
55 const WriteFailedCallback
& write_failed_callback
) {
56 write_callback_
= write_callback
;
57 write_failed_callback_
= write_failed_callback
;
60 void BufferedSocketWriter::Write(
61 const scoped_refptr
<net::IOBufferWithSize
>& data
,
62 const base::Closure
& done_task
) {
63 DCHECK(thread_checker_
.CalledOnValidThread());
66 // Don't write after error.
70 queue_
.push_back(new PendingPacket(
71 new net::DrainableIOBuffer(data
.get(), data
->size()), done_task
));
76 bool BufferedSocketWriter::is_closed() {
77 return write_callback_
.is_null();
80 void BufferedSocketWriter::DoWrite() {
81 DCHECK(thread_checker_
.CalledOnValidThread());
83 base::WeakPtr
<BufferedSocketWriter
> self
= weak_factory_
.GetWeakPtr();
84 while (self
&& !write_pending_
&& !is_closed() && !queue_
.empty()) {
85 int result
= write_callback_
.Run(
86 queue_
.front()->data
.get(), queue_
.front()->data
->BytesRemaining(),
87 base::Bind(&BufferedSocketWriter::OnWritten
,
88 weak_factory_
.GetWeakPtr()));
89 HandleWriteResult(result
);
93 void BufferedSocketWriter::HandleWriteResult(int result
) {
95 if (result
== net::ERR_IO_PENDING
) {
96 write_pending_
= true;
98 write_callback_
.Reset();
99 if (!write_failed_callback_
.is_null()) {
100 WriteFailedCallback callback
= write_failed_callback_
;
101 callback
.Run(result
);
107 DCHECK(!queue_
.empty());
109 queue_
.front()->data
->DidConsume(result
);
111 if (queue_
.front()->data
->BytesRemaining() == 0) {
112 base::Closure done_task
= queue_
.front()->done_task
;
113 delete queue_
.front();
116 if (!done_task
.is_null())
121 void BufferedSocketWriter::OnWritten(int result
) {
122 DCHECK(thread_checker_
.CalledOnValidThread());
123 DCHECK(write_pending_
);
124 write_pending_
= false;
126 base::WeakPtr
<BufferedSocketWriter
> self
= weak_factory_
.GetWeakPtr();
127 HandleWriteResult(result
);
132 } // namespace remoting