Sumting change in dht.
[brdnet.git] / Download.pas
blobb49d2512eecd6a907b631bb12626fb9fdb39d604
1 unit Download;
2 {manage downloads}
4 INTERFACE
5 uses NetAddr,
6 ServerLoop,opcode,MemStream
7 ,Store1
8 ,Chat
9 ,Sha1
12 Same idea here as in upmgr. Have a tAggr for each peer, reporting speed, and
13 a tJob for each file request, saving to file and requesting missed segments.
14 The aggr should have a limit on parallel jobs. Jobs will first be linked into
15 the tAggr queue and then started as slots become available.
17 After a node restart, notify the requester with a No-Source error. But the job
18 should not be forgotten. A more advanced DM could consult the CHK or the Category
19 where the file was found.
type
  pDownloadJob=^tDownloadJob;

  { Public view of one download. The implementation section extends this
    object (tJob) and the three methods below simply forward to it. }
  tDownloadJob=object
    total:LongWord;  {file size; filled in from the INFO reply}
    done:LongWord;   {number of bytes written so far}
    missc:LongWord;  {bytes still expected for the current segment request}
    state:(stStop,stActive,stDone,stError,stLocalError);
    error:byte;      {remote code for stError; 251..255 for stLocalError}
    error2:byte;     {secondary code accompanying error}
    fid:tFID;        {identifier of the file being downloaded}
    procedure Start;
    procedure Free;
    procedure Abort;
    protected
    refc:byte;       {reference count, dropped by Free}
  end;

{ Look up an existing job for fid; returns nil when there is none. }
function GetJob(const fid:tFID):pDownloadJob;
{ Return the existing job for fid or create a new one fed from source. }
function NewJob(const source:tNetAddr; const fid: tFID):pDownloadJob;
39 IMPLEMENTATION
40 {TODO: cache for chats}
type
  tAggr_ptr=^tAggr;

  { Full download job: the public tDownloadJob plus transport state. }
  tJob=object(tDownloadJob)
    aggr:tAggr_ptr;       {per-peer aggregate this job is registered with}
    ix:byte;              {slot in aggr^.Jobs; also sent as the channel byte}
    so:tStoreObjectInfo;  {local store object receiving the data}
    ch:^tChat;            {chat used for the request/reply exchange}
    active:boolean;
    procedure Init(const source:tNetAddr; const ifid: tFID);
    procedure MsgDATA(base,length:LongWord; data:pointer);
    procedure Start;
    procedure Free;
    procedure Abort;
    procedure Close;
    private
    HighestRequestBase:LongWord; {base of the last segment in the latest request}
    RemoteSkipTo:LongWord;       {offset reported by the peer in an UNAVL reply}
    procedure ReplyOPEN(msg:tSMsg; data:boolean);
    procedure ReplyLSEG(msg:tSMsg; data:boolean);
    procedure HandleFAIL(r:tMemoryStream);
    procedure HandleEPROTO(r:tMemoryStream);
    procedure ReplyDONE(msg:tSMsg; data:boolean);
    procedure MakeRequest;
  end;
64 {$I DownloadTC.pas}
{ Find an already existing job for the given file id.

  Walks every aggregate in AggrChain and every occupied slot of its Jobs
  table. On a match the job's reference count is raised (the caller becomes
  an additional owner and must call Free) and the job is returned.
  Returns nil when no job for fid exists.

  Fixes over the previous version:
   * the file id is compared against the job's fid field, not against the
     bytes of the pointer stored in the slot;
   * a non-matching job no longer aborts the scan of the remaining slots;
   * the reference taken is counted on the job (refc), which is what Free
     decrements, instead of on the aggregate. }
function GetJob(const fid:tFID):pDownloadJob;
  var a:^tAggr;
  var i:byte;
