3 Copyright (c) 2007, Arvid Norberg
6 Redistribution and use in source and binary forms, with or without
7 modification, are permitted provided that the following conditions
10 * Redistributions of source code must retain the above copyright
11 notice, this list of conditions and the following disclaimer.
12 * Redistributions in binary form must reproduce the above copyright
13 notice, this list of conditions and the following disclaimer in
14 the documentation and/or other materials provided with the distribution.
15 * Neither the name of the author nor the names of its
16 contributors may be used to endorse or promote products derived
17 from this software without specific prior written permission.
19 THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
20 AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
21 IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
22 ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
23 LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
24 CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
25 SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
26 INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
27 CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
28 ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
29 POSSIBILITY OF SUCH DAMAGE.
33 #ifndef TORRENT_BANDWIDTH_MANAGER_HPP_INCLUDED
34 #define TORRENT_BANDWIDTH_MANAGER_HPP_INCLUDED
36 #include <boost/shared_ptr.hpp>
37 #include <boost/intrusive_ptr.hpp>
38 #include <boost/function.hpp>
39 #include <boost/bind.hpp>
40 #include <boost/integer_traits.hpp>
41 #include <boost/thread/mutex.hpp>
44 #ifdef TORRENT_VERBOSE_BANDWIDTH_LIMIT
48 #include "libtorrent/socket.hpp"
49 #include "libtorrent/invariant_check.hpp"
50 #include "libtorrent/assert.hpp"
51 #include "libtorrent/bandwidth_limit.hpp"
52 #include "libtorrent/bandwidth_queue_entry.hpp"
54 using boost::weak_ptr
;
55 using boost::shared_ptr
;
56 using boost::intrusive_ptr
;
60 namespace libtorrent
{
// Bounds for a single block of bandwidth quota.
// The maximum block of bandwidth quota to hand out is 33kB.
// The block size may be smaller on lower limits.
enum
{
	max_bandwidth_block_size = 33000,
	min_bandwidth_block_size = 400
};
71 const time_duration bw_window_size
= seconds(1);
73 template<class PeerConnection
, class Torrent
>
76 history_entry(intrusive_ptr
<PeerConnection
> p
, weak_ptr
<Torrent
> t
78 : expires_at(exp
), amount(a
), peer(p
), tor(t
) {}
79 history_entry(int a
, ptime exp
)
80 : expires_at(exp
), amount(a
), peer(), tor() {}
83 intrusive_ptr
<PeerConnection
> peer
;
84 weak_ptr
<Torrent
> tor
;
// Restricts val to the closed range [floor, ceiling].
// Returns ceiling if val >= ceiling, floor if val <= floor and
// val itself otherwise. ceiling >= floor is asserted.
template<class T>
T clamp(T val, T ceiling, T floor)
{
	TORRENT_ASSERT(ceiling >= floor);
	if (val >= ceiling) return ceiling;
	if (val <= floor) return floor;
	// strictly inside the range, pass the value through unchanged
	return val;
}
// Helper that remembers a variable and a value, and assigns the value
// to the variable when the helper object is destroyed.
template<class T>
struct assign_at_exit
{
	assign_at_exit(T& var, T val): var_(var), val_(val) {}
	~assign_at_exit() { var_ = val_; }

	// the variable that is written to on destruction. It is held by
	// reference, so it must outlive this object
	T& var_;
	// the value var_ receives on destruction
	T val_;
};
105 template<class PeerConnection
, class Torrent
>
106 struct bandwidth_manager
108 bandwidth_manager(io_service
& ios
, int channel
109 #ifdef TORRENT_VERBOSE_BANDWIDTH_LIMIT
114 , m_history_timer(m_ios
)
115 , m_limit(bandwidth_limit::inf
)
119 , m_in_hand_out_bandwidth(false)
122 #ifdef TORRENT_VERBOSE_BANDWIDTH_LIMIT
124 m_log
.open("bandwidth_limiter.log", std::ios::trunc
);
125 m_start
= time_now();
129 void drain(int bytes
)
131 mutex_t::scoped_lock
l(m_mutex
);
132 TORRENT_ASSERT(bytes
>= 0);
133 m_drain_quota
+= bytes
;
134 if (m_drain_quota
> m_limit
* 5) m_drain_quota
= m_limit
* 5;
137 void throttle(int limit
)
139 mutex_t::scoped_lock
l(m_mutex
);
140 TORRENT_ASSERT(limit
>= 0);
146 mutex_t::scoped_lock
l(m_mutex
);
156 m_history_timer
.cancel();
160 bool is_queued(PeerConnection
const* peer
) const
162 mutex_t::scoped_lock
l(m_mutex
);
163 return is_queued(peer
);
166 bool is_queued(PeerConnection
const* peer
, boost::mutex::scoped_lock
& l
) const
168 for (typename
queue_t::const_iterator i
= m_queue
.begin()
169 , end(m_queue
.end()); i
!= end
; ++i
)
171 if (i
->peer
.get() == peer
) return true;
176 bool is_in_history(PeerConnection
const* peer
) const
178 mutex_t::scoped_lock
l(m_mutex
);
179 return is_in_history(peer
, l
);
182 bool is_in_history(PeerConnection
const* peer
, boost::mutex::scoped_lock
& l
) const
184 for (typename
history_t::const_iterator i
185 = m_history
.begin(), end(m_history
.end()); i
!= end
; ++i
)
187 if (i
->peer
.get() == peer
) return true;
193 int queue_size() const
195 mutex_t::scoped_lock
l(m_mutex
);
196 return m_queue
.size();
199 // non prioritized means that, if there's a line for bandwidth,
200 // others will cut in front of the non-prioritized peers.
201 // this is used by web seeds
202 void request_bandwidth(intrusive_ptr
<PeerConnection
> const& peer
203 , int blk
, int priority
)
205 mutex_t::scoped_lock
l(m_mutex
);
208 TORRENT_ASSERT(blk
> 0);
209 TORRENT_ASSERT(!is_queued(peer
.get(), l
));
211 // make sure this peer isn't already in line
212 // waiting for bandwidth
213 TORRENT_ASSERT(peer
->max_assignable_bandwidth(m_channel
) > 0);
215 typename
queue_t::reverse_iterator
i(m_queue
.rbegin());
216 while (i
!= m_queue
.rend() && priority
> i
->priority
)
221 m_queue
.insert(i
.base(), bw_queue_entry
<PeerConnection
, Torrent
>(peer
, blk
, priority
));
222 if (!m_queue
.empty()) hand_out_bandwidth(l
);
226 void check_invariant() const
228 int current_quota
= 0;
229 for (typename
history_t::const_iterator i
230 = m_history
.begin(), end(m_history
.end()); i
!= end
; ++i
)
232 current_quota
+= i
->amount
;
234 TORRENT_ASSERT(current_quota
== m_current_quota
);
236 typename
queue_t::const_iterator j
= m_queue
.begin();
237 if (j
!= m_queue
.end())
240 for (typename
queue_t::const_iterator i
= m_queue
.begin()
241 , end(m_queue
.end()); i
!= end
&& j
!= end
; ++i
, ++j
)
242 TORRENT_ASSERT(i
->priority
>= j
->priority
);
249 void add_history_entry(history_entry
<PeerConnection
, Torrent
> const& e
)
253 m_history
.push_front(e
);
254 m_current_quota
+= e
.amount
;
255 // in case the size > 1 there is already a timer
256 // active that will be invoked, no need to set one up
258 #ifdef TORRENT_VERBOSE_BANDWIDTH_LIMIT
259 m_log
<< std::setw(7) << total_milliseconds(time_now() - m_start
) << " + "
260 " queue: " << std::setw(3) << m_queue
.size()
261 << " used: " << std::setw(7) << m_current_quota
262 << " limit: " << std::setw(7) << m_limit
263 << " history: " << std::setw(3) << m_history
.size()
266 if (m_history
.size() > 1) return;
271 m_history_timer
.expires_at(e
.expires_at
, ec
);
272 m_history_timer
.async_wait(bind(&bandwidth_manager::on_history_expire
, this, _1
));
275 void on_history_expire(error_code
const& e
)
279 mutex_t::scoped_lock
l(m_mutex
);
283 TORRENT_ASSERT(!m_history
.empty());
285 ptime
now(time_now());
286 while (!m_history
.empty() && m_history
.back().expires_at
<= now
)
288 history_entry
<PeerConnection
, Torrent
> e
= m_history
.back();
289 m_history
.pop_back();
290 m_current_quota
-= e
.amount
;
291 TORRENT_ASSERT(m_current_quota
>= 0);
293 #ifdef TORRENT_VERBOSE_BANDWIDTH_LIMIT
294 m_log
<< std::setw(7) << total_milliseconds(time_now() - m_start
) << " - "
295 " queue: " << std::setw(3) << m_queue
.size()
296 << " used: " << std::setw(7) << m_current_quota
297 << " limit: " << std::setw(7) << m_limit
298 << " history: " << std::setw(3) << m_history
.size()
301 intrusive_ptr
<PeerConnection
> c
= e
.peer
;
303 shared_ptr
<Torrent
> t
= e
.tor
.lock();
305 if (!c
->is_disconnecting()) c
->expire_bandwidth(m_channel
, e
.amount
);
306 if (t
) t
->expire_bandwidth(m_channel
, e
.amount
);
310 // now, wait for the next chunk to expire
311 if (!m_history
.empty() && !m_abort
)
314 m_history_timer
.expires_at(m_history
.back().expires_at
, ec
);
315 m_history_timer
.async_wait(bind(&bandwidth_manager::on_history_expire
, this, _1
));
318 // since some bandwidth just expired, it
319 // means we can hand out more (in case there
320 // are still consumers in line)
321 if (!m_queue
.empty()) hand_out_bandwidth(l
);
324 void hand_out_bandwidth(boost::mutex::scoped_lock
& l
)
326 // if we're already handing out bandwidth, just return back
327 // to the loop further down on the callstack
328 if (m_in_hand_out_bandwidth
) return;
329 m_in_hand_out_bandwidth
= true;
330 // set it to false when exiting function
331 assign_at_exit
<bool> sg(m_in_hand_out_bandwidth
, false);
335 ptime
now(time_now());
339 // available bandwidth to hand out
340 int amount
= limit
- m_current_quota
;
342 if (amount
<= 0) return;
344 if (m_drain_quota
> 0)
346 int drain_amount
= (std::min
)(m_drain_quota
, amount
);
347 m_drain_quota
-= drain_amount
;
348 amount
-= drain_amount
;
349 add_history_entry(history_entry
<PeerConnection
, Torrent
>(
350 drain_amount
, now
+ bw_window_size
));
354 while (!m_queue
.empty() && amount
> 0)
356 bw_queue_entry
<PeerConnection
, Torrent
> qe
= m_queue
.front();
357 TORRENT_ASSERT(qe
.max_block_size
> 0);
360 shared_ptr
<Torrent
> t
= qe
.torrent
.lock();
362 if (qe
.peer
->is_disconnecting())
365 t
->expire_bandwidth(m_channel
, qe
.max_block_size
);
370 // at this point, max_assignable may actually be zero, since
371 // the rate limit of the peer might have changed while it
// was waiting in the queue
373 int max_assignable
= qe
.peer
->max_assignable_bandwidth(m_channel
);
374 if (max_assignable
== 0)
376 TORRENT_ASSERT(is_in_history(qe
.peer
.get(), l
));
381 // this is the limit of the block size. It depends on the throttle
382 // so that it can be closer to optimal. Larger block sizes will give lower
383 // granularity to the rate but will be more efficient. At high rates
384 // the block sizes are bigger and at low rates, the granularity
385 // is more important and block sizes are smaller
387 // the minimum rate that can be given is the block size, so the
388 // block size must be smaller for lower rates. This is because
389 // the history window is one second, and the block will be forgotten
// once that window has passed
391 int block_size
= (std::min
)(qe
.peer
->bandwidth_throttle(m_channel
)
394 if (block_size
< min_bandwidth_block_size
)
396 block_size
= (std::min
)(int(min_bandwidth_block_size
), limit
);
398 else if (block_size
> max_bandwidth_block_size
)
400 if (limit
== bandwidth_limit::inf
)
402 block_size
= max_bandwidth_block_size
;
406 // try to make the block_size a divisor of
407 // m_limit to make the distributions as fair as possible
409 // TODO: move this calculation to where the limit is changed
412 / (limit
/ max_bandwidth_block_size
);
415 if (block_size
> qe
.max_block_size
) block_size
= qe
.max_block_size
;
417 if (amount
< block_size
/ 4)
420 // m_queue.push_front(qe);
424 // so, hand out max_assignable, but no more than
425 // the available bandwidth (amount) and no more
426 // than the max_bandwidth_block_size
427 int hand_out_amount
= (std::min
)((std::min
)(block_size
, max_assignable
)
429 TORRENT_ASSERT(hand_out_amount
> 0);
430 amount
-= hand_out_amount
;
431 TORRENT_ASSERT(hand_out_amount
<= qe
.max_block_size
);
433 t
->assign_bandwidth(m_channel
, hand_out_amount
, qe
.max_block_size
);
434 qe
.peer
->assign_bandwidth(m_channel
, hand_out_amount
);
436 add_history_entry(history_entry
<PeerConnection
, Torrent
>(
437 qe
.peer
, t
, hand_out_amount
, now
+ bw_window_size
));
439 if (!tmp
.empty()) m_queue
.insert(m_queue
.begin(), tmp
.begin(), tmp
.end());
443 typedef boost::mutex mutex_t
;
444 mutable mutex_t m_mutex
;
446 // the io_service used for the timer
449 // the timer that is waiting for the entries
450 // in the history queue to expire (slide out
451 // of the history window)
452 deadline_timer m_history_timer
;
454 // the rate limit (bytes per second)
457 // bytes to drain without handing out to a peer
458 // used to deduct the IP overhead
461 // the sum of all recently handed out bandwidth blocks
464 // these are the consumers that want bandwidth
465 typedef std::deque
<bw_queue_entry
<PeerConnection
, Torrent
> > queue_t
;
468 // these are the consumers that have received bandwidth
470 typedef std::deque
<history_entry
<PeerConnection
, Torrent
> > history_t
;
473 // this is the channel within the consumers
474 // that bandwidth is assigned to (upload or download)
477 // this is true while we're in the hand_out_bandwidth loop
478 // to prevent recursive invocations from interfering
479 bool m_in_hand_out_bandwidth
;
483 #ifdef TORRENT_VERBOSE_BANDWIDTH_LIMIT