2 {Transmission control over UDP
4 if pass set payload to that
6 Useful for file transfer; VoIP should only consult the current rate
7 and detect congestion based on latency.
9 Used by UploadManager. 1 TC per peer.
11 Suspend: return from CanSend without sending :)
20 mark:1;rate:Word4(shr 6)
25 uses MemStream
,NetAddr
,ServerLoop
;
{ Tunable transmission parameters; read elsewhere in this file as cur.Rate, cur.Size, cur.RateIF and cur.SizeIF. }
{ NOTE(review): the enclosing type header is not visible here - presumably tTCSSe; confirm. }
28 Rate
:Real; {sending rate; presumably bytes per second, as OnCont assigns it from a B/s value - confirm}
29 Size
:word; {datagram size}
30 RateIF
:single; {rate increase fraction; OnCont multiplies the measured rate by (RateIF+1)}
31 SizeIF
:single; {size increase fraction; MaxSize may scale Size by (1+SizeIF)}
{ Sender-side state and entry points of the transmission control. }
35 type tTCS
=object {this is the sender part}
36 {members are listed in the order in which they should be set/called}
37 procedure Init(const iremote
:tNetAddr
); {set defaults for vars}
41 MarkStart
:tDateTime
; {when the mark was started}
42 MarkData
:LongWord
; {how much data was sent under the current mark; reset to 0 when a new mark is chosen}
43 txLastSize
:Word; {length of the last datagram passed to Send; zero means suspended}
47 Cur
:tTCSSe
; {current values}
48 Limit
:tTCSSe
; {maximum allowed}
49 Initial
:tTCSSe
; {values used after start/timeout}
50 minRateIF
:single; {assigned to Cur.RateIF after a rate decrease}
51 CanSend
: procedure of object; {callback, called when transmit is possible; must be assigned before Start}
52 procedure Start
; {start the transmission}
53 function MaxSize(req
:word):word;
54 procedure WriteHeaders(var s
:tMemoryStream
); {add headers before the data}
55 procedure Send(var s
:tMemoryStream
);
58 procedure TransmitDelay
;
60 procedure OnCont(msg
:ServerLoop
.tSMsg
);
61 procedure OnAck(msg
:ServerLoop
.tSMsg
);
62 procedure Done
; {unregister all callbacks}
{ Sets defaults and registers the message handlers for the remote peer. }
{ NOTE(review): only part of the body is visible here; other defaults are presumably set on lines not shown. }
68 procedure tTCS
.Init(const iremote
:tNetAddr
);
{ Route messages 5 and 7 from the remote to OnCont and OnAck; Done clears the same two registrations. }
71 SetMsgHandler(5,remote
,@OnCont
);
72 SetMsgHandler(7,remote
,@OnAck
);
73 Limit
.Rate
:=2*1024*1024*1024; {upper bound for the sending rate: 2G, i.e. 2^31}
77 Initial
.Rate
:={20*}1024;
{ Begins transmitting: requires CanSend to be assigned and remote to be set. }
87 procedure tTCS
.Start
; {start the transmission}
89 Assert(assigned(CanSend
) ); Assert(not remote
.isnil
);
{ Pick a fresh random mark and restart the byte counter for it. }
91 mark
:=Random(256); MarkData
:=0;
{ Arm the first transmit tick and the timeout. Delays are presumably milliseconds, since TransmitDelay schedules round(seconds*1000). }
94 Shedule(80,@TransmitDelay
);
95 Shedule(3000,@Timeout
);
{ Returns the datagram size to use for a payload request of req, never more than req. }
{ NOTE(review): the condition selecting between the two sizes is on a line not visible here. }
98 function tTCS
.MaxSize(req
:word):word;
{ One branch enlarges the current size by the size increase fraction, the other uses the current size as is. }
102 then result
:=round(cur
.Size
*(1+cur
.SizeIF
))
103 else result
:=cur
.Size
;
{ Cap the result at the requested size. }
105 if result
>req
then result
:=req
;
{ Writes the protocol header into s before the payload; the first byte written is an opcode. }
{ NOTE(review): the branch conditions and remaining header fields are partly on lines not visible here. }
108 procedure tTCS
.WriteHeaders(var s
:tMemoryStream
);
111 s
.WriteByte(6);{opcode}
{ Opcode 4 is written in the branch taken when isTimeout=0; the remaining branch writes 6 again. }
113 end else if isTimeout
=0 then begin
114 s
.WriteByte(4);{opcode}
117 s
.WriteByte(6);{opcode}
{ Sends the contents of s to the remote and updates the per-mark accounting. }
122 procedure tTCS
.Send(var s
:tMemoryStream
);
124 ServerLoop
.SendMessage(s
.base
^,s
.length
,remote
);
{ MarkData=0 means this is the first datagram of a mark; that branch body is on lines not visible here. }
125 if MarkData
=0 then begin
{ Otherwise add this datagram to the amount sent under the current mark. }
128 end else MarkData
:=MarkData
+s
.length
;
{ Remember the size of the last datagram; zero here is treated as suspend by Timeout and TransmitDelay. }
129 txLastSize
:=s
.length
;
{ Handler for the message registered as 5 in Init: the receiver reports the rate it measured for a mark, }
{ and the sending rate and rate increase fraction are adjusted from it. }
{ NOTE(review): several lines of the body (declarations, begin/end, some branches) are not visible here. }
133 procedure tTCS
.OnCont(msg
:ServerLoop
.tSMsg
);
{ Parse the message: opcode byte, mark byte, then a 4-byte rate word. }
142 opcode
:=msg
.stream
.ReadByte
; {skip opcode}
143 rmark
:=msg
.stream
.ReadByte
;
145 rrate
:=msg
.stream
.ReadWord(4);
{ Only reports for the current mark are processed in this branch. }
146 if (rmark
=Mark
) then begin
{ The reported rate is scaled by 64 to get bytes per second. }
148 rxRate
:=(rrate
*64); {B/s}
{ Own rate: data sent under this mark divided by the seconds elapsed since the mark started. }
{ NOTE(review): divides by zero if rnow equals MarkStart; RateFill below divides by zero if txRate is 0 - confirm this cannot happen. }
149 txRate
:=MarkData
/((rnow
-MarkStart
)*SecsPerDay
);
150 RateFill
:=rxRate
/txRate
;
151 write('speed: ',(rxRate
/1024):1:3,'kB/s (',(RateFill
*100):3:1,'% of ',txRate
/1024:1:3,'), ');
{ A valid report arrived, so re-arm the timeout. }
153 Shedule(2000,@Timeout
);
{ Receiver got less than 85% of what was sent: fall back to the minimal increase fraction. }
154 if RateFill
<0.85 then begin
157 cur
.RateIF
:=minRateIF
;
{ Branch taken when the achieved send rate is below 70% of the configured rate; its body is not visible here. }
159 if (txRate
/cur
.Rate
)<0.7 then begin
{ New target rate: measured send rate raised by the increase fraction, clamped to the limit. }
163 cur
.Rate
:=txRate
*(cur
.RateIF
+1);
164 if cur
.Rate
>limit
.Rate
then cur
.Rate
:=Limit
.Rate
{ If the rate limit was not hit, double the increase fraction, then clamp it to its own limit. }
165 else cur
.RateIF
:=cur
.RateIF
*2;
166 if cur
.RateIF
>limit
.RateIF
then cur
.RateIF
:=Limit
.RateIF
;
{ Choose a new mark that differs from the one just reported. }
168 repeat mark
:=Random(256) until (mark
<>rMark
);
170 writeln('-> ',(Cur
.Rate
/1024):1:4,'kB/s if=',cur
.RateIF
:6:4);
{ Halve the size increase fraction; the condition guarding this statement is not visible here. }
172 cur
.SizeIF
:=cur
.SizeIF
/2;
{ Handler for the message registered as 7 in Init: the receiver acknowledges a datagram size for the mark in siMark. }
{ NOTE(review): parts of the body (declarations, begin/end, some statements) are not visible here. }
177 procedure tTCS
.OnAck(msg
:ServerLoop
.tSMsg
);
{ Parse the message: opcode byte, mark byte, then a 2-byte size word. }
182 opcode
:=msg
.stream
.ReadByte
; {skip opcode}
183 rmark
:=msg
.stream
.ReadByte
;
185 rsize
:=msg
.stream
.ReadWord(2);
{ Ignore acknowledgements that do not match siMark. }
186 if rmark
<>simark
then exit
;
{ While isTimeout is non-zero, an acknowledgement schedules the transmit tick again. }
187 if isTimeout
>0 then begin
188 Shedule(80,@TransmitDelay
);
{ A size larger than the current one was acknowledged. }
191 if rsize
>cur
.size
then begin
192 writeln('size inc to ',rsize
);
{ Set the size increase fraction to twice the relative growth just confirmed, clamped to the limit. }
{ NOTE(review): divides by cur.Size here and by cur.rate below; confirm neither can be zero. }
193 cur
.SizeIF
:=((rSize
/cur
.Size
)-1)*2;
194 if cur
.SizeIF
>Limit
.SizeIF
then Cur
.SizeIF
:=Limit
.SizeIF
;
{ Adopt the new size only if rsize/cur.rate is at most 0.3. }
195 if (rsize
/cur
.rate
)<=0.3 then cur
.size
:=rSize
; {use new size for all transmit}
{ Clear siWait once the acknowledged size is at least the current size. }
197 if rsize
>=cur
.size
then siWait
:=false;
{ Scheduled callback: restarts the mark and re-arms the timers. It is re-armed by OnCont when a matching report arrives. }
{ NOTE(review): lines between the visible statements are missing; they presumably reset the current values - confirm. }
200 procedure tTCS
.Timeout
;
{ Nothing to do while the sender is suspended. }
202 if txLastSize
=0 then exit
; {suspend}
{ Pick a fresh random mark and restart the byte counter for it. }
204 mark
:=Random(256); MarkData
:=0;
{ Re-arm the transmit tick and this timeout, with the same delays as Start. }
207 Shedule(80,@TransmitDelay
);
208 Shedule(3000,@Timeout
);
{ Scheduled transmit tick: paces transmission against cur.Rate and reschedules itself. }
{ NOTE(review): the repeat header, the CanSend call and the burst counter update are on lines not visible here. }
211 procedure tTCS
.TransmitDelay
;
{ Enter this branch only when siMark is 0, the size is below its limit and isTimeout is 0. }
218 if (siMark
=0)and(cur
.Size
<limit
.Size
){and(random(10)=0)}and(istimeout
=0) then begin
{ The new siMark is in 1..255, so it is never 0. }
221 siMark
:=random(255)+1;
225 if txLastSize
=0 then exit
; {pause}
226 if (isTimeout
>0) then exit
; {no burst, no schedule next}
227 //txwait:=txwait+(txLastSize/cur.rate);
{ Seconds by which the sender is ahead of schedule: time the sent data should take at cur.Rate minus the time elapsed since the mark started. }
228 txwait
:=(MarkData
/cur
.Rate
)-((Now
-MarkStart
)*SecsPerDay
);
{ Loop ends once the sender is more than 0.02 s ahead or burst exceeds 200. }
231 until (txwait
>0.02)or(burst
>200);
{ If the loop ended with a wait below 0.02 s, use a fixed 0.01 s wait. }
232 if txwait
<0.02 then txwait
:=0.01;
233 //writeln(txwait:1:3,burst);
{ Convert the wait from seconds to the scheduler unit (x1000) and re-arm this tick. }
234 Shedule(round(txwait
*1000),@TransmitDelay
);
{ Tears down the sender: cancels the scheduled transmit tick and clears the handlers registered in Init. }
{ NOTE(review): a line between the visible statements is missing; presumably it cancels Timeout as well - confirm. }
237 procedure tTCS
.Done
; {unregister all callbacks}
239 UnShedule(@TransmitDelay
);
{ Passing nil clears the handlers for messages 5 and 7 from this remote. }
241 SetMsgHandler(5,remote
,nil);
242 SetMsgHandler(7,remote
,nil);