3 Implement two-way reliable acked lock-step protocol
6 uses NetAddr
,ServerLoop
,MemStream
;
16 callback
: procedure(msg
:tSMsg
; data
:boolean) of object; {client must maintain active chats}
17 TMhook
: procedure(willwait
:LongWord
) of object;
18 DisposeHook
: procedure of object; {called instead of freeing self}
19 procedure Init(const iremote
:tNetAddr
);
20 procedure AddHeaders(var s
:tMemoryStream
);
21 procedure StreamInit(var s
:tMemoryStream
; l
:word);
22 procedure Send(s
:tMemoryStream
);
23 {the stream can be invalidated, but the buffer must not be modified or freed}
27 txPk
:pointer; txLen
:word; {last sent, not acked msg}
29 procedure InitFrom(const iremote
:tNetAddr
; iopcode
:byte);
32 procedure OnReply(msg
:tSMsg
);
35 type tChatHandler
=procedure(var nchat
:tChat
; msg
:tSMsg
);
36 procedure SetChatHandler(initcode
:byte; handler
:tChatHandler
);
38 { The download manager creates a FileRequest.
39 The FileRequest opens a chat session to the server.
40 upmgr accepts the chat and sends a reply.
41 FileRequest acks; the chat is then closed after TimeWait.
42 upmgr starts the TC transfer.
43 When the transfer has finished, upmgr sends a new Chat to FileRequest.
44 FileRequest acks; the chat is closed on both ends.
45 FileRequest can open a new chat if blocks are missing.
47 => chat msgs must be created with New and are disposed by Chat
48 => there is a TimeWait; nothing references the Chat except the Scheduler, so it disposes itself.
51 { Chats are the HiMsg. Use hash table from ServerLoop, works for HiMsg too. }
{ tChat.Init: set up a chat towards iremote on an opcode that is not in use yet.
  Only part of the body is visible in this view: the loop advances opcode while
  ServerLoop.IsMsgHandled(opcode,remote) reports it as taken, and the result is
  handed to InitFrom. Where remote and the starting opcode get their values is
  not shown here. }
55 procedure tChat
.Init(const iremote
:tNetAddr
);
{ NOTE(review): nothing visible bounds this loop; confirm opcode cannot run past
  the range of its type when every candidate is already handled. }
59 while ServerLoop
.IsMsgHandled(opcode
,remote
) do inc(opcode
);
60 InitFrom(remote
,opcode
);
{ tChat.InitFrom: initialise a chat for a known peer/opcode pair.
  Visible effects: registers OnReply as the message handler for (opcode,remote),
  marks the receive side as already acked and seeds RTT with a default.
  The assignments from iremote/iopcode to the fields are not visible here. }
62 procedure tChat
.InitFrom(const iremote
:tNetAddr
; iopcode
:byte);
66 SetMsgHandler(opcode
,remote
,@OnReply
);
69 rxAcked
:=true; {to not ack pk 0}
{ presumably milliseconds: OnReply recomputes RTT using MsecsPerDay - confirm }
76 RTT
:=200; {a default for timeouts}
86 procedure tCHat
.AddHeaders(var s
:tMemoryStream
);
{ tChat.StreamInit: allocate a fresh buffer of l+5 bytes with GetMem and
  initialise stream s over it.
  s - stream to initialise (overwritten).
  l - payload size requested by the caller.
  Presumably the 5 extra bytes hold the chat header (1-byte opcode plus two
  2-byte sequence words, as read back in OnReply) - confirm against AddHeaders. }
88 procedure tChat
.StreamInit(var s
:tMemoryStream
; l
:word);
90 s
.Init(GetMem(l
+5),0,l
+5);
{ tChat.Send: fill in the sequence header of s, transmit it and arm the
  retransmit timer. Only part of the body is visible in this view; the lines
  that position the stream and store its buffer in txPk/txLen are not shown. }
94 procedure tChat
.Send(s
:tMemoryStream
);
97 //assert(assigned(callback));
{ first header word: sequence number of this outgoing message }
101 s
.WriteWord(txSeq
,2);
{ second header word: piggy-backed ack of the last received message,
  or 0 when there is nothing left to acknowledge }
102 if not rxAcked
then begin
103 s
.WriteWord(rxSeq
,2);
105 end else s
.WriteWord(0,2);
{ transmit the stored packet and schedule Resend after twice the current RTT }
108 ServerLoop
.SendMessage(txPk
^,txLen
,remote
);
109 ServerLoop
.Shedule(RTT
*2,@Resend
);
116 if not rxAcked
then begin
117 s
.Init(GetMem(5),0,5);
120 s
.WriteWord(rxSeq
,2);
121 ServerLoop
.SendMessage(s
.base
^,s
.length
,remote
);
122 FreeMem(s
.base
,s
.length
);
{ tChat.Close: begin shutting the chat down.
  Visible behaviour: when no packet is awaiting an ack (txLen=0), Done is
  scheduled after a delay of 5000 (presumably milliseconds - confirm), and the
  callback is cleared so that it is no longer invoked.
  NOTE(review): the enclosing begin/end lines are not visible, so whether
  callback:=nil runs only inside the txLen=0 branch cannot be told from here. }
127 procedure tChat
.Close
;
132 if txLen
=0 {no packets in flight} then begin
133 Shedule(5000{todo},@Done
); {wait for something lost}
134 callback
:=nil; {avoid calling}
{ tChat.Done: final teardown of the chat.
  Frees the pending packet buffer if one is still held, removes the message
  handler registered for (opcode,remote), and then either calls DisposeHook or,
  when no hook is assigned, releases the memory of the object itself. }
139 procedure tChat
.Done
;
141 {called from sheduler, Done is unsheduled, Resend is not sheduled since ack was received when Done was sheduled}
142 if txLen
>0 then FreeMem(txPk
,txLen
);
143 SetMsgHandler(opcode
,remote
,nil);
{ without a hook the object frees its own storage, so self must not be
  touched after this statement }
144 if assigned(DisposeHook
) then DisposeHook
145 else FreeMem(@self
,sizeof(self
));
{ tChat.Resend: timer callback fired when no ack arrived in time.
  Only part of the body is visible in this view. The visible lines consult
  TMhook while the chat is not closed, call Done on an abort path, test for a
  closed chat whose RTT exceeds 16000, and otherwise retransmit the stored
  packet and schedule Resend again. }
148 procedure tChat
.Resend
;
149 {timeout waiting for ack}
151 {resend and reshedule}
155 if assigned(TMhook
) and (not closed
) then begin
158 Done
; {if hook decided to close then abort}
{ NOTE(review): the body of this branch is not visible; presumably it gives up
  on a closed chat once the timeout has grown too large - confirm }
162 if (RTT
>16000) and closed
then begin
165 {finally resend the msg}
166 ServerLoop
.SendMessage(txPk
^,txLen
,remote
);
{ rescheduled with RTT here, whereas Send uses RTT*2; presumably RTT is
  adjusted in lines not shown - confirm }
167 ServerLoop
.Shedule(RTT
,@Resend
);
{ tChat.OnReply: handler for every packet received on this chat.
  Header layout as read here: 1 opcode byte (skipped), a 2-byte sequence
  number of the data carried (seq) and a 2-byte acknowledged sequence (aseq);
  0 in either word means that part is absent.
  Only part of the body is visible in this view (local declarations and several
  begin/end lines are missing). }
171 procedure tChat
.OnReply(msg
:tSMsg
);
175 msg
.stream
.skip(1{opcode});
176 seq
:=msg
.stream
.ReadWord(2);
177 aseq
:=msg
.stream
.ReadWord(2);
178 if aseq
>0 then {ack of our msg} begin
179 if (aseq
=txSeq
)and(txLen
>0) {it is current} then begin
{ refresh the round-trip estimate from the send timestamp, in milliseconds }
180 if txTime
>0 then RTT
:=Round((Now
-txTime
)*MsecsPerDay
);
{ callback with data=false reports the ack to the owner.
  NOTE(review): the callback runs before UnShedule(@Resend); if the callback
  sends a new message, confirm its freshly scheduled Resend is not cancelled. }
184 if assigned(callback
) then callback(msg
,false);
185 ServerLoop
.UnShedule(@Resend
);
186 end else {write(' old-ack')it is ack of old data, do nothing};
188 if seq
>0 then {some data} begin
{ NOTE(review): plain <= comparison on 2-byte sequence numbers; confirm that
  wrap-around cannot occur within one chat }
189 if seq
<=rxSeq
then {remote didnt get our ack} begin
{ duplicate data: answer with a bare 5-byte ack packet and free it right away }
190 s
.Init(GetMem(5),0,5);
193 s
.WriteWord(rxSeq
,2);
194 ServerLoop
.SendMessage(s
.base
^,s
.length
,remote
);
195 FreeMem(s
.base
,s
.length
);
196 if seq
=rxSeq
then rxacked
:=true;
{ callback with data=true hands received data to the owner; presumably this is
  the new-data branch, its surrounding lines are not visible - confirm }
201 if assigned(callback
) then callback(msg
,true);
{ Table of handlers for incoming chats, indexed by init code 1..32. }
206 var ChatHandlers
: array [1..32] of tChatHandler
;
{ SetChatHandler: register handler for chats that start with initcode.
  Asserts that the slot is still empty, then stores the handler.
  NOTE(review): initcode is a byte and is not checked against 1..32 in the
  visible lines; confirm callers never pass 0 or a value above 32. }
208 procedure SetChatHandler(initcode
:byte; handler
:tChatHandler
);
210 assert(ChatHandlers
[initcode
]=nil);
211 ChatHandlers
[initcode
]:=handler
;
{ OnHiMsg: entry point for a packet that opens a new chat.
  Parses the chat header, validates that it is an initial packet, looks up the
  handler for its init code and creates a tChat for the sender.
  Only part of the body is visible in this view; the call into the handler is
  not shown. }
214 procedure OnHiMsg(msg
:tSMsg
);
215 {new chat was received!}
218 var hnd
:tChatHandler
;
222 opcode
:=msg
.stream
.ReadByte
;
{ NOTE(review): this assert and the raise further down are both triggered by
  data from the remote peer; confirm that is the intended failure mode. }
223 assert(not IsMsgHandled(opcode
,msg
.source
^));
224 seq
:=msg
.stream
.ReadWord(2);
225 aseq
:=msg
.stream
.ReadWord(2);
{ a chat must open with sequence 1 and without any ack }
226 if (seq
<>1)or(aseq
>0) then exit
; {invalid initial state}
{ the first payload byte selects the handler; out-of-range codes are dropped }
227 ix
:=msg
.stream
.ReadByte
;
228 if (ix
<1)or(ix
>high(ChatHandlers
)) then exit
;
229 hnd
:=ChatHandlers
[ix
];
230 if not assigned(hnd
) then raise eXception
.Create('No handler for initcode '+IntToStr(ix
));
231 msg
.stream
.seek(msg
.stream
.position
-1);{unskip the initcode}
{ the chat object is allocated raw with GetMem and set up through InitFrom }
232 nchat
:=GetMem(sizeof(tChat
));
233 nchat
^.InitFrom(msg
.Source
^,opcode
);
{ InitFrom sets rxAcked to true; clearing it here marks the received opening
  packet as still needing an ack }
234 nchat
^.rxacked
:=false;
{ Unit start-up statements: clear the whole handler table, then register
  OnHiMsg with ServerLoop through SetHiMsgHandler. }
240 FillChar(ChatHandlers
,sizeof(chathandlers
),0);
241 ServerLoop
.SetHiMsgHandler(@OnHiMsg
);