fixed bug in windows path of file.cpp
[libtorrent.git] / src / peer_connection.cpp
blob95dd990b57beb960081ecfe01407d7414b969a71
1 /*
3 Copyright (c) 2003, Arvid Norberg
4 All rights reserved.
6 Redistribution and use in source and binary forms, with or without
7 modification, are permitted provided that the following conditions
8 are met:
10 * Redistributions of source code must retain the above copyright
11 notice, this list of conditions and the following disclaimer.
12 * Redistributions in binary form must reproduce the above copyright
13 notice, this list of conditions and the following disclaimer in
14 the documentation and/or other materials provided with the distribution.
15 * Neither the name of the author nor the names of its
16 contributors may be used to endorse or promote products derived
17 from this software without specific prior written permission.
19 THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
20 AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
21 IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
22 ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
23 LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
24 CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
25 SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
26 INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
27 CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
28 ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
29 POSSIBILITY OF SUCH DAMAGE.
33 #include "libtorrent/pch.hpp"
35 #include <vector>
36 #include <iostream>
37 #include <iomanip>
38 #include <limits>
39 #include <boost/bind.hpp>
41 #include "libtorrent/peer_connection.hpp"
42 #include "libtorrent/identify_client.hpp"
43 #include "libtorrent/entry.hpp"
44 #include "libtorrent/bencode.hpp"
45 #include "libtorrent/alert_types.hpp"
46 #include "libtorrent/invariant_check.hpp"
47 #include "libtorrent/io.hpp"
48 #include "libtorrent/file.hpp"
49 #include "libtorrent/version.hpp"
50 #include "libtorrent/extensions.hpp"
51 #include "libtorrent/aux_/session_impl.hpp"
52 #include "libtorrent/policy.hpp"
53 #include "libtorrent/socket_type.hpp"
54 #include "libtorrent/assert.hpp"
56 //#define TORRENT_CORRUPT_DATA
58 using boost::bind;
59 using boost::shared_ptr;
60 using libtorrent::aux::session_impl;
62 namespace libtorrent
64 // outbound connection
65 peer_connection::peer_connection(
66 session_impl& ses
67 , boost::weak_ptr<torrent> tor
68 , shared_ptr<socket_type> s
69 , tcp::endpoint const& endp
70 , policy::peer* peerinfo)
72 #ifndef NDEBUG
73 m_last_choke(time_now() - hours(1))
75 #endif
76 m_ses(ses)
77 , m_max_out_request_queue(m_ses.settings().max_out_request_queue)
78 , m_last_piece(time_now())
79 , m_last_request(time_now())
80 , m_last_incoming_request(min_time())
81 , m_last_unchoke(min_time())
82 , m_last_receive(time_now())
83 , m_last_sent(time_now())
84 , m_requested(min_time())
85 , m_timeout_extend(0)
86 , m_remote_dl_update(time_now())
87 , m_connect(time_now())
88 , m_became_uninterested(time_now())
89 , m_became_uninteresting(time_now())
90 , m_free_upload(0)
91 , m_downloaded_at_last_unchoke(0)
92 , m_disk_recv_buffer(ses, 0)
93 , m_socket(s)
94 , m_remote(endp)
95 , m_torrent(tor)
96 , m_num_pieces(0)
97 , m_timeout(m_ses.settings().peer_timeout)
98 , m_packet_size(0)
99 , m_recv_pos(0)
100 , m_disk_recv_buffer_size(0)
101 , m_reading_bytes(0)
102 , m_num_invalid_requests(0)
103 , m_priority(1)
104 , m_upload_limit(bandwidth_limit::inf)
105 , m_download_limit(bandwidth_limit::inf)
106 , m_peer_info(peerinfo)
107 , m_speed(slow)
108 , m_connection_ticket(-1)
109 , m_remote_bytes_dled(0)
110 , m_remote_dl_rate(0)
111 , m_outstanding_writing_bytes(0)
112 , m_download_rate_peak(0)
113 , m_upload_rate_peak(0)
114 , m_rtt(0)
115 , m_prefer_whole_pieces(0)
116 , m_desired_queue_size(2)
117 , m_fast_reconnect(false)
118 , m_active(true)
119 , m_peer_interested(false)
120 , m_peer_choked(true)
121 , m_interesting(false)
122 , m_choked(true)
123 , m_failed(false)
124 , m_ignore_bandwidth_limits(false)
125 , m_have_all(false)
126 , m_disconnecting(false)
127 , m_connecting(true)
128 , m_queued(true)
129 , m_request_large_blocks(false)
130 , m_upload_only(false)
131 , m_snubbed(false)
132 , m_bitfield_received(false)
133 #ifndef NDEBUG
134 , m_in_constructor(true)
135 , m_disconnect_started(false)
136 #endif
138 m_channel_state[upload_channel] = peer_info::bw_idle;
139 m_channel_state[download_channel] = peer_info::bw_idle;
141 TORRENT_ASSERT(peerinfo == 0 || peerinfo->banned == false);
142 #ifndef TORRENT_DISABLE_RESOLVE_COUNTRIES
143 std::fill(m_country, m_country + 2, 0);
144 #ifndef TORRENT_DISABLE_GEO_IP
145 if (m_ses.has_country_db())
147 char const *country = m_ses.country_for_ip(m_remote.address());
148 if (country != 0)
150 m_country[0] = country[0];
151 m_country[1] = country[1];
154 #endif
155 #endif
156 #if defined TORRENT_VERBOSE_LOGGING || defined TORRENT_ERROR_LOGGING
157 m_logger = m_ses.create_log(m_remote.address().to_string() + "_"
158 + boost::lexical_cast<std::string>(m_remote.port()), m_ses.listen_port());
159 (*m_logger) << "*** OUTGOING CONNECTION\n";
160 #endif
161 #ifndef NDEBUG
162 piece_failed = false;
163 #endif
164 #ifndef TORRENT_DISABLE_GEO_IP
165 m_inet_as_name = m_ses.as_name_for_ip(m_remote.address());
166 #endif
168 std::fill(m_peer_id.begin(), m_peer_id.end(), 0);
171 // incoming connection
172 peer_connection::peer_connection(
173 session_impl& ses
174 , shared_ptr<socket_type> s
175 , tcp::endpoint const& endp
176 , policy::peer* peerinfo)
178 #ifndef NDEBUG
179 m_last_choke(time_now() - hours(1))
181 #endif
182 m_ses(ses)
183 , m_max_out_request_queue(m_ses.settings().max_out_request_queue)
184 , m_last_piece(time_now())
185 , m_last_request(time_now())
186 , m_last_incoming_request(min_time())
187 , m_last_unchoke(min_time())
188 , m_last_receive(time_now())
189 , m_last_sent(time_now())
190 , m_requested(min_time())
191 , m_timeout_extend(0)
192 , m_remote_dl_update(time_now())
193 , m_connect(time_now())
194 , m_became_uninterested(time_now())
195 , m_became_uninteresting(time_now())
196 , m_free_upload(0)
197 , m_downloaded_at_last_unchoke(0)
198 , m_disk_recv_buffer(ses, 0)
199 , m_socket(s)
200 , m_remote(endp)
201 , m_num_pieces(0)
202 , m_timeout(m_ses.settings().peer_timeout)
203 , m_packet_size(0)
204 , m_recv_pos(0)
205 , m_disk_recv_buffer_size(0)
206 , m_reading_bytes(0)
207 , m_num_invalid_requests(0)
208 , m_priority(1)
209 , m_upload_limit(bandwidth_limit::inf)
210 , m_download_limit(bandwidth_limit::inf)
211 , m_peer_info(peerinfo)
212 , m_speed(slow)
213 , m_connection_ticket(-1)
214 , m_remote_bytes_dled(0)
215 , m_remote_dl_rate(0)
216 , m_outstanding_writing_bytes(0)
217 , m_download_rate_peak(0)
218 , m_upload_rate_peak(0)
219 , m_rtt(0)
220 , m_prefer_whole_pieces(0)
221 , m_desired_queue_size(2)
222 , m_fast_reconnect(false)
223 , m_active(false)
224 , m_peer_interested(false)
225 , m_peer_choked(true)
226 , m_interesting(false)
227 , m_choked(true)
228 , m_failed(false)
229 , m_ignore_bandwidth_limits(false)
230 , m_have_all(false)
231 , m_disconnecting(false)
232 , m_connecting(false)
233 , m_queued(false)
234 , m_request_large_blocks(false)
235 , m_upload_only(false)
236 , m_snubbed(false)
237 , m_bitfield_received(false)
238 #ifndef NDEBUG
239 , m_in_constructor(true)
240 , m_disconnect_started(false)
241 #endif
243 m_channel_state[upload_channel] = peer_info::bw_idle;
244 m_channel_state[download_channel] = peer_info::bw_idle;
246 #ifndef TORRENT_DISABLE_RESOLVE_COUNTRIES
247 std::fill(m_country, m_country + 2, 0);
248 #ifndef TORRENT_DISABLE_GEO_IP
249 if (m_ses.has_country_db())
251 char const *country = m_ses.country_for_ip(m_remote.address());
252 if (country != 0)
254 m_country[0] = country[0];
255 m_country[1] = country[1];
258 #endif
259 #endif
261 #if defined TORRENT_VERBOSE_LOGGING || defined TORRENT_ERROR_LOGGING
262 error_code ec;
263 TORRENT_ASSERT(m_socket->remote_endpoint(ec) == m_remote || ec);
264 m_logger = m_ses.create_log(remote().address().to_string(ec) + "_"
265 + boost::lexical_cast<std::string>(remote().port()), m_ses.listen_port());
266 (*m_logger) << "*** INCOMING CONNECTION\n";
267 #endif
269 #ifndef TORRENT_DISABLE_GEO_IP
270 m_inet_as_name = m_ses.as_name_for_ip(m_remote.address());
271 #endif
272 #ifndef NDEBUG
273 piece_failed = false;
274 #endif
275 std::fill(m_peer_id.begin(), m_peer_id.end(), 0);
278 bool peer_connection::unchoke_compare(boost::intrusive_ptr<peer_connection const> const& p) const
280 TORRENT_ASSERT(p);
281 peer_connection const& rhs = *p;
283 // first compare how many bytes they've sent us
284 size_type c1 = m_statistics.total_payload_download() - m_downloaded_at_last_unchoke;
285 size_type c2 = rhs.m_statistics.total_payload_download() - rhs.m_downloaded_at_last_unchoke;
286 if (c1 > c2) return true;
287 if (c1 < c2) return false;
289 // if they are equal, compare how much we have uploaded
290 if (m_peer_info) c1 = m_peer_info->total_upload();
291 else c1 = m_statistics.total_payload_upload();
292 if (rhs.m_peer_info) c2 = rhs.m_peer_info->total_upload();
293 else c2 = rhs.m_statistics.total_payload_upload();
295 return c1 < c2;
298 void peer_connection::reset_choke_counters()
300 m_downloaded_at_last_unchoke = m_statistics.total_payload_download();
303 void peer_connection::start()
305 TORRENT_ASSERT(m_peer_info == 0 || m_peer_info->connection == this);
306 boost::shared_ptr<torrent> t = m_torrent.lock();
308 if (!t)
310 tcp::socket::non_blocking_io ioc(true);
311 error_code ec;
312 m_socket->io_control(ioc, ec);
313 if (ec)
315 disconnect(ec.message().c_str());
316 return;
318 m_remote = m_socket->remote_endpoint(ec);
319 if (ec)
321 disconnect(ec.message().c_str());
322 return;
324 if (m_remote.address().is_v4())
325 m_socket->set_option(type_of_service(m_ses.settings().peer_tos), ec);
327 else if (t->ready_for_connections())
329 init();
333 void peer_connection::update_interest()
335 boost::shared_ptr<torrent> t = m_torrent.lock();
336 TORRENT_ASSERT(t);
338 bool interested = false;
339 if (!t->is_finished())
341 piece_picker const& p = t->picker();
342 int num_pieces = p.num_pieces();
343 for (int j = 0; j != num_pieces; ++j)
345 if (!p.have_piece(j)
346 && t->piece_priority(j) > 0
347 && m_have_piece[j])
349 interested = true;
350 break;
356 if (!interested) send_not_interested();
357 else t->get_policy().peer_is_interesting(*this);
359 // may throw an asio error if socket has disconnected
360 catch (std::exception&) {}
362 TORRENT_ASSERT(is_interesting() == interested);
365 #ifndef TORRENT_DISABLE_EXTENSIONS
366 void peer_connection::add_extension(boost::shared_ptr<peer_plugin> ext)
368 m_extensions.push_back(ext);
370 #endif
372 void peer_connection::send_allowed_set()
374 INVARIANT_CHECK;
376 boost::shared_ptr<torrent> t = m_torrent.lock();
377 TORRENT_ASSERT(t);
379 int num_allowed_pieces = m_ses.settings().allowed_fast_set_size;
380 int num_pieces = t->torrent_file().num_pieces();
382 if (num_allowed_pieces >= num_pieces)
384 for (int i = 0; i < num_pieces; ++i)
386 #ifdef TORRENT_VERBOSE_LOGGING
387 (*m_logger) << time_now_string()
388 << " ==> ALLOWED_FAST [ " << i << " ]\n";
389 #endif
390 write_allow_fast(i);
391 m_accept_fast.insert(i);
393 return;
396 std::string x;
397 address const& addr = m_remote.address();
398 if (addr.is_v4())
400 address_v4::bytes_type bytes = addr.to_v4().to_bytes();
401 x.assign((char*)&bytes[0], bytes.size());
403 else
405 address_v6::bytes_type bytes = addr.to_v6().to_bytes();
406 x.assign((char*)&bytes[0], bytes.size());
408 x.append((char*)&t->torrent_file().info_hash()[0], 20);
410 sha1_hash hash = hasher(&x[0], x.size()).final();
411 for (;;)
413 char* p = (char*)&hash[0];
414 for (int i = 0; i < 5; ++i)
416 int piece = detail::read_uint32(p) % num_pieces;
417 if (m_accept_fast.find(piece) == m_accept_fast.end())
419 #ifdef TORRENT_VERBOSE_LOGGING
420 (*m_logger) << time_now_string()
421 << " ==> ALLOWED_FAST [ " << piece << " ]\n";
422 #endif
423 write_allow_fast(piece);
424 m_accept_fast.insert(piece);
425 if (int(m_accept_fast.size()) >= num_allowed_pieces
426 || int(m_accept_fast.size()) == num_pieces) return;
429 hash = hasher((char*)&hash[0], 20).final();
433 void peer_connection::init()
435 INVARIANT_CHECK;
437 boost::shared_ptr<torrent> t = m_torrent.lock();
438 TORRENT_ASSERT(t);
439 TORRENT_ASSERT(t->valid_metadata());
440 TORRENT_ASSERT(t->ready_for_connections());
442 m_have_piece.resize(t->torrent_file().num_pieces(), m_have_all);
443 if (m_have_all) m_num_pieces = t->torrent_file().num_pieces();
445 // now that we have a piece_picker,
446 // update it with this peer's pieces
448 TORRENT_ASSERT(m_num_pieces == std::count(m_have_piece.begin()
449 , m_have_piece.end(), true));
451 if (m_num_pieces == int(m_have_piece.size()))
453 #ifdef TORRENT_VERBOSE_LOGGING
454 (*m_logger) << " *** THIS IS A SEED ***\n";
455 #endif
456 // if this is a web seed. we don't have a peer_info struct
457 if (m_peer_info) m_peer_info->seed = true;
459 t->peer_has_all();
460 if (t->is_finished()) send_not_interested();
461 else t->get_policy().peer_is_interesting(*this);
462 return;
465 // if we're a seed, we don't keep track of piece availability
466 if (!t->is_seed())
468 t->peer_has(m_have_piece);
469 bool interesting = false;
470 for (int i = 0; i < int(m_have_piece.size()); ++i)
472 if (m_have_piece[i])
474 // if the peer has a piece and we don't, the peer is interesting
475 if (!t->have_piece(i)
476 && t->picker().piece_priority(i) != 0)
477 interesting = true;
480 if (interesting) t->get_policy().peer_is_interesting(*this);
481 else send_not_interested();
483 else
485 update_interest();
489 peer_connection::~peer_connection()
491 // INVARIANT_CHECK;
492 TORRENT_ASSERT(!m_in_constructor);
493 TORRENT_ASSERT(m_disconnecting);
494 TORRENT_ASSERT(m_disconnect_started);
496 m_disk_recv_buffer_size = 0;
498 #if defined TORRENT_VERBOSE_LOGGING || defined TORRENT_ERROR_LOGGING
499 if (m_logger)
501 (*m_logger) << time_now_string()
502 << " *** CONNECTION CLOSED\n";
504 #endif
505 TORRENT_ASSERT(!m_ses.has_peer(this));
506 #ifndef NDEBUG
507 for (aux::session_impl::torrent_map::const_iterator i = m_ses.m_torrents.begin()
508 , end(m_ses.m_torrents.end()); i != end; ++i)
509 TORRENT_ASSERT(!i->second->has_peer(this));
510 if (m_peer_info)
511 TORRENT_ASSERT(m_peer_info->connection == 0);
513 boost::shared_ptr<torrent> t = m_torrent.lock();
514 #endif
517 void peer_connection::fast_reconnect(bool r)
519 if (!peer_info_struct() || peer_info_struct()->fast_reconnects > 1)
520 return;
521 m_fast_reconnect = r;
522 peer_info_struct()->connected = time_now()
523 - seconds(m_ses.settings().min_reconnect_time
524 * m_ses.settings().max_failcount);
525 ++peer_info_struct()->fast_reconnects;
528 void peer_connection::announce_piece(int index)
530 // dont announce during handshake
531 if (in_handshake()) return;
533 // remove suggested pieces that we have
534 std::vector<int>::iterator i = std::find(
535 m_suggested_pieces.begin(), m_suggested_pieces.end(), index);
536 if (i != m_suggested_pieces.end()) m_suggested_pieces.erase(i);
538 if (has_piece(index))
540 // if we got a piece that this peer has
541 // it might have been the last interesting
542 // piece this peer had. We might not be
543 // interested anymore
544 update_interest();
545 if (is_disconnecting()) return;
547 // optimization, don't send have messages
548 // to peers that already have the piece
549 if (!m_ses.settings().send_redundant_have)
551 #ifdef TORRENT_VERBOSE_LOGGING
552 (*m_logger) << time_now_string()
553 << " ==> HAVE [ piece: " << index << " ] SUPRESSED\n";
554 #endif
555 return;
559 #ifdef TORRENT_VERBOSE_LOGGING
560 (*m_logger) << time_now_string()
561 << " ==> HAVE [ piece: " << index << "]\n";
562 #endif
563 write_have(index);
564 #ifndef NDEBUG
565 boost::shared_ptr<torrent> t = m_torrent.lock();
566 TORRENT_ASSERT(t);
567 TORRENT_ASSERT(t->have_piece(index));
568 #endif
571 bool peer_connection::has_piece(int i) const
573 boost::shared_ptr<torrent> t = m_torrent.lock();
574 TORRENT_ASSERT(t);
575 TORRENT_ASSERT(t->valid_metadata());
576 TORRENT_ASSERT(i >= 0);
577 TORRENT_ASSERT(i < t->torrent_file().num_pieces());
578 return m_have_piece[i];
581 std::deque<piece_block> const& peer_connection::request_queue() const
583 return m_request_queue;
586 std::deque<pending_block> const& peer_connection::download_queue() const
588 return m_download_queue;
591 std::deque<peer_request> const& peer_connection::upload_queue() const
593 return m_requests;
596 void peer_connection::add_stat(size_type downloaded, size_type uploaded)
598 m_statistics.add_stat(downloaded, uploaded);
601 bitfield const& peer_connection::get_bitfield() const
603 return m_have_piece;
606 void peer_connection::received_valid_data(int index)
608 INVARIANT_CHECK;
610 #ifndef TORRENT_DISABLE_EXTENSIONS
611 for (extension_list_t::iterator i = m_extensions.begin()
612 , end(m_extensions.end()); i != end; ++i)
614 #ifdef BOOST_NO_EXCEPTIONS
615 (*i)->on_piece_pass(index);
616 #else
617 try { (*i)->on_piece_pass(index); } catch (std::exception&) {}
618 #endif
620 #endif
623 void peer_connection::received_invalid_data(int index)
625 INVARIANT_CHECK;
627 #ifndef TORRENT_DISABLE_EXTENSIONS
628 for (extension_list_t::iterator i = m_extensions.begin()
629 , end(m_extensions.end()); i != end; ++i)
631 #ifdef BOOST_NO_EXCEPTIONS
632 (*i)->on_piece_failed(index);
633 #else
634 try { (*i)->on_piece_failed(index); } catch (std::exception&) {}
635 #endif
637 #endif
638 if (is_disconnecting()) return;
640 if (peer_info_struct())
642 if (m_ses.settings().use_parole_mode)
643 peer_info_struct()->on_parole = true;
645 ++peer_info_struct()->hashfails;
646 boost::int8_t& trust_points = peer_info_struct()->trust_points;
648 // we decrease more than we increase, to keep the
649 // allowed failed/passed ratio low.
650 // TODO: make this limit user settable
651 trust_points -= 2;
652 if (trust_points < -7) trust_points = -7;
656 size_type peer_connection::total_free_upload() const
658 return m_free_upload;
661 void peer_connection::add_free_upload(size_type free_upload)
663 INVARIANT_CHECK;
665 m_free_upload += free_upload;
668 // verifies a piece to see if it is valid (is within a valid range)
669 // and if it can correspond to a request generated by libtorrent.
670 bool peer_connection::verify_piece(const peer_request& p) const
672 INVARIANT_CHECK;
674 boost::shared_ptr<torrent> t = m_torrent.lock();
675 TORRENT_ASSERT(t);
677 TORRENT_ASSERT(t->valid_metadata());
678 torrent_info const& ti = t->torrent_file();
680 return p.piece >= 0
681 && p.piece < t->torrent_file().num_pieces()
682 && p.length > 0
683 && p.start >= 0
684 && (p.length == t->block_size()
685 || (p.length < t->block_size()
686 && p.piece == ti.num_pieces()-1
687 && p.start + p.length == ti.piece_size(p.piece))
688 || (m_request_large_blocks
689 && p.length <= ti.piece_length() * m_prefer_whole_pieces == 0 ?
690 1 : m_prefer_whole_pieces))
691 && p.piece * size_type(ti.piece_length()) + p.start + p.length
692 <= ti.total_size()
693 && (p.start % t->block_size() == 0);
696 void peer_connection::attach_to_torrent(sha1_hash const& ih)
698 INVARIANT_CHECK;
700 TORRENT_ASSERT(!m_disconnecting);
701 TORRENT_ASSERT(m_torrent.expired());
702 boost::weak_ptr<torrent> wpt = m_ses.find_torrent(ih);
703 boost::shared_ptr<torrent> t = wpt.lock();
705 if (t && t->is_aborted())
707 #if defined TORRENT_VERBOSE_LOGGING || defined TORRENT_ERROR_LOGGING
708 (*m_logger) << " *** the torrent has been aborted\n";
709 #endif
710 t.reset();
713 if (!t)
715 // we couldn't find the torrent!
716 #if defined TORRENT_VERBOSE_LOGGING || defined TORRENT_ERROR_LOGGING
717 (*m_logger) << " *** couldn't find a torrent with the given info_hash: " << ih << "\n";
718 (*m_logger) << " torrents:\n";
719 session_impl::torrent_map const& torrents = m_ses.m_torrents;
720 for (session_impl::torrent_map::const_iterator i = torrents.begin()
721 , end(torrents.end()); i != end; ++i)
723 (*m_logger) << " " << i->second->torrent_file().info_hash() << "\n";
725 #endif
726 disconnect("got invalid info-hash", 2);
727 return;
730 if (t->is_paused())
732 // paused torrents will not accept
733 // incoming connections
734 #if defined TORRENT_VERBOSE_LOGGING || defined TORRENT_ERROR_LOGGING
735 (*m_logger) << " rejected connection to paused torrent\n";
736 #endif
737 disconnect("connection rejected bacause torrent is paused");
738 return;
741 TORRENT_ASSERT(m_torrent.expired());
742 // check to make sure we don't have another connection with the same
743 // info_hash and peer_id. If we do. close this connection.
744 #ifndef NDEBUG
747 #endif
748 t->attach_peer(this);
749 #ifndef NDEBUG
751 catch (std::exception& e)
753 std::cout << e.what() << std::endl;
754 TORRENT_ASSERT(false);
756 #endif
757 if (m_disconnecting) return;
758 m_torrent = wpt;
760 TORRENT_ASSERT(!m_torrent.expired());
762 // if the torrent isn't ready to accept
763 // connections yet, we'll have to wait with
764 // our initialization
765 if (t->ready_for_connections()) init();
767 TORRENT_ASSERT(!m_torrent.expired());
769 // assume the other end has no pieces
770 // if we don't have valid metadata yet,
771 // leave the vector unallocated
772 TORRENT_ASSERT(m_num_pieces == 0);
773 m_have_piece.clear_all();
774 TORRENT_ASSERT(!m_torrent.expired());
777 // message handlers
779 // -----------------------------
780 // --------- KEEPALIVE ---------
781 // -----------------------------
783 void peer_connection::incoming_keepalive()
785 INVARIANT_CHECK;
787 #ifdef TORRENT_VERBOSE_LOGGING
788 (*m_logger) << time_now_string() << " <== KEEPALIVE\n";
789 #endif
792 // -----------------------------
793 // ----------- CHOKE -----------
794 // -----------------------------
796 void peer_connection::incoming_choke()
798 INVARIANT_CHECK;
800 boost::shared_ptr<torrent> t = m_torrent.lock();
801 TORRENT_ASSERT(t);
803 #ifndef TORRENT_DISABLE_EXTENSIONS
804 for (extension_list_t::iterator i = m_extensions.begin()
805 , end(m_extensions.end()); i != end; ++i)
807 if ((*i)->on_choke()) return;
809 #endif
810 if (is_disconnecting()) return;
812 #ifdef TORRENT_VERBOSE_LOGGING
813 (*m_logger) << time_now_string() << " <== CHOKE\n";
814 #endif
815 m_peer_choked = true;
817 if (peer_info_struct() == 0 || !peer_info_struct()->on_parole)
819 // if the peer is not in parole mode, clear the queued
820 // up block requests
821 if (!t->is_seed())
823 piece_picker& p = t->picker();
824 for (std::deque<piece_block>::const_iterator i = m_request_queue.begin()
825 , end(m_request_queue.end()); i != end; ++i)
827 // since this piece was skipped, clear it and allow it to
828 // be requested from other peers
829 p.abort_download(*i);
832 m_request_queue.clear();
836 bool match_request(peer_request const& r, piece_block const& b, int block_size)
838 if (b.piece_index != r.piece) return false;
839 if (b.block_index != r.start / block_size) return false;
840 if (r.start % block_size != 0) return false;
841 return true;
844 // -----------------------------
845 // -------- REJECT PIECE -------
846 // -----------------------------
848 void peer_connection::incoming_reject_request(peer_request const& r)
850 INVARIANT_CHECK;
852 boost::shared_ptr<torrent> t = m_torrent.lock();
853 TORRENT_ASSERT(t);
855 #ifndef TORRENT_DISABLE_EXTENSIONS
856 for (extension_list_t::iterator i = m_extensions.begin()
857 , end(m_extensions.end()); i != end; ++i)
859 if ((*i)->on_reject(r)) return;
861 #endif
863 if (is_disconnecting()) return;
865 std::deque<pending_block>::iterator i = std::find_if(
866 m_download_queue.begin(), m_download_queue.end()
867 , bind(match_request, boost::cref(r), bind(&pending_block::block, _1)
868 , t->block_size()));
870 #ifdef TORRENT_VERBOSE_LOGGING
871 (*m_logger) << time_now_string()
872 << " <== REJECT_PIECE [ piece: " << r.piece << " | s: " << r.start << " | l: " << r.length << " ]\n";
873 #endif
875 piece_block b(-1, 0);
876 if (i != m_download_queue.end())
878 b = i->block;
879 m_download_queue.erase(i);
881 // if the peer is in parole mode, keep the request
882 if (peer_info_struct() && peer_info_struct()->on_parole)
884 m_request_queue.push_front(b);
886 else if (!t->is_seed())
888 piece_picker& p = t->picker();
889 p.abort_download(b);
892 #if defined TORRENT_VERBOSE_LOGGING || defined TORRENT_ERROR_LOGGING
893 else
895 (*m_logger) << time_now_string()
896 << " *** PIECE NOT IN REQUEST QUEUE\n";
898 #endif
899 if (has_peer_choked())
901 // if we're choked and we got a rejection of
902 // a piece in the allowed fast set, remove it
903 // from the allow fast set.
904 std::vector<int>::iterator i = std::find(
905 m_allowed_fast.begin(), m_allowed_fast.end(), r.piece);
906 if (i != m_allowed_fast.end()) m_allowed_fast.erase(i);
908 else
910 std::vector<int>::iterator i = std::find(m_suggested_pieces.begin()
911 , m_suggested_pieces.end(), r.piece);
912 if (i != m_suggested_pieces.end())
913 m_suggested_pieces.erase(i);
916 if (m_request_queue.empty() && m_download_queue.size() < 2)
918 request_a_block(*t, *this);
919 send_block_requests();
923 // -----------------------------
924 // ------- SUGGEST PIECE -------
925 // -----------------------------
927 void peer_connection::incoming_suggest(int index)
929 INVARIANT_CHECK;
931 #ifdef TORRENT_VERBOSE_LOGGING
932 (*m_logger) << time_now_string()
933 << " <== SUGGEST_PIECE [ piece: " << index << " ]\n";
934 #endif
935 boost::shared_ptr<torrent> t = m_torrent.lock();
936 if (!t) return;
938 #ifndef TORRENT_DISABLE_EXTENSIONS
939 for (extension_list_t::iterator i = m_extensions.begin()
940 , end(m_extensions.end()); i != end; ++i)
942 if ((*i)->on_suggest(index)) return;
944 #endif
946 if (is_disconnecting()) return;
947 if (t->have_piece(index)) return;
949 if (m_suggested_pieces.size() > 9)
950 m_suggested_pieces.erase(m_suggested_pieces.begin());
951 m_suggested_pieces.push_back(index);
953 #ifdef TORRENT_VERBOSE_LOGGING
954 (*m_logger) << time_now_string()
955 << " ** SUGGEST_PIECE [ piece: " << index << " added to set: " << m_suggested_pieces.size() << " ]\n";
956 #endif
959 // -----------------------------
960 // ---------- UNCHOKE ----------
961 // -----------------------------
963 void peer_connection::incoming_unchoke()
965 INVARIANT_CHECK;
967 boost::shared_ptr<torrent> t = m_torrent.lock();
968 TORRENT_ASSERT(t);
970 #ifndef TORRENT_DISABLE_EXTENSIONS
971 for (extension_list_t::iterator i = m_extensions.begin()
972 , end(m_extensions.end()); i != end; ++i)
974 if ((*i)->on_unchoke()) return;
976 #endif
978 #ifdef TORRENT_VERBOSE_LOGGING
979 (*m_logger) << time_now_string() << " <== UNCHOKE\n";
980 #endif
981 m_peer_choked = false;
982 if (is_disconnecting()) return;
984 t->get_policy().unchoked(*this);
987 // -----------------------------
988 // -------- INTERESTED ---------
989 // -----------------------------
991 void peer_connection::incoming_interested()
993 INVARIANT_CHECK;
995 boost::shared_ptr<torrent> t = m_torrent.lock();
996 TORRENT_ASSERT(t);
998 #ifndef TORRENT_DISABLE_EXTENSIONS
999 for (extension_list_t::iterator i = m_extensions.begin()
1000 , end(m_extensions.end()); i != end; ++i)
1002 if ((*i)->on_interested()) return;
1004 #endif
1006 #ifdef TORRENT_VERBOSE_LOGGING
1007 (*m_logger) << time_now_string() << " <== INTERESTED\n";
1008 #endif
1009 m_peer_interested = true;
1010 if (is_disconnecting()) return;
1011 t->get_policy().interested(*this);
1014 // -----------------------------
1015 // ------ NOT INTERESTED -------
1016 // -----------------------------
1018 void peer_connection::incoming_not_interested()
1020 INVARIANT_CHECK;
1022 #ifndef TORRENT_DISABLE_EXTENSIONS
1023 for (extension_list_t::iterator i = m_extensions.begin()
1024 , end(m_extensions.end()); i != end; ++i)
1026 if ((*i)->on_not_interested()) return;
1028 #endif
1030 m_became_uninterested = time_now();
1032 #ifdef TORRENT_VERBOSE_LOGGING
1033 (*m_logger) << time_now_string() << " <== NOT_INTERESTED\n";
1034 #endif
1035 m_peer_interested = false;
1036 if (is_disconnecting()) return;
1038 boost::shared_ptr<torrent> t = m_torrent.lock();
1039 TORRENT_ASSERT(t);
1041 t->get_policy().not_interested(*this);
1044 // -----------------------------
1045 // ----------- HAVE ------------
1046 // -----------------------------
1048 void peer_connection::incoming_have(int index)
1050 INVARIANT_CHECK;
1052 boost::shared_ptr<torrent> t = m_torrent.lock();
1053 TORRENT_ASSERT(t);
1055 #ifndef TORRENT_DISABLE_EXTENSIONS
1056 for (extension_list_t::iterator i = m_extensions.begin()
1057 , end(m_extensions.end()); i != end; ++i)
1059 if ((*i)->on_have(index)) return;
1061 #endif
1063 if (is_disconnecting()) return;
1065 // if we haven't received a bitfield, it was
1066 // probably omitted, which is the same as 'have_none'
1067 if (!m_bitfield_received) incoming_have_none();
1069 #ifdef TORRENT_VERBOSE_LOGGING
1070 (*m_logger) << time_now_string()
1071 << " <== HAVE [ piece: " << index << "]\n";
1072 #endif
1074 if (!t->valid_metadata() && index > int(m_have_piece.size()))
1076 if (index < 65536)
1078 // if we don't have metadata
1079 // and we might not have received a bitfield
1080 // extend the bitmask to fit the new
1081 // have message
1082 m_have_piece.resize(index + 1, false);
1084 else
1086 // unless the index > 64k, in which case
1087 // we just ignore it
1088 return;
1092 // if we got an invalid message, abort
1093 if (index >= int(m_have_piece.size()) || index < 0)
1095 disconnect("got 'have'-message with higher index than the number of pieces", 2);
1096 return;
1099 if (m_have_piece[index])
1101 #if defined TORRENT_VERBOSE_LOGGING || defined TORRENT_ERROR_LOGGING
1102 (*m_logger) << " got redundant HAVE message for index: " << index << "\n";
1103 #endif
1105 else
1107 m_have_piece.set_bit(index);
1109 // only update the piece_picker if
1110 // we have the metadata and if
1111 // we're not a seed (in which case
1112 // we won't have a piece picker)
1113 if (t->valid_metadata())
1115 ++m_num_pieces;
1116 t->peer_has(index);
1118 if (!t->have_piece(index)
1119 && !t->is_seed()
1120 && !is_interesting()
1121 && t->picker().piece_priority(index) != 0)
1122 t->get_policy().peer_is_interesting(*this);
1124 // this will disregard all have messages we get within
1125 // the first two seconds. Since some clients implements
1126 // lazy bitfields, these will not be reliable to use
1127 // for an estimated peer download rate.
1128 if (!peer_info_struct() || time_now() - peer_info_struct()->connected > seconds(2))
1130 // update bytes downloaded since last timer
1131 m_remote_bytes_dled += t->torrent_file().piece_size(index);
1135 if (is_seed())
1137 m_peer_info->seed = true;
1138 m_upload_only = true;
1139 disconnect_if_redundant();
1140 if (is_disconnecting()) return;
1145 // -----------------------------
1146 // --------- BITFIELD ----------
1147 // -----------------------------
1149 void peer_connection::incoming_bitfield(bitfield const& bits)
1151 INVARIANT_CHECK;
1153 boost::shared_ptr<torrent> t = m_torrent.lock();
1154 TORRENT_ASSERT(t);
1156 #ifndef TORRENT_DISABLE_EXTENSIONS
1157 for (extension_list_t::iterator i = m_extensions.begin()
1158 , end(m_extensions.end()); i != end; ++i)
1160 if ((*i)->on_bitfield(bits)) return;
1162 #endif
1164 if (is_disconnecting()) return;
1166 #ifdef TORRENT_VERBOSE_LOGGING
1167 (*m_logger) << time_now_string() << " <== BITFIELD ";
1169 for (int i = 0; i < int(bits.size()); ++i)
1171 if (bits[i]) (*m_logger) << "1";
1172 else (*m_logger) << "0";
1174 (*m_logger) << "\n";
1175 #endif
1177 // if we don't have the metedata, we cannot
1178 // verify the bitfield size
1179 if (t->valid_metadata()
1180 && (bits.size() / 8) != (m_have_piece.size() / 8))
1182 std::stringstream msg;
1183 msg << "got bitfield with invalid size: " << (bits.size() / 8)
1184 << "bytes. expected: " << (m_have_piece.size() / 8)
1185 << " bytes";
1186 disconnect(msg.str().c_str(), 2);
1187 return;
1190 m_bitfield_received = true;
1192 // if we don't have metadata yet
1193 // just remember the bitmask
1194 // don't update the piecepicker
1195 // (since it doesn't exist yet)
1196 if (!t->ready_for_connections())
1198 m_have_piece = bits;
1199 m_num_pieces = bits.count();
1200 if (m_peer_info) m_peer_info->seed = (m_num_pieces == int(bits.size()));
1201 return;
1204 TORRENT_ASSERT(t->valid_metadata());
1206 int num_pieces = bits.count();
1207 if (num_pieces == int(m_have_piece.size()))
1209 #ifdef TORRENT_VERBOSE_LOGGING
1210 (*m_logger) << " *** THIS IS A SEED ***\n";
1211 #endif
1212 // if this is a web seed. we don't have a peer_info struct
1213 if (m_peer_info) m_peer_info->seed = true;
1214 m_upload_only = true;
1215 disconnect_if_redundant();
1216 if (is_disconnecting()) return;
1218 m_have_piece.set_all();
1219 m_num_pieces = num_pieces;
1220 t->peer_has_all();
1221 if (!t->is_finished())
1222 t->get_policy().peer_is_interesting(*this);
1223 return;
1226 // let the torrent know which pieces the
1227 // peer has
1228 // if we're a seed, we don't keep track of piece availability
1229 bool interesting = false;
1230 if (!t->is_seed())
1232 t->peer_has(bits);
1234 for (int i = 0; i < (int)m_have_piece.size(); ++i)
1236 bool have = bits[i];
1237 if (have && !m_have_piece[i])
1239 if (!t->have_piece(i) && t->picker().piece_priority(i) != 0)
1240 interesting = true;
1242 else if (!have && m_have_piece[i])
1244 // this should probably not be allowed
1245 t->peer_lost(i);
1250 m_have_piece = bits;
1251 m_num_pieces = num_pieces;
1253 if (interesting) t->get_policy().peer_is_interesting(*this);
1254 else if (upload_only()) disconnect("upload to upload connections");
1257 void peer_connection::disconnect_if_redundant()
1259 if (!m_ses.settings().close_redundant_connections) return;
1261 boost::shared_ptr<torrent> t = m_torrent.lock();
1262 TORRENT_ASSERT(t);
1263 if (m_upload_only && t->is_finished())
1264 disconnect("seed to seed");
1266 if (m_upload_only
1267 && !m_interesting
1268 && m_bitfield_received
1269 && t->are_files_checked())
1270 disconnect("uninteresting upload-only peer");
1273 // -----------------------------
1274 // ---------- REQUEST ----------
1275 // -----------------------------
1277 void peer_connection::incoming_request(peer_request const& r)
1279 INVARIANT_CHECK;
1281 boost::shared_ptr<torrent> t = m_torrent.lock();
1282 TORRENT_ASSERT(t);
1284 // if we haven't received a bitfield, it was
1285 // probably omitted, which is the same as 'have_none'
1286 if (!m_bitfield_received) incoming_have_none();
1288 #ifndef TORRENT_DISABLE_EXTENSIONS
1289 for (extension_list_t::iterator i = m_extensions.begin()
1290 , end(m_extensions.end()); i != end; ++i)
1292 if ((*i)->on_request(r)) return;
1294 #endif
1295 if (is_disconnecting()) return;
1297 if (!t->valid_metadata())
1299 // if we don't have valid metadata yet,
1300 // we shouldn't get a request
1301 #if defined TORRENT_VERBOSE_LOGGING || defined TORRENT_ERROR_LOGGING
1302 (*m_logger) << time_now_string()
1303 << " <== UNEXPECTED_REQUEST [ "
1304 "piece: " << r.piece << " | "
1305 "s: " << r.start << " | "
1306 "l: " << r.length << " | "
1307 "i: " << m_peer_interested << " | "
1308 "t: " << t->torrent_file().piece_size(r.piece) << " | "
1309 "n: " << t->torrent_file().num_pieces() << " ]\n";
1311 (*m_logger) << time_now_string()
1312 << " ==> REJECT_PIECE [ "
1313 "piece: " << r.piece << " | "
1314 "s: " << r.start << " | "
1315 "l: " << r.length << " ]\n";
1316 #endif
1317 write_reject_request(r);
1318 return;
1321 if (int(m_requests.size()) > m_ses.settings().max_allowed_in_request_queue)
1323 // don't allow clients to abuse our
1324 // memory consumption.
1325 // ignore requests if the client
1326 // is making too many of them.
1327 #if defined TORRENT_VERBOSE_LOGGING || defined TORRENT_ERROR_LOGGING
1328 (*m_logger) << time_now_string()
1329 << " <== TOO MANY REQUESTS [ "
1330 "piece: " << r.piece << " | "
1331 "s: " << r.start << " | "
1332 "l: " << r.length << " | "
1333 "i: " << m_peer_interested << " | "
1334 "t: " << t->torrent_file().piece_size(r.piece) << " | "
1335 "n: " << t->torrent_file().num_pieces() << " ]\n";
1337 (*m_logger) << time_now_string()
1338 << " ==> REJECT_PIECE [ "
1339 "piece: " << r.piece << " | "
1340 "s: " << r.start << " | "
1341 "l: " << r.length << " ]\n";
1342 #endif
1343 write_reject_request(r);
1344 return;
1347 // make sure this request
1348 // is legal and that the peer
1349 // is not choked
1350 if (r.piece >= 0
1351 && r.piece < t->torrent_file().num_pieces()
1352 && t->have_piece(r.piece)
1353 && r.start >= 0
1354 && r.start < t->torrent_file().piece_size(r.piece)
1355 && r.length > 0
1356 && r.length + r.start <= t->torrent_file().piece_size(r.piece)
1357 && m_peer_interested
1358 && r.length <= t->block_size())
1360 #ifdef TORRENT_VERBOSE_LOGGING
1361 (*m_logger) << time_now_string()
1362 << " <== REQUEST [ piece: " << r.piece << " | s: " << r.start << " | l: " << r.length << " ]\n";
1363 #endif
1364 // if we have choked the client
1365 // ignore the request
1366 if (m_choked && m_accept_fast.find(r.piece) == m_accept_fast.end())
1368 write_reject_request(r);
1369 #if defined TORRENT_VERBOSE_LOGGING || defined TORRENT_ERROR_LOGGING
1370 (*m_logger) << time_now_string()
1371 << " *** REJECTING REQUEST [ peer choked and piece not in allowed fast set ]\n";
1372 (*m_logger) << time_now_string()
1373 << " ==> REJECT_PIECE [ "
1374 "piece: " << r.piece << " | "
1375 "s: " << r.start << " | "
1376 "l: " << r.length << " ]\n";
1377 #endif
1379 else
1381 m_requests.push_back(r);
1382 m_last_incoming_request = time_now();
1383 fill_send_buffer();
1386 else
1388 #if defined TORRENT_VERBOSE_LOGGING || defined TORRENT_ERROR_LOGGING
1389 (*m_logger) << time_now_string()
1390 << " <== INVALID_REQUEST [ "
1391 "piece: " << r.piece << " | "
1392 "s: " << r.start << " | "
1393 "l: " << r.length << " | "
1394 "i: " << m_peer_interested << " | "
1395 "t: " << t->torrent_file().piece_size(r.piece) << " | "
1396 "n: " << t->torrent_file().num_pieces() << " | "
1397 "h: " << t->have_piece(r.piece) << " | "
1398 "block_limit: " << t->block_size() << " ]\n";
1400 (*m_logger) << time_now_string()
1401 << " ==> REJECT_PIECE [ "
1402 "piece: " << r.piece << " | "
1403 "s: " << r.start << " | "
1404 "l: " << r.length << " ]\n";
1405 #endif
1407 write_reject_request(r);
1408 ++m_num_invalid_requests;
1410 if (t->alerts().should_post<invalid_request_alert>())
1412 t->alerts().post_alert(invalid_request_alert(
1413 t->get_handle(), m_remote, m_peer_id, r));
1418 void peer_connection::incoming_piece_fragment()
1420 m_last_piece = time_now();
1423 #ifndef NDEBUG
1424 struct check_postcondition
1426 check_postcondition(boost::shared_ptr<torrent> const& t_
1427 , bool init_check = true): t(t_) { if (init_check) check(); }
1429 ~check_postcondition() { check(); }
1431 void check()
1433 if (!t->is_seed())
1435 const int blocks_per_piece = static_cast<int>(
1436 t->torrent_file().piece_length() / t->block_size());
1438 std::vector<piece_picker::downloading_piece> const& dl_queue
1439 = t->picker().get_download_queue();
1441 for (std::vector<piece_picker::downloading_piece>::const_iterator i =
1442 dl_queue.begin(); i != dl_queue.end(); ++i)
1444 TORRENT_ASSERT(i->finished <= blocks_per_piece);
1449 shared_ptr<torrent> t;
1451 #endif
1454 // -----------------------------
1455 // ----------- PIECE -----------
1456 // -----------------------------
1458 void peer_connection::incoming_piece(peer_request const& p, char const* data)
1460 char* buffer = m_ses.allocate_disk_buffer();
1461 if (buffer == 0)
1463 disconnect("out of memory");
1464 return;
1466 disk_buffer_holder holder(m_ses, buffer);
1467 std::memcpy(buffer, data, p.length);
1468 incoming_piece(p, holder);
1471 void peer_connection::incoming_piece(peer_request const& p, disk_buffer_holder& data)
1473 INVARIANT_CHECK;
1475 boost::shared_ptr<torrent> t = m_torrent.lock();
1476 TORRENT_ASSERT(t);
1478 TORRENT_ASSERT(!m_disk_recv_buffer);
1479 TORRENT_ASSERT(m_disk_recv_buffer_size == 0);
1481 #ifdef TORRENT_CORRUPT_DATA
1482 // corrupt all pieces from certain peers
1483 if (m_remote.address().is_v4()
1484 && (m_remote.address().to_v4().to_ulong() & 0xf) == 0)
1486 data.get()[0] = ~data.get()[0];
1488 #endif
1490 // if we haven't received a bitfield, it was
1491 // probably omitted, which is the same as 'have_none'
1492 if (!m_bitfield_received) incoming_have_none();
1494 #ifndef TORRENT_DISABLE_EXTENSIONS
1495 for (extension_list_t::iterator i = m_extensions.begin()
1496 , end(m_extensions.end()); i != end; ++i)
1498 if ((*i)->on_piece(p, data)) return;
1500 #endif
1501 if (is_disconnecting()) return;
1503 #ifndef NDEBUG
1504 check_postcondition post_checker_(t);
1505 #if !defined TORRENT_DISABLE_INVARIANT_CHECKS
1506 t->check_invariant();
1507 #endif
1508 #endif
1510 #ifdef TORRENT_VERBOSE_LOGGING
1511 (*m_logger) << time_now_string()
1512 << " <== PIECE [ piece: " << p.piece << " | "
1513 "s: " << p.start << " | "
1514 "l: " << p.length << " | "
1515 "ds: " << statistics().download_rate() << " | "
1516 "qs: " << m_desired_queue_size << " ]\n";
1517 #endif
1519 if (p.length == 0)
1521 if (t->alerts().should_post<peer_error_alert>())
1523 t->alerts().post_alert(peer_error_alert(t->get_handle(), m_remote
1524 , m_peer_id, "peer sent 0 length piece"));
1526 return;
1529 if (!verify_piece(p))
1531 #if defined TORRENT_VERBOSE_LOGGING || defined TORRENT_ERROR_LOGGING
1532 (*m_logger) << time_now_string()
1533 << " <== INVALID_PIECE [ piece: " << p.piece << " | "
1534 "start: " << p.start << " | "
1535 "length: " << p.length << " ]\n";
1536 #endif
1537 disconnect("got invalid piece packet", 2);
1538 return;
1541 // if we're already seeding, don't bother,
1542 // just ignore it
1543 if (t->is_seed())
1545 t->add_redundant_bytes(p.length);
1546 return;
1549 ptime now = time_now();
1551 piece_picker& picker = t->picker();
1552 piece_manager& fs = t->filesystem();
1554 std::vector<piece_block> finished_blocks;
1555 piece_block block_finished(p.piece, p.start / t->block_size());
1556 TORRENT_ASSERT(p.start % t->block_size() == 0);
1557 TORRENT_ASSERT(p.length == t->block_size()
1558 || p.length == t->torrent_file().total_size() % t->block_size());
1560 std::deque<pending_block>::iterator b
1561 = std::find_if(
1562 m_download_queue.begin()
1563 , m_download_queue.end()
1564 , has_block(block_finished));
1566 if (b == m_download_queue.end())
1568 if (t->alerts().should_post<unwanted_block_alert>())
1570 t->alerts().post_alert(unwanted_block_alert(t->get_handle(), m_remote
1571 , m_peer_id, block_finished.block_index, block_finished.piece_index));
1573 #if defined TORRENT_VERBOSE_LOGGING || defined TORRENT_ERROR_LOGGING
1574 (*m_logger) << " *** The block we just got was not in the "
1575 "request queue ***\n";
1576 #endif
1577 t->add_redundant_bytes(p.length);
1578 request_a_block(*t, *this);
1579 send_block_requests();
1580 return;
1583 for (std::deque<pending_block>::iterator i = m_download_queue.begin();
1584 i != b;)
1587 #if defined TORRENT_VERBOSE_LOGGING || defined TORRENT_ERROR_LOGGING
1588 (*m_logger) << time_now_string()
1589 << " *** SKIPPED_PIECE [ piece: " << i->block.piece_index << " | "
1590 "b: " << i->block.block_index << " ] ***\n";
1591 #endif
1593 ++i->skipped;
1594 // if the number of times a block is skipped by out of order
1595 // blocks exceeds the size of the outstanding queue, assume that
1596 // the other end dropped the request.
1597 if (i->skipped > m_desired_queue_size)
1599 if (m_ses.m_alerts.should_post<request_dropped_alert>())
1600 m_ses.m_alerts.post_alert(request_dropped_alert(t->get_handle()
1601 , remote(), pid(), i->block.block_index, i->block.piece_index));
1602 picker.abort_download(i->block);
1603 i = m_download_queue.erase(i);
1605 else
1607 ++i;
1611 // if the block we got is already finished, then ignore it
1612 if (picker.is_downloaded(block_finished))
1614 t->add_redundant_bytes(p.length);
1616 m_download_queue.erase(b);
1617 m_timeout_extend = 0;
1619 if (!m_download_queue.empty())
1620 m_requested = now;
1622 request_a_block(*t, *this);
1623 send_block_requests();
1624 return;
1627 if (total_seconds(now - m_requested)
1628 < m_ses.settings().request_timeout
1629 && m_snubbed)
1631 m_snubbed = false;
1632 if (m_ses.m_alerts.should_post<peer_unsnubbed_alert>())
1634 m_ses.m_alerts.post_alert(peer_unsnubbed_alert(t->get_handle()
1635 , m_remote, m_peer_id));
1639 fs.async_write(p, data, bind(&peer_connection::on_disk_write_complete
1640 , self(), _1, _2, p, t));
1641 m_outstanding_writing_bytes += p.length;
1642 TORRENT_ASSERT(m_channel_state[download_channel] == peer_info::bw_idle);
1643 m_download_queue.erase(b);
1645 if (!m_download_queue.empty())
1647 m_timeout_extend = (std::max)(m_timeout_extend
1648 - m_ses.settings().request_timeout, 0);
1649 m_requested += seconds(m_ses.settings().request_timeout);
1650 if (m_requested > now) m_requested = now;
1652 else
1654 m_timeout_extend = 0;
1657 // did we request this block from any other peers?
1658 bool multi = picker.num_peers(block_finished) > 1;
1659 picker.mark_as_writing(block_finished, peer_info_struct());
1661 // if we requested this block from other peers, cancel it now
1662 if (multi) t->cancel_block(block_finished);
1664 #if !defined NDEBUG && !defined TORRENT_DISABLE_INVARIANT_CHECKS
1665 t->check_invariant();
1666 #endif
1667 request_a_block(*t, *this);
1668 send_block_requests();
1671 void peer_connection::on_disk_write_complete(int ret, disk_io_job const& j
1672 , peer_request p, boost::shared_ptr<torrent> t)
1674 session_impl::mutex_t::scoped_lock l(m_ses.m_mutex);
1676 INVARIANT_CHECK;
1678 m_outstanding_writing_bytes -= p.length;
1679 TORRENT_ASSERT(m_outstanding_writing_bytes >= 0);
1681 #if defined(TORRENT_VERBOSE_LOGGING) || defined(TORRENT_LOGGING)
1682 // (*m_ses.m_logger) << time_now_string() << " *** DISK_WRITE_COMPLETE [ p: "
1683 // << p.piece << " o: " << p.start << " ]\n";
1684 #endif
1685 // in case the outstanding bytes just dropped down
1686 // to allow to receive more data
1687 setup_receive();
1689 piece_block block_finished(p.piece, p.start / t->block_size());
1691 if (ret == -1 || !t)
1693 if (t->has_picker()) t->picker().write_failed(block_finished);
1695 if (!t)
1697 disconnect(j.str.c_str());
1698 return;
1701 if (t->alerts().should_post<file_error_alert>())
1703 t->alerts().post_alert(file_error_alert(j.error_file, t->get_handle(), j.str));
1705 t->pause();
1706 return;
1709 if (t->is_seed()) return;
1711 piece_picker& picker = t->picker();
1713 TORRENT_ASSERT(p.piece == j.piece);
1714 TORRENT_ASSERT(p.start == j.offset);
1715 picker.mark_as_finished(block_finished, peer_info_struct());
1716 if (t->alerts().should_post<block_finished_alert>())
1718 t->alerts().post_alert(block_finished_alert(t->get_handle(),
1719 remote(), pid(), block_finished.block_index, block_finished.piece_index));
1722 // did we just finish the piece?
1723 if (picker.is_piece_finished(p.piece))
1725 #ifndef NDEBUG
1726 check_postcondition post_checker2_(t, false);
1727 #endif
1728 t->async_verify_piece(p.piece, bind(&torrent::piece_finished, t
1729 , p.piece, _1));
1732 if (!t->is_seed() && !m_torrent.expired())
1734 // this is a free function defined in policy.cpp
1735 request_a_block(*t, *this);
1736 send_block_requests();
1741 // -----------------------------
1742 // ---------- CANCEL -----------
1743 // -----------------------------
1745 void peer_connection::incoming_cancel(peer_request const& r)
1747 INVARIANT_CHECK;
1749 #ifndef TORRENT_DISABLE_EXTENSIONS
1750 for (extension_list_t::iterator i = m_extensions.begin()
1751 , end(m_extensions.end()); i != end; ++i)
1753 if ((*i)->on_cancel(r)) return;
1755 #endif
1756 if (is_disconnecting()) return;
1758 #ifdef TORRENT_VERBOSE_LOGGING
1759 (*m_logger) << time_now_string()
1760 << " <== CANCEL [ piece: " << r.piece << " | s: " << r.start << " | l: " << r.length << " ]\n";
1761 #endif
1763 std::deque<peer_request>::iterator i
1764 = std::find(m_requests.begin(), m_requests.end(), r);
1766 if (i != m_requests.end())
1768 m_requests.erase(i);
1769 #ifdef TORRENT_VERBOSE_LOGGING
1770 (*m_logger) << time_now_string()
1771 << " ==> REJECT_PIECE [ "
1772 "piece: " << r.piece << " | "
1773 "s: " << r.start << " | "
1774 "l: " << r.length << " ]\n";
1775 #endif
1776 write_reject_request(r);
1778 else
1780 #if defined TORRENT_VERBOSE_LOGGING || defined TORRENT_ERROR_LOGGING
1781 (*m_logger) << time_now_string() << " *** GOT CANCEL NOT IN THE QUEUE\n";
1782 #endif
1786 // -----------------------------
1787 // --------- DHT PORT ----------
1788 // -----------------------------
1790 void peer_connection::incoming_dht_port(int listen_port)
1792 INVARIANT_CHECK;
1794 #ifdef TORRENT_VERBOSE_LOGGING
1795 (*m_logger) << time_now_string()
1796 << " <== DHT_PORT [ p: " << listen_port << " ]\n";
1797 #endif
1798 #ifndef TORRENT_DISABLE_DHT
1799 m_ses.add_dht_node(udp::endpoint(
1800 m_remote.address(), listen_port));
1801 #endif
1804 // -----------------------------
1805 // --------- HAVE ALL ----------
1806 // -----------------------------
1808 void peer_connection::incoming_have_all()
1810 INVARIANT_CHECK;
1812 boost::shared_ptr<torrent> t = m_torrent.lock();
1813 TORRENT_ASSERT(t);
1815 #ifdef TORRENT_VERBOSE_LOGGING
1816 (*m_logger) << time_now_string() << " <== HAVE_ALL\n";
1817 #endif
1819 #ifndef TORRENT_DISABLE_EXTENSIONS
1820 for (extension_list_t::iterator i = m_extensions.begin()
1821 , end(m_extensions.end()); i != end; ++i)
1823 if ((*i)->on_have_all()) return;
1825 #endif
1826 if (is_disconnecting()) return;
1828 m_have_all = true;
1830 if (m_peer_info) m_peer_info->seed = true;
1831 m_upload_only = true;
1832 m_bitfield_received = true;
1834 #ifdef TORRENT_VERBOSE_LOGGING
1835 (*m_logger) << " *** THIS IS A SEED ***\n";
1836 #endif
1838 // if we don't have metadata yet
1839 // just remember the bitmask
1840 // don't update the piecepicker
1841 // (since it doesn't exist yet)
1842 if (!t->ready_for_connections())
1844 disconnect_if_redundant();
1845 // TODO: this might need something more
1846 // so that once we have the metadata
1847 // we can construct a full bitfield
1848 return;
1851 TORRENT_ASSERT(!m_have_piece.empty());
1852 m_have_piece.set_all();
1853 m_num_pieces = m_have_piece.size();
1855 t->peer_has_all();
1857 // if we're finished, we're not interested
1858 if (t->is_finished()) send_not_interested();
1859 else t->get_policy().peer_is_interesting(*this);
1861 disconnect_if_redundant();
1864 // -----------------------------
1865 // --------- HAVE NONE ---------
1866 // -----------------------------
1868 void peer_connection::incoming_have_none()
1870 INVARIANT_CHECK;
1872 #ifdef TORRENT_VERBOSE_LOGGING
1873 (*m_logger) << time_now_string() << " <== HAVE_NONE\n";
1874 #endif
1876 boost::shared_ptr<torrent> t = m_torrent.lock();
1877 TORRENT_ASSERT(t);
1879 #ifndef TORRENT_DISABLE_EXTENSIONS
1880 for (extension_list_t::iterator i = m_extensions.begin()
1881 , end(m_extensions.end()); i != end; ++i)
1883 if ((*i)->on_have_none()) return;
1885 #endif
1886 if (is_disconnecting()) return;
1887 if (m_peer_info) m_peer_info->seed = false;
1888 m_bitfield_received = true;
1890 // we're never interested in a peer that doesn't have anything
1891 send_not_interested();
1893 TORRENT_ASSERT(!m_have_piece.empty() || !t->ready_for_connections());
1894 disconnect_if_redundant();
1897 // -----------------------------
1898 // ------- ALLOWED FAST --------
1899 // -----------------------------
1901 void peer_connection::incoming_allowed_fast(int index)
1903 INVARIANT_CHECK;
1905 boost::shared_ptr<torrent> t = m_torrent.lock();
1906 TORRENT_ASSERT(t);
1908 #ifdef TORRENT_VERBOSE_LOGGING
1909 (*m_logger) << time_now_string() << " <== ALLOWED_FAST [ " << index << " ]\n";
1910 #endif
1912 #ifndef TORRENT_DISABLE_EXTENSIONS
1913 for (extension_list_t::iterator i = m_extensions.begin()
1914 , end(m_extensions.end()); i != end; ++i)
1916 if ((*i)->on_allowed_fast(index)) return;
1918 #endif
1919 if (is_disconnecting()) return;
1921 if (index < 0 || index >= int(m_have_piece.size()))
1923 #if defined TORRENT_VERBOSE_LOGGING || defined TORRENT_ERROR_LOGGING
1924 (*m_logger) << time_now_string() << " <== INVALID_ALLOWED_FAST [ " << index << " | s: "
1925 << int(m_have_piece.size()) << " ]\n";
1926 #endif
1927 return;
1930 // if we already have the piece, we can
1931 // ignore this message
1932 if (t->valid_metadata()
1933 && t->have_piece(index))
1934 return;
1936 m_allowed_fast.push_back(index);
1938 // if the peer has the piece and we want
1939 // to download it, request it
1940 if (int(m_have_piece.size()) > index
1941 && m_have_piece[index]
1942 && t->has_picker()
1943 && t->picker().piece_priority(index) > 0)
1945 t->get_policy().peer_is_interesting(*this);
1949 std::vector<int> const& peer_connection::allowed_fast()
1951 boost::shared_ptr<torrent> t = m_torrent.lock();
1952 TORRENT_ASSERT(t);
1954 m_allowed_fast.erase(std::remove_if(m_allowed_fast.begin()
1955 , m_allowed_fast.end(), bind(&torrent::have_piece, t, _1))
1956 , m_allowed_fast.end());
1958 // TODO: sort the allowed fast set in priority order
1959 return m_allowed_fast;
1962 void peer_connection::add_request(piece_block const& block)
1964 // INVARIANT_CHECK;
1966 boost::shared_ptr<torrent> t = m_torrent.lock();
1967 TORRENT_ASSERT(t);
1969 TORRENT_ASSERT(t->valid_metadata());
1970 TORRENT_ASSERT(block.piece_index >= 0);
1971 TORRENT_ASSERT(block.piece_index < t->torrent_file().num_pieces());
1972 TORRENT_ASSERT(block.block_index >= 0);
1973 TORRENT_ASSERT(block.block_index < t->torrent_file().piece_size(block.piece_index));
1974 TORRENT_ASSERT(!t->picker().is_requested(block) || (t->picker().num_peers(block) > 0));
1975 TORRENT_ASSERT(!t->have_piece(block.piece_index));
1976 TORRENT_ASSERT(std::find_if(m_download_queue.begin(), m_download_queue.end()
1977 , has_block(block)) == m_download_queue.end());
1978 TORRENT_ASSERT(std::find(m_request_queue.begin(), m_request_queue.end()
1979 , block) == m_request_queue.end());
1981 piece_picker::piece_state_t state;
1982 peer_speed_t speed = peer_speed();
1983 char const* speedmsg = 0;
1984 if (speed == fast)
1986 speedmsg = "fast";
1987 state = piece_picker::fast;
1989 else if (speed == medium)
1991 speedmsg = "medium";
1992 state = piece_picker::medium;
1994 else
1996 speedmsg = "slow";
1997 state = piece_picker::slow;
2000 if (!t->picker().mark_as_downloading(block, peer_info_struct(), state))
2001 return;
2003 if (t->alerts().should_post<block_downloading_alert>())
2005 t->alerts().post_alert(block_downloading_alert(t->get_handle(),
2006 remote(), pid(), speedmsg, block.block_index, block.piece_index));
2009 m_request_queue.push_back(block);
2012 void peer_connection::cancel_request(piece_block const& block)
2014 INVARIANT_CHECK;
2016 boost::shared_ptr<torrent> t = m_torrent.lock();
2017 // this peer might be disconnecting
2018 if (!t) return;
2020 TORRENT_ASSERT(t->valid_metadata());
2022 TORRENT_ASSERT(block.piece_index >= 0);
2023 TORRENT_ASSERT(block.piece_index < t->torrent_file().num_pieces());
2024 TORRENT_ASSERT(block.block_index >= 0);
2025 TORRENT_ASSERT(block.block_index < t->torrent_file().piece_size(block.piece_index));
2027 // if all the peers that requested this block has been
2028 // cancelled, then just ignore the cancel.
2029 if (!t->picker().is_requested(block)) return;
2031 std::deque<pending_block>::iterator it
2032 = std::find_if(m_download_queue.begin(), m_download_queue.end(), has_block(block));
2033 if (it == m_download_queue.end())
2035 std::deque<piece_block>::iterator rit = std::find(m_request_queue.begin()
2036 , m_request_queue.end(), block);
2038 // when a multi block is received, it is cancelled
2039 // from all peers, so if this one hasn't requested
2040 // the block, just ignore to cancel it.
2041 if (rit == m_request_queue.end()) return;
2043 t->picker().abort_download(block);
2044 m_request_queue.erase(rit);
2045 // since we found it in the request queue, it means it hasn't been
2046 // sent yet, so we don't have to send a cancel.
2047 return;
2050 int block_offset = block.block_index * t->block_size();
2051 int block_size
2052 = (std::min)(t->torrent_file().piece_size(block.piece_index)-block_offset,
2053 t->block_size());
2054 TORRENT_ASSERT(block_size > 0);
2055 TORRENT_ASSERT(block_size <= t->block_size());
2057 peer_request r;
2058 r.piece = block.piece_index;
2059 r.start = block_offset;
2060 r.length = block_size;
2062 #ifdef TORRENT_VERBOSE_LOGGING
2063 (*m_logger) << time_now_string()
2064 << " ==> CANCEL [ piece: " << block.piece_index << " | s: "
2065 << block_offset << " | l: " << block_size << " | " << block.block_index << " ]\n";
2066 #endif
2067 write_cancel(r);
2070 void peer_connection::send_choke()
2072 INVARIANT_CHECK;
2074 TORRENT_ASSERT(!m_peer_info || !m_peer_info->optimistically_unchoked);
2076 if (m_choked) return;
2077 write_choke();
2078 m_choked = true;
2080 #ifdef TORRENT_VERBOSE_LOGGING
2081 (*m_logger) << time_now_string() << " ==> CHOKE\n";
2082 #endif
2083 #ifndef NDEBUG
2084 m_last_choke = time_now();
2085 #endif
2086 m_num_invalid_requests = 0;
2088 // reject the requests we have in the queue
2089 // except the allowed fast pieces
2090 for (std::deque<peer_request>::iterator i = m_requests.begin();
2091 i != m_requests.end();)
2093 if (m_accept_fast.count(i->piece))
2095 ++i;
2096 continue;
2099 peer_request const& r = *i;
2100 write_reject_request(r);
2102 #ifdef TORRENT_VERBOSE_LOGGING
2103 (*m_logger) << time_now_string()
2104 << " ==> REJECT_PIECE [ "
2105 "piece: " << r.piece << " | "
2106 "s: " << r.start << " | "
2107 "l: " << r.length << " ]\n";
2108 #endif
2109 i = m_requests.erase(i);
2113 void peer_connection::send_unchoke()
2115 INVARIANT_CHECK;
2117 if (!m_choked) return;
2118 m_last_unchoke = time_now();
2119 write_unchoke();
2120 m_choked = false;
2122 #ifdef TORRENT_VERBOSE_LOGGING
2123 (*m_logger) << time_now_string() << " ==> UNCHOKE\n";
2124 #endif
2127 void peer_connection::send_interested()
2129 if (m_interesting) return;
2130 m_interesting = true;
2131 write_interested();
2133 #ifdef TORRENT_VERBOSE_LOGGING
2134 (*m_logger) << time_now_string() << " ==> INTERESTED\n";
2135 #endif
2138 void peer_connection::send_not_interested()
2140 if (!m_interesting) return;
2141 m_interesting = false;
2142 write_not_interested();
2144 m_became_uninteresting = time_now();
2146 #ifdef TORRENT_VERBOSE_LOGGING
2147 (*m_logger) << time_now_string() << " ==> NOT_INTERESTED\n";
2148 #endif
2149 disconnect_if_redundant();
2152 void peer_connection::send_block_requests()
2154 INVARIANT_CHECK;
2156 boost::shared_ptr<torrent> t = m_torrent.lock();
2157 TORRENT_ASSERT(t);
2159 if ((int)m_download_queue.size() >= m_desired_queue_size) return;
2161 bool empty_download_queue = m_download_queue.empty();
2163 while (!m_request_queue.empty()
2164 && (int)m_download_queue.size() < m_desired_queue_size)
2166 piece_block block = m_request_queue.front();
2168 int block_offset = block.block_index * t->block_size();
2169 int block_size = (std::min)(t->torrent_file().piece_size(
2170 block.piece_index) - block_offset, t->block_size());
2171 TORRENT_ASSERT(block_size > 0);
2172 TORRENT_ASSERT(block_size <= t->block_size());
2174 peer_request r;
2175 r.piece = block.piece_index;
2176 r.start = block_offset;
2177 r.length = block_size;
2179 m_request_queue.pop_front();
2180 if (t->is_seed()) continue;
2181 // this can happen if a block times out, is re-requested and
2182 // then arrives "unexpectedly"
2183 if (t->picker().is_finished(block) || t->picker().is_downloaded(block))
2184 continue;
2186 m_download_queue.push_back(block);
2188 #ifdef TORRENT_VERBOSE_LOGGING
2189 (*m_logger) << time_now_string()
2190 << " *** REQUEST-QUEUE** [ "
2191 "piece: " << block.piece_index << " | "
2192 "block: " << block.block_index << " ]\n";
2193 #endif
2195 // if we are requesting large blocks, merge the smaller
2196 // blocks that are in the same piece into larger requests
2197 if (m_request_large_blocks)
2199 int blocks_per_piece = t->torrent_file().piece_length() / t->block_size();
2201 while (!m_request_queue.empty())
2203 // check to see if this block is connected to the previous one
2204 // if it is, merge them, otherwise, break this merge loop
2205 piece_block const& front = m_request_queue.front();
2206 if (front.piece_index * blocks_per_piece + front.block_index
2207 != block.piece_index * blocks_per_piece + block.block_index + 1)
2208 break;
2209 block = m_request_queue.front();
2210 m_request_queue.pop_front();
2211 m_download_queue.push_back(block);
2213 #ifdef TORRENT_VERBOSE_LOGGING
2214 (*m_logger) << time_now_string()
2215 << " *** MERGING REQUEST ** [ "
2216 "piece: " << block.piece_index << " | "
2217 "block: " << block.block_index << " ]\n";
2218 #endif
2220 block_offset = block.block_index * t->block_size();
2221 block_size = (std::min)(t->torrent_file().piece_size(
2222 block.piece_index) - block_offset, t->block_size());
2223 TORRENT_ASSERT(block_size > 0);
2224 TORRENT_ASSERT(block_size <= t->block_size());
2226 r.length += block_size;
2230 TORRENT_ASSERT(verify_piece(r));
2232 #ifndef TORRENT_DISABLE_EXTENSIONS
2233 bool handled = false;
2234 for (extension_list_t::iterator i = m_extensions.begin()
2235 , end(m_extensions.end()); i != end; ++i)
2237 if (handled = (*i)->write_request(r)) break;
2239 if (is_disconnecting()) return;
2240 if (!handled)
2242 write_request(r);
2243 m_last_request = time_now();
2245 #else
2246 write_request(r);
2247 m_last_request = time_now();
2248 #endif
2250 #ifdef TORRENT_VERBOSE_LOGGING
2251 (*m_logger) << time_now_string()
2252 << " ==> REQUEST [ "
2253 "piece: " << r.piece << " | "
2254 "s: " << r.start << " | "
2255 "l: " << r.length << " | "
2256 "ds: " << statistics().download_rate() << " B/s | "
2257 "qs: " << m_desired_queue_size << " "
2258 "blk: " << (m_request_large_blocks?"large":"single") << " ]\n";
2259 #endif
2261 m_last_piece = time_now();
2263 if (!m_download_queue.empty()
2264 && empty_download_queue)
2266 // This means we just added a request to this connection
2267 m_requested = time_now();
2271 void peer_connection::timed_out()
2273 TORRENT_ASSERT(m_connecting);
2274 #if defined TORRENT_VERBOSE_LOGGING || defined TORRENT_LOGGING || defined TORRENT_ERROR_LOGGING
2275 (*m_ses.m_logger) << time_now_string() << " CONNECTION TIMED OUT: " << m_remote.address().to_string()
2276 << "\n";
2277 #endif
2278 disconnect("timed out: connect", 1);
2281 // the error argument defaults to 0, which means deliberate disconnect
2282 // 1 means unexpected disconnect/error
2283 // 2 protocol error (client sent something invalid)
2284 void peer_connection::disconnect(char const* message, int error)
2286 session_impl::mutex_t::scoped_lock l(m_ses.m_mutex);
2288 #ifndef NDEBUG
2289 m_disconnect_started = true;
2290 #endif
2292 #if defined TORRENT_VERBOSE_LOGGING || defined TORRENT_ERROR_LOGGING
2293 switch (error)
2295 case 0:
2296 (*m_logger) << "*** CONNECTION CLOSED " << message << "\n";
2297 break;
2298 case 1:
2299 (*m_logger) << "*** CONNECTION FAILED " << message << "\n";
2300 break;
2301 case 2:
2302 (*m_logger) << "*** PEER ERROR " << message << "\n";
2303 break;
2305 #endif
2306 // we cannot do this in a constructor
2307 TORRENT_ASSERT(m_in_constructor == false);
2308 if (error > 0) m_failed = true;
2309 if (m_disconnecting) return;
2310 boost::intrusive_ptr<peer_connection> me(this);
2312 INVARIANT_CHECK;
2314 if (m_connecting && m_connection_ticket >= 0)
2316 m_ses.m_half_open.done(m_connection_ticket);
2317 m_connection_ticket = -1;
2320 boost::shared_ptr<torrent> t = m_torrent.lock();
2321 torrent_handle handle;
2322 if (t) handle = t->get_handle();
2324 if (message)
2326 if (error > 1 && m_ses.m_alerts.should_post<peer_error_alert>())
2328 m_ses.m_alerts.post_alert(
2329 peer_error_alert(handle, remote(), pid(), message));
2331 else if (error <= 1 && m_ses.m_alerts.should_post<peer_disconnected_alert>())
2333 m_ses.m_alerts.post_alert(
2334 peer_disconnected_alert(handle, remote(), pid(), message));
2338 if (t)
2340 // make sure we keep all the stats!
2341 calc_ip_overhead();
2342 t->add_stats(statistics());
2344 if (t->has_picker())
2346 piece_picker& picker = t->picker();
2348 while (!m_download_queue.empty())
2350 picker.abort_download(m_download_queue.back().block);
2351 m_download_queue.pop_back();
2353 while (!m_request_queue.empty())
2355 picker.abort_download(m_request_queue.back());
2356 m_request_queue.pop_back();
2360 t->remove_peer(this);
2361 m_torrent.reset();
2364 #ifndef NDEBUG
2365 // since this connection doesn't have a torrent reference
2366 // no torrent should have a reference to this connection either
2367 for (aux::session_impl::torrent_map::const_iterator i = m_ses.m_torrents.begin()
2368 , end(m_ses.m_torrents.end()); i != end; ++i)
2369 TORRENT_ASSERT(!i->second->has_peer(this));
2370 #endif
2372 m_disconnecting = true;
2373 error_code ec;
2374 m_socket->close(ec);
2375 m_ses.close_connection(this, message);
2378 void peer_connection::set_upload_limit(int limit)
2380 TORRENT_ASSERT(limit >= -1);
2381 if (limit == -1) limit = (std::numeric_limits<int>::max)();
2382 if (limit < 10) limit = 10;
2383 m_upload_limit = limit;
2384 m_bandwidth_limit[upload_channel].throttle(m_upload_limit);
2387 void peer_connection::set_download_limit(int limit)
2389 TORRENT_ASSERT(limit >= -1);
2390 if (limit == -1) limit = (std::numeric_limits<int>::max)();
2391 if (limit < 10) limit = 10;
2392 m_download_limit = limit;
2393 m_bandwidth_limit[download_channel].throttle(m_download_limit);
2396 size_type peer_connection::share_diff() const
2398 INVARIANT_CHECK;
2400 boost::shared_ptr<torrent> t = m_torrent.lock();
2401 TORRENT_ASSERT(t);
2403 float ratio = t->ratio();
2405 // if we have an infinite ratio, just say we have downloaded
2406 // much more than we have uploaded. And we'll keep uploading.
2407 if (ratio == 0.f)
2408 return (std::numeric_limits<size_type>::max)();
2410 return m_free_upload
2411 + static_cast<size_type>(m_statistics.total_payload_download() * ratio)
2412 - m_statistics.total_payload_upload();
2415 // defined in upnp.cpp
2416 bool is_local(address const& a);
2418 bool peer_connection::on_local_network() const
2420 if (libtorrent::is_local(m_remote.address())
2421 || is_loopback(m_remote.address())) return true;
2422 return false;
2425 void peer_connection::get_peer_info(peer_info& p) const
2427 TORRENT_ASSERT(!associated_torrent().expired());
2429 ptime now = time_now();
2431 p.download_rate_peak = m_download_rate_peak;
2432 p.upload_rate_peak = m_upload_rate_peak;
2433 p.rtt = m_rtt;
2434 p.down_speed = statistics().download_rate();
2435 p.up_speed = statistics().upload_rate();
2436 p.payload_down_speed = statistics().download_payload_rate();
2437 p.payload_up_speed = statistics().upload_payload_rate();
2438 p.pid = pid();
2439 p.ip = remote();
2440 p.pending_disk_bytes = m_outstanding_writing_bytes;
2441 p.send_quota = m_bandwidth_limit[upload_channel].quota_left();
2442 p.receive_quota = m_bandwidth_limit[download_channel].quota_left();
2443 if (m_download_queue.empty()) p.request_timeout = -1;
2444 else p.request_timeout = total_seconds(m_requested - now) + m_ses.settings().request_timeout
2445 + m_timeout_extend;
2446 #ifndef TORRENT_DISABLE_GEO_IP
2447 p.inet_as_name = m_inet_as_name;
2448 #endif
2450 #ifndef TORRENT_DISABLE_RESOLVE_COUNTRIES
2451 p.country[0] = m_country[0];
2452 p.country[1] = m_country[1];
2453 #endif
2455 p.total_download = statistics().total_payload_download();
2456 p.total_upload = statistics().total_payload_upload();
2458 if (m_bandwidth_limit[upload_channel].throttle() == bandwidth_limit::inf)
2459 p.upload_limit = -1;
2460 else
2461 p.upload_limit = m_bandwidth_limit[upload_channel].throttle();
2463 if (m_bandwidth_limit[download_channel].throttle() == bandwidth_limit::inf)
2464 p.download_limit = -1;
2465 else
2466 p.download_limit = m_bandwidth_limit[download_channel].throttle();
2468 p.load_balancing = total_free_upload();
2470 p.download_queue_length = int(download_queue().size() + m_request_queue.size());
2471 p.requests_in_buffer = int(m_requests_in_buffer.size());
2472 p.target_dl_queue_length = int(desired_queue_size());
2473 p.upload_queue_length = int(upload_queue().size());
2475 if (boost::optional<piece_block_progress> ret = downloading_piece_progress())
2477 p.downloading_piece_index = ret->piece_index;
2478 p.downloading_block_index = ret->block_index;
2479 p.downloading_progress = ret->bytes_downloaded;
2480 p.downloading_total = ret->full_block_bytes;
2482 else
2484 p.downloading_piece_index = -1;
2485 p.downloading_block_index = -1;
2486 p.downloading_progress = 0;
2487 p.downloading_total = 0;
2490 p.pieces = get_bitfield();
2491 p.last_request = now - m_last_request;
2492 p.last_active = now - (std::max)(m_last_sent, m_last_receive);
2494 // this will set the flags so that we can update them later
2495 p.flags = 0;
2496 get_specific_peer_info(p);
2498 p.flags |= is_seed() ? peer_info::seed : 0;
2499 p.flags |= m_snubbed ? peer_info::snubbed : 0;
2500 p.flags |= m_upload_only ? peer_info::upload_only : 0;
2501 if (peer_info_struct())
2503 policy::peer* pi = peer_info_struct();
2504 p.source = pi->source;
2505 p.failcount = pi->failcount;
2506 p.num_hashfails = pi->hashfails;
2507 p.flags |= pi->on_parole ? peer_info::on_parole : 0;
2508 p.flags |= pi->optimistically_unchoked ? peer_info::optimistic_unchoke : 0;
2509 #ifndef TORRENT_DISABLE_GEO_IP
2510 p.inet_as = pi->inet_as->first;
2511 #endif
2513 else
2515 p.source = 0;
2516 p.failcount = 0;
2517 p.num_hashfails = 0;
2518 p.remote_dl_rate = 0;
2519 #ifndef TORRENT_DISABLE_GEO_IP
2520 p.inet_as = 0xffff;
2521 #endif
2524 p.remote_dl_rate = m_remote_dl_rate;
2525 p.send_buffer_size = m_send_buffer.capacity();
2526 p.used_send_buffer = m_send_buffer.size();
2527 p.receive_buffer_size = m_recv_buffer.capacity() + m_disk_recv_buffer_size;
2528 p.used_receive_buffer = m_recv_pos;
2529 p.write_state = m_channel_state[upload_channel];
2530 p.read_state = m_channel_state[download_channel];
2533 // allocates a disk buffer of size 'disk_buffer_size' and replaces the
2534 // end of the current receive buffer with it. i.e. the receive pos
2535 // must be <= packet_size - disk_buffer_size
2536 // the disk buffer can be accessed through release_disk_receive_buffer()
2537 // when it is queried, the responsibility to free it is transferred
2538 // to the caller
2539 bool peer_connection::allocate_disk_receive_buffer(int disk_buffer_size)
2541 INVARIANT_CHECK;
2543 TORRENT_ASSERT(m_packet_size > 0);
2544 TORRENT_ASSERT(m_recv_pos <= m_packet_size - disk_buffer_size);
2545 TORRENT_ASSERT(!m_disk_recv_buffer);
2546 TORRENT_ASSERT(disk_buffer_size <= 16 * 1024);
2548 if (disk_buffer_size > 16 * 1024)
2550 disconnect("invalid piece size", 2);
2551 return false;
2554 m_disk_recv_buffer.reset(m_ses.allocate_disk_buffer());
2555 if (!m_disk_recv_buffer)
2557 disconnect("out of memory");
2558 return false;
2560 m_disk_recv_buffer_size = disk_buffer_size;
2561 return true;
2564 char* peer_connection::release_disk_receive_buffer()
2566 m_disk_recv_buffer_size = 0;
2567 return m_disk_recv_buffer.release();
2570 void peer_connection::cut_receive_buffer(int size, int packet_size)
2572 INVARIANT_CHECK;
2574 TORRENT_ASSERT(packet_size > 0);
2575 TORRENT_ASSERT(int(m_recv_buffer.size()) >= size);
2576 TORRENT_ASSERT(int(m_recv_buffer.size()) >= m_recv_pos);
2577 TORRENT_ASSERT(m_recv_pos >= size);
2579 if (size > 0)
2580 std::memmove(&m_recv_buffer[0], &m_recv_buffer[0] + size, m_recv_pos - size);
2582 m_recv_pos -= size;
2584 #ifndef NDEBUG
2585 std::fill(m_recv_buffer.begin() + m_recv_pos, m_recv_buffer.end(), 0);
2586 #endif
2588 m_packet_size = packet_size;
2591 void peer_connection::calc_ip_overhead()
2593 m_statistics.calc_ip_overhead();
2596 void peer_connection::second_tick(float tick_interval)
2598 ptime now(time_now());
2599 boost::intrusive_ptr<peer_connection> me(self());
2601 // the invariant check must be run before me is destructed
2602 // in case the peer got disconnected
2603 INVARIANT_CHECK;
2605 boost::shared_ptr<torrent> t = m_torrent.lock();
2606 if (!t || m_disconnecting)
2608 m_ses.m_half_open.done(m_connection_ticket);
2609 m_connecting = false;
2610 disconnect("torrent aborted");
2611 return;
2614 on_tick();
2616 #ifndef TORRENT_DISABLE_EXTENSIONS
2617 for (extension_list_t::iterator i = m_extensions.begin()
2618 , end(m_extensions.end()); i != end; ++i)
2620 (*i)->tick();
2622 if (is_disconnecting()) return;
2623 #endif
2625 // if the peer hasn't said a thing for a certain
2626 // time, it is considered to have timed out
2627 time_duration d;
2628 d = now - m_last_receive;
2629 if (d > seconds(m_timeout) && !m_connecting)
2631 #if defined TORRENT_VERBOSE_LOGGING || defined TORRENT_ERROR_LOGGING
2632 (*m_logger) << time_now_string() << " *** LAST ACTIVITY [ "
2633 << total_seconds(d) << " seconds ago ] ***\n";
2634 #endif
2635 disconnect("timed out: inactivity");
2636 return;
2639 // do not stall waiting for a handshake
2640 if (!m_connecting
2641 && in_handshake()
2642 && d > seconds(m_ses.settings().handshake_timeout))
2644 #if defined TORRENT_VERBOSE_LOGGING || defined TORRENT_ERROR_LOGGING
2645 (*m_logger) << time_now_string() << " *** NO HANDSHAKE [ waited "
2646 << total_seconds(d) << " seconds ] ***\n";
2647 #endif
2648 disconnect("timed out: no handshake");
2649 return;
2652 // disconnect peers that we unchoked, but
2653 // they didn't send a request within 20 seconds.
2654 // but only if we're a seed
2655 d = now - (std::max)(m_last_unchoke, m_last_incoming_request);
2656 if (!m_connecting
2657 && m_requests.empty()
2658 && !m_choked
2659 && m_peer_interested
2660 && t && t->is_finished()
2661 && d > seconds(20))
2663 #if defined TORRENT_VERBOSE_LOGGING || defined TORRENT_ERROR_LOGGING
2664 (*m_logger) << time_now_string() << " *** NO REQUEST [ t: "
2665 << total_seconds(d) << " ] ***\n";
2666 #endif
2667 disconnect("timed out: no request when unchoked");
2668 return;
2671 // if the peer hasn't become interested and we haven't
2672 // become interested in the peer for 10 minutes, it
2673 // has also timed out.
2674 time_duration d1;
2675 time_duration d2;
2676 d1 = now - m_became_uninterested;
2677 d2 = now - m_became_uninteresting;
2678 time_duration time_limit = seconds(
2679 m_ses.settings().inactivity_timeout);
2681 // don't bother disconnect peers we haven't been interested
2682 // in (and that hasn't been interested in us) for a while
2683 // unless we have used up all our connection slots
2684 if (!m_interesting
2685 && !m_peer_interested
2686 && d1 > time_limit
2687 && d2 > time_limit
2688 && (m_ses.num_connections() >= m_ses.max_connections()
2689 || (t && t->num_peers() >= t->max_connections())))
2691 #if defined TORRENT_VERBOSE_LOGGING || defined TORRENT_ERROR_LOGGING
2692 (*m_logger) << time_now_string() << " *** MUTUAL NO INTEREST [ "
2693 "t1: " << total_seconds(d1) << " | "
2694 "t2: " << total_seconds(d2) << " ] ***\n";
2695 #endif
2696 disconnect("timed out: no interest");
2697 return;
2700 if (!m_download_queue.empty()
2701 && now > m_requested + seconds(m_ses.settings().request_timeout
2702 + m_timeout_extend))
2704 snub_peer();
2707 // if we haven't sent something in too long, send a keep-alive
2708 keep_alive();
2710 m_ignore_bandwidth_limits = m_ses.settings().ignore_limits_on_local_network
2711 && on_local_network();
2713 m_statistics.second_tick(tick_interval);
2715 if (m_statistics.upload_payload_rate() > m_upload_rate_peak)
2717 m_upload_rate_peak = m_statistics.upload_payload_rate();
2719 if (m_statistics.download_payload_rate() > m_download_rate_peak)
2721 m_download_rate_peak = m_statistics.download_payload_rate();
2722 #ifndef TORRENT_DISABLE_GEO_IP
2723 if (peer_info_struct())
2725 std::pair<const int, int>* as_stats = peer_info_struct()->inet_as;
2726 if (as_stats && as_stats->second < m_download_rate_peak)
2727 as_stats->second = m_download_rate_peak;
2729 #endif
2731 if (is_disconnecting()) return;
2733 if (!t->valid_metadata()) return;
2735 // calculate the desired download queue size
2736 const float queue_time = m_ses.settings().request_queue_time;
2737 // (if the latency is more than this, the download will stall)
2738 // so, the queue size is queue_time * down_rate / 16 kiB
2739 // (16 kB is the size of each request)
2740 // the minimum number of requests is 2 and the maximum is 48
2741 // the block size doesn't have to be 16. So we first query the
2742 // torrent for it
2743 const int block_size = m_request_large_blocks
2744 ? t->torrent_file().piece_length() : t->block_size();
2745 TORRENT_ASSERT(block_size > 0);
2747 if (m_snubbed)
2749 m_desired_queue_size = 1;
2751 else
2753 m_desired_queue_size = static_cast<int>(queue_time
2754 * statistics().download_rate() / block_size);
2755 if (m_desired_queue_size > m_max_out_request_queue)
2756 m_desired_queue_size = m_max_out_request_queue;
2757 if (m_desired_queue_size < min_request_queue)
2758 m_desired_queue_size = min_request_queue;
2761 if (!m_download_queue.empty()
2762 && now - m_last_piece > seconds(m_ses.settings().piece_timeout
2763 + m_timeout_extend))
2765 // this peer isn't sending the pieces we've
2766 // requested (this has been observed by BitComet)
2767 // in this case we'll clear our download queue and
2768 // re-request the blocks.
2769 #if defined TORRENT_VERBOSE_LOGGING || defined TORRENT_ERROR_LOGGING
2770 (*m_logger) << time_now_string()
2771 << " *** PIECE_REQUESTS TIMED OUT [ " << (int)m_download_queue.size()
2772 << " " << total_seconds(now - m_last_piece) << "] ***\n";
2773 #endif
2775 snub_peer();
2778 // If the client sends more data
2779 // we send it data faster, otherwise, slower.
2780 // It will also depend on how much data the
2781 // client has sent us. This is the mean to
2782 // maintain the share ratio given by m_ratio
2783 // with all peers.
2785 if (t->is_finished() || is_choked() || t->ratio() == 0.0f)
2787 // if we have downloaded more than one piece more
2788 // than we have uploaded OR if we are a seed
2789 // have an unlimited upload rate
2790 m_bandwidth_limit[upload_channel].throttle(m_upload_limit);
2792 else
2794 size_type bias = 0x10000 + 2 * t->block_size() + m_free_upload;
2796 double break_even_time = 15; // seconds.
2797 size_type have_uploaded = m_statistics.total_payload_upload();
2798 size_type have_downloaded = m_statistics.total_payload_download();
2799 double download_speed = m_statistics.download_rate();
2801 size_type soon_downloaded =
2802 have_downloaded + (size_type)(download_speed * break_even_time*1.5);
2804 if (t->ratio() != 1.f)
2805 soon_downloaded = (size_type)(soon_downloaded*(double)t->ratio());
2807 double upload_speed_limit = (std::min)((soon_downloaded - have_uploaded
2808 + bias) / break_even_time, double(m_upload_limit));
2810 upload_speed_limit = (std::min)(upload_speed_limit,
2811 (double)(std::numeric_limits<int>::max)());
2813 m_bandwidth_limit[upload_channel].throttle(
2814 (std::min)((std::max)((int)upload_speed_limit, 20)
2815 , m_upload_limit));
2818 // update once every minute
2819 if (now - m_remote_dl_update >= seconds(60))
2821 float factor = 0.6666666666667f;
2823 if (m_remote_dl_rate == 0) factor = 0.0f;
2825 m_remote_dl_rate = int((m_remote_dl_rate * factor) +
2826 ((m_remote_bytes_dled * (1.0f-factor)) / 60.f));
2828 m_remote_bytes_dled = 0;
2829 m_remote_dl_update = now;
2832 fill_send_buffer();
2835 void peer_connection::snub_peer()
2837 INVARIANT_CHECK;
2839 boost::shared_ptr<torrent> t = m_torrent.lock();
2840 TORRENT_ASSERT(t);
2842 if (!m_snubbed)
2844 m_snubbed = true;
2845 if (m_ses.m_alerts.should_post<peer_snubbed_alert>())
2847 m_ses.m_alerts.post_alert(peer_snubbed_alert(t->get_handle()
2848 , m_remote, m_peer_id));
2851 m_desired_queue_size = 1;
2853 if (on_parole())
2855 m_timeout_extend += m_ses.settings().request_timeout;
2856 return;
2858 if (!t->has_picker()) return;
2859 piece_picker& picker = t->picker();
2861 piece_block r(-1, -1);
2862 // time out the last request in the queue
2863 if (!m_request_queue.empty())
2865 r = m_request_queue.back();
2866 m_request_queue.pop_back();
2868 else
2870 TORRENT_ASSERT(!m_download_queue.empty());
2871 r = m_download_queue.back().block;
2873 // only time out a request if it blocks the piece
2874 // from being completed (i.e. no free blocks to
2875 // request from it)
2876 piece_picker::downloading_piece p;
2877 picker.piece_info(r.piece_index, p);
2878 int free_blocks = picker.blocks_in_piece(r.piece_index)
2879 - p.finished - p.writing - p.requested;
2880 if (free_blocks > 0)
2882 m_timeout_extend += m_ses.settings().request_timeout;
2883 return;
2886 if (m_ses.m_alerts.should_post<block_timeout_alert>())
2888 m_ses.m_alerts.post_alert(block_timeout_alert(t->get_handle()
2889 , remote(), pid(), r.block_index, r.piece_index));
2891 m_download_queue.pop_back();
2893 if (!m_download_queue.empty() || !m_request_queue.empty())
2894 m_timeout_extend += m_ses.settings().request_timeout;
2896 m_desired_queue_size = 2;
2897 request_a_block(*t, *this);
2898 m_desired_queue_size = 1;
2900 // abort the block after the new one has
2901 // been requested in order to prevent it from
2902 // picking the same block again, stalling the
2903 // same piece indefinitely.
2904 if (r != piece_block(-1, -1))
2905 picker.abort_download(r);
2907 send_block_requests();
2910 void peer_connection::fill_send_buffer()
2912 INVARIANT_CHECK;
2914 boost::shared_ptr<torrent> t = m_torrent.lock();
2915 if (!t) return;
2917 // only add new piece-chunks if the send buffer is small enough
2918 // otherwise there will be no end to how large it will be!
2920 int buffer_size_watermark = int(m_statistics.upload_rate()) / 2;
2921 if (buffer_size_watermark < 512) buffer_size_watermark = 512;
2922 else if (buffer_size_watermark > m_ses.settings().send_buffer_watermark)
2923 buffer_size_watermark = m_ses.settings().send_buffer_watermark;
2925 while (!m_requests.empty()
2926 && (send_buffer_size() + m_reading_bytes < buffer_size_watermark))
2928 TORRENT_ASSERT(t->valid_metadata());
2929 peer_request& r = m_requests.front();
2931 TORRENT_ASSERT(r.piece >= 0);
2932 TORRENT_ASSERT(r.piece < (int)m_have_piece.size());
2933 TORRENT_ASSERT(t->have_piece(r.piece));
2934 TORRENT_ASSERT(r.start + r.length <= t->torrent_file().piece_size(r.piece));
2935 TORRENT_ASSERT(r.length > 0 && r.start >= 0);
2937 t->filesystem().async_read(r, bind(&peer_connection::on_disk_read_complete
2938 , self(), _1, _2, r));
2939 m_reading_bytes += r.length;
2941 m_requests.erase(m_requests.begin());
2945 void peer_connection::on_disk_read_complete(int ret, disk_io_job const& j, peer_request r)
2947 session_impl::mutex_t::scoped_lock l(m_ses.m_mutex);
2949 m_reading_bytes -= r.length;
2951 disk_buffer_holder buffer(m_ses, j.buffer);
2953 if (ret != r.length || m_torrent.expired())
2955 boost::shared_ptr<torrent> t = m_torrent.lock();
2956 if (!t)
2958 disconnect(j.str.c_str());
2959 return;
2962 if (t->alerts().should_post<file_error_alert>())
2963 t->alerts().post_alert(file_error_alert(j.error_file, t->get_handle(), j.str));
2964 t->set_error(j.str);
2965 t->pause();
2966 return;
2969 #ifdef TORRENT_VERBOSE_LOGGING
2970 (*m_logger) << time_now_string()
2971 << " ==> PIECE [ piece: " << r.piece << " | s: " << r.start
2972 << " | l: " << r.length << " ]\n";
2973 #endif
2975 write_piece(r, buffer);
2976 setup_send();
2979 void peer_connection::assign_bandwidth(int channel, int amount)
2981 session_impl::mutex_t::scoped_lock l(m_ses.m_mutex);
2983 #ifdef TORRENT_VERBOSE_LOGGING
2984 (*m_logger) << "bandwidth [ " << channel << " ] + " << amount << "\n";
2985 #endif
2987 m_bandwidth_limit[channel].assign(amount);
2988 TORRENT_ASSERT(m_channel_state[channel] == peer_info::bw_global);
2989 m_channel_state[channel] = peer_info::bw_idle;
2990 if (channel == upload_channel)
2992 setup_send();
2994 else if (channel == download_channel)
2996 setup_receive();
3000 void peer_connection::expire_bandwidth(int channel, int amount)
3002 session_impl::mutex_t::scoped_lock l(m_ses.m_mutex);
3004 m_bandwidth_limit[channel].expire(amount);
3005 if (channel == upload_channel)
3007 setup_send();
3009 else if (channel == download_channel)
3011 setup_receive();
3015 void peer_connection::setup_send()
3017 session_impl::mutex_t::scoped_lock l(m_ses.m_mutex);
3019 if (m_channel_state[upload_channel] != peer_info::bw_idle) return;
3021 shared_ptr<torrent> t = m_torrent.lock();
3023 if (m_bandwidth_limit[upload_channel].quota_left() == 0
3024 && !m_send_buffer.empty()
3025 && !m_connecting
3026 && t
3027 && !m_ignore_bandwidth_limits)
3029 // in this case, we have data to send, but no
3030 // bandwidth. So, we simply request bandwidth
3031 // from the torrent
3032 TORRENT_ASSERT(t);
3033 if (m_bandwidth_limit[upload_channel].max_assignable() > 0)
3035 int priority = is_interesting() * 2 + m_requests_in_buffer.size();
3036 // peers that we are not interested in are non-prioritized
3037 m_channel_state[upload_channel] = peer_info::bw_torrent;
3038 t->request_bandwidth(upload_channel, self()
3039 , m_send_buffer.size(), priority);
3040 #ifdef TORRENT_VERBOSE_LOGGING
3041 (*m_logger) << time_now_string() << " *** REQUEST_BANDWIDTH [ upload prio: "
3042 << priority << "]\n";
3043 #endif
3046 return;
3049 if (!can_write())
3051 #ifdef TORRENT_VERBOSE_LOGGING
3052 (*m_logger) << time_now_string() << " *** CANNOT WRITE ["
3053 " quota: " << m_bandwidth_limit[download_channel].quota_left() <<
3054 " ignore: " << (m_ignore_bandwidth_limits?"yes":"no") <<
3055 " buf: " << m_send_buffer.size() <<
3056 " connecting: " << (m_connecting?"yes":"no") <<
3057 " ]\n";
3058 #endif
3059 return;
3062 // send the actual buffer
3063 if (!m_send_buffer.empty())
3065 int amount_to_send = m_send_buffer.size();
3066 int quota_left = m_bandwidth_limit[upload_channel].quota_left();
3067 if (!m_ignore_bandwidth_limits && amount_to_send > quota_left)
3068 amount_to_send = quota_left;
3070 TORRENT_ASSERT(amount_to_send > 0);
3072 #ifdef TORRENT_VERBOSE_LOGGING
3073 (*m_logger) << time_now_string() << " *** ASYNC_WRITE [ bytes: " << amount_to_send << " ]\n";
3074 #endif
3075 std::list<asio::const_buffer> const& vec = m_send_buffer.build_iovec(amount_to_send);
3076 m_socket->async_write_some(vec, bind(&peer_connection::on_send_data, self(), _1, _2));
3078 m_channel_state[upload_channel] = peer_info::bw_network;
3082 void peer_connection::setup_receive()
3084 session_impl::mutex_t::scoped_lock l(m_ses.m_mutex);
3086 INVARIANT_CHECK;
3088 if (m_channel_state[download_channel] != peer_info::bw_idle) return;
3090 shared_ptr<torrent> t = m_torrent.lock();
3092 if (m_bandwidth_limit[download_channel].quota_left() == 0
3093 && !m_connecting
3094 && t
3095 && !m_ignore_bandwidth_limits)
3097 if (m_bandwidth_limit[download_channel].max_assignable() > 0)
3099 #ifdef TORRENT_VERBOSE_LOGGING
3100 (*m_logger) << time_now_string() << " *** REQUEST_BANDWIDTH [ download ]\n";
3101 #endif
3102 TORRENT_ASSERT(m_channel_state[download_channel] == peer_info::bw_idle);
3103 m_channel_state[download_channel] = peer_info::bw_torrent;
3104 t->request_bandwidth(download_channel, self()
3105 , m_download_queue.size() * 16 * 1024 + 30, m_priority);
3107 return;
3110 if (!can_read())
3112 #ifdef TORRENT_VERBOSE_LOGGING
3113 (*m_logger) << time_now_string() << " *** CANNOT READ ["
3114 " quota: " << m_bandwidth_limit[download_channel].quota_left() <<
3115 " ignore: " << (m_ignore_bandwidth_limits?"yes":"no") <<
3116 " outstanding: " << m_outstanding_writing_bytes <<
3117 " outstanding-limit: " << m_ses.settings().max_outstanding_disk_bytes_per_connection <<
3118 " ]\n";
3119 #endif
3120 return;
3123 TORRENT_ASSERT(m_packet_size > 0);
3124 int max_receive = m_packet_size - m_recv_pos;
3125 int quota_left = m_bandwidth_limit[download_channel].quota_left();
3126 if (!m_ignore_bandwidth_limits && max_receive > quota_left)
3127 max_receive = quota_left;
3129 if (max_receive == 0) return;
3131 TORRENT_ASSERT(m_recv_pos >= 0);
3132 TORRENT_ASSERT(m_packet_size > 0);
3133 TORRENT_ASSERT(can_read());
3134 #ifdef TORRENT_VERBOSE_LOGGING
3135 (*m_logger) << time_now_string() << " *** ASYNC_READ [ max: " << max_receive << " bytes ]\n";
3136 #endif
3138 int regular_buffer_size = m_packet_size - m_disk_recv_buffer_size;
3140 if (int(m_recv_buffer.size()) < regular_buffer_size)
3141 m_recv_buffer.resize(regular_buffer_size);
3143 if (!m_disk_recv_buffer || regular_buffer_size >= m_recv_pos + max_receive)
3145 // only receive into regular buffer
3146 TORRENT_ASSERT(m_recv_pos + max_receive <= int(m_recv_buffer.size()));
3147 m_socket->async_read_some(asio::buffer(&m_recv_buffer[m_recv_pos]
3148 , max_receive), bind(&peer_connection::on_receive_data, self(), _1, _2));
3150 else if (m_recv_pos >= regular_buffer_size)
3152 // only receive into disk buffer
3153 TORRENT_ASSERT(m_recv_pos - regular_buffer_size >= 0);
3154 TORRENT_ASSERT(m_recv_pos - regular_buffer_size + max_receive <= m_disk_recv_buffer_size);
3155 m_socket->async_read_some(asio::buffer(m_disk_recv_buffer.get() + m_recv_pos - regular_buffer_size
3156 , max_receive)
3157 , bind(&peer_connection::on_receive_data, self(), _1, _2));
3159 else
3161 // receive into both regular and disk buffer
3162 TORRENT_ASSERT(max_receive + m_recv_pos > regular_buffer_size);
3163 TORRENT_ASSERT(m_recv_pos < regular_buffer_size);
3164 TORRENT_ASSERT(max_receive - regular_buffer_size
3165 + m_recv_pos <= m_disk_recv_buffer_size);
3167 boost::array<asio::mutable_buffer, 2> vec;
3168 vec[0] = asio::buffer(&m_recv_buffer[m_recv_pos]
3169 , regular_buffer_size - m_recv_pos);
3170 vec[1] = asio::buffer(m_disk_recv_buffer.get()
3171 , max_receive - regular_buffer_size + m_recv_pos);
3172 m_socket->async_read_some(vec, bind(&peer_connection::on_receive_data
3173 , self(), _1, _2));
3175 m_channel_state[download_channel] = peer_info::bw_network;
3178 #ifndef TORRENT_DISABLE_ENCRYPTION
3180 // returns the last 'bytes' from the receive buffer
3181 std::pair<buffer::interval, buffer::interval> peer_connection::wr_recv_buffers(int bytes)
3183 TORRENT_ASSERT(bytes <= m_recv_pos);
3185 std::pair<buffer::interval, buffer::interval> vec;
3186 int regular_buffer_size = m_packet_size - m_disk_recv_buffer_size;
3187 TORRENT_ASSERT(regular_buffer_size >= 0);
3188 if (!m_disk_recv_buffer || regular_buffer_size >= m_recv_pos)
3190 vec.first = buffer::interval(&m_recv_buffer[0]
3191 + m_recv_pos - bytes, &m_recv_buffer[0] + m_recv_pos);
3192 vec.second = buffer::interval(0,0);
3194 else if (m_recv_pos - bytes >= regular_buffer_size)
3196 vec.first = buffer::interval(m_disk_recv_buffer.get() + m_recv_pos
3197 - regular_buffer_size - bytes, m_disk_recv_buffer.get() + m_recv_pos
3198 - regular_buffer_size);
3199 vec.second = buffer::interval(0,0);
3201 else
3203 TORRENT_ASSERT(m_recv_pos - bytes < regular_buffer_size);
3204 TORRENT_ASSERT(m_recv_pos > regular_buffer_size);
3205 vec.first = buffer::interval(&m_recv_buffer[0] + m_recv_pos - bytes
3206 , &m_recv_buffer[0] + regular_buffer_size);
3207 vec.second = buffer::interval(m_disk_recv_buffer.get()
3208 , m_disk_recv_buffer.get() + m_recv_pos - regular_buffer_size);
3210 TORRENT_ASSERT(vec.first.left() + vec.second.left() == bytes);
3211 return vec;
3213 #endif
3215 void peer_connection::reset_recv_buffer(int packet_size)
3217 TORRENT_ASSERT(packet_size > 0);
3218 if (m_recv_pos > m_packet_size)
3220 cut_receive_buffer(m_packet_size, packet_size);
3221 return;
3223 m_recv_pos = 0;
3224 m_packet_size = packet_size;
3227 void peer_connection::send_buffer(char const* buf, int size, int flags)
3229 if (flags == message_type_request)
3230 m_requests_in_buffer.push_back(m_send_buffer.size() + size);
3232 int free_space = m_send_buffer.space_in_last_buffer();
3233 if (free_space > size) free_space = size;
3234 if (free_space > 0)
3236 m_send_buffer.append(buf, free_space);
3237 size -= free_space;
3238 buf += free_space;
3239 #ifdef TORRENT_STATS
3240 m_ses.m_buffer_usage_logger << log_time() << " send_buffer: "
3241 << free_space << std::endl;
3242 m_ses.log_buffer_usage();
3243 #endif
3245 if (size <= 0) return;
3247 std::pair<char*, int> buffer = m_ses.allocate_buffer(size);
3248 if (buffer.first == 0)
3250 disconnect("out of memory");
3251 return;
3253 TORRENT_ASSERT(buffer.second >= size);
3254 std::memcpy(buffer.first, buf, size);
3255 m_send_buffer.append_buffer(buffer.first, buffer.second, size
3256 , bind(&session_impl::free_buffer, boost::ref(m_ses), _1, buffer.second));
3257 #ifdef TORRENT_STATS
3258 m_ses.m_buffer_usage_logger << log_time() << " send_buffer_alloc: " << size << std::endl;
3259 m_ses.log_buffer_usage();
3260 #endif
3261 setup_send();
3264 // TODO: change this interface to automatically call setup_send() when the
3265 // return value is destructed
3266 buffer::interval peer_connection::allocate_send_buffer(int size)
3268 TORRENT_ASSERT(size > 0);
3269 char* insert = m_send_buffer.allocate_appendix(size);
3270 if (insert == 0)
3272 std::pair<char*, int> buffer = m_ses.allocate_buffer(size);
3273 if (buffer.first == 0)
3275 disconnect("out of memory");
3276 return buffer::interval(0, 0);
3278 TORRENT_ASSERT(buffer.second >= size);
3279 m_send_buffer.append_buffer(buffer.first, buffer.second, size
3280 , bind(&session_impl::free_buffer, boost::ref(m_ses), _1, buffer.second));
3281 buffer::interval ret(buffer.first, buffer.first + size);
3282 #ifdef TORRENT_STATS
3283 m_ses.m_buffer_usage_logger << log_time() << " allocate_buffer_alloc: " << size << std::endl;
3284 m_ses.log_buffer_usage();
3285 #endif
3286 return ret;
3288 else
3290 #ifdef TORRENT_STATS
3291 m_ses.m_buffer_usage_logger << log_time() << " allocate_buffer: " << size << std::endl;
3292 m_ses.log_buffer_usage();
3293 #endif
3294 buffer::interval ret(insert, insert + size);
3295 return ret;
3299 template<class T>
3300 struct set_to_zero
3302 set_to_zero(T& v, bool cond): m_val(v), m_cond(cond) {}
3303 void fire() { if (!m_cond) return; m_cond = false; m_val = 0; }
3304 ~set_to_zero() { if (m_cond) m_val = 0; }
3305 private:
3306 T& m_val;
3307 bool m_cond;
3310 // --------------------------
3311 // RECEIVE DATA
3312 // --------------------------
3314 // throws exception when the client should be disconnected
3315 void peer_connection::on_receive_data(const error_code& error
3316 , std::size_t bytes_transferred)
3318 session_impl::mutex_t::scoped_lock l(m_ses.m_mutex);
3320 INVARIANT_CHECK;
3322 TORRENT_ASSERT(m_channel_state[download_channel] == peer_info::bw_network);
3323 m_channel_state[download_channel] = peer_info::bw_idle;
3325 if (error)
3327 #if defined TORRENT_VERBOSE_LOGGING || defined TORRENT_ERROR_LOGGING
3328 (*m_logger) << time_now_string() << " **ERROR**: "
3329 << error.message() << "[in peer_connection::on_receive_data]\n";
3330 #endif
3331 on_receive(error, bytes_transferred);
3332 disconnect(error.message().c_str());
3333 return;
3336 int max_receive = 0;
3339 #ifdef TORRENT_VERBOSE_LOGGING
3340 (*m_logger) << "read " << bytes_transferred << " bytes\n";
3341 #endif
3342 // correct the dl quota usage, if not all of the buffer was actually read
3343 if (!m_ignore_bandwidth_limits)
3344 m_bandwidth_limit[download_channel].use_quota(bytes_transferred);
3346 if (m_disconnecting) return;
3348 TORRENT_ASSERT(m_packet_size > 0);
3349 TORRENT_ASSERT(bytes_transferred > 0);
3351 m_last_receive = time_now();
3352 m_recv_pos += bytes_transferred;
3353 TORRENT_ASSERT(m_recv_pos <= int(m_recv_buffer.size()
3354 + m_disk_recv_buffer_size));
3356 on_receive(error, bytes_transferred);
3358 TORRENT_ASSERT(m_packet_size > 0);
3360 if (m_peer_choked
3361 && m_recv_pos == 0
3362 && (m_recv_buffer.capacity() - m_packet_size) > 128)
3364 buffer(m_packet_size).swap(m_recv_buffer);
3367 max_receive = m_packet_size - m_recv_pos;
3368 int quota_left = m_bandwidth_limit[download_channel].quota_left();
3369 if (!m_ignore_bandwidth_limits && max_receive > quota_left)
3370 max_receive = quota_left;
3372 if (max_receive == 0) break;
3374 int regular_buffer_size = m_packet_size - m_disk_recv_buffer_size;
3376 if (int(m_recv_buffer.size()) < regular_buffer_size)
3377 m_recv_buffer.resize(regular_buffer_size);
3379 error_code ec;
3380 if (!m_disk_recv_buffer || regular_buffer_size >= m_recv_pos + max_receive)
3382 // only receive into regular buffer
3383 TORRENT_ASSERT(m_recv_pos + max_receive <= int(m_recv_buffer.size()));
3384 bytes_transferred = m_socket->read_some(asio::buffer(&m_recv_buffer[m_recv_pos]
3385 , max_receive), ec);
3387 else if (m_recv_pos >= regular_buffer_size)
3389 // only receive into disk buffer
3390 TORRENT_ASSERT(m_recv_pos - regular_buffer_size >= 0);
3391 TORRENT_ASSERT(m_recv_pos - regular_buffer_size + max_receive <= m_disk_recv_buffer_size);
3392 bytes_transferred = m_socket->read_some(asio::buffer(m_disk_recv_buffer.get()
3393 + m_recv_pos - regular_buffer_size, (std::min)(m_packet_size
3394 - m_recv_pos, max_receive)), ec);
3396 else
3398 // receive into both regular and disk buffer
3399 TORRENT_ASSERT(max_receive + m_recv_pos > regular_buffer_size);
3400 TORRENT_ASSERT(m_recv_pos < regular_buffer_size);
3401 TORRENT_ASSERT(max_receive - regular_buffer_size
3402 + m_recv_pos <= m_disk_recv_buffer_size);
3404 boost::array<asio::mutable_buffer, 2> vec;
3405 vec[0] = asio::buffer(&m_recv_buffer[m_recv_pos]
3406 , regular_buffer_size - m_recv_pos);
3407 vec[1] = asio::buffer(m_disk_recv_buffer.get()
3408 , (std::min)(m_disk_recv_buffer_size
3409 , max_receive - regular_buffer_size + m_recv_pos));
3410 bytes_transferred = m_socket->read_some(vec, ec);
3412 if (ec && ec != asio::error::would_block)
3414 disconnect(ec.message().c_str());
3415 return;
3417 if (ec == asio::error::would_block) break;
3419 while (bytes_transferred > 0);
3421 setup_receive();
3424 bool peer_connection::can_write() const
3426 // if we have requests or pending data to be sent or announcements to be made
3427 // we want to send data
3428 return !m_send_buffer.empty()
3429 && (m_bandwidth_limit[upload_channel].quota_left() > 0
3430 || m_ignore_bandwidth_limits)
3431 && !m_connecting;
3434 bool peer_connection::can_read() const
3436 bool ret = (m_bandwidth_limit[download_channel].quota_left() > 0
3437 || m_ignore_bandwidth_limits)
3438 && !m_connecting
3439 && m_outstanding_writing_bytes <
3440 m_ses.settings().max_outstanding_disk_bytes_per_connection;
3442 return ret;
3445 void peer_connection::connect(int ticket)
3447 INVARIANT_CHECK;
3449 error_code ec;
3450 #if defined TORRENT_VERBOSE_LOGGING || defined TORRENT_LOGGING || defined TORRENT_ERROR_LOGGING
3451 (*m_ses.m_logger) << time_now_string() << " CONNECTING: " << m_remote.address().to_string(ec)
3452 << ":" << m_remote.port() << "\n";
3453 #endif
3455 m_connection_ticket = ticket;
3456 boost::shared_ptr<torrent> t = m_torrent.lock();
3458 m_queued = false;
3459 TORRENT_ASSERT(m_connecting);
3461 if (!t)
3463 disconnect("torrent aborted");
3464 return;
3467 m_socket->open(t->get_interface().protocol(), ec);
3468 if (ec)
3470 disconnect(ec.message().c_str());
3471 return;
3474 // set the socket to non-blocking, so that we can
3475 // read the entire buffer on each read event we get
3476 tcp::socket::non_blocking_io ioc(true);
3477 m_socket->io_control(ioc, ec);
3478 if (ec)
3480 disconnect(ec.message().c_str());
3481 return;
3483 m_socket->bind(t->get_interface(), ec);
3484 if (ec)
3486 disconnect(ec.message().c_str());
3487 return;
3489 m_socket->async_connect(m_remote
3490 , bind(&peer_connection::on_connection_complete, self(), _1));
3491 m_connect = time_now();
3493 if (t->alerts().should_post<peer_connect_alert>())
3495 t->alerts().post_alert(peer_connect_alert(
3496 t->get_handle(), remote(), pid()));
3500 void peer_connection::on_connection_complete(error_code const& e)
3502 ptime completed = time_now();
3504 session_impl::mutex_t::scoped_lock l(m_ses.m_mutex);
3506 INVARIANT_CHECK;
3508 m_rtt = total_milliseconds(completed - m_connect);
3510 if (m_disconnecting) return;
3512 m_connecting = false;
3513 m_ses.m_half_open.done(m_connection_ticket);
3515 if (e)
3517 #if defined TORRENT_VERBOSE_LOGGING || defined TORRENT_LOGGING || defined TORRENT_ERROR_LOGGING
3518 (*m_ses.m_logger) << time_now_string() << " CONNECTION FAILED: " << m_remote.address().to_string()
3519 << ": " << e.message() << "\n";
3520 #endif
3521 disconnect(e.message().c_str(), 1);
3522 return;
3525 if (m_disconnecting) return;
3526 m_last_receive = time_now();
3528 // this means the connection just succeeded
3530 TORRENT_ASSERT(m_socket);
3531 #if defined TORRENT_VERBOSE_LOGGING || defined TORRENT_LOGGING
3532 (*m_ses.m_logger) << time_now_string() << " COMPLETED: " << m_remote.address().to_string()
3533 << " rtt = " << m_rtt << "\n";
3534 #endif
3536 error_code ec;
3537 if (m_remote == m_socket->local_endpoint(ec))
3539 // if the remote endpoint is the same as the local endpoint, we're connected
3540 // to ourselves
3541 disconnect("connected to ourselves", 1);
3542 return;
3545 if (m_remote.address().is_v4())
3547 error_code ec;
3548 m_socket->set_option(type_of_service(m_ses.settings().peer_tos), ec);
3551 on_connected();
3552 setup_send();
3553 setup_receive();
3556 // --------------------------
3557 // SEND DATA
3558 // --------------------------
3560 // throws exception when the client should be disconnected
3561 void peer_connection::on_send_data(error_code const& error
3562 , std::size_t bytes_transferred)
3564 session_impl::mutex_t::scoped_lock l(m_ses.m_mutex);
3566 INVARIANT_CHECK;
3568 TORRENT_ASSERT(m_channel_state[upload_channel] == peer_info::bw_network);
3570 m_send_buffer.pop_front(bytes_transferred);
3572 for (std::vector<int>::iterator i = m_requests_in_buffer.begin()
3573 , end(m_requests_in_buffer.end()); i != end; ++i)
3574 *i -= bytes_transferred;
3576 while (!m_requests_in_buffer.empty()
3577 && m_requests_in_buffer.front() <= 0)
3578 m_requests_in_buffer.erase(m_requests_in_buffer.begin());
3580 m_channel_state[upload_channel] = peer_info::bw_idle;
3582 if (!m_ignore_bandwidth_limits)
3583 m_bandwidth_limit[upload_channel].use_quota(bytes_transferred);
3585 #ifdef TORRENT_VERBOSE_LOGGING
3586 (*m_logger) << "wrote " << bytes_transferred << " bytes\n";
3587 #endif
3589 if (error)
3591 #if defined TORRENT_VERBOSE_LOGGING || defined TORRENT_ERROR_LOGGING
3592 (*m_logger) << "**ERROR**: " << error.message() << " [in peer_connection::on_send_data]\n";
3593 #endif
3594 disconnect(error.message().c_str());
3595 return;
3597 if (m_disconnecting) return;
3599 TORRENT_ASSERT(!m_connecting);
3600 TORRENT_ASSERT(bytes_transferred > 0);
3602 m_last_sent = time_now();
3604 on_sent(error, bytes_transferred);
3605 fill_send_buffer();
3607 setup_send();
3610 #ifndef NDEBUG
3611 void peer_connection::check_invariant() const
3613 TORRENT_ASSERT(bool(m_disk_recv_buffer) == (m_disk_recv_buffer_size > 0));
3615 boost::shared_ptr<torrent> t = m_torrent.lock();
3616 if (m_disconnecting)
3618 TORRENT_ASSERT(!t);
3619 TORRENT_ASSERT(m_disconnect_started);
3621 else if (!m_in_constructor)
3623 TORRENT_ASSERT(m_ses.has_peer((peer_connection*)this));
3626 for (int i = 0; i < 2; ++i)
3628 // this peer is in the bandwidth history iff max_assignable < limit
3629 TORRENT_ASSERT((m_bandwidth_limit[i].max_assignable() < m_bandwidth_limit[i].throttle())
3630 == m_ses.m_bandwidth_manager[i]->is_in_history(this)
3631 || m_bandwidth_limit[i].throttle() == bandwidth_limit::inf);
3634 if (m_channel_state[download_channel] == peer_info::bw_torrent
3635 || m_channel_state[download_channel] == peer_info::bw_global)
3636 TORRENT_ASSERT(m_bandwidth_limit[download_channel].quota_left() == 0);
3637 if (m_channel_state[upload_channel] == peer_info::bw_torrent
3638 || m_channel_state[upload_channel] == peer_info::bw_global)
3639 TORRENT_ASSERT(m_bandwidth_limit[upload_channel].quota_left() == 0);
3641 std::set<piece_block> unique;
3642 std::transform(m_download_queue.begin(), m_download_queue.end()
3643 , std::inserter(unique, unique.begin()), boost::bind(&pending_block::block, _1));
3644 std::copy(m_request_queue.begin(), m_request_queue.end(), std::inserter(unique, unique.begin()));
3645 TORRENT_ASSERT(unique.size() == m_download_queue.size() + m_request_queue.size());
3646 if (m_peer_info)
3648 TORRENT_ASSERT(m_peer_info->prev_amount_upload == 0);
3649 TORRENT_ASSERT(m_peer_info->prev_amount_download == 0);
3650 TORRENT_ASSERT(m_peer_info->connection == this
3651 || m_peer_info->connection == 0);
3653 if (m_peer_info->optimistically_unchoked)
3654 TORRENT_ASSERT(!is_choked());
3657 if (!t)
3659 #ifdef TORRENT_EXPENSIVE_INVARIANT_CHECKS
3660 // since this connection doesn't have a torrent reference
3661 // no torrent should have a reference to this connection either
3662 for (aux::session_impl::torrent_map::const_iterator i = m_ses.m_torrents.begin()
3663 , end(m_ses.m_torrents.end()); i != end; ++i)
3664 TORRENT_ASSERT(!i->second->has_peer((peer_connection*)this));
3665 #endif
3666 return;
3669 if (m_ses.settings().close_redundant_connections)
3671 // make sure upload only peers are disconnected
3672 if (t->is_finished() && m_upload_only)
3673 TORRENT_ASSERT(m_disconnect_started);
3674 if (m_upload_only
3675 && !m_interesting
3676 && m_bitfield_received
3677 && t->are_files_checked())
3678 TORRENT_ASSERT(m_disconnect_started);
3681 if (t->is_finished())
3682 TORRENT_ASSERT(!m_interesting);
3683 if (is_seed())
3684 TORRENT_ASSERT(m_upload_only);
3686 if (t->has_picker())
3688 std::map<piece_block, int> num_requests;
3689 for (torrent::const_peer_iterator i = t->begin(); i != t->end(); ++i)
3691 // make sure this peer is not a dangling pointer
3692 #ifdef TORRENT_EXPENSIVE_INVARIANT_CHECKS
3693 TORRENT_ASSERT(m_ses.has_peer(*i));
3694 #endif
3695 peer_connection const& p = *(*i);
3696 for (std::deque<piece_block>::const_iterator i = p.request_queue().begin()
3697 , end(p.request_queue().end()); i != end; ++i)
3698 ++num_requests[*i];
3699 for (std::deque<pending_block>::const_iterator i = p.download_queue().begin()
3700 , end(p.download_queue().end()); i != end; ++i)
3701 ++num_requests[i->block];
3703 for (std::map<piece_block, int>::iterator i = num_requests.begin()
3704 , end(num_requests.end()); i != end; ++i)
3706 if (!t->picker().is_downloaded(i->first))
3707 TORRENT_ASSERT(t->picker().num_peers(i->first) == i->second);
3710 #ifdef TORRENT_EXPENSIVE_INVARIANT_CHECKS
3711 if (m_peer_info)
3713 policy::const_iterator i;
3714 for (i = t->get_policy().begin_peer()
3715 , end(t->get_policy().end_peer()); i != end; ++i)
3717 if (&i->second == m_peer_info) break;
3719 TORRENT_ASSERT(i != t->get_policy().end_peer());
3721 #endif
3722 if (t->has_picker() && !t->is_aborted())
3724 // make sure that pieces that have completed the download
3725 // of all their blocks are in the disk io thread's queue
3726 // to be checked.
3727 const std::vector<piece_picker::downloading_piece>& dl_queue
3728 = t->picker().get_download_queue();
3729 for (std::vector<piece_picker::downloading_piece>::const_iterator i =
3730 dl_queue.begin(); i != dl_queue.end(); ++i)
3732 const int blocks_per_piece = t->picker().blocks_in_piece(i->index);
3734 bool complete = true;
3735 for (int j = 0; j < blocks_per_piece; ++j)
3737 if (i->info[j].state == piece_picker::block_info::state_finished)
3738 continue;
3739 complete = false;
3740 break;
3743 // this invariant is not valid anymore since the completion event
3744 // might be queued in the io service
3745 if (complete && !piece_failed)
3747 disk_io_job ret = m_ses.m_disk_thread.find_job(
3748 &t->filesystem(), -1, i->index);
3749 TORRENT_ASSERT(ret.action == disk_io_job::hash || ret.action == disk_io_job::write);
3750 TORRENT_ASSERT(ret.piece == i->index);
3755 // expensive when using checked iterators
3757 if (t->valid_metadata())
3759 int piece_count = std::count(m_have_piece.begin()
3760 , m_have_piece.end(), true);
3761 if (m_num_pieces != piece_count)
3763 TORRENT_ASSERT(false);
3768 // extremely expensive invariant check
3770 if (!t->is_seed())
3772 piece_picker& p = t->picker();
3773 const std::vector<piece_picker::downloading_piece>& dlq = p.get_download_queue();
3774 const int blocks_per_piece = static_cast<int>(
3775 t->torrent_file().piece_length() / t->block_size());
3777 for (std::vector<piece_picker::downloading_piece>::const_iterator i =
3778 dlq.begin(); i != dlq.end(); ++i)
3780 for (int j = 0; j < blocks_per_piece; ++j)
3782 if (std::find(m_request_queue.begin(), m_request_queue.end()
3783 , piece_block(i->index, j)) != m_request_queue.end()
3785 std::find(m_download_queue.begin(), m_download_queue.end()
3786 , piece_block(i->index, j)) != m_download_queue.end())
3788 TORRENT_ASSERT(i->info[j].peer == m_remote);
3790 else
3792 TORRENT_ASSERT(i->info[j].peer != m_remote || i->info[j].finished);
3799 #endif
3801 peer_connection::peer_speed_t peer_connection::peer_speed()
3803 shared_ptr<torrent> t = m_torrent.lock();
3804 TORRENT_ASSERT(t);
3806 int download_rate = int(statistics().download_payload_rate());
3807 int torrent_download_rate = int(t->statistics().download_payload_rate());
3809 if (download_rate > 512 && download_rate > torrent_download_rate / 16)
3810 m_speed = fast;
3811 else if (download_rate > 4096 && download_rate > torrent_download_rate / 64)
3812 m_speed = medium;
3813 else if (download_rate < torrent_download_rate / 15 && m_speed == fast)
3814 m_speed = medium;
3815 else if (download_rate < torrent_download_rate / 63 && m_speed == medium)
3816 m_speed = slow;
3818 return m_speed;
3821 void peer_connection::keep_alive()
3823 INVARIANT_CHECK;
3825 time_duration d;
3826 d = time_now() - m_last_sent;
3827 if (total_seconds(d) < m_timeout / 2) return;
3829 if (m_connecting) return;
3830 if (in_handshake()) return;
3832 // if the last send has not completed yet, do not send a keep
3833 // alive
3834 if (m_channel_state[upload_channel] != peer_info::bw_idle) return;
3836 #ifdef TORRENT_VERBOSE_LOGGING
3837 (*m_logger) << time_now_string() << " ==> KEEPALIVE\n";
3838 #endif
3840 m_last_sent = time_now();
3841 write_keepalive();
3844 bool peer_connection::is_seed() const
3846 // if m_num_pieces == 0, we probably don't have the
3847 // metadata yet.
3848 return m_num_pieces == (int)m_have_piece.size() && m_num_pieces > 0;