Spark編程有哪些有用技巧?

任何你自己覺得值得一提的技巧都可以說,比如加速計算、提高編程效率、在EC2集群上省錢,等等。


1.正確使用轉換操作,明確何時應該減少shuffle,何時應該增加shuffle。

2.恰當的資源配置,資源配置不當會嚴重影響作業性能,哪怕你代碼上無懈可擊。

以上是粗略的小結,具體請看Cloudera的兩篇關於Spark調優的博客,How-to: Tune Your Apache Spark Jobs (Part 1)


也不能算技巧,一些小經驗分享一下~

使用Scala的簡潔風格操作資料庫

開始安利一下這個小組件了。

你可以在Spark程序中這樣子讀HBase:

val rdd = sc.fromHBase[(String, String, String)]("mytable")
.select("col1", "col2")
.inColumnFamily("columnFamily")
.withStartRow("startRow")
.withEndRow("endRow")
//rdd type:RDD[(String,String,String)]

這樣子寫HBase:

//rdd type:RDD[(String,String,String)]
rdd.toHBase("mytable")
.insert("col1", "col2")
.inColumnFamily("columnFamily")
.save()

寫MySQL的時候歪了,沒有直接操作RDD,而是操作集合(貌似這樣所有Scala程序都能用它讀寫MySQL)

讀MySQL:

//以Spark中為例
val list = sc.fromMysql[(Int,String,Int)]("table-name")
.select("id","name","age")
.where("where-conditions")
.get
//list type:Seq[(Int,String,Int)]

寫MySQL:

//list type:Iterable[T]
list.toMysql("table-name")
.insert("columns")
.where("where-conditions")
.save()

代碼在此:chubbyjiang/Spark_DB_Connector

關於filter和union

這倆貨還是不要湊一起的好...

如果你這樣子操作:

val data1 = rdd.filter(condition1)
val data2 = rdd.filter(condition2)
...
val dataN = rdd.filter(conditionN)
val totalData = data1.union(data2)...union(dataN).otherTransformation

去WebUI瞧一瞧吧,有個大大的surprise。

WebUI

這是個好東西啊,你想要的和不想要的都在上面了。

各個Executor的Input數據不平衡-&>你可能有數據傾斜的麻煩

特定的Stage中的Input比較不均勻-&>會不會有filter這種會造成局部數據大小偏差的操作

Executor的GC時間長-&>是不是很多數據往內存裡面擠或者其他數據問題

UI中還可以看到任務執行的各個階段所以及其佔據的總體時間,可以從中看出有問題的階段從而進行排查調優。

將廣播變數類型作為類成員屬性

這塊有個坑,假設在一個class中聲明一個成員變數的類型為Broadcast[T],然後在外部通過obj.bro = sc.broadcast(something)來設置的話,引用時為空。

broadcast操作會將要廣播的數據存在某個地方(磁碟內存),並且開啟一個類似HttpServer的服務允許各個節點上的Executor來Driver端獲取數據,當task在各個節點上反序列化開始執行並使用到了廣播變數之後就會到Driver那把數據拿到本地使用。

當使用obj.bro的時候是一個指向某個內存地址的指針,猜想跟上面的過程不搭嘎,so~

foreachPartition

號外號外,如果(foreachPartition操作的數據很大 || 單Executor內存不太夠) 你這樣子做的話:

rdd.foreachPartition{
x.toList.//在list上面的其他操作
}

會驚喜的得到一份OOM(或者任務失敗)大禮包,因為toList操作把當前分區中的所有數據一次性載入到內存中處理。

主要臨床癥狀表現為:

  • 一開始的幾個分區可以順利完成操作
  • 隨著數據越讀越多,Executor中的內存開始吃力的時候觸發GC
  • WebUI上各個Executor的GC時間特別長
  • task停頓導致無法響應心跳檢測從而任務失敗

大量小文件的處理

小文件一多,直接讀起來處理的話生成的task數量會讓人目瞪口呆 ,處理的方式也不複雜,能在文件生成的時候就規定好大小是最直接的方法。

如果產生文件的系統不受你的控制那就考慮一下要不要把它們讀起來合併一份。

如果程序開始處理前不能很好的搞定的話,那就在代碼上動動手腳,RDD執行操作前加個coalesce(numPartitions,false)吧,不Shuffle的情況下也可以有效的減低task數量,不過數據可能不會均勻的分布在設置的partition中,因為不Shuffle的話就只是處理local的數據了

