1 // Copyright 2014 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
5 #include "network/public/cpp/udp_socket_wrapper.h"
9 #include "third_party/mojo/src/mojo/public/cpp/environment/logging.h"
14 const uint32_t kDefaultReceiveQueueSlots
= 32;
18 UDPSocketWrapper::NegotiateCallbackHandler::NegotiateCallbackHandler(
19 UDPSocketWrapper
* delegate
)
20 : delegate_(delegate
) {
23 UDPSocketWrapper::NegotiateCallbackHandler::~NegotiateCallbackHandler() {}
25 void UDPSocketWrapper::NegotiateCallbackHandler::Run(
26 uint32_t actual_size
) const {
27 delegate_
->OnNegotiateMaxPendingSendRequestsCompleted(actual_size
);
30 UDPSocketWrapper::SendCallbackHandler::SendCallbackHandler(
31 UDPSocketWrapper
* delegate
,
32 const ErrorCallback
& forward_callback
)
33 : delegate_(delegate
),
34 forward_callback_(forward_callback
) {
37 UDPSocketWrapper::SendCallbackHandler::~SendCallbackHandler() {}
39 void UDPSocketWrapper::SendCallbackHandler::Run(NetworkErrorPtr result
) const {
40 delegate_
->OnSendToCompleted(result
.Pass(), forward_callback_
);
43 UDPSocketWrapper::ReceiverBindingCallback::ReceiverBindingCallback(
44 UDPSocketWrapper
* delegate
,
45 const Callback
<void(NetworkErrorPtr
, NetAddressPtr
)>& wrapper_callback
)
46 : delegate_(delegate
), wrapper_callback_(wrapper_callback
) {
49 UDPSocketWrapper::ReceiverBindingCallback::~ReceiverBindingCallback() {
52 void UDPSocketWrapper::ReceiverBindingCallback::Run(
53 NetworkErrorPtr result
,
55 InterfaceRequest
<UDPSocketReceiver
> request
) const {
56 delegate_
->StartReceivingData(request
.Pass());
57 wrapper_callback_
.Run(result
.Pass(), addr
.Pass());
60 UDPSocketWrapper::ReceivedData::ReceivedData() {}
61 UDPSocketWrapper::ReceivedData::~ReceivedData() {}
63 UDPSocketWrapper::SendRequest::SendRequest() {}
64 UDPSocketWrapper::SendRequest::~SendRequest() {}
66 UDPSocketWrapper::UDPSocketWrapper(UDPSocketPtr socket
)
68 socket_(socket
.Pass()),
69 max_receive_queue_size_(kDefaultReceiveQueueSlots
),
70 max_pending_sends_(1),
71 current_pending_sends_(0) {
75 UDPSocketWrapper::UDPSocketWrapper(UDPSocketPtr socket
,
76 uint32_t receive_queue_slots
,
77 uint32_t requested_max_pending_sends
)
79 socket_(socket
.Pass()),
80 max_receive_queue_size_(receive_queue_slots
),
81 max_pending_sends_(1),
82 current_pending_sends_(0) {
83 Initialize(requested_max_pending_sends
);
86 UDPSocketWrapper::~UDPSocketWrapper() {
87 while (!receive_queue_
.empty()) {
88 delete receive_queue_
.front();
91 while (!send_requests_
.empty()) {
92 delete send_requests_
.front();
97 void UDPSocketWrapper::AllowAddressReuse(const ErrorCallback
& callback
) {
98 socket_
->AllowAddressReuse(callback
);
101 void UDPSocketWrapper::Bind(
103 const Callback
<void(NetworkErrorPtr
, NetAddressPtr
)>& callback
) {
106 BindOrConnectCallback(static_cast<BindOrConnectCallback::Runnable
*>(
107 new ReceiverBindingCallback(this, callback
))));
110 void UDPSocketWrapper::Connect(
111 NetAddressPtr remote_addr
,
112 const Callback
<void(NetworkErrorPtr
, NetAddressPtr
)>& callback
) {
115 BindOrConnectCallback(static_cast<BindOrConnectCallback::Runnable
*>(
116 new ReceiverBindingCallback(this, callback
))));
119 void UDPSocketWrapper::SetSendBufferSize(uint32_t size
,
120 const ErrorCallback
& callback
) {
121 socket_
->SetSendBufferSize(size
, callback
);
124 void UDPSocketWrapper::SetReceiveBufferSize(uint32_t size
,
125 const ErrorCallback
& callback
) {
126 socket_
->SetReceiveBufferSize(size
, callback
);
129 bool UDPSocketWrapper::ReceiveFrom(const ReceiveCallback
& callback
) {
130 if (receive_queue_
.empty()) {
131 receive_requests_
.push(callback
);
135 ReceivedData
* data
= receive_queue_
.front();
136 receive_queue_
.pop();
137 socket_
->ReceiveMore(1);
138 callback
.Run(data
->result
.Pass(), data
->src_addr
.Pass(), data
->data
.Pass());
143 void UDPSocketWrapper::SendTo(NetAddressPtr dest_addr
,
145 const ErrorCallback
& callback
) {
146 if (current_pending_sends_
>= max_pending_sends_
) {
147 SendRequest
* request
= new SendRequest();
148 request
->dest_addr
= dest_addr
.Pass();
149 request
->data
= data
.Pass();
150 request
->callback
= callback
;
151 send_requests_
.push(request
);
155 MOJO_DCHECK(send_requests_
.empty());
156 current_pending_sends_
++;
157 socket_
->SendTo(dest_addr
.Pass(), data
.Pass(),
158 ErrorCallback(static_cast<ErrorCallback::Runnable
*>(
159 new SendCallbackHandler(this, callback
))));
162 void UDPSocketWrapper::OnReceived(NetworkErrorPtr result
,
163 NetAddressPtr src_addr
,
164 Array
<uint8_t> data
) {
165 if (!receive_requests_
.empty()) {
166 // The cache should be empty if there are user requests waiting for data.
167 MOJO_DCHECK(receive_queue_
.empty());
169 socket_
->ReceiveMore(1);
171 ReceiveCallback callback
= receive_requests_
.front();
172 receive_requests_
.pop();
174 callback
.Run(result
.Pass(), src_addr
.Pass(), data
.Pass());
178 MOJO_DCHECK(receive_queue_
.size() < max_receive_queue_size_
);
179 ReceivedData
* received_data
= new ReceivedData();
180 received_data
->result
= result
.Pass();
181 received_data
->src_addr
= src_addr
.Pass();
182 received_data
->data
= data
.Pass();
183 receive_queue_
.push(received_data
);
186 void UDPSocketWrapper::Initialize(uint32_t requested_max_pending_sends
) {
187 socket_
->NegotiateMaxPendingSendRequests(
188 requested_max_pending_sends
,
189 Callback
<void(uint32_t)>(
190 static_cast< Callback
<void(uint32_t)>::Runnable
*>(
191 new NegotiateCallbackHandler(this))));
194 void UDPSocketWrapper::OnNegotiateMaxPendingSendRequestsCompleted(
195 uint32_t actual_size
) {
196 MOJO_DCHECK(max_pending_sends_
== 1);
198 if (actual_size
== 0) {
203 max_pending_sends_
= actual_size
;
205 while (ProcessNextSendRequest());
208 void UDPSocketWrapper::OnSendToCompleted(
209 NetworkErrorPtr result
,
210 const ErrorCallback
& forward_callback
) {
211 current_pending_sends_
--;
212 ProcessNextSendRequest();
214 forward_callback
.Run(result
.Pass());
217 bool UDPSocketWrapper::ProcessNextSendRequest() {
218 if (current_pending_sends_
>= max_pending_sends_
|| send_requests_
.empty())
221 SendRequest
* request
= send_requests_
.front();
222 send_requests_
.pop();
224 current_pending_sends_
++;
227 request
->dest_addr
.Pass(), request
->data
.Pass(),
228 ErrorCallback(static_cast<ErrorCallback::Runnable
*>(
229 new SendCallbackHandler(this, request
->callback
))));
236 void UDPSocketWrapper::StartReceivingData(
237 InterfaceRequest
<UDPSocketReceiver
> request
) {
238 binding_
.Bind(request
.Pass());
239 socket_
->ReceiveMore(max_receive_queue_size_
);