1 // Copyright (c) 2012 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
5 #include "jingle/glue/pseudotcp_adapter.h"
7 #include "base/compiler_specific.h"
8 #include "base/logging.h"
9 #include "base/time/time.h"
10 #include "base/timer/timer.h"
11 #include "net/base/address_list.h"
12 #include "net/base/completion_callback.h"
13 #include "net/base/io_buffer.h"
14 #include "net/base/net_errors.h"
15 #include "net/base/net_util.h"
17 using cricket::PseudoTcp
;
20 const int kReadBufferSize
= 65536; // Maximum size of a packet.
21 const uint16 kDefaultMtu
= 1280;
24 namespace jingle_glue
{
26 class PseudoTcpAdapter::Core
: public cricket::IPseudoTcpNotify
,
27 public base::RefCounted
<Core
> {
29 explicit Core(net::Socket
* socket
);
31 // Functions used to implement net::StreamSocket.
32 int Read(net::IOBuffer
* buffer
, int buffer_size
,
33 const net::CompletionCallback
& callback
);
34 int Write(net::IOBuffer
* buffer
, int buffer_size
,
35 const net::CompletionCallback
& callback
);
36 int Connect(const net::CompletionCallback
& callback
);
38 bool IsConnected() const;
40 // cricket::IPseudoTcpNotify interface.
41 // These notifications are triggered from NotifyPacket.
42 void OnTcpOpen(cricket::PseudoTcp
* tcp
) override
;
43 void OnTcpReadable(cricket::PseudoTcp
* tcp
) override
;
44 void OnTcpWriteable(cricket::PseudoTcp
* tcp
) override
;
45 // This is triggered by NotifyClock or NotifyPacket.
46 void OnTcpClosed(cricket::PseudoTcp
* tcp
, uint32 error
) override
;
47 // This is triggered by NotifyClock, NotifyPacket, Recv and Send.
48 WriteResult
TcpWritePacket(cricket::PseudoTcp
* tcp
,
52 void SetAckDelay(int delay_ms
);
53 void SetNoDelay(bool no_delay
);
54 void SetReceiveBufferSize(int32 size
);
55 void SetSendBufferSize(int32 size
);
56 void SetWriteWaitsForSend(bool write_waits_for_send
);
61 friend class base::RefCounted
<Core
>;
64 // These are invoked by the underlying Socket, and may trigger callbacks.
65 // They hold a reference to |this| while running, to protect from deletion.
66 void OnRead(int result
);
67 void OnWritten(int result
);
69 // These may trigger callbacks, so the holder must hold a reference on
70 // the stack while calling them.
71 void DoReadFromSocket();
72 void HandleReadResults(int result
);
73 void HandleTcpClock();
75 // Checks if current write has completed in the write-waits-for-send
77 void CheckWriteComplete();
79 // This re-sets |timer| without triggering callbacks.
82 net::CompletionCallback connect_callback_
;
83 net::CompletionCallback read_callback_
;
84 net::CompletionCallback write_callback_
;
86 cricket::PseudoTcp pseudo_tcp_
;
87 scoped_ptr
<net::Socket
> socket_
;
89 scoped_refptr
<net::IOBuffer
> read_buffer_
;
90 int read_buffer_size_
;
91 scoped_refptr
<net::IOBuffer
> write_buffer_
;
92 int write_buffer_size_
;
94 // Whether we need to wait for data to be sent before completing write.
95 bool write_waits_for_send_
;
97 // Set to true in the write-waits-for-send mode when we've
98 // successfully writtend data to the send buffer and waiting for the
99 // data to be sent to the remote end.
100 bool waiting_write_position_
;
102 // Number of the bytes written by the last write stored while we wait
103 // for the data to be sent (i.e. when waiting_write_position_ = true).
104 int last_write_result_
;
106 bool socket_write_pending_
;
107 scoped_refptr
<net::IOBuffer
> socket_read_buffer_
;
109 base::OneShotTimer
<Core
> timer_
;
111 DISALLOW_COPY_AND_ASSIGN(Core
);
115 PseudoTcpAdapter::Core::Core(net::Socket
* socket
)
116 : pseudo_tcp_(this, 0),
118 write_waits_for_send_(false),
119 waiting_write_position_(false),
120 socket_write_pending_(false) {
121 // Doesn't trigger callbacks.
122 pseudo_tcp_
.NotifyMTU(kDefaultMtu
);
125 PseudoTcpAdapter::Core::~Core() {
128 int PseudoTcpAdapter::Core::Read(net::IOBuffer
* buffer
, int buffer_size
,
129 const net::CompletionCallback
& callback
) {
130 DCHECK(read_callback_
.is_null());
132 // Reference the Core in case a callback deletes the adapter.
133 scoped_refptr
<Core
> core(this);
135 int result
= pseudo_tcp_
.Recv(buffer
->data(), buffer_size
);
137 result
= net::MapSystemError(pseudo_tcp_
.GetError());
141 if (result
== net::ERR_IO_PENDING
) {
142 read_buffer_
= buffer
;
143 read_buffer_size_
= buffer_size
;
144 read_callback_
= callback
;
152 int PseudoTcpAdapter::Core::Write(net::IOBuffer
* buffer
, int buffer_size
,
153 const net::CompletionCallback
& callback
) {
154 DCHECK(write_callback_
.is_null());
156 // Reference the Core in case a callback deletes the adapter.
157 scoped_refptr
<Core
> core(this);
159 int result
= pseudo_tcp_
.Send(buffer
->data(), buffer_size
);
161 result
= net::MapSystemError(pseudo_tcp_
.GetError());
167 if (result
== net::ERR_IO_PENDING
) {
168 write_buffer_
= buffer
;
169 write_buffer_size_
= buffer_size
;
170 write_callback_
= callback
;
177 // Need to wait until the data is sent to the peer when
178 // send-confirmation mode is enabled.
179 if (write_waits_for_send_
&& pseudo_tcp_
.GetBytesBufferedNotSent() > 0) {
180 DCHECK(!waiting_write_position_
);
181 waiting_write_position_
= true;
182 last_write_result_
= result
;
183 write_buffer_
= buffer
;
184 write_buffer_size_
= buffer_size
;
185 write_callback_
= callback
;
186 return net::ERR_IO_PENDING
;
192 int PseudoTcpAdapter::Core::Connect(const net::CompletionCallback
& callback
) {
193 DCHECK_EQ(pseudo_tcp_
.State(), cricket::PseudoTcp::TCP_LISTEN
);
195 // Reference the Core in case a callback deletes the adapter.
196 scoped_refptr
<Core
> core(this);
198 // Start the connection attempt.
199 int result
= pseudo_tcp_
.Connect();
201 return net::ERR_FAILED
;
205 connect_callback_
= callback
;
208 return net::ERR_IO_PENDING
;
211 void PseudoTcpAdapter::Core::Disconnect() {
212 // Don't dispatch outstanding callbacks, as mandated by net::StreamSocket.
213 read_callback_
.Reset();
215 write_callback_
.Reset();
216 write_buffer_
= NULL
;
217 connect_callback_
.Reset();
219 // TODO(wez): Connect should succeed if called after Disconnect, which
220 // PseudoTcp doesn't support, so we need to teardown the internal PseudoTcp
221 // and create a new one in Connect.
222 // TODO(wez): Close sets a shutdown flag inside PseudoTcp but has no other
223 // effect. This should be addressed in PseudoTcp, really.
224 // In the meantime we can fake OnTcpClosed notification and tear down the
226 pseudo_tcp_
.Close(true);
229 bool PseudoTcpAdapter::Core::IsConnected() const {
230 return pseudo_tcp_
.State() == PseudoTcp::TCP_ESTABLISHED
;
233 void PseudoTcpAdapter::Core::OnTcpOpen(PseudoTcp
* tcp
) {
234 DCHECK(tcp
== &pseudo_tcp_
);
236 if (!connect_callback_
.is_null()) {
237 net::CompletionCallback callback
= connect_callback_
;
238 connect_callback_
.Reset();
239 callback
.Run(net::OK
);
246 void PseudoTcpAdapter::Core::OnTcpReadable(PseudoTcp
* tcp
) {
247 DCHECK_EQ(tcp
, &pseudo_tcp_
);
248 if (read_callback_
.is_null())
251 int result
= pseudo_tcp_
.Recv(read_buffer_
->data(), read_buffer_size_
);
253 result
= net::MapSystemError(pseudo_tcp_
.GetError());
255 if (result
== net::ERR_IO_PENDING
)
261 net::CompletionCallback callback
= read_callback_
;
262 read_callback_
.Reset();
264 callback
.Run(result
);
267 void PseudoTcpAdapter::Core::OnTcpWriteable(PseudoTcp
* tcp
) {
268 DCHECK_EQ(tcp
, &pseudo_tcp_
);
269 if (write_callback_
.is_null())
272 if (waiting_write_position_
) {
273 CheckWriteComplete();
277 int result
= pseudo_tcp_
.Send(write_buffer_
->data(), write_buffer_size_
);
279 result
= net::MapSystemError(pseudo_tcp_
.GetError());
281 if (result
== net::ERR_IO_PENDING
)
287 if (write_waits_for_send_
&& pseudo_tcp_
.GetBytesBufferedNotSent() > 0) {
288 DCHECK(!waiting_write_position_
);
289 waiting_write_position_
= true;
290 last_write_result_
= result
;
294 net::CompletionCallback callback
= write_callback_
;
295 write_callback_
.Reset();
296 write_buffer_
= NULL
;
297 callback
.Run(result
);
300 void PseudoTcpAdapter::Core::OnTcpClosed(PseudoTcp
* tcp
, uint32 error
) {
301 DCHECK_EQ(tcp
, &pseudo_tcp_
);
303 if (!connect_callback_
.is_null()) {
304 net::CompletionCallback callback
= connect_callback_
;
305 connect_callback_
.Reset();
306 callback
.Run(net::MapSystemError(error
));
309 if (!read_callback_
.is_null()) {
310 net::CompletionCallback callback
= read_callback_
;
311 read_callback_
.Reset();
312 callback
.Run(net::MapSystemError(error
));
315 if (!write_callback_
.is_null()) {
316 net::CompletionCallback callback
= write_callback_
;
317 write_callback_
.Reset();
318 callback
.Run(net::MapSystemError(error
));
322 void PseudoTcpAdapter::Core::SetAckDelay(int delay_ms
) {
323 pseudo_tcp_
.SetOption(cricket::PseudoTcp::OPT_ACKDELAY
, delay_ms
);
326 void PseudoTcpAdapter::Core::SetNoDelay(bool no_delay
) {
327 pseudo_tcp_
.SetOption(cricket::PseudoTcp::OPT_NODELAY
, no_delay
? 1 : 0);
330 void PseudoTcpAdapter::Core::SetReceiveBufferSize(int32 size
) {
331 pseudo_tcp_
.SetOption(cricket::PseudoTcp::OPT_RCVBUF
, size
);
334 void PseudoTcpAdapter::Core::SetSendBufferSize(int32 size
) {
335 pseudo_tcp_
.SetOption(cricket::PseudoTcp::OPT_SNDBUF
, size
);
338 void PseudoTcpAdapter::Core::SetWriteWaitsForSend(bool write_waits_for_send
) {
339 write_waits_for_send_
= write_waits_for_send
;
342 void PseudoTcpAdapter::Core::DeleteSocket() {
346 cricket::IPseudoTcpNotify::WriteResult
PseudoTcpAdapter::Core::TcpWritePacket(
350 DCHECK_EQ(tcp
, &pseudo_tcp_
);
352 // If we already have a write pending, we behave like a congested network,
353 // returning success for the write, but dropping the packet. PseudoTcp will
354 // back-off and retransmit, adjusting for the perceived congestion.
355 if (socket_write_pending_
)
356 return IPseudoTcpNotify::WR_SUCCESS
;
358 scoped_refptr
<net::IOBuffer
> write_buffer
= new net::IOBuffer(len
);
359 memcpy(write_buffer
->data(), buffer
, len
);
361 // Our underlying socket is datagram-oriented, which means it should either
362 // send exactly as many bytes as we requested, or fail.
365 result
= socket_
->Write(
368 base::Bind(&PseudoTcpAdapter::Core::OnWritten
, base::Unretained(this)));
370 result
= net::ERR_CONNECTION_CLOSED
;
372 if (result
== net::ERR_IO_PENDING
) {
373 socket_write_pending_
= true;
374 return IPseudoTcpNotify::WR_SUCCESS
;
375 } else if (result
== net::ERR_MSG_TOO_BIG
) {
376 return IPseudoTcpNotify::WR_TOO_LARGE
;
377 } else if (result
< 0) {
378 return IPseudoTcpNotify::WR_FAIL
;
380 return IPseudoTcpNotify::WR_SUCCESS
;
384 void PseudoTcpAdapter::Core::DoReadFromSocket() {
385 if (!socket_read_buffer_
.get())
386 socket_read_buffer_
= new net::IOBuffer(kReadBufferSize
);
389 while (socket_
.get() && result
> 0) {
390 result
= socket_
->Read(
391 socket_read_buffer_
.get(),
393 base::Bind(&PseudoTcpAdapter::Core::OnRead
, base::Unretained(this)));
394 if (result
!= net::ERR_IO_PENDING
)
395 HandleReadResults(result
);
399 void PseudoTcpAdapter::Core::HandleReadResults(int result
) {
401 LOG(ERROR
) << "Read returned " << result
;
405 // TODO(wez): Disconnect on failure of NotifyPacket?
406 pseudo_tcp_
.NotifyPacket(socket_read_buffer_
->data(), result
);
409 CheckWriteComplete();
412 void PseudoTcpAdapter::Core::OnRead(int result
) {
413 // Reference the Core in case a callback deletes the adapter.
414 scoped_refptr
<Core
> core(this);
416 HandleReadResults(result
);
421 void PseudoTcpAdapter::Core::OnWritten(int result
) {
422 // Reference the Core in case a callback deletes the adapter.
423 scoped_refptr
<Core
> core(this);
425 socket_write_pending_
= false;
427 LOG(WARNING
) << "Write failed. Error code: " << result
;
431 void PseudoTcpAdapter::Core::AdjustClock() {
433 if (pseudo_tcp_
.GetNextClock(PseudoTcp::Now(), timeout
)) {
435 timer_
.Start(FROM_HERE
,
436 base::TimeDelta::FromMilliseconds(std::max(timeout
, 0L)), this,
437 &PseudoTcpAdapter::Core::HandleTcpClock
);
441 void PseudoTcpAdapter::Core::HandleTcpClock() {
442 // Reference the Core in case a callback deletes the adapter.
443 scoped_refptr
<Core
> core(this);
445 pseudo_tcp_
.NotifyClock(PseudoTcp::Now());
448 CheckWriteComplete();
451 void PseudoTcpAdapter::Core::CheckWriteComplete() {
452 if (!write_callback_
.is_null() && waiting_write_position_
) {
453 if (pseudo_tcp_
.GetBytesBufferedNotSent() == 0) {
454 waiting_write_position_
= false;
456 net::CompletionCallback callback
= write_callback_
;
457 write_callback_
.Reset();
458 write_buffer_
= NULL
;
459 callback
.Run(last_write_result_
);
464 // Public interface implemention.
466 PseudoTcpAdapter::PseudoTcpAdapter(net::Socket
* socket
)
467 : core_(new Core(socket
)) {
470 PseudoTcpAdapter::~PseudoTcpAdapter() {
473 // Make sure that the underlying socket is destroyed before PseudoTcp.
474 core_
->DeleteSocket();
477 int PseudoTcpAdapter::Read(net::IOBuffer
* buffer
, int buffer_size
,
478 const net::CompletionCallback
& callback
) {
479 DCHECK(CalledOnValidThread());
480 return core_
->Read(buffer
, buffer_size
, callback
);
483 int PseudoTcpAdapter::Write(net::IOBuffer
* buffer
, int buffer_size
,
484 const net::CompletionCallback
& callback
) {
485 DCHECK(CalledOnValidThread());
486 return core_
->Write(buffer
, buffer_size
, callback
);
489 int PseudoTcpAdapter::SetReceiveBufferSize(int32 size
) {
490 DCHECK(CalledOnValidThread());
492 core_
->SetReceiveBufferSize(size
);
496 int PseudoTcpAdapter::SetSendBufferSize(int32 size
) {
497 DCHECK(CalledOnValidThread());
499 core_
->SetSendBufferSize(size
);
503 int PseudoTcpAdapter::Connect(const net::CompletionCallback
& callback
) {
504 DCHECK(CalledOnValidThread());
506 // net::StreamSocket requires that Connect return OK if already connected.
510 return core_
->Connect(callback
);
513 void PseudoTcpAdapter::Disconnect() {
514 DCHECK(CalledOnValidThread());
518 bool PseudoTcpAdapter::IsConnected() const {
519 return core_
->IsConnected();
522 bool PseudoTcpAdapter::IsConnectedAndIdle() const {
523 DCHECK(CalledOnValidThread());
528 int PseudoTcpAdapter::GetPeerAddress(net::IPEndPoint
* address
) const {
529 DCHECK(CalledOnValidThread());
531 // We don't have a meaningful peer address, but we can't return an
532 // error, so we return a INADDR_ANY instead.
533 net::IPAddressNumber
ip_address(net::kIPv4AddressSize
);
534 *address
= net::IPEndPoint(ip_address
, 0);
538 int PseudoTcpAdapter::GetLocalAddress(net::IPEndPoint
* address
) const {
539 DCHECK(CalledOnValidThread());
541 return net::ERR_FAILED
;
544 const net::BoundNetLog
& PseudoTcpAdapter::NetLog() const {
545 DCHECK(CalledOnValidThread());
549 void PseudoTcpAdapter::SetSubresourceSpeculation() {
550 DCHECK(CalledOnValidThread());
554 void PseudoTcpAdapter::SetOmniboxSpeculation() {
555 DCHECK(CalledOnValidThread());
559 bool PseudoTcpAdapter::WasEverUsed() const {
560 DCHECK(CalledOnValidThread());
565 bool PseudoTcpAdapter::UsingTCPFastOpen() const {
566 DCHECK(CalledOnValidThread());
570 bool PseudoTcpAdapter::WasNpnNegotiated() const {
571 DCHECK(CalledOnValidThread());
575 net::NextProto
PseudoTcpAdapter::GetNegotiatedProtocol() const {
576 DCHECK(CalledOnValidThread());
577 return net::kProtoUnknown
;
580 bool PseudoTcpAdapter::GetSSLInfo(net::SSLInfo
* ssl_info
) {
581 DCHECK(CalledOnValidThread());
585 void PseudoTcpAdapter::SetAckDelay(int delay_ms
) {
586 DCHECK(CalledOnValidThread());
587 core_
->SetAckDelay(delay_ms
);
590 void PseudoTcpAdapter::SetNoDelay(bool no_delay
) {
591 DCHECK(CalledOnValidThread());
592 core_
->SetNoDelay(no_delay
);
595 void PseudoTcpAdapter::SetWriteWaitsForSend(bool write_waits_for_send
) {
596 DCHECK(CalledOnValidThread());
597 core_
->SetWriteWaitsForSend(write_waits_for_send
);
600 } // namespace jingle_glue