最大努力通知?No,事務消息

人們一般用MQ只是實現了最大努力通知模型。

我們最近嘗試實現一種事務消息解決方案。接上文:分散式事務?No, 最終一致性 - 知乎專欄

我們希望這個方案是輕量級的,能實際解決目前業務痛點。

本文幾乎看不到任何理論分析,只談做什麼,怎麼做。因為涉及到一些內部資料,有所省略。

實現事務消息核心是需要有一個支持消息重試的MQ。經調研,我廠有個自研的基於Redis的延時MQ服務。我們暫且叫他DelayQ吧。目前我廠還沒有類似RMQ的事務消息方案。

據稱RocketMQ和ActiveMQ等均有相似特性。您猜對了,我都沒用過。請各位客官幫忙告知下還有哪些有此特性。

DelayQ

DelayQ主要功能是支持不同的消息延遲策略。提供兩個核心介面:addMsg, deleteMsg.

1. addMsg負責將一個重試消息註冊到DelayQ中。其核心參數有:

最大重試次數,重試間隔, 生效時間,下游Url級對應業務參數。

2. deleteMsg負責主動將上面註冊的消息刪除掉。

大致流程是:producer通過addMsg註冊一條延遲消息,DelayQ負責在生效時間點將次消息push給下游consumer。如果下游返回成功,則不再繼續發送。否則,會每隔重試間隔嘗試發送,直到最大重試次數為止。

在一些特殊場景,producer或consumer可以通過deleteMsg介面主動刪除該消息。

我廠DelayQ是基於Redis的zset實現的。我們只是個業務團隊,所以也沒有參與其開發。下面根據個人理解,簡單說明下他大概是如何工作的(對顯示器發誓,此圖只是本人yy之作。如有雷同切莫當做泄露公司機密。我確實也沒看到詳細的設計文檔。正因此也是僅供參考,實操性不大)。

如上所述,關鍵數據結構就是zset。Redis集群方案選用Codis.

每次addMsg時,都會給該消息生成一個唯一id。然後計算其下次嘗試時間。這裡就是DelayQ要實現的核心策略部分。比如,我們可以每次間隔相同時間,也可以是指數衰減的間隔去嘗試。為了實現這些,我們還需要在消息體上面額外記錄一些信息。比如,上次嘗試時間,總共已嘗試次數等信息。這些都搞定了就將消息和其他一些必要信息寫到zset中。zset以ExecuteTime作為Score排序。

消息進到zset後,DelayQ會通過timer觸發(比如秒級),fork相應的消費線程去處理zset里ExecuteTime大於當前時間的消息。DelayQ拿到一條消息後,解析其中的callbackurl,並組裝參數,push業務消息給Consumer.

Consumer返回處理成功,那麼zrem Codis里的消息。如果處理失敗,則計算其下次嘗試時間,並更新其ExecuteTime.

YY主要流程如上。相信實際實現過程中,還有很多問題需要解決。比如,定時處理,如何提高並發度?考慮到redis丟消息情況,還需要做些啥?回調介面超時和限流問題等等。因為理解不深,所以不敢繼續寫了,怕露餡^_^

事務消息

基於上面的DelayQ,我們嘗試提出一種可靠的消息傳遞機制(事務消息)。核心思想抄襲RMQ。直接上圖。

我們在DelayQ前面增加一層Proxy,暫且就叫TMQProxy吧。producer將通過TMQProxy跟DelayQ交互,不會直接跟DelayQ進行交互。

TMQ相對於DelayQ要求Producer多提供兩個信息,一個是checkUrl, 一個是confirmUrl(可選)。producer需要分別實現這兩個介面。

check介面:主要是告知TMQProxy,當前消息是否可以發送。對應上面步驟6.

confirm介面:可選介面。TMQProxy將消息處理成功後會通過該主動通知Producer消息已經處理成功。如果producer對此結果不感興趣,就可以不必實現。

詳細過程如上面交互圖。就不一一解釋了。如果對實現感興趣,還是建議稍微仔細看下上圖,畫這個還是費了點腦細胞的。相信大家還是能看明白大致思想。

我們引入TMQProxy還有一個目的是,不想讓內部業務使用DelayQ上太過花哨。比如其內部topic這些,我們都屏蔽掉了。重試策略這些也不希望業務層玩得太靈活,所以只能提供枚舉的策略。

另外一個考慮是,DelayQ可能也只是個階段性方案,後續如果切換其他MQ。我們希望盡量做到業務方無感知。直接通過Proxy屏蔽底層具體的MQ實現。

