spark的shuffle和Hadoop的shuffle(mapreduce)的區別和關係是什麼?

對spark的shuffle和Hadoop的shuffle(mapreduce)的區別和關係不是很理解,求解釋。


謝謝 @上班狗@Ryan Fan 邀請

最近忙成狗,就不詳述實現過程了,僅僅對比一下。

1. 從邏輯角度來講,Shuffle 過程就是一個 GroupByKey 的過程,兩者沒有本質區別。

只是 MapReduce 為了方便 GroupBy 存在於不同 partition 中的 key/value records,就提前對 key 進行排序。Spark 認為很多應用不需要對 key 排序,就默認沒有在 GroupBy 的過程中對 key 排序。

2. 從數據流角度講,兩者有差別。

MapReduce 只能從一個 Map Stage shuffle 數據,Spark 可以從多個 Map Stages shuffle 數據(這是 DAG 型數據流的優勢,可以表達複雜的數據流操作,參見 CoGroup(), join() 等操作的數據流圖 SparkInternals/4-shuffleDetails.md at master · JerryLead/SparkInternals · GitHub)。

3. Shuffle write/read 實現上有一些區別。

以前對 shuffle write/read 的分類是 sort-based 和 hash-based。MapReduce 可以說是 sort-based,shuffle write 和 shuffle read 過程都是基於key sorting 的 (buffering records + in-memory sort + on-disk external sorting)。早期的 Spark 是 hash-based,shuffle write 和 shuffle read 都使用 HashMap-like 的數據結構進行 aggregate (without key sorting)。但目前的 Spark 是兩者的結合體,shuffle write 可以是 sort-based (only sort partition id, without key sorting),shuffle read 階段可以是 hash-based。因此,目前 sort-based 和 hash-based 已經「你中有我,我中有你」,界限已經不那麼清晰。

4. 從數據 fetch 與數據計算的重疊粒度來講,兩者有細微區別。

MapReduce 是粗粒度,reducer fetch 到的 records 先被放到 shuffle buffer 中休息,當 shuffle buffer 快滿時,才對它們進行 combine()。而 Spark 是細粒度,可以即時將 fetch 到的 record 與 HashMap 中相同 key 的 record 進行 aggregate。

5. 從性能優化角度來講,Spark考慮的更全面。

MapReduce 的 shuffle 方式單一。Spark 針對不同類型的操作、不同類型的參數,會使用不同的 shuffle write 方式。比如 Shuffle write 有三種實現方式:

其中 Serialized sorting 方式既可以使用堆內內存,也可以使用堆外內存。更多的細節就不詳述了,感興趣可以看相關的實現類。繼續幹活!


謝邀,同時謝謝答案區 @天八八和 @Ryan Fan的討論。從原理上來說是幾乎一樣的,具體可以看這個介紹:

hadoop shuffle過程

Shuffle 過程 (注意這個文章裡面的一些觀點和我先前的答案一樣有些過時)

執行方面在最新版本看來已經大同小異,

1) 首先Spark已經具有了sort shuffle和hash shuffle 兩個功能。[1]

2) 對於Hadoop,最初只有sort shuffle。在最新版本中已經具有了專有shuffle 插件這樣一個功能,讓用戶可以自己寫自己喜歡用的shuffle function。所以可以說已經包含了1)中的功能,而且更廣。[2-3]

[1] [SPARK-2045] Sort-based shuffle implementation

[2] Apache Hadoop 3.0.0-alpha1

[3] Apache Hadoop 2.7.1 (hadoop encrypted shuffle)

另外一些有意思的例子可以看這兩篇文章,注意可能有些過時

https://weishungchung.com/2014/08/10/shuffling-in-spark-vs-hadoop-mapreduce/

Apache Spark源碼走讀之24 -- Sort-based Shuffle的設計與實現


謝謝邀請。

先上一張圖:

解說:

1、MapReduce在Map階段完成之後數據會被寫入到內存中的一個環形緩衝區(後續的分區/分組/排序在這裡完成);Spark的Map階段完成之後直接輸出到磁碟。

2、受第一步的影響,MapReduce輸出的數據是有序的(針對單個Map數據來說);Spark的數據是無序的(可以使用RDD運算元達到排序的效果)。

