1 /***************************************************************************
3 * Copyright (C) 2006 David Brodsky *
5 * This program is free software; you can redistribute it and/or *
6 * modify it under the terms of the GNU General Public License as *
7 * published by the Free Software Foundation and appearing *
8 * in the file LICENSE.GPL included in the packaging of this file. *
10 * This program is distributed in the hope that it will be useful, *
11 * but WITHOUT ANY WARRANTY; without even the implied warranty of *
12 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU *
13 * General Public License for more details. *
15 ***************************************************************************/
17 #include <arpa/inet.h>
19 #include <tairon/core/log.h>
20 #include <tairon/core/thread.h>
21 #include <tairon/net/ireader.h>
22 #include <tairon/net/limiter.h>
23 #include <tairon/net/socket.h>
24 #include <tairon/net/swriter.h>
25 #include <tairon/net/timer.h>
27 #include "connection.h"
29 #include "core/bencode.h"
30 #include "core/bitfield.h"
31 #include "ratemeasurer.h"
33 #include "torrentclient.h"
34 #include "torrentmanager.h"
35 #include "torrentserver.h"
// File-level constants and the two rate limiters used by the code in this file.
//
// Length of the fixed part of the handshake. It matches the layout written by
// sendHandshake(): 1 length byte + 19-byte protocol name + 8 zeroed bytes +
// 20-byte info hash. The trailing 20-byte peer id is read separately.
43 const unsigned int handshakeLength
= 48; // without peer id
// Protocol identifier that is written into outgoing handshakes and compared
// against incoming ones.
44 const char *protocolName
= "BitTorrent protocol";
// Limiter passed to the IReader instances created in this file; starts as null.
46 Tairon::Net::Limiter
*rlimiter
= 0;
// Limiter passed to the SWriter instances created in this file; starts as null.
47 Tairon::Net::Limiter
*wlimiter
= 0;
49 /* {{{ Connection::Connection(int) */
// Constructs a connection around the given socket descriptor.
//
// fd is wrapped in a new Tairon::Net::Socket and one RateMeasurer is created
// for each direction. client starts as 0 (the visible code assigns it in
// readHandshake()). Both sides start choked and not interested, and no
// readers, writers or bitfield exist yet.
50 Connection::Connection(int fd
) : bitfield(0), choked(true), client(0), commandReader(0), deleteReader(true), interested(false), lastPieceTime(0), lengthReader(0), peerChoked(true), peerInterested(false), pieceReaders(0), pieceWriters(0), reader(0)
52 socket
= new Tairon::Net::Socket(fd
);
// NOTE(review): the meaning of the RateMeasurer argument (10) is not shown in
// this file — confirm against RateMeasurer.
55 incomingRate
= new RateMeasurer(10);
56 outgoingRate
= new RateMeasurer(10);
60 /* {{{ Connection::Connection(const String &, uint16_t, const String &, TorrentClient *) */
// Constructs a connection that we initiate ourselves.
//
// ip, port - address handed to Socket::connect().
// pID      - stored as peerID; readPeerID() later compares the ID sent by
//            the peer against it.
// c        - stored as client.
//
// All other members get the same initial values as in the fd-based
// constructor (both sides choked, nobody interested, no readers/writers).
61 Connection::Connection(const String
&ip
, uint16_t port
, const String
&pID
, TorrentClient
*c
) : bitfield(0), choked(true), client(c
), commandReader(0), deleteReader(true), interested(false), lastPieceTime(0), lengthReader(0), peerChoked(true), peerID(pID
), peerInterested(false), pieceReaders(0), pieceWriters(0), reader(0)
// The socket callbacks below are bound to the thread that runs this constructor.
63 Tairon::Core::Thread
*current
= Tairon::Core::Thread::current();
// Create an IPv4 stream socket and route the connect result to
// socketConnected() (success) or connectingError() (failure).
65 socket
= new Tairon::Net::Socket(Tairon::Net::Socket::IPv4
, Tairon::Net::Socket::Stream
);
66 socket
->connectedSignal
.connect(Tairon::Core::threadMethodDFunctor(current
, this, &Connection::socketConnected
));
67 socket
->errorSignal
.connect(Tairon::Core::threadMethodDFunctor(current
, this, &Connection::connectingError
));
69 incomingRate
= new RateMeasurer(10);
70 outgoingRate
= new RateMeasurer(10);
// Start connecting only after the handlers above are in place.
72 socket
->connect(ip
, port
);
76 /* {{{ Connection::~Connection() */
// Destructor.
// NOTE(review): only the loop header over commandQueue is visible in this
// view; presumably each queued writer is deleted in the loop body — confirm.
77 Connection::~Connection()
// Walk every command writer that is still queued.
87 for (std::list
<Tairon::Net::Writer
*>::const_iterator it
= commandQueue
.begin(); it
!= commandQueue
.end(); ++it
)
92 /* {{{ Connection::readBitField(Tairon::Net::Reader *) */
// Handler connected (in readFirstCommand()) to the bufferFullSignal of the
// reader that receives the bitfield payload. The Reader argument is unused;
// the member `reader` is read instead.
93 void Connection::readBitField(Tairon::Net::Reader
*)
95 DEBUG("bitfield read");
// Build the peer's bitfield from the received bytes. The piece count is the
// length of the metainfo "pieces" string divided by 20 (presumably one
// 20-byte hash per piece — confirm).
96 bitfield
= new Tairent::Core::BitField(reader
->getBuffer(), client
->getMetaInfo()["info"]["pieces"].asString().length() / 20);
// Go back to reading the length prefix of the next message.
98 reader
= lengthReader
;
// Tell the client that this connection now has a bitfield.
101 client
->gotBitField(this);
105 /* {{{ Connection::readCancel(Tairon::Net::Reader *) */
// Handler for the 12-byte payload of a cancel message (connected in
// readCommand()). Removes the matching pending request from peersRequests.
// The Reader argument is unused; the member `reader` is read instead.
106 void Connection::readCancel(Tairon::Net::Reader
*)
// The payload is three 32-bit fields at offsets 0, 4 and 8.
// NOTE(review): these values come from the network, so ntohl would state the
// intent (readPiece() uses ntohl for the same kind of field) — confirm.
108 uint32_t index
= htonl(*(uint32_t *) reader
->getBuffer());
109 uint32_t start
= htonl(*(uint32_t *) (reader
->getBuffer() + 4));
110 uint32_t length
= htonl(*(uint32_t *) (reader
->getBuffer() + 8));
// Look for a queued request with exactly the same index, start and length.
112 for (std::list
<PieceRequestStruct
>::iterator it
= peersRequests
.begin(); it
!= peersRequests
.end(); ++it
)
113 if ((it
->index
== index
) && (it
->start
== start
) && (it
->length
== length
)) {
// NOTE(review): erase() invalidates `it`, so the loop must not continue with
// ++it afterwards; the statement that follows is not visible here — confirm
// the loop is left right after the erase.
114 peersRequests
.erase(it
);
// Resume reading message lengths. deleteReader is cleared at the same
// point in readHave(), readRequest() and readPeerID().
119 reader
= lengthReader
;
120 deleteReader
= false;
124 /* {{{ Connection::readCommand(Tairon::Net::Reader *) */
// Handler for the one-byte message type (connected to commandReader's
// bufferFullSignal in readFirstCommand()). Dispatches on that byte; the
// message length was stored in commandLength by readLength().
//
// Branches that carry a payload install a fresh IReader of the payload size
// and connect it to the matching read* handler. In every visible length
// check, a mismatch is logged and followed by closedSignal being emitted.
// NOTE(review): the case labels are not visible in this view; the branches
// below are identified by their DEBUG texts and handlers.
125 void Connection::readCommand(Tairon::Net::Reader
*)
// Make the command reader reusable for the next message.
127 commandReader
->reset();
128 switch (*reader
->getBuffer()) {
// choke: the message must be exactly 1 byte long.
130 if (commandLength
!= 1) {
131 DEBUG("Invalid length of choke command");
133 closedSignal
.emit(this);
137 reader
= lengthReader
;
138 if (choked
) // we're already choked
141 client
->gotChoke(this);
// unchoke: the message must be exactly 1 byte long.
144 if (commandLength
!= 1) {
145 DEBUG("Invalid length of unchoke command");
147 closedSignal
.emit(this);
151 reader
= lengthReader
;
152 if (!choked
) // we're already unchoked
155 client
->gotUnchoke(this);
// interested: 1 byte long; record the peer's state and notify the client.
158 if (commandLength
!= 1) {
159 DEBUG("Invalid length of interested command");
161 closedSignal
.emit(this);
165 reader
= lengthReader
;
166 peerInterested
= true;
167 client
->gotInterestedChange(this);
// not interested: 1 byte long; record the peer's state and notify the client.
170 if (commandLength
!= 1) {
171 DEBUG("Invalid length of not interested command");
173 closedSignal
.emit(this);
176 DEBUG("not interested");
177 reader
= lengthReader
;
178 peerInterested
= false;
179 client
->gotInterestedChange(this);
// have: 1 type byte + 4-byte piece index.
182 if (commandLength
!= 5) {
183 DEBUG("Invalid length of have command");
185 closedSignal
.emit(this);
// Read the 4-byte index next; readHave() processes it.
189 reader
= new Tairon::Net::IReader(4, socket
, rlimiter
);
190 reader
->bufferFullSignal
.connect(Tairon::Core::methodFunctor(this, &Connection::readHave
));
191 reader
->errorSignal
.connect(Tairon::Core::methodFunctor(this, &Connection::readerError
));
197 closedSignal
.emit(this);
// request: 1 type byte + three 4-byte fields.
200 if (commandLength
!= 13) {
201 DEBUG("Invalid length of request command");
203 closedSignal
.emit(this);
// Read the 12-byte payload next; readRequest() processes it.
207 reader
= new Tairon::Net::IReader(12, socket
, rlimiter
);
208 reader
->bufferFullSignal
.connect(Tairon::Core::methodFunctor(this, &Connection::readRequest
));
209 reader
->errorSignal
.connect(Tairon::Core::methodFunctor(this, &Connection::readerError
));
// piece: read the 8-byte header first; readPiece() parses the index and the
// start offset from it.
214 reader
= new Tairon::Net::IReader(8, socket
, rlimiter
);
215 reader
->bufferFullSignal
.connect(Tairon::Core::methodFunctor(this, &Connection::readPiece
));
216 reader
->errorSignal
.connect(Tairon::Core::methodFunctor(this, &Connection::readerError
));
// Remember when the peer last started sending a piece; isSnubbed() uses it.
217 lastPieceTime
= Tairon::Net::Timer::currentTime();
// cancel: 1 type byte + three 4-byte fields.
221 if (commandLength
!= 13) {
222 DEBUG("Invalid length of cancel command");
224 closedSignal
.emit(this);
// Read the 12-byte payload next; readCancel() processes it.
228 reader
= new Tairon::Net::IReader(12, socket
, rlimiter
);
229 reader
->bufferFullSignal
.connect(Tairon::Core::methodFunctor(this, &Connection::readCancel
));
230 reader
->errorSignal
.connect(Tairon::Core::methodFunctor(this, &Connection::readerError
));
// Any other message type: log it and report the connection as closed.
234 DEBUG((const char *) String("unknown message" + String::number(*reader
->getBuffer())));
236 closedSignal
.emit(this);
241 /* {{{ Connection::readFirstCommand(Tairon::Net::Reader *) */
// Handler for the type byte of the first message after the handshake
// (connected in readPeerID()). A bitfield message (type 5) is special-cased
// here; all later type bytes are routed to readCommand().
242 void Connection::readFirstCommand(Tairon::Net::Reader
*)
// From now on the command reader reports to readCommand() instead.
244 commandReader
->bufferFullSignal
.clear();
245 commandReader
->bufferFullSignal
.connect(Tairon::Core::methodFunctor(this, &Connection::readCommand
));
246 commandReader
->reset();
248 if (*reader
->getBuffer() == 5) { // bitfield
251 // check if the peer isn't sending fake data
// len is the piece count; the payload must be exactly ceil(len / 8) bytes
// (commandLength also counts the type byte, hence the "- 1").
252 uint32_t len
= client
->getMetaInfo()["info"]["pieces"].asString().length() / 20;
253 if ((commandLength
- 1) != (len
/ 8 + (len
% 8 ? 1 : 0))) {
254 ERROR("Peer sent an invalid bitfield length");
256 closedSignal
.emit(this);
// Read the bitfield payload next; readBitField() processes it.
260 reader
= new Tairon::Net::IReader(commandLength
- 1, socket
, rlimiter
);
261 reader
->bufferFullSignal
.connect(Tairon::Core::methodFunctor(this, &Connection::readBitField
));
262 reader
->errorSignal
.connect(Tairon::Core::methodFunctor(this, &Connection::readerError
));
// NOTE(review): presumably the path taken when the first message is not a
// bitfield (the enclosing branch is not visible here): a BitField is created
// from the piece count alone, with no data from the peer.
266 bitfield
= new Tairent::Core::BitField(client
->getMetaInfo()["info"]["pieces"].asString().length() / 20);
272 /* {{{ Connection::readHandshake(Tairon::Net::Reader *) */
// Handler for the first handshakeLength bytes of an incoming handshake
// (connected in init()). Validates the protocol name, looks up the torrent
// by its info hash, answers with our own handshake and then waits for the
// 20-byte peer id. The visible failure paths log a warning and emit
// closedSignal.
273 void Connection::readHandshake(Tairon::Net::Reader
*)
275 DEBUG("got handshake 1");
277 const char *buffer
= reader
->getBuffer();
279 // check the protocol length (in case the reader emitted
280 // bufferFullSignal instead of dataReadSignal).
282 WARNING("Invalid protocol length");
284 closedSignal
.emit(this);
// The protocol name starts right after the length byte.
// NOTE(review): strstr() scans until a NUL byte, which a network buffer is
// not guaranteed to contain; a bounded comparison over 19 bytes would not
// depend on that — confirm.
288 if (strstr(buffer
+ 1, protocolName
) != (buffer
+ 1)) { // invalid protocol name
289 WARNING("Invalid protocol name");
291 closedSignal
.emit(this);
295 DEBUG("Protocol ok");
// The info hash is the 20 bytes at offset 28 (sendHandshake() writes it at
// the same offset).
298 String
infoHash(buffer
+ 28, 20);
299 DEBUG("sending handshake");
// Find the torrent this peer is asking for.
300 client
= TorrentManager::self()->getClient(infoHash
);
302 WARNING("Got unknown infohash");
304 closedSignal
.emit(this);
307 sendHandshake(infoHash
);
// The peer id comes next; readPeerID() processes it.
312 reader
= new Tairon::Net::IReader(20, socket
, rlimiter
);
313 reader
->bufferFullSignal
.connect(Tairon::Core::methodFunctor(this, &Connection::readPeerID
));
314 reader
->errorSignal
.connect(Tairon::Core::methodFunctor(this, &Connection::readerError
));
318 /* {{{ Connection::readHave(Tairon::Net::Reader *) */
// Handler for the 4-byte payload of a have message (connected in
// readCommand()). Marks the piece in the peer's bitfield and notifies the
// client.
319 void Connection::readHave(Tairon::Net::Reader
*)
// NOTE(review): the value comes from the network, so ntohl would state the
// intent — confirm.
321 uint32_t index
= htonl(*(uint32_t *) reader
->getBuffer());
// Reject indices outside the bitfield.
// NOTE(review): if getLength() is the number of bits, index == getLength()
// is already out of range but passes this test — confirm whether ">=" is
// intended.
323 if (index
> bitfield
->getLength()) {
324 WARNING("Peer sent invalid have index");
326 closedSignal
.emit(this);
// Resume reading message lengths.
331 reader
= lengthReader
;
332 deleteReader
= false;
// Record that the peer has this piece and pass the news on.
334 bitfield
->setBit(index
);
335 client
->gotHave(index
, this);
339 /* {{{ Connection::readLength(Tairon::Net::Reader *) */
// Handler for the 4-byte length prefix of a message (connected to
// lengthReader in readPeerID()). Stores the length in commandLength and
// switches to reading the message type byte.
340 void Connection::readLength(Tairon::Net::Reader
*)
// The length prefix is converted from network to host byte order.
342 commandLength
= ntohl(*(uint32_t *) lengthReader
->getBuffer());
343 DEBUG((const char *) String("Got message of length " + String::number(commandLength
)));
// Make the length reader reusable for the next message.
345 lengthReader
->reset();
// The message type byte is read next.
348 reader
= commandReader
;
353 /* {{{ Connection::readPeerID(Tairon::Net::Reader *) */
// Handler for the 20-byte peer id that ends the handshake (connected in
// readHandshake()). Checks the id against the expected one when there is
// one, creates the long-lived length and command readers, and reports the
// connection to the server and the client.
354 void Connection::readPeerID(Tairon::Net::Reader
*)
356 DEBUG("got peer id");
357 String
pID(reader
->getBuffer(), 20);
// peerID has 20 characters only when an expected id was stored (the
// outgoing constructor initialises it from its pID argument).
359 if ((peerID
.length() == 20) && (peerID
!= pID
)) { // we are connecting to a peer and its ID doesn't match
360 WARNING("Peer's ID doesn't match");
362 closedSignal
.emit(this);
// Keep the previous id; it is passed to client->newConnection() below.
366 String oldPeerID
= peerID
;
// Reader for the 4-byte length prefix of every message.
371 lengthReader
= new Tairon::Net::IReader(4, socket
, rlimiter
);
372 lengthReader
->bufferFullSignal
.connect(Tairon::Core::methodFunctor(this, &Connection::readLength
));
373 lengthReader
->errorSignal
.connect(Tairon::Core::methodFunctor(this, &Connection::readerError
));
// Reader for the 1-byte message type; the first message goes through
// readFirstCommand().
375 commandReader
= new Tairon::Net::IReader(1, socket
, rlimiter
);
376 commandReader
->bufferFullSignal
.connect(Tairon::Core::methodFunctor(this, &Connection::readFirstCommand
));
377 commandReader
->errorSignal
.connect(Tairon::Core::methodFunctor(this, &Connection::readerError
));
// Start the message loop with a length prefix.
379 reader
= lengthReader
;
380 deleteReader
= false;
// Handshake finished: notify the server and the torrent client.
382 TorrentServer::self()->connectionCompleted(this);
383 client
->newConnection(this, oldPeerID
);
387 /* {{{ Connection::readPiece(Tairon::Net::Reader *) */
// Handler for the 8-byte header of a piece message (connected in
// readCommand()). Stores the piece index and start offset and hands the
// transfer over to the client.
388 void Connection::readPiece(Tairon::Net::Reader
*)
// Two 32-bit fields in network byte order: piece index, then start offset.
390 pieceIndex
= ntohl(*(uint32_t *) reader
->getBuffer());
391 pieceStart
= ntohl(*(uint32_t *) (reader
->getBuffer() + 4));
// Outstanding start offsets we requested for this piece (filled by
// sendRequest()).
// NOTE(review): operator[] inserts an empty set when the piece was never
// requested.
394 std::set
<uint32_t> &r
= requests
[pieceIndex
];
// NOTE(review): the condition guarding this erase is not visible here;
// presumably it runs once no offsets remain for the piece — confirm.
397 requests
.erase(pieceIndex
);
399 deleteReader
= false;
// Pass the header on to the client.
400 client
->gotPiece(pieceIndex
, pieceStart
, this);
404 /* {{{ Connection::readProtocolLength(Tairon::Net::Reader *, size_t) */
// Handler for dataReadSignal of the handshake reader (connected in init()).
// Checks the first handshake byte as soon as data arrives, without waiting
// for the whole handshake.
405 void Connection::readProtocolLength(Tairon::Net::Reader
*, size_t)
// The first byte must equal the length of protocolName.
407 if (*reader
->getBuffer() != 19) { // length of the protocol name
408 WARNING("Invalid protocol length");
410 closedSignal
.emit(this);
// Disconnect this handler so it is not called again for later reads.
414 reader
->dataReadSignal
.clear();
418 /* {{{ Connection::readRequest(Tairon::Net::Reader *) */
// Handler for the 12-byte payload of a request message (connected in
// readCommand()). The request is queued or forwarded to the storage; when
// the peer is choked the reader is simply reset to the length reader.
419 void Connection::readRequest(Tairon::Net::Reader
*)
421 if (peerChoked
) { // don't send anything to the choked peer
423 reader
= lengthReader
;
424 deleteReader
= false;
// Decode index, start and length from offsets 0, 4 and 8.
// NOTE(review): these values come from the network, so ntohl would state
// the intent — confirm.
428 PieceRequestStruct rs
;
429 rs
.index
= htonl(*(uint32_t *) reader
->getBuffer());
430 rs
.start
= htonl(*(uint32_t *) (reader
->getBuffer() + 4));
431 rs
.length
= htonl(*(uint32_t *) (reader
->getBuffer() + 8));
// Chunks larger than 16384 bytes are rejected.
433 if (rs
.length
> 16384) {
434 WARNING((const char *) String("Client requested too big chunk: " + String::number(rs
.length
)));
436 closedSignal
.emit(this);
// Resume reading message lengths.
441 reader
= lengthReader
;
442 deleteReader
= false;
// While something is being sent the request waits in peersRequests;
// dataWritten() takes it from there.
// NOTE(review): the "else" between the two statements below is not visible
// here; presumably the storage call is the alternative branch — confirm.
444 if (pieceWriters
|| commandQueue
.size()) // we are sending something
445 peersRequests
.push_back(rs
);
447 client
->getStorage()->gotRequest(rs
.index
, rs
.start
, rs
.length
, this);
451 /* {{{ Connection::readyRead(Tairon::Net::Socket *) */
// Handler for the socket's readyReadSignal (connected in init()).
// NOTE(review): the guarded statement is not visible in this view;
// presumably it asks the current reader to read — confirm.
452 void Connection::readyRead(Tairon::Net::Socket
*)
// A SocketException is reported by emitting closedSignal.
456 } catch (const Tairon::Net::SocketException
&) { // connection closed or other error
458 closedSignal
.emit(this);
463 /* {{{ Connection::readyWrite(Tairon::Net::Socket *) */
// Handler for the socket's readyWriteSignal (connected in init()).
// Continues writing with the writer at the front of the piece writers or of
// the command queue.
// NOTE(review): the conditions selecting between the two calls are not
// visible in this view — confirm which one takes precedence.
464 void Connection::readyWrite(Tairon::Net::Socket
*)
467 pieceWriters
->writers
.front()->write();
469 commandQueue
.front()->write();
473 /* {{{ Connection::addCommandWriter(Tairon::Net::Writer *) */
// Appends a command writer to the outgoing queue.
//
// writer - writer to queue; dataWritten() removes it from the queue again.
474 void Connection::addCommandWriter(Tairon::Net::Writer
*writer
)
476 commandQueue
.push_back(writer
);
// True only when this is the sole queued command and no piece is being
// sent.
// NOTE(review): the guarded statement is not visible here; presumably it
// starts writing immediately — confirm.
477 if ((commandQueue
.size() == 1) && !pieceWriters
) // don't send anything if we are sending something
482 /* {{{ Connection::connectingError(Tairon::Net::Socket *, int) */
// Handler for the socket's errorSignal while an outgoing connection is
// being established (connected in the outgoing constructor). Both arguments
// are unused in the visible code.
483 void Connection::connectingError(Tairon::Net::Socket
*, int)
// Let the client know that this connection attempt failed.
486 client
->connectingFailed(this);
490 /* {{{ Connection::close() */
// NOTE(review): the body of this method is not visible in this view;
// presumably it closes the underlying socket — confirm.
491 void Connection::close()
497 /* {{{ Connection::clearRequested() */
// NOTE(review): the body of this method is not visible in this view;
// presumably it clears the `requests` map filled by sendRequest() — confirm.
498 void Connection::clearRequested()
504 /* {{{ Connection::dataWritten(Tairon::Net::Writer *) */
// Handler for dataWrittenSignal of command writers (connected in the send*
// methods). Removes the finished writer from the queue and then sends the
// next command or serves the next queued piece request.
//
// writer - the writer that has finished.
505 void Connection::dataWritten(Tairon::Net::Writer
*writer
)
507 DEBUG("dataWritten");
// NOTE(review): front() on an empty list is undefined; this relies on the
// queue being non-empty whenever the signal fires — confirm (the comment in
// pieceDataWritten() suggests piece writers end up here too).
509 Tairon::Net::Writer
*w
= commandQueue
.front();
510 if (w
== writer
) // is this writer in the queue?
511 commandQueue
.pop_front(); // remove it
// Queued commands are sent before any piece data.
514 if (commandQueue
.size()) // are there some commands to send?
515 commandQueue
.front()->write();
516 else if (peersRequests
.size()) { // let's try to send some piece
// Take the oldest pending request and hand it to the storage.
517 PieceRequestStruct rs
= peersRequests
.front();
518 peersRequests
.pop_front();
519 client
->getStorage()->gotRequest(rs
.index
, rs
.start
, rs
.length
, this);
524 /* {{{ Connection::destroyPieceReaders() */
// Walks every reader in pieceReaders->readers.
// NOTE(review): the loop body and any null check on pieceReaders are not
// visible in this view; presumably each reader is deleted — confirm.
525 void Connection::destroyPieceReaders()
530 for (std::list
<Tairon::Net::Reader
*>::iterator it
= pieceReaders
->readers
.begin(); it
!= pieceReaders
->readers
.end(); ++it
)
537 /* {{{ Connection::destroyPieceWriters() */
// Walks every writer in pieceWriters->writers.
// NOTE(review): the loop body and any null check on pieceWriters are not
// visible in this view; presumably each writer is deleted — confirm.
538 void Connection::destroyPieceWriters()
543 for (std::list
<Tairon::Net::Writer
*>::iterator it
= pieceWriters
->writers
.begin(); it
!= pieceWriters
->writers
.end(); ++it
)
550 /* {{{ Connection::getIncomingRate() */
// Returns the rate reported by the incoming RateMeasurer, which
// setNextReader() updates after each downloaded piece.
551 double Connection::getIncomingRate()
553 return incomingRate
->getRate();
557 /* {{{ Connection::getOutgoingRate() */
// Returns the rate reported by the outgoing RateMeasurer, which
// pieceDataWritten() updates after each sent piece.
558 double Connection::getOutgoingRate()
560 return outgoingRate
->getRate();
564 /* {{{ Connection::getPieceLength() */
// Returns the number of data bytes in the piece message being received:
// the message length stored by readLength() minus the 9 header bytes.
// NOTE(review): if commandLength is below 9 the subtraction would wrap
// (assuming an unsigned type); no length check is visible in the piece
// branch of readCommand() — confirm.
565 uint32_t Connection::getPieceLength()
567 // 1 byte message type, 4 bytes index, 4 bytes start
568 return commandLength
- 9;
572 /* {{{ Connection::getReadingLimiter() */
// NOTE(review): the body is not visible in this view; presumably it returns
// the file-level rlimiter — confirm.
573 Tairon::Net::Limiter
*Connection::getReadingLimiter()
579 /* {{{ Connection::getRequestsCount() */
// Counts the outstanding requests: the sizes of all per-piece offset sets
// in `requests` are added up in ret.
// NOTE(review): the return statement is not visible in this view;
// presumably ret is returned — confirm.
580 unsigned int Connection::getRequestsCount()
582 unsigned int ret
= 0;
// One entry per piece index, each holding the requested start offsets.
583 for (std::map
<uint32_t, std::set
<uint32_t> >::const_iterator it
= requests
.begin(); it
!= requests
.end(); ++it
)
584 ret
+= it
->second
.size();
589 /* {{{ Connection::getWritingLimiter() */
// NOTE(review): the body is not visible in this view; presumably it returns
// the file-level wlimiter — confirm.
590 Tairon::Net::Limiter
*Connection::getWritingLimiter()
596 /* {{{ Connection::init() */
// Connects the socket signals to this object and starts reading the peer's
// handshake.
597 void Connection::init()
// The socket callbacks below are bound to the calling thread.
599 Tairon::Core::Thread
*current
= Tairon::Core::Thread::current();
601 socket
->errorSignal
.connect(Tairon::Core::threadMethodDFunctor(current
, this, &Connection::socketError
));
602 socket
->readyReadSignal
.connect(Tairon::Core::threadMethodDFunctor(current
, this, &Connection::readyRead
));
603 socket
->readyWriteSignal
.connect(Tairon::Core::threadMethodDFunctor(current
, this, &Connection::readyWrite
));
// Read the fixed part of the handshake (handshakeLength bytes, without the
// peer id); readHandshake() runs once the buffer is full.
606 reader
= new Tairon::Net::IReader(handshakeLength
, socket
, rlimiter
);
607 reader
->bufferFullSignal
.connect(Tairon::Core::methodFunctor(this, &Connection::readHandshake
));
608 // it is safe to do this, we just want to check the first byte
// readProtocolLength() disconnects itself from dataReadSignal once the
// first byte is accepted.
609 reader
->dataReadSignal
.connect(Tairon::Core::methodFunctor(this, &Connection::readProtocolLength
));
610 reader
->errorSignal
.connect(Tairon::Core::methodFunctor(this, &Connection::readerError
));
614 /* {{{ Connection::isSnubbed() */
// Returns true when more than 10000 timer units have passed since
// lastPieceTime, which readCommand() refreshes whenever a piece message
// starts arriving. lastPieceTime starts at 0 in both constructors.
615 bool Connection::isSnubbed()
617 return Tairon::Net::Timer::currentTime() - lastPieceTime
> 10000; // 10 seconds, should be configurable
621 /* {{{ Connection::pieceDataWritten(Tairon::Net::Writer *) */
// Handler for dataWrittenSignal of piece writers (connected in
// setWriters()). Advances to the next writer of the piece, or finishes the
// piece when none are left.
//
// writer - the piece writer that has finished.
622 void Connection::pieceDataWritten(Tairon::Net::Writer
*writer
)
// The finished writer is at the front of the list; drop it.
624 pieceWriters
->writers
.pop_front();
626 if (pieceWriters
->writers
.size()) { // there is still something to send from the piece
627 delete writer
; // delete the current writer
628 pieceWriters
->writers
.front()->write();
629 } else { // whole piece has been sent
// Account for the uploaded bytes and tell the client the piece went out.
630 outgoingRate
->update(pieceWriters
->length
);
631 client
->pieceSent(pieceWriters
->index
, pieceWriters
->length
, this);
632 // the last piece's writer will be deleted in dataWritten() method
641 /* {{{ Connection::readerError(Tairon::Net::Reader *, Tairon::Net::Socket *) */
// Handler for errorSignal of the readers created in this file. Both
// arguments are unused in the visible code; the connection is reported as
// closed.
642 void Connection::readerError(Tairon::Net::Reader
*, Tairon::Net::Socket
*)
644 closedSignal
.emit(this);
648 /* {{{ Connection::sendChoke() */
// Queues a choke message for the peer and drops the peer's pending piece
// requests.
649 void Connection::sendChoke()
651 DEBUG("sending choke");
// Requests waiting in the queue are discarded.
656 peersRequests
.clear();
// Wire format: 4-byte length prefix (1) followed by message type 0.
659 Tairon::Net::SWriter
*writer
= new Tairon::Net::SWriter(String("\x00\x00\x00\x01\x00", 5), socket
, wlimiter
);
660 writer
->dataWrittenSignal
.connect(Tairon::Core::methodFunctor(this, &Connection::dataWritten
));
661 writer
->errorSignal
.connect(Tairon::Core::methodFunctor(this, &Connection::writerError
));
663 addCommandWriter(writer
);
667 /* {{{ Connection::sendHandshake(const String &) */
// Queues our 68-byte handshake and, when the storage already holds some
// data, a bitfield message right after it.
//
// infoHash - info hash of the torrent; its first 20 bytes are copied into
//            the handshake.
668 void Connection::sendHandshake(const String
&infoHash
)
// Handshake layout: [0] protocol name length, [1..19] protocol name,
// [20..27] zeroed bytes, [28..47] info hash, [48..67] our client id.
// NOTE(review): the declaration of `handshake` is not visible here; it has
// to provide at least 68 bytes — confirm.
671 handshake
[0] = (char) 19;
672 memcpy(handshake
+ 1, protocolName
, 19);
673 memset(handshake
+ 20, 0, 8);
674 memcpy(handshake
+ 28, infoHash
.data(), 20);
675 memcpy(handshake
+ 48, TorrentManager::self()->getClientID().data(), 20);
677 String
h(handshake
, 68);
679 Tairon::Net::Writer
*writer
= new Tairon::Net::SWriter(h
, socket
, wlimiter
);
680 writer
->dataWrittenSignal
.connect(Tairon::Core::methodFunctor(this, &Connection::dataWritten
));
681 writer
->errorSignal
.connect(Tairon::Core::methodFunctor(this, &Connection::writerError
));
683 addCommandWriter(writer
);
685 if (client
->getStorage()->hasSomething()) { // we have already downloaded some piece, send bitfield as well
686 DEBUG("sending bitfield");
687 // count length of the bitfield
// Convert the bit count into a byte count, rounding up.
688 size_t length
= client
->getStorage()->getBitField()->getLength();
689 length
= length
/ 8 + (length
% 8 ? 1 : 0);
// Message buffer: 4-byte length prefix + 1 type byte + bitfield bytes.
// NOTE(review): no matching delete[] for bf is visible in this view —
// confirm it is released once String b has been built from it.
691 char *bf
= new char[5 + length
];
693 uint32_t l
= htonl(1 + length
); // 1 byte message type, rest is bitfield
694 memcpy(bf
, (const char *) (&l
), 4);
695 bf
[4] = 5; // bitfield message
696 memcpy(bf
+ 5, client
->getStorage()->getBitField()->getData(), length
);
698 String
b(bf
, 5 + length
);
699 writer
= new Tairon::Net::SWriter(b
, socket
, wlimiter
);
700 writer
->dataWrittenSignal
.connect(Tairon::Core::methodFunctor(this, &Connection::dataWritten
));
701 writer
->errorSignal
.connect(Tairon::Core::methodFunctor(this, &Connection::writerError
));
703 addCommandWriter(writer
);
710 /* {{{ Connection::sendHave(uint32_t) */
// Queues a have message for the given piece.
//
// index - piece index in host byte order.
711 void Connection::sendHave(uint32_t index
)
713 DEBUG("sending have");
// Wire format: 4-byte length prefix (5), message type 4, then the index in
// network byte order.
715 uint32_t i
= htonl(index
);
716 Tairon::Net::SWriter
*writer
= new Tairon::Net::SWriter(String("\x00\x00\x00\x05\x04", 5) + String((const char *) (&i
), 4), socket
, wlimiter
);
717 writer
->dataWrittenSignal
.connect(Tairon::Core::methodFunctor(this, &Connection::dataWritten
));
718 writer
->errorSignal
.connect(Tairon::Core::methodFunctor(this, &Connection::writerError
));
720 addCommandWriter(writer
);
724 /* {{{ Connection::sendInterested() */
// Queues an interested message for the peer.
// NOTE(review): original lines between the DEBUG call and the writer are
// not visible in this view; presumably they update `interested` — confirm.
725 void Connection::sendInterested()
727 DEBUG("sending interested");
// Wire format: 4-byte length prefix (1) followed by message type 2.
733 Tairon::Net::SWriter
*writer
= new Tairon::Net::SWriter(String("\x00\x00\x00\x01\x02", 5), socket
, wlimiter
);
734 writer
->dataWrittenSignal
.connect(Tairon::Core::methodFunctor(this, &Connection::dataWritten
));
735 writer
->errorSignal
.connect(Tairon::Core::methodFunctor(this, &Connection::writerError
));
737 addCommandWriter(writer
);
741 /* {{{ Connection::sendNotInterested() */
// Queues a not-interested message for the peer.
// NOTE(review): original lines between the DEBUG call and the writer are
// not visible in this view; presumably they update `interested` — confirm.
742 void Connection::sendNotInterested()
744 DEBUG("sending not interested");
// Wire format: 4-byte length prefix (1) followed by message type 3.
750 Tairon::Net::SWriter
*writer
= new Tairon::Net::SWriter(String("\x00\x00\x00\x01\x03", 5), socket
, wlimiter
);
751 writer
->dataWrittenSignal
.connect(Tairon::Core::methodFunctor(this, &Connection::dataWritten
));
752 writer
->errorSignal
.connect(Tairon::Core::methodFunctor(this, &Connection::writerError
));
754 addCommandWriter(writer
);
758 /* {{{ Connection::sendRequest(uint32_t, uint32_t, uint32_t) */
// Records an outgoing block request and queues the request message.
//
// index  - piece index.
// start  - offset of the block within the piece.
// length - number of bytes requested.
759 void Connection::sendRequest(uint32_t index
, uint32_t start
, uint32_t length
)
// Remember the request; readPiece() looks it up and getRequestsCount()
// counts it.
761 requests
[index
].insert(start
);
// Wire format: 4-byte length prefix (13), message type 6, then index,
// start and length in network byte order (17 bytes in total).
// NOTE(review): the declaration of `msg` is not visible here; it has to
// provide at least 17 bytes. The uint32_t stores below go to offsets 5, 9
// and 13, which are not 4-byte aligned relative to msg — confirm this is
// acceptable on the target platforms.
764 memcpy(msg
, "\x00\x00\x00\x0d\x06", 5);
765 *(uint32_t *) (msg
+ 5) = htonl(index
);
766 *(uint32_t *) (msg
+ 9) = htonl(start
);
767 *(uint32_t *) (msg
+ 13) = htonl(length
);
769 Tairon::Net::SWriter
*writer
= new Tairon::Net::SWriter(String(msg
, 17), socket
, wlimiter
);
770 writer
->dataWrittenSignal
.connect(Tairon::Core::methodFunctor(this, &Connection::dataWritten
));
771 writer
->errorSignal
.connect(Tairon::Core::methodFunctor(this, &Connection::writerError
));
773 addCommandWriter(writer
);
777 /* {{{ Connection::sendUnchoke(int) */
// Queues an unchoke message for the peer.
//
// time - not used by the statements visible in this view.
// NOTE(review): original lines between the DEBUG call and the writer are
// not visible here; presumably they use `time` and update peerChoked —
// confirm.
778 void Connection::sendUnchoke(int time
)
780 DEBUG("sending unchoke");
// Wire format: 4-byte length prefix (1) followed by message type 1.
788 Tairon::Net::SWriter
*writer
= new Tairon::Net::SWriter(String("\x00\x00\x00\x01\x01", 5), socket
, wlimiter
);
789 writer
->dataWrittenSignal
.connect(Tairon::Core::methodFunctor(this, &Connection::dataWritten
));
790 writer
->errorSignal
.connect(Tairon::Core::methodFunctor(this, &Connection::writerError
));
792 addCommandWriter(writer
);
796 /* {{{ Connection::setNextReader(Tairon::Net::Reader *) */
// Handler for bufferFullSignal of piece readers (connected in
// setReaders()). Disposes of the finished reader and moves on to the next
// one; the remaining statements complete the piece.
797 void Connection::setNextReader(Tairon::Net::Reader
*)
// The finished reader is at the front of the list.
799 delete pieceReaders
->readers
.front();
800 pieceReaders
->readers
.pop_front();
// More readers left: continue the piece with the next one.
802 if (pieceReaders
->readers
.size()) {
803 DEBUG("Setting next piece reader");
804 reader
= pieceReaders
->readers
.front();
// NOTE(review): the "else" that presumably introduces the statements below
// is not visible in this view — confirm they run only after the last
// reader.
806 DEBUG("Piece downloaded");
// Account for the downloaded bytes.
808 incomingRate
->update(pieceReaders
->length
);
// NOTE(review): how `fake` is used afterwards is not visible here.
810 bool fake
= pieceReaders
->fake
;
// Resume reading message lengths.
813 reader
= lengthReader
;
814 deleteReader
= false;
// Report the completed block to the client.
816 client
->pieceDownloaded(pieceIndex
, pieceStart
, this);
821 /* {{{ Connection::setReaders(PieceReadersStruct *) */
// Installs the readers that will receive the data of an incoming piece.
//
// readers - struct whose `readers` list holds the readers in the order they
//           are used; stored in pieceReaders.
// NOTE(review): front() is called on readers->readers, so the list must not
// be empty — confirm the caller guarantees this.
822 void Connection::setReaders(PieceReadersStruct
*readers
)
// Every reader reports completion to setNextReader() and failures to
// readerError().
824 for (std::list
<Tairon::Net::Reader
*>::iterator it
= readers
->readers
.begin(); it
!= readers
->readers
.end(); ++it
) {
825 (*it
)->bufferFullSignal
.connect(Tairon::Core::methodFunctor(this, &Connection::setNextReader
));
826 (*it
)->errorSignal
.connect(Tairon::Core::methodFunctor(this, &Connection::readerError
));
// Keep the set and make its first reader the current one.
828 pieceReaders
= readers
;
829 reader
= pieceReaders
->readers
.front();
833 /* {{{ Connection::setWriters(PieceWritersStruct *) */
// Installs the writers that send one piece to the peer and starts the first
// one.
//
// writers - struct whose `writers` list holds the writers in sending order;
//           stored in pieceWriters.
// NOTE(review): front() is called on writers->writers, so the list must not
// be empty — confirm the caller guarantees this.
834 void Connection::setWriters(PieceWritersStruct
*writers
)
// Every writer reports completion to pieceDataWritten() and failures to
// writerError().
836 for (std::list
<Tairon::Net::Writer
*>::iterator it
= writers
->writers
.begin(); it
!= writers
->writers
.end(); ++it
) {
837 (*it
)->dataWrittenSignal
.connect(Tairon::Core::methodFunctor(this, &Connection::pieceDataWritten
));
838 (*it
)->errorSignal
.connect(Tairon::Core::methodFunctor(this, &Connection::writerError
));
// Keep the set and start sending right away.
841 pieceWriters
= writers
;
842 writers
->writers
.front()->write();
846 /* {{{ Connection::socketConnected(Tairon::Net::Socket *) */
// Handler for the socket's connectedSignal (connected in the outgoing
// constructor). Removes the connect-phase handlers and sends our handshake.
847 void Connection::socketConnected(Tairon::Net::Socket
*)
// Drop the handlers installed by the constructor.
// NOTE(review): original lines between here and sendHandshake() are not
// visible in this view; presumably init() is called there to install the
// regular handlers — confirm.
849 socket
->connectedSignal
.clear();
850 socket
->errorSignal
.clear();
// We initiated the connection, so the info hash comes from our client.
854 sendHandshake(client
->getInfoHash());
858 /* {{{ Connection::socketError(Tairon::Net::Socket *, int) */
// Handler for the socket's errorSignal once the connection is set up
// (connected in init()). Both arguments are unused in the visible code; the
// connection is reported as closed.
859 void Connection::socketError(Tairon::Net::Socket
*, int)
862 closedSignal
.emit(this);
866 /* {{{ Connection::writerError(Tairon::Net::Writer *, Tairon::Net::Socket *) */
// Handler for errorSignal of the writers connected in this file. Both
// arguments are unused in the visible code; the connection is reported as
// closed.
867 void Connection::writerError(Tairon::Net::Writer
*, Tairon::Net::Socket
*)
869 closedSignal
.emit(this);
875 }; // namespace Tairent
877 // vim: ai sw=4 ts=4 noet fdm=marker