etcd raft如何實現leadership transfer
leadership transfer可以把raft group中的leader身份轉給其中一個follower。這個功能可以用來做負載均衡,比如可以把leader放在性能更好的機器或者離客戶端更近的機器上。
對於一個大規模分散式系統來說,負載均衡非常重要。然而raft本身在選主方面必須要求新主包含所有的意境committed的log,從這點上看,在選主階段,不能加入自定義的選主邏輯。而paxos協議不太一樣,paxos對選主沒有要求,任何一個成員都可以成為主,選主協議可以自己實現。paxos leader當選後,從其他成員把commit的log拉過來即可。所以為了這個feature,raft作者提出了一個方案作為raft的擴展。
大概原理就是保證transferee(transfer的目標follower)擁有和原leader有一樣新的日誌,期間需要停寫,然後給transferee發送一個特殊的消息,讓這個follower可以馬上進行選主,而不用等到election timeout,正常情況下,這個follower的term最大,當選,原來的leader變為備。
還是一樣看看etcd實現的raft library怎麼做,省略無關代碼
首先應用通過如下函數來啟動leader transfer,其中lead是當前的leader,transferee是目標leader,在任意一個成員上調用即可。
func (n *node) TransferLeadership(ctx context.Context, lead, transferee uint64) {n select {n // manually set from and to, so that leader can voluntarily transfers its leadershipn case n.recvc <- pb.Message{Type: pb.MsgTransferLeader, From: transferee, To: lead}:n case <-n.done:n case <-ctx.Done():n }n}n
跑raft的goroutine從recvc中拿出message,首先做各種各樣的檢查,比如是否已經有transfer leader正在進行中,如果正在進行,目標是誰,然後做相應的處理。如果沒有,則調用一下代碼:
r.leadTransferee = leadTransfereenif pr.Match == r.raftLog.lastIndex() {n r.sendTimeoutNow(leadTransferee)n r.logger.Infof("%x sends MsgTimeoutNow to %x immediately as %x already has up-to-date log", r.id, leadTransferee, leadTransferee)n} else {n r.sendAppend(leadTransferee)n}n
首先將目標leader保存在leadTransferee中,標示著有transfer正在進行,後續如果有請求propose進來,會檢查:
if r.leadTransferee != None {n r.logger.Debugf("%x [term %d] transfer leadership to %x is in progress; dropping proposal", r.id, r.Term, r.leadTransferee)n returnn}n
這裡相當於停寫。
回到上面:
- 如果transferee和leader的log一樣新,則給transferee發送MsgTimeoutNow類型的消息,告訴transferee可以立即選主,不需要等到election timeout。transferee端:
r.campaign(campaignTransfer)n
raft為了防止出現網路分區的情況下,candidate頻繁增加term從而導致term爆炸,在選主的時候新增加了一個PreVote階段,通過了這個階段才會真正開始Vote,這裡,由於transferee明確知道是transfer,就沒有必要採用這種兩階段的選主,所以傳入的參數是campaignTransfer
- 如果leader發現transferee的日誌落後,則給transferee append日誌,leader在收到響應MsgAppResp後,會檢查:
// Transfer leadership is in progress.nif m.From == r.leadTransferee && pr.Match == r.raftLog.lastIndex() {n r.logger.Infof("%x sent MsgTimeoutNow to %x after received MsgAppResp", r.id, m.From)n r.sendTimeoutNow(m.From)n}n
如果發現transferee已經日誌最新,則同樣,給transferee發送MsgTimeoutNow
最後,看看etcd如何調用:
func (s *EtcdServer) transferLeadership(ctx context.Context, lead, transferee uint64) error {n now := time.Now()n interval := time.Duration(s.Cfg.TickMs) * time.Millisecondnn plog.Infof("%s starts leadership transfer from %s to %s", s.ID(), types.ID(lead), types.ID(transferee))n s.r.TransferLeadership(ctx, lead, transferee)n for s.Lead() != transferee {n select {n case <-ctx.Done(): // time outn return ErrTimeoutLeaderTransfern case <-time.After(interval):n }n }nn // TODO: drain all requests, or drop all messages to the old leadernn plog.Infof("%s finished leadership transfer from %s to %s (took %v)", s.ID(), types.ID(lead), types.ID(transferee), time.Since(now))n return niln}n
調用TransferLeadership後,每隔一段時間檢查是否transfer成功,要麼超時,直接返回。
推薦閱讀:
※go-raft實現
※事件和時間:Time, Clocks, and the Ordering of Events in a Distributed System 讀後感
※分散式一致性演算法是如何解決少數派節點的寫順序一致性問題的?
※state machine replication vs primary backup system