TiKV 源碼解析系列——Placement Driver

本系列文章主要面向 TiKV 社區開發者,重點介紹 TiKV 的系統架構,源碼結構,流程解析。目的是使得開發者閱讀之後,能對 TiKV 項目有一個初步了解,更好的參與進入 TiKV 的開發中。

TiKV 是一個分散式的 KV 系統,它採用 Raft 協議保證數據的強一致性,同時使用 MVCC + 2PC 的方式實現了分散式事務的支持。

本文為本系列文章第三節。

介紹

Placement Driver (後續以 PD 簡稱) 是 TiDB 裡面全局中心總控節點,它負責整個集群的調度,負責全局 ID 的生成,以及全局時間戳 TSO 的生成等。PD 還保存著整個集群 TiKV 的元信息,負責給 client 提供路由功能。

作為中心總控節點,PD 通過集成 etcd ,自動的支持 auto failover,無需擔心單點故障問題。同時,PD 也通過 etcd 的 raft,保證了數據的強一致性,不用擔心數據丟失的問題。

在架構上面,PD 所有的數據都是通過 TiKV 主動上報獲知的。同時,PD 對整個 TiKV 集群的調度等操作,也只會在 TiKV 發送 heartbeat 命令的結果裡面返回相關的命令,讓 TiKV 自行去處理,而不是主動去給 TiKV 發命令。這樣設計上面就非常簡單,我們完全可以認為 PD 是一個無狀態的服務(當然,PD 仍然會將一些信息持久化到 etcd),所有的操作都是被動觸發,即使 PD 掛掉,新選出的 PD leader 也能立刻對外服務,無需考慮任何之前的中間狀態。

初始化

PD 集成了 etcd,所以通常,我們需要啟動至少三個副本,才能保證數據的安全。現階段 PD 有集群啟動方式,initial-cluster 的靜態方式以及 join 的動態方式。

在繼續之前,我們需要了解下 etcd 的埠,在 etcd 裡面,默認要監聽 2379 和 2380 兩個埠。2379 主要是 etcd 用來處理外部請求用的,而 2380 則是 etcd peer 之間相互通信用的。

假設現在我們有三個 pd,分別為 pd1,pd2,pd3,分別在 host1,host2,host3 上面。

對於靜態初始化,我們直接在三個 PD 啟動的時候,給 initial-cluster 設置 pd1=host1:2380,pd2=host2:2380,pd3=host3:2380

對於動態初始化,我們先啟動 pd1,然後啟動 pd2,加入到 pd1 的集群裡面,join 設置為 pd1=host1:2379。然後啟動 pd3,加入到 pd1,pd2 形成的集群裡面, join 設置為 pd1=host1:2379

可以看到,靜態初始化和動態初始化完全走的是兩個埠,而且這兩個是互斥的,也就是我們只能使用一種方式來初始化集群。etcd 本身只支持 initial-cluster 的方式,但為了方便,PD 同時也提供了 join 的方式。

join 主要是用了 etcd 自身提供的 member 相關 API,包括 add member,list member 等,所以我們使用 2379 埠,因為需要將命令發到 etcd 去執行。而 initial-cluster 則是 etcd 自身的初始化方式,所以使用的 2380 埠。

相比於 initial-cluster,join 需要考慮非常多的 case(在 server/join.go prepareJoinCluster 函數裡面有詳細的解釋),但 join 的使用非常自然,後續我們會考慮去掉 initial-cluster 的初始化方案。

選舉

當 PD 啟動之後,我們就需要選出一個 leader 對外提供服務。雖然 etcd 自身也有 raft leader,但我們還是覺得使用自己的 leader,也就是 PD 的 leader 跟 etcd 自己的 leader 是不一樣的。

