Spark 學習: spark 原理簡述

主要介紹下自己在學習 spark 當中的一些理解和學習過程中踩到的坑,對 spark 時間效率優化的點做個總結,各位大佬輕拍。

# Spark 原理簡述

Spark 是使用 scala 實現的基於內存計算的大數據開源集群計算環境.提供了 java,scala, python,R 等語言的調用介面.



1 引言

1.1 Hadoop 和 Spark 的關係

   Google 在 2003 年和 2004 年先後發表了 Google 文件系統 GFS 和 MapReduce 編程模型兩篇文章,. 基於這兩篇開源文檔,06 年 Nutch 項目子項目之一的 Hadoop 實現了兩個強有力的開源產品:HDFS 和 MapReduce. Hadoop 成為了典型的大數據批量處理架構,由 HDFS 負責靜態數據的存儲,並通過 MapReduce 將計算邏輯分配到各數據節點進行數據計算和價值發現.之後以 HDFS 和 MapReduce 為基礎建立了很多項目,形成了 Hadoop 生態圈.

  而 Spark 則是UC Berkeley AMP lab (加州大學伯克利分校AMP實驗室)所開源的類Hadoop MapReduce的通用並行框架, 專門用於大數據量下的迭代式計算.是為了跟 Hadoop 配合而開發出來的,不是為了取代 Hadoop, Spark 運算比 Hadoop 的 MapReduce 框架快的原因是因為 Hadoop 在一次 MapReduce 運算之後,會將數據的運算結果從內存寫入到磁碟中,第二次 Mapredue 運算時在從磁碟中讀取數據,所以其瓶頸在2次運算間的多餘 IO 消耗. Spark 則是將數據一直緩存在內存中,直到計算得到最後的結果,再將結果寫入到磁碟,所以多次運算的情況下, Spark 是比較快的. 其優化了迭代式工作負載[^demo_zongshu].

具體區別如下:

  伯克利大學將 Spark 的整個生態系統成為 伯克利數據分析棧(BDAS),在核心框架 Spark 的基礎上,主要提供四個範疇的計算框架:

- Spark SQL: 提供了類 SQL 的查詢,返回 Spark-DataFrame 的數據結構(類似 Hive)

- Spark Streaming: 流式計算,主要用於處理線上實時時序數據(類似 storm)

- MLlib: 提供機器學習的各種模型和調優

- GraphX: 提供基於圖的演算法,如 PageRank

關於四個模塊更詳細的可以參見[^demo_mokuai]這篇博文. 後面介紹的內容主要是關於 MLlib 模塊方面的.

  

Spark 的主要特點還包括:

- (1)提供 Cache 機制來支持需要反覆迭代計算或者多次數據共享,減少數據讀取的 IO 開銷;

- (2)提供了一套支持 DAG 圖的分散式並行計算的編程框架,減少多次計算之間中間結果寫到 Hdfs 的開銷;

- (3)使用多線程池模型減少 Task 啟動開稍, shuffle 過程中避免不必要的 sort 操作並減少磁碟 IO 操作。(Hadoop 的 Map 和 reduce 之間的 shuffle 需要 sort)


2 Spark 系統架構

首先明確相關術語[^demo_shuyu]:

- 應用程序(Application): 基於Spark的用戶程序,包含了一個Driver Program 和集群中多個的Executor;

- 驅動(Driver): 運行Application的main()函數並且創建SparkContext;

- 執行單元(Executor): 是為某Application運行在Worker Node上的一個進程,該進程負責運行Task,並且負責將數據存在內存或者磁碟上,每個Application都有各自獨立的Executors;

- 集群管理程序(Cluster Manager): 在集群上獲取資源的外部服務(例如:Local、Standalone、Mesos或Yarn等集群管理系統);

- 操作(Operation): 作用於RDD的各種操作分為Transformation和Action.

