From 9486d227df2922f150e36c5ab9224ec5bc7c14d6 Mon Sep 17 00:00:00 2001 From: =?utf8?q?Tom=C3=A1=C5=A1=20Brada?= Date: Wed, 7 Oct 2015 13:02:08 +0200 Subject: [PATCH] test and finish FS --- ServerLoop.pas | 1 + TC.pas | 6 ++- TestFS.pas | 1 + opcode.pas | 17 ++++--- upmgr.pas | 138 ++++++++++++++++++++++++++++++++++++++++++--------------- 5 files changed, 120 insertions(+), 43 deletions(-) diff --git a/ServerLoop.pas b/ServerLoop.pas index bcde2bf..3e4e58c 100644 --- a/ServerLoop.pas +++ b/ServerLoop.pas @@ -236,6 +236,7 @@ procedure ShedRun; var olTop:^tSheduled; begin {Sheduling} + {fixme: proste niak aby to šlo vymazať z callbacku} olTop:=ShedTop; pcur:=@olTop; cur:=pcur^; diff --git a/TC.pas b/TC.pas index be2c491..508059d 100644 --- a/TC.pas +++ b/TC.pas @@ -43,12 +43,13 @@ type tTCS=object {this is sender part} txLastSize:Word; {is zero if suspend} siMark:byte; siNow,siWait:boolean; - isTimeout:word; + isTimeout,maxTimeout:word; Cur:tTCSSe; {current values} Limit:tTCSSe; {maximum alloved} Initial:tTCSSe; {after start/timeout} minRateIF:single; {used after rate decrease} CanSend: procedure of object; {called when transmit possible} + OnTimeout: procedure of object; procedure Start; {start the transmission} function MaxSize(req:word):word; procedure WriteHeaders(var s:tMemoryStream); {add headers before the data} @@ -80,6 +81,8 @@ procedure tTCS.Init(const iremote:tNetAddr); Initial.SizeIF:=2; minRateIF:=0.01; CanSend:=nil; + OnTimeout:=nil; + maxTimeout:=65535; Cur:=Initial; txLastSize:=0; end; @@ -204,6 +207,7 @@ procedure tTCS.Timeout; mark:=Random(256); MarkData:=0; siMark:=0; Inc(isTimeout); + if (isTimeout>maxTimeout)and assigned(OnTimeout) then OnTimeout; Shedule(80,@TransmitDelay); Shedule(3000,@Timeout); end; diff --git a/TestFS.pas b/TestFS.pas index 3f608e3..0f26327 100644 --- a/TestFS.pas +++ b/TestFS.pas @@ -25,6 +25,7 @@ procedure t.ST1(msg:tSMsg; data:boolean); s.WriteByte(99); ch.Send(s); ch.Callback:=@ST2; + halt(32); end else writeln('unexpected data'); end; procedure t.ST2(msg:tSMsg; data:boolean); diff --git a/opcode.pas b/opcode.pas index a913f9a..690b56e 100644 --- a/opcode.pas +++ b/opcode.pas @@ -11,13 +11,16 @@ const {chat init} upFileServer=2; const {FS opcodes} upClose=0; - upGET=1; - upINFO=2; - upFAIL=3; - upDONE=4; - upSEG=5; - upSEGOK=6; - upFIN=7; + {s}upGET=1; + {r}upINFO=2; + {r}upFAIL=3; + {r}upDONE=4; + {s}upSEG=5; + {r}upSEGOK=6; + {s}upFIN=7; + upErrMalformed=1; + upErrHiChan=2; + upErrChanInUse=3; IMPLEMENTATION END. \ No newline at end of file diff --git a/upmgr.pas b/upmgr.pas index a8d28d4..e524c6d 100644 --- a/upmgr.pas +++ b/upmgr.pas @@ -1,14 +1,6 @@ UNIT UPMGR; {Upload Manager for brodnetd} -{all file requests in one chat - -chat - >FILETRANSFER - need 'chat' protocol } -{to download a file: - >GET channel CHK hash ofs+len (if len>0 start transfer) - SEG channel ofs len - FIN channel -} - INTERFACE USES Chat,TC,opcode,ServerLoop,MemStream,NetAddr; @@ -44,20 +24,35 @@ tPrv=object end; tPrv_ptr=^tPrv; tAggr=object - state: byte; {0=idle} + idle: boolean; tcs: tTCS; - ch: ^tChat; - prv: array of tPrv; + ch:^tChat; + prv: array [0..15] of ^tPrv; + cprv:byte;{current} next,prev: tAggr_ptr; - //procedure OnCont; + procedure OnCont; procedure OnMsg(msg: tSMsg; data: boolean); procedure Init(var nchat:tChat; msg:tSMsg); + procedure ForceClose; procedure Done; + procedure IdleTimeout; procedure TCTimeout; procedure CHTimeout(willwait:LongWOrd); procedure SendTestReply; + procedure ExpandPrv(last:byte); end; var Peers:^tAggr; +{Requests +Close(); +GET(channel:byte; filehash:20; baseHi:word2; base:word4; limit:word4); +SEG(channel:byte; baseHi:word2; base:word4; limit:word4); +FIN(channel:byte; avail:byte); +}{Responses +INFO(channel:byte; struct); +FAIL(channel:byte; code:byte); +DONE(channel:byte); +} + procedure tAggr.OnMsg(msg: tSMsg; data: boolean); var op:byte; begin @@ -69,14 +64,60 @@ procedure tAggr.OnMsg(msg: tSMsg; data: boolean); Done; FreeMem(@self,sizeof(self)); end; + opcode.upGET: ReqGET(msg); 99: SendTestReply; end{case}; end{data}; end; +procedure tAggr.ReqGET(msg:tSMsg); + var chan:byte; + var filehash: array [1..20] of byte; + var basehi:word; + var base:LongWord; + var limit:LongWord; + begin + if msg.stream.RdBufLen<31 then begin + SendError(opcode.upErrMalformed); exit end; + chan:=msg.stream.ReadByte; + if chan>high(prv) then begin + SendError(opcode.upErrHiChan,chan); exit end; + if assigned(prv[chan]) then begin + SendError(opcode.upErrChanInUse,chan); exit end; + msg.stream.Read(FileHash,20); + basehi:=msg.stream.ReadWord(2); + base:=msg.stream.Read(4); + limit:=msg.stream.Read(4); + New(prv[chan]); + ch^.Ack; + with prv[chan]^ do begin + channel:=chan; + aggr:=@self; + Init(filehash,basehi,base,limit); + end; +end; + +procedure tAggr.OnCont; + var pprv:byte; + begin + pprv:=cprv; + repeat + repeat + if cprv>=length(prv) then cprv:=0 else inc(cprv); + if cprv=pprv then begin + idle:=true; + Shedule(15000,@IdleTimeout); + exit; + end; + until prv[cprv].u>0; + {} + until tcs.txLastSize>0; +end; + procedure tAggr.SendTestReply; var s:tMemoryStream; begin + writeln('upmgr: test'); s.Init(GetMem(56),0,56); ch^.AddHeaders(s); s.WriteByte(98); @@ -85,37 +126,64 @@ procedure tAggr.SendTestReply; end; procedure tAggr.Init(var nchat:tChat; msg:tSMsg); + var i:byte; begin + writeln('upmgr: init'); next:=Peers; prev:=nil; if assigned(Peers) then Peers^.prev:=@self; Peers:=@self; ch:=@nchat; tcs.Init(msg.source^); - //tcs.CanSend:=@OnCont; - SetLength(prv,2); - prv[0].u:=0; - prv[1].u:=0; + tcs.CanSend:=@OnCont; + tcs.maxTimeout:=8; + tcs.OnTimeout:=@TCTimeout; + for i:=0 to high(prv) do prv[i]:=nil; + cprv:=0; ch^.Callback:=@OnMsg; - //ch^.TMHook + ch^.TMHook:=@CHTimeout; writeln('upmgr: send ack to init'); ch^.Ack; - state:=0; + idle:=true; + Shedule(15000,@IdleTimeout); end; -procedure tAggr.Done; +procedure tAggr.IdleTimeout; + begin if not idle then exit; + writeln('Idle Timeout'); + ForceClose end; +procedure tAggr.TCTimeout; + begin + writeln('TCTimeout'); + ForceClose end; +procedure tAggr.CHTimeout(willwait:LongWOrd); + begin if willwait<30000 then exit; + writeln('ChatTimeout'); + ForceClose end; + +procedure tAggr.ForceClose; var s:tMemoryStream; begin - {s.Init(GetMem(56),0,56); + writeln('upmgr: force close'); + s.Init(GetMem(56),0,56); ch^.AddHeaders(s); s.WriteByte(opcode.upClose); s.WriteByte(22); - ch^.send(s);} + try + ch^.send(s); + except end; + Done; {fixme sheduler} + FreeMem(@self,sizeof(self)); +end; + +procedure tAggr.Done; + begin + writeln('upmgr: close'); ch^.Close; tcs.Done; + UnShedule(@IdleTimeout); if assigned(prev) then prev^.next:=next else Peers:=next; if assigned(next) then next^.prev:=prev; - state:=$FF; end; function FindAggr({const} addr:tNetAddr): tAggr_ptr; @@ -134,11 +202,11 @@ procedure ChatHandler(var nchat:tChat; msg:tSMsg); {check dup} dup:=FindAggr(msg.source^); if assigned(dup) then begin + Dup^.ForceClose; Dup^.Done; end else begin New(dup); end; - writeln('upmgr: init'); Dup^.Init(nchat,msg); end; -- 2.11.4.GIT