Spark排序的原理?

比如這個testcase

test("sortByKey") {

val pairs = sc.parallelize(Array((1, 0), (2, 0), (0, 0), (3, 0)), 2)

assert(pairs.sortByKey().collect() === Array((0, 0), (1, 0), (2, 0), (3, 0)))

}

全部的歸併結果的代碼應該在RDD.scala的Array.concat(results: _*)

def collect(): Array[T] = withScope {

val results = sc.runJob(this, (iter: Iterator[T]) =&> iter.toArray)

Array.concat(results: _*)

}

請問下到各個節點的worker分散式排序的代碼在哪能找到?多謝!


抱著 @連城 大大的大腿學習,現學現賣寫一下我現在的理解。不知道對不對,求大大們指正~

更新:又請教了一下 @連城 大大,之前寫的回答果然不夠準確,來更新下突出一下重點。

TL;DR:比較新的Spark內建的分散式排序採用的是TeraSort風格的演算法,跟MapReduce、Hadoop MapReduce里的sort相似。核心思路是:要進行分散式排序的時候,每個計算節點(「map side」)對輸入的數據做重新分片(repartition),分片採用range partitioning使得重新分片後的數據自然在分片之間是排好序的,而此時每個分片內的數據並不一定排好序了(事實上此時它們還可能分布在不同的計算節點上);重新分片的數據通過shuffle交換到新的計算節點上(「reduce side」),每個reducer把自己負責的分片內的數據做排序,然後就達到了全局排序的效果。為了讓重新分片後的數據盡量均勻地分布到每個分片中,在map side進行實際重新分片之前會另外先跑一個採樣任務對要排序的key做分析,統計數據分布情況,然後估算出能讓數據均勻分布的range是怎樣的,然後根據這個估算來實際進行重新分片。

畫個圖來簡易圖解題主所舉的例子:

=========================================================

RDD層面的 sortByKey() 的實現

就比較新的Spark來說,題主要找的 RDD 層面的 sortByKey() 實現在Spark的 shuffle 當中的。

sortByKey() 採用的是tera-sort風格的實現,其自身包含一個使用range partitioning的shuffle操作。它會

  • Stage 0:Sample。創建 RangePartitioner,先對輸入的數據的key做sampling來估算key的分布情況,然後按指定的排序切分出range,儘可能讓每個partition對應的range里的key的數量均勻。計算出來的 rangeBounds 是一個長為 numPartitions - 1 的Array[TKey],記錄頭 numPartitions - 1 個partition對應的range的上界;最後一個partition的邊界就隱含在「剩餘」當中。
  • Stage 1:Shuffle Write。開始shuffle,在map side做shuffle write,根據前面計算出的rangeBounds來重新partition。Shuffle write出的數據中,每個partition內的數據雖然尚未排序,但partition之間已經可以保證數據是按照partition index排序的了。
  • Stage 2:Shuffle Read。然後到reduce side,每個reducer再對拿到的本partition內的數據做排序。

這樣完成之後,partition之間的數據在map side就保證有排序,而每個partition內的數據在reduce side也保證有排序,就達到了全局排序的效果。

如果在 sortByKey() 後面跟一個 collect() 調用,則它會按照partition index的順序獲取結果數據,最後再把這些數組合併起來,就在本地得到了全局排序後的大數組。

/**
* Sort the RDD by key, so that each partition contains a sorted range of the elements. Calling
* `collect` or `save` on the resulting RDD will return or output an ordered list of records
* (in the `save` case, they will be written to multiple `part-X` files in the filesystem, in
* order of the keys).
*/
// TODO: this currently doesn"t work on P other than Tuple2!
def sortByKey(ascending: Boolean = true, numPartitions: Int = self.partitions.length)
: RDD[(K, V)] = self.withScope
{
val part = new RangePartitioner(numPartitions, self, ascending)
new ShuffledRDD[K, V, V](self, part)
.setKeyOrdering(if (ascending) ordering else ordering.reverse)
}

這塊代碼要跟隨 RangePartitioner、ShuffleRDD 然後到 ShuffleDependency、SortShuffleManager、SortShuffleWriter、BlockStoreShuffleReader 這麼一路走過來就可以看到完整的過程。

可見 sortByKey() 的實現的重頭戲都在range partitioning和shuffle當中。具體到Spark中的shuffle實現,較新版本的Spark使用的是sort-based shuffle。具體實現,map side的shuffle write在 SortShuffleWriter 類中,會通過 ExternalSorter 來寫出按照partition index排序的、按照partition切分的shuffle數據;reduce side的shuffle read在 BlockStoreShuffleReader 類中,在 sortByKey() 的場景下會通過一個 ExternalSorter 來對收到的本partition內的數據做排序。

