從頭學習大數據培訓課程 spark 基於內存的分散式計算框架(一)spark 介紹、RDD 原理、spark 開發環境搭建

1.什麼Spark

Spark是加州大學伯克利分校AMP實驗室(Algorithms, Machines, and People Lab)開發的通用內存並行計算框架,用於構建大型的、低延遲的數據分析應用程序。

Spark使用Scala語言開發,它還提供了對Scala、Python、Java(支持Java 8)和R語言的支持

Apache頂級項目,項目主頁:http://spark.apache.org

2.Spark歷史

2009年由Berkeleys AMPLab開始編寫最初的源代碼

2010年開放源代碼

2013年6月進入Apache孵化器項目

2014年2月成為Apache的頂級項目(8個月時間)

2014年5月底Spark1.0.0發布,打破Hadoop保持的基準排序紀錄

2014年12月Spark1.2.0發布

2015年11月Spark1.5.2發布

2016年1月Spark1.6發布

2016年12月Spark2.1發布

3.為什麼要用Spark

運行速度快:使用DAG執行引擎以支持循環數據流與內存計算

易用性好:支持使用Scala、Java、Python和R語言進行編程,可以通過Spark Shell進行互動式編程

通用性強:Spark提供了完整而強大的工具,包括SQL查詢、流式計算、機器學習和圖演算法組件

隨處運行:可運行於獨立的集群模式中,可運行於Hadoop中,也可運行於Amazon EC2等雲環境中,並且可以訪問HDFS、Cassandra、HBase、Hive等多種數據源

4.對比Hadoop

解決問題的出發點不一樣

Hadoop用普通硬體解決存儲和計算問題

Spark用於構建大型的、低延遲的數據分析應用程序,不實現存儲

Spark是在借鑒了MapReduce之上發展而來的,繼承了其分散式並行計算的優點並改進了MapReduce明顯的缺陷

Spark中間數據放到內存中,迭代運算效率高

Spark引進了彈性分散式數據集的抽象,數據對象既可以放在內存,也可以放在磁碟,容錯性高,可用自動重建

RDD計算時可以通過CheckPoint來實現容錯

Hadoop只提供了Map和Reduce操作,Spark更加通用,提供的數據集操作類型有很多種,主要分為: Transformations和Actions兩大類

5.spark生態

spark core實現了spark的基本功能、包括任務調度、內存管理、錯誤恢復與存儲系統交互等模塊。spark core中還包含了對彈性分散式數據集(resileent distributed dataset)的定義

spark sql是spark用來操作結構化數據的程序,能過SPARK SQL,我們可以使用SQL或者HIVE(HQL)來查詢數據,支持多種數據源,比如HIVE表就是JSON等,除了提供SQL查詢介面,還支持將SQL和傳統的RDD結合,開發者可以在一個應用中同時使用SQL和編程的方式(API)進行數據的查詢分析,SPARK SQL是在1.0中被引入的

Spark Streaming是Spark提供的對實時數據進行流式計算的組件,比如網頁伺服器日誌,或者是消息隊列都是數據流。

MLLib是Spark中提供常見的機器學習功能的程序庫,包括很多機器學習演算法,比如分類、回歸、聚類、協同過濾等。

GraphX是用於圖計算的比如社交網路的朋友關係圖。

6.Spark應用場景

Yahoo將Spark用在Audience Expansion中的應用,進行點擊預測和即席查詢等

淘寶技術團隊使用了Spark來解決多次迭代的機器學習演算法、高計算複雜度的演算法等。應用於內容推薦、社區發現等

騰訊大數據精準推薦藉助Spark快速迭代的優勢,實現了在「數據實時採集、演算法實時訓練、系統實時預測」的全流程實時並行高維演算法,最終成功應用於廣點通pCTR投放系統上。

優酷土豆將Spark應用於視頻推薦(圖計算)、廣告業務,主要實現機器學習、圖計算等迭代計算

7.spark編程

1.RDD設計背景