3、MapReduce緩衝區的數據處理完之後會spill到磁碟形成一個文件,文件數量達到閾值之後將會進行merge操作,將多個小文件合併為一個大文件;Spark沒有merge過程,一個Map中如果有對應多個Reduce的數據,則直接寫多個磁碟文件。

4、MapReduce全部通過網路來獲得數據;對於本地數據Spark可以直接讀取

5、就是上圖剩下的步驟。。。


感謝邀請,樓上的大牛們都已經回答的差不多了,基本已經算是一個學習帖子了。

不過,既然邀請了,還是要啰嗦幾句。

從 High-Level 的角度來看,兩者並沒有大的差別。

都是將 Mapper(Spark 里是 Shufflemaptask)的輸出進行 Partition,不同的 Partition 送到不同的 Reducer(Spark 里 Reducer 可能是下一個 Stage 里的 Shufflemaptask,也可能是 Resulttask)。Reducer 以內存作緩衝區,邊 Shuffle 邊 Aggregate 數據,等到數據 Aggregate 好以後進行 Reduce() (Spark 里可能是後續的一系列操作)。

從 Low-Level 的角度來看,兩者差別不小。

Hadoop Mapreduce 是 Sort-Based,進入 Combine() 和 Reduce() 的 Records 必須先 Sort。這樣的好處在於 Combine/Reduce() 可以處理大規模的數據,因為其輸入數據可以通過外排得到(Mapper 對每段數據先做排序,Reducer 的 Shuffle 對排好序的每段數據做歸併)。目前的 Spark 默認選擇的是 Hash-Based,通常使用 Hashmap 來對 Shuffle 來的數據進行 Aggregate,不會對數據進行提前排序。如果用戶需要經過排序的數據,那麼需要自己調用類似 Sortbykey() 的操作;如果你是 Spark 1.1 的用戶,可以將Spark.Shuffle.Manager 設置為 Sort,則會對數據進行排序。在 Spark 1.2中,Sort 將作為默認的 Shuffle 實現。


謝 @周翀 邀

但是作為一個Spark領域的菜鳥,要回答這麼專業的技術問題,是不敢造次的。

貼上自己在學Spark時參考的一個很棒的github文檔,作者 @Lijie Xu 大牛或許可以不吝賜教?

SparkInternals/4-shuffleDetails.md at master · JerryLead/SparkInternals · GitHub


謝邀。我出來打個醬油,發現被邀請回答一個大數據精髓類問題,本著一知半解毀人不倦的態度,我決定亂按鍵盤一會兒。

聯繫:

1)本質上,Spark和Hadoop的shuffle都是對mapreduce論文中體現的思維模式在具體的實踐中通過不同的程序語言(Scala vs Java)來實現的一種實踐,排除那些花哨的噱頭,兩者是殊途同歸的;當然,由於Spark在Hadoop之後,它基於Hadoop的MR已有的優缺點作出了自己的改進(同時也挖了一些新坑,但是坑較少),目前看起來是有些「後來居上」的意味,從我的一些經驗來看,官網上面吹噓的100x的性能差距,僅在某些小數據集的特定場景下確實能虐Hadoop幾條街,但是如果放到日常的數據(非實驗/測試環境)平台來跑,5-10倍已經是相當逆天了;另外兩者給人一個顯著不同是: Yahoo的工程能力確實強,Hadoop的MR即使再慢,總是能到達終點,順利跑完;而Spark的學院血統對於某些問題的解決方式確實讓人眼前一亮(當然這可能是因為答主對FP不熟悉導致看到Scala有些用法覺得獻出去的膝蓋無法收回),但是Spark卻時不時的就跑掛了,你跑了一天半的數據,然後去吃了個拉麵,回來,我靠,竟然掛了~掛了,你心頭是否會有一萬隻 飄過?

2)另外,現在隨著Java 8一些新特性的發布,從Spark 2.0的發布,Hadoop的更新,我越來越覺得兩者在趨同化了,差異性越來越不明顯,Hadoop按照自己的roadmap走倒是越來越強大,但是Spark,答主在它還是相當低版本的時候,作為計算引擎的時候,像風一樣的男子,那速度,那性能,簡直就是你在對Hadoop無語多年後適時而來讓你終於可以吐槽的宣洩口的感覺! 而現在,連Spark SQL都來了,對於這個路線答主無力表達什麼,只是忽然覺得越來越豐滿的Spark,越來越強大的Spark,我卻沒有那麼喜歡它了。

區別:

