3 implementation of custom dht, based on pastry and kademlia.
4 keyspace is divided into buckets of limited capacity
5 node belongs to bucket, where at least 'depth' bits match 'prefix'
10 {used by: messages, fileshare}
{ tPID is an alias of Store1.tFID. In this unit it types the local node ID
  (MyID is assigned from a tPID cast) and bucket prefixes passed to Toggle. }
14 type tPID
=Store1
.tFID
;
{ Declaration of NodeBootstrap; the body appears later in this file and sends
  a request for MyID to the given contact address. }
16 procedure NodeBootstrap(const contact
:tNetAddr
);
19 uses ServerLoop
,MemStream
,opcode
;
{ peer: fixed table of four peer slots, indexed 1..4. The rest of this unit
  accesses it as tBucket.peer. }
33 peer
: array [1..4] of tPeer
;
{ MatchPrefix: tells whether identifier tp belongs to this bucket.
  Implemented below as tBucket.MatchPrefix. }
37 function MatchPrefix(const tp
:tFID
):boolean;
{ PrefixLength: compares identifiers a and b. The visible code first scans
  byte by byte for the first differing byte, then compares that byte bit by
  bit under mask m, moving the mask toward the low bit (m shr 1) after each
  matching bit and stopping at the first differing bit.
  NOTE(review): only part of the body is visible here; the returned value is
  presumably the number of equal leading bits - confirm.
  NOTE(review): the byte loop runs 0 to 20, i.e. 21 indexes, while operator =
  in this file compares 10 words (20 bytes). Confirm that index 20 is valid
  for tFID; otherwise two equal identifiers read one byte past the end. }
44 function PrefixLength(const a
,b
:tFID
):byte;
48 for result
:=0 to 20 do
49 if a
[result
]<>b
[result
]
53 if (a
[result
] and m
)<>(b
[result
] and m
)
54 then break
else m
:=m
shr 1;
{ tBucket.MatchPrefix: returns true when depth is 0 (the bucket accepts every
  identifier) or when PrefixLength(prefix,tp) is at least depth. }
59 function tBucket
.MatchPrefix(const tp
:tFID
):boolean;
61 result
:=(depth
=0)or(PrefixLength(prefix
,tp
)>=depth
);
{ FindBucket: walks the bucket list through cur while cur is not nil and no
  result has been chosen yet, testing each bucket with MatchPrefix(prefix).
  Per the original note, the first matching bucket is the deepest one.
  NOTE(review): the initialisation of cur and result and the advance step are
  not visible here; presumably the result stays nil when nothing matches. }
64 function FindBucket(const prefix
:tFID
):tBucket_ptr
;
69 while (cur
<>nil) and (result
=nil) do begin
70 if cur
^.MatchPrefix(prefix
) {first matching is deepest}
{ Equality of two tFID values: true when CompareWord over 10 words of a and b
  returns 0. }
76 operator
=(const a
,b
:tFID
):boolean;
78 result
:=CompareWord(a
,b
,10)=0;
{ SplitBucket: splits bucket ob into two buckets.
  Visible steps: unlink ob from the table list, update ob (ModifyTime), create
  a second bucket nb whose prefix has one bit toggled, clear from each bucket
  the peers that do not belong to it, link the buckets back into the table
  and schedule a Refresh of nb after 2000 time units.
  NOTE(review): several statements (depth increment, allocation of nb, final
  relinking) are on lines not visible here. }
81 procedure SplitBucket(ob
:tBucket_ptr
);
{ Toggle: flips a single bit of prefix. Bit 0 is the most significant bit of
  byte 0, matching PrefixLength which walks its mask toward the low bit.
  Fix: the byte used to be xor-ed with the bit index (bit mod 8) itself, which
  is a no-op for bit mod 8 = 0 and flips the wrong bits otherwise; it is now
  xor-ed with a one-bit mask.
  NOTE(review): the caller passes nb^.depth as the bit number - confirm it is
  the zero-based index of the bit that should differ between the buckets. }
