Static DHT bootstrap from cmdline opt -boot.
[brdnet.git] / Chat.pas
bloba8e4df42a4452dfc1faade784fffd72573697b9a
1 unit Chat;
{ Implements a two-way, reliable, acknowledged lock-step protocol. }
5 INTERFACE
6 uses NetAddr,ServerLoop,MemStream;
type
  { One end of a chat: a lock-step, acknowledged message exchange with a
    single remote address, identified by an opcode. }
  tChat = object
    remote   : tNetAddr;   {address of the peer}
    opcode   : byte;       {first byte of every packet of this chat}
    txSeq    : Word;       {sequence number of the last message passed to Send}
    rxSeq    : Word;       {sequence number of the last data message accepted}
    rxAcked  : boolean;    {false while rxSeq still has to be acknowledged}
    closed   : boolean;    {set by Close}
    RTT      : LongWord;   {in ms}
    { Invoked from OnReply: data=false when the last sent message was acked,
      data=true when a new data message arrived.
      Client must maintain active chats. }
    Callback : procedure(msg:tSMsg; data:boolean) of object;
    OnTimeout: procedure of object;   {invoked on ack timeout and on reply timeout}
    OnDispose: procedure of object;   {if assigned, called by Done instead of freeing self}

    procedure Init(const iremote:tNetAddr);
    procedure SetTimeout(acktm,repltm:LongInt);
    procedure AddHeaders(var s:tMemoryStream);
    procedure StreamInit(var s:tMemoryStream; l:word);
    procedure Send(s:tMemoryStream);
      {the stream can be invalidated, but the buffer must not be modified or freed}
    procedure Ack;
    procedure Close;

  private
    txPk   : pointer;      {last sent, not acked msg}
    txLen  : word;         {length of txPk; 0 means nothing is in flight}
    txTime : tDateTime;    {time of the last transmission of txPk}
    tmAck,
    tmReply: LongWord;     {ms}

    procedure InitFrom(const iremote:tNetAddr; iopcode:byte);
    procedure Done;
    procedure Resend;
    procedure OnReply(msg:tSMsg);
    procedure ReplyTimeout;
  end;
type
  { Called by the unit when a remote peer opens a new chat; nchat is the
    freshly initialised chat and msg the message that opened it. }
  tChatHandler = procedure(var nchat:tChat; msg:tSMsg);

{ Registers handler for chats whose first data byte equals initcode. }
procedure SetChatHandler(initcode:byte; handler:tChatHandler);
{ Example life cycle:
  the download manager creates a FileRequest
  FileRequest opens a chat session to the server
  upmgr accepts the chat and sends a reply
  FileRequest acks, the chat is then closed after TimeWait
  upmgr starts the TC transfer
  transfer finished, upmgr sends a new Chat to FileRequest
  FileRequest acks, the chat is closed on both ends
  FileRequest can open a new chat if blocks are missing

  => chat msgs must be created with New and are disposed by Chat
  => there is a TimeWait; no references to the Chat remain except the
     scheduler's, so the Chat disposes itself.
}
54 { Chats are the HiMsg. Use hash table from ServerLoop, works for HiMsg too. }
56 IMPLEMENTATION
57 uses SysUtils;
{ Initialises an outgoing chat to iremote.

  A random opcode in 128..255 is chosen; if ServerLoop already has a handler
  for that opcode and remote, the following opcodes are probed. Probing wraps
  around inside 128..255 (it never leaves the range the first pick was drawn
  from) and gives up after every opcode of the range was tried once.

  Raises Exception when all 128 opcodes are already in use for iremote;
  in that case the object is left untouched. }
procedure tChat.Init(const iremote:tNetAddr);
  const
    cFirstOpcode = 128;
    cLastOpcode  = 255;
    cOpcodeCount = cLastOpcode-cFirstOpcode+1;
  var
    candidate: byte;
    tried: integer;
