2 // This file is part of the aMule Project.
4 // Copyright (c) 2003-2008 aMule Team ( admin@amule.org / http://www.amule.org )
5 // Copyright (c) 2002-2008 Merkur ( devs@emule-project.net / http://www.emule-project.net )
7 // Any parts of this program derived from the xMule, lMule or eMule project,
8 // or contributed by third-party developers are copyrighted by their
11 // This program is free software; you can redistribute it and/or modify
12 // it under the terms of the GNU General Public License as published by
13 // the Free Software Foundation; either version 2 of the License, or
14 // (at your option) any later version.
16 // This program is distributed in the hope that it will be useful,
17 // but WITHOUT ANY WARRANTY; without even the implied warranty of
18 // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
19 // GNU General Public License for more details.
21 // You should have received a copy of the GNU General Public License
22 // along with this program; if not, write to the Free Software
23 // Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301, USA
26 #include "UploadQueue.h" // Interface declarations
28 #include <protocol/Protocols.h>
29 #include <protocol/ed2k/Client2Client/TCP.h>
30 #include <common/Macros.h>
31 #include <common/Constants.h>
35 #include "Types.h" // Do_not_auto_remove (win32)
38 #include <winsock.h> // Do_not_auto_remove
40 #include <sys/types.h> // Do_not_auto_remove
41 #include <netinet/in.h> // Do_not_auto_remove
42 #include <arpa/inet.h> // Do_not_auto_remove
45 #include "ServerConnect.h" // Needed for CServerConnect
46 #include "KnownFile.h" // Needed for CKnownFile
47 #include "Packet.h" // Needed for CPacket
48 #include "ClientTCPSocket.h" // Needed for CClientTCPSocket
49 #include "SharedFileList.h" // Needed for CSharedFileList
50 #include "updownclient.h" // Needed for CUpDownClient
51 #include "amule.h" // Needed for theApp
52 #include "Preferences.h"
53 #include "ClientList.h"
54 #include "Statistics.h" // Needed for theStats
56 #include <common/Format.h>
57 #include "UploadBandwidthThrottler.h"
58 #include "GuiEvents.h" // Needed for Notify_*
61 //TODO rewrite the whole networkcode, use overlapped sockets
// Constructor: zeroes m_nLastStartUpload, sets lastupslotHighID and
// m_allowKicking to true, and allocates the CKnownFile stored in
// m_allUploadingKnownFile (released again by the destructor).
// NOTE(review): this listing has lost lines (e.g. the braces); confirm that no
// further initialisers are missing before relying on this summary.
63 CUploadQueue::CUploadQueue()
65 m_nLastStartUpload
= 0;
67 lastupslotHighID
= true;
68 m_allowKicking
= true;
69 m_allUploadingKnownFile
= new CKnownFile
;
// Cleans up and re-sorts the waiting list, assigns queue positions and looks
// for the next client to upload to.
//
// What the visible code does:
//  - First loop over m_waitinglist: a client whose last upload request is older
//    than MAX_PURGEQUEUETIME, or whose requested file is no longer found in
//    theApp->sharedfiles, is removed from the waiting queue (and Safe_Delete()d
//    when it has no socket and Disconnected() returns true). Banned clients and
//    clients of suspended files get their score cleared. For the other clients
//    the score is calculated and compared with the entries in front of it,
//    swapping list entries when the new score is higher.
//  - Second loop: every client gets its position via
//    SetUploadQueueWaitingPosition(rank++) and its m_bAddNextConnect flag is
//    set or cleared; a selected client is stored in newclient and removed from
//    the waiting queue.
//  - The GUI queue counter is refreshed if something was purged or a client
//    was taken out with sortonly == false.
//  - The resulting queue is written to the debug log (logLocalClient).
//
// sortonly: callers in this file pass true when they only want the queue
//           re-ranked and false when they want the next client.
// NOTE(review): this listing has lost lines (declarations of `purged` and
// `rank`, iterator decrements, else/break/continue and the return statement).
// Presumably newclient is returned and nothing is taken out of the queue when
// sortonly is true — confirm against the complete source.
73 CUpDownClient
* CUploadQueue::SortGetBestClient(bool sortonly
)
75 CUpDownClient
* newclient
= NULL
;
76 // Track if we purged any clients from the queue, as to only send one notify in total
78 uint32 tick
= GetTickCount();
80 CClientRefList::iterator it
= m_waitinglist
.begin();
81 for (; it
!= m_waitinglist
.end(); ) {
// it2 is the entry handled in this iteration; `it` has already moved on, so
// the entry can be removed via it2 while the loop keeps using `it`.
82 CClientRefList::iterator it2
= it
++;
83 CUpDownClient
* cur_client
= it2
->GetClient();
// Purge clients that did not ask for too long or whose file is not shared anymore.
86 if (tick
- cur_client
->GetLastUpRequest() > MAX_PURGEQUEUETIME
87 || !theApp
->sharedfiles
->GetFileByID(cur_client
->GetUploadFileID())) {
89 cur_client
->ClearWaitStartTime();
90 RemoveFromWaitingQueue(it2
);
91 if (!cur_client
->GetSocket()) {
92 if (cur_client
->Disconnected(wxT("AddUpNextClient - purged"))) {
93 cur_client
->Safe_Delete();
100 if (cur_client
->IsBanned() || IsSuspended(cur_client
->GetUploadFileID())) { // Banned client or suspended upload ?
101 cur_client
->ClearScore();
106 // Calculate score of current client
107 uint32 cur_score
= cur_client
->CalculateScore();
108 // Check if it's better than that of a previous one, and move it up then.
109 CClientRefList::iterator it1
= it2
;
110 while (it1
!= m_waitinglist
.begin()) {
112 if (cur_score
> it1
->GetClient()->GetScore()) {
114 std::swap(*it2
, *it1
);
117 // no need to check further since list is already sorted
124 // - calculate queue rank
125 // - find best high id client
126 // - mark all better low id clients as enabled for upload
128 for (it
= m_waitinglist
.begin(); it
!= m_waitinglist
.end(); ) {
129 CClientRefList::iterator it2
= it
++;
130 CUpDownClient
* cur_client
= it2
->GetClient();
131 cur_client
->SetUploadQueueWaitingPosition(rank
++);
133 // There's a better high id client
134 cur_client
->m_bAddNextConnect
= false;
136 if (cur_client
->HasLowID() && !cur_client
->IsConnected()) {
137 // No better high id client, so start upload to this one once it connects
138 cur_client
->m_bAddNextConnect
= true;
140 // We found a high id client (or a currently connected low id client)
141 newclient
= cur_client
;
142 cur_client
->m_bAddNextConnect
= false;
144 RemoveFromWaitingQueue(it2
);
146 lastupslotHighID
= true; // VQB LowID alternate
152 // Update the count on GUI if any clients were purged
153 if (purged
|| (newclient
&& !sortonly
)) {
154 Notify_ShowQueueCount(m_waitinglist
.size());
// Debug dump of the queue, one log line per waiting client.
158 AddDebugLogLineN(logLocalClient
, CFormat(wxT("Current UL queue (%d):")) % (rank
- 1));
159 for (it
= m_waitinglist
.begin(); it
!= m_waitinglist
.end(); it
++) {
160 CUpDownClient
* c
= it
->GetClient();
161 AddDebugLogLineN(logLocalClient
, CFormat(wxT("%4d %7d %s %5d %s"))
162 % c
->GetUploadQueueWaitingPosition()
164 % (c
->HasLowID() ? (c
->IsConnected() ? wxT("LoCon") : wxT("LowId")) : wxT("High "))
// Starts an upload session for one client: either the caller-supplied
// directadd or the client delivered by SortGetBestClient(false).
//
// What the visible code does:
//  - For directadd the requested file is checked with IsSuspended() and
//    theApp->sharedfiles->GetFileByID() before it is used.
//  - IsDownloading(newclient) is tested before going on.
//  - A client that is not connected is put into US_CONNECTING and
//    TryToConnect(true) is attempted. The code that follows sends an
//    OP_ACCEPTUPLOADREQ packet (accounted as file-request overhead in theStats)
//    and sets US_UPLOADING.
//  - The session is then registered: SetUpStartTime(), ResetSessionUp(), the
//    socket is added to the bandwidth throttler's standard list, a reference is
//    appended to m_uploadinglist, m_allUploadingKnownFile and theStats are
//    updated, the requested file's statistic gets AddAccepted(), and the GUI is
//    notified.
//
// directadd: client to start uploading to directly.
// NOTE(review): the condition that chooses between directadd and
// SortGetBestClient(), the else branches and all early returns are missing from
// this listing — confirm the exact control flow against the complete source.
175 void CUploadQueue::AddUpNextClient(CUpDownClient
* directadd
)
177 CUpDownClient
* newclient
= NULL
;
178 // select next client or use given client
180 newclient
= SortGetBestClient(false);
185 // Check if requested file is suspended or not shared (maybe deleted recently)
187 if (IsSuspended(directadd
->GetUploadFileID())
188 || !theApp
->sharedfiles
->GetFileByID(directadd
->GetUploadFileID())) {
191 newclient
= directadd
;
195 if (IsDownloading(newclient
)) {
198 // tell the client that we are now ready to upload
199 if (!newclient
->IsConnected()) {
200 newclient
->SetUploadState(US_CONNECTING
);
201 if (!newclient
->TryToConnect(true)) {
205 CPacket
* packet
= new CPacket(OP_ACCEPTUPLOADREQ
, 0, OP_EDONKEYPROT
);
206 theStats::AddUpOverheadFileRequest(packet
->GetPacketSize());
207 AddDebugLogLineN( logLocalClient
, wxT("Local Client: OP_ACCEPTUPLOADREQ to ") + newclient
->GetFullIP() );
208 newclient
->SendPacket(packet
,true);
209 newclient
->SetUploadState(US_UPLOADING
);
// Book-keeping for the new upload session.
211 newclient
->SetUpStartTime();
212 newclient
->ResetSessionUp();
214 theApp
->uploadBandwidthThrottler
->AddToStandardList(m_uploadinglist
.size(), newclient
->GetSocket());
215 m_uploadinglist
.push_back(CCLIENTREF(newclient
, wxT("CUploadQueue::AddUpNextClient")));
216 m_allUploadingKnownFile
->AddUploadingClient(newclient
);
217 theStats::AddUploadingClient();
220 CKnownFile
* reqfile
= (CKnownFile
*) newclient
->GetUploadFile();
222 reqfile
->statistic
.AddAccepted();
225 Notify_SharedCtrlRefreshClient(newclient
->ECID(), AVAILABLE_SOURCE
);
// Periodic work of the upload queue.
//  1. Sets m_allowKicking for this cycle, depending on whether the waiting list
//     is empty, whether the last upload was started less than 1000 ticks ago,
//     and whether m_uploadinglist is smaller than GetMaxSlots().
//  2. Walks m_uploadinglist: a client without socket is removed from the upload
//     queue (and Safe_Delete()d if Disconnected() returns true); SendBlockData()
//     is called to feed the upload slots.
//  3. Reads and resets the sent-bytes counters of the upload bandwidth
//     throttler and adds the sent bytes to theStats. The overhead counter is
//     read only for its reset side effect (result cast to void).
//  4. Re-sorts the waiting list via SortGetBestClient(true) when
//     tick - m_lastSort exceeds MIN2MS(2).
// NOTE(review): this listing has lost lines (else branches, and whatever
// happens right after m_nLastStartUpload is updated when a slot is free —
// presumably the next upload is started there); confirm against the source.
228 void CUploadQueue::Process()
230 // Check if someone's waiting, if there is a slot for him,
231 // or if we should try to free a slot for him
232 uint32 tick
= GetTickCount();
233 // Nobody waiting or upload started recently
234 // (Actually instead of "empty" it should check for "no HighID clients queued",
235 // but the cost for that outweighs the benefit. As it is, a slot will be freed
236 // even if it can't be taken because all of the queue is LowID. But just one,
237 // and the kicked client will instantly get it back if he has HighID.)
238 if (m_waitinglist
.empty() || tick
- m_nLastStartUpload
< 1000) {
239 m_allowKicking
= false;
240 // Already a slot free, try to fill it
241 } else if (m_uploadinglist
.size() < GetMaxSlots()) {
242 m_allowKicking
= false;
243 m_nLastStartUpload
= tick
;
245 // All slots taken, try to free one
247 m_allowKicking
= true;
250 // The loop that feeds the upload slots with data.
251 CClientRefList::iterator it
= m_uploadinglist
.begin();
252 while (it
!= m_uploadinglist
.end()) {
253 // Get the client. Note! Also updates pos as a side effect.
// (The iterator is advanced here, before the current client may be removed from the list.)
254 CUpDownClient
* cur_client
= it
++->GetClient();
256 // It seems chatting or friend slots can get stuck at times in upload.. This needs looked into..
257 if (!cur_client
->GetSocket()) {
258 RemoveFromUploadQueue(cur_client
);
259 if(cur_client
->Disconnected(_T("CUploadQueue::Process"))){
260 cur_client
->Safe_Delete();
263 cur_client
->SendBlockData();
267 // Save used bandwidth for speed calculations
268 uint64 sentBytes
= theApp
->uploadBandwidthThrottler
->GetNumberOfSentBytesSinceLastCallAndReset();
269 (void)theApp
->uploadBandwidthThrottler
->GetNumberOfSentBytesOverheadSinceLastCallAndReset();
273 theStats::AddSentBytes(sentBytes
);
276 // Periodically resort queue if it doesn't happen anyway
277 if ((sint32
) (tick
- m_lastSort
) > MIN2MS(2)) {
278 SortGetBestClient(true);
// Computes the number of upload slots that may be in use.
//
// thePrefs::GetSlotAllocation() is used as the rate per client.
//  - Max upload UNLIMITED: current upload rate (theStats::GetUploadRate() /
//    1024) divided by the per-client rate, plus 2.
//  - Max upload >= 10: max upload divided by the per-client rate, rounded to
//    the nearest integer, and raised to MIN_UP_CLIENTS_ALLOWED if it is lower.
//  - The value is capped at MAX_UP_CLIENTS_ALLOWED.
// NOTE(review): the else branches and the return statement are missing from
// this listing. Presumably MIN_UP_CLIENTS_ALLOWED is the fallback for a max
// upload below 10 and nMaxSlots is returned — confirm.
283 uint16
CUploadQueue::GetMaxSlots() const
285 uint16 nMaxSlots
= 0;
286 float kBpsUpPerClient
= (float)thePrefs::GetSlotAllocation();
287 if (thePrefs::GetMaxUpload() == UNLIMITED
) {
288 float kBpsUp
= theStats::GetUploadRate() / 1024.0f
;
289 nMaxSlots
= (uint16
)(kBpsUp
/ kBpsUpPerClient
) + 2;
291 if (thePrefs::GetMaxUpload() >= 10) {
292 nMaxSlots
= (uint16
)floor((float)thePrefs::GetMaxUpload() / kBpsUpPerClient
+ 0.5);
293 // floor(x + 0.5) is a way of doing round(x) that works with gcc < 3 ...
294 if (nMaxSlots
< MIN_UP_CLIENTS_ALLOWED
) {
295 nMaxSlots
=MIN_UP_CLIENTS_ALLOWED
;
298 nMaxSlots
= MIN_UP_CLIENTS_ALLOWED
;
// Upper bound on the number of slots.
301 if (nMaxSlots
> MAX_UP_CLIENTS_ALLOWED
) {
302 nMaxSlots
= MAX_UP_CLIENTS_ALLOWED
;
// Destructor: asserts (wxASSERT) that both the waiting list and the uploading
// list are already empty, then deletes the CKnownFile allocated by the
// constructor.
308 CUploadQueue::~CUploadQueue()
310 wxASSERT(m_waitinglist
.empty());
311 wxASSERT(m_uploadinglist
.empty());
312 delete m_allUploadingKnownFile
;
// Linear search of m_waitinglist for an entry whose client pointer equals
// `client`. Despite the name only the waiting list is scanned; the list of
// active uploads is checked by IsDownloading().
// NOTE(review): the return statements are missing from this listing;
// presumably true is returned on a match and false otherwise — confirm.
316 bool CUploadQueue::IsOnUploadQueue(const CUpDownClient
* client
) const
318 for (CClientRefList::const_iterator it
= m_waitinglist
.begin(); it
!= m_waitinglist
.end(); it
++) {
319 if (it
->GetClient() == client
) {
// Linear search of m_uploadinglist for an entry whose client pointer equals
// `client`, i.e. checks whether the client currently occupies an upload slot.
// NOTE(review): the return statements are missing from this listing;
// presumably true is returned on a match and false otherwise — confirm.
327 bool CUploadQueue::IsDownloading(const CUpDownClient
* client
) const
329 for (CClientRefList::const_iterator it
= m_uploadinglist
.begin(); it
!= m_uploadinglist
.end(); it
++) {
330 if (it
->GetClient() == client
) {
// Searches the waiting list for a client by IP address and UDP port.
//
// dwIP, nUDPPort:        compared with GetIP() and GetUDPPort() of each client.
// bIgnorePortOnUniqueIP: when set, a client that matches by IP only is
//                        remembered in pMatchingIPClient and returned if
//                        cMatches == 1.
// pbMultipleIPs:         receives (cMatches > 1).
// NOTE(review): the declaration and counting of cMatches, the handling of an
// exact IP+port match, any null check on pbMultipleIPs and the final return are
// missing from this listing — confirm against the complete source.
338 CUpDownClient
* CUploadQueue::GetWaitingClientByIP_UDP(uint32 dwIP
, uint16 nUDPPort
, bool bIgnorePortOnUniqueIP
, bool* pbMultipleIPs
)
340 CUpDownClient
* pMatchingIPClient
= NULL
;
344 CClientRefList::iterator it
= m_waitinglist
.begin();
345 for (; it
!= m_waitinglist
.end(); ++it
) {
346 CUpDownClient
* cur_client
= it
->GetClient();
348 if ((dwIP
== cur_client
->GetIP()) && (nUDPPort
== cur_client
->GetUDPPort())) {
350 } else if ((dwIP
== cur_client
->GetIP()) && bIgnorePortOnUniqueIP
) {
351 pMatchingIPClient
= cur_client
;
357 *pbMultipleIPs
= cMatches
> 1;
360 if (pMatchingIPClient
&& cMatches
== 1) {
361 return pMatchingIPClient
;
// Handles an upload request of `client`: either starts the upload right away
// or puts the client on the waiting list.
//
// What the visible code does, in order:
//  - Tests a combined condition (we are connected to a server with a LowID,
//    the client is not on our server, has download state DS_NONE, is no friend
//    and more than 50 users are waiting) under which the client is not queued.
//  - Tests client->IsBanned().
//  - Increments the client's asked count and refreshes its last-request time.
//  - Looks at all known clients with the same user hash that are on the
//    waiting list. For the requesting client itself an upload may be started if
//    m_bAddNextConnect is set and a slot is available, and its ranking info is
//    sent. For a different client object with the same hash (hash clash) the
//    unidentified one(s) are removed.
//  - Inspects the clients sharing the requester's IP address (limit: 3).
//  - Adds a request to the statistic of the requested file.
//  - If the client is already downloading, OP_ACCEPTUPLOADREQ is sent again.
//  - Compares the waiting list size with thePrefs::GetQueueSize().
//  - Starts the upload immediately when nobody is waiting, the last upload was
//    started at least 1000 ticks ago and a slot is free. The code after that
//    appends the client to m_waitinglist, re-sorts, updates the stats, sets
//    US_ONUPLOADQUEUE and sends the ranking info.
// NOTE(review): this listing has lost the return statements, else branches and
// several declarations, so the exact exits of the checks above cannot be read
// from here — confirm against the complete source.
368 void CUploadQueue::AddClientToQueue(CUpDownClient
* client
)
370 if (theApp
->serverconnect
->IsConnected() && theApp
->serverconnect
->IsLowID() && !theApp
->serverconnect
->IsLocalServer(client
->GetServerIP(),client
->GetServerPort()) && client
->GetDownloadState() == DS_NONE
&& !client
->IsFriend() && theStats::GetWaitingUserCount() > 50) {
371 // All of these conditions lead to the same result: don't allow adding to the queue
375 if ( client
->IsBanned() ) {
379 client
->AddAskedCount();
380 client
->SetLastUpRequest();
382 // Find all clients with the same user-hash
383 CClientList::SourceList found
= theApp
->clientlist
->GetClientsByHash( client
->GetUserHash() );
385 CClientList::SourceList::iterator it
= found
.begin();
386 while (it
!= found
.end()) {
387 CUpDownClient
* cur_client
= it
++->GetClient();
389 if ( IsOnUploadQueue( cur_client
) ) {
390 if ( cur_client
== client
) {
391 // This is where LowID clients get their upload slot assigned.
392 // They can't be contacted if they reach top of the queue, so they are just marked for uploading.
393 // When they reconnect next time AddClientToQueue() is called, and they get their slot
394 // through the connection they initiated.
395 // Since at that time no slot is free they get assigned an extra slot,
396 // so then the number of slots exceeds the configured number by one.
397 // To prevent a further increase no more LowID clients get a slot, until
398 // - a HighID client has got one (which happens only after two clients
399 // have been kicked so a slot is free again)
400 // - or there is a free slot, which means there is no HighID client on queue
401 if (client
->m_bAddNextConnect
) {
402 uint16 maxSlots
= GetMaxSlots();
403 if (lastupslotHighID
) {
406 if (m_uploadinglist
.size() < maxSlots
) {
407 client
->m_bAddNextConnect
= false;
408 RemoveFromWaitingQueue(client
);
409 AddUpNextClient(client
);
410 lastupslotHighID
= false; // LowID alternate
415 client
->SendRankingInfo();
416 Notify_SharedCtrlRefreshClient(client
->ECID(), AVAILABLE_SOURCE
);
419 // Hash-clash, remove unidentified clients (possibly both)
421 if ( !cur_client
->IsIdentified() ) {
422 // Cur_client isn't identified, remove it
423 theApp
->clientlist
->AddTrackClient( cur_client
);
425 RemoveFromWaitingQueue( cur_client
);
426 if ( !cur_client
->GetSocket() ) {
427 if (cur_client
->Disconnected( wxT("AddClientToQueue - same userhash") ) ) {
428 cur_client
->Safe_Delete();
433 if ( !client
->IsIdentified() ) {
434 // New client isn't identified, remove it
435 theApp
->clientlist
->AddTrackClient( client
);
437 if ( !client
->GetSocket() ) {
438 if ( client
->Disconnected( wxT("AddClientToQueue - same userhash") ) ) {
439 client
->Safe_Delete();
449 // Count the number of clients with the same IP-address
450 found
= theApp
->clientlist
->GetClientsByIP( client
->GetIP() );
453 for ( it
= found
.begin(); it
!= found
.end(); it
++ ) {
454 if ( ( it
->GetClient() == client
) || IsOnUploadQueue( it
->GetClient() ) ) {
459 // We do not accept more than 3 clients from the same IP
462 } else if ( theApp
->clientlist
->GetClientsFromIP(client
->GetIP()) >= 3 ) {
467 CKnownFile
* reqfile
= (CKnownFile
*) client
->GetUploadFile();
469 reqfile
->statistic
.AddRequest();
472 if (client
->IsDownloading()) {
473 // he's already downloading and wants probably only another file
474 CPacket
* packet
= new CPacket(OP_ACCEPTUPLOADREQ
, 0, OP_EDONKEYPROT
);
475 theStats::AddUpOverheadFileRequest(packet
->GetPacketSize());
476 AddDebugLogLineN( logLocalClient
, wxT("Local Client: OP_ACCEPTUPLOADREQ to ") + client
->GetFullIP() );
477 client
->SendPacket(packet
,true);
481 // TODO find better ways to cap the list
482 if (m_waitinglist
.size() >= (thePrefs::GetQueueSize())) {
486 uint32 tick
= GetTickCount();
487 client
->ClearWaitStartTime();
488 // if possible start upload right away
489 if (m_waitinglist
.empty() && tick
- m_nLastStartUpload
>= 1000 && m_uploadinglist
.size() < GetMaxSlots()) {
490 AddUpNextClient(client
);
491 m_nLastStartUpload
= tick
;
493 // add to waiting queue
494 m_waitinglist
.push_back(CCLIENTREF(client
, wxT("CUploadQueue::AddClientToQueue m_waitinglist.push_back")));
495 // and sort it to update queue ranks
496 SortGetBestClient(true);
497 theStats::AddWaitingClient();
498 client
->ClearAskedCount();
499 client
->SetUploadState(US_ONUPLOADQUEUE
);
500 client
->SendRankingInfo();
501 //Notify_QlistAddClient(client);
// Takes `client` out of the list of active uploads.
//
// The client is always passed to theApp->clientlist->AddTrackClient() first.
// If an entry for it is found in m_uploadinglist, the entry is erased, the
// client is removed from m_allUploadingKnownFile and the uploading-client
// counter in theStats is decreased. A client that transferred data is counted
// as a successful upload, with AddUploadTime(GetUpStartTimeDelay() / 1000).
// AddFailedUpload() covers the other outcome. Finally the client's upload state
// is reset to US_NONE and its pending block requests are cleared.
// NOTE(review): the else and the return statements are missing from this
// listing; presumably true is returned when the client was found — confirm.
506 bool CUploadQueue::RemoveFromUploadQueue(CUpDownClient
* client
)
508 // Keep track of this client
509 theApp
->clientlist
->AddTrackClient(client
);
// wxEmptyString: the reference created here is only a search key for std::find.
511 CClientRefList::iterator it
= std::find(m_uploadinglist
.begin(),
512 m_uploadinglist
.end(), CCLIENTREF(client
, wxEmptyString
));
514 if (it
!= m_uploadinglist
.end()) {
515 m_uploadinglist
.erase(it
);
516 m_allUploadingKnownFile
->RemoveUploadingClient(client
);
517 theStats::RemoveUploadingClient();
518 if( client
->GetTransferredUp() ) {
519 theStats::AddSuccessfulUpload();
520 theStats::AddUploadTime(client
->GetUpStartTimeDelay() / 1000);
522 theStats::AddFailedUpload();
524 client
->SetUploadState(US_NONE
);
525 client
->ClearUploadBlockRequests();
// Decides whether the upload session of `client` should be ended to free a slot.
//
// What the visible code does:
//  - Nothing is kicked while m_allowKicking is false.
//  - A client with a friend slot is never dropped (returns false).
//  - For a file with upload priority PR_POWERSHARE the uploading list is
//    scanned for friend-slot / PR_POWERSHARE uploads, and the count is compared
//    with GetMaxSlots() / 2.
//  - Otherwise the session is over when GetUpStartTimeDelay() exceeds 3600000
//    or GetSessionUp() exceeds 10485760. In that case m_allowKicking is cleared
//    so that at most one client is kicked per cycle.
// NOTE(review): the declaration/increment of `vips` and most return statements
// are missing from this listing; presumably true means "drop this client" —
// confirm against the complete source.
533 bool CUploadQueue::CheckForTimeOver(CUpDownClient
* client
)
535 // Don't kick anybody if there's no need to
536 if (!m_allowKicking
) {
539 // First, check if it is a VIP slot (friend or Release-Prio).
540 if (client
->GetFriendSlot()) {
541 return false; // never drop the friend
543 // Release-Prio and nobody on queue for it?
544 if (client
->GetUploadFile()->GetUpPriority() == PR_POWERSHARE
) {
545 // Keep it unless half of the UL slots are occupied with friends or Release uploads.
547 for (CClientRefList::iterator it
= m_uploadinglist
.begin(); it
!= m_uploadinglist
.end(); ++it
) {
548 CUpDownClient
* cur_client
= it
->GetClient();
549 if (cur_client
->GetFriendSlot() || cur_client
->GetUploadFile()->GetUpPriority() == PR_POWERSHARE
) {
553 // allow if VIP uploads occupy at most half of the possible upload slots
554 if (vips
<= GetMaxSlots() / 2) {
557 // Otherwise normal rules apply.
561 // "Transfer full chunks": drop client after 10 MB upload, or after an hour.
562 // (so average UL speed should at least be 2.84 kB/s)
563 // We don't track what he is downloading, but if it's all from one chunk he gets it.
564 if (client
->GetUpStartTimeDelay() > 3600000 // time: 1h
565 || client
->GetSessionUp() > 10485760) { // data: 10MB
566 m_allowKicking
= false; // kick max one client per cycle
575 * This function removes the file indicated by filehash from suspendedUploadsSet.
// Ends the suspension of a file: erases filehash from suspendedUploadsSet and
// writes a log line containing the encoded hash.
// filehash: hash of the file whose uploads may continue.
577 void CUploadQueue::ResumeUpload( const CMD4Hash
& filehash
)
579 suspendedUploadsSet
.erase(filehash
);
580 AddLogLineN(CFormat( _("Resuming uploads of file: %s" ) )
581 % filehash
.Encode() );
585 * This function stops upload of a file indicated by filehash.
587 * a) terminate == false:
588 * File is suspended while a download completes. Then it is resumed after completion,
589 * so it makes sense to keep the client. Such files are kept in suspendedUploadsSet.
590 * b) terminate == true:
591 * File is deleted. Then the client is not added to the waiting list.
592 * Waiting clients are swept out with next run of AddUpNextClient,
593 * because their file is not shared anymore.
// Stops all running uploads of the file identified by filehash.
//
// What the visible code does: writes a log line, inserts filehash into
// suspendedUploadsSet, and removes every client in m_uploadinglist whose
// GetUploadFileID() equals filehash from the upload queue. The code that
// follows puts such a client back on the waiting list (state US_ONUPLOADQUEUE,
// ranking info sent, GUI refreshed). Finally the GUI queue counter is updated.
//
// filehash:  hash of the file to suspend.
// terminate: selects between temporary suspension and permanent removal.
// NOTE(review): the conditions on `terminate`, the counter that is presumably
// returned (uint16) and the return statement are missing from this listing —
// confirm against the complete source.
595 uint16
CUploadQueue::SuspendUpload(const CMD4Hash
& filehash
, bool terminate
)
597 AddLogLineN(CFormat( _("Suspending upload of file: %s" ) )
598 % filehash
.Encode() );
601 //Append the filehash to the list.
603 suspendedUploadsSet
.insert(filehash
);
606 CClientRefList::iterator it
= m_uploadinglist
.begin();
607 while (it
!= m_uploadinglist
.end()) {
// The iterator is advanced before RemoveFromUploadQueue() may erase the current entry.
608 CUpDownClient
*potential
= it
++->GetClient();
609 //check if the client is uploading the file we need to suspend
610 if (potential
->GetUploadFileID() == filehash
) {
611 // remove the unlucky client from the upload queue
612 RemoveFromUploadQueue(potential
);
613 // if suspend isn't permanent add it to the waiting queue
615 potential
->SetUploadState(US_NONE
);
617 m_waitinglist
.push_back(CCLIENTREF(potential
, wxT("CUploadQueue::SuspendUpload")));
618 theStats::AddWaitingClient();
619 potential
->SetUploadState(US_ONUPLOADQUEUE
);
620 potential
->SendRankingInfo();
621 Notify_SharedCtrlRefreshClient(potential
->ECID(), AVAILABLE_SOURCE
);
627 Notify_ShowQueueCount(m_waitinglist
.size());
// Removes `client` from the waiting list (lookup by client pointer).
//
// When the entry is found it is removed through the iterator overload of
// RemoveFromWaitingQueue(), the GUI queue counter is refreshed, and the
// entries behind it get new positions via SetUploadQueueWaitingPosition(rank++).
// NOTE(review): the declaration/maintenance of `rank`, the iterator increment
// inside the renumbering loop and the return statements are missing from this
// listing; presumably true is returned when the client was found — confirm.
632 bool CUploadQueue::RemoveFromWaitingQueue(CUpDownClient
* client
)
634 CClientRefList::iterator it
= m_waitinglist
.begin();
637 while (it
!= m_waitinglist
.end()) {
// it1 is the entry under test; `it` already points to the entry behind it.
638 CClientRefList::iterator it1
= it
++;
639 if (it1
->GetClient() == client
) {
640 RemoveFromWaitingQueue(it1
);
641 Notify_ShowQueueCount(m_waitinglist
.size());
642 // update ranks of remaining queue
643 while (it
!= m_waitinglist
.end()) {
644 it
->GetClient()->SetUploadQueueWaitingPosition(rank
++);
// Removes the waiting-list entry at `pos`.
//
// The client pointer is fetched before the entry is erased, because `pos` is
// no longer usable afterwards. The waiting-client counter in theStats is
// decreased, and the client's upload state (US_NONE), score and queue position
// (0) are reset.
// pos: iterator into m_waitinglist.
// NOTE(review): the body of the IsBanned() branch is missing from this
// listing — confirm against the complete source.
655 void CUploadQueue::RemoveFromWaitingQueue(CClientRefList::iterator pos
)
657 CUpDownClient
* todelete
= pos
->GetClient();
658 m_waitinglist
.erase(pos
);
659 theStats::RemoveWaitingClient();
660 if( todelete
->IsBanned() ) {
663 //Notify_QlistRemoveClient(todelete);
664 todelete
->SetUploadState(US_NONE
);
665 todelete
->ClearScore();
666 todelete
->SetUploadQueueWaitingPosition(0);
669 // File_checked_for_headers