3 implementation of custom dht, based on pastry and kademlia.
4 keyspace is divided into buckets of limited capacity
5 node belongs to bucket, where at least 'depth' bits match 'prefix'
8 TODO: weight nodes by IP-Address common prefix length.
11 {used by: messages, fileshare}
15 type tPID
=Store1
.tFID
; {reQ: ids can be shorter}
23 procedure NodeBootstrap(const contact
:tNetAddr
);
24 procedure GetNextNode(var ibkt
:pointer; var ix
:byte; out peer
:tPeerPub
);
25 procedure InsertNode(const peer
:tPeerPub
);
28 uses ServerLoop
,Chat
,MemStream
,opcode
,sha1
,ecc
,CRAuth
;
31 tPeer
=object(tPeerPub
)
36 Verify
: ^CRAuth
.tAuth
; {nil when verified}
37 procedure VerifyCallback
;
44 peer
: array [1..4] of tPeer
;
49 function MatchPrefix(const tp
:tFID
):boolean;
56 function PrefixLength(const a
,b
:tFID
):byte;
62 i
:=0; while(i
<=19) do begin
63 if a
[i
]<>b
[i
] then break
;
70 if (a
[i
] and m
)<>(b
[i
] and m
) then break
;
77 function tBucket
.MatchPrefix(const tp
:tFID
):boolean;
79 result
:=(depth
=0)or(PrefixLength(prefix
,tp
)>=depth
);
82 function FindBucket(const prefix
:tFID
):tBucket_ptr
;
87 while (cur
<>nil) and (result
=nil) do begin
88 if cur
^.MatchPrefix(prefix
) {first matching is deepest}
94 operator
=(const a
,b
:tFID
):boolean;
96 result
:=CompareWord(a
,b
,10)=0;
99 procedure SplitBucket(ob
:tBucket_ptr
);
100 procedure Toggle(var prefix
:tPID
; bit
:byte);
102 prefix
[bit
div 8]:= prefix
[bit
div 8] xor ($80 shr (bit
mod 8));
107 writeln('DHT: SplitBucket ',string(ob
^.prefix
),'/',ob
^.depth
);
108 {find the predecessor of the old bucket, in order to unlink it}
109 if ob
=Table
then table
:=table
^.next
else begin
111 while assigned(nb
) and (nb
^.next
<>ob
) do nb
:=nb
^.next
;
112 assert(assigned(nb
),'old bucket not in table');
114 nb
^.next
:=nb
^.next
^.next
; nb
:=nil;
116 {increase depth of this bucket}
118 ob
^.ModifyTime
:=mNow
;
119 {create new bucket with toggled bit}
122 Toggle(nb
^.Prefix
,nb
^.depth
-1);
124 {clear nodes that do not belong in bucket}
125 for i
:=1 to high(tBucket
.peer
) do begin
126 if ob
^.peer
[i
].addr
.isNil
then continue
;
127 if ob
^.MatchPrefix(ob
^.peer
[i
].id
)
128 then nb
^.peer
[i
].addr
.clear
129 else ob
^.peer
[i
].addr
.clear
;
131 writeln('-> ',string(ob
^.prefix
),'/',ob
^.depth
);
132 for i
:=1 to high(tBucket
.peer
) do if not ob
^.peer
[i
].addr
.isnil
133 then writeln('-> -> ',string(ob
^.peer
[i
].id
));
134 writeln('-> ',string(nb
^.prefix
),'/',nb
^.depth
);
135 for i
:=1 to high(tBucket
.peer
) do if not nb
^.peer
[i
].addr
.isnil
136 then writeln('-> -> ',string(nb
^.peer
[i
].id
));
137 if table
=nil then table
:=nb
else begin
139 while assigned(ob
^.next
)and (ob
^.next
^.depth
>nb
^.depth
) do ob
:=ob
^.next
;
141 writeln('-> after /',ob
^.depth
);
143 Shedule(2000,@nb
^.Refresh
);
146 procedure VerifyInit(b
:tBucket_ptr
; i
:byte); forward;
148 function CheckNode(const id
: tPID
; const addr
: tNetAddr
): boolean;
149 {return false if node is banned}
151 {initiate auth on insert and also on id conflict}
152 {replace only old, banned and free slots}
158 if id
=MyID
then exit
;
163 if not assigned(b
) then begin
164 New(Table
); b
:=Table
;
170 for i
:=1 to high(b
^.peer
) do b
^.peer
[i
].addr
.Clear
;
171 for i
:=1 to high(b
^.peer
) do b
^.peer
[i
].ban
:=false;
172 Shedule(2000,@b
^.Refresh
);
174 for i
:=1 to high(b
^.peer
) do begin {check for ban and dup}
175 if (b
^.peer
[i
].Ban
) and (b
^.peer
[i
].Addr
=addr
) then exit
;
176 if (fr
=0)and(b
^.peer
[i
].Addr
.isNil
) then fr
:=i
;
177 if (b
^.peer
[i
].ID
=id
)or(b
^.peer
[i
].Addr
=Addr
) then begin
178 fr
:=i
;dup
:=(b
^.peer
[i
].ReqDelta
<2);break
181 if fr
=0 then for i
:=1 to high(b
^.peer
) do begin {check for old/banned}
182 if (b
^.peer
[i
].ReqDelta
>=2) then fr
:=i
;
183 if (fr
=0) and (b
^.peer
[i
].Ban
) then fr
:=i
;
186 if b
^.MatchPrefix(MyID
) then begin
189 end (*else bucket is full and not splittable*)
192 if (b
^.peer
[i
].addr
=addr
) then begin
193 b
^.peer
[i
].LastMsgFrom
:=mNow
;
194 b
^.peer
[i
].ReqDelta
:=0;
202 if (not b
^.peer
[fr
].Addr
.isNil
)and assigned(b
^.peer
[fr
].Verify
)
203 then b
^.peer
[fr
].Verify
^.Cancel
;
204 writeln('DHT: AddNode ',string(id
),string(addr
),' to ',string(b
^.prefix
),'/',b
^.depth
,'#',fr
);
207 b
^.peer
[fr
].Addr
:=Addr
;
208 b
^.peer
[fr
].LastMsgFrom
:=mNow
;
209 b
^.peer
[fr
].LastResFrom
:=0;
210 b
^.peer
[fr
].ReqDelta
:=0;
211 b
^.peer
[fr
].ban
:=false;
212 b
^.peer
[fr
].Verify
:=nil;
219 procedure InsertNode(const peer
:tPeerPub
);
221 CheckNode(peer
.id
,peer
.addr
);
224 procedure GetNextNode(var ibkt
:tBucket_ptr
; var ix
:byte; const id
:tPID
; maxrd
:word; bans
:boolean);
227 if not assigned(ibkt
) then exit
;
231 if ix
>high(tBucket
.peer
) then begin
234 if not assigned(bkt
) then break
;
236 until (not bkt
^.peer
[ix
].Addr
.isNil
)
237 and(bkt
^.peer
[ix
].ReqDelta
<maxrd
)
238 and(bans
or(bkt
^.peer
[ix
].ban
=false));
242 procedure GetNextNode(var ibkt
:pointer; var ix
:byte; out peer
:tPeerPub
);
244 if ibkt
=nil then ibkt
:=Table
;
245 GetNextNode(ibkt
,ix
,MyID
,3,false);
247 then peer
:=tBucket(ibkt
^).peer
[ix
]
248 else peer
.addr
.clear
;
252 a)Request: op, SendID, TargetID, caps, adt
253 b)Select : op, caps, addr, TargetID, OrigID, adt (66) [ ]
254 : op, caps, addr, TargetID, SendID, adt (66) [*]
256 d)Wazzup : op, SenderID
259 procedure RecvRequest(msg
:tSMsg
);
260 var s
:tMemoryStream
absolute msg
.stream
;
274 //writeln('DHT: ',string(msg.source^),' Request for ',string(rID^));
275 if not CheckNode(sID
^,msg
.source
^) then exit
;
276 {Select peers only from The bucket,
277 if it is broken, send none, but still Ack}
278 bkt
:=FindBucket(rID
^);
280 if assigned(bkt
) then begin
281 r
.WriteByte(opcode
.dhtSelect
);
283 r
.Write(msg
.Source
^,sizeof(tNetAddr
));
286 if (s
.RdBufLen
>0)and(s
.RdBufLen
<=8) then r
.Write(s
.RdBuf
^,s
.RdBufLen
);
287 for i
:=1 to high(tBucket
.peer
) do begin
288 if bkt
^.peer
[i
].addr
.isNil
then continue
;
289 if bkt
^.peer
[i
].addr
=msg
.source
^ then continue
;
290 if bkt
^.peer
[i
].ReqDelta
>1 then continue
;
291 //writeln('-> Select to ',string(bkt^.peer[i].addr));
292 SendMessage(r
.base
^,r
.length
,bkt
^.peer
[i
].addr
);
296 while SendCnt
<4 do begin
297 GetNextNode(bkt
,li
,rID
^,3,false);
298 if not assigned(bkt
) then break
;
299 SendMessage(r
.base
^,r
.length
,bkt
^.peer
[li
].addr
);
305 //else writeln('-> empty bucket')
307 r
.WriteByte(opcode
.dhtReqAck
);
309 //writeln('-> ReqAck to ',string(msg.Source^));
310 SendMessage(r
.base
^,r
.length
,msg
.source
^);
311 FreeMem(r
.base
,r
.size
);
314 procedure SendRequest(const contact
:tNetAddr
; const forid
: tPID
; caps
:byte);
318 r
.WriteByte(opcode
.dhtRequest
);
319 r
.Write(MyID
,sizeof(tFID
));
320 r
.Write(ForID
,sizeof(tFID
));
322 SendMessage(r
.base
^,r
.length
,contact
);
323 FreeMem(r
.base
,r
.size
);
326 procedure RecvReqAck(msg
:tSMsg
);
327 var s
:tMemoryStream
absolute msg
.stream
;
332 //writeln('DHT: ',string(msg.source^),' is ',string(hID^),' (ReqAck)');
333 CheckNode(hID
^,msg
.source
^);
336 procedure RecvWazzup(msg
:tSMsg
);
337 var s
:tMemoryStream
absolute msg
.stream
;
342 //writeln('DHT: ',string(msg.source^),' is ',string(hID^),' (Wazzup)');
343 if CheckNode(hID
^,msg
.source
^) then
344 {UpdateSearch(hID^,msg.source^)};
347 procedure NodeBootstrap(const contact
:tNetAddr
);
349 SendRequest(contact
,MyID
,0);
352 procedure RecvSelect(msg
:tSMsg
);
353 var s
:tMemoryStream
absolute msg
.stream
;
361 addr
:=s
.ReadPtr(sizeof(tNetAddr
));
364 if CheckNode(sID
^,msg
.source
^) then exit
;
365 //writeln('DHT: ',string(msg.source^),' Select for ',string(addr^));
366 if rID
^=MyID
then begin
367 //writeln('-> self');
370 r
.WriteByte(opcode
.dhtWazzup
);
372 //writeln('-> Wazzup to ',string(addr^));
373 SendMessage(r
.base
^,r
.length
,addr
^);
374 FreeMem(r
.base
,r
.size
);
378 procedure tBucket
.Refresh
;
379 var my
,rtr
,stich
:boolean;
383 procedure lSend(var peer
:tPeer
; const trg
:tPID
);
385 SendRequest(peer
.Addr
,trg
,0);
389 my
:=MatchPrefix(MyID
);
391 {1 of 10 times (whenever Random(cStichRar)=0), try to contact dead nodes in an attempt to recover from a network split}
392 stich
:=Random(cStichRar
)=0;
393 for i
:=1 to high(tBucket
.peer
)
394 do if (not peer
[i
].Addr
.isNil
) and (not peer
[i
].Ban
) then begin
395 if peer
[i
].ReqDelta
>0 then begin
396 if (peer
[i
].ReqDelta
<=3)xor stich
then begin
397 {this will get rid of half-dead nodes}
398 writeln('DHT: Refresh (R',peer
[i
].ReqDelta
,') ',copy(string(peer
[i
].id
),1,6),string(peer
[i
].addr
));
399 lSend(peer
[i
],prefix
);
403 else if (ol
=0) or (peer
[i
].LastMsgFrom
<peer
[ol
].LastMsgFrom
)
406 {now nudge the most quiet peer, but not too often}
407 if (ol
>0) and ((mNow
-peer
[ol
].LastMsgFrom
)>10000) then begin
408 //writeln('DHT: Refresh (T',mNow-peer[ol].LastMsgFrom,') #',ol,' ',string(peer[ol].addr));
409 lSend(peer
[ol
],MyID
);
411 {try to recover bucket full of bad nodes}
412 if (ol
=0){and(not rtr)} then begin
414 GetNextNode(rvb
,rv
,prefix
,desperate
,false);
415 if not assigned(rvb
) then begin
416 rv
:=0; rvb
:=Table
; {in extreme cases, try the whole table}
417 GetNextNode(rvb
,rv
,prefix
,desperate
,true);
419 if assigned(rvb
) then begin
420 writeln('DHT: Recover ',string(prefix
),'/',depth
,' try ',copy(string(rvb
^.peer
[rv
].id
),1,6),string(rvb
^.peer
[rv
].addr
));
421 lSend(rvb
^.peer
[rv
],prefix
);
422 end else inc(desperate
);
423 end else desperate
:=3;
425 then wait
:=18000+(depth
*600)
427 if rtr
and(not stich
) then wait
:=wait
div 3;
428 Shedule(wait
,@Refresh
);
431 {to bootstrap: ping address to get ID and insert to bucket/il
432 ping may get lost: separate bootstrap unit :)
433 for now, just assume the ping will not get lost}
435 procedure VerifyInit(b
:tBucket_ptr
; i
:byte);
437 with b
^.peer
[i
] do begin
438 if assigned(Verify
) then exit
;
440 Verify
^.Callback
:=@VerifyCallback
;
442 //writeln('DHT: Starting Verificator for ',string(Addr));
445 procedure tPeer
.VerifyCallback
;
447 if Verify
^.error
>0 then begin
448 writeln('DHT: Verificator error ',string(Addr
),Verify
^.error
);
451 if Verify
^.Valid
and Verify
^.PowValid
and (CompareWord(ID
,Verify
^.RemotePub
,10)=0) then
455 writeln('DHT: Verificator failed for ',string(Addr
),Verify
^.Valid
,Verify
^.PoWValid
,Verify
^.error
);
457 Verify
:=nil; {it will free itelf}
461 SetMsgHandler(opcode
.dhtRequest
,@recvRequest
);
462 SetMsgHandler(opcode
.dhtSelect
,@recvSelect
);
463 SetMsgHandler(opcode
.dhtReqAck
,@recvReqAck
);
464 SetMsgHandler(opcode
.dhtWazzup
,@recvWazzup
);