Merge Chromium + Blink git repositories
[chromium-blink-merge.git] / media / cast / net / udp_transport.cc
blob0d77a69888148e009a4240e888386399c95e487d
1 // Copyright 2014 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
5 #include "media/cast/net/udp_transport.h"
7 #include <algorithm>
8 #include <string>
10 #include "base/bind.h"
11 #include "base/logging.h"
12 #include "base/message_loop/message_loop.h"
13 #include "base/rand_util.h"
14 #include "net/base/io_buffer.h"
15 #include "net/base/net_errors.h"
16 #include "net/base/rand_callback.h"
18 namespace media {
19 namespace cast {
21 namespace {
22 const int kMaxPacketSize = 1500;
24 bool IsEmpty(const net::IPEndPoint& addr) {
25 net::IPAddressNumber empty_addr(addr.address().size());
26 return std::equal(
27 empty_addr.begin(), empty_addr.end(), addr.address().begin()) &&
28 !addr.port();
31 bool IsEqual(const net::IPEndPoint& addr1, const net::IPEndPoint& addr2) {
32 return addr1.port() == addr2.port() && std::equal(addr1.address().begin(),
33 addr1.address().end(),
34 addr2.address().begin());
36 } // namespace
38 UdpTransport::UdpTransport(
39 net::NetLog* net_log,
40 const scoped_refptr<base::SingleThreadTaskRunner>& io_thread_proxy,
41 const net::IPEndPoint& local_end_point,
42 const net::IPEndPoint& remote_end_point,
43 int32 send_buffer_size,
44 const CastTransportStatusCallback& status_callback)
45 : io_thread_proxy_(io_thread_proxy),
46 local_addr_(local_end_point),
47 remote_addr_(remote_end_point),
48 udp_socket_(new net::UDPSocket(net::DatagramSocket::DEFAULT_BIND,
49 net::RandIntCallback(),
50 net_log,
51 net::NetLog::Source())),
52 send_pending_(false),
53 receive_pending_(false),
54 client_connected_(false),
55 next_dscp_value_(net::DSCP_NO_CHANGE),
56 send_buffer_size_(send_buffer_size),
57 status_callback_(status_callback),
58 bytes_sent_(0),
59 weak_factory_(this) {
60 DCHECK(!IsEmpty(local_end_point) || !IsEmpty(remote_end_point));
63 UdpTransport::~UdpTransport() {}
65 void UdpTransport::StartReceiving(
66 const PacketReceiverCallbackWithStatus& packet_receiver) {
67 DCHECK(io_thread_proxy_->RunsTasksOnCurrentThread());
69 if (!udp_socket_) {
70 status_callback_.Run(TRANSPORT_SOCKET_ERROR);
71 return;
74 packet_receiver_ = packet_receiver;
75 udp_socket_->SetMulticastLoopbackMode(true);
76 if (!IsEmpty(local_addr_)) {
77 if (udp_socket_->Open(local_addr_.GetFamily()) < 0 ||
78 udp_socket_->AllowAddressReuse() < 0 ||
79 udp_socket_->Bind(local_addr_) < 0) {
80 udp_socket_->Close();
81 udp_socket_.reset();
82 status_callback_.Run(TRANSPORT_SOCKET_ERROR);
83 LOG(ERROR) << "Failed to bind local address.";
84 return;
86 } else if (!IsEmpty(remote_addr_)) {
87 if (udp_socket_->Open(remote_addr_.GetFamily()) < 0 ||
88 udp_socket_->AllowAddressReuse() < 0 ||
89 udp_socket_->Connect(remote_addr_) < 0) {
90 udp_socket_->Close();
91 udp_socket_.reset();
92 status_callback_.Run(TRANSPORT_SOCKET_ERROR);
93 LOG(ERROR) << "Failed to connect to remote address.";
94 return;
96 client_connected_ = true;
97 } else {
98 NOTREACHED() << "Either local or remote address has to be defined.";
100 if (udp_socket_->SetSendBufferSize(send_buffer_size_) != net::OK) {
101 LOG(WARNING) << "Failed to set socket send buffer size.";
104 ScheduleReceiveNextPacket();
107 void UdpTransport::StopReceiving() {
108 DCHECK(io_thread_proxy_->RunsTasksOnCurrentThread());
109 packet_receiver_ = PacketReceiverCallbackWithStatus();
113 void UdpTransport::SetDscp(net::DiffServCodePoint dscp) {
114 DCHECK(io_thread_proxy_->RunsTasksOnCurrentThread());
115 next_dscp_value_ = dscp;
118 #if defined(OS_WIN)
119 void UdpTransport::UseNonBlockingIO() {
120 DCHECK(io_thread_proxy_->RunsTasksOnCurrentThread());
121 if (!udp_socket_)
122 return;
123 udp_socket_->UseNonBlockingIO();
125 #endif
127 void UdpTransport::ScheduleReceiveNextPacket() {
128 DCHECK(io_thread_proxy_->RunsTasksOnCurrentThread());
129 if (!packet_receiver_.is_null() && !receive_pending_) {
130 receive_pending_ = true;
131 io_thread_proxy_->PostTask(FROM_HERE,
132 base::Bind(&UdpTransport::ReceiveNextPacket,
133 weak_factory_.GetWeakPtr(),
134 net::ERR_IO_PENDING));
138 void UdpTransport::ReceiveNextPacket(int length_or_status) {
139 DCHECK(io_thread_proxy_->RunsTasksOnCurrentThread());
141 if (packet_receiver_.is_null())
142 return;
143 if (!udp_socket_)
144 return;
146 // Loop while UdpSocket is delivering data synchronously. When it responds
147 // with a "pending" status, break and expect this method to be called back in
148 // the future when a packet is ready.
149 while (true) {
150 if (length_or_status == net::ERR_IO_PENDING) {
151 next_packet_.reset(new Packet(kMaxPacketSize));
152 recv_buf_ = new net::WrappedIOBuffer(
153 reinterpret_cast<char*>(&next_packet_->front()));
154 length_or_status =
155 udp_socket_->RecvFrom(recv_buf_.get(),
156 kMaxPacketSize,
157 &recv_addr_,
158 base::Bind(&UdpTransport::ReceiveNextPacket,
159 weak_factory_.GetWeakPtr()));
160 if (length_or_status == net::ERR_IO_PENDING) {
161 receive_pending_ = true;
162 return;
166 // Note: At this point, either a packet is ready or an error has occurred.
167 if (length_or_status < 0) {
168 VLOG(1) << "Failed to receive packet: Status code is "
169 << length_or_status;
170 receive_pending_ = false;
171 return;
174 // Confirm the packet has come from the expected remote address; otherwise,
175 // ignore it. If this is the first packet being received and no remote
176 // address has been set, set the remote address and expect all future
177 // packets to come from the same one.
178 // TODO(hubbe): We should only do this if the caller used a valid ssrc.
179 if (IsEmpty(remote_addr_)) {
180 remote_addr_ = recv_addr_;
181 VLOG(1) << "Setting remote address from first received packet: "
182 << remote_addr_.ToString();
183 next_packet_->resize(length_or_status);
184 if (!packet_receiver_.Run(next_packet_.Pass())) {
185 VLOG(1) << "Packet was not valid, resetting remote address.";
186 remote_addr_ = net::IPEndPoint();
188 } else if (!IsEqual(remote_addr_, recv_addr_)) {
189 VLOG(1) << "Ignoring packet received from an unrecognized address: "
190 << recv_addr_.ToString() << ".";
191 } else {
192 next_packet_->resize(length_or_status);
193 packet_receiver_.Run(next_packet_.Pass());
195 length_or_status = net::ERR_IO_PENDING;
199 bool UdpTransport::SendPacket(PacketRef packet, const base::Closure& cb) {
200 DCHECK(io_thread_proxy_->RunsTasksOnCurrentThread());
201 if (!udp_socket_)
202 return true;
204 // Increase byte count no matter the packet was sent or dropped.
205 bytes_sent_ += packet->data.size();
207 DCHECK(!send_pending_);
208 if (send_pending_) {
209 VLOG(1) << "Cannot send because of pending IO.";
210 return true;
213 if (next_dscp_value_ != net::DSCP_NO_CHANGE) {
214 int result = udp_socket_->SetDiffServCodePoint(next_dscp_value_);
215 if (result != net::OK) {
216 VLOG(1) << "Unable to set DSCP: " << next_dscp_value_
217 << " to socket; Error: " << result;
220 if (result != net::ERR_SOCKET_NOT_CONNECTED) {
221 // Don't change DSCP in next send.
222 next_dscp_value_ = net::DSCP_NO_CHANGE;
226 scoped_refptr<net::IOBuffer> buf =
227 new net::WrappedIOBuffer(reinterpret_cast<char*>(&packet->data.front()));
229 int result;
230 base::Callback<void(int)> callback = base::Bind(&UdpTransport::OnSent,
231 weak_factory_.GetWeakPtr(),
232 buf,
233 packet,
234 cb);
235 if (client_connected_) {
236 // If we called Connect() before we must call Write() instead of
237 // SendTo(). Otherwise on some platforms we might get
238 // ERR_SOCKET_IS_CONNECTED.
239 result = udp_socket_->Write(
240 buf.get(), static_cast<int>(packet->data.size()), callback);
241 } else if (!IsEmpty(remote_addr_)) {
242 result = udp_socket_->SendTo(buf.get(),
243 static_cast<int>(packet->data.size()),
244 remote_addr_,
245 callback);
246 } else {
247 VLOG(1) << "Failed to send packet; socket is neither bound nor "
248 << "connected.";
249 return true;
252 if (result == net::ERR_IO_PENDING) {
253 send_pending_ = true;
254 return false;
256 OnSent(buf, packet, base::Closure(), result);
257 return true;
260 int64 UdpTransport::GetBytesSent() {
261 return bytes_sent_;
264 void UdpTransport::OnSent(const scoped_refptr<net::IOBuffer>& buf,
265 PacketRef packet,
266 const base::Closure& cb,
267 int result) {
268 DCHECK(io_thread_proxy_->RunsTasksOnCurrentThread());
270 send_pending_ = false;
271 if (result < 0) {
272 VLOG(1) << "Failed to send packet: " << result << ".";
274 ScheduleReceiveNextPacket();
276 if (!cb.is_null()) {
277 cb.Run();
281 } // namespace cast
282 } // namespace media