LevelDB源碼解析9. WAL LOG讀取

準備

前面都在講設計邏輯。這裡開始介紹一些事務。比如打開DB的時候是怎麼樣的一個流程。

這個時候,應該逐步調試之前的那個程序app_test.cc

#include "leveldb/db.h"#include <cassert>#include <iostream>using namespace std;using namespace leveldb;int main() { leveldb::DB *db; leveldb::Options options; options.create_if_missing = true; leveldb::Status status = leveldb::DB::Open(options, "testdb", &db); assert(status.ok()); status = db->Put(WriteOptions(), "KeyNameExample", "ValueExample"); assert(status.ok()); string res; status = db->Get(ReadOptions(), "KeyNameExample", &res); assert(status.ok()); cout << res << endl; delete db; return 0;}

調試設置

{ "version": "0.2.0", "configurations": [ { "name": "(lldb) Launch", "type": "cppdbg", "request": "launch", "program": "${workspaceRoot}/build/app_test", "args": [], "stopAtEntry": false, "cwd": "${workspaceRoot}", "environment": [], "externalConsole": true, "MIMode": "lldb" } ]}

注意參考相應的調試設置。

首先在這裡,只會關心如何打開資料庫。也就是如下代碼:

leveldb::DB *db; leveldb::Options options; options.create_if_missing = true; leveldb::Status status = leveldb::DB::Open(options, "testdb", &db); assert(status.ok());

什麼時候刪除WAL LOG

回想之前的架構

整個leveldb架構

.log文件就是我們平時說的WAL LOG文件。有的博文會把這個文件叫做binlog。想一想這個問題:

什麼時候可以刪除舊有的xx.log文件。

我們假設這種情況:

  • 用戶在put<key,value>之後。這個請求被levelDB接收到之後。會把這個請求持久化到WAL LOG裡面。
  • 接著把key的值更新到內存裡面。

那麼這個時候,是否可以把WAL LOG刪除掉呢?

當然不行啦。因為如果這個時候,DB掛掉了。機器掉電了。

更新到內存裡面的數據就不存在了。用戶寫入的<key, value>就不存在了。這種情況會導致數據丟失。所以不能在更新到內存不管是memtable, immutable裡面。都是不能立馬刪除WAL LOG。那麼時候可以刪除LOG呢?

可以刪除的時候,也就是只能在內存裡面的數據,比如memtable/immutable裡面的數據刷寫到level0之後。這個時候就可以把「刷寫到level0"對應的這部分日誌給刪除掉了。

WAL LOG的作用

前文說過WAL LOG的作用。就是當DB服務掛掉的時候,可以快速恢復。在重新打開DB的時候,就需要考慮一下如何恢復了。

可以想像一下這個流程。

  1. 找到WAL LOG
  2. 按照順序,一個一個地把WAL LOG重放一遍
  3. DB恢復完成

這個順序如何保證?在LevelDB裡面非常簡單,每次要寫一個WAL LOG的時候,都會去申請一個數字。最簡單的辦法,當然是按照遞增的順序把這個數字分發出去。

WAL LOG的文件名

說到恢復,總得知道什麼樣的文件是 WAL LOG吧。

std::string LogFileName(const std::string& dbname, uint64_t number) { assert(number > 0); return MakeFileName(dbname, number, "log");}static std::string MakeFileName(const std::string& dbname, uint64_t number, const char* suffix) { char buf[100]; snprintf(buf, sizeof(buf), "/%06llu.%s", static_cast<unsigned long long>(number), suffix); return dbname + buf;}

重放 WAL LOG

那麼這裡就看一下是如何執行前面講到的3步。

