Raft 筆記

由於對分散式系統感興趣,在學習 mit 的 6.824 分散式系統課程。實現了 6.824 的 Raft,分享一下我的實現以及思(cai)考(keng)。

本文主要從 Raft 介紹和實現兩個角度組織

Raft 簡介

Raft 是一種分散式一致性 (consesus) 協議,類似的協議還有 Paxos,可以把 Raft 協議看作是 Paxos 協議的變種。Raft 主打的是易理解,其易理解主要在兩方面:

  1. 強化 leader 的作用,日誌只能從 leader 流向 follower
  2. 日誌是連續的,中間沒有空洞

Raft 跟其他所有 quorum-based consesus 協議一樣,一般部署奇數個節點,只要 n/2+1 個節點能夠工作,整個集群都能正常工作 (背後的原理都是 n/2+1 節點做了一件事,n/2+1 幾點做了另一件事,則必定有一個節點同時做了兩件事,進而推導協議的準確性)。

一個 raft 節點可能處於三種狀態:leader,candidate,follower。各個狀態的轉換如下:

raft 使用 term 作為 logical clock,其主要有兩個作用,一是識別過期信息,二是通過限制在每個 term 下只能投票一次,進而保證在每個 term 下只有一個 leader。理解 term 的意義對實現 raft 至關重要。

Raft 可以分為四個部分,分別是

  • leader election
  • log replication
  • log compaction
  • membership change

Leader Election

leader 選取的目的就是選舉出一個 leader,leader 用來接受來自 client 的讀寫請求,並將日誌同步到 follower。

開始時 raft node 處於 follower 狀態,如果 follower 在 election timeout 內沒有收到 leader 的 heartbeat,則這個 follower 認為 leader 掛了(注意 leader 可能並沒有掛,分散式系統下一個節點掛不掛只能由 majority 決定),將 term 加 1 變為 candidate 狀態並廣播 RequestVote 消息請求其他節點投票給自己。一個節點收到 RequestVote 消息會檢測發起請求的 candidate 的 term 是否 >= 自己的 term,candidate 的日誌是否比自己完善,如果都滿足,則會表示贊同,否則拒絕。如果 candidate 收到 majority 的 vote,則表明這個 candidate 有著所有已經提交的日誌,滿足成為 leader 的條件。成為 leader 後,這個 leader 會不斷發送 heartbeat 維持其 leader 的地位。

Log Replication

leader 會接受來自 client 的請求,寫入自己的 log 後會廣播 AppendEntries 給集群中的所有節點,AppendEntries 會包含需要同步給 follower 的日誌項。上面提到的 heartbeat 其實就是日誌項為空的 AppendEntries。當日誌被 majority 節點接收後,則 leader 認為這些日誌是已經 committed,可以應用給上層狀態機。AppendEntries 會帶上 leader 最新 committed 日誌的 index,這樣 follower 收到 AppendEntries 的同時能夠知道哪些日誌是已經 committed 可以應用給上層狀態機。可以發現所有的決定都是 leader 做的,follower 只是被動接受信息,相比 Paxos,這樣做更容易理解協議。

Follower 必定不可能無腦的接受來自 leader 的 AppendEntries,其可能落後 Leader 一些日誌(比如之前在另一個 partition 後來恢復了)或者多餘 leader 一些日誌(比如是上一個 term 的 leader,一些日誌未複製到其他節點這個節點就掛了)。對於這種場景,raft 要求 follower 完整複製 leader 的日誌,而不管其有什麼日誌。為了實現這一點,Leader 會對每個 follower 維護一個 nextIndex,用來標誌接下來要複製日誌的開始。Leader 每次發送 AppendEntries 時會發送 nextIndex 後的一些日誌,如果 Follower 檢測到這些 Leader 發來的日誌與自己的日誌有重複,則回復 false。Leader 收到 false 會減小 nextIndex 重試,直到找到分歧開始點,並複製這個點之後的日誌。

Raft 實現

我的設計參考了 etcd,整個 raft 狀態機就只有一個 goroutine,發送/接受 rpc 是另外的 goroutine,但他們會把對應的信息丟入 channel,含 raft 狀態機的 goroutine 會先從這些 channel 中拿信息,然後才丟入 raft 狀態機。這樣整個 raft 狀態機不需要考慮線程同步的問題