begin
  candidate:=cFirstOpcode+Random(cOpcodeCount);
  tried:=1;
  while ServerLoop.IsMsgHandled(candidate,iremote) do begin
    if tried>=cOpcodeCount then
      raise Exception.Create('Chat: no free opcode for remote');
    {advance without overflowing the byte and without leaving the range}
    if candidate=cLastOpcode
      then candidate:=cFirstOpcode
      else Inc(candidate);
    Inc(tried);
  end;
  {InitFrom stores remote and opcode and registers the message handler}
  InitFrom(iremote,candidate);
end;
{ Common initialisation: binds the chat to (iremote, iopcode), registers
  OnReply as the ServerLoop handler for that pair and resets all state. }
procedure tChat.InitFrom(const iremote:tNetAddr; iopcode:byte);
begin
  {identity and message routing}
  remote:=iremote;
  opcode:=iopcode;
  SetMsgHandler(opcode,remote,@OnReply);

  {sequence state}
  closed:=false;
  rxSeq:=0;
  txSeq:=0;
  rxAcked:=true; {to not ack pk 0}

  {nothing in flight}
  txLen:=0;
  txPk:=nil;
  txTime:=0;

  {timing: timeouts disabled until SetTimeout is called}
  tmReply:=0;
  tmAck:=0;
  RTT:=200; {a default for timeouts}

  {no client hooks yet}
  OnDispose:=nil;
  OnTimeout:=nil;
  Callback:=nil;
end;
{ Packet layout (struct):
  opcode:byte
  seq:2
  ack_seq:2
  data:xx
}
{ Reserves room for the chat header at the current position of s by
  skipping 5 bytes (opcode + seq + ack_seq); Send fills them in later. }
procedure tChat.AddHeaders(var s:tMemoryStream);
begin
  s.Skip(5);
end;
{ Allocates a buffer for l payload bytes plus the 5 header bytes,
  initialises s over it and skips the header area. }
procedure tChat.StreamInit(var s:tMemoryStream; l:word);
  var total: LongInt;
begin
  total:=l+5;
  s.Init(GetMem(total),0,total);
  AddHeaders(s);
end;
{ Sets the ack timeout (acktm) and the reply timeout (repltm), both in ms.
  OnTimeout must be assigned before calling this. }
procedure tChat.SetTimeout(acktm,repltm:LongInt);
begin
  Assert(Assigned(OnTimeout));
  tmReply:=repltm;
  tmAck:=acktm;
end;
{ Sends the message held in s as the next data packet of the chat.

  The 5 header bytes at the start of the buffer are filled in here. The
  chat keeps s.base as the in-flight packet and frees it later, so the
  caller must not modify or free the buffer. A pending ack for received
  data is piggybacked on the packet. }
procedure tChat.Send(s:tMemoryStream);
  var ackSeq: Word;
begin
  {drop a previous packet that is still waiting for its ack}
  if txLen>0 then begin
    FreeMem(txPk,txLen);
    UnShedule(@Resend);
  end;
  //assert(assigned(callback));

  {ack_seq is 0 unless received data still has to be acknowledged}
  if rxAcked
    then ackSeq:=0
    else ackSeq:=rxSeq;
  rxAcked:=true;

  {write the header over the reserved area}
  Inc(txSeq);
  s.Seek(0);
  s.WriteByte(opcode);
  s.WriteWord(txSeq,2);
  s.WriteWord(ackSeq,2);

  {remember the packet for retransmission and send it}
  txPk:=s.base;
  txLen:=s.Length;
  ServerLoop.SendMessage(txPk^,txLen,remote);
  ServerLoop.Shedule(RTT*2,@Resend);
  txTime:=Now;
end;
{ Sends a bare acknowledgement (seq=0, ack_seq=rxSeq) for the last received
  data message, unless it was acknowledged already. Afterwards the reply
  timeout is armed when one is configured. }
procedure tChat.Ack;
  var pk: tMemoryStream;
