LevelDB源碼解析10.創建VersionSet

前面介紹了如何通過一個一個的WAL LOG文件來重新資料庫信息。資料庫在重建的時候,前面有很重要的一步就是重建VersionSet。

DBImple::Recover(...) { s = versions_->Recover(save_manifest);}DBImpl類成員VersionSet* const versions_;

這裡需要看一下版本概念 這裡面所介紹的內容。整體上來說,也就是一張圖。

VersionSet與VersionEdit,以及Builder的關係圖

空的VersionSet

在利用已經存在的數據要重建一個VersionSet的時候。需要在一個空的VersionSet上操作。

DBImpl::DBImpl(const Options& raw_options, const std::string& dbname) : env_(raw_options.env), internal_comparator_(raw_options.comparator), options_(SanitizeOptions(dbname, &internal_comparator_, &internal_filter_policy_, raw_options)), dbname_(dbname), table_cache_(new TableCache(dbname_, options_, TableCacheSize(options_))), versions_(new VersionSet(dbname_, &options_, table_cache_, &internal_comparator_)) { has_imm_.Release_Store(NULL);}

這裡需要注意一下VersionSet的構造函數了。table_cache什麼的先不要管。

VersionSet::VersionSet(const std::string& dbname, const Options* options, TableCache* table_cache, const InternalKeyComparator* cmp) : env_(options->env), dbname_(dbname), options_(options), table_cache_(table_cache), icmp_(*cmp), next_file_number_(2), manifest_file_number_(0), // Filled by Recover() last_sequence_(0), log_number_(0), prev_log_number_(0), descriptor_file_(NULL), descriptor_log_(NULL), dummy_versions_(this), current_(NULL) { // 這裡當前版本指向的是空的。 // 其值會在AppendVersion裡面更新。 AppendVersion(new Version(this));}void VersionSet::AppendVersion(Version* v) { // Make "v" current assert(v->refs_ == 0); assert(v != current_); if (current_ != NULL) { current_->Unref(); } current_ = v; v->Ref(); // Append to linked list v->prev_ = dummy_versions_.prev_; v->next_ = &dummy_versions_; v->prev_->next_ = v; v->next_->prev_ = v;}

這裡注意去看圖中的雙向鏈表。dummy_versions_就是這樣的一個雙向鏈表。

從初始化的角度來看。dummy_versions_就是在雙向鏈表中插入第一個Version。

如何重建

通過版本概念 可以知道。Manifest文件裡面的每個Record都是記錄了一個VersionEdit。通過這個VersionEdit,可以恢復到DB服務關閉前的狀態。

演算法: 有個空的CurrentVersion while 順序讀入每個VersionEdit: 把每個VersionEdit在CurrentVersion上回放 CurrentVersion就是關機前的版本狀態

代碼也特別容易看懂。

Manifest文件在哪?

即然知道要從manifest文件裡面讀入VersionEdit。那麼問題來?Manifest文件在哪?

LevelDB的設計是把當前的manifest文件名寫在CURRENT文件裡面。

查找manifest的代碼如下:

// Read "CURRENT" file, which contains a pointer to the current manifest file // 這裡讀取CURRENT文件裡面的內容 std::string current; Status s = ReadFileToString(env_, CurrentFileName(dbname_), &current); current.resize(current.size() - 1); // 如果CURRENT文件裡面寫的是MANIFEST-00003 std::string dscname = dbname_ + "/" + current; // 那麼dscname = "MANIFEST-00003" SequentialFile* manifest_file; // 這裡生成一個新的SequentialFile. // 這裡只是生成一個新的內存結構體,指向已經有的文件 // 並且是按照只讀的方式來打開。 s = env_->NewSequentialFile(dscname, &manifest_file);

去掉一些對於讀代碼來說不重要的部分。可以看出,代碼是非常簡單的。

讀取所有的Record

