分散式計算框架MapReduce

MapReduce概述

1、源自google的MapReduce論文,論文發表於2004.12

2、Hadoop MapReduce是google MapReduce的克隆版

3、MapReduce優點:海量數據離線處理&易開發&易運行(易開發和易運行只是相對而言)

4、MapReduce缺點:實時流式計算

實時:mapreduce的作業都是通過進程方式啟動,必然速度會慢很多,

不可能實時的把數據處理完,無法像MySQL一樣,在毫秒級或者秒級內返回結果

流式:MapReduce的輸入數據集是靜態的,不能動態變化;

MapReduce自身的設計特點決定了數據源必須是靜態的。

MapReduce編程模型之通過wordcount詞頻統計分析案例初步認識 MapReduce

上圖就是詞頻分析的流程圖。

① 把一個文件進行拆分,如上圖,把文件拆分為三段。

② 然後我們有三個作業對應三段文件並行進行處理。

③ 接著就到了如圖mapping,那什麼是mapping?就是把每一段文本再按照單詞進行拆分。

④ 然後就到了至關重要的一步,shuffling,這是影響分散式計算框架性能的一個瓶頸,

因為我們假設數據在不同機器上運行,我們進行mapping之後,要把相同數據分到同

一個節點上去合併,這樣才能統計出來最終結果。

⑤ 我們在shuffling這一步做完之後要做的就是reduce了,reduce乾的事情就是把我們

shuffling之後的相同單詞的個數加起來。

總結:這整個過程其實就是一個分而治之的思想!我們先把原始數據比如那一小段文本

給他拆開(spliting),拆開之後分開處理到一定程度(mapping&shuffling),

然後再合併到一起做最終計算(reduce)!最終輸出到hdfs中去,這樣子就得到

final result這個結果

MapReduce編程模型之執行步驟------簡述

準備map處理的輸入數據並對其進行splitting

對於輸入的數據,會被轉換成一堆的key,value,那麼這裡的key,value

到底指的是什麼呢?以上圖來看,輸入數據為三段文字,key就是指偏移量,

讀取第一行,k1=0,v1=第一行內容,讀取第二行,k1=第一行內容長度。

v1=第二行內容

mapper處理

這裡同樣也是key,value,經過之前的數據準備,到map這一步以上一步

splitting的數量為基準以相互獨立的並行方式執行,把上一步的value拆分為以單詞為

k2,出現次數為v2的形式。

shuffle處理

這裡同樣在上一步基礎之上,首先把相同數據放到同一個節點上去合併,比如上圖,

以單詞名為k2,以這個單詞每次出現的計數1為v2,比如Car出現3次,那就是k2=3,

v2=(1,1,1)

reduce處理

將shuffling之後得到的k2和v2進行最終處理,把v2中的所有計數相加得到單詞出現的總數

即k2=Car,v2=3

結果輸出

MapReduce編程模型之執行步驟------詳解

假設有兩個Node

---------------------------Map階段-------------------------

①對數據進行InputFormat-->split(如圖),那麼什麼是InputFormat-->split

InputFormat:這是一個介面包含了很多執行程序,我們以InputFormat裡面的

FileInputFormat為例,這是讀取文件基本的類,這仍然是個抽象類,繼續選擇

FileInputFormat中的TextInputFormat,這是用來讀文本的類,這裡面有幾個關鍵的方法

我們需要了解一下,一個是getsplits方法,用途是把我們的輸入文件得到很多個split,

每一個split交由相對應的一個MapTask來處理,getsplits方法的返回值是一個InputSplit[ ]

數組,也就是說一個文件可能會被拆分成好多個InputSplit。

另一個是getRecordReader方法,從這個名字我們看的出來,他是一個記錄的讀取者,

它就是把上面getsplits方法得到的InputSplit[ ]數組中每一個數據給讀進來,那麼我們以後就

知道每一個行是什麼數據了。所以這一步主要就是藉助InputFormat中的TextInputFormat

這個類來將文件進行拆分。

② 對split之後的數據進行RR(RecordReader)

