StoreObject finalization correction.
[brdnet.git] / Upload.pas
blobc87de0ae1a462f29c2da9805059ba355b6509b68
1 UNIT Upload;
2 {Upload Manager for brodnetd}
4 INTERFACE
5 USES Chat,opcode,ServerLoop,MemStream,NetAddr,Store1;
7 IMPLEMENTATION
8 uses UploadThread;
type
 tAggr_ptr=^tAggr;
 tPrv_ptr=^tPrv;

 { State of one upload channel of one remote peer.
   Instances are allocated in ChatHandler and release themselves in
   Close(tell). }
 tPrv=object
  chan:byte;                 { channel number, index into aggr^.chan }
  aggr:tAggr_ptr;            { aggregator of the peer owning this channel }
  uc:UploadThread.tChannel;  { channel record handed to the upload thread }
  ch:^tChat;                 { chat used to talk to the client, nil after Close }
  isOpen,                    { true while uc.oi holds an opened object }
  Active:boolean;            { true between Start and Stop }
  { setup and chat callbacks }
  procedure Init(var nchat:tChat);
  procedure OnMSG(msg:tSMsg;data:boolean);
  procedure ChatTimeout(willwait:LongWord);
  { request handlers }
  procedure DoOPEN(const fid:tfid);
  procedure DoLSEG(count:byte; base:array of LongWord; limit:array of LongWord);
  procedure DoWEIGHT(nweight:word);
  { transfer state changes }
  procedure Start;
  procedure Stop;
  procedure NotifyDone;
  procedure Close;
  procedure Close(tell:boolean); overload; inline;
 end;

 { Per remote address aggregator: owns the upload thread object and the
   table of channels, and is a node of the list headed by Peers. }
 tAggr=object
  thr:tUploadThr;            { upload thread object for this peer }
  remote:tNetAddr;           { address of the peer }
  refc:byte;                 { number of channels currently allocated }
  chan:array[0..11] of ^tPrv;
  acks:Word; {ack counter, for timeouts}
  timeout:word;              { consecutive Periodic runs that saw acks=0 }
  { transmission control values; initialised in Init, the code consuming
    them is not part of the procedures implemented in this file section }
  rateIF,
  sizeIF,
  limRate,
  limSize:Single;
  next,prev: tAggr_ptr;      { links of the list headed by Peers }
  { lifetime }
  procedure Init(const source:tNetAddr);
  procedure Free(ac:byte);{called by closing prv}
  procedure Done;
  { channel scheduling }
  procedure Start(ac:byte);
  procedure Stop(ac:byte);
  procedure Periodic;
  { traffic control; bodies are not in the visible part of the unit,
    presumably in the UploadTC.pas include - TODO confirm }
  procedure CalcRates(rxRate:Single);
  procedure OnCont(msg:tSMsg);
  procedure OnAck(msg:tSMsg);
  procedure ResetMark;
 end;
{ Head of the doubly linked list of aggregators, one node per remote address.
  Nodes are linked in by tAggr.Init and unlinked by tAggr.Done. }
var Peers:^tAggr;

{ Both routines are used before their bodies appear in the unit. }
function FindAggr({const} addr:tNetAddr): tAggr_ptr; forward;
procedure SendError(var ch:tChat;e1,e2:byte); forward;
{ PROTOCOL }
{ CLIENT                       SERVER(us)}{
 init upFileServer <channel> -> ACK
 upOPEN <id>                 -> upINFO <length> <final>
 upOPEN <id> <b,l>           -> upINFO <length> <final> <avl-bytes>
                             -> upFAIL <code> <code2> <details>
 upLSEG [b,l]                -> upSEGOK <available-bytes>
                             -> upUNAVL <next-avl>
                             -> upFAIL upErrIO
 upSTOP                      -> ACK
 upCLOSE                     -> ACK
 upWEIGHT <weight>           -> ACK

 special server messages:
  upEPROTO <code> <details> (protocol violation)
  upCLOSE (close by server, usually timeout)
 error conditions:
  upErrHiChan (channel number too high or too many connections)
  upErrChanInUse
  upErrNotFound (file was not found)
  upErrIO (other error while opening/reading/seeking)
  upEPROTO upErrNotOpen (LSEG without OPEN or after STOP)
  upEPROTO upErrTroll (trolling)
 notes:
  OPEN message can be merged with init, saving a round-trip
}
{ Handle an upOPEN request for the object identified by fid.
  Stops a running transfer, closes a previously opened object, acknowledges
  the request and then answers with upINFO on success or upFAIL when
  uc.oi.rc reports an error. }
