標籤:

5分鐘 Hadoop Shuffle 優化

上篇5分鐘深入 Hadoop 的文章中,我們介紹了如何優化輸入處理,讓 Hadoop 達到更高的性能;另一個有可能讓 Hadoop 性能實現質的飛越的過程是 Shuffle 階段:Shuffle 階段負責把 map output 傳遞到 reduce 階段,深入理解這個階段有助於我們回答面試中很多進階問題,從而 really impress the interviewer. 本文就來詳細討論 Hadoop 中的 Shuffle 優化。

Shuffle 是什麼?

討論如何優化 Shuffle 之前,我們先來介紹 Shuffle 階段具體發生了什麼,從而對優化有個深入的理解。

Shuffle 階段需要做哪些事情,完成哪些需求?

大家還記得 <5分鐘搞懂 Hadoop> 那篇文章中舉的例子嗎?

我們從這個例子的圖中可以看出,每個 map function 會輸出一組 key value pair, Shuffle 階段需要從所有 map host 上把相同的 key 的 key value pair 組合在一起,組合後傳給 reduce host, 作為輸入進入 reduce function 里。

那 Shuffle 階段是如何完成這些功能的呢?

首先介紹新 Component:partitioner

所有 map function 產生的 key 可能有成百上千,經過 Shuffle 組合 key 的工作之後,依然是相同數目,而負責 reduce 工作的 host 可能只有幾十個,幾百個,那 Hadoop 的分配 key value pair 的策略是什麼?

Partitioner component 負責計算哪些 key 應當被放到同一個 reduce 里。

舉個栗子

使用默認的 partitioner: HashPartitioner,它會把 key 放進一個 hash function 里,然後得到結果。如果兩個 key 的 hashed result 一樣,他們的 key value pairs 就被放到同一個 reduce function 里。我們也把分配到同一個 reduce function 里的 key value pairs 叫做一個 reduce partition.

我們看到 hash function 最終產生多少不同的 result, 這個 Hadoop job 就會有多少個 reduce partition/reduce function,這些 reduce function 最終被JobTracker 分配到負責 reduce 的 host 上,進行處理。

在 Map 端的運作

Map function 的運行方式就是從 RecordReader 那邊讀出一個 input key value pair, 處理,然後把處理結果(通常也是 key value pair 形式)寫到一個Hadoop maintained memory buffer 里,然後讀取下一個 input key value pair.

Hadoop maintained memory buffer 里的 key value pair 按 key 值排序,並且按照 reduce partition 分到不同 partition 里(這就是 partitioner 被調用的時候)。一旦 memory buffer 滿了,就會被 Hadoop 寫到 file 里,這個過程叫 spill, 寫出的 file 叫 spill file.

注意,這些 spill file 存在 map 所在 host 的 local disk 上,而不是我們之前介紹過的 HDFS.

隨著 Map 不斷運行,有可能有多個 spill file 被製造出來。當 Map 結束時,這些 spill file 會被 merge 起來——不是 merge 成一個 file,而是按 reduce partition 分成多個。

在 Reduce 端的運作:

由於 Map tasks 有可能在不同時間結束,所以 reduce tasks 沒必要等所有 map tasks 都結束才開始。事實上,每個 reduce task 有一些 threads 專門負責從 map host copy map output(默認是5個,可以通過 $mapred.reduce.parallel.copies 參數設置);考慮到網路的延遲問題,並行處理可以在一定程度上提高效率。

通過前面的學習,我們知道 Hadoop JobTracker 會根據輸入數據位置安排 map tasks,但 reduce tasks 是不知道這種安排的。那麼當 reduce task 需要從map host copy map output 時,它怎麼知道 map host 的位置呢(URL/IP)?

其實當 Map tasks 成功結束時,他們會通知負責的 tasktracker, 然後消息通過 jobtracker 的 heartbeat 傳給 jobtracker. 這樣,對於每一個 job, jobtracker 知道 map output 和 map tasks 的關聯。Reducer 內部有一個 thread 負責定期向 jobtracker 詢問 map output 的位置,直到 reducer 得到所有它需要處理的 map output 的位置。

Reducer 的另一個 thread 會把拷貝過來的 map output file merge 成更大的 file. 如果 map task 被 configure 成需要對 map output 進行壓縮,那 reduce 還要對 map 結果進行解壓縮。當一個 reduce task 所有的 map output 都被拷貝到一個它的 host上時,reduce 就要開始對他們排序了。

排序並不是一次把所有 file 都排序,而是分幾輪。每輪過後產生一個結果,然後再對結果排序。最後一輪就不用產生排序結果了,而是直接向 reduce 提供輸入。這時,用戶提供的 reduce function 就可以被調用了。輸入就是 map task 產生的 key value pairs.

Hadoop Benefit

我們需要注意的是,Shuffle 階段的功能是完完全全由 Hadoop framework 提供的,這裡邊沒有任何用戶的代碼(即使我們有可能需要根據具體 Hadoop job 的特點配置一下這個階段,但也非常方便)。

Shuffle 階段應該說是整個 MapReduce Job 里需要處理問題最複雜,需要提高 performance 最多的地方。想像一下,公司里的 cluster 中可能有上百個 reducer 從上千個 mapper 那邊高效率的拷貝處理數據,全靠 Hadoop 處理這個階段,讓我們可以輕輕鬆鬆就寫寫 map function 和 reduce function 就得到理想的效率,這是 Hadoop 廣受歡迎的重要原因之一。

優化

在我們了解了 Hadoop 如何處理 shuffle 之後,我們可以通過配置一些 Hadoop Job 的參數調整 Hadoop shuffle performance:

  • io.sort.mb 這個參數控制 map 端 memory buffer 大小,越大每次 map 要 spill 的 file 就越大,總 spill file 個數越少; 所以如果你的 Hadoop Job map output 很多,適當增加這個參數有助於 performance;

  • io.sort.spill.percent 這個參數控制 map memory buffer 到達多少比例時開始 spill. 我們知道 spill 是寫 IO 操作,所以需要時間。如果 percentage 太高,有可能當 spill 還沒有完成時,map output 已經把 memory buffer 填滿,這樣影響 performance;同樣,太低會造成太多 spill fie;

  • tasktracker.http.threads 控制 map 端有多少個 thread 負責向 reduce 傳送 map output. 本著並行計算的原則,可以適當調高;

  • mapred.reduce.parallel.copies 這個參數控制 reduce 端有多少個 thread 去 copy map output. 本著並行計算的原則,可以適當調高;

  • mapred.job.reduce.input.buffer.percent 控制 reduce host 上 JVM 中用於 merge map output 的比例。可以適當調高;

  • io.sort.factor控制 reduce 端同時被 sort 的文件的個數。我們說 reduce 端 file sor t分批進行,這個參數就是每批有多少個。如果內存大,可以適當增加,以減少 sort 批次。

預告

  • 通過之前兩篇文章,大家應該對如何調整 Hadoop Job 的 performance 有了一個更深的了解。下一篇,我們將繼續討論面試中不可迴避的 HDFS,也是Hadoop 系統的基石。歡迎大家關注。

推薦閱讀:

嫌棄Hadoop?可能是你的打開方式有問題
大數據那些事(8):HIVE之初期起
大數據那些事(28):卡夫卡們的故事
技術分享丨HDFS 入門
大數據那些事(12):Michael,Daniel和輪子

TAG:Hadoop | 大数据 |