這裡就是接著上一步,通過InputFormat中的TextInputFormat這個類得到InputSplit數組

然後由RecordReadergetRecordReader方法來讀取InputSplit數組中的每一個數據,

並且每讀取一份數據交由一個map來處理(並行處理)。

③由getRecordReader 這個方法讀取的數據後交給map執行關鍵步驟之一MapTask

實際包含了輸入(input)過程、切分(partition)過程、溢寫spill過程(sort和combine過程)、

merge過程。

  • 對每一個鍵值對進行map()
  • map的輸出保存在內存緩衝區,當緩衝區滿80%(一般80%),啟動溢寫,將緩衝的數據寫出到磁碟。
  • 在溢寫的結尾,合併所有的輸出,並且打包他們以便進行reduce處理。

No.1 map

在mapper中,用戶定義的map代碼通過處理record reader解析的每個key/value來產生0

個或多個新的key/value結果。key/value的選擇對MapReduce作業的完成效率來說非常重

要。key是數據在reducer中處理時被分組的依據,value是reducer需要分析的數據。

No.2 combine

1)combine簡述

combiner階段是程序員可以選擇的,combiner其實也是一種reduce操作,因此我們看見

WordCount類里是用reduce進行載入的。Combiner是一個本地化的reduce操作,它是

map運算的後續操作,主要是在map計算出中間文件前做一個簡單的合併重複key值的操

作,例如我們對文件里的單詞頻率做統計,map計算時候如果碰到一個hadoop的單詞就

會記錄為1,但是這篇文章里hadoop可能會出現n多次,那麼map輸出文件冗餘就會很

多,因此在reduce計算前對相同的key做一個合併操作,那麼文件會變小,這樣就提高了

寬頻的傳輸效率,畢竟hadoop計算力寬頻資源往往是計算的瓶頸也是最為寶貴的資源,

Combiner會優化MapReduce的中間結果,所以它在整個模型中會多次使用。那哪些場景

才能使用Combiner呢?Combiner的輸出是Reducer的輸入,Combiner絕不能改變最終的

計算結果。所以combiner操作是有風險的,使用它的原則是combiner的輸入不會影響到

reduce計算的最終輸入,例如:如果計算只是求總數,最大值,最小值可以使用

combiner,但是做平均值計算使用combiner的話,最終的reduce計算結果就會出錯。

2)combine具體操作

combine分為map端的combine和reduce端的combine,combine將有相同key的key/value

對的value加起來,減少溢寫到磁碟的數據量,combine函數把map函數產生的多個

key/value合併成一 個新的key2/value2,將新的key2/value2作為輸入值給reduce函數,

這個value2有多個。實際上combine操作作用就相當於reduce,把mapper端的key,[value,

…]先處理,變成<key, [value1]>(如<a, [1, 1, 1]> => <a, 3>), 這樣能有效減少最後寫入磁

盤文件的大小,網路需要傳輸的大小更少,因此更快。

具體實現是由Combine類。 實現combine函數,該類的主要功能是合併相同的key,通過

job.setCombinerClass()方法設置,默認為null,不合併中間結果。

map端的combine和MapReduce中的reduce的區別:

在mapreduce中,map多,reduce少。 在reduce中由於數據量比較多,所以先把map

裡面的數據合併歸類,這樣到了reduce的時候就減輕了壓力。

舉個例子:

map理解為銷售人員,reduce理解為銷售經理。

每個人(map)只管銷售,賺了多少錢銷售人員不統計,也就是說這個銷售人員沒有

Combine,那麼這個銷售經理就累垮了,因為每個人都沒有統計,它需要統計所有人

員賣了多少件,賺錢了多少錢。這樣是不行的,所以銷售經理(reduce)為了減輕壓

力,每個人(map)都必須統計自己賣了多少錢,賺了多少錢(Combine),然後經

理所做的事情就是統計每個人統計之後的結果。這樣經理就輕鬆多了。所以Combine

在map所做的事情,減輕了reduce的事情。這就是為什麼說map的Combine乾的是

reduce的事情,有點類似上下級的關係。

