MapReduce過程中,如果Map之後每個Key對應Value的數量不平衡會不會影響效率?
貌似在Reduce過程,將會收到所有以某個相同值為Key的List。這個Reduce是否會被Hadoop拆分到多個TaskTracker上執行嗎?還是一個Key只能對應在一個TaskTracker上執行?
舉個例子來說,例如詞頻統計,Map過程一般是產生以單詞為key,以1為value的鍵值對。在Reduce過程,會受到某個詞的一個「包含一大堆1」的List,然後再對這個List進行sum操作。假如現在以某個單詞為Key的List中包含了1T個「1」,那這個操作會在單機上進行嘛(這樣豈不是要在單機上做1T個數據記數)?還是需要手動優化演算法(例如讓相同Key的盡量少,每個TaskTracker分配的任務都很小)?(剛接觸Hadoop,所以內部工作機制還沒來得及研究。
這個問題,就是經典的數據傾斜(data-skew)問題,在Hadoop上,數據傾斜問題一般由用戶自己解決。
例如,詞頻統計的一種實現是,map端輸出(word, 空串) 給reduce端,reduce端統計每個word下數據條數。
則如果用戶傳入的是上述邏輯,如果你將一個key下的reduce拆開分散式的執行,則每個拆開的並發上得到的count不能再使用同樣的reduce邏輯進行最終聚合,而必須將最終聚合的reduce邏輯換成求輸入的所有數據的和。
則可以看出如果用戶只傳入了前述map端與reduce端的邏輯,框架是不可能將reduce一個key下的數據拆開再合併成最終結果的。
但為了用戶可以方便的解決這種場景下的數據傾斜問題,Hadoop提供了Combiner,即,用戶可以設置一個預聚合的邏輯,map的產出先在每個Mapper本地交給預聚合的Combiner運算元完成同key預聚合,然後再將Combiner的產出交給Reducer去處理。例如,前述詞頻統計的例子,就可以使用Combiner在Map端預統計本part上每個key出現的次數,再在Reduce端完成最終的求和。
而數據傾斜問題情況很多,Combiner並不能解決所有的問題。
例如,現在有一份日誌文件,每行是一個訪問日誌,表示某用戶訪問了某個網站,現要以此統計多個網站每個網站的uv(即不同的用戶數)。
那麼,一種較優的執行策略就是我們可以在每個map端在每個website下對user去重,然後把去重後的user發往reduce端,並設置partitioner確保同website的數據由同一個reducer處理(由於可以按website + user排序,實際上Reduce端可以做到O(1)的空間複雜度,這裡暫不詳述方法)。
上述方法就是一個比較高效的作法了,但是它在website下數據嚴重不均時,依然存在問題,即假設在某個website下,每個map端去重後都有較多user,那麼可能該reducer依然需要花費大量時間進行去重,這種場景下,解決數據不均的方案就是將一輪MR拆成兩輪,第一輪按照website + user進行partition,僅僅完成去重,完成後再進行一輪MR去做按website統計user個數。
需要注意的是,一般場景下,如果沒有數據傾斜,使用一輪MR要比使用兩輪MR更好一些,因為畢竟少了一輪Shuffle。
總結一下這個方案就是重設partition的key,告訴MR框架按照更細的粒度劃分reduce,進行預聚合,隨後再按原來的目標劃分方式重新聚合。
從介面層面來看,在現有的任何開源上的系統中上述優化都是無法自動完成的,需要用戶自己分析自己的業務邏輯來手工完成優化。究其本源,我認為是因為現有的分散式系統里partition方式都是確定的、用戶需要感知的,故不能由框架自由依據情況裁定。
打一下廣告再,我們目前在百度內部做的一套系統,可以解決這種場景的自動優化,因為我們可以在計算過程中完全屏蔽掉partition的概念。目前已完成一篇論文,正在投遞狀態。預計今年Q3可能會開源出來。敬請期待。
另還有一個方案是有時候可以將特殊的數據數據量最大的key過濾出來單獨處理,這裡不再詳述。
另外,前述的都是在進行聚合時的解決方案,在join操作時也經常遇到數據傾斜,一般有三種策略,一是改map join,即如果存在可在內存中載入的小表,則將小表在map端內存里載入,join操作完成在map端搞定;二是將特殊數據單獨處理的方案這裡也適用;三是將一個表加隨機key,另一個表broadcast N份進行join。(注意上述優化可能並不適用於full join的場景)
暫時先寫這麼多吧,有空的話(大概率是坑)再整理吧。
謝邀
剛剛特意翻了一下書
確認你第一個問題答案是不會再拆解的再分配給Reducer之前
作一個combiner 這個操作是在mapper之後做的 就是要達到部分聚合 減少網路中傳輸的數據量舉例子 假如有3個mapper a b c 單詞"hi"在原文本中總出現了30w次
經過mapper + combiner後 各台mapper輸出
("hi", 15w)("hi", 5w)("hi", 10w)最後聚在某reducer r ("hi",30w)
還有 在我的回答中有(近期有推薦hadoop書單 可以看下當然會影響效率,試想一下,如果極端情況,假設shuffle過程產生了100個Key, 假設某個Key匯聚了10萬個值,而其它Key都匯聚了一個值。宏觀上看,幾乎所有數據不就都匯聚到一個節點,集群豈不是變成了單機。Spark本身提供了兩種Key機制,Hash 和Range,另外還支持用戶自定義的Key類型。 針對不Key平衡的情況,可以考慮通過Range,把多個稀疏的Key值劃在一個Key的範圍內,這樣可以保持Key的相對平衡。
這個是會影響效率的,典型的Data Skew,已經有好多這方面的研究工作,一般的思想就是調整中間數據的劃分策略。
是的,相同的key是會發到同一個reduce處理。如果相同的key對應的value量級太大確實會導致這個reduce很慢。如果其他的reduce要處理的數據相對較少,就會出現數據傾斜。 數據傾斜一般是指某一個partitioner的處理的數據量遠大於其他的partitioner,這個在分散式系統中很常見。一般有兩種原因:(1)partition演算法不夠隨機,這種情況比較少見,換相對隨機的演算法就可以。(2)某個key對應的數據非常多,而這個key對應的數據又必須發送到同一個partitioner進行處理。 對於第二種數據傾斜沒有一個統一的處理方式,要看具體問題。一般常見的方式可以將key加一個隨機擾動量,使得量大的這個key均勻分到不同的reduce中去處理。再起一輪mapreduce處理這次的結果,將key的擾動量去掉,去統計結果。因為第一步reduce處理將key對應的結果大大減少了(題主這個case每個reduce一個key只輸出一條記錄),所以到第二輪mapreduce這個key對應的數據就很少了。 另外,如果一個map輸出的相同的key記錄很多,可以加combiner進行map端的reduce,減少數據量。對於題主的這個case的數據傾斜應該也是有效的,前提是你這個量很大的key在所有的map中也相對均勻的。不然也會導致少量map運行特別久。
這是常見的數據傾斜。解決該問題可以在mapper和reducer之間加個combiner。
肯定會啊,這個就是所謂的「數據傾斜」問題
目測題主的情況是剛開始學習分散式計算,我就用人話講講:
分散式計算可以理解成一個工作太大了,一個人幹不了(一台機器處理不過來),所以找了好多人一起干,每個人干一些。然後就涉及工作的分配了,比如題主這裡提到的詞頻統計,需要統計幾個文件里的每個詞都一共出現了多少次。可能需要統計的詞太多了,放在好多個文件里,一個人處理不來這麼多的文件,也處理不了這麼多的詞。於是就找2批人,一批(稱為map)就負責讀文件,讀到一個詞後就告訴下一批人里負責這個詞的人,說這個詞又有一次。最後前面這批人都干好了後,第二批人(稱為reduce)就統計前面那批人告訴過來的結果,一次次的加,最後得到結果。對於第一批人,他們每個人只讀了一部分的文件,而第二批人,每個人只負責幾個詞。
分散式計算不患寡而患不均,要是有個人,比如說拿到的詞是很常見的詞,比如「的」這個詞,出現的次數太多了,那整個任務其他人跑好了後,就他一直干不完活,作業也不能算結束,大家都得一起等著。這個就叫長尾或數據傾斜。(覺得這裡說的太直白的可以深扒這兩個詞)
解法的話,現在比較常見的有2種,一種是map端combiner,也就是第一批人你別告訴我「的」有一個,然後又告訴我「的」有一個,你直接告訴我「的」有兩個。那這次的統計工作,其實就有很大一批被分配到前面一批人的身上了。雖然活一定要乾的,只是現在長尾工作分配到第一批人身上,而他們每個人讀不同的文件,一般情況下遇到「的」這個詞的概率差不多,那就不會出現多個人等一個的情況。另外這樣還能減少中間的網路傳輸,以前要傳N次,現在只需要傳一次匯總後的結果了。不過這個也不是萬能的,比如要求平均值,不能先每個map先算一次平均值,最後再重新拿這些平均值再平均一次,而是每個map的combiner里要先算一次count_combiner和sum_combiner,最後reduce里算avg = sum(count_combiner)/sum(sum_combiner)。而有一些計算可能根本就沒辦法。另外還需要提到一種解法,就是先知道哪個詞工作量特別多(本例「的」這個詞),然後為這個專門的詞負責多個工作人員。最後這幾個工作人員再把他們的結果匯總一下,對應到hive里是hive.groupby.skewindata。
著作權歸作者所有。
商業轉載請聯繫作者獲得授權,非商業轉載請註明出處。
作者:greahuang
鏈接:hadoop 中map、reduce數量對mapreduce執行速度的影響
來源:CSDN博客
增加task的數量,一方面增加了系統的開銷,另一方面增加了負載平衡和減小了任務失敗的代價;
map task的數量即mapred.map.tasks的參數值,用戶不能直接設置這個參數。Input Split的大小,決定了一個Job擁有多少個map。默認input split的大小是64M(與dfs.block.size的默認值相同)。然而,如果輸入的數據量巨大,那麼默認的64M的block會有幾萬甚至幾十萬的Map Task,集群的網路傳輸會很大,最嚴重的是給Job Tracker的調度、隊列、內存都會帶來很大壓力。mapred.min.split.size這個配置項決定了每個 Input Split的最小值,用戶可以修改這個參數,從而改變map task的數量。一個恰當的map並行度是大約每個節點10-100個map,且最好每個map的執行時間至少一分鐘。 reduce task的數量由mapred.reduce.tasks這個參數設定,默認值是1。合適的reduce task數量是0.95或者0.75*( nodes * mapred.tasktracker.reduce.tasks.maximum), 其中,mapred.tasktracker.tasks.reduce.maximum的數量一般設置為各節點cpu core數量,即能同時計算的slot數量。對於0.95,當map結束時,所有的reduce能夠立即啟動;對於0.75,較快的節點結束第一輪reduce後,可以開始第二輪的reduce任務,從而提高負載均衡。測試總結:一 數據2億的數據量整個測試過程中輸入數據保持不變,輸出邏輯數據保持不變二 測試數據第一組數據:1.map=128,reduce=20Elapsed: 5hrs, 18mins, 56secDiagnostics: Average Map Time 1hrs, 43mins, 29secAverage Reduce Time 2hrs, 36mins, 2secAverage Shuffle Time 1hrs, 14mins, 12secAverage Merge Time 0sec 2.map=256,reduce=20Elapsed: 4hrs, 23mins, 59secDiagnostics: Average Map Time 56mins, 23secAverage Reduce Time 2hrs, 40mins, 58secAverage Shuffle Time 1hrs, 5mins, 56secAverage Merge Time 2sec3.map=512,reduce=20 Elapsed: 4hrs, 13mins, 31sec
Diagnostics: Average Map Time 28mins, 16secAverage Reduce Time 2hrs, 37mins, 19secAverage Shuffle Time 56mins, 45secAverage Merge Time 1mins, 39sec由上面的數據來看,在reduce數量不變前提下map執行時間會成倍數下降,但是shuffle和redue執行時間變化很小,導致,總體執行時間越到最後執行時間下降越緩慢
當遇到skew 問題的時候,我知道的辦法是重寫partition。舉個例子吧,比如你要爬某網站,但是domain是相同的,你就不能用domain來作為partition,這樣幾乎所有的網站都會hashing 以及skew到一個map裡面去,一個intuitive 的方法就是 partition by URL,這樣就可以解決skew。 這個例子可能不太恰當,但是是這個意思
至於combiner,雖然會減少數據的傳輸,但是其對於內存的消耗還是挺大的,combiner是在map階段收尾的時候做,就好像是提前reduce了一下,這兩種方法具體情況具體應用~我記得在網上看到過類似解決方案。出現這種情況的話,可以在相同的key上做個二級標記,然後再選用合適的partion策略。比如單詞 we出現10w次,而其他單詞只出現幾次,那可以對we進行人為二級劃分,比如we-1,we-2.....we99,we100。這樣雖然咱們知道key是一樣的,但是機器卻認為key不一樣。最後再增加個reduce進行規約。
推薦閱讀:
※Spark 為什麼 不允許 RDD 嵌套(如 RDD[RDD[T]])?
※怎樣理解spark中的partition和block的關係?
※在Spark集群中,集群的節點個數、RDD分區個數、?cpu內核個數三者與並行度的關係??
※1.5版本的Spark自己來管理內存而不是使用JVM,這不使用JVM而是自己來管理內存是咋回事?
※還有必要學習Hadoop 么?