// Copyright (c) 2011 The LevelDB Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file. See the AUTHORS file for names of contributors.

#include "db/db_impl.h"

#include "db/builder.h"
#include "db/db_iter.h"
#include "db/dbformat.h"
#include "db/filename.h"
#include "db/log_reader.h"
#include "db/log_writer.h"
#include "db/memtable.h"
#include "db/table_cache.h"
#include "db/version_set.h"
#include "db/write_batch_internal.h"
#include "leveldb/db.h"
#include "leveldb/env.h"
#include "leveldb/status.h"
#include "leveldb/table.h"
#include "leveldb/table_builder.h"
#include "port/port.h"
#include "table/block.h"
#include "table/merger.h"
#include "table/two_level_iterator.h"
#include "util/coding.h"
#include "util/logging.h"
#include "util/mutexlock.h"
namespace leveldb {

const int kNumNonTableCacheFiles = 10;
// Information kept for every waiting writer
struct DBImpl::Writer {
  Status status;
  WriteBatch* batch;
  bool sync;
  bool done;
  port::CondVar cv;

  explicit Writer(port::Mutex* mu) : cv(mu) { }
};
struct DBImpl::CompactionState {
  Compaction* const compaction;

  // Sequence numbers < smallest_snapshot are not significant since we
  // will never have to service a snapshot below smallest_snapshot.
  // Therefore if we have seen a sequence number S <= smallest_snapshot,
  // we can drop all entries for the same key with sequence numbers < S.
  SequenceNumber smallest_snapshot;

  // Files produced by compaction
  struct Output {
    uint64_t number;
    uint64_t file_size;
    InternalKey smallest, largest;
  };
  std::vector<Output> outputs;

  // State kept for output being generated
  WritableFile* outfile;
  TableBuilder* builder;

  uint64_t total_bytes;

  Output* current_output() { return &outputs[outputs.size()-1]; }

  explicit CompactionState(Compaction* c)
      : compaction(c),
        outfile(NULL),
        builder(NULL),
        total_bytes(0) {
  }
};
// Fix user-supplied options to be reasonable
template <class T, class V>
static void ClipToRange(T* ptr, V minvalue, V maxvalue) {
  if (static_cast<V>(*ptr) > maxvalue) *ptr = maxvalue;
  if (static_cast<V>(*ptr) < minvalue) *ptr = minvalue;
}
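// For example, with the bounds used in SanitizeOptions() below, a
// write_buffer_size of 16<<10 is raised to the 64<<10 minimum and a
// block_size of 8<<20 is lowered to the 4<<20 maximum; in-range values
// pass through unchanged.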
Options SanitizeOptions(const std::string& dbname,
                        const InternalKeyComparator* icmp,
                        const InternalFilterPolicy* ipolicy,
                        const Options& src) {
  Options result = src;
  result.comparator = icmp;
  result.filter_policy = (src.filter_policy != NULL) ? ipolicy : NULL;
  ClipToRange(&result.max_open_files,    64 + kNumNonTableCacheFiles, 50000);
  ClipToRange(&result.write_buffer_size, 64<<10,                      1<<30);
  ClipToRange(&result.max_file_size,     1<<20,                       1<<30);
  ClipToRange(&result.block_size,        1<<10,                       4<<20);
  if (result.info_log == NULL) {
    // Open a log file in the same directory as the db
    src.env->CreateDir(dbname);  // In case it does not exist
    src.env->RenameFile(InfoLogFileName(dbname), OldInfoLogFileName(dbname));
    Status s = src.env->NewLogger(InfoLogFileName(dbname), &result.info_log);
    if (!s.ok()) {
      // No place suitable for logging
      result.info_log = NULL;
    }
  }
  if (result.block_cache == NULL) {
    result.block_cache = NewLRUCache(8 << 20);
  }
  return result;
}
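// Note that when SanitizeOptions() substitutes its own info_log or
// block_cache, the sanitized Options differ from the caller's copy; the
// DBImpl constructor below compares the two to decide which of the objects
// it owns and must delete in its destructor.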
DBImpl::DBImpl(const Options& raw_options, const std::string& dbname)
    : env_(raw_options.env),
      internal_comparator_(raw_options.comparator),
      internal_filter_policy_(raw_options.filter_policy),
      options_(SanitizeOptions(dbname, &internal_comparator_,
                               &internal_filter_policy_, raw_options)),
      owns_info_log_(options_.info_log != raw_options.info_log),
      owns_cache_(options_.block_cache != raw_options.block_cache),
      dbname_(dbname),
      db_lock_(NULL),
      shutting_down_(NULL),
      bg_cv_(&mutex_),
      mem_(NULL),
      imm_(NULL),
      logfile_(NULL),
      logfile_number_(0),
      log_(NULL),
      seed_(0),
      tmp_batch_(new WriteBatch),
      bg_compaction_scheduled_(false),
      manual_compaction_(NULL) {
  has_imm_.Release_Store(NULL);

  // Reserve ten files or so for other uses and give the rest to TableCache.
  const int table_cache_size = options_.max_open_files - kNumNonTableCacheFiles;
  table_cache_ = new TableCache(dbname_, &options_, table_cache_size);

  versions_ = new VersionSet(dbname_, &options_, table_cache_,
                             &internal_comparator_);
}
DBImpl::~DBImpl() {
  // Wait for background work to finish
  mutex_.Lock();
  shutting_down_.Release_Store(this);  // Any non-NULL value is ok
  while (bg_compaction_scheduled_) {
    bg_cv_.Wait();
  }
  mutex_.Unlock();

  if (db_lock_ != NULL) {
    env_->UnlockFile(db_lock_);
  }

  delete versions_;
  if (mem_ != NULL) mem_->Unref();
  if (imm_ != NULL) imm_->Unref();
  delete tmp_batch_;
  delete log_;
  delete logfile_;
  delete table_cache_;

  if (owns_info_log_) {
    delete options_.info_log;
  }
  if (owns_cache_) {
    delete options_.block_cache;
  }
}
Status DBImpl::NewDB() {
  VersionEdit new_db;
  new_db.SetComparatorName(user_comparator()->Name());
  new_db.SetLogNumber(0);
  new_db.SetNextFile(2);
  new_db.SetLastSequence(0);

  const std::string manifest = DescriptorFileName(dbname_, 1);
  WritableFile* file;
  Status s = env_->NewWritableFile(manifest, &file);
  if (!s.ok()) {
    return s;
  }
  {
    log::Writer log(file);
    std::string record;
    new_db.EncodeTo(&record);
    s = log.AddRecord(record);
    if (s.ok()) {
      s = file->Close();
    }
  }
  delete file;
  if (s.ok()) {
    // Make "CURRENT" file that points to the new manifest file.
    s = SetCurrentFile(env_, dbname_, 1);
  } else {
    env_->DeleteFile(manifest);
  }
  return s;
}
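// For a fresh database this writes the initial descriptor as
// "<dbname>/MANIFEST-000001" and then points the CURRENT file at it, so a
// later Recover() can locate the manifest by reading CURRENT.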
void DBImpl::MaybeIgnoreError(Status* s) const {
  if (s->ok() || options_.paranoid_checks) {
    // No change needed
  } else {
    Log(options_.info_log, "Ignoring error %s", s->ToString().c_str());
    *s = Status::OK();
  }
}
void DBImpl::DeleteObsoleteFiles() {
  if (!bg_error_.ok()) {
    // After a background error, we don't know whether a new version may
    // or may not have been committed, so we cannot safely garbage collect.
    return;
  }

  // Make a set of all of the live files
  std::set<uint64_t> live = pending_outputs_;
  versions_->AddLiveFiles(&live);

  std::vector<std::string> filenames;
  env_->GetChildren(dbname_, &filenames);  // Ignoring errors on purpose
  uint64_t number;
  FileType type;
  for (size_t i = 0; i < filenames.size(); i++) {
    if (ParseFileName(filenames[i], &number, &type)) {
      bool keep = true;
      switch (type) {
        case kLogFile:
          keep = ((number >= versions_->LogNumber()) ||
                  (number == versions_->PrevLogNumber()));
          break;
        case kDescriptorFile:
          // Keep my manifest file, and any newer incarnations'
          // (in case there is a race that allows other incarnations)
          keep = (number >= versions_->ManifestFileNumber());
          break;
        case kTableFile:
          keep = (live.find(number) != live.end());
          break;
        case kTempFile:
          // Any temp files that are currently being written to must
          // be recorded in pending_outputs_, which is inserted into "live"
          keep = (live.find(number) != live.end());
          break;
        case kCurrentFile:
        case kDBLockFile:
        case kInfoLogFile:
          keep = true;
          break;
      }

      if (!keep) {
        if (type == kTableFile) {
          table_cache_->Evict(number);
        }
        Log(options_.info_log, "Delete type=%d #%lld\n",
            int(type),
            static_cast<unsigned long long>(number));
        env_->DeleteFile(dbname_ + "/" + filenames[i]);
      }
    }
  }
}
Status DBImpl::Recover(VersionEdit* edit, bool* save_manifest) {
  mutex_.AssertHeld();

  // Ignore error from CreateDir since the creation of the DB is
  // committed only when the descriptor is created, and this directory
  // may already exist from a previous failed creation attempt.
  env_->CreateDir(dbname_);
  assert(db_lock_ == NULL);
  Status s = env_->LockFile(LockFileName(dbname_), &db_lock_);
  if (!s.ok()) {
    return s;
  }

  if (!env_->FileExists(CurrentFileName(dbname_))) {
    if (options_.create_if_missing) {
      s = NewDB();
      if (!s.ok()) {
        return s;
      }
    } else {
      return Status::InvalidArgument(
          dbname_, "does not exist (create_if_missing is false)");
    }
  } else {
    if (options_.error_if_exists) {
      return Status::InvalidArgument(
          dbname_, "exists (error_if_exists is true)");
    }
  }

  s = versions_->Recover(save_manifest);
  if (!s.ok()) {
    return s;
  }
  SequenceNumber max_sequence(0);

  // Recover from all newer log files than the ones named in the
  // descriptor (new log files may have been added by the previous
  // incarnation without registering them in the descriptor).
  //
  // Note that PrevLogNumber() is no longer used, but we pay
  // attention to it in case we are recovering a database
  // produced by an older version of leveldb.
  const uint64_t min_log = versions_->LogNumber();
  const uint64_t prev_log = versions_->PrevLogNumber();
  std::vector<std::string> filenames;
  s = env_->GetChildren(dbname_, &filenames);
  if (!s.ok()) {
    return s;
  }
  std::set<uint64_t> expected;
  versions_->AddLiveFiles(&expected);
  uint64_t number;
  FileType type;
  std::vector<uint64_t> logs;
  for (size_t i = 0; i < filenames.size(); i++) {
    if (ParseFileName(filenames[i], &number, &type)) {
      expected.erase(number);
      if (type == kLogFile && ((number >= min_log) || (number == prev_log)))
        logs.push_back(number);
    }
  }
  if (!expected.empty()) {
    char buf[50];
    snprintf(buf, sizeof(buf), "%d missing files; e.g.",
             static_cast<int>(expected.size()));
    return Status::Corruption(buf, TableFileName(dbname_, *(expected.begin())));
  }

  // Recover in the order in which the logs were generated
  std::sort(logs.begin(), logs.end());
  for (size_t i = 0; i < logs.size(); i++) {
    s = RecoverLogFile(logs[i], (i == logs.size() - 1), save_manifest, edit,
                       &max_sequence);
    if (!s.ok()) {
      return s;
    }

    // The previous incarnation may not have written any MANIFEST
    // records after allocating this log number.  So we manually
    // update the file number allocation counter in VersionSet.
    versions_->MarkFileNumberUsed(logs[i]);
  }

  if (versions_->LastSequence() < max_sequence) {
    versions_->SetLastSequence(max_sequence);
  }

  return Status::OK();
}
Status DBImpl::RecoverLogFile(uint64_t log_number, bool last_log,
                              bool* save_manifest, VersionEdit* edit,
                              SequenceNumber* max_sequence) {
  struct LogReporter : public log::Reader::Reporter {
    Env* env;
    Logger* info_log;
    const char* fname;
    Status* status;  // NULL if options_.paranoid_checks==false
    virtual void Corruption(size_t bytes, const Status& s) {
      Log(info_log, "%s%s: dropping %d bytes; %s",
          (this->status == NULL ? "(ignoring error) " : ""),
          fname, static_cast<int>(bytes), s.ToString().c_str());
      if (this->status != NULL && this->status->ok()) *this->status = s;
    }
  };

  mutex_.AssertHeld();

  // Open the log file
  std::string fname = LogFileName(dbname_, log_number);
  SequentialFile* file;
  Status status = env_->NewSequentialFile(fname, &file);
  if (!status.ok()) {
    MaybeIgnoreError(&status);
    return status;
  }

  // Create the log reader.
  LogReporter reporter;
  reporter.env = env_;
  reporter.info_log = options_.info_log;
  reporter.fname = fname.c_str();
  reporter.status = (options_.paranoid_checks ? &status : NULL);
  // We intentionally make log::Reader do checksumming even if
  // paranoid_checks==false so that corruptions cause entire commits
  // to be skipped instead of propagating bad information (like overly
  // large sequence numbers).
  log::Reader reader(file, &reporter, true/*checksum*/,
                     0/*initial_offset*/);
  Log(options_.info_log, "Recovering log #%llu",
      (unsigned long long) log_number);

  // Read all the records and add to a memtable
  std::string scratch;
  Slice record;
  WriteBatch batch;
  int compactions = 0;
  MemTable* mem = NULL;
  while (reader.ReadRecord(&record, &scratch) &&
         status.ok()) {
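    // A well-formed record must hold at least a WriteBatch header:
    // an 8-byte sequence number followed by a 4-byte count (12 bytes total).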
    if (record.size() < 12) {
      reporter.Corruption(
          record.size(), Status::Corruption("log record too small"));
      continue;
    }
    WriteBatchInternal::SetContents(&batch, record);

    if (mem == NULL) {
      mem = new MemTable(internal_comparator_);
      mem->Ref();
    }
    status = WriteBatchInternal::InsertInto(&batch, mem);
    MaybeIgnoreError(&status);
    if (!status.ok()) {
      break;
    }
    const SequenceNumber last_seq =
        WriteBatchInternal::Sequence(&batch) +
        WriteBatchInternal::Count(&batch) - 1;
    if (last_seq > *max_sequence) {
      *max_sequence = last_seq;
    }
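    // For example, a batch stamped with sequence 100 that holds 3 entries
    // occupies sequence numbers 100..102, so last_seq comes out as 102.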
    if (mem->ApproximateMemoryUsage() > options_.write_buffer_size) {
      compactions++;
      *save_manifest = true;
      status = WriteLevel0Table(mem, edit, NULL);
      mem->Unref();
      mem = NULL;
      if (!status.ok()) {
        // Reflect errors immediately so that conditions like full
        // file-systems cause the DB::Open() to fail.
        break;
      }
    }
  }

  delete file;

  // See if we should keep reusing the last log file.
  if (status.ok() && options_.reuse_logs && last_log && compactions == 0) {
    assert(logfile_ == NULL);
    assert(log_ == NULL);
    assert(mem_ == NULL);
    uint64_t lfile_size;
    if (env_->GetFileSize(fname, &lfile_size).ok() &&
        env_->NewAppendableFile(fname, &logfile_).ok()) {
      Log(options_.info_log, "Reusing old log %s \n", fname.c_str());
      log_ = new log::Writer(logfile_, lfile_size);
      logfile_number_ = log_number;
      if (mem != NULL) {
        mem_ = mem;
        mem = NULL;
      } else {
        // mem can be NULL if lognum exists but was empty.
        mem_ = new MemTable(internal_comparator_);
        mem_->Ref();
      }
    }
  }

  if (mem != NULL) {
    // mem did not get reused; compact it.
    if (status.ok()) {
      *save_manifest = true;
      status = WriteLevel0Table(mem, edit, NULL);
    }
    mem->Unref();
  }

  return status;
}
Status DBImpl::WriteLevel0Table(MemTable* mem, VersionEdit* edit,
                                Version* base) {
  mutex_.AssertHeld();
  const uint64_t start_micros = env_->NowMicros();
  FileMetaData meta;
  meta.number = versions_->NewFileNumber();
  pending_outputs_.insert(meta.number);
  Iterator* iter = mem->NewIterator();
  Log(options_.info_log, "Level-0 table #%llu: started",
      (unsigned long long) meta.number);

  Status s;
  {
    mutex_.Unlock();
    s = BuildTable(dbname_, env_, options_, table_cache_, iter, &meta);
    mutex_.Lock();
  }

  Log(options_.info_log, "Level-0 table #%llu: %lld bytes %s",
      (unsigned long long) meta.number,
      (unsigned long long) meta.file_size,
      s.ToString().c_str());
  delete iter;
  pending_outputs_.erase(meta.number);

  // Note that if file_size is zero, the file has been deleted and
  // should not be added to the manifest.
  int level = 0;
  if (s.ok() && meta.file_size > 0) {
    const Slice min_user_key = meta.smallest.user_key();
    const Slice max_user_key = meta.largest.user_key();
    if (base != NULL) {
      level = base->PickLevelForMemTableOutput(min_user_key, max_user_key);
    }
    edit->AddFile(level, meta.number, meta.file_size,
                  meta.smallest, meta.largest);
  }

  CompactionStats stats;
  stats.micros = env_->NowMicros() - start_micros;
  stats.bytes_written = meta.file_size;
  stats_[level].Add(stats);
  return s;
}
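// When a base version is supplied, PickLevelForMemTableOutput() may place the
// new table above level 0 (up to config::kMaxMemCompactLevel) if its key range
// does not overlap the next level, saving an immediate recompaction of the
// same data.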
void DBImpl::CompactMemTable() {
  mutex_.AssertHeld();
  assert(imm_ != NULL);

  // Save the contents of the memtable as a new Table
  VersionEdit edit;
  Version* base = versions_->current();
  base->Ref();
  Status s = WriteLevel0Table(imm_, &edit, base);
  base->Unref();

  if (s.ok() && shutting_down_.Acquire_Load()) {
    s = Status::IOError("Deleting DB during memtable compaction");
  }

  // Replace immutable memtable with the generated Table
  if (s.ok()) {
    edit.SetPrevLogNumber(0);
    edit.SetLogNumber(logfile_number_);  // Earlier logs no longer needed
    s = versions_->LogAndApply(&edit, &mutex_);
  }

  if (s.ok()) {
    // Commit to the new state
    imm_->Unref();
    imm_ = NULL;
    has_imm_.Release_Store(NULL);
    DeleteObsoleteFiles();
  } else {
    RecordBackgroundError(s);
  }
}
void DBImpl::CompactRange(const Slice* begin, const Slice* end) {
  int max_level_with_files = 1;
  {
    MutexLock l(&mutex_);
    Version* base = versions_->current();
    for (int level = 1; level < config::kNumLevels; level++) {
      if (base->OverlapInLevel(level, begin, end)) {
        max_level_with_files = level;
      }
    }
  }
  TEST_CompactMemTable();  // TODO(sanjay): Skip if memtable does not overlap
  for (int level = 0; level < max_level_with_files; level++) {
    TEST_CompactRange(level, begin, end);
  }
}
void DBImpl::TEST_CompactRange(int level, const Slice* begin, const Slice* end) {
  assert(level >= 0);
  assert(level + 1 < config::kNumLevels);

  InternalKey begin_storage, end_storage;

  ManualCompaction manual;
  manual.level = level;
  manual.done = false;
  if (begin == NULL) {
    manual.begin = NULL;
  } else {
    begin_storage = InternalKey(*begin, kMaxSequenceNumber, kValueTypeForSeek);
    manual.begin = &begin_storage;
  }
  if (end == NULL) {
    manual.end = NULL;
  } else {
    end_storage = InternalKey(*end, 0, static_cast<ValueType>(0));
    manual.end = &end_storage;
  }
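  // Internal keys order by user key ascending and then by sequence number
  // descending, so tagging *begin with kMaxSequenceNumber makes begin_storage
  // the first internal key for that user key, while tagging *end with
  // sequence number 0 makes end_storage the last one; together they span
  // every stored version of every key in [*begin, *end].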
  MutexLock l(&mutex_);
  while (!manual.done && !shutting_down_.Acquire_Load() && bg_error_.ok()) {
    if (manual_compaction_ == NULL) {  // Idle
      manual_compaction_ = &manual;
      MaybeScheduleCompaction();
    } else {  // Running either my compaction or another compaction.
      bg_cv_.Wait();
    }
  }
  if (manual_compaction_ == &manual) {
    // Cancel my manual compaction since we aborted early for some reason.
    manual_compaction_ = NULL;
  }
}
Status DBImpl::TEST_CompactMemTable() {
  // NULL batch means just wait for earlier writes to be done
  Status s = Write(WriteOptions(), NULL);
  if (s.ok()) {
    // Wait until the compaction completes
    MutexLock l(&mutex_);
    while (imm_ != NULL && bg_error_.ok()) {
      bg_cv_.Wait();
    }
    if (imm_ != NULL) {
      s = bg_error_;
    }
  }
  return s;
}
void DBImpl::RecordBackgroundError(const Status& s) {
  mutex_.AssertHeld();
  if (bg_error_.ok()) {
    bg_error_ = s;
    bg_cv_.SignalAll();
  }
}
void DBImpl::MaybeScheduleCompaction() {
  mutex_.AssertHeld();
  if (bg_compaction_scheduled_) {
    // Already scheduled
  } else if (shutting_down_.Acquire_Load()) {
    // DB is being deleted; no more background compactions
  } else if (!bg_error_.ok()) {
    // Already got an error; no more changes
  } else if (imm_ == NULL &&
             manual_compaction_ == NULL &&
             !versions_->NeedsCompaction()) {
    // No work to be done
  } else {
    bg_compaction_scheduled_ = true;
    env_->Schedule(&DBImpl::BGWork, this);
  }
}
void DBImpl::BGWork(void* db) {
  reinterpret_cast<DBImpl*>(db)->BackgroundCall();
}

void DBImpl::BackgroundCall() {
  MutexLock l(&mutex_);
  assert(bg_compaction_scheduled_);
  if (shutting_down_.Acquire_Load()) {
    // No more background work when shutting down.
  } else if (!bg_error_.ok()) {
    // No more background work after a background error.
  } else {
    BackgroundCompaction();
  }

  bg_compaction_scheduled_ = false;

  // Previous compaction may have produced too many files in a level,
  // so reschedule another compaction if needed.
  MaybeScheduleCompaction();
  bg_cv_.SignalAll();
}
void DBImpl::BackgroundCompaction() {
  mutex_.AssertHeld();

  if (imm_ != NULL) {
    CompactMemTable();
    return;
  }

  Compaction* c;
  bool is_manual = (manual_compaction_ != NULL);
  InternalKey manual_end;
  if (is_manual) {
    ManualCompaction* m = manual_compaction_;
    c = versions_->CompactRange(m->level, m->begin, m->end);
    m->done = (c == NULL);
    if (c != NULL) {
      manual_end = c->input(0, c->num_input_files(0) - 1)->largest;
    }
    Log(options_.info_log,
        "Manual compaction at level-%d from %s .. %s; will stop at %s\n",
        m->level,
        (m->begin ? m->begin->DebugString().c_str() : "(begin)"),
        (m->end ? m->end->DebugString().c_str() : "(end)"),
        (m->done ? "(end)" : manual_end.DebugString().c_str()));
  } else {
    c = versions_->PickCompaction();
  }

  Status status;
  if (c == NULL) {
    // Nothing to do
  } else if (!is_manual && c->IsTrivialMove()) {
    // Move file to next level
    assert(c->num_input_files(0) == 1);
    FileMetaData* f = c->input(0, 0);
    c->edit()->DeleteFile(c->level(), f->number);
    c->edit()->AddFile(c->level() + 1, f->number, f->file_size,
                       f->smallest, f->largest);
    status = versions_->LogAndApply(c->edit(), &mutex_);
    if (!status.ok()) {
      RecordBackgroundError(status);
    }
    VersionSet::LevelSummaryStorage tmp;
    Log(options_.info_log, "Moved #%lld to level-%d %lld bytes %s: %s\n",
        static_cast<unsigned long long>(f->number),
        c->level() + 1,
        static_cast<unsigned long long>(f->file_size),
        status.ToString().c_str(),
        versions_->LevelSummary(&tmp));
  } else {
    CompactionState* compact = new CompactionState(c);
    status = DoCompactionWork(compact);
    if (!status.ok()) {
      RecordBackgroundError(status);
    }
    CleanupCompaction(compact);
    c->ReleaseInputs();
    DeleteObsoleteFiles();
  }
  delete c;

  if (status.ok()) {
    // Done
  } else if (shutting_down_.Acquire_Load()) {
    // Ignore compaction errors found during shutting down
  } else {
    Log(options_.info_log,
        "Compaction error: %s", status.ToString().c_str());
  }

  if (is_manual) {
    ManualCompaction* m = manual_compaction_;
    if (!status.ok()) {
      m->done = true;
    }
    if (!m->done) {
      // We only compacted part of the requested range.  Update *m
      // to the range that is left to be compacted.
      m->tmp_storage = manual_end;
      m->begin = &m->tmp_storage;
    }
    manual_compaction_ = NULL;
  }
}
void DBImpl::CleanupCompaction(CompactionState* compact) {
  mutex_.AssertHeld();
  if (compact->builder != NULL) {
    // May happen if we get a shutdown call in the middle of compaction
    compact->builder->Abandon();
    delete compact->builder;
  } else {
    assert(compact->outfile == NULL);
  }
  delete compact->outfile;
  for (size_t i = 0; i < compact->outputs.size(); i++) {
    const CompactionState::Output& out = compact->outputs[i];
    pending_outputs_.erase(out.number);
  }
  delete compact;
}
Status DBImpl::OpenCompactionOutputFile(CompactionState* compact) {
  assert(compact != NULL);
  assert(compact->builder == NULL);
  uint64_t file_number;
  {
    mutex_.Lock();
    file_number = versions_->NewFileNumber();
    pending_outputs_.insert(file_number);
    CompactionState::Output out;
    out.number = file_number;
    out.smallest.Clear();
    out.largest.Clear();
    compact->outputs.push_back(out);
    mutex_.Unlock();
  }

  // Make the output file
  std::string fname = TableFileName(dbname_, file_number);
  Status s = env_->NewWritableFile(fname, &compact->outfile);
  if (s.ok()) {
    compact->builder = new TableBuilder(options_, compact->outfile);
  }
  return s;
}
Status DBImpl::FinishCompactionOutputFile(CompactionState* compact,
                                          Iterator* input) {
  assert(compact != NULL);
  assert(compact->outfile != NULL);
  assert(compact->builder != NULL);

  const uint64_t output_number = compact->current_output()->number;
  assert(output_number != 0);

  // Check for iterator errors
  Status s = input->status();
  const uint64_t current_entries = compact->builder->NumEntries();
  if (s.ok()) {
    s = compact->builder->Finish();
  } else {
    compact->builder->Abandon();
  }
  const uint64_t current_bytes = compact->builder->FileSize();
  compact->current_output()->file_size = current_bytes;
  compact->total_bytes += current_bytes;
  delete compact->builder;
  compact->builder = NULL;

  // Finish and check for file errors
  if (s.ok()) {
    s = compact->outfile->Sync();
  }
  if (s.ok()) {
    s = compact->outfile->Close();
  }
  delete compact->outfile;
  compact->outfile = NULL;

  if (s.ok() && current_entries > 0) {
    // Verify that the table is usable
    Iterator* iter = table_cache_->NewIterator(ReadOptions(),
                                               output_number,
                                               current_bytes);
    s = iter->status();
    delete iter;
    if (s.ok()) {
      Log(options_.info_log,
          "Generated table #%llu@%d: %lld keys, %lld bytes",
          (unsigned long long) output_number,
          compact->compaction->level(),
          (unsigned long long) current_entries,
          (unsigned long long) current_bytes);
    }
  }
  return s;
}
Status DBImpl::InstallCompactionResults(CompactionState* compact) {
  mutex_.AssertHeld();
  Log(options_.info_log, "Compacted %d@%d + %d@%d files => %lld bytes",
      compact->compaction->num_input_files(0),
      compact->compaction->level(),
      compact->compaction->num_input_files(1),
      compact->compaction->level() + 1,
      static_cast<long long>(compact->total_bytes));

  // Add compaction outputs
  compact->compaction->AddInputDeletions(compact->compaction->edit());
  const int level = compact->compaction->level();
  for (size_t i = 0; i < compact->outputs.size(); i++) {
    const CompactionState::Output& out = compact->outputs[i];
    compact->compaction->edit()->AddFile(
        level + 1,
        out.number, out.file_size, out.smallest, out.largest);
  }
  return versions_->LogAndApply(compact->compaction->edit(), &mutex_);
}
Status DBImpl::DoCompactionWork(CompactionState* compact) {
  const uint64_t start_micros = env_->NowMicros();
  int64_t imm_micros = 0;  // Micros spent doing imm_ compactions

  Log(options_.info_log, "Compacting %d@%d + %d@%d files",
      compact->compaction->num_input_files(0),
      compact->compaction->level(),
      compact->compaction->num_input_files(1),
      compact->compaction->level() + 1);

  assert(versions_->NumLevelFiles(compact->compaction->level()) > 0);
  assert(compact->builder == NULL);
  assert(compact->outfile == NULL);
  if (snapshots_.empty()) {
    compact->smallest_snapshot = versions_->LastSequence();
  } else {
    compact->smallest_snapshot = snapshots_.oldest()->number_;
  }

  // Release mutex while we're actually doing the compaction work
  mutex_.Unlock();

  Iterator* input = versions_->MakeInputIterator(compact->compaction);
  input->SeekToFirst();
  Status status;
  ParsedInternalKey ikey;
  std::string current_user_key;
  bool has_current_user_key = false;
  SequenceNumber last_sequence_for_key = kMaxSequenceNumber;
  for (; input->Valid() && !shutting_down_.Acquire_Load(); ) {
    // Prioritize immutable compaction work
    if (has_imm_.NoBarrier_Load() != NULL) {
      const uint64_t imm_start = env_->NowMicros();
      mutex_.Lock();
      if (imm_ != NULL) {
        CompactMemTable();
        bg_cv_.SignalAll();  // Wakeup MakeRoomForWrite() if necessary
      }
      mutex_.Unlock();
      imm_micros += (env_->NowMicros() - imm_start);
    }

    Slice key = input->key();
    if (compact->compaction->ShouldStopBefore(key) &&
        compact->builder != NULL) {
      status = FinishCompactionOutputFile(compact, input);
      if (!status.ok()) {
        break;
      }
    }

    // Handle key/value, add to state, etc.
    bool drop = false;
    if (!ParseInternalKey(key, &ikey)) {
      // Do not hide error keys
      current_user_key.clear();
      has_current_user_key = false;
      last_sequence_for_key = kMaxSequenceNumber;
    } else {
      if (!has_current_user_key ||
          user_comparator()->Compare(ikey.user_key,
                                     Slice(current_user_key)) != 0) {
        // First occurrence of this user key
        current_user_key.assign(ikey.user_key.data(), ikey.user_key.size());
        has_current_user_key = true;
        last_sequence_for_key = kMaxSequenceNumber;
      }

      if (last_sequence_for_key <= compact->smallest_snapshot) {
        // Hidden by a newer entry for same user key
        drop = true;    // (A)
      } else if (ikey.type == kTypeDeletion &&
                 ikey.sequence <= compact->smallest_snapshot &&
                 compact->compaction->IsBaseLevelForKey(ikey.user_key)) {
        // For this user key:
        // (1) there is no data in higher levels
        // (2) data in lower levels will have larger sequence numbers
        // (3) data in layers that are being compacted here and have
        //     smaller sequence numbers will be dropped in the next
        //     few iterations of this loop (by rule (A) above).
        // Therefore this deletion marker is obsolete and can be dropped.
        drop = true;
      }

      last_sequence_for_key = ikey.sequence;
    }
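    // Example: with smallest_snapshot == 50, entries for one user key at
    // sequences 70, 45 and 40 arrive newest first; 70 and 45 are kept (45 is
    // the newest entry at or below the snapshot) and 40 is then hidden by 45,
    // so it is dropped by rule (A).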
#if 0
    Log(options_.info_log,
        "  Compact: %s, seq %d, type: %d %d, drop: %d, is_base: %d, "
        "%d smallest_snapshot: %d",
        ikey.user_key.ToString().c_str(),
        (int)ikey.sequence, ikey.type, kTypeValue, drop,
        compact->compaction->IsBaseLevelForKey(ikey.user_key),
        (int)last_sequence_for_key, (int)compact->smallest_snapshot);
#endif

    if (!drop) {
      // Open output file if necessary
      if (compact->builder == NULL) {
        status = OpenCompactionOutputFile(compact);
        if (!status.ok()) {
          break;
        }
      }
      if (compact->builder->NumEntries() == 0) {
        compact->current_output()->smallest.DecodeFrom(key);
      }
      compact->current_output()->largest.DecodeFrom(key);
      compact->builder->Add(key, input->value());

      // Close output file if it is big enough
      if (compact->builder->FileSize() >=
          compact->compaction->MaxOutputFileSize()) {
        status = FinishCompactionOutputFile(compact, input);
        if (!status.ok()) {
          break;
        }
      }
    }

    input->Next();
  }

  if (status.ok() && shutting_down_.Acquire_Load()) {
    status = Status::IOError("Deleting DB during compaction");
  }
  if (status.ok() && compact->builder != NULL) {
    status = FinishCompactionOutputFile(compact, input);
  }
  if (status.ok()) {
    status = input->status();
  }
  delete input;
  input = NULL;

  CompactionStats stats;
  stats.micros = env_->NowMicros() - start_micros - imm_micros;
  for (int which = 0; which < 2; which++) {
    for (int i = 0; i < compact->compaction->num_input_files(which); i++) {
      stats.bytes_read += compact->compaction->input(which, i)->file_size;
    }
  }
  for (size_t i = 0; i < compact->outputs.size(); i++) {
    stats.bytes_written += compact->outputs[i].file_size;
  }

  mutex_.Lock();
  stats_[compact->compaction->level() + 1].Add(stats);

  if (status.ok()) {
    status = InstallCompactionResults(compact);
  }
  if (!status.ok()) {
    RecordBackgroundError(status);
  }
  VersionSet::LevelSummaryStorage tmp;
  Log(options_.info_log,
      "compacted to: %s", versions_->LevelSummary(&tmp));
  return status;
}
namespace {

struct IterState {
  port::Mutex* mu;
  Version* version;
  MemTable* mem;
  MemTable* imm;
};

static void CleanupIteratorState(void* arg1, void* arg2) {
  IterState* state = reinterpret_cast<IterState*>(arg1);
  state->mu->Lock();
  state->mem->Unref();
  if (state->imm != NULL) state->imm->Unref();
  state->version->Unref();
  state->mu->Unlock();
  delete state;
}

}  // namespace
Iterator* DBImpl::NewInternalIterator(const ReadOptions& options,
                                      SequenceNumber* latest_snapshot,
                                      uint32_t* seed) {
  IterState* cleanup = new IterState;
  mutex_.Lock();
  *latest_snapshot = versions_->LastSequence();

  // Collect together all needed child iterators
  std::vector<Iterator*> list;
  list.push_back(mem_->NewIterator());
  mem_->Ref();
  if (imm_ != NULL) {
    list.push_back(imm_->NewIterator());
    imm_->Ref();
  }
  versions_->current()->AddIterators(options, &list);
  Iterator* internal_iter =
      NewMergingIterator(&internal_comparator_, &list[0], list.size());
  versions_->current()->Ref();

  cleanup->mu = &mutex_;
  cleanup->mem = mem_;
  cleanup->imm = imm_;
  cleanup->version = versions_->current();
  internal_iter->RegisterCleanup(CleanupIteratorState, cleanup, NULL);

  *seed = ++seed_;
  mutex_.Unlock();
  return internal_iter;
}
Iterator* DBImpl::TEST_NewInternalIterator() {
  SequenceNumber ignored;
  uint32_t ignored_seed;
  return NewInternalIterator(ReadOptions(), &ignored, &ignored_seed);
}

int64_t DBImpl::TEST_MaxNextLevelOverlappingBytes() {
  MutexLock l(&mutex_);
  return versions_->MaxNextLevelOverlappingBytes();
}
Status DBImpl::Get(const ReadOptions& options,
                   const Slice& key,
                   std::string* value) {
  Status s;
  MutexLock l(&mutex_);
  SequenceNumber snapshot;
  if (options.snapshot != NULL) {
    snapshot = reinterpret_cast<const SnapshotImpl*>(options.snapshot)->number_;
  } else {
    snapshot = versions_->LastSequence();
  }

  MemTable* mem = mem_;
  MemTable* imm = imm_;
  Version* current = versions_->current();
  mem->Ref();
  if (imm != NULL) imm->Ref();
  current->Ref();

  bool have_stat_update = false;
  Version::GetStats stats;

  // Unlock while reading from files and memtables
  {
    mutex_.Unlock();
    // First look in the memtable, then in the immutable memtable (if any).
    LookupKey lkey(key, snapshot);
    if (mem->Get(lkey, value, &s)) {
      // Done
    } else if (imm != NULL && imm->Get(lkey, value, &s)) {
      // Done
    } else {
      s = current->Get(options, lkey, value, &stats);
      have_stat_update = true;
    }
    mutex_.Lock();
  }

  if (have_stat_update && current->UpdateStats(stats)) {
    MaybeScheduleCompaction();
  }
  mem->Unref();
  if (imm != NULL) imm->Unref();
  current->Unref();
  return s;
}
Iterator* DBImpl::NewIterator(const ReadOptions& options) {
  SequenceNumber latest_snapshot;
  uint32_t seed;
  Iterator* iter = NewInternalIterator(options, &latest_snapshot, &seed);
  return NewDBIterator(
      this, user_comparator(), iter,
      (options.snapshot != NULL
       ? reinterpret_cast<const SnapshotImpl*>(options.snapshot)->number_
       : latest_snapshot),
      seed);
}

void DBImpl::RecordReadSample(Slice key) {
  MutexLock l(&mutex_);
  if (versions_->current()->RecordReadSample(key)) {
    MaybeScheduleCompaction();
  }
}
const Snapshot* DBImpl::GetSnapshot() {
  MutexLock l(&mutex_);
  return snapshots_.New(versions_->LastSequence());
}

void DBImpl::ReleaseSnapshot(const Snapshot* s) {
  MutexLock l(&mutex_);
  snapshots_.Delete(reinterpret_cast<const SnapshotImpl*>(s));
}
// Convenience methods
Status DBImpl::Put(const WriteOptions& o, const Slice& key, const Slice& val) {
  return DB::Put(o, key, val);
}

Status DBImpl::Delete(const WriteOptions& options, const Slice& key) {
  return DB::Delete(options, key);
}
Status DBImpl::Write(const WriteOptions& options, WriteBatch* my_batch) {
  Writer w(&mutex_);
  w.batch = my_batch;
  w.sync = options.sync;
  w.done = false;

  MutexLock l(&mutex_);
  writers_.push_back(&w);
  while (!w.done && &w != writers_.front()) {
    w.cv.Wait();
  }
  if (w.done) {
    return w.status;
  }

  // May temporarily unlock and wait.
  Status status = MakeRoomForWrite(my_batch == NULL);
  uint64_t last_sequence = versions_->LastSequence();
  Writer* last_writer = &w;
  if (status.ok() && my_batch != NULL) {  // NULL batch is for compactions
    WriteBatch* updates = BuildBatchGroup(&last_writer);
    WriteBatchInternal::SetSequence(updates, last_sequence + 1);
    last_sequence += WriteBatchInternal::Count(updates);
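    // For example, if last_sequence was 200 and the grouped batch carries 5
    // updates, they are stamped with sequences 201..205 and last_sequence
    // becomes 205.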
    // Add to log and apply to memtable.  We can release the lock
    // during this phase since &w is currently responsible for logging
    // and protects against concurrent loggers and concurrent writes
    // into mem_.
    {
      mutex_.Unlock();
      status = log_->AddRecord(WriteBatchInternal::Contents(updates));
      bool sync_error = false;
      if (status.ok() && options.sync) {
        status = logfile_->Sync();
        if (!status.ok()) {
          sync_error = true;
        }
      }
      if (status.ok()) {
        status = WriteBatchInternal::InsertInto(updates, mem_);
      }
      mutex_.Lock();
      if (sync_error) {
        // The state of the log file is indeterminate: the log record we
        // just added may or may not show up when the DB is re-opened.
        // So we force the DB into a mode where all future writes fail.
        RecordBackgroundError(status);
      }
    }
    if (updates == tmp_batch_) tmp_batch_->Clear();

    versions_->SetLastSequence(last_sequence);
  }

  while (true) {
    Writer* ready = writers_.front();
    writers_.pop_front();
    if (ready != &w) {
      ready->status = status;
      ready->done = true;
      ready->cv.Signal();
    }
    if (ready == last_writer) break;
  }

  // Notify new head of write queue
  if (!writers_.empty()) {
    writers_.front()->cv.Signal();
  }

  return status;
}
// REQUIRES: Writer list must be non-empty
// REQUIRES: First writer must have a non-NULL batch
WriteBatch* DBImpl::BuildBatchGroup(Writer** last_writer) {
  assert(!writers_.empty());
  Writer* first = writers_.front();
  WriteBatch* result = first->batch;
  assert(result != NULL);

  size_t size = WriteBatchInternal::ByteSize(first->batch);

  // Allow the group to grow up to a maximum size, but if the
  // original write is small, limit the growth so we do not slow
  // down the small write too much.
  size_t max_size = 1 << 20;
  if (size <= (128<<10)) {
    max_size = size + (128<<10);
  }
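  // For example, a 10KB leading write may be grouped with at most another
  // 128KB of followers, while a 300KB leading write may grow up to the full
  // 1MB cap.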
  *last_writer = first;
  std::deque<Writer*>::iterator iter = writers_.begin();
  ++iter;  // Advance past "first"
  for (; iter != writers_.end(); ++iter) {
    Writer* w = *iter;
    if (w->sync && !first->sync) {
      // Do not include a sync write into a batch handled by a non-sync write.
      break;
    }

    if (w->batch != NULL) {
      size += WriteBatchInternal::ByteSize(w->batch);
      if (size > max_size) {
        // Do not make batch too big
        break;
      }

      // Append to *result
      if (result == first->batch) {
        // Switch to temporary batch instead of disturbing caller's batch
        result = tmp_batch_;
        assert(WriteBatchInternal::Count(result) == 0);
        WriteBatchInternal::Append(result, first->batch);
      }
      WriteBatchInternal::Append(result, w->batch);
    }
    *last_writer = w;
  }
  return result;
}
// REQUIRES: mutex_ is held
// REQUIRES: this thread is currently at the front of the writer queue
Status DBImpl::MakeRoomForWrite(bool force) {
  mutex_.AssertHeld();
  assert(!writers_.empty());
  bool allow_delay = !force;
  Status s;
  while (true) {
    if (!bg_error_.ok()) {
      // Yield previous error
      s = bg_error_;
      break;
    } else if (
        allow_delay &&
        versions_->NumLevelFiles(0) >= config::kL0_SlowdownWritesTrigger) {
      // We are getting close to hitting a hard limit on the number of
      // L0 files.  Rather than delaying a single write by several
      // seconds when we hit the hard limit, start delaying each
      // individual write by 1ms to reduce latency variance.  Also,
      // this delay hands over some CPU to the compaction thread in
      // case it is sharing the same core as the writer.
      mutex_.Unlock();
      env_->SleepForMicroseconds(1000);
      allow_delay = false;  // Do not delay a single write more than once
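      // (With the default config constants this soft limit engages at 8
      // level-0 files, well before the hard stop at 12 handled below.)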
      mutex_.Lock();
    } else if (!force &&
               (mem_->ApproximateMemoryUsage() <= options_.write_buffer_size)) {
      // There is room in current memtable
      break;
    } else if (imm_ != NULL) {
      // We have filled up the current memtable, but the previous
      // one is still being compacted, so we wait.
      Log(options_.info_log, "Current memtable full; waiting...\n");
      bg_cv_.Wait();
    } else if (versions_->NumLevelFiles(0) >= config::kL0_StopWritesTrigger) {
      // There are too many level-0 files.
      Log(options_.info_log, "Too many L0 files; waiting...\n");
      bg_cv_.Wait();
    } else {
      // Attempt to switch to a new memtable and trigger compaction of old
      assert(versions_->PrevLogNumber() == 0);
      uint64_t new_log_number = versions_->NewFileNumber();
      WritableFile* lfile = NULL;
      s = env_->NewWritableFile(LogFileName(dbname_, new_log_number), &lfile);
      if (!s.ok()) {
        // Avoid chewing through file number space in a tight loop.
        versions_->ReuseFileNumber(new_log_number);
        break;
      }
      delete log_;
      delete logfile_;
      logfile_ = lfile;
      logfile_number_ = new_log_number;
      log_ = new log::Writer(lfile);
      imm_ = mem_;
      has_imm_.Release_Store(imm_);
      mem_ = new MemTable(internal_comparator_);
      mem_->Ref();
      force = false;   // Do not force another compaction if have room
      MaybeScheduleCompaction();
    }
  }
  return s;
}
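// Example use of the property interface through a public DB handle:
//   std::string value;
//   db->GetProperty("leveldb.num-files-at-level0", &value);
//   db->GetProperty("leveldb.stats", &value);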
bool DBImpl::GetProperty(const Slice& property, std::string* value) {
  value->clear();

  MutexLock l(&mutex_);
  Slice in = property;
  Slice prefix("leveldb.");
  if (!in.starts_with(prefix)) return false;
  in.remove_prefix(prefix.size());

  if (in.starts_with("num-files-at-level")) {
    in.remove_prefix(strlen("num-files-at-level"));
    uint64_t level;
    bool ok = ConsumeDecimalNumber(&in, &level) && in.empty();
    if (!ok || level >= config::kNumLevels) {
      return false;
    } else {
      char buf[100];
      snprintf(buf, sizeof(buf), "%d",
               versions_->NumLevelFiles(static_cast<int>(level)));
      *value = buf;
      return true;
    }
  } else if (in == "stats") {
    char buf[200];
    snprintf(buf, sizeof(buf),
             "                               Compactions\n"
             "Level  Files Size(MB) Time(sec) Read(MB) Write(MB)\n"
             "--------------------------------------------------\n"
             );
    value->append(buf);
    for (int level = 0; level < config::kNumLevels; level++) {
      int files = versions_->NumLevelFiles(level);
      if (stats_[level].micros > 0 || files > 0) {
        snprintf(
            buf, sizeof(buf),
            "%3d %8d %8.0f %9.0f %8.0f %9.0f\n",
            level,
            files,
            versions_->NumLevelBytes(level) / 1048576.0,
            stats_[level].micros / 1e6,
            stats_[level].bytes_read / 1048576.0,
            stats_[level].bytes_written / 1048576.0);
        value->append(buf);
      }
    }
    return true;
  } else if (in == "sstables") {
    *value = versions_->current()->DebugString();
    return true;
  } else if (in == "approximate-memory-usage") {
    size_t total_usage = options_.block_cache->TotalCharge();
    if (mem_) {
      total_usage += mem_->ApproximateMemoryUsage();
    }
    if (imm_) {
      total_usage += imm_->ApproximateMemoryUsage();
    }
    char buf[50];
    snprintf(buf, sizeof(buf), "%llu",
             static_cast<unsigned long long>(total_usage));
    value->append(buf);
    return true;
  }

  return false;
}
void DBImpl::GetApproximateSizes(
    const Range* range, int n,
    uint64_t* sizes) {
  // TODO(opt): better implementation
  Version* v;
  {
    MutexLock l(&mutex_);
    versions_->current()->Ref();
    v = versions_->current();
  }

  for (int i = 0; i < n; i++) {
    // Convert user_key into a corresponding internal key.
    InternalKey k1(range[i].start, kMaxSequenceNumber, kValueTypeForSeek);
    InternalKey k2(range[i].limit, kMaxSequenceNumber, kValueTypeForSeek);
    uint64_t start = versions_->ApproximateOffsetOf(v, k1);
    uint64_t limit = versions_->ApproximateOffsetOf(v, k2);
    sizes[i] = (limit >= start ? limit - start : 0);
  }

  {
    MutexLock l(&mutex_);
    v->Unref();
  }
}
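// Example use through a public DB handle (the estimates are derived from
// table-file offsets, so data still only in the memtable may not be counted):
//   leveldb::Range r("a", "z");
//   uint64_t size;
//   db->GetApproximateSizes(&r, 1, &size);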
// Default implementations of convenience methods that subclasses of DB
// can call if they wish
Status DB::Put(const WriteOptions& opt, const Slice& key, const Slice& value) {
  WriteBatch batch;
  batch.Put(key, value);
  return Write(opt, &batch);
}

Status DB::Delete(const WriteOptions& opt, const Slice& key) {
  WriteBatch batch;
  batch.Delete(key);
  return Write(opt, &batch);
}
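// Because Put and Delete are routed through a one-entry WriteBatch, they get
// the same atomicity, sequencing and sync behaviour as an explicit Write().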
Status DB::Open(const Options& options, const std::string& dbname,
                DB** dbptr) {
  *dbptr = NULL;

  DBImpl* impl = new DBImpl(options, dbname);
  impl->mutex_.Lock();
  VersionEdit edit;
  // Recover handles create_if_missing, error_if_exists
  bool save_manifest = false;
  Status s = impl->Recover(&edit, &save_manifest);
  if (s.ok() && impl->mem_ == NULL) {
    // Create new log and a corresponding memtable.
    uint64_t new_log_number = impl->versions_->NewFileNumber();
    WritableFile* lfile;
    s = options.env->NewWritableFile(LogFileName(dbname, new_log_number),
                                     &lfile);
    if (s.ok()) {
      edit.SetLogNumber(new_log_number);
      impl->logfile_ = lfile;
      impl->logfile_number_ = new_log_number;
      impl->log_ = new log::Writer(lfile);
      impl->mem_ = new MemTable(impl->internal_comparator_);
      impl->mem_->Ref();
    }
  }
  if (s.ok() && save_manifest) {
    edit.SetPrevLogNumber(0);  // No older logs needed after recovery.
    edit.SetLogNumber(impl->logfile_number_);
    s = impl->versions_->LogAndApply(&edit, &impl->mutex_);
  }
  if (s.ok()) {
    impl->DeleteObsoleteFiles();
    impl->MaybeScheduleCompaction();
  }
  impl->mutex_.Unlock();
  if (s.ok()) {
    assert(impl->mem_ != NULL);
    *dbptr = impl;
  } else {
    delete impl;
  }
  return s;
}
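// Typical client usage:
//   leveldb::DB* db;
//   leveldb::Options options;
//   options.create_if_missing = true;
//   leveldb::Status status = leveldb::DB::Open(options, "/tmp/testdb", &db);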
Snapshot::~Snapshot() {
}
Status DestroyDB(const std::string& dbname, const Options& options) {
  Env* env = options.env;
  std::vector<std::string> filenames;
  // Ignore error in case directory does not exist
  env->GetChildren(dbname, &filenames);
  if (filenames.empty()) {
    return Status::OK();
  }

  FileLock* lock;
  const std::string lockname = LockFileName(dbname);
  Status result = env->LockFile(lockname, &lock);
  if (result.ok()) {
    uint64_t number;
    FileType type;
    for (size_t i = 0; i < filenames.size(); i++) {
      if (ParseFileName(filenames[i], &number, &type) &&
          type != kDBLockFile) {  // Lock file will be deleted at end
        Status del = env->DeleteFile(dbname + "/" + filenames[i]);
        if (result.ok() && !del.ok()) {
          result = del;
        }
      }
    }
    env->UnlockFile(lock);  // Ignore error since state is already gone
    env->DeleteFile(lockname);
    env->DeleteDir(dbname);  // Ignore error in case dir contains other files
  }
  return result;
}
}  // namespace leveldb