2 {Upload Manager for brodnetd}
5 USES Chat
,opcode
,ServerLoop
,MemStream
,NetAddr
,Store1
;
16 uc
:UploadThread
.tChannel
;
18 isOpen
,Active
:boolean;
19 procedure Init(var nchat
:tChat
);
20 procedure OnMSG(msg
:tSMsg
;data
:boolean);
22 procedure DoOPEN(const fid
:tfid
);
23 procedure DoLSEG(count
:byte; base
:array of LongWord
; limit
:array of LongWord
);
24 procedure DoWEIGHT(nweight
:word);
28 procedure Close(tell
:boolean); overload
; inline;
29 procedure ChatTimeout(willwait
:LongWord
);
35 chan
:array[0..11] of ^tPrv
;
36 acks
:Word; {ack counter, for timeouts}
39 limRate
,limSize
:Single;
41 procedure Free(ac
:byte);{called by closing prv}
43 procedure Start(ac
:byte);
44 procedure Stop(ac
:byte);
45 procedure Init(const source
:tNetAddr
);
46 procedure CalcRates(rxRate
:Single);
48 procedure OnCont(msg
:tSMsg
);
49 procedure OnAck(msg
:tSMsg
);
54 procedure SendError(var ch
:tChat
;e1
,e2
:byte); forward;
55 function FindAggr({const} addr
:tNetAddr
): tAggr_ptr
; forward;
59 init upFileServer <channel> -> ACK
60 upOPEN <id> -> upINFO <length> <final>
61 upOPEN <id> <b,l> -> upINFO <length> <final> <avl-bytes>
62 -> upFAIL <code> <code2> <details>
63 upLSEG [b,l] -> upSEGOK <available-bytes>
68 upWEIGHT <weight> -> ACK
70 special server messages:
71 upEPROTO <code> <details> (protocol violation)
72 upCLOSE (close by server, usualy timeout)
74 upErrHiChan (channel number too high or too many connections)
76 upErrNotFound (file was not found)
77 upErrIO (other error while opening/reading/seeking)
78 upEPROTO upErrNotOpen (LSEG without OPEN or afer STOP)
79 upEPROTO upErrTroll (trolling)
81 OPEN message can be merged with init, saving a round-trip
84 procedure tPrv
.DoOPEN(const fid
:tfid
);
85 var err
:tmemorystream
;
87 writeln('Upload: ',string(ch
^.remote
),'/',chan
,' OPEN');
89 if isOpen
then uc
.oi
.Close
;
93 {if not oinfo.final then begin
97 if uc
.oi
.rc
>0 then begin
98 ch
^.StreamInit(err
,3);
99 err
.WriteByte(upFAIL
);
100 if uc
.oi
.rc
=1 then err
.WriteByte(upErrNotFound
)
101 else begin err
.WriteByte(upErrIO
); err
.WriteByte(uc
.oi
.rc
) end;
105 ch
^.StreamInit(err
,10);
106 err
.WriteByte(upINFO
);
107 err
.WriteWord(uc
.oi
.length
,4);
108 if uc
.oi
.final
then err
.WriteByte(1) else err
.WriteByte(0);
114 procedure tPrv
.DoLSEG(count
:byte; base
,limit
: array of LongWord
);
115 var err
:tmemorystream
;
120 write('Upload: ',string(ch
^.remote
),'/',chan
,' LSEG ');
121 if not isOpen
then begin
122 ch
^.StreamInit(err
,3);
123 err
.WriteByte(upEPROTO
);
124 err
.WriteByte(upErrNotOpen
);
128 if count
=0 then begin
129 ch
^.StreamInit(err
,3);
130 err
.WriteByte(upEPROTO
);
133 writeln('ZeroCount');
138 for i
:=1 to count
do begin
139 if limit
[i
-1]=0 then begin
140 ch
^.StreamInit(err
,3);
141 err
.WriteByte(upEPROTO
);
144 writeln('ZeroLimit');
146 l
:=uc
.oi
.SegmentLength(base
[i
-1]);
149 uc
.s
[uc
.seg
].base
:=base
[i
-1];
150 if l
>limit
[i
-1] then l
:=limit
[i
-1];
153 end else if i
=1 then begin
154 {first failed, try find some seg}
155 uc
.oi
.GetSegAfter(base
[0],fb
,l
);
156 ch
^.StreamInit(err
,5);
158 err
.WriteByte(upUNAVL
);
164 ch
^.StreamInit(err
,6);
165 err
.WriteByte(upSEGOK
);
166 err
.WriteWord(tbytes
,4);
167 err
.WriteByte(uc
.seg
);
173 procedure tPrv
.DoWEIGHT(nweight
:word);
175 if nweight
<50 then nweight
:=50;
180 procedure ChatHandler(var nchat
:tChat
; msg
:tSMsg
);
185 msg
.stream
.skip({the initcode}1);
186 if msg
.stream
.RdBufLen
<2 then begin SendError(nchat
,upErrMalformed
,0); exit
end;
187 chan
:=msg
.stream
.ReadByte
;
188 if chan
>high(tAggr
.chan
) then begin Senderror(nchat
,upErrHiChan
,chan
); exit
end;
189 ag
:=FindAggr(msg
.source
^);
190 if not assigned(ag
) then begin
192 ag
^.init(msg
.source
^);
193 end else if assigned(ag
^.chan
[chan
]) then begin SendError(nchat
,upErrChanInUse
,0); exit
end;
200 if msg
.stream
.RdBufLen
>0 {the request may be empty}
201 then pr
^.OnMSG(msg
,true);
203 procedure tPrv
.OnMSG(msg
:tSMsg
;data
:boolean);
208 var err
:tmemorystream
;
210 var lbas
:array [0..23] of LongWOrd
;
211 var llim
:array [0..23] of LongWOrd
;
214 if not data
then exit
; //todo
215 if msg
.stream
.RdBufLen
<1 then goto malformed
;
216 op
:=msg
.stream
.ReadByte
;
223 if msg
.stream
.RdBufLen
<20 then goto malformed
;
224 msg
.stream
.Read(hash
,20);
229 while (msg
.stream
.RdBufLen
>0)and(count
<=high(lbas
)) do begin
230 if msg
.stream
.RdBufLen
<8 then goto malformed
;
231 lbas
[count
]:=msg
.stream
.ReadWord(4);
232 llim
[count
]:=msg
.stream
.ReadWord(4);
235 DoLSEG(count
,lbas
,llim
);
238 if msg
.stream
.RdBufLen
<>2 then goto malformed
;
239 base
:=msg
.stream
.ReadWord(2);
245 ch
^.StreamInit(err
,3);
246 err
.WriteByte(upEPROTO
);
247 err
.WriteByte(upErrMalformed
);
251 procedure tPrv
.Init(var nchat
:tChat
);
254 ch
^.Callback
:=@OnMsg
;
255 ch
^.TMHook
:=@ChatTimeout
;
257 isOpen
:=false; Active
:=false;
258 Shedule(5000,@Close
);
261 procedure tPrv
.NotifyDone
;
262 var err
:tmemorystream
;
265 ch
^.StreamInit(err
,2);
266 err
.WriteByte(upDONE
);
274 Shedule(20000,@Close
);
278 procedure tPrv
.Start
;
281 if not active
then UnShedule(@Close
);
286 procedure tPrv
.Close(tell
:boolean);
287 var err
:tMemoryStream
;
289 assert(assigned(ch
));
291 ch
^.StreamInit(err
,1);
292 err
.WriteByte(upClose
);
296 if isOpen
then uc
.oi
.Close
;
302 FreeMem(@self
,sizeof(self
));
304 procedure tPrv
.Close
;
309 procedure tPrv
.ChatTimeout(willwait
:LongWord
);
311 if WillWait
<8000 then exit
;
312 writeln('Upload: prv for ',string(ch
^.remote
),'/',chan
,' ChatTimeout');
318 procedure tAggr
.Init(const source
:tNetAddr
);
322 if assigned(Peers
) then Peers
^.prev
:=@self
;
329 limRate
:=2000*1024*1024;
332 writeln('Upload: ',string(remote
),' aggr init');
335 SetMsgHandler(opcode
.tccont
,remote
,@OnCont
);
336 SetMsgHandler(opcode
.tceack
,remote
,@OnAck
);
339 function FindAggr({const} addr
:tNetAddr
): tAggr_ptr
;
342 while assigned(result
) do begin
343 if assigned(result
^.next
) then assert(result
^.next
^.prev
=result
);
344 if result
^.remote
=addr
then exit
;
345 result
:=result
^.next
;
349 procedure SendError(var ch
:tChat
;e1
,e2
:byte);
360 procedure tAggr
.Free(ac
:byte);
362 assert(assigned(chan
[ac
]));
367 procedure tAggr
.Done
;
369 write('Upload: ',string(remote
),' aggr done');
370 thr
.Done
; writeln(' thrdone');
371 UnShedule(@Periodic
);
372 if assigned(prev
) then prev
^.next
:=next
else Peers
:=next
;
373 if assigned(next
) then next
^.prev
:=prev
;
374 SetMsgHandler(opcode
.tccont
,remote
,nil);
375 SetMsgHandler(opcode
.tceack
,remote
,nil);
376 FreeMem(@Self
,sizeof(self
));
379 procedure tAggr
.Start(ac
:byte);
381 //writeln('Upload: aggr for ',string(remote),'/',ac,' start seg=',chan[ac]^.uc.seg);
382 assert(assigned(chan
[ac
]));
383 EnterCriticalSection(thr
.crit
);
384 assert(not assigned(thr
.chans
[ac
]));
385 thr
.chans
[ac
]:=@chan
[ac
]^.uc
;
386 chan
[ac
]^.uc
.wcur
:=chan
[ac
]^.uc
.weight
;
387 UnShedule(@Periodic
);
388 Shedule(700,@Periodic
);
389 {do not reset if resuming from wait}
390 if thr
.stop
{or thr.wait} then ResetMark
else {do not reset if running};
391 thr
.Start
; {wake up, or start if not running}
392 LeaveCriticalSection(thr
.crit
);
395 procedure tAggr
.Stop(ac
:byte);
397 //writeln('Upload: aggr for ',string(remote),' stop chan ',ac);
398 assert(assigned(chan
[ac
]));
399 EnterCriticalSection(thr
.crit
);
400 assert(assigned(thr
.chans
[ac
]));
402 LeaveCriticalSection(thr
.crit
);
405 procedure tAggr
.Periodic
;
409 if (thr
.stop
)or(thr
.wait
) then begin
410 for i
:=0 to high(chan
) do if assigned(chan
[i
]) then with chan
[i
]^ do begin
411 if not active
then continue
;
412 EnterCriticalSection(thr
.crit
);
413 //writeln('periodic ',thr.stop,thr.wait,uc.seg);
415 LeaveCriticalSection(thr
.crit
);
422 if timeout
>=10 then begin
424 for i
:=0 to high(chan
) do if assigned(chan
[i
]) then chan
[i
]^.Close
;
426 if timeout
=4 then CalcRates(512);
429 Shedule(700,@Periodic
);
436 SetChatHandler(opcode
.upFileServer
,@ChatHandler
);