Send correct informations to a http tracker.
[tairent.git] / src / main / connection.cpp
blobe22bb2ec356f98e8d61c1a5e207c07dbf1e9450a
1 /***************************************************************************
2 * *
3 * Copyright (C) 2006 David Brodsky *
4 * *
5 * This program is free software; you can redistribute it and/or *
6 * modify it under the terms of the GNU General Public License as *
7 * published by the Free Software Foundation and appearing *
8 * in the file LICENSE.GPL included in the packaging of this file. *
9 * *
10 * This program is distributed in the hope that it will be useful, *
11 * but WITHOUT ANY WARRANTY; without even the implied warranty of *
12 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU *
13 * General Public License for more details. *
14 * *
15 ***************************************************************************/
17 #include <arpa/inet.h>
19 #include <tairon/core/log.h>
20 #include <tairon/core/thread.h>
21 #include <tairon/net/ireader.h>
22 #include <tairon/net/limiter.h>
23 #include <tairon/net/socket.h>
24 #include <tairon/net/swriter.h>
25 #include <tairon/net/timer.h>
27 #include "connection.h"
29 #include "core/bencode.h"
30 #include "core/bitfield.h"
31 #include "ratemeasurer.h"
32 #include "storage.h"
33 #include "torrentclient.h"
34 #include "torrentmanager.h"
35 #include "torrentserver.h"
37 namespace Tairent
40 namespace Main
43 const unsigned int handshakeLength = 48; // without peer id
44 const char *protocolName = "BitTorrent protocol";
46 Tairon::Net::Limiter *rlimiter = 0;
47 Tairon::Net::Limiter *wlimiter = 0;
49 /* {{{ Connection::Connection(int, TorrentClient *) */
50 Connection::Connection(int fd, TorrentClient *c) : bitfield(0), choked(true), client(c), deleteReader(true), interested(false), lastPieceTime(0), peerChoked(true), peerInterested(false), sendingPiece(false)
52 Tairon::Core::Thread *current = Tairon::Core::Thread::current();
54 socket = new Tairon::Net::Socket(fd);
55 socket->errorSignal.connect(Tairon::Core::threadMethodDFunctor(current, this, &Connection::socketError));
56 socket->readyReadSignal.connect(Tairon::Core::threadMethodDFunctor(current, this, &Connection::readyRead));
57 socket->readyWriteSignal.connect(Tairon::Core::threadMethodDFunctor(current, this, &Connection::readyWrite));
58 socket->ready();
60 reader = new Tairon::Net::IReader(handshakeLength, socket, rlimiter);
61 reader->bufferFullSignal.connect(Tairon::Core::methodFunctor(this, &Connection::readHandshake));
62 reader->errorSignal.connect(Tairon::Core::methodFunctor(this, &Connection::readerError));
64 incomingRate = new RateMeasurer(10);
66 /* }}} */
68 /* {{{ Connection::~Connection() */
69 Connection::~Connection()
71 delete incomingRate;
73 /* }}} */
75 /* {{{ Connection::readBitField(Tairon::Net::Reader *) */
76 void Connection::readBitField(Tairon::Net::Reader *)
78 DEBUG("bitfield read");
79 bitfield = new Tairent::Core::BitField(reader->getBuffer(), client->getMetaInfo()["info"]["pieces"].asString().length() / 20);
80 delete reader;
81 reader = lengthReader;
82 deleteReader = false;
84 client->gotBitField(this);
86 /* }}} */
88 /* {{{ Connection::readCommand(Tairon::Net::Reader *) */
89 void Connection::readCommand(Tairon::Net::Reader *)
91 switch (*reader->getBuffer()) {
92 case 0:
93 DEBUG("choke");
94 choked = true;
95 reader = lengthReader;
96 client->gotChoke(this);
97 break;
98 case 1:
99 DEBUG("unchoke");
100 choked = false;
101 reader = lengthReader;
102 client->gotUnchoke(this);
103 break;
104 case 2:
105 DEBUG("interested");
106 reader = lengthReader;
107 peerInterested = true;
108 client->gotInterestedChange(this);
109 break;
110 case 3:
111 DEBUG("not interested");
112 reader = lengthReader;
113 peerInterested = false;
114 client->gotInterestedChange(this);
115 break;
116 case 4:
117 DEBUG("have");
118 reader = new Tairon::Net::IReader(4, socket, rlimiter);
119 reader->bufferFullSignal.connect(Tairon::Core::methodFunctor(this, &Connection::readHave));
120 reader->errorSignal.connect(Tairon::Core::methodFunctor(this, &Connection::readerError));
121 deleteReader = true;
122 break;
123 case 5:
124 DEBUG("bitfield");
125 close();
126 closedSignal.emit(this);
127 break;
128 case 6:
129 DEBUG("request");
130 reader = new Tairon::Net::IReader(12, socket, rlimiter);
131 reader->bufferFullSignal.connect(Tairon::Core::methodFunctor(this, &Connection::readRequest));
132 reader->errorSignal.connect(Tairon::Core::methodFunctor(this, &Connection::readerError));
133 deleteReader = true;
134 break;
135 case 7:
136 DEBUG("piece");
137 reader = new Tairon::Net::IReader(8, socket, rlimiter);
138 reader->bufferFullSignal.connect(Tairon::Core::methodFunctor(this, &Connection::readPiece));
139 reader->errorSignal.connect(Tairon::Core::methodFunctor(this, &Connection::readerError));
140 lastPieceTime = Tairon::Net::Timer::currentTime();
141 deleteReader = true;
142 break;
143 case 8:
144 DEBUG("cancel");
145 break;
146 default:
147 DEBUG((const char *) String("unknown message" + String::number(*reader->getBuffer())));
149 commandReader->reset();
151 /* }}} */
153 /* {{{ Connection::readFirstCommand(Tairon::Net::Reader *) */
154 void Connection::readFirstCommand(Tairon::Net::Reader *)
156 commandReader->bufferFullSignal.clear();
157 commandReader->bufferFullSignal.connect(Tairon::Core::methodFunctor(this, &Connection::readCommand));
159 if (*reader->getBuffer() == 5) { // bitfield
160 DEBUG("bitfield");
162 // check if the peer isn't sending fake data
163 uint32_t len = client->getMetaInfo()["info"]["pieces"].asString().length() / 20;
164 if ((commandLength - 1) != (len / 8 + (len % 8 ? 1 : 0))) {
165 ERROR("Peer sent an invalid bitfield length");
166 socket->close();
167 closedSignal.emit(this);
168 return;
171 reader = new Tairon::Net::IReader(commandLength - 1, socket, rlimiter);
172 reader->bufferFullSignal.connect(Tairon::Core::methodFunctor(this, &Connection::readBitField));
173 reader->errorSignal.connect(Tairon::Core::methodFunctor(this, &Connection::readerError));
175 deleteReader = true;
176 } else {
177 bitfield = new Tairent::Core::BitField(client->getMetaInfo()["info"]["pieces"].asString().length() / 20);
178 readCommand(reader);
181 commandReader->reset();
183 /* }}} */
185 /* {{{ Connection::readHandshake(Tairon::Net::Reader *) */
186 void Connection::readHandshake(Tairon::Net::Reader *)
188 DEBUG("got handshake 1");
190 const char *buffer = reader->getBuffer();
192 if (*buffer != 19) { // length of the protocol name
193 WARNING("Invalid protocol length");
194 closedSignal.emit(this);
195 close();
198 if (strstr(buffer + 1, protocolName) != (buffer + 1)) { // invalid protocol name
199 WARNING("Invalid protocol name");
200 closedSignal.emit(this);
201 close();
204 DEBUG("Protocol ok");
206 String infoHash(buffer + 28, 20);
207 if (client) {
208 // we are initializing the connection
209 } else {
210 DEBUG("sending handshake");
211 client = TorrentManager::self()->getClient(infoHash);
212 sendHandshake(infoHash);
215 delete reader;
217 reader = new Tairon::Net::IReader(20, socket, rlimiter);
218 reader->bufferFullSignal.connect(Tairon::Core::methodFunctor(this, &Connection::readPeerID));
219 reader->errorSignal.connect(Tairon::Core::methodFunctor(this, &Connection::readerError));
221 /* }}} */
223 /* {{{ Connection::readHave(Tairon::Net::Reader *) */
224 void Connection::readHave(Tairon::Net::Reader *)
226 uint32_t index = htonl(*(uint32_t *) reader->getBuffer());
227 bitfield->setBit(index);
228 client->gotHave(index, this);
230 delete reader;
231 reader = lengthReader;
232 deleteReader = false;
234 /* }}} */
236 /* {{{ Connection::readLength(Tairon::Net::Reader *) */
237 void Connection::readLength(Tairon::Net::Reader *)
239 commandLength = ntohl(*(uint32_t *) lengthReader->getBuffer());
240 DEBUG((const char *) String("Got message of length " + String::number(commandLength)));
242 lengthReader->reset();
244 if (commandLength)
245 reader = commandReader;
246 // else keepalive
248 /* }}} */
250 /* {{{ Connection::readPeerID(Tairon::Net::Reader *) */
251 void Connection::readPeerID(Tairon::Net::Reader *)
253 DEBUG("got peer id");
254 String peerID(reader->getBuffer(), 20);
256 delete reader;
258 lengthReader = new Tairon::Net::IReader(4, socket, rlimiter);
259 lengthReader->bufferFullSignal.connect(Tairon::Core::methodFunctor(this, &Connection::readLength));
260 lengthReader->errorSignal.connect(Tairon::Core::methodFunctor(this, &Connection::readerError));
262 commandReader = new Tairon::Net::IReader(1, socket, rlimiter);
263 commandReader->bufferFullSignal.connect(Tairon::Core::methodFunctor(this, &Connection::readFirstCommand));
264 commandReader->errorSignal.connect(Tairon::Core::methodFunctor(this, &Connection::readerError));
266 reader = lengthReader;
267 deleteReader = false;
269 TorrentServer::self()->connectionCompleted(this);
270 client->newConnection(this);
272 /* }}} */
274 /* {{{ Connection::readPiece(Tairon::Net::Reader *) */
275 void Connection::readPiece(Tairon::Net::Reader *)
277 pieceIndex = ntohl(*(uint32_t *) reader->getBuffer());
278 pieceStart = ntohl(*(uint32_t *) (reader->getBuffer() + 4));
279 delete reader;
281 std::set<uint32_t> &r = requests[pieceIndex];
282 r.erase(pieceStart);
283 if (!r.size())
284 requests.erase(pieceIndex);
286 deleteReader = false;
287 client->gotPiece(pieceIndex, pieceStart, this);
289 /* }}} */
291 /* {{{ Connection::readRequest(Tairon::Net::Reader *) */
292 void Connection::readRequest(Tairon::Net::Reader *)
294 uint32_t index = htonl(*(uint32_t *) reader->getBuffer());
295 uint32_t start = htonl(*(uint32_t *) (reader->getBuffer() + 4));
296 uint32_t length = htonl(*(uint32_t *) (reader->getBuffer() + 8));
298 delete reader;
299 reader = lengthReader;
300 deleteReader = false;
302 client->gotRequest(index, start, length, this);
304 /* }}} */
306 /* {{{ Connection::readyRead(Tairon::Net::Socket *) */
307 void Connection::readyRead(Tairon::Net::Socket *)
309 try {
310 reader->read();
311 } catch (const Tairon::Net::SocketException &) { // connection closed or other error
312 closedSignal.emit(this);
313 close();
316 /* }}} */
318 /* {{{ Connection::readyWrite(Tairon::Net::Socket *) */
319 void Connection::readyWrite(Tairon::Net::Socket *)
321 if (sendingPiece)
322 pieceWriters.front()->writers.front()->write();
323 else
324 commandQueue.front()->write();
326 /* }}} */
328 /* {{{ Connection::addCommandWriter(Tairon::Net::Writer *) */
329 void Connection::addCommandWriter(Tairon::Net::Writer *writer)
331 commandQueue.push_back(writer);
332 if ((commandQueue.size() == 1) && !sendingPiece) // don't send anything if we are sending something
333 writer->write();
335 /* }}} */
337 /* {{{ Connection::addWriters(PieceWritersStruct *) */
338 void Connection::addWriters(PieceWritersStruct *writers)
340 pieceWriters.push_back(writers);
342 for (std::list<Tairon::Net::Writer *>::iterator it = writers->writers.begin(); it != writers->writers.end(); ++it) {
343 (*it)->dataWrittenSignal.connect(Tairon::Core::methodFunctor(this, &Connection::pieceDataWritten));
344 (*it)->errorSignal.connect(Tairon::Core::methodFunctor(this, &Connection::writerError));
347 if (!commandQueue.size() && (pieceWriters.size() == 1)) { // we aren't sending any command or data right now
348 sendingPiece = true;
349 writers->writers.front()->write();
352 /* }}} */
354 /* {{{ Connection::close() */
355 void Connection::close()
357 if (deleteReader)
358 delete reader;
359 socket->close();
361 /* }}} */
363 /* {{{ Connection::clearRequested() */
364 void Connection::clearRequested()
366 requests.clear();
368 /* }}} */
370 /* {{{ Connection::dataWritten(Tairon::Net::Writer *) */
371 void Connection::dataWritten(Tairon::Net::Writer *writer)
373 DEBUG("dataWritten");
375 Tairon::Net::Writer *w = commandQueue.front();
376 if (w == writer) // is this writer in the queue?
377 commandQueue.pop_front(); // remove it
378 delete writer;
380 if (commandQueue.size()) { // are there some commands to send?
381 sendingPiece = false;
382 commandQueue.front()->write();
383 } else if (pieceWriters.size()) { // let's try to send some piece
384 sendingPiece = true;
385 pieceWriters.front()->writers.front()->write();
388 /* }}} */
390 /* {{{ Connection::destroyPieceReaders() */
391 void Connection::destroyPieceReaders()
393 for (std::list<Tairon::Net::Reader *>::iterator it = pieceReaders->readers.begin(); it != pieceReaders->readers.end(); ++it)
394 delete *it;
395 delete pieceReaders;
396 pieceReaders = 0;
398 /* }}} */
400 /* {{{ Connection::destroyPieceWriters() */
401 void Connection::destroyPieceWriters()
403 for (std::list<PieceWritersStruct *>::iterator it1 = pieceWriters.begin(); it1 != pieceWriters.end(); ++it1) {
404 for (std::list<Tairon::Net::Writer *>::iterator it2 = (*it1)->writers.begin(); it2 != (*it1)->writers.end(); ++it2)
405 delete *it2;
406 delete *it1;
408 pieceWriters.clear();
410 /* }}} */
412 /* {{{ Connection::getIncomingRate() */
413 double Connection::getIncomingRate()
415 return incomingRate->getRate();
417 /* }}} */
419 /* {{{ Connection::getPieceLength() */
420 uint32_t Connection::getPieceLength()
422 // 1 byte message type, 4 bytes index, 4 bytes start
423 return commandLength - 9;
425 /* }}} */
427 /* {{{ Connection::getReadingLimiter() */
428 Tairon::Net::Limiter *Connection::getReadingLimiter()
430 return rlimiter;
432 /* }}} */
434 /* {{{ Connection::getWritingLimiter() */
435 Tairon::Net::Limiter *Connection::getWritingLimiter()
437 return wlimiter;
439 /* }}} */
441 /* {{{ Connection::isSnubbed() */
442 bool Connection::isSnubbed()
444 return Tairon::Net::Timer::currentTime() - lastPieceTime > 10000; // 10 seconds, should be configurable
446 /* }}} */
448 /* {{{ Connection::pieceDataWritten(Tairon::Net::Writer *) */
449 void Connection::pieceDataWritten(Tairon::Net::Writer *writer)
451 PieceWritersStruct *writers = pieceWriters.front();
452 writers->writers.pop_front();
454 if (writers->writers.size()) { // there is still something to send from the piece
455 delete writer; // delete the current writer
456 writers->writers.front()->write();
457 } else { // whole piece has been sent
458 client->pieceSent(writers->index, writers->length, this);
459 // the last piece's writer will be deleted in dataWritten() method
460 delete writers;
461 pieceWriters.pop_front();
463 sendingPiece = false;
465 dataWritten(writer);
468 /* }}} */
470 /* {{{ Connection::pieceReaderError(Tairon::Net::Reader *, Tairon::Net::Socket *) */
471 void Connection::pieceReaderError(Tairon::Net::Reader *, Tairon::Net::Socket *)
473 if (deleteReader)
474 delete reader;
476 closedSignal.emit(this);
478 /* }}} */
480 /* {{{ Connection::readerError(Tairon::Net::Reader *, Tairon::Net::Socket *) */
481 void Connection::readerError(Tairon::Net::Reader *, Tairon::Net::Socket *)
483 if (deleteReader)
484 delete reader;
485 closedSignal.emit(this);
487 /* }}} */
489 /* {{{ Connection::sendChoke() */
490 void Connection::sendChoke()
492 DEBUG("sending choke");
494 if (peerChoked)
495 return;
497 peerChoked = true;
498 Tairon::Net::SWriter *writer = new Tairon::Net::SWriter(String("\x00\x00\x00\x01\x00", 5), socket, wlimiter);
499 writer->dataWrittenSignal.connect(Tairon::Core::methodFunctor(this, &Connection::dataWritten));
500 writer->errorSignal.connect(Tairon::Core::methodFunctor(this, &Connection::writerError));
502 addCommandWriter(writer);
504 /* }}} */
506 /* {{{ Connection::sendHandshake(const String &) */
507 void Connection::sendHandshake(const String &infoHash)
509 char handshake[68];
510 handshake[0] = (char) 19;
511 memcpy(handshake + 1, protocolName, 19);
512 memset(handshake + 20, 0, 8);
513 memcpy(handshake + 28, infoHash.data(), 20);
514 memcpy(handshake + 48, TorrentManager::self()->getClientID().data(), 20);
516 String h(handshake, 68);
518 Tairon::Net::Writer *writer = new Tairon::Net::SWriter(h, socket, wlimiter);
519 writer->dataWrittenSignal.connect(Tairon::Core::methodFunctor(this, &Connection::dataWritten));
520 writer->errorSignal.connect(Tairon::Core::methodFunctor(this, &Connection::writerError));
522 addCommandWriter(writer);
524 if (client->getStorage()->hasSomething()) { // we have already downloaded some piece, send bitfield as well
525 DEBUG("sending bitfield");
526 // count length of the bitfield
527 size_t length = client->getStorage()->getBitField()->getLength();
528 length = length / 8 + (length / 8 ? 1 : 0);
530 char *bf = new char[5 + length];
532 uint32_t l = htonl(1 + length); // 1 byte message type, rest is bitfield
533 memcpy(bf, (const char *) (&l), 4);
534 bf[4] = 5; // bitfield message
535 memcpy(bf + 5, client->getStorage()->getBitField()->getData(), length);
537 String b(bf, 5 + length);
538 writer = new Tairon::Net::SWriter(b, socket, wlimiter);
539 writer->dataWrittenSignal.connect(Tairon::Core::methodFunctor(this, &Connection::dataWritten));
540 writer->errorSignal.connect(Tairon::Core::methodFunctor(this, &Connection::writerError));
542 addCommandWriter(writer);
545 /* }}} */
547 /* {{{ Connection::sendHave(uint32_t) */
548 void Connection::sendHave(uint32_t index)
550 DEBUG("sending have");
552 uint32_t i = htonl(index);
553 Tairon::Net::SWriter *writer = new Tairon::Net::SWriter(String("\x00\x00\x00\x05\x04", 5) + String((const char *) (&i), 4), socket, wlimiter);
554 writer->dataWrittenSignal.connect(Tairon::Core::methodFunctor(this, &Connection::dataWritten));
555 writer->errorSignal.connect(Tairon::Core::methodFunctor(this, &Connection::writerError));
557 addCommandWriter(writer);
559 /* }}} */
561 /* {{{ Connection::sendInterested() */
562 void Connection::sendInterested()
564 DEBUG("sending interested");
566 if (interested)
567 return;
569 interested = true;
570 Tairon::Net::SWriter *writer = new Tairon::Net::SWriter(String("\x00\x00\x00\x01\x02", 5), socket, wlimiter);
571 writer->dataWrittenSignal.connect(Tairon::Core::methodFunctor(this, &Connection::dataWritten));
572 writer->errorSignal.connect(Tairon::Core::methodFunctor(this, &Connection::writerError));
574 addCommandWriter(writer);
576 /* }}} */
578 /* {{{ Connection::sendNotInterested() */
579 void Connection::sendNotInterested()
581 DEBUG("sending not interested");
583 if (!interested)
584 return;
586 interested = false;
587 Tairon::Net::SWriter *writer = new Tairon::Net::SWriter(String("\x00\x00\x00\x01\x03", 5), socket, wlimiter);
588 writer->dataWrittenSignal.connect(Tairon::Core::methodFunctor(this, &Connection::dataWritten));
589 writer->errorSignal.connect(Tairon::Core::methodFunctor(this, &Connection::writerError));
591 addCommandWriter(writer);
593 /* }}} */
595 /* {{{ Connection::sendRequest(uint32_t, uint32_t, uint32_t) */
596 void Connection::sendRequest(uint32_t index, uint32_t start, uint32_t length)
598 requests[index].insert(start);
600 char msg[17];
601 memcpy(msg, "\x00\x00\x00\x0d\x06", 5);
602 *(uint32_t *) (msg + 5) = htonl(index);
603 *(uint32_t *) (msg + 9) = htonl(start);
604 *(uint32_t *) (msg + 13) = htonl(length);
606 Tairon::Net::SWriter *writer = new Tairon::Net::SWriter(String(msg, 17), socket, wlimiter);
607 writer->dataWrittenSignal.connect(Tairon::Core::methodFunctor(this, &Connection::dataWritten));
608 writer->errorSignal.connect(Tairon::Core::methodFunctor(this, &Connection::writerError));
610 addCommandWriter(writer);
612 /* }}} */
614 /* {{{ Connection::sendUnchoke() */
615 void Connection::sendUnchoke()
617 DEBUG("sending unchoke");
619 if (!peerChoked)
620 return;
622 peerChoked = false;
623 Tairon::Net::SWriter *writer = new Tairon::Net::SWriter(String("\x00\x00\x00\x01\x01", 5), socket, wlimiter);
624 writer->dataWrittenSignal.connect(Tairon::Core::methodFunctor(this, &Connection::dataWritten));
625 writer->errorSignal.connect(Tairon::Core::methodFunctor(this, &Connection::writerError));
627 addCommandWriter(writer);
629 /* }}} */
631 /* {{{ Connection::setNextReader(Tairon::Net::Reader *) */
632 void Connection::setNextReader(Tairon::Net::Reader *)
634 delete pieceReaders->readers.front();
635 pieceReaders->readers.pop_front();
637 if (pieceReaders->readers.size()) {
638 DEBUG("Setting next piece reader");
639 reader = pieceReaders->readers.front();
640 } else {
641 DEBUG("Piece downloaded");
642 delete pieceReaders;
643 pieceReaders = 0;
644 reader = lengthReader;
645 deleteReader = false;
646 client->pieceDownloaded(pieceIndex, pieceStart, this);
649 /* }}} */
651 /* {{{ Connection::setReaders(PieceReadersStruct *) */
652 void Connection::setReaders(PieceReadersStruct *readers)
654 for (std::list<Tairon::Net::Reader *>::iterator it = readers->readers.begin(); it != readers->readers.end(); ++it) {
655 (*it)->bufferFullSignal.connect(Tairon::Core::methodFunctor(this, &Connection::setNextReader));
656 (*it)->errorSignal.connect(Tairon::Core::methodFunctor(this, &Connection::pieceReaderError));
658 pieceReaders = readers;
659 reader = pieceReaders->readers.front();
661 /* }}} */
663 /* {{{ Connection::socketError(Tairon::Net::Socket *, int) */
664 void Connection::socketError(Tairon::Net::Socket *, int)
666 delete reader;
667 socket->close();
668 closedSignal.emit(this);
670 /* }}} */
672 /* {{{ Connection::writerError(Tairon::Net::Writer *, Tairon::Net::Socket *) */
673 void Connection::writerError(Tairon::Net::Writer *, Tairon::Net::Socket *)
675 if (deleteReader)
676 delete reader;
677 closedSignal.emit(this);
679 /* }}} */
681 }; // namespace Main
683 }; // namespace Tairent
685 // vim: ai sw=4 ts=4 noet fdm=marker