SparkSQL中的Sort實現(二)

上節說到SparkSQL中將Sort分為兩部分:第一部分是基於boundary的range repartition,通過採樣確定每個partition的上下界,然後將數據按照上下界重新分區;第二部分呢,就是在分區內將數據進行排序。完成這兩步之後,整張表中的數據就變成有序的了。

分區內的排序是藉助UnsafeExternalRowSorter來完成的,而它其中又嵌套了一個UnsafeExternalSorter,排序的主要邏輯靠後者來完成,前者主要做一些轉換和適配工作。

用到的數據結構

UnsafeInMemorySorter

這是一個用來存儲記錄數據的地址和prefix的Sorter。其實質上是一個LongArray,奇數為存儲記錄的地址,偶數位置存儲記錄的prefix。地址和prefix都是Long類型。

對記錄進行排序時,首先判斷兩條記錄的prefix是否相等,如果根據prefix就可以判斷出兩條記錄的大小,那麼直接返回結果。否則從相應的地址中拿出兩條記錄進行進一步的比較。相對於真實存儲的記錄來說,他們的地址和prefix佔用的空間都比較小,在比較時遍歷較小的數據結構更有利於提高cache命中率。

比較的詳細過程在後文描述。

另外值得注意的一點時,其存儲底層LongArray只有一部分內存實際被存儲使用,另外一部分是給排序預留的,預留的空間比例是1/2(Radix Sort)或者1/3(Tim Sort)。

UnsafeExternalSorter

首先,這是一個可以獨立申請內存(當然也包括釋放內存和spill到磁碟)的MemoryConsumer。有一個鏈表用來保存其使用到的MemoryBlock (allocatedPages),另外一個鏈表維護spill到磁碟的信息(spillWriters),例如溢出文件路徑、blockId、溢出的記錄數等。並有各種數據結構跟蹤當前的內存使用和分配情況。

其次,這個數據結構的主要功能用於給記錄排序。主要的輔助類在上面已經提到了,是一個基於地址和prefix的內存排序器。

這裡面用到的其他輔助數據結構有:

prefix comparator

一個簡單的prefix比較器。比較使用prefix computer計算出來的相對於每條記錄的prefix值。其實就是比較他們相對應的signed long和unsigned long值。關於prefix computer的細節下面會介紹。

record comparator

根據ordering構造,用來比較兩條記錄的大小。當prefix comparator不適用時,則使用此比較器對記錄進行全維度比較。

數據的插入

數據的插入分為兩個部分,一個是真實數據的插入,另外一個是其索引的更新。後者這個點很重要,每個數據的索引包含了真是數據插入的地址和其prefix值,當記錄可以只用prefix來比較大小時,那麼真實數據的排序就轉換為了索引的排序。數據排序和指針排序哪個更快,一目了然。

prefix computer

一條記錄的prefix值是由prefix computer計算而來。計算一個Row的prefix值,輸入的是一個Row對象,輸出的是一個Long值。顧名思義,這個東東的計算基準是SortOrder的第一個維度在Row對象上的映射,映射的元素根據數據類型來計算出一個Long值。具體的數據類型和計算方法如下表:

Data TypeHow to computeBooleantrue -> 1, false -> 0Integer強轉LongDate/Timestamp強轉LongFloat/Double先強轉Double(如果是Float),Double轉Long比較複雜,參考這裡:graphics : radix tricksString其實就是Array[Byte],會根據JVM是大/小端、4/8位元組對齊分別對待,具體參考UTF8String.getPrefix()函數Binary參考ByteArray.getPrefixDecimal(小精度)參考Decimal.toUnscaledLong函數Decimal(大精度)按照Double來處理

數據插入

真實數據插入時調用Platform的API,這個地方也有一個優化,就是在插入數據之前先寫一個length值,表示數據佔用的空間大小,方便後續取數據使用。

當然每插入一條數據前,都會check一下索引頁和數據頁是否有足夠的空間來容納新的數據,沒有的話會擴大內存。這邊申請內存使用的就是Spark新引入的統一內存管理,內存不足時會擠壓其他MemoryConsumer的內存,迫使其溢出到磁碟,從而獲取可用內存。

注意,在判斷索引數組是否有足夠內存時,實際上是判斷總佔用內存是否達到了其1/2(使用radix sort)或者2/3(使用Tim sort)。

