2 {coprocessor to Upload unit. Move I/O out of main thread}
4 uses Store1
,Sockets
,NetAddr
;
10 s
: array [1..24] of tSegment
;
16 type tUploadThr
=object
18 crit
:tRtlCriticalSection
;
24 MarkTime
:LongWord
;{ms}
26 chans
:array [0..11] of ^tChannel
;
28 buffer
:array [0..2047] of byte;
29 stop
:boolean; {the therad is stopped or stopping}
30 wait
:boolean; {the therad is waiting for data}
33 procedure Init(source
:tNetAddr
);
39 uses MemStream
,ServerLoop
,SysUtils
,opcode
;
41 procedure tUploadThr
.Init(source
:tNetAddr
);
44 InitCriticalSection(crit
);
45 source
.ToSocket(remote
);
46 socket
:=GetSocket(source
);
51 for i
:=0 to high(chans
) do chans
[i
]:=nil;
54 procedure tUploadThr
.Main
;
58 var txwait
,delta
:single;//msec
59 var LastTime
:tDateTime
;//days
65 while not stop
do begin
66 EnterCriticalSection(crit
);
69 while (chans
[curc
]=nil)or(chans
[curc
]^.wcur
=0)or(chans
[curc
]^.seg
=0) do begin
70 if assigned(chans
[curc
])and(chans
[curc
]^.WCur
=0) then chans
[curc
]^.WCur
:=chans
[curc
]^.weight
;
73 if curc
>high(chans
) then curc
:=0;
74 if pch
>(high(chans
)+1) then begin wait
:=true; break
; end;
77 LeaveCriticalSection(crit
);
81 LastTime
:=SysUtils
.Now
;
83 seg
:=@chan
^.s
[chan
^.seg
];
84 s
.Init(@buffer
,0,high(buffer
));
86 if size2
>s
.size
then size2
:=0;
88 sz
:=size1
; if size1
>s
.size
then sz
:=s
.size
;
89 s
.WriteByte(opcode
.tcdata
);
91 sz
:=size2
; if sz
>s
.size
then sz
:=s
.size
;
92 s
.WriteByte(opcode
.tcdataimm
);
98 s
.WriteWord(seg
^.base
,4);
100 if sz
>seg
^.Len
then sz
:=seg
^.Len
;
101 assert(sz
<=seg
^.len
);
102 chan
^.oi
.ReadSeg(s
.WrBuf
,seg
^.base
,sz
);
103 Assert(chan
^.oi
.rc
=0,'IO error reading segment');
105 assert((Seg
^.Len
-sz
)>=0);
108 if Seg
^.Len
=0 then Dec(chan
^.seg
)
109 else Inc(Seg
^.Base
,sz
);
110 LeaveCriticalSection(crit
);
111 fpSendTo(socket
,s
.base
,s
.length
,0,@remote
,sizeof(remote
));
112 txwait
:=((MarkData
/Rate
)*1000)-(MarkTime
);
113 MarkData
:=MarkData
+s
.length
;
114 if txWait
>1000 then begin writeln('!!! txwait=',round(txWait
)); txWait
:=1000;end;
115 if txWait
>0 then Sleep(round(txWait
));
116 Delta
:=Delta
+((SysUtils
.Now
-LastTime
)*MSecsPerDay
);
117 if Delta
>5000 then Delta
:=3000;
118 if Delta
<0 then Delta
:=0;
119 MarkTime
:=MarkTime
+trunc(Delta
);
124 function thrfunc(p
:pointer):PtrInt
;
129 procedure tUploadThr
.Start
;
132 if not stop
then exit
;
136 thrid
:=BeginThread(@ThrFunc
,@self
);
139 procedure tUploadThr
.Done
;
142 EnterCriticalSection(crit
);
144 LeaveCriticalSection(crit
);
145 WaitForThreadterminate(thrid
,999999);
146 DoneCriticalSection(crit
);