Blink roll 18aa52da6706987b8d9242b1cba8fc929b74fcee:25b6bd3a7a131ffe68d809546ad1a2070...
[chromium-blink-merge.git] / jingle / glue / pseudotcp_adapter.cc
blob5a26f7d6034013c19679e022388dea493f0474bf
1 // Copyright (c) 2012 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
5 #include "jingle/glue/pseudotcp_adapter.h"
7 #include "base/compiler_specific.h"
8 #include "base/logging.h"
9 #include "base/time/time.h"
10 #include "base/timer/timer.h"
11 #include "net/base/address_list.h"
12 #include "net/base/completion_callback.h"
13 #include "net/base/io_buffer.h"
14 #include "net/base/net_errors.h"
15 #include "net/base/net_util.h"
17 using cricket::PseudoTcp;
19 namespace {
20 const int kReadBufferSize = 65536; // Maximum size of a packet.
21 const uint16 kDefaultMtu = 1280;
22 } // namespace
24 namespace jingle_glue {
26 class PseudoTcpAdapter::Core : public cricket::IPseudoTcpNotify,
27 public base::RefCounted<Core> {
28 public:
29 explicit Core(net::Socket* socket);
31 // Functions used to implement net::StreamSocket.
32 int Read(net::IOBuffer* buffer, int buffer_size,
33 const net::CompletionCallback& callback);
34 int Write(net::IOBuffer* buffer, int buffer_size,
35 const net::CompletionCallback& callback);
36 int Connect(const net::CompletionCallback& callback);
37 void Disconnect();
38 bool IsConnected() const;
40 // cricket::IPseudoTcpNotify interface.
41 // These notifications are triggered from NotifyPacket.
42 void OnTcpOpen(cricket::PseudoTcp* tcp) override;
43 void OnTcpReadable(cricket::PseudoTcp* tcp) override;
44 void OnTcpWriteable(cricket::PseudoTcp* tcp) override;
45 // This is triggered by NotifyClock or NotifyPacket.
46 void OnTcpClosed(cricket::PseudoTcp* tcp, uint32 error) override;
47 // This is triggered by NotifyClock, NotifyPacket, Recv and Send.
48 WriteResult TcpWritePacket(cricket::PseudoTcp* tcp,
49 const char* buffer,
50 size_t len) override;
52 void SetAckDelay(int delay_ms);
53 void SetNoDelay(bool no_delay);
54 void SetReceiveBufferSize(int32 size);
55 void SetSendBufferSize(int32 size);
56 void SetWriteWaitsForSend(bool write_waits_for_send);
58 void DeleteSocket();
60 private:
61 friend class base::RefCounted<Core>;
62 ~Core() override;
64 // These are invoked by the underlying Socket, and may trigger callbacks.
65 // They hold a reference to |this| while running, to protect from deletion.
66 void OnRead(int result);
67 void OnWritten(int result);
69 // These may trigger callbacks, so the holder must hold a reference on
70 // the stack while calling them.
71 void DoReadFromSocket();
72 void HandleReadResults(int result);
73 void HandleTcpClock();
75 // Checks if current write has completed in the write-waits-for-send
76 // mode.
77 void CheckWriteComplete();
79 // This re-sets |timer| without triggering callbacks.
80 void AdjustClock();
82 net::CompletionCallback connect_callback_;
83 net::CompletionCallback read_callback_;
84 net::CompletionCallback write_callback_;
86 cricket::PseudoTcp pseudo_tcp_;
87 scoped_ptr<net::Socket> socket_;
89 scoped_refptr<net::IOBuffer> read_buffer_;
90 int read_buffer_size_;
91 scoped_refptr<net::IOBuffer> write_buffer_;
92 int write_buffer_size_;
94 // Whether we need to wait for data to be sent before completing write.
95 bool write_waits_for_send_;
97 // Set to true in the write-waits-for-send mode when we've
98 // successfully writtend data to the send buffer and waiting for the
99 // data to be sent to the remote end.
100 bool waiting_write_position_;
102 // Number of the bytes written by the last write stored while we wait
103 // for the data to be sent (i.e. when waiting_write_position_ = true).
104 int last_write_result_;
106 bool socket_write_pending_;
107 scoped_refptr<net::IOBuffer> socket_read_buffer_;
109 base::OneShotTimer<Core> timer_;
111 DISALLOW_COPY_AND_ASSIGN(Core);
115 PseudoTcpAdapter::Core::Core(net::Socket* socket)
116 : pseudo_tcp_(this, 0),
117 socket_(socket),
118 write_waits_for_send_(false),
119 waiting_write_position_(false),
120 socket_write_pending_(false) {
121 // Doesn't trigger callbacks.
122 pseudo_tcp_.NotifyMTU(kDefaultMtu);
125 PseudoTcpAdapter::Core::~Core() {
128 int PseudoTcpAdapter::Core::Read(net::IOBuffer* buffer, int buffer_size,
129 const net::CompletionCallback& callback) {
130 DCHECK(read_callback_.is_null());
132 // Reference the Core in case a callback deletes the adapter.
133 scoped_refptr<Core> core(this);
135 int result = pseudo_tcp_.Recv(buffer->data(), buffer_size);
136 if (result < 0) {
137 result = net::MapSystemError(pseudo_tcp_.GetError());
138 DCHECK(result < 0);
141 if (result == net::ERR_IO_PENDING) {
142 read_buffer_ = buffer;
143 read_buffer_size_ = buffer_size;
144 read_callback_ = callback;
147 AdjustClock();
149 return result;
152 int PseudoTcpAdapter::Core::Write(net::IOBuffer* buffer, int buffer_size,
153 const net::CompletionCallback& callback) {
154 DCHECK(write_callback_.is_null());
156 // Reference the Core in case a callback deletes the adapter.
157 scoped_refptr<Core> core(this);
159 int result = pseudo_tcp_.Send(buffer->data(), buffer_size);
160 if (result < 0) {
161 result = net::MapSystemError(pseudo_tcp_.GetError());
162 DCHECK(result < 0);
165 AdjustClock();
167 if (result == net::ERR_IO_PENDING) {
168 write_buffer_ = buffer;
169 write_buffer_size_ = buffer_size;
170 write_callback_ = callback;
171 return result;
174 if (result < 0)
175 return result;
177 // Need to wait until the data is sent to the peer when
178 // send-confirmation mode is enabled.
179 if (write_waits_for_send_ && pseudo_tcp_.GetBytesBufferedNotSent() > 0) {
180 DCHECK(!waiting_write_position_);
181 waiting_write_position_ = true;
182 last_write_result_ = result;
183 write_buffer_ = buffer;
184 write_buffer_size_ = buffer_size;
185 write_callback_ = callback;
186 return net::ERR_IO_PENDING;
189 return result;
192 int PseudoTcpAdapter::Core::Connect(const net::CompletionCallback& callback) {
193 DCHECK_EQ(pseudo_tcp_.State(), cricket::PseudoTcp::TCP_LISTEN);
195 // Reference the Core in case a callback deletes the adapter.
196 scoped_refptr<Core> core(this);
198 // Start the connection attempt.
199 int result = pseudo_tcp_.Connect();
200 if (result < 0)
201 return net::ERR_FAILED;
203 AdjustClock();
205 connect_callback_ = callback;
206 DoReadFromSocket();
208 return net::ERR_IO_PENDING;
211 void PseudoTcpAdapter::Core::Disconnect() {
212 // Don't dispatch outstanding callbacks, as mandated by net::StreamSocket.
213 read_callback_.Reset();
214 read_buffer_ = NULL;
215 write_callback_.Reset();
216 write_buffer_ = NULL;
217 connect_callback_.Reset();
219 // TODO(wez): Connect should succeed if called after Disconnect, which
220 // PseudoTcp doesn't support, so we need to teardown the internal PseudoTcp
221 // and create a new one in Connect.
222 // TODO(wez): Close sets a shutdown flag inside PseudoTcp but has no other
223 // effect. This should be addressed in PseudoTcp, really.
224 // In the meantime we can fake OnTcpClosed notification and tear down the
225 // PseudoTcp.
226 pseudo_tcp_.Close(true);
229 bool PseudoTcpAdapter::Core::IsConnected() const {
230 return pseudo_tcp_.State() == PseudoTcp::TCP_ESTABLISHED;
233 void PseudoTcpAdapter::Core::OnTcpOpen(PseudoTcp* tcp) {
234 DCHECK(tcp == &pseudo_tcp_);
236 if (!connect_callback_.is_null()) {
237 net::CompletionCallback callback = connect_callback_;
238 connect_callback_.Reset();
239 callback.Run(net::OK);
242 OnTcpReadable(tcp);
243 OnTcpWriteable(tcp);
246 void PseudoTcpAdapter::Core::OnTcpReadable(PseudoTcp* tcp) {
247 DCHECK_EQ(tcp, &pseudo_tcp_);
248 if (read_callback_.is_null())
249 return;
251 int result = pseudo_tcp_.Recv(read_buffer_->data(), read_buffer_size_);
252 if (result < 0) {
253 result = net::MapSystemError(pseudo_tcp_.GetError());
254 DCHECK(result < 0);
255 if (result == net::ERR_IO_PENDING)
256 return;
259 AdjustClock();
261 net::CompletionCallback callback = read_callback_;
262 read_callback_.Reset();
263 read_buffer_ = NULL;
264 callback.Run(result);
267 void PseudoTcpAdapter::Core::OnTcpWriteable(PseudoTcp* tcp) {
268 DCHECK_EQ(tcp, &pseudo_tcp_);
269 if (write_callback_.is_null())
270 return;
272 if (waiting_write_position_) {
273 CheckWriteComplete();
274 return;
277 int result = pseudo_tcp_.Send(write_buffer_->data(), write_buffer_size_);
278 if (result < 0) {
279 result = net::MapSystemError(pseudo_tcp_.GetError());
280 DCHECK(result < 0);
281 if (result == net::ERR_IO_PENDING)
282 return;
285 AdjustClock();
287 if (write_waits_for_send_ && pseudo_tcp_.GetBytesBufferedNotSent() > 0) {
288 DCHECK(!waiting_write_position_);
289 waiting_write_position_ = true;
290 last_write_result_ = result;
291 return;
294 net::CompletionCallback callback = write_callback_;
295 write_callback_.Reset();
296 write_buffer_ = NULL;
297 callback.Run(result);
300 void PseudoTcpAdapter::Core::OnTcpClosed(PseudoTcp* tcp, uint32 error) {
301 DCHECK_EQ(tcp, &pseudo_tcp_);
303 if (!connect_callback_.is_null()) {
304 net::CompletionCallback callback = connect_callback_;
305 connect_callback_.Reset();
306 callback.Run(net::MapSystemError(error));
309 if (!read_callback_.is_null()) {
310 net::CompletionCallback callback = read_callback_;
311 read_callback_.Reset();
312 callback.Run(net::MapSystemError(error));
315 if (!write_callback_.is_null()) {
316 net::CompletionCallback callback = write_callback_;
317 write_callback_.Reset();
318 callback.Run(net::MapSystemError(error));
322 void PseudoTcpAdapter::Core::SetAckDelay(int delay_ms) {
323 pseudo_tcp_.SetOption(cricket::PseudoTcp::OPT_ACKDELAY, delay_ms);
326 void PseudoTcpAdapter::Core::SetNoDelay(bool no_delay) {
327 pseudo_tcp_.SetOption(cricket::PseudoTcp::OPT_NODELAY, no_delay ? 1 : 0);
330 void PseudoTcpAdapter::Core::SetReceiveBufferSize(int32 size) {
331 pseudo_tcp_.SetOption(cricket::PseudoTcp::OPT_RCVBUF, size);
334 void PseudoTcpAdapter::Core::SetSendBufferSize(int32 size) {
335 pseudo_tcp_.SetOption(cricket::PseudoTcp::OPT_SNDBUF, size);
338 void PseudoTcpAdapter::Core::SetWriteWaitsForSend(bool write_waits_for_send) {
339 write_waits_for_send_ = write_waits_for_send;
342 void PseudoTcpAdapter::Core::DeleteSocket() {
343 socket_.reset();
346 cricket::IPseudoTcpNotify::WriteResult PseudoTcpAdapter::Core::TcpWritePacket(
347 PseudoTcp* tcp,
348 const char* buffer,
349 size_t len) {
350 DCHECK_EQ(tcp, &pseudo_tcp_);
352 // If we already have a write pending, we behave like a congested network,
353 // returning success for the write, but dropping the packet. PseudoTcp will
354 // back-off and retransmit, adjusting for the perceived congestion.
355 if (socket_write_pending_)
356 return IPseudoTcpNotify::WR_SUCCESS;
358 scoped_refptr<net::IOBuffer> write_buffer = new net::IOBuffer(len);
359 memcpy(write_buffer->data(), buffer, len);
361 // Our underlying socket is datagram-oriented, which means it should either
362 // send exactly as many bytes as we requested, or fail.
363 int result;
364 if (socket_.get()) {
365 result = socket_->Write(
366 write_buffer.get(),
367 len,
368 base::Bind(&PseudoTcpAdapter::Core::OnWritten, base::Unretained(this)));
369 } else {
370 result = net::ERR_CONNECTION_CLOSED;
372 if (result == net::ERR_IO_PENDING) {
373 socket_write_pending_ = true;
374 return IPseudoTcpNotify::WR_SUCCESS;
375 } else if (result == net::ERR_MSG_TOO_BIG) {
376 return IPseudoTcpNotify::WR_TOO_LARGE;
377 } else if (result < 0) {
378 return IPseudoTcpNotify::WR_FAIL;
379 } else {
380 return IPseudoTcpNotify::WR_SUCCESS;
384 void PseudoTcpAdapter::Core::DoReadFromSocket() {
385 if (!socket_read_buffer_.get())
386 socket_read_buffer_ = new net::IOBuffer(kReadBufferSize);
388 int result = 1;
389 while (socket_.get() && result > 0) {
390 result = socket_->Read(
391 socket_read_buffer_.get(),
392 kReadBufferSize,
393 base::Bind(&PseudoTcpAdapter::Core::OnRead, base::Unretained(this)));
394 if (result != net::ERR_IO_PENDING)
395 HandleReadResults(result);
399 void PseudoTcpAdapter::Core::HandleReadResults(int result) {
400 if (result <= 0) {
401 LOG(ERROR) << "Read returned " << result;
402 return;
405 // TODO(wez): Disconnect on failure of NotifyPacket?
406 pseudo_tcp_.NotifyPacket(socket_read_buffer_->data(), result);
407 AdjustClock();
409 CheckWriteComplete();
412 void PseudoTcpAdapter::Core::OnRead(int result) {
413 // Reference the Core in case a callback deletes the adapter.
414 scoped_refptr<Core> core(this);
416 HandleReadResults(result);
417 if (result >= 0)
418 DoReadFromSocket();
421 void PseudoTcpAdapter::Core::OnWritten(int result) {
422 // Reference the Core in case a callback deletes the adapter.
423 scoped_refptr<Core> core(this);
425 socket_write_pending_ = false;
426 if (result < 0) {
427 LOG(WARNING) << "Write failed. Error code: " << result;
431 void PseudoTcpAdapter::Core::AdjustClock() {
432 long timeout = 0;
433 if (pseudo_tcp_.GetNextClock(PseudoTcp::Now(), timeout)) {
434 timer_.Stop();
435 timer_.Start(FROM_HERE,
436 base::TimeDelta::FromMilliseconds(std::max(timeout, 0L)), this,
437 &PseudoTcpAdapter::Core::HandleTcpClock);
441 void PseudoTcpAdapter::Core::HandleTcpClock() {
442 // Reference the Core in case a callback deletes the adapter.
443 scoped_refptr<Core> core(this);
445 pseudo_tcp_.NotifyClock(PseudoTcp::Now());
446 AdjustClock();
448 CheckWriteComplete();
451 void PseudoTcpAdapter::Core::CheckWriteComplete() {
452 if (!write_callback_.is_null() && waiting_write_position_) {
453 if (pseudo_tcp_.GetBytesBufferedNotSent() == 0) {
454 waiting_write_position_ = false;
456 net::CompletionCallback callback = write_callback_;
457 write_callback_.Reset();
458 write_buffer_ = NULL;
459 callback.Run(last_write_result_);
464 // Public interface implemention.
466 PseudoTcpAdapter::PseudoTcpAdapter(net::Socket* socket)
467 : core_(new Core(socket)) {
470 PseudoTcpAdapter::~PseudoTcpAdapter() {
471 Disconnect();
473 // Make sure that the underlying socket is destroyed before PseudoTcp.
474 core_->DeleteSocket();
477 int PseudoTcpAdapter::Read(net::IOBuffer* buffer, int buffer_size,
478 const net::CompletionCallback& callback) {
479 DCHECK(CalledOnValidThread());
480 return core_->Read(buffer, buffer_size, callback);
483 int PseudoTcpAdapter::Write(net::IOBuffer* buffer, int buffer_size,
484 const net::CompletionCallback& callback) {
485 DCHECK(CalledOnValidThread());
486 return core_->Write(buffer, buffer_size, callback);
489 int PseudoTcpAdapter::SetReceiveBufferSize(int32 size) {
490 DCHECK(CalledOnValidThread());
492 core_->SetReceiveBufferSize(size);
493 return net::OK;
496 int PseudoTcpAdapter::SetSendBufferSize(int32 size) {
497 DCHECK(CalledOnValidThread());
499 core_->SetSendBufferSize(size);
500 return net::OK;
503 int PseudoTcpAdapter::Connect(const net::CompletionCallback& callback) {
504 DCHECK(CalledOnValidThread());
506 // net::StreamSocket requires that Connect return OK if already connected.
507 if (IsConnected())
508 return net::OK;
510 return core_->Connect(callback);
513 void PseudoTcpAdapter::Disconnect() {
514 DCHECK(CalledOnValidThread());
515 core_->Disconnect();
518 bool PseudoTcpAdapter::IsConnected() const {
519 return core_->IsConnected();
522 bool PseudoTcpAdapter::IsConnectedAndIdle() const {
523 DCHECK(CalledOnValidThread());
524 NOTIMPLEMENTED();
525 return false;
528 int PseudoTcpAdapter::GetPeerAddress(net::IPEndPoint* address) const {
529 DCHECK(CalledOnValidThread());
531 // We don't have a meaningful peer address, but we can't return an
532 // error, so we return a INADDR_ANY instead.
533 net::IPAddressNumber ip_address(net::kIPv4AddressSize);
534 *address = net::IPEndPoint(ip_address, 0);
535 return net::OK;
538 int PseudoTcpAdapter::GetLocalAddress(net::IPEndPoint* address) const {
539 DCHECK(CalledOnValidThread());
540 NOTIMPLEMENTED();
541 return net::ERR_FAILED;
544 const net::BoundNetLog& PseudoTcpAdapter::NetLog() const {
545 DCHECK(CalledOnValidThread());
546 return net_log_;
549 void PseudoTcpAdapter::SetSubresourceSpeculation() {
550 DCHECK(CalledOnValidThread());
551 NOTIMPLEMENTED();
554 void PseudoTcpAdapter::SetOmniboxSpeculation() {
555 DCHECK(CalledOnValidThread());
556 NOTIMPLEMENTED();
559 bool PseudoTcpAdapter::WasEverUsed() const {
560 DCHECK(CalledOnValidThread());
561 NOTIMPLEMENTED();
562 return true;
565 bool PseudoTcpAdapter::UsingTCPFastOpen() const {
566 DCHECK(CalledOnValidThread());
567 return false;
570 bool PseudoTcpAdapter::WasNpnNegotiated() const {
571 DCHECK(CalledOnValidThread());
572 return false;
575 net::NextProto PseudoTcpAdapter::GetNegotiatedProtocol() const {
576 DCHECK(CalledOnValidThread());
577 return net::kProtoUnknown;
580 bool PseudoTcpAdapter::GetSSLInfo(net::SSLInfo* ssl_info) {
581 DCHECK(CalledOnValidThread());
582 return false;
585 void PseudoTcpAdapter::SetAckDelay(int delay_ms) {
586 DCHECK(CalledOnValidThread());
587 core_->SetAckDelay(delay_ms);
590 void PseudoTcpAdapter::SetNoDelay(bool no_delay) {
591 DCHECK(CalledOnValidThread());
592 core_->SetNoDelay(no_delay);
595 void PseudoTcpAdapter::SetWriteWaitsForSend(bool write_waits_for_send) {
596 DCHECK(CalledOnValidThread());
597 core_->SetWriteWaitsForSend(write_waits_for_send);
600 } // namespace jingle_glue