2 {coprocessor to Upload unit. Move I/O out of main thread}
4 uses Store1
,Sockets
,NetAddr
;
10 s
: array [1..24] of tSegment
;
16 type tUploadThr
=object
18 crit
:tRtlCriticalSection
;
24 MarkTime
:LongWord
;{ms}
26 chans
:array [0..11] of ^tChannel
;
28 buffer
:array [0..2047] of byte;
29 stop
:boolean; {the therad is stopped or stopping}
30 wait
:boolean; {the therad is waiting for data}
34 procedure Init(source
:tNetAddr
);
40 uses MemStream
,ServerLoop
,SysUtils
,opcode
;
42 procedure tUploadThr
.Init(source
:tNetAddr
);
45 InitCriticalSection(crit
);
46 source
.ToSocket(remote
);
47 socket
:=GetSocket(source
);
52 for i
:=0 to high(chans
) do chans
[i
]:=nil;
55 procedure tUploadThr
.Main
;
59 var txwait
,delta
:single;//msec
60 var LastTime
:tDateTime
;//days
66 while not stop
do begin
67 EnterCriticalSection(crit
);
70 while (chans
[curc
]=nil)or(chans
[curc
]^.wcur
=0)or(chans
[curc
]^.seg
=0) do begin
71 if assigned(chans
[curc
])and(chans
[curc
]^.WCur
=0) then chans
[curc
]^.WCur
:=chans
[curc
]^.weight
;
74 if curc
>high(chans
) then curc
:=0;
75 if pch
>(high(chans
)+1) then begin wait
:=true; break
; end;
78 LeaveCriticalSection(crit
);
86 LastTime
:=SysUtils
.Now
;
88 seg
:=@chan
^.s
[chan
^.seg
];
89 s
.Init(@buffer
,0,high(buffer
));
91 if size2
>s
.size
then size2
:=0;
93 sz
:=size1
; if size1
>s
.size
then sz
:=s
.size
;
94 s
.WriteByte(opcode
.tcdata
);
96 sz
:=size2
; if sz
>s
.size
then sz
:=s
.size
;
97 s
.WriteByte(opcode
.tcdataimm
);
103 s
.WriteWord(seg
^.base
,4);
105 if sz
>seg
^.Len
then sz
:=seg
^.Len
;
106 assert(sz
<=seg
^.len
);
107 chan
^.oi
.ReadSeg(s
.WrBuf
,seg
^.base
,sz
);
108 Assert(chan
^.oi
.rc
=0,'IO error reading segment');
110 assert((Seg
^.Len
-sz
)>=0);
113 if Seg
^.Len
=0 then Dec(chan
^.seg
)
114 else Inc(Seg
^.Base
,sz
);
115 LeaveCriticalSection(crit
);
116 fpSendTo(socket
,s
.base
,s
.length
,0,@remote
,sizeof(remote
));
117 txwait
:=((MarkData
/Rate
)*1000)-(MarkTime
);
118 MarkData
:=MarkData
+s
.length
;
119 if txWait
>1000 then begin writeln('!!! txwait=',round(txWait
)); txWait
:=1000;end;
120 if txWait
>0 then Sleep(round(txWait
));
121 Delta
:=Delta
+((SysUtils
.Now
-LastTime
)*MSecsPerDay
);
122 if Delta
>5000 then Delta
:=3000;
123 if Delta
<0 then Delta
:=0;
124 MarkTime
:=MarkTime
+trunc(Delta
);
129 function thrfunc(p
:pointer):PtrInt
;
134 procedure tUploadThr
.Start
;
137 if not stop
then exit
;
141 thrid
:=BeginThread(@ThrFunc
,@self
);
144 procedure tUploadThr
.Done
;
147 EnterCriticalSection(crit
);
149 LeaveCriticalSection(crit
);
150 WaitForThreadterminate(thrid
,65535);
151 DoneCriticalSection(crit
);