在實際應用中,存在許多迭代式計算,這些應用場景的共同之處是,不同計算階段之間會重用中間結果,即一個階段的輸出結果會作為下一個階段的輸入。但是,目前的MapReduce框架都是把中間結果寫入到HDFS中,帶來了大量的數據複製、磁碟IO和序列化開銷。顯然,如果能將結果保存在內存當中,就可以大量減少IO。RDD就是為了滿足這種需求而出現的,它提供了一個抽象的數據架構,我們不必擔心底層數據的分散式特性,只需將具體的應用邏輯表達為一系列轉換處理,不同RDD之間的轉換操作形成依賴關係,可以實現管道化,從而避免了中間結果的落地存儲,大大降低了數據複製、磁碟IO和序列化開銷。

2.RDD概念

一個RDD就是一個分散式對象集合,本質上是一個只讀的分區記錄集合,每個RDD可以分成多個分區,每個分區就是一個數據集片段(HDFS上的塊),並且一個RDD的不同分區可以被保存到集群中不同的節點上,從而可以在集群中的不同節點上進行並行計算。RDD提供了一種高度受限的共享內存模型,即RDD是只讀的記錄分區的集合,不能直接修改,只能基於穩定的物理存儲中的數據集來創建RDD,或者通過在其他RDD上執行確定的轉換操作(如map、join和groupBy)而創建得到新的RDD。RDD提供了一組豐富的操作以支持常見的數據運算,分為「行動」(Action)和「轉換」(Transformation)兩種類型,前者用於執行計算並指定輸出的形式,後者指定RDD之間的相互依賴關係。兩類操作的主要區別是,轉換操作(比如map、filter、groupBy、join等)接受RDD並返回RDD,而行動操作(比如count、collect等)接受RDD但是返回非RDD(即輸出一個值或結果)。

Spark用Scala語言實現了RDD的API,程序員可以通過調用API實現對RDD的各種操作。RDD典型的執行過程如下:

RDD讀入外部數據源(或者內存中的集合)進行創建;

RDD經過一系列的「轉換」操作,每一次都會產生不同的RDD,供給下一個「轉換」使用;

最後一個RDD經「行動」操作進行處理,並輸出到外部數據源(或者變成Scala/JAVA集合或變數)。

需要說明的是,RDD採用了惰性調用,即在RDD的執行過程中,真正的計算髮生在RDD的「行動」操作,對於「行動」之前的所有「轉換」操作,Spark只是記錄下「轉換」操作應用的一些基礎數據集以及RDD生成的軌跡,即相互之間的依賴關係,而不會觸發真正的計算。

從輸入中邏輯上生成A和C兩個RDD,經過一系列「轉換」操作,邏輯上生成了F(也是一個RDD),之所以說是邏輯上,是因為這時候計算並沒有發生,Spark只是記錄了RDD之間的生成和依賴關係。當F要進行輸出時,也就是當F進行「行動」操作的時候,Spark才會根據RDD的依賴關係生成DAG,並從起點開始真正的計算。

這一系列處理稱為一個「血緣關係(Lineage)」,即DAG拓撲排序的結果。採用惰性調用,通過血緣關係連接起來的一系列RDD操作就可以實現管道化(pipeline),避免了多次轉換操作之間數據同步的等待,而且不用擔心有過多的中間數據,因為這些具有血緣關係的操作都管道化了,一個操作得到的結果不需要保存為中間數據,而是直接管道式地流入到下一個操作進行處理。同時,這種通過血緣關係把一系列操作進行管道化連接的設計方式,也使得管道中每次操作的計算變得相對簡單,保證了每個操作在處理邏輯上的單一性;相反,在MapReduce的設計中,為了儘可能地減少MapReduce過程,在單個MapReduce中會寫入過多複雜的邏輯。

java程序示例

JavaSparkContext sc = new JavaSparkContext(conf);

JavaRDD lines = sc.textFile(logFile);

lines.filter(new Function<String, Boolean>() {

@Override

public Boolean call(String v1) throws Exception {

return v1.contains("helle world");

}

});

lines.cache();

long count = lines.count();

System.out.println(count);

