《Kafka:The Definitive Guide》第四章Kafka Consumer問題集
有,有很多概念上的不同,比如offset。
2.為什麼有consumer group的概念?
因為一個consumer不夠用啊,當consumer有瓶頸的時候就需要開多個consumer,這時候這一組consumer就叫consumer group。
3.partition rebalance的過程?
如果開始有2個consumer,4個partition。
增加consumer到4個,此時進行rebalance。
如果再加1個consumer,那麼這個consumer就沒活幹了,因為一個partition只能由一個consumer來消費。4.如何設置partition的數量?要大於你的consumer group中consumer的個數。
5.為什麼需要多個consumer group?
因為可能有不同的系統需要消費這個topic。
6.什麼時候會partition rebalance?partition rebalalnce就是partition重新分配給consumer的過程。
a.有新的consumer加入的時候
b.有consumer掛了的時候
c.當添加了新的partition的時候
7.rebalance為系統的什麼特性保駕護航?
高可用性和可擴展性。
8.consumer之間的member關係(consumerGroup)是如何維護的?
依靠GroupCoordinator來維護。consumer定時給group coordinator發心跳。
實際上在api層,poll()就是發心跳的時機。
9.如果一段時間不發心跳會怎麼樣?
如果長時間不發心跳,那麼session timeout後,group coordinator就認為這個consumer掛了,然後進行partition rebalance。
10.consumer主動close和被動掛掉,對於rebalance有什麼不同的影響?
主動close,那麼group coordinator會馬上rebalance,這個partition的message會馬上被其它consumer處理。
而被動掛掉,需要session timeout,那麼可能有一段時間這個partition的數據沒有consumer處理,引起較大latency。
11.group leader是幹嘛的?
group leader相當於班長,group coordinator相當於班主任。
group coordinator負責維護consumer group的列表,然後把這個列表給到group leader,group leader進行partition balance的任務。
consumer的心跳是打到group coordinator的。
12.consumer.poll()除了可以拉到message,還有什麼用處?
必須不斷的poll()才能保持心跳。所以message的處理一定要快!
13.poll的timeout參數有什麼作用?
這個參數應該是個latency-throughout tradeoff。設的小一些可以減少latency,快速響應,增大則可以提高吞吐,但latency增大。
參數基本上都是tradeoff,要不然就不需要參數了,代碼里直接寫死最優參數就好了。
14.什麼是commit?
We call the action of updating the current position in the partition a commit.
15.consumer是如何commit的?
發送一條message給topic __consumer_offsets,message包含每個partition上的offset。
16.如果consumer crash可能會有什麼影響?
如果consumer已經消費的message沒有commit,那麼就會重複消費這部分message。
如果consumer沒有消費的message卻commit了,那麼這部分message就不會被真實消費了。
17.consumer有哪些管理offset的方式?
a.automatic commit
一個控制commit interval的參數,叫auto.commit.interval.ms。
那一定是每隔interval就會commit一次嘛?
不是,commit是在poll()的時候,判斷距離上次commit是否超過了inteval,超過則同時進行commit,沒超過則不commit。
b.commit sync
同步commit,即調用commitSync(),該函數是阻塞的,等待commit完成後返回。
這種方式就是latency小,但是無法達到高吞吐。
c.commit async
非同步commit,除了不阻塞意外,和同步commit的區別是同步commit會重試,比如網路有問題,那麼同步commit會重試到成功或失敗。而非同步不會重試。
原因是如果重試的話,可能會出現offset 3000先commit了,然後重試的offset 2000才到,這時候就會重複消費了。(即commit order問題)
所以是否sync/async,是一種reliable-throughout的權衡,一般大數據還是高吞吐更重要。reliable可以同步業務系統的冪等性達到。
18.那是用sync好還是async好呢?
一般來說,用async的方式沒有太大問題,因為即便中間的offset commit丟了,只要後邊有成功的commit就行了。
但是如果這是最後一次commit,之後就close了,或者rebalance了,那麼這次commit就很重,不能丟。所以最後這次commit是要同步commit的。所以一個經典的pattern是同時使用async和sync。
19.上面提到的commitAsync()和commitSync(),都是提交之前poll的所有message,那麼如果我們想commit指定的offset可以嘛?可以。
比如下面是每一千條commit一次。
20.可以消費指定offset的數據嘛?可以seek。一般用於把offset存到外部系統的時候。
21.如何保證consume exactly once呢?
先來看這個有問題的代碼,有可能consumer在store record之後掛掉,那麼就重複消費兩次這個消息:
如何保證只消費一次呢?就讓process store commit是一個原子操作就可以了。
然後在rebalance的時候,接收rebalance event,seek到db中保存的offset就可以了。
22.如果想保證exactly consume once,那麼只能把offset保存在db中嘛?commit給kafka的方式一定不行嘛?是的,commit的方式不行,因為exactly once是利用了db的transaction實現的,目前kafka沒有transaction。但是kafka在做這項功能了。23.consumer有哪些parameter?
fetch.min.bytes : latency-throughout tradeoff
fetch.max.wait.ms : latency-throughout tradeoff
max.partition.fetch.bytes: 如果太大的話處理時間過長,可能導致session timeout
session.timeout.ms : 默認3s
推薦閱讀:
※大數據平台開發人員的核心競爭力是什麼?
※Kafka Consumer消費能力較低時的解決方案
※Kafka猛然醒來,突然變成了資料庫
※Kafka(二)高可用系統設計心得
※Kafka Connect內部原理
TAG:Kafka |