真實數據插入時也基本上是重複以上流程:)

索引頁和數據頁的內容以及對應關係如下圖所示:

數據的排序

無spill

radix sort

如果能只根據prefix將數據進行排序的話,radix sort是最好用的排序演算法。

radix sort也就是俗稱的基數排序,這個演算法牛逼了,能達到O(w*n)的時間複雜度(其中n是需要排序的記錄數,w是排序的位數),參考:Radix sort。

上面已經介紹過了,在ordering只有一個維度時,可以通過只排序索引數組的方式達到將數據數組排序的效果。具體的實現就是通過radix sort將prefix進行排序,通過prefix和address的關聯,實現數據數組的排序。

radix sort的排序過程分以下幾步:

  1. 統計每個位上的直方圖,計算此位上每個桶的插入offset

  2. 從頭遍歷prefix值,將其插入到指定桶的位置,並將桶的位置加1

  3. 計算高一位的直方圖,重複1、2步直到將所有的位排序完畢

在SparkSQL的排序中,使用了位計算的技巧將計算使用的空間最小化,此處簡略圖示如下:

當然,這裡只取了4條數據演示,實際排序的數據也只有兩位數。而在實際的SparkSQL的排序中,數據的個數即記錄的總個數,而prefix的位數也有64位,分為每8位一組。

另外,前面提到的radix sort的情況下,索引數組為什麼要空一半的原因也揭示了。沒錯,就是為了排序過程中做臨時存儲空間使用的。

Tim sort

當不能簡單的使用prefix對數據進行排序的時候,Radix sort就不適用了,這裡使用Tim sort進行排序,其中的comparator首先比較prefix的值,當兩條記錄的prefix值相同時,才取兩條記錄,按照row comparator進行大小的比較。

Tim sort的原理在這裡。它是插入排序和歸併排序的結合。分為兩個步驟:

首先,將待排序的數據分為幾個片段分別進行插入排序,片段的長度由待排序數據的長度而定,在Spark SQL中這個數字是16-32之間。排序後的片段索引(片段開始位置和片段長度)存儲在堆棧中。

其次,將排序好的n個片段兩兩進行歸併排序,歸併排序的選擇方式為:

記棧頂自上而下的三個元素分別是X/Y/Z,當X長度小於等於Y+Z的長度時,取Y和X/Z中較短的那個進行歸併排序,直到所有片段歸併排序完成。

Tim sort的排序過程如下圖所示:

有spill

前面說的排序都是基於插入時沒有發生spill的情況,當插入時有數據spill到磁碟時,Spark SQL是怎麼處理的呢?

其實很簡單,在spill時,會將已經讀取的數據進行排序(排序過程和上述一致),排序後寫到磁碟內,這樣每次spill的數據就都是有序的了。

形成多個spill文件後,再讀取時,類似於merge sort的思路,先比較他們幾個文件的第一個元素的大小,最小的那個先讀取,Spark SQL中是使用priority queue來實現的。如下圖所示:

後記

可以看出,SparkSQL在排序中做了大量的優化,spill機制保證了內存不足時可以將數據spill到磁碟上,從而保證穩定性;使用Radix sort在數據僅有一維排序時,縮小排序所訪問的內存空間,提高cache命中率;在多維排序時,也使用了目前對真實數據排序時間複雜度最低的Tim sort演算法。

當然在整個SparkSQL中更是使用了大量的內存管理方式使得計算過程中的額外內存消耗更低,減少GC頻率;在整個排序過程中也盡量使用位運算的方式來進行信息統計和收集,提高CPU指令效率。經過這些優化,SparkSQL的排序基本上可以滿足業界關於在大量數據下的場景應用。

聲明:本文首發於CSDN,為原創,版權歸本人所有,禁止用於任何商業目的,轉載請註明出處:http://blog.csdn.net/asongoficeandfire/article/details/53728182

推薦閱讀:

5分鐘 Hadoop Shuffle 優化
嫌棄Hadoop?可能是你的打開方式有問題
大數據那些事(8):HIVE之初期起
大數據那些事(28):卡夫卡們的故事
技術分享丨HDFS 入門

TAG:Spark | 大数据 | Hadoop |