Some todo.
[brdnet.git] / UploadThread.pas
blob8f9c6f26f156086a31393e7bfab8628c516f60a3
1 unit UploadThread;
2 {coprocessor to Upload unit. Move I/O out of main thread}
3 INTERFACE
4 uses Store1,Sockets,NetAddr;
{ One contiguous range of an object that still has to be transmitted. }
type tSegment=record
  base:LongWord; {offset of the first byte not yet sent}
  len:LongWord;  {number of bytes left in this range}
end;
{ Per-channel upload state: a stack of pending segments plus the
  weighting counters the sender uses when rotating between channels. }
type tChannel=record
  s:array [1..24] of tSegment; {pending segments; s[seg] is the one being sent}
  seg:byte;                    {index of the current segment in s, 0 = nothing queued}
  weight,wcur:Word;            {wcur counts down per packet and is refilled from weight at 0}
  oi:tStoreObjectInfo;         {object the segment data is read from}
end;
{ State of one upload worker. The thread body (Main) reads segment data
  from the channels and sends it to a single remote peer. }
type tUploadThr=object
  thrid:tThreadID;              {handle returned by BeginThread in Start}
  crit:tRtlCriticalSection;     {held by Main while it picks a channel and fills a packet}
  socket:tSocket;               {socket the datagrams are sent on}
  remote:tSockAddrL;            {destination address in socket form}
  size1:Word;                   {total packet size for ordinary data packets}
  size2:Word;                   {if nonzero: size of the next packet only, then reset to 0}
  mark:Byte;                    {copied into every packet header}
  rate:Single;                  {pacing divisor: MarkData/rate is compared against MarkTime
                                 - presumably bytes per second, confirm with the Upload unit}
  MarkTime:LongWord;            {ms accounted for since Start}
  MarkData:LongWord;            {bytes sent since Start}
  chans:array [0..11] of ^tChannel; {nil = unused slot}
  curc:byte;                    {index of the channel currently being served}
  buffer:array [0..2047] of byte;   {scratch space the outgoing packet is built in}
  stop:boolean;                 {the thread is stopped or stopping}
  wait:boolean;                 {the thread is waiting for data}

  procedure Main;
  procedure Init(source:tNetAddr);
  procedure Start;
  procedure Done;
end;
38 IMPLEMENTATION
39 uses MemStream,ServerLoop,SysUtils,opcode;
{ Prepare the uploader for the peer at `source`: create the lock, derive
  the socket address and socket, clear the counters and empty every channel
  slot. The worker thread is not started here (stop is left true). }
procedure tUploadThr.Init(source:tNetAddr);
var slot:integer;
begin
  InitCriticalSection(crit);
  source.ToSocket(remote);
  socket:=GetSocket(source);
  MarkData:=0;
  MarkTime:=0;
  {Main uses curc as an index into chans right away, so it has to start
   inside the array even when the object memory was not zero-filled}
  curc:=0;
  stop:=true;
  wait:=false;
  for slot:=low(chans) to high(chans) do chans[slot]:=nil;
end;
{ Thread body. Until stop is set it repeatedly:
   1. picks the next channel that has a queued segment and weight left,
   2. builds one packet (header + segment data) in buffer under the lock,
   3. sends it, and
   4. sleeps as needed so that MarkData/Rate keeps pace with MarkTime.
  When no channel is usable it sets wait and idles in 200 ms steps; wait is
  cleared again by Start. }