begin
  if rxAcked then exit;

  pk.Init(GetMem(5),0,5);
  pk.WriteByte(opcode);
  pk.WriteWord(0,2);
  pk.WriteWord(rxSeq,2);
  ServerLoop.SendMessage(pk.base^,pk.length,remote);
  FreeMem(pk.base,pk.length);

  rxAcked:=true;
  if (tmReply>0) and Assigned(OnTimeout) then
    Shedule(tmReply,@ReplyTimeout);
end;
{ Closes the chat. Pending received data is acknowledged, client hooks are
  detached and the reply timeout is cancelled. With no packet in flight,
  Done is scheduled after a wait; otherwise teardown is left to OnReply
  (ack arrives) or Resend (retries exhausted). Must not be called twice. }
procedure tChat.Close;
begin
  Assert(not closed);
  Ack;
  closed:=true;
  {avoid calling the client from now on}
  Callback:=nil;
  OnTimeout:=nil;
  {Ack may just have armed the reply timeout; it is not wanted any more}
  UnShedule(@ReplyTimeout);
  //writeln('Chat: closing');
  if txLen=0 then begin
    {no packets in flight: wait for something lost, then dispose}
    Shedule(5000{todo},@Done);
  end;
end;
{ Tears the chat down: frees the in-flight packet, removes the message
  handler, cancels the scheduled Resend and ReplyTimeout, and finally either
  calls OnDispose or, when that is not assigned, frees the object itself. }
procedure tChat.Done;
begin
  if txLen>0 then
    FreeMem(txPk,txLen);
  SetMsgHandler(opcode,remote,nil);
  UnShedule(@Resend);
  UnShedule(@ReplyTimeout);
  if not Assigned(OnDispose) then
    FreeMem(@self,SizeOf(self))
  else
    OnDispose;
  //writeln('Chat: closed');
end;
{ Scheduler callback: the ack for the in-flight packet did not arrive in time.
  RTT is doubled on every call and doubles as the back-off interval.
  A closed chat gives up once RTT exceeds 5000 ms; an open chat gives up once
  RTT exceeds tmAck (when set), notifying OnTimeout first. Otherwise the
  packet is sent again and the next retry is scheduled. }
procedure tChat.Resend;
begin
  {back off}
  if RTT<1 then RTT:=2;
  RTT:=RTT*2;

  {check for timeout and closed}
  if closed then begin
    if RTT>5000 then begin
      Done;
      exit;
    end;
  end
  else if (tmAck>0) and (RTT>tmAck) then begin
    if Assigned(OnTimeout) then OnTimeout;
    Done;
    exit;
  end;

  {resend and reschedule}
  //writeln('Chat: retry');
  ServerLoop.SendMessage(txPk^,txLen,remote);
  txTime:=Now;
  ServerLoop.Shedule(RTT,@Resend);
end;
{ ServerLoop handler for every packet of this chat.

  The packet may carry an ack (ack_seq>0), data (seq>0), or both.
  - An ack matching the in-flight packet updates RTT, releases the packet,
    stops retransmission, notifies Callback(msg,false) and arms the reply
    timeout; on a closed chat it also schedules Done.
  - Data with seq<=rxSeq is a retransmission: the ack for rxSeq is repeated.
  - Data with a higher seq is new: it is recorded as unacknowledged and
    passed to Callback(msg,true). }
procedure tChat.OnReply(msg:tSMsg);
  var
    seq,aseq: Word;
    pk: tMemoryStream;