整個 Spark 集群中,分為 Master 節點與 worker 節點,,其中 Master 節點上常駐 Master 守護進程和 Driver 進程, Master 負責將串列任務變成可並行執行的任務集Tasks, 同時還負責出錯問題處理等,而 Worker 節點上常駐 Worker 守護進程, Master 節點與 Worker 節點分工不同, Master 負載管理全部的 Worker 節點,而 Worker 節點負責執行任務.

  Driver 的功能是創建 SparkContext, 負責執行用戶寫的 Application 的 main 函數進程,Application 就是用戶寫的程序.

Spark 支持不同的運行模式,包括Local, Standalone,Mesoses,Yarn 模式.不同的模式可能會將 Driver 調度到不同的節點上執行.集群管理模式里, local 一般用於本地調試.

  每個 Worker 上存在一個或多個 Executor 進程,該對象擁有一個線程池,每個線程負責一個 Task 任務的執行.根據 Executor 上 CPU-core 的數量,其每個時間可以並行多個 跟 core 一樣數量的 Task[^demopingtai].Task 任務即為具體執行的 Spark 程序的任務.

2.1 spark 運行原理

一開始看不懂的話可以看完第三和第四章再回來看.

底層詳細細節介紹:

  我們使用spark-submit提交一個Spark作業之後,這個作業就會啟動一個對應的Driver進程。根據你使用的部署模式(deploy-mode)不同,Driver進程可能在本地啟動,也可能在集群中某個工作節點上啟動。而Driver進程要做的第一件事情,就是向集群管理器(可以是Spark Standalone集群,也可以是其他的資源管理集群,美團?大眾點評使用的是YARN作為資源管理集群)申請運行Spark作業需要使用的資源,這裡的資源指的就是Executor進程。YARN集群管理器會根據我們為Spark作業設置的資源參數,在各個工作節點上,啟動一定數量的Executor進程,每個Executor進程都佔有一定數量的內存和CPU core。

  在申請到了作業執行所需的資源之後,Driver進程就會開始調度和執行我們編寫的作業代碼了。Driver進程會將我們編寫的Spark作業代碼分拆為多個stage,每個stage執行一部分代碼片段,並為每個stage創建一批Task,然後將這些Task分配到各個Executor進程中執行。Task是最小的計算單元,負責執行一模一樣的計算邏輯(也就是我們自己編寫的某個代碼片段),只是每個Task處理的數據不同而已。一個stage的所有Task都執行完畢之後,會在各個節點本地的磁碟文件中寫入計算中間結果,然後Driver就會調度運行下一個stage。下一個stage的Task的輸入數據就是上一個stage輸出的中間結果。如此循環往複,直到將我們自己編寫的代碼邏輯全部執行完,並且計算完所有的數據,得到我們想要的結果為止。

  Spark是根據shuffle類運算元來進行stage的劃分。如果我們的代碼中執行了某個shuffle類運算元(比如reduceByKey、join等),那麼就會在該運算元處,劃分出一個stage界限來。可以大致理解為,shuffle運算元執行之前的代碼會被劃分為一個stage,shuffle運算元執行以及之後的代碼會被劃分為下一個stage。因此一個stage剛開始執行的時候,它的每個Task可能都會從上一個stage的Task所在的節點,去通過網路傳輸拉取需要自己處理的所有key,然後對拉取到的所有相同的key使用我們自己編寫的運算元函數執行聚合操作(比如reduceByKey()運算元接收的函數)。這個過程就是shuffle。

  當我們在代碼中執行了cache/persist等持久化操作時,根據我們選擇的持久化級別的不同,每個Task計算出來的數據也會保存到Executor進程的內存或者所在節點的磁碟文件中。

  因此Executor的內存主要分為三塊:第一塊是讓Task執行我們自己編寫的代碼時使用,默認是佔Executor總內存的20%;第二塊是讓Task通過shuffle過程拉取了上一個stage的Task的輸出後,進行聚合等操作時使用,默認也是佔Executor總內存的20%;第三塊是讓RDD持久化時使用,默認佔Executor總內存的60%。

  Task的執行速度是跟每個Executor進程的CPU core數量有直接關係的。一個CPU core同一時間只能執行一個線程。而每個Executor進程上分配到的多個Task,都是以每個Task一條線程的方式,多線程並發運行的。如果CPU core數量比較充足,而且分配到的Task數量比較合理,那麼通常來說,可以比較快速和高效地執行完這些Task線程。

  以上就是Spark作業的基本運行原理的說明.

  在實際編程中,我們不需關心以上調度細節.只需使用 Spark 提供的指定語言的編程介面調用相應的 API 即可.

  在 Spark API 中, 一個 應用(Application) 對應一個 SparkContext 的實例。一個 應用 可以用於單個 Job,或者分開的多個 Job 的 session,或者響應請求的長時間生存的伺服器。與 MapReduce 不同的是,一個 應用 的進程(我們稱之為 Executor),會一直在集群上運行,即使當時沒有 Job 在上面運行。

  而調用一個Spark內部的 Action 會產生一個 Spark job 來完成它。 為了確定這些job實際的內容,Spark 檢查 RDD 的DAG再計算出執行 plan 。這個 plan 以最遠端的 RDD 為起點(最遠端指的是對外沒有依賴的 RDD 或者 數據已經緩存下來的 RDD),產生結果 RDD 的 Action 為結束 。並根據是否發生 shuffle 劃分 DAG 的 stage.

