標籤:

Map Reduce和流處理

Map Reduce和流處理

11 人贊了文章

歡迎大家前往騰訊雲+社區,獲取更多騰訊海量技術實踐乾貨哦~

本文由@從流域到海域翻譯,發表於騰訊雲+社區

map()和reduce()是在集群式設備上用來做大規模數據處理的方法,用戶定義一個特定的映射,函數將使用該映射對一系列鍵值對進行處理,直接產生出一系列鍵值對。

Map Reduce和流處理

Hadoop的Map / Reduce模型在並行處理大量數據方面非常出色。它提供了一個通用的分區機制(基於數據的關鍵)來分配不同機器上的聚合式工作負載。基本上, map / reduce的演算法設計都是關於如何在處理過程中的不同階段為記錄值選擇正確的key。

然而,「時間維度」與數據的其他維度屬性相比具有非常不同的特徵,特別是在涉及實時數據處理時。它對面向批處理的Map/Reduce模型提出了一系列不同的挑戰。

  1. 實時處理需要非常低的響應延遲,這意味著沒有太多的數據能夠在「時間」維度上進行處理。
  2. 從多個數據源收集到的數據可能沒有全部到達匯總點。
  3. 在Map/Reduce的標準模型中,reduce階段在map階段完成之前無法啟動。而且在下載到reducer之前,所有處理過程的中間數據都保存在磁碟中。所有這些都顯著增加了處理的延遲。

儘管Hadoop Map/Reduce是針對批處理的工作負載而設計的,但某些應用程序(如欺詐檢測,廣告顯示,網路監控需要實時響應以處理大量數據),現在已開始考慮各種調整Hadoop的方法以使其適合更實時的處理環境。在本篇文章中,我嘗試了一些基於Map/Reduce模型的執行低延遲並行處理的技術。

常用流處理模型

在這個模型中,數據是在各種各樣的OLTP系統中生成的,這些系統更新了事務數據存儲,並非同步發送其他數據用於分析處理。分析處理過程將輸出寫入到決策模型,該決策模型會將信息反饋給OLTP系統來進行實時決策。

注意與OLTP系統分離的分析處理的「非同步性質」,在該方式下OLTP系統不會放慢速度等待分析處理完成。無論如何,我們仍然需要儘快進行分析處理,否則決策模型將不能反映當前世界的真實場景,它將不會很有用處。什麼程度的延遲可容忍的是應用程序指定的。

在Map/Reduce中進行微批處理

一種方法是根據時間窗(例如每小時)將數據分成小批量,並將每批中收集的數據提交給Map/Reduce作業。這需要分段機制,以便OLTP應用程序可以繼續獨立於分析處理。而作業調度程序用於規範生產者和消費者,基於此它們每個生產者或消費者都可以獨立進行。(生產者和消費者是在操作系統理論中對產生數據和處理數據的程序的稱呼,譯者注)

連續性Map/Reduce

這裡讓我們想像一下有關Map/Reduce執行模型的一些可能的修改,以使其適應實時流處理。我並不擔心Hadoop在線原型(HOP)所採用的方法的向後兼容性 。

長時間運行

第一種修改方法是使mapper和reducer長時間運行。因此,我們不能等待map階段結束之後才開始reduce階段,因為map階段永遠不會結束。這意味著mapper在完成處理後會將數據推送到reducer,並讓reducer對數據進行排序。這種方法的缺點是它沒有機會去運行地圖側的combine()函數以降低帶寬使用率。它還將更多的工作量轉移到正需要進行分類的reducer。

注意在延遲和優化之間需要有一個折衷。優化需要更多的數據在源頭(即Mapper)就進行累積,如此即可以執行本地合併(即:結合在一起)。不幸的是,低延遲需要儘快發送數據,因此沒有太多時間使大量累積操作可以完成。

HOP提出了一種自適應流控制機制,在該方式下數據會被儘快推送到Reducer,直到Reducer被重載並退回(使用某種流量控制協議)。然後mapper將緩衝處理後的消息並在發送給reducer之前執行combine()函數。這種方法將會自動地來回移動Reducer和Mapper之間的聚合工作負載。

時間窗口:切片和範圍

這是一個「時間片(time slice)」概念和一個「時間範圍(time range)」的概念。「切片(Slice)」定義了執行reduce處理之前所累計結果的時間窗口。這也是mapper在發送到reducer之前應積累的最小數據量。

「範圍(Range)」定義了結果所匯總的時間窗口。它可以是一個具有明確起點定義的界標窗口或者是跳躍窗口的(考慮移動的界標場景)。它也可以是一個滑動窗口,其中從當前時間開始聚合的固定大小的窗口。

在從每個mapper接收到特定時間片後,reducer可以啟動聚合處理並將結果與之前的聚合結果進行合併。切片(大小)可以根據mapper發送的數據量來進行動態調整。

增量處理

請注意,reducer需要在收到所有mapper中相同時間片的所有記錄後計算聚合片值。之後,它會調用用戶定義的merge()函數將切片值與範圍值合併。如果範圍需要刷新(例如達到跳轉窗口邊界),將調用init()函數來獲取刷新的範圍值。如果範圍值需要更新(當某個切片值超出滑動範圍時),則會調用unmerge()函數。

以下是我們如何在每小時更新(即:一小時大小切片)的情況下,在24小時滑動窗口內跟蹤平均命中率(即:每小時總命中數)的示例。

# Call at each hit recordmap(k1, hitRecord) { site = hitRecord.site # lookup the slice of the particular key slice = lookupSlice(site) if (slice.time - now > 60.minutes) { # Notify reducer whole slice of site is sent advance(site, slice) slice = lookupSlice(site) } emitIntermediate(site, slice, 1)}combine(site, slice, countList) { hitCount = 0 for count in countList { hitCount += count } # Send the message to the downstream node emitIntermediate(site, slice, hitCount)}

# Called when reducer receive full slice from all mappersreduce(site, slice, countList) { hitCount = 0 for count in countList { hitCount += count } sv = SliceValue.new sv.hitCount = hitCount return sv}# Called at each jumping window boundaryinit(slice) { rangeValue = RangeValue.new rangeValue.hitCount = 0 return rangeValue}# Called after each reduce()merge(rangeValue, slice, sliceValue) { rangeValue.hitCount += sliceValue.hitCount}# Called when a slice fall out the sliding windowunmerge(rangeValue, slice, sliceValue) { rangeValue.hitCount -= sliceValue.hitCount}

問答

比較好的MapReduce例子有哪些?

相關閱讀

MapReduce極簡教程

大數據運算模型 MapReduce 原理

如何為Hadoop選擇最佳彈性MapReduce框架

此文已由作者授權騰訊雲+社區發布,原文鏈接:cloud.tencent.com/devel

weixin.qq.com/r/6TlxaU- (二維碼自動識別)


推薦閱讀:

mapreduce shuffle細節
基於Alluxio的HDFS多集群統一入口的實現
Spark入門:讀寫文件
MapReduce Shuffle深入理解

TAG:MapReduce | Hadoop |