1 /***************************************************************************
3 * Copyright (C) 2006 David Brodsky *
5 * This program is free software; you can redistribute it and/or *
6 * modify it under the terms of the GNU General Public License as *
7 * published by the Free Software Foundation and appearing *
8 * in the file LICENSE.GPL included in the packaging of this file. *
10 * This program is distributed in the hope that it will be useful, *
11 * but WITHOUT ANY WARRANTY; without even the implied warranty of *
12 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU *
13 * General Public License for more details. *
15 ***************************************************************************/
17 #include <arpa/inet.h>
22 #include <tairon/core/log.h>
23 #include <tairon/core/tinyxml.h>
24 #include <tairon/net/ireader.h>
25 #include <tairon/net/mreader.h>
26 #include <tairon/net/mwriter.h>
27 #include <tairon/net/swriter.h>
31 #include "core/bitfield.h"
32 #include "connection.h"
33 #include "hashchecker.h"
34 #include "torrentmanager.h"
36 #define min(x, y) (x) < (y) ? x : y
44 const uint32_t chunkLength
= 16384;
46 /* {{{ Storage::Storage(const Tairent::Core::BEncode &) */
47 Storage::Storage(const Tairent::Core::BEncode
&i
, TiXmlNode
*n
) : info(i
)
49 pieceCount
= info
["pieces"].asString().length() / 20;
50 pieceLength
= info
["piece length"].asValue();
55 availability
= new unsigned short[pieceCount
];
56 memset(availability
, 0, pieceCount
* sizeof(unsigned short));
58 if (n
) // load fast resume
60 else // we're starting from the beginning
61 bitfield
= new Tairent::Core::BitField(pieceCount
);
67 /* {{{ Storage::~Storage() */
71 delete [] availability
;
73 for (std::map
<uint32_t, RequestStruct
*>::const_iterator it
= requests
.begin(); it
!= requests
.end(); ++it
)
78 /* {{{ Storage::addBitField(Tairent::Core::BitField *) */
79 void Storage::addBitField(Tairent::Core::BitField
*b
)
81 for (size_t i
= 0; i
< pieceCount
; ++i
)
87 /* {{{ Storage::buildFilesList() */
88 void Storage::buildFilesList()
90 if (info
.asMap().count("length")) {
91 totalLength
= info
["length"].asValue();
92 FilePositionStruct fs
= {
93 info
["name"].asString(),
94 info
["length"].asValue(),
97 filesPositions
.push_back(fs
);
100 const Tairent::Core::BEncode::List
&f
= info
["files"].asList();
101 for (Tairent::Core::BEncode::List::const_iterator it
= f
.begin(); it
!= f
.end(); ++it
) {
102 FilePositionStruct fs
= {
103 listToString((*it
)["path"].asList()),
104 (*it
)["length"].asValue(),
107 filesPositions
.push_back(fs
);
108 totalLength
+= (*it
)["length"].asValue();
114 /* {{{ Storage::closeFile(const String &) */
115 void Storage::closeFile(const String
&filename
)
117 if (!files
.count(filename
))
118 return; // TODO: throw an exception
120 FileStruct
&fs
= files
[filename
];
124 files
.erase(filename
);
129 /* {{{ Storage::createFile(const String &, uint64_t) */
130 void Storage::createFile(const String
&name
, uint64_t length
)
132 // TODO: error checking
133 int fd
= open(name
, O_CREAT
| O_WRONLY
| O_LARGEFILE
, 0666);
134 ftruncate64(fd
, length
);
139 /* {{{ Storage::createFiles() */
140 void Storage::createFiles()
142 DEBUG("Creating files");
143 if (info
.asMap().count("length")) {
144 createFile(info
["name"].asString(), info
["length"].asValue());
145 // TODO: error checking
147 const Tairent::Core::BEncode::List
&f
= info
["files"].asList();
148 mkdir(info
["name"].asString(), 0777);
149 for (Tairent::Core::BEncode::List::const_iterator it
= f
.begin(); it
!= f
.end(); ++it
) {
150 String name
= info
["name"].asString();
151 const Tairent::Core::BEncode::List
&path
= (*it
)["path"].asList();
152 Tairent::Core::BEncode::List::const_iterator end
= --path
.end();
153 Tairent::Core::BEncode::List::const_iterator dir
;
154 for (dir
= path
.begin(); dir
!= end
; ++dir
) {
156 name
+= dir
->asString();
160 name
+= dir
->asString();
161 createFile(name
, (*it
)["length"].asValue());
167 /* {{{ Storage::getChunkLength(uint32_t, uint32_t) */
168 uint32_t Storage::getChunkLength(uint32_t index
, uint32_t start
)
170 return min(chunkLength
, getPieceLength(index
) - start
);
174 /* {{{ Storage::getPiece(uint32_t) */
175 PieceStruct
*Storage::getPiece(uint32_t index
)
177 if (pieces
.count(index
))
178 return pieces
[index
];
183 /* {{{ Storage::getPieceLength(uint32_t) */
184 uint32_t Storage::getPieceLength(uint32_t index
)
186 if (index
== (pieceCount
- 1)) // last piece
187 if (totalLength
% pieceLength
) // the last piece is shorter
188 return totalLength
% pieceLength
;
189 // else: last piece has the same length as any other
194 /* {{{ Storage::getRemainingSize() */
195 uint64_t Storage::getRemainingSize()
197 uint64_t ret
= bitfield
->getRemaining() * pieceLength
;
198 if (!bitfield
->getBit(pieceCount
- 1))
199 ret
- pieceLength
+ getPieceLength(pieceCount
- 1);
204 /* {{{ Storage::gotHave(uint32_t) */
205 bool Storage::gotHave(uint32_t index
)
207 availability
[index
] += 1;
208 if (bitfield
->getBit(index
)) // we already have this piece
214 /* {{{ Storage::gotPiece(uint32_t, uint32_t, Connection *) */
215 void Storage::gotPiece(uint32_t index
, uint32_t start
, Connection
*c
)
217 if ((index
>= pieceCount
) || (start
+ getChunkLength(index
, start
) > getPieceLength(index
)) || (c
->getPieceLength() != getChunkLength(index
, start
))) {
218 WARNING("Client sent an invalid piece");
219 invalidDataSignal
.emit(c
);
223 if (!requests
.count(index
) || !requests
[index
]->requested
.count(start
)) { // discarded
224 // create fake reader
225 PieceReadersStruct
*pr
= new PieceReadersStruct
;
228 pr
->readers
.push_back(new Tairon::Net::IReader(c
->getPieceLength(), c
->getSocket(), c
->getReadingLimiter()));
233 // We need to get files to the memory
236 PieceStruct
*ps
= pieces
[index
];
238 PieceReadersStruct
*pr
= new PieceReadersStruct
;
241 Tairon::Net::Reader
*reader
;
243 std::list
<PieceStruct::FilePieceStruct
>::iterator it
= ps
->pieces
.begin();
244 uint32_t position
= start
;
246 // skip uninteresting files
247 while (it
->start
+ it
->length
< position
)
250 // Just iterate over the mapped files and create readers for them. The
251 // difficult part is counting the correct length of the allocated readers.
252 // A picture should make this clean:
255 // +-- ... --+------ ... -----------------------------------------+-- ...
257 // +---... --+------ ... -----------------------------------------+-- ...
259 // starting position of actual position start + length
260 // the file within a piece |<- length of the chunk if ->|
261 // | the file is shorter than |
262 // | what we need; otherwise we |
263 // | use the remaining count |
265 uint32_t remaining
= c
->getPieceLength();
267 uint32_t len
= min(it
->start
+ it
->length
- position
, remaining
);
268 reader
= new Tairon::Net::MReader(it
->mem
+ position
- it
->start
, len
, c
->getSocket(), c
->getReadingLimiter());
269 pr
->readers
.push_back(reader
);
279 /* {{{ Storage::gotRequest(uint32_t, uint32_t, uint32_t, Connection *) */
280 void Storage::gotRequest(uint32_t index
, uint32_t start
, uint32_t length
, Connection
*c
)
282 if ((index
>= pieceCount
) || (start
+ length
>= getPieceLength(index
))) {
283 WARNING("Client sent an invalid request");
284 invalidDataSignal
.emit(c
);
288 // map the piece to the memory
291 PieceStruct
*ps
= pieces
[index
];
293 PieceWritersStruct
*writers
= new PieceWritersStruct
;
294 writers
->index
= index
;
295 writers
->length
= length
;
296 Tairon::Net::Writer
*writer
;
298 std::list
<PieceStruct::FilePieceStruct
>::iterator it
= ps
->pieces
.begin();
299 uint32_t position
= start
;
301 // skip uninteresting files
302 while (it
->start
+ it
->length
< position
)
305 // create writer for the message
306 char header
[13]; // 4 bytes msg length, 1 byte command and 2 x 4 bytes for index and start
307 uint32_t num
= htonl(length
+ 9);
308 memcpy(header
, (const char *) (&num
), 4);
309 header
[4] = 7; // piece type
311 memcpy(header
+ 5, (const char *) (&num
), 4);
313 memcpy(header
+ 9, (const char *) (&num
), 4);
314 writer
= new Tairon::Net::SWriter(String(header
, 13), c
->getSocket(), c
->getWritingLimiter());
315 writers
->writers
.push_back(writer
);
317 // create writers for the piece
318 uint32_t remaining
= length
;
320 uint32_t len
= min(it
->start
+ it
->length
- position
, remaining
);
321 writer
= new Tairon::Net::MWriter(it
->mem
+ position
- it
->start
, len
, c
->getSocket(), c
->getWritingLimiter());
322 writers
->writers
.push_back(writer
);
327 if ((it
== ps
->pieces
.end()) && remaining
) {
328 // we are at the end of the piece and there are still some bytes left?
329 // destroy created writers and unmap piece
330 for (std::list
<Tairon::Net::Writer
*>::iterator it
= writers
->writers
.begin(); it
!= writers
->writers
.end(); ++it
)
340 // send it to the peer
341 c
->setWriters(writers
);
345 /* {{{ Storage::hashCorrect(uint32_t, HashChecker *) */
346 void Storage::hashCorrect(uint32_t index
, HashChecker
*checker
)
348 hashCheckers
.erase(checker
);
350 bitfield
->setBit(index
);
351 pieceDownloadedSignal
.emit(index
);
356 /* {{{ Storage::hashIncorrect(uint32_t, HashChecker *) */
357 void Storage::hashIncorrect(uint32_t index
, HashChecker
*checker
)
359 scrambled
.push_back(index
);
360 DEBUG("hash incorrect");
361 hashCheckers
.erase(checker
);
367 /* {{{ Storage::hasSomething() */
368 bool Storage::hasSomething()
370 if (pieceCount
- bitfield
->getRemaining())
376 /* {{{ Storage::listToString(const Tairent::Core::BEncode::List &) */
377 String
Storage::listToString(const Tairent::Core::BEncode::List
&l
)
379 String ret
= info
["name"].asString();
380 for (Tairent::Core::BEncode::List::const_iterator it
= l
.begin(); it
!= l
.end(); ++it
) {
382 ret
+= it
->asString();
388 /* {{{ Storage::load(TiXmlNode *) */
389 void Storage::load(TiXmlNode
*n
)
391 TiXmlElement
*root
= n
->ToElement();
393 for (TiXmlNode
*node
= root
->FirstChild(); node
; node
= node
->NextSibling()) {
394 if (node
->Type() != TiXmlNode::ELEMENT
)
397 if (node
->ValueStr() == "bitfield")
399 else if (node
->ValueStr() == "queued")
405 /* {{{ Storage::loadBitField(TiXmlNode *) */
406 void Storage::loadBitField(TiXmlNode
*n
)
408 TiXmlElement
*element
= n
->ToElement();
410 const char *t
= element
->GetText();
412 WARNING("Cannot load bitfield");
416 bitfield
= new Tairent::Core::BitField(TorrentManager::hexToBin(t
), pieceCount
);
420 /* {{{ Storage::loadQueued(TiXmlNode *) */
421 void Storage::loadQueued(TiXmlNode
*n
)
423 TiXmlElement
*element
= n
->ToElement();
425 const char *i
= element
->Attribute("index");
427 WARNING("Missing index for queued pieces");
435 num
= strtoll(i
, 0, 10); // get the index
436 if (errno
) { // error while converting
437 WARNING("Error while converting piece index");
439 } else if ((num
< 0) || (num
>= pieceCount
)) { // out of range
440 WARNING((const char *) String("Invalid piece index: " + String::number(num
)));
443 index
= num
& 0xffffffff;
445 const char *t
= element
->GetText();
447 WARNING((const char *) String("No queued piece for index: " + String::number(index
)));
453 RequestStruct
*rs
= new RequestStruct
;
455 // go through the list
458 num
= strtoll(t
, &end
, 10);
461 WARNING((const char *) String("Cannot load queued index: " + String::number(index
)));
464 piece
= num
& 0xffffffff;
466 rs
->queued
.insert(piece
);
468 t
= end
+ 1; // skip ','
469 } while (!errno
&& *end
);
471 requests
[index
] = rs
;
475 /* {{{ Storage::mapPiece(uint32_t) */
476 void Storage::mapPiece(uint32_t index
)
478 // TODO: index checking
480 if (pieces
.count(index
)) {
481 pieces
[index
]->refCount
++;
485 uint64_t currentPos
= index
* pieceLength
;
486 uint64_t finalPos
= currentPos
+ getPieceLength(index
);
488 std::list
<FilePositionStruct
>::iterator it
= filesPositions
.begin();
490 // find the first file that contains our index
491 while ((it
->start
+ it
->length
) < currentPos
)
494 PieceStruct
*ps
= new PieceStruct
;
495 PieceStruct::FilePieceStruct fps
;
497 // Now begins the tricky part. We need few informations that will be used
498 // later for creating readers/writers. The first case is somewhat special,
499 // because a piece can start (and usually does) in the middle of a file.
500 // The second part just maps pieces at the beginning of the files, so we
501 // don't need to do the getpagesize() stuff.
503 int fd
= openFile(it
->name
);
504 fps
.filename
= &it
->name
;
506 // Length of the allocated block. Because mmapped block must start at a
507 // multiple of page size we need to add this size. This size will be passed
508 // to the unmap function.
509 // The first part computes the ramaining length of the file aligned to the
510 // page size. The second one just aligns length of a piece to the page
512 fps
.privLength
= min(it
->start
+ it
->length
- currentPos
+ ((currentPos
- it
->start
) % getpagesize()), pieceLength
+ (currentPos
- it
->start
) % getpagesize());
514 // Map the piece! Length of the piece was counted above. Offset is just a
515 // current position of the piece in bytes minus starting position of the
516 // file and it is aligned to the page size.
517 fps
.priv
= mmap64(0, fps
.privLength
, PROT_READ
| PROT_WRITE
, MAP_SHARED
, fd
, currentPos
- it
->start
- (currentPos
- it
->start
) % getpagesize());
519 // Real starting position. We need to add previously subtracted page size
521 fps
.mem
= (char *) fps
.priv
+ (currentPos
- it
->start
) % getpagesize();
523 // The first piece always starts at position 0.
526 // Length of the piece is the length of the mapped block minus page size
528 fps
.length
= fps
.privLength
- (currentPos
- it
->start
) % getpagesize();
530 // And, at last, add this piece to the list.
531 ps
->pieces
.push_back(fps
);
533 // Map rest of the piece. We can reuse the fps struct because a copy of it
534 // is already in the list.
535 currentPos
+= fps
.length
;
537 while (currentPos
< finalPos
) {
539 int fd
= openFile(it
->name
);
540 fps
.filename
= &it
->name
;
541 fps
.start
+= fps
.length
;
542 fps
.length
= min(finalPos
- currentPos
, it
->length
);
543 fps
.privLength
= fps
.length
;
544 fps
.priv
= mmap64(0, fps
.length
, PROT_READ
| PROT_WRITE
, MAP_SHARED
, fd
, 0);
545 fps
.mem
= (char *) fps
.priv
;
546 ps
->pieces
.push_back(fps
);
547 currentPos
+= fps
.length
;
556 /* {{{ Storage::openFile(const String &) */
557 int Storage::openFile(const String
&filename
)
559 if (files
.count(filename
)) {
560 FileStruct
&fs
= files
[filename
];
565 open(filename
, O_CREAT
| O_RDWR
| O_LARGEFILE
, 0666),
568 // TODO: error checking
569 files
[filename
] = fs
;
574 /* {{{ Storage::pickPiece(Tairent::Core::BitField *, uint32_t &, uint32_t &, uint32_t &) */
575 bool Storage::pickPiece(Tairent::Core::BitField
*b
, uint32_t &index
, uint32_t &start
, uint32_t &length
)
577 // first try to find a piece in already requested ones.
578 for (std::map
<uint32_t, RequestStruct
*>::iterator it
= requests
.begin(); it
!= requests
.end(); ++it
)
579 if (b
->getBit(it
->first
)) {
580 if (!it
->second
->queued
.size())
582 start
= *(it
->second
->queued
.begin());
583 it
->second
->queued
.erase(start
);
584 it
->second
->requested
.insert(start
);
585 length
= getChunkLength(it
->first
, start
);
590 for (std::list
<uint32_t>::iterator it
= scrambled
.begin(); it
!= scrambled
.end(); ++it
)
591 if (b
->getBit(*it
) && !requests
.count(*it
)) {
592 RequestStruct
*rs
= new RequestStruct
;
594 uint32_t pieceLength
= getPieceLength(*it
);
595 uint32_t st
= min(chunkLength
, pieceLength
);
598 while (st
< pieceLength
) {
599 len
= min(chunkLength
, pieceLength
- st
);
600 rs
->queued
.insert(st
);
606 length
= getChunkLength(*it
, 0);
608 rs
->requested
.insert(0);
609 requests
[index
] = rs
;
618 /* {{{ Storage::pieceDownloaded(uint32_t, uint32_t) */
619 void Storage::pieceDownloaded(uint32_t index
, uint32_t start
)
621 if (!requests
.count(index
))
622 return; // TODO: discarded bytes
624 RequestStruct
*rs
= requests
[index
];
625 if (!rs
->requested
.count(start
))
626 return; // TODO: discarded bytes
628 rs
->requested
.erase(start
);
630 if (!rs
->requested
.size() && !rs
->queued
.size()) {
632 requests
.erase(index
);
633 scrambled
.remove(index
); // don't download this piece
635 HashChecker
*checker
= new HashChecker(index
, this);
636 hashCheckers
.insert(checker
);
637 Tairon::Core::Thread
*current
= Tairon::Core::Thread::current();
638 checker
->hashCorrectSignal
.connect(Tairon::Core::threadMethodDFunctor(current
, this, &Storage::hashCorrect
));
639 checker
->hashIncorrectSignal
.connect(Tairon::Core::threadMethodDFunctor(current
, this, &Storage::hashIncorrect
));
646 /* {{{ Storage::pieceSent(uint32_t) */
647 void Storage::pieceSent(uint32_t index
)
653 /* {{{ Storage::removeBitField(Tairent::Core::BitField *) */
654 void Storage::removeBitField(Tairent::Core::BitField
*b
)
656 if (!b
) // We can get this from a connection that hasn't been fully established
659 for (size_t i
= 0; i
< pieceCount
; ++i
)
661 availability
[i
] -= 1;
665 /* {{{ Storage::reRequest(const std::map<uint32_t, std::set<uint32_t> > &) */
666 void Storage::reRequest(const std::map
<uint32_t, std::set
<uint32_t> > &r
)
668 for (std::map
<uint32_t, std::set
<uint32_t> >::const_iterator it1
= r
.begin(); it1
!= r
.end(); ++it1
) {
671 if (!requests
.count(it1
->first
)) {
672 rs
= new RequestStruct
;
673 requests
[it1
->first
] = rs
;
675 rs
= requests
[it1
->first
];
677 for (std::set
<uint32_t>::const_iterator it2
= it1
->second
.begin(); it2
!= it1
->second
.end(); ++it2
) {
678 rs
->queued
.insert(*it2
);
679 rs
->requested
.erase(*it2
);
685 /* {{{ Storage::save() */
686 TiXmlElement
*Storage::save()
688 TiXmlElement
*element
= new TiXmlElement("storage");
691 TiXmlElement
*child
= new TiXmlElement("bitfield");
692 size_t bitFieldLength
= bitfield
->getLength();
693 child
->LinkEndChild(new TiXmlText(TorrentManager::binToHex(String(bitfield
->getData(), bitFieldLength
/ 8 + (bitFieldLength
% 8 ? 1 : 0)))));
694 element
->LinkEndChild(child
);
696 // store list of parts that we don't have
697 for (std::map
<uint32_t, RequestStruct
*>::const_iterator it1
= requests
.begin(); it1
!= requests
.end(); ++it1
) {
700 // store parts that haven't been requested
701 for (std::set
<uint32_t>::const_iterator it2
= it1
->second
->queued
.begin(); it2
!= it1
->second
->queued
.end(); ++it2
)
702 queued
+= String::number(*it2
) + ',';
704 // store parts that have been requested but we don't have them yet
705 for (std::set
<uint32_t>::const_iterator it2
= it1
->second
->requested
.begin(); it2
!= it1
->second
->requested
.end(); ++it2
)
706 queued
+= String::number(*it2
) + ',';
708 // remove trailing ','
709 queued
.resize(queued
.size() - 1);
711 child
= new TiXmlElement("queued");
712 child
->SetAttribute(String("index"), String::number(it1
->first
));
713 child
->LinkEndChild(new TiXmlText(queued
));
714 element
->LinkEndChild(child
);
721 /* {{{ Storage::scramble() */
722 void Storage::scramble()
724 uint32_t *buf
= new uint32_t[pieceCount
];
727 for (uint32_t i
= 0; i
< pieceCount
; ++i
)
728 if (!bitfield
->getBit(i
)) // we don't have this piece yet, add it to the list
731 for (uint32_t i = 0; i < pieceCount; ++i) {
732 uint32_t orig = buf[i];
733 uint32_t pos = random() % pieceCount;
739 for (uint32_t i
= 0; i
< pos
; ++i
)
740 scrambled
.push_back(buf
[i
]);
746 /* {{{ Storage::shouldBeInterested(Tairent::Core::BitField *) */
747 bool Storage::shouldBeInterested(Tairent::Core::BitField
*b
)
749 for (size_t i
= 0; i
< pieceCount
; ++i
)
750 if (b
->getBit(i
) && !bitfield
->getBit(i
))
756 /* {{{ Storage::unmapPiece(uint32_t) */
757 void Storage::unmapPiece(uint32_t index
)
759 // check if it is mapped
760 if (!pieces
.count(index
)) {
761 ERROR((const char *) String("Unmapping piece that isn't mapped: " + String::number(index
)));
765 PieceStruct
*ps
= pieces
[index
];
769 if (ps
->refCount
) // don't unmap piece that is needed
772 for (std::list
<PieceStruct::FilePieceStruct
>::iterator it
= ps
->pieces
.begin(); it
!= ps
->pieces
.end(); ++it
) {
773 munmap(it
->priv
, it
->privLength
);
774 closeFile(*it
->filename
);
782 /* {{{ Storage::unmapPieces(PieceReadersStruct *) */
783 void Storage::unmapPieces(PieceReadersStruct
*readers
)
787 if (readers
->fake
) // this piece hasn't been mapped
789 unmapPiece(readers
->index
);
793 /* {{{ Storage::unmapPieces(PieceWritersStruct *) */
794 void Storage::unmapPieces(PieceWritersStruct
*writers
)
798 unmapPiece(writers
->index
);
804 }; // namespace Tairent
806 // vim: ai sw=4 ts=4 noet fdm=marker