可以看出,一個Spark應用程序,基本是基於RDD的一系列計算操作。第1行代碼用於創建JavaSparkContext對象;第2行代碼從HDFS文件中讀取數據創建一個RDD;第3行代碼對fileRDD進行轉換操作得到一個新的RDD,即filterRDD;lines.cache()表示對filterRDD進行持久化,把它保存在內存或磁碟中(這裡採用cache介面把數據集保存在內存中),方便後續重複使用,當數據被反覆訪問時(比如查詢一些熱點數據,或者運行迭代演算法),這是非常有用的,而且通過cache()可以緩存非常大的數據集,支持跨越幾十甚至上百個節點;lines.count()是一個行動操作,用於計算一個RDD集合中包含的元素個數。這個程序的執行過程如下:

創建這個Spark程序的執行上下文,即創建SparkContext對象;

從外部數據源(即HDFS文件)中讀取數據創建fileRDD對象;

構建起fileRDD和filterRDD之間的依賴關係,形成DAG圖,這時候並沒有發生真正的計算,只是記錄轉換的軌跡;

執行action代碼時,count()是一個行動類型的操作,觸發真正的計算,開始實際執行從fileRDD到filterRDD的轉換操作,並把結果持久化到內存中,最後計算出filterRDD中包含的元素個數。

3.RDD特性

總體而言,Spark採用RDD以後能夠實現高效計算的主要原因如下:

(1)高效的容錯性。現有的分散式共享內存、鍵值存儲、內存資料庫等,為了實現容錯,必須在集群節點之間進行數據複製或者記錄日誌,也就是在節點之間會發生大量的數據傳輸,這對於數據密集型應用而言會帶來很大的開銷。在RDD的設計中,數據只讀,不可修改,如果需要修改數據,必須從父RDD轉換到子RDD,由此在不同RDD之間建立了血緣關係。所以,RDD是一種天生具有容錯機制的特殊集合,不需要通過數據冗餘的方式(比如檢查點)實現容錯,而只需通過RDD父子依賴(血緣)關係重新計算得到丟失的分區來實現容錯,無需回滾整個系統,這樣就避免了數據複製的高開銷,而且重算過程可以在不同節點之間並行進行,實現了高效的容錯。此外,RDD提供的轉換操作都是一些粗粒度的操作(比如map、filter和join),RDD依賴關係只需要記錄這種粗粒度的轉換操作,而不需要記錄具體的數據和各種細粒度操作的日誌(比如對哪個數據項進行了修改),這就大大降低了數據密集型應用中的容錯開銷;

(2)中間結果持久化到內存。數據在內存中的多個RDD操作之間進行傳遞,不需要「落地」到磁碟上,避免了不必要的讀寫磁碟開銷;

(3)存放的數據可以是Java對象,避免了不必要的對象序列化和反序列化開銷。

RDD之間的依賴關係

RDD中不同的操作會使得不同RDD中的分區會產生不同的依賴。RDD中的依賴關係分為窄依賴(Narrow Dependency)與寬依賴(Wide Dependency)

兩種依賴之間的區別。

窄依賴表現為一個父RDD的分區對應於一個子RDD的分區,或多個父RDD的分區對應於一個子RDD的分區

寬依賴則表現為存在一個父RDD的一個分區對應一個子RDD的多個分區

總體而言,如果父RDD的一個分區只被一個子RDD的一個分區所使用就是窄依賴,否則就是寬依賴。窄依賴典型的操作包括map、filter、union等,寬依賴典型的操作包括groupByKey、sortByKey等。對於連接(join)操作,可以分為兩種情況。

(1)對輸入進行協同劃分,屬於窄依賴。所謂協同劃分(co-partitioned)是指多個父RDD的某一分區的所有「鍵(key)」,落在子RDD的同一個分區內,不會產生同一個父RDD的某一分區,落在子RDD的兩個分區的情況。

(2)對輸入做非協同劃分,屬於寬依賴,。

對於窄依賴的RDD,可以以流水線的方式計算所有父分區,不會造成網路之間的數據混合。對於寬依賴的RDD,則通常伴隨著Shuffle操作,即首先需要計算好所有父分區數據,然後在節點之間進行Shuffle。