procedure tPrv.DoOPEN(const fid:tfid);
 var reply:tMemoryStream;
 begin
  writeln('Upload: ',string(ch^.remote),'/',chan,' OPEN');
  Stop;
  if isOpen then uc.oi.Close;
  isOpen:=false;
  ch^.Ack;
  uc.oi.Open(fid);
  {if not oinfo.final then begin
    oinfo.rc:=200;
    Close(oinfo.hnd);
  end;}
  if uc.oi.rc>0 then begin
   { open failed: rc=1 is reported as not-found, anything else as an
     I/O error followed by the raw rc value }
   ch^.StreamInit(reply,3);
   reply.WriteByte(upFAIL);
   if uc.oi.rc=1 then reply.WriteByte(upErrNotFound)
   else begin
    reply.WriteByte(upErrIO);
    reply.WriteByte(uc.oi.rc);
   end;
   ch^.Send(reply);
   exit;
  end;
  { open succeeded: upINFO, 4 byte length, final flag, 4 byte zero }
  isopen:=true;
  ch^.StreamInit(reply,10);
  reply.WriteByte(upINFO);
  reply.WriteWord(uc.oi.length,4);
  if uc.oi.final
   then reply.WriteByte(1)
   else reply.WriteByte(0);
  reply.WriteWord(0,4);
  ch^.Send(reply);
 end;
{ Handle an upLSEG request carrying count (base,limit) pairs.
  Replies upEPROTO when the channel has no open object, when count is zero
  or when a limit is zero; upUNAVL when the first segment is not available;
  otherwise fills uc.s[1..uc.seg], replies upSEGOK with the byte total and
  the number of accepted segments, and starts the transfer. }
procedure tPrv.DoLSEG(count:byte; base,limit: array of LongWord);
 var reply:tMemoryStream;
 var idx:byte;
 var segLen,nextBase:LongWord;
 var total:LongWord;

 { Send an upEPROTO reply with the given code, then log the tag. }
 procedure Violation(code:byte; const tag:string);
  begin
   ch^.StreamInit(reply,3);
   reply.WriteByte(upEPROTO);
   reply.WriteByte(code);
   ch^.send(reply);
   writeln(tag);
  end;

 begin
  writeln('Upload: ',string(ch^.remote),'/',chan,' LSEG');
  if not isOpen then begin
   Violation(upErrNotOpen,'notOpen');
   exit;
  end;
  if count=0 then begin
   Violation(100,'ZeroCount');
   exit;
  end;
  { the running transfer is halted before the segment table is rebuilt }
  stop;
  uc.seg:=0;
  total:=0;
  for idx:=0 to count-1 do begin
   if limit[idx]=0 then begin
    Violation(101,'ZeroLimit');
    exit;
   end;
   segLen:=uc.oi.SegmentLength(base[idx]);
   if segLen>0 then begin
    { available: store it, clipped to the requested limit }
    if segLen>limit[idx] then segLen:=limit[idx];
    inc(uc.seg);
    uc.s[uc.seg].base:=base[idx];
    uc.s[uc.seg].len:=segLen;
    inc(total,segLen);
   end else if idx=0 then begin
    {first failed, try find some seg}
    uc.oi.GetSegAfter(base[0],nextBase,segLen);
    ch^.StreamInit(reply,5);
    if segLen=0 then nextBase:=0;
    reply.WriteByte(upUNAVL);
    reply.WriteWord(nextBase,4);
    ch^.Send(reply);
    exit;
   end;
   { an unavailable segment other than the first is skipped silently }
  end;
  ch^.StreamInit(reply,6);
  reply.WriteByte(upSEGOK);
  reply.WriteWord(total,4);
  reply.WriteByte(uc.seg);
  ch^.Send(reply);
  Start;
 end;
{ Handle an upWEIGHT request: store the requested weight in uc.Weight,
  raising values below 50 to 50, and acknowledge. }