begin
  GetJob:=nil;
  a:=AggrChain;
  while assigned(a) do begin
    for i:=0 to high(tAggr.Jobs) do begin
      if not assigned(a^.Jobs[i]) then continue;
      if CompareByte(a^.Jobs[i]^.fid,fid,sizeof(fid))<>0 then continue;
      {sanity: the job must know where it is registered}
      assert(a^.Jobs[i]^.ix=i);
      assert(a^.Jobs[i]^.aggr=a);
      Inc(a^.Jobs[i]^.refc);
      GetJob:=a^.Jobs[i];
      exit;
    end;
    a:=a^.next;
  end;
end;
{ Return the job downloading fid, creating it when none exists yet.
  A newly created job is allocated with GetMem and set up by tJob.Init;
  the caller releases it with Free. }
function NewJob(const source:tNetAddr; const fid: tFID):pDownloadJob;
  var fresh:^tJob;
begin
  result:=GetJob(fid);
  if result=nil then begin
    fresh:=GetMem(sizeof(tJob));
    fresh^.Init(source,fid);
    result:=fresh;
  end;
end;
{ Prepare a freshly allocated job for downloading ifid from source.

  Outcomes:
   * the store object is already final: state=stDone, nothing is registered;
   * the store object cannot be opened for writing: state=stLocalError,
     error=255, error2=store return code, nothing is registered;
   * otherwise: state=stStop, the job occupies a slot of the peer's
     aggregate and owns a chat to the source; Start begins the transfer.

  The memory comes from GetMem and is not zeroed, so every field is given
  a defined value before any early exit; Free relies on aggr being nil for
  jobs that were never registered. }
procedure tJob.Init(const source:tNetAddr; const ifid: tFID);
  var dw:^tAggr;
begin
  refc:=1;
  error:=0;
  error2:=0;
  fid:=ifid;
  active:=false;
  aggr:=nil;
  ch:=nil;
  ix:=0;
  total:=0;
  done:=0;
  missc:=0;
  HighestRequestBase:=$FFFFFFFF;
  RemoteSkipTo:=0;
  so.Open(fid);
  if so.final then begin
    {file is complete locally; report it as fully done}
    total:=so.length;
    done:=total;
    state:=stDone;
    so.Close;
    exit end;
  state:=stStop;
  {todo: initialize Done}
  so.EnableWrite(fid);
  if so.rc<>0 then begin
    state:=stLocalError;
    error:=255;
    error2:=so.rc;
    exit end;
  dw:=GetAggr(source);
  if not assigned(dw) then begin
    {first job for this peer: create its aggregate}
    new(dw);
    dw^.Init(source);
    ix:=0;
  end else begin
    {take the first free slot of the existing aggregate}
    ix:=0;
    while (ix<high(tAggr.Jobs)) and (dw^.Jobs[ix]<>nil) do inc(ix);
    assert(dw^.Jobs[ix]=nil);{todo: handle a full aggregate}
  end;
  aggr:=dw;
  dw^.Jobs[ix]:=@self;
  inc(dw^.refs);
  New(ch);
  ch^.Init(source);
end;
{ Begin the transfer: move from stStop to stActive and send the OPEN
  request for fid over the chat. The answer is handled by ReplyOPEN. }
procedure tJob.Start;
  const cOpenRequestSize=33;
  var req:tMemoryStream;
begin
  writeln('Download: job start');
  assert( state=stStop );
  state:=stActive;
  ch^.Callback:=@ReplyOPEN;
  ch^.streaminit(req,cOpenRequestSize);
  req.WriteByte(opcode.upFileServer); {service}
  req.WriteByte(ix);                  {channel}
  req.WriteByte(opcode.upOPEN);       {opcode}
  req.Write(fid,20);                  {file}
  {todo: request first segment}
  ch^.Send(req);
end;
{ Chat callback for the answer to OPEN.
  Valid responses: INFO, FAIL; anything else is a protocol error.
  On INFO the remote size is recorded, the local file is resized when it
  differs, and the first segment request is issued. }
procedure tJob.ReplyOPEN(msg:tSMsg; data:boolean);
  var r:tMemoryStream absolute msg.stream;
  var op:byte;
  var rsize:LongWord;
  var rfinal:byte;
