makes all tracker requests 'stopped' when aborting
[libtorrent.git] / include / libtorrent / bandwidth_manager.hpp
blobef58337c5a907ad7a34e9616251ffd01a45b523a
1 /*
3 Copyright (c) 2007, Arvid Norberg
4 All rights reserved.
6 Redistribution and use in source and binary forms, with or without
7 modification, are permitted provided that the following conditions
8 are met:
10 * Redistributions of source code must retain the above copyright
11 notice, this list of conditions and the following disclaimer.
12 * Redistributions in binary form must reproduce the above copyright
13 notice, this list of conditions and the following disclaimer in
14 the documentation and/or other materials provided with the distribution.
15 * Neither the name of the author nor the names of its
16 contributors may be used to endorse or promote products derived
17 from this software without specific prior written permission.
19 THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
20 AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
21 IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
22 ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
23 LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
24 CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
25 SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
26 INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
27 CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
28 ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
29 POSSIBILITY OF SUCH DAMAGE.
33 #ifndef TORRENT_BANDWIDTH_MANAGER_HPP_INCLUDED
34 #define TORRENT_BANDWIDTH_MANAGER_HPP_INCLUDED
36 #include <boost/shared_ptr.hpp>
37 #include <boost/intrusive_ptr.hpp>
38 #include <boost/function.hpp>
39 #include <boost/bind.hpp>
40 #include <boost/integer_traits.hpp>
41 #include <boost/thread/mutex.hpp>
42 #include <deque>
44 #ifdef TORRENT_VERBOSE_BANDWIDTH_LIMIT
45 #include <fstream>
46 #endif
48 #include "libtorrent/socket.hpp"
49 #include "libtorrent/invariant_check.hpp"
50 #include "libtorrent/assert.hpp"
51 #include "libtorrent/bandwidth_limit.hpp"
52 #include "libtorrent/bandwidth_queue_entry.hpp"
54 using boost::weak_ptr;
55 using boost::shared_ptr;
56 using boost::intrusive_ptr;
57 using boost::bind;
60 namespace libtorrent {
62 // the maximum block of bandwidth quota to
63 // hand out is 33kB. The block size may
64 // be smaller on lower limits
65 enum
67 max_bandwidth_block_size = 33000,
68 min_bandwidth_block_size = 400
71 const time_duration bw_window_size = seconds(1);
73 template<class PeerConnection, class Torrent>
74 struct history_entry
76 history_entry(intrusive_ptr<PeerConnection> p, weak_ptr<Torrent> t
77 , int a, ptime exp)
78 : expires_at(exp), amount(a), peer(p), tor(t) {}
79 history_entry(int a, ptime exp)
80 : expires_at(exp), amount(a), peer(), tor() {}
81 ptime expires_at;
82 int amount;
83 intrusive_ptr<PeerConnection> peer;
84 weak_ptr<Torrent> tor;
87 template<class T>
88 T clamp(T val, T ceiling, T floor)
90 TORRENT_ASSERT(ceiling >= floor);
91 if (val >= ceiling) return ceiling;
92 else if (val <= floor) return floor;
93 return val;
96 template<class T>
97 struct assign_at_exit
99 assign_at_exit(T& var, T val): var_(var), val_(val) {}
100 ~assign_at_exit() { var_ = val_; }
101 T& var_;
102 T val_;
105 template<class PeerConnection, class Torrent>
106 struct bandwidth_manager
108 bandwidth_manager(io_service& ios, int channel
109 #ifdef TORRENT_VERBOSE_BANDWIDTH_LIMIT
110 , bool log = false
111 #endif
113 : m_ios(ios)
114 , m_history_timer(m_ios)
115 , m_limit(bandwidth_limit::inf)
116 , m_drain_quota(0)
117 , m_current_quota(0)
118 , m_channel(channel)
119 , m_in_hand_out_bandwidth(false)
120 , m_abort(false)
122 #ifdef TORRENT_VERBOSE_BANDWIDTH_LIMIT
123 if (log)
124 m_log.open("bandwidth_limiter.log", std::ios::trunc);
125 m_start = time_now();
126 #endif
129 void drain(int bytes)
131 mutex_t::scoped_lock l(m_mutex);
132 TORRENT_ASSERT(bytes >= 0);
133 m_drain_quota += bytes;
134 if (m_drain_quota > m_limit * 5) m_drain_quota = m_limit * 5;
137 void throttle(int limit)
139 mutex_t::scoped_lock l(m_mutex);
140 TORRENT_ASSERT(limit >= 0);
141 m_limit = limit;
144 int throttle() const
146 mutex_t::scoped_lock l(m_mutex);
147 return m_limit;
150 void close()
152 m_abort = true;
153 m_queue.clear();
154 m_history.clear();
155 m_current_quota = 0;
156 m_history_timer.cancel();
159 #ifndef NDEBUG
160 bool is_queued(PeerConnection const* peer) const
162 mutex_t::scoped_lock l(m_mutex);
163 return is_queued(peer);
166 bool is_queued(PeerConnection const* peer, boost::mutex::scoped_lock& l) const
168 for (typename queue_t::const_iterator i = m_queue.begin()
169 , end(m_queue.end()); i != end; ++i)
171 if (i->peer.get() == peer) return true;
173 return false;
176 bool is_in_history(PeerConnection const* peer) const
178 mutex_t::scoped_lock l(m_mutex);
179 return is_in_history(peer, l);
182 bool is_in_history(PeerConnection const* peer, boost::mutex::scoped_lock& l) const
184 for (typename history_t::const_iterator i
185 = m_history.begin(), end(m_history.end()); i != end; ++i)
187 if (i->peer.get() == peer) return true;
189 return false;
191 #endif
193 int queue_size() const
195 mutex_t::scoped_lock l(m_mutex);
196 return m_queue.size();
199 // non prioritized means that, if there's a line for bandwidth,
200 // others will cut in front of the non-prioritized peers.
201 // this is used by web seeds
202 void request_bandwidth(intrusive_ptr<PeerConnection> const& peer
203 , int blk, int priority)
205 mutex_t::scoped_lock l(m_mutex);
206 INVARIANT_CHECK;
207 if (m_abort) return;
208 TORRENT_ASSERT(blk > 0);
209 TORRENT_ASSERT(!is_queued(peer.get(), l));
211 // make sure this peer isn't already in line
212 // waiting for bandwidth
213 TORRENT_ASSERT(peer->max_assignable_bandwidth(m_channel) > 0);
215 typename queue_t::reverse_iterator i(m_queue.rbegin());
216 while (i != m_queue.rend() && priority > i->priority)
218 ++i->priority;
219 ++i;
221 m_queue.insert(i.base(), bw_queue_entry<PeerConnection, Torrent>(peer, blk, priority));
222 if (!m_queue.empty()) hand_out_bandwidth(l);
225 #ifndef NDEBUG
226 void check_invariant() const
228 int current_quota = 0;
229 for (typename history_t::const_iterator i
230 = m_history.begin(), end(m_history.end()); i != end; ++i)
232 current_quota += i->amount;
234 TORRENT_ASSERT(current_quota == m_current_quota);
236 typename queue_t::const_iterator j = m_queue.begin();
237 if (j != m_queue.end())
239 ++j;
240 for (typename queue_t::const_iterator i = m_queue.begin()
241 , end(m_queue.end()); i != end && j != end; ++i, ++j)
242 TORRENT_ASSERT(i->priority >= j->priority);
245 #endif
247 private:
249 void add_history_entry(history_entry<PeerConnection, Torrent> const& e)
251 INVARIANT_CHECK;
253 m_history.push_front(e);
254 m_current_quota += e.amount;
255 // in case the size > 1 there is already a timer
256 // active that will be invoked, no need to set one up
258 #ifdef TORRENT_VERBOSE_BANDWIDTH_LIMIT
259 m_log << std::setw(7) << total_milliseconds(time_now() - m_start) << " + "
260 " queue: " << std::setw(3) << m_queue.size()
261 << " used: " << std::setw(7) << m_current_quota
262 << " limit: " << std::setw(7) << m_limit
263 << " history: " << std::setw(3) << m_history.size()
264 << std::endl;
265 #endif
266 if (m_history.size() > 1) return;
268 if (m_abort) return;
270 error_code ec;
271 m_history_timer.expires_at(e.expires_at, ec);
272 m_history_timer.async_wait(bind(&bandwidth_manager::on_history_expire, this, _1));
275 void on_history_expire(error_code const& e)
277 if (e) return;
279 mutex_t::scoped_lock l(m_mutex);
280 INVARIANT_CHECK;
281 if (m_abort) return;
283 TORRENT_ASSERT(!m_history.empty());
285 ptime now(time_now());
286 while (!m_history.empty() && m_history.back().expires_at <= now)
288 history_entry<PeerConnection, Torrent> e = m_history.back();
289 m_history.pop_back();
290 m_current_quota -= e.amount;
291 TORRENT_ASSERT(m_current_quota >= 0);
293 #ifdef TORRENT_VERBOSE_BANDWIDTH_LIMIT
294 m_log << std::setw(7) << total_milliseconds(time_now() - m_start) << " - "
295 " queue: " << std::setw(3) << m_queue.size()
296 << " used: " << std::setw(7) << m_current_quota
297 << " limit: " << std::setw(7) << m_limit
298 << " history: " << std::setw(3) << m_history.size()
299 << std::endl;
300 #endif
301 intrusive_ptr<PeerConnection> c = e.peer;
302 if (!c) continue;
303 shared_ptr<Torrent> t = e.tor.lock();
304 l.unlock();
305 if (!c->is_disconnecting()) c->expire_bandwidth(m_channel, e.amount);
306 if (t) t->expire_bandwidth(m_channel, e.amount);
307 l.lock();
310 // now, wait for the next chunk to expire
311 if (!m_history.empty() && !m_abort)
313 error_code ec;
314 m_history_timer.expires_at(m_history.back().expires_at, ec);
315 m_history_timer.async_wait(bind(&bandwidth_manager::on_history_expire, this, _1));
318 // since some bandwidth just expired, it
319 // means we can hand out more (in case there
320 // are still consumers in line)
321 if (!m_queue.empty()) hand_out_bandwidth(l);
324 void hand_out_bandwidth(boost::mutex::scoped_lock& l)
326 // if we're already handing out bandwidth, just return back
327 // to the loop further down on the callstack
328 if (m_in_hand_out_bandwidth) return;
329 m_in_hand_out_bandwidth = true;
330 // set it to false when exiting function
331 assign_at_exit<bool> sg(m_in_hand_out_bandwidth, false);
333 INVARIANT_CHECK;
335 ptime now(time_now());
337 int limit = m_limit;
339 // available bandwidth to hand out
340 int amount = limit - m_current_quota;
342 if (amount <= 0) return;
344 if (m_drain_quota > 0)
346 int drain_amount = (std::min)(m_drain_quota, amount);
347 m_drain_quota -= drain_amount;
348 amount -= drain_amount;
349 add_history_entry(history_entry<PeerConnection, Torrent>(
350 drain_amount, now + bw_window_size));
353 queue_t tmp;
354 while (!m_queue.empty() && amount > 0)
356 bw_queue_entry<PeerConnection, Torrent> qe = m_queue.front();
357 TORRENT_ASSERT(qe.max_block_size > 0);
358 m_queue.pop_front();
360 shared_ptr<Torrent> t = qe.torrent.lock();
361 if (!t) continue;
362 if (qe.peer->is_disconnecting())
364 l.unlock();
365 t->expire_bandwidth(m_channel, qe.max_block_size);
366 l.lock();
367 continue;
370 // at this point, max_assignable may actually be zero. Since
371 // the rate limit of the peer might have changed while it
372 // was in the queue.
373 int max_assignable = qe.peer->max_assignable_bandwidth(m_channel);
374 if (max_assignable == 0)
376 TORRENT_ASSERT(is_in_history(qe.peer.get(), l));
377 tmp.push_back(qe);
378 continue;
381 // this is the limit of the block size. It depends on the throttle
382 // so that it can be closer to optimal. Larger block sizes will give lower
383 // granularity to the rate but will be more efficient. At high rates
384 // the block sizes are bigger and at low rates, the granularity
385 // is more important and block sizes are smaller
387 // the minimum rate that can be given is the block size, so, the
388 // block size must be smaller for lower rates. This is because
389 // the history window is one second, and the block will be forgotten
390 // after one second.
391 int block_size = (std::min)(qe.peer->bandwidth_throttle(m_channel)
392 , limit / 10);
394 if (block_size < min_bandwidth_block_size)
396 block_size = (std::min)(int(min_bandwidth_block_size), limit);
398 else if (block_size > max_bandwidth_block_size)
400 if (limit == bandwidth_limit::inf)
402 block_size = max_bandwidth_block_size;
404 else
406 // try to make the block_size a divisor of
407 // m_limit to make the distributions as fair
408 // as possible
409 // TODO: move this calculcation to where the limit
410 // is changed
411 block_size = limit
412 / (limit / max_bandwidth_block_size);
415 if (block_size > qe.max_block_size) block_size = qe.max_block_size;
417 if (amount < block_size / 4)
419 tmp.push_back(qe);
420 // m_queue.push_front(qe);
421 break;
424 // so, hand out max_assignable, but no more than
425 // the available bandwidth (amount) and no more
426 // than the max_bandwidth_block_size
427 int hand_out_amount = (std::min)((std::min)(block_size, max_assignable)
428 , amount);
429 TORRENT_ASSERT(hand_out_amount > 0);
430 amount -= hand_out_amount;
431 TORRENT_ASSERT(hand_out_amount <= qe.max_block_size);
432 l.unlock();
433 t->assign_bandwidth(m_channel, hand_out_amount, qe.max_block_size);
434 qe.peer->assign_bandwidth(m_channel, hand_out_amount);
435 l.lock();
436 add_history_entry(history_entry<PeerConnection, Torrent>(
437 qe.peer, t, hand_out_amount, now + bw_window_size));
439 if (!tmp.empty()) m_queue.insert(m_queue.begin(), tmp.begin(), tmp.end());
443 typedef boost::mutex mutex_t;
444 mutable mutex_t m_mutex;
446 // the io_service used for the timer
447 io_service& m_ios;
449 // the timer that is waiting for the entries
450 // in the history queue to expire (slide out
451 // of the history window)
452 deadline_timer m_history_timer;
454 // the rate limit (bytes per second)
455 int m_limit;
457 // bytes to drain without handing out to a peer
458 // used to deduct the IP overhead
459 int m_drain_quota;
461 // the sum of all recently handed out bandwidth blocks
462 int m_current_quota;
464 // these are the consumers that want bandwidth
465 typedef std::deque<bw_queue_entry<PeerConnection, Torrent> > queue_t;
466 queue_t m_queue;
468 // these are the consumers that have received bandwidth
469 // that will expire
470 typedef std::deque<history_entry<PeerConnection, Torrent> > history_t;
471 history_t m_history;
473 // this is the channel within the consumers
474 // that bandwidth is assigned to (upload or download)
475 int m_channel;
477 // this is true while we're in the hand_out_bandwidth loop
478 // to prevent recursive invocations to interfere
479 bool m_in_hand_out_bandwidth;
481 bool m_abort;
483 #ifdef TORRENT_VERBOSE_BANDWIDTH_LIMIT
484 std::ofstream m_log;
485 ptime m_start;
486 #endif
491 #endif