Add P2PDatagramSocket and P2PStreamSocket interfaces.
[chromium-blink-merge.git] / remoting / protocol / pseudotcp_adapter.cc
blob11f01fd552ec38ee42a237162893145e204b89fb
1 // Copyright 2015 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
5 #include "remoting/protocol/pseudotcp_adapter.h"
7 #include "base/compiler_specific.h"
8 #include "base/logging.h"
9 #include "base/time/time.h"
10 #include "base/timer/timer.h"
11 #include "net/base/address_list.h"
12 #include "net/base/completion_callback.h"
13 #include "net/base/io_buffer.h"
14 #include "net/base/net_errors.h"
15 #include "net/base/net_util.h"
16 #include "remoting/protocol/p2p_datagram_socket.h"
18 using cricket::PseudoTcp;
20 namespace {
21 const int kReadBufferSize = 65536; // Maximum size of a packet.
22 const uint16 kDefaultMtu = 1280;
23 } // namespace
25 namespace remoting {
26 namespace protocol {
28 class PseudoTcpAdapter::Core : public cricket::IPseudoTcpNotify,
29 public base::RefCounted<Core> {
30 public:
31 explicit Core(scoped_ptr<P2PDatagramSocket> socket);
33 // Functions used to implement net::StreamSocket.
34 int Read(const scoped_refptr<net::IOBuffer>& buffer, int buffer_size,
35 const net::CompletionCallback& callback);
36 int Write(const scoped_refptr<net::IOBuffer>& buffer, int buffer_size,
37 const net::CompletionCallback& callback);
38 int Connect(const net::CompletionCallback& callback);
40 // cricket::IPseudoTcpNotify interface.
41 // These notifications are triggered from NotifyPacket.
42 void OnTcpOpen(cricket::PseudoTcp* tcp) override;
43 void OnTcpReadable(cricket::PseudoTcp* tcp) override;
44 void OnTcpWriteable(cricket::PseudoTcp* tcp) override;
45 // This is triggered by NotifyClock or NotifyPacket.
46 void OnTcpClosed(cricket::PseudoTcp* tcp, uint32 error) override;
47 // This is triggered by NotifyClock, NotifyPacket, Recv and Send.
48 WriteResult TcpWritePacket(cricket::PseudoTcp* tcp,
49 const char* buffer,
50 size_t len) override;
52 void SetAckDelay(int delay_ms);
53 void SetNoDelay(bool no_delay);
54 void SetReceiveBufferSize(int32 size);
55 void SetSendBufferSize(int32 size);
56 void SetWriteWaitsForSend(bool write_waits_for_send);
58 void DeleteSocket();
60 private:
61 friend class base::RefCounted<Core>;
62 ~Core() override;
64 // These are invoked by the underlying Socket, and may trigger callbacks.
65 // They hold a reference to |this| while running, to protect from deletion.
66 void OnRead(int result);
67 void OnWritten(int result);
69 // These may trigger callbacks, so the holder must hold a reference on
70 // the stack while calling them.
71 void DoReadFromSocket();
72 void HandleReadResults(int result);
73 void HandleTcpClock();
75 // Checks if current write has completed in the write-waits-for-send
76 // mode.
77 void CheckWriteComplete();
79 // This re-sets |timer| without triggering callbacks.
80 void AdjustClock();
82 net::CompletionCallback connect_callback_;
83 net::CompletionCallback read_callback_;
84 net::CompletionCallback write_callback_;
86 cricket::PseudoTcp pseudo_tcp_;
87 scoped_ptr<P2PDatagramSocket> socket_;
89 scoped_refptr<net::IOBuffer> read_buffer_;
90 int read_buffer_size_;
91 scoped_refptr<net::IOBuffer> write_buffer_;
92 int write_buffer_size_;
94 // Whether we need to wait for data to be sent before completing write.
95 bool write_waits_for_send_;
97 // Set to true in the write-waits-for-send mode when we've
98 // successfully writtend data to the send buffer and waiting for the
99 // data to be sent to the remote end.
100 bool waiting_write_position_;
102 // Number of the bytes written by the last write stored while we wait
103 // for the data to be sent (i.e. when waiting_write_position_ = true).
104 int last_write_result_;
106 bool socket_write_pending_;
107 scoped_refptr<net::IOBuffer> socket_read_buffer_;
109 base::OneShotTimer<Core> timer_;
111 DISALLOW_COPY_AND_ASSIGN(Core);
115 PseudoTcpAdapter::Core::Core(scoped_ptr<P2PDatagramSocket> socket)
116 : pseudo_tcp_(this, 0),
117 socket_(socket.Pass()),
118 write_waits_for_send_(false),
119 waiting_write_position_(false),
120 socket_write_pending_(false) {
121 // Doesn't trigger callbacks.
122 pseudo_tcp_.NotifyMTU(kDefaultMtu);
125 PseudoTcpAdapter::Core::~Core() {
128 int PseudoTcpAdapter::Core::Read(const scoped_refptr<net::IOBuffer>& buffer,
129 int buffer_size,
130 const net::CompletionCallback& callback) {
131 DCHECK(read_callback_.is_null());
133 // Reference the Core in case a callback deletes the adapter.
134 scoped_refptr<Core> core(this);
136 int result = pseudo_tcp_.Recv(buffer->data(), buffer_size);
137 if (result < 0) {
138 result = net::MapSystemError(pseudo_tcp_.GetError());
139 DCHECK(result < 0);
142 if (result == net::ERR_IO_PENDING) {
143 read_buffer_ = buffer;
144 read_buffer_size_ = buffer_size;
145 read_callback_ = callback;
148 AdjustClock();
150 return result;
153 int PseudoTcpAdapter::Core::Write(const scoped_refptr<net::IOBuffer>& buffer,
154 int buffer_size,
155 const net::CompletionCallback& callback) {
156 DCHECK(write_callback_.is_null());
158 // Reference the Core in case a callback deletes the adapter.
159 scoped_refptr<Core> core(this);
161 int result = pseudo_tcp_.Send(buffer->data(), buffer_size);
162 if (result < 0) {
163 result = net::MapSystemError(pseudo_tcp_.GetError());
164 DCHECK(result < 0);
167 AdjustClock();
169 if (result == net::ERR_IO_PENDING) {
170 write_buffer_ = buffer;
171 write_buffer_size_ = buffer_size;
172 write_callback_ = callback;
173 return result;
176 if (result < 0)
177 return result;
179 // Need to wait until the data is sent to the peer when
180 // send-confirmation mode is enabled.
181 if (write_waits_for_send_ && pseudo_tcp_.GetBytesBufferedNotSent() > 0) {
182 DCHECK(!waiting_write_position_);
183 waiting_write_position_ = true;
184 last_write_result_ = result;
185 write_buffer_ = buffer;
186 write_buffer_size_ = buffer_size;
187 write_callback_ = callback;
188 return net::ERR_IO_PENDING;
191 return result;
194 int PseudoTcpAdapter::Core::Connect(const net::CompletionCallback& callback) {
195 DCHECK_EQ(pseudo_tcp_.State(), cricket::PseudoTcp::TCP_LISTEN);
197 // Reference the Core in case a callback deletes the adapter.
198 scoped_refptr<Core> core(this);
200 // Start the connection attempt.
201 int result = pseudo_tcp_.Connect();
202 if (result < 0)
203 return net::ERR_FAILED;
205 AdjustClock();
207 connect_callback_ = callback;
208 DoReadFromSocket();
210 return net::ERR_IO_PENDING;
213 void PseudoTcpAdapter::Core::OnTcpOpen(PseudoTcp* tcp) {
214 DCHECK(tcp == &pseudo_tcp_);
216 if (!connect_callback_.is_null()) {
217 net::CompletionCallback callback = connect_callback_;
218 connect_callback_.Reset();
219 callback.Run(net::OK);
222 OnTcpReadable(tcp);
223 OnTcpWriteable(tcp);
226 void PseudoTcpAdapter::Core::OnTcpReadable(PseudoTcp* tcp) {
227 DCHECK_EQ(tcp, &pseudo_tcp_);
228 if (read_callback_.is_null())
229 return;
231 int result = pseudo_tcp_.Recv(read_buffer_->data(), read_buffer_size_);
232 if (result < 0) {
233 result = net::MapSystemError(pseudo_tcp_.GetError());
234 DCHECK(result < 0);
235 if (result == net::ERR_IO_PENDING)
236 return;
239 AdjustClock();
241 net::CompletionCallback callback = read_callback_;
242 read_callback_.Reset();
243 read_buffer_ = NULL;
244 callback.Run(result);
247 void PseudoTcpAdapter::Core::OnTcpWriteable(PseudoTcp* tcp) {
248 DCHECK_EQ(tcp, &pseudo_tcp_);
249 if (write_callback_.is_null())
250 return;
252 if (waiting_write_position_) {
253 CheckWriteComplete();
254 return;
257 int result = pseudo_tcp_.Send(write_buffer_->data(), write_buffer_size_);
258 if (result < 0) {
259 result = net::MapSystemError(pseudo_tcp_.GetError());
260 DCHECK(result < 0);
261 if (result == net::ERR_IO_PENDING)
262 return;
265 AdjustClock();
267 if (write_waits_for_send_ && pseudo_tcp_.GetBytesBufferedNotSent() > 0) {
268 DCHECK(!waiting_write_position_);
269 waiting_write_position_ = true;
270 last_write_result_ = result;
271 return;
274 net::CompletionCallback callback = write_callback_;
275 write_callback_.Reset();
276 write_buffer_ = NULL;
277 callback.Run(result);
280 void PseudoTcpAdapter::Core::OnTcpClosed(PseudoTcp* tcp, uint32 error) {
281 DCHECK_EQ(tcp, &pseudo_tcp_);
283 if (!connect_callback_.is_null()) {
284 net::CompletionCallback callback = connect_callback_;
285 connect_callback_.Reset();
286 callback.Run(net::MapSystemError(error));
289 if (!read_callback_.is_null()) {
290 net::CompletionCallback callback = read_callback_;
291 read_callback_.Reset();
292 callback.Run(net::MapSystemError(error));
295 if (!write_callback_.is_null()) {
296 net::CompletionCallback callback = write_callback_;
297 write_callback_.Reset();
298 callback.Run(net::MapSystemError(error));
302 void PseudoTcpAdapter::Core::SetAckDelay(int delay_ms) {
303 pseudo_tcp_.SetOption(cricket::PseudoTcp::OPT_ACKDELAY, delay_ms);
306 void PseudoTcpAdapter::Core::SetNoDelay(bool no_delay) {
307 pseudo_tcp_.SetOption(cricket::PseudoTcp::OPT_NODELAY, no_delay ? 1 : 0);
310 void PseudoTcpAdapter::Core::SetReceiveBufferSize(int32 size) {
311 pseudo_tcp_.SetOption(cricket::PseudoTcp::OPT_RCVBUF, size);
314 void PseudoTcpAdapter::Core::SetSendBufferSize(int32 size) {
315 pseudo_tcp_.SetOption(cricket::PseudoTcp::OPT_SNDBUF, size);
318 void PseudoTcpAdapter::Core::SetWriteWaitsForSend(bool write_waits_for_send) {
319 write_waits_for_send_ = write_waits_for_send;
322 void PseudoTcpAdapter::Core::DeleteSocket() {
323 // Don't dispatch outstanding callbacks when the socket is deleted.
324 read_callback_.Reset();
325 read_buffer_ = NULL;
326 write_callback_.Reset();
327 write_buffer_ = NULL;
328 connect_callback_.Reset();
330 socket_.reset();
333 cricket::IPseudoTcpNotify::WriteResult PseudoTcpAdapter::Core::TcpWritePacket(
334 PseudoTcp* tcp,
335 const char* buffer,
336 size_t len) {
337 DCHECK_EQ(tcp, &pseudo_tcp_);
339 // If we already have a write pending, we behave like a congested network,
340 // returning success for the write, but dropping the packet. PseudoTcp will
341 // back-off and retransmit, adjusting for the perceived congestion.
342 if (socket_write_pending_)
343 return IPseudoTcpNotify::WR_SUCCESS;
345 scoped_refptr<net::IOBuffer> write_buffer = new net::IOBuffer(len);
346 memcpy(write_buffer->data(), buffer, len);
348 // Our underlying socket is datagram-oriented, which means it should either
349 // send exactly as many bytes as we requested, or fail.
350 int result;
351 if (socket_) {
352 result = socket_->Send(
353 write_buffer.get(), len,
354 base::Bind(&PseudoTcpAdapter::Core::OnWritten, base::Unretained(this)));
355 } else {
356 result = net::ERR_CONNECTION_CLOSED;
358 if (result == net::ERR_IO_PENDING) {
359 socket_write_pending_ = true;
360 return IPseudoTcpNotify::WR_SUCCESS;
361 } else if (result == net::ERR_MSG_TOO_BIG) {
362 return IPseudoTcpNotify::WR_TOO_LARGE;
363 } else if (result < 0) {
364 return IPseudoTcpNotify::WR_FAIL;
365 } else {
366 return IPseudoTcpNotify::WR_SUCCESS;
370 void PseudoTcpAdapter::Core::DoReadFromSocket() {
371 if (!socket_read_buffer_.get())
372 socket_read_buffer_ = new net::IOBuffer(kReadBufferSize);
374 int result = 1;
375 while (socket_ && result > 0) {
376 result = socket_->Recv(
377 socket_read_buffer_.get(), kReadBufferSize,
378 base::Bind(&PseudoTcpAdapter::Core::OnRead, base::Unretained(this)));
379 if (result != net::ERR_IO_PENDING)
380 HandleReadResults(result);
384 void PseudoTcpAdapter::Core::HandleReadResults(int result) {
385 if (result <= 0) {
386 LOG(ERROR) << "Read returned " << result;
387 return;
390 // TODO(wez): Disconnect on failure of NotifyPacket?
391 pseudo_tcp_.NotifyPacket(socket_read_buffer_->data(), result);
392 AdjustClock();
394 CheckWriteComplete();
397 void PseudoTcpAdapter::Core::OnRead(int result) {
398 // Reference the Core in case a callback deletes the adapter.
399 scoped_refptr<Core> core(this);
401 HandleReadResults(result);
402 if (result >= 0)
403 DoReadFromSocket();
406 void PseudoTcpAdapter::Core::OnWritten(int result) {
407 // Reference the Core in case a callback deletes the adapter.
408 scoped_refptr<Core> core(this);
410 socket_write_pending_ = false;
411 if (result < 0) {
412 LOG(WARNING) << "Write failed. Error code: " << result;
416 void PseudoTcpAdapter::Core::AdjustClock() {
417 long timeout = 0;
418 if (pseudo_tcp_.GetNextClock(PseudoTcp::Now(), timeout)) {
419 timer_.Stop();
420 timer_.Start(FROM_HERE,
421 base::TimeDelta::FromMilliseconds(std::max(timeout, 0L)), this,
422 &PseudoTcpAdapter::Core::HandleTcpClock);
426 void PseudoTcpAdapter::Core::HandleTcpClock() {
427 // Reference the Core in case a callback deletes the adapter.
428 scoped_refptr<Core> core(this);
430 pseudo_tcp_.NotifyClock(PseudoTcp::Now());
431 AdjustClock();
433 CheckWriteComplete();
436 void PseudoTcpAdapter::Core::CheckWriteComplete() {
437 if (!write_callback_.is_null() && waiting_write_position_) {
438 if (pseudo_tcp_.GetBytesBufferedNotSent() == 0) {
439 waiting_write_position_ = false;
441 net::CompletionCallback callback = write_callback_;
442 write_callback_.Reset();
443 write_buffer_ = NULL;
444 callback.Run(last_write_result_);
449 // Public interface implementation.
451 PseudoTcpAdapter::PseudoTcpAdapter(scoped_ptr<P2PDatagramSocket> socket)
452 : core_(new Core(socket.Pass())) {
455 PseudoTcpAdapter::~PseudoTcpAdapter() {
456 // Make sure that the underlying socket is destroyed before PseudoTcp.
457 core_->DeleteSocket();
460 int PseudoTcpAdapter::Read(const scoped_refptr<net::IOBuffer>& buffer,
461 int buffer_size,
462 const net::CompletionCallback& callback) {
463 DCHECK(CalledOnValidThread());
464 return core_->Read(buffer, buffer_size, callback);
467 int PseudoTcpAdapter::Write(const scoped_refptr<net::IOBuffer>& buffer,
468 int buffer_size,
469 const net::CompletionCallback& callback) {
470 DCHECK(CalledOnValidThread());
471 return core_->Write(buffer, buffer_size, callback);
474 int PseudoTcpAdapter::SetReceiveBufferSize(int32 size) {
475 DCHECK(CalledOnValidThread());
476 core_->SetReceiveBufferSize(size);
477 return net::OK;
480 int PseudoTcpAdapter::SetSendBufferSize(int32 size) {
481 DCHECK(CalledOnValidThread());
482 core_->SetSendBufferSize(size);
483 return net::OK;
486 int PseudoTcpAdapter::Connect(const net::CompletionCallback& callback) {
487 DCHECK(CalledOnValidThread());
488 return core_->Connect(callback);
491 void PseudoTcpAdapter::SetAckDelay(int delay_ms) {
492 DCHECK(CalledOnValidThread());
493 core_->SetAckDelay(delay_ms);
496 void PseudoTcpAdapter::SetNoDelay(bool no_delay) {
497 DCHECK(CalledOnValidThread());
498 core_->SetNoDelay(no_delay);
501 void PseudoTcpAdapter::SetWriteWaitsForSend(bool write_waits_for_send) {
502 DCHECK(CalledOnValidThread());
503 core_->SetWriteWaitsForSend(write_waits_for_send);
506 } // namespace protocol
507 } // namespace remoting