2 {Upload Manager for brodnetd}
8 - handle retransmit requests
10 - deprioritize/cancel uploads
11 keep one TC connection per peer (+expire-delete)
12 => need 'chat' protocol
16 USES Chat
,TC
,opcode
,ServerLoop
,MemStream
,NetAddr
;
30 prv
: array [0..15] of ^tPrv
;
34 procedure OnMsg(msg
: tSMsg
; data
: boolean);
35 procedure Init(var nchat
:tChat
; msg
:tSMsg
);
38 procedure IdleTimeout
; procedure TCTimeout
; procedure CHTimeout(willwait
:LongWOrd
);
39 procedure SendTestReply
;
40 procedure ExpandPrv(last
:byte);
47 GET(channel:byte; filehash:20; baseHi:word2; base:word4; limit:word4);
48 SEG(channel:byte; baseHi:word2; base:word4; limit:word4);
49 FIN(channel:byte; avail:byte);
51 INFO(channel:byte; struct);
52 FAIL(channel:byte; code:byte);
56 procedure tAggr
.OnMsg(msg
: tSMsg
; data
: boolean);
60 op
:=msg
.stream
.readbyte
;
65 FreeMem(@self
,sizeof(self
));
67 opcode
.upGET
: ReqGET(msg
);
73 procedure tAggr
.ReqGET(msg
:tSMsg
);
75 var filehash
: array [1..20] of byte;
80 if msg
.stream
.RdBufLen
<31 then begin
81 SendError(opcode
.upErrMalformed
); exit
end;
82 chan
:=msg
.stream
.ReadByte
;
83 if chan
>high(prv
) then begin
84 SendError(opcode
.upErrHiChan
,chan
); exit
end;
85 if assigned(prv
[chan
]) then begin
86 SendError(opcode
.upErrChanInUse
,chan
); exit
end;
87 msg
.stream
.Read(FileHash
,20);
88 basehi:=msg.stream.ReadWord(2); {GET layout: baseHi is word2}
89 base:=msg.stream.ReadWord(4); {GET layout: base is word4; Read(buf,len) is the raw copy used for FileHash, not a value read}
90 limit:=msg.stream.ReadWord(4); {GET layout: limit is word4; assumes ReadWord(n) yields an n-byte integer - confirm in MemStream}
93 with prv
[chan
]^ do begin
96 Init(filehash
,basehi
,base
,limit
);
100 procedure tAggr
.OnCont
;
106 if cprv>=high(prv) then cprv:=0 else inc(cprv); {wrap after the last valid slot: prv is [0..15], comparing with length(prv)=16 let cprv step to 16}
107 if cprv
=pprv
then begin
109 Shedule(15000,@IdleTimeout
);
114 until tcs
.txLastSize
>0;
117 procedure tAggr
.SendTestReply
;
120 writeln('upmgr: test');
121 s
.Init(GetMem(56),0,56);
128 procedure tAggr
.Init(var nchat
:tChat
; msg
:tSMsg
);
131 writeln('upmgr: init');
134 if assigned(Peers
) then Peers
^.prev
:=@self
;
137 tcs
.Init(msg
.source
^);
138 tcs
.CanSend
:=@OnCont
;
140 tcs
.OnTimeout
:=@TCTimeout
;
141 for i
:=0 to high(prv
) do prv
[i
]:=nil;
143 ch
^.Callback
:=@OnMsg
;
144 ch
^.TMHook
:=@CHTimeout
;
145 writeln('upmgr: send ack to init');
148 Shedule(15000,@IdleTimeout
);
151 procedure tAggr
.IdleTimeout
;
152 begin if not idle
then exit
;
153 writeln('Idle Timeout');
155 procedure tAggr
.TCTimeout
;
157 writeln('TCTimeout');
159 procedure tAggr
.CHTimeout(willwait
:LongWOrd
);
160 begin if willwait
<30000 then exit
;
161 writeln('ChatTimeout');
164 procedure tAggr
.ForceClose
;
167 writeln('upmgr: force close');
168 s
.Init(GetMem(56),0,56);
170 s
.WriteByte(opcode
.upClose
);
175 Done
; {fixme sheduler}
176 FreeMem(@self
,sizeof(self
));
179 procedure tAggr
.Done
;
181 writeln('upmgr: close');
184 UnShedule(@IdleTimeout
);
185 if assigned(prev
) then prev
^.next
:=next
else Peers
:=next
;
186 if assigned(next
) then next
^.prev
:=prev
;
189 function FindAggr({const} addr
:tNetAddr
): tAggr_ptr
;
192 while assigned(result
) do begin
193 if result
^.tcs
.remote
=addr
then exit
;
194 assert((not assigned(result^.next)) or (result^.next^.prev=result)); {list sanity: the successor must link back to this node; prev is nil at the head and never points at the node itself}
195 result
:=result
^.next
;
199 procedure ChatHandler(var nchat
:tChat
; msg
:tSMsg
);
203 dup
:=FindAggr(msg
.source
^);
204 if assigned(dup
) then begin
210 Dup
^.Init(nchat
,msg
);
214 SetChatHandler(opcode
.upFileServer
,@ChatHandler
);