2 * Copyright 2006-2010, Haiku, Inc. All Rights Reserved.
3 * Distributed under the terms of the MIT License.
6 * Andrew Galante, haiku.galante@gmail.com
7 * Axel Dörfler, axeld@pinc-software.de
8 * Hugo Santos, hugosantos@gmail.com
12 #include "TCPEndpoint.h"
14 #include <netinet/in.h>
15 #include <netinet/ip.h>
16 #include <netinet/tcp.h>
22 #include <KernelExport.h>
25 #include <net_buffer.h>
26 #include <net_datalink.h>
28 #include <NetBufferUtilities.h>
29 #include <NetUtilities.h>
33 #include <util/AutoLock.h>
34 #include <util/list.h>
36 #include "EndpointManager.h"
40 // - RFC 793 - Transmission Control Protocol
41 // - RFC 813 - Window and Acknowledgement Strategy in TCP
42 // - RFC 1337 - TIME_WAIT Assassination Hazards in TCP
44 // Things this implementation currently doesn't implement:
45 // - TCP Slow Start, Congestion Avoidance, Fast Retransmit, and Fast Recovery,
46 // RFC 2001, RFC 2581, RFC 3042
47 // - NewReno Modification to TCP's Fast Recovery, RFC 2582
48 // - Explicit Congestion Notification (ECN), RFC 3168
50 // - TCP Extensions for High Performance, RFC 1323
51 // - SACK, Selective Acknowledgment - RFC 2018, RFC 2883, RFC 3517
52 // - Forward RTO-Recovery, RFC 4138
53 // - Time-Wait hash instead of keeping sockets alive
55 #define PrintAddress(address) \
56 AddressString(Domain(), address, true).Data()
62 // the space before ', ##args' is important in order for this to work with cpp 2.95
63 # define TRACE(format, args...) dprintf("%" B_PRId32 ": TCP [%" \
64 B_PRIdBIGTIME "] %p (%12s) " format "\n", find_thread(NULL), \
65 system_time(), this, name_for_state(fState) , ##args)
67 # define TRACE(args...) do { } while (0)
71 # define PROBE(buffer, window) \
72 dprintf("TCP PROBE %" B_PRIdBIGTIME " %s %s %" B_PRIu32 " snxt %" B_PRIu32 \
73 " suna %" B_PRIu32 " cw %" B_PRIu32 " sst %" B_PRIu32 " win %" \
74 B_PRIu32 " swin %" B_PRIu32 " smax-suna %" B_PRIu32 " savail %" \
75 B_PRIuSIZE " sqused %" B_PRIuSIZE " rto %" B_PRIdBIGTIME "\n", \
76 system_time(), PrintAddress(buffer->source), \
77 PrintAddress(buffer->destination), buffer->size, fSendNext.Number(), \
78 fSendUnacknowledged.Number(), fCongestionWindow, fSlowStartThreshold, \
79 window, fSendWindow, (fSendMax - fSendUnacknowledged).Number(), \
80 fSendQueue.Available(fSendNext), fSendQueue.Used(), fRetransmitTimeout)
82 # define PROBE(buffer, window) do { } while (0)
86 namespace TCPTracing
{
88 class Receive
: public AbstractTraceEntry
{
90 Receive(TCPEndpoint
* endpoint
, tcp_segment_header
& segment
, uint32 window
,
95 fBufferSize(buffer
->size
),
96 fSequence(segment
.sequence
),
97 fAcknowledge(segment
.acknowledge
),
99 fState(endpoint
->State()),
100 fFlags(segment
.flags
)
105 virtual void AddDump(TraceOutput
& out
)
107 out
.Print("tcp:%p (%12s) receive buffer %p (%" B_PRIu32
" bytes), "
108 "flags %#" B_PRIx8
", seq %" B_PRIu32
", ack %" B_PRIu32
109 ", wnd %" B_PRIu32
, fEndpoint
, name_for_state(fState
), fBuffer
,
110 fBufferSize
, fFlags
, fSequence
, fAcknowledge
, fWindow
);
114 TCPEndpoint
* fEndpoint
;
124 class Send
: public AbstractTraceEntry
{
126 Send(TCPEndpoint
* endpoint
, tcp_segment_header
& segment
, net_buffer
* buffer
,
127 tcp_sequence firstSequence
, tcp_sequence lastSequence
)
131 fBufferSize(buffer
->size
),
132 fSequence(segment
.sequence
),
133 fAcknowledge(segment
.acknowledge
),
134 fFirstSequence(firstSequence
.Number()),
135 fLastSequence(lastSequence
.Number()),
136 fState(endpoint
->State()),
137 fFlags(segment
.flags
)
142 virtual void AddDump(TraceOutput
& out
)
144 out
.Print("tcp:%p (%12s) send buffer %p (%" B_PRIu32
" bytes), "
145 "flags %#" B_PRIx8
", seq %" B_PRIu32
", ack %" B_PRIu32
146 ", first %" B_PRIu32
", last %" B_PRIu32
, fEndpoint
,
147 name_for_state(fState
), fBuffer
, fBufferSize
, fFlags
, fSequence
,
148 fAcknowledge
, fFirstSequence
, fLastSequence
);
152 TCPEndpoint
* fEndpoint
;
157 uint32 fFirstSequence
;
158 uint32 fLastSequence
;
163 class State
: public AbstractTraceEntry
{
165 State(TCPEndpoint
* endpoint
)
168 fState(endpoint
->State())
173 virtual void AddDump(TraceOutput
& out
)
175 out
.Print("tcp:%p (%12s) state change", fEndpoint
,
176 name_for_state(fState
));
180 TCPEndpoint
* fEndpoint
;
184 class Spawn
: public AbstractTraceEntry
{
186 Spawn(TCPEndpoint
* listeningEndpoint
, TCPEndpoint
* spawnedEndpoint
)
188 fListeningEndpoint(listeningEndpoint
),
189 fSpawnedEndpoint(spawnedEndpoint
)
194 virtual void AddDump(TraceOutput
& out
)
196 out
.Print("tcp:%p spawns %p", fListeningEndpoint
, fSpawnedEndpoint
);
200 TCPEndpoint
* fListeningEndpoint
;
201 TCPEndpoint
* fSpawnedEndpoint
;
204 class Error
: public AbstractTraceEntry
{
206 Error(TCPEndpoint
* endpoint
, const char* error
, int32 line
)
211 fState(endpoint
->State())
216 virtual void AddDump(TraceOutput
& out
)
218 out
.Print("tcp:%p (%12s) error at line %" B_PRId32
": %s", fEndpoint
,
219 name_for_state(fState
), fLine
, fError
);
223 TCPEndpoint
* fEndpoint
;
229 class TimerSet
: public AbstractTraceEntry
{
231 TimerSet(TCPEndpoint
* endpoint
, const char* which
, bigtime_t timeout
)
236 fState(endpoint
->State())
241 virtual void AddDump(TraceOutput
& out
)
243 out
.Print("tcp:%p (%12s) %s timer set to %" B_PRIdBIGTIME
, fEndpoint
,
244 name_for_state(fState
), fWhich
, fTimeout
);
248 TCPEndpoint
* fEndpoint
;
254 class TimerTriggered
: public AbstractTraceEntry
{
256 TimerTriggered(TCPEndpoint
* endpoint
, const char* which
)
260 fState(endpoint
->State())
265 virtual void AddDump(TraceOutput
& out
)
267 out
.Print("tcp:%p (%12s) %s timer triggered", fEndpoint
,
268 name_for_state(fState
), fWhich
);
272 TCPEndpoint
* fEndpoint
;
277 class APICall
: public AbstractTraceEntry
{
279 APICall(TCPEndpoint
* endpoint
, const char* which
)
283 fState(endpoint
->State())
288 virtual void AddDump(TraceOutput
& out
)
290 out
.Print("tcp:%p (%12s) api call: %s", fEndpoint
,
291 name_for_state(fState
), fWhich
);
295 TCPEndpoint
* fEndpoint
;
300 } // namespace TCPTracing
302 # define T(x) new(std::nothrow) TCPTracing::x
305 #endif // TCP_TRACING
// Initial estimate for the packet round trip time (RTT), in bigtime_t units
// (microseconds): it seeds fRetransmitTimeout directly and, divided by
// kTimestampFactor, the timestamp-based RTT estimators.
#define TCP_INITIAL_RTT 2000000

// constants for the fFlags field (each flag occupies its own bit)
enum {
	FLAG_OPTION_WINDOW_SCALE	= 0x01,
	FLAG_OPTION_TIMESTAMP		= 0x02,
	// TODO: Should FLAG_NO_RECEIVE apply as well to received connections?
	//       That is, what is expected from accept() after a shutdown()
	//       is performed on a listen()ing socket.
	FLAG_NO_RECEIVE				= 0x04,
	// set by Free() once the socket has been closed
	FLAG_CLOSED					= 0x08,
	FLAG_DELETE_ON_CLOSE		= 0x10,
	// queried through IsLocal(); local connections skip TIME_WAIT
	FLAG_LOCAL					= 0x20
};

// Divisor converting system_time() values into TCP timestamp ticks
// (1 tick == 1024 microseconds).
static const int kTimestampFactor = 1024;
327 static inline bigtime_t
328 absolute_timeout(bigtime_t timeout
)
330 if (timeout
== 0 || timeout
== B_INFINITE_TIMEOUT
)
333 return timeout
+ system_time();
337 static inline status_t
338 posix_error(status_t error
)
340 if (error
== B_TIMED_OUT
)
341 return B_WOULD_BLOCK
;
348 in_window(const tcp_sequence
& sequence
, const tcp_sequence
& receiveNext
,
349 uint32 receiveWindow
)
351 return sequence
>= receiveNext
&& sequence
< (receiveNext
+ receiveWindow
);
356 segment_in_sequence(const tcp_segment_header
& segment
, int size
,
357 const tcp_sequence
& receiveNext
, uint32 receiveWindow
)
359 tcp_sequence
sequence(segment
.sequence
);
361 if (receiveWindow
== 0)
362 return sequence
== receiveNext
;
363 return in_window(sequence
, receiveNext
, receiveWindow
);
365 if (receiveWindow
== 0)
367 return in_window(sequence
, receiveNext
, receiveWindow
)
368 || in_window(sequence
+ size
- 1, receiveNext
, receiveWindow
);
374 is_writable(tcp_state state
)
376 return state
== ESTABLISHED
|| state
== FINISH_RECEIVED
;
381 is_establishing(tcp_state state
)
383 return state
== SYNCHRONIZE_SENT
|| state
== SYNCHRONIZE_RECEIVED
;
387 static inline uint32
tcp_now()
389 return system_time() / kTimestampFactor
;
393 static inline uint32
tcp_diff_timestamp(uint32 base
)
395 uint32 now
= tcp_now();
400 return now
+ UINT_MAX
- base
;
405 state_needs_finish(int32 state
)
407 return state
== WAIT_FOR_FINISH_ACKNOWLEDGE
408 || state
== FINISH_SENT
|| state
== CLOSING
;
415 TCPEndpoint::TCPEndpoint(net_socket
* socket
)
417 ProtocolSocket(socket
),
421 fReceiveWindowShift(0),
422 fSendUnacknowledged(0),
425 fSendUrgentOffset(0),
428 fSendMaxSegmentSize(TCP_DEFAULT_MAX_SEGMENT_SIZE
),
429 fSendQueue(socket
->send
.buffer_size
),
430 fInitialSendSequence(0),
431 fDuplicateAcknowledgeCount(0),
434 fReceiveMaxAdvertised(0),
435 fReceiveWindow(socket
->receive
.buffer_size
),
436 fReceiveMaxSegmentSize(TCP_DEFAULT_MAX_SEGMENT_SIZE
),
437 fReceiveQueue(socket
->receive
.buffer_size
),
438 fRoundTripTime(TCP_INITIAL_RTT
/ kTimestampFactor
),
439 fRoundTripDeviation(TCP_INITIAL_RTT
/ kTimestampFactor
),
440 fRetransmitTimeout(TCP_INITIAL_RTT
),
441 fReceivedTimestamp(0),
442 fCongestionWindow(0),
443 fSlowStartThreshold(0),
445 fFlags(FLAG_OPTION_WINDOW_SCALE
| FLAG_OPTION_TIMESTAMP
)
447 // TODO: to be replaced with a real read/write locking strategy!
448 mutex_init(&fLock
, "tcp lock");
450 fReceiveCondition
.Init(this, "tcp receive");
451 fSendCondition
.Init(this, "tcp send");
453 gStackModule
->init_timer(&fPersistTimer
, TCPEndpoint::_PersistTimer
, this);
454 gStackModule
->init_timer(&fRetransmitTimer
, TCPEndpoint::_RetransmitTimer
,
456 gStackModule
->init_timer(&fDelayedAcknowledgeTimer
,
457 TCPEndpoint::_DelayedAcknowledgeTimer
, this);
458 gStackModule
->init_timer(&fTimeWaitTimer
, TCPEndpoint::_TimeWaitTimer
,
461 T(APICall(this, "constructor"));
465 TCPEndpoint::~TCPEndpoint()
469 T(APICall(this, "destructor"));
471 _CancelConnectionTimers();
472 gStackModule
->cancel_timer(&fTimeWaitTimer
);
473 T(TimerSet(this, "time-wait", -1));
475 if (fManager
!= NULL
) {
476 fManager
->Unbind(this);
477 put_endpoint_manager(fManager
);
480 mutex_destroy(&fLock
);
482 // we need to wait for all timers to return
483 gStackModule
->wait_for_timer(&fRetransmitTimer
);
484 gStackModule
->wait_for_timer(&fPersistTimer
);
485 gStackModule
->wait_for_timer(&fDelayedAcknowledgeTimer
);
486 gStackModule
->wait_for_timer(&fTimeWaitTimer
);
488 gDatalinkModule
->put_route(Domain(), fRoute
);
493 TCPEndpoint::InitCheck() const
499 // #pragma mark - protocol API
506 T(APICall(this, "open"));
508 status_t status
= ProtocolSocket::Open();
512 fManager
= get_endpoint_manager(Domain());
513 if (fManager
== NULL
)
523 MutexLocker
locker(fLock
);
526 T(APICall(this, "close"));
528 if (fState
== LISTEN
)
529 delete_sem(fAcceptSemaphore
);
531 if (fState
== SYNCHRONIZE_SENT
|| fState
== LISTEN
) {
532 // TODO: what about linger in case of SYNCHRONIZE_SENT?
538 status_t status
= _Disconnect(true);
542 if (socket
->options
& SO_LINGER
) {
543 TRACE("Close(): Lingering for %i secs", socket
->linger
);
545 bigtime_t maximum
= absolute_timeout(socket
->linger
* 1000000LL);
547 while (fSendQueue
.Used() > 0) {
548 status
= _WaitForCondition(fSendCondition
, locker
, maximum
);
549 if (status
== B_TIMED_OUT
|| status
== B_WOULD_BLOCK
)
551 else if (status
< B_OK
)
555 TRACE("Close(): after waiting, the SendQ was left with %" B_PRIuSIZE
556 " bytes.", fSendQueue
.Used());
565 MutexLocker
_(fLock
);
568 T(APICall(this, "free"));
570 if (fState
<= SYNCHRONIZE_SENT
)
573 // we are only interested in the timer, not in changing state
576 fFlags
|= FLAG_CLOSED
;
577 if ((fFlags
& FLAG_DELETE_ON_CLOSE
) == 0) {
578 // we'll be freed later when the 2MSL timer expires
579 gSocketModule
->acquire_socket(socket
);
584 /*! Creates and sends a synchronize packet to /a address, and then waits
585 until the connection has been established or refused.
588 TCPEndpoint::Connect(const sockaddr
* address
)
590 if (!AddressModule()->is_same_family(address
))
593 MutexLocker
locker(fLock
);
595 TRACE("Connect() on address %s", PrintAddress(address
));
596 T(APICall(this, "connect"));
598 if (gStackModule
->is_restarted_syscall()) {
599 bigtime_t timeout
= gStackModule
->restore_syscall_restart_timeout();
600 status_t status
= _WaitForEstablished(locker
, timeout
);
601 TRACE(" Connect(): Connection complete: %s (timeout was %"
602 B_PRIdBIGTIME
")", strerror(status
), timeout
);
603 return posix_error(status
);
606 // Can only call connect() from CLOSED or LISTEN states
607 // otherwise endpoint is considered already connected
608 if (fState
== LISTEN
) {
609 // this socket is about to connect; remove pending connections in the backlog
610 gSocketModule
->set_max_backlog(socket
, 0);
611 } else if (fState
== ESTABLISHED
) {
613 } else if (fState
!= CLOSED
)
616 // consider destination address INADDR_ANY as INADDR_LOOPBACK
617 sockaddr_storage _address
;
618 if (AddressModule()->is_empty_address(address
, false)) {
619 AddressModule()->get_loopback_address((sockaddr
*)&_address
);
620 // for IPv4 and IPv6 the port is at the same offset
621 ((sockaddr_in
&)_address
).sin_port
= ((sockaddr_in
*)address
)->sin_port
;
622 address
= (sockaddr
*)&_address
;
625 status_t status
= _PrepareSendPath(address
);
629 TRACE(" Connect(): starting 3-way handshake...");
631 fState
= SYNCHRONIZE_SENT
;
635 status
= _SendQueued();
636 if (status
!= B_OK
) {
641 // If we are running over Loopback, after _SendQueued() returns we
642 // may be in ESTABLISHED already.
643 if (fState
== ESTABLISHED
) {
644 TRACE(" Connect() completed after _SendQueued()");
648 // wait until 3-way handshake is complete (if needed)
649 bigtime_t timeout
= min_c(socket
->send
.timeout
, TCP_CONNECTION_TIMEOUT
);
651 // we're a non-blocking socket
652 TRACE(" Connect() delayed, return EINPROGRESS");
656 bigtime_t absoluteTimeout
= absolute_timeout(timeout
);
657 gStackModule
->store_syscall_restart_timeout(absoluteTimeout
);
659 status
= _WaitForEstablished(locker
, absoluteTimeout
);
660 TRACE(" Connect(): Connection complete: %s (timeout was %" B_PRIdBIGTIME
661 ")", strerror(status
), timeout
);
662 return posix_error(status
);
667 TCPEndpoint::Accept(struct net_socket
** _acceptedSocket
)
669 MutexLocker
locker(fLock
);
672 T(APICall(this, "accept"));
675 bigtime_t timeout
= absolute_timeout(socket
->receive
.timeout
);
676 if (gStackModule
->is_restarted_syscall())
677 timeout
= gStackModule
->restore_syscall_restart_timeout();
679 gStackModule
->store_syscall_restart_timeout(timeout
);
684 status
= acquire_sem_etc(fAcceptSemaphore
, 1, B_ABSOLUTE_TIMEOUT
685 | B_CAN_INTERRUPT
, timeout
);
686 if (status
!= B_OK
) {
687 if (status
== B_TIMED_OUT
&& socket
->receive
.timeout
== 0)
688 return B_WOULD_BLOCK
;
694 status
= gSocketModule
->dequeue_connected(socket
, _acceptedSocket
);
697 TRACE(" Accept() returning %p", (*_acceptedSocket
)->first_protocol
);
699 } while (status
!= B_OK
);
706 TCPEndpoint::Bind(const sockaddr
*address
)
711 MutexLocker
lock(fLock
);
713 TRACE("Bind() on address %s", PrintAddress(address
));
714 T(APICall(this, "bind"));
716 if (fState
!= CLOSED
)
719 return fManager
->Bind(this, address
);
724 TCPEndpoint::Unbind(struct sockaddr
*address
)
726 MutexLocker
_(fLock
);
729 T(APICall(this, "unbind"));
731 return fManager
->Unbind(this);
736 TCPEndpoint::Listen(int count
)
738 MutexLocker
_(fLock
);
741 T(APICall(this, "listen"));
743 if (fState
!= CLOSED
&& fState
!= LISTEN
)
746 if (fState
== CLOSED
) {
747 fAcceptSemaphore
= create_sem(0, "tcp accept");
748 if (fAcceptSemaphore
< B_OK
)
751 status_t status
= fManager
->SetPassive(this);
752 if (status
!= B_OK
) {
753 delete_sem(fAcceptSemaphore
);
754 fAcceptSemaphore
= -1;
759 gSocketModule
->set_max_backlog(socket
, count
);
768 TCPEndpoint::Shutdown(int direction
)
770 MutexLocker
lock(fLock
);
772 TRACE("Shutdown(%i)", direction
);
773 T(APICall(this, "shutdown"));
775 if (direction
== SHUT_RD
|| direction
== SHUT_RDWR
)
776 fFlags
|= FLAG_NO_RECEIVE
;
778 if (direction
== SHUT_WR
|| direction
== SHUT_RDWR
) {
779 // TODO: That's not correct. After read/write shutting down the socket
780 // one should still be able to read previously arrived data.
788 /*! Puts data contained in \a buffer into send buffer */
790 TCPEndpoint::SendData(net_buffer
*buffer
)
792 MutexLocker
lock(fLock
);
794 TRACE("SendData(buffer %p, size %" B_PRIu32
", flags %#" B_PRIx32
795 ") [total %" B_PRIuSIZE
" bytes, has %" B_PRIuSIZE
"]", buffer
,
796 buffer
->size
, buffer
->flags
, fSendQueue
.Size(), fSendQueue
.Free());
797 T(APICall(this, "senddata"));
799 uint32 flags
= buffer
->flags
;
801 if (fState
== CLOSED
)
803 if (fState
== LISTEN
)
805 if (!is_writable(fState
) && !is_establishing(fState
)) {
806 // we only send signals when called from userland
807 if (gStackModule
->is_syscall() && (flags
& MSG_NOSIGNAL
) == 0)
808 send_signal(find_thread(NULL
), SIGPIPE
);
812 size_t left
= buffer
->size
;
814 bigtime_t timeout
= absolute_timeout(socket
->send
.timeout
);
815 if (gStackModule
->is_restarted_syscall())
816 timeout
= gStackModule
->restore_syscall_restart_timeout();
818 gStackModule
->store_syscall_restart_timeout(timeout
);
821 while (fSendQueue
.Free() < socket
->send
.low_water_mark
) {
822 // wait until enough space is available
823 status_t status
= _WaitForCondition(fSendCondition
, lock
, timeout
);
825 TRACE(" SendData() returning %s (%d)",
826 strerror(posix_error(status
)), (int)posix_error(status
));
827 return posix_error(status
);
830 if (!is_writable(fState
) && !is_establishing(fState
)) {
831 // we only send signals when called from userland
832 if (gStackModule
->is_syscall())
833 send_signal(find_thread(NULL
), SIGPIPE
);
838 size_t size
= fSendQueue
.Free();
840 // we need to split the original buffer
841 net_buffer
* clone
= gBufferModule
->clone(buffer
, false);
842 // TODO: add offset/size parameter to net_buffer::clone() or
843 // even a move_data() function, as this is a bit inefficient
847 status_t status
= gBufferModule
->trim(clone
, size
);
848 if (status
!= B_OK
) {
849 gBufferModule
->free(clone
);
853 gBufferModule
->remove_header(buffer
, size
);
855 fSendQueue
.Add(clone
);
857 left
-= buffer
->size
;
858 fSendQueue
.Add(buffer
);
862 TRACE(" SendData(): %" B_PRIuSIZE
" bytes used.", fSendQueue
.Used());
865 if ((flags
& MSG_OOB
) != 0) {
866 fSendUrgentOffset
= fSendQueue
.LastSequence();
867 // RFC 961 specifies that the urgent offset points to the last
868 // byte of urgent data. However, this is commonly implemented as
869 // here, ie. it points to the first byte after the urgent data.
872 if ((flags
& MSG_EOF
) != 0)
875 if (fState
== ESTABLISHED
|| fState
== FINISH_RECEIVED
)
883 TCPEndpoint::SendAvailable()
885 MutexLocker
locker(fLock
);
889 if (is_writable(fState
))
890 available
= fSendQueue
.Free();
891 else if (is_establishing(fState
))
896 TRACE("SendAvailable(): %" B_PRIdSSIZE
, available
);
897 T(APICall(this, "sendavailable"));
903 TCPEndpoint::FillStat(net_stat
*stat
)
905 MutexLocker
_(fLock
);
907 strlcpy(stat
->state
, name_for_state(fState
), sizeof(stat
->state
));
908 stat
->receive_queue_size
= fReceiveQueue
.Available();
909 stat
->send_queue_size
= fSendQueue
.Used();
916 TCPEndpoint::ReadData(size_t numBytes
, uint32 flags
, net_buffer
** _buffer
)
918 MutexLocker
locker(fLock
);
920 TRACE("ReadData(%" B_PRIuSIZE
" bytes, flags %#" B_PRIx32
")", numBytes
,
922 T(APICall(this, "readdata"));
926 if (fState
== CLOSED
)
929 bigtime_t timeout
= absolute_timeout(socket
->receive
.timeout
);
930 if (gStackModule
->is_restarted_syscall())
931 timeout
= gStackModule
->restore_syscall_restart_timeout();
933 gStackModule
->store_syscall_restart_timeout(timeout
);
935 if (fState
== SYNCHRONIZE_SENT
|| fState
== SYNCHRONIZE_RECEIVED
) {
936 if (flags
& MSG_DONTWAIT
)
937 return B_WOULD_BLOCK
;
939 status_t status
= _WaitForEstablished(locker
, timeout
);
941 return posix_error(status
);
944 size_t dataNeeded
= socket
->receive
.low_water_mark
;
946 // When MSG_WAITALL is set then the function should block
947 // until the full amount of data can be returned.
948 if (flags
& MSG_WAITALL
)
949 dataNeeded
= numBytes
;
951 // TODO: add support for urgent data (MSG_OOB)
954 if (fState
== CLOSING
|| fState
== WAIT_FOR_FINISH_ACKNOWLEDGE
955 || fState
== TIME_WAIT
) {
956 // ``Connection closing''.
960 if (fReceiveQueue
.Available() > 0) {
961 if (fReceiveQueue
.Available() >= dataNeeded
962 || (fReceiveQueue
.PushedData() > 0
963 && fReceiveQueue
.PushedData() >= fReceiveQueue
.Available()))
965 } else if (fState
== FINISH_RECEIVED
) {
966 // ``If no text is awaiting delivery, the RECEIVE will
967 // get a Connection closing''.
971 if ((flags
& MSG_DONTWAIT
) != 0 || socket
->receive
.timeout
== 0)
972 return B_WOULD_BLOCK
;
974 if ((fFlags
& FLAG_NO_RECEIVE
) != 0)
977 status_t status
= _WaitForCondition(fReceiveCondition
, locker
, timeout
);
979 // The Open Group base specification mentions that EINTR should be
980 // returned if the recv() is interrupted before _any data_ is
981 // available. So we actually check if there is data, and if so,
982 // push it to the user.
983 if ((status
== B_TIMED_OUT
|| status
== B_INTERRUPTED
)
984 && fReceiveQueue
.Available() > 0)
987 return posix_error(status
);
991 TRACE(" ReadData(): %" B_PRIuSIZE
" are available.",
992 fReceiveQueue
.Available());
994 if (numBytes
< fReceiveQueue
.Available())
995 fReceiveCondition
.NotifyAll();
997 bool clone
= (flags
& MSG_PEEK
) != 0;
999 ssize_t receivedBytes
= fReceiveQueue
.Get(numBytes
, !clone
, _buffer
);
1001 TRACE(" ReadData(): %" B_PRIuSIZE
" bytes kept.",
1002 fReceiveQueue
.Available());
1004 // if we are opening the window, check if we should send an ACK
1006 SendAcknowledge(false);
1008 return receivedBytes
;
1013 TCPEndpoint::ReadAvailable()
1015 MutexLocker
locker(fLock
);
1017 TRACE("ReadAvailable(): %" B_PRIdSSIZE
, _AvailableData());
1018 T(APICall(this, "readavailable"));
1020 return _AvailableData();
1025 TCPEndpoint::SetSendBufferSize(size_t length
)
1027 MutexLocker
_(fLock
);
1028 fSendQueue
.SetMaxBytes(length
);
1034 TCPEndpoint::SetReceiveBufferSize(size_t length
)
1036 MutexLocker
_(fLock
);
1037 fReceiveQueue
.SetMaxBytes(length
);
1043 TCPEndpoint::GetOption(int option
, void* _value
, int* _length
)
1045 if (*_length
!= sizeof(int))
1048 int* value
= (int*)_value
;
1052 if ((fOptions
& TCP_NODELAY
) != 0)
1059 *value
= fReceiveMaxSegmentSize
;
1069 TCPEndpoint::SetOption(int option
, const void* _value
, int length
)
1071 if (option
!= TCP_NODELAY
)
1074 if (length
!= sizeof(int))
1077 const int* value
= (const int*)_value
;
1079 MutexLocker
_(fLock
);
1081 fOptions
|= TCP_NODELAY
;
1083 fOptions
&= ~TCP_NODELAY
;
1089 // #pragma mark - misc
1093 TCPEndpoint::IsBound() const
1095 return !LocalAddress().IsEmpty(true);
1100 TCPEndpoint::IsLocal() const
1102 return (fFlags
& FLAG_LOCAL
) != 0;
1107 TCPEndpoint::DelayedAcknowledge()
1109 if (gStackModule
->cancel_timer(&fDelayedAcknowledgeTimer
)) {
1110 // timer was active, send an ACK now (with the exception above,
1111 // we send every other ACK)
1112 T(TimerSet(this, "delayed ack", -1));
1113 return SendAcknowledge(true);
1116 gStackModule
->set_timer(&fDelayedAcknowledgeTimer
,
1117 TCP_DELAYED_ACKNOWLEDGE_TIMEOUT
);
1118 T(TimerSet(this, "delayed ack", TCP_DELAYED_ACKNOWLEDGE_TIMEOUT
));
1124 TCPEndpoint::SendAcknowledge(bool force
)
1126 return _SendQueued(force
, 0);
1131 TCPEndpoint::_StartPersistTimer()
1133 gStackModule
->set_timer(&fPersistTimer
, TCP_PERSIST_TIMEOUT
);
1134 T(TimerSet(this, "persist", TCP_PERSIST_TIMEOUT
));
1139 TCPEndpoint::_EnterTimeWait()
1141 TRACE("_EnterTimeWait()\n");
1143 _CancelConnectionTimers();
1145 if (fState
== TIME_WAIT
&& IsLocal()) {
1146 // we do not use TIME_WAIT state for local connections
1147 fFlags
|= FLAG_DELETE_ON_CLOSE
;
1156 TCPEndpoint::_UpdateTimeWait()
1158 gStackModule
->set_timer(&fTimeWaitTimer
, TCP_MAX_SEGMENT_LIFETIME
<< 1);
1159 T(TimerSet(this, "time-wait", TCP_MAX_SEGMENT_LIFETIME
<< 1));
1164 TCPEndpoint::_CancelConnectionTimers()
1166 gStackModule
->cancel_timer(&fRetransmitTimer
);
1167 T(TimerSet(this, "retransmit", -1));
1168 gStackModule
->cancel_timer(&fPersistTimer
);
1169 T(TimerSet(this, "persist", -1));
1170 gStackModule
->cancel_timer(&fDelayedAcknowledgeTimer
);
1171 T(TimerSet(this, "delayed ack", -1));
1175 /*! Sends the FIN flag to the peer when the connection is still open.
1176 Moves the endpoint to the next state depending on where it was.
1179 TCPEndpoint::_Disconnect(bool closing
)
1181 tcp_state previousState
= fState
;
1183 if (fState
== SYNCHRONIZE_RECEIVED
|| fState
== ESTABLISHED
)
1184 fState
= FINISH_SENT
;
1185 else if (fState
== FINISH_RECEIVED
)
1186 fState
= WAIT_FOR_FINISH_ACKNOWLEDGE
;
1192 status_t status
= _SendQueued();
1193 if (status
!= B_OK
) {
1194 fState
= previousState
;
1204 TCPEndpoint::_MarkEstablished()
1206 fState
= ESTABLISHED
;
1209 if (gSocketModule
->has_parent(socket
)) {
1210 gSocketModule
->set_connected(socket
);
1211 release_sem_etc(fAcceptSemaphore
, 1, B_DO_NOT_RESCHEDULE
);
1214 fSendCondition
.NotifyAll();
1215 gSocketModule
->notify(socket
, B_SELECT_WRITE
, fSendQueue
.Free());
1220 TCPEndpoint::_WaitForEstablished(MutexLocker
&locker
, bigtime_t timeout
)
1222 // TODO: Checking for CLOSED seems correct, but breaks several neon tests.
1223 // When investigating this, also have a look at _Close() and _HandleReset().
1224 while (fState
< ESTABLISHED
/* && fState != CLOSED*/) {
1225 if (socket
->error
!= B_OK
)
1226 return socket
->error
;
1228 status_t status
= _WaitForCondition(fSendCondition
, locker
, timeout
);
1237 // #pragma mark - receive
1241 TCPEndpoint::_Close()
1243 _CancelConnectionTimers();
1247 fFlags
|= FLAG_DELETE_ON_CLOSE
;
1249 fSendCondition
.NotifyAll();
1252 if (gSocketModule
->has_parent(socket
)) {
1253 // We still have a parent - obviously, we haven't been accepted yet,
1254 // so no one could ever close us.
1255 _CancelConnectionTimers();
1256 gSocketModule
->set_aborted(socket
);
1262 TCPEndpoint::_HandleReset(status_t error
)
1264 socket
->error
= error
;
1267 gSocketModule
->notify(socket
, B_SELECT_WRITE
, error
);
1268 gSocketModule
->notify(socket
, B_SELECT_ERROR
, error
);
1273 TCPEndpoint::_DuplicateAcknowledge(tcp_segment_header
&segment
)
1275 if (++fDuplicateAcknowledgeCount
< 3)
1278 if (fDuplicateAcknowledgeCount
== 3) {
1280 fCongestionWindow
= fSlowStartThreshold
+ 3
1281 * fSendMaxSegmentSize
;
1282 fSendNext
= segment
.acknowledge
;
1283 } else if (fDuplicateAcknowledgeCount
> 3)
1284 fCongestionWindow
+= fSendMaxSegmentSize
;
1291 TCPEndpoint::_UpdateTimestamps(tcp_segment_header
& segment
,
1292 size_t segmentLength
)
1294 if (fFlags
& FLAG_OPTION_TIMESTAMP
) {
1295 tcp_sequence
sequence(segment
.sequence
);
1297 if (fLastAcknowledgeSent
>= sequence
1298 && fLastAcknowledgeSent
< (sequence
+ segmentLength
))
1299 fReceivedTimestamp
= segment
.timestamp_value
;
1305 TCPEndpoint::_AvailableData() const
1307 // TODO: Refer to the FLAG_NO_RECEIVE comment above regarding
1308 // the application of FLAG_NO_RECEIVE in listen()ing
1310 if (fState
== LISTEN
)
1311 return gSocketModule
->count_connected(socket
);
1312 if (fState
== SYNCHRONIZE_SENT
)
1315 ssize_t availableData
= fReceiveQueue
.Available();
1317 if (availableData
== 0 && !_ShouldReceive())
1320 return availableData
;
1325 TCPEndpoint::_NotifyReader()
1327 fReceiveCondition
.NotifyAll();
1328 gSocketModule
->notify(socket
, B_SELECT_READ
, _AvailableData());
1333 TCPEndpoint::_AddData(tcp_segment_header
& segment
, net_buffer
* buffer
)
1335 if ((segment
.flags
& TCP_FLAG_FINISH
) != 0) {
1336 // Remember the position of the finish received flag
1337 fFinishReceived
= true;
1338 fFinishReceivedAt
= segment
.sequence
+ buffer
->size
;
1341 fReceiveQueue
.Add(buffer
, segment
.sequence
);
1342 fReceiveNext
= fReceiveQueue
.NextSequence();
1344 if (fFinishReceived
) {
1345 // Set or reset the finish flag on the current segment
1346 if (fReceiveNext
< fFinishReceivedAt
)
1347 segment
.flags
&= ~TCP_FLAG_FINISH
;
1349 segment
.flags
|= TCP_FLAG_FINISH
;
1352 TRACE(" _AddData(): adding data, receive next = %" B_PRIu32
". Now have %"
1353 B_PRIuSIZE
" bytes.", fReceiveNext
.Number(), fReceiveQueue
.Available());
1355 if ((segment
.flags
& TCP_FLAG_PUSH
) != 0)
1356 fReceiveQueue
.SetPushPointer();
1358 return fReceiveQueue
.Available() > 0;
1363 TCPEndpoint::_PrepareReceivePath(tcp_segment_header
& segment
)
1365 fInitialReceiveSequence
= segment
.sequence
;
1366 fFinishReceived
= false;
1368 // count the received SYN
1371 fReceiveNext
= segment
.sequence
;
1372 fReceiveQueue
.SetInitialSequence(segment
.sequence
);
1374 if ((fOptions
& TCP_NOOPT
) == 0) {
1375 if (segment
.max_segment_size
> 0)
1376 fSendMaxSegmentSize
= segment
.max_segment_size
;
1378 if (segment
.options
& TCP_HAS_WINDOW_SCALE
) {
1379 fFlags
|= FLAG_OPTION_WINDOW_SCALE
;
1380 fSendWindowShift
= segment
.window_shift
;
1382 fFlags
&= ~FLAG_OPTION_WINDOW_SCALE
;
1383 fReceiveWindowShift
= 0;
1386 if (segment
.options
& TCP_HAS_TIMESTAMPS
) {
1387 fFlags
|= FLAG_OPTION_TIMESTAMP
;
1388 fReceivedTimestamp
= segment
.timestamp_value
;
1390 fFlags
&= ~FLAG_OPTION_TIMESTAMP
;
1393 fCongestionWindow
= 2 * fSendMaxSegmentSize
;
1394 fSlowStartThreshold
= (uint32
)segment
.advertised_window
<< fSendWindowShift
;
1399 TCPEndpoint::_ShouldReceive() const
1401 if ((fFlags
& FLAG_NO_RECEIVE
) != 0)
1404 return fState
== ESTABLISHED
|| fState
== FINISH_SENT
1405 || fState
== FINISH_ACKNOWLEDGED
;
1410 TCPEndpoint::_Spawn(TCPEndpoint
* parent
, tcp_segment_header
& segment
,
1413 MutexLocker
_(fLock
);
1415 // TODO error checking
1416 ProtocolSocket::Open();
1418 fState
= SYNCHRONIZE_RECEIVED
;
1419 T(Spawn(parent
, this));
1421 fManager
= parent
->fManager
;
1423 LocalAddress().SetTo(buffer
->destination
);
1424 PeerAddress().SetTo(buffer
->source
);
1428 // TODO: proper error handling!
1429 if (fManager
->BindChild(this) != B_OK
) {
1430 T(Error(this, "binding failed", __LINE__
));
1433 if (_PrepareSendPath(*PeerAddress()) != B_OK
) {
1434 T(Error(this, "prepare send faild", __LINE__
));
1438 fOptions
= parent
->fOptions
;
1439 fAcceptSemaphore
= parent
->fAcceptSemaphore
;
1441 _PrepareReceivePath(segment
);
1444 if (_SendQueued() != B_OK
) {
1445 T(Error(this, "sending failed", __LINE__
));
1449 segment
.flags
&= ~TCP_FLAG_SYNCHRONIZE
;
1450 // we handled this flag now, it must not be set for further processing
1452 return _Receive(segment
, buffer
);
1457 TCPEndpoint::_ListenReceive(tcp_segment_header
& segment
, net_buffer
* buffer
)
1459 TRACE("ListenReceive()");
1461 // Essentially, we accept only TCP_FLAG_SYNCHRONIZE in this state,
1462 // but the error behaviour differs
1463 if (segment
.flags
& TCP_FLAG_RESET
)
1465 if (segment
.flags
& TCP_FLAG_ACKNOWLEDGE
)
1466 return DROP
| RESET
;
1467 if ((segment
.flags
& TCP_FLAG_SYNCHRONIZE
) == 0)
1470 // TODO: drop broadcast/multicast
1472 // spawn new endpoint for accept()
1473 net_socket
* newSocket
;
1474 if (gSocketModule
->spawn_pending_socket(socket
, &newSocket
) < B_OK
) {
1475 T(Error(this, "spawning failed", __LINE__
));
1479 return ((TCPEndpoint
*)newSocket
->first_protocol
)->_Spawn(this,
1485 TCPEndpoint::_SynchronizeSentReceive(tcp_segment_header
&segment
,
1488 TRACE("_SynchronizeSentReceive()");
1490 if ((segment
.flags
& TCP_FLAG_ACKNOWLEDGE
) != 0
1491 && (fInitialSendSequence
>= segment
.acknowledge
1492 || fSendMax
< segment
.acknowledge
))
1493 return DROP
| RESET
;
1495 if (segment
.flags
& TCP_FLAG_RESET
) {
1496 _HandleReset(ECONNREFUSED
);
1500 if ((segment
.flags
& TCP_FLAG_SYNCHRONIZE
) == 0)
1503 fSendUnacknowledged
= segment
.acknowledge
;
1504 _PrepareReceivePath(segment
);
1506 if (segment
.flags
& TCP_FLAG_ACKNOWLEDGE
) {
1509 // simultaneous open
1510 fState
= SYNCHRONIZE_RECEIVED
;
1514 segment
.flags
&= ~TCP_FLAG_SYNCHRONIZE
;
1515 // we handled this flag now, it must not be set for further processing
1517 return _Receive(segment
, buffer
) | IMMEDIATE_ACKNOWLEDGE
;
1522 TCPEndpoint::_Receive(tcp_segment_header
& segment
, net_buffer
* buffer
)
1524 uint32 advertisedWindow
= (uint32
)segment
.advertised_window
1525 << fSendWindowShift
;
1526 size_t segmentLength
= buffer
->size
;
1528 // First, handle the most common case for uni-directional data transfer
1529 // (known as header prediction - the segment must not change the window,
1530 // and must be the expected sequence, and contain no control flags)
1532 if (fState
== ESTABLISHED
1533 && segment
.AcknowledgeOnly()
1534 && fReceiveNext
== segment
.sequence
1535 && advertisedWindow
> 0 && advertisedWindow
== fSendWindow
1536 && fSendNext
== fSendMax
) {
1537 _UpdateTimestamps(segment
, segmentLength
);
1539 if (segmentLength
== 0) {
1540 // this is a pure acknowledge segment - we're on the sending end
1541 if (fSendUnacknowledged
< segment
.acknowledge
1542 && fSendMax
>= segment
.acknowledge
) {
1543 _Acknowledged(segment
);
1546 } else if (segment
.acknowledge
== fSendUnacknowledged
1547 && fReceiveQueue
.IsContiguous()
1548 && fReceiveQueue
.Free() >= segmentLength
1549 && (fFlags
& FLAG_NO_RECEIVE
) == 0) {
1550 if (_AddData(segment
, buffer
))
1553 return KEEP
| ((segment
.flags
& TCP_FLAG_PUSH
) != 0
1554 ? IMMEDIATE_ACKNOWLEDGE
: ACKNOWLEDGE
);
1558 // The fast path was not applicable, so we continue with the standard
1559 // processing of the incoming segment
1561 ASSERT(fState
!= SYNCHRONIZE_SENT
&& fState
!= LISTEN
);
1563 if (fState
!= CLOSED
&& fState
!= TIME_WAIT
) {
1564 // Check sequence number
1565 if (!segment_in_sequence(segment
, segmentLength
, fReceiveNext
,
1567 TRACE(" Receive(): segment out of window, next: %" B_PRIu32
1568 " wnd: %" B_PRIu32
, fReceiveNext
.Number(), fReceiveWindow
);
1569 if ((segment
.flags
& TCP_FLAG_RESET
) != 0) {
1570 // TODO: this doesn't look right - review!
1573 return DROP
| IMMEDIATE_ACKNOWLEDGE
;
1577 if ((segment
.flags
& TCP_FLAG_RESET
) != 0) {
1578 // Is this a valid reset?
1579 // We generally ignore resets in time wait state (see RFC 1337)
1580 if (fLastAcknowledgeSent
<= segment
.sequence
1581 && tcp_sequence(segment
.sequence
) < (fLastAcknowledgeSent
1583 && fState
!= TIME_WAIT
) {
1585 if (fState
== SYNCHRONIZE_RECEIVED
)
1586 error
= ECONNREFUSED
;
1587 else if (fState
== CLOSING
|| fState
== WAIT_FOR_FINISH_ACKNOWLEDGE
)
1592 _HandleReset(error
);
1598 if ((segment
.flags
& TCP_FLAG_SYNCHRONIZE
) != 0
1599 || (fState
== SYNCHRONIZE_RECEIVED
1600 && (fInitialReceiveSequence
> segment
.sequence
1601 || ((segment
.flags
& TCP_FLAG_ACKNOWLEDGE
) != 0
1602 && (fSendUnacknowledged
> segment
.acknowledge
1603 || fSendMax
< segment
.acknowledge
))))) {
1604 // reset the connection - either the initial SYN was faulty, or we
1605 // received a SYN within the data stream
1606 return DROP
| RESET
;
1609 // TODO: Check this! Why do we advertize a window outside of what we should
1611 fReceiveWindow
= max_c(fReceiveQueue
.Free(), fReceiveWindow
);
1612 // the window must not shrink
1614 // trim buffer to be within the receive window
1615 int32 drop
= (int32
)(fReceiveNext
- segment
.sequence
).Number();
1617 if ((uint32
)drop
> buffer
->size
1618 || ((uint32
)drop
== buffer
->size
1619 && (segment
.flags
& TCP_FLAG_FINISH
) == 0)) {
1620 // don't accidently remove a FIN we shouldn't remove
1621 segment
.flags
&= ~TCP_FLAG_FINISH
;
1622 drop
= buffer
->size
;
1625 // remove duplicate data at the start
1626 TRACE("* remove %" B_PRId32
" bytes from the start", drop
);
1627 gBufferModule
->remove_header(buffer
, drop
);
1628 segment
.sequence
+= drop
;
1631 int32 action
= KEEP
;
1633 drop
= (int32
)(segment
.sequence
+ buffer
->size
1634 - (fReceiveNext
+ fReceiveWindow
)).Number();
1636 // remove data exceeding our window
1637 if ((uint32
)drop
>= buffer
->size
) {
1638 // if we can accept data, or the segment is not what we'd expect,
1639 // drop the segment (an immediate acknowledge is always triggered)
1640 if (fReceiveWindow
!= 0 || segment
.sequence
!= fReceiveNext
)
1641 return DROP
| IMMEDIATE_ACKNOWLEDGE
;
1643 action
|= IMMEDIATE_ACKNOWLEDGE
;
1646 if ((segment
.flags
& TCP_FLAG_FINISH
) != 0) {
1647 // we need to remove the finish, too, as part of the data
1651 segment
.flags
&= ~(TCP_FLAG_FINISH
| TCP_FLAG_PUSH
);
1652 TRACE("* remove %" B_PRId32
" bytes from the end", drop
);
1653 gBufferModule
->remove_trailer(buffer
, drop
);
1657 if (advertisedWindow
> fSendWindow
) {
1658 TRACE(" Receive(): Window update %" B_PRIu32
" -> %" B_PRIu32
,
1659 fSendWindow
, advertisedWindow
);
1663 fSendWindow
= advertisedWindow
;
1664 if (advertisedWindow
> fSendMaxWindow
)
1665 fSendMaxWindow
= advertisedWindow
;
1667 // Then look at the acknowledgement for any updates
1669 if ((segment
.flags
& TCP_FLAG_ACKNOWLEDGE
) != 0) {
1670 // process acknowledged data
1671 if (fState
== SYNCHRONIZE_RECEIVED
)
1674 if (fSendMax
< segment
.acknowledge
)
1675 return DROP
| IMMEDIATE_ACKNOWLEDGE
;
1677 if (segment
.acknowledge
< fSendUnacknowledged
) {
1678 if (buffer
->size
== 0 && advertisedWindow
== fSendWindow
1679 && (segment
.flags
& TCP_FLAG_FINISH
) == 0) {
1680 TRACE("Receive(): duplicate ack!");
1682 _DuplicateAcknowledge(segment
);
1687 // this segment acknowledges in flight data
1689 if (fDuplicateAcknowledgeCount
>= 3) {
1690 // deflate the window.
1691 fCongestionWindow
= fSlowStartThreshold
;
1694 fDuplicateAcknowledgeCount
= 0;
1696 if (fSendMax
== segment
.acknowledge
)
1697 TRACE("Receive(): all inflight data ack'd!");
1699 if (segment
.acknowledge
> fSendQueue
.LastSequence()
1700 && fState
> ESTABLISHED
) {
1701 TRACE("Receive(): FIN has been acknowledged!");
1705 fState
= FINISH_ACKNOWLEDGED
;
1713 case WAIT_FOR_FINISH_ACKNOWLEDGE
:
1722 if (fState
!= CLOSED
)
1723 _Acknowledged(segment
);
1727 if (segment
.flags
& TCP_FLAG_URGENT
) {
1728 if (fState
== ESTABLISHED
|| fState
== FINISH_SENT
1729 || fState
== FINISH_ACKNOWLEDGED
) {
1730 // TODO: Handle urgent data:
1731 // - RCV.UP <- max(RCV.UP, SEG.UP)
1732 // - signal the user that urgent data is available (SIGURG)
1736 bool notify
= false;
1738 // The buffer may be freed if its data is added to the queue, so cache
1739 // the size as we still need it later.
1740 uint32 bufferSize
= buffer
->size
;
1742 if ((bufferSize
> 0 || (segment
.flags
& TCP_FLAG_FINISH
) != 0)
1743 && _ShouldReceive())
1744 notify
= _AddData(segment
, buffer
);
1746 if ((fFlags
& FLAG_NO_RECEIVE
) != 0)
1747 fReceiveNext
+= buffer
->size
;
1749 action
= (action
& ~KEEP
) | DROP
;
1752 if ((segment
.flags
& TCP_FLAG_FINISH
) != 0) {
1754 if (fState
!= CLOSED
&& fState
!= LISTEN
&& fState
!= SYNCHRONIZE_SENT
) {
1755 TRACE("Receive(): peer is finishing connection!");
1760 fReceiveQueue
.SetPushPointer();
1762 // we'll reply immediately to the FIN if we are not
1763 // transitioning to TIME WAIT so we immediatly ACK it.
1764 action
|= IMMEDIATE_ACKNOWLEDGE
;
1766 // other side is closing connection; change states
1769 case SYNCHRONIZE_RECEIVED
:
1770 fState
= FINISH_RECEIVED
;
1774 // simultaneous close
1778 case FINISH_ACKNOWLEDGED
:
1796 if (bufferSize
> 0 || (segment
.flags
& TCP_FLAG_SYNCHRONIZE
) != 0)
1797 action
|= ACKNOWLEDGE
;
1799 _UpdateTimestamps(segment
, segmentLength
);
1801 TRACE("Receive() Action %" B_PRId32
, action
);
1808 TCPEndpoint::SegmentReceived(tcp_segment_header
& segment
, net_buffer
* buffer
)
1810 MutexLocker
locker(fLock
);
1812 TRACE("SegmentReceived(): buffer %p (%" B_PRIu32
" bytes) address %s "
1813 "to %s flags %#" B_PRIx8
", seq %" B_PRIu32
", ack %" B_PRIu32
1814 ", wnd %" B_PRIu32
, buffer
, buffer
->size
, PrintAddress(buffer
->source
),
1815 PrintAddress(buffer
->destination
), segment
.flags
, segment
.sequence
,
1816 segment
.acknowledge
,
1817 (uint32
)segment
.advertised_window
<< fSendWindowShift
);
1818 T(Receive(this, segment
,
1819 (uint32
)segment
.advertised_window
<< fSendWindowShift
, buffer
));
1820 int32 segmentAction
= DROP
;
1824 segmentAction
= _ListenReceive(segment
, buffer
);
1827 case SYNCHRONIZE_SENT
:
1828 segmentAction
= _SynchronizeSentReceive(segment
, buffer
);
1831 case SYNCHRONIZE_RECEIVED
:
1833 case FINISH_RECEIVED
:
1834 case WAIT_FOR_FINISH_ACKNOWLEDGE
:
1836 case FINISH_ACKNOWLEDGED
:
1840 segmentAction
= _Receive(segment
, buffer
);
1844 // process acknowledge action as asked for by the *Receive() method
1845 if (segmentAction
& IMMEDIATE_ACKNOWLEDGE
)
1846 SendAcknowledge(true);
1847 else if (segmentAction
& ACKNOWLEDGE
)
1848 DelayedAcknowledge();
1850 if ((fFlags
& (FLAG_CLOSED
| FLAG_DELETE_ON_CLOSE
))
1851 == (FLAG_CLOSED
| FLAG_DELETE_ON_CLOSE
)) {
1853 gSocketModule
->release_socket(socket
);
1856 return segmentAction
;
1860 // #pragma mark - send
1864 TCPEndpoint::_CurrentFlags()
1866 // we don't set FLAG_FINISH here, instead we do it
1867 // conditionally below depending if we are sending
1868 // the last bytes of the send queue.
1872 return TCP_FLAG_RESET
| TCP_FLAG_ACKNOWLEDGE
;
1874 case SYNCHRONIZE_SENT
:
1875 return TCP_FLAG_SYNCHRONIZE
;
1876 case SYNCHRONIZE_RECEIVED
:
1877 return TCP_FLAG_SYNCHRONIZE
| TCP_FLAG_ACKNOWLEDGE
;
1880 case FINISH_RECEIVED
:
1881 case FINISH_ACKNOWLEDGED
:
1883 case WAIT_FOR_FINISH_ACKNOWLEDGE
:
1886 return TCP_FLAG_ACKNOWLEDGE
;
1895 TCPEndpoint::_ShouldSendSegment(tcp_segment_header
& segment
, uint32 length
,
1896 uint32 segmentMaxSize
, uint32 flightSize
)
1899 // Avoid the silly window syndrome - we only send a segment in case:
1900 // - we have a full segment to send, or
1901 // - we're at the end of our buffer queue, or
1902 // - the buffer is at least larger than half of the maximum send window,
1904 // - we're retransmitting data
1905 if (length
== segmentMaxSize
1906 || (fOptions
& TCP_NODELAY
) != 0
1907 || tcp_sequence(fSendNext
+ length
) == fSendQueue
.LastSequence()
1908 || (fSendMaxWindow
> 0 && length
>= fSendMaxWindow
/ 2))
1912 // check if we need to send a window update to the peer
1913 if (segment
.advertised_window
> 0) {
1914 // correct the window to take into account what already has been advertised
1915 uint32 window
= (segment
.advertised_window
<< fReceiveWindowShift
)
1916 - (fReceiveMaxAdvertised
- fReceiveNext
).Number();
1918 // if we can advertise a window larger than twice the maximum segment
1919 // size, or half the maximum buffer size we send a window update
1920 if (window
>= (fReceiveMaxSegmentSize
<< 1)
1921 || window
>= (socket
->receive
.buffer_size
>> 1))
1925 if ((segment
.flags
& (TCP_FLAG_SYNCHRONIZE
| TCP_FLAG_FINISH
1926 | TCP_FLAG_RESET
)) != 0)
1929 // We do have urgent data pending
1930 if (fSendUrgentOffset
> fSendNext
)
1933 // there is no reason to send a segment just now
1939 TCPEndpoint::_SendQueued(bool force
)
1941 return _SendQueued(force
, fSendWindow
);
1945 /*! Sends one or more TCP segments with the data waiting in the queue, or some
1946 specific flags that need to be sent.
1949 TCPEndpoint::_SendQueued(bool force
, uint32 sendWindow
)
1954 // in passive state?
1955 if (fState
== LISTEN
)
1958 tcp_segment_header
segment(_CurrentFlags());
1960 if ((fOptions
& TCP_NOOPT
) == 0) {
1961 if ((fFlags
& FLAG_OPTION_TIMESTAMP
) != 0) {
1962 segment
.options
|= TCP_HAS_TIMESTAMPS
;
1963 segment
.timestamp_reply
= fReceivedTimestamp
;
1964 segment
.timestamp_value
= tcp_now();
1967 if ((segment
.flags
& TCP_FLAG_SYNCHRONIZE
) != 0
1968 && fSendNext
== fInitialSendSequence
) {
1969 // add connection establishment options
1970 segment
.max_segment_size
= fReceiveMaxSegmentSize
;
1971 if (fFlags
& FLAG_OPTION_WINDOW_SCALE
) {
1972 segment
.options
|= TCP_HAS_WINDOW_SCALE
;
1973 segment
.window_shift
= fReceiveWindowShift
;
1978 size_t availableBytes
= fReceiveQueue
.Free();
1979 if (fFlags
& FLAG_OPTION_WINDOW_SCALE
)
1980 segment
.advertised_window
= availableBytes
>> fReceiveWindowShift
;
1982 segment
.advertised_window
= min_c(TCP_MAX_WINDOW
, availableBytes
);
1984 segment
.acknowledge
= fReceiveNext
.Number();
1986 // Process urgent data
1987 if (fSendUrgentOffset
> fSendNext
) {
1988 segment
.flags
|= TCP_FLAG_URGENT
;
1989 segment
.urgent_offset
= (fSendUrgentOffset
- fSendNext
).Number();
1991 fSendUrgentOffset
= fSendUnacknowledged
.Number();
1992 // Keep urgent offset updated, so that it doesn't reach into our
1993 // send window on overlap
1994 segment
.urgent_offset
= 0;
1997 if (fCongestionWindow
> 0 && fCongestionWindow
< sendWindow
)
1998 sendWindow
= fCongestionWindow
;
2000 // fSendUnacknowledged
2001 // | fSendNext fSendMax
2004 // -----------------------------------
2005 // | effective window |
2006 // -----------------------------------
2008 // Flight size represents the window of data which is currently in the
2009 // ether. We should never send data such as the flight size becomes larger
2010 // than the effective window. Note however that the effective window may be
2011 // reduced (by congestion for instance), so at some point in time flight
2012 // size may be larger than the currently calculated window.
2014 uint32 flightSize
= (fSendMax
- fSendUnacknowledged
).Number();
2015 uint32 consumedWindow
= (fSendNext
- fSendUnacknowledged
).Number();
2017 if (consumedWindow
> sendWindow
) {
2019 // TODO: enter persist state? try to get a window update.
2021 sendWindow
-= consumedWindow
;
2023 if (force
&& sendWindow
== 0 && fSendNext
<= fSendQueue
.LastSequence()) {
2024 // send one byte of data to ask for a window update
2025 // (triggered by the persist timer)
2029 uint32 length
= min_c(fSendQueue
.Available(fSendNext
), sendWindow
);
2030 bool shouldStartRetransmitTimer
= fSendNext
== fSendUnacknowledged
;
2031 bool retransmit
= fSendNext
< fSendMax
;
2034 uint32 segmentMaxSize
= fSendMaxSegmentSize
2035 - tcp_options_length(segment
);
2036 uint32 segmentLength
= min_c(length
, segmentMaxSize
);
2038 if (fSendNext
+ segmentLength
== fSendQueue
.LastSequence()) {
2039 if (state_needs_finish(fState
))
2040 segment
.flags
|= TCP_FLAG_FINISH
;
2042 segment
.flags
|= TCP_FLAG_PUSH
;
2045 // Determine if we should really send this segment
2046 if (!force
&& !retransmit
&& !_ShouldSendSegment(segment
, segmentLength
,
2047 segmentMaxSize
, flightSize
)) {
2048 if (fSendQueue
.Available()
2049 && !gStackModule
->is_timer_active(&fPersistTimer
)
2050 && !gStackModule
->is_timer_active(&fRetransmitTimer
))
2051 _StartPersistTimer();
2055 net_buffer
*buffer
= gBufferModule
->create(256);
2059 status_t status
= B_OK
;
2060 if (segmentLength
> 0)
2061 status
= fSendQueue
.Get(buffer
, fSendNext
, segmentLength
);
2062 if (status
< B_OK
) {
2063 gBufferModule
->free(buffer
);
2067 LocalAddress().CopyTo(buffer
->source
);
2068 PeerAddress().CopyTo(buffer
->destination
);
2070 uint32 size
= buffer
->size
;
2071 segment
.sequence
= fSendNext
.Number();
2073 TRACE("SendQueued(): buffer %p (%" B_PRIu32
" bytes) address %s to "
2074 "%s flags %#" B_PRIx8
", seq %" B_PRIu32
", ack %" B_PRIu32
2075 ", rwnd %" B_PRIu16
", cwnd %" B_PRIu32
", ssthresh %" B_PRIu32
2076 ", len %" B_PRIu32
", first %" B_PRIu32
", last %" B_PRIu32
,
2077 buffer
, buffer
->size
, PrintAddress(buffer
->source
),
2078 PrintAddress(buffer
->destination
), segment
.flags
, segment
.sequence
,
2079 segment
.acknowledge
, segment
.advertised_window
,
2080 fCongestionWindow
, fSlowStartThreshold
, segmentLength
,
2081 fSendQueue
.FirstSequence().Number(),
2082 fSendQueue
.LastSequence().Number());
2083 T(Send(this, segment
, buffer
, fSendQueue
.FirstSequence(),
2084 fSendQueue
.LastSequence()));
2086 PROBE(buffer
, sendWindow
);
2087 sendWindow
-= buffer
->size
;
2089 status
= add_tcp_header(AddressModule(), segment
, buffer
);
2090 if (status
!= B_OK
) {
2091 gBufferModule
->free(buffer
);
2095 // Update send status - we need to do this before we send the data
2096 // for local connections as the answer is directly handled
2098 if (segment
.flags
& TCP_FLAG_SYNCHRONIZE
) {
2099 segment
.options
&= ~TCP_HAS_WINDOW_SCALE
;
2100 segment
.max_segment_size
= 0;
2104 if (segment
.flags
& TCP_FLAG_FINISH
)
2107 uint32 sendMax
= fSendMax
.Number();
2109 if (fSendMax
< fSendNext
)
2110 fSendMax
= fSendNext
;
2112 fReceiveMaxAdvertised
= fReceiveNext
2113 + ((uint32
)segment
.advertised_window
<< fReceiveWindowShift
);
2115 status
= next
->module
->send_routed_data(next
, fRoute
, buffer
);
2116 if (status
< B_OK
) {
2117 gBufferModule
->free(buffer
);
2119 fSendNext
= segment
.sequence
;
2121 // restore send status
2125 if (shouldStartRetransmitTimer
&& size
> 0) {
2126 TRACE("starting initial retransmit timer of: %" B_PRIdBIGTIME
,
2127 fRetransmitTimeout
);
2128 gStackModule
->set_timer(&fRetransmitTimer
, fRetransmitTimeout
);
2129 T(TimerSet(this, "retransmit", fRetransmitTimeout
));
2130 shouldStartRetransmitTimer
= false;
2133 if (segment
.flags
& TCP_FLAG_ACKNOWLEDGE
)
2134 fLastAcknowledgeSent
= segment
.acknowledge
;
2136 length
-= segmentLength
;
2137 segment
.flags
&= ~(TCP_FLAG_SYNCHRONIZE
| TCP_FLAG_RESET
2143 } while (length
> 0);
2150 TCPEndpoint::_MaxSegmentSize(const sockaddr
* address
) const
2152 return next
->module
->get_mtu(next
, address
) - sizeof(tcp_header
);
2157 TCPEndpoint::_PrepareSendPath(const sockaddr
* peer
)
2159 if (fRoute
== NULL
) {
2160 fRoute
= gDatalinkModule
->get_route(Domain(), peer
);
2164 if ((fRoute
->flags
& RTF_LOCAL
) != 0)
2165 fFlags
|= FLAG_LOCAL
;
2168 // make sure connection does not already exist
2169 status_t status
= fManager
->SetConnection(this, *LocalAddress(), peer
,
2170 fRoute
->interface_address
->local
);
2174 fInitialSendSequence
= system_time() >> 4;
2175 fSendNext
= fInitialSendSequence
;
2176 fSendUnacknowledged
= fInitialSendSequence
;
2177 fSendMax
= fInitialSendSequence
;
2178 fSendUrgentOffset
= fInitialSendSequence
;
2180 // we are counting the SYN here
2181 fSendQueue
.SetInitialSequence(fSendNext
+ 1);
2183 fReceiveMaxSegmentSize
= _MaxSegmentSize(peer
);
2185 // Compute the window shift we advertise to our peer - if it doesn't support
2186 // this option, this will be reset to 0 (when its SYN is received)
2187 fReceiveWindowShift
= 0;
2188 while (fReceiveWindowShift
< TCP_MAX_WINDOW_SHIFT
2189 && (0xffffUL
<< fReceiveWindowShift
) < socket
->receive
.buffer_size
) {
2190 fReceiveWindowShift
++;
2198 TCPEndpoint::_Acknowledged(tcp_segment_header
& segment
)
2200 TRACE("_Acknowledged(): ack %" B_PRIu32
"; uack %" B_PRIu32
"; next %"
2201 B_PRIu32
"; max %" B_PRIu32
, segment
.acknowledge
,
2202 fSendUnacknowledged
.Number(), fSendNext
.Number(), fSendMax
.Number());
2204 ASSERT(fSendUnacknowledged
<= segment
.acknowledge
);
2206 if (fSendUnacknowledged
< segment
.acknowledge
) {
2207 fSendQueue
.RemoveUntil(segment
.acknowledge
);
2208 fSendUnacknowledged
= segment
.acknowledge
;
2209 if (fSendNext
< fSendUnacknowledged
)
2210 fSendNext
= fSendUnacknowledged
;
2212 if (segment
.options
& TCP_HAS_TIMESTAMPS
)
2213 _UpdateRoundTripTime(tcp_diff_timestamp(segment
.timestamp_reply
));
2215 // TODO: Fallback to RFC 793 type estimation; This just resets
2216 // any potential exponential back off that happened due to
2218 fRetransmitTimeout
= TCP_INITIAL_RTT
;
2221 if (fSendUnacknowledged
== fSendMax
) {
2222 TRACE("all acknowledged, cancelling retransmission timer");
2223 gStackModule
->cancel_timer(&fRetransmitTimer
);
2224 T(TimerSet(this, "retransmit", -1));
2226 TRACE("data acknowledged, resetting retransmission timer to: %"
2227 B_PRIdBIGTIME
, fRetransmitTimeout
);
2228 gStackModule
->set_timer(&fRetransmitTimer
, fRetransmitTimeout
);
2229 T(TimerSet(this, "retransmit", fRetransmitTimeout
));
2232 if (is_writable(fState
)) {
2233 // notify threads waiting on the socket to become writable again
2234 fSendCondition
.NotifyAll();
2235 gSocketModule
->notify(socket
, B_SELECT_WRITE
, fSendQueue
.Free());
2238 if (fCongestionWindow
< fSlowStartThreshold
)
2239 fCongestionWindow
+= fSendMaxSegmentSize
;
2242 if (fCongestionWindow
>= fSlowStartThreshold
) {
2243 uint32 increment
= fSendMaxSegmentSize
* fSendMaxSegmentSize
;
2245 if (increment
< fCongestionWindow
)
2248 increment
/= fCongestionWindow
;
2250 fCongestionWindow
+= increment
;
2253 // if there is data left to be send, send it now
2254 if (fSendQueue
.Used() > 0)
2260 TCPEndpoint::_Retransmit()
2262 TRACE("Retransmit()");
2265 fSendNext
= fSendUnacknowledged
;
2267 // Do exponential back off of the retransmit timeout
2268 fRetransmitTimeout
*= 2;
2269 if (fRetransmitTimeout
> TCP_MAX_RETRANSMIT_TIMEOUT
)
2270 fRetransmitTimeout
= TCP_MAX_RETRANSMIT_TIMEOUT
;
2277 TCPEndpoint::_UpdateRoundTripTime(int32 roundTripTime
)
2279 int32 rtt
= roundTripTime
;
2281 // "smooth" round trip time as per Van Jacobson
2282 rtt
-= fRoundTripTime
/ 8;
2283 fRoundTripTime
+= rtt
;
2286 rtt
-= fRoundTripDeviation
/ 4;
2287 fRoundTripDeviation
+= rtt
;
2289 fRetransmitTimeout
= ((fRoundTripTime
/ 4 + fRoundTripDeviation
) / 2)
2291 if (fRetransmitTimeout
< TCP_MIN_RETRANSMIT_TIMEOUT
)
2292 fRetransmitTimeout
= TCP_MIN_RETRANSMIT_TIMEOUT
;
2294 TRACE(" RTO is now %" B_PRIdBIGTIME
" (after rtt %" B_PRId32
"ms)",
2295 fRetransmitTimeout
, roundTripTime
);
2300 TCPEndpoint::_ResetSlowStart()
2302 fSlowStartThreshold
= max_c((fSendMax
- fSendUnacknowledged
).Number() / 2,
2303 2 * fSendMaxSegmentSize
);
2304 fCongestionWindow
= fSendMaxSegmentSize
;
2308 // #pragma mark - timer
2312 TCPEndpoint::_RetransmitTimer(net_timer
* timer
, void* _endpoint
)
2314 TCPEndpoint
* endpoint
= (TCPEndpoint
*)_endpoint
;
2315 T(TimerTriggered(endpoint
, "retransmit"));
2317 MutexLocker
locker(endpoint
->fLock
);
2318 if (!locker
.IsLocked())
2321 endpoint
->_Retransmit();
2326 TCPEndpoint::_PersistTimer(net_timer
* timer
, void* _endpoint
)
2328 TCPEndpoint
* endpoint
= (TCPEndpoint
*)_endpoint
;
2329 T(TimerTriggered(endpoint
, "persist"));
2331 MutexLocker
locker(endpoint
->fLock
);
2332 if (!locker
.IsLocked())
2335 // the timer might not have been canceled early enough
2336 if (endpoint
->State() == CLOSED
)
2339 endpoint
->_SendQueued(true);
2344 TCPEndpoint::_DelayedAcknowledgeTimer(net_timer
* timer
, void* _endpoint
)
2346 TCPEndpoint
* endpoint
= (TCPEndpoint
*)_endpoint
;
2347 T(TimerTriggered(endpoint
, "delayed ack"));
2349 MutexLocker
locker(endpoint
->fLock
);
2350 if (!locker
.IsLocked())
2353 // the timer might not have been canceled early enough
2354 if (endpoint
->State() == CLOSED
)
2357 endpoint
->SendAcknowledge(true);
2362 TCPEndpoint::_TimeWaitTimer(net_timer
* timer
, void* _endpoint
)
2364 TCPEndpoint
* endpoint
= (TCPEndpoint
*)_endpoint
;
2365 T(TimerTriggered(endpoint
, "time-wait"));
2367 MutexLocker
locker(endpoint
->fLock
);
2368 if (!locker
.IsLocked())
2371 if ((endpoint
->fFlags
& FLAG_CLOSED
) == 0) {
2372 endpoint
->fFlags
|= FLAG_DELETE_ON_CLOSE
;
2378 gSocketModule
->release_socket(endpoint
->socket
);
2383 TCPEndpoint::_WaitForCondition(ConditionVariable
& condition
,
2384 MutexLocker
& locker
, bigtime_t timeout
)
2386 ConditionVariableEntry entry
;
2387 condition
.Add(&entry
);
2390 status_t result
= entry
.Wait(B_ABSOLUTE_TIMEOUT
| B_CAN_INTERRUPT
, timeout
);
2401 TCPEndpoint::Dump() const
2403 kprintf("TCP endpoint %p\n", this);
2404 kprintf(" state: %s\n", name_for_state(fState
));
2405 kprintf(" flags: 0x%" B_PRIx32
"\n", fFlags
);
2407 kprintf(" lock: { %p, holder: %" B_PRId32
" }\n", &fLock
, fLock
.holder
);
2409 kprintf(" accept sem: %" B_PRId32
"\n", fAcceptSemaphore
);
2410 kprintf(" options: 0x%" B_PRIx32
"\n", (uint32
)fOptions
);
2412 kprintf(" window shift: %" B_PRIu8
"\n", fSendWindowShift
);
2413 kprintf(" unacknowledged: %" B_PRIu32
"\n",
2414 fSendUnacknowledged
.Number());
2415 kprintf(" next: %" B_PRIu32
"\n", fSendNext
.Number());
2416 kprintf(" max: %" B_PRIu32
"\n", fSendMax
.Number());
2417 kprintf(" urgent offset: %" B_PRIu32
"\n", fSendUrgentOffset
.Number());
2418 kprintf(" window: %" B_PRIu32
"\n", fSendWindow
);
2419 kprintf(" max window: %" B_PRIu32
"\n", fSendMaxWindow
);
2420 kprintf(" max segment size: %" B_PRIu32
"\n", fSendMaxSegmentSize
);
2421 kprintf(" queue: %" B_PRIuSIZE
" / %" B_PRIuSIZE
"\n", fSendQueue
.Used(),
2423 #if DEBUG_BUFFER_QUEUE
2426 kprintf(" last acknowledge sent: %" B_PRIu32
"\n",
2427 fLastAcknowledgeSent
.Number());
2428 kprintf(" initial sequence: %" B_PRIu32
"\n",
2429 fInitialSendSequence
.Number());
2430 kprintf(" receive\n");
2431 kprintf(" window shift: %" B_PRIu8
"\n", fReceiveWindowShift
);
2432 kprintf(" next: %" B_PRIu32
"\n", fReceiveNext
.Number());
2433 kprintf(" max advertised: %" B_PRIu32
"\n",
2434 fReceiveMaxAdvertised
.Number());
2435 kprintf(" window: %" B_PRIu32
"\n", fReceiveWindow
);
2436 kprintf(" max segment size: %" B_PRIu32
"\n", fReceiveMaxSegmentSize
);
2437 kprintf(" queue: %" B_PRIuSIZE
" / %" B_PRIuSIZE
"\n",
2438 fReceiveQueue
.Available(), fReceiveQueue
.Size());
2439 #if DEBUG_BUFFER_QUEUE
2440 fReceiveQueue
.Dump();
2442 kprintf(" initial sequence: %" B_PRIu32
"\n",
2443 fInitialReceiveSequence
.Number());
2444 kprintf(" duplicate acknowledge count: %" B_PRIu32
"\n",
2445 fDuplicateAcknowledgeCount
);
2446 kprintf(" round trip time: %" B_PRId32
" (deviation %" B_PRId32
")\n",
2447 fRoundTripTime
, fRoundTripDeviation
);
2448 kprintf(" retransmit timeout: %" B_PRId64
"\n", fRetransmitTimeout
);
2449 kprintf(" congestion window: %" B_PRIu32
"\n", fCongestionWindow
);
2450 kprintf(" slow start threshold: %" B_PRIu32
"\n", fSlowStartThreshold
);