go-raft實現

說明

nn

goraft是Raft協議的Golang版本的實現,項目地址為:goraft/raft。整個代碼質量較高,值得仔細品味。因此,整理了該博文探究下其內部實現。

nn

數據結構

nn

goraft主要抽象了server、peer和log三個結構,分別代表服務節點、Follower節點和日誌。

nn

server

nn

Raft作為一種多節點狀態一致性維護協議,運行過程中必然涉及到多個物理節點,server就是用來抽象其中的每個節點,維護節點的狀態信息。其結構如下:

nnnn

type server struct {n *eventDispatchernn name stringn path stringn state stringn transporter Transportern context interface{}n currentTerm uint64nn votedFor stringn log *Logn leader stringn peers map[string]*Peern mutex sync.RWMutexn syncedPeer map[string]boolnn stopped chan booln c chan *evn electionTimeout time.Durationn heartbeatInterval time.Durationnn snapshot *Snapshotnn // PendingSnapshot is an unfinished snapshot.n // After the pendingSnapshot is saved to disk,n // it will be set to snapshot and also will ben // set to nil.n pendingSnapshot *Snapshotnn stateMachine StateMachinen maxLogEntriesPerRequest uint64n connectionString stringn routineGroup sync.WaitGroupn}n

nn

  • state:每個節點總是處於以下狀態的一種:follower、candidate、leader
  • currentTerm:Raft協議關鍵概念,每個term內都會產生一個新的leader
  • peers:raft中每個節點需要了解其他節點信息,尤其是leader節點
  • syncedPeer:對於leader來說,該成員記錄了日誌已經被sync到了哪些follower
  • c:當前節點的命令通道,所有的命令都通過該channel來傳遞
  • pendingSnapshot:暫時未知

peer

npeer描述的是集群中其他節點的信息,結構如下:

nnnn

// A peer is a reference to another server involved in the consensus protocol.ntype Peer struct {n server *servern Name string `json:"name"`n ConnectionString string `json:"connectionString"`n prevLogIndex uint64n stopChan chan booln heartbeatInterval time.Durationn lastActivity time.Timen sync.RWMutexn}n

nn

  • server:peer中的某些方法會依賴server的狀態,如peer內的appendEntries方法需要獲取server的currentTerm
  • Name:peer的名稱
  • ConnectionString:peer的ip地址,形式為」ip:port」
  • prevLogIndex:這個很關鍵,記錄了該peer的當前日誌index,接下來leader將該index之後的日誌繼續發往該peer
  • lastActivity:記錄peer的上次活躍時間

log

nn

log是Raft協議的核心,Raft使用日誌來存儲客戶發起的命令,並通過日誌內容的同步來維護多節點上狀態的一致性。

nnnn

// A log is a collection of log entries that are persisted to durable storage.ntype Log struct {n ApplyFunc func(*LogEntry, Command) (interface{}, error)n file *os.Filen path stringn entries []*LogEntryn commitIndex uint64n mutex sync.RWMutexn startIndex uint64 n startTerm uint64n initialized booln}n

nn

  • ApplyFunc:日誌被應用至狀態機的方法,這個應該由使用raft的客戶決定
  • file:日誌文件句柄
  • path:日誌文件路徑
  • entries:內存日誌項緩存
  • commitIndex:日誌提交點,小於該提交點的日誌均已經被應用至狀態機
  • startIndex/startTerm:日誌中起始日誌項的index和term

log entry

nn

log entry是客戶發起的command存儲在日誌文件中的內容

nn

type LogEntry struct {n Index *uint64 `protobuf:"varint,1,req" json:"Index,omitempty"`n Term *uint64 `protobuf:"varint,2,req" json:"Term,omitempty"`n CommandName *string `protobuf:"bytes,3,req" json:"CommandName,omitempty"`n Command []byte `protobuf:"bytes,4,opt" json:"Command,omitempty"`n XXX_unrecognized []byte `json:"-"`n}nn// A log entry stores a single item in the log.ntype LogEntry struct {n pb *protobuf.LogEntryn Position int64 // position in the log filen log *Logn event *evn}n

nn

  • LogEntry是日誌項在內存中的描述結構,其最終存儲在日誌文件是經過protocol buffer編碼以後的信息
  • Position代表日誌項存儲在日誌文件內的偏移
  • 編碼後的日誌項包含Index、Term,原始Command的名稱以及Command具體內容

關鍵流程

nn

客戶端請求

nn

客戶端使用go-raft的時候,先初始化環境,這裡不仔細描述,接下來看客戶如何發起一個請求:

nnnn

command := &raft.DefaultJoinCommand{}nif _, err := s.raftServer.Do(command); err != nil {n http.Error(w, err.Error(), http.StatusInternalServerError)n returnn}n

nn

客戶命令執行的入口是Do:

nnnn

func (s *server) Do(command Command) (interface{}, error) {n return s.send(command)n}nn// Sends an event to the event loop to be processed. The function will wait until the event is actually processed before returning.nfunc (s *server) send(value interface{}) (interface{}, error) {n if !s.Running() {n return nil, StopErrorn }nn event := &ev{target: value, c: make(chan error, 1)}n select {n case s.c <- event:n case <-s.stopped:n return nil, StopErrorn }n select {n case <-s.stopped:n return nil, StopErrorn case err := <-event.c:n return event.returnValue, errn }n}n

nn

send的處理流程很簡單,首先將命令寫入到server的命令channel,然後等待命令處理完成。

nn

而server作為leader啟動完成時會進入一個leaderLoop來處理所有用戶的命令:

nnnn

func (s *server) leaderLoop() {n logIndex, _ := s.log.lastInfo()n ......n // Begin to collect response from followersn for s.State() == Leader {n select {n case <-s.stopped:n ......n case e := <-s.c:n switch req := e.target.(type) {n // 代表客戶端命令n case Command:n s.processCommand(req, e)n continuen ......n }n }n }n} n

nn

processCommand處理如下:

nnnn

// Processes a command.nfunc (s *server) processCommand(command Command, e *ev) {n s.debugln("server.command.process")nn // Create an entry for the command in the log.n entry, err := s.log.createEntry(s.currentTerm, command, e)nn if err != nil {n s.debugln("server.command.log.entry.error:", err)n e.c <- errn returnn }nn if err := s.log.appendEntry(entry); err != nil {n s.debugln("server.command.log.error:", err)n e.c <- errn returnn }nn s.syncedPeer[s.Name()] = truen if len(s.peers) == 0 {n commitIndex := s.log.currentIndex()n s.log.setCommitIndex(commitIndex)n s.debugln("commit index ", commitIndex)n }n}n

nn

這裡的邏輯比較簡單,創建日誌項並將日誌項append至日誌文件,如果過程中由任何錯誤,就將這個錯誤寫入e.c:e.c <- err,這樣等待在該channel的客戶端就會收到通知,立即返回。

nn

如果沒有錯誤,這時候客戶端還是處於等待狀態的,這是因為雖然該Command被leader節點成功處理了,但是該Command的日誌還沒有被同步至大多數Follow節點,因此該Command也就無法被提交,所以發起該Command的客戶端依然等在那,Command被提交,這在後面的日誌同步過程中會有所體現。

nn

日誌同步

nn

go-raft的leader向Follower同步日誌是在heartbeat中完成的:

nnnn

// Listens to the heartbeat timeout and flushes an AppendEntries RPC.nfunc (p *Peer) heartbeat(c chan bool) {n stopChan := p.stopChann c <- truen ticker := time.Tick(p.heartbeatInterval)nn for {n select {n case flush := <-stopChan:n if flush {n // before we can safely remove a noden // we must flush the remove command to the node firstn p.flush()n returnn } else {n returnn }nn case <-ticker:n start := time.Now()n p.flush()n duration := time.Now().Sub(start)n p.server.DispatchEvent(newEvent(HeartbeatEventType, duration, nil))n }n }n}nnfunc (p *Peer) flush() {n debugln("peer.heartbeat.flush: ", p.Name)n prevLogIndex := p.getPrevLogIndex()n term := p.server.currentTermnn entries, prevLogTerm := p.server.log.getEntriesAfter(prevLogIndex, p.server.maxLogEntriesPerRequest)nn if entries != nil {n p.sendAppendEntriesRequest(newAppendEntriesRequest(term, prevLogIndex, prevLogTerm, p.server.log.CommitIndex(), p.server.name, entries))n } else {n p.sendSnapshotRequest(newSnapshotRequest(p.server.name, p.server.snapshot))n }n}n

nn

核心的邏輯是將leader上的日誌通過構造一個AppendEntriesRequest發送給從節點,當然只同步那些Follower上還沒有的日誌,即prevLogIndex以後的log entry。

nnnn

// Sends an AppendEntries request to the peer through the transport.nfunc (p *Peer) sendAppendEntriesRequest(req *AppendEntriesRequest) {nn resp := p.server.Transporter().SendAppendEntriesRequest(p.server, p, req)n if resp == nil {n p.server.DispatchEvent(newEvent(HeartbeatIntervalEventType, p, nil))n returnn }n p.setLastActivity(time.Now())n // If successful then update the previous log index.n p.Lock()n if resp.Success() {n ......n }n ......n resp.peer = p.Namen // Send response to server for processing.n p.server.sendAsync(resp)n}n

nn

這裡會將Follower的心跳的響應繼續發送給server。server會在leaderLoop中處理該類消息:

nnnn

func (s *server) leaderLoop() {n logIndex, _ := s.log.lastInfo()n ......n // Begin to collect response from followersn for s.State() == Leader {n select {n case e := <-s.c:n switch req := e.target.(type) {n case Command:n s.processCommand(req, e)n continuen case *AppendEntriesRequest:n e.returnValue, _ = s.processAppendEntriesRequest(req)n case *AppendEntriesResponse:n s.processAppendEntriesResponse(req)n case *RequestVoteRequest:n e.returnValue, _ = s.processRequestVoteRequest(req)n }nn // Callback to event.n e.c <- errn }n }n s.syncedPeer = niln}n

nn

處理Follower的響應在函數processAppendEntriesResponse中:

nnnn

func (s *server) processAppendEntriesResponse(resp *AppendEntriesResponse) {n // If we find a higher term then change to a follower and exit.n if resp.Term() > s.Term() {n s.updateCurrentTerm(resp.Term(), "")n returnn }nn // panic response if its not successful.n if !resp.Success() {n returnn }nn // if one peer successfully append a log from the leader term,n // we add it to the synced listn if resp.append == true {n s.syncedPeer[resp.peer] = truen }nn if len(s.syncedPeer) < s.QuorumSize() {n returnn }n // Determine the committed index that a majority has.n var indices []uint64n indices = append(indices, s.log.currentIndex())n for _, peer := range s.peers {n indices = append(indices, peer.getPrevLogIndex())n }n sort.Sort(sort.Reverse(uint64Slice(indices)))nn commitIndex := indices[s.QuorumSize()-1]n committedIndex := s.log.commitIndexnn if commitIndex > committedIndex {n s.log.sync()n s.log.setCommitIndex(commitIndex)n }n}n

nn

這裡會判斷如果多數的Follower都已經同步日誌了,那麼就可以檢查所有的Follower此時的日誌點,並根據log index排序,leader會算出這些Follower的提交點,然後提交,調用setCommitIndex。

nnnn

// Updates the commit index and writes entries after that index to the stable storage.nfunc (l *Log) setCommitIndex(index uint64) error {n l.mutex.Lock()n defer l.mutex.Unlock()nn // this is not error any more after limited the number of sending entriesn // commit up to what we already haven if index > l.startIndex+uint64(len(l.entries)) {n index = l.startIndex + uint64(len(l.entries))n }n if index < l.commitIndex {n return niln }nn for i := l.commitIndex + 1; i <= index; i++ {n entryIndex := i - 1 - l.startIndexn entry := l.entries[entryIndex]nn l.commitIndex = entry.Index()nn // Decode the command.n command, err := newCommand(entry.CommandName(), entry.Command())n if err != nil {n return errn }n returnValue, err := l.ApplyFunc(entry, command)n if entry.event != nil {n entry.event.returnValue = returnValuen entry.event.c <- errn }n _, isJoinCommand := command.(JoinCommand)n if isJoinCommand {n return niln }n }n return niln}n

nn

這裡的提交主要是設置好commitIndex,並且將日誌項中的Command應用到狀態機。最後,判斷這個LogEntry是不是由客戶直接發起的,如果是,那麼還需要將狀態機的處理結果通過event.c返回給客戶端,這樣,客戶端就可以返回了,請回顧上面的客戶端請求。

nn

選主

nn

在Raft協議運行過程中,Leader節點會周期性的給Follower發送心跳,心跳的作用有二:一方面,Follower通過心跳確認Leader此時還是活著的;第二,Leader通過心跳將自身的日誌同步發送給Follower。

nn

但是,如果Follower在超過一定時間後沒有收到Leader的心跳信息,就認定Leader可能離線,於是,該Follower就會變成Candidate,發起一次選主,通知其他節點開始為我投票。

nnnn

func (s *server) followerLoop() {n since := time.Now()n electionTimeout := s.ElectionTimeout()n timeoutChan := afterBetween(s.ElectionTimeout(), s.ElectionTimeout()*2)nn for s.State() == Follower {n var err errorn update := falsen select {n ......n // 超過一定時間未收到請求n case <-timeoutChan:n if s.promotable() {n // 狀態變為Candidaten s.setState(Candidate)n } else {n update = truen }n }n }n ......n}nn// The main event loop for the servernfunc (s *server) loop() {n defer s.debugln("server.loop.end")nn state := s.State()nn for state != Stopped {n switch state {n case Follower:n s.followerLoop()n // 狀態變為Candidate後,進入candidateLoopn case Candidate:n s.candidateLoop()n case Leader:n s.leaderLoop()n case Snapshotting:n s.snapshotLoop()n }n state = s.State()n }n}n

nn

當節點狀態由Follower變為Candidate後,就會進入candidateLoop來觸發一次選主過程。

nnnn

func (s *server) candidateLoop() {n for s.State() == Candidate {n if doVote {n s.currentTerm++n s.votedFor = s.namen // 向所有其他節點發起Vote請求n respChan = make(chan *RequestVoteResponse, len(s.peers))n for _, peer := range s.peers {n s.routineGroup.Add(1)n go func(peer *Peer) {n defer s.routineGroup.Done()n n peer.sendVoteRequest(newRequestVoteRequest(s.currentTerm, s.name, lastLogIndex, lastLogTerm), respChan)n }(peer)n }n // 自己給自己投一票n votesGranted = 1n timeoutChan = afterBetween(s.ElectionTimeout(), s.ElectionTimeout()*2)n doVote = falsen }n // 如果多數節點同意我作為Leader,設置新狀態n if votesGranted == s.QuorumSize() {n s.setState(Leader)n returnn }nn // 等待其他節點的選主請求的響應n select {n case <-s.stopped:n s.setState(Stopped)n returnnn case resp := <-respChan:n if success := s.processVoteResponse(resp); success {n votesGranted++n }n ......n case <-timeoutChan:n // 如果再一次超時了,重新發起選主請求n doVote = truen }n}n

nn

別看上面的代碼很多,但是其中邏輯非常清楚。就不作過多說明了。

nn

上面描述了一個Follower節點變為Candidate後,如何發起一次選主,接下來看看一個節點在收到其他節點發起的選主請求後的處理,在函數processRequestVoteRequest():

nn

// Processes a "request vote" request.nfunc (s *server) processRequestVoteRequest(req *RequestVoteRequest) (*RequestVoteResponse, bool) {nn if req.Term < s.Term() {n return newRequestVoteResponse(s.currentTerm, false), falsen }n if req.Term > s.Term() {n s.updateCurrentTerm(req.Term, "")n } else if s.votedFor != "" && s.votedFor != req.CandidateName {n return newRequestVoteResponse(s.currentTerm, false), falsen }nn lastIndex, lastTerm := s.log.lastInfo()n if lastIndex > req.LastLogIndex || lastTerm > req.LastLogTerm {n return newRequestVoteResponse(s.currentTerm, false), falsen }n s.votedFor = req.CandidateNamen return newRequestVoteResponse(s.currentTerm, true), truen}n

nn

接受一個遠程節點的選主請求需要滿足以下條件:

nn

  • 遠程節點的term必須要大於等於當前節點的term;
  • 遠程節點的log必須比當前節點的更新;
  • 當前節點的term和遠程節點的選主請求的term如果一樣且當前節點未給任何其他節點投出自己的選票。

整個流程其實也是蠻簡單的。

nn

節點變更

nn

在Raft協議中,節點的變更也是作為一個客戶的命令通過一致性協議統一管理:也就是說,節點變更命令被寫入Leader的日誌,然後再由Leader同步到Follower,最後如果多數Follower成功寫入該日誌,主節點提交該日誌。

nn

在Go-Raft中,存在兩種節點變更命令:DefaultJoinCommand和DefaultLeaveCommand,對於這兩種命令的處理關鍵在於這兩個命令的Apply方法,如下:

nnnn

func (c *DefaultJoinCommand) Apply(server Server) (interface{}, error) {n err := server.AddPeer(c.Name, c.ConnectionString)n return []byte("join"), errn}nnfunc (c *DefaultLeaveCommand) Apply(server Server) (interface{}, error) {n err := server.RemovePeer(c.Name)n return []byte("leave"), errn}n

nn

增加節點最終的提交方法是AddPeer:

nnnn

func (s *server) AddPeer(name string, connectiongString string) error {n if s.peers[name] != nil {n return niln }nn if s.name != name {n peer := newPeer(s, name, connectiongString, s.heartbeatInterval)n // 如果是主上新增一個peer,那還需要啟動後台協程發送n if s.State() == Leader {n peer.startHeartbeat()n }n s.peers[peer.Name] = peern s.DispatchEvent(newEvent(AddPeerEventType, name, nil))n }n // Write the configuration to file.n s.writeConf()n return niln}n

nnnn

// Removes a peer from the server.nfunc (s *server) RemovePeer(name string) error {n // Skip the Peer if it has the same name as the Servern if name != s.Name() {n // Return error if peer doesnt exist.n peer := s.peers[name]n if peer == nil {n return fmt.Errorf("raft: Peer not found: %s", name)n }n // 如果是Leader,停止給移除節點的心跳協程n if s.State() == Leader {n s.routineGroup.Add(1)n go func() {n defer s.routineGroup.Done()n peer.stopHeartbeat(true)n }()n }n delete(s.peers, name)n s.DispatchEvent(newEvent(RemovePeerEventType, name, nil))n }n // Write the configuration to file.n s.writeConf()n return niln}n

nn

Snapshot

nn

根據Raft論文描述,隨著系統運行,存儲命令的日誌文件會一直增長,為了避免這種情況,論文中引入了Snapshot。Snapshot的出發點很簡單:淘汰掉那些無用的日誌項,那麼問題就來了:

nn

  • 哪些日誌項是無用的,可以丟棄?
  • 如何丟棄無用日誌項?

接下來我們各個擊破:

nn

  • 如果某個日誌項中存儲的用戶命令(Command)已經被提交到狀態機中,那麼它就被視為無用的,可以被清理;
  • 因為日誌的提交是按照index順序執行的,因此,只要知道當前副本的提交點(commit index),那麼在此之前的所有日誌項必然也已經被提交了,因此,這個提交點之前(包括該提交點)的日誌都可以被刪除。實現上,只要將提交點之後的日誌寫入新的日誌文件,再刪除老的日誌文件,就大功告成了;
  • 最後需要注意的一點是:在回收日誌文件之前,必須要對當前的系統狀態機進行保存,否則,狀態機數據丟失以後,又刪了日誌,狀態真的就無法恢復了。

goraft的Snapshot是由應用主動觸發的,調用其內部函數TakeSnapshot:

nnnn

func (s *server) TakeSnapshot() error {n ......n lastIndex, lastTerm := s.log.commitInfo()n ......n path := s.SnapshotPath(lastIndex, lastTerm)n s.pendingSnapshot = &Snapshot{lastIndex, lastTerm, nil, nil, path}n // 首先應用保存狀態機當前狀態n state, err := s.stateMachine.Save()n if err != nil {n return errn }nn // 準備Snapshot狀態:包括當前日誌的index,當前peer等n peers := make([]*Peer, 0, len(s.peers)+1)n for _, peer := range s.peers {n peers = append(peers, peer.clone())n }n s.pendingSnapshot.Peers = peersn s.pendingSnapshot.State = staten s.saveSnapshot()nn // 最後,回收日誌項:s.log.compact()n if lastIndex-s.log.startIndex > NumberOfLogEntriesAfterSnapshot {n compactIndex := lastIndex - NumberOfLogEntriesAfterSnapshotn compactTerm := s.log.getEntry(compactIndex).Term()n s.log.compact(compactIndex, compactTerm)n }n return niln}n

nn

關於compact()函數就不作仔細描述了,有興趣的朋友可以自行閱讀,非常簡單的。

推薦閱讀:

etcd-raft日誌管理
一致性相關知識的索引

TAG:分布式系统 | 分布式一致性 | 分布式存储 |