82 procedure Toggle(var prefix
:tPID
; bit
:byte);
84 prefix
[bit
div 8]:= prefix
[bit
div 8] xor ($80 shr (bit
mod 8));
89 writeln('DHT: SplitBucket ',string(ob
^.prefix
),'/',ob
^.depth
);
90 {find predecessor of old bucket, in order to unlink}
91 if ob
=Table
then table
:=table
^.next
else begin
93 while assigned(nb
) and (nb
^.next
<>ob
) do nb
:=nb
^.next
;
94 assert(assigned(nb
),'old bucket not in table');
96 nb
^.next
:=nb
^.next
^.next
; nb
:=nil;
98 {increase depth of this bucket}
100 ob
^.ModifyTime
:=mNow
;
101 {create new bucket with toggled bit}
104 Toggle(nb
^.Prefix
,nb
^.depth
);
106 {clear nodes that do not belong in bucket}
{ each occupied slot is kept in exactly one bucket: if the peer id still
  matches ob, the copy in nb is cleared, otherwise the one in ob }
107 for i
:=1 to high(tBucket
.peer
) do begin
108 if ob
^.peer
[i
].addr
.isNil
then continue
;
109 if ob
^.MatchPrefix(ob
^.peer
[i
].id
)
110 then nb
^.peer
[i
].addr
.clear
111 else ob
^.peer
[i
].addr
.clear
;
113 writeln('-> ',string(ob
^.prefix
),'/',ob
^.depth
);
114 writeln('-> ',string(nb
^.prefix
),'/',nb
^.depth
);
115 if table
=nil then table
:=nb
else begin
{ skip buckets that are deeper than the new one, so the list stays ordered
  from deepest to shallowest }
117 while assigned(ob
^.next
)and (ob
^.next
^.depth
>nb
^.depth
) do ob
:=ob
^.next
;
119 writeln('-> after /',ob
^.depth
);
121 Shedule(2000,@nb
^.Refresh
);
{ UpdateNode: records that the node with identifier id was heard from at addr.
  Visible behaviour:
  - when no bucket is assigned, a bucket gets ModifyTime set to mNow, all its
    peer addresses cleared and a Refresh scheduled via Shedule(2000,...);
  - the peer slots are scanned; the first slot with a nil address is
    remembered in fr; a slot with the same id and the same addr has
    LastMsgFrom set to mNow and ReqDelta reset to 0 and the procedure exits;
    a slot with the same id but another addr is skipped; otherwise the first
    slot with ReqDelta >= 2 is remembered in fr as reusable;
  - finally slot fr is overwritten with the new id and addr and its counters
    are reset.
  NOTE(review): how bkt is obtained, how the missing bucket is allocated and
  what happens when fr stays 0 (the MatchPrefix(MyID) branch, presumably a
  split or a drop of the new node) are on lines not visible here. }
124 procedure UpdateNode(const id
:tFID
; const addr
:tNetAddr
);
131 if not assigned(bkt
) then begin
136 bkt
^.ModifyTime
:=mNow
;
138 for i
:=1 to high(bkt
^.peer
) do bkt
^.peer
[i
].addr
.Clear
;
139 Shedule(2000,@bkt
^.Refresh
);
142 for i
:=1 to high(bkt
^.peer
)
143 do if (fr
=0)and bkt
^.peer
[i
].addr
.isNil
then fr
:=i
144 //else if bkt^.peer[i].addr=addr then fr:=i
145 else if bkt
^.peer
[i
].id
=id
then begin
146 if bkt
^.peer
[i
].addr
<>addr
then continue
;
147 {found node in the bucket}
148 writeln('DHT: UpdateNode ',string(id
));
149 // ?? bkt^.ModifyTime:=mNow;
150 bkt
^.peer
[i
].LastMsgFrom
:=mNow
;
151 bkt
^.peer
[i
].ReqDelta
:=0;
152 exit
end else if (fr
=0) and (bkt
^.peer
[i
].ReqDelta
>=2)
153 then fr
:=i
{use non-responding as free};
155 if bkt
^.MatchPrefix(MyID
)
159 end; {the bucket is full!}
160 {drop new node and hope nodes in the bucket are good}
162 writeln('DHT: AddNode ',string(id
),' to /',bkt
^.depth
,'#',fr
);
163 bkt
^.ModifyTime
:=mNow
;
164 bkt
^.peer
[fr
].ID
:=ID
;
165 bkt
^.peer
[fr
].Addr
:=Addr
;
166 bkt
^.peer
[fr
].LastMsgFrom
:=mNow
;
167 bkt
^.peer
[fr
].LastResFrom
:=0;
168 bkt
^.peer
[fr
].ReqDelta
:=0;
{ RecvRequest: handler registered for opcode.dhtRequest.
  Visible behaviour: logs the request, records the sender through
  UpdateNode(hID^,msg.source^) and looks up the bucket for rID^.
  When a bucket is found, a dhtSelect message is built that carries the
  address of the sender and, if between 1 and 8 bytes remain unread in the
  request, those trailing bytes; it is sent to every occupied peer slot whose
  address differs from the sender. Otherwise an empty-bucket line is logged.
  Afterwards a dhtReqAck message is sent back to the sender and the buffer
  r.base is released.
  NOTE(review): the reads that set hID and rID, the initialisation of r and
  the remaining fields written to both messages are on lines not visible. }
