3 Copyright (c) 2003, Arvid Norberg
6 Redistribution and use in source and binary forms, with or without
7 modification, are permitted provided that the following conditions
10 * Redistributions of source code must retain the above copyright
11 notice, this list of conditions and the following disclaimer.
12 * Redistributions in binary form must reproduce the above copyright
13 notice, this list of conditions and the following disclaimer in
14 the documentation and/or other materials provided with the distribution.
15 * Neither the name of the author nor the names of its
16 contributors may be used to endorse or promote products derived
17 from this software without specific prior written permission.
19 THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
20 AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
21 IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
22 ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
23 LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
24 CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
25 SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
26 INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
27 CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
28 ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
29 POSSIBILITY OF SUCH DAMAGE.
33 #include "libtorrent/pch.hpp"
39 #include <boost/bind.hpp>
41 #include "libtorrent/peer_connection.hpp"
42 #include "libtorrent/identify_client.hpp"
43 #include "libtorrent/entry.hpp"
44 #include "libtorrent/bencode.hpp"
45 #include "libtorrent/alert_types.hpp"
46 #include "libtorrent/invariant_check.hpp"
47 #include "libtorrent/io.hpp"
48 #include "libtorrent/file.hpp"
49 #include "libtorrent/version.hpp"
50 #include "libtorrent/extensions.hpp"
51 #include "libtorrent/aux_/session_impl.hpp"
52 #include "libtorrent/policy.hpp"
53 #include "libtorrent/socket_type.hpp"
54 #include "libtorrent/assert.hpp"
56 //#define TORRENT_CORRUPT_DATA
59 using boost::shared_ptr
;
60 using libtorrent::aux::session_impl
;
64 // outbound connection
65 peer_connection::peer_connection(
67 , boost::weak_ptr
<torrent
> tor
68 , shared_ptr
<socket_type
> s
69 , tcp::endpoint
const& endp
70 , policy::peer
* peerinfo
)
73 m_last_choke(time_now() - hours(1))
77 , m_max_out_request_queue(m_ses
.settings().max_out_request_queue
)
78 , m_last_piece(time_now())
79 , m_last_request(time_now())
80 , m_last_incoming_request(min_time())
81 , m_last_unchoke(min_time())
82 , m_last_receive(time_now())
83 , m_last_sent(time_now())
84 , m_requested(min_time())
86 , m_remote_dl_update(time_now())
87 , m_connect(time_now())
88 , m_became_uninterested(time_now())
89 , m_became_uninteresting(time_now())
91 , m_downloaded_at_last_unchoke(0)
92 , m_disk_recv_buffer(ses
, 0)
97 , m_timeout(m_ses
.settings().peer_timeout
)
100 , m_disk_recv_buffer_size(0)
102 , m_num_invalid_requests(0)
104 , m_upload_limit(bandwidth_limit::inf
)
105 , m_download_limit(bandwidth_limit::inf
)
106 , m_peer_info(peerinfo
)
108 , m_connection_ticket(-1)
109 , m_remote_bytes_dled(0)
110 , m_remote_dl_rate(0)
111 , m_outstanding_writing_bytes(0)
112 , m_download_rate_peak(0)
113 , m_upload_rate_peak(0)
115 , m_prefer_whole_pieces(0)
116 , m_desired_queue_size(2)
117 , m_fast_reconnect(false)
119 , m_peer_interested(false)
120 , m_peer_choked(true)
121 , m_interesting(false)
124 , m_ignore_bandwidth_limits(false)
126 , m_disconnecting(false)
129 , m_request_large_blocks(false)
130 , m_upload_only(false)
132 , m_bitfield_received(false)
134 , m_in_constructor(true)
135 , m_disconnect_started(false)
138 m_channel_state
[upload_channel
] = peer_info::bw_idle
;
139 m_channel_state
[download_channel
] = peer_info::bw_idle
;
141 TORRENT_ASSERT(peerinfo
== 0 || peerinfo
->banned
== false);
142 #ifndef TORRENT_DISABLE_RESOLVE_COUNTRIES
143 std::fill(m_country
, m_country
+ 2, 0);
144 #ifndef TORRENT_DISABLE_GEO_IP
145 if (m_ses
.has_country_db())
147 char const *country
= m_ses
.country_for_ip(m_remote
.address());
150 m_country
[0] = country
[0];
151 m_country
[1] = country
[1];
156 #if defined TORRENT_VERBOSE_LOGGING || defined TORRENT_ERROR_LOGGING
157 m_logger
= m_ses
.create_log(m_remote
.address().to_string() + "_"
158 + boost::lexical_cast
<std::string
>(m_remote
.port()), m_ses
.listen_port());
159 (*m_logger
) << "*** OUTGOING CONNECTION\n";
162 piece_failed
= false;
164 #ifndef TORRENT_DISABLE_GEO_IP
165 m_inet_as_name
= m_ses
.as_name_for_ip(m_remote
.address());
168 std::fill(m_peer_id
.begin(), m_peer_id
.end(), 0);
171 // incoming connection
172 peer_connection::peer_connection(
174 , shared_ptr
<socket_type
> s
175 , tcp::endpoint
const& endp
176 , policy::peer
* peerinfo
)
179 m_last_choke(time_now() - hours(1))
183 , m_max_out_request_queue(m_ses
.settings().max_out_request_queue
)
184 , m_last_piece(time_now())
185 , m_last_request(time_now())
186 , m_last_incoming_request(min_time())
187 , m_last_unchoke(min_time())
188 , m_last_receive(time_now())
189 , m_last_sent(time_now())
190 , m_requested(min_time())
191 , m_timeout_extend(0)
192 , m_remote_dl_update(time_now())
193 , m_connect(time_now())
194 , m_became_uninterested(time_now())
195 , m_became_uninteresting(time_now())
197 , m_downloaded_at_last_unchoke(0)
198 , m_disk_recv_buffer(ses
, 0)
202 , m_timeout(m_ses
.settings().peer_timeout
)
205 , m_disk_recv_buffer_size(0)
207 , m_num_invalid_requests(0)
209 , m_upload_limit(bandwidth_limit::inf
)
210 , m_download_limit(bandwidth_limit::inf
)
211 , m_peer_info(peerinfo
)
213 , m_connection_ticket(-1)
214 , m_remote_bytes_dled(0)
215 , m_remote_dl_rate(0)
216 , m_outstanding_writing_bytes(0)
217 , m_download_rate_peak(0)
218 , m_upload_rate_peak(0)
220 , m_prefer_whole_pieces(0)
221 , m_desired_queue_size(2)
222 , m_fast_reconnect(false)
224 , m_peer_interested(false)
225 , m_peer_choked(true)
226 , m_interesting(false)
229 , m_ignore_bandwidth_limits(false)
231 , m_disconnecting(false)
232 , m_connecting(false)
234 , m_request_large_blocks(false)
235 , m_upload_only(false)
237 , m_bitfield_received(false)
239 , m_in_constructor(true)
240 , m_disconnect_started(false)
243 m_channel_state
[upload_channel
] = peer_info::bw_idle
;
244 m_channel_state
[download_channel
] = peer_info::bw_idle
;
246 #ifndef TORRENT_DISABLE_RESOLVE_COUNTRIES
247 std::fill(m_country
, m_country
+ 2, 0);
248 #ifndef TORRENT_DISABLE_GEO_IP
249 if (m_ses
.has_country_db())
251 char const *country
= m_ses
.country_for_ip(m_remote
.address());
254 m_country
[0] = country
[0];
255 m_country
[1] = country
[1];
261 #if defined TORRENT_VERBOSE_LOGGING || defined TORRENT_ERROR_LOGGING
263 TORRENT_ASSERT(m_socket
->remote_endpoint(ec
) == m_remote
|| ec
);
264 m_logger
= m_ses
.create_log(remote().address().to_string(ec
) + "_"
265 + boost::lexical_cast
<std::string
>(remote().port()), m_ses
.listen_port());
266 (*m_logger
) << "*** INCOMING CONNECTION\n";
269 #ifndef TORRENT_DISABLE_GEO_IP
270 m_inet_as_name
= m_ses
.as_name_for_ip(m_remote
.address());
273 piece_failed
= false;
275 std::fill(m_peer_id
.begin(), m_peer_id
.end(), 0);
// Ordering predicate that ranks this connection against the connection `p`.
//
// Primary key: payload bytes downloaded from each peer, measured relative
// to that connection's m_downloaded_at_last_unchoke baseline. The peer
// that has sent us more compares first (returns true).
//
// Tie-break: c1/c2 are reloaded with upload totals, taken from the peer's
// m_peer_info record when one exists and from this connection's own
// statistics otherwise.
// NOTE(review): the statement that compares the two upload totals and
// produces the final result is not visible in this excerpt.
278 bool peer_connection::unchoke_compare(boost::intrusive_ptr
<peer_connection
const> const& p
) const
281 peer_connection
const& rhs
= *p
;
283 // first compare how many bytes they've sent us
284 size_type c1
= m_statistics
.total_payload_download() - m_downloaded_at_last_unchoke
;
285 size_type c2
= rhs
.m_statistics
.total_payload_download() - rhs
.m_downloaded_at_last_unchoke
;
286 if (c1
> c2
) return true;
287 if (c1
< c2
) return false;
289 // if they are equal, compare how much we have uploaded
// m_peer_info may be null, in which case the per-connection
// statistics are used instead
290 if (m_peer_info
) c1
= m_peer_info
->total_upload();
291 else c1
= m_statistics
.total_payload_upload();
292 if (rhs
.m_peer_info
) c2
= rhs
.m_peer_info
->total_upload();
293 else c2
= rhs
.m_statistics
.total_payload_upload();
// Re-bases the download counter used by unchoke_compare(): records the
// current total payload download as the new m_downloaded_at_last_unchoke
// baseline, so later comparisons only count bytes received after this call.
298 void peer_connection::reset_choke_counters()
300 m_downloaded_at_last_unchoke
= m_statistics
.total_payload_download();
303 void peer_connection::start()
305 TORRENT_ASSERT(m_peer_info
== 0 || m_peer_info
->connection
== this);
306 boost::shared_ptr
<torrent
> t
= m_torrent
.lock();
310 tcp::socket::non_blocking_io
ioc(true);
312 m_socket
->io_control(ioc
, ec
);
315 disconnect(ec
.message().c_str());
318 m_remote
= m_socket
->remote_endpoint(ec
);
321 disconnect(ec
.message().c_str());
324 if (m_remote
.address().is_v4())
325 m_socket
->set_option(type_of_service(m_ses
.settings().peer_tos
), ec
);
327 else if (t
->ready_for_connections())
333 void peer_connection::update_interest()
335 boost::shared_ptr
<torrent
> t
= m_torrent
.lock();
338 bool interested
= false;
339 if (!t
->is_finished())
341 piece_picker
const& p
= t
->picker();
342 int num_pieces
= p
.num_pieces();
343 for (int j
= 0; j
!= num_pieces
; ++j
)
346 && t
->piece_priority(j
) > 0
356 if (!interested
) send_not_interested();
357 else t
->get_policy().peer_is_interesting(*this);
359 // may throw an asio error if socket has disconnected
360 catch (std::exception
&) {}
362 TORRENT_ASSERT(is_interesting() == interested
);
365 #ifndef TORRENT_DISABLE_EXTENSIONS
// Appends the plugin `ext` to m_extensions. The incoming_* handlers in this
// file iterate that list and give each plugin a chance to handle a message.
// Only compiled when TORRENT_DISABLE_EXTENSIONS is not defined.
366 void peer_connection::add_extension(boost::shared_ptr
<peer_plugin
> ext
)
368 m_extensions
.push_back(ext
);
372 void peer_connection::send_allowed_set()
376 boost::shared_ptr
<torrent
> t
= m_torrent
.lock();
379 int num_allowed_pieces
= m_ses
.settings().allowed_fast_set_size
;
380 int num_pieces
= t
->torrent_file().num_pieces();
382 if (num_allowed_pieces
>= num_pieces
)
384 for (int i
= 0; i
< num_pieces
; ++i
)
386 #ifdef TORRENT_VERBOSE_LOGGING
387 (*m_logger
) << time_now_string()
388 << " ==> ALLOWED_FAST [ " << i
<< " ]\n";
391 m_accept_fast
.insert(i
);
397 address
const& addr
= m_remote
.address();
400 address_v4::bytes_type bytes
= addr
.to_v4().to_bytes();
401 x
.assign((char*)&bytes
[0], bytes
.size());
405 address_v6::bytes_type bytes
= addr
.to_v6().to_bytes();
406 x
.assign((char*)&bytes
[0], bytes
.size());
408 x
.append((char*)&t
->torrent_file().info_hash()[0], 20);
410 sha1_hash hash
= hasher(&x
[0], x
.size()).final();
413 char* p
= (char*)&hash
[0];
414 for (int i
= 0; i
< 5; ++i
)
416 int piece
= detail::read_uint32(p
) % num_pieces
;
417 if (m_accept_fast
.find(piece
) == m_accept_fast
.end())
419 #ifdef TORRENT_VERBOSE_LOGGING
420 (*m_logger
) << time_now_string()
421 << " ==> ALLOWED_FAST [ " << piece
<< " ]\n";
423 write_allow_fast(piece
);
424 m_accept_fast
.insert(piece
);
425 if (int(m_accept_fast
.size()) >= num_allowed_pieces
426 || int(m_accept_fast
.size()) == num_pieces
) return;
429 hash
= hasher((char*)&hash
[0], 20).final();
433 void peer_connection::init()
437 boost::shared_ptr
<torrent
> t
= m_torrent
.lock();
439 TORRENT_ASSERT(t
->valid_metadata());
440 TORRENT_ASSERT(t
->ready_for_connections());
442 m_have_piece
.resize(t
->torrent_file().num_pieces(), m_have_all
);
443 if (m_have_all
) m_num_pieces
= t
->torrent_file().num_pieces();
445 // now that we have a piece_picker,
446 // update it with this peer's pieces
448 TORRENT_ASSERT(m_num_pieces
== std::count(m_have_piece
.begin()
449 , m_have_piece
.end(), true));
451 if (m_num_pieces
== int(m_have_piece
.size()))
453 #ifdef TORRENT_VERBOSE_LOGGING
454 (*m_logger
) << " *** THIS IS A SEED ***\n";
456 // if this is a web seed. we don't have a peer_info struct
457 if (m_peer_info
) m_peer_info
->seed
= true;
460 if (t
->is_finished()) send_not_interested();
461 else t
->get_policy().peer_is_interesting(*this);
465 // if we're a seed, we don't keep track of piece availability
468 t
->peer_has(m_have_piece
);
469 bool interesting
= false;
470 for (int i
= 0; i
< int(m_have_piece
.size()); ++i
)
474 // if the peer has a piece and we don't, the peer is interesting
475 if (!t
->have_piece(i
)
476 && t
->picker().piece_priority(i
) != 0)
480 if (interesting
) t
->get_policy().peer_is_interesting(*this);
481 else send_not_interested();
489 peer_connection::~peer_connection()
492 TORRENT_ASSERT(!m_in_constructor
);
493 TORRENT_ASSERT(m_disconnecting
);
494 TORRENT_ASSERT(m_disconnect_started
);
496 m_disk_recv_buffer_size
= 0;
498 #if defined TORRENT_VERBOSE_LOGGING || defined TORRENT_ERROR_LOGGING
501 (*m_logger
) << time_now_string()
502 << " *** CONNECTION CLOSED\n";
505 TORRENT_ASSERT(!m_ses
.has_peer(this));
507 for (aux::session_impl::torrent_map::const_iterator i
= m_ses
.m_torrents
.begin()
508 , end(m_ses
.m_torrents
.end()); i
!= end
; ++i
)
509 TORRENT_ASSERT(!i
->second
->has_peer(this));
511 TORRENT_ASSERT(m_peer_info
->connection
== 0);
513 boost::shared_ptr
<torrent
> t
= m_torrent
.lock();
// Sets the m_fast_reconnect flag to `r` and back-dates the peer record's
// `connected` timestamp by (min_reconnect_time * max_failcount) seconds,
// then bumps the record's fast_reconnects counter.
//
// The guard below fires when there is no peer info record or the peer has
// already used more than one fast reconnect.
// NOTE(review): the statement controlled by that guard is not visible in
// this excerpt - presumably an early return; confirm against the full file.
517 void peer_connection::fast_reconnect(bool r
)
519 if (!peer_info_struct() || peer_info_struct()->fast_reconnects
> 1)
521 m_fast_reconnect
= r
;
// move the "last connected" time into the past
522 peer_info_struct()->connected
= time_now()
523 - seconds(m_ses
.settings().min_reconnect_time
524 * m_ses
.settings().max_failcount
);
525 ++peer_info_struct()->fast_reconnects
;
// Called with the index of a piece we now have. Skipped entirely while the
// connection is still in its handshake. Removes `index` from
// m_suggested_pieces, and treats peers that already have the piece
// specially (see the send_redundant_have setting below).
// NOTE(review): several statements of this function (the interest update
// and the call that actually writes the HAVE message) are not visible in
// this excerpt.
528 void peer_connection::announce_piece(int index
)
530 // don't announce during handshake
531 if (in_handshake()) return;
533 // remove suggested pieces that we have
534 std::vector
<int>::iterator i
= std::find(
535 m_suggested_pieces
.begin(), m_suggested_pieces
.end(), index
);
536 if (i
!= m_suggested_pieces
.end()) m_suggested_pieces
.erase(i
);
538 if (has_piece(index
))
540 // if we got a piece that this peer has
541 // it might have been the last interesting
542 // piece this peer had. We might not be
543 // interested anymore
545 if (is_disconnecting()) return;
547 // optimization, don't send have messages
548 // to peers that already have the piece
549 if (!m_ses
.settings().send_redundant_have
)
551 #ifdef TORRENT_VERBOSE_LOGGING
552 (*m_logger
) << time_now_string()
553 << " ==> HAVE [ piece: " << index
<< " ] SUPRESSED\n";
559 #ifdef TORRENT_VERBOSE_LOGGING
560 (*m_logger
) << time_now_string()
561 << " ==> HAVE [ piece: " << index
<< "]\n";
565 boost::shared_ptr
<torrent
> t
= m_torrent
.lock();
// sanity check: we should only announce pieces the torrent really has
567 TORRENT_ASSERT(t
->have_piece(index
));
// Returns whether the remote peer has piece `i`, i.e. bit `i` of
// m_have_piece.
//
// Preconditions (checked with TORRENT_ASSERT only): the torrent has valid
// metadata and 0 <= i < number of pieces in the torrent.
571 bool peer_connection::has_piece(int i
) const
573 boost::shared_ptr
<torrent
> t
= m_torrent
.lock();
575 TORRENT_ASSERT(t
->valid_metadata());
576 TORRENT_ASSERT(i
>= 0);
577 TORRENT_ASSERT(i
< t
->torrent_file().num_pieces());
578 return m_have_piece
[i
];
// Read-only access to m_request_queue, the deque of piece_block entries
// held by this connection.
581 std::deque
<piece_block
> const& peer_connection::request_queue() const
583 return m_request_queue
;
// Read-only access to m_download_queue, the deque of pending_block entries
// held by this connection.
586 std::deque
<pending_block
> const& peer_connection::download_queue() const
588 return m_download_queue
;
591 std::deque
<peer_request
> const& peer_connection::upload_queue() const
// Forwards the given `downloaded` and `uploaded` amounts to this
// connection's m_statistics object.
596 void peer_connection::add_stat(size_type downloaded
, size_type uploaded
)
598 m_statistics
.add_stat(downloaded
, uploaded
);
601 bitfield
const& peer_connection::get_bitfield() const
// Notifies every plugin in m_extensions that piece `index` passed, by
// calling on_piece_pass(index) on each.
//
// Two call forms appear below: a plain call under BOOST_NO_EXCEPTIONS, and
// one wrapped in a try/catch that swallows std::exception, so a throwing
// plugin cannot propagate out of this function.
// NOTE(review): the #else/#endif separating the two forms is not visible
// in this excerpt.
606 void peer_connection::received_valid_data(int index
)
610 #ifndef TORRENT_DISABLE_EXTENSIONS
611 for (extension_list_t::iterator i
= m_extensions
.begin()
612 , end(m_extensions
.end()); i
!= end
; ++i
)
614 #ifdef BOOST_NO_EXCEPTIONS
615 (*i
)->on_piece_pass(index
);
617 try { (*i
)->on_piece_pass(index
); } catch (std::exception
&) {}
// Notifies every plugin in m_extensions that piece `index` failed
// (on_piece_failed), then penalizes the peer if it has a peer info record:
//   - puts it on parole when the use_parole_mode setting is enabled,
//   - increments its hashfails counter,
//   - adjusts its trust_points, clamped so they never go below -7.
// Nothing is penalized if the connection is already disconnecting.
// NOTE(review): the statement that actually decreases trust_points is not
// visible in this excerpt; only the clamp is.
623 void peer_connection::received_invalid_data(int index
)
627 #ifndef TORRENT_DISABLE_EXTENSIONS
628 for (extension_list_t::iterator i
= m_extensions
.begin()
629 , end(m_extensions
.end()); i
!= end
; ++i
)
631 #ifdef BOOST_NO_EXCEPTIONS
632 (*i
)->on_piece_failed(index
);
// exception-enabled variant: a throwing plugin is ignored
634 try { (*i
)->on_piece_failed(index
); } catch (std::exception
&) {}
638 if (is_disconnecting()) return;
640 if (peer_info_struct())
642 if (m_ses
.settings().use_parole_mode
)
643 peer_info_struct()->on_parole
= true;
645 ++peer_info_struct()->hashfails
;
646 boost::int8_t& trust_points
= peer_info_struct()->trust_points
;
648 // we decrease more than we increase, to keep the
649 // allowed failed/passed ratio low.
650 // TODO: make this limit user settable
// lower bound for the trust score
652 if (trust_points
< -7) trust_points
= -7;
// Returns the current value of m_free_upload (the counter that
// add_free_upload() accumulates into).
656 size_type
peer_connection::total_free_upload() const
658 return m_free_upload
;
// Adds `free_upload` to the running m_free_upload total. No sign check on
// the argument is visible here.
661 void peer_connection::add_free_upload(size_type free_upload
)
665 m_free_upload
+= free_upload
;
668 // verifies a piece to see if it is valid (is within a valid range)
669 // and if it can correspond to a request generated by libtorrent.
//
// The visible parts of the predicate require that:
//   - p.piece is below the torrent's piece count,
//   - p.length is exactly one block, OR a short block that ends exactly at
//     the end of the last piece, OR passes the m_request_large_blocks
//     clause (see the review note on that clause below),
//   - p.start is aligned to the torrent's block size.
// Requires valid metadata (asserted).
// NOTE(review): the head of the return expression and the right-hand side
// of the final range comparison are not visible in this excerpt.
670 bool peer_connection::verify_piece(const peer_request
& p
) const
674 boost::shared_ptr
<torrent
> t
= m_torrent
.lock();
677 TORRENT_ASSERT(t
->valid_metadata());
678 torrent_info
const& ti
= t
->torrent_file();
681 && p
.piece
< t
->torrent_file().num_pieces()
684 && (p
.length
== t
->block_size()
685 || (p
.length
< t
->block_size()
686 && p
.piece
== ti
.num_pieces()-1
687 && p
.start
+ p
.length
== ti
.piece_size(p
.piece
))
// NOTE(review): operator precedence. '*', '<=', '==' and '&&' all bind
// tighter than '?:', so the clause below parses as
//   (m_request_large_blocks
//     && ((p.length <= ti.piece_length() * m_prefer_whole_pieces) == 0))
//       ? 1 : m_prefer_whole_pieces
// i.e. the ternary swallows the whole clause and yields 1 or
// m_prefer_whole_pieces. The intent was presumably
//   p.length <= ti.piece_length()
//     * (m_prefer_whole_pieces == 0 ? 1 : m_prefer_whole_pieces)
// - confirm and add parentheses.
688 || (m_request_large_blocks
689 && p
.length
<= ti
.piece_length() * m_prefer_whole_pieces
== 0 ?
690 1 : m_prefer_whole_pieces
))
// size_type cast: the byte offset is computed in the wider type
691 && p
.piece
* size_type(ti
.piece_length()) + p
.start
+ p
.length
693 && (p
.start
% t
->block_size() == 0);
696 void peer_connection::attach_to_torrent(sha1_hash
const& ih
)
700 TORRENT_ASSERT(!m_disconnecting
);
701 TORRENT_ASSERT(m_torrent
.expired());
702 boost::weak_ptr
<torrent
> wpt
= m_ses
.find_torrent(ih
);
703 boost::shared_ptr
<torrent
> t
= wpt
.lock();
705 if (t
&& t
->is_aborted())
707 #if defined TORRENT_VERBOSE_LOGGING || defined TORRENT_ERROR_LOGGING
708 (*m_logger
) << " *** the torrent has been aborted\n";
715 // we couldn't find the torrent!
716 #if defined TORRENT_VERBOSE_LOGGING || defined TORRENT_ERROR_LOGGING
717 (*m_logger
) << " *** couldn't find a torrent with the given info_hash: " << ih
<< "\n";
718 (*m_logger
) << " torrents:\n";
719 session_impl::torrent_map
const& torrents
= m_ses
.m_torrents
;
720 for (session_impl::torrent_map::const_iterator i
= torrents
.begin()
721 , end(torrents
.end()); i
!= end
; ++i
)
723 (*m_logger
) << " " << i
->second
->torrent_file().info_hash() << "\n";
726 disconnect("got invalid info-hash", 2);
732 // paused torrents will not accept
733 // incoming connections
734 #if defined TORRENT_VERBOSE_LOGGING || defined TORRENT_ERROR_LOGGING
735 (*m_logger
) << " rejected connection to paused torrent\n";
737 disconnect("connection rejected bacause torrent is paused");
741 TORRENT_ASSERT(m_torrent
.expired());
742 // check to make sure we don't have another connection with the same
743 // info_hash and peer_id. If we do. close this connection.
748 t
->attach_peer(this);
751 catch (std::exception
& e
)
753 std::cout
<< e
.what() << std::endl
;
754 TORRENT_ASSERT(false);
757 if (m_disconnecting
) return;
760 TORRENT_ASSERT(!m_torrent
.expired());
762 // if the torrent isn't ready to accept
763 // connections yet, we'll have to wait with
764 // our initialization
765 if (t
->ready_for_connections()) init();
767 TORRENT_ASSERT(!m_torrent
.expired());
769 // assume the other end has no pieces
770 // if we don't have valid metadata yet,
771 // leave the vector unallocated
772 TORRENT_ASSERT(m_num_pieces
== 0);
773 m_have_piece
.clear_all();
774 TORRENT_ASSERT(!m_torrent
.expired());
779 // -----------------------------
780 // --------- KEEPALIVE ---------
781 // -----------------------------
// Handler for a keepalive message from the peer. The only visible effect
// is a log line, emitted when TORRENT_VERBOSE_LOGGING is defined.
783 void peer_connection::incoming_keepalive()
787 #ifdef TORRENT_VERBOSE_LOGGING
788 (*m_logger
) << time_now_string() << " <== KEEPALIVE\n";
792 // -----------------------------
793 // ----------- CHOKE -----------
794 // -----------------------------
// Handler for a CHOKE message from the peer.
//
// Each plugin in m_extensions is offered the message first; if any
// on_choke() returns true the handler stops there. Otherwise, unless the
// connection is disconnecting, m_peer_choked is set. If the peer is not on
// parole (or has no peer info record), every block still waiting in
// m_request_queue is handed back to the piece picker via abort_download()
// and the queue is emptied.
796 void peer_connection::incoming_choke()
800 boost::shared_ptr
<torrent
> t
= m_torrent
.lock();
803 #ifndef TORRENT_DISABLE_EXTENSIONS
804 for (extension_list_t::iterator i
= m_extensions
.begin()
805 , end(m_extensions
.end()); i
!= end
; ++i
)
807 if ((*i
)->on_choke()) return;
810 if (is_disconnecting()) return;
812 #ifdef TORRENT_VERBOSE_LOGGING
813 (*m_logger
) << time_now_string() << " <== CHOKE\n";
815 m_peer_choked
= true;
817 if (peer_info_struct() == 0 || !peer_info_struct()->on_parole
)
819 // if the peer is not in parole mode, clear the queued
823 piece_picker
& p
= t
->picker();
824 for (std::deque
<piece_block
>::const_iterator i
= m_request_queue
.begin()
825 , end(m_request_queue
.end()); i
!= end
; ++i
)
827 // since this piece was skipped, clear it and allow it to
828 // be requested from other peers
829 p
.abort_download(*i
);
832 m_request_queue
.clear();
// Predicate telling whether the byte-range request `r` refers to the block
// `b`, given the torrent's `block_size`. Returns false when the piece
// indices differ, when r.start does not fall in block b.block_index, or
// when r.start is not block-aligned.
// NOTE(review): the final return statement (the "matches" case) is not
// visible in this excerpt. block_size is used as a divisor and is not
// checked against zero here.
836 bool match_request(peer_request
const& r
, piece_block
const& b
, int block_size
)
838 if (b
.piece_index
!= r
.piece
) return false;
839 if (b
.block_index
!= r
.start
/ block_size
) return false;
840 if (r
.start
% block_size
!= 0) return false;
844 // -----------------------------
845 // -------- REJECT PIECE -------
846 // -----------------------------
848 void peer_connection::incoming_reject_request(peer_request
const& r
)
852 boost::shared_ptr
<torrent
> t
= m_torrent
.lock();
855 #ifndef TORRENT_DISABLE_EXTENSIONS
856 for (extension_list_t::iterator i
= m_extensions
.begin()
857 , end(m_extensions
.end()); i
!= end
; ++i
)
859 if ((*i
)->on_reject(r
)) return;
863 if (is_disconnecting()) return;
865 std::deque
<pending_block
>::iterator i
= std::find_if(
866 m_download_queue
.begin(), m_download_queue
.end()
867 , bind(match_request
, boost::cref(r
), bind(&pending_block::block
, _1
)
870 #ifdef TORRENT_VERBOSE_LOGGING
871 (*m_logger
) << time_now_string()
872 << " <== REJECT_PIECE [ piece: " << r
.piece
<< " | s: " << r
.start
<< " | l: " << r
.length
<< " ]\n";
875 piece_block
b(-1, 0);
876 if (i
!= m_download_queue
.end())
879 m_download_queue
.erase(i
);
881 // if the peer is in parole mode, keep the request
882 if (peer_info_struct() && peer_info_struct()->on_parole
)
884 m_request_queue
.push_front(b
);
886 else if (!t
->is_seed())
888 piece_picker
& p
= t
->picker();
892 #if defined TORRENT_VERBOSE_LOGGING || defined TORRENT_ERROR_LOGGING
895 (*m_logger
) << time_now_string()
896 << " *** PIECE NOT IN REQUEST QUEUE\n";
899 if (has_peer_choked())
901 // if we're choked and we got a rejection of
902 // a piece in the allowed fast set, remove it
903 // from the allow fast set.
904 std::vector
<int>::iterator i
= std::find(
905 m_allowed_fast
.begin(), m_allowed_fast
.end(), r
.piece
);
906 if (i
!= m_allowed_fast
.end()) m_allowed_fast
.erase(i
);
910 std::vector
<int>::iterator i
= std::find(m_suggested_pieces
.begin()
911 , m_suggested_pieces
.end(), r
.piece
);
912 if (i
!= m_suggested_pieces
.end())
913 m_suggested_pieces
.erase(i
);
916 if (m_request_queue
.empty() && m_download_queue
.size() < 2)
918 request_a_block(*t
, *this);
919 send_block_requests();
923 // -----------------------------
924 // ------- SUGGEST PIECE -------
925 // -----------------------------
// Handler for a SUGGEST_PIECE message from the peer.
//
// Plugins get first refusal (on_suggest). Suggestions are ignored when the
// connection is disconnecting or we already have the piece. Otherwise the
// index is appended to m_suggested_pieces, which is kept to at most 10
// entries by dropping the oldest (front) entry first.
// NOTE(review): no range check on `index` is visible before it is passed
// to t->have_piece(); the value comes from the remote peer. Confirm it is
// validated elsewhere.
927 void peer_connection::incoming_suggest(int index
)
931 #ifdef TORRENT_VERBOSE_LOGGING
932 (*m_logger
) << time_now_string()
933 << " <== SUGGEST_PIECE [ piece: " << index
<< " ]\n";
935 boost::shared_ptr
<torrent
> t
= m_torrent
.lock();
938 #ifndef TORRENT_DISABLE_EXTENSIONS
939 for (extension_list_t::iterator i
= m_extensions
.begin()
940 , end(m_extensions
.end()); i
!= end
; ++i
)
942 if ((*i
)->on_suggest(index
)) return;
946 if (is_disconnecting()) return;
947 if (t
->have_piece(index
)) return;
// bound the list: evict the oldest suggestion once 10 are stored
949 if (m_suggested_pieces
.size() > 9)
950 m_suggested_pieces
.erase(m_suggested_pieces
.begin());
951 m_suggested_pieces
.push_back(index
);
953 #ifdef TORRENT_VERBOSE_LOGGING
954 (*m_logger
) << time_now_string()
955 << " ** SUGGEST_PIECE [ piece: " << index
<< " added to set: " << m_suggested_pieces
.size() << " ]\n";
959 // -----------------------------
960 // ---------- UNCHOKE ----------
961 // -----------------------------
// Handler for an UNCHOKE message from the peer.
//
// Plugins get first refusal (on_unchoke). m_peer_choked is then cleared -
// note this happens before the is_disconnecting() check, so the flag is
// updated even on a connection that is being torn down. Finally the
// torrent's policy is informed through unchoked(*this).
963 void peer_connection::incoming_unchoke()
967 boost::shared_ptr
<torrent
> t
= m_torrent
.lock();
970 #ifndef TORRENT_DISABLE_EXTENSIONS
971 for (extension_list_t::iterator i
= m_extensions
.begin()
972 , end(m_extensions
.end()); i
!= end
; ++i
)
974 if ((*i
)->on_unchoke()) return;
978 #ifdef TORRENT_VERBOSE_LOGGING
979 (*m_logger
) << time_now_string() << " <== UNCHOKE\n";
981 m_peer_choked
= false;
982 if (is_disconnecting()) return;
984 t
->get_policy().unchoked(*this);
987 // -----------------------------
988 // -------- INTERESTED ---------
989 // -----------------------------
// Handler for an INTERESTED message from the peer.
//
// Plugins get first refusal (on_interested). m_peer_interested is then set
// (before the is_disconnecting() check) and, if the connection is still
// alive, the torrent's policy is informed through interested(*this).
991 void peer_connection::incoming_interested()
995 boost::shared_ptr
<torrent
> t
= m_torrent
.lock();
998 #ifndef TORRENT_DISABLE_EXTENSIONS
999 for (extension_list_t::iterator i
= m_extensions
.begin()
1000 , end(m_extensions
.end()); i
!= end
; ++i
)
1002 if ((*i
)->on_interested()) return;
1006 #ifdef TORRENT_VERBOSE_LOGGING
1007 (*m_logger
) << time_now_string() << " <== INTERESTED\n";
1009 m_peer_interested
= true;
1010 if (is_disconnecting()) return;
1011 t
->get_policy().interested(*this);
1014 // -----------------------------
1015 // ------ NOT INTERESTED -------
1016 // -----------------------------
// Handler for a NOT_INTERESTED message from the peer.
//
// Plugins get first refusal (on_not_interested). The time of the message
// is recorded in m_became_uninterested and m_peer_interested is cleared;
// both happen before the is_disconnecting() check. If the connection is
// still alive the torrent's policy is informed through
// not_interested(*this).
1018 void peer_connection::incoming_not_interested()
1022 #ifndef TORRENT_DISABLE_EXTENSIONS
1023 for (extension_list_t::iterator i
= m_extensions
.begin()
1024 , end(m_extensions
.end()); i
!= end
; ++i
)
1026 if ((*i
)->on_not_interested()) return;
1030 m_became_uninterested
= time_now();
1032 #ifdef TORRENT_VERBOSE_LOGGING
1033 (*m_logger
) << time_now_string() << " <== NOT_INTERESTED\n";
1035 m_peer_interested
= false;
1036 if (is_disconnecting()) return;
1038 boost::shared_ptr
<torrent
> t
= m_torrent
.lock();
1041 t
->get_policy().not_interested(*this);
1044 // -----------------------------
1045 // ----------- HAVE ------------
1046 // -----------------------------
// Handler for a HAVE message: the peer announces that it now has piece
// `index`.
//
// Visible flow: plugins get first refusal (on_have); a missing bitfield is
// treated as "have none"; without metadata the local bitmask may be grown
// to fit `index`; an out-of-range index disconnects the peer; a redundant
// HAVE is only logged; otherwise the bit is set and, when metadata is
// available, interest and the remote download-rate estimate are updated.
// NOTE(review): a number of statements (early returns, picker updates, the
// seed test guarding the tail of the function) are not visible in this
// excerpt.
1048 void peer_connection::incoming_have(int index
)
1052 boost::shared_ptr
<torrent
> t
= m_torrent
.lock();
1055 #ifndef TORRENT_DISABLE_EXTENSIONS
1056 for (extension_list_t::iterator i
= m_extensions
.begin()
1057 , end(m_extensions
.end()); i
!= end
; ++i
)
1059 if ((*i
)->on_have(index
)) return;
1063 if (is_disconnecting()) return;
1065 // if we haven't received a bitfield, it was
1066 // probably omitted, which is the same as 'have_none'
1067 if (!m_bitfield_received
) incoming_have_none();
1069 #ifdef TORRENT_VERBOSE_LOGGING
1070 (*m_logger
) << time_now_string()
1071 << " <== HAVE [ piece: " << index
<< "]\n";
// NOTE(review): possible off-by-one. With '>' an index exactly equal to
// the current bitmask size does not take the resize path, and then trips
// the 'index >= size' check further down, disconnecting the peer. '>='
// looks like the intended comparison - confirm.
1074 if (!t
->valid_metadata() && index
> int(m_have_piece
.size()))
1078 // if we don't have metadata
1079 // and we might not have received a bitfield
1080 // extend the bitmask to fit the new
1082 m_have_piece
.resize(index
+ 1, false);
1086 // unless the index > 64k, in which case
1087 // we just ignore it
1092 // if we got an invalid message, abort
1093 if (index
>= int(m_have_piece
.size()) || index
< 0)
1095 disconnect("got 'have'-message with higher index than the number of pieces", 2);
1099 if (m_have_piece
[index
])
1101 #if defined TORRENT_VERBOSE_LOGGING || defined TORRENT_ERROR_LOGGING
1102 (*m_logger
) << " got redundant HAVE message for index: " << index
<< "\n";
1107 m_have_piece
.set_bit(index
);
1109 // only update the piece_picker if
1110 // we have the metadata and if
1111 // we're not a seed (in which case
1112 // we won't have a piece picker)
1113 if (t
->valid_metadata())
// the peer becomes interesting if it has a wanted piece we lack
1118 if (!t
->have_piece(index
)
1120 && !is_interesting()
1121 && t
->picker().piece_priority(index
) != 0)
1122 t
->get_policy().peer_is_interesting(*this);
1124 // this will disregard all have messages we get within
1125 // the first two seconds. Since some clients implement
1126 // lazy bitfields, these will not be reliable to use
1127 // for an estimated peer download rate.
1128 if (!peer_info_struct() || time_now() - peer_info_struct()->connected
> seconds(2))
1130 // update bytes downloaded since last timer
1131 m_remote_bytes_dled
+= t
->torrent_file().piece_size(index
);
// NOTE(review): m_peer_info is dereferenced here; other code in this file
// treats it as nullable. The condition guarding these lines is not
// visible - confirm it rules out a null m_peer_info.
1137 m_peer_info
->seed
= true;
1138 m_upload_only
= true;
1139 disconnect_if_redundant();
1140 if (is_disconnecting()) return;
1145 // -----------------------------
1146 // --------- BITFIELD ----------
1147 // -----------------------------
1149 void peer_connection::incoming_bitfield(bitfield
const& bits
)
1153 boost::shared_ptr
<torrent
> t
= m_torrent
.lock();
1156 #ifndef TORRENT_DISABLE_EXTENSIONS
1157 for (extension_list_t::iterator i
= m_extensions
.begin()
1158 , end(m_extensions
.end()); i
!= end
; ++i
)
1160 if ((*i
)->on_bitfield(bits
)) return;
1164 if (is_disconnecting()) return;
1166 #ifdef TORRENT_VERBOSE_LOGGING
1167 (*m_logger
) << time_now_string() << " <== BITFIELD ";
1169 for (int i
= 0; i
< int(bits
.size()); ++i
)
1171 if (bits
[i
]) (*m_logger
) << "1";
1172 else (*m_logger
) << "0";
1174 (*m_logger
) << "\n";
1177 // if we don't have the metadata, we cannot
1178 // verify the bitfield size
1179 if (t
->valid_metadata()
1180 && (bits
.size() / 8) != (m_have_piece
.size() / 8))
1182 std::stringstream msg
;
1183 msg
<< "got bitfield with invalid size: " << (bits
.size() / 8)
1184 << "bytes. expected: " << (m_have_piece
.size() / 8)
1186 disconnect(msg
.str().c_str(), 2);
1190 m_bitfield_received
= true;
1192 // if we don't have metadata yet
1193 // just remember the bitmask
1194 // don't update the piecepicker
1195 // (since it doesn't exist yet)
1196 if (!t
->ready_for_connections())
1198 m_have_piece
= bits
;
1199 m_num_pieces
= bits
.count();
1200 if (m_peer_info
) m_peer_info
->seed
= (m_num_pieces
== int(bits
.size()));
1204 TORRENT_ASSERT(t
->valid_metadata());
1206 int num_pieces
= bits
.count();
1207 if (num_pieces
== int(m_have_piece
.size()))
1209 #ifdef TORRENT_VERBOSE_LOGGING
1210 (*m_logger
) << " *** THIS IS A SEED ***\n";
1212 // if this is a web seed. we don't have a peer_info struct
1213 if (m_peer_info
) m_peer_info
->seed
= true;
1214 m_upload_only
= true;
1216 m_have_piece
.set_all();
1217 m_num_pieces
= num_pieces
;
1219 if (!t
->is_finished())
1220 t
->get_policy().peer_is_interesting(*this);
1222 disconnect_if_redundant();
1227 // let the torrent know which pieces the
1229 // if we're a seed, we don't keep track of piece availability
1230 bool interesting
= false;
1235 for (int i
= 0; i
< (int)m_have_piece
.size(); ++i
)
1237 bool have
= bits
[i
];
1238 if (have
&& !m_have_piece
[i
])
1240 if (!t
->have_piece(i
) && t
->picker().piece_priority(i
) != 0)
1243 else if (!have
&& m_have_piece
[i
])
1245 // this should probably not be allowed
1251 m_have_piece
= bits
;
1252 m_num_pieces
= num_pieces
;
1254 if (interesting
) t
->get_policy().peer_is_interesting(*this);
1255 else if (upload_only()) disconnect("upload to upload connections");
// Closes connections that can no longer be useful. Does nothing unless the
// close_redundant_connections setting is enabled.
//
// Visible cases:
//   - the peer is upload-only and our torrent is finished
//     ("seed to seed"),
//   - an upload-only peer that is not interesting to us, once its bitfield
//     has been received and our files are checked.
// NOTE(review): the first part of the second condition is not visible in
// this excerpt; the description above relies on the disconnect message.
1258 void peer_connection::disconnect_if_redundant()
1260 if (!m_ses
.settings().close_redundant_connections
) return;
1262 boost::shared_ptr
<torrent
> t
= m_torrent
.lock();
1264 if (m_upload_only
&& t
->is_finished())
1265 disconnect("seed to seed");
1269 && m_bitfield_received
1270 && t
->are_files_checked())
1271 disconnect("uninteresting upload-only peer");
1274 // -----------------------------
1275 // ---------- REQUEST ----------
1276 // -----------------------------
// Handles a block request r received from the remote peer.
// Visible flow:
//  - a missing bitfield is treated as have_none
//  - extensions may consume the request through on_request()
//  - the request is answered with write_reject_request() when the torrent
//    has no valid metadata, or when m_requests is already larger than
//    settings().max_allowed_in_request_queue
//  - a request that passes the range checks is appended to m_requests,
//    unless we have choked the peer and the piece is not in m_accept_fast,
//    in which case it is rejected
//  - a request that fails the checks is rejected, counted in
//    m_num_invalid_requests and reported via invalid_request_alert
// NOTE(review): this listing has lost lines (braces, returns, #endif and
// the head of the validation condition) -- confirm the exact control flow
// against the original file.
1278 void peer_connection::incoming_request(peer_request
const& r
)
1282 boost::shared_ptr
<torrent
> t
= m_torrent
.lock();
1285 // if we haven't received a bitfield, it was
1286 // probably omitted, which is the same as 'have_none'
1287 if (!m_bitfield_received
) incoming_have_none();
1289 #ifndef TORRENT_DISABLE_EXTENSIONS
1290 for (extension_list_t::iterator i
= m_extensions
.begin()
1291 , end(m_extensions
.end()); i
!= end
; ++i
)
1293 if ((*i
)->on_request(r
)) return;
1296 if (is_disconnecting()) return;
1298 if (!t
->valid_metadata())
1300 // if we don't have valid metadata yet,
1301 // we shouldn't get a request
1302 #if defined TORRENT_VERBOSE_LOGGING || defined TORRENT_ERROR_LOGGING
1303 (*m_logger
) << time_now_string()
1304 << " <== UNEXPECTED_REQUEST [ "
1305 "piece: " << r
.piece
<< " | "
1306 "s: " << r
.start
<< " | "
1307 "l: " << r
.length
<< " | "
1308 "i: " << m_peer_interested
<< " | "
1309 "t: " << t
->torrent_file().piece_size(r
.piece
) << " | "
1310 "n: " << t
->torrent_file().num_pieces() << " ]\n";
1312 (*m_logger
) << time_now_string()
1313 << " ==> REJECT_PIECE [ "
1314 "piece: " << r
.piece
<< " | "
1315 "s: " << r
.start
<< " | "
1316 "l: " << r
.length
<< " ]\n";
1318 write_reject_request(r
);
1322 if (int(m_requests
.size()) > m_ses
.settings().max_allowed_in_request_queue
)
1324 // don't allow clients to abuse our
1325 // memory consumption.
1326 // ignore requests if the client
1327 // is making too many of them.
1328 #if defined TORRENT_VERBOSE_LOGGING || defined TORRENT_ERROR_LOGGING
1329 (*m_logger
) << time_now_string()
1330 << " <== TOO MANY REQUESTS [ "
1331 "piece: " << r
.piece
<< " | "
1332 "s: " << r
.start
<< " | "
1333 "l: " << r
.length
<< " | "
1334 "i: " << m_peer_interested
<< " | "
1335 "t: " << t
->torrent_file().piece_size(r
.piece
) << " | "
1336 "n: " << t
->torrent_file().num_pieces() << " ]\n";
1338 (*m_logger
) << time_now_string()
1339 << " ==> REJECT_PIECE [ "
1340 "piece: " << r
.piece
<< " | "
1341 "s: " << r
.start
<< " | "
1342 "l: " << r
.length
<< " ]\n";
1344 write_reject_request(r
);
1348 // make sure this request
1349 // is legal and that the peer
// the visible terms check: piece index below num_pieces, we have the
// piece, start and start + length inside the piece, the peer is
// interested, and length no larger than one block
1352 && r
.piece
< t
->torrent_file().num_pieces()
1353 && t
->have_piece(r
.piece
)
1355 && r
.start
< t
->torrent_file().piece_size(r
.piece
)
1357 && r
.length
+ r
.start
<= t
->torrent_file().piece_size(r
.piece
)
1358 && m_peer_interested
1359 && r
.length
<= t
->block_size())
1361 #ifdef TORRENT_VERBOSE_LOGGING
1362 (*m_logger
) << time_now_string()
1363 << " <== REQUEST [ piece: " << r
.piece
<< " | s: " << r
.start
<< " | l: " << r
.length
<< " ]\n";
1365 // if we have choked the client
1366 // ignore the request
1367 if (m_choked
&& m_accept_fast
.find(r
.piece
) == m_accept_fast
.end())
1369 write_reject_request(r
);
1370 #if defined TORRENT_VERBOSE_LOGGING || defined TORRENT_ERROR_LOGGING
1371 (*m_logger
) << time_now_string()
1372 << " *** REJECTING REQUEST [ peer choked and piece not in allowed fast set ]\n";
1373 (*m_logger
) << time_now_string()
1374 << " ==> REJECT_PIECE [ "
1375 "piece: " << r
.piece
<< " | "
1376 "s: " << r
.start
<< " | "
1377 "l: " << r
.length
<< " ]\n";
// accepted: queue the request and remember when it arrived
1382 m_requests
.push_back(r
);
1383 m_last_incoming_request
= time_now();
1389 #if defined TORRENT_VERBOSE_LOGGING || defined TORRENT_ERROR_LOGGING
1390 (*m_logger
) << time_now_string()
1391 << " <== INVALID_REQUEST [ "
1392 "piece: " << r
.piece
<< " | "
1393 "s: " << r
.start
<< " | "
1394 "l: " << r
.length
<< " | "
1395 "i: " << m_peer_interested
<< " | "
1396 "t: " << t
->torrent_file().piece_size(r
.piece
) << " | "
1397 "n: " << t
->torrent_file().num_pieces() << " | "
1398 "h: " << t
->have_piece(r
.piece
) << " | "
1399 "block_limit: " << t
->block_size() << " ]\n";
1401 (*m_logger
) << time_now_string()
1402 << " ==> REJECT_PIECE [ "
1403 "piece: " << r
.piece
<< " | "
1404 "s: " << r
.start
<< " | "
1405 "l: " << r
.length
<< " ]\n";
1408 write_reject_request(r
);
1409 ++m_num_invalid_requests
;
1411 if (t
->alerts().should_post
<invalid_request_alert
>())
1413 t
->alerts().post_alert(invalid_request_alert(
1414 t
->get_handle(), m_remote
, m_peer_id
, r
));
1419 void peer_connection::incoming_piece_fragment()
1421 m_last_piece
= time_now();
// Debug helper object: runs check() when constructed (unless init_check is
// false) and again when destroyed. The visible body of the check asserts
// that no entry of the picker's download queue reports more finished
// blocks than there are blocks in a piece.
// NOTE(review): the declaration of check() and any guard around its body
// are not visible in this listing -- confirm against the original file.
1425 struct check_postcondition
1427 check_postcondition(boost::shared_ptr
<torrent
> const& t_
1428 , bool init_check
= true): t(t_
) { if (init_check
) check(); }
1430 ~check_postcondition() { check(); }
// number of blocks in a full-sized piece
1436 const int blocks_per_piece
= static_cast<int>(
1437 t
->torrent_file().piece_length() / t
->block_size());
1439 std::vector
<piece_picker::downloading_piece
> const& dl_queue
1440 = t
->picker().get_download_queue();
1442 for (std::vector
<piece_picker::downloading_piece
>::const_iterator i
=
1443 dl_queue
.begin(); i
!= dl_queue
.end(); ++i
)
1445 TORRENT_ASSERT(i
->finished
<= blocks_per_piece
);
// the torrent whose picker state is verified
1450 shared_ptr
<torrent
> t
;
1455 // -----------------------------
1456 // ----------- PIECE -----------
1457 // -----------------------------
1459 void peer_connection::incoming_piece(peer_request
const& p
, char const* data
)
1461 char* buffer
= m_ses
.allocate_disk_buffer();
1464 disconnect("out of memory");
1467 disk_buffer_holder
holder(m_ses
, buffer
);
1468 std::memcpy(buffer
, data
, p
.length
);
1469 incoming_piece(p
, holder
);
// Handles a complete block (request p, payload in the disk buffer `data`)
// received from the remote peer.
// Visible flow:
//  - a missing bitfield is treated as have_none; extensions may consume
//    the block through on_piece()
//  - a zero-length block posts peer_error_alert; a block failing
//    verify_piece() disconnects the peer with error level 2
//  - a block that is not in m_download_queue is counted as redundant,
//    reported through unwanted_block_alert, and new requests are issued
//  - requests queued ahead of the received block are logged as skipped and,
//    once skipped more than m_desired_queue_size times, aborted in the
//    picker and removed (request_dropped_alert)
//  - a block the picker already has is counted as redundant
//  - otherwise the block is handed to the disk thread with async_write(),
//    m_outstanding_writing_bytes grows by p.length, the block is marked as
//    writing in the picker, duplicate requests at other peers are
//    cancelled, and more blocks are requested
// NOTE(review): this listing has lost lines (braces, returns, #endif, some
// conditions) -- confirm the exact control flow against the original file.
1472 void peer_connection::incoming_piece(peer_request
const& p
, disk_buffer_holder
& data
)
1476 boost::shared_ptr
<torrent
> t
= m_torrent
.lock();
1479 TORRENT_ASSERT(!m_disk_recv_buffer
);
1480 TORRENT_ASSERT(m_disk_recv_buffer_size
== 0);
1482 #ifdef TORRENT_CORRUPT_DATA
1483 // corrupt all pieces from certain peers
1484 if (m_remote
.address().is_v4()
1485 && (m_remote
.address().to_v4().to_ulong() & 0xf) == 0)
1487 data
.get()[0] = ~data
.get()[0];
1491 // if we haven't received a bitfield, it was
1492 // probably omitted, which is the same as 'have_none'
1493 if (!m_bitfield_received
) incoming_have_none();
1495 #ifndef TORRENT_DISABLE_EXTENSIONS
1496 for (extension_list_t::iterator i
= m_extensions
.begin()
1497 , end(m_extensions
.end()); i
!= end
; ++i
)
1499 if ((*i
)->on_piece(p
, data
)) return;
1502 if (is_disconnecting()) return;
1505 check_postcondition
post_checker_(t
);
1506 #if !defined TORRENT_DISABLE_INVARIANT_CHECKS
1507 t
->check_invariant();
1511 #ifdef TORRENT_VERBOSE_LOGGING
1512 (*m_logger
) << time_now_string()
1513 << " <== PIECE [ piece: " << p
.piece
<< " | "
1514 "s: " << p
.start
<< " | "
1515 "l: " << p
.length
<< " | "
1516 "ds: " << statistics().download_rate() << " | "
1517 "qs: " << m_desired_queue_size
<< " ]\n";
1522 if (t
->alerts().should_post
<peer_error_alert
>())
1524 t
->alerts().post_alert(peer_error_alert(t
->get_handle(), m_remote
1525 , m_peer_id
, "peer sent 0 length piece"));
1530 if (!verify_piece(p
))
1532 #if defined TORRENT_VERBOSE_LOGGING || defined TORRENT_ERROR_LOGGING
1533 (*m_logger
) << time_now_string()
1534 << " <== INVALID_PIECE [ piece: " << p
.piece
<< " | "
1535 "start: " << p
.start
<< " | "
1536 "length: " << p
.length
<< " ]\n";
1538 disconnect("got invalid piece packet", 2);
1542 // if we're already seeding, don't bother,
1546 t
->add_redundant_bytes(p
.length
);
1550 ptime now
= time_now();
1552 piece_picker
& picker
= t
->picker();
1553 piece_manager
& fs
= t
->filesystem();
1555 std::vector
<piece_block
> finished_blocks
;
// identify the received block by (piece index, block index within piece)
1556 piece_block
block_finished(p
.piece
, p
.start
/ t
->block_size());
1557 TORRENT_ASSERT(p
.start
% t
->block_size() == 0);
1558 TORRENT_ASSERT(p
.length
== t
->block_size()
1559 || p
.length
== t
->torrent_file().total_size() % t
->block_size());
1561 std::deque
<pending_block
>::iterator b
1563 m_download_queue
.begin()
1564 , m_download_queue
.end()
1565 , has_block(block_finished
));
1567 if (b
== m_download_queue
.end())
1569 if (t
->alerts().should_post
<unwanted_block_alert
>())
1571 t
->alerts().post_alert(unwanted_block_alert(t
->get_handle(), m_remote
1572 , m_peer_id
, block_finished
.block_index
, block_finished
.piece_index
));
1574 #if defined TORRENT_VERBOSE_LOGGING || defined TORRENT_ERROR_LOGGING
1575 (*m_logger
) << " *** The block we just got was not in the "
1576 "request queue ***\n";
1578 t
->add_redundant_bytes(p
.length
);
1579 request_a_block(*t
, *this);
1580 send_block_requests();
// every request queued ahead of the received block was answered out of
// order; walk over them
1584 int block_index
= b
- m_download_queue
.begin();
1585 for (int i
= 0; i
< block_index
; ++i
)
1587 pending_block
& qe
= m_download_queue
[i
];
1589 #if defined TORRENT_VERBOSE_LOGGING || defined TORRENT_ERROR_LOGGING
1590 (*m_logger
) << time_now_string()
1591 << " *** SKIPPED_PIECE [ piece: " << qe
.block
.piece_index
<< " | "
1592 "b: " << qe
.block
.block_index
<< " ] ***\n";
1596 // if the number of times a block is skipped by out of order
1597 // blocks exceeds the size of the outstanding queue, assume that
1598 // the other end dropped the request.
1599 if (qe
.skipped
> m_desired_queue_size
)
1601 if (m_ses
.m_alerts
.should_post
<request_dropped_alert
>())
1602 m_ses
.m_alerts
.post_alert(request_dropped_alert(t
->get_handle()
1603 , remote(), pid(), qe
.block
.block_index
, qe
.block
.piece_index
));
1604 picker
.abort_download(qe
.block
);
// NOTE(review): this erases from the deque while it is being indexed, and
// iterator b was obtained before the loop. The matching adjustment of
// i / block_index and the refresh of b are not visible in this listing --
// confirm they exist in the original file.
1605 m_download_queue
.erase(m_download_queue
.begin() + i
);
1611 // if the block we got is already finished, then ignore it
1612 if (picker
.is_downloaded(block_finished
))
1614 t
->add_redundant_bytes(p
.length
);
1616 m_download_queue
.erase(b
);
1617 m_timeout_extend
= 0;
1619 if (!m_download_queue
.empty())
1622 request_a_block(*t
, *this);
1623 send_block_requests();
1627 if (total_seconds(now
- m_requested
)
1628 < m_ses
.settings().request_timeout
1632 if (m_ses
.m_alerts
.should_post
<peer_unsnubbed_alert
>())
1634 m_ses
.m_alerts
.post_alert(peer_unsnubbed_alert(t
->get_handle()
1635 , m_remote
, m_peer_id
));
// hand the payload to the disk thread; on_disk_write_complete() is bound
// as the completion handler
1639 fs
.async_write(p
, data
, bind(&peer_connection::on_disk_write_complete
1640 , self(), _1
, _2
, p
, t
));
1641 m_outstanding_writing_bytes
+= p
.length
;
1642 TORRENT_ASSERT(m_channel_state
[download_channel
] == peer_info::bw_idle
);
1643 m_download_queue
.erase(b
);
1645 if (m_outstanding_writing_bytes
>= m_ses
.settings().max_outstanding_disk_bytes_per_connection
1646 && t
->alerts().should_post
<performance_alert
>())
1648 t
->alerts().post_alert(performance_alert(t
->get_handle()
1649 , performance_alert::outstanding_disk_buffer_limit_reached
));
1652 if (!m_download_queue
.empty())
1654 m_timeout_extend
= (std::max
)(m_timeout_extend
1655 - m_ses
.settings().request_timeout
, 0);
1656 m_requested
+= seconds(m_ses
.settings().request_timeout
);
1657 if (m_requested
> now
) m_requested
= now
;
1661 m_timeout_extend
= 0;
1664 // did we request this block from any other peers?
1665 bool multi
= picker
.num_peers(block_finished
) > 1;
1666 picker
.mark_as_writing(block_finished
, peer_info_struct());
1668 // if we requested this block from other peers, cancel it now
1669 if (multi
) t
->cancel_block(block_finished
);
1671 #if !defined NDEBUG && !defined TORRENT_DISABLE_INVARIANT_CHECKS
1672 t
->check_invariant();
1674 request_a_block(*t
, *this);
1675 send_block_requests();
// Completion handler for the async_write() issued by incoming_piece().
//  ret  result of the disk job; -1 is handled as a failure below
//  j    the finished disk job (j.str and j.error_file are used for errors)
//  p    the request whose payload was written
//  t    the torrent the block belongs to
// Takes the session mutex, subtracts p.length from
// m_outstanding_writing_bytes, and on failure reports write_failed() to the
// picker and posts file_error_alert. On success the block is marked as
// finished in the picker, block_finished_alert is posted, a piece that has
// just become complete is handed to async_verify_piece(), and more blocks
// are requested while the torrent is not a seed.
// NOTE(review): this listing has lost lines (braces, returns, #endif) --
// confirm the exact control flow against the original file.
1678 void peer_connection::on_disk_write_complete(int ret
, disk_io_job
const& j
1679 , peer_request p
, boost::shared_ptr
<torrent
> t
)
1681 session_impl::mutex_t::scoped_lock
l(m_ses
.m_mutex
);
1685 m_outstanding_writing_bytes
-= p
.length
;
1686 TORRENT_ASSERT(m_outstanding_writing_bytes
>= 0);
1688 #if defined(TORRENT_VERBOSE_LOGGING) || defined(TORRENT_LOGGING)
1689 // (*m_ses.m_logger) << time_now_string() << " *** DISK_WRITE_COMPLETE [ p: "
1690 // << p.piece << " o: " << p.start << " ]\n";
1692 // in case the outstanding bytes just dropped down
1693 // to allow to receive more data
// NOTE(review): t is dereferenced here, before the !t test that follows
1696 piece_block
block_finished(p
.piece
, p
.start
/ t
->block_size());
1698 if (ret
== -1 || !t
)
// NOTE(review): this statement is also reached when !t holds, which would
// dereference a null pointer -- confirm and reorder in the original file
1700 if (t
->has_picker()) t
->picker().write_failed(block_finished
);
1704 disconnect(j
.str
.c_str());
1708 if (t
->alerts().should_post
<file_error_alert
>())
1710 t
->alerts().post_alert(file_error_alert(j
.error_file
, t
->get_handle(), j
.str
));
1716 if (t
->is_seed()) return;
1718 piece_picker
& picker
= t
->picker();
1720 TORRENT_ASSERT(p
.piece
== j
.piece
);
1721 TORRENT_ASSERT(p
.start
== j
.offset
);
1722 picker
.mark_as_finished(block_finished
, peer_info_struct());
1723 if (t
->alerts().should_post
<block_finished_alert
>())
1725 t
->alerts().post_alert(block_finished_alert(t
->get_handle(),
1726 remote(), pid(), block_finished
.block_index
, block_finished
.piece_index
));
1729 // did we just finish the piece?
1730 if (picker
.is_piece_finished(p
.piece
))
1733 check_postcondition
post_checker2_(t
, false);
1735 t
->async_verify_piece(p
.piece
, bind(&torrent::piece_finished
, t
1739 if (!t
->is_seed() && !m_torrent
.expired())
1741 // this is a free function defined in policy.cpp
1742 request_a_block(*t
, *this);
1743 send_block_requests();
1748 // -----------------------------
1749 // ---------- CANCEL -----------
1750 // -----------------------------
// Handles a CANCEL message for request r. Extensions may consume it
// through on_cancel(). When r is found in m_requests it is erased and
// answered with write_reject_request(r); the last log statement reports a
// cancel for a request that is not in the queue.
// NOTE(review): braces, the else and the #endif lines are absent from this
// listing -- confirm the branch structure against the original file.
1752 void peer_connection::incoming_cancel(peer_request
const& r
)
1756 #ifndef TORRENT_DISABLE_EXTENSIONS
1757 for (extension_list_t::iterator i
= m_extensions
.begin()
1758 , end(m_extensions
.end()); i
!= end
; ++i
)
1760 if ((*i
)->on_cancel(r
)) return;
1763 if (is_disconnecting()) return;
1765 #ifdef TORRENT_VERBOSE_LOGGING
1766 (*m_logger
) << time_now_string()
1767 << " <== CANCEL [ piece: " << r
.piece
<< " | s: " << r
.start
<< " | l: " << r
.length
<< " ]\n";
// look for an identical pending request from this peer
1770 std::deque
<peer_request
>::iterator i
1771 = std::find(m_requests
.begin(), m_requests
.end(), r
);
1773 if (i
!= m_requests
.end())
1775 m_requests
.erase(i
);
1776 #ifdef TORRENT_VERBOSE_LOGGING
1777 (*m_logger
) << time_now_string()
1778 << " ==> REJECT_PIECE [ "
1779 "piece: " << r
.piece
<< " | "
1780 "s: " << r
.start
<< " | "
1781 "l: " << r
.length
<< " ]\n";
1783 write_reject_request(r
);
1787 #if defined TORRENT_VERBOSE_LOGGING || defined TORRENT_ERROR_LOGGING
1788 (*m_logger
) << time_now_string() << " *** GOT CANCEL NOT IN THE QUEUE\n";
1793 // -----------------------------
1794 // --------- DHT PORT ----------
1795 // -----------------------------
1797 void peer_connection::incoming_dht_port(int listen_port
)
1801 #ifdef TORRENT_VERBOSE_LOGGING
1802 (*m_logger
) << time_now_string()
1803 << " <== DHT_PORT [ p: " << listen_port
<< " ]\n";
1805 #ifndef TORRENT_DISABLE_DHT
1806 m_ses
.add_dht_node(udp::endpoint(
1807 m_remote
.address(), listen_port
));
1811 // -----------------------------
1812 // --------- HAVE ALL ----------
1813 // -----------------------------
// Handles HAVE_ALL: the peer states that it has every piece. Extensions
// may consume the message through on_have_all(). The peer is flagged as a
// seed (when m_peer_info is set) and as upload-only, and the bitfield is
// recorded as received. While the torrent is not ready for connections the
// peer is simply treated as interesting; otherwise every bit of
// m_have_piece is set, m_num_pieces is updated, and interest depends on
// whether our torrent is finished. disconnect_if_redundant() is called on
// both paths.
// NOTE(review): this listing has lost lines (braces, #endif, and
// presumably the return that ends the not-ready branch) -- confirm against
// the original file.
1815 void peer_connection::incoming_have_all()
1819 boost::shared_ptr
<torrent
> t
= m_torrent
.lock();
1822 #ifdef TORRENT_VERBOSE_LOGGING
1823 (*m_logger
) << time_now_string() << " <== HAVE_ALL\n";
1826 #ifndef TORRENT_DISABLE_EXTENSIONS
1827 for (extension_list_t::iterator i
= m_extensions
.begin()
1828 , end(m_extensions
.end()); i
!= end
; ++i
)
1830 if ((*i
)->on_have_all()) return;
1833 if (is_disconnecting()) return;
1837 if (m_peer_info
) m_peer_info
->seed
= true;
1838 m_upload_only
= true;
1839 m_bitfield_received
= true;
1841 #ifdef TORRENT_VERBOSE_LOGGING
1842 (*m_logger
) << " *** THIS IS A SEED ***\n";
1845 // if we don't have metadata yet
1846 // just remember the bitmask
1847 // don't update the piecepicker
1848 // (since it doesn't exist yet)
1849 if (!t
->ready_for_connections())
1851 // assume seeds are interesting when we
1852 // don't even have the metadata
1853 t
->get_policy().peer_is_interesting(*this);
1855 disconnect_if_redundant();
1856 // TODO: this might need something more
1857 // so that once we have the metadata
1858 // we can construct a full bitfield
1862 TORRENT_ASSERT(!m_have_piece
.empty());
1863 m_have_piece
.set_all();
1864 m_num_pieces
= m_have_piece
.size();
1868 // if we're finished, we're not interested
1869 if (t
->is_finished()) send_not_interested();
1870 else t
->get_policy().peer_is_interesting(*this);
1872 disconnect_if_redundant();
1875 // -----------------------------
1876 // --------- HAVE NONE ---------
1877 // -----------------------------
1879 void peer_connection::incoming_have_none()
1883 #ifdef TORRENT_VERBOSE_LOGGING
1884 (*m_logger
) << time_now_string() << " <== HAVE_NONE\n";
1887 boost::shared_ptr
<torrent
> t
= m_torrent
.lock();
1890 #ifndef TORRENT_DISABLE_EXTENSIONS
1891 for (extension_list_t::iterator i
= m_extensions
.begin()
1892 , end(m_extensions
.end()); i
!= end
; ++i
)
1894 if ((*i
)->on_have_none()) return;
1897 if (is_disconnecting()) return;
1898 if (m_peer_info
) m_peer_info
->seed
= false;
1899 m_bitfield_received
= true;
1901 // we're never interested in a peer that doesn't have anything
1902 send_not_interested();
1904 TORRENT_ASSERT(!m_have_piece
.empty() || !t
->ready_for_connections());
1905 disconnect_if_redundant();
1908 // -----------------------------
1909 // ------- ALLOWED FAST --------
1910 // -----------------------------
// Handles ALLOWED_FAST for piece `index`. Extensions may consume the
// message through on_allowed_fast(). The index is range-checked against
// m_have_piece.size(); a piece we already have is ignored (per the comment
// below); otherwise the index is appended to m_allowed_fast, and the peer
// is marked interesting when it has the piece and the picker priority of
// the piece is greater than zero.
// NOTE(review): this listing has lost lines (braces, returns, #endif and
// one term of the last condition) -- confirm against the original file.
1912 void peer_connection::incoming_allowed_fast(int index
)
1916 boost::shared_ptr
<torrent
> t
= m_torrent
.lock();
1919 #ifdef TORRENT_VERBOSE_LOGGING
1920 (*m_logger
) << time_now_string() << " <== ALLOWED_FAST [ " << index
<< " ]\n";
1923 #ifndef TORRENT_DISABLE_EXTENSIONS
1924 for (extension_list_t::iterator i
= m_extensions
.begin()
1925 , end(m_extensions
.end()); i
!= end
; ++i
)
1927 if ((*i
)->on_allowed_fast(index
)) return;
1930 if (is_disconnecting()) return;
1932 if (index
< 0 || index
>= int(m_have_piece
.size()))
1934 #if defined TORRENT_VERBOSE_LOGGING || defined TORRENT_ERROR_LOGGING
1935 (*m_logger
) << time_now_string() << " <== INVALID_ALLOWED_FAST [ " << index
<< " | s: "
1936 << int(m_have_piece
.size()) << " ]\n";
1941 // if we already have the piece, we can
1942 // ignore this message
1943 if (t
->valid_metadata()
1944 && t
->have_piece(index
))
1947 m_allowed_fast
.push_back(index
);
1949 // if the peer has the piece and we want
1950 // to download it, request it
1951 if (int(m_have_piece
.size()) > index
1952 && m_have_piece
[index
]
1954 && t
->picker().piece_priority(index
) > 0)
1956 t
->get_policy().peer_is_interesting(*this);
1960 std::vector
<int> const& peer_connection::allowed_fast()
1962 boost::shared_ptr
<torrent
> t
= m_torrent
.lock();
1965 m_allowed_fast
.erase(std::remove_if(m_allowed_fast
.begin()
1966 , m_allowed_fast
.end(), bind(&torrent::have_piece
, t
, _1
))
1967 , m_allowed_fast
.end());
1969 // TODO: sort the allowed fast set in priority order
1970 return m_allowed_fast
;
// Queues `block` to be requested from this peer. The assertions require
// valid metadata, in-range piece and block indices, a piece we do not have
// yet, and that the block is in neither m_download_queue nor
// m_request_queue. The result of peer_speed() selects the piece_picker
// state (fast / medium / slow) passed to mark_as_downloading();
// block_downloading_alert is posted when wanted and the block is appended
// to m_request_queue.
// NOTE(review): this listing has lost lines (braces, parts of the speed
// branches, and whatever follows a failed mark_as_downloading()) --
// confirm against the original file.
1973 void peer_connection::add_request(piece_block
const& block
)
1977 boost::shared_ptr
<torrent
> t
= m_torrent
.lock();
1980 TORRENT_ASSERT(t
->valid_metadata());
1981 TORRENT_ASSERT(block
.piece_index
>= 0);
1982 TORRENT_ASSERT(block
.piece_index
< t
->torrent_file().num_pieces());
1983 TORRENT_ASSERT(block
.block_index
>= 0);
1984 TORRENT_ASSERT(block
.block_index
< t
->torrent_file().piece_size(block
.piece_index
));
1985 TORRENT_ASSERT(!t
->picker().is_requested(block
) || (t
->picker().num_peers(block
) > 0));
1986 TORRENT_ASSERT(!t
->have_piece(block
.piece_index
));
1987 TORRENT_ASSERT(std::find_if(m_download_queue
.begin(), m_download_queue
.end()
1988 , has_block(block
)) == m_download_queue
.end());
1989 TORRENT_ASSERT(std::find(m_request_queue
.begin(), m_request_queue
.end()
1990 , block
) == m_request_queue
.end());
1992 piece_picker::piece_state_t state
;
1993 peer_speed_t speed
= peer_speed();
// text form of the speed class, passed to block_downloading_alert below
1994 char const* speedmsg
= 0;
1998 state
= piece_picker::fast
;
2000 else if (speed
== medium
)
2002 speedmsg
= "medium";
2003 state
= piece_picker::medium
;
2008 state
= piece_picker::slow
;
2011 if (!t
->picker().mark_as_downloading(block
, peer_info_struct(), state
))
2014 if (t
->alerts().should_post
<block_downloading_alert
>())
2016 t
->alerts().post_alert(block_downloading_alert(t
->get_handle(),
2017 remote(), pid(), speedmsg
, block
.block_index
, block
.piece_index
));
2020 m_request_queue
.push_back(block
);
// Cancels our request for `block` at this peer. Returns immediately when
// the picker no longer considers the block requested. If the block is not
// in m_download_queue it is looked up in m_request_queue: when absent
// there too nothing is done, otherwise the download is aborted in the
// picker and the entry removed without sending anything (it was never
// sent). For a request that was already sent, a peer_request r describing
// the block (piece, byte offset, length) is filled in and a CANCEL is
// logged.
// NOTE(review): this listing has lost lines (braces, returns, the
// declaration of r and block_size, and the statement that sends the
// cancel) -- confirm against the original file.
2023 void peer_connection::cancel_request(piece_block
const& block
)
2027 boost::shared_ptr
<torrent
> t
= m_torrent
.lock();
2028 // this peer might be disconnecting
2031 TORRENT_ASSERT(t
->valid_metadata());
2033 TORRENT_ASSERT(block
.piece_index
>= 0);
2034 TORRENT_ASSERT(block
.piece_index
< t
->torrent_file().num_pieces());
2035 TORRENT_ASSERT(block
.block_index
>= 0);
2036 TORRENT_ASSERT(block
.block_index
< t
->torrent_file().piece_size(block
.piece_index
));
2038 // if all the peers that requested this block has been
2039 // cancelled, then just ignore the cancel.
2040 if (!t
->picker().is_requested(block
)) return;
2042 std::deque
<pending_block
>::iterator it
2043 = std::find_if(m_download_queue
.begin(), m_download_queue
.end(), has_block(block
));
2044 if (it
== m_download_queue
.end())
2046 std::deque
<piece_block
>::iterator rit
= std::find(m_request_queue
.begin()
2047 , m_request_queue
.end(), block
);
2049 // when a multi block is received, it is cancelled
2050 // from all peers, so if this one hasn't requested
2051 // the block, just ignore to cancel it.
2052 if (rit
== m_request_queue
.end()) return;
2054 t
->picker().abort_download(block
);
2055 m_request_queue
.erase(rit
);
2056 // since we found it in the request queue, it means it hasn't been
2057 // sent yet, so we don't have to send a cancel.
// byte offset of the block inside its piece
2061 int block_offset
= block
.block_index
* t
->block_size();
2063 = (std::min
)(t
->torrent_file().piece_size(block
.piece_index
)-block_offset
,
2065 TORRENT_ASSERT(block_size
> 0);
2066 TORRENT_ASSERT(block_size
<= t
->block_size());
2069 r
.piece
= block
.piece_index
;
2070 r
.start
= block_offset
;
2071 r
.length
= block_size
;
2073 #ifdef TORRENT_VERBOSE_LOGGING
2074 (*m_logger
) << time_now_string()
2075 << " ==> CANCEL [ piece: " << block
.piece_index
<< " | s: "
2076 << block_offset
<< " | l: " << block_size
<< " | " << block
.block_index
<< " ]\n";
// Chokes the remote peer; does nothing when it is already choked. Records
// the time in m_last_choke, resets m_num_invalid_requests and walks
// m_requests: per the comment below, requests for pieces in m_accept_fast
// are kept, all others are answered with write_reject_request() and erased.
// NOTE(review): this listing has lost lines (braces, #endif, the body of
// the m_accept_fast branch and the statements that send the choke and set
// m_choked) -- confirm against the original file.
2081 void peer_connection::send_choke()
2085 TORRENT_ASSERT(!m_peer_info
|| !m_peer_info
->optimistically_unchoked
);
2087 if (m_choked
) return;
2091 #ifdef TORRENT_VERBOSE_LOGGING
2092 (*m_logger
) << time_now_string() << " ==> CHOKE\n";
2095 m_last_choke
= time_now();
2097 m_num_invalid_requests
= 0;
2099 // reject the requests we have in the queue
2100 // except the allowed fast pieces
2101 for (std::deque
<peer_request
>::iterator i
= m_requests
.begin();
2102 i
!= m_requests
.end();)
2104 if (m_accept_fast
.count(i
->piece
))
2110 peer_request
const& r
= *i
;
2111 write_reject_request(r
);
2113 #ifdef TORRENT_VERBOSE_LOGGING
2114 (*m_logger
) << time_now_string()
2115 << " ==> REJECT_PIECE [ "
2116 "piece: " << r
.piece
<< " | "
2117 "s: " << r
.start
<< " | "
2118 "l: " << r
.length
<< " ]\n";
// erase() returns the iterator following the removed element, which
// advances the loop
2120 i
= m_requests
.erase(i
);
// Unchokes the remote peer; does nothing when it is not choked. Records
// the time in m_last_unchoke and logs the event.
// NOTE(review): the lines numbered 2130-2132 are absent from this listing;
// the statements that send the message and clear m_choked are presumably
// among them -- confirm against the original file.
2124 void peer_connection::send_unchoke()
2128 if (!m_choked
) return;
2129 m_last_unchoke
= time_now();
2133 #ifdef TORRENT_VERBOSE_LOGGING
2134 (*m_logger
) << time_now_string() << " ==> UNCHOKE\n";
// Declares interest in the remote peer; does nothing when m_interesting is
// already set. The flag is set before the metadata check, so it is
// recorded even when the torrent has no valid metadata yet and the
// function returns early.
// NOTE(review): the line numbered 2144 is absent from this listing; the
// statement that sends the message is presumably there -- confirm against
// the original file.
2138 void peer_connection::send_interested()
2140 if (m_interesting
) return;
2141 m_interesting
= true;
2142 boost::shared_ptr
<torrent
> t
= m_torrent
.lock();
2143 if (!t
->valid_metadata()) return;
2146 #ifdef TORRENT_VERBOSE_LOGGING
2147 (*m_logger
) << time_now_string() << " ==> INTERESTED\n";
2151 void peer_connection::send_not_interested()
2153 if (!m_interesting
) return;
2154 m_interesting
= false;
2155 boost::shared_ptr
<torrent
> t
= m_torrent
.lock();
2156 if (!t
->valid_metadata()) return;
2157 write_not_interested();
2159 m_became_uninteresting
= time_now();
2161 #ifdef TORRENT_VERBOSE_LOGGING
2162 (*m_logger
) << time_now_string() << " ==> NOT_INTERESTED\n";
2164 disconnect_if_redundant();
// Moves blocks from m_request_queue to m_download_queue and issues the
// matching requests until m_download_queue holds m_desired_queue_size
// entries or m_request_queue is empty.
//  - a block popped while the torrent is a seed is skipped
//  - with m_request_large_blocks set, blocks that directly follow the
//    previous one are merged into the same request by growing r.length
//  - extensions may send the request themselves through write_request()
//  - when the download queue goes from empty to non-empty, m_requested is
//    set to the current time
// NOTE(review): this listing has lost lines (braces, #endif, the
// declaration of r, break/continue statements and the call that writes the
// request) -- confirm against the original file.
2167 void peer_connection::send_block_requests()
2171 boost::shared_ptr
<torrent
> t
= m_torrent
.lock();
2174 if ((int)m_download_queue
.size() >= m_desired_queue_size
) return;
2176 bool empty_download_queue
= m_download_queue
.empty();
2178 while (!m_request_queue
.empty()
2179 && (int)m_download_queue
.size() < m_desired_queue_size
)
2181 piece_block block
= m_request_queue
.front();
// the last block of a piece may be shorter than block_size()
2183 int block_offset
= block
.block_index
* t
->block_size();
2184 int block_size
= (std::min
)(t
->torrent_file().piece_size(
2185 block
.piece_index
) - block_offset
, t
->block_size());
2186 TORRENT_ASSERT(block_size
> 0);
2187 TORRENT_ASSERT(block_size
<= t
->block_size());
2190 r
.piece
= block
.piece_index
;
2191 r
.start
= block_offset
;
2192 r
.length
= block_size
;
2194 m_request_queue
.pop_front();
2195 if (t
->is_seed()) continue;
2196 // this can happen if a block times out, is re-requested and
2197 // then arrives "unexpectedly"
2198 if (t
->picker().is_finished(block
) || t
->picker().is_downloaded(block
))
2201 m_download_queue
.push_back(block
);
2203 #ifdef TORRENT_VERBOSE_LOGGING
2204 (*m_logger) << time_now_string()
2205 << " *** REQUEST-QUEUE** [ "
2206 "piece: " << block.piece_index << " | "
2207 "block: " << block.block_index << " ]\n";
2210 // if we are requesting large blocks, merge the smaller
2211 // blocks that are in the same piece into larger requests
2212 if (m_request_large_blocks
)
2214 int blocks_per_piece
= t
->torrent_file().piece_length() / t
->block_size();
2216 while (!m_request_queue
.empty())
2218 // check to see if this block is connected to the previous one
2219 // if it is, merge them, otherwise, break this merge loop
2220 piece_block
const& front
= m_request_queue
.front();
2221 if (front
.piece_index
* blocks_per_piece
+ front
.block_index
2222 != block
.piece_index
* blocks_per_piece
+ block
.block_index
+ 1)
2224 block
= m_request_queue
.front();
2225 m_request_queue
.pop_front();
2226 m_download_queue
.push_back(block
);
2228 #ifdef TORRENT_VERBOSE_LOGGING
2229 (*m_logger
) << time_now_string()
2230 << " *** MERGING REQUEST ** [ "
2231 "piece: " << block
.piece_index
<< " | "
2232 "block: " << block
.block_index
<< " ]\n";
2235 block_offset
= block
.block_index
* t
->block_size();
2236 block_size
= (std::min
)(t
->torrent_file().piece_size(
2237 block
.piece_index
) - block_offset
, t
->block_size());
2238 TORRENT_ASSERT(block_size
> 0);
2239 TORRENT_ASSERT(block_size
<= t
->block_size());
2241 r
.length
+= block_size
;
2245 TORRENT_ASSERT(verify_piece(r
));
2247 #ifndef TORRENT_DISABLE_EXTENSIONS
2248 bool handled
= false;
2249 for (extension_list_t::iterator i
= m_extensions
.begin()
2250 , end(m_extensions
.end()); i
!= end
; ++i
)
// note: this is an assignment, not a comparison -- handled keeps the
// value returned by the extension, and the loop stops at the first
// extension that returns true
2252 if (handled
= (*i
)->write_request(r
)) break;
2254 if (is_disconnecting()) return;
2258 m_last_request
= time_now();
2262 m_last_request
= time_now();
2265 #ifdef TORRENT_VERBOSE_LOGGING
2266 (*m_logger
) << time_now_string()
2267 << " ==> REQUEST [ "
2268 "piece: " << r
.piece
<< " | "
2269 "s: " << r
.start
<< " | "
2270 "l: " << r
.length
<< " | "
2271 "ds: " << statistics().download_rate() << " B/s | "
2272 "qs: " << m_desired_queue_size
<< " "
2273 "blk: " << (m_request_large_blocks
?"large":"single") << " ]\n";
2276 m_last_piece
= time_now();
2278 if (!m_download_queue
.empty()
2279 && empty_download_queue
)
2281 // This means we just added a request to this connection
2282 m_requested
= time_now();
// Called when the connection attempt has timed out: asserts that the
// connection is still in the connecting state, logs the remote address and
// disconnects with error level 1 (unexpected disconnect/error).
// NOTE(review): the end of the log statement and its #endif are absent
// from this listing -- confirm against the original file.
2286 void peer_connection::timed_out()
2288 TORRENT_ASSERT(m_connecting
);
2289 #if defined TORRENT_VERBOSE_LOGGING || defined TORRENT_LOGGING || defined TORRENT_ERROR_LOGGING
2290 (*m_ses
.m_logger
) << time_now_string() << " CONNECTION TIMED OUT: " << m_remote
.address().to_string()
2293 disconnect("timed out: connect", 1);
// Tears this connection down. `message` is written to the log and to the
// posted alert.
// Visible flow: take the session mutex; mark the disconnect as started;
// set m_failed for error > 0; return if already disconnecting; hold an
// intrusive_ptr to this object for the remainder of the call; give back
// the half-open connection ticket when still connecting; post
// peer_error_alert (error > 1) or peer_disconnected_alert (error <= 1);
// hand the statistics to the torrent; abort every block in
// m_download_queue and m_request_queue in the picker; remove this peer
// from the torrent; set m_disconnecting; close the socket and notify the
// session through close_connection().
// NOTE(review): this listing has lost lines (braces, #endif, the switch on
// `error` around the log statements, the test on t, the declaration of
// ec) -- confirm against the original file.
2296 // the error argument defaults to 0, which means deliberate disconnect
2297 // 1 means unexpected disconnect/error
2298 // 2 protocol error (client sent something invalid)
2299 void peer_connection::disconnect(char const* message
, int error
)
2301 session_impl::mutex_t::scoped_lock
l(m_ses
.m_mutex
);
2304 m_disconnect_started
= true;
2307 #if defined TORRENT_VERBOSE_LOGGING || defined TORRENT_ERROR_LOGGING
2311 (*m_logger
) << "*** CONNECTION CLOSED " << message
<< "\n";
2314 (*m_logger
) << "*** CONNECTION FAILED " << message
<< "\n";
2317 (*m_logger
) << "*** PEER ERROR " << message
<< "\n";
2321 // we cannot do this in a constructor
2322 TORRENT_ASSERT(m_in_constructor
== false);
2323 if (error
> 0) m_failed
= true;
2324 if (m_disconnecting
) return;
// NOTE(review): presumably keeps this object alive until the function
// returns, in case the calls below drop the last other reference --
// confirm
2325 boost::intrusive_ptr
<peer_connection
> me(this);
2329 if (m_connecting
&& m_connection_ticket
>= 0)
2331 m_ses
.m_half_open
.done(m_connection_ticket
);
2332 m_connection_ticket
= -1;
2335 boost::shared_ptr
<torrent
> t
= m_torrent
.lock();
2336 torrent_handle handle
;
2337 if (t
) handle
= t
->get_handle();
2341 if (error
> 1 && m_ses
.m_alerts
.should_post
<peer_error_alert
>())
2343 m_ses
.m_alerts
.post_alert(
2344 peer_error_alert(handle
, remote(), pid(), message
));
2346 else if (error
<= 1 && m_ses
.m_alerts
.should_post
<peer_disconnected_alert
>())
2348 m_ses
.m_alerts
.post_alert(
2349 peer_disconnected_alert(handle
, remote(), pid(), message
));
2355 // make sure we keep all the stats!
2357 t
->add_stats(statistics());
2359 if (t
->has_picker())
2361 piece_picker
& picker
= t
->picker();
2363 while (!m_download_queue
.empty())
2365 picker
.abort_download(m_download_queue
.back().block
);
2366 m_download_queue
.pop_back();
2368 while (!m_request_queue
.empty())
2370 picker
.abort_download(m_request_queue
.back());
2371 m_request_queue
.pop_back();
2375 t
->remove_peer(this);
2380 // since this connection doesn't have a torrent reference
2381 // no torrent should have a reference to this connection either
2382 for (aux::session_impl::torrent_map::const_iterator i
= m_ses
.m_torrents
.begin()
2383 , end(m_ses
.m_torrents
.end()); i
!= end
; ++i
)
2384 TORRENT_ASSERT(!i
->second
->has_peer(this));
2387 m_disconnecting
= true;
2389 m_socket
->close(ec
);
2390 m_ses
.close_connection(this, message
);
2393 void peer_connection::set_upload_limit(int limit
)
2395 TORRENT_ASSERT(limit
>= -1);
2396 if (limit
== -1) limit
= (std::numeric_limits
<int>::max
)();
2397 if (limit
< 10) limit
= 10;
2398 m_upload_limit
= limit
;
2399 m_bandwidth_limit
[upload_channel
].throttle(m_upload_limit
);
2402 void peer_connection::set_download_limit(int limit
)
2404 TORRENT_ASSERT(limit
>= -1);
2405 if (limit
== -1) limit
= (std::numeric_limits
<int>::max
)();
2406 if (limit
< 10) limit
= 10;
2407 m_download_limit
= limit
;
2408 m_bandwidth_limit
[download_channel
].throttle(m_download_limit
);
// Returns the share balance of this peer: m_free_upload plus the payload
// we downloaded from it scaled by the torrent's ratio, minus the payload
// we uploaded to it. Per the comment below, an infinite ratio yields the
// largest size_type value instead.
// NOTE(review): the condition guarding the first return (the line numbered
// 2422) is absent from this listing, so how an infinite ratio is detected
// cannot be seen here -- confirm against the original file.
2411 size_type
peer_connection::share_diff() const
2415 boost::shared_ptr
<torrent
> t
= m_torrent
.lock();
2418 float ratio
= t
->ratio();
2420 // if we have an infinite ratio, just say we have downloaded
2421 // much more than we have uploaded. And we'll keep uploading.
2423 return (std::numeric_limits
<size_type
>::max
)();
2425 return m_free_upload
2426 + static_cast<size_type
>(m_statistics
.total_payload_download() * ratio
)
2427 - m_statistics
.total_payload_upload();
2430 // defined in upnp.cpp
2431 bool is_local(address
const& a
);
2433 bool peer_connection::on_local_network() const
2435 if (libtorrent::is_local(m_remote
.address())
2436 || is_loopback(m_remote
.address())) return true;
// Fills p with a snapshot of this connection's state: rate peaks and
// current rates, pending disk bytes, bandwidth quotas and limits (-1 is
// reported for an unlimited throttle), request timeout, payload totals,
// queue lengths, progress of the block currently being downloaded (-1 / 0
// when there is none), the peer's piece bitfield, time since the last
// request and last activity, status flags, the values copied from the
// policy::peer entry when one exists, and buffer sizes and channel states.
// NOTE(review): this listing has lost lines (braces, else, #endif, the end
// of the request_timeout expression and some assignments) -- confirm
// against the original file.
2440 void peer_connection::get_peer_info(peer_info
& p
) const
2442 TORRENT_ASSERT(!associated_torrent().expired());
2444 ptime now
= time_now();
2446 p
.download_rate_peak
= m_download_rate_peak
;
2447 p
.upload_rate_peak
= m_upload_rate_peak
;
2449 p
.down_speed
= statistics().download_rate();
2450 p
.up_speed
= statistics().upload_rate();
2451 p
.payload_down_speed
= statistics().download_payload_rate();
2452 p
.payload_up_speed
= statistics().upload_payload_rate();
2455 p
.pending_disk_bytes
= m_outstanding_writing_bytes
;
2456 p
.send_quota
= m_bandwidth_limit
[upload_channel
].quota_left();
2457 p
.receive_quota
= m_bandwidth_limit
[download_channel
].quota_left();
// -1 is reported when nothing is outstanding
2458 if (m_download_queue
.empty()) p
.request_timeout
= -1;
2459 else p
.request_timeout
= total_seconds(m_requested
- now
) + m_ses
.settings().request_timeout
2461 #ifndef TORRENT_DISABLE_GEO_IP
2462 p
.inet_as_name
= m_inet_as_name
;
2465 #ifndef TORRENT_DISABLE_RESOLVE_COUNTRIES
2466 p
.country
[0] = m_country
[0];
2467 p
.country
[1] = m_country
[1];
2470 p
.total_download
= statistics().total_payload_download();
2471 p
.total_upload
= statistics().total_payload_upload();
2473 if (m_bandwidth_limit
[upload_channel
].throttle() == bandwidth_limit::inf
)
2474 p
.upload_limit
= -1;
2476 p
.upload_limit
= m_bandwidth_limit
[upload_channel
].throttle();
2478 if (m_bandwidth_limit
[download_channel
].throttle() == bandwidth_limit::inf
)
2479 p
.download_limit
= -1;
2481 p
.download_limit
= m_bandwidth_limit
[download_channel
].throttle();
2483 p
.load_balancing
= total_free_upload();
2485 p
.download_queue_length
= int(download_queue().size() + m_request_queue
.size());
2486 p
.requests_in_buffer
= int(m_requests_in_buffer
.size());
2487 p
.target_dl_queue_length
= int(desired_queue_size());
2488 p
.upload_queue_length
= int(upload_queue().size());
2490 if (boost::optional
<piece_block_progress
> ret
= downloading_piece_progress())
2492 p
.downloading_piece_index
= ret
->piece_index
;
2493 p
.downloading_block_index
= ret
->block_index
;
2494 p
.downloading_progress
= ret
->bytes_downloaded
;
2495 p
.downloading_total
= ret
->full_block_bytes
;
2499 p
.downloading_piece_index
= -1;
2500 p
.downloading_block_index
= -1;
2501 p
.downloading_progress
= 0;
2502 p
.downloading_total
= 0;
2505 p
.pieces
= get_bitfield();
2506 p
.last_request
= now
- m_last_request
;
2507 p
.last_active
= now
- (std::max
)(m_last_sent
, m_last_receive
);
2509 // this will set the flags so that we can update them later
2511 get_specific_peer_info(p
);
2513 p
.flags
|= is_seed() ? peer_info::seed
: 0;
2514 p
.flags
|= m_snubbed
? peer_info::snubbed
: 0;
2515 p
.flags
|= m_upload_only
? peer_info::upload_only
: 0;
2516 if (peer_info_struct())
2518 policy::peer
* pi
= peer_info_struct();
2519 p
.source
= pi
->source
;
2520 p
.failcount
= pi
->failcount
;
2521 p
.num_hashfails
= pi
->hashfails
;
2522 p
.flags
|= pi
->on_parole
? peer_info::on_parole
: 0;
2523 p
.flags
|= pi
->optimistically_unchoked
? peer_info::optimistic_unchoke
: 0;
2524 #ifndef TORRENT_DISABLE_GEO_IP
2525 p
.inet_as
= pi
->inet_as
->first
;
2532 p
.num_hashfails
= 0;
2533 p
.remote_dl_rate
= 0;
2534 #ifndef TORRENT_DISABLE_GEO_IP
2539 p
.remote_dl_rate
= m_remote_dl_rate
;
2540 p
.send_buffer_size
= m_send_buffer
.capacity();
2541 p
.used_send_buffer
= m_send_buffer
.size();
2542 p
.receive_buffer_size
= m_recv_buffer
.capacity() + m_disk_recv_buffer_size
;
2543 p
.used_receive_buffer
= m_recv_pos
;
2544 p
.write_state
= m_channel_state
[upload_channel
];
2545 p
.read_state
= m_channel_state
[download_channel
];
2548 // allocates a disk buffer of size 'disk_buffer_size' and replaces the
2549 // end of the current receive buffer with it. i.e. the receive pos
2550 // must be <= packet_size - disk_buffer_size
2551 // the disk buffer can be accessed through release_disk_receive_buffer()
2552 // when it is queried, the responsibility to free it is transferred
// disk_buffer_size: number of bytes of the current packet that should live
// in the disk buffer; values above 16 kiB are rejected below.
// NOTE(review): the return statements are not visible in this listing;
// presumably false is returned on the two disconnect paths - confirm.
2554 bool peer_connection::allocate_disk_receive_buffer(int disk_buffer_size
)
2558 TORRENT_ASSERT(m_packet_size
> 0);
2559 TORRENT_ASSERT(m_recv_pos
<= m_packet_size
- disk_buffer_size
);
2560 TORRENT_ASSERT(!m_disk_recv_buffer
);
2561 TORRENT_ASSERT(disk_buffer_size
<= 16 * 1024);
// besides the assert above, an oversized request also
// disconnects the peer
2563 if (disk_buffer_size
> 16 * 1024)
2565 disconnect("invalid piece size", 2);
// take a disk buffer from the session; a null result is
// reported as "out of memory"
2569 m_disk_recv_buffer
.reset(m_ses
.allocate_disk_buffer());
2570 if (!m_disk_recv_buffer
)
2572 disconnect("out of memory");
// remember how many bytes of the packet go into the disk buffer
2575 m_disk_recv_buffer_size
= disk_buffer_size
;
// gives up the disk receive buffer: the recorded disk buffer size is
// reset to 0 and the pointer obtained from m_disk_recv_buffer.release()
// is returned to the caller
2579 char* peer_connection::release_disk_receive_buffer()
2581 m_disk_recv_buffer_size
= 0;
2582 return m_disk_recv_buffer
.release();
// discards the first 'size' bytes of the receive buffer by moving the
// bytes that follow them to the front, and sets the expected packet
// size to 'packet_size'.
// size: number of leading bytes to drop, must not exceed m_recv_pos
// packet_size: the new value of m_packet_size, must be > 0
// NOTE(review): some original lines are not visible in this listing
// (e.g. any update of m_recv_pos and any conditions around the memmove
// and the fill) - confirm against the full file.
2585 void peer_connection::cut_receive_buffer(int size
, int packet_size
)
2589 TORRENT_ASSERT(packet_size
> 0);
2590 TORRENT_ASSERT(int(m_recv_buffer
.size()) >= size
);
2591 TORRENT_ASSERT(int(m_recv_buffer
.size()) >= m_recv_pos
);
2592 TORRENT_ASSERT(m_recv_pos
>= size
);
// shift the m_recv_pos - size bytes following the cut down to
// the start of the buffer (source and destination may overlap,
// hence memmove)
2595 std::memmove(&m_recv_buffer
[0], &m_recv_buffer
[0] + size
, m_recv_pos
- size
);
// zero everything from m_recv_pos to the end of the buffer
2600 std::fill(m_recv_buffer
.begin() + m_recv_pos
, m_recv_buffer
.end(), 0);
2603 m_packet_size
= packet_size
;
// forwards to calc_ip_overhead() on this connection's statistics object
2606 void peer_connection::calc_ip_overhead()
2608 m_statistics
.calc_ip_overhead();
// periodic housekeeping for this connection. In the order visible here:
// checks the various time-outs (inactivity, handshake, no request while
// unchoked, mutual lack of interest, request time-out), updates the
// statistics and peak rates, recomputes the desired request queue size,
// re-requests timed out pieces, adjusts the upload throttle according to
// the torrent's share ratio and refreshes the remote download rate.
// tick_interval: passed on unchanged to m_statistics.second_tick()
// NOTE(review): a number of original lines (braces, else-branches and
// parts of some conditions) are not visible in this listing.
2611 void peer_connection::second_tick(float tick_interval
)
2613 ptime
now(time_now());
// hold a reference to ourselves while this function runs
2614 boost::intrusive_ptr
<peer_connection
> me(self());
2616 // the invariant check must be run before me is destructed
2617 // in case the peer got disconnected
2620 boost::shared_ptr
<torrent
> t
= m_torrent
.lock();
// without a torrent (or when already disconnecting) the half-open
// ticket is handed back and the connection is closed
2621 if (!t
|| m_disconnecting
)
2623 m_ses
.m_half_open
.done(m_connection_ticket
);
2624 m_connecting
= false;
2625 disconnect("torrent aborted");
2631 #ifndef TORRENT_DISABLE_EXTENSIONS
2632 for (extension_list_t::iterator i
= m_extensions
.begin()
2633 , end(m_extensions
.end()); i
!= end
; ++i
)
2637 if (is_disconnecting()) return;
2640 // if the peer hasn't said a thing for a certain
2641 // time, it is considered to have timed out
2643 d
= now
- m_last_receive
;
2644 if (d
> seconds(m_timeout
) && !m_connecting
)
2646 #if defined TORRENT_VERBOSE_LOGGING || defined TORRENT_ERROR_LOGGING
2647 (*m_logger
) << time_now_string() << " *** LAST ACTIVITY [ "
2648 << total_seconds(d
) << " seconds ago ] ***\n";
2650 disconnect("timed out: inactivity");
2654 // do not stall waiting for a handshake
2657 && d
> seconds(m_ses
.settings().handshake_timeout
))
2659 #if defined TORRENT_VERBOSE_LOGGING || defined TORRENT_ERROR_LOGGING
2660 (*m_logger
) << time_now_string() << " *** NO HANDSHAKE [ waited "
2661 << total_seconds(d
) << " seconds ] ***\n";
2663 disconnect("timed out: no handshake");
2667 // disconnect peers that we unchoked, but
2668 // they didn't send a request within 20 seconds.
2669 // but only if we're a seed
2670 d
= now
- (std::max
)(m_last_unchoke
, m_last_incoming_request
);
2672 && m_requests
.empty()
2674 && m_peer_interested
2675 && t
&& t
->is_finished()
2678 #if defined TORRENT_VERBOSE_LOGGING || defined TORRENT_ERROR_LOGGING
2679 (*m_logger
) << time_now_string() << " *** NO REQUEST [ t: "
2680 << total_seconds(d
) << " ] ***\n";
2682 disconnect("timed out: no request when unchoked");
2686 // if the peer hasn't become interested and we haven't
2687 // become interested in the peer for 10 minutes, it
2688 // has also timed out.
2691 d1
= now
- m_became_uninterested
;
2692 d2
= now
- m_became_uninteresting
;
// the limit actually used comes from the inactivity_timeout setting
2693 time_duration time_limit
= seconds(
2694 m_ses
.settings().inactivity_timeout
);
2696 // don't bother disconnect peers we haven't been interested
2697 // in (and that hasn't been interested in us) for a while
2698 // unless we have used up all our connection slots
2700 && !m_peer_interested
2703 && (m_ses
.num_connections() >= m_ses
.max_connections()
2704 || (t
&& t
->num_peers() >= t
->max_connections())))
2706 #if defined TORRENT_VERBOSE_LOGGING || defined TORRENT_ERROR_LOGGING
2707 (*m_logger
) << time_now_string() << " *** MUTUAL NO INTEREST [ "
2708 "t1: " << total_seconds(d1
) << " | "
2709 "t2: " << total_seconds(d2
) << " ] ***\n";
2711 disconnect("timed out: no interest");
// an outstanding request is overdue once request_timeout plus the
// accumulated m_timeout_extend seconds have passed since m_requested
// NOTE(review): the action taken is not visible in this listing
2715 if (!m_download_queue
.empty()
2716 && now
> m_requested
+ seconds(m_ses
.settings().request_timeout
2717 + m_timeout_extend
))
2722 // if we haven't sent something in too long, send a keep-alive
// limits are only ignored when the setting allows it and the
// peer is on the local network
2725 m_ignore_bandwidth_limits
= m_ses
.settings().ignore_limits_on_local_network
2726 && on_local_network();
2728 m_statistics
.second_tick(tick_interval
);
// keep track of the highest payload rates seen so far
2730 if (m_statistics
.upload_payload_rate() > m_upload_rate_peak
)
2732 m_upload_rate_peak
= m_statistics
.upload_payload_rate();
2734 if (m_statistics
.download_payload_rate() > m_download_rate_peak
)
2736 m_download_rate_peak
= m_statistics
.download_payload_rate();
2737 #ifndef TORRENT_DISABLE_GEO_IP
// raise the value stored for this peer's inet_as entry to our
// peak download rate if the peak is higher
2738 if (peer_info_struct())
2740 std::pair
<const int, int>* as_stats
= peer_info_struct()->inet_as
;
2741 if (as_stats
&& as_stats
->second
< m_download_rate_peak
)
2742 as_stats
->second
= m_download_rate_peak
;
2746 if (is_disconnecting()) return;
2748 if (!t
->valid_metadata()) return;
2750 // calculate the desired download queue size
2751 const float queue_time
= m_ses
.settings().request_queue_time
;
2752 // (if the latency is more than this, the download will stall)
2753 // so, the queue size is queue_time * down_rate / 16 kiB
2754 // (16 kB is the size of each request)
2755 // the minimum number of requests is 2 and the maximum is 48
2756 // the block size doesn't have to be 16. So we first query the
2758 const int block_size
= m_request_large_blocks
2759 ? t
->torrent_file().piece_length() : t
->block_size();
2760 TORRENT_ASSERT(block_size
> 0);
// NOTE(review): the condition selecting between the fixed size of 1
// and the computed size is not visible in this listing
2764 m_desired_queue_size
= 1;
// queue_time * download rate / block size, then clamped to
// [min_request_queue, m_max_out_request_queue]
2768 m_desired_queue_size
= static_cast<int>(queue_time
2769 * statistics().download_rate() / block_size
);
2770 if (m_desired_queue_size
> m_max_out_request_queue
)
2771 m_desired_queue_size
= m_max_out_request_queue
;
2772 if (m_desired_queue_size
< min_request_queue
)
2773 m_desired_queue_size
= min_request_queue
;
// let the client know when the queue size is sitting at the cap
2775 if (m_desired_queue_size
== m_max_out_request_queue
2776 && t
->alerts().should_post
<performance_alert
>())
2778 t
->alerts().post_alert(performance_alert(t
->get_handle()
2779 , performance_alert::outstanding_request_limit_reached
));
2783 if (!m_download_queue
.empty()
2784 && now
- m_last_piece
> seconds(m_ses
.settings().piece_timeout
2785 + m_timeout_extend
))
2787 // this peer isn't sending the pieces we've
2788 // requested (this has been observed by BitComet)
2789 // in this case we'll clear our download queue and
2790 // re-request the blocks.
2791 #if defined TORRENT_VERBOSE_LOGGING || defined TORRENT_ERROR_LOGGING
2792 (*m_logger
) << time_now_string()
2793 << " *** PIECE_REQUESTS TIMED OUT [ " << (int)m_download_queue
.size()
2794 << " " << total_seconds(now
- m_last_piece
) << "] ***\n";
2800 // If the client sends more data
2801 // we send it data faster, otherwise, slower.
2802 // It will also depend on how much data the
2803 // client has sent us. This is the mean to
2804 // maintain the share ratio given by m_ratio
// when the torrent is finished, the peer is choked or no ratio is
// set, the throttle is simply the configured m_upload_limit
2807 if (t
->is_finished() || is_choked() || t
->ratio() == 0.0f
)
2809 // if we have downloaded more than one piece more
2810 // than we have uploaded OR if we are a seed
2811 // have an unlimited upload rate
2812 m_bandwidth_limit
[upload_channel
].throttle(m_upload_limit
);
// otherwise derive the throttle from the payload balance: 'bias' is
// head room (64 kiB + two blocks + the free upload) granted on top of
// what the peer is expected to have sent us within break_even_time
2816 size_type bias
= 0x10000 + 2 * t
->block_size() + m_free_upload
;
2818 double break_even_time
= 15; // seconds.
2819 size_type have_uploaded
= m_statistics
.total_payload_upload();
2820 size_type have_downloaded
= m_statistics
.total_payload_download();
2821 double download_speed
= m_statistics
.download_rate();
2823 size_type soon_downloaded
=
2824 have_downloaded
+ (size_type
)(download_speed
* break_even_time
*1.5);
// scale the projected download by the torrent's ratio
2826 if (t
->ratio() != 1.f
)
2827 soon_downloaded
= (size_type
)(soon_downloaded
*(double)t
->ratio());
2829 double upload_speed_limit
= (std::min
)((soon_downloaded
- have_uploaded
2830 + bias
) / break_even_time
, double(m_upload_limit
));
// keep the value representable as an int before the cast below
2832 upload_speed_limit
= (std::min
)(upload_speed_limit
,
2833 (double)(std::numeric_limits
<int>::max
)());
// never throttle below 20
// NOTE(review): the upper bound passed to the outer std::min is not
// visible in this listing
2835 m_bandwidth_limit
[upload_channel
].throttle(
2836 (std::min
)((std::max
)((int)upload_speed_limit
, 20)
2840 // update once every minute
2841 if (now
- m_remote_dl_update
>= seconds(60))
2843 float factor
= 0.6666666666667f
;
// with no previous estimate, use the new sample as is
2845 if (m_remote_dl_rate
== 0) factor
= 0.0f
;
// blend the old estimate with the average rate over the
// last 60 seconds
2847 m_remote_dl_rate
= int((m_remote_dl_rate
* factor
) +
2848 ((m_remote_bytes_dled
* (1.0f
-factor
)) / 60.f
));
2850 m_remote_bytes_dled
= 0;
2851 m_remote_dl_update
= now
;
// handles a peer that is not answering our requests. From the visible
// code: optionally posts a peer_snubbed_alert, drops the desired queue
// size to 1, extends the time-out, and times out the last queued block
// so that it can be requested again (possibly from another peer).
// NOTE(review): several original lines (braces, else-branches, early
// returns and the condition guarding the snubbed alert) are not visible
// in this listing.
2857 void peer_connection::snub_peer()
2861 boost::shared_ptr
<torrent
> t
= m_torrent
.lock();
2867 if (m_ses
.m_alerts
.should_post
<peer_snubbed_alert
>())
2869 m_ses
.m_alerts
.post_alert(peer_snubbed_alert(t
->get_handle()
2870 , m_remote
, m_peer_id
));
2873 m_desired_queue_size
= 1;
// give the remaining requests another request_timeout seconds
2877 m_timeout_extend
+= m_ses
.settings().request_timeout
;
// nothing more to do without a piece picker
2880 if (!t
->has_picker()) return;
2881 piece_picker
& picker
= t
->picker();
// (-1, -1) marks "no block selected"
2883 piece_block
r(-1, -1);
2884 // time out the last request in the queue
// requests not yet sent are taken first ...
2885 if (!m_request_queue
.empty())
2887 r
= m_request_queue
.back();
2888 m_request_queue
.pop_back();
// ... otherwise the last request in the download queue is used
2892 TORRENT_ASSERT(!m_download_queue
.empty());
2893 r
= m_download_queue
.back().block
;
2895 // only time out a request if it blocks the piece
2896 // from being completed (i.e. no free blocks to
2898 piece_picker::downloading_piece p
;
2899 picker
.piece_info(r
.piece_index
, p
);
// blocks of the piece that are neither finished, being written
// nor requested
2900 int free_blocks
= picker
.blocks_in_piece(r
.piece_index
)
2901 - p
.finished
- p
.writing
- p
.requested
;
2902 if (free_blocks
> 0)
2904 m_timeout_extend
+= m_ses
.settings().request_timeout
;
2908 if (m_ses
.m_alerts
.should_post
<block_timeout_alert
>())
2910 m_ses
.m_alerts
.post_alert(block_timeout_alert(t
->get_handle()
2911 , remote(), pid(), r
.block_index
, r
.piece_index
));
2913 m_download_queue
.pop_back();
2915 if (!m_download_queue
.empty() || !m_request_queue
.empty())
2916 m_timeout_extend
+= m_ses
.settings().request_timeout
;
// temporarily allow a queue of 2 while picking the replacement
// block, then go back to 1
2918 m_desired_queue_size
= 2;
2919 request_a_block(*t
, *this);
2920 m_desired_queue_size
= 1;
2922 // abort the block after the new one has
2923 // been requested in order to prevent it from
2924 // picking the same block again, stalling the
2925 // same piece indefinitely.
2926 if (r
!= piece_block(-1, -1))
2927 picker
.abort_download(r
);
2929 send_block_requests();
// starts disk reads for queued peer requests (m_requests) for as long as
// the send buffer plus the bytes already being read stay below a
// watermark. Each read completes in on_disk_read_complete().
// NOTE(review): some original lines are not visible in this listing.
2932 void peer_connection::fill_send_buffer()
2936 boost::shared_ptr
<torrent
> t
= m_torrent
.lock();
2939 // only add new piece-chunks if the send buffer is small enough
2940 // otherwise there will be no end to how large it will be!
// the watermark is half the current upload rate, kept within
// [512, settings().send_buffer_watermark]
2942 int buffer_size_watermark
= int(m_statistics
.upload_rate()) / 2;
2943 if (buffer_size_watermark
< 512) buffer_size_watermark
= 512;
2944 else if (buffer_size_watermark
> m_ses
.settings().send_buffer_watermark
)
2945 buffer_size_watermark
= m_ses
.settings().send_buffer_watermark
;
2947 while (!m_requests
.empty()
2948 && (send_buffer_size() + m_reading_bytes
< buffer_size_watermark
))
2950 TORRENT_ASSERT(t
->valid_metadata());
2951 peer_request
& r
= m_requests
.front();
2953 TORRENT_ASSERT(r
.piece
>= 0);
2954 TORRENT_ASSERT(r
.piece
< (int)m_have_piece
.size());
2955 TORRENT_ASSERT(t
->have_piece(r
.piece
));
2956 TORRENT_ASSERT(r
.start
+ r
.length
<= t
->torrent_file().piece_size(r
.piece
));
2957 TORRENT_ASSERT(r
.length
> 0 && r
.start
>= 0);
// the request is bound into the completion handler together
// with self()
2959 t
->filesystem().async_read(r
, bind(&peer_connection::on_disk_read_complete
2960 , self(), _1
, _2
, r
));
// account for the bytes in flight; on_disk_read_complete()
// subtracts them again
2961 m_reading_bytes
+= r
.length
;
// the request has been handed to the disk layer, drop it from
// the front of the queue
2963 m_requests
.erase(m_requests
.begin());
// completion handler for the disk reads started in fill_send_buffer().
// ret: result of the read, compared against r.length below
// j: the finished disk job; its buffer, str and error_file members are used
// r: the peer request the read was issued for
// NOTE(review): some original lines (braces, early returns and some
// conditions) are not visible in this listing.
2967 void peer_connection::on_disk_read_complete(int ret
, disk_io_job
const& j
, peer_request r
)
2969 session_impl::mutex_t::scoped_lock
l(m_ses
.m_mutex
);
// the read is no longer in flight
2971 m_reading_bytes
-= r
.length
;
// wrap the job's buffer in a holder object
2973 disk_buffer_holder
buffer(m_ses
, j
.buffer
);
// anything but a full read, or a torrent that has gone away,
// is handled as a failure
2975 if (ret
!= r
.length
|| m_torrent
.expired())
2977 boost::shared_ptr
<torrent
> t
= m_torrent
.lock();
2980 disconnect(j
.str
.c_str());
// report the error to the client and record it on the torrent
2984 if (t
->alerts().should_post
<file_error_alert
>())
2985 t
->alerts().post_alert(file_error_alert(j
.error_file
, t
->get_handle(), j
.str
));
2986 t
->set_error(j
.str
);
2991 #ifdef TORRENT_VERBOSE_LOGGING
2992 (*m_logger
) << time_now_string()
2993 << " ==> PIECE [ piece: " << r
.piece
<< " | s: " << r
.start
2994 << " | l: " << r
.length
<< " ]\n";
// pass the request and the data on to write_piece()
2997 write_piece(r
, buffer
);
// hands 'amount' of bandwidth quota to this connection on 'channel'
// (upload_channel or download_channel) and puts the channel back into
// the bw_idle state.
// NOTE(review): the bodies of the two per-channel branches at the end are
// not visible in this listing.
3001 void peer_connection::assign_bandwidth(int channel
, int amount
)
3003 session_impl::mutex_t::scoped_lock
l(m_ses
.m_mutex
);
3005 #ifdef TORRENT_VERBOSE_LOGGING
3006 (*m_logger
) << "bandwidth [ " << channel
<< " ] + " << amount
<< "\n";
3009 m_bandwidth_limit
[channel
].assign(amount
);
// the channel is expected to be in the bw_global state when
// bandwidth is assigned
3010 TORRENT_ASSERT(m_channel_state
[channel
] == peer_info::bw_global
);
3011 m_channel_state
[channel
] = peer_info::bw_idle
;
3012 if (channel
== upload_channel
)
3016 else if (channel
== download_channel
)
// forwards 'amount' to expire() on the bandwidth limit of 'channel'
// (upload_channel or download_channel).
// NOTE(review): the bodies of the two per-channel branches are not visible
// in this listing.
3022 void peer_connection::expire_bandwidth(int channel
, int amount
)
3024 session_impl::mutex_t::scoped_lock
l(m_ses
.m_mutex
);
3026 m_bandwidth_limit
[channel
].expire(amount
);
3027 if (channel
== upload_channel
)
3031 else if (channel
== download_channel
)
// starts an asynchronous write of the send buffer if the upload channel
// is idle. When there is data but no quota, bandwidth is requested from
// the torrent instead.
// NOTE(review): some original lines (braces, returns and parts of the
// conditions) are not visible in this listing.
3037 void peer_connection::setup_send()
3039 session_impl::mutex_t::scoped_lock
l(m_ses
.m_mutex
);
// only one upload operation may be outstanding at a time
3041 if (m_channel_state
[upload_channel
] != peer_info::bw_idle
) return;
3043 shared_ptr
<torrent
> t
= m_torrent
.lock();
3045 if (m_bandwidth_limit
[upload_channel
].quota_left() == 0
3046 && !m_send_buffer
.empty()
3049 && !m_ignore_bandwidth_limits
)
3051 // in this case, we have data to send, but no
3052 // bandwidth. So, we simply request bandwidth
3055 if (m_bandwidth_limit
[upload_channel
].max_assignable() > 0)
// the priority grows by 2 if we are interested in the peer and
// by one for every request still in the send buffer
3057 int priority
= is_interesting() * 2 + m_requests_in_buffer
.size();
3058 // peers that we are not interested in are non-prioritized
3059 m_channel_state
[upload_channel
] = peer_info::bw_torrent
;
3060 t
->request_bandwidth(upload_channel
, self()
3061 , m_send_buffer
.size(), priority
);
3062 #ifdef TORRENT_VERBOSE_LOGGING
3063 (*m_logger
) << time_now_string() << " *** REQUEST_BANDWIDTH [ upload prio: "
3064 << priority
<< "]\n";
// NOTE(review): this log line is on the upload path but prints the
// quota of download_channel; upload_channel may have been intended -
// confirm
3073 #ifdef TORRENT_VERBOSE_LOGGING
3074 (*m_logger
) << time_now_string() << " *** CANNOT WRITE ["
3075 " quota: " << m_bandwidth_limit
[download_channel
].quota_left() <<
3076 " ignore: " << (m_ignore_bandwidth_limits
?"yes":"no") <<
3077 " buf: " << m_send_buffer
.size() <<
3078 " connecting: " << (m_connecting
?"yes":"no") <<
3084 // send the actual buffer
3085 if (!m_send_buffer
.empty())
// send everything in the buffer, or as much as the quota allows
// unless limits are ignored
3087 int amount_to_send
= m_send_buffer
.size();
3088 int quota_left
= m_bandwidth_limit
[upload_channel
].quota_left();
3089 if (!m_ignore_bandwidth_limits
&& amount_to_send
> quota_left
)
3090 amount_to_send
= quota_left
;
3092 TORRENT_ASSERT(amount_to_send
> 0);
3094 #ifdef TORRENT_VERBOSE_LOGGING
3095 (*m_logger
) << time_now_string() << " *** ASYNC_WRITE [ bytes: " << amount_to_send
<< " ]\n";
// the write completes in on_send_data()
3097 std::list
<asio::const_buffer
> const& vec
= m_send_buffer
.build_iovec(amount_to_send
);
3098 m_socket
->async_write_some(vec
, bind(&peer_connection::on_send_data
, self(), _1
, _2
));
// mark the channel as busy with a network operation
3100 m_channel_state
[upload_channel
] = peer_info::bw_network
;
// starts an asynchronous read for the rest of the current packet if the
// download channel is idle. When there is no quota, bandwidth is
// requested from the torrent instead. Depending on m_recv_pos the data
// is received into the regular receive buffer, the disk receive buffer,
// or both.
// NOTE(review): some original lines (braces, returns and parts of the
// conditions) are not visible in this listing.
3104 void peer_connection::setup_receive()
3106 session_impl::mutex_t::scoped_lock
l(m_ses
.m_mutex
);
// only one download operation may be outstanding at a time
3110 if (m_channel_state
[download_channel
] != peer_info::bw_idle
) return;
3112 shared_ptr
<torrent
> t
= m_torrent
.lock();
3114 if (m_bandwidth_limit
[download_channel
].quota_left() == 0
3117 && !m_ignore_bandwidth_limits
)
3119 if (m_bandwidth_limit
[download_channel
].max_assignable() > 0)
3121 #ifdef TORRENT_VERBOSE_LOGGING
3122 (*m_logger
) << time_now_string() << " *** REQUEST_BANDWIDTH [ download ]\n";
3124 TORRENT_ASSERT(m_channel_state
[download_channel
] == peer_info::bw_idle
);
3125 m_channel_state
[download_channel
] = peer_info::bw_torrent
;
// ask for 16 kiB per outstanding request plus 30 bytes
3126 t
->request_bandwidth(download_channel
, self()
3127 , m_download_queue
.size() * 16 * 1024 + 30, m_priority
);
3134 #ifdef TORRENT_VERBOSE_LOGGING
3135 (*m_logger
) << time_now_string() << " *** CANNOT READ ["
3136 " quota: " << m_bandwidth_limit
[download_channel
].quota_left() <<
3137 " ignore: " << (m_ignore_bandwidth_limits
?"yes":"no") <<
3138 " outstanding: " << m_outstanding_writing_bytes
<<
3139 " outstanding-limit: " << m_ses
.settings().max_outstanding_disk_bytes_per_connection
<<
3145 TORRENT_ASSERT(m_packet_size
> 0);
// receive what is left of the packet, or as much as the quota
// allows unless limits are ignored
3146 int max_receive
= m_packet_size
- m_recv_pos
;
3147 int quota_left
= m_bandwidth_limit
[download_channel
].quota_left();
3148 if (!m_ignore_bandwidth_limits
&& max_receive
> quota_left
)
3149 max_receive
= quota_left
;
3151 if (max_receive
== 0) return;
3153 TORRENT_ASSERT(m_recv_pos
>= 0);
3154 TORRENT_ASSERT(m_packet_size
> 0);
3155 TORRENT_ASSERT(can_read());
3156 #ifdef TORRENT_VERBOSE_LOGGING
3157 (*m_logger
) << time_now_string() << " *** ASYNC_READ [ max: " << max_receive
<< " bytes ]\n";
// the part of the packet that does not go into the disk buffer
3160 int regular_buffer_size
= m_packet_size
- m_disk_recv_buffer_size
;
// grow the regular buffer if it cannot hold that part
3162 if (int(m_recv_buffer
.size()) < regular_buffer_size
)
3163 m_recv_buffer
.resize(regular_buffer_size
);
3165 if (!m_disk_recv_buffer
|| regular_buffer_size
>= m_recv_pos
+ max_receive
)
3167 // only receive into regular buffer
3168 TORRENT_ASSERT(m_recv_pos
+ max_receive
<= int(m_recv_buffer
.size()));
3169 m_socket
->async_read_some(asio::buffer(&m_recv_buffer
[m_recv_pos
]
3170 , max_receive
), bind(&peer_connection::on_receive_data
, self(), _1
, _2
));
3172 else if (m_recv_pos
>= regular_buffer_size
)
3174 // only receive into disk buffer
3175 TORRENT_ASSERT(m_recv_pos
- regular_buffer_size
>= 0);
3176 TORRENT_ASSERT(m_recv_pos
- regular_buffer_size
+ max_receive
<= m_disk_recv_buffer_size
);
3177 m_socket
->async_read_some(asio::buffer(m_disk_recv_buffer
.get() + m_recv_pos
- regular_buffer_size
3179 , bind(&peer_connection::on_receive_data
, self(), _1
, _2
));
3183 // receive into both regular and disk buffer
3184 TORRENT_ASSERT(max_receive
+ m_recv_pos
> regular_buffer_size
);
3185 TORRENT_ASSERT(m_recv_pos
< regular_buffer_size
);
3186 TORRENT_ASSERT(max_receive
- regular_buffer_size
3187 + m_recv_pos
<= m_disk_recv_buffer_size
);
// vec[0] is the rest of the regular buffer, vec[1] the start
// of the disk buffer
3189 boost::array
<asio::mutable_buffer
, 2> vec
;
3190 vec
[0] = asio::buffer(&m_recv_buffer
[m_recv_pos
]
3191 , regular_buffer_size
- m_recv_pos
);
3192 vec
[1] = asio::buffer(m_disk_recv_buffer
.get()
3193 , max_receive
- regular_buffer_size
+ m_recv_pos
);
3194 m_socket
->async_read_some(vec
, bind(&peer_connection::on_receive_data
// mark the channel as busy with a network operation
3197 m_channel_state
[download_channel
] = peer_info::bw_network
;
3200 #ifndef TORRENT_DISABLE_ENCRYPTION
3202 // returns the last 'bytes' from the receive buffer
// the bytes may live in the regular receive buffer, in the disk receive
// buffer, or straddle both; the result is a pair of intervals where the
// second one is empty (0,0) unless the range straddles the two buffers.
// bytes: number of trailing bytes wanted, must be <= m_recv_pos
// NOTE(review): the return statement and some braces are not visible in
// this listing.
3203 std::pair
<buffer::interval
, buffer::interval
> peer_connection::wr_recv_buffers(int bytes
)
3205 TORRENT_ASSERT(bytes
<= m_recv_pos
);
3207 std::pair
<buffer::interval
, buffer::interval
> vec
;
// the part of the packet that is kept in the regular buffer
3208 int regular_buffer_size
= m_packet_size
- m_disk_recv_buffer_size
;
3209 TORRENT_ASSERT(regular_buffer_size
>= 0);
// case 1: everything received so far is in the regular buffer
3210 if (!m_disk_recv_buffer
|| regular_buffer_size
>= m_recv_pos
)
3212 vec
.first
= buffer::interval(&m_recv_buffer
[0]
3213 + m_recv_pos
- bytes
, &m_recv_buffer
[0] + m_recv_pos
);
3214 vec
.second
= buffer::interval(0,0);
// case 2: the requested range lies entirely in the disk buffer
3216 else if (m_recv_pos
- bytes
>= regular_buffer_size
)
3218 vec
.first
= buffer::interval(m_disk_recv_buffer
.get() + m_recv_pos
3219 - regular_buffer_size
- bytes
, m_disk_recv_buffer
.get() + m_recv_pos
3220 - regular_buffer_size
);
3221 vec
.second
= buffer::interval(0,0);
// case 3: the range starts in the regular buffer and continues
// into the disk buffer
3225 TORRENT_ASSERT(m_recv_pos
- bytes
< regular_buffer_size
);
3226 TORRENT_ASSERT(m_recv_pos
> regular_buffer_size
);
3227 vec
.first
= buffer::interval(&m_recv_buffer
[0] + m_recv_pos
- bytes
3228 , &m_recv_buffer
[0] + regular_buffer_size
);
3229 vec
.second
= buffer::interval(m_disk_recv_buffer
.get()
3230 , m_disk_recv_buffer
.get() + m_recv_pos
- regular_buffer_size
);
// the two intervals together must cover exactly 'bytes' bytes
3232 TORRENT_ASSERT(vec
.first
.left() + vec
.second
.left() == bytes
);
// prepares the receive buffer for a new packet of 'packet_size' bytes.
// If more than the current packet has already been received, the current
// packet is cut off the front with cut_receive_buffer().
// packet_size: the size of the next packet, must be > 0
// NOTE(review): the lines between the cut and the final assignment
// (presumably an early return and a reset of m_recv_pos) are not visible
// in this listing - confirm.
3237 void peer_connection::reset_recv_buffer(int packet_size
)
3239 TORRENT_ASSERT(packet_size
> 0);
3240 if (m_recv_pos
> m_packet_size
)
3242 cut_receive_buffer(m_packet_size
, packet_size
);
3246 m_packet_size
= packet_size
;
// appends 'size' bytes from 'buf' to the send buffer. The free space in
// the last chunk of the send buffer is used first; for what does not
// fit, a new buffer is allocated from the session.
// buf, size: the data to queue
// flags: message_type_request marks the data as a request message, whose
// end position in the send buffer is then recorded
// NOTE(review): some original lines (presumably advancing buf and reducing
// size after the first append, and a return after the disconnect) are not
// visible in this listing - confirm.
3249 void peer_connection::send_buffer(char const* buf
, int size
, int flags
)
// remember where in the send buffer this request ends;
// on_send_data() counts these positions down as bytes are written
3251 if (flags
== message_type_request
)
3252 m_requests_in_buffer
.push_back(m_send_buffer
.size() + size
);
// use at most 'size' bytes of the space left in the last buffer
3254 int free_space
= m_send_buffer
.space_in_last_buffer();
3255 if (free_space
> size
) free_space
= size
;
3258 m_send_buffer
.append(buf
, free_space
);
3261 #ifdef TORRENT_STATS
3262 m_ses
.m_buffer_usage_logger
<< log_time() << " send_buffer: "
3263 << free_space
<< std::endl
;
3264 m_ses
.log_buffer_usage();
// nothing left to queue
3267 if (size
<= 0) return;
// buffer.first is the memory, buffer.second the allocated size
3269 std::pair
<char*, int> buffer
= m_ses
.allocate_buffer(size
);
3270 if (buffer
.first
== 0)
3272 disconnect("out of memory");
3275 TORRENT_ASSERT(buffer
.second
>= size
);
3276 std::memcpy(buffer
.first
, buf
, size
);
// the bound session_impl::free_buffer call is handed to the send
// buffer together with the memory
3277 m_send_buffer
.append_buffer(buffer
.first
, buffer
.second
, size
3278 , bind(&session_impl::free_buffer
, boost::ref(m_ses
), _1
, buffer
.second
));
3279 #ifdef TORRENT_STATS
3280 m_ses
.m_buffer_usage_logger
<< log_time() << " send_buffer_alloc: " << size
<< std::endl
;
3281 m_ses
.log_buffer_usage();
3286 // TODO: change this interface to automatically call setup_send() when the
3287 // return value is destructed
// reserves 'size' bytes at the end of the send buffer and returns the
// interval covering them, for the caller to fill in. The space is taken
// from m_send_buffer.allocate_appendix() or from a buffer newly
// allocated from the session. If that allocation fails the peer is
// disconnected and an empty interval (0, 0) is returned.
// NOTE(review): the condition choosing between the two paths and the
// return statements of the successful paths are not visible in this
// listing; presumably the new buffer is used when allocate_appendix()
// returns null - confirm.
3288 buffer::interval
peer_connection::allocate_send_buffer(int size
)
3290 TORRENT_ASSERT(size
> 0);
3291 char* insert
= m_send_buffer
.allocate_appendix(size
);
// buffer.first is the memory, buffer.second the allocated size
3294 std::pair
<char*, int> buffer
= m_ses
.allocate_buffer(size
);
3295 if (buffer
.first
== 0)
3297 disconnect("out of memory");
3298 return buffer::interval(0, 0);
3300 TORRENT_ASSERT(buffer
.second
>= size
);
// the bound session_impl::free_buffer call is handed to the send
// buffer together with the memory
3301 m_send_buffer
.append_buffer(buffer
.first
, buffer
.second
, size
3302 , bind(&session_impl::free_buffer
, boost::ref(m_ses
), _1
, buffer
.second
));
3303 buffer::interval
ret(buffer
.first
, buffer
.first
+ size
);
3304 #ifdef TORRENT_STATS
3305 m_ses
.m_buffer_usage_logger
<< log_time() << " allocate_buffer_alloc: " << size
<< std::endl
;
3306 m_ses
.log_buffer_usage();
3312 #ifdef TORRENT_STATS
3313 m_ses
.m_buffer_usage_logger
<< log_time() << " allocate_buffer: " << size
<< std::endl
;
3314 m_ses
.log_buffer_usage();
// the space came from the existing send buffer
3316 buffer::interval
ret(insert
, insert
+ size
);
// helper that assigns 0 to a value when it goes out of scope, or earlier
// when fire() is called, provided 'cond' was true at construction.
// NOTE(review): the enclosing declaration and the member declarations are
// not visible in this listing; presumably m_val is a reference to 'v' so
// that the assignment reaches the caller's variable - confirm.
3324 set_to_zero(T
& v
, bool cond
): m_val(v
), m_cond(cond
) {}
// zeroes the value now and disarms the destructor
3325 void fire() { if (!m_cond
) return; m_cond
= false; m_val
= 0; }
// zeroes the value unless fire() already did or cond was false
3326 ~set_to_zero() { if (m_cond
) m_val
= 0; }
3332 // --------------------------
3334 // --------------------------
3336 // throws exception when the client should be disconnected
// NOTE(review): in the code visible here failures are reported by calling
// disconnect(), no throw statement is visible - the comment above may be
// out of date.
// completion handler for the read started in setup_receive(). After
// handling the received bytes it keeps reading from the socket with
// read_some() for as long as data and quota are available.
// error: result of the asynchronous read
// bytes_transferred: number of bytes that were received
// NOTE(review): some original lines (braces, the loop header, returns and
// parts of some conditions) are not visible in this listing.
3337 void peer_connection::on_receive_data(const error_code
& error
3338 , std::size_t bytes_transferred
)
3340 session_impl::mutex_t::scoped_lock
l(m_ses
.m_mutex
);
// the network operation has finished, the channel is idle again
3344 TORRENT_ASSERT(m_channel_state
[download_channel
] == peer_info::bw_network
);
3345 m_channel_state
[download_channel
] = peer_info::bw_idle
;
3349 #if defined TORRENT_VERBOSE_LOGGING || defined TORRENT_ERROR_LOGGING
3350 (*m_logger
) << time_now_string() << " **ERROR**: "
3351 << error
.message() << "[in peer_connection::on_receive_data]\n";
// the error is passed to on_receive() before the peer is
// disconnected
3353 on_receive(error
, bytes_transferred
);
3354 disconnect(error
.message().c_str());
3358 int max_receive
= 0;
3361 #ifdef TORRENT_VERBOSE_LOGGING
3362 (*m_logger
) << "read " << bytes_transferred
<< " bytes\n";
3364 // correct the dl quota usage, if not all of the buffer was actually read
3365 if (!m_ignore_bandwidth_limits
)
3366 m_bandwidth_limit
[download_channel
].use_quota(bytes_transferred
);
3368 if (m_disconnecting
) return;
3370 TORRENT_ASSERT(m_packet_size
> 0);
3371 TORRENT_ASSERT(bytes_transferred
> 0);
// record the activity and advance the receive position
3373 m_last_receive
= time_now();
3374 m_recv_pos
+= bytes_transferred
;
3375 TORRENT_ASSERT(m_recv_pos
<= int(m_recv_buffer
.size()
3376 + m_disk_recv_buffer_size
));
3378 on_receive(error
, bytes_transferred
);
3380 TORRENT_ASSERT(m_packet_size
> 0);
// give memory back when the buffer's capacity exceeds the packet
// size by more than 128 bytes: a fresh buffer of m_packet_size is
// swapped in
// NOTE(review): the first part of this condition is not visible
3384 && (m_recv_buffer
.capacity() - m_packet_size
) > 128)
3386 buffer(m_packet_size
).swap(m_recv_buffer
);
// read what is left of the packet, or as much as the quota
// allows unless limits are ignored
3389 max_receive
= m_packet_size
- m_recv_pos
;
3390 int quota_left
= m_bandwidth_limit
[download_channel
].quota_left();
3391 if (!m_ignore_bandwidth_limits
&& max_receive
> quota_left
)
3392 max_receive
= quota_left
;
3394 if (max_receive
== 0) break;
// the part of the packet that does not go into the disk buffer
3396 int regular_buffer_size
= m_packet_size
- m_disk_recv_buffer_size
;
3398 if (int(m_recv_buffer
.size()) < regular_buffer_size
)
3399 m_recv_buffer
.resize(regular_buffer_size
);
3402 if (!m_disk_recv_buffer
|| regular_buffer_size
>= m_recv_pos
+ max_receive
)
3404 // only receive into regular buffer
3405 TORRENT_ASSERT(m_recv_pos
+ max_receive
<= int(m_recv_buffer
.size()));
3406 bytes_transferred
= m_socket
->read_some(asio::buffer(&m_recv_buffer
[m_recv_pos
]
3407 , max_receive
), ec
);
3409 else if (m_recv_pos
>= regular_buffer_size
)
3411 // only receive into disk buffer
3412 TORRENT_ASSERT(m_recv_pos
- regular_buffer_size
>= 0);
3413 TORRENT_ASSERT(m_recv_pos
- regular_buffer_size
+ max_receive
<= m_disk_recv_buffer_size
);
3414 bytes_transferred
= m_socket
->read_some(asio::buffer(m_disk_recv_buffer
.get()
3415 + m_recv_pos
- regular_buffer_size
, (std::min
)(m_packet_size
3416 - m_recv_pos
, max_receive
)), ec
);
3420 // receive into both regular and disk buffer
3421 TORRENT_ASSERT(max_receive
+ m_recv_pos
> regular_buffer_size
);
3422 TORRENT_ASSERT(m_recv_pos
< regular_buffer_size
);
3423 TORRENT_ASSERT(max_receive
- regular_buffer_size
3424 + m_recv_pos
<= m_disk_recv_buffer_size
);
// vec[0] is the rest of the regular buffer, vec[1] the start
// of the disk buffer
3426 boost::array
<asio::mutable_buffer
, 2> vec
;
3427 vec
[0] = asio::buffer(&m_recv_buffer
[m_recv_pos
]
3428 , regular_buffer_size
- m_recv_pos
);
3429 vec
[1] = asio::buffer(m_disk_recv_buffer
.get()
3430 , (std::min
)(m_disk_recv_buffer_size
3431 , max_receive
- regular_buffer_size
+ m_recv_pos
));
3432 bytes_transferred
= m_socket
->read_some(vec
, ec
);
// would_block only means there is nothing more to read right
// now; any other error disconnects the peer
3434 if (ec
&& ec
!= asio::error::would_block
)
3436 disconnect(ec
.message().c_str());
3439 if (ec
== asio::error::would_block
) break;
// keep going for as long as the last read returned data
3441 while (bytes_transferred
> 0);
// tells whether data can be written to the peer right now: the send
// buffer must not be empty and there must be upload quota left (or
// bandwidth limits must be ignored).
// NOTE(review): the end of the expression is not visible in this listing,
// there may be further conditions - confirm.
3446 bool peer_connection::can_write() const
3448 // if we have requests or pending data to be sent or announcements to be made
3449 // we want to send data
3450 return !m_send_buffer
.empty()
3451 && (m_bandwidth_limit
[upload_channel
].quota_left() > 0
3452 || m_ignore_bandwidth_limits
)
// tells whether data can be read from the peer right now: there must be
// download quota left (or bandwidth limits must be ignored) and the
// bytes waiting to be written to disk must be below the
// max_outstanding_disk_bytes_per_connection setting.
// NOTE(review): one line of the expression and the return statement are
// not visible in this listing - confirm.
3456 bool peer_connection::can_read() const
3458 bool ret
= (m_bandwidth_limit
[download_channel
].quota_left() > 0
3459 || m_ignore_bandwidth_limits
)
3461 && m_outstanding_writing_bytes
<
3462 m_ses
.settings().max_outstanding_disk_bytes_per_connection
;
// starts the outgoing connection attempt to m_remote: the socket is
// opened, made non-blocking, bound to the torrent's interface and an
// asynchronous connect is issued that completes in
// on_connection_complete().
// ticket: stored in m_connection_ticket; it is later passed to
// m_ses.m_half_open.done()
// NOTE(review): some original lines (braces, the error checks guarding the
// disconnect calls, returns and the declaration of ec) are not visible in
// this listing.
3467 void peer_connection::connect(int ticket
)
3472 #if defined TORRENT_VERBOSE_LOGGING || defined TORRENT_LOGGING || defined TORRENT_ERROR_LOGGING
3473 (*m_ses
.m_logger
) << time_now_string() << " CONNECTING: " << m_remote
.address().to_string(ec
)
3474 << ":" << m_remote
.port() << "\n";
3477 m_connection_ticket
= ticket
;
3478 boost::shared_ptr
<torrent
> t
= m_torrent
.lock();
3481 TORRENT_ASSERT(m_connecting
);
3485 disconnect("torrent aborted");
// open the socket for the protocol of the torrent's interface
3489 m_socket
->open(t
->get_interface().protocol(), ec
);
3492 disconnect(ec
.message().c_str());
3496 // set the socket to non-blocking, so that we can
3497 // read the entire buffer on each read event we get
3498 tcp::socket::non_blocking_io
ioc(true);
3499 m_socket
->io_control(ioc
, ec
);
3502 disconnect(ec
.message().c_str());
3505 m_socket
->bind(t
->get_interface(), ec
);
3508 disconnect(ec
.message().c_str());
3511 m_socket
->async_connect(m_remote
3512 , bind(&peer_connection::on_connection_complete
, self(), _1
));
// remember when the attempt started; on_connection_complete()
// derives m_rtt from it
3513 m_connect
= time_now();
3515 if (t
->alerts().should_post
<peer_connect_alert
>())
3517 t
->alerts().post_alert(peer_connect_alert(
3518 t
->get_handle(), remote(), pid()));
// completion handler for the asynchronous connect started in connect().
// e: result of the connection attempt
// NOTE(review): some original lines (braces, the check of 'e', returns,
// the declaration of ec and whatever follows a successful connect) are
// not visible in this listing.
3522 void peer_connection::on_connection_complete(error_code
const& e
)
// take the time stamp before waiting for the session mutex
3524 ptime completed
= time_now();
3526 session_impl::mutex_t::scoped_lock
l(m_ses
.m_mutex
);
// time from connect() to completion, in milliseconds
3530 m_rtt
= total_milliseconds(completed
- m_connect
);
3532 if (m_disconnecting
) return;
// the attempt is over, hand the half-open ticket back
3534 m_connecting
= false;
3535 m_ses
.m_half_open
.done(m_connection_ticket
);
3539 #if defined TORRENT_VERBOSE_LOGGING || defined TORRENT_LOGGING || defined TORRENT_ERROR_LOGGING
3540 (*m_ses
.m_logger
) << time_now_string() << " CONNECTION FAILED: " << m_remote
.address().to_string()
3541 << ": " << e
.message() << "\n";
3543 disconnect(e
.message().c_str(), 1);
3547 if (m_disconnecting
) return;
// start the inactivity timer from the moment of connecting
3548 m_last_receive
= time_now();
3550 // this means the connection just succeeded
3552 TORRENT_ASSERT(m_socket
);
3553 #if defined TORRENT_VERBOSE_LOGGING || defined TORRENT_LOGGING
3554 (*m_ses
.m_logger
) << time_now_string() << " COMPLETED: " << m_remote
.address().to_string()
3555 << " rtt = " << m_rtt
<< "\n";
3559 if (m_remote
== m_socket
->local_endpoint(ec
))
3561 // if the remote endpoint is the same as the local endpoint, we're connected
3563 disconnect("connected to ourselves", 1);
// the peer_tos setting is only applied to IPv4 connections
3567 if (m_remote
.address().is_v4())
3570 m_socket
->set_option(type_of_service(m_ses
.settings().peer_tos
), ec
);
3578 // --------------------------
3580 // --------------------------
3582 // throws exception when the client should be disconnected
// NOTE(review): in the code visible here failures are reported by calling
// disconnect(), no throw statement is visible - the comment above may be
// out of date.
// completion handler for the write started in setup_send().
// error: result of the asynchronous write
// bytes_transferred: number of bytes that were written
// NOTE(review): some original lines (braces, the check of 'error', returns
// and whatever follows on_sent()) are not visible in this listing.
3583 void peer_connection::on_send_data(error_code
const& error
3584 , std::size_t bytes_transferred
)
3586 session_impl::mutex_t::scoped_lock
l(m_ses
.m_mutex
);
3590 TORRENT_ASSERT(m_channel_state
[upload_channel
] == peer_info::bw_network
);
// drop the bytes that were written from the send buffer
3592 m_send_buffer
.pop_front(bytes_transferred
);
// m_requests_in_buffer holds positions in the send buffer; move
// them all forward by the number of bytes written ...
3594 for (std::vector
<int>::iterator i
= m_requests_in_buffer
.begin()
3595 , end(m_requests_in_buffer
.end()); i
!= end
; ++i
)
3596 *i
-= bytes_transferred
;
// ... and forget the leading ones that have been passed
3598 while (!m_requests_in_buffer
.empty()
3599 && m_requests_in_buffer
.front() <= 0)
3600 m_requests_in_buffer
.erase(m_requests_in_buffer
.begin());
// the network operation has finished, the channel is idle again
3602 m_channel_state
[upload_channel
] = peer_info::bw_idle
;
// charge the bytes written against the upload quota
3604 if (!m_ignore_bandwidth_limits
)
3605 m_bandwidth_limit
[upload_channel
].use_quota(bytes_transferred
);
3607 #ifdef TORRENT_VERBOSE_LOGGING
3608 (*m_logger
) << "wrote " << bytes_transferred
<< " bytes\n";
3613 #if defined TORRENT_VERBOSE_LOGGING || defined TORRENT_ERROR_LOGGING
3614 (*m_logger
) << "**ERROR**: " << error
.message() << " [in peer_connection::on_send_data]\n";
3616 disconnect(error
.message().c_str());
3619 if (m_disconnecting
) return;
3621 TORRENT_ASSERT(!m_connecting
);
3622 TORRENT_ASSERT(bytes_transferred
> 0);
// record the activity and let on_sent() process the event
3624 m_last_sent
= time_now();
3626 on_sent(error
, bytes_transferred
);
// Consistency check for a peer connection. Asserts a series of invariants:
// disk receive buffer bookkeeping, disconnect state, bandwidth channel state
// versus remaining quota, uniqueness of queued blocks, the m_peer_info
// back-pointer, and agreement between the peers' queues and the torrent's
// piece picker. It has no return value; violations trip TORRENT_ASSERT.
3633 void peer_connection::check_invariant() const
// a disk receive buffer must be present exactly when its recorded size is
// greater than zero
3635 TORRENT_ASSERT(bool(m_disk_recv_buffer
) == (m_disk_recv_buffer_size
> 0));
3637 boost::shared_ptr
<torrent
> t
= m_torrent
.lock();
// a disconnecting connection must have started its disconnect; any other
// connection that has left its constructor must be known to the session
3638 if (m_disconnecting
)
3641 TORRENT_ASSERT(m_disconnect_started
);
3643 else if (!m_in_constructor
)
3645 TORRENT_ASSERT(m_ses
.has_peer((peer_connection
*)this));
// NOTE(review): given the caveat in the comment below, this loop is presumably
// disabled (commented out) in the original file -- confirm
3649 // this assertion correct most of the time, but sometimes right when the
3650 // limit is changed it might break
3651 for (int i = 0; i < 2; ++i)
3653 // this peer is in the bandwidth history iff max_assignable < limit
3654 TORRENT_ASSERT((m_bandwidth_limit[i].max_assignable() < m_bandwidth_limit[i].throttle())
3655 == m_ses.m_bandwidth_manager[i]->is_in_history(this)
3656 || m_bandwidth_limit[i].throttle() == bandwidth_limit::inf);
// while a channel is in the bw_torrent or bw_global state, its bandwidth
// limit must have no quota left
3660 if (m_channel_state
[download_channel
] == peer_info::bw_torrent
3661 || m_channel_state
[download_channel
] == peer_info::bw_global
)
3662 TORRENT_ASSERT(m_bandwidth_limit
[download_channel
].quota_left() == 0);
3663 if (m_channel_state
[upload_channel
] == peer_info::bw_torrent
3664 || m_channel_state
[upload_channel
] == peer_info::bw_global
)
3665 TORRENT_ASSERT(m_bandwidth_limit
[upload_channel
].quota_left() == 0);
// no block may appear more than once across the download queue and the
// request queue: the set of all their blocks must be as large as both
// queues together
3667 std::set
<piece_block
> unique
;
3668 std::transform(m_download_queue
.begin(), m_download_queue
.end()
3669 , std::inserter(unique
, unique
.begin()), boost::bind(&pending_block::block
, _1
));
3670 std::copy(m_request_queue
.begin(), m_request_queue
.end(), std::inserter(unique
, unique
.begin()));
3671 TORRENT_ASSERT(unique
.size() == m_download_queue
.size() + m_request_queue
.size());
// the peer_info entry must carry no previous transfer amounts and must point
// either at this connection or at no connection
3674 TORRENT_ASSERT(m_peer_info
->prev_amount_upload
== 0);
3675 TORRENT_ASSERT(m_peer_info
->prev_amount_download
== 0);
3676 TORRENT_ASSERT(m_peer_info
->connection
== this
3677 || m_peer_info
->connection
== 0);
// an optimistically unchoked peer must not be choked
3679 if (m_peer_info
->optimistically_unchoked
)
3680 TORRENT_ASSERT(!is_choked());
3685 #ifdef TORRENT_EXPENSIVE_INVARIANT_CHECKS
3686 // since this connection doesn't have a torrent reference
3687 // no torrent should have a reference to this connection either
3688 for (aux::session_impl::torrent_map::const_iterator i
= m_ses
.m_torrents
.begin()
3689 , end(m_ses
.m_torrents
.end()); i
!= end
; ++i
)
3690 TORRENT_ASSERT(!i
->second
->has_peer((peer_connection
*)this));
3695 if (m_ses
.settings().close_redundant_connections
)
3697 // make sure upload only peers are disconnected
3698 if (t
->is_finished() && m_upload_only
)
3699 TORRENT_ASSERT(m_disconnect_started
);
3702 && m_bitfield_received
3703 && t
->are_files_checked())
3704 TORRENT_ASSERT(m_disconnect_started
);
// once the torrent is finished this peer must not be marked as interesting
3707 if (t
->is_finished())
3708 TORRENT_ASSERT(!m_interesting
);
3710 TORRENT_ASSERT(m_upload_only
);
// count, per block, the entries found in the queues of all peers on the
// torrent; for blocks the picker does not report as downloaded, that count
// must equal the picker's num_peers() for the block
3712 if (t
->has_picker())
3714 std::map
<piece_block
, int> num_requests
;
3715 for (torrent::const_peer_iterator i
= t
->begin(); i
!= t
->end(); ++i
)
3717 // make sure this peer is not a dangling pointer
3718 #ifdef TORRENT_EXPENSIVE_INVARIANT_CHECKS
3719 TORRENT_ASSERT(m_ses
.has_peer(*i
));
3721 peer_connection
const& p
= *(*i
);
3722 for (std::deque
<piece_block
>::const_iterator i
= p
.request_queue().begin()
3723 , end(p
.request_queue().end()); i
!= end
; ++i
)
3725 for (std::deque
<pending_block
>::const_iterator i
= p
.download_queue().begin()
3726 , end(p
.download_queue().end()); i
!= end
; ++i
)
3727 ++num_requests
[i
->block
];
3729 for (std::map
<piece_block
, int>::iterator i
= num_requests
.begin()
3730 , end(num_requests
.end()); i
!= end
; ++i
)
3732 if (!t
->picker().is_downloaded(i
->first
))
3733 TORRENT_ASSERT(t
->picker().num_peers(i
->first
) == i
->second
);
// search the torrent's policy for the entry whose address equals m_peer_info;
// it must be found
3736 #ifdef TORRENT_EXPENSIVE_INVARIANT_CHECKS
3739 policy::const_iterator i
= t
->get_policy().begin_peer();
3740 policy::const_iterator end
= t
->get_policy().end_peer();
3741 for (; i
!= end
; ++i
)
3743 if (&i
->second
== m_peer_info
) break;
3745 TORRENT_ASSERT(i
!= end
);
3748 if (t
->has_picker() && !t
->is_aborted())
3750 // make sure that pieces that have completed the download
3751 // of all their blocks are in the disk io thread's queue
3753 const std::vector
<piece_picker::downloading_piece
>& dl_queue
3754 = t
->picker().get_download_queue();
3755 for (std::vector
<piece_picker::downloading_piece
>::const_iterator i
=
3756 dl_queue
.begin(); i
!= dl_queue
.end(); ++i
)
3758 const int blocks_per_piece
= t
->picker().blocks_in_piece(i
->index
);
// 'complete' starts out true; each block's state is compared against
// state_finished in the loop below
3760 bool complete
= true;
3761 for (int j
= 0; j
< blocks_per_piece
; ++j
)
3763 if (i
->info
[j
].state
== piece_picker::block_info::state_finished
)
// NOTE(review): 'piece_failed' is not declared anywhere in the visible code and
// the comment below calls this invariant invalid, so this section is
// presumably disabled (commented out) in the original file -- confirm
3769 // this invariant is not valid anymore since the completion event
3770 // might be queued in the io service
3771 if (complete && !piece_failed)
3773 disk_io_job ret = m_ses.m_disk_thread.find_job(
3774 &t->filesystem(), -1, i->index);
3775 TORRENT_ASSERT(ret.action == disk_io_job::hash || ret.action == disk_io_job::write);
3776 TORRENT_ASSERT(ret.piece == i->index);
// NOTE(review): the two checks below are labelled expensive by their own
// comments; presumably they are disabled in the original file as well --
// confirm before relying on them
3781 // expensive when using checked iterators
3783 if (t->valid_metadata())
3785 int piece_count = std::count(m_have_piece.begin()
3786 , m_have_piece.end(), true);
3787 if (m_num_pieces != piece_count)
3789 TORRENT_ASSERT(false);
3794 // extremely expensive invariant check
3798 piece_picker& p = t->picker();
3799 const std::vector<piece_picker::downloading_piece>& dlq = p.get_download_queue();
3800 const int blocks_per_piece = static_cast<int>(
3801 t->torrent_file().piece_length() / t->block_size());
3803 for (std::vector<piece_picker::downloading_piece>::const_iterator i =
3804 dlq.begin(); i != dlq.end(); ++i)
3806 for (int j = 0; j < blocks_per_piece; ++j)
3808 if (std::find(m_request_queue.begin(), m_request_queue.end()
3809 , piece_block(i->index, j)) != m_request_queue.end()
3811 std::find(m_download_queue.begin(), m_download_queue.end()
3812 , piece_block(i->index, j)) != m_download_queue.end())
3814 TORRENT_ASSERT(i->info[j].peer == m_remote);
3818 TORRENT_ASSERT(i->info[j].peer != m_remote || i->info[j].finished);
3827 peer_connection::peer_speed_t
peer_connection::peer_speed()
3829 shared_ptr
<torrent
> t
= m_torrent
.lock();
3832 int download_rate
= int(statistics().download_payload_rate());
3833 int torrent_download_rate
= int(t
->statistics().download_payload_rate());
3835 if (download_rate
> 512 && download_rate
> torrent_download_rate
/ 16)
3837 else if (download_rate
> 4096 && download_rate
> torrent_download_rate
/ 64)
3839 else if (download_rate
< torrent_download_rate
/ 15 && m_speed
== fast
)
3841 else if (download_rate
< torrent_download_rate
/ 63 && m_speed
== medium
)
3847 void peer_connection::keep_alive()
3852 d
= time_now() - m_last_sent
;
3853 if (total_seconds(d
) < m_timeout
/ 2) return;
3855 if (m_connecting
) return;
3856 if (in_handshake()) return;
3858 // if the last send has not completed yet, do not send a keep
3860 if (m_channel_state
[upload_channel
] != peer_info::bw_idle
) return;
3862 #ifdef TORRENT_VERBOSE_LOGGING
3863 (*m_logger
) << time_now_string() << " ==> KEEPALIVE\n";
3866 m_last_sent
= time_now();
3870 bool peer_connection::is_seed() const
3872 // if m_num_pieces == 0, we probably don't have the
3874 return m_num_pieces
== (int)m_have_piece
.size() && m_num_pieces
> 0;