3 Copyright (c) 2007, Arvid Norberg
6 Redistribution and use in source and binary forms, with or without
7 modification, are permitted provided that the following conditions
10 * Redistributions of source code must retain the above copyright
11 notice, this list of conditions and the following disclaimer.
12 * Redistributions in binary form must reproduce the above copyright
13 notice, this list of conditions and the following disclaimer in
14 the documentation and/or other materials provided with the distribution.
15 * Neither the name of the author nor the names of its
16 contributors may be used to endorse or promote products derived
17 from this software without specific prior written permission.
19 THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
20 AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
21 IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
22 ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
23 LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
24 CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
25 SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
26 INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
27 CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
28 ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
29 POSSIBILITY OF SUCH DAMAGE.
33 #ifndef TORRENT_BANDWIDTH_MANAGER_HPP_INCLUDED
34 #define TORRENT_BANDWIDTH_MANAGER_HPP_INCLUDED
36 #include <boost/shared_ptr.hpp>
37 #include <boost/intrusive_ptr.hpp>
38 #include <boost/function.hpp>
39 #include <boost/bind.hpp>
40 #include <boost/integer_traits.hpp>
41 #include <boost/thread/mutex.hpp>
44 #ifdef TORRENT_VERBOSE_BANDWIDTH_LIMIT
48 #include "libtorrent/socket.hpp"
49 #include "libtorrent/invariant_check.hpp"
50 #include "libtorrent/assert.hpp"
51 #include "libtorrent/bandwidth_limit.hpp"
52 #include "libtorrent/bandwidth_queue_entry.hpp"
54 using boost::weak_ptr
;
55 using boost::shared_ptr
;
56 using boost::intrusive_ptr
;
60 namespace libtorrent
{
62 // the maximum block of bandwidth quota to
63 // hand out is 33kB. The block size may
64 // be smaller on lower limits
67 max_bandwidth_block_size
= 33000,
68 min_bandwidth_block_size
= 400
// length of the accounting window: hand_out_bandwidth() gives every
// history entry an expiry time of now + bw_window_size
71 const time_duration bw_window_size
= seconds(1);
// one record in the bandwidth history: an amount that was handed out,
// the time at which it expires and (optionally) the peer connection and
// torrent it was handed to. Parameterised on the peer connection and
// torrent types.
73 template<class PeerConnection
, class Torrent
>
// entry attributed to peer connection p and torrent t
76 history_entry(intrusive_ptr
<PeerConnection
> p
, weak_ptr
<Torrent
> t
78 : expires_at(exp
), amount(a
), peer(p
), tor(t
) {}
// entry without a peer or torrent: peer and tor are default constructed.
// hand_out_bandwidth() builds this kind of entry for drained quota
79 history_entry(int a
, ptime exp
)
80 : expires_at(exp
), amount(a
), peer(), tor() {}
// the connection the amount was assigned to. Left default constructed
// by the two-argument constructor
83 intrusive_ptr
<PeerConnection
> peer
;
// weak reference to the torrent. on_history_expire() calls lock() on it
// and only uses the result if it is non-null
84 weak_ptr
<Torrent
> tor
;
// limits val to the range [floor, ceiling]. Note the parameter order:
// the upper bound (ceiling) comes before the lower bound (floor).
// asserts that ceiling >= floor
88 T
clamp(T val
, T ceiling
, T floor
)
90 TORRENT_ASSERT(ceiling
>= floor
);
// at or above the upper bound: return the upper bound
91 if (val
>= ceiling
) return ceiling
;
// at or below the lower bound: return the lower bound
92 else if (val
<= floor
) return floor
;
// helper that stores a reference (var) and a value (val) and assigns
// the value to the referenced variable when the helper object is
// destroyed. hand_out_bandwidth() uses it to reset
// m_in_hand_out_bandwidth to false when the function exits
99 assign_at_exit(T
& var
, T val
): var_(var
), val_(val
) {}
// performs the deferred assignment
100 ~assign_at_exit() { var_
= val_
; }
105 template<class PeerConnection
, class Torrent
>
106 struct bandwidth_manager
// constructs a manager from an io_service (ios) and a channel index
// (channel). NOTE(review): only part of the initialiser list is
// visible here
108 bandwidth_manager(io_service
& ios
, int channel
109 #ifdef TORRENT_VERBOSE_BANDWIDTH_LIMIT
// the history timer is constructed from m_ios
114 , m_history_timer(m_ios
)
// the rate limit starts out as bandwidth_limit::inf
115 , m_limit(bandwidth_limit::inf
)
// not inside hand_out_bandwidth() yet
119 , m_in_hand_out_bandwidth(false)
122 #ifdef TORRENT_VERBOSE_BANDWIDTH_LIMIT
// verbose builds: open "bandwidth_limiter.log" with std::ios::trunc and
// remember the start time; the log lines written elsewhere print
// time_now() - m_start
124 m_log
.open("bandwidth_limiter.log", std::ios::trunc
);
125 m_start
= time_now();
129 void drain(int bytes
)
131 mutex_t::scoped_lock
l(m_mutex
);
132 TORRENT_ASSERT(bytes
>= 0);
133 m_drain_quota
+= bytes
;
134 if (m_drain_quota
> m_limit
* 5) m_drain_quota
= m_limit
* 5;
// takes the new rate limit as limit. Acquires m_mutex and asserts that
// the limit is not negative. NOTE(review): the statements that follow
// the assert are not visible here
137 void throttle(int limit
)
139 mutex_t::scoped_lock
l(m_mutex
);
140 TORRENT_ASSERT(limit
>= 0);
146 mutex_t::scoped_lock
l(m_mutex
);
156 m_history_timer
.cancel();
160 bool is_queued(PeerConnection
const* peer
) const
162 mutex_t::scoped_lock
l(m_mutex
);
163 return is_queued(peer
);
// variant of is_queued() for callers that already hold the lock and pass
// it in as l (request_bandwidth() does this). The visible code does not
// use l. Walks m_queue from front to back and returns true at the first
// entry whose peer pointer equals peer
166 bool is_queued(PeerConnection
const* peer
, boost::mutex::scoped_lock
& l
) const
168 for (typename
queue_t::const_iterator i
= m_queue
.begin()
169 , end(m_queue
.end()); i
!= end
; ++i
)
// compare the raw pointer held by the entry's intrusive_ptr
171 if (i
->peer
.get() == peer
) return true;
176 bool is_in_history(PeerConnection
const* peer
) const
178 mutex_t::scoped_lock
l(m_mutex
);
179 return is_in_history(peer
, l
);
// variant of is_in_history() for callers that already hold the lock and
// pass it in as l (hand_out_bandwidth() does this). The visible code
// does not use l. Walks m_history from front to back and returns true
// at the first entry whose peer pointer equals peer
182 bool is_in_history(PeerConnection
const* peer
, boost::mutex::scoped_lock
& l
) const
184 for (typename
history_t::const_iterator i
185 = m_history
.begin(), end(m_history
.end()); i
!= end
; ++i
)
// compare the raw pointer held by the entry's intrusive_ptr
187 if (i
->peer
.get() == peer
) return true;
193 int queue_size() const
195 mutex_t::scoped_lock
l(m_mutex
);
196 return m_queue
.size();
// puts a bandwidth request for peer in the queue and then tries to hand
// out bandwidth right away.
//   peer     - the connection asking for bandwidth
//   blk      - must be greater than 0 (asserted); passed on to the
//              bw_queue_entry that is inserted
//   priority - decides the position in the queue, see below
199 // non prioritized means that, if there's a line for bandwidth,
200 // others will cut in front of the non-prioritized peers.
201 // this is used by web seeds
202 void request_bandwidth(intrusive_ptr
<PeerConnection
> const& peer
203 , int blk
, int priority
)
205 mutex_t::scoped_lock
l(m_mutex
);
208 TORRENT_ASSERT(blk
> 0);
// the lock is already held, so use the overload that takes it
209 TORRENT_ASSERT(!is_queued(peer
.get(), l
));
211 // make sure this peer isn't already in line
212 // waiting for bandwidth
// (the comment above describes the is_queued() assert; the assert below
// checks that the peer can be assigned bandwidth on this channel)
213 TORRENT_ASSERT(peer
->max_assignable_bandwidth(m_channel
) > 0);
// look for the insert position starting from the back of the queue,
// continuing for as long as the new request has a higher priority than
// the entry i refers to. NOTE(review): the loop body is not visible
// here; presumably it advances i
215 typename
queue_t::reverse_iterator
i(m_queue
.rbegin());
216 while (i
!= m_queue
.rend() && priority
> i
->priority
)
// insert at the position where the scan stopped
221 m_queue
.insert(i
.base(), bw_queue_entry
<PeerConnection
, Torrent
>(peer
, blk
, priority
));
// pass the held lock along to hand_out_bandwidth()
222 if (!m_queue
.empty()) hand_out_bandwidth(l
);
// consistency checks on the history and on the queue, expressed as
// TORRENT_ASSERTs
226 void check_invariant() const
// 1. m_current_quota must equal the sum of the amounts of all
// history entries
228 int current_quota
= 0;
229 for (typename
history_t::const_iterator i
230 = m_history
.begin(), end(m_history
.end()); i
!= end
; ++i
)
232 current_quota
+= i
->amount
;
234 TORRENT_ASSERT(current_quota
== m_current_quota
);
// 2. walk the queue with two iterators, i and j, and check that the
// priority at i is never lower than the priority at j.
// NOTE(review): the statement guarded by the if below is not visible
// here; presumably it moves j one entry ahead of i, which would make
// this a check that priorities never increase towards the back
236 typename
queue_t::const_iterator j
= m_queue
.begin();
237 if (j
!= m_queue
.end())
240 for (typename
queue_t::const_iterator i
= m_queue
.begin()
241 , end(m_queue
.end()); i
!= end
&& j
!= end
; ++i
, ++j
)
242 TORRENT_ASSERT(i
->priority
>= j
->priority
);
// records a block of bandwidth that was just handed out: e is pushed to
// the front of m_history and its amount is added to m_current_quota.
// if e is the only entry in the history, m_history_timer is set to
// call on_history_expire() at e.expires_at
249 void add_history_entry(history_entry
<PeerConnection
, Torrent
> const& e
)
// newest entries go to the front; on_history_expire() removes
// entries from the back
253 m_history
.push_front(e
);
254 m_current_quota
+= e
.amount
;
255 // in case the size > 1 there is already a timer
256 // active that will be invoked, no need to set one up
// verbose builds: write a log line marked " + " with the milliseconds
// since m_start, the queue size, the used quota, the limit and the
// history size
258 #ifdef TORRENT_VERBOSE_BANDWIDTH_LIMIT
259 m_log
<< std::setw(7) << total_milliseconds(time_now() - m_start
) << " + "
260 " queue: " << std::setw(3) << m_queue
.size()
261 << " used: " << std::setw(7) << m_current_quota
262 << " limit: " << std::setw(7) << m_limit
263 << " history: " << std::setw(3) << m_history
.size()
266 if (m_history
.size() > 1) return;
// this is the only entry: it must expire in the future (asserted), and
// the timer is set to fire at that time. ec is declared on a line not
// visible here
271 TORRENT_ASSERT(e
.expires_at
> time_now());
272 m_history_timer
.expires_at(e
.expires_at
, ec
);
273 m_history_timer
.async_wait(bind(&bandwidth_manager::on_history_expire
, this, _1
));
// completion handler for m_history_timer (registered with async_wait in
// add_history_entry() and at the end of this function). Removes every
// history entry whose expiry time has passed, subtracts its amount from
// m_current_quota and tells the peer and torrent involved that the
// bandwidth expired. Then sets the timer for the next entry and tries
// to hand out more bandwidth.
// e is the error code passed by the timer; no use of it is visible here
276 void on_history_expire(error_code
const& e
)
280 mutex_t::scoped_lock
l(m_mutex
);
284 TORRENT_ASSERT(!m_history
.empty());
// sample the clock once; every entry is compared against this time
286 ptime
now(time_now());
// the oldest entry is at the back (add_history_entry() pushes to the
// front)
287 while (!m_history
.empty() && m_history
.back().expires_at
<= now
)
// copy the entry before removing it. Note that this local e hides the
// error_code parameter with the same name
289 history_entry
<PeerConnection
, Torrent
> e
= m_history
.back();
290 m_history
.pop_back();
291 m_current_quota
-= e
.amount
;
292 TORRENT_ASSERT(m_current_quota
>= 0);
// verbose builds: write a log line marked " - " with the milliseconds
// since m_start, the queue size, the used quota, the limit and the
// history size
294 #ifdef TORRENT_VERBOSE_BANDWIDTH_LIMIT
295 m_log
<< std::setw(7) << total_milliseconds(time_now() - m_start
) << " - "
296 " queue: " << std::setw(3) << m_queue
.size()
297 << " used: " << std::setw(7) << m_current_quota
298 << " limit: " << std::setw(7) << m_limit
299 << " history: " << std::setw(3) << m_history
.size()
// NOTE(review): entries built with the two-argument history_entry
// constructor (used for drained quota) have a default constructed peer.
// No null check on c is visible here before it is dereferenced below;
// confirm that one exists
302 intrusive_ptr
<PeerConnection
> c
= e
.peer
;
// t is null if the torrent no longer exists
304 shared_ptr
<Torrent
> t
= e
.tor
.lock();
// connections that are disconnecting are not notified
306 if (!c
->is_disconnecting()) c
->expire_bandwidth(m_channel
, e
.amount
);
307 if (t
) t
->expire_bandwidth(m_channel
, e
.amount
);
311 // now, wait for the next chunk to expire
// (skipped when m_abort is set)
312 if (!m_history
.empty() && !m_abort
)
// everything that has expired was removed by the loop above, so the
// oldest remaining entry expires in the future. ec is declared on a
// line not visible here
315 TORRENT_ASSERT(m_history
.back().expires_at
> now
);
316 m_history_timer
.expires_at(m_history
.back().expires_at
, ec
);
317 m_history_timer
.async_wait(bind(&bandwidth_manager::on_history_expire
, this, _1
));
320 // since some bandwidth just expired, it
321 // means we can hand out more (in case there
322 // are still consumers in line)
323 if (!m_queue
.empty()) hand_out_bandwidth(l
);
// hands out the bandwidth that is currently available on this channel.
// the callers visible in this file (request_bandwidth() and
// on_history_expire()) pass the lock on m_mutex that they hold as l.
//
// the available amount is limit - m_current_quota. Pending drain quota
// is taken out of it first, then requests are served from the front of
// m_queue for as long as some amount is left. Everything that is taken
// or handed out is recorded with add_history_entry(), with an expiry
// time of now + bw_window_size.
// NOTE(review): several statements of this function, among them the
// declarations of limit, tmp and ec, are not visible here
326 void hand_out_bandwidth(boost::mutex::scoped_lock
& l
)
328 // if we're already handing out bandwidth, just return back
329 // to the loop further down on the callstack
330 if (m_in_hand_out_bandwidth
) return;
331 m_in_hand_out_bandwidth
= true;
332 // set it to false when exiting function
333 assign_at_exit
<bool> sg(m_in_hand_out_bandwidth
, false);
// one timestamp is used for every history entry created by this call
337 ptime
now(time_now());
341 // available bandwidth to hand out
// (limit is presumably a local copy of m_limit -- TODO confirm)
342 int amount
= limit
- m_current_quota
;
344 if (amount
<= 0) return;
// serve the drain quota first: take as much of it as fits in the
// available amount and record it as a history entry without a peer
346 if (m_drain_quota
> 0)
348 int drain_amount
= (std::min
)(m_drain_quota
, amount
);
349 m_drain_quota
-= drain_amount
;
350 amount
-= drain_amount
;
351 add_history_entry(history_entry
<PeerConnection
, Torrent
>(
352 drain_amount
, now
+ bw_window_size
));
// serve queued requests, front first, until the queue is empty or
// nothing is left to hand out
356 while (!m_queue
.empty() && amount
> 0)
// work on a copy of the front entry
358 bw_queue_entry
<PeerConnection
, Torrent
> qe
= m_queue
.front();
359 TORRENT_ASSERT(qe
.max_block_size
> 0);
362 shared_ptr
<Torrent
> t
= qe
.torrent
.lock();
// the peer is disconnecting: expire the requested block on the torrent
// (the rest of this branch is not visible here)
364 if (qe
.peer
->is_disconnecting())
367 t
->expire_bandwidth(m_channel
, qe
.max_block_size
);
372 // at this point, max_assignable may actually be zero. Since
373 // the rate limit of the peer might have changed while it
// was waiting in the queue (presumably; the original sentence is cut
// off at this point)
375 int max_assignable
= qe
.peer
->max_assignable_bandwidth(m_channel
);
376 if (max_assignable
== 0)
// the held lock is passed on to is_in_history()
378 TORRENT_ASSERT(is_in_history(qe
.peer
.get(), l
));
383 // this is the limit of the block size. It depends on the throttle
384 // so that it can be closer to optimal. Larger block sizes will give lower
385 // granularity to the rate but will be more efficient. At high rates
386 // the block sizes are bigger and at low rates, the granularity
387 // is more important and block sizes are smaller
389 // the minimum rate that can be given is the block size, so, the
390 // block size must be smaller for lower rates. This is because
391 // the history window is one second, and the block will be forgotten
// once that window (bw_window_size) has passed
393 int block_size
= (std::min
)(qe
.peer
->bandwidth_throttle(m_channel
)
// below the minimum: use min_bandwidth_block_size, but never more
// than limit
396 if (block_size
< min_bandwidth_block_size
)
398 block_size
= (std::min
)(int(min_bandwidth_block_size
), limit
);
// above the maximum
400 else if (block_size
> max_bandwidth_block_size
)
// with an infinite limit, use the maximum block size as it is
402 if (limit
== bandwidth_limit::inf
)
404 block_size
= max_bandwidth_block_size
;
408 // try to make the block_size a divisor of
409 // m_limit to make the distributions as fair
// as possible
411 // TODO: move this calculation to where the limit
414 / (limit
/ max_bandwidth_block_size
);
// never exceed the block size of the request itself
417 if (block_size
> qe
.max_block_size
) block_size
= qe
.max_block_size
;
// less than a quarter of a block is available
// (the body of this branch is not visible here)
419 if (amount
< block_size
/ 4)
422 // m_queue.push_front(qe);
426 // so, hand out max_assignable, but no more than
427 // the available bandwidth (amount) and no more
428 // than the max_bandwidth_block_size
429 int hand_out_amount
= (std::min
)((std::min
)(block_size
, max_assignable
)
431 TORRENT_ASSERT(hand_out_amount
> 0);
432 amount
-= hand_out_amount
;
433 TORRENT_ASSERT(hand_out_amount
<= qe
.max_block_size
);
// tell the torrent and the peer how much they were assigned, then
// record the hand-out in the history
435 t
->assign_bandwidth(m_channel
, hand_out_amount
, qe
.max_block_size
);
436 qe
.peer
->assign_bandwidth(m_channel
, hand_out_amount
);
438 add_history_entry(history_entry
<PeerConnection
, Torrent
>(
439 qe
.peer
, t
, hand_out_amount
, now
+ bw_window_size
));
// put the entries collected in tmp back at the front of the queue
// (presumably these are requests that were set aside during the
// loop -- TODO confirm)
441 if (!tmp
.empty()) m_queue
.insert(m_queue
.begin(), tmp
.begin(), tmp
.end());
445 typedef boost::mutex mutex_t
;
446 mutable mutex_t m_mutex
;
448 // the io_service used for the timer
451 // the timer that is waiting for the entries
452 // in the history queue to expire (slide out
453 // of the history window)
454 deadline_timer m_history_timer
;
456 // the rate limit (bytes per second)
459 // bytes to drain without handing out to a peer
460 // used to deduct the IP overhead
463 // the sum of all recently handed out bandwidth blocks
466 // these are the consumers that want bandwidth
467 typedef std::deque
<bw_queue_entry
<PeerConnection
, Torrent
> > queue_t
;
470 // these are the consumers that have received bandwidth
472 typedef std::deque
<history_entry
<PeerConnection
, Torrent
> > history_t
;
475 // this is the channel within the consumers
476 // that bandwidth is assigned to (upload or download)
479 // this is true while we're in the hand_out_bandwidth loop
480 // to prevent recursive invocations to interfere
481 bool m_in_hand_out_bandwidth
;
485 #ifdef TORRENT_VERBOSE_BANDWIDTH_LIMIT