2 {Transmission Control over UDP
4 if pass set payload to that
6 Useful for file transfer; VoIP should only consult the current rate
7 and detect congestion based on latency.
9 Used by UploadManager. 1 TC per peer.
11 Suspend: return from CanSend without sending :)
20 mark:1;rate:Word4(shr 6)
25 uses MemStream
,NetAddr
,ServerLoop
,opcode
;
{ One set of tunable sender parameters. These are the fields accessed through
  Cur, Limit and Initial in tTCS; presumably the body of the tTCSSe record,
  whose header is not visible here - TODO confirm. }
28 Rate
:Real; {sending rate; compared with rates computed as data sent per second}
29 Size
:word; {datagram size}
30 RateIF
:single; {rate increase fraction: new rate is measured rate * (RateIF+1)}
31 SizeIF
:single; {size increase fraction: a larger size is Size * (1+SizeIF)}
{ tTCS: the sending half of the transmission control.
  Keeps the current/limit/initial parameter sets, the state of the current
  measurement mark and the callbacks used to drive a transfer.
  NOTE(review): several members used by the methods (remote, mark, siMark,
  siWait, Timeout) are not visible in this excerpt. }
35 type tTCS
=object {this is sender part}
36 {members below are listed in the order they should be set/called}
37 procedure Init(const iremote
:tNetAddr
); {set defaults for vars and register the message handlers}
41 MarkStart
:tDateTime
; {when the mark was started}
42 MarkData
:LongWord
; {how much data was sent; reset to 0 whenever a new mark is chosen}
43 txLastSize
:Word; {length of the last stream passed to Send; zero means suspended}
46 isTimeout
,maxTimeout
:word; {OnTimeout is called once isTimeout exceeds maxTimeout}
47 Cur
:tTCSSe
; {current values}
48 Limit
:tTCSSe
; {maximum allowed}
49 Initial
:tTCSSe
; {after start/timeout}
50 minRateIF
:single; {used after rate decrease}
51 CanSend
: procedure of object; {called when transmit possible; must be assigned before Start}
52 OnTimeout
: procedure of object; {optional; called from Timeout when isTimeout exceeds maxTimeout}
53 procedure Start
; {start the transmission}
54 function MaxSize(req
:word):word; {datagram size to use, never more than req}
55 procedure WriteHeaders(var s
:tMemoryStream
); {add headers before the data}
56 procedure Send(var s
:tMemoryStream
); {send the stream contents to the remote and account for them}
57 procedure Done
; {unregister all callbacks}
60 procedure TransmitDelay
; {scheduled callback that paces the transmission}
62 procedure OnCont(msg
:ServerLoop
.tSMsg
); {handler registered for opcode.tccont}
63 procedure OnAck(msg
:ServerLoop
.tSMsg
); {handler registered for opcode.tceack}
{ Init: prepare the sender for one peer.
  Registers OnCont and OnAck as the handlers for the tccont and tceack opcodes
  coming from the remote address, then fills in default parameter values.
  NOTE(review): the handlers are registered for the field remote, not the
  parameter iremote; presumably remote is assigned from iremote on a line
  not visible here - confirm. }
69 procedure tTCS
.Init(const iremote
:tNetAddr
);
72 SetMsgHandler(opcode
.tccont
,remote
,@OnCont
);
73 SetMsgHandler(opcode
.tceack
,remote
,@OnAck
);
{ upper bound for the sending rate }
74 Limit
.Rate
:=2*1024*1024*1024; {2GB}
{ starting rate; the 20* multiplier is commented out, so the value is 1024 }
78 Initial
.Rate
:={20*}1024;
{ Start: begin a transmission.
  Requires CanSend to be assigned and remote to be set (both asserted).
  Opens a fresh measurement mark and schedules the first TransmitDelay and
  Timeout callbacks. }
90 procedure tTCS
.Start
; {start the transmission}
92 Assert(assigned(CanSend
) ); Assert(not remote
.isnil
);
{ new random mark in 0..255 with an empty data counter }
94 mark
:=Random(256); MarkData
:=0;
{ delays are presumably milliseconds: TransmitDelay passes seconds*1000 }
97 Shedule(80,@TransmitDelay
);
98 Shedule(3000,@Timeout
);
{ MaxSize: datagram size the caller may use, never more than req.
  Depending on a condition that is not visible in this excerpt the base value
  is either the current size enlarged by the size increase fraction or the
  current size itself. }
