Apache Beam: 下一代的大數據處理標準

聲明:本文發表於《程序員》雜誌2016年11月刊,如欲轉載,請首先獲取CSDN授權。

Apache Beam(原名Google DataFlow)是Google在2016年2月份貢獻給Apache基金會的Apache孵化項目,被認為是繼MapReduce,GFS和BigQuery等之後,Google在大數據處理領域對開源社區的又一個非常大的貢獻。Apache Beam的主要目標是統一批處理和流處理的編程範式,為無限,亂序,web-scale的數據集處理提供簡單靈活,功能豐富以及表達能力十分強大的SDK。Apache Beam項目重點在於數據處理的編程範式和介面定義,並不涉及具體執行引擎的實現,Apache Beam希望基於Beam開發的數據處理程序可以執行在任意的分散式計算引擎上。本文主要介紹Apache Beam的編程範式-Beam Model,以及通過Beam SDK如何方便靈活的編寫分散式數據處理業務邏輯,希望讀者能夠通過本文對Apache Beam有初步的了解,同時對於分散式數據處理系統如何處理亂序無限數據流的能力有初步的認識。

Apache Beam基本架構

隨著分散式數據處理不斷發展,新的分散式數據處理技術也不斷被提出,業界湧現出了越來越多的分散式數據處理框架,從最早的Hadoop MapReduce,到Apache Spark,Apache Storm,以及更近的Apache Flink,Apache Apex等。新的分散式處理框架可能帶來的更高的性能,更強大的功能,更低的延遲等,但用戶切換到新的分散式處理框架的代價也非常大:需要學習一個新的數據處理框架,並重寫所有的業務邏輯。解決這個問題的思路包括兩個部分,首先,需要一個編程範式,能夠統一,規範分散式數據處理的需求,例如,統一批處理和流處理的需求。其次,生成的分散式數據處理任務應該能夠在各個分散式執行引擎上執行,用戶可以自由切換分散式數據處理任務的執行引擎與執行環境。Apache Beam正是為了解決以上問題而提出的。

Apache Beam主要由Beam SDK和Beam Runner組成,Beam SDK定義了開發分散式數據處理任務業務邏輯的API介面,生成的的分散式數據處理任務Pipeline交給具體的Beam Runner執行引擎。Apache Beam目前支持的API介面是由Java語言實現的,Python版本的API正在開發之中。Apache Beam支持的底層執行引擎包括Apache Flink,Apache Spark以及Google Cloud Platform,此外Apache Storm,Apache Hadoop,Apache Gearpump等執行引擎的支持也在討論或開發當中。其基本架構如下圖所示:

圖1 Apache Beam架構圖

需要注意的是,雖然Apache Beam社區非常希望所有的Beam執行引擎都能夠支持Beam SDK定義的功能全集,但是在實際實現中可能並不一定。例如,基於MapReduce的Runner顯然很難實現和流處理相關的功能特性。目前Google DataFlow Cloud是對Beam SDK功能集支持最全面的執行引擎,在開源執行引擎中,支持最全面的則是Apache Flink。

Beam Model

