etcd-raft日誌管理
說明
日誌是實現一致性協議的最重要手段。客戶對應用發起的狀態更新請求首先都會被記錄在日誌中,待主節點將更新日誌在集群多數節點之間完成同步以後,便將該日誌項內容在狀態機中進行應用,進而便完成了一次客戶的更新請求。
從上面的過程描述來看,日誌是一致性協議數據同步的核心。因此,如何管理節點上的日誌也是一個非常關鍵的技術,本文,我們就來研究etcd-raft中的日誌管理實現。
需要說明的是:由於實現的關係,etcd-raft的核心協議處理層不會處理日誌追加的邏輯,以保持核心協議實現的簡潔性,但是etcd提供了一個WAL(Write-Ahead-Log)的日誌庫,日誌追加功能由應用調用該庫完成。本文我們分析的重點在於:
- 應用如何調用WAL庫完成日誌追加
- WAL庫如何管理日誌
- WAL如何與協議核心相互配合完成日誌內容的同步
數據結構
好的實現應該是有非常清晰的數據結構,並且每個結構之間職責明確。
WAL
WAL管理所有的更新日誌,主要處理日誌的追加、日誌文件的切換、日誌的回放等等。
type WAL struct {n dir string n dirFile *os.File n metadata []byte n state raftpb.HardState n start walpb.Snapshot n decoder *decoder n readClose func() error n mu sync.Mutex n enti uint64 n encoder *encoder nn locks []*fileutil.LockedFile n fp *filePipelinen}n
raftLog
type raftLog struct {n storage Storage n unstable unstablen committed uint64n applied uint64n}n
由於raft協議的核心工作是在集群節點之間進行日誌複製,因此,在etcd-raft實現的核心協議處理層也必須了解當前日誌複製情況,這個結構便是raftLog。該結構被集成在核心數據結構raft中。
raftLog中主要記錄了當前日誌的狀態,包括:
committed:
applied:unstable:日誌項內存緩存,便於集群節點之間進行日誌複製storage:也是日誌項內存緩存,但是不明白其具體作用是什麼,與unstable有什麼區別
unstable
type unstable struct {n snapshot *pb.Snapshot n entries []pb.Entry n offset uint64n}n
unstable在使用內存數組維護所有的更新日誌項。對於Leader節點來說,它維護了客戶端的更新請求對應的日誌項;對於Follower節點而言,它維護的是Leader節點複製的日誌項。
無論是Leader還是Follower節點,日誌項首先都會被存儲在unstable結構,然後再由其內部狀態機將unstable維護的日誌項交給上層應用,由應用負責將這些日誌項進行持久化並轉發至系統其它節點。這也是為什麼它被稱為unstable的原因:在unstable中的日誌項都是不安全的,尚未持久化存儲,可能會因意外而丟失。
Storage && MemoryStorage
type Storage interface {n InitialState() (pb.HardState, pb.ConfState, error)n Entries(lo, hi, maxSize uint64) ([]pb.Entry, error)n Term(i uint64) (uint64, error)n LastIndex() (uint64, error)n FirstIndex() (uint64, error)n Snapshot() (pb.Snapshot, error)n}nntype MemoryStorage struct {n sync.Mutex n hardState pb.HardState n snapshot pb.Snapshot n ents []pb.Entry n}n
Storage和MemoryStorage是介面和實現的關係
與unstable一樣,Storage也被嵌入在raftLog結構中。需要說明的一點是:將日誌項追加到Storage的動作是由應用完成的,而不是raft協議核心處理層。目前尚不理解Storage存在的意義是什麼,與unstable到底有什麼區別?物理存儲
所有的日誌項最終都被追加存儲在WAL文件中。
日誌項類型
日誌項有多種類型,見下列表:
metadataType int64 = iota + 1 nentryTypenstateTypencrcTypensnapshotTypen
- metadataType:這是一個特殊的日誌項,被寫在每個WAL文件的頭部,具體數據好像是可以由應用自定義。
- entryType:應用的更新數據,也是日誌中存儲的最關鍵數據;
- stateType:代表日誌項中存儲的內容是Snapshot;
- crcType:前一個WAL文件裡面的數據的crc,也是WAL文件的第一個記錄項
- snapshotType:當前Snapshot的索引{term, index},即當前的Snapshot位於哪個日誌記錄,不同於stateType,這裡只是記錄Snapshot的索引,而非snapshot的數據。
WAL文件物理格式
每個日誌項有以下四個部分組成
- type:日誌項類型,在上面詳細描述過;
- crc:校驗和
- data:根據日誌項類型存儲的實際數據也不盡相同,如snapshotType類型的日誌項存儲的是snapshot的日誌索引,crcType類型的日誌項則無數據項,其crc欄位便充當了數據項
- padding:為了保持日誌項8位元組對其而填充的無意義內容
這裡的crc欄位不是很明白:看實現好像crc並非是日誌項內容的校驗和,而是該日誌文件中當前日誌項以前的所有日誌項的校驗和。這樣設計的好處是什麼呢?
關鍵流程
WAL初始化
etcd的wal庫提供了初始化方法,應用需要顯示調用初始化方法來完成日誌初始化功能,初始化方法主要有兩個API:Create與Open:
func Create(dirpath string, metadata []byte) (*WAL, error) {n if Exist(dirpath) {n return nil, os.ErrExistn }n tmpdirpath := filepath.Clean(dirpath) + ".tmp" n if fileutil.Exist(tmpdirpath) {n if err := os.RemoveAll(tmpdirpath); err != nil {n return nil, errn }n }nn if err := fileutil.CreateDirAll(tmpdirpath); err != nil {n return nil, errn }n p := filepath.Join(tmpdirpath, walName(0, 0))n f, err := fileutil.LockFile(p, os.O_WRONLY|os.O_CREATE, fileutil.PrivateFileMode)n if err != nil {n return nil, errn }n if _, err = f.Seek(0, io.SeekEnd); err != nil {n return nil, errn }nn if err = fileutil.Preallocate(f.File, SegmentSizeBytes, true); err != nil {n return nil, errn }n w := &WAL{n dir: dirpath,n metadata: metadata,n }n w.encoder, err = newFileEncoder(f.File, 0)n if err != nil {n return nil, errn }nn w.locks = append(w.locks, f)n if err = w.saveCrc(0); err != nil {n return nil, errn }n if err = w.encoder.encode(&walpb.Record{Type: metadataType, Data: metadata}); err != nil {n return nil, errn }nn if err = w.SaveSnapshot(walpb.Snapshot{}); err != nil {n return nil, errn }n if w, err = w.renameWal(tmpdirpath); err != nil {n return nil, errn }n pdir, perr := fileutil.OpenDir(filepath.Dir(w.dir))n if perr != nil {n return nil, perrn }nn if perr = fileutil.Fsync(pdir); perr != nil {n return nil, perrn }n if perr = pdir.Close(); err != nil {n return nil, perrn }n return w, nil n}n
Create做的事情也比較簡單:
- 創建WAL目錄,用於存儲WAL日誌文件以及snapshot索引;
- 預分配第一個WAL日誌文件,默認是64MB,使用預分配機制可以提高寫入性能;
- 其他,包括使用臨時目錄並最終重命名為正式目錄名等trick,可以忽略。
Open則是在Create完成以後被調用,主要是用於打開WAL目錄下的日誌文件,Open的主要作用是找到當前Snapshot以後的所有WAL日誌,這是因為當前的Snapshot之前的日誌我們不再關心了,因為日誌的內容肯定都已經被更新至Snapshot了,這些日誌也是在後面日誌回收中可以被刪除的部分。
func Open(dirpath string, snap walpb.Snapshot) (*WAL, error) {n w, err := openAtIndex(dirpath, snap, true)n if err != nil {n return nil, errn }n if w.dirFile, err = fileutil.OpenDir(w.dir); err != nil {n return nil, errn }n return w, nil n}n
其中最重要的就是openAtIndex了,該函數用於尋找snap以後的日誌文件並打開。
WAL追加日誌項
日誌項的追加是通過調用etcd的wal庫的Save()方法實現,具體如下:
func (w *WAL) Save(st raftpb.HardState, ents []raftpb.Entry) error {n w.mu.Lock()n defer w.mu.Unlock()n // short cut, do not call sync n if raft.IsEmptyHardState(st) && len(ents) == 0 {n return nil n }n mustSync := raft.MustSync(st, w.state, len(ents))n for i := range ents {n if err := w.saveEntry(&ents[i]); err != nil {n return errn }n }n if err := w.saveState(&st); err != nil {n return errn }n curOff, err := w.tail().Seek(0, io.SeekCurrent)n if err != nil {n return errn }n if curOff < SegmentSizeBytes {n if mustSync {n return w.sync()n }n return nil n }n return w.cut()n}n
該函數的核心是:
- 調用saveEntry()將日誌項存儲到WAL文件中;
- 如果追加後日誌文件超過既定的SegmentSizeBytes大小,需要調用w.cut()進行WAL文件切換,即:關閉當前WAL日誌,創建新的WAL日誌,繼續用於日誌追加。
saveEntry 日誌項數據進行編碼並追加至WAL文件,存儲在WAL文件中的日誌項有多種類型,對於普通的應用更新請求,類型為entryType。
而具體的編碼寫入方法則由專門的編碼結構實現,稱為struct encoder,該結構實現了日誌項的編碼和將日誌項編碼後的數據寫入日誌文件的功能。感興趣的讀者可以結合上面的日誌項結構自行閱讀。cut 則實現了WAL文件切換的功能,每個WAL文件的預設大小是64MB,一旦超過該大小,便創建新的WAL文件,這樣做的好處是便於WAL文件的回收,我們在後面會說明。
WAL日誌回放
WAL的日誌回放的主要流程也是由應用來完成,以etcd-raft自帶的示例應用為例:
- 載入最新的Snapshot
- 打開WAL文件目錄,根據上面的描述,這裡主要目的是找到最新Snapshot以後的日誌文件,這些是需要被回放的日誌;
- 在2的基礎上讀出所有的日誌項(會不會日誌項太多?導致內存裝不下?)
- 將3讀出的日誌項應用到內存中,這裡的內存指的是上面我們說過的Storage,給raft協議核心處理層提供的內存日誌存儲中,這樣,raft核心協議處理層就可以將日誌同步給其他節點了。
所以,對於WAL日誌回放功能,底層的WAL日誌庫只需要給上層應用提供一個讀取所有日誌項的功能即可,這由ReadAll()實現。ReadAll的實現感興趣的同學可以自行閱讀。
WAL日誌回收
TODO: 沒有看到日誌回收過程,放在Snapshot總說明
Leader日誌追加
Leader節點的日誌是由應用根據客戶端請求生成,更進一步說,是應用根據客戶端的更新請求而生成。需要說明的是:這裡的客戶端請求不僅僅是來自於客戶端,還有可能是來自於集群其他Follower節點對客戶端的請求轉發,因為客戶端可能無法正確獲知當前集群的Leader是誰。
之前的文章「etcd-raft示例分析」(https://zhuanlan.zhihu.com/p/29180575)中我們已經具體描述過應用是如何一步步將客戶端請求轉變至raft日誌項並交由協議處理層處理。
協議的核心處理層會將更新日誌存儲在內存數組之中(前面描述過的unstable)。當然,這裡還不算結束。unstable中的日誌最終還需要被寫入WAL日誌、複製到集群Follower等一些列處理。
為此,raft協議核心處理層抽象了一個Node結構。Node在啟動時會創建後台協程。該後台協程需要處理很多任務,其中之一就是為上層應用準備好Ready:這是底層的協議處理層為上層應用層準備好的任務,保存了當前需要應用處理的日誌項以及當前協議的狀態信息:例如當前的commit點、snapshot信息等。上層應用通過Node.Ready()介面獲取此類任務並作如下處理:
- 將Ready中的日誌項寫入WAL日誌;
- 將Ready中已commit的日誌項應用至狀態機;
- 通過網路傳輸模塊將消息傳輸至集群其他節點,如果是從節點,這裡應該無需向其他節點發送消息了
Leader向Follower推送日誌
前面說過,當應用層有更新請求(MsgProp)送給底層raft協議處理層時,協議處理層便會驅動自己的狀態機前進並觸發向Follower節點進行日誌複製。Leader向Follower進行日誌同步的函數是bcastAppend:
func (r *raft) bcastAppend() {n for id := range r.prs {n if id == r.id {n continue n }n r.sendAppend(id)n }n}n
因此,日誌複製是Leader節點在收到應用的更新請求後便會向Follower發起的,具體同步的內容是內存中unstable結構中維護的更新日誌。
func (r *raft) sendAppend(to uint64) { n pr := r.prs[to]n if pr.IsPaused() {n return n }n m := pb.Message{}n m.To = to n term, errt := r.raftLog.term(pr.Next - 1)n ents, erre := r.raftLog.entries(pr.Next, r.maxMsgSize)nn // send snapshot if we failed to get term or entriesn if errt != nil || erre != nil { n ......n } else {n m.Type = pb.MsgAppn m.Index = pr.Next - 1 n m.LogTerm = termn m.Entries = entsn m.Commit = r.raftLog.committedn if n := len(m.Entries); n != 0 { n switch pr.State {n case ...n }n }n r.send(m)n }n}n
sendAppend 向特定的Follower(由傳入參數的to代表)發送日誌同步命令。該方法首先會找到該Follower上一次已同步的日誌位置(pr.Next-1),然後從raftLog中獲取該位置以後的日誌項,當然每次同步的數量不宜太多,由maxMsgSize限制。當然,如果無法從raftLog獲取到想要的日誌項,此時需要考慮發送Snapshot,這是因為對應的日誌項可能由於已經被commit而丟棄了(向新加入節點同步日誌的時候可能會出現這種情況),這裡我們暫不作細緻討論,後續討論Snapshot設計的時候再去探究。
Leader收到Follower對於日誌複製消息MsgApp的響應後:
func stepLeader(r *raft, m pb.Message) {n ...n switch m.Type {n case pb.MsgAppResp:n pr.RecentActive = true n // 如果Follower拒絕了同步消息 n if m.Reject {n ...n } else {n oldPaused := pr.IsPaused()n if pr.maybeUpdate(m.Index) {n switch {n case xxx:n ...n }n if r.maybeCommit() {n r.bcastAppend()n } else if oldPaused {n ... n }n }n }n }n}n
這裡的處理也比較簡單:主要是調用r.maybeCommit(),看看是否可以推進commit點,如果可以的話,繼續向Follower發送日誌同步消息。commit點的推進也比較簡單,只是簡單地將raftLog中的commit位置置為新的值即可。
func (l *raftLog) commitTo(tocommit uint64) {n if l.committed < tocommit {n if l.lastIndex() < tocommit {n ...n }n l.committed = tocommitn }n}n
Follower日誌追加
上面我們討論了Leader的日誌複製和同步響應處理流程,接下來我們考察下Follower節點在收到Leader的日誌同步消息時的處理。
Follower節點的日誌追加過程與Leader節點完全一致,不同的是日誌來源:Leader節點的日誌來自於應用的更新請求(MsgProp),而Follower的日誌則是來自於Leader的日誌複製消息(MsgApp)。
至於etcd-raft的Follower如何接受Leader節點的消息並將該消息進一步轉發至底層核心的raft協議處理層,我們已經在前面的「etcd-raft網路傳輸組件實現分析」章節有了比較清晰的描述,不熟悉的朋友可以移步至 https://zhuanlan.zhihu.com/p/29207055
Leader節點的日誌複製消息終進入Follower節點的raft核心協議處理層,更具體的說,是進入了函數 stepFollower:
func stepFollower(r *raft, m pb.Message) {n switch m.Type {n case pb.MsgProp:n ...n case pb.MsgApp:n r.electionElapsed = 0 n r.lead = m.Fromn r.handleAppendEntries(m)n }n}nnfunc (r *raft) handleAppendEntries(m pb.Message) {n // 如果消息的序號在本地已經被提交了,拒絕消息 n if m.Index < r.raftLog.committed {n r.send(pb.Message{To: m.From,n Type: pb.MsgAppResp,n Index: r.raftLog.committed})n return n }n // 調用raftLog.maybeAppend n if mlastIndex, ok := r.raftLog.maybeAppend(...); ok {n r.send(pb.Message{To: m.From,n Type: pb.MsgAppResp, n Index: mlastIndex})n } else {n r.send(pb.Message{To: m.From,n Type: pb.MsgAppResp,n Index: m.Index,n Reject: true,n RejectHint: r.raftLog.lastIndex()})n }n}nnfunc (l *raftLog) maybeAppend(index, logTerm, committed uint64, ents ...pb.Entry) (lastnewi uint64, ok bool) {n if l.matchTerm(index, logTerm) {n lastnewi = index + uint64(len(ents))n ci := l.findConflict(ents)n switch {n case ci == 0:n case ci <= l.committed:n .....n default:n offset := index + 1 n l.append(ents[ci-offset:]...)n }n l.commitTo(min(committed, lastnewi))n return lastnewi, true n }n return 0, false n}nnfunc (l *raftLog) append(ents ...pb.Entry) uint64 {n if len(ents) == 0 {n return l.lastIndex()n }n if after := ents[0].Index - 1; after < l.committed {n panic(...)n }nn l.unstable.truncateAndAppend(ents)n return l.lastIndex()n}n
從上面的流程跟蹤下來看,與Leader一樣,Follower節點的數據最終也是被寫入了日誌模塊的unstable日誌中,其實就是被追加至內存中的日誌項數組。
應用對unstable日誌處理
前面說過,主從節點上的日誌首先被存儲在內存的unstable中,但這些unstable中的日誌項最終需要被應用獲取到並作進一步處理。
協議處理層的後台任務會將unstable中的日誌項以及協議狀態信息等打包成Ready結構塞進一個管道readyc:
func (n *node) run(r *raft) {n for {n if advancec != nil {n ... n } else {n rd = newReady(r, prevSoftSt, prevHardSt)n if rd.containsUpdates() {n readyc = n.readycn } else {n readyc = nil n }n }n select {n case readyc <- rd:n ...n case xxx:n }n }n}n
應用通過協議層抽象的Node.Ready()獲取保存Ready的管道:
func (n *node) Ready() <-chan Ready { return n.readyc }n
應用對Ready的標準處理方法如下:
case rd := <-rc.node.Ready():n rc.wal.Save(rd.HardState, rd.Entries)n if !raft.IsEmptySnap(rd.Snapshot) {n rc.saveSnap(rd.Snapshot)n rc.raftStorage.ApplySnapshot(rd.Snapshot)n rc.publishSnapshot(rd.Snapshot)n }n rc.raftStorage.Append(rd.Entries)n rc.transport.Send(rd.Messages)n if ok := rc.publishEntries(rc.entriesToApply(rd.CommittedEntries)); !ok {n rc.stop()n returnn }n rc.maybeTriggerSnapshot()n rc.node.Advance()n
關鍵處理流程:
- 將日誌項寫入WAL;
- 處理Snapshot,如果有的話;
- 追加至Storage(這是幹什麼?);
- 消息發送至集群其他節點,這是對Leader而言的;
- 將已提交的日誌應用至狀態機(publishEntries)。
總結
從上面的etcd-raft日誌模塊實現分析來看,我們總結如下結論:
- 日誌項會被存儲在三個地方,按照其出現的順序分別為:unstable、WAL、storage
- unstable維護協議層的日誌項,這也是raft進行日誌複製的數據源泉;
- WAL負責日誌項的持久化存儲;
- storage中存儲日誌項目的不明
- 應用負責串聯這些日誌存儲模塊。
推薦閱讀:
※一致性相關知識的索引
※raft 經典場景分析
※事件和時間:Time, Clocks, and the Ordering of Events in a Distributed System 讀後感