101 function tTCS
.MaxSize(req
:word):word;
{ enlarged size: Size*(1+SizeIF), rounded }
105 then result
:=round(cur
.Size
*(1+cur
.SizeIF
))
106 else result
:=cur
.Size
;
{ clamp to what the caller asked for }
108 if result
>req
then result
:=req
;
{ WriteHeaders: write the protocol header in front of the payload in s.
  Only the opcode byte is visible here: tcdataimm in the first branch (its
  condition is not shown), tcdata when isTimeout is zero, and tcdataimm in
  the remaining case. Any further header fields are on lines not shown. }
111 procedure tTCS
.WriteHeaders(var s
:tMemoryStream
);
114 s
.WriteByte(opcode
.tcdataimm
);{opcode}
116 end else if isTimeout
=0 then begin
117 s
.WriteByte(opcode
.tcdata
);{opcode}
{ remaining case, i.e. isTimeout is non-zero }
120 s
.WriteByte(opcode
.tcdataimm
);{opcode}
{ Send: transmit the contents of s to the remote and account for them.
  The whole stream (s.length bytes starting at s.base) is handed to
  ServerLoop.SendMessage; afterwards the per-mark data counter and
  txLastSize are updated. }
125 procedure tTCS
.Send(var s
:tMemoryStream
);
127 ServerLoop
.SendMessage(s
.base
^,s
.length
,remote
);
{ first send under a mark is handled by a branch not visible here;
  presumably it also records MarkStart - TODO confirm }
128 if MarkData
=0 then begin
131 end else MarkData
:=MarkData
+s
.length
;
{ remembered so that Timeout and TransmitDelay can detect a suspended sender }
132 txLastSize
:=s
.length
;
{ OnCont: handler for the tccont message, which reports the rate the receiver
  measured for a mark.
  Message layout as read here: opcode byte, mark byte, 4-byte rate word.
  When the mark matches the current one, the reported rate is compared with
  the rate actually transmitted and cur.Rate / cur.RateIF are adjusted, then
  a new mark is chosen.
  NOTE(review): begin/end lines are missing from this excerpt, so the exact
  nesting of the statements below cannot be confirmed. }
136 procedure tTCS
.OnCont(msg
:ServerLoop
.tSMsg
);
{ NOTE(review): opcode here looks like a local variable that shadows the
  opcode unit inside this method - confirm against the var section }
145 opcode
:=msg
.stream
.ReadByte
; {skip opcode}
146 rmark
:=msg
.stream
.ReadByte
;
148 rrate
:=msg
.stream
.ReadWord(4);
149 if (rmark
=Mark
) then begin
{ the wire value is the rate divided by 64 }
151 rxRate
:=(rrate
*64); {B/s}
{ data sent under this mark divided by the seconds elapsed since MarkStart.
  NOTE(review): the divisor is zero if rnow equals MarkStart, and txRate is
  zero if MarkData is 0, which makes the next division fail - confirm that a
  guard exists on lines not visible here }
152 txRate
:=MarkData
/((rnow
-MarkStart
)*SecsPerDay
);
{ fraction of the transmitted rate that the receiver actually saw }
153 RateFill
:=rxRate
/txRate
;
154 write('speed: ',(rxRate
/1024):1:3,'kB/s (',(RateFill
*100):3:1,'% of ',txRate
/1024:1:3,'), ');
156 Shedule(2000,@Timeout
);
{ receiver saw less than 85% of what was sent }
157 if RateFill
<0.85 then begin
160 cur
.RateIF
:=minRateIF
;
{ the sender itself used less than 70% of the allowed rate }
162 if (txRate
/cur
.Rate
)<0.7 then begin
{ new target: measured transmit rate raised by the increase fraction }
166 cur
.Rate
:=txRate
*(cur
.RateIF
+1);
{ clamp to the limit; only when not clamped is the increase fraction doubled }
167 if cur
.Rate
>limit
.Rate
then cur
.Rate
:=Limit
.Rate
168 else cur
.RateIF
:=cur
.RateIF
*2;
169 if cur
.RateIF
>limit
.RateIF
then cur
.RateIF
:=Limit
.RateIF
;
{ pick a new mark that differs from the one just reported }
171 repeat mark
:=Random(256) until (mark
<>rMark
);
173 writeln('-> ',(Cur
.Rate
/1024):1:4,'kB/s if=',cur
.RateIF
:6:4);
{ halve the size increase fraction; the enclosing condition is not visible }
175 cur
.SizeIF
:=cur
.SizeIF
/2;
{ OnAck: handler for the tceack message, which acknowledges a datagram and
  reports its size.
  Message layout as read here: opcode byte, mark byte, 2-byte size word.
  Acks whose mark differs from siMark are ignored. A reported size larger
  than the current one raises the size increase fraction and may become the
  new current size.
  NOTE(review): begin/end lines are missing from this excerpt, so the exact
  nesting of the statements below cannot be confirmed. }
