在流式計算場景下如何確保輸入的齊全度?

使用流式計算做實時數據分析,就拿支付寶來舉例,我可以在應用系統上把用戶的行為以消息發送到storm,讓storm實現實時的用戶行為分析,甚至可以判斷用戶是否在洗錢、賬號是否可能被盜用了等等。

假設判斷洗錢的演算法是:用戶按照先後順序做了A-B-D三個動作(純YY),那就可能是洗錢,要發送報警簡訊通知審查。如果是A-B-C-D,那就是正常行為,不需報警。

但是A、B、C、D是在分散式系統上採集的,即使用戶行為的產生有先後,但是消息採集、傳送到storm、被storm處理的順序也可能不會嚴格遵循產生順序。

比如用戶的行為是A-B-C-D,但是C發生的那個系統在傳遞消息的過程中滯後了(比如依靠JAVA的消息中間件來發送,而這個JVM恰好在傳遞C時做了FGC),導致Storm接收到了A-B-D-C(很可能是用一個類似狀態機的邏輯在持續處理),那收到D的時候到底要不要發送報警?發了,那就是誤報,不發,那如果C確實沒有發生呢?

現在唯一能想到的辦法只有超時,Timeout一定時間之後才去執行檢測公式。

有沒有更加優雅準確的方案可以解決這個問題?或者是我本身的思路就不對,技術方案有問題?

——————————————————————————————————————

補充一個案例讓問題更飽滿:

前段時間我拿著相似的問題給HEKA(一個GO語言做的實時監控系統)郵件組發了個郵件,其中截圖是:

如圖所示,這是一個按60秒,進行周期型統計的曲線(很可能是一個PV統計,來自HEKA的官方PPT),我標紅了末尾的部分(最近的實時數據是「掉落」下來的)

我當時問的問題就是利用超時、滑動時間窗口做流式處理時,即使面對周期型統計(比如每分鐘的PV),也可能因為各種各樣的性能負載問題導致【不準確的計算結果被提前展示出來】。

上面「洗錢」的例子可能突出了「亂序」的問題。 現在補充的這個例子里,最終一致是可以保障的,「掉落」的那個刻度的數據遲早會補上來,storm有這能力。可問題就是在實時處理的場景中,我們就是期望能儘快的把實時數據release出去,用來展示、用來報警。此時如果沒有一個flag來判斷數據是否已經計算「齊全」,會導致使用起來的種種束縛。

簡而言之, 我可以忍受它【晚點兒再報警】,但是不能忍受【假報警】


今晚又一次沒按時作息,已經很晚了在晚睡時又看到了這個自己專業領域的問題,於是先來大致答下,明天再補充完整。

  1. 大部分分散式流式計算的輸入源都來自是分散式傳輸系統,如kafka等,為保證可以並發傳輸以及下游可以並發讀取,這些輸入源一般都會有拆成多個子輸入源,一般系統頂多在子輸入源內部保序,並不會在各個輸入源之間保序(考慮一下使用場景你會發現多個子輸入源間保序的實現代價太大而且一般沒有意義)。

    所以,在流式計算系統中亂序基本上是一件不可避免的事情。

    流式計算中,一般Join、聚合、排序都會和批處理里不一樣,一般都會把問題轉化成最一段時間內或者某個數據包內部的Join/聚合/排序,這時候,滑動時間窗口是一個在工程上比較常用的方案。基本思路很簡單,就是按時間截把數據扔到窗口裡,窗口滑入滑出時分別執行一些需要的動作。

    當然本質上這個方案還是在利用超時來解決問題,而且這個也確實仍有失敗的可能性,但在絕大部分場景下都可以滿足應用場景的需求。
  2. 用id生成器對事件編號的方案也不能完全避免出錯:

    使用id生成器,大約最好的作法就是在前端伺服器程序給用戶返回事件成功之前,按用戶名哈希到不同的id生成器上,對每個用戶名下的數據生成一個唯一的自增id,然後在後面處理時,就可以根據這個自增id判斷數據完整性了。

    但,萬一調用id生成器時,網路阻塞導致耗時較長。這時用戶的新一次請求到來了,分配到另一台機器上進行了處理,而這台機器網路狀況良好,就會導致後面一次請求的id是小於前面一個請求的id的。

    如果你假設用戶在沒接收到上一次操作的結果前不會進行下一次的操作,那倒是可以搞..

關於樓主後面所補充的使用場景我們這邊也很常見,我們沒有試圖做太多的工作,一般由業務對時效性、數據正確性的需求和數據亂序程度來綜合決定延遲多久進行處理。我們認為這個時候正確性與時效性是不可兼得的。具體你這個而就,你可以把窗口設大點,簡單的等數據從窗口滑出時再處理數據就可以了。

在你提到的場景中,數據如果完全順序到來,就可以用流式處理得到正確的結果,但在許多場景下,流式到來的數據根本是不可能直接計算出正確的結果的,例如,需要計算某個用戶某次廣告點擊是否是作弊點擊,是否要扣廣告主的廣告費。當該用戶第一次點擊時,你根據流式數據判定該用戶沒有作弊,判定需要扣掉廣告主的錢,但接著連續的到來了上萬次點擊,基本上就可以斷定其是作弊行為了。在這種場景下,就算「數據是齊全的」,也必須要用利用延時處理等方案來解決問題。所以,一般在這種情況下,都只能根據用戶的需求來在數據正確性與時效性之間進行取捨了。在對最終一致性要求行高的情況下,經常見到的一種辦法是先給用戶發布一個初步的滿足時效性的結果,後續等結果穩定之後,再發布一個正確性高的版本去修正之前的結果,好多時候這個正確性高的版本經常不會再用流式系統實現而是直接使用批處理來完成了。例如前面我提到的場景,就可以用流式緩存少量數據進行判斷,然後依據流式的結果先扣錢,隨後再用批處理系統進行修正,把多扣的錢返還給用戶。


