Spark基礎性能優化
最開始接觸Spark是我剛來摩拜實習的時候,組裡有一個架構師(ccmeng1886)為了找工作把Spark的源碼通讀了三遍ORZ,還一直給我們灌輸學好Spark就能拿高工資的思想。正好年末不是很忙,就接了一個非常簡單的項目,順便學習了一下Spark,全程用Pyspark實現了一下,感覺非常地爽。
為什麼標題叫《Spark基礎性能優化》,因為本文僅僅梳理最基本的Spark性能優化方案,對於較深層次細節調優暫時還沒有精力去研究。對於演算法工程師來說,能夠遵循一些最基本的調優原則,不求甚解這就夠了。
偶然間發現美團點評技術團隊的博客(首頁 - 美團點評技術團隊)已經有人分享過Spark性能優化指南,解決了我很多疑惑,避免遺忘對它進行整理(發現美團點評技術團隊的博客寫得真是非常地接地氣,尤其對我們創業型公司來說,簡直是無法多得的寶藏),總的來說優化方案主要分為開發調優、資源調優、數據傾斜調優、shuffle調優四個部分。
在項目的開發中,最開始我把所有任務丟在一個進程里,需要花兩個多小時才能跑完;後來在Hive建了一張中間表,把預處理的數據存到Hive里,再從Hive讀取數據做預測,這樣只需要20min就能跑完,性能大約提高了6倍。後來想了一下,我的腳本之所以執行得慢有兩個重要的原因:
- 使用了太多shuffle類運算元,如join、group by等;
- 沒有把預處理的數據緩存到內存中,導致後面的action運算元都會重新計算那個RDD。這就是為什麼我僅僅建了一張中間表就使速度提高了6倍,其實有更好的做法,只要cache()一下就好了。
開發調優
最基本的Spark性能優化,就是要優化你的代碼。Spark中rdd內部的轉換關係是一個DAG(有向無環圖),只有出發了action 運算元才開始計算。開始可以畫出計算pipeline,寫得多了腦子自然會形成計算的pipeline,在開發過程中,時時刻刻都要注意一些性能優化的基本原則。
原則一:避免創建重複的RDD,儘可能復用同一個RDD
對於同一份數據不要創建多個RDD,對不同的數據執行運算元操作時要儘可能地復用一個RDD。
原則二:對多次使用的RDD進行持久化
前面已經提到Spark中rdd內部的轉換關係是一個DAG,因此對於一個RDD執行多次運算元時,都會重新從源頭處計算一遍,這種方式的性能是很差的。如下圖所示,其中D和E代表action運算元,在計算D和E時要分別從A開始計算。
最好的方法就是對C進行持久化,此時Spark就會將數據保存到內存或者磁碟中,以後每次對C這個RDD進行運算元操作時,都會直接從內存或磁碟中提取持久化的RDD數據,不會從源頭處重新計算一遍。
原則三:盡量避免使用shuffle類運算元
Spark作業運行過程中,最消耗性能的地方就是shuffle過程。shuffle過程就是將分布在集群中多個節點上的同一個key,拉取到同一個節點上,進行groupby或join等操作,reduceByKey、join、distinct、repartition等都屬於shuffle運算元。
至於什麼是shuffle,引用Spark核心設計思想的經典論文「Resilient Distributed Datasets: A Fault-Tolerant Abstraction for In-Memory Cluster Computing」可以解釋。
RDD就是一個不可變的帶分區的記錄集合,Spark提供了RDD上的兩類操作,轉換和動作。轉換是用來定義一個新的RDD,包括map, flatMap, filter, union, sample, join, groupByKey, cogroup, ReduceByKey, cros, sortByKey, mapValues等,動作是返回一個結果,包括collect, reduce, count, save, lookupKey。
shuffle顧名思義就是被打散,是否被shuffle就看計算後對應多少分區,那麼:
- 如果一個RDD的依賴的每個分區只依賴另一個RDD的同一個分區,就是narow,如圖上的map、filter、union等,這樣就不需要進行shuffle,同時還可以按照流水線的方式,把一個分區上的多個操作放在一個Task里進行。
- 如果一個RDD的每個分區需要依賴另一個RDD的所有分區,就是wide,如圖上的groupbykey,這樣的依賴需要進行shuffle,運算量倍增。
原則四:使用預聚合的shuffle操作
如果有些時候實在無法避免使用shuffle操作,那麼盡量使用可以預聚合的運算元。預聚合就是在每個節點本地對相同的key進行一次聚合操作,多條相同的key被聚合起來後,那麼其他節點再拉取所有節點上的相同key時,就會大大減少磁碟IO以及網路傳輸開銷。下圖所示,每個節點本地首先對於相同key進行了聚合。
原則五:使用高性能的運算元
除了shuffle相關的運算元有優化原則之外,其他的運算元也都有著相應的優化原則,不一一陳述。
資源調優
在spark-submit時可以為作業配置合適的資源,理論上來說資源給的越多任務執行得越快,但集群又不是你家的,侵佔太多資源可能會被kill掉。摩拜的Spark部署在集群上,公司內部共享這些資源,Spark的SparkContext先把任務提交到YARN上,再由Application Master創建應用程序,然後為它向Resource Manager申請資源,並啟動Executor來運行任務集,同時監控它的整個運行過程,直到運行完成。
資源相關的參數
num-executors
- 參數說明:該參數用於設置Spark作業總共要用多少個Executor進程來執行。Driver在向YARN集群管理器申請資源時,YARN集群管理器會儘可能按照你的設置來在集群的各個工作節點上,啟動相應數量的Executor進程。這個參數非常之重要,如果不設置的話,默認只會給你啟動少量的Executor進程,此時你的Spark作業的運行速度是非常慢的。
- 參數調優建議:每個Spark作業的運行一般設置50~100個左右的Executor進程比較合適,設置太少或太多的Executor進程都不好。設置的太少,無法充分利用集群資源;設置的太多的話,大部分隊列可能無法給予充分的資源。
executor-memory
- 參數說明:該參數用於設置每個Executor進程的內存。Executor內存的大小,很多時候直接決定了Spark作業的性能,而且跟常見的JVM OOM異常,也有直接的關聯。
- 參數調優建議:每個Executor進程的內存設置4G~8G較為合適。但是這只是一個參考值,具體的設置還是得根據不同部門的資源隊列來定。可以看看自己團隊的資源隊列的最大內存限制是多少,num-executors乘以executor-memory,是不能超過隊列的最大內存量的。此外,如果你是跟團隊里其他人共享這個資源隊列,那麼申請的內存量最好不要超過資源隊列最大總內存的1/3~1/2,避免你自己的Spark作業佔用了隊列所有的資源,導致別的同學的作業無法運行。
executor-cores
- 參數說明:該參數用於設置每個Executor進程的CPU core數量。這個參數決定了每個Executor進程並行執行task線程的能力。因為每個CPU core同一時間只能執行一個task線程,因此每個Executor進程的CPU core數量越多,越能夠快速地執行完分配給自己的所有task線程。
- 參數調優建議:Executor的CPU core數量設置為2~4個較為合適。同樣得根據不同部門的資源隊列來定,可以看看自己的資源隊列的最大CPU core限制是多少,再依據設置的Executor數量,來決定每個Executor進程可以分配到幾個CPU core。同樣建議,如果是跟他人共享這個隊列,那麼num-executors * executor-cores不要超過隊列總CPU core的1/3~1/2左右比較合適,也是避免影響其他同學的作業運行。
給出本次項目執行時所給的資源,還是比較省的,這個可以根據集群的情況進行調整。
spark-submit n --master yarn n --executor-memory 8G n --num-executors 16 n --executor-cores 2 n
數據傾斜調優
數據傾斜就是大部分執行得很快,個別任務執行得很慢。比如進行groupby的時候,某個key對應的數據量特別大,就會發生數據傾斜。
shuffle過程會導致數據傾斜,可能會觸發數據傾斜的shuffle運算元包括distinct、groupByKey、reduceByKey、aggregateByKey、join、cogroup、repartition等,因此能很快定位導致數據傾斜的代碼。
遇到數據傾斜時通常會使用repartition這個轉換操作對RDD進行重新分區,重新分區後數據會均勻分布在不同的分區中,避免了數據傾斜。
參考資料
Spark性能優化指南--基礎篇 -Spark性能優化指南--高級篇 -https://www.usenix.org/system/files/conference/nsdi12/nsdi12-final138.pdfSpark中的narrow/wide dependency如何理解,有什麼作用?Spark技術在京東智能供應鏈預測的應用推薦閱讀:
https://zhuanlan.zhihu.com/p/31084018推薦閱讀:
※內存有限的情況下 Spark 如何處理 T 級別的數據?
※國內哪些互聯網公司在用mesos,哪些互聯網公司在用yarn,兩者前景如何?
※Scala 在大數據處理方面有何優勢?
※怎樣理解spark中的partition和block的關係?
※Spark里的DAG是怎麼回事?