etcd-raft的線性一致讀方法一:ReadIndex

說明

在分散式系統中,存在多種一致性模型,諸如嚴格一致性、線性一致性、順序一致性等。不同的一致性模型給應用提供的數據保證也不同,其代價也不一樣,一致性越強,代價越高。但是一致性越強,對應用的使用也就越友好。關於一致性模型的更多描述可參考:aphyr.com/posts/313-str

問題背景

在分散式系統中,etcd-raft的實現模式是Leader + Followers,即存在一個Leader和多個Follower,所有的更新請求都經由Leader處理,Leader再通過日誌同步的方式複製到Follower節點。

讀請求的處理則沒有這種限制:所有的節點(Leader與Followers)都可以處理用戶的讀請求,但是由於以下幾種原因,導致從不同的節點讀數據可能會出現不一致:

  • Leader和Follower之間存在狀態差:這是因為更新總是從Leader複製到Follower,因此,Follower的狀態總的落後於Leader,不僅於此,Follower之間的狀態也可能存在差異。因此,如果不作特殊處理,從集群不同的節點上讀數據,讀出的結果可能不相同;
  • 假如限制總是從某個特點節點讀數據,一般是Leader,但是如果舊的Leader和集群其他節點出現了網路分區,其他節點選出了新的Leader,但是舊Leader並沒有感知到新的Leader,於是出現了所謂的腦裂現象,舊的Leader依然認為自己是主,但是它上面的數據已經是過時的了,如果客戶端的請求分別被舊的和新的Leader分別處理,其得到的結果也會不一致。

如果不對讀流程作任何的特殊處理,上述限制就會導致一個非一致性的讀。而線性一致性的兩個要求:一致性讀讀最新數據更是無從談起。

實現

原理

etcd-raft通過一種稱為ReadIndex的機制來實現線性一致讀,其基本原理也很簡單:Leader節點在處理讀請求時,首先需要與集群多數節點確認自己依然是Leader,然後讀取已經被應用到應用狀態機的最新數據。

基本原理包含了兩方面內容:

  • Leader首先通過某種機制確認自己依然是Leader;
  • Leader需要給客戶端返回最近已應用的數據:即最新被應用到狀態機的數據。

數據結構

ReadState

type ReadState struct {n Index uint64n RequestCtx []byten}n

ReadState負責記錄每個客戶端的讀請求的狀態,其中包含:

  • RequestCtx:客戶端讀請求的唯一標識,一般使用request id
  • Index:表示讀請求產生時當前節點的Commit點

ReadState最終會被返回給應用(通過Ready),由應用負責處理客戶端的讀請求,而且,應用需要根據該請求發起時的節點Commit信息決定返回何時的數據。

readIndexStatus

type readIndexStatus struct {n req pb.Messagen index uint64 n acks map[uint64]struct{}n}n

readIndexStatus用來追蹤Leader向Followers發送的心跳信息的響應,其中:

  • req:表示原始的ReadIndex請求,這是應用在處理客戶端讀請求時向底層raft協議核心處理層發起的ReadIndex請求;
  • index:表示Leader當前的Commit信息;
  • acks:記錄Followers的響應,某個Follower如果確認了Leader的心跳消息,acks便會記錄一個

readOnly

type readOnly struct {n option ReadOnlyOptionn pendingReadIndex map[string]*readIndexStatusn readIndexQueue []stringn}n

readOnly管理全局的讀ReadIndex請求,其中:

  • option:暫時不確定含義;
  • pendingReadIndex:保存所有的待處理的ReadIndex請求,實現上是一個map,其中key為請求的唯一標識,一般為節點為請求生成的唯一request id;
  • readIndexQueue:所有的ReadIndex的數組

關鍵流程

客戶端讀請求處理

由於etcd-raft自帶的示例應用並沒有實現線性一致性讀,因此,選擇etcd-server作為例子來說明如何通過read index方法來實現線性一致讀。

etcd-server在啟動時會創建一個後台協程,運行的方法是:linearizableReadLoop,如下:

func (s *EtcdServer) Start() { n s.start()n s.goAttach(func() { s.publish(s.Cfg.ReqTimeout()) }) n s.goAttach(s.purgeFile)n s.goAttach(func() { monitorFileDescriptor(s.stopping) }) n s.goAttach(s.monitorVersions)n s.goAttach(s.linearizableReadLoop)n s.goAttach(s.monitorKVHash)n}n

這個協程幹了什麼呢?其實很簡單,等著有讀請求的信號,並且在有信號來的時候調用底層的raft核心協議處理層來獲取信號發生時刻的commit index,如下:

func (s *EtcdServer) linearizableReadLoop() {n var rs raft.ReadStaten for {n ctx := make([]byte, 8)n binary.BigEndian.PutUint64(ctx, s.reqIDGen.Next())nn // 主要是等待readwaitc上的信號 n select {n case <-s.readwaitc:n case <-s.stopping:n return n }nn // 有信號到來 n nextnr := newNotifier()n s.readMu.Lock()n nr := s.readNotifiern s.readNotifier = nextnrn s.readMu.Unlock()nn // 準備讀取當前節點的commit index n cctx, cancel := context.WithTimeout(context.Background(), s.Cfg.ReqTimeout())n if err := s.r.ReadIndex(cctx, ctx); err != nil {n ......n }n cancel()n // 等待底層返回當次讀請求時的commit index n // 增加了超時控制 n for !timeout && !done {n ......n }nn // 判讀應用當前已經應用至狀態機的index是否小於已commit的index n // 如果是,等待狀態機提交至commit index n if ai := s.getAppliedIndex(); ai < rs.Index {n select {n case <-s.applyWait.Wait(rs.Index):n case <-s.stopping:n return n }n }n // 至此,commit index已經被應用至狀態機 n // 可以通知讀請求從狀態機中返回數據了 n nr.notify(nil)n }n}n

這裡的linearizableReadLoop相當於是一把鎖,控制讀請求何時可以從狀態機中讀取最新數據。

我們上面說到linearizableReadLoop是在收到信號的時候進行read index控制,那該信號又是由誰觸發呢?

信號的直接來源是linearizableReadNotify:

func (s *EtcdServer) linearizableReadNotify(ctx context.Context) error {n s.readMu.RLock()n nc := s.readNotifiern s.readMu.RUnlock()nn // 這裡發送信號 n select {n case s.readwaitc <- struct{}{}:n default:n }nn // 等待read state結束 n select {n case <-nc.c:n return nc.errn case <-ctx.Done():n return ctx.Err()n case <-s.done:n return ErrStoppedn }n}n

linearizableReadNotify的實現非常簡單,這裡就不再贅述,linearizableReadNotify會在讀請求中被調用,如下:

func (s *EtcdServer) Range(ctx context.Context, r *pb.RangeRequest) (*pb.RangeResponse, error) {n if !r.Serializable {n err := s.linearizableReadNotify(ctx)n if err != nil {n return nil, errn }n }n ......n}n

上面我們就以etcd-server為例分析了客戶端的讀請求是如何在服務端被處理。由於增加了read index過程而導致了比正常的請求處理增加了複雜性。接下來,我們要看看raft的核心協議處理層如何處理這種ReadIndex請求。

核心協議層處理ReadIndex請求

也即:客戶端的讀請求也需要被參與到協議的核心處理層。為此,核心處理層提供了一個介面專門處理讀,ReadIndex:

func (n *node) ReadIndex(ctx context.Context, rctx []byte) error {n return n.step(ctx, pb.Message{Type: pb.MsgReadIndex, Entries: []pb.Entry{{Data: rctx}}})n}nnfunc (n *node) step(ctx context.Context, m pb.Message) error {n ch := n.recvcn if m.Type == pb.MsgProp {n ch = n.propcn }n select {n case ch <- m:n return nil n case <-ctx.Done():n return ctx.Err()n case <-n.done:n return ErrStoppedn }n}n

ReadIndex的消息最終被發往了recvc這個channel,後台協程會接管這個消息並調用核心的協議處理函數:

func (n *node) run(r *raft) {n for {n select {n case m := <-n.recvc:n _, ok := r.prs[m.From]n if ok || !IsResponseMsg(m.Type) {n r.Step(m)n }n case ...:n }n }n}n

如果節點是Leader,那麼該消息最終的處理函數是stepLeader

func stepLeader(r *raft, m pb.Message) {n switch m.Type {n case pb.MsgReadIndex:n if r.quorum() > 1 {n if r.raftLog.zeroTermOnErrCompacted(r.raftLog.term(r.raftLog.committed)) != r.Term {n returnn }n switch r.readOnly.option {n // 關注的是該分支 n case ReadOnlySafe:n r.readOnly.addRequest(r.raftLog.committed, m)n r.bcastHeartbeatWithCtx(m.Entries[0].Data)n case ReadOnlyLeaseBased:n ......n }n }n}n

本文討論的ReadIndex方案對應的是ReadOnlySafe這個分支,其中addRequest()會把這個讀請求到達時的leader的commit index保存起來,並且維護一些狀態信息,而bcastHeartbeatWithCtx()則向其他Followers節點發送心跳消息MsgHeartbeat,而當leader收到心跳響應消息MsgHeartbeatResp時處理為:

func stepLeader(r *raft, m pb.Message) {n ......n case pb.MsgHeartbeatResp:n ackCount := r.readOnly.recvAck(m)n if ackCount < r.quorum() {n returnn }n rss := r.readOnly.advance(m)n for _, rs := range rss {n req := rs.req n if req.From == None || req.From == r.id { n // from local membern r.readStates = append(r.readStates, ReadState{Index: rs.index, RequestCtx: req.Entries[0].Data})n } else {n // 如果是follower節點發來的ReadIndex請求n // 給它返迴響應n r.send(pb.Message{To: req.From, Type: pb.MsgReadIndexResp, Index: rs.index, Entries: req.Entries})n }n }n }n}n

如果接收到了多數派的心跳響應,則會從剛才保存的信息中將對應讀請求當時的commit index和請求id拿出來,填充到ReadState中,該結構最終會被返回給調用ReadIndex的應用。

需要說明的是:r.readOnly.advance(m)會返回m以前的所有的request id(m其實也就是應用調用ReadIndex時的請求id,而且請求id是單調遞增的,因此當在某個時刻確認了請求id為m的主信息,那麼m之前的主信息也認為是此),因此,上面代碼的rss其實是一個slice。

以上我們其實是假設該讀請求是發生在leader上面的,leader通過心跳確認自己是主,然後再保證讀時刻的commit index被應用至狀態機後返回讀結果給客戶端,這就保證了客戶端得到的結果一定是最新的。

那假如讀請求是發生在follower上呢?對於etcd-server應用來說,它照樣是遵循上面的請求處理流程,應用層會調用協議的核心處理層的ReadIndex方法,但是核心協議層識別當前節點是follower,那請求處理流程是:

func stepFollower(r *raft, m pb.Message) {n switch m.Type {n ......n case pb.MsgReadIndex:n if r.lead == None {n returnn }n m.To = r.lead n r.send(m)n case pb.MsgReadIndexResp:n r.readStates = append(r.readStates, ReadState{Index: m.Index, RequestCtx: m.Entries[0].Data})n ......n} n

可以發現,如果是在Follower節點上執行ReadIndex,那麼它必須先要向Leader去查詢commit index,然後收到響應後在創建ReadState記錄commit信息,後續的處理和Leader別無二致。

總結

經過上面一通複雜處理,就達到了效果:無論從主還是從上去讀,保證讀到的數據都一致(都是在主上被commit後的狀態)

參考

  • zhuanlan.zhihu.com/p/27

推薦閱讀:

自習周報:CoreOS 的黑魔法

TAG:分布式存储 | etcd | 分布式一致性 |