Spark SQL中的聚合(Aggregate)實現

Spark SQL中的聚合(Aggregate)實現

Sort Based Aggregate

首先來說說實現比較簡單(但實際執行起來卻不簡單)的Sort Based Aggregate。顧名思義,這是一種基於排序的聚合實現,在進行聚合之前,會根據grouping key進行分區以及分區內排序,將具有相同grouping key的記錄都分布在同一個partition內且前後相鄰,聚合時只需要順序遍歷整個分區內的數據,即可以得到聚合結果。

如圖:

途中可以看出清晰的執行流程,包括重分區和分區內排序,最後遍歷每個分區,對每個key region做一個聚合,得到最終結果。

Hash Based Aggregation

即基於Hash Map的聚合實現,一般情況下使用java的HashMap進行聚合即可,但java的內置類在使用上存在著冗餘信息多、佔用空間大、受GC影響的問題。故SparkSQL中實現了自己的一套HashMap,裁減掉不必要的功能,使空間利用更加充分,內存申請/釋放也更有效率。

基於HashMap的數據聚合

這個HashMap的實現類為BytesToBytesMap,由兩部分配合完成功能:存儲地址和hash code的長整形數組,以及存儲真實數據的一系列內存頁(即Spark內存管理的基本單位)。如下圖:

左邊Ptr Array即保存指針的一個長整形數組,右邊的內存頁保存數據,當然,每條記錄除了key和value之外,還保存著其他一些諸如長度等信息,此圖中不詳細標明。當有數據插入時,首先根據key的hash code判斷Ptr Array相應的位置(keyN addr)是否已有數據填充:

  1. 如果沒有數據填充則在當前memory page追加一個欄位,存儲該key對相應的key和value值,並在Ptr Array中存儲相應的地址和hash code;

  2. 如果已有位置填充,則判斷其hash code(存儲在與keyN addr相鄰的keyN hashcode中)以及key addr指向位置對應的key值是否相等,若兩者均相等,則視為對本key的Aggregate Buffer的更新;

  3. 若已有位置有填充且key值不相同,則視為hash值碰撞,使用基於三角數的二次探測法(關於二次探測法的更多信息,可以參考:Quadratic_probing)尋找下一個可用位置,直到取到可用位置為止。

聚合後的數據讀取

當所有數據都處理完畢,根據數據的key值將其和相應位置的agg buffer進行聚合之後,就需要讀取聚合後HashMap中所存儲的內容了。

讀取其實很簡單,就是對memory pages逐頁讀取,每個page的開頭都記錄著本頁所包含的記錄總數。每個數據頁讀取完畢後釋放本頁佔用的內存。每條記錄讀取出來之後,根據result projection生成結果記錄。

switch to sort base aggregation

正如上節所說,hash based aggregation是的後台是基於內存管理機制實現的一個HashMap,當內存不足時,HashMap無法存儲更多的數據,此時就需要將記錄溢出到磁碟上,用時間來交換空間。

溢出

溢出基於UnsafeExternalSorter和UnsafeInMemorySorter實現,是不是有些眼熟?這些實現機制和之前將的Spark SQL Join(參考:SparkSQL中的Sort實現(二))是一致的,基於HashMap中的記錄構建InMemSorter和ExternalSorter之後,就講數據溢出到磁碟上,只在內存中保存溢出後的文件指針。

讀取

此處即讀取多個ExternalSorter中的記錄,即讀取溢出到各個磁碟上的文件中的記錄。讀取其實也很簡單,因為各個文件中的記錄都是有序的,所以可以從頭開始處理文件中的記錄,將屬於同一個key的進行聚合,當前key的所有元素讀取完畢後即可輸出對應當前key的result projection。

總的來說,當內存不足時,需要藉助磁碟空間來實現聚合。但溢出到磁碟面臨的主要問題是從屬於同一個key的記錄可能被分散到不同的溢出文件中。Spark SQL使用溢出前排序,讀取時順序讀取的方法很好的解決了這個問題。

預聚合(偏聚合)

為了減少shuffle數據量以及reduce端的壓力,通常Spark SQL在map端會做一個partial aggregate(通常叫做預聚合或者偏聚合),即在shuffle前將同一分區內所屬同key的記錄先進行一個預結算,再將結果進行shuffle,發送到reduce端做一個匯總。如下圖:

聲明:本文為原創,首發於CSDN,版權歸本人所有,禁止用於任何商業目的,轉載請註明出處:Spark SQL中的聚合(Aggregate)實現


推薦閱讀:

BIM360、Buzzsaw與SAAS模式
作為全行業的IT基礎,遊戲領域是如何擁抱雲計算的?
混合雲規模應用大勢所趨,服務商選擇是關鍵
原來這才是亞馬遜AWS站在雲計算之巔的秘訣

TAG:Spark | 大数据 | 云计算 |