No.3 partitioner

partition是分割map每個節點的結果,按照key分別映射給不同的reduce,也是可以自定

義的。這裡其實可以理解歸類。

我們對於錯綜複雜的數據歸類。比如在動物園裡有牛羊雞鴨鵝,他們都是混在一起的,

但是到了晚上他們就各自牛回牛棚,羊回羊圈,雞回雞窩。partition的作用就是把這些

數據歸類。只不過在寫程序的時候,mapreduce使用哈希HashPartitioner幫我們歸類

了。這個我們也可以自定義。

經過partitioner處理後,每個key-value對都得到分配到的reduecer信息,然後把記錄先

寫入內存(In-memory buffer)。

內存分配圖如下:

內存會劃分一個個的partition,每個partition都交個獨自一個reduecer處理(總的

partition數目=reducer數量)。

No.3 sort & combiner

1、在partitioner處理時當寫入內存的數據越來越多時當buffer達到一定閥值(默認

80M),就開始執行spill步驟,即分成小文件寫入磁碟。在寫之前,先對memory中每個

partition進行排序(in-memory sort)。如果數據量大的話,這個步驟會產生很多

個spilled文件,如果我們定義了combine,那麼在排序之前還會進行combine,最後一個步

驟就是merge,把溢寫(spill)步驟產生的所有spilled files,merge成一個大的已排序文

件。merge是相同的partition之間進行。

MERGE:

Merge是怎樣的?如「aaa」從某個map task讀取過來時值是5,從另外一個map 讀取值是

8,因為它們有相同的key,所以得merge成group。什麼是group。對於「aaa」就是像這樣

的:{「aaa」, [5, 8, 2, …]},數組中的值就是從不同溢寫文件中讀取出來的,然後再把這些

值加起來。請注意,因為merge是將多個溢寫文件合併到一個文件,所以可能也有相同的

key存在,在這個過程中如果client設置過Combiner,也會使用Combiner來合併相同的

key。

---------------------------Reduce階段-------------------------

在 reduce task 之前,不斷拉取當前 job 里每個 maptask 的最終結果,然後對從不同地方拉取

過來的數據不斷地做 merge ,也最終形成一個文件作為 reduce task 的輸入文件。

總而言之,reduce的運行可以分成copy、merge、reduce三個階段,下面將具體說明這3個階

段的詳細執行流程。

copy

由於job的每一個map都會根據reduce(n)數將數據分成map 輸出結果分成n個partition,所以map的中間結果中是有可能包含每一個reduce需要處理的部分數據的。所以,為了優化reduce的執行時間,hadoop中是等job的第一個map結束後,所有的reduce就開始嘗試從完成的map中下載該reduce對應的partition部分數據,因此map和reduce是交叉進行的,如下圖所示:

educe進程啟動數據copy線程(Fetcher),通過HTTP方式請求map task所在的TaskTracker獲取map task的輸出文件。由於map通常有許多個,所以對一個reduce來說,下載也可以是並行的從多個map下載,這個並行度是可以通過mapred.reduce.parallel.copies(default 5)調整。默認情況下,每個只會有5個並行的下載線程在從map下數據,如果一個時間段內job完成的map有100個或者更多,那麼reduce也最多只能同時下載5個map的數據,所以這個參數比較適合map很多並且完成的比較快的job的情況下調大,有利於reduce更快的獲取屬於自己部分的數據。

reduce的每一個下載線程在下載某個map數據的時候,有可能因為那個map中間結果所在機器發生錯誤,或者中間結果的文件丟失,或者網路瞬斷等等情況,這樣reduce的下載就有可能失敗,所以reduce的下載線程並不會無休止的等待下去,當一定時間後下載仍然失敗,那麼下載線程就會放棄這次下載,並在隨後嘗試從另外的地方下載(因為這段時間map可能重跑)。reduce下載線程的這個最大的下載時間段是可以通過mapred.reduce.copy.backoff(default 300秒)調整的。如果集群環境的網路本身是瓶頸,那麼用戶可以通過調大這個參數來避免reduce下載線程被誤判為失敗的情況。不過在網路環境比較好的情況下,沒有必要調整。通常來說專業的集群網路不應該有太大問題,所以這個參數需要調整的情況不多。