172 procedure RecvRequest(msg
:tSMsg
);
173 var s
:tMemoryStream
absolute msg
.stream
;
185 writeln('DHT: ',string(msg
.source
^),' Request for ',string(rID
^));
186 UpdateNode(hID
^,msg
.source
^);
187 bkt
:=FindBucket(rID
^);
189 if assigned(bkt
) then begin
190 r
.WriteByte(opcode
.dhtSelect
);
192 r
.Write(msg
.Source
^,sizeof(tNetAddr
));
195 if (s
.RdBufLen
>0)and(s
.RdBufLen
<=8) then r
.Write(s
.RdBuf
^,s
.RdBufLen
);
196 for i
:=1 to high(tBucket
.peer
) do begin
197 if bkt
^.peer
[i
].addr
.isNil
then continue
;
198 if bkt
^.peer
[i
].addr
=msg
.source
^ then continue
;
199 writeln('-> Select to ',string(bkt
^.peer
[i
].addr
));
200 SendMessage(r
.base
^,r
.length
,bkt
^.peer
[i
].addr
);
205 else writeln('-> empty bucket');
206 r
.WriteByte(opcode
.dhtReqAck
);
208 writeln('-> ReqAck to ',string(msg
.Source
^));
209 SendMessage(r
.base
^,r
.length
,msg
.source
^);
210 FreeMem(r
.base
,r
.size
);
{ SendRequest: builds a dhtRequest message (opcode byte, then MyID, then
  ForID, each written as sizeof(tFID) bytes), logs it, sends it to contact and
  releases the message buffer.
  NOTE(review): the initialisation of r and any use of the caps parameter are
  on lines not visible here. }
213 procedure SendRequest(const contact
:tNetAddr
; const forid
: tPID
; caps
:byte);
217 r
.WriteByte(opcode
.dhtRequest
);
218 r
.Write(MyID
,sizeof(tFID
));
219 r
.Write(ForID
,sizeof(tFID
));
221 writeln('DHT: Request to ',string(contact
));
222 SendMessage(r
.base
^,r
.length
,contact
);
223 FreeMem(r
.base
,r
.size
);
{ RecvReqAck: handler registered for opcode.dhtReqAck. Logs the sender and
  its identifier hID^ and records the sender through UpdateNode.
  NOTE(review): the read that sets hID is on a line not visible here. }
226 procedure RecvReqAck(msg
:tSMsg
);
227 var s
:tMemoryStream
absolute msg
.stream
;
232 writeln('DHT: ',string(msg
.source
^),' is ',string(hID
^),' (ReqAck)');
233 UpdateNode(hID
^,msg
.source
^);
{ RecvWazzup: handler registered for opcode.dhtWazzup. Logs the sender and
  its identifier hID^ and records the sender through UpdateNode. The search
  update call is currently commented out.
  NOTE(review): the read that sets hID is on a line not visible here. }
236 procedure RecvWazzup(msg
:tSMsg
);
237 var s
:tMemoryStream
absolute msg
.stream
;
242 writeln('DHT: ',string(msg
.source
^),' is ',string(hID
^),' (Wazzup)');
243 UpdateNode(hID
^,msg
.source
^);
244 //UpdateSearch(hID^,msg.source^);
{ NodeBootstrap: contacts a known node by sending it a request for our own
  identifier (MyID) with caps set to 0. }
