PhxPaxos架構設計、實現分析
架構設計
Paxos為分散式一致性協議,用於解決非同步通信環境下分散式系統的一致性問題。做為分散式系統,必定存在多個節點共同參與(一般來說節點數量為奇數個,常用的節點數量如3,5等)。
一個Paxos實例執行可以確定一個值,其最實際的工程價值莫過於提供選主服務。但如果可以有序的確定多個值,再結合狀態機將值賦予業務含義(如日誌回放等),Paxos將具有更大的價值。
PhxPaxos基於《Paxos made simple》實現,可以做到有序確定多個值,更進一步的是其可以同時確定多組有序的值。
PhxPaxos整體架構如下:
- 一個Group用於有序的確定多個值,多個Group可以同時確定多組有序的值。
- Group中的每個Node代表一個物理節點(VM或進程),一個Node中同時運行了多個Group的Instance。
- 每個Instance為Paxos協議的執行主體,用於確定一個值,當Instance N的值被確定後,Instance N被銷毀,構建Instance N+1確定下一個值。
Node
一個Node對應一個物理節點,如果需要正確的運行Paxos協議,必須由以下部分組成:
- 網路模塊
- 負責節點間網路通信。
- 數據存儲
- 負責記錄確定的有序值。
- Paxos分組
- 上一節提到的一個Node同時運行多個Group的能力。
- 在實現上,Node維護了一個Group的列表,每個Group中包含一個Instance對象
- instance負責掌控整個Paxos協議的運行,也包括對其他機制的調控。
- 選主服務
- 用於確定Paxos的主節點。
主節點不是必須的,但選主後僅由主節點發起服務可以有效的提升性能。
選主服務有兩點比較特殊:
- 本身使用了Paxos協議
- 每個Group需要單獨選主(存在另外一種實現:選主服務下放到Paxos分組中去完成)
PhxPaxos中的節點除了做為Paxos協議的參與者,還運行另外一類成為follower的節點。Follower指定一個運行Paxos協議的節點用於數據同步,它節點不參與Paxos協議,也不參與Paxos選主。Follower更像傳統意義上的同步備,當Paxos協議節點確定一個值後,將數據同步到Follower節點。但有一點不同的是:Follower節點運行Learner,當某個值缺失時,可以通過Learner主動發起AskForLearn習得。
Instance
每個Node的每個Group下運行一個Instance實例,Instance實例用於確定當前值。多個Node相同Group中的Instance相互通信,完成Paxos協議.
Instance由以下部分組成:
- Proposer
- Paxos提案發起者。
- Acceptor
- Paxos提案接收者。
- Learner
- Paxos值習得者。
CheckpointMgr
- 鏡像數據管理者。
- 指引業務生成鏡像數據,一旦指定instance id之前的鏡像數據產生,理論上就可以移除該instance id之前的Paxos Log數據,以免空間的無限擴展。這部分可以參加《微信自研生產級paxos類庫PhxPaxos實現原理介紹》、《狀態機Checkpoint詳解》。
網路實現
PhxPaxos中網路介面抽象為NetWork,內置實現類為DFNetWrok,支持UDP、TCP兩種協議。PhxPaxos將網路操作全非同步化,一共啟了5個線程:
- UDPRecv
- 創建一個基於UDP協議的socket對象
- 線程負責接收來自網路其他節點的UDP消息,並通知Instance處理(根據消息中的group id找到正確的group)
- UDPSend
- 所有Instance需要發送的UDP消息預先放入消息隊列
- 線程負責將消息隊列中的消息發送至其他節點
- TcpAcceptor
- 創建一個基於TCP協議的socket對象
- 線程負責接收來自客戶端的連接請求,並將連接信息放入隊列,交由TcpRead線程處理
- TcpRead
- 啟動TcpAcceptor線程
- 啟動EventLoop,EventLoop採用epoll方式
- EventLoop負責將TcpAcceptor中的請求轉換為MessageEvent,並等待TcpClient發送消息
- EventLoop負責接收來自不同TcpClient的消息,並通知Instance處理(根據消息中的group id找到正確的group)
- TcpWrite
- 啟動EventLoop
- TcpClient負責接收所有Instance發送的TCP消息,預先放入消息隊列,並通知EventLoop立即處理(通過Pipe通信通知)
- TcpClient將消息放入到消息隊列前,首先建立和服務端基於TCP協議的Scoket通信
- 線程負責將消息隊列中的消息發送至其他節點
數據存儲
PhxPaxos的數據存儲抽象介面為LogStorage,內置實現類為MultiDatabase。MultiDatabase中針對每個Group創建了一個Databse對象負責真正的數據存儲,它只是Database一個很薄的封裝層。Database內部也並無過多的處理邏輯,其直接使用了高性能的本地資料庫LevelDB,關於LevelDB的詳細分析可參見我的系列文章《LevelDB源碼剖析》。
數據存儲模塊的職責如下:
- Acceptor中所有accept數據存儲,其中最主要的為instance id和value
- 存儲其他PhxPaxos需要持久化的信息,如MasterVariables、SystemVariables等
Paxos協議實現
Paxos協議中規定了三類角色:Proposer、Accetor、Learner。協議實現完全基於《Paxos made simple》,實現上也並不複雜,來看代碼實現:
Proposer
//發起提案的入口函數 int Proposer :: NewValue ( const std::string & sValue ) { BP->GetProposerBP()->NewProposal ( sValue ); //記錄本次的提案值 if ( m_oProposerState.GetValue().size() == 0 ) { m_oProposerState.SetValue ( sValue ); } m_iLastPrepareTimeoutMs = START_PREPARE_TIMEOUTMS; m_iLastAcceptTimeoutMs = START_ACCEPT_TIMEOUTMS; //允許跳過Prepare階段,直接進入Accept if ( m_bCanSkipPrepare && !m_bWasRejectBySomeone ) { BP->GetProposerBP()->NewProposalSkipPrepare(); PLGHead ( "skip prepare, directly start accept" ); Accept(); } else //從Prepare階段開始Paxos協議 { //if not reject by someone, no need to increase ballot Prepare ( m_bWasRejectBySomeone ); } return 0; } //Prepare階段 void Proposer :: Prepare ( const bool bNeedNewBallot ) { PLGHead ( "START Now.InstanceID %lu MyNodeID %lu State.ProposalID %lu State.ValueLen %zu", GetInstanceID(), m_poConfig->GetMyNodeID(), m_oProposerState.GetProposalID(), m_oProposerState.GetValue().size() ); BP->GetProposerBP()->Prepare(); m_oTimeStat.Point(); ExitAccept(); m_bIsPreparing = true; m_bCanSkipPrepare = false; m_bWasRejectBySomeone = false; m_oProposerState.ResetHighestOtherPreAcceptBallot(); //分配一個新的提案編號:Porposal ID if ( bNeedNewBallot ) { m_oProposerState.NewPrepare(); } //發起提案:instance id、node id、proposal id PaxosMsg oPaxosMsg; oPaxosMsg.set_msgtype ( MsgType_PaxosPrepare ); oPaxosMsg.set_instanceid ( GetInstanceID() ); oPaxosMsg.set_nodeid ( m_poConfig->GetMyNodeID() ); oPaxosMsg.set_proposalid ( m_oProposerState.GetProposalID() ); m_oMsgCounter.StartNewRound(); //添加定時器,當出現超時後,重新觸發Prepare AddPrepareTimer(); PLGHead ( "END OK" ); BroadcastMessage ( oPaxosMsg ); } //來自各個Acceptor的響應消息 void Proposer :: OnPrepareReply ( const PaxosMsg & oPaxosMsg ) { PLGHead ( "START Msg.ProposalID %lu State.ProposalID %lu Msg.from_nodeid %lu RejectByPromiseID %lu", oPaxosMsg.proposalid(), m_oProposerState.GetProposalID(), oPaxosMsg.nodeid(), oPaxosMsg.rejectbypromiseid() ); BP->GetProposerBP()->OnPrepareReply(); //當前以不在Prepare階段,不處理 //1. 某個節點響應過慢,提案已進入到Accept階段 //2. 整個提案響應過慢,提案已終止 if ( !m_bIsPreparing ) { BP->GetProposerBP()->OnPrepareReplyButNotPreparing(); //PLGErr("Not preparing, skip this msg"); return; } if ( oPaxosMsg.proposalid() != m_oProposerState.GetProposalID() ) { BP->GetProposerBP()->OnPrepareReplyNotSameProposalIDMsg(); //PLGErr("ProposalID not same, skip this msg"); return; } //記錄已收到來自node id節點的響應 m_oMsgCounter.AddReceive ( oPaxosMsg.nodeid() ); //提案被接受 if ( oPaxosMsg.rejectbypromiseid() == 0 ) { BallotNumber oBallot ( oPaxosMsg.preacceptid(), oPaxosMsg.preacceptnodeid() ); PLGDebug ( "[Promise] PreAcceptedID %lu PreAcceptedNodeID %lu ValueSize %zu", oPaxosMsg.preacceptid(), oPaxosMsg.preacceptnodeid(), oPaxosMsg.value().size() ); //記錄接受提案的node id m_oMsgCounter.AddPromiseOrAccept ( oPaxosMsg.nodeid() ); //記錄接受提案節點返回的提案值 m_oProposerState.AddPreAcceptValue ( oBallot, oPaxosMsg.value() ); } else //提案被拒絕 { PLGDebug ( "[Reject] RejectByPromiseID %lu", oPaxosMsg.rejectbypromiseid() ); //記錄拒絕提案的node id m_oMsgCounter.AddReject ( oPaxosMsg.nodeid() ); m_bWasRejectBySomeone = true; //記錄拒絕提案節點返回的Proposal ID,以便下次基於此信息重新發起提案 m_oProposerState.SetOtherProposalID ( oPaxosMsg.rejectbypromiseid() ); } //超過半數接受該提案,提案通過 if ( m_oMsgCounter.IsPassedOnThisRound() ) { int iUseTimeMs = m_oTimeStat.Point(); BP->GetProposerBP()->PreparePass ( iUseTimeMs ); PLGImp ( "[Pass] start accept, usetime %dms", iUseTimeMs ); //注意:這裡做了一個優化,一旦Prepare階段的提案被通過後,就自動跳過Prepare階段,以減少網路傳輸落盤次數 m_bCanSkipPrepare = true; Accept(); } //超過半數拒絕該提案,或提案已全部響應,但並未超過半數通過(如4個節點,兩個接受、兩個拒絕) else if ( m_oMsgCounter.IsRejectedOnThisRound() || m_oMsgCounter.IsAllReceiveOnThisRound() ) { BP->GetProposerBP()->PrepareNotPass(); PLGImp ( "[Not Pass] wait 30ms and restart prepare" ); //設置定時器,並在10-40ms之後重新發起提案 AddPrepareTimer ( OtherUtils::FastRand() % 30 + 10 ); } PLGHead ( "END" ); } //Accept階段 void Proposer :: Accept() { PLGHead ( "START ProposalID %lu ValueSize %zu ValueLen %zu", m_oProposerState.GetProposalID(), m_oProposerState.GetValue().size(), m_oProposerState.GetValue().size() ); BP->GetProposerBP()->Accept(); m_oTimeStat.Point(); //並發控制,標記當前已進入Accept階段 -- 無鎖化 ExitPrepare(); m_bIsAccepting = true; PaxosMsg oPaxosMsg; oPaxosMsg.set_msgtype ( MsgType_PaxosAccept ); oPaxosMsg.set_instanceid ( GetInstanceID() ); oPaxosMsg.set_nodeid ( m_poConfig->GetMyNodeID() ); oPaxosMsg.set_proposalid ( m_oProposerState.GetProposalID() ); oPaxosMsg.set_value ( m_oProposerState.GetValue() ); oPaxosMsg.set_lastchecksum ( GetLastChecksum() ); m_oMsgCounter.StartNewRound(); //添加定時器、用於Accept超時後重新進入Prepare階段 AddAcceptTimer(); PLGHead ( "END" ); //發送消息到其他節點 BroadcastMessage ( oPaxosMsg, BroadcastMessage_Type_RunSelf_Final ); } //Accept階段響應 void Proposer :: OnAcceptReply ( const PaxosMsg & oPaxosMsg ) { PLGHead ( "START Msg.ProposalID %lu State.ProposalID %lu Msg.from_nodeid %lu RejectByPromiseID %lu", oPaxosMsg.proposalid(), m_oProposerState.GetProposalID(), oPaxosMsg.nodeid(), oPaxosMsg.rejectbypromiseid() ); BP->GetProposerBP()->OnAcceptReply(); //當前已不在Accept階段 //1. 某個節點響應過慢,提案已完成 //2. 多個節點響應過慢,提案已重新進入Prepare階段階段 //3. 整個提案響應過慢,提案已終止 if ( !m_bIsAccepting ) { //PLGErr("Not proposing, skip this msg"); BP->GetProposerBP()->OnAcceptReplyButNotAccepting(); return; } //提案編號不一致,跳過不處理 // proposal id不一致表明一定不是同一個instance id if ( oPaxosMsg.proposalid() != m_oProposerState.GetProposalID() ) { //PLGErr("ProposalID not same, skip this msg"); BP->GetProposerBP()->OnAcceptReplyNotSameProposalIDMsg(); return; } //記錄已收到node id節點的消息 m_oMsgCounter.AddReceive ( oPaxosMsg.nodeid() ); //提案被接收 if ( oPaxosMsg.rejectbypromiseid() == 0 ) { PLGDebug ( "[Accept]" ); //記錄接收提案的節點編號 m_oMsgCounter.AddPromiseOrAccept ( oPaxosMsg.nodeid() ); } else //提案被拒絕 { PLGDebug ( "[Reject]" ); //記錄拒絕提案的節點編號 m_oMsgCounter.AddReject ( oPaxosMsg.nodeid() ); //必須重新進入prepare階段 m_bWasRejectBySomeone = true; //拒絕該提案節點所附的提案編號 m_oProposerState.SetOtherProposalID ( oPaxosMsg.rejectbypromiseid() ); } //提案通過 if ( m_oMsgCounter.IsPassedOnThisRound() ) { int iUseTimeMs = m_oTimeStat.Point(); BP->GetProposerBP()->AcceptPass ( iUseTimeMs ); PLGImp ( "[Pass] Start send learn, usetime %dms", iUseTimeMs ); //退出accept階段 ExitAccept(); //通知所有節點的learner,提案已Accept(但並不保證chosen),由learner判定 m_poLearner->ProposerSendSuccess ( GetInstanceID(), m_oProposerState.GetProposalID() ); } //提案未通過 else if ( m_oMsgCounter.IsRejectedOnThisRound() || m_oMsgCounter.IsAllReceiveOnThisRound() ) { BP->GetProposerBP()->AcceptNotPass(); PLGImp ( "[Not pass] wait 30ms and Restart prepare" ); //添加定時器,重新進入Prepare階段 AddAcceptTimer ( OtherUtils::FastRand() % 30 + 10 ); } PLGHead ( "END" ); }
Acceptor
// int Acceptor :: OnPrepare ( const PaxosMsg & oPaxosMsg ) { PLGHead ( "START Msg.InstanceID %lu Msg.from_nodeid %lu Msg.ProposalID %lu", oPaxosMsg.instanceid(), oPaxosMsg.nodeid(), oPaxosMsg.proposalid() ); BP->GetAcceptorBP()->OnPrepare(); PaxosMsg oReplyPaxosMsg; oReplyPaxosMsg.set_instanceid ( GetInstanceID() ); oReplyPaxosMsg.set_nodeid ( m_poConfig->GetMyNodeID() ); oReplyPaxosMsg.set_proposalid ( oPaxosMsg.proposalid() ); oReplyPaxosMsg.set_msgtype ( MsgType_PaxosPrepareReply ); BallotNumber oBallot ( oPaxosMsg.proposalid(), oPaxosMsg.nodeid() ); //新提案編號 >= 當前已接受的提案編號;接受此提案 if ( oBallot >= m_oAcceptorState.GetPromiseBallot() ) { PLGDebug ( "[Promise] State.PromiseID %lu State.PromiseNodeID %lu " "State.PreAcceptedID %lu State.PreAcceptedNodeID %lu", m_oAcceptorState.GetPromiseBallot().m_llProposalID, m_oAcceptorState.GetPromiseBallot().m_llNodeID, m_oAcceptorState.GetAcceptedBallot().m_llProposalID, m_oAcceptorState.GetAcceptedBallot().m_llNodeID ); oReplyPaxosMsg.set_preacceptid ( m_oAcceptorState.GetAcceptedBallot().m_llProposalID ); oReplyPaxosMsg.set_preacceptnodeid ( m_oAcceptorState.GetAcceptedBallot().m_llNodeID ); //返回當前已接受的提案值 if ( m_oAcceptorState.GetAcceptedBallot().m_llProposalID > 0 ) { oReplyPaxosMsg.set_value ( m_oAcceptorState.GetAcceptedValue() ); } m_oAcceptorState.SetPromiseBallot ( oBallot ); int ret = m_oAcceptorState.Persist ( GetInstanceID(), GetLastChecksum() ); if ( ret != 0 ) { BP->GetAcceptorBP()->OnPreparePersistFail(); PLGErr ( "Persist fail, Now.InstanceID %lu ret %d", GetInstanceID(), ret ); return -1; } BP->GetAcceptorBP()->OnPreparePass(); } else //已有更新的提案,拒絕此提案 { BP->GetAcceptorBP()->OnPrepareReject(); PLGDebug ( "[Reject] State.PromiseID %lu State.PromiseNodeID %lu", m_oAcceptorState.GetPromiseBallot().m_llProposalID, m_oAcceptorState.GetPromiseBallot().m_llNodeID ); oReplyPaxosMsg.set_rejectbypromiseid ( m_oAcceptorState.GetPromiseBallot().m_llProposalID ); } nodeid_t iReplyNodeID = oPaxosMsg.nodeid(); PLGHead ( "END Now.InstanceID %lu ReplyNodeID %lu", GetInstanceID(), oPaxosMsg.nodeid() );; SendMessage ( iReplyNodeID, oReplyPaxosMsg ); return 0; } // void Acceptor :: OnAccept ( const PaxosMsg & oPaxosMsg ) { PLGHead ( "START Msg.InstanceID %lu Msg.from_nodeid %lu Msg.ProposalID %lu Msg.ValueLen %zu", oPaxosMsg.instanceid(), oPaxosMsg.nodeid(), oPaxosMsg.proposalid(), oPaxosMsg.value().size() ); BP->GetAcceptorBP()->OnAccept(); PaxosMsg oReplyPaxosMsg; oReplyPaxosMsg.set_instanceid ( GetInstanceID() ); oReplyPaxosMsg.set_nodeid ( m_poConfig->GetMyNodeID() ); oReplyPaxosMsg.set_proposalid ( oPaxosMsg.proposalid() ); oReplyPaxosMsg.set_msgtype ( MsgType_PaxosAcceptReply ); BallotNumber oBallot ( oPaxosMsg.proposalid(), oPaxosMsg.nodeid() ); //新提案編號 >= 當前已接受的提案編號;接受此提案 if ( oBallot >= m_oAcceptorState.GetPromiseBallot() ) { PLGDebug ( "[Promise] State.PromiseID %lu State.PromiseNodeID %lu " "State.PreAcceptedID %lu State.PreAcceptedNodeID %lu", m_oAcceptorState.GetPromiseBallot().m_llProposalID, m_oAcceptorState.GetPromiseBallot().m_llNodeID, m_oAcceptorState.GetAcceptedBallot().m_llProposalID, m_oAcceptorState.GetAcceptedBallot().m_llNodeID ); m_oAcceptorState.SetPromiseBallot ( oBallot ); m_oAcceptorState.SetAcceptedBallot ( oBallot ); m_oAcceptorState.SetAcceptedValue ( oPaxosMsg.value() ); int ret = m_oAcceptorState.Persist ( GetInstanceID(), GetLastChecksum() ); if ( ret != 0 ) { BP->GetAcceptorBP()->OnAcceptPersistFail(); PLGErr ( "Persist fail, Now.InstanceID %lu ret %d", GetInstanceID(), ret ); return; } BP->GetAcceptorBP()->OnAcceptPass(); } else //已有更新的提案,拒絕此提案 { BP->GetAcceptorBP()->OnAcceptReject(); PLGDebug ( "[Reject] State.PromiseID %lu State.PromiseNodeID %lu", m_oAcceptorState.GetPromiseBallot().m_llProposalID, m_oAcceptorState.GetPromiseBallot().m_llNodeID ); oReplyPaxosMsg.set_rejectbypromiseid ( m_oAcceptorState.GetPromiseBallot().m_llProposalID ); } nodeid_t iReplyNodeID = oPaxosMsg.nodeid(); PLGHead ( "END Now.InstanceID %lu ReplyNodeID %lu", GetInstanceID(), oPaxosMsg.nodeid() ); SendMessage ( iReplyNodeID, oReplyPaxosMsg ); }
Learner
Learner定時發送當前的Instance Id,嘗試習得自該Instance Id後的值,處理邏輯如下:
void Learner :: OnAskforLearn ( const PaxosMsg & oPaxosMsg ) { BP->GetLearnerBP()->OnAskforLearn(); PLGHead ( "START Msg.InstanceID %lu Now.InstanceID %lu Msg.from_nodeid %lu MinChosenInstanceID %lu", oPaxosMsg.instanceid(), GetInstanceID(), oPaxosMsg.nodeid(), m_poCheckpointMgr->GetMinChosenInstanceID() ); SetSeenInstanceID ( oPaxosMsg.instanceid(), oPaxosMsg.nodeid() ); //發現一個新的Follower if ( oPaxosMsg.proposalnodeid() == m_poConfig->GetMyNodeID() ) { //Found a node follow me. PLImp ( "Found a node %lu follow me.", oPaxosMsg.nodeid() ); m_poConfig->AddFollowerNode ( oPaxosMsg.nodeid() ); } //需要習得的Instance Id比本節點的更新,無法從本節點習得,直接返回 if ( oPaxosMsg.instanceid() >= GetInstanceID() ) { return; } //需要習得的Instance Id尚未被歸檔(Checkpoint),可以從本節點習得 if ( oPaxosMsg.instanceid() >= m_poCheckpointMgr->GetMinChosenInstanceID() ) { //向Learner Sender發送Prepare請求,交由Learner Sender向發起者發送習得值 if ( !m_oLearnerSender.Prepare ( oPaxosMsg.instanceid(), oPaxosMsg.nodeid() ) ) { BP->GetLearnerBP()->OnAskforLearnGetLockFail(); PLGErr ( "LearnerSender working for others." ); if ( oPaxosMsg.instanceid() == ( GetInstanceID() - 1 ) ) { PLGImp ( "InstanceID only difference one, just send this value to other." ); //send one value AcceptorStateData oState; int ret = m_oPaxosLog.ReadState ( m_poConfig->GetMyGroupIdx(), oPaxosMsg.instanceid(), oState ); if ( ret == 0 ) { BallotNumber oBallot ( oState.acceptedid(), oState.acceptednodeid() ); SendLearnValue ( oPaxosMsg.nodeid(), oPaxosMsg.instanceid(), oBallot, oState.acceptedvalue(), 0, false ); } } return; } } //已經歸檔,發送本節點當前的Instance Id信息,交由Learner的發起者決定是否決定是否做做歸檔數據對齊 SendNowInstanceID ( oPaxosMsg.instanceid(), oPaxosMsg.nodeid() ); }
歸檔機制(Checkpoint)
關於Checkpoint,引用《狀態機Checkpoint詳解》中的前兩段:
狀態機是通過由PhxPaxos選出的有序Chosen value(PaxosLog)系列進行狀態轉移得到的,而這些狀態本質上也是一堆數據,但是這堆數據是由開發者自己進行維護的,我們不能要求開發者在這份數據上時刻做到極其嚴格的正確性, 那麼這份數據很有可能因為機器的一些異常或者重啟導致丟失,從而影響了數據的正確性。
所以我們的做法是只相信自己的數據,於是乎每次啟動(RunNode)的時候都會從0開始將所有的PaxosLog重新輸入狀態機(重演),從而保證狀態機數據的完整性。但這個做法缺陷是非常明顯的,第一:PaxosLog是無限增長的,每次都重新遍歷,重啟效率必然很低。第二:由於每次都要遍歷,那麼這些數據無法刪除,從而要求無限的磁碟空間,這是無法做到的。Checkpoint的出現用於解決這個問題。我們不能要求開發者做到時刻的狀態機數據正確,但要求開發者生成一個狀態機的Checkpoint的時候,這份數據必須是正確的完整的。一個Checkpoint代表著一份某一時刻被固化下來的狀態機數據,它通過sm.h下的StateMachine::GetCheckpointInstanceID()函數反饋它的精確時刻,於是每次啟動的重演,我們只需要從這個時刻所指向PaxosLog位置開始而不是從0開始。
推薦閱讀:
TAG:分散式資料庫 |