SparkSQL的3種Join實現

引言

Join是SQL語句中的常用操作,良好的表結構能夠將數據分散在不同的表中,使其符合某種範式,減少表冗餘、更新容錯等。而建立表和表之間關係的最佳方式就是Join操作。

SparkSQL作為大數據領域的SQL實現,自然也對Join操作做了不少優化,今天主要看一下在SparkSQL中對於Join,常見的3種實現。

SparkSQL的3種Join實現

Broadcast Join

大家知道,在資料庫的常見模型中(比如星型模型或者雪花模型),表一般分為兩種:事實表和維度表。維度表一般指固定的、變動較少的表,例如聯繫人、物品種類等,一般數據有限。而事實表一般記錄流水,比如銷售清單等,通常隨著時間的增長不斷膨脹。

因為Join操作是對兩個表中key值相同的記錄進行連接,在SparkSQL中,對兩個表做Join最直接的方式是先根據key分區,再在每個分區中把key值相同的記錄拿出來做連接操作。但這樣就不可避免地涉及到shuffle,而shuffle在Spark中是比較耗時的操作,我們應該儘可能的設計Spark應用使其避免大量的shuffle。

當維度表和事實表進行Join操作時,為了避免shuffle,我們可以將大小有限的維度表的全部數據分發到每個節點上,供事實表使用。executor存儲維度表的全部數據,一定程度上犧牲了空間,換取shuffle操作大量的耗時,這在SparkSQL中稱作Broadcast Join,如下圖所示:

Table B是較小的表,黑色表示將其廣播到每個executor節點上,Table A的每個partition會通過block manager取到Table A的數據。根據每條記錄的Join Key取到Table B中相對應的記錄,根據Join Type進行操作。這個過程比較簡單,不做贅述。

Broadcast Join的條件有以下幾個:

1. 被廣播的表需要小於spark.sql.autoBroadcastJoinThreshold所配置的值,默認是10M (或者加了broadcast join的hint)

2. 基表不能被廣播,比如left outer join時,只能廣播右表

看起來廣播是一個比較理想的方案,但它有沒有缺點呢?也很明顯。這個方案只能用於廣播較小的表,否則數據的冗餘傳輸就遠大於shuffle的開銷;另外,廣播時需要將被廣播的表現collect到driver端,當頻繁有廣播出現時,對driver的內存也是一個考驗。

Shuffle Hash Join

當一側的表比較小時,我們選擇將其廣播出去以避免shuffle,提高性能。但因為被廣播的表首先被collect到driver段,然後被冗餘分發到每個executor上,所以當表比較大時,採用broadcast join會對driver端和executor端造成較大的壓力。

但由於Spark是一個分散式的計算引擎,可以通過分區的形式將大批量的數據劃分成n份較小的數據集進行並行計算。這種思想應用到Join上便是Shuffle Hash Join了。利用key相同必然分區相同的這個原理,SparkSQL將較大表的join分而治之,先將表劃分成n個分區,再對兩個表中相對應分區的數據分別進行Hash Join,這樣即在一定程度上減少了driver廣播一側表的壓力,也減少了executor端取整張被廣播表的內存消耗。其原理如下圖:

Shuffle Hash Join分為兩步:

1. 對兩張表分別按照join keys進行重分區,即shuffle,目的是為了讓有相同join keys值的記錄分到對應的分區中

2. 對對應分區中的數據進行join,此處先將小表分區構造為一張hash表,然後根據大表分區中記錄的join keys值拿出來進行匹配

Shuffle Hash Join的條件有以下幾個:

1. 分區的平均大小不超過spark.sql.autoBroadcastJoinThreshold所配置的值,默認是10M

2. 基表不能被廣播,比如left outer join時,只能廣播右表

3. 一側的表要明顯小於另外一側,小的一側將被廣播(明顯小於的定義為3倍小,此處為經驗值)

我們可以看到,在一定大小的表中,SparkSQL從時空結合的角度來看,將兩個表進行重新分區,並且對小表中的分區進行hash化,從而完成join。在保持一定複雜度的基礎上,盡量減少driver和executor的內存壓力,提升了計算時的穩定性。

Sort Merge Join

上面介紹的兩種實現對於一定大小的表比較適用,但當兩個表都非常大時,顯然無論適用哪種都會對計算內存造成很大壓力。這是因為join時兩者採取的都是hash join,是將一側的數據完全載入到內存中,使用hash code取join keys值相等的記錄進行連接。

當兩個表都非常大時,SparkSQL採用了一種全新的方案來對錶進行Join,即Sort Merge Join。這種實現方式不用將一側數據全部載入後再進星hash join,但需要在join前將數據排序,如下圖所示:

可以看到,首先將兩張表按照join keys進行了重新shuffle,保證join keys值相同的記錄會被分在相應的分區。分區後對每個分區內的數據進行排序,排序後再對相應的分區內的記錄進行連接,如下圖示:

看著很眼熟吧?也很簡單,因為兩個序列都是有序的,從頭遍歷,碰到key相同的就輸出;如果不同,左邊小就繼續取左邊,反之取右邊。

可以看出,無論分區有多大,Sort Merge Join都不用把某一側的數據全部載入到內存中,而是即用即取即丟,從而大大提升了大數據量下sql join的穩定性。

後記

本文介紹了SparkSQL中的3中Join實現,其實這也不是什麼新鮮玩意兒。傳統DB也有這也的玩法兒,SparkSQL只是將其做成分散式的實現。

本文僅僅從大的理論方面介紹了這幾種實現,具體到每個join type是怎麼遍歷、沒有join keys時應該怎麼做、這些實現對join keys有什麼具體的需求,這些細節都沒有展現出來。感興趣的話,可以去翻翻源碼。

聲明:本文為原創,版權歸本人所有,禁止用於任何商業目的,轉載請註明出處:blog.csdn.net/asongofic


推薦閱讀:

下一波計算浪潮
分散式計算和並行計算的區別與聯繫在哪裡?
MapReduce中的map做兩層for循環,效率特別低?
spark的shuffle和Hadoop的shuffle(mapreduce)的區別和關係是什麼?
研一學生~突然搞起了Big Data。在學Hadoop中,突然意識到只會用工具是遠遠不夠的,很想搞懂立面的演算法和思想。請教達人們指點分散式存儲與運算的主要思想與演算法?

TAG:Spark | 分布式计算 | 大数据 |