Flink源碼解析-從API到JobGraph
Streaming API
通常情況下,如果想要使用flink進行並行計算,開發者會把自己的業務邏輯抽象成流式計算的模型,使用flink提供的api定義Job來實現該模型,因此一個flinkjob的生命是從api開始的。我們從一個官網的word count的例子開始,稍做一些修改,代碼如下:
如上便是一個flinkJob編寫的方式:首先,代碼段1獲取一個類StreamExecutionEnvironment的對象,我們稍後會詳細介紹這個類;代碼段2~7便是利用streamming Api來定義自己的Job,方法:
addSource( ... ),flatMap( ... ), keyBy(0),timeWindow( ... ), sum( ... ), print( ... )
就是flink提供的API;代碼段7是提交上面定義的job 。
StreamExecutionEnvironment, API, transformation
我們來看一下類:StreamExecutionEnvironment,有下面一個屬性:
protected final List<StreamTransformation<?>> transformations = new ArrayList<>();
容易看出,transformations是個ArrayList, 元素是StreamTransformation的對象,StreamTransformation把用戶通過Streaming API提交的udf(如FlatMapFunction 的對象)作為自己的operator屬性存儲,同時還把上游的transformation作 為input屬性存儲。streaming api, transformation, StreamExecutionEnvironment,三者的關係用一句話概括就是:用戶通過api構造transformation存儲到StreamExecutionEnvironment,如下圖:
有以下幾點需要指出:
- StreamExecutionEnvironment不存儲SourceTransformation, 因為flink不允許提交只有Source的job,而根據其他類型的Transformation的input引用可以回溯到SourceTransformation。
- Stream可以分為兩種類型,一種是繼承DataStream類,另一種不繼承;功能上的區別在於,前者產生transformation(flink會根據transformation的組織情況構建DAG),後者不產生transformation,但是會賦予Stream一些特殊的功能,例如:window, iterate, union等。下圖反映的flink所有的stream類。 從類名字上也能推斷每個類型Stream的功能。
- 一個job可以沒有sink,其他operator也能良好工作。
從Transformation到StreamGraph
StreamGraph是flink客戶端把用戶定義的Transformation(就是存儲在裡面的那個ArrayList< StreamTransformation>)組織成StreamGraph——邏輯上的DAG。代碼段:
env.execute("word count on flink");
做了兩件事情:
- 把Transformation轉化成StreamGraph。
- 把StreamGraph按照一定的原則切分成JobGraph。
此部分我們先來討論1,首先Flink到底有多少類型的transformation呢?如下圖:
Transformation的結構相對簡單,所有的Transformation均繼承自StreamTransformation。先來介紹兩個概念:StreamNode(DAG圖的頂點), StreamEdge(DAG圖的邊)。兩者的關係十分清晰,如下圖:Flink Client會遍歷StreamExecutionEnvironment中的transformations數組,按照用戶定義構建上面的DAG圖。其中:
StreamNode對象裡面存儲了用戶的udf對象、輸入輸出的序列化方法、所有輸入和輸出的StreamAdge對象、該StreamNode對應的transformationID等信息。
StreamAdge對象存儲了上游的StreamNode、下游的StreamNode還有自身的一個edgeId。
從StreamGraph到JobGraph
上文提到過,StreamGraph會被切分成JobGraph,這裡來介紹一下:首先,StreamGraph和JobGraph有什麼區別,StreamGraph是邏輯上的DAG圖,不需要關心jobManager怎樣去調度每個Operator的調度和執行;JobGraph是對StreamGraph進行切分,因為有些節點可以打包放在一起被JobManage安排調度,因此JobGraph的DAG每一個頂點就是JobManger的一個調度單位。假如StreamGraph切分如下圖:
那麼JobGraph的DAG如下圖,綠色實心的兩個頂點是上圖打包在一起的StreamNode:可見,StreamGraph的切分,實際上是逐條審查每一個StreamAdge和改SteamAdge兩頭連接的兩個StreamNode的特性,來決定改StreamAdge兩頭的StreamNode是不是可以打包在一起,flink給出了明確的規則,看面的代碼段:
該方法返回true的時候,兩端的StreamNode才能打包在一起,幾個有趣的條件需要指出:- 下游的StreamNode的輸入StreamAdge的個數必須是1。
- 上游的StreamNode和下游的StreamNode必須有相同的SlotSharingGroup(可以在Api中指定該變數)。
- 上游的StreamNode和下游的StreamNode的必須有相同的並行度(Api可以指定該變數)。
當然其他條件也非常重要,但是在本文,暫不展開。這裡我們來著重介紹一下第一條規則:下游的StreamNode的輸入StreamAdge的個數必須是1。根據這條規則我們至少可以得到下面兩條結論:
- 在一個JobGraphNode里可以包含該的多個StreamNode,這些StreamNode是以樹狀結構組織在一起且只有一顆。
- 兩個StreamNode 、,假設是的上游,從到可能存在多條路徑,下面命題成立:如果從某條路徑上來看和不能打包在一起,那麼肯定不存在一條其他的路徑使二者能夠打包在一起。換句話說,最終JobGraph的結構和遍歷StreamEdge的順序無關,是唯一的。
至此Flink客戶端的工作基本完成,接下來Flink客戶的通過Akka把生成的JobGraph提交給JobManager,JobManager開始根據JobGraph部署工作,接下來詳細介紹下改過程。
從JobGraph到ExecutionGraph
上文提到,在客戶的完成JobGraph的構建之後,將其通過akka提交給JobManager,接下來我們介紹下JobManager怎樣按照JobGraph的規划進行任務調度。
JobManager收到客戶端提交的JobGraph之後,會構建ExecutionGraph;ExecutionGraph的拓撲結構和JobGraph保持一致,只是把JobGraph重構成ExecutionJobGraph,其中按照JobVertex將頂點分裝成ExecutionJobVertex,按照JobEdge將邊封裝成ExecutionAdge,還構建IntermediateResult(中間數據)用來描述節點之間的Data shuffle 。如下圖:
上游節點會把產生的數據寫到IntermediateReslut(下文稱:中間數據集)中,是中間數據集的生產者;下游節點會處理中間數據集產生的數據,是中間數據集的消費者,具體的可能有下面兩種情況:
ExecutionJobGraph有下面幾個特點:- Partition的數量和上游節點的並行度保持一致。
- 下游節點在和上游節點建立連接時,只有POINTWISE和ALL_TO_ALL兩種模式,事實上只有RescalePartitioner和ForwardPartitioner是POINTWISE模式,其他的都是ALL_TO_ALL。默認情況下如果不指定partitioner,如果上游節點和下游節點並行度一樣為ForwardPartitioner,否則為RebalancePartioner ,前者POINTWISE,後者ALL_TO_ALL。
推薦閱讀:
※Apache Beam: 下一代的大數據處理標準
※能同時對實時流數據和離線數據進行(准)實時分析的技術或者框架有那些?
※Spark 2017歐洲技術峰會摘要(流計算分類)