如今的Shuffle, map,spill,sort,combine,reduce,merge都已經成為標配,區別在哪裡呢? 我掛一漏萬,說幾個自己的理解(如果錯了,煩請指出,千萬別打臉)。

1) 最大的區別: 中間數據的存儲/落地。 Spark一針見血的解決了Hadoop MR的痼疾:神馬東西/屁大點兒東西都要往HDFS上面放,HDFS是跨機器的,多副本的,你這一遍一遍的寫,然後一遍一遍的拉取,天都黑了;而Spark針對這個問題,能不落地的就不落地,非要落地的也可以直接落地到本地磁碟上,而非總是通過網路(HDFS)讀寫,這節省出來的latency時間5分鐘,可以通話兩小時!

Hadoop認為數據總是很大的,節點總是會掛的,所以它推崇一步一個腳印的搞,所以就慢的令人髮指;而眾所周知,數據集在很多時候是可以想辦法變小的, Spark就抓住這一點,你的數據集並非總是那麼大,你的機器也並非總是會掛,哥就跑個完美的場景: 數據集小於集群內存,生存的文件經常都很小,這時候就是Spark開掛的時候了! 在今天很多2U集群動輒雙路/四路CPU,128GB內存,12TB硬碟的情況下,稍微多幾個節點,數據很」大「的問題就沒有那麼嚴重了。

這點在最早期版本的Spark中尤為明顯,數據幾乎都不落地,所以性能碾壓Hadoop慘絕人寰,當然致命的問題就是, RDD經過各種轉換之後,經常爆倉了, OOM,然後。。。然後現在也map都要每一條落地了,通過配置,也可以合併小文件了,性能嘛,在大數據集下也就下降了,這就是上面講的」趨同化「導致的,所以如果有人直接告訴你Spark百倍於Hadoop,那是耍流氓。

2)Hadoop的reducer一定要等到所有的mapper全部完成之後才能開始,而Spark的reducer在mapper的總算完成到一定的量之後就會開始,也就是可以一邊map一邊reduce,這估計是另外一個加速度的效果導致比Hadoop的MR要快。

3)Spark具體到代碼級別的實現時,有很多惰性求值的寫法在裡面,導致程序運行的效率在看不見的地方卻比Hadoop優化了非常多,怎麼類比好呢?Hadoop的MR方式具體在實現上就好比我們用同步的命令式的Javascript方式實現了資料庫連接,而Spark採用了非同步的閉包式的搞法來連接,威力是很不一樣的;這些特徵在Shuffle這類操作是功效尤為明顯;但是在Java 8發布之後,以及將來Java繼續擴展FP式的功能之後, Spark的這方面的優勢可能就沒有那麼明顯了。

4)Spark講Inputs抽象稱為RDD,在各個不同的Stage,各種花式轉換,之後還是RDD,相對於Hadoop那一坨一坨的MR簡直不要優雅,所以如果你想定製Shuffle階段的比如排序演算法,都會更加越來越熟練,而Hadoop的MR寫完一周沒看,下一次拿起來完全不知道什麼東西; RDD這個抽象我覺得在代碼的可讀性上面做的簡直太好了,看到RDD時有當年看到Wordpress將所有內容抽象成為post時的會心一笑,這對於學習一個開源的框架實在太重要了,我依然覺得,抽象的越優雅的東西越簡單,越簡單的東西越容易組合,組合的力量也越大。

5)Hadoop的MR過程,如果一個Job執行錯了,需要從頭再來一遍;而Spark的job執行有類似於「增量」的效果在裡面, 如果job執行到哪個步驟出錯了,這個錯誤之前的結果會被直接用到下一次的job執行中,也即是job可以做到從出錯的地方開始,而非每次都全部從頭再來。

6)綜上所述,答主大膽猜測Spark這幫Commiter(sun子)對Hadoopd了解程度甚至有可能超過Spark本身,Spark的現有方案直指Hadoop的命門,我覺得Spark這是對Hadoop的:

真愛

而看著同樣的一件事情,不同的實現方案,都是這麼帥氣,「Think Different」,世界又可愛了幾分。


我在https://0x0fff.com/spark-misconceptions/中讀到關於Hadoop Shuffle和Spark Shuffle的區別是這樣的:

Spark puts the data on HDDs only once during shuffles, MR do it 2 times.

但沒太明白這句話的內部含義是什麼,還請知乎上各位大牛,幫忙解答一下~~