窄依賴與寬依賴的區別

Spark的這種依賴關係設計,使其具有了天生的容錯性,大大加快了Spark的執行速度。因為,RDD數據集通過「血緣關係」記住了它是如何從其它RDD中演變過來的,血緣關係記錄的是粗顆粒度的轉換操作行為,當這個RDD的部分分區數據丟失時,它可以通過血緣關係獲取足夠的信息來重新運算和恢復丟失的數據分區,由此帶來了性能的提升。相對而言,在兩種依賴關係中,窄依賴的失敗恢復更為高效,它只需要根據父RDD分區重新計算丟失的分區即可(不需要重新計算所有分區),而且可以並行地在不同節點進行重新計算。而對於寬依賴而言,單個節點失效通常意味著重新計算過程會涉及多個父RDD分區,開銷較大。此外,Spark還提供了數據檢查點和記錄日誌,用於持久化中間RDD,從而使得在進行失敗恢復時不需要追溯到最開始的階段。在進行故障恢復時,Spark會對數據檢查點開銷和重新計算RDD分區的開銷進行比較,從而自動選擇最優的恢復策略。

5.階段的劃分

Spark通過分析各個RDD的依賴關係生成了DAG,再通過分析各個RDD中的分區之間的依賴關係來決定如何劃分階段,具體劃分方法是:在DAG中進行反向解析,遇到寬依賴就斷開,遇到窄依賴就把當前的RDD加入到當前的階段中;將窄依賴盡量劃分在同一個階段中,可以實現流水線計算。例如,假設從HDFS中讀入數據生成3個不同的RDD(即A、C和E),通過一系列轉換操作後再將計算結果保存回HDFS。對DAG進行解析時,在依賴圖中進行反向解析,由於從RDD A到RDD B的轉換以及從RDD B和F到RDD G的轉換,都屬於寬依賴,因此,在寬依賴處斷開後可以得到三個階段,即階段1、階段2和階段3。可以看出,在階段2中,從map到union都是窄依賴,這兩步操作可以形成一個流水線操作,比如,分區7通過map操作生成的分區9,可以不用等待分區8到分區9這個轉換操作的計算結束,而是繼續進行union操作,轉換得到分區13,這樣流水線執行大大提高了計算的效率。

由上述論述可知,把一個DAG圖劃分成多個「階段」以後,每個階段都代表了一組關聯的、相互之間沒有Shuffle依賴關係的任務組成的任務集合。每個任務集合會被提交給任務調度器(TaskScheduler)進行處理,由任務調度器將任務分發給Executor運行。

通過上述對RDD概念、依賴關係和階段劃分的介紹,結合之前介紹的Spark運行基本流程,這裡再總結一下RDD在Spark架構中的運行過程:

(1)創建RDD對象;

(2)SparkContext負責計算RDD之間的依賴關係,構建DAG;

(3)DAGScheduler負責把DAG圖分解成多個階段,每個階段中包含了多個任務,每個任務會被任務調度器分發

給各個工作節點(Worker Node)上的Executor去執行。

RDD支持兩種類型的操作:

transformation:從一個RDD轉換為一個新的RDD。

action:基於一個數據集進行運算,並返回RDD。

例如,map是一個transformation操作,map將數據集的每一個元素按指定的函數轉換為一個RDD返回。reduce是一個action操作

Spark的所有transformation操作都是懶執行,它們並不立馬執行,而是先記錄對數據集的一系列transformation操作。這種設計讓Spark的運算更加高效,例如,對一個數據集map操作之後使用reduce只返回結果,而不返回龐大的map運算的結果集。

默認情況下,每個轉換的RDD在執行action操作時都會重新計算。即使兩個action操作會使用同一個轉換的RDD,該RDD也會重新計算。除非使用persist方法或cache方法將RDD緩存到內存,這樣在下次使用這個RDD時將會提高計算效率,也支持將RDD持久化到硬碟上,或在多個節點上複製

8.Transformations