begin
  if not data then exit;
  op:=r.ReadByte;
  if op=upFAIL then begin
    HandleFAIL(r);
    exit;
  end;
  if op<>upINFO then begin
    HandleEPROTO(r);
    exit;
  end;
  rsize:=r.ReadWord(4);
  rfinal:=r.ReadByte;
  writeln('INFO size=',rsize,' final=',rfinal);
  self.total:=rsize;
  if so.length<>rsize then begin
    {a non-empty local file of another size is suspicious, but resize anyway}
    if so.length>0 then writeln('Download: warning: size mismatch!');
    so.SetFLength(rsize);
  end;
  MakeRequest;
end;
{Strategy for fixing holes without waiting for the DONE message:
 * stuff all small segments into LSEG
 * put the large one at the end
 * when datagrams from the large one arrive, repeat
}
{ Ask the peer for the segments the store still misses.
  Missing ranges are packed into one LSEG message until the message buffer
  has less than 8 bytes left or the byte budget (clim) is used up. When
  nothing is missing the file is verified and the job is closed. }
procedure tJob.MakeRequest;
  const clim=83886080;
  var req:tMemoryStream;
  var segBase,segLen,reqBytes:LongWord;
  var segCount:byte;
  var missState:Pointer;
begin
  write('Download: job MakeRequest');
  missState:=nil;
  reqBytes:=0;
  segCount:=0;
  ch^.Callback:=@ReplyLSEG;
  ch^.streaminit(req,180);
  req.WriteByte(upLSEG);
  repeat
    {todo: skipto}
    so.GetMiss(segBase,segLen,missState);
    if segLen=0 then break;
    {clip the last segment so the request never exceeds the budget}
    if (reqBytes+segLen)>clim then segLen:=clim-reqBytes;
    write(' ',segBase,'+',segLen);
    req.WriteWord(segBase,4);
    req.WriteWord(segLen,4);
    inc(reqBytes,segLen);
    inc(segCount);
  until (req.WrBufLen<8)or(reqBytes>=clim);
  writeln(' for ',reqBytes,'B in ',segCount,' ',req.Length);
  if reqBytes<>0 then begin
    ch^.Send(req);
    if segCount>1
      then HighestRequestBase:=segBase
      else HighestRequestBase:=$FFFFFFFF;
  end else begin
    {nothing left to fetch: verify the file and finish}
    state:=stDone;
    writeln('Verifu!!!!!');
    so.VerifyAndReset;
    if not so.final then begin
      state:=stLocalError;
      error:=254;
      writeln('Verifu Faialed!!!!!');
    end;
    {NOTE(review): Close stops the slot as well, so Stop runs twice here}
    Aggr^.Stop(ix);
    Close;
  end;
end;
{ Chat callback for the answer to LSEG.
  Valid responses: SEGOK, UNAVL, FAIL. A DONE that arrives here is ignored;
  it is sent asynchronously and may still be in flight. Anything else is a
  protocol error. }
procedure tJob.ReplyLSEG(msg:tSMsg; data:boolean);
  var r:tMemoryStream absolute msg.stream;
  var op:byte;
  var avail:LongWord;
begin
  if not data then exit;
  op:=r.ReadByte;
  if op=upFAIL then begin
    HandleFAIL(r);
    exit;
  end;
  if op=upUNAVL then begin
    avail:=r.ReadWord(4);
    writeln('Download: job ReplyLSEG: UNAVL avail=',avail);
    if avail<>0 then begin
      {peer suggests another offset: remember it and ask again}
      RemoteSkipTo:=avail;
      MakeRequest;
    end else begin
      state:=stLocalError;
      error:=253;
      Close;
    end;
    exit;
  end;
  if op=upSEGOK then begin
    avail:=r.ReadWord(4);
    MissC:=avail;
    writeln('Download: job ReplyLSEG: SEGOK avail=',avail);
    aggr^.Start(ix);
    ch^.Callback:=@ReplyDONE;
    ch^.Ack;
    exit;
  end;
  if op=upDONE then exit; {late DONE, nothing to do}
  HandleEPROTO(r);