謝謝!~


著作權歸作者所有。

商業轉載請聯繫作者獲得授權,非商業轉載請註明出處。

作者:楊鑫newlfe

鏈接:Hadoop中的Shuffle 與 Spark中的Shuffle的區別與聯繫

來源:CSDN博客

Hadoop中的Shuffle 與 Spark中的Shuffle的區別與聯繫

MapReduce過程、Spark和Hadoop以Shuffle為中心的對比分析

mapreduce與Spark的map-Shuffle-reduce過程

mapreduce過程解析(mapreduce採用的是sort-based shuffle)

將獲取到的數據分片partition進行解析,獲得k/v對,之後交由map()進行處理.

map函數處理完成之後,進入collect階段,對處理後的k/v對進行收集,存儲在內存的環形緩衝區中。

當環形緩衝區中的數據達到閥值之後(也可能一直沒有達到閥值,也一樣要將內存中的數據寫入磁碟),將內存緩衝區中的數據通過SpillThread線程轉移到磁碟上。需要注意的是,轉移之前,首先利用快排對記錄數據進行排序(原則是先按照分區編號,再按照key進行排序,注意,排序是在寫入磁碟之前的)。之後按照partition編號,獲取上述排序之後的數據並將其寫入Spill.out文件中(一個Spill.out文件中可能會有多個分區的數據--因為一次map操作會有多次的spill的過程),需要注意的是,如果人為設置了combiner,在寫入文件之前,需要對每個分區中的數據進行聚集操作。該文件同時又對應SpillRecord結構(Spill.out文件索引)。

map的最後一個階段是merge:該過程會將每一個Spill.out文件合併成為一個大文件(該文件也有對應的索引文件),合併的過程很簡單,就是將多個Spill.out文件的在同一個partition的數據進行合併。(第一次聚合)

shuffle階段。首先要說明的是shuffle階段有兩種閥值設置。第一,獲取來自map的結果數據的時候,根據數據大小(file.out的大小)自然劃分到內存或者是磁碟(這種閥值的設置跟map階段完全不同);第二,內存和磁碟能夠保存的文件數目有閥值,超出閥值,會對文件進行merge操作,即小文件合併成為大文件。Shuffle過程:

1)獲取完成的Map Task列表。

2)進行數據的遠程拷貝(http get的方法),根據數據文件的大小自然劃分到內存或者是磁碟。

3)當內存或者磁碟的文件較多時,進行文件合併。(第二次聚合)

reduce之前需要進行Sort操作,但是兩個階段是並行化的,Sort在內存或者磁碟中建立小頂堆,並保存了指向該小頂堆根節點的迭代器,同時Reduce Task通過迭代器將key相同的數據順次講給reduce()函數進行處理。

Spark Shuffle過程解析(採用hash-based shuffle)

RDD是Spark與Hadoop之間最明顯的差別(數據結構),Spark中的RDD具有很多特性,在這裡就不再贅述。

Spark與Hadoop之間的Shuffle過程大致類似,Spark的Shuffle的前後也各有一次聚合操作。但是也有很明顯的差別:Hadoop的shuffle過程是明顯的幾個階段:map(),spill,merge,shuffle,sort,reduce()等,是按照流程順次執行的,屬於push類型;但是,Spark不一樣,因為Spark的Shuffle過程是運算元驅動的,具有懶執行的特點,屬於pull類型。

Spark與Hadoop的Shuffle之間第二個明顯的差別是,Spark的Shuffle是hash-based類型的,而Hadoop的Shuffle是sort-based類型的。下面簡介一下Spark的Shuffle:

1.正因為是運算元驅動的,Spark的Shuffle主要是兩個階段:Shuffle Write和Shuffle Read。

2.ShuffleMapTask的整個的執行過程就是Shuffle Write階段

3.Sprk的Shuffle過程剛開始的操作就是將map的結果文件中的數據記錄送到對應的bucket裡面(緩衝區),分到哪一個bucket根據key來決定(該過程是hash的過程,每一個bucket都對應最終的reducer,也就是說在hash-based下,數據會自動劃分到對應reducer的bucket裡面)。之後,每個bucket裡面的數據會不斷被寫到本地磁碟上,形成一個ShuffleBlockFile,或者簡稱FileSegment。上述就是整個ShuffleMapTask過程。之後,reducer會去fetch屬於自己的FileSegment,進入shuffle read階段。

