1 // Copyright (c) 2012 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
5 #include "content/renderer/p2p/ipc_socket_factory.h"
10 #include "base/compiler_specific.h"
11 #include "base/debug/trace_event.h"
12 #include "base/message_loop/message_loop.h"
13 #include "base/message_loop/message_loop_proxy.h"
14 #include "base/strings/stringprintf.h"
15 #include "base/threading/non_thread_safe.h"
16 #include "content/renderer/media/webrtc_logging.h"
17 #include "content/renderer/p2p/host_address_request.h"
18 #include "content/renderer/p2p/socket_client_delegate.h"
19 #include "content/renderer/p2p/socket_client_impl.h"
20 #include "content/renderer/p2p/socket_dispatcher.h"
21 #include "jingle/glue/utils.h"
22 #include "third_party/libjingle/source/talk/base/asyncpacketsocket.h"
28 const int kDefaultNonSetOptionValue
= -1;
30 bool IsTcpClientSocket(P2PSocketType type
) {
31 return (type
== P2P_SOCKET_STUN_TCP_CLIENT
) ||
32 (type
== P2P_SOCKET_TCP_CLIENT
) ||
33 (type
== P2P_SOCKET_STUN_SSLTCP_CLIENT
) ||
34 (type
== P2P_SOCKET_SSLTCP_CLIENT
) ||
35 (type
== P2P_SOCKET_TLS_CLIENT
) ||
36 (type
== P2P_SOCKET_STUN_TLS_CLIENT
);
39 bool JingleSocketOptionToP2PSocketOption(talk_base::Socket::Option option
,
40 P2PSocketOption
* ipc_option
) {
42 case talk_base::Socket::OPT_RCVBUF
:
43 *ipc_option
= P2P_SOCKET_OPT_RCVBUF
;
45 case talk_base::Socket::OPT_SNDBUF
:
46 *ipc_option
= P2P_SOCKET_OPT_SNDBUF
;
48 case talk_base::Socket::OPT_DSCP
:
49 *ipc_option
= P2P_SOCKET_OPT_DSCP
;
51 case talk_base::Socket::OPT_DONTFRAGMENT
:
52 case talk_base::Socket::OPT_NODELAY
:
53 case talk_base::Socket::OPT_IPV6_V6ONLY
:
54 case talk_base::Socket::OPT_RTP_SENDTIME_EXTN_ID
:
55 return false; // Not supported by the chrome sockets.
63 // TODO(miu): This needs tuning. http://crbug.com/237960
64 const size_t kMaximumInFlightBytes
= 64 * 1024; // 64 KB
66 // IpcPacketSocket implements talk_base::AsyncPacketSocket interface
67 // using P2PSocketClient that works over IPC-channel. It must be used
68 // on the thread it was created.
69 class IpcPacketSocket
: public talk_base::AsyncPacketSocket
,
70 public P2PSocketClientDelegate
{
73 virtual ~IpcPacketSocket();
75 // Always takes ownership of client even if initialization fails.
76 bool Init(P2PSocketType type
, P2PSocketClientImpl
* client
,
77 const talk_base::SocketAddress
& local_address
,
78 const talk_base::SocketAddress
& remote_address
);
80 // talk_base::AsyncPacketSocket interface.
81 virtual talk_base::SocketAddress
GetLocalAddress() const OVERRIDE
;
82 virtual talk_base::SocketAddress
GetRemoteAddress() const OVERRIDE
;
83 virtual int Send(const void *pv
, size_t cb
,
84 const talk_base::PacketOptions
& options
) OVERRIDE
;
85 virtual int SendTo(const void *pv
, size_t cb
,
86 const talk_base::SocketAddress
& addr
,
87 const talk_base::PacketOptions
& options
) OVERRIDE
;
88 virtual int Close() OVERRIDE
;
89 virtual State
GetState() const OVERRIDE
;
90 virtual int GetOption(talk_base::Socket::Option option
, int* value
) OVERRIDE
;
91 virtual int SetOption(talk_base::Socket::Option option
, int value
) OVERRIDE
;
92 virtual int GetError() const OVERRIDE
;
93 virtual void SetError(int error
) OVERRIDE
;
95 // P2PSocketClientDelegate implementation.
96 virtual void OnOpen(const net::IPEndPoint
& address
) OVERRIDE
;
97 virtual void OnIncomingTcpConnection(
98 const net::IPEndPoint
& address
,
99 P2PSocketClient
* client
) OVERRIDE
;
100 virtual void OnSendComplete() OVERRIDE
;
101 virtual void OnError() OVERRIDE
;
102 virtual void OnDataReceived(const net::IPEndPoint
& address
,
103 const std::vector
<char>& data
,
104 const base::TimeTicks
& timestamp
) OVERRIDE
;
115 // Update trace of send throttling internal state. This should be called
116 // immediately after any changes to |send_bytes_available_| and/or
117 // |in_flight_packet_sizes_|.
118 void TraceSendThrottlingState() const;
120 void InitAcceptedTcp(P2PSocketClient
* client
,
121 const talk_base::SocketAddress
& local_address
,
122 const talk_base::SocketAddress
& remote_address
);
124 int DoSetOption(P2PSocketOption option
, int value
);
128 // Message loop on which this socket was created and being used.
129 base::MessageLoop
* message_loop_
;
131 // Corresponding P2P socket client.
132 scoped_refptr
<P2PSocketClient
> client_
;
134 // Local address is allocated by the browser process, and the
135 // renderer side doesn't know the address until it receives OnOpen()
136 // event from the browser.
137 talk_base::SocketAddress local_address_
;
139 // Remote address for client TCP connections.
140 talk_base::SocketAddress remote_address_
;
142 // Current state of the object.
143 InternalState state_
;
145 // Track the number of bytes allowed to be sent non-blocking. This is used to
146 // throttle the sending of packets to the browser process. For each packet
147 // sent, the value is decreased. As callbacks to OnSendComplete() (as IPCs
148 // from the browser process) are made, the value is increased back. This
149 // allows short bursts of high-rate sending without dropping packets, but
150 // quickly restricts the client to a sustainable steady-state rate.
151 size_t send_bytes_available_
;
152 std::deque
<size_t> in_flight_packet_sizes_
;
154 // Set to true once EWOULDBLOCK was returned from Send(). Indicates that the
155 // caller expects SignalWritable notification.
156 bool writable_signal_expected_
;
158 // Current error code. Valid when state_ == IS_ERROR.
160 int options_
[P2P_SOCKET_OPT_MAX
];
162 DISALLOW_COPY_AND_ASSIGN(IpcPacketSocket
);
165 // Simple wrapper around P2PAsyncAddressResolver. The main purpose of this
166 // class is to send SignalDone, after OnDone callback from
167 // P2PAsyncAddressResolver. Libjingle sig slots are not thread safe. In case
168 // of MT sig slots clients must call disconnect. This class is to make sure
169 // we destruct from the same thread on which is created.
170 class AsyncAddressResolverImpl
: public base::NonThreadSafe
,
171 public talk_base::AsyncResolverInterface
{
173 AsyncAddressResolverImpl(P2PSocketDispatcher
* dispatcher
);
174 virtual ~AsyncAddressResolverImpl();
176 // talk_base::AsyncResolverInterface interface.
177 virtual void Start(const talk_base::SocketAddress
& addr
) OVERRIDE
;
178 virtual bool GetResolvedAddress(
179 int family
, talk_base::SocketAddress
* addr
) const OVERRIDE
;
180 virtual int GetError() const OVERRIDE
;
181 virtual void Destroy(bool wait
) OVERRIDE
;
184 virtual void OnAddressResolved(const net::IPAddressList
& addresses
);
186 scoped_refptr
<P2PAsyncAddressResolver
> resolver_
;
187 int port_
; // Port number in |addr| from Start() method.
188 std::vector
<talk_base::IPAddress
> addresses_
; // Resolved addresses.
191 IpcPacketSocket::IpcPacketSocket()
192 : type_(P2P_SOCKET_UDP
),
193 message_loop_(base::MessageLoop::current()),
194 state_(IS_UNINITIALIZED
),
195 send_bytes_available_(kMaximumInFlightBytes
),
196 writable_signal_expected_(false),
198 COMPILE_ASSERT(kMaximumInFlightBytes
> 0, would_send_at_zero_rate
);
199 std::fill_n(options_
, static_cast<int> (P2P_SOCKET_OPT_MAX
),
200 kDefaultNonSetOptionValue
);
203 IpcPacketSocket::~IpcPacketSocket() {
204 if (state_
== IS_OPENING
|| state_
== IS_OPEN
||
205 state_
== IS_ERROR
) {
210 void IpcPacketSocket::TraceSendThrottlingState() const {
211 TRACE_COUNTER_ID1("p2p", "P2PSendBytesAvailable", local_address_
.port(),
212 send_bytes_available_
);
213 TRACE_COUNTER_ID1("p2p", "P2PSendPacketsInFlight", local_address_
.port(),
214 in_flight_packet_sizes_
.size());
217 bool IpcPacketSocket::Init(P2PSocketType type
,
218 P2PSocketClientImpl
* client
,
219 const talk_base::SocketAddress
& local_address
,
220 const talk_base::SocketAddress
& remote_address
) {
221 DCHECK_EQ(base::MessageLoop::current(), message_loop_
);
222 DCHECK_EQ(state_
, IS_UNINITIALIZED
);
226 local_address_
= local_address
;
227 remote_address_
= remote_address
;
230 net::IPEndPoint local_endpoint
;
231 if (!jingle_glue::SocketAddressToIPEndPoint(
232 local_address
, &local_endpoint
)) {
236 net::IPEndPoint remote_endpoint
;
237 if (!remote_address
.IsNil() &&
238 !jingle_glue::SocketAddressToIPEndPoint(
239 remote_address
, &remote_endpoint
)) {
243 // We need to send both resolved and unresolved address in Init. Unresolved
244 // address will be used in case of TLS for certificate hostname matching.
245 // Certificate will be tied to domain name not to IP address.
246 P2PHostAndIPEndPoint
remote_info(remote_address
.hostname(), remote_endpoint
);
248 client
->Init(type
, local_endpoint
, remote_info
, this);
253 void IpcPacketSocket::InitAcceptedTcp(
254 P2PSocketClient
* client
,
255 const talk_base::SocketAddress
& local_address
,
256 const talk_base::SocketAddress
& remote_address
) {
257 DCHECK_EQ(base::MessageLoop::current(), message_loop_
);
258 DCHECK_EQ(state_
, IS_UNINITIALIZED
);
261 local_address_
= local_address
;
262 remote_address_
= remote_address
;
264 TraceSendThrottlingState();
265 client_
->SetDelegate(this);
268 // talk_base::AsyncPacketSocket interface.
269 talk_base::SocketAddress
IpcPacketSocket::GetLocalAddress() const {
270 DCHECK_EQ(base::MessageLoop::current(), message_loop_
);
271 return local_address_
;
274 talk_base::SocketAddress
IpcPacketSocket::GetRemoteAddress() const {
275 DCHECK_EQ(base::MessageLoop::current(), message_loop_
);
276 return remote_address_
;
279 int IpcPacketSocket::Send(const void *data
, size_t data_size
,
280 const talk_base::PacketOptions
& options
) {
281 DCHECK_EQ(base::MessageLoop::current(), message_loop_
);
282 return SendTo(data
, data_size
, remote_address_
, options
);
285 int IpcPacketSocket::SendTo(const void *data
, size_t data_size
,
286 const talk_base::SocketAddress
& address
,
287 const talk_base::PacketOptions
& options
) {
288 DCHECK_EQ(base::MessageLoop::current(), message_loop_
);
291 case IS_UNINITIALIZED
:
301 // Continue sending the packet.
305 if (data_size
== 0) {
310 if (data_size
> send_bytes_available_
) {
311 TRACE_EVENT_INSTANT1("p2p", "MaxPendingBytesWouldBlock",
312 TRACE_EVENT_SCOPE_THREAD
,
314 client_
->GetSocketID());
315 if (!writable_signal_expected_
) {
316 WebRtcLogMessage(base::StringPrintf(
317 "IpcPacketSocket: sending is blocked. %d packets_in_flight.",
318 static_cast<int>(in_flight_packet_sizes_
.size())));
320 writable_signal_expected_
= true;
323 error_
= EWOULDBLOCK
;
327 net::IPEndPoint address_chrome
;
328 if (!jingle_glue::SocketAddressToIPEndPoint(address
, &address_chrome
)) {
334 send_bytes_available_
-= data_size
;
335 in_flight_packet_sizes_
.push_back(data_size
);
336 TraceSendThrottlingState();
338 const char* data_char
= reinterpret_cast<const char*>(data
);
339 std::vector
<char> data_vector(data_char
, data_char
+ data_size
);
340 client_
->SendWithDscp(address_chrome
, data_vector
, options
);
342 // Fake successful send. The caller ignores result anyway.
346 int IpcPacketSocket::Close() {
347 DCHECK_EQ(base::MessageLoop::current(), message_loop_
);
355 talk_base::AsyncPacketSocket::State
IpcPacketSocket::GetState() const {
356 DCHECK_EQ(base::MessageLoop::current(), message_loop_
);
359 case IS_UNINITIALIZED
:
364 return STATE_BINDING
;
367 if (IsTcpClientSocket(type_
)) {
368 return STATE_CONNECTED
;
382 int IpcPacketSocket::GetOption(talk_base::Socket::Option option
, int* value
) {
383 P2PSocketOption p2p_socket_option
= P2P_SOCKET_OPT_MAX
;
384 if (!JingleSocketOptionToP2PSocketOption(option
, &p2p_socket_option
)) {
385 // unsupported option.
389 *value
= options_
[p2p_socket_option
];
393 int IpcPacketSocket::SetOption(talk_base::Socket::Option option
, int value
) {
394 DCHECK_EQ(base::MessageLoop::current(), message_loop_
);
396 P2PSocketOption p2p_socket_option
= P2P_SOCKET_OPT_MAX
;
397 if (!JingleSocketOptionToP2PSocketOption(option
, &p2p_socket_option
)) {
398 // Option is not supported.
402 options_
[p2p_socket_option
] = value
;
404 if (state_
== IS_OPEN
) {
405 // Options will be applied when state becomes IS_OPEN in OnOpen.
406 return DoSetOption(p2p_socket_option
, value
);
411 int IpcPacketSocket::DoSetOption(P2PSocketOption option
, int value
) {
412 DCHECK_EQ(base::MessageLoop::current(), message_loop_
);
413 DCHECK_EQ(state_
, IS_OPEN
);
415 client_
->SetOption(option
, value
);
419 int IpcPacketSocket::GetError() const {
420 DCHECK_EQ(base::MessageLoop::current(), message_loop_
);
424 void IpcPacketSocket::SetError(int error
) {
425 DCHECK_EQ(base::MessageLoop::current(), message_loop_
);
429 void IpcPacketSocket::OnOpen(const net::IPEndPoint
& address
) {
430 DCHECK_EQ(base::MessageLoop::current(), message_loop_
);
432 if (!jingle_glue::IPEndPointToSocketAddress(address
, &local_address_
)) {
433 // Always expect correct IPv4 address to be allocated.
440 TraceSendThrottlingState();
442 // Set all pending options if any.
443 for (int i
= 0; i
< P2P_SOCKET_OPT_MAX
; ++i
) {
444 if (options_
[i
] != kDefaultNonSetOptionValue
)
445 DoSetOption(static_cast<P2PSocketOption
> (i
), options_
[i
]);
448 SignalAddressReady(this, local_address_
);
449 if (IsTcpClientSocket(type_
))
453 void IpcPacketSocket::OnIncomingTcpConnection(
454 const net::IPEndPoint
& address
,
455 P2PSocketClient
* client
) {
456 DCHECK_EQ(base::MessageLoop::current(), message_loop_
);
458 scoped_ptr
<IpcPacketSocket
> socket(new IpcPacketSocket());
460 talk_base::SocketAddress remote_address
;
461 if (!jingle_glue::IPEndPointToSocketAddress(address
, &remote_address
)) {
462 // Always expect correct IPv4 address to be allocated.
465 socket
->InitAcceptedTcp(client
, local_address_
, remote_address
);
466 SignalNewConnection(this, socket
.release());
469 void IpcPacketSocket::OnSendComplete() {
470 DCHECK_EQ(base::MessageLoop::current(), message_loop_
);
472 CHECK(!in_flight_packet_sizes_
.empty());
473 send_bytes_available_
+= in_flight_packet_sizes_
.front();
475 DCHECK_LE(send_bytes_available_
, kMaximumInFlightBytes
);
477 in_flight_packet_sizes_
.pop_front();
478 TraceSendThrottlingState();
480 if (writable_signal_expected_
&& send_bytes_available_
> 0) {
481 WebRtcLogMessage(base::StringPrintf(
482 "IpcPacketSocket: sending is unblocked. %d packets in flight.",
483 static_cast<int>(in_flight_packet_sizes_
.size())));
485 SignalReadyToSend(this);
486 writable_signal_expected_
= false;
490 void IpcPacketSocket::OnError() {
491 DCHECK_EQ(base::MessageLoop::current(), message_loop_
);
492 bool was_closed
= (state_
== IS_ERROR
|| state_
== IS_CLOSED
);
494 error_
= ECONNABORTED
;
496 SignalClose(this, 0);
500 void IpcPacketSocket::OnDataReceived(const net::IPEndPoint
& address
,
501 const std::vector
<char>& data
,
502 const base::TimeTicks
& timestamp
) {
503 DCHECK_EQ(base::MessageLoop::current(), message_loop_
);
505 talk_base::SocketAddress address_lj
;
506 if (!jingle_glue::IPEndPointToSocketAddress(address
, &address_lj
)) {
507 // We should always be able to convert address here because we
508 // don't expect IPv6 address on IPv4 connections.
513 talk_base::PacketTime
packet_time(timestamp
.ToInternalValue(), 0);
514 SignalReadPacket(this, &data
[0], data
.size(), address_lj
,
518 AsyncAddressResolverImpl::AsyncAddressResolverImpl(
519 P2PSocketDispatcher
* dispatcher
)
520 : resolver_(new P2PAsyncAddressResolver(dispatcher
)) {
523 AsyncAddressResolverImpl::~AsyncAddressResolverImpl() {
526 void AsyncAddressResolverImpl::Start(const talk_base::SocketAddress
& addr
) {
527 DCHECK(CalledOnValidThread());
528 // Copy port number from |addr|. |port_| must be copied
529 // when resolved address is returned in GetResolvedAddress.
532 resolver_
->Start(addr
, base::Bind(
533 &AsyncAddressResolverImpl::OnAddressResolved
,
534 base::Unretained(this)));
537 bool AsyncAddressResolverImpl::GetResolvedAddress(
538 int family
, talk_base::SocketAddress
* addr
) const {
539 DCHECK(CalledOnValidThread());
541 if (addresses_
.empty())
544 for (size_t i
= 0; i
< addresses_
.size(); ++i
) {
545 if (family
== addresses_
[i
].family()) {
546 addr
->SetResolvedIP(addresses_
[i
]);
547 addr
->SetPort(port_
);
554 int AsyncAddressResolverImpl::GetError() const {
555 DCHECK(CalledOnValidThread());
556 return addresses_
.empty() ? -1 : 0;
559 void AsyncAddressResolverImpl::Destroy(bool wait
) {
560 DCHECK(CalledOnValidThread());
562 // Libjingle doesn't need this object any more and it's not going to delete
567 void AsyncAddressResolverImpl::OnAddressResolved(
568 const net::IPAddressList
& addresses
) {
569 DCHECK(CalledOnValidThread());
570 for (size_t i
= 0; i
< addresses
.size(); ++i
) {
571 talk_base::SocketAddress socket_address
;
572 if (!jingle_glue::IPEndPointToSocketAddress(
573 net::IPEndPoint(addresses
[i
], 0), &socket_address
)) {
576 addresses_
.push_back(socket_address
.ipaddr());
583 IpcPacketSocketFactory::IpcPacketSocketFactory(
584 P2PSocketDispatcher
* socket_dispatcher
)
585 : socket_dispatcher_(socket_dispatcher
) {
588 IpcPacketSocketFactory::~IpcPacketSocketFactory() {
591 talk_base::AsyncPacketSocket
* IpcPacketSocketFactory::CreateUdpSocket(
592 const talk_base::SocketAddress
& local_address
, int min_port
, int max_port
) {
593 talk_base::SocketAddress crome_address
;
594 P2PSocketClientImpl
* socket_client
=
595 new P2PSocketClientImpl(socket_dispatcher_
);
596 scoped_ptr
<IpcPacketSocket
> socket(new IpcPacketSocket());
597 // TODO(sergeyu): Respect local_address and port limits here (need
598 // to pass them over IPC channel to the browser).
599 if (!socket
->Init(P2P_SOCKET_UDP
, socket_client
,
600 local_address
, talk_base::SocketAddress())) {
603 return socket
.release();
606 talk_base::AsyncPacketSocket
* IpcPacketSocketFactory::CreateServerTcpSocket(
607 const talk_base::SocketAddress
& local_address
, int min_port
, int max_port
,
609 // TODO(sergeyu): Implement SSL support.
610 if (opts
& talk_base::PacketSocketFactory::OPT_SSLTCP
)
613 P2PSocketType type
= (opts
& talk_base::PacketSocketFactory::OPT_STUN
) ?
614 P2P_SOCKET_STUN_TCP_SERVER
: P2P_SOCKET_TCP_SERVER
;
615 P2PSocketClientImpl
* socket_client
=
616 new P2PSocketClientImpl(socket_dispatcher_
);
617 scoped_ptr
<IpcPacketSocket
> socket(new IpcPacketSocket());
618 if (!socket
->Init(type
, socket_client
, local_address
,
619 talk_base::SocketAddress())) {
622 return socket
.release();
625 talk_base::AsyncPacketSocket
* IpcPacketSocketFactory::CreateClientTcpSocket(
626 const talk_base::SocketAddress
& local_address
,
627 const talk_base::SocketAddress
& remote_address
,
628 const talk_base::ProxyInfo
& proxy_info
,
629 const std::string
& user_agent
, int opts
) {
631 if (opts
& talk_base::PacketSocketFactory::OPT_SSLTCP
) {
632 type
= (opts
& talk_base::PacketSocketFactory::OPT_STUN
) ?
633 P2P_SOCKET_STUN_SSLTCP_CLIENT
: P2P_SOCKET_SSLTCP_CLIENT
;
634 } else if (opts
& talk_base::PacketSocketFactory::OPT_TLS
) {
635 type
= (opts
& talk_base::PacketSocketFactory::OPT_STUN
) ?
636 P2P_SOCKET_STUN_TLS_CLIENT
: P2P_SOCKET_TLS_CLIENT
;
638 type
= (opts
& talk_base::PacketSocketFactory::OPT_STUN
) ?
639 P2P_SOCKET_STUN_TCP_CLIENT
: P2P_SOCKET_TCP_CLIENT
;
641 P2PSocketClientImpl
* socket_client
=
642 new P2PSocketClientImpl(socket_dispatcher_
);
643 scoped_ptr
<IpcPacketSocket
> socket(new IpcPacketSocket());
644 if (!socket
->Init(type
, socket_client
, local_address
, remote_address
))
646 return socket
.release();
649 talk_base::AsyncResolverInterface
*
650 IpcPacketSocketFactory::CreateAsyncResolver() {
651 scoped_ptr
<AsyncAddressResolverImpl
> resolver(
652 new AsyncAddressResolverImpl(socket_dispatcher_
));
653 return resolver
.release();
656 } // namespace content