3 Implement two-way reliable acked lock-step protocol
6 uses NetAddr
,ServerLoop
,MemStream
;
16 callback
: procedure(msg
:tSMsg
; data
:boolean) of object; {client must maintain active chats}
17 TMhook
: procedure(willwait
:LongWord
) of object;
18 procedure Init(const iremote
:tNetAddr
);
19 procedure AddHeaders(var s
:tMemoryStream
);
20 procedure Send(s
:tMemoryStream
);
21 {the stream can be invalidated, but the buffer must not be modified or freed}
25 txPk
:pointer; txLen
:word; {last sent, not acked msg}
27 procedure InitFrom(const iremote
:tNetAddr
; iopcode
:byte);
30 procedure OnReply(msg
:tSMsg
);
33 type tChatHandler
=procedure(var nchat
:tChat
; msg
:tSMsg
);
34 procedure SetChatHandler(initcode
:byte; handler
:tChatHandler
);
36 { The download manager creates a FileRequest.
37 The FileRequest opens a chat session to the server.
38 upmgr accepts the chat and sends a reply.
39 FileRequest acks; the chat is then closed after TimeWait.
40 upmgr starts the TC transfer.
41 When the transfer is finished, upmgr sends a new Chat to FileRequest.
42 FileRequest acks; the chat is closed on both ends.
43 FileRequest can open a new chat if blocks are missing.
45 => chat msgs must be created with New and are disposed by the Chat
46 => there is a TimeWait; nothing holds a reference to the Chat except the scheduler, so it disposes itself.
49 { Chats are the HiMsg. Use hash table from ServerLoop, works for HiMsg too. }
53 procedure tChat
.Init(const iremote
:tNetAddr
);
57 while ServerLoop
.IsMsgHandled(opcode
,remote
) do inc(opcode
);
58 InitFrom(remote
,opcode
);
60 procedure tChat
.InitFrom(const iremote
:tNetAddr
; iopcode
:byte);
65 rxAcked
:=true; {to not ack pk 0}
71 RTT
:=500; {a default for timeouts}
81 procedure tCHat
.AddHeaders(var s
:tMemoryStream
);
84 procedure tChat
.Send(s
:tMemoryStream
);
87 assert(assigned(callback
));
92 if not rxAcked
then begin
95 end else s
.WriteWord(0,2);
98 ServerLoop
.SendMessage(txPk
^,txLen
,remote
);
99 ServerLoop
.Shedule(RTT
*2,@Resend
);
106 if not rxAcked
then begin
107 s
.Init(GetMem(5),0,5);
110 s
.WriteWord(rxSeq
,2);
111 ServerLoop
.SendMessage(s
.base
^,s
.length
,remote
);
112 FreeMem(s
.base
,s
.length
);
117 procedure tChat
.Close
;
121 if txLen
=0 {no packets in flight} then begin
122 Shedule(3000{todo},@Done
); {wait for something lost}
123 callback
:=nil; {avoid calling}
127 procedure tChat
.Done
;
129 {called from the scheduler; Done is unscheduled, and Resend is not scheduled because the ack had already been received when Done was scheduled}
131 SetMsgHandler(opcode
,remote
,nil);
132 FreeMem(@self
,sizeof(self
));
135 procedure tChat
.Resend
;
136 {timeout waiting for ack}
138 {resend and reschedule}
141 if assigned(TMhook
) and (not closed
) then begin
144 Done
; {if hook decided to close then abort}
148 if (RTT
>32000) and closed
150 else if txLen
>0 then begin
151 ServerLoop
.SendMessage(txPk
^,txLen
,remote
);
152 ServerLoop
.Shedule(RTT
*2,@Resend
);
156 procedure tChat
.OnReply(msg
:tSMsg
);
160 msg
.stream
.skip(1{opcode});
161 seq
:=msg
.stream
.ReadWord(2);
162 aseq
:=msg
.stream
.ReadWord(2);
163 if seq
>0 then {some data} begin
164 if seq
<=rxSeq
then {remote didnt get our ack} begin
165 s
.Init(GetMem(5),0,5);
168 s
.WriteWord(rxSeq
,2);
169 ServerLoop
.SendMessage(s
.base
^,s
.length
,remote
);
170 FreeMem(s
.base
,s
.length
);
171 if seq
=rxSeq
then rxacked
:=true;
179 if aseq
>0 then {ack of our msg} begin
180 if (aseq
=rxSeq
)and(txLen
>0) {it is current} then begin
181 if txTime
>0 then RTT
:=Round((Now
-txTime
)*MsecsPerDay
);
186 end else {it is ack of old data, do nothing};
190 var ChatHandlers
: array [1..32] of tChatHandler
;
192 procedure SetChatHandler(initcode
:byte; handler
:tChatHandler
);
194 assert(ChatHandlers
[initcode
]=nil);
195 ChatHandlers
[initcode
]:=handler
;
198 procedure OnHiMsg(msg
:tSMsg
);
199 {new chat was received!}
202 var hnd
:tChatHandler
;
206 opcode
:=msg
.stream
.ReadByte
;
207 assert(not IsMsgHandled(opcode
,msg
.source
^));
208 seq
:=msg
.stream
.ReadWord(2);
209 aseq
:=msg
.stream
.ReadWord(2);
210 if (seq
<>1)or(aseq
>0) then exit
; {invalid initial state: a new chat must start with seq=1 and carry no ack, so reject if either check fails}
211 ix
:=msg
.stream
.ReadByte
;
212 if (ix
<1)or(ix
>high(ChatHandlers
)) then exit
;
213 hnd
:=ChatHandlers
[ix
];
214 if not assigned(hnd
) then raise eXception
.Create('No handler for initcode '+IntToStr(ix
));
215 msg
.stream
.seek(msg
.stream
.position
-1);{unskip the initcode}
217 nchat
^.InitFrom(msg
.Source
^,opcode
);
222 FillChar(ChatHandlers
,sizeof(chathandlers
),0);
223 ServerLoop
.SetHiMsgHandler(@OnHiMsg
);