1 // Copyright 2013 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
5 #include "nacl_io/ossocket.h"
6 #ifdef PROVIDES_SOCKET_API
13 #include "nacl_io/kernel_handle.h"
14 #include "nacl_io/log.h"
15 #include "nacl_io/pepper_interface.h"
16 #include "nacl_io/socket/tcp_node.h"
17 #include "nacl_io/stream/stream_fs.h"
// Upper bound, in bytes, applied to the length of a single TCP read or write
// issued by the work items in this file (64 KiB).
const size_t kMaxPacketSize = 64 * 1024;
// Passed as both size arguments when a TcpEventEmitter is constructed:
// room for eight maximum-size packets.
const size_t kDefaultFifoSize = 8 * kMaxPacketSize;
// Base class shared by the TCP send and receive work items. It is a
// StreamFs::Work constructed from emitter->stream()->stream(), and it keeps
// the TCP event emitter it was created with in emitter_.
26 class TcpWork
: public StreamFs::Work
{
28 explicit TcpWork(const ScopedTcpEventEmitter
& emitter
)
29 : StreamFs::Work(emitter
->stream()->stream()),
// Returns the TCP socket interface obtained from the filesystem's PPAPI
// wrapper.
37 TCPSocketInterface
* TCPInterface() {
38 return filesystem()->ppapi()->GetTCPSocketInterface();
// Emitter whose lock and FIFOs the derived work items use.
42 ScopedTcpEventEmitter emitter_
;
// Work item that moves bytes from the emitter's output FIFO to the socket
// held by node_ using the PPAPI TCP Write call.
46 class TcpSendWork
: public TcpWork
{
48 explicit TcpSendWork(const ScopedTcpEventEmitter
& emitter
,
49 const ScopedSocketNode
& stream
)
50 : TcpWork(emitter
), node_(stream
) {}
// Start: with the emitter lock held, tests SSF_CAN_SEND and SSF_SENDING,
// copies at most kMaxPacketSize bytes out of the output FIFO into a malloc'd
// buffer and issues Write with the completion returned by
// filesystem()->GetRunCompletion(this). Any result other than
// PP_OK_COMPLETIONPENDING is recorded on the node as an error; otherwise
// SSF_SENDING is set.
52 virtual bool Start(int32_t val
) {
53 AUTO_LOCK(emitter_
->GetLock());
55 // Does the stream exist, and can it send?
56 if (!node_
->TestStreamFlags(SSF_CAN_SEND
))
59 // Check if we are already sending.
60 if (node_
->TestStreamFlags(SSF_SENDING
))
// Never hand more than kMaxPacketSize bytes to one Write call.
63 size_t tx_data_avail
= emitter_
->BytesInOutputFIFO();
64 int capped_len
= std::min(tx_data_avail
, kMaxPacketSize
);
68 data_
= (char*)malloc(capped_len
);
72 emitter_
->ReadOut_Locked(data_
, capped_len
);
74 int err
= TCPInterface()->Write(node_
->socket_resource(),
77 filesystem()->GetRunCompletion(this));
79 if (err
!= PP_OK_COMPLETIONPENDING
) {
80 // Anything else, we should assume the socket has gone bad.
81 node_
->SetError_Locked(err
);
85 node_
->SetStreamFlags(SSF_SENDING
);
// Run: completion handler for the Write issued in Start. length_error is
// negative on failure.
89 virtual void Run(int32_t length_error
) {
90 AUTO_LOCK(emitter_
->GetLock());
92 if (length_error
< 0) {
93 // Send failed, mark the socket as bad
94 node_
->SetError_Locked(length_error
);
98 // If we did send, then queue more work.
99 node_
->ClearStreamFlags(SSF_SENDING
);
100 node_
->QueueOutput();
104 // We assume that transmits will always complete. If the upstream
105 // actually back pressures, enough to prevent the Send callback
106 // from triggering, this resource may never go away.
107 ScopedSocketNode node_
;
// Work item that reads from the socket of the emitter's stream with the PPAPI
// TCP Read call and feeds the received bytes into the emitter's input FIFO.
110 class TcpRecvWork
: public TcpWork
{
112 explicit TcpRecvWork(const ScopedTcpEventEmitter
& emitter
)
113 : TcpWork(emitter
) {}
// Start: with the emitter lock held, looks up the TcpNode attached to the
// emitter, tests SSF_CAN_RECV and SSF_RECVING, sizes a malloc'd buffer from
// the free space in the input FIFO (capped at kMaxPacketSize) and issues Read
// with the completion returned by filesystem()->GetRunCompletion(this). Any
// result other than PP_OK_COMPLETIONPENDING is recorded on the stream as an
// error; otherwise SSF_RECVING is set.
115 virtual bool Start(int32_t val
) {
116 AUTO_LOCK(emitter_
->GetLock());
117 TcpNode
* stream
= static_cast<TcpNode
*>(emitter_
->stream());
119 // Does the stream exist, and can it recv?
120 if (NULL
== stream
|| !stream
->TestStreamFlags(SSF_CAN_RECV
))
123 // Check if we are already receiving.
124 if (stream
->TestStreamFlags(SSF_RECVING
))
127 size_t rx_space_avail
= emitter_
->SpaceInInputFIFO();
129 static_cast<int32_t>(std::min(rx_space_avail
, kMaxPacketSize
));
134 data_
= (char*)malloc(capped_len
);
138 int err
= TCPInterface()->Read(stream
->socket_resource(),
141 filesystem()->GetRunCompletion(this));
142 if (err
!= PP_OK_COMPLETIONPENDING
) {
143 // Anything else, we should assume the socket has gone bad.
144 stream
->SetError_Locked(err
);
148 stream
->SetStreamFlags(SSF_RECVING
);
// Run: completion handler for the Read issued in Start. A negative
// length_error is recorded as an error, zero marks end of stream on both the
// stream and the emitter, and the remaining lines write length_error bytes
// from data_ into the input FIFO and queue the next read.
152 virtual void Run(int32_t length_error
) {
153 AUTO_LOCK(emitter_
->GetLock());
154 TcpNode
* stream
= static_cast<TcpNode
*>(emitter_
->stream());
159 if (length_error
< 0) {
160 stream
->SetError_Locked(length_error
);
162 } else if (length_error
== 0) {
163 stream
->SetStreamFlags(SSF_RECV_ENDOFSTREAM
);
164 emitter_
->SetRecvEndOfStream_Locked();
167 // If we successfully received, queue more input
168 emitter_
->WriteIn_Locked(data_
, length_error
);
169 stream
->ClearStreamFlags(SSF_RECVING
);
170 stream
->QueueInput();
// Work item that issues the PPAPI TCP Accept call on the socket of the
// emitter's stream and hands the resulting socket to the emitter.
174 class TCPAcceptWork
: public StreamFs::Work
{
176 explicit TCPAcceptWork(StreamFs
* stream
, const ScopedTcpEventEmitter
& emitter
)
177 : StreamFs::Work(stream
), emitter_(emitter
) {}
// Returns the TCP socket interface obtained from the filesystem's PPAPI
// wrapper.
179 TCPSocketInterface
* TCPInterface() {
180 return filesystem()->ppapi()->GetTCPSocketInterface();
// Start: with the emitter lock held, tests SSF_LISTENING on the node and
// issues Accept with the completion returned by
// filesystem()->GetRunCompletion(this). Any result other than
// PP_OK_COMPLETIONPENDING is recorded on the node as an error.
183 virtual bool Start(int32_t val
) {
184 AUTO_LOCK(emitter_
->GetLock());
185 TcpNode
* node
= static_cast<TcpNode
*>(emitter_
->stream());
187 // Does the stream exist, and can it accept?
191 // Check that the socket is listening.
192 if (!node
->TestStreamFlags(SSF_LISTENING
))
195 int err
= TCPInterface()->Accept(node
->socket_resource(),
197 filesystem()->GetRunCompletion(this));
199 if (err
!= PP_OK_COMPLETIONPENDING
) {
200 // Anything else, we should assume the socket has gone bad.
201 node
->SetError_Locked(err
);
// Run: completion handler for the Accept issued in Start. A result other than
// PP_OK is recorded on the node as an error; new_socket_ is passed to the
// emitter with SetAcceptedSocket_Locked.
208 virtual void Run(int32_t error
) {
209 AUTO_LOCK(emitter_
->GetLock());
210 TcpNode
* node
= static_cast<TcpNode
*>(emitter_
->stream());
215 if (error
!= PP_OK
) {
216 node
->SetError_Locked(error
);
220 emitter_
->SetAcceptedSocket_Locked(new_socket_
);
// Socket resource handed to the emitter once the accept completes.
224 PP_Resource new_socket_
;
225 ScopedTcpEventEmitter emitter_
;
// Work item that issues the PPAPI TCP Connect call on the socket of the
// emitter's stream and reports the outcome to the node.
228 class TCPConnectWork
: public StreamFs::Work
{
230 explicit TCPConnectWork(StreamFs
* stream
,
231 const ScopedTcpEventEmitter
& emitter
)
232 : StreamFs::Work(stream
), emitter_(emitter
) {}
// Returns the TCP socket interface obtained from the filesystem's PPAPI
// wrapper.
234 TCPSocketInterface
* TCPInterface() {
235 return filesystem()->ppapi()->GetTCPSocketInterface();
// Start: with the emitter lock held, issues Connect on the node's socket with
// the completion returned by filesystem()->GetRunCompletion(this). Any result
// other than PP_OK_COMPLETIONPENDING is recorded on the node as an error.
238 virtual bool Start(int32_t val
) {
239 AUTO_LOCK(emitter_
->GetLock());
240 TcpNode
* node
= static_cast<TcpNode
*>(emitter_
->stream());
242 // Does the stream exist, and can it connect?
246 int err
= TCPInterface()->Connect(node
->socket_resource(),
248 filesystem()->GetRunCompletion(this));
249 if (err
!= PP_OK_COMPLETIONPENDING
) {
250 // Anything else, we should assume the socket has gone bad.
251 node
->SetError_Locked(err
);
// Run: completion handler for the Connect issued in Start. A result other
// than PP_OK calls ConnectFailed_Locked and records the error on the node;
// ConnectDone_Locked is called for the successful case.
258 virtual void Run(int32_t error
) {
259 AUTO_LOCK(emitter_
->GetLock());
260 TcpNode
* node
= static_cast<TcpNode
*>(emitter_
->stream());
265 if (error
!= PP_OK
) {
266 node
->ConnectFailed_Locked();
267 node
->SetError_Locked(error
);
271 node
->ConnectDone_Locked();
275 ScopedTcpEventEmitter emitter_
;
// Constructs a TCP node without passing a socket resource to SocketNode.
// Creates a TcpEventEmitter with kDefaultFifoSize for both size arguments,
// starts with tcp_nodelay_ off, and attaches this node to the emitter.
278 TcpNode::TcpNode(Filesystem
* filesystem
)
279 : SocketNode(filesystem
),
280 emitter_(new TcpEventEmitter(kDefaultFifoSize
, kDefaultFifoSize
)),
281 tcp_nodelay_(false) {
282 emitter_
->AttachStream(this);
// Constructs a TCP node around an existing socket resource, which is
// forwarded to SocketNode. Creates a TcpEventEmitter with kDefaultFifoSize
// for both size arguments, starts with tcp_nodelay_ off, and attaches this
// node to the emitter.
285 TcpNode::TcpNode(Filesystem
* filesystem
, PP_Resource socket
)
286 : SocketNode(filesystem
, socket
),
287 emitter_(new TcpEventEmitter(kDefaultFifoSize
, kDefaultFifoSize
)),
288 tcp_nodelay_(false) {
289 emitter_
->AttachStream(this);
// Detaches this node from its event emitter, then runs the SocketNode
// teardown.
292 void TcpNode::Destroy() {
293 emitter_
->DetachStream();
294 SocketNode::Destroy();
// Initializes the node: runs SocketNode::Init, checks that the TCP interface
// is available, and then either adopts an existing socket resource as an
// already-connected socket or creates a new one and marks the node with
// SSF_CAN_CONNECT.
297 Error
TcpNode::Init(int open_flags
) {
298 Error err
= SocketNode::Init(open_flags
);
302 if (TCPInterface() == NULL
) {
303 LOG_ERROR("Got NULL interface: TCP");
307 if (socket_resource_
!= 0) {
308 // TCP sockets that are constructed with an existing socket_resource_
309 // are those generated from calls to Accept() and therefore are
310 // already connected.
311 remote_addr_
= TCPInterface()->GetRemoteAddress(socket_resource_
);
312 ConnectDone_Locked();
315 TCPInterface()->Create(filesystem_
->ppapi()->GetInstance());
316 if (0 == socket_resource_
) {
317 LOG_ERROR("Unable to create TCP resource.");
320 SetStreamFlags(SSF_CAN_CONNECT
);
// Returns the raw pointer held by emitter_, i.e. this node's TCP event
// emitter viewed as an EventEmitter.
326 EventEmitter
* TcpNode::GetEventEmitter() {
327 return emitter_
.get();
// Records pp_error_num through SocketNode::SetError_Locked and additionally
// puts the event emitter into its error state.
330 void TcpNode::SetError_Locked(int pp_error_num
) {
331 SocketNode::SetError_Locked(pp_error_num
);
332 emitter_
->SetError_Locked();
// Handles IPPROTO_TCP / TCP_NODELAY locally: under node_lock_, copies the
// current tcp_nodelay_ setting (as an int) into optval, truncated to the
// smaller of sizeof(int) and *len. Every other option is forwarded to
// SocketNode::GetSockOpt.
335 Error
TcpNode::GetSockOpt(int lvl
, int optname
, void* optval
, socklen_t
* len
) {
336 if (lvl
== IPPROTO_TCP
&& optname
== TCP_NODELAY
) {
337 AUTO_LOCK(node_lock_
);
338 int value
= tcp_nodelay_
;
339 socklen_t value_len
= static_cast<socklen_t
>(sizeof(value
));
// Never copy more than the caller's buffer can hold.
340 int copy_bytes
= std::min(value_len
, *len
);
341 memcpy(optval
, &value
, copy_bytes
);
346 return SocketNode::GetSockOpt(lvl
, optname
, optval
, len
);
// Pushes the cached tcp_nodelay_ setting to the socket with the PPAPI
// SetOption call (PP_TCPSOCKET_OPTION_NO_DELAY), using
// PP_BlockUntilComplete() as the completion, and returns the result
// translated by PPErrorToErrno.
349 Error
TcpNode::SetNoDelay_Locked() {
354 TCPInterface()->SetOption(socket_resource_
,
355 PP_TCPSOCKET_OPTION_NO_DELAY
,
356 PP_MakeBool(tcp_nodelay_
? PP_TRUE
: PP_FALSE
),
357 PP_BlockUntilComplete());
358 return PPErrorToErrno(error
);
// Handles three options locally, each under node_lock_ and each requiring
// len to be at least sizeof(int):
//   IPPROTO_TCP / TCP_NODELAY - caches the flag and applies it through
//                               SetNoDelay_Locked();
//   SOL_SOCKET / SO_RCVBUF    - sets PP_TCPSOCKET_OPTION_RECV_BUFFER_SIZE;
//   SOL_SOCKET / SO_SNDBUF    - sets PP_TCPSOCKET_OPTION_SEND_BUFFER_SIZE.
// The buffer-size options call SetOption with PP_BlockUntilComplete() and
// return the result translated by PPErrorToErrno. Every other option is
// forwarded to SocketNode::SetSockOpt.
361 Error
TcpNode::SetSockOpt(int lvl
,
365 if (lvl
== IPPROTO_TCP
&& optname
== TCP_NODELAY
) {
366 if (static_cast<size_t>(len
) < sizeof(int))
368 AUTO_LOCK(node_lock_
);
// Any non-zero int enables the option.
369 tcp_nodelay_
= *static_cast<const int*>(optval
) != 0;
370 return SetNoDelay_Locked();
371 } else if (lvl
== SOL_SOCKET
&& optname
== SO_RCVBUF
) {
372 if (static_cast<size_t>(len
) < sizeof(int))
374 AUTO_LOCK(node_lock_
);
375 int bufsize
= *static_cast<const int*>(optval
);
377 TCPInterface()->SetOption(socket_resource_
,
378 PP_TCPSOCKET_OPTION_RECV_BUFFER_SIZE
,
379 PP_MakeInt32(bufsize
),
380 PP_BlockUntilComplete());
381 return PPErrorToErrno(error
);
382 } else if (lvl
== SOL_SOCKET
&& optname
== SO_SNDBUF
) {
383 if (static_cast<size_t>(len
) < sizeof(int))
385 AUTO_LOCK(node_lock_
);
386 int bufsize
= *static_cast<const int*>(optval
);
388 TCPInterface()->SetOption(socket_resource_
,
389 PP_TCPSOCKET_OPTION_SEND_BUFFER_SIZE
,
390 PP_MakeInt32(bufsize
),
391 PP_BlockUntilComplete());
392 return PPErrorToErrno(error
);
395 return SocketNode::SetSockOpt(lvl
, optname
, optval
, len
);
// Creates a TCPAcceptWork for this node's emitter and enqueues it on the
// StreamFs returned by stream().
398 void TcpNode::QueueAccept() {
399 StreamFs::Work
* work
= new TCPAcceptWork(stream(), emitter_
);
400 stream()->EnqueueWork(work
);
// Creates a TCPConnectWork for this node's emitter and enqueues it on the
// StreamFs returned by stream().
403 void TcpNode::QueueConnect() {
404 StreamFs::Work
* work
= new TCPConnectWork(stream(), emitter_
);
405 stream()->EnqueueWork(work
);
// Tests SSF_RECV_ENDOFSTREAM, then creates a TcpRecvWork for this node's
// emitter and enqueues it on the StreamFs returned by stream().
408 void TcpNode::QueueInput() {
409 if (TestStreamFlags(SSF_RECV_ENDOFSTREAM
))
412 StreamFs::Work
* work
= new TcpRecvWork(emitter_
);
413 stream()->EnqueueWork(work
);
// Tests three conditions (a send already in flight, SSF_CAN_SEND not set, an
// empty output FIFO), then creates a TcpSendWork that holds a scoped
// reference to this node and enqueues it on the StreamFs returned by
// stream().
416 void TcpNode::QueueOutput() {
417 if (TestStreamFlags(SSF_SENDING
))
420 if (!TestStreamFlags(SSF_CAN_SEND
))
423 if (0 == emitter_
->BytesInOutputFIFO())
426 StreamFs::Work
* work
= new TcpSendWork(emitter_
, ScopedSocketNode(this));
427 stream()->EnqueueWork(work
);
// Accepts a pending connection on a listening socket. Takes the event
// listener lock on this node's emitter, tests SSF_LISTENING, waits for POLLIN
// (with timeout -1 when attr is blocking, 0 otherwise), fetches the accepted
// socket from the emitter, clears POLLIN, and converts the new socket's
// remote address into addr / *len before releasing the address resource.
430 Error
TcpNode::Accept(const HandleAttr
& attr
,
431 PP_Resource
* out_sock
,
432 struct sockaddr
* addr
,
434 EventListenerLock
wait(GetEventEmitter());
436 if (!TestStreamFlags(SSF_LISTENING
))
439 // Either block forever or not at all
440 int ms
= attr
.IsBlocking() ? -1 : 0;
442 Error err
= wait
.WaitOnEvent(POLLIN
, ms
);
443 if (ETIMEDOUT
== err
)
446 int s
= emitter_
->GetAcceptedSocket_Locked();
447 // Non-blocking case.
451 // Consume the new socket and start listening for the next one
453 emitter_
->ClearEvents_Locked(POLLIN
);
455 // Set the out parameters
457 PP_Resource remote_addr
= TCPInterface()->GetRemoteAddress(*out_sock
);
458 *len
= ResourceToSockAddr(remote_addr
, *len
, addr
);
// The address resource was only needed for the conversion above.
459 filesystem_
->ppapi()->ReleaseResource(remote_addr
);
466 // We can not bind a client socket with PPAPI. For now we ignore the
467 // bind but report the correct address later, just in case someone is
468 // binding without really caring what the address is (for example to
469 // select a more optimized interface/route.)
//
// Under node_lock_, converts addr to a PPAPI address resource, calls the
// PPAPI Bind with PP_BlockUntilComplete(), releases that resource and returns
// the translated error on the failure path, and finally refreshes local_addr_
// from the socket's actual local address.
470 Error
TcpNode::Bind(const struct sockaddr
* addr
, socklen_t len
) {
471 AUTO_LOCK(node_lock_
);
473 /* Only bind once. */
477 local_addr_
= SockAddrToResource(addr
, len
);
478 int err
= TCPInterface()->Bind(
479 socket_resource_
, local_addr_
, PP_BlockUntilComplete());
481 // If we fail, release the local addr resource
483 filesystem_
->ppapi()->ReleaseResource(local_addr_
);
485 return PPErrorToErrno(err
);
488 local_addr_
= TCPInterface()->GetLocalAddress(socket_resource_
);
// Connects the socket to addr. Takes the event listener lock on this node's
// emitter, tests SSF_CONNECTING, converts addr into remote_addr_, sets
// SSF_CONNECTING and waits for POLLOUT (with timeout -1 when attr is
// blocking, 0 otherwise). The outcome is finalized with
// ConnectFailed_Locked() or ConnectDone_Locked(), using last_errno_ to detect
// a failed connection.
492 Error
TcpNode::Connect(const HandleAttr
& attr
,
493 const struct sockaddr
* addr
,
495 EventListenerLock
wait(GetEventEmitter());
497 if (TestStreamFlags(SSF_CONNECTING
))
503 remote_addr_
= SockAddrToResource(addr
, len
);
504 if (0 == remote_addr_
)
// Either block forever or not at all.
507 int ms
= attr
.IsBlocking() ? -1 : 0;
509 SetStreamFlags(SSF_CONNECTING
);
512 Error err
= wait
.WaitOnEvent(POLLOUT
, ms
);
513 if (ETIMEDOUT
== err
)
516 // If we fail, release the dest addr resource
518 ConnectFailed_Locked();
522 // Make sure the connection succeeded.
523 if (last_errno_
!= 0) {
524 ConnectFailed_Locked();
528 ConnectDone_Locked();
// Takes node_lock_ and then the emitter lock, and puts the event emitter into
// its error state.
// NOTE(review): `how` is not referenced in the lines visible here; confirm
// whether the shutdown direction is meant to be ignored.
532 Error
TcpNode::Shutdown(int how
) {
533 AUTO_LOCK(node_lock_
);
538 AUTO_LOCK(emitter_
->GetLock());
539 emitter_
->SetError_Locked();
// Moves the node into the connected state: refreshes local_addr_ from the
// socket, clears SSF_CONNECTING and SSF_CAN_CONNECT, sets SSF_CAN_SEND and
// SSF_CAN_RECV, and notifies the emitter with ConnectDone_Locked().
544 void TcpNode::ConnectDone_Locked() {
545 local_addr_
= TCPInterface()->GetLocalAddress(socket_resource_
);
547 // Now that we are connected, we can start sending and receiving.
548 ClearStreamFlags(SSF_CONNECTING
| SSF_CAN_CONNECT
);
549 SetStreamFlags(SSF_CAN_SEND
| SSF_CAN_RECV
);
551 emitter_
->ConnectDone_Locked();
553 // The NODELAY option cannot be set in PPAPI before the socket
554 // is connected, but setsockopt() might have already set it.
557 // Begin the input pump
// Cleans up after a failed connection attempt by releasing the remote address
// resource held in remote_addr_.
561 void TcpNode::ConnectFailed_Locked() {
562 filesystem_
->ppapi()->ReleaseResource(remote_addr_
);
// Puts the socket into listening mode. Under node_lock_, calls the PPAPI
// Listen with the given backlog and PP_BlockUntilComplete(); an error result
// is returned translated by PPErrorToErrno. The node then swaps
// SSF_CAN_CONNECT for SSF_LISTENING and notifies the emitter with
// SetListening_Locked().
566 Error
TcpNode::Listen(int backlog
) {
567 AUTO_LOCK(node_lock_
);
571 int err
= TCPInterface()->Listen(
572 socket_resource_
, backlog
, PP_BlockUntilComplete());
574 return PPErrorToErrno(err
);
576 ClearStreamFlags(SSF_CAN_CONNECT
);
577 SetStreamFlags(SSF_LISTENING
);
578 emitter_
->SetListening_Locked();
// Reads up to len bytes from the emitter's input FIFO into buf and stores the
// byte count in *out_len. *out_addr receives remote_addr_, with an extra
// reference added for the copy handed back.
583 Error
TcpNode::Recv_Locked(void* buf
,
585 PP_Resource
* out_addr
,
587 assert(emitter_
.get());
588 *out_len
= emitter_
->ReadIn_Locked((char*)buf
, len
);
589 *out_addr
= remote_addr_
;
591 // Ref the address copy we pass back.
592 filesystem_
->ppapi()->AddRefResource(remote_addr_
);
596 // TCP ignores dst addr passed to send_to, and always uses bound address
//
// Checks the emitter's error state, then writes up to len bytes from buf into
// the emitter's output FIFO and stores the byte count in *out_len.
597 Error
TcpNode::Send_Locked(const void* buf
,
601 assert(emitter_
.get());
602 if (emitter_
->GetError_Locked())
604 *out_len
= emitter_
->WriteOut_Locked((char*)buf
, len
);
608 } // namespace nacl_io
610 #endif // PROVIDES_SOCKET_API