1 // Copyright (c) 2012 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
5 #include "net/udp/udp_socket_libevent.h"
10 #include <sys/socket.h>
11 #include <netinet/in.h>
13 #include "base/callback.h"
14 #include "base/logging.h"
15 #include "base/message_loop/message_loop.h"
16 #include "base/metrics/stats_counters.h"
17 #include "base/posix/eintr_wrapper.h"
18 #include "base/rand_util.h"
19 #include "net/base/io_buffer.h"
20 #include "net/base/ip_endpoint.h"
21 #include "net/base/net_errors.h"
22 #include "net/base/net_log.h"
23 #include "net/base/net_util.h"
24 #include "net/udp/udp_net_log_parameters.h"
// Tuning constants for UDPSocketLibevent::RandomBind().

// Number of randomly chosen ports RandomBind() tries before it falls back to
// binding port 0.
const int kBindRetries = 10;

// Lower bound passed to the random-int callback when picking a port.
const int kPortStart = 1024;

// Upper bound passed to the random-int callback when picking a port.
const int kPortEnd = 65535;
// Constructs a socket object that does not yet own a descriptor: socket_
// starts as kInvalidSocket, socket_options_ starts with only
// SOCKET_OPTION_MULTICAST_LOOP set, and the multicast TTL starts at 1.
// A TYPE_SOCKET_ALIVE begin event is emitted using |source|, and when
// |bind_type| is RANDOM_BIND the constructor DCHECKs that |rand_int_cb| is
// non-null.
// NOTE(review): |net_log| is used in the initializer list but its parameter
// declaration is not visible in this listing; other initializers may be
// missing as well -- confirm against the complete file.
36 UDPSocketLibevent::UDPSocketLibevent(
37 DatagramSocket::BindType bind_type
,
38 const RandIntCallback
& rand_int_cb
,
40 const net::NetLog::Source
& source
)
41 : socket_(kInvalidSocket
),
43 socket_options_(SOCKET_OPTION_MULTICAST_LOOP
),
44 multicast_time_to_live_(1),
45 bind_type_(bind_type
),
46 rand_int_cb_(rand_int_cb
),
50 recv_from_address_(NULL
),
52 net_log_(BoundNetLog::Make(net_log
, NetLog::SOURCE_UDP_SOCKET
)) {
53 net_log_
.BeginEvent(NetLog::TYPE_SOCKET_ALIVE
,
54 source
.ToEventParametersCallback());
// Random binding needs a source of random port numbers.
55 if (bind_type
== DatagramSocket::RANDOM_BIND
)
56 DCHECK(!rand_int_cb
.is_null());
// Destructor. Ends the TYPE_SOCKET_ALIVE net-log event that the constructor
// began.
// NOTE(review): any statement between the opening brace and the EndEvent call
// is not visible in this listing -- confirm against the complete file.
59 UDPSocketLibevent::~UDPSocketLibevent() {
61 net_log_
.EndEvent(NetLog::TYPE_SOCKET_ALIVE
);
// Shuts the socket down: resets the pending read/write callbacks and their
// associated address state, stops both file-descriptor watchers, close()s the
// descriptor (logging "close" via PLOG on failure) and finally marks socket_
// as kInvalidSocket. DCHECKs CalledOnValidThread().
// NOTE(review): several original lines are absent from this listing (the
// embedded numbering skips); what is done with |ok| is not visible here.
64 void UDPSocketLibevent::Close() {
65 DCHECK(CalledOnValidThread());
70 // Zero out any pending read/write callback state.
73 read_callback_
.Reset();
74 recv_from_address_
= NULL
;
77 write_callback_
.Reset();
78 send_to_address_
.reset();
// Stop watching the descriptor for both directions before it is closed.
80 bool ok
= read_socket_watcher_
.StopWatchingFileDescriptor();
82 ok
= write_socket_watcher_
.StopWatchingFileDescriptor();
85 if (HANDLE_EINTR(close(socket_
)) < 0)
86 PLOG(ERROR
) << "close";
88 socket_
= kInvalidSocket
;
// Writes the remote endpoint into |*address|.
// The endpoint is looked up lazily: when remote_address_ is empty it is
// fetched with getpeername(), converted to an IPEndPoint and cached in
// remote_address_. A getpeername() failure is returned as
// MapSystemError(errno).
// NOTE(review): the ERR_SOCKET_NOT_CONNECTED return below is presumably
// guarded by a connection check that this listing omits; the failure branch of
// FromSockAddr() and the final return are likewise not visible -- confirm.
92 int UDPSocketLibevent::GetPeerAddress(IPEndPoint
* address
) const {
93 DCHECK(CalledOnValidThread());
96 return ERR_SOCKET_NOT_CONNECTED
;
98 if (!remote_address_
.get()) {
99 SockaddrStorage storage
;
100 if (getpeername(socket_
, storage
.addr
, &storage
.addr_len
))
101 return MapSystemError(errno
);
// Note: this local |address| shadows the |address| out-parameter for the rest
// of the enclosing block.
102 scoped_ptr
<IPEndPoint
> address(new IPEndPoint());
103 if (!address
->FromSockAddr(storage
.addr
, storage
.addr_len
))
// Ownership of the freshly parsed endpoint moves into the cache.
105 remote_address_
.reset(address
.release());
108 *address
= *remote_address_
;
// Writes the local endpoint into |*address|.
// The endpoint is looked up lazily: when local_address_ is empty it is fetched
// with getsockname(), converted to an IPEndPoint, cached in local_address_ and
// reported to the net log as TYPE_UDP_LOCAL_ADDRESS. A getsockname() failure
// is returned as MapSystemError(errno).
// NOTE(review): the ERR_SOCKET_NOT_CONNECTED return below is presumably
// guarded by a connection check that this listing omits; the failure branch of
// FromSockAddr() and the final return are likewise not visible -- confirm.
112 int UDPSocketLibevent::GetLocalAddress(IPEndPoint
* address
) const {
113 DCHECK(CalledOnValidThread());
116 return ERR_SOCKET_NOT_CONNECTED
;
118 if (!local_address_
.get()) {
119 SockaddrStorage storage
;
120 if (getsockname(socket_
, storage
.addr
, &storage
.addr_len
))
121 return MapSystemError(errno
);
// Note: this local |address| shadows the |address| out-parameter for the rest
// of the enclosing block.
122 scoped_ptr
<IPEndPoint
> address(new IPEndPoint());
123 if (!address
->FromSockAddr(storage
.addr
, storage
.addr_len
))
// Ownership of the freshly parsed endpoint moves into the cache.
125 local_address_
.reset(address
.release());
126 net_log_
.AddEvent(NetLog::TYPE_UDP_LOCAL_ADDRESS
,
127 CreateNetLogUDPConnectCallback(local_address_
.get()));
130 *address
= *local_address_
;
// Reads a datagram into |buf| by delegating to RecvFrom() with a NULL address
// pointer, i.e. without asking for the sender's address. Returns whatever
// RecvFrom() returns.
// NOTE(review): the |buf_len| parameter declaration is not visible in this
// listing.
134 int UDPSocketLibevent::Read(IOBuffer
* buf
,
136 const CompletionCallback
& callback
) {
137 return RecvFrom(buf
, buf_len
, NULL
, callback
);
// Receives a datagram into |buf|.
// Preconditions (DCHECKed): called on the valid thread, socket_ is valid, no
// read is already pending (read_callback_ null, recv_from_address_ null),
// |callback| is non-null and |buf_len| > 0.
// An immediate receive is attempted through InternalRecvFrom(). For the
// pending case the socket is registered for read readiness, the buffer length,
// |address| and |callback| are stored for DidCompleteRead(), and
// ERR_IO_PENDING is returned. If registering the watcher fails, the failure is
// logged and errno is mapped to a net error.
// NOTE(review): the |buf_len| / |address| parameter declarations, the
// statement executed when |nread| != ERR_IO_PENDING, and the remainder of the
// watcher-failure branch are not visible in this listing -- confirm.
140 int UDPSocketLibevent::RecvFrom(IOBuffer
* buf
,
143 const CompletionCallback
& callback
) {
144 DCHECK(CalledOnValidThread());
145 DCHECK_NE(kInvalidSocket
, socket_
);
146 DCHECK(read_callback_
.is_null());
147 DCHECK(!recv_from_address_
);
148 DCHECK(!callback
.is_null()); // Synchronous operation not supported
149 DCHECK_GT(buf_len
, 0);
151 int nread
= InternalRecvFrom(buf
, buf_len
, address
);
152 if (nread
!= ERR_IO_PENDING
)
155 if (!base::MessageLoopForIO::current()->WatchFileDescriptor(
156 socket_
, true, base::MessageLoopForIO::WATCH_READ
,
157 &read_socket_watcher_
, &read_watcher_
)) {
158 PLOG(ERROR
) << "WatchFileDescriptor failed on read";
// NOTE(review): errno is read after the PLOG statement above; verify the
// logging call cannot modify errno before it is mapped here.
159 int result
= MapSystemError(errno
);
160 LogRead(result
, NULL
, 0, NULL
);
// Remember the state of the pending read for DidCompleteRead().
165 read_buf_len_
= buf_len
;
166 recv_from_address_
= address
;
167 read_callback_
= callback
;
168 return ERR_IO_PENDING
;
// Sends |buf| by delegating to SendToOrWrite() with a NULL destination
// address. Returns whatever SendToOrWrite() returns.
// NOTE(review): the |buf_len| parameter declaration is not visible in this
// listing.
171 int UDPSocketLibevent::Write(IOBuffer
* buf
,
173 const CompletionCallback
& callback
) {
174 return SendToOrWrite(buf
, buf_len
, NULL
, callback
);
// Sends |buf| to the explicit destination |address| by delegating to
// SendToOrWrite() with a pointer to |address|. Returns whatever
// SendToOrWrite() returns.
// NOTE(review): the |buf_len| parameter declaration is not visible in this
// listing.
177 int UDPSocketLibevent::SendTo(IOBuffer
* buf
,
179 const IPEndPoint
& address
,
180 const CompletionCallback
& callback
) {
181 return SendToOrWrite(buf
, buf_len
, &address
, callback
);
// Shared implementation behind Write() (|address| == NULL) and SendTo()
// (|address| points at the destination).
// Preconditions (DCHECKed): called on the valid thread, socket_ is valid, no
// write is already pending, |callback| is non-null and |buf_len| > 0.
// An immediate send is attempted through InternalSendTo(). For the pending
// case the socket is registered for write readiness, the buffer length, a copy
// of |*address| and |callback| are stored for DidCompleteWrite(), and
// ERR_IO_PENDING is returned. If registering the watcher fails, the failure is
// logged and errno is mapped to a net error.
// NOTE(review): the |buf_len| parameter declaration, the statement executed
// when |result| != ERR_IO_PENDING, the rest of the watcher-failure branch and
// any guard around the copy of |*address| are not visible in this listing.
184 int UDPSocketLibevent::SendToOrWrite(IOBuffer
* buf
,
186 const IPEndPoint
* address
,
187 const CompletionCallback
& callback
) {
188 DCHECK(CalledOnValidThread());
189 DCHECK_NE(kInvalidSocket
, socket_
);
190 DCHECK(write_callback_
.is_null());
191 DCHECK(!callback
.is_null()); // Synchronous operation not supported
192 DCHECK_GT(buf_len
, 0);
194 int result
= InternalSendTo(buf
, buf_len
, address
);
195 if (result
!= ERR_IO_PENDING
)
198 if (!base::MessageLoopForIO::current()->WatchFileDescriptor(
199 socket_
, true, base::MessageLoopForIO::WATCH_WRITE
,
200 &write_socket_watcher_
, &write_watcher_
)) {
201 DVLOG(1) << "WatchFileDescriptor failed on write, errno " << errno
;
// This |result| shadows the outer |result| declared above.
202 int result
= MapSystemError(errno
);
203 LogWrite(result
, NULL
, NULL
);
// Remember the state of the pending write for DidCompleteWrite().
208 write_buf_len_
= buf_len
;
209 DCHECK(!send_to_address_
.get());
// The destination is copied, so the caller's IPEndPoint need not outlive this
// call.
211 send_to_address_
.reset(new IPEndPoint(*address
));
213 write_callback_
= callback
;
214 return ERR_IO_PENDING
;
// Connects the socket to |address|, bracketing the work done by
// InternalConnect() with a TYPE_UDP_CONNECT begin event (carrying the address)
// and a matching end event carrying the net error code |rv|.
// NOTE(review): statements between the InternalConnect() call and the end
// event, and the final return, are not visible in this listing.
217 int UDPSocketLibevent::Connect(const IPEndPoint
& address
) {
218 net_log_
.BeginEvent(NetLog::TYPE_UDP_CONNECT
,
219 CreateNetLogUDPConnectCallback(&address
));
220 int rv
= InternalConnect(address
);
223 net_log_
.EndEventWithNetErrorCode(NetLog::TYPE_UDP_CONNECT
, rv
);
// Does the actual work of Connect(): creates the socket for |address|'s
// family, performs a random bind first when bind_type_ is RANDOM_BIND,
// converts |address| to a sockaddr (ERR_ADDRESS_INVALID if that fails), calls
// connect() and records a copy of |address| in remote_address_.
// DCHECKs that the call is on the valid thread, that the socket is not already
// connected and that no remote address has been recorded yet.
// NOTE(review): the error checks after CreateSocket()/RandomBind()/connect()
// and the use of |result| are not visible in this listing -- confirm.
227 int UDPSocketLibevent::InternalConnect(const IPEndPoint
& address
) {
228 DCHECK(CalledOnValidThread());
229 DCHECK(!is_connected());
230 DCHECK(!remote_address_
.get());
231 int rv
= CreateSocket(address
);
235 if (bind_type_
== DatagramSocket::RANDOM_BIND
)
236 rv
= RandomBind(address
);
237 // else connect() does the DatagramSocket::DEFAULT_BIND
244 SockaddrStorage storage
;
245 if (!address
.ToSockAddr(storage
.addr
, &storage
.addr_len
)) {
247 return ERR_ADDRESS_INVALID
;
250 rv
= HANDLE_EINTR(connect(socket_
, storage
.addr
, storage
.addr_len
));
252 // Close() may change the current errno. Map errno beforehand.
253 int result
= MapSystemError(errno
);
258 remote_address_
.reset(new IPEndPoint(address
));
// Binds the socket to the local |address|. The visible steps run in order:
// CreateSocket(), SetSocketOptions() (applying the options accumulated in
// socket_options_), DoBind(), and finally a reset of the cached
// local_address_.
// DCHECKs that the call is on the valid thread and that the socket is not
// already connected.
// NOTE(review): the error checks between the steps and the final return are
// not visible in this listing -- confirm.
262 int UDPSocketLibevent::Bind(const IPEndPoint
& address
) {
263 DCHECK(CalledOnValidThread());
264 DCHECK(!is_connected());
265 int rv
= CreateSocket(address
);
269 rv
= SetSocketOptions();
274 rv
= DoBind(address
);
// Drop any cached local endpoint; GetLocalAddress() re-queries it when empty.
279 local_address_
.reset();
// Requests a receive buffer of |size| for the socket via
// setsockopt(SOL_SOCKET, SO_RCVBUF). A non-zero setsockopt() result trips a
// DCHECK that reports errno.
// NOTE(review): the return statement is not visible in this listing.
283 bool UDPSocketLibevent::SetReceiveBufferSize(int32 size
) {
284 DCHECK(CalledOnValidThread());
285 int rv
= setsockopt(socket_
, SOL_SOCKET
, SO_RCVBUF
,
286 reinterpret_cast<const char*>(&size
), sizeof(size
));
287 DCHECK(!rv
) << "Could not set socket receive buffer size: " << errno
;
// Requests a send buffer of |size| for the socket via
// setsockopt(SOL_SOCKET, SO_SNDBUF). A non-zero setsockopt() result trips a
// DCHECK that reports errno.
// NOTE(review): the return statement is not visible in this listing.
291 bool UDPSocketLibevent::SetSendBufferSize(int32 size
) {
292 DCHECK(CalledOnValidThread());
293 int rv
= setsockopt(socket_
, SOL_SOCKET
, SO_SNDBUF
,
294 reinterpret_cast<const char*>(&size
), sizeof(size
));
295 DCHECK(!rv
) << "Could not set socket send buffer size: " << errno
;
// Records that address reuse is wanted by setting SOCKET_OPTION_REUSE_ADDRESS
// in socket_options_; SetSocketOptions() later translates that flag into
// SO_REUSEADDR. DCHECKs that the socket is not connected yet.
299 void UDPSocketLibevent::AllowAddressReuse() {
300 DCHECK(CalledOnValidThread());
301 DCHECK(!is_connected());
303 socket_options_
|= SOCKET_OPTION_REUSE_ADDRESS
;
// Records that broadcast is wanted by setting SOCKET_OPTION_BROADCAST in
// socket_options_; SetSocketOptions() later acts on that flag. DCHECKs that
// the socket is not connected yet.
306 void UDPSocketLibevent::AllowBroadcast() {
307 DCHECK(CalledOnValidThread());
308 DCHECK(!is_connected());
310 socket_options_
|= SOCKET_OPTION_BROADCAST
;
// Read-readiness notification for the watched descriptor (the int argument is
// unused). Forwards to the owning socket's DidCompleteRead(), but only while a
// read callback is pending.
313 void UDPSocketLibevent::ReadWatcher::OnFileCanReadWithoutBlocking(int) {
314 if (!socket_
->read_callback_
.is_null())
315 socket_
->DidCompleteRead();
// Write-readiness notification for the watched descriptor (the int argument is
// unused). Forwards to the owning socket's DidCompleteWrite(), but only while
// a write callback is pending.
318 void UDPSocketLibevent::WriteWatcher::OnFileCanWriteWithoutBlocking(int) {
319 if (!socket_
->write_callback_
.is_null())
320 socket_
->DidCompleteWrite();
// Completes a pending read with the final result |rv| (never ERR_IO_PENDING,
// per the DCHECK). The stored callback is copied into a local and the member
// is reset first, so that the member is already clear if the callback starts
// another read.
// NOTE(review): the statement that invokes |c| is not visible in this listing.
323 void UDPSocketLibevent::DoReadCallback(int rv
) {
324 DCHECK_NE(rv
, ERR_IO_PENDING
);
325 DCHECK(!read_callback_
.is_null());
327 // since Run may result in Read being called, clear read_callback_ up front.
328 CompletionCallback c
= read_callback_
;
329 read_callback_
.Reset();
// Completes a pending write with the final result |rv| (never ERR_IO_PENDING,
// per the DCHECK). The stored callback is copied into a local and the member
// is reset first, so that the member is already clear if the callback starts
// another write.
// NOTE(review): the statement that invokes |c| is not visible in this listing.
333 void UDPSocketLibevent::DoWriteCallback(int rv
) {
334 DCHECK_NE(rv
, ERR_IO_PENDING
);
335 DCHECK(!write_callback_
.is_null());
337 // since Run may result in Write being called, clear write_callback_ up front.
338 CompletionCallback c
= write_callback_
;
339 write_callback_
.Reset();
// Invoked once the descriptor is readable for a pending read: retries
// InternalRecvFrom() with the buffer, length and address pointer stored by
// RecvFrom(). When the outcome is anything other than ERR_IO_PENDING, the
// stored address pointer is cleared, the read watcher is stopped and the
// result is handed to DoReadCallback().
// NOTE(review): the declaration of |result| and some statements inside the
// branch (including what is done with |ok|) are not visible in this listing.
343 void UDPSocketLibevent::DidCompleteRead() {
345 InternalRecvFrom(read_buf_
.get(), read_buf_len_
, recv_from_address_
);
346 if (result
!= ERR_IO_PENDING
) {
349 recv_from_address_
= NULL
;
350 bool ok
= read_socket_watcher_
.StopWatchingFileDescriptor();
352 DoReadCallback(result
);
// Records the outcome of a receive in the net log and stats.
// Visible behaviour: a TYPE_UDP_RECEIVE_ERROR event carrying |result| is
// added; when the net log is capturing all events, |addr|/|addr_len| are
// parsed into an IPEndPoint and a TYPE_UDP_BYTES_RECEIVED event is built
// (passing NULL for the address if parsing failed); and |result| is added to
// the "udp.read_bytes" stats counter.
// NOTE(review): the conditions that separate the error path from the
// success path, the remaining parameter declarations and the declaration of
// |address| are not visible in this listing -- confirm against the full file.
356 void UDPSocketLibevent::LogRead(int result
,
359 const sockaddr
* addr
) const {
361 net_log_
.AddEventWithNetErrorCode(NetLog::TYPE_UDP_RECEIVE_ERROR
, result
);
365 if (net_log_
.IsLoggingAllEvents()) {
366 DCHECK(addr_len
> 0);
370 bool is_address_valid
= address
.FromSockAddr(addr
, addr_len
);
372 NetLog::TYPE_UDP_BYTES_RECEIVED
,
373 CreateNetLogUDPDataTranferCallback(
// Only attach the sender's address to the event if it parsed successfully.
375 is_address_valid
? &address
: NULL
));
378 base::StatsCounter
read_bytes("udp.read_bytes");
379 read_bytes
.Add(result
);
// Creates the underlying datagram socket for |address|'s family.
// Stores the family in addr_family_, opens a SOCK_DGRAM socket into socket_
// and returns MapSystemError(errno) if socket() yields kInvalidSocket. The
// descriptor is then switched to non-blocking mode; a truthy SetNonBlocking()
// result is treated as failure and errno is captured into |err|.
// NOTE(review): the rest of the SetNonBlocking() failure branch and the final
// return are not visible in this listing.
382 int UDPSocketLibevent::CreateSocket(const IPEndPoint
& address
) {
383 addr_family_
= address
.GetSockAddrFamily();
384 socket_
= socket(addr_family_
, SOCK_DGRAM
, 0);
385 if (socket_
== kInvalidSocket
)
386 return MapSystemError(errno
);
387 if (SetNonBlocking(socket_
)) {
388 const int err
= MapSystemError(errno
);
// Invoked once the descriptor is writable for a pending write: retries
// InternalSendTo() with the buffer, length and destination stored by
// SendToOrWrite(). When the outcome is anything other than ERR_IO_PENDING, the
// stored destination is released, the write watcher is stopped and the result
// is handed to DoWriteCallback().
// NOTE(review): the declaration of |result| and some statements inside the
// branch are not visible in this listing.
395 void UDPSocketLibevent::DidCompleteWrite() {
397 InternalSendTo(write_buf_
.get(), write_buf_len_
, send_to_address_
.get());
399 if (result
!= ERR_IO_PENDING
) {
402 send_to_address_
.reset();
403 write_socket_watcher_
.StopWatchingFileDescriptor();
404 DoWriteCallback(result
);
// Records the outcome of a send in the net log and stats.
// Visible behaviour: a TYPE_UDP_SEND_ERROR event carrying |result| is added;
// when the net log is capturing all events, a TYPE_UDP_BYTES_SENT event is
// built from |result|, |bytes| and |address|; and |result| is added to the
// "udp.write_bytes" stats counter.
// NOTE(review): the conditions that separate the error path from the
// success path and the |bytes| parameter declaration are not visible in this
// listing -- confirm against the full file.
408 void UDPSocketLibevent::LogWrite(int result
,
410 const IPEndPoint
* address
) const {
412 net_log_
.AddEventWithNetErrorCode(NetLog::TYPE_UDP_SEND_ERROR
, result
);
416 if (net_log_
.IsLoggingAllEvents()) {
418 NetLog::TYPE_UDP_BYTES_SENT
,
419 CreateNetLogUDPDataTranferCallback(result
, bytes
, address
));
422 base::StatsCounter
write_bytes("udp.write_bytes");
423 write_bytes
.Add(result
);
// Performs one recvfrom() on socket_ into |buf| (at most |buf_len| bytes).
// Visible behaviour: a non-negative byte count becomes the result, and if
// |address| is non-NULL the sender's sockaddr is parsed into it; a system
// error is converted with MapSystemError(errno). Every outcome other than
// ERR_IO_PENDING is reported through LogRead().
// NOTE(review): the recvfrom() argument list, the declaration of |result|,
// the branch taken when FromSockAddr() fails, the else/negative path and the
// final return are not visible in this listing -- confirm.
426 int UDPSocketLibevent::InternalRecvFrom(IOBuffer
* buf
, int buf_len
,
427 IPEndPoint
* address
) {
428 int bytes_transferred
;
431 SockaddrStorage storage
;
434 HANDLE_EINTR(recvfrom(socket_
,
441 if (bytes_transferred
>= 0) {
442 result
= bytes_transferred
;
443 if (address
&& !address
->FromSockAddr(storage
.addr
, storage
.addr_len
))
446 result
= MapSystemError(errno
);
// A pending result is not a completed read, so it is not logged.
448 if (result
!= ERR_IO_PENDING
)
449 LogRead(result
, buf
->data(), storage
.addr_len
, storage
.addr
);
// Performs one sendto() on socket_ with the contents of |buf|.
// Visible behaviour: |address| is converted into |storage| (a conversion
// failure is logged through LogWrite() as ERR_FAILED); a system error from
// sendto() is converted with MapSystemError(errno); every outcome other than
// ERR_IO_PENDING is reported through LogWrite().
// NOTE(review): the branch that handles a NULL |address| (the visible
// `storage.addr_len = 0` presumably belongs to it), the sendto() argument
// list, the returns and the condition guarding the errno mapping are not
// visible in this listing -- confirm.
453 int UDPSocketLibevent::InternalSendTo(IOBuffer
* buf
, int buf_len
,
454 const IPEndPoint
* address
) {
455 SockaddrStorage storage
;
456 struct sockaddr
* addr
= storage
.addr
;
459 storage
.addr_len
= 0;
461 if (!address
->ToSockAddr(storage
.addr
, &storage
.addr_len
)) {
462 int result
= ERR_FAILED
;
463 LogWrite(result
, NULL
, NULL
);
468 int result
= HANDLE_EINTR(sendto(socket_
,
475 result
= MapSystemError(errno
);
// A pending result is not a completed write, so it is not logged.
476 if (result
!= ERR_IO_PENDING
)
477 LogWrite(result
, buf
->data(), address
);
// Applies the options accumulated in socket_options_ and
// multicast_time_to_live_ to the freshly created socket:
//   * SOCKET_OPTION_REUSE_ADDRESS -> SO_REUSEADDR
//   * SOCKET_OPTION_BROADCAST     -> SO_REUSEPORT and/or SO_BROADCAST,
//                                    selected by the OS_MACOSX conditional
//   * multicast loopback cleared  -> IP_MULTICAST_LOOP / IPV6_MULTICAST_LOOP,
//                                    chosen by addr_family_
//   * TTL != IP_DEFAULT_MULTICAST_TTL -> IP_MULTICAST_TTL /
//                                    IPV6_MULTICAST_HOPS, chosen by
//                                    addr_family_
// Failures are returned as MapSystemError(errno).
// NOTE(review): the declarations of |true_value| and |loop|, the trailing
// setsockopt() arguments, the result checks, the #else line of the platform
// conditional and the final return are not visible in this listing, so the
// exact pairing of SO_REUSEPORT / SO_BROADCAST with each platform branch
// should be confirmed against the complete file.
481 int UDPSocketLibevent::SetSocketOptions() {
483 if (socket_options_
& SOCKET_OPTION_REUSE_ADDRESS
) {
484 int rv
= setsockopt(socket_
, SOL_SOCKET
, SO_REUSEADDR
, &true_value
,
487 return MapSystemError(errno
);
489 if (socket_options_
& SOCKET_OPTION_BROADCAST
) {
491 #if defined(OS_MACOSX)
492 // SO_REUSEPORT on OSX permits multiple processes to each receive
493 // UDP multicast or broadcast datagrams destined for the bound
495 rv
= setsockopt(socket_
, SOL_SOCKET
, SO_REUSEPORT
, &true_value
,
498 rv
= setsockopt(socket_
, SOL_SOCKET
, SO_BROADCAST
, &true_value
,
500 #endif // defined(OS_MACOSX)
502 return MapSystemError(errno
);
// The loopback option is only written when loopback has been switched off.
505 if (!(socket_options_
& SOCKET_OPTION_MULTICAST_LOOP
)) {
507 if (addr_family_
== AF_INET
) {
509 rv
= setsockopt(socket_
, IPPROTO_IP
, IP_MULTICAST_LOOP
,
510 &loop
, sizeof(loop
));
513 rv
= setsockopt(socket_
, IPPROTO_IPV6
, IPV6_MULTICAST_LOOP
,
514 &loop
, sizeof(loop
));
517 return MapSystemError(errno
);
// The TTL is only written when it differs from IP_DEFAULT_MULTICAST_TTL.
519 if (multicast_time_to_live_
!= IP_DEFAULT_MULTICAST_TTL
) {
521 if (addr_family_
== AF_INET
) {
// The IPv4 option is passed as a u_char here, the IPv6 one as an int below.
522 u_char ttl
= multicast_time_to_live_
;
523 rv
= setsockopt(socket_
, IPPROTO_IP
, IP_MULTICAST_TTL
,
526 // Signed integer. -1 to use route default.
527 int ttl
= multicast_time_to_live_
;
528 rv
= setsockopt(socket_
, IPPROTO_IPV6
, IPV6_MULTICAST_HOPS
,
532 return MapSystemError(errno
);
// Binds socket_ to |address|.
// Returns ERR_ADDRESS_INVALID if |address| cannot be converted to a sockaddr,
// MapSystemError(errno) if bind() returns a negative value, and otherwise
// bind()'s own (non-negative) return value.
537 int UDPSocketLibevent::DoBind(const IPEndPoint
& address
) {
538 SockaddrStorage storage
;
539 if (!address
.ToSockAddr(storage
.addr
, &storage
.addr_len
))
540 return ERR_ADDRESS_INVALID
;
541 int rv
= bind(socket_
, storage
.addr
, storage
.addr_len
);
542 return rv
< 0 ? MapSystemError(errno
) : rv
;
// Binds the socket to an all-zero IP address of the same size as |address|'s
// IP, using a randomly chosen port. Up to kBindRetries ports are drawn from
// rand_int_cb_ with the bounds kPortStart and kPortEnd; if the loop finishes
// without returning, the bind is attempted with port 0 instead.
// DCHECKs that bind_type_ is RANDOM_BIND and that rand_int_cb_ is set.
// NOTE(review): the statement controlled by the `if` inside the loop is not
// visible in this listing (presumably it returns |rv|). Also, assuming OK and
// ERR_ADDRESS_IN_USE are distinct values, the `rv == OK ||` half of that
// condition is redundant: it is implied by `rv != ERR_ADDRESS_IN_USE`.
545 int UDPSocketLibevent::RandomBind(const IPEndPoint
& address
) {
546 DCHECK(bind_type_
== DatagramSocket::RANDOM_BIND
&& !rand_int_cb_
.is_null());
548 // Construct IPAddressNumber of appropriate size (IPv4 or IPv6) of 0s.
549 IPAddressNumber
ip(address
.address().size());
551 for (int i
= 0; i
< kBindRetries
; ++i
) {
552 int rv
= DoBind(IPEndPoint(ip
, rand_int_cb_
.Run(kPortStart
, kPortEnd
)));
553 if (rv
== OK
|| rv
!= ERR_ADDRESS_IN_USE
)
// Fallback after the random attempts: bind with port 0.
556 return DoBind(IPEndPoint(ip
, 0));
// Joins the multicast group |group_address| on the default interface.
// The size of |group_address| selects the protocol:
//   * kIPv4AddressSize: requires addr_family_ == AF_INET, then issues
//     IP_ADD_MEMBERSHIP with the interface set to INADDR_ANY.
//   * kIPv6AddressSize: requires addr_family_ == AF_INET6, then issues
//     IPV6_JOIN_GROUP with interface index 0.
// A family mismatch or an unexpected address size yields ERR_ADDRESS_INVALID
// (the latter also hits NOTREACHED); setsockopt() failures are returned as
// MapSystemError(errno).
// NOTE(review): the ERR_SOCKET_NOT_CONNECTED return is presumably guarded by a
// connection check that this listing omits; the |mreq| declarations, the
// setsockopt() result checks and the success returns are not visible either.
559 int UDPSocketLibevent::JoinGroup(const IPAddressNumber
& group_address
) const {
560 DCHECK(CalledOnValidThread());
562 return ERR_SOCKET_NOT_CONNECTED
;
564 switch (group_address
.size()) {
565 case kIPv4AddressSize
: {
566 if (addr_family_
!= AF_INET
)
567 return ERR_ADDRESS_INVALID
;
569 mreq
.imr_interface
.s_addr
= INADDR_ANY
;
570 memcpy(&mreq
.imr_multiaddr
, &group_address
[0], kIPv4AddressSize
);
571 int rv
= setsockopt(socket_
, IPPROTO_IP
, IP_ADD_MEMBERSHIP
,
572 &mreq
, sizeof(mreq
));
574 return MapSystemError(errno
);
577 case kIPv6AddressSize
: {
578 if (addr_family_
!= AF_INET6
)
579 return ERR_ADDRESS_INVALID
;
581 mreq
.ipv6mr_interface
= 0; // 0 indicates default multicast interface.
582 memcpy(&mreq
.ipv6mr_multiaddr
, &group_address
[0], kIPv6AddressSize
);
583 int rv
= setsockopt(socket_
, IPPROTO_IPV6
, IPV6_JOIN_GROUP
,
584 &mreq
, sizeof(mreq
));
586 return MapSystemError(errno
);
590 NOTREACHED() << "Invalid address family";
591 return ERR_ADDRESS_INVALID
;
// Leaves the multicast group |group_address| on the default interface.
// The size of |group_address| selects the protocol:
//   * kIPv4AddressSize: requires addr_family_ == AF_INET, then issues
//     IP_DROP_MEMBERSHIP with the interface set to INADDR_ANY.
//   * kIPv6AddressSize: requires addr_family_ == AF_INET6, then issues
//     IPV6_LEAVE_GROUP with interface index 0.
// A family mismatch or an unexpected address size yields ERR_ADDRESS_INVALID
// (the latter also hits NOTREACHED); setsockopt() failures are returned as
// MapSystemError(errno).
// NOTE(review): the ERR_SOCKET_NOT_CONNECTED return is presumably guarded by a
// connection check that this listing omits; the |mreq| declarations, the
// setsockopt() result checks and the success returns are not visible either.
595 int UDPSocketLibevent::LeaveGroup(const IPAddressNumber
& group_address
) const {
596 DCHECK(CalledOnValidThread());
599 return ERR_SOCKET_NOT_CONNECTED
;
601 switch (group_address
.size()) {
602 case kIPv4AddressSize
: {
603 if (addr_family_
!= AF_INET
)
604 return ERR_ADDRESS_INVALID
;
606 mreq
.imr_interface
.s_addr
= INADDR_ANY
;
607 memcpy(&mreq
.imr_multiaddr
, &group_address
[0], kIPv4AddressSize
);
608 int rv
= setsockopt(socket_
, IPPROTO_IP
, IP_DROP_MEMBERSHIP
,
609 &mreq
, sizeof(mreq
));
611 return MapSystemError(errno
);
614 case kIPv6AddressSize
: {
615 if (addr_family_
!= AF_INET6
)
616 return ERR_ADDRESS_INVALID
;
618 mreq
.ipv6mr_interface
= 0; // 0 indicates default multicast interface.
619 memcpy(&mreq
.ipv6mr_multiaddr
, &group_address
[0], kIPv6AddressSize
);
620 int rv
= setsockopt(socket_
, IPPROTO_IPV6
, IPV6_LEAVE_GROUP
,
621 &mreq
, sizeof(mreq
));
623 return MapSystemError(errno
);
627 NOTREACHED() << "Invalid address family";
628 return ERR_ADDRESS_INVALID
;
// Stores the multicast TTL to be applied later by SetSocketOptions().
// |time_to_live| must lie in [0, 255]; anything else yields
// ERR_INVALID_ARGUMENT and leaves the stored value untouched.
// NOTE(review): the ERR_SOCKET_IS_CONNECTED return is presumably guarded by a
// connection check that this listing omits; the success return is not visible
// either.
632 int UDPSocketLibevent::SetMulticastTimeToLive(int time_to_live
) {
633 DCHECK(CalledOnValidThread());
635 return ERR_SOCKET_IS_CONNECTED
;
637 if (time_to_live
< 0 || time_to_live
> 255)
638 return ERR_INVALID_ARGUMENT
;
639 multicast_time_to_live_
= time_to_live
;
// Sets or clears SOCKET_OPTION_MULTICAST_LOOP in socket_options_;
// SetSocketOptions() later acts on that flag.
// NOTE(review): the ERR_SOCKET_IS_CONNECTED return is presumably guarded by a
// connection check, and the set/clear statements are presumably selected by
// |loopback|; neither condition nor the final return is visible in this
// listing -- confirm against the complete file.
643 int UDPSocketLibevent::SetMulticastLoopbackMode(bool loopback
) {
644 DCHECK(CalledOnValidThread());
646 return ERR_SOCKET_IS_CONNECTED
;
649 socket_options_
|= SOCKET_OPTION_MULTICAST_LOOP
;
651 socket_options_
&= ~SOCKET_OPTION_MULTICAST_LOOP
;