fixes bug where priorities where lost when force-rechecking.
[libtorrent.git] / src / kademlia / node.cpp
blob63a8ac84b80ed3679d60ffb013fc6a1d7cdba657
1 /*
3 Copyright (c) 2006, Arvid Norberg
4 All rights reserved.
6 Redistribution and use in source and binary forms, with or without
7 modification, are permitted provided that the following conditions
8 are met:
10 * Redistributions of source code must retain the above copyright
11 notice, this list of conditions and the following disclaimer.
12 * Redistributions in binary form must reproduce the above copyright
13 notice, this list of conditions and the following disclaimer in
14 the documentation and/or other materials provided with the distribution.
15 * Neither the name of the author nor the names of its
16 contributors may be used to endorse or promote products derived
17 from this software without specific prior written permission.
19 THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
20 AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
21 IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
22 ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
23 LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
24 CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
25 SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
26 INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
27 CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
28 ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
29 POSSIBILITY OF SUCH DAMAGE.
33 #include "libtorrent/pch.hpp"
35 #include <utility>
36 #include <boost/bind.hpp>
37 #include <boost/optional.hpp>
38 #include <boost/function.hpp>
39 #include <boost/iterator_adaptors.hpp>
41 #include "libtorrent/io.hpp"
42 #include "libtorrent/hasher.hpp"
43 #include "libtorrent/random_sample.hpp"
44 #include "libtorrent/kademlia/node_id.hpp"
45 #include "libtorrent/kademlia/rpc_manager.hpp"
46 #include "libtorrent/kademlia/routing_table.hpp"
47 #include "libtorrent/kademlia/node.hpp"
49 #include "libtorrent/kademlia/refresh.hpp"
50 #include "libtorrent/kademlia/closest_nodes.hpp"
51 #include "libtorrent/kademlia/find_data.hpp"
53 using boost::bind;
55 namespace libtorrent { namespace dht
58 #ifdef _MSC_VER
59 namespace
61 char rand() { return (char)std::rand(); }
63 #endif
65 // TODO: configurable?
66 enum { announce_interval = 30 };
68 #ifdef TORRENT_DHT_VERBOSE_LOGGING
69 TORRENT_DEFINE_LOG(node)
70 #endif
72 // remove peers that have timed out
73 void purge_peers(std::set<peer_entry>& peers)
75 for (std::set<peer_entry>::iterator i = peers.begin()
76 , end(peers.end()); i != end;)
78 // the peer has timed out
79 if (i->added + minutes(int(announce_interval * 1.5f)) < time_now())
81 #ifdef TORRENT_DHT_VERBOSE_LOGGING
82 TORRENT_LOG(node) << "peer timed out at: " << i->addr.address();
83 #endif
84 peers.erase(i++);
86 else
87 ++i;
91 void nop() {}
93 node_impl::node_impl(boost::function<void(msg const&)> const& f
94 , dht_settings const& settings, boost::optional<node_id> node_id)
95 : m_settings(settings)
96 , m_id(node_id ? *node_id : generate_id())
97 , m_table(m_id, 8, settings)
98 , m_rpc(bind(&node_impl::incoming_request, this, _1)
99 , m_id, m_table, f)
100 , m_last_tracker_tick(time_now())
102 m_secret[0] = std::rand();
103 m_secret[1] = std::rand();
106 bool node_impl::verify_token(msg const& m)
108 if (m.write_token.type() != entry::string_t)
110 #ifdef TORRENT_DHT_VERBOSE_LOGGING
111 TORRENT_LOG(node) << "token of incorrect type " << m.write_token.type();
112 #endif
113 return false;
115 std::string const& token = m.write_token.string();
116 if (token.length() != 4)
118 #ifdef TORRENT_DHT_VERBOSE_LOGGING
119 TORRENT_LOG(node) << "token of incorrect length: " << token.length();
120 #endif
121 return false;
124 hasher h1;
125 std::string address = m.addr.address().to_string();
126 h1.update(&address[0], address.length());
127 h1.update((char*)&m_secret[0], sizeof(m_secret[0]));
128 h1.update((char*)&m.info_hash[0], sha1_hash::size);
130 sha1_hash h = h1.final();
131 if (std::equal(token.begin(), token.end(), (signed char*)&h[0]))
132 return true;
134 hasher h2;
135 h2.update(&address[0], address.length());
136 h2.update((char*)&m_secret[1], sizeof(m_secret[1]));
137 h2.update((char*)&m.info_hash[0], sha1_hash::size);
138 h = h2.final();
139 if (std::equal(token.begin(), token.end(), (signed char*)&h[0]))
140 return true;
141 return false;
144 entry node_impl::generate_token(msg const& m)
146 std::string token;
147 token.resize(4);
148 hasher h;
149 std::string address = m.addr.address().to_string();
150 h.update(&address[0], address.length());
151 h.update((char*)&m_secret[0], sizeof(m_secret[0]));
152 h.update((char*)&m.info_hash[0], sha1_hash::size);
154 sha1_hash hash = h.final();
155 std::copy(hash.begin(), hash.begin() + 4, (signed char*)&token[0]);
156 return entry(token);
159 void node_impl::refresh(node_id const& id
160 , boost::function0<void> f)
162 // use the 'bucket size' closest nodes
163 // to start the refresh with
164 std::vector<node_entry> start;
165 start.reserve(m_table.bucket_size());
166 m_table.find_node(id, start, false);
167 refresh::initiate(id, m_settings.search_branching, 10, m_table.bucket_size()
168 , m_table, start.begin(), start.end(), m_rpc, f);
171 void node_impl::bootstrap(std::vector<udp::endpoint> const& nodes
172 , boost::function0<void> f)
174 #ifdef TORRENT_DHT_VERBOSE_LOGGING
175 TORRENT_LOG(node) << "bootrapping: " << nodes.size();
176 for (std::vector<udp::endpoint>::const_iterator i = nodes.begin()
177 , end(nodes.end()); i != end; ++i)
178 TORRENT_LOG(node) << " " << *i;
179 #endif
180 std::vector<node_entry> start;
181 start.reserve(nodes.size());
182 std::copy(nodes.begin(), nodes.end(), std::back_inserter(start));
183 refresh::initiate(m_id, m_settings.search_branching, 10, m_table.bucket_size()
184 , m_table, start.begin(), start.end(), m_rpc, f);
187 void node_impl::refresh()
189 std::vector<node_entry> start;
190 start.reserve(m_table.size().get<0>());
191 std::copy(m_table.begin(), m_table.end(), std::back_inserter(start));
193 refresh::initiate(m_id, m_settings.search_branching, 10, m_table.bucket_size()
194 , m_table, start.begin(), start.end(), m_rpc, bind(&nop));
197 int node_impl::bucket_size(int bucket)
199 return m_table.bucket_size(bucket);
202 void node_impl::new_write_key()
204 m_secret[1] = m_secret[0];
205 m_secret[0] = std::rand();
208 void node_impl::refresh_bucket(int bucket) try
210 TORRENT_ASSERT(bucket >= 0 && bucket < 160);
212 // generate a random node_id within the given bucket
213 node_id target = generate_id();
214 int num_bits = 160 - bucket;
215 node_id mask(0);
216 for (int i = 0; i < num_bits; ++i)
218 int byte = i / 8;
219 mask[byte] |= 0x80 >> (i % 8);
222 node_id root = m_id;
223 root &= mask;
224 target &= ~mask;
225 target |= root;
227 // make sure this is in another subtree than m_id
228 // clear the (num_bits - 1) bit and then set it to the
229 // inverse of m_id's corresponding bit.
230 target[(num_bits - 1) / 8] &= ~(0x80 >> ((num_bits - 1) % 8));
231 target[(num_bits - 1) / 8] |=
232 (~(m_id[(num_bits - 1) / 8])) & (0x80 >> ((num_bits - 1) % 8));
234 TORRENT_ASSERT(distance_exp(m_id, target) == bucket);
236 std::vector<node_entry> start;
237 start.reserve(m_table.bucket_size());
238 m_table.find_node(target, start, false, m_table.bucket_size());
240 refresh::initiate(target, m_settings.search_branching, 10, m_table.bucket_size()
241 , m_table, start.begin(), start.end(), m_rpc, bind(&nop));
242 m_table.touch_bucket(bucket);
244 catch (std::exception&) {}
246 void node_impl::unreachable(udp::endpoint const& ep)
248 m_rpc.unreachable(ep);
251 void node_impl::incoming(msg const& m)
253 if (m_rpc.incoming(m))
255 refresh();
259 namespace
261 void announce_fun(std::vector<node_entry> const& v, rpc_manager& rpc
262 , int listen_port, sha1_hash const& ih
263 , boost::function<void(std::vector<tcp::endpoint> const&, sha1_hash const&)> f)
265 #ifdef TORRENT_DHT_VERBOSE_LOGGING
266 TORRENT_LOG(node) << "announce response [ ih: " << ih
267 << " p: " << listen_port
268 << " nodes: " << v.size() << " ]" ;
269 #endif
270 bool nodes = false;
271 // only store on the first k nodes
272 for (std::vector<node_entry>::const_iterator i = v.begin()
273 , end(v.end()); i != end; ++i)
275 #ifdef TORRENT_DHT_VERBOSE_LOGGING
276 TORRENT_LOG(node) << " distance: " << (160 - distance_exp(ih, i->id));
277 #endif
278 observer_ptr o(new (rpc.allocator().malloc()) get_peers_observer(ih, listen_port, rpc, f));
279 #ifndef NDEBUG
280 o->m_in_constructor = false;
281 #endif
282 rpc.invoke(messages::get_peers, i->addr, o);
283 nodes = true;
288 void node_impl::add_router_node(udp::endpoint router)
290 #ifdef TORRENT_DHT_VERBOSE_LOGGING
291 TORRENT_LOG(node) << "adding router node: " << router;
292 #endif
293 m_table.add_router_node(router);
296 void node_impl::add_node(udp::endpoint node)
298 // ping the node, and if we get a reply, it
299 // will be added to the routing table
300 observer_ptr o(new (m_rpc.allocator().malloc()) null_observer(m_rpc.allocator()));
301 #ifndef NDEBUG
302 o->m_in_constructor = false;
303 #endif
304 m_rpc.invoke(messages::ping, node, o);
307 void node_impl::announce(sha1_hash const& info_hash, int listen_port
308 , boost::function<void(std::vector<tcp::endpoint> const&, sha1_hash const&)> f)
310 #ifdef TORRENT_DHT_VERBOSE_LOGGING
311 TORRENT_LOG(node) << "announcing [ ih: " << info_hash << " p: " << listen_port << " ]" ;
312 #endif
313 // search for nodes with ids close to id, and then invoke the
314 // get_peers and then announce_peer rpc on them.
315 closest_nodes::initiate(info_hash, m_settings.search_branching
316 , m_table.bucket_size(), m_table, m_rpc
317 , boost::bind(&announce_fun, _1, boost::ref(m_rpc), listen_port
318 , info_hash, f));
321 time_duration node_impl::refresh_timeout()
323 int refresh = -1;
324 ptime now = time_now();
325 ptime next = now + minutes(15);
328 for (int i = 0; i < 160; ++i)
330 ptime r = m_table.next_refresh(i);
331 if (r <= next)
333 refresh = i;
334 next = r;
337 if (next < now)
339 TORRENT_ASSERT(refresh > -1);
340 #ifdef TORRENT_DHT_VERBOSE_LOGGING
341 TORRENT_LOG(node) << "refreshing bucket: " << refresh;
342 #endif
343 refresh_bucket(refresh);
346 catch (std::exception&) {}
348 time_duration next_refresh = next - now;
349 time_duration min_next_refresh
350 = minutes(15) / (m_table.num_active_buckets());
351 if (min_next_refresh > seconds(40))
352 min_next_refresh = seconds(40);
354 if (next_refresh < min_next_refresh)
355 next_refresh = min_next_refresh;
357 #ifdef TORRENT_DHT_VERBOSE_LOGGING
358 TORRENT_LOG(node) << "next refresh: " << total_seconds(next_refresh) << " seconds";
359 #endif
361 return next_refresh;
364 time_duration node_impl::connection_timeout()
366 time_duration d = m_rpc.tick();
369 ptime now(time_now());
370 if (now - m_last_tracker_tick < minutes(10)) return d;
371 m_last_tracker_tick = now;
373 // look through all peers and see if any have timed out
374 for (data_iterator i = begin_data(), end(end_data()); i != end;)
376 torrent_entry& t = i->second;
377 node_id const& key = i->first;
378 ++i;
379 purge_peers(t.peers);
381 // if there are no more peers, remove the entry altogether
382 if (t.peers.empty())
384 table_t::iterator i = m_map.find(key);
385 if (i != m_map.end()) m_map.erase(i);
389 catch (std::exception&) {}
391 return d;
394 void node_impl::on_announce(msg const& m, msg& reply)
396 if (!verify_token(m))
398 reply.message_id = messages::error;
399 reply.error_code = 203;
400 reply.error_msg = "Incorrect token in announce_peer";
401 return;
404 // the token was correct. That means this
405 // node is not spoofing its address. So, let
406 // the table get a chance to add it.
407 m_table.node_seen(m.id, m.addr);
409 torrent_entry& v = m_map[m.info_hash];
410 peer_entry e;
411 e.addr = tcp::endpoint(m.addr.address(), m.port);
412 e.added = time_now();
413 std::set<peer_entry>::iterator i = v.peers.find(e);
414 if (i != v.peers.end()) v.peers.erase(i++);
415 v.peers.insert(i, e);
418 namespace
420 tcp::endpoint get_endpoint(peer_entry const& p)
422 return p.addr;
426 bool node_impl::on_find(msg const& m, std::vector<tcp::endpoint>& peers) const
428 table_t::const_iterator i = m_map.find(m.info_hash);
429 if (i == m_map.end()) return false;
431 torrent_entry const& v = i->second;
433 int num = (std::min)((int)v.peers.size(), m_settings.max_peers_reply);
434 peers.clear();
435 peers.reserve(num);
436 random_sample_n(boost::make_transform_iterator(v.peers.begin(), &get_endpoint)
437 , boost::make_transform_iterator(v.peers.end(), &get_endpoint)
438 , std::back_inserter(peers), num);
440 #ifdef TORRENT_DHT_VERBOSE_LOGGING
441 for (std::vector<tcp::endpoint>::iterator i = peers.begin()
442 , end(peers.end()); i != end; ++i)
444 TORRENT_LOG(node) << " " << *i;
446 #endif
447 return true;
450 void node_impl::incoming_request(msg const& m)
452 msg reply;
453 reply.message_id = m.message_id;
454 reply.addr = m.addr;
455 reply.reply = true;
456 reply.transaction_id = m.transaction_id;
458 switch (m.message_id)
460 case messages::ping:
461 break;
462 case messages::get_peers:
464 reply.info_hash = m.info_hash;
465 reply.write_token = generate_token(m);
467 if (!on_find(m, reply.peers))
469 // we don't have any peers for this info_hash,
470 // return nodes instead
471 m_table.find_node(m.info_hash, reply.nodes, false);
472 #ifdef TORRENT_DHT_VERBOSE_LOGGING
473 for (std::vector<node_entry>::iterator i = reply.nodes.begin()
474 , end(reply.nodes.end()); i != end; ++i)
476 TORRENT_LOG(node) << " " << i->id << " " << i->addr;
478 #endif
481 break;
482 case messages::find_node:
484 reply.info_hash = m.info_hash;
486 m_table.find_node(m.info_hash, reply.nodes, false);
487 #ifdef TORRENT_DHT_VERBOSE_LOGGING
488 for (std::vector<node_entry>::iterator i = reply.nodes.begin()
489 , end(reply.nodes.end()); i != end; ++i)
491 TORRENT_LOG(node) << " " << i->id << " " << i->addr;
493 #endif
495 break;
496 case messages::announce_peer:
497 on_announce(m, reply);
498 break;
499 default:
500 TORRENT_ASSERT(false);
503 if (m_table.need_node(m.id))
504 m_rpc.reply_with_ping(reply);
505 else
506 m_rpc.reply(reply);
510 } } // namespace libtorrent::dht