1 // Copyright 2013 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
5 #include "nacl_io/socket/udp_node.h"
12 #include "nacl_io/log.h"
13 #include "nacl_io/pepper_interface.h"
14 #include "nacl_io/socket/packet.h"
15 #include "nacl_io/socket/udp_event_emitter.h"
16 #include "nacl_io/stream/stream_fs.h"
// Number of chars in the buffer UdpRecvWork allocates for an incoming
// datagram, and the cap Send_Locked applies to an outgoing packet's length.
19 const size_t kMaxPacketSize
= 65536;
// Passed as both size arguments of the UdpEventEmitter created in the
// UdpNode constructor; eight times kMaxPacketSize.
20 const size_t kDefaultFifoSize
= kMaxPacketSize
* 8;
// Common base for the UDP send and receive work items. It extends
// StreamFs::Work and holds the event emitter the work item operates on.
25 class UdpWork
: public StreamFs::Work
{
// The StreamFs::Work base is initialized with emitter->stream()->stream().
27 explicit UdpWork(const ScopedUdpEventEmitter
& emitter
)
28 : StreamFs::Work(emitter
->stream()->stream()),
// Deletes packet_, so whatever packet_ points to at destruction time is
// released together with the work item.
32 ~UdpWork() { delete packet_
; }
// Returns the UDP socket interface obtained from the filesystem's PPAPI
// object.
34 UDPSocketInterface
* UDPInterface() {
35 return filesystem()->ppapi()->GetUDPSocketInterface();
// Emitter whose lock and packet queues the derived work items use.
39 ScopedUdpEventEmitter emitter_
;
// Work item that takes a packet from the emitter's TX queue and passes it to
// the UDP interface's SendTo on the node's socket resource.
43 class UdpSendWork
: public UdpWork
{
// |emitter| is forwarded to UdpWork; |node| is kept in node_ and is the
// socket node whose stream flags and error state this work item updates.
45 explicit UdpSendWork(const ScopedUdpEventEmitter
& emitter
,
46 const ScopedSocketNode
& node
)
47 : UdpWork(emitter
), node_(node
) {}
// Issues the send while holding the emitter lock.
49 virtual bool Start(int32_t val
) {
50 AUTO_LOCK(emitter_
->GetLock());
52 // Does the stream exist, and can it send?
53 if (!node_
->TestStreamFlags(SSF_CAN_SEND
))
// Take the next outgoing packet; it is stored in packet_, which ~UdpWork
// deletes.
56 packet_
= emitter_
->ReadTXPacket_Locked();
// SendTo is given the node's socket resource and the completion obtained
// from GetRunCompletion(this).
60 int err
= UDPInterface()->SendTo(node_
->socket_resource(),
64 filesystem()->GetRunCompletion(this));
65 if (err
!= PP_OK_COMPLETIONPENDING
) {
66 // Anything else, we should assume the socket has gone bad.
67 node_
->SetError_Locked(err
);
// Set SSF_SENDING on the node; UdpNode::QueueOutput tests this flag before
// creating another UdpSendWork.
71 node_
->SetStreamFlags(SSF_SENDING
);
// Handles the result of the send while holding the emitter lock.
// NOTE(review): presumably invoked through the completion created by
// GetRunCompletion(this) in Start() -- confirm against StreamFs.
75 virtual void Run(int32_t length_error
) {
76 AUTO_LOCK(emitter_
->GetLock());
// A negative |length_error| is recorded on the node as an error.
78 if (length_error
< 0) {
79 node_
->SetError_Locked(length_error
);
83 // If we did send, then queue more work.
84 node_
->ClearStreamFlags(SSF_SENDING
);
89 // We assume that transmits will always complete. If the upstream
90 // actually back pressures, enough to prevent the Send callback
91 // from triggering, this resource may never go away.
92 ScopedSocketNode node_
;
// Work item that receives into a buffer of kMaxPacketSize chars and, on a
// successful receive, copies the data into a Packet written to the emitter's
// RX queue.
95 class UdpRecvWork
: public UdpWork
{
// Allocates the receive buffer data_; the destructor releases it with
// delete[].
97 explicit UdpRecvWork(const ScopedUdpEventEmitter
& emitter
)
99 data_
= new char[kMaxPacketSize
];
102 ~UdpRecvWork() { delete[] data_
; }
// Issues the receive while holding the emitter lock.
104 virtual bool Start(int32_t val
) {
105 AUTO_LOCK(emitter_
->GetLock());
// The emitter's stream is cast to the UdpNode this work item operates on.
106 UdpNode
* stream
= static_cast<UdpNode
*>(emitter_
->stream());
108 // Does the stream exist, and can it recv?
109 if (NULL
== stream
|| !stream
->TestStreamFlags(SSF_CAN_RECV
))
112 // Check if we are already receiving.
113 if (stream
->TestStreamFlags(SSF_RECVING
))
// Set SSF_RECVING on the stream before issuing RecvFrom.
116 stream
->SetStreamFlags(SSF_RECVING
);
117 int err
= UDPInterface()->RecvFrom(stream
->socket_resource(),
121 filesystem()->GetRunCompletion(this));
// Any result other than PP_OK_COMPLETIONPENDING is recorded on the stream
// as an error.
122 if (err
!= PP_OK_COMPLETIONPENDING
) {
123 stream
->SetError_Locked(err
);
// Handles the result of the receive while holding the emitter lock. A
// |length_error| greater than zero is used as the byte count copied out of
// data_.
130 virtual void Run(int32_t length_error
) {
131 AUTO_LOCK(emitter_
->GetLock());
132 UdpNode
* stream
= static_cast<UdpNode
*>(emitter_
->stream());
136 // On successful receive we queue more input
137 if (length_error
> 0) {
// Copy the received bytes and addr_ into a new packet and append it to
// the RX queue.
138 Packet
* packet
= new Packet(filesystem()->ppapi());
139 packet
->Copy(data_
, length_error
, addr_
);
140 emitter_
->WriteRXPacket_Locked(packet
);
// SSF_RECVING is cleared before QueueInput() enqueues a new UdpRecvWork,
// whose Start() tests that flag.
141 stream
->ClearStreamFlags(SSF_RECVING
);
142 stream
->QueueInput();
// Record |length_error| on the stream as an error.
144 stream
->SetError_Locked(length_error
);
// Constructs the node on top of SocketNode(filesystem) and creates its
// UdpEventEmitter, passing kDefaultFifoSize for both size arguments. The
// node then attaches itself to the emitter as its stream.
153 UdpNode::UdpNode(Filesystem
* filesystem
)
154 : SocketNode(filesystem
),
155 emitter_(new UdpEventEmitter(kDefaultFifoSize
, kDefaultFifoSize
)) {
156 emitter_
->AttachStream(this);
// Detaches this node from its event emitter, then runs the SocketNode
// teardown.
159 void UdpNode::Destroy() {
160 emitter_
->DetachStream();
161 SocketNode::Destroy();
// Returns the raw pointer held by emitter_; the scoped emitter_ member
// remains the holder of the object.
164 UdpEventEmitter
* UdpNode::GetEventEmitter() {
165 return emitter_
.get();
// Initializes the node: runs SocketNode::Init with |open_flags|, then asks
// the UDP interface to create a socket resource.
168 Error
UdpNode::Init(int open_flags
) {
169 Error err
= SocketNode::Init(open_flags
);
// The UDP interface is needed for the Create call below; a NULL interface is
// logged.
173 if (UDPInterface() == NULL
) {
174 LOG_ERROR("Got NULL interface: UDP");
// Create the resource for the filesystem's PPAPI instance.
// NOTE(review): the result is presumably stored in socket_resource_, which
// is checked right after -- confirm.
179 UDPInterface()->Create(filesystem_
->ppapi()->GetInstance());
// A zero socket_resource_ is logged as a creation failure.
180 if (0 == socket_resource_
) {
181 LOG_ERROR("Unable to create UDP resource.");
// Creates a UdpRecvWork for this node's emitter and enqueues it on stream().
188 void UdpNode::QueueInput() {
189 UdpRecvWork
* work
= new UdpRecvWork(emitter_
);
190 stream()->EnqueueWork(work
);
// Creates a UdpSendWork for this node and enqueues it on stream(). The
// SSF_CAN_SEND and SSF_SENDING flags are tested first.
193 void UdpNode::QueueOutput() {
// Guard: the node must have SSF_CAN_SEND set (Bind sets it).
194 if (!TestStreamFlags(SSF_CAN_SEND
))
// Guard: SSF_SENDING is set by UdpSendWork::Start and cleared in its Run.
197 if (TestStreamFlags(SSF_SENDING
))
// The work item receives the emitter plus a scoped reference to this node.
200 UdpSendWork
* work
= new UdpSendWork(emitter_
, ScopedSocketNode(this));
201 stream()->EnqueueWork(work
);
// Binds the socket resource to |addr| (of length |len|), fetches the address
// that was actually bound, and sets SSF_CAN_SEND and SSF_CAN_RECV.
204 Error
UdpNode::Bind(const struct sockaddr
* addr
, socklen_t len
) {
// Guard on a zero socket resource.
205 if (0 == socket_resource_
)
208 /* Only bind once. */
// Convert the sockaddr into a PP_Resource for the Bind call.
212 PP_Resource out_addr
= SockAddrToResource(addr
, len
);
// Bind is called with PP_BlockUntilComplete(); the address resource passed
// to it is released right after the call.
217 UDPInterface()->Bind(socket_resource_
, out_addr
, PP_BlockUntilComplete());
218 filesystem_
->ppapi()->ReleaseResource(out_addr
);
// The Pepper error code is converted to an errno value for the caller.
220 return PPErrorToErrno(err
);
222 // Get the address that was actually bound (in case addr was 0.0.0.0:0).
223 out_addr
= UDPInterface()->GetBoundAddress(socket_resource_
);
227 // Now that we are bound, we can start sending and receiving.
228 SetStreamFlags(SSF_CAN_SEND
| SSF_CAN_RECV
);
// Keep the bound address resource as this node's local address.
231 local_addr_
= out_addr
;
// Stores |addr| as this socket's remote address, replacing any previously
// stored remote address resource.
235 Error
UdpNode::Connect(const HandleAttr
& attr
,
236 const struct sockaddr
* addr
,
238 if (0 == socket_resource_
)
241 /* Connect for UDP is the default dest, it's legal to change it. */
242 if (remote_addr_
!= 0) {
// Release the previously stored remote address resource before replacing it.
243 filesystem_
->ppapi()->ReleaseResource(remote_addr_
);
// Convert the sockaddr into a PP_Resource and keep it in remote_addr_.
247 remote_addr_
= SockAddrToResource(addr
, len
);
// Guard on a zero remote_addr_ after the conversion.
248 if (0 == remote_addr_
)
// Takes a packet from the emitter's RX queue and copies its payload into
// |buf|. The copied length is reported through |out_len| and the packet's
// address resource, if any, through |out_addr|.
254 Error
UdpNode::Recv_Locked(void* buf
,
256 PP_Resource
* out_addr
,
258 Packet
* packet
= emitter_
->ReadRXPacket_Locked();
// Copy no more than the smaller of |len| and the packet's length.
263 int capped_len
= static_cast<int32_t>(std::min
<int>(len
, packet
->len()));
264 memcpy(buf
, packet
->buffer(), capped_len
);
// AddRefResource is called on the packet's address resource before it is
// stored in |out_addr|.
266 if (packet
->addr() != 0) {
267 filesystem_
->ppapi()->AddRefResource(packet
->addr());
268 *out_addr
= packet
->addr();
271 *out_len
= capped_len
;
276 // Should never happen, Recv_Locked should not be called
277 // unless already in a POLLIN state.
// Copies the caller's data into a new Packet and writes it to the emitter's
// TX queue. The number of bytes accepted is reported through |out_len|.
281 Error
UdpNode::Send_Locked(const void* buf
,
286 // Pepper requires a socket to be bound before it can send.
// An AF_INET address with a zeroed sin_addr is built and passed to Bind().
288 addr
.sin_family
= AF_INET
;
290 memset(&addr
.sin_addr
, 0, sizeof(addr
.sin_addr
));
292 Bind(reinterpret_cast<const struct sockaddr
*>(&addr
), sizeof(addr
));
// The payload length is capped at kMaxPacketSize.
298 int capped_len
= static_cast<int32_t>(std::min
<int>(len
, kMaxPacketSize
));
// Copy |capped_len| bytes of |buf| together with addr into a new packet.
299 Packet
* packet
= new Packet(filesystem_
->ppapi());
300 packet
->Copy(buf
, capped_len
, addr
);
// Append the packet to the TX queue and report the capped length.
302 emitter_
->WriteTXPacket_Locked(packet
);
303 *out_len
= capped_len
;
307 } // namespace nacl_io