fix piece_picker piece-shuffle bug
[libtorrent.git] / src / session_impl.cpp
blobcca630d01310e263a85213210e42ca8df5767795
1 /*
3 Copyright (c) 2006, Arvid Norberg, Magnus Jonsson
4 All rights reserved.
6 Redistribution and use in source and binary forms, with or without
7 modification, are permitted provided that the following conditions
8 are met:
10 * Redistributions of source code must retain the above copyright
11 notice, this list of conditions and the following disclaimer.
12 * Redistributions in binary form must reproduce the above copyright
13 notice, this list of conditions and the following disclaimer in
14 the documentation and/or other materials provided with the distribution.
15 * Neither the name of the author nor the names of its
16 contributors may be used to endorse or promote products derived
17 from this software without specific prior written permission.
19 THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
20 AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
21 IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
22 ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
23 LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
24 CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
25 SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
26 INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
27 CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
28 ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
29 POSSIBILITY OF SUCH DAMAGE.
33 #include "libtorrent/pch.hpp"
35 #include <ctime>
36 #include <iostream>
37 #include <fstream>
38 #include <iomanip>
39 #include <iterator>
40 #include <algorithm>
41 #include <set>
42 #include <cctype>
43 #include <algorithm>
45 #ifdef _MSC_VER
46 #pragma warning(push, 1)
47 #endif
49 #include <boost/lexical_cast.hpp>
50 #include <boost/filesystem/convenience.hpp>
51 #include <boost/filesystem/exception.hpp>
52 #include <boost/limits.hpp>
53 #include <boost/bind.hpp>
55 #ifdef _MSC_VER
56 #pragma warning(pop)
57 #endif
59 #include "libtorrent/peer_id.hpp"
60 #include "libtorrent/torrent_info.hpp"
61 #include "libtorrent/tracker_manager.hpp"
62 #include "libtorrent/bencode.hpp"
63 #include "libtorrent/hasher.hpp"
64 #include "libtorrent/entry.hpp"
65 #include "libtorrent/session.hpp"
66 #include "libtorrent/fingerprint.hpp"
67 #include "libtorrent/entry.hpp"
68 #include "libtorrent/alert_types.hpp"
69 #include "libtorrent/invariant_check.hpp"
70 #include "libtorrent/file.hpp"
71 #include "libtorrent/bt_peer_connection.hpp"
72 #include "libtorrent/ip_filter.hpp"
73 #include "libtorrent/socket.hpp"
74 #include "libtorrent/aux_/session_impl.hpp"
75 #include "libtorrent/kademlia/dht_tracker.hpp"
76 #include "libtorrent/enum_net.hpp"
77 #include "libtorrent/config.hpp"
79 #ifndef TORRENT_WINDOWS
80 #include <sys/resource.h>
81 #endif
83 #ifndef TORRENT_DISABLE_ENCRYPTION
85 #include <openssl/crypto.h>
87 namespace
89 // openssl requires this to clean up internal
90 // structures it allocates
91 struct openssl_cleanup
93 ~openssl_cleanup() { CRYPTO_cleanup_all_ex_data(); }
94 } openssl_global_destructor;
97 #endif
98 #ifdef _WIN32
99 // for ERROR_SEM_TIMEOUT
100 #include <winerror.h>
101 #endif
103 using boost::shared_ptr;
104 using boost::weak_ptr;
105 using boost::bind;
106 using boost::mutex;
107 using libtorrent::aux::session_impl;
109 namespace libtorrent {
111 namespace fs = boost::filesystem;
113 namespace detail
116 std::string generate_auth_string(std::string const& user
117 , std::string const& passwd)
119 if (user.empty()) return std::string();
120 return user + ":" + passwd;
125 namespace aux {
127 struct seed_random_generator
129 seed_random_generator()
131 std::srand(total_microseconds(time_now() - min_time()));
135 session_impl::session_impl(
136 std::pair<int, int> listen_port_range
137 , fingerprint const& cl_fprint
138 , char const* listen_interface
139 #if defined TORRENT_VERBOSE_LOGGING || defined TORRENT_LOGGING || defined TORRENT_ERROR_LOGGING
140 , fs::path const& logpath
141 #endif
144 #ifndef TORRENT_DISABLE_POOL_ALLOCATOR
145 m_send_buffers(send_buffer_size),
146 #endif
147 m_files(40)
148 , m_io_service()
149 , m_disk_thread(m_io_service)
150 , m_half_open(m_io_service)
151 , m_download_channel(m_io_service, peer_connection::download_channel)
152 #ifdef TORRENT_VERBOSE_BANDWIDTH_LIMIT
153 , m_upload_channel(m_io_service, peer_connection::upload_channel, true)
154 #else
155 , m_upload_channel(m_io_service, peer_connection::upload_channel)
156 #endif
157 , m_tracker_manager(m_settings, m_tracker_proxy)
158 , m_listen_port_retries(listen_port_range.second - listen_port_range.first)
159 , m_listen_interface(address::from_string(listen_interface), listen_port_range.first)
160 , m_abort(false)
161 , m_paused(false)
162 , m_max_uploads(8)
163 , m_allowed_upload_slots(8)
164 , m_max_connections(200)
165 , m_num_unchoked(0)
166 , m_unchoke_time_scaler(0)
167 , m_auto_manage_time_scaler(0)
168 , m_optimistic_unchoke_time_scaler(0)
169 , m_disconnect_time_scaler(90)
170 , m_auto_scrape_time_scaler(180)
171 , m_incoming_connection(false)
172 , m_last_tick(time_now())
173 #ifndef TORRENT_DISABLE_DHT
174 , m_dht_same_port(true)
175 , m_external_udp_port(0)
176 , m_dht_socket(m_io_service, bind(&session_impl::on_receive_udp, this, _1, _2, _3, _4)
177 , m_half_open)
178 #endif
179 , m_timer(m_io_service)
180 , m_next_connect_torrent(0)
181 #if defined TORRENT_VERBOSE_LOGGING || defined TORRENT_LOGGING || defined TORRENT_ERROR_LOGGING
182 , m_logpath(logpath)
183 #endif
184 #ifndef TORRENT_DISABLE_GEO_IP
185 , m_asnum_db(0)
186 , m_country_db(0)
187 #endif
188 , m_total_failed_bytes(0)
189 , m_total_redundant_bytes(0)
191 m_tcp_mapping[0] = -1;
192 m_tcp_mapping[1] = -1;
193 m_udp_mapping[0] = -1;
194 m_udp_mapping[1] = -1;
195 #ifdef WIN32
196 // windows XP has a limit on the number of
197 // simultaneous half-open TCP connections
198 DWORD windows_version = ::GetVersion();
199 if ((windows_version & 0xff) >= 6)
201 // on vista the limit is 5 (in home edition)
202 m_half_open.limit(4);
204 else
206 // on XP SP2 it's 10
207 m_half_open.limit(8);
209 #endif
211 m_bandwidth_manager[peer_connection::download_channel] = &m_download_channel;
212 m_bandwidth_manager[peer_connection::upload_channel] = &m_upload_channel;
214 #if defined TORRENT_VERBOSE_LOGGING || defined TORRENT_LOGGING || defined TORRENT_ERROR_LOGGING
215 m_logger = create_log("main_session", listen_port(), false);
216 (*m_logger) << time_now_string() << "\n";
218 (*m_logger) << "sizeof(torrent): " << sizeof(torrent) << "\n";
219 (*m_logger) << "sizeof(peer_connection): " << sizeof(peer_connection) << "\n";
220 (*m_logger) << "sizeof(bt_peer_connection): " << sizeof(bt_peer_connection) << "\n";
221 (*m_logger) << "sizeof(policy::peer): " << sizeof(policy::peer) << "\n";
222 (*m_logger) << "sizeof(tcp::endpoint): " << sizeof(tcp::endpoint) << "\n";
223 (*m_logger) << "sizeof(address): " << sizeof(address) << "\n";
224 (*m_logger) << "sizeof(address_v4): " << sizeof(address_v4) << "\n";
225 (*m_logger) << "sizeof(address_v6): " << sizeof(address_v6) << "\n";
226 (*m_logger) << "sizeof(void*): " << sizeof(void*) << "\n";
227 #endif
229 #ifdef TORRENT_STATS
230 m_stats_logger.open("session_stats.log", std::ios::trunc);
231 m_stats_logger <<
232 "1. second\n"
233 "2. upload rate\n"
234 "3. download rate\n"
235 "4. downloading torrents\n"
236 "5. seeding torrents\n"
237 "6. peers\n"
238 "7. connecting peers\n"
239 "8. disk block buffers\n"
240 "\n";
241 m_buffer_usage_logger.open("buffer_stats.log", std::ios::trunc);
242 m_second_counter = 0;
243 m_buffer_allocations = 0;
244 #endif
246 // ---- generate a peer id ----
247 static seed_random_generator seeder;
249 m_key = rand() + (rand() << 15) + (rand() << 30);
250 std::string print = cl_fprint.to_string();
251 TORRENT_ASSERT(print.length() <= 20);
253 // the client's fingerprint
254 std::copy(
255 print.begin()
256 , print.begin() + print.length()
257 , m_peer_id.begin());
259 // http-accepted characters:
260 static char const printable[] = "0123456789ABCDEFGHIJKLMNOPQRSTUVWXYZ"
261 "abcdefghijklmnopqrstuvwxyz-_.!~*'()";
263 // the random number
264 for (unsigned char* i = m_peer_id.begin() + print.length();
265 i != m_peer_id.end(); ++i)
267 *i = printable[rand() % (sizeof(printable)-1)];
270 error_code ec;
271 m_timer.expires_from_now(seconds(1), ec);
272 m_timer.async_wait(
273 bind(&session_impl::second_tick, this, _1));
275 m_thread.reset(new boost::thread(boost::ref(*this)));
278 #ifndef TORRENT_DISABLE_GEO_IP
279 namespace
281 struct free_ptr
283 void* ptr_;
284 free_ptr(void* p): ptr_(p) {}
285 ~free_ptr() { free(ptr_); }
289 char const* session_impl::country_for_ip(address const& a)
291 if (!a.is_v4() || m_country_db == 0) return 0;
292 return GeoIP_country_code_by_ipnum(m_country_db, a.to_v4().to_ulong());
295 int session_impl::as_for_ip(address const& a)
297 if (!a.is_v4() || m_asnum_db == 0) return 0;
298 char* name = GeoIP_name_by_ipnum(m_asnum_db, a.to_v4().to_ulong());
299 if (name == 0) return 0;
300 free_ptr p(name);
301 // GeoIP returns the name as AS??? where ? is the AS-number
302 return atoi(name + 2);
305 std::string session_impl::as_name_for_ip(address const& a)
307 if (!a.is_v4() || m_asnum_db == 0) return std::string();
308 char* name = GeoIP_name_by_ipnum(m_asnum_db, a.to_v4().to_ulong());
309 if (name == 0) return std::string();
310 free_ptr p(name);
311 char* tmp = std::strchr(name, ' ');
312 if (tmp == 0) return std::string();
313 return tmp + 1;
316 std::pair<const int, int>* session_impl::lookup_as(int as)
318 std::map<int, int>::iterator i = m_as_peak.lower_bound(as);
320 if (i == m_as_peak.end() || i->first != as)
322 // we don't have any data for this AS, insert a new entry
323 i = m_as_peak.insert(i, std::pair<int, int>(as, 0));
325 return &(*i);
328 bool session_impl::load_asnum_db(char const* file)
330 mutex_t::scoped_lock l(m_mutex);
331 if (m_asnum_db) GeoIP_delete(m_asnum_db);
332 m_asnum_db = GeoIP_open(file, GEOIP_STANDARD);
333 return m_asnum_db;
336 bool session_impl::load_country_db(char const* file)
338 mutex_t::scoped_lock l(m_mutex);
339 if (m_country_db) GeoIP_delete(m_country_db);
340 m_country_db = GeoIP_open(file, GEOIP_STANDARD);
341 return m_country_db;
344 #endif
346 void session_impl::load_state(entry const& ses_state)
348 if (ses_state.type() != entry::dictionary_t) return;
349 mutex_t::scoped_lock l(m_mutex);
350 #ifndef TORRENT_DISABLE_GEO_IP
351 entry const* as_map = ses_state.find_key("AS map");
352 if (as_map && as_map->type() == entry::dictionary_t)
354 entry::dictionary_type const& as_peak = as_map->dict();
355 for (entry::dictionary_type::const_iterator i = as_peak.begin()
356 , end(as_peak.end()); i != end; ++i)
358 int as_num = atoi(i->first.c_str());
359 if (i->second.type() != entry::int_t || i->second.integer() == 0) continue;
360 int& peak = m_as_peak[as_num];
361 if (peak < i->second.integer()) peak = i->second.integer();
364 #endif
367 entry session_impl::state() const
369 mutex_t::scoped_lock l(m_mutex);
370 entry ret;
371 #ifndef TORRENT_DISABLE_GEO_IP
372 entry::dictionary_type& as_map = ret["AS map"].dict();
373 char buf[10];
374 for (std::map<int, int>::const_iterator i = m_as_peak.begin()
375 , end(m_as_peak.end()); i != end; ++i)
377 if (i->second == 0) continue;
378 sprintf(buf, "%05d", i->first);
379 as_map[buf] = i->second;
381 #endif
382 return ret;
385 #ifndef TORRENT_DISABLE_EXTENSIONS
386 void session_impl::add_extension(
387 boost::function<boost::shared_ptr<torrent_plugin>(torrent*, void*)> ext)
389 m_extensions.push_back(ext);
391 #endif
393 #ifndef TORRENT_DISABLE_DHT
394 void session_impl::add_dht_node(udp::endpoint n)
396 if (m_dht) m_dht->add_node(n);
398 #endif
400 void session_impl::pause()
402 mutex_t::scoped_lock l(m_mutex);
403 if (m_paused) return;
404 m_paused = true;
405 for (torrent_map::iterator i = m_torrents.begin()
406 , end(m_torrents.end()); i != end; ++i)
408 torrent& t = *i->second;
409 if (!t.is_torrent_paused()) t.do_pause();
413 void session_impl::resume()
415 mutex_t::scoped_lock l(m_mutex);
416 if (!m_paused) return;
417 m_paused = false;
418 for (torrent_map::iterator i = m_torrents.begin()
419 , end(m_torrents.end()); i != end; ++i)
421 torrent& t = *i->second;
422 t.do_resume();
426 void session_impl::abort()
428 mutex_t::scoped_lock l(m_mutex);
429 if (m_abort) return;
430 #if defined(TORRENT_LOGGING)
431 (*m_logger) << time_now_string() << " *** ABORT CALLED ***\n";
432 #endif
433 // abort the main thread
434 m_abort = true;
435 m_queued_for_checking.clear();
436 if (m_lsd) m_lsd->close();
437 if (m_upnp) m_upnp->close();
438 if (m_natpmp) m_natpmp->close();
439 #ifndef TORRENT_DISABLE_DHT
440 if (m_dht) m_dht->stop();
441 m_dht_socket.close();
442 #endif
443 error_code ec;
444 m_timer.cancel(ec);
446 // close the listen sockets
447 for (std::list<listen_socket_t>::iterator i = m_listen_sockets.begin()
448 , end(m_listen_sockets.end()); i != end; ++i)
450 i->sock->close(ec);
453 #if defined(TORRENT_VERBOSE_LOGGING) || defined(TORRENT_LOGGING)
454 (*m_logger) << time_now_string() << " aborting all torrents (" << m_torrents.size() << ")\n";
455 #endif
456 // abort all torrents
457 for (torrent_map::iterator i = m_torrents.begin()
458 , end(m_torrents.end()); i != end; ++i)
460 i->second->abort();
463 #if defined(TORRENT_VERBOSE_LOGGING) || defined(TORRENT_LOGGING)
464 (*m_logger) << time_now_string() << " aborting all tracker requests\n";
465 #endif
466 m_tracker_manager.abort_all_requests();
468 #if defined(TORRENT_VERBOSE_LOGGING) || defined(TORRENT_LOGGING)
469 (*m_logger) << time_now_string() << " sending event=stopped to trackers\n";
470 int counter = 0;
471 #endif
472 for (torrent_map::iterator i = m_torrents.begin();
473 i != m_torrents.end(); ++i)
475 torrent& t = *i->second;
476 t.abort();
479 #if defined(TORRENT_VERBOSE_LOGGING) || defined(TORRENT_LOGGING)
480 (*m_logger) << time_now_string() << " sent " << counter << " tracker stop requests\n";
481 #endif
483 #if defined(TORRENT_VERBOSE_LOGGING) || defined(TORRENT_LOGGING)
484 (*m_logger) << time_now_string() << " aborting all connections (" << m_connections.size() << ")\n";
485 #endif
486 // abort all connections
487 while (!m_connections.empty())
489 #ifndef NDEBUG
490 int conn = m_connections.size();
491 #endif
492 (*m_connections.begin())->disconnect("stopping torrent");
493 TORRENT_ASSERT(conn == int(m_connections.size()) + 1);
496 #if defined(TORRENT_VERBOSE_LOGGING) || defined(TORRENT_LOGGING)
497 (*m_logger) << time_now_string() << " shutting down connection queue\n";
498 #endif
499 m_half_open.close();
501 m_download_channel.close();
502 m_upload_channel.close();
505 void session_impl::set_port_filter(port_filter const& f)
507 mutex_t::scoped_lock l(m_mutex);
508 m_port_filter = f;
511 void session_impl::set_ip_filter(ip_filter const& f)
513 mutex_t::scoped_lock l(m_mutex);
515 INVARIANT_CHECK;
517 m_ip_filter = f;
519 // Close connections whose endpoint is filtered
520 // by the new ip-filter
521 for (torrent_map::iterator i = m_torrents.begin()
522 , end(m_torrents.end()); i != end; ++i)
523 i->second->ip_filter_updated();
526 void session_impl::set_settings(session_settings const& s)
528 mutex_t::scoped_lock l(m_mutex);
530 INVARIANT_CHECK;
532 TORRENT_ASSERT(s.file_pool_size > 0);
534 // less than 5 seconds unchoke interval is insane
535 TORRENT_ASSERT(s.unchoke_interval >= 5);
536 if (m_settings.cache_size != s.cache_size)
537 m_disk_thread.set_cache_size(s.cache_size);
538 if (m_settings.cache_expiry != s.cache_expiry)
539 m_disk_thread.set_cache_size(s.cache_expiry);
540 // if queuing settings were changed, recalculate
541 // queued torrents sooner
542 if ((m_settings.active_downloads != s.active_downloads
543 || m_settings.active_seeds != s.active_seeds
544 || m_settings.active_limit != s.active_limit)
545 && m_auto_manage_time_scaler > 2)
546 m_auto_manage_time_scaler = 2;
547 m_settings = s;
548 if (m_settings.connection_speed <= 0) m_settings.connection_speed = 200;
550 m_files.resize(m_settings.file_pool_size);
551 if (!s.auto_upload_slots) m_allowed_upload_slots = m_max_uploads;
552 // replace all occurances of '\n' with ' '.
553 std::string::iterator i = m_settings.user_agent.begin();
554 while ((i = std::find(i, m_settings.user_agent.end(), '\n'))
555 != m_settings.user_agent.end())
556 *i = ' ';
559 tcp::endpoint session_impl::get_ipv6_interface() const
561 return m_ipv6_interface;
564 session_impl::listen_socket_t session_impl::setup_listener(tcp::endpoint ep
565 , int retries, bool v6_only)
567 error_code ec;
568 listen_socket_t s;
569 s.sock.reset(new socket_acceptor(m_io_service));
570 s.sock->open(ep.protocol(), ec);
571 s.sock->set_option(socket_acceptor::reuse_address(true), ec);
572 if (ep.protocol() == tcp::v6()) s.sock->set_option(v6only(v6_only), ec);
573 s.sock->bind(ep, ec);
574 while (ec && retries > 0)
576 ec = error_code();
577 TORRENT_ASSERT(!ec);
578 --retries;
579 ep.port(ep.port() + 1);
580 s.sock->bind(ep, ec);
582 if (ec)
584 // instead of giving up, try
585 // let the OS pick a port
586 ep.port(0);
587 ec = error_code();
588 s.sock->bind(ep, ec);
590 if (ec)
592 // not even that worked, give up
593 if (m_alerts.should_post<listen_failed_alert>())
594 m_alerts.post_alert(listen_failed_alert(ep, ec));
595 #if defined(TORRENT_VERBOSE_LOGGING) || defined(TORRENT_LOGGING)
596 std::stringstream msg;
597 msg << "cannot bind to interface '";
598 print_endpoint(msg, ep) << "' " << ec.message();
599 (*m_logger) << msg.str() << "\n";
600 #endif
601 return listen_socket_t();
603 s.external_port = s.sock->local_endpoint(ec).port();
604 s.sock->listen(0, ec);
605 if (ec)
607 if (m_alerts.should_post<listen_failed_alert>())
608 m_alerts.post_alert(listen_failed_alert(ep, ec));
609 #if defined(TORRENT_VERBOSE_LOGGING) || defined(TORRENT_LOGGING)
610 std::stringstream msg;
611 msg << "cannot listen on interface '";
612 print_endpoint(msg, ep) << "' " << ec.message();
613 (*m_logger) << msg.str() << "\n";
614 #endif
615 return listen_socket_t();
618 if (m_alerts.should_post<listen_succeeded_alert>())
619 m_alerts.post_alert(listen_succeeded_alert(ep));
621 #if defined(TORRENT_VERBOSE_LOGGING) || defined(TORRENT_LOGGING)
622 (*m_logger) << "listening on: " << ep
623 << " external port: " << s.external_port << "\n";
624 #endif
625 return s;
628 void session_impl::open_listen_port()
630 // close the open listen sockets
631 m_listen_sockets.clear();
632 m_incoming_connection = false;
634 if (is_any(m_listen_interface.address()))
636 // this means we should open two listen sockets
637 // one for IPv4 and one for IPv6
639 listen_socket_t s = setup_listener(
640 tcp::endpoint(address_v4::any(), m_listen_interface.port())
641 , m_listen_port_retries);
643 if (s.sock)
645 m_listen_sockets.push_back(s);
646 async_accept(s.sock);
649 s = setup_listener(
650 tcp::endpoint(address_v6::any(), m_listen_interface.port())
651 , m_listen_port_retries, true);
653 if (s.sock)
655 m_listen_sockets.push_back(s);
656 async_accept(s.sock);
659 else
661 // we should only open a single listen socket, that
662 // binds to the given interface
664 listen_socket_t s = setup_listener(
665 m_listen_interface, m_listen_port_retries);
667 if (s.sock)
669 m_listen_sockets.push_back(s);
670 async_accept(s.sock);
674 m_ipv6_interface = tcp::endpoint();
676 for (std::list<listen_socket_t>::const_iterator i = m_listen_sockets.begin()
677 , end(m_listen_sockets.end()); i != end; ++i)
679 error_code ec;
680 tcp::endpoint ep = i->sock->local_endpoint(ec);
681 if (ec || ep.address().is_v4()) continue;
683 if (ep.address().to_v6() != address_v6::any())
685 // if we're listening on a specific address
686 // pick it
687 m_ipv6_interface = ep;
689 else
691 // if we're listening on any IPv6 address, enumerate them and
692 // pick the first non-local address
693 std::vector<ip_interface> const& ifs = enum_net_interfaces(m_io_service, ec);
694 for (std::vector<ip_interface>::const_iterator i = ifs.begin()
695 , end(ifs.end()); i != end; ++i)
697 if (i->interface_address.is_v4()
698 || i->interface_address.to_v6().is_link_local()
699 || i->interface_address.to_v6().is_loopback()) continue;
700 m_ipv6_interface = tcp::endpoint(i->interface_address, ep.port());
701 break;
703 break;
707 if (!m_listen_sockets.empty())
709 error_code ec;
710 tcp::endpoint local = m_listen_sockets.front().sock->local_endpoint(ec);
711 if (!ec)
713 if (m_natpmp.get())
715 if (m_tcp_mapping[0] != -1) m_natpmp->delete_mapping(m_tcp_mapping[0]);
716 m_tcp_mapping[0] = m_natpmp->add_mapping(natpmp::tcp
717 , local.port(), local.port());
719 if (m_upnp.get())
721 if (m_tcp_mapping[1] != -1) m_upnp->delete_mapping(m_tcp_mapping[1]);
722 m_tcp_mapping[1] = m_upnp->add_mapping(upnp::tcp
723 , local.port(), local.port());
729 #ifndef TORRENT_DISABLE_DHT
731 void session_impl::on_receive_udp(error_code const& e
732 , udp::endpoint const& ep, char const* buf, int len)
734 if (e)
736 if (e == asio::error::connection_refused
737 || e == asio::error::connection_reset
738 || e == asio::error::connection_aborted)
739 m_dht->on_unreachable(ep);
741 if (m_alerts.should_post<udp_error_alert>())
742 m_alerts.post_alert(udp_error_alert(ep, e));
743 return;
746 if (len > 20 && *buf == 'd' && m_dht)
748 // this is probably a dht message
749 m_dht->on_receive(ep, buf, len);
753 #endif
755 void session_impl::async_accept(boost::shared_ptr<socket_acceptor> const& listener)
757 shared_ptr<socket_type> c(new socket_type(m_io_service));
758 c->instantiate<stream_socket>(m_io_service);
759 listener->async_accept(c->get<stream_socket>()
760 , bind(&session_impl::on_incoming_connection, this, c
761 , boost::weak_ptr<socket_acceptor>(listener), _1));
764 void session_impl::on_incoming_connection(shared_ptr<socket_type> const& s
765 , weak_ptr<socket_acceptor> listen_socket, error_code const& e)
767 boost::shared_ptr<socket_acceptor> listener = listen_socket.lock();
768 if (!listener) return;
770 if (e == asio::error::operation_aborted) return;
772 mutex_t::scoped_lock l(m_mutex);
773 if (m_abort) return;
775 error_code ec;
776 if (e)
778 tcp::endpoint ep = listener->local_endpoint(ec);
779 #if defined(TORRENT_VERBOSE_LOGGING) || defined(TORRENT_LOGGING)
780 std::string msg = "error accepting connection on '"
781 + boost::lexical_cast<std::string>(ep) + "' " + e.message();
782 (*m_logger) << msg << "\n";
783 #endif
784 #ifdef TORRENT_WINDOWS
785 // Windows sometimes generates this error. It seems to be
786 // non-fatal and we have to do another async_accept.
787 if (e.value() == ERROR_SEM_TIMEOUT)
789 async_accept(listener);
790 return;
792 #endif
793 #ifdef TORRENT_BSD
794 // Leopard sometimes generates an "invalid argument" error. It seems to be
795 // non-fatal and we have to do another async_accept.
796 if (e.value() == EINVAL)
798 async_accept(listener);
799 return;
801 #endif
802 if (m_alerts.should_post<listen_failed_alert>())
803 m_alerts.post_alert(listen_failed_alert(ep, e));
804 return;
806 async_accept(listener);
808 // we got a connection request!
809 tcp::endpoint endp = s->remote_endpoint(ec);
811 if (ec)
813 #if defined(TORRENT_VERBOSE_LOGGING) || defined(TORRENT_LOGGING)
814 (*m_logger) << endp << " <== INCOMING CONNECTION FAILED, could "
815 "not retrieve remote endpoint " << ec.message() << "\n";
816 #endif
817 return;
820 #if defined(TORRENT_VERBOSE_LOGGING) || defined(TORRENT_LOGGING)
821 (*m_logger) << time_now_string() << " <== INCOMING CONNECTION " << endp << "\n";
822 #endif
824 // local addresses do not count, since it's likely
825 // coming from our own client through local service discovery
826 // and it does not reflect whether or not a router is open
827 // for incoming connections or not.
828 if (!is_local(endp.address()))
829 m_incoming_connection = true;
831 if (m_ip_filter.access(endp.address()) & ip_filter::blocked)
833 #if defined(TORRENT_VERBOSE_LOGGING) || defined(TORRENT_LOGGING)
834 (*m_logger) << "filtered blocked ip\n";
835 #endif
836 if (m_alerts.should_post<peer_blocked_alert>())
837 m_alerts.post_alert(peer_blocked_alert(endp.address()));
838 return;
841 // don't allow more connections than the max setting
842 if (num_connections() >= max_connections())
844 #if defined(TORRENT_VERBOSE_LOGGING) || defined(TORRENT_LOGGING)
845 (*m_logger) << "number of connections limit exceeded (conns: "
846 << num_connections() << ", limit: " << max_connections()
847 << "), connection rejected\n";
848 #endif
849 return;
852 // check if we have any active torrents
853 // if we don't reject the connection
854 if (m_torrents.empty())
856 #if defined(TORRENT_VERBOSE_LOGGING) || defined(TORRENT_LOGGING)
857 (*m_logger) << " There are no torrents, disconnect\n";
858 #endif
859 return;
862 bool has_active_torrent = false;
863 for (torrent_map::iterator i = m_torrents.begin()
864 , end(m_torrents.end()); i != end; ++i)
866 if (!i->second->is_paused())
868 has_active_torrent = true;
869 break;
872 if (!has_active_torrent)
874 #if defined(TORRENT_VERBOSE_LOGGING) || defined(TORRENT_LOGGING)
875 (*m_logger) << " There are no _active_ torrents, disconnect\n";
876 #endif
877 return;
880 boost::intrusive_ptr<peer_connection> c(
881 new bt_peer_connection(*this, s, endp, 0));
882 #ifndef NDEBUG
883 c->m_in_constructor = false;
884 #endif
886 if (!c->is_disconnecting())
888 m_connections.insert(c);
889 c->start();
892 void session_impl::close_connection(peer_connection const* p
893 , char const* message)
895 mutex_t::scoped_lock l(m_mutex);
897 // too expensive
898 // INVARIANT_CHECK;
900 #ifndef NDEBUG
901 // for (aux::session_impl::torrent_map::const_iterator i = m_torrents.begin()
902 // , end(m_torrents.end()); i != end; ++i)
903 // TORRENT_ASSERT(!i->second->has_peer((peer_connection*)p));
904 #endif
906 #if defined(TORRENT_LOGGING)
907 (*m_logger) << time_now_string() << " CLOSING CONNECTION "
908 << p->remote() << " : " << message << "\n";
909 #endif
911 TORRENT_ASSERT(p->is_disconnecting());
913 if (!p->is_choked()) --m_num_unchoked;
914 // connection_map::iterator i = std::lower_bound(m_connections.begin(), m_connections.end()
915 // , p, bind(&boost::intrusive_ptr<peer_connection>::get, _1) < p);
916 // if (i->get() != p) i == m_connections.end();
917 connection_map::iterator i = std::find_if(m_connections.begin(), m_connections.end()
918 , bind(&boost::intrusive_ptr<peer_connection>::get, _1) == p);
919 if (i != m_connections.end()) m_connections.erase(i);
922 void session_impl::set_peer_id(peer_id const& id)
924 mutex_t::scoped_lock l(m_mutex);
925 m_peer_id = id;
928 void session_impl::set_key(int key)
930 mutex_t::scoped_lock l(m_mutex);
931 m_key = key;
934 int session_impl::next_port()
936 std::pair<int, int> const& out_ports = m_settings.outgoing_ports;
937 if (m_next_port < out_ports.first || m_next_port > out_ports.second)
938 m_next_port = out_ports.first;
940 int port = m_next_port;
941 ++m_next_port;
942 if (m_next_port > out_ports.second) m_next_port = out_ports.first;
943 #if defined TORRENT_LOGGING
944 (*m_logger) << time_now_string() << " *** BINDING OUTGOING CONNECTION [ "
945 "port: " << port << " ]\n";
946 #endif
947 return port;
950 void session_impl::second_tick(error_code const& e)
952 session_impl::mutex_t::scoped_lock l(m_mutex);
954 // too expensive
955 // INVARIANT_CHECK;
957 if (m_abort) return;
959 if (e)
961 #if defined TORRENT_LOGGING
962 (*m_logger) << "*** SECOND TIMER FAILED " << e.message() << "\n";
963 #endif
964 ::abort();
965 return;
968 float tick_interval = total_microseconds(time_now() - m_last_tick) / 1000000.f;
969 m_last_tick = time_now();
971 error_code ec;
972 m_timer.expires_from_now(seconds(1), ec);
973 m_timer.async_wait(
974 bind(&session_impl::second_tick, this, _1));
976 #ifdef TORRENT_STATS
977 ++m_second_counter;
978 int downloading_torrents = 0;
979 int seeding_torrents = 0;
980 for (torrent_map::iterator i = m_torrents.begin()
981 , end(m_torrents.end()); i != end; ++i)
983 if (i->second->is_seed())
984 ++seeding_torrents;
985 else
986 ++downloading_torrents;
988 int num_complete_connections = 0;
989 int num_half_open = 0;
990 for (connection_map::iterator i = m_connections.begin()
991 , end(m_connections.end()); i != end; ++i)
993 if ((*i)->is_connecting())
994 ++num_half_open;
995 else
996 ++num_complete_connections;
999 m_stats_logger
1000 << m_second_counter << "\t"
1001 << m_stat.upload_rate() << "\t"
1002 << m_stat.download_rate() << "\t"
1003 << downloading_torrents << "\t"
1004 << seeding_torrents << "\t"
1005 << num_complete_connections << "\t"
1006 << num_half_open << "\t"
1007 << m_disk_thread.disk_allocations() << "\t"
1008 << std::endl;
1009 #endif
1011 // --------------------------------------------------------------
1012 // check for incoming connections that might have timed out
1013 // --------------------------------------------------------------
1015 for (connection_map::iterator i = m_connections.begin();
1016 i != m_connections.end();)
1018 peer_connection* p = (*i).get();
1019 ++i;
1020 // ignore connections that already have a torrent, since they
1021 // are ticket through the torrents' second_ticket
1022 if (!p->associated_torrent().expired()) continue;
1023 if (m_last_tick - p->connected_time() > seconds(m_settings.handshake_timeout))
1024 p->disconnect("timeout: incoming connection");
1027 // --------------------------------------------------------------
1028 // second_tick every torrent
1029 // --------------------------------------------------------------
1031 int congested_torrents = 0;
1032 int uncongested_torrents = 0;
1034 // count the number of seeding torrents vs. downloading
1035 // torrents we are running
1036 int num_seeds = 0;
1037 int num_downloads = 0;
1039 // count the number of peers of downloading torrents
1040 int num_downloads_peers = 0;
1042 torrent_map::iterator least_recently_scraped = m_torrents.begin();
1043 int num_paused_auto_managed = 0;
1045 // check each torrent for tracker updates
1046 // TODO: do this in a timer-event in each torrent instead
1047 for (torrent_map::iterator i = m_torrents.begin();
1048 i != m_torrents.end();)
1050 torrent& t = *i->second;
1051 TORRENT_ASSERT(!t.is_aborted());
1052 if (t.bandwidth_queue_size(peer_connection::upload_channel))
1053 ++congested_torrents;
1054 else
1055 ++uncongested_torrents;
1057 if (t.is_auto_managed() && t.is_paused() && !t.has_error())
1059 ++num_paused_auto_managed;
1060 if (!least_recently_scraped->second->is_auto_managed()
1061 || !least_recently_scraped->second->is_paused()
1062 || least_recently_scraped->second->last_scrape() > t.last_scrape())
1064 least_recently_scraped = i;
1068 if (t.is_finished())
1070 ++num_seeds;
1072 else
1074 ++num_downloads;
1075 num_downloads_peers += t.num_peers();
1078 t.second_tick(m_stat, tick_interval);
1079 ++i;
1082 // drain the IP overhead from the bandwidth limiters
1083 m_download_channel.drain(m_stat.download_ip_overhead());
1084 m_upload_channel.drain(m_stat.upload_ip_overhead());
1086 m_stat.second_tick(tick_interval);
1088 // --------------------------------------------------------------
1089 // scrape paused torrents that are auto managed
1090 // --------------------------------------------------------------
1091 if (!is_paused())
1093 --m_auto_scrape_time_scaler;
1094 if (m_auto_scrape_time_scaler <= 0)
1096 m_auto_scrape_time_scaler = m_settings.auto_scrape_interval
1097 / (std::max)(1, num_paused_auto_managed);
1098 if (m_auto_scrape_time_scaler < m_settings.auto_scrape_min_interval)
1099 m_auto_scrape_time_scaler = m_settings.auto_scrape_min_interval;
1101 if (least_recently_scraped != m_torrents.end())
1103 least_recently_scraped->second->scrape_tracker();
1108 // --------------------------------------------------------------
1109 // connect new peers
1110 // --------------------------------------------------------------
1112 // let torrents connect to peers if they want to
1113 // if there are any torrents and any free slots
1115 // this loop will "hand out" max(connection_speed
1116 // , half_open.free_slots()) to the torrents, in a
1117 // round robin fashion, so that every torrent is
1118 // equallt likely to connect to a peer
1120 int free_slots = m_half_open.free_slots();
1121 if (!m_torrents.empty()
1122 && free_slots > -m_half_open.limit()
1123 && num_connections() < m_max_connections)
1125 // this is the maximum number of connections we will
1126 // attempt this tick
1127 int max_connections = m_settings.connection_speed;
1128 int average_peers = 0;
1129 if (num_downloads > 0)
1130 average_peers = num_downloads_peers / num_downloads;
1132 torrent_map::iterator i = m_torrents.begin();
1133 if (m_next_connect_torrent < int(m_torrents.size()))
1134 std::advance(i, m_next_connect_torrent);
1135 else
1136 m_next_connect_torrent = 0;
1137 int steps_since_last_connect = 0;
1138 int num_torrents = int(m_torrents.size());
1139 for (;;)
1141 torrent& t = *i->second;
1142 if (t.want_more_peers())
1144 int connect_points = 100;
1145 // have a bias against torrents with more peers
1146 // than average
1147 if (!t.is_seed() && t.num_peers() > average_peers)
1148 connect_points /= 2;
1149 // if this is a seed and there is a torrent that
1150 // is downloading, lower the rate at which this
1151 // torrent gets connections.
1152 // dividing by num_seeds will have the effect
1153 // that all seed will get as many connections
1154 // together, as a single downloading torrent.
1155 if (t.is_seed() && num_downloads > 0)
1156 connect_points /= num_seeds + 1;
1157 if (connect_points <= 0) connect_points = 1;
1158 t.give_connect_points(connect_points);
1161 if (t.try_connect_peer())
1163 --max_connections;
1164 --free_slots;
1165 steps_since_last_connect = 0;
1168 catch (std::bad_alloc&)
1170 // we ran out of memory trying to connect to a peer
1171 // lower the global limit to the number of peers
1172 // we already have
1173 m_max_connections = num_connections();
1174 if (m_max_connections < 2) m_max_connections = 2;
1177 ++m_next_connect_torrent;
1178 ++steps_since_last_connect;
1179 ++i;
1180 if (i == m_torrents.end())
1182 TORRENT_ASSERT(m_next_connect_torrent == num_torrents);
1183 i = m_torrents.begin();
1184 m_next_connect_torrent = 0;
1186 // if we have gone two whole loops without
1187 // handing out a single connection, break
1188 if (steps_since_last_connect > num_torrents * 2) break;
1189 // if there are no more free connection slots, abort
1190 if (free_slots <= -m_half_open.limit()) break;
1191 // if we should not make any more connections
1192 // attempts this tick, abort
1193 if (max_connections == 0) break;
1194 // maintain the global limit on number of connections
1195 if (num_connections() >= m_max_connections) break;
1199 // --------------------------------------------------------------
1200 // auto managed torrent
1201 // --------------------------------------------------------------
1202 m_auto_manage_time_scaler--;
1203 if (m_auto_manage_time_scaler <= 0)
1205 m_auto_manage_time_scaler = settings().auto_manage_interval;
1206 recalculate_auto_managed_torrents();
1209 // --------------------------------------------------------------
1210 // unchoke set and optimistic unchoke calculations
1211 // --------------------------------------------------------------
1212 m_unchoke_time_scaler--;
1213 if (m_unchoke_time_scaler <= 0 && !m_connections.empty())
1215 m_unchoke_time_scaler = settings().unchoke_interval;
1216 recalculate_unchoke_slots(congested_torrents
1217 , uncongested_torrents);
1220 // --------------------------------------------------------------
1221 // disconnect peers when we have too many
1222 // --------------------------------------------------------------
1223 --m_disconnect_time_scaler;
1224 if (m_disconnect_time_scaler <= 0)
1226 m_disconnect_time_scaler = 90;
1228 // every 90 seconds, disconnect the worst peers
1229 // if we have reached the connection limit
1230 if (num_connections() >= max_connections() * m_settings.peer_turnover_cutoff
1231 && !m_torrents.empty())
1233 torrent_map::iterator i = std::max_element(m_torrents.begin(), m_torrents.end()
1234 , bind(&torrent::num_peers, bind(&torrent_map::value_type::second, _1))
1235 < bind(&torrent::num_peers, bind(&torrent_map::value_type::second, _2)));
1237 TORRENT_ASSERT(i != m_torrents.end());
1238 int peers_to_disconnect = (std::min)((std::max)(int(i->second->num_peers()
1239 * m_settings.peer_turnover), 1)
1240 , i->second->get_policy().num_connect_candidates());
1241 i->second->disconnect_peers(peers_to_disconnect);
1243 else
1245 // if we haven't reached the global max. see if any torrent
1246 // has reached its local limit
1247 for (torrent_map::iterator i = m_torrents.begin()
1248 , end(m_torrents.end()); i != end; ++i)
1250 boost::shared_ptr<torrent> t = i->second;
1251 if (t->num_peers() < t->max_connections() * m_settings.peer_turnover_cutoff)
1252 continue;
1254 int peers_to_disconnect = (std::min)((std::max)(int(i->second->num_peers()
1255 * m_settings.peer_turnover), 1)
1256 , i->second->get_policy().num_connect_candidates());
1257 t->disconnect_peers(peers_to_disconnect);
1263 namespace
1265 bool is_active(torrent* t, session_settings const& s)
1267 return !(s.dont_count_slow_torrents
1268 && t->statistics().upload_payload_rate() == 0.f
1269 && t->statistics().download_payload_rate() == 0.f);
1273 void session_impl::recalculate_auto_managed_torrents()
1275 // these vectors are filled with auto managed torrents
1276 std::vector<torrent*> downloaders;
1277 downloaders.reserve(m_torrents.size());
1278 std::vector<torrent*> seeds;
1279 seeds.reserve(m_torrents.size());
1281 // these counters are set to the number of torrents
1282 // of each kind we're allowed to have active
1283 int num_downloaders = settings().active_downloads;
1284 int num_seeds = settings().active_seeds;
1285 int hard_limit = settings().active_limit;
1287 if (num_downloaders == -1)
1288 num_downloaders = (std::numeric_limits<int>::max)();
1289 if (num_seeds == -1)
1290 num_seeds = (std::numeric_limits<int>::max)();
1291 if (hard_limit == -1)
1292 hard_limit = (std::numeric_limits<int>::max)();
1294 for (torrent_map::iterator i = m_torrents.begin()
1295 , end(m_torrents.end()); i != end; ++i)
1297 torrent* t = i->second.get();
1298 TORRENT_ASSERT(t);
1299 if (t->is_auto_managed() && !t->has_error())
1301 // this torrent is auto managed, add it to
1302 // the list (depending on if it's a seed or not)
1303 if (t->is_finished())
1304 seeds.push_back(t);
1305 else
1306 downloaders.push_back(t);
1308 else if (!t->is_paused())
1310 --hard_limit;
1311 if (is_active(t, settings()))
1313 // this is not an auto managed torrent,
1314 // if it's running and active, decrease the
1315 // counters.
1316 --num_downloaders;
1317 --num_seeds;
1322 bool handled_by_extension = false;
1324 #ifndef TORRENT_DISABLE_EXTENSIONS
1325 // TODO: allow extensions to sort torrents for queuing
1326 #endif
1328 if (!handled_by_extension)
1330 std::sort(downloaders.begin(), downloaders.end()
1331 , bind(&torrent::sequence_number, _1) < bind(&torrent::sequence_number, _2));
1333 std::sort(seeds.begin(), seeds.end()
1334 , bind(&torrent::seed_rank, _1, boost::ref(m_settings))
1335 > bind(&torrent::seed_rank, _2, boost::ref(m_settings)));
1338 for (std::vector<torrent*>::iterator i = downloaders.begin()
1339 , end(downloaders.end()); i != end; ++i)
1341 torrent* t = *i;
1342 if (!t->is_paused() && !is_active(t, settings()) && hard_limit > 0)
1344 --hard_limit;
1345 continue;
1348 if (num_downloaders > 0 && hard_limit > 0)
1350 --hard_limit;
1351 if (t->state() != torrent_status::queued_for_checking
1352 && t->state() != torrent_status::checking_files)
1354 --num_downloaders;
1355 if (t->is_paused()) t->resume();
1358 else
1360 if (!t->is_paused()) t->pause();
1364 for (std::vector<torrent*>::iterator i = seeds.begin()
1365 , end(seeds.end()); i != end; ++i)
1367 torrent* t = *i;
1368 if (!t->is_paused() && !is_active(t, settings()) && hard_limit > 0)
1370 --hard_limit;
1371 continue;
1374 if (num_seeds > 0 && hard_limit > 0)
1376 --hard_limit;
1377 --num_seeds;
1378 if (t->is_paused()) t->resume();
1380 else
1382 if (!t->is_paused()) t->pause();
1387 void session_impl::recalculate_unchoke_slots(int congested_torrents
1388 , int uncongested_torrents)
1390 std::vector<peer_connection*> peers;
1391 for (connection_map::iterator i = m_connections.begin()
1392 , end(m_connections.end()); i != end; ++i)
1394 peer_connection* p = i->get();
1395 torrent* t = p->associated_torrent().lock().get();
1396 if (!p->peer_info_struct()
1397 || t == 0
1398 || !p->is_peer_interested()
1399 || p->is_disconnecting()
1400 || p->is_connecting()
1401 || (p->share_diff() < -free_upload_amount
1402 && !t->is_seed()))
1404 if (!(*i)->is_choked() && t)
1406 policy::peer* pi = p->peer_info_struct();
1407 if (pi && pi->optimistically_unchoked)
1409 pi->optimistically_unchoked = false;
1410 // force a new optimistic unchoke
1411 m_optimistic_unchoke_time_scaler = 0;
1413 t->choke_peer(*(*i));
1415 continue;
1417 peers.push_back(i->get());
1420 // sorts the peers that are eligible for unchoke by download rate and secondary
1421 // by total upload. The reason for this is, if all torrents are being seeded,
1422 // the download rate will be 0, and the peers we have sent the least to should
1423 // be unchoked
1424 std::sort(peers.begin(), peers.end()
1425 , bind(&peer_connection::unchoke_compare, _1, _2));
1427 std::for_each(m_connections.begin(), m_connections.end()
1428 , bind(&peer_connection::reset_choke_counters, _1));
1430 // auto unchoke
1431 int upload_limit = m_bandwidth_manager[peer_connection::upload_channel]->throttle();
1432 if (m_settings.auto_upload_slots && upload_limit != bandwidth_limit::inf)
1434 // if our current upload rate is less than 90% of our
1435 // limit AND most torrents are not "congested", i.e.
1436 // they are not holding back because of a per-torrent
1437 // limit
1438 if (m_stat.upload_rate() < upload_limit * 0.9f
1439 && m_allowed_upload_slots <= m_num_unchoked + 1
1440 && congested_torrents < uncongested_torrents)
1442 ++m_allowed_upload_slots;
1444 else if (m_upload_channel.queue_size() > 1
1445 && m_allowed_upload_slots > m_max_uploads)
1447 --m_allowed_upload_slots;
1451 // reserve one upload slot for optimistic unchokes
1452 int unchoke_set_size = m_allowed_upload_slots - 1;
1454 m_num_unchoked = 0;
1455 // go through all the peers and unchoke the first ones and choke
1456 // all the other ones.
1457 for (std::vector<peer_connection*>::iterator i = peers.begin()
1458 , end(peers.end()); i != end; ++i)
1460 peer_connection* p = *i;
1461 TORRENT_ASSERT(p);
1462 torrent* t = p->associated_torrent().lock().get();
1463 TORRENT_ASSERT(t);
1464 if (unchoke_set_size > 0)
1466 if (p->is_choked())
1468 if (!t->unchoke_peer(*p))
1469 continue;
1472 --unchoke_set_size;
1473 ++m_num_unchoked;
1475 TORRENT_ASSERT(p->peer_info_struct());
1476 if (p->peer_info_struct()->optimistically_unchoked)
1478 // force a new optimistic unchoke
1479 m_optimistic_unchoke_time_scaler = 0;
1480 p->peer_info_struct()->optimistically_unchoked = false;
1483 else
1485 TORRENT_ASSERT(p->peer_info_struct());
1486 if (!p->is_choked() && !p->peer_info_struct()->optimistically_unchoked)
1487 t->choke_peer(*p);
1488 if (!p->is_choked())
1489 ++m_num_unchoked;
1493 m_optimistic_unchoke_time_scaler--;
1494 if (m_optimistic_unchoke_time_scaler <= 0)
1496 m_optimistic_unchoke_time_scaler
1497 = settings().optimistic_unchoke_multiplier;
1499 // find the peer that has been waiting the longest to be optimistically
1500 // unchoked
1501 connection_map::iterator current_optimistic_unchoke = m_connections.end();
1502 connection_map::iterator optimistic_unchoke_candidate = m_connections.end();
1503 ptime last_unchoke = max_time();
1505 for (connection_map::iterator i = m_connections.begin()
1506 , end(m_connections.end()); i != end; ++i)
1508 peer_connection* p = i->get();
1509 TORRENT_ASSERT(p);
1510 policy::peer* pi = p->peer_info_struct();
1511 if (!pi) continue;
1512 torrent* t = p->associated_torrent().lock().get();
1513 if (!t) continue;
1515 if (pi->optimistically_unchoked)
1517 TORRENT_ASSERT(!p->is_choked());
1518 TORRENT_ASSERT(current_optimistic_unchoke == m_connections.end());
1519 current_optimistic_unchoke = i;
1522 if (pi->last_optimistically_unchoked < last_unchoke
1523 && !p->is_connecting()
1524 && !p->is_disconnecting()
1525 && p->is_peer_interested()
1526 && t->free_upload_slots()
1527 && p->is_choked())
1529 last_unchoke = pi->last_optimistically_unchoked;
1530 optimistic_unchoke_candidate = i;
1534 if (optimistic_unchoke_candidate != m_connections.end()
1535 && optimistic_unchoke_candidate != current_optimistic_unchoke)
1537 if (current_optimistic_unchoke != m_connections.end())
1539 torrent* t = (*current_optimistic_unchoke)->associated_torrent().lock().get();
1540 TORRENT_ASSERT(t);
1541 (*current_optimistic_unchoke)->peer_info_struct()->optimistically_unchoked = false;
1542 t->choke_peer(*current_optimistic_unchoke->get());
1544 else
1546 ++m_num_unchoked;
1549 torrent* t = (*optimistic_unchoke_candidate)->associated_torrent().lock().get();
1550 TORRENT_ASSERT(t);
1551 bool ret = t->unchoke_peer(*optimistic_unchoke_candidate->get());
1552 TORRENT_ASSERT(ret);
1553 (*optimistic_unchoke_candidate)->peer_info_struct()->optimistically_unchoked = true;
1558 void session_impl::operator()()
1560 eh_initializer();
1563 session_impl::mutex_t::scoped_lock l(m_mutex);
1564 if (m_listen_interface.port() != 0) open_listen_port();
1567 ptime timer = time_now();
1571 #ifndef BOOST_NO_EXCEPTIONS
1574 #endif
1575 m_io_service.run();
1576 TORRENT_ASSERT(m_abort == true);
1577 #ifndef BOOST_NO_EXCEPTIONS
1579 catch (std::exception& e)
1581 #ifndef NDEBUG
1582 std::cerr << e.what() << "\n";
1583 std::string err = e.what();
1584 #endif
1585 TORRENT_ASSERT(false);
1587 #endif
1589 while (!m_abort);
1591 #if defined(TORRENT_VERBOSE_LOGGING) || defined(TORRENT_LOGGING)
1592 (*m_logger) << time_now_string() << " locking mutex\n";
1593 #endif
1595 session_impl::mutex_t::scoped_lock l(m_mutex);
1597 #ifndef NDEBUG
1598 for (torrent_map::iterator i = m_torrents.begin();
1599 i != m_torrents.end(); ++i)
1601 TORRENT_ASSERT(i->second->num_peers() == 0);
1603 #endif
1605 #if defined(TORRENT_VERBOSE_LOGGING) || defined(TORRENT_LOGGING)
1606 (*m_logger) << time_now_string() << " cleaning up torrents\n";
1607 #endif
1608 m_torrents.clear();
1610 TORRENT_ASSERT(m_torrents.empty());
1611 TORRENT_ASSERT(m_connections.empty());
1615 // the return value from this function is valid only as long as the
1616 // session is locked!
1617 boost::weak_ptr<torrent> session_impl::find_torrent(sha1_hash const& info_hash)
1619 std::map<sha1_hash, boost::shared_ptr<torrent> >::iterator i
1620 = m_torrents.find(info_hash);
1621 #ifndef NDEBUG
1622 for (std::map<sha1_hash, boost::shared_ptr<torrent> >::iterator j
1623 = m_torrents.begin(); j != m_torrents.end(); ++j)
1625 torrent* p = boost::get_pointer(j->second);
1626 TORRENT_ASSERT(p);
1628 #endif
1629 if (i != m_torrents.end()) return i->second;
1630 return boost::weak_ptr<torrent>();
1633 #if defined TORRENT_VERBOSE_LOGGING || defined TORRENT_LOGGING || defined TORRENT_ERROR_LOGGING
1634 boost::shared_ptr<logger> session_impl::create_log(std::string const& name
1635 , int instance, bool append)
1637 // current options are file_logger, cout_logger and null_logger
1638 return boost::shared_ptr<logger>(new logger(m_logpath, name + ".log", instance, append));
1640 #endif
1642 std::vector<torrent_handle> session_impl::get_torrents()
1644 mutex_t::scoped_lock l(m_mutex);
1645 std::vector<torrent_handle> ret;
1647 for (session_impl::torrent_map::iterator i
1648 = m_torrents.begin(), end(m_torrents.end());
1649 i != end; ++i)
1651 if (i->second->is_aborted()) continue;
1652 ret.push_back(torrent_handle(i->second));
1654 return ret;
1657 torrent_handle session_impl::find_torrent_handle(sha1_hash const& info_hash)
1659 return torrent_handle(find_torrent(info_hash));
1662 torrent_handle session_impl::add_torrent(add_torrent_params const& params)
1664 TORRENT_ASSERT(!params.save_path.empty());
1666 if (params.ti && params.ti->files().num_files() == 0)
1668 #ifndef BOOST_NO_EXCEPTIONS
1669 throw std::runtime_error("no files in torrent");
1670 #else
1671 return torrent_handle();
1672 #endif
1675 // lock the session and the checker thread (the order is important!)
1676 mutex_t::scoped_lock l(m_mutex);
1678 // INVARIANT_CHECK;
1680 if (is_aborted())
1682 #ifndef BOOST_NO_EXCEPTIONS
1683 throw std::runtime_error("session is closing");
1684 #else
1685 return torrent_handle();
1686 #endif
1689 // figure out the info hash of the torrent
1690 sha1_hash const* ih = 0;
1691 if (params.ti) ih = &params.ti->info_hash();
1692 else ih = &params.info_hash;
1694 // is the torrent already active?
1695 boost::shared_ptr<torrent> torrent_ptr = find_torrent(*ih).lock();
1696 if (torrent_ptr)
1698 if (!params.duplicate_is_error)
1699 return torrent_handle(torrent_ptr);
1701 #ifndef BOOST_NO_EXCEPTIONS
1702 throw duplicate_torrent();
1703 #else
1704 return torrent_handle();
1705 #endif
1708 int queue_pos = 0;
1709 for (torrent_map::const_iterator i = m_torrents.begin()
1710 , end(m_torrents.end()); i != end; ++i)
1712 int pos = i->second->queue_position();
1713 if (pos >= queue_pos) queue_pos = pos + 1;
1716 // create the torrent and the data associated with
1717 // the checker thread and store it before starting
1718 // the thread
1719 if (params.ti)
1721 torrent_ptr.reset(new torrent(*this, params.ti, params.save_path
1722 , m_listen_interface, params.storage_mode, 16 * 1024
1723 , params.storage, params.paused, params.resume_data
1724 , queue_pos, params.auto_managed));
1726 else
1728 torrent_ptr.reset(new torrent(*this, params.tracker_url, *ih, params.name
1729 , params.save_path, m_listen_interface, params.storage_mode, 16 * 1024
1730 , params.storage, params.paused, params.resume_data
1731 , queue_pos, params.auto_managed));
1733 torrent_ptr->start();
1735 #ifndef TORRENT_DISABLE_EXTENSIONS
1736 for (extension_list_t::iterator i = m_extensions.begin()
1737 , end(m_extensions.end()); i != end; ++i)
1739 boost::shared_ptr<torrent_plugin> tp((*i)(torrent_ptr.get(), params.userdata));
1740 if (tp) torrent_ptr->add_extension(tp);
1742 #endif
1744 #ifndef TORRENT_DISABLE_DHT
1745 if (m_dht && params.ti)
1747 torrent_info::nodes_t const& nodes = params.ti->nodes();
1748 std::for_each(nodes.begin(), nodes.end(), bind(
1749 (void(dht::dht_tracker::*)(std::pair<std::string, int> const&))
1750 &dht::dht_tracker::add_node
1751 , boost::ref(m_dht), _1));
1753 #endif
1755 m_torrents.insert(std::make_pair(*ih, torrent_ptr));
1757 // if this is an auto managed torrent, force a recalculation
1758 // of which torrents to have active
1759 if (params.auto_managed && m_auto_manage_time_scaler > 2)
1760 m_auto_manage_time_scaler = 2;
1762 return torrent_handle(torrent_ptr);
1765 void session_impl::check_torrent(boost::shared_ptr<torrent> const& t)
1767 if (m_abort) return;
1768 if (m_queued_for_checking.empty()) t->start_checking();
1769 m_queued_for_checking.push_back(t);
1772 void session_impl::done_checking(boost::shared_ptr<torrent> const& t)
1774 if (m_queued_for_checking.empty()) return;
1775 check_queue_t::iterator next_check = m_queued_for_checking.begin();
1776 check_queue_t::iterator done = m_queued_for_checking.end();
1777 for (check_queue_t::iterator i = m_queued_for_checking.begin()
1778 , end(m_queued_for_checking.end()); i != end; ++i)
1780 if (*i == t) done = i;
1781 if (next_check == done || (*next_check)->queue_position() > (*i)->queue_position())
1782 next_check = i;
1784 if (next_check != done) (*next_check)->start_checking();
1785 m_queued_for_checking.erase(done);
1788 void session_impl::remove_torrent(const torrent_handle& h, int options)
1790 boost::shared_ptr<torrent> tptr = h.m_torrent.lock();
1791 if (!tptr)
1792 #ifdef BOOST_NO_EXCEPTIONS
1793 return;
1794 #else
1795 throw invalid_handle();
1796 #endif
1798 mutex_t::scoped_lock l(m_mutex);
1800 INVARIANT_CHECK;
1802 session_impl::torrent_map::iterator i =
1803 m_torrents.find(tptr->torrent_file().info_hash());
1805 if (i != m_torrents.end())
1807 torrent& t = *i->second;
1808 if (options & session::delete_files)
1809 t.delete_files();
1810 t.abort();
1812 #ifndef NDEBUG
1813 sha1_hash i_hash = t.torrent_file().info_hash();
1814 #endif
1815 i->second->set_queue_position(-1);
1816 m_torrents.erase(i);
1817 TORRENT_ASSERT(m_torrents.find(i_hash) == m_torrents.end());
1818 return;
1822 bool session_impl::listen_on(
1823 std::pair<int, int> const& port_range
1824 , const char* net_interface)
1826 session_impl::mutex_t::scoped_lock l(m_mutex);
1828 INVARIANT_CHECK;
1830 tcp::endpoint new_interface;
1831 if (net_interface && std::strlen(net_interface) > 0)
1832 new_interface = tcp::endpoint(address::from_string(net_interface), port_range.first);
1833 else
1834 new_interface = tcp::endpoint(address_v4::any(), port_range.first);
1836 m_listen_port_retries = port_range.second - port_range.first;
1838 // if the interface is the same and the socket is open
1839 // don't do anything
1840 if (new_interface == m_listen_interface
1841 && !m_listen_sockets.empty()) return true;
1843 m_listen_interface = new_interface;
1845 open_listen_port();
1847 bool new_listen_address = m_listen_interface.address() != new_interface.address();
1849 #ifndef TORRENT_DISABLE_DHT
1850 if ((new_listen_address || m_dht_same_port) && m_dht)
1852 if (m_dht_same_port)
1853 m_dht_settings.service_port = new_interface.port();
1854 // the listen interface changed, rebind the dht listen socket as well
1855 m_dht_socket.bind(m_dht_settings.service_port);
1856 if (m_natpmp.get())
1858 if (m_udp_mapping[0] != -1) m_natpmp->delete_mapping(m_udp_mapping[0]);
1859 m_udp_mapping[0] = m_natpmp->add_mapping(natpmp::tcp
1860 , m_dht_settings.service_port
1861 , m_dht_settings.service_port);
1863 if (m_upnp.get())
1865 if (m_udp_mapping[1] != -1) m_upnp->delete_mapping(m_udp_mapping[1]);
1866 m_udp_mapping[1] = m_upnp->add_mapping(upnp::tcp
1867 , m_dht_settings.service_port
1868 , m_dht_settings.service_port);
1871 #endif
1873 #if defined TORRENT_VERBOSE_LOGGING || defined TORRENT_LOGGING || defined TORRENT_ERROR_LOGGING
1874 m_logger = create_log("main_session", listen_port(), false);
1875 (*m_logger) << time_now_string() << "\n";
1876 #endif
1878 return !m_listen_sockets.empty();
1881 unsigned short session_impl::listen_port() const
1883 mutex_t::scoped_lock l(m_mutex);
1884 if (m_listen_sockets.empty()) return 0;
1885 return m_listen_sockets.front().external_port;;
1888 void session_impl::announce_lsd(sha1_hash const& ih)
1890 mutex_t::scoped_lock l(m_mutex);
1891 // use internal listen port for local peers
1892 if (m_lsd.get())
1893 m_lsd->announce(ih, m_listen_interface.port());
1896 void session_impl::on_lsd_peer(tcp::endpoint peer, sha1_hash const& ih)
1898 mutex_t::scoped_lock l(m_mutex);
1900 INVARIANT_CHECK;
1902 boost::shared_ptr<torrent> t = find_torrent(ih).lock();
1903 if (!t) return;
1904 // don't add peers from lsd to private torrents
1905 if (t->torrent_file().priv()) return;
1907 #if defined(TORRENT_VERBOSE_LOGGING) || defined(TORRENT_LOGGING)
1908 (*m_logger) << time_now_string()
1909 << ": added peer from local discovery: " << peer << "\n";
1910 #endif
1911 t->get_policy().peer_from_tracker(peer, peer_id(0), peer_info::lsd, 0);
1914 void session_impl::on_port_mapping(int mapping, int port
1915 , std::string const& errmsg, int map_transport)
1917 #ifndef TORRENT_DISABLE_DHT
1918 if (mapping == m_udp_mapping[map_transport] && port != 0)
1920 m_external_udp_port = port;
1921 m_dht_settings.service_port = port;
1922 if (m_alerts.should_post<portmap_alert>())
1923 m_alerts.post_alert(portmap_alert(mapping, port
1924 , map_transport));
1925 return;
1927 #endif
1929 if (mapping == m_tcp_mapping[map_transport] && port != 0)
1931 if (!m_listen_sockets.empty())
1932 m_listen_sockets.front().external_port = port;
1933 if (m_alerts.should_post<portmap_alert>())
1934 m_alerts.post_alert(portmap_alert(mapping, port
1935 , map_transport));
1936 return;
1939 if (!errmsg.empty())
1941 if (m_alerts.should_post<portmap_error_alert>())
1942 m_alerts.post_alert(portmap_error_alert(mapping
1943 , map_transport, errmsg));
1945 else
1947 if (m_alerts.should_post<portmap_alert>())
1948 m_alerts.post_alert(portmap_alert(mapping, port
1949 , map_transport));
1953 session_status session_impl::status() const
1955 mutex_t::scoped_lock l(m_mutex);
1957 // INVARIANT_CHECK;
1959 session_status s;
1961 s.num_peers = (int)m_connections.size();
1962 s.num_unchoked = m_num_unchoked;
1963 s.allowed_upload_slots = m_allowed_upload_slots;
1965 s.total_redundant_bytes = m_total_redundant_bytes;
1966 s.total_failed_bytes = m_total_failed_bytes;
1968 s.up_bandwidth_queue = m_upload_channel.queue_size();
1969 s.down_bandwidth_queue = m_download_channel.queue_size();
1971 s.has_incoming_connections = m_incoming_connection;
1973 s.download_rate = m_stat.download_rate();
1974 s.upload_rate = m_stat.upload_rate();
1976 s.payload_download_rate = m_stat.download_payload_rate();
1977 s.payload_upload_rate = m_stat.upload_payload_rate();
1979 s.total_download = m_stat.total_protocol_download()
1980 + m_stat.total_payload_download();
1982 s.total_upload = m_stat.total_protocol_upload()
1983 + m_stat.total_payload_upload();
1985 s.total_payload_download = m_stat.total_payload_download();
1986 s.total_payload_upload = m_stat.total_payload_upload();
1988 #ifndef TORRENT_DISABLE_DHT
1989 if (m_dht)
1991 m_dht->dht_status(s);
1993 else
1995 s.dht_nodes = 0;
1996 s.dht_node_cache = 0;
1997 s.dht_torrents = 0;
1998 s.dht_global_nodes = 0;
2000 #endif
2002 return s;
2005 #ifndef TORRENT_DISABLE_DHT
2007 void session_impl::start_dht(entry const& startup_state)
2009 mutex_t::scoped_lock l(m_mutex);
2011 INVARIANT_CHECK;
2013 if (m_dht)
2015 m_dht->stop();
2016 m_dht = 0;
2018 if (m_dht_settings.service_port == 0
2019 || m_dht_same_port)
2021 m_dht_same_port = true;
2022 // if you hit this assert you are trying to start the
2023 // DHT with the same port as the tcp listen port
2024 // (which is default) _before_ you have opened the
2025 // tcp listen port (so there is no configured port to use)
2026 // basically, make sure you call listen_on() before
2027 // start_dht(). See documentation for listen_on() for
2028 // more information.
2029 TORRENT_ASSERT(m_listen_interface.port() > 0);
2030 m_dht_settings.service_port = m_listen_interface.port();
2032 m_external_udp_port = m_dht_settings.service_port;
2033 if (m_natpmp.get() && m_udp_mapping[0] == -1)
2035 m_udp_mapping[0] = m_natpmp->add_mapping(natpmp::udp
2036 , m_dht_settings.service_port
2037 , m_dht_settings.service_port);
2039 if (m_upnp.get() && m_udp_mapping[1] == -1)
2041 m_udp_mapping[1] = m_upnp->add_mapping(upnp::udp
2042 , m_dht_settings.service_port
2043 , m_dht_settings.service_port);
2045 m_dht = new dht::dht_tracker(m_dht_socket, m_dht_settings, startup_state);
2046 if (!m_dht_socket.is_open() || m_dht_socket.local_port() != m_dht_settings.service_port)
2048 m_dht_socket.bind(m_dht_settings.service_port);
2052 void session_impl::stop_dht()
2054 mutex_t::scoped_lock l(m_mutex);
2055 if (!m_dht) return;
2056 m_dht->stop();
2057 m_dht = 0;
2060 void session_impl::set_dht_settings(dht_settings const& settings)
2062 mutex_t::scoped_lock l(m_mutex);
2063 // only change the dht listen port in case the settings
2064 // contains a vaiid port, and if it is different from
2065 // the current setting
2066 if (settings.service_port != 0)
2067 m_dht_same_port = false;
2068 else
2069 m_dht_same_port = true;
2070 if (!m_dht_same_port
2071 && settings.service_port != m_dht_settings.service_port
2072 && m_dht)
2074 m_dht_socket.bind(settings.service_port);
2076 if (m_natpmp.get())
2078 if (m_udp_mapping[0] != -1) m_upnp->delete_mapping(m_udp_mapping[0]);
2079 m_udp_mapping[0] = m_natpmp->add_mapping(natpmp::udp
2080 , m_dht_settings.service_port
2081 , m_dht_settings.service_port);
2083 if (m_upnp.get())
2085 if (m_udp_mapping[1] != -1) m_upnp->delete_mapping(m_udp_mapping[1]);
2086 m_udp_mapping[1] = m_upnp->add_mapping(upnp::udp
2087 , m_dht_settings.service_port
2088 , m_dht_settings.service_port);
2090 m_external_udp_port = settings.service_port;
2092 m_dht_settings = settings;
2093 if (m_dht_same_port)
2094 m_dht_settings.service_port = m_listen_interface.port();
2097 entry session_impl::dht_state() const
2099 mutex_t::scoped_lock l(m_mutex);
2100 if (!m_dht) return entry();
2101 return m_dht->state();
2104 void session_impl::add_dht_node(std::pair<std::string, int> const& node)
2106 TORRENT_ASSERT(m_dht);
2107 mutex_t::scoped_lock l(m_mutex);
2108 m_dht->add_node(node);
2111 void session_impl::add_dht_router(std::pair<std::string, int> const& node)
2113 TORRENT_ASSERT(m_dht);
2114 mutex_t::scoped_lock l(m_mutex);
2115 m_dht->add_router_node(node);
2118 #endif
2120 #ifndef TORRENT_DISABLE_ENCRYPTION
2121 void session_impl::set_pe_settings(pe_settings const& settings)
2123 mutex_t::scoped_lock l(m_mutex);
2124 m_pe_settings = settings;
2126 #endif
2128 bool session_impl::is_listening() const
2130 mutex_t::scoped_lock l(m_mutex);
2131 return !m_listen_sockets.empty();
2134 session_impl::~session_impl()
2136 #if defined(TORRENT_VERBOSE_LOGGING) || defined(TORRENT_LOGGING)
2137 (*m_logger) << time_now_string() << "\n\n *** shutting down session *** \n\n";
2138 #endif
2139 abort();
2141 #ifndef TORRENT_DISABLE_GEO_IP
2142 if (m_asnum_db) GeoIP_delete(m_asnum_db);
2143 if (m_country_db) GeoIP_delete(m_country_db);
2144 #endif
2145 #if defined(TORRENT_VERBOSE_LOGGING) || defined(TORRENT_LOGGING)
2146 (*m_logger) << time_now_string() << " waiting for main thread\n";
2147 #endif
2148 m_thread->join();
2150 TORRENT_ASSERT(m_torrents.empty());
2152 #if defined(TORRENT_VERBOSE_LOGGING) || defined(TORRENT_LOGGING)
2153 (*m_logger) << time_now_string() << " waiting for disk io thread\n";
2154 #endif
2155 m_disk_thread.join();
2157 TORRENT_ASSERT(m_torrents.empty());
2158 TORRENT_ASSERT(m_connections.empty());
2159 #if defined(TORRENT_VERBOSE_LOGGING) || defined(TORRENT_LOGGING)
2160 (*m_logger) << time_now_string() << " shutdown complete!\n";
2161 #endif
2164 void session_impl::set_max_uploads(int limit)
2166 TORRENT_ASSERT(limit > 0 || limit == -1);
2167 mutex_t::scoped_lock l(m_mutex);
2169 INVARIANT_CHECK;
2171 if (limit <= 0) limit = (std::numeric_limits<int>::max)();
2172 if (m_max_uploads == limit) return;
2173 m_max_uploads = limit;
2174 m_allowed_upload_slots = limit;
2177 void session_impl::set_max_connections(int limit)
2179 TORRENT_ASSERT(limit > 0 || limit == -1);
2180 mutex_t::scoped_lock l(m_mutex);
2182 INVARIANT_CHECK;
2184 if (limit <= 0)
2186 limit = (std::numeric_limits<int>::max)();
2187 #ifndef TORRENT_WINDOWS
2188 rlimit l;
2189 if (getrlimit(RLIMIT_NOFILE, &l) == 0
2190 && l.rlim_cur != RLIM_INFINITY)
2192 limit = l.rlim_cur - m_settings.file_pool_size;
2193 if (limit < 5) limit = 5;
2195 #endif
2197 m_max_connections = limit;
2200 void session_impl::set_max_half_open_connections(int limit)
2202 TORRENT_ASSERT(limit > 0 || limit == -1);
2203 mutex_t::scoped_lock l(m_mutex);
2205 INVARIANT_CHECK;
2207 if (limit <= 0) limit = (std::numeric_limits<int>::max)();
2208 m_half_open.limit(limit);
2211 void session_impl::set_download_rate_limit(int bytes_per_second)
2213 TORRENT_ASSERT(bytes_per_second > 0 || bytes_per_second == -1);
2214 mutex_t::scoped_lock l(m_mutex);
2216 INVARIANT_CHECK;
2218 if (bytes_per_second <= 0) bytes_per_second = bandwidth_limit::inf;
2219 m_bandwidth_manager[peer_connection::download_channel]->throttle(bytes_per_second);
2222 void session_impl::set_upload_rate_limit(int bytes_per_second)
2224 TORRENT_ASSERT(bytes_per_second > 0 || bytes_per_second == -1);
2225 mutex_t::scoped_lock l(m_mutex);
2227 INVARIANT_CHECK;
2229 if (bytes_per_second <= 0) bytes_per_second = bandwidth_limit::inf;
2230 m_bandwidth_manager[peer_connection::upload_channel]->throttle(bytes_per_second);
2233 std::auto_ptr<alert> session_impl::pop_alert()
2235 mutex_t::scoped_lock l(m_mutex);
2237 // too expensive
2238 // INVARIANT_CHECK;
2240 if (m_alerts.pending())
2241 return m_alerts.get();
2242 return std::auto_ptr<alert>(0);
2245 alert const* session_impl::wait_for_alert(time_duration max_wait)
2247 return m_alerts.wait_for_alert(max_wait);
2250 void session_impl::set_alert_mask(int m)
2252 mutex_t::scoped_lock l(m_mutex);
2253 m_alerts.set_alert_mask(m);
2256 int session_impl::upload_rate_limit() const
2258 mutex_t::scoped_lock l(m_mutex);
2260 INVARIANT_CHECK;
2262 int ret = m_bandwidth_manager[peer_connection::upload_channel]->throttle();
2263 return ret == (std::numeric_limits<int>::max)() ? -1 : ret;
2266 int session_impl::download_rate_limit() const
2268 mutex_t::scoped_lock l(m_mutex);
2269 int ret = m_bandwidth_manager[peer_connection::download_channel]->throttle();
2270 return ret == (std::numeric_limits<int>::max)() ? -1 : ret;
2273 void session_impl::start_lsd()
2275 mutex_t::scoped_lock l(m_mutex);
2277 INVARIANT_CHECK;
2279 if (m_lsd) return;
2281 m_lsd = new lsd(m_io_service
2282 , m_listen_interface.address()
2283 , bind(&session_impl::on_lsd_peer, this, _1, _2));
2286 natpmp* session_impl::start_natpmp()
2288 mutex_t::scoped_lock l(m_mutex);
2290 INVARIANT_CHECK;
2292 if (m_natpmp) return m_natpmp.get();
2294 m_natpmp = new natpmp(m_io_service
2295 , m_listen_interface.address()
2296 , bind(&session_impl::on_port_mapping
2297 , this, _1, _2, _3, 0));
2299 if (m_listen_interface.port() > 0)
2301 m_tcp_mapping[0] = m_natpmp->add_mapping(natpmp::tcp
2302 , m_listen_interface.port(), m_listen_interface.port());
2304 #ifndef TORRENT_DISABLE_DHT
2305 if (m_dht)
2306 m_udp_mapping[0] = m_natpmp->add_mapping(natpmp::udp
2307 , m_dht_settings.service_port
2308 , m_dht_settings.service_port);
2309 #endif
2310 return m_natpmp.get();
2313 upnp* session_impl::start_upnp()
2315 mutex_t::scoped_lock l(m_mutex);
2317 INVARIANT_CHECK;
2319 if (m_upnp) return m_upnp.get();
2321 m_upnp = new upnp(m_io_service, m_half_open
2322 , m_listen_interface.address()
2323 , m_settings.user_agent
2324 , bind(&session_impl::on_port_mapping
2325 , this, _1, _2, _3, 1)
2326 , m_settings.upnp_ignore_nonrouters);
2328 m_upnp->discover_device();
2329 if (m_listen_interface.port() > 0)
2331 m_tcp_mapping[1] = m_upnp->add_mapping(upnp::tcp
2332 , m_listen_interface.port(), m_listen_interface.port());
2334 #ifndef TORRENT_DISABLE_DHT
2335 if (m_dht)
2336 m_udp_mapping[1] = m_upnp->add_mapping(upnp::udp
2337 , m_dht_settings.service_port
2338 , m_dht_settings.service_port);
2339 #endif
2340 return m_upnp.get();
2343 void session_impl::stop_lsd()
2345 mutex_t::scoped_lock l(m_mutex);
2346 if (m_lsd.get())
2347 m_lsd->close();
2348 m_lsd = 0;
2351 void session_impl::stop_natpmp()
2353 mutex_t::scoped_lock l(m_mutex);
2354 if (m_natpmp.get())
2355 m_natpmp->close();
2356 m_natpmp = 0;
2359 void session_impl::stop_upnp()
2361 mutex_t::scoped_lock l(m_mutex);
2362 if (m_upnp.get())
2364 m_upnp->close();
2365 m_udp_mapping[1] = -1;
2366 m_tcp_mapping[1] = -1;
2368 m_upnp = 0;
2371 void session_impl::set_external_address(address const& ip)
2373 TORRENT_ASSERT(ip != address());
2375 if (m_external_address == ip) return;
2377 m_external_address = ip;
2378 if (m_alerts.should_post<external_ip_alert>())
2379 m_alerts.post_alert(external_ip_alert(ip));
2382 void session_impl::free_disk_buffer(char* buf)
2384 m_disk_thread.free_buffer(buf);
2387 char* session_impl::allocate_disk_buffer()
2389 return m_disk_thread.allocate_buffer();
2392 std::pair<char*, int> session_impl::allocate_buffer(int size)
2394 TORRENT_ASSERT(size > 0);
2395 int num_buffers = (size + send_buffer_size - 1) / send_buffer_size;
2396 TORRENT_ASSERT(num_buffers > 0);
2398 boost::mutex::scoped_lock l(m_send_buffer_mutex);
2399 #ifdef TORRENT_STATS
2400 TORRENT_ASSERT(m_buffer_allocations >= 0);
2401 m_buffer_allocations += num_buffers;
2402 m_buffer_usage_logger << log_time() << " protocol_buffer: "
2403 << (m_buffer_allocations * send_buffer_size) << std::endl;
2404 #endif
2405 #ifdef TORRENT_DISABLE_POOL_ALLOCATOR
2406 int num_bytes = num_buffers * send_buffer_size;
2407 return std::make_pair((char*)malloc(num_bytes), num_bytes);
2408 #else
2409 return std::make_pair((char*)m_send_buffers.ordered_malloc(num_buffers)
2410 , num_buffers * send_buffer_size);
2411 #endif
2414 void session_impl::free_buffer(char* buf, int size)
2416 TORRENT_ASSERT(size > 0);
2417 TORRENT_ASSERT(size % send_buffer_size == 0);
2418 int num_buffers = size / send_buffer_size;
2419 TORRENT_ASSERT(num_buffers > 0);
2421 boost::mutex::scoped_lock l(m_send_buffer_mutex);
2422 #ifdef TORRENT_STATS
2423 m_buffer_allocations -= num_buffers;
2424 TORRENT_ASSERT(m_buffer_allocations >= 0);
2425 m_buffer_usage_logger << log_time() << " protocol_buffer: "
2426 << (m_buffer_allocations * send_buffer_size) << std::endl;
2427 #endif
2428 #ifdef TORRENT_DISABLE_POOL_ALLOCATOR
2429 free(buf);
2430 #else
2431 m_send_buffers.ordered_free(buf, num_buffers);
2432 #endif
2435 #ifndef NDEBUG
2436 void session_impl::check_invariant() const
2438 std::set<int> unique;
2439 int total_downloaders = 0;
2440 for (torrent_map::const_iterator i = m_torrents.begin()
2441 , end(m_torrents.end()); i != end; ++i)
2443 int pos = i->second->queue_position();
2444 if (pos < 0)
2446 TORRENT_ASSERT(pos == -1);
2447 continue;
2449 ++total_downloaders;
2450 unique.insert(i->second->queue_position());
2452 TORRENT_ASSERT(unique.size() == total_downloaders);
2454 TORRENT_ASSERT(m_max_connections > 0);
2455 TORRENT_ASSERT(m_max_uploads > 0);
2456 TORRENT_ASSERT(m_allowed_upload_slots >= m_max_uploads);
2457 int unchokes = 0;
2458 int num_optimistic = 0;
2459 for (connection_map::const_iterator i = m_connections.begin();
2460 i != m_connections.end(); ++i)
2462 TORRENT_ASSERT(*i);
2463 boost::shared_ptr<torrent> t = (*i)->associated_torrent().lock();
2465 peer_connection* p = i->get();
2466 TORRENT_ASSERT(!p->is_disconnecting());
2467 if (!p->is_choked()) ++unchokes;
2468 if (p->peer_info_struct()
2469 && p->peer_info_struct()->optimistically_unchoked)
2471 ++num_optimistic;
2472 TORRENT_ASSERT(!p->is_choked());
2474 if (t && p->peer_info_struct())
2476 TORRENT_ASSERT(t->get_policy().has_connection(p));
2479 TORRENT_ASSERT(num_optimistic == 0 || num_optimistic == 1);
2480 if (m_num_unchoked != unchokes)
2482 TORRENT_ASSERT(false);
2484 for (std::map<sha1_hash, boost::shared_ptr<torrent> >::const_iterator j
2485 = m_torrents.begin(); j != m_torrents.end(); ++j)
2487 TORRENT_ASSERT(boost::get_pointer(j->second));
2490 #endif