Updated Russian documentation by Morse.
[amule.git] / src / UploadQueue.cpp
blob1ad80700835a160405127d31f87d4d80bf12e1f8
1 //
2 // This file is part of the aMule Project.
3 //
4 // Copyright (c) 2003-2011 aMule Team ( admin@amule.org / http://www.amule.org )
5 // Copyright (c) 2002-2011 Merkur ( devs@emule-project.net / http://www.emule-project.net )
6 //
7 // Any parts of this program derived from the xMule, lMule or eMule project,
8 // or contributed by third-party developers are copyrighted by their
9 // respective authors.
11 // This program is free software; you can redistribute it and/or modify
12 // it under the terms of the GNU General Public License as published by
13 // the Free Software Foundation; either version 2 of the License, or
14 // (at your option) any later version.
16 // This program is distributed in the hope that it will be useful,
17 // but WITHOUT ANY WARRANTY; without even the implied warranty of
18 // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
19 // GNU General Public License for more details.
20 //
21 // You should have received a copy of the GNU General Public License
22 // along with this program; if not, write to the Free Software
23 // Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301, USA
26 #include "UploadQueue.h" // Interface declarations
28 #include <protocol/Protocols.h>
29 #include <protocol/ed2k/Client2Client/TCP.h>
30 #include <common/Macros.h>
31 #include <common/Constants.h>
33 #include <cmath>
35 #include "Types.h" // Do_not_auto_remove (win32)
37 #ifdef __WXMSW__
38 #include <winsock.h> // Do_not_auto_remove
39 #else
40 #include <sys/types.h> // Do_not_auto_remove
41 #include <netinet/in.h> // Do_not_auto_remove
42 #include <arpa/inet.h> // Do_not_auto_remove
43 #endif
45 #include "ServerConnect.h" // Needed for CServerConnect
46 #include "KnownFile.h" // Needed for CKnownFile
47 #include "Packet.h" // Needed for CPacket
48 #include "ClientTCPSocket.h" // Needed for CClientTCPSocket
49 #include "SharedFileList.h" // Needed for CSharedFileList
50 #include "updownclient.h" // Needed for CUpDownClient
51 #include "amule.h" // Needed for theApp
52 #include "Preferences.h"
53 #include "ClientList.h"
54 #include "Statistics.h" // Needed for theStats
55 #include "Logger.h"
56 #include <common/Format.h>
57 #include "UploadBandwidthThrottler.h"
58 #include "GuiEvents.h" // Needed for Notify_*
59 #include "ListenSocket.h"
62 //TODO rewrite the whole networkcode, use overlapped sockets
64 CUploadQueue::CUploadQueue()
66 m_nLastStartUpload = 0;
67 m_lastSort = 0;
68 lastupslotHighID = true;
69 m_allowKicking = true;
70 m_allUploadingKnownFile = new CKnownFile;
74 CUpDownClient* CUploadQueue::SortGetBestClient(bool sortonly)
76 CUpDownClient* newclient = NULL;
77 uint32 tick = GetTickCount();
78 m_lastSort = tick;
79 CClientRefList::iterator it = m_waitinglist.begin();
80 for (; it != m_waitinglist.end(); ) {
81 CClientRefList::iterator it2 = it++;
82 CUpDownClient* cur_client = it2->GetClient();
84 // clear dead clients
85 if (tick - cur_client->GetLastUpRequest() > MAX_PURGEQUEUETIME
86 || !theApp->sharedfiles->GetFileByID(cur_client->GetUploadFileID())) {
87 cur_client->ClearWaitStartTime();
88 RemoveFromWaitingQueue(it2);
89 if (!cur_client->GetSocket()) {
90 if (cur_client->Disconnected(wxT("AddUpNextClient - purged"))) {
91 cur_client->Safe_Delete();
92 cur_client = NULL;
95 continue;
98 if (cur_client->IsBanned() || IsSuspended(cur_client->GetUploadFileID())) { // Banned client or suspended upload ?
99 cur_client->ClearScore();
100 continue;
102 // finished clearing
104 // Calculate score of current client
105 uint32 cur_score = cur_client->CalculateScore();
106 // Check if it's better than that of a previous one, and move it up then.
107 CClientRefList::iterator it1 = it2;
108 while (it1 != m_waitinglist.begin()) {
109 it1--;
110 if (cur_score > it1->GetClient()->GetScore()) {
111 // swap them
112 std::swap(*it2, *it1);
113 it2--;
114 } else {
115 // no need to check further since list is already sorted
116 break;
121 // Second Pass:
122 // - calculate queue rank
123 // - find best high id client
124 // - mark all better low id clients as enabled for upload
125 uint16 rank = 1;
126 for (it = m_waitinglist.begin(); it != m_waitinglist.end(); ) {
127 CClientRefList::iterator it2 = it++;
128 CUpDownClient* cur_client = it2->GetClient();
129 cur_client->SetUploadQueueWaitingPosition(rank++);
130 if (newclient) {
131 // There's a better high id client
132 cur_client->m_bAddNextConnect = false;
133 } else {
134 if (cur_client->HasLowID() && !cur_client->IsConnected()) {
135 // No better high id client, so start upload to this one once it connects
136 cur_client->m_bAddNextConnect = true;
137 } else {
138 // We found a high id client (or a currently connected low id client)
139 newclient = cur_client;
140 cur_client->m_bAddNextConnect = false;
141 if (!sortonly) {
142 RemoveFromWaitingQueue(it2);
143 rank--;
144 lastupslotHighID = true; // VQB LowID alternate
150 #ifdef __DEBUG__
151 AddDebugLogLineN(logLocalClient, CFormat(wxT("Current UL queue (%d):")) % (rank - 1));
152 for (it = m_waitinglist.begin(); it != m_waitinglist.end(); it++) {
153 CUpDownClient* c = it->GetClient();
154 AddDebugLogLineN(logLocalClient, CFormat(wxT("%4d %7d %s %5d %s"))
155 % c->GetUploadQueueWaitingPosition()
156 % c->GetScore()
157 % (c->HasLowID() ? (c->IsConnected() ? wxT("LoCon") : wxT("LowId")) : wxT("High "))
158 % c->ECID()
159 % c->GetUserName()
162 #endif // __DEBUG__
164 return newclient;
168 void CUploadQueue::AddUpNextClient(CUpDownClient* directadd)
170 CUpDownClient* newclient = NULL;
171 // select next client or use given client
172 if (!directadd) {
173 newclient = SortGetBestClient(false);
174 if (!newclient) {
175 return;
177 } else {
178 // Check if requested file is suspended or not shared (maybe deleted recently)
180 if (IsSuspended(directadd->GetUploadFileID())
181 || !theApp->sharedfiles->GetFileByID(directadd->GetUploadFileID())) {
182 return;
183 } else {
184 newclient = directadd;
188 if (IsDownloading(newclient)) {
189 return;
191 // tell the client that we are now ready to upload
192 if (!newclient->IsConnected()) {
193 newclient->SetUploadState(US_CONNECTING);
194 if (!newclient->TryToConnect(true)) {
195 return;
197 } else {
198 CPacket* packet = new CPacket(OP_ACCEPTUPLOADREQ, 0, OP_EDONKEYPROT);
199 theStats::AddUpOverheadFileRequest(packet->GetPacketSize());
200 AddDebugLogLineN( logLocalClient, wxT("Local Client: OP_ACCEPTUPLOADREQ to ") + newclient->GetFullIP() );
201 newclient->SendPacket(packet,true);
202 newclient->SetUploadState(US_UPLOADING);
204 newclient->SetUpStartTime();
205 newclient->ResetSessionUp();
207 theApp->uploadBandwidthThrottler->AddToStandardList(m_uploadinglist.size(), newclient->GetSocket());
208 m_uploadinglist.push_back(CCLIENTREF(newclient, wxT("CUploadQueue::AddUpNextClient")));
209 m_allUploadingKnownFile->AddUploadingClient(newclient);
210 theStats::AddUploadingClient();
212 // Statistic
213 CKnownFile* reqfile = (CKnownFile*) newclient->GetUploadFile();
214 if (reqfile) {
215 reqfile->statistic.AddAccepted();
218 Notify_SharedCtrlRefreshClient(newclient->ECID(), AVAILABLE_SOURCE);
221 void CUploadQueue::Process()
223 // Check if someone's waiting, if there is a slot for him,
224 // or if we should try to free a slot for him
225 uint32 tick = GetTickCount();
226 // Nobody waiting or upload started recently
227 // (Actually instead of "empty" it should check for "no HighID clients queued",
228 // but the cost for that outweights the benefit. As it is, a slot will be freed
229 // even if it can't be taken because all of the queue is LowID. But just one,
230 // and the kicked client will instantly get it back if he has HighID.)
231 // Also, if we are running out of sockets, don't add new clients, but also don't kick existing ones,
232 // or uploading will cease right away.
233 if (m_waitinglist.empty() || tick - m_nLastStartUpload < 1000
234 || theApp->listensocket->TooManySockets()) {
235 m_allowKicking = false;
236 // Already a slot free, try to fill it
237 } else if (m_uploadinglist.size() < GetMaxSlots()) {
238 m_allowKicking = false;
239 m_nLastStartUpload = tick;
240 AddUpNextClient();
241 // All slots taken, try to free one
242 } else {
243 m_allowKicking = true;
246 // The loop that feeds the upload slots with data.
247 CClientRefList::iterator it = m_uploadinglist.begin();
248 while (it != m_uploadinglist.end()) {
249 // Get the client. Note! Also updates pos as a side effect.
250 CUpDownClient* cur_client = it++->GetClient();
252 // It seems chatting or friend slots can get stuck at times in upload.. This needs looked into..
253 if (!cur_client->GetSocket()) {
254 RemoveFromUploadQueue(cur_client);
255 if(cur_client->Disconnected(_T("CUploadQueue::Process"))){
256 cur_client->Safe_Delete();
258 } else {
259 cur_client->SendBlockData();
263 // Save used bandwidth for speed calculations
264 uint64 sentBytes = theApp->uploadBandwidthThrottler->GetNumberOfSentBytesSinceLastCallAndReset();
265 (void)theApp->uploadBandwidthThrottler->GetNumberOfSentBytesOverheadSinceLastCallAndReset();
267 // Update statistics
268 if (sentBytes) {
269 theStats::AddSentBytes(sentBytes);
272 // Periodically resort queue if it doesn't happen anyway
273 if ((sint32) (tick - m_lastSort) > MIN2MS(2)) {
274 SortGetBestClient(true);
279 uint16 CUploadQueue::GetMaxSlots() const
281 uint16 nMaxSlots = 0;
282 float kBpsUpPerClient = (float)thePrefs::GetSlotAllocation();
283 if (thePrefs::GetMaxUpload() == UNLIMITED) {
284 float kBpsUp = theStats::GetUploadRate() / 1024.0f;
285 nMaxSlots = (uint16)(kBpsUp / kBpsUpPerClient) + 2;
286 } else {
287 if (thePrefs::GetMaxUpload() >= 10) {
288 nMaxSlots = (uint16)floor((float)thePrefs::GetMaxUpload() / kBpsUpPerClient + 0.5);
289 // floor(x + 0.5) is a way of doing round(x) that works with gcc < 3 ...
290 if (nMaxSlots < MIN_UP_CLIENTS_ALLOWED) {
291 nMaxSlots=MIN_UP_CLIENTS_ALLOWED;
293 } else {
294 nMaxSlots = MIN_UP_CLIENTS_ALLOWED;
297 if (nMaxSlots > MAX_UP_CLIENTS_ALLOWED) {
298 nMaxSlots = MAX_UP_CLIENTS_ALLOWED;
300 return nMaxSlots;
304 CUploadQueue::~CUploadQueue()
306 wxASSERT(m_waitinglist.empty());
307 wxASSERT(m_uploadinglist.empty());
308 delete m_allUploadingKnownFile;
312 bool CUploadQueue::IsOnUploadQueue(const CUpDownClient* client) const
314 for (CClientRefList::const_iterator it = m_waitinglist.begin(); it != m_waitinglist.end(); it++) {
315 if (it->GetClient() == client) {
316 return true;
319 return false;
323 bool CUploadQueue::IsDownloading(const CUpDownClient* client) const
325 for (CClientRefList::const_iterator it = m_uploadinglist.begin(); it != m_uploadinglist.end(); it++) {
326 if (it->GetClient() == client) {
327 return true;
330 return false;
334 CUpDownClient* CUploadQueue::GetWaitingClientByIP_UDP(uint32 dwIP, uint16 nUDPPort, bool bIgnorePortOnUniqueIP, bool* pbMultipleIPs)
336 CUpDownClient* pMatchingIPClient = NULL;
338 int cMatches = 0;
340 CClientRefList::iterator it = m_waitinglist.begin();
341 for (; it != m_waitinglist.end(); ++it) {
342 CUpDownClient* cur_client = it->GetClient();
344 if ((dwIP == cur_client->GetIP()) && (nUDPPort == cur_client->GetUDPPort())) {
345 return cur_client;
346 } else if ((dwIP == cur_client->GetIP()) && bIgnorePortOnUniqueIP) {
347 pMatchingIPClient = cur_client;
348 cMatches++;
352 if (pbMultipleIPs) {
353 *pbMultipleIPs = cMatches > 1;
356 if (pMatchingIPClient && cMatches == 1) {
357 return pMatchingIPClient;
358 } else {
359 return NULL;
364 void CUploadQueue::AddClientToQueue(CUpDownClient* client)
366 if (theApp->serverconnect->IsConnected() && theApp->serverconnect->IsLowID() && !theApp->serverconnect->IsLocalServer(client->GetServerIP(),client->GetServerPort()) && client->GetDownloadState() == DS_NONE && !client->IsFriend() && theStats::GetWaitingUserCount() > 50) {
367 // Well, all that issues finish in the same: don't allow to add to the queue
368 return;
371 if ( client->IsBanned() ) {
372 return;
375 client->AddAskedCount();
376 client->SetLastUpRequest();
378 // Find all clients with the same user-hash
379 CClientList::SourceList found = theApp->clientlist->GetClientsByHash( client->GetUserHash() );
381 CClientList::SourceList::iterator it = found.begin();
382 while (it != found.end()) {
383 CUpDownClient* cur_client = it++->GetClient();
385 if ( IsOnUploadQueue( cur_client ) ) {
386 if ( cur_client == client ) {
387 // This is where LowID clients get their upload slot assigned.
388 // They can't be contacted if they reach top of the queue, so they are just marked for uploading.
389 // When they reconnect next time AddClientToQueue() is called, and they get their slot
390 // through the connection they initiated.
391 // Since at that time no slot is free they get assigned an extra slot,
392 // so then the number of slots exceeds the configured number by one.
393 // To prevent a further increase no more LowID clients get a slot, until
394 // - a HighID client has got one (which happens only after two clients
395 // have been kicked so a slot is free again)
396 // - or there is a free slot, which means there is no HighID client on queue
397 if (client->m_bAddNextConnect) {
398 uint16 maxSlots = GetMaxSlots();
399 if (lastupslotHighID) {
400 maxSlots++;
402 if (m_uploadinglist.size() < maxSlots) {
403 client->m_bAddNextConnect = false;
404 RemoveFromWaitingQueue(client);
405 AddUpNextClient(client);
406 lastupslotHighID = false; // LowID alternate
407 return;
411 client->SendRankingInfo();
412 Notify_SharedCtrlRefreshClient(client->ECID(), AVAILABLE_SOURCE);
413 return;
414 } else {
415 // Hash-clash, remove unidentified clients (possibly both)
417 if ( !cur_client->IsIdentified() ) {
418 // Cur_client isn't identifed, remove it
419 theApp->clientlist->AddTrackClient( cur_client );
421 RemoveFromWaitingQueue( cur_client );
422 if ( !cur_client->GetSocket() ) {
423 if (cur_client->Disconnected( wxT("AddClientToQueue - same userhash") ) ) {
424 cur_client->Safe_Delete();
429 if ( !client->IsIdentified() ) {
430 // New client isn't identified, remove it
431 theApp->clientlist->AddTrackClient( client );
433 if ( !client->GetSocket() ) {
434 if ( client->Disconnected( wxT("AddClientToQueue - same userhash") ) ) {
435 client->Safe_Delete();
439 return;
445 // Count the number of clients with the same IP-address
446 found = theApp->clientlist->GetClientsByIP( client->GetIP() );
448 int ipCount = 0;
449 for ( it = found.begin(); it != found.end(); it++ ) {
450 if ( ( it->GetClient() == client ) || IsOnUploadQueue( it->GetClient() ) ) {
451 ipCount++;
455 // We do not accept more than 3 clients from the same IP
456 if ( ipCount > 3 ) {
457 return;
458 } else if ( theApp->clientlist->GetClientsFromIP(client->GetIP()) >= 3 ) {
459 return;
462 // statistic values
463 CKnownFile* reqfile = (CKnownFile*) client->GetUploadFile();
464 if (reqfile) {
465 reqfile->statistic.AddRequest();
468 if (client->IsDownloading()) {
469 // he's already downloading and wants probably only another file
470 CPacket* packet = new CPacket(OP_ACCEPTUPLOADREQ, 0, OP_EDONKEYPROT);
471 theStats::AddUpOverheadFileRequest(packet->GetPacketSize());
472 AddDebugLogLineN( logLocalClient, wxT("Local Client: OP_ACCEPTUPLOADREQ to ") + client->GetFullIP() );
473 client->SendPacket(packet,true);
474 return;
477 // TODO find better ways to cap the list
478 if (m_waitinglist.size() >= (thePrefs::GetQueueSize())) {
479 return;
482 uint32 tick = GetTickCount();
483 client->ClearWaitStartTime();
484 // if possible start upload right away
485 if (m_waitinglist.empty() && tick - m_nLastStartUpload >= 1000
486 && m_uploadinglist.size() < GetMaxSlots()
487 && !theApp->listensocket->TooManySockets()) {
488 AddUpNextClient(client);
489 m_nLastStartUpload = tick;
490 } else {
491 // add to waiting queue
492 m_waitinglist.push_back(CCLIENTREF(client, wxT("CUploadQueue::AddClientToQueue m_waitinglist.push_back")));
493 // and sort it to update queue ranks
494 SortGetBestClient(true);
495 theStats::AddWaitingClient();
496 client->ClearAskedCount();
497 client->SetUploadState(US_ONUPLOADQUEUE);
498 client->SendRankingInfo();
499 //Notify_QlistAddClient(client);
504 bool CUploadQueue::RemoveFromUploadQueue(CUpDownClient* client)
506 // Keep track of this client
507 theApp->clientlist->AddTrackClient(client);
509 CClientRefList::iterator it = std::find(m_uploadinglist.begin(),
510 m_uploadinglist.end(), CCLIENTREF(client, wxEmptyString));
512 if (it != m_uploadinglist.end()) {
513 m_uploadinglist.erase(it);
514 m_allUploadingKnownFile->RemoveUploadingClient(client);
515 theStats::RemoveUploadingClient();
516 if( client->GetTransferredUp() ) {
517 theStats::AddSuccessfulUpload();
518 theStats::AddUploadTime(client->GetUpStartTimeDelay() / 1000);
519 } else {
520 theStats::AddFailedUpload();
522 client->SetUploadState(US_NONE);
523 client->ClearUploadBlockRequests();
524 return true;
527 return false;
531 bool CUploadQueue::CheckForTimeOver(CUpDownClient* client)
533 // Don't kick anybody if there's no need to
534 if (!m_allowKicking) {
535 return false;
537 // First, check if it is a VIP slot (friend or Release-Prio).
538 if (client->GetFriendSlot()) {
539 return false; // never drop the friend
541 // Release-Prio and nobody on queue for it?
542 if (client->GetUploadFile()->GetUpPriority() == PR_POWERSHARE) {
543 // Keep it unless half of the UL slots are occupied with friends or Release uploads.
544 uint16 vips = 0;
545 for (CClientRefList::iterator it = m_uploadinglist.begin(); it != m_uploadinglist.end(); ++it) {
546 CUpDownClient* cur_client = it->GetClient();
547 if (cur_client->GetFriendSlot() || cur_client->GetUploadFile()->GetUpPriority() == PR_POWERSHARE) {
548 vips++;
551 // allow if VIP uploads occupy at most half of the possible upload slots
552 if (vips <= GetMaxSlots() / 2) {
553 return false;
555 // Otherwise normal rules apply.
558 // Ordinary slots
559 // "Transfer full chunks": drop client after 10 MB upload, or after an hour.
560 // (so average UL speed should at least be 2.84 kB/s)
561 // We don't track what he is downloading, but if it's all from one chunk he gets it.
562 if (client->GetUpStartTimeDelay() > 3600000 // time: 1h
563 || client->GetSessionUp() > 10485760) { // data: 10MB
564 m_allowKicking = false; // kick max one client per cycle
565 return true;
568 return false;
573 * This function removes a file indicated by filehash from suspended_uploads_list.
575 void CUploadQueue::ResumeUpload( const CMD4Hash& filehash )
577 suspendedUploadsSet.erase(filehash);
578 AddLogLineN(CFormat( _("Resuming uploads of file: %s" ) )
579 % filehash.Encode() );
583 * This function stops upload of a file indicated by filehash.
585 * a) teminate == false:
586 * File is suspended while a download completes. Then it is resumed after completion,
587 * so it makes sense to keep the client. Such files are kept in suspendedUploadsSet.
588 * b) teminate == true:
589 * File is deleted. Then the client is not added to the waiting list.
590 * Waiting clients are swept out with next run of AddUpNextClient,
591 * because their file is not shared anymore.
593 uint16 CUploadQueue::SuspendUpload(const CMD4Hash& filehash, bool terminate)
595 AddLogLineN(CFormat( _("Suspending upload of file: %s" ) )
596 % filehash.Encode() );
597 uint16 removed = 0;
599 //Append the filehash to the list.
600 if (!terminate) {
601 suspendedUploadsSet.insert(filehash);
604 CClientRefList::iterator it = m_uploadinglist.begin();
605 while (it != m_uploadinglist.end()) {
606 CUpDownClient *potential = it++->GetClient();
607 //check if the client is uploading the file we need to suspend
608 if (potential->GetUploadFileID() == filehash) {
609 // remove the unlucky client from the upload queue
610 RemoveFromUploadQueue(potential);
611 // if suspend isn't permanent add it to the waiting queue
612 if (terminate) {
613 potential->SetUploadState(US_NONE);
614 } else {
615 m_waitinglist.push_back(CCLIENTREF(potential, wxT("CUploadQueue::SuspendUpload")));
616 theStats::AddWaitingClient();
617 potential->SetUploadState(US_ONUPLOADQUEUE);
618 potential->SendRankingInfo();
619 Notify_SharedCtrlRefreshClient(potential->ECID(), AVAILABLE_SOURCE);
621 removed++;
624 return removed;
627 bool CUploadQueue::RemoveFromWaitingQueue(CUpDownClient* client)
629 CClientRefList::iterator it = m_waitinglist.begin();
631 uint16 rank = 1;
632 while (it != m_waitinglist.end()) {
633 CClientRefList::iterator it1 = it++;
634 if (it1->GetClient() == client) {
635 RemoveFromWaitingQueue(it1);
636 // update ranks of remaining queue
637 while (it != m_waitinglist.end()) {
638 it->GetClient()->SetUploadQueueWaitingPosition(rank++);
639 it++;
641 return true;
643 rank++;
645 return false;
649 void CUploadQueue::RemoveFromWaitingQueue(CClientRefList::iterator pos)
651 CUpDownClient* todelete = pos->GetClient();
652 m_waitinglist.erase(pos);
653 theStats::RemoveWaitingClient();
654 if( todelete->IsBanned() ) {
655 todelete->UnBan();
657 //Notify_QlistRemoveClient(todelete);
658 todelete->SetUploadState(US_NONE);
659 todelete->ClearScore();
660 todelete->SetUploadQueueWaitingPosition(0);
663 // File_checked_for_headers