Merge branch 'dev' into dht
[brdnet.git] / dht.pas
blob1258fc7f2d6877e0ce212fb0dcd9dca25682fc72
1 unit DHT;
3 implementation of custom dht, based on pastry and kademlia.
4 keyspace is divided into buckets of limited capacity
5 node belongs to bucket, where at least 'depth' bits match 'prefix'
6 old>new,
7 new>dead
10 {used by: messages, fileshare}
12 INTERFACE
13 uses NetAddr,Store1;
14 type tPID=Store1.tFID;
15 var MyID:tPID;
16 procedure NodeBootstrap(const contact:tNetAddr);
18 IMPLEMENTATION
19 uses ServerLoop,MemStream,opcode;
21 type
22 tPeer=object
23 ID :tPID;
24 Addr :tNetAddr;
25 ReqDelta:word;
26 LastMsgFrom,
27 LastResFrom :tMTime;
28 end;
29 tBucket_ptr=^tBucket;
30 tBucket=object
31 Prefix: tPID;
32 Depth: byte;
33 peer: array [1..4] of tPeer;
34 ModifyTime: tMTime;
35 //ll: ^tll;
36 next: tBucket_ptr;
37 function MatchPrefix(const tp:tFID):boolean;
38 procedure Refresh;
39 end;
41 var Table:^tBucket;
42 {deepest first}
44 function PrefixLength(const a,b:tFID):byte;
45 var i:byte;
46 var m:byte;
47 begin
48 for result:=0 to 20 do
49 if a[result]<>b[result]
50 then break;
51 m:=$80;
52 for i:=7 downto 0 do
53 if (a[result] and m)<>(b[result] and m)
54 then break else m:=m shr 1;
55 result:=result*8+i;
56 end;
59 function tBucket.MatchPrefix(const tp:tFID):boolean;
60 begin
61 result:=(depth=0)or(PrefixLength(prefix,tp)>=depth);
62 end;
64 function FindBucket(const prefix:tFID):tBucket_ptr;
65 var cur:^tBucket;
66 begin
67 cur:=Table;
68 result:=cur;
69 while (cur<>nil) and (result=nil) do begin
70 if cur^.MatchPrefix(prefix) {first matching is deepest}
71 then result:=cur;
72 cur:=cur^.next;
73 end;
74 end;
76 operator =(const a,b:tFID):boolean;
77 begin
78 result:=CompareWord(a,b,10)=0;
79 end;
81 procedure SplitBucket(ob:tBucket_ptr);
82 procedure Toggle(var prefix:tPID; bit:byte);
83 begin
84 prefix[bit div 8]:= prefix[bit div 8] xor (bit mod 8);
85 end;
86 var nb:tBucket_ptr;
87 var i:byte;
88 begin
89 writeln('DHT: SplitBucket ',string(ob^.prefix),'/',ob^.depth);
90 {find pref to old bucket, in order to unlink}
91 if ob=Table then table:=table^.next else begin
92 nb:=Table;
93 while assigned(nb) and (nb^.next<>ob) do nb:=nb^.next;
94 assert(assigned(nb),'old bucket not in table');
95 {unlink}
96 nb^.next:=nb^.next^.next; nb:=nil;
97 end;
98 {increase depth of this bucket}
99 Inc(ob^.depth);
100 ob^.ModifyTime:=mNow;
101 {create new bucket with toggled bit}
102 New(nb);
103 nb^:=ob^;
104 Toggle(nb^.Prefix,nb^.depth);
105 nb^.next:=ob;
106 {clear nodes that do not belong in bucket}
107 for i:=1 to high(tBucket.peer) do begin
108 if ob^.peer[i].addr.isNil then continue;
109 if ob^.MatchPrefix(ob^.peer[i].id)
110 then nb^.peer[i].addr.clear
111 else ob^.peer[i].addr.clear;
112 end;
113 writeln('-> ',string(ob^.prefix),'/',ob^.depth);
114 writeln('-> ',string(nb^.prefix),'/',nb^.depth);
115 if table=nil then table:=nb else begin
116 ob:=Table;
117 while assigned(ob^.next)and (ob^.next^.depth>nb^.depth) do ob:=ob^.next;
118 ob^.next:=nb;
119 writeln('-> after /',ob^.depth);
120 end;
121 Shedule(2000,@nb^.Refresh);
122 end;
124 procedure UpdateNode(const id:tFID; const addr:tNetAddr);
125 var bkt:^tBucket;
126 var i,fr:byte;
127 label again;
128 begin
129 again:
130 bkt:=FindBucket(id);
131 if not assigned(bkt) then begin
132 New(Table); //todo
133 bkt:=Table;
134 bkt^.Prefix:=MyID;
135 bkt^.Depth:=0;
136 bkt^.ModifyTime:=mNow;
137 bkt^.next:=nil;
138 for i:=1 to high(bkt^.peer) do bkt^.peer[i].addr.Clear;
139 Shedule(2000,@bkt^.Refresh);
140 end;
141 fr:=0;
142 for i:=1 to high(bkt^.peer)
143 do if (fr=0)and bkt^.peer[i].addr.isNil then fr:=i
144 //else if bkt^.peer[i].addr=addr then fr:=i
145 else if bkt^.peer[i].id=id then begin
146 if bkt^.peer[i].addr<>addr then continue;
147 {found node in the bucket}
148 writeln('DHT: UpadateNode ',string(id));
149 // ?? bkt^.ModifyTime:=mNow;
150 bkt^.peer[i].LastMsgFrom:=mNow;
151 exit end else if (fr=0) and (bkt^.peer[i].ReqDelta>=2)
152 then fr:=i {use non-responding as free};
153 if fr=0 then begin
154 if bkt^.MatchPrefix(MyID)
155 then begin
156 SplitBucket(bkt);
157 goto again;
158 end; {the bucket is full!}
159 {drop new node and hope nodes in the bucket are good}
160 end else begin
161 writeln('DHT: AddNode ',string(id),' to /',bkt^.depth,'#',fr);
162 bkt^.ModifyTime:=mNow;
163 bkt^.peer[fr].ID:=ID;
164 bkt^.peer[fr].Addr:=Addr;
165 bkt^.peer[fr].LastMsgFrom:=mNow;
166 bkt^.peer[fr].LastResFrom:=0;
167 bkt^.peer[fr].ReqDelta:=0;
168 end;
169 end;
171 procedure RecvRequest(msg:tSMsg);
172 var s:tMemoryStream absolute msg.stream;
173 var hID:^tPID;
174 var rID:^tPID;
175 var caps:byte;
176 var r:tMemoryStream;
177 var bkt:^tBucket;
178 var i:byte;
179 begin
180 s.skip(1);
181 hID:=s.ReadPtr(20);
182 rID:=s.ReadPtr(20);
183 caps:=s.ReadByte;
184 writeln('DHT: ',string(msg.source^),' Request for ',string(rID^));
185 UpdateNode(hID^,msg.source^);
186 bkt:=FindBucket(rID^);
187 r.Init(128);
188 if assigned(bkt) then begin
189 r.WriteByte(opcode.dhtSelect);
190 r.WriteByte(caps);
191 r.Write(msg.Source^,sizeof(tNetAddr));
192 r.Write(rID^,20);
193 r.Write(hID^,20);
194 if (s.RdBufLen>0)and(s.RdBufLen<=8) then r.Write(s.RdBuf^,s.RdBufLen);
195 for i:=1 to high(tBucket.peer) do begin
196 if bkt^.peer[i].addr.isNil then continue;
197 if bkt^.peer[i].addr=msg.source^ then continue;
198 writeln('-> Select to ',string(bkt^.peer[i].addr));
199 SendMessage(r.base^,r.length,bkt^.peer[i].addr);
200 end;
201 r.Seek(0);
202 r.Trunc;
204 else writeln('-> empty bucket');
205 r.WriteByte(opcode.dhtReqAck);
206 r.Write(MyID,20);
207 writeln('-> ReqAck to ',string(msg.Source^));
208 SendMessage(r.base^,r.length,msg.source^);
209 FreeMem(r.base,r.size);
210 end;
212 procedure SendRequest(const contact:tNetAddr; const forid: tPID; caps:byte);
213 var r:tMemoryStream;
214 begin
215 r.Init(42);
216 r.WriteByte(opcode.dhtRequest);
217 r.Write(MyID,sizeof(tFID));
218 r.Write(ForID,sizeof(tFID));
219 r.WriteByte(caps);
220 writeln('DHT: Request to ',string(contact));
221 SendMessage(r.base^,r.length,contact);
222 FreeMem(r.base,r.size);
223 end;
225 procedure RecvReqAck(msg:tSMsg);
226 var s:tMemoryStream absolute msg.stream;
227 var hID:^tPID;
228 begin
229 s.skip(1);
230 hID:=s.ReadPtr(20);
231 writeln('DHT: ',string(msg.source^),' is ',string(hID^),' (ReqAck)');
232 UpdateNode(hID^,msg.source^);
233 end;
235 procedure RecvWazzup(msg:tSMsg);
236 var s:tMemoryStream absolute msg.stream;
237 var hID:^tPID;
238 begin
239 s.skip(1);
240 hID:=s.ReadPtr(20);
241 writeln('DHT: ',string(msg.source^),' is ',string(hID^),' (Wazzup)');
242 UpdateNode(hID^,msg.source^);
243 //UpdateSearch(hID^,msg.source^);
244 end;
246 procedure NodeBootstrap(const contact:tNetAddr);
247 begin
248 SendRequest(contact,MyID,0);
249 end;
251 procedure RecvSelect(msg:tSMsg);
252 var s:tMemoryStream absolute msg.stream;
253 var caps:byte;
254 var addr:^tNetAddr;
255 var rID:^tPID;
256 var r:tMemoryStream;
257 begin
258 s.skip(1);
259 caps:=s.ReadByte;
260 addr:=s.ReadPtr(sizeof(tNetAddr));
261 rID:=s.ReadPtr(20);
262 writeln('DHT: ',string(msg.source^),' Select for ',string(addr^));
263 if rID^=MyID then begin
264 writeln('-> self'); exit end;
265 r.Init(21);
266 r.WriteByte(opcode.dhtWazzup);
267 r.Write(MyID,20);
268 writeln('-> Wazzup to ',string(addr^));
269 SendMessage(r.base^,r.length,addr^);
270 FreeMem(r.base,r.size);
271 end;
273 procedure tBucket.Refresh;
274 var my,rtr:boolean;
275 var i,ol:byte;
276 begin
277 my:=MatchPrefix(MyID);
278 ol:=0;
279 rtr:=false;
280 for i:=1 to high(tBucket.peer) do
281 if (not peer[i].Addr.isNil) and (peer[i].ReqDelta<3) then begin
282 if peer[i].ReqDelta>1 then begin
283 {peer is not responding, but try once more}
284 if not rtr then write('DHT: **Refresh (',peer[i].ReqDelta,')** ');
285 SendRequest(peer[i].Addr,prefix,0);
286 inc(peer[i].ReqDelta);
287 rtr:=true;
289 else if (ol=0) or (peer[i].LastMsgFrom<peer[ol].LastMsgFrom)
290 then ol:=i;
291 end;
292 if (ol>0) and (not rtr) then begin
293 write('DHT: **Refresh(T)** ');
294 SendRequest(peer[ol].Addr,MyID,0);
295 inc(peer[ol].ReqDelta);
296 end;
297 if my
298 then Shedule(18000+(depth*600),@Refresh)
299 else Shedule(30000,@Refresh);
300 end;
303 {to bootstrap: ping address to get ID and insert to bucket/il
304 ping may get lost: separate bootstrap unit :)
305 now jut Ass-U-Me wont get lost}
307 procedure LoadIDFromArgs;
308 var oi:word;
309 const opt='-id';
310 begin
311 oi:=OptIndex(opt);
312 if oi>0 then begin
313 assert(OptParamCount(oi)=1,opt+'(pid:sha1)');
314 writeln('DHT: set ID to '+paramstr(oi+1));
315 MyID:=tPID(paramstr(oi+1));
316 end;
317 end;
319 BEGIN
320 SetMsgHandler(opcode.dhtRequest,@recvRequest);
321 SetMsgHandler(opcode.dhtSelect,@recvSelect);
322 SetMsgHandler(opcode.dhtReqAck,@recvReqAck);
323 SetMsgHandler(opcode.dhtWazzup,@recvWazzup);
324 LoadIdFromArgs;
325 END.