Status VersionSet::Recover(bool *save_manifest) { // .. code to get manifest_file. // VersionBuilder // current_指向當前的version. // 通過初始化函數的構造。current_已經是一個空的version了。 // builder這裡就是相當於回放器。利用current_作為基準。 // 然後把歷史上的操作都在current_上重放。 Builder builder(this, current_); { // 也就是前面的manifestfile. // 通過前面的介紹,知道manifest文件裡面存放的就是一個一個的version_edit。 log::Reader reader(manifest_file, &reporter, true/*checksum*/, 0/*initial_offset*/); Slice record; // 主要起作用的還是record. scratch只是做為一個緩衝區。 std::string scratch; while (reader.ReadRecord(&record, &scratch) && s.ok()) { VersionEdit edit; s = edit.DecodeFrom(record); // ... code to compare comparator. // 前面的操作都正確的情況builder apply這個VersionEdit. if (s.ok()) { builder.Apply(&edit); } // ... other code. } } // .. other code}

到這裡就可以發現。Apply其實才是關鍵。也就是VersionEdit是如何Apply到Version上的呢?

Apply

Apply就是把一個VersionEdit不斷地Apply到一個Current Version上。但是在恢復過程中。如果是按照如下流程:

version0 + record_a = version1version1 + record_b = version2version2 + record_c = version3

按照

圖中的語義來操作的時候,就會形成

<version0, version1, version2, version3>

並且會被放到雙向鏈表dummy_list裡面。

但是回想一下。當前的操作是處在打開資料庫的初始階段。其實是沒有必要保留

version0, version1, version2

因為根據LevelDB的操作語義。後面的version肯定是最新的。所以只需要version3就可以了。

所以VersionSet::Builder的作用就是直接把所有的VersionEdit,Apply到VersionSet上面。最後形成一個version。中間過程中的各種Version就沒有必要保留了。

具體過程如下:這裡為了簡化代碼,改成了c++11的語法。也就是為了讓代碼更加容易讀懂。

// Apply all of the edits in *edit to the current state. void Apply(VersionEdit* edit) { // Update compaction pointers for (auto &item : edit->compact_pointers_) { const int level = item.first; // 在這個level裡面,下次開始做compact的開始的key // 對於一個level來說,合併的時候的key的指針不是每次都從頭開始掃描。 // 如果每次合併都從這層level的第一個文件掃描到最後一個文件。實際上是 // 會增加讀的壓力。所以一種折中就是每次需要合併的時候,記住上次合併完 // 成後的位置,下次合併的時候從這裡開始合併。 vset_->compact_pointer_[level] = item.second.Encode().ToString(); } // Delete files // 每次version edit都會有需要刪除的文件。 // 比如version0要刪除level0_a, // version1要刪除level1_a, // version2要刪除level2_a // 這裡採用的方法非常簡單。就是直接把相應文件的序號插入進來就可以了。 // 我覺得這裡可以加個assert,那就是,前面一個版本需要刪除的文件 // 在這個版本裡面,就不應該再出現。 for (auto &iter : edit->deleted_files_) { const int level = iter.first; const uint64_t number = iter.second; // 可以在這裡deleted_files加個assert,那就是不可能在這裡找到number. levels_[level].deleted_files.insert(number); } // Add new files // 每個版本都可以增加文件。這裡採取的操作非常簡單。就是把 // number添加到added_files裡面。 // 如果這個文件還在deleted files裡面。那麼就需要刪除之。 // Q: 按理說序號是不斷遞增的。一個版本要刪除的文件,不應該再次added_files. // 此外,還加了對於合併的控制。就是當一個文件經常被掃描到的時候。就需要進行合併了。 // 這裡做了掃描次數的一個統計與閥值設置。 for (auto &item : edit->new_files_) { const int level = item.first; FileMetaData* f = new FileMetaData(item.second); f->refs = 1; // We arrange to automatically compact this file after // a certain number of seeks. Lets assume: // (1) One seek costs 10ms // (2) Writing or reading 1MB costs 10ms (100MB/s) // (3) A compaction of 1MB does 25MB of IO: // 1MB read from this level // 10-12MB read from next level (boundaries may be misaligned) // 10-12MB written to next level // This implies that 25 seeks cost the same as the compaction // of 1MB of data. I.e., one seek costs approximately the // same as the compaction of 40KB of data. We are a little // conservative and allow approximately one seek for every 16KB // of data before triggering a compaction. f->allowed_seeks = (f->file_size / 16384); if (f->allowed_seeks < 100) f->allowed_seeks = 100; levels_[level].deleted_files.erase(f->number); levels_[level].added_files->insert(f); } }

生成的版本

當讀取所有的record,並且利用VersionSet生成相應的VersionSet::Builder之後直接利用VersionSet::Recovery中的後半部分

Status VersionSet::Recover(bool *save_manifest) { // .. 前面已經分析過的代碼 .. if (s.ok()) { Version* v = new Version(this); // 新生成一個version. builder.SaveTo(v); // 把builder最後的結果放到version裡面。 Finalize(v); // 把version反映到磁碟上 AppendVersion(v); // 把最後的version放到VersionSet裡面。 // .. other code. } return s;}

SaveTo

這裡看一下如何把VersionSet::Builder的結果放到了一個version裡面。需要注意的是:

Version與Builder的不一樣的地方:Version是一個靜態視角。所以只需要記住當前有的那些文件就可以了。而Builder是一個Action,或者說VersionEdit也是一個Action。動作則是會刪除文件的。在看代碼之前,可以大概想一想如何把Builder保存到一個Version裡面。

原則:v->files是一個vector。為了以後搜索方便。在放數據到這個vector裡面的時候。一定要保證有序性。

假設Base有的文件為

level0: 1level1: 8level2: 13, 14

builder經過多輪record(VersionEdit)累加之後。得到的一個總的操作是

level0: 添加文件2,3, 刪除文件 1level1: 添加文件9,level2: 添加文件10,11,12,17 刪除文件14

那麼如果把"總的操作」作用在Base Version上。操作順序如下:

level0: added_file = 2 為了保證文件的順序性。就是v_->files數組裡面的元素都是有序的。 這個時候,就把小於2的文件,就call MaybeAddFile(1)。 MaybeAddFile發現1是在刪除列表裡面。就跳過了。 added_file = 3 這個時候,base->files[level0]裡面已經沒有數據了。 直接嘗試MaybeAddFile(3)level1: Added_file = 9 為了保證文件的順序性,先把所有小於9的base裡面的文件調用一下 MaybeAddFile也就是call MaybeAddFile(8)按照這個順序合併,並且結果是保存在version裡面。

代碼如下:

// Save the current state in *v. void SaveTo(Version* v) { // 拿到version_set_的比較器。 BySmallestKey cmp; cmp.internal_comparator = &vset_->icmp_; // 處理每一層 for (int level = 0; level < config::kNumLevels; level++) { // 合併添加的文件和已經存在的文件 // 刪除那些deleted files. // 然後把結果存到version v裡面。 // Merge the set of added files with the set of pre-existing files. // Drop any deleted files. Store the result in *v. // 對於初始化OpenDB時的VersionSet::Builder來說。base_就是指向current_。 const std::vector<FileMetaData*>& base_files = base_->files_[level]; std::vector<FileMetaData*>::const_iterator base_iter = base_files.begin(); std::vector<FileMetaData*>::const_iterator base_end = base_files.end(); // 這裡added指向的是version_set::builder裡面的levels。 const FileSet* added = levels_[level].added_files; // 先預留了那麼多空間,實際上可能是用不到那麼多的。 v->files_[level].reserve(base_files.size() + added->size()); // 這裡去遍歷level的每個added_files. for (FileSet::const_iterator added_iter = added->begin(); added_iter != added->end(); ++added_iter) { // 在base_files裡面找到所有比added_iter小的文件。 for (auto bpos = std::upper_bound(base_iter, base_end, *added_iter, cmp); base_iter != bpos; ++base_iter) { // 嘗試把base_iter加到v_->files裡面。 // 前提是不在刪除的列表裡面。 // 注意:base_iter是不斷向前走的。所以這裡並不會出現一個base_iter文件 // 多次被添加的情況。 // MaybeAddFile(v, level, *base_iter); } MaybeAddFile(v, level, *added_iter); } // Add remaining base files for (; base_iter != base_end; ++base_iter) { MaybeAddFile(v, level, *base_iter); } } } void MaybeAddFile(Version* v, int level, FileMetaData* f) { if (levels_[level].deleted_files.count(f->number) > 0) { // 如果這個文件要被刪除掉,那麼就不加到files_裡面。 // File is deleted: do nothing } else { std::vector<FileMetaData*>* files = &v->files_[level]; // level0的文件是可能存在overlap的 // level1~7的則不可能 if (level > 0 && !files->empty()) { // Must not overlap assert(vset_->icmp_.Compare((*files)[files->size()-1]->largest, f->smallest) < 0); } f->refs++; files->push_back(f); } }

Finalize

當把所有的結果都保存到了一個Version之後。接下來就可以算一下這個Version裡面有哪些文件是需要合併的。合併的時候是挑得分最高,也就是最需要合併的文件來進行合併。

void VersionSet::Finalize(Version* v) { // Precomputed best level for next compaction int best_level = -1; double best_score = -1; for (int level = 0; level < config::kNumLevels-1; level++) { double score; if (level == 0) { // We treat level-0 specially by bounding the number of files // instead of number of bytes for two reasons: // // (1) With larger write-buffer sizes, it is nice not to do too // many level-0 compactions. // // (2) The files in level-0 are merged on every read and // therefore we wish to avoid too many files when the individual // file size is small (perhaps because of a small write-buffer // setting, or very high compression ratios, or lots of // overwrites/deletions). score = v->files_[level].size() / static_cast<double>(config::kL0_CompactionTrigger); } else { // Compute the ratio of current size to size limit. const uint64_t level_bytes = TotalFileSize(v->files_[level]); score = static_cast<double>(level_bytes) / MaxBytesForLevel(options_, level); } if (score > best_score) { best_level = level; best_score = score; } } v->compaction_level_ = best_level; v->compaction_score_ = best_score;}

AppendVersion

當Version穩定下來之後。那麼接下來就是把這個Version放到雙向鏈表中。並且改變current的指向。

Q:如果遇到多線程應該怎麼辦?這個還沒有想明白。

代碼如下:

void VersionSet::AppendVersion(Version* v) { // Make "v" current assert(v->refs_ == 0); assert(v != current_); if (current_ != NULL) { current_->Unref(); } current_ = v; v->Ref(); // Append to linked list v->prev_ = dummy_versions_.prev_; v->next_ = &dummy_versions_; v->prev_->next_ = v; v->next_->prev_ = v;}

推薦閱讀:

谷歌爬蟲主要是用C++開發嗎?
C++基本的知識都有了,但是很少C++解決問題,怎麼提高自己的實踐能力?
初學者如何上手C++?
大家能談談是怎麼學習windbg的嗎?
同一段代碼,為什麼有的編譯器能編譯通過,有的不能?

TAG:LevelDB | C | 源碼閱讀 |