1 // Copyright 2013 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
5 #include "nacl_io/socket/udp_node.h"
12 #include "nacl_io/log.h"
13 #include "nacl_io/pepper_interface.h"
14 #include "nacl_io/socket/packet.h"
15 #include "nacl_io/socket/udp_event_emitter.h"
16 #include "nacl_io/stream/stream_fs.h"
19 const size_t kMaxPacketSize
= 65536;
20 const size_t kDefaultFifoSize
= kMaxPacketSize
* 8;
25 class UdpWork
: public StreamFs::Work
{
27 explicit UdpWork(const ScopedUdpEventEmitter
& emitter
)
28 : StreamFs::Work(emitter
->stream()->stream()),
32 ~UdpWork() { delete packet_
; }
34 UDPSocketInterface
* UDPInterface() {
35 return filesystem()->ppapi()->GetUDPSocketInterface();
39 ScopedUdpEventEmitter emitter_
;
43 class UdpSendWork
: public UdpWork
{
45 explicit UdpSendWork(const ScopedUdpEventEmitter
& emitter
,
46 const ScopedSocketNode
& node
)
47 : UdpWork(emitter
), node_(node
) {}
49 virtual bool Start(int32_t val
) {
50 AUTO_LOCK(emitter_
->GetLock());
52 // Does the stream exist, and can it send?
53 if (!node_
->TestStreamFlags(SSF_CAN_SEND
))
56 // Check if we are already sending.
57 if (node_
->TestStreamFlags(SSF_SENDING
))
60 packet_
= emitter_
->ReadTXPacket_Locked();
64 int err
= UDPInterface()->SendTo(node_
->socket_resource(),
68 filesystem()->GetRunCompletion(this));
69 if (err
!= PP_OK_COMPLETIONPENDING
) {
70 // Anything else, we should assume the socket has gone bad.
71 node_
->SetError_Locked(err
);
75 node_
->SetStreamFlags(SSF_SENDING
);
79 virtual void Run(int32_t length_error
) {
80 AUTO_LOCK(emitter_
->GetLock());
82 if (length_error
< 0) {
83 node_
->SetError_Locked(length_error
);
87 // If we did send, then Q more work.
88 node_
->ClearStreamFlags(SSF_SENDING
);
93 // We assume that transmits will always complete. If the upstream
94 // actually back pressures, enough to prevent the Send callback
95 // from triggering, this resource may never go away.
96 ScopedSocketNode node_
;
99 class UdpRecvWork
: public UdpWork
{
101 explicit UdpRecvWork(const ScopedUdpEventEmitter
& emitter
)
105 virtual bool Start(int32_t val
) {
106 AUTO_LOCK(emitter_
->GetLock());
107 UdpNode
* stream
= static_cast<UdpNode
*>(emitter_
->stream());
109 // Does the stream exist, and can it recv?
110 if (NULL
== stream
|| !stream
->TestStreamFlags(SSF_CAN_RECV
))
113 // Check if we are already receiving.
114 if (stream
->TestStreamFlags(SSF_RECVING
))
117 stream
->SetStreamFlags(SSF_RECVING
);
118 int err
= UDPInterface()->RecvFrom(stream
->socket_resource(),
122 filesystem()->GetRunCompletion(this));
123 if (err
!= PP_OK_COMPLETIONPENDING
) {
124 stream
->SetError_Locked(err
);
131 virtual void Run(int32_t length_error
) {
132 AUTO_LOCK(emitter_
->GetLock());
133 UdpNode
* stream
= static_cast<UdpNode
*>(emitter_
->stream());
137 // On successful receive we queue more input
138 if (length_error
> 0) {
139 Packet
* packet
= new Packet(filesystem()->ppapi());
140 packet
->Copy(data_
, length_error
, addr_
);
141 filesystem()->ppapi()->ReleaseResource(addr_
);
142 emitter_
->WriteRXPacket_Locked(packet
);
143 stream
->ClearStreamFlags(SSF_RECVING
);
144 stream
->QueueInput();
146 stream
->SetError_Locked(length_error
);
151 char data_
[kMaxPacketSize
];
155 UdpNode::UdpNode(Filesystem
* filesystem
)
156 : SocketNode(filesystem
),
157 emitter_(new UdpEventEmitter(kDefaultFifoSize
, kDefaultFifoSize
)) {
158 emitter_
->AttachStream(this);
161 void UdpNode::Destroy() {
162 emitter_
->DetachStream();
163 SocketNode::Destroy();
166 UdpEventEmitter
* UdpNode::GetEventEmitter() {
167 return emitter_
.get();
170 Error
UdpNode::Init(int open_flags
) {
171 Error err
= SocketNode::Init(open_flags
);
175 if (UDPInterface() == NULL
) {
176 LOG_ERROR("Got NULL interface: UDP");
181 UDPInterface()->Create(filesystem_
->ppapi()->GetInstance());
182 if (0 == socket_resource_
) {
183 LOG_ERROR("Unable to create UDP resource.");
190 void UdpNode::QueueInput() {
191 UdpRecvWork
* work
= new UdpRecvWork(emitter_
);
192 stream()->EnqueueWork(work
);
195 void UdpNode::QueueOutput() {
196 if (!TestStreamFlags(SSF_CAN_SEND
))
199 if (TestStreamFlags(SSF_SENDING
))
202 UdpSendWork
* work
= new UdpSendWork(emitter_
, ScopedSocketNode(this));
203 stream()->EnqueueWork(work
);
206 Error
UdpNode::SetSockOpt(int lvl
,
210 if (lvl
== SOL_SOCKET
&& optname
== SO_RCVBUF
) {
211 if (static_cast<size_t>(len
) < sizeof(int))
213 AUTO_LOCK(node_lock_
);
214 int bufsize
= *static_cast<const int*>(optval
);
216 UDPInterface()->SetOption(socket_resource_
,
217 PP_UDPSOCKET_OPTION_RECV_BUFFER_SIZE
,
218 PP_MakeInt32(bufsize
),
219 PP_BlockUntilComplete());
220 return PPErrorToErrno(error
);
221 } else if (lvl
== SOL_SOCKET
&& optname
== SO_SNDBUF
) {
222 if (static_cast<size_t>(len
) < sizeof(int))
224 AUTO_LOCK(node_lock_
);
225 int bufsize
= *static_cast<const int*>(optval
);
227 UDPInterface()->SetOption(socket_resource_
,
228 PP_UDPSOCKET_OPTION_SEND_BUFFER_SIZE
,
229 PP_MakeInt32(bufsize
),
230 PP_BlockUntilComplete());
231 return PPErrorToErrno(error
);
234 return SocketNode::SetSockOpt(lvl
, optname
, optval
, len
);
237 Error
UdpNode::Bind(const struct sockaddr
* addr
, socklen_t len
) {
238 if (0 == socket_resource_
)
241 /* Only bind once. */
245 PP_Resource out_addr
= SockAddrToResource(addr
, len
);
250 UDPInterface()->Bind(socket_resource_
, out_addr
, PP_BlockUntilComplete());
251 filesystem_
->ppapi()->ReleaseResource(out_addr
);
253 return PPErrorToErrno(err
);
255 // Get the address that was actually bound (in case addr was 0.0.0.0:0).
256 out_addr
= UDPInterface()->GetBoundAddress(socket_resource_
);
260 // Now that we are bound, we can start sending and receiving.
261 SetStreamFlags(SSF_CAN_SEND
| SSF_CAN_RECV
);
264 local_addr_
= out_addr
;
268 Error
UdpNode::Connect(const HandleAttr
& attr
,
269 const struct sockaddr
* addr
,
271 if (0 == socket_resource_
)
274 /* Connect for UDP is the default dest, it's legal to change it. */
275 if (remote_addr_
!= 0) {
276 filesystem_
->ppapi()->ReleaseResource(remote_addr_
);
280 remote_addr_
= SockAddrToResource(addr
, len
);
281 if (0 == remote_addr_
)
287 Error
UdpNode::Recv_Locked(void* buf
,
289 PP_Resource
* out_addr
,
291 Packet
* packet
= emitter_
->ReadRXPacket_Locked();
296 int capped_len
= static_cast<int32_t>(std::min
<int>(len
, packet
->len()));
297 memcpy(buf
, packet
->buffer(), capped_len
);
299 if (packet
->addr() != 0) {
300 filesystem_
->ppapi()->AddRefResource(packet
->addr());
301 *out_addr
= packet
->addr();
304 *out_len
= capped_len
;
309 // Should never happen, Recv_Locked should not be called
310 // unless already in a POLLIN state.
314 Error
UdpNode::Send_Locked(const void* buf
,
319 // Pepper requires a socket to be bound before it can send.
321 addr
.sin_family
= AF_INET
;
323 memset(&addr
.sin_addr
, 0, sizeof(addr
.sin_addr
));
325 Bind(reinterpret_cast<const struct sockaddr
*>(&addr
), sizeof(addr
));
331 int capped_len
= static_cast<int32_t>(std::min
<int>(len
, kMaxPacketSize
));
332 Packet
* packet
= new Packet(filesystem_
->ppapi());
333 packet
->Copy(buf
, capped_len
, addr
);
335 emitter_
->WriteTXPacket_Locked(packet
);
336 *out_len
= capped_len
;
340 } // namespace nacl_io