當 PD 啟動之後,Leader 的選舉如下:

  1. 檢查當前集群是不是有 leader,如果有 leader,就 watch 這個 leader,只要發現 leader 掉了,就重新開始 1。

  2. 如果沒有 leader,開始 campaign,創建一個 Lessor,並且通過 etcd 的事務機制寫入相關信息,如下:

    // Create a lessor. ctx, cancel := context.WithTimeout(s.client.Ctx(), requestTimeout)leaseResp, err := lessor.Grant(ctx, s.cfg.LeaderLease)cancel()// The leader key must not exist, so the CreateRevision is 0.resp, err := s.txn(). If(clientv3.Compare(clientv3.CreateRevision(leaderKey), "=", 0)). Then(clientv3.OpPut(leaderKey, s.leaderValue, clientv3.WithLease(clientv3.LeaseID(leaseResp.ID)))). Commit()

    如果 leader key 的 CreateRevision 為 0,表明其他 PD 還沒有寫入,那麼我就可以將我自己的 leader 相關信息寫入,同時會帶上一個 Lease。如果事務執行失敗,表明其他的 PD 已經成為了 leader,那麼就重新回到 1。

  3. 成為 leader 之後,我們對定期進行保活處理:

    // Make the leader keepalived.ch, err := lessor.KeepAlive(s.client.Ctx(), clientv3.LeaseID(leaseResp.ID))if err != nil { return errors.Trace(err)}

    當 PD 崩潰,原先寫入的 leader key 會因為 lease 到期而自動刪除,這樣其他的 PD 就能 watch 到,重新開始選舉。

  4. 初始化 raft cluster,主要是從 etcd 裡面重新載入集群的元信息。拿到最新的 TSO 信息:

    // Try to create raft cluster.err = s.createRaftCluster()if err != nil { return errors.Trace(err)}log.Debug("sync timestamp for tso")if err = s.syncTimestamp(); err != nil { return errors.Trace(err)}

  5. 所有做完之後,開始定期更新 TSO,監聽 lessor 是否過期,以及外面是否主動退出:

    for { select { case _, ok := <-ch: if !ok { log.Info("keep alive channel is closed") return nil } case <-tsTicker.C: if err = s.updateTimestamp(); err != nil { return errors.Trace(err) } case <-s.client.Ctx().Done(): return errors.New("server closed") }}

TSO

前面我們說到了 TSO,TSO 是一個全局的時間戳,它是 TiDB 實現分散式事務的基石。所以對於 PD 來說,我們首先要保證它能快速大量的為事務分配 TSO,同時也需要保證分配的 TSO 一定是單調遞增的,不可能出現回退的情況。

TSO 是一個 int64 的整形,它由 physical time + logical time 兩個部分組成。Physical time 是當前 unix time 的毫秒時間,而 logical time 則是一個最大 1 << 18 的計數器。也就是說 1ms,PD 最多可以分配 262144 個 TSO,這個能滿足絕大多數情況了。

對於 TSO 的保存於分配,PD 會做如下處理:

  1. 當 PD 成為 leader 之後,會從 etcd 上面獲取上一次保存的時間,如果發現本地的時間比這個大,則會繼續等待直到當前的時間大於這個值:

    last, err := s.loadTimestamp()if err != nil { return errors.Trace(err)}var now time.Timefor { now = time.Now() if wait := last.Sub(now) + updateTimestampGuard; wait > 0 { log.Warnf("wait %v to guarantee valid generated timestamp", wait) time.Sleep(wait) continue } break}

  2. 當 PD 能分配 TSO 之後,首先會向 etcd 申請一個最大的時間,譬如,假設當前時間是 t1,每次最多能申請 3s 的時間窗口,PD 會向 etcd 保存 t1 + 3s 的時間值,然後 PD 就能在內存裡面直接使用這一段時間窗口.噹噹前的時間 t2 大於 t1 + 3s 之後,PD 就會在向 etcd 繼續更新為 t2 + 3s:

    if now.Sub(s.lastSavedTime) >= 0 { last := s.lastSavedTime save := now.Add(s.cfg.TsoSaveInterval.Duration) if err := s.saveTimestamp(save); err != nil { return errors.Trace(err) }}

    這麼處理的好處在於,即使 PD 當掉,新啟動的 PD 也會從上一次保存的最大的時間之後開始分配 TSO,也就是 1 處理的情況。

  3. 因為 PD 在內存裡面保存了一個可分配的時間窗口,所以外面請求 TSO 的時候,PD 能直接在內存裡面計算 TSO 並返回。

    resp := pdpb.Timestamp{}for i := 0; i < maxRetryCount; i++ { current, ok := s.ts.Load().(*atomicObject) if !ok { log.Errorf("we havent synced timestamp ok, wait and retry, retry count %d", i) time.Sleep(200 * time.Millisecond) continue } resp.Physical = current.physical.UnixNano() / int64(time.Millisecond) resp.Logical = atomic.AddInt64(&current.logical, int64(count)) if resp.Logical >= maxLogical { log.Errorf("logical part outside of max logical interval %v, please check ntp time, retry count %d", resp, i) time.Sleep(updateTimestampStep) continue } return resp, nil}

    因為是在內存裡面計算的,所以性能很高,我們自己內部測試每秒能分配百萬級別的 TSO。

  4. 如果 client 每次事務都向 PD 來請求一次 TSO,每次 RPC 的開銷也是非常大的,所以 client 會批量的向 PD 獲取 TSO。client 會首先收集一批事務的 TSO 請求,譬如 n 個,然後直接向 PD 發送命令,參數就是 n,PD 收到命令之後,會生成 n 個 TSO 返回給客戶端。

心跳

在最開始我們說過,PD 所有關於集群的數據都是由 TiKV 主動心跳上報的,PD 對 TiKV 的調度也是在心跳的時候完成的。通常 PD 會處理兩種心跳,一個是 TiKV 自身 store 的心跳,而另一個則是 store 裡面 region 的 leader peer 上報的心跳。