主要有四個 channel,proposeCh 用於接受客戶端的請求,recvCh 用於接受 rpc 以及 rpc 的響應,electionTimeoutCh 和 heartbeatTimeoutCh 分別是選取,心跳超時,需要進入下一輪選舉和發送心跳

整個大的 loop 是這樣的

func (rf *Raft) run(applyCh chan ApplyMsg) { for { select { case msg := <-rf.proposeCh: rf.step(msg) case msg := <-rf.recvCh: rf.step(msg) case <-rf.electionTimeoutCh: rf.step(Msg{msgType: msgElection}) case <-rf.heartbeatTimeoutCh: rf.step(Msg{msgType: msgHeartbeat}) case <-rf.done: return } }}

lab 的 rpc 調用都是同步的,我把它封裝成了非同步,用了一個很簡單的 reply channel 的 trick

// RequestVote RPC Handlerfunc (rf *Raft) RequestVote(args *RequestVoteArgs, reply *RequestVoteReply) { replyCh := make(chan interface{}) msg := Msg{ msgType: msgVote, term: args.Term, from: args.CandidateId, requestVoteArgs: *args, replyCh: replyCh, } select { case rf.recvCh <- msg: case <-time.After(time.Second * 10): } select { case resp := <-replyCh: *reply = resp.(RequestVoteReply) case <-time.After(time.Second * 10): }}func (rf *Raft) asyncSendRequestVote(id int) { args := RequestVoteArgs{rf.currentTerm, rf.me, rf.log.LastIndex(), rf.log.LastTerm()} go func(server *labrpc.ClientEnd, args RequestVoteArgs) { reply := RequestVoteReply{} ok := server.Call("Raft.RequestVote", &args, &reply) if ok { msg := Msg{ msgType: msgVoteResp, term: reply.Term, from: id, requestVoteArgs: args, requestVoteReply: reply, } select { case rf.recvCh <- msg: case <-time.After(time.Second * 10): } } }(rf.peers[id], args)}

理解 raft 我覺得最為重要的是需要理解 term 的意義,term 作為 logical time,告訴了當前節點哪些 msg 過期(term < currentTerm, 過期信息直接忽視),節點自身的信息是否過期 (term > currentTerm,reset term 並成為 follower)。還有一個要注意的是由於 rpc 調用都是非同步的,可能會收到上一個 term 時發送的請求的響應,這個請求應該有有用信息,但我這裡為了更好的理解進入狀態機消息的各種情況,簡化設計是直接丟棄了。

這樣一來 term check 和每個狀態下會收到消息的可能情況如下

