標籤:

《Kafka:The Definitive Guide》第四章Kafka Consumer問題集

1. Kafka consumer和其它隊列中間件的使用上有很大區別嘛?

有,有很多概念上的不同,比如offset。

2.為什麼有consumer group的概念?

因為一個consumer不夠用啊,當consumer有瓶頸的時候就需要開多個consumer,這時候這一組consumer就叫consumer group。

3.partition rebalance的過程?

如果開始有2個consumer,4個partition。

增加consumer到4個,此時進行rebalance。

如果再加1個consumer,那麼這個consumer就沒活幹了,因為一個partition只能由一個consumer來消費。

4.如何設置partition的數量?

要大於你的consumer group中consumer的個數。

5.為什麼需要多個consumer group?

因為可能有不同的系統需要消費這個topic。

6.什麼時候會partition rebalance?

partition rebalalnce就是partition重新分配給consumer的過程。

a.有新的consumer加入的時候

b.有consumer掛了的時候

c.當添加了新的partition的時候

7.rebalance為系統的什麼特性保駕護航?

高可用性和可擴展性。

8.consumer之間的member關係(consumerGroup)是如何維護的?

依靠GroupCoordinator來維護。consumer定時給group coordinator發心跳。

實際上在api層,poll()就是發心跳的時機。

9.如果一段時間不發心跳會怎麼樣?

如果長時間不發心跳,那麼session timeout後,group coordinator就認為這個consumer掛了,然後進行partition rebalance。

10.consumer主動close和被動掛掉,對於rebalance有什麼不同的影響?

主動close,那麼group coordinator會馬上rebalance,這個partition的message會馬上被其它consumer處理。

而被動掛掉,需要session timeout,那麼可能有一段時間這個partition的數據沒有consumer處理,引起較大latency。

11.group leader是幹嘛的?

group leader相當於班長,group coordinator相當於班主任。

group coordinator負責維護consumer group的列表,然後把這個列表給到group leader,group leader進行partition balance的任務。

consumer的心跳是打到group coordinator的。

12.consumer.poll()除了可以拉到message,還有什麼用處?

必須不斷的poll()才能保持心跳。所以message的處理一定要快!

13.poll的timeout參數有什麼作用?

這個參數應該是個latency-throughout tradeoff。設的小一些可以減少latency,快速響應,增大則可以提高吞吐,但latency增大。

參數基本上都是tradeoff,要不然就不需要參數了,代碼里直接寫死最優參數就好了。

14.什麼是commit?

We call the action of updating the current position in the partition a commit.

15.consumer是如何commit的?

發送一條message給topic __consumer_offsets,message包含每個partition上的offset。

16.如果consumer crash可能會有什麼影響?

如果consumer已經消費的message沒有commit,那麼就會重複消費這部分message。

如果consumer沒有消費的message卻commit了,那麼這部分message就不會被真實消費了。

17.consumer有哪些管理offset的方式?

a.automatic commit

一個控制commit interval的參數,叫auto.commit.interval.ms。

那一定是每隔interval就會commit一次嘛?

不是,commit是在poll()的時候,判斷距離上次commit是否超過了inteval,超過則同時進行commit,沒超過則不commit。

b.commit sync

同步commit,即調用commitSync(),該函數是阻塞的,等待commit完成後返回。

這種方式就是latency小,但是無法達到高吞吐。

c.commit async

非同步commit,除了不阻塞意外,和同步commit的區別是同步commit會重試,比如網路有問題,那麼同步commit會重試到成功或失敗。而非同步不會重試。

原因是如果重試的話,可能會出現offset 3000先commit了,然後重試的offset 2000才到,這時候就會重複消費了。(即commit order問題)

所以是否sync/async,是一種reliable-throughout的權衡,一般大數據還是高吞吐更重要。reliable可以同步業務系統的冪等性達到。

18.那是用sync好還是async好呢?

一般來說,用async的方式沒有太大問題,因為即便中間的offset commit丟了,只要後邊有成功的commit就行了。

但是如果這是最後一次commit,之後就close了,或者rebalance了,那麼這次commit就很重,不能丟。所以最後這次commit是要同步commit的。所以一個經典的pattern是同時使用async和sync。

19.上面提到的commitAsync()和commitSync(),都是提交之前poll的所有message,那麼如果我們想commit指定的offset可以嘛?

可以。

比如下面是每一千條commit一次。

20.可以消費指定offset的數據嘛?

可以seek。一般用於把offset存到外部系統的時候。

21.如何保證consume exactly once呢?

先來看這個有問題的代碼,有可能consumer在store record之後掛掉,那麼就重複消費兩次這個消息:

如何保證只消費一次呢?就讓process store commit是一個原子操作就可以了。

然後在rebalance的時候,接收rebalance event,seek到db中保存的offset就可以了。

22.如果想保證exactly consume once,那麼只能把offset保存在db中嘛?commit給kafka的方式一定不行嘛?

是的,commit的方式不行,因為exactly once是利用了db的transaction實現的,目前kafka沒有transaction。但是kafka在做這項功能了。

23.consumer有哪些parameter?

fetch.min.bytes : latency-throughout tradeoff

fetch.max.wait.ms : latency-throughout tradeoff

max.partition.fetch.bytes: 如果太大的話處理時間過長,可能導致session timeout

session.timeout.ms : 默認3s

推薦閱讀:

大數據平台開發人員的核心競爭力是什麼?
Kafka Consumer消費能力較低時的解決方案
Kafka猛然醒來,突然變成了資料庫
Kafka(二)高可用系統設計心得
Kafka Connect內部原理

TAG:Kafka |