// VersionEdit和save_manifest都是傳進來,然後要被修改的。// 主要是C語言不支持多個返回值。Status DBImpl::Recover(VersionEdit* edit, bool *save_manifest) { // 如果要進行操作,必須要保證鎖是獲得的。 mutex_.AssertHeld(); // 忽略CreateDir時遇到的錯誤。 // 這個時候可能遇到的錯誤是之前的failed的操作。留下了這個目錄的殘留。 // Ignore error from CreateDir since the creation of the DB is // committed only when the descriptor is created, and this directory // may already exist from a previous failed creation attempt. env_->CreateDir(dbname_); assert(db_lock_ == NULL); // 添加文件鎖 Status s = env_->LockFile(LockFileName(dbname_), &db_lock_); // CurrentFileName = db.dir/CURRENT // 看一下這個相應的文件是否存在,然後根據情況來 // 看一下是否報錯。 if (!env_->FileExists(CurrentFileName(dbname_))) { // .. 各種option裡面相應的出錯處理。比如如果db已經存在就報錯啥的 } //======================================== // 這裡有一大段如何建立version的代碼。後面再介紹 // ======================================= // 通過建立起來的version set可以拿到min log。 // 這個min log也就是當前最大的WAL LOG序號。 // 不是應該叫max log么?為什麼叫min_log? // 根據序號分發來說。這個時候min_log就是最小的可用的WAL LOG序號了。 // 大不了可以把新來的<key, value> append到這個文件後面。 // 或者是重新生成一個WAL LOG。 const uint64_t min_log = versions_->LogNumber(); // prev log主要是記錄了上次db服務正常的時候,正在用的log文件序號。 // 也是為了方便恢復。 const uint64_t prev_log = versions_->PrevLogNumber(); // 這裡應該是到資料庫所在目錄下,把這個目錄裡面的所有的文件都放到filenames // 這個vector裡面。 // 比如testdb目錄下的文件列表是: // 000005.ldb 000011.ldb 000017.ldb 000021.log LOCK LOG.old // 000008.ldb 000014.ldb 000020.ldb CURRENT LOG MANIFEST-000019 std::vector<std::string> filenames; s = env_->GetChildren(dbname_, &filenames); // 那麼嘗試輸出filenames // .,..,000005.ldb,000008.ldb, // 000011.ldb,000014.ldb, // 000017.ldb,000018.log,CURRENT, // LOCK,LOG,LOG.old,MANIFEST-000016, std::set<uint64_t> expected; versions_->AddLiveFiles(&expected); // expected列表就只會羅列文件的序號: // [ 5,8,11,14,17 ] uint64_t number; FileType type; std::vector<uint64_t> logs; // 這裡面存放了所有大於等於當前版本min log的LOG序號。 // 此外,還會把前一個序號pre_log也保存起來。 // 遍歷目錄下面的所有的文件 for (size_t i = 0; i < filenames.size(); i++) { // 根據文件名來決定是什麼樣的類型。 // number是什麼?文件名裡面的數字。 // 如果解析失敗,比如不是logFile,這個時候number = 0. // ParseFileName主要功能就是根據文件名,返回類型以及序號。 // 除WAL LOG使用了序號。其他文件也是使用了序號。 if (ParseFileName(filenames[i], &number, &type)) { expected.erase(number); // 從列表中把日誌文件找出來 // 並且要求number >= min_log或者說number == prev_log // prev_log為0時。number == 0但是這個時候,很有可能type != logFile。 if (type == kLogFile && ((number >= min_log) || (number == prev_log))) logs.push_back(number); } } // 到這裡,基本上logs裡面存放的就是最大的xxx.ldb裡面最大的數字。 // 這個時候主要是為了保證filename和expected裡面的數字是一一對應的!! if (!expected.empty()) { // 出錯處理 } // Recover in the order in which the logs were generated // 排個序 std::sort(logs.begin(), logs.end()); // 按照數字順序恢復 for (size_t i = 0; i < logs.size(); i++) { s = RecoverLogFile(logs[i], (i == logs.size() - 1), save_manifest, edit, &max_sequence); // The previous incarnation may not have written any MANIFEST // records after allocating this log number. So we manually // update the file number allocation counter in VersionSet. // 因為前面已經把>= min_log的序號放到了logs裡面。 // 所以這裡需要把這些較大的序號也設置為已經被使用了。 // 後面在分發的時候,這些序號就不能再發出去了。 versions_->MarkFileNumberUsed(logs[i]); } // ============================= // ... some other part of code. // ============================= return Status::OK();}

雖然看起來非常簡單。但是還是有些細節的地方是沒有介紹完整的。比如

  1. 以前老序號比如00001.ldb, 000002.ldb去哪裡的?什麼時候刪除的?
  2. 單個文件是如何Recovery的。這也就是RecoverLogFile的工作了。

那麼這裡看一下RecoverLogFile。

RecoverLogFile

RecoverLogFile(logs[i], (i == logs.size() - 1), save_manifest, edit, &max_sequence);// logs[i]就是一個序號// i == logs.size() - 1 標誌是不是最後一個log// 是否需要保存manifest// 最大的seq序號// edit在打開DB的時候生成的一個version edit。

具體代碼如下:刪除了很多出錯處理的代碼。

