大聖悄悄告訴你Spark內核的故事!

親愛的同學們好

俺老孫來也~

好久不見 是不是特別想念

俺老孫很遺憾的通知你們

暑假餘額已經用完了!

要正式開始學習了!

咳咳

作為正式開課的第一天

今天我們要講的東西厲害了哦

就是傳說中的

不好意思 發錯圖片了

當然是傳說中的

所謂Spark是起源於美國加州大學伯克利分校AMPLab的大數據計算平台,在2010年開源,目前是Apache軟體基金會的頂級項目。隨著Spark在大數據計算領域的暫露頭角,越來越多的企業開始關注和使用。2014年11月,Spark在Daytona Gray Sort 100TB Benchmark競賽中打破了由Hadoop MapReduce保持的排序記錄。Spark利用1/10的節點數,把100TB數據的排序時間從72分鐘提高到了23分鐘。

Spark在架構上包括內核部分和4個官方子模塊

Spark SQL

Spark Streaming

機器學習庫MLlib

圖計算庫GraphX

由Spark在伯克利的數據分析軟體棧BDAS(Berkeley Data Analytics Stack)中的位置可見,Spark專註於數據的計算,而數據的存儲在生產環境中往往還是由Hadoop分散式文件系統HDFS承擔。

Spark在BDAS中的位置

Spark被設計成支持多場景的通用大數據計算平台,它可以解決大數據計算中的批處理,交互查詢及流式計算等核心問題。Spark可以從多數據源的讀取數據,並且擁有不斷發展的機器學習庫和圖計算庫供開發者使用。數據和計算在Spark內核及Spark的子模塊中是打通的,這就意味著Spark內核和子模塊之間成為一個整體。Spark的各個子模塊以Spark內核為基礎,進一步支持更多的計算場景,例如使用Spark SQL讀入的數據可以作為機器學習庫MLlib的輸入。以下列舉了一些在Spark平台上的計算場景。

Spark的應用場景舉例

之前在大數據概述的課程中我們提到了Hadoop,大數據工程師都非常了解Hadoop MapReduce一個最大的問題是在很多應用場景中速度非常慢,只適合離線的計算任務。這是由於MapReduce需要將任務劃分成map和reduce兩個階段,map階段產生的中間結果要寫回磁碟,而在這兩個階段之間需要進行shuffle操作。Shuffle操作需要從網路中的各個節點進行數據拷貝,使其往往成為最為耗時的步驟,這也是Hadoop MapReduce慢的根本原因之一,大量的時間耗費在網路磁碟IO中而不是用於計算。在一些特定的計算場景中,例如像邏輯回歸這樣的迭代式的計算,MapReduce的弊端會顯得更加明顯。

那Spark是如果設計分散式計算的呢?首先我們需要理解Spark中最重要的概念--彈性分布數據集(Resilient Distributed Dataset),也就是RDD

關鍵詞:彈性分布數據集RDD

RDD是Spark中對數據和計算的抽象,是Spark中最核心的概念,它表示已被分片(partition),不可變的並能夠被並行操作的數據集合。對RDD的操作分為兩種transformation和action。Transformation操作是通過轉換從一個或多個RDD生成新的RDD。Action操作是從RDD生成最後的計算結果。在Spark最新的版本中,提供豐富的transformation和action操作,比起MapReduce計算模型中僅有的兩種操作,會大大簡化程序開發的難度。

RDD的生成方式只有兩種,一是從數據源讀入,另一種就是從其它RDD通過transformation操作轉換。一個典型的Spark程序就是通過Spark上下文環境(SparkContext)生成一個或多個RDD,在這些RDD上通過一系列的transformation操作生成最終的RDD,最後通過調用最終RDD的action方法輸出結果。

每個RDD都可以用下面5個特性來表示,其中後兩個為可選的:

  • 分片列表(數據塊列表)

  • 計算每個分片的函數

  • 對父RDD的依賴列表

  • 對key-value類型的RDD的分片器(Partitioner)(可選)

  • 每個數據分片的預定義地址列表(如HDFS上的數據塊的地址)(可選)

雖然Spark是基於內存的計算,但RDD不光可以存儲在內存中,根據useDisk、useMemory、useOffHeap, deserialized、replication五個參數的組合Spark提供了12種存儲級別,在後面介紹RDD的容錯機制時,我們會進一步理解。值得注意的是當StorageLevel設置成OFF_HEAP時,RDD實際被保存到Tachyon中。Tachyon是一個基於內存的分散式文件系統,目前正在快速發展,在這裡我們就不做詳細介紹啦,可以通過其官方網站進一步了解。

DAG、Stage與任務的生成

Spark的計算髮生在RDD的action操作,而對action之前的所有transformation,Spark只是記錄下RDD生成的軌跡,而不會觸發真正的計算。

Spark內核會在需要計算髮生的時刻繪製一張關於計算路徑的有向無環圖,也就是DAG。舉個例子,在下圖中,從輸入中邏輯上生成A和C兩個RDD,經過一系列transformation操作,邏輯上生成了F,注意,我們說的是邏輯上,因為這時候計算沒有發生,Spark內核做的事情只是記錄了RDD的生成和依賴關係。當F要進行輸出時,也就是F進行了action操作,Spark會根據RDD的依賴生成DAG,並從起點開始真正的計算。

邏輯上的計算過程:DAG

有了計算的DAG圖,Spark內核下一步的任務就是根據DAG圖將計算劃分成任務集,也就是Stage,這樣可以將任務提交到計算節點進行真正的計算。Spark計算的中間結果默認是保存在內存中的,Spark在劃分Stage的時候會充分考慮在分散式計算中可流水線計算(pipeline)的部分來提高計算的效率,而在這個過程中,主要的根據就是RDD的依賴類型。

根據不同的transformation操作,RDD的依賴可以分為窄依賴(Narrow Dependency)和寬依賴(Wide Dependency,在代碼中為ShuffleDependency)兩種類型。窄依賴指的是生成的RDD中每個partition只依賴於父RDD(s) 固定的partition。寬依賴指的是生成的RDD的每一個partition都依賴於父 RDD(s) 所有partition。窄依賴典型的操作有map, filter, union等,寬依賴典型的操作有groupByKey, sortByKey等。可以看到,寬依賴往往意味著shuffle操作,這也是Spark劃分stage的主要邊界。對於窄依賴,Spark會將其盡量劃分在同一個stage中,因為它們可以進行流水線計算。

RDD的寬依賴和窄依賴

最後我們再通過下圖來詳細解釋一下Spark中的Stage劃分。我們從HDFS中讀入數據生成3個不同的RDD,通過一系列transformation操作後再將計算結果保存回HDFS。可以看到這幅DAG中只有join操作是一個寬依賴,Spark內核會以此為邊界將其前後劃分成不同的Stage. 同時我們可以注意到,在圖中Stage2中,從map到union都是窄依賴,這兩步操作可以形成一個流水線操作,通過map操作生成的partition可以不用等待整個RDD計算結束,而是繼續進行union操作,這樣大大提高了計算的效率。

Spark中的Stage劃分

今天的課程就是這樣

有任何不懂的

歡迎隨時騷擾俺老孫

下回見

歡迎訂閱公眾號:apesedu


推薦閱讀:

如何使用 parquet file 建立一個 RDD 的同時讀進 case class 結構?
較新版本的spark中,那些shuffle數據塊如何緩存?
Hive On Spark/Tez項目目前進展如何?
ambari使用最全面解析
幾個Scala入門視頻教程

TAG:Spark | 大數據 | 大數據分析 |