kafka的高可靠性實現方案及保障機制

kafka的高可靠性實現方案及保障機制

來自專欄大象跳舞4 人贊了文章

1.文件存儲機制

前面提到消息在邏輯上以topic存在,而在物理存儲上是partition,在實際存儲中partition是作為目錄存在於broker上。

假如server.properties中log.dirs和num.partitions配置如下:

log.dirs=/tmp/kafka-logs

num.partitions=4

同時創建一個名為my_test_topic的topic,則會在/tmp/kafka-logs/下面生成四個目錄,分別為:

my_test_topic-0

my_test_topic-1

my_test_topic-2

my_test_topic-3

這四個目錄對應就是my_test_topic的四個分區,由此也可以看到分區的命名規則為topic名稱+以0開頭的序號。

每個partition又包含多個segment文件,一個segment文件由.index文件和.log兩個文件組成,分別為segment的索引

文件和數據文件,這兩個文件的命名基於同一個topic下的所有partition,即每個partition下的所有segment文件命名

都不一樣。具體的命名規則為:partition全局的第一個segment從0開始,後續每個segment文件名為上一個segment文件最後一條消息的offset值,數值大小為64位,20位數字字元長度,沒有數字用0填充,如下:

00000000000000000000.index

00000000000000000000.log

00000000000000170410.index

00000000000000170410.log

00000000000000239430.index

00000000000000239430.log

index索引文件用來存儲元數據,log文件用來存儲消息,索引文件中的元數據指向數據文件中消息的物理偏移地址。

兩者關係如下圖示:

以index文件中(64 785)這組記錄為例,它表示在00000000000000239430.log這個log文件中是第64個消息,

在全局為第239494個消息,即offset,它的物理偏移地址為785。consumer使用offset從partition讀取消息依賴於上述關係。

舉例說明:

假如要讀取offset=239432的消息,使用二分查找定位到到00000000000000239430.index,然後根據偏移量定位到

消息的物理地址偏移量為10,然後讀取Message239432。因為消息中記錄了offset、magic、crc32、length等信息

所以可以直到在哪裡結束讀取。

2.消息發送機制

2.1消息發送模式

Kafka的發送模式由producer端的配置參數producer.type來設置,這個參數指定了在後台線程中消息的發送方式是同步的還是非同步的,默認是同步的方式,即producer.type=sync。如果設置成非同步的模式,即producer.type=async,可以是producer以batch的形式push數據,這樣會極大的提高broker的性能,但是這樣會增加丟失數據的風險。如果需要確保消息的可靠性,必須要將producer.type設置為sync。

對於非同步模式,有如下四個參數可以設置:

參數 | 描述

--- | --

queue.buffering.max.ms | 默認值:5000。啟用非同步模式是,producer緩存消息的時間。比如設置成1000,

它會緩存1s的數據然後發送出去,這樣可以極大的增加broker的吞吐量,但是會降低數據的失效性。

queue.buffering.max.message | 默認值:10000。producer緩存隊列里的最大緩存消息數,如果超過這個值,producer就會阻塞或者丟棄消息。

queue.enqueue.timeout.ms | 默認值:-1。當達到上面參數時producer會阻塞等待的的時間。如果設置為0,

隊列滿時producer不會阻塞,消息會被直接丟棄。若設置為-1,producer會阻塞,不會丟棄消息。

batch.num.message | 默認值:200。啟用非同步模式時,一個batch緩存消息的數量,當達到這個數值時producer才會發送。

2.2數據可靠性級別

當producer向leader發送數據時,可以通過request.required.acks參數來設置數據可靠性的級別。

參數設值 | 描述

--- | --

1(默認) | producer在數據replication的leader收到數據並且得到確認後發送下一條數據。如果leader宕機,消息丟失。

0 | producer無需確認leader的回復而繼續發送,這種情況下數據可靠性最低,但是傳輸效率最高。

-1 | producer需要等待replication中所有的follower都確認收到數據後才發送下一條數據。可靠性最高,但是當只有leader而沒有follower的時候和request.required.acks=1一樣。

3.副本同步機制

kafka每個topic的partition有N(N>=1)個副本,其中N由replica factor這個配置項決定。kafka通過多個副本實現故障

的自動轉移,當kafka集群中一個節點掛掉後可以保證服務仍然正常。多個副本中有一個副本為leader,其餘為follower。

leader負責處理針對該partition的所有讀寫請求,follower能自動的從leader複製數據。下圖是一個有4個broker、3個partition(同一個顏色深度為一個partition)、replica factor=3的kafka集群中leader和follower之間關係的示意圖。

關於leader和follower之間的數據複製,考慮下面幾個問題:

1.當leader掛掉後,會通過選舉機制從follower中產生一個新的leader。加入之前的leader已有的數據條數為5,被選舉

的follower只複製了其中三條,那麼consumer從新的leader獲取數據就會出現問題,這種情況下如何保證數據正確性?

2.若某個follower出現故障,導致複製數據出現異常,該如何保證系統可靠性?

針對上述問題,kafka引入了HW(HighWaterMark)和ISR(In Sync Replicas),下面分別介紹。

3.1 HW

和HW相關的還有LEO,LEO是LogEndOffset的縮寫,標識每個partition的log最後一條message的位置。HW是指

consumer能夠讀到的此partition最新的位置。

3.2 ISR

ISR是指副本同步,副本對於kafka的吞吐率有一定的影響,但是極大的增強了可用性,默認情況下Kafka的replica數量為1,即每個partition都有一個唯一的leader,為了確保消息的可靠性,通常應用中將其值(由broker的參數offsets.topic.replication.factor指定)大小設置為大於1,比如3。所有的副本(replicas)統稱為Assigned Replicas,即AR。ISR是AR中的一個子集,由leader維護ISR列表,follower從leader同步數據有一些延遲(包括延遲時間replica.lag.time.max.ms和延遲條數replica.lag.max.messages兩個維度, 當前最新的版本0.10.x中只支持replica.lag.time.max.ms這個維度),任意一個超過閾值都會把follower剔除出ISR, 存入OSR(Outof-Sync Replicas)列表,新加入的follower也會先存放在OSR中,AR=ISR+OSR。

3.3基於HW和ISR的消息同步

一個partition中取ISR中最小的LEO作為HW,consumer最多只能消費到HW所在的位置。另外每個replica都有HW,leader和follower各自負責更新自己的HW的狀態。對於leader新寫入的消息,consumer不能立刻消費,leader會等待該消息被所有ISR中的replicas同步後更新HW,此時消息才能被consumer消費。這樣就保證了如果leader所在的broker失效,該消息仍然可以從新選舉的leader中獲取。對於來自內部broKer的讀取請求,沒有HW的限制。

(1) follower和leader數據一致,HW和LEO也一致,無需複製。

(2)leader接收到producer寫入的數據,此時LEO移動,兩個follower開始複製數據。

(3)follower1完全複製了消息,follower2隻複製了一部分,HW移動一位。

(4)follower2完全複製消息,HW和LEO一致。

由此可見,Kafka的複製機制既不是完全的同步複製,也不是單純的非同步複製。事實上,同步複製要求所有能工作的follower都複製完,這條消息才會被commit,這種複製方式極大的影響了吞吐率。而非同步複製方式下,follower非同步的從leader複製數據,數據只要被leader寫入log就被認為已經commit,這種情況下如果follower都還沒有複製完,落後於leader時,突然leader宕機,則會丟失數據。而Kafka的這種使用ISR的方式則很好的均衡了確保數據不丟失以及吞吐率。

3.4文件截斷機制

如上圖,某個topic的某partition有三個副本,分別為A、B、C。A作為leader肯定是LEO最高,B緊隨其後,C機器由於配置比較低,網路比較差,故而同步最慢。這個時候A機器宕機,這時候如果B成為leader,假如沒有HW,在A重新恢復之後會做同步(makeFollower)操作,在宕機時log文件之後直接做追加操作,而假如B的LEO已經達到了A的LEO,會產生數據不一致的情況,所以使用HW來避免這種情況。 A在做同步操作的時候,先將log文件截斷到之前自己的HW的位置,即3,之後再從B中拉取消息進行同步。

如果失敗的follower恢復過來,它首先將自己的log文件截斷到上次checkpointed時刻的HW的位置,之後再從leader中同步消息。leader掛掉會重新選舉,新的leader會發送「指令」讓其餘的follower截斷至自身的HW的位置然後再拉取新的消息。

4.consumer消息保證

有三種級別的消息保證:

At most once: 消息可能會丟,但絕不會重複傳輸

At least once:消息絕不會丟,但可能會重複傳輸

* Exactly once:每條消息肯定會被傳輸一次且僅傳輸一次

(1)先commit offset再消費消息,如果在offset被commit之後但消息沒有被消費時consumer宕機,則會丟失消息,實現at most once。

(2)先消費消息,再commit offset,如果在offset被commit之前consumer宕機,當重啟時會重複消費。實現at least once。

(3)要實現exactly once則需要對commit offset和消息消費加上事務,或者對at least once加上去重機制。

推薦閱讀:

rabbitmq-延遲隊列
Rabbitmq系列之1--基礎概念
任務與消息隊列
Python操作rabbitmq系列(四):根據類型訂閱消息

TAG:RabbitMQ | 分散式系統 | Kafka |