Status DBImpl::RecoverLogFile(uint64_t log_number, bool last_log, bool* save_manifest, VersionEdit* edit, SequenceNumber* max_sequence) { mutex_.AssertHeld(); // 這裡利用給定的log_number,生成相應的xxxx.log文件名。然後打開之。 std::string fname = LogFileName(dbname_, log_number); SequentialFile* file; Status status = env_->NewSequentialFile(fname, &file); // 利用打開的文件生成log Reader。這個是前面的博文介紹過的。 // We intentionally make log::Reader do checksumming even if // paranoid_checks==false so that corruptions cause entire commits // to be skipped instead of propagating bad information (like overly // large sequence numbers). log::Reader reader(file, &reporter, true/*checksum*/, 0/*initial_offset*/); // 一堆後面要用的局部變數 // Read all the records and add to a memtable std::string scratch; // 用來做緩衝區。其實沒有什麼特別的作用。 Slice record; // log::reader在read的時候,最有用的是這個record。 WriteBatch batch; // 批量寫。因為leveldb最終寫資料庫的介面都是通過這個批處理完成。 int compactions = 0; // 合併? MemTable* mem = NULL; // skiplist. 也就是memtable. // 從logReader中讀出一個完整的用戶輸入的record. while (reader.ReadRecord(&record, &scratch) && status.ok()) { // 檢查格式 if (record.size() < 12) { // 輸出報錯信息,然後繼續 continue; } // 生成一個批處理 WriteBatchInternal::SetContents(&batch, record); // 如果memtable還是空的 // 這裡生成一個新的memtable. if (mem == NULL) { mem = new MemTable(internal_comparator_); mem->Ref(); } // 把前面生成的批處理任務放到memtable裡面。 status = WriteBatchInternal::InsertInto(&batch, mem); // 批處理操作的時候,每個item都會有相應的序號。 const SequenceNumber last_seq = WriteBatchInternal::Sequence(&batch) + WriteBatchInternal::Count(&batch) - 1; // 更新這個序號 if (last_seq > *max_sequence) { *max_sequence = last_seq; } // 如果寫入的數據太多,要開始生成level0文件了。 if (mem->ApproximateMemoryUsage() > options_.write_buffer_size) { compactions++; *save_manifest = true; status = WriteLevel0Table(mem, edit, NULL); // 由於這裡還是在打開資料庫文件。所以 // 這裡也就不去管什麼memtable. immutable. mem->Unref(); mem = NULL; if (!status.ok()) { // Reflect errors immediately so that conditions like full // file-systems cause the DB::Open() to fail. break; } } } // end of while for every record. delete file; // See if we should keep reusing the last log file. // 如果options裡面指定說要在以前舊的WAL log裡面接著寫。並且這個last_log就是 // 最後一個日誌文件了。 // 並且是需要在沒有發生過compation的情況。 // Q: 如果發生過compaction是不是會刪除舊有的log文件。 if (status.ok() && options_.reuse_logs && last_log && compactions == 0) { uint64_t lfile_size; if (env_->GetFileSize(fname, &lfile_size).ok() && env_->NewAppendableFile(fname, &logfile_).ok()) { log_ = new log::Writer(logfile_, lfile_size); logfile_number_ = log_number; if (mem != NULL) { mem_ = mem; mem = NULL; } else { // mem can be NULL if lognum exists but was empty. mem_ = new MemTable(internal_comparator_); mem_->Ref(); } } } // 當log文件特別多的時候。中間的某些步驟是可能生成mem。並且最後是沒有釋放的。 // 那麼這個時候,可以想辦法把這些WAL LOG轉換成為level 0文件。 // 免得後面在恢復的時候再去重新過非常多的日誌文件。 if (mem != NULL) { // mem did not get reused; compact it. if (status.ok()) { // 如果寫入到了manifest。那麼設置處理結果。 *save_manifest = true; status = WriteLevel0Table(mem, edit, NULL); } mem->Unref(); } return status;}

還存留的問題

1. 以前老序號比如00001.ldb, 000002.ldb去哪裡的?什麼時候刪除的?2. 新的WAL LOG文件是在哪裡生成的?

新的WAL LOG文件

Status DB::Open(const Options& options, const std::string& dbname, DB** dbptr) { // 省略了好多代碼 // 如果恢復OK if (s.ok() && impl->mem_ == NULL) { // Create new log and a corresponding memtable. uint64_t new_log_number = impl->versions_->NewFileNumber(); WritableFile* lfile; // 這裡重新生成lfile. // 所以WAL LOG文件會在這裡重新生成 s = options.env->NewWritableFile(LogFileName(dbname, new_log_number), &lfile); if (s.ok()) { // new_log_number. edit.SetLogNumber(new_log_number); impl->logfile_ = lfile; impl->logfile_number_ = new_log_number; impl->log_ = new log::Writer(lfile); impl->mem_ = new MemTable(impl->internal_comparator_); impl->mem_->Ref(); } } // 省略了好多代碼}

這裡可以看一下new_log_number的申請

// Allocate and return a new file number uint64_t NewFileNumber() { return next_file_number_++; }

不過有意思的是這個變數並不是一個atomic的變數。那麼如何保證多線程安全?

Q: 是不是leveldb的多線程是:生成一個公共的db*指針。然後多個線程一起使用? A: 暫時不清楚。後面注意看代碼。

序號是統一管理?

Q: LOG .log .ldb 等文件的序號是統一管理的么?A: 是的。也就是說,不可能存在2.ldb與2.log同時存在的情況。

推薦閱讀:

LevelDB源碼解析7. 日誌格式
ROS導航包源碼學習0 --- 初衷
LevelDB源碼解析8. 讀取日誌
從Chrome源碼看HTTPS

TAG:LevelDB | C | 源碼閱讀 |