Upstream tarball 9613
[amule.git] / src / EMSocket.cpp
blobcd19e8767b697d44b0b1f7ae802a2a01ce2e702e
1 //
2 // This file is part of the aMule Project.
3 //
4 // Copyright (c) 2003-2008 aMule Team ( admin@amule.org / http://www.amule.org )
5 // Copyright (c) 2002-2008 Merkur ( devs@emule-project.net / http://www.emule-project.net )
6 //
7 // Any parts of this program derived from the xMule, lMule or eMule project,
8 // or contributed by third-party developers are copyrighted by their
9 // respective authors.
11 // This program is free software; you can redistribute it and/or modify
12 // it under the terms of the GNU General Public License as published by
13 // the Free Software Foundation; either version 2 of the License, or
14 // (at your option) any later version.
16 // This program is distributed in the hope that it will be useful,
17 // but WITHOUT ANY WARRANTY; without even the implied warranty of
18 // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
19 // GNU General Public License for more details.
20 //
21 // You should have received a copy of the GNU General Public License
22 // along with this program; if not, write to the Free Software
23 // Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301, USA
27 #include "EMSocket.h" // Interface declarations.
29 #include <protocol/Protocols.h>
30 #include <protocol/ed2k/Constants.h>
32 #include "Packet.h" // Needed for CPacket
33 #include "amule.h"
34 #include "GetTickCount.h"
35 #include "UploadBandwidthThrottler.h"
36 #include "Logger.h"
37 #include "Preferences.h"
// Size in bytes of the static read buffer declared in CEMSocket::OnReceive().
// OnReceive() also rejects (ERR_TOOBIG) any packet whose declared size is
// larger than that buffer.
40 const uint32 MAX_SIZE = 2000000;
42 CEMSocket::CEMSocket(const CProxyData *ProxyData)
43 : CEncryptedStreamSocket(wxSOCKET_NOWAIT, ProxyData)
45 // If an interface has been specified,
46 // then we need to bind to it.
47 if (!thePrefs::GetAddress().IsEmpty()) {
48 amuleIPV4Address host;
50 // No need to warn here, in case of failure to
51 // assign the hostname. That is already done
52 // in amule.cpp when starting ...
53 if (host.Hostname(thePrefs::GetAddress())) {
54 SetLocal(host);
58 byConnected = ES_NOTCONNECTED;
59 m_uTimeOut = CONNECTION_TIMEOUT; // default timeout for ed2k sockets
61 // Download (pseudo) rate control
62 downloadLimit = 0;
63 downloadLimitEnable = false;
64 pendingOnReceive = false;
66 // Download partial header
67 pendingHeaderSize = 0;
69 // Download partial packet
70 pendingPacket = NULL;
71 pendingPacketSize = 0;
73 // Upload control
74 sendbuffer = NULL;
75 sendblen = 0;
76 sent = 0;
78 m_currentPacket_is_controlpacket = false;
79 m_currentPackageIsFromPartFile = false;
81 m_numberOfSentBytesCompleteFile = 0;
82 m_numberOfSentBytesPartFile = 0;
83 m_numberOfSentBytesControlPacket = 0;
85 lastCalledSend = ::GetTickCount();
86 lastSent = ::GetTickCount()-1000;
88 m_bAccelerateUpload = false;
90 m_actualPayloadSize = 0;
91 m_actualPayloadSizeSent = 0;
93 m_bBusy = false;
94 m_hasSent = false;
96 lastFinishedStandard = 0;
98 DoingDestroy = false;
102 CEMSocket::~CEMSocket()
104 // need to be locked here to know that the other methods
105 // won't be in the middle of things
107 wxMutexLocker lock(m_sendLocker);
108 byConnected = ES_DISCONNECTED;
111 // now that we know no other method will keep adding to the queue
112 // we can remove ourself from the queue
113 if (theApp->uploadBandwidthThrottler) {
114 theApp->uploadBandwidthThrottler->RemoveFromAllQueues(this);
117 ClearQueues();
119 SetNotify(0);
120 Notify(FALSE);
123 void CEMSocket::Destroy() {
124 if (!DoingDestroy) {
125 DoingDestroy = true;
126 wxSocketClient::Destroy();
131 void CEMSocket::ClearQueues()
133 wxMutexLocker lock(m_sendLocker);
136 CPacketQueue::iterator it = m_control_queue.begin();
137 for (; it != m_control_queue.end(); ++it) {
138 delete *it;
140 m_control_queue.clear();
144 CStdPacketQueue::iterator it = m_standard_queue.begin();
145 for (; it != m_standard_queue.end(); ++it) {
146 delete it->packet;
148 m_standard_queue.clear();
151 // Download (pseudo) rate control
152 downloadLimit = 0;
153 downloadLimitEnable = false;
154 pendingOnReceive = false;
156 // Download partial header
157 pendingHeaderSize = 0;
159 // Download partial packet
160 delete pendingPacket;
161 pendingPacket = NULL;
162 pendingPacketSize = 0;
164 // Upload control
165 delete[] sendbuffer;
166 sendbuffer = NULL;
167 sendblen = 0;
168 sent = 0;
172 void CEMSocket::OnClose(int WXUNUSED(nErrorCode))
174 // need to be locked here to know that the other methods
175 // won't be in the middle of things
177 wxMutexLocker lock(m_sendLocker);
178 byConnected = ES_DISCONNECTED;
181 // now that we know no other method will keep adding to the queue
182 // we can remove ourself from the queue
183 theApp->uploadBandwidthThrottler->RemoveFromAllQueues(this);
185 ClearQueues();
189 void CEMSocket::OnReceive(int nErrorCode)
191 // the 2 meg size was taken from another place
192 static byte GlobalReadBuffer[MAX_SIZE];
194 if(nErrorCode) {
195 uint32 error = LastError();
196 if (error != wxSOCKET_WOULDBLOCK) {
197 OnError(nErrorCode);
198 return;
202 // Check current connection state
203 if (byConnected == ES_DISCONNECTED) {
204 return;
205 } else {
206 byConnected = ES_CONNECTED; // ES_DISCONNECTED, ES_NOTCONNECTED, ES_CONNECTED
209 // CPU load improvement
210 if(downloadLimitEnable == true && downloadLimit == 0){
211 pendingOnReceive = true;
212 return;
215 // Remark: an overflow can not occur here
216 uint32 readMax = sizeof(GlobalReadBuffer) - pendingHeaderSize;
217 if((downloadLimitEnable == true) && (readMax > downloadLimit)) {
218 readMax = downloadLimit;
222 // We attempt to read up to 2 megs at a time (minus whatever is in our internal read buffer)
223 uint32 ret;
226 wxMutexLocker lock(m_sendLocker);
227 ret = Read(GlobalReadBuffer + pendingHeaderSize, readMax);
228 if (Error() || (ret == 0)) {
229 if (LastError() == wxSOCKET_WOULDBLOCK) {
230 pendingOnReceive = true;
232 return;
237 // Bandwidth control
238 if(downloadLimitEnable == true){
239 // Update limit
240 downloadLimit -= GetRealReceivedBytes();
243 // CPU load improvement
244 // Detect if the socket's buffer is empty (or the size did match...)
245 pendingOnReceive = (ret == readMax);
247 // Copy back the partial header into the global read buffer for processing
248 if(pendingHeaderSize > 0) {
249 memcpy(GlobalReadBuffer, pendingHeader, pendingHeaderSize);
250 ret += pendingHeaderSize;
251 pendingHeaderSize = 0;
254 byte* rptr = GlobalReadBuffer; // floating index initialized with begin of buffer
255 const byte* rend = GlobalReadBuffer + ret; // end of buffer
257 // Loop, processing packets until we run out of them
258 while((rend - rptr >= PACKET_HEADER_SIZE) ||
259 ((pendingPacket != NULL) && (rend - rptr > 0 ))){
261 // Two possibilities here:
263 // 1. There is no pending incoming packet
264 // 2. There is already a partial pending incoming packet
266 // It's important to remember that emule exchange two kinds of packet
267 // - The control packet
268 // - The data packet for the transport of the block
270 // The biggest part of the traffic is done with the data packets.
271 // The default size of one block is 10240 bytes (or less if compressed), but the
272 // maximal size for one packet on the network is 1300 bytes. It's the reason
273 // why most of the Blocks are splitted before to be sent.
275 // Conclusion: When the download limit is disabled, this method can be at least
276 // called 8 times (10240/1300) by the lower layer before a splitted packet is
277 // rebuild and transferred to the above layer for processing.
279 // The purpose of this algorithm is to limit the amount of data exchanged between buffers
281 if(pendingPacket == NULL){
282 pendingPacket = new CPacket(rptr); // Create new packet container.
283 rptr += 6; // Only the header is initialized so far
285 // Bugfix We still need to check for a valid protocol
286 // Remark: the default eMule v0.26b had removed this test......
287 switch (pendingPacket->GetProtocol()){
288 case OP_EDONKEYPROT:
289 case OP_PACKEDPROT:
290 case OP_EMULEPROT:
291 case OP_ED2KV2HEADER:
292 case OP_ED2KV2PACKEDPROT:
293 break;
294 default:
295 delete pendingPacket;
296 pendingPacket = NULL;
297 OnError(ERR_WRONGHEADER);
298 return;
301 // Security: Check for buffer overflow (2MB)
302 if(pendingPacket->GetPacketSize() > sizeof(GlobalReadBuffer)) {
303 delete pendingPacket;
304 pendingPacket = NULL;
305 OnError(ERR_TOOBIG);
306 return;
309 // Init data buffer
310 pendingPacket->AllocDataBuffer();
311 pendingPacketSize = 0;
314 // Bytes ready to be copied into packet's internal buffer
315 wxASSERT(rptr <= rend);
316 uint32 toCopy = ((pendingPacket->GetPacketSize() - pendingPacketSize) < (uint32)(rend - rptr)) ?
317 (pendingPacket->GetPacketSize() - pendingPacketSize) : (uint32)(rend - rptr);
319 // Copy Bytes from Global buffer to packet's internal buffer
320 pendingPacket->CopyToDataBuffer(pendingPacketSize, rptr, toCopy);
321 pendingPacketSize += toCopy;
322 rptr += toCopy;
324 // Check if packet is complet
325 wxASSERT(pendingPacket->GetPacketSize() >= pendingPacketSize);
326 if(pendingPacket->GetPacketSize() == pendingPacketSize) {
327 // Process packet
328 bool bPacketResult = PacketReceived(pendingPacket);
329 delete pendingPacket;
330 pendingPacket = NULL;
331 pendingPacketSize = 0;
333 if (!bPacketResult) {
334 return;
339 // Finally, if there is any data left over, save it for next time
340 wxASSERT(rptr <= rend);
341 wxASSERT(rend - rptr < PACKET_HEADER_SIZE);
342 if(rptr != rend) {
343 // Keep the partial head
344 pendingHeaderSize = rend - rptr;
345 memcpy(pendingHeader, rptr, pendingHeaderSize);
350 void CEMSocket::SetDownloadLimit(uint32 limit)
352 downloadLimit = limit;
353 downloadLimitEnable = true;
355 // CPU load improvement
356 if(limit > 0 && pendingOnReceive == true){
357 OnReceive(0);
362 void CEMSocket::DisableDownloadLimit()
364 downloadLimitEnable = false;
366 // CPU load improvement
367 if (pendingOnReceive == true){
368 OnReceive(0);
374 * Queues up the packet to be sent. Another thread will actually send the packet.
376 * If the packet is not a control packet, and if the socket decides that its queue is
377 * full and forceAdd is false, then the socket is allowed to refuse to add the packet
378 * to its queue. It will then return false and it is up to the calling thread to try
379 * to call SendPacket for that packet again at a later time.
381 * @param packet address to the packet that should be added to the queue
383 * @param delpacket if true, the responsibility for deleting the packet after it has been sent
384 * has been transferred to this object. If false, don't delete the packet after it
385 * has been sent.
387 * @param controlpacket the packet is a controlpacket
389 * @param forceAdd this packet must be added to the queue, even if it is full. If this flag is true
390 * then the method can not refuse to add the packet, and therefore not return false.
392 * @return true if the packet was added to the queue, false otherwise
394 void CEMSocket::SendPacket(CPacket* packet, bool delpacket, bool controlpacket, uint32 actualPayloadSize)
396 //printf("* SendPacket called on socket %p\n", this);
397 wxMutexLocker lock(m_sendLocker);
399 if (byConnected == ES_DISCONNECTED) {
400 //printf("* Disconnected, drop packet\n");
401 if(delpacket) {
402 delete packet;
404 } else {
405 if (!delpacket){
406 packet = new CPacket(*packet);
409 if (controlpacket) {
410 //printf("* Adding a control packet\n");
411 m_control_queue.push_back(packet);
413 // queue up for controlpacket
414 theApp->uploadBandwidthThrottler->QueueForSendingControlPacket(this, HasSent());
415 } else {
416 //printf("* Adding a normal packet to the queue\n");
417 bool first = !((sendbuffer && !m_currentPacket_is_controlpacket) || !m_standard_queue.empty());
418 StandardPacketQueueEntry queueEntry = { actualPayloadSize, packet };
419 m_standard_queue.push_back(queueEntry);
421 // reset timeout for the first time
422 if (first) {
423 lastFinishedStandard = ::GetTickCount();
424 m_bAccelerateUpload = true; // Always accelerate first packet in a block
431 uint64 CEMSocket::GetSentBytesCompleteFileSinceLastCallAndReset()
433 wxMutexLocker lock( m_sendLocker );
435 uint64 sentBytes = m_numberOfSentBytesCompleteFile;
436 m_numberOfSentBytesCompleteFile = 0;
438 return sentBytes;
442 uint64 CEMSocket::GetSentBytesPartFileSinceLastCallAndReset()
444 wxMutexLocker lock( m_sendLocker );
446 uint64 sentBytes = m_numberOfSentBytesPartFile;
447 m_numberOfSentBytesPartFile = 0;
449 return sentBytes;
452 uint64 CEMSocket::GetSentBytesControlPacketSinceLastCallAndReset()
454 wxMutexLocker lock( m_sendLocker );
456 uint64 sentBytes = m_numberOfSentBytesControlPacket;
457 m_numberOfSentBytesControlPacket = 0;
459 return sentBytes;
462 uint64 CEMSocket::GetSentPayloadSinceLastCallAndReset()
464 wxMutexLocker lock( m_sendLocker );
466 uint64 sentBytes = m_actualPayloadSizeSent;
467 m_actualPayloadSizeSent = 0;
469 return sentBytes;
473 void CEMSocket::OnSend(int nErrorCode)
475 if (nErrorCode){
476 OnError(nErrorCode);
477 return;
480 CEncryptedStreamSocket::OnSend(0);
482 wxMutexLocker lock( m_sendLocker );
483 m_bBusy = false;
485 if (byConnected != ES_DISCONNECTED) {
486 byConnected = ES_CONNECTED;
488 if (m_currentPacket_is_controlpacket) {
489 // queue up for control packet
490 theApp->uploadBandwidthThrottler->QueueForSendingControlPacket(this, HasSent());
/**
 * Try to put queued up data on the socket.
 *
 * Control packets have higher priority, and will be sent first, if possible.
 * Standard packets can be split up in several package containers. In that case
 * all the parts of a split package must be sent in a row, without any control packet
 * in between.
 *
 * @param maxNumberOfBytesToSend This is the maximum number of bytes that is allowed to be put on the socket
 *                               during this call. The value is rounded up to a multiple of minFragSize
 *                               before it is used.
 *
 * @param minFragSize Fragment size in bytes; values below 1 are treated as 1. When only control packets
 *                    are allowed and a standard packet is "in the way", that packet is sent in pieces
 *                    that end on a multiple of this size.
 *
 * @param onlyAllowedToSendControlPacket During this call we only try to put control packets on the socket.
 *                                       If there's a standard packet "in the way", and we think that this socket
 *                                       is no longer an upload slot, then it is ok to send the standard packet to
 *                                       get it out of the way. But it is not allowed to pick a new standard packet
 *                                       from the queue during this call. Several split packets are counted as one
 *                                       standard packet though, so it is ok to finish them all off if necessary.
 *
 * @return a SocketSentBytes holding a success flag (false if the socket is disconnected or a send error
 *         occurred), the number of standard packet bytes and the number of control packet bytes that were
 *         put on the socket during this call.
 */
// Writes queued data to the socket, control packets first, and reports how
// many standard and control packet bytes were written during this call.
516 SocketSentBytes CEMSocket::Send(uint32 maxNumberOfBytesToSend, uint32 minFragSize, bool onlyAllowedToSendControlPacket)
// m_sendLocker is held for the whole call.
518 wxMutexLocker lock(m_sendLocker);
520 //printf("* Attempt to send a packet on socket %p\n", this);
// A disconnected socket reports failure (first field false) and zero bytes.
522 if (byConnected == ES_DISCONNECTED) {
523 //printf("* Disconnected socket %p\n", this);
524 SocketSentBytes returnVal = { false, 0, 0 };
525 return returnVal;
// A socket whose last write would have blocked (m_bBusy) is not tried again
// on a control-packet-only call; this is reported as success with zero bytes.
526 } else if (m_bBusy && onlyAllowedToSendControlPacket) {
527 //printf("* Busy socket %p\n", this);
528 SocketSentBytes returnVal = { true, 0, 0 };
529 return returnVal;
// Per-call bookkeeping; the two byte counters end up in the return value.
532 bool anErrorHasOccured = false;
533 uint32 sentStandardPacketBytesThisCall = 0;
534 uint32 sentControlPacketBytesThisCall = 0;
// Data is only written once the socket is connected and the encryption layer
// reports ready. The busy test repeats the early return above.
536 if(byConnected == ES_CONNECTED && IsEncryptionLayerReady() && !(m_bBusy && onlyAllowedToSendControlPacket)) {
538 //printf("* Internal attemptto send on %p\n", this);
// minFragSize is used as a divisor below, so it must be at least 1.
540 if(minFragSize < 1) {
541 minFragSize = 1;
// Round the byte budget up to a multiple of minFragSize
// (GetNextFragSize leaves exact multiples unchanged).
544 maxNumberOfBytesToSend = GetNextFragSize(maxNumberOfBytesToSend, minFragSize);
// lastSent is refreshed just before every write attempt further down, so this
// is true when the previous write attempt is more than 1000 ticks old.
546 bool bWasLongTimeSinceSend = (::GetTickCount() - lastSent) > 1000;
548 lastCalledSend = ::GetTickCount();
// Outer loop: one iteration per packet (or per remaining part of the packet in sendbuffer).
551 while(sentStandardPacketBytesThisCall + sentControlPacketBytesThisCall < maxNumberOfBytesToSend && anErrorHasOccured == false && // don't send more than allowed. Also, there should have been no error in earlier loop
552 (!m_control_queue.empty() || !m_standard_queue.empty() || sendbuffer != NULL) && // there must exist something to send
553 (onlyAllowedToSendControlPacket == false || // this means we are allowed to send both types of packets, so proceed
554 (sentStandardPacketBytesThisCall + sentControlPacketBytesThisCall > 0 && (sentStandardPacketBytesThisCall + sentControlPacketBytesThisCall) % minFragSize != 0) ||
555 (sendbuffer == NULL && !m_control_queue.empty()) || // There's a control packet in queue, and we are not currently sending anything, so we will handle the control packet next
556 (sendbuffer != NULL && m_currentPacket_is_controlpacket == true) || // We are in the progress of sending a control packet. We are always allowed to send those
557 (sendbuffer != NULL && m_currentPacket_is_controlpacket == false && bWasLongTimeSinceSend && !m_control_queue.empty() && m_standard_queue.empty() && (sentStandardPacketBytesThisCall + sentControlPacketBytesThisCall) < minFragSize) // We have waited to long to clean the current packet (which may be a standard packet that is in the way). Proceed no matter what the value of onlyAllowedToSendControlPacket.
561 // If we are currently not in the progress of sending a packet, we will need to find the next one to send
562 if(sendbuffer == NULL) {
563 CPacket* curPacket = NULL;
// The control queue is always served before the standard queue.
564 if(!m_control_queue.empty()) {
565 // There's a control packet to send
566 m_currentPacket_is_controlpacket = true;
567 curPacket = m_control_queue.front();
568 m_control_queue.pop_front();
569 } else if(!m_standard_queue.empty() /*&& onlyAllowedToSendControlPacket == false*/) {
570 // There's a standard packet to send
571 m_currentPacket_is_controlpacket = false;
572 StandardPacketQueueEntry queueEntry = m_standard_queue.front();
573 m_standard_queue.pop_front();
574 curPacket = queueEntry.packet;
// The payload size is added to m_actualPayloadSizeSent once the whole packet is out.
575 m_actualPayloadSize = queueEntry.actualPayloadSize;
577 // remember this for statistics purposes.
578 m_currentPackageIsFromPartFile = curPacket->IsFromPF();
579 } else {
580 // Just to be safe. Shouldn't happen?
581 // if we reach this point, then there's something wrong with the while condition above!
582 wxFAIL;
583 AddDebugLogLineM(true, logGeneral, wxT("EMSocket: Couldn't get a new packet! There's an error in the first while condition in EMSocket::Send()"));
585 SocketSentBytes returnVal = { true, sentStandardPacketBytesThisCall, sentControlPacketBytesThisCall };
586 return returnVal;
589 // We found a packet to send. Get the data to send from the
590 // package container and dispose of the container.
// sendbuffer holds the detached data from here on and is released with
// delete[] once it has been sent completely (or in ClearQueues()).
591 sendblen = curPacket->GetRealPacketSize();
592 sendbuffer = curPacket->DetachPacket();
593 sent = 0;
594 delete curPacket;
// The complete buffer is passed to CryptPrepareSendData() before any of it is written.
596 CryptPrepareSendData((byte*)sendbuffer, sendblen);
599 // At this point we've got a packet to send in sendbuffer. Try to send it. Loop until entire packet
600 // is sent, or until we reach maximum bytes to send for this call, or until we get an error.
601 // NOTE! If send would block (returns WOULDBLOCK), we will return from this method INSIDE this loop.
602 while (sent < sendblen &&
603 sentStandardPacketBytesThisCall + sentControlPacketBytesThisCall < maxNumberOfBytesToSend &&
605 onlyAllowedToSendControlPacket == false || // this means we are allowed to send both types of packets, so proceed
606 m_currentPacket_is_controlpacket ||
607 (bWasLongTimeSinceSend && (sentStandardPacketBytesThisCall + sentControlPacketBytesThisCall) < minFragSize) ||
608 (sentStandardPacketBytesThisCall + sentControlPacketBytesThisCall) % minFragSize != 0
609 ) &&
610 anErrorHasOccured == false) {
// Start with everything that is left of the packet, then cap it:
//  - by the remaining call budget, when any packet type may be sent or the
//    current packet is a control packet;
//  - by what is left of the first minFragSize bytes, when a standard packet is
//    pushed out after a long pause;
//  - otherwise by the distance to the next multiple of minFragSize.
611 uint32 tosend = sendblen-sent;
612 if(!onlyAllowedToSendControlPacket || m_currentPacket_is_controlpacket) {
613 if (maxNumberOfBytesToSend >= sentStandardPacketBytesThisCall + sentControlPacketBytesThisCall && tosend > maxNumberOfBytesToSend-(sentStandardPacketBytesThisCall + sentControlPacketBytesThisCall))
614 tosend = maxNumberOfBytesToSend-(sentStandardPacketBytesThisCall + sentControlPacketBytesThisCall);
615 } else if(bWasLongTimeSinceSend && (sentStandardPacketBytesThisCall + sentControlPacketBytesThisCall) < minFragSize) {
616 if (minFragSize >= sentStandardPacketBytesThisCall + sentControlPacketBytesThisCall && tosend > minFragSize-(sentStandardPacketBytesThisCall + sentControlPacketBytesThisCall))
617 tosend = minFragSize-(sentStandardPacketBytesThisCall + sentControlPacketBytesThisCall);
618 } else {
619 uint32 nextFragMaxBytesToSent = GetNextFragSize(sentStandardPacketBytesThisCall + sentControlPacketBytesThisCall, minFragSize);
620 if (nextFragMaxBytesToSent >= sentStandardPacketBytesThisCall + sentControlPacketBytesThisCall && tosend > nextFragMaxBytesToSent-(sentStandardPacketBytesThisCall + sentControlPacketBytesThisCall))
621 tosend = nextFragMaxBytesToSent-(sentStandardPacketBytesThisCall + sentControlPacketBytesThisCall);
623 wxASSERT(tosend != 0 && tosend <= sendblen-sent);
625 //DWORD tempStartSendTick = ::GetTickCount();
// Stamp the attempt itself; bWasLongTimeSinceSend of the next call is based on this.
627 lastSent = ::GetTickCount();
629 uint32 result = CEncryptedStreamSocket::Write(sendbuffer+sent,tosend);
// On any error the value in 'result' is not added to the counters.
631 if (Error()){
633 uint32 error = LastError();
634 if (error == wxSOCKET_WOULDBLOCK){
// m_bBusy stays set until OnSend() or a later successful write clears it.
635 m_bBusy = true;
637 SocketSentBytes returnVal = { true, sentStandardPacketBytesThisCall, sentControlPacketBytesThisCall };
639 return returnVal; // Send() blocked, onsend will be called when ready to send again
640 } else{
641 // Send() gave an error
// Both loops test this flag, so they end and the call reports failure.
642 anErrorHasOccured = true;
644 } else {
645 // we managed to send some bytes. Perform bookkeeping.
646 m_bBusy = false;
647 m_hasSent = true;
649 sent += result;
651 // Log send bytes in correct class
652 if(m_currentPacket_is_controlpacket == false) {
653 sentStandardPacketBytesThisCall += result;
655 if(m_currentPackageIsFromPartFile == true) {
656 m_numberOfSentBytesPartFile += result;
657 } else {
658 m_numberOfSentBytesCompleteFile += result;
660 } else {
661 sentControlPacketBytesThisCall += result;
662 m_numberOfSentBytesControlPacket += result;
667 if (sent == sendblen){
668 // we are done sending the current packet. Delete it and set
669 // sendbuffer to NULL so a new packet can be fetched.
670 delete[] sendbuffer;
671 sendbuffer = NULL;
672 sendblen = 0;
// Payload is credited only for standard packets, and only when they are complete.
674 if(!m_currentPacket_is_controlpacket) {
675 m_actualPayloadSizeSent += m_actualPayloadSize;
676 m_actualPayloadSize = 0;
678 lastFinishedStandard = ::GetTickCount(); // reset timeout
679 m_bAccelerateUpload = false; // Safe until told otherwise
682 sent = 0;
// On a control-only call with control data still pending (queued, or partly
// sent in sendbuffer), ask the throttler to call again.
687 if(onlyAllowedToSendControlPacket && (!m_control_queue.empty() || (sendbuffer != NULL && m_currentPacket_is_controlpacket))) {
688 // enter control packet send queue
689 // we might enter control packet queue several times for the same package,
690 // but that costs very little overhead. Less overhead than trying to make sure
691 // that we only enter the queue once.
692 //printf("* Requeueing control packet on %p\n", this);
693 theApp->uploadBandwidthThrottler->QueueForSendingControlPacket(this, HasSent());
696 //printf("* Finishing send debug on %p\n",this);
// Normal exit: the first field is false only if a non-WOULDBLOCK write error occurred.
698 SocketSentBytes returnVal = { !anErrorHasOccured, sentStandardPacketBytesThisCall, sentControlPacketBytesThisCall };
700 return returnVal;
704 uint32 CEMSocket::GetNextFragSize(uint32 current, uint32 minFragSize)
706 if(current % minFragSize == 0) {
707 return current;
708 } else {
709 return minFragSize*(current/minFragSize+1);
715 * Decides the (minimum) amount the socket needs to send to prevent timeout.
717 * @author SlugFiller
719 uint32 CEMSocket::GetNeededBytes()
721 uint32 sendgap;
723 uint64 timetotal;
724 uint64 timeleft;
725 uint64 sizeleft, sizetotal;
728 wxMutexLocker lock(m_sendLocker);
730 if (byConnected == ES_DISCONNECTED) {
731 return 0;
734 if (!((sendbuffer && !m_currentPacket_is_controlpacket) || !m_standard_queue.empty())) {
735 // No standard packet to send. Even if data needs to be sent to prevent timout, there's nothing to send.
736 return 0;
739 if (((sendbuffer && !m_currentPacket_is_controlpacket)) && !m_control_queue.empty())
740 m_bAccelerateUpload = true; // We might be trying to send a block request, accelerate packet
742 sendgap = ::GetTickCount() - lastCalledSend;
744 timetotal = m_bAccelerateUpload?45000:90000;
745 timeleft = ::GetTickCount() - lastFinishedStandard;
746 if (sendbuffer && !m_currentPacket_is_controlpacket) {
747 sizeleft = sendblen-sent;
748 sizetotal = sendblen;
749 } else {
750 sizeleft = sizetotal = m_standard_queue.front().packet->GetRealPacketSize();
754 if (timeleft >= timetotal)
755 return sizeleft;
756 timeleft = timetotal-timeleft;
757 if (timeleft*sizetotal >= timetotal*sizeleft) {
758 // don't use 'GetTimeOut' here in case the timeout value is high,
759 if (sendgap > SEC2MS(20))
760 return 1; // Don't let the socket itself time out - Might happen when switching from spread(non-focus) slot to trickle slot
761 return 0;
763 uint64 decval = timeleft*sizetotal/timetotal;
764 if (!decval)
765 return sizeleft;
766 if (decval < sizeleft)
767 return sizeleft-decval+1; // Round up
768 else
769 return 1;
774 * Removes all packets from the standard queue that don't have to be sent for the socket to be able to send a control packet.
776 * Before a socket can send a new packet, the current packet has to be finished. If the current packet is part of
777 * a split packet, then all parts of that split packet must be sent before the socket can send a control packet.
779 * This method keeps in standard queue only those packets that must be sent (rest of split packet), and removes everything
780 * after it. The method doesn't touch the control packet queue.
782 void CEMSocket::TruncateQueues()
784 wxMutexLocker lock(m_sendLocker);
786 // Clear the standard queue totally
787 // Please note! There may still be a standardpacket in the sendbuffer variable!
788 CStdPacketQueue::iterator it = m_standard_queue.begin();
789 for (; it != m_standard_queue.end(); ++it) {
790 delete it->packet;
793 m_standard_queue.clear();
797 uint32 CEMSocket::GetTimeOut() const
799 return m_uTimeOut;
803 void CEMSocket::SetTimeOut(uint32 uTimeOut)
805 m_uTimeOut = uTimeOut;
807 // File_checked_for_headers