end;
{ Chat callback used after SEGOK. The only valid response is DONE, which
  triggers the next segment request; anything else is a protocol error. }
procedure tJob.ReplyDONE(msg:tSMsg; data:boolean);
  var r:tMemoryStream absolute msg.stream;
  var op:byte;
begin
  if not data then exit;
  op:=r.ReadByte;
  if op<>upDONE then begin
    HandleEPROTO(r);
    exit;
  end;
  writeln('Download: ReplyDONE: DONE, miss=',MissC);
  MakeRequest;
end;
{ The peer answered FAIL: mark the job as stError, take the two error
  bytes from the message when they are present, and close the job. }
procedure tJob.HandleFAIL(r:tMemoryStream);
begin
  writeln('Download: FAIL');
  state:=stError;
  try
    error:=r.ReadByte;
    error2:=r.ReadByte;
  except
    {best effort: a short message leaves the remaining codes untouched}
  end;
  Close;
end;
{ The peer sent an opcode that is not valid in the current state.
  Steps back one byte to re-read the offending opcode into error2, then
  fails the job locally with error 252 and closes it. }
procedure tJob.HandleEPROTO(r:tMemoryStream);
begin
  r.Seek(r.position-1);
  try
    error2:=r.ReadByte;
  except
    {best effort only}
  end;
  writeln('Download: EPROTO ',error2);
  state:=stLocalError;
  error:=252;
  Close;
end;
{ Tell the peer the file is no longer needed, close the chat and stop
  this job's slot in the aggregate. Does not change state. }
procedure tJob.Close;
  var bye:tMemoryStream;
begin
  ch^.streaminit(bye,2);
  bye.WriteByte(opcode.upClose); {opcode}
  ch^.Send(bye);
  ch^.Close;
  writeln('chat close');
  aggr^.Stop(ix);
end;
{ Cancel a running transfer: the job must be stActive; it ends up in
  stLocalError with error 251 and is closed. }
procedure tJob.Abort;
begin
  assert(state=stActive);
  error:=251;
  state:=stLocalError;
  Close;
end;
{ Store one received data segment and account for it.
  base/length locate the segment in the file, data points to its bytes.
  Once the bytes expected for the current request are used up, the next
  request is made. }
procedure tJob.MsgDATA(base,length:LongWord; data:pointer);
begin
  so.WriteSeg(base,length,data);
  done:=done+length;
  if MissC>length
    then dec(MissC,length)
    else MakeRequest;
  {TODO: data from the last segment in the list should trigger MakeRequest}
  if base>=HighestRequestBase then ;
end;
{ Drop one reference. When the last one is gone the job is shut down
  (Close for a stopped job, Abort for an active one), removed from its
  aggregate, the store object is closed unless the job is stDone, and the
  job's memory is released. }
procedure tJob.Free;
begin
  Dec(refc);
  if refc<>0 then begin
    writeln('not closing ',refc);
    exit;
  end;
  writeln('Download: job closing');
  case state of
    stStop: Close;
    stActive: Abort;
  end;
  {original author's note: an access violation occurs around here,
   something to do with pointers}
  if assigned(aggr) then begin
    aggr^.Jobs[ix]:=nil;
    dec(aggr^.refs);
    {the aggregate goes away together with its last job}
    if aggr^.refs=0 then aggr^.Done;
  end;
  if state<>stDone then so.Close;
  FreeMem(@self,sizeof(tJob));
end;
{ Public entry point: forwards to tJob.Start of the same instance. }
procedure tDownloadJob.Start;
  var impl:^tJob;
begin
  impl:=pointer(@self);
  impl^.Start;
end;
{ Public entry point: forwards to tJob.Abort of the same instance. }
procedure tDownloadJob.Abort;
  var impl:^tJob;
begin
  impl:=pointer(@self);
  impl^.Abort;
end;
{ Public entry point: forwards to tJob.Free of the same instance.
  The instance may no longer exist when this returns. }
procedure tDownloadJob.Free;
  var impl:^tJob;
begin
  impl:=pointer(@self);
  impl^.Free;
end;
348 END.