Extract SIGPIPE ignoring code to a common place.
[chromium-blink-merge.git] / net / udp / udp_socket_win.cc
blob0be64513daead5a741d4586f906184f96f8f91c8
1 // Copyright (c) 2012 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
5 #include "net/udp/udp_socket_win.h"
7 #include <mstcpip.h>
9 #include "base/callback.h"
10 #include "base/logging.h"
11 #include "base/message_loop.h"
12 #include "base/metrics/stats_counters.h"
13 #include "base/posix/eintr_wrapper.h"
14 #include "base/rand_util.h"
15 #include "net/base/io_buffer.h"
16 #include "net/base/ip_endpoint.h"
17 #include "net/base/net_errors.h"
18 #include "net/base/net_log.h"
19 #include "net/base/net_util.h"
20 #include "net/base/winsock_init.h"
21 #include "net/base/winsock_util.h"
22 #include "net/udp/udp_net_log_parameters.h"
24 namespace {
26 static const int kBindRetries = 10;
27 static const int kPortStart = 1024;
28 static const int kPortEnd = 65535;
30 } // namespace net
32 namespace net {
34 // This class encapsulates all the state that has to be preserved as long as
35 // there is a network IO operation in progress. If the owner UDPSocketWin
36 // is destroyed while an operation is in progress, the Core is detached and it
37 // lives until the operation completes and the OS doesn't reference any resource
38 // declared on this class anymore.
39 class UDPSocketWin::Core : public base::RefCounted<Core> {
40 public:
41 explicit Core(UDPSocketWin* socket);
43 // Start watching for the end of a read or write operation.
44 void WatchForRead();
45 void WatchForWrite();
47 // The UDPSocketWin is going away.
48 void Detach() { socket_ = NULL; }
50 // The separate OVERLAPPED variables for asynchronous operation.
51 OVERLAPPED read_overlapped_;
52 OVERLAPPED write_overlapped_;
54 // The buffers used in Read() and Write().
55 scoped_refptr<IOBuffer> read_iobuffer_;
56 scoped_refptr<IOBuffer> write_iobuffer_;
58 // The address storage passed to WSARecvFrom().
59 SockaddrStorage recv_addr_storage_;
61 private:
62 friend class base::RefCounted<Core>;
64 class ReadDelegate : public base::win::ObjectWatcher::Delegate {
65 public:
66 explicit ReadDelegate(Core* core) : core_(core) {}
67 virtual ~ReadDelegate() {}
69 // base::ObjectWatcher::Delegate methods:
70 virtual void OnObjectSignaled(HANDLE object);
72 private:
73 Core* const core_;
76 class WriteDelegate : public base::win::ObjectWatcher::Delegate {
77 public:
78 explicit WriteDelegate(Core* core) : core_(core) {}
79 virtual ~WriteDelegate() {}
81 // base::ObjectWatcher::Delegate methods:
82 virtual void OnObjectSignaled(HANDLE object);
84 private:
85 Core* const core_;
88 ~Core();
90 // The socket that created this object.
91 UDPSocketWin* socket_;
93 // |reader_| handles the signals from |read_watcher_|.
94 ReadDelegate reader_;
95 // |writer_| handles the signals from |write_watcher_|.
96 WriteDelegate writer_;
98 // |read_watcher_| watches for events from Read().
99 base::win::ObjectWatcher read_watcher_;
100 // |write_watcher_| watches for events from Write();
101 base::win::ObjectWatcher write_watcher_;
103 DISALLOW_COPY_AND_ASSIGN(Core);
106 UDPSocketWin::Core::Core(UDPSocketWin* socket)
107 : socket_(socket),
108 ALLOW_THIS_IN_INITIALIZER_LIST(reader_(this)),
109 ALLOW_THIS_IN_INITIALIZER_LIST(writer_(this)) {
110 memset(&read_overlapped_, 0, sizeof(read_overlapped_));
111 memset(&write_overlapped_, 0, sizeof(write_overlapped_));
113 read_overlapped_.hEvent = WSACreateEvent();
114 write_overlapped_.hEvent = WSACreateEvent();
117 UDPSocketWin::Core::~Core() {
118 // Make sure the message loop is not watching this object anymore.
119 read_watcher_.StopWatching();
120 write_watcher_.StopWatching();
122 WSACloseEvent(read_overlapped_.hEvent);
123 memset(&read_overlapped_, 0xaf, sizeof(read_overlapped_));
124 WSACloseEvent(write_overlapped_.hEvent);
125 memset(&write_overlapped_, 0xaf, sizeof(write_overlapped_));
128 void UDPSocketWin::Core::WatchForRead() {
129 // We grab an extra reference because there is an IO operation in progress.
130 // Balanced in ReadDelegate::OnObjectSignaled().
131 AddRef();
132 read_watcher_.StartWatching(read_overlapped_.hEvent, &reader_);
135 void UDPSocketWin::Core::WatchForWrite() {
136 // We grab an extra reference because there is an IO operation in progress.
137 // Balanced in WriteDelegate::OnObjectSignaled().
138 AddRef();
139 write_watcher_.StartWatching(write_overlapped_.hEvent, &writer_);
142 void UDPSocketWin::Core::ReadDelegate::OnObjectSignaled(HANDLE object) {
143 DCHECK_EQ(object, core_->read_overlapped_.hEvent);
144 if (core_->socket_)
145 core_->socket_->DidCompleteRead();
147 core_->Release();
150 void UDPSocketWin::Core::WriteDelegate::OnObjectSignaled(HANDLE object) {
151 DCHECK_EQ(object, core_->write_overlapped_.hEvent);
152 if (core_->socket_)
153 core_->socket_->DidCompleteWrite();
155 core_->Release();
158 //-----------------------------------------------------------------------------
160 UDPSocketWin::UDPSocketWin(DatagramSocket::BindType bind_type,
161 const RandIntCallback& rand_int_cb,
162 net::NetLog* net_log,
163 const net::NetLog::Source& source)
164 : socket_(INVALID_SOCKET),
165 socket_options_(0),
166 bind_type_(bind_type),
167 rand_int_cb_(rand_int_cb),
168 recv_from_address_(NULL),
169 net_log_(BoundNetLog::Make(net_log, NetLog::SOURCE_UDP_SOCKET)) {
170 EnsureWinsockInit();
171 net_log_.BeginEvent(NetLog::TYPE_SOCKET_ALIVE,
172 source.ToEventParametersCallback());
173 if (bind_type == DatagramSocket::RANDOM_BIND)
174 DCHECK(!rand_int_cb.is_null());
177 UDPSocketWin::~UDPSocketWin() {
178 Close();
179 net_log_.EndEvent(NetLog::TYPE_SOCKET_ALIVE);
182 void UDPSocketWin::Close() {
183 DCHECK(CalledOnValidThread());
185 if (!is_connected())
186 return;
188 // Zero out any pending read/write callback state.
189 read_callback_.Reset();
190 recv_from_address_ = NULL;
191 write_callback_.Reset();
193 closesocket(socket_);
194 socket_ = INVALID_SOCKET;
196 core_->Detach();
197 core_ = NULL;
200 int UDPSocketWin::GetPeerAddress(IPEndPoint* address) const {
201 DCHECK(CalledOnValidThread());
202 DCHECK(address);
203 if (!is_connected())
204 return ERR_SOCKET_NOT_CONNECTED;
206 // TODO(szym): Simplify. http://crbug.com/126152
207 if (!remote_address_.get()) {
208 SockaddrStorage storage;
209 if (getpeername(socket_, storage.addr, &storage.addr_len))
210 return MapSystemError(WSAGetLastError());
211 scoped_ptr<IPEndPoint> address(new IPEndPoint());
212 if (!address->FromSockAddr(storage.addr, storage.addr_len))
213 return ERR_FAILED;
214 remote_address_.reset(address.release());
217 *address = *remote_address_;
218 return OK;
221 int UDPSocketWin::GetLocalAddress(IPEndPoint* address) const {
222 DCHECK(CalledOnValidThread());
223 DCHECK(address);
224 if (!is_connected())
225 return ERR_SOCKET_NOT_CONNECTED;
227 // TODO(szym): Simplify. http://crbug.com/126152
228 if (!local_address_.get()) {
229 SockaddrStorage storage;
230 if (getsockname(socket_, storage.addr, &storage.addr_len))
231 return MapSystemError(WSAGetLastError());
232 scoped_ptr<IPEndPoint> address(new IPEndPoint());
233 if (!address->FromSockAddr(storage.addr, storage.addr_len))
234 return ERR_FAILED;
235 local_address_.reset(address.release());
238 *address = *local_address_;
239 return OK;
242 int UDPSocketWin::Read(IOBuffer* buf,
243 int buf_len,
244 const CompletionCallback& callback) {
245 return RecvFrom(buf, buf_len, NULL, callback);
248 int UDPSocketWin::RecvFrom(IOBuffer* buf,
249 int buf_len,
250 IPEndPoint* address,
251 const CompletionCallback& callback) {
252 DCHECK(CalledOnValidThread());
253 DCHECK_NE(INVALID_SOCKET, socket_);
254 DCHECK(read_callback_.is_null());
255 DCHECK(!recv_from_address_);
256 DCHECK(!callback.is_null()); // Synchronous operation not supported.
257 DCHECK_GT(buf_len, 0);
259 int nread = InternalRecvFrom(buf, buf_len, address);
260 if (nread != ERR_IO_PENDING)
261 return nread;
263 read_callback_ = callback;
264 recv_from_address_ = address;
265 return ERR_IO_PENDING;
268 int UDPSocketWin::Write(IOBuffer* buf,
269 int buf_len,
270 const CompletionCallback& callback) {
271 return SendToOrWrite(buf, buf_len, NULL, callback);
274 int UDPSocketWin::SendTo(IOBuffer* buf,
275 int buf_len,
276 const IPEndPoint& address,
277 const CompletionCallback& callback) {
278 return SendToOrWrite(buf, buf_len, &address, callback);
281 int UDPSocketWin::SendToOrWrite(IOBuffer* buf,
282 int buf_len,
283 const IPEndPoint* address,
284 const CompletionCallback& callback) {
285 DCHECK(CalledOnValidThread());
286 DCHECK_NE(INVALID_SOCKET, socket_);
287 DCHECK(write_callback_.is_null());
288 DCHECK(!callback.is_null()); // Synchronous operation not supported.
289 DCHECK_GT(buf_len, 0);
290 DCHECK(!send_to_address_.get());
292 int nwrite = InternalSendTo(buf, buf_len, address);
293 if (nwrite != ERR_IO_PENDING)
294 return nwrite;
296 if (address)
297 send_to_address_.reset(new IPEndPoint(*address));
298 write_callback_ = callback;
299 return ERR_IO_PENDING;
302 int UDPSocketWin::Connect(const IPEndPoint& address) {
303 net_log_.BeginEvent(NetLog::TYPE_UDP_CONNECT,
304 CreateNetLogUDPConnectCallback(&address));
305 int rv = InternalConnect(address);
306 net_log_.EndEventWithNetErrorCode(NetLog::TYPE_UDP_CONNECT, rv);
307 return rv;
310 int UDPSocketWin::InternalConnect(const IPEndPoint& address) {
311 DCHECK(!is_connected());
312 DCHECK(!remote_address_.get());
313 int rv = CreateSocket(address);
314 if (rv < 0)
315 return rv;
317 if (bind_type_ == DatagramSocket::RANDOM_BIND)
318 rv = RandomBind(address);
319 // else connect() does the DatagramSocket::DEFAULT_BIND
321 if (rv < 0)
322 return rv;
324 SockaddrStorage storage;
325 if (!address.ToSockAddr(storage.addr, &storage.addr_len))
326 return ERR_FAILED;
328 rv = connect(socket_, storage.addr, storage.addr_len);
329 if (rv < 0)
330 return MapSystemError(WSAGetLastError());
332 remote_address_.reset(new IPEndPoint(address));
333 return rv;
336 int UDPSocketWin::Bind(const IPEndPoint& address) {
337 DCHECK(!is_connected());
338 int rv = CreateSocket(address);
339 if (rv < 0)
340 return rv;
341 rv = SetSocketOptions();
342 if (rv < 0)
343 return rv;
344 rv = DoBind(address);
345 if (rv < 0)
346 return rv;
347 local_address_.reset();
348 return rv;
351 int UDPSocketWin::CreateSocket(const IPEndPoint& address) {
352 socket_ = WSASocket(address.GetFamily(), SOCK_DGRAM, IPPROTO_UDP, NULL, 0,
353 WSA_FLAG_OVERLAPPED);
354 if (socket_ == INVALID_SOCKET)
355 return MapSystemError(WSAGetLastError());
356 core_ = new Core(this);
357 return OK;
360 bool UDPSocketWin::SetReceiveBufferSize(int32 size) {
361 DCHECK(CalledOnValidThread());
362 int rv = setsockopt(socket_, SOL_SOCKET, SO_RCVBUF,
363 reinterpret_cast<const char*>(&size), sizeof(size));
364 DCHECK(!rv) << "Could not set socket receive buffer size: " << errno;
365 return rv == 0;
368 bool UDPSocketWin::SetSendBufferSize(int32 size) {
369 DCHECK(CalledOnValidThread());
370 int rv = setsockopt(socket_, SOL_SOCKET, SO_SNDBUF,
371 reinterpret_cast<const char*>(&size), sizeof(size));
372 DCHECK(!rv) << "Could not set socket send buffer size: " << errno;
373 return rv == 0;
376 void UDPSocketWin::AllowAddressReuse() {
377 DCHECK(CalledOnValidThread());
378 DCHECK(!is_connected());
380 socket_options_ |= SOCKET_OPTION_REUSE_ADDRESS;
383 void UDPSocketWin::AllowBroadcast() {
384 DCHECK(CalledOnValidThread());
385 DCHECK(!is_connected());
387 socket_options_ |= SOCKET_OPTION_BROADCAST;
390 void UDPSocketWin::DoReadCallback(int rv) {
391 DCHECK_NE(rv, ERR_IO_PENDING);
392 DCHECK(!read_callback_.is_null());
394 // since Run may result in Read being called, clear read_callback_ up front.
395 CompletionCallback c = read_callback_;
396 read_callback_.Reset();
397 c.Run(rv);
400 void UDPSocketWin::DoWriteCallback(int rv) {
401 DCHECK_NE(rv, ERR_IO_PENDING);
402 DCHECK(!write_callback_.is_null());
404 // since Run may result in Write being called, clear write_callback_ up front.
405 CompletionCallback c = write_callback_;
406 write_callback_.Reset();
407 c.Run(rv);
410 void UDPSocketWin::DidCompleteRead() {
411 DWORD num_bytes, flags;
412 BOOL ok = WSAGetOverlappedResult(socket_, &core_->read_overlapped_,
413 &num_bytes, FALSE, &flags);
414 WSAResetEvent(core_->read_overlapped_.hEvent);
415 int result = ok ? num_bytes : MapSystemError(WSAGetLastError());
416 // Convert address.
417 if (recv_from_address_ && result >= 0) {
418 if (!ReceiveAddressToIPEndpoint(recv_from_address_))
419 result = ERR_FAILED;
421 LogRead(result, core_->read_iobuffer_->data());
422 core_->read_iobuffer_ = NULL;
423 recv_from_address_ = NULL;
424 DoReadCallback(result);
427 void UDPSocketWin::LogRead(int result, const char* bytes) const {
428 if (result < 0) {
429 net_log_.AddEventWithNetErrorCode(NetLog::TYPE_UDP_RECEIVE_ERROR, result);
430 return;
433 if (net_log_.IsLoggingAllEvents()) {
434 // Get address for logging, if |address| is NULL.
435 IPEndPoint address;
436 bool is_address_valid = ReceiveAddressToIPEndpoint(&address);
437 net_log_.AddEvent(
438 NetLog::TYPE_UDP_BYTES_RECEIVED,
439 CreateNetLogUDPDataTranferCallback(
440 result, bytes,
441 is_address_valid ? &address : NULL));
444 base::StatsCounter read_bytes("udp.read_bytes");
445 read_bytes.Add(result);
448 void UDPSocketWin::DidCompleteWrite() {
449 DWORD num_bytes, flags;
450 BOOL ok = WSAGetOverlappedResult(socket_, &core_->write_overlapped_,
451 &num_bytes, FALSE, &flags);
452 WSAResetEvent(core_->write_overlapped_.hEvent);
453 int result = ok ? num_bytes : MapSystemError(WSAGetLastError());
454 LogWrite(result, core_->write_iobuffer_->data(), send_to_address_.get());
456 send_to_address_.reset();
457 core_->write_iobuffer_ = NULL;
458 DoWriteCallback(result);
461 void UDPSocketWin::LogWrite(int result,
462 const char* bytes,
463 const IPEndPoint* address) const {
464 if (result < 0) {
465 net_log_.AddEventWithNetErrorCode(NetLog::TYPE_UDP_SEND_ERROR, result);
466 return;
469 if (net_log_.IsLoggingAllEvents()) {
470 net_log_.AddEvent(
471 NetLog::TYPE_UDP_BYTES_SENT,
472 CreateNetLogUDPDataTranferCallback(result, bytes, address));
475 base::StatsCounter write_bytes("udp.write_bytes");
476 write_bytes.Add(result);
479 int UDPSocketWin::InternalRecvFrom(IOBuffer* buf, int buf_len,
480 IPEndPoint* address) {
481 DCHECK(!core_->read_iobuffer_);
482 SockaddrStorage& storage = core_->recv_addr_storage_;
483 storage.addr_len = sizeof(storage.addr_storage);
485 WSABUF read_buffer;
486 read_buffer.buf = buf->data();
487 read_buffer.len = buf_len;
489 DWORD flags = 0;
490 DWORD num;
491 CHECK_NE(INVALID_SOCKET, socket_);
492 AssertEventNotSignaled(core_->read_overlapped_.hEvent);
493 int rv = WSARecvFrom(socket_, &read_buffer, 1, &num, &flags, storage.addr,
494 &storage.addr_len, &core_->read_overlapped_, NULL);
495 if (rv == 0) {
496 if (ResetEventIfSignaled(core_->read_overlapped_.hEvent)) {
497 int result = num;
498 // Convert address.
499 if (address && result >= 0) {
500 if (!ReceiveAddressToIPEndpoint(address))
501 result = ERR_FAILED;
503 LogRead(result, buf->data());
504 return result;
506 } else {
507 int os_error = WSAGetLastError();
508 if (os_error != WSA_IO_PENDING) {
509 int result = MapSystemError(os_error);
510 LogRead(result, NULL);
511 return result;
514 core_->WatchForRead();
515 core_->read_iobuffer_ = buf;
516 return ERR_IO_PENDING;
519 int UDPSocketWin::InternalSendTo(IOBuffer* buf, int buf_len,
520 const IPEndPoint* address) {
521 DCHECK(!core_->write_iobuffer_);
522 SockaddrStorage storage;
523 struct sockaddr* addr = storage.addr;
524 // Convert address.
525 if (!address) {
526 addr = NULL;
527 storage.addr_len = 0;
528 } else {
529 if (!address->ToSockAddr(addr, &storage.addr_len)) {
530 int result = ERR_FAILED;
531 LogWrite(result, NULL, NULL);
532 return result;
536 WSABUF write_buffer;
537 write_buffer.buf = buf->data();
538 write_buffer.len = buf_len;
540 DWORD flags = 0;
541 DWORD num;
542 AssertEventNotSignaled(core_->write_overlapped_.hEvent);
543 int rv = WSASendTo(socket_, &write_buffer, 1, &num, flags,
544 addr, storage.addr_len, &core_->write_overlapped_, NULL);
545 if (rv == 0) {
546 if (ResetEventIfSignaled(core_->write_overlapped_.hEvent)) {
547 int result = num;
548 LogWrite(result, buf->data(), address);
549 return result;
551 } else {
552 int os_error = WSAGetLastError();
553 if (os_error != WSA_IO_PENDING) {
554 int result = MapSystemError(os_error);
555 LogWrite(result, NULL, NULL);
556 return result;
560 core_->WatchForWrite();
561 core_->write_iobuffer_ = buf;
562 return ERR_IO_PENDING;
565 int UDPSocketWin::SetSocketOptions() {
566 BOOL true_value = 1;
567 if (socket_options_ & SOCKET_OPTION_REUSE_ADDRESS) {
568 int rv = setsockopt(socket_, SOL_SOCKET, SO_REUSEADDR,
569 reinterpret_cast<const char*>(&true_value),
570 sizeof(true_value));
571 if (rv < 0)
572 return MapSystemError(errno);
574 if (socket_options_ & SOCKET_OPTION_BROADCAST) {
575 int rv = setsockopt(socket_, SOL_SOCKET, SO_BROADCAST,
576 reinterpret_cast<const char*>(&true_value),
577 sizeof(true_value));
578 if (rv < 0)
579 return MapSystemError(errno);
581 return OK;
584 int UDPSocketWin::DoBind(const IPEndPoint& address) {
585 SockaddrStorage storage;
586 if (!address.ToSockAddr(storage.addr, &storage.addr_len))
587 return ERR_UNEXPECTED;
588 int rv = bind(socket_, storage.addr, storage.addr_len);
589 return rv < 0 ? MapSystemError(WSAGetLastError()) : rv;
592 int UDPSocketWin::RandomBind(const IPEndPoint& address) {
593 DCHECK(bind_type_ == DatagramSocket::RANDOM_BIND && !rand_int_cb_.is_null());
595 // Construct IPAddressNumber of appropriate size (IPv4 or IPv6) of 0s.
596 IPAddressNumber ip(address.address().size());
598 for (int i = 0; i < kBindRetries; ++i) {
599 int rv = DoBind(IPEndPoint(ip, rand_int_cb_.Run(kPortStart, kPortEnd)));
600 if (rv == OK || rv != ERR_ADDRESS_IN_USE)
601 return rv;
603 return DoBind(IPEndPoint(ip, 0));
606 bool UDPSocketWin::ReceiveAddressToIPEndpoint(IPEndPoint* address) const {
607 SockaddrStorage& storage = core_->recv_addr_storage_;
608 return address->FromSockAddr(storage.addr, storage.addr_len);
611 } // namespace net