// NOTE(review): the leading numbers look like line numbers of the original
// unit; original lines that fall between them are not part of this listing.
// Table of 16 job pointers, indexed 0..15.
8 Jobs
: array [0..15] of ^tJob
;
// Method declarations. Presumably members of tAggr, because implementations
// qualified with "tAggr." exist for each of these names - TODO confirm
// against the full type declaration.
// Init takes the source network address by const reference.
14 procedure Init(const src
:tNetAddr
);
// MsgDATA and MsgIMME share a signature: a Word size and a mark byte.
15 procedure MsgDATA(sz
:Word;mark
:byte);
16 procedure MsgIMME(sz
:Word;mark
:byte);
// Recv takes a tSMsg; it is the routine registered as message handler in Init.
17 procedure Recv(msg
:tSMsg
);
// Start and Stop take an index into Jobs.
21 procedure Start(ix
:byte);
22 procedure Stop(ix
:byte);
// GetAggr: takes a remote network address and returns a tAggr_ptr.
// Only the declaration and two lines of the body are visible in this listing.
26 function GetAggr(const remote
:tNetAddr
):tAggr_ptr
;
// Walks a chain of records through pointer a for as long as a is assigned.
// How a is initialised and advanced is on lines not shown here.
32 while assigned(a
) do begin
// Matches on the record whose remote field equals the requested address.
// NOTE(review): the action taken on a match (presumably returning a) and the
// behaviour when nothing matches are not visible - confirm in the full unit.
33 if a
^.remote
=remote
then begin
// tAggr.Init: resets the mark counters and the job table, picks a channel
// offset and registers Recv as handler for the two data opcodes.
// Parameter src: network address passed by const reference.
// NOTE(review): the handlers below are registered for "remote", not "src";
// presumably remote is assigned from src on a line not shown - TODO confirm.
43 procedure tAggr
.Init(const src
:tNetAddr
);
// Both mark values start at 0.
49 CurMark
:=0;PrvMark
:=0;
// Clears Jobs[high(Jobs)] down to Jobs[1] and leaves refs at 0 afterwards.
// NOTE(review): the loop condition is refs>0, so Jobs[0] is NOT set to nil by
// this statement - confirm it is initialised elsewhere, otherwise the
// assigned(Jobs[chan]) test in Recv may read an uninitialised pointer.
51 refs
:=high(Jobs
); while refs
>0 do begin Jobs
[refs
]:=nil; dec(refs
) end;
// Random channel offset; the argument is 255-high(Jobs), i.e. 240 for a
// 0..15 table.
52 ChanOfs
:=Random(255-high(Jobs
));
// Route both tcdata and tcdataimm messages from remote to Recv.
55 SetMsgHandler(opcode
.tcdata
,remote
,@Recv
);
56 SetMsgHandler(opcode
.tcdataimm
,remote
,@Recv
);
// tAggr.Recv: message handler for incoming data messages. Parses the header
// bytes, updates accounting through MsgIMME/MsgDATA and forwards the payload
// to the job that owns the channel.
// Parameter msg: the received message; its stream is consumed in order.
59 procedure tAggr
.Recv(msg
:tSMsg
);
// First stream byte is the opcode, second is the mark.
66 op
:=msg
.stream
.readbyte
;
67 mark
:=msg
.stream
.readbyte
;
// Immediate-data messages additionally go through MsgIMME.
68 if op
=opcode
.tcdataimm
then MsgIMME(msg
.length
,mark
);
// MsgDATA is called for every message, immediate or not.
69 MsgDATA(msg
.length
,mark
);
// Once at least 8 datagrams were counted, the elapsed time since StartT is
// computed. The MSecsPerDay multiplier is commented out in the original.
// NOTE(review): what is done with delta inside this branch (lines 72-74) is
// not visible in this listing.
70 if DgrCnt
>=8 then begin
71 delta
:=(mNow
-StartT
){*MSecsPerDay};
// Next stream byte selects the channel, followed by a base value read with
// ReadWord(4).
75 chan
:=msg
.stream
.readbyte
;
76 base
:=msg
.stream
.ReadWord(4);
// Forward the remaining read buffer to the job in that slot, if the index is
// in range and the slot is occupied; otherwise the payload is dropped here.
// NOTE(review): chan is used directly as index; ChanOfs chosen in Init is not
// applied on the visible lines - confirm whether that is intended.
77 if (chan
<=high(Jobs
))and assigned(Jobs
[chan
]) then Jobs
[chan
]^.MsgDATA(base
,msg
.stream
.RDBufLen
,msg
.stream
.RDBuf
);
// tAggr.MsgIMME: builds a small tceack message and sends it to remote.
// Parameters: sz - size in a Word, mark - mark byte of the received message.
// NOTE(review): lines 86-87 are not shown; presumably they write mark and/or
// sz into the message - TODO confirm.
80 procedure tAggr
.MsgIMME(sz
:Word; mark
:byte);
// Stack buffer of 4 bytes that backs the outgoing message.
82 var buf
:array [1..4] of byte;
// r is set up over buf with arguments 0 and sizeof(buf).
84 r
.Init(@buf
,0,sizeof(buf
));
// First byte of the message is the tceack opcode.
85 r
.WriteByte(opcode
.tceack
);
// Send what was written so far (r.length bytes starting at r.base) to remote.
88 SendMessage(r
.base
^,r
.length
,remote
);
// tAggr.MsgDATA: per-datagram accounting keyed on the mark byte.
// Parameters: sz - size to add to the byte counter, mark - mark byte of the
// received datagram.
91 procedure tAggr
.MsgDATA(sz
:Word; mark
:byte);
// Datagrams carrying the previous mark skip this branch.
93 if mark
<>PrvMark
then begin
// A mark that is neither PrvMark nor CurMark enters the reset path.
// NOTE(review): the body of this branch (lines 95-99) is not shown; the
// commented-out writeln below suggests it rotates PrvMark/CurMark - confirm.
94 if mark
<>CurMark
then begin
100 //writeln('Mark Reset: ',PrvMark,'->',CurMark);
// Otherwise add sz to ByteCnt and count one more datagram.
// NOTE(review): this else presumably pairs with the mark<>CurMark test on
// line 94 - confirm against the full nesting.
101 end else begin Inc(ByteCnt
,sz
); Inc(DgrCnt
); end;
104 //writeln('Download: got ',DgrCnt,'dg,',ByteCnt,'B in ',delta,'ms');
// tAggr.SendRate: computes the receive rate from ByteCnt and the elapsed time
// since StartT and reports it to remote in a tccont message.
107 procedure tAggr
.SendRate
;
// Rate as put on the wire; the original comment describes it as bytes per
// second shifted right by 6 (divided by 64).
109 var rateb
: DWord
; {BytesPerSecond shr 6 (=64)}
// 6-byte message buffer: opcode byte + mark byte + the WriteWord(rateb,4)
// field (presumably 4 bytes wide - TODO confirm).
110 var buf
:array [1..6] of byte;
// Elapsed time since StartT; the MSecsPerDay multiplier is commented out.
113 delta
:=(mNow
-StartT
){*MSecsPerDay};
// NOTE(review): a delta of 0 makes this division fail - confirm that callers
// or lines not shown guarantee delta > 0.
// The *1000 together with the 'ms' wording in the commented-out writeln
// suggests delta is in milliseconds - TODO confirm.
114 rate
:=(ByteCnt
/delta
)*1000;
115 //writeln('Download: rate ',(rate/1024):7:1, 'kB/s');
// Scale down by 64 and round to an integer for transmission.
116 rateb
:=round((rate
)/64);
// Assemble the message: tccont opcode, current mark, then the scaled rate.
120 r
.Init(@buf
,0,sizeof(buf
));
121 r
.WriteByte(opcode
.tccont
);
122 r
.WriteByte(CurMark
);
123 r
.WriteWord(rateb
,4);
// Send r.length bytes starting at r.base to remote.
124 SendMessage(r
.base
^,r
.length
,remote
);
// tAggr.Periodic: scheduled check based on DgrCntCheck.
127 procedure tAggr
.Periodic
;
// When DgrCntCheck is above 1 the routine re-arms itself via Shedule with an
// argument of 5000 (unit not visible here).
129 if DgrCntCheck
>1 then begin
131 Shedule(5000,@Periodic
);
// NOTE(review): presumably the else path of the test above; it only reports
// that handling a failed check is not implemented - confirm the nesting.
133 writeln('Download: Periodic check failed, unimplemented!');
// tAggr.Done: tears the instance down - cancels the periodic callback, removes
// both message handlers and releases the instance's own memory.
137 procedure tAggr
.Done
;
// Walks a chain through pointer a looking for the entry that is this instance.
// NOTE(review): the unlink step inside the match (lines 145-147) is not shown;
// presumably the instance is removed from the chain there - confirm.
143 while assigned(a
) do begin
144 if a
=@self
then begin
// Stop further Periodic callbacks for this instance.
148 UnShedule(@Periodic
);
// Passing nil as handler for both data opcodes of remote.
149 SetMsgHandler(opcode
.tcdata
,remote
,nil);
150 SetMsgHandler(opcode
.tcdataimm
,remote
,nil);
// Frees the memory of the instance itself: no field may be touched after this
// call, and it is only valid if the instance was allocated on the heap with a
// matching size (allocation site not visible here - TODO confirm).
151 FreeMem(@self
,sizeof(self
));
// tAggr.Start: marks the job in slot ix as active.
// Parameter ix: index into Jobs.
// NOTE(review): Jobs[ix] is dereferenced without a visible nil or range check;
// ix is a byte while Jobs only has slots 0..15 - confirm callers guarantee a
// valid, assigned slot.
154 procedure tAggr
.Start(ix
:byte);
// Nothing to do when the job is already active.
156 if Jobs
[ix
]^.active
then exit
;
// When acnt is 0, arm the Periodic callback via Shedule with an argument of
// 5000. NOTE(review): acnt is presumably the count of active jobs, updated on
// line 158 which is not shown - confirm.
157 if acnt
=0 then Shedule(5000,@Periodic
);
159 Jobs
[ix
]^.active
:=true;
// tAggr.Stop: marks the job in slot ix as inactive.
// Parameter ix: index into Jobs.
// NOTE(review): Jobs[ix] is dereferenced without a visible nil or range check;
// ix is a byte while Jobs only has slots 0..15 - confirm callers guarantee a
// valid, assigned slot.
162 procedure tAggr
.Stop(ix
:byte);
// Nothing to do when the job is not active.
164 if not Jobs
[ix
]^.active
then exit
;
166 Jobs
[ix
]^.active
:=false;
// Cancels the Periodic callback.
// NOTE(review): the condition guarding this call (line 167) is not shown;
// presumably it only runs once no active job remains - confirm.
168 UnShedule(@Periodic
);