Flink中的join實現原理
首先假如我們有兩個Dataset,一個Dataset中的數據為用戶信息,另一個Dataset中的數據是站點訪問記錄。
case class PageVisit(url: String, ip: String, userId: Long)case class User(id: Long, name: String, email: String, country: String)
如果想通過這兩個Dataset獲取來自中國用戶的訪問記錄應該怎麼做?很顯然,把兩個Dataset join一下然後根據country進行過濾即可,join的key選擇userId和id。
val visits: DataSet[PageVisit] = ...val users: DataSet[User] = ...// filter the users data setval chinaUsers = users.filter((u) => u.country.equals("ch"))// join data setsval chinaVisits: DataSet[(PageVisit, User)] = // equi-join condition (PageVisit.userId = User.id) visits.join(chinaUsers).where("userId").equalTo("id")
很美好對不對,不過join背後的實現可就不像用起來這麼簡單了。
在Flink中,join的實現分為兩個階段,第一個階段被稱為Ship階段,而第二個階段被稱為Local階段,這個Ship階段很像mapreduce中的shuffle,就是將具有相同join key的element shuffle到同一個節點,不然沒法在task節點進行本地join。ship階段有兩種不同的策略,一種是根據join key把element重新進行partition。
第二種策略是將一個完整的Dataset shuffle到每個task節點。比如R S兩個Dataset,圖中示例就是將R shuffle到每個節點。
可以看到ship階段需要將大量的數據緩存在內存中,數據量如果很大的時候會造成頻繁的內存溢出,所以為了應對這種問題,前面也提到過Flink應用一套更高效的內存管理機制,而且Flink中特有的序列化方式能將java 對象大大壓縮從而節約內存空間。
Ship階段完成之後,每個節點都要開始進行本地join,也就是上面所說的Local階段。
在這個階段,Flink借用了資料庫中常見的兩種join方式,一種是Sort-Merge-Join,另一種是Hybrid-Hash-Join。
所謂Sort-Merge-Join其實就是先將兩個Dataset中的元素進行排序,然後利用兩個游標不斷的從Dataset中取具有相同join key的元素出來。而Hybrid-Hash-Join相對來說更複雜一些,先來看簡單的Hash-Join,就是將一個較小的Dataset裝進哈希表,然後遍歷另一個Dataset,根據join key進行配對。但是如果較小的那個Dataset中的數據量也很大根本沒法裝進內存中呢?這時候就要將Dataset再進行partition,然後在各個partition上進行簡單的Hash-Join。Hybrid-Hash-Join在此基礎上有個小的優化,就是在內存足夠的情況下,將一些數據一直保存在內存中。
因為不同階段有不同的策略,所以可以構建出多種的join策略,具體選擇哪種join策略,在Flink中會根據初始Dataset的數據量對各個join策略進行預估,然後選擇預估時間最小的那個策略。
參考資料:Peeking into Apache Flinks Engine Room
Peeking into Apache Flinks Engine Room常見的join演算法 - CSDN博客
推薦閱讀:
※Flink yarn-session啟動流程分析
※Flink源碼解析-從API到JobGraph
※Apache Flink和Apache Spark有什麼異同?它們的發展前景分別怎樣?
※Apache Storm 1.1.0 中文文檔 | ApacheCN
TAG:Flink |