180 procedure tTCS
.OnAck(msg
:ServerLoop
.tSMsg
);
185 opcode
:=msg
.stream
.ReadByte
; {skip opcode}
186 rmark
:=msg
.stream
.ReadByte
;
188 rsize
:=msg
.stream
.ReadWord(2);
{ only acks carrying the current siMark are processed }
189 if rmark
<>simark
then exit
;
{ an ack while in timeout state: schedule TransmitDelay again }
190 if isTimeout
>0 then begin
191 Shedule(80,@TransmitDelay
);
194 if rsize
>cur
.size
then begin
195 writeln('size inc to ',rsize
);
{ twice the relative growth that was just acknowledged, clamped to the limit }
196 cur
.SizeIF
:=((rSize
/cur
.Size
)-1)*2;
197 if cur
.SizeIF
>Limit
.SizeIF
then Cur
.SizeIF
:=Limit
.SizeIF
;
{ adopt the size only if one datagram takes at most 0.3 s at the current rate }
198 if (rsize
/cur
.rate
)<=0.3 then cur
.size
:=rSize
; {use new size for all transmit}
200 if rsize
>=cur
.size
then siWait
:=false;
{ Timeout: scheduled callback, rescheduled by OnCont whenever a matching
  tccont message arrives.
  Does nothing while the sender is suspended. Otherwise opens a new mark,
  notifies OnTimeout once isTimeout exceeds maxTimeout and reschedules both
  TransmitDelay and itself.
  NOTE(review): the lines that change isTimeout and reset the current
  parameters are not visible in this excerpt. }
203 procedure tTCS
.Timeout
;
205 if txLastSize
=0 then exit
; {suspend}
{ new random mark in 0..255 with an empty data counter }
207 mark
:=Random(256); MarkData
:=0;
{ OnTimeout is optional, hence the assigned check }
210 if (isTimeout
>maxTimeout
)and assigned(OnTimeout
) then OnTimeout
;
211 Shedule(80,@TransmitDelay
);
212 Shedule(3000,@Timeout
);
{ TransmitDelay: scheduled callback that paces the transmission.
  May start a size probe by choosing a non-zero siMark, then sends in a loop
  until the data sent is far enough ahead of the allowed rate, and finally
  reschedules itself after the computed wait.
  NOTE(review): the loop head and the calls made inside it are not visible in
  this excerpt; presumably CanSend is invoked there - TODO confirm. }
215 procedure tTCS
.TransmitDelay
;
{ start a size probe only when none is active, the size is below its limit
  and the sender is not in timeout state; the random(10) throttle is disabled }
221 if (siMark
=0)and(cur
.Size
<limit
.Size
){and(random(10)=0)}and(istimeout
=0) then begin
{ 1..255, so an active probe never has siMark equal to 0 }
224 siMark
:=random(255)+1;
229 if txLastSize
=0 then exit
; {pause}
230 if (isTimeout
>0) then exit
; {no burst, no shedule next}
231 //txwait:=txwait+(txLastSize/cur.rate);
{ seconds the data sent so far should have taken at cur.Rate, minus the
  seconds that actually elapsed since MarkStart }
232 txwait
:=(MarkData
/cur
.Rate
)-((Now
-MarkStart
)*SecsPerDay
);
{ stop once at least 20 ms ahead of schedule or after more than 200 iterations }
235 until (txwait
>0.02)or(burst
>200);
{ never reschedule sooner than 10 ms }
236 if txwait
<0.02 then txwait
:=0.01;
237 //writeln(txwait:1:3,burst);
{ txwait is in seconds, Shedule takes the value multiplied by 1000 }
238 Shedule(round(txwait
*1000),@TransmitDelay
);
{ Done: shut the sender down.
  Cancels the pending TransmitDelay callback and removes the two message
  handlers that Init registered for this remote. The handlers are addressed
  through the same opcode constants Init uses (opcode.tccont, opcode.tceack)
  instead of literal numbers, so registration and removal cannot drift apart.
  NOTE(review): confirm that the Timeout callback is unscheduled as well; the
  line that would do so is not visible in this excerpt. }
241 procedure tTCS
.Done
; {unregister all callbacks}
243 UnShedule(@TransmitDelay
);
245 SetMsgHandler(opcode.tccont,remote,nil); {handler bound to OnCont in Init}
246 SetMsgHandler(opcode.tceack,remote,nil); {handler bound to OnAck in Init}