4.需要注意的是reducer進行數據的fetch操作是等到所有的ShuffleMapTask執行完才開始進行的,因為所有的ShuffleMapTask可能不在同一個stage裡面,而stage執行後提交是要在父stage執行提交之後才能進行的,所以fetch操作並不是FileSegment產生就執行的。

5.需要注意的是,剛fetch來的FileSegment存放在softBuffer緩衝區,Spark規定這個緩衝界限不能超過spark.reducer.maxMbInFlight,這裡用softBuffer表示,默認大小48MB。

6.經過reduce處理後的數據放在內存+磁碟上(採用相關策略進行spill)。

7.fetch一旦開始,就會邊fetch邊處理(reduce)。MapReduce shuffle階段就是邊fetch邊使用combine()進行處理,但是combine()處理的是部分數據。MapReduce不能做到邊fetch邊reduce處理,因為MapReduce為了讓進入reduce()的records有序,必須等到全部數據都shuffle-sort後再開始reduce()。然而,Spark不要求shuffle後的數據全局有序,因此沒必要等到全部數據shuffle完成後再處理。為了實現邊shuffle邊處理,而且流入的records是無序的可以用aggregate的數據結構,比如HashMap。

hash-based 和 sort-based的對比

hash-based故名思義也就是在Shuffle的過程中寫數據時不做排序操作,只是將數據根據Hash的結果,將各個Reduce分區的數據寫到各自的磁碟文件中。這樣帶來的問題就是如果Reduce分區的數量比較大的話,將會產生大量的磁碟文件(Map*Reduce)。如果文件數量特別巨大,對文件讀寫的性能會帶來比較大的影響,此外由於同時打開的文件句柄數量眾多,序列化,以及壓縮等操作需要分配的臨時內存空間也可能會迅速膨脹到無法接受的地步,對內存的使用和GC帶來很大的壓力,在Executor內存比較小的情況下尤為突出,例如Spark on Yarn模式。但是這種方式也是有改善的方法的:

在一個core上連續執行的ShuffleMapTasks可以共用一個輸出文件ShuffleFile。先執行完的ShuffleMapTask形成ShuffleBlock i,後執行的ShuffleMapTask可以將輸出數據直接追加到ShuffleBlock i後面,形成ShuffleBlock i』,每個ShuffleBlock被稱為FileSegment。下一個stage的reducer只需要fetch整個ShuffleFile就行了。這樣的話,整個shuffle文件的數目就變為C*R了。

sort-based是Spark1.1版本之後實現的一個試驗性(也就是一些功能和介面還在開發演變中)的ShuffleManager,它在寫入分區數據的時候,首先會根據實際情況對數據採用不同的方式進行排序操作,底線是至少按照Reduce分區Partition進行排序,這樣來至於同一個Map任務Shuffle到不同的Reduce分區中去的所有數據都可以寫入到同一個外部磁碟文件中去,用簡單的Offset標誌不同Reduce分區的數據在這個文件中的偏移量。這樣一個Map任務就只需要生成一個shuffle文件,從而避免了上述HashShuffleManager可能遇到的文件數量巨大的問題。上述過程與mapreduce的過程類似。

兩者的性能比較,取決於內存,排序,文件操作等因素的綜合影響。

對於不需要進行排序的Shuffle操作來說,如repartition等,如果文件數量不是特別巨大,HashShuffleManager面臨的內存問題不大,而SortShuffleManager需要額外的根據Partition進行排序,顯然HashShuffleManager的效率會更高。

而對於本來就需要在Map端進行排序的Shuffle操作來說,如ReduceByKey等,使用HashShuffleManager雖然在寫數據時不排序,但在其它的步驟中仍然需要排序,而SortShuffleManager則可以將寫數據和排序兩個工作合併在一起執行,因此即使不考慮HashShuffleManager的內存使用問題,SortShuffleManager依舊可能更快。


推薦閱讀:

研一學生~突然搞起了Big Data。在學Hadoop中,突然意識到只會用工具是遠遠不夠的,很想搞懂立面的演算法和思想。請教達人們指點分散式存儲與運算的主要思想與演算法?
Hadoop和Hadoop2有很大的區別么?
如何入門分散式計算?
雙十一時,阿里的技術人員在後台做了些什麼?

TAG:分散式計算 | Hadoop | Spark |