Chat, message send overrides message currently in flight. Fix callback calling after...
[brdnet.git] / Download.pas
blob721a824627731edf8ac6713782cc2ce5845b7d7a
1 unit Download;
2 {manage downloads}
4 INTERFACE
5 uses NetAddr,
6 ServerLoop,opcode,MemStream
7 ,Store1
8 ,Chat
11 Same idea here as upmgr. Have an tAggr for each peer reporting speed and
12 tJob for each file request saving to file and reqesting missed segments.
13 The aggr should have limit of paralele jobs. Jobs will first be linked in
14 tAggr queue and then started as slots become available.
16 After node restart, notify requester with a No-Source error. But should
17 not be forgotten. More advanced DM could consult CHK or Category the file
18 was found.
20 type
21 pDownloadJob=^tDownloadJob;
22 tDownloadJob=object
23 total,done:LongWord;
24 missc:LongWord;
25 state:(stStop,stActive,stDone,stError);
26 error:byte;
27 error2:byte;
28 fid:tFID;
29 procedure Start;
30 procedure Abort;
31 end;
32 function GetJob(const fid:tFID):pDownloadJob;
33 function NewJob(const source:tNetAddr; const fid: tFID):pDownloadJob;
35 IMPLEMENTATION
36 type
37 tJob=object(tDownloadJob)
38 aggr:pointer;
39 ix:byte;
40 so:tStoreObjectInfo;
41 ch:tChat;
42 rofs,rlen:LongWord;{b:l of to be requested block}
43 procedure Init(const source:tNetAddr; const ifid: tFID);
44 procedure MsgDATA(base,length:LongWord; data:pointer);
45 procedure Start;
46 procedure StartTransfer(preamble:boolean);
47 procedure Abort;
48 procedure ReplyGET(msg:tSMsg; data:boolean);
49 procedure ReplyClose(msg:tSMsg; data:boolean);
50 end;
51 tAggr_ptr=^tAggr;
52 tAggr=object
53 Rate:Real;
54 ByteCnt:LongWord;
55 DgrCnt:LongWord;
56 CurMark,PrvMark:byte;
57 StartT:tMTime;
58 Jobs: array [0..15] of ^tJob;
59 refs:byte;
60 ChanOfs:byte;
61 DgrCntCheck:LongWord;
62 remote:tNetAddr;
63 next:tAggr_ptr;
64 procedure Init(const src:tNetAddr);
65 procedure MsgDATA(sz:Word;mark:byte);
66 procedure MsgIMME(sz:Word;mark:byte);
67 procedure Recv(msg:tSMsg);
68 procedure Periodic;
69 procedure Done;
70 end;
71 var AggrChain:^tAggr;
73 function GetAggr(const remote:tNetAddr):tAggr_ptr;
74 var a:^tAggr;
75 var p:^pointer;
76 begin
77 p:=@AggrChain;
78 a:=AggrChain;
79 while assigned(a) do begin
80 if a^.remote=remote then begin
81 GetAggr:=a;
82 p^:=a^.next;
83 a^.next:=AggrChain;
84 AggrChain:=a^.next;
85 exit;
86 end;
87 end;
88 GetAggr:=nil;
89 end;
90 function GetJob(const fid:tFID):pDownloadJob;
91 var a:^tAggr;
92 var i:byte;
93 var p:^pointer;
94 begin
95 p:=@AggrChain;
96 a:=AggrChain;
97 while assigned(a) do begin
98 for i:=0 to high(tAggr.Jobs) do begin
99 if CompareWord(a^.Jobs[i],fid,10)=0 then begin
100 GetJob:=a^.Jobs[i];
101 assert(a^.Jobs[i]^.ix=i);
102 assert(a^.Jobs[i]^.aggr=a);
103 p^:=a^.next;
104 a^.next:=AggrChain;
105 AggrChain:=a^.next;
106 exit;
107 end;
108 end;
109 end;
110 GetJob:=nil;
111 end;
113 function NewJob(const source:tNetAddr; const fid: tFID):pDownloadJob;
114 begin
115 result:=GetJob(fid);
116 if assigned(result) then exit;
117 result:=GetMem(sizeof(tJob));
118 tJob(pointer(result)^).init(source,fid);
119 end;
121 procedure tJob.Init(const source:tNetAddr; const ifid: tFID);
122 var dw:^tAggr;
123 begin
124 error:=0;
125 error2:=0;
126 fid:=ifid;
127 so.Open(fid);
128 done:=0;
129 missc:=0;
130 if so.final then begin
131 done:=total;
132 state:=stDone;
133 rlen:=0;
134 exit end;
135 state:=stStop{,stActive,stDone,stError)};
136 if so.rc=0 then begin
137 writeln('Download: resuming');
138 total:=so.Length;
139 so.GetMiss(rofs,rlen);
140 end else begin
141 writeln('Download: start from zero');
142 total:=0;
143 rofs:=0;
144 rlen:=8192;
145 end;
146 so.EnableWrite(fid);
147 if so.rc<>0 then begin
148 state:=stError;
149 error:=253;
150 error2:=so.rc;
151 exit end;
152 dw:=GetAggr(source);
153 if not assigned(aggr) then begin
154 new(dw);
155 dw^.Init(source);
156 ix:=0;
157 end else begin
158 ix:=0; while (ix<high(tAggr.Jobs)) and (dw^.Jobs[ix]=nil) do inc(ix);
159 assert(dw^.Jobs[ix]=nil);{todo}
160 end;
161 aggr:=dw;
162 dw^.Jobs[ix]:=@self;
163 ch.Init(source);
164 assert( ((state=stStop)and(rlen>0))or((state=stDone)and(rlen=0)) );
165 end;
166 procedure tJob.Start;
167 begin
168 assert( (state=stStop)and(rlen>0) );
169 StartTransfer(true);
170 //Shedule(20000,@HardTimeout);
171 state:={stStop,}stActive{,stDone,stError};
172 end;
173 procedure tJob.StartTransfer(preamble:boolean);
174 var s:tMemoryStream;
175 begin
176 assert( rlen>0 );
177 ch.Callback:=@ReplyGET;
178 ch.streaminit(s,33);
179 if preamble then begin
180 {service}s.WriteByte(opcode.upFileServer);
181 {channel}s.WriteByte(ix);
182 {opcode }s.WriteByte(opcode.upGET);
183 {file }s.Write(fid,20);
184 end else
185 {opcode }s.WriteByte(opcode.upSEG);
186 {basehi }s.WriteWord(0,2);
187 {base }s.WriteWord(rofs,4);
188 {limit }s.WriteWord(rlen,4);
189 ch.Send(s);
190 end;
191 procedure tJob.ReplyGET(msg:tSMsg; data:boolean);
192 var r:tMemoryStream absolute msg.stream;
193 var op:byte;
194 var rsize,rseg:LongWord;
195 var rfinal:byte;
196 begin
197 {reply from GET request}
198 write('Download: ReplyGET: ');
199 if not data then begin
200 writeln('ack');
201 end else begin
202 ch.Ack;
203 op:=msg.stream.ReadByte;
204 if op=upFAIL then begin
205 state:=stError;
207 error:=r.ReadByte;
208 error2:=r.ReadByte;
209 except end;
210 writeln('FAIL ',error,'-',error2);
212 else if op=upINFO then begin
213 {rsizehi}r.skip(2);
214 rsize :=r.ReadWord(4);
215 rfinal :=r.readbyte;
216 rseg :=r.readword(4);
217 writeln('INFO size=',rsize,' final=',rfinal,' seg=',rseg);
218 if (rsize<>so.length) then writeln('Download: length mismatch ',so.length,'->',rsize);
219 total:=rsize;
220 so.SetFLength(total);
221 //UnShedule(@HardTimeout);
222 end else if op=opcode.upDONE then begin
223 writeln('DONE');
224 assert(so.Length>0);
225 so.GetMiss(rofs,rlen);
226 if rlen=0 then begin
227 state:=stDone;
228 writeln('Download: completed');
229 end else StartTransfer(false);
230 end else begin
231 if op=upClose then writeln('CLOSE') else writeln('unknown');
232 state:=stError;
233 error:=254;
234 error2:=op;
235 end;
236 end;
237 end;
239 procedure tJob.Abort;
240 var s:tMemoryStream;
241 begin
242 assert(state=stActive);
243 ch.Callback:=@ReplyClose;
244 //Shedule(20000,@HardTimeout);
245 ch.streaminit(s,2);
246 {opcode }s.WriteByte(opcode.upClose);
247 ch.Send(s);
248 state:=stError;
249 error:=255;
250 end;
251 procedure tJob.ReplyClose(msg:tSMsg; data:boolean);
252 begin
253 writeln('Download: ReplyClose');
254 end;
256 procedure tJob.MsgDATA(base,length:LongWord; data:pointer);
257 begin
258 so.WriteSeg(base,length,data);
259 done:=done+length;
260 end;
262 procedure tAggr.Init(const src:tNetAddr);
263 begin
264 Rate:=0;
265 ByteCnt:=0;
266 DgrCnt:=0;
267 CurMark:=0;PrvMark:=0;
268 StartT:=mNow;
269 refs:=high(Jobs); while refs>0 do begin Jobs[refs]:=nil; dec(refs) end;
270 ChanOfs:=Random(255-high(Jobs));
271 DgrCntCheck:=0;
272 Shedule(5000,@Periodic);
273 remote:=src;
274 SetMsgHandler(opcode.tcdata,src,@Recv);
275 SetMsgHandler(opcode.tcdataimm,src,@Recv);
276 end;
278 procedure tAggr.Recv(msg:tSMsg);
279 var op:byte;
280 var chan:byte;
281 var mark:byte;
282 var base:DWORD;
283 begin
284 op:=msg.stream.readbyte;
285 mark:=msg.stream.readbyte;
286 if op=opcode.tcdataimm then MsgIMME(msg.length,mark);
287 MsgDATA(msg.length,mark);
288 chan:=msg.stream.readbyte;
289 base:=msg.stream.ReadWord(4);
290 if (chan<=high(Jobs))and assigned(Jobs[chan]) then Jobs[chan]^.MsgDATA(base,msg.stream.RDBufLen,msg.stream.RDBuf);
291 end;
293 procedure tAggr.MsgIMME(sz:Word; mark:byte);
294 var r:tMemoryStream;
295 var buf:array [1..4] of byte;
296 begin
297 r.Init(@buf,0,sizeof(buf));
298 r.WriteByte(opcode.tceack);
299 r.WriteByte(mark);
300 r.WriteWord(sz,2);
301 SendMessage(r.base^,r.length,remote);
302 end;
304 procedure tAggr.MsgDATA(sz:Word; mark:byte);
305 var r:tMemoryStream;
306 var rateb: DWord; {BytesPerSecond shr 6 (=64)}
307 var buf:array [1..6] of byte;
308 var delta:tMTime;
309 begin
310 if mark<>PrvMark then begin
311 if mark<>CurMark then begin
312 PrvMark:=CurMark;
313 CurMark:=mark;
314 StartT:=mNow;
315 ByteCnt:=1;
316 DgrCnt:=1;
317 end else begin Inc(ByteCnt,sz); Inc(DgrCnt) end;
318 inc(DgrCntCheck);
319 end;
320 if DgrCnt<8 then exit;
321 delta:=(mNow-StartT){*MSecsPerDay};
322 if delta<400 then exit;
323 rate:=(ByteCnt/delta)*1000;
324 writeln('Download: rate ',(rate/1024):7:1, 'kB/s');
325 rateb:=round((rate)/64);
326 StartT:=mNow;
327 ByteCnt:=1;
328 r.Init(@buf,0,sizeof(buf));
329 r.WriteByte(opcode.tccont);
330 r.WriteByte(mark);
331 r.WriteWord(rateb,4);
332 SendMessage(r.base^,r.length,remote);
333 end;
335 procedure tAggr.Periodic;
336 begin
337 if DgrCntCheck>1 then begin
338 DgrCntCheck:=0;
339 Shedule(5000,@Periodic);
340 exit end;
341 writeln('Download: Periodic check failed, unimplemented!');
342 //todo do
343 end;
345 procedure tAggr.Done;
346 begin
347 UnShedule(@Periodic);
348 end;
350 procedure tDownloadJob.Start;
351 begin
352 tJob(pointer(@self)^).Start;
353 end;
354 procedure tDownloadJob.Abort;
355 begin
356 tJob(pointer(@self)^).Abort;
357 end;
359 END.