RDD緩存

  • 大數據量的RDD不合適持久化,有可能操作任務失敗拖慢進度
  • 盡量不要直接使用原始數據集進行緩存,取出需要的內容減少體積之後再緩存,最好不要使用對象

哦,還有一個,記得用一個變數保存rdd.cache的返回值,不然你怎麼action都沒用。。。

曾經滿懷期待的在Storage頁面上狂刷新等著看緩存的數據,結果一片空白,驚呼有BUG!回頭一看cache有個返回值。。。

Shuffle相關

涉及到網路的一般都是程序中的瓶頸(畢竟和遠程取數據相比,本地數據都可以看成是高速緩存了...)。少年,下手的時候留意一下你屏幕中的Shuffle操作。

稍微留意一下可以讓Shuffle如絲般順滑:

  • spark.shuffle.consolidateFiles
  • spark.shuffle.manager
  • 盡量減少Shuffle操作:其實很多Shuffle操作符都可以通過其他運算元來替換的,比如groupByKey-&>reduceByKey/aggregateByKey,或者其他組合拳
  • 不能避免的情況下:盡量減少Shuffle前的數據集,沒用的欄位都扔了吧,能不用對象還是不要用了
  • 等等篇幅太多了

Shuffle優化網上有很多例子,隨手翻翻還是有很多收穫的。

並行度和任務數量調整

在Yarn管理模式下一些啟動的參數可以決定程序佔多少資源。(例如num-executors/executor-memory/executor-cores這些)

集群啟動的Container數量是根據資源分配的,如果設置的參數太高導致資源不足就只能起最大限制的Container,此時有可能使得資源利用不高。

例如,集群有24個核心,48g的內存,啟動Executor數量為24,每個Executor內存為2g,此時因為內存限制的原因,集群只會15個Container,導致實際上只用到了15個cpu。

如果想達到最大CPU/內存利用率,可以多測試幾下,例如上面的集群中20個Executor,內存為1650m可以啟動20個Container,內存使用41g,cpu利用率也達到了21,包括Driver進程。

當然,也不能一個程序就把所有資源吃掉,可以以這個最大限度作參考,好歹知個底~

可以簡單的這樣估算集群最大承受能力:

  • 根據集群總CPU核心數估算啟動多少個Executor,最好保留幾個核心
  • 使用總內存(減去Driver內存)除以得到的Executor數量估算每個Executor可以使用的內存,一般來說取這個內存的2/3,因為有其他保留內存限制,超出之後可能只會啟動少數Container(可以多測試幾下看最大啟動的內存限制是多少)
  • 根據Executor的情況,合理設置作業的task任務數


謝邀,我這裡重點說一些通用的性能優化建議吧。

代碼方面:

  1. 理解Shuffle對性能的影響,並儘可能減少Shuffle次數(對於Spark是Stage個數)
  2. 理解任務並行度對性能的影響,Task並行執行符合木桶效應,利用好Spark的WebUI學會分析Task慢在哪,並設置合理的任務並發數,對於SparkSQL注意spark.sql.shuffle.partitions參數的配置
  3. 避免嚴重的任務傾斜,即便Task並行度設置的是合理的,也會被傾斜的任務拖慢
  4. 避免嚴重的數據膨脹(類似笛卡爾積操作),防止出現OOM
  5. 利用好cache,將需要反覆用到的RDD或者DataFrame進行cache,加速計算

硬體方面:

  1. Spark進行Shuffle是要寫磁碟的,給伺服器配個SSD專門存Shuffle數據平均至少能帶來20%的性能提升
  2. 經常跑大任務,建議採用萬兆網路環境,避免網路瓶頸
  3. 合理的分配CPU及內存資源,建議1 core對應2G內存,同時伺服器需要預留至少10%左右的內存做文件緩存之類的,過多使用內存會嚴重影響性能
  4. 出現性能問題,善用netstat,top,dstat等工具,確定瓶頸在哪(CPU、內存、網路 or 磁碟?),並改善

系統方面(Linux):

  1. 掛載磁碟設置noatime,提升IO性能
  2. 默認的ulimit參數一般是不夠用的,需要改大

其他:出現性能問題不一定是Spark的問題,大數據生態圈的組件特別多,有可能是其他組件(如:Hadoop、Hive等)出現性能問題,需要對各組件做完善的性能監控(zabbix;Ganglia等)。


spark作為一個將大部分中間數據扔在內存里做中間輾轉地的分散式框架,為人所熟知的是它優秀靈活的計算能力,但越是靈活的東西越需要一些奇技淫巧。

這裡說一些在處理大量數據時一些提速以及安全的方式╮(╯▽╰)╭。

