TiKV 源碼解析系列——如何使用 Raft

本系列文章主要面向 TiKV 社區開發者,重點介紹 TiKV 的系統架構,源碼結構,流程解析。目的是使得開發者閱讀之後,能對 TiKV 項目有一個初步了解,更好的參與進入 TiKV 的開發中。

需要注意,TiKV 使用 Rust 語言編寫,用戶需要對 Rust 語言有一個大概的了解。另外,本系列文章並不會涉及到 TiKV 中心控制服務 Placement Driver(PD) 的詳細介紹,但是會說明一些重要流程 TiKV 是如何與 PD 交互的。

TiKV 是一個分散式的 KV 系統,它採用 Raft 協議保證數據的強一致性,同時使用 MVCC + 2PC 的方式實現了分散式事務的支持。

概述

本文檔主要面向 TiKV 社區開發者,主要介紹 TiKV 的系統架構,源碼結構,流程解析。目的是使得開發者閱讀文檔之後,能對 TiKV 項目有一個初步了解,更好的參與進入 TiKV 的開發中。

需要注意,TiKV 使用 Rust 語言編寫,用戶需要對 Rust 語言有一個大概的了解。另外,本文檔並不會涉及到 TiKV 中心控制服務 Placement Driver(PD) 的詳細介紹,但是會說明一些重要流程 TiKV 是如何與 PD 交互的。

TiKV 是一個分散式的 KV 系統,它採用 Raft 協議保證數據的強一致性,同時使用 MVCC + 2PC 的方式實現了分散式事務的支持。

架構

TiKV 的整體架構比較簡單,如下:

Placement Driver : Placement Driver (PD) 負責整個集群的管理調度。

Node : Node 可以認為是一個實際的物理機器,每個 Node 負責一個或者多個 Store。

Store : Store 使用 RocksDB 進行實際的數據存儲,通常一個 Store 對應一塊硬碟。

Region : Region 是數據移動的最小單元,對應的是 Store 裡面一塊實際的數據區間。每個 Region 會有多個副本(replica),每個副本位於不同的 Store ,而這些副本組成了一個 Raft group。

Raft

TiKV 使用 Raft 演算法實現了分散式環境下面數據的強一致性,關於 Raft,可以參考論文 「In Search of an Understandable Consensus Algorithm」 以及官網,這裡不做詳細的解釋。簡單理解,Raft 是一個 replication log + State Machine 的模型,我們只能通過 leader 進行寫入,leader 會將 command 通過 log 的形式複製到 followers,當集群的大多數節點都收到了這個 log,我們就認為這個 log 是 committed,可以 apply 到 State Machine 裡面。

TiKV 的 Raft 主要移植 etcd Raft,支持 Raft 所有功能,包括:

  • Leader election

  • Log replicationLog compaction

  • Membership changesLeader transfer

  • Linearizable / Lease read

這裡需要注意,TiKV 以及 etcd 對於 membership change 的處理,跟 Raft 論文是稍微有一點不一樣的,主要在於 TiKV 的 membership change 只有在 log applied 的時候生效,這樣主要的目的是為了實現簡單,但有一個風險在於如果我們只有兩個節點,要從裡面移掉一個節點,如果一個 follower 還沒收到 ConfChange 的 log entry,leader 就當掉並且不可恢復了,整個集群就沒法工作了。所以通常我們都建議用戶部署 3 個或者更多個奇數個節點。

Raft 庫是一個獨立的庫,用戶也可以非常方便的將其直接嵌入到自己的應用程序,而僅僅只需要自行處理存儲以及消息的發送。這裡簡單介紹一下如何使用 Raft,代碼在 TiKV 源碼目錄的 /src/raft 下面。

Storage

首先,我們需要定義自己的 Storage,Storage 主要用來存儲 Raft 相關數據,trait 定義如下:

pub trait Storage {n fn initial_state(&self) -> Result<RaftState>;n fn entries(&self, low: u64, high: u64, max_size: u64) -> Result<Vec<Entry>>;n fn term(&self, idx: u64) -> Result<u64>;n fn first_index(&self) -> Result<u64>;n fn last_index(&self) -> Result<u64>;n fn snapshot(&self) -> Result<Snapshot>;n}n

我們需要實現自己的 Storage trait,這裡詳細解釋一下各個介面的含義:

initial_state:初始化 Raft Storage 的時候調用,它會返回一個 RaftState,RaftState 的定義如下:

pub struct RaftState {n pub hard_state: HardState,n pub conf_state: ConfState,n}n

HardState 和 ConfState 是 protobuf,定義:

message HardState {n optional uint64 term = 1; n optional uint64 vote = 2; n optional uint64 commit = 3; n}nnmessage ConfState {n repeated uint64 nodes = 1;n}n

在 HardState 裡面,保存著該 Raft 節點最後一次保存的 term 信息,之前 vote 的哪一個節點,以及已經 commit 的 log index。而 ConfState 則是保存著 Raft 集群所有的節點 ID 信息。

在外面調用 Raft 相關邏輯的時候,用戶需要自己處理 RaftState 的持久化。

entries: 得到 [low, high) 區間的 Raft log entry,通過 max_size 來控制最多返回多少個 entires。

term,first_index 和 last_index 分別是得到當前的 term,以及最小和最後的 log index。

snapshot:得到當前的 Storage 的一個 snapshot,有時候,當前的 Storage 數據量已經比較大,生成 snapshot 會比較耗時,所以我們可能得在另一個線程非同步去生成,而不用阻塞當前 Raft 線程,這時候,可以返回 SnapshotTemporarilyUnavailable 錯誤,這時候,Raft 就知道正在準備 snapshot,會一段時間之後再次嘗試。

需要注意,上面的 Storage 介面只是 Raft 庫需要的,實際我們還會用這個 Storage 存儲 raft log 等數據,所以還需要單獨提供其他的介面。在 Raft storage.rs 裡面,我們提供了一個 MemStorage,用於測試,大家也可以參考 MemStorage 來實現自己的 Storage。

Config

在使用 Raft 之前,我們需要知道 Raft 一些相關的配置,在 Config 裡面定義,這裡只列出需要注意的:

pub struct Config {n pub id: u64,n pub election_tick: usize,n pub heartbeat_tick: usize,n pub applied: u64,n pub max_size_per_msg: u64,n pub max_inflight_msgs: usize,n}n

id: Raft 節點的唯一標識,在一個 Raft 集群裡面,id 是不可能重複的。在 TiKV 裡面,id 的通過 PD 來保證全局唯一。

election_tick:當 follower 在 election_tick 的時間之後還沒有收到 leader 發過來的消息,那麼就會重新開始選舉,TiKV 默認使用 50。

heartbeat_tick: leader 每隔 hearbeat_tick 的時間,都會給 follower 發送心跳消息。默認 10。

applied: applied 是上一次已經被 applied 的 log index。

max_size_per_msg: 限制每次發送的最大 message size。默認 1MB。

max_inflight_msgs: 限制複製時候最大的 in-flight 的 message 的數量。默認 256。

這裡詳細解釋一下 tick 的含義,TiKV 的 Raft 是定時驅動的,假設我們每隔 100ms 調用一次 Raft tick,那麼當調用到 headtbeat_tick 的 tick 次數之後,leader 就會給 follower 發送心跳。

RawNode

我們通過 RawNode 來使用 Raft,RawNode 的構造函數如下:

pub fn new(config: &Config, store: T, peers: &[Peer]) -> Result<RawNode<T>>n

我們需要定義 Raft 的 Config,然後傳入一個實現好的 Storage,peers 這個參數只是用於測試,實際要傳空。生成好 RawNode 對象之後,我們就可以使用 Raft 了。我們關注如下幾個函數:

tick: 我們使用 tick 函數定期驅動 Raft,在 TiKV,我們每隔 100ms 調用一次 tick。

propose: leader 通過 propose 命令將 client 發過來的 command 寫入到 raft log,並複製給其他節點。

propose_conf_change: 跟 propose 類似,只是單獨用來處理 ConfChange 命令。

step: 當節點收到其他節點發過來的 message,主動調用驅動 Raft。

has_ready: 用來判斷一個節點是不是 ready 了。

ready: 得到當前節點的 ready 狀態,我們會在之前用 has_ready 來判斷一個 RawNode 是否 ready。

apply_conf_change: 當一個 ConfChange 的 log 被成功 applied,需要主動調用這個驅動 Raft。

advance: 告訴 Raft 已經處理完 ready,開始後續的迭代。

對於 RawNode,我們這裡重點關注下 ready 的概念,ready 的定義如下:

pub struct Ready {n pub ss: Option<SoftState>,n pub hs: Option<HardState>,n pub entries: Vec<Entry>,n pub snapshot: Snapshot,n pub committed_entries: Vec<Entry>,n pub messages: Vec<Message>,n}n

ss: 如果 SoftState 變更,譬如添加,刪除節點,ss 就不會為空。

hs: 如果 HardState 有變更,譬如重新 vote,term 增加,hs 就不會為空。

entries: 需要在 messages 發送之前存儲到 Storage。

snapshot: 如果 snapshot 不是 empty,則需要存儲到 Storage。

committed_entries: 已經被 committed 的 raft log,可以 apply 到 State Machine 了。

messages: 給其他節點發送的消息,通常需要在 entries 保存成功之後才能發送,但對於 leader 來說,可以先發送 messages,在進行 entries 的保存,這個是 Raft 論文裡面提到的一個優化方式,TiKV 也採用了。

當外部發現一個 RawNode 已經 ready 之後,得到 Ready,處理如下:

  1. 持久化非空的 ss 以及 hs。

  2. 如果是 leader,首先發送 messages。

  3. 如果 snapshot 不為空,保存 snapshot 到 Storage,同時將 snapshot 裡面的數據非同步應用到 State Machine(這裡雖然也可以同步 apply,但 snapshot 通常比較大,同步會 block 線程)。

  4. 將 entries 保存到 Storage 裡面。

  5. 如果是 follower,發送 messages。

  6. 將 committed_entries apply 到 State Machine。

  7. 調用 advance 告知 Raft 已經處理完 ready。

(未完待續…)

推薦閱讀:

MongoDB的水平擴展,你做對了嗎?
解析 TiDB 在線數據同步工具 Syncer
太閣技術秀:一起聊聊cassandra
PingCAP佈道Percona Live 2017 展示TiDB強悍性能

TAG:分布式数据库 |