func (rf *Raft) step(msg Msg) { // term check if msg.term == 0 { // term == 0 是 msgElection / msgHeartbeat / msgPropose } else if msg.term > rf.currentTerm { rf.resetTerm(msg.term, -1) rf.becomeFollower(-1) } else if msg.term < rf.currentTerm { return } // 只處理 Request/Response term 相同的 msg var term int if msg.msgType == msgVoteResp { term = msg.requestVoteArgs.Term } else if msg.msgType == msgAppendResp { term = msg.appendEntriesArgs.Term } if term == 0 { } else if term > rf.currentTerm { panic("term should increase") } else if term < rf.currentTerm { return } // 只有同一個 term 的 msg 才會走下面邏輯 switch rf.state { case Leader: rf.stepLeader(msg) case Candidate: rf.stepCandidate(msg) case Follower: rf.stepFollower(msg) }}func (rf *Raft) stepLeader(msg Msg) { switch msg.msgType { case msgElection: panic("leader should not get msgElection") case msgHeartbeat: rf.broadcastAppendEntries(true) rf.resetHeartbeatTimeout() case msgVote: // ignore, 一個之前被 partition 的節點選舉超時成為 candidate request vote case msgVoteResp: // ignore, 這個節點還是 candidate 時 request vote 的 response 延遲,收到當前 term 的 msgVoteResp 是正常情況 case msgAppend: panic("leader should not get current term msgAppend") case msgAppendResp: rf.handleAppendEntriesResp(msg) case msgPropose: rf.handlePropose(msg) }}func (rf *Raft) stepCandidate(msg Msg) { switch msg.msgType { case msgElection: rf.becomeCandidate() rf.broadcastRequestVote() case msgHeartbeat: panic("candidate should not get msgHeartbeat") case msgAppendTimeout: panic("candidate should not get msgAppendTimeout") case msgVote: // ignore, 多個 follower 同時成為 candidate, handleRequestVote 必定會返回 false // rf.handleRequestVote(msg) case msgVoteResp: rf.handleRequestVoteResp(msg) case msgAppend: // 當前 term 下有 candidate 競選成功 rf.becomeFollower(msg.from) rf.handleAppendEntries(msg) case msgAppendResp: panic("candidate should not get current term msgAppendResp") case msgPropose: rf.handlePropose(msg) }}func (rf *Raft) stepFollower(msg Msg) { switch msg.msgType { case msgElection: rf.becomeCandidate() rf.broadcastRequestVote() case msgHeartbeat: panic("follower should not get msgHeartbeat") case msgVote: rf.handleRequestVote(msg) case msgVoteResp: // ignore, candidate 發現已經有 leader 競選成功,降級為 follower, 之前競選時發送的 RequestVote response 延時 case msgAppend: rf.handleAppendEntries(msg) case msgAppendResp: panic("follower should not get current term msgAppendResp") case msgPropose: rf.handlePropose(msg) }}

handleRequestVote, handleAppendEntries, handleRequestVoteResp, handleAppendEntriesResp 這幾個 rpc 客戶端和服務端的處理只要理解並嚴格遵照 Figure2 就行了。

實現中踩的坑:

  1. Heartbeat 的 AppendEntries RPC 也需要做 prevLogIndex 的檢測,index of last new entry 對 heartbeat 來講是 prevLogIndex,對 appendEntry 來講則是發送的最後一條日誌的 index (本身 prevLogIndex 和發送日誌在索引上就是連續的)。而且對 Heartbeat 也必須做 consistency check 的檢測,follower reply true 則相當於告訴 leader 一些日誌已經同步到這個 follower (事實上沒有),leader 可能會 commit 一些未被 majority commit 的日誌,raft 的 safety 會有問題。
  2. 最早實現的一版帶日誌項的 appendEntries 的觸發是有 client 請求,但在如果之前一個 node 被 partition,後來網路恢復,如果遲遲沒有 client 請求的話,這個 node 就會一直沒有最新的已經 commit 日誌,會有 test case 過不去。於是另外實現了一個 appendTimeoutCh,類似 heartbeat,但是定期發送帶日誌的 appendEntries RPC,但感覺這裡複雜了,在發 heartbeat 前 random 一下決定是否帶 entries 就行了。

另外還有幾個有困惑的點:

  1. Figure2 中 AppendEntries receiver 端第五點 If leaderCommit > commitIndex, set commitIndex = min(leaderCommit, index of last new entry), 但是可能會有 leaderCommit > commitIndex > index of last new entry 的情況,這樣不是會導致 commitIndex 的回退么?我現在的實現是 leaderCommit > rf.commitIndex && newEntryLastIndex > rf.commitIndex,這樣才能跑過所有 test,不知道是我實現的有問題還是哪裡理解錯了。
  2. 最後實現的 raft 大多數情況下都有跑過所有的測試,但是偶爾 liveness 會有點問題。debug 後復現了下場景:有 5 個 server,因為一些原因,現在 node1, node2,node3 在partition A,node4,node5 在partition B,node4,node5 的 raft 的日誌是最新的,node3 的 raft 日誌是最新的,這種情況下感覺 node3 選舉成為 leader 的可能性不太高額(只有 node3 有最新日誌因此只有 node3 才可能成為 leader)。原因是 node1 (node2 同理) 每次 election timeout 成為 candidate,因為 candidate 是投票給自己的,這樣 node3 無法獲得足夠的投票不能選舉成為 leader,而且只能等待 node1 超時重新開啟投票,但 node1 重新開啟投票時又會投票給自己,這樣會造成一段時間內無法選舉出 leader。

推薦閱讀:

bifrost : Rust 下的分散式系統框架
阿里PolarDB中的ParallelRaft中的亂序應答、亂序提交如何保證Raft協議的安全性?
Scaling Memcache in Facebook 筆記(二)
關於中小行核心系統分散式應用架構
分散式系統中實現遞增序列該怎麼做呢?

TAG:Raft | 分布式系统 |