標籤:

請教一個spark的小白問題?

舉個簡單的例子,假如求一個數組的和,每個executor計算了個分區內的數據的和,那麼最後誰來把每個executor上的結果再加起來呢?


題主問的是Spark,不過既然是這個時間點問的,那我就從Spark SQL(而不是Spark RDD)的角度出發來舉例說說。

用Apache Spark 2.2.0來做實驗,默認配置下起個local mode的Spark Shell來跑下面這個簡單的做global aggregate的查詢例子:

DataFrame API版

import org.apache.spark.sql.functions._

val df = spark.range(10).agg(sum(id))
df.show
df.explain

或者SQL版:

val df = spark.sql("select sum(id) from range(10)")
df.show
df.explain

兩者是完全等價的。它們運行的結果都會是:

+-------+
|sum(id)|
+-------+
| 45|
+-------+

== Physical Plan ==
*HashAggregate(keys=[], functions=[sum(id#46L)])
+- Exchange SinglePartition
+- *HashAggregate(keys=[], functions=[partial_sum(id#46L)])
+- *Range (0, 10, step=1, splits=8)

運算結果是45沒錯,而下面的query plan結構就回答了題主的問題:

每個executor計算了個分區內的數據的和,那麼最後誰來把每個executor上的結果再加起來呢?

簡單來說就是還是在某個executor上做了最終的求和。首先Spark會起一堆task(跑在executor上)來跑最初每個partition內的求和,然後會經過一個SinglePartition模式的ShuffleExchange來把所有上游task的數據集中到一個executor上的一個task來做處理,最後在driver上拿到的就是最後這個executor輸出的一行結果。

當然要是願意的話可以寫很多形狀不同的查詢來達到一樣的結果,而中間過程可能會很不同。例如說如果這樣寫個grouping aggregate的例子:

val df = spark.sql("select sum(id) as s from range(100) group by id % 10")
df.collect().map(_(0).asInstanceOf[Long]).sum

這樣的話Spark會起task分別對10個group做了求和之後,把這10個分組的求和都返回給driver:

scala&> df.show
+---+
| s|
+---+
|450|
|520|
|510|
|540|
|500|
|460|
|480|
|530|
|470|
|490|
+---+

於是driver上我們自己調用 df.collect() 把這10個和拿到一個數組(Array[org.apache.spark.sql.Row])里之後自己對這個數組再求一次和也可以得到最後的結果。這的話executor上就不會做最最終的求和而是都返回給driver去讓用戶代碼自己做。

(不過要留意的是,這個情況下Spark對每個分組還是會做partial aggregate -&> final aggregate這種結構的查詢:

== Physical Plan ==
*HashAggregate(keys=[(id#23L % 10)#25L], functions=[sum(id#23L)])
+- Exchange hashpartitioning((id#23L % 10)#25L, 200)
+- *HashAggregate(keys=[(id#23L % 10) AS (id#23L % 10)#25L], functions=[partial_sum(id#23L)])
+- *Range (0, 100, step=1, splits=8)


spark有兩種操作: Transform, Action. 任何想要得到結果的Action, 都是在Driver中執行的.

你說的操作, 具體的代碼應該是: `rdd.reduce(_ + _)`, 即將所有元素求和.

下面是源碼分析, 看不懂的話, 記住上面的話就行.

===============================源碼分界線============================

源碼基於`Spark 2.1`, 估計對`2.X`都適用, 畢竟`core`的代碼是比較穩定的.

`rdd.reduce`:

`SparkContext.runJob`:

`DAGScheduler.runJob`:

`DAGScheduler.submitJob`:

`JobWaiter.taskSucceeded`:

這些就是全部的源碼. `submitJob`只是提交了下事件, 在事件完成時會自動調用`JobWaiter`的`taskSucceeded`方法. 在`taskSucceeded`方法中同步執行了`resultHandler`, 作為閉包函數, 更新了`jobResult`, 最終將結果返回.


最後的匯總是在Driver端進行的.所以你提交spark任務的時候,也需要看數據量給driver端合適的內存.同時,你可以看看spark的運行原理.深入了解一下各個角色做了什麼事情,比如DAG的劃分等等.


推薦閱讀:

spark spark.yarn.executor.memoryOverhead 太小錯誤?
Spark中,Dataset和DataFrame的性能比較?
Spark SQL 和 Shark 在架構上有哪些區別?將來會合併嗎?

TAG:Spark |