3 Implement two-way reliable acked lock-step protocol
6 uses NetAddr
,ServerLoop
,MemStream
;
16 Callback
: procedure(msg
:tSMsg
; data
:boolean) of object; {client must maintain active chats}
17 OnTimeout
: procedure of object;
18 OnDispose
: procedure of object;
19 procedure Init(const iremote
:tNetAddr
);
20 procedure SetTimeout(acktm
,repltm
:LongInt);
21 procedure AddHeaders(var s
:tMemoryStream
);
22 procedure StreamInit(var s
:tMemoryStream
; l
:word);
23 procedure Send(s
:tMemoryStream
);
24 {the stream can be invalidated, but the buffer must not be modified or freed}
28 txPk
:pointer; txLen
:word; {last sent, not acked msg}
30 tmAck
,tmReply
:LongWord
;{ms}
31 procedure InitFrom(const iremote
:tNetAddr
; iopcode
:byte);
34 procedure OnReply(msg
:tSMsg
);
35 procedure ReplyTimeout
;
38 type tChatHandler
=procedure(var nchat
:tChat
; msg
:tSMsg
);
39 procedure SetChatHandler(initcode
:byte; handler
:tChatHandler
);
41 { download manager creates FileRequest
42 FileRequest opens chat session to server
43 upmgr accepts chat and sends reply
44 FileRequest acks, chat is then closed after TimeWait
45 upmgr starts TC transfer
46 transfer finished, upmgr sends new Chat to FileRequest
47 FileRequest acks, chat is closed on both ends
48 FileRequest can open new chat if blocks are missing
50 => chat msgs must be created with New, disposed by Chat
51 => there is TimeWait; no references to the Chat remain except from the scheduler (Shedule); it Disposes itself.
54 { Chats are the HiMsg. Use hash table from ServerLoop, works for HiMsg too. }
58 procedure tChat
.Init(const iremote
:tNetAddr
);
61 opcode
:=128+Random(128);
62 while ServerLoop
.IsMsgHandled(opcode
,remote
) do inc(opcode
);
63 InitFrom(remote
,opcode
);
65 procedure tChat
.InitFrom(const iremote
:tNetAddr
; iopcode
:byte);
69 SetMsgHandler(opcode
,remote
,@OnReply
);
72 rxAcked
:=true; {to not ack pk 0}
79 RTT
:=200; {a default for timeouts}
91 procedure tCHat
.AddHeaders(var s
:tMemoryStream
);
93 procedure tChat
.StreamInit(var s
:tMemoryStream
; l
:word);
95 s
.Init(GetMem(l
+5),0,l
+5);
98 procedure tChat
.SetTimeout(acktm
,repltm
:LongInt);
100 assert(assigned(OnTimeout
));
105 procedure tChat
.Send(s
:tMemoryStream
);
107 if txLen
>0 then begin
111 //assert(assigned(callback));
115 s
.WriteWord(txSeq
,2);
116 if not rxAcked
then begin
117 s
.WriteWord(rxSeq
,2);
119 end else s
.WriteWord(0,2);
122 ServerLoop
.SendMessage(txPk
^,txLen
,remote
);
123 ServerLoop
.Shedule(RTT
*2,@Resend
);
130 if not rxAcked
then begin
131 s
.Init(GetMem(5),0,5);
134 s
.WriteWord(rxSeq
,2);
135 ServerLoop
.SendMessage(s
.base
^,s
.length
,remote
);
136 FreeMem(s
.base
,s
.length
);
138 if assigned(OnTimeout
) and (tmReply
>0) then Shedule(tmReply
,@ReplyTimeout
);
142 procedure tChat
.Close
;
147 callback
:=nil; {avoid calling}
149 UnShedule(@ReplyTimeout
); {fuck it}
150 //writeln('Chat: closing');
151 if txLen
=0 {no packets in flight} then begin
152 Shedule(5000{todo},@Done
); {wait for something lost}
156 procedure tChat
.Done
;
158 if txLen
>0 then FreeMem(txPk
,txLen
);
159 SetMsgHandler(opcode
,remote
,nil);
161 UnShedule(@ReplyTimeout
);
162 if assigned(OnDispose
) then OnDispose
163 else FreeMem(@self
,sizeof(self
));
164 //writeln('Chat: closed');
167 procedure tChat
.Resend
;
168 {timeout waiting for ack}
170 {check for timeout and closed}
171 if RTT
<1 then RTT
:=2; RTT
:=RTT
*2;
172 if closed
and (RTT
>5000) then begin
176 if (not closed
) and (tmAck
>0) and (RTT
>tmAck
) then begin
177 if assigned(ontimeout
) then OnTimeout
;
182 //writeln('Chat: retry');
183 ServerLoop
.SendMessage(txPk
^,txLen
,remote
);
186 ServerLoop
.Shedule(RTT
,@Resend
);
189 procedure tChat
.OnReply(msg
:tSMsg
);
193 msg
.stream
.skip(1{opcode});
194 seq
:=msg
.stream
.ReadWord(2);
195 aseq
:=msg
.stream
.ReadWord(2);
196 if aseq
>0 then {ack of our msg} begin
197 if (aseq
=txSeq
)and(txLen
>0) {it is current} then begin
198 if txTime
>0 then RTT
:=Round((Now
-txTime
)*MsecsPerDay
);
201 if Closed
then Shedule(5,@Done
);{wtf?}
204 if assigned(callback
) then callback(msg
,false);
205 if assigned(OnTimeout
) and (tmReply
>0) then Shedule(tmReply
,@ReplyTimeout
);
206 end else {write(' old-ack')it is ack of old data, do nothing};
208 if seq
>0 then {some data} begin
209 if seq
<=rxSeq
then {remote didnt get our ack} begin
210 s
.Init(GetMem(5),0,5);
213 s
.WriteWord(rxSeq
,2);
214 ServerLoop
.SendMessage(s
.base
^,s
.length
,remote
);
215 FreeMem(s
.base
,s
.length
);
216 if seq
=rxSeq
then rxacked
:=true;
221 UnShedule(@ReplyTimeout
);
222 if assigned(callback
) then callback(msg
,true);
227 procedure tChat
.ReplyTimeout
;
229 assert(assigned(OnTimeout
));
234 var ChatHandlers
: array [1..32] of tChatHandler
;
236 procedure SetChatHandler(initcode
:byte; handler
:tChatHandler
);
238 assert(ChatHandlers
[initcode
]=nil);
239 ChatHandlers
[initcode
]:=handler
;
242 procedure OnHiMsg(msg
:tSMsg
);
243 {new chat was received!}
246 var hnd
:tChatHandler
;
250 opcode
:=msg
.stream
.ReadByte
;
251 assert(not IsMsgHandled(opcode
,msg
.source
^));
252 seq
:=msg
.stream
.ReadWord(2);
253 aseq
:=msg
.stream
.ReadWord(2);
254 if (seq
<>1)or(aseq
>0) then exit
; {invalid initial state}
255 ix
:=msg
.stream
.ReadByte
;
256 if (ix
<1)or(ix
>high(ChatHandlers
)) then exit
;
257 hnd
:=ChatHandlers
[ix
];
258 if not assigned(hnd
) then raise eXception
.Create('No handler for initcode '+IntToStr(ix
));
259 msg
.stream
.seek(msg
.stream
.position
-1);{unskip the initcode}
260 nchat
:=GetMem(sizeof(tChat
));
261 nchat
^.InitFrom(msg
.Source
^,opcode
);
262 nchat
^.rxacked
:=false;
268 FillChar(ChatHandlers
,sizeof(chathandlers
),0);
269 ServerLoop
.SetHiMsgHandler(@OnHiMsg
);