實時處理中的"exactly once"方案具體的應用場景或者使用方法是什麼?

最近看了storm,jstorm,flink,spark等提供的exactly once方案。

如果我理解的沒錯的話,這些方案是能夠保障一個粒度較小的task只執行一次,或者只通過checkpoint一次,但是在task內,或者checkpoint之間還是有可能有代碼執行多次的。離那種強嚴密統計的要求還有差距。比如說某一步里有一行代碼是向db或者redis里累加數據,crash的時候只知道這個task沒執行完,沒辦法判斷寫redis或者db的那行代碼跑沒跑,怎麼能保障沒有重複累加呢?

所以想問一下這些方案有什麼使用場景,或者說現實情況下是用什麼方式更好的使用這種機制的?有具體的案例最好。


保證exactly-once是需要源端,streaming系統和輸出共同協作,主要的要求是

1.源端要支持重放, 比如Kafka,Amazon Kinesis

2. 中間streaming系統的容錯處理保證task只會產生exactly-once的輸出

3. 輸出端要有transactional update

  • 下游輸出冪等的情況比較好處理,streaming系統輸出結果可以直接update
  • 下游輸出不冪等,需要引入版本控制機制

可以參考:

High-throughput, low-latency, and exactly-once stream processing with Apache Flink? - data Artisans

Mastering Apache Spark 2.0


流式計算框架的exactly once指的是最終的處理結果是exactly once的,不是說對輸入的數據只恰好處理一次。這裡以計數為例,我們說的exactly once指的是寫出的最終的結果(這裡我們假設是DB)與輸入的數據一致,一條不多一條不少。這個聽起來很容易,但是實現起來卻並不容易,因為 流式計算框架通常是分散式的,而且通常有著比較複雜的topology。在這裡我簡單描述下三種流式計算框架(storm trident, spark streaming, flink)分別是如何實現exactly once的。

storm trident

首先我們來說說storm trident。因為分散式系統中隨時都可能出現機器掛掉的情況,要保證實現exactly once最基本的一點就是在掛了之後需要重試。光重試是不夠的,重試的過程中還要保證順序。打個比方我們處理2條kafka的消息,消息A表示為(offset: 1, key: cnn, count: 1),消息B表示為(offset: 2, key: cnn, count: 2)。如果不保證順序,那麼處理的流程可能是這樣的:

  1. 將A的數據存入資料庫
  2. 將B的數據存入資料庫
  3. 通知系統B處理完成
  4. 系統掛了,只能重試,由於A尚未ack,所以系統從A開始拉取,但B已經處理完成,因此無法實現exactly once

但是如果我們保證處理的順序性,那麼處理的流程就是這樣的:

  1. 將A的數據存入資料庫,並記錄下最後的offset
  2. 通知系統A處理完成
  3. 將B的數據存入資料庫,並記錄下最後的offset
  4. 通知系統B處理完成

在這種情況下如果尚未通知系統A處理完成就掛了,有2種可能:A未寫入資料庫或者A已經寫入資料庫。而這兩種情況可以根據記錄下來的最後的offset來判斷。

一個分散式系統中如果一條一條地處理,顯然吞吐太低,嚴重浪費資源,這裡可以將一條一條改成一批一批地處理。這樣雖然比一條一條地處理好,但是面對複雜的topology,依然會嚴重浪費資源。比如我的topology需要先解析kafka里的日誌,並提取出相應的欄位,然後根據key shuffle到不同的機器上聚合然後存入到資料庫里去。這裡顯然只有存入資料庫的這個動作是需要按照順序一批一批處理,而前面解析日誌的部分不需要嚴格按照順序處理,因此storm trident里的operator分為兩種,一種是有狀態的,一種是無狀態的。有狀態的處理需要嚴格保序,而無狀態的operator則不需要等待上一個批次處理完成。這個其實與tcp中的窗口的思想有點類似。

以上就是storm trident的基本思想。因此從上面的分析可以看出,要實現exactly once的處理,輸入流需要支持回退(kafka就是一個常用的輸入流),輸出需要支持update(比如mysql, redis),比如使用kafka作為輸出是做不到exactly once的。

spark streaming

spark streaming通過將輸入切分成一個一個的batch,在遭遇失敗的時候需要重新回放最後一個batch,因此它要求foreach rdd的操作是冪等。看了storm trident分析的同學可能有一個困惑:spark streaming要實現exactly once,就要保證按順序處理,它是如何做到的呢?我們知道spark streaming的每一個batch都會生成一個job來處理,在內部實現中spark streaming只允許同時運行一個job,也就是只允許同時處理一個batch。這種做法的問題在於會嚴重浪費資源。

flink

flink所提出的使用checkpiont方法來實現exactly once是目前我了解到的最優雅的方式。trident和spark streaming的batch方式的一個問題在於資源的利用率。batch切小了吞吐上不去,切大了需要預分配更多的資源,而且trident基於storm原生的ack機制,所以batch還與超時的設置相關。flink的基本思路就是將狀態定時地checkpiont到hdfs中去,當發生failure的時候恢復上一次的狀態,然後將輸出update到外部。這裡需要注意的是輸入流的offset也是狀態的一部分,因此一旦發生failure就能從最後一次狀態恢復,從而保證輸出的結果是exactly once的。因此這裡注意exactly once的條件跟storm trident相同:輸入流要能回退到某一點,輸出要能被update。


