1 // Copyright 2013 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
5 #include "nacl_io/socket/udp_node.h"
12 #include "nacl_io/log.h"
13 #include "nacl_io/pepper_interface.h"
14 #include "nacl_io/socket/packet.h"
15 #include "nacl_io/socket/udp_event_emitter.h"
16 #include "nacl_io/stream/stream_fs.h"
// Upper bound on a single packet handled in this file: it sizes the char
// receive buffer data_[kMaxPacketSize] in UdpRecvWork and caps the length
// passed to std::min in UdpNode::Send_Locked.
19 const size_t kMaxPacketSize
= 65536;
// Size handed (twice) to the UdpEventEmitter constructor in UdpNode's
// constructor; equal to eight maximum-size packets.
20 const size_t kDefaultFifoSize
= kMaxPacketSize
* 8;
// Common base for the UDP work items in this file (UdpSendWork and
// UdpRecvWork both derive from it). It is a StreamFs::Work built from the
// emitter's stream, exposes the UDP socket interface, and deletes packet_
// when destroyed.
// NOTE(review): this listing omits some lines of the class (the remaining
// member initializers and the packet_ declaration are not visible), so only
// the visible members are described.
25 class UdpWork
: public StreamFs::Work
{
// The base StreamFs::Work is initialized from emitter->stream()->stream().
27 explicit UdpWork(const ScopedUdpEventEmitter
& emitter
)
28 : StreamFs::Work(emitter
->stream()->stream()),
// packet_ is deleted unconditionally here, so whatever packet_ points at when
// the work item is destroyed is freed with it.
32 ~UdpWork() { delete packet_
; }
// Looks up the UDP socket interface through filesystem()->ppapi().
34 UDPSocketInterface
* UDPInterface() {
35 return filesystem()->ppapi()->GetUDPSocketInterface();
// Emitter used by the derived classes for locking (GetLock()) and for
// reading/writing packets.
39 ScopedUdpEventEmitter emitter_
;
// Work item that transmits one TX packet taken from the emitter over the
// node's UDP socket resource. Start() issues the SendTo call; Run() receives
// the completion value and either records an error, re-enqueues itself for a
// retry, or clears the SSF_SENDING flag.
// NOTE(review): this listing omits several lines (the statements guarded by
// the early checks in Start(), most SendTo arguments, and the tail of Run()),
// so the comments below cover only what is visible.
43 class UdpSendWork
: public UdpWork
{
// Keeps a reference to the socket node in node_ in addition to the emitter
// held by the UdpWork base.
45 explicit UdpSendWork(const ScopedUdpEventEmitter
& emitter
,
46 const ScopedSocketNode
& node
)
47 : UdpWork(emitter
), node_(node
) {}
// Begins the send. `val` is not referenced in the visible lines.
// The emitter lock is taken for the whole call.
49 virtual bool Start(int32_t val
) {
50 AUTO_LOCK(emitter_
->GetLock());
52 // Does the stream exist, and can it send?
53 if (!node_
->TestStreamFlags(SSF_CAN_SEND
))
56 // Check if we are already sending.
57 if (node_
->TestStreamFlags(SSF_SENDING
))
60 // If this is a retry packet, packet_ will be already set
61 // and we don't need to dequeue from emitter_.
62 if (NULL
== packet_
) {
63 packet_
= emitter_
->ReadTXPacket_Locked();
// The completion callback is bound to this work item, so Run() is the
// continuation of this call.
68 int err
= UDPInterface()->SendTo(node_
->socket_resource(),
72 filesystem()->GetRunCompletion(this));
73 if (err
!= PP_OK_COMPLETIONPENDING
) {
74 // Anything else, we should assume the socket has gone bad.
75 node_
->SetError_Locked(err
);
// Reached when the send is pending: mark the node as sending so the
// SSF_SENDING check above rejects a concurrent send.
79 node_
->SetStreamFlags(SSF_SENDING
);
// Completion handler. A negative `length_error` is an error code; the
// non-negative case is treated as a completed send.
83 virtual void Run(int32_t length_error
) {
84 AUTO_LOCK(emitter_
->GetLock());
86 if (length_error
< 0) {
87 if (length_error
== PP_ERROR_INPROGRESS
) {
88 // We need to retry this packet later.
89 node_
->ClearStreamFlags(SSF_SENDING
);
90 node_
->stream()->EnqueueWork(this);
// Any other negative value is recorded on the node as its error.
93 node_
->SetError_Locked(length_error
);
97 // If we did send, then Q more work.
98 node_
->ClearStreamFlags(SSF_SENDING
);
103 // We assume that transmits will always complete. If the upstream
104 // actually back pressures, enough to prevent the Send callback
105 // from triggering, this resource may never go away.
106 ScopedSocketNode node_
;
// Work item that receives one datagram into data_ and hands it to the emitter
// as an RX packet. Start() issues the RecvFrom call; Run() receives the
// completion value, and on success copies the data into a new Packet, queues
// it on the emitter and asks the node to queue another receive.
// NOTE(review): this listing omits several lines (the constructor's
// initializers/body, most RecvFrom arguments, the addr_ declaration and some
// branches), so the comments below cover only what is visible.
109 class UdpRecvWork
: public UdpWork
{
111 explicit UdpRecvWork(const ScopedUdpEventEmitter
& emitter
)
// Begins the receive. `val` is not referenced in the visible lines.
// The emitter lock is taken for the whole call.
115 virtual bool Start(int32_t val
) {
116 AUTO_LOCK(emitter_
->GetLock());
// The emitter's attached stream is treated as a UdpNode.
117 UdpNode
* stream
= static_cast<UdpNode
*>(emitter_
->stream());
119 // Does the stream exist, and can it recv?
120 if (NULL
== stream
|| !stream
->TestStreamFlags(SSF_CAN_RECV
))
123 // Check if we are already receiving.
124 if (stream
->TestStreamFlags(SSF_RECVING
))
// Note the flag is set before RecvFrom is issued (the send path sets
// SSF_SENDING only after SendTo reports pending).
127 stream
->SetStreamFlags(SSF_RECVING
);
// The completion callback is bound to this work item, so Run() is the
// continuation of this call.
128 int err
= UDPInterface()->RecvFrom(stream
->socket_resource(),
132 filesystem()->GetRunCompletion(this));
133 if (err
!= PP_OK_COMPLETIONPENDING
) {
134 stream
->SetError_Locked(err
);
// Completion handler. A positive `length_error` is used as the number of
// bytes copied out of data_; other values reach SetError_Locked below.
141 virtual void Run(int32_t length_error
) {
142 AUTO_LOCK(emitter_
->GetLock());
143 UdpNode
* stream
= static_cast<UdpNode
*>(emitter_
->stream());
147 // On successful receive we queue more input
148 if (length_error
> 0) {
149 Packet
* packet
= new Packet(filesystem()->ppapi());
150 packet
->Copy(data_
, length_error
, addr_
);
// addr_ is released right after Copy(); NOTE(review): presumably Copy takes
// its own reference to the address resource -- confirm in Packet.
151 filesystem()->ppapi()->ReleaseResource(addr_
);
152 emitter_
->WriteRXPacket_Locked(packet
);
// Clear the in-flight flag first so the next receive queued below can start.
153 stream
->ClearStreamFlags(SSF_RECVING
);
154 stream
->QueueInput();
156 stream
->SetError_Locked(length_error
);
// Receive buffer, sized for one maximum-size packet.
161 char data_
[kMaxPacketSize
];
// Constructs the node on top of SocketNode, creates its UdpEventEmitter with
// kDefaultFifoSize for both constructor arguments, and attaches this node to
// the emitter as its stream.
165 UdpNode::UdpNode(Filesystem
* filesystem
)
166 : SocketNode(filesystem
),
167 emitter_(new UdpEventEmitter(kDefaultFifoSize
, kDefaultFifoSize
)) {
168 emitter_
->AttachStream(this);
// Tears the node down: calls DetachStream() on the emitter (the counterpart
// of the AttachStream(this) done in the constructor) and then delegates to
// SocketNode::Destroy().
171 void UdpNode::Destroy() {
172 emitter_
->DetachStream();
173 SocketNode::Destroy();
// Returns the pointer held by emitter_ (obtained via get()).
176 UdpEventEmitter
* UdpNode::GetEventEmitter() {
177 return emitter_
.get();
// Initializes the node: runs SocketNode::Init(open_flags), verifies that the
// UDP interface is available, and creates the UDP socket resource for the
// filesystem's Pepper instance. Failures are logged with LOG_ERROR.
// NOTE(review): the return statements and the handling of `err` are on lines
// not present in this listing, so the returned values are not described.
180 Error
UdpNode::Init(int open_flags
) {
181 Error err
= SocketNode::Init(open_flags
);
185 if (UDPInterface() == NULL
) {
186 LOG_ERROR("Got NULL interface: UDP");
// NOTE(review): the result of Create() is presumably assigned to
// socket_resource_ on an omitted line, since that member is checked next.
191 UDPInterface()->Create(filesystem_
->ppapi()->GetInstance());
// A zero resource is treated as a creation failure.
192 if (0 == socket_resource_
) {
193 LOG_ERROR("Unable to create UDP resource.");
// Allocates a new UdpRecvWork for this node's emitter and enqueues it on the
// stream returned by stream(). The work item is created with `new` and handed
// to EnqueueWork; it is not deleted here.
200 void UdpNode::QueueInput() {
201 UdpRecvWork
* work
= new UdpRecvWork(emitter_
);
202 stream()->EnqueueWork(work
);
// Queues a UdpSendWork for this node after checking the SSF_CAN_SEND and
// SSF_SENDING stream flags.
// NOTE(review): the statements guarded by the two checks are on lines not
// present in this listing; presumably each is an early return -- confirm.
205 void UdpNode::QueueOutput() {
206 if (!TestStreamFlags(SSF_CAN_SEND
))
209 if (TestStreamFlags(SSF_SENDING
))
// The work item gets both the emitter and a ScopedSocketNode wrapping this
// node; it is created with `new` and handed to EnqueueWork.
212 UdpSendWork
* work
= new UdpSendWork(emitter_
, ScopedSocketNode(this));
213 stream()->EnqueueWork(work
);
// Handles SOL_SOCKET/SO_RCVBUF and SOL_SOCKET/SO_SNDBUF by forwarding the
// requested size to the Pepper UDP socket via SetOption (receive or send
// buffer size respectively), blocking until the call completes, and mapping
// the Pepper result to an errno with PPErrorToErrno. Every other option is
// delegated to SocketNode::SetSockOpt.
// NOTE(review): the rest of the parameter list (optname, optval, len), the
// statements guarded by the length checks, and the declaration of `error`
// are on lines not present in this listing; `error` presumably receives the
// SetOption result -- confirm.
216 Error
UdpNode::SetSockOpt(int lvl
,
220 if (lvl
== SOL_SOCKET
&& optname
== SO_RCVBUF
) {
// The option value is read as an int, so anything shorter is rejected.
221 if (static_cast<size_t>(len
) < sizeof(int))
223 AUTO_LOCK(node_lock_
);
224 int bufsize
= *static_cast<const int*>(optval
);
226 UDPInterface()->SetOption(socket_resource_
,
227 PP_UDPSOCKET_OPTION_RECV_BUFFER_SIZE
,
228 PP_MakeInt32(bufsize
),
229 PP_BlockUntilComplete());
230 return PPErrorToErrno(error
);
231 } else if (lvl
== SOL_SOCKET
&& optname
== SO_SNDBUF
) {
// Same handling as SO_RCVBUF, targeting the send buffer option.
232 if (static_cast<size_t>(len
) < sizeof(int))
234 AUTO_LOCK(node_lock_
);
235 int bufsize
= *static_cast<const int*>(optval
);
237 UDPInterface()->SetOption(socket_resource_
,
238 PP_UDPSOCKET_OPTION_SEND_BUFFER_SIZE
,
239 PP_MakeInt32(bufsize
),
240 PP_BlockUntilComplete());
241 return PPErrorToErrno(error
);
// Not a UDP-specific option: defer to the base class.
244 return SocketNode::SetSockOpt(lvl
, optname
, optval
, len
);
// Binds the UDP socket resource to `addr`: converts the sockaddr to a Pepper
// address resource, performs a blocking Bind, then queries the address that
// was actually bound, enables sending/receiving on the stream and stores the
// bound address in local_addr_.
// NOTE(review): the declaration of `err`, the conditions around the early
// returns and the final return are on lines not present in this listing.
247 Error
UdpNode::Bind(const struct sockaddr
* addr
, socklen_t len
) {
// A zero socket_resource_ means there is no socket to bind.
248 if (0 == socket_resource_
)
251 /* Only bind once. */
255 PP_Resource out_addr
= SockAddrToResource(addr
, len
);
260 UDPInterface()->Bind(socket_resource_
, out_addr
, PP_BlockUntilComplete());
// The temporary address resource is released once the Bind call has returned.
261 filesystem_
->ppapi()->ReleaseResource(out_addr
);
263 return PPErrorToErrno(err
);
265 // Get the address that was actually bound (in case addr was 0.0.0.0:0).
266 out_addr
= UDPInterface()->GetBoundAddress(socket_resource_
);
270 // Now that we are bound, we can start sending and receiving.
271 SetStreamFlags(SSF_CAN_SEND
| SSF_CAN_RECV
);
// The resource returned by GetBoundAddress is kept in local_addr_ (it is not
// released here).
274 local_addr_
= out_addr
;
// Sets the default destination for this UDP socket: releases any previously
// stored remote_addr_ resource and replaces it with a resource built from
// `addr`/`len`.
// NOTE(review): the rest of the parameter list, the statements guarded by the
// zero checks and the final return are on lines not present in this listing;
// `attr` is not referenced in the visible lines.
278 Error
UdpNode::Connect(const HandleAttr
& attr
,
279 const struct sockaddr
* addr
,
// A zero socket_resource_ means there is no socket to connect.
281 if (0 == socket_resource_
)
284 /* Connect for UDP is the default dest, it's legal to change it. */
285 if (remote_addr_
!= 0) {
286 filesystem_
->ppapi()->ReleaseResource(remote_addr_
);
290 remote_addr_
= SockAddrToResource(addr
, len
);
// A zero result from SockAddrToResource is treated as a failure.
291 if (0 == remote_addr_
)
// Takes the next RX packet from the emitter, copies at most `len` bytes of it
// into `buf`, reports the sender address through `out_addr` (adding a
// reference to the resource) when the packet has one, and stores the copied
// byte count in `out_len`.
// NOTE(review): the rest of the parameter list, the check that `packet` is
// non-null, the disposal of the packet and the return statements are on lines
// not present in this listing.
297 Error
UdpNode::Recv_Locked(void* buf
,
299 PP_Resource
* out_addr
,
301 Packet
* packet
= emitter_
->ReadRXPacket_Locked();
// Copy no more than the caller's buffer can hold; both operands are compared
// as int by std::min<int>.
306 int capped_len
= static_cast<int32_t>(std::min
<int>(len
, packet
->len()));
307 memcpy(buf
, packet
->buffer(), capped_len
);
// The caller receives its own reference to the address resource.
309 if (packet
->addr() != 0) {
310 filesystem_
->ppapi()->AddRefResource(packet
->addr());
311 *out_addr
= packet
->addr();
314 *out_len
= capped_len
;
319 // Should never happen, Recv_Locked should not be called
320 // unless already in a POLLIN state.
// Queues `buf` for transmission: builds an AF_INET sockaddr with a zeroed
// sin_addr and calls Bind() with it (Pepper needs a bound socket before
// sending), then copies up to kMaxPacketSize bytes into a new Packet, writes
// it to the emitter's TX queue and reports the queued byte count in
// `out_len`.
// NOTE(review): the rest of the parameter list, the declaration(s) of `addr`,
// the condition under which Bind() is attempted, the handling of its result
// and the return statements are on lines not present in this listing. `addr`
// is used both as a sockaddr_in-like struct and as the address passed to
// Packet::Copy, so these are presumably distinct variables in different
// scopes -- confirm against the full source.
324 Error
UdpNode::Send_Locked(const void* buf
,
329 // Pepper requires a socket to be bound before it can send.
331 addr
.sin_family
= AF_INET
;
333 memset(&addr
.sin_addr
, 0, sizeof(addr
.sin_addr
));
335 Bind(reinterpret_cast<const struct sockaddr
*>(&addr
), sizeof(addr
));
// Lengths above kMaxPacketSize are truncated; both operands are compared as
// int by std::min<int>.
341 int capped_len
= static_cast<int32_t>(std::min
<int>(len
, kMaxPacketSize
));
342 Packet
* packet
= new Packet(filesystem_
->ppapi());
343 packet
->Copy(buf
, capped_len
, addr
);
// The packet is handed to the emitter's TX queue; it is not deleted here.
345 emitter_
->WriteTXPacket_Locked(packet
);
346 *out_len
= capped_len
;
350 } // namespace nacl_io