delete_files bug fix
[libtorrent.git] / src / disk_io_thread.cpp
blobf091c3db8bf8333e962d75641ac8d61357fb3c69
1 /*
3 Copyright (c) 2007, Arvid Norberg
4 All rights reserved.
6 Redistribution and use in source and binary forms, with or without
7 modification, are permitted provided that the following conditions
8 are met:
10 * Redistributions of source code must retain the above copyright
11 notice, this list of conditions and the following disclaimer.
12 * Redistributions in binary form must reproduce the above copyright
13 notice, this list of conditions and the following disclaimer in
14 the documentation and/or other materials provided with the distribution.
15 * Neither the name of the author nor the names of its
16 contributors may be used to endorse or promote products derived
17 from this software without specific prior written permission.
19 THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
20 AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
21 IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
22 ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
23 LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
24 CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
25 SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
26 INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
27 CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
28 ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
29 POSSIBILITY OF SUCH DAMAGE.
33 #include "libtorrent/storage.hpp"
34 #include <deque>
35 #include "libtorrent/disk_io_thread.hpp"
36 #include "libtorrent/disk_buffer_holder.hpp"
37 #include <boost/scoped_array.hpp>
39 #ifdef _WIN32
40 #include <malloc.h>
41 #define alloca(s) _alloca(s)
42 #endif
44 #ifdef TORRENT_DISK_STATS
45 #include "libtorrent/time.hpp"
46 #endif
48 namespace libtorrent
51 disk_io_thread::disk_io_thread(asio::io_service& ios, int block_size)
52 : m_abort(false)
53 , m_queue_buffer_size(0)
54 , m_cache_size(512) // 512 * 16kB = 8MB
55 , m_cache_expiry(60) // 1 minute
56 , m_coalesce_writes(true)
57 , m_coalesce_reads(true)
58 , m_use_read_cache(true)
59 #ifndef TORRENT_DISABLE_POOL_ALLOCATOR
60 , m_pool(block_size)
61 #endif
62 , m_block_size(block_size)
63 , m_ios(ios)
64 , m_disk_io_thread(boost::ref(*this))
66 #ifdef TORRENT_STATS
67 m_allocations = 0;
68 #endif
69 #ifdef TORRENT_DISK_STATS
70 m_log.open("disk_io_thread.log", std::ios::trunc);
71 #endif
74 disk_io_thread::~disk_io_thread()
76 TORRENT_ASSERT(m_abort == true);
79 void disk_io_thread::join()
81 mutex_t::scoped_lock l(m_queue_mutex);
82 disk_io_job j;
83 j.action = disk_io_job::abort_thread;
84 m_jobs.insert(m_jobs.begin(), j);
85 m_signal.notify_all();
86 l.unlock();
88 m_disk_io_thread.join();
91 void disk_io_thread::get_cache_info(sha1_hash const& ih, std::vector<cached_piece_info>& ret) const
93 mutex_t::scoped_lock l(m_piece_mutex);
94 ret.clear();
95 ret.reserve(m_pieces.size());
96 for (cache_t::const_iterator i = m_pieces.begin()
97 , end(m_pieces.end()); i != end; ++i)
99 torrent_info const& ti = *i->storage->info();
100 if (ti.info_hash() != ih) continue;
101 cached_piece_info info;
102 info.piece = i->piece;
103 info.last_use = i->last_use;
104 info.kind = cached_piece_info::write_cache;
105 int blocks_in_piece = (ti.piece_size(i->piece) + (m_block_size) - 1) / m_block_size;
106 info.blocks.resize(blocks_in_piece);
107 for (int b = 0; b < blocks_in_piece; ++b)
108 if (i->blocks[b]) info.blocks[b] = true;
109 ret.push_back(info);
111 for (cache_t::const_iterator i = m_read_pieces.begin()
112 , end(m_read_pieces.end()); i != end; ++i)
114 torrent_info const& ti = *i->storage->info();
115 if (ti.info_hash() != ih) continue;
116 cached_piece_info info;
117 info.piece = i->piece;
118 info.last_use = i->last_use;
119 info.kind = cached_piece_info::read_cache;
120 int blocks_in_piece = (ti.piece_size(i->piece) + (m_block_size) - 1) / m_block_size;
121 info.blocks.resize(blocks_in_piece);
122 for (int b = 0; b < blocks_in_piece; ++b)
123 if (i->blocks[b]) info.blocks[b] = true;
124 ret.push_back(info);
128 cache_status disk_io_thread::status() const
130 mutex_t::scoped_lock l(m_piece_mutex);
131 return m_cache_stats;
134 void disk_io_thread::set_cache_size(int s)
136 mutex_t::scoped_lock l(m_piece_mutex);
137 TORRENT_ASSERT(s >= 0);
138 m_cache_size = s;
141 void disk_io_thread::set_cache_expiry(int ex)
143 mutex_t::scoped_lock l(m_piece_mutex);
144 TORRENT_ASSERT(ex > 0);
145 m_cache_expiry = ex;
148 // aborts read operations
149 void disk_io_thread::stop(boost::intrusive_ptr<piece_manager> s)
151 mutex_t::scoped_lock l(m_queue_mutex);
152 // read jobs are aborted, write and move jobs are syncronized
153 for (std::list<disk_io_job>::iterator i = m_jobs.begin();
154 i != m_jobs.end();)
156 if (i->storage != s)
158 ++i;
159 continue;
161 if (i->action == disk_io_job::read)
163 if (i->callback) m_ios.post(bind(i->callback, -1, *i));
164 m_jobs.erase(i++);
165 continue;
167 if (i->action == disk_io_job::check_files)
169 if (i->callback) m_ios.post(bind(i->callback
170 , piece_manager::disk_check_aborted, *i));
171 m_jobs.erase(i++);
172 continue;
174 ++i;
176 m_signal.notify_all();
179 bool range_overlap(int start1, int length1, int start2, int length2)
181 return (start1 <= start2 && start1 + length1 > start2)
182 || (start2 <= start1 && start2 + length2 > start1);
185 namespace
187 // The semantic of this operator is:
188 // should lhs come before rhs in the job queue
189 bool operator<(disk_io_job const& lhs, disk_io_job const& rhs)
191 // NOTE: comparison inverted to make higher priority
192 // skip _in_front_of_ lower priority
193 if (lhs.priority > rhs.priority) return true;
194 if (lhs.priority < rhs.priority) return false;
196 if (lhs.storage.get() < rhs.storage.get()) return true;
197 if (lhs.storage.get() > rhs.storage.get()) return false;
198 if (lhs.piece < rhs.piece) return true;
199 if (lhs.piece > rhs.piece) return false;
200 if (lhs.offset < rhs.offset) return true;
201 // if (lhs.offset > rhs.offset) return false;
202 return false;
206 disk_io_thread::cache_t::iterator disk_io_thread::find_cached_piece(
207 disk_io_thread::cache_t& cache
208 , disk_io_job const& j, mutex_t::scoped_lock& l)
210 for (cache_t::iterator i = cache.begin()
211 , end(cache.end()); i != end; ++i)
213 if (i->storage != j.storage || i->piece != j.piece) continue;
214 return i;
216 return cache.end();
219 void disk_io_thread::flush_expired_pieces()
221 ptime now = time_now();
223 mutex_t::scoped_lock l(m_piece_mutex);
225 INVARIANT_CHECK;
226 for (;;)
228 cache_t::iterator i = std::min_element(
229 m_pieces.begin(), m_pieces.end()
230 , bind(&cached_piece_entry::last_use, _1)
231 < bind(&cached_piece_entry::last_use, _2));
232 if (i == m_pieces.end()) return;
233 int age = total_seconds(now - i->last_use);
234 if (age < m_cache_expiry) return;
235 flush_and_remove(i, l);
239 void disk_io_thread::free_piece(cached_piece_entry& p, mutex_t::scoped_lock& l)
241 int piece_size = p.storage->info()->piece_size(p.piece);
242 int blocks_in_piece = (piece_size + m_block_size - 1) / m_block_size;
244 for (int i = 0; i < blocks_in_piece; ++i)
246 if (p.blocks[i] == 0) continue;
247 free_buffer(p.blocks[i]);
248 p.blocks[i] = 0;
249 --p.num_blocks;
250 --m_cache_stats.cache_size;
251 --m_cache_stats.read_cache_size;
255 bool disk_io_thread::clear_oldest_read_piece(
256 cache_t::iterator ignore
257 , mutex_t::scoped_lock& l)
259 INVARIANT_CHECK;
261 cache_t::iterator i = std::min_element(
262 m_read_pieces.begin(), m_read_pieces.end()
263 , bind(&cached_piece_entry::last_use, _1)
264 < bind(&cached_piece_entry::last_use, _2));
265 if (i != m_read_pieces.end() && i != ignore)
267 // don't replace an entry that is less than one second old
268 if (time_now() - i->last_use < seconds(1)) return false;
269 free_piece(*i, l);
270 m_read_pieces.erase(i);
271 return true;
273 return false;
276 void disk_io_thread::flush_oldest_piece(mutex_t::scoped_lock& l)
278 INVARIANT_CHECK;
279 // first look if there are any read cache entries that can
280 // be cleared
281 if (clear_oldest_read_piece(m_read_pieces.end(), l)) return;
283 cache_t::iterator i = std::min_element(
284 m_pieces.begin(), m_pieces.end()
285 , bind(&cached_piece_entry::last_use, _1)
286 < bind(&cached_piece_entry::last_use, _2));
287 if (i == m_pieces.end()) return;
288 flush_and_remove(i, l);
291 void disk_io_thread::flush_and_remove(disk_io_thread::cache_t::iterator e
292 , mutex_t::scoped_lock& l)
294 flush(e, l);
295 m_pieces.erase(e);
298 void disk_io_thread::flush(disk_io_thread::cache_t::iterator e
299 , mutex_t::scoped_lock& l)
301 INVARIANT_CHECK;
302 cached_piece_entry& p = *e;
303 int piece_size = p.storage->info()->piece_size(p.piece);
304 #ifdef TORRENT_DISK_STATS
305 m_log << log_time() << " flushing " << piece_size << std::endl;
306 #endif
307 TORRENT_ASSERT(piece_size > 0);
308 boost::scoped_array<char> buf;
309 if (m_coalesce_writes) buf.reset(new (std::nothrow) char[piece_size]);
311 int blocks_in_piece = (piece_size + m_block_size - 1) / m_block_size;
312 int buffer_size = 0;
313 int offset = 0;
314 for (int i = 0; i <= blocks_in_piece; ++i)
316 if (i == blocks_in_piece || p.blocks[i] == 0)
318 if (buffer_size == 0) continue;
319 TORRENT_ASSERT(buf);
321 TORRENT_ASSERT(buffer_size <= i * m_block_size);
322 l.unlock();
323 p.storage->write_impl(buf.get(), p.piece, (std::min)(
324 i * m_block_size, piece_size) - buffer_size, buffer_size);
325 l.lock();
326 ++m_cache_stats.writes;
327 // std::cerr << " flushing p: " << p.piece << " bytes: " << buffer_size << std::endl;
328 buffer_size = 0;
329 offset = 0;
330 continue;
332 int block_size = (std::min)(piece_size - i * m_block_size, m_block_size);
333 TORRENT_ASSERT(offset + block_size <= piece_size);
334 TORRENT_ASSERT(offset + block_size > 0);
335 if (!buf)
337 l.unlock();
338 p.storage->write_impl(p.blocks[i], p.piece, i * m_block_size, block_size);
339 l.lock();
340 ++m_cache_stats.writes;
342 else
344 std::memcpy(buf.get() + offset, p.blocks[i], block_size);
345 offset += m_block_size;
346 buffer_size += block_size;
348 free_buffer(p.blocks[i]);
349 p.blocks[i] = 0;
350 TORRENT_ASSERT(p.num_blocks > 0);
351 --p.num_blocks;
352 ++m_cache_stats.blocks_written;
353 --m_cache_stats.cache_size;
355 TORRENT_ASSERT(buffer_size == 0);
356 // std::cerr << " flushing p: " << p.piece << " cached_blocks: " << m_cache_stats.cache_size << std::endl;
357 #ifndef NDEBUG
358 for (int i = 0; i < blocks_in_piece; ++i)
359 TORRENT_ASSERT(p.blocks[i] == 0);
360 #endif
363 void disk_io_thread::cache_block(disk_io_job& j, mutex_t::scoped_lock& l)
365 INVARIANT_CHECK;
366 TORRENT_ASSERT(find_cached_piece(m_pieces, j, l) == m_pieces.end());
367 cached_piece_entry p;
369 int piece_size = j.storage->info()->piece_size(j.piece);
370 int blocks_in_piece = (piece_size + m_block_size - 1) / m_block_size;
372 p.piece = j.piece;
373 p.storage = j.storage;
374 p.last_use = time_now();
375 p.num_blocks = 1;
376 p.blocks.reset(new char*[blocks_in_piece]);
377 std::memset(&p.blocks[0], 0, blocks_in_piece * sizeof(char*));
378 int block = j.offset / m_block_size;
379 // std::cerr << " adding cache entry for p: " << j.piece << " block: " << block << " cached_blocks: " << m_cache_stats.cache_size << std::endl;
380 p.blocks[block] = j.buffer;
381 ++m_cache_stats.cache_size;
382 m_pieces.push_back(p);
385 // fills a piece with data from disk, returns the total number of bytes
386 // read or -1 if there was an error
387 int disk_io_thread::read_into_piece(cached_piece_entry& p, int start_block, mutex_t::scoped_lock& l)
389 int piece_size = p.storage->info()->piece_size(p.piece);
390 int blocks_in_piece = (piece_size + m_block_size - 1) / m_block_size;
392 int end_block = start_block;
393 for (int i = start_block; i < blocks_in_piece
394 && m_cache_stats.cache_size < m_cache_size; ++i)
396 // this is a block that is already allocated
397 // stop allocating and don't read more than
398 // what we've allocated now
399 if (p.blocks[i]) break;
400 p.blocks[i] = allocate_buffer();
402 // the allocation failed, break
403 if (p.blocks[i] == 0) break;
404 ++p.num_blocks;
405 ++m_cache_stats.cache_size;
406 ++m_cache_stats.read_cache_size;
407 ++end_block;
410 if (end_block == start_block) return -2;
412 int buffer_size = piece_size - (end_block - 1) * m_block_size + (end_block - start_block - 1) * m_block_size;
413 TORRENT_ASSERT(buffer_size <= piece_size);
414 TORRENT_ASSERT(buffer_size + start_block * m_block_size <= piece_size);
415 boost::scoped_array<char> buf;
416 if (m_coalesce_reads) buf.reset(new (std::nothrow) char[buffer_size]);
417 int ret = 0;
418 if (buf)
420 l.unlock();
421 ret += p.storage->read_impl(buf.get(), p.piece, start_block * m_block_size, buffer_size);
422 l.lock();
423 if (p.storage->error()) { return -1; }
424 ++m_cache_stats.reads;
427 int piece_offset = start_block * m_block_size;
428 int offset = 0;
429 for (int i = start_block; i < end_block; ++i)
431 int block_size = (std::min)(piece_size - piece_offset, m_block_size);
432 if (p.blocks[i] == 0) break;
433 TORRENT_ASSERT(offset <= buffer_size);
434 TORRENT_ASSERT(piece_offset <= piece_size);
435 if (buf)
437 std::memcpy(p.blocks[i], buf.get() + offset, block_size);
439 else
441 l.unlock();
442 ret += p.storage->read_impl(p.blocks[i], p.piece, piece_offset, block_size);
443 if (!p.storage->error()) { return -1; }
444 l.lock();
445 ++m_cache_stats.reads;
447 offset += m_block_size;
448 piece_offset += m_block_size;
450 TORRENT_ASSERT(ret <= buffer_size);
451 return (ret != buffer_size) ? -1 : ret;
454 bool disk_io_thread::make_room(int num_blocks
455 , cache_t::iterator ignore
456 , mutex_t::scoped_lock& l)
458 if (m_cache_size - m_cache_stats.cache_size < num_blocks)
460 // there's not enough room in the cache, clear a piece
461 // from the read cache
462 if (!clear_oldest_read_piece(ignore, l)) return false;
465 return m_cache_size - m_cache_stats.cache_size >= num_blocks;
468 // returns -1 on read error, -2 if there isn't any space in the cache
469 // or the number of bytes read
470 int disk_io_thread::cache_read_block(disk_io_job const& j, mutex_t::scoped_lock& l)
472 INVARIANT_CHECK;
474 int piece_size = j.storage->info()->piece_size(j.piece);
475 int blocks_in_piece = (piece_size + m_block_size - 1) / m_block_size;
477 int start_block = j.offset / m_block_size;
479 if (!make_room(blocks_in_piece - start_block
480 , m_read_pieces.end(), l)) return -2;
482 cached_piece_entry p;
483 p.piece = j.piece;
484 p.storage = j.storage;
485 p.last_use = time_now();
486 p.num_blocks = 0;
487 p.blocks.reset(new char*[blocks_in_piece]);
488 std::memset(&p.blocks[0], 0, blocks_in_piece * sizeof(char*));
489 int ret = read_into_piece(p, start_block, l);
491 if (ret == -1)
492 free_piece(p, l);
493 else
494 m_read_pieces.push_back(p);
496 return ret;
499 #ifndef NDEBUG
500 void disk_io_thread::check_invariant() const
502 int cached_write_blocks = 0;
503 for (cache_t::const_iterator i = m_pieces.begin()
504 , end(m_pieces.end()); i != end; ++i)
506 cached_piece_entry const& p = *i;
507 TORRENT_ASSERT(p.blocks);
509 if (!p.storage) continue;
510 int piece_size = p.storage->info()->piece_size(p.piece);
511 int blocks_in_piece = (piece_size + m_block_size - 1) / m_block_size;
512 int blocks = 0;
513 for (int k = 0; k < blocks_in_piece; ++k)
515 if (p.blocks[k])
517 #ifndef TORRENT_DISABLE_POOL_ALLOCATOR
518 TORRENT_ASSERT(is_disk_buffer(p.blocks[k]));
519 #endif
520 ++blocks;
523 // TORRENT_ASSERT(blocks == p.num_blocks);
524 cached_write_blocks += blocks;
527 int cached_read_blocks = 0;
528 for (cache_t::const_iterator i = m_read_pieces.begin()
529 , end(m_read_pieces.end()); i != end; ++i)
531 cached_piece_entry const& p = *i;
532 TORRENT_ASSERT(p.blocks);
534 int piece_size = p.storage->info()->piece_size(p.piece);
535 int blocks_in_piece = (piece_size + m_block_size - 1) / m_block_size;
536 int blocks = 0;
537 for (int k = 0; k < blocks_in_piece; ++k)
539 if (p.blocks[k])
541 #ifndef TORRENT_DISABLE_POOL_ALLOCATOR
542 TORRENT_ASSERT(is_disk_buffer(p.blocks[k]));
543 #endif
544 ++blocks;
547 // TORRENT_ASSERT(blocks == p.num_blocks);
548 cached_read_blocks += blocks;
551 TORRENT_ASSERT(cached_read_blocks + cached_write_blocks == m_cache_stats.cache_size);
552 TORRENT_ASSERT(cached_read_blocks == m_cache_stats.read_cache_size);
554 // when writing, there may be a one block difference, right before an old piece
555 // is flushed
556 TORRENT_ASSERT(m_cache_stats.cache_size <= m_cache_size + 1);
558 #endif
560 int disk_io_thread::try_read_from_cache(disk_io_job const& j)
562 TORRENT_ASSERT(j.buffer);
564 mutex_t::scoped_lock l(m_piece_mutex);
565 if (!m_use_read_cache) return -2;
567 cache_t::iterator p
568 = find_cached_piece(m_read_pieces, j, l);
570 bool hit = true;
571 int ret = 0;
573 // if the piece cannot be found in the cache,
574 // read the whole piece starting at the block
575 // we got a request for.
576 if (p == m_read_pieces.end())
578 ret = cache_read_block(j, l);
579 hit = false;
580 if (ret < 0) return ret;
581 p = m_read_pieces.end();
582 --p;
583 TORRENT_ASSERT(!m_read_pieces.empty());
584 TORRENT_ASSERT(p->piece == j.piece);
585 TORRENT_ASSERT(p->storage == j.storage);
588 if (p != m_read_pieces.end())
590 // copy from the cache and update the last use timestamp
591 int block = j.offset / m_block_size;
592 int block_offset = j.offset % m_block_size;
593 int buffer_offset = 0;
594 int size = j.buffer_size;
595 if (p->blocks[block] == 0)
597 int piece_size = j.storage->info()->piece_size(j.piece);
598 int blocks_in_piece = (piece_size + m_block_size - 1) / m_block_size;
599 int end_block = block;
600 while (end_block < blocks_in_piece && p->blocks[end_block] == 0) ++end_block;
601 if (!make_room(end_block - block, p, l)) return -2;
602 ret = read_into_piece(*p, block, l);
603 hit = false;
604 if (ret < 0) return ret;
605 TORRENT_ASSERT(p->blocks[block]);
608 p->last_use = time_now();
609 while (size > 0)
611 TORRENT_ASSERT(p->blocks[block]);
612 int to_copy = (std::min)(m_block_size
613 - block_offset, size);
614 std::memcpy(j.buffer + buffer_offset
615 , p->blocks[block] + block_offset
616 , to_copy);
617 size -= to_copy;
618 block_offset = 0;
619 buffer_offset += to_copy;
621 ret = j.buffer_size;
622 ++m_cache_stats.blocks_read;
623 if (hit) ++m_cache_stats.blocks_read_hit;
625 return ret;
628 void disk_io_thread::add_job(disk_io_job const& j
629 , boost::function<void(int, disk_io_job const&)> const& f)
631 TORRENT_ASSERT(!j.callback);
632 TORRENT_ASSERT(j.storage);
633 TORRENT_ASSERT(j.buffer_size <= m_block_size);
634 mutex_t::scoped_lock l(m_queue_mutex);
635 #ifndef NDEBUG
636 if (j.action == disk_io_job::write)
638 cache_t::iterator p
639 = find_cached_piece(m_pieces, j, l);
640 if (p != m_pieces.end())
642 int block = j.offset / m_block_size;
643 char const* buffer = p->blocks[block];
644 TORRENT_ASSERT(buffer == 0);
647 #endif
649 std::list<disk_io_job>::reverse_iterator i = m_jobs.rbegin();
650 if (j.action == disk_io_job::read)
652 // when we're reading, we may not skip
653 // ahead of any write operation that overlaps
654 // the region we're reading
655 for (; i != m_jobs.rend(); i++)
657 // if *i should come before j, stop
658 // and insert j before i
659 if (*i < j) break;
660 // if we come across a write operation that
661 // overlaps the region we're reading, we need
662 // to stop
663 if (i->action == disk_io_job::write
664 && i->storage == j.storage
665 && i->piece == j.piece
666 && range_overlap(i->offset, i->buffer_size
667 , j.offset, j.buffer_size))
668 break;
671 else if (j.action == disk_io_job::write)
673 for (; i != m_jobs.rend(); ++i)
675 if (*i < j)
677 if (i != m_jobs.rbegin()
678 && i.base()->storage.get() != j.storage.get())
679 i = m_jobs.rbegin();
680 break;
685 // if we are placed in front of all other jobs, put it on the back of
686 // the queue, to sweep the disk in the same direction, and to avoid
687 // starvation. The exception is if the priority is higher than the
688 // job at the front of the queue
689 if (i == m_jobs.rend() && (m_jobs.empty() || j.priority <= m_jobs.back().priority))
690 i = m_jobs.rbegin();
692 std::list<disk_io_job>::iterator k = m_jobs.insert(i.base(), j);
693 k->callback.swap(const_cast<boost::function<void(int, disk_io_job const&)>&>(f));
694 if (j.action == disk_io_job::write)
695 m_queue_buffer_size += j.buffer_size;
696 TORRENT_ASSERT(j.storage.get());
697 m_signal.notify_all();
700 #ifndef NDEBUG
701 bool disk_io_thread::is_disk_buffer(char* buffer) const
703 #ifdef TORRENT_DISABLE_POOL_ALLOCATOR
704 return true;
705 #else
706 mutex_t::scoped_lock l(m_pool_mutex);
707 return m_pool.is_from(buffer);
708 #endif
710 #endif
712 char* disk_io_thread::allocate_buffer()
714 mutex_t::scoped_lock l(m_pool_mutex);
715 #ifdef TORRENT_STATS
716 ++m_allocations;
717 #endif
718 #ifdef TORRENT_DISABLE_POOL_ALLOCATOR
719 return (char*)malloc(m_block_size);
720 #else
721 return (char*)m_pool.ordered_malloc();
722 #endif
725 void disk_io_thread::free_buffer(char* buf)
727 mutex_t::scoped_lock l(m_pool_mutex);
728 #ifdef TORRENT_STATS
729 --m_allocations;
730 #endif
731 #ifdef TORRENT_DISABLE_POOL_ALLOCATOR
732 free(buf);
733 #else
734 m_pool.ordered_free(buf);
735 #endif
738 bool disk_io_thread::test_error(disk_io_job& j)
740 error_code const& ec = j.storage->error();
741 if (ec)
743 j.str = ec.message();
744 j.error = ec;
745 j.error_file = j.storage->error_file();
746 j.storage->clear_error();
747 #ifndef NDEBUG
748 std::cout << "ERROR: '" << j.str << "' " << j.error_file << std::endl;
749 #endif
750 return true;
752 return false;
755 void disk_io_thread::operator()()
757 for (;;)
759 #ifdef TORRENT_DISK_STATS
760 m_log << log_time() << " idle" << std::endl;
761 #endif
762 mutex_t::scoped_lock jl(m_queue_mutex);
764 while (m_jobs.empty() && !m_abort)
765 m_signal.wait(jl);
766 if (m_abort && m_jobs.empty())
768 jl.unlock();
770 mutex_t::scoped_lock l(m_piece_mutex);
771 // flush all disk caches
772 for (cache_t::iterator i = m_pieces.begin()
773 , end(m_pieces.end()); i != end; ++i)
774 flush(i, l);
775 for (cache_t::iterator i = m_read_pieces.begin()
776 , end(m_read_pieces.end()); i != end; ++i)
777 free_piece(*i, l);
778 m_pieces.clear();
779 m_read_pieces.clear();
780 return;
783 // if there's a buffer in this job, it will be freed
784 // when this holder is destructed, unless it has been
785 // released.
786 disk_buffer_holder holder(*this
787 , m_jobs.front().action != disk_io_job::check_fastresume
788 ? m_jobs.front().buffer : 0);
790 boost::function<void(int, disk_io_job const&)> handler;
791 handler.swap(m_jobs.front().callback);
793 disk_io_job j = m_jobs.front();
794 m_jobs.pop_front();
795 m_queue_buffer_size -= j.buffer_size;
796 jl.unlock();
798 flush_expired_pieces();
800 int ret = 0;
802 TORRENT_ASSERT(j.storage || j.action == disk_io_job::abort_thread);
803 #ifdef TORRENT_DISK_STATS
804 ptime start = time_now();
805 #endif
806 #ifndef BOOST_NO_EXCEPTIONS
807 try {
808 #endif
810 switch (j.action)
812 case disk_io_job::abort_thread:
814 mutex_t::scoped_lock jl(m_queue_mutex);
815 m_abort = true;
817 for (std::list<disk_io_job>::iterator i = m_jobs.begin();
818 i != m_jobs.end();)
820 if (i->action == disk_io_job::read)
822 if (i->callback) m_ios.post(bind(i->callback, -1, *i));
823 m_jobs.erase(i++);
824 continue;
826 if (i->action == disk_io_job::check_files)
828 if (i->callback) m_ios.post(bind(i->callback
829 , piece_manager::disk_check_aborted, *i));
830 m_jobs.erase(i++);
831 continue;
833 ++i;
835 break;
837 case disk_io_job::read:
839 if (test_error(j))
841 ret = -1;
842 return;
844 #ifdef TORRENT_DISK_STATS
845 m_log << log_time() << " read " << j.buffer_size << std::endl;
846 #endif
847 INVARIANT_CHECK;
848 TORRENT_ASSERT(j.buffer == 0);
849 j.buffer = allocate_buffer();
850 TORRENT_ASSERT(j.buffer_size <= m_block_size);
851 if (j.buffer == 0)
853 ret = -1;
854 j.error = error_code(ENOMEM, get_posix_category());
855 j.str = j.error.message();
856 break;
859 disk_buffer_holder read_holder(*this, j.buffer);
860 ret = try_read_from_cache(j);
862 // -2 means there's no space in the read cache
863 // or that the read cache is disabled
864 if (ret == -1)
866 j.buffer = 0;
867 test_error(j);
868 break;
870 else if (ret == -2)
872 ret = j.storage->read_impl(j.buffer, j.piece, j.offset
873 , j.buffer_size);
874 if (ret < 0)
876 test_error(j);
877 break;
879 ++m_cache_stats.blocks_read;
881 read_holder.release();
882 break;
884 case disk_io_job::write:
886 if (test_error(j))
888 ret = -1;
889 break;
891 #ifdef TORRENT_DISK_STATS
892 m_log << log_time() << " write " << j.buffer_size << std::endl;
893 #endif
894 mutex_t::scoped_lock l(m_piece_mutex);
895 INVARIANT_CHECK;
896 cache_t::iterator p
897 = find_cached_piece(m_pieces, j, l);
898 int block = j.offset / m_block_size;
899 TORRENT_ASSERT(j.buffer);
900 TORRENT_ASSERT(j.buffer_size <= m_block_size);
901 if (p != m_pieces.end())
903 TORRENT_ASSERT(p->blocks[block] == 0);
904 if (p->blocks[block])
906 free_buffer(p->blocks[block]);
907 --p->num_blocks;
909 p->blocks[block] = j.buffer;
910 ++m_cache_stats.cache_size;
911 ++p->num_blocks;
912 p->last_use = time_now();
914 else
916 cache_block(j, l);
918 // we've now inserted the buffer
919 // in the cache, we should not
920 // free it at the end
921 holder.release();
922 if (m_cache_stats.cache_size >= m_cache_size)
923 flush_oldest_piece(l);
924 break;
926 case disk_io_job::hash:
928 #ifdef TORRENT_DISK_STATS
929 m_log << log_time() << " hash" << std::endl;
930 #endif
931 mutex_t::scoped_lock l(m_piece_mutex);
932 INVARIANT_CHECK;
934 cache_t::iterator i
935 = find_cached_piece(m_pieces, j, l);
936 if (i != m_pieces.end())
938 flush_and_remove(i, l);
939 if (test_error(j))
941 ret = -1;
942 j.storage->mark_failed(j.piece);
943 break;
946 l.unlock();
947 sha1_hash h = j.storage->hash_for_piece_impl(j.piece);
948 if (test_error(j))
950 ret = -1;
951 j.storage->mark_failed(j.piece);
952 break;
954 ret = (j.storage->info()->hash_for_piece(j.piece) == h)?0:-2;
955 if (ret == -2) j.storage->mark_failed(j.piece);
956 break;
958 case disk_io_job::move_storage:
960 #ifdef TORRENT_DISK_STATS
961 m_log << log_time() << " move" << std::endl;
962 #endif
963 TORRENT_ASSERT(j.buffer == 0);
964 ret = j.storage->move_storage_impl(j.str) ? 1 : 0;
965 if (ret != 0)
967 test_error(j);
968 break;
970 j.str = j.storage->save_path().string();
971 break;
973 case disk_io_job::release_files:
975 #ifdef TORRENT_DISK_STATS
976 m_log << log_time() << " release" << std::endl;
977 #endif
978 TORRENT_ASSERT(j.buffer == 0);
980 mutex_t::scoped_lock l(m_piece_mutex);
981 INVARIANT_CHECK;
983 for (cache_t::iterator i = m_pieces.begin(); i != m_pieces.end();)
985 if (i->storage == j.storage)
987 flush(i, l);
988 i = m_pieces.erase(i);
990 else
992 ++i;
995 l.unlock();
996 #ifndef TORRENT_DISABLE_POOL_ALLOCATOR
998 mutex_t::scoped_lock l(m_pool_mutex);
999 m_pool.release_memory();
1001 #endif
1002 ret = j.storage->release_files_impl();
1003 if (ret != 0) test_error(j);
1004 break;
1006 case disk_io_job::clear_read_cache:
1008 #ifdef TORRENT_DISK_STATS
1009 m_log << log_time() << " clear-cache" << std::endl;
1010 #endif
1011 TORRENT_ASSERT(j.buffer == 0);
1013 mutex_t::scoped_lock l(m_piece_mutex);
1014 INVARIANT_CHECK;
1016 for (cache_t::iterator i = m_read_pieces.begin();
1017 i != m_read_pieces.end();)
1019 if (i->storage == j.storage)
1021 free_piece(*i, l);
1022 i = m_read_pieces.erase(i);
1024 else
1026 ++i;
1029 l.unlock();
1030 #ifndef TORRENT_DISABLE_POOL_ALLOCATOR
1032 mutex_t::scoped_lock l(m_pool_mutex);
1033 m_pool.release_memory();
1035 #endif
1036 ret = 0;
1037 break;
1039 case disk_io_job::delete_files:
1041 #ifdef TORRENT_DISK_STATS
1042 m_log << log_time() << " delete" << std::endl;
1043 #endif
1044 TORRENT_ASSERT(j.buffer == 0);
1046 mutex_t::scoped_lock l(m_piece_mutex);
1047 INVARIANT_CHECK;
1049 cache_t::iterator i = std::remove_if(
1050 m_pieces.begin(), m_pieces.end(), bind(&cached_piece_entry::storage, _1) == j.storage);
1052 for (cache_t::iterator k = i; k != m_pieces.end(); ++k)
1054 torrent_info const& ti = *k->storage->info();
1055 int blocks_in_piece = (ti.piece_size(k->piece) + m_block_size - 1) / m_block_size;
1056 for (int j = 0; j < blocks_in_piece; ++j)
1058 if (k->blocks[j] == 0) continue;
1059 free_buffer(k->blocks[j]);
1060 k->blocks[j] = 0;
1063 m_pieces.erase(i, m_pieces.end());
1064 l.unlock();
1065 #ifndef TORRENT_DISABLE_POOL_ALLOCATOR
1067 mutex_t::scoped_lock l(m_pool_mutex);
1068 m_pool.release_memory();
1070 #endif
1071 ret = j.storage->delete_files_impl();
1072 if (ret != 0) test_error(j);
1073 break;
1075 case disk_io_job::check_fastresume:
1077 #ifdef TORRENT_DISK_STATS
1078 m_log << log_time() << " check fastresume" << std::endl;
1079 #endif
1080 lazy_entry const* rd = (lazy_entry const*)j.buffer;
1081 TORRENT_ASSERT(rd != 0);
1082 ret = j.storage->check_fastresume(*rd, j.str);
1083 break;
1085 case disk_io_job::check_files:
1087 #ifdef TORRENT_DISK_STATS
1088 m_log << log_time() << " check files" << std::endl;
1089 #endif
1090 int piece_size = j.storage->info()->piece_length();
1091 for (int processed = 0; processed < 4 * 1024 * 1024; processed += piece_size)
1093 ret = j.storage->check_files(j.piece, j.offset, j.str);
1095 #ifndef BOOST_NO_EXCEPTIONS
1096 try {
1097 #endif
1098 TORRENT_ASSERT(handler);
1099 if (handler && ret == piece_manager::need_full_check)
1100 m_ios.post(bind(handler, ret, j));
1101 #ifndef BOOST_NO_EXCEPTIONS
1102 } catch (std::exception&) {}
1103 #endif
1104 if (ret != piece_manager::need_full_check) break;
1106 if (test_error(j))
1108 ret = piece_manager::fatal_disk_error;
1109 break;
1111 TORRENT_ASSERT(ret != -2 || !j.str.empty());
1113 // if the check is not done, add it at the end of the job queue
1114 if (ret == piece_manager::need_full_check)
1116 add_job(j, handler);
1117 continue;
1119 break;
1121 case disk_io_job::save_resume_data:
1123 #ifdef TORRENT_DISK_STATS
1124 m_log << log_time() << " save resume data" << std::endl;
1125 #endif
1126 j.resume_data.reset(new entry(entry::dictionary_t));
1127 j.storage->write_resume_data(*j.resume_data);
1128 ret = 0;
1129 break;
1131 case disk_io_job::rename_file:
1133 #ifdef TORRENT_DISK_STATS
1134 m_log << log_time() << " rename file" << std::endl;
1135 #endif
1136 ret = j.storage->rename_file_impl(j.piece, j.str);
1139 #ifndef BOOST_NO_EXCEPTIONS
1140 } catch (std::exception& e)
1142 ret = -1;
1145 j.str = e.what();
1147 catch (std::exception&) {}
1149 #endif
1151 // if (!handler) std::cerr << "DISK THREAD: no callback specified" << std::endl;
1152 // else std::cerr << "DISK THREAD: invoking callback" << std::endl;
1153 #ifndef BOOST_NO_EXCEPTIONS
1154 try {
1155 #endif
1156 TORRENT_ASSERT(ret != -2 || !j.str.empty());
1157 if (handler) m_ios.post(bind(handler, ret, j));
1158 #ifndef BOOST_NO_EXCEPTIONS
1159 } catch (std::exception&)
1161 TORRENT_ASSERT(false);
1163 #endif
1165 TORRENT_ASSERT(false);