Upstream tarball 20080512
[amule.git] / src / UploadBandwidthThrottler.cpp
blob1d3804a9975bb58fd44f99752ebb0824139f50a4
1 //
2 // This file is part of the aMule Project.
3 //
4 // Copyright (C) 2005-2008 aMule Team ( admin@amule.org / http://www.amule.org )
5 // Copyright (c) 2002 Merkur ( devs@emule-project.net / http://www.emule-project.net )
6 //
7 // Any parts of this program derived from the xMule, lMule or eMule project,
8 // or contributed by third-party developers are copyrighted by their
9 // respective authors.
11 // This program is free software; you can redistribute it and/or modify
12 // it under the terms of the GNU General Public License as published by
13 // the Free Software Foundation; either version 2 of the License, or
14 // (at your option) any later version.
16 // This program is distributed in the hope that it will be useful,
17 // but WITHOUT ANY WARRANTY; without even the implied warranty of
18 // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
19 // GNU General Public License for more details.
20 //
21 // You should have received a copy of the GNU General Public License
22 // along with this program; if not, write to the Free Software
23 // Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301, USA
26 #include "UploadBandwidthThrottler.h"
28 #include <protocol/ed2k/Constants.h>
29 #include <common/Macros.h>
30 #include <common/Constants.h>
32 #include <cmath>
33 #include <limits> // Do_not_auto_remove (NetBSD)
34 #include "OtherFunctions.h"
35 #include "ThrottledSocket.h"
36 #include "Logger.h"
37 #include "Preferences.h"
38 #include "amule.h"
39 #include "Statistics.h"
41 #ifndef _MSC_VER
43 #ifdef _UI64_MAX
44 #undef _UI64_MAX
45 #endif
47 #ifdef _I64_MAX
48 #undef _I64_MAX
49 #endif
51 const uint32 _UI32_MAX = std::numeric_limits<uint32>::max();
52 const sint32 _I32_MAX = std::numeric_limits<sint32>::max();
53 const uint64 _UI64_MAX = std::numeric_limits<uint64>::max();
54 const sint64 _I64_MAX = std::numeric_limits<sint64>::max();
56 #endif
58 /////////////////////////////////////
61 /**
62 * The constructor starts the thread.
64 UploadBandwidthThrottler::UploadBandwidthThrottler()
65 : wxThread( wxTHREAD_JOINABLE )
67 m_SentBytesSinceLastCall = 0;
68 m_SentBytesSinceLastCallOverhead = 0;
70 m_doRun = true;
72 Create();
73 Run();
77 /**
78 * The destructor stops the thread. If the thread has already stoppped, destructor does nothing.
80 UploadBandwidthThrottler::~UploadBandwidthThrottler()
82 EndThread();
86 /**
87 * Find out how many bytes that has been put on the sockets since the last call to this
88 * method. Includes overhead of control packets.
90 * @return the number of bytes that has been put on the sockets since the last call
92 uint64 UploadBandwidthThrottler::GetNumberOfSentBytesSinceLastCallAndReset()
94 wxMutexLocker lock( m_sendLocker );
96 uint64 numberOfSentBytesSinceLastCall = m_SentBytesSinceLastCall;
97 m_SentBytesSinceLastCall = 0;
99 return numberOfSentBytesSinceLastCall;
103 * Find out how many bytes that has been put on the sockets since the last call to this
104 * method. Excludes overhead of control packets.
106 * @return the number of bytes that has been put on the sockets since the last call
108 uint64 UploadBandwidthThrottler::GetNumberOfSentBytesOverheadSinceLastCallAndReset()
110 wxMutexLocker lock( m_sendLocker );
112 uint64 numberOfSentBytesSinceLastCall = m_SentBytesSinceLastCallOverhead;
113 m_SentBytesSinceLastCallOverhead = 0;
115 return numberOfSentBytesSinceLastCall;
120 * Add a socket to the list of sockets that have upload slots. The main thread will
121 * continously call send on these sockets, to give them chance to work off their queues.
122 * The sockets are called in the order they exist in the list, so the top socket (index 0)
123 * will be given a chance first to use bandwidth, and then the next socket (index 1) etc.
125 * It is possible to add a socket several times to the list without removing it inbetween,
126 * but that should be avoided.
128 * @param index insert the socket at this place in the list. An index that is higher than the
129 * current number of sockets in the list will mean that the socket should be inserted
130 * last in the list.
132 * @param socket the address to the socket that should be added to the list. If the address is NULL,
133 * this method will do nothing.
135 void UploadBandwidthThrottler::AddToStandardList(uint32 index, ThrottledFileSocket* socket)
137 if ( socket ) {
138 wxMutexLocker lock( m_sendLocker );
140 RemoveFromStandardListNoLock(socket);
141 if (index > (uint32)m_StandardOrder_list.size()) {
142 index = m_StandardOrder_list.size();
145 m_StandardOrder_list.insert(m_StandardOrder_list.begin() + index, socket);
151 * Remove a socket from the list of sockets that have upload slots.
153 * If the socket has mistakenly been added several times to the list, this method
154 * will return all of the entries for the socket.
156 * @param socket the address of the socket that should be removed from the list. If this socket
157 * does not exist in the list, this method will do nothing.
159 bool UploadBandwidthThrottler::RemoveFromStandardList(ThrottledFileSocket* socket)
161 wxMutexLocker lock( m_sendLocker );
163 return RemoveFromStandardListNoLock(socket);
168 * Remove a socket from the list of sockets that have upload slots. NOT THREADSAFE!
169 * This is an internal method that doesn't take the necessary lock before it removes
170 * the socket. This method should only be called when the current thread already owns
171 * the m_sendLocker lock!
173 * @param socket address of the socket that should be removed from the list. If this socket
174 * does not exist in the list, this method will do nothing.
176 bool UploadBandwidthThrottler::RemoveFromStandardListNoLock(ThrottledFileSocket* socket)
178 return (EraseFirstValue( m_StandardOrder_list, socket ) > 0);
183 * Notifies the send thread that it should try to call controlpacket send
184 * for the supplied socket. It is allowed to call this method several times
185 * for the same socket, without having controlpacket send called for the socket
186 * first. The doublette entries are never filtered, since it is incurs less cpu
187 * overhead to simply call Send() in the socket for each double. Send() will
188 * already have done its work when the second Send() is called, and will just
189 * return with little cpu overhead.
191 * @param socket address to the socket that requests to have controlpacket send
192 * to be called on it
194 void UploadBandwidthThrottler::QueueForSendingControlPacket(ThrottledControlSocket* socket, bool hasSent)
196 // Get critical section
197 wxMutexLocker lock( m_tempQueueLocker );
199 if ( m_doRun ) {
200 if( hasSent ) {
201 m_TempControlQueueFirst_list.push_back(socket);
202 } else {
203 m_TempControlQueue_list.push_back(socket);
211 * Remove the socket from all lists and queues. This will make it safe to
212 * erase/delete the socket. It will also cause the main thread to stop calling
213 * send() for the socket.
215 * @param socket address to the socket that should be removed
217 void UploadBandwidthThrottler::DoRemoveFromAllQueues(ThrottledControlSocket* socket)
219 if ( m_doRun ) {
220 // Remove this socket from control packet queue
221 EraseValue( m_ControlQueue_list, socket );
222 EraseValue( m_ControlQueueFirst_list, socket );
224 wxMutexLocker lock( m_tempQueueLocker );
225 EraseValue( m_TempControlQueue_list, socket );
226 EraseValue( m_TempControlQueueFirst_list, socket );
231 void UploadBandwidthThrottler::RemoveFromAllQueues(ThrottledControlSocket* socket)
233 wxMutexLocker lock( m_sendLocker );
235 DoRemoveFromAllQueues( socket );
239 void UploadBandwidthThrottler::RemoveFromAllQueues(ThrottledFileSocket* socket)
241 wxMutexLocker lock( m_sendLocker );
243 if (m_doRun) {
244 DoRemoveFromAllQueues(socket);
246 // And remove it from upload slots
247 RemoveFromStandardListNoLock(socket);
253 * Make the thread exit. This method will not return until the thread has stopped
254 * looping. This guarantees that the thread will not access the CEMSockets after this
255 * call has exited.
257 void UploadBandwidthThrottler::EndThread()
260 wxMutexLocker lock(m_sendLocker);
262 // signal the thread to stop looping and exit.
263 m_doRun = false;
266 Wait();
271 * The thread method that handles calling send for the individual sockets.
273 * Control packets will always be tried to be sent first. If there is any bandwidth leftover
274 * after that, send() for the upload slot sockets will be called in priority order until we have run
275 * out of available bandwidth for this loop. Upload slots will not be allowed to go without having sent
276 * called for more than a defined amount of time (i.e. two seconds).
278 * @return always returns 0.
280 void* UploadBandwidthThrottler::Entry()
282 const uint32 TIME_BETWEEN_UPLOAD_LOOPS = 1;
284 uint32 lastLoopTick = ::GetTickCountFullRes();
285 sint64 realBytesToSpend = 0;
286 uint32 allowedDataRate = 0;
287 uint32 rememberedSlotCounter = 0;
288 uint32 extraSleepTime = TIME_BETWEEN_UPLOAD_LOOPS;
290 while (m_doRun) {
291 uint32 timeSinceLastLoop = ::GetTickCountFullRes() - lastLoopTick;
293 // Get current speed from UploadSpeedSense
294 if (thePrefs::GetMaxUpload() == UNLIMITED) {
295 // Try to increase the upload rate
296 if (theApp->uploadqueue) {
297 allowedDataRate = (uint32)theStats::GetUploadRate() + 5 * 1024;
298 } else {
299 // App not created yet or already destroyed.
300 allowedDataRate = (uint32)(-1);
302 } else {
303 allowedDataRate = thePrefs::GetMaxUpload() * 1024;
306 uint32 minFragSize = 1300;
307 uint32 doubleSendSize = minFragSize*2; // send two packages at a time so they can share an ACK
308 if (allowedDataRate < 6*1024) {
309 minFragSize = 536;
310 doubleSendSize = minFragSize; // don't send two packages at a time at very low speeds to give them a smoother load
314 uint32 sleepTime;
315 if(allowedDataRate == 0 || allowedDataRate == _UI32_MAX || realBytesToSpend >= 1000) {
316 // we could send at once, but sleep a while to not suck up all cpu
317 sleepTime = extraSleepTime;
318 } else {
319 // sleep for just as long as we need to get back to having one byte to send
320 sleepTime = std::max((uint32)ceil((double)(-realBytesToSpend + 1000)/allowedDataRate), extraSleepTime);
323 if(timeSinceLastLoop < sleepTime) {
324 Sleep(sleepTime-timeSinceLastLoop);
327 const uint32 thisLoopTick = ::GetTickCountFullRes();
328 timeSinceLastLoop = thisLoopTick - lastLoopTick;
330 // Calculate how many bytes we can spend
331 sint64 bytesToSpend = 0;
333 if(allowedDataRate != 0 && allowedDataRate != _UI32_MAX) {
334 // prevent overflow
335 if(timeSinceLastLoop == 0) {
336 // no time has passed, so don't add any bytes. Shouldn't happen.
337 bytesToSpend = 0; //realBytesToSpend/1000;
338 } else if(_I64_MAX/timeSinceLastLoop > allowedDataRate && _I64_MAX-allowedDataRate*timeSinceLastLoop > realBytesToSpend) {
339 if(timeSinceLastLoop > sleepTime + 2000) {
340 AddDebugLogLineM(false, logGeneral, wxString::Format(wxT("UploadBandwidthThrottler: Time since last loop too long. time: %ims wanted: %ims Max: %ims"), timeSinceLastLoop, sleepTime, sleepTime + 2000));
342 timeSinceLastLoop = sleepTime + 2000;
343 lastLoopTick = thisLoopTick - timeSinceLastLoop;
346 realBytesToSpend += allowedDataRate*timeSinceLastLoop;
348 bytesToSpend = realBytesToSpend/1000;
349 } else {
350 realBytesToSpend = _I64_MAX;
351 bytesToSpend = _I32_MAX;
353 } else {
354 realBytesToSpend = 0; //_I64_MAX;
355 bytesToSpend = _I32_MAX;
358 lastLoopTick = thisLoopTick;
360 if(bytesToSpend >= 1) {
361 uint64 spentBytes = 0;
362 uint64 spentOverhead = 0;
364 wxMutexLocker sendLock(m_sendLocker);
367 wxMutexLocker queueLock(m_tempQueueLocker);
369 // are there any sockets in m_TempControlQueue_list? Move them to normal m_ControlQueue_list;
370 m_ControlQueueFirst_list.insert( m_ControlQueueFirst_list.end(),
371 m_TempControlQueueFirst_list.begin(),
372 m_TempControlQueueFirst_list.end() );
374 m_ControlQueue_list.insert( m_ControlQueue_list.end(),
375 m_TempControlQueue_list.begin(),
376 m_TempControlQueue_list.end() );
378 m_TempControlQueue_list.clear();
379 m_TempControlQueueFirst_list.clear();
382 // Send any queued up control packets first
383 while(bytesToSpend > 0 && spentBytes < (uint64)bytesToSpend && (!m_ControlQueueFirst_list.empty() || !m_ControlQueue_list.empty())) {
384 ThrottledControlSocket* socket = NULL;
386 if(!m_ControlQueueFirst_list.empty()) {
387 socket = m_ControlQueueFirst_list.front();
388 m_ControlQueueFirst_list.pop_front();
389 } else if(!m_ControlQueue_list.empty()) {
390 socket = m_ControlQueue_list.front();
391 m_ControlQueue_list.pop_front();
394 if(socket != NULL) {
395 SocketSentBytes socketSentBytes = socket->SendControlData(bytesToSpend-spentBytes, minFragSize);
396 uint32 lastSpentBytes = socketSentBytes.sentBytesControlPackets + socketSentBytes.sentBytesStandardPackets;
397 spentBytes += lastSpentBytes;
398 spentOverhead += socketSentBytes.sentBytesControlPackets;
402 // Check if any sockets haven't gotten data for a long time. Then trickle them a package.
403 for ( uint32 slotCounter = 0; slotCounter < m_StandardOrder_list.size(); slotCounter++) {
404 ThrottledFileSocket* socket = m_StandardOrder_list[ slotCounter ];
406 if(socket != NULL) {
407 if(thisLoopTick-socket->GetLastCalledSend() > SEC2MS(1)) {
408 // trickle
409 uint32 neededBytes = socket->GetNeededBytes();
411 if(neededBytes > 0) {
412 SocketSentBytes socketSentBytes = socket->SendFileAndControlData(neededBytes, minFragSize);
413 uint32 lastSpentBytes = socketSentBytes.sentBytesControlPackets + socketSentBytes.sentBytesStandardPackets;
414 spentBytes += lastSpentBytes;
415 spentOverhead += socketSentBytes.sentBytesControlPackets;
418 } else {
419 AddDebugLogLineM(false, logGeneral, wxString::Format( wxT("There was a NULL socket in the UploadBandwidthThrottler Standard list (trickle)! Prevented usage. Index: %i Size: %i"), slotCounter, m_StandardOrder_list.size()) );
423 // Equal bandwidth for all slots
424 uint32 maxSlot = m_StandardOrder_list.size();
425 if(maxSlot > 0 && allowedDataRate/maxSlot < UPLOAD_CLIENT_DATARATE) {
426 maxSlot = allowedDataRate/UPLOAD_CLIENT_DATARATE;
429 for(uint32 maxCounter = 0; maxCounter < std::min(maxSlot, (uint32)m_StandardOrder_list.size()) && bytesToSpend > 0 && spentBytes < (uint64)bytesToSpend; maxCounter++) {
430 if(rememberedSlotCounter >= m_StandardOrder_list.size() ||
431 rememberedSlotCounter >= maxSlot) {
432 rememberedSlotCounter = 0;
435 ThrottledFileSocket* socket = m_StandardOrder_list[ rememberedSlotCounter ];
437 if(socket != NULL) {
438 SocketSentBytes socketSentBytes = socket->SendFileAndControlData(std::min(doubleSendSize, (uint32)(bytesToSpend-spentBytes)), doubleSendSize);
439 uint32 lastSpentBytes = socketSentBytes.sentBytesControlPackets + socketSentBytes.sentBytesStandardPackets;
441 spentBytes += lastSpentBytes;
442 spentOverhead += socketSentBytes.sentBytesControlPackets;
443 } else {
444 AddDebugLogLineM(false, logGeneral, wxString::Format( wxT("There was a NULL socket in the UploadBandwidthThrottler Standard list (equal-for-all)! Prevented usage. Index: %i Size: %i"), rememberedSlotCounter, m_StandardOrder_list.size()));
447 rememberedSlotCounter++;
450 // Any bandwidth that hasn't been used yet are used first to last.
451 for(uint32 slotCounter = 0; slotCounter < m_StandardOrder_list.size() && bytesToSpend > 0 && spentBytes < (uint64)bytesToSpend; slotCounter++) {
452 ThrottledFileSocket* socket = m_StandardOrder_list[ slotCounter ];
454 if(socket != NULL) {
455 uint32 bytesToSpendTemp = bytesToSpend-spentBytes;
456 SocketSentBytes socketSentBytes = socket->SendFileAndControlData(bytesToSpendTemp, doubleSendSize);
457 uint32 lastSpentBytes = socketSentBytes.sentBytesControlPackets + socketSentBytes.sentBytesStandardPackets;
458 spentBytes += lastSpentBytes;
459 spentOverhead += socketSentBytes.sentBytesControlPackets;
460 } else {
461 AddDebugLogLineM( false, logGeneral, wxString::Format( wxT("There was a NULL socket in the UploadBandwidthThrottler Standard list (fully activated)! Prevented usage. Index: %i Size: %i"), slotCounter, m_StandardOrder_list.size()));
464 realBytesToSpend -= spentBytes*1000;
466 if(realBytesToSpend < -(((sint64)m_StandardOrder_list.size()+1)*minFragSize)*1000) {
467 sint64 newRealBytesToSpend = -(((sint64)m_StandardOrder_list.size()+1)*minFragSize)*1000;
469 realBytesToSpend = newRealBytesToSpend;
470 } else {
471 uint64 bandwidthSavedTolerance = m_StandardOrder_list.size()*512*1000;
472 if(realBytesToSpend > 0 && (uint64)realBytesToSpend > 999+bandwidthSavedTolerance) {
473 sint64 newRealBytesToSpend = 999+bandwidthSavedTolerance;
474 realBytesToSpend = newRealBytesToSpend;
478 m_SentBytesSinceLastCall += spentBytes;
479 m_SentBytesSinceLastCallOverhead += spentOverhead;
481 if ((spentBytes == 0) && (spentOverhead == 0)) {
482 extraSleepTime = std::min<uint32>(extraSleepTime * 5, 1000); // 1s at most
483 } else {
484 extraSleepTime = TIME_BETWEEN_UPLOAD_LOOPS;
490 wxMutexLocker queueLock(m_tempQueueLocker);
491 m_TempControlQueue_list.clear();
492 m_TempControlQueueFirst_list.clear();
495 wxMutexLocker sendLock(m_sendLocker);
496 m_ControlQueue_list.clear();
497 m_StandardOrder_list.clear();
499 return 0;
501 // File_checked_for_headers