2 {TransmissionControll over UDP
4 if pass set payload to that
6 useful for file transfer, voip should only consult the current rate
7 and detect congestion based on latency
9 Used by UploadManager. 1 TC per peer.
11 Suspend: return from CanSend without sending :)
20 mark:1;rate:Word4(shr 6)
25 uses MemStream
,NetAddr
,ServerLoop
,opcode
;
28 Rate
:Real; {sending rate}
29 Size
:word; {datagram size}
30 RateIF
:single; {rate increase fraction}
31 SizeIF
:single; {size increase fraction}
35 type tTCS
=object {this is sender part}
36 {in order methods should be set/called}
37 procedure Init(const iremote
:tNetAddr
); {set defaults for vars}
41 MarkStart
:tMTime
; {when the mark was started}
42 MarkData
:LongWord
; {how much data sent}
43 txLastSize
:Word; {is zero if suspend}
46 isTimeout
,maxTimeout
:word;
47 Cur
:tTCSSe
; {current values}
48 Limit
:tTCSSe
; {maximum alloved}
49 Initial
:tTCSSe
; {after start/timeout}
50 minRateIF
:single; {used after rate decrease}
51 CanSend
: procedure of object; {called when transmit possible}
52 OnTimeout
: procedure of object;
53 procedure Start
; {start the transmission}
54 function MaxSize(req
:word):word;
55 procedure WriteHeaders(var s
:tMemoryStream
); {add headers before the data}
56 procedure Send(var s
:tMemoryStream
);
57 procedure Done
; {unregister all callbacks}
60 procedure TransmitDelay
;
62 procedure OnCont(msg
:ServerLoop
.tSMsg
);
63 procedure OnAck(msg
:ServerLoop
.tSMsg
);
68 procedure tTCS
.Init(const iremote
:tNetAddr
);
71 SetMsgHandler(opcode
.tccont
,remote
,@OnCont
);
72 SetMsgHandler(opcode
.tceack
,remote
,@OnAck
);
73 Limit
.Rate
:=2*1024*1024*1024; {2GB}
77 Initial
.Rate
:={20*}1024;
89 procedure tTCS
.Start
; {start the transmission}
91 Assert(assigned(CanSend
) ); Assert(not remote
.isnil
);
93 mark
:=Random(256); MarkData
:=0;
96 Shedule(80,@TransmitDelay
);
97 Shedule(3000,@Timeout
);
100 function tTCS
.MaxSize(req
:word):word;
104 then result
:=round(cur
.Size
*(1+cur
.SizeIF
))
105 else result
:=cur
.Size
;
107 if result
>req
then result
:=req
;
110 procedure tTCS
.WriteHeaders(var s
:tMemoryStream
);
113 s
.WriteByte(opcode
.tcdataimm
);{opcode}
115 end else if isTimeout
=0 then begin
116 s
.WriteByte(opcode
.tcdata
);{opcode}
119 s
.WriteByte(opcode
.tcdataimm
);{opcode}
124 procedure tTCS
.Send(var s
:tMemoryStream
);
126 ServerLoop
.SendMessage(s
.base
^,s
.length
,remote
);
127 if MarkData
=0 then begin
130 end else MarkData
:=MarkData
+s
.length
;
131 txLastSize
:=s
.length
;
135 procedure tTCS
.OnCont(msg
:ServerLoop
.tSMsg
);
143 opcode
:=msg
.stream
.ReadByte
; {skip opcode}
144 rmark
:=msg
.stream
.ReadByte
;
146 rrate
:=msg
.stream
.ReadWord(4);
147 if (rmark
=Mark
) then begin
148 rxRate
:=(rrate
*64); {B/s}
149 txRate
:=MarkData
/((mNow
-MarkStart
)/1000{*SecsPerDay});
150 RateFill
:=rxRate
/txRate
;
151 write('speed: ',(rxRate
/1024):1:3,'kB/s (',(RateFill
*100):3:1,'% of ',txRate
/1024:1:3,'), ');
153 Shedule(2000,@Timeout
);
154 if RateFill
<0.85 then begin
157 cur
.RateIF
:=minRateIF
;
159 if (txRate
/cur
.Rate
)<0.7 then begin
163 cur
.Rate
:=txRate
*(cur
.RateIF
+1);
164 if cur
.Rate
>limit
.Rate
then cur
.Rate
:=Limit
.Rate
165 else cur
.RateIF
:=cur
.RateIF
*2;
166 if cur
.RateIF
>limit
.RateIF
then cur
.RateIF
:=Limit
.RateIF
;
168 repeat mark
:=Random(256) until (mark
<>rMark
);
170 writeln('-> ',(Cur
.Rate
/1024):1:4,'kB/s if=',cur
.RateIF
:6:4);
172 cur
.SizeIF
:=cur
.SizeIF
/2;
177 procedure tTCS
.OnAck(msg
:ServerLoop
.tSMsg
);
182 opcode
:=msg
.stream
.ReadByte
; {skip opcode}
183 rmark
:=msg
.stream
.ReadByte
;
185 rsize
:=msg
.stream
.ReadWord(2);
186 if rmark
<>simark
then exit
;
187 if isTimeout
>0 then begin
188 Shedule(80,@TransmitDelay
);
191 if rsize
>cur
.size
then begin
192 writeln('size inc to ',rsize
);
193 cur
.SizeIF
:=((rSize
/cur
.Size
)-1)*2;
194 if cur
.SizeIF
>Limit
.SizeIF
then Cur
.SizeIF
:=Limit
.SizeIF
;
195 if (rsize
/cur
.rate
)<=0.3 then cur
.size
:=rSize
; {use new size for all transmit}
197 if rsize
>=cur
.size
then siWait
:=false;
200 procedure tTCS
.Timeout
;
202 if txLastSize
=0 then exit
; {suspend}
204 mark
:=Random(256); MarkData
:=0;
207 if (isTimeout
>maxTimeout
)and assigned(OnTimeout
) then OnTimeout
;
208 Shedule(80,@TransmitDelay
);
209 Shedule(3000,@Timeout
);
212 procedure tTCS
.TransmitDelay
;
218 if (siMark
=0)and(cur
.Size
<limit
.Size
){and(random(10)=0)}and(istimeout
=0) then begin
221 siMark
:=random(255)+1;
226 if txLastSize
=0 then exit
; {pause}
227 if (isTimeout
>0) then exit
; {no burst, no shedule next}
228 //txwait:=txwait+(txLastSize/cur.rate);
229 txwait
:=(MarkData
/cur
.Rate
)-((mNow
-MarkStart
)/1000{*SecsPerDay});
232 until (txwait
>0.02)or(burst
>200);
233 if txwait
<0.02 then txwait
:=0.01;
234 //writeln(txwait:1:3,burst);
235 Shedule(round(txwait
*1000),@TransmitDelay
);
238 procedure tTCS
.Done
; {unregister all callbacks}
240 UnShedule(@TransmitDelay
);
242 SetMsgHandler(5,remote
,nil);
243 SetMsgHandler(7,remote
,nil);