下面列出了Spark常用的transformation操作。詳細的細節請參考RDD API文檔(Scala、Java、Python、R)和鍵值對RDD方法文檔(Scala、Java)。

map(func)

將原來RDD的每個數據項,使用map中用戶自定義的函數func進行映射,轉變為一個新的元素,並返回一個新的RDD。

filter(func)

使用函數func對原RDD中數據項進行過濾,將符合func中條件的數據項組成新的RDD返回。

flatMap(func)

類似於map,但是輸入數據項可以被映射到0個或多個輸出數據集合中,所以函數func的返回值是一個數據項集合而不是一個單一的數據項。

mapPartitions(func)

類似於map,但是該操作是在每個分區上分別執行,所以當操作一個類型為T的RDD時func的格式必須是Iterator => Iterator。即mapPartitions需要獲取到每個分區的迭代器,在函數中通過這個分區的迭代器對整個分區的元素進行操作。

mapPartitionsWithIndex(func)

類似於mapPartitions,但是需要提供給func一個整型值,這個整型值是分區的索引,所以當處理T類型的RDD時,func的格式必須為(Int, Iterator) => Iterator。

sample(withReplacement, fraction, seed)

對數據採樣。用戶可以設定是否有放回(withReplacement)、採樣的百分比(fraction)、隨機種子(seed)。

union(otherDataset)

返回原數據集和參數指定的數據集合併後的數據集。使用union函數時需要保證兩個RDD元素的數據類型相同,返回的RDD數據類型和被合併的RDD元素數據類型相同。該操作不進行去重操作,返回的結果會保存所有元素。如果想去重,可以使用distinct()。

intersection(otherDataset)

返回兩個數據集的交集。

distinct([numTasks]))

將RDD中的元素進行去重操作。

groupByKey([numTasks])

操作(K,V)格式的數據集,返回 (K, Iterable)格式的數據集。

注意,如果分組是為了按key進行聚合操作(例如,計算sum、average),此時使用reduceByKey或aggregateByKey計算效率會更高。

注意,默認情況下,並行情況取決於父RDD的分區數,但可以通過參數numTasks來設置任務數。

reduceByKey(func, [numTasks])

使用給定的func,將(K,V)對格式的數據集中key相同的值進行聚集,其中func的格式必須為(V,V) => V。可選參數numTasks可以指定reduce任務的數目。

aggregateByKey(zeroValue)(seqOp, combOp,[numTasks])

對(K,V)格式的數據按key進行聚合操作,聚合時使用給定的合併函數和一個初試值,返回一個(K,U)對格式數據。需要指定的三個參數:zeroValue為在每個分區中,對key值第一次讀取V類型的值時,使用的U類型的初始變數;seqOp用於在每個分區中,相同的key中V類型的值合併到zeroValue創建的U類型的變數中。combOp是對重新分區後兩個分區中傳入的U類型數據的合併函數。

sortByKey([ascending], [numTasks])

(K,V)格式的數據集,其中K已實現了Ordered,經過sortByKey操作返回排序後的數據集。指定布爾值參數ascending來指定升序或降序排列。

join(otherDataset, [numTasks])

用於操作兩個鍵值對格式的數據集,操作兩個數據集(K,V)和(K,W)返回(K, (V, W))格式的數據集。通過leftOuterJoin、rightOuterJoin、fullOuterJoin完成外連接操作。

cogroup(otherDataset, [numTasks])

用於操作兩個鍵值對格式數據集(K,V)和(K,W),返回數據集格式為 (K,(Iterable, Iterable)) 。這個操作也稱為groupWith。對在兩個RDD中的Key-Value類型的元素,每個RDD相同Key的元素分別聚合為一個集合,並且返回兩個RDD中對應Key的元素集合的迭代器。

cartesian(otherDataset)

對類型為T和U的兩個數據集進行操作,返回包含兩個數據集所有元素對的(T,U)格式的數據集。即對兩個RDD內的所有元素進行笛卡爾積操作。

pipe(command, [envVars])

