Spark排序的原理?
比如這個testcase
test("sortByKey") { val pairs = sc.parallelize(Array((1, 0), (2, 0), (0, 0), (3, 0)), 2) assert(pairs.sortByKey().collect() === Array((0, 0), (1, 0), (2, 0), (3, 0)))}
全部的歸併結果的代碼應該在RDD.scala的Array.concat(results: _*) def collect(): Array[T] = withScope { val results = sc.runJob(this, (iter: Iterator[T]) =&> iter.toArray) Array.concat(results: _*) }請問下到各個節點的worker分散式排序的代碼在哪能找到?多謝!
抱著 @連城 大大的大腿學習,現學現賣寫一下我現在的理解。不知道對不對,求大大們指正~
更新:又請教了一下 @連城 大大,之前寫的回答果然不夠準確,來更新下突出一下重點。
TL;DR:比較新的Spark內建的分散式排序採用的是TeraSort風格的演算法,跟MapReduce、Hadoop MapReduce里的sort相似。核心思路是:要進行分散式排序的時候,每個計算節點(「map side」)對輸入的數據做重新分片(repartition),分片採用range partitioning使得重新分片後的數據自然在分片之間是排好序的,而此時每個分片內的數據並不一定排好序了(事實上此時它們還可能分布在不同的計算節點上);重新分片的數據通過shuffle交換到新的計算節點上(「reduce side」),每個reducer把自己負責的分片內的數據做排序,然後就達到了全局排序的效果。為了讓重新分片後的數據盡量均勻地分布到每個分片中,在map side進行實際重新分片之前會另外先跑一個採樣任務對要排序的key做分析,統計數據分布情況,然後估算出能讓數據均勻分布的range是怎樣的,然後根據這個估算來實際進行重新分片。
畫個圖來簡易圖解題主所舉的例子:
=========================================================
RDD層面的 sortByKey() 的實現
就比較新的Spark來說,題主要找的 RDD 層面的 sortByKey() 實現在Spark的 shuffle 當中的。
sortByKey() 採用的是tera-sort風格的實現,其自身包含一個使用range partitioning的shuffle操作。它會
- Stage 0:Sample。創建 RangePartitioner,先對輸入的數據的key做sampling來估算key的分布情況,然後按指定的排序切分出range,儘可能讓每個partition對應的range里的key的數量均勻。計算出來的 rangeBounds 是一個長為 numPartitions - 1 的Array[TKey],記錄頭 numPartitions - 1 個partition對應的range的上界;最後一個partition的邊界就隱含在「剩餘」當中。
- Stage 1:Shuffle Write。開始shuffle,在map side做shuffle write,根據前面計算出的rangeBounds來重新partition。Shuffle write出的數據中,每個partition內的數據雖然尚未排序,但partition之間已經可以保證數據是按照partition index排序的了。
- Stage 2:Shuffle Read。然後到reduce side,每個reducer再對拿到的本partition內的數據做排序。
這樣完成之後,partition之間的數據在map side就保證有排序,而每個partition內的數據在reduce side也保證有排序,就達到了全局排序的效果。
如果在 sortByKey() 後面跟一個 collect() 調用,則它會按照partition index的順序獲取結果數據,最後再把這些數組合併起來,就在本地得到了全局排序後的大數組。
/**
* Sort the RDD by key, so that each partition contains a sorted range of the elements. Calling
* `collect` or `save` on the resulting RDD will return or output an ordered list of records
* (in the `save` case, they will be written to multiple `part-X` files in the filesystem, in
* order of the keys).
*/
// TODO: this currently doesn"t work on P other than Tuple2!
def sortByKey(ascending: Boolean = true, numPartitions: Int = self.partitions.length)
: RDD[(K, V)] = self.withScope
{
val part = new RangePartitioner(numPartitions, self, ascending)
new ShuffledRDD[K, V, V](self, part)
.setKeyOrdering(if (ascending) ordering else ordering.reverse)
}
這塊代碼要跟隨 RangePartitioner、ShuffleRDD 然後到 ShuffleDependency、SortShuffleManager、SortShuffleWriter、BlockStoreShuffleReader 這麼一路走過來就可以看到完整的過程。
可見 sortByKey() 的實現的重頭戲都在range partitioning和shuffle當中。具體到Spark中的shuffle實現,較新版本的Spark使用的是sort-based shuffle。具體實現,map side的shuffle write在 SortShuffleWriter 類中,會通過 ExternalSorter 來寫出按照partition index排序的、按照partition切分的shuffle數據;reduce side的shuffle read在 BlockStoreShuffleReader 類中,在 sortByKey() 的場景下會通過一個 ExternalSorter 來對收到的本partition內的數據做排序。
要注意的是「sort-based shuffle」中的「sort」跟我們這裡關心的 sortByKey() 的sort並不是一回事,只是shuffle自身的一個實現手段,其中的按partition index做的sort只是為了提高shuffle的整體效率;讓重新分片後partition之間有序的重點還是在range partitioning上。
想了解更多背景信息的同學:友商Cloudera在2年多之前發過一篇博文講解當時的Spark(Spark 1.1)的shuffle實現以及他們當時要給Spark貢獻的新full sort-based shuffle實現:Improving Sort Performance in Apache Spark: It』s a Double - Cloudera Engineering Blog。在當前版的Spark里RDD層面的sort-based shuffle跟那篇博文里的描述還是相關的,所以還是值得一讀。
回過頭來看題主的例子:
Welcome to
____ __
/ __/__ ___ _____/ /__
_ / _ / _ `/ __/ "_/
/___/ .__/\_,_/_/ /_/\_ version 2.3.0-SNAPSHOT
/_/
Using Scala version 2.11.8 (Java HotSpot(TM) 64-Bit Server VM, Java 1.8.0_121)
Type in expressions to have them evaluated.
Type :help for more information.
scala&> val pairs = sc.parallelize(Array((1, 0), (2, 0), (0, 0), (3, 0)), 2)
pairs: org.apache.spark.rdd.RDD[(Int, Int)] = ParallelCollectionRDD[0] at parallelize at &
scala&> val sorted = pairs.sortByKey()
sorted: org.apache.spark.rdd.RDD[(Int, Int)] = ShuffledRDD[3] at sortByKey at &
scala&> sorted.toDebugString 可以看到 sortByKey() 背後是由 ShuffleRDD 所實現的。如果我們進一步執行 sorted.collect() ,就可以在Spark UI里看到所執行的RDD的一些可視化信息和統計信息。 首先到localhost:4040打開Spark UI。去到Stages標籤頁,看所有執行了的stage的狀況:
res0: String =
(2) ShuffledRDD[3] at sortByKey at &
+-(2) ParallelCollectionRDD[0] at parallelize at &
這個表格中的Stage 0~2就跟上文的介紹一一對應:Stage 0是Sample,Stage 1是Shuffle Write,Stage 2是Shuffle Read,其結果輸入到 collect() 做最後歸併。有興趣的同學可以留意一下每個stage的 shuffle read 和 shuffle write 兩列數據的特徵。
點進每個stage的鏈接,讓我們通過RDD可視化來看看Stage 0,Sample:
可以看到它的RDD結構是由 ParallelCollectionRDD 和兩個 MapPartitionsRDD 構成的。其中 ParallelCollectionRDD 對應 sc.parallelize(...) 調用,而後面的兩個 MapPartitionsRDD 則是由 RangePartitioner 為了sampling而創建的,簡單說是這麼一回事:
val rdd = sc.parallelize(...) // created ParallelCollectionRDD
val sketched = rdd.map(_._1) // created 1st MapPartitionsRDD to get the keys
.mapPartitionsWithIndex { (idx, iter) =&> // created 2nd MapPartitionsRDD to sample the keys
// do sampling on the keys
}.collect() // finally trigger execution and collect results to an array
收集到的採樣數據會用於估算接下來在Stage 1重新partition的時候,每個partition應該接受哪一段範圍的key,盡量讓每個partition都得到差不多的數據量。
然後Stage 1,Shuffle Write:
不熟悉Spark的同學光看這個可視化圖可能會覺得很納悶:這個圖裡沒有shuffle啊?這是因為Spark里的stage邊界通常就是shuffle——stage的入口一側可能有shuffle read,stage的出口一側可能有shuffle write。對於一個shuffle來說,做shuffle write的一側由於歷史原因就叫做map side,做shuffle read的一側就叫做reduce side。當然這歷史原因就是它們在 MapReduce 里的命名方式。
Stage 1隻做了一件事,就是把 sc.parallelize(...) 輸入的數據通過shuffle write來達到partition之間排好序的狀態,此時每個mapper里每個partition內的數據還是尚未排序的。
最後是Stage 2,Shuffle Read:
可以看到 ShuffleRDD 被劃在這個stage里了,但還是要注意shuffle動作是由map side和reduce side協作完成的,不要因為 ShuffleRDD 被劃分在後面就覺得shuffle只是在後面發生的。
Stage 2在shuffle read過程中,每個executor會把自己收到的partition的數據做排序。完成後就達到了partition內也排好序的狀態。全局看到就達到全局排好序了。
=========================================================
上面說的都是Spark RDD層面講解。然而在比較新的Spark里,一般的應用層運算都應該用Spark SQL / DataFrame API來跑而不是直接下到RDD層面跑。所以題主的問題如果在Spark SQL層面上再講一次會更有用。來看看邏輯上相同的操作如果用Spark SQL來寫會是怎樣的。先用DataFrame API演示:
scala&> val pairsDF = Seq((1, 0), (2, 0), (0, 0), (3, 0)).toDF("a", "b")
pairsDF: org.apache.spark.sql.DataFrame = [a: int, b: int]
scala&> val sortedDF = pairsDF.orderBy("a)
sortedDF: org.apache.spark.sql.Dataset[org.apache.spark.sql.Row] = [a: int, b: int]
scala&> sortedDF.collect
res1: Array[org.apache.spark.sql.Row] = Array([0,0], [1,0], [2,0], [3,0])
看看Spark SQL內部是如何處理這個查詢的。下面是 sortedDF 的logical plan 和physical plan:
scala&> sortedDF.explain(true)
== Parsed Logical Plan ==
"Sort ["a ASC NULLS FIRST], true
+- Project [_1#2 AS a#5, _2#3 AS b#6]
+- LocalRelation [_1#2, _2#3]
== Analyzed Logical Plan ==
a: int, b: int
Sort [a#5 ASC NULLS FIRST], true
+- Project [_1#2 AS a#5, _2#3 AS b#6]
+- LocalRelation [_1#2, _2#3]
== Optimized Logical Plan ==
Sort [a#5 ASC NULLS FIRST], true
+- LocalRelation [a#5, b#6]
== Physical Plan ==
*Sort [a#5 ASC NULLS FIRST], true, 0
+- Exchange rangepartitioning(a#5 ASC NULLS FIRST, 200)
+- LocalTableScan [a#5, b#6]
同樣的查詢用Spark SQL的SQL版來寫也完全一樣(留意兩者生成的optimized logical plan以及physical plan都是完全一樣的):
scala&> val pairsDF = Seq((1, 0), (2, 0), (0, 0), (3, 0)).toDF("a", "b")
pairsDF: org.apache.spark.sql.DataFrame = [a: int, b: int]
scala&> pairsDF.createOrReplaceTempView("pairs")
scala&> val sortedDF = spark.sql("SELECT * FROM pairs ORDER BY a")
sortedDF: org.apache.spark.sql.DataFrame = [a: int, b: int]
scala&> sortedDF.explain(true)
== Parsed Logical Plan ==
"Sort ["a ASC NULLS FIRST], true
+- "Project [*]
+- "UnresolvedRelation `pairs`
== Analyzed Logical Plan ==
a: int, b: int
Sort [a#5 ASC NULLS FIRST], true
+- Project [a#5, b#6]
+- SubqueryAlias pairs
+- Project [_1#2 AS a#5, _2#3 AS b#6]
+- LocalRelation [_1#2, _2#3]
== Optimized Logical Plan ==
Sort [a#5 ASC NULLS FIRST], true
+- LocalRelation [a#5, b#6]
== Physical Plan ==
*Sort [a#5 ASC NULLS FIRST], true, 0
+- Exchange rangepartitioning(a#5 ASC NULLS FIRST, 200)
+- LocalTableScan [a#5, b#6]
可以看到Spark SQL的版本也是通過range partition + shuffle(exchange)來實現前半段排序(partition之間排好序,partition內尚未排序),然後Sort操作符在reduce side把partition內也給排好序。整體流程跟RDD版的 sortByKey() 是完全一樣的,雖說具體經過的代碼路徑不完全一樣。實際流程也是三步:
- Stage 0:Sample。跟RDD版一樣通過 RangePartitioner 來做
- Stage 1:Shuffle Write。跟RDD版一樣底層通過 SortShuffleManager 管理 SortShuffleWriter 來實現
- Stage 2:Shuffle Read。這裡不是直接由shuffle reader來做排序,而是交給Spark SQL自己的Sort操作符來做,不過底下最終跑的代碼概念上還是很相似的,具體使用的類是 UnsafeExternalRowSorter。
這組DataFrame層面的操作經過Catalyst優化和處理後會生成出下面的RDD結構去實際執行:
scala&> sortedDF.queryExecution.toRdd.toDebugString
res1: String =
(5) MapPartitionsRDD[12] at toRdd at &
| ShuffledRowRDD[11] at toRdd at &
+-(4) MapPartitionsRDD[10] at toRdd at &
| MapPartitionsRDD[6] at toRdd at &
| ParallelCollectionRDD[5] at toRdd at &
例子中的physical plan里的Sort操作符有參與whole-stage code generation,這個例子中它對應生成的Java代碼如下:
scala&> sortedDF.queryExecution.debug.codegen
Found 1 WholeStageCodegen subtrees.
== Subtree 1 / 1 ==
*Sort [a#5 ASC NULLS FIRST], true, 0
+- Exchange rangepartitioning(a#5 ASC NULLS FIRST, 200)
+- LocalTableScan [a#5, b#6]
Generated code: 執行Spark SQL版例子的各種可視化信息:
/* 001 */ public Object generate(Object[] references) {
/* 002 */ return new GeneratedIterator(references);
/* 003 */ }
/* 004 */
/* 005 */ final class GeneratedIterator extends org.apache.spark.sql.execution.BufferedRowIterator {
/* 006 */ private Object[] references;
/* 007 */ private scala.collection.Iterator[] inputs;
/* 008 */ private boolean sort_needToSort;
/* 009 */ private org.apache.spark.sql.execution.SortExec sort_plan;
/* 010 */ private org.apache.spark.sql.execution.UnsafeExternalRowSorter sort_sorter;
/* 011 */ private org.apache.spark.executor.TaskMetrics sort_metrics;
/* 012 */ private scala.collection.Iterator&
/* 013 */ private scala.collection.Iterator inputadapter_input;
/* 014 */ private org.apache.spark.sql.execution.metric.SQLMetric sort_peakMemory;
/* 015 */ private org.apache.spark.sql.execution.metric.SQLMetric sort_spillSize;
/* 016 */ private org.apache.spark.sql.execution.metric.SQLMetric sort_sortTime;
/* 017 */
/* 018 */ public GeneratedIterator(Object[] references) {
/* 019 */ this.references = references;
/* 020 */ }
/* 021 */
/* 022 */ public void init(int index, scala.collection.Iterator[] inputs) {
/* 023 */ partitionIndex = index;
/* 024 */ this.inputs = inputs;
/* 025 */ sort_needToSort = true;
/* 026 */ this.sort_plan = (org.apache.spark.sql.execution.SortExec) references[0];
/* 027 */ sort_sorter = sort_plan.createSorter();
/* 028 */ sort_metrics = org.apache.spark.TaskContext.get().taskMetrics();
/* 029 */
/* 030 */ inputadapter_input = inputs[0];
/* 031 */ this.sort_peakMemory = (org.apache.spark.sql.execution.metric.SQLMetric) references[1];
/* 032 */ this.sort_spillSize = (org.apache.spark.sql.execution.metric.SQLMetric) references[2];
/* 033 */ this.sort_sortTime = (org.apache.spark.sql.execution.metric.SQLMetric) references[3];
/* 034 */
/* 035 */ }
/* 036 */
/* 037 */ private void sort_addToSorter() throws java.io.IOException {
/* 038 */ while (inputadapter_input.hasNext() !stopEarly()) {
/* 039 */ InternalRow inputadapter_row = (InternalRow) inputadapter_input.next();
/* 040 */ sort_sorter.insertRow((UnsafeRow)inputadapter_row);
/* 041 */ if (shouldStop()) return;
/* 042 */ }
/* 043 */
/* 044 */ }
/* 045 */
/* 046 */ protected void processNext() throws java.io.IOException {
/* 047 */ if (sort_needToSort) {
/* 048 */ long sort_spillSizeBefore = sort_metrics.memoryBytesSpilled();
/* 049 */ sort_addToSorter();
/* 050 */ sort_sortedIter = sort_sorter.sort();
/* 051 */ sort_sortTime.add(sort_sorter.getSortTimeNanos() / 1000000);
/* 052 */ sort_peakMemory.add(sort_sorter.getPeakMemoryUsage());
/* 053 */ sort_spillSize.add(sort_metrics.memoryBytesSpilled() - sort_spillSizeBefore);
/* 054 */ sort_metrics.incPeakExecutionMemory(sort_sorter.getPeakMemoryUsage());
/* 055 */ sort_needToSort = false;
/* 056 */ }
/* 057 */
/* 058 */ while (sort_sortedIter.hasNext()) {
/* 059 */ UnsafeRow sort_outputRow = (UnsafeRow)sort_sortedIter.next();
/* 060 */
/* 061 */ append(sort_outputRow);
/* 062 */
/* 063 */ if (shouldStop()) return;
/* 064 */ }
/* 065 */ }
/* 066 */ }
Stage 0
Stage 1 Stage 2
SQL
&>_&<
DAGScheduler把一個spark作業轉換成成stage的DAG(Directed Acyclic Graph有向無環圖),根據RDD和stage之間的關係,找出開銷最小的調度方法,然後把stage以TaskSet的形式提交給 TaskScheduler。
推薦閱讀:
※HBase可以替代redis嗎?
※為什麼(hadoop基準測試中)HDFS寫入速度如此之慢?
※Hadoop Streaming模式的優缺點?
※hadoop web管理Hue,Ambari 和CM 的區別是什麼?
※未來想成為一名大數據架構師,可是不知如何在hadoop spark Storm中糾結?