8 Jobs
: array [0..15] of ^tJob
;
14 procedure Init(const src
:tNetAddr
);
15 procedure MsgDATA(sz
:Word;mark
:byte);
16 procedure MsgIMME(sz
:Word;mark
:byte);
17 procedure Recv(msg
:tSMsg
);
20 procedure Start(ix
:byte);
21 procedure Stop(ix
:byte);
25 function GetAggr(const remote
:tNetAddr
):tAggr_ptr
;
31 while assigned(a
) do begin
32 if a
^.remote
=remote
then begin
42 procedure tAggr
.Init(const src
:tNetAddr
);
48 CurMark
:=0;PrvMark
:=0;
50 refs
:=high(Jobs
); while refs
>0 do begin Jobs
[refs
]:=nil; dec(refs
) end;
51 ChanOfs
:=Random(255-high(Jobs
));
54 SetMsgHandler(opcode
.tcdata
,remote
,@Recv
);
55 SetMsgHandler(opcode
.tcdataimm
,remote
,@Recv
);
58 procedure tAggr
.Recv(msg
:tSMsg
);
64 op
:=msg
.stream
.readbyte
;
65 mark
:=msg
.stream
.readbyte
;
66 if op
=opcode
.tcdataimm
then MsgIMME(msg
.length
,mark
);
67 MsgDATA(msg
.length
,mark
);
68 chan
:=msg
.stream
.readbyte
;
69 base
:=msg
.stream
.ReadWord(4);
70 if (chan
<=high(Jobs
))and assigned(Jobs
[chan
]) then Jobs
[chan
]^.MsgDATA(base
,msg
.stream
.RDBufLen
,msg
.stream
.RDBuf
);
73 procedure tAggr
.MsgIMME(sz
:Word; mark
:byte);
75 var buf
:array [1..4] of byte;
77 r
.Init(@buf
,0,sizeof(buf
));
78 r
.WriteByte(opcode
.tceack
);
81 SendMessage(r
.base
^,r
.length
,remote
);
84 procedure tAggr
.MsgDATA(sz
:Word; mark
:byte);
86 var rateb
: DWord
; {BytesPerSecond shr 6 (=64)}
87 var buf
:array [1..6] of byte;
90 if mark
<>PrvMark
then begin
91 if mark
<>CurMark
then begin
97 end else begin Inc(ByteCnt
,sz
); Inc(DgrCnt
); end;
100 //writeln('Download: got ',DgrCnt,'dg,',ByteCnt,'B in ',delta,'ms');
101 if DgrCnt
<8 then exit
;
102 delta
:=(mNow
-StartT
){*MSecsPerDay};
103 if delta
<400 then exit
;
104 rate
:=(ByteCnt
/delta
)*1000;
105 //writeln('Download: rate ',(rate/1024):7:1, 'kB/s');
106 rateb
:=round((rate
)/64);
110 r
.Init(@buf
,0,sizeof(buf
));
111 r
.WriteByte(opcode
.tccont
);
113 r
.WriteWord(rateb
,4);
114 SendMessage(r
.base
^,r
.length
,remote
);
117 procedure tAggr
.Periodic
;
119 if DgrCntCheck
>1 then begin
121 Shedule(5000,@Periodic
);
123 writeln('Download: Periodic check failed, unimplemented!');
127 procedure tAggr
.Done
;
133 while assigned(a
) do begin
134 if a
=@self
then begin
138 UnShedule(@Periodic
);
139 SetMsgHandler(opcode
.tcdata
,remote
,nil);
140 SetMsgHandler(opcode
.tcdataimm
,remote
,nil);
141 FreeMem(@self
,sizeof(self
));
144 procedure tAggr
.Start(ix
:byte);
146 if Jobs
[ix
]^.active
then exit
;
147 if acnt
=0 then Shedule(5000,@Periodic
);
149 Jobs
[ix
]^.active
:=true;
152 procedure tAggr
.Stop(ix
:byte);
154 if not Jobs
[ix
]^.active
then exit
;
156 Jobs
[ix
]^.active
:=false;
157 if acnt
=0 then UnShedule(@Periodic
);