Omid Transaction Processing
I. Omid簡介和動機[1]
Omid由Yahoo開發, 為Yahoo內容管理系統Sieve提供ACID事務處理服務, 現已經成為Apache Incubator項目[2].
Omid是over NoSQL資料庫的分散式事務處理協議. 為HBase[3]增添了的cross-row transaction(或名為multirow transaction)[4]的處理能力. 採用事務處理層和存儲層解耦分離的設計, 避免了事務處理向存儲層侵人; 因此可作為通用事務處理框架, 嫁接於其他key-value資料庫之上.
Internet-scale data processing pipeline是Omid的典型應用場景, 其思路為: 數據從輸入到輸出結果, 需要經過多個stage的處理, 所有stage構成pipeline. 不同於MapReduce的批量處理, pipeline提供的增量處理更高效省錢; 與傳統Streaming處理採用通信銜接stage不同的是, Omid和Percolator[4]可讓pipeline採用共享存儲的方式傳遞處理結果. Omid和Percolator以事務處理的方式解決了, 並行計算的stage所面臨的訪問共享數據的data race問題.
Omid採用單一中心化Transaction Manager(abbr. TM), 做事務並發控制. 一般地, 單一中心節點會成為性能瓶頸, 出現單點故障影響系統的可用性.
Omid結合應用特點和自身架構, 對TM做了很多提升TPS的優化, 有些優化(batching)可能會增大RT; 對於latency-sensitive的conversational事務和throughput-sensitive的queued short-lived update[5]事務, Omid更適合後者.
TM的容錯直接影響Omid可用性, 其HA機制略複雜, 總體上採用failstop+failover, 並且也解決了failover期間的write-too-late問題, 後續會詳細展開.
Omid採用MVCC+SI作為並發控制協議, 比較適用於衝突極少的場景, 不適合long-lived update事務和衝突頻繁的更新事務; 在Sieve中, 據觀察, 衝突引起的abort rate不超過0.002%.
在某些場景下, Omid的TPS可達50K/s; Latency的影響因素很多, 有幾十ms~上百ms, 甚至超過1s; 取決於系統負載, batch size, 事務訪問元素數量, region server的個數等等.
本文的論述採用由淺入深的方式, 比如: 衝突檢測部分, 會先做極大的簡化, 然後深入其中; TM的HA方案中, 會先講解write-too-late問題; 看過論文的讀者, 可能會意識到本文的開始部分和論文略有出入, 希望堅持自己的勇氣讀完. 事務處理協議, 複雜並且缺乏直觀, 難以建立了良好的直覺. 前期的細節爆炸對理解有誤導作用, 反而不美.
為了排版方便, 本文採用記號不同於原文, 但從字面理解其含義也不難.
本文為了消除歧義, 參考其他文獻, 對一些術語做了調整:
- start_ts, 表示原文中的read timestamp[6,7];
- 用key的fingerprint或者直接用fingerprint指示key的64bit hash值[8].
II. Omid架構
架構圖傳遞三方面信息: 怎麼部署? 組件有哪些職責? 組件如何交互?
部署
構成Omid三個組件為: Client, TM和Persistent Storage.
組件的角色職責
- Client: 發起事務, 讀, 寫私有副本, 提交和安裝新版本.
- TM: 處理Client的請求, 分配時間戳, 衝突檢測, 提交事務, 寫Persistent Storage和高可用.
- Persistent Storage: 提供可靠的多版本的持久化key-value存儲, 支持單行原子操作或事務.
組件交互(data flow/control flow)
以Update事務為例說明組件交互.
- Client執行BEGIN操作發起事務, 請求TM分配txid(txid即為start_ts).
- TM管理邏輯時鐘, 產生嚴格單調遞增序列, 給Client分配txid. txid即具有唯一性以區分不同事務, 又可以作為key-value的版本編號, 還能用於判斷事務的執行區間是否重疊.
- Client執行讀操作GET/READ, 從Data Store中獲取數據.
- Client根據讀出的值, 計算結果, 調用函數PUT/WRITE執行tentative write, 試探性地往Data Store寫私有版本.
- Client執行COMMIT操作發起提交, TM分配commit_ts, 做backward衝突檢測, 決定事務夭折還是提交; 如果提交, 則調用UpdateCT函數寫(txid, commit_ts)到CT表中並落盤, 作為事務提交的標誌.
- Client根據TM提交返回的結果, 完成後續操作. 如果事務夭折, 則撤銷tentative write創建的私有版本; 如果事務提交成功, 則安裝tentative write創建的私有版本, 並且刪除提交信息.
總結 Omid的更新事務可以分為begin, read, tentative write, commit和 install五部分; 只讀事務是更新事務的特例, aborted事務選擇rollback撤銷tentative write.
III. Omid數據模型和事務處理
Omid事務簡言之
- 事務所訪問的數據為共享的可靠的持久化的多版本的key-value item集合;
- 事務是對若干item的執行read/write(i.e. get/put)操作的序列;
- 該操作序列為begin和commit所包圍, 具有ACID特性.
DT表和CT表
Omid在HBase中創建Data Table(abbr. DT)和Commit Table(abbr. CT), 用以存儲item和commit信息.
注意: 後文中出現SQL語句只是為了從概念上理解Omid系統, 有別於系統具體實現.
DT和CT可用如下schema表示.
CREATE TABLE DT ( key VARBINARY(1024), value VARBINARY(1024), txid BIGINT, # txid即為start_ts commit_ts BIGINT, PRIMARY KEY(key, txid)); CREATE TABLE CT ( txid BIGINT PRIMARY, commit_ts BIGINT UNIQUE);
DT表中的單行記錄表示item的一個版本. 事務更新某key時, 在DT表中插入一條記錄, txid和commit_ts保存事務ID(開始時間戳start_ts)和提交時間戳commit_ts. 例如: txid=100的事務創建的key為Alice的版本, 取值為100, 提交時間為101.
CT表中的單行記錄表示committed事務的提交信息, 類似於WAL log中的<COMMIT T>
標記, uncommitted事務一定不會出現在CT表中, complete事務在CT表中的記錄, 可以刪除. 例如: txid=110的事務的提交時間戳為101.
事務的簡單例子
T: Alice向Bob轉50$, 操作序列為:beginr[Alice]r[Bob]...計算...w[Alice]w[Bob]commit假設上文DT表的記錄表示Alice和Bob當前的賬面餘額分別為100$和200$.
(1). begin T:
TM返回txid=130
(2). read:
分別讀DT表中Alice和Bob當前賬面餘額. Alice和Bob記錄的提交時間為101和120, 小於T.txid; 因此對T可見, 返回100$和200$
(3). tentative write:
分別更新DT表中Alice和Bob的賬面餘額.此時的寫屬於tentative write, 不會對原有到記錄作in-place修改,而是插入新的記錄. 分別插入下面記錄到DT表中: (Alice, 50, 130, NULL); (BOB, 250, 130, NULL).commit_ts取值NULL標記DT表中的紀錄為tentative version.
(4). commit T
TM給T分配commit_ts=137;然後做衝突檢查, T通過檢查, 決定提交T;通過UpdateCT插入記錄(T.txid, T.commit_ts)到CT表;CT表更新後, 返回T.commit_ts;
(5). install:
事務繼續roll forward. Client對DT表中(key=Alice,txid=T.txid)的記錄做in-place更新, 將其commit_ts修改T.commit_ts,對(key=Bob, txid=T.txid)的記錄也做同樣處理. 修改完DT表後, 刪除CT表中記錄(T.txid, T.commit_ts). 至此事務完成.
總結:
Omid事務是對若干reliable persistent multi-versioned key-value item執行get/put操作的序列;
操作序列被begin和end包圍, 具有ACID特性;
Omid事務用DT表存儲committed/tentative update, 用CT表存儲事務的提交標記.
IV: Omid事務Atomicity/Durability/Recovery
在使用redo/undo log[7]的單機事務中, <COMMIT T>
落盤事件是事務T的commit point; 資料庫crash後重啟, 恢復事務T的操作如是:
- log中若存在<COMMIT T>, 則順序地redo事務T操作日誌; 否則,
- 事務T處於aborted或者outstanding狀態, 反序undo事務T的操作日誌.
在Omid中, tentative write不會更改DT表已有的記錄, 而是插入了一條新記錄
(key, new_val, T.txid, NULL)
通過DT表的commit_ts屬性, 可以區分item的版本是對外可見committed版本, 還是事務T私有的tentative版本.
- 如果記錄的commit_ts為NULL, 說明為tentative版本;
- 如果記錄的commit_ts為事務T.commit_ts, 則說明為committed版本, 該行記錄的txid和commit_ts是創建這條記錄的事務的txid和commit_ts.
從本質上講, 事務T的tentative write相當於向DT表中插入了一條redo/undo日誌記錄.
UpdateCT(T.txid, T.commit_ts)成功執行的事件, 是事務T的commit point, 是事務T處於aborted/outstanding狀態還是committed狀態的分水嶺, 相當於redo/undo log的<COMMIT T>
成功落盤的事件
Omid的事務的狀態
- outstanding: 事務尚在執行中. 如果client或者TM crash了, 則自動進入aborted狀態.
- aborted: 事務已經夭折. TM執行並發控制邏輯時, 發現事務T無法進行下去, 則使其夭折.
- committed: 事務成功提交. 到達commit point.
- complete: 事務夭折或者提交後. 完成了後續roll forward和rollback操作.
其實, Omid恢復時, 沒有刻意區分outstanding和aborted狀態, 都當成aborted事務來處理.
Omid根據事務的最終狀態決定事務恢復時, 執行roll forward還是roll back操作.
- committed: 選擇roll forward, 將DT表中的tentative記錄的commit_ts屬性修改為T.commit_ts, 刪除CT表中的提交紀錄.
- aborted: 選擇rollback, 刪除DT表中tentative記錄.
roll forward的時機
- 事務T提交後, 執行事務T的Client繼續主動roll forward.
- piggyback方式, 事務T2讀item時, 讀到DT表中T1創建的tentative版本, 查看CT表, 如果發現T1已經提交, 則安裝該版本.
- 後台cleanup進程非同步掃描CT表和DT表, 作roll forward處理.
roll back的時機
- 事務T失敗, 主動撤銷tentative版本.
- piggyback方式, 事務T2讀紀錄時, 發現了DT表中的存在write-too-late問題的過期事務T2所寫tentative版本, 嘗試殺死該事務並撤銷tentative版本(注意: 原文中沒有提及撤銷tentative版本, 當然這樣做, 也無可厚非).
- 後台cleanup進程非同步地roll back.
總結:
Omid的DT表+CT表類似redo/undo日誌,
UpdateCT成功執行事件是commit point, 成功則roll forward(順序redo), 失敗則rollback(反序undo).
V. 事務並發控制和隔離性
Omid採用MVCC而非Lock-based CC, 通過衝突檢測(conflict detection)的方式, 以決定事務的最終結局: committed or aborted ?
每個事務T都要這樣屬性
1. start_ts: start timestamp, 事務開始執行的時間戳; 等同於txid, read timestamp.2. commit_ts: commit timestamp, 事務提交的時間戳.3. [start_ts, commit_ts]: 事務的持續時間, 事務執行的時間區間,用於衝突檢查.4. read_set: 事務讀操作的元素集合.5. write_set: 事務寫操作的元素集合.
衝突檢測
事務T1和T2同時具備如下兩個條件則衝突:
Condition I: 執行區間重疊, i.e. T2.start_ts < T1.commit || T1.start_ts < T2.commit_ts; Condition II: write_set重疊: T1.write_set ∩ T2.write_set ≠ ?.
違反I: 事務在不同的時間段執行, 相當於串列執行.
違法II: 事務修改的元素無交集(disjoint), 訪問資料庫的不同部分.事務調度的目標: 讓無衝突的事務, 並發執行; 讓有衝突到事務通過wait和rollback的方式相互錯開而串列執行.注意:- Omid是OCC協議, 並混雜了MVCC.
- OCC中在validate phase, 還需要檢查當前事務的read_set與已驗證過的事務的write_set是否有衝突.
- Omid採用了多版本,事務T可以直接讀取T.start_ts時刻的snapshot的最新版本的數據.
Omid因為採用backward衝突檢查[12], 因此TM只檢查當前事務T0, 是否和已提交(committed)事務之間的是否存在衝突.
backward衝突檢查
檢查集合{T|T0.start_ts < T.commit_ts}中的事務是否和T0衝突(與已提交的事務的衝突);
forward衝突檢查
檢查集合{T|T.start_ts < T.commit_ts}中的事務是否和T0衝突(與未提交的事務的衝突).
如果無衝突, 則事務提交; 如果有衝突, 則事務夭折
隔離性1: 事務T0能夠讀到提交時間早於T0開始時間的所有事務的變更.
上述條件, i.e. T0可以讀到{T|T.commit_ts < T0.start_ts}的全部(累計)結果.
或者說: T0開始前已經提交的事務產生的變更對T0可見; 或者說: T0能夠讀到T0.start_ts時刻的snapshot.假設T1和T2都訪問元素a,T1.commit_ts < T2.start_ts.如果T2無法讀到T1的對元素a產生的更新, 則會出現Stale Read;T2基於a的stale value, 更新a, 會導致"Lost Update" anomaly[9].比如a當前值為(value=0, txid=1, commit_ts=2),T1和T2均對a執行+1操作,調度為: b1[txid=10] #T1 beginr1[a=0]w1[a:=1, txid=10]c1[commit_ts=12] #(a, 1, 10, 12)b2[txid=13] #T2 beginr2[a=0] #stale readw2[a:=1, txid=13]c2[commit_ts=15] #(a, 1, 13, 15)T1的更新丟失.
Stale Read為什麼會不可思議的發生呢? 因為write-too-late!!
write-too-late[7]
其實Timestamp-based CC都會面臨這樣一個需要解決的問題.
該問題被稱為"write-too-late", T1獲得commit_ts的確比T2的start_ts小, 但是T1因為故障, 各種delay會造成T1的寫操作執行太晚, 以至於T1的讀先於T2的寫. 從本質上講, 就是TOCTTOU[13], 正所謂T1 "起了大早, 趕了晚集", T2 "後發先至".
Omid的TM花了不少的篇幅來解決write-too-late問題, TM的HA方案中要解決的三個問題之一就是此問題.
為什麼會出現write-too-late呢?
如上圖所示, T1.start_ts=t1, T1.commit_ts=t2, T2.start_ts=t3; T1的執行時間區間[t1, t2]顯然早於T2.start_ts. 因此T1的變更對T2可見. 然後從t2~t4時刻的UpdateCT(t1, t2)成功執行, 中間經過了較長的延遲. 如果T2在t3~t4內讀被T1修改item, 則必然讀不到; 只有t4之後, T1的修改才對T2可見.
因此需要保持下面的invariant:
TM處理事務T0的BEGIN請求時,集合S={T|T.commit_ts < T0.start_ts}是早於T0開始提交到事務,只有等到S中的所有事務執行完UpdateCT(T.start_ts, T.commit_ts)(落盤)或abort後, TM才能返回T0.start_ts.
在同一TM中, 能夠保持該invariant; 當TM做failover時, 無法保持該條件, 詳見後文的TM HA部分.
隔離性2: 事務T0和已提交的事務衝突, 則T0必須夭折
上述條件, i.e. 存在T s.t. T is committed && T0.start_ts < T.commit_ts < T0.commit_ts, 則T0必須夭折.
否則會出現Lost Update anomaly.舉一個Lost Update anomaly例子: Alice同時向Bob和Candy轉錢會出現double-spending問題.
DT中存儲了Alice, Bob, Candy的同名賬戶餘額: Tl: Alice->Bob:50$b1r1[Alice] r1[Bob] w1[Alice]w1[Bob] c1T2: Alice->Candy:100$b2r2[Alice] r2[Candy] w2[Alice]w2[Cady] c2schedule:b1r1[Alice] r1[Bob]b2r2[Alice] r2[Candy] w1[Alice]w1[Bob] c1w2[Alice]w2[Cady] c2
T1和T2開始前, DT表的初始狀態為:
T1和T2開始執行, 分配start_ts分別為20和25.
T1讀到Alice=100, Bob=200
T2讀到Alice=100, Candy=300
T1將Alice和Bob的賬戶分別執行-50和+50, 寫入並提交, T1.commit_ts=30:
T2將Alice和Candy的賬戶分別執行-100和+100, 寫入並提交, T2.commit_ts=35
最後, Alice用100$完成了150$的轉賬, 賬面總額憑空增加了50$, 出現了double-spending. 原因是T1對Alice的更新丟失了.
很顯然, T1和T2衝突, 其中有一個必須夭折.
總結:
隔離性是指: 1.事務提交則對後續事務可見, 2.事務和已提交事務發生衝突則夭折;
不滿足隔離性則會出現Stale Read和Lost Update anomaly.
VI. TM衝突檢查
衝突檢查的簡單版本
TM維護hash table用於衝突檢查, hash table保存最近修改過key和修改時間.
假設T0是TM正在提交的事務, 則:與T0衝突的committed事務集合: ConflictSet(T0) = {T.txid | T is committed && T conflicts with T0 }與T0執行時間區間重疊的committed事務集合: OverlappingSet(T0) = {T.txid | T is committed && T0.start_ts < T.commit_ts}執行時間區間重疊是事務衝突的必要條件, 所以: ConflictSet(T0) ∈ OverlappingSet(T0) TM維護活躍事務的最小txid min_outstanding_txid = MIN{T.txid | T is outstanding}與活躍事務有可能存在衝突的已提交事務集合: PotentialConflictSet = {T.txid | T is committed && min_outstanding_txid < T.commit_ts}顯然, min_outstanding_txid < T0.start_ts, 所以 ConflictSet(T0) ∈ OverlappingSet(T0) ∈ PotentialConflictSetTM的hash table表只需要維護PotentialConflictSet中所有事務的write_set的並集;可以將hash table看成是DT表的同步更新的物化視圖: CREATE MATERIALIZED VIEW HashTable AS SELECT key, MAX(commit_ts) AS last_commit_ts FROM DT WHERE min_outstanding_txid < commit_ts GROUP BY key; 衝突檢查的邏輯也可以用SQL表達為: SELECT COUNT(*) FROM HashTable WHERE key in T0.write_set and T0.txid < last_commit_ts如果為0, 則無衝突; 否則, 存在衝突.
即hash table中保存提交時間晚於最小活躍事務開始時間的已提交事務的write_set中key和key最近修改時間.
衝突檢查時, 查找T0.write_set中key是否在hash table中出現; 如果出現, 進一步比較last_commit_ts是否大於T0.start_ts; 如果大於, 則T0衝突而abort.
如果write_set中所有key, 要麼在hash table中沒有出現, 要麼T0.start_ts >last_commit_ts, 則T0可以提交; 將write_set中的每個key和T0.commit_ts作為key-value對插入到hash table中, 並且執行UpdateCT(T0.start_ts, T0.commit_ts).
TM 優化
截止目前, 我們簡化的TM提交邏輯, 都是概念性的, 雖然有助於理解TM的並發控制原理, 但實則性能無法接受, 主要原因是:
- 如果write_set和hash table直接存儲item的key, 則事務write_set的大小, key的長度直接影響Client和TM之間的write_set的傳輸延遲, 也影響hash_table的內存使用量.
- TM採用in-memory hash table, 內存容量有限, TM hash table內存使用必然有上限.
- 應該利用機器的多CPU多核的並行計算, 提升提交處理的速度.
- TM更新CT表的RTT影響事務處理的吞吐率.
為了解決上述問題, 採用的優化有:
(1). write_set和hash table不直接保存key, 而是保存key的64bit hash值
我們不妨把該key對應的hash值稱為key的fingerprint[8]. 如果key平均長度大於8位元組, 則採用fingerprint方式能夠更加有效地利用帶寬和TM的內存資源. 而且可以更加精確地估計hash table的內存使用, 便於調優.
應該選取具有速度快, 均勻, 碰撞稀少的特點的hash函數.
使用hash帶來的收益計算如下:
假設: key平均長度為L, write_set包含key數量為N, client和TM之間的帶寬為W0, fingerprint計算的帶寬為W1,則傳輸write_set的LatencySpeedup為: LatencySpeedup = (L*N/W0)/((8*N)/W0 + (L*N)/W1) = (L*N/W0)/(8*N*W1+L*N*W0)/(W0*W1)) = L*W1/(L*W0+8*W1)如果: L = 128B, N = 1048576, W0 = 100MBps, W1 = 1000MBps, LatencySpeedup = 128*1000/(128*100+8+1000) ≈ 9.27 即write_set的傳輸速度提高了9.27倍
hash碰撞可能會導致本來無衝突的事務, 被誤判衝突而夭折(spurious aborts), 導致事務失敗幾率增大, 因此需要選擇碰撞稀少的hash函數, 關於hash函數請參考[10].
(2). TM將hash table分成若干個固定大小的bucket
每個bucket是一個fixed-size array. array的每個slot存儲pair(fingerprint, last_commit_ts).
fingerprint到bucket的分布, 最簡單方式是採用取模運算, 當然也可以採取其他hash函數.
如果hash table的load factor較大, 或者hash函數不夠好導致數據傾斜嚴重, 致使bucket填滿. 插入的新fingerprint到滿桶中, 會替換掉桶中last_commit_ts最小slot. 也就是說, 在某些情況下, hash table中並沒有保存全部潛在衝突的fingerprint, 滿bucket只保存最近更新的fingerprint.
對事務T衝突檢查時, 如果
1. fingerprint FP在bucket查找不到,2. bucket已滿,3. T.start_ts < bucket.last_commit_ts
上述三個條件都滿足的情況下, 則FP有可能依然存在衝突, 但被換出了. 因此TM保守地認為T有衝突, 而abort T.
所以, hash table的滿bucket保存最近更新的fingerprint策略, 也會產生衝突誤判.
(3). TM的採用SEDA[14]框架做並發處理
SEDA的思路是: 將計算過程分解成若干stage, stage之間通過隊列連接構成pipeline, 每個stage多線程處理. 能夠做到inter-stage和intra-stage並發, 充分利用多CPU多核的並行計算能力.
比如可以將TM的提交處理分為三個stage: 分配時間戳, 查找更新hash table作衝突檢測和UpdateCT.
(4). 衝突檢測的並發處理
衝突檢測需要查找和更新hash table, 而hash table全局共享. 如果加鎖的粒度為整個hash table, 則衝突檢測變成了串列執行, 會導致其他待提交的事務等鎖.
TM採用並發的衝突檢測, 加鎖粒度為bucket, 能做到bucket之間的並行處理.
顯然兩個事務如果write_set中fingerprint所分布的bucket集合無交集(disjoint), 則兩個事務可以訪問不同的桶.
事實上, Omid的衝突檢測對write_set的逐個fingerprint一一檢查: 將fingerprint對應的bucket加鎖, 判斷是否衝突; 如果衝突, 則事務失敗; 如果不衝突, 則插入pair(fingerprint, T.commit_ts). 檢查完全部fingerprint, 發現沒有衝突, 則提交事務.
如果事務在第n個fingerprint檢查時失敗而夭折, 則前n-1個fingerprint在hash table的last_commit_ts已經更新為該事務的commit_ts. 夭折事務會影響其他事務衝突檢查, 產生衝突誤判, 導致事務夭折, 甚至連鎖式夭折(cascading rollback)[7].
(5). UpdateCT批量執行
TM將UpdateCT操作打包成一batch, 並發執行batch. 能夠將多個UpdateCT的開銷平攤, 以提升TM的吞吐量.
當然依然需要保證事務T的BEGIN操作返回start_ts之前, commit_ts小於T.start_ts的所有事務完成提交.
UpdateCT能夠提升吞吐, 但不利於response time.
Omid可通過CT表分散到更多的RegionServer的方式, 來降低UpdateCT的延遲.
總結: TM的優化目標是為了提升TPS, 但會增加事務衝突誤判的幾率, 會讓更多的事務夭折, 甚至出現cascading rollback. 所以Omid適用於衝突極少應用場景.
VII. TM HA
業界普遍採用primary-backup+failstop+failover機制實現設備或服務的高可用
- primary-backup: 部署多個服務, 一個是primary, 對外提供服務; 其餘是backup, 作為熱備.
- failstop: 如果primary出現故障, 則及時停止服務, 避免錯誤潛伏導致更隱蔽, 更不可控的錯誤.
- failover:
when the primary fails, the backup takes over almost instantly
[5],
當然HA還涉及service discovery, failure detection, leader election, data replication等問題.
Omid TM採用雙機主備的方式實現高可用, 並未採用RSM; 當新TM啟用後, 對老primary TM的hash table並不知情, 而是擁有一個空hash table.
保證時間戳嚴格單調遞
Omid TM採用與Percolator相同的時間戳分配機制, 能夠保證failover的情況下, 時間戳依然嚴格單調遞增.
下面invariant始終保持:
老primary TM分配出的timestamp都小於新primary TM上分配出的timestamp.
這個invariant如此重要, 能夠保證新TM上事務必然不會和老TM上事務衝突, 因為衝突的必要條件之一T1. commit_ts < T2.start_ts
不成立, 這也是為什麼Omid TM可以不用對hash table作主從數據複製.
時間戳分配機制[4]
1. TM將時間戳空間劃分為epoch.
epoch表示一個時間戳區間(start_ts, end_ts], epoch不重疊, 上一個epoch的end_ts和下一個epoch的start_ts相同, 如: epoch1為(0, 1000]; 下一個epoch2為(1000,2000].2. TM維護兩個狀態: maxTS和prevTS. maxTS: shared and non-volatile; maxTS是當前epoch的end_ts, 也是下一個epoch的start_ts; maxTS是共享並且非易失的, maxTS survives when TM crashes; Omid用zk保存maxTS. prevTS: private and volatile; prevTS是已經分配出去的時間戳; prevTS為TM所私有; 是保存在內存中的變數. 3. TM分配epoch要做兩件事: 先將epoch的end_ts賦給maxTS並持久化, 然後將start_ts賦給prevTS, 注意先後順序不能搞反.4. 初始化時, TM先分配一個初始的epoch作為當前epoch.5. 當TM分配時間戳時, 如果當前epoch已經耗盡(prevTS == maxTS), 分配下一個epoch作為當前epoch; TM將當前當前prevTS自身+1, 然後分配出去.6. 如果TM crash掉, 新TM讀到maxTS, 分配start_ts=maxTS的epoch作為當前epoch, 接著分配時間戳.TM crash之前,已分配的時間戳 <= maxTS; 新TM啟用後, 分配時間戳 > maxTS; 因此時間戳在TM failover的時候依然保持嚴格的單調遞增.write-too-late問題
回憶單機TM中, 我們通過始終保持下面的invariant以消除write-too-late問題:
TM返回當前事務T0的start_ts之前, {T|T.commit_ts < T0.start_ts}的全體事務: 要麼aborted, 要麼committed(i.e.記錄(T.txid,TT.commit_ts)通過UpdateCT函數插入到了CT表中), 不能有處於outstand狀態的事務.
failover時, 該invariant無法保持!!!
比如老TM提交事務T1, 發送UpdateCT(T1.txid, T1.commit_ts)請求給HBase後crash, HBase因為OS調度等原因延遲一段時間後處理該請求. 在處理該請求之前, 新TM已經failover完成, 事務T2分獲得了start_ts並且讀取了T1所修改的key.
Omid為了避免TM failover時的write-too-late問題, 對上述的invariant進行了增強.
如果T1和T2滿足: 1. T1.commit_ts < T2.start_ts; 2. K ∈ T1.write_set ∩ T2.read_set.因為write-too-late原因, T2讀到了item K的T1之前的版本, 則T1不能提交.
具體實施由client在執行讀操作時將write-too-late的事務殺死(invalidate).
如果T2讀取數據時,發現DT表有T1的tentative update, 並且T1.txid屬於過期epoch; 則, 執行原子操作invalidate T1 if T1.txid not in CT.
primary的uniqueness約束
primary TM遭遇無法及時修復的故障時, 因停服而不可用; backup TM自動failover, 恢復服務.
然而, backup如何發現primary故障呢?
答案是故障檢測, 通常採用lease機制[11].
租過房子的讀者會知道: 租房子的時候, 會有和房東協商好租期, 如果房子到期還需續租, 則應該在租期截止之前至少一個月, 把續租的意願告訴房東;如果房租到期, 房客沒有續租, 好心的房東一般會給訪客預留若干天(當然這段時間也是按天計費的), 用來找新的房源. 生活中租房的規則能夠保證同一個房子的兩個租客的租期一定不會重疊, 房子的租期是由空窗期所分割.
Lease機制
(1). lease機制有兩個參數: 租期LeasePeriod和寬限期GracePeriod, 而且LeasePeriod < GracePeriod
(2). primary維護租期到期時間LeaseDueTime. 當Now() < LeaseDueTime時, primary可對外提供服務; 否則primary failstop(未必一定自殺, 只要能夠failstop即可). (3). backup維護寬限期到期時間GraceDueTime, 當Now() > GraceDueTime時, backup做failover, 晉陞為primary; 否則backup保持不變.(4). primary在LeaseDueTime到達之前, 向backup發送續租請求(ExtendLease).(5). backup收到ExtendLease請求後, GraceDueTime已經到達, backup都會拒絕續租; 否則同意續租, 並且:- 更新(Renew) GraceDueTime = Now()+GracePeriod
- 回復primary消息"同意續租"以響應.
(6). primary收到backup同意續租的響應後, 做:
- 更新(Renew) LeaseDueTime = Now() + LeasePeriod.
(7). 如果primary因為crash, network partitioning等故障無法續租, 則backup最多經過一個GracePeriod, 執行failover, 取得控制權.
假設:
- ExtendLease請求Latency不超過δ, 如果請求超過δ, 則認為失敗.
- primary和backup的clock drift rate[6,]為e(t), |e(t)| < Δ(注意primary和backup到物理時鐘沒有必要同步, 主要保持幾乎相同的漂移速率即可).
當:
LeasePeriod + δ + Δ*GracePeriod < GracePeriod, 可以確保不出現雙主.
比如GracePeriod為5000ms, δ為100ms, Δ=4ms/s
則LeasePeriod只要不超過5000ms-120ms=4880ms即可. 可以選在租期到達4000ms的時候, 發送ExtendLease.Omid租約機制
Omid的租約機制原理類似, 但實現不同. 上述的實現是基於通信的lease機制, 而Omid採用共享數據的方式實現, 參考代碼: https://github.com/apache/incubator-omid/blob/master/tso-server/src/main/java/org/apache/omid/tso/LeaseManager.java.
Omid租約演算法如下:
(1). primary和backup共享Zookeeper的permanent multi-versioned znode: /tso-lease.
(2). 兩者都嘗試通過zk版本的CAS操作, 將/tso-lease的數據版本+1.
zk的CAS原子操作為: Zookeeper.setData(znode, data, version);
當version!=-1時, 只有參數中的version和znode的當前version匹配時, 才會更新成功並且znode的version增1; 否則更新失敗, 且znode數據和版本保持不變.
backup和primary用局部變數lastNodeVersion緩存/tso-lease的最新version, 用於更新時的verison匹配檢查.
(3). primary在租期剩餘時間少於guardLeasePeriodInMs時tryToRenewLeasePeriod以續租.
leasePeriodInMs取10s(可配置),
guardLeasePeriodInMs為leasePeriodInMs/4.
也就是說primary在租期過了3/4部分, 就要續租.
對比上文中的租約機制, 我們可知:
- leasePeriodInMs - guardLeasePeriodInMs為PeriodLease lease,
- PeriodInMs為GracePeriod
primary持有lastNodeVersion當前值, 經過leasePeriodInMs的3/4後, 嘗試對/tso-lease的版本進行+1操作. 如果返回成功, 則更新lastNodeVersion為znode的最新版本, 租期截止時間更新為: System.currentTimeMillis() + leasePeriodInMs. 如果失敗, 則primary failstop(commit suicide).
(4). backup每隔leasePeriodInMS周期tryToGetInitialLeasePeriod() 以failover取代primary.
backup持有lastNodeVersion當前值, 經過leasePeriodInMs時間後, 也嘗試更新/tso-lease. 如果返回成功, 則failover以取代老的primary; 如果失敗, 則重新緩存lastNodeVersion為/tso-lease的最新版本, 經過lastNodeVersion周期後, 再次tryToGetInitialLeasePeriod.
primary和backup的租期之間存在空窗期guardLeasePeriodInMs, 只要設置合理的空窗期, 就能夠保證新老primary的租期不重疊.
lease機制能夠為高可用提供探測故障, failstop和保證primary的唯一性約束的功能
primary uniqueness invariant
既在任意時刻之多有一個primary; 當然, 容許出現無主的空窗期.
為什麼要保證primary uniqueness呢? 是為了避免網路斷開導致brain-split. 試想兩個出現兩個primary, 執行衝突事務而無法檢查, 各自的邏輯時鐘出現串擾倒退, 則會出現Lost Update anomaly.
總結
- TM通過primary-backup+failstop+failover提供HA;
- TM HA能夠保證故障切換不影響邏輯時鐘的嚴格單調遞增特性;
- 故障切換破壞了的單機TM避免write-too-late問題的invariant, TM HA增強了該invarint.
- TM HA通過lease機制實現故障檢測, failstop和primary unique invariant保證.
VIII: Omid讀操作
Client在執行當前事務的讀操作時解析item的版本
Omid的讀操作比較有意思, 如何實現, 並非簡單地從start_ts時刻的snapshot中讀取數據, 因為讀操作面臨write-too-late和roll forward的問題.
DT中的任意item, 都有這樣的性質:
時間戳的嚴格單調遞增, 能夠保證item的所有版本的記錄的txid和commit_ts都唯一.
item的版本分為committed和tentative.
隔離性能夠保證committed-committed, committed-tentative版本之間, [txid, commit_ts]區間互不重疊.
但tentative-tentative版本之間可能會重疊, 隔離性能保證最多只有一個能夠成功安裝.
tentative版本的txid大於提交版本的txid.
Client需要解析item的多個版本記錄, 讀出max{rec.txid|rec.commit_ts < T.start_ts}對應的value.
關注點有:
- 創建tentative版本事務T1已經在CT表中有valid提交信息, 但沒有roll forward安裝該版本. 當前事務T要代勞rollback.
- T1在CT表的提交信息為invalid, 跳過該版本.
- T1在CT表中無提交信息且T1.txid屬於老TM上的過期epoch, 當前事務用原子操作(HBase版本的CAS操作checkAndMutate)在CT表中將T1殺死(invalidation).
- T1在CT表中無提交信息且T1.txid屬於當前epoch, 因為競爭條件存在, 當前事務re-read DT表中的item, 才能判斷是tentative版本是否提交.
讀操作分支很多, 講述不及偽代碼清晰, 請直接看論文. 這裡著重說明兩個競爭條件.
兩個競爭條件
(1). 殺死epoch過期的事務需要用CAS操作, 否則:
t0時刻: T2發現item a的由T1創建的過期tentative版本, 在CT表中找不到提交記錄(T1.txid, T1.commit_ts);
t1時刻: 記錄(T1.txid, T1.commit_ts)插入CT表; t2時刻: T3讀item a, 也發現T1創建的tentative版本, 在CT表中查找到T1的提交信息, 安裝tentative版本, 並返回a在T1創建的版本.t3時刻: T2將(txid=T1.txid, invalid=true)記錄插入到CT表中, 撤銷T1的版本(已經被T3安裝), 返回item a在T1之前的版本.事務T1在老TM上, 事務T2和T3在新TM上; T1不管是abort還是commit, 都應該發生在T2和T3之前; T2和T3要麼都讀到item a的T1之前的老值, 要麼都讀到T1之後的新值. 然而此情況下, T2和T3讀取item a的值不一致.所以, Client查看過期tentative是否提交時, 使用putIfAbsents(採用checkAndMutate)函數原子性地invalidate過期事務: atomic { put (txid=T1.txid, invalid=true) if T1.txid absents from CT; }
如果返回成功, 則成功地殺死了事務T1, 撤銷tentative版本; 如果失敗, 則說明T1.txid在CT表中已經存在, 要麼已經被其他事務殺死, 要麼T1已經提交.
(2). epoch未過期的tentative版本在CT中不存在對應的提交消息需要re-read DT表確認是否提交, 否則
t0時刻: T2發現item a的有epoch未過期的tentative版本, T1是創建者.
t1時刻: T1完成了put(T1.txid, T1.commit_ts) into CT, 安裝tentative版本, delete (T1.txid, T1.commit_ts) from CT的操作.t2時刻: T2在CT表中查看T1.txid是否存在, 發現不存在, 誤認為T1尚未提交.顯然這種情況下, 重讀DT表中tentative版本的commit_ts, 就能夠真正確認是否提交.總結
保證MVCC的SI隔離級別, 也需要客戶端的參與. 主要原因是3相繼發生的事件: commit_ts分配, 提交標記落盤和apply完成之間存在事件差, 會有其他並發事務的操作穿插(interleaving), 破壞隔離性.
IX. 結論
Omid基於HBase提供ACID-compliant cross-row transaction處理能力, 採用MVCC並發控制協議, 隔離級別為SI, 適用於衝突稀少的場景.
雖然單一的中心化的TM服務的scalability受限於單機的處理能力, 開發者針對tps-oriented的workload做了很多優化, 提升了性能; 同時TM也具有高可用方案.
原文中有提到version的GC, 總而言之GC依賴於HBase的version GC, 採用vacuum worker方式. 至於GC的策略, 請讀者參閱原論文.
scalability是TM最大問題, 可選的方案有:
- TM輕量化, 大多數工作移交給Client, 比如Percolator;
- TM可scale-out, 增加TM的節點來提升負載能力.
Omid的CT表和DT表可以選擇不同的存儲系統, 比如採用更加高效LSM實現, 或者內存資料庫.
參考文獻
[1] O. Shacham, F. Perez-Sorrosal, E. B. P. O. T. 15th, 2017, 「Omid, reloaded: scalable and highly-available transaction processing,」 usenix.org.
[2] http://omid.incubator.apache.org/
[3] https://hbase.apache.org/acid-semantics.html
[4] D. Peng, F. D. OSDI, 2010, 「Large-scale Incremental Processing Using Distributed Transactions and Notifications.,」 usenix.org.
[5] J. Gray and A. Reuter, Transaction Processing. Elsevier, 1992.
[6] J. C. Corbett, J. Dean, M. Epstein, A. Fikes, C. F. A. T. on, 2013, 「Spanner: Googles globally distributed database,」 dl.acm.org.
[7] H. Garcia-Molina, Ullman, JD, Widom, J.,「Database Systems, the complete book.」 2002.
[8] J. Bentley, D. M. D. C. Conference, 1999, 「Data compression using long common strings,」 ieeexplore.ieee.org.
[9] H. Berenson, P. Bernstein, J. Gray, J. M. A. SIGMOD, 1995, 「A critique of ANSI SQL isolation levels,」 dl.acm.org.
[10] http://15721.courses.cs.cmu.edu/spring2016/slides/11.pdf
[11] W. Lin, M. Yang, L. Zhang, and L. Zhou, 「PacificA: Replication in log-based distributed storage systems,」 2008.
[12] http://15721.courses.cs.cmu.edu/spring2016/slides/05.pdf
[13] https://en.wikipedia.org/wiki/Time_of_check_to_time_of_use
[14] https://en.wikipedia.org/wiki/Staged_event-driven_architecture
推薦閱讀:
TAG:資料庫事務 |