3 Copyright (c) 2006, Arvid Norberg
6 Redistribution and use in source and binary forms, with or without
7 modification, are permitted provided that the following conditions
10 * Redistributions of source code must retain the above copyright
11 notice, this list of conditions and the following disclaimer.
12 * Redistributions in binary form must reproduce the above copyright
13 notice, this list of conditions and the following disclaimer in
14 the documentation and/or other materials provided with the distribution.
15 * Neither the name of the author nor the names of its
16 contributors may be used to endorse or promote products derived
17 from this software without specific prior written permission.
19 THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
20 AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
21 IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
22 ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
23 LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
24 CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
25 SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
26 INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
27 CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
28 ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
29 POSSIBILITY OF SUCH DAMAGE.
33 #include "libtorrent/pch.hpp"
36 #include <boost/bind.hpp>
37 #include <boost/optional.hpp>
38 #include <boost/function.hpp>
39 #include <boost/iterator_adaptors.hpp>
41 #include "libtorrent/io.hpp"
42 #include "libtorrent/hasher.hpp"
43 #include "libtorrent/random_sample.hpp"
44 #include "libtorrent/kademlia/node_id.hpp"
45 #include "libtorrent/kademlia/rpc_manager.hpp"
46 #include "libtorrent/kademlia/routing_table.hpp"
47 #include "libtorrent/kademlia/node.hpp"
49 #include "libtorrent/kademlia/refresh.hpp"
50 #include "libtorrent/kademlia/closest_nodes.hpp"
51 #include "libtorrent/kademlia/find_data.hpp"
55 namespace libtorrent
{ namespace dht
61 char rand() { return (char)std::rand(); }
65 // TODO: configurable?
66 enum { announce_interval
= 30 };
68 #ifdef TORRENT_DHT_VERBOSE_LOGGING
69 TORRENT_DEFINE_LOG(node
)
72 // remove peers that have timed out
73 void purge_peers(std::set
<peer_entry
>& peers
)
75 for (std::set
<peer_entry
>::iterator i
= peers
.begin()
76 , end(peers
.end()); i
!= end
;)
78 // the peer has timed out
79 if (i
->added
+ minutes(int(announce_interval
* 1.5f
)) < time_now())
81 #ifdef TORRENT_DHT_VERBOSE_LOGGING
82 TORRENT_LOG(node
) << "peer timed out at: " << i
->addr
.address();
93 node_impl::node_impl(boost::function
<void(msg
const&)> const& f
94 , dht_settings
const& settings
, boost::optional
<node_id
> node_id
)
95 : m_settings(settings
)
96 , m_id(node_id
? *node_id
: generate_id())
97 , m_table(m_id
, 8, settings
)
98 , m_rpc(bind(&node_impl::incoming_request
, this, _1
)
100 , m_last_tracker_tick(time_now())
102 m_secret
[0] = std::rand();
103 m_secret
[1] = std::rand();
106 bool node_impl::verify_token(msg
const& m
)
108 if (m
.write_token
.type() != entry::string_t
)
110 #ifdef TORRENT_DHT_VERBOSE_LOGGING
111 TORRENT_LOG(node
) << "token of incorrect type " << m
.write_token
.type();
115 std::string
const& token
= m
.write_token
.string();
116 if (token
.length() != 4)
118 #ifdef TORRENT_DHT_VERBOSE_LOGGING
119 TORRENT_LOG(node
) << "token of incorrect length: " << token
.length();
125 std::string address
= m
.addr
.address().to_string();
126 h1
.update(&address
[0], address
.length());
127 h1
.update((char*)&m_secret
[0], sizeof(m_secret
[0]));
128 h1
.update((char*)&m
.info_hash
[0], sha1_hash::size
);
130 sha1_hash h
= h1
.final();
131 if (std::equal(token
.begin(), token
.end(), (signed char*)&h
[0]))
135 h2
.update(&address
[0], address
.length());
136 h2
.update((char*)&m_secret
[1], sizeof(m_secret
[1]));
137 h2
.update((char*)&m
.info_hash
[0], sha1_hash::size
);
139 if (std::equal(token
.begin(), token
.end(), (signed char*)&h
[0]))
144 entry
node_impl::generate_token(msg
const& m
)
149 std::string address
= m
.addr
.address().to_string();
150 h
.update(&address
[0], address
.length());
151 h
.update((char*)&m_secret
[0], sizeof(m_secret
[0]));
152 h
.update((char*)&m
.info_hash
[0], sha1_hash::size
);
154 sha1_hash hash
= h
.final();
155 std::copy(hash
.begin(), hash
.begin() + 4, (signed char*)&token
[0]);
159 void node_impl::refresh(node_id
const& id
160 , boost::function0
<void> f
)
162 // use the 'bucket size' closest nodes
163 // to start the refresh with
164 std::vector
<node_entry
> start
;
165 start
.reserve(m_table
.bucket_size());
166 m_table
.find_node(id
, start
, false);
167 refresh::initiate(id
, m_settings
.search_branching
, 10, m_table
.bucket_size()
168 , m_table
, start
.begin(), start
.end(), m_rpc
, f
);
171 void node_impl::bootstrap(std::vector
<udp::endpoint
> const& nodes
172 , boost::function0
<void> f
)
174 #ifdef TORRENT_DHT_VERBOSE_LOGGING
175 TORRENT_LOG(node
) << "bootrapping: " << nodes
.size();
176 for (std::vector
<udp::endpoint
>::const_iterator i
= nodes
.begin()
177 , end(nodes
.end()); i
!= end
; ++i
)
178 TORRENT_LOG(node
) << " " << *i
;
180 std::vector
<node_entry
> start
;
181 start
.reserve(nodes
.size());
182 std::copy(nodes
.begin(), nodes
.end(), std::back_inserter(start
));
183 refresh::initiate(m_id
, m_settings
.search_branching
, 10, m_table
.bucket_size()
184 , m_table
, start
.begin(), start
.end(), m_rpc
, f
);
187 void node_impl::refresh()
189 std::vector
<node_entry
> start
;
190 start
.reserve(m_table
.size().get
<0>());
191 std::copy(m_table
.begin(), m_table
.end(), std::back_inserter(start
));
193 refresh::initiate(m_id
, m_settings
.search_branching
, 10, m_table
.bucket_size()
194 , m_table
, start
.begin(), start
.end(), m_rpc
, bind(&nop
));
197 int node_impl::bucket_size(int bucket
)
199 return m_table
.bucket_size(bucket
);
202 void node_impl::new_write_key()
204 m_secret
[1] = m_secret
[0];
205 m_secret
[0] = std::rand();
208 void node_impl::refresh_bucket(int bucket
) try
210 TORRENT_ASSERT(bucket
>= 0 && bucket
< 160);
212 // generate a random node_id within the given bucket
213 node_id target
= generate_id();
214 int num_bits
= 160 - bucket
;
216 for (int i
= 0; i
< num_bits
; ++i
)
219 mask
[byte
] |= 0x80 >> (i
% 8);
227 // make sure this is in another subtree than m_id
228 // clear the (num_bits - 1) bit and then set it to the
229 // inverse of m_id's corresponding bit.
230 target
[(num_bits
- 1) / 8] &= ~(0x80 >> ((num_bits
- 1) % 8));
231 target
[(num_bits
- 1) / 8] |=
232 (~(m_id
[(num_bits
- 1) / 8])) & (0x80 >> ((num_bits
- 1) % 8));
234 TORRENT_ASSERT(distance_exp(m_id
, target
) == bucket
);
236 std::vector
<node_entry
> start
;
237 start
.reserve(m_table
.bucket_size());
238 m_table
.find_node(target
, start
, false, m_table
.bucket_size());
240 refresh::initiate(target
, m_settings
.search_branching
, 10, m_table
.bucket_size()
241 , m_table
, start
.begin(), start
.end(), m_rpc
, bind(&nop
));
242 m_table
.touch_bucket(bucket
);
244 catch (std::exception
&) {}
246 void node_impl::unreachable(udp::endpoint
const& ep
)
248 m_rpc
.unreachable(ep
);
251 void node_impl::incoming(msg
const& m
)
253 if (m_rpc
.incoming(m
))
261 void announce_fun(std::vector
<node_entry
> const& v
, rpc_manager
& rpc
262 , int listen_port
, sha1_hash
const& ih
263 , boost::function
<void(std::vector
<tcp::endpoint
> const&, sha1_hash
const&)> f
)
265 #ifdef TORRENT_DHT_VERBOSE_LOGGING
266 TORRENT_LOG(node
) << "announce response [ ih: " << ih
267 << " p: " << listen_port
268 << " nodes: " << v
.size() << " ]" ;
271 // only store on the first k nodes
272 for (std::vector
<node_entry
>::const_iterator i
= v
.begin()
273 , end(v
.end()); i
!= end
; ++i
)
275 #ifdef TORRENT_DHT_VERBOSE_LOGGING
276 TORRENT_LOG(node
) << " distance: " << (160 - distance_exp(ih
, i
->id
));
278 observer_ptr
o(new (rpc
.allocator().malloc()) get_peers_observer(ih
, listen_port
, rpc
, f
));
280 o
->m_in_constructor
= false;
282 rpc
.invoke(messages::get_peers
, i
->addr
, o
);
288 void node_impl::add_router_node(udp::endpoint router
)
290 #ifdef TORRENT_DHT_VERBOSE_LOGGING
291 TORRENT_LOG(node
) << "adding router node: " << router
;
293 m_table
.add_router_node(router
);
296 void node_impl::add_node(udp::endpoint node
)
298 // ping the node, and if we get a reply, it
299 // will be added to the routing table
300 observer_ptr
o(new (m_rpc
.allocator().malloc()) null_observer(m_rpc
.allocator()));
302 o
->m_in_constructor
= false;
304 m_rpc
.invoke(messages::ping
, node
, o
);
307 void node_impl::announce(sha1_hash
const& info_hash
, int listen_port
308 , boost::function
<void(std::vector
<tcp::endpoint
> const&, sha1_hash
const&)> f
)
310 #ifdef TORRENT_DHT_VERBOSE_LOGGING
311 TORRENT_LOG(node
) << "announcing [ ih: " << info_hash
<< " p: " << listen_port
<< " ]" ;
313 // search for nodes with ids close to id, and then invoke the
314 // get_peers and then announce_peer rpc on them.
315 closest_nodes::initiate(info_hash
, m_settings
.search_branching
316 , m_table
.bucket_size(), m_table
, m_rpc
317 , boost::bind(&announce_fun
, _1
, boost::ref(m_rpc
), listen_port
321 time_duration
node_impl::refresh_timeout()
324 ptime now
= time_now();
325 ptime next
= now
+ minutes(15);
328 for (int i
= 0; i
< 160; ++i
)
330 ptime r
= m_table
.next_refresh(i
);
339 TORRENT_ASSERT(refresh
> -1);
340 #ifdef TORRENT_DHT_VERBOSE_LOGGING
341 TORRENT_LOG(node
) << "refreshing bucket: " << refresh
;
343 refresh_bucket(refresh
);
346 catch (std::exception
&) {}
348 time_duration next_refresh
= next
- now
;
349 time_duration min_next_refresh
350 = minutes(15) / (m_table
.num_active_buckets());
351 if (min_next_refresh
> seconds(40))
352 min_next_refresh
= seconds(40);
354 if (next_refresh
< min_next_refresh
)
355 next_refresh
= min_next_refresh
;
357 #ifdef TORRENT_DHT_VERBOSE_LOGGING
358 TORRENT_LOG(node
) << "next refresh: " << total_seconds(next_refresh
) << " seconds";
364 time_duration
node_impl::connection_timeout()
366 time_duration d
= m_rpc
.tick();
369 ptime
now(time_now());
370 if (now
- m_last_tracker_tick
< minutes(10)) return d
;
371 m_last_tracker_tick
= now
;
373 // look through all peers and see if any have timed out
374 for (data_iterator i
= begin_data(), end(end_data()); i
!= end
;)
376 torrent_entry
& t
= i
->second
;
377 node_id
const& key
= i
->first
;
379 purge_peers(t
.peers
);
381 // if there are no more peers, remove the entry altogether
384 table_t::iterator i
= m_map
.find(key
);
385 if (i
!= m_map
.end()) m_map
.erase(i
);
389 catch (std::exception
&) {}
394 void node_impl::on_announce(msg
const& m
, msg
& reply
)
396 if (!verify_token(m
))
398 reply
.message_id
= messages::error
;
399 reply
.error_code
= 203;
400 reply
.error_msg
= "Incorrect token in announce_peer";
404 // the token was correct. That means this
405 // node is not spoofing its address. So, let
406 // the table get a chance to add it.
407 m_table
.node_seen(m
.id
, m
.addr
);
409 torrent_entry
& v
= m_map
[m
.info_hash
];
411 e
.addr
= tcp::endpoint(m
.addr
.address(), m
.port
);
412 e
.added
= time_now();
413 std::set
<peer_entry
>::iterator i
= v
.peers
.find(e
);
414 if (i
!= v
.peers
.end()) v
.peers
.erase(i
++);
415 v
.peers
.insert(i
, e
);
420 tcp::endpoint
get_endpoint(peer_entry
const& p
)
426 bool node_impl::on_find(msg
const& m
, std::vector
<tcp::endpoint
>& peers
) const
428 table_t::const_iterator i
= m_map
.find(m
.info_hash
);
429 if (i
== m_map
.end()) return false;
431 torrent_entry
const& v
= i
->second
;
433 int num
= (std::min
)((int)v
.peers
.size(), m_settings
.max_peers_reply
);
436 random_sample_n(boost::make_transform_iterator(v
.peers
.begin(), &get_endpoint
)
437 , boost::make_transform_iterator(v
.peers
.end(), &get_endpoint
)
438 , std::back_inserter(peers
), num
);
440 #ifdef TORRENT_DHT_VERBOSE_LOGGING
441 for (std::vector
<tcp::endpoint
>::iterator i
= peers
.begin()
442 , end(peers
.end()); i
!= end
; ++i
)
444 TORRENT_LOG(node
) << " " << *i
;
450 void node_impl::incoming_request(msg
const& m
)
453 reply
.message_id
= m
.message_id
;
456 reply
.transaction_id
= m
.transaction_id
;
458 switch (m
.message_id
)
462 case messages::get_peers
:
464 reply
.info_hash
= m
.info_hash
;
465 reply
.write_token
= generate_token(m
);
467 if (!on_find(m
, reply
.peers
))
469 // we don't have any peers for this info_hash,
470 // return nodes instead
471 m_table
.find_node(m
.info_hash
, reply
.nodes
, false);
472 #ifdef TORRENT_DHT_VERBOSE_LOGGING
473 for (std::vector
<node_entry
>::iterator i
= reply
.nodes
.begin()
474 , end(reply
.nodes
.end()); i
!= end
; ++i
)
476 TORRENT_LOG(node
) << " " << i
->id
<< " " << i
->addr
;
482 case messages::find_node
:
484 reply
.info_hash
= m
.info_hash
;
486 m_table
.find_node(m
.info_hash
, reply
.nodes
, false);
487 #ifdef TORRENT_DHT_VERBOSE_LOGGING
488 for (std::vector
<node_entry
>::iterator i
= reply
.nodes
.begin()
489 , end(reply
.nodes
.end()); i
!= end
; ++i
)
491 TORRENT_LOG(node
) << " " << i
->id
<< " " << i
->addr
;
496 case messages::announce_peer
:
497 on_announce(m
, reply
);
500 TORRENT_ASSERT(false);
503 if (m_table
.need_node(m
.id
))
504 m_rpc
.reply_with_ping(reply
);
510 } } // namespace libtorrent::dht