From f035fa75b4c1ad2ce02f92b687fe89ab45867ff5 Mon Sep 17 00:00:00 2001 From: upstream svn Date: Thu, 30 Sep 2010 19:31:49 +0000 Subject: [PATCH] Rewrote upload queue for better performance Waiting list is now sorted by queue rank. Clients now carry their rank and score in member variables, so they can be got in zero time for GUI or EC transfer. Scores and ranks are updated when a client is added or chosen for upload, or after 2 min. Should fix problem from http://forum.amule.org/index.php?topic=17956.0 And client list sorted by queue rank with large queue should greatly gain. :-) Also fixes the bug that sometimes two clients showed the same queue rank. --- .svn-revision | 2 +- src/BaseClient.cpp | 2 + src/ClientTCPSocket.cpp | 2 +- src/ClientUDPSocket.cpp | 2 +- src/ECSpecialCoreTags.cpp | 2 +- src/UploadClient.cpp | 11 +-- src/UploadQueue.cpp | 211 ++++++++++++++++++++++++++++------------------ src/UploadQueue.h | 3 +- src/amule-remote-gui.cpp | 2 - src/updownclient.h | 14 +-- 10 files changed, 144 insertions(+), 107 deletions(-) diff --git a/.svn-revision b/.svn-revision index 18e1a405..7285218b 100644 --- a/.svn-revision +++ b/.svn-revision @@ -1 +1 @@ -10300 +10303 diff --git a/src/BaseClient.cpp b/src/BaseClient.cpp index b0f2a579..65daaf14 100644 --- a/src/BaseClient.cpp +++ b/src/BaseClient.cpp @@ -212,6 +212,8 @@ void CUpDownClient::Init() m_fSharedDirectories = 0; m_lastPartAsked = 0xffff; m_nUpCompleteSourcesCount= 0; + m_waitingPosition = 0; + m_score = 0; m_lastRefreshedDLDisplay = 0; m_bHelloAnswerPending = false; m_fSentCancelTransfer = 0; diff --git a/src/ClientTCPSocket.cpp b/src/ClientTCPSocket.cpp index 67256827..847d2fc0 100644 --- a/src/ClientTCPSocket.cpp +++ b/src/ClientTCPSocket.cpp @@ -1811,7 +1811,7 @@ bool CClientTCPSocket::ProcessExtPacket(const byte* buffer, uint32 size, uint8 o } } - data_out.WriteUInt16(theApp->uploadqueue->GetWaitingPosition(sender)); + data_out.WriteUInt16(sender->GetUploadQueueWaitingPosition()); CPacket* response = new CPacket(data_out, OP_EMULEPROT, OP_REASKACK); theStats::AddUpOverheadFileRequest(response->GetPacketSize()); AddDebugLogLineN( logLocalClient, wxT("Local Client UDP: OP_REASKACK to ") + m_client->GetFullIP() ); diff --git a/src/ClientUDPSocket.cpp b/src/ClientUDPSocket.cpp index 5f802bc1..4c53f96e 100644 --- a/src/ClientUDPSocket.cpp +++ b/src/ClientUDPSocket.cpp @@ -219,7 +219,7 @@ void CClientUDPSocket::ProcessPacket(byte* packet, int16 size, int8 opcode, uint } } - data_out.WriteUInt16(theApp->uploadqueue->GetWaitingPosition(sender)); + data_out.WriteUInt16(sender->GetUploadQueueWaitingPosition()); CPacket* response = new CPacket(data_out, OP_EMULEPROT, OP_REASKACK); theStats::AddUpOverheadFileRequest(response->GetPacketSize()); AddDebugLogLineN( logClientUDP, wxT("Client UDP socket: OP_REASKACK to ") + sender->GetFullIP()); diff --git a/src/ECSpecialCoreTags.cpp b/src/ECSpecialCoreTags.cpp index bb07871d..fe3e63a7 100644 --- a/src/ECSpecialCoreTags.cpp +++ b/src/ECSpecialCoreTags.cpp @@ -252,7 +252,7 @@ CEC_UpDownClient_Tag::CEC_UpDownClient_Tag(const CUpDownClient* client, EC_DETAI //AddTag(CECTag(EC_TAG_CLIENT_XFER_TIME, client->GetUpStartTimeDelay()), valuemap); //AddTag(CECTag(EC_TAG_CLIENT_QUEUE_TIME, (uint64)(::GetTickCount() - client->GetWaitStartTime())), valuemap); //AddTag(CECTag(EC_TAG_CLIENT_LAST_TIME, (uint64)(::GetTickCount() - client->GetLastUpRequest())), valuemap); - AddTag(CECTag(EC_TAG_CLIENT_WAITING_POSITION, theApp->uploadqueue->GetWaitingPosition(client)), valuemap); + AddTag(CECTag(EC_TAG_CLIENT_WAITING_POSITION, client->GetUploadQueueWaitingPosition()), valuemap); AddTag(CECTag(EC_TAG_CLIENT_REMOTE_QUEUE_RANK, client->IsRemoteQueueFull() ? (uint16)0xffff : client->GetRemoteQueueRank()), valuemap); AddTag(CECTag(EC_TAG_CLIENT_OLD_REMOTE_QUEUE_RANK, client->GetOldRemoteQueueRank()), valuemap); AddTag(CECTag(EC_TAG_CLIENT_ASKED_COUNT, client->GetAskedCount()), valuemap); diff --git a/src/UploadClient.cpp b/src/UploadClient.cpp index 73b8dac7..eb56422a 100644 --- a/src/UploadClient.cpp +++ b/src/UploadClient.cpp @@ -69,7 +69,7 @@ void CUpDownClient::SetUploadState(uint8 eNewState) } } -uint32 CUpDownClient::GetScore( bool sysvalue ) const // true: return zero for lowID client (unless it's connected) +uint32 CUpDownClient::CalculateScoreInternal() { //TODO: complete this (friends, uploadspeed, amuleuser etc etc) if (m_Username.IsEmpty()) { @@ -98,10 +98,6 @@ uint32 CUpDownClient::GetScore( bool sysvalue ) const // true: return zero for l if (IsBanned()) return 0; - if (sysvalue && HasLowID() && !IsConnected()){ - return 0; - } - // score applies only to waiting clients, not to downloading clients if (IsDownloading()) { return 0; @@ -724,11 +720,6 @@ void CUpDownClient::ClearUploadBlockRequests() DeleteContents(m_DoneBlocks_list); } -uint16 CUpDownClient::GetUploadQueueWaitingPosition() const -{ - return theApp->uploadqueue->GetWaitingPosition(this); -} - void CUpDownClient::SendRankingInfo(){ if (!ExtProtocolAvailable()) { return; diff --git a/src/UploadQueue.cpp b/src/UploadQueue.cpp index 0c86766b..5cf23891 100644 --- a/src/UploadQueue.cpp +++ b/src/UploadQueue.cpp @@ -63,79 +63,124 @@ CUploadQueue::CUploadQueue() { m_nLastStartUpload = 0; + m_lastSort = 0; lastupslotHighID = true; m_allowKicking = true; } -void CUploadQueue::AddUpNextClient(CUpDownClient* directadd) +CUpDownClient* CUploadQueue::SortGetBestClient(bool sortonly) { - CClientPtrList::iterator toadd = m_waitinglist.end(); - CClientPtrList::iterator toaddlow = m_waitinglist.end(); - - uint32_t bestscore = 0; - uint32_t bestlowscore = 0; - - CUpDownClient* newclient; - // select next client or use given client - if (!directadd) { - // Track if we purged any clients from the queue, as to only send one notify in total - bool purged = false; - - CClientPtrList::iterator it = m_waitinglist.begin(); - for (; it != m_waitinglist.end(); ) { - CClientPtrList::iterator tmp_it = it++; - CUpDownClient* cur_client = *tmp_it; - - // clear dead clients - if ( (::GetTickCount() - cur_client->GetLastUpRequest() > MAX_PURGEQUEUETIME) || !theApp->sharedfiles->GetFileByID(cur_client->GetUploadFileID()) ) { - purged = true; - cur_client->ClearWaitStartTime(); - RemoveFromWaitingQueue(tmp_it); - if (!cur_client->GetSocket()) { - if(cur_client->Disconnected(wxT("AddUpNextClient - purged"))) { - cur_client->Safe_Delete(); - cur_client = NULL; - } + CUpDownClient* newclient = NULL; + // Track if we purged any clients from the queue, as to only send one notify in total + bool purged = false; + uint32 tick = GetTickCount(); + m_lastSort = tick; + CClientPtrList::iterator it = m_waitinglist.begin(); + for (; it != m_waitinglist.end(); ) { + CClientPtrList::iterator it2 = it++; + CUpDownClient* cur_client = *it2; + + // clear dead clients + if (tick - cur_client->GetLastUpRequest() > MAX_PURGEQUEUETIME + || !theApp->sharedfiles->GetFileByID(cur_client->GetUploadFileID())) { + purged = true; + cur_client->ClearWaitStartTime(); + RemoveFromWaitingQueue(it2); + if (!cur_client->GetSocket()) { + if (cur_client->Disconnected(wxT("AddUpNextClient - purged"))) { + cur_client->Safe_Delete(); + cur_client = NULL; } - continue; - } + } + continue; + } - if (cur_client->IsBanned() || IsSuspended(cur_client->GetUploadFileID())) { // Banned client or suspended upload ? - continue; + if (cur_client->IsBanned() || IsSuspended(cur_client->GetUploadFileID())) { // Banned client or suspended upload ? + cur_client->ClearScore(); + continue; + } + // finished clearing + + // Calculate score of current client + uint32 cur_score = cur_client->CalculateScore(); + // Check if it's better than that of a previous one, and move it up then. + CClientPtrList::iterator it1 = it2; + while (it1 != m_waitinglist.begin()) { + it1--; + if (cur_score > (*it1)->GetScore()) { + // swap them + *it2 = *it1; + *it1 = cur_client; + it2--; + } else { + // no need to check further since list is already sorted + break; } - // finished clearing - - uint32_t cur_score = cur_client->GetScore(true); - if (cur_score > bestscore) { - bestscore = cur_score; - toadd = tmp_it; + } + } + + // Second Pass: + // - calculate queue rank + // - find best high id client + // - mark all better low id clients as enabled for upload + uint16 rank = 1; + for (it = m_waitinglist.begin(); it != m_waitinglist.end(); ) { + CClientPtrList::iterator it2 = it++; + CUpDownClient* cur_client = *it2; + cur_client->SetUploadQueueWaitingPosition(rank++); + if (newclient) { + // There's a better high id client + cur_client->m_bAddNextConnect = false; + } else { + if (cur_client->HasLowID() && !cur_client->IsConnected()) { + // No better high id client, so start upload to this one once it connects + cur_client->m_bAddNextConnect = true; } else { - cur_score = cur_client->GetScore(); - if ((cur_score > bestlowscore) && !cur_client->m_bAddNextConnect){ - bestlowscore = cur_score; - toaddlow = tmp_it; + // We found a high id client (or a currently connected low id client) + newclient = cur_client; + cur_client->m_bAddNextConnect = false; + if (!sortonly) { + RemoveFromWaitingQueue(it2); + rank--; + lastupslotHighID = true; // VQB LowID alternate } } } + } - // Update the count on GUI if any clients were purged - if (purged) { - Notify_ShowQueueCount(m_waitinglist.size()); - } + // Update the count on GUI if any clients were purged + if (purged || (newclient && !sortonly)) { + Notify_ShowQueueCount(m_waitinglist.size()); + } + +#ifdef __DEBUG__ + AddDebugLogLineN(logLocalClient, CFormat(wxT("Current UL queue (%d):")) % (rank - 1)); + for (it = m_waitinglist.begin(); it != m_waitinglist.end(); it++) { + CUpDownClient* c = *it; + AddDebugLogLineN(logLocalClient, CFormat(wxT("%4d %7d %s %5d %s")) + % c->GetUploadQueueWaitingPosition() + % c->GetScore() + % (c->HasLowID() ? (c->IsConnected() ? wxT("LoCon") : wxT("LowId")) : wxT("High ")) + % c->ECID() + % c->GetUserName() + ); + } +#endif // __DEBUG__ + + return newclient; +} - if (bestlowscore > bestscore){ - (*toaddlow)->m_bAddNextConnect = true; - } - if (toadd == m_waitinglist.end()) { +void CUploadQueue::AddUpNextClient(CUpDownClient* directadd) +{ + CUpDownClient* newclient = NULL; + // select next client or use given client + if (!directadd) { + newclient = SortGetBestClient(false); + if (!newclient) { return; } - - newclient = *toadd; - lastupslotHighID = true; // VQB LowID alternate - RemoveFromWaitingQueue(toadd); - Notify_ShowQueueCount(m_waitinglist.size()); } else { // Check if requested file is suspended or not shared (maybe deleted recently) @@ -224,6 +269,11 @@ void CUploadQueue::Process() if (sentBytes) { theStats::AddSentBytes(sentBytes); } + + // Periodically resort queue if it doesn't happen anyway + if ((sint32) (tick - m_lastSort) > MIN2MS(2)) { + SortGetBestClient(true); + } } @@ -428,13 +478,15 @@ void CUploadQueue::AddClientToQueue(CUpDownClient* client) AddUpNextClient(client); m_nLastStartUpload = tick; } else { + // add to waiting queue m_waitinglist.push_back(client); + // and sort it to update queue ranks + SortGetBestClient(true); theStats::AddWaitingClient(); client->ClearAskedCount(); client->SetUploadState(US_ONUPLOADQUEUE); client->SendRankingInfo(); //Notify_QlistAddClient(client); - Notify_ShowQueueCount(m_waitinglist.size()); } } @@ -506,25 +558,6 @@ bool CUploadQueue::CheckForTimeOver(CUpDownClient* client) } -uint16 CUploadQueue::GetWaitingPosition(const CUpDownClient *client) const -{ - if ( !IsOnUploadQueue(client) ) { - return 0; - } - - uint16 rank = 1; - const uint32 myscore = client->GetScore(); - CClientPtrList::const_iterator it = m_waitinglist.begin(); - for (; it != m_waitinglist.end(); ++it) { - if ((*it)->GetScore() > myscore) { - rank++; - } - } - - return rank; -} - - /* * This function removes a file indicated by filehash from suspended_uploads_list. */ @@ -585,16 +618,24 @@ uint16 CUploadQueue::SuspendUpload(const CMD4Hash& filehash, bool terminate) bool CUploadQueue::RemoveFromWaitingQueue(CUpDownClient* client) { - CClientPtrList::iterator it = std::find(m_waitinglist.begin(), - m_waitinglist.end(), client); - - if (it != m_waitinglist.end()) { - RemoveFromWaitingQueue(it); - Notify_ShowQueueCount(m_waitinglist.size()); - return true; - } else { - return false; + CClientPtrList::iterator it = m_waitinglist.begin(); + + uint16 rank = 1; + while (it != m_waitinglist.end()) { + CClientPtrList::iterator it1 = it++; + if (*it1 == client) { + RemoveFromWaitingQueue(it1); + Notify_ShowQueueCount(m_waitinglist.size()); + // update ranks of remaining queue + while (it != m_waitinglist.end()) { + (*it)->SetUploadQueueWaitingPosition(rank++); + it++; + } + return true; + } + rank++; } + return false; } @@ -608,6 +649,8 @@ void CUploadQueue::RemoveFromWaitingQueue(CClientPtrList::iterator pos) } //Notify_QlistRemoveClient(todelete); todelete->SetUploadState(US_NONE); + todelete->ClearScore(); + todelete->SetUploadQueueWaitingPosition(0); } // File_checked_for_headers diff --git a/src/UploadQueue.h b/src/UploadQueue.h index 84884aa0..6773d433 100644 --- a/src/UploadQueue.h +++ b/src/UploadQueue.h @@ -49,7 +49,6 @@ public: CUpDownClient* GetWaitingClientByIP_UDP(uint32 dwIP, uint16 nUDPPort, bool bIgnorePortOnUniqueIP, bool* pbMultipleIPs = NULL); - uint16 GetWaitingPosition(const CUpDownClient *client) const; uint16 SuspendUpload(const CMD4Hash &, bool terminate); void ResumeUpload(const CMD4Hash &); @@ -58,12 +57,14 @@ private: uint16 GetMaxSlots() const; void AddUpNextClient(CUpDownClient* directadd = 0); bool IsSuspended(const CMD4Hash& hash) { return suspendedUploadsSet.find(hash) != suspendedUploadsSet.end(); } + CUpDownClient* SortGetBestClient(bool sortonly); CClientPtrList m_waitinglist; CClientPtrList m_uploadinglist; std::set suspendedUploadsSet; // set for suspended uploads uint32 m_nLastStartUpload; + uint32 m_lastSort; bool lastupslotHighID; // VQB lowID alternation bool m_allowKicking; }; diff --git a/src/amule-remote-gui.cpp b/src/amule-remote-gui.cpp index dcff6d43..838c6449 100644 --- a/src/amule-remote-gui.cpp +++ b/src/amule-remote-gui.cpp @@ -1126,8 +1126,6 @@ CUpDownClient::CUpDownClient(CEC_UpDownClient_Tag *tag) : CECID(tag->ID()) m_dwServerIP = 0; m_nServerPort = 0; - m_waitingPosition = 0; - m_score = 0; m_identState = IS_NOTAVAILABLE; m_obfuscationStatus = 0; diff --git a/src/updownclient.h b/src/updownclient.h index 81c4bcd8..e255884e 100644 --- a/src/updownclient.h +++ b/src/updownclient.h @@ -310,16 +310,15 @@ public: bool IsDownloading() const { return (m_nUploadState == US_UPLOADING); } + uint32 GetScore() const { return m_score; } + uint32 CalculateScore() { m_score = CalculateScoreInternal(); return m_score; } + void ClearScore() { m_score = 0; } + uint16 GetUploadQueueWaitingPosition() const { return m_waitingPosition; } + void SetUploadQueueWaitingPosition(uint16 pos) { m_waitingPosition = pos; } #ifndef CLIENT_GUI - uint32 GetScore(bool sysvalue = false) const; uint8 GetObfuscationStatus() const; - uint16 GetUploadQueueWaitingPosition() const; uint16 GetNextRequestedPart() const; #else - uint32 m_score; - uint32 GetScore(bool WXUNUSED(sysvalue) = false) const { return m_score; } - uint16 m_waitingPosition; - uint16 GetUploadQueueWaitingPosition() const { return m_waitingPosition; } EIdentState m_identState; uint8 GetObfuscationStatus() const { return m_obfuscationStatus; } uint8 m_obfuscationStatus; @@ -764,6 +763,7 @@ private: //upload void CreateStandardPackets(const unsigned char* data,uint32 togo, Requested_Block_Struct* currentblock); void CreatePackedPackets(const unsigned char* data,uint32 togo, Requested_Block_Struct* currentblock); + uint32 CalculateScoreInternal(); uint8 m_nUploadState; uint32 m_dwUploadTime; @@ -773,6 +773,8 @@ private: uint16 m_nUpPartCount; CMD4Hash m_requpfileid; uint16 m_nUpCompleteSourcesCount; + uint32 m_score; + uint16 m_waitingPosition; //! This vector contains the avilability of parts for the file that the user //! is requesting. When changing it, be sure to call CKnownFile::UpdatePartsFrequency -- 2.11.4.GIT