Kafka offset retention

While consuming Kafka messages, I noticed that if a partition goes a whole day without being consumed, its committed offset comes back as None.

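After some digging, it turns out that committed offsets have a retention mechanism too (the broker setting `offsets.retention.minutes`), and by default they are reclaimed after one day. That one-day default applies to older brokers; Kafka 2.0 raised it to seven days.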
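To make the timing concrete, here is a minimal sketch of the expiry rule as I understand it for older brokers: a committed offset is dropped once the retention window has passed since its last commit. The function name `offset_expired` and the 1440-minute default are illustrative assumptions and not part of any Kafka API, and newer brokers handle expiry differently, so treat this as a simplified model only.

```python
import datetime


def offset_expired(last_commit, now, retention_minutes=1440):
    """Return True if an offset last committed at `last_commit` is past retention at `now`.

    Simplified model: expiry is measured from the last commit time only.
    """
    age = now - last_commit
    return age > datetime.timedelta(minutes=retention_minutes)


committed_at = datetime.datetime(2018, 1, 1, 12, 0, 0)

# Partition idle for 25 hours: the committed offset is past the one-day window.
print(offset_expired(committed_at, committed_at + datetime.timedelta(hours=25)))  # True

# Offset re-committed 5 minutes ago: still well inside the window.
print(offset_expired(committed_at, committed_at + datetime.timedelta(minutes=5)))  # False
```

Under this model, re-committing every few minutes keeps the commit time fresh, so an idle partition never reaches the retention limit.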

So the consumer code ended up looking like the version below. That is a lot of code for a single consumer... ugly as sin...

    import datetime

    from kafka import KafkaConsumer
    from kafka.structs import OffsetAndMetadata

    # conf and Log are this project's own config and logging modules.

    def run(self):
        consumer = KafkaConsumer(
            bootstrap_servers=conf.KAFKA_CONF['host'],
            group_id=self.group_id,
            enable_auto_commit=False,
            auto_offset_reset='latest',
            api_version=(0, 9)
        )

        consumer.subscribe([self.topic])

        last_commit_time = datetime.datetime.now()
        offset_commit_interval = 300  # seconds between keep-alive commits

        while not self.exit.is_set():
            poll_msgs = consumer.poll(timeout_ms=5000, max_records=1)

            # Periodically re-commit the current offset of every assigned
            # partition so that idle partitions do not hit offset retention.
            now = datetime.datetime.now()
            if (now - last_commit_time).total_seconds() > offset_commit_interval:
                assigned_partitions = consumer.assignment()

                for partition in assigned_partitions:
                    committed_offset = consumer.committed(partition) or 0
                    try:
                        consumer.commit({partition: OffsetAndMetadata(committed_offset, None)})
                    except Exception as e:
                        Log.error(e)
                    Log.info('interval commit: partition:%s, offset:%s' % (partition, committed_offset))

                last_commit_time = now

            for partition in poll_msgs:

                msgs = poll_msgs[partition]
                if not msgs:
                    continue

                # Commit before processing; max_records=1 means one message per poll.
                offset = consumer.committed(partition) or 0
                Log.info('before %s:%s' % (partition, consumer.committed(partition)))
                try:
                    consumer.commit({partition: OffsetAndMetadata(offset+1, None)})
                except Exception as e:
                    Log.error(e)
                    continue
                Log.info('after %s:%s' % (partition, consumer.committed(partition)))

                for msg in msgs:
                    self.process(msg)
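One way to make this less ugly might be to pull the periodic re-commit out of the loop into a small helper. The sketch below is only an illustration under some assumptions: `recommit_assigned_offsets`, `FakeOffsetAndMetadata` and `FakeConsumer` are hypothetical names I made up so the example runs without a broker. The helper relies only on the `assignment()`, `committed()` and `commit()` calls already used above; with the real client you would pass `kafka.structs.OffsetAndMetadata` as `make_offset` (its fields may differ between kafka-python versions).

```python
from collections import namedtuple

# Stand-in so the sketch runs without kafka-python installed (hypothetical type).
FakeOffsetAndMetadata = namedtuple("FakeOffsetAndMetadata", ["offset", "metadata"])


def recommit_assigned_offsets(consumer, make_offset=FakeOffsetAndMetadata):
    """Re-commit the current committed offset of every assigned partition.

    Returns a dict {partition: offset} describing what was re-committed.
    Partitions with no committed offset fall back to 0, as in the loop above.
    """
    refreshed = {}
    for partition in consumer.assignment():
        offset = consumer.committed(partition) or 0
        consumer.commit({partition: make_offset(offset, None)})
        refreshed[partition] = offset
    return refreshed


class FakeConsumer(object):
    """Hypothetical in-memory consumer exposing only the three methods used above."""

    def __init__(self, committed):
        self._committed = dict(committed)
        self.commit_calls = []

    def assignment(self):
        return set(self._committed)

    def committed(self, partition):
        return self._committed[partition]

    def commit(self, offsets):
        # Record the call and remember the committed offset per partition.
        self.commit_calls.append(offsets)
        for partition, meta in offsets.items():
            self._committed[partition] = meta.offset


consumer = FakeConsumer({"topic-0": 42, "topic-1": None})
print(recommit_assigned_offsets(consumer))
```

The main loop would then only need to check the elapsed time and call the helper, with the error handling and logging kept wherever suits the project.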
