Static DHT bootstrap from cmdline opt -boot.
[brdnet.git] / DownloadTC.pas
blob47c3ededa4782020365facd449e6daaf87361b10
1 {Include file}
{ tAggr: per-remote-address download aggregate.
  Counts the datagrams and bytes received from one remote, reports the
  measured receive rate back to it and hands payloads to the job slots. }
tAggr = object
  { Receive rate as last computed by SendRate (ByteCnt/elapsed*1000). }
  Rate: Real;
  { Bytes and datagrams accounted in the current measurement window. }
  ByteCnt, DgrCnt: LongWord;
  { Mark byte of the current window and of the window before it. }
  CurMark, PrvMark: byte;
  { mNow value taken when the current measurement window started. }
  StartT: tMTime;
  { Job slots; Recv selects one by the channel byte of the message. }
  Jobs: array [0..15] of ^tJob;
  { refs: zero after Init, not used otherwise in this file section.
    acnt: number of jobs currently marked active (see Start/Stop).
    ChanOfs: random value chosen in Init; presumably a channel number
    offset - TODO confirm against the code that uses it. }
  refs, acnt, ChanOfs: byte;
  { Datagram counter inspected and cleared by Periodic. }
  DgrCntCheck: LongWord;
  { Address of the peer this aggregate belongs to. }
  remote: tNetAddr;
  { Next aggregate in the AggrChain list. }
  next: tAggr_ptr;

  { life cycle }
  procedure Init(const src: tNetAddr);
  procedure Done;
  { message handling }
  procedure Recv(msg: tSMsg);
  procedure MsgDATA(sz: Word; mark: byte);
  procedure MsgIMME(sz: Word; mark: byte);
  { rate reporting and liveness check }
  procedure SendRate;
  procedure Periodic;
  { job activation }
  procedure Start(ix: byte);
  procedure Stop(ix: byte);
end;
{ Head of the singly linked list (linked through tAggr.next) that GetAggr
  searches and tAggr.Done unlinks from. }
var
  AggrChain: ^tAggr;
{ Look up the aggregate whose remote address equals the given one.
  remote: address to search for.
  Returns the matching aggregate, or nil when the chain holds none.
  A found aggregate is moved to the front of AggrChain so that repeated
  lookups for the same peer stay cheap. }