// parameterval appName = "RetailLocAdjust"val master = "local" // 選擇模式val conf = new SparkConf().setMaster(master).setAppName(appName)// 啟動一個 SparkContext Applicationval sc = new SparkContext(conf)val rdd = sc.textFile("path/...")

  要啟動 Spark 運行程序主要有兩種方式:一種是使用 spark-submit 將腳本文件提交,一種是打開 Spark 跟某種特定語言的解釋器,如:

- spark-shell: 啟動了 Spark 的 scala 解釋器.

- pyspark: 啟動了 Spark 的 python 解釋器.

- sparkR: 啟動了 Spark 的 R 解釋器.

(以上解釋器位於spark 的 bin 目錄下)


3 RDD 初識

  RDD(Resilent Distributed Datasets)俗稱彈性分散式數據集,是 Spark 底層的分散式存儲的數據結構,可以說是 Spark 的核心, Spark API 的所有操作都是基於 RDD 的. 數據不只存儲在一台機器上,而是分布在多台機器上,實現數據計算的並行化.彈性表明數據丟失時,可以進行重建.在Spark 1.5版以後,新增了數據結構 Spark-DataFrame,仿造的 R 和 python 的類 SQL 結構-DataFrame, 底層為 RDD, 能夠讓數據從業人員更好的操作 RDD.

  在Spark 的設計思想中,為了減少網路及磁碟 IO 開銷,需要設計出一種新的容錯方式,於是才誕生了新的數據結構 RDD. RDD 是一種只讀的數據塊,可以從外部數據轉換而來,你可以對RDD 進行函數操作(Operation),包括 Transformation 和 Action. 在這裡只讀表示當你對一個 RDD 進行了操作,那麼結果將會是一個新的 RDD, 這種情況放在代碼里,假設變換前後都是使用同一個變數表示這一 RDD,RDD 裡面的數據並不是真實的數據,而是一些元數據信息,記錄了該 RDD 是通過哪些 Transformation 得到的,在計算機中使用 lineage 來表示這種血緣結構,lineage 形成一個有向無環圖 DAG, 整個計算過程中,將不需要將中間結果落地到 HDFS 進行容錯,加入某個節點出錯,則只需要通過 lineage 關係重新計算即可.

1).RDD 主要具有如下特點:

- 1.它是在集群節點上的不可變的、已分區的集合對象;

- 2.通過並行轉換的方式來創建(如 Map、 filter、join 等);

- 3.失敗自動重建;

- 4.可以控制存儲級別(內存、磁碟等)來進行重用;

- 5.必須是可序列化的;

- 6.是靜態類型的(只讀)。

2).RDD 的創建方式主要有2種:

- 並行化(Parallelizing)一個已經存在與驅動程序(Driver Program)中的集合如set、list;

