Unregister from GCM when the only GCM app is removed
[chromium-blink-merge.git] / media / cast / net / udp_transport.cc
blobf36b03444dbf59dca08e0b6e8d2d85b5c687b85d
1 // Copyright 2014 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
5 #include "media/cast/net/udp_transport.h"
7 #include <algorithm>
8 #include <string>
10 #include "base/bind.h"
11 #include "base/logging.h"
12 #include "base/memory/ref_counted.h"
13 #include "base/memory/scoped_ptr.h"
14 #include "base/message_loop/message_loop.h"
15 #include "base/rand_util.h"
16 #include "net/base/io_buffer.h"
17 #include "net/base/net_errors.h"
18 #include "net/base/rand_callback.h"
20 namespace media {
21 namespace cast {
23 namespace {
24 const int kMaxPacketSize = 1500;
26 bool IsEmpty(const net::IPEndPoint& addr) {
27 net::IPAddressNumber empty_addr(addr.address().size());
28 return std::equal(
29 empty_addr.begin(), empty_addr.end(), addr.address().begin()) &&
30 !addr.port();
33 bool IsEqual(const net::IPEndPoint& addr1, const net::IPEndPoint& addr2) {
34 return addr1.port() == addr2.port() && std::equal(addr1.address().begin(),
35 addr1.address().end(),
36 addr2.address().begin());
38 } // namespace
40 UdpTransport::UdpTransport(
41 net::NetLog* net_log,
42 const scoped_refptr<base::SingleThreadTaskRunner>& io_thread_proxy,
43 const net::IPEndPoint& local_end_point,
44 const net::IPEndPoint& remote_end_point,
45 int32 send_buffer_size,
46 const CastTransportStatusCallback& status_callback)
47 : io_thread_proxy_(io_thread_proxy),
48 local_addr_(local_end_point),
49 remote_addr_(remote_end_point),
50 udp_socket_(new net::UDPSocket(net::DatagramSocket::DEFAULT_BIND,
51 net::RandIntCallback(),
52 net_log,
53 net::NetLog::Source())),
54 send_pending_(false),
55 receive_pending_(false),
56 client_connected_(false),
57 next_dscp_value_(net::DSCP_NO_CHANGE),
58 send_buffer_size_(send_buffer_size),
59 status_callback_(status_callback),
60 bytes_sent_(0),
61 weak_factory_(this) {
62 DCHECK(!IsEmpty(local_end_point) || !IsEmpty(remote_end_point));
65 UdpTransport::~UdpTransport() {}
67 void UdpTransport::StartReceiving(
68 const PacketReceiverCallbackWithStatus& packet_receiver) {
69 DCHECK(io_thread_proxy_->RunsTasksOnCurrentThread());
71 packet_receiver_ = packet_receiver;
72 udp_socket_->SetMulticastLoopbackMode(true);
73 if (!IsEmpty(local_addr_)) {
74 if (udp_socket_->Open(local_addr_.GetFamily()) < 0 ||
75 udp_socket_->AllowAddressReuse() < 0 ||
76 udp_socket_->Bind(local_addr_) < 0) {
77 udp_socket_->Close();
78 status_callback_.Run(TRANSPORT_SOCKET_ERROR);
79 LOG(ERROR) << "Failed to bind local address.";
80 return;
82 } else if (!IsEmpty(remote_addr_)) {
83 if (udp_socket_->Open(remote_addr_.GetFamily()) < 0 ||
84 udp_socket_->AllowAddressReuse() < 0 ||
85 udp_socket_->Connect(remote_addr_) < 0) {
86 udp_socket_->Close();
87 status_callback_.Run(TRANSPORT_SOCKET_ERROR);
88 LOG(ERROR) << "Failed to connect to remote address.";
89 return;
91 client_connected_ = true;
92 } else {
93 NOTREACHED() << "Either local or remote address has to be defined.";
95 if (udp_socket_->SetSendBufferSize(send_buffer_size_) != net::OK) {
96 LOG(WARNING) << "Failed to set socket send buffer size.";
99 ScheduleReceiveNextPacket();
102 void UdpTransport::StopReceiving() {
103 DCHECK(io_thread_proxy_->RunsTasksOnCurrentThread());
104 packet_receiver_ = PacketReceiverCallbackWithStatus();
108 void UdpTransport::SetDscp(net::DiffServCodePoint dscp) {
109 DCHECK(io_thread_proxy_->RunsTasksOnCurrentThread());
110 next_dscp_value_ = dscp;
113 #if defined(OS_WIN)
114 void UdpTransport::UseNonBlockingIO() {
115 DCHECK(io_thread_proxy_->RunsTasksOnCurrentThread());
116 udp_socket_->UseNonBlockingIO();
118 #endif
120 void UdpTransport::ScheduleReceiveNextPacket() {
121 DCHECK(io_thread_proxy_->RunsTasksOnCurrentThread());
122 if (!packet_receiver_.is_null() && !receive_pending_) {
123 receive_pending_ = true;
124 io_thread_proxy_->PostTask(FROM_HERE,
125 base::Bind(&UdpTransport::ReceiveNextPacket,
126 weak_factory_.GetWeakPtr(),
127 net::ERR_IO_PENDING));
131 void UdpTransport::ReceiveNextPacket(int length_or_status) {
132 DCHECK(io_thread_proxy_->RunsTasksOnCurrentThread());
134 if (packet_receiver_.is_null())
135 return;
137 // Loop while UdpSocket is delivering data synchronously. When it responds
138 // with a "pending" status, break and expect this method to be called back in
139 // the future when a packet is ready.
140 while (true) {
141 if (length_or_status == net::ERR_IO_PENDING) {
142 next_packet_.reset(new Packet(kMaxPacketSize));
143 recv_buf_ = new net::WrappedIOBuffer(
144 reinterpret_cast<char*>(&next_packet_->front()));
145 length_or_status =
146 udp_socket_->RecvFrom(recv_buf_.get(),
147 kMaxPacketSize,
148 &recv_addr_,
149 base::Bind(&UdpTransport::ReceiveNextPacket,
150 weak_factory_.GetWeakPtr()));
151 if (length_or_status == net::ERR_IO_PENDING) {
152 receive_pending_ = true;
153 return;
157 // Note: At this point, either a packet is ready or an error has occurred.
158 if (length_or_status < 0) {
159 VLOG(1) << "Failed to receive packet: Status code is "
160 << length_or_status;
161 receive_pending_ = false;
162 return;
165 // Confirm the packet has come from the expected remote address; otherwise,
166 // ignore it. If this is the first packet being received and no remote
167 // address has been set, set the remote address and expect all future
168 // packets to come from the same one.
169 // TODO(hubbe): We should only do this if the caller used a valid ssrc.
170 if (IsEmpty(remote_addr_)) {
171 remote_addr_ = recv_addr_;
172 VLOG(1) << "Setting remote address from first received packet: "
173 << remote_addr_.ToString();
174 next_packet_->resize(length_or_status);
175 if (!packet_receiver_.Run(next_packet_.Pass())) {
176 VLOG(1) << "Packet was not valid, resetting remote address.";
177 remote_addr_ = net::IPEndPoint();
179 } else if (!IsEqual(remote_addr_, recv_addr_)) {
180 VLOG(1) << "Ignoring packet received from an unrecognized address: "
181 << recv_addr_.ToString() << ".";
182 } else {
183 next_packet_->resize(length_or_status);
184 packet_receiver_.Run(next_packet_.Pass());
186 length_or_status = net::ERR_IO_PENDING;
190 bool UdpTransport::SendPacket(PacketRef packet, const base::Closure& cb) {
191 DCHECK(io_thread_proxy_->RunsTasksOnCurrentThread());
193 // Increase byte count no matter the packet was sent or dropped.
194 bytes_sent_ += packet->data.size();
196 DCHECK(!send_pending_);
197 if (send_pending_) {
198 VLOG(1) << "Cannot send because of pending IO.";
199 return true;
202 if (next_dscp_value_ != net::DSCP_NO_CHANGE) {
203 int result = udp_socket_->SetDiffServCodePoint(next_dscp_value_);
204 if (result != net::OK) {
205 VLOG(1) << "Unable to set DSCP: " << next_dscp_value_
206 << " to socket; Error: " << result;
209 if (result != net::ERR_SOCKET_NOT_CONNECTED) {
210 // Don't change DSCP in next send.
211 next_dscp_value_ = net::DSCP_NO_CHANGE;
215 scoped_refptr<net::IOBuffer> buf =
216 new net::WrappedIOBuffer(reinterpret_cast<char*>(&packet->data.front()));
218 int result;
219 base::Callback<void(int)> callback = base::Bind(&UdpTransport::OnSent,
220 weak_factory_.GetWeakPtr(),
221 buf,
222 packet,
223 cb);
224 if (client_connected_) {
225 // If we called Connect() before we must call Write() instead of
226 // SendTo(). Otherwise on some platforms we might get
227 // ERR_SOCKET_IS_CONNECTED.
228 result = udp_socket_->Write(
229 buf.get(), static_cast<int>(packet->data.size()), callback);
230 } else if (!IsEmpty(remote_addr_)) {
231 result = udp_socket_->SendTo(buf.get(),
232 static_cast<int>(packet->data.size()),
233 remote_addr_,
234 callback);
235 } else {
236 VLOG(1) << "Failed to send packet; socket is neither bound nor "
237 << "connected.";
238 return true;
241 if (result == net::ERR_IO_PENDING) {
242 send_pending_ = true;
243 return false;
245 OnSent(buf, packet, base::Closure(), result);
246 return true;
249 int64 UdpTransport::GetBytesSent() {
250 return bytes_sent_;
253 void UdpTransport::OnSent(const scoped_refptr<net::IOBuffer>& buf,
254 PacketRef packet,
255 const base::Closure& cb,
256 int result) {
257 DCHECK(io_thread_proxy_->RunsTasksOnCurrentThread());
259 send_pending_ = false;
260 if (result < 0) {
261 VLOG(1) << "Failed to send packet: " << result << ".";
263 ScheduleReceiveNextPacket();
265 if (!cb.is_null()) {
266 cb.Run();
270 } // namespace cast
271 } // namespace media