1.join操作。不要用join操作*10086。如果一定需要join完成操作,請用group後再map模擬join操作。這樣不僅安全可靠,而且速度有很大提升。distinct同理。

2.這個有人提到了,spark的速度和你設置executor數目和任務細分數相關。由於shuffle操作會將stage切成兩段,一個shuffle之前的任務全部結束前後面的任務不會開始,調節一個恰當的executor(可以或略大於被任務整除的數目)數目可以顯著提高效率。

3.cache方法和persist方法在對中間數據很大時資源浪費比較大,如果可以建議用save把中間rdd放到hdfs硬碟上。

4.少用shuffle操作,優化的時候尤其注意UI上shuffle write數據量過大的操作,它是導致任務緩慢的最大原因。

5.在可以用reduce解決問題的時候,不要用group。因為reduce會先處理當前節點的所有分片同key進行合併再對不同節點同樣的key進行合併,而group會將所有相同的key整合後再處理。對內存的消耗顯而易見。

6.注意!由於在用flatmap輸出的出參是list。如果list裡面放同一個對象非原始類型的多次引用,那麼在下一步rdd處理時,它們之間會相互影響,像你在單機內存里一樣!即使它們被分到了不同節點!

這個小特性可以完成一些微妙的操作,具體就看使用者了。


說一個最近才踩到的坑吧。。可能弱了一點。

在使用Java/Scala的一些「工具類」之前,一定要確認工具類是否是線程安全的。

比如SimpleDateFormat,別問我怎麼知道的。。


所謂Spark是起源於美國加州大學伯克利分校AMPLab的大數據計算平台,在2010年開源,目前是Apache軟體基金會的頂級項目。隨著Spark在大數據計算領域的暫露頭角,越來越多的企業開始關注和使用。2014年11月,Spark在Daytona Gray Sort 100TB Benchmark競賽中打破了由Hadoop MapReduce保持的排序記錄。Spark利用1/10的節點數,把100TB數據的排序時間從72分鐘提高到了23分鐘。

Spark在架構上包括內核部分和4個官方子模塊

Spark SQL

Spark Streaming

機器學習庫MLlib

圖計算庫GraphX

由Spark在伯克利的數據分析軟體棧BDAS(Berkeley Data Analytics Stack)中的位置可見,Spark專註於數據的計算,而數據的存儲在生產環境中往往還是由Hadoop分散式文件系統HDFS承擔。

Spark在BDAS中的位置

Spark被設計成支持多場景的通用大數據計算平台,它可以解決大數據計算中的批處理,交互查詢及流式計算等核心問題。Spark可以從多數據源的讀取數據,並且擁有不斷發展的機器學習庫和圖計算庫供開發者使用。數據和計算在Spark內核及Spark的子模塊中是打通的,這就意味著Spark內核和子模塊之間成為一個整體。Spark的各個子模塊以Spark內核為基礎,進一步支持更多的計算場景,例如使用Spark SQL讀入的數據可以作為機器學習庫MLlib的輸入。以下列舉了一些在Spark平台上的計算場景。

Spark的應用場景舉例

之前在大數據概述的課程中我們提到了Hadoop,大數據工程師都非常了解Hadoop MapReduce一個最大的問題是在很多應用場景中速度非常慢,只適合離線的計算任務。這是由於MapReduce需要將任務劃分成map和reduce兩個階段,map階段產生的中間結果要寫回磁碟,而在這兩個階段之間需要進行shuffle操作。Shuffle操作需要從網路中的各個節點進行數據拷貝,使其往往成為最為耗時的步驟,這也是Hadoop MapReduce慢的根本原因之一,大量的時間耗費在網路磁碟IO中而不是用於計算。在一些特定的計算場景中,例如像邏輯回歸這樣的迭代式的計算,MapReduce的弊端會顯得更加明顯。

那Spark是如果設計分散式計算的呢?首先我們需要理解Spark中最重要的概念--彈性分布數據集(Resilient Distributed Dataset),也就是RDD

關鍵詞:彈性分布數據集RDD

RDD是Spark中對數據和計算的抽象,是Spark中最核心的概念,它表示已被分片(partition),不可變的並能夠被並行操作的數據集合。對RDD的操作分為兩種transformation和action。Transformation操作是通過轉換從一個或多個RDD生成新的RDD。Action操作是從RDD生成最後的計算結果。在Spark最新的版本中,提供豐富的transformation和action操作,比起MapReduce計算模型中僅有的兩種操作,會大大簡化程序開發的難度。