function GetAggr(const remote:tNetAddr):tAggr_ptr;
 var a:^tAggr;
 var p:^pointer;
 begin
 {p always points at the link (chain head or a predecessor's next field)
  that currently refers to a}
 p:=@AggrChain;
 a:=AggrChain;
 while assigned(a) do begin
  if a^.remote=remote then begin
   GetAggr:=a;
   {unlink a from its position...}
   p^:=a^.next;
   {...and re-insert it as the new head}
   a^.next:=AggrChain;
   AggrChain:=a;
   exit;
  end;
  {advance to the next node; without this the search never terminates}
  p:=@a^.next;
  a:=a^.next;
 end;
 GetAggr:=nil;
end;
{ Reset all counters of the aggregate, clear every job slot and install
  Recv as the message handler for the tcdata and tcdataimm opcodes coming
  from the given address.
  src: address of the remote peer; stored in the remote field.
  The next field is not touched here. }
procedure tAggr.Init(const src:tNetAddr);
 var i:byte;
 begin
 acnt:=0;
 Rate:=0;
 ByteCnt:=0;
 DgrCnt:=0;
 CurMark:=0;PrvMark:=0;
 StartT:=mNow;
 {clear every slot including index 0; Recv tests the slots with assigned()
  and dereferences them, so none may be left uninitialised}
 for i:=low(Jobs) to high(Jobs) do Jobs[i]:=nil;
 refs:=0;
 ChanOfs:=Random(255-high(Jobs));
 DgrCntCheck:=0;
 remote:=src;
 SetMsgHandler(opcode.tcdata,remote,@Recv);
 SetMsgHandler(opcode.tcdataimm,remote,@Recv);
end;
{ Message handler installed by Init for tcdata and tcdataimm.
  Reads opcode and mark, answers tcdataimm through MsgIMME, accounts the
  datagram through MsgDATA, sends a rate report once at least 8 datagrams
  were counted and the window is at least 400 time units old, then reads
  the channel byte and a 4-byte base value and passes the rest of the
  stream to the job in that channel slot, if there is one. }
procedure tAggr.Recv(msg:tSMsg);
 var opc,markid,slot:byte;
 var offset:DWORD;
 var elapsed:tMTime;
 begin
 opc:=msg.stream.readbyte;
 markid:=msg.stream.readbyte;
 if opc=opcode.tcdataimm then
  MsgIMME(msg.length,markid);
 MsgDATA(msg.length,markid);
 if DgrCnt>=8 then begin
  elapsed:=(mNow-StartT);
  if elapsed>=400 then
   SendRate;
 end;
 {both header fields are consumed before the slot is checked, so the read
  position is the same whether or not a job takes the data}
 slot:=msg.stream.readbyte;
 offset:=msg.stream.ReadWord(4);
 if slot>high(Jobs) then exit;
 if not assigned(Jobs[slot]) then exit;
 Jobs[slot]^.MsgDATA(offset,msg.stream.RDBufLen,msg.stream.RDBuf);
end;
{ Send a 4-byte reply to the remote: the tceack opcode, the mark and the
  size as a 2-byte word.
  sz:   length of the message being answered.
  mark: mark byte taken from that message. }
procedure tAggr.MsgIMME(sz:Word; mark:byte);
 var
  reply:tMemoryStream;
  storage:array [1..4] of byte;
 begin
 {the stream writes into the local array, nothing is allocated}
 reply.Init(@storage,0,sizeof(storage));
 reply.WriteByte(opcode.tceack);
 reply.WriteByte(mark);
 reply.WriteWord(sz,2);
 SendMessage(reply.base^,reply.length,remote);
end;
{ Account one received datagram.
  sz:   size of the datagram.
  mark: mark byte of the datagram.
  Datagrams carrying the previous mark are ignored completely. A mark that
  differs from the current one starts a new measurement window, any other
  datagram is added to the running window. }
procedure tAggr.MsgDATA(sz:Word; mark:byte);
 begin
 {datagram of the previous window: not counted anywhere}
 if mark=PrvMark then exit;
 if mark=CurMark then begin
  Inc(ByteCnt,sz);
  Inc(DgrCnt);
 end else begin
  {new mark: shift the marks and restart the window}
  PrvMark:=CurMark;
  CurMark:=mark;
  StartT:=mNow;
  ByteCnt:=1;
  DgrCnt:=1;
 end;
 Inc(DgrCntCheck);
end;
{ Compute the receive rate of the current measurement window, restart the
  window and send the rate to the remote in a tccont message
  (opcode, current mark, rate divided by 64 as a 4-byte word). }
procedure tAggr.SendRate;
 var r:tMemoryStream;
 var rateb: DWord; {rate divided by 64}
 var buf:array [1..6] of byte;
 var delta:tMTime;
 begin
 delta:=(mNow-StartT);
 {Stop can call this right after the window was restarted; treat an
  elapsed time below one unit as one unit so the division is defined}
 if delta<1 then delta:=1;
 rate:=(ByteCnt/delta)*1000;
 //writeln('Download: rate ',(rate/1024):7:1, 'kB/s');
 rateb:=round((rate)/64);
 {start a new window}
 StartT:=mNow;
 ByteCnt:=1;
 DgrCnt:=0;
 r.Init(@buf,0,sizeof(buf));
 r.WriteByte(opcode.tccont);
 r.WriteByte(CurMark);
 r.WriteWord(rateb,4);
 SendMessage(r.base^,r.length,remote);
end;
{ Liveness check, first scheduled by Start with a delay value of 5000.
  When more than one datagram was counted since the previous check the
  counter is cleared and the check is scheduled again; otherwise only a
  diagnostic line is written and the check is not rescheduled. }
procedure tAggr.Periodic;
 begin
 if DgrCntCheck<=1 then begin
  writeln('Download: Periodic check failed, unimplemented!');
  {TODO: handling of a stalled transfer is not implemented}
  exit;
 end;
 DgrCntCheck:=0;
 Shedule(5000,@Periodic);
end;
{ Tear the aggregate down: unlink it from AggrChain, cancel the periodic
  check, remove both message handlers and release the memory of the
  object itself. The object must not be used after this call. }
procedure tAggr.Done;
 var a:^tAggr;
 var p:^pointer;
 begin
 {p points at the link (chain head or a predecessor's next field) that
  currently refers to a}
 p:=@AggrChain;
 a:=AggrChain;
 while assigned(a) do begin
  if a=@self then begin
   p^:=next;
  break end;
  {advance to the next node; without this the loop never terminates
   unless self is the head of the chain}
  p:=@a^.next;
  a:=a^.next;
 end;
 UnShedule(@Periodic);
 SetMsgHandler(opcode.tcdata,remote,nil);
 SetMsgHandler(opcode.tcdataimm,remote,nil);
 {last statement: no field may be accessed after the memory is released}
 FreeMem(@self,sizeof(self));
end;
{ Mark the job in slot ix as active. Does nothing when it already is.
  The first job to become active also schedules Periodic.
  ix: index into Jobs; the slot is dereferenced without a check, so it has
  to hold a job. }
procedure tAggr.Start(ix:byte);
 begin
 if not Jobs[ix]^.active then begin
  {no active job so far: begin the periodic check}
  if acnt=0 then
   Shedule(5000,@Periodic);
  Inc(acnt);
  Jobs[ix]^.active:=true;
 end;
end;
{ Mark the job in slot ix as inactive. Does nothing when it already is.
  When the last active job stops, Periodic is unscheduled and one more
  rate report is sent.
  ix: index into Jobs; the slot is dereferenced without a check, so it has
  to hold a job. }
procedure tAggr.Stop(ix:byte);
 begin
 if Jobs[ix]^.active then begin
  Dec(acnt);
  Jobs[ix]^.active:=false;
  if acnt=0 then begin
   UnShedule(@Periodic);
   SendRate;
  end;
 end;
end;