1 // Copyright 2014 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
5 #include "mojo/services/network/public/cpp/udp_socket_wrapper.h"
9 #include "mojo/public/cpp/environment/logging.h"
14 const uint32_t kDefaultReceiveQueueSlots
= 32;
18 UDPSocketWrapper::NegotiateCallbackHandler::NegotiateCallbackHandler(
19 UDPSocketWrapper
* delegate
)
20 : delegate_(delegate
) {
23 UDPSocketWrapper::NegotiateCallbackHandler::~NegotiateCallbackHandler() {}
25 void UDPSocketWrapper::NegotiateCallbackHandler::Run(
26 uint32_t actual_size
) const {
27 delegate_
->OnNegotiateMaxPendingSendRequestsCompleted(actual_size
);
30 UDPSocketWrapper::SendCallbackHandler::SendCallbackHandler(
31 UDPSocketWrapper
* delegate
,
32 const ErrorCallback
& forward_callback
)
33 : delegate_(delegate
),
34 forward_callback_(forward_callback
) {
37 UDPSocketWrapper::SendCallbackHandler::~SendCallbackHandler() {}
39 void UDPSocketWrapper::SendCallbackHandler::Run(NetworkErrorPtr result
) const {
40 delegate_
->OnSendToCompleted(result
.Pass(), forward_callback_
);
43 UDPSocketWrapper::ReceivedData::ReceivedData() {}
44 UDPSocketWrapper::ReceivedData::~ReceivedData() {}
46 UDPSocketWrapper::SendRequest::SendRequest() {}
47 UDPSocketWrapper::SendRequest::~SendRequest() {}
49 UDPSocketWrapper::UDPSocketWrapper(UDPSocketPtr socket
)
50 : socket_(socket
.Pass()),
51 max_receive_queue_size_(kDefaultReceiveQueueSlots
),
52 max_pending_sends_(1),
53 current_pending_sends_(0) {
57 UDPSocketWrapper::UDPSocketWrapper(UDPSocketPtr socket
,
58 uint32_t receive_queue_slots
,
59 uint32_t requested_max_pending_sends
)
60 : socket_(socket
.Pass()),
61 max_receive_queue_size_(receive_queue_slots
),
62 max_pending_sends_(1),
63 current_pending_sends_(0) {
64 Initialize(requested_max_pending_sends
);
67 UDPSocketWrapper::~UDPSocketWrapper() {
68 while (!receive_queue_
.empty()) {
69 delete receive_queue_
.front();
72 while (!send_requests_
.empty()) {
73 delete send_requests_
.front();
78 void UDPSocketWrapper::AllowAddressReuse(const ErrorCallback
& callback
) {
79 socket_
->AllowAddressReuse(callback
);
82 void UDPSocketWrapper::Bind(
84 const Callback
<void(NetworkErrorPtr
, NetAddressPtr
)>& callback
) {
85 socket_
->Bind(addr
.Pass(), callback
);
88 void UDPSocketWrapper::SetSendBufferSize(uint32_t size
,
89 const ErrorCallback
& callback
) {
90 socket_
->SetSendBufferSize(size
, callback
);
93 void UDPSocketWrapper::SetReceiveBufferSize(uint32_t size
,
94 const ErrorCallback
& callback
) {
95 socket_
->SetReceiveBufferSize(size
, callback
);
98 bool UDPSocketWrapper::ReceiveFrom(const ReceiveCallback
& callback
) {
99 if (receive_queue_
.empty()) {
100 receive_requests_
.push(callback
);
104 ReceivedData
* data
= receive_queue_
.front();
105 receive_queue_
.pop();
106 socket_
->ReceiveMore(1);
107 callback
.Run(data
->result
.Pass(), data
->src_addr
.Pass(), data
->data
.Pass());
112 void UDPSocketWrapper::SendTo(NetAddressPtr dest_addr
,
114 const ErrorCallback
& callback
) {
115 if (current_pending_sends_
>= max_pending_sends_
) {
116 SendRequest
* request
= new SendRequest();
117 request
->dest_addr
= dest_addr
.Pass();
118 request
->data
= data
.Pass();
119 request
->callback
= callback
;
120 send_requests_
.push(request
);
124 MOJO_DCHECK(send_requests_
.empty());
125 current_pending_sends_
++;
126 socket_
->SendTo(dest_addr
.Pass(), data
.Pass(),
127 ErrorCallback(static_cast<ErrorCallback::Runnable
*>(
128 new SendCallbackHandler(this, callback
))));
131 void UDPSocketWrapper::OnReceived(NetworkErrorPtr result
,
132 NetAddressPtr src_addr
,
133 Array
<uint8_t> data
) {
134 if (!receive_requests_
.empty()) {
135 // The cache should be empty if there are user requests waiting for data.
136 MOJO_DCHECK(receive_queue_
.empty());
138 socket_
->ReceiveMore(1);
140 ReceiveCallback callback
= receive_requests_
.front();
141 receive_requests_
.pop();
143 callback
.Run(result
.Pass(), src_addr
.Pass(), data
.Pass());
147 MOJO_DCHECK(receive_queue_
.size() < max_receive_queue_size_
);
148 ReceivedData
* received_data
= new ReceivedData();
149 received_data
->result
= result
.Pass();
150 received_data
->src_addr
= src_addr
.Pass();
151 received_data
->data
= data
.Pass();
152 receive_queue_
.push(received_data
);
155 void UDPSocketWrapper::Initialize(uint32_t requested_max_pending_sends
) {
156 socket_
.set_client(this);
157 socket_
->NegotiateMaxPendingSendRequests(
158 requested_max_pending_sends
,
159 Callback
<void(uint32_t)>(
160 static_cast< Callback
<void(uint32_t)>::Runnable
*>(
161 new NegotiateCallbackHandler(this))));
162 socket_
->ReceiveMore(max_receive_queue_size_
);
165 void UDPSocketWrapper::OnNegotiateMaxPendingSendRequestsCompleted(
166 uint32_t actual_size
) {
167 MOJO_DCHECK(max_pending_sends_
== 1);
169 if (actual_size
== 0) {
174 max_pending_sends_
= actual_size
;
176 while (ProcessNextSendRequest());
179 void UDPSocketWrapper::OnSendToCompleted(
180 NetworkErrorPtr result
,
181 const ErrorCallback
& forward_callback
) {
182 current_pending_sends_
--;
183 ProcessNextSendRequest();
185 forward_callback
.Run(result
.Pass());
188 bool UDPSocketWrapper::ProcessNextSendRequest() {
189 if (current_pending_sends_
>= max_pending_sends_
|| send_requests_
.empty())
192 SendRequest
* request
= send_requests_
.front();
193 send_requests_
.pop();
195 current_pending_sends_
++;
198 request
->dest_addr
.Pass(), request
->data
.Pass(),
199 ErrorCallback(static_cast<ErrorCallback::Runnable
*>(
200 new SendCallbackHandler(this, request
->callback
))));