如何利用spark快速計算笛卡爾積?
在在spark集群裡面 需要計算a*b的笛卡爾積 a為一列
b為一列 我將a作為一個rdd b作為一個rdd 直接調用cartesian方法,當數據量比較大的時候計算非常的慢,看sparkUI shuffleread的數據量較大,想請教下有可能是哪裡出了問題或者有什麼更好的方法在分散式上處理計算笛卡爾積。
可以試試用 DataFrame API。
笛卡爾積映射到 SQL/DataFrame 上就是一個不帶 join 條件的 inner join。DataFrame API 相對於 RDD API 的好處在於整體執行引擎基於 Spark SQL 的 Catalyst optimizer,並且可以利用上 project Tungsten 引入的各種執行層面的優化。Spark 新近版本中無 join 條件的 inner join 被編譯為 CartesianProduct 時採用的已經是 UnsafeCartesianRDD 了。
此外,如果是兩個 DataFrame 中有一個顯著小於另一個,可以考慮將小的 DataFrame 廣播出去從而避免大量 shuffle。以下是 1.6 的 spark-shell 中的示例:import org.apache.spark.sql.functions.broadcast
val large = sqlContext.range(5)
val small = sqlContext.range(2)
large.join(broadcast(small)).show()
+---+---+
| id| id|
+---+---+
| 0| 0|
| 0| 1|
| 1| 0|
| 1| 1|
| 2| 0|
| 2| 1|
| 3| 0|
| 3| 1|
| 4| 0|
| 4| 1|
+---+---+
這個問題在實際開發中經常遇到,一般我都會廣播小表來降低網路開銷。
如果是兩個大表我想過兩個方案,但還沒有實際測試性能效果。
1.其中一個表的數據有冗餘: 將冗餘的部分提取出來,從而壓縮其中一個表。例如一個表原始數據是user1,1user2,2user3,1user4,1
則user1,user3,user4可以合併為一條記錄,四條記錄可以合併為兩條,再廣播合併後的小表,在每個partition上再還原出來。2.利用redis作為緩存存儲其中一個表: 將廣播變為批量存儲到redis,再在每個partition上批量讀取redis,從而縮減了原來廣播時先收集到driver再廣播到所有partition的網路消耗。研究過相關課題,看到delta join的問題格外的親切
更好的方法,看你問的是那一層面的優化了
spark本身研究不多,以前研究這個課題是基於hadoop
無條件情況計算量無法優化,因為計算量就是m*n這麼多
大小表的情況,可以通過廣播小表等方法優化
兩個都是大表的情況,分散式計算模型中數據傳輸量可以優化,但是很有限
這是幾年前研讀論文的總結,現在說不定過時了,但希望對你有幫助
論文:https://github.com/effyroth/paper/blob/master/10487_080605_200915055%E6%9C%89%E5%AD%A6%E5%8F%B7.pdf推薦閱讀:
※Scala 究竟好在那裡?
※如何學好Scala?請給出一條科學的時間線
※矽谷之路38:深入淺出Spark(三)什麼是Standalone
※數據科學家必知必會的7款Python工具,你會幾個?
※大數據分析美國大選——Twitter數據情感分析