要注意的是「sort-based shuffle」中的「sort」跟我們這裡關心的 sortByKey() 的sort並不是一回事,只是shuffle自身的一個實現手段,其中的按partition index做的sort只是為了提高shuffle的整體效率;讓重新分片後partition之間有序的重點還是在range partitioning上。

想了解更多背景信息的同學:友商Cloudera在2年多之前發過一篇博文講解當時的Spark(Spark 1.1)的shuffle實現以及他們當時要給Spark貢獻的新full sort-based shuffle實現:Improving Sort Performance in Apache Spark: It』s a Double - Cloudera Engineering Blog。在當前版的Spark里RDD層面的sort-based shuffle跟那篇博文里的描述還是相關的,所以還是值得一讀。

回過頭來看題主的例子:

Welcome to
____ __
/ __/__ ___ _____/ /__
_ / _ / _ `/ __/ "_/
/___/ .__/\_,_/_/ /_/\_ version 2.3.0-SNAPSHOT
/_/

Using Scala version 2.11.8 (Java HotSpot(TM) 64-Bit Server VM, Java 1.8.0_121)
Type in expressions to have them evaluated.
Type :help for more information.

scala&> val pairs = sc.parallelize(Array((1, 0), (2, 0), (0, 0), (3, 0)), 2)
pairs: org.apache.spark.rdd.RDD[(Int, Int)] = ParallelCollectionRDD[0] at parallelize at &:24

scala&> val sorted = pairs.sortByKey()
sorted: org.apache.spark.rdd.RDD[(Int, Int)] = ShuffledRDD[3] at sortByKey at &:26

scala&> sorted.toDebugString
res0: String =
(2) ShuffledRDD[3] at sortByKey at &:26 []
+-(2) ParallelCollectionRDD[0] at parallelize at &:24 []

可以看到 sortByKey() 背後是由 ShuffleRDD 所實現的。如果我們進一步執行 sorted.collect() ,就可以在Spark UI里看到所執行的RDD的一些可視化信息和統計信息。

首先到localhost:4040打開Spark UI。去到Stages標籤頁,看所有執行了的stage的狀況:

這個表格中的Stage 0~2就跟上文的介紹一一對應:Stage 0是Sample,Stage 1是Shuffle Write,Stage 2是Shuffle Read,其結果輸入到 collect() 做最後歸併。有興趣的同學可以留意一下每個stage的 shuffle read 和 shuffle write 兩列數據的特徵。

點進每個stage的鏈接,讓我們通過RDD可視化來看看Stage 0,Sample:

可以看到它的RDD結構是由 ParallelCollectionRDD 和兩個 MapPartitionsRDD 構成的。其中 ParallelCollectionRDD 對應 sc.parallelize(...) 調用,而後面的兩個 MapPartitionsRDD 則是由 RangePartitioner 為了sampling而創建的,簡單說是這麼一回事:

val rdd = sc.parallelize(...) // created ParallelCollectionRDD
val sketched = rdd.map(_._1) // created 1st MapPartitionsRDD to get the keys
.mapPartitionsWithIndex { (idx, iter) =&> // created 2nd MapPartitionsRDD to sample the keys
// do sampling on the keys
}.collect() // finally trigger execution and collect results to an array

收集到的採樣數據會用於估算接下來在Stage 1重新partition的時候,每個partition應該接受哪一段範圍的key,盡量讓每個partition都得到差不多的數據量。

然後Stage 1,Shuffle Write:

不熟悉Spark的同學光看這個可視化圖可能會覺得很納悶:這個圖裡沒有shuffle啊?這是因為Spark里的stage邊界通常就是shuffle——stage的入口一側可能有shuffle read,stage的出口一側可能有shuffle write。對於一個shuffle來說,做shuffle write的一側由於歷史原因就叫做map side,做shuffle read的一側就叫做reduce side。當然這歷史原因就是它們在 MapReduce 里的命名方式。

Stage 1隻做了一件事,就是把 sc.parallelize(...) 輸入的數據通過shuffle write來達到partition之間排好序的狀態,此時每個mapper里每個partition內的數據還是尚未排序的。

最後是Stage 2,Shuffle Read:

可以看到 ShuffleRDD 被劃在這個stage里了,但還是要注意shuffle動作是由map side和reduce side協作完成的,不要因為 ShuffleRDD 被劃分在後面就覺得shuffle只是在後面發生的。

Stage 2在shuffle read過程中,每個executor會把自己收到的partition的數據做排序。完成後就達到了partition內也排好序的狀態。全局看到就達到全局排好序了。

=========================================================

上面說的都是Spark RDD層面講解。然而在比較新的Spark里,一般的應用層運算都應該用Spark SQL / DataFrame API來跑而不是直接下到RDD層面跑。所以題主的問題如果在Spark SQL層面上再講一次會更有用。來看看邏輯上相同的操作如果用Spark SQL來寫會是怎樣的。先用DataFrame API演示:

scala&> val pairsDF = Seq((1, 0), (2, 0), (0, 0), (3, 0)).toDF("a", "b")
pairsDF: org.apache.spark.sql.DataFrame = [a: int, b: int]

scala&> val sortedDF = pairsDF.orderBy("a)
sortedDF: org.apache.spark.sql.Dataset[org.apache.spark.sql.Row] = [a: int, b: int]

scala&> sortedDF.collect
res1: Array[org.apache.spark.sql.Row] = Array([0,0], [1,0], [2,0], [3,0])

看看Spark SQL內部是如何處理這個查詢的。下面是 sortedDF 的logical plan 和physical plan:

scala&> sortedDF.explain(true)
== Parsed Logical Plan ==
"Sort ["a ASC NULLS FIRST], true
+- Project [_1#2 AS a#5, _2#3 AS b#6]
+- LocalRelation [_1#2, _2#3]

== Analyzed Logical Plan ==
a: int, b: int
Sort [a#5 ASC NULLS FIRST], true
+- Project [_1#2 AS a#5, _2#3 AS b#6]
+- LocalRelation [_1#2, _2#3]

== Optimized Logical Plan ==
Sort [a#5 ASC NULLS FIRST], true
+- LocalRelation [a#5, b#6]

== Physical Plan ==
*Sort [a#5 ASC NULLS FIRST], true, 0
+- Exchange rangepartitioning(a#5 ASC NULLS FIRST, 200)
+- LocalTableScan [a#5, b#6]

同樣的查詢用Spark SQL的SQL版來寫也完全一樣(留意兩者生成的optimized logical plan以及physical plan都是完全一樣的):

scala&> val pairsDF = Seq((1, 0), (2, 0), (0, 0), (3, 0)).toDF("a", "b")
pairsDF: org.apache.spark.sql.DataFrame = [a: int, b: int]

scala&> pairsDF.createOrReplaceTempView("pairs")

scala&> val sortedDF = spark.sql("SELECT * FROM pairs ORDER BY a")
sortedDF: org.apache.spark.sql.DataFrame = [a: int, b: int]

scala&> sortedDF.explain(true)
== Parsed Logical Plan ==
"Sort ["a ASC NULLS FIRST], true
+- "Project [*]
+- "UnresolvedRelation `pairs`

== Analyzed Logical Plan ==
a: int, b: int
Sort [a#5 ASC NULLS FIRST], true
+- Project [a#5, b#6]
+- SubqueryAlias pairs
+- Project [_1#2 AS a#5, _2#3 AS b#6]
+- LocalRelation [_1#2, _2#3]

== Optimized Logical Plan ==
Sort [a#5 ASC NULLS FIRST], true
+- LocalRelation [a#5, b#6]

== Physical Plan ==
*Sort [a#5 ASC NULLS FIRST], true, 0
+- Exchange rangepartitioning(a#5 ASC NULLS FIRST, 200)
+- LocalTableScan [a#5, b#6]

可以看到Spark SQL的版本也是通過range partition + shuffle(exchange)來實現前半段排序(partition之間排好序,partition內尚未排序),然後Sort操作符在reduce side把partition內也給排好序。整體流程跟RDD版的 sortByKey() 是完全一樣的,雖說具體經過的代碼路徑不完全一樣。實際流程也是三步:

  • Stage 0:Sample。跟RDD版一樣通過 RangePartitioner 來做
  • Stage 1:Shuffle Write。跟RDD版一樣底層通過 SortShuffleManager 管理 SortShuffleWriter 來實現
  • Stage 2:Shuffle Read。這裡不是直接由shuffle reader來做排序,而是交給Spark SQL自己的Sort操作符來做,不過底下最終跑的代碼概念上還是很相似的,具體使用的類是 UnsafeExternalRowSorter。

這組DataFrame層面的操作經過Catalyst優化和處理後會生成出下面的RDD結構去實際執行:

scala&> sortedDF.queryExecution.toRdd.toDebugString
res1: String =
(5) MapPartitionsRDD[12] at toRdd at &:28 []
| ShuffledRowRDD[11] at toRdd at &:28 []
+-(4) MapPartitionsRDD[10] at toRdd at &:28 []
| MapPartitionsRDD[6] at toRdd at &:28 []
| ParallelCollectionRDD[5] at toRdd at &:28 []

例子中的physical plan里的Sort操作符有參與whole-stage code generation,這個例子中它對應生成的Java代碼如下:

scala&> sortedDF.queryExecution.debug.codegen
Found 1 WholeStageCodegen subtrees.
== Subtree 1 / 1 ==
*Sort [a#5 ASC NULLS FIRST], true, 0
+- Exchange rangepartitioning(a#5 ASC NULLS FIRST, 200)
+- LocalTableScan [a#5, b#6]

Generated code:
/* 001 */ public Object generate(Object[] references) {
/* 002 */ return new GeneratedIterator(references);
/* 003 */ }
/* 004 */
/* 005 */ final class GeneratedIterator extends org.apache.spark.sql.execution.BufferedRowIterator {
/* 006 */ private Object[] references;
/* 007 */ private scala.collection.Iterator[] inputs;
/* 008 */ private boolean sort_needToSort;
/* 009 */ private org.apache.spark.sql.execution.SortExec sort_plan;
/* 010 */ private org.apache.spark.sql.execution.UnsafeExternalRowSorter sort_sorter;
/* 011 */ private org.apache.spark.executor.TaskMetrics sort_metrics;
/* 012 */ private scala.collection.Iterator& sort_sortedIter;
/* 013 */ private scala.collection.Iterator inputadapter_input;
/* 014 */ private org.apache.spark.sql.execution.metric.SQLMetric sort_peakMemory;
/* 015 */ private org.apache.spark.sql.execution.metric.SQLMetric sort_spillSize;
/* 016 */ private org.apache.spark.sql.execution.metric.SQLMetric sort_sortTime;
/* 017 */
/* 018 */ public GeneratedIterator(Object[] references) {
/* 019 */ this.references = references;
/* 020 */ }
/* 021 */
/* 022 */ public void init(int index, scala.collection.Iterator[] inputs) {
/* 023 */ partitionIndex = index;
/* 024 */ this.inputs = inputs;
/* 025 */ sort_needToSort = true;
/* 026 */ this.sort_plan = (org.apache.spark.sql.execution.SortExec) references[0];
/* 027 */ sort_sorter = sort_plan.createSorter();
/* 028 */ sort_metrics = org.apache.spark.TaskContext.get().taskMetrics();
/* 029 */
/* 030 */ inputadapter_input = inputs[0];
/* 031 */ this.sort_peakMemory = (org.apache.spark.sql.execution.metric.SQLMetric) references[1];
/* 032 */ this.sort_spillSize = (org.apache.spark.sql.execution.metric.SQLMetric) references[2];
/* 033 */ this.sort_sortTime = (org.apache.spark.sql.execution.metric.SQLMetric) references[3];
/* 034 */
/* 035 */ }
/* 036 */
/* 037 */ private void sort_addToSorter() throws java.io.IOException {
/* 038 */ while (inputadapter_input.hasNext() !stopEarly()) {
/* 039 */ InternalRow inputadapter_row = (InternalRow) inputadapter_input.next();
/* 040 */ sort_sorter.insertRow((UnsafeRow)inputadapter_row);
/* 041 */ if (shouldStop()) return;
/* 042 */ }
/* 043 */
/* 044 */ }
/* 045 */
/* 046 */ protected void processNext() throws java.io.IOException {
/* 047 */ if (sort_needToSort) {
/* 048 */ long sort_spillSizeBefore = sort_metrics.memoryBytesSpilled();
/* 049 */ sort_addToSorter();
/* 050 */ sort_sortedIter = sort_sorter.sort();
/* 051 */ sort_sortTime.add(sort_sorter.getSortTimeNanos() / 1000000);
/* 052 */ sort_peakMemory.add(sort_sorter.getPeakMemoryUsage());
/* 053 */ sort_spillSize.add(sort_metrics.memoryBytesSpilled() - sort_spillSizeBefore);
/* 054 */ sort_metrics.incPeakExecutionMemory(sort_sorter.getPeakMemoryUsage());
/* 055 */ sort_needToSort = false;
/* 056 */ }
/* 057 */
/* 058 */ while (sort_sortedIter.hasNext()) {
/* 059 */ UnsafeRow sort_outputRow = (UnsafeRow)sort_sortedIter.next();
/* 060 */
/* 061 */ append(sort_outputRow);
/* 062 */
/* 063 */ if (shouldStop()) return;
/* 064 */ }
/* 065 */ }
/* 066 */ }

執行Spark SQL版例子的各種可視化信息:

Stage 0

Stage 1 Stage 2

SQL

&>_&<


DAGScheduler把一個spark作業轉換成成stage的DAG(Directed Acyclic Graph有向無環圖),根據RDD和stage之間的關係,找出開銷最小的調度方法,然後把stage以TaskSet的形式提交給 TaskScheduler。


推薦閱讀:

HBase可以替代redis嗎?
為什麼(hadoop基準測試中)HDFS寫入速度如此之慢?
Hadoop Streaming模式的優缺點?
hadoop web管理Hue,Ambari 和CM 的區別是什麼?
未來想成為一名大數據架構師,可是不知如何在hadoop spark Storm中糾結?

TAG:演算法 | Hadoop | MapReduce | 大數據 | Spark |