MapReduce Shuffle深入理解
05-13
在上一篇文章中我們已經介紹了 MapReduce 運行的整個流程,現在接著深入工作原理以及 Shuffle 過程細節來剖析:
1. MapReduce工作原理- Client 啟動一個作業 Job 並向 JobTracker 提交一個 JobId,運行作業所需要的資源文件比如 jar 包,Client 計算後輸入的分片信息等拷貝到 HDFS 上。分片信息決定了 map 任務的數量,並由 JobTracker 分配計算。
- JobTracker 接收到作業任務後,將作業放置在作業隊列中,作業調度器根據自己的調度演算法調度到某個作業時會跟根據分片信息,為每個分片數據創建一個 map 任務,並將 map 任務分配給 TaskTracker 執行。需要注意的是 map 任務並不是隨意分配給 TaskTracker 的,具體原因詳見上一篇文章介紹的數據本地化原理。每個節點上可以運行 map 的數量是有限的,TaskTracker 根據主機核的數量和內存的大小有固定數量的 map 槽和 reduce 槽。
- TaskTracker 會定時的向 JobTracker 發送心跳,若一定時間沒有收到心跳,JobTracker 就默認這個 TaskTracker 節點掛了要做資源回收,並把這個節點上的 task 重新分配到其他節點上。JobTracker 會監聽 task 的執行情況,執行完成,JobTracker 會更新 Job 的狀態為完成,若是一定數量的 task 執行失敗,這個 Job 就會被標記為失敗,JobTracker 將發送 Job 的運行狀態信息給 Client 端。
- 環形內存緩存區:每個split數據交由一個map任務處理,map的處理結果不會直接寫到硬碟上,會先輸送到環形內存緩存區中,默認的大小是100M(可通過配置修改),當緩衝區的內容達到80%後會開始溢出,此時緩存區的溢出內容會被寫到磁碟上,形成一個個spill file,注意這個文件沒有固定大小。
- 在內存中經過分區、排序後溢出到磁碟:分區主要功能是用來指定 map 的輸出結果交給哪個 reduce 任務處理,默認是通過 map 輸出結果的 key 值取hashcode 對代碼中配置的 redue task數量取模運算,值一樣的分到一個區,也就是一個 reduce 任務對應一個分區的數據。這樣做的好處就是可以避免有的 reduce 任務分配到大量的數據,而有的 reduce 任務只分配到少量甚至沒有數據,平均 reduce 的處理能力。並且在每一個分區(partition)中,都會有一個 sort by key 排序,如果此時設置了 Combiner,將排序後的結果進行 Combine 操作,相當於 map 階段的本地 reduce,這樣做的目的是讓儘可能少的數據寫入到磁碟。
- 合併溢出文件:隨著 map 任務的執行,不斷溢出文件,直到輸出最後一個記錄,可能會產生大量的溢出文件,這時需要對這些大量的溢出文件進行合併,在合併文件的過程中會不斷的進行排序跟 Combine 操作,這樣做有兩個好處:減少每次寫入磁碟的數據量&減少下一步 reduce 階段網路傳輸的數據量。最後合併成了一個分區且排序的大文件,此時可以再進行配置壓縮處理,可以減少不同節點間的網路傳輸量。合併完成後著手將數據拷貝給相對應的reduce 處理,那麼要怎麼找到分區數據對應的那個 reduce 任務呢?簡單來說就是 JobTracker 中保存了整個集群中的宏觀信息,只要 reduce 任務向 JobTracker 獲取對應的 map 輸出位置就可以了。具體請參考上方的MapReduce工作原理。
Reduce端流程:
reduce 會接收到不同 map 任務傳來的有序數據,如果 reduce 接收到的數據較小,則會存在內存緩衝區中,直到數據量達到該緩存區的一定比例時對數據進行合併後溢寫到磁碟上。隨著溢寫的文件越來越多,後台的線程會將他們合併成一個更大的有序的文件,可以為後面合併節省時間。這其實跟 map端的操作一樣,都是反覆的進行排序、合併,這也是 Hadoop 的靈魂所在,但是如果在 map 已經壓縮過,在合併排序之前要先進行解壓縮。合併的過程會產生很多中間文件,但是最後一個合併的結果是不需要寫到磁碟上,而是可以直接輸入到 reduce 函數中計算,每個 reduce 對應一個輸出結果文件。補充說明-----------------------------------------------------------------------------Map數量:對於大文件:由任務的 split 切片決定的,一個 split 切片對應一個map任務。先明確一點 split 切片的大小可自己配置的,一般來說對於大文件會選擇split == block,如果split < block的情況下會增加 map 的數量,雖然這樣可以增加map執行的並行度,但是會導致map任務在不同節點拉取數據,浪費了網路資源等。ps:HDFS 中 block 是最小的存儲單元,默認128M。對於小文件:由參與任務的文件數量決定,默認情況一個小文件啟動一個 map 任務,小文件數量較多會導致啟動較大數量的 map 任務,增加資源消耗。此時建議將多個小文件通過 InputFormat 合併成一個大文件加入到一個 split 中,並增加 split 的大小,這樣可以有效減少 map 的數量,降低並發度,減少資源消耗。Reduce數量:由分區(partiton)的數量決定的,我們可以在代碼中配置 job.setNumReduceTasks(*)來控制 reduce 的任務數量。留個問題給讀者,由上面的彩圖可以分析出一個 MapReduce 計算過程共有幾個時機涉及到 I/O 磁碟讀寫?分別是在哪幾個時機?答對有獎,歡迎留言。如果覺得文章不錯,歡迎轉發點贊,另外有錯誤歡迎留言指出,謝謝。歡迎關注我的公眾號:Jianpan更多好文,敬請期待!推薦閱讀:
※Zookeeper在哪些系統中使用,又是怎麼用的?
※Apache Kylin v2.2.0正式發布
※Hadoop的了解
※開發人員學Linux(14):CentOS7安裝配置大數據平台Hadoop2.9.0
※從頭學習大數據培訓課程 數據倉儲工具 hive(五)hive 的 grouping sets、排序、窗口函數用法