3 Copyright (c) 2006, Arvid Norberg
6 Redistribution and use in source and binary forms, with or without
7 modification, are permitted provided that the following conditions
10 * Redistributions of source code must retain the above copyright
11 notice, this list of conditions and the following disclaimer.
12 * Redistributions in binary form must reproduce the above copyright
13 notice, this list of conditions and the following disclaimer in
14 the documentation and/or other materials provided with the distribution.
15 * Neither the name of the author nor the names of its
16 contributors may be used to endorse or promote products derived
17 from this software without specific prior written permission.
19 THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
20 AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
21 IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
22 ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
23 LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
24 CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
25 SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
26 INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
27 CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
28 ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
29 POSSIBILITY OF SUCH DAMAGE.
33 #include "libtorrent/pch.hpp"
34 #include "libtorrent/socket.hpp"
36 #include <boost/bind.hpp>
37 #include <boost/mpl/max_element.hpp>
38 #include <boost/mpl/vector.hpp>
39 #include <boost/mpl/sizeof.hpp>
40 #include <boost/mpl/transform_view.hpp>
41 #include <boost/mpl/deref.hpp>
42 #include <boost/lexical_cast.hpp>
44 #include <libtorrent/io.hpp>
45 #include <libtorrent/invariant_check.hpp>
46 #include <libtorrent/kademlia/rpc_manager.hpp>
47 #include <libtorrent/kademlia/logging.hpp>
48 #include <libtorrent/kademlia/routing_table.hpp>
49 #include <libtorrent/kademlia/find_data.hpp>
50 #include <libtorrent/kademlia/closest_nodes.hpp>
51 #include <libtorrent/kademlia/refresh.hpp>
52 #include <libtorrent/kademlia/node.hpp>
53 #include <libtorrent/kademlia/observer.hpp>
54 #include <libtorrent/hasher.hpp>
58 using boost::shared_ptr
;
61 namespace libtorrent
{ namespace dht
64 namespace io
= libtorrent::detail
;
65 namespace mpl
= boost::mpl
;
67 #ifdef TORRENT_DHT_VERBOSE_LOGGING
68 TORRENT_DEFINE_LOG(rpc
)
71 void intrusive_ptr_add_ref(observer
const* o
)
73 TORRENT_ASSERT(o
->m_refs
>= 0);
74 TORRENT_ASSERT(o
!= 0);
78 void intrusive_ptr_release(observer
const* o
)
80 TORRENT_ASSERT(o
->m_refs
> 0);
81 TORRENT_ASSERT(o
!= 0);
84 boost::pool
<>& p
= o
->pool_allocator
;
86 p
.free(const_cast<observer
*>(o
));
90 node_id
generate_id();
93 closest_nodes_observer
102 typedef mpl::max_element
<
103 mpl::transform_view
<observer_types
, mpl::sizeof_
<mpl::_1
> >
104 >::type max_observer_type_iter
;
106 rpc_manager::rpc_manager(fun
const& f
, node_id
const& our_id
107 , routing_table
& table
, send_fun
const& sf
)
108 : m_pool_allocator(sizeof(mpl::deref
<max_observer_type_iter::base
>::type
))
109 , m_next_transaction_id(rand() % max_transactions
)
110 , m_oldest_transaction_id(m_next_transaction_id
)
115 , m_timer(time_now())
116 , m_random_number(generate_id())
117 , m_destructing(false)
122 rpc_manager::~rpc_manager()
124 TORRENT_ASSERT(!m_destructing
);
125 m_destructing
= true;
126 #ifdef TORRENT_DHT_VERBOSE_LOGGING
127 TORRENT_LOG(rpc
) << "Destructing";
129 std::for_each(m_aborted_transactions
.begin(), m_aborted_transactions
.end()
130 , bind(&observer::abort
, _1
));
132 for (transactions_t::iterator i
= m_transactions
.begin()
133 , end(m_transactions
.end()); i
!= end
; ++i
)
135 if (*i
) (*i
)->abort();
140 size_t rpc_manager::allocation_size() const
142 size_t s
= sizeof(mpl::deref
<max_observer_type_iter::base
>::type
);
146 void rpc_manager::check_invariant() const
148 TORRENT_ASSERT(m_oldest_transaction_id
>= 0);
149 TORRENT_ASSERT(m_oldest_transaction_id
< max_transactions
);
150 TORRENT_ASSERT(m_next_transaction_id
>= 0);
151 TORRENT_ASSERT(m_next_transaction_id
< max_transactions
);
152 TORRENT_ASSERT(!m_transactions
[m_next_transaction_id
]);
154 for (int i
= (m_next_transaction_id
+ 1) % max_transactions
;
155 i
!= m_oldest_transaction_id
; i
= (i
+ 1) % max_transactions
)
157 TORRENT_ASSERT(!m_transactions
[i
]);
162 void rpc_manager::unreachable(udp::endpoint
const& ep
)
164 #ifdef TORRENT_DHT_VERBOSE_LOGGING
165 TORRENT_LOG(rpc
) << time_now_string() << " PORT_UNREACHABLE [ ip: " << ep
<< " ]";
167 int num_active
= m_oldest_transaction_id
< m_next_transaction_id
168 ? m_next_transaction_id
- m_oldest_transaction_id
169 : max_transactions
- m_next_transaction_id
+ m_oldest_transaction_id
;
170 TORRENT_ASSERT((m_oldest_transaction_id
+ num_active
) % max_transactions
171 == m_next_transaction_id
);
172 int tid
= m_oldest_transaction_id
;
173 for (int i
= 0; i
< num_active
; ++i
, ++tid
)
175 if (tid
>= max_transactions
) tid
= 0;
176 observer_ptr
const& o
= m_transactions
[tid
];
178 if (o
->target_addr
!= ep
) continue;
179 observer_ptr ptr
= m_transactions
[tid
];
180 m_transactions
[tid
] = 0;
181 if (tid
== m_oldest_transaction_id
)
183 ++m_oldest_transaction_id
;
184 if (m_oldest_transaction_id
>= max_transactions
)
185 m_oldest_transaction_id
= 0;
187 #ifdef TORRENT_DHT_VERBOSE_LOGGING
188 TORRENT_LOG(rpc
) << " found transaction [ tid: " << tid
<< " ]";
195 bool rpc_manager::incoming(msg
const& m
)
199 if (m_destructing
) return false;
203 // if we don't have the transaction id in our
204 // request list, ignore the packet
206 if (m
.transaction_id
.size() < 2)
208 #ifdef TORRENT_DHT_VERBOSE_LOGGING
209 TORRENT_LOG(rpc
) << "Reply with invalid transaction id size: "
210 << m
.transaction_id
.size() << " from " << m
.addr
;
214 reply
.message_id
= messages::error
;
215 reply
.error_code
= 203; // Protocol error
216 reply
.error_msg
= "reply with invalid transaction id, size "
217 + boost::lexical_cast
<std::string
>(m
.transaction_id
.size());
219 reply
.transaction_id
= "";
224 std::string::const_iterator i
= m
.transaction_id
.begin();
225 int tid
= io::read_uint16(i
);
227 if (tid
>= (int)m_transactions
.size()
230 #ifdef TORRENT_DHT_VERBOSE_LOGGING
231 TORRENT_LOG(rpc
) << "Reply with invalid transaction id: "
232 << tid
<< " from " << m
.addr
;
236 reply
.message_id
= messages::error
;
237 reply
.error_code
= 203; // Protocol error
238 reply
.error_msg
= "reply with invalid transaction id";
240 reply
.transaction_id
= "";
245 observer_ptr o
= m_transactions
[tid
];
249 #ifdef TORRENT_DHT_VERBOSE_LOGGING
250 TORRENT_LOG(rpc
) << "Reply with unknown transaction id: "
251 << tid
<< " from " << m
.addr
<< " (possibly timed out)";
256 if (m
.addr
.address() != o
->target_addr
.address())
258 #ifdef TORRENT_DHT_VERBOSE_LOGGING
259 TORRENT_LOG(rpc
) << "Reply with incorrect address and valid transaction id: "
260 << tid
<< " from " << m
.addr
<< " expected: " << o
->target_addr
;
265 #ifdef TORRENT_DHT_VERBOSE_LOGGING
266 std::ofstream
reply_stats("libtorrent_logs/round_trip_ms.log", std::ios::app
);
267 reply_stats
<< m
.addr
<< "\t" << total_milliseconds(time_now() - o
->sent
)
270 #ifdef TORRENT_DHT_VERBOSE_LOGGING
271 TORRENT_LOG(rpc
) << "Reply with transaction id: "
272 << tid
<< " from " << m
.addr
;
275 m_transactions
[tid
] = 0;
277 if (m
.piggy_backed_ping
)
279 // there is a ping request piggy
280 // backed in this reply
282 ph
.message_id
= messages::ping
;
283 ph
.transaction_id
= m
.ping_transaction_id
;
289 return m_table
.node_seen(m
.id
, m
.addr
);
293 TORRENT_ASSERT(m
.message_id
!= messages::error
);
294 // this is an incoming request
300 time_duration
rpc_manager::tick()
304 const int timeout_ms
= 10 * 1000;
306 // look for observers that has timed out
308 if (m_next_transaction_id
== m_oldest_transaction_id
) return milliseconds(timeout_ms
);
310 std::vector
<observer_ptr
> timeouts
;
312 for (;m_next_transaction_id
!= m_oldest_transaction_id
;
313 m_oldest_transaction_id
= (m_oldest_transaction_id
+ 1) % max_transactions
)
315 TORRENT_ASSERT(m_oldest_transaction_id
>= 0);
316 TORRENT_ASSERT(m_oldest_transaction_id
< max_transactions
);
318 observer_ptr o
= m_transactions
[m_oldest_transaction_id
];
321 time_duration diff
= o
->sent
+ milliseconds(timeout_ms
) - time_now();
322 if (diff
> seconds(0))
324 if (diff
< seconds(1)) return seconds(1);
330 m_transactions
[m_oldest_transaction_id
] = 0;
331 #ifdef TORRENT_DHT_VERBOSE_LOGGING
332 TORRENT_LOG(rpc
) << "Timing out transaction id: "
333 << m_oldest_transaction_id
<< " from " << o
->target_addr
;
335 timeouts
.push_back(o
);
336 } catch (std::exception
) {}
339 std::for_each(timeouts
.begin(), timeouts
.end(), bind(&observer::timeout
, _1
));
342 // clear the aborted transactions, will likely
343 // generate new requests. We need to swap, since the
344 // destrutors may add more observers to the m_aborted_transactions
345 std::vector
<observer_ptr
>().swap(m_aborted_transactions
);
346 return milliseconds(timeout_ms
);
349 unsigned int rpc_manager::new_transaction_id(observer_ptr o
)
353 unsigned int tid
= m_next_transaction_id
;
354 m_next_transaction_id
= (m_next_transaction_id
+ 1) % max_transactions
;
355 if (m_transactions
[m_next_transaction_id
])
357 // moving the observer into the set of aborted transactions
358 // it will prevent it from spawning new requests right now,
359 // since that would break the invariant
360 observer_ptr o
= m_transactions
[m_next_transaction_id
];
361 m_aborted_transactions
.push_back(o
);
362 #ifdef TORRENT_DHT_VERBOSE_LOGGING
363 TORRENT_LOG(rpc
) << "[new_transaction_id] Aborting message with transaction id: "
364 << m_next_transaction_id
<< " sent to " << o
->target_addr
365 << " " << total_seconds(time_now() - o
->sent
) << " seconds ago";
367 m_transactions
[m_next_transaction_id
] = 0;
368 TORRENT_ASSERT(m_oldest_transaction_id
== m_next_transaction_id
);
370 TORRENT_ASSERT(!m_transactions
[tid
]);
371 m_transactions
[tid
] = o
;
372 if (m_oldest_transaction_id
== m_next_transaction_id
)
374 m_oldest_transaction_id
= (m_oldest_transaction_id
+ 1) % max_transactions
;
375 #ifdef TORRENT_DHT_VERBOSE_LOGGING
376 TORRENT_LOG(rpc
) << "WARNING: transaction limit reached! Too many concurrent"
377 " messages! limit: " << (int)max_transactions
;
379 update_oldest_transaction_id();
385 void rpc_manager::update_oldest_transaction_id()
389 TORRENT_ASSERT(m_oldest_transaction_id
!= m_next_transaction_id
);
390 while (!m_transactions
[m_oldest_transaction_id
])
392 m_oldest_transaction_id
= (m_oldest_transaction_id
+ 1)
394 if (m_oldest_transaction_id
== m_next_transaction_id
)
399 void rpc_manager::invoke(int message_id
, udp::endpoint target_addr
411 m
.message_id
= message_id
;
414 m
.addr
= target_addr
;
415 TORRENT_ASSERT(!m_transactions
[m_next_transaction_id
]);
417 int potential_new_id
= m_next_transaction_id
;
421 m
.transaction_id
.clear();
422 std::back_insert_iterator
<std::string
> out(m
.transaction_id
);
423 io::write_uint16(m_next_transaction_id
, out
);
427 o
->sent
= time_now();
428 o
->target_addr
= target_addr
;
430 #ifdef TORRENT_DHT_VERBOSE_LOGGING
431 TORRENT_LOG(rpc
) << "Invoking " << messages::ids
[message_id
]
432 << " -> " << target_addr
;
435 new_transaction_id(o
);
437 catch (std::exception
& e
)
439 // m_send may fail with "no route to host"
440 TORRENT_ASSERT(potential_new_id
== m_next_transaction_id
);
445 void rpc_manager::reply(msg
& m
)
449 if (m_destructing
) return;
451 TORRENT_ASSERT(m
.reply
);
452 m
.piggy_backed_ping
= false;
458 void rpc_manager::reply_with_ping(msg
& m
)
462 if (m_destructing
) return;
463 TORRENT_ASSERT(m
.reply
);
465 m
.piggy_backed_ping
= true;
468 m
.ping_transaction_id
.clear();
469 std::back_insert_iterator
<std::string
> out(m
.ping_transaction_id
);
470 io::write_uint16(m_next_transaction_id
, out
);
472 TORRENT_ASSERT(allocation_size() >= sizeof(null_observer
));
473 observer_ptr
o(new (allocator().malloc()) null_observer(allocator()));
475 o
->m_in_constructor
= false;
477 TORRENT_ASSERT(!m_transactions
[m_next_transaction_id
]);
478 o
->sent
= time_now();
479 o
->target_addr
= m
.addr
;
482 new_transaction_id(o
);
487 } } // namespace libtorrent::dht