6 ServerLoop
,opcode
,MemStream
11 Same idea here as upmgr. Have a tAggr for each peer, reporting speed, and a
12 tJob for each file request, saving to file and requesting missed segments.
13 The aggr should have a limit on parallel jobs. Jobs will first be linked in
14 the tAggr queue and then started as slots become available.
16 After a node restart, notify the requester with a No-Source error. But the job should
17 not be forgotten. More advanced DM could consult CHK or Category the file
21 pDownloadJob
=^tDownloadJob
;
25 state
:(stStop
,stActive
,stDone
,stError
);
32 function GetJob(const fid
:tFID
):pDownloadJob
;
33 function NewJob(const source
:tNetAddr
; const fid
: tFID
):pDownloadJob
;
37 tJob
=object(tDownloadJob
)
42 rofs
,rlen
:LongWord
;{b:l of to be requested block}
43 procedure Init(const source
:tNetAddr
; const ifid
: tFID
);
44 procedure MsgDATA(base
,length
:LongWord
; data
:pointer);
46 procedure StartTransfer(preamble
:boolean);
48 procedure ReplyGET(msg
:tSMsg
; data
:boolean);
49 procedure ReplyClose(msg
:tSMsg
; data
:boolean);
58 Jobs
: array [0..15] of ^tJob
;
64 procedure Init(const src
:tNetAddr
);
65 procedure MsgDATA(sz
:Word;mark
:byte);
66 procedure MsgIMME(sz
:Word;mark
:byte);
67 procedure Recv(msg
:tSMsg
);
73 function GetAggr(const remote
:tNetAddr
):tAggr_ptr
;
79 while assigned(a
) do begin
80 if a
^.remote
=remote
then begin
90 function GetJob(const fid
:tFID
):pDownloadJob
;
97 while assigned(a
) do begin
98 for i
:=0 to high(tAggr
.Jobs
) do begin
{ Skip empty slots, then compare the 10-word (20-byte) file ID stored in the
  job record with the requested one. Passing Jobs[i] itself to CompareWord
  would compare the bytes of the pointer slot, not the job's fid. }
99 if assigned(a^.Jobs[i]) and (CompareWord(a^.Jobs[i]^.fid,fid,10)=0) then begin
101 assert(a
^.Jobs
[i
]^.ix
=i
);
102 assert(a
^.Jobs
[i
]^.aggr
=a
);
{ NewJob: returns a download job for the file ID fid.
  If result is already assigned when the check below runs, the function exits
  and returns it unchanged.
  NOTE(review): result is presumably set by a GetJob(fid) lookup on a line not
  shown in this listing - confirm.
  Otherwise a block of sizeof(tJob) bytes is obtained with GetMem and set up
  by calling tJob.Init(source,fid) on it. }
113 function NewJob(const source
:tNetAddr
; const fid
: tFID
):pDownloadJob
;
116 if assigned(result
) then exit
;
117 result
:=GetMem(sizeof(tJob
));
118 tJob(pointer(result
)^).init(source
,fid
);
121 procedure tJob
.Init(const source
:tNetAddr
; const ifid
: tFID
);
130 if so
.final
then begin
135 state
:=stStop
{,stActive,stDone,stError)};
136 if so
.rc
=0 then begin
137 writeln('Download: resuming');
139 so
.GetMiss(rofs
,rlen
);
141 writeln('Download: start from zero');
147 if so
.rc
<>0 then begin
153 if not assigned(aggr
) then begin
{ Find the first free slot in the aggregator's job table: advance while the
  current slot is occupied. ix stops on the first nil entry, or on the last
  index when every earlier slot is taken; the assert on the next line then
  catches a completely full table. }
158 ix:=0; while (ix<high(tAggr.Jobs)) and (dw^.Jobs[ix]<>nil) do inc(ix);
159 assert(dw
^.Jobs
[ix
]=nil);{todo}
164 assert( ((state
=stStop
)and(rlen
>0))or((state
=stDone
)and(rlen
=0)) );
166 procedure tJob
.Start
;
168 assert( (state
=stStop
)and(rlen
>0) );
170 //Shedule(20000,@HardTimeout);
171 state
:={stStop,}stActive
{,stDone,stError};
{ tJob.StartTransfer: builds a segment request in stream s.
  Sets ch.Callback to ReplyGET. When preamble is true the request starts
  with the file-server service byte, the channel byte (ix), the upGET opcode
  and the 20-byte fid. The upSEG part carries a 2-byte high base of 0, the
  4-byte base rofs and the 4-byte limit rlen.
  NOTE(review): the line closing the "if preamble" branch is not shown in this
  listing; presumably the upSEG part is written in both cases - confirm. }
173 procedure tJob
.StartTransfer(preamble
:boolean);
177 ch
.Callback
:=@ReplyGET
;
179 if preamble
then begin
180 {service}s
.WriteByte(opcode
.upFileServer
);
181 {channel}s
.WriteByte(ix
);
182 {opcode }s
.WriteByte(opcode
.upGET
);
183 {file }s
.Write(fid
,20);
185 {opcode }s
.WriteByte(opcode
.upSEG
);
186 {basehi }s
.WriteWord(0,2);
187 {base }s
.WriteWord(rofs
,4);
188 {limit }s
.WriteWord(rlen
,4);
191 procedure tJob
.ReplyGET(msg
:tSMsg
; data
:boolean);
192 var r
:tMemoryStream
absolute msg
.stream
;
194 var rsize
,rseg
:LongWord
;
197 {reply from GET request}
198 write('Download: ReplyGET: ');
199 if not data
then begin
203 op
:=msg
.stream
.ReadByte
;
204 if op
=upFAIL
then begin
210 writeln('FAIL ',error
,'-',error2
);
212 else if op
=upINFO
then begin
214 rsize
:=r
.ReadWord(4);
216 rseg
:=r
.readword(4);
217 writeln('INFO size=',rsize
,' final=',rfinal
,' seg=',rseg
);
218 if (rsize
<>so
.length
) then writeln('Download: length mismatch ',so
.length
,'->',rsize
);
220 so
.SetFLength(total
);
221 //UnShedule(@HardTimeout);
222 end else if op
=opcode
.upDONE
then begin
225 so
.GetMiss(rofs
,rlen
);
228 writeln('Download: completed');
229 end else StartTransfer(false);
231 if op
=upClose
then writeln('CLOSE') else writeln('unknown');
239 procedure tJob
.Abort
;
242 assert(state
=stActive
);
243 ch
.Callback
:=@ReplyClose
;
244 //Shedule(20000,@HardTimeout);
246 {opcode }s
.WriteByte(opcode
.upClose
);
251 procedure tJob
.ReplyClose(msg
:tSMsg
; data
:boolean);
253 writeln('Download: ReplyClose');
{ tJob.MsgDATA: passes one data segment to so.WriteSeg.
  base   - offset of the segment
  length - number of bytes at data
  data   - pointer to the segment bytes
  Called from tAggr.Recv with the unread remainder of the message stream. }
256 procedure tJob
.MsgDATA(base
,length
:LongWord
; data
:pointer);
258 so
.WriteSeg(base
,length
,data
);
262 procedure tAggr
.Init(const src
:tNetAddr
);
267 CurMark
:=0;PrvMark
:=0;
{ Clear every job slot and leave refs at 0. The countdown loop stops before
  index 0, so the lowest slot is cleared explicitly. }
269 refs:=high(Jobs); while refs>0 do begin Jobs[refs]:=nil; dec(refs) end; Jobs[low(Jobs)]:=nil;
270 ChanOfs
:=Random(255-high(Jobs
));
272 Shedule(5000,@Periodic
);
274 SetMsgHandler(opcode
.tcdata
,src
,@Recv
);
275 SetMsgHandler(opcode
.tcdataimm
,src
,@Recv
);
{ tAggr.Recv: message handler that tAggr.Init registers for the tcdata and
  tcdataimm opcodes of this peer.
  Reads the opcode and mark bytes. For tcdataimm it calls MsgIMME first, and
  in every case it calls MsgDATA with the message length and mark. It then
  reads the channel byte and the 4-byte base, and passes the rest of the
  stream buffer to the job in slot chan, provided chan is within the Jobs
  table and the slot is assigned. Data for any other channel is dropped
  without notice.
  NOTE(review): chan is used directly as the Jobs index here; ChanOfs chosen
  in Init is not applied in the visible lines - confirm this is intended. }
278 procedure tAggr
.Recv(msg
:tSMsg
);
284 op
:=msg
.stream
.readbyte
;
285 mark
:=msg
.stream
.readbyte
;
286 if op
=opcode
.tcdataimm
then MsgIMME(msg
.length
,mark
);
287 MsgDATA(msg
.length
,mark
);
288 chan
:=msg
.stream
.readbyte
;
289 base
:=msg
.stream
.ReadWord(4);
290 if (chan
<=high(Jobs
))and assigned(Jobs
[chan
]) then Jobs
[chan
]^.MsgDATA(base
,msg
.stream
.RDBufLen
,msg
.stream
.RDBuf
);
293 procedure tAggr
.MsgIMME(sz
:Word; mark
:byte);
295 var buf
:array [1..4] of byte;
297 r
.Init(@buf
,0,sizeof(buf
));
298 r
.WriteByte(opcode
.tceack
);
301 SendMessage(r
.base
^,r
.length
,remote
);
{ tAggr.MsgDATA: per-datagram accounting and rate report for this peer.
  sz   - size of the received message
  mark - mark byte read from the message
  A datagram whose mark is neither PrvMark nor CurMark takes a branch whose
  lines are not shown in this listing; otherwise ByteCnt grows by sz and
  DgrCnt by one. No report is sent until at least 8 datagrams are counted
  and delta (mNow-StartT) is at least 400.
  The rate is ByteCnt/delta*1000, printed in kB/s, and sent to remote in a
  tccont message as a 4-byte value equal to round(rate/64).
  NOTE(review): the *1000 factor suggests delta is in milliseconds - confirm
  against the definition of mNow. }
304 procedure tAggr
.MsgDATA(sz
:Word; mark
:byte);
306 var rateb
: DWord
; {BytesPerSecond shr 6 (=64)}
307 var buf
:array [1..6] of byte;
310 if mark
<>PrvMark
then begin
311 if mark
<>CurMark
then begin
317 end else begin Inc(ByteCnt
,sz
); Inc(DgrCnt
) end;
320 if DgrCnt
<8 then exit
;
321 delta
:=(mNow
-StartT
){*MSecsPerDay};
322 if delta
<400 then exit
;
323 rate
:=(ByteCnt
/delta
)*1000;
324 writeln('Download: rate ',(rate
/1024):7:1, 'kB/s');
325 rateb
:=round((rate
)/64);
328 r
.Init(@buf
,0,sizeof(buf
));
329 r
.WriteByte(opcode
.tccont
);
331 r
.WriteWord(rateb
,4);
332 SendMessage(r
.base
^,r
.length
,remote
);
335 procedure tAggr
.Periodic
;
337 if DgrCntCheck
>1 then begin
339 Shedule(5000,@Periodic
);
341 writeln('Download: Periodic check failed, unimplemented!');
345 procedure tAggr
.Done
;
347 UnShedule(@Periodic
);
{ tDownloadJob.Start: public entry point that reinterprets Self as a tJob and
  calls tJob.Start on it. }
350 procedure tDownloadJob
.Start
;
352 tJob(pointer(@self
)^).Start
;
{ tDownloadJob.Abort: public entry point that reinterprets Self as a tJob and
  calls tJob.Abort on it. }
354 procedure tDownloadJob
.Abort
;
356 tJob(pointer(@self
)^).Abort
;