- 讀取外部存儲系統上的一個數據集,比如HDFS、Hive、HBase,或者任何提供了Hadoop InputFormat的數據源.也可以從本地讀取 txt、csv 等數據集

3).RDD 的操作函數(operation)主要分為2種類型 Transformation 和 Action.

Transformation 操作不是馬上提交 Spark 集群執行的,Spark 在遇到 Transformation 操作時只會記錄需要這樣的操作,並不會去執行,需要等到有 Action 操作的時候才會真正啟動計算過程進行計算.針對每個 Action,Spark 會生成一個 Job, 從數據的創建開始,經過 Transformation, 結尾是 Action 操作.這些操作對應形成一個有向無環圖(DAG),形成 DAG 的先決條件是最後的函數操作是一個Action.

如下例子:

val arr = Array("cat", "dog", "lion", "monkey", "mouse")// create RDD by collectionval rdd = sc.parallize(arr) // Map: "cat" -> c, catval rdd1 = rdd.Map(x => (x.charAt(0), x))// groupby same key and countval rdd2 = rdd1.groupBy(x => x._1). Map(x => (x._1, x._2.toList.length))val result = rdd2.collect() print(result)// output:Array((d,1), (l,1), (m,2))

  首先,當你在解釋器里一行行輸入的時候,實際上 Spark 並不會立即執行函數,而是當你輸入了val result = rdd2.collect()的時候, Spark 才會開始計算,從 sc.parallize(arr) 到最後的 collect,形成一個 Job.


4.shuffle 和 stage

shuffle 是劃分 DAG 中 stage 的標識,同時影響 Spark 執行速度的關鍵步驟.

  RDD 的 Transformation 函數中,又分為窄依賴(narrow dependency)和寬依賴(wide dependency)的操作.窄依賴跟寬依賴的區別是是否發生 shuffle(洗牌) 操作.寬依賴會發生 shuffle 操作. 窄依賴是子 RDD的各個分片(partition)不依賴於其他分片,能夠獨立計算得到結果,寬依賴指子 RDD 的各個分片會依賴於父RDD 的多個分片,所以會造成父 RDD 的各個分片在集群中重新分片, 看如下兩個示例:

// Map: "cat" -> c, catval rdd1 = rdd.Map(x => (x.charAt(0), x))// groupby same key and countval rdd2 = rdd1.groupBy(x => x._1). Map(x => (x._1, x._2.toList.length))

  第一個 Map 操作將 RDD 里的各個元素進行映射, RDD 的各個數據元素之間不存在依賴,可以在集群的各個內存中獨立計算,也就是並行化,第二個 groupby 之後的 Map 操作,為了計算相同 key 下的元素個數,需要把相同 key 的元素聚集到同一個 partition 下,所以造成了數據在內存中的重新分布,即 shuffle 操作.shuffle 操作是 spark 中最耗時的操作,應盡量避免不必要的 shuffle.

  寬依賴主要有兩個過程: shuffle write 和 shuffle fetch. 類似 Hadoop 的 Map 和 Reduce 階段.shuffle write 將 ShuffleMapTask 任務產生的中間結果緩存到內存中, shuffle fetch 獲得 ShuffleMapTask 緩存的中間結果進行 ShuffleReduceTask 計算,這個過程容易造成OutOfMemory.

  shuffle 過程內存分配使用 ShuffleMemoryManager 類管理,會針對每個 Task 分配內存,Task 任務完成後通過 Executor 釋放空間.這裡可以把 Task 理解成不同 key 的數據對應一個 Task. 早期的內存分配機制使用公平分配,即不同 Task 分配的內存是一樣的,但是這樣容易造成內存需求過多的 Task 的 OutOfMemory, 從而造成多餘的 磁碟 IO 過程,影響整體的效率.(例:某一個 key 下的數據明顯偏多,但因為大家內存都一樣,這一個 key 的數據就容易 OutOfMemory).1.5版以後 Task 共用一個內存池,內存池的大小默認為 JVM 最大運行時內存容量的16%,分配機制如下:假如有 N 個 Task,ShuffleMemoryManager 保證每個 Task 溢出之前至少可以申請到1/2N 內存,且至多申請到1/N,N 為當前活動的 shuffle Task 數,因為N 是一直變化的,所以 manager 會一直追蹤 Task 數的變化,重新計算隊列中的1/N 和1/2N.但是這樣仍然容易造成內存需要多的 Task 任務溢出,所以最近有很多相關的研究是針對 shuffle 過程內存優化的.

