Stream Init helper to Chat.
[brdnet.git] / Chat.pas
blob075d1305ae959ad692b1e188fdf3fc142b4ba60e
1 unit Chat;
3 Implement two-way realiable acked lock-step protocol
5 INTERFACE
6 uses NetAddr,ServerLoop,MemStream;
8 type tChat=object
9 remote:tNetAddr;
10 opcode:byte;
11 txSeq:Word;
12 rxSeq:Word;
13 rxAcked:boolean;
14 closed:boolean;
15 RTT:LongWord;{in ms}
16 callback: procedure(msg:tSMsg; data:boolean) of object; {client must maintain active chats}
17 TMhook : procedure(willwait:LongWord ) of object;
18 DisposeHook: procedure of object; {called instead of freeing self}
19 procedure Init(const iremote:tNetAddr);
20 procedure AddHeaders(var s:tMemoryStream);
21 procedure StreamInit(var s:tMemoryStream; l:word);
22 procedure Send(s:tMemoryStream);
23 {the stream can be invalidated, but the buffer must not be modified or freed}
24 procedure Ack;
25 procedure Close;
26 private
27 txPk:pointer; txLen:word; {last sent, not acked msg}
28 txTime:tDateTime;
29 procedure InitFrom(const iremote:tNetAddr; iopcode:byte);
30 procedure Done;
31 procedure Resend;
32 procedure OnReply(msg:tSMsg);
33 end;
35 type tChatHandler=procedure(var nchat:tChat; msg:tSMsg);
36 procedure SetChatHandler(initcode:byte; handler:tChatHandler);
38 { download manager create FileRequest
39 File Request open chat session to server
40 upmgr accepts chat and send reply
41 FileRequest acks, chat is then closed after TimeWait
42 upmgr starts TC transfer
43 transfer finished, upmgr send new Chat to FileRequest
44 FileRequest acks, chat is closed on both ends
45 FileRequest can open new chat if blocks are missing
47 => chat msgs must be created with New, disposed by Chat
48 => there is TimeWait, no references are to the Chat, except Sheduler, it Disposes itself.
51 { Chats are the HiMsg. Use hash table from ServerLoop, works for HiMsg too. }
53 IMPLEMENTATION
54 uses SysUtils;
55 procedure tChat.Init(const iremote:tNetAddr);
56 begin
57 remote:=iremote;
58 opcode:=128;
59 while ServerLoop.IsMsgHandled(opcode,remote) do inc(opcode);
60 InitFrom(remote,opcode);
61 end;
62 procedure tChat.InitFrom(const iremote:tNetAddr; iopcode:byte);
63 begin
64 remote:=iremote;
65 opcode:=iopcode;
66 SetMsgHandler(opcode,remote,@OnReply);
67 txSeq:=0;
68 rxSeq:=0;
69 rxAcked:=true; {to not ack pk 0}
70 closed:=false;
71 txPk:=nil;
72 txLen:=0;
73 callback:=nil;
74 TMhook:=nil;
75 DisposeHook:=nil;
76 RTT:=200; {a default for timeouts}
77 txTime:=0;
78 end;
79 {struct
80 opcode:byte
81 seq:2
82 ack_seq:2
83 data:xx
86 procedure tCHat.AddHeaders(var s:tMemoryStream);
87 begin s.skip(5) end;
88 procedure tChat.StreamInit(var s:tMemoryStream; l:word);
89 begin
90 s.Init(GetMem(l+5),0,l+5);
91 AddHeaders(s);
92 end;
94 procedure tChat.Send(s:tMemoryStream);
95 begin
96 assert(txLen=0);
97 //assert(assigned(callback));
98 Inc(txSeq);
99 s.Seek(0);
100 s.WriteByte(opcode);
101 s.WriteWord(txSeq,2);
102 if not rxAcked then begin
103 s.WriteWord(rxSeq,2);
104 rxAcked:=true;
105 end else s.WriteWord(0,2);
106 txPk:=s.base;
107 txLen:=s.Length;
108 ServerLoop.SendMessage(txPk^,txLen,remote);
109 ServerLoop.Shedule(RTT*2,@Resend);
110 txTime:=Now;
111 end;
113 procedure tChat.Ack;
114 var s:tMemoryStream;
115 begin
116 if not rxAcked then begin
117 s.Init(GetMem(5),0,5);
118 s.WriteByte(opcode);
119 s.WriteWord(0,2);
120 s.WriteWord(rxSeq,2);
121 ServerLoop.SendMessage(s.base^,s.length,remote);
122 FreeMem(s.base,s.length);
123 rxAcked:=true;
124 end;
125 end;
127 procedure tChat.Close;
128 begin
129 assert(not closed);
130 Ack;
131 closed:=true;
132 if txLen=0 {no packets in flight} then begin
133 Shedule(5000{todo},@Done); {wait for something lost}
134 callback:=nil; {avoid calling}
135 tmhook:=nil;
136 end;
137 end;
139 procedure tChat.Done;
140 begin
141 {called from sheduler, Done is unsheduled, Resend is not sheduled since ack was received when Done was sheduled}
142 if txLen>0 then FreeMem(txPk,txLen);
143 SetMsgHandler(opcode,remote,nil);
144 if assigned(DisposeHook) then DisposeHook
145 else FreeMem(@self,sizeof(self));
146 end;
148 procedure tChat.Resend;
149 {timeout waiting for ack}
150 begin
151 {resend and reshedule}
152 Assert(txLen>0);
153 txTime:=0;
154 RTT:=RTT*2;
155 if assigned(TMhook) and (not closed) then begin
156 TMhook(RTT);
157 if closed then begin
158 Done; {if hook decided to close then abort}
159 exit;
160 end;
161 end;
162 if (RTT>16000) and closed then begin
163 Done {give up}
164 end else begin
165 {finally resend the msg}
166 ServerLoop.SendMessage(txPk^,txLen,remote);
167 ServerLoop.Shedule(RTT,@Resend);
168 end;
169 end;
171 procedure tChat.OnReply(msg:tSMsg);
172 var seq,aseq:Word;
173 var s:tMemoryStream;
174 begin
175 msg.stream.skip(1{opcode});
176 seq:=msg.stream.ReadWord(2);
177 aseq:=msg.stream.ReadWord(2);
178 if aseq>0 then {ack of our msg} begin
179 if (aseq=txSeq)and(txLen>0) {it is current} then begin
180 if txTime>0 then RTT:=Round((Now-txTime)*MsecsPerDay);
181 FreeMem(txPk,txLen);
182 TxLen:=0;
183 txPk:=nil;
184 if assigned(callback) then callback(msg,false);
185 ServerLoop.UnShedule(@Resend);
186 end else {write(' old-ack')it is ack of old data, do nothing};
187 end;
188 if seq>0 then {some data} begin
189 if seq<=rxSeq then {remote didnt get our ack} begin
190 s.Init(GetMem(5),0,5);
191 s.WriteByte(opcode);
192 s.WriteWord(0,2);
193 s.WriteWord(rxSeq,2);
194 ServerLoop.SendMessage(s.base^,s.length,remote);
195 FreeMem(s.base,s.length);
196 if seq=rxSeq then rxacked:=true;
197 end else begin
198 {some useful data!}
199 rxSeq:=seq;
200 rxAcked:=false;
201 if assigned(callback) then callback(msg,true);
202 end;
203 end;
204 end;
206 var ChatHandlers: array [1..32] of tChatHandler;
208 procedure SetChatHandler(initcode:byte; handler:tChatHandler);
209 begin
210 assert(ChatHandlers[initcode]=nil);
211 ChatHandlers[initcode]:=handler;
212 end;
214 procedure OnHiMsg(msg:tSMsg);
215 {new chat was received!}
216 var opcode:byte;
217 var seq,aseq:word;
218 var hnd:tChatHandler;
219 var nchat:^tChat;
220 var ix:byte;
221 begin
222 opcode:=msg.stream.ReadByte;
223 assert(not IsMsgHandled(opcode,msg.source^));
224 seq:=msg.stream.ReadWord(2);
225 aseq:=msg.stream.ReadWord(2);
226 if (seq<>1)or(aseq>0) then exit; {invalid initial state}
227 ix:=msg.stream.ReadByte;
228 if (ix<1)or(ix>high(ChatHandlers)) then exit;
229 hnd:=ChatHandlers[ix];
230 if not assigned(hnd) then raise eXception.Create('No handler for initcode '+IntToStr(ix));
231 msg.stream.seek(msg.stream.position-1);{unskip the initcode}
232 nchat:=GetMem(sizeof(tChat));
233 nchat^.InitFrom(msg.Source^,opcode);
234 nchat^.rxacked:=false;
235 nchat^.rxSeq:=1;
236 hnd(nchat^,msg);
237 end;
239 BEGIN
240 FillChar(ChatHandlers,sizeof(chathandlers),0);
241 ServerLoop.SetHiMsgHandler(@OnHiMsg);
242 END.