TMQProxy提供的介面大致如下(示例):

(知乎不支持表格?)

稍微啰嗦下timeout。timeout設置合理性還是很重要的。這裡check和confirm的超時時間,我們會限制比較小的時間。如果不滿足,則可能拒絕接入TMQProxy。而下游的處理時間一般都會比較長。這裡,我們可以容忍第一次超時。但是處理成功後第二次請求還耗很長時間是接受不了的。所以需要下游該加結果cache加結果cache,該優化冪等演算法優化冪等演算法。如果連續timeout多次,其實這條消息可以考慮丟棄。

簡化

細心的朋友可能發現上一節的圖裡多出了個DB,主要是存儲消息的一些狀態。引入DB目的是完全代替業務側的消息表。可以在消息狀態查詢,故障恢復等中起到兜底作用。當然,也可以有效減少查詢check介面的次數。

實際應用中,我們認為這個DB的引入有點過重。因為Redis和DelayQ穩定性已然經受住了更大業務量的考驗。目前已經趨於成熟了,也沒發生過大面積丟消息等嚴重事故。

所以我們在TMQProxy中,把DB先拋棄了。上面所有DB相關操作實際上,我們是沒有實現的。所以消息Id實際上也是用了DelayQ返回的消息Id。消息Id,主要用於delete。這裡有點小坑,就不擴展了。實際應用中,因為我們通過同步介面中返回碼控制消息是否要刪除掉,所以饒過了這個坑。

所以,很多時候,我們設計時候盡量考慮全,實際實現和應用的時候會做很多tradeoff。據說這個是架構師關鍵素養。這話題小的就不敢擴展了。

還有啥

一個系統,如果只是畫幾張圖,把代碼寫完就萬事大吉該多好?可惜,我們想讓他上線,還需要考慮很多很多問題(考慮的永遠都不夠多)。因為有點跑題,所以就不再擴展,只是蜻蜓點水,純屬湊字數了。

1. 限流。對於一個通用服務,因為接入方雜七雜八,所以不管是producer請求頻次控制,還是對下游的保護,都離不開限流措施。可以在不同層進行限流。介面層,我們考慮按producer+msgType等多個維度進行限流。做全局限流還有點麻煩。假設loadbalancer足夠均勻,我們只對單機做限流基本就夠了。這裡推薦com.google.common.util.concurrent.RateLimiter,非常好用。感謝肖少早前推薦。

2. 監控。不怕出問題,就怕出了問題不知道。監控非常重要。關鍵是監控些啥?不展開了,這個真的非常非常重要。相信大部分廠子都有自己的一套相對完整的監控系統。

3. 降級和兜底。首先Producer可以把TMQProxy當做直接調用下游失敗後的補充手段。這樣就不會對TMQProxy產生強依賴。TMQProxy自身沒有存儲,所以除了邏輯錯誤,最大可能就是下游DelayQ故障了。此時,因為沒有存儲,只能通過日誌進行恢復了。我們需要規範日誌列印,並準備好處理腳本。把故障期間消費失敗的消息重新處理一遍。理論上只要DelayQ不丟消息,我們也可以等待DelayQ恢復後重試。不過如果add就失敗,那根據上面流程,業務上肯定是會有損失的。所以我們對DelayQ的穩定性,要求至少是4個9。所以更多的降級,應該在producer側結合業務去考慮。對於DelayQ丟消息場景,目前大部分業務場景只能通過對賬進行發現和補賬。當然,有些業務自己會實現主動查詢機制。通過定期或人為(可能是用戶查詢,也可能是後台查詢)觸發查詢下游,並同步狀態。

4. 部署。如果有條件,最好獨立部署。作為proxy,可能性能瓶頸在網路IO上。所以消息體不應過大。一種常見做法是Producer發給下游的只是一個Id,下游獲得該消息後,還得查一下producer才能拿到全部信息。實際情況還是看量。

5. 壓測。線上線下壓測還是要經常做一做的。上線前做一次可能不夠。因為邏輯可能會有變化。線下壓測可能也不夠,因為線上應用場景跟線下模擬的可能不同。線上線下機器配置可能也不一致。所以有條件做全鏈路壓測,那就圓滿了。全鏈路壓測是個浩大的工程,大廠似乎都在玩。比如微信動不動就演練....


推薦閱讀:

聊聊分散式
Spring整合Quartz分散式調度
分散式事務提交協議: 2PC/3PC
分散式事務解決方案與適用場景分析
分散式系統數據層設計模式

TAG:分散式事務 | 分散式一致性 | 交易系統 |