以管道(pipe)方式將 RDD的各個分區(partition)使用 shell命令處理(比如一個 Perl或 bash腳本)。 RDD的元素會被寫入進程的標準輸入(stdin),將進程返回的一個字元串型 RDD(RDD of strings),以一行文本的形式寫入進程的標準輸出(stdout)中。

coalesce(numPartitions)

把RDD的分區數降低到通過參數numPartitions指定的值。在得到的更大一些數據集上執行操作,會更加高效。

repartition(numPartitions)

隨機地對RDD的數據重新洗牌(Reshuffle),從而創建更多或更少的分區,以平衡數據。總是對網路上的所有數據進行洗牌(shuffles)。

repartitionAndSortWithinPartitions(partitioner)

根據給定的分區器對RDD進行重新分區,在每個結果分區中,按照key值對記錄排序。這在每個分區中比先調用repartition再排序效率更高,因為它可以將排序過程在shuffle操作的機器上進行。

9.Actions

下面列出了Spark支持的常用的action操作。詳細請參考RDD API文檔(Scala、Java、Python、R)和鍵值對RDD方法文檔(Scala、Java)。

reduce(func)

使用函數func聚集數據集中的元素,這個函數func輸入為兩個元素,返回為一個元素。這個函數應該符合結合律和交換了,這樣才能保證數據集中各個元素計算的正確性。

collect()

在驅動程序中,以數組的形式返回數據集的所有元素。通常用於filter或其它產生了大量小數據集的情況。

count()

返回數據集中元素的個數。

first()

返回數據集中的第一個元素(類似於take(1))。

take(n)

返回數據集中的前n個元素。

takeSample(withReplacement,num, [seed])

對一個數據集隨機抽樣,返回一個包含num個隨機抽樣元素的數組,參數withReplacement指定是否有放回抽樣,參數seed指定生成隨機數的種子。

takeOrdered(n, [ordering])

返回RDD按自然順序或自定義順序排序後的前n個元素。

saveAsTextFile(path)

將數據集中的元素以文本文件(或文本文件集合)的形式保存到指定的本地文件系統、HDFS或其它Hadoop支持的文件系統中。Spark將在每個元素上調用toString方法,將數據元素轉換為文本文件中的一行記錄。

saveAsSequenceFile(path) (Java and Scala)

將數據集中的元素以Hadoop Sequence文件的形式保存到指定的本地文件系統、HDFS或其它Hadoop支持的文件系統中。該操作只支持對實現了Hadoop的Writable介面的鍵值對RDD進行操作。在Scala中,還支持隱式轉換為Writable的類型(Spark包括了基本類型的轉換,例如Int、Double、String等等)。

saveAsObjectFile(path) (Java and Scala)

將數據集中的元素以簡單的Java序列化的格式寫入指定的路徑。這些保存該數據的文件,可以使用SparkContext.objectFile()進行載入。

countByKey()

僅支持對(K,V)格式的鍵值對類型的RDD進行操作。返回(K,Int)格式的Hashmap,(K,Int)為每個key值對應的記錄數目。

foreach(func)

對數據集中每個元素使用函數func進行處理。該操作通常用於更新一個累加器(Accumulator)或與外部數據源進行交互。注意:在foreach()之外修改累加器變數可能引起不確定的後果。詳細介紹請閱讀

10.Spark環境部署

主要運行方式

Local

Standalone

On YARN

On Mesos

11.環境依賴

Linux,CentOS 6/7

安裝JDK

下載 spark 安裝包

spark.apache.org/downlo

12.應用開發環境

JDK 1.8

Eclipse 4.6

Scala-IDE

scala-ide.org

Scala 2.11 可選

scala-lang.org

1.eclipse的scala開發插件

2.開發spark項目的maven依賴

2.開發spark項目的maven依賴

本文由海牛學院青牛原創 | 如需轉載請註明出處


推薦閱讀:

Adagio 請給我一個交代
昨天,極限挑戰給我打了個電話……
關於培訓的心得體會怎麼寫?
學而不思
英語8種時態的歸納

TAG:Spark | 大數據 | 教育培訓 |