Beam Model指的是Beam的編程範式,即Beam SDK背後的設計思想。在介紹Beam Model之前,先簡要介紹一下Beam Model要處理的問題域與一些基本概念。

  1. 數據。分散式數據處理要處理的數據類型一般可以分為兩類,有限的數據集和無限的數據流。有限的數據集,比如一個HDFS中的文件,一個HBase表等,特點是數據提前已經存在,一般也已經持久化,不會突然消失。而無限的數據流,比如kafka中流過來的系統日誌流,或是從twitter API拿到的twitter流等等,這類數據的特點是,數據動態流入,無窮無盡,無法全部持久化。一般來說,批處理框架的設計目標是用來處理有限的數據集,流處理框架的設計目標是用來處理無限的數據流。有限的數據集可以看做是無限的數據流的一種特例,但是從數據處理邏輯的角度,這兩者並無不同之處,例如,假設微博數據包含時間戳和轉發量,用戶希望按照統計每小時的轉發量總和,此業務邏輯應該可以同時在有限數據集和無限數據流上執行,並不應該因為數據源的不同而對業務邏輯的實現產生任何影響。
  2. 時間。Process Time是指數據進入分散式處理框架的時間,而Event-Time則是指數據產生的時間。這兩個時間通常是不同的,例如,對於一個處理微博數據的流計算任務,一條2016-06-01-12:00:00發表的微博經過網路傳輸等延遲可能在2016-06-01-12:01:30才進入到流處理系統中。批處理任務通常進行全量的數據計算,較少關注數據的時間屬性,但是對於流處理任務來說,由於數據流是無情無盡的,無法進行全量的計算,通常是對某個窗口中得數據進行計算,對於大部分的流處理任務來說,按照時間進行窗口劃分,可能是最常見的需求。
  3. 亂序。對於流處理框架處理的數據流來說,其數據的到達順序可能並不嚴格按照Event-Time的時間順序。如果基於Process Time定義時間窗口,數據到達的順序就是數據的順序,因此不存在亂序問題。但是對於基於Event Time定義的時間窗口來說,可能存在時間靠前的消息在時間靠後的消息後到達的情況,這在分散式的數據源中可能非常常見。對於這種情況,如何確定遲到數據,以及對於遲到數據如何處理通常是很棘手的問題。

Beam Model處理的目標數據是無限的時間亂序數據流,不考慮時間順序或是有限的數據集可看做是無限亂序數據流的一個特例。Beam Model從下面四個維度歸納了用戶在進行數據處理的時候需要考慮的問題:

  1. What。如何對數據進行計算?例如,Sum,Join或是機器學習中訓練學習模型等。在Beam SDK中由Pipeline中的操作符指定。
  2. Where。數據在什麼範圍中計算?例如,基於Process-Time的時間窗口,基於Event-Time的時間窗口,滑動窗口等等。在BeamSDK中由Pipeline中的窗口指定。
  3. When。何時將計算結果輸出?例如,在1小時的Event-Time時間窗口中,每隔1分鐘,將當前窗口計算結果輸出。在Beam SDK中由Pipeline中的Watermark和觸發器指定。
  4. How。遲到數據如何處理?例如,將遲到數據計算增量結果輸出,或是將遲到數據計算結果和窗口內數據計算結果合併成全量結果輸出。在Beam SDK中由Accumulation指定。

Beam Model將」WWWH「四個維度抽象出來組成了Beam SDK,用戶在基於Beam SDK構建數據處理業務邏輯時,在每一步只需要根據業務需求按照這四個維度調用具體的API即可生成分散式數據處理Pipeline,並提交到具體執行引擎上執行。「WWWH」四個維度的抽象僅僅關注業務邏輯本身,和分散式任務如何執行沒有任何關係。

Beam SDK

不同於Apache Flink或是Apache Spark,Beam SDK使用同一套API表示數據源,輸出目標以及操作符等。下面介紹4個基於Beam SDK的數據處理任務,通過這四個數據處理任務,讀者可以了解通過Beam Mode是如何統一靈活的描述批處理和流處理任務的,這4個任務用來處理手機遊戲領域的統計需求,包括:

  1. 用戶分數。批處理任務,基於有限數據集統計用戶分數。
  2. 每小時團隊分數。批處理任務,基於有限數據集統計每小時,每個團隊的分數。
  3. 排行榜。流處理任務,2個統計項,每小時每個團隊的分數以及用戶實時的歷史總得分數。
  4. 遊戲狀態。流處理任務,統計每小時每個團隊的分數,以及更複雜的每小時統計信息,比如每小時每個用戶在線時間等。

註:示例代碼來自Beam的源碼,具體地址參見:apache/incubator-beam。部分分析內容參考了Beam的官方文檔,詳情請參見引用鏈接。