如下 DAG 流程圖中,分別讀取數據,經過處理後 join 2個 RDD 得到結果

在這個圖中,根據是否發生 shuffle 操作能夠將其分成如下的 stage 類型:

(join 需要針對同一個 key 合併,所以需要 shuffle)

  運行到每個 stage 的邊界時,數據在父 stage 中按照 Task 寫到磁碟上,而在子 stage 中通過網路按照 Task 去讀取數據。這些操作會導致很重的網路以及磁碟的I/O,所以 stage 的邊界是非常佔資源的,在編寫 Spark 程序的時候需要盡量避免的 。父 stage 中 partition 個數與子 stage 的 partition 個數可能不同,所以那些產生 stage 邊界的 Transformation 常常需要接受一個 numPartition 的參數來覺得子 stage 中的數據將被切分為多少個 partition[^demoa]。

PS:shuffle 操作的時候可以用 combiner 壓縮數據,減少 IO 的消耗


5.性能優化

主要是我之前寫腳本的時候踩過的一些坑和在網上看到的比較好的調優的方法.

5.1 緩存機制和 cache 的意義

  Spark中對於一個RDD執行多次運算元(函數操作)的默認原理是這樣的:每次你對一個RDD執行一個運算元操作時,都會重新從源頭處計算一遍,計算出那個RDD來,然後再對這個RDD執行你的運算元操作。這種方式的性能是很差的。

因此對於這種情況,我們的建議是:對多次使用的RDD進行持久化。

  首先要認識到的是, .Spark 本身就是一個基於內存的迭代式計算,所以如果程序從頭到尾只有一個 Action 操作且子 RDD 只依賴於一個父RDD 的話,就不需要使用 cache 這個機制, RDD 會在內存中一直從頭計算到尾,最後才根據你的 Action 操作返回一個值或者保存到相應的磁碟中.需要 cache 的是當存在多個 Action 操作或者依賴於多個 RDD 的時候, 可以在那之前緩存RDD. 如下:

val rdd = sc.textFile("path/to/file").Map(...).filter(...)val rdd1 = rdd.Map(x => x+1)val rdd2 = rdd.Map(x => x+100)val rdd3 = rdd1.join(rdd2)rdd3.count()

  在這裡 有2個 RDD 依賴於 rdd, 會形成如下的 DAG 圖:

  所以可以在 rdd 生成之後使用 cache 函數對 rdd 進行緩存,這次就不用再從頭開始計算了.緩存之後過程如下:

  除了 cache 函數外,緩存還可以使用 persist, cache 是使用的默認緩存選項,一般默認為Memoryonly(內存中緩存), persist 則可以在緩存的時候選擇任意一種緩存類型.事實上, cache 內部調用的是默認的 persist.

持久化的類型如下:

是否進行序列化和磁碟寫入,需要充分考慮所分配到的內存資源和可接受的計算時間長短,序列化會減少內存佔用,但是反序列化會延長時間,磁碟寫入會延長時間,但是會減少內存佔用,也許能提高計算速度.此外要認識到:cache 的 RDD 會一直佔用內存,當後期不需要再依賴於他的反覆計算的時候,可以使用 unpersist 釋放掉.

5.2 shuffle 的優化

  我們前面說過,進行 shuffle 操作的是是很消耗系統資源的,需要寫入到磁碟並通過網路傳輸,有時還需要對數據進行排序.常見的 Transformation 操作如:repartition,join,cogroup,以及任何 *By 或者 *ByKey 的 Transformation 都需要 shuffle 數據[^demoa],合理的選用操作將降低 shuffle 操作的成本,提高運算速度.具體如下:

