1 /***************************************************************************
3 * Copyright (C) 2006 David Brodsky *
5 * This program is free software; you can redistribute it and/or *
6 * modify it under the terms of the GNU General Public License as *
7 * published by the Free Software Foundation and appearing *
8 * in the file LICENSE.GPL included in the packaging of this file. *
10 * This program is distributed in the hope that it will be useful, *
11 * but WITHOUT ANY WARRANTY; without even the implied warranty of *
12 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU *
13 * General Public License for more details. *
15 ***************************************************************************/
17 #include <arpa/inet.h>
19 #include <tairon/core/log.h>
20 #include <tairon/core/thread.h>
21 #include <tairon/net/ireader.h>
22 #include <tairon/net/limiter.h>
23 #include <tairon/net/socket.h>
24 #include <tairon/net/swriter.h>
25 #include <tairon/net/timer.h>
27 #include "connection.h"
29 #include "core/bencode.h"
30 #include "core/bitfield.h"
31 #include "ratemeasurer.h"
33 #include "torrentclient.h"
34 #include "torrentmanager.h"
35 #include "torrentserver.h"
43 const unsigned int handshakeLength
= 48; // without peer id
44 const char *protocolName
= "BitTorrent protocol";
46 Tairon::Net::Limiter
*rlimiter
= 0;
47 Tairon::Net::Limiter
*wlimiter
= 0;
49 /* {{{ Connection::Connection(int, TorrentClient *) */
// Builds a connection around the descriptor fd for the torrent client c.
// Initial state: no bitfield, choked and peerChoked set, interested,
// peerInterested and sendingPiece cleared, lastPieceTime 0, deleteReader true.
// The socket's error / readyRead / readyWrite signals are routed to this
// object through functors bound to the thread that runs the constructor.
50 Connection::Connection(int fd
, TorrentClient
*c
) : bitfield(0), choked(true), client(c
), deleteReader(true), interested(false), lastPieceTime(0), peerChoked(true), peerInterested(false), sendingPiece(false)
52 Tairon::Core::Thread
*current
= Tairon::Core::Thread::current();
54 socket
= new Tairon::Net::Socket(fd
);
55 socket
->errorSignal
.connect(Tairon::Core::threadMethodDFunctor(current
, this, &Connection::socketError
));
56 socket
->readyReadSignal
.connect(Tairon::Core::threadMethodDFunctor(current
, this, &Connection::readyRead
));
57 socket
->readyWriteSignal
.connect(Tairon::Core::threadMethodDFunctor(current
, this, &Connection::readyWrite
));
// The first thing read from the peer is the handshake without the peer id
// (handshakeLength bytes); a full buffer is delivered to readHandshake().
60 reader
= new Tairon::Net::IReader(handshakeLength
, socket
, rlimiter
);
61 reader
->bufferFullSignal
.connect(Tairon::Core::methodFunctor(this, &Connection::readHandshake
));
62 reader
->errorSignal
.connect(Tairon::Core::methodFunctor(this, &Connection::readerError
));
// Backs getIncomingRate(). NOTE(review): the meaning of the argument 10 is
// defined by RateMeasurer and is not visible here.
64 incomingRate
= new RateMeasurer(10);
68 /* {{{ Connection::~Connection() */
// Destructor. NOTE(review): confirm that it releases everything the
// constructor allocates with new (socket, the initial reader, incomingRate)
// and any writers still queued.
69 Connection::~Connection()
75 /* {{{ Connection::readBitField(Tairon::Net::Reader *) */
// Completion handler for the bitfield payload reader installed by
// readFirstCommand(). The Reader argument is unused; the member reader is
// used instead.
76 void Connection::readBitField(Tairon::Net::Reader
*)
78 DEBUG("bitfield read");
// Build the peer's bitfield from the received bytes. The piece count is the
// length of the metainfo "pieces" string divided by 20.
79 bitfield
= new Tairent::Core::BitField(reader
->getBuffer(), client
->getMetaInfo()["info"]["pieces"].asString().length() / 20);
// Go back to reading the length prefix of the next message.
// NOTE(review): how the payload reader is released is not visible here.
81 reader
= lengthReader
;
84 client
->gotBitField(this);
88 /* {{{ Connection::readCommand(Tairon::Net::Reader *) */
// Handles the one-byte message id that commandReader has just read.
// Messages without payload are finished on the spot (reader goes back to
// lengthReader and the client is notified); messages with payload get a
// dedicated IReader whose completion handler parses the rest.
// NOTE(review): the case labels are not visible in this listing; the
// id-to-branch mapping described below is inferred from the handlers called.
89 void Connection::readCommand(Tairon::Net::Reader
*)
91 switch (*reader
->getBuffer()) {
// choke: no payload
95 reader
= lengthReader
;
96 client
->gotChoke(this);
// unchoke: no payload
101 reader
= lengthReader
;
102 client
->gotUnchoke(this);
// interested: no payload, remember the peer's state
106 reader
= lengthReader
;
107 peerInterested
= true;
108 client
->gotInterestedChange(this);
// not interested: no payload, remember the peer's state
111 DEBUG("not interested");
112 reader
= lengthReader
;
113 peerInterested
= false;
114 client
->gotInterestedChange(this);
// have: 4 more bytes, parsed by readHave()
118 reader
= new Tairon::Net::IReader(4, socket
, rlimiter
);
119 reader
->bufferFullSignal
.connect(Tairon::Core::methodFunctor(this, &Connection::readHave
));
120 reader
->errorSignal
.connect(Tairon::Core::methodFunctor(this, &Connection::readerError
));
// this branch asks the owner to close the connection
126 closedSignal
.emit(this);
// request: 12 more bytes (index, start, length), parsed by readRequest()
130 reader
= new Tairon::Net::IReader(12, socket
, rlimiter
);
131 reader
->bufferFullSignal
.connect(Tairon::Core::methodFunctor(this, &Connection::readRequest
));
132 reader
->errorSignal
.connect(Tairon::Core::methodFunctor(this, &Connection::readerError
));
// piece: 8 more bytes of header (index, start), parsed by readPiece()
137 reader
= new Tairon::Net::IReader(8, socket
, rlimiter
);
138 reader
->bufferFullSignal
.connect(Tairon::Core::methodFunctor(this, &Connection::readPiece
));
139 reader
->errorSignal
.connect(Tairon::Core::methodFunctor(this, &Connection::readerError
));
// remember when the last piece message started arriving; isSnubbed() reads it
140 lastPieceTime
= Tairon::Net::Timer::currentTime();
// unrecognised id: only logged
147 DEBUG((const char *) String("unknown message" + String::number(*reader
->getBuffer())));
// make commandReader ready for the id of the next message
149 commandReader
->reset();
153 /* {{{ Connection::readFirstCommand(Tairon::Net::Reader *) */
// Handler for the id of the very first message after the handshake. A
// bitfield message (id 5) is handled here; every later message id goes to
// readCommand().
154 void Connection::readFirstCommand(Tairon::Net::Reader
*)
// From now on commandReader reports to readCommand() instead of this method.
156 commandReader
->bufferFullSignal
.clear();
157 commandReader
->bufferFullSignal
.connect(Tairon::Core::methodFunctor(this, &Connection::readCommand
));
159 if (*reader
->getBuffer() == 5) { // bitfield
162 // check if the peer isn't sending fake data
// len is the number of pieces; the payload must be exactly that many bits
// rounded up to whole bytes (commandLength includes the 1-byte message id).
163 uint32_t len
= client
->getMetaInfo()["info"]["pieces"].asString().length() / 20;
164 if ((commandLength
- 1) != (len
/ 8 + (len
% 8 ? 1 : 0))) {
165 ERROR("Peer sent an invalid bitfield length");
167 closedSignal
.emit(this);
// Length is valid: read the bitfield payload, completion goes to readBitField().
171 reader
= new Tairon::Net::IReader(commandLength
- 1, socket
, rlimiter
);
172 reader
->bufferFullSignal
.connect(Tairon::Core::methodFunctor(this, &Connection::readBitField
));
173 reader
->errorSignal
.connect(Tairon::Core::methodFunctor(this, &Connection::readerError
));
// A BitField sized to the piece count is created without peer data.
// NOTE(review): presumably this is the "first message is not a bitfield"
// path; the surrounding else and any dispatch to readCommand() are not
// visible here.
177 bitfield
= new Tairent::Core::BitField(client
->getMetaInfo()["info"]["pieces"].asString().length() / 20);
181 commandReader
->reset();
185 /* {{{ Connection::readHandshake(Tairon::Net::Reader *) */
// Validates the first handshakeLength bytes received from the peer (length
// byte, protocol name, reserved bytes, info hash) and then installs a
// 20-byte reader for the peer id, handled by readPeerID().
186 void Connection::readHandshake(Tairon::Net::Reader
*)
188 DEBUG("got handshake 1");
190 const char *buffer
= reader
->getBuffer();
192 if (*buffer
!= 19) { // length of the protocol name
193 WARNING("Invalid protocol length");
194 closedSignal
.emit(this);
// NOTE(review): strstr() needs a NUL terminator, but the buffer holds raw
// bytes received from the peer. A memcmp() over the 19 name bytes would
// avoid scanning past the data when no zero byte follows.
198 if (strstr(buffer
+ 1, protocolName
) != (buffer
+ 1)) { // invalid protocol name
199 WARNING("Invalid protocol name");
200 closedSignal
.emit(this);
204 DEBUG("Protocol ok");
// The info hash sits after 1 + 19 + 8 bytes.
206 String
infoHash(buffer
+ 28, 20);
208 // we are initializing the connection
210 DEBUG("sending handshake");
// NOTE(review): verify that an unknown info hash (getClient() finding no
// client) is handled before sendHandshake() dereferences client.
211 client
= TorrentManager::self()->getClient(infoHash
);
212 sendHandshake(infoHash
);
// Next 20 bytes: the peer id.
217 reader
= new Tairon::Net::IReader(20, socket
, rlimiter
);
218 reader
->bufferFullSignal
.connect(Tairon::Core::methodFunctor(this, &Connection::readPeerID
));
219 reader
->errorSignal
.connect(Tairon::Core::methodFunctor(this, &Connection::readerError
));
223 /* {{{ Connection::readHave(Tairon::Net::Reader *) */
// Completion handler for the 4-byte payload of a "have" message: marks the
// piece in the peer's bitfield and tells the client.
224 void Connection::readHave(Tairon::Net::Reader
*)
// The value arrives in network byte order. htonl() performs the same byte
// swap as ntohl(), which is what is meant here (readLength/readPiece use ntohl).
226 uint32_t index
= htonl(*(uint32_t *) reader
->getBuffer());
// NOTE(review): index comes straight from the peer and is not range-checked
// here; confirm that BitField::setBit() rejects out-of-range indexes.
227 bitfield
->setBit(index
);
228 client
->gotHave(index
, this);
// Back to reading the next length prefix; lengthReader must not be deleted.
231 reader
= lengthReader
;
232 deleteReader
= false;
236 /* {{{ Connection::readLength(Tairon::Net::Reader *) */
// Completion handler for the 4-byte length prefix that precedes every message.
237 void Connection::readLength(Tairon::Net::Reader
*)
// Convert from network byte order; the value is kept in commandLength for the
// handlers that need the payload size.
239 commandLength
= ntohl(*(uint32_t *) lengthReader
->getBuffer());
240 DEBUG((const char *) String("Got message of length " + String::number(commandLength
)));
242 lengthReader
->reset();
// Next comes the one-byte message id.
// NOTE(review): handling of zero-length (keep-alive) messages is not visible here.
245 reader
= commandReader
;
250 /* {{{ Connection::readPeerID(Tairon::Net::Reader *) */
// Completion handler for the 20-byte peer id that ends the handshake. Sets up
// the two long-lived readers used for the rest of the session and announces
// the finished connection.
251 void Connection::readPeerID(Tairon::Net::Reader
*)
253 DEBUG("got peer id");
254 String
peerID(reader
->getBuffer(), 20);
// Reader for the 4-byte length prefix of each message.
258 lengthReader
= new Tairon::Net::IReader(4, socket
, rlimiter
);
259 lengthReader
->bufferFullSignal
.connect(Tairon::Core::methodFunctor(this, &Connection::readLength
));
260 lengthReader
->errorSignal
.connect(Tairon::Core::methodFunctor(this, &Connection::readerError
));
// Reader for the 1-byte message id; the first id goes to readFirstCommand().
262 commandReader
= new Tairon::Net::IReader(1, socket
, rlimiter
);
263 commandReader
->bufferFullSignal
.connect(Tairon::Core::methodFunctor(this, &Connection::readFirstCommand
));
264 commandReader
->errorSignal
.connect(Tairon::Core::methodFunctor(this, &Connection::readerError
));
// Start with a length prefix; lengthReader is shared, so it must not be deleted.
266 reader
= lengthReader
;
267 deleteReader
= false;
269 TorrentServer::self()->connectionCompleted(this);
270 client
->newConnection(this);
274 /* {{{ Connection::readPiece(Tairon::Net::Reader *) */
// Completion handler for the 8-byte header of a "piece" message: stores the
// piece index and the start offset, updates the request bookkeeping and hands
// over to the client via gotPiece().
275 void Connection::readPiece(Tairon::Net::Reader
*)
// Both fields arrive in network byte order.
277 pieceIndex
= ntohl(*(uint32_t *) reader
->getBuffer());
278 pieceStart
= ntohl(*(uint32_t *) (reader
->getBuffer() + 4));
// Outstanding start offsets requested for this piece (filled by sendRequest()).
// NOTE(review): if requests is a std::map, operator[] creates an empty entry
// for an index that was never requested.
281 std::set
<uint32_t> &r
= requests
[pieceIndex
];
// NOTE(review): the condition guarding this erase is not visible here.
284 requests
.erase(pieceIndex
);
286 deleteReader
= false;
287 client
->gotPiece(pieceIndex
, pieceStart
, this);
291 /* {{{ Connection::readRequest(Tairon::Net::Reader *) */
// Completion handler for the 12-byte payload of a "request" message: three
// 32-bit values (piece index, start offset, length) that are forwarded to the
// client.
292 void Connection::readRequest(Tairon::Net::Reader
*)
// Values arrive in network byte order. htonl() performs the same byte swap as
// ntohl(), which is what is meant here.
294 uint32_t index
= htonl(*(uint32_t *) reader
->getBuffer());
295 uint32_t start
= htonl(*(uint32_t *) (reader
->getBuffer() + 4));
296 uint32_t length
= htonl(*(uint32_t *) (reader
->getBuffer() + 8));
// Back to reading the next length prefix; lengthReader must not be deleted.
299 reader
= lengthReader
;
300 deleteReader
= false;
302 client
->gotRequest(index
, start
, length
, this);
306 /* {{{ Connection::readyRead(Tairon::Net::Socket *) */
// Slot for the socket's readyReadSignal (connected in the constructor).
// A SocketException raised while reading is turned into closedSignal.
// NOTE(review): the statement inside the try block is not visible here;
// presumably it drives the current reader.
307 void Connection::readyRead(Tairon::Net::Socket
*)
311 } catch (const Tairon::Net::SocketException
&) { // connection closed or other error
312 closedSignal
.emit(this);
318 /* {{{ Connection::readyWrite(Tairon::Net::Socket *) */
// Slot for the socket's readyWriteSignal (connected in the constructor).
// Continues writing either the first writer of the first queued piece or the
// first queued command.
// NOTE(review): the condition choosing between the two is not visible here
// (presumably sendingPiece); confirm the chosen queue cannot be empty.
319 void Connection::readyWrite(Tairon::Net::Socket
*)
322 pieceWriters
.front()->writers
.front()->write();
324 commandQueue
.front()->write();
328 /* {{{ Connection::addCommandWriter(Tairon::Net::Writer *) */
// Appends writer to the queue of outgoing command messages.
329 void Connection::addCommandWriter(Tairon::Net::Writer
*writer
)
331 commandQueue
.push_back(writer
);
// Only when this writer is the sole queued command and no piece is being sent
// does the guarded statement run. NOTE(review): that statement is not visible
// here; presumably it starts writing immediately.
332 if ((commandQueue
.size() == 1) && !sendingPiece
) // don't send anything if we are sending something
337 /* {{{ Connection::addWriters(PieceWritersStruct *) */
// Queues the writers that together send one piece and, if the connection is
// idle, starts the first of them.
338 void Connection::addWriters(PieceWritersStruct
*writers
)
340 pieceWriters
.push_back(writers
);
// Every writer of the piece reports progress to pieceDataWritten() and
// failures to writerError().
342 for (std::list
<Tairon::Net::Writer
*>::iterator it
= writers
->writers
.begin(); it
!= writers
->writers
.end(); ++it
) {
343 (*it
)->dataWrittenSignal
.connect(Tairon::Core::methodFunctor(this, &Connection::pieceDataWritten
));
344 (*it
)->errorSignal
.connect(Tairon::Core::methodFunctor(this, &Connection::writerError
));
347 if (!commandQueue
.size() && (pieceWriters
.size() == 1)) { // we aren't sending any command or data right now
// NOTE(review): front() assumes writers->writers is not empty.
349 writers
->writers
.front()->write();
354 /* {{{ Connection::close() */
// Closes the connection. NOTE(review): the body is not visible in this
// listing; confirm what it tears down.
355 void Connection::close()
363 /* {{{ Connection::clearRequested() */
// NOTE(review): the body is not visible in this listing; presumably it
// empties the requests bookkeeping filled by sendRequest(). Confirm.
364 void Connection::clearRequested()
370 /* {{{ Connection::dataWritten(Tairon::Net::Writer *) */
// Slot for a command writer's dataWrittenSignal: removes the finished writer
// from the command queue and starts whatever should be sent next. Commands
// take priority over pieces.
371 void Connection::dataWritten(Tairon::Net::Writer
*writer
)
373 DEBUG("dataWritten");
// NOTE(review): front() is called without checking that commandQueue is
// non-empty; verify that every caller guarantees a queued command.
375 Tairon::Net::Writer
*w
= commandQueue
.front();
376 if (w
== writer
) // is this writer in the queue?
377 commandQueue
.pop_front(); // remove it
380 if (commandQueue
.size()) { // are there some commands to send?
381 sendingPiece
= false;
382 commandQueue
.front()->write();
383 } else if (pieceWriters
.size()) { // let's try to send some piece
385 pieceWriters
.front()->writers
.front()->write();
390 /* {{{ Connection::destroyPieceReaders() */
// Walks every reader in pieceReaders->readers.
// NOTE(review): the loop body is not visible here; presumably it deletes each
// reader. Also confirm pieceReaders cannot be null when this is called.
391 void Connection::destroyPieceReaders()
393 for (std::list
<Tairon::Net::Reader
*>::iterator it
= pieceReaders
->readers
.begin(); it
!= pieceReaders
->readers
.end(); ++it
)
400 /* {{{ Connection::destroyPieceWriters() */
// Walks every queued piece (outer loop) and every writer of that piece (inner
// loop), then empties the queue.
// NOTE(review): the loop bodies are not visible here; presumably they delete
// the writers and the PieceWritersStruct objects.
401 void Connection::destroyPieceWriters()
403 for (std::list
<PieceWritersStruct
*>::iterator it1
= pieceWriters
.begin(); it1
!= pieceWriters
.end(); ++it1
) {
404 for (std::list
<Tairon::Net::Writer
*>::iterator it2
= (*it1
)->writers
.begin(); it2
!= (*it1
)->writers
.end(); ++it2
)
408 pieceWriters
.clear();
412 /* {{{ Connection::getIncomingRate() */
413 double Connection::getIncomingRate()
415 return incomingRate
->getRate();
419 /* {{{ Connection::getPieceLength() */
420 uint32_t Connection::getPieceLength()
422 // 1 byte message type, 4 bytes index, 4 bytes start
423 return commandLength
- 9;
427 /* {{{ Connection::getReadingLimiter() */
// Returns a Tairon::Net::Limiter pointer. NOTE(review): the body is not
// visible here; presumably it returns the file-level rlimiter. Confirm.
428 Tairon::Net::Limiter
*Connection::getReadingLimiter()
434 /* {{{ Connection::getWritingLimiter() */
// Returns a Tairon::Net::Limiter pointer. NOTE(review): the body is not
// visible here; presumably it returns the file-level wlimiter. Confirm.
435 Tairon::Net::Limiter
*Connection::getWritingLimiter()
441 /* {{{ Connection::isSnubbed() */
442 bool Connection::isSnubbed()
444 return Tairon::Net::Timer::currentTime() - lastPieceTime
> 10000; // 10 seconds, should be configurable
448 /* {{{ Connection::pieceDataWritten(Tairon::Net::Writer *) */
// Slot for the dataWrittenSignal of a piece writer (connected in
// addWriters()): drops the finished writer from the piece at the head of the
// queue and either continues with the next writer of that piece or reports
// the piece as sent.
449 void Connection::pieceDataWritten(Tairon::Net::Writer
*writer
)
// NOTE(review): assumes pieceWriters and its first entry's list are non-empty.
451 PieceWritersStruct
*writers
= pieceWriters
.front();
452 writers
->writers
.pop_front();
454 if (writers
->writers
.size()) { // there is still something to send from the piece
455 delete writer
; // delete the current writer
456 writers
->writers
.front()->write();
457 } else { // whole piece has been sent
458 client
->pieceSent(writers
->index
, writers
->length
, this);
459 // the last piece's writer will be deleted in dataWritten() method
461 pieceWriters
.pop_front();
463 sendingPiece
= false;
470 /* {{{ Connection::pieceReaderError(Tairon::Net::Reader *, Tairon::Net::Socket *) */
// Slot for the errorSignal of a piece reader (connected in setReaders()).
// Both arguments are unused; the connection is reported as closed.
// NOTE(review): statements preceding the emit are not visible here.
471 void Connection::pieceReaderError(Tairon::Net::Reader
*, Tairon::Net::Socket
*)
476 closedSignal
.emit(this);
480 /* {{{ Connection::readerError(Tairon::Net::Reader *, Tairon::Net::Socket *) */
// Slot for the errorSignal of the protocol readers created in this file.
// Both arguments are unused; the connection is reported as closed.
// NOTE(review): statements preceding the emit are not visible here.
481 void Connection::readerError(Tairon::Net::Reader
*, Tairon::Net::Socket
*)
485 closedSignal
.emit(this);
489 /* {{{ Connection::sendChoke() */
// Queues a "choke" message for the peer.
// NOTE(review): the statements between the log line and the writer creation
// are not visible here; presumably they guard on and update the choke state.
490 void Connection::sendChoke()
492 DEBUG("sending choke");
// 5 bytes on the wire: length prefix 1, message id 0.
498 Tairon::Net::SWriter
*writer
= new Tairon::Net::SWriter(String("\x00\x00\x00\x01\x00", 5), socket
, wlimiter
);
499 writer
->dataWrittenSignal
.connect(Tairon::Core::methodFunctor(this, &Connection::dataWritten
));
500 writer
->errorSignal
.connect(Tairon::Core::methodFunctor(this, &Connection::writerError
));
502 addCommandWriter(writer
);
506 /* {{{ Connection::sendHandshake(const String &) */
507 void Connection::sendHandshake(const String
&infoHash
)
510 handshake
[0] = (char) 19;
511 memcpy(handshake
+ 1, protocolName
, 19);
512 memset(handshake
+ 20, 0, 8);
513 memcpy(handshake
+ 28, infoHash
.data(), 20);
514 memcpy(handshake
+ 48, TorrentManager::self()->getClientID().data(), 20);
516 String
h(handshake
, 68);
518 Tairon::Net::Writer
*writer
= new Tairon::Net::SWriter(h
, socket
, wlimiter
);
519 writer
->dataWrittenSignal
.connect(Tairon::Core::methodFunctor(this, &Connection::dataWritten
));
520 writer
->errorSignal
.connect(Tairon::Core::methodFunctor(this, &Connection::writerError
));
522 addCommandWriter(writer
);
524 if (client
->getStorage()->hasSomething()) { // we have already downloaded some piece, send bitfield as well
525 DEBUG("sending bitfield");
526 // count length of the bitfield
527 size_t length
= client
->getStorage()->getBitField()->getLength();
528 length
= length
/ 8 + (length
/ 8 ? 1 : 0);
530 char *bf
= new char[5 + length
];
532 uint32_t l
= htonl(1 + length
); // 1 byte message type, rest is bitfield
533 memcpy(bf
, (const char *) (&l
), 4);
534 bf
[4] = 5; // bitfield message
535 memcpy(bf
+ 5, client
->getStorage()->getBitField()->getData(), length
);
537 String
b(bf
, 5 + length
);
538 writer
= new Tairon::Net::SWriter(b
, socket
, wlimiter
);
539 writer
->dataWrittenSignal
.connect(Tairon::Core::methodFunctor(this, &Connection::dataWritten
));
540 writer
->errorSignal
.connect(Tairon::Core::methodFunctor(this, &Connection::writerError
));
542 addCommandWriter(writer
);
547 /* {{{ Connection::sendHave(uint32_t) */
548 void Connection::sendHave(uint32_t index
)
550 DEBUG("sending have");
552 uint32_t i
= htonl(index
);
553 Tairon::Net::SWriter
*writer
= new Tairon::Net::SWriter(String("\x00\x00\x00\x05\x04", 5) + String((const char *) (&i
), 4), socket
, wlimiter
);
554 writer
->dataWrittenSignal
.connect(Tairon::Core::methodFunctor(this, &Connection::dataWritten
));
555 writer
->errorSignal
.connect(Tairon::Core::methodFunctor(this, &Connection::writerError
));
557 addCommandWriter(writer
);
561 /* {{{ Connection::sendInterested() */
// Queues an "interested" message for the peer.
// NOTE(review): the statements between the log line and the writer creation
// are not visible here; presumably they guard on and update the interested flag.
562 void Connection::sendInterested()
564 DEBUG("sending interested");
// 5 bytes on the wire: length prefix 1, message id 2.
570 Tairon::Net::SWriter
*writer
= new Tairon::Net::SWriter(String("\x00\x00\x00\x01\x02", 5), socket
, wlimiter
);
571 writer
->dataWrittenSignal
.connect(Tairon::Core::methodFunctor(this, &Connection::dataWritten
));
572 writer
->errorSignal
.connect(Tairon::Core::methodFunctor(this, &Connection::writerError
));
574 addCommandWriter(writer
);
578 /* {{{ Connection::sendNotInterested() */
// Queues a "not interested" message for the peer.
// NOTE(review): the statements between the log line and the writer creation
// are not visible here; presumably they guard on and update the interested flag.
579 void Connection::sendNotInterested()
581 DEBUG("sending not interested");
// 5 bytes on the wire: length prefix 1, message id 3.
587 Tairon::Net::SWriter
*writer
= new Tairon::Net::SWriter(String("\x00\x00\x00\x01\x03", 5), socket
, wlimiter
);
588 writer
->dataWrittenSignal
.connect(Tairon::Core::methodFunctor(this, &Connection::dataWritten
));
589 writer
->errorSignal
.connect(Tairon::Core::methodFunctor(this, &Connection::writerError
));
591 addCommandWriter(writer
);
595 /* {{{ Connection::sendRequest(uint32_t, uint32_t, uint32_t) */
596 void Connection::sendRequest(uint32_t index
, uint32_t start
, uint32_t length
)
598 requests
[index
].insert(start
);
601 memcpy(msg
, "\x00\x00\x00\x0d\x06", 5);
602 *(uint32_t *) (msg
+ 5) = htonl(index
);
603 *(uint32_t *) (msg
+ 9) = htonl(start
);
604 *(uint32_t *) (msg
+ 13) = htonl(length
);
606 Tairon::Net::SWriter
*writer
= new Tairon::Net::SWriter(String(msg
, 17), socket
, wlimiter
);
607 writer
->dataWrittenSignal
.connect(Tairon::Core::methodFunctor(this, &Connection::dataWritten
));
608 writer
->errorSignal
.connect(Tairon::Core::methodFunctor(this, &Connection::writerError
));
610 addCommandWriter(writer
);
614 /* {{{ Connection::sendUnchoke() */
// Queues an "unchoke" message for the peer.
// NOTE(review): the statements between the log line and the writer creation
// are not visible here; presumably they guard on and update the choke state.
615 void Connection::sendUnchoke()
617 DEBUG("sending unchoke");
// 5 bytes on the wire: length prefix 1, message id 1.
623 Tairon::Net::SWriter
*writer
= new Tairon::Net::SWriter(String("\x00\x00\x00\x01\x01", 5), socket
, wlimiter
);
624 writer
->dataWrittenSignal
.connect(Tairon::Core::methodFunctor(this, &Connection::dataWritten
));
625 writer
->errorSignal
.connect(Tairon::Core::methodFunctor(this, &Connection::writerError
));
627 addCommandWriter(writer
);
631 /* {{{ Connection::setNextReader(Tairon::Net::Reader *) */
// Slot for the bufferFullSignal of a piece reader (connected in
// setReaders()): disposes of the reader that just finished and either moves
// on to the next reader of the piece or, when none is left, reports the piece
// as downloaded.
632 void Connection::setNextReader(Tairon::Net::Reader
*)
// The finished reader is always the one at the head of the list.
634 delete pieceReaders
->readers
.front();
635 pieceReaders
->readers
.pop_front();
637 if (pieceReaders
->readers
.size()) {
638 DEBUG("Setting next piece reader");
639 reader
= pieceReaders
->readers
.front();
641 DEBUG("Piece downloaded");
// Back to reading the next length prefix; lengthReader must not be deleted.
// NOTE(review): what happens to pieceReaders itself is not visible here.
644 reader
= lengthReader
;
645 deleteReader
= false;
646 client
->pieceDownloaded(pieceIndex
, pieceStart
, this);
651 /* {{{ Connection::setReaders(PieceReadersStruct *) */
652 void Connection::setReaders(PieceReadersStruct
*readers
)
654 for (std::list
<Tairon::Net::Reader
*>::iterator it
= readers
->readers
.begin(); it
!= readers
->readers
.end(); ++it
) {
655 (*it
)->bufferFullSignal
.connect(Tairon::Core::methodFunctor(this, &Connection::setNextReader
));
656 (*it
)->errorSignal
.connect(Tairon::Core::methodFunctor(this, &Connection::pieceReaderError
));
658 pieceReaders
= readers
;
659 reader
= pieceReaders
->readers
.front();
663 /* {{{ Connection::socketError(Tairon::Net::Socket *, int) */
// Slot for the socket's errorSignal (connected in the constructor). Both
// arguments are unused; the connection is reported as closed.
// NOTE(review): statements preceding the emit are not visible here.
664 void Connection::socketError(Tairon::Net::Socket
*, int)
668 closedSignal
.emit(this);
672 /* {{{ Connection::writerError(Tairon::Net::Writer *, Tairon::Net::Socket *) */
// Slot for the errorSignal of every writer created in this file. Both
// arguments are unused; the connection is reported as closed.
// NOTE(review): statements preceding the emit are not visible here.
673 void Connection::writerError(Tairon::Net::Writer
*, Tairon::Net::Socket
*)
677 closedSignal
.emit(this);
683 }; // namespace Tairent
685 // vim: ai sw=4 ts=4 noet fdm=marker