procedure tPrv.DoWEIGHT(nweight:word);
 const cMinWeight=50;
 begin
  if nweight>=cMinWeight
   then uc.Weight:=nweight
   else uc.Weight:=cMinWeight;
  ch^.Ack;
 end;
{ Chat handler registered for opcode.upFileServer by the unit initialization.
  Reads the channel number from the init message, finds or creates the
  aggregator of the sender, allocates the channel object and passes any
  remaining bytes of the message to its OnMSG. }
procedure ChatHandler(var nchat:tChat; msg:tSMsg);
 var peer:^tAggr;
 var prv:^tPrv;
 var chnum:byte;
 begin
  msg.stream.skip({the initcode}1);
  { NOTE(review): at least two bytes are demanded here although only the
    channel byte is read, which makes the RdBufLen>0 test at the end look
    always true - confirm which of the two is intended }
  if msg.stream.RdBufLen<2 then begin
   SendError(nchat,upErrMalformed,0);
   exit;
  end;
  chnum:=msg.stream.ReadByte;
  if chnum>high(tAggr.chan) then begin
   Senderror(nchat,upErrHiChan,chnum);
   exit;
  end;
  peer:=FindAggr(msg.source^);
  if assigned(peer) then begin
   { known peer: the requested channel slot must be free }
   if assigned(peer^.chan[chnum]) then begin
    SendError(nchat,upErrChanInUse,0);
    exit;
   end;
  end else begin
   { first channel from this address: create its aggregator }
   New(peer);
   peer^.init(msg.source^);
  end;
  New(prv);
  prv^.aggr:=peer;
  prv^.chan:=chnum;
  peer^.chan[chnum]:=prv;
  inc(peer^.refc);
  prv^.Init(nchat);
  if msg.stream.RdBufLen>0 {the request may be empty}
   then prv^.OnMSG(msg,true);
 end;
{ Chat callback of the channel (installed by Init, also called directly by
  ChatHandler). Decodes one request and dispatches it to Close, DoOPEN,
  DoLSEG or DoWEIGHT; a short or unknown request is answered with
  upEPROTO upErrMalformed. Calls with data=false are ignored. }
procedure tPrv.OnMSG(msg:tSMsg;data:boolean);
 var opc:byte;
 var objId:tfid;
 var wval:LongWord;
 var nseg:byte;
 var bases:array [0..23] of LongWord;
 var limits:array [0..23] of LongWord;

 { Reply that the request could not be decoded. }
 procedure ReplyMalformed;
  var reply:tMemoryStream;
  begin
   ch^.StreamInit(reply,3);
   reply.WriteByte(upEPROTO);
   reply.WriteByte(upErrMalformed);
   ch^.Send(reply);
  end;

 begin
  if not data then exit; //todo
  if msg.stream.RdBufLen<1 then begin
   ReplyMalformed;
   exit;
  end;
  opc:=msg.stream.ReadByte;
  case opc of
   upClose: begin
    ch^.Ack;
    { Close releases this object, nothing may touch it afterwards }
    Close(false);
   end;
   upOPEN: begin
    if msg.stream.RdBufLen<20 then begin
     ReplyMalformed;
     exit;
    end;
    msg.stream.Read(objId,20);
    DoOPEN(objId);
   end;
   upLSEG: begin
    { collect (base,limit) pairs of 4+4 bytes until the message is
      exhausted or the local arrays are full }
    nseg:=0;
    while (msg.stream.RdBufLen>0)and(nseg<=high(bases)) do begin
     if msg.stream.RdBufLen<8 then begin
      ReplyMalformed;
      exit;
     end;
     bases[nseg]:=msg.stream.ReadWord(4);
     limits[nseg]:=msg.stream.ReadWord(4);
     inc(nseg);
    end;
    DoLSEG(nseg,bases,limits);
   end;
   upWEIGHT: begin
    if msg.stream.RdBufLen<>2 then begin
     ReplyMalformed;
     exit;
    end;
    wval:=msg.stream.ReadWord(2);
    DoWEIGHT(wval);
   end;
   else ReplyMalformed;
  end;
 end;
{ Bind the channel to its chat: install the message and timeout callbacks,
  set the default weight of 100, mark the channel closed and inactive and
  schedule Close after 5000 (Start cancels this timer). }