在Storm內部,我經常使用的手段為:

    1. 整個鏈路按用戶id進行分組。也就是說,相同的用戶只能走同一鏈路,這樣在動作發生後也是順序進行處理。
    2. 注意:時序有要求的業務,前方不能進行自由分散處理,因為會有消息落在各個bolt和spolt里,此時應該統一使用用戶的id進行並發計算。
    3. 副作用: 如果某一用戶操作過於頻繁,則會形成這條鏈路的熱點,也就是數據傾斜。在數據保證處理(ACK方式)下,如果控制不好則對整體吞吐帶來很大的傷害。 慎用!如果你的用戶數據分布比較均勻的話,那麼這種方式還是比較方便的。
      1. 我曾經數據傾斜為500倍左右(按地域購買訂單來shard的,一個很大的坑,北京、上海、杭州比西部一些省份多500倍左右),而且業務要求還得必須這樣子來分,我唯一的辦法是把我要的業務邏輯與其它耗時邏輯分離。最後一個單Bolt把整個業務要求的指標達到並超越。這樣如果熱點就是發生這個Bolt上,那我也不怕了,因為這個Bolt的處理量非常的大(50萬左右的並發,業務要求30左右)
    4. 針對副作用:
      1. 先處理對時序要求較高的業務邏輯。過了這個坎,後續處理可以按其它方式處理。
      2. 如果是計算性的業務:1,能否把計算後移,「非法業務」檢查前移;2,能否把計算抽取出來單獨一層前移(夠嗆,因為哪哪都是同步)並橫向擴展計算規模(前移擴展,後移分散)這種方法很笨,但比沒有好些。
      3. 如果是IO性的業務: 1,IO後移。2,無他,增加並發。3,非同步還是不能用的。

如果從源頭上來講已經沒法保證消息時序了(進入Storm集群之前)。。那隻能使用「用戶操作」緩存用戶操作和操作時間來處理了。今天就寫到這兒吧,如果有需要我們再探討。。


垃圾知乎, 不玩了


可以看一下淘寶鷹眼對於數據齊全度問題的處理方式,他們會在數據源頭按照數據的量,或者固定時間(例如1分鐘)插入一些屏障(barrier),來check數據是否按照時間順序完備(即只有收到了u具體量或者時間點的barrier,才認為一個時間點,或者一定量的數據收集完畢,如果確認收集完畢,即可將告警發出,若未收集完畢,則不告警,這樣可以避免粗暴的使用超時機制來確認是否告警),引文說明如下:

我們自己實現了一個齊全度演算法來完成確定性的保證。它與 Apache Flink 中採納的 Snapshot

演算法有些近似,它其實是在整個流的過程當中去安插一些屏障

(Barrier),我們可以做一個假設,在所有流計算數據產生的地方,數據都是有序的。也就是說我們在收集數據的時候,通過第一探測數據源的深度,第二個在數據時間間隔出現交替的時候,比如說從

12:10 跑到 12:11 的時候去安插一個屏障隨著流做傳遞,流計算過程中保證

FIFO,下游可以把這些屏障聚合起來,這樣最終數據落地的時候,我就可以知道數據源當中有多少個屏障已經達到了最終的存儲當中。通過這種方式,我們其實可以預測出當一個系統在任意一個時刻它的齊全度是多少,這樣的話我們其實可以很容易得到一件事,就是說這個線掉下去了,但是因為在某一個時間點的屏障還沒有完全到齊,所以我現在可以預測是因為監控系統卡住了,沒有辦法給下游的運維人員發報警。因為我知道現在數據還沒有收齊,我沒有具備確定性的閾值保證,是沒有辦法去發報警的。這個齊全度的演算法,也就是我們在流計算當中,針對監控系統報警場景作出的確定性保證的優化。

引文鏈接:

全鏈路穩定性背後的數字化支撐:阿里巴巴鷹眼技術解密


沒經驗,瞎說。

首先排除超時問題。超時問題太複雜,無法要求機器擁有上帝視角。像tcp的4次關閉握手,paxos(具體忘記了)都只能減小超時錯誤的概率,無法完全避免。

下面的就簡單了。事件發生一定有發生時間,記錄時間戳比較就行了。

就是要減少C先於D發生,但卻在D時間之後到達這種事的概率。

通常D事件後,還會有EFG等事件發生(即使沒有也要創造一些虛事件),但EFG不參與決策。這樣如果C未到達,但EFG有任意事件到達,即可認為C未發生。


推薦閱讀:

數據接入 | 如何快速提升數據分析的效率?(上)
想學習大數據要掌握些什麼知識?
國內圖計算研究哪裡比較強?
MapReduce如何解決數據傾斜?
國內較知名的大數據服務平台有哪幾家?

TAG:分散式計算 | 一致性 | ApacheStorm | 大數據處理 | 實時計算 |