對於 store 的心跳,PD 在 handleStoreHeartbeat 函數裡面處理,主要就是將心跳裡面當前的 store 的一些狀態緩存到 cache 裡面。store 的狀態包括該 store 有多少個 region,有多少個 region 的 leader peer 在該 store 上面等,這些信息都會用於後續的調度。

對於 region 的心跳,PD 在 handleRegionHeartbeat 裡面處理。這裡需要注意,只有 leader peer 才會去上報所屬 region 的信息,follower peer 是不會上報的。收到 region 的心跳之後,首先 PD 也會將其放入 cache 裡面,如果 PD 發現 region 的 epoch 有變化,就會將這個 region 的信息也保存到 etcd 裡面。然後,PD 會對這個 region 進行具體的調度,譬如發現 peer 數目不夠,添加新的 peer,或者有一個 peer 已經壞了,刪除這個 peer 等,詳細的調度實現,我們會在後續討論。

這裡再說一下 region 的 epoch,在 region 的 epoch 裡面,有 conf_ver 和 version,分別表示這個 region 不同的版本狀態。如果一個 region 發生了 membership changes,也就是新增或者刪除了 peer,conf_ver 會加 1,如果 region 發生了 split 或者 merge,則 version 加 1。

無論是 PD 還是在 TiKV,我們都是通過 epoch 來判斷 region 是否發生了變化,從而拒絕掉一些危險的操作。譬如 region 已經發生了分裂,version 變成了 2,那麼如果這時候有一個寫請求帶上的 version 是 1, 我們就會認為這個請求是 stale,會直接拒絕掉。因為 version 變化表明 region 的範圍已經發生了變化,很有可能這個 stale 的請求需要操作的 key 是在之前的 region range 裡面而沒在新的 range 裡面。

Split / Merge

前面我們說了,PD 會在 region 的 heartbeat 裡面對 region 進行調度,然後直接在 heartbeat 的返回值裡面帶上相關的調度信息,讓 TiKV 自己去處理,TiKV 處理完成之後,通過下一個 heartbeat 重新上報,PD 就能知道是否調度成功了。

對於 membership changes,比較容易,因為我們有最大副本數的配置,假設三個,那麼當 region 的心跳上來,發現只有兩個 peer,那麼就 add peer,如果有四個 peer,就 remove peer。而對於 region 的 split / merge,則情況稍微要複雜一點,但也比較簡單。注意,現階段,我們只支持 split,merge 處於開發階段,沒對外發布,所以這裡僅僅以 split 舉例:

  1. 在 TiKV 裡面,leader peer 會定期檢查 region 所佔用的空間是否超過某一個閥值,假設我們設置 region 的 size 為 64MB,如果一個 region 超過了 96MB, 就需要分裂。

  2. Leader peer 會首先向 PD 發送一個請求分裂的命令,PD 在 handleAskSplit 裡面處理,因為我們是一個 region 分裂成兩個,對於這兩個新分裂的 region,一個會繼承之前 region 的所有的元信息,而另一個相關的信息,譬如 region ID,新的 peer ID,則需要 PD 生成,並將其返回給 leader。

  3. Leader peer 寫入一個 split raft log,在 apply 的時候執行,這樣 region 就分裂成了兩個。

  4. 分裂成功之後,TiKV 告訴 PD,PD 就在 handleReportSplit 裡面處理,更新 cache 相關的信息,並持久化到 etcd。

路由

因為 PD 保存了所有 TiKV 的集群信息,自然對 client 提供了路由的功能。假設 client 要對 key 寫入一個值。

  1. client 先從 PD 獲取 key 屬於哪一個 region,PD 將這個 region 相關的元信息返回。

  2. client 自己 cache,這樣就不需要每次都從 PD 獲取。然後直接給 region 的 leader peer 發送命令。

  3. 有可能 region 的 leader 已經漂移到其他 peer,TiKV 會返回 NotLeader 錯誤,並帶上新的 leader 的地址,client 在 cache 裡面更新,並重新向新的 leader 發送請求。

  4. 也有可能 region 的 version 已經變化,譬如 split 了,這時候,key 可能已經落入了新的 region 上面,client 會收到 StaleCommand 的錯誤,於是重新從 PD 獲取,進入狀態 1。

小結

PD 作為 TiDB 集群的中心調度模塊,在設計上面,我們盡量保證無狀態,方便擴展。本篇文章主要介紹了 PD 是如何跟 TiKV,TiDB 協作交互的。後面,我們會詳細地介紹核心調度功能,也就是 PD 是如何控制整個集群的。


推薦閱讀:

跨數據中心一致性是如何實現的?
對資料庫和分散式很感興趣,學習路線是什麼?

TAG:分布式数据库 |