3 Copyright (c) 2007, Arvid Norberg
6 Redistribution and use in source and binary forms, with or without
7 modification, are permitted provided that the following conditions
10 * Redistributions of source code must retain the above copyright
11 notice, this list of conditions and the following disclaimer.
12 * Redistributions in binary form must reproduce the above copyright
13 notice, this list of conditions and the following disclaimer in
14 the documentation and/or other materials provided with the distribution.
15 * Neither the name of the author nor the names of its
16 contributors may be used to endorse or promote products derived
17 from this software without specific prior written permission.
19 THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
20 AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
21 IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
22 ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
23 LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
24 CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
25 SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
26 INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
27 CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
28 ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
29 POSSIBILITY OF SUCH DAMAGE.
33 #include "libtorrent/storage.hpp"
35 #include "libtorrent/disk_io_thread.hpp"
36 #include "libtorrent/disk_buffer_holder.hpp"
37 #include <boost/scoped_array.hpp>
41 #define alloca(s) _alloca(s)
44 #ifdef TORRENT_DISK_STATS
45 #include "libtorrent/time.hpp"
// Constructor. Takes the io_service and the size in bytes of one disk block.
// The visible initializers start the queued-write byte counter at zero, set
// the cache limit to 512 blocks and the cache expiry to 60, enable write
// coalescing, read coalescing and the read cache, store block_size in
// m_block_size and construct m_disk_io_thread from a reference to *this.
// When TORRENT_DISK_STATS is defined, "disk_io_thread.log" is opened truncated.
// NOTE(review): several initializer/body lines are absent from this listing
// (e.g. whatever TORRENT_DISABLE_POOL_ALLOCATOR guards) - confirm against the
// complete file before relying on this summary.
51 disk_io_thread::disk_io_thread(asio::io_service
& ios
, int block_size
)
53 , m_queue_buffer_size(0)
54 , m_cache_size(512) // 512 * 16kB = 8MB
55 , m_cache_expiry(60) // 1 minute
56 , m_coalesce_writes(true)
57 , m_coalesce_reads(true)
58 , m_use_read_cache(true)
59 #ifndef TORRENT_DISABLE_POOL_ALLOCATOR
62 , m_block_size(block_size
)
64 , m_disk_io_thread(boost::ref(*this))
69 #ifdef TORRENT_DISK_STATS
70 m_log
.open("disk_io_thread.log", std::ios::trunc
);
// Destructor. The only visible statement asserts that m_abort is already
// true when the object is destroyed.
74 disk_io_thread::~disk_io_thread()
76 TORRENT_ASSERT(m_abort
== true);
// Queues an abort request and waits for the disk thread to finish.
// Under m_queue_mutex, a job whose action is disk_io_job::abort_thread is
// inserted at the front of m_jobs and m_signal.notify_all() is called; then
// m_disk_io_thread.join() is called.
// NOTE(review): the declaration of `j` is not visible in this listing, and
// neither is any unlock of `l` before the join - confirm against the full file.
79 void disk_io_thread::join()
81 mutex_t::scoped_lock
l(m_queue_mutex
);
83 j
.action
= disk_io_job::abort_thread
;
// front insertion: the abort job goes ahead of everything already queued
84 m_jobs
.insert(m_jobs
.begin(), j
);
85 m_signal
.notify_all();
88 m_disk_io_thread
.join();
// Collects information about cached pieces that belong to the torrent with
// info-hash `ih`.
//   ih  - info-hash compared against each cache entry's storage->info()
//   ret - output vector; capacity is reserved for m_pieces.size() entries
// Holds m_piece_mutex for the duration. The write cache (m_pieces) is walked
// first, then the read cache (m_read_pieces). For every matching entry a
// cached_piece_info is filled with the piece index, last_use, the kind
// (write_cache or read_cache) and one flag per block that is set to true
// where the cache entry holds a buffer for that block.
// NOTE(review): the statement that stores `info` into `ret` is not visible
// in this listing - presumably a push_back after each inner loop; confirm.
91 void disk_io_thread::get_cache_info(sha1_hash
const& ih
, std::vector
<cached_piece_info
>& ret
) const
93 mutex_t::scoped_lock
l(m_piece_mutex
);
95 ret
.reserve(m_pieces
.size());
// pass 1: write cache
96 for (cache_t::const_iterator i
= m_pieces
.begin()
97 , end(m_pieces
.end()); i
!= end
; ++i
)
99 torrent_info
const& ti
= *i
->storage
->info();
// entries of other torrents are skipped
100 if (ti
.info_hash() != ih
) continue;
101 cached_piece_info info
;
102 info
.piece
= i
->piece
;
103 info
.last_use
= i
->last_use
;
104 info
.kind
= cached_piece_info::write_cache
;
// number of blocks in this piece, rounded up
105 int blocks_in_piece
= (ti
.piece_size(i
->piece
) + (m_block_size
) - 1) / m_block_size
;
106 info
.blocks
.resize(blocks_in_piece
);
107 for (int b
= 0; b
< blocks_in_piece
; ++b
)
108 if (i
->blocks
[b
]) info
.blocks
[b
] = true;
// pass 2: read cache, same procedure with kind = read_cache
111 for (cache_t::const_iterator i
= m_read_pieces
.begin()
112 , end(m_read_pieces
.end()); i
!= end
; ++i
)
114 torrent_info
const& ti
= *i
->storage
->info();
115 if (ti
.info_hash() != ih
) continue;
116 cached_piece_info info
;
117 info
.piece
= i
->piece
;
118 info
.last_use
= i
->last_use
;
119 info
.kind
= cached_piece_info::read_cache
;
120 int blocks_in_piece
= (ti
.piece_size(i
->piece
) + (m_block_size
) - 1) / m_block_size
;
121 info
.blocks
.resize(blocks_in_piece
);
122 for (int b
= 0; b
< blocks_in_piece
; ++b
)
123 if (i
->blocks
[b
]) info
.blocks
[b
] = true;
// Returns m_cache_stats by value; the copy is made while m_piece_mutex is held.
128 cache_status
disk_io_thread::status() const
130 mutex_t::scoped_lock
l(m_piece_mutex
);
131 return m_cache_stats
;
// Setter for the cache size limit. Takes m_piece_mutex and asserts s >= 0.
// NOTE(review): the assignment itself is not visible in this listing -
// presumably it stores `s` in m_cache_size; confirm against the full file.
134 void disk_io_thread::set_cache_size(int s
)
136 mutex_t::scoped_lock
l(m_piece_mutex
);
137 TORRENT_ASSERT(s
>= 0);
// Setter for the cache expiry. Takes m_piece_mutex and asserts ex > 0.
// NOTE(review): the assignment itself is not visible in this listing -
// presumably it stores `ex` in m_cache_expiry; confirm against the full file.
141 void disk_io_thread::set_cache_expiry(int ex
)
143 mutex_t::scoped_lock
l(m_piece_mutex
);
144 TORRENT_ASSERT(ex
> 0);
// Walks the job queue under m_queue_mutex and aborts queued jobs.
// Visible behaviour: for queued read jobs that have a callback, the callback
// is posted to m_ios with result -1; for queued check_files jobs that have a
// callback, it is posted with piece_manager::disk_check_aborted. Finally
// m_signal.notify_all() is called.
// NOTE(review): the rest of the loop header, any filtering on the storage
// `s`, and the removal of the aborted jobs are not visible in this listing.
148 // aborts read operations
149 void disk_io_thread::stop(boost::intrusive_ptr
<piece_manager
> s
)
151 mutex_t::scoped_lock
l(m_queue_mutex
);
152 // read jobs are aborted, write and move jobs are synchronized
153 for (std::list
<disk_io_job
>::iterator i
= m_jobs
.begin();
161 if (i
->action
== disk_io_job::read
)
// -1 is handed to the read callback as its result
163 if (i
->callback
) m_ios
.post(bind(i
->callback
, -1, *i
));
167 if (i
->action
== disk_io_job::check_files
)
169 if (i
->callback
) m_ios
.post(bind(i
->callback
170 , piece_manager::disk_check_aborted
, *i
));
176 m_signal
.notify_all();
// Returns true when the half-open ranges [start1, start1 + length1) and
// [start2, start2 + length2) share at least one position.
//
// The range that starts first (or at the same position) must extend past the
// start of the other one. A zero-length range is reported as overlapping a
// non-empty range whose span contains its start position; two zero-length
// ranges never overlap.
//
// The end positions are computed in a wider type so that start + length
// cannot overflow `int` for large offsets.
bool range_overlap(int start1, int length1, int start2, int length2)
{
	long long const end1 = static_cast<long long>(start1) + length1;
	long long const end2 = static_cast<long long>(start2) + length2;
	return (start1 <= start2 && end1 > start2)
		|| (start2 <= start1 && end2 > start1);
}
// Ordering predicate for disk jobs. Compares, in this order: priority
// (a larger priority sorts first), the raw storage pointer, the piece index
// and finally the offset within the piece.
// NOTE(review): the final return statement is not visible in this listing;
// presumably `return false` when nothing compares less - confirm.
187 // The semantic of this operator is:
188 // should lhs come before rhs in the job queue
189 bool operator<(disk_io_job
const& lhs
, disk_io_job
const& rhs
)
191 // NOTE: comparison inverted to make higher priority
192 // skip _in_front_of_ lower priority
193 if (lhs
.priority
> rhs
.priority
) return true;
194 if (lhs
.priority
< rhs
.priority
) return false;
// equal priority: order by storage address, then piece, then offset
196 if (lhs
.storage
.get() < rhs
.storage
.get()) return true;
197 if (lhs
.storage
.get() > rhs
.storage
.get()) return false;
198 if (lhs
.piece
< rhs
.piece
) return true;
199 if (lhs
.piece
> rhs
.piece
) return false;
200 if (lhs
.offset
< rhs
.offset
) return true;
201 // if (lhs.offset > rhs.offset) return false;
// Linear search of `cache` for the entry whose storage and piece index both
// equal those of job `j`; entries that differ in either are skipped.
//   cache - the piece cache to search
//   j     - job supplying the storage and piece to look for
//   l     - caller's lock; not used by any visible line
// NOTE(review): the return statements are not visible in this listing;
// callers compare the result against cache.end() for "not found".
206 disk_io_thread::cache_t::iterator
disk_io_thread::find_cached_piece(
207 disk_io_thread::cache_t
& cache
208 , disk_io_job
const& j
, mutex_t::scoped_lock
& l
)
210 for (cache_t::iterator i
= cache
.begin()
211 , end(cache
.end()); i
!= end
; ++i
)
213 if (i
->storage
!= j
.storage
|| i
->piece
!= j
.piece
) continue;
// Flushes write-cache entries that have not been used recently.
// Under m_piece_mutex, the entry of m_pieces with the smallest last_use is
// selected. The function returns if the cache is empty or if that entry's age
// in whole seconds is below m_cache_expiry; otherwise the entry is handed to
// flush_and_remove().
// NOTE(review): lines are missing between the lock and the min_element call;
// whether this runs once or repeats in a loop is not visible here.
219 void disk_io_thread::flush_expired_pieces()
// `now` is sampled before the mutex is taken
221 ptime now
= time_now();
223 mutex_t::scoped_lock
l(m_piece_mutex
);
228 cache_t::iterator i
= std::min_element(
229 m_pieces
.begin(), m_pieces
.end()
230 , bind(&cached_piece_entry::last_use
, _1
)
231 < bind(&cached_piece_entry::last_use
, _2
));
232 if (i
== m_pieces
.end()) return;
233 int age
= total_seconds(now
- i
->last_use
);
// the least recently used entry is still fresh, so nothing has expired
234 if (age
< m_cache_expiry
) return;
235 flush_and_remove(i
, l
);
// Releases the block buffers held by cache entry `p`.
// Every block slot of the piece is visited; empty slots are skipped and the
// others are passed to free_buffer(). m_cache_stats.cache_size and
// m_cache_stats.read_cache_size are decremented.
// NOTE(review): braces and some statements are missing from this listing, so
// whether the counters drop once per freed block, and whether the slot is
// reset to 0, cannot be confirmed here. `l` is not used by any visible line.
239 void disk_io_thread::free_piece(cached_piece_entry
& p
, mutex_t::scoped_lock
& l
)
241 int piece_size
= p
.storage
->info()->piece_size(p
.piece
);
242 int blocks_in_piece
= (piece_size
+ m_block_size
- 1) / m_block_size
;
244 for (int i
= 0; i
< blocks_in_piece
; ++i
)
246 if (p
.blocks
[i
] == 0) continue;
247 free_buffer(p
.blocks
[i
]);
250 --m_cache_stats
.cache_size
;
251 --m_cache_stats
.read_cache_size
;
// Evicts the least recently used entry of the read cache.
//   ignore - an entry that must not be evicted (callers pass
//            m_read_pieces.end() when there is none)
//   l      - caller's lock; not used by any visible line
// The entry with the smallest last_use is selected. If it exists and is not
// `ignore`, it is erased from m_read_pieces unless it was used less than one
// second ago, in which case false is returned.
// NOTE(review): the release of the entry's buffers and the remaining return
// statements are not visible in this listing.
255 bool disk_io_thread::clear_oldest_read_piece(
256 cache_t::iterator ignore
257 , mutex_t::scoped_lock
& l
)
261 cache_t::iterator i
= std::min_element(
262 m_read_pieces
.begin(), m_read_pieces
.end()
263 , bind(&cached_piece_entry::last_use
, _1
)
264 < bind(&cached_piece_entry::last_use
, _2
));
265 if (i
!= m_read_pieces
.end() && i
!= ignore
)
267 // don't replace an entry that is less than one second old
268 if (time_now() - i
->last_use
< seconds(1)) return false;
270 m_read_pieces
.erase(i
);
// Frees cache space by dropping one piece.
// The read cache is tried first via clear_oldest_read_piece(); if that
// reports success the function returns. Otherwise the write-cache entry with
// the smallest last_use, if there is one, is passed to flush_and_remove().
276 void disk_io_thread::flush_oldest_piece(mutex_t::scoped_lock
& l
)
279 // first look if there are any read cache entries that can
// end() as `ignore` means no read-cache entry is exempt from eviction
281 if (clear_oldest_read_piece(m_read_pieces
.end(), l
)) return;
283 cache_t::iterator i
= std::min_element(
284 m_pieces
.begin(), m_pieces
.end()
285 , bind(&cached_piece_entry::last_use
, _1
)
286 < bind(&cached_piece_entry::last_use
, _2
));
287 if (i
== m_pieces
.end()) return;
288 flush_and_remove(i
, l
);
// Takes an iterator to a cache entry and the caller's lock.
// NOTE(review): only the signature is present in this listing; the body is
// missing. Going by the name it presumably flushes the entry and then erases
// it from the cache - confirm against the full file.
291 void disk_io_thread::flush_and_remove(disk_io_thread::cache_t::iterator e
292 , mutex_t::scoped_lock
& l
)
// Writes the cached blocks of the piece referenced by `e` to its storage and
// releases their buffers.
//   e - iterator to the cache entry to flush
//   l - caller's lock; not used by any visible line
// When m_coalesce_writes is set, a piece-sized scratch buffer is requested
// with a non-throwing new. Two write paths are visible: one copies a block
// into the scratch buffer and later issues a single write_impl() for the
// accumulated run, the other calls write_impl() for one block directly.
// Each flushed block is released with free_buffer(), counted in
// m_cache_stats.blocks_written and subtracted from m_cache_stats.cache_size.
// At the end every block slot is asserted to be 0.
// NOTE(review): the declarations of `buffer_size` and `offset`, the branch
// that selects between the two write paths, and the statements that clear the
// block slots are not visible in this listing.
298 void disk_io_thread::flush(disk_io_thread::cache_t::iterator e
299 , mutex_t::scoped_lock
& l
)
302 cached_piece_entry
& p
= *e
;
303 int piece_size
= p
.storage
->info()->piece_size(p
.piece
);
304 #ifdef TORRENT_DISK_STATS
305 m_log
<< log_time() << " flushing " << piece_size
<< std::endl
;
307 TORRENT_ASSERT(piece_size
> 0);
308 boost::scoped_array
<char> buf
;
// nothrow: if the allocation fails, buf simply stays empty
309 if (m_coalesce_writes
) buf
.reset(new (std::nothrow
) char[piece_size
]);
311 int blocks_in_piece
= (piece_size
+ m_block_size
- 1) / m_block_size
;
// note the <=: there is one extra iteration with i == blocks_in_piece,
// which the condition below treats like an empty block slot
314 for (int i
= 0; i
<= blocks_in_piece
; ++i
)
316 if (i
== blocks_in_piece
|| p
.blocks
[i
] == 0)
318 if (buffer_size
== 0) continue;
321 TORRENT_ASSERT(buffer_size
<= i
* m_block_size
);
// write the accumulated run; it ends at block i (clamped to the piece size)
323 p
.storage
->write_impl(buf
.get(), p
.piece
, (std::min
)(
324 i
* m_block_size
, piece_size
) - buffer_size
, buffer_size
);
326 ++m_cache_stats
.writes
;
327 // std::cerr << " flushing p: " << p.piece << " bytes: " << buffer_size << std::endl;
// the last block of a piece may be shorter than m_block_size
332 int block_size
= (std::min
)(piece_size
- i
* m_block_size
, m_block_size
);
333 TORRENT_ASSERT(offset
+ block_size
<= piece_size
);
334 TORRENT_ASSERT(offset
+ block_size
> 0);
// per-block write path
338 p
.storage
->write_impl(p
.blocks
[i
], p
.piece
, i
* m_block_size
, block_size
);
340 ++m_cache_stats
.writes
;
// coalescing path: stage the block in the scratch buffer
344 std::memcpy(buf
.get() + offset
, p
.blocks
[i
], block_size
);
345 offset
+= m_block_size
;
346 buffer_size
+= block_size
;
348 free_buffer(p
.blocks
[i
]);
350 TORRENT_ASSERT(p
.num_blocks
> 0);
352 ++m_cache_stats
.blocks_written
;
353 --m_cache_stats
.cache_size
;
355 TORRENT_ASSERT(buffer_size
== 0);
356 // std::cerr << " flushing p: " << p.piece << " cached_blocks: " << m_cache_stats.cache_size << std::endl;
358 for (int i
= 0; i
< blocks_in_piece
; ++i
)
359 TORRENT_ASSERT(p
.blocks
[i
] == 0);
// Creates a new write-cache entry for the piece targeted by job `j` and
// places the job's buffer in it.
// Asserts that m_pieces has no entry for this piece yet. The new entry gets
// j.storage, the current time as last_use and a zero-filled array with one
// pointer per block of the piece; j.buffer is stored in the slot for the
// block that contains j.offset. m_cache_stats.cache_size is incremented and
// the entry is appended to m_pieces.
// NOTE(review): the assignments of the entry's piece index and block count
// are not visible in this listing.
363 void disk_io_thread::cache_block(disk_io_job
& j
, mutex_t::scoped_lock
& l
)
366 TORRENT_ASSERT(find_cached_piece(m_pieces
, j
, l
) == m_pieces
.end());
367 cached_piece_entry p
;
369 int piece_size
= j
.storage
->info()->piece_size(j
.piece
);
370 int blocks_in_piece
= (piece_size
+ m_block_size
- 1) / m_block_size
;
373 p
.storage
= j
.storage
;
374 p
.last_use
= time_now();
376 p
.blocks
.reset(new char*[blocks_in_piece
]);
// every slot starts out null, i.e. no block cached
377 std::memset(&p
.blocks
[0], 0, blocks_in_piece
* sizeof(char*));
378 int block
= j
.offset
/ m_block_size
;
379 // std::cerr << " adding cache entry for p: " << j.piece << " block: " << block << " cached_blocks: " << m_cache_stats.cache_size << std::endl;
380 p
.blocks
[block
] = j
.buffer
;
381 ++m_cache_stats
.cache_size
;
382 m_pieces
.push_back(p
);
// Allocates buffers for consecutive blocks of cache entry `p`, starting at
// `start_block`, and fills them from storage.
//   p           - cache entry to fill
//   start_block - index of the first block to read
//   l           - caller's lock; not used by any visible line
// Allocation stops at the end of the piece, when the cache limit
// (m_cache_size) is reached, at the first block that already has a buffer, or
// when allocate_buffer() returns 0. Returns -2 when no block could be
// allocated, -1 on error or when fewer bytes than expected were read, and
// otherwise the number of bytes read.
// With m_coalesce_reads set, a single buffer covering all the blocks is
// requested (non-throwing new), read with one read_impl() call and copied
// into the blocks; the other visible path calls read_impl() per block.
// NOTE(review): the declarations of `ret` and `offset`, the increment of
// `end_block`, the branch choosing between the two read paths, and any
// unlock/lock of `l` around the I/O are not visible in this listing.
385 // fills a piece with data from disk, returns the total number of bytes
386 // read or -1 if there was an error
387 int disk_io_thread::read_into_piece(cached_piece_entry
& p
, int start_block
, mutex_t::scoped_lock
& l
)
389 int piece_size
= p
.storage
->info()->piece_size(p
.piece
);
390 int blocks_in_piece
= (piece_size
+ m_block_size
- 1) / m_block_size
;
392 int end_block
= start_block
;
393 for (int i
= start_block
; i
< blocks_in_piece
394 && m_cache_stats
.cache_size
< m_cache_size
; ++i
)
396 // this is a block that is already allocated
397 // stop allocating and don't read more than
398 // what we've allocated now
399 if (p
.blocks
[i
]) break;
400 p
.blocks
[i
] = allocate_buffer();
402 // the allocation failed, break
403 if (p
.blocks
[i
] == 0) break;
405 ++m_cache_stats
.cache_size
;
406 ++m_cache_stats
.read_cache_size
;
// nothing could be allocated
410 if (end_block
== start_block
) return -2;
// bytes covered by blocks [start_block, end_block): the last block is
// counted up to the end of the piece, the others as full blocks
412 int buffer_size
= piece_size
- (end_block
- 1) * m_block_size
+ (end_block
- start_block
- 1) * m_block_size
;
413 TORRENT_ASSERT(buffer_size
<= piece_size
);
414 TORRENT_ASSERT(buffer_size
+ start_block
* m_block_size
<= piece_size
);
415 boost::scoped_array
<char> buf
;
416 if (m_coalesce_reads
) buf
.reset(new (std::nothrow
) char[buffer_size
]);
// coalesced path: one read for the whole range
421 ret
+= p
.storage
->read_impl(buf
.get(), p
.piece
, start_block
* m_block_size
, buffer_size
);
423 if (p
.storage
->error()) { return -1; }
424 ++m_cache_stats
.reads
;
427 int piece_offset
= start_block
* m_block_size
;
429 for (int i
= start_block
; i
< end_block
; ++i
)
431 int block_size
= (std::min
)(piece_size
- piece_offset
, m_block_size
);
432 if (p
.blocks
[i
] == 0) break;
433 TORRENT_ASSERT(offset
<= buffer_size
);
434 TORRENT_ASSERT(piece_offset
<= piece_size
);
437 std::memcpy(p
.blocks
[i
], buf
.get() + offset
, block_size
);
// per-block path: read straight into the block buffer
442 ret
+= p
.storage
->read_impl(p
.blocks
[i
], p
.piece
, piece_offset
, block_size
);
// NOTE(review): this test looks inverted. The coalesced path above returns
// -1 when error() is true, whereas this returns -1 when error() is false, so
// a successful per-block read would be reported as a failure and a failed one
// would continue. Likely intended: `if (p.storage->error())` - confirm.
443 if (!p
.storage
->error()) { return -1; }
445 ++m_cache_stats
.reads
;
447 offset
+= m_block_size
;
448 piece_offset
+= m_block_size
;
450 TORRENT_ASSERT(ret
<= buffer_size
);
// a short read is reported as an error
451 return (ret
!= buffer_size
) ? -1 : ret
;
// Tries to make sure the cache has space for `num_blocks` more blocks.
//   num_blocks - number of free block slots wanted
//   ignore     - read-cache entry that must not be evicted
//   l          - caller's lock, forwarded to clear_oldest_read_piece()
// If fewer than num_blocks slots are free (m_cache_size minus the current
// cache_size), one eviction is attempted through clear_oldest_read_piece();
// if that fails, false is returned. The result is whether enough slots are
// free afterwards.
454 bool disk_io_thread::make_room(int num_blocks
455 , cache_t::iterator ignore
456 , mutex_t::scoped_lock
& l
)
458 if (m_cache_size
- m_cache_stats
.cache_size
< num_blocks
)
460 // there's not enough room in the cache, clear a piece
461 // from the read cache
462 if (!clear_oldest_read_piece(ignore
, l
)) return false;
465 return m_cache_size
- m_cache_stats
.cache_size
>= num_blocks
;
// Creates a read-cache entry for the piece of job `j` and fills it from the
// block containing j.offset onwards.
// make_room() is asked for space for all blocks from that block to the end of
// the piece; if it fails, -2 is returned. A new entry is then set up with
// j.storage, the current time and a zero-filled block pointer array, filled
// through read_into_piece() and appended to m_read_pieces.
// NOTE(review): the assignment of the entry's piece index, the handling of a
// negative `ret`, and the final return are not visible in this listing.
468 // returns -1 on read error, -2 if there isn't any space in the cache
469 // or the number of bytes read
470 int disk_io_thread::cache_read_block(disk_io_job
const& j
, mutex_t::scoped_lock
& l
)
474 int piece_size
= j
.storage
->info()->piece_size(j
.piece
);
475 int blocks_in_piece
= (piece_size
+ m_block_size
- 1) / m_block_size
;
477 int start_block
= j
.offset
/ m_block_size
;
// end() as `ignore`: any read-cache entry may be evicted to make space
479 if (!make_room(blocks_in_piece
- start_block
480 , m_read_pieces
.end(), l
)) return -2;
482 cached_piece_entry p
;
484 p
.storage
= j
.storage
;
485 p
.last_use
= time_now();
487 p
.blocks
.reset(new char*[blocks_in_piece
]);
488 std::memset(&p
.blocks
[0], 0, blocks_in_piece
* sizeof(char*));
489 int ret
= read_into_piece(p
, start_block
, l
);
494 m_read_pieces
.push_back(p
);
// Consistency check of the two piece caches built from TORRENT_ASSERTs.
// For each entry of m_pieces and m_read_pieces the block array is asserted to
// be present, and (unless TORRENT_DISABLE_POOL_ALLOCATOR is defined) block
// pointers are asserted to pass is_disk_buffer(). Per-entry block counts are
// summed, and the totals are asserted to match m_cache_stats: read + write
// blocks equal cache_size and read blocks equal read_cache_size. Finally
// cache_size is asserted not to exceed m_cache_size by more than one.
// NOTE(review): the declaration and counting of `blocks` and the condition
// guarding the is_disk_buffer() assertions are not visible in this listing.
500 void disk_io_thread::check_invariant() const
502 int cached_write_blocks
= 0;
503 for (cache_t::const_iterator i
= m_pieces
.begin()
504 , end(m_pieces
.end()); i
!= end
; ++i
)
506 cached_piece_entry
const& p
= *i
;
507 TORRENT_ASSERT(p
.blocks
);
// write-cache entries with no storage are skipped
509 if (!p
.storage
) continue;
510 int piece_size
= p
.storage
->info()->piece_size(p
.piece
);
511 int blocks_in_piece
= (piece_size
+ m_block_size
- 1) / m_block_size
;
513 for (int k
= 0; k
< blocks_in_piece
; ++k
)
517 #ifndef TORRENT_DISABLE_POOL_ALLOCATOR
518 TORRENT_ASSERT(is_disk_buffer(p
.blocks
[k
]));
523 // TORRENT_ASSERT(blocks == p.num_blocks);
524 cached_write_blocks
+= blocks
;
527 int cached_read_blocks
= 0;
528 for (cache_t::const_iterator i
= m_read_pieces
.begin()
529 , end(m_read_pieces
.end()); i
!= end
; ++i
)
531 cached_piece_entry
const& p
= *i
;
532 TORRENT_ASSERT(p
.blocks
);
534 int piece_size
= p
.storage
->info()->piece_size(p
.piece
);
535 int blocks_in_piece
= (piece_size
+ m_block_size
- 1) / m_block_size
;
537 for (int k
= 0; k
< blocks_in_piece
; ++k
)
541 #ifndef TORRENT_DISABLE_POOL_ALLOCATOR
542 TORRENT_ASSERT(is_disk_buffer(p
.blocks
[k
]));
547 // TORRENT_ASSERT(blocks == p.num_blocks);
548 cached_read_blocks
+= blocks
;
551 TORRENT_ASSERT(cached_read_blocks
+ cached_write_blocks
== m_cache_stats
.cache_size
);
552 TORRENT_ASSERT(cached_read_blocks
== m_cache_stats
.read_cache_size
);
554 // when writing, there may be a one block difference, right before an old piece
556 TORRENT_ASSERT(m_cache_stats
.cache_size
<= m_cache_size
+ 1);
// Serves a read job from the read cache, populating the cache when needed.
// j.buffer must already be allocated (asserted). Returns -2 immediately when
// the read cache is disabled. The piece is looked up in m_read_pieces; when
// absent, cache_read_block() is used to read it in, and a negative result is
// returned to the caller. When the entry exists but lacks the requested
// block, space is requested with make_room() (returning -2 on failure) and
// the missing run is read with read_into_piece(). The entry's last_use is
// refreshed, data is copied from the cached block into j.buffer, and
// m_cache_stats.blocks_read (plus blocks_read_hit when `hit` is set) is
// incremented.
// NOTE(review): the declarations of `p`, `ret` and `hit`, the copy loop
// structure, and the final return are not visible in this listing.
560 int disk_io_thread::try_read_from_cache(disk_io_job
const& j
)
562 TORRENT_ASSERT(j
.buffer
);
564 mutex_t::scoped_lock
l(m_piece_mutex
);
565 if (!m_use_read_cache
) return -2;
568 = find_cached_piece(m_read_pieces
, j
, l
);
573 // if the piece cannot be found in the cache,
574 // read the whole piece starting at the block
575 // we got a request for.
576 if (p
== m_read_pieces
.end())
578 ret
= cache_read_block(j
, l
);
580 if (ret
< 0) return ret
;
// NOTE(review): `p` is set to end() here and dereferenced by the assertions
// below; a step moving it back to the newly appended last element is
// presumably on a line missing from this listing - confirm.
581 p
= m_read_pieces
.end();
583 TORRENT_ASSERT(!m_read_pieces
.empty());
584 TORRENT_ASSERT(p
->piece
== j
.piece
);
585 TORRENT_ASSERT(p
->storage
== j
.storage
);
588 if (p
!= m_read_pieces
.end())
590 // copy from the cache and update the last use timestamp
591 int block
= j
.offset
/ m_block_size
;
592 int block_offset
= j
.offset
% m_block_size
;
593 int buffer_offset
= 0;
594 int size
= j
.buffer_size
;
// the piece is cached but the requested block is not
595 if (p
->blocks
[block
] == 0)
597 int piece_size
= j
.storage
->info()->piece_size(j
.piece
);
598 int blocks_in_piece
= (piece_size
+ m_block_size
- 1) / m_block_size
;
599 int end_block
= block
;
// count the run of missing blocks starting at `block`
600 while (end_block
< blocks_in_piece
&& p
->blocks
[end_block
] == 0) ++end_block
;
// `p` is passed as `ignore` so the entry being filled is not evicted
601 if (!make_room(end_block
- block
, p
, l
)) return -2;
602 ret
= read_into_piece(*p
, block
, l
);
604 if (ret
< 0) return ret
;
605 TORRENT_ASSERT(p
->blocks
[block
]);
608 p
->last_use
= time_now();
611 TORRENT_ASSERT(p
->blocks
[block
]);
612 int to_copy
= (std::min
)(m_block_size
613 - block_offset
, size
);
614 std::memcpy(j
.buffer
+ buffer_offset
615 , p
->blocks
[block
] + block_offset
619 buffer_offset
+= to_copy
;
622 ++m_cache_stats
.blocks_read
;
623 if (hit
) ++m_cache_stats
.blocks_read_hit
;
// Inserts job `j` into the job queue and attaches `f` as its callback.
//   j - the job; must have no callback yet, a storage, and a buffer_size of
//       at most m_block_size (all asserted)
//   f - completion handler. It is swapped into the queued copy through a
//       const_cast, so the caller's object is modified even though it is
//       passed by const reference.
// Holds m_queue_mutex. For write jobs, if the write cache already has an
// entry for the piece, the target block slot is asserted to be empty. The
// queue is then scanned from the back for the insertion point; for read jobs
// the scan looks for queued writes to the same storage and piece whose byte
// range overlaps the read. The job is inserted at the chosen position, write
// jobs add their buffer_size to m_queue_buffer_size, and
// m_signal.notify_all() is called.
// NOTE(review): the declaration of `p`, the bodies of both scan loops, and
// the action taken when the job would land in front of all others are not
// visible in this listing.
628 void disk_io_thread::add_job(disk_io_job
const& j
629 , boost::function
<void(int, disk_io_job
const&)> const& f
)
631 TORRENT_ASSERT(!j
.callback
);
632 TORRENT_ASSERT(j
.storage
);
633 TORRENT_ASSERT(j
.buffer_size
<= m_block_size
);
634 mutex_t::scoped_lock
l(m_queue_mutex
);
636 if (j
.action
== disk_io_job::write
)
639 = find_cached_piece(m_pieces
, j
, l
);
640 if (p
!= m_pieces
.end())
642 int block
= j
.offset
/ m_block_size
;
643 char const* buffer
= p
->blocks
[block
];
644 TORRENT_ASSERT(buffer
== 0);
// scan from the back of the queue towards the front
649 std::list
<disk_io_job
>::reverse_iterator i
= m_jobs
.rbegin();
650 if (j
.action
== disk_io_job::read
)
652 // when we're reading, we may not skip
653 // ahead of any write operation that overlaps
654 // the region we're reading
655 for (; i
!= m_jobs
.rend(); i
++)
657 // if *i should come before j, stop
658 // and insert j before i
660 // if we come across a write operation that
661 // overlaps the region we're reading, we need
663 if (i
->action
== disk_io_job::write
664 && i
->storage
== j
.storage
665 && i
->piece
== j
.piece
666 && range_overlap(i
->offset
, i
->buffer_size
667 , j
.offset
, j
.buffer_size
))
671 else if (j
.action
== disk_io_job::write
)
673 for (; i
!= m_jobs
.rend(); ++i
)
677 if (i
!= m_jobs
.rbegin()
678 && i
.base()->storage
.get() != j
.storage
.get())
685 // if we are placed in front of all other jobs, put it on the back of
686 // the queue, to sweep the disk in the same direction, and to avoid
687 // starvation. The exception is if the priority is higher than the
688 // job at the front of the queue
689 if (i
== m_jobs
.rend() && (m_jobs
.empty() || j
.priority
<= m_jobs
.back().priority
))
// i.base() is the forward position that corresponds to reverse iterator i
692 std::list
<disk_io_job
>::iterator k
= m_jobs
.insert(i
.base(), j
);
693 k
->callback
.swap(const_cast<boost::function
<void(int, disk_io_job
const&)>&>(f
));
694 if (j
.action
== disk_io_job::write
)
695 m_queue_buffer_size
+= j
.buffer_size
;
696 TORRENT_ASSERT(j
.storage
.get());
697 m_signal
.notify_all();
// Reports whether `buffer` belongs to the disk buffer pool: m_pool_mutex is
// taken and the result of m_pool.is_from(buffer) is returned.
// NOTE(review): the branch taken when TORRENT_DISABLE_POOL_ALLOCATOR is
// defined, and the #else/#endif lines, are not visible in this listing.
701 bool disk_io_thread::is_disk_buffer(char* buffer
) const
703 #ifdef TORRENT_DISABLE_POOL_ALLOCATOR
706 mutex_t::scoped_lock
l(m_pool_mutex
);
707 return m_pool
.is_from(buffer
);
// Allocates one disk block buffer while holding m_pool_mutex.
// With TORRENT_DISABLE_POOL_ALLOCATOR defined the buffer comes from
// malloc(m_block_size); the other visible return uses
// m_pool.ordered_malloc(). Callers in this file treat a 0 result as an
// allocation failure.
// NOTE(review): the #else/#endif lines and any statistics bookkeeping are not
// visible in this listing.
712 char* disk_io_thread::allocate_buffer()
714 mutex_t::scoped_lock
l(m_pool_mutex
);
718 #ifdef TORRENT_DISABLE_POOL_ALLOCATOR
719 return (char*)malloc(m_block_size
);
721 return (char*)m_pool
.ordered_malloc();
// Releases a disk block buffer while holding m_pool_mutex. The visible
// release path hands the buffer back with m_pool.ordered_free(buf).
// NOTE(review): the branch taken when TORRENT_DISABLE_POOL_ALLOCATOR is
// defined, and the #else/#endif lines, are not visible in this listing.
725 void disk_io_thread::free_buffer(char* buf
)
727 mutex_t::scoped_lock
l(m_pool_mutex
);
731 #ifdef TORRENT_DISABLE_POOL_ALLOCATOR
734 m_pool
.ordered_free(buf
);
// Transfers the error state of the job's storage into the job.
// Visible statements: the storage's error code is read, its message is copied
// to j.str, the storage's error_file() is copied to j.error_file, the
// storage's error is cleared, and a line of the form
// "ERROR: '<message>' <file>" is written to std::cout.
// NOTE(review): the condition guarding these statements, any preprocessor
// guard around the std::cout line, and the return statements are not visible
// in this listing.
738 bool disk_io_thread::test_error(disk_io_job
& j
)
740 error_code
const& ec
= j
.storage
->error();
743 j
.str
= ec
.message();
745 j
.error_file
= j
.storage
->error_file();
746 j
.storage
->clear_error();
748 std::cout
<< "ERROR: '" << j
.str
<< "' " << j
.error_file
<< std::endl
;
755 void disk_io_thread::operator()()
759 #ifdef TORRENT_DISK_STATS
760 m_log
<< log_time() << " idle" << std::endl
;
762 mutex_t::scoped_lock
jl(m_queue_mutex
);
764 while (m_jobs
.empty() && !m_abort
)
766 if (m_abort
&& m_jobs
.empty())
770 mutex_t::scoped_lock
l(m_piece_mutex
);
771 // flush all disk caches
772 for (cache_t::iterator i
= m_pieces
.begin()
773 , end(m_pieces
.end()); i
!= end
; ++i
)
775 for (cache_t::iterator i
= m_read_pieces
.begin()
776 , end(m_read_pieces
.end()); i
!= end
; ++i
)
779 m_read_pieces
.clear();
783 // if there's a buffer in this job, it will be freed
784 // when this holder is destructed, unless it has been
786 disk_buffer_holder
holder(*this
787 , m_jobs
.front().action
!= disk_io_job::check_fastresume
788 ? m_jobs
.front().buffer
: 0);
790 boost::function
<void(int, disk_io_job
const&)> handler
;
791 handler
.swap(m_jobs
.front().callback
);
793 disk_io_job j
= m_jobs
.front();
795 m_queue_buffer_size
-= j
.buffer_size
;
798 flush_expired_pieces();
802 TORRENT_ASSERT(j
.storage
|| j
.action
== disk_io_job::abort_thread
);
803 #ifdef TORRENT_DISK_STATS
804 ptime start
= time_now();
806 #ifndef BOOST_NO_EXCEPTIONS
812 case disk_io_job::abort_thread
:
814 mutex_t::scoped_lock
jl(m_queue_mutex
);
817 for (std::list
<disk_io_job
>::iterator i
= m_jobs
.begin();
820 if (i
->action
== disk_io_job::read
)
822 if (i
->callback
) m_ios
.post(bind(i
->callback
, -1, *i
));
826 if (i
->action
== disk_io_job::check_files
)
828 if (i
->callback
) m_ios
.post(bind(i
->callback
829 , piece_manager::disk_check_aborted
, *i
));
837 case disk_io_job::read
:
844 #ifdef TORRENT_DISK_STATS
845 m_log
<< log_time() << " read " << j
.buffer_size
<< std::endl
;
848 TORRENT_ASSERT(j
.buffer
== 0);
849 j
.buffer
= allocate_buffer();
850 TORRENT_ASSERT(j
.buffer_size
<= m_block_size
);
854 j
.error
= error_code(ENOMEM
, get_posix_category());
855 j
.str
= j
.error
.message();
859 disk_buffer_holder
read_holder(*this, j
.buffer
);
860 ret
= try_read_from_cache(j
);
862 // -2 means there's no space in the read cache
863 // or that the read cache is disabled
872 ret
= j
.storage
->read_impl(j
.buffer
, j
.piece
, j
.offset
879 ++m_cache_stats
.blocks_read
;
881 read_holder
.release();
884 case disk_io_job::write
:
891 #ifdef TORRENT_DISK_STATS
892 m_log
<< log_time() << " write " << j
.buffer_size
<< std::endl
;
894 mutex_t::scoped_lock
l(m_piece_mutex
);
897 = find_cached_piece(m_pieces
, j
, l
);
898 int block
= j
.offset
/ m_block_size
;
899 TORRENT_ASSERT(j
.buffer
);
900 TORRENT_ASSERT(j
.buffer_size
<= m_block_size
);
901 if (p
!= m_pieces
.end())
903 TORRENT_ASSERT(p
->blocks
[block
] == 0);
904 if (p
->blocks
[block
])
906 free_buffer(p
->blocks
[block
]);
909 p
->blocks
[block
] = j
.buffer
;
910 ++m_cache_stats
.cache_size
;
912 p
->last_use
= time_now();
918 // we've now inserted the buffer
919 // in the cache, we should not
920 // free it at the end
922 if (m_cache_stats
.cache_size
>= m_cache_size
)
923 flush_oldest_piece(l
);
926 case disk_io_job::hash
:
928 #ifdef TORRENT_DISK_STATS
929 m_log
<< log_time() << " hash" << std::endl
;
931 mutex_t::scoped_lock
l(m_piece_mutex
);
935 = find_cached_piece(m_pieces
, j
, l
);
936 if (i
!= m_pieces
.end())
938 flush_and_remove(i
, l
);
942 j
.storage
->mark_failed(j
.piece
);
947 sha1_hash h
= j
.storage
->hash_for_piece_impl(j
.piece
);
951 j
.storage
->mark_failed(j
.piece
);
954 ret
= (j
.storage
->info()->hash_for_piece(j
.piece
) == h
)?0:-2;
955 if (ret
== -2) j
.storage
->mark_failed(j
.piece
);
958 case disk_io_job::move_storage
:
960 #ifdef TORRENT_DISK_STATS
961 m_log
<< log_time() << " move" << std::endl
;
963 TORRENT_ASSERT(j
.buffer
== 0);
964 ret
= j
.storage
->move_storage_impl(j
.str
) ? 1 : 0;
970 j
.str
= j
.storage
->save_path().string();
973 case disk_io_job::release_files
:
975 #ifdef TORRENT_DISK_STATS
976 m_log
<< log_time() << " release" << std::endl
;
978 TORRENT_ASSERT(j
.buffer
== 0);
980 mutex_t::scoped_lock
l(m_piece_mutex
);
983 for (cache_t::iterator i
= m_pieces
.begin(); i
!= m_pieces
.end();)
985 if (i
->storage
== j
.storage
)
988 i
= m_pieces
.erase(i
);
996 #ifndef TORRENT_DISABLE_POOL_ALLOCATOR
998 mutex_t::scoped_lock
l(m_pool_mutex
);
999 m_pool
.release_memory();
1002 ret
= j
.storage
->release_files_impl();
1003 if (ret
!= 0) test_error(j
);
1006 case disk_io_job::clear_read_cache
:
1008 #ifdef TORRENT_DISK_STATS
1009 m_log
<< log_time() << " clear-cache" << std::endl
;
1011 TORRENT_ASSERT(j
.buffer
== 0);
1013 mutex_t::scoped_lock
l(m_piece_mutex
);
1016 for (cache_t::iterator i
= m_read_pieces
.begin();
1017 i
!= m_read_pieces
.end();)
1019 if (i
->storage
== j
.storage
)
1022 i
= m_read_pieces
.erase(i
);
1030 #ifndef TORRENT_DISABLE_POOL_ALLOCATOR
1032 mutex_t::scoped_lock
l(m_pool_mutex
);
1033 m_pool
.release_memory();
1039 case disk_io_job::delete_files
:
1041 #ifdef TORRENT_DISK_STATS
1042 m_log
<< log_time() << " delete" << std::endl
;
1044 TORRENT_ASSERT(j
.buffer
== 0);
1046 mutex_t::scoped_lock
l(m_piece_mutex
);
1049 cache_t::iterator i
= std::remove_if(
1050 m_pieces
.begin(), m_pieces
.end(), bind(&cached_piece_entry::storage
, _1
) == j
.storage
);
1052 for (cache_t::iterator k
= i
; k
!= m_pieces
.end(); ++k
)
1054 torrent_info
const& ti
= *k
->storage
->info();
1055 int blocks_in_piece
= (ti
.piece_size(k
->piece
) + m_block_size
- 1) / m_block_size
;
1056 for (int j
= 0; j
< blocks_in_piece
; ++j
)
1058 if (k
->blocks
[j
] == 0) continue;
1059 free_buffer(k
->blocks
[j
]);
1063 m_pieces
.erase(i
, m_pieces
.end());
1065 #ifndef TORRENT_DISABLE_POOL_ALLOCATOR
1067 mutex_t::scoped_lock
l(m_pool_mutex
);
1068 m_pool
.release_memory();
1071 ret
= j
.storage
->delete_files_impl();
1072 if (ret
!= 0) test_error(j
);
1075 case disk_io_job::check_fastresume
:
1077 #ifdef TORRENT_DISK_STATS
1078 m_log
<< log_time() << " check fastresume" << std::endl
;
1080 lazy_entry
const* rd
= (lazy_entry
const*)j
.buffer
;
1081 TORRENT_ASSERT(rd
!= 0);
1082 ret
= j
.storage
->check_fastresume(*rd
, j
.str
);
1085 case disk_io_job::check_files
:
1087 #ifdef TORRENT_DISK_STATS
1088 m_log
<< log_time() << " check files" << std::endl
;
1090 int piece_size
= j
.storage
->info()->piece_length();
1091 for (int processed
= 0; processed
< 4 * 1024 * 1024; processed
+= piece_size
)
1093 ret
= j
.storage
->check_files(j
.piece
, j
.offset
, j
.str
);
1095 #ifndef BOOST_NO_EXCEPTIONS
1098 TORRENT_ASSERT(handler
);
1099 if (handler
&& ret
== piece_manager::need_full_check
)
1100 m_ios
.post(bind(handler
, ret
, j
));
1101 #ifndef BOOST_NO_EXCEPTIONS
1102 } catch (std::exception
&) {}
1104 if (ret
!= piece_manager::need_full_check
) break;
1108 ret
= piece_manager::fatal_disk_error
;
1111 TORRENT_ASSERT(ret
!= -2 || !j
.str
.empty());
1113 // if the check is not done, add it at the end of the job queue
1114 if (ret
== piece_manager::need_full_check
)
1116 add_job(j
, handler
);
1121 case disk_io_job::save_resume_data
:
1123 #ifdef TORRENT_DISK_STATS
1124 m_log
<< log_time() << " save resume data" << std::endl
;
1126 j
.resume_data
.reset(new entry(entry::dictionary_t
));
1127 j
.storage
->write_resume_data(*j
.resume_data
);
1131 case disk_io_job::rename_file
:
1133 #ifdef TORRENT_DISK_STATS
1134 m_log
<< log_time() << " rename file" << std::endl
;
1136 ret
= j
.storage
->rename_file_impl(j
.piece
, j
.str
);
1139 #ifndef BOOST_NO_EXCEPTIONS
1140 } catch (std::exception
& e
)
1147 catch (std::exception
&) {}
1151 // if (!handler) std::cerr << "DISK THREAD: no callback specified" << std::endl;
1152 // else std::cerr << "DISK THREAD: invoking callback" << std::endl;
1153 #ifndef BOOST_NO_EXCEPTIONS
1156 TORRENT_ASSERT(ret
!= -2 || !j
.str
.empty());
1157 if (handler
) m_ios
.post(bind(handler
, ret
, j
));
1158 #ifndef BOOST_NO_EXCEPTIONS
1159 } catch (std::exception
&)
1161 TORRENT_ASSERT(false);
1165 TORRENT_ASSERT(false);