提高Spark姿勢水平 No.73
長文。巨長。
本文的依據是我學習整個Spark的學習歷程。在這裡,我會從幾個方面來跟大家一起討論。Spark 是什麼?Spark 跟 Hadoop 有什麼淵源?Spark 有哪些方便的組件?什麼場景下用 Spark ,如何使用?以及用什麼樣的姿勢來學習 Spark 會比較好?
Apache Spark? is a fast and general engine for large-scale data processing.
Spark就是一個能夠快速以及通用的處理大規模數據的引擎。怎麼理解這句話呢? Spark 就是一個處理引擎,它提供了類似 map , reduce , groupBy,persist 這些操作,來方便地對數據進行各種各樣的並行處理。它以一個有向無環圖來定義一個應用,方便對任務的容錯和重試處理。它定義了一個叫 RDD 的彈性數據結構,將所有的數據和中間結果都儘可能緩存在內存中,形成一個分散式內存數據集。 然後為什麼說它只是一個處理引擎呢?從數據源角度看, Spark 可以從 HBase、ElasticSeach、Hive 等渠道獲取。從運行資源角度看, Spark 可以跑在 Spark集群,Hadoop 集群 ,Mesos 集群上,所以它只是一個處理引擎。至此它擁有了快速的,通用的屬性,也就成為一個通用的大數據處理引擎。
Spark 和 Hadoop 有什麼淵源?
容我細細道來。說起 Spark ,我們不可不提到它的老前輩 MapReduce 。
MapReduce 是一個編程模型 ,可以實現運行在規模可以靈活調整的由普通機器組成的集群上,一個典型的 MapReduce計算往往由幾千台機器組成、處理以 TB 計算的數據。 這在 Google 發出《MapReduce: Simplified Data Processing on Large Clusters 》這篇論文之前,幾乎是不可想像的。並行計算,容錯機制是那麼的高效和可靠。開源的 Hadoop 就實現了 MapReduce ,以及它的基石分散式文件系統 HDFS (Hadoop Distribute File System),也即是 《Google File System》 的開源版實現。
既然這麼高效那為什麼還會出現 Spark 呢?一個巨大的原因是,Hadoop 把數據的中間結果放到了HDFS 上了,這就導致處理的過程雖然非常可靠,但是耗時也非常非常長。當初寫 Spark 是因為需要進行進行大規模數據的機器學習,總所周知機器學習需要不斷訪問數據不斷訪問數據不斷迭代,這對於 MapReduce 來說是致命的,效率很慢,所以實現了Spark。
那麼Spark發展至今,有哪些方便的組件呢?如下圖。
val datas:DataFrame = hc.sql("SELECT SEX,TALL FROM PERSONS");val model = LogisticRegressionWithSGD.train(datas, 50) KafkaSteaming.createStream(x => model.predict(x.SEX));
短短几行代碼可能就涵蓋了,Spark SQL,MLLib,SparkSteaming 。這幾個組件分別是幹啥的呢?Spark Core Engine 提供了最基礎的操作,如 map , reduce , reduceBy , groupBy 等基礎操作,提供了 RDD 和有向無環圖的管理結構,提供了容錯機制。
SparkSQL 提供了對於 Hive,HBase等資料庫的操作,以 SQL 作為統一的查詢規範進行數據訪問。不僅如此 Spark 還提供了 DataFrame 的操作方式將數據的操作抽象化為對對象的操作。
MLLib 提供了機器學習相關的流水線處理 Pipline ,以及實現了絕大部分機器學習的組件,如 LinearRegression 、GBDT、LogisticRegression、SVM等,可以非常方便地用於大規模數據的機器學習。
GraphX 提供了大規模的圖處理及圖計算演算法,其中有傳統的 stronglyConnectedComponent 強直通性演算法,也有實現了 PageRank 的新型的 Pregel 分散式圖計算框架,以及實現了 Label Propagation Algorithm 的機器學習標籤傳播演算法。
而 SparkSteaming 則提供了批量流計算,用於接收來自 Kafka 或者 Twritter 消息中間件的數據,進行實時處理。
那麼我們應該在什麼場景下使用 Spark ,以及如何使用呢?
1、有錢的時候
Spark 需要非常多非常多的內存,比 MapReduce 多多了,MapReduce 只是需要少量的內存和大量硬碟,所以跑 Spark 來說會比較貴。
2、迫切需要快速處理大數據量的時候。
如果不是很迫切,那麼使用 Hadoop 和 Hive 可能更加合適,因為它們也可以完成絕大部分的數據處理,並不是一定要用 Spark。
3、需要處理大規模圖的時候
當前做巨大圖計算的引擎來說,Spark 可以說是最合適的。
4、其他的計算框架需要 Spark 作為計算引擎的時候。
比如Hive on Spark,比如 Impala 。Spark 可以作為一個分散式計算引擎嵌入到其他計算系統中。
Spark 運行架構是怎樣的?
Spark 任務由 Driver 提交 Application 給 Master ,然後由 Master 將 Application 切分成多個 Job ,然後調度 DAG Scheduler 將 Task 切分成多個 stage ,分配給多個 Worker,每個Worker 接收到 TaskSet 任務集後,將調度 Executor 們進行任務分配,每個 Executor 都有自己的 DataSet 用於計算。通訊是使用akka。
Driver會記錄所有stage的信息。要是stage切分過多,那佔用Driver的內存會非常多。若task運行的stage失敗,默認會進行4次重試,若4次重試全部失敗,SparkContext會停止所有工作。
Driver也會記錄stage的運行時間,如果task運行的stage時間太久,Driver可能會認為這個job可能失敗了,會重新分配一個task給另外一個Executor,兩個task都會同時跑,誰先跑完誰交差,另外一個只有被幹掉的份。
從運行模式來看,Spark有這麼幾種方式可以運行。
- local
- mesos
- standalone
- yarn-client
- yarn-cluster
下面一個一個來解剖它們。
local,顧名思義,是跑在本地的,指將Driver和Executor都運行在提交任務的機器上。 local[2] 代表啟動兩個線程來跑任務, local[*]代表啟動任意數量需要的線程來跑Spark任務。
Mesos是Apache下的開源分散式資源管理框架,它被稱為是分散式系統的內核。Mesos最初是由加州大學伯克利分校的AMPLab開發的,後在Twitter得到廣泛使用。
Spark on mesos,是指跑在mesos平台上。目前有兩個模式可以選擇,粗粒度模式(CoarseMesosSchedulerBackend)和細粒度模式(MesosSchedulerBackend)。粗粒度模式下,Spark任務在指定資源的時候,所分配的資源將會被鎖定,其他應用無法share。在細粒度模式下,Spark啟動時Secheduler只會分配給當前需要的資源,類似雲的想法,不會對資源進行鎖定。
Spark on standalone,是指跑在 Spark 集群上。Spark集群可以自成一個平台,資源由Spark來管理,不藉助任何外部資源,若在測試階段可以考慮使用這種模式,比較高效,但是在生產環境若有多個任務,不太建議使用這種方式。
Apache Hadoop YARN (Yet Another Resource Negotiator,另一種資源協調者)是一種新的 Hadoop 資源管理器,它是一個通用資源管理系統,可為上層應用提供統一的資源管理和調度,它的引入為集群在利用率、資源統一管理和數據共享等方面帶來了巨大好處。
Spark on yarn,是指跑在Hadoop集群上。Hadoop提供的yarn是一個比較好的資源管理平台,若項目中已經有使用Hadoop相關的組件,建議優先使用yarn來進行資源管理。
將Spark任務提交到yarn上同樣有兩個模式,一種是yarn-client,一種是yarn-cluster。
yarn-client將SparkContext運行在本地,Driver也運行在本地,這種模式一般不推薦,因為在分配Driver資源的時候,提交的機器往往並不能滿足。
yarn-cluster,將任務提交到Hadoop集群上,由yarn來決定Driver應該跑在哪個機器,SparkContext也會運行在被分配的機器上,建議使用這種模式。
無論是yarn-client還是yarn-cluster,都是在yarn平台的管理下完成,而Spark on yarn目前只支持粗粒度方式(Hadoop2.6.0),所以在任務多,資源需求大的情況下,可能需要擴大Hadoop集群避免資源搶佔。
Spark 使用的時候有哪些坑呢,如何使用呢?
00000:Spark on yarn 啟動的時候一直在 waiting。
第一種可能,隊列資源不足,所有的資源都在被其他同學佔用ing。
解決方案:把那個同學打暈,然後kill application。
第二種可能,設置的 Driver 或者 executor 的 cpu 或者 memory 資源太多。
解決方案:看看隊列資源有多少,拿小本本計算一下究竟能申請多少,然後給自己一巴掌。如果集群資源太爛,單台機器只有16G,那你就別動不動就申請一個 driver 或者 executor 一下就來個32G了。
第三種可能,程序報錯了,一直在重試。
解決方案:滾回去debug去。
特別提醒:Spark 默認是有10%的內存的 overhead 的,所以會比你申請的多10%。
00001:Driver 拋 OutOfMemory Exception
很明顯嘛,就是driver的內存不足了,嘗試看一下哪個地方佔用內存太多了,特別提醒一下,stage的切分,task的分配都是在Driver 分配的,數量太多的話會爆炸。以及collect(),count()等這些操作都是需要把所有信息搜集到driver端的。
解決方案:打自己一巴掌,然後看dump日誌或者看看自己的代碼,是不是哪裡搞錯了。如果一切都很合理,那就提高一下內存吧。
00010:executor 拋 OutOfMemory Exception
內存不足。哇,那這個可能性就多了。
是不是數據量太大 partition 數太少?太少了就多加點 partition 。
是不是產生數據傾斜了?解決它。
是不是某個操作,比如flatmap,導致單個executor產生大量數據了?
是不是請求的內存實在太少了?
00011:executor 拋 is running beyond physical memory limit
哇,你的集群資源超分配了,物理資源被其他團隊用了,GG思密達,快拿起40米長木棍。把那個人抓出來。
00100:driver 或者 executor 拋 OutOfMemoryError: GC overhead limit exceeded
出現內存泄漏或者真的集群資源不夠,一直在full GC超過次數限制了,仔細檢查一下哪些東西佔用內存太多,是不是RDD持久化佔用太多資源了,還是數據有傾斜,還是真的partition太少導致每個partition數據太多。
00101:運行 GraphX 的時候 driver 拋 OutOfMemory Exception
運行 GraphX 的時候因為會迭代計算,所以會產生非常非常多 stage,這時候 driver 可能沒有足夠多的內存可以放下這些 stage 和 task 的狀態,很容易就出現 OOM。這時候能做的事情就四個,第一增加 driver 內存,第二降低 partition 的數量,第三減少 Pregel 的迭代次數減少stage的數量,第四優化圖的切分策略。
00110:大對象網路傳輸慢。
放棄默認的 Java Serialization,改用 Kryo Serialization。
小對象用廣播的模式,避免全局 join。
GraphX 來說改善圖切分策略,減少網路交互。
GraphX 盡量單台機器配置高點,可以盡量讓更多的 partition 在同一台機器。
00111:SparkStreaming 消息堆積。
調整窗口時間,著重分析消息消費過程的瓶頸並調整相應的資源,盡量降低單筆計算時間。然後根據收集的信息再根據吞吐量來決定窗口時間。
01000:進行 Shuffle 的時候報 Spark Shuffle FetchFailedException。
數據在 Shuffle 的時候中間數據量過大或者數據產生了傾斜,導致部分目標機器崩潰。通過分析崩潰的時候的任務,改善數據 Shuffle 時的數據分布情況。
那應該以怎樣的姿勢來學習 Spark 呢?
Step1:環境搭建
自己開虛擬機或者雲主機搭好Hadoop,Spark,Hive,sqoop,原生的那種,可以直接實現為偽分散式。可以試試我下面推薦的這種版本搭配,這是CDH5.8.x的個組件版本組合。
Apache Zookeeper 3.4.5 + Apache Hadoop 2.6.0 + Apache HBase 1.2.0 + Apache Hive 1.1.0 + Apache Spark 1.6 + Apache Pig 0.12.0 + Apache Flume 1.6.0 + Apache Kafka 0.9.0 + Apache Sqoop 1.4.6/1.99.5
注意事項:版本搭配要合理,不然會有很多坑。
Step2:數據準備
使用Spark生成500萬數據,包含[身份證,手機號,日期,性別,身高]五個欄位。其中身份證格式為6位,手機號為6位,日期為yyyy-mm-dd格式,性別為F、M,身高為160-190隨機數。手機號其中有100萬必須為10086,都必須為合理的隨機數據,不能是序列,結果保存到Hive表中。
Step3:MapReduce初探
使用 Step2 產生的數據進行關係生成,相同手機號的人認為有關係,可以使用RDF 的組織方式進行保存。直接過濾空數據以及6位號碼相同的,若發現同一號碼導致的關係數超過3000,剔除,結果先保存到Hive中。
Step4:內存調優及演算法實現
利用 Step3 生成的關係,利用GraphX和SLPA進行社區劃分,可以借鑒 Spark 的 Pregel 框架,閱讀 LPA 實現的源碼。當然希望你能改造為SLPA,SLPA需要自己實現,要注意思考GraphX的局限性。
Step5:去做更多的事,實現更多的功能。
投入到更多的數據處理工作中,繼續一些億級別的調優以及機器學習的學習中,不斷學習不斷提高自己的水平。scala 是 Spark 的原生語言,但是現在也有很多的數據分析師在使用 R 在 Spark 上進行數據分析,也有數據開發工程師使用 Python 在 Spark 進行機器學習,甚至還實現一些深度學習的演算法,打通了Tensorflow,這些在未來都可能成為主流。
最後總結一下 Spark:
1、Spark 跟 MapReduce 如出一轍。
2、Spark 很快,是一個計算引擎,其他組件都是可拔插的,但需要耗費很多內存很多錢。
3、不是非得用Spark,還有很多其他的解決方案。
4、Spark 需要循序漸進學習,不斷實踐,純理論沒什麼用。
完。你不點個贊么?
推薦閱讀:
※如何解釋spark mllib中ALS演算法的原理?
※Spark里的DAG是怎麼回事?
※Spark源碼分析(1) RDD是什麼
※Scala快速入門系列:聲明變數、控制結構與函數、常用數組操作