begin
  msg.stream.Skip(1{opcode});
  seq:=msg.stream.ReadWord(2);
  aseq:=msg.stream.ReadWord(2);

  {ack of our current msg; acks of old data are ignored}
  if (aseq>0) and (aseq=txSeq) and (txLen>0) then begin
    if txTime>0 then
      RTT:=Round((Now-txTime)*MsecsPerDay);
    FreeMem(txPk,txLen);
    txPk:=nil;
    txLen:=0;
    UnShedule(@Resend);
    if closed then
      Shedule(5,@Done); {the last packet of a closed chat was delivered}
    if Assigned(Callback) then
      Callback(msg,false);
    if (tmReply>0) and Assigned(OnTimeout) then
      Shedule(tmReply,@ReplyTimeout);
  end;

  {no data in this packet}
  if seq=0 then exit;

  if seq>rxSeq then begin
    {some useful data!}
    rxSeq:=seq;
    rxAcked:=false;
    UnShedule(@ReplyTimeout);
    if Assigned(Callback) then
      Callback(msg,true);
  end
  else begin
    {remote didnt get our ack: repeat it}
    pk.Init(GetMem(5),0,5);
    pk.WriteByte(opcode);
    pk.WriteWord(0,2);
    pk.WriteWord(rxSeq,2);
    ServerLoop.SendMessage(pk.base^,pk.length,remote);
    FreeMem(pk.base,pk.length);
    if seq=rxSeq then
      rxAcked:=true;
  end;
end;
{ Scheduler callback: the peer did not send the expected reply within
  tmReply. Forwards the event to OnTimeout, which must be assigned. }
procedure tChat.ReplyTimeout;
begin
  Assert(Assigned(OnTimeout));
  OnTimeout;
  {...}
end;
{ Handlers for incoming chats, indexed by the initcode carried in the first
  data byte of the chat-opening message. }
var ChatHandlers: array [1..32] of tChatHandler;

{ Registers handler for incoming chats that start with initcode.

  initcode must lie within the bounds of ChatHandlers; anything else raises
  Exception instead of reading and writing outside the table.
  Registering a second handler for the same initcode trips the assertion. }
procedure SetChatHandler(initcode:byte; handler:tChatHandler);
begin
  if (initcode<Low(ChatHandlers)) or (initcode>High(ChatHandlers)) then
    raise Exception.Create('Chat initcode out of range: '+IntToStr(initcode));
  assert(ChatHandlers[initcode]=nil);
  ChatHandlers[initcode]:=handler;
end;
{ ServerLoop HiMsg handler: a message arrived that may open a new chat.

  The message is accepted only in the initial state (seq=1, ack_seq=0) and
  with an initcode inside the handler table. A chat object is then allocated,
  bound to the sender and the received opcode, marked as holding one
  unacknowledged message, and handed to the registered handler together with
  the message (stream positioned at the initcode).

  NOTE(review): an initcode without a registered handler raises an exception
  although the value comes from the network - confirm that ServerLoop copes
  with exceptions escaping from handlers. }
procedure OnHiMsg(msg:tSMsg);
  var
    code,initcode: byte;
    seq,aseq: word;
    handler: tChatHandler;
    chat: ^tChat;
begin
  {new chat was received!}
  code:=msg.stream.ReadByte;
  Assert(not IsMsgHandled(code,msg.source^));
  seq:=msg.stream.ReadWord(2);
  aseq:=msg.stream.ReadWord(2);
  {invalid initial state}
  if (seq<>1) or (aseq>0) then exit;

  initcode:=msg.stream.ReadByte;
  if (initcode<1) or (initcode>High(ChatHandlers)) then exit;
  handler:=ChatHandlers[initcode];
  if not Assigned(handler) then
    raise Exception.Create('No handler for initcode '+IntToStr(initcode));
  {unskip the initcode}
  msg.stream.Seek(msg.stream.position-1);

  chat:=GetMem(SizeOf(tChat));
  chat^.InitFrom(msg.source^,code);
  {the opening message is data with seq 1 that still has to be acked}
  chat^.rxSeq:=1;
  chat^.rxAcked:=false;
  handler(chat^,msg);
end;
BEGIN
  {start with an empty handler table}
  FillChar(ChatHandlers,SizeOf(ChatHandlers),0);
  {route chat-opening messages to this unit}
  ServerLoop.SetHiMsgHandler(@OnHiMsg);
END.
270 END.