Fix the equality operator
[amule.git] / src / UploadBandwidthThrottler.cpp
blob67ebabf2532bb6fa7e884db4e63315d2c6422ff3
1 //
2 // This file is part of the aMule Project.
3 //
4 // Copyright (c) 2005-2011 aMule Team ( admin@amule.org / http://www.amule.org )
5 // Copyright (c) 2002-2011 Merkur ( devs@emule-project.net / http://www.emule-project.net )
6 //
7 // Any parts of this program derived from the xMule, lMule or eMule project,
8 // or contributed by third-party developers are copyrighted by their
9 // respective authors.
11 // This program is free software; you can redistribute it and/or modify
12 // it under the terms of the GNU General Public License as published by
13 // the Free Software Foundation; either version 2 of the License, or
14 // (at your option) any later version.
16 // This program is distributed in the hope that it will be useful,
17 // but WITHOUT ANY WARRANTY; without even the implied warranty of
18 // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
19 // GNU General Public License for more details.
21 // You should have received a copy of the GNU General Public License
22 // along with this program; if not, write to the Free Software
23 // Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301, USA
26 #include "UploadBandwidthThrottler.h"
28 #include <protocol/ed2k/Constants.h>
29 #include <common/Macros.h>
30 #include <common/Constants.h>
32 #include <cmath>
33 #include "OtherFunctions.h"
34 #include "ThrottledSocket.h"
35 #include "Logger.h"
36 #include "Preferences.h"
37 #include "Statistics.h"
40 /////////////////////////////////////
43 /**
44 * The constructor starts the thread.
46 UploadBandwidthThrottler::UploadBandwidthThrottler()
47 : wxThread( wxTHREAD_JOINABLE )
49 m_SentBytesSinceLastCall = 0;
50 m_SentBytesSinceLastCallOverhead = 0;
52 m_doRun = true;
54 Create();
55 Run();
59 /**
60 * The destructor stops the thread. If the thread has already stoppped, destructor does nothing.
62 UploadBandwidthThrottler::~UploadBandwidthThrottler()
64 EndThread();
68 /**
69 * Find out how many bytes that has been put on the sockets since the last call to this
70 * method. Includes overhead of control packets.
72 * @return the number of bytes that has been put on the sockets since the last call
74 uint64 UploadBandwidthThrottler::GetNumberOfSentBytesSinceLastCallAndReset()
76 wxMutexLocker lock( m_sendLocker );
78 uint64 numberOfSentBytesSinceLastCall = m_SentBytesSinceLastCall;
79 m_SentBytesSinceLastCall = 0;
81 return numberOfSentBytesSinceLastCall;
84 /**
85 * Find out how many bytes that has been put on the sockets since the last call to this
86 * method. Excludes overhead of control packets.
88 * @return the number of bytes that has been put on the sockets since the last call
90 uint64 UploadBandwidthThrottler::GetNumberOfSentBytesOverheadSinceLastCallAndReset()
92 wxMutexLocker lock( m_sendLocker );
94 uint64 numberOfSentBytesSinceLastCall = m_SentBytesSinceLastCallOverhead;
95 m_SentBytesSinceLastCallOverhead = 0;
97 return numberOfSentBytesSinceLastCall;
102 * Add a socket to the list of sockets that have upload slots. The main thread will
103 * continously call send on these sockets, to give them chance to work off their queues.
104 * The sockets are called in the order they exist in the list, so the top socket (index 0)
105 * will be given a chance first to use bandwidth, and then the next socket (index 1) etc.
107 * It is possible to add a socket several times to the list without removing it inbetween,
108 * but that should be avoided.
110 * @param index insert the socket at this place in the list. An index that is higher than the
111 * current number of sockets in the list will mean that the socket should be inserted
112 * last in the list.
114 * @param socket the address to the socket that should be added to the list. If the address is NULL,
115 * this method will do nothing.
117 void UploadBandwidthThrottler::AddToStandardList(uint32 index, ThrottledFileSocket* socket)
119 if ( socket ) {
120 wxMutexLocker lock( m_sendLocker );
122 RemoveFromStandardListNoLock(socket);
123 if (index > (uint32)m_StandardOrder_list.size()) {
124 index = m_StandardOrder_list.size();
127 m_StandardOrder_list.insert(m_StandardOrder_list.begin() + index, socket);
133 * Remove a socket from the list of sockets that have upload slots.
135 * If the socket has mistakenly been added several times to the list, this method
136 * will return all of the entries for the socket.
138 * @param socket the address of the socket that should be removed from the list. If this socket
139 * does not exist in the list, this method will do nothing.
141 bool UploadBandwidthThrottler::RemoveFromStandardList(ThrottledFileSocket* socket)
143 wxMutexLocker lock( m_sendLocker );
145 return RemoveFromStandardListNoLock(socket);
150 * Remove a socket from the list of sockets that have upload slots. NOT THREADSAFE!
151 * This is an internal method that doesn't take the necessary lock before it removes
152 * the socket. This method should only be called when the current thread already owns
153 * the m_sendLocker lock!
155 * @param socket address of the socket that should be removed from the list. If this socket
156 * does not exist in the list, this method will do nothing.
158 bool UploadBandwidthThrottler::RemoveFromStandardListNoLock(ThrottledFileSocket* socket)
160 return (EraseFirstValue( m_StandardOrder_list, socket ) > 0);
165 * Notifies the send thread that it should try to call controlpacket send
166 * for the supplied socket. It is allowed to call this method several times
167 * for the same socket, without having controlpacket send called for the socket
168 * first. The doublette entries are never filtered, since it is incurs less cpu
169 * overhead to simply call Send() in the socket for each double. Send() will
170 * already have done its work when the second Send() is called, and will just
171 * return with little cpu overhead.
173 * @param socket address to the socket that requests to have controlpacket send
174 * to be called on it
176 void UploadBandwidthThrottler::QueueForSendingControlPacket(ThrottledControlSocket* socket, bool hasSent)
178 // Get critical section
179 wxMutexLocker lock( m_tempQueueLocker );
181 if ( m_doRun ) {
182 if( hasSent ) {
183 m_TempControlQueueFirst_list.push_back(socket);
184 } else {
185 m_TempControlQueue_list.push_back(socket);
193 * Remove the socket from all lists and queues. This will make it safe to
194 * erase/delete the socket. It will also cause the main thread to stop calling
195 * send() for the socket.
197 * @param socket address to the socket that should be removed
199 void UploadBandwidthThrottler::DoRemoveFromAllQueues(ThrottledControlSocket* socket)
201 if ( m_doRun ) {
202 // Remove this socket from control packet queue
203 EraseValue( m_ControlQueue_list, socket );
204 EraseValue( m_ControlQueueFirst_list, socket );
206 wxMutexLocker lock( m_tempQueueLocker );
207 EraseValue( m_TempControlQueue_list, socket );
208 EraseValue( m_TempControlQueueFirst_list, socket );
213 void UploadBandwidthThrottler::RemoveFromAllQueues(ThrottledControlSocket* socket)
215 wxMutexLocker lock( m_sendLocker );
217 DoRemoveFromAllQueues( socket );
221 void UploadBandwidthThrottler::RemoveFromAllQueues(ThrottledFileSocket* socket)
223 wxMutexLocker lock( m_sendLocker );
225 if (m_doRun) {
226 DoRemoveFromAllQueues(socket);
228 // And remove it from upload slots
229 RemoveFromStandardListNoLock(socket);
235 * Make the thread exit. This method will not return until the thread has stopped
236 * looping. This guarantees that the thread will not access the CEMSockets after this
237 * call has exited.
239 void UploadBandwidthThrottler::EndThread()
241 if (m_doRun) { // do it only once
243 wxMutexLocker lock(m_sendLocker);
245 // signal the thread to stop looping and exit.
246 m_doRun = false;
249 Wait();
255 * The thread method that handles calling send for the individual sockets.
257 * Control packets will always be tried to be sent first. If there is any bandwidth leftover
258 * after that, send() for the upload slot sockets will be called in priority order until we have run
259 * out of available bandwidth for this loop. Upload slots will not be allowed to go without having sent
260 * called for more than a defined amount of time (i.e. two seconds).
262 * @return always returns 0.
264 void* UploadBandwidthThrottler::Entry()
266 const uint32 TIME_BETWEEN_UPLOAD_LOOPS = 1;
268 uint32 lastLoopTick = GetTickCountFullRes();
269 // Bytes to spend in current cycle. If we spend more this becomes negative and causes a wait next time.
270 sint32 bytesToSpend = 0;
271 uint32 allowedDataRate = 0;
272 uint32 rememberedSlotCounter = 0;
273 uint32 extraSleepTime = TIME_BETWEEN_UPLOAD_LOOPS;
275 while (m_doRun && !TestDestroy()) {
276 uint32 timeSinceLastLoop = GetTickCountFullRes() - lastLoopTick;
278 // Calculate data rate
279 if (thePrefs::GetMaxUpload() == UNLIMITED) {
280 // Try to increase the upload rate from UploadSpeedSense
281 allowedDataRate = (uint32)theStats::GetUploadRate() + 5 * 1024;
282 } else {
283 allowedDataRate = thePrefs::GetMaxUpload() * 1024;
286 uint32 minFragSize = 1300;
287 uint32 doubleSendSize = minFragSize*2; // send two packages at a time so they can share an ACK
288 if (allowedDataRate < 6*1024) {
289 minFragSize = 536;
290 doubleSendSize = minFragSize; // don't send two packages at a time at very low speeds to give them a smoother load
294 uint32 sleepTime;
295 if (bytesToSpend < 1) {
296 // We have sent more than allowed in last cycle so we have to wait now
297 // until we can send at least 1 byte.
298 sleepTime = std::max((-bytesToSpend + 1) * 1000 / allowedDataRate + 2, // add 2 ms to allow for rounding inaccuracies
299 extraSleepTime);
300 } else {
301 // We could send at once, but sleep a while to not suck up all cpu
302 sleepTime = extraSleepTime;
305 if (timeSinceLastLoop < sleepTime) {
306 Sleep(sleepTime-timeSinceLastLoop);
309 // Check after sleep in case the thread has been signaled to end
310 if (!m_doRun || TestDestroy()) {
311 break;
314 const uint32 thisLoopTick = GetTickCountFullRes();
315 timeSinceLastLoop = thisLoopTick - lastLoopTick;
316 lastLoopTick = thisLoopTick;
318 if (timeSinceLastLoop > sleepTime + 2000) {
319 AddDebugLogLineN(logGeneral, CFormat(wxT("UploadBandwidthThrottler: Time since last loop too long. time: %ims wanted: %ims Max: %ims"))
320 % timeSinceLastLoop % sleepTime % (sleepTime + 2000));
322 timeSinceLastLoop = sleepTime + 2000;
325 // Calculate how many bytes we can spend
327 bytesToSpend += (sint32) (allowedDataRate / 1000.0 * timeSinceLastLoop);
329 if (bytesToSpend >= 1) {
330 sint32 spentBytes = 0;
331 sint32 spentOverhead = 0;
333 wxMutexLocker sendLock(m_sendLocker);
336 wxMutexLocker queueLock(m_tempQueueLocker);
338 // are there any sockets in m_TempControlQueue_list? Move them to normal m_ControlQueue_list;
339 m_ControlQueueFirst_list.insert( m_ControlQueueFirst_list.end(),
340 m_TempControlQueueFirst_list.begin(),
341 m_TempControlQueueFirst_list.end() );
343 m_ControlQueue_list.insert( m_ControlQueue_list.end(),
344 m_TempControlQueue_list.begin(),
345 m_TempControlQueue_list.end() );
347 m_TempControlQueue_list.clear();
348 m_TempControlQueueFirst_list.clear();
351 // Send any queued up control packets first
352 while (spentBytes < bytesToSpend && (!m_ControlQueueFirst_list.empty() || !m_ControlQueue_list.empty())) {
353 ThrottledControlSocket* socket = NULL;
355 if (!m_ControlQueueFirst_list.empty()) {
356 socket = m_ControlQueueFirst_list.front();
357 m_ControlQueueFirst_list.pop_front();
358 } else if (!m_ControlQueue_list.empty()) {
359 socket = m_ControlQueue_list.front();
360 m_ControlQueue_list.pop_front();
363 if (socket != NULL) {
364 SocketSentBytes socketSentBytes = socket->SendControlData(bytesToSpend-spentBytes, minFragSize);
365 spentBytes += socketSentBytes.sentBytesControlPackets + socketSentBytes.sentBytesStandardPackets;
366 spentOverhead += socketSentBytes.sentBytesControlPackets;
370 // Check if any sockets haven't gotten data for a long time. Then trickle them a package.
371 uint32 slots = m_StandardOrder_list.size();
372 for (uint32 slotCounter = 0; slotCounter < slots; slotCounter++) {
373 ThrottledFileSocket* socket = m_StandardOrder_list[ slotCounter ];
375 if (socket != NULL) {
376 if (thisLoopTick-socket->GetLastCalledSend() > SEC2MS(1)) {
377 // trickle
378 uint32 neededBytes = socket->GetNeededBytes();
380 if (neededBytes > 0) {
381 SocketSentBytes socketSentBytes = socket->SendFileAndControlData(neededBytes, minFragSize);
382 spentBytes += socketSentBytes.sentBytesControlPackets + socketSentBytes.sentBytesStandardPackets;
383 spentOverhead += socketSentBytes.sentBytesControlPackets;
386 } else {
387 AddDebugLogLineN(logGeneral, CFormat( wxT("There was a NULL socket in the UploadBandwidthThrottler Standard list (trickle)! Prevented usage. Index: %i Size: %i"))
388 % slotCounter % m_StandardOrder_list.size());
392 // Give available bandwidth to slots, starting with the one we ended with last time.
393 // There are two passes. First pass gives packets of doubleSendSize, second pass
394 // gives as much as possible.
395 // Second pass starts with the last slot of the first pass actually.
396 for (uint32 slotCounter = 0; (slotCounter < slots * 2) && spentBytes < bytesToSpend; slotCounter++) {
397 if (rememberedSlotCounter >= slots) { // wrap around pointer
398 rememberedSlotCounter = 0;
401 uint32 data = (slotCounter < slots - 1) ? doubleSendSize // pass 1
402 : (bytesToSpend - spentBytes); // pass 2
404 ThrottledFileSocket* socket = m_StandardOrder_list[ rememberedSlotCounter ];
406 if (socket != NULL) {
407 SocketSentBytes socketSentBytes = socket->SendFileAndControlData(data, doubleSendSize);
408 spentBytes += socketSentBytes.sentBytesControlPackets + socketSentBytes.sentBytesStandardPackets;
409 spentOverhead += socketSentBytes.sentBytesControlPackets;
410 } else {
411 AddDebugLogLineN(logGeneral, CFormat(wxT("There was a NULL socket in the UploadBandwidthThrottler Standard list (equal-for-all)! Prevented usage. Index: %i Size: %i"))
412 % rememberedSlotCounter % m_StandardOrder_list.size());
415 rememberedSlotCounter++;
418 // Do some limiting of what we keep for the next loop.
419 bytesToSpend -= spentBytes;
420 sint32 minBytesToSpend = (slots + 1) * minFragSize;
422 if (bytesToSpend < - minBytesToSpend) {
423 bytesToSpend = - minBytesToSpend;
424 } else {
425 sint32 bandwidthSavedTolerance = slots * 512 + 1;
426 if (bytesToSpend > bandwidthSavedTolerance) {
427 bytesToSpend = bandwidthSavedTolerance;
431 m_SentBytesSinceLastCall += spentBytes;
432 m_SentBytesSinceLastCallOverhead += spentOverhead;
434 if (spentBytes == 0) { // spentBytes includes the overhead
435 extraSleepTime = std::min<uint32>(extraSleepTime * 5, 1000); // 1s at most
436 } else {
437 extraSleepTime = TIME_BETWEEN_UPLOAD_LOOPS;
443 wxMutexLocker queueLock(m_tempQueueLocker);
444 m_TempControlQueue_list.clear();
445 m_TempControlQueueFirst_list.clear();
448 wxMutexLocker sendLock(m_sendLocker);
449 m_ControlQueue_list.clear();
450 m_StandardOrder_list.clear();
452 return 0;
454 // File_checked_for_headers