fixed bug in windows path of file.cpp
[libtorrent.git] / src / kademlia / rpc_manager.cpp
blob384be2eed83530e62d712c9b0741a11c1672cd4b
1 /*
3 Copyright (c) 2006, Arvid Norberg
4 All rights reserved.
6 Redistribution and use in source and binary forms, with or without
7 modification, are permitted provided that the following conditions
8 are met:
10 * Redistributions of source code must retain the above copyright
11 notice, this list of conditions and the following disclaimer.
12 * Redistributions in binary form must reproduce the above copyright
13 notice, this list of conditions and the following disclaimer in
14 the documentation and/or other materials provided with the distribution.
15 * Neither the name of the author nor the names of its
16 contributors may be used to endorse or promote products derived
17 from this software without specific prior written permission.
19 THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
20 AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
21 IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
22 ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
23 LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
24 CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
25 SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
26 INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
27 CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
28 ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
29 POSSIBILITY OF SUCH DAMAGE.
33 #include "libtorrent/pch.hpp"
34 #include "libtorrent/socket.hpp"
36 #include <boost/bind.hpp>
37 #include <boost/mpl/max_element.hpp>
38 #include <boost/mpl/vector.hpp>
39 #include <boost/mpl/sizeof.hpp>
40 #include <boost/mpl/transform_view.hpp>
41 #include <boost/mpl/deref.hpp>
42 #include <boost/lexical_cast.hpp>
44 #include <libtorrent/io.hpp>
45 #include <libtorrent/invariant_check.hpp>
46 #include <libtorrent/kademlia/rpc_manager.hpp>
47 #include <libtorrent/kademlia/logging.hpp>
48 #include <libtorrent/kademlia/routing_table.hpp>
49 #include <libtorrent/kademlia/find_data.hpp>
50 #include <libtorrent/kademlia/closest_nodes.hpp>
51 #include <libtorrent/kademlia/refresh.hpp>
52 #include <libtorrent/kademlia/node.hpp>
53 #include <libtorrent/kademlia/observer.hpp>
54 #include <libtorrent/hasher.hpp>
56 #include <fstream>
58 using boost::shared_ptr;
59 using boost::bind;
61 namespace libtorrent { namespace dht
64 namespace io = libtorrent::detail;
65 namespace mpl = boost::mpl;
67 #ifdef TORRENT_DHT_VERBOSE_LOGGING
68 TORRENT_DEFINE_LOG(rpc)
69 #endif
71 void intrusive_ptr_add_ref(observer const* o)
73 TORRENT_ASSERT(o->m_refs >= 0);
74 TORRENT_ASSERT(o != 0);
75 ++o->m_refs;
78 void intrusive_ptr_release(observer const* o)
80 TORRENT_ASSERT(o->m_refs > 0);
81 TORRENT_ASSERT(o != 0);
82 if (--o->m_refs == 0)
84 boost::pool<>& p = o->pool_allocator;
85 o->~observer();
86 p.free(const_cast<observer*>(o));
90 node_id generate_id();
92 typedef mpl::vector<
93 closest_nodes_observer
94 , find_data_observer
95 , announce_observer
96 , get_peers_observer
97 , refresh_observer
98 , ping_observer
99 , null_observer
100 > observer_types;
102 typedef mpl::max_element<
103 mpl::transform_view<observer_types, mpl::sizeof_<mpl::_1> >
104 >::type max_observer_type_iter;
106 rpc_manager::rpc_manager(fun const& f, node_id const& our_id
107 , routing_table& table, send_fun const& sf)
108 : m_pool_allocator(sizeof(mpl::deref<max_observer_type_iter::base>::type))
109 , m_next_transaction_id(rand() % max_transactions)
110 , m_oldest_transaction_id(m_next_transaction_id)
111 , m_incoming(f)
112 , m_send(sf)
113 , m_our_id(our_id)
114 , m_table(table)
115 , m_timer(time_now())
116 , m_random_number(generate_id())
117 , m_destructing(false)
119 std::srand(time(0));
122 rpc_manager::~rpc_manager()
124 TORRENT_ASSERT(!m_destructing);
125 m_destructing = true;
126 #ifdef TORRENT_DHT_VERBOSE_LOGGING
127 TORRENT_LOG(rpc) << "Destructing";
128 #endif
129 std::for_each(m_aborted_transactions.begin(), m_aborted_transactions.end()
130 , bind(&observer::abort, _1));
132 for (transactions_t::iterator i = m_transactions.begin()
133 , end(m_transactions.end()); i != end; ++i)
135 if (*i) (*i)->abort();
139 #ifndef NDEBUG
140 size_t rpc_manager::allocation_size() const
142 size_t s = sizeof(mpl::deref<max_observer_type_iter::base>::type);
143 return s;
146 void rpc_manager::check_invariant() const
148 TORRENT_ASSERT(m_oldest_transaction_id >= 0);
149 TORRENT_ASSERT(m_oldest_transaction_id < max_transactions);
150 TORRENT_ASSERT(m_next_transaction_id >= 0);
151 TORRENT_ASSERT(m_next_transaction_id < max_transactions);
152 TORRENT_ASSERT(!m_transactions[m_next_transaction_id]);
154 for (int i = (m_next_transaction_id + 1) % max_transactions;
155 i != m_oldest_transaction_id; i = (i + 1) % max_transactions)
157 TORRENT_ASSERT(!m_transactions[i]);
160 #endif
162 void rpc_manager::unreachable(udp::endpoint const& ep)
164 #ifdef TORRENT_DHT_VERBOSE_LOGGING
165 TORRENT_LOG(rpc) << time_now_string() << " PORT_UNREACHABLE [ ip: " << ep << " ]";
166 #endif
167 int num_active = m_oldest_transaction_id < m_next_transaction_id
168 ? m_next_transaction_id - m_oldest_transaction_id
169 : max_transactions - m_next_transaction_id + m_oldest_transaction_id;
170 TORRENT_ASSERT((m_oldest_transaction_id + num_active) % max_transactions
171 == m_next_transaction_id);
172 int tid = m_oldest_transaction_id;
173 for (int i = 0; i < num_active; ++i, ++tid)
175 if (tid >= max_transactions) tid = 0;
176 observer_ptr const& o = m_transactions[tid];
177 if (!o) continue;
178 if (o->target_addr != ep) continue;
179 observer_ptr ptr = m_transactions[tid];
180 m_transactions[tid] = 0;
181 if (tid == m_oldest_transaction_id)
183 ++m_oldest_transaction_id;
184 if (m_oldest_transaction_id >= max_transactions)
185 m_oldest_transaction_id = 0;
187 #ifdef TORRENT_DHT_VERBOSE_LOGGING
188 TORRENT_LOG(rpc) << " found transaction [ tid: " << tid << " ]";
189 #endif
190 ptr->timeout();
191 return;
195 bool rpc_manager::incoming(msg const& m)
197 INVARIANT_CHECK;
199 if (m_destructing) return false;
201 if (m.reply)
203 // if we don't have the transaction id in our
204 // request list, ignore the packet
206 if (m.transaction_id.size() < 2)
208 #ifdef TORRENT_DHT_VERBOSE_LOGGING
209 TORRENT_LOG(rpc) << "Reply with invalid transaction id size: "
210 << m.transaction_id.size() << " from " << m.addr;
211 #endif
212 msg reply;
213 reply.reply = true;
214 reply.message_id = messages::error;
215 reply.error_code = 203; // Protocol error
216 reply.error_msg = "reply with invalid transaction id, size "
217 + boost::lexical_cast<std::string>(m.transaction_id.size());
218 reply.addr = m.addr;
219 reply.transaction_id = "";
220 m_send(reply);
221 return false;
224 std::string::const_iterator i = m.transaction_id.begin();
225 int tid = io::read_uint16(i);
227 if (tid >= (int)m_transactions.size()
228 || tid < 0)
230 #ifdef TORRENT_DHT_VERBOSE_LOGGING
231 TORRENT_LOG(rpc) << "Reply with invalid transaction id: "
232 << tid << " from " << m.addr;
233 #endif
234 msg reply;
235 reply.reply = true;
236 reply.message_id = messages::error;
237 reply.error_code = 203; // Protocol error
238 reply.error_msg = "reply with invalid transaction id";
239 reply.addr = m.addr;
240 reply.transaction_id = "";
241 m_send(reply);
242 return false;
245 observer_ptr o = m_transactions[tid];
247 if (!o)
249 #ifdef TORRENT_DHT_VERBOSE_LOGGING
250 TORRENT_LOG(rpc) << "Reply with unknown transaction id: "
251 << tid << " from " << m.addr << " (possibly timed out)";
252 #endif
253 return false;
256 if (m.addr.address() != o->target_addr.address())
258 #ifdef TORRENT_DHT_VERBOSE_LOGGING
259 TORRENT_LOG(rpc) << "Reply with incorrect address and valid transaction id: "
260 << tid << " from " << m.addr << " expected: " << o->target_addr;
261 #endif
262 return false;
265 #ifdef TORRENT_DHT_VERBOSE_LOGGING
266 std::ofstream reply_stats("libtorrent_logs/round_trip_ms.log", std::ios::app);
267 reply_stats << m.addr << "\t" << total_milliseconds(time_now() - o->sent)
268 << std::endl;
269 #endif
270 #ifdef TORRENT_DHT_VERBOSE_LOGGING
271 TORRENT_LOG(rpc) << "Reply with transaction id: "
272 << tid << " from " << m.addr;
273 #endif
274 o->reply(m);
275 m_transactions[tid] = 0;
277 if (m.piggy_backed_ping)
279 // there is a ping request piggy
280 // backed in this reply
281 msg ph;
282 ph.message_id = messages::ping;
283 ph.transaction_id = m.ping_transaction_id;
284 ph.addr = m.addr;
285 ph.reply = true;
287 reply(ph);
289 return m_table.node_seen(m.id, m.addr);
291 else
293 TORRENT_ASSERT(m.message_id != messages::error);
294 // this is an incoming request
295 m_incoming(m);
297 return false;
300 time_duration rpc_manager::tick()
302 INVARIANT_CHECK;
304 const int timeout_ms = 10 * 1000;
306 // look for observers that has timed out
308 if (m_next_transaction_id == m_oldest_transaction_id) return milliseconds(timeout_ms);
310 std::vector<observer_ptr > timeouts;
312 for (;m_next_transaction_id != m_oldest_transaction_id;
313 m_oldest_transaction_id = (m_oldest_transaction_id + 1) % max_transactions)
315 TORRENT_ASSERT(m_oldest_transaction_id >= 0);
316 TORRENT_ASSERT(m_oldest_transaction_id < max_transactions);
318 observer_ptr o = m_transactions[m_oldest_transaction_id];
319 if (!o) continue;
321 time_duration diff = o->sent + milliseconds(timeout_ms) - time_now();
322 if (diff > seconds(0))
324 if (diff < seconds(1)) return seconds(1);
325 return diff;
330 m_transactions[m_oldest_transaction_id] = 0;
331 #ifdef TORRENT_DHT_VERBOSE_LOGGING
332 TORRENT_LOG(rpc) << "Timing out transaction id: "
333 << m_oldest_transaction_id << " from " << o->target_addr;
334 #endif
335 timeouts.push_back(o);
336 } catch (std::exception) {}
339 std::for_each(timeouts.begin(), timeouts.end(), bind(&observer::timeout, _1));
340 timeouts.clear();
342 // clear the aborted transactions, will likely
343 // generate new requests. We need to swap, since the
344 // destrutors may add more observers to the m_aborted_transactions
345 std::vector<observer_ptr>().swap(m_aborted_transactions);
346 return milliseconds(timeout_ms);
349 unsigned int rpc_manager::new_transaction_id(observer_ptr o)
351 INVARIANT_CHECK;
353 unsigned int tid = m_next_transaction_id;
354 m_next_transaction_id = (m_next_transaction_id + 1) % max_transactions;
355 if (m_transactions[m_next_transaction_id])
357 // moving the observer into the set of aborted transactions
358 // it will prevent it from spawning new requests right now,
359 // since that would break the invariant
360 observer_ptr o = m_transactions[m_next_transaction_id];
361 m_aborted_transactions.push_back(o);
362 #ifdef TORRENT_DHT_VERBOSE_LOGGING
363 TORRENT_LOG(rpc) << "[new_transaction_id] Aborting message with transaction id: "
364 << m_next_transaction_id << " sent to " << o->target_addr
365 << " " << total_seconds(time_now() - o->sent) << " seconds ago";
366 #endif
367 m_transactions[m_next_transaction_id] = 0;
368 TORRENT_ASSERT(m_oldest_transaction_id == m_next_transaction_id);
370 TORRENT_ASSERT(!m_transactions[tid]);
371 m_transactions[tid] = o;
372 if (m_oldest_transaction_id == m_next_transaction_id)
374 m_oldest_transaction_id = (m_oldest_transaction_id + 1) % max_transactions;
375 #ifdef TORRENT_DHT_VERBOSE_LOGGING
376 TORRENT_LOG(rpc) << "WARNING: transaction limit reached! Too many concurrent"
377 " messages! limit: " << (int)max_transactions;
378 #endif
379 update_oldest_transaction_id();
382 return tid;
385 void rpc_manager::update_oldest_transaction_id()
387 INVARIANT_CHECK;
389 TORRENT_ASSERT(m_oldest_transaction_id != m_next_transaction_id);
390 while (!m_transactions[m_oldest_transaction_id])
392 m_oldest_transaction_id = (m_oldest_transaction_id + 1)
393 % max_transactions;
394 if (m_oldest_transaction_id == m_next_transaction_id)
395 break;
399 void rpc_manager::invoke(int message_id, udp::endpoint target_addr
400 , observer_ptr o)
402 INVARIANT_CHECK;
404 if (m_destructing)
406 o->abort();
407 return;
410 msg m;
411 m.message_id = message_id;
412 m.reply = false;
413 m.id = m_our_id;
414 m.addr = target_addr;
415 TORRENT_ASSERT(!m_transactions[m_next_transaction_id]);
416 #ifndef NDEBUG
417 int potential_new_id = m_next_transaction_id;
418 #endif
421 m.transaction_id.clear();
422 std::back_insert_iterator<std::string> out(m.transaction_id);
423 io::write_uint16(m_next_transaction_id, out);
425 o->send(m);
427 o->sent = time_now();
428 o->target_addr = target_addr;
430 #ifdef TORRENT_DHT_VERBOSE_LOGGING
431 TORRENT_LOG(rpc) << "Invoking " << messages::ids[message_id]
432 << " -> " << target_addr;
433 #endif
434 m_send(m);
435 new_transaction_id(o);
437 catch (std::exception& e)
439 // m_send may fail with "no route to host"
440 TORRENT_ASSERT(potential_new_id == m_next_transaction_id);
441 o->abort();
445 void rpc_manager::reply(msg& m)
447 INVARIANT_CHECK;
449 if (m_destructing) return;
451 TORRENT_ASSERT(m.reply);
452 m.piggy_backed_ping = false;
453 m.id = m_our_id;
455 m_send(m);
458 void rpc_manager::reply_with_ping(msg& m)
460 INVARIANT_CHECK;
462 if (m_destructing) return;
463 TORRENT_ASSERT(m.reply);
465 m.piggy_backed_ping = true;
466 m.id = m_our_id;
468 m.ping_transaction_id.clear();
469 std::back_insert_iterator<std::string> out(m.ping_transaction_id);
470 io::write_uint16(m_next_transaction_id, out);
472 TORRENT_ASSERT(allocation_size() >= sizeof(null_observer));
473 observer_ptr o(new (allocator().malloc()) null_observer(allocator()));
474 #ifndef NDEBUG
475 o->m_in_constructor = false;
476 #endif
477 TORRENT_ASSERT(!m_transactions[m_next_transaction_id]);
478 o->sent = time_now();
479 o->target_addr = m.addr;
481 m_send(m);
482 new_transaction_id(o);
487 } } // namespace libtorrent::dht