- 當進行聯合的規約操作時,避免使用 groupByKey。舉個例子,rdd.groupByKey().mapValues(_ .sum) 與 rdd.reduceByKey(_ + _) 執行的結果是一樣的,但是前者需要把全部的數據通過網路傳遞一遍,而後者只需要根據每個 key 局部的 partition 累積結果,在 shuffle 的之後把局部的累積值相加後得到結果.

- 當輸入和輸入的類型不一致時,避免使用 reduceByKey。舉個例子,我們需要實現為每一個key查找所有不相同的 string。一個方法是利用 map 把每個元素的轉換成一個 Set,再使用 reduceByKey 將這些 Set 合併起來[^demoa].

- 生成新列的時候,避免使用單獨生成一列再 join 回來的方式,而是直接在數據上生成.

- 當需要對兩個 RDD 使用 join 的時候,如果其中一個數據集特別小,小到能塞到每個 Executor 單獨的內存中的時候,可以不使用 join, 使用 broadcast 操作將小 RDD 複製廣播到每個 Executor 的內存里 join.(broadcast 的用法可以查看官方 API 文檔)

關於 shuffle 更多的介紹可以查看[^demoa]這篇博文.

5.3 資源參數調優

這些參數主要在 spark-submit 提交的時候指定,或者寫在配置文件中啟動.可以通過 spark-submit --help 查看.

具體如下:

  資源參數的調優,沒有一個固定的值,需要根據自己的實際情況(包括Spark作業中的shuffle操作數量、RDD持久化操作數量以及Spark web ui中顯示的作業gc情況),同時參考本篇文章中給出的原理以及調優建議,合理地設置上述參數。

5.4 小結

  • 對需要重複計算的才使用 cache, 同時及時釋放掉(unpersist)不再需要使用的 RDD.
  • 避免使用 shuffle 運算.需要的時候盡量選取較優方案.
  • 合理配置 Executor/Task/core 的參數,合理分配持久化/ shuffle的內存佔比,

    • driver-memory: 1G
    • executor-memory: 4~8G(根據實際需求來)
    • num-executors: 50~100
    • executor-cores: 2~4
    • Tasks: 500~1000

6.本地搭建 Spark 開發環境

6.1 Spark-Scala-IntelliJ

本地搭建 Spark-scala開發環境, 並使用 IntelliJ idea 作為 IDE 的方法,參見下一篇文章:

Spark-Scala-IntelliJ開發環境搭建和編譯運行流程

6.2 Spark-Notebook 開發環境

本地搭建 Spark-Notebook(python or scala) 開發環境, 參見下一篇文章:

Spark-Notebook開發環境搭建.pdf


databatman | 凱菜


參考文獻

  1. 文獻:大數據分析平台建設與應用綜述 ?
  2. Spark學習手冊(三):Spark模塊摘讀 ?
  3. Spark入門實戰系列–3.Spark編程模型(上)–編程模型及SparkShell實戰 ?
  4. 文獻: 基於 spark 平台推薦系統研究. ?
  5. Apache Spark源碼走讀之7 – Standalone部署方式分析 ?
  6. Spark性能優化指南——基礎篇 ?
  7. Apache Spark Jobs 性能調優(一) ?
  8. Spark性能優化指南——基礎篇 ?
  9. Apache Spark Jobs 性能調優(一) ?
  10. Apache Spark Jobs 性能調優(一) ?
  11. Apache Spark Jobs 性能調優(一) ?
  12. Spark性能優化指南——基礎篇 ?

推薦閱讀:

從頭學習大數據培訓課程 數據倉儲工具 hive(五)hive 的 grouping sets、排序、窗口函數用法
從頭學習大數據培訓課程 hadoop,mapreducer 分散式計算框架(二) Windows 偽分散式環境部署
Apache Kudu 加速對頻繁更新數據的分析
Apache Hadoop mapreduce 的Shuffle過程詳解

TAG:Spark | Hadoop | 大數據 |