added some precautionary checks in bdecoder
[libtorrent.git] / include / libtorrent / bandwidth_manager.hpp
blob b548603bedc8804793d86a5b4374f17aa05b2378
1 /*
3 Copyright (c) 2007, Arvid Norberg
4 All rights reserved.
6 Redistribution and use in source and binary forms, with or without
7 modification, are permitted provided that the following conditions
8 are met:
10 * Redistributions of source code must retain the above copyright
11 notice, this list of conditions and the following disclaimer.
12 * Redistributions in binary form must reproduce the above copyright
13 notice, this list of conditions and the following disclaimer in
14 the documentation and/or other materials provided with the distribution.
15 * Neither the name of the author nor the names of its
16 contributors may be used to endorse or promote products derived
17 from this software without specific prior written permission.
19 THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
20 AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
21 IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
22 ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
23 LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
24 CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
25 SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
26 INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
27 CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
28 ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
29 POSSIBILITY OF SUCH DAMAGE.
33 #ifndef TORRENT_BANDWIDTH_MANAGER_HPP_INCLUDED
34 #define TORRENT_BANDWIDTH_MANAGER_HPP_INCLUDED
36 #include <boost/shared_ptr.hpp>
37 #include <boost/intrusive_ptr.hpp>
38 #include <boost/function.hpp>
39 #include <boost/bind.hpp>
40 #include <boost/integer_traits.hpp>
41 #include <boost/thread/mutex.hpp>
42 #include <deque>
44 #ifdef TORRENT_VERBOSE_BANDWIDTH_LIMIT
45 #include <fstream>
46 #endif
48 #include "libtorrent/socket.hpp"
49 #include "libtorrent/invariant_check.hpp"
50 #include "libtorrent/assert.hpp"
51 #include "libtorrent/bandwidth_limit.hpp"
52 #include "libtorrent/bandwidth_queue_entry.hpp"
54 using boost::weak_ptr;
55 using boost::shared_ptr;
56 using boost::intrusive_ptr;
57 using boost::bind;
60 namespace libtorrent {
// Bounds on the size of a single block of bandwidth quota that is
// handed out at a time. A block is at most 33 kB; the block size may
// be smaller when the rate limit is low.
enum
{
	min_bandwidth_block_size = 400,
	max_bandwidth_block_size = 33000
};
// length of the history window: quota that is handed out is recorded with
// an expiry time of "now + bw_window_size"
71 const time_duration bw_window_size = seconds(1);
73 template<class PeerConnection, class Torrent>
74 struct history_entry
76 history_entry(intrusive_ptr<PeerConnection> p, weak_ptr<Torrent> t
77 , int a, ptime exp)
78 : expires_at(exp), amount(a), peer(p), tor(t) {}
79 history_entry(int a, ptime exp)
80 : expires_at(exp), amount(a), peer(), tor() {}
81 ptime expires_at;
82 int amount;
83 intrusive_ptr<PeerConnection> peer;
84 weak_ptr<Torrent> tor;
// Restricts val to the closed range [floor, ceiling] and returns the
// result. Note the argument order: the upper bound is passed before
// the lower bound. ceiling must not be less than floor.
template<class T>
T clamp(T val, T ceiling, T floor)
{
	TORRENT_ASSERT(ceiling >= floor);
	return (val >= ceiling) ? ceiling
		: (val <= floor) ? floor
		: val;
}
// Holds a reference to a variable together with a value, and assigns
// the value to the variable when this object is destroyed.
template<class T>
struct assign_at_exit
{
	assign_at_exit(T& var, T val)
		: var_(var)
		, val_(val)
	{}

	~assign_at_exit()
	{
		var_ = val_;
	}

	// the variable that is assigned to on destruction
	T& var_;
	// the value that is assigned to var_ on destruction
	T val_;
};
105 template<class PeerConnection, class Torrent>
106 struct bandwidth_manager
108 bandwidth_manager(io_service& ios, int channel
109 #ifdef TORRENT_VERBOSE_BANDWIDTH_LIMIT
110 , bool log = false
111 #endif
113 : m_ios(ios)
114 , m_history_timer(m_ios)
115 , m_limit(bandwidth_limit::inf)
116 , m_drain_quota(0)
117 , m_current_quota(0)
118 , m_channel(channel)
119 , m_in_hand_out_bandwidth(false)
120 , m_abort(false)
122 #ifdef TORRENT_VERBOSE_BANDWIDTH_LIMIT
123 if (log)
124 m_log.open("bandwidth_limiter.log", std::ios::trunc);
125 m_start = time_now();
126 #endif
129 void drain(int bytes)
131 mutex_t::scoped_lock l(m_mutex);
132 TORRENT_ASSERT(bytes >= 0);
133 m_drain_quota += bytes;
134 if (m_drain_quota > m_limit * 5) m_drain_quota = m_limit * 5;
137 void throttle(int limit)
139 mutex_t::scoped_lock l(m_mutex);
140 TORRENT_ASSERT(limit >= 0);
141 m_limit = limit;
144 int throttle() const
146 mutex_t::scoped_lock l(m_mutex);
147 return m_limit;
150 void close()
152 m_abort = true;
153 m_queue.clear();
154 m_history.clear();
155 m_current_quota = 0;
156 m_history_timer.cancel();
159 #ifndef NDEBUG
160 bool is_queued(PeerConnection const* peer) const
162 mutex_t::scoped_lock l(m_mutex);
163 return is_queued(peer);
166 bool is_queued(PeerConnection const* peer, boost::mutex::scoped_lock& l) const
168 for (typename queue_t::const_iterator i = m_queue.begin()
169 , end(m_queue.end()); i != end; ++i)
171 if (i->peer.get() == peer) return true;
173 return false;
176 bool is_in_history(PeerConnection const* peer) const
178 mutex_t::scoped_lock l(m_mutex);
179 return is_in_history(peer, l);
182 bool is_in_history(PeerConnection const* peer, boost::mutex::scoped_lock& l) const
184 for (typename history_t::const_iterator i
185 = m_history.begin(), end(m_history.end()); i != end; ++i)
187 if (i->peer.get() == peer) return true;
189 return false;
191 #endif
193 int queue_size() const
195 mutex_t::scoped_lock l(m_mutex);
196 return m_queue.size();
199 // non prioritized means that, if there's a line for bandwidth,
200 // others will cut in front of the non-prioritized peers.
201 // this is used by web seeds
202 void request_bandwidth(intrusive_ptr<PeerConnection> const& peer
203 , int blk, int priority)
205 mutex_t::scoped_lock l(m_mutex);
206 INVARIANT_CHECK;
207 if (m_abort) return;
208 TORRENT_ASSERT(blk > 0);
209 TORRENT_ASSERT(!is_queued(peer.get(), l));
211 // make sure this peer isn't already in line
212 // waiting for bandwidth
213 TORRENT_ASSERT(peer->max_assignable_bandwidth(m_channel) > 0);
215 typename queue_t::reverse_iterator i(m_queue.rbegin());
216 while (i != m_queue.rend() && priority > i->priority)
218 ++i->priority;
219 ++i;
221 m_queue.insert(i.base(), bw_queue_entry<PeerConnection, Torrent>(peer, blk, priority));
222 if (!m_queue.empty()) hand_out_bandwidth(l);
225 #ifndef NDEBUG
226 void check_invariant() const
228 int current_quota = 0;
229 for (typename history_t::const_iterator i
230 = m_history.begin(), end(m_history.end()); i != end; ++i)
232 current_quota += i->amount;
234 TORRENT_ASSERT(current_quota == m_current_quota);
236 typename queue_t::const_iterator j = m_queue.begin();
237 if (j != m_queue.end())
239 ++j;
240 for (typename queue_t::const_iterator i = m_queue.begin()
241 , end(m_queue.end()); i != end && j != end; ++i, ++j)
242 TORRENT_ASSERT(i->priority >= j->priority);
245 #endif
247 private:
249 void add_history_entry(history_entry<PeerConnection, Torrent> const& e)
251 INVARIANT_CHECK;
253 m_history.push_front(e);
254 m_current_quota += e.amount;
255 // in case the size > 1 there is already a timer
256 // active that will be invoked, no need to set one up
258 #ifdef TORRENT_VERBOSE_BANDWIDTH_LIMIT
259 m_log << std::setw(7) << total_milliseconds(time_now() - m_start) << " + "
260 " queue: " << std::setw(3) << m_queue.size()
261 << " used: " << std::setw(7) << m_current_quota
262 << " limit: " << std::setw(7) << m_limit
263 << " history: " << std::setw(3) << m_history.size()
264 << std::endl;
265 #endif
266 if (m_history.size() > 1) return;
268 if (m_abort) return;
270 error_code ec;
271 TORRENT_ASSERT(e.expires_at > time_now());
272 m_history_timer.expires_at(e.expires_at, ec);
273 m_history_timer.async_wait(bind(&bandwidth_manager::on_history_expire, this, _1));
// Handler for the history timer. Removes every expired entry from the back
// of the history, subtracts its amount from the quota in use and tells the
// entry's peer and torrent (if any) that the bandwidth expired. Then re-arms
// the timer for the next entry and tries to hand out bandwidth to the queue.
276 void on_history_expire(error_code const& e)
// the wait did not complete successfully; nothing to do
// NOTE(review): presumably this also covers a cancelled wait -- confirm
278 if (e) return;
280 mutex_t::scoped_lock l(m_mutex);
281 INVARIANT_CHECK;
282 if (m_abort) return;
284 TORRENT_ASSERT(!m_history.empty());
286 ptime now(time_now());
// new entries are pushed at the front, so the oldest ones are at the back
287 while (!m_history.empty() && m_history.back().expires_at <= now)
// copy the entry before popping it, it is still needed below
289 history_entry<PeerConnection, Torrent> e = m_history.back();
290 m_history.pop_back();
291 m_current_quota -= e.amount;
292 TORRENT_ASSERT(m_current_quota >= 0);
294 #ifdef TORRENT_VERBOSE_BANDWIDTH_LIMIT
295 m_log << std::setw(7) << total_milliseconds(time_now() - m_start) << " - "
296 " queue: " << std::setw(3) << m_queue.size()
297 << " used: " << std::setw(7) << m_current_quota
298 << " limit: " << std::setw(7) << m_limit
299 << " history: " << std::setw(3) << m_history.size()
300 << std::endl;
301 #endif
302 intrusive_ptr<PeerConnection> c = e.peer;
// entries without a peer only account for quota, there is nobody to notify
303 if (!c) continue;
304 shared_ptr<Torrent> t = e.tor.lock();
// the mutex is released while calling into the peer and the torrent,
// and taken again afterwards
305 l.unlock();
306 if (!c->is_disconnecting()) c->expire_bandwidth(m_channel, e.amount);
307 if (t) t->expire_bandwidth(m_channel, e.amount);
308 l.lock();
311 // now, wait for the next chunk to expire
312 if (!m_history.empty() && !m_abort)
314 error_code ec;
315 TORRENT_ASSERT(m_history.back().expires_at > now);
316 m_history_timer.expires_at(m_history.back().expires_at, ec);
317 m_history_timer.async_wait(bind(&bandwidth_manager::on_history_expire, this, _1));
320 // since some bandwidth just expired, it
321 // means we can hand out more (in case there
322 // are still consumers in line)
323 if (!m_queue.empty()) hand_out_bandwidth(l);
// Hands out the bandwidth that is currently available (the limit minus the
// quota in use) to the requests in the queue, starting at the front.
// l is the caller's lock on m_mutex; it is released around the calls into
// peers and torrents and taken again afterwards.
326 void hand_out_bandwidth(boost::mutex::scoped_lock& l)
328 // if we're already handing out bandwidth, just return back
329 // to the loop further down on the callstack
330 if (m_in_hand_out_bandwidth) return;
331 m_in_hand_out_bandwidth = true;
332 // set it to false when exiting function
333 assign_at_exit<bool> sg(m_in_hand_out_bandwidth, false);
335 INVARIANT_CHECK;
337 ptime now(time_now());
// work on a snapshot of the limit for the whole call
339 int limit = m_limit;
341 // available bandwidth to hand out
342 int amount = limit - m_current_quota;
344 if (amount <= 0) return;
// drained quota is deducted first. It is recorded as a history entry
// without a peer or torrent.
346 if (m_drain_quota > 0)
348 int drain_amount = (std::min)(m_drain_quota, amount);
349 m_drain_quota -= drain_amount;
350 amount -= drain_amount;
351 add_history_entry(history_entry<PeerConnection, Torrent>(
352 drain_amount, now + bw_window_size));
// requests that cannot be served right now. They are put back at the
// front of the queue once the loop is done.
355 queue_t tmp;
356 while (!m_queue.empty() && amount > 0)
358 bw_queue_entry<PeerConnection, Torrent> qe = m_queue.front();
359 TORRENT_ASSERT(qe.max_block_size > 0);
360 m_queue.pop_front();
362 shared_ptr<Torrent> t = qe.torrent.lock();
// the torrent no longer exists, drop the request
363 if (!t) continue;
// the peer is disconnecting: notify the torrent with the requested block
// size (with the mutex released) and drop the request
364 if (qe.peer->is_disconnecting())
366 l.unlock();
367 t->expire_bandwidth(m_channel, qe.max_block_size);
368 l.lock();
369 continue;
372 // at this point, max_assignable may actually be zero. Since
373 // the rate limit of the peer might have changed while it
374 // was in the queue.
375 int max_assignable = qe.peer->max_assignable_bandwidth(m_channel);
376 if (max_assignable == 0)
378 TORRENT_ASSERT(is_in_history(qe.peer.get(), l));
379 tmp.push_back(qe);
380 continue;
383 // this is the limit of the block size. It depends on the throttle
384 // so that it can be closer to optimal. Larger block sizes will give lower
385 // granularity to the rate but will be more efficient. At high rates
386 // the block sizes are bigger and at low rates, the granularity
387 // is more important and block sizes are smaller
389 // the minimum rate that can be given is the block size, so, the
390 // block size must be smaller for lower rates. This is because
391 // the history window is one second, and the block will be forgotten
392 // after one second.
393 int block_size = (std::min)(qe.peer->bandwidth_throttle(m_channel)
394 , limit / 10);
// too small: use the minimum block size, but never more than the limit
396 if (block_size < min_bandwidth_block_size)
398 block_size = (std::min)(int(min_bandwidth_block_size), limit);
400 else if (block_size > max_bandwidth_block_size)
402 if (limit == bandwidth_limit::inf)
404 block_size = max_bandwidth_block_size;
406 else
408 // try to make the block_size a divisor of
409 // m_limit to make the distributions as fair
410 // as possible
411 // TODO: move this calculation to where the limit
412 // is changed
// block_size > max_bandwidth_block_size implies limit / 10 exceeds it as
// well, so the divisor below is not zero
413 block_size = limit
414 / (limit / max_bandwidth_block_size);
// never hand out more than the request asked for
417 if (block_size > qe.max_block_size) block_size = qe.max_block_size;
// less than a quarter of a block is left: keep the request and stop
// handing out for now
419 if (amount < block_size / 4)
421 tmp.push_back(qe);
422 // m_queue.push_front(qe);
423 break;
426 // so, hand out max_assignable, but no more than
427 // the available bandwidth (amount) and no more
428 // than the max_bandwidth_block_size
429 int hand_out_amount = (std::min)((std::min)(block_size, max_assignable)
430 , amount);
431 TORRENT_ASSERT(hand_out_amount > 0);
432 amount -= hand_out_amount;
433 TORRENT_ASSERT(hand_out_amount <= qe.max_block_size);
// the mutex is released while calling into the torrent and the peer
434 l.unlock();
435 t->assign_bandwidth(m_channel, hand_out_amount, qe.max_block_size);
436 qe.peer->assign_bandwidth(m_channel, hand_out_amount);
437 l.lock();
// record what was handed out; it expires one history window from "now"
438 add_history_entry(history_entry<PeerConnection, Torrent>(
439 qe.peer, t, hand_out_amount, now + bw_window_size));
// re-queue the requests that could not be served, ahead of everything else
441 if (!tmp.empty()) m_queue.insert(m_queue.begin(), tmp.begin(), tmp.end());
// the mutex taken by the member functions that access the state below.
// mutable so that const member functions can lock it
445 typedef boost::mutex mutex_t;
446 mutable mutex_t m_mutex;
448 // the io_service used for the timer
449 io_service& m_ios;
451 // the timer that is waiting for the entries
452 // in the history queue to expire (slide out
453 // of the history window)
454 deadline_timer m_history_timer;
456 // the rate limit (bytes per second)
457 int m_limit;
459 // bytes to drain without handing out to a peer
460 // used to deduct the IP overhead
461 int m_drain_quota;
463 // the sum of all recently handed out bandwidth blocks
464 int m_current_quota;
466 // these are the consumers that want bandwidth
467 typedef std::deque<bw_queue_entry<PeerConnection, Torrent> > queue_t;
468 queue_t m_queue;
470 // these are the consumers that have received bandwidth
471 // that will expire
472 typedef std::deque<history_entry<PeerConnection, Torrent> > history_t;
473 history_t m_history;
475 // this is the channel within the consumers
476 // that bandwidth is assigned to (upload or download)
477 int m_channel;
479 // this is true while we're in the hand_out_bandwidth loop
480 // to prevent recursive invocations to interfere
481 bool m_in_hand_out_bandwidth;
// set by close(). Once true, new requests are ignored and no more
// timers are set up
483 bool m_abort;
485 #ifdef TORRENT_VERBOSE_BANDWIDTH_LIMIT
// the log file and the time the manager was constructed; log
// timestamps are relative to m_start
486 std::ofstream m_log;
487 ptime m_start;
488 #endif
493 #endif