etcd-raft節點變更
說明
從etcd-raft的架構來看,節點變更功能的實現需要應用和底層核心協議處理層互相配合。客戶端發起節點增加或移除的命令,應用獲得該請求,並將其轉換為一個節點變更指令交給底層的raft協議核心處理層。
數據結構
ConfChange
type ConfChange struct {n ID uint64 n Type ConfChangeType n NodeID uint64 n}n
該結構表示節點變更的信息,也是上層應用傳遞給底層的節點變更消息的結構,其中:
- ID: 表示節點變更的消息id,這個意義不大
- Type: ConfChangeAddNode或者ConfChangeRemoveNode
- NodeID: 變更節點的id
關鍵流程
節點變更請求應用處理
以etcd-raft自帶的示例應用來說明應用對此類請求的處理流程:
func (h *httpKVAPI) ServeHTTP(w http.ResponseWriter, r *http.Request) {n key := r.RequestURIn switch {n case ...:n case r.Method == "POST":n ......n cc := raftpb.ConfChange{n Type: raftpb.ConfChangeAddNode,n NodeID: nodeId,n Context: url,n }n h.confChangeC <- ccn case r.Method == "DELETE":n ......n cc := raftpb.ConfChange{n Type: raftpb.ConfChangeRemoveNode,n NodeID: nodeId,n }n h.confChangeC <- ccn }n}n
應用的前端接入層實現較為簡單:將命令封裝成ConfChange消息並通過管道通知其他後台任務即可:
func (rc *raftNode) serveChannels() {n go func() {n for rc.proposeC != nil && rc.confChangeC != nil {n select {n case prop, ok := <-rc.proposeC:n ......n case cc, ok := <-rc.confChangeC:n if !ok {n rc.confChangeC = nil n } else { n confChangeCount += 1 n cc.ID = confChangeCountn rc.node.ProposeConfChange(context.TODO(), cc)n }n }n }n }()n
該ConfChange消息最終被該協程捕獲,並最終調用底層的ProposeConfChange來處理該消息。
底層處理入口
應用的ConfChange消息最終通過ProposeConfChange進入底層協議處理層:func (n *node) ProposeConfChange(ctx context.Context, cc pb.ConfChange) error { n data, err := cc.Marshal()n if err != nil {n return errn }n return n.Step(ctx, pb.Message{Type: pb.MsgProp, Entries: []pb.Entry{{Type: pb.EntryConfChange, Data: data}}})n}nnfunc (n *node) Step(ctx context.Context, m pb.Message) error { n ......n return n.step(ctx, m)n}n
可以發現,對於節點變更消息ConfChange,它與正常的更新命令處理在命令的類型(EntryConfChange)上不同外,對命令的處理流程則是完全一致的。關於這個,我們就不再贅述,有興趣的可以參考之前的文章。
當消息在多數Follower節點上被持久化存儲後,該消息便會在Leader上進行Commit,接下來該Commit也會被同步至集群的Follower節點上。此時,應用會執行日誌項中的命令,對於ConfChange命令:
func (rc *raftNode) publishEntries(ents []raftpb.Entry) bool {n for i := range ents {n switch ents[i].Type {n case raftpb.EntryNormal:n ...n case raftpb.EntryConfChange:n cc.Unmarshal(ents[i].Data)n rc.confState = *rc.node.ApplyConfChange(cc)n switch cc.Type {n case raftpb.ConfChangeAddNode:n if len(cc.Context) > 0 {n rc.transport.AddPeer(types.ID(cc.NodeID), []string{string(cc.Context)})n }n case raftpb.ConfChangeRemoveNode:n if cc.NodeID == uint64(rc.id) {n return falsen }n rc.transport.RemovePeer(types.ID(cc.NodeID))n }n }n ...n }n}n
對於EntryConfChange的日誌項,有以下工作要做:
- 通知raft協議核心處理層節點變更消息;
- 通知網路傳輸模塊節點變更消息;
1的通知是調用函數ApplyConfChange完成,而2則是根據節點變更的種類調用相應的函數:AddPeer或者RemovePeer。
需要說明的一點是:在etcd-raft的實現中,節點變更日誌項詳細被應用的時機和普通的更新日誌項一樣,都是在日誌項消息被複制到集群多數節點上,消息被Commit之後執行的。
增加節點
應用通知raft協議核心處理層的日誌項消息最終會被協議核心層的後台協程接受到並處理:
func (n *node) run(r *raft) {n ......n for {n select {n case cc := <-n.confc:n switch cc.Type {n case pb.ConfChangeAddNode:n r.addNode(cc.NodeID)n case pb.ConfChangeRemoveNode:n r.removeNode(cc.NodeID)n default:n panic("unexpected conf type")n }n select {n case n.confstatec <- pb.ConfState{Nodes: r.nodes()}:n case <-n.done:n }n case ...:n }n }n} n
真正的增加節點處理如下:
func (r *raft) addNode(id uint64) {n r.pendingConf = false n if _, ok := r.prs[id]; ok {n return n }n r.setProgress(id, 0, r.raftLog.lastIndex()+1)n r.prs[id].RecentActive = true n}n
在raft協議核心處理層,增加節點便是為其分配一個Progress結構,通過該結構追蹤對端節點的運行狀態。
刪除節點
func (r *raft) removeNode(id uint64) {n r.delProgress(id)n r.pendingConf = false n if len(r.prs) == 0 {n return n }nn if r.maybeCommit() {n r.bcastAppend()n }n if r.state == StateLeader && r.leadTransferee == id {n r.abortLeaderTransfer()n }n}n
removeNode相比addNode複雜一點,除了移除該節點的Progress結構外,還需要考慮到刪除節點帶來的集群節點數量減少導致的影響,如由於多數派變少而導致的某些日誌項可以被Commit等。
推薦閱讀:
※raft協議應用方面的疑問?
※分散式系統理論進階 - Paxos
※Atlas: Baidu分散式存儲系統
※小議分散式系統的一致性模型