Basic Usage of kafka-python
kafka-python is the Python client for Apache Kafka. This article walks through its basic usage.
1. Installing Kafka and ZooKeeper
Installation is not covered here; see the Apache Kafka documentation.
2. Installing kafka-python
```
pip3 install kafka-python
```
3. Basic Usage of kafka-python
- Simplest usage example
1. Consumer
```python
from kafka import KafkaConsumer

consumer = KafkaConsumer('my_topic', group_id='group2', bootstrap_servers=['localhost:9092'])
for msg in consumer:
    print(msg)
```
- The first argument is the name of the topic
- group_id : the name of the consumer group this instance belongs to; optional
- bootstrap_servers : the Kafka server(s) to connect to
2. Producer
```python
from kafka import KafkaProducer

producer = KafkaProducer(bootstrap_servers=['localhost:9092'])
future = producer.send('my_topic', key=b'my_key', value=b'my_value', partition=0)
result = future.get(timeout=10)
print(result)
```
The producer.send function sends one message:
- The first argument is the topic name; required
- key : the message key; must be a byte string; optional (but at least one of key and value must be given); defaults to None
- value : the message value; must be a byte string; optional (but at least one of key and value must be given); defaults to None
- partition : the partition to send to; since Kafka creates topics with a single partition by default, this is 0
future.get waits until this single message has been sent, or until the timeout expires. In my testing this call is required: since send is asynchronous, without it the message never actually goes out before the script ends. A time.sleep can serve the same purpose. A non-blocking alternative using callbacks is sketched below.
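As a non-blocking alternative (a minimal sketch, assuming the same local broker and topic as above), the future returned by send accepts callbacks, and producer.flush() ensures buffered messages are delivered before the script exits:

```python
from kafka import KafkaProducer

producer = KafkaProducer(bootstrap_servers=['localhost:9092'])

def on_send_success(record_metadata):
    # RecordMetadata carries the topic, partition, and offset assigned to the message
    print(record_metadata.topic, record_metadata.partition, record_metadata.offset)

def on_send_error(excp):
    print('send failed:', excp)

future = producer.send('my_topic', key=b'my_key', value=b'my_value')
future.add_callback(on_send_success)
future.add_errback(on_send_error)
producer.flush()  # block until all buffered messages have been sent
```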
3. Anatomy of sent and received messages
A message received on the consumer side looks like this:
```
ConsumerRecord(topic='my_topic', partition=0, offset=4, timestamp=1529569531392, timestamp_type=0, key=b'my_value', value=None, checksum=None, serialized_key_size=8, serialized_value_size=-1)
```
- topic
- partition
- offset : the offset of this message
- timestamp : the timestamp
- timestamp_type : the timestamp type
- key : the key, as bytes
- value : the value, as bytes
- checksum : the checksum of the message
- serialized_key_size : the size of the serialized key
- serialized_value_size : the size of the serialized value; note that with value=None the size is -1
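Since ConsumerRecord is a namedtuple, each field above is available as an attribute. A minimal sketch (assuming the consumer from the first example) that prints only selected fields:

```python
from kafka import KafkaConsumer

consumer = KafkaConsumer('my_topic', group_id='group2', bootstrap_servers=['localhost:9092'])
for msg in consumer:
    # pick out individual fields instead of printing the whole record
    print('%s:%d:%d key=%s value=%s' % (msg.topic, msg.partition, msg.offset, msg.key, msg.value))
```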
KafkaConsumer
- Manually assigning partitions
```python
from kafka import KafkaConsumer
from kafka import TopicPartition

consumer = KafkaConsumer(group_id='group2', bootstrap_servers=['localhost:9092'])
consumer.assign([TopicPartition(topic='my_topic', partition=0)])
for msg in consumer:
    print(msg)
```
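With a manually assigned partition, the consumer can also be repositioned explicitly. A minimal sketch using seek (same broker and topic assumed) to start reading partition 0 from the beginning:

```python
from kafka import KafkaConsumer, TopicPartition

consumer = KafkaConsumer(group_id='group2', bootstrap_servers=['localhost:9092'])
tp = TopicPartition(topic='my_topic', partition=0)
consumer.assign([tp])
consumer.seek(tp, 0)  # jump to offset 0 of the assigned partition
for msg in consumer:
    print(msg)
```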
- Timeout handling
```python
from kafka import KafkaConsumer

consumer = KafkaConsumer('my_topic', group_id='group2', bootstrap_servers=['localhost:9092'], consumer_timeout_ms=1000)
for msg in consumer:
    print(msg)
```
If consumer_timeout_ms is not specified, the consumer loops and waits indefinitely for messages; if it is specified, iteration stops once the timeout elapses with nothing received, instead of waiting any longer (an alternative using poll is sketched below).
consumer_timeout_ms : a value in milliseconds
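Iterating is not the only way to consume; poll fetches whatever arrived within a timeout and then returns. A minimal sketch under the same assumptions as above:

```python
from kafka import KafkaConsumer

consumer = KafkaConsumer('my_topic', group_id='group2', bootstrap_servers=['localhost:9092'])
batch = consumer.poll(timeout_ms=1000)  # dict of {TopicPartition: [ConsumerRecord, ...]}
for tp, records in batch.items():
    for msg in records:
        print(msg)
```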
- Subscribing to multiple topics
```python
from kafka import KafkaConsumer

consumer = KafkaConsumer(group_id='group2', bootstrap_servers=['localhost:9092'])
consumer.subscribe(topics=['my_topic', 'topic_1'])
for msg in consumer:
    print(msg)
```
Messages from multiple topics can be received at the same time.
A whole class of topics can also be subscribed to with a regular expression:
```python
from kafka import KafkaConsumer
import json

consumer = KafkaConsumer(group_id='group2', bootstrap_servers=['localhost:9092'], value_deserializer=lambda m: json.loads(m.decode('ascii')))
consumer.subscribe(pattern='^my.*')
for msg in consumer:
    print(msg)
```
- Encoding and decoding JSON data
Encoding (producer): value_serializer
Decoding (consumer): value_deserializer
1. First, the producer sends JSON data
```python
from kafka import KafkaProducer
import json

producer = KafkaProducer(bootstrap_servers=['localhost:9092'], value_serializer=lambda m: json.dumps(m).encode('ascii'))
future = producer.send('my_topic', value={'value_1': 'value_2'}, partition=0)
future.get(timeout=10)
```
2. A consumer that does not decode the received data
```
ConsumerRecord(topic='my_topic', partition=0, offset=22, timestamp=1529575016310, timestamp_type=0, key=None, value=b'{"value_1": "value_2"}', checksum=None, serialized_key_size=-1, serialized_value_size=22)
```
Here value is still the raw JSON byte data, so a further decoding step can be applied to it (sketched below).
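A minimal sketch of that manual step, assuming a consumer built without a value_deserializer as above:

```python
from kafka import KafkaConsumer
import json

consumer = KafkaConsumer('my_topic', group_id='group2', bootstrap_servers=['localhost:9092'])
for msg in consumer:
    # msg.value is b'{"value_1": "value_2"}'; decode the bytes, then parse the JSON
    print(json.loads(msg.value.decode('ascii')))
```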
3. A consumer that decodes automatically
```python
from kafka import KafkaConsumer
import json

consumer = KafkaConsumer(group_id='group2', bootstrap_servers=['localhost:9092'], value_deserializer=lambda m: json.loads(m.decode('ascii')))
consumer.subscribe(topics=['my_topic', 'topic_1'])
for msg in consumer:
    print(msg)
```
The received result:
```
ConsumerRecord(topic='my_topic', partition=0, offset=23, timestamp=1529575235994, timestamp_type=0, key=None, value={'value_1': 'value_2'}, checksum=None, serialized_key_size=-1, serialized_value_size=22)
```
- In the received record, value has been deserialized automatically, back into a dict of strings
- Not only value can be JSON-encoded; key can be too, by specifying key_serializer on the producer and key_deserializer on the consumer (a sketch follows below)
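A minimal sketch of JSON-encoding the key as well, mirroring the value examples above (same assumed topic and group names):

```python
from kafka import KafkaProducer, KafkaConsumer
import json

producer = KafkaProducer(bootstrap_servers=['localhost:9092'],
                         key_serializer=lambda m: json.dumps(m).encode('ascii'),
                         value_serializer=lambda m: json.dumps(m).encode('ascii'))
producer.send('my_topic', key={'key_1': 'key_2'}, value={'value_1': 'value_2'}).get(timeout=10)

consumer = KafkaConsumer('my_topic', group_id='group2', bootstrap_servers=['localhost:9092'],
                         key_deserializer=lambda m: json.loads(m.decode('ascii')),
                         value_deserializer=lambda m: json.loads(m.decode('ascii')))
for msg in consumer:
    print(msg.key, msg.value)  # both come back as Python dicts
```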
KafkaProducer
- Sending string keys and values
```python
from kafka import KafkaProducer

producer = KafkaProducer(bootstrap_servers=['localhost:9092'], key_serializer=str.encode, value_serializer=str.encode)
future = producer.send('my_topic', key='key_3', value='value_3', partition=0)
future.get(timeout=10)
```
Here key_serializer and value_serializer are set to str.encode, but the consumer still receives byte strings.
For the consumer to receive str values, a matching decoding step is needed, e.g. key_deserializer=bytes.decode:
```python
from kafka import KafkaConsumer

consumer = KafkaConsumer(group_id='group2', bootstrap_servers=['localhost:9092'], key_deserializer=bytes.decode, value_deserializer=bytes.decode)
consumer.subscribe(pattern='^my.*')
for msg in consumer:
    print(msg)
```
- Sending compressed messages
compression_type='gzip'
If messages are large, they can also be compressed for sending. Valid values are 'gzip', 'snappy', 'lz4', or None.
```python
from kafka import KafkaProducer

producer = KafkaProducer(bootstrap_servers=['localhost:9092'], compression_type='gzip')
future = producer.send('my_topic', key=b'key_3', value=b'value_3', partition=0)
future.get(timeout=10)
```
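Note: to the best of my knowledge, the 'snappy' and 'lz4' codecs require the optional python-snappy and lz4 packages to be installed alongside kafka-python, while 'gzip' works out of the box via the standard library.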
- Sending msgpack messages
msgpack is short for MessagePack, an efficient binary serialization library that is more compact than JSON.
```python
import msgpack
from kafka import KafkaProducer

producer = KafkaProducer(value_serializer=msgpack.dumps)
producer.send('msgpack-topic', {'key': 'value'})
```
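And the matching consumer side, as a minimal sketch assuming the same topic: msgpack.loads turns the received bytes back into a Python object.

```python
import msgpack
from kafka import KafkaConsumer

consumer = KafkaConsumer('msgpack-topic', group_id='group2',
                         bootstrap_servers=['localhost:9092'],
                         value_deserializer=msgpack.loads)
for msg in consumer:
    print(msg.value)  # the str/bytes form of keys depends on the msgpack version's raw setting
```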
References
- kafka-python 1.4.4.dev documentation