LevelDB源碼解析8. 讀取日誌
書接前文。在寫的時候,是把Slice切分成了一個一個的record。然後按照Block = 32KB的大小對齊的。那麼讀取的時候,應該如何處理?
名詞說明
1. physical record。這個是說在寫入文件的時候的record格式。record並不是能反正一個完整的輸入字元串整體。有可能被截斷。2. 本博客裡面的slice除了是levelDB上的slice,另外一層含義是一個完整的字元串。比如用戶輸出一個78KB長度的字元串需要存入levelDB。這個slice在代碼裡面也可能叫做邏輯record。
開頭的Skip
當需要讀Slice的時候。不會每次都打開文件從頭開始讀。那麼就需要給定一個開始讀的位置。這個位置就叫做initial_offset_。
initial_offset_是在不知曉當前文件指針的情況下給定的。也就是說,文件指針可能已經移動到文件的中間部位置了current_pos.那麼開始讀取的位置就是current_pos + initial_offset_但是current_pos是什麼。這裡並不關心。或者說潛在的假設就是current_pos已經是kBlockSize對齊的?
那麼問題就來。如果給的initial_offset_長度超出了幾個block。那麼就需要把這幾個block跳掉。
跳過block
跳過block其實比較容易處理。給定的一個block大小為32KB。比如當給定的initial_offoset_長度就是64KB的時候。很明顯就是需要跳過兩個block。
如果給定的initial_offset_的長度是65KB的時候。這個時候,就需要跳過2個Block。然後從接下來的那個block裡面的1KB的位置開始讀。
看起來好像很簡單。但是想想一下這種情況
initial_offset_ = 64KB + 32KB - 6bytes
首先應該是需要跳過2個block。然後在餘下來的那個block裡面的偏移量是32KB - 6byte。
這個有點不好。想一想。最後的7個bytes應該是什麼?根據寫入日誌的代碼看來,可能是如下情況:
- record的數據。比如某個record的數據 + header剛好是32KB。並且剛好與block對齊寫入了。
- header。比如某個record的數據+header剛好是32KB - 7bytes大小。那麼餘下來的空間就剛好放著一個header。
- 空數據。根據寫日誌的情況,如果record數據 + header > 32KB - 7 bytes。那麼後下餘下的空間,需要用0來補齊。
如果initial_offset_跳到這<7個bytes了應該怎麼辦?
Case 1. 屬於某個record的數據區。 那麼如果從這裡開始讀, 讀到的數據肯定不是完整的record。 >> 因為只是把一個record的尾巴上的數據讀出來了。 A: 應該把這個record跳掉,去讀接下來的一個record。 接下來的record在一個新的block上。也就是去讀 下一個blockCase 2. header.如果遇到的是header。 >>從當前位置開始讀,也不能讀出一個完整的header的。 A: 應該把這個header跳掉。然後去讀下一個record. 接下來的record剛好是在一個新的block上。Case 3. 遇到空串。肯定應該跳到下一個block啊。
跳過塊的代碼如下:
bool Reader:: SkipToInitialBlock() { // 塊中偏移 const size_t offset_in_block = initial_offset_ % kBlockSize; // 需要跳過的塊的位置 // 這個變數的意思是說,後面在讀的時候,要讀的塊的開頭地址是什麼? // uint64_t start_read_block_location = xx. uint64_t block_start_location = initial_offset_ - offset_in_block; // Dont search a block if wed be in the trailer // 如果給定的初始位置的塊中偏移 // 剛好掉在了尾巴上的6個bytes以內。那麼 // 這個時候,應該是需要直接切入到下一個block的。 if (offset_in_block > kBlockSize - 6) { block_start_location += kBlockSize; } // 注意end_of_buffer_offset的設置是塊的開始地址。 end_of_buffer_offset_ = block_start_location; // Skip to start of first block that can contain the initial record if (block_start_location > 0) { Status skip_status = file_->Skip(block_start_location); if (!skip_status.ok()) { ReportDrop(block_start_location, skip_status); return false; } } return true;}
跳過record
在讀代碼之前,需要想到要解決的問題是什麼。前面雖然跳過整數個的Block。也考慮了initial_offset_掉落到block尾巴裡面的6個byte的時候的處理方式。但是,還有兩個問題沒有解決。
1. initial_offset_並不是block的整數倍。雖然跳過了block=32KB的整數倍。但是這個餘數應該如何處理?後面還有別的什麼作用沒有?2. skip的時候,跳過的都是block的整數倍。但是有可能存在這種較大的Slice。一個slice就是 N個block。有可能跳過block之後,還是處在slice的中間。這個時候,也是讀不了一個完整 的slice數據。那麼,如何跳動,保證後面讀一個完整的slice數據?
所以想到,initial_offset_可能還有用。還需要保證後面讀數據的時候,需要從一個完整的slice開始讀。
跳過slice的邏輯其實也比較簡單。
讀入一個record,如果發現類型是kMiddleType。那麼就跳過這個record。
代碼如下:
初始值resyncing_(initial_offset > 0) 構造函數裡面的設置// 這裡可以反過來。就是一開始的時候,把需要跳掉的部分直接跳過bool Reader::ReadRecord(Slice* record, std::string* scratch) { // .. while (true) { // 這裡是讀一個物理上的record。並不是一個完整的slice信息。 const unsigned int record_type = ReadPhysicalRecord(&fragment); // resyncing_主要是指需要跳過的部分。 // 跳過的時候是跳過一個完整的record. if (resyncing_) { if (record_type == kMiddleType) { continue; } else if (record_type == kLastType) { resyncing_ = false; continue; } else { // 其他的情況是不需要continue的。直接到下面的switch resyncing_ = false; } } // 到這裡的時候,讀取的就是一個完整的slice的開頭了。 // 所以這裡才開始正常的處理。 }
這段代碼從實現上來說,並不是特別優美。個人覺得應該是把這些預先需要跳過的部分放到構造函數裡面處理。提前先跳過了比較好。這樣就不用著每次ReadRecord的時候來處理一下是不是要跳過block/record。
這種實現也是有點無耐的。因為skip這個動作本身是可能失敗的。在構造函數裡面操作,也不太好拋出異常或者錯誤。
出錯處理
在讀record的地方,有太多的出錯處理。最好一開始就把這些代碼清除掉。看起代碼來能夠更加輕鬆點。
正確的記錄應該如下:
<firstRecord, lastRecord><firstRecord, middleRecord, lastRecord><fullRecord>
無外乎也就是這幾種情況交錯。可能還會有多個middleRecord的情況。
但是一個讀入firstRecord之後,就應該知道是「讀在某個record的中間」。
- 讀到lastRecord之後。去解除這個狀態。
- 「讀在中間的狀態」,不應該遇到firstRecord, fullRecord。也就是說,前面的record還沒有讀完的時候,不應該讀下一個record。
bool in_fragmented_record = false; // 一開始沒有處理讀在中間的狀態。 switch (record_type) { case kFullType: // 如果處理「讀在中間」的狀態。應變報錯。 if (in_fragmented_record) { // Handle bug in earlier versions of log::Writer where // it could emit an empty kFirstType record at the tail end // of a block followed by a kFullType or kFirstType record // at the beginning of the next block. if (!scratch->empty()) { ReportCorruption(scratch->size(), "partial record without end(1)"); } } //.. case kFirstType: // 「讀在中間」的狀態也不應該遇到kFirstType。 // 也就是不應該讀到下一個record的開頭。 if (in_fragmented_record) { // Handle bug in earlier versions of log::Writer where // it could emit an empty kFirstType record at the tail end // of a block followed by a kFullType or kFirstType record // at the beginning of the next block. if (!scratch->empty()) { ReportCorruption(scratch->size(), "partial record without end(2)"); } } //.. case kMiddleType: // 當遇到middle type的時候。必然是「讀在中間」狀態。如果不是,報錯!! if (!in_fragmented_record) { ReportCorruption(fragment.size(), "missing start of fragmented record(1)"); } case kLastType: // 讀到lastType的時候,也必然是處在「讀在中間」的狀態。如果不是,報錯!! if (!in_fragmented_record) { ReportCorruption(fragment.size(), "missing start of fragmented record(2)"); } case kEof: // 文件都讀結束了,還處在「讀在中間」狀態。說明寫入的時候沒有寫入一個完整的 // record。沒辦法,直接向客戶端返回沒有完整的slice數據了。 if (in_fragmented_record) { // This can be caused by the writer dying immediately after // writing a physical record but before completing the next; dont // treat it as a corruption, just ignore the entire logical record. scratch->clear(); } return false; case kBadRecord: // 如果讀到了壞的record,又剛好處理「讀在中間」的狀態。那麼返回出錯!! // 如果這個壞掉的record不是在讀的record範圍裡面。直接返回讀失敗。 if (in_fragmented_record) { ReportCorruption(scratch->size(), "error in middle of record"); in_fragmented_record = false; scratch->clear(); } break; // 不應該有其他type。直接報錯!! default: { char buf[40]; snprintf(buf, sizeof(buf), "unknown record type %u", record_type); ReportCorruption( (fragment.size() + (in_fragmented_record ? scratch->size() : 0)), buf); in_fragmented_record = false; scratch->clear(); break; } } }
ReadRecord
接下來看一下ReadRecord的情況。這裡刪除了前面已經分析過的代碼,比如跳掉block/slice的部分。以及出錯處理的部分。
看下面這段代碼的時候,需要注意的就是scratch變數。這個變數的含義就是
當文件中的record是<firstRecord, middleRecord, lastRecord>的時候。scratch需要做一個緩衝區,把一個一個record的數據緩存起來。最後拼接成一個大的 Slice返回給客戶端。
讀取代碼如下:
// 這裡可以反過來。就是一開始的時候,把需要跳掉的部分直接跳過bool Reader::ReadRecord(Slice* record, std::string* scratch) { // 反正傳進來,都是會被修改的 // 直接清除掉 scratch->clear(); record->clear(); // Record offset of the logical record that were reading // 0 is a dummy value to make compilers happy uint64_t prospective_record_offset = 0; Slice fragment; while (true) { const unsigned int record_type = ReadPhysicalRecord(&fragment); // ReadPhysicalRecord may have only had an empty trailer remaining in its // internal buffer. Calculate the offset of the next physical record now // that it has returned, properly accounting for its header size. // 這裡記錄下讀入的物理record的起始位置 uint64_t physical_record_offset = end_of_buffer_offset_ - buffer_.size() - kHeaderSize - fragment.size(); switch (record_type) { case kFullType: prospective_record_offset = physical_record_offset; // scratch就是用來緩存<firstRecord, middleRecord, lastRecord> // 不斷地把這些record的數據區放到scratch裡面緩存並且拼接起來。 // 如果讀到的是一個full type的record,還拼接啥啊。 scratch->clear(); *record = fragment; // 記錄下最後一個record的偏移量 last_record_offset_ = prospective_record_offset; return true; case kFirstType: prospective_record_offset = physical_record_offset; scratch->assign(fragment.data(), fragment.size()); in_fragmented_record = true; break; case kMiddleType: scratch->append(fragment.data(), fragment.size()); break; case kLastType: scratch->append(fragment.data(), fragment.size()); *record = Slice(*scratch); last_record_offset_ = prospective_record_offset; return true; case kEof: return false; case kBadRecord: break; } } return false;}
如何讀取物理記錄
前面ReadPhysicalRecord都是知道讀入了一個物理記錄。那麼這個物理記錄是如何讀取的呢?
首先需要想到,寫入的時候是按照32KB一個block來寫入。在讀取的時候,就可以32KB來讀入了。所以在讀入的時候,肯定是以32KB為單位來讀的。下面就是讀入32KB block的代碼。為了方便閱讀,把代碼結構做了一定的調整。
unsigned int Reader::ReadPhysicalRecord(Slice* result) { while (true) { // 如果發現buffer的大小已經小於kHeaderSize了 if (buffer_.size() < kHeaderSize) { if (eof_) { // 注意:如果buffer_是非空的。我們有一個truncated header在文件的尾巴。 // 這可能是由於在寫header時crash導致的。 // 與其把這個失敗的寫入當成錯誤來處理,還不如直接當成EOF呢。 // Note that if buffer_ is non-empty, we have a truncated header at the // end of the file, which can be caused by the writer crashing in the // middle of writing the header. Instead of considering this an error, // just report EOF. buffer_.clear(); return kEof; } // 如果還沒有遇到結束 // 上一次的讀是一個完整的讀。那麼可能這裡有一點尾巴需要處理。 // Last read was a full read, so this is a trailer to skip // 這裡直接清空緩衝區 buffer_.clear(); // 這裡是讀kBlockSize個字元串到buffer_裡面。 Status status = file_->Read(kBlockSize, &buffer_, backing_store_); // 先把偏移處理了 end_of_buffer_offset_ += buffer_.size(); // read failed. if (!status.ok()) { buffer_.clear(); ReportDrop(kBlockSize, status); eof_ = true; return kEof; } // read success. if (buffer_.size() < kBlockSize) { eof_ = true; } continue; } // .. 進行後面的處理 // 接下來的代碼部分:從一個block裡面取出一個完整的record}
當成功讀入一個Block之後。接下來需要處理的就是從這個Block裡面取出一個完整的record。
完整的代碼如下:
unsigned int Reader::ReadPhysicalRecord(Slice* result) { while (true) { // 前面分析過代碼了 // .....{前面的代碼片段} // 已經把block的內容放到了buffer_裡面。 // Parse the header // 這麼一長串的代碼都是在解析頭部 // 應該寫個簡單的函數嘛。 const char* header = buffer_.data(); const uint32_t a = static_cast<uint32_t>(header[4]) & 0xff; const uint32_t b = static_cast<uint32_t>(header[5]) & 0xff; const unsigned int type = header[6]; const uint32_t length = a | (b << 8); // 如果頭部記錄的數據長度比實際的buffer_.size還要大。那肯定是出錯了。 if (kHeaderSize + length > buffer_.size()) { size_t drop_size = buffer_.size(); buffer_.clear(); if (!eof_) { ReportCorruption(drop_size, "bad record length"); return kBadRecord; } // If the end of the file has been reached without reading |length| bytes // of payload, assume the writer died in the middle of writing the record. // Dont report a corruption. return kEof; } // 如果是zero type。那麼返回Bad Record // 這種情況是有可能的。比如寫入record到block裡面之後。可能會遇到 // 還餘下7個bytes的情況。這個時候只能寫入一個空的record。 if (type == kZeroType && length == 0) { // Skip zero length record without reporting any drops since // such records are produced by the mmap based writing code in // env_posix.cc that preallocates file regions. buffer_.clear(); return kBadRecord; } // 檢查crc32 // Check crc if (checksum_) { uint32_t expected_crc = crc32c::Unmask(DecodeFixed32(header)); uint32_t actual_crc = crc32c::Value(header + 6, 1 + length); if (actual_crc != expected_crc) { // Drop the rest of the buffer since "length" itself may have // been corrupted and if we trust it, we could find some // fragment of a real log record that just happens to look // like a valid log record. size_t drop_size = buffer_.size(); buffer_.clear(); ReportCorruption(drop_size, "checksum mismatch"); return kBadRecord; } } // 移除頭部 buffer_.remove_prefix(kHeaderSize + length); // Skip physical record that started before initial_offset_ if (end_of_buffer_offset_ - buffer_.size() - kHeaderSize - length < initial_offset_) { result->clear(); return kBadRecord; } *result = Slice(header + kHeaderSize, length); return type; }}
Record的起始位置
// 移除當前的record佔用的緩衝區 buffer_.remove_prefix(kHeaderSize + length); // Skip physical record that started before initial_offset_ // f->read()..之後有end_of_buffer_offset_ += buffer_.size(); // 但是這裡end_of_buffer_offset_ - buffer_.size() // 減了之後,減去的不是剛讀出來的數據塊的大小。比如32KB。 // 這個時候的buffer_size.指的是未讀的record的數據大小。 // end_of_buffer_offset_ - buffer_.size就是已經讀掉的緩衝區的指針的位置。 // end_of_buffer_offset_ // - buffer_.size() // - kHeaderSize // - length // 這裡得到的就是剛讀出來的record的起始位置。 if (end_of_buffer_offset_ - buffer_.size() - kHeaderSize - length < initial_offset_) { result->clear(); return kBadRecord; }
同樣的道理
bool Reader::ReadRecord(Slice* record, std::string* scratch) { // 反正傳進來,都是會被修改的 scratch->clear(); record->clear(); bool in_fragmented_record = false; // Record offset of the logical record that were reading // 0 is a dummy value to make compilers happy uint64_t prospective_record_offset = 0; Slice fragment; while (true) { //....... uint64_t physical_record_offset = end_of_buffer_offset_ - buffer_.size() - kHeaderSize - fragment.size();
physical_record_offset怎麼來的,想必也清楚了。
end_of_buffer_offset_
這個變數一開始我是沒有看懂的。這裡需要畫個圖標記一下
當給定一個文件的時候。然後給了一個initial_offset_。這個時候,假設我們有一個隨意申請的緩衝區。但是這個緩衝區的增長是以 32KB為單位的。那麼這個緩衝區這個時候的結束位置就是
end_of_buffer_offset
從這個圖也需要記住:
- log reader/writer都是假裝file fp當前位置,就是我工作的起始位置。至於是不是文件真正的起始位置,並不關心。
所以,initial_offset_指的就是當前我這個reader工作時,起手時的偏移量。
紫色部分為緩衝區(並且假設這個緩衝與文件)。並且32KB為單位來增長。那麼在開始必須調整緩衝區的offset。
bool Reader:: SkipToInitialBlock() { // 塊中偏移 const size_t offset_in_block = initial_offset_ % kBlockSize; // 需要跳過的塊的位置 // 這個變數的意思是說,後面在讀的時候,要讀的塊的開頭地址是什麼? // uint64_t start_read_block_location = xx. uint64_t block_start_location = initial_offset_ - offset_in_block; // Dont search a block if wed be in the trailer // 如果給定的初始位置的塊中偏移 // 剛好掉在了尾巴上的6個bytes以內。那麼 // 這個時候,應該是需要直接切入到下一個block的。 if (offset_in_block > kBlockSize - 6) { block_start_location += kBlockSize; } // 注意end_of_buffer_offset的設置是塊的開始地址。 end_of_buffer_offset_ = block_start_location; // Skip to start of first block that can contain the initial record if (block_start_location > 0) { Status skip_status = file_->Skip(block_start_location); //.. } return true;}
經過這麼一調整,文件指針就與end_of_buffer_offset_平行了。
注意,移動之後, current文件指針與end_of_buffer_offset_就平行了。(但是值不相等)。
並且initial_offset_沒有對齊哦(這種情況是可能的)。
接下來假設讀入了32KB,也就是一個block。並且取走了一個record。
但是要注意的是,當計算
uint64_t physical_record_offset = end_of_buffer_offset_ - buffer_.size() - kHeaderSize - fragment.size(); 或者 if (end_of_buffer_offset_ - buffer_.size() - kHeaderSize - length < initial_offset_) { result->clear(); return kBadRecord; }
的時候,buffser_.size()已經不是黃色區域(大部分時候是32KB)的大小了。而是紫色區域未讀數據的大小。紅色區域就是剛讀到了length + kHeaderSize大小。綠色部分為已經讀取的record的大小。
last_record_offset_
最後讀的一個完整的用戶數據的偏移量
只有讀完<firstRecord, middleRecord, lastRecord> 之後,才會設置last_record_offset_或者<fullRecord>
由於physical_record_offset記錄的是一個物理record的偏移量(不是在文件裡面的偏移量,而是在當前這個Reader裡面的偏移量)。那麼這個物理偏移量可能指向的位置是:
<firstRecord, middleRecord, lastRecord> 1 2 3
分別可能指向1,2,3開始的位置。
因此,正確的情況,應該是只在<lastRecord>/<fullRecord>這裡更新last_record_offset_。
推薦閱讀: