TiKV 源碼解析系列——multi-raft 設計與實現

本系列文章主要面向 TiKV 社區開發者,重點介紹 TiKV 的系統架構,源碼結構,流程解析。目的是使得開發者閱讀之後,能對 TiKV 項目有一個初步了解,更好的參與進入 TiKV 的開發中。

需要注意,TiKV 使用 Rust 語言編寫,用戶需要對 Rust 語言有一個大概的了解。另外,本系列文章並不會涉及到 TiKV 中心控制服務 Placement Driver(PD) 的詳細介紹,但是會說明一些重要流程 TiKV 是如何與 PD 交互的。

TiKV 是一個分散式的 KV 系統,它採用 Raft 協議保證數據的強一致性,同時使用 MVCC + 2PC 的方式實現了分散式事務的支持。

本文為本系列文章第二節。

Placement Driver

在繼續之前,我們先簡單介紹一下 Placement Driver(PD)。PD 是 TiKV 的全局中央控制器,存儲整個 TiKV 集群的元數據信息,負責整個 TiKV 集群的調度,全局 ID 的生成,以及全局 TSO 授時等。

PD 是一個非常重要的中心節點,它通過集成 etcd,自動的支持了分散式擴展以及 failover,解決了單點故障問題。關於 PD 的詳細介紹,後續我們會新開一篇文章說明。

在 TiKV 裡面,跟 PD 的交互是放在源碼的 pd 目錄下,現在跟 PD 的交互都是通過自己定義的 RPC 實現,協議非常簡單,在 pd/mod.rs 裡面我們直接提供了用於跟 PD 進行交互的 Client trait,以及實現了 RPC Client。

PD 的 Client trait 非常簡單,多數都是對集群元信息的 set/get 操作,需要額外注意的幾個:

bootstrap_cluster:當我們啟動一個 TiKV 服務的時候,首先需要通過 is_cluster_bootstrapped 來判斷整個 TiKV 集群是否已經初始化,如果還沒有初始化,我們就會在該 TiKV 服務上面創建第一個 region。

region_heartbeat:定期 Region 向 PD 彙報自己的相關信息,供 PD 做後續的調度。譬如,如果一個 Region 給 PD 上報的 peers 的數量小於預設的副本數,那麼 PD 就會給這個 Region 添加一個新的副本 Peer。

store_heartbeat:定期 store 向 PD 彙報自己的相關信息,供 PD 做後續調度。譬如,Store 會告訴 PD 當前的磁碟大小,以及剩餘空間,如果 PD 發現空間不夠了,就不會考慮將其他的 Peer 遷移到這個 Store 上面。

ask_split/report_split:當 Region 發現自己需要 split 的時候,就 ask_split 告訴 PD,PD 會生成新分裂 Region 的 ID ,當 Region 分裂成功之後,會 report_split 通知 PD。

注意,後面我們會讓 PD 支持 gRPC 協議,所以 Client API 到時候可能會有變更。

Raftstore

因為 TiKV 目標是支持 100 TB+ 以上的數據,一個 Raft 集群是鐵定沒法支持這麼多數據的,所以我們需要使用多個 Raft 集群,也就是 Multi Raft。在 TiKV 裡面,Multi Raft 的實現是在 Raftstore 完成的,代碼在 raftstore/store 目錄。

Region

因為我們要支持 Multi Raft,所以我們需要將數據進行分片處理,讓每個 Raft 單獨負責一部分數據。

通常的數據分片演算法就是 Hash 和 Range,TiKV 使用的 Range 來對數據進行數據分片。為什麼使用 Range,主要原因是能更好的將相同前綴的 key 聚合在一起,便於 scan 等操作,這個 Hash 是沒法支持的,當然,在 split/merge 上面 Range 也比 Hash 好處理很多,很多時候只會涉及到元信息的修改,都不用大範圍的挪動數據。

當然,Range 有一個問題在於很有可能某一個 Region 會因為頻繁的操作成為性能熱點,當然也有一些優化的方式,譬如通過 PD 將這些 Region 調度到更好的機器上面,提供 Follower 分擔讀壓力等。

總之,在 TiKV 裡面,我們使用 Range 來對數據進行切分,將其分成一個一個的 Raft Group,每一個 Raft Group,我們使用 Region 來表示。

Region 的 protobuf 協議定義如下:

message Region {n optional uint64 id = 1 [(gogoproto.nullable) = false];n optional bytes start_key = 2;n optional bytes end_key = 3;n optional RegionEpoch region_epoch = 4;n repeated Peer peers = 5;n}nnmessage RegionEpoch {n optional uint64 conf_ver = 1 [(gogoproto.nullable) = false];n optional uint64 version = 2 [(gogoproto.nullable) = false];n}nnmessage Peer { n optional uint64 id = 1 [(gogoproto.nullable) = false]; n optional uint64 store_id = 2 [(gogoproto.nullable) = false];n}n

id:Region 的唯一表示,通過 PD 全局唯一分配。

start_key, end_key:用來表示這個 Region 的範圍 [start_key, end_key),對於最開始的 region,start 和 end key 都是空,TiKV 內部會特殊處理。

region_epoch:當一個 Region 添加或者刪除 Peer,或者 split 等,我們就會認為這個 Region 的 epoch 發生的變化,RegionEpoch 的 conf_ver 會在每次做 ConfChange 的時候遞增,而 version 則是會在每次做 split/merge 的時候遞增。

peers:當前 Region 包含的節點信息。對於一個 Raft Group,我們通常有三個副本,每個副本我們使用 Peer 來表示,Peer 的 id 也是全局由 PD 分配,而 store_id 則表明這個 Peer 在哪一個 Store 上面。

RocksDB / Keys Prefix

對於實際數據存儲,無論是 Raft Meta,Log,還是 State Machine 的 data,我們都存到一個 RocksDB 實例裡面。關於 RocksDB,可以詳細參考 facebook/rocksdb。

我們使用不同的前綴來對 Raft 以及 State Machine 等數據進行區分,具體可以參考 raftstore/store/keys.rs,對於 State Machine 實際的 data 數據,我們統一添加 『z』 前綴。而對於其他會存在本地的元數據(包括 Raft),我們統一添加 0x01 前綴。

這裡簡單說明一下一些重要元數據的 Key 格式,我們忽略最開始的 0x01 前綴。

  • 0x01:用於存放StoreIdent,在初始化這個 Store 的時候,我們會將 Store 的 Cluster ID,Store ID 等信息存儲到這個 key 裡面。

  • 0x02:用來存儲 Raft 一些信息,0x02 之後會緊跟該 Raft Region 的 ID(8位元組大端序 ),然後在緊跟一個 Suffix 來標識不同的子類型:

    • 0x01:用於存放 Raft Log,後面緊跟 Log Index(8位元組大端序)

    • 0x02:用於存放 RaftLocalState

    • 0x03:用於存放 RaftApplyState

  • 0x03:用來存儲 Region 本地的一些元信息,0x03 之後緊跟 Raft Region ID,隨後在緊跟一個 Suffix 來表示不同的子類型:

    • 0x01:用於存放 RegionLocalState

對於上面提到的幾個類型,都在 protobuf 裡面定義:

message RaftLocalState {n optional eraftpb.HardState hard_state = 1;n optional uint64 last_index = 2;n}nnmessage RaftApplyState {n optional uint64 applied_index = 1;n optional RaftTruncatedState truncated_state = 2;n}nnenum PeerState {n Normal = 0;n Applying = 1;n Tombstone = 2;n}nnmessage RegionLocalState {n optional PeerState state = 1;n optional metapb.Region region = 2;n}n

RaftLocalState: 用於存放當前 Raft 的 HardState 以及最後一個 Log index。

RaftApplyState: 用於存放當前 Raft 最後 apply 的 Log index 以及被 truncated 的 Log 信息。

RegionLocalStaste: 用於存放 Region 信息以及在該 Store 上面對應的 Peer 狀態,Normal 表明是一個正常的 Peer,Applying 表明該 Peer 還沒做完 apply snapshot 的操作,而 Tombstone 則表明該 Peer 已經被移除出了 Region,不能在參與到 Raft Group 裡面。

Peer Storage

前面已經知道,我們通過 RawNode 來使用 Raft。因為一個 Region 對應的一個 Raft Group,Region 裡面的 Peer 就對應的是一個 Raft 副本。所以,我們在 Peer 裡面封裝了對 RawNode 的操作。

要使用 Raft,我們需要定義自己的 Storage,這在 raftstore/store/peer_storage.rs 的 PeerStorage 類裡面實現。

當創建 PeerStorage 的時候,首先我們會從 RocksDB 裡面得到該 Peer 之前的 RaftLocalState,RaftApplyState,以及 last_term 等,這些會緩存到內存裡面,便於後續的快速度訪問。

PeerStorage 需要注意幾個地方:

首先就是 RAFT_INIT_LOG_TERM 和 RAFT_INIT_LOG_INDEX,它們的值都是 5(只要大於 1 都可以)。在 TiKV 裡面,一個 Peer 的創建有如下幾種方式:

  1. 主動創建,通常對於第一個 Region 的第一個副本 Peer,我們採用這樣的創建方式,初始化的時候,我們會將它的 Log Term 和 Index 設置為 5。

  2. 被動創建,當一個 Region 添加一個副本 Peer 的時候,當這個 ConfChange 命令被 applied 之後, Leader 會給這個新增 Peer 所在的 Store 發送 Message,Store 收到這個 Message 之後,發現並沒有相應的 Peer 存在,並且確定這個 Message 是合法的,就會創建一個對應的 Peer,但此時這個 Peer 是一個未初始化的 Peer,不知道所在的 Region 任何的信息,我們使用 0 來初始化它的 Log Term 和 Index。Leader 就能知道這個 Follower 並沒有數據(0 到 5 之間存在 Log 缺口),Leader 就會給這個 Follower 直接發送 snapshot。

  3. Split 創建,當一個 Region 分裂成兩個 Region,其中一個 Region 會繼承分裂之前 Region 的元信息,只是會將自己的 Range 範圍修改。而另一個 Region 相關的元信息,則會新建,新建的這個 Region 對應的 Peer,初始的 Log Term 和 Index 也是 5,因為這時候 Leader 和 Follower 都有最新的數據,不需要 snapshot。(注意:實際 Split 的情況非常的複雜,有可能也會出現發送 snapshot 的情況,但這裡不做過多說明)。

然後就是需要注意 snapshot 的處理。無論 generate 還是 apply snapshot,都是一件比較費時的操作,為了不讓 snapshot 的處理卡主整個 Raft 線程,PeerStore 都是會先只同步更新 snapshot 相關的元信息,這樣就不用阻礙後續的 Raft 流程,然後會在另一個線程非同步的進行 snapshot 的操作。PeerStorage 會維護一個 snapshot 的 state,如下:

pub enum SnapState {n Relax,n Generating(Receiver<Snapshot>),n Applying(Arc<AtomicUsize>),n ApplyAborted,n}n

這裡注意 Generating 是一個 channel Receiver,當非同步 snapshot 生成好之後,就會給這個 channel 發送消息,這樣下一次 Raft 檢查的時候,就能直接從這個 channel 得到 snapshot 了。Applying 是一個共享的原子整數,這樣就能多線程去判斷當前 applying 的狀態,包括:

pub const JOB_STATUS_PENDING: usize = 0;npub const JOB_STATUS_RUNNING: usize = 1;npub const JOB_STATUS_CANCELLING: usize = 2;npub const JOB_STATUS_CANCELLED: usize = 3;npub const JOB_STATUS_FINISHED: usize = 4;npub const JOB_STATUS_FAILED: usize = 5;n

譬如,如果狀態是 JOB_STATUS_RUNNING,那麼表明當前正在進行 applying snapshot 的操作。現階段,我們是不允許 FAILED 的,也就是如果 apply snapshot 失敗,我們會 panic。

Peer

Peer 封裝了 Raft RawNode,我們對 Raft 的 Propose,ready 的處理都是在 Peer 裡面完成的。

首先關注 propose 函數,Peer 的 propose 是外部 Client command 的入口。Peer 會判斷這個 command 的類型:

  • 如果是只讀操作,並且 Leader 仍然是在 lease 有效期內,Leader 就能直接提供 local read,不需要走 Raft 流程。

  • 如果是 Transfer Leader 操作,Peer 首先會判斷自己還是不是 Leader,同時判斷需要變成新 Leader 的 Follower 是不是有足夠新的 Log,如果條件都滿足,Peer 就會調用 RawNode 的 transfer_leader 命令。

  • 如果是 Change Peer 操作,Peer 就會調用 RawNode propose_conf_change。

  • 剩下的,Peer 會直接調用 RawNode 的 propose。

在 propose 之前,Peer 也會將這個 command 對應的 callback 存到 PendingCmd 裡面,當對應的 log 被 applied 之後,會通過 command 裡面唯一的 uuid 找到對應的 callback 調用,並給 Client 返回相應的結果。

另一個需要關注的就是 Peer 的 handle_raft_ready 系列函數,在之前 Raft 章節裡面介紹過,當一個 RawNode ready 之後,我們需要對 ready 裡面的數據做一系列處理,包括將 entries 寫入 Storage,發送 messages,apply committed_entries 以及 advance 等。這些全都在 Peer 的 handle_raft_ready 系列函數裡面完成。

對於 committed_entries 的處理,Peer 會解析實際的 command,調用對應的處理流程,執行對應的函數,譬如 exec_admin_cmd 就執行 ConfChange,Split 等 admin 命令,而 exec_write_cmd 則執行通常的對 State Machine 的數據操作命令。為了保證數據的一致性,Peer 在 execute 的時候,都只會將修改的數據保存到 RocksDB 的 WriteBatch 裡面,然後在最後原子的寫入到 RocksDB,寫入成功之後,才修改對應的內存元信息。如果寫入失敗,我們會直接 panic,保證數據的完整性。

在 Peer 處理 ready 的時候,我們還會傳入一個 Transport 對象,用來讓 Peer 發送 message,Transport 的 trait 定義如下:

pub trait Transport: Send + Clone {n fn send(&self, msg: RaftMessage) -> Result<()>;n}n

它就只有一個函數 send,TiKV 實現的 Transport 會將需要 send 的 message 發到 Server 層,由 Server 層發給其他的節點。

Multi Raft

Peer 只是單個 Region 的副本,因為 TiKV 是支持 Multi Raft,所以對於一個 Store 來說,我們需要管理多個 Region 的副本,這些都是在 Store 類裡面統一進行管理的。

Store 會保存所有的 Peers 信息,使用:region_peers: HashMap<u64, Peer>

region_peers 的 key 就是 Region ID,而 Peer 則是該 Region 在該 Store 上面的副本 Peer。

Store 使用 mio 驅動整個流程(後續我們會使用 tokio-core 來簡化非同步邏輯處理)。

我們在 mio 裡面註冊一個 base Raft Tick,每隔 100ms,調用一次,Store 會遍歷所有的 Peer,一次調用對應的 RawNode tick 函數,驅動 Raft。

Store 通過 mio 的 notify 機制,接受外面 Client 的請求處理,以及其他 Store 發過來的 Raft message。 譬如收到 Msg::RaftCmd 消息之後,Store 就會調用 propose_raft_command 來處理,而收到 Msg::RaftMessage 消息之後,Store 就會調用 on_raft_message 來處理。

在每次 EventLoop 循環的最後,也就是 mio 的 tick 回調裡面,Store 會進行 on_raft_ready 的處理:

  1. Store 會遍歷所有的 ready Peers,調用 handle_raft_ready_append,我們會使用一個 WriteBatch 來處理所有的 ready append 數據,同時保存相關的結果。

  2. 如果 WriteBatch 成功,會依次調用 post_raft_ready_append,這裡主要用來處理Follower 的消息發送(Leader 的消息已經在 handle_raft_ready_append 裡面完成)。

  3. 然後,Store 會依次調用 handle_raft_ready_apply,apply 相關 committed entries,然後調用 on_ready_result 處理最後的結果。

Server

Server 層就是 TiKV 的網路層,現階段,TiKV 使用 mio 來實現整個網路的處理,而網路協議則是使用自定義的,如下:

message = header + body nheader: | 0xdaf4(2 bytes magic value) | 0x01(version 2 bytes) | msg_len(4 bytes) | msg_id(8 bytes) |n

任何一個 message,我們都使用 header + body 的方式,body 就是實際的 message 數據,使用 protobuf 編碼,而 header,首先就是兩個位元組的 magic value,0xdaf4,然後就是版本號,再就是 message 的整個長度,以及 message 的唯一 ID。

對於 mio,在 Linux 下面就是封裝的 epoll,所以熟悉 epoll 的用戶應該能非常方便的使用 mio 進行網路開發,簡單流程如下:

  • bind 一個埠,生成一個 TcpListener 對象,並且 register 到 mio。

  • 處理 TcpListener on_readable 的回調,調用 accept 函數得到生成的 socket TcpStream,register 到 mio,我們後續就用這個 TcpStream 跟客戶端進行交互。

  • TcpStream 處理 on_readable 或者 on_writable 的回調。

同時,Server 通過 mio 的 notify 來接受外面發過來的消息,譬如 TiKV 實現的 Transport,就是 Peer 在調用 send 的時候,將這個 message 直接通過 channel 發給 Server,然後在 notify 裡面處理,找到對應的 Store connection,再發送給遠端的 Store 的。

對於 snapshot 的發送,Server 會單獨新開一個連接,直接使用一個線程同步發送,這樣代碼邏輯就會簡單很多,不需要處理過多的非同步 IO 邏輯。而對於接收端來說,在收到一個 message 的時候,會首先看這個 message 的類型,如果發現是 snapshot 的,則會進入接受 snapshot 的流程,會將收到的數據直接發給 snapshot 相關的線程,寫到對應的 snapshot 文件裡面。如果是其他的 message,也會直接 dispatch 到對應的處理邏輯處理,可以參考 Server 的 on_conn_msg 函數。

因為 Server 就是對網路 IO 的處理,邏輯比較簡單,這裡就不過多說明,但是,鑒於現階段 TiKV 使用的是自定義的網路協議,並不利於跟外部其他客戶端的對接,並且也沒有 pipeline,stream 等優秀特性的 支持,所以後續我們會換成 gRPC。

總結

這裡,我們解釋了 TiKV 核心的 Raft 庫,Multi Raft。在後續的章節,我們會介紹 Transaction,Coprocessor 以及 PD 是如何對整個集群進行變更的。(第二部分完結)

推薦閱讀:

MongoDB的水平擴展,你做對了嗎?
TiDB 增加 MySQL 內建函數
解析 TiDB 在線數據同步工具 Syncer
TiKV 源碼解析系列——如何使用 Raft

TAG:分布式数据库 |