procedure tUploadThr.Main;
var pch:byte;
var s:tMemoryStream;
var sz:Word;
var txwait,delta:single;//msec
var LastTime:tDateTime;//days
var chan:^tChannel;
var seg:^tSegment;
begin
 txwait:=0;
 delta:=0;
 while not stop do begin
  EnterCriticalSection(crit);
  pch:=0;
  {find usable channel: skip empty slots, channels whose weight is used up
   and channels without a queued segment}
  while (chans[curc]=nil)or(chans[curc]^.wcur=0)or(chans[curc]^.seg=0) do begin
   {refill an exhausted weight counter so the channel is eligible next round}
   if assigned(chans[curc])and(chans[curc]^.WCur=0) then chans[curc]^.WCur:=chans[curc]^.weight;
   inc(curc);
   inc(pch);
   if curc>high(chans) then curc:=0;
   {more than one full round without a hit: nothing to send}
   if pch>(high(chans)+1) then begin wait:=true; break; end;
  end;
  if wait then begin
   LeaveCriticalSection(crit);
   sleep(200);
   continue;
  end;
  LastTime:=SysUtils.Now;
  chan:=chans[curc];
  seg:=@chan^.s[chan^.seg];
  s.Init(@buffer,0,high(buffer));
  {prepare header}
  if size2>s.size then size2:=0;
  if size2=0 then begin
   sz:=size1; if size1>s.size then sz:=s.size;
   s.WriteByte(opcode.tcdata);
  end else begin
   {size2 applies to a single packet only}
   sz:=size2; if sz>s.size then sz:=s.size;
   s.WriteByte(opcode.tcdataimm);
   size2:=0;
  end;
  Assert(seg^.len>0);
  s.WriteByte(mark);
  s.WriteByte(curc);
  s.WriteWord(seg^.base,4);
  {sz was the whole packet size; what is left after the header is payload}
  dec(sz,s.length);
  if sz>seg^.Len then sz:=seg^.Len;
  assert(sz<=seg^.len);
  chan^.oi.ReadSeg(s.WrBuf,seg^.base,sz);
  Assert(chan^.oi.rc=0,'IO error reading segment');
  s.WrEnd(sz);
  assert((Seg^.Len-sz)>=0);
  Dec(Seg^.Len,sz);
  Dec(chan^.WCur);
  {segment finished: drop to the previous one, otherwise advance within it}
  if Seg^.Len=0 then Dec(chan^.seg)
  else Inc(Seg^.Base,sz);
  LeaveCriticalSection(crit);
  fpSendTo(socket,s.base,s.length,0,@remote,sizeof(remote));
  {time the data sent so far should have taken, minus the time accounted for.
   Rate is not set anywhere in this unit; without a positive rate there is
   nothing to pace against, and dividing by zero would raise inside the thread}
  if Rate>0
   then txwait:=((MarkData/Rate)*1000)-(MarkTime)
   else txwait:=0;
  MarkData:=MarkData+s.length;
  if txWait>1000 then begin writeln('!!! txwait=',round(txWait)); txWait:=1000;end;
  if txWait>0 then Sleep(round(txWait));
  {account the elapsed time of this iteration (including the sleep) in ms}
  Delta:=Delta+((SysUtils.Now-LastTime)*MSecsPerDay);
  if Delta>5000 then Delta:=3000;
  if Delta<0 then Delta:=0;
  {whole milliseconds go to MarkTime, the fraction is carried to the next packet}
  MarkTime:=MarkTime+trunc(Delta);
  Delta:=frac(Delta);
 end;
end;
{ Thread entry point passed to BeginThread. p is the address of the
  tUploadThr whose Main is to be run; the function returns 9 once Main
  has finished. }
function thrfunc(p:pointer):PtrInt;
type pUploadThr=^tUploadThr;
var worker:pUploadThr;
begin
  worker:=pUploadThr(p);
  worker^.Main;
  thrfunc:=9;
end;
{ Wake the uploader. Always clears the wait flag so a running thread looks
  for data again; if the thread is stopped, also resets the pacing counters
  and launches a new thread running Main on this object. }
procedure tUploadThr.Start;
begin
  wait:=false;
  if stop then begin
    stop:=false;
    MarkData:=0;
    MarkTime:=0;
    thrid:=BeginThread(@ThrFunc,@self);
  end;
end;
{ Stop a running uploader: raise the stop flag under the lock, wait for the
  thread to finish (with a timeout of 999999) and release the critical
  section. Does nothing when the thread is already stopped.
  NOTE(review): in the already-stopped case the critical section created by
  Init is not released here - confirm whether callers rely on that. }
procedure tUploadThr.Done;
begin
  if not stop then begin
    EnterCriticalSection(crit);
    stop:=true;
    LeaveCriticalSection(crit);
    WaitForThreadterminate(thrid,999999);
    DoneCriticalSection(crit);
  end;
end;
148 END.