Fix crash on app list start page contents not existing.
[chromium-blink-merge.git] / native_client_sdk / src / libraries / nacl_io / socket / tcp_node.cc
blob738206eb730992f3c8c0fbbaa8437742c0523ebb
1 // Copyright 2013 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
5 #include "nacl_io/ossocket.h"
6 #ifdef PROVIDES_SOCKET_API
8 #include <assert.h>
9 #include <errno.h>
10 #include <string.h>
11 #include <algorithm>
13 #include "nacl_io/kernel_handle.h"
14 #include "nacl_io/log.h"
15 #include "nacl_io/pepper_interface.h"
16 #include "nacl_io/socket/tcp_node.h"
17 #include "nacl_io/stream/stream_fs.h"
19 namespace {
20 const size_t kMaxPacketSize = 65536;
21 const size_t kDefaultFifoSize = kMaxPacketSize * 8;
24 namespace nacl_io {
26 class TcpWork : public StreamFs::Work {
27 public:
28 explicit TcpWork(const ScopedTcpEventEmitter& emitter)
29 : StreamFs::Work(emitter->stream()->stream()),
30 emitter_(emitter),
31 data_(NULL) {}
33 ~TcpWork() {
34 free(data_);
37 TCPSocketInterface* TCPInterface() {
38 return filesystem()->ppapi()->GetTCPSocketInterface();
41 protected:
42 ScopedTcpEventEmitter emitter_;
43 char* data_;
46 class TcpSendWork : public TcpWork {
47 public:
48 explicit TcpSendWork(const ScopedTcpEventEmitter& emitter,
49 const ScopedSocketNode& stream)
50 : TcpWork(emitter), node_(stream) {}
52 virtual bool Start(int32_t val) {
53 AUTO_LOCK(emitter_->GetLock());
55 // Does the stream exist, and can it send?
56 if (!node_->TestStreamFlags(SSF_CAN_SEND))
57 return false;
59 // Check if we are already sending.
60 if (node_->TestStreamFlags(SSF_SENDING))
61 return false;
63 size_t tx_data_avail = emitter_->BytesInOutputFIFO();
64 int capped_len = std::min(tx_data_avail, kMaxPacketSize);
65 if (capped_len == 0)
66 return false;
68 data_ = (char*)malloc(capped_len);
69 assert(data_);
70 if (data_ == NULL)
71 return false;
72 emitter_->ReadOut_Locked(data_, capped_len);
74 int err = TCPInterface()->Write(node_->socket_resource(),
75 data_,
76 capped_len,
77 filesystem()->GetRunCompletion(this));
79 if (err != PP_OK_COMPLETIONPENDING) {
80 // Anything else, we should assume the socket has gone bad.
81 node_->SetError_Locked(err);
82 return false;
85 node_->SetStreamFlags(SSF_SENDING);
86 return true;
89 virtual void Run(int32_t length_error) {
90 AUTO_LOCK(emitter_->GetLock());
92 if (length_error < 0) {
93 // Send failed, mark the socket as bad
94 node_->SetError_Locked(length_error);
95 return;
98 // If we did send, then Q more work.
99 node_->ClearStreamFlags(SSF_SENDING);
100 node_->QueueOutput();
103 private:
104 // We assume that transmits will always complete. If the upstream
105 // actually back pressures, enough to prevent the Send callback
106 // from triggering, this resource may never go away.
107 ScopedSocketNode node_;
110 class TcpRecvWork : public TcpWork {
111 public:
112 explicit TcpRecvWork(const ScopedTcpEventEmitter& emitter)
113 : TcpWork(emitter) {}
115 virtual bool Start(int32_t val) {
116 AUTO_LOCK(emitter_->GetLock());
117 TcpNode* stream = static_cast<TcpNode*>(emitter_->stream());
119 // Does the stream exist, and can it recv?
120 if (NULL == stream || !stream->TestStreamFlags(SSF_CAN_RECV))
121 return false;
123 // If we are not currently receiving
124 if (stream->TestStreamFlags(SSF_RECVING))
125 return false;
127 size_t rx_space_avail = emitter_->SpaceInInputFIFO();
128 int capped_len =
129 static_cast<int32_t>(std::min(rx_space_avail, kMaxPacketSize));
131 if (capped_len == 0)
132 return false;
134 data_ = (char*)malloc(capped_len);
135 assert(data_);
136 if (data_ == NULL)
137 return false;
138 int err = TCPInterface()->Read(stream->socket_resource(),
139 data_,
140 capped_len,
141 filesystem()->GetRunCompletion(this));
142 if (err != PP_OK_COMPLETIONPENDING) {
143 // Anything else, we should assume the socket has gone bad.
144 stream->SetError_Locked(err);
145 return false;
148 stream->SetStreamFlags(SSF_RECVING);
149 return true;
152 virtual void Run(int32_t length_error) {
153 AUTO_LOCK(emitter_->GetLock());
154 TcpNode* stream = static_cast<TcpNode*>(emitter_->stream());
156 if (!stream)
157 return;
159 if (length_error < 0) {
160 stream->SetError_Locked(length_error);
161 return;
162 } else if (length_error == 0) {
163 stream->SetStreamFlags(SSF_RECV_ENDOFSTREAM);
164 emitter_->SetRecvEndOfStream_Locked();
167 // If we successfully received, queue more input
168 emitter_->WriteIn_Locked(data_, length_error);
169 stream->ClearStreamFlags(SSF_RECVING);
170 stream->QueueInput();
174 class TCPAcceptWork : public StreamFs::Work {
175 public:
176 explicit TCPAcceptWork(StreamFs* stream, const ScopedTcpEventEmitter& emitter)
177 : StreamFs::Work(stream), emitter_(emitter) {}
179 TCPSocketInterface* TCPInterface() {
180 return filesystem()->ppapi()->GetTCPSocketInterface();
183 virtual bool Start(int32_t val) {
184 AUTO_LOCK(emitter_->GetLock());
185 TcpNode* node = static_cast<TcpNode*>(emitter_->stream());
187 // Does the stream exist, and can it accept?
188 if (NULL == node)
189 return false;
191 // If we are not currently accepting
192 if (!node->TestStreamFlags(SSF_LISTENING))
193 return false;
195 int err = TCPInterface()->Accept(node->socket_resource(),
196 &new_socket_,
197 filesystem()->GetRunCompletion(this));
199 if (err != PP_OK_COMPLETIONPENDING) {
200 // Anything else, we should assume the socket has gone bad.
201 node->SetError_Locked(err);
202 return false;
205 return true;
208 virtual void Run(int32_t error) {
209 AUTO_LOCK(emitter_->GetLock());
210 TcpNode* node = static_cast<TcpNode*>(emitter_->stream());
212 if (node == NULL)
213 return;
215 if (error != PP_OK) {
216 node->SetError_Locked(error);
217 return;
220 emitter_->SetAcceptedSocket_Locked(new_socket_);
223 protected:
224 PP_Resource new_socket_;
225 ScopedTcpEventEmitter emitter_;
228 class TCPConnectWork : public StreamFs::Work {
229 public:
230 explicit TCPConnectWork(StreamFs* stream,
231 const ScopedTcpEventEmitter& emitter)
232 : StreamFs::Work(stream), emitter_(emitter) {}
234 TCPSocketInterface* TCPInterface() {
235 return filesystem()->ppapi()->GetTCPSocketInterface();
238 virtual bool Start(int32_t val) {
239 AUTO_LOCK(emitter_->GetLock());
240 TcpNode* node = static_cast<TcpNode*>(emitter_->stream());
242 // Does the stream exist, and can it connect?
243 if (NULL == node)
244 return false;
246 int err = TCPInterface()->Connect(node->socket_resource(),
247 node->remote_addr(),
248 filesystem()->GetRunCompletion(this));
249 if (err != PP_OK_COMPLETIONPENDING) {
250 // Anything else, we should assume the socket has gone bad.
251 node->SetError_Locked(err);
252 return false;
255 return true;
258 virtual void Run(int32_t error) {
259 AUTO_LOCK(emitter_->GetLock());
260 TcpNode* node = static_cast<TcpNode*>(emitter_->stream());
262 if (node == NULL)
263 return;
265 if (error != PP_OK) {
266 node->ConnectFailed_Locked();
267 node->SetError_Locked(error);
268 return;
271 node->ConnectDone_Locked();
274 protected:
275 ScopedTcpEventEmitter emitter_;
278 TcpNode::TcpNode(Filesystem* filesystem)
279 : SocketNode(filesystem),
280 emitter_(new TcpEventEmitter(kDefaultFifoSize, kDefaultFifoSize)),
281 tcp_nodelay_(false) {
282 emitter_->AttachStream(this);
285 TcpNode::TcpNode(Filesystem* filesystem, PP_Resource socket)
286 : SocketNode(filesystem, socket),
287 emitter_(new TcpEventEmitter(kDefaultFifoSize, kDefaultFifoSize)),
288 tcp_nodelay_(false) {
289 emitter_->AttachStream(this);
292 void TcpNode::Destroy() {
293 emitter_->DetachStream();
294 SocketNode::Destroy();
297 Error TcpNode::Init(int open_flags) {
298 Error err = SocketNode::Init(open_flags);
299 if (err != 0)
300 return err;
302 if (TCPInterface() == NULL) {
303 LOG_ERROR("Got NULL interface: TCP");
304 return EACCES;
307 if (socket_resource_ != 0) {
308 // TCP sockets that are contructed with an existing socket_resource_
309 // are those that generated from calls to Accept() and therefore are
310 // already connected.
311 remote_addr_ = TCPInterface()->GetRemoteAddress(socket_resource_);
312 ConnectDone_Locked();
313 } else {
314 socket_resource_ =
315 TCPInterface()->Create(filesystem_->ppapi()->GetInstance());
316 if (0 == socket_resource_) {
317 LOG_ERROR("Unable to create TCP resource.");
318 return EACCES;
320 SetStreamFlags(SSF_CAN_CONNECT);
323 return 0;
326 EventEmitter* TcpNode::GetEventEmitter() {
327 return emitter_.get();
330 void TcpNode::SetError_Locked(int pp_error_num) {
331 SocketNode::SetError_Locked(pp_error_num);
332 emitter_->SetError_Locked();
335 Error TcpNode::GetSockOpt(int lvl, int optname, void* optval, socklen_t* len) {
336 if (lvl == IPPROTO_TCP && optname == TCP_NODELAY) {
337 AUTO_LOCK(node_lock_);
338 int value = tcp_nodelay_;
339 socklen_t value_len = static_cast<socklen_t>(sizeof(value));
340 int copy_bytes = std::min(value_len, *len);
341 memcpy(optval, &value, copy_bytes);
342 *len = value_len;
343 return 0;
346 return SocketNode::GetSockOpt(lvl, optname, optval, len);
349 Error TcpNode::SetNoDelay_Locked() {
350 if (!IsConnected())
351 return 0;
353 int32_t error =
354 TCPInterface()->SetOption(socket_resource_,
355 PP_TCPSOCKET_OPTION_NO_DELAY,
356 PP_MakeBool(tcp_nodelay_ ? PP_TRUE : PP_FALSE),
357 PP_BlockUntilComplete());
358 return PPErrorToErrno(error);
361 Error TcpNode::SetSockOpt(int lvl,
362 int optname,
363 const void* optval,
364 socklen_t len) {
365 if (lvl == IPPROTO_TCP && optname == TCP_NODELAY) {
366 if (static_cast<size_t>(len) < sizeof(int))
367 return EINVAL;
368 AUTO_LOCK(node_lock_);
369 tcp_nodelay_ = *static_cast<const int*>(optval) != 0;
370 return SetNoDelay_Locked();
371 } else if (lvl == SOL_SOCKET && optname == SO_RCVBUF) {
372 if (static_cast<size_t>(len) < sizeof(int))
373 return EINVAL;
374 AUTO_LOCK(node_lock_);
375 int bufsize = *static_cast<const int*>(optval);
376 int32_t error =
377 TCPInterface()->SetOption(socket_resource_,
378 PP_TCPSOCKET_OPTION_RECV_BUFFER_SIZE,
379 PP_MakeInt32(bufsize),
380 PP_BlockUntilComplete());
381 return PPErrorToErrno(error);
382 } else if (lvl == SOL_SOCKET && optname == SO_SNDBUF) {
383 if (static_cast<size_t>(len) < sizeof(int))
384 return EINVAL;
385 AUTO_LOCK(node_lock_);
386 int bufsize = *static_cast<const int*>(optval);
387 int32_t error =
388 TCPInterface()->SetOption(socket_resource_,
389 PP_TCPSOCKET_OPTION_SEND_BUFFER_SIZE,
390 PP_MakeInt32(bufsize),
391 PP_BlockUntilComplete());
392 return PPErrorToErrno(error);
395 return SocketNode::SetSockOpt(lvl, optname, optval, len);
398 void TcpNode::QueueAccept() {
399 StreamFs::Work* work = new TCPAcceptWork(stream(), emitter_);
400 stream()->EnqueueWork(work);
403 void TcpNode::QueueConnect() {
404 StreamFs::Work* work = new TCPConnectWork(stream(), emitter_);
405 stream()->EnqueueWork(work);
408 void TcpNode::QueueInput() {
409 if (TestStreamFlags(SSF_RECV_ENDOFSTREAM))
410 return;
412 StreamFs::Work* work = new TcpRecvWork(emitter_);
413 stream()->EnqueueWork(work);
416 void TcpNode::QueueOutput() {
417 if (TestStreamFlags(SSF_SENDING))
418 return;
420 if (!TestStreamFlags(SSF_CAN_SEND))
421 return;
423 if (0 == emitter_->BytesInOutputFIFO())
424 return;
426 StreamFs::Work* work = new TcpSendWork(emitter_, ScopedSocketNode(this));
427 stream()->EnqueueWork(work);
430 Error TcpNode::Accept(const HandleAttr& attr,
431 PP_Resource* out_sock,
432 struct sockaddr* addr,
433 socklen_t* len) {
434 EventListenerLock wait(GetEventEmitter());
436 if (!TestStreamFlags(SSF_LISTENING))
437 return EINVAL;
439 // Either block forever or not at all
440 int ms = attr.IsBlocking() ? -1 : 0;
442 Error err = wait.WaitOnEvent(POLLIN, ms);
443 if (ETIMEDOUT == err)
444 return EWOULDBLOCK;
446 int s = emitter_->GetAcceptedSocket_Locked();
447 // Non-blocking case.
448 if (s == 0)
449 return EAGAIN;
451 // Consume the new socket and start listening for the next one
452 *out_sock = s;
453 emitter_->ClearEvents_Locked(POLLIN);
455 // Set the out paramaters
456 if (addr && len) {
457 PP_Resource remote_addr = TCPInterface()->GetRemoteAddress(*out_sock);
458 *len = ResourceToSockAddr(remote_addr, *len, addr);
459 filesystem_->ppapi()->ReleaseResource(remote_addr);
462 QueueAccept();
463 return 0;
466 // We can not bind a client socket with PPAPI. For now we ignore the
467 // bind but report the correct address later, just in case someone is
468 // binding without really caring what the address is (for example to
469 // select a more optimized interface/route.)
470 Error TcpNode::Bind(const struct sockaddr* addr, socklen_t len) {
471 AUTO_LOCK(node_lock_);
473 /* Only bind once. */
474 if (IsBound())
475 return EINVAL;
477 local_addr_ = SockAddrToResource(addr, len);
478 int err = TCPInterface()->Bind(
479 socket_resource_, local_addr_, PP_BlockUntilComplete());
481 // If we fail, release the local addr resource
482 if (err != PP_OK) {
483 filesystem_->ppapi()->ReleaseResource(local_addr_);
484 local_addr_ = 0;
485 return PPErrorToErrno(err);
488 local_addr_ = TCPInterface()->GetLocalAddress(socket_resource_);
489 return 0;
492 Error TcpNode::Connect(const HandleAttr& attr,
493 const struct sockaddr* addr,
494 socklen_t len) {
495 EventListenerLock wait(GetEventEmitter());
497 if (TestStreamFlags(SSF_CONNECTING))
498 return EALREADY;
500 if (IsConnected())
501 return EISCONN;
503 remote_addr_ = SockAddrToResource(addr, len);
504 if (0 == remote_addr_)
505 return EINVAL;
507 int ms = attr.IsBlocking() ? -1 : 0;
509 SetStreamFlags(SSF_CONNECTING);
510 QueueConnect();
512 Error err = wait.WaitOnEvent(POLLOUT, ms);
513 if (ETIMEDOUT == err)
514 return EINPROGRESS;
516 // If we fail, release the dest addr resource
517 if (err != 0) {
518 ConnectFailed_Locked();
519 return err;
522 // Make sure the connection succeeded.
523 if (last_errno_ != 0) {
524 ConnectFailed_Locked();
525 return last_errno_;
528 ConnectDone_Locked();
529 return 0;
532 Error TcpNode::Shutdown(int how) {
533 AUTO_LOCK(node_lock_);
534 if (!IsConnected())
535 return ENOTCONN;
538 AUTO_LOCK(emitter_->GetLock());
539 emitter_->SetError_Locked();
541 return 0;
544 void TcpNode::ConnectDone_Locked() {
545 local_addr_ = TCPInterface()->GetLocalAddress(socket_resource_);
547 // Now that we are connected, we can start sending and receiving.
548 ClearStreamFlags(SSF_CONNECTING | SSF_CAN_CONNECT);
549 SetStreamFlags(SSF_CAN_SEND | SSF_CAN_RECV);
551 emitter_->ConnectDone_Locked();
553 // The NODELAY option cannot be set in PPAPI before the socket
554 // is connected, but setsockopt() might have already set it.
555 SetNoDelay_Locked();
557 // Begin the input pump
558 QueueInput();
561 void TcpNode::ConnectFailed_Locked() {
562 filesystem_->ppapi()->ReleaseResource(remote_addr_);
563 remote_addr_ = 0;
566 Error TcpNode::Listen(int backlog) {
567 AUTO_LOCK(node_lock_);
568 if (!IsBound())
569 return EINVAL;
571 int err = TCPInterface()->Listen(
572 socket_resource_, backlog, PP_BlockUntilComplete());
573 if (err != PP_OK)
574 return PPErrorToErrno(err);
576 ClearStreamFlags(SSF_CAN_CONNECT);
577 SetStreamFlags(SSF_LISTENING);
578 emitter_->SetListening_Locked();
579 QueueAccept();
580 return 0;
583 Error TcpNode::Recv_Locked(void* buf,
584 size_t len,
585 PP_Resource* out_addr,
586 int* out_len) {
587 assert(emitter_.get());
588 *out_len = emitter_->ReadIn_Locked((char*)buf, len);
589 *out_addr = remote_addr_;
591 // Ref the address copy we pass back.
592 filesystem_->ppapi()->AddRefResource(remote_addr_);
593 return 0;
596 // TCP ignores dst addr passed to send_to, and always uses bound address
597 Error TcpNode::Send_Locked(const void* buf,
598 size_t len,
599 PP_Resource,
600 int* out_len) {
601 assert(emitter_.get());
602 if (emitter_->GetError_Locked())
603 return EPIPE;
604 *out_len = emitter_->WriteOut_Locked((char*)buf, len);
605 return 0;
608 } // namespace nacl_io
610 #endif // PROVIDES_SOCKET_API