標籤:

Spark中,Dataset和DataFrame的性能比較?

import org.apache.spark.sql.types._
import org.apache.spark.sql._

object Main {
def main(args: Array[String]): Unit = {
val spark = SparkSession.builder
.master("local")
.appName("example")
.getOrCreate()
val sc = spark.sparkContext
val sql = spark.sqlContext

val schema = new StructType(Array(StructField("id",
IntegerType), StructField("age", IntegerType)))
val rdd = sc.parallelize(for (i &<- 5000000.to(1, -1)) yield Row(i, i)) val df = spark.createDataFrame(rdd, schema) var start = System.currentTimeMillis() df.sort("age").show println("DataFrame: "+(System.currentTimeMillis()-start)) val rdd1 = sc.parallelize(for (i &<- 5000000.to(1, -1)) yield User(i, i)) import sql.implicits._ val ds = rdd1.toDS start = System.currentTimeMillis() ds.sort("age").show println("Dataset: "+(System.currentTimeMillis()-start)) } } case class User(name: Int, age: Int) {} 上面這個例子為什麼Dataset會比DataFrame快四五倍, 還是local模式下比較沒意義? Dataset和DataFrame的效率跟Tungsten計劃有沒有關係? spark版本是2.2.0 問題補充: 分別注釋執行或者交換順序執行結論也是一樣的 Dataset: == Physical Plan == *Sort [age#4 ASC NULLS FIRST], true, 0 +- Exchange rangepartitioning(age#4 ASC NULLS FIRST, 200) +- *SerializeFromObject [assertnotnull(input[0, User, true]).name AS name#3, assertnotnull(input[0, User, true]).age AS age#4] +- Scan ExternalRDDScan[obj#2] DataFrame: == Physical Plan == *Sort [age#11 ASC NULLS FIRST], true, 0 +- Exchange rangepartitioning(age#11 ASC NULLS FIRST, 200) +- Scan ExistingRDD[id#10,age#11]


@朱詩雄 說的缺少 warm-up 代碼是對的。看到題主補充說對調了 Dataset 和 DataFrame 的代碼後 Dataset 仍然比 DataFrame 快,於是在本地嘗試重現了一下,發現的確如此。仔細看了看,這個 benchmark 還有幾個問題。

第一個問題是構造 df 時顯式用了 Row:

for (i &<- 5000000.to(1, -1)) yield Row(i, i)

這行 code 實際上將會創建至少 2,000 萬個對象(此處 for ... yield 返回結果的容器是個 Vector,不是 lazy 的):

  1. 500 萬 GenericRow(trait Row 的默認實現)
  2. 每個 GenericRow 內各一個 Array[Any],共計 500 萬
  3. 由於數據存在 Array[Any] 內,每行的兩個 Int 被 auto-box 成了 java.lang.Integer,共計一千萬

另一方面,構造 ds 時,同樣情況下對象創建數只有 df 的四分之一(User 內的 Int 不會被 auto-box)。

第二個問題在於 show()。題主這個 benchmark 的本意應該是對 500 萬行數據進行排序然後輸出。然而 show() 默認只顯式 20 行,因此題主的排序代碼實際上類似於 df.limit(20).sort().show(),而 Spark 會將帶 limit 的 sort 自動優化為 top-K 來計算。

題主後來補充在問題中的 query plan 應該是分別通過

df.sort("age").explain()

ds.sort("age").explain()

得到的。而這樣列印出的 physical plan 並不包含 show() 中額外添加的 limit()。要查看實際的 query plan,可以在 main() 的末尾加上 sleep 然後打開 localhost:4040,進入 Spark UI 的 SQL tab 後,可以分別查看兩個 job 對應的 query plan。

DataFrame job 的 plan:

== Parsed Logical Plan ==
GlobalLimit 21
+- LocalLimit 21
+- Sort [age#14 ASC NULLS FIRST], true
+- LogicalRDD [id#13, age#14]

== Analyzed Logical Plan ==
id: int, age: int
GlobalLimit 21
+- LocalLimit 21
+- Sort [age#14 ASC NULLS FIRST], true
+- LogicalRDD [id#13, age#14]

== Optimized Logical Plan ==
GlobalLimit 21
+- LocalLimit 21
+- Sort [age#14 ASC NULLS FIRST], true
+- LogicalRDD [id#13, age#14]

== Physical Plan ==
TakeOrderedAndProject(limit=21, orderBy=[age#14 ASC NULLS FIRST], output=[id#13,age#14])
+- Scan ExistingRDD[id#13,age#14]

Dataset job 的 plan:

== Parsed Logical Plan ==
GlobalLimit 21
+- LocalLimit 21
+- Sort [age#4 ASC NULLS FIRST], true
+- SerializeFromObject [assertnotnull(assertnotnull(input[0, ammonite.$file.Zhihu$User, true])).name AS name#3, assertnotnull(assertnotnull(input[0, ammonite.$file.Zhihu$User, true])).age AS age#4]
+- ExternalRDD [obj#2]

== Analyzed Logical Plan ==
name: int, age: int
GlobalLimit 21
+- LocalLimit 21
+- Sort [age#4 ASC NULLS FIRST], true
+- SerializeFromObject [assertnotnull(assertnotnull(input[0, ammonite.$file.Zhihu$User, true])).name AS name#3, assertnotnull(assertnotnull(input[0, ammonite.$file.Zhihu$User, true])).age AS age#4]
+- ExternalRDD [obj#2]

== Optimized Logical Plan ==
GlobalLimit 21
+- LocalLimit 21
+- Sort [age#4 ASC NULLS FIRST], true
+- SerializeFromObject [assertnotnull(input[0, ammonite.$file.Zhihu$User, true]).name AS name#3, assertnotnull(input[0, ammonite.$file.Zhihu$User, true]).age AS age#4]
+- ExternalRDD [obj#2]

== Physical Plan ==
TakeOrderedAndProject(limit=21, orderBy=[age#4 ASC NULLS FIRST], output=[name#3,age#4])
+- *SerializeFromObject [assertnotnull(input[0, ammonite.$file.Zhihu$User, true]).name AS name#3, assertnotnull(input[0, ammonite.$file.Zhihu$User, true]).age AS age#4]
+- Scan ExternalRDDScan[obj#2]

可見,logical plan 中多了 limit 的操作,而 physical plan 里的 TakeOrderedAndProject 就是 top-K(出於 show() 的實現細節,這裡實際上取了 21 行,此處略去不表)。

由於這個 top-K 優化的存在,真正的 query 執行時間實際上很短,並沒有真的對 500 萬行數據進行排序。那麼時間都花到哪裡去了呢?這跟第三個問題有關。

第三個問題在於兩個 job 各 500 萬行數據都是在 driver 端生成的。在用 for ... yield 生成數據時,500 萬個 Row、500 萬個 User 都是一次性生成完畢的。由於 Spark 的分散式性質,要求所有 task 都可序列化(Serializable)。Spark 在提交 job 之前會預先嘗試做一遍序列化,以確保 task 的確可以序列化。這個檢查即便是 local mode 也不會免除。

所以在這個 benchmark 里,時間都花到數千萬對象的序列化上去了。我在本地用 jvisualvm 簡單 profile 了一下也證實了這點。同時,由於問題一的存在,df 涉及的 object 要多出三倍,序列化消耗的時間自然就比 ds 就更高了。

如果把 df 的生成改成 lazy 的,就可以避免掉這重序列化開銷:

val rdd = sc.parallelize(5000000 to 1 by -1).map { i =&> Row(i, i) }

注意這裡第一步的 5000000 to 1 by -1 返回的是一個 Scala Range,是 lazy 的;第二步用 RDD 的 map 來構造 Row,也是 lazy 的。

同理,ds 的生成可以改為:

val rdd = sc.parallelize(5000000 to 1 by -1).map { i =&> User(i, i) }

這樣一來,由於數據的生成挪到了 executor 端,Spark task 的序列化成本大大縮減,就不會再出現 ds 比 df 快若干倍的問題了。


很多情況下,Dataset 的性能實際上是會比 DataFrame 要來得差的,因為 Dataset 會涉及到額外的數據格式轉換成本。這可以說是 Dataset 為了類型安全而付出的代價。尤其是在 Dataset query 中還內嵌了多個強類型的 Scala closure 的時候,Spark 會插入額外的序列化操作,在內部的 UnsafeRow 格式和 Dataset 攜帶的 Java 類型間反覆倒騰數據。從第二組 plan 中也可以看到比第一組多了一個 SerializeFromObject 節點。但在題主的測試,用例過於簡單,這個對比體現不出來。再者,由於 DataFrame 實際上就是 Dataset[Row],所以也這個 benchmark 里同樣存在從 Row 轉換到 UnsafeRow 的開銷。

可以將題主的代碼修改一下。Top-K 的優化估計不是題主的本意,但其實也不妨礙這個 benchmark。在這個前提下,DataFrame 部分可以改為:

spark.range(5000000, 0, -1).select($"id", $"id" as "age").sort($"age").show()

這裡 spark.range() 直接流式吐出 UnsafeRow,省掉了 Row 到 UnsafeRow 的轉換開銷。

Dataset 部分可以改為:

spark.range(5000000, 0, -1).map { id =&> User(id, id) }.sort($"age")).show()

我在本地用 Ammonite 簡單跑了一下,每個 query 在正式測之前各跑了 30 遍 warm-up,最後結果是:

  • DataFrame: 568
  • Dataset: 601

如果真的要看對所有數據排序的性能,可以將 show() 換成 foreach { _ =&> }。

我這麼做 benchmark 仍然很粗糙。JVM 因為 JIT 等因素,裸寫 benchmark 代碼有很多需要考慮的地方。嚴肅的 JVM 上的 benchmark 建議用 JMH 來做。


你的測試缺少熱身代碼。會收jit和各種lazy初始化的影響。

你把兩段代碼換一下順序,看看是不是結論就相反了。


推薦閱讀:

大數據分析美國大選——Twitter數據情感分析
Rust 重構 Spark 框架需要做哪些準備工作?
內存有限的情況下 Spark 如何處理 T 級別的數據?
Spark 2017歐洲技術峰會摘要(開發人員分類)
如何用形象的比喻描述大數據的技術生態?Hadoop、Hive、Spark 之間是什麼關係?

TAG:Spark |