Upstream tarball 9964
[amule.git] / src / UploadQueue.cpp
blobf3304858aff876d064b3ac00851f9f5db6d3a7c6
1 //
2 // This file is part of the aMule Project.
3 //
4 // Copyright (c) 2003-2008 aMule Team ( admin@amule.org / http://www.amule.org )
5 // Copyright (c) 2002-2008 Merkur ( devs@emule-project.net / http://www.emule-project.net )
6 //
7 // Any parts of this program derived from the xMule, lMule or eMule project,
8 // or contributed by third-party developers are copyrighted by their
9 // respective authors.
11 // This program is free software; you can redistribute it and/or modify
12 // it under the terms of the GNU General Public License as published by
13 // the Free Software Foundation; either version 2 of the License, or
14 // (at your option) any later version.
16 // This program is distributed in the hope that it will be useful,
17 // but WITHOUT ANY WARRANTY; without even the implied warranty of
18 // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
19 // GNU General Public License for more details.
20 //
21 // You should have received a copy of the GNU General Public License
22 // along with this program; if not, write to the Free Software
23 // Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301, USA
26 #include "UploadQueue.h" // Interface declarations
28 #include <protocol/Protocols.h>
29 #include <protocol/ed2k/Client2Client/TCP.h>
30 #include <common/Macros.h>
31 #include <common/Constants.h>
33 #include <cmath>
35 #include "Types.h" // Do_not_auto_remove (win32)
37 #ifdef __WXMSW__
38 #include <winsock.h> // Do_not_auto_remove
39 #else
40 #include <sys/types.h> // Do_not_auto_remove
41 #include <netinet/in.h> // Do_not_auto_remove
42 #include <arpa/inet.h> // Do_not_auto_remove
43 #endif
45 #include "ServerConnect.h" // Needed for CServerConnect
46 #include "KnownFile.h" // Needed for CKnownFile
47 #include "Packet.h" // Needed for CPacket
48 #include "ClientTCPSocket.h" // Needed for CClientTCPSocket
49 #include "SharedFileList.h" // Needed for CSharedFileList
50 #include "updownclient.h" // Needed for CUpDownClient
51 #include "amule.h" // Needed for theApp
52 #include "Preferences.h"
53 #include "ClientList.h"
54 #include "Statistics.h" // Needed for theStats
55 #include "Logger.h"
56 #include <common/Format.h>
57 #include "UploadBandwidthThrottler.h"
58 #include "GuiEvents.h" // Needed for Notify_*
61 //TODO rewrite the whole networkcode, use overlapped sockets
63 CUploadQueue::CUploadQueue()
65 m_nLastStartUpload = 0;
66 lastupslotHighID = true;
67 m_allowKicking = true;
71 void CUploadQueue::AddUpNextClient(CUpDownClient* directadd)
73 CClientPtrList::iterator toadd = m_waitinglist.end();
74 CClientPtrList::iterator toaddlow = m_waitinglist.end();
76 uint32_t bestscore = 0;
77 uint32_t bestlowscore = 0;
79 CUpDownClient* newclient;
80 // select next client or use given client
81 if (!directadd) {
82 // Track if we purged any clients from the queue, as to only send one notify in total
83 bool purged = false;
85 CClientPtrList::iterator it = m_waitinglist.begin();
86 for (; it != m_waitinglist.end(); ) {
87 CClientPtrList::iterator tmp_it = it++;
88 CUpDownClient* cur_client = *tmp_it;
90 // clear dead clients
91 if ( (::GetTickCount() - cur_client->GetLastUpRequest() > MAX_PURGEQUEUETIME) || !theApp->sharedfiles->GetFileByID(cur_client->GetUploadFileID()) ) {
92 purged = true;
93 cur_client->ClearWaitStartTime();
94 RemoveFromWaitingQueue(tmp_it);
95 if (!cur_client->GetSocket()) {
96 if(cur_client->Disconnected(wxT("AddUpNextClient - purged"))) {
97 cur_client->Safe_Delete();
98 cur_client = NULL;
101 continue;
104 suspendlist::iterator it2 = std::find( suspended_uploads_list.begin(),
105 suspended_uploads_list.end(),
106 cur_client->GetUploadFileID() );
107 if (cur_client->IsBanned() || it2 != suspended_uploads_list.end() ) { // Banned client or suspended upload ?
108 continue;
110 // finished clearing
112 uint32_t cur_score = cur_client->GetScore(true);
113 if (cur_score > bestscore) {
114 bestscore = cur_score;
115 toadd = tmp_it;
116 } else {
117 cur_score = cur_client->GetScore(false);
118 if ((cur_score > bestlowscore) && !cur_client->m_bAddNextConnect){
119 bestlowscore = cur_score;
120 toaddlow = tmp_it;
125 // Update the count on GUI if any clients were purged
126 if (purged) {
127 Notify_ShowQueueCount(m_waitinglist.size());
130 if (bestlowscore > bestscore){
131 newclient = *toaddlow;
132 newclient->m_bAddNextConnect = true;
135 if (toadd == m_waitinglist.end()) {
136 return;
139 newclient = *toadd;
140 lastupslotHighID = true; // VQB LowID alternate
141 RemoveFromWaitingQueue(toadd);
142 Notify_ShowQueueCount(m_waitinglist.size());
143 } else {
144 //prevent another potential access of a suspended upload
146 suspendlist::iterator it = std::find( suspended_uploads_list.begin(),
147 suspended_uploads_list.end(),
148 directadd->GetUploadFileID() );
149 if ( it != suspended_uploads_list.end() ) {
150 return;
151 } else {
152 newclient = directadd;
156 if (IsDownloading(newclient)) {
157 return;
159 // tell the client that we are now ready to upload
160 if (!newclient->IsConnected()) {
161 newclient->SetUploadState(US_CONNECTING);
162 if (!newclient->TryToConnect(true)) {
163 return;
165 } else {
166 CPacket* packet = new CPacket(OP_ACCEPTUPLOADREQ, 0, OP_EDONKEYPROT);
167 theStats::AddUpOverheadFileRequest(packet->GetPacketSize());
168 AddDebugLogLineM( false, logLocalClient, wxT("Local Client: OP_ACCEPTUPLOADREQ to ") + newclient->GetFullIP() );
169 newclient->SendPacket(packet,true);
170 newclient->SetUploadState(US_UPLOADING);
172 newclient->SetUpStartTime();
173 newclient->ResetSessionUp();
175 theApp->uploadBandwidthThrottler->AddToStandardList(m_uploadinglist.size(), newclient->GetSocket());
176 m_uploadinglist.push_back(newclient);
177 theStats::AddUploadingClient();
179 // Statistic
180 CKnownFile* reqfile = (CKnownFile*) newclient->GetUploadFile();
181 if (reqfile) {
182 reqfile->statistic.AddAccepted();
184 Notify_UploadCtrlAddClient(newclient);
187 void CUploadQueue::Process()
189 // Check if someone's waiting, if there is a slot for him,
190 // or if we should try to free a slot for him
191 uint32 tick = GetTickCount();
192 // Nobody waiting or upload started recently
193 // (Actually instead of "empty" it should check for "no HighID clients queued",
194 // but the cost for that outweights the benefit. As it is, a slot will be freed
195 // even if it can't be taken because all of the queue is LowID. But just one,
196 // and the kicked client will instantly get it back if he has HighID.)
197 if (m_waitinglist.empty() || tick - m_nLastStartUpload < 1000) {
198 m_allowKicking = false;
199 // Already a slot free, try to fill it
200 } else if (m_uploadinglist.size() < GetMaxSlots()) {
201 m_allowKicking = false;
202 m_nLastStartUpload = tick;
203 AddUpNextClient();
204 // All slots taken, try to free one
205 } else {
206 m_allowKicking = true;
209 // The loop that feeds the upload slots with data.
210 CClientPtrList::iterator it = m_uploadinglist.begin();
211 while (it != m_uploadinglist.end()) {
212 // Get the client. Note! Also updates pos as a side effect.
213 CUpDownClient* cur_client = *it++;
215 // It seems chatting or friend slots can get stuck at times in upload.. This needs looked into..
216 if (!cur_client->GetSocket()) {
217 RemoveFromUploadQueue(cur_client);
218 if(cur_client->Disconnected(_T("CUploadQueue::Process"))){
219 cur_client->Safe_Delete();
221 } else {
222 cur_client->SendBlockData();
226 // Save used bandwidth for speed calculations
227 uint64 sentBytes = theApp->uploadBandwidthThrottler->GetNumberOfSentBytesSinceLastCallAndReset();
228 (void)theApp->uploadBandwidthThrottler->GetNumberOfSentBytesOverheadSinceLastCallAndReset();
230 // Update statistics
231 if (sentBytes) {
232 theStats::AddSentBytes(sentBytes);
237 uint16 CUploadQueue::GetMaxSlots() const
239 uint16 nMaxSlots = 0;
240 float kBpsUpPerClient = (float)thePrefs::GetSlotAllocation();
241 if (thePrefs::GetMaxUpload() == UNLIMITED) {
242 float kBpsUp = theStats::GetUploadRate() / 1024.0f;
243 nMaxSlots = (uint16)(kBpsUp / kBpsUpPerClient) + 2;
244 } else {
245 if (thePrefs::GetMaxUpload() >= 10) {
246 nMaxSlots = (uint16)floor((float)thePrefs::GetMaxUpload() / kBpsUpPerClient + 0.5);
247 // floor(x + 0.5) is a way of doing round(x) that works with gcc < 3 ...
248 if (nMaxSlots < MIN_UP_CLIENTS_ALLOWED) {
249 nMaxSlots=MIN_UP_CLIENTS_ALLOWED;
251 } else {
252 nMaxSlots = MIN_UP_CLIENTS_ALLOWED;
255 if (nMaxSlots > MAX_UP_CLIENTS_ALLOWED) {
256 nMaxSlots = MAX_UP_CLIENTS_ALLOWED;
258 return nMaxSlots;
262 CUploadQueue::~CUploadQueue()
264 wxASSERT(m_waitinglist.empty());
265 wxASSERT(m_uploadinglist.empty());
269 bool CUploadQueue::IsOnUploadQueue(const CUpDownClient* client) const
271 return std::find(m_waitinglist.begin(), m_waitinglist.end(), client)
272 != m_waitinglist.end();
276 bool CUploadQueue::IsDownloading(CUpDownClient* client) const
278 return std::find(m_uploadinglist.begin(), m_uploadinglist.end(), client)
279 != m_uploadinglist.end();
283 CUpDownClient* CUploadQueue::GetWaitingClientByIP_UDP(uint32 dwIP, uint16 nUDPPort, bool bIgnorePortOnUniqueIP, bool* pbMultipleIPs)
285 CUpDownClient* pMatchingIPClient = NULL;
287 int cMatches = 0;
289 CClientPtrList::iterator it = m_waitinglist.begin();
290 for (; it != m_waitinglist.end(); ++it) {
291 CUpDownClient* cur_client = *it;
293 if ((dwIP == cur_client->GetIP()) && (nUDPPort == cur_client->GetUDPPort())) {
294 return cur_client;
295 } else if ((dwIP == cur_client->GetIP()) && bIgnorePortOnUniqueIP) {
296 pMatchingIPClient = cur_client;
297 cMatches++;
301 if (pbMultipleIPs) {
302 *pbMultipleIPs = cMatches > 1;
305 if (pMatchingIPClient && cMatches == 1) {
306 return pMatchingIPClient;
307 } else {
308 return NULL;
313 void CUploadQueue::AddClientToQueue(CUpDownClient* client)
315 if (theApp->serverconnect->IsConnected() && theApp->serverconnect->IsLowID() && !theApp->serverconnect->IsLocalServer(client->GetServerIP(),client->GetServerPort()) && client->GetDownloadState() == DS_NONE && !client->IsFriend() && theStats::GetWaitingUserCount() > 50) {
316 // Well, all that issues finish in the same: don't allow to add to the queue
317 return;
320 if ( client->IsBanned() ) {
321 return;
324 client->AddAskedCount();
325 client->SetLastUpRequest();
327 // Find all clients with the same user-hash
328 CClientList::SourceList found = theApp->clientlist->GetClientsByHash( client->GetUserHash() );
330 CClientList::SourceList::iterator it = found.begin();
331 while (it != found.end()) {
332 CUpDownClient* cur_client = *it++;
334 if ( IsOnUploadQueue( cur_client ) ) {
335 if ( cur_client == client ) {
336 // This is where LowID clients get their upload slot assigned.
337 // They can't be contacted if they reach top of the queue, so they are just marked for uploading.
338 // When they reconnect next time AddClientToQueue() is called, and they get their slot
339 // through the connection they initiated.
340 // Since at that time no slot is free they get assigned an extra slot,
341 // so then the number of slots exceeds the configured number by one.
342 // To prevent a further increase no more LowID clients get a slot, until
343 // - a HighID client has got one (which happens only after two clients
344 // have been kicked so a slot is free again)
345 // - or there is a free slot, which means there is no HighID client on queue
346 if (client->m_bAddNextConnect) {
347 uint16 maxSlots = GetMaxSlots();
348 if (lastupslotHighID) {
349 maxSlots++;
351 if (m_uploadinglist.size() < maxSlots) {
352 client->m_bAddNextConnect = false;
353 RemoveFromWaitingQueue(client);
354 AddUpNextClient(client);
355 lastupslotHighID = false; // LowID alternate
356 return;
360 client->SendRankingInfo();
361 Notify_QlistRefreshClient(client);
362 return;
364 } else {
365 // Hash-clash, remove unidentified clients (possibly both)
367 if ( !cur_client->IsIdentified() ) {
368 // Cur_client isn't identifed, remove it
369 theApp->clientlist->AddTrackClient( cur_client );
371 RemoveFromWaitingQueue( cur_client );
372 if ( !cur_client->GetSocket() ) {
373 if (cur_client->Disconnected( wxT("AddClientToQueue - same userhash") ) ) {
374 cur_client->Safe_Delete();
379 if ( !client->IsIdentified() ) {
380 // New client isn't identified, remove it
381 theApp->clientlist->AddTrackClient( client );
383 if ( !client->GetSocket() ) {
384 if ( client->Disconnected( wxT("AddClientToQueue - same userhash") ) ) {
385 client->Safe_Delete();
389 return;
395 // Count the number of clients with the same IP-address
396 found = theApp->clientlist->GetClientsByIP( client->GetIP() );
398 int ipCount = 0;
399 for ( it = found.begin(); it != found.end(); it++ ) {
400 if ( ( *it == client ) || IsOnUploadQueue( *it ) ) {
401 ipCount++;
405 // We do not accept more than 3 clients from the same IP
406 if ( ipCount > 3 ) {
407 return;
408 } else if ( theApp->clientlist->GetClientsFromIP(client->GetIP()) >= 3 ) {
409 return;
412 // statistic values
413 CKnownFile* reqfile = (CKnownFile*) client->GetUploadFile();
414 if (reqfile) {
415 reqfile->statistic.AddRequest();
418 // TODO find better ways to cap the list
419 if (m_waitinglist.size() >= (thePrefs::GetQueueSize())) {
420 return;
423 if (client->IsDownloading()) {
424 // he's already downloading and wants probably only another file
425 CPacket* packet = new CPacket(OP_ACCEPTUPLOADREQ, 0, OP_EDONKEYPROT);
426 theStats::AddUpOverheadFileRequest(packet->GetPacketSize());
427 AddDebugLogLineM( false, logLocalClient, wxT("Local Client: OP_ACCEPTUPLOADREQ to ") + client->GetFullIP() );
428 client->SendPacket(packet,true);
429 return;
432 uint32 tick = GetTickCount();
433 client->ClearWaitStartTime();
434 // if possible start upload right away
435 if (m_waitinglist.empty() && tick - m_nLastStartUpload >= 1000 && m_uploadinglist.size() < GetMaxSlots()) {
436 AddUpNextClient(client);
437 m_nLastStartUpload = tick;
438 } else {
439 m_waitinglist.push_back(client);
440 theStats::AddWaitingClient();
441 client->ClearAskedCount();
442 client->SetUploadState(US_ONUPLOADQUEUE);
443 client->SendRankingInfo();
444 Notify_QlistAddClient(client);
445 Notify_ShowQueueCount(m_waitinglist.size());
450 bool CUploadQueue::RemoveFromUploadQueue(CUpDownClient* client)
452 // Keep track of this client
453 theApp->clientlist->AddTrackClient(client);
455 CClientPtrList::iterator it = std::find(m_uploadinglist.begin(),
456 m_uploadinglist.end(), client);
458 if (it != m_uploadinglist.end()) {
459 Notify_UploadCtrlRemoveClient(client);
460 m_uploadinglist.erase(it);
461 theStats::RemoveUploadingClient();
462 if( client->GetTransferredUp() ) {
463 theStats::AddSuccessfulUpload();
464 theStats::AddUploadTime(client->GetUpStartTimeDelay() / 1000);
465 } else {
466 theStats::AddFailedUpload();
468 client->SetUploadState(US_NONE);
469 client->ClearUploadBlockRequests();
470 return true;
473 return false;
477 bool CUploadQueue::CheckForTimeOver(CUpDownClient* client)
479 // Don't kick anybody if there's no need to
480 if (!m_allowKicking) {
481 return false;
483 // First, check if it is a VIP slot (friend or Release-Prio).
484 if (client->GetFriendSlot()) {
485 return false; // never drop the friend
487 // Release-Prio and nobody on queue for it?
488 if (client->GetUploadFile()->GetUpPriority() == PR_POWERSHARE) {
489 // Keep it unless half of the UL slots are occupied with friends or Release uploads.
490 uint16 vips = 0;
491 for (CClientPtrList::iterator it = m_uploadinglist.begin(); it != m_uploadinglist.end(); ++it) {
492 CUpDownClient* cur_client = *it;
493 if (cur_client->GetFriendSlot() || cur_client->GetUploadFile()->GetUpPriority() == PR_POWERSHARE) {
494 vips++;
497 // allow if VIP uploads occupy at most half of the possible upload slots
498 if (vips <= GetMaxSlots() / 2) {
499 return false;
501 // Otherwise normal rules apply.
504 // Ordinary slots
505 bool kickHim = false;
507 if (thePrefs::TransferFullChunks()) {
508 // "Transfer full chunks": drop client after 10 MB upload, or after an hour.
509 // (so average UL speed should at least be 2.84 kB/s)
510 // We don't track what he is downloading, but if it's all from one chunk he gets it.
511 if (client->GetUpStartTimeDelay() > 3600000 // time: 1h
512 || client->GetSessionUp() > 10485760) { // data: 10MB
513 kickHim = true;
515 } else {
516 uint32 clientScore = client->GetScore(true,true);
517 CClientPtrList::iterator it = m_waitinglist.begin();
518 for (; it != m_waitinglist.end(); ++it ) {
519 if (clientScore < (*it)->GetScore(true,false)) {
520 kickHim = true;
521 break;
526 if (kickHim) {
527 m_allowKicking = false; // kick max one client per cycle
530 return kickHim;
534 uint16 CUploadQueue::GetWaitingPosition(const CUpDownClient *client) const
536 if ( !IsOnUploadQueue(client) ) {
537 return 0;
540 uint16 rank = 1;
541 const uint32 myscore = client->GetScore(false);
542 CClientPtrList::const_iterator it = m_waitinglist.begin();
543 for (; it != m_waitinglist.end(); ++it) {
544 if ((*it)->GetScore(false) > myscore) {
545 rank++;
549 return rank;
554 * This function removes a file indicated by filehash from suspended_uploads_list.
556 void CUploadQueue::ResumeUpload( const CMD4Hash& filehash )
558 //Find the position of the filehash in the list and remove it.
559 suspendlist::iterator it = std::find( suspended_uploads_list.begin(),
560 suspended_uploads_list.end(),
561 filehash );
562 if ( it != suspended_uploads_list.end() )
563 suspended_uploads_list.erase( it );
565 AddLogLineM( false, CFormat( _("Resuming uploads of file: %s" ) )
566 % filehash.Encode() );
570 * This function adds a file indicated by filehash to suspended_uploads_list
572 uint16 CUploadQueue::SuspendUpload( const CMD4Hash& filehash )
574 AddLogLineM( false, CFormat( _("Suspending upload of file: %s" ) )
575 % filehash.Encode() );
576 uint16 removed = 0;
578 //Append the filehash to the list.
579 suspended_uploads_list.push_back(filehash);
580 wxString base16hash = filehash.Encode();
582 CClientPtrList::iterator it = m_uploadinglist.begin();
583 while (it != m_uploadinglist.end()) {
584 CUpDownClient *potential = *it++;
585 //check if the client is uploading the file we need to suspend
586 if(potential->GetUploadFileID() == filehash) {
587 //remove the unlucky client from the upload queue and add to the waiting queue
588 RemoveFromUploadQueue(potential);
590 m_waitinglist.push_back(potential);
591 theStats::AddWaitingClient();
592 potential->SetUploadState(US_ONUPLOADQUEUE);
593 potential->SendRankingInfo();
594 Notify_QlistRefreshClient(potential);
595 Notify_ShowQueueCount(m_waitinglist.size());
596 removed++;
599 return removed;
602 bool CUploadQueue::RemoveFromWaitingQueue(CUpDownClient* client)
604 CClientPtrList::iterator it = std::find(m_waitinglist.begin(),
605 m_waitinglist.end(), client);
607 if (it != m_waitinglist.end()) {
608 RemoveFromWaitingQueue(it);
609 Notify_ShowQueueCount(m_waitinglist.size());
610 return true;
611 } else {
612 return false;
617 void CUploadQueue::RemoveFromWaitingQueue(CClientPtrList::iterator pos)
619 CUpDownClient* todelete = *pos;
620 m_waitinglist.erase(pos);
621 theStats::RemoveWaitingClient();
622 if( todelete->IsBanned() ) {
623 todelete->UnBan();
625 Notify_QlistRemoveClient(todelete);
626 todelete->SetUploadState(US_NONE);
629 // File_checked_for_headers