RDD的生成方式只有兩種,一是從數據源讀入,另一種就是從其它RDD通過transformation操作轉換。一個典型的Spark程序就是通過Spark上下文環境(SparkContext)生成一個或多個RDD,在這些RDD上通過一系列的transformation操作生成最終的RDD,最後通過調用最終RDD的action方法輸出結果。

每個RDD都可以用下面5個特性來表示,其中後兩個為可選的:

  • 分片列表(數據塊列表)

  • 計算每個分片的函數

  • 對父RDD的依賴列表

  • 對key-value類型的RDD的分片器(Partitioner)(可選)

  • 每個數據分片的預定義地址列表(如HDFS上的數據塊的地址)(可選)

雖然Spark是基於內存的計算,但RDD不光可以存儲在內存中,根據useDisk、useMemory、useOffHeap, deserialized、replication五個參數的組合Spark提供了12種存儲級別,在後面介紹RDD的容錯機制時,我們會進一步理解。值得注意的是當StorageLevel設置成OFF_HEAP時,RDD實際被保存到Tachyon中。Tachyon是一個基於內存的分散式文件系統,目前正在快速發展,在這裡我們就不做詳細介紹啦,可以通過其官方網站進一步了解。

DAG、Stage與任務的生成

Spark的計算髮生在RDD的action操作,而對action之前的所有transformation,Spark只是記錄下RDD生成的軌跡,而不會觸發真正的計算。

Spark內核會在需要計算髮生的時刻繪製一張關於計算路徑的有向無環圖,也就是DAG。舉個例子,在下圖中,從輸入中邏輯上生成A和C兩個RDD,經過一系列transformation操作,邏輯上生成了F,注意,我們說的是邏輯上,因為這時候計算沒有發生,Spark內核做的事情只是記錄了RDD的生成和依賴關係。當F要進行輸出時,也就是F進行了action操作,Spark會根據RDD的依賴生成DAG,並從起點開始真正的計算。

邏輯上的計算過程:DAG

有了計算的DAG圖,Spark內核下一步的任務就是根據DAG圖將計算劃分成任務集,也就是Stage,這樣可以將任務提交到計算節點進行真正的計算。Spark計算的中間結果默認是保存在內存中的,Spark在劃分Stage的時候會充分考慮在分散式計算中可流水線計算(pipeline)的部分來提高計算的效率,而在這個過程中,主要的根據就是RDD的依賴類型。

根據不同的transformation操作,RDD的依賴可以分為窄依賴(Narrow Dependency)和寬依賴(Wide Dependency,在代碼中為ShuffleDependency)兩種類型。窄依賴指的是生成的RDD中每個partition只依賴於父RDD(s) 固定的partition。寬依賴指的是生成的RDD的每一個partition都依賴於父 RDD(s) 所有partition。窄依賴典型的操作有map, filter, union等,寬依賴典型的操作有groupByKey, sortByKey等。可以看到,寬依賴往往意味著shuffle操作,這也是Spark劃分stage的主要邊界。對於窄依賴,Spark會將其盡量劃分在同一個stage中,因為它們可以進行流水線計算。

RDD的寬依賴和窄依賴

最後我們再通過下圖來詳細解釋一下Spark中的Stage劃分。我們從HDFS中讀入數據生成3個不同的RDD,通過一系列transformation操作後再將計算結果保存回HDFS。可以看到這幅DAG中只有join操作是一個寬依賴,Spark內核會以此為邊界將其前後劃分成不同的Stage. 同時我們可以注意到,在圖中Stage2中,從map到union都是窄依賴,這兩步操作可以形成一個流水線操作,通過map操作生成的partition可以不用等待整個RDD計算結束,而是繼續進行union操作,這樣大大提高了計算的效率。

Spark中的Stage劃分


在今年的Spark Summit上聽到的: Top 5 Mistakes When Writing Spark Applications(視頻需要科學上網)


根據不同的應用程序進行Spark參數配置,須知沒有哪一種配置是通吃所有的benchmark的


好了了了了了


默認配置通常不理想。設置恰當的worker數量和內存可以減少總體時間。


推薦閱讀:

hadoop和大數據的關係?和spark的關係?互補?並行?
Zookeeper在哪些系統中使用,又是怎麼用的?
分散式計算框架 Hadoop 為什麼叫 "Hadoop" ?
演算法研發工作中對於MPI和Spark的一些困惑?
做大數據相關專業,如Hadoop、Hive、Impala等研究與優化的人就業情況如何呢?

TAG:編程 | Hadoop | 並行計算 | HDFS | Spark |