1 // Copyright 2015 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
5 #include "remoting/protocol/pseudotcp_adapter.h"
7 #include "base/compiler_specific.h"
8 #include "base/logging.h"
9 #include "base/time/time.h"
10 #include "base/timer/timer.h"
11 #include "net/base/address_list.h"
12 #include "net/base/completion_callback.h"
13 #include "net/base/io_buffer.h"
14 #include "net/base/net_errors.h"
15 #include "net/base/net_util.h"
16 #include "remoting/protocol/p2p_datagram_socket.h"
// Lets the definitions below refer to PseudoTcp without the cricket::
// qualifier.
18 using cricket::PseudoTcp
;
// Size of the buffer used for each Recv on the underlying datagram socket
// (see DoReadFromSocket).
21 const int kReadBufferSize
= 65536; // Maximum size of a packet.
// MTU handed to PseudoTcp::NotifyMTU when a Core is constructed.
22 const uint16 kDefaultMtu
= 1280;
// Ref-counted implementation object behind PseudoTcpAdapter. It holds the
// PseudoTcp instance and the underlying datagram socket, and receives
// PseudoTcp's notifications through cricket::IPseudoTcpNotify.
28 class PseudoTcpAdapter::Core
: public cricket::IPseudoTcpNotify
,
29 public base::RefCounted
<Core
> {
// Takes ownership of |socket|.
31 explicit Core(scoped_ptr
<P2PDatagramSocket
> socket
);
33 // Functions used to implement net::StreamSocket.
34 int Read(const scoped_refptr
<net::IOBuffer
>& buffer
, int buffer_size
,
35 const net::CompletionCallback
& callback
);
36 int Write(const scoped_refptr
<net::IOBuffer
>& buffer
, int buffer_size
,
37 const net::CompletionCallback
& callback
);
38 int Connect(const net::CompletionCallback
& callback
);
40 // cricket::IPseudoTcpNotify interface.
41 // These notifications are triggered from NotifyPacket.
42 void OnTcpOpen(cricket::PseudoTcp
* tcp
) override
;
43 void OnTcpReadable(cricket::PseudoTcp
* tcp
) override
;
44 void OnTcpWriteable(cricket::PseudoTcp
* tcp
) override
;
45 // This is triggered by NotifyClock or NotifyPacket.
46 void OnTcpClosed(cricket::PseudoTcp
* tcp
, uint32 error
) override
;
47 // This is triggered by NotifyClock, NotifyPacket, Recv and Send.
48 WriteResult
TcpWritePacket(cricket::PseudoTcp
* tcp
,
// Option setters; each one forwards to |pseudo_tcp_| or updates a flag on
// this object.
52 void SetAckDelay(int delay_ms
);
53 void SetNoDelay(bool no_delay
);
54 void SetReceiveBufferSize(int32 size
);
55 void SetSendBufferSize(int32 size
);
56 void SetWriteWaitsForSend(bool write_waits_for_send
);
61 friend class base::RefCounted
<Core
>;
64 // These are invoked by the underlying Socket, and may trigger callbacks.
65 // They hold a reference to |this| while running, to protect from deletion.
66 void OnRead(int result
);
67 void OnWritten(int result
);
69 // These may trigger callbacks, so the holder must hold a reference on
70 // the stack while calling them.
71 void DoReadFromSocket();
72 void HandleReadResults(int result
);
73 void HandleTcpClock();
75 // Checks if current write has completed in the write-waits-for-send mode.
77 void CheckWriteComplete();
79 // This re-sets |timer_| without triggering callbacks.
// Completion callbacks for the outstanding connect, read and write
// operations; each is null when no such operation is pending.
82 net::CompletionCallback connect_callback_
;
83 net::CompletionCallback read_callback_
;
84 net::CompletionCallback write_callback_
;
// The PseudoTcp state machine and the datagram socket that carries its
// packets.
86 cricket::PseudoTcp pseudo_tcp_
;
87 scoped_ptr
<P2PDatagramSocket
> socket_
;
// Caller-supplied buffers (and their sizes) saved while a Read or Write is
// pending.
89 scoped_refptr
<net::IOBuffer
> read_buffer_
;
90 int read_buffer_size_
;
91 scoped_refptr
<net::IOBuffer
> write_buffer_
;
92 int write_buffer_size_
;
94 // Whether we need to wait for data to be sent before completing write.
95 bool write_waits_for_send_
;
97 // Set to true in the write-waits-for-send mode when we've
98 // successfully written data to the send buffer and are waiting for the
99 // data to be sent to the remote end.
100 bool waiting_write_position_
;
102 // Number of the bytes written by the last write stored while we wait
103 // for the data to be sent (i.e. when waiting_write_position_ = true).
104 int last_write_result_
;
// True while a Send on |socket_| is outstanding; cleared in OnWritten.
106 bool socket_write_pending_
;
// Buffer used for Recv on |socket_|; allocated on first use in
// DoReadFromSocket.
107 scoped_refptr
<net::IOBuffer
> socket_read_buffer_
;
// Timer that invokes HandleTcpClock; started from AdjustClock.
109 base::OneShotTimer
<Core
> timer_
;
111 DISALLOW_COPY_AND_ASSIGN(Core
);
// Builds a Core that owns |socket|. |pseudo_tcp_| is constructed with this
// object as its first argument, and the three write-state flags start out
// false.
// NOTE(review): read_buffer_size_, write_buffer_size_ and last_write_result_
// do not appear in this initializer list; they are only assigned later, when
// a read or write is left pending.
115 PseudoTcpAdapter::Core::Core(scoped_ptr
<P2PDatagramSocket
> socket
)
116 : pseudo_tcp_(this, 0),
117 socket_(socket
.Pass()),
118 write_waits_for_send_(false),
119 waiting_write_position_(false),
120 socket_write_pending_(false) {
121 // Doesn't trigger callbacks.
122 pseudo_tcp_
.NotifyMTU(kDefaultMtu
);
125 PseudoTcpAdapter::Core::~Core() {
// Tries to receive up to |buffer_size| bytes from |pseudo_tcp_| into
// |buffer|. Must not be called while another read is pending
// (|read_callback_| has to be null). PseudoTcp errors are translated with
// net::MapSystemError. When the outcome is net::ERR_IO_PENDING, the buffer,
// its size and |callback| are saved so the read can be finished later from
// OnTcpReadable.
128 int PseudoTcpAdapter::Core::Read(const scoped_refptr
<net::IOBuffer
>& buffer
,
130 const net::CompletionCallback
& callback
) {
131 DCHECK(read_callback_
.is_null());
133 // Reference the Core in case a callback deletes the adapter.
134 scoped_refptr
<Core
> core(this);
136 int result
= pseudo_tcp_
.Recv(buffer
->data(), buffer_size
);
138 result
= net::MapSystemError(pseudo_tcp_
.GetError());
142 if (result
== net::ERR_IO_PENDING
) {
143 read_buffer_
= buffer
;
144 read_buffer_size_
= buffer_size
;
145 read_callback_
= callback
;
// Tries to queue up to |buffer_size| bytes from |buffer| on |pseudo_tcp_|.
// Must not be called while another write is pending (|write_callback_| has
// to be null). PseudoTcp errors are translated with net::MapSystemError.
// The write is left pending (buffer, size and |callback| saved) in two
// cases: when the outcome is net::ERR_IO_PENDING, and, in
// write-waits-for-send mode, when PseudoTcp still holds bytes that have not
// been sent. In the latter case the byte count is remembered in
// |last_write_result_| and net::ERR_IO_PENDING is returned.
153 int PseudoTcpAdapter::Core::Write(const scoped_refptr
<net::IOBuffer
>& buffer
,
155 const net::CompletionCallback
& callback
) {
156 DCHECK(write_callback_
.is_null());
158 // Reference the Core in case a callback deletes the adapter.
159 scoped_refptr
<Core
> core(this);
161 int result
= pseudo_tcp_
.Send(buffer
->data(), buffer_size
);
163 result
= net::MapSystemError(pseudo_tcp_
.GetError());
169 if (result
== net::ERR_IO_PENDING
) {
170 write_buffer_
= buffer
;
171 write_buffer_size_
= buffer_size
;
172 write_callback_
= callback
;
179 // Need to wait until the data is sent to the peer when
180 // send-confirmation mode is enabled.
181 if (write_waits_for_send_
&& pseudo_tcp_
.GetBytesBufferedNotSent() > 0) {
182 DCHECK(!waiting_write_position_
);
183 waiting_write_position_
= true;
184 last_write_result_
= result
;
185 write_buffer_
= buffer
;
186 write_buffer_size_
= buffer_size
;
187 write_callback_
= callback
;
188 return net::ERR_IO_PENDING
;
// Starts the PseudoTcp connection attempt. PseudoTcp is expected to be in
// the TCP_LISTEN state on entry. net::ERR_FAILED is returned on the failure
// path; otherwise |callback| is stored in |connect_callback_| and
// net::ERR_IO_PENDING is returned, with completion reported through
// OnTcpOpen or OnTcpClosed.
194 int PseudoTcpAdapter::Core::Connect(const net::CompletionCallback
& callback
) {
195 DCHECK_EQ(pseudo_tcp_
.State(), cricket::PseudoTcp::TCP_LISTEN
);
197 // Reference the Core in case a callback deletes the adapter.
198 scoped_refptr
<Core
> core(this);
200 // Start the connection attempt.
201 int result
= pseudo_tcp_
.Connect();
203 return net::ERR_FAILED
;
207 connect_callback_
= callback
;
210 return net::ERR_IO_PENDING
;
// PseudoTcp notification that the connection is open. If a connect is
// pending, its callback is run with net::OK. The callback is copied to a
// local and the member is reset before the copy is run.
213 void PseudoTcpAdapter::Core::OnTcpOpen(PseudoTcp
* tcp
) {
214 DCHECK(tcp
== &pseudo_tcp_
);
216 if (!connect_callback_
.is_null()) {
217 net::CompletionCallback callback
= connect_callback_
;
218 connect_callback_
.Reset();
219 callback
.Run(net::OK
);
// PseudoTcp notification that data can be read. Checks whether a Read is
// pending (|read_callback_|) and retries Recv into the saved buffer.
// PseudoTcp errors are translated with net::MapSystemError and the result is
// checked against net::ERR_IO_PENDING. On completion the saved callback is
// reset and then run with the result.
226 void PseudoTcpAdapter::Core::OnTcpReadable(PseudoTcp
* tcp
) {
227 DCHECK_EQ(tcp
, &pseudo_tcp_
);
228 if (read_callback_
.is_null())
231 int result
= pseudo_tcp_
.Recv(read_buffer_
->data(), read_buffer_size_
);
233 result
= net::MapSystemError(pseudo_tcp_
.GetError());
235 if (result
== net::ERR_IO_PENDING
)
241 net::CompletionCallback callback
= read_callback_
;
242 read_callback_
.Reset();
244 callback
.Run(result
);
// PseudoTcp notification that more data can be written. Checks whether a
// Write is pending (|write_callback_|). If the data is already buffered and
// the write is only waiting for transmission (|waiting_write_position_|),
// CheckWriteComplete is used; otherwise Send is retried with the saved
// buffer. In write-waits-for-send mode, while PseudoTcp still holds unsent
// bytes, the byte count is stored in |last_write_result_| and the waiting
// flag is set. On completion the callback and |write_buffer_| are cleared
// before the callback is run with the result.
247 void PseudoTcpAdapter::Core::OnTcpWriteable(PseudoTcp
* tcp
) {
248 DCHECK_EQ(tcp
, &pseudo_tcp_
);
249 if (write_callback_
.is_null())
252 if (waiting_write_position_
) {
253 CheckWriteComplete();
257 int result
= pseudo_tcp_
.Send(write_buffer_
->data(), write_buffer_size_
);
259 result
= net::MapSystemError(pseudo_tcp_
.GetError());
261 if (result
== net::ERR_IO_PENDING
)
267 if (write_waits_for_send_
&& pseudo_tcp_
.GetBytesBufferedNotSent() > 0) {
268 DCHECK(!waiting_write_position_
);
269 waiting_write_position_
= true;
270 last_write_result_
= result
;
274 net::CompletionCallback callback
= write_callback_
;
275 write_callback_
.Reset();
276 write_buffer_
= NULL
;
277 callback
.Run(result
);
// PseudoTcp notification that the connection has closed with |error|. Each
// outstanding connect, read and write callback is copied, reset on the
// member, and then run with net::MapSystemError(error), in that order.
280 void PseudoTcpAdapter::Core::OnTcpClosed(PseudoTcp
* tcp
, uint32 error
) {
281 DCHECK_EQ(tcp
, &pseudo_tcp_
);
283 if (!connect_callback_
.is_null()) {
284 net::CompletionCallback callback
= connect_callback_
;
285 connect_callback_
.Reset();
286 callback
.Run(net::MapSystemError(error
));
289 if (!read_callback_
.is_null()) {
290 net::CompletionCallback callback
= read_callback_
;
291 read_callback_
.Reset();
292 callback
.Run(net::MapSystemError(error
));
295 if (!write_callback_
.is_null()) {
296 net::CompletionCallback callback
= write_callback_
;
297 write_callback_
.Reset();
298 callback
.Run(net::MapSystemError(error
));
// Sets PseudoTcp's OPT_ACKDELAY option to |delay_ms|.
302 void PseudoTcpAdapter::Core::SetAckDelay(int delay_ms
) {
303 pseudo_tcp_
.SetOption(cricket::PseudoTcp::OPT_ACKDELAY
, delay_ms
);
// Sets PseudoTcp's OPT_NODELAY option to 1 when |no_delay| is true and to 0
// otherwise.
306 void PseudoTcpAdapter::Core::SetNoDelay(bool no_delay
) {
307 pseudo_tcp_
.SetOption(cricket::PseudoTcp::OPT_NODELAY
, no_delay
? 1 : 0);
// Sets PseudoTcp's OPT_RCVBUF option to |size|.
310 void PseudoTcpAdapter::Core::SetReceiveBufferSize(int32 size
) {
311 pseudo_tcp_
.SetOption(cricket::PseudoTcp::OPT_RCVBUF
, size
);
// Sets PseudoTcp's OPT_SNDBUF option to |size|.
314 void PseudoTcpAdapter::Core::SetSendBufferSize(int32 size
) {
315 pseudo_tcp_
.SetOption(cricket::PseudoTcp::OPT_SNDBUF
, size
);
// Enables or disables write-waits-for-send mode. The flag is consulted by
// Write and OnTcpWriteable when deciding whether a write may complete.
318 void PseudoTcpAdapter::Core::SetWriteWaitsForSend(bool write_waits_for_send
) {
319 write_waits_for_send_
= write_waits_for_send
;
// Called from ~PseudoTcpAdapter. Resets the read, write and connect
// callbacks without running them and releases the pending write buffer.
322 void PseudoTcpAdapter::Core::DeleteSocket() {
323 // Don't dispatch outstanding callbacks when the socket is deleted.
324 read_callback_
.Reset();
326 write_callback_
.Reset();
327 write_buffer_
= NULL
;
328 connect_callback_
.Reset();
// Called by PseudoTcp to transmit one packet. Copies |len| bytes from
// |buffer| into a fresh IOBuffer and sends it on |socket_|, with OnWritten
// as the completion callback. The send result is mapped as follows:
//   net::ERR_IO_PENDING   -> sets |socket_write_pending_|, returns WR_SUCCESS
//   net::ERR_MSG_TOO_BIG  -> WR_TOO_LARGE
//   any other value < 0   -> WR_FAIL
//   otherwise             -> WR_SUCCESS
// While an earlier socket write is still pending, the packet is dropped and
// WR_SUCCESS is returned (see the comment below).
// NOTE(review): the completion callback is bound with base::Unretained(this);
// presumably this relies on |socket_| being owned by this Core - confirm.
333 cricket::IPseudoTcpNotify::WriteResult
PseudoTcpAdapter::Core::TcpWritePacket(
337 DCHECK_EQ(tcp
, &pseudo_tcp_
);
339 // If we already have a write pending, we behave like a congested network,
340 // returning success for the write, but dropping the packet. PseudoTcp will
341 // back-off and retransmit, adjusting for the perceived congestion.
342 if (socket_write_pending_
)
343 return IPseudoTcpNotify::WR_SUCCESS
;
345 scoped_refptr
<net::IOBuffer
> write_buffer
= new net::IOBuffer(len
);
346 memcpy(write_buffer
->data(), buffer
, len
);
348 // Our underlying socket is datagram-oriented, which means it should either
349 // send exactly as many bytes as we requested, or fail.
352 result
= socket_
->Send(
353 write_buffer
.get(), len
,
354 base::Bind(&PseudoTcpAdapter::Core::OnWritten
, base::Unretained(this)));
356 result
= net::ERR_CONNECTION_CLOSED
;
358 if (result
== net::ERR_IO_PENDING
) {
359 socket_write_pending_
= true;
360 return IPseudoTcpNotify::WR_SUCCESS
;
361 } else if (result
== net::ERR_MSG_TOO_BIG
) {
362 return IPseudoTcpNotify::WR_TOO_LARGE
;
363 } else if (result
< 0) {
364 return IPseudoTcpNotify::WR_FAIL
;
366 return IPseudoTcpNotify::WR_SUCCESS
;
// Reads datagrams from |socket_|. |socket_read_buffer_| is allocated on
// first use. Recv is then called repeatedly while |socket_| is still set and
// the previous result was positive. Any result other than
// net::ERR_IO_PENDING is passed to HandleReadResults; a pending read
// completes through OnRead.
370 void PseudoTcpAdapter::Core::DoReadFromSocket() {
371 if (!socket_read_buffer_
.get())
372 socket_read_buffer_
= new net::IOBuffer(kReadBufferSize
);
375 while (socket_
&& result
> 0) {
376 result
= socket_
->Recv(
377 socket_read_buffer_
.get(), kReadBufferSize
,
378 base::Bind(&PseudoTcpAdapter::Core::OnRead
, base::Unretained(this)));
379 if (result
!= net::ERR_IO_PENDING
)
380 HandleReadResults(result
);
// Processes the outcome of a Recv on |socket_|. Logs "Read returned
// <result>" at ERROR severity (presumably only for failed reads), hands
// |result| bytes of |socket_read_buffer_| to PseudoTcp::NotifyPacket, and
// then checks whether a deferred write can now complete.
384 void PseudoTcpAdapter::Core::HandleReadResults(int result
) {
386 LOG(ERROR
) << "Read returned " << result
;
390 // TODO(wez): Disconnect on failure of NotifyPacket?
391 pseudo_tcp_
.NotifyPacket(socket_read_buffer_
->data(), result
);
394 CheckWriteComplete();
// Completion callback for an asynchronous Recv on |socket_|. Takes a
// reference to this Core for the duration of the call and passes |result|
// to HandleReadResults.
397 void PseudoTcpAdapter::Core::OnRead(int result
) {
398 // Reference the Core in case a callback deletes the adapter.
399 scoped_refptr
<Core
> core(this);
401 HandleReadResults(result
);
// Completion callback for an asynchronous Send on |socket_|. Clears
// |socket_write_pending_|, so TcpWritePacket stops dropping packets, and
// logs a warning carrying |result| (presumably only when the write failed).
406 void PseudoTcpAdapter::Core::OnWritten(int result
) {
407 // Reference the Core in case a callback deletes the adapter.
408 scoped_refptr
<Core
> core(this);
410 socket_write_pending_
= false;
412 LOG(WARNING
) << "Write failed. Error code: " << result
;
// Asks PseudoTcp for its next clock deadline. When GetNextClock reports one,
// |timer_| is started to call HandleTcpClock after |timeout| milliseconds,
// with negative timeouts clamped to zero.
416 void PseudoTcpAdapter::Core::AdjustClock() {
418 if (pseudo_tcp_
.GetNextClock(PseudoTcp::Now(), timeout
)) {
420 timer_
.Start(FROM_HERE
,
421 base::TimeDelta::FromMilliseconds(std::max(timeout
, 0L)), this,
422 &PseudoTcpAdapter::Core::HandleTcpClock
);
// Timer callback started by AdjustClock. Notifies PseudoTcp of the current
// time and then checks whether a deferred write can now complete.
426 void PseudoTcpAdapter::Core::HandleTcpClock() {
427 // Reference the Core in case a callback deletes the adapter.
428 scoped_refptr
<Core
> core(this);
430 pseudo_tcp_
.NotifyClock(PseudoTcp::Now());
433 CheckWriteComplete();
// Completes a write that was deferred in write-waits-for-send mode. Acts
// only when a write callback is pending and |waiting_write_position_| is
// set. Once PseudoTcp reports no buffered-but-unsent bytes, the waiting
// flag, the callback and |write_buffer_| are cleared, and the callback is
// run with |last_write_result_|.
436 void PseudoTcpAdapter::Core::CheckWriteComplete() {
437 if (!write_callback_
.is_null() && waiting_write_position_
) {
438 if (pseudo_tcp_
.GetBytesBufferedNotSent() == 0) {
439 waiting_write_position_
= false;
441 net::CompletionCallback callback
= write_callback_
;
442 write_callback_
.Reset();
443 write_buffer_
= NULL
;
444 callback
.Run(last_write_result_
);
449 // Public interface implementation.
// Creates the adapter together with its Core, handing ownership of |socket|
// to the Core.
451 PseudoTcpAdapter::PseudoTcpAdapter(scoped_ptr
<P2PDatagramSocket
> socket
)
452 : core_(new Core(socket
.Pass())) {
// Calls Core::DeleteSocket, which also resets any outstanding callbacks so
// that none of them is run after the adapter is gone.
455 PseudoTcpAdapter::~PseudoTcpAdapter() {
456 // Make sure that the underlying socket is destroyed before PseudoTcp.
457 core_
->DeleteSocket();
// Checks the calling thread (debug builds) and forwards to Core::Read,
// returning its result.
460 int PseudoTcpAdapter::Read(const scoped_refptr
<net::IOBuffer
>& buffer
,
462 const net::CompletionCallback
& callback
) {
463 DCHECK(CalledOnValidThread());
464 return core_
->Read(buffer
, buffer_size
, callback
);
// Checks the calling thread (debug builds) and forwards to Core::Write,
// returning its result.
467 int PseudoTcpAdapter::Write(const scoped_refptr
<net::IOBuffer
>& buffer
,
469 const net::CompletionCallback
& callback
) {
470 DCHECK(CalledOnValidThread());
471 return core_
->Write(buffer
, buffer_size
, callback
);
// Checks the calling thread (debug builds) and passes |size| on to
// Core::SetReceiveBufferSize.
474 int PseudoTcpAdapter::SetReceiveBufferSize(int32 size
) {
475 DCHECK(CalledOnValidThread());
476 core_
->SetReceiveBufferSize(size
);
// Checks the calling thread (debug builds) and passes |size| on to
// Core::SetSendBufferSize.
480 int PseudoTcpAdapter::SetSendBufferSize(int32 size
) {
481 DCHECK(CalledOnValidThread());
482 core_
->SetSendBufferSize(size
);
// Checks the calling thread (debug builds) and forwards to Core::Connect,
// returning its result.
486 int PseudoTcpAdapter::Connect(const net::CompletionCallback
& callback
) {
487 DCHECK(CalledOnValidThread());
488 return core_
->Connect(callback
);
// Checks the calling thread (debug builds) and passes |delay_ms| on to
// Core::SetAckDelay.
491 void PseudoTcpAdapter::SetAckDelay(int delay_ms
) {
492 DCHECK(CalledOnValidThread());
493 core_
->SetAckDelay(delay_ms
);
// Checks the calling thread (debug builds) and passes |no_delay| on to
// Core::SetNoDelay.
496 void PseudoTcpAdapter::SetNoDelay(bool no_delay
) {
497 DCHECK(CalledOnValidThread());
498 core_
->SetNoDelay(no_delay
);
// Checks the calling thread (debug builds) and passes
// |write_waits_for_send| on to Core::SetWriteWaitsForSend.
501 void PseudoTcpAdapter::SetWriteWaitsForSend(bool write_waits_for_send
) {
502 DCHECK(CalledOnValidThread());
503 core_
->SetWriteWaitsForSend(write_waits_for_send
);
506 } // namespace protocol
507 } // namespace remoting