fixes bug where priorities were lost when force-rechecking.
[libtorrent.git] / src / peer_connection.cpp
blobe72239abcc6c762bf2c18e42423ef493c94159d4
1 /*
3 Copyright (c) 2003, Arvid Norberg
4 All rights reserved.
6 Redistribution and use in source and binary forms, with or without
7 modification, are permitted provided that the following conditions
8 are met:
10 * Redistributions of source code must retain the above copyright
11 notice, this list of conditions and the following disclaimer.
12 * Redistributions in binary form must reproduce the above copyright
13 notice, this list of conditions and the following disclaimer in
14 the documentation and/or other materials provided with the distribution.
15 * Neither the name of the author nor the names of its
16 contributors may be used to endorse or promote products derived
17 from this software without specific prior written permission.
19 THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
20 AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
21 IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
22 ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
23 LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
24 CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
25 SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
26 INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
27 CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
28 ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
29 POSSIBILITY OF SUCH DAMAGE.
33 #include "libtorrent/pch.hpp"
35 #include <vector>
36 #include <iostream>
37 #include <iomanip>
38 #include <limits>
39 #include <boost/bind.hpp>
41 #include "libtorrent/peer_connection.hpp"
42 #include "libtorrent/identify_client.hpp"
43 #include "libtorrent/entry.hpp"
44 #include "libtorrent/bencode.hpp"
45 #include "libtorrent/alert_types.hpp"
46 #include "libtorrent/invariant_check.hpp"
47 #include "libtorrent/io.hpp"
48 #include "libtorrent/file.hpp"
49 #include "libtorrent/version.hpp"
50 #include "libtorrent/extensions.hpp"
51 #include "libtorrent/aux_/session_impl.hpp"
52 #include "libtorrent/policy.hpp"
53 #include "libtorrent/socket_type.hpp"
54 #include "libtorrent/assert.hpp"
56 //#define TORRENT_CORRUPT_DATA
58 using boost::bind;
59 using boost::shared_ptr;
60 using libtorrent::aux::session_impl;
62 namespace libtorrent
64 // outbound connection
// Constructs a peer_connection that is bound to a torrent from the start
// (the torrent is passed in as a weak_ptr and stored in m_torrent).
// The connection starts out flagged as active, connecting and queued
// (m_active, m_connecting and m_queued are all initialised to true).
// ses      - the session this connection belongs to
// tor      - the torrent this connection is for
// s        - the socket used for this connection
// endp     - the remote endpoint, stored in m_remote
// peerinfo - policy entry for the peer; may be 0, and must not be banned
//            (asserted below)
65 peer_connection::peer_connection(
66 session_impl& ses
67 , boost::weak_ptr<torrent> tor
68 , shared_ptr<socket_type> s
69 , tcp::endpoint const& endp
70 , policy::peer* peerinfo)
// m_last_choke is only initialised when NDEBUG is not defined
72 #ifndef NDEBUG
73 m_last_choke(time_now() - hours(1))
75 #endif
76 m_ses(ses)
77 , m_max_out_request_queue(m_ses.settings().max_out_request_queue)
78 , m_last_piece(time_now())
79 , m_last_request(time_now())
80 , m_last_incoming_request(min_time())
81 , m_last_unchoke(min_time())
82 , m_last_receive(time_now())
83 , m_last_sent(time_now())
84 , m_requested(min_time())
85 , m_timeout_extend(0)
86 , m_remote_dl_update(time_now())
87 , m_connect(time_now())
88 , m_became_uninterested(time_now())
89 , m_became_uninteresting(time_now())
90 , m_free_upload(0)
91 , m_downloaded_at_last_unchoke(0)
92 , m_disk_recv_buffer(ses, 0)
93 , m_socket(s)
94 , m_remote(endp)
95 , m_torrent(tor)
96 , m_num_pieces(0)
97 , m_timeout(m_ses.settings().peer_timeout)
98 , m_packet_size(0)
99 , m_recv_pos(0)
100 , m_disk_recv_buffer_size(0)
101 , m_reading_bytes(0)
102 , m_num_invalid_requests(0)
103 , m_priority(1)
104 , m_upload_limit(bandwidth_limit::inf)
105 , m_download_limit(bandwidth_limit::inf)
106 , m_peer_info(peerinfo)
107 , m_speed(slow)
108 , m_connection_ticket(-1)
109 , m_remote_bytes_dled(0)
110 , m_remote_dl_rate(0)
111 , m_outstanding_writing_bytes(0)
112 , m_download_rate_peak(0)
113 , m_upload_rate_peak(0)
114 , m_rtt(0)
115 , m_prefer_whole_pieces(0)
116 , m_desired_queue_size(2)
117 , m_fast_reconnect(false)
118 , m_active(true)
119 , m_peer_interested(false)
120 , m_peer_choked(true)
121 , m_interesting(false)
122 , m_choked(true)
123 , m_failed(false)
124 , m_ignore_bandwidth_limits(false)
125 , m_have_all(false)
126 , m_disconnecting(false)
127 , m_connecting(true)
128 , m_queued(true)
129 , m_request_large_blocks(false)
130 , m_upload_only(false)
131 , m_snubbed(false)
132 , m_bitfield_received(false)
// debug-only bookkeeping; the destructor asserts on both of these flags
133 #ifndef NDEBUG
134 , m_in_constructor(true)
135 , m_disconnect_started(false)
136 #endif
// both bandwidth channels start out idle
138 m_channel_state[upload_channel] = peer_info::bw_idle;
139 m_channel_state[download_channel] = peer_info::bw_idle;
141 TORRENT_ASSERT(peerinfo == 0 || peerinfo->banned == false);
// clear the two-character country code, then fill it in from the
// session's country database when one is available
142 #ifndef TORRENT_DISABLE_RESOLVE_COUNTRIES
143 std::fill(m_country, m_country + 2, 0);
144 #ifndef TORRENT_DISABLE_GEO_IP
145 if (m_ses.has_country_db())
147 char const *country = m_ses.country_for_ip(m_remote.address());
148 if (country != 0)
150 m_country[0] = country[0];
151 m_country[1] = country[1];
154 #endif
155 #endif
// the log is created from the remote address and port
156 #if defined TORRENT_VERBOSE_LOGGING || defined TORRENT_ERROR_LOGGING
157 m_logger = m_ses.create_log(m_remote.address().to_string() + "_"
158 + boost::lexical_cast<std::string>(m_remote.port()), m_ses.listen_port());
159 (*m_logger) << "*** OUTGOING CONNECTION\n";
160 #endif
161 #ifndef NDEBUG
162 piece_failed = false;
163 #endif
164 #ifndef TORRENT_DISABLE_GEO_IP
165 m_inet_as_name = m_ses.as_name_for_ip(m_remote.address());
166 #endif
// the peer id is zero-filled here
168 std::fill(m_peer_id.begin(), m_peer_id.end(), 0);
171 // incoming connection
// Constructs a peer_connection without a torrent: this overload takes no
// torrent argument and does not initialise m_torrent in its initializer
// list. The connection starts out with m_active, m_connecting and
// m_queued all set to false.
// ses      - the session this connection belongs to
// s        - the socket used for this connection
// endp     - the remote endpoint, stored in m_remote
// peerinfo - policy entry for the peer, stored in m_peer_info
172 peer_connection::peer_connection(
173 session_impl& ses
174 , shared_ptr<socket_type> s
175 , tcp::endpoint const& endp
176 , policy::peer* peerinfo)
// m_last_choke is only initialised when NDEBUG is not defined
178 #ifndef NDEBUG
179 m_last_choke(time_now() - hours(1))
181 #endif
182 m_ses(ses)
183 , m_max_out_request_queue(m_ses.settings().max_out_request_queue)
184 , m_last_piece(time_now())
185 , m_last_request(time_now())
186 , m_last_incoming_request(min_time())
187 , m_last_unchoke(min_time())
188 , m_last_receive(time_now())
189 , m_last_sent(time_now())
190 , m_requested(min_time())
191 , m_timeout_extend(0)
192 , m_remote_dl_update(time_now())
193 , m_connect(time_now())
194 , m_became_uninterested(time_now())
195 , m_became_uninteresting(time_now())
196 , m_free_upload(0)
197 , m_downloaded_at_last_unchoke(0)
198 , m_disk_recv_buffer(ses, 0)
199 , m_socket(s)
200 , m_remote(endp)
201 , m_num_pieces(0)
202 , m_timeout(m_ses.settings().peer_timeout)
203 , m_packet_size(0)
204 , m_recv_pos(0)
205 , m_disk_recv_buffer_size(0)
206 , m_reading_bytes(0)
207 , m_num_invalid_requests(0)
208 , m_priority(1)
209 , m_upload_limit(bandwidth_limit::inf)
210 , m_download_limit(bandwidth_limit::inf)
211 , m_peer_info(peerinfo)
212 , m_speed(slow)
213 , m_connection_ticket(-1)
214 , m_remote_bytes_dled(0)
215 , m_remote_dl_rate(0)
216 , m_outstanding_writing_bytes(0)
217 , m_download_rate_peak(0)
218 , m_upload_rate_peak(0)
219 , m_rtt(0)
220 , m_prefer_whole_pieces(0)
221 , m_desired_queue_size(2)
222 , m_fast_reconnect(false)
223 , m_active(false)
224 , m_peer_interested(false)
225 , m_peer_choked(true)
226 , m_interesting(false)
227 , m_choked(true)
228 , m_failed(false)
229 , m_ignore_bandwidth_limits(false)
230 , m_have_all(false)
231 , m_disconnecting(false)
232 , m_connecting(false)
233 , m_queued(false)
234 , m_request_large_blocks(false)
235 , m_upload_only(false)
236 , m_snubbed(false)
237 , m_bitfield_received(false)
// debug-only bookkeeping; the destructor asserts on both of these flags
238 #ifndef NDEBUG
239 , m_in_constructor(true)
240 , m_disconnect_started(false)
241 #endif
// both bandwidth channels start out idle
243 m_channel_state[upload_channel] = peer_info::bw_idle;
244 m_channel_state[download_channel] = peer_info::bw_idle;
// clear the two-character country code, then fill it in from the
// session's country database when one is available
246 #ifndef TORRENT_DISABLE_RESOLVE_COUNTRIES
247 std::fill(m_country, m_country + 2, 0);
248 #ifndef TORRENT_DISABLE_GEO_IP
249 if (m_ses.has_country_db())
251 char const *country = m_ses.country_for_ip(m_remote.address());
252 if (country != 0)
254 m_country[0] = country[0];
255 m_country[1] = country[1];
258 #endif
259 #endif
// when logging is enabled, check that the socket's own idea of the remote
// endpoint agrees with endp (unless querying it failed) and open the log
261 #if defined TORRENT_VERBOSE_LOGGING || defined TORRENT_ERROR_LOGGING
262 error_code ec;
263 TORRENT_ASSERT(m_socket->remote_endpoint(ec) == m_remote || ec);
264 m_logger = m_ses.create_log(remote().address().to_string(ec) + "_"
265 + boost::lexical_cast<std::string>(remote().port()), m_ses.listen_port());
266 (*m_logger) << "*** INCOMING CONNECTION\n";
267 #endif
269 #ifndef TORRENT_DISABLE_GEO_IP
270 m_inet_as_name = m_ses.as_name_for_ip(m_remote.address());
271 #endif
272 #ifndef NDEBUG
273 piece_failed = false;
274 #endif
// the peer id is zero-filled here
275 std::fill(m_peer_id.begin(), m_peer_id.end(), 0);
278 bool peer_connection::unchoke_compare(boost::intrusive_ptr<peer_connection const> const& p) const
280 TORRENT_ASSERT(p);
281 peer_connection const& rhs = *p;
283 // first compare how many bytes they've sent us
284 size_type c1 = m_statistics.total_payload_download() - m_downloaded_at_last_unchoke;
285 size_type c2 = rhs.m_statistics.total_payload_download() - rhs.m_downloaded_at_last_unchoke;
286 if (c1 > c2) return true;
287 if (c1 < c2) return false;
289 // if they are equal, compare how much we have uploaded
290 if (m_peer_info) c1 = m_peer_info->total_upload();
291 else c1 = m_statistics.total_payload_upload();
292 if (rhs.m_peer_info) c2 = rhs.m_peer_info->total_upload();
293 else c2 = rhs.m_statistics.total_payload_upload();
295 return c1 < c2;
298 void peer_connection::reset_choke_counters()
300 m_downloaded_at_last_unchoke = m_statistics.total_payload_download();
303 void peer_connection::start()
305 TORRENT_ASSERT(m_peer_info == 0 || m_peer_info->connection == this);
306 boost::shared_ptr<torrent> t = m_torrent.lock();
308 if (!t)
310 tcp::socket::non_blocking_io ioc(true);
311 error_code ec;
312 m_socket->io_control(ioc, ec);
313 if (ec)
315 disconnect(ec.message().c_str());
316 return;
318 m_remote = m_socket->remote_endpoint(ec);
319 if (ec)
321 disconnect(ec.message().c_str());
322 return;
324 if (m_remote.address().is_v4())
325 m_socket->set_option(type_of_service(m_ses.settings().peer_tos), ec);
327 else if (t->ready_for_connections())
329 init();
333 void peer_connection::update_interest()
335 boost::shared_ptr<torrent> t = m_torrent.lock();
336 TORRENT_ASSERT(t);
338 bool interested = false;
339 if (!t->is_finished())
341 piece_picker const& p = t->picker();
342 int num_pieces = p.num_pieces();
343 for (int j = 0; j != num_pieces; ++j)
345 if (!p.have_piece(j)
346 && t->piece_priority(j) > 0
347 && m_have_piece[j])
349 interested = true;
350 break;
356 if (!interested) send_not_interested();
357 else t->get_policy().peer_is_interesting(*this);
359 // may throw an asio error if socket has disconnected
360 catch (std::exception&) {}
362 TORRENT_ASSERT(is_interesting() == interested);
365 #ifndef TORRENT_DISABLE_EXTENSIONS
366 void peer_connection::add_extension(boost::shared_ptr<peer_plugin> ext)
368 m_extensions.push_back(ext);
370 #endif
372 void peer_connection::send_allowed_set()
374 INVARIANT_CHECK;
376 boost::shared_ptr<torrent> t = m_torrent.lock();
377 TORRENT_ASSERT(t);
379 int num_allowed_pieces = m_ses.settings().allowed_fast_set_size;
380 int num_pieces = t->torrent_file().num_pieces();
382 if (num_allowed_pieces >= num_pieces)
384 for (int i = 0; i < num_pieces; ++i)
386 #ifdef TORRENT_VERBOSE_LOGGING
387 (*m_logger) << time_now_string()
388 << " ==> ALLOWED_FAST [ " << i << " ]\n";
389 #endif
390 write_allow_fast(i);
391 m_accept_fast.insert(i);
393 return;
396 std::string x;
397 address const& addr = m_remote.address();
398 if (addr.is_v4())
400 address_v4::bytes_type bytes = addr.to_v4().to_bytes();
401 x.assign((char*)&bytes[0], bytes.size());
403 else
405 address_v6::bytes_type bytes = addr.to_v6().to_bytes();
406 x.assign((char*)&bytes[0], bytes.size());
408 x.append((char*)&t->torrent_file().info_hash()[0], 20);
410 sha1_hash hash = hasher(&x[0], x.size()).final();
411 for (;;)
413 char* p = (char*)&hash[0];
414 for (int i = 0; i < 5; ++i)
416 int piece = detail::read_uint32(p) % num_pieces;
417 if (m_accept_fast.find(piece) == m_accept_fast.end())
419 #ifdef TORRENT_VERBOSE_LOGGING
420 (*m_logger) << time_now_string()
421 << " ==> ALLOWED_FAST [ " << piece << " ]\n";
422 #endif
423 write_allow_fast(piece);
424 m_accept_fast.insert(piece);
425 if (int(m_accept_fast.size()) >= num_allowed_pieces
426 || int(m_accept_fast.size()) == num_pieces) return;
429 hash = hasher((char*)&hash[0], 20).final();
433 void peer_connection::init()
435 INVARIANT_CHECK;
437 boost::shared_ptr<torrent> t = m_torrent.lock();
438 TORRENT_ASSERT(t);
439 TORRENT_ASSERT(t->valid_metadata());
440 TORRENT_ASSERT(t->ready_for_connections());
442 m_have_piece.resize(t->torrent_file().num_pieces(), m_have_all);
443 if (m_have_all) m_num_pieces = t->torrent_file().num_pieces();
445 // now that we have a piece_picker,
446 // update it with this peer's pieces
448 TORRENT_ASSERT(m_num_pieces == std::count(m_have_piece.begin()
449 , m_have_piece.end(), true));
451 if (m_num_pieces == int(m_have_piece.size()))
453 #ifdef TORRENT_VERBOSE_LOGGING
454 (*m_logger) << " *** THIS IS A SEED ***\n";
455 #endif
456 // if this is a web seed. we don't have a peer_info struct
457 if (m_peer_info) m_peer_info->seed = true;
459 t->peer_has_all();
460 if (t->is_finished()) send_not_interested();
461 else t->get_policy().peer_is_interesting(*this);
462 return;
465 // if we're a seed, we don't keep track of piece availability
466 if (!t->is_seed())
468 t->peer_has(m_have_piece);
469 bool interesting = false;
470 for (int i = 0; i < int(m_have_piece.size()); ++i)
472 if (m_have_piece[i])
474 // if the peer has a piece and we don't, the peer is interesting
475 if (!t->have_piece(i)
476 && t->picker().piece_priority(i) != 0)
477 interesting = true;
480 if (interesting) t->get_policy().peer_is_interesting(*this);
481 else send_not_interested();
483 else
485 update_interest();
489 peer_connection::~peer_connection()
491 // INVARIANT_CHECK;
492 TORRENT_ASSERT(!m_in_constructor);
493 TORRENT_ASSERT(m_disconnecting);
494 TORRENT_ASSERT(m_disconnect_started);
496 m_disk_recv_buffer_size = 0;
498 #if defined TORRENT_VERBOSE_LOGGING || defined TORRENT_ERROR_LOGGING
499 if (m_logger)
501 (*m_logger) << time_now_string()
502 << " *** CONNECTION CLOSED\n";
504 #endif
505 TORRENT_ASSERT(!m_ses.has_peer(this));
506 #ifndef NDEBUG
507 for (aux::session_impl::torrent_map::const_iterator i = m_ses.m_torrents.begin()
508 , end(m_ses.m_torrents.end()); i != end; ++i)
509 TORRENT_ASSERT(!i->second->has_peer(this));
510 if (m_peer_info)
511 TORRENT_ASSERT(m_peer_info->connection == 0);
513 boost::shared_ptr<torrent> t = m_torrent.lock();
514 #endif
517 void peer_connection::fast_reconnect(bool r)
519 if (!peer_info_struct() || peer_info_struct()->fast_reconnects > 1)
520 return;
521 m_fast_reconnect = r;
522 peer_info_struct()->connected = time_now()
523 - seconds(m_ses.settings().min_reconnect_time
524 * m_ses.settings().max_failcount);
525 ++peer_info_struct()->fast_reconnects;
528 void peer_connection::announce_piece(int index)
530 // dont announce during handshake
531 if (in_handshake()) return;
533 // remove suggested pieces that we have
534 std::vector<int>::iterator i = std::find(
535 m_suggested_pieces.begin(), m_suggested_pieces.end(), index);
536 if (i != m_suggested_pieces.end()) m_suggested_pieces.erase(i);
538 if (has_piece(index))
540 // if we got a piece that this peer has
541 // it might have been the last interesting
542 // piece this peer had. We might not be
543 // interested anymore
544 update_interest();
545 if (is_disconnecting()) return;
547 // optimization, don't send have messages
548 // to peers that already have the piece
549 if (!m_ses.settings().send_redundant_have)
551 #ifdef TORRENT_VERBOSE_LOGGING
552 (*m_logger) << time_now_string()
553 << " ==> HAVE [ piece: " << index << " ] SUPRESSED\n";
554 #endif
555 return;
559 #ifdef TORRENT_VERBOSE_LOGGING
560 (*m_logger) << time_now_string()
561 << " ==> HAVE [ piece: " << index << "]\n";
562 #endif
563 write_have(index);
564 #ifndef NDEBUG
565 boost::shared_ptr<torrent> t = m_torrent.lock();
566 TORRENT_ASSERT(t);
567 TORRENT_ASSERT(t->have_piece(index));
568 #endif
571 bool peer_connection::has_piece(int i) const
573 boost::shared_ptr<torrent> t = m_torrent.lock();
574 TORRENT_ASSERT(t);
575 TORRENT_ASSERT(t->valid_metadata());
576 TORRENT_ASSERT(i >= 0);
577 TORRENT_ASSERT(i < t->torrent_file().num_pieces());
578 return m_have_piece[i];
581 std::deque<piece_block> const& peer_connection::request_queue() const
583 return m_request_queue;
586 std::deque<pending_block> const& peer_connection::download_queue() const
588 return m_download_queue;
591 std::deque<peer_request> const& peer_connection::upload_queue() const
593 return m_requests;
596 void peer_connection::add_stat(size_type downloaded, size_type uploaded)
598 m_statistics.add_stat(downloaded, uploaded);
601 bitfield const& peer_connection::get_bitfield() const
603 return m_have_piece;
606 void peer_connection::received_valid_data(int index)
608 INVARIANT_CHECK;
610 #ifndef TORRENT_DISABLE_EXTENSIONS
611 for (extension_list_t::iterator i = m_extensions.begin()
612 , end(m_extensions.end()); i != end; ++i)
614 #ifdef BOOST_NO_EXCEPTIONS
615 (*i)->on_piece_pass(index);
616 #else
617 try { (*i)->on_piece_pass(index); } catch (std::exception&) {}
618 #endif
620 #endif
623 void peer_connection::received_invalid_data(int index)
625 INVARIANT_CHECK;
627 #ifndef TORRENT_DISABLE_EXTENSIONS
628 for (extension_list_t::iterator i = m_extensions.begin()
629 , end(m_extensions.end()); i != end; ++i)
631 #ifdef BOOST_NO_EXCEPTIONS
632 (*i)->on_piece_failed(index);
633 #else
634 try { (*i)->on_piece_failed(index); } catch (std::exception&) {}
635 #endif
637 #endif
638 if (is_disconnecting()) return;
640 if (peer_info_struct())
642 if (m_ses.settings().use_parole_mode)
643 peer_info_struct()->on_parole = true;
645 ++peer_info_struct()->hashfails;
646 boost::int8_t& trust_points = peer_info_struct()->trust_points;
648 // we decrease more than we increase, to keep the
649 // allowed failed/passed ratio low.
650 // TODO: make this limit user settable
651 trust_points -= 2;
652 if (trust_points < -7) trust_points = -7;
656 size_type peer_connection::total_free_upload() const
658 return m_free_upload;
661 void peer_connection::add_free_upload(size_type free_upload)
663 INVARIANT_CHECK;
665 m_free_upload += free_upload;
668 // verifies a piece to see if it is valid (is within a valid range)
669 // and if it can correspond to a request generated by libtorrent.
670 bool peer_connection::verify_piece(const peer_request& p) const
672 INVARIANT_CHECK;
674 boost::shared_ptr<torrent> t = m_torrent.lock();
675 TORRENT_ASSERT(t);
677 TORRENT_ASSERT(t->valid_metadata());
678 torrent_info const& ti = t->torrent_file();
680 return p.piece >= 0
681 && p.piece < t->torrent_file().num_pieces()
682 && p.length > 0
683 && p.start >= 0
684 && (p.length == t->block_size()
685 || (p.length < t->block_size()
686 && p.piece == ti.num_pieces()-1
687 && p.start + p.length == ti.piece_size(p.piece))
688 || (m_request_large_blocks
689 && p.length <= ti.piece_length() * m_prefer_whole_pieces == 0 ?
690 1 : m_prefer_whole_pieces))
691 && p.piece * size_type(ti.piece_length()) + p.start + p.length
692 <= ti.total_size()
693 && (p.start % t->block_size() == 0);
// Binds this connection to the torrent identified by info-hash ih.
// The connection is disconnected instead when no such torrent is found
// in the session, when the torrent has been aborted, or when it is paused.
// On success m_torrent is set and, if the torrent is already ready for
// connections, init() is run.
// Preconditions (asserted): not disconnecting, and no torrent attached yet.
696 void peer_connection::attach_to_torrent(sha1_hash const& ih)
698 INVARIANT_CHECK;
700 TORRENT_ASSERT(!m_disconnecting);
701 TORRENT_ASSERT(m_torrent.expired());
702 boost::weak_ptr<torrent> wpt = m_ses.find_torrent(ih);
703 boost::shared_ptr<torrent> t = wpt.lock();
// an aborted torrent is treated exactly like a missing one
705 if (t && t->is_aborted())
707 #if defined TORRENT_VERBOSE_LOGGING || defined TORRENT_ERROR_LOGGING
708 (*m_logger) << " *** the torrent has been aborted\n";
709 #endif
710 t.reset();
713 if (!t)
715 // we couldn't find the torrent!
// when logging is enabled, list the info-hashes of the session's torrents
716 #if defined TORRENT_VERBOSE_LOGGING || defined TORRENT_ERROR_LOGGING
717 (*m_logger) << " *** couldn't find a torrent with the given info_hash: " << ih << "\n";
718 (*m_logger) << " torrents:\n";
719 session_impl::torrent_map const& torrents = m_ses.m_torrents;
720 for (session_impl::torrent_map::const_iterator i = torrents.begin()
721 , end(torrents.end()); i != end; ++i)
723 (*m_logger) << " " << i->second->torrent_file().info_hash() << "\n";
725 #endif
726 disconnect("got invalid info-hash", 2);
727 return;
730 if (t->is_paused())
732 // paused torrents will not accept
733 // incoming connections
734 #if defined TORRENT_VERBOSE_LOGGING || defined TORRENT_ERROR_LOGGING
735 (*m_logger) << " rejected connection to paused torrent\n";
736 #endif
737 disconnect("connection rejected bacause torrent is paused");
738 return;
741 TORRENT_ASSERT(m_torrent.expired());
742 // check to make sure we don't have another connection with the same
743 // info_hash and peer_id. If we do. close this connection.
// only when NDEBUG is not defined is attach_peer() followed by a catch
// handler, which prints the exception and asserts
744 #ifndef NDEBUG
747 #endif
748 t->attach_peer(this);
749 #ifndef NDEBUG
751 catch (std::exception& e)
753 std::cout << e.what() << std::endl;
754 TORRENT_ASSERT(false);
756 #endif
// bail out if the connection was flagged as disconnecting by this point;
// m_torrent is only assigned after this check
757 if (m_disconnecting) return;
758 m_torrent = wpt;
760 TORRENT_ASSERT(!m_torrent.expired());
762 // if the torrent isn't ready to accept
763 // connections yet, we'll have to wait with
764 // our initialization
765 if (t->ready_for_connections()) init();
767 TORRENT_ASSERT(!m_torrent.expired());
769 // assume the other end has no pieces
770 // if we don't have valid metadata yet,
771 // leave the vector unallocated
772 TORRENT_ASSERT(m_num_pieces == 0);
773 m_have_piece.clear_all();
774 TORRENT_ASSERT(!m_torrent.expired());
777 // message handlers
779 // -----------------------------
780 // --------- KEEPALIVE ---------
781 // -----------------------------
783 void peer_connection::incoming_keepalive()
785 INVARIANT_CHECK;
787 #ifdef TORRENT_VERBOSE_LOGGING
788 (*m_logger) << time_now_string() << " <== KEEPALIVE\n";
789 #endif
792 // -----------------------------
793 // ----------- CHOKE -----------
794 // -----------------------------
796 void peer_connection::incoming_choke()
798 INVARIANT_CHECK;
800 boost::shared_ptr<torrent> t = m_torrent.lock();
801 TORRENT_ASSERT(t);
803 #ifndef TORRENT_DISABLE_EXTENSIONS
804 for (extension_list_t::iterator i = m_extensions.begin()
805 , end(m_extensions.end()); i != end; ++i)
807 if ((*i)->on_choke()) return;
809 #endif
810 if (is_disconnecting()) return;
812 #ifdef TORRENT_VERBOSE_LOGGING
813 (*m_logger) << time_now_string() << " <== CHOKE\n";
814 #endif
815 m_peer_choked = true;
817 if (peer_info_struct() == 0 || !peer_info_struct()->on_parole)
819 // if the peer is not in parole mode, clear the queued
820 // up block requests
821 if (!t->is_seed())
823 piece_picker& p = t->picker();
824 for (std::deque<piece_block>::const_iterator i = m_request_queue.begin()
825 , end(m_request_queue.end()); i != end; ++i)
827 // since this piece was skipped, clear it and allow it to
828 // be requested from other peers
829 p.abort_download(*i);
832 m_request_queue.clear();
836 bool match_request(peer_request const& r, piece_block const& b, int block_size)
838 if (b.piece_index != r.piece) return false;
839 if (b.block_index != r.start / block_size) return false;
840 if (r.start % block_size != 0) return false;
841 return true;
844 // -----------------------------
845 // -------- REJECT PIECE -------
846 // -----------------------------
848 void peer_connection::incoming_reject_request(peer_request const& r)
850 INVARIANT_CHECK;
852 boost::shared_ptr<torrent> t = m_torrent.lock();
853 TORRENT_ASSERT(t);
855 #ifndef TORRENT_DISABLE_EXTENSIONS
856 for (extension_list_t::iterator i = m_extensions.begin()
857 , end(m_extensions.end()); i != end; ++i)
859 if ((*i)->on_reject(r)) return;
861 #endif
863 if (is_disconnecting()) return;
865 std::deque<pending_block>::iterator i = std::find_if(
866 m_download_queue.begin(), m_download_queue.end()
867 , bind(match_request, boost::cref(r), bind(&pending_block::block, _1)
868 , t->block_size()));
870 #ifdef TORRENT_VERBOSE_LOGGING
871 (*m_logger) << time_now_string()
872 << " <== REJECT_PIECE [ piece: " << r.piece << " | s: " << r.start << " | l: " << r.length << " ]\n";
873 #endif
875 piece_block b(-1, 0);
876 if (i != m_download_queue.end())
878 b = i->block;
879 m_download_queue.erase(i);
881 // if the peer is in parole mode, keep the request
882 if (peer_info_struct() && peer_info_struct()->on_parole)
884 m_request_queue.push_front(b);
886 else if (!t->is_seed())
888 piece_picker& p = t->picker();
889 p.abort_download(b);
892 #if defined TORRENT_VERBOSE_LOGGING || defined TORRENT_ERROR_LOGGING
893 else
895 (*m_logger) << time_now_string()
896 << " *** PIECE NOT IN REQUEST QUEUE\n";
898 #endif
899 if (has_peer_choked())
901 // if we're choked and we got a rejection of
902 // a piece in the allowed fast set, remove it
903 // from the allow fast set.
904 std::vector<int>::iterator i = std::find(
905 m_allowed_fast.begin(), m_allowed_fast.end(), r.piece);
906 if (i != m_allowed_fast.end()) m_allowed_fast.erase(i);
908 else
910 std::vector<int>::iterator i = std::find(m_suggested_pieces.begin()
911 , m_suggested_pieces.end(), r.piece);
912 if (i != m_suggested_pieces.end())
913 m_suggested_pieces.erase(i);
916 if (m_request_queue.empty() && m_download_queue.size() < 2)
918 request_a_block(*t, *this);
919 send_block_requests();
923 // -----------------------------
924 // ------- SUGGEST PIECE -------
925 // -----------------------------
927 void peer_connection::incoming_suggest(int index)
929 INVARIANT_CHECK;
931 #ifdef TORRENT_VERBOSE_LOGGING
932 (*m_logger) << time_now_string()
933 << " <== SUGGEST_PIECE [ piece: " << index << " ]\n";
934 #endif
935 boost::shared_ptr<torrent> t = m_torrent.lock();
936 if (!t) return;
938 #ifndef TORRENT_DISABLE_EXTENSIONS
939 for (extension_list_t::iterator i = m_extensions.begin()
940 , end(m_extensions.end()); i != end; ++i)
942 if ((*i)->on_suggest(index)) return;
944 #endif
946 if (is_disconnecting()) return;
947 if (t->have_piece(index)) return;
949 if (m_suggested_pieces.size() > 9)
950 m_suggested_pieces.erase(m_suggested_pieces.begin());
951 m_suggested_pieces.push_back(index);
953 #ifdef TORRENT_VERBOSE_LOGGING
954 (*m_logger) << time_now_string()
955 << " ** SUGGEST_PIECE [ piece: " << index << " added to set: " << m_suggested_pieces.size() << " ]\n";
956 #endif
959 // -----------------------------
960 // ---------- UNCHOKE ----------
961 // -----------------------------
963 void peer_connection::incoming_unchoke()
965 INVARIANT_CHECK;
967 boost::shared_ptr<torrent> t = m_torrent.lock();
968 TORRENT_ASSERT(t);
970 #ifndef TORRENT_DISABLE_EXTENSIONS
971 for (extension_list_t::iterator i = m_extensions.begin()
972 , end(m_extensions.end()); i != end; ++i)
974 if ((*i)->on_unchoke()) return;
976 #endif
978 #ifdef TORRENT_VERBOSE_LOGGING
979 (*m_logger) << time_now_string() << " <== UNCHOKE\n";
980 #endif
981 m_peer_choked = false;
982 if (is_disconnecting()) return;
984 t->get_policy().unchoked(*this);
987 // -----------------------------
988 // -------- INTERESTED ---------
989 // -----------------------------
991 void peer_connection::incoming_interested()
993 INVARIANT_CHECK;
995 boost::shared_ptr<torrent> t = m_torrent.lock();
996 TORRENT_ASSERT(t);
998 #ifndef TORRENT_DISABLE_EXTENSIONS
999 for (extension_list_t::iterator i = m_extensions.begin()
1000 , end(m_extensions.end()); i != end; ++i)
1002 if ((*i)->on_interested()) return;
1004 #endif
1006 #ifdef TORRENT_VERBOSE_LOGGING
1007 (*m_logger) << time_now_string() << " <== INTERESTED\n";
1008 #endif
1009 m_peer_interested = true;
1010 if (is_disconnecting()) return;
1011 t->get_policy().interested(*this);
1014 // -----------------------------
1015 // ------ NOT INTERESTED -------
1016 // -----------------------------
1018 void peer_connection::incoming_not_interested()
1020 INVARIANT_CHECK;
1022 #ifndef TORRENT_DISABLE_EXTENSIONS
1023 for (extension_list_t::iterator i = m_extensions.begin()
1024 , end(m_extensions.end()); i != end; ++i)
1026 if ((*i)->on_not_interested()) return;
1028 #endif
1030 m_became_uninterested = time_now();
1032 #ifdef TORRENT_VERBOSE_LOGGING
1033 (*m_logger) << time_now_string() << " <== NOT_INTERESTED\n";
1034 #endif
1035 m_peer_interested = false;
1036 if (is_disconnecting()) return;
1038 boost::shared_ptr<torrent> t = m_torrent.lock();
1039 TORRENT_ASSERT(t);
1041 t->get_policy().not_interested(*this);
1044 // -----------------------------
1045 // ----------- HAVE ------------
1046 // -----------------------------
1048 void peer_connection::incoming_have(int index)
1050 INVARIANT_CHECK;
1052 boost::shared_ptr<torrent> t = m_torrent.lock();
1053 TORRENT_ASSERT(t);
1055 #ifndef TORRENT_DISABLE_EXTENSIONS
1056 for (extension_list_t::iterator i = m_extensions.begin()
1057 , end(m_extensions.end()); i != end; ++i)
1059 if ((*i)->on_have(index)) return;
1061 #endif
1063 if (is_disconnecting()) return;
1065 // if we haven't received a bitfield, it was
1066 // probably omitted, which is the same as 'have_none'
1067 if (!m_bitfield_received) incoming_have_none();
1069 #ifdef TORRENT_VERBOSE_LOGGING
1070 (*m_logger) << time_now_string()
1071 << " <== HAVE [ piece: " << index << "]\n";
1072 #endif
1074 if (!t->valid_metadata() && index > int(m_have_piece.size()))
1076 if (index < 65536)
1078 // if we don't have metadata
1079 // and we might not have received a bitfield
1080 // extend the bitmask to fit the new
1081 // have message
1082 m_have_piece.resize(index + 1, false);
1084 else
1086 // unless the index > 64k, in which case
1087 // we just ignore it
1088 return;
1092 // if we got an invalid message, abort
1093 if (index >= int(m_have_piece.size()) || index < 0)
1095 disconnect("got 'have'-message with higher index than the number of pieces", 2);
1096 return;
1099 if (m_have_piece[index])
1101 #if defined TORRENT_VERBOSE_LOGGING || defined TORRENT_ERROR_LOGGING
1102 (*m_logger) << " got redundant HAVE message for index: " << index << "\n";
1103 #endif
1105 else
1107 m_have_piece.set_bit(index);
1109 // only update the piece_picker if
1110 // we have the metadata and if
1111 // we're not a seed (in which case
1112 // we won't have a piece picker)
1113 if (t->valid_metadata())
1115 ++m_num_pieces;
1116 t->peer_has(index);
1118 if (!t->have_piece(index)
1119 && !t->is_seed()
1120 && !is_interesting()
1121 && t->picker().piece_priority(index) != 0)
1122 t->get_policy().peer_is_interesting(*this);
1124 // this will disregard all have messages we get within
1125 // the first two seconds. Since some clients implements
1126 // lazy bitfields, these will not be reliable to use
1127 // for an estimated peer download rate.
1128 if (!peer_info_struct() || time_now() - peer_info_struct()->connected > seconds(2))
1130 // update bytes downloaded since last timer
1131 m_remote_bytes_dled += t->torrent_file().piece_size(index);
1135 if (is_seed())
1137 m_peer_info->seed = true;
1138 m_upload_only = true;
1139 disconnect_if_redundant();
1140 if (is_disconnecting()) return;
1145 // -----------------------------
1146 // --------- BITFIELD ----------
1147 // -----------------------------
1149 void peer_connection::incoming_bitfield(bitfield const& bits)
1151 INVARIANT_CHECK;
1153 boost::shared_ptr<torrent> t = m_torrent.lock();
1154 TORRENT_ASSERT(t);
1156 #ifndef TORRENT_DISABLE_EXTENSIONS
1157 for (extension_list_t::iterator i = m_extensions.begin()
1158 , end(m_extensions.end()); i != end; ++i)
1160 if ((*i)->on_bitfield(bits)) return;
1162 #endif
1164 if (is_disconnecting()) return;
1166 #ifdef TORRENT_VERBOSE_LOGGING
1167 (*m_logger) << time_now_string() << " <== BITFIELD ";
1169 for (int i = 0; i < int(bits.size()); ++i)
1171 if (bits[i]) (*m_logger) << "1";
1172 else (*m_logger) << "0";
1174 (*m_logger) << "\n";
1175 #endif
1177 // if we don't have the metedata, we cannot
1178 // verify the bitfield size
1179 if (t->valid_metadata()
1180 && (bits.size() / 8) != (m_have_piece.size() / 8))
1182 std::stringstream msg;
1183 msg << "got bitfield with invalid size: " << (bits.size() / 8)
1184 << "bytes. expected: " << (m_have_piece.size() / 8)
1185 << " bytes";
1186 disconnect(msg.str().c_str(), 2);
1187 return;
1190 m_bitfield_received = true;
1192 // if we don't have metadata yet
1193 // just remember the bitmask
1194 // don't update the piecepicker
1195 // (since it doesn't exist yet)
1196 if (!t->ready_for_connections())
1198 m_have_piece = bits;
1199 m_num_pieces = bits.count();
1200 if (m_peer_info) m_peer_info->seed = (m_num_pieces == int(bits.size()));
1201 return;
1204 TORRENT_ASSERT(t->valid_metadata());
1206 int num_pieces = bits.count();
1207 if (num_pieces == int(m_have_piece.size()))
1209 #ifdef TORRENT_VERBOSE_LOGGING
1210 (*m_logger) << " *** THIS IS A SEED ***\n";
1211 #endif
1212 // if this is a web seed. we don't have a peer_info struct
1213 if (m_peer_info) m_peer_info->seed = true;
1214 m_upload_only = true;
1216 m_have_piece.set_all();
1217 m_num_pieces = num_pieces;
1218 t->peer_has_all();
1219 if (!t->is_finished())
1220 t->get_policy().peer_is_interesting(*this);
1222 disconnect_if_redundant();
1224 return;
1227 // let the torrent know which pieces the
1228 // peer has
1229 // if we're a seed, we don't keep track of piece availability
1230 bool interesting = false;
1231 if (!t->is_seed())
1233 t->peer_has(bits);
1235 for (int i = 0; i < (int)m_have_piece.size(); ++i)
1237 bool have = bits[i];
1238 if (have && !m_have_piece[i])
1240 if (!t->have_piece(i) && t->picker().piece_priority(i) != 0)
1241 interesting = true;
1243 else if (!have && m_have_piece[i])
1245 // this should probably not be allowed
1246 t->peer_lost(i);
1251 m_have_piece = bits;
1252 m_num_pieces = num_pieces;
1254 if (interesting) t->get_policy().peer_is_interesting(*this);
1255 else if (upload_only()) disconnect("upload to upload connections");
1258 void peer_connection::disconnect_if_redundant()
1260 if (!m_ses.settings().close_redundant_connections) return;
1262 boost::shared_ptr<torrent> t = m_torrent.lock();
1263 TORRENT_ASSERT(t);
1264 if (m_upload_only && t->is_finished())
1265 disconnect("seed to seed");
1267 if (m_upload_only
1268 && !m_interesting
1269 && m_bitfield_received
1270 && t->are_files_checked())
1271 disconnect("uninteresting upload-only peer");
1274 // -----------------------------
1275 // ---------- REQUEST ----------
1276 // -----------------------------
1278 void peer_connection::incoming_request(peer_request const& r)
1280 INVARIANT_CHECK;
1282 boost::shared_ptr<torrent> t = m_torrent.lock();
1283 TORRENT_ASSERT(t);
1285 // if we haven't received a bitfield, it was
1286 // probably omitted, which is the same as 'have_none'
1287 if (!m_bitfield_received) incoming_have_none();
1289 #ifndef TORRENT_DISABLE_EXTENSIONS
1290 for (extension_list_t::iterator i = m_extensions.begin()
1291 , end(m_extensions.end()); i != end; ++i)
1293 if ((*i)->on_request(r)) return;
1295 #endif
1296 if (is_disconnecting()) return;
1298 if (!t->valid_metadata())
1300 // if we don't have valid metadata yet,
1301 // we shouldn't get a request
1302 #if defined TORRENT_VERBOSE_LOGGING || defined TORRENT_ERROR_LOGGING
1303 (*m_logger) << time_now_string()
1304 << " <== UNEXPECTED_REQUEST [ "
1305 "piece: " << r.piece << " | "
1306 "s: " << r.start << " | "
1307 "l: " << r.length << " | "
1308 "i: " << m_peer_interested << " | "
1309 "t: " << t->torrent_file().piece_size(r.piece) << " | "
1310 "n: " << t->torrent_file().num_pieces() << " ]\n";
1312 (*m_logger) << time_now_string()
1313 << " ==> REJECT_PIECE [ "
1314 "piece: " << r.piece << " | "
1315 "s: " << r.start << " | "
1316 "l: " << r.length << " ]\n";
1317 #endif
1318 write_reject_request(r);
1319 return;
1322 if (int(m_requests.size()) > m_ses.settings().max_allowed_in_request_queue)
1324 // don't allow clients to abuse our
1325 // memory consumption.
1326 // ignore requests if the client
1327 // is making too many of them.
1328 #if defined TORRENT_VERBOSE_LOGGING || defined TORRENT_ERROR_LOGGING
1329 (*m_logger) << time_now_string()
1330 << " <== TOO MANY REQUESTS [ "
1331 "piece: " << r.piece << " | "
1332 "s: " << r.start << " | "
1333 "l: " << r.length << " | "
1334 "i: " << m_peer_interested << " | "
1335 "t: " << t->torrent_file().piece_size(r.piece) << " | "
1336 "n: " << t->torrent_file().num_pieces() << " ]\n";
1338 (*m_logger) << time_now_string()
1339 << " ==> REJECT_PIECE [ "
1340 "piece: " << r.piece << " | "
1341 "s: " << r.start << " | "
1342 "l: " << r.length << " ]\n";
1343 #endif
1344 write_reject_request(r);
1345 return;
1348 // make sure this request
1349 // is legal and that the peer
1350 // is not choked
1351 if (r.piece >= 0
1352 && r.piece < t->torrent_file().num_pieces()
1353 && t->have_piece(r.piece)
1354 && r.start >= 0
1355 && r.start < t->torrent_file().piece_size(r.piece)
1356 && r.length > 0
1357 && r.length + r.start <= t->torrent_file().piece_size(r.piece)
1358 && m_peer_interested
1359 && r.length <= t->block_size())
1361 #ifdef TORRENT_VERBOSE_LOGGING
1362 (*m_logger) << time_now_string()
1363 << " <== REQUEST [ piece: " << r.piece << " | s: " << r.start << " | l: " << r.length << " ]\n";
1364 #endif
1365 // if we have choked the client
1366 // ignore the request
1367 if (m_choked && m_accept_fast.find(r.piece) == m_accept_fast.end())
1369 write_reject_request(r);
1370 #if defined TORRENT_VERBOSE_LOGGING || defined TORRENT_ERROR_LOGGING
1371 (*m_logger) << time_now_string()
1372 << " *** REJECTING REQUEST [ peer choked and piece not in allowed fast set ]\n";
1373 (*m_logger) << time_now_string()
1374 << " ==> REJECT_PIECE [ "
1375 "piece: " << r.piece << " | "
1376 "s: " << r.start << " | "
1377 "l: " << r.length << " ]\n";
1378 #endif
1380 else
1382 m_requests.push_back(r);
1383 m_last_incoming_request = time_now();
1384 fill_send_buffer();
1387 else
1389 #if defined TORRENT_VERBOSE_LOGGING || defined TORRENT_ERROR_LOGGING
1390 (*m_logger) << time_now_string()
1391 << " <== INVALID_REQUEST [ "
1392 "piece: " << r.piece << " | "
1393 "s: " << r.start << " | "
1394 "l: " << r.length << " | "
1395 "i: " << m_peer_interested << " | "
1396 "t: " << t->torrent_file().piece_size(r.piece) << " | "
1397 "n: " << t->torrent_file().num_pieces() << " | "
1398 "h: " << t->have_piece(r.piece) << " | "
1399 "block_limit: " << t->block_size() << " ]\n";
1401 (*m_logger) << time_now_string()
1402 << " ==> REJECT_PIECE [ "
1403 "piece: " << r.piece << " | "
1404 "s: " << r.start << " | "
1405 "l: " << r.length << " ]\n";
1406 #endif
1408 write_reject_request(r);
1409 ++m_num_invalid_requests;
1411 if (t->alerts().should_post<invalid_request_alert>())
1413 t->alerts().post_alert(invalid_request_alert(
1414 t->get_handle(), m_remote, m_peer_id, r));
1419 void peer_connection::incoming_piece_fragment()
1421 m_last_piece = time_now();
1424 #ifndef NDEBUG
1425 struct check_postcondition
1427 check_postcondition(boost::shared_ptr<torrent> const& t_
1428 , bool init_check = true): t(t_) { if (init_check) check(); }
1430 ~check_postcondition() { check(); }
1432 void check()
1434 if (!t->is_seed())
1436 const int blocks_per_piece = static_cast<int>(
1437 t->torrent_file().piece_length() / t->block_size());
1439 std::vector<piece_picker::downloading_piece> const& dl_queue
1440 = t->picker().get_download_queue();
1442 for (std::vector<piece_picker::downloading_piece>::const_iterator i =
1443 dl_queue.begin(); i != dl_queue.end(); ++i)
1445 TORRENT_ASSERT(i->finished <= blocks_per_piece);
1450 shared_ptr<torrent> t;
1452 #endif
1455 // -----------------------------
1456 // ----------- PIECE -----------
1457 // -----------------------------
1459 void peer_connection::incoming_piece(peer_request const& p, char const* data)
1461 char* buffer = m_ses.allocate_disk_buffer();
1462 if (buffer == 0)
1464 disconnect("out of memory");
1465 return;
1467 disk_buffer_holder holder(m_ses, buffer);
1468 std::memcpy(buffer, data, p.length);
1469 incoming_piece(p, holder);
1472 void peer_connection::incoming_piece(peer_request const& p, disk_buffer_holder& data)
1474 INVARIANT_CHECK;
1476 boost::shared_ptr<torrent> t = m_torrent.lock();
1477 TORRENT_ASSERT(t);
1479 TORRENT_ASSERT(!m_disk_recv_buffer);
1480 TORRENT_ASSERT(m_disk_recv_buffer_size == 0);
1482 #ifdef TORRENT_CORRUPT_DATA
1483 // corrupt all pieces from certain peers
1484 if (m_remote.address().is_v4()
1485 && (m_remote.address().to_v4().to_ulong() & 0xf) == 0)
1487 data.get()[0] = ~data.get()[0];
1489 #endif
1491 // if we haven't received a bitfield, it was
1492 // probably omitted, which is the same as 'have_none'
1493 if (!m_bitfield_received) incoming_have_none();
1495 #ifndef TORRENT_DISABLE_EXTENSIONS
1496 for (extension_list_t::iterator i = m_extensions.begin()
1497 , end(m_extensions.end()); i != end; ++i)
1499 if ((*i)->on_piece(p, data)) return;
1501 #endif
1502 if (is_disconnecting()) return;
1504 #ifndef NDEBUG
1505 check_postcondition post_checker_(t);
1506 #if !defined TORRENT_DISABLE_INVARIANT_CHECKS
1507 t->check_invariant();
1508 #endif
1509 #endif
1511 #ifdef TORRENT_VERBOSE_LOGGING
1512 (*m_logger) << time_now_string()
1513 << " <== PIECE [ piece: " << p.piece << " | "
1514 "s: " << p.start << " | "
1515 "l: " << p.length << " | "
1516 "ds: " << statistics().download_rate() << " | "
1517 "qs: " << m_desired_queue_size << " ]\n";
1518 #endif
1520 if (p.length == 0)
1522 if (t->alerts().should_post<peer_error_alert>())
1524 t->alerts().post_alert(peer_error_alert(t->get_handle(), m_remote
1525 , m_peer_id, "peer sent 0 length piece"));
1527 return;
1530 if (!verify_piece(p))
1532 #if defined TORRENT_VERBOSE_LOGGING || defined TORRENT_ERROR_LOGGING
1533 (*m_logger) << time_now_string()
1534 << " <== INVALID_PIECE [ piece: " << p.piece << " | "
1535 "start: " << p.start << " | "
1536 "length: " << p.length << " ]\n";
1537 #endif
1538 disconnect("got invalid piece packet", 2);
1539 return;
1542 // if we're already seeding, don't bother,
1543 // just ignore it
1544 if (t->is_seed())
1546 t->add_redundant_bytes(p.length);
1547 return;
1550 ptime now = time_now();
1552 piece_picker& picker = t->picker();
1553 piece_manager& fs = t->filesystem();
1555 std::vector<piece_block> finished_blocks;
1556 piece_block block_finished(p.piece, p.start / t->block_size());
1557 TORRENT_ASSERT(p.start % t->block_size() == 0);
1558 TORRENT_ASSERT(p.length == t->block_size()
1559 || p.length == t->torrent_file().total_size() % t->block_size());
1561 std::deque<pending_block>::iterator b
1562 = std::find_if(
1563 m_download_queue.begin()
1564 , m_download_queue.end()
1565 , has_block(block_finished));
1567 if (b == m_download_queue.end())
1569 if (t->alerts().should_post<unwanted_block_alert>())
1571 t->alerts().post_alert(unwanted_block_alert(t->get_handle(), m_remote
1572 , m_peer_id, block_finished.block_index, block_finished.piece_index));
1574 #if defined TORRENT_VERBOSE_LOGGING || defined TORRENT_ERROR_LOGGING
1575 (*m_logger) << " *** The block we just got was not in the "
1576 "request queue ***\n";
1577 #endif
1578 t->add_redundant_bytes(p.length);
1579 request_a_block(*t, *this);
1580 send_block_requests();
1581 return;
1584 int block_index = b - m_download_queue.begin();
1585 for (int i = 0; i < block_index; ++i)
1587 pending_block& qe = m_download_queue[i];
1589 #if defined TORRENT_VERBOSE_LOGGING || defined TORRENT_ERROR_LOGGING
1590 (*m_logger) << time_now_string()
1591 << " *** SKIPPED_PIECE [ piece: " << qe.block.piece_index << " | "
1592 "b: " << qe.block.block_index << " ] ***\n";
1593 #endif
1595 ++qe.skipped;
1596 // if the number of times a block is skipped by out of order
1597 // blocks exceeds the size of the outstanding queue, assume that
1598 // the other end dropped the request.
1599 if (qe.skipped > m_desired_queue_size)
1601 if (m_ses.m_alerts.should_post<request_dropped_alert>())
1602 m_ses.m_alerts.post_alert(request_dropped_alert(t->get_handle()
1603 , remote(), pid(), qe.block.block_index, qe.block.piece_index));
1604 picker.abort_download(qe.block);
1605 m_download_queue.erase(m_download_queue.begin() + i);
1606 --i;
1607 --block_index;
1611 // if the block we got is already finished, then ignore it
1612 if (picker.is_downloaded(block_finished))
1614 t->add_redundant_bytes(p.length);
1616 m_download_queue.erase(b);
1617 m_timeout_extend = 0;
1619 if (!m_download_queue.empty())
1620 m_requested = now;
1622 request_a_block(*t, *this);
1623 send_block_requests();
1624 return;
1627 if (total_seconds(now - m_requested)
1628 < m_ses.settings().request_timeout
1629 && m_snubbed)
1631 m_snubbed = false;
1632 if (m_ses.m_alerts.should_post<peer_unsnubbed_alert>())
1634 m_ses.m_alerts.post_alert(peer_unsnubbed_alert(t->get_handle()
1635 , m_remote, m_peer_id));
1639 fs.async_write(p, data, bind(&peer_connection::on_disk_write_complete
1640 , self(), _1, _2, p, t));
1641 m_outstanding_writing_bytes += p.length;
1642 TORRENT_ASSERT(m_channel_state[download_channel] == peer_info::bw_idle);
1643 m_download_queue.erase(b);
1645 if (m_outstanding_writing_bytes >= m_ses.settings().max_outstanding_disk_bytes_per_connection
1646 && t->alerts().should_post<performance_alert>())
1648 t->alerts().post_alert(performance_alert(t->get_handle()
1649 , performance_alert::outstanding_disk_buffer_limit_reached));
1652 if (!m_download_queue.empty())
1654 m_timeout_extend = (std::max)(m_timeout_extend
1655 - m_ses.settings().request_timeout, 0);
1656 m_requested += seconds(m_ses.settings().request_timeout);
1657 if (m_requested > now) m_requested = now;
1659 else
1661 m_timeout_extend = 0;
1664 // did we request this block from any other peers?
1665 bool multi = picker.num_peers(block_finished) > 1;
1666 picker.mark_as_writing(block_finished, peer_info_struct());
1668 // if we requested this block from other peers, cancel it now
1669 if (multi) t->cancel_block(block_finished);
1671 #if !defined NDEBUG && !defined TORRENT_DISABLE_INVARIANT_CHECKS
1672 t->check_invariant();
1673 #endif
1674 request_a_block(*t, *this);
1675 send_block_requests();
1678 void peer_connection::on_disk_write_complete(int ret, disk_io_job const& j
1679 , peer_request p, boost::shared_ptr<torrent> t)
1681 session_impl::mutex_t::scoped_lock l(m_ses.m_mutex);
1683 INVARIANT_CHECK;
1685 m_outstanding_writing_bytes -= p.length;
1686 TORRENT_ASSERT(m_outstanding_writing_bytes >= 0);
1688 #if defined(TORRENT_VERBOSE_LOGGING) || defined(TORRENT_LOGGING)
1689 // (*m_ses.m_logger) << time_now_string() << " *** DISK_WRITE_COMPLETE [ p: "
1690 // << p.piece << " o: " << p.start << " ]\n";
1691 #endif
1692 // in case the outstanding bytes just dropped down
1693 // to allow to receive more data
1694 setup_receive();
1696 piece_block block_finished(p.piece, p.start / t->block_size());
1698 if (ret == -1 || !t)
1700 if (t->has_picker()) t->picker().write_failed(block_finished);
1702 if (!t)
1704 disconnect(j.str.c_str());
1705 return;
1708 if (t->alerts().should_post<file_error_alert>())
1710 t->alerts().post_alert(file_error_alert(j.error_file, t->get_handle(), j.str));
1712 t->pause();
1713 return;
1716 if (t->is_seed()) return;
1718 piece_picker& picker = t->picker();
1720 TORRENT_ASSERT(p.piece == j.piece);
1721 TORRENT_ASSERT(p.start == j.offset);
1722 picker.mark_as_finished(block_finished, peer_info_struct());
1723 if (t->alerts().should_post<block_finished_alert>())
1725 t->alerts().post_alert(block_finished_alert(t->get_handle(),
1726 remote(), pid(), block_finished.block_index, block_finished.piece_index));
1729 // did we just finish the piece?
1730 if (picker.is_piece_finished(p.piece))
1732 #ifndef NDEBUG
1733 check_postcondition post_checker2_(t, false);
1734 #endif
1735 t->async_verify_piece(p.piece, bind(&torrent::piece_finished, t
1736 , p.piece, _1));
1739 if (!t->is_seed() && !m_torrent.expired())
1741 // this is a free function defined in policy.cpp
1742 request_a_block(*t, *this);
1743 send_block_requests();
1748 // -----------------------------
1749 // ---------- CANCEL -----------
1750 // -----------------------------
1752 void peer_connection::incoming_cancel(peer_request const& r)
1754 INVARIANT_CHECK;
1756 #ifndef TORRENT_DISABLE_EXTENSIONS
1757 for (extension_list_t::iterator i = m_extensions.begin()
1758 , end(m_extensions.end()); i != end; ++i)
1760 if ((*i)->on_cancel(r)) return;
1762 #endif
1763 if (is_disconnecting()) return;
1765 #ifdef TORRENT_VERBOSE_LOGGING
1766 (*m_logger) << time_now_string()
1767 << " <== CANCEL [ piece: " << r.piece << " | s: " << r.start << " | l: " << r.length << " ]\n";
1768 #endif
1770 std::deque<peer_request>::iterator i
1771 = std::find(m_requests.begin(), m_requests.end(), r);
1773 if (i != m_requests.end())
1775 m_requests.erase(i);
1776 #ifdef TORRENT_VERBOSE_LOGGING
1777 (*m_logger) << time_now_string()
1778 << " ==> REJECT_PIECE [ "
1779 "piece: " << r.piece << " | "
1780 "s: " << r.start << " | "
1781 "l: " << r.length << " ]\n";
1782 #endif
1783 write_reject_request(r);
1785 else
1787 #if defined TORRENT_VERBOSE_LOGGING || defined TORRENT_ERROR_LOGGING
1788 (*m_logger) << time_now_string() << " *** GOT CANCEL NOT IN THE QUEUE\n";
1789 #endif
1793 // -----------------------------
1794 // --------- DHT PORT ----------
1795 // -----------------------------
1797 void peer_connection::incoming_dht_port(int listen_port)
1799 INVARIANT_CHECK;
1801 #ifdef TORRENT_VERBOSE_LOGGING
1802 (*m_logger) << time_now_string()
1803 << " <== DHT_PORT [ p: " << listen_port << " ]\n";
1804 #endif
1805 #ifndef TORRENT_DISABLE_DHT
1806 m_ses.add_dht_node(udp::endpoint(
1807 m_remote.address(), listen_port));
1808 #endif
1811 // -----------------------------
1812 // --------- HAVE ALL ----------
1813 // -----------------------------
1815 void peer_connection::incoming_have_all()
1817 INVARIANT_CHECK;
1819 boost::shared_ptr<torrent> t = m_torrent.lock();
1820 TORRENT_ASSERT(t);
1822 #ifdef TORRENT_VERBOSE_LOGGING
1823 (*m_logger) << time_now_string() << " <== HAVE_ALL\n";
1824 #endif
1826 #ifndef TORRENT_DISABLE_EXTENSIONS
1827 for (extension_list_t::iterator i = m_extensions.begin()
1828 , end(m_extensions.end()); i != end; ++i)
1830 if ((*i)->on_have_all()) return;
1832 #endif
1833 if (is_disconnecting()) return;
1835 m_have_all = true;
1837 if (m_peer_info) m_peer_info->seed = true;
1838 m_upload_only = true;
1839 m_bitfield_received = true;
1841 #ifdef TORRENT_VERBOSE_LOGGING
1842 (*m_logger) << " *** THIS IS A SEED ***\n";
1843 #endif
1845 // if we don't have metadata yet
1846 // just remember the bitmask
1847 // don't update the piecepicker
1848 // (since it doesn't exist yet)
1849 if (!t->ready_for_connections())
1851 // assume seeds are interesting when we
1852 // don't even have the metadata
1853 t->get_policy().peer_is_interesting(*this);
1855 disconnect_if_redundant();
1856 // TODO: this might need something more
1857 // so that once we have the metadata
1858 // we can construct a full bitfield
1859 return;
1862 TORRENT_ASSERT(!m_have_piece.empty());
1863 m_have_piece.set_all();
1864 m_num_pieces = m_have_piece.size();
1866 t->peer_has_all();
1868 // if we're finished, we're not interested
1869 if (t->is_finished()) send_not_interested();
1870 else t->get_policy().peer_is_interesting(*this);
1872 disconnect_if_redundant();
1875 // -----------------------------
1876 // --------- HAVE NONE ---------
1877 // -----------------------------
1879 void peer_connection::incoming_have_none()
1881 INVARIANT_CHECK;
1883 #ifdef TORRENT_VERBOSE_LOGGING
1884 (*m_logger) << time_now_string() << " <== HAVE_NONE\n";
1885 #endif
1887 boost::shared_ptr<torrent> t = m_torrent.lock();
1888 TORRENT_ASSERT(t);
1890 #ifndef TORRENT_DISABLE_EXTENSIONS
1891 for (extension_list_t::iterator i = m_extensions.begin()
1892 , end(m_extensions.end()); i != end; ++i)
1894 if ((*i)->on_have_none()) return;
1896 #endif
1897 if (is_disconnecting()) return;
1898 if (m_peer_info) m_peer_info->seed = false;
1899 m_bitfield_received = true;
1901 // we're never interested in a peer that doesn't have anything
1902 send_not_interested();
1904 TORRENT_ASSERT(!m_have_piece.empty() || !t->ready_for_connections());
1905 disconnect_if_redundant();
1908 // -----------------------------
1909 // ------- ALLOWED FAST --------
1910 // -----------------------------
1912 void peer_connection::incoming_allowed_fast(int index)
1914 INVARIANT_CHECK;
1916 boost::shared_ptr<torrent> t = m_torrent.lock();
1917 TORRENT_ASSERT(t);
1919 #ifdef TORRENT_VERBOSE_LOGGING
1920 (*m_logger) << time_now_string() << " <== ALLOWED_FAST [ " << index << " ]\n";
1921 #endif
1923 #ifndef TORRENT_DISABLE_EXTENSIONS
1924 for (extension_list_t::iterator i = m_extensions.begin()
1925 , end(m_extensions.end()); i != end; ++i)
1927 if ((*i)->on_allowed_fast(index)) return;
1929 #endif
1930 if (is_disconnecting()) return;
1932 if (index < 0 || index >= int(m_have_piece.size()))
1934 #if defined TORRENT_VERBOSE_LOGGING || defined TORRENT_ERROR_LOGGING
1935 (*m_logger) << time_now_string() << " <== INVALID_ALLOWED_FAST [ " << index << " | s: "
1936 << int(m_have_piece.size()) << " ]\n";
1937 #endif
1938 return;
1941 // if we already have the piece, we can
1942 // ignore this message
1943 if (t->valid_metadata()
1944 && t->have_piece(index))
1945 return;
1947 m_allowed_fast.push_back(index);
1949 // if the peer has the piece and we want
1950 // to download it, request it
1951 if (int(m_have_piece.size()) > index
1952 && m_have_piece[index]
1953 && t->has_picker()
1954 && t->picker().piece_priority(index) > 0)
1956 t->get_policy().peer_is_interesting(*this);
1960 std::vector<int> const& peer_connection::allowed_fast()
1962 boost::shared_ptr<torrent> t = m_torrent.lock();
1963 TORRENT_ASSERT(t);
1965 m_allowed_fast.erase(std::remove_if(m_allowed_fast.begin()
1966 , m_allowed_fast.end(), bind(&torrent::have_piece, t, _1))
1967 , m_allowed_fast.end());
1969 // TODO: sort the allowed fast set in priority order
1970 return m_allowed_fast;
1973 void peer_connection::add_request(piece_block const& block)
1975 // INVARIANT_CHECK;
1977 boost::shared_ptr<torrent> t = m_torrent.lock();
1978 TORRENT_ASSERT(t);
1980 TORRENT_ASSERT(t->valid_metadata());
1981 TORRENT_ASSERT(block.piece_index >= 0);
1982 TORRENT_ASSERT(block.piece_index < t->torrent_file().num_pieces());
1983 TORRENT_ASSERT(block.block_index >= 0);
1984 TORRENT_ASSERT(block.block_index < t->torrent_file().piece_size(block.piece_index));
1985 TORRENT_ASSERT(!t->picker().is_requested(block) || (t->picker().num_peers(block) > 0));
1986 TORRENT_ASSERT(!t->have_piece(block.piece_index));
1987 TORRENT_ASSERT(std::find_if(m_download_queue.begin(), m_download_queue.end()
1988 , has_block(block)) == m_download_queue.end());
1989 TORRENT_ASSERT(std::find(m_request_queue.begin(), m_request_queue.end()
1990 , block) == m_request_queue.end());
1992 piece_picker::piece_state_t state;
1993 peer_speed_t speed = peer_speed();
1994 char const* speedmsg = 0;
1995 if (speed == fast)
1997 speedmsg = "fast";
1998 state = piece_picker::fast;
2000 else if (speed == medium)
2002 speedmsg = "medium";
2003 state = piece_picker::medium;
2005 else
2007 speedmsg = "slow";
2008 state = piece_picker::slow;
2011 if (!t->picker().mark_as_downloading(block, peer_info_struct(), state))
2012 return;
2014 if (t->alerts().should_post<block_downloading_alert>())
2016 t->alerts().post_alert(block_downloading_alert(t->get_handle(),
2017 remote(), pid(), speedmsg, block.block_index, block.piece_index));
2020 m_request_queue.push_back(block);
2023 void peer_connection::cancel_request(piece_block const& block)
2025 INVARIANT_CHECK;
2027 boost::shared_ptr<torrent> t = m_torrent.lock();
2028 // this peer might be disconnecting
2029 if (!t) return;
2031 TORRENT_ASSERT(t->valid_metadata());
2033 TORRENT_ASSERT(block.piece_index >= 0);
2034 TORRENT_ASSERT(block.piece_index < t->torrent_file().num_pieces());
2035 TORRENT_ASSERT(block.block_index >= 0);
2036 TORRENT_ASSERT(block.block_index < t->torrent_file().piece_size(block.piece_index));
2038 // if all the peers that requested this block has been
2039 // cancelled, then just ignore the cancel.
2040 if (!t->picker().is_requested(block)) return;
2042 std::deque<pending_block>::iterator it
2043 = std::find_if(m_download_queue.begin(), m_download_queue.end(), has_block(block));
2044 if (it == m_download_queue.end())
2046 std::deque<piece_block>::iterator rit = std::find(m_request_queue.begin()
2047 , m_request_queue.end(), block);
2049 // when a multi block is received, it is cancelled
2050 // from all peers, so if this one hasn't requested
2051 // the block, just ignore to cancel it.
2052 if (rit == m_request_queue.end()) return;
2054 t->picker().abort_download(block);
2055 m_request_queue.erase(rit);
2056 // since we found it in the request queue, it means it hasn't been
2057 // sent yet, so we don't have to send a cancel.
2058 return;
2061 int block_offset = block.block_index * t->block_size();
2062 int block_size
2063 = (std::min)(t->torrent_file().piece_size(block.piece_index)-block_offset,
2064 t->block_size());
2065 TORRENT_ASSERT(block_size > 0);
2066 TORRENT_ASSERT(block_size <= t->block_size());
2068 peer_request r;
2069 r.piece = block.piece_index;
2070 r.start = block_offset;
2071 r.length = block_size;
2073 #ifdef TORRENT_VERBOSE_LOGGING
2074 (*m_logger) << time_now_string()
2075 << " ==> CANCEL [ piece: " << block.piece_index << " | s: "
2076 << block_offset << " | l: " << block_size << " | " << block.block_index << " ]\n";
2077 #endif
2078 write_cancel(r);
2081 void peer_connection::send_choke()
2083 INVARIANT_CHECK;
2085 TORRENT_ASSERT(!m_peer_info || !m_peer_info->optimistically_unchoked);
2087 if (m_choked) return;
2088 write_choke();
2089 m_choked = true;
2091 #ifdef TORRENT_VERBOSE_LOGGING
2092 (*m_logger) << time_now_string() << " ==> CHOKE\n";
2093 #endif
2094 #ifndef NDEBUG
2095 m_last_choke = time_now();
2096 #endif
2097 m_num_invalid_requests = 0;
2099 // reject the requests we have in the queue
2100 // except the allowed fast pieces
2101 for (std::deque<peer_request>::iterator i = m_requests.begin();
2102 i != m_requests.end();)
2104 if (m_accept_fast.count(i->piece))
2106 ++i;
2107 continue;
2110 peer_request const& r = *i;
2111 write_reject_request(r);
2113 #ifdef TORRENT_VERBOSE_LOGGING
2114 (*m_logger) << time_now_string()
2115 << " ==> REJECT_PIECE [ "
2116 "piece: " << r.piece << " | "
2117 "s: " << r.start << " | "
2118 "l: " << r.length << " ]\n";
2119 #endif
2120 i = m_requests.erase(i);
2124 void peer_connection::send_unchoke()
2126 INVARIANT_CHECK;
2128 if (!m_choked) return;
2129 m_last_unchoke = time_now();
2130 write_unchoke();
2131 m_choked = false;
2133 #ifdef TORRENT_VERBOSE_LOGGING
2134 (*m_logger) << time_now_string() << " ==> UNCHOKE\n";
2135 #endif
2138 void peer_connection::send_interested()
2140 if (m_interesting) return;
2141 m_interesting = true;
2142 boost::shared_ptr<torrent> t = m_torrent.lock();
2143 if (!t->valid_metadata()) return;
2144 write_interested();
2146 #ifdef TORRENT_VERBOSE_LOGGING
2147 (*m_logger) << time_now_string() << " ==> INTERESTED\n";
2148 #endif
2151 void peer_connection::send_not_interested()
2153 if (!m_interesting) return;
2154 m_interesting = false;
2155 boost::shared_ptr<torrent> t = m_torrent.lock();
2156 if (!t->valid_metadata()) return;
2157 write_not_interested();
2159 m_became_uninteresting = time_now();
2161 #ifdef TORRENT_VERBOSE_LOGGING
2162 (*m_logger) << time_now_string() << " ==> NOT_INTERESTED\n";
2163 #endif
2164 disconnect_if_redundant();
// Moves blocks from m_request_queue to m_download_queue and writes a REQUEST
// message for each, until m_download_queue holds m_desired_queue_size
// entries or m_request_queue runs empty. Blocks are skipped (dropped from
// the request queue without being sent) when the torrent is a seed or the
// picker reports the block as finished or downloaded. When
// m_request_large_blocks is set, consecutive blocks are merged into a single
// larger request. Extensions may take over writing the request.
2167 void peer_connection::send_block_requests()
2169 INVARIANT_CHECK;
2171 boost::shared_ptr<torrent> t = m_torrent.lock();
2172 TORRENT_ASSERT(t);
// the outstanding queue is already full: nothing to send
2174 if ((int)m_download_queue.size() >= m_desired_queue_size) return;
// remembered so that m_requested is only reset when the queue goes from
// empty to non-empty (see the end of the function)
2176 bool empty_download_queue = m_download_queue.empty();
2178 while (!m_request_queue.empty()
2179 && (int)m_download_queue.size() < m_desired_queue_size)
2181 piece_block block = m_request_queue.front();
// the block size is clamped to what is left of the piece past block_offset
2183 int block_offset = block.block_index * t->block_size();
2184 int block_size = (std::min)(t->torrent_file().piece_size(
2185 block.piece_index) - block_offset, t->block_size());
2186 TORRENT_ASSERT(block_size > 0);
2187 TORRENT_ASSERT(block_size <= t->block_size());
2189 peer_request r;
2190 r.piece = block.piece_index;
2191 r.start = block_offset;
2192 r.length = block_size;
2194 m_request_queue.pop_front();
2195 if (t->is_seed()) continue;
2196 // this can happen if a block times out, is re-requested and
2197 // then arrives "unexpectedly"
2198 if (t->picker().is_finished(block) || t->picker().is_downloaded(block))
2199 continue;
2201 m_download_queue.push_back(block);
2203 #ifdef TORRENT_VERBOSE_LOGGING
2204 (*m_logger) << time_now_string()
2205 << " *** REQUEST-QUEUE** [ "
2206 "piece: " << block.piece_index << " | "
2207 "block: " << block.block_index << " ]\n";
2208 #endif
2210 // if we are requesting large blocks, merge the smaller
2211 // blocks that are in the same piece into larger requests
2212 if (m_request_large_blocks)
2214 int blocks_per_piece = t->torrent_file().piece_length() / t->block_size();
2216 while (!m_request_queue.empty())
2218 // check to see if this block is connected to the previous one
2219 // if it is, merge them, otherwise, break this merge loop
2220 piece_block const& front = m_request_queue.front();
// adjacency is tested on the flattened index
// piece_index * blocks_per_piece + block_index
2221 if (front.piece_index * blocks_per_piece + front.block_index
2222 != block.piece_index * blocks_per_piece + block.block_index + 1)
2223 break;
2224 block = m_request_queue.front();
2225 m_request_queue.pop_front();
2226 m_download_queue.push_back(block);
2228 #ifdef TORRENT_VERBOSE_LOGGING
2229 (*m_logger) << time_now_string()
2230 << " *** MERGING REQUEST ** [ "
2231 "piece: " << block.piece_index << " | "
2232 "block: " << block.block_index << " ]\n";
2233 #endif
2235 block_offset = block.block_index * t->block_size();
2236 block_size = (std::min)(t->torrent_file().piece_size(
2237 block.piece_index) - block_offset, t->block_size());
2238 TORRENT_ASSERT(block_size > 0);
2239 TORRENT_ASSERT(block_size <= t->block_size());
// extend the pending request by the merged block
2241 r.length += block_size;
2245 TORRENT_ASSERT(verify_piece(r));
2247 #ifndef TORRENT_DISABLE_EXTENSIONS
2248 bool handled = false;
2249 for (extension_list_t::iterator i = m_extensions.begin()
2250 , end(m_extensions.end()); i != end; ++i)
// intentional assignment: stop at the first extension that writes the
// request itself
2252 if (handled = (*i)->write_request(r)) break;
2254 if (is_disconnecting()) return;
2255 if (!handled)
2257 write_request(r);
2258 m_last_request = time_now();
2260 #else
2261 write_request(r);
2262 m_last_request = time_now();
2263 #endif
2265 #ifdef TORRENT_VERBOSE_LOGGING
2266 (*m_logger) << time_now_string()
2267 << " ==> REQUEST [ "
2268 "piece: " << r.piece << " | "
2269 "s: " << r.start << " | "
2270 "l: " << r.length << " | "
2271 "ds: " << statistics().download_rate() << " B/s | "
2272 "qs: " << m_desired_queue_size << " "
2273 "blk: " << (m_request_large_blocks?"large":"single") << " ]\n";
2274 #endif
2276 m_last_piece = time_now();
2278 if (!m_download_queue.empty()
2279 && empty_download_queue)
2281 // This means we just added a request to this connection
2282 m_requested = time_now();
2286 void peer_connection::timed_out()
2288 TORRENT_ASSERT(m_connecting);
2289 #if defined TORRENT_VERBOSE_LOGGING || defined TORRENT_LOGGING || defined TORRENT_ERROR_LOGGING
2290 (*m_ses.m_logger) << time_now_string() << " CONNECTION TIMED OUT: " << m_remote.address().to_string()
2291 << "\n";
2292 #endif
2293 disconnect("timed out: connect", 1);
2296 // the error argument defaults to 0, which means deliberate disconnect
2297 // 1 means unexpected disconnect/error
2298 // 2 protocol error (client sent something invalid)
2299 void peer_connection::disconnect(char const* message, int error)
2301 session_impl::mutex_t::scoped_lock l(m_ses.m_mutex);
2303 #ifndef NDEBUG
2304 m_disconnect_started = true;
2305 #endif
2307 #if defined TORRENT_VERBOSE_LOGGING || defined TORRENT_ERROR_LOGGING
2308 switch (error)
2310 case 0:
2311 (*m_logger) << "*** CONNECTION CLOSED " << message << "\n";
2312 break;
2313 case 1:
2314 (*m_logger) << "*** CONNECTION FAILED " << message << "\n";
2315 break;
2316 case 2:
2317 (*m_logger) << "*** PEER ERROR " << message << "\n";
2318 break;
2320 #endif
2321 // we cannot do this in a constructor
2322 TORRENT_ASSERT(m_in_constructor == false);
2323 if (error > 0) m_failed = true;
2324 if (m_disconnecting) return;
2325 boost::intrusive_ptr<peer_connection> me(this);
2327 INVARIANT_CHECK;
2329 if (m_connecting && m_connection_ticket >= 0)
2331 m_ses.m_half_open.done(m_connection_ticket);
2332 m_connection_ticket = -1;
2335 boost::shared_ptr<torrent> t = m_torrent.lock();
2336 torrent_handle handle;
2337 if (t) handle = t->get_handle();
2339 if (message)
2341 if (error > 1 && m_ses.m_alerts.should_post<peer_error_alert>())
2343 m_ses.m_alerts.post_alert(
2344 peer_error_alert(handle, remote(), pid(), message));
2346 else if (error <= 1 && m_ses.m_alerts.should_post<peer_disconnected_alert>())
2348 m_ses.m_alerts.post_alert(
2349 peer_disconnected_alert(handle, remote(), pid(), message));
2353 if (t)
2355 // make sure we keep all the stats!
2356 calc_ip_overhead();
2357 t->add_stats(statistics());
2359 if (t->has_picker())
2361 piece_picker& picker = t->picker();
2363 while (!m_download_queue.empty())
2365 picker.abort_download(m_download_queue.back().block);
2366 m_download_queue.pop_back();
2368 while (!m_request_queue.empty())
2370 picker.abort_download(m_request_queue.back());
2371 m_request_queue.pop_back();
2375 t->remove_peer(this);
2376 m_torrent.reset();
2379 #ifndef NDEBUG
2380 // since this connection doesn't have a torrent reference
2381 // no torrent should have a reference to this connection either
2382 for (aux::session_impl::torrent_map::const_iterator i = m_ses.m_torrents.begin()
2383 , end(m_ses.m_torrents.end()); i != end; ++i)
2384 TORRENT_ASSERT(!i->second->has_peer(this));
2385 #endif
2387 m_disconnecting = true;
2388 error_code ec;
2389 m_socket->close(ec);
2390 m_ses.close_connection(this, message);
2393 void peer_connection::set_upload_limit(int limit)
2395 TORRENT_ASSERT(limit >= -1);
2396 if (limit == -1) limit = (std::numeric_limits<int>::max)();
2397 if (limit < 10) limit = 10;
2398 m_upload_limit = limit;
2399 m_bandwidth_limit[upload_channel].throttle(m_upload_limit);
2402 void peer_connection::set_download_limit(int limit)
2404 TORRENT_ASSERT(limit >= -1);
2405 if (limit == -1) limit = (std::numeric_limits<int>::max)();
2406 if (limit < 10) limit = 10;
2407 m_download_limit = limit;
2408 m_bandwidth_limit[download_channel].throttle(m_download_limit);
2411 size_type peer_connection::share_diff() const
2413 INVARIANT_CHECK;
2415 boost::shared_ptr<torrent> t = m_torrent.lock();
2416 TORRENT_ASSERT(t);
2418 float ratio = t->ratio();
2420 // if we have an infinite ratio, just say we have downloaded
2421 // much more than we have uploaded. And we'll keep uploading.
2422 if (ratio == 0.f)
2423 return (std::numeric_limits<size_type>::max)();
2425 return m_free_upload
2426 + static_cast<size_type>(m_statistics.total_payload_download() * ratio)
2427 - m_statistics.total_payload_upload();
2430 // defined in upnp.cpp
2431 bool is_local(address const& a);
2433 bool peer_connection::on_local_network() const
2435 if (libtorrent::is_local(m_remote.address())
2436 || is_loopback(m_remote.address())) return true;
2437 return false;
2440 void peer_connection::get_peer_info(peer_info& p) const
2442 TORRENT_ASSERT(!associated_torrent().expired());
2444 ptime now = time_now();
2446 p.download_rate_peak = m_download_rate_peak;
2447 p.upload_rate_peak = m_upload_rate_peak;
2448 p.rtt = m_rtt;
2449 p.down_speed = statistics().download_rate();
2450 p.up_speed = statistics().upload_rate();
2451 p.payload_down_speed = statistics().download_payload_rate();
2452 p.payload_up_speed = statistics().upload_payload_rate();
2453 p.pid = pid();
2454 p.ip = remote();
2455 p.pending_disk_bytes = m_outstanding_writing_bytes;
2456 p.send_quota = m_bandwidth_limit[upload_channel].quota_left();
2457 p.receive_quota = m_bandwidth_limit[download_channel].quota_left();
2458 if (m_download_queue.empty()) p.request_timeout = -1;
2459 else p.request_timeout = total_seconds(m_requested - now) + m_ses.settings().request_timeout
2460 + m_timeout_extend;
2461 #ifndef TORRENT_DISABLE_GEO_IP
2462 p.inet_as_name = m_inet_as_name;
2463 #endif
2465 #ifndef TORRENT_DISABLE_RESOLVE_COUNTRIES
2466 p.country[0] = m_country[0];
2467 p.country[1] = m_country[1];
2468 #endif
2470 p.total_download = statistics().total_payload_download();
2471 p.total_upload = statistics().total_payload_upload();
2473 if (m_bandwidth_limit[upload_channel].throttle() == bandwidth_limit::inf)
2474 p.upload_limit = -1;
2475 else
2476 p.upload_limit = m_bandwidth_limit[upload_channel].throttle();
2478 if (m_bandwidth_limit[download_channel].throttle() == bandwidth_limit::inf)
2479 p.download_limit = -1;
2480 else
2481 p.download_limit = m_bandwidth_limit[download_channel].throttle();
2483 p.load_balancing = total_free_upload();
2485 p.download_queue_length = int(download_queue().size() + m_request_queue.size());
2486 p.requests_in_buffer = int(m_requests_in_buffer.size());
2487 p.target_dl_queue_length = int(desired_queue_size());
2488 p.upload_queue_length = int(upload_queue().size());
2490 if (boost::optional<piece_block_progress> ret = downloading_piece_progress())
2492 p.downloading_piece_index = ret->piece_index;
2493 p.downloading_block_index = ret->block_index;
2494 p.downloading_progress = ret->bytes_downloaded;
2495 p.downloading_total = ret->full_block_bytes;
2497 else
2499 p.downloading_piece_index = -1;
2500 p.downloading_block_index = -1;
2501 p.downloading_progress = 0;
2502 p.downloading_total = 0;
2505 p.pieces = get_bitfield();
2506 p.last_request = now - m_last_request;
2507 p.last_active = now - (std::max)(m_last_sent, m_last_receive);
2509 // this will set the flags so that we can update them later
2510 p.flags = 0;
2511 get_specific_peer_info(p);
2513 p.flags |= is_seed() ? peer_info::seed : 0;
2514 p.flags |= m_snubbed ? peer_info::snubbed : 0;
2515 p.flags |= m_upload_only ? peer_info::upload_only : 0;
2516 if (peer_info_struct())
2518 policy::peer* pi = peer_info_struct();
2519 p.source = pi->source;
2520 p.failcount = pi->failcount;
2521 p.num_hashfails = pi->hashfails;
2522 p.flags |= pi->on_parole ? peer_info::on_parole : 0;
2523 p.flags |= pi->optimistically_unchoked ? peer_info::optimistic_unchoke : 0;
2524 #ifndef TORRENT_DISABLE_GEO_IP
2525 p.inet_as = pi->inet_as->first;
2526 #endif
2528 else
2530 p.source = 0;
2531 p.failcount = 0;
2532 p.num_hashfails = 0;
2533 p.remote_dl_rate = 0;
2534 #ifndef TORRENT_DISABLE_GEO_IP
2535 p.inet_as = 0xffff;
2536 #endif
2539 p.remote_dl_rate = m_remote_dl_rate;
2540 p.send_buffer_size = m_send_buffer.capacity();
2541 p.used_send_buffer = m_send_buffer.size();
2542 p.receive_buffer_size = m_recv_buffer.capacity() + m_disk_recv_buffer_size;
2543 p.used_receive_buffer = m_recv_pos;
2544 p.write_state = m_channel_state[upload_channel];
2545 p.read_state = m_channel_state[download_channel];
2548 // allocates a disk buffer of size 'disk_buffer_size' and replaces the
2549 // end of the current receive buffer with it. i.e. the receive pos
2550 // must be <= packet_size - disk_buffer_size
2551 // the disk buffer can be accessed through release_disk_receive_buffer()
2552 // when it is queried, the responsibility to free it is transferred
2553 // to the caller
2554 bool peer_connection::allocate_disk_receive_buffer(int disk_buffer_size)
2556 INVARIANT_CHECK;
2558 TORRENT_ASSERT(m_packet_size > 0);
2559 TORRENT_ASSERT(m_recv_pos <= m_packet_size - disk_buffer_size);
2560 TORRENT_ASSERT(!m_disk_recv_buffer);
2561 TORRENT_ASSERT(disk_buffer_size <= 16 * 1024);
2563 if (disk_buffer_size > 16 * 1024)
2565 disconnect("invalid piece size", 2);
2566 return false;
2569 m_disk_recv_buffer.reset(m_ses.allocate_disk_buffer());
2570 if (!m_disk_recv_buffer)
2572 disconnect("out of memory");
2573 return false;
2575 m_disk_recv_buffer_size = disk_buffer_size;
2576 return true;
2579 char* peer_connection::release_disk_receive_buffer()
2581 m_disk_recv_buffer_size = 0;
2582 return m_disk_recv_buffer.release();
2585 void peer_connection::cut_receive_buffer(int size, int packet_size)
2587 INVARIANT_CHECK;
2589 TORRENT_ASSERT(packet_size > 0);
2590 TORRENT_ASSERT(int(m_recv_buffer.size()) >= size);
2591 TORRENT_ASSERT(int(m_recv_buffer.size()) >= m_recv_pos);
2592 TORRENT_ASSERT(m_recv_pos >= size);
2594 if (size > 0)
2595 std::memmove(&m_recv_buffer[0], &m_recv_buffer[0] + size, m_recv_pos - size);
2597 m_recv_pos -= size;
2599 #ifndef NDEBUG
2600 std::fill(m_recv_buffer.begin() + m_recv_pos, m_recv_buffer.end(), 0);
2601 #endif
2603 m_packet_size = packet_size;
2606 void peer_connection::calc_ip_overhead()
2608 m_statistics.calc_ip_overhead();
// Periodic housekeeping for this connection. In order, it:
//  - disconnects if the torrent is gone or a disconnect is in progress
//  - runs on_tick() and, unless extensions are disabled, each extension's
//    tick()
//  - applies the inactivity, handshake, "no request when unchoked" and
//    "mutual no interest" timeouts
//  - snubs the peer when outstanding requests have timed out
//  - sends keep-alives and updates statistics and rate peaks
//  - recomputes the desired request queue size
//  - adjusts the upload throttle according to the torrent's share ratio
//  - refreshes the estimate of the remote peer's download rate
//  - finally tries to fill the send buffer
// tick_interval is forwarded unchanged to m_statistics.second_tick().
2611 void peer_connection::second_tick(float tick_interval)
2613 ptime now(time_now());
// 'me' keeps this object referenced for the duration of the call
2614 boost::intrusive_ptr<peer_connection> me(self());
2616 // the invariant check must be run before me is destructed
2617 // in case the peer got disconnected
2618 INVARIANT_CHECK;
2620 boost::shared_ptr<torrent> t = m_torrent.lock();
2621 if (!t || m_disconnecting)
2623 m_ses.m_half_open.done(m_connection_ticket);
2624 m_connecting = false;
2625 disconnect("torrent aborted");
2626 return;
2629 on_tick();
2631 #ifndef TORRENT_DISABLE_EXTENSIONS
2632 for (extension_list_t::iterator i = m_extensions.begin()
2633 , end(m_extensions.end()); i != end; ++i)
2635 (*i)->tick();
// stop here if the connection started disconnecting during the ticks above
2637 if (is_disconnecting()) return;
2638 #endif
2640 // if the peer hasn't said a thing for a certain
2641 // time, it is considered to have timed out
2642 time_duration d;
2643 d = now - m_last_receive;
2644 if (d > seconds(m_timeout) && !m_connecting)
2646 #if defined TORRENT_VERBOSE_LOGGING || defined TORRENT_ERROR_LOGGING
2647 (*m_logger) << time_now_string() << " *** LAST ACTIVITY [ "
2648 << total_seconds(d) << " seconds ago ] ***\n";
2649 #endif
2650 disconnect("timed out: inactivity");
2651 return;
2654 // do not stall waiting for a handshake
2655 if (!m_connecting
2656 && in_handshake()
2657 && d > seconds(m_ses.settings().handshake_timeout))
2659 #if defined TORRENT_VERBOSE_LOGGING || defined TORRENT_ERROR_LOGGING
2660 (*m_logger) << time_now_string() << " *** NO HANDSHAKE [ waited "
2661 << total_seconds(d) << " seconds ] ***\n";
2662 #endif
2663 disconnect("timed out: no handshake");
2664 return;
2667 // disconnect peers that we unchoked, but
2668 // they didn't send a request within 20 seconds.
2669 // but only if we're a seed
// NOTE: the condition below tests t->is_finished(), not a seed check
2670 d = now - (std::max)(m_last_unchoke, m_last_incoming_request);
2671 if (!m_connecting
2672 && m_requests.empty()
2673 && !m_choked
2674 && m_peer_interested
2675 && t && t->is_finished()
2676 && d > seconds(20))
2678 #if defined TORRENT_VERBOSE_LOGGING || defined TORRENT_ERROR_LOGGING
2679 (*m_logger) << time_now_string() << " *** NO REQUEST [ t: "
2680 << total_seconds(d) << " ] ***\n";
2681 #endif
2682 disconnect("timed out: no request when unchoked");
2683 return;
2686 // if the peer hasn't become interested and we haven't
2687 // become interested in the peer for 10 minutes, it
2688 // has also timed out.
// NOTE: the limit actually applied is the inactivity_timeout setting
2689 time_duration d1;
2690 time_duration d2;
2691 d1 = now - m_became_uninterested;
2692 d2 = now - m_became_uninteresting;
2693 time_duration time_limit = seconds(
2694 m_ses.settings().inactivity_timeout);
2696 // don't bother disconnect peers we haven't been interested
2697 // in (and that hasn't been interested in us) for a while
2698 // unless we have used up all our connection slots
2699 if (!m_interesting
2700 && !m_peer_interested
2701 && d1 > time_limit
2702 && d2 > time_limit
2703 && (m_ses.num_connections() >= m_ses.max_connections()
2704 || (t && t->num_peers() >= t->max_connections())))
2706 #if defined TORRENT_VERBOSE_LOGGING || defined TORRENT_ERROR_LOGGING
2707 (*m_logger) << time_now_string() << " *** MUTUAL NO INTEREST [ "
2708 "t1: " << total_seconds(d1) << " | "
2709 "t2: " << total_seconds(d2) << " ] ***\n";
2710 #endif
2711 disconnect("timed out: no interest");
2712 return;
// requests have been outstanding for longer than request_timeout plus any
// accumulated extension: snub the peer
2715 if (!m_download_queue.empty()
2716 && now > m_requested + seconds(m_ses.settings().request_timeout
2717 + m_timeout_extend))
2719 snub_peer();
2722 // if we haven't sent something in too long, send a keep-alive
2723 keep_alive();
2725 m_ignore_bandwidth_limits = m_ses.settings().ignore_limits_on_local_network
2726 && on_local_network();
2728 m_statistics.second_tick(tick_interval);
// track the highest payload rates seen so far
2730 if (m_statistics.upload_payload_rate() > m_upload_rate_peak)
2732 m_upload_rate_peak = m_statistics.upload_payload_rate();
2734 if (m_statistics.download_payload_rate() > m_download_rate_peak)
2736 m_download_rate_peak = m_statistics.download_payload_rate();
2737 #ifndef TORRENT_DISABLE_GEO_IP
2738 if (peer_info_struct())
// also raise the value stored with the peer's inet_as entry, when present
2740 std::pair<const int, int>* as_stats = peer_info_struct()->inet_as;
2741 if (as_stats && as_stats->second < m_download_rate_peak)
2742 as_stats->second = m_download_rate_peak;
2744 #endif
2746 if (is_disconnecting()) return;
2748 if (!t->valid_metadata()) return;
2750 // calculate the desired download queue size
2751 const float queue_time = m_ses.settings().request_queue_time;
2752 // (if the latency is more than this, the download will stall)
2753 // so, the queue size is queue_time * down_rate / 16 kiB
2754 // (16 kB is the size of each request)
2755 // the minimum number of requests is 2 and the maximum is 48
2756 // the block size doesn't have to be 16. So we first query the
2757 // torrent for it
// NOTE: the bounds actually applied below are min_request_queue and
// m_max_out_request_queue
2758 const int block_size = m_request_large_blocks
2759 ? t->torrent_file().piece_length() : t->block_size();
2760 TORRENT_ASSERT(block_size > 0);
2762 if (m_snubbed)
2764 m_desired_queue_size = 1;
2766 else
2768 m_desired_queue_size = static_cast<int>(queue_time
2769 * statistics().download_rate() / block_size);
2770 if (m_desired_queue_size > m_max_out_request_queue)
2771 m_desired_queue_size = m_max_out_request_queue;
2772 if (m_desired_queue_size < min_request_queue)
2773 m_desired_queue_size = min_request_queue;
// hitting the upper bound is reported as a performance alert
2775 if (m_desired_queue_size == m_max_out_request_queue
2776 && t->alerts().should_post<performance_alert>())
2778 t->alerts().post_alert(performance_alert(t->get_handle()
2779 , performance_alert::outstanding_request_limit_reached));
2783 if (!m_download_queue.empty()
2784 && now - m_last_piece > seconds(m_ses.settings().piece_timeout
2785 + m_timeout_extend))
2787 // this peer isn't sending the pieces we've
2788 // requested (this has been observed by BitComet)
2789 // in this case we'll clear our download queue and
2790 // re-request the blocks.
2791 #if defined TORRENT_VERBOSE_LOGGING || defined TORRENT_ERROR_LOGGING
2792 (*m_logger) << time_now_string()
2793 << " *** PIECE_REQUESTS TIMED OUT [ " << (int)m_download_queue.size()
2794 << " " << total_seconds(now - m_last_piece) << "] ***\n";
2795 #endif
2797 snub_peer();
2800 // If the client sends more data
2801 // we send it data faster, otherwise, slower.
2802 // It will also depend on how much data the
2803 // client has sent us. This is the mean to
2804 // maintain the share ratio given by m_ratio
2805 // with all peers.
2807 if (t->is_finished() || is_choked() || t->ratio() == 0.0f)
2809 // if we have downloaded more than one piece more
2810 // than we have uploaded OR if we are a seed
2811 // have an unlimited upload rate
2812 m_bandwidth_limit[upload_channel].throttle(m_upload_limit);
2814 else
// head start granted to the peer: 0x10000 bytes plus two blocks plus any
// free upload credit
2816 size_type bias = 0x10000 + 2 * t->block_size() + m_free_upload;
2818 double break_even_time = 15; // seconds.
2819 size_type have_uploaded = m_statistics.total_payload_upload();
2820 size_type have_downloaded = m_statistics.total_payload_download();
2821 double download_speed = m_statistics.download_rate();
// projected download total after 1.5 break-even periods at the current rate
2823 size_type soon_downloaded =
2824 have_downloaded + (size_type)(download_speed * break_even_time*1.5);
2826 if (t->ratio() != 1.f)
2827 soon_downloaded = (size_type)(soon_downloaded*(double)t->ratio());
2829 double upload_speed_limit = (std::min)((soon_downloaded - have_uploaded
2830 + bias) / break_even_time, double(m_upload_limit));
// keep the value representable as an int before it is cast below
2832 upload_speed_limit = (std::min)(upload_speed_limit,
2833 (double)(std::numeric_limits<int>::max)());
// the throttle is at least 20 but never exceeds m_upload_limit
2835 m_bandwidth_limit[upload_channel].throttle(
2836 (std::min)((std::max)((int)upload_speed_limit, 20)
2837 , m_upload_limit));
2840 // update once every minute
2841 if (now - m_remote_dl_update >= seconds(60))
// weighted average: 2/3 of the previous estimate plus 1/3 of the rate over
// the last 60 seconds; the first sample (previous estimate 0) is used as-is
2843 float factor = 0.6666666666667f;
2845 if (m_remote_dl_rate == 0) factor = 0.0f;
2847 m_remote_dl_rate = int((m_remote_dl_rate * factor) +
2848 ((m_remote_bytes_dled * (1.0f-factor)) / 60.f));
2850 m_remote_bytes_dled = 0;
2851 m_remote_dl_update = now;
2854 fill_send_buffer();
// Marks this peer as snubbed (posting peer_snubbed_alert the first time) and
// drops its desired queue size to 1. Unless the peer is on parole or the
// torrent has no piece picker, it then gives up on the last queued block:
// a replacement block is requested first and the timed-out block is aborted
// in the piece picker afterwards.
2857 void peer_connection::snub_peer()
2859 INVARIANT_CHECK;
2861 boost::shared_ptr<torrent> t = m_torrent.lock();
2862 TORRENT_ASSERT(t);
2864 if (!m_snubbed)
2866 m_snubbed = true;
2867 if (m_ses.m_alerts.should_post<peer_snubbed_alert>())
2869 m_ses.m_alerts.post_alert(peer_snubbed_alert(t->get_handle()
2870 , m_remote, m_peer_id));
2873 m_desired_queue_size = 1;
// a peer on parole only gets its timeout extended; nothing is aborted
2875 if (on_parole())
2877 m_timeout_extend += m_ses.settings().request_timeout;
2878 return;
2880 if (!t->has_picker()) return;
2881 piece_picker& picker = t->picker();
2883 piece_block r(-1, -1);
2884 // time out the last request in the queue
// prefer a block that is still in m_request_queue; otherwise fall back to
// the last entry of m_download_queue
2885 if (!m_request_queue.empty())
2887 r = m_request_queue.back();
2888 m_request_queue.pop_back();
2890 else
// NOTE(review): only an assert guards against an empty download queue here;
// the callers visible in second_tick() check it before calling
2892 TORRENT_ASSERT(!m_download_queue.empty());
2893 r = m_download_queue.back().block;
2895 // only time out a request if it blocks the piece
2896 // from being completed (i.e. no free blocks to
2897 // request from it)
2898 piece_picker::downloading_piece p;
2899 picker.piece_info(r.piece_index, p);
2900 int free_blocks = picker.blocks_in_piece(r.piece_index)
2901 - p.finished - p.writing - p.requested;
2902 if (free_blocks > 0)
2904 m_timeout_extend += m_ses.settings().request_timeout;
2905 return;
2908 if (m_ses.m_alerts.should_post<block_timeout_alert>())
2910 m_ses.m_alerts.post_alert(block_timeout_alert(t->get_handle()
2911 , remote(), pid(), r.block_index, r.piece_index));
2913 m_download_queue.pop_back();
// other requests are still queued: extend the timeout for them
2915 if (!m_download_queue.empty() || !m_request_queue.empty())
2916 m_timeout_extend += m_ses.settings().request_timeout;
// the desired queue size is raised to 2 only around request_a_block() and
// put back to 1 right after
2918 m_desired_queue_size = 2;
2919 request_a_block(*t, *this);
2920 m_desired_queue_size = 1;
2922 // abort the block after the new one has
2923 // been requested in order to prevent it from
2924 // picking the same block again, stalling the
2925 // same piece indefinitely.
2926 if (r != piece_block(-1, -1))
2927 picker.abort_download(r);
2929 send_block_requests();
2932 void peer_connection::fill_send_buffer()
2934 INVARIANT_CHECK;
2936 boost::shared_ptr<torrent> t = m_torrent.lock();
2937 if (!t) return;
2939 // only add new piece-chunks if the send buffer is small enough
2940 // otherwise there will be no end to how large it will be!
2942 int buffer_size_watermark = int(m_statistics.upload_rate()) / 2;
2943 if (buffer_size_watermark < 512) buffer_size_watermark = 512;
2944 else if (buffer_size_watermark > m_ses.settings().send_buffer_watermark)
2945 buffer_size_watermark = m_ses.settings().send_buffer_watermark;
2947 while (!m_requests.empty()
2948 && (send_buffer_size() + m_reading_bytes < buffer_size_watermark))
2950 TORRENT_ASSERT(t->valid_metadata());
2951 peer_request& r = m_requests.front();
2953 TORRENT_ASSERT(r.piece >= 0);
2954 TORRENT_ASSERT(r.piece < (int)m_have_piece.size());
2955 TORRENT_ASSERT(t->have_piece(r.piece));
2956 TORRENT_ASSERT(r.start + r.length <= t->torrent_file().piece_size(r.piece));
2957 TORRENT_ASSERT(r.length > 0 && r.start >= 0);
2959 t->filesystem().async_read(r, bind(&peer_connection::on_disk_read_complete
2960 , self(), _1, _2, r));
2961 m_reading_bytes += r.length;
2963 m_requests.erase(m_requests.begin());
2967 void peer_connection::on_disk_read_complete(int ret, disk_io_job const& j, peer_request r)
2969 session_impl::mutex_t::scoped_lock l(m_ses.m_mutex);
2971 m_reading_bytes -= r.length;
2973 disk_buffer_holder buffer(m_ses, j.buffer);
2975 if (ret != r.length || m_torrent.expired())
2977 boost::shared_ptr<torrent> t = m_torrent.lock();
2978 if (!t)
2980 disconnect(j.str.c_str());
2981 return;
2984 if (t->alerts().should_post<file_error_alert>())
2985 t->alerts().post_alert(file_error_alert(j.error_file, t->get_handle(), j.str));
2986 t->set_error(j.str);
2987 t->pause();
2988 return;
2991 #ifdef TORRENT_VERBOSE_LOGGING
2992 (*m_logger) << time_now_string()
2993 << " ==> PIECE [ piece: " << r.piece << " | s: " << r.start
2994 << " | l: " << r.length << " ]\n";
2995 #endif
2997 write_piece(r, buffer);
2998 setup_send();
3001 void peer_connection::assign_bandwidth(int channel, int amount)
3003 session_impl::mutex_t::scoped_lock l(m_ses.m_mutex);
3005 #ifdef TORRENT_VERBOSE_LOGGING
3006 (*m_logger) << "bandwidth [ " << channel << " ] + " << amount << "\n";
3007 #endif
3009 m_bandwidth_limit[channel].assign(amount);
3010 TORRENT_ASSERT(m_channel_state[channel] == peer_info::bw_global);
3011 m_channel_state[channel] = peer_info::bw_idle;
3012 if (channel == upload_channel)
3014 setup_send();
3016 else if (channel == download_channel)
3018 setup_receive();
3022 void peer_connection::expire_bandwidth(int channel, int amount)
3024 session_impl::mutex_t::scoped_lock l(m_ses.m_mutex);
3026 m_bandwidth_limit[channel].expire(amount);
3027 if (channel == upload_channel)
3029 setup_send();
3031 else if (channel == download_channel)
3033 setup_receive();
3037 void peer_connection::setup_send()
3039 session_impl::mutex_t::scoped_lock l(m_ses.m_mutex);
3041 if (m_channel_state[upload_channel] != peer_info::bw_idle) return;
3043 shared_ptr<torrent> t = m_torrent.lock();
3045 if (m_bandwidth_limit[upload_channel].quota_left() == 0
3046 && !m_send_buffer.empty()
3047 && !m_connecting
3048 && t
3049 && !m_ignore_bandwidth_limits)
3051 // in this case, we have data to send, but no
3052 // bandwidth. So, we simply request bandwidth
3053 // from the torrent
3054 TORRENT_ASSERT(t);
3055 if (m_bandwidth_limit[upload_channel].max_assignable() > 0)
3057 int priority = is_interesting() * 2 + m_requests_in_buffer.size();
3058 // peers that we are not interested in are non-prioritized
3059 m_channel_state[upload_channel] = peer_info::bw_torrent;
3060 t->request_bandwidth(upload_channel, self()
3061 , m_send_buffer.size(), priority);
3062 #ifdef TORRENT_VERBOSE_LOGGING
3063 (*m_logger) << time_now_string() << " *** REQUEST_BANDWIDTH [ upload prio: "
3064 << priority << "]\n";
3065 #endif
3068 return;
3071 if (!can_write())
3073 #ifdef TORRENT_VERBOSE_LOGGING
3074 (*m_logger) << time_now_string() << " *** CANNOT WRITE ["
3075 " quota: " << m_bandwidth_limit[download_channel].quota_left() <<
3076 " ignore: " << (m_ignore_bandwidth_limits?"yes":"no") <<
3077 " buf: " << m_send_buffer.size() <<
3078 " connecting: " << (m_connecting?"yes":"no") <<
3079 " ]\n";
3080 #endif
3081 return;
3084 // send the actual buffer
3085 if (!m_send_buffer.empty())
3087 int amount_to_send = m_send_buffer.size();
3088 int quota_left = m_bandwidth_limit[upload_channel].quota_left();
3089 if (!m_ignore_bandwidth_limits && amount_to_send > quota_left)
3090 amount_to_send = quota_left;
3092 TORRENT_ASSERT(amount_to_send > 0);
3094 #ifdef TORRENT_VERBOSE_LOGGING
3095 (*m_logger) << time_now_string() << " *** ASYNC_WRITE [ bytes: " << amount_to_send << " ]\n";
3096 #endif
3097 std::list<asio::const_buffer> const& vec = m_send_buffer.build_iovec(amount_to_send);
3098 m_socket->async_write_some(vec, bind(&peer_connection::on_send_data, self(), _1, _2));
3100 m_channel_state[upload_channel] = peer_info::bw_network;
// Starts an asynchronous read on the socket if the download channel is idle.
// When there is no download quota left, bandwidth is requested from the
// torrent instead and the channel waits in the bw_torrent state.
// The read targets the regular receive buffer, the disk receive buffer, or
// both, depending on where the receive position lies relative to the part
// of the packet that belongs in the regular buffer.
3104 void peer_connection::setup_receive()
3106 session_impl::mutex_t::scoped_lock l(m_ses.m_mutex);
3108 INVARIANT_CHECK;
3110 if (m_channel_state[download_channel] != peer_info::bw_idle) return;
3112 shared_ptr<torrent> t = m_torrent.lock();
// out of quota: ask the torrent for more bandwidth, if any can be assigned
3114 if (m_bandwidth_limit[download_channel].quota_left() == 0
3115 && !m_connecting
3116 && t
3117 && !m_ignore_bandwidth_limits)
3119 if (m_bandwidth_limit[download_channel].max_assignable() > 0)
3121 #ifdef TORRENT_VERBOSE_LOGGING
3122 (*m_logger) << time_now_string() << " *** REQUEST_BANDWIDTH [ download ]\n";
3123 #endif
3124 TORRENT_ASSERT(m_channel_state[download_channel] == peer_info::bw_idle);
3125 m_channel_state[download_channel] = peer_info::bw_torrent;
// the amount requested is 16 kiB per outstanding block plus 30 bytes
3126 t->request_bandwidth(download_channel, self()
3127 , m_download_queue.size() * 16 * 1024 + 30, m_priority);
3129 return;
3132 if (!can_read())
3134 #ifdef TORRENT_VERBOSE_LOGGING
3135 (*m_logger) << time_now_string() << " *** CANNOT READ ["
3136 " quota: " << m_bandwidth_limit[download_channel].quota_left() <<
3137 " ignore: " << (m_ignore_bandwidth_limits?"yes":"no") <<
3138 " outstanding: " << m_outstanding_writing_bytes <<
3139 " outstanding-limit: " << m_ses.settings().max_outstanding_disk_bytes_per_connection <<
3140 " ]\n";
3141 #endif
3142 return;
// read at most the rest of the current packet, capped by the remaining
// quota unless limits are ignored for this peer
3145 TORRENT_ASSERT(m_packet_size > 0);
3146 int max_receive = m_packet_size - m_recv_pos;
3147 int quota_left = m_bandwidth_limit[download_channel].quota_left();
3148 if (!m_ignore_bandwidth_limits && max_receive > quota_left)
3149 max_receive = quota_left;
3151 if (max_receive == 0) return;
3153 TORRENT_ASSERT(m_recv_pos >= 0);
3154 TORRENT_ASSERT(m_packet_size > 0);
3155 TORRENT_ASSERT(can_read());
3156 #ifdef TORRENT_VERBOSE_LOGGING
3157 (*m_logger) << time_now_string() << " *** ASYNC_READ [ max: " << max_receive << " bytes ]\n";
3158 #endif
// the part of the packet that is not covered by the disk receive buffer
3160 int regular_buffer_size = m_packet_size - m_disk_recv_buffer_size;
3162 if (int(m_recv_buffer.size()) < regular_buffer_size)
3163 m_recv_buffer.resize(regular_buffer_size);
3165 if (!m_disk_recv_buffer || regular_buffer_size >= m_recv_pos + max_receive)
3167 // only receive into regular buffer
3168 TORRENT_ASSERT(m_recv_pos + max_receive <= int(m_recv_buffer.size()));
3169 m_socket->async_read_some(asio::buffer(&m_recv_buffer[m_recv_pos]
3170 , max_receive), bind(&peer_connection::on_receive_data, self(), _1, _2));
3172 else if (m_recv_pos >= regular_buffer_size)
3174 // only receive into disk buffer
3175 TORRENT_ASSERT(m_recv_pos - regular_buffer_size >= 0);
3176 TORRENT_ASSERT(m_recv_pos - regular_buffer_size + max_receive <= m_disk_recv_buffer_size);
3177 m_socket->async_read_some(asio::buffer(m_disk_recv_buffer.get() + m_recv_pos - regular_buffer_size
3178 , max_receive)
3179 , bind(&peer_connection::on_receive_data, self(), _1, _2));
3181 else
3183 // receive into both regular and disk buffer
3184 TORRENT_ASSERT(max_receive + m_recv_pos > regular_buffer_size);
3185 TORRENT_ASSERT(m_recv_pos < regular_buffer_size);
3186 TORRENT_ASSERT(max_receive - regular_buffer_size
3187 + m_recv_pos <= m_disk_recv_buffer_size);
// vec[0] is the rest of the regular buffer, vec[1] the start of the disk
// buffer; together they cover max_receive bytes
3189 boost::array<asio::mutable_buffer, 2> vec;
3190 vec[0] = asio::buffer(&m_recv_buffer[m_recv_pos]
3191 , regular_buffer_size - m_recv_pos);
3192 vec[1] = asio::buffer(m_disk_recv_buffer.get()
3193 , max_receive - regular_buffer_size + m_recv_pos);
3194 m_socket->async_read_some(vec, bind(&peer_connection::on_receive_data
3195 , self(), _1, _2));
// a read is now outstanding on the socket
3197 m_channel_state[download_channel] = peer_info::bw_network;
3200 #ifndef TORRENT_DISABLE_ENCRYPTION
3202 // returns the last 'bytes' from the receive buffer
3203 std::pair<buffer::interval, buffer::interval> peer_connection::wr_recv_buffers(int bytes)
3205 TORRENT_ASSERT(bytes <= m_recv_pos);
3207 std::pair<buffer::interval, buffer::interval> vec;
3208 int regular_buffer_size = m_packet_size - m_disk_recv_buffer_size;
3209 TORRENT_ASSERT(regular_buffer_size >= 0);
3210 if (!m_disk_recv_buffer || regular_buffer_size >= m_recv_pos)
3212 vec.first = buffer::interval(&m_recv_buffer[0]
3213 + m_recv_pos - bytes, &m_recv_buffer[0] + m_recv_pos);
3214 vec.second = buffer::interval(0,0);
3216 else if (m_recv_pos - bytes >= regular_buffer_size)
3218 vec.first = buffer::interval(m_disk_recv_buffer.get() + m_recv_pos
3219 - regular_buffer_size - bytes, m_disk_recv_buffer.get() + m_recv_pos
3220 - regular_buffer_size);
3221 vec.second = buffer::interval(0,0);
3223 else
3225 TORRENT_ASSERT(m_recv_pos - bytes < regular_buffer_size);
3226 TORRENT_ASSERT(m_recv_pos > regular_buffer_size);
3227 vec.first = buffer::interval(&m_recv_buffer[0] + m_recv_pos - bytes
3228 , &m_recv_buffer[0] + regular_buffer_size);
3229 vec.second = buffer::interval(m_disk_recv_buffer.get()
3230 , m_disk_recv_buffer.get() + m_recv_pos - regular_buffer_size);
3232 TORRENT_ASSERT(vec.first.left() + vec.second.left() == bytes);
3233 return vec;
3235 #endif
3237 void peer_connection::reset_recv_buffer(int packet_size)
3239 TORRENT_ASSERT(packet_size > 0);
3240 if (m_recv_pos > m_packet_size)
3242 cut_receive_buffer(m_packet_size, packet_size);
3243 return;
3245 m_recv_pos = 0;
3246 m_packet_size = packet_size;
3249 void peer_connection::send_buffer(char const* buf, int size, int flags)
3251 if (flags == message_type_request)
3252 m_requests_in_buffer.push_back(m_send_buffer.size() + size);
3254 int free_space = m_send_buffer.space_in_last_buffer();
3255 if (free_space > size) free_space = size;
3256 if (free_space > 0)
3258 m_send_buffer.append(buf, free_space);
3259 size -= free_space;
3260 buf += free_space;
3261 #ifdef TORRENT_STATS
3262 m_ses.m_buffer_usage_logger << log_time() << " send_buffer: "
3263 << free_space << std::endl;
3264 m_ses.log_buffer_usage();
3265 #endif
3267 if (size <= 0) return;
3269 std::pair<char*, int> buffer = m_ses.allocate_buffer(size);
3270 if (buffer.first == 0)
3272 disconnect("out of memory");
3273 return;
3275 TORRENT_ASSERT(buffer.second >= size);
3276 std::memcpy(buffer.first, buf, size);
3277 m_send_buffer.append_buffer(buffer.first, buffer.second, size
3278 , bind(&session_impl::free_buffer, boost::ref(m_ses), _1, buffer.second));
3279 #ifdef TORRENT_STATS
3280 m_ses.m_buffer_usage_logger << log_time() << " send_buffer_alloc: " << size << std::endl;
3281 m_ses.log_buffer_usage();
3282 #endif
3283 setup_send();
3286 // TODO: change this interface to automatically call setup_send() when the
3287 // return value is destructed
3288 buffer::interval peer_connection::allocate_send_buffer(int size)
3290 TORRENT_ASSERT(size > 0);
3291 char* insert = m_send_buffer.allocate_appendix(size);
3292 if (insert == 0)
3294 std::pair<char*, int> buffer = m_ses.allocate_buffer(size);
3295 if (buffer.first == 0)
3297 disconnect("out of memory");
3298 return buffer::interval(0, 0);
3300 TORRENT_ASSERT(buffer.second >= size);
3301 m_send_buffer.append_buffer(buffer.first, buffer.second, size
3302 , bind(&session_impl::free_buffer, boost::ref(m_ses), _1, buffer.second));
3303 buffer::interval ret(buffer.first, buffer.first + size);
3304 #ifdef TORRENT_STATS
3305 m_ses.m_buffer_usage_logger << log_time() << " allocate_buffer_alloc: " << size << std::endl;
3306 m_ses.log_buffer_usage();
3307 #endif
3308 return ret;
3310 else
3312 #ifdef TORRENT_STATS
3313 m_ses.m_buffer_usage_logger << log_time() << " allocate_buffer: " << size << std::endl;
3314 m_ses.log_buffer_usage();
3315 #endif
3316 buffer::interval ret(insert, insert + size);
3317 return ret;
// Scope guard that sets the referenced value to zero when it is destroyed,
// or earlier when fire() is called, but only if it was armed by passing
// cond == true. Once fired it does nothing further.
template<class T>
struct set_to_zero
{
    set_to_zero(T& v, bool cond)
        : m_val(v)
        , m_cond(cond)
    {}

    // zero the value now and disarm the guard
    void fire()
    {
        if (m_cond)
        {
            m_cond = false;
            m_val = 0;
        }
    }

    ~set_to_zero() { fire(); }

private:
    T& m_val;
    bool m_cond;
};
3332 // --------------------------
3333 // RECEIVE DATA
3334 // --------------------------
3336 // throws exception when the client should be disconnected
// Handles the outcome of a read on the peer socket. Takes the session mutex
// and moves the download channel from bw_network back to bw_idle. When the
// read failed, the error is forwarded to on_receive() and the peer is
// disconnected. Otherwise the received bytes are charged against the
// download quota and passed to on_receive(), and more data is read straight
// from the socket with read_some() until nothing more may be received, the
// socket reports would_block, or a read returns zero bytes. The function
// finishes by calling setup_receive().
//   error:             status of the completed read
//   bytes_transferred: number of bytes delivered by that read
3337 void peer_connection::on_receive_data(const error_code& error
3338 , std::size_t bytes_transferred)
3340 session_impl::mutex_t::scoped_lock l(m_ses.m_mutex);
3342 INVARIANT_CHECK;
// the channel is expected to have been marked as waiting on the network
3344 TORRENT_ASSERT(m_channel_state[download_channel] == peer_info::bw_network);
3345 m_channel_state[download_channel] = peer_info::bw_idle;
3347 if (error)
3349 #if defined TORRENT_VERBOSE_LOGGING || defined TORRENT_ERROR_LOGGING
3350 (*m_logger) << time_now_string() << " **ERROR**: "
3351 << error.message() << "[in peer_connection::on_receive_data]\n";
3352 #endif
// on_receive() is called with the error before the connection is torn down
3353 on_receive(error, bytes_transferred);
3354 disconnect(error.message().c_str());
3355 return;
3358 int max_receive = 0;
// NOTE(review): the `break` statements below and the trailing
// `while (bytes_transferred > 0)` indicate that everything from here down to
// that `while` forms the body of a do/while loop; the line opening the loop
// is not visible in this listing - confirm against the upstream file.
3361 #ifdef TORRENT_VERBOSE_LOGGING
3362 (*m_logger) << "read " << bytes_transferred << " bytes\n";
3363 #endif
3364 // correct the dl quota usage, if not all of the buffer was actually read
3365 if (!m_ignore_bandwidth_limits)
3366 m_bandwidth_limit[download_channel].use_quota(bytes_transferred);
3368 if (m_disconnecting) return;
3370 TORRENT_ASSERT(m_packet_size > 0);
3371 TORRENT_ASSERT(bytes_transferred > 0);
3373 m_last_receive = time_now();
3374 m_recv_pos += bytes_transferred;
// the receive position may never run past the two buffers combined
3375 TORRENT_ASSERT(m_recv_pos <= int(m_recv_buffer.size()
3376 + m_disk_recv_buffer_size));
3378 on_receive(error, bytes_transferred);
3380 TORRENT_ASSERT(m_packet_size > 0);
// while m_peer_choked is set and no partial data is pending (m_recv_pos == 0),
// a receive buffer with more than 128 bytes of slack is replaced by a fresh
// buffer of exactly m_packet_size bytes
3382 if (m_peer_choked
3383 && m_recv_pos == 0
3384 && (m_recv_buffer.capacity() - m_packet_size) > 128)
3386 buffer(m_packet_size).swap(m_recv_buffer);
// read at most the rest of the current packet, capped by the remaining
// download quota unless bandwidth limits are ignored
3389 max_receive = m_packet_size - m_recv_pos;
3390 int quota_left = m_bandwidth_limit[download_channel].quota_left();
3391 if (!m_ignore_bandwidth_limits && max_receive > quota_left)
3392 max_receive = quota_left;
3394 if (max_receive == 0) break;
// the first regular_buffer_size bytes of a packet go into m_recv_buffer, the
// remaining m_disk_recv_buffer_size bytes go into m_disk_recv_buffer
3396 int regular_buffer_size = m_packet_size - m_disk_recv_buffer_size;
3398 if (int(m_recv_buffer.size()) < regular_buffer_size)
3399 m_recv_buffer.resize(regular_buffer_size);
3401 error_code ec;
3402 if (!m_disk_recv_buffer || regular_buffer_size >= m_recv_pos + max_receive)
3404 // only receive into regular buffer
3405 TORRENT_ASSERT(m_recv_pos + max_receive <= int(m_recv_buffer.size()));
3406 bytes_transferred = m_socket->read_some(asio::buffer(&m_recv_buffer[m_recv_pos]
3407 , max_receive), ec);
3409 else if (m_recv_pos >= regular_buffer_size)
3411 // only receive into disk buffer
3412 TORRENT_ASSERT(m_recv_pos - regular_buffer_size >= 0);
3413 TORRENT_ASSERT(m_recv_pos - regular_buffer_size + max_receive <= m_disk_recv_buffer_size);
3414 bytes_transferred = m_socket->read_some(asio::buffer(m_disk_recv_buffer.get()
3415 + m_recv_pos - regular_buffer_size, (std::min)(m_packet_size
3416 - m_recv_pos, max_receive)), ec);
3418 else
3420 // receive into both regular and disk buffer
3421 TORRENT_ASSERT(max_receive + m_recv_pos > regular_buffer_size);
3422 TORRENT_ASSERT(m_recv_pos < regular_buffer_size);
3423 TORRENT_ASSERT(max_receive - regular_buffer_size
3424 + m_recv_pos <= m_disk_recv_buffer_size);
// scatter read: vec[0] covers the tail of the regular buffer, vec[1] the
// start of the disk buffer
3426 boost::array<asio::mutable_buffer, 2> vec;
3427 vec[0] = asio::buffer(&m_recv_buffer[m_recv_pos]
3428 , regular_buffer_size - m_recv_pos);
3429 vec[1] = asio::buffer(m_disk_recv_buffer.get()
3430 , (std::min)(m_disk_recv_buffer_size
3431 , max_receive - regular_buffer_size + m_recv_pos));
3432 bytes_transferred = m_socket->read_some(vec, ec);
// any error other than would_block ends the connection; would_block merely
// ends this round of reading
3434 if (ec && ec != asio::error::would_block)
3436 disconnect(ec.message().c_str());
3437 return;
3439 if (ec == asio::error::would_block) break;
3441 while (bytes_transferred > 0);
3443 setup_receive();
3446 bool peer_connection::can_write() const
3448 // if we have requests or pending data to be sent or announcements to be made
3449 // we want to send data
3450 return !m_send_buffer.empty()
3451 && (m_bandwidth_limit[upload_channel].quota_left() > 0
3452 || m_ignore_bandwidth_limits)
3453 && !m_connecting;
3456 bool peer_connection::can_read() const
3458 bool ret = (m_bandwidth_limit[download_channel].quota_left() > 0
3459 || m_ignore_bandwidth_limits)
3460 && !m_connecting
3461 && m_outstanding_writing_bytes <
3462 m_ses.settings().max_outstanding_disk_bytes_per_connection;
3464 return ret;
3467 void peer_connection::connect(int ticket)
3469 INVARIANT_CHECK;
3471 error_code ec;
3472 #if defined TORRENT_VERBOSE_LOGGING || defined TORRENT_LOGGING || defined TORRENT_ERROR_LOGGING
3473 (*m_ses.m_logger) << time_now_string() << " CONNECTING: " << m_remote.address().to_string(ec)
3474 << ":" << m_remote.port() << "\n";
3475 #endif
3477 m_connection_ticket = ticket;
3478 boost::shared_ptr<torrent> t = m_torrent.lock();
3480 m_queued = false;
3481 TORRENT_ASSERT(m_connecting);
3483 if (!t)
3485 disconnect("torrent aborted");
3486 return;
3489 m_socket->open(t->get_interface().protocol(), ec);
3490 if (ec)
3492 disconnect(ec.message().c_str());
3493 return;
3496 // set the socket to non-blocking, so that we can
3497 // read the entire buffer on each read event we get
3498 tcp::socket::non_blocking_io ioc(true);
3499 m_socket->io_control(ioc, ec);
3500 if (ec)
3502 disconnect(ec.message().c_str());
3503 return;
3505 m_socket->bind(t->get_interface(), ec);
3506 if (ec)
3508 disconnect(ec.message().c_str());
3509 return;
3511 m_socket->async_connect(m_remote
3512 , bind(&peer_connection::on_connection_complete, self(), _1));
3513 m_connect = time_now();
3515 if (t->alerts().should_post<peer_connect_alert>())
3517 t->alerts().post_alert(peer_connect_alert(
3518 t->get_handle(), remote(), pid()));
3522 void peer_connection::on_connection_complete(error_code const& e)
3524 ptime completed = time_now();
3526 session_impl::mutex_t::scoped_lock l(m_ses.m_mutex);
3528 INVARIANT_CHECK;
3530 m_rtt = total_milliseconds(completed - m_connect);
3532 if (m_disconnecting) return;
3534 m_connecting = false;
3535 m_ses.m_half_open.done(m_connection_ticket);
3537 if (e)
3539 #if defined TORRENT_VERBOSE_LOGGING || defined TORRENT_LOGGING || defined TORRENT_ERROR_LOGGING
3540 (*m_ses.m_logger) << time_now_string() << " CONNECTION FAILED: " << m_remote.address().to_string()
3541 << ": " << e.message() << "\n";
3542 #endif
3543 disconnect(e.message().c_str(), 1);
3544 return;
3547 if (m_disconnecting) return;
3548 m_last_receive = time_now();
3550 // this means the connection just succeeded
3552 TORRENT_ASSERT(m_socket);
3553 #if defined TORRENT_VERBOSE_LOGGING || defined TORRENT_LOGGING
3554 (*m_ses.m_logger) << time_now_string() << " COMPLETED: " << m_remote.address().to_string()
3555 << " rtt = " << m_rtt << "\n";
3556 #endif
3558 error_code ec;
3559 if (m_remote == m_socket->local_endpoint(ec))
3561 // if the remote endpoint is the same as the local endpoint, we're connected
3562 // to ourselves
3563 disconnect("connected to ourselves", 1);
3564 return;
3567 if (m_remote.address().is_v4())
3569 error_code ec;
3570 m_socket->set_option(type_of_service(m_ses.settings().peer_tos), ec);
3573 on_connected();
3574 setup_send();
3575 setup_receive();
3578 // --------------------------
3579 // SEND DATA
3580 // --------------------------
3582 // throws exception when the client should be disconnected
3583 void peer_connection::on_send_data(error_code const& error
3584 , std::size_t bytes_transferred)
3586 session_impl::mutex_t::scoped_lock l(m_ses.m_mutex);
3588 INVARIANT_CHECK;
3590 TORRENT_ASSERT(m_channel_state[upload_channel] == peer_info::bw_network);
3592 m_send_buffer.pop_front(bytes_transferred);
3594 for (std::vector<int>::iterator i = m_requests_in_buffer.begin()
3595 , end(m_requests_in_buffer.end()); i != end; ++i)
3596 *i -= bytes_transferred;
3598 while (!m_requests_in_buffer.empty()
3599 && m_requests_in_buffer.front() <= 0)
3600 m_requests_in_buffer.erase(m_requests_in_buffer.begin());
3602 m_channel_state[upload_channel] = peer_info::bw_idle;
3604 if (!m_ignore_bandwidth_limits)
3605 m_bandwidth_limit[upload_channel].use_quota(bytes_transferred);
3607 #ifdef TORRENT_VERBOSE_LOGGING
3608 (*m_logger) << "wrote " << bytes_transferred << " bytes\n";
3609 #endif
3611 if (error)
3613 #if defined TORRENT_VERBOSE_LOGGING || defined TORRENT_ERROR_LOGGING
3614 (*m_logger) << "**ERROR**: " << error.message() << " [in peer_connection::on_send_data]\n";
3615 #endif
3616 disconnect(error.message().c_str());
3617 return;
3619 if (m_disconnecting) return;
3621 TORRENT_ASSERT(!m_connecting);
3622 TORRENT_ASSERT(bytes_transferred > 0);
3624 m_last_sent = time_now();
3626 on_sent(error, bytes_transferred);
3627 fill_send_buffer();
3629 setup_send();
3632 #ifndef NDEBUG
// Debug-only consistency check of this connection, compiled in only when
// NDEBUG is not defined. Asserts relations between the connection's own
// members, the session, the owning torrent and the torrent's piece picker.
// NOTE(review): several sections below reference names that are not declared
// in the visible code or carry comments saying they are invalid or expensive;
// they may be disabled in the upstream file by markers that are not visible
// in this listing - confirm before relying on them.
3633 void peer_connection::check_invariant() const
// a disk receive buffer exists exactly when its size is non-zero
3635 TORRENT_ASSERT(bool(m_disk_recv_buffer) == (m_disk_recv_buffer_size > 0));
3637 boost::shared_ptr<torrent> t = m_torrent.lock();
3638 if (m_disconnecting)
// a disconnecting peer no longer has a torrent and has started disconnecting
3640 TORRENT_ASSERT(!t);
3641 TORRENT_ASSERT(m_disconnect_started);
3643 else if (!m_in_constructor)
3645 TORRENT_ASSERT(m_ses.has_peer((peer_connection*)this));
// NOTE(review): the comment below states that this assertion can break when
// the limit is changed; possibly disabled upstream - confirm.
3649 // this assertion correct most of the time, but sometimes right when the
3650 // limit is changed it might break
3651 for (int i = 0; i < 2; ++i)
3653 // this peer is in the bandwidth history iff max_assignable < limit
3654 TORRENT_ASSERT((m_bandwidth_limit[i].max_assignable() < m_bandwidth_limit[i].throttle())
3655 == m_ses.m_bandwidth_manager[i]->is_in_history(this)
3656 || m_bandwidth_limit[i].throttle() == bandwidth_limit::inf);
// a channel in state bw_torrent or bw_global must have no quota left
3660 if (m_channel_state[download_channel] == peer_info::bw_torrent
3661 || m_channel_state[download_channel] == peer_info::bw_global)
3662 TORRENT_ASSERT(m_bandwidth_limit[download_channel].quota_left() == 0);
3663 if (m_channel_state[upload_channel] == peer_info::bw_torrent
3664 || m_channel_state[upload_channel] == peer_info::bw_global)
3665 TORRENT_ASSERT(m_bandwidth_limit[upload_channel].quota_left() == 0);
// no block may appear more than once across the download queue and the
// request queue: the set of distinct blocks must be as large as both queues
3667 std::set<piece_block> unique;
3668 std::transform(m_download_queue.begin(), m_download_queue.end()
3669 , std::inserter(unique, unique.begin()), boost::bind(&pending_block::block, _1));
3670 std::copy(m_request_queue.begin(), m_request_queue.end(), std::inserter(unique, unique.begin()));
3671 TORRENT_ASSERT(unique.size() == m_download_queue.size() + m_request_queue.size());
3672 if (m_peer_info)
3674 TORRENT_ASSERT(m_peer_info->prev_amount_upload == 0);
3675 TORRENT_ASSERT(m_peer_info->prev_amount_download == 0);
// the peer entry refers back to this connection or to none at all
3676 TORRENT_ASSERT(m_peer_info->connection == this
3677 || m_peer_info->connection == 0);
3679 if (m_peer_info->optimistically_unchoked)
3680 TORRENT_ASSERT(!is_choked());
// the remaining checks need the torrent; without one the function returns here
3683 if (!t)
3685 #ifdef TORRENT_EXPENSIVE_INVARIANT_CHECKS
3686 // since this connection doesn't have a torrent reference
3687 // no torrent should have a reference to this connection either
3688 for (aux::session_impl::torrent_map::const_iterator i = m_ses.m_torrents.begin()
3689 , end(m_ses.m_torrents.end()); i != end; ++i)
3690 TORRENT_ASSERT(!i->second->has_peer((peer_connection*)this));
3691 #endif
3692 return;
3695 if (m_ses.settings().close_redundant_connections)
3697 // make sure upload only peers are disconnected
3698 if (t->is_finished() && m_upload_only)
3699 TORRENT_ASSERT(m_disconnect_started);
3700 if (m_upload_only
3701 && !m_interesting
3702 && m_bitfield_received
3703 && t->are_files_checked())
3704 TORRENT_ASSERT(m_disconnect_started);
// a finished torrent has no interesting peers, and a seed is upload only
3707 if (t->is_finished())
3708 TORRENT_ASSERT(!m_interesting);
3709 if (is_seed())
3710 TORRENT_ASSERT(m_upload_only);
3712 if (t->has_picker())
// count, over all peers of the torrent, how many times each block is queued
// or requested, and compare that with the picker's peer count for every
// block that has not been downloaded yet
3714 std::map<piece_block, int> num_requests;
3715 for (torrent::const_peer_iterator i = t->begin(); i != t->end(); ++i)
3717 // make sure this peer is not a dangling pointer
3718 #ifdef TORRENT_EXPENSIVE_INVARIANT_CHECKS
3719 TORRENT_ASSERT(m_ses.has_peer(*i));
3720 #endif
3721 peer_connection const& p = *(*i);
// the inner loops declare their own `i`, shadowing the peer iterator; `p` was
// bound from the peer iterator before that
3722 for (std::deque<piece_block>::const_iterator i = p.request_queue().begin()
3723 , end(p.request_queue().end()); i != end; ++i)
3724 ++num_requests[*i];
3725 for (std::deque<pending_block>::const_iterator i = p.download_queue().begin()
3726 , end(p.download_queue().end()); i != end; ++i)
3727 ++num_requests[i->block];
3729 for (std::map<piece_block, int>::iterator i = num_requests.begin()
3730 , end(num_requests.end()); i != end; ++i)
3732 if (!t->picker().is_downloaded(i->first))
3733 TORRENT_ASSERT(t->picker().num_peers(i->first) == i->second);
3736 #ifdef TORRENT_EXPENSIVE_INVARIANT_CHECKS
3737 if (m_peer_info)
// m_peer_info must be one of the entries in the torrent's policy
3739 policy::const_iterator i = t->get_policy().begin_peer();
3740 policy::const_iterator end = t->get_policy().end_peer();
3741 for (; i != end; ++i)
3743 if (&i->second == m_peer_info) break;
3745 TORRENT_ASSERT(i != end);
3747 #endif
3748 if (t->has_picker() && !t->is_aborted())
3750 // make sure that pieces that have completed the download
3751 // of all their blocks are in the disk io thread's queue
3752 // to be checked.
3753 const std::vector<piece_picker::downloading_piece>& dl_queue
3754 = t->picker().get_download_queue();
3755 for (std::vector<piece_picker::downloading_piece>::const_iterator i =
3756 dl_queue.begin(); i != dl_queue.end(); ++i)
3758 const int blocks_per_piece = t->picker().blocks_in_piece(i->index);
// a piece counts as complete when every one of its blocks is state_finished
3760 bool complete = true;
3761 for (int j = 0; j < blocks_per_piece; ++j)
3763 if (i->info[j].state == piece_picker::block_info::state_finished)
3764 continue;
3765 complete = false;
3766 break;
// NOTE(review): `piece_failed` is not declared anywhere in the visible
// function and the comment below calls this invariant invalid; this check is
// presumably disabled upstream - confirm.
3769 // this invariant is not valid anymore since the completion event
3770 // might be queued in the io service
3771 if (complete && !piece_failed)
3773 disk_io_job ret = m_ses.m_disk_thread.find_job(
3774 &t->filesystem(), -1, i->index);
3775 TORRENT_ASSERT(ret.action == disk_io_job::hash || ret.action == disk_io_job::write);
3776 TORRENT_ASSERT(ret.piece == i->index);
// NOTE(review): the two sections below are labelled expensive; whether they
// are compiled in cannot be determined from this listing - confirm.
3781 // expensive when using checked iterators
3783 if (t->valid_metadata())
// m_num_pieces has to equal the number of set entries in m_have_piece
3785 int piece_count = std::count(m_have_piece.begin()
3786 , m_have_piece.end(), true);
3787 if (m_num_pieces != piece_count)
3789 TORRENT_ASSERT(false);
3794 // extremely expensive invariant check
3796 if (!t->is_seed())
3798 piece_picker& p = t->picker();
3799 const std::vector<piece_picker::downloading_piece>& dlq = p.get_download_queue();
3800 const int blocks_per_piece = static_cast<int>(
3801 t->torrent_file().piece_length() / t->block_size());
3803 for (std::vector<piece_picker::downloading_piece>::const_iterator i =
3804 dlq.begin(); i != dlq.end(); ++i)
3806 for (int j = 0; j < blocks_per_piece; ++j)
// blocks found in this peer's request or download queue are expected to be
// attributed to m_remote; other blocks must not be, unless they are finished
3808 if (std::find(m_request_queue.begin(), m_request_queue.end()
3809 , piece_block(i->index, j)) != m_request_queue.end()
3811 std::find(m_download_queue.begin(), m_download_queue.end()
3812 , piece_block(i->index, j)) != m_download_queue.end())
3814 TORRENT_ASSERT(i->info[j].peer == m_remote);
3816 else
3818 TORRENT_ASSERT(i->info[j].peer != m_remote || i->info[j].finished);
3825 #endif
3827 peer_connection::peer_speed_t peer_connection::peer_speed()
3829 shared_ptr<torrent> t = m_torrent.lock();
3830 TORRENT_ASSERT(t);
3832 int download_rate = int(statistics().download_payload_rate());
3833 int torrent_download_rate = int(t->statistics().download_payload_rate());
3835 if (download_rate > 512 && download_rate > torrent_download_rate / 16)
3836 m_speed = fast;
3837 else if (download_rate > 4096 && download_rate > torrent_download_rate / 64)
3838 m_speed = medium;
3839 else if (download_rate < torrent_download_rate / 15 && m_speed == fast)
3840 m_speed = medium;
3841 else if (download_rate < torrent_download_rate / 63 && m_speed == medium)
3842 m_speed = slow;
3844 return m_speed;
3847 void peer_connection::keep_alive()
3849 INVARIANT_CHECK;
3851 time_duration d;
3852 d = time_now() - m_last_sent;
3853 if (total_seconds(d) < m_timeout / 2) return;
3855 if (m_connecting) return;
3856 if (in_handshake()) return;
3858 // if the last send has not completed yet, do not send a keep
3859 // alive
3860 if (m_channel_state[upload_channel] != peer_info::bw_idle) return;
3862 #ifdef TORRENT_VERBOSE_LOGGING
3863 (*m_logger) << time_now_string() << " ==> KEEPALIVE\n";
3864 #endif
3866 m_last_sent = time_now();
3867 write_keepalive();
3870 bool peer_connection::is_seed() const
3872 // if m_num_pieces == 0, we probably don't have the
3873 // metadata yet.
3874 return m_num_pieces == (int)m_have_piece.size() && m_num_pieces > 0;