下面基於Beam Model的「WWWH」四個維度,分析業務邏輯,並通過代碼展示如何通過Beam SDK實現「WWWH」四個維度的業務邏輯。

用戶分數

統計每個用戶的歷史總得分數是一個非常簡單的任務,在這裡我們簡單的通過一個批處理任務實現,每次需要新的用戶分數數據的時候,重新執行一次這個批處理任務即可。對於用戶分數任務,「WWWH」四維度分析結果如下:

通過「WWWH」的分析,對於用戶分數這個批處理任務,通過Beam Java SDK實現的代碼如下所示:

gameEventsnn [... input ...]nn [... parse ...]nn .apply("ExtractUserScore", new ExtractAndSumScore("user")) nn [... output ...];n

ExtractAndSumScore實現了「What」中描述的邏輯,即按用戶分組,然後累加分數,其相關代碼如下:

gameInfonn .apply(MapElementsnn .via((GameActionInfo gInfo) -> KV.of(gInfo.getKey(field), gInfo.getScore()))nn .withOutputType(nn TypeDescriptors.kvs(TypeDescriptors.strings(), TypeDescriptors.integers())))nn .apply(Sum.<String>integersPerKey());n

通過MapElements確定Key與Value分別是用戶與分數,然後Sum定義按key分組,並累加分數。Beam支持將多個對數據的操作合併成一個操作,這樣不僅可以支持更清晰的業務邏輯實現,同時也可以在多處重用合併後的操作邏輯。

每小時團隊分數

按照小時統計每個團隊的分數,獲得最高分數的團隊可能獲得獎勵,這個分析任務增加了對窗口的要求,不過我們依然可以通過一個批處理任務實現,對於這個任務的「WWWH」四個維度的分析如下:

相對於第一個用戶分數任務,只是在Where部分回答了「數據在什麼範圍中計算?」的問題,同時在What部分「如何計算數據?」中,分組的條件由用戶改為了團隊,這在代碼中也會相應的體現:

gameEventsnn [... input ...]nn [... parse ...]nn .apply("AddEventTimestamps", WithTimestamps.of((GameActionInfo i)nn -> new Instant(i.getTimestamp())))nn .apply("FixedWindowsTeam", Window.<GameActionInfo>into(nn FixedWindows.of(Duration.standardMinutes(windowDuration))))nn .apply("ExtractTeamScore", new ExtractAndSumScore("team"))nn [... output ...];n

「AddEventTimestamps」定義了如何從原始數據中抽取EventTime數據,「FixedWindowsTeam」則定義了1小時固定窗口,然後重用了ExtractAndSumScore類,只是將分組的列從用戶改成了團隊。對於每小時團隊分數任務,引入了關於「Where」部分窗口定義的新業務邏輯,但是從代碼中可以看到,關於「Where」部分的實現和關於「What」部分的實現是完全獨立的,用戶只需要新加兩行關於「Where」的代碼,非常簡單和清晰。

排行榜

前面兩個任務均是基於有限數據集的批處理任務,對於排行榜來說,我們同樣需要統計用戶分數以及每小時團隊分數,但是從業務角度希望得到的是實時數據。對於Apache Beam來說,一個相同處理邏輯的批處理任務和流處理任務的唯一不同就是任務的輸入和輸出,中間的業務邏輯Pipeline無需任何改變。對於當前示例的排行榜數據分析任務,我們不僅希望他們滿足和前兩個示例相同的業務邏輯,同時也可以滿足更定製化的業務需求,例如:

  1. 流處理任務相對於批處理任務,一個非常重要的特性是,流處理任務可以更加實時的返回計算結果,例如計算每小時團隊分數時,對於一小時的時間窗口,默認是在一小時的數據全部到達後,把最終的結算結果輸出,但是流處理系統應該同時支持在一小時窗口只有部分數據到達時,就將部分計算結果輸出,從而使得用戶可以得到實時的分析結果。
  2. 保證和批處理任務一致的計算結果正確性。由於亂序數據的存在,對於某一個計算窗口,如何確定所有數據是否到達(Watermark)?遲到數據如何處理?處理結果如何輸出,總量,增量,並列?流處理系統應該提供機制保證用戶可以在滿足低延遲性能的同時達到最終的計算結果正確性。

