Kafka Consumer消費能力較低時的解決方案
背景
隨著業務的發展,項目組有大量的任務需要處理。
這些任務需要主要分為兩種類型:
- 通過介面調用, 後台執行任務
- 通過調度系統定時執行
介面調用就需要執行任務不能阻塞, 不然系統的處理能力就會下降。任務調度系統需要在在一個最小的檢測粒度時間內,執行完所有任務。這兩種情況都面臨這樣一個問題, 任務不能阻塞,不然會非常影響性能。所以需要引入消息中間件,將任務派發方和任務執行方分離出來。
在這種情況下, 我們選擇了kafka作為了我們的消息中間件, 選擇kafka主要基於以下幾點:
- 支持分散式, 避免單點問題
- 技術方案成熟, 公司內部有上線項目
- 性能優異, 能夠持久化消息
遇到的問題
我們團隊在kafka使用上面都沒有經驗, 其他同事說kafka consumer在消費超時後會掉線,導致重複消費,當時沒有這個使用場景,不能理解這個概念。
第一次發現問題是在聯調的時候,任務執行方發現consumer會列印出錯誤日誌,重複消費,並且陷入循環。
當時很快定位到問題, consumer長時間沒有發送心跳包, 導致觸發rebalance操作, consumer被踢下線了。
對於這個問題,需要詳細講述一下kafka consumer相關的機制。
kafka為了保證partition分配的高效率, 使用了如下機制:
- 所有的consumer都要和coordinator連接
- coordinator選出一個consumer作為leader來分配partition
- leader分配完以後通知coordinator, 由coordinator來通知給其他consumer
- 如果一個consumer不能工作了, coordinator會觸發rebalance機制,重新分配partition
coordinator判定一個consumer不能工作, 依靠的就是heartbeat機制。consumer的配置裡面有一項是session_timeout,如果heartbeat不能在session_timeout時間內發出一次請求,coordinator就會觸發一次rebalance操作,重新分配partition。
從上面這樣看沒什麼問題,很多系統都是這麼設計的,一個工作線程,一個心跳包線程。但是kafka consumer為了設計上的簡單(或者是出於其他目的),他們只有一個線程,也就是說工作邏輯和心跳包邏輯是同步的。對於心跳包這種定時任務,他們使用了一種叫做delayed_task的方案。
delayed_task是Best-Effort的,為什麼這麼說呢,我們來看看delayed_task是在什麼時候工作的:
- 取出一批數據
- 執行delayed_task
- 循環yield 這批數據
- 重複執行上述過程
前面我們也說過, consumer只有一個線程, 也就意味著,如果主邏輯消耗了大量時間,delayed_task中的任務就會延期執行。在這種情況下, delayed_task只能保證任務不會提前進行,不能保證任務準時執行。拿一個具體的場景來說, 如果主邏輯花費了60s, 那麼delayed_task中的任務最早也只能在60s之後執行,像heartbeat任務就直接超時了。
在提出解決方案之前, 我們需要考慮一下幾個問題:
生產者速度大於消費者速度怎麼處理
如果生產者速度大於消費者速度,消息就會積累。常規的解決方案是增加partition,增加消費者數量,但是在某一些場景下卻不能這麼實現。思考一下,如果生產者的速度不是恆定的,而是波動的,並且波峰和波谷差距比較大,大部分時間出於波谷,這樣在波谷時其實資源是閑置的,並且會降低消費速度。另外對於消費的實時性比較高的場景,如果短時間內消息被積壓,縱然最後能夠消費掉,但是已經過了有效期,這樣的消費其實是無效的。
所以我們必須有能力知道兩個數據,即當前隊列剩餘的消息的數量和當前消息產生的時間。
在消費速度不一致的情況下如何提交offset
kafka-consumer的offset的提交機制是定時向delayed_task裡面加入一個AutoCommitTask。但是在消費者消費速度不均衡的情況下不能這麼做,如果消費者消費速度比較快,定時提交offset的機制會使得一旦consumer宕機,會丟失一大批消費信息。
同時我們也不能單純的以消費數量作為是否提交的閾值,在消費者比較消費速率比較慢的情況下,一旦consumer宕機,我們會耗費大量時間在無用的消費上面。
所以我們需要同時衡量數量和時間兩個變數,作為我們是否提交的閾值offset提交失敗該怎麼處理
consumer的offset提交是按照TopicPartition作為提交單元的。在consumer消費過程中,可能會發生reblance事件,如果當前consumer分配到的partition數量大於1個,可能這個partition會被分配給其他的consumer。在這個過程中,consumer已經消費了該條數據,那麼在提交offset的時候,就會遇到CommitOffsetError,因為這個partition已經不屬於自己了。
這種情況下該如何處理這些數據解決方案
帶著上面的一些問題,我們開始著手提出解決方案。
從上面的分析可以看出來, consumer掉線的最主要問題就是delayed_task和主函數出於同一個工作線程中,那麼最直觀的解決方法就是將這兩個分離出來。
由於python GIL的限制,加上kafka consumer 是線程不安全的, 所以我們使用多進程來解決這個問題。
在consumer中,除了迭代器_message_generator之外,還提供了一個poll函數。這個函數和迭代器功能差不多,也能夠獲取消息,同時也會執行delayed_task。不同之處是, 這個函數會一次性返回一批數據,這樣我們就有能力統計剩下的消息的數量。同時我們要求在producer發送消息的時候,一定要帶上create_time這個欄位,標註消息產生的時間。客戶端現在同時能獲取數量和時間兩個參數,對於實時性要求比較高的場景,他就可以選擇性的丟棄一批不滿足要求的數據。
當消費者消費速度比較低的時候,我們需要停止獲取數據,但是同時不能停下delayed_task。幸運的是,consumer提供了一個pause的函數,可以讓我們停止對應的partition。一旦使用pause函數,poll函數將不會返回任何數據,單他依然會執行delayed_task。
由於我們使用poll函數一次性返回多個數據,加上在消費速度不均衡的情況下offset管理的問題。所以我們必須要手動管理offset, 保存我們上次提交offset的時間和未提交offset的數量,一旦其中某一個達到閾值,就真正的提交offset。
當我們提交offset失敗的時候,我們需要清除對應的partition的所有數據,防止consumer做無用消費。
綜合上面,我們就有能力構造出一個強健的consumer客戶端,方便其他同學來使用。
核心代碼
while True:n topic_records = self.consumer.poll().values()n if not topic_records:n self.get_offset()n time.sleep(self.config[idle_timeout])n self.consumer.pause(*self.consumer.assigment())n paused = Truen for records in topic_records:n remain = len(records)n for record in records:n while True:n data = {"record":record, "remain": remain}n try:n self.task_queue.put(data, self.config[block_timeout])n remain -= 1n breakn except Full:n self.consumer.poll()n self.get_offset()n if self.task_queue.qsize < self.config[resumen_count] and paused:n partitions = self.consumer.paused()n if partitions:n self.consumer.resume(*partitions)n
推薦閱讀:
※Kafka Connect內部原理
※Kafka 安裝及快速入門
※Kafka濫用案例
TAG:Kafka |