247 procedure NodeBootstrap(const contact
:tNetAddr
);
249 SendRequest(contact
,MyID
,0);
{ RecvSelect: handler registered for opcode.dhtSelect.
  Visible behaviour: reads a tNetAddr-sized block from the message stream as
  addr and logs it. When rID^ equals MyID the message concerns this node
  itself and the handler exits. Otherwise a dhtWazzup message is built, sent
  to addr^ and the message buffer is released.
  NOTE(review): the read that sets rID, the initialisation of r and the other
  fields written to the Wazzup message are on lines not visible here. }
252 procedure RecvSelect(msg
:tSMsg
);
253 var s
:tMemoryStream
absolute msg
.stream
;
261 addr
:=s
.ReadPtr(sizeof(tNetAddr
));
263 writeln('DHT: ',string(msg
.source
^),' Select for ',string(addr
^));
264 if rID
^=MyID
then begin
265 writeln('-> self'); exit
end;
267 r
.WriteByte(opcode
.dhtWazzup
);
269 writeln('-> Wazzup to ',string(addr
^));
270 SendMessage(r
.base
^,r
.length
,addr
^);
271 FreeMem(r
.base
,r
.size
);
{ tBucket.Refresh: periodic maintenance of one bucket; it reschedules itself
  through Shedule at the end.
  Visible behaviour: my records whether this bucket matches MyID. Every
  occupied slot with ReqDelta below 6 is examined: a slot with ReqDelta above
  1 is treated as not responding and gets one more request for the bucket
  prefix, with ReqDelta incremented; among the other slots ol tracks the one
  with the smallest LastMsgFrom. If such a slot exists and rtr is false, a
  request for MyID is sent to it and its ReqDelta is incremented.
  The next run is scheduled after 18000+(depth*600) or after 30000.
  NOTE(review): where rtr is set and which condition selects between the two
  delays are on lines not visible here; presumably rtr marks that a retry was
  already sent and the delay depends on my - confirm. }
274 procedure tBucket
.Refresh
;
278 my
:=MatchPrefix(MyID
);
281 for i
:=1 to high(tBucket
.peer
) do
282 if (not peer
[i
].Addr
.isNil
) and (peer
[i
].ReqDelta
<6) then begin
283 if peer
[i
].ReqDelta
>1 then begin
284 {peer is not responding, but try once more}
285 if not rtr
then write('DHT: **Refresh (',peer
[i
].ReqDelta
,')** ');
286 SendRequest(peer
[i
].Addr
,prefix
,0);
287 inc(peer
[i
].ReqDelta
);
290 else if (ol
=0) or (peer
[i
].LastMsgFrom
<peer
[ol
].LastMsgFrom
)
293 if (ol
>0) and (not rtr
) then begin
294 write('DHT: **Refresh(T)** ');
295 SendRequest(peer
[ol
].Addr
,MyID
,0);
296 inc(peer
[ol
].ReqDelta
);
299 then Shedule(18000+(depth
*600),@Refresh
)
300 else Shedule(30000,@Refresh
);
304 {to bootstrap: ping address to get ID and insert to bucket/il
305 ping may get lost: separate bootstrap unit :)
306 for now just Ass-U-Me it will not get lost}
{ LoadIDFromArgs: takes the local node identifier from the command line.
  Asserts that the option at index oi has exactly one parameter, logs the
  value and assigns paramstr(oi+1), cast to tPID, to MyID.
  NOTE(review): how oi and opt are determined is on lines not visible here. }
308 procedure LoadIDFromArgs
;
314 assert(OptParamCount(oi
)=1,opt
+'(pid:sha1)');
315 writeln('DHT: set ID to '+paramstr(oi
+1));
316 MyID
:=tPID(paramstr(oi
+1));
{ Register the receive procedures of this unit as handlers for the four DHT
  opcodes: dhtRequest, dhtSelect, dhtReqAck and dhtWazzup. }
321 SetMsgHandler(opcode
.dhtRequest
,@recvRequest
);
322 SetMsgHandler(opcode
.dhtSelect
,@recvSelect
);
323 SetMsgHandler(opcode
.dhtReqAck
,@recvReqAck
);
324 SetMsgHandler(opcode
.dhtWazzup
,@recvWazzup
);