3 Implement two-way reliable acked lock-step protocol
6 uses NetAddr
,ServerLoop
,MemStream
;
16 callback
: procedure(msg
:tSMsg
; data
:boolean) of object; {client must maintain active chats}
17 TMhook
: procedure(willwait
:LongWord
) of object;
18 DisposeHook
: procedure of object; {called instead of freeing self}
19 procedure Init(const iremote
:tNetAddr
);
20 procedure AddHeaders(var s
:tMemoryStream
);
21 procedure Send(s
:tMemoryStream
);
22 {the stream can be invalidated, but the buffer must not be modified or freed}
26 txPk
:pointer; txLen
:word; {last sent, not acked msg}
28 procedure InitFrom(const iremote
:tNetAddr
; iopcode
:byte);
31 procedure OnReply(msg
:tSMsg
);
34 type tChatHandler
=procedure(var nchat
:tChat
; msg
:tSMsg
);
35 procedure SetChatHandler(initcode
:byte; handler
:tChatHandler
);
37 { download manager creates a FileRequest
38 FileRequest opens a chat session to the server
39 upmgr accepts the chat and sends a reply
40 FileRequest acks, chat is then closed after TimeWait
41 upmgr starts the TC transfer
42 transfer finished, upmgr sends a new Chat to FileRequest
43 FileRequest acks, chat is closed on both ends
44 FileRequest can open a new chat if blocks are missing
46 => chat msgs must be created with New and are disposed by Chat
47 => there is a TimeWait; nothing references the Chat except the scheduler, so it disposes itself.
50 { Chats are the HiMsg. Use hash table from ServerLoop, works for HiMsg too. }
54 procedure tChat
.Init(const iremote
:tNetAddr
);
58 while ServerLoop
.IsMsgHandled(opcode
,remote
) do inc(opcode
);
59 InitFrom(remote
,opcode
);
61 procedure tChat
.InitFrom(const iremote
:tNetAddr
; iopcode
:byte);
65 SetMsgHandler(opcode
,remote
,@OnReply
);
68 rxAcked
:=true; {to not ack pk 0}
75 RTT
:=200; {a default for timeouts}
85 procedure tCHat
.AddHeaders(var s
:tMemoryStream
);
88 procedure tChat
.Send(s
:tMemoryStream
);
91 //assert(assigned(callback));
96 if not rxAcked
then begin
99 end else s
.WriteWord(0,2);
102 ServerLoop
.SendMessage(txPk
^,txLen
,remote
);
103 ServerLoop
.Shedule(RTT
*2,@Resend
);
110 if not rxAcked
then begin
111 s
.Init(GetMem(5),0,5);
114 s
.WriteWord(rxSeq
,2);
115 ServerLoop
.SendMessage(s
.base
^,s
.length
,remote
);
116 FreeMem(s
.base
,s
.length
);
121 procedure tChat
.Close
;
126 if txLen
=0 {no packets in flight} then begin
127 Shedule(5000{todo},@Done
); {wait for something lost}
128 callback
:=nil; {avoid calling}
133 procedure tChat
.Done
;
135 {called from the scheduler: Done is unscheduled, and Resend is not scheduled because the ack had been received when Done was scheduled}
136 if txLen
>0 then FreeMem(txPk
,txLen
);
137 SetMsgHandler(opcode
,remote
,nil);
138 if assigned(DisposeHook
) then DisposeHook
139 else FreeMem(@self
,sizeof(self
));
142 procedure tChat
.Resend
;
143 {timeout waiting for ack}
145 {resend and reshedule}
149 if assigned(TMhook
) and (not closed
) then begin
152 Done
; {if hook decided to close then abort}
156 if (RTT
>16000) and closed
then begin
159 {finally resend the msg}
160 ServerLoop
.SendMessage(txPk
^,txLen
,remote
);
161 ServerLoop
.Shedule(RTT
,@Resend
);
165 procedure tChat
.OnReply(msg
:tSMsg
);
169 msg
.stream
.skip(1{opcode});
170 seq
:=msg
.stream
.ReadWord(2);
171 aseq
:=msg
.stream
.ReadWord(2);
172 if aseq
>0 then {ack of our msg} begin
173 if (aseq
=txSeq
)and(txLen
>0) {it is current} then begin
174 if txTime
>0 then RTT
:=Round((Now
-txTime
)*MsecsPerDay
);
178 if assigned(callback
) then callback(msg
,false);
179 ServerLoop
.UnShedule(@Resend
);
180 end else {write(' old-ack')it is ack of old data, do nothing};
182 if seq
>0 then {some data} begin
183 if seq
<=rxSeq
then {remote didnt get our ack} begin
184 s
.Init(GetMem(5),0,5);
187 s
.WriteWord(rxSeq
,2);
188 ServerLoop
.SendMessage(s
.base
^,s
.length
,remote
);
189 FreeMem(s
.base
,s
.length
);
190 if seq
=rxSeq
then rxacked
:=true;
195 if assigned(callback
) then callback(msg
,true);
200 var ChatHandlers
: array [1..32] of tChatHandler
;
202 procedure SetChatHandler(initcode
:byte; handler
:tChatHandler
);
204 assert(ChatHandlers
[initcode
]=nil);
205 ChatHandlers
[initcode
]:=handler
;
208 procedure OnHiMsg(msg
:tSMsg
);
209 {new chat was received!}
212 var hnd
:tChatHandler
;
216 opcode
:=msg
.stream
.ReadByte
;
217 assert(not IsMsgHandled(opcode
,msg
.source
^));
218 seq
:=msg
.stream
.ReadWord(2);
219 aseq
:=msg
.stream
.ReadWord(2);
220 if (seq
<>1)or(aseq
>0) then exit
; {invalid initial state}
221 ix
:=msg
.stream
.ReadByte
;
222 if (ix
<1)or(ix
>high(ChatHandlers
)) then exit
;
223 hnd
:=ChatHandlers
[ix
];
224 if not assigned(hnd
) then raise eXception
.Create('No handler for initcode '+IntToStr(ix
));
225 msg
.stream
.seek(msg
.stream
.position
-1);{unskip the initcode}
226 nchat
:=GetMem(sizeof(tChat
));
227 nchat
^.InitFrom(msg
.Source
^,opcode
);
228 nchat
^.rxacked
:=false;
234 FillChar(ChatHandlers
,sizeof(chathandlers
),0);
235 ServerLoop
.SetHiMsgHandler(@OnHiMsg
);