procedure tPrv.Init(var nchat:tChat);
 begin
  isOpen:=false;
  Active:=false;
  uc.weight:=100;
  ch:=@nchat;
  nchat.Callback:=@OnMsg;
  nchat.TMHook:=@ChatTimeout;
  Shedule(5000,@Close);
 end;
{ Stop the channel and send an upDONE message to the client.
  Called by tAggr.Periodic for an active channel whose uc.Seg is 0. }
procedure tPrv.NotifyDone;
 var note:tMemoryStream;
 begin
  Stop;
  ch^.StreamInit(note,2);
  note.WriteByte(upDONE);
  ch^.Send(note);
 end;
{ Deactivate the channel if it is active: schedule Close after 20000 and
  withdraw the channel from the aggregator. No effect when inactive. }
procedure tPrv.Stop;
 begin
  if not active then exit;
  active:=False;
  Shedule(20000,@Close);
  aggr^.Stop(chan);
 end;
{ Activate the channel and register it with the aggregator.
  The scheduled Close is cancelled when the channel was inactive.
  Requires an opened object (asserted). }
procedure tPrv.Start;
 begin
  assert(isOpen);
  if not active then begin
   UnShedule(@Close);
   active:=true;
  end;
  aggr^.Start(chan);
 end;
{ Tear the channel down and release its memory.
  tell: when true an upClose message is sent to the client first.
  The object must not be used after this call returns: it removes itself
  from the aggregator and frees its own memory. }
procedure tPrv.Close(tell:boolean);
 var bye:tMemoryStream;
 begin
  assert(assigned(ch));
  if tell then begin
   ch^.StreamInit(bye,1);
   bye.WriteByte(upClose);
   ch^.Send(bye);
  end;
  Stop;
  if isOpen then begin
   uc.oi.Close;
   isOpen:=false;
  end;
  ch^.Close;
  ch:=nil;
  { Stop above may have scheduled another Close; drop any pending one }
  UnShedule(@Close);
  { may in turn finish the aggregator when this was its last channel }
  Aggr^.Free(chan);
  FreeMem(@self,sizeof(self));
 end;

{ Parameterless variant: close and tell the client. This is the form
  passed to Shedule/UnShedule. }
procedure tPrv.Close;
 begin
  Close(true);
 end;
{ Timeout hook of the chat (installed by Init). Once willwait reaches 8000
  the event is logged and the channel is closed; smaller values are
  ignored. }
procedure tPrv.ChatTimeout(willwait:LongWord);
 begin
  if WillWait>=8000 then begin
   writeln('Upload: prv for ',string(ch^.remote),'/',chan,' ChatTimeout');
   Close;
  end;
 end;
315 {***AGGREGATOR***}
{ Initialise a freshly allocated aggregator for the peer at source:
  link it at the head of the Peers list, reset counters and control
  values, initialise the upload thread object and register the handlers
  for opcode.tccont and opcode.tceack coming from that peer. }
procedure tAggr.Init(const source:tNetAddr);
 begin
  { insert at the head of the list }
  prev:=nil;
  next:=Peers;
  if assigned(Peers) then Peers^.prev:=@self;
  Peers:=@self;
  { counters }
  refc:=0;
  acks:=0;
  timeout:=0;
  { control values }
  rateIF:=1;
  sizeIF:=1;
  limRate:=2000*1024*1024;
  limSize:=4096;
  remote:=source;
  writeln('Upload: ',string(remote),' aggr init');
  thr.Init(source);
  CalcRates(2048);
  SetMsgHandler(opcode.tccont,remote,@OnCont);
  SetMsgHandler(opcode.tceack,remote,@OnAck);
 end;
{ Walk the Peers list and return the aggregator whose remote address
  equals addr, or nil when there is none. The back link of every visited
  successor is checked by an assertion on the way. }
function FindAggr({const} addr:tNetAddr): tAggr_ptr;
 var cur:tAggr_ptr;
 begin
  cur:=Peers;
  while assigned(cur) do begin
   if assigned(cur^.next) then assert(cur^.next^.prev=cur);
   if cur^.remote=addr then break;
   cur:=cur^.next;
  end;
  FindAggr:=cur;
 end;
{ Reject a chat before a channel object exists for it: send
  upFAIL e1 e2 over ch and close the chat. }
procedure SendError(var ch:tChat;e1,e2:byte);
 var reply:tMemoryStream;
 begin
  ch.StreamInit(reply,3);
  reply.WriteByte(upFAIL);
  reply.WriteByte(e1);
  reply.WriteByte(e2);
  ch.Send(reply);
  ch.Close;
 end;
{ Release channel slot ac (called by the closing tPrv). Drops one
  reference and finishes the whole aggregator through Done when the
  count reaches zero, so the object may be gone on return. }
