PhxPaxos sample phxecho源碼分析

前言

最近在學習分散式系統一致性演算法paxos,光看論文和文檔覺得很是不過癮,於是找到了目前很流行的一個paxos開源實現:phxpaxos。phxpaxos這是Tencent開源的multipaxos的庫,目標是打造一個高性能、高可靠的paxos庫。好奇心驅使我不得不對其一探究竟,從這篇文章開始,我將庖丁解牛,一步步揭開phxpaxos的神秘面紗,同時也加深自己對paxos這個協議的理解。

phxpaxos作為一個標準的paxos協議的實現庫,其目標是為上層的各種應用提供介面,上層應用使用該庫後即可實現應用狀態複製,保證數據可靠不丟失。因此,為了能快速地理解庫的實現,我們必須來看看上層應用如何使用phxpaxos。幸好,phxpaxos的作者們在源代碼中提供了一些應用示例,我們今天的主要目標就是分析其中最簡單的phxecho,黑盒了解phxpaxos的大致流程。

phxecho介紹

phxecho是基於phxpaxos的簡單echo應用,該應用很簡單:集群中存在多個節點(服務進程),用戶從任意節點的終端輸入任意字元串,只要集群中多數節點保持存活狀態,那用戶輸入的字元串必然會被整個集群所感知,然後終端會給用戶返回原始的輸入作為響應,這便是echo了。

phxecho使用

phxecho的使用也非常簡單,在最簡單模式下,我們使用一個伺服器,在上面啟動三個使用不同埠號的進程來模擬三個節點:

# 第一個參數是當前節點的地址信息;後續參數表示集群所有節點的地址信息./phxecho 127.0.0.1:11111 127.0.0.1:11111,127.0.0.1:11112,127.0.0.1:11113./phxecho 127.0.0.1:11112 127.0.0.1:11111,127.0.0.1:11112,127.0.0.1:11113./phxecho 127.0.0.1:11113 127.0.0.1:11111,127.0.0.1:11112,127.0.0.1:11113

我隨意選擇其中某個進程,在其終端輸入」hello, world」,結果如下:

please input: <echo req value>hello,world[SM Execute] ok, smid 1 instanceid 11 value hello,worldecho resp value hello,world

發現執行後為我輸入的命令生成了instanceid:11,同時給我echo了原始輸入。我們查看另外一個進程的輸出:

[SM Execute] ok, smid 1 instanceid 11 value hello,world

發現其instanceid=11也被值「hello,world」所佔據,集群處於一致狀態。

phxecho入口

phxecho的主函數位於文件sample/phxecho/main.cpp,其主要流程非常簡單:

int main(int argc, char ** argv){ // 參數解析 NodeInfo oMyNode; if (parse_ipport(argv[1], oMyNode) != 0) { ... } NodeInfoList vecNodeInfoList; if (parse_ipport_list(argv[2], vecNodeInfoList) != 0) { ... } // step 1: RunPaxos PhxEchoServer oEchoServer(oMyNode, vecNodeInfoList); int ret = oEchoServer.RunPaxos(); ... string sEchoReqValue; while (true) { getline(cin, sEchoReqValue); // step 2: Echo ret = oEchoServer.Echo(sEchoReqValue, sEchoRespValue); ... }}

main.cpp中主要是實例化了PhxEchoServer並調用其兩個主要方法 RunPaxos與Echo,這些是關鍵流程。

class PhxEchoServer

class PhxEchoServer{public: ...private: phxpaxos::NodeInfo m_oMyNode; phxpaxos::NodeInfoList m_vecNodeList; // 這個抽象了paxos協議中的節點 phxpaxos::Node * m_poPaxosNode; // 抽象了本節點狀態機 PhxEchoSM m_oEchoSM;};

我們將會看到,phxecho中的所有關鍵流程都是圍繞上面的兩個關鍵成員變數而展開的。

PhxEchoServer::RunPaxos

int PhxEchoServer :: RunPaxos(){ Options oOptions; int ret = MakeLogStoragePath(oOptions.sLogStoragePath); if (ret != 0) { return ret; } // 填充系統配置 oOptions.iGroupCount = 1; oOptions.oMyNode = m_oMyNode; oOptions.vecNodeInfoList = m_vecNodeList; GroupSMInfo oSMInfo; oSMInfo.iGroupIdx = 0; // one paxos group can have multi state machine. oSMInfo.vecSMList.push_back(&m_oEchoSM); oOptions.vecGroupSMInfoList.push_back(oSMInfo); // 初始化節點環境,包括網路通信、io線程、狀態機等 ret = Node::RunNode(oOptions, m_poPaxosNode); if (ret != 0) { ... } printf("run paxos ok
"); return 0;}

RunPaxos方法很是簡單,主要是根據應用的需求設置系統配置,然後調用phxpaxos庫提供的RunNode,這裡面其實做了大量的工作,這也是我們下面一片文章描述的重點,現在我們不用管裡面到底幹了什麼,只需要知道這個函數執行完成後,所有準備工作都搞定了,接下來就可以處理用戶請求了。

PhxEchoServer::Echo

int PhxEchoServer :: Echo(const std::string & sEchoReqValue, std::string & sEchoRespValue){ SMCtx oCtx; PhxEchoSMCtx oEchoSMCtx; //smid must same to PhxEchoSM.SMID(). oCtx.m_iSMID = 1; oCtx.m_pCtx = (void *)&oEchoSMCtx; uint64_t llInstanceID = 0; int ret = m_poPaxosNode->Propose(0, sEchoReqValue, llInstanceID, &oCtx); if (ret != 0) { ... } if (oEchoSMCtx.iExecuteRet != 0) { ... } sEchoRespValue = oEchoSMCtx.sEchoRespValue.c_str(); return 0;}

這個就更簡單了,對於用戶的請求,直接調用m_poPaxosNode->Propose交由phxpaxos庫來處理即可。我們在後面也會來分析一個簡單的Propose內到底做了些什麼。

參考

phxpaxos源碼閱讀之一:走馬觀花?

chenneal.github.io


推薦閱讀:

Paxos協議
Paxos Made Simple(粗略翻譯,略渣)
簡談zookeeper的選主過程
如何淺顯易懂地解說 Paxos 的演算法?
Paxos演算法詳解

TAG:Paxos | 分散式一致性 | 分散式存儲 |