ZAB協議在Zookeeper中的實現

說明

nn

ZAB 協議是為分散式協調服務ZooKeeper專門設計的一種支持崩潰恢復的一致性協議。基於該協議,ZooKeeper 實現了一種主從模式的系統架構來保持集群中各個副本之間的數據一致性。

之前看了下ZAB協議的論文,也搞清楚了一些關鍵問題的解決思路,但是沒看代碼總有一種霧裡看花水中望月之感,本著不作死就不會死的精神,大概翻閱了Zookeeper的ZAB模塊實現,特總結與大家分享。

nn

我會從「通信協議」、「核心數據結構以及API」兩方面主要描述Zookeeper中ZAB協議的具體實現,重點關注ZAB協議實現中抽象的對象以及對象之間的關聯。弱化請求處理流程,因為這些在描述ZAB協議中重點描述。

nnnn

通信協議

nn

ZAB協議中節點之間通信主要發生在Leader和Follower之間。他們之間主要的命令分為控制類和數據傳輸類。

nn

其中控制類命令主要包含以下命令:

nn

  • PING: Leader發送給Follower,確定雙方依然存活且網路連通正常;
  • COMMIT:Leader發送給Follower,通知其提交某個特定的zxid的日誌。

nn

而數據傳輸類則主要包含:

nn

  • PROPOSAL: Leader通過該命令向Follower發送自己的日誌數據

nnnn

有一個不太明白的是SYNC命令,好像在Leader和Follower之間正常的數據交互中不會有該命令的身影。

nnnn

數據結構

nn

LearnerHandler

nn

Leader抽象出來的與Follower節點進行信息交互的對象。同時,該對象維護了Leader與Follower之間通信狀態(如是否出現網路不通情況)。

nn

Follower節點啟動後,會主動連接Leader,而Leader會監聽Follower的建立連接的請求。並為每個Follower的tcp連接創建一個LearnerHandler對象,該對象會:

nn

  • 接收Follower發來的請求,可能包括以下請求:Follower轉發的客戶端的更新請求(命令類型:Leader.REQUEST),Follower對Leader的Proposal命令的回復消息ACK,Follower給Leader發送的PING(Follower會給Leader發送PING消息?);
  • 給Follower發送心跳消息。

nnnn

public class LearnerHandler extends ZooKeeperThread {n final Leader leader;nn protected long sid = 0;nn final LinkedBlockingQueue<QuorumPacket> queuedPackets =n new LinkedBlockingQueue<QuorumPacket>();nn private SyncLimitCheck syncLimitCheck = new SyncLimitCheck();nn private BinaryInputArchive ia;n private BinaryOutputArchive oa;n private BufferedOutputStream bufferedOutput;n private volatile boolean sendingThreadStarted = false;nn private boolean needOpPacket = true; nn private long leaderLastZxid;n ......n}n

nn

Leader

nn

Leader抽象了集群當前的主節點,此類節點負責:

nn

  • 處理所有客戶端的更新請求,並將這些請求使用ZAB協議以日誌同步方式廣播至所有的Follower;
  • 通過心跳信息維護集群狀態,在必要時會結束自己的Leader狀態,觸發新的選主

nnnn

public class Leader {n ......n final LeaderZooKeeperServer zk;n final QuorumPeer self;n ......nn // 所有Follower信息n private final HashSet<LearnerHandler> learners =n new HashSet<LearnerHandler>();nn ......nn // Leader主函數n void lead() throws IOException, InterruptedException {n ......n }n}n

nn

Leader對象對外提供一些重要API:

nn

  • propose(): 將客戶端的請求Request包裝成QuorumPacket並發往Followers,sendPacket()會將該QuorumPacket發往每個Follower的消息隊列;
  • processAck():Leader收到Follower對Proposal消息的確認後調用該方法,該方法裡面會判斷某個消息是否被多數Follower確認,如果是,那麼會提交該消息。在LearnerHandler內收到Follower的Ack消息時會觸發函數processAck。

nn

Follower

nn

Follower抽象了集群的從節點,從節點負責:

nn

  • 接受Leader命令,同步Leader節點日誌,並將其應用到自身的狀態機
  • 維護與Leader的心跳,並在Leader節點異常時會發起一次選主

nn

public class Follower extends Learner{nn private long lastQueued;n final FollowerZooKeeperServer fzk;nn // 從節點主函數n void followLeader() throws InterruptedException {n ......n }n}n

nn

Follower中最主要的功能是接受並處理Leader發過來的命令,Leader和Follower之間的命令類型見「通信協議」,每種命令的處理方法如下:

nn

  • PROPOSAL: Follower將其記錄日誌即可,調用方法FollowerZooKeeperServer::logRequest(),然後給Leader返回ACK;
  • COMMIT:Follower將請求的zxid進行提交,所謂的提交其實就是將該命令應用到狀態機中。

nn

QuorumPeer

nn

QuorumPeer負責維護節點的選主狀態信息,無論是Leader還是Follower,都需要記錄這些信息。比如,當前節點的狀態,當前節點選擇了誰作為主,等等等等。

nn

其實,每個節點啟動時都是運行在QuorumPeer的主循環之內,在循環內進行選主過程,完成選主後,根據選主結果決定本節點角色(Leader/Follower)。接下來就進入Leader/Follower的處理邏輯,直到該由於種種異常節點需要重新發起選主,便再一次進入QuorumPeer的主循環了。

nnnn

public class QuorumPeer extends ZooKeeperThread implements QuorumStats.Provider {n // 定義節點三種狀態n public enum ServerState {n LOOKING, FOLLOWING, LEADING, OBSERVING;n }n // 當前節點idn private long myid;nn // 記錄當前投票信息n volatile private Vote currentVote;nn // 記錄當前節點狀態,默認是LOOKINGn private ServerState state = ServerState.LOOKING;nn // 節點啟動入口n public synchronized void start() {n ......n }nn // 節點運行主循環n @Overriden public void run() {n ......n while (running) {n switch (getPeerState()) {n // 該狀態下開始選主n case LOOKING:n try {n reconfigFlagClear();n if (shuttingDownLE) {n shuttingDownLE = false;n startLeaderElection();n }n // 開始選主咯n setCurrentVote(makeLEStrategy().lookForLeader());n } catch (Exception e) {n setPeerState(ServerState.LOOKING);n } n break;n case OBSERVING:n // 觀察者角色,忽略n break;n case FOLLOWING:n // 變成Follower,進入followLeader()n try {n setFollower(makeFollower(logFactory));n follower.followLeader();n } catch (Exception e) {n LOG.warn("Unexpected exception",e);n } finally {n follower.shutdown();n setFollower(null);n updateServerState();n }n break;n case LEADING:n // 變成Leader,進入lead()n try {n setLeader(makeLeader(logFactory));n leader.lead();n setLeader(null);n } catch (Exception e) {n LOG.warn("Unexpected exception",e);n } finally {n if (leader != null) {n leader.shutdown("Forcing shutdown");n setLeader(null);n }n updateServerState();n }n break;n }n start_fle = Time.currentElapsedTime();n ......n }n }n}n

推薦閱讀:

聊聊一致性哈希
分散式資料庫中為什麼要使用 Vector Clock?
最佳日誌實踐(v2.0)
有什麼關於pregel的論文或者演算法值得一讀?
分散式系統工程如何搭建,有沒有一套完整的系統的實踐方法論,比如,存儲服務?

TAG:分布式系统 | 分布式一致性 |