6 ServerLoop
,opcode
,MemStream
12 Same idea here as upmgr. Have a tAggr for each peer, reporting speed, and a
13 tJob for each file request, saving to file and requesting missed segments.
14 The aggr should have a limit on parallel jobs. Jobs will first be linked in
15 the tAggr queue and then started as slots become available.
17 After a node restart, notify the requester with a No-Source error. But it should
18 not be forgotten. More advanced DM could consult CHK or Category the file
22 pDownloadJob
=^tDownloadJob
;
26 state
:(stStop
,stActive
,stDone
,stError
,stLocalError
);
// Looks up the download job for file id `fid`.
// The implementation walks the aggregator chain and, for every occupied slot
// in its Jobs array, compares the entry against `fid` with CompareWord.
// NOTE(review): presumably yields nil when no job matches - the return path
// is not visible from here, confirm against the implementation.
36 function GetJob(const fid
:tFID
):pDownloadJob
;
// Obtains a download job for file id `fid`, using network address `source`.
// The implementation exits early when `result` is already assigned;
// otherwise it allocates a tJob with GetMem and initialises it through
// tJob.Init(source, fid).
// NOTE(review): the early exit presumably reuses a job found by GetJob -
// that lookup is not visible from here, confirm.
37 function NewJob(const source
:tNetAddr
; const fid
: tFID
):pDownloadJob
;
40 {TODO: cache for chats}
43 tJob
=object(tDownloadJob
)
49 procedure Init(const source
:tNetAddr
; const ifid
: tFID
);
50 procedure MsgDATA(base
,length
:LongWord
; data
:pointer);
56 HighestRequestBase
, RemoteSkipTo
:LongWord
;
57 procedure ReplyOPEN(msg
:tSMsg
; data
:boolean);
58 procedure ReplyLSEG(msg
:tSMsg
; data
:boolean);
59 procedure HandleFAIL(r
:tMemoryStream
);
60 procedure HandleEPROTO(r
:tMemoryStream
);
61 procedure ReplyDONE(msg
:tSMsg
; data
:boolean);
62 procedure MakeRequest
;
66 function GetJob(const fid
:tFID
):pDownloadJob
;
73 while assigned(a
) do begin
74 for i
:=0 to high(tAggr
.Jobs
) do if assigned(a
^.Jobs
[i
]) then begin
75 if CompareWord(a
^.Jobs
[i
],fid
,10)=0 then begin
78 assert(a
^.Jobs
[i
]^.ix
=i
);
79 assert(a
^.Jobs
[i
]^.aggr
=a
);
88 function NewJob(const source
:tNetAddr
; const fid
: tFID
):pDownloadJob
;
91 if assigned(result
) then exit
;
92 result
:=GetMem(sizeof(tJob
));
93 tJob(pointer(result
)^).init(source
,fid
);
96 procedure tJob
.Init(const source
:tNetAddr
; const ifid
: tFID
);
107 if so
.final
then begin
114 {todo: initialize Done}
116 if so
.rc
<>0 then begin
122 if not assigned(aggr
) then begin
127 ix
:=0; while (ix
<high(tAggr
.Jobs
)) and (dw
^.Jobs
[ix
]=nil) do inc(ix
);
128 assert(dw
^.Jobs
[ix
]=nil);{todo}
133 if state
=stStop
then begin
137 //assert( ((state=stStop)and(rlen>0))or((state=stDone)and(rlen=0)) );
140 procedure tJob
.Start
;
143 writeln('Download: job start');
144 assert( state
=stStop
);
146 ch
^.Callback
:=@ReplyOPEN
;
147 ch
^.streaminit(s
,33);
148 {service}s
.WriteByte(opcode
.upFileServer
);
149 {channel}s
.WriteByte(ix
);
150 {opcode }s
.WriteByte(opcode
.upOPEN
);
151 {file }s
.Write(fid
,20);
152 {todo: request first segment}
155 procedure tJob
.ReplyOPEN(msg
:tSMsg
; data
:boolean);
156 var r
:tMemoryStream
absolute msg
.stream
;
161 if not data
then exit
;
162 op
:=msg
.stream
.ReadByte
;
163 {valid responses: INFO FAIL EPROTO}
164 if op
=upFAIL
then HandleFAIL(r
)
165 else if op
=upINFO
then begin
166 rsize
:=r
.ReadWord(4);
168 writeln('INFO size=',rsize
,' final=',rfinal
);
170 if so
.length
<>rsize
then begin
171 if so
.length
>0 then writeln('Download: warning: size mismatch!');
172 so
.SetFLength(rsize
);
176 else HandleEPROTO(r
);
178 {Strategy for fixing holes without waiting for the DONE message:
179 * stuff all small segments into LSEG
180 * put the large one at the end
181 * when datagrams from the large one arrive, repeat
184 procedure tJob
.MakeRequest
;
186 var b
,l
,trl
:LongWord
;
191 write('Download: job MakeRequest');
195 ch
^.Callback
:=@ReplyLSEG
;
196 ch
^.streaminit(s
,180);
202 if (trl
+l
)>clim
then l
:=clim
-trl
;
208 until (s
.WrBufLen
<8)or(trl
>=clim
);
209 writeln(' for ',trl
,'B in ',cnt
,' ',s
.Length
);
212 writeln('Verifu!!!!!');
214 if not so
.final
then begin
217 writeln('Verifu Faialed!!!!!');
223 if cnt
>1 then HighestRequestBase
:=b
else HighestRequestBase
:=$FFFFFFFF;
225 procedure tJob
.ReplyLSEG(msg
:tSMsg
; data
:boolean);
226 var r
:tMemoryStream
absolute msg
.stream
;
230 if not data
then exit
;
231 op
:=msg
.stream
.ReadByte
;
232 {valid responses: SEGOK UNAVL FAIL}
233 if op
=upFAIL
then HandleFAIL(r
)
234 else if op
=upUNAVL
then begin
235 avail
:=msg
.stream
.ReadWord(4);
236 writeln('Download: job ReplyLSEG: UNAVL avail=',avail
);
237 if avail
=0 then begin
246 else if op
=upSEGOK
then begin
247 avail
:=msg
.stream
.ReadWord(4);
249 writeln('Download: job ReplyLSEG: SEGOK avail=',avail
);
251 ch
^.Callback
:=@ReplyDONE
;
254 else if op
=upDONE
then begin
255 end {ignore, done is sent async so it can hang in-flight a bit}
256 else HandleEPROTO(r
);
258 procedure tJob
.ReplyDONE(msg
:tSMsg
; data
:boolean);
259 var r
:tMemoryStream
absolute msg
.stream
;
262 if not data
then exit
;
263 op
:=msg
.stream
.ReadByte
;
264 {valid responses: DONE}
265 if op
=upDONE
then begin
266 writeln('Download: ReplyDONE: DONE, miss=',MissC
);
268 end else HandleEPROTO(r
);
270 procedure tJob
.HandleFAIL(r
:tMemoryStream
);
272 writeln('Download: FAIL');
280 procedure tJob
.HandleEPROTO(r
:tMemoryStream
);
282 r
.Seek(r
.position
-1);
283 try error2
:=r
.ReadByte
; except end;
284 writeln('Download: EPROTO ',error2
);
290 procedure tJob
.Close
;
294 {opcode }s
.WriteByte(opcode
.upClose
);
297 writeln('chat close');
300 procedure tJob
.Abort
;
302 assert(state
=stActive
);
308 procedure tJob
.MsgDATA(base
,length
:LongWord
; data
:pointer);
310 so
.WriteSeg(base
,length
,data
);
314 else dec(MissC
,length
);
315 if base
>=HighestRequestBase
then {TODO, last segment in list, MakeRequest};
322 writeln('Download: job closing');
323 if state
=stStop
then Close
324 else if state
=stActive
then Abort
;
325 {an EAV (access violation) occurs here, something to do with pointers}
326 if assigned(aggr
) then begin
329 if aggr
^.refs
=0 then aggr
^.Done
;
331 if state
<>stDone
then so
.Close
;
332 FreeMem(@self
,sizeof(tJob
));
333 end else writeln('not closing ',refc
);
336 procedure tDownloadJob
.Start
;
338 tJob(pointer(@self
)^).Start
;
340 procedure tDownloadJob
.Abort
;
342 tJob(pointer(@self
)^).Abort
;
344 procedure tDownloadJob
.Free
;
346 tJob(pointer(@self
)^).Free
;