3 Implement two-way reliable acked lock-step protocol
6 uses NetAddr
,ServerLoop
,MemStream
;
16 callback
: procedure(msg
:tSMsg
; data
:boolean) of object; {client must maintain active chats}
17 TMhook
: procedure(willwait
:LongWord
) of object;
18 DisposeHook
: procedure of object; {called instead of freeing self}
19 procedure Init(const iremote
:tNetAddr
);
20 procedure AddHeaders(var s
:tMemoryStream
);
21 procedure StreamInit(var s
:tMemoryStream
; l
:word);
22 procedure Send(s
:tMemoryStream
);
23 {the stream can be invalidated, but the buffer must not be modified or freed}
27 txPk
:pointer; txLen
:word; {last sent, not acked msg}
29 procedure InitFrom(const iremote
:tNetAddr
; iopcode
:byte);
32 procedure OnReply(msg
:tSMsg
);
35 type tChatHandler
=procedure(var nchat
:tChat
; msg
:tSMsg
);
36 procedure SetChatHandler(initcode
:byte; handler
:tChatHandler
);
38 { download manager creates FileRequest
39 FileRequest opens chat session to server
40 upmgr accepts chat and sends reply
41 FileRequest acks, chat is then closed after TimeWait
42 upmgr starts TC transfer
43 transfer finished, upmgr sends new Chat to FileRequest
44 FileRequest acks, chat is closed on both ends
45 FileRequest can open new chat if blocks are missing
47 => chat msgs must be created with New, and are disposed by Chat
48 => there is a TimeWait; during it nothing references the Chat except the scheduler, so the Chat disposes itself.
51 { Chats are the HiMsg. Use hash table from ServerLoop, works for HiMsg too. }
55 procedure tChat
.Init(const iremote
:tNetAddr
);
58 opcode
:=128+Random(128);
59 while ServerLoop
.IsMsgHandled(opcode
,remote
) do inc(opcode
);
60 InitFrom(remote
,opcode
);
62 procedure tChat
.InitFrom(const iremote
:tNetAddr
; iopcode
:byte);
66 SetMsgHandler(opcode
,remote
,@OnReply
);
69 rxAcked
:=true; {to not ack pk 0}
76 RTT
:=200; {a default for timeouts}
86 procedure tCHat
.AddHeaders(var s
:tMemoryStream
);
88 procedure tChat
.StreamInit(var s
:tMemoryStream
; l
:word);
90 s
.Init(GetMem(l
+5),0,l
+5);
94 procedure tChat
.Send(s
:tMemoryStream
);
100 //assert(assigned(callback));
104 s
.WriteWord(txSeq
,2);
105 if not rxAcked
then begin
106 s
.WriteWord(rxSeq
,2);
108 end else s
.WriteWord(0,2);
111 ServerLoop
.SendMessage(txPk
^,txLen
,remote
);
112 ServerLoop
.Shedule(RTT
*2,@Resend
);
119 if not rxAcked
then begin
120 s
.Init(GetMem(5),0,5);
123 s
.WriteWord(rxSeq
,2);
124 ServerLoop
.SendMessage(s
.base
^,s
.length
,remote
);
125 FreeMem(s
.base
,s
.length
);
130 procedure tChat
.Close
;
135 callback
:=nil; {avoid calling}
137 //writeln('Chat: closing');
138 if txLen
=0 {no packets in flight} then begin
139 Shedule(15000{todo},@Done
); {wait for something lost}
143 procedure tChat
.Done
;
145 if txLen
>0 then FreeMem(txPk
,txLen
);
146 SetMsgHandler(opcode
,remote
,nil);
147 if assigned(DisposeHook
) then DisposeHook
148 else FreeMem(@self
,sizeof(self
));
149 //writeln('Chat: closed');
152 procedure tChat
.Resend
;
153 {timeout waiting for ack}
155 {resend and reschedule}
156 if txLen
=0 then exit
;
158 if RTT
<1 then RTT
:=2;
160 if assigned(TMhook
) and (not closed
) then begin
163 Done
; {if hook decided to close then abort}
167 if closed
and (RTT
<400) then RTT
:=400;
168 if (RTT
>=5000) and closed
then begin
171 {finally resend the msg}
172 //writeln('Chat: retry');
173 ServerLoop
.SendMessage(txPk
^,txLen
,remote
);
174 ServerLoop
.Shedule(RTT
,@Resend
);
178 procedure tChat
.OnReply(msg
:tSMsg
);
182 msg
.stream
.skip(1{opcode});
183 seq
:=msg
.stream
.ReadWord(2);
184 aseq
:=msg
.stream
.ReadWord(2);
185 if aseq
>0 then {ack of our msg} begin
186 if (aseq
=txSeq
)and(txLen
>0) {it is current} then begin
187 if txTime
>0 then RTT
:=Round((Now
-txTime
)*MsecsPerDay
);
190 if Closed
then Shedule(5,@Done
);
193 if assigned(callback
) then callback(msg
,false);
194 end else {write(' old-ack')it is ack of old data, do nothing};
196 if seq
>0 then {some data} begin
197 if seq
<=rxSeq
then {remote didnt get our ack} begin
198 s
.Init(GetMem(5),0,5);
201 s
.WriteWord(rxSeq
,2);
202 ServerLoop
.SendMessage(s
.base
^,s
.length
,remote
);
203 FreeMem(s
.base
,s
.length
);
204 if seq
=rxSeq
then rxacked
:=true;
209 if assigned(callback
) then callback(msg
,true);
214 var ChatHandlers
: array [1..32] of tChatHandler
;
216 procedure SetChatHandler(initcode
:byte; handler
:tChatHandler
);
218 assert(ChatHandlers
[initcode
]=nil);
219 ChatHandlers
[initcode
]:=handler
;
222 procedure OnHiMsg(msg
:tSMsg
);
223 {new chat was received!}
226 var hnd
:tChatHandler
;
230 opcode
:=msg
.stream
.ReadByte
;
231 assert(not IsMsgHandled(opcode
,msg
.source
^));
232 seq
:=msg
.stream
.ReadWord(2);
233 aseq
:=msg
.stream
.ReadWord(2);
234 if (seq
<>1)or(aseq
>0) then exit
; {invalid initial state}
235 ix
:=msg
.stream
.ReadByte
;
236 if (ix
<1)or(ix
>high(ChatHandlers
)) then exit
;
237 hnd
:=ChatHandlers
[ix
];
238 if not assigned(hnd
) then raise eXception
.Create('No handler for initcode '+IntToStr(ix
));
239 msg
.stream
.seek(msg
.stream
.position
-1);{unskip the initcode}
240 nchat
:=GetMem(sizeof(tChat
));
241 nchat
^.InitFrom(msg
.Source
^,opcode
);
242 nchat
^.rxacked
:=false;
248 FillChar(ChatHandlers
,sizeof(chathandlers
),0);
249 ServerLoop
.SetHiMsgHandler(@OnHiMsg
);