merge

這裡的merge如map端的merge動作類似,只是數組中存放的是不同map端copy來的數值。Copy過來的數據會先放入內存緩衝區中,然後當使用內存達到一定量的時候才刷入磁碟。這裡需要強調的是,merge有三種形式:1)內存到內存 2)內存到磁碟 3)磁碟到磁碟。內存到內存的merge一般不適用,主要是內存到磁碟和磁碟到磁碟的merge。

這裡的緩衝區大小要比map端的更為靈活,它基於JVM的heap size設置。這個內存大小的控制就不像map一樣可以通過io.sort.mb來設定了,而是通過另外一個參數 mapred.job.shuffle.input.buffer.percent(default 0.7) 來設置, 這個參數其實是一個百分比,意思是說,shuffile在reduce內存中的數據最多使用內存量為:0.7 × maxHeap of reduce task。

也就是說,如果該reduce task的最大heap使用量(通常通過mapred.child.java.opts來設置,比如設置為-Xmx1024m)的一定比例用來緩存數據。默認情況下,reduce會使用其heapsize的70%來在內存中緩存數據。假設 mapred.job.shuffle.input.buffer.percent為0.7,reduce task的max heapsize為1G,那麼用來做下載數據緩存的內存就為大概700MB左右。這700M的內存,跟map端一樣,也不是要等到全部寫滿才會往磁碟刷的,而是當這700M中被使用到了一定的限度(通常是一個百分比),就會開始往磁碟刷(刷磁碟前會先做sort)。這個限度閾值也是可以通過參數 mapred.job.shuffle.merge.percent(default 0.66)來設定。與map 端類似,這也是溢寫的過程,這個過程中如果你設置有Combiner,也是會啟用的,然後在磁碟中生成了眾多的溢寫文件。這種merge方式一直在運行,直到沒有map端的數據時才結束,然後啟動磁碟到磁碟的merge方式生成最終的那個文件。

reducer

當reduce將所有的map上對應自己partition的數據下載完成後,就會開始真正的reduce計算階段。當reduce task真正進入reduce函數的計算階段的時候,有一個參數也是可以調整reduce的計算行為。也就是mapred.job.reduce.input.buffer.percent(default 0.0)。由於reduce計算時肯定也是需要消耗內存的,而在讀取reduce需要的數據時,同樣是需要內存作為buffer,這個參數是控制,需要多少的內存百分比來作為reduce讀已經sort好的數據的buffer百分比。默認情況下為0,也就是說,默認情況下,reduce是全部從磁碟開始讀處理數據。如果這個參數大於0,那麼就會有一定量的數據被緩存在內存並輸送給reduce,當reduce計算邏輯消耗內存很小時,可以分一部分內存用來緩存數據,反正reduce的內存閑著也是閑著。

Reduce在這個階段,框架為已分組的輸入數據中的每個 <key, (list of values)>對調用一次 reduce(WritableComparable, Iterator, OutputCollector, Reporter)方法。 Reduce任務的輸出通常是通過調用 OutputCollector.collect(WritableComparable, Writable)寫入 文件系統的。Reducer的輸出是沒有排序的。

那麼一般需要多少個Reduce呢?

Reduce的數目建議是0.95或1.75乘以 ( * mapred.tasktracker.reduce.tasks.maximum)。 用0.95,所有reduce可以在maps一完成時就立刻啟動,開始傳輸map的輸出結果。用1.75,速度快的節點可以在完成第一輪reduce任務後,可以開始第二輪,這樣可以得到比較好的負載均衡的效果。

