Kafka濫用案例
之前使用kafka來做隊列,但是當我重構時發現,worker處理一個message的時間有時候需要幾分鐘,處理完茶都涼了,consumer和partition的連接都斷了,沒法commit了。
試了很多方法都不行,最後改成了下邊這個醜樣子:
def run(self):n consumer = KafkaConsumer(n bootstrap_servers=conf.KAFKA_CONF[host],n group_id=self.group_id,n enable_auto_commit=False,n auto_offset_reset=earliest,n api_version=(0, 9)n ) n n consumer.subscribe([self.topic])n n while not self.exit.is_set():n poll_msgs = consumer.poll(timeout_ms=5000, max_records=1)n for partition in poll_msgs:n offset = consumer.committed(partition)n try:n consumer.commit({partition: OffsetAndMetadata(offset+1, None)})n except Exception, e:n Log.error(e)n continuenn msgs = poll_msgs[partition]n for msg in msgs:n self.process(msg)n n consumer.close()n Log.info(%s shutdown % self.name)n
也就是在處理前先commit,如果commit不了了則不處理該partition的任務,等到下一輪poll的時候(poll的時候partition就又連上了)再commit和處理該分區的任務。
真丑。
這個問題的本質在於kafka並不適合我們這個業務場景,kafka的設計是針對實時,高吞吐的場景,它設計時候就不會考慮一個message處理幾分鐘的情況,所以用起來會變扭死。
正確的做法是技術選型的時候就不應該用Kafka,應該換用其它隊列比如redis。
推薦閱讀:
※kafka解決了什麼問題?
※如何在.net中使用Apache Kafka
※Kafka(二)高可用系統設計心得
※Kafka Connect內部原理
※如何使用Kafka在生產環境構建大規模機器學習
TAG:Kafka |