上述兩個問題正是通過回答「When」和「How」兩個問題來定義用戶的數據分析需求。「When」取決於用戶希望多常得到計算結果,在回答「When」的時候,基本上可以分為四個階段:

  1. Early。在窗口結束前,確定何時輸出中間狀態數據。
  2. On-Time。在窗口結束時,輸出窗口數據計算結果。由於亂序數據的存在,如何判斷窗口結束可能是用戶根據額外的知識預估的,且允許在用戶設定的窗口結束後出現遲到的屬於該窗口的數據。
  3. Late。在窗口結束後,有遲到的數據到達,在這個階段,何時輸出計算結果。
  4. Final。能夠容忍遲到的最大限度,例如1小時。到達最後的等待時間後,輸出最終的計算結果,同時不再接受之後的遲到數據,清理該窗口的狀態數據。

對於每小時團隊得分的流處理任務,本示例希望的業務邏輯為,基於Event Time的1小時時間窗口,按團隊計算分數,在一小時窗口內,每5分鐘輸出一次當前的團隊分數,對於遲到的數據,每10分鐘輸出一次當前的團隊分數,在窗口結束2小時後遲到的數據一般不可能會出現,假如出現的話,直接拋棄。「WWWH」表達如下:

在基於Beam SDK的實現中,用戶基於「WWWH」 Beam Model表示的業務邏輯可以分別獨立直接的實現出來:

gameEventsn [... input ...]n .apply("LeaderboardTeamFixedWindows", Windown .<GameActionInfo>into(FixedWindows.of(n Duration.standardMinutes(Durations.minutes(60))))n .triggering(AfterWatermark.pastEndOfWindow()n .withEarlyFirings(AfterProcessingTime.pastFirstElementInPane()n .plusDelayOf(Durations.minutes(5)))n .withLateFirings(AfterProcessingTime.pastFirstElementInPane()n .plusDelayOf(Durations.minutes(10))))n .withAllowedLateness(Duration.standardMinutes(120)n .accumulatingFiredPanes())n .apply("ExtractTeamScore", new ExtractAndSumScore("team"))n [... output ...]n

LeaderboardTeamFixedWindows對應「Where」定義窗口,Trigger對應「Where」定義結果輸出條件,Accumulation對應「How」定義輸出結果內容,ExtractTeamScore對應「What」定義計算邏輯。

總結

Apache Beam的Beam Model對無限亂序數據流的數據處理進行了非常優雅的抽象,「WWWH」四個維度對數據處理的描述,非常清晰與合理,Beam Model在統一了對無限數據流和有限數據集的處理模式的同時,也明確了對無限數據流的數據處理方式的編程範式,擴大了流處理系統可應用的業務範圍,例如,Event-Time/Session窗口的支持,亂序數據的處理支持等。Apache Flink,Apache Spark Streaming等項目的API設計均越來越多的借鑒或參考了Apache Beam Model,且作為Beam Runner的實現,與Beam SDK的兼容度也越來越高。本文主要介紹了Beam Model,以及如何基於Beam Model設計現實中的數據處理任務,希望能夠讓讀者對Apache Beam項目能夠有一個初步的了解。由於Apache Beam已經進入Apache Incubator孵化,所以讀者也可以通過官網或是郵件組了解更多Apache Beam的進展和狀態。

引用

1. Apache Beam (incubating)

2. oreilly.com/ideas/the-w

3. The world beyond batch: Streaming 102

4. cloud.google.com/datafl


推薦閱讀:

Spark 2017歐洲技術峰會摘要(流計算分類)

TAG:大数据 | 流计算 |