exactly once是指的運算元對下游產出的結果有且僅有一份會被下游獲取

也就是說,某級運算元可能會被重複調度多次,但無論被重複執行多少遍,都保證一份輸入有且僅有一次產出可以被下游獲取到。

不過,exactly once有一個前提,即運算元不能自己直接與外部系統交互。

如果確實需要與外部系統交互,就需要自己做一些額外的操作才能確保exactly once。

如果做不到了exactly once,僅僅做到at least once的話,而用戶又有上述exactly once需求時,則用戶需要在數據中加上編號,需要每級運算元都自己去完成去重操作,使用上會極度複雜低效。

而做到了exactly once的系統,則除與外部系統交互的模塊外,用戶都不再需要關注去重的工作。

與外部系統交互的模塊,一般與外部系統的交互無非是輸入、輸出、狀態存儲三種需求。

一般而言,現代的流式計算系統,都會提供一些機制,用以給用戶提供一種與計算保證原子性的內部狀態存儲機制(我這裡把它稱為Status),它會把數據處理切割成許多輪(最小可以把一條輸入數據當做是一輪),然後每輪處理中,用戶對Status的操作,會與數據的處理保證原子性。

用戶在每輪處理前讀取Status,處理後,會更新Status,然後本輪處理返回。

某輪處理返回後,系統會去嘗試

1. 本輪內給下游產生輸出真正成功提交,以便可被下游讀取

2. 並同時持久化更新Status

3. 標記輸入數據已經處理過了,未來不會再嘗試重試

這三個操作會是一個原子的過程,即,如果一個操作失敗,則另外兩個也會同時失敗。這樣,就保證了無論何時,用戶收到的Status與數據一定是匹配的。

有了這樣的機制之後,輸入、輸出、狀態存儲,只要上下游系統滿足一些特性,就可以實現輸入、輸出以及外部狀態的不丟不重了。

先來考慮輸入數據:假設輸入系統是一種帶訂閱id的訂閱系統,即,訂閱系統可以提供讀取某個id起的數據這樣的介面,則每輪讀取完成後,在Status中保存已讀取的id,下輪讀取時從該id下一位讀取就可以了。

這樣,由於id被更新與數據可被下游獲取是同一個原子的過程,這就保證了一份數據輸入,下游只會獲取到一份。

再來考慮輸出數據:一般的流式計算系統中,輸出的有序性需要下游系統支持冪等的操作,然後在流式計算系統中去使用這些冪等的操作。一個典型的場景是如果某流式系統支持Status並且流式系統保證順序的話,可以用Status為每條數據維護一個id,下游系統就可以使用該id去重,以保證最終結果不重複。

輸出數據時為保吞吐,經常會進行非同步寫入,這時,就可能需要一些複雜的機制來保證輸入的數據不能被過早刪除掉,一定要等到數據確保寫入下游系統後,才可以標記上游數據處理完成,可以不必再重複處理。

而與外部狀態存儲的需求則更為複雜一些,一般需要進行多版本號控制,即在外部存儲中維護多個版本的狀態,然後在Status中維護版本號,如果發生故障,則從Status中獲取版本號,以便知曉自己該去讀取哪個版本的狀態。

雖然,聽起來這些很複雜,但實際上,目前的各種流式系統中,基本上都會把一些常見的輸入、輸出、外部狀態存儲需求都包裝起來,使得用戶不再需要關注前邊這些細節就可以與這些常見外部系統不丟不重的打交道(當然,你得用專用的介面而不是裸用外部系統原始介面)。

所以,總結一下就是,做到不丟不重(exactly once)也不能保證運算元不被重算,但可以保證下游收到的數據一定是不丟不重的。


對於應用方來講,框架可以幫你做到at least once,單純想靠框架幫你實現exactly once基本不可能,這得取決於你的輸出是不是冪等的。如果不冪等,又出現了retry,那基本就gg…


先說結論:這些框架只能幫助application實現effective once, 而無法從end2end上保證這點。

Exact once其實非常誤導人,準確來說分散式環境的exactly once都是effective once;詳見:Viktor Klang: 「 I』m coining the phrase 『effectively-once』 for message processing with at-least-once + idempotent operations ,」 http://twitter.com , October 20, 2016.

適應環境:side-effect 不允許重複多次的情況,都需要用fencing或者idempotent來得到effective once防止程序錯誤. 而且根據end to end argument(Jerome H. Saltzer, David P. Reed, and David D. Clark: 「 End-to-End Arguments in System Design ,」 ACM Transactions on Computer Systems , volume 2, number 4, pages 277–288, November 1984. doi:10.1145/357401.357402) 這種保證是無法利用中間件保證的,而必須由處在end一端的application層來實現,中間件層只能提供幫助。


推薦閱讀:

sbt結合IDEA對Spark進行斷點調試開發
【博客存檔】Machine Learning With Spark Note 5:構建聚類模型
Spark Core源碼分析--任務提交
矽谷之路 48: 深入淺出Spark(五)數據怎麼存

TAG:ApacheStorm | Spark | Flink |