Some changes in fileserver test that I do not want to lose.
[brdnet.git] / Download.pas
blob3f301816e72280c36251f394fc8c505635d6acfc
1 unit Download;
2 {manage downloads}
4 INTERFACE
5 uses NetAddr,
6 ServerLoop,opcode,MemStream;
8 Same idea here as upmgr. Have a tAggr for each peer, reporting speed, and
9 a tJob for each file request, saving to file and requesting missed segments.
10 The aggr should have a limit on parallel jobs. Jobs will first be linked in
11 the tAggr queue and then started as slots become available.
13 After a node restart, notify the requester with a No-Source error. But it
14 should not be forgotten. A more advanced DM could consult the CHK or the
15 Category in which the file was found.
type
 tJob_ptr=pointer;//^tJob;

 {Per-peer receive aggregate. Counts incoming data datagrams from one
  remote address, estimates the transfer rate and reports it back to
  that address.}
 tAggr=object
  Rate:Real;                        {last computed rate; MsgDATA derives it as ByteCnt/elapsed*1000}
  ByteCnt, DgrCnt: LongWord;        {bytes / datagrams counted since the current mark started}
  CurMark, PrvMark: byte;           {mark currently being measured, and the one before it}
  StartT:tMTime;                    {mNow at the start of the current measurement}
  Jobs: array [0..15] of tJob_ptr;  {job slots; untyped pointers for now}
  refs, ChanOfs: byte;              {refs: zero after Init; ChanOfs: random offset chosen in Init}
  DgrCntCheck:LongWord;             {datagrams seen since the last Periodic check}
  remote:tNetAddr;                  {address passed to Init; destination of all replies}
  procedure Init(const src:tNetAddr);
  procedure MsgDATA(sz:Word;mark:byte);
  procedure MsgIMME(sz:Word;mark:byte);
  procedure Recv(msg:tSMsg);
  procedure Periodic;
  procedure Done;
 end;
37 IMPLEMENTATION
{Prepare the aggregate for the peer at src: reset all counters, clear
 every job slot, pick a random channel offset, schedule the periodic
 liveness check and register Recv as the handler for the tcdata and
 tcdataimm opcodes coming from src.}
procedure tAggr.Init(const src:tNetAddr);
 var slot:integer;
begin
 Rate:=0;
 ByteCnt:=0;
 DgrCnt:=0;
 CurMark:=0;
 PrvMark:=0;
 StartT:=mNow;
 {Clear ALL slots. The previous countdown loop stopped before index 0
  and so left Jobs[0] uninitialised.}
 for slot:=low(Jobs) to high(Jobs) do Jobs[slot]:=nil;
 refs:=0;
 {Random(n) yields 0..n-1, so ChanOfs+high(Jobs) always fits in a byte.}
 ChanOfs:=Random(255-high(Jobs));
 DgrCntCheck:=0;
 Shedule(5000,@Periodic); {NOTE(review): delay presumably in milliseconds - confirm in ServerLoop}
 remote:=src;
 SetMsgHandler(opcode.tcdata,src,@Recv);
 SetMsgHandler(opcode.tcdataimm,src,@Recv);
end;
{Handler for incoming tcdata/tcdataimm messages. Reads the opcode and
 mark bytes from the message stream, sends an immediate acknowledgement
 when the opcode is tcdataimm, always feeds the datagram into the rate
 accounting, and finally reads the channel byte.}
procedure tAggr.Recv(msg:tSMsg);
 var msgOp, msgMark, msgChan: byte;
begin
 msgOp:=msg.stream.ReadByte;
 msgMark:=msg.stream.ReadByte;
 if msgOp=opcode.tcdataimm
  then MsgIMME(msg.length,msgMark);
 MsgDATA(msg.length,msgMark);
 {The channel is read but not acted upon yet.}
 msgChan:=msg.stream.ReadByte;
 //delegate to others todo
end;
{Answer an immediate-data datagram: send a tceack message carrying the
 mark and the datagram size back to the remote address.
 sz   - length of the received message
 mark - mark byte taken from the received message}
procedure tAggr.MsgIMME(sz:Word; mark:byte);
 var storage:array [1..4] of byte; {opcode + mark + 2-byte size}
 var reply:tMemoryStream;
begin
 reply.Init(@storage,0,sizeof(storage));
 reply.WriteByte(opcode.tceack);
 reply.WriteByte(mark);
 reply.WriteWord(sz,2); {NOTE(review): second argument presumably the width in bytes - confirm in MemStream}
 SendMessage(reply.base^,reply.length,remote);
end;
{Account one received data datagram and, once enough has been seen,
 report the measured rate to the remote address with a tccont message.
 sz   - length of the received message
 mark - mark byte taken from the received message
 Datagrams carrying the previous mark are not counted. A mark that is
 neither the current nor the previous one starts a new measurement.
 A report is sent only when at least 8 datagrams were counted and at
 least 400 time units have passed since the measurement started.}
procedure tAggr.MsgDATA(sz:Word; mark:byte);
 var reply:tMemoryStream;
 var storage:array [1..6] of byte; {opcode + mark + 4-byte rate}
 var rate64:DWord;                 {bytes per second divided by 64}
 var elapsed:tMTime;
begin
 if mark<>PrvMark then begin
  if mark=CurMark then begin
   {same measurement continues}
   Inc(ByteCnt,sz);
   Inc(DgrCnt);
  end else begin
   {new mark seen: rotate marks and restart the measurement}
   PrvMark:=CurMark;
   CurMark:=mark;
   StartT:=mNow;
   ByteCnt:=1;
   DgrCnt:=1;
  end;
  Inc(DgrCntCheck);
 end;
 if DgrCnt>=8 then begin
  elapsed:=(mNow-StartT);
  if elapsed>=400 then begin
   Rate:=(ByteCnt/elapsed)*1000;
   writeln('Rate: ',(Rate/1024):7:1, '(',ByteCnt,')');
   rate64:=round(Rate/64);
   {restart the byte counter for the next report; DgrCnt is left as is}
   StartT:=mNow;
   ByteCnt:=1;
   reply.Init(@storage,0,sizeof(storage));
   reply.WriteByte(opcode.tccont);
   reply.WriteByte(mark);
   reply.WriteWord(rate64,4);
   SendMessage(reply.base^,reply.length,remote);
  end;
 end;
end;
{Scheduled liveness check. When more than one datagram was counted
 since the previous check, the counter is reset and the check is
 scheduled again. Otherwise a failure line is printed and the check is
 NOT rescheduled.}
procedure tAggr.Periodic;
begin
 if DgrCntCheck<=1 then begin
  writeln('Periodic check failed');
  //todo do
  exit;
 end;
 DgrCntCheck:=0;
 Shedule(5000,@Periodic);
end;
{Tear down the aggregate: cancel the scheduled Periodic check.
 NOTE(review): the message handlers registered in Init are not removed
 here - confirm whether ServerLoop needs them unregistered.}
procedure tAggr.Done;
begin
 UnShedule( @Periodic );
end;
125 END.