RAID 6 應用於消息隊列

The Problem

Kafka 是一個很流行的消息隊列。但是在使用中,我們發現目前的消息隊列設計仍然有改進的空間

問題出在從在線業務系統往kafka master寫入的過程中。kafka給業務系統提出了兩難的選擇

  • 當往kafka master的網路出問題了。或者kafka master自身在選主的時候。要麼業務系統選擇放棄latency,去等待故障修復
  • 要麼業務系統選擇丟數據。在kafka master不可用的時候把消息給扔掉

這兩個選擇都很艱難。kafka雖然號稱是一個分散式的系統,但是對於單partition的寫入仍然是單點的。

解決辦法也很簡單。就是如果partition 1寫入失敗,就去寫partition 2。不同partition的master可以是不同的節點。或者可以同時寫多個partition,一個成功就算成功。代價是放棄了單partition的消息有序性,以及更多的消息冗餘而且消費方要做更多的努力去保持冪等。

第二個問題在master和replica之間的複製。如果不等任何一個replica複製成功就返回,master掛了就會丟數據。如果需要replica,則顯著增加寫入的延遲,並且佔用更多的資源。

我們來看看是否可以應用 RAID6 的磁碟存儲技術到消息隊列上。

衝突是可以避免的

我們先來看是否可以把中心化的master節點給幹掉?形成這樣的部署結構呢?

其中寫入代理和在線業務系統部署在同一台機器上。當然寫入代理也可以是寫入的sdk,做為lib嵌入到在線業務系統里。讀取代理和離線系統部署在同一台機器上,也可以是讀取的sdk,做為lib嵌入到離線系統里。

但是這樣會引起一個很嚴重的問題,offset衝突。kafka通過把給定partition的數據交給唯一的一個master機器來寫入來分配這個offset。沒有了這個中心化的master,怎麼保證多個在線業務系統的機器在寫入的時候不會產生offset衝突呢?

這個offset可以使用機器的timestamp的nanosecond。這個選擇聽起來很糟糕對不對?

  • 不要求多個寫入機器之間的絕對有序。本來前序的處理就是亂序的負載均衡的。多個機器生產的數據嚴格排序大部分場景上下沒有收益。使用timestamp可以使得總體上來說基本有序。
  • nanosecond timestamp分配的offset是為了盡量避免多個producer寫入到同一個offset,產生寫入衝突。
  • nanosecond是十億分之一秒。也就是說按照大部分業務生產的速度來說,衝突的概率是很小的。
  • 如果寫入發生了衝突,則offset+一個random的整數,然後重試。

然後問題就是怎樣知道寫入衝突了

每個replica對應到一個offset就是一個洞。offset與offset之間不需要連續。如果是6個replica,那麼每個offset就有6個洞要填。每個repica保證每個洞只能被寫入一次。那麼多個寫入方對於同一個offset就是爭搶這6個洞。比如6個寫入成功了4份就可以認為寫入成功了。因為其他的producer只剩2個洞可以用,它們在這個offset肯定寫不成功的。

但是如果一個producer寫成功了3個洞,另外一個producer寫成功了3個洞,這樣怎麼辦?這種情況就是兩個producer都寫入失敗了。這個offset就算作廢了。讀取的時候不能只讀一個洞就認為讀到了值。讀取也要讀到了4份完全一致的數據才算讀取成功了。

這樣的好處就是如果是要寫6份,寫成功了4份就算成功。可以並行地啟動6份寫入,然後只要等到4份ACK了就可以認為寫入成功了,剩下的2份非同步處理就可以了。一方面容忍了三分之一機器的故障。同時也容忍了三分之一機器出現寫入慢的長尾問題。最大化的利用了不把雞蛋放到一個籃子里的好處。

但是消費方怎麼辦?它如何能讀取到所有producer寫入的數據呢?畢竟producer之間的時鐘不是完全同步的。有可能讀到了offset10024之後,之前的offset10018又被寫入了,這條消息不就被漏掉了嗎?解決辦法就是延遲讀取。只要滯後500ms就可以cover大部分NTP不同步引起的問題了。對於大部分准實時和離線業務來說,500ms的延遲讀取不算什麼大的問題。另外一個辦法就是讓讀取的窗口重疊。比如之前是[0, 500ms], [500ms, 1000ms],現在就改成[0, 400ms], [200ms, 600ms], [400ms, 800ms],以讀取放大的代價來避免漏掉消息。

完全逐條的硬實時大部分情況下就是個偽需求。micro batch is good enough。micro batch比逐條的模型,減少了單個offset產生的contention。

複製是可以避免的

基於多副本複製的另外一個問題是因為複製引起的磁碟,網路和cpu的消耗。如果複製6份寫入,4份成功就算成功的話,1份數據就要冗餘出5份來做寫入。雖然通過並發寫入降低了延遲,但是總的寫入放大仍然是非常可觀的。對於一些價值可能並不高的數據,也許不希望有這麼多副本。這個也是kafka設計ISR的模式的出發點,希望儘可能少的減少副本數量,降低成本。

如何既保持6副本,但是減少總的寫入放大呢?也許我們可以嘗試RAID 6,通過reed solomon編碼,把一份消息切成6份,存入6個節點裡。只要4份寫入成功了,就算寫入完了。這樣寫入放大的成本僅僅是reed solomon編碼帶來的冗餘。而且實際需要同步等待寫入的只是6份中的4份而已。

reed solomon編碼的問題是編碼慢,解碼慢。編碼可以用SSE進行優化,比如klauspost/reedsolomon

解碼慢可以用多個消費方共享同一份拷貝來解決。本質上是一份讀緩存。這份緩存不用考慮可用性,隨時可以用原始的拷貝重建。

總結

本文構想了一種降低消息隊列寫入延遲和提高可用性的方法。把kafka的單partition從CP系統變成了AP系統。同時減少了總體的持久存儲成本。


推薦閱讀:

Kafka(一)初識消息隊列
Kafka濫用案例
kafka解決了什麼問題?
Kafka Connect內部原理
《Simplifying data pipelines with Apache Kafka》課程第四章Kafka Consumer問題集

TAG:Kafka | 消息队列 | 分布式系统 |