2 // This file is part of the aMule Project.
4 // Copyright (c) 2005-2008 aMule Team ( admin@amule.org / http://www.amule.org )
5 // Copyright (c) 2002-2008 Merkur ( devs@emule-project.net / http://www.emule-project.net )
7 // Any parts of this program derived from the xMule, lMule or eMule project,
8 // or contributed by third-party developers are copyrighted by their
11 // This program is free software; you can redistribute it and/or modify
12 // it under the terms of the GNU General Public License as published by
13 // the Free Software Foundation; either version 2 of the License, or
14 // (at your option) any later version.
16 // This program is distributed in the hope that it will be useful,
17 // but WITHOUT ANY WARRANTY; without even the implied warranty of
18 // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
19 // GNU General Public License for more details.
21 // You should have received a copy of the GNU General Public License
22 // along with this program; if not, write to the Free Software
23 // Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301, USA
26 #include "UploadBandwidthThrottler.h"
28 #include <protocol/ed2k/Constants.h>
29 #include <common/Macros.h>
30 #include <common/Constants.h>
33 #include <limits> // Do_not_auto_remove (NetBSD)
34 #include "OtherFunctions.h"
35 #include "ThrottledSocket.h"
37 #include "Preferences.h"
38 #include "Statistics.h"
50 const uint32 _UI32_MAX
= std::numeric_limits
<uint32
>::max();
51 const sint32 _I32_MAX
= std::numeric_limits
<sint32
>::max();
52 const uint64 _UI64_MAX
= std::numeric_limits
<uint64
>::max();
53 const sint64 _I64_MAX
= std::numeric_limits
<sint64
>::max();
57 /////////////////////////////////////
61 * The constructor starts the thread.
63 UploadBandwidthThrottler::UploadBandwidthThrottler()
64 : wxThread( wxTHREAD_JOINABLE
)
66 m_SentBytesSinceLastCall
= 0;
67 m_SentBytesSinceLastCallOverhead
= 0;
77 * The destructor stops the thread. If the thread has already stoppped, destructor does nothing.
79 UploadBandwidthThrottler::~UploadBandwidthThrottler()
86 * Find out how many bytes that has been put on the sockets since the last call to this
87 * method. Includes overhead of control packets.
89 * @return the number of bytes that has been put on the sockets since the last call
91 uint64
UploadBandwidthThrottler::GetNumberOfSentBytesSinceLastCallAndReset()
93 wxMutexLocker
lock( m_sendLocker
);
95 uint64 numberOfSentBytesSinceLastCall
= m_SentBytesSinceLastCall
;
96 m_SentBytesSinceLastCall
= 0;
98 return numberOfSentBytesSinceLastCall
;
102 * Find out how many bytes that has been put on the sockets since the last call to this
103 * method. Excludes overhead of control packets.
105 * @return the number of bytes that has been put on the sockets since the last call
107 uint64
UploadBandwidthThrottler::GetNumberOfSentBytesOverheadSinceLastCallAndReset()
109 wxMutexLocker
lock( m_sendLocker
);
111 uint64 numberOfSentBytesSinceLastCall
= m_SentBytesSinceLastCallOverhead
;
112 m_SentBytesSinceLastCallOverhead
= 0;
114 return numberOfSentBytesSinceLastCall
;
119 * Add a socket to the list of sockets that have upload slots. The main thread will
120 * continously call send on these sockets, to give them chance to work off their queues.
121 * The sockets are called in the order they exist in the list, so the top socket (index 0)
122 * will be given a chance first to use bandwidth, and then the next socket (index 1) etc.
124 * It is possible to add a socket several times to the list without removing it inbetween,
125 * but that should be avoided.
127 * @param index insert the socket at this place in the list. An index that is higher than the
128 * current number of sockets in the list will mean that the socket should be inserted
131 * @param socket the address to the socket that should be added to the list. If the address is NULL,
132 * this method will do nothing.
134 void UploadBandwidthThrottler::AddToStandardList(uint32 index
, ThrottledFileSocket
* socket
)
137 wxMutexLocker
lock( m_sendLocker
);
139 RemoveFromStandardListNoLock(socket
);
140 if (index
> (uint32
)m_StandardOrder_list
.size()) {
141 index
= m_StandardOrder_list
.size();
144 m_StandardOrder_list
.insert(m_StandardOrder_list
.begin() + index
, socket
);
150 * Remove a socket from the list of sockets that have upload slots.
152 * If the socket has mistakenly been added several times to the list, this method
153 * will return all of the entries for the socket.
155 * @param socket the address of the socket that should be removed from the list. If this socket
156 * does not exist in the list, this method will do nothing.
158 bool UploadBandwidthThrottler::RemoveFromStandardList(ThrottledFileSocket
* socket
)
160 wxMutexLocker
lock( m_sendLocker
);
162 return RemoveFromStandardListNoLock(socket
);
167 * Remove a socket from the list of sockets that have upload slots. NOT THREADSAFE!
168 * This is an internal method that doesn't take the necessary lock before it removes
169 * the socket. This method should only be called when the current thread already owns
170 * the m_sendLocker lock!
172 * @param socket address of the socket that should be removed from the list. If this socket
173 * does not exist in the list, this method will do nothing.
175 bool UploadBandwidthThrottler::RemoveFromStandardListNoLock(ThrottledFileSocket
* socket
)
177 return (EraseFirstValue( m_StandardOrder_list
, socket
) > 0);
182 * Notifies the send thread that it should try to call controlpacket send
183 * for the supplied socket. It is allowed to call this method several times
184 * for the same socket, without having controlpacket send called for the socket
185 * first. The doublette entries are never filtered, since it is incurs less cpu
186 * overhead to simply call Send() in the socket for each double. Send() will
187 * already have done its work when the second Send() is called, and will just
188 * return with little cpu overhead.
190 * @param socket address to the socket that requests to have controlpacket send
193 void UploadBandwidthThrottler::QueueForSendingControlPacket(ThrottledControlSocket
* socket
, bool hasSent
)
195 // Get critical section
196 wxMutexLocker
lock( m_tempQueueLocker
);
200 m_TempControlQueueFirst_list
.push_back(socket
);
202 m_TempControlQueue_list
.push_back(socket
);
210 * Remove the socket from all lists and queues. This will make it safe to
211 * erase/delete the socket. It will also cause the main thread to stop calling
212 * send() for the socket.
214 * @param socket address to the socket that should be removed
216 void UploadBandwidthThrottler::DoRemoveFromAllQueues(ThrottledControlSocket
* socket
)
219 // Remove this socket from control packet queue
220 EraseValue( m_ControlQueue_list
, socket
);
221 EraseValue( m_ControlQueueFirst_list
, socket
);
223 wxMutexLocker
lock( m_tempQueueLocker
);
224 EraseValue( m_TempControlQueue_list
, socket
);
225 EraseValue( m_TempControlQueueFirst_list
, socket
);
230 void UploadBandwidthThrottler::RemoveFromAllQueues(ThrottledControlSocket
* socket
)
232 wxMutexLocker
lock( m_sendLocker
);
234 DoRemoveFromAllQueues( socket
);
238 void UploadBandwidthThrottler::RemoveFromAllQueues(ThrottledFileSocket
* socket
)
240 wxMutexLocker
lock( m_sendLocker
);
243 DoRemoveFromAllQueues(socket
);
245 // And remove it from upload slots
246 RemoveFromStandardListNoLock(socket
);
252 * Make the thread exit. This method will not return until the thread has stopped
253 * looping. This guarantees that the thread will not access the CEMSockets after this
256 void UploadBandwidthThrottler::EndThread()
258 if (m_doRun
) { // do it only once
260 wxMutexLocker
lock(m_sendLocker
);
262 // signal the thread to stop looping and exit.
272 * The thread method that handles calling send for the individual sockets.
274 * Control packets will always be tried to be sent first. If there is any bandwidth leftover
275 * after that, send() for the upload slot sockets will be called in priority order until we have run
276 * out of available bandwidth for this loop. Upload slots will not be allowed to go without having sent
277 * called for more than a defined amount of time (i.e. two seconds).
279 * @return always returns 0.
281 void* UploadBandwidthThrottler::Entry()
283 const uint32 TIME_BETWEEN_UPLOAD_LOOPS
= 1;
285 uint32 lastLoopTick
= ::GetTickCountFullRes();
286 sint64 realBytesToSpend
= 0;
287 uint32 allowedDataRate
= 0;
288 uint32 rememberedSlotCounter
= 0;
289 uint32 extraSleepTime
= TIME_BETWEEN_UPLOAD_LOOPS
;
292 uint32 timeSinceLastLoop
= ::GetTickCountFullRes() - lastLoopTick
;
294 // Get current speed from UploadSpeedSense
295 if (thePrefs::GetMaxUpload() == UNLIMITED
) {
296 // Try to increase the upload rate
297 allowedDataRate
= (uint32
)theStats::GetUploadRate() + 5 * 1024;
299 allowedDataRate
= thePrefs::GetMaxUpload() * 1024;
302 uint32 minFragSize
= 1300;
303 uint32 doubleSendSize
= minFragSize
*2; // send two packages at a time so they can share an ACK
304 if (allowedDataRate
< 6*1024) {
306 doubleSendSize
= minFragSize
; // don't send two packages at a time at very low speeds to give them a smoother load
311 if(allowedDataRate
== 0 || allowedDataRate
== _UI32_MAX
|| realBytesToSpend
>= 1000) {
312 // we could send at once, but sleep a while to not suck up all cpu
313 sleepTime
= extraSleepTime
;
315 // sleep for just as long as we need to get back to having one byte to send
316 sleepTime
= std::max((uint32
)ceil((double)(-realBytesToSpend
+ 1000)/allowedDataRate
), extraSleepTime
);
319 if(timeSinceLastLoop
< sleepTime
) {
320 Sleep(sleepTime
-timeSinceLastLoop
);
323 const uint32 thisLoopTick
= ::GetTickCountFullRes();
324 timeSinceLastLoop
= thisLoopTick
- lastLoopTick
;
326 // Calculate how many bytes we can spend
327 sint64 bytesToSpend
= 0;
329 if(allowedDataRate
!= 0 && allowedDataRate
!= _UI32_MAX
) {
331 if(timeSinceLastLoop
== 0) {
332 // no time has passed, so don't add any bytes. Shouldn't happen.
333 bytesToSpend
= 0; //realBytesToSpend/1000;
334 } else if(_I64_MAX
/timeSinceLastLoop
> allowedDataRate
&& _I64_MAX
-allowedDataRate
*timeSinceLastLoop
> realBytesToSpend
) {
335 if(timeSinceLastLoop
> sleepTime
+ 2000) {
336 AddDebugLogLineM(false, logGeneral
, CFormat(wxT("UploadBandwidthThrottler: Time since last loop too long. time: %ims wanted: %ims Max: %ims"))
337 % timeSinceLastLoop
% sleepTime
% (sleepTime
+ 2000));
339 timeSinceLastLoop
= sleepTime
+ 2000;
340 lastLoopTick
= thisLoopTick
- timeSinceLastLoop
;
343 realBytesToSpend
+= allowedDataRate
*timeSinceLastLoop
;
345 bytesToSpend
= realBytesToSpend
/1000;
347 realBytesToSpend
= _I64_MAX
;
348 bytesToSpend
= _I32_MAX
;
351 realBytesToSpend
= 0; //_I64_MAX;
352 bytesToSpend
= _I32_MAX
;
355 lastLoopTick
= thisLoopTick
;
357 if(bytesToSpend
>= 1) {
358 uint64 spentBytes
= 0;
359 uint64 spentOverhead
= 0;
361 wxMutexLocker
sendLock(m_sendLocker
);
364 wxMutexLocker
queueLock(m_tempQueueLocker
);
366 // are there any sockets in m_TempControlQueue_list? Move them to normal m_ControlQueue_list;
367 m_ControlQueueFirst_list
.insert( m_ControlQueueFirst_list
.end(),
368 m_TempControlQueueFirst_list
.begin(),
369 m_TempControlQueueFirst_list
.end() );
371 m_ControlQueue_list
.insert( m_ControlQueue_list
.end(),
372 m_TempControlQueue_list
.begin(),
373 m_TempControlQueue_list
.end() );
375 m_TempControlQueue_list
.clear();
376 m_TempControlQueueFirst_list
.clear();
379 // Send any queued up control packets first
380 while(bytesToSpend
> 0 && spentBytes
< (uint64
)bytesToSpend
&& (!m_ControlQueueFirst_list
.empty() || !m_ControlQueue_list
.empty())) {
381 ThrottledControlSocket
* socket
= NULL
;
383 if(!m_ControlQueueFirst_list
.empty()) {
384 socket
= m_ControlQueueFirst_list
.front();
385 m_ControlQueueFirst_list
.pop_front();
386 } else if(!m_ControlQueue_list
.empty()) {
387 socket
= m_ControlQueue_list
.front();
388 m_ControlQueue_list
.pop_front();
392 SocketSentBytes socketSentBytes
= socket
->SendControlData(bytesToSpend
-spentBytes
, minFragSize
);
393 uint32 lastSpentBytes
= socketSentBytes
.sentBytesControlPackets
+ socketSentBytes
.sentBytesStandardPackets
;
394 spentBytes
+= lastSpentBytes
;
395 spentOverhead
+= socketSentBytes
.sentBytesControlPackets
;
399 // Check if any sockets haven't gotten data for a long time. Then trickle them a package.
400 for ( uint32 slotCounter
= 0; slotCounter
< m_StandardOrder_list
.size(); slotCounter
++) {
401 ThrottledFileSocket
* socket
= m_StandardOrder_list
[ slotCounter
];
404 if(thisLoopTick
-socket
->GetLastCalledSend() > SEC2MS(1)) {
406 uint32 neededBytes
= socket
->GetNeededBytes();
408 if(neededBytes
> 0) {
409 SocketSentBytes socketSentBytes
= socket
->SendFileAndControlData(neededBytes
, minFragSize
);
410 uint32 lastSpentBytes
= socketSentBytes
.sentBytesControlPackets
+ socketSentBytes
.sentBytesStandardPackets
;
411 spentBytes
+= lastSpentBytes
;
412 spentOverhead
+= socketSentBytes
.sentBytesControlPackets
;
416 AddDebugLogLineM(false, logGeneral
, CFormat( wxT("There was a NULL socket in the UploadBandwidthThrottler Standard list (trickle)! Prevented usage. Index: %i Size: %i"))
417 % slotCounter
% m_StandardOrder_list
.size());
421 // Equal bandwidth for all slots
422 uint32 maxSlot
= m_StandardOrder_list
.size();
423 if(maxSlot
> 0 && allowedDataRate
/maxSlot
< UPLOAD_CLIENT_DATARATE
) {
424 maxSlot
= allowedDataRate
/UPLOAD_CLIENT_DATARATE
;
427 for(uint32 maxCounter
= 0; maxCounter
< std::min(maxSlot
, (uint32
)m_StandardOrder_list
.size()) && bytesToSpend
> 0 && spentBytes
< (uint64
)bytesToSpend
; maxCounter
++) {
428 if(rememberedSlotCounter
>= m_StandardOrder_list
.size() ||
429 rememberedSlotCounter
>= maxSlot
) {
430 rememberedSlotCounter
= 0;
433 ThrottledFileSocket
* socket
= m_StandardOrder_list
[ rememberedSlotCounter
];
436 SocketSentBytes socketSentBytes
= socket
->SendFileAndControlData(std::min(doubleSendSize
, (uint32
)(bytesToSpend
-spentBytes
)), doubleSendSize
);
437 uint32 lastSpentBytes
= socketSentBytes
.sentBytesControlPackets
+ socketSentBytes
.sentBytesStandardPackets
;
439 spentBytes
+= lastSpentBytes
;
440 spentOverhead
+= socketSentBytes
.sentBytesControlPackets
;
442 AddDebugLogLineM(false, logGeneral
, CFormat(wxT("There was a NULL socket in the UploadBandwidthThrottler Standard list (equal-for-all)! Prevented usage. Index: %i Size: %i"))
443 % rememberedSlotCounter
% m_StandardOrder_list
.size());
446 rememberedSlotCounter
++;
449 // Any bandwidth that hasn't been used yet are used first to last.
450 for(uint32 slotCounter
= 0; slotCounter
< m_StandardOrder_list
.size() && bytesToSpend
> 0 && spentBytes
< (uint64
)bytesToSpend
; slotCounter
++) {
451 ThrottledFileSocket
* socket
= m_StandardOrder_list
[ slotCounter
];
454 uint32 bytesToSpendTemp
= bytesToSpend
-spentBytes
;
455 SocketSentBytes socketSentBytes
= socket
->SendFileAndControlData(bytesToSpendTemp
, doubleSendSize
);
456 uint32 lastSpentBytes
= socketSentBytes
.sentBytesControlPackets
+ socketSentBytes
.sentBytesStandardPackets
;
457 spentBytes
+= lastSpentBytes
;
458 spentOverhead
+= socketSentBytes
.sentBytesControlPackets
;
460 AddDebugLogLineM(false, logGeneral
, CFormat(wxT("There was a NULL socket in the UploadBandwidthThrottler Standard list (fully activated)! Prevented usage. Index: %i Size: %i"))
461 % slotCounter
% m_StandardOrder_list
.size());
464 realBytesToSpend
-= spentBytes
*1000;
466 if(realBytesToSpend
< -(((sint64
)m_StandardOrder_list
.size()+1)*minFragSize
)*1000) {
467 sint64 newRealBytesToSpend
= -(((sint64
)m_StandardOrder_list
.size()+1)*minFragSize
)*1000;
469 realBytesToSpend
= newRealBytesToSpend
;
471 uint64 bandwidthSavedTolerance
= m_StandardOrder_list
.size()*512*1000;
472 if(realBytesToSpend
> 0 && (uint64
)realBytesToSpend
> 999+bandwidthSavedTolerance
) {
473 sint64 newRealBytesToSpend
= 999+bandwidthSavedTolerance
;
474 realBytesToSpend
= newRealBytesToSpend
;
478 m_SentBytesSinceLastCall
+= spentBytes
;
479 m_SentBytesSinceLastCallOverhead
+= spentOverhead
;
481 if ((spentBytes
== 0) && (spentOverhead
== 0)) {
482 extraSleepTime
= std::min
<uint32
>(extraSleepTime
* 5, 1000); // 1s at most
484 extraSleepTime
= TIME_BETWEEN_UPLOAD_LOOPS
;
490 wxMutexLocker
queueLock(m_tempQueueLocker
);
491 m_TempControlQueue_list
.clear();
492 m_TempControlQueueFirst_list
.clear();
495 wxMutexLocker
sendLock(m_sendLocker
);
496 m_ControlQueue_list
.clear();
497 m_StandardOrder_list
.clear();
501 // File_checked_for_headers