1 /***************************************************************************
3 * Copyright (C) 2006 David Brodsky *
5 * This program is free software; you can redistribute it and/or *
6 * modify it under the terms of the GNU General Public License as *
7 * published by the Free Software Foundation and appearing *
8 * in the file LICENSE.GPL included in the packaging of this file. *
10 * This program is distributed in the hope that it will be useful, *
11 * but WITHOUT ANY WARRANTY; without even the implied warranty of *
12 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU *
13 * General Public License for more details. *
15 ***************************************************************************/
17 #include <arpa/inet.h>
22 #include <tairon/core/log.h>
23 #include <tairon/core/tinyxml.h>
24 #include <tairon/net/ireader.h>
25 #include <tairon/net/mreader.h>
26 #include <tairon/net/mwriter.h>
27 #include <tairon/net/swriter.h>
31 #include "core/bitfield.h"
32 #include "connection.h"
33 #include "hashchecker.h"
34 #include "torrentmanager.h"
36 #define min(x, y) (x) < (y) ? x : y
44 const uint32_t chunkLength
= 16384;
46 /* {{{ Storage::Storage(const Tairent::Core::BEncode &) */
47 Storage::Storage(const Tairent::Core::BEncode
&i
, TiXmlNode
*n
) : info(i
)
49 pieceCount
= info
["pieces"].asString().length() / 20;
50 pieceLength
= info
["piece length"].asValue();
55 availability
= new unsigned short[pieceCount
];
56 memset(availability
, 0, pieceCount
* sizeof(unsigned short));
58 if (n
) // load fast resume
60 else // we're starting from the beginning
61 bitfield
= new Tairent::Core::BitField(pieceCount
);
67 /* {{{ Storage::~Storage() */
71 delete [] availability
;
73 for (std::map
<uint32_t, RequestStruct
*>::const_iterator it
= requests
.begin(); it
!= requests
.end(); ++it
)
78 /* {{{ Storage::addBitField(Tairent::Core::BitField *) */
79 void Storage::addBitField(Tairent::Core::BitField
*b
)
81 for (size_t i
= 0; i
< pieceCount
; ++i
)
87 /* {{{ Storage::buildFilesList() */
88 void Storage::buildFilesList()
90 if (info
.asMap().count("length")) {
91 totalLength
= info
["length"].asValue();
92 FilePositionStruct fs
= {
93 info
["name"].asString(),
94 info
["length"].asValue(),
97 filesPositions
.push_back(fs
);
100 const Tairent::Core::BEncode::List
&f
= info
["files"].asList();
101 for (Tairent::Core::BEncode::List::const_iterator it
= f
.begin(); it
!= f
.end(); ++it
) {
102 FilePositionStruct fs
= {
103 listToString((*it
)["path"].asList()),
104 (*it
)["length"].asValue(),
107 filesPositions
.push_back(fs
);
108 totalLength
+= (*it
)["length"].asValue();
114 /* {{{ Storage::closeFile(const String &) */
115 void Storage::closeFile(const String
&filename
)
117 if (!files
.count(filename
))
118 return; // TODO: throw an exception
120 FileStruct
&fs
= files
[filename
];
124 files
.erase(filename
);
129 /* {{{ Storage::complete() */
130 void Storage::complete()
136 /* {{{ Storage::createFile(const String &, uint64_t) */
137 void Storage::createFile(const String
&name
, uint64_t length
)
139 // TODO: error checking
140 int fd
= open(name
, O_CREAT
| O_WRONLY
| O_LARGEFILE
, 0666);
141 ftruncate64(fd
, length
);
146 /* {{{ Storage::createFiles() */
147 void Storage::createFiles()
149 DEBUG("Creating files");
150 if (info
.asMap().count("length")) {
151 createFile(info
["name"].asString(), info
["length"].asValue());
152 // TODO: error checking
154 const Tairent::Core::BEncode::List
&f
= info
["files"].asList();
155 mkdir(info
["name"].asString(), 0777);
156 for (Tairent::Core::BEncode::List::const_iterator it
= f
.begin(); it
!= f
.end(); ++it
) {
157 String name
= info
["name"].asString();
158 const Tairent::Core::BEncode::List
&path
= (*it
)["path"].asList();
159 Tairent::Core::BEncode::List::const_iterator end
= --path
.end();
160 Tairent::Core::BEncode::List::const_iterator dir
;
161 for (dir
= path
.begin(); dir
!= end
; ++dir
) {
163 name
+= dir
->asString();
167 name
+= dir
->asString();
168 createFile(name
, (*it
)["length"].asValue());
174 /* {{{ Storage::getChunkLength(uint32_t, uint32_t) */
175 uint32_t Storage::getChunkLength(uint32_t index
, uint32_t start
)
177 return min(chunkLength
, getPieceLength(index
) - start
);
181 /* {{{ Storage::getPiece(uint32_t) */
182 PieceStruct
*Storage::getPiece(uint32_t index
)
184 if (pieces
.count(index
))
185 return pieces
[index
];
190 /* {{{ Storage::getPieceLength(uint32_t) */
191 uint32_t Storage::getPieceLength(uint32_t index
)
193 if (index
== (pieceCount
- 1)) // last piece
194 if (totalLength
% pieceLength
) // the last piece is shorter
195 return totalLength
% pieceLength
;
196 // else: last piece has the same length as any other
201 /* {{{ Storage::getRemainingSize() */
202 uint64_t Storage::getRemainingSize()
204 uint64_t ret
= bitfield
->getRemaining() * pieceLength
;
205 if (!bitfield
->getBit(pieceCount
- 1))
206 ret
-= pieceLength
- getPieceLength(pieceCount
- 1);
211 /* {{{ Storage::gotHave(uint32_t) */
212 bool Storage::gotHave(uint32_t index
)
214 availability
[index
] += 1;
215 if (bitfield
->getBit(index
)) // we already have this piece
221 /* {{{ Storage::gotPiece(uint32_t, uint32_t, Connection *) */
222 void Storage::gotPiece(uint32_t index
, uint32_t start
, Connection
*c
)
224 if ((index
>= pieceCount
) || (start
+ getChunkLength(index
, start
) > getPieceLength(index
)) || (c
->getPieceLength() != getChunkLength(index
, start
))) {
225 WARNING("Client sent an invalid piece");
226 invalidDataSignal
.emit(c
);
230 if (!requests
.count(index
) || !requests
[index
]->requested
.count(start
)) { // discarded
231 // create fake reader
232 PieceReadersStruct
*pr
= new PieceReadersStruct
;
235 pr
->length
= getChunkLength(index
, start
);
236 pr
->readers
.push_back(new Tairon::Net::IReader(c
->getPieceLength(), c
->getSocket(), c
->getReadingLimiter()));
241 // We need to get files to the memory
244 PieceStruct
*ps
= pieces
[index
];
246 PieceReadersStruct
*pr
= new PieceReadersStruct
;
249 pr
->length
= getChunkLength(index
, start
);
250 Tairon::Net::Reader
*reader
;
252 std::list
<PieceStruct::FilePieceStruct
>::iterator it
= ps
->pieces
.begin();
253 uint32_t position
= start
;
255 // skip uninteresting files
256 while (it
->start
+ it
->length
< position
)
259 // Just iterate over the mapped files and create readers for them. The
260 // difficult part is counting the correct length of the allocated readers.
261 // A picture should make this clean:
264 // +-- ... --+------ ... -----------------------------------------+-- ...
266 // +---... --+------ ... -----------------------------------------+-- ...
268 // starting position of actual position start + length
269 // the file within a piece |<- length of the chunk if ->|
270 // | the file is shorter than |
271 // | what we need; otherwise we |
272 // | use the remaining count |
274 uint32_t remaining
= c
->getPieceLength();
276 uint32_t len
= min(it
->start
+ it
->length
- position
, remaining
);
277 reader
= new Tairon::Net::MReader(it
->mem
+ position
- it
->start
, len
, c
->getSocket(), c
->getReadingLimiter());
278 pr
->readers
.push_back(reader
);
288 /* {{{ Storage::gotRequest(uint32_t, uint32_t, uint32_t, Connection *) */
289 void Storage::gotRequest(uint32_t index
, uint32_t start
, uint32_t length
, Connection
*c
)
291 if ((index
>= pieceCount
) || (start
+ length
> getPieceLength(index
))) {
292 WARNING("Client sent an invalid request");
293 invalidDataSignal
.emit(c
);
297 // map the piece to the memory
300 PieceStruct
*ps
= pieces
[index
];
302 PieceWritersStruct
*writers
= new PieceWritersStruct
;
303 writers
->index
= index
;
304 writers
->length
= length
;
305 Tairon::Net::Writer
*writer
;
307 std::list
<PieceStruct::FilePieceStruct
>::iterator it
= ps
->pieces
.begin();
308 uint32_t position
= start
;
310 // skip uninteresting files
311 while (it
->start
+ it
->length
< position
)
314 // create writer for the message
315 char header
[13]; // 4 bytes msg length, 1 byte command and 2 x 4 bytes for index and start
316 uint32_t num
= htonl(length
+ 9);
317 memcpy(header
, (const char *) (&num
), 4);
318 header
[4] = 7; // piece type
320 memcpy(header
+ 5, (const char *) (&num
), 4);
322 memcpy(header
+ 9, (const char *) (&num
), 4);
323 writer
= new Tairon::Net::SWriter(String(header
, 13), c
->getSocket(), c
->getWritingLimiter());
324 writers
->writers
.push_back(writer
);
326 // create writers for the piece
327 uint32_t remaining
= length
;
329 uint32_t len
= min(it
->start
+ it
->length
- position
, remaining
);
330 writer
= new Tairon::Net::MWriter(it
->mem
+ position
- it
->start
, len
, c
->getSocket(), c
->getWritingLimiter());
331 writers
->writers
.push_back(writer
);
336 if ((it
== ps
->pieces
.end()) && remaining
) {
337 // we are at the end of the piece and there are still some bytes left?
338 // destroy created writers and unmap piece
339 for (std::list
<Tairon::Net::Writer
*>::iterator it
= writers
->writers
.begin(); it
!= writers
->writers
.end(); ++it
)
349 // send it to the peer
350 c
->setWriters(writers
);
354 /* {{{ Storage::hashCorrect(uint32_t, HashChecker *) */
355 void Storage::hashCorrect(uint32_t index
, HashChecker
*checker
)
357 hashCheckers
.erase(checker
);
359 bitfield
->setBit(index
);
360 pieceDownloadedSignal
.emit(index
);
365 /* {{{ Storage::hashIncorrect(uint32_t, HashChecker *) */
366 void Storage::hashIncorrect(uint32_t index
, HashChecker
*checker
)
368 scrambled
.push_back(index
);
369 DEBUG("hash incorrect");
370 hashCheckers
.erase(checker
);
376 /* {{{ Storage::hasSomething() */
377 bool Storage::hasSomething()
379 if (pieceCount
- bitfield
->getRemaining())
385 /* {{{ Storage::isComplete() */
386 bool Storage::isComplete()
388 return !bitfield
->getRemaining();
392 /* {{{ Storage::listToString(const Tairent::Core::BEncode::List &) */
393 String
Storage::listToString(const Tairent::Core::BEncode::List
&l
)
395 String ret
= info
["name"].asString();
396 for (Tairent::Core::BEncode::List::const_iterator it
= l
.begin(); it
!= l
.end(); ++it
) {
398 ret
+= it
->asString();
404 /* {{{ Storage::load(TiXmlNode *) */
405 void Storage::load(TiXmlNode
*n
)
407 TiXmlElement
*root
= n
->ToElement();
409 for (TiXmlNode
*node
= root
->FirstChild(); node
; node
= node
->NextSibling()) {
410 if (node
->Type() != TiXmlNode::ELEMENT
)
413 if (node
->ValueStr() == "bitfield")
415 else if (node
->ValueStr() == "queued")
421 /* {{{ Storage::loadBitField(TiXmlNode *) */
422 void Storage::loadBitField(TiXmlNode
*n
)
424 TiXmlElement
*element
= n
->ToElement();
426 const char *t
= element
->GetText();
428 WARNING("Cannot load bitfield");
432 bitfield
= new Tairent::Core::BitField(TorrentManager::hexToBin(t
), pieceCount
);
436 /* {{{ Storage::loadQueued(TiXmlNode *) */
437 void Storage::loadQueued(TiXmlNode
*n
)
439 TiXmlElement
*element
= n
->ToElement();
441 const char *i
= element
->Attribute("index");
443 WARNING("Missing index for queued pieces");
451 num
= strtoll(i
, 0, 10); // get the index
452 if (errno
) { // error while converting
453 WARNING("Error while converting piece index");
455 } else if ((num
< 0) || (num
>= pieceCount
)) { // out of range
456 WARNING((const char *) String("Invalid piece index: " + String::number(num
)));
459 index
= num
& 0xffffffff;
461 const char *t
= element
->GetText();
463 WARNING((const char *) String("No queued piece for index: " + String::number(index
)));
469 RequestStruct
*rs
= new RequestStruct
;
471 // go through the list
474 num
= strtoll(t
, &end
, 10);
477 WARNING((const char *) String("Cannot load queued index: " + String::number(index
)));
480 piece
= num
& 0xffffffff;
482 rs
->queued
.insert(piece
);
484 t
= end
+ 1; // skip ','
485 } while (!errno
&& *end
);
487 requests
[index
] = rs
;
491 /* {{{ Storage::mapPiece(uint32_t) */
492 void Storage::mapPiece(uint32_t index
)
494 // TODO: index checking
496 if (pieces
.count(index
)) {
497 pieces
[index
]->refCount
++;
501 uint64_t currentPos
= index
* pieceLength
;
502 uint64_t finalPos
= currentPos
+ getPieceLength(index
);
504 std::list
<FilePositionStruct
>::iterator it
= filesPositions
.begin();
506 // find the first file that contains our index
507 while ((it
->start
+ it
->length
) < currentPos
)
510 PieceStruct
*ps
= new PieceStruct
;
511 PieceStruct::FilePieceStruct fps
;
513 // Now begins the tricky part. We need few informations that will be used
514 // later for creating readers/writers. The first case is somewhat special,
515 // because a piece can start (and usually does) in the middle of a file.
516 // The second part just maps pieces at the beginning of the files, so we
517 // don't need to do the getpagesize() stuff.
519 int fd
= openFile(it
->name
);
520 fps
.filename
= &it
->name
;
522 // Length of the allocated block. Because mmapped block must start at a
523 // multiple of page size we need to add this size. This size will be passed
524 // to the unmap function.
525 // The first part computes the ramaining length of the file aligned to the
526 // page size. The second one just aligns length of a piece to the page
528 fps
.privLength
= min(it
->start
+ it
->length
- currentPos
+ ((currentPos
- it
->start
) % getpagesize()), pieceLength
+ (currentPos
- it
->start
) % getpagesize());
530 // Map the piece! Length of the piece was counted above. Offset is just a
531 // current position of the piece in bytes minus starting position of the
532 // file and it is aligned to the page size.
533 fps
.priv
= mmap64(0, fps
.privLength
, PROT_READ
| PROT_WRITE
, MAP_SHARED
, fd
, currentPos
- it
->start
- (currentPos
- it
->start
) % getpagesize());
535 // Real starting position. We need to add previously subtracted page size
537 fps
.mem
= (char *) fps
.priv
+ (currentPos
- it
->start
) % getpagesize();
539 // The first piece always starts at position 0.
542 // Length of the piece is the length of the mapped block minus page size
544 fps
.length
= fps
.privLength
- (currentPos
- it
->start
) % getpagesize();
546 // And, at last, add this piece to the list.
547 ps
->pieces
.push_back(fps
);
549 // Map rest of the piece. We can reuse the fps struct because a copy of it
550 // is already in the list.
551 currentPos
+= fps
.length
;
553 while (currentPos
< finalPos
) {
555 int fd
= openFile(it
->name
);
556 fps
.filename
= &it
->name
;
557 fps
.start
+= fps
.length
;
558 fps
.length
= min(finalPos
- currentPos
, it
->length
);
559 fps
.privLength
= fps
.length
;
560 fps
.priv
= mmap64(0, fps
.length
, PROT_READ
| PROT_WRITE
, MAP_SHARED
, fd
, 0);
561 fps
.mem
= (char *) fps
.priv
;
562 ps
->pieces
.push_back(fps
);
563 currentPos
+= fps
.length
;
572 /* {{{ Storage::openFile(const String &) */
573 int Storage::openFile(const String
&filename
)
575 if (files
.count(filename
)) {
576 FileStruct
&fs
= files
[filename
];
581 open(filename
, O_CREAT
| O_RDWR
| O_LARGEFILE
, 0666),
584 // TODO: error checking
585 files
[filename
] = fs
;
590 /* {{{ Storage::pickPiece(Tairent::Core::BitField *, uint32_t &, uint32_t &, uint32_t &) */
591 bool Storage::pickPiece(Tairent::Core::BitField
*b
, uint32_t &index
, uint32_t &start
, uint32_t &length
)
593 // first try to find a piece in already requested ones.
594 for (std::map
<uint32_t, RequestStruct
*>::iterator it
= requests
.begin(); it
!= requests
.end(); ++it
)
595 if (b
->getBit(it
->first
)) {
596 if (!it
->second
->queued
.size())
598 start
= *(it
->second
->queued
.begin());
599 it
->second
->queued
.erase(start
);
600 it
->second
->requested
.insert(start
);
601 length
= getChunkLength(it
->first
, start
);
606 for (std::list
<uint32_t>::iterator it
= scrambled
.begin(); it
!= scrambled
.end(); ++it
)
607 if (b
->getBit(*it
) && !requests
.count(*it
)) {
608 RequestStruct
*rs
= new RequestStruct
;
610 uint32_t pieceLength
= getPieceLength(*it
);
611 uint32_t st
= min(chunkLength
, pieceLength
);
614 while (st
< pieceLength
) {
615 len
= min(chunkLength
, pieceLength
- st
);
616 rs
->queued
.insert(st
);
622 length
= getChunkLength(*it
, 0);
624 rs
->requested
.insert(0);
625 requests
[index
] = rs
;
634 /* {{{ Storage::pieceDownloaded(uint32_t, uint32_t) */
635 void Storage::pieceDownloaded(uint32_t index
, uint32_t start
)
637 if (!requests
.count(index
))
638 return; // TODO: discarded bytes
640 RequestStruct
*rs
= requests
[index
];
641 if (!rs
->requested
.count(start
))
642 return; // TODO: discarded bytes
644 rs
->requested
.erase(start
);
646 if (!rs
->requested
.size() && !rs
->queued
.size()) {
648 requests
.erase(index
);
649 scrambled
.remove(index
); // don't download this piece
651 HashChecker
*checker
= new HashChecker(index
, this);
652 hashCheckers
.insert(checker
);
653 Tairon::Core::Thread
*current
= Tairon::Core::Thread::current();
654 checker
->hashCorrectSignal
.connect(Tairon::Core::threadMethodDFunctor(current
, this, &Storage::hashCorrect
));
655 checker
->hashIncorrectSignal
.connect(Tairon::Core::threadMethodDFunctor(current
, this, &Storage::hashIncorrect
));
662 /* {{{ Storage::pieceSent(uint32_t) */
663 void Storage::pieceSent(uint32_t index
)
669 /* {{{ Storage::removeBitField(Tairent::Core::BitField *) */
670 void Storage::removeBitField(Tairent::Core::BitField
*b
)
672 if (!b
) // We can get this from a connection that hasn't been fully established
675 for (size_t i
= 0; i
< pieceCount
; ++i
)
677 availability
[i
] -= 1;
681 /* {{{ Storage::reRequest(const std::map<uint32_t, std::set<uint32_t> > &) */
682 void Storage::reRequest(const std::map
<uint32_t, std::set
<uint32_t> > &r
)
684 for (std::map
<uint32_t, std::set
<uint32_t> >::const_iterator it1
= r
.begin(); it1
!= r
.end(); ++it1
) {
687 if (!requests
.count(it1
->first
)) {
688 rs
= new RequestStruct
;
689 requests
[it1
->first
] = rs
;
691 rs
= requests
[it1
->first
];
693 for (std::set
<uint32_t>::const_iterator it2
= it1
->second
.begin(); it2
!= it1
->second
.end(); ++it2
) {
694 rs
->queued
.insert(*it2
);
695 rs
->requested
.erase(*it2
);
701 /* {{{ Storage::save() */
702 TiXmlElement
*Storage::save()
704 TiXmlElement
*element
= new TiXmlElement("storage");
707 TiXmlElement
*child
= new TiXmlElement("bitfield");
708 size_t bitFieldLength
= bitfield
->getLength();
709 child
->LinkEndChild(new TiXmlText(TorrentManager::binToHex(String(bitfield
->getData(), bitFieldLength
/ 8 + (bitFieldLength
% 8 ? 1 : 0)))));
710 element
->LinkEndChild(child
);
712 // store list of parts that we don't have
713 for (std::map
<uint32_t, RequestStruct
*>::const_iterator it1
= requests
.begin(); it1
!= requests
.end(); ++it1
) {
716 // store parts that haven't been requested
717 for (std::set
<uint32_t>::const_iterator it2
= it1
->second
->queued
.begin(); it2
!= it1
->second
->queued
.end(); ++it2
)
718 queued
+= String::number(*it2
) + ',';
720 // store parts that have been requested but we don't have them yet
721 for (std::set
<uint32_t>::const_iterator it2
= it1
->second
->requested
.begin(); it2
!= it1
->second
->requested
.end(); ++it2
)
722 queued
+= String::number(*it2
) + ',';
724 // remove trailing ','
725 queued
.resize(queued
.size() - 1);
727 child
= new TiXmlElement("queued");
728 child
->SetAttribute(String("index"), String::number(it1
->first
));
729 child
->LinkEndChild(new TiXmlText(queued
));
730 element
->LinkEndChild(child
);
737 /* {{{ Storage::scramble() */
738 void Storage::scramble()
740 uint32_t *buf
= new uint32_t[pieceCount
];
743 for (uint32_t i
= 0; i
< pieceCount
; ++i
)
744 if (!bitfield
->getBit(i
)) // we don't have this piece yet, add it to the list
747 for (uint32_t i = 0; i < pieceCount; ++i) {
748 uint32_t orig = buf[i];
749 uint32_t pos = random() % pieceCount;
755 for (uint32_t i
= 0; i
< pos
; ++i
)
756 scrambled
.push_back(buf
[i
]);
762 /* {{{ Storage::shouldBeInterested(Tairent::Core::BitField *) */
763 bool Storage::shouldBeInterested(Tairent::Core::BitField
*b
)
765 for (size_t i
= 0; i
< pieceCount
; ++i
)
766 if (b
->getBit(i
) && !bitfield
->getBit(i
))
772 /* {{{ Storage::unmapPiece(uint32_t) */
773 void Storage::unmapPiece(uint32_t index
)
775 // check if it is mapped
776 if (!pieces
.count(index
)) {
777 ERROR((const char *) String("Unmapping piece that isn't mapped: " + String::number(index
)));
781 PieceStruct
*ps
= pieces
[index
];
785 if (ps
->refCount
) // don't unmap piece that is needed
788 for (std::list
<PieceStruct::FilePieceStruct
>::iterator it
= ps
->pieces
.begin(); it
!= ps
->pieces
.end(); ++it
) {
789 munmap(it
->priv
, it
->privLength
);
790 closeFile(*it
->filename
);
798 /* {{{ Storage::unmapPieces(PieceReadersStruct *) */
799 void Storage::unmapPieces(PieceReadersStruct
*readers
)
803 if (readers
->fake
) // this piece hasn't been mapped
805 unmapPiece(readers
->index
);
809 /* {{{ Storage::unmapPieces(PieceWritersStruct *) */
810 void Storage::unmapPieces(PieceWritersStruct
*writers
)
814 unmapPiece(writers
->index
);
820 }; // namespace Tairent
822 // vim: ai sw=4 ts=4 noet fdm=marker