reduces的性能很大程度上受shuffle的性能所影響。應用配置的reduces數量是一個決定性的因素。太多或者太少的reduce都不利於發揮最佳性能: 太少的reduce會使得reduce運行的節點處於過度負載狀態,在極端情況下我們見過一個reduce要處理100g的數據。這對於失敗恢復有著非常致命的負面影響,因為失敗的reduce對作業的影響非常大。太多的reduce對shuffle過程有不利影響。在極端情況下會導致作業的輸出都是些小文件,這對NameNode不利,並且會影響接下來要處理這些小文件的mapreduce應用的性能。在大多數情況下,應用應該保證每個reduce處理1-2g數據,最多5-10g。

總結

做菜的例子

小紅:如何用蔬菜沙拉來解釋mapreduce?

小明:Map(映射): 把洋蔥、番茄、生菜等等食材切好,這是各自作用在這些物體上的一個Map操作。所以你給Map一個洋蔥,Map就會把洋蔥切好。 同樣的,你把番茄、生菜一一地拿給Map,你也會得到各種切好的塊。 所以,當你在切洋蔥這樣的蔬菜時,你執行就是一個Map操作。 Map操作適用於每一種蔬菜,它會相應地生產出一種或多種碎塊,在我們的例子中生產的是蔬菜塊。在Map操作中可能會出現有個洋蔥壞掉了的情況,你只要把壞洋蔥丟了就行了。所以,如果出現壞洋蔥了,Map操作就會過濾掉壞洋蔥而不會生產出任何的壞洋蔥塊。

小紅:那我們說說的reduce吧

小明:Reduce(化簡):在這一階段,你將各種蔬菜碎都放入盤子/鍋裡面加入你喜愛的一些佐料和沙拉拌一下,你就可以得到你喜愛的蔬菜沙拉了。這意味要做一盤蔬菜沙拉,你得切好所有的原料。因此,你要將map操作的蔬菜聚集在一起。

小紅:那分散式是什麼意思?

小明:假設你參加了一個比賽並且你的食譜贏得了最佳蔬菜沙拉獎。得獎之後,你的蔬菜沙拉大受歡迎,於是你想要開始出售自製品牌的蔬菜沙拉。假設你每天需要生產10000份,你會怎麼辦呢?

小紅:我會找一個能為我大量提供原料的供應商。

小明:是的..就是那樣的。那你能否獨自完成製作呢?也就是說,獨自將原料都切碎? 而且現在,我們還需要供應不同種類的蔬菜沙拉,像青菜沙拉、番茄沙拉等等。

小紅: 當然不能了,我會租下一間鋪子,我會僱傭更多的工人來切蔬菜。這樣我就可以更快地生產蔬菜沙拉了。

小明:沒錯,所以現在你就不得不分配工作了,你將需要幾個人一起切蔬菜。每個人都要處理滿滿一袋的蔬菜,而每一個人都相當於在執行一個簡單的Map操作。每一個人都將不斷的從袋子里拿出蔬菜來,並且每次只對一種蔬菜進行處理,也就是將它們切碎,直到袋子空了為止。

這樣,當所有的工人都切完以後,工作台(每個人工作的地方)上就有了洋蔥塊、番茄塊等等。

小紅:但是我怎麼會製造出不同種類的蔬菜沙拉呢?

我:現在你會看到MapReduce遺漏的階段—攪拌階段。MapReduce將所有輸出的蔬菜片都攪拌在了一起,這些蔬菜片都是在以key為基礎的 map操作下產生的。攪拌將自動完成,你可以假設key是一種原料的名字,就像洋蔥一樣。 所以全部的洋蔥keys都會攪拌在一起,並轉移到同一個盤子。這樣,你就能得到洋蔥沙拉了。同樣地,所有的番茄也會被轉移到標記著番茄的盤子里,並製造出番茄沙拉。

小紅:我終於明白mapreduce乾的活了!

統計圖書的例子

map

We want to count all the books in the library. You count up shelf #1, I count up shelf #2. That』s map. The more people we get, the faster it goes.

reduce

Now we get together and add our individual counts. That』s reduce.

結束


推薦閱讀:

YouTube 上有哪些自學編程的優質頻道
編程的一些小習慣
有哪些值得關注的技術博客(Linux篇)
學編程的一些核心建議

TAG:数据分析 | 大数据分析 | 编程 |