#include <limits.h> /* for UINT_MAX */

#include "templates.h"

#include "win32_inet_ntop.h"

/* newer versions of MSVC define these in errno.h */
#define ECONNRESET WSAECONNRESET
#define EMSGSIZE WSAEMSGSIZE
#define ECONNREFUSED WSAECONNREFUSED
#define ETIMEDOUT WSAETIMEDOUT

typedef struct sockaddr_storage SOCKADDR_STORAGE;
/* number of bytes to increase max window size by, per RTT. This is
   scaled down linearly proportional to off_target. i.e. if all packets
   in one window have 0 delay, window size will increase by this number.
   Typically it's less. TCP increases one MSS per RTT, which is 1500 bytes. */
#define MAX_CWND_INCREASE_BYTES_PER_RTT 3000
#define CUR_DELAY_SIZE 3
/* experiments suggest that a clock skew of 10 ms per 325 seconds
   is not impossible. Reset delay_base every 13 minutes. The clock
   skew is dealt with by observing the delay base in the other
   direction, and adjusting our own upwards if the opposite direction
   delay base keeps going down */
#define DELAY_BASE_HISTORY 13
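/* Worked arithmetic (illustrative): the delay base history is stepped once
   a minute (see dh_add_sample below), so 13 entries span 13 minutes; at a
   skew of 10 ms per 325 s that is at most roughly 24 ms of accumulated
   skew before the base is reset. */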
#define MAX_WINDOW_DECAY 100 /* ms */
#define REORDER_BUFFER_SIZE 32
#define REORDER_BUFFER_MAX_SIZE 511
#define OUTGOING_BUFFER_MAX_SIZE 511

#define PACKET_SIZE 350

/* this is the minimum max_window value. It can never drop below this */
#define MIN_WINDOW_SIZE 10
/* when window sizes are smaller than one packet_size, this
   will pace the packets to average at the given window size
   if it's not set, it will simply not send anything until
   there's a whole packet_size of window free */
#define USE_PACKET_PACING 1
/* if we receive 4 or more duplicate acks, we resend the packet
   that hasn't been acked yet */
#define DUPLICATE_ACKS_BEFORE_RESEND 3

#define DELAYED_ACK_BYTE_THRESHOLD 2400 /* bytes */
#define DELAYED_ACK_TIME_THRESHOLD 100 /* milliseconds */
#define RST_INFO_TIMEOUT 10000
#define RST_INFO_LIMIT 1000
/* 29 seconds determined from measuring many home NAT devices */
#define KEEPALIVE_INTERVAL 29000

#define SEQ_NR_MASK 0xFFFF
#define ACK_NR_MASK 0xFFFF
#define DIV_ROUND_UP(num, denom) (((num) + (denom) - 1) / (denom))
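/* Worked example (illustrative): DIV_ROUND_UP(1000, 350) expands to
   (1000 + 350 - 1) / 350 = 3. The arguments are parenthesized so that
   expression arguments such as DIV_ROUND_UP(a, b + c) keep the intended
   precedence. */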
#include "utp_utils.h"
#include "utp_config.h"

#define LOG_UTP if (g_log_utp) utp_log
#define LOG_UTPV if (g_log_utp_verbose) utp_log
/* The totals are derived from the following data:
   45: IPv6 address including embedded IPv4 address
   11: Scope Id
   2: Brackets around IPv6 address when port is present
   6: Port (including colon)
   1: Terminating null byte */
static char addrbuf[65];

#define addrfmt(x, s) psa_fmt(x, s, sizeof(s))
#if (defined(__SVR4) && defined(__sun))
#pragma pack(1)
#else
#pragma pack(push, 1)
#endif
/*----------------------------------------------------------------------------*/
/* packed socket address */
union PACKED_ATTRIBUTE psa_in {
	/* The values are always stored here in network byte order */
	byte _in6[16];            /* IPv6 */
	uint16 _in6w[8];          /* IPv6, word based (for convenience) */
	uint32 _in6d[4];          /* Dword access */
	struct in6_addr _in6addr; /* For convenience */
};
struct PACKED_ATTRIBUTE ALIGNED_ATTRIBUTE(4) psa {
	union psa_in _in;
	/* Host byte order */
	uint16 _port;
};

static byte psa_get_family(struct psa *psa)
{
	return (IN6_IS_ADDR_V4MAPPED(&psa->_in._in6addr) != 0) ? AF_INET : AF_INET6;
}
static bool psa_is_equal(struct psa *lhs, struct psa *rhs)
{
	if (lhs->_port != rhs->_port)
		return false;
	return memcmp(&lhs->_in._in6[0], &rhs->_in._in6[0], sizeof(lhs->_in._in6)) == 0;
}
static bool psa_is_not_equal(struct psa *lhs, struct psa *rhs)
{
	return !(psa_is_equal(lhs, rhs));
}
static void psa_init(struct psa *psa, SOCKADDR_STORAGE *sa, socklen_t len)
{
	if (sa->ss_family == AF_INET) {
		struct sockaddr_in *sin;

		assert(len >= sizeof(struct sockaddr_in));

		psa->_in._in6w[0] = 0;
		psa->_in._in6w[1] = 0;
		psa->_in._in6w[2] = 0;
		psa->_in._in6w[3] = 0;
		psa->_in._in6w[4] = 0;
		psa->_in._in6w[5] = 0xffff;

		sin = (struct sockaddr_in *)sa;

		psa->_in._in6d[3] = sin->sin_addr.s_addr;
		psa->_port = ntohs(sin->sin_port);
	} else {
		struct sockaddr_in6 *sin6;

		assert(len >= sizeof(struct sockaddr_in6));

		sin6 = (struct sockaddr_in6 *)sa;

		psa->_in._in6addr = sin6->sin6_addr;
		psa->_port = ntohs(sin6->sin6_port);
	}
}
/* structure SOCKADDR_STORAGE is passed by value */
/* len may be NULL (it defaulted to NULL in the C++ original) */
static SOCKADDR_STORAGE psa_get_sockaddr_storage(struct psa *psa, socklen_t *len)
{
	SOCKADDR_STORAGE sa;
	byte family;

	family = psa_get_family(psa);
	if (family == AF_INET) {
		struct sockaddr_in *sin;

		sin = (struct sockaddr_in *)&sa;

		if (len) *len = sizeof(struct sockaddr_in);
		memset(sin, 0, sizeof(struct sockaddr_in));
		sin->sin_family = family;
		sin->sin_port = htons(psa->_port);
		sin->sin_addr.s_addr = psa->_in._in6d[3];
	} else {
		struct sockaddr_in6 *sin6;

		sin6 = (struct sockaddr_in6 *)&sa;

		memset(sin6, 0, sizeof(struct sockaddr_in6));
		if (len) *len = sizeof(struct sockaddr_in6);
		sin6->sin6_family = family;
		sin6->sin6_addr = psa->_in._in6addr;
		sin6->sin6_port = htons(psa->_port);
	}

	return sa;
}
static str psa_fmt(struct psa *psa, str s, size_t len)
{
	byte family;
	str i;

	memset(s, 0, len);

	family = psa_get_family(psa);
	if (family == AF_INET) {
		inet_ntop(family, (uint32 *)&psa->_in._in6d[3], s, len);
		i = s + strlen(s);
	} else {
		i = s;
		*i++ = '[';
		inet_ntop(family, (struct in6_addr *)&psa->_in._in6addr, i, len-1);
		i += strlen(i);
		*i++ = ']';
	}
	snprintf(i, len - (i-s), ":%u", psa->_port);
	return s;
}
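/* A minimal usage sketch (illustrative only, not part of the library API;
   the address values are assumptions). psa_fmt() renders IPv4-mapped
   addresses as "a.b.c.d:port" and IPv6 addresses as "[addr]:port". */
#if 0
static void psa_fmt_example(SOCKADDR_STORAGE *ss, socklen_t len)
{
	struct psa addr;
	char buf[65];

	psa_init(&addr, ss, len);  /* ss as obtained from recvfrom() */
	puts(addrfmt(&addr, buf)); /* e.g. "10.0.0.1:6881" or "[::1]:6881" */
}
#endif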
/*----------------------------------------------------------------------------*/

struct PACKED_ATTRIBUTE RST_Info {
	struct psa addr;
	uint32 connid;
	uint32 timestamp;
	uint16 ack_nr;
};
/* these packet sizes include the uTP header, which
   is either 20 or 23 bytes depending on version */
#define PACKET_SIZE_EMPTY_BUCKET 0
#define PACKET_SIZE_EMPTY 23
#define PACKET_SIZE_SMALL_BUCKET 1
#define PACKET_SIZE_SMALL 373
#define PACKET_SIZE_MID_BUCKET 2
#define PACKET_SIZE_MID 723
#define PACKET_SIZE_BIG_BUCKET 3
#define PACKET_SIZE_BIG 1400
#define PACKET_SIZE_HUGE_BUCKET 4
struct PACKED_ATTRIBUTE pf { /* Packet Format */
	/* connection ID */
	uint32_big connid;
	/* timestamp of when this packet was sent */
	uint32_big tv_sec;
	uint32_big tv_usec;
	uint32_big reply_micro;
	/* receive window size in PACKET_SIZE chunks */
	byte windowsize;
	/* Type of the first extension header */
	byte ext;
	/* Flags */
	byte flags;
	/* Sequence number */
	uint16_big seq_nr;
	/* Acknowledgment number */
	uint16_big ack_nr;
};

struct PACKED_ATTRIBUTE pfa { /* Packet Format Ack */
	struct pf pf;
	byte ext_next;
	byte ext_len;
	byte acks[4];
};

struct PACKED_ATTRIBUTE pfe { /* Packet Format Extensions */
	struct pf pf;
	byte ext_next;
	byte ext_len;
	byte extensions[8];
};
/*----------------------------------------------------------------------------*/

struct PACKED_ATTRIBUTE pf1 { /* Packet Format V1 */
	/* packet_type (4 high bits) */
	/* protocol version (4 low bits) */
	byte ver_type;
	/* Type of the first extension header */
	byte ext;
	/* connection ID */
	uint16_big connid;
	uint32_big tv_usec;
	uint32_big reply_micro;
	/* receive window size in bytes */
	uint32_big windowsize;
	/* Sequence number */
	uint16_big seq_nr;
	/* Acknowledgment number */
	uint16_big ack_nr;
};
static byte pf1_version(struct pf1 *pf1)
{
	return pf1->ver_type & 0xf;
}

static byte pf1_type(struct pf1 *pf1)
{
	return pf1->ver_type >> 4;
}

static void pf1_version_set(struct pf1 *pf1, byte v)
{
	pf1->ver_type = (pf1->ver_type & 0xf0) | (v & 0x0f);
}

static void pf1_type_set(struct pf1 *pf1, byte t)
{
	pf1->ver_type = (pf1->ver_type & 0x0f) | (t << 4);
}
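/* Worked example (illustrative): for an ST_STATE (2) packet of protocol
   version 1, the ver_type byte is (2 << 4) | 1 = 0x21; pf1_type() then
   recovers 0x21 >> 4 == 2 and pf1_version() recovers 0x21 & 0xf == 1. */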
/*----------------------------------------------------------------------------*/

struct PACKED_ATTRIBUTE pfa1 { /* Packet Format Ack V1 */
	struct pf1 pf;
	byte ext_next;
	byte ext_len;
	byte acks[4];
};

struct PACKED_ATTRIBUTE pfe1 { /* Packet Format Extensions V1 */
	struct pf1 pf;
	byte ext_next;
	byte ext_len;
	byte extensions[8];
};
/* XXX: careful, not every compiler supports pragma pack */
#if (defined(__SVR4) && defined(__sun))
#pragma pack(0)
#else
#pragma pack(pop)
#endif
/* TODO: these should be C preprocessor defines */
enum {
	ST_DATA = 0,   /* Data packet. */
	ST_FIN = 1,    /* Finalize the connection. This is the last packet. */
	ST_STATE = 2,  /* State packet. Used to transmit an ACK with no data. */
	ST_RESET = 3,  /* Terminate connection forcefully. */
	ST_SYN = 4,    /* Connect SYN */
	ST_NUM_STATES  /* used for bounds checking */
};
/* XXX: there is no lib-global initialization function, so either keep it that way or replace it all */
static const cstr flagnames[] = {
	"ST_DATA","ST_FIN","ST_STATE","ST_RESET","ST_SYN"
};
/* TODO: these should be C preprocessor defines */
enum CONN_STATE {
	CS_IDLE = 0,
	CS_SYN_SENT = 1,
	CS_CONNECTED = 2,
	CS_CONNECTED_FULL = 3,
	CS_GOT_FIN = 4,
	CS_DESTROY_DELAY = 5,
	CS_FIN_SENT = 6,
	CS_RESET = 7,
	CS_DESTROY = 8
};
/* XXX: there is no lib-global initialization function, so either keep it that way or replace it all */
static const cstr statenames[] = {
	"IDLE","SYN_SENT","CONNECTED","CONNECTED_FULL","GOT_FIN","DESTROY_DELAY","FIN_SENT","RESET","DESTROY"
};
struct op { /* Outgoing Packet */
	size_t length;
	size_t payload;
	uint64 time_sent; /* microseconds */
	uint transmissions:31;
	bool need_resend:1;
	byte data[1];
};
static void no_read(void *socket, const byte *bytes, size_t count)
{
}

static void no_write(void *socket, byte *bytes, size_t count)
{
}

static size_t no_rb_size(void *socket)
{
	return 0;
}

static void no_state(void *socket, int state)
{
}

static void no_error(void *socket, int errcode)
{
}

static void no_overhead(void *socket, bool send, size_t count, int type)
{
}

static struct UTPFunctionTable zero_funcs = {
	&no_read,
	&no_write,
	&no_rb_size,
	&no_state,
	&no_error,
	&no_overhead,
};
/*----------------------------------------------------------------------------*/
struct scb { /* Sizable Circular Buffer */
	/* This is the mask. Since it's always a power of 2, adding 1 to this value will return the size. */
	size_t mask;
	/* These are the elements that the circular buffer points to */
	void **elements;
};
static void *scb_get(struct scb *scb, size_t i)
{
	assert(scb->elements);
	return scb->elements ? scb->elements[i & scb->mask] : NULL;
}
static void scb_put(struct scb *scb, size_t i, void *data)
{
	assert(scb->elements);
	scb->elements[i & scb->mask] = data;
}
/* Item contains the element we want to make space for
   index is the index in the list. */
static void scb_grow(struct scb *scb, size_t item, size_t index)
{
	size_t size;
	void **buf;
	size_t i;

	/* Figure out the new size. */
	size = scb->mask + 1;
	do size *= 2; while (index >= size);

	/* Allocate the new buffer */
	buf = calloc(size, sizeof(void*));

	size--;

	/* XXX: hope the resize op is rarely called and with little data */

	/* Copy elements from the old buffer to the new buffer */
	for (i = 0; i <= scb->mask; ++i) {
		buf[(item - index + i) & size] = scb_get(scb, item - index + i);
	}

	/* Swap to the newly allocated buffer */
	scb->mask = size;
	free(scb->elements);
	scb->elements = buf;
}
static void scb_ensure_size(struct scb *scb, size_t item, size_t index)
{
	if (index > scb->mask)
		scb_grow(scb, item, index);
}
static size_t scb_size(struct scb *scb)
{
	return scb->mask + 1; /* remember: power of 2 magic */
}
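/* A minimal usage sketch (illustrative only; the names and sizes here are
   assumptions, not library API). The buffer is indexed by raw sequence
   numbers; the power-of-two mask replaces a modulo. */
#if 0
static void scb_example(void)
{
	struct scb buf = {15, calloc(16, sizeof(void *))}; /* 16 slots, mask 15 */
	void *item = malloc(1);

	scb_ensure_size(&buf, 100, 20); /* depth 20 > mask 15 grows storage to 32 slots */
	scb_put(&buf, 100, item);
	assert(scb_get(&buf, 100) == item); /* 100 & mask selects the slot */
}
#endif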
/*----------------------------------------------------------------------------*/

static struct UTPGlobalStats _global_stats;
/* compare if lhs is less than rhs, taking wrapping
   into account. if lhs is close to UINT_MAX and rhs
   is close to 0, lhs is assumed to have wrapped and
   considered smaller */
static bool wrapping_compare_less(uint32 lhs, uint32 rhs)
{
	uint32 dist_down;
	uint32 dist_up;

	/* distance walking from lhs to rhs, downwards */
	dist_down = lhs - rhs;
	/* distance walking from lhs to rhs, upwards */
	dist_up = rhs - lhs;

	/* if the distance walking up is shorter, lhs
	   is less than rhs. If the distance walking down
	   is shorter, then rhs is less than lhs */
	return dist_up < dist_down;
}
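/* Worked examples (illustrative): for lhs = 0xfffffff0, rhs = 0x10:
   dist_down = 0xffffffe0 and dist_up = 0x20, so lhs counts as smaller --
   it is assumed to have wrapped. For lhs = 100, rhs = 50: dist_up =
   0xffffffce and dist_down = 50, so lhs is not less than rhs, as
   expected. */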
/*----------------------------------------------------------------------------*/
struct dh { /* Delay History */
	uint32 delay_base;

	/* this is the history of delay samples,
	   normalized by using the delay_base. These
	   values are always greater than 0 and measure
	   the queuing delay in microseconds */
	uint32 cur_delay_hist[CUR_DELAY_SIZE];
	size_t cur_delay_idx;

	/* this is the history of delay_base. It's
	   a number that doesn't have an absolute meaning,
	   only a relative one. It doesn't make sense to initialize
	   it to anything other than values relative to
	   what's been seen in the real world. */
	uint32 delay_base_hist[DELAY_BASE_HISTORY];
	size_t delay_base_idx;
	/* the time when we last stepped the delay_base_idx */
	uint32 delay_base_time;

	bool delay_base_initialized;
};
static void dh_clear(struct dh *dh)
{
	size_t i;

	dh->delay_base_initialized = false;
	dh->delay_base = 0;
	dh->cur_delay_idx = 0;
	dh->delay_base_idx = 0;
	dh->delay_base_time = g_current_ms;

	for (i = 0; i < CUR_DELAY_SIZE; i++) {
		dh->cur_delay_hist[i] = 0;
	}

	for (i = 0; i < DELAY_BASE_HISTORY; i++) {
		dh->delay_base_hist[i] = 0;
	}
}
static void dh_shift(struct dh *dh, uint32 offset)
{
	size_t i;

	/* the offset should never be "negative"
	   assert(offset < 0x10000000); */

	/* increase all of our base delays by this amount
	   this is used to take clock skew into account
	   by observing the other side's changes in its base_delay */
	for (i = 0; i < DELAY_BASE_HISTORY; i++) {
		dh->delay_base_hist[i] += offset;
	}
	dh->delay_base += offset;
}
static void dh_add_sample(struct dh *dh, uint32 sample)
{
	/* The two clocks (in the two peers) are assumed not to
	   progress at the exact same rate. They are assumed to be
	   drifting, which causes the delay samples to contain
	   a systematic error, either they are under-
	   estimated or over-estimated. This is why we update the
	   delay_base every two minutes, to adjust for this.

	   This means the values will keep drifting and eventually wrap.
	   We can cross the wrapping boundary in two directions, either
	   going up, crossing the highest value, or going down, crossing 0.

	   if the delay_base is close to the max value and sample actually
	   wrapped on the other end we would see something like this:
	   delay_base = 0xffffff00, sample = 0x00000400
	   sample - delay_base = 0x500 which is the correct difference

	   if the delay_base is instead close to 0, and we got an even lower
	   sample (that will eventually update the delay_base), we may see
	   something like this:
	   delay_base = 0x00000400, sample = 0xffffff00
	   sample - delay_base = 0xfffffb00
	   this needs to be interpreted as a negative number and the actual
	   recorded delay should be 0.

	   It is important that all arithmetic that assumes wrapping
	   is done with unsigned integers. Signed integers are not guaranteed
	   to wrap the way unsigned integers do. At least GCC takes advantage
	   of this relaxed rule and won't necessarily wrap signed ints.

	   remove the clock offset and propagation delay.
	   delay base is min of the sample and the current
	   delay base. This min-operation is subject to wrapping
	   and care needs to be taken to correctly choose the
	   true minimum.

	   specifically the problem case is when delay_base is very small
	   and sample is very large (because it wrapped past zero), sample
	   needs to be considered the smaller */

	uint32 delay;
	size_t i;

	if (!dh->delay_base_initialized) {
		/* delay_base being 0 suggests that we haven't initialized
		   it or its history with any real measurements yet. Initialize
		   everything with this sample. */
		for (i = 0; i < DELAY_BASE_HISTORY; ++i) {
			/* if we don't have a value, set it to the current sample */
			dh->delay_base_hist[i] = sample;
		}
		dh->delay_base = sample;
		dh->delay_base_initialized = true;
	}
	if (wrapping_compare_less(sample, dh->delay_base_hist[dh->delay_base_idx])) {
		/* sample is smaller than the current delay_base_hist entry,
		   update it */
		dh->delay_base_hist[dh->delay_base_idx] = sample;
	}

	/* is sample lower than delay_base? If so, update delay_base */
	if (wrapping_compare_less(sample, dh->delay_base)) {
		/* sample is smaller than the current delay_base,
		   update it */
		dh->delay_base = sample;
	}

	/* this operation may wrap, and is supposed to */
	delay = sample - dh->delay_base;
	/* sanity check. If this is triggered, something fishy is going on
	   it means the measured sample was greater than 32 seconds! */
	/* assert(delay < 0x2000000); */

	dh->cur_delay_hist[dh->cur_delay_idx] = delay;
	dh->cur_delay_idx = (dh->cur_delay_idx + 1) % CUR_DELAY_SIZE;
	/* once every minute */
	if (g_current_ms - dh->delay_base_time > 60 * 1000) {
		dh->delay_base_time = g_current_ms;
		dh->delay_base_idx = (dh->delay_base_idx + 1) % DELAY_BASE_HISTORY;
		/* clear up the new delay base history spot by initializing
		   it to the current sample, then update it */
		dh->delay_base_hist[dh->delay_base_idx] = sample;
		dh->delay_base = dh->delay_base_hist[0];
		/* Assign the lowest delay in the last DELAY_BASE_HISTORY minutes to delay_base */
		for (i = 0; i < DELAY_BASE_HISTORY; ++i) {
			if (wrapping_compare_less(dh->delay_base_hist[i], dh->delay_base))
				dh->delay_base = dh->delay_base_hist[i];
		}
	}
}
static uint32 dh_get_value(struct dh *dh)
{
	uint32 value = UINT_MAX;
	size_t i;

	for (i = 0; i < CUR_DELAY_SIZE; ++i) {
		value = uint32_min(dh->cur_delay_hist[i], value);
	}
	/* value could be UINT_MAX if we have no samples yet... */
	return value;
}
/*----------------------------------------------------------------------------*/

/*----------------------------------------------------------------------------*/
struct UTPSocket {
	struct psa addr;

	uint16 reorder_count;
	byte duplicate_ack;

	/* the number of bytes we've received but not acked yet */
	size_t bytes_since_ack;

	/* the number of packets in the send queue. Packets that haven't
	   yet been sent count as well as packets marked as needing resend
	   the oldest un-acked packet in the send queue is seq_nr - cur_window_packets */
	uint16 cur_window_packets;

	/* how much of the window is used, number of bytes in-flight
	   packets that have not yet been sent do not count, packets
	   that are marked as needing to be re-sent (due to a timeout)
	   don't count either */
	size_t cur_window;
	/* maximum window size, in bytes */
	size_t max_window;
	/* SO_SNDBUF setting, in bytes */
	size_t opt_sndbuf;
	/* SO_RCVBUF setting, in bytes */
	size_t opt_rcvbuf;

	/* Is a FIN packet in the reassembly buffer? */
	bool got_fin;
	/* Timeout procedure */
	bool fast_timeout;

	/* max receive window for other end, in bytes */
	size_t max_window_user;
	/* 0 = original uTP header, 1 = second revision */
	byte version;
	enum CONN_STATE state;
	/* TickCount when we last decayed window (wraps) */
	int32 last_rwin_decay;

	/* the sequence number of the FIN packet. This field is only set
	   when we have received a FIN, and the flag field has the FIN flag set.
	   it is used to know when it is safe to destroy the socket, we must have
	   received all packets up to this sequence number first. */
	uint16 eof_pkt;

	/* All sequence numbers up to and including this have been properly received
	   by us */
	uint16 ack_nr;
	/* This is the sequence number for the next packet to be sent. */
	uint16 seq_nr;

	uint16 timeout_seq_nr;

	/* This is the sequence number of the next packet we're allowed to
	   do a fast resend with. This makes sure we only do a fast-resend
	   once per packet. We can resend the packet with this sequence number
	   or any later packet (with a higher sequence number). */
	uint16 fast_resend_seq_nr;

	uint32 reply_micro;

	/* the time when we need to send another ack. If there's
	   nothing to ack, this is a very large number */
	uint32 ack_time;

	uint32 last_got_packet;
	uint32 last_sent_packet;
	uint32 last_measured_delay;
	uint32 last_maxed_out_window;

	/* the last time we added send quota to the connection
	   when adding send quota, this is subtracted from the
	   current time multiplied by max_window / rtt
	   which is the current allowed send rate. */
	int32 last_send_quota;

	/* the number of bytes we are allowed to send on
	   this connection. If this is more than one packet
	   size when we run out of data to send, it is clamped
	   to the packet size
	   this value is multiplied by 100 in order to get
	   higher accuracy when dealing with low rates */
	int32 send_quota;

	SendToProc *send_to_proc;
	void *send_to_userdata;
	struct UTPFunctionTable func;
	void *userdata;

	/* Round trip time */
	uint rtt;
	/* Round trip time variance */
	uint rtt_var;
	/* Round trip timeout */
	uint rto;
	struct dh rtt_hist;
	uint retransmit_timeout;
	/* The RTO timer will timeout here. */
	uint32 rto_timeout;
	/* When the window size is set to zero, start this timer. It will send a new packet every 30 secs. */
	uint32 zerowindow_time;

	uint32 conn_seed;
	/* Connection ID for packets I receive */
	uint32 conn_id_recv;
	/* Connection ID for packets I send */
	uint32 conn_id_send;
	/* Last rcv window we advertised, in bytes */
	size_t last_rcv_win;

	struct dh our_hist;
	struct dh their_hist;

	/* extension bytes from SYN packet */
	byte extensions[8];

	struct scb inbuf;
	struct scb outbuf;

	/* Public stats, returned by UTP_GetStats(). See utp.h */
	struct UTPStats _stats;
};
static size_t us_get_udp_mtu(struct UTPSocket *us)
{
	socklen_t len;
	SOCKADDR_STORAGE sa;

	sa = psa_get_sockaddr_storage(&us->addr, &len);
	return UTP_GetUDPMTU((struct sockaddr *)&sa, len);
}
/* returns the max number of bytes of payload the uTP
   connection is allowed to send */
static size_t us_get_packet_size(struct UTPSocket *us)
{
	int header_size;
	size_t mtu;

	header_size = us->version == 1 ? sizeof(struct pf1) : sizeof(struct pf);

	mtu = us_get_udp_mtu(us);

	if (DYNAMIC_PACKET_SIZE_ENABLED) {
		SOCKADDR_STORAGE sa;
		size_t max_packet_size;

		sa = psa_get_sockaddr_storage(&us->addr, NULL);
		max_packet_size = UTP_sockaddr_GetPacketSize((struct sockaddr *)&sa);
		return size_t_min(mtu - header_size, max_packet_size);
	}

	return mtu - header_size;
}
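/* Worked example (illustrative): if the UDP MTU for the path is assumed to
   be 1472 bytes, a version-1 header (sizeof(struct pf1) == 20) leaves
   1472 - 20 = 1452 bytes of payload per packet. */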
/* Calculates the current receive window */
static size_t us_get_rcv_window(struct UTPSocket *us)
{
	size_t numbuf;

	/* If we don't have a connection (such as during connection
	   establishment), always act as if we have an empty buffer. */
	if (!us->userdata)
		return us->opt_rcvbuf;

	/* Trim window down according to what's already in the buffer. */
	numbuf = us->func.get_rb_size(us->userdata);
	assert((int)numbuf >= 0);
	return us->opt_rcvbuf > numbuf ? us->opt_rcvbuf - numbuf : 0;
}
/* Test if we're ready to decay max_window
   XXX this breaks when spaced by > INT_MAX/2, which is 49
   days; the failure mode in that case is we do an extra decay
   or fail to do one when we really shouldn't. */
static bool us_can_decay_win(struct UTPSocket *us, int32 msec)
{
	return msec - us->last_rwin_decay >= MAX_WINDOW_DECAY;
}
/* If we can, decay max window (halving it, clamped to MIN_WINDOW_SIZE) */
static void us_maybe_decay_win(struct UTPSocket *us)
{
	if (us_can_decay_win(us, g_current_ms)) {
		/* TCP uses 0.5 */
		us->max_window = (size_t)(us->max_window * .5);
		us->last_rwin_decay = g_current_ms;
		if (us->max_window < MIN_WINDOW_SIZE)
			us->max_window = MIN_WINDOW_SIZE;
	}
}
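/* Worked example (illustrative): with MAX_WINDOW_DECAY = 100 ms, two loss
   events 60 ms apart cause only one halving; max_window = 20000 decays to
   10000, then (at least 100 ms later) to 5000, and never drops below
   MIN_WINDOW_SIZE. */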
static size_t us_get_header_size(struct UTPSocket *us)
{
	return (us->version ? sizeof(struct pf1) : sizeof(struct pf));
}

static size_t us_get_header_extensions_size(struct UTPSocket *us)
{
	return (us->version ? sizeof(struct pfe1) : sizeof(struct pfe));
}
static void us_sent_ack(struct UTPSocket *us)
{
	us->ack_time = g_current_ms + 0x70000000;
	us->bytes_since_ack = 0;
}
static size_t us_get_udp_overhead(struct UTPSocket *us)
{
	socklen_t len;
	SOCKADDR_STORAGE sa;

	sa = psa_get_sockaddr_storage(&us->addr, &len);
	return UTP_GetUDPOverhead((struct sockaddr *)&sa, len);
}
/* we keep this function around but it's not used */
static uint64 us_get_global_utp_bytes_sent(struct UTPSocket *us)
{
	socklen_t len;
	SOCKADDR_STORAGE sa;

	sa = psa_get_sockaddr_storage(&us->addr, &len);
	return UTP_GetGlobalUTPBytesSent((struct sockaddr *)&sa, len);
}
static size_t us_get_overhead(struct UTPSocket *us)
{
	return us_get_udp_overhead(us) + us_get_header_size(us);
}

/*----------------------------------------------------------------------------*/
struct array g_rst_info = {NULL, 0, 0, sizeof(struct RST_Info)};
struct array g_utp_sockets = {NULL, 0, 0, sizeof(struct UTPSocket *)};
static void UTP_RegisterSentPacket(size_t length)
{
	if (length <= PACKET_SIZE_MID) {
		if (length <= PACKET_SIZE_EMPTY) {
			_global_stats._nraw_send[PACKET_SIZE_EMPTY_BUCKET]++;
		} else if (length <= PACKET_SIZE_SMALL) {
			_global_stats._nraw_send[PACKET_SIZE_SMALL_BUCKET]++;
		} else {
			_global_stats._nraw_send[PACKET_SIZE_MID_BUCKET]++;
		}
	} else {
		if (length <= PACKET_SIZE_BIG) {
			_global_stats._nraw_send[PACKET_SIZE_BIG_BUCKET]++;
		} else {
			_global_stats._nraw_send[PACKET_SIZE_HUGE_BUCKET]++;
		}
	}
}
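/* Worked example (illustrative): a bare 23-byte ACK lands in
   PACKET_SIZE_EMPTY_BUCKET, a 400-byte packet in PACKET_SIZE_MID_BUCKET
   (373 < 400 <= 723), and a 1500-byte packet in PACKET_SIZE_HUGE_BUCKET
   (over 1400). */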
static void send_to_addr(SendToProc *send_to_proc, void *send_to_userdata, byte *p, size_t len, struct psa *addr)
{
	socklen_t tolen;
	SOCKADDR_STORAGE to;

	to = psa_get_sockaddr_storage(addr, &tolen);
	UTP_RegisterSentPacket(len);
	send_to_proc(send_to_userdata, p, len, (const struct sockaddr *)&to, tolen);
}
static void us_send_data(struct UTPSocket *us, struct pf *b, size_t length, enum bandwidth_type_t type)
{
	uint64 time;
	struct pf1 *b1;
	size_t n;
#if g_log_utp_verbose
	int flags;
	uint16 seq_nr;
	uint16 ack_nr;
#endif

	/* time stamp this packet with local time, the stamp goes into
	   the header of every packet at the 8th byte for 8 bytes:
	   two integers, check packet.h for more */
	time = UTP_GetMicroseconds();

	b1 = (struct pf1 *)b;
	if (us->version == 0) {
		b->tv_sec = htonl((uint32)(time / 1000000));
		b->tv_usec = htonl(time % 1000000);
		b->reply_micro = htonl(us->reply_micro);
	} else {
		b1->tv_usec = htonl((uint32)time);
		b1->reply_micro = htonl(us->reply_micro);
	}

	us->last_sent_packet = g_current_ms;

	us->_stats._nbytes_xmit += length;
	++us->_stats._nxmit;

	if (us->func.on_overhead) {
		if (type == payload_bandwidth) {
			/* if this packet carries payload, just
			   count the header as overhead */
			type = header_overhead;
			n = us_get_overhead(us);
		} else {
			n = length + us_get_udp_overhead(us);
		}
		us->func.on_overhead(us->userdata, true, n, type);
	}

#if g_log_utp_verbose
	flags = us->version == 0 ? b->flags : pf1_type(b1);
	seq_nr = us->version == 0 ? ntohs(b->seq_nr) : ntohs(b1->seq_nr);
	ack_nr = us->version == 0 ? ntohs(b->ack_nr) : ntohs(b1->ack_nr);
	LOG_UTPV("0x%08x: send %s len:%u id:%u timestamp:"I64u" reply_micro:%u flags:%s seq_nr:%u ack_nr:%u",
		 us, addrfmt(&us->addr, addrbuf), (uint)length, us->conn_id_send, time,
		 us->reply_micro, flagnames[flags], seq_nr, ack_nr);
#endif
	send_to_addr(us->send_to_proc, us->send_to_userdata, (byte *)b, length, &us->addr);
}
/* XXX: careful, synack defaulted to false in the C++ original */
static void us_send_ack(struct UTPSocket *us, bool synack)
{
	/* all following structs fit in a PacketFormatExtensions (struct pfe) */
	struct pfe pfe;
	struct pfe1 *pfe1;
	struct pfa *pfa;
	struct pfa1 *pfa1;
	size_t len;

	memset(&pfe, 0, sizeof(pfe));

	pfe1 = (struct pfe1 *)&pfe;
	pfa = (struct pfa *)pfe1;
	pfa1 = (struct pfa1 *)pfe1;

	us->last_rcv_win = us_get_rcv_window(us);
	if (us->version == 0) {
		pfa->pf.connid = htonl(us->conn_id_send);
		pfa->pf.ack_nr = htons(us->ack_nr);
		pfa->pf.seq_nr = htons(us->seq_nr);
		pfa->pf.flags = ST_STATE;
		pfa->pf.windowsize = (byte)DIV_ROUND_UP(us->last_rcv_win, PACKET_SIZE);
		len = sizeof(struct pf);
	} else {
		pf1_version_set(&pfa1->pf, 1);
		pf1_type_set(&pfa1->pf, ST_STATE);
		pfa1->pf.connid = htons((uint16)us->conn_id_send);
		pfa1->pf.ack_nr = htons(us->ack_nr);
		pfa1->pf.seq_nr = htons(us->seq_nr);
		pfa1->pf.windowsize = htonl((uint32)us->last_rcv_win);
		len = sizeof(struct pf1);
	}

	/* we never need to send an EACK for connections
	   that are shutting down */
	if (us->reorder_count != 0 && us->state < CS_GOT_FIN) {
		uint32 m = 0;
		size_t window;
		size_t i;

		/* if reorder count > 0, send an EACK.
		   reorder count should always be 0
		   for synacks, so this should not be an issue */
		assert(!synack);

		if (us->version == 0) {
			pfa->pf.ext = 1;
			pfa->ext_next = 0;
			pfa->ext_len = 4;
		} else {
			pfa1->pf.ext = 1;
			pfa1->ext_next = 0;
			pfa1->ext_len = 4;
		}

		/* reorder count should only be non-zero
		   if the packet ack_nr + 1 has not yet
		   been received */
		assert(scb_get(&us->inbuf, us->ack_nr + 1) == NULL);
		window = size_t_min(14 + 16, scb_size(&us->inbuf));
		/* Generate bit mask of segments received. */
		for (i = 0; i < window; ++i) {
			if (scb_get(&us->inbuf, us->ack_nr + i + 2) != NULL) {
				m |= 1 << i;
				LOG_UTPV("0x%08x: EACK packet [%u]", us, us->ack_nr + i + 2);
			}
		}
		if (us->version == 0) {
			pfa->acks[0] = (byte)m;
			pfa->acks[1] = (byte)(m >> 8);
			pfa->acks[2] = (byte)(m >> 16);
			pfa->acks[3] = (byte)(m >> 24);
		} else {
			pfa1->acks[0] = (byte)m;
			pfa1->acks[1] = (byte)(m >> 8);
			pfa1->acks[2] = (byte)(m >> 16);
			pfa1->acks[3] = (byte)(m >> 24);
		}
		len += 4 + 2;
		LOG_UTPV("0x%08x: Sending EACK %u [%u] bits:[%032b]", us, us->ack_nr, us->conn_id_send, m);
	} else if (synack) {
		/* we only send "extensions" in response to SYN
		   and the reorder count is 0 in that state */
		LOG_UTPV("0x%08x: Sending ACK %u [%u] with extension bits", us, us->ack_nr, us->conn_id_send);
		if (us->version == 0) {
			pfe.pf.ext = 2;
			pfe.ext_next = 0;
			pfe.ext_len = 8;
			memset(&pfe.extensions[0], 0, 8);
		} else {
			pfe1->pf.ext = 2;
			pfe1->ext_next = 0;
			pfe1->ext_len = 8;
			memset(&pfe1->extensions[0], 0, 8);
		}
		len += 8 + 2;
	} else {
		LOG_UTPV("0x%08x: Sending ACK %u [%u]", us, us->ack_nr, us->conn_id_send);
	}

	us_sent_ack(us);
	us_send_data(us, (struct pf *)&pfe, len, ack_overhead);
}
static void us_send_keep_alive(struct UTPSocket *us)
{
	us->ack_nr--;
	LOG_UTPV("0x%08x: Sending KeepAlive ACK %u [%u]", us, us->ack_nr, us->conn_id_send);
	us_send_ack(us, false);
	us->ack_nr++;
}
/* XXX: this was a static class member function in the C++ original */
static void us_send_rst(SendToProc *send_to_proc, void *send_to_userdata, struct psa *addr, uint32 conn_id_send, uint16 ack_nr, uint16 seq_nr, byte version)
{
	struct pf pf;    /* 23 bytes */
	struct pf1 *pf1; /* 20 bytes */
	size_t len;

	memset(&pf, 0, sizeof(pf));
	pf1 = (struct pf1 *)&pf;

	if (version == 0) {
		pf.connid = htonl(conn_id_send);
		pf.ack_nr = htons(ack_nr);
		pf.seq_nr = htons(seq_nr);
		pf.flags = ST_RESET;
		len = sizeof(struct pf);
	} else {
		pf1_version_set(pf1, 1);
		pf1_type_set(pf1, ST_RESET);
		pf1->connid = htons((uint16)conn_id_send);
		pf1->ack_nr = htons(ack_nr);
		pf1->seq_nr = htons(seq_nr);
		pf1->windowsize = 0;
		len = sizeof(struct pf1);
	}

	LOG_UTPV("%s: Sending RST id:%u seq_nr:%u ack_nr:%u", addrfmt(addr, addrbuf), conn_id_send, seq_nr, ack_nr);
	LOG_UTPV("send %s len:%u id:%u", addrfmt(addr, addrbuf), (uint)len, conn_id_send);
	send_to_addr(send_to_proc, send_to_userdata, (byte *)pf1, len, addr);
}
static void us_send_packet(struct UTPSocket *us, struct op *pkt)
{
	size_t max_send;
	size_t packet_size;
	struct pf *p;
	struct pf1 *p1;

	/* only count against the quota the first time we
	   send the packet. Don't enforce quota when closing
	   a socket. Only enforce the quota when we're sending
	   at slow rates (max window < packet size) */
	max_send = size_t_min3(us->max_window, us->opt_sndbuf, us->max_window_user);

	if (pkt->transmissions == 0 || pkt->need_resend)
		us->cur_window += pkt->payload;

	packet_size = us_get_packet_size(us);
	if (pkt->transmissions == 0 && max_send < packet_size) {
		assert(us->state == CS_FIN_SENT || (int32)pkt->payload <= us->send_quota / 100);
		us->send_quota = us->send_quota - (int32)(pkt->payload * 100);
	}

	pkt->need_resend = false;

	p1 = (struct pf1 *)pkt->data;
	p = (struct pf *)pkt->data;
	if (us->version == 0) {
		p->ack_nr = htons(us->ack_nr);
	} else {
		p1->ack_nr = htons(us->ack_nr);
	}
	pkt->time_sent = UTP_GetMicroseconds();
	pkt->transmissions++;
	us_sent_ack(us);
	us_send_data(us, (struct pf *)pkt->data, pkt->length,
		     (us->state == CS_SYN_SENT) ? connect_overhead
		     : (pkt->transmissions == 1) ? payload_bandwidth
		     : retransmit_overhead);
}
static bool us_is_writable(struct UTPSocket *us, size_t to_write)
{
	size_t max_send;
	size_t packet_size;

	/* return true if it's OK to stuff another packet into the
	   outgoing queue. Since we may be using packet pacing, we
	   might not actually send the packet right away to affect the
	   cur_window. The only thing that happens when we add another
	   packet is that cur_window_packets is increased. */
	max_send = size_t_min3(us->max_window, us->opt_sndbuf, us->max_window_user);

	packet_size = us_get_packet_size(us);

	if (us->cur_window + packet_size >= us->max_window)
		us->last_maxed_out_window = g_current_ms;

	/* if we don't have enough quota, we can't write regardless */
	if (USE_PACKET_PACING)
		if (us->send_quota / 100 < (int32)to_write)
			return false;

	/* subtract one to save space for the FIN packet */
	if (us->cur_window_packets >= OUTGOING_BUFFER_MAX_SIZE - 1)
		return false;

	/* if sending another packet would not make the window exceed
	   the max_window, we can write */
	if (us->cur_window + packet_size <= max_send)
		return true;

	/* if the window size is less than a packet, and we have enough
	   quota to send a packet, we can write, even though it would
	   make the window exceed the max size
	   the last condition is needed to not put too many packets
	   in the send buffer. cur_window isn't updated until we flush
	   the send buffer, so we need to take the number of packets
	   into account */
	if (USE_PACKET_PACING)
		if (us->max_window < to_write && us->cur_window < us->max_window && us->cur_window_packets == 0)
			return true;

	return false;
}
static bool us_flush_packets(struct UTPSocket *us)
{
	size_t packet_size;
	uint16 i;

	packet_size = us_get_packet_size(us);

	/* send packets that are waiting on the pacer to be sent
	   i has to be an unsigned 16 bit counter to wrap correctly
	   signed types are not guaranteed to wrap the way you expect */
	for (i = us->seq_nr - us->cur_window_packets; i != us->seq_nr; ++i) {
		struct op *pkt;

		pkt = (struct op *)scb_get(&us->outbuf, i);
		if (pkt == 0 || (pkt->transmissions > 0 && pkt->need_resend == false))
			continue;
		/* have we run out of quota? */
		if (!us_is_writable(us, pkt->payload))
			return true;

		/* Nagle check
		   don't send the last packet if we have one packet in-flight
		   and the current packet is still smaller than packet_size. */
		if (i != ((us->seq_nr - 1) & ACK_NR_MASK) || us->cur_window_packets == 1 || pkt->payload >= packet_size) {
			us_send_packet(us, pkt);

			/* No need to send another ack if there is nothing to reorder. */
			if (us->reorder_count == 0)
				us_sent_ack(us);
		}
	}
	return false;
}
static void us_write_outgoing_packet(struct UTPSocket *us, size_t payload, uint flags)
{
	size_t packet_size;
	size_t header_size;

	/* Setup initial timeout timer */
	if (us->cur_window_packets == 0) {
		us->retransmit_timeout = us->rto;
		us->rto_timeout = g_current_ms + us->retransmit_timeout;
		assert(us->cur_window == 0);
	}

	packet_size = us_get_packet_size(us);
	do {
		struct op *pkt;
		struct pf *p;
		struct pf1 *p1;
		size_t added;
		bool append;

		assert(us->cur_window_packets < OUTGOING_BUFFER_MAX_SIZE);
		assert(flags == ST_DATA || flags == ST_FIN);

		pkt = NULL;

		if (us->cur_window_packets > 0)
			pkt = (struct op *)scb_get(&us->outbuf, us->seq_nr - 1);

		header_size = us_get_header_size(us);
		append = true;
		added = 0;

		/* if there's any room left in the last packet in the window
		   and it hasn't been sent yet, fill that frame first */
		if (payload && pkt && !pkt->transmissions && pkt->payload < packet_size) {
			/* Use the previous unsent packet */
			added = size_t_min(payload + pkt->payload, size_t_max(packet_size, pkt->payload)) - pkt->payload;
			pkt = realloc(pkt, (sizeof(struct op) - 1) + header_size + pkt->payload + added);
			scb_put(&us->outbuf, us->seq_nr - 1, pkt);
			append = false;
			assert(!pkt->need_resend);
		} else {
			/* Create the packet to send. */
			added = payload;
			pkt = malloc((sizeof(struct op) - 1) + header_size + added);
			pkt->payload = 0;
			pkt->transmissions = 0;
			pkt->need_resend = false;
		}

		if (added) {
			/* Fill it with data from the upper layer. */
			us->func.on_write(us->userdata, pkt->data + header_size + pkt->payload, added);
		}
		pkt->payload += added;
		pkt->length = header_size + pkt->payload;

		us->last_rcv_win = us_get_rcv_window(us);

		p = (struct pf *)pkt->data;
		p1 = (struct pf1 *)pkt->data;
		if (us->version == 0) {
			p->connid = htonl(us->conn_id_send);
			p->ext = 0;
			p->windowsize = (byte)DIV_ROUND_UP(us->last_rcv_win, PACKET_SIZE);
			p->ack_nr = htons(us->ack_nr);
			p->flags = flags;
		} else {
			pf1_version_set(p1, 1);
			pf1_type_set(p1, flags);
			p1->ext = 0;
			p1->connid = htons((uint16)us->conn_id_send);
			p1->windowsize = htonl((uint32)us->last_rcv_win);
			p1->ack_nr = htons(us->ack_nr);
		}

		if (append) {
			/* Remember the message in the outgoing queue. */
			scb_ensure_size(&us->outbuf, us->seq_nr, us->cur_window_packets);
			scb_put(&us->outbuf, us->seq_nr, pkt);
			if (us->version == 0)
				p->seq_nr = htons(us->seq_nr);
			else
				p1->seq_nr = htons(us->seq_nr);
			us->seq_nr++;
			us->cur_window_packets++;
		}

		payload -= added;
	} while (payload);

	us_flush_packets(us);
}
static void us_update_send_quota(struct UTPSocket *us)
{
	int dt;
	size_t add;

	dt = g_current_ms - us->last_send_quota;
	if (dt == 0)
		return;
	us->last_send_quota = g_current_ms;
	add = us->max_window * dt * 100 / (us->rtt_hist.delay_base ? us->rtt_hist.delay_base : 50);
	if (add > us->max_window * 100 && add > MAX_CWND_INCREASE_BYTES_PER_RTT * 100)
		add = us->max_window;
	us->send_quota += (int32)add;
	/* LOG_UTPV("0x%08x: UTPSocket::update_send_quota dt:%d rtt:%u max_window:%u quota:%d", this, dt, rtt, (uint)max_window, send_quota / 100); */
}
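/* Worked example (illustrative): with max_window = 10000 bytes and
   rtt_hist.delay_base = 100 ms, a 10 ms tick adds 10000 * 10 * 100 / 100
   = 100000 quota units, i.e. 1000 bytes of send allowance -- quota is
   kept scaled by 100 to stay accurate at low rates. */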
static void us_check_invariant(struct UTPSocket *us)
{
	size_t outstanding_bytes;
	int i;

	if (us->reorder_count > 0)
		assert(scb_get(&us->inbuf, us->ack_nr + 1) == NULL);

	outstanding_bytes = 0;
	for (i = 0; i < us->cur_window_packets; ++i) {
		struct op *pkt;

		pkt = (struct op *)scb_get(&us->outbuf, us->seq_nr - i - 1);
		if (pkt == 0 || pkt->transmissions == 0 || pkt->need_resend)
			continue;
		outstanding_bytes += pkt->payload;
	}
	assert(outstanding_bytes == us->cur_window);
}
static void us_check_timeouts(struct UTPSocket *us)
{
	int32 limit;
	uint new_timeout;
	int i;

#ifdef _DEBUG
	us_check_invariant(us);
#endif

	/* this invariant should always be true */
	assert(us->cur_window_packets == 0 || scb_get(&us->outbuf, us->seq_nr - us->cur_window_packets));

	LOG_UTPV("0x%08x: CheckTimeouts timeout:%d max_window:%u cur_window:%u quota:%d "
		 "state:%s cur_window_packets:%u bytes_since_ack:%u ack_time:%d",
		 us, (int)(us->rto_timeout - g_current_ms), (uint)us->max_window, (uint)us->cur_window,
		 us->send_quota / 100, statenames[us->state], us->cur_window_packets,
		 (uint)us->bytes_since_ack, (int)(g_current_ms - us->ack_time));

	us_update_send_quota(us);
	us_flush_packets(us);

	if (USE_PACKET_PACING) {
		/* In case the new send quota made it possible to send another packet,
		   mark the socket as writable. If we don't use pacing, the send
		   quota does not affect whether the socket is writable:
		   without packet pacing, the writable event is triggered
		   whenever the cur_window falls below the max_window, so we don't
		   need this check then */
		if (us->state == CS_CONNECTED_FULL && us_is_writable(us, us_get_packet_size(us))) {
			us->state = CS_CONNECTED;
			LOG_UTPV("0x%08x: Socket writable. max_window:%u cur_window:%u quota:%d packet_size:%u",
				 us, (uint)us->max_window, (uint)us->cur_window, us->send_quota / 100, (uint)us_get_packet_size(us));
			us->func.on_state(us->userdata, UTP_STATE_WRITABLE);
		}
	}
	switch (us->state) {
	case CS_SYN_SENT:
	case CS_CONNECTED_FULL:
	case CS_CONNECTED:
	case CS_FIN_SENT:
		/* Reset max window... */
		if ((int)(g_current_ms - us->zerowindow_time) >= 0 && us->max_window_user == 0)
			us->max_window_user = PACKET_SIZE;

		if ((int)(g_current_ms - us->rto_timeout) >= 0 && (!(USE_PACKET_PACING) || us->cur_window_packets > 0) && us->rto_timeout > 0) {
			if (us->cur_window_packets > 0) {
				struct op *pkt;

				pkt = (struct op *)scb_get(&us->outbuf, us->seq_nr - us->cur_window_packets);
				assert(pkt);

				/* If there were a lot of retransmissions, force recomputation of round trip time */
				if (pkt->transmissions >= 4)
					us->rtt = 0;
			}

			/* Increase RTO */
			new_timeout = us->retransmit_timeout * 2;
			if (new_timeout >= 30000 || (us->state == CS_SYN_SENT && new_timeout > 6000)) {
				/* more than 30 seconds with no reply. kill it.
				   if we haven't even connected yet, give up sooner. 6 seconds
				   means 2 tries at the following timeouts: 3, 6 seconds */
				if (us->state == CS_FIN_SENT)
					us->state = CS_DESTROY;
				else
					us->state = CS_RESET;
				us->func.on_error(us->userdata, ETIMEDOUT);
				goto getout;
			}

			us->retransmit_timeout = new_timeout;
			us->rto_timeout = g_current_ms + new_timeout;
			us->duplicate_ack = 0;

			/* rate = min_rate */
			us->max_window = us_get_packet_size(us);
			us->send_quota = int32_max((int32)us->max_window * 100, us->send_quota);

			/* every packet should be considered lost */
			for (i = 0; i < us->cur_window_packets; ++i) {
				struct op *pkt;

				pkt = (struct op *)scb_get(&us->outbuf, us->seq_nr - i - 1);
				if (pkt == 0 || pkt->transmissions == 0 || pkt->need_resend)
					continue;
				pkt->need_resend = true;
				assert(us->cur_window >= pkt->payload);
				us->cur_window -= pkt->payload;
			}

			/* used in parse_log.py */
			LOG_UTP("0x%08x: Packet timeout. Resend. seq_nr:%u. timeout:%u max_window:%u",
				us, us->seq_nr - us->cur_window_packets, us->retransmit_timeout, (uint)us->max_window);

			us->fast_timeout = true;
			us->timeout_seq_nr = us->seq_nr;

			if (us->cur_window_packets > 0) {
				struct op *pkt;

				pkt = (struct op *)scb_get(&us->outbuf, us->seq_nr - us->cur_window_packets);
				assert(pkt);
				us->send_quota = int32_max((int32)pkt->length * 100, us->send_quota);

				/* Re-send the packet. */
				us_send_packet(us, pkt);
			}
		}
		/* Mark the socket as writable */
		if (us->state == CS_CONNECTED_FULL && us_is_writable(us, us_get_packet_size(us))) {
			us->state = CS_CONNECTED;
			LOG_UTPV("0x%08x: Socket writable. max_window:%u cur_window:%u quota:%d packet_size:%u",
				 us, (uint)us->max_window, (uint)us->cur_window, us->send_quota / 100, (uint)us_get_packet_size(us));
			us->func.on_state(us->userdata, UTP_STATE_WRITABLE);
		}

		if (us->state >= CS_CONNECTED && us->state <= CS_FIN_SENT) {
			/* Send acknowledgment packets periodically, or when the threshold is reached */
			if (us->bytes_since_ack > DELAYED_ACK_BYTE_THRESHOLD || (int)(g_current_ms - us->ack_time) >= 0)
				us_send_ack(us, false);

			if ((int)(g_current_ms - us->last_sent_packet) >= KEEPALIVE_INTERVAL)
				us_send_keep_alive(us);
		}
		break;
	/* Close? */
	case CS_GOT_FIN:
	case CS_DESTROY_DELAY:
		if ((int)(g_current_ms - us->rto_timeout) >= 0) {
			us->state = (us->state == CS_DESTROY_DELAY) ? CS_DESTROY : CS_RESET;
			if (us->cur_window_packets > 0 && us->userdata)
				us->func.on_error(us->userdata, ECONNRESET);
		}
		break;
	/* prevent warning */
	case CS_IDLE:
	case CS_RESET:
	case CS_DESTROY:
		break;
	}
getout:
	/* make sure we don't accumulate quota when we don't have
	   anything to send */
	limit = int32_max((int32)us->max_window / 2, 5 * (int32)us_get_packet_size(us)) * 100;
	if (us->send_quota > limit)
		us->send_quota = limit;
}
/* returns:
   0: the packet was acked.
   1: the packet had already been acked
   2: the packet has not been sent yet */
static int us_ack_packet(struct UTPSocket *us, uint16 seq)
{
	struct op *pkt;

	pkt = (struct op *)scb_get(&us->outbuf, seq);

	/* the packet has already been acked (or not sent) */
	if (pkt == NULL) {
		LOG_UTPV("0x%08x: got ack for:%u (already acked, or never sent)", us, seq);
		return 1;
	}

	/* can't ack packets that haven't been sent yet! */
	if (pkt->transmissions == 0) {
		LOG_UTPV("0x%08x: got ack for:%u (never sent, pkt_size:%u need_resend:%u)",
			 us, seq, (uint)pkt->payload, pkt->need_resend);
		return 2;
	}

	LOG_UTPV("0x%08x: got ack for:%u (pkt_size:%u need_resend:%u)",
		 us, seq, (uint)pkt->payload, pkt->need_resend);

	scb_put(&us->outbuf, seq, NULL);
	/* if we never re-sent the packet, update the RTT estimate */
	if (pkt->transmissions == 1) {
		uint32 ertt;

		/* Estimate the round trip time. */
		ertt = (uint32)((UTP_GetMicroseconds() - pkt->time_sent) / 1000);
		if (us->rtt == 0) {
			/* First round trip time sample */
			us->rtt = ertt;
			us->rtt_var = ertt / 2;
			/* sanity check. rtt should never be more than 6 seconds */
			/* assert(rtt < 6000); */
		} else {
			int delta;

			/* Compute new round trip times */
			delta = (int)us->rtt - ertt;
			us->rtt_var = us->rtt_var + (int)(abs(delta) - us->rtt_var) / 4;
			us->rtt = us->rtt - us->rtt/8 + ertt/8;
			/* sanity check. rtt should never be more than 6 seconds */
			/* assert(rtt < 6000); */
			dh_add_sample(&us->rtt_hist, ertt);
		}
		us->rto = uint_max(us->rtt + us->rtt_var * 4, 500);
		LOG_UTPV("0x%08x: rtt:%u avg:%u var:%u rto:%u", us, ertt, us->rtt, us->rtt_var, us->rto);
	}
	us->retransmit_timeout = us->rto;
	us->rto_timeout = g_current_ms + us->rto;
	/* if need_resend is set, this packet has already
	   been considered timed-out, and is not included in
	   the cur_window anymore */
	if (!pkt->need_resend) {
		assert(us->cur_window >= pkt->payload);
		us->cur_window -= pkt->payload;
	}
	free(pkt);
	return 0;
}
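/* Worked example (illustrative): rtt = 100, rtt_var = 20 and a fresh
   sample ertt = 140 give delta = -40, rtt_var = 20 + (40 - 20)/4 = 25,
   rtt = 100 - 100/8 + 140/8 = 105, and rto = max(105 + 4*25, 500) = 500,
   i.e. the 500 ms floor still dominates. */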
/* count the number of bytes that were acked by the EACK header */
/* XXX: careful, min_rtt was a C++ reference in the original */
static size_t us_selective_ack_bytes(struct UTPSocket *us, uint base, byte *mask, byte len, int64 *min_rtt)
{
	size_t acked_bytes;
	int bits;

	if (us->cur_window_packets == 0)
		return 0;

	acked_bytes = 0;
	bits = len * 8;

	do {
		uint v;
		struct op *pkt;

		v = base + bits;

		/* ignore bits that haven't been sent yet
		   see comment in us_selective_ack */
		if (((us->seq_nr - v - 1) & ACK_NR_MASK) >= (uint16)(us->cur_window_packets - 1))
			continue;

		/* ignore bits that represent packets we haven't sent yet
		   or packets that have already been acked */
		pkt = (struct op *)scb_get(&us->outbuf, v);
		if (!pkt || pkt->transmissions == 0)
			continue;

		/* Count the number of segments that were successfully received past it. */
		if (bits >= 0 && mask[bits>>3] & (1 << (bits & 7))) {
			assert((int)(pkt->payload) >= 0);
			acked_bytes += pkt->payload;
			*min_rtt = int64_min(*min_rtt, UTP_GetMicroseconds() - pkt->time_sent);
			continue;
		}
	} while (--bits >= -1);

	return acked_bytes;
}
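/* Worked example (illustrative): bits = 11 tests mask[11>>3] = mask[1]
   against 1 << (11 & 7) = 1 << 3, i.e. bit 3 of the second mask byte --
   the EACK bitfield is little-endian both within and across bytes. */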
#define MAX_EACK 128
static void us_selective_ack(struct UTPSocket *us, uint base, byte *mask, byte len)
{
	int bits;
	int count;
	/* resends is a stack of sequence numbers we need to resend. Since we
	   iterate in reverse over the acked packets, at the end, the top packets
	   are the ones we want to resend */
	int resends[MAX_EACK];
	int nr = 0;
	bool back_off = false;
	int i = 0;

	if (us->cur_window_packets == 0)
		return;

	/* the range is inclusive [0, 31] bits */
	bits = len * 8 - 1;

	count = 0;

	LOG_UTPV("0x%08x: Got EACK [%032b] base:%u", us, *(uint32 *)mask, base);
	do {
		uint v;
		bool bit_set;
		struct op *pkt;

		v = base + bits;
		/* we're iterating over the bits from higher sequence numbers
		   to lower (kind of in reverse order, which might not be very
		   intuitive)

		   ignore bits that haven't been sent yet
		   and bits that fall below the ACKed sequence number
		   this can happen if an EACK message gets
		   reordered and arrives after a packet that ACKs up past
		   the base for this EACK message

		   this is essentially the same as:
		   if v >= seq_nr || v <= seq_nr - cur_window_packets
		   but it takes wrapping into account

		   if v == seq_nr the -1 will make it wrap. if v > seq_nr
		   it will also wrap (since it will fall further below 0)
		   and be > cur_window_packets.
		   if v == seq_nr - cur_window_packets, the result will be
		   seq_nr - (seq_nr - cur_window_packets) - 1
		   == seq_nr - seq_nr + cur_window_packets - 1
		   == cur_window_packets - 1 which will be caught by the
		   test. If v < seq_nr - cur_window_packets the result will
		   fall further outside of the cur_window_packets range.

		   sequence number space:

		     rejected <   accepted   > rejected
		   <============+--------------+============>
		                ^              ^
		                |              |
		          (seq_nr-wnd)       seq_nr */
		if (((us->seq_nr - v - 1) & ACK_NR_MASK) >= (uint16)(us->cur_window_packets - 1))
			continue;
		/* this counts as a duplicate ack, even though we might have
		   received an ack for this packet previously (in another EACK
		   message for instance) */
		bit_set = bits >= 0 && mask[bits>>3] & (1 << (bits & 7));

		/* if this packet is acked, it counts towards the duplicate ack counter */
		if (bit_set)
			count++;

		/* ignore bits that represent packets we haven't sent yet
		   or packets that have already been acked */
		pkt = (struct op *)scb_get(&us->outbuf, v);
		if (!pkt || pkt->transmissions == 0) {
			LOG_UTPV("0x%08x: skipping %u. pkt:%08x transmissions:%u %s",
				 us, v, pkt, pkt ? pkt->transmissions : 0, pkt ? "(not sent yet?)" : "(already acked?)");
			continue;
		}

		/* Count the number of segments that were successfully received past it. */
		if (bit_set) {
			/* the selective ack should never ACK the packet we're waiting for to decrement cur_window_packets */
			assert((v & us->outbuf.mask) != ((us->seq_nr - us->cur_window_packets) & us->outbuf.mask));
			us_ack_packet(us, v);
			continue;
		}
		/* Resend segments
		   if count is less than our re-send limit, we haven't seen enough
		   acked packets in front of this one to warrant a re-send.
		   if count == 0, we're still going through the tail of zeroes */
		if (((v - us->fast_resend_seq_nr) & ACK_NR_MASK) <= OUTGOING_BUFFER_MAX_SIZE &&
		    count >= DUPLICATE_ACKS_BEFORE_RESEND &&
		    us->duplicate_ack < DUPLICATE_ACKS_BEFORE_RESEND) {
			/* resends is a stack, and we're mostly interested in the top of it
			   if we're full, just throw away the lower half */
			if (nr >= MAX_EACK - 2) {
				memmove(resends, &resends[MAX_EACK/2], MAX_EACK/2 * sizeof(resends[0]));
				nr -= MAX_EACK / 2;
			}
			resends[nr++] = v;
			LOG_UTPV("0x%08x: no ack for %u", us, v);
		} else {
			LOG_UTPV("0x%08x: not resending %u count:%d dup_ack:%u fast_resend_seq_nr:%u",
				 us, v, count, us->duplicate_ack, us->fast_resend_seq_nr);
		}
	} while (--bits >= -1);
	if (((base - 1 - us->fast_resend_seq_nr) & ACK_NR_MASK) <= OUTGOING_BUFFER_MAX_SIZE &&
	    count >= DUPLICATE_ACKS_BEFORE_RESEND) {
		/* if we get enough duplicate acks to start
		   resending, the first packet we should resend
		   is base - 1 */
		resends[nr++] = (base - 1) & ACK_NR_MASK;
	} else {
		LOG_UTPV("0x%08x: not resending %u count:%d dup_ack:%u fast_resend_seq_nr:%u",
			 us, base - 1, count, us->duplicate_ack, us->fast_resend_seq_nr);
	}
	while (nr > 0) {
		uint v;
		struct op *pkt;

		v = resends[--nr];
		/* don't consider the tail of 0:es to be lost packets
		   only unacked packets with acked packets after should
		   be considered lost */
		pkt = (struct op *)scb_get(&us->outbuf, v);

		/* this may be an old (re-ordered) packet, and some of the
		   packets in here may have been acked already. In which
		   case they will not be in the send queue anymore */
		if (!pkt)
			continue;

		/* used in parse_log.py */
		LOG_UTP("0x%08x: Packet %u lost. Resending", us, v);

		/* On Loss */
		back_off = true;
		++us->_stats._rexmit;
		us_send_packet(us, pkt);
		us->fast_resend_seq_nr = v + 1;

		/* Re-send max 4 packets. */
		if (++i >= 4)
			break;
	}

	if (back_off)
		us_maybe_decay_win(us);

	us->duplicate_ack = count;
}
static void us_apply_ledbat_ccontrol(struct UTPSocket *us, size_t bytes_acked, uint32 actual_delay, int64 min_rtt)
{
	SOCKADDR_STORAGE sa;
	int64 target;
	double off_target;
	double window_factor;
	double delay_factor;
	double scaled_gain;
	uint32 our_delay;

	/* the delay can never be greater than the rtt. The min_rtt
	   variable is the RTT in microseconds */
	assert(min_rtt >= 0);
	our_delay = uint32_min(dh_get_value(&us->our_hist), (uint32)min_rtt);
	assert(our_delay != INT_MAX);
	assert(our_delay >= 0);

	sa = psa_get_sockaddr_storage(&us->addr, NULL);
	UTP_DelaySample((struct sockaddr *)&sa, our_delay / 1000);
	/* This tests the connection under heavy load from foreground
	   traffic. Pretend that our delays are very high to force the
	   connection to use sub-packet size window sizes */
	/* our_delay *= 4; */

	/* target is microseconds */
	target = CCONTROL_TARGET;
	if (target <= 0)
		target = 100000;

	off_target = target - our_delay;
	/* this is the same as:

	   (min(off_target, target) / target) * (bytes_acked / max_window) * MAX_CWND_INCREASE_BYTES_PER_RTT

	   so, it's scaling the max increase by the fraction of the window this ack represents, and the fraction
	   of the target delay the current delay represents.
	   The min() around off_target protects against crazy values of our_delay, which may happen when the
	   timestamp wraps, or by just having a malicious peer sending garbage. This caps the increase
	   of the window size to MAX_CWND_INCREASE_BYTES_PER_RTT per rtt.
	   as for large negative numbers, this direction is already capped at the min packet size further down
	   the min around the bytes_acked protects against the case where the window size was recently
	   shrunk and the number of acked bytes exceeds that. This is considered no more than one full
	   window, in order to keep the gain within sane boundaries. */

	assert(bytes_acked > 0);
	window_factor = (double)size_t_min(bytes_acked, us->max_window) / (double)size_t_max(us->max_window, bytes_acked);
	delay_factor = off_target / target;
	scaled_gain = MAX_CWND_INCREASE_BYTES_PER_RTT * window_factor * delay_factor;
	/* since MAX_CWND_INCREASE_BYTES_PER_RTT is a cap on how much the window size (max_window)
	   may increase per RTT, we may not increase the window size more than that proportional
	   to the number of bytes that were acked, so that once one window has been acked (one rtt)
	   the increase limit is not exceeded
	   the +1. is to allow for floating point imprecision */
	assert(scaled_gain <= 1. + MAX_CWND_INCREASE_BYTES_PER_RTT * (int)size_t_min(bytes_acked, us->max_window) / (double)size_t_max(us->max_window, bytes_acked));
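	/* Worked example (illustrative): target = 100000 us and our_delay =
	   50000 us give off_target = 50000 and delay_factor = 0.5; an ack
	   covering a quarter of max_window gives window_factor = 0.25, so
	   scaled_gain = 3000 * 0.25 * 0.5 = 375 bytes of growth for this ack. */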
	if (scaled_gain > 0 && g_current_ms - us->last_maxed_out_window > 300) {
		/* if it was more than 300 milliseconds since we tried to send a packet
		   and stopped because we hit the max window, we're most likely rate
		   limited (which prevents us from ever hitting the window size)
		   if this is the case, we cannot let the max_window grow indefinitely */
		scaled_gain = 0;
	}

	if (scaled_gain + us->max_window < MIN_WINDOW_SIZE)
		us->max_window = MIN_WINDOW_SIZE;
	else
		us->max_window = (size_t)(us->max_window + scaled_gain);

	/* make sure that the congestion window is below max
	   make sure that we don't shrink our window too small */
	us->max_window = size_t_clamp(us->max_window, MIN_WINDOW_SIZE, us->opt_sndbuf);
	/* used in parse_log.py */
	LOG_UTP("0x%08x: actual_delay:%u our_delay:%d their_delay:%u off_target:%d max_window:%u "
		"delay_base:%u delay_sum:%d target_delay:%d acked_bytes:%u cur_window:%u "
		"scaled_gain:%f rtt:%u rate:%u quota:%d wnduser:%u rto:%u timeout:%d get_microseconds:"I64u
		" cur_window_packets:%u packet_size:%u their_delay_base:%u their_actual_delay:%u",
		us, actual_delay, our_delay / 1000, dh_get_value(&us->their_hist) / 1000,
		(int)off_target / 1000, (uint)(us->max_window), us->our_hist.delay_base,
		(our_delay + dh_get_value(&us->their_hist)) / 1000, target / 1000, (uint)bytes_acked,
		(uint)(us->cur_window - bytes_acked), (float)(scaled_gain), us->rtt,
		(uint)(us->max_window * 1000 / (us->rtt_hist.delay_base ? us->rtt_hist.delay_base : 50)),
		us->send_quota / 100, (uint)us->max_window_user, us->rto, (int)(us->rto_timeout - g_current_ms),
		UTP_GetMicroseconds(), us->cur_window_packets, (uint)us_get_packet_size(us),
		us->their_hist.delay_base, us->their_hist.delay_base + dh_get_value(&us->their_hist));
}
static void UTP_RegisterRecvPacket(struct UTPSocket *conn, size_t len)
{
#ifdef _DEBUG
	++conn->_stats._nrecv;
	conn->_stats._nbytes_recv += len;
#endif

	if (len <= PACKET_SIZE_MID) {
		if (len <= PACKET_SIZE_EMPTY)
			_global_stats._nraw_recv[PACKET_SIZE_EMPTY_BUCKET]++;
		else if (len <= PACKET_SIZE_SMALL)
			_global_stats._nraw_recv[PACKET_SIZE_SMALL_BUCKET]++;
		else
			_global_stats._nraw_recv[PACKET_SIZE_MID_BUCKET]++;
	} else {
		if (len <= PACKET_SIZE_BIG)
			_global_stats._nraw_recv[PACKET_SIZE_BIG_BUCKET]++;
		else
			_global_stats._nraw_recv[PACKET_SIZE_HUGE_BUCKET]++;
	}
}
/* Process an incoming packet
   syn is true if this is the first packet received. It will cut off parsing
   as soon as the header is done */
/* XXX: syn defaulted to false in the C++ original */
static size_t UTP_ProcessIncoming(struct UTPSocket *conn, byte *packet, size_t len, bool syn)
{
	struct pf *pf;
	struct pf1 *pf1;
	byte *packet_end;
	byte *data;
	byte *selack_ptr = NULL;
	uint16 pk_seq_nr;
	uint16 pk_ack_nr;
	byte pk_flags;
	byte extension;
	uint16 seqnr;
	uint acks;
	size_t acked_bytes;
	uint64 time;
	uint64 p;
	uint32 their_delay;
	int64 min_rtt;
	int i;
	uint32 prev_delay_base;
	uint32 actual_delay;
	UTP_RegisterRecvPacket(conn, len);

	g_current_ms = UTP_GetMilliseconds();

	us_update_send_quota(conn);

	pf = (struct pf *)packet;
	pf1 = (struct pf1 *)packet;
	packet_end = packet + len;

	if (conn->version == 0) {
		pk_seq_nr = ntohs(pf->seq_nr);
		pk_ack_nr = ntohs(pf->ack_nr);
		pk_flags = pf->flags;
	} else {
		pk_seq_nr = ntohs(pf1->seq_nr);
		pk_ack_nr = ntohs(pf1->ack_nr);
		pk_flags = pf1_type(pf1);
	}

	if (pk_flags >= ST_NUM_STATES)
		return 0;

	LOG_UTPV("0x%08x: Got %s. seq_nr:%u ack_nr:%u state:%s version:%u timestamp:"I64u" reply_micro:%u",
		 conn, flagnames[pk_flags], pk_seq_nr, pk_ack_nr, statenames[conn->state], conn->version,
		 conn->version == 0 ? (uint64)(ntohl(pf->tv_sec)) * 1000000 + ntohl(pf->tv_usec) : (uint64)(ntohl(pf1->tv_usec)),
		 conn->version == 0 ? (uint32)(ntohl(pf->reply_micro)) : (uint32)(ntohl(pf1->reply_micro)));
	/* mark receipt time */
	time = UTP_GetMicroseconds();

	/* RSTs are handled earlier, since the connid matches the send id not the recv id */
	assert(pk_flags != ST_RESET);

	/* TODO: maybe send a ST_RESET if we're in CS_RESET? */
	/* Unpack UTP packet options
	   Data pointer */
	data = (byte *)pf + us_get_header_size(conn);
	if (us_get_header_size(conn) > len) {
		LOG_UTPV("0x%08x: Invalid packet size (less than header size)", conn);
		return 0;
	}
	/* Skip the extension headers */
	extension = conn->version == 0 ? pf->ext : pf1->ext;
	if (extension != 0) {
		do {
			/* Verify that the packet is valid. */
			data += 2;

			if ((int)(packet_end - data) < 0 || (int)(packet_end - data) < data[-1]) {
				LOG_UTPV("0x%08x: Invalid len of extensions", conn);
				return 0;
			}

			switch (extension) {
			case 1: /* Selective Acknowledgment */
				selack_ptr = data;
				break;
			case 2: /* extension bits */
				if (data[-1] != 8) {
					LOG_UTPV("0x%08x: Invalid len of extension bits header", conn);
					return 0;
				}
				memcpy(conn->extensions, data, 8);
				LOG_UTPV("0x%08x: got extension bits:%02x%02x%02x%02x%02x%02x%02x%02x", conn,
					 conn->extensions[0], conn->extensions[1], conn->extensions[2], conn->extensions[3],
					 conn->extensions[4], conn->extensions[5], conn->extensions[6], conn->extensions[7]);
			}
			extension = data[-2];
			data += data[-1];
		} while (extension);
	}
	if (conn->state == CS_SYN_SENT) {
		/* if this is a syn-ack, initialize our ack_nr
		   to match the sequence number we got from
		   the other end */
		conn->ack_nr = (pk_seq_nr - 1) & SEQ_NR_MASK;
	}

	g_current_ms = UTP_GetMilliseconds();
	conn->last_got_packet = g_current_ms;

	if (syn)
		return 0;
	/* seqnr is the number of packets past the expected
	   packet this is. ack_nr is the last acked, seq_nr is the
	   current. Subtracting 1 makes 0 mean "this is the next
	   expected packet". */
	seqnr = (pk_seq_nr - conn->ack_nr - 1) & SEQ_NR_MASK;

	/* Getting an invalid sequence number? */
	if (seqnr >= REORDER_BUFFER_MAX_SIZE) {
		if (seqnr >= (SEQ_NR_MASK + 1) - REORDER_BUFFER_MAX_SIZE && pk_flags != ST_STATE)
			conn->ack_time = g_current_ms + uint_min(conn->ack_time - g_current_ms, DELAYED_ACK_TIME_THRESHOLD);
		LOG_UTPV("    Got old Packet/Ack (%u/%u)=%u!", pk_seq_nr, conn->ack_nr, seqnr);
		return 0;
	}
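	/* Worked example of the relative-seqnr arithmetic above (illustrative
	   numbers, not from the spec): suppose ack_nr is 0xFFFE and the packet
	   carries pk_seq_nr 0x0001. Then
	       (0x0001 - 0xFFFE - 1) & SEQ_NR_MASK = 0x0002,
	   i.e. the packet is two past the next expected one, even though the
	   16-bit counter wrapped. A stale packet such as pk_seq_nr 0xFFF0 gives
	   (0xFFF0 - 0xFFFE - 1) & 0xFFFF = 0xFFF1, which lands in the
	   "seqnr >= REORDER_BUFFER_MAX_SIZE" branch above and is dropped as old. */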
	/* Process acknowledgment
	   acks is the number of packets that were acked */
	acks = (pk_ack_nr - (conn->seq_nr - 1 - conn->cur_window_packets)) & ACK_NR_MASK;

	/* this happens when we receive an old ack nr */
	if (acks > conn->cur_window_packets)
		acks = 0;

	/* if we get the same ack_nr as in the last packet
	   increase the duplicate_ack counter, otherwise reset
	   it to 0 */
	if (conn->cur_window_packets > 0) {
		if (pk_ack_nr == ((conn->seq_nr - conn->cur_window_packets - 1) & ACK_NR_MASK) &&
			conn->cur_window_packets > 0) {
			/*++conn->duplicate_ack; */
		} else {
			conn->duplicate_ack = 0;
		}

		/* TODO: if duplicate_ack == DUPLICATE_ACKS_BEFORE_RESEND
		   and fast_resend_seq_nr <= ack_nr + 1
		   resend ack_nr + 1 */
	}
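	/* Worked example for the acks computation above (illustrative numbers):
	   with seq_nr 0x0005 and cur_window_packets 3, the oldest un-acked
	   packet is (0x0005 - 1 - 3) & ACK_NR_MASK = 0x0001. An incoming
	   pk_ack_nr of 0x0003 gives acks = (0x0003 - 0x0001) & 0xFFFF = 2,
	   i.e. the packets with seq 0x0002 and 0x0003 are now acked. A stale
	   pk_ack_nr of 0x0000 gives acks = 0xFFFF, which exceeds
	   cur_window_packets and is therefore clamped to 0 above. */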
	/* figure out how many bytes were acked */

	/* the minimum rtt of all acks
	   this is the upper limit on the delay we get back
	   from the other peer. Our delay cannot exceed
	   the rtt of the packet. If it does, clamp it.
	   this is done in apply_ledbat_ccontrol() */
	min_rtt = INT64_MAX;

	for (i = 0; i < acks; ++i) {
		seq = conn->seq_nr - conn->cur_window_packets + i;
		pkt = (struct op *)scb_get(&conn->outbuf, seq);
		if (pkt == 0 || pkt->transmissions == 0)
			continue;
		assert((int)(pkt->payload) >= 0);
		acked_bytes += pkt->payload;
		min_rtt = int64_min(min_rtt, UTP_GetMicroseconds() - pkt->time_sent);
	}

	/* count bytes acked by EACK */
	if (selack_ptr != NULL)
		acked_bytes += us_selective_ack_bytes(conn, (pk_ack_nr + 2) & ACK_NR_MASK,
			selack_ptr, selack_ptr[-1], &min_rtt);
	LOG_UTPV("0x%08x: acks:%d acked_bytes:%u seq_nr:%d cur_window:%u cur_window_packets:%u relative_seqnr:%u max_window:%u min_rtt:%u rtt:%u",
		conn, acks, (uint)acked_bytes, conn->seq_nr, (uint)conn->cur_window,
		conn->cur_window_packets, seqnr, (uint)conn->max_window,
		(uint)(min_rtt / 1000), conn->rtt);
	if (conn->version == 0)
		p = (uint64)ntohl(pf->tv_sec) * 1000000 + ntohl(pf->tv_usec);
	else
		p = ntohl(pf1->tv_usec);

	conn->last_measured_delay = g_current_ms;
	/* get delay in both directions
	   record the delay to report back */
	their_delay = (uint32)(p == 0 ? 0 : time - p);
	conn->reply_micro = their_delay;
	prev_delay_base = conn->their_hist.delay_base;
	if (their_delay != 0)
		dh_add_sample(&conn->their_hist, their_delay);

	/* if their new delay base is less than their previous one
	   we should shift our delay base in the other direction in order
	   to take the clock skew into account */
	if (prev_delay_base != 0 &&
		wrapping_compare_less(conn->their_hist.delay_base, prev_delay_base)) {
		/* never adjust more than 10 milliseconds */
		if (prev_delay_base - conn->their_hist.delay_base <= 10000)
			dh_shift(&conn->our_hist, prev_delay_base - conn->their_hist.delay_base);
	}
	actual_delay = conn->version == 0 ?
		(ntohl(pf->reply_micro) == INT_MAX ? 0 : (uint32)(ntohl(pf->reply_micro))) :
		((uint32)(ntohl(pf1->reply_micro)) == INT_MAX ? 0 : (uint32)(ntohl(pf1->reply_micro)));
	/* if the actual delay is 0, it means the other end
	   hasn't received a sample from us yet, and doesn't
	   know what it is. We can't update our history unless
	   we have a true measured sample */
	prev_delay_base = conn->our_hist.delay_base;
	if (actual_delay != 0)
		dh_add_sample(&conn->our_hist, actual_delay);

	/* if our new delay base is less than our previous one
	   we should shift the other end's delay base in the other
	   direction in order to take the clock skew into account.
	   This is commented out because it creates bad interactions
	   with our adjustment in the other direction. We don't really
	   need our estimates of the other peer to be very accurate
	   anyway. The problem with shifting here is that we're more
	   likely to shift it back later because of a low latency. This
	   second shift back would cause us to shift our delay base
	   which then gets into a death spiral of shifting delay bases */
	/* if (prev_delay_base != 0 &&
		wrapping_compare_less(conn->our_hist.delay_base, prev_delay_base)) {
		// never adjust more than 10 milliseconds
		if (prev_delay_base - conn->our_hist.delay_base <= 10000) {
			dh_shift(&conn->their_hist, prev_delay_base - conn->our_hist.delay_base);
		}
	} */
	/* if the delay estimate exceeds the RTT, adjust the
	   base_delay to compensate */
	if (dh_get_value(&conn->our_hist) > (uint32)(min_rtt)) {
		dh_shift(&conn->our_hist, dh_get_value(&conn->our_hist) - min_rtt);
	}

	/* only apply the congestion controller on acks
	   if we don't have a delay measurement, there's
	   no point in invoking the congestion control */
	if (actual_delay != 0 && acked_bytes >= 1)
		us_apply_ledbat_ccontrol(conn, acked_bytes, actual_delay, min_rtt);
	/* sanity check, the other end should never ack packets
	   past the point we've sent */
	if (acks <= conn->cur_window_packets) {
		conn->max_window_user = conn->version == 0 ?
			pf->windowsize * PACKET_SIZE : ntohl(pf1->windowsize);

		/* If max user window is set to 0, then we start up a timer
		   that will reset it to 1 after 15 seconds. */
		if (conn->max_window_user == 0)
			/* Reset max_window_user to 1 every 15 seconds. */
			conn->zerowindow_time = g_current_ms + 15000;

		/* Respond to connect message.
		   Switch to CONNECTED state. */
		if (conn->state == CS_SYN_SENT) {
			conn->state = CS_CONNECTED;
			conn->func.on_state(conn->userdata, UTP_STATE_CONNECT);

		/* We've sent a fin, and everything was ACKed (including the FIN);
		   it's safe to destroy the socket. cur_window_packets == acks
		   means that this packet acked all the remaining packets that
		   were in flight. */
		} else if (conn->state == CS_FIN_SENT && conn->cur_window_packets == acks) {
			conn->state = CS_DESTROY;
		}
		/* Update fast resend counter */
		if (wrapping_compare_less(conn->fast_resend_seq_nr, (pk_ack_nr + 1) & ACK_NR_MASK))
			conn->fast_resend_seq_nr = pk_ack_nr + 1;

		LOG_UTPV("0x%08x: fast_resend_seq_nr:%u", conn, conn->fast_resend_seq_nr);

		for (i = 0; i < acks; ++i) {
			ack_status = us_ack_packet(conn, conn->seq_nr - conn->cur_window_packets);
			/* if ack_status is 0, the packet was acked.
			   if ack_status is 1, it means that the packet had already been acked.
			   if it's 2, the packet has not been sent yet.
			   We need to break this loop in the latter case. This could potentially
			   happen if we get an ack_nr that does not exceed what we have stuffed
			   into the outgoing buffer, but does exceed what we have sent */
			if (ack_status == 2) {
				pkt = (struct op *)scb_get(&conn->outbuf, conn->seq_nr - conn->cur_window_packets);
				assert(pkt->transmissions == 0);
				break;
			}
			conn->cur_window_packets--;
		}

		if (conn->cur_window_packets == 0)
			assert(conn->cur_window == 0);
		/* packets in front of this may have been acked by a
		   selective ack (EACK). Keep decreasing the window packet size
		   until we hit a packet that is still waiting to be acked
		   in the send queue.
		   this is especially likely to happen when the other end
		   has the EACK send bug older versions of uTP had */
		while (conn->cur_window_packets > 0 &&
			!scb_get(&conn->outbuf, conn->seq_nr - conn->cur_window_packets))
			conn->cur_window_packets--;

		if (conn->cur_window_packets == 0)
			assert(conn->cur_window == 0);

		/* this invariant should always be true */
		assert(conn->cur_window_packets == 0 ||
			scb_get(&conn->outbuf, conn->seq_nr - conn->cur_window_packets));
		/* flush the last remaining packet if it has never been
		   transmitted (Nagle-style hold-back) */
		if (conn->cur_window_packets == 1) {
			pkt = (struct op *)scb_get(&conn->outbuf, conn->seq_nr - 1);
			/* do we still have quota? */
			if (pkt->transmissions == 0 &&
				(!(USE_PACKET_PACING) || conn->send_quota / 100 >= (int32)(pkt->length))) {
				us_send_packet(conn, pkt);

				/* No need to send another ack if there is nothing to reorder. */
				if (conn->reorder_count == 0)
					us_sent_ack(conn);
			}
		}
		/* Fast timeout-retry */
		if (conn->fast_timeout) {
			LOG_UTPV("Fast timeout %u,%u,%u?", (uint)conn->cur_window,
				conn->seq_nr - conn->timeout_seq_nr, conn->timeout_seq_nr);
			/* if fast_resend_seq_nr is not pointing to the oldest outstanding packet, it suggests that we've already
			   resent the packet that timed out, and we should leave the fast-timeout mode. */
			if (((conn->seq_nr - conn->cur_window_packets) & ACK_NR_MASK) != conn->fast_resend_seq_nr) {
				conn->fast_timeout = false;
			} else {
				/* resend the oldest packet and increment fast_resend_seq_nr
				   to not allow another fast resend on it again */
				pkt = (struct op *)scb_get(&conn->outbuf, conn->seq_nr - conn->cur_window_packets);
				if (pkt && pkt->transmissions > 0) {
					LOG_UTPV("0x%08x: Packet %u fast timeout-retry.", conn,
						conn->seq_nr - conn->cur_window_packets);
					++conn->_stats._fastrexmit;
					conn->fast_resend_seq_nr++;
					us_send_packet(conn, pkt);
				}
			}
		}
	}
	/* Process selective acknowledgment */
	if (selack_ptr != NULL)
		us_selective_ack(conn, pk_ack_nr + 2, selack_ptr, selack_ptr[-1]);

	/* this invariant should always be true */
	assert(conn->cur_window_packets == 0 ||
		scb_get(&conn->outbuf, conn->seq_nr - conn->cur_window_packets));

	LOG_UTPV("0x%08x: acks:%d acked_bytes:%u seq_nr:%u cur_window:%u cur_window_packets:%u quota:%d",
		conn, acks, (uint)acked_bytes, conn->seq_nr, (uint)conn->cur_window,
		conn->cur_window_packets, conn->send_quota / 100);
	/* In case the ack dropped the current window below
	   the max_window size, mark the socket as writable */
	if (conn->state == CS_CONNECTED_FULL && us_is_writable(conn, us_get_packet_size(conn))) {
		conn->state = CS_CONNECTED;
		LOG_UTPV("0x%08x: Socket writable. max_window:%u cur_window:%u quota:%d packet_size:%u",
			conn, (uint)conn->max_window, (uint)conn->cur_window,
			conn->send_quota / 100, (uint)us_get_packet_size(conn));
		conn->func.on_state(conn->userdata, UTP_STATE_WRITABLE);
	}
	if (pk_flags == ST_STATE) {
		/* This is a state packet only. */
		return 0;
	}

	/* The connection is not in a state that can accept data? */
	if (conn->state != CS_CONNECTED &&
		conn->state != CS_CONNECTED_FULL &&
		conn->state != CS_FIN_SENT)
		return 0;
	/* Is this a finalize packet? */
	if (pk_flags == ST_FIN && !conn->got_fin) {
		LOG_UTPV("Got FIN eof_pkt:%u", pk_seq_nr);
		conn->got_fin = true;
		conn->eof_pkt = pk_seq_nr;
		/* at this point, it is possible for the
		   other end to have sent packets with
		   sequence numbers higher than seq_nr.
		   if this is the case, our reorder_count
		   is out of sync. This case is dealt with
		   when we re-order and hit the eof_pkt.
		   we'll just ignore any packets with
		   sequence numbers past this */
	}
	/* Getting an in-order packet? */
	if (seqnr == 0) {
		count = packet_end - data;
		if (count > 0 && conn->state != CS_FIN_SENT) {
			LOG_UTPV("0x%08x: Got Data len:%u (rb:%u)", conn, (uint)count,
				(uint)conn->func.get_rb_size(conn->userdata));
			/* Post bytes to the upper layer */
			conn->func.on_read(conn->userdata, data, count);
		}
		conn->ack_nr++;
		conn->bytes_since_ack += count;
		/* Check if the next packet has been received too, but is waiting
		   in the reorder buffer. */
		for (;;) {
			if (conn->got_fin && conn->eof_pkt == conn->ack_nr) {
				if (conn->state != CS_FIN_SENT) {
					conn->state = CS_GOT_FIN;
					conn->rto_timeout = g_current_ms + uint_min(conn->rto * 3, 60);

					LOG_UTPV("0x%08x: Posting EOF", conn);
					conn->func.on_state(conn->userdata, UTP_STATE_EOF);
				}

				/* if the other end wants to close, ack immediately */
				us_send_ack(conn, false);

				/* reorder_count is not necessarily 0 at this point.
				   even though it is most of the time, the other end
				   may have sent packets with higher sequence numbers
				   than what later ends up being eof_pkt.
				   since we have received all packets up to eof_pkt,
				   just ignore the ones after it. */
				conn->reorder_count = 0;
			}
			/* Quick get-out in case there is nothing to reorder */
			if (conn->reorder_count == 0)
				break;

			/* Check if there are additional buffers in the reorder buffer
			   that need delivery. */
			buf = (byte *)scb_get(&conn->inbuf, conn->ack_nr + 1);
			if (buf == NULL)
				break;
			scb_put(&conn->inbuf, conn->ack_nr + 1, NULL);
			count = *(uint *)buf;
			if (count > 0 && conn->state != CS_FIN_SENT) {
				/* Pass the bytes to the upper layer */
				conn->func.on_read(conn->userdata, buf + sizeof(uint), count);
			}
			conn->ack_nr++;
			conn->bytes_since_ack += count;

			/* Free the element from the reorder buffer */
			free(buf);
			assert(conn->reorder_count > 0);
			conn->reorder_count--;
		}
		/* start the delayed ACK timer */
		conn->ack_time = g_current_ms + uint_min(conn->ack_time - g_current_ms, DELAYED_ACK_TIME_THRESHOLD);
	} else {
		/* Getting an out of order packet.
		   The packet needs to be remembered and rearranged later. */

		/* if we have received a FIN packet, and the EOF-sequence number
		   is lower than the sequence number of the packet we just received,
		   something is wrong. */
		if (conn->got_fin && pk_seq_nr > conn->eof_pkt) {
			LOG_UTPV("0x%08x: Got an invalid packet sequence number, past EOF "
				"reorder_count:%u len:%u (rb:%u)", conn, conn->reorder_count,
				(uint)(packet_end - data), (uint)conn->func.get_rb_size(conn->userdata));
			return 0;
		}
		/* if the sequence number is entirely off the expected
		   one, just drop it. We can't allocate buffer space in
		   the inbuf entirely based on untrusted input */
		if (seqnr > 0x3ff) {
			LOG_UTPV("0x%08x: Got an invalid packet sequence number, too far off "
				"reorder_count:%u len:%u (rb:%u)", conn, conn->reorder_count,
				(uint)(packet_end - data), (uint)conn->func.get_rb_size(conn->userdata));
			return 0;
		}

		/* we need to grow the circle buffer before we
		   check if the packet is already in here, so that
		   we don't end up looking at an older packet (since
		   the indices wrap around). */
		scb_ensure_size(&conn->inbuf, pk_seq_nr + 1, seqnr + 1);

		/* Has this packet already been received? (i.e. a duplicate)
		   If that is the case, just discard it. */
		if (scb_get(&conn->inbuf, pk_seq_nr) != NULL) {
			++conn->_stats._nduprecv;
			return 0;
		}
		/* Allocate memory to fit the packet that needs to be reordered */
		mem = malloc((packet_end - data) + sizeof(uint));
		*(uint *)mem = (uint)(packet_end - data);
		memcpy(mem + sizeof(uint), data, packet_end - data);

		/* Insert into reorder buffer and increment the count
		   of # of packets to be reordered.
		   we add one to seqnr in order to leave the last
		   entry empty, that way the assert in send_ack
		   is valid. we have to add one to seqnr too, in order
		   to make the circular buffer grow around the correct
		   point (which is conn->ack_nr + 1). */
		assert(scb_get(&conn->inbuf, pk_seq_nr) == NULL);
		assert((pk_seq_nr & conn->inbuf.mask) != ((conn->ack_nr + 1) & conn->inbuf.mask));
		scb_put(&conn->inbuf, pk_seq_nr, mem);
		conn->reorder_count++;

		LOG_UTPV("0x%08x: Got out of order data reorder_count:%u len:%u (rb:%u)",
			conn, conn->reorder_count, (uint)(packet_end - data),
			(uint)conn->func.get_rb_size(conn->userdata));

		/* Setup so the partial ACK message will get sent immediately. */
		conn->ack_time = g_current_ms + uint_min(conn->ack_time - g_current_ms, 1);
	}
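	/* Worked example of the circular-buffer sizing above (illustrative
	   numbers): with ack_nr 20 and pk_seq_nr 37, seqnr is 16. On a 16-slot
	   buffer (mask 15) the packet would collide with the drain point,
	   because (37 & 15) == ((20 + 1) & 15) == 5. scb_ensure_size() is
	   therefore asked for seqnr + 1 = 17 slots and grows the buffer to 32
	   (mask 31), after which (37 & 31) = 5 and ((20 + 1) & 31) = 21, so the
	   assert above holds and the last entry stays free for send_ack. */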
	/* If ack_time or ack_bytes indicate that we need to send an ack, send one
	   here instead of waiting for the timer to trigger */
	LOG_UTPV("bytes_since_ack:%u ack_time:%d",
		(uint)conn->bytes_since_ack, (int)(g_current_ms - conn->ack_time));
	if (conn->state == CS_CONNECTED || conn->state == CS_CONNECTED_FULL) {
		if (conn->bytes_since_ack > DELAYED_ACK_BYTE_THRESHOLD ||
			(int)(g_current_ms - conn->ack_time) >= 0)
			us_send_ack(conn, false);
	}
	return (size_t)(packet_end - data);
}
static __inline__ bool UTP_IsV1(struct pf1 *pf1)
{
	return pf1_version(pf1) == 1 && pf1_type(pf1) < ST_NUM_STATES && pf1->ext < 3;
}
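/* Usage sketch (not compiled in): how a dispatcher might use UTP_IsV1 to
   tell a version 1 header from a legacy one before overlaying a struct on a
   received datagram. example_is_v1 and its arguments are hypothetical. */
#if 0
static bool example_is_v1(byte *buffer, size_t len)
{
	struct pf1 *hdr = (struct pf1 *)buffer;

	/* A v1 header must parse as version 1, carry a known packet type,
	   and use a known extension id; anything else is treated as legacy. */
	return len >= sizeof(struct pf1) && UTP_IsV1(hdr);
}
#endif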
static void UTP_Free(struct UTPSocket *conn)
{
	struct UTPSocket **last_ptr;
	struct UTPSocket *last;
	size_t i;

	LOG_UTPV("0x%08x: Killing socket", conn);

	conn->func.on_state(conn->userdata, UTP_STATE_DESTROYING);
	UTP_SetCallbacks(conn, NULL, NULL);

	assert(conn->idx < ar_GetCount(&g_utp_sockets));
	assert({last_ptr = ar_get(&g_utp_sockets, conn->idx); *last_ptr == conn;});

	/* Unlink object from the global list */
	assert(ar_GetCount(&g_utp_sockets) > 0);

	last_ptr = ar_get(&g_utp_sockets, ar_GetCount(&g_utp_sockets) - 1);
	last = *last_ptr;

	assert(last->idx < ar_GetCount(&g_utp_sockets));
	assert({last_ptr = ar_get(&g_utp_sockets, last->idx); *last_ptr == last;});

	last->idx = conn->idx;

	memcpy(ar_get(&g_utp_sockets, conn->idx), &last, sizeof(last));

	/* Decrease the count */
	ar_SetCount(&g_utp_sockets, ar_GetCount(&g_utp_sockets) - 1);

	/* Free all memory occupied by the socket object. */
	for (i = 0; i <= conn->inbuf.mask; i++) {
		free(conn->inbuf.elements[i]);
	}
	for (i = 0; i <= conn->outbuf.mask; i++) {
		free(conn->outbuf.elements[i]);
	}
	free(conn->inbuf.elements);
	free(conn->outbuf.elements);

	/* Finally free the socket object */
	free(conn);
}
/******************************************************************************/
/******************************************************************************/
/******************************************************************************/
/******************************************************************************/
/* public functions */
/******************************************************************************/
/******************************************************************************/
/******************************************************************************/
/******************************************************************************/
/* Create a UTP socket */
struct UTPSocket *UTP_Create(SendToProc *send_to_proc, void *send_to_userdata, const struct sockaddr *addr, socklen_t addrlen)
{
	struct UTPSocket *conn;

	conn = calloc(1, sizeof(*conn));

	g_current_ms = UTP_GetMilliseconds();

	UTP_SetCallbacks(conn, NULL, NULL);
	dh_clear(&conn->our_hist);
	dh_clear(&conn->their_hist);
	conn->rto = 3000;
	conn->rtt_var = 800;
	conn->seq_nr = 1;
	conn->ack_nr = 0;
	conn->max_window_user = 255 * PACKET_SIZE;
	psa_init(&conn->addr, (SOCKADDR_STORAGE *)addr, addrlen);
	conn->send_to_proc = send_to_proc;
	conn->send_to_userdata = send_to_userdata;
	conn->ack_time = g_current_ms + 0x70000000;
	conn->last_got_packet = g_current_ms;
	conn->last_sent_packet = g_current_ms;
	conn->last_measured_delay = g_current_ms + 0x70000000;
	conn->last_rwin_decay = (int32)(g_current_ms) - MAX_WINDOW_DECAY;
	conn->last_send_quota = g_current_ms;
	conn->send_quota = PACKET_SIZE * 100;
	conn->cur_window_packets = 0;
	conn->fast_resend_seq_nr = conn->seq_nr;

	/* default to version 1 */
	UTP_SetSockopt(conn, SO_UTPVERSION, 1);

	/* we need to fit one packet in the window
	   when we start the connection */
	conn->max_window = us_get_packet_size(conn);
	conn->state = CS_IDLE;

	conn->outbuf.mask = 15;
	conn->inbuf.mask = 15;

	conn->outbuf.elements = calloc(16, sizeof(void *));
	conn->inbuf.elements = calloc(16, sizeof(void *));

	conn->idx = ar_Append(&g_utp_sockets, &conn);

	LOG_UTPV("0x%08x: UTP_Create", conn);

	return conn;
}
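/* Usage sketch (not compiled in): creating a socket. The send_to callback
   and the UDP descriptor are hypothetical, and the SendToProc shape below is
   assumed from the usual libutp callback signature; the library only
   requires that the callback transmit the supplied datagram to 'to'. */
#if 0
static int example_udp_fd; /* hypothetical UDP descriptor */

static void example_send_to(void *userdata, const byte *p, size_t len,
	const struct sockaddr *to, socklen_t tolen)
{
	/* put the finished uTP datagram on the wire */
	sendto(*(int *)userdata, (const void *)p, len, 0, to, tolen);
}

static struct UTPSocket *example_open(const struct sockaddr *peer, socklen_t peerlen)
{
	struct UTPSocket *conn;

	conn = UTP_Create(&example_send_to, &example_udp_fd, peer, peerlen);
	/* conn starts in CS_IDLE; UTP_SetCallbacks() and UTP_Connect()
	   take it from there */
	return conn;
}
#endif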
void UTP_SetCallbacks(struct UTPSocket *conn, struct UTPFunctionTable *funcs, void *userdata)
{
	if (funcs == NULL)
		funcs = &zero_funcs;
	conn->func = *funcs;
	conn->userdata = userdata;
}
bool UTP_SetSockopt(struct UTPSocket *conn, int opt, int val)
{
	switch (opt) {
	case SO_SNDBUF:
		conn->opt_sndbuf = val;
		return true;
	case SO_RCVBUF:
		conn->opt_rcvbuf = val;
		return true;
	case SO_UTPVERSION:
		assert(conn->state == CS_IDLE);
		if (conn->state != CS_IDLE)
			/* too late to change the version */
			return false;
		if (conn->version == 1 && val == 0) {
			conn->reply_micro = INT_MAX;
			conn->opt_rcvbuf = 200 * 1024;
			conn->opt_sndbuf = OUTGOING_BUFFER_MAX_SIZE * PACKET_SIZE;
		} else if (conn->version == 0 && val == 1) {
			conn->reply_micro = 0;
			conn->opt_rcvbuf = 3 * 1024 * 1024 + 512 * 1024;
			conn->opt_sndbuf = conn->opt_rcvbuf;
		}
		conn->version = val;
		return true;
	}

	return false;
}
/* Try to connect to the host this socket was created for. */
void UTP_Connect(struct UTPSocket *conn)
{
	size_t header_ext_size;
	uint32 conn_seed;
	struct op *pkt;
	struct pfe *p;
	struct pfe1 *p1;

	assert(conn->state == CS_IDLE);
	assert(conn->cur_window_packets == 0);
	assert(scb_get(&conn->outbuf, conn->seq_nr) == NULL);
	assert(sizeof(struct pf1) == 20);

	conn->state = CS_SYN_SENT;

	g_current_ms = UTP_GetMilliseconds();

	/* Create and send a connect message */
	conn_seed = UTP_Random();

	/* we identify newer versions by setting the
	   first two bytes to 0x0001 */
	if (conn->version > 0)
		conn_seed &= 0xffff;

	/* used in parse_log.py */
	LOG_UTP("0x%08x: UTP_Connect conn_seed:%u packet_size:%u (B) "
		"target_delay:%u (ms) delay_history:%u "
		"delay_base_history:%u (minutes)", conn, conn_seed, PACKET_SIZE,
		CCONTROL_TARGET / 1000, CUR_DELAY_SIZE, DELAY_BASE_HISTORY);

	/* Setup initial timeout timer. */
	conn->retransmit_timeout = 3000;
	conn->rto_timeout = g_current_ms + conn->retransmit_timeout;
	conn->last_rcv_win = us_get_rcv_window(conn);

	conn->conn_seed = conn_seed;
	conn->conn_id_recv = conn_seed;
	conn->conn_id_send = conn_seed + 1;
	/* if you need compatibility with 1.8.1, use this. it increases attackability though.
	   conn->seq_nr = 1; */
	conn->seq_nr = UTP_Random();

	/* Create the connect packet. */
	header_ext_size = us_get_header_extensions_size(conn);

	pkt = malloc(sizeof(struct op) - 1 + header_ext_size);

	p = (struct pfe *)pkt->data;
	p1 = (struct pfe1 *)pkt->data;

	memset(p, 0, header_ext_size);
	/* SYN packets are special, and have the receive ID in the connid field,
	   instead of conn_id_send. */
	if (conn->version == 0) {
		p->pf.connid = htonl(conn->conn_id_recv);
		p->pf.ext = 2;
		p->pf.windowsize = (byte)DIV_ROUND_UP(conn->last_rcv_win, PACKET_SIZE);
		p->pf.seq_nr = htons(conn->seq_nr);
		p->pf.flags = ST_SYN;
		p->ext_next = 0;
		p->ext_len = 8;
		memset(p->extensions, 0, 8);
	} else {
		pf1_version_set(&p1->pf, 1);
		pf1_type_set(&p1->pf, ST_SYN);
		p1->pf.ext = 2;
		p1->pf.connid = htons((uint16)conn->conn_id_recv);
		p1->pf.windowsize = htonl((uint32)conn->last_rcv_win);
		p1->pf.seq_nr = htons(conn->seq_nr);
		p1->ext_next = 0;
		p1->ext_len = 8;
		memset(p1->extensions, 0, 8);
	}
	pkt->transmissions = 0;
	pkt->length = header_ext_size;
	pkt->payload = 0;

	/*LOG_UTPV("0x%08x: Sending connect %s [%u].",
		conn, addrfmt(&conn->addr, addrbuf), conn_seed); */

	/* Remember the message in the outgoing queue. */
	scb_ensure_size(&conn->outbuf, conn->seq_nr, conn->cur_window_packets);
	scb_put(&conn->outbuf, conn->seq_nr, pkt);
	conn->cur_window_packets++;

	us_send_packet(conn, pkt);
}
bool UTP_IsIncomingUTP(UTPGotIncomingConnection *incoming_proc,
	SendToProc *send_to_proc, void *send_to_userdata,
	const byte *buffer, size_t len, const struct sockaddr *to, socklen_t tolen)
{
	struct psa addr;
	char addrbuf[65];
	struct pf *p;
	struct pf1 *p1;
	struct pf *pf;
	struct pf1 *pf1;
	byte version;
	uint32 id;
	byte flags;
	uint16 seq_nr;
	size_t i;

	psa_init(&addr, (SOCKADDR_STORAGE *)to, tolen);

	if (len < sizeof(struct pf) && len < sizeof(struct pf1)) {
		LOG_UTPV("recv %s len:%u too small", addrfmt(&addr, addrbuf), (uint)len);
		return false;
	}

	p = (struct pf *)buffer;
	p1 = (struct pf1 *)buffer;

	version = UTP_IsV1(p1);
	id = (version == 0) ? ntohl(p->connid) : (uint32)(ntohs(p1->connid));

	if (version == 0 && len < sizeof(struct pf)) {
		LOG_UTPV("recv %s len:%u version:%u too small", addrfmt(&addr, addrbuf), (uint)len, version);
		return false;
	}

	if (version == 1 && len < sizeof(struct pf1)) {
		LOG_UTPV("recv %s len:%u version:%u too small", addrfmt(&addr, addrbuf), (uint)len, version);
		return false;
	}

	LOG_UTPV("recv %s len:%u id:%u", addrfmt(&addr, addrbuf), (uint)len, id);
	pf = (struct pf *)p;
	pf1 = (struct pf1 *)p;

	if (version == 0)
		LOG_UTPV("recv id:%u seq_nr:%u ack_nr:%u", id, (uint)ntohs(pf->seq_nr), (uint)ntohs(pf->ack_nr));
	else
		LOG_UTPV("recv id:%u seq_nr:%u ack_nr:%u", id, (uint)ntohs(pf1->seq_nr), (uint)ntohs(pf1->ack_nr));

	flags = version == 0 ? pf->flags : pf1_type(pf1);
	for (i = 0; i < ar_GetCount(&g_utp_sockets); i++) {
		struct UTPSocket **conn_ptr;
		struct UTPSocket *conn;

		conn_ptr = ar_get(&g_utp_sockets, i);
		conn = *conn_ptr;

		/*LOG_UTPV("Examining UTPSocket %s for %s and (seed:%u s:%u r:%u) for %u",
			addrfmt(&conn->addr, addrbuf), addrfmt(&addr, addrbuf2), conn->conn_seed, conn->conn_id_send, conn->conn_id_recv, id); */
		if (psa_is_not_equal(&conn->addr, &addr))
			continue;

		if (flags == ST_RESET && (conn->conn_id_send == id || conn->conn_id_recv == id)) {
			LOG_UTPV("0x%08x: recv RST for existing connection", conn);
			if (!conn->userdata || conn->state == CS_FIN_SENT)
				conn->state = CS_DESTROY;
			else
				conn->state = CS_RESET;
			if (conn->userdata) {
				int err;

				conn->func.on_overhead(conn->userdata, false,
					len + us_get_udp_overhead(conn), close_overhead);
				err = conn->state == CS_SYN_SENT ? ECONNREFUSED : ECONNRESET;
				conn->func.on_error(conn->userdata, err);
			}
			return true;
		} else if (flags != ST_SYN && conn->conn_id_recv == id) {
			size_t read;

			LOG_UTPV("0x%08x: recv processing", conn);
			read = UTP_ProcessIncoming(conn, (byte *)buffer, len, false);
			if (conn->userdata)
				conn->func.on_overhead(conn->userdata, false,
					(len - read) + us_get_udp_overhead(conn), header_overhead);
			return true;
		}
	}
	if (flags == ST_RESET) {
		LOG_UTPV("recv RST for unknown connection");
		return true;
	}

	seq_nr = version == 0 ? ntohs(pf->seq_nr) : ntohs(pf1->seq_nr);
	if (flags != ST_SYN) {
		struct RST_Info *r;

		for (i = 0; i < ar_GetCount(&g_rst_info); i++) {
			struct RST_Info *cur;

			cur = ar_get(&g_rst_info, i);

			if (cur->connid != id)
				continue;
			if (psa_is_not_equal(&cur->addr, &addr))
				continue;
			if (seq_nr != cur->ack_nr)
				continue;
			cur->timestamp = UTP_GetMilliseconds();
			LOG_UTPV("recv not sending RST to non-SYN (stored)");
			return true;
		}

		if (ar_GetCount(&g_rst_info) > RST_INFO_LIMIT) {
			LOG_UTPV("recv not sending RST to non-SYN (limit at %u stored)", (uint)ar_GetCount(&g_rst_info));
			return true;
		}

		LOG_UTPV("recv send RST to non-SYN (%u stored)", (uint)ar_GetCount(&g_rst_info));
		r = ar_Append_new(&g_rst_info);
		r->connid = id;
		r->addr = addr;
		r->ack_nr = seq_nr;
		r->timestamp = UTP_GetMilliseconds();

		us_send_rst(send_to_proc, send_to_userdata, &addr, id, seq_nr, UTP_Random(), version);
		return true;
	}
	if (incoming_proc) {
		struct UTPSocket *conn;
		size_t read;

		LOG_UTPV("Incoming connection from %s uTP version:%u", addrfmt(&addr, addrbuf), version);

		/* Create a new UTP socket to handle this new connection */
		conn = UTP_Create(send_to_proc, send_to_userdata, to, tolen);
		/* Need to track this value to be able to detect duplicate CONNECTs */
		conn->conn_seed = id;
		/* This is the value that identifies this connection for them. */
		conn->conn_id_send = id;
		/* This is the value that identifies this connection for us. */
		conn->conn_id_recv = id + 1;
		conn->ack_nr = seq_nr;
		conn->seq_nr = UTP_Random();
		conn->fast_resend_seq_nr = conn->seq_nr;

		UTP_SetSockopt(conn, SO_UTPVERSION, version);
		conn->state = CS_CONNECTED;

		read = UTP_ProcessIncoming(conn, (byte *)buffer, len, true);

		LOG_UTPV("0x%08x: recv send connect ACK", conn);
		us_send_ack(conn, true);

		incoming_proc(send_to_userdata, conn);

		/* we report overhead after incoming_proc, because the callbacks are set up now */
		if (conn->userdata) {
			/* SYN */
			conn->func.on_overhead(conn->userdata, false,
				(len - read) + us_get_udp_overhead(conn), header_overhead);
			/* SYNACK */
			conn->func.on_overhead(conn->userdata, true,
				us_get_overhead(conn), ack_overhead);
		}
	}

	return true;
}
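/* Usage sketch (not compiled in): a typical receive loop feeding raw UDP
   datagrams into the dispatcher above. The descriptor, buffer size and the
   reuse of the hypothetical example_send_to from the earlier sketch are all
   assumptions; on_incoming is whatever UTPGotIncomingConnection the
   application registered. */
#if 0
static void example_recv_once(int udp_fd, UTPGotIncomingConnection *on_incoming)
{
	byte buf[4096];
	struct sockaddr_storage from;
	socklen_t fromlen = sizeof(from);
	int n;

	n = recvfrom(udp_fd, (void *)buf, sizeof(buf), 0,
		(struct sockaddr *)&from, &fromlen);
	if (n <= 0)
		return;
	if (!UTP_IsIncomingUTP(on_incoming, &example_send_to, &udp_fd,
			buf, (size_t)n, (struct sockaddr *)&from, fromlen)) {
		/* not uTP; hand the datagram to whatever else shares the port */
	}
}
#endif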
bool UTP_HandleICMP(const byte *buffer, size_t len, const struct sockaddr *to, socklen_t tolen)
{
	struct psa addr;
	struct pf *p;
	struct pf1 *p1;
	byte version;
	uint32 id;
	size_t i;

	psa_init(&addr, (SOCKADDR_STORAGE *)to, tolen);

	/* Want the whole packet so we have the connection ID */
	if (len < sizeof(struct pf))
		return false;

	p = (struct pf *)buffer;
	p1 = (struct pf1 *)buffer;

	version = UTP_IsV1(p1);
	id = (version == 0) ? ntohl(p->connid) : (uint32)(ntohs(p1->connid));

	for (i = 0; i < ar_GetCount(&g_utp_sockets); ++i) {
		struct UTPSocket **conn_ptr;
		struct UTPSocket *conn;

		conn_ptr = ar_get(&g_utp_sockets, i);
		conn = *conn_ptr;

		if (psa_is_equal(&conn->addr, &addr) && conn->conn_id_recv == id) {
			/* Don't pass on errors for idle/closed connections */
			if (conn->state != CS_IDLE) {
				if (!conn->userdata || conn->state == CS_FIN_SENT) {
					LOG_UTPV("0x%08x: icmp packet causing socket destruction", conn);
					conn->state = CS_DESTROY;
				} else {
					conn->state = CS_RESET;
				}
				if (conn->userdata) {
					int err;

					err = conn->state == CS_SYN_SENT ? ECONNREFUSED : ECONNRESET;
					LOG_UTPV("0x%08x: icmp packet causing error on socket:%d", conn, err);
					conn->func.on_error(conn->userdata, err);
				}
			}
			return true;
		}
	}
	return false;
}
/* Write bytes to the UTP socket.
   Returns true if the socket is still writable. */
bool UTP_Write(struct UTPSocket *conn, size_t bytes)
{
	size_t packet_size;
	size_t num_to_send;
#ifdef g_log_utp_verbose
	size_t param;
#endif

#ifdef g_log_utp_verbose
	param = bytes;
#endif

	if (conn->state != CS_CONNECTED) {
		LOG_UTPV("0x%08x: UTP_Write %u bytes = false (not CS_CONNECTED)", conn, (uint)bytes);
		return false;
	}

	g_current_ms = UTP_GetMilliseconds();

	us_update_send_quota(conn);

	/* don't send unless it will all fit in the window */
	packet_size = us_get_packet_size(conn);
	num_to_send = size_t_min(bytes, packet_size);
	while (us_is_writable(conn, num_to_send)) {
		/* Send an outgoing packet.
		   Also add it to the outgoing queue of packets that have been sent but not ACKed. */

		if (num_to_send == 0) {
			LOG_UTPV("0x%08x: UTP_Write %u bytes = true", conn, (uint)param);
			return true;
		}
		bytes -= num_to_send;

		LOG_UTPV("0x%08x: Sending packet. seq_nr:%u ack_nr:%u wnd:%u/%u/%u rcv_win:%u size:%u quota:%d cur_window_packets:%u",
			conn, conn->seq_nr, conn->ack_nr,
			(uint)(conn->cur_window + num_to_send),
			(uint)conn->max_window, (uint)conn->max_window_user,
			(uint)conn->last_rcv_win, num_to_send,
			conn->send_quota / 100, conn->cur_window_packets);
		us_write_outgoing_packet(conn, num_to_send, ST_DATA);
		num_to_send = size_t_min(bytes, packet_size);
	}

	/* mark the socket as not being writable. */
	conn->state = CS_CONNECTED_FULL;
	LOG_UTPV("0x%08x: UTP_Write %u bytes = false", conn, (uint)bytes);
	return false;
}
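/* Usage sketch (not compiled in): the send path is a pull model -- this is
   an assumption based on the usual libutp design, where UTP_Write() only
   reserves bytes and the library fetches the payload through the registered
   on_write callback. example_flush and pending_len are hypothetical. */
#if 0
static void example_flush(struct UTPSocket *conn, size_t pending_len)
{
	if (!UTP_Write(conn, pending_len)) {
		/* Window is full: the socket is now CS_CONNECTED_FULL. Wait for
		   the UTP_STATE_WRITABLE notification via on_state, then retry. */
	}
}
#endif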
void UTP_RBDrained(struct UTPSocket *conn)
{
	size_t rcvwin;

	rcvwin = us_get_rcv_window(conn);

	if (rcvwin > conn->last_rcv_win) {
		/* If the last window was 0, send an ACK immediately; otherwise set the timer */
		if (conn->last_rcv_win == 0)
			us_send_ack(conn, false);
		else
			conn->ack_time = g_current_ms + uint_min(conn->ack_time - g_current_ms, DELAYED_ACK_TIME_THRESHOLD);
	}
}
void UTP_CheckTimeouts(void)
{
	size_t i;

	g_current_ms = UTP_GetMilliseconds();

	for (i = 0; i < ar_GetCount(&g_rst_info); i++) {
		struct RST_Info *cur;

		cur = ar_get(&g_rst_info, i);
		if ((int)(g_current_ms - cur->timestamp) >= RST_INFO_TIMEOUT) {
			ar_MoveUpLast(&g_rst_info, i);
			i--;
		}
	}
	if (ar_GetCount(&g_rst_info) != ar_GetAlloc(&g_rst_info))
		ar_Compact(&g_rst_info);

	for (i = 0; i != ar_GetCount(&g_utp_sockets); i++) {
		struct UTPSocket **conn_ptr;
		struct UTPSocket *conn;

		conn_ptr = ar_get(&g_utp_sockets, i);
		conn = *conn_ptr;

		us_check_timeouts(conn);

		/* Check if the object was deleted */
		if (conn->state == CS_DESTROY) {
			LOG_UTPV("0x%08x: Destroying", conn);
			UTP_Free(conn);
			i--;
		}
	}
}
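/* Usage sketch (not compiled in): UTP_CheckTimeouts() drives the timers
   (retransmit timeouts, keepalives, RST-info expiry), so the event loop must
   call it periodically. The 50ms cadence below is an assumption, not a
   library requirement; any timer source works. */
#if 0
static void example_tick_forever(void)
{
	for (;;) {
		/* poll the UDP socket and feed packets to UTP_IsIncomingUTP() ... */
		UTP_CheckTimeouts();
		usleep(50 * 1000); /* hypothetical 50ms tick */
	}
}
#endif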
size_t UTP_GetPacketSize(struct UTPSocket *socket)
{
	return us_get_packet_size(socket);
}
void UTP_GetPeerName(struct UTPSocket *conn, struct sockaddr *addr, socklen_t *addrlen)
{
	socklen_t len;
	SOCKADDR_STORAGE sa;

	sa = psa_get_sockaddr_storage(&conn->addr, &len);
	*addrlen = socklen_t_min(len, *addrlen);
	memcpy(addr, &sa, *addrlen);
}
void UTP_GetDelays(struct UTPSocket *conn, int32 *ours, int32 *theirs, uint32 *age)
{
	if (ours)
		*ours = dh_get_value(&conn->our_hist);
	if (theirs)
		*theirs = dh_get_value(&conn->their_hist);
	if (age)
		*age = g_current_ms - conn->last_measured_delay;
}
void UTP_GetStats(struct UTPSocket *conn, struct UTPStats *stats)
{
	*stats = conn->_stats;
}
void UTP_GetGlobalStats(struct UTPGlobalStats *stats)
{
	*stats = _global_stats;
}
/* Close the UTP socket.
   It is not valid for the upper layer to refer to the socket after it is closed.
   Data will keep trying to be delivered after the close. */
void UTP_Close(struct UTPSocket *conn)
{
	assert(conn->state != CS_DESTROY_DELAY && conn->state != CS_FIN_SENT && conn->state != CS_DESTROY);

	LOG_UTPV("0x%08x: UTP_Close in state:%s", conn, statenames[conn->state]);

	switch (conn->state) {
	case CS_CONNECTED:
	case CS_CONNECTED_FULL:
		conn->state = CS_FIN_SENT;
		us_write_outgoing_packet(conn, 0, ST_FIN);
		break;

	case CS_SYN_SENT:
		conn->rto_timeout = UTP_GetMilliseconds() + uint_min(conn->rto * 2, 60);
		/* fall through */
	case CS_GOT_FIN:
		conn->state = CS_DESTROY_DELAY;
		break;

	default:
		conn->state = CS_DESTROY;
		break;
	}
}