標籤:

Kafka濫用案例

最近再做一個master-worker式的任務調度程序,這個第一版是同事做的,我負責重構。代碼結構在這篇文章里

之前使用kafka來做隊列,但是當我重構時發現,worker處理一個message的時間有時候需要幾分鐘,處理完茶都涼了,consumer和partition的連接都斷了,沒法commit了。

試了很多方法都不行,最後改成了下邊這個醜樣子:

def run(self):n consumer = KafkaConsumer(n bootstrap_servers=conf.KAFKA_CONF[host],n group_id=self.group_id,n enable_auto_commit=False,n auto_offset_reset=earliest,n api_version=(0, 9)n ) n n consumer.subscribe([self.topic])n n while not self.exit.is_set():n poll_msgs = consumer.poll(timeout_ms=5000, max_records=1)n for partition in poll_msgs:n offset = consumer.committed(partition)n try:n consumer.commit({partition: OffsetAndMetadata(offset+1, None)})n except Exception, e:n Log.error(e)n continuenn msgs = poll_msgs[partition]n for msg in msgs:n self.process(msg)n n consumer.close()n Log.info(%s shutdown % self.name)n

也就是在處理前先commit,如果commit不了了則不處理該partition的任務,等到下一輪poll的時候(poll的時候partition就又連上了)再commit和處理該分區的任務。

真丑。

這個問題的本質在於kafka並不適合我們這個業務場景,kafka的設計是針對實時,高吞吐的場景,它設計時候就不會考慮一個message處理幾分鐘的情況,所以用起來會變扭死。

正確的做法是技術選型的時候就不應該用Kafka,應該換用其它隊列比如redis。

推薦閱讀:

kafka解決了什麼問題?
如何在.net中使用Apache Kafka
Kafka(二)高可用系統設計心得
Kafka Connect內部原理
如何使用Kafka在生產環境構建大規模機器學習

TAG:Kafka |