2 // This file is part of the aMule Project.
4 // Copyright (c) 2003-2008 aMule Team ( admin@amule.org / http://www.amule.org )
5 // Copyright (c) 2002-2008 Merkur ( devs@emule-project.net / http://www.emule-project.net )
7 // Any parts of this program derived from the xMule, lMule or eMule project,
8 // or contributed by third-party developers are copyrighted by their
11 // This program is free software; you can redistribute it and/or modify
12 // it under the terms of the GNU General Public License as published by
13 // the Free Software Foundation; either version 2 of the License, or
14 // (at your option) any later version.
16 // This program is distributed in the hope that it will be useful,
17 // but WITHOUT ANY WARRANTY; without even the implied warranty of
18 // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
19 // GNU General Public License for more details.
21 // You should have received a copy of the GNU General Public License
22 // along with this program; if not, write to the Free Software
23 // Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301, USA
26 #include "UploadQueue.h" // Interface declarations
28 #include <protocol/Protocols.h>
29 #include <protocol/ed2k/Client2Client/TCP.h>
30 #include <common/Macros.h>
31 #include <common/Constants.h>
35 #include "Types.h" // Do_not_auto_remove (win32)
38 #include <winsock.h> // Do_not_auto_remove
40 #include <sys/types.h> // Do_not_auto_remove
41 #include <netinet/in.h> // Do_not_auto_remove
42 #include <arpa/inet.h> // Do_not_auto_remove
45 #include "ServerConnect.h" // Needed for CServerConnect
46 #include "KnownFile.h" // Needed for CKnownFile
47 #include "Packet.h" // Needed for CPacket
48 #include "ClientTCPSocket.h" // Needed for CClientTCPSocket
49 #include "SharedFileList.h" // Needed for CSharedFileList
50 #include "updownclient.h" // Needed for CUpDownClient
51 #include "amule.h" // Needed for theApp
52 #include "Preferences.h"
53 #include "ClientList.h"
54 #include "Statistics.h" // Needed for theStats
56 #include <common/Format.h>
57 #include "UploadBandwidthThrottler.h"
58 #include "GuiEvents.h" // Needed for Notify_*
61 //TODO rewrite the whole networkcode, use overlapped sockets
63 CUploadQueue::CUploadQueue()
65 m_nLastStartUpload
= 0;
66 lastupslotHighID
= true;
67 m_allowKicking
= true;
71 void CUploadQueue::AddUpNextClient(CUpDownClient
* directadd
)
73 CClientPtrList::iterator toadd
= m_waitinglist
.end();
74 CClientPtrList::iterator toaddlow
= m_waitinglist
.end();
76 uint32_t bestscore
= 0;
77 uint32_t bestlowscore
= 0;
79 CUpDownClient
* newclient
;
80 // select next client or use given client
82 // Track if we purged any clients from the queue, as to only send one notify in total
85 CClientPtrList::iterator it
= m_waitinglist
.begin();
86 for (; it
!= m_waitinglist
.end(); ) {
87 CClientPtrList::iterator tmp_it
= it
++;
88 CUpDownClient
* cur_client
= *tmp_it
;
91 if ( (::GetTickCount() - cur_client
->GetLastUpRequest() > MAX_PURGEQUEUETIME
) || !theApp
->sharedfiles
->GetFileByID(cur_client
->GetUploadFileID()) ) {
93 cur_client
->ClearWaitStartTime();
94 RemoveFromWaitingQueue(tmp_it
);
95 if (!cur_client
->GetSocket()) {
96 if(cur_client
->Disconnected(wxT("AddUpNextClient - purged"))) {
97 cur_client
->Safe_Delete();
104 suspendlist::iterator it2
= std::find( suspended_uploads_list
.begin(),
105 suspended_uploads_list
.end(),
106 cur_client
->GetUploadFileID() );
107 if (cur_client
->IsBanned() || it2
!= suspended_uploads_list
.end() ) { // Banned client or suspended upload ?
112 uint32_t cur_score
= cur_client
->GetScore(true);
113 if (cur_score
> bestscore
) {
114 bestscore
= cur_score
;
117 cur_score
= cur_client
->GetScore();
118 if ((cur_score
> bestlowscore
) && !cur_client
->m_bAddNextConnect
){
119 bestlowscore
= cur_score
;
125 // Update the count on GUI if any clients were purged
127 Notify_ShowQueueCount(m_waitinglist
.size());
130 if (bestlowscore
> bestscore
){
131 newclient
= *toaddlow
;
132 newclient
->m_bAddNextConnect
= true;
135 if (toadd
== m_waitinglist
.end()) {
140 lastupslotHighID
= true; // VQB LowID alternate
141 RemoveFromWaitingQueue(toadd
);
142 Notify_ShowQueueCount(m_waitinglist
.size());
144 //prevent another potential access of a suspended upload
146 suspendlist::iterator it
= std::find( suspended_uploads_list
.begin(),
147 suspended_uploads_list
.end(),
148 directadd
->GetUploadFileID() );
149 if ( it
!= suspended_uploads_list
.end() ) {
152 newclient
= directadd
;
156 if (IsDownloading(newclient
)) {
159 // tell the client that we are now ready to upload
160 if (!newclient
->IsConnected()) {
161 newclient
->SetUploadState(US_CONNECTING
);
162 if (!newclient
->TryToConnect(true)) {
166 CPacket
* packet
= new CPacket(OP_ACCEPTUPLOADREQ
, 0, OP_EDONKEYPROT
);
167 theStats::AddUpOverheadFileRequest(packet
->GetPacketSize());
168 AddDebugLogLineN( logLocalClient
, wxT("Local Client: OP_ACCEPTUPLOADREQ to ") + newclient
->GetFullIP() );
169 newclient
->SendPacket(packet
,true);
170 newclient
->SetUploadState(US_UPLOADING
);
172 newclient
->SetUpStartTime();
173 newclient
->ResetSessionUp();
175 theApp
->uploadBandwidthThrottler
->AddToStandardList(m_uploadinglist
.size(), newclient
->GetSocket());
176 m_uploadinglist
.push_back(newclient
);
177 theStats::AddUploadingClient();
180 CKnownFile
* reqfile
= (CKnownFile
*) newclient
->GetUploadFile();
182 reqfile
->statistic
.AddAccepted();
186 void CUploadQueue::Process()
188 // Check if someone's waiting, if there is a slot for him,
189 // or if we should try to free a slot for him
190 uint32 tick
= GetTickCount();
191 // Nobody waiting or upload started recently
192 // (Actually instead of "empty" it should check for "no HighID clients queued",
193 // but the cost for that outweights the benefit. As it is, a slot will be freed
194 // even if it can't be taken because all of the queue is LowID. But just one,
195 // and the kicked client will instantly get it back if he has HighID.)
196 if (m_waitinglist
.empty() || tick
- m_nLastStartUpload
< 1000) {
197 m_allowKicking
= false;
198 // Already a slot free, try to fill it
199 } else if (m_uploadinglist
.size() < GetMaxSlots()) {
200 m_allowKicking
= false;
201 m_nLastStartUpload
= tick
;
203 // All slots taken, try to free one
205 m_allowKicking
= true;
208 // The loop that feeds the upload slots with data.
209 CClientPtrList::iterator it
= m_uploadinglist
.begin();
210 while (it
!= m_uploadinglist
.end()) {
211 // Get the client. Note! Also updates pos as a side effect.
212 CUpDownClient
* cur_client
= *it
++;
214 // It seems chatting or friend slots can get stuck at times in upload.. This needs looked into..
215 if (!cur_client
->GetSocket()) {
216 RemoveFromUploadQueue(cur_client
);
217 if(cur_client
->Disconnected(_T("CUploadQueue::Process"))){
218 cur_client
->Safe_Delete();
221 cur_client
->SendBlockData();
225 // Save used bandwidth for speed calculations
226 uint64 sentBytes
= theApp
->uploadBandwidthThrottler
->GetNumberOfSentBytesSinceLastCallAndReset();
227 (void)theApp
->uploadBandwidthThrottler
->GetNumberOfSentBytesOverheadSinceLastCallAndReset();
231 theStats::AddSentBytes(sentBytes
);
236 uint16
CUploadQueue::GetMaxSlots() const
238 uint16 nMaxSlots
= 0;
239 float kBpsUpPerClient
= (float)thePrefs::GetSlotAllocation();
240 if (thePrefs::GetMaxUpload() == UNLIMITED
) {
241 float kBpsUp
= theStats::GetUploadRate() / 1024.0f
;
242 nMaxSlots
= (uint16
)(kBpsUp
/ kBpsUpPerClient
) + 2;
244 if (thePrefs::GetMaxUpload() >= 10) {
245 nMaxSlots
= (uint16
)floor((float)thePrefs::GetMaxUpload() / kBpsUpPerClient
+ 0.5);
246 // floor(x + 0.5) is a way of doing round(x) that works with gcc < 3 ...
247 if (nMaxSlots
< MIN_UP_CLIENTS_ALLOWED
) {
248 nMaxSlots
=MIN_UP_CLIENTS_ALLOWED
;
251 nMaxSlots
= MIN_UP_CLIENTS_ALLOWED
;
254 if (nMaxSlots
> MAX_UP_CLIENTS_ALLOWED
) {
255 nMaxSlots
= MAX_UP_CLIENTS_ALLOWED
;
261 CUploadQueue::~CUploadQueue()
263 wxASSERT(m_waitinglist
.empty());
264 wxASSERT(m_uploadinglist
.empty());
268 bool CUploadQueue::IsOnUploadQueue(const CUpDownClient
* client
) const
270 return std::find(m_waitinglist
.begin(), m_waitinglist
.end(), client
)
271 != m_waitinglist
.end();
275 bool CUploadQueue::IsDownloading(CUpDownClient
* client
) const
277 return std::find(m_uploadinglist
.begin(), m_uploadinglist
.end(), client
)
278 != m_uploadinglist
.end();
282 CUpDownClient
* CUploadQueue::GetWaitingClientByIP_UDP(uint32 dwIP
, uint16 nUDPPort
, bool bIgnorePortOnUniqueIP
, bool* pbMultipleIPs
)
284 CUpDownClient
* pMatchingIPClient
= NULL
;
288 CClientPtrList::iterator it
= m_waitinglist
.begin();
289 for (; it
!= m_waitinglist
.end(); ++it
) {
290 CUpDownClient
* cur_client
= *it
;
292 if ((dwIP
== cur_client
->GetIP()) && (nUDPPort
== cur_client
->GetUDPPort())) {
294 } else if ((dwIP
== cur_client
->GetIP()) && bIgnorePortOnUniqueIP
) {
295 pMatchingIPClient
= cur_client
;
301 *pbMultipleIPs
= cMatches
> 1;
304 if (pMatchingIPClient
&& cMatches
== 1) {
305 return pMatchingIPClient
;
312 void CUploadQueue::AddClientToQueue(CUpDownClient
* client
)
314 if (theApp
->serverconnect
->IsConnected() && theApp
->serverconnect
->IsLowID() && !theApp
->serverconnect
->IsLocalServer(client
->GetServerIP(),client
->GetServerPort()) && client
->GetDownloadState() == DS_NONE
&& !client
->IsFriend() && theStats::GetWaitingUserCount() > 50) {
315 // Well, all that issues finish in the same: don't allow to add to the queue
319 if ( client
->IsBanned() ) {
323 client
->AddAskedCount();
324 client
->SetLastUpRequest();
326 // Find all clients with the same user-hash
327 CClientList::SourceList found
= theApp
->clientlist
->GetClientsByHash( client
->GetUserHash() );
329 CClientList::SourceList::iterator it
= found
.begin();
330 while (it
!= found
.end()) {
331 CUpDownClient
* cur_client
= *it
++;
333 if ( IsOnUploadQueue( cur_client
) ) {
334 if ( cur_client
== client
) {
335 // This is where LowID clients get their upload slot assigned.
336 // They can't be contacted if they reach top of the queue, so they are just marked for uploading.
337 // When they reconnect next time AddClientToQueue() is called, and they get their slot
338 // through the connection they initiated.
339 // Since at that time no slot is free they get assigned an extra slot,
340 // so then the number of slots exceeds the configured number by one.
341 // To prevent a further increase no more LowID clients get a slot, until
342 // - a HighID client has got one (which happens only after two clients
343 // have been kicked so a slot is free again)
344 // - or there is a free slot, which means there is no HighID client on queue
345 if (client
->m_bAddNextConnect
) {
346 uint16 maxSlots
= GetMaxSlots();
347 if (lastupslotHighID
) {
350 if (m_uploadinglist
.size() < maxSlots
) {
351 client
->m_bAddNextConnect
= false;
352 RemoveFromWaitingQueue(client
);
353 AddUpNextClient(client
);
354 lastupslotHighID
= false; // LowID alternate
359 client
->SendRankingInfo();
360 Notify_SharedCtrlRefreshClient(client
, AVAILABLE_SOURCE
);
363 // Hash-clash, remove unidentified clients (possibly both)
365 if ( !cur_client
->IsIdentified() ) {
366 // Cur_client isn't identifed, remove it
367 theApp
->clientlist
->AddTrackClient( cur_client
);
369 RemoveFromWaitingQueue( cur_client
);
370 if ( !cur_client
->GetSocket() ) {
371 if (cur_client
->Disconnected( wxT("AddClientToQueue - same userhash") ) ) {
372 cur_client
->Safe_Delete();
377 if ( !client
->IsIdentified() ) {
378 // New client isn't identified, remove it
379 theApp
->clientlist
->AddTrackClient( client
);
381 if ( !client
->GetSocket() ) {
382 if ( client
->Disconnected( wxT("AddClientToQueue - same userhash") ) ) {
383 client
->Safe_Delete();
393 // Count the number of clients with the same IP-address
394 found
= theApp
->clientlist
->GetClientsByIP( client
->GetIP() );
397 for ( it
= found
.begin(); it
!= found
.end(); it
++ ) {
398 if ( ( *it
== client
) || IsOnUploadQueue( *it
) ) {
403 // We do not accept more than 3 clients from the same IP
406 } else if ( theApp
->clientlist
->GetClientsFromIP(client
->GetIP()) >= 3 ) {
411 CKnownFile
* reqfile
= (CKnownFile
*) client
->GetUploadFile();
413 reqfile
->statistic
.AddRequest();
416 if (client
->IsDownloading()) {
417 // he's already downloading and wants probably only another file
418 CPacket
* packet
= new CPacket(OP_ACCEPTUPLOADREQ
, 0, OP_EDONKEYPROT
);
419 theStats::AddUpOverheadFileRequest(packet
->GetPacketSize());
420 AddDebugLogLineN( logLocalClient
, wxT("Local Client: OP_ACCEPTUPLOADREQ to ") + client
->GetFullIP() );
421 client
->SendPacket(packet
,true);
425 // TODO find better ways to cap the list
426 if (m_waitinglist
.size() >= (thePrefs::GetQueueSize())) {
430 uint32 tick
= GetTickCount();
431 client
->ClearWaitStartTime();
432 // if possible start upload right away
433 if (m_waitinglist
.empty() && tick
- m_nLastStartUpload
>= 1000 && m_uploadinglist
.size() < GetMaxSlots()) {
434 AddUpNextClient(client
);
435 m_nLastStartUpload
= tick
;
437 m_waitinglist
.push_back(client
);
438 theStats::AddWaitingClient();
439 client
->ClearAskedCount();
440 client
->SetUploadState(US_ONUPLOADQUEUE
);
441 client
->SendRankingInfo();
442 //Notify_QlistAddClient(client);
443 Notify_ShowQueueCount(m_waitinglist
.size());
448 bool CUploadQueue::RemoveFromUploadQueue(CUpDownClient
* client
)
450 // Keep track of this client
451 theApp
->clientlist
->AddTrackClient(client
);
453 CClientPtrList::iterator it
= std::find(m_uploadinglist
.begin(),
454 m_uploadinglist
.end(), client
);
456 if (it
!= m_uploadinglist
.end()) {
457 m_uploadinglist
.erase(it
);
458 theStats::RemoveUploadingClient();
459 if( client
->GetTransferredUp() ) {
460 theStats::AddSuccessfulUpload();
461 theStats::AddUploadTime(client
->GetUpStartTimeDelay() / 1000);
463 theStats::AddFailedUpload();
465 client
->SetUploadState(US_NONE
);
466 client
->ClearUploadBlockRequests();
474 bool CUploadQueue::CheckForTimeOver(CUpDownClient
* client
)
476 // Don't kick anybody if there's no need to
477 if (!m_allowKicking
) {
480 // First, check if it is a VIP slot (friend or Release-Prio).
481 if (client
->GetFriendSlot()) {
482 return false; // never drop the friend
484 // Release-Prio and nobody on queue for it?
485 if (client
->GetUploadFile()->GetUpPriority() == PR_POWERSHARE
) {
486 // Keep it unless half of the UL slots are occupied with friends or Release uploads.
488 for (CClientPtrList::iterator it
= m_uploadinglist
.begin(); it
!= m_uploadinglist
.end(); ++it
) {
489 CUpDownClient
* cur_client
= *it
;
490 if (cur_client
->GetFriendSlot() || cur_client
->GetUploadFile()->GetUpPriority() == PR_POWERSHARE
) {
494 // allow if VIP uploads occupy at most half of the possible upload slots
495 if (vips
<= GetMaxSlots() / 2) {
498 // Otherwise normal rules apply.
502 // "Transfer full chunks": drop client after 10 MB upload, or after an hour.
503 // (so average UL speed should at least be 2.84 kB/s)
504 // We don't track what he is downloading, but if it's all from one chunk he gets it.
505 if (client
->GetUpStartTimeDelay() > 3600000 // time: 1h
506 || client
->GetSessionUp() > 10485760) { // data: 10MB
507 m_allowKicking
= false; // kick max one client per cycle
515 uint16
CUploadQueue::GetWaitingPosition(const CUpDownClient
*client
) const
517 if ( !IsOnUploadQueue(client
) ) {
522 const uint32 myscore
= client
->GetScore();
523 CClientPtrList::const_iterator it
= m_waitinglist
.begin();
524 for (; it
!= m_waitinglist
.end(); ++it
) {
525 if ((*it
)->GetScore() > myscore
) {
535 * This function removes a file indicated by filehash from suspended_uploads_list.
537 void CUploadQueue::ResumeUpload( const CMD4Hash
& filehash
)
539 //Find the position of the filehash in the list and remove it.
540 suspendlist::iterator it
= std::find( suspended_uploads_list
.begin(),
541 suspended_uploads_list
.end(),
543 if ( it
!= suspended_uploads_list
.end() )
544 suspended_uploads_list
.erase( it
);
546 AddLogLineN(CFormat( _("Resuming uploads of file: %s" ) )
547 % filehash
.Encode() );
551 * This function adds a file indicated by filehash to suspended_uploads_list
553 uint16
CUploadQueue::SuspendUpload( const CMD4Hash
& filehash
)
555 AddLogLineN(CFormat( _("Suspending upload of file: %s" ) )
556 % filehash
.Encode() );
559 //Append the filehash to the list.
560 suspended_uploads_list
.push_back(filehash
);
561 wxString base16hash
= filehash
.Encode();
563 CClientPtrList::iterator it
= m_uploadinglist
.begin();
564 while (it
!= m_uploadinglist
.end()) {
565 CUpDownClient
*potential
= *it
++;
566 //check if the client is uploading the file we need to suspend
567 if(potential
->GetUploadFileID() == filehash
) {
568 //remove the unlucky client from the upload queue and add to the waiting queue
569 RemoveFromUploadQueue(potential
);
571 m_waitinglist
.push_back(potential
);
572 theStats::AddWaitingClient();
573 potential
->SetUploadState(US_ONUPLOADQUEUE
);
574 potential
->SendRankingInfo();
575 Notify_SharedCtrlRefreshClient(potential
, AVAILABLE_SOURCE
);
576 Notify_ShowQueueCount(m_waitinglist
.size());
583 bool CUploadQueue::RemoveFromWaitingQueue(CUpDownClient
* client
)
585 CClientPtrList::iterator it
= std::find(m_waitinglist
.begin(),
586 m_waitinglist
.end(), client
);
588 if (it
!= m_waitinglist
.end()) {
589 RemoveFromWaitingQueue(it
);
590 Notify_ShowQueueCount(m_waitinglist
.size());
598 void CUploadQueue::RemoveFromWaitingQueue(CClientPtrList::iterator pos
)
600 CUpDownClient
* todelete
= *pos
;
601 m_waitinglist
.erase(pos
);
602 theStats::RemoveWaitingClient();
603 if( todelete
->IsBanned() ) {
606 //Notify_QlistRemoveClient(todelete);
607 todelete
->SetUploadState(US_NONE
);
610 // File_checked_for_headers