procedure tAggr.Free(ac:byte);
 begin
  assert(assigned(chan[ac]));
  chan[ac]:=nil;
  dec(refc);
  if refc<>0 then exit;
  Done;
 end;
{ Destroy the aggregator: finish the upload thread object, cancel the
  Periodic timer, unlink from the Peers list, unregister both message
  handlers and free the object's own memory. }
procedure tAggr.Done;
 begin
  write('Upload: ',string(remote),' aggr done');
  thr.Done;
  writeln(' thrdone');
  UnShedule(@Periodic);
  { unlink from the doubly linked list }
  if assigned(next) then next^.prev:=prev;
  if assigned(prev)
   then prev^.next:=next
   else Peers:=next;
  SetMsgHandler(opcode.tccont,remote,nil);
  SetMsgHandler(opcode.tceack,remote,nil);
  FreeMem(@Self,sizeof(self));
 end;
{ Hand channel ac over to the upload thread. Under thr.crit the channel
  record is entered into thr.chans, its current weight is reloaded from
  the configured weight, the Periodic timer is re-armed at 700 and the
  thread is started or woken. }
procedure tAggr.Start(ac:byte);
 begin
  //writeln('Upload: aggr for ',string(remote),' start chan ',ac);
  assert(assigned(chan[ac]));
  EnterCriticalSection(thr.crit);
  assert(not assigned(thr.chans[ac]));
  chan[ac]^.uc.wcur:=chan[ac]^.uc.weight;
  thr.chans[ac]:=@chan[ac]^.uc;
  UnShedule(@Periodic);
  Shedule(700,@Periodic);
  {do not reset if running}
  if thr.stop or thr.wait then ResetMark;
  thr.Start; {wake up, or start if not running}
  LeaveCriticalSection(thr.crit);
 end;
{ Withdraw channel ac from the upload thread by clearing its entry in
  thr.chans under thr.crit. The channel must currently be registered
  (asserted). }
procedure tAggr.Stop(ac:byte);
 begin
  //writeln('Upload: aggr for ',string(remote),' stop chan ',ac);
  assert(assigned(chan[ac]));
  EnterCriticalSection(thr.crit);
  try_registered:
  assert(assigned(thr.chans[ac]));
  thr.chans[ac]:=nil;
  LeaveCriticalSection(thr.crit);
 end;
{ Timer callback, armed with a delay of 700 by Start and by itself.
  When the thread is stopped or waiting: every active channel whose
  uc.Seg is 0 is notified as done, and the timer is not re-armed.
  Otherwise: a run without acks increments the timeout counter; at 4 the
  rates are recalculated with 512, at 10 all channels are closed and the
  aggregator is destroyed. A run with acks resets the counter. }
procedure tAggr.Periodic;
 var idx:byte;
 var cur:^tPrv;
 var drained:boolean;
 begin
  if (thr.stop)or(thr.wait) then begin
   for idx:=0 to high(chan) do begin
    cur:=chan[idx];
    if not assigned(cur) then continue;
    if not cur^.active then continue;
    { uc.Seg is read under the lock shared with the upload thread }
    EnterCriticalSection(thr.crit);
    drained:=cur^.uc.Seg=0;
    LeaveCriticalSection(thr.crit);
    if drained then cur^.NotifyDone;
   end;
   exit;
  end;
  if acks<>0 then timeout:=0
  else begin
   inc(Timeout);
   if timeout>=10 then begin
    { refc is forced high so that the Free calls made by the closing
      channels cannot bring it to zero; Done is called explicitly }
    refc:=255;
    for idx:=0 to high(chan) do
     if assigned(chan[idx]) then chan[idx]^.Close;
    Done;
    exit;
   end;
   if timeout=4 then CalcRates(512);
  end;
  acks:=0;
  Shedule(700,@Periodic);
 end;
428 {$I UploadTC.pas}
{ Unit initialization: start with an empty aggregator list and register
  ChatHandler for chats opened with opcode.upFileServer. }
INITIALIZATION
 Peers:=nil;
 SetChatHandler(opcode.upFileServer,@ChatHandler);
END.