Flink源碼解析之State的實現

State的實現

Flink通過非同步的checkpoint機制來實現流式處理過程中的容錯,簡單來講就是定時地將本地的狀態序列化到一個持久存儲中,當出現錯誤是通過恢複檢查點的狀態來實現容錯的,對於機制的詳細介紹可以參見這個鏈接,本章主要講述flink源碼中state的實現。

StateBackend

flink將我們代碼中的操作轉化為一個個的task放在taskmanager中執行。其中每個task一個線程,每個task中包含了一個AbstractInvokable對象,task中的主要的邏輯就是調用AbstractInvokable.invoke()的方法。在流式處理中對應的實現都是繼承自StreamTask。StreamTask中包含一個OperatorChain,並規定了一些hook函數來定義生命周期。AbstractStateBackend就是在這裡進行初始化的。Flink的實現一共提供了3種state backend:MemoryStateBackend,FsStateBackend, 和RocksDBStateBackend。其中MemoryStateBackend主要用於調試開發中使用,後面2者適合於生產環境中使用。這三種實現均繼承自AbstractStateBackend類。在StreamTask的初始化過程中會初始化OperatorChain中所有的operator,而AbstractKeyedBackend也是在這個過程中初始化的,而且一個StreamTask中只有一個,其實這個也符合常理,因為多個key by操作產生的operator必然在不同的線程中。

AbstractStateBackend

AbstractStateBackend的定義比較簡單,它要求子類實現三個介面:

createStreamFactory: 為某個job的某個operator創建CheckpointStreamFactory,實際上只有FsStateBackend實現了這個介面,RocksDBStateBackend的實現需要傳入一個AbstractStateBackend,通常情況下是FsStateBackend

createKeyedStateBackend: 創建一個keyed state backend用來管理keyed state

createOperatorStateBackend: 創建一個OperateStateBackend,AbstractStateBackend提供了一個實現,就是在內存中的一個Map,key是state名字,value就是list state,之所以是list state的原因請看State類型。

FsStateBackend

FsStateBackend會在checkpoint的時候將state存儲到一個持久化的存儲中,比如hdfs。對於keyed state,FsStateBackend簡單地將其放置在內存中,因此對於比較大的state,FsStateBackend有可能會引起比較嚴重的GC。而且snapshot的過程是一個同步的過程,也就是說將state序列化並寫入文件系統的過程是一個同步的過程,過大的state同樣可能引起阻塞。

RocksDBStateBackend

RocksDBStateBackend與FsStateBackend不同,它將key state存儲在rocksdb中。這種做法有2個好處:首先,比較大的state不會引起GC;其次,由於rocksdb支持snapshot操作,因此snapshot的過程是一個非同步的過程,不會阻塞。但是rocksdb實現的state也會有幾個可能的缺點:首先,state的update和get操作都會有一個序列化和反序列化的過程,因此效率會比直接在內存中低;其次,rocksdb使用LSM-Tree作為存儲結構,compaction過程需要大量的讀寫磁碟,因此也有可能引起阻塞,對於這個問題一個可能優化是使用memory filesystem,將所有的存儲放在內存中;最後,rocksdb的tuning比較複雜,在普通的SATA硬碟上表現如何還需要確認。

使用State

State類型

flink中的state可以從2個緯度來劃分:是否屬於某個key(key state或者operator state),是否受flink管理(raw state或者managed state)。key state用於在KeyedStream中保存狀態,operater state用於在普通的非key中保存狀態。managed state是指被flink所管理的狀態。raw state是被應用程序自己管理,flink會調用相應的介面方法來實現狀態的restore和snapshot。

flink自從1.2.0開始加入了一個新的功能:dynamic scalable state。它的目的就是當flink的operator的parallism改變之後仍然能從上一次的checkpoint或者savepoint恢復。為了達到這個目的,key state被按照key group組織起來,其實這是一個與pre-sharding類似的想法,比如說key group有128個,那麼flink就會將key state分成128份來存儲,這樣只要你的processor的並行度是小於128的,總能分到一部分的key group state。對於operator state,flink會將state按照list組織起來,從而當processor的並行度改變的時候仍然能得以恢復。

Managed Key State

我們首先看看flink 是如何處理key state的。首先要說明一點,在flink中所有的key state都是managed,而且通過RuntimeContext中的getState方法來獲得。如果在普通的沒有經過key by的stream中使用這個RuntimeContext.getState方法則會拋出異常。前面我們講過不同的state backend是如何存儲key state的,這裡不再贅述。在flink中,每當一個新的數據到來時,系統會調用setCurrentKey方法,這樣當我們訪問state的時候可以知道系統對應的哪個key。

Managed Operator State

要獲得managed operator state,用戶需要實現CheckpointedFunction介面,並在initializeState方法中初始化state,這裡獲得list state。前文中我們說到過使用operator state都是保存在內存中。

Raw Operator State

對於需要自己管理operator state的用戶可以實現ListCheckpointed介面,這個介面要求用戶提供的state都是list。在實際的實現中,這個state在snapshot的時候仍然會被放入OperatorStateBackend中去。

Legacy State

在flink 1.2.0之前對於用戶自定義的state需要實現Checkpointed介面,由於這個介面無法被partition,因此這個介面已經被標記為Deprecated。

State 的snapshot和restore

snapshot

本節我們講述flink是如何將state存儲到持久化的存儲中。有了上面給大家講述的概念之後,snapshot這部分的代碼就比較直觀了。代碼的主要邏輯在StreamTask.performCheckpoint這個函數里,基本上就是對每個AbstractStreamOperator調用snapshot函數來。大家要注意的是,雖然裡面用了大量的Future來抽象不同的snapshot過程,但是基本上只有對於RocksDBStateBackend的key state是非同步的,這是因為只有rocksdb支持snapshot操作,其他的backend本質都是map,職能同步進行。當snapshot結束之後,會將state handle發送給job manager。

restore

restore的過程就比較簡單,基本上就是在task初始化的時候通過state handle拉取文件,然後恢復state。


推薦閱讀:

Apache Flink和Apache Spark有什麼異同?它們的發展前景分別怎樣?
Flink源碼解析-從API到JobGraph

TAG:Flink | 分散式計算 |