2 {Upload Manager for brodnetd}
5 USES Chat
,opcode
,ServerLoop
,MemStream
,NetAddr
,Store1
;
16 uc
:UploadThread
.tChannel
;
18 isOpen
,Active
:boolean;
19 procedure Init(var nchat
:tChat
);
20 procedure OnMSG(msg
:tSMsg
;data
:boolean);
22 procedure DoOPEN(const fid
:tfid
);
23 procedure DoLSEG(count
:byte; base
:array of LongWord
; limit
:array of LongWord
);
24 procedure DoWEIGHT(nweight
:word);
28 procedure Close(tell
:boolean); overload
; inline;
29 procedure ChatTimeout(willwait
:LongWord
);
35 chan
:array[0..11] of ^tPrv
;
36 acks
:Word; {ack counter, for timeouts}
39 limRate
,limSize
:Single;
41 procedure Free(ac
:byte);{called by closing prv}
43 procedure Start(ac
:byte);
44 procedure Stop(ac
:byte);
45 procedure Init(const source
:tNetAddr
);
46 procedure CalcRates(rxRate
:Single);
48 procedure OnCont(msg
:tSMsg
);
49 procedure OnAck(msg
:tSMsg
);
54 procedure SendError(var ch
:tChat
;e1
,e2
:byte); forward;
55 function FindAggr({const} addr
:tNetAddr
): tAggr_ptr
; forward;
59 init upFileServer <channel> -> ACK
60 upOPEN <id> -> upINFO <length> <final>
61 upOPEN <id> <b,l> -> upINFO <length> <final> <avl-bytes>
62 -> upFAIL <code> <code2> <details>
63 upLSEG [b,l] -> upSEGOK <available-bytes>
68 upWEIGHT <weight> -> ACK
70 special server messages:
71 upEPROTO <code> <details> (protocol violation)
72 upCLOSE (close by server, usually timeout)
74 upErrHiChan (channel number too high or too many connections)
76 upErrNotFound (file was not found)
77 upErrIO (other error while opening/reading/seeking)
78 upEPROTO upErrNotOpen (LSEG without OPEN or after STOP)
79 upEPROTO upErrTroll (trolling)
81 OPEN message can be merged with init, saving a round-trip
// tPrv.DoOPEN - handle an OPEN request for the file identified by fid.
// NOTE(review): only part of this procedure is visible in this view (the
// begin/end lines and several statements are missing), so the remarks
// below describe the visible statements only.
84 procedure tPrv
.DoOPEN(const fid
:tfid
);
// err is the reply stream; it is prepared through ch^.StreamInit before
// each reply is written.
85 var err
:tmemorystream
;
// Log the request together with the remote and chan values.
87 writeln('Upload: ',string(ch
^.remote
),'/',chan
,' OPEN');
// When isOpen is already set, uc.oi is closed before continuing.
89 if isOpen
then uc
.oi
.Close
;
93 {if not oinfo.final then begin
// Failure reply when uc.oi.rc is above zero: upFAIL, then upErrNotFound
// for rc=1, otherwise upErrIO followed by the rc value itself.
97 if uc
.oi
.rc
>0 then begin
98 ch
^.StreamInit(err
,3);
99 err
.WriteByte(upFAIL
);
100 if uc
.oi
.rc
=1 then err
.WriteByte(upErrNotFound
)
101 else begin err
.WriteByte(upErrIO
); err
.WriteByte(uc
.oi
.rc
) end;
// Success reply: upINFO, then uc.oi.length via WriteWord with second
// argument 4 (presumably the width in bytes - TODO confirm), then one
// byte that is 1 when uc.oi.final is set and 0 otherwise.
105 ch
^.StreamInit(err
,10);
106 err
.WriteByte(upINFO
);
107 err
.WriteWord(uc
.oi
.length
,4);
108 if uc
.oi
.final
then err
.WriteByte(1) else err
.WriteByte(0);
// tPrv.DoLSEG - handle an LSEG request that carries count pairs of
// (base, limit) values in the two open arrays.
// NOTE(review): only part of this procedure is visible in this view
// (begin/end lines, local declarations and several statements are
// missing); the remarks below describe the visible statements only.
114 procedure tPrv
.DoLSEG(count
:byte; base
,limit
: array of LongWord
);
115 var err
:tmemorystream
;
// Log the request together with the remote and chan values.
120 writeln('Upload: ',string(ch
^.remote
),'/',chan
,' LSEG');
// LSEG while no file is open: reply upEPROTO + upErrNotOpen.
121 if not isOpen
then begin
122 ch
^.StreamInit(err
,3);
123 err
.WriteByte(upEPROTO
);
124 err
.WriteByte(upErrNotOpen
);
// An empty segment list is answered with upEPROTO and logged as
// ZeroCount (the error detail byte is not visible in this view).
128 if count
=0 then begin
129 ch
^.StreamInit(err
,3);
130 err
.WriteByte(upEPROTO
);
133 writeln('ZeroCount');
// Walk the requested segments; i is 1-based while the arrays are 0-based.
138 for i
:=1 to count
do begin
// A zero limit is answered with upEPROTO and logged as ZeroLimit.
139 if limit
[i
-1]=0 then begin
140 ch
^.StreamInit(err
,3);
141 err
.WriteByte(upEPROTO
);
144 writeln('ZeroLimit');
// l receives the result of SegmentLength for this base.
146 l
:=uc
.oi
.SegmentLength(base
[i
-1]);
// Record the base in slot uc.seg of uc.s and cap l at the requested limit.
149 uc
.s
[uc
.seg
].base
:=base
[i
-1];
150 if l
>limit
[i
-1] then l
:=limit
[i
-1];
153 end else if i
=1 then begin
154 {first failed, try find some seg}
// GetSegAfter is called with base[0], fb and l, and the reply visible
// here is upUNAVL (remaining reply fields are not visible in this view).
155 uc
.oi
.GetSegAfter(base
[0],fb
,l
);
156 ch
^.StreamInit(err
,5);
158 err
.WriteByte(upUNAVL
);
// Success reply: upSEGOK, tbytes via WriteWord with second argument 4
// (presumably the width in bytes - TODO confirm), then uc.seg.
164 ch
^.StreamInit(err
,6);
165 err
.WriteByte(upSEGOK
);
166 err
.WriteWord(tbytes
,4);
167 err
.WriteByte(uc
.seg
);
172 procedure tPrv
.DoWEIGHT(nweight
:word);
174 if nweight
<50 then nweight
:=50;
// ChatHandler - entry point for a new chat; registered for
// opcode.upFileServer elsewhere in this file.
// NOTE(review): only part of this procedure is visible in this view
// (local declarations, begin/end and the statements that create ag and pr
// are missing); the remarks below describe the visible statements only.
179 procedure ChatHandler(var nchat
:tChat
; msg
:tSMsg
);
// Skip the leading init code byte of the message.
184 msg
.stream
.skip({the initcode}1);
// NOTE(review): this guard demands at least 2 readable bytes, yet only the
// channel byte is read in the visible code and a later check allows the
// request part to be empty - confirm whether a smaller bound was intended.
185 if msg
.stream
.RdBufLen
<2 then begin SendError(nchat
,upErrMalformed
,0); exit
end;
// Requested channel number; rejected with upErrHiChan when it exceeds
// the highest index of tAggr.chan.
186 chan
:=msg
.stream
.ReadByte
;
187 if chan
>high(tAggr
.chan
) then begin Senderror(nchat
,upErrHiChan
,chan
); exit
end;
// Look up the aggregate for the sender; when none exists one is
// initialised with the source address (its allocation is not visible
// here). An already occupied channel slot is rejected with upErrChanInUse.
188 ag
:=FindAggr(msg
.source
^);
189 if not assigned(ag
) then begin
191 ag
^.init(msg
.source
^);
192 end else if assigned(ag
^.chan
[chan
]) then begin SendError(nchat
,upErrChanInUse
,0); exit
end;
// Any bytes left in the message are handed to the new prv as a request.
199 if msg
.stream
.RdBufLen
>0 {the request may be empty}
200 then pr
^.OnMSG(msg
,true);
// tPrv.OnMSG - parse one incoming chat message and dispatch on its first
// byte (op).
// NOTE(review): only part of this procedure is visible in this view (the
// case/branch structure, labels and several declarations are missing), so
// which op value selects which branch cannot be stated from here.
202 procedure tPrv
.OnMSG(msg
:tSMsg
;data
:boolean);
207 var err
:tmemorystream
;
// Buffers for up to 24 (base, limit) pairs collected for DoLSEG.
209 var lbas
:array [0..23] of LongWOrd
;
210 var llim
:array [0..23] of LongWOrd
;
// Calls with data=false are ignored for now.
213 if not data
then exit
; //todo
// An empty message is malformed; otherwise the first byte is the op code.
214 if msg
.stream
.RdBufLen
<1 then goto malformed
;
215 op
:=msg
.stream
.ReadByte
;
// Branch that reads a 20-byte value into hash (presumably the file id for
// OPEN - TODO confirm).
222 if msg
.stream
.RdBufLen
<20 then goto malformed
;
223 msg
.stream
.Read(hash
,20);
// Collect pairs while data remains and the buffers have room; each pair
// needs at least 8 readable bytes and is read as two ReadWord(4) values.
228 while (msg
.stream
.RdBufLen
>0)and(count
<=high(lbas
)) do begin
229 if msg
.stream
.RdBufLen
<8 then goto malformed
;
230 lbas
[count
]:=msg
.stream
.ReadWord(4);
231 llim
[count
]:=msg
.stream
.ReadWord(4);
// Hand the collected pairs over; count tells DoLSEG how many are valid.
234 DoLSEG(count
,lbas
,llim
);
// Branch that expects exactly 2 remaining bytes and reads them with
// ReadWord(2) into base (presumably the WEIGHT value - TODO confirm).
237 if msg
.stream
.RdBufLen
<>2 then goto malformed
;
238 base
:=msg
.stream
.ReadWord(2);
// Reply for malformed input: upEPROTO + upErrMalformed.
244 ch
^.StreamInit(err
,3);
245 err
.WriteByte(upEPROTO
);
246 err
.WriteByte(upErrMalformed
);
// tPrv.Init - set up a freshly created prv for the chat nchat.
// NOTE(review): only part of this procedure is visible in this view; how
// ch is bound to nchat is not shown here.
250 procedure tPrv
.Init(var nchat
:tChat
);
// Route incoming chat messages and chat timeouts to this object.
253 ch
^.Callback
:=@OnMsg
;
254 ch
^.TMHook
:=@ChatTimeout
;
// Start with no open file and not active.
256 isOpen
:=false; Active
:=false;
// Schedule Close after 5000 (presumably milliseconds - TODO confirm);
// tPrv.Start removes this again when the prv is not active.
257 Shedule(5000,@Close
);
260 procedure tPrv
.NotifyDone
;
261 var err
:tmemorystream
;
264 ch
^.StreamInit(err
,2);
265 err
.WriteByte(upDONE
);
273 Shedule(20000,@Close
);
277 procedure tPrv
.Start
;
280 if not active
then UnShedule(@Close
);
// tPrv.Close(tell) - shut this prv down and release its memory.
// NOTE(review): only part of this procedure is visible in this view; the
// condition that guards the upClose notification (presumably tell) and the
// remaining cleanup steps are not shown here.
285 procedure tPrv
.Close(tell
:boolean);
286 var err
:tMemoryStream
;
// The chat pointer must still be valid at this point.
288 assert(assigned(ch
));
// Notification for the peer: a single upClose byte.
290 ch
^.StreamInit(err
,1);
291 err
.WriteByte(upClose
);
// Close uc.oi when a file is open.
295 if isOpen
then uc
.oi
.Close
;
// The object frees its own memory; the instance must not be used after
// this statement.
301 FreeMem(@self
,sizeof(self
));
303 procedure tPrv
.Close
;
308 procedure tPrv
.ChatTimeout(willwait
:LongWord
);
310 if WillWait
<8000 then exit
;
311 writeln('Upload: prv for ',string(ch
^.remote
),'/',chan
,' ChatTimeout');
// tAggr.Init - set up a new aggregate for the address given in source.
// NOTE(review): only part of this procedure is visible in this view; the
// assignments of remote, next, prev and Peers are not shown here.
317 procedure tAggr
.Init(const source
:tNetAddr
);
// The current list head (if any) gets this object as its predecessor;
// presumably this object then becomes the new head - TODO confirm.
321 if assigned(Peers
) then Peers
^.prev
:=@self
;
// Initial rate limit (units are not visible here).
328 limRate
:=2000*1024*1024;
331 writeln('Upload: ',string(remote
),' aggr init');
// Route tccont and tceack messages from remote to OnCont and OnAck.
334 SetMsgHandler(opcode
.tccont
,remote
,@OnCont
);
335 SetMsgHandler(opcode
.tceack
,remote
,@OnAck
);
// FindAggr - search the aggregate list for the entry whose remote equals
// addr and return a pointer to it.
// NOTE(review): only part of this function is visible in this view; the
// initial value of result (presumably Peers) is not shown here. The loop
// ends with result unassigned when no entry matches.
338 function FindAggr({const} addr
:tNetAddr
): tAggr_ptr
;
341 while assigned(result
) do begin
// Sanity check: the back link of the next node must point to this node.
342 if assigned(result
^.next
) then assert(result
^.next
^.prev
=result
);
// Match found: leave with result pointing at it.
343 if result
^.remote
=addr
then exit
;
344 result
:=result
^.next
;
348 procedure SendError(var ch
:tChat
;e1
,e2
:byte);
359 procedure tAggr
.Free(ac
:byte);
361 assert(assigned(chan
[ac
]));
// tAggr.Done - tear this aggregate down: finish the thread object, cancel
// the periodic task, unlink from the aggregate list, remove the message
// handlers and release the memory of the object itself.
// NOTE(review): the begin/end lines are not visible in this view.
366 procedure tAggr
.Done
;
// The log line is completed by the writeln after thr.Done returns.
368 write('Upload: ',string(remote
),' aggr done');
369 thr
.Done
; writeln(' thrdone');
370 UnShedule(@Periodic
);
// Unlink from the doubly linked list; Peers is the head pointer and is
// moved on when this object has no predecessor.
371 if assigned(prev
) then prev
^.next
:=next
else Peers
:=next
;
372 if assigned(next
) then next
^.prev
:=prev
;
// Unregister the handlers installed for remote.
373 SetMsgHandler(opcode
.tccont
,remote
,nil);
374 SetMsgHandler(opcode
.tceack
,remote
,nil);
// The object frees its own memory; the instance must not be used after
// this statement.
375 FreeMem(@Self
,sizeof(self
));
// tAggr.Start - hand channel ac over to the upload thread and make sure
// the thread is running.
// ac: index into chan; the slot must be assigned and must not yet be
// present in thr.chans (both are asserted).
// NOTE(review): the begin/end lines are not visible in this view.
378 procedure tAggr
.Start(ac
:byte);
380 //writeln('Upload: aggr for ',string(remote),' start chan ',ac);
381 assert(assigned(chan
[ac
]));
// Everything below runs while holding thr.crit.
382 EnterCriticalSection(thr
.crit
);
383 assert(not assigned(thr
.chans
[ac
]));
// Publish the uc record of the channel to the thread and reset wcur to
// the configured weight.
384 thr
.chans
[ac
]:=@chan
[ac
]^.uc
;
385 chan
[ac
]^.uc
.wcur
:=chan
[ac
]^.uc
.weight
;
// Restart the periodic task so the next run is 700 from now (presumably
// milliseconds - TODO confirm).
386 UnShedule(@Periodic
);
387 Shedule(700,@Periodic
);
// ResetMark is called only when the thread is stopped or waiting.
388 if thr
.stop
or thr
.wait
then ResetMark
else {do not reset if running};
389 thr
.Start
; {wake up, or start if not running}
390 LeaveCriticalSection(thr
.crit
);
// tAggr.Stop - counterpart of Start for channel ac.
// ac: index into chan; the slot must be assigned and must be present in
// thr.chans (both are asserted).
// NOTE(review): the statement executed inside the critical section is not
// visible in this view (presumably it clears thr.chans[ac] - TODO confirm),
// and neither are the begin/end lines.
393 procedure tAggr
.Stop(ac
:byte);
395 //writeln('Upload: aggr for ',string(remote),' stop chan ',ac);
396 assert(assigned(chan
[ac
]));
// The thread state is only touched while holding thr.crit.
397 EnterCriticalSection(thr
.crit
);
398 assert(assigned(thr
.chans
[ac
]));
400 LeaveCriticalSection(thr
.crit
);
// tAggr.Periodic - recurring housekeeping task of an aggregate; it
// reschedules itself with Shedule(700, ...).
// NOTE(review): only part of this procedure is visible in this view (local
// declarations, the statement that sets e, the timeout bookkeeping and the
// exact nesting of the branches are missing); the remarks below describe
// the visible statements only.
403 procedure tAggr
.Periodic
;
// While the thread is stopped or waiting, inspect every assigned channel.
407 if (thr
.stop
)or(thr
.wait
) then begin
408 for i
:=0 to high(chan
) do if assigned(chan
[i
]) then with chan
[i
]^ do begin
// Channels that are not active are skipped.
409 if not active
then continue
;
// The flag e is determined while holding thr.crit (that statement is not
// visible here); NotifyDone is called outside the critical section.
410 EnterCriticalSection(thr
.crit
);
412 LeaveCriticalSection(thr
.crit
);
413 if e
then NotifyDone
;
// Once timeout reaches 10, every assigned channel is closed.
418 if timeout
>=10 then begin
420 for i
:=0 to high(chan
) do if assigned(chan
[i
]) then chan
[i
]^.Close
;
// At timeout=4 the rates are recalculated with an rxRate of 512.
422 if timeout
=4 then CalcRates(512);
// Run again after 700 (presumably milliseconds - TODO confirm).
425 Shedule(700,@Periodic
);
432 SetChatHandler(opcode
.upFileServer
,@ChatHandler
);