1 // Copyright 2014 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
5 #include "media/cast/net/udp_transport.h"
10 #include "base/bind.h"
11 #include "base/logging.h"
12 #include "base/memory/ref_counted.h"
13 #include "base/memory/scoped_ptr.h"
14 #include "base/message_loop/message_loop.h"
15 #include "base/rand_util.h"
16 #include "net/base/io_buffer.h"
17 #include "net/base/net_errors.h"
18 #include "net/base/rand_callback.h"
// Size passed to each freshly allocated receive Packet in
// ReceiveNextPacket().
// NOTE(review): presumably a byte count chosen to match a typical Ethernet
// MTU -- confirm.
24 const int kMaxPacketSize
= 1500;
// Helper used by this file to decide whether an endpoint has been left
// unset. It builds |empty_addr| with the same length as |addr|'s address
// (presumably value-initialized, i.e. all zeros -- confirm) and compares it
// element-wise against |addr|'s address.
// NOTE(review): the start of the return expression and the operand that
// follows "&&" are not visible in this view of the file; confirm the full
// condition against the complete source.
26 bool IsEmpty(const net::IPEndPoint
& addr
) {
27 net::IPAddressNumber
empty_addr(addr
.address().size());
29 empty_addr
.begin(), empty_addr
.end(), addr
.address().begin()) &&
33 bool IsEqual(const net::IPEndPoint
& addr1
, const net::IPEndPoint
& addr2
) {
34 return addr1
.port() == addr2
.port() && std::equal(addr1
.address().begin(),
35 addr1
.address().end(),
36 addr2
.address().begin());
// Constructs the transport.
//   |io_thread_proxy|   task runner the other methods DCHECK they run on.
//   |local_end_point|   stored in local_addr_; used for Bind() in
//                       StartReceiving() when non-empty.
//   |remote_end_point|  stored in remote_addr_; used for Connect() in
//                       StartReceiving() when no local address is given.
//   |send_buffer_size|  stored and later applied via SetSendBufferSize().
//   |status_callback|   run with TRANSPORT_SOCKET_ERROR when socket setup
//                       fails in StartReceiving().
// The UDP socket object is created here; no Open()/Bind()/Connect() call is
// visible in the constructor itself.
// At least one of the two endpoints must be non-empty (DCHECKed below).
// NOTE(review): some parameters and member initializers appear to be missing
// from this view of the file; confirm against the complete source.
40 UdpTransport::UdpTransport(
42 const scoped_refptr
<base::SingleThreadTaskRunner
>& io_thread_proxy
,
43 const net::IPEndPoint
& local_end_point
,
44 const net::IPEndPoint
& remote_end_point
,
45 int32 send_buffer_size
,
46 const CastTransportStatusCallback
& status_callback
)
47 : io_thread_proxy_(io_thread_proxy
),
48 local_addr_(local_end_point
),
49 remote_addr_(remote_end_point
),
50 udp_socket_(new net::UDPSocket(net::DatagramSocket::DEFAULT_BIND
,
51 net::RandIntCallback(),
53 net::NetLog::Source())),
55 receive_pending_(false),
56 client_connected_(false),
// No DSCP change is pending until SetDscp() is called.
57 next_dscp_value_(net::DSCP_NO_CHANGE
),
58 send_buffer_size_(send_buffer_size
),
59 status_callback_(status_callback
),
// A transport with neither endpoint set cannot bind or connect.
62 DCHECK(!IsEmpty(local_end_point
) || !IsEmpty(remote_end_point
));
// Destructor. The body is empty; members are cleaned up by their own
// destructors.
65 UdpTransport::~UdpTransport() {}
// Sets up the socket and starts delivering received packets to
// |packet_receiver|.
//
// Must be called on the IO task runner (DCHECKed). Setup depends on which
// endpoint was supplied at construction:
//   * local address set:  Open() + AllowAddressReuse() + Bind().
//   * else remote set:    Open() + AllowAddressReuse() + Connect(), after
//                         which client_connected_ is set.
//   * neither:            NOTREACHED().
// If any setup call returns a negative value, status_callback_ is run with
// TRANSPORT_SOCKET_ERROR and an error is logged. A failure to set the send
// buffer size is only logged as a warning.
// NOTE(review): the lines following each error log (presumably an early
// return) and the closing braces are not visible in this view; confirm the
// exact control flow against the complete source.
67 void UdpTransport::StartReceiving(
68 const PacketReceiverCallbackWithStatus
& packet_receiver
) {
69 DCHECK(io_thread_proxy_
->RunsTasksOnCurrentThread());
71 packet_receiver_
= packet_receiver
;
72 udp_socket_
->SetMulticastLoopbackMode(true);
// A local address was supplied: bind the socket to it.
73 if (!IsEmpty(local_addr_
)) {
74 if (udp_socket_
->Open(local_addr_
.GetFamily()) < 0 ||
75 udp_socket_
->AllowAddressReuse() < 0 ||
76 udp_socket_
->Bind(local_addr_
) < 0) {
78 status_callback_
.Run(TRANSPORT_SOCKET_ERROR
);
79 LOG(ERROR
) << "Failed to bind local address.";
// Only a remote address was supplied: connect the socket to it.
82 } else if (!IsEmpty(remote_addr_
)) {
83 if (udp_socket_
->Open(remote_addr_
.GetFamily()) < 0 ||
84 udp_socket_
->AllowAddressReuse() < 0 ||
85 udp_socket_
->Connect(remote_addr_
) < 0) {
87 status_callback_
.Run(TRANSPORT_SOCKET_ERROR
);
88 LOG(ERROR
) << "Failed to connect to remote address.";
// Remembered so that SendPacket() uses Write() instead of SendTo().
91 client_connected_
= true;
93 NOTREACHED() << "Either local or remote address has to be defined.";
// Failing to apply the requested send buffer size is logged but not treated
// as a socket error.
95 if (udp_socket_
->SetSendBufferSize(send_buffer_size_
) != net::OK
) {
96 LOG(WARNING
) << "Failed to set socket send buffer size.";
99 ScheduleReceiveNextPacket();
// Stops delivering packets by replacing packet_receiver_ with a
// default-constructed (null) callback; ScheduleReceiveNextPacket() and
// ReceiveNextPacket() both check packet_receiver_.is_null().
// Must be called on the IO task runner (DCHECKed).
102 void UdpTransport::StopReceiving() {
103 DCHECK(io_thread_proxy_
->RunsTasksOnCurrentThread());
104 packet_receiver_
= PacketReceiverCallbackWithStatus();
// Records |dscp| as the DSCP value to apply to the socket. The value is not
// applied here; SendPacket() applies it when it differs from
// net::DSCP_NO_CHANGE.
// Must be called on the IO task runner (DCHECKed).
108 void UdpTransport::SetDscp(net::DiffServCodePoint dscp
) {
109 DCHECK(io_thread_proxy_
->RunsTasksOnCurrentThread());
110 next_dscp_value_
= dscp
;
// Forwards to the socket's UseNonBlockingIO().
// Must be called on the IO task runner (DCHECKed).
// NOTE(review): lines immediately around this definition are missing from
// this view; check the complete source for anything (e.g. a preprocessor
// guard) that wraps it before moving or editing it.
114 void UdpTransport::UseNonBlockingIO() {
115 DCHECK(io_thread_proxy_
->RunsTasksOnCurrentThread());
116 udp_socket_
->UseNonBlockingIO();
// Posts a task that starts the next receive, unless no receiver is
// registered or a receive is already pending.
// receive_pending_ is set before posting so that repeated calls do not queue
// more than one task. The posted task calls
// ReceiveNextPacket(net::ERR_IO_PENDING) through a weak pointer.
// Must be called on the IO task runner (DCHECKed).
120 void UdpTransport::ScheduleReceiveNextPacket() {
121 DCHECK(io_thread_proxy_
->RunsTasksOnCurrentThread());
122 if (!packet_receiver_
.is_null() && !receive_pending_
) {
123 receive_pending_
= true;
124 io_thread_proxy_
->PostTask(FROM_HERE
,
125 base::Bind(&UdpTransport::ReceiveNextPacket
,
126 weak_factory_
.GetWeakPtr(),
127 net::ERR_IO_PENDING
));
// Receives packets from the socket and hands them to packet_receiver_.
//
// |length_or_status| is net::ERR_IO_PENDING when a new read should be
// started. Otherwise a negative value is treated as a receive failure, and
// any other value is used as the length the received packet is resized to.
// This method is also bound as the completion callback handed to RecvFrom().
//
// Source-address handling: while remote_addr_ is empty, it is taken from the
// first received packet and cleared again if the receiver rejects that
// packet; once set, packets from any other address are logged and ignored.
//
// Must be called on the IO task runner (DCHECKed).
// NOTE(review): the loop construct, the early returns and some RecvFrom()
// arguments are not visible in this view; confirm the exact control flow
// against the complete source before changing statement order.
131 void UdpTransport::ReceiveNextPacket(int length_or_status
) {
132 DCHECK(io_thread_proxy_
->RunsTasksOnCurrentThread());
// Nothing to deliver to once the receiver has been cleared.
134 if (packet_receiver_
.is_null())
137 // Loop while UdpSocket is delivering data synchronously. When it responds
138 // with a "pending" status, break and expect this method to be called back in
139 // the future when a packet is ready.
141 if (length_or_status
== net::ERR_IO_PENDING
) {
// Allocate a fresh packet and wrap its storage (no copy) for the socket to
// fill.
142 next_packet_
.reset(new Packet(kMaxPacketSize
));
143 recv_buf_
= new net::WrappedIOBuffer(
144 reinterpret_cast<char*>(&next_packet_
->front()));
146 udp_socket_
->RecvFrom(recv_buf_
.get(),
149 base::Bind(&UdpTransport::ReceiveNextPacket
,
150 weak_factory_
.GetWeakPtr()));
// The read did not complete synchronously; the bound callback above is
// expected to re-enter this method later.
151 if (length_or_status
== net::ERR_IO_PENDING
) {
152 receive_pending_
= true;
157 // Note: At this point, either a packet is ready or an error has occurred.
158 if (length_or_status
< 0) {
159 VLOG(1) << "Failed to receive packet: Status code is "
161 receive_pending_
= false;
165 // Confirm the packet has come from the expected remote address; otherwise,
166 // ignore it. If this is the first packet being received and no remote
167 // address has been set, set the remote address and expect all future
168 // packets to come from the same one.
169 // TODO(hubbe): We should only do this if the caller used a valid ssrc.
170 if (IsEmpty(remote_addr_
)) {
171 remote_addr_
= recv_addr_
;
172 VLOG(1) << "Setting remote address from first received packet: "
173 << remote_addr_
.ToString();
// Trim the packet to the number of bytes actually received.
174 next_packet_
->resize(length_or_status
);
175 if (!packet_receiver_
.Run(next_packet_
.Pass())) {
176 VLOG(1) << "Packet was not valid, resetting remote address.";
177 remote_addr_
= net::IPEndPoint();
179 } else if (!IsEqual(remote_addr_
, recv_addr_
)) {
180 VLOG(1) << "Ignoring packet received from an unrecognized address: "
181 << recv_addr_
.ToString() << ".";
183 next_packet_
->resize(length_or_status
);
184 packet_receiver_
.Run(next_packet_
.Pass());
// Reset so that the check at the top starts another read.
186 length_or_status
= net::ERR_IO_PENDING
;
// Sends |packet| on the socket.
//
// Steps visible here, in order:
//   1. bytes_sent_ is increased by the packet size.
//   2. If a DSCP value is pending (see SetDscp()), it is applied to the
//      socket and a failure is logged.
//   3. The packet data is wrapped (not copied) in an IOBuffer and sent with
//      Write() when the socket was connected in StartReceiving(), or with
//      SendTo() when a remote address is known.
//   4. If the send returns net::ERR_IO_PENDING, send_pending_ is set; OnSent()
//      is also invoked directly with the result.
//
// Must be called on the IO task runner (DCHECKed).
// NOTE(review): several conditions, the return statements, the use of |cb|
// and some Bind()/SendTo() arguments are not visible in this view; confirm
// the return-value contract and exact branching against the complete source.
190 bool UdpTransport::SendPacket(PacketRef packet
, const base::Closure
& cb
) {
191 DCHECK(io_thread_proxy_
->RunsTasksOnCurrentThread());
193 // Increase the byte count regardless of whether the packet was sent or dropped.
194 bytes_sent_
+= packet
->data
.size();
196 DCHECK(!send_pending_
);
198 VLOG(1) << "Cannot send because of pending IO.";
202 if (next_dscp_value_
!= net::DSCP_NO_CHANGE
) {
203 int result
= udp_socket_
->SetDiffServCodePoint(next_dscp_value_
);
204 if (result
!= net::OK
) {
205 VLOG(1) << "Unable to set DSCP: " << next_dscp_value_
206 << " to socket; Error: " << result
;
// NOTE(review): presumably the pending value is kept so it can be retried
// once the socket is connected -- confirm.
209 if (result
!= net::ERR_SOCKET_NOT_CONNECTED
) {
210 // Don't change DSCP in next send.
211 next_dscp_value_
= net::DSCP_NO_CHANGE
;
// Wrap the packet's bytes without copying them.
215 scoped_refptr
<net::IOBuffer
> buf
=
216 new net::WrappedIOBuffer(reinterpret_cast<char*>(&packet
->data
.front()));
219 base::Callback
<void(int)> callback
= base::Bind(&UdpTransport::OnSent
,
220 weak_factory_
.GetWeakPtr(),
224 if (client_connected_
) {
225 // If we called Connect() before we must call Write() instead of
226 // SendTo(). Otherwise on some platforms we might get
227 // ERR_SOCKET_IS_CONNECTED.
228 result
= udp_socket_
->Write(
229 buf
.get(), static_cast<int>(packet
->data
.size()), callback
);
230 } else if (!IsEmpty(remote_addr_
)) {
231 result
= udp_socket_
->SendTo(buf
.get(),
232 static_cast<int>(packet
->data
.size()),
236 VLOG(1) << "Failed to send packet; socket is neither bound nor "
241 if (result
== net::ERR_IO_PENDING
) {
242 send_pending_
= true;
// Direct call with the send result; an empty closure is passed here.
245 OnSent(buf
, packet
, base::Closure(), result
);
// Accessor related to the sent-byte count.
// NOTE(review): the body is not visible in this view; presumably it returns
// bytes_sent_, the counter SendPacket() increments -- confirm.
249 int64
UdpTransport::GetBytesSent() {
// Completion handler for a send. It is bound as the socket callback in
// SendPacket() and also called directly from there.
// Clears send_pending_, logs a failed send and schedules the next receive.
// Must be called on the IO task runner (DCHECKed).
// NOTE(review): part of the parameter list, the condition guarding the
// failure log and any use of |cb| are not visible in this view; confirm
// against the complete source.
253 void UdpTransport::OnSent(const scoped_refptr
<net::IOBuffer
>& buf
,
255 const base::Closure
& cb
,
257 DCHECK(io_thread_proxy_
->RunsTasksOnCurrentThread());
259 send_pending_
= false;
261 VLOG(1) << "Failed to send packet: " << result
<< ".";
263 ScheduleReceiveNextPacket();