Revert 271602 "Implementation of leveldb-backed PrefStore."
[chromium-blink-merge.git] / jingle / glue / pseudotcp_adapter.cc
blobcdb44a050e671c2c829d46b95fdabdd0768dd8ec
1 // Copyright (c) 2012 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
5 #include "jingle/glue/pseudotcp_adapter.h"
7 #include "base/compiler_specific.h"
8 #include "base/logging.h"
9 #include "base/time/time.h"
10 #include "net/base/address_list.h"
11 #include "net/base/completion_callback.h"
12 #include "net/base/io_buffer.h"
13 #include "net/base/net_errors.h"
14 #include "net/base/net_util.h"
16 using cricket::PseudoTcp;
18 namespace {
19 const int kReadBufferSize = 65536; // Maximum size of a packet.
20 const uint16 kDefaultMtu = 1280;
21 } // namespace
23 namespace jingle_glue {
25 class PseudoTcpAdapter::Core : public cricket::IPseudoTcpNotify,
26 public base::RefCounted<Core> {
27 public:
28 Core(net::Socket* socket);
30 // Functions used to implement net::StreamSocket.
31 int Read(net::IOBuffer* buffer, int buffer_size,
32 const net::CompletionCallback& callback);
33 int Write(net::IOBuffer* buffer, int buffer_size,
34 const net::CompletionCallback& callback);
35 int Connect(const net::CompletionCallback& callback);
36 void Disconnect();
37 bool IsConnected() const;
39 // cricket::IPseudoTcpNotify interface.
40 // These notifications are triggered from NotifyPacket.
41 virtual void OnTcpOpen(cricket::PseudoTcp* tcp) OVERRIDE;
42 virtual void OnTcpReadable(cricket::PseudoTcp* tcp) OVERRIDE;
43 virtual void OnTcpWriteable(cricket::PseudoTcp* tcp) OVERRIDE;
44 // This is triggered by NotifyClock or NotifyPacket.
45 virtual void OnTcpClosed(cricket::PseudoTcp* tcp, uint32 error) OVERRIDE;
46 // This is triggered by NotifyClock, NotifyPacket, Recv and Send.
47 virtual WriteResult TcpWritePacket(cricket::PseudoTcp* tcp,
48 const char* buffer, size_t len) OVERRIDE;
50 void SetAckDelay(int delay_ms);
51 void SetNoDelay(bool no_delay);
52 void SetReceiveBufferSize(int32 size);
53 void SetSendBufferSize(int32 size);
54 void SetWriteWaitsForSend(bool write_waits_for_send);
56 void DeleteSocket();
58 private:
59 friend class base::RefCounted<Core>;
60 virtual ~Core();
62 // These are invoked by the underlying Socket, and may trigger callbacks.
63 // They hold a reference to |this| while running, to protect from deletion.
64 void OnRead(int result);
65 void OnWritten(int result);
67 // These may trigger callbacks, so the holder must hold a reference on
68 // the stack while calling them.
69 void DoReadFromSocket();
70 void HandleReadResults(int result);
71 void HandleTcpClock();
73 // Checks if current write has completed in the write-waits-for-send
74 // mode.
75 void CheckWriteComplete();
77 // This re-sets |timer| without triggering callbacks.
78 void AdjustClock();
80 net::CompletionCallback connect_callback_;
81 net::CompletionCallback read_callback_;
82 net::CompletionCallback write_callback_;
84 cricket::PseudoTcp pseudo_tcp_;
85 scoped_ptr<net::Socket> socket_;
87 scoped_refptr<net::IOBuffer> read_buffer_;
88 int read_buffer_size_;
89 scoped_refptr<net::IOBuffer> write_buffer_;
90 int write_buffer_size_;
92 // Whether we need to wait for data to be sent before completing write.
93 bool write_waits_for_send_;
95 // Set to true in the write-waits-for-send mode when we've
96 // successfully writtend data to the send buffer and waiting for the
97 // data to be sent to the remote end.
98 bool waiting_write_position_;
100 // Number of the bytes written by the last write stored while we wait
101 // for the data to be sent (i.e. when waiting_write_position_ = true).
102 int last_write_result_;
104 bool socket_write_pending_;
105 scoped_refptr<net::IOBuffer> socket_read_buffer_;
107 base::OneShotTimer<Core> timer_;
109 DISALLOW_COPY_AND_ASSIGN(Core);
113 PseudoTcpAdapter::Core::Core(net::Socket* socket)
114 : pseudo_tcp_(this, 0),
115 socket_(socket),
116 write_waits_for_send_(false),
117 waiting_write_position_(false),
118 socket_write_pending_(false) {
119 // Doesn't trigger callbacks.
120 pseudo_tcp_.NotifyMTU(kDefaultMtu);
123 PseudoTcpAdapter::Core::~Core() {
126 int PseudoTcpAdapter::Core::Read(net::IOBuffer* buffer, int buffer_size,
127 const net::CompletionCallback& callback) {
128 DCHECK(read_callback_.is_null());
130 // Reference the Core in case a callback deletes the adapter.
131 scoped_refptr<Core> core(this);
133 int result = pseudo_tcp_.Recv(buffer->data(), buffer_size);
134 if (result < 0) {
135 result = net::MapSystemError(pseudo_tcp_.GetError());
136 DCHECK(result < 0);
139 if (result == net::ERR_IO_PENDING) {
140 read_buffer_ = buffer;
141 read_buffer_size_ = buffer_size;
142 read_callback_ = callback;
145 AdjustClock();
147 return result;
150 int PseudoTcpAdapter::Core::Write(net::IOBuffer* buffer, int buffer_size,
151 const net::CompletionCallback& callback) {
152 DCHECK(write_callback_.is_null());
154 // Reference the Core in case a callback deletes the adapter.
155 scoped_refptr<Core> core(this);
157 int result = pseudo_tcp_.Send(buffer->data(), buffer_size);
158 if (result < 0) {
159 result = net::MapSystemError(pseudo_tcp_.GetError());
160 DCHECK(result < 0);
163 AdjustClock();
165 if (result == net::ERR_IO_PENDING) {
166 write_buffer_ = buffer;
167 write_buffer_size_ = buffer_size;
168 write_callback_ = callback;
169 return result;
172 if (result < 0)
173 return result;
175 // Need to wait until the data is sent to the peer when
176 // send-confirmation mode is enabled.
177 if (write_waits_for_send_ && pseudo_tcp_.GetBytesBufferedNotSent() > 0) {
178 DCHECK(!waiting_write_position_);
179 waiting_write_position_ = true;
180 last_write_result_ = result;
181 write_buffer_ = buffer;
182 write_buffer_size_ = buffer_size;
183 write_callback_ = callback;
184 return net::ERR_IO_PENDING;
187 return result;
190 int PseudoTcpAdapter::Core::Connect(const net::CompletionCallback& callback) {
191 DCHECK_EQ(pseudo_tcp_.State(), cricket::PseudoTcp::TCP_LISTEN);
193 // Reference the Core in case a callback deletes the adapter.
194 scoped_refptr<Core> core(this);
196 // Start the connection attempt.
197 int result = pseudo_tcp_.Connect();
198 if (result < 0)
199 return net::ERR_FAILED;
201 AdjustClock();
203 connect_callback_ = callback;
204 DoReadFromSocket();
206 return net::ERR_IO_PENDING;
209 void PseudoTcpAdapter::Core::Disconnect() {
210 // Don't dispatch outstanding callbacks, as mandated by net::StreamSocket.
211 read_callback_.Reset();
212 read_buffer_ = NULL;
213 write_callback_.Reset();
214 write_buffer_ = NULL;
215 connect_callback_.Reset();
217 // TODO(wez): Connect should succeed if called after Disconnect, which
218 // PseudoTcp doesn't support, so we need to teardown the internal PseudoTcp
219 // and create a new one in Connect.
220 // TODO(wez): Close sets a shutdown flag inside PseudoTcp but has no other
221 // effect. This should be addressed in PseudoTcp, really.
222 // In the meantime we can fake OnTcpClosed notification and tear down the
223 // PseudoTcp.
224 pseudo_tcp_.Close(true);
227 bool PseudoTcpAdapter::Core::IsConnected() const {
228 return pseudo_tcp_.State() == PseudoTcp::TCP_ESTABLISHED;
231 void PseudoTcpAdapter::Core::OnTcpOpen(PseudoTcp* tcp) {
232 DCHECK(tcp == &pseudo_tcp_);
234 if (!connect_callback_.is_null()) {
235 net::CompletionCallback callback = connect_callback_;
236 connect_callback_.Reset();
237 callback.Run(net::OK);
240 OnTcpReadable(tcp);
241 OnTcpWriteable(tcp);
244 void PseudoTcpAdapter::Core::OnTcpReadable(PseudoTcp* tcp) {
245 DCHECK_EQ(tcp, &pseudo_tcp_);
246 if (read_callback_.is_null())
247 return;
249 int result = pseudo_tcp_.Recv(read_buffer_->data(), read_buffer_size_);
250 if (result < 0) {
251 result = net::MapSystemError(pseudo_tcp_.GetError());
252 DCHECK(result < 0);
253 if (result == net::ERR_IO_PENDING)
254 return;
257 AdjustClock();
259 net::CompletionCallback callback = read_callback_;
260 read_callback_.Reset();
261 read_buffer_ = NULL;
262 callback.Run(result);
265 void PseudoTcpAdapter::Core::OnTcpWriteable(PseudoTcp* tcp) {
266 DCHECK_EQ(tcp, &pseudo_tcp_);
267 if (write_callback_.is_null())
268 return;
270 if (waiting_write_position_) {
271 CheckWriteComplete();
272 return;
275 int result = pseudo_tcp_.Send(write_buffer_->data(), write_buffer_size_);
276 if (result < 0) {
277 result = net::MapSystemError(pseudo_tcp_.GetError());
278 DCHECK(result < 0);
279 if (result == net::ERR_IO_PENDING)
280 return;
283 AdjustClock();
285 if (write_waits_for_send_ && pseudo_tcp_.GetBytesBufferedNotSent() > 0) {
286 DCHECK(!waiting_write_position_);
287 waiting_write_position_ = true;
288 last_write_result_ = result;
289 return;
292 net::CompletionCallback callback = write_callback_;
293 write_callback_.Reset();
294 write_buffer_ = NULL;
295 callback.Run(result);
298 void PseudoTcpAdapter::Core::OnTcpClosed(PseudoTcp* tcp, uint32 error) {
299 DCHECK_EQ(tcp, &pseudo_tcp_);
301 if (!connect_callback_.is_null()) {
302 net::CompletionCallback callback = connect_callback_;
303 connect_callback_.Reset();
304 callback.Run(net::MapSystemError(error));
307 if (!read_callback_.is_null()) {
308 net::CompletionCallback callback = read_callback_;
309 read_callback_.Reset();
310 callback.Run(net::MapSystemError(error));
313 if (!write_callback_.is_null()) {
314 net::CompletionCallback callback = write_callback_;
315 write_callback_.Reset();
316 callback.Run(net::MapSystemError(error));
320 void PseudoTcpAdapter::Core::SetAckDelay(int delay_ms) {
321 pseudo_tcp_.SetOption(cricket::PseudoTcp::OPT_ACKDELAY, delay_ms);
324 void PseudoTcpAdapter::Core::SetNoDelay(bool no_delay) {
325 pseudo_tcp_.SetOption(cricket::PseudoTcp::OPT_NODELAY, no_delay ? 1 : 0);
328 void PseudoTcpAdapter::Core::SetReceiveBufferSize(int32 size) {
329 pseudo_tcp_.SetOption(cricket::PseudoTcp::OPT_RCVBUF, size);
332 void PseudoTcpAdapter::Core::SetSendBufferSize(int32 size) {
333 pseudo_tcp_.SetOption(cricket::PseudoTcp::OPT_SNDBUF, size);
336 void PseudoTcpAdapter::Core::SetWriteWaitsForSend(bool write_waits_for_send) {
337 write_waits_for_send_ = write_waits_for_send;
340 void PseudoTcpAdapter::Core::DeleteSocket() {
341 socket_.reset();
344 cricket::IPseudoTcpNotify::WriteResult PseudoTcpAdapter::Core::TcpWritePacket(
345 PseudoTcp* tcp,
346 const char* buffer,
347 size_t len) {
348 DCHECK_EQ(tcp, &pseudo_tcp_);
350 // If we already have a write pending, we behave like a congested network,
351 // returning success for the write, but dropping the packet. PseudoTcp will
352 // back-off and retransmit, adjusting for the perceived congestion.
353 if (socket_write_pending_)
354 return IPseudoTcpNotify::WR_SUCCESS;
356 scoped_refptr<net::IOBuffer> write_buffer = new net::IOBuffer(len);
357 memcpy(write_buffer->data(), buffer, len);
359 // Our underlying socket is datagram-oriented, which means it should either
360 // send exactly as many bytes as we requested, or fail.
361 int result;
362 if (socket_.get()) {
363 result = socket_->Write(
364 write_buffer.get(),
365 len,
366 base::Bind(&PseudoTcpAdapter::Core::OnWritten, base::Unretained(this)));
367 } else {
368 result = net::ERR_CONNECTION_CLOSED;
370 if (result == net::ERR_IO_PENDING) {
371 socket_write_pending_ = true;
372 return IPseudoTcpNotify::WR_SUCCESS;
373 } else if (result == net::ERR_MSG_TOO_BIG) {
374 return IPseudoTcpNotify::WR_TOO_LARGE;
375 } else if (result < 0) {
376 return IPseudoTcpNotify::WR_FAIL;
377 } else {
378 return IPseudoTcpNotify::WR_SUCCESS;
382 void PseudoTcpAdapter::Core::DoReadFromSocket() {
383 if (!socket_read_buffer_.get())
384 socket_read_buffer_ = new net::IOBuffer(kReadBufferSize);
386 int result = 1;
387 while (socket_.get() && result > 0) {
388 result = socket_->Read(
389 socket_read_buffer_.get(),
390 kReadBufferSize,
391 base::Bind(&PseudoTcpAdapter::Core::OnRead, base::Unretained(this)));
392 if (result != net::ERR_IO_PENDING)
393 HandleReadResults(result);
397 void PseudoTcpAdapter::Core::HandleReadResults(int result) {
398 if (result <= 0) {
399 LOG(ERROR) << "Read returned " << result;
400 return;
403 // TODO(wez): Disconnect on failure of NotifyPacket?
404 pseudo_tcp_.NotifyPacket(socket_read_buffer_->data(), result);
405 AdjustClock();
407 CheckWriteComplete();
410 void PseudoTcpAdapter::Core::OnRead(int result) {
411 // Reference the Core in case a callback deletes the adapter.
412 scoped_refptr<Core> core(this);
414 HandleReadResults(result);
415 if (result >= 0)
416 DoReadFromSocket();
419 void PseudoTcpAdapter::Core::OnWritten(int result) {
420 // Reference the Core in case a callback deletes the adapter.
421 scoped_refptr<Core> core(this);
423 socket_write_pending_ = false;
424 if (result < 0) {
425 LOG(WARNING) << "Write failed. Error code: " << result;
429 void PseudoTcpAdapter::Core::AdjustClock() {
430 long timeout = 0;
431 if (pseudo_tcp_.GetNextClock(PseudoTcp::Now(), timeout)) {
432 timer_.Stop();
433 timer_.Start(FROM_HERE,
434 base::TimeDelta::FromMilliseconds(std::max(timeout, 0L)), this,
435 &PseudoTcpAdapter::Core::HandleTcpClock);
439 void PseudoTcpAdapter::Core::HandleTcpClock() {
440 // Reference the Core in case a callback deletes the adapter.
441 scoped_refptr<Core> core(this);
443 pseudo_tcp_.NotifyClock(PseudoTcp::Now());
444 AdjustClock();
446 CheckWriteComplete();
449 void PseudoTcpAdapter::Core::CheckWriteComplete() {
450 if (!write_callback_.is_null() && waiting_write_position_) {
451 if (pseudo_tcp_.GetBytesBufferedNotSent() == 0) {
452 waiting_write_position_ = false;
454 net::CompletionCallback callback = write_callback_;
455 write_callback_.Reset();
456 write_buffer_ = NULL;
457 callback.Run(last_write_result_);
462 // Public interface implemention.
464 PseudoTcpAdapter::PseudoTcpAdapter(net::Socket* socket)
465 : core_(new Core(socket)) {
468 PseudoTcpAdapter::~PseudoTcpAdapter() {
469 Disconnect();
471 // Make sure that the underlying socket is destroyed before PseudoTcp.
472 core_->DeleteSocket();
475 int PseudoTcpAdapter::Read(net::IOBuffer* buffer, int buffer_size,
476 const net::CompletionCallback& callback) {
477 DCHECK(CalledOnValidThread());
478 return core_->Read(buffer, buffer_size, callback);
481 int PseudoTcpAdapter::Write(net::IOBuffer* buffer, int buffer_size,
482 const net::CompletionCallback& callback) {
483 DCHECK(CalledOnValidThread());
484 return core_->Write(buffer, buffer_size, callback);
487 int PseudoTcpAdapter::SetReceiveBufferSize(int32 size) {
488 DCHECK(CalledOnValidThread());
490 core_->SetReceiveBufferSize(size);
491 return net::OK;
494 int PseudoTcpAdapter::SetSendBufferSize(int32 size) {
495 DCHECK(CalledOnValidThread());
497 core_->SetSendBufferSize(size);
498 return net::OK;
501 int PseudoTcpAdapter::Connect(const net::CompletionCallback& callback) {
502 DCHECK(CalledOnValidThread());
504 // net::StreamSocket requires that Connect return OK if already connected.
505 if (IsConnected())
506 return net::OK;
508 return core_->Connect(callback);
511 void PseudoTcpAdapter::Disconnect() {
512 DCHECK(CalledOnValidThread());
513 core_->Disconnect();
516 bool PseudoTcpAdapter::IsConnected() const {
517 return core_->IsConnected();
520 bool PseudoTcpAdapter::IsConnectedAndIdle() const {
521 DCHECK(CalledOnValidThread());
522 NOTIMPLEMENTED();
523 return false;
526 int PseudoTcpAdapter::GetPeerAddress(net::IPEndPoint* address) const {
527 DCHECK(CalledOnValidThread());
529 // We don't have a meaningful peer address, but we can't return an
530 // error, so we return a INADDR_ANY instead.
531 net::IPAddressNumber ip_address(net::kIPv4AddressSize);
532 *address = net::IPEndPoint(ip_address, 0);
533 return net::OK;
536 int PseudoTcpAdapter::GetLocalAddress(net::IPEndPoint* address) const {
537 DCHECK(CalledOnValidThread());
538 NOTIMPLEMENTED();
539 return net::ERR_FAILED;
542 const net::BoundNetLog& PseudoTcpAdapter::NetLog() const {
543 DCHECK(CalledOnValidThread());
544 return net_log_;
547 void PseudoTcpAdapter::SetSubresourceSpeculation() {
548 DCHECK(CalledOnValidThread());
549 NOTIMPLEMENTED();
552 void PseudoTcpAdapter::SetOmniboxSpeculation() {
553 DCHECK(CalledOnValidThread());
554 NOTIMPLEMENTED();
557 bool PseudoTcpAdapter::WasEverUsed() const {
558 DCHECK(CalledOnValidThread());
559 NOTIMPLEMENTED();
560 return true;
563 bool PseudoTcpAdapter::UsingTCPFastOpen() const {
564 DCHECK(CalledOnValidThread());
565 return false;
568 bool PseudoTcpAdapter::WasNpnNegotiated() const {
569 DCHECK(CalledOnValidThread());
570 return false;
573 net::NextProto PseudoTcpAdapter::GetNegotiatedProtocol() const {
574 DCHECK(CalledOnValidThread());
575 return net::kProtoUnknown;
578 bool PseudoTcpAdapter::GetSSLInfo(net::SSLInfo* ssl_info) {
579 DCHECK(CalledOnValidThread());
580 return false;
583 void PseudoTcpAdapter::SetAckDelay(int delay_ms) {
584 DCHECK(CalledOnValidThread());
585 core_->SetAckDelay(delay_ms);
588 void PseudoTcpAdapter::SetNoDelay(bool no_delay) {
589 DCHECK(CalledOnValidThread());
590 core_->SetNoDelay(no_delay);
593 void PseudoTcpAdapter::SetWriteWaitsForSend(bool write_waits_for_send) {
594 DCHECK(CalledOnValidThread());
595 core_->SetWriteWaitsForSend(write_waits_for_send);
598 } // namespace jingle_glue