
Basic Usage of kafka-python


kafka-python is the Python client for Apache Kafka. The sections below walk through its basic usage.

1. Installing Kafka and ZooKeeper

Installation is not covered here; see the Apache Kafka documentation.

2. Installing kafka-python

pip3 install kafka-python
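
A quick way to confirm the installation (the printed version string will vary with the release installed):

import kafka
print(kafka.__version__)  # the installed kafka-python version, e.g. '1.4.x'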

3. Basic usage of kafka-python

  • Minimal usage examples

1. Consumer

from kafka import KafkaConsumer

consumer = KafkaConsumer('my_topic', group_id='group2', bootstrap_servers=['localhost:9092'])
for msg in consumer:
    print(msg)

  • The first argument is the topic name
  • group_id : the consumer group this instance belongs to; optional
  • bootstrap_servers : the Kafka server(s) to connect to

2. Producer

from kafka import KafkaProducer

producer = KafkaProducer(bootstrap_servers=['localhost:9092'])
future = producer.send('my_topic', key=b'my_key', value=b'my_value', partition=0)
result = future.get(timeout=10)
print(result)

producer.send is the call that sends a message:

  • The first argument is the topic name; required
  • key : the key; must be a byte string; optional (but at least one of key and value must be given); defaults to None
  • value : the value; must be a byte string; optional (but at least one of key and value must be given); defaults to None
  • partition : the partition to send to; since Kafka creates topics with a single partition by default, this is 0

future.get waits until this one message is sent or the timeout expires. Because send is asynchronous and buffered, in testing this call turned out to be necessary: without it (or a producer.flush(), or a time.sleep) a short-lived script can exit before the message is actually delivered.
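
Instead of blocking on future.get, callbacks can be attached to the returned future; a minimal fire-and-forget sketch (the on_success/on_error handler names are hypothetical), finishing with flush() to drain the send buffer before exit:

from kafka import KafkaProducer

producer = KafkaProducer(bootstrap_servers=['localhost:9092'])

def on_success(record_metadata):
    # metadata for the delivered message: topic, partition and offset
    print(record_metadata.topic, record_metadata.partition, record_metadata.offset)

def on_error(excp):
    print('send failed:', excp)

producer.send('my_topic', key=b'my_key', value=b'my_value').add_callback(on_success).add_errback(on_error)
producer.flush()  # block until all buffered messages are actually sent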

3. Parsing sent and received messages

A message received on the consumer side looks like this:

ConsumerRecord(topic='my_topic', partition=0, offset=4, timestamp=1529569531392, timestamp_type=0, key=b'my_value', value=None, checksum=None, serialized_key_size=8, serialized_value_size=-1)

  • topic
  • partition
  • offset : the offset of this message
  • timestamp : the timestamp
  • timestamp_type : the timestamp type
  • key : the key, as bytes
  • value : the value, as bytes
  • checksum : the message checksum
  • serialized_key_size : the size of the serialized key
  • serialized_value_size : the size of the serialized value; note that when value=None the size is -1
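
Since a ConsumerRecord is a namedtuple, each of the fields listed above can be read as a plain attribute on the received message; a small sketch:

from kafka import KafkaConsumer

consumer = KafkaConsumer('my_topic', group_id='group2', bootstrap_servers=['localhost:9092'])
for msg in consumer:
    # topic/partition/offset identify the message; key and value carry the payload
    print('%s:%d:%d key=%s value=%s' % (msg.topic, msg.partition, msg.offset, msg.key, msg.value))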

KafkaConsumer

  • Manually assigning partitions

from kafka import KafkaConsumer
from kafka import TopicPartition

consumer = KafkaConsumer(group_id='group2', bootstrap_servers=['localhost:9092'])
consumer.assign([TopicPartition(topic='my_topic', partition=0)])
for msg in consumer:
    print(msg)
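
Once partitions are assigned manually, the read position can also be controlled explicitly with seek_to_beginning / seek / position; a minimal sketch, with offset 4 chosen arbitrarily:

from kafka import KafkaConsumer, TopicPartition

consumer = KafkaConsumer(bootstrap_servers=['localhost:9092'])
tp = TopicPartition(topic='my_topic', partition=0)
consumer.assign([tp])
consumer.seek_to_beginning(tp)  # rewind to the earliest available offset
consumer.seek(tp, 4)            # or jump to an arbitrary offset
print(consumer.position(tp))    # the next offset to be fetched -> 4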

  • Timeout handling

from kafka import KafkaConsumer

consumer = KafkaConsumer('my_topic', group_id='group2', bootstrap_servers=['localhost:9092'], consumer_timeout_ms=1000)
for msg in consumer:
    print(msg)

If consumer_timeout_ms is not specified, the iterator blocks and waits for messages indefinitely; if it is specified, iteration ends once no message arrives within the timeout.

consumer_timeout_ms : timeout in milliseconds
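
An alternative to the iterator timeout is KafkaConsumer.poll, which returns whatever arrived within timeout_ms (possibly nothing) instead of ending iteration, so the loop stays in control; a minimal sketch:

from kafka import KafkaConsumer

consumer = KafkaConsumer('my_topic', group_id='group2', bootstrap_servers=['localhost:9092'])
while True:
    batch = consumer.poll(timeout_ms=1000)  # {TopicPartition: [ConsumerRecord, ...]}
    if not batch:
        continue  # nothing within 1s; other work could go here
    for tp, records in batch.items():
        for record in records:
            print(record)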

  • Subscribing to multiple topics

from kafka import KafkaConsumer

consumer = KafkaConsumer(group_id='group2', bootstrap_servers=['localhost:9092'])
consumer.subscribe(topics=['my_topic', 'topic_1'])
for msg in consumer:
    print(msg)

Messages from several topics can be received at the same time.

A family of topics can also be subscribed to with a regular expression:

from kafka import KafkaConsumer
import json

consumer = KafkaConsumer(group_id='group2', bootstrap_servers=['localhost:9092'], value_deserializer=lambda m: json.loads(m.decode('ascii')))
consumer.subscribe(pattern='^my.*')
for msg in consumer:
    print(msg)

  • Encoding and decoding JSON data

Encoding (producer): value_serializer

Decoding (consumer): value_deserializer

1. First, look at the producer sending JSON data:

from kafka import KafkaProducer
import json

producer = KafkaProducer(bootstrap_servers=['localhost:9092'], value_serializer=lambda m: json.dumps(m).encode('ascii'))
future = producer.send('my_topic', value={'value_1': 'value_2'}, partition=0)
future.get(timeout=10)

2. A consumer that does not decode the received data:

ConsumerRecord(topic='my_topic', partition=0, offset=22, timestamp=1529575016310, timestamp_type=0, key=None, value=b'{"value_1": "value_2"}', checksum=None, serialized_key_size=-1, serialized_value_size=22)

Here value is the raw JSON byte string; an extra decoding step would be needed to use it.

3. A consumer that decodes automatically:

from kafka import KafkaConsumer
import json

consumer = KafkaConsumer(group_id='group2', bootstrap_servers=['localhost:9092'], value_deserializer=lambda m: json.loads(m.decode('ascii')))
consumer.subscribe(topics=['my_topic', 'topic_1'])
for msg in consumer:
    print(msg)

The received result:

ConsumerRecord(topic='my_topic', partition=0, offset=23, timestamp=1529575235994, timestamp_type=0, key=None, value={'value_1': 'value_2'}, checksum=None, serialized_key_size=-1, serialized_value_size=22)

  • In this result, value has been automatically decoded into a dict of strings
  • Not only value can be JSON; key can too, by specifying key_deserializer (see the sketch below)
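
A minimal sketch of JSON on both key and value, mirroring the value_serializer/value_deserializer pair above (the key {'id': 1} is just an illustrative payload):

from kafka import KafkaProducer, KafkaConsumer
import json

producer = KafkaProducer(bootstrap_servers=['localhost:9092'],
                         key_serializer=lambda m: json.dumps(m).encode('ascii'),
                         value_serializer=lambda m: json.dumps(m).encode('ascii'))
producer.send('my_topic', key={'id': 1}, value={'value_1': 'value_2'}).get(timeout=10)

consumer = KafkaConsumer('my_topic', group_id='group2', bootstrap_servers=['localhost:9092'],
                         key_deserializer=lambda m: json.loads(m.decode('ascii')),
                         value_deserializer=lambda m: json.loads(m.decode('ascii')))
for msg in consumer:
    print(msg.key, msg.value)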

KafkaProducer

  • Sending string keys and values

from kafka import KafkaProducer

producer = KafkaProducer(bootstrap_servers=['localhost:9092'], key_serializer=str.encode, value_serializer=str.encode)
future = producer.send('my_topic', key='key_3', value='value_3', partition=0)
future.get(timeout=10)

Here key_serializer and value_serializer are set to str.encode, but the consumer still receives byte strings.

For the consumer to receive strings, a decoding step is needed on its side: key_deserializer=bytes.decode (and likewise value_deserializer):

from kafka import KafkaConsumer

consumer = KafkaConsumer(group_id='group2', bootstrap_servers=['localhost:9092'], key_deserializer=bytes.decode, value_deserializer=bytes.decode)
consumer.subscribe(pattern='^my.*')
for msg in consumer:
    print(msg)

  • Sending compressed messages

compression_type='gzip'

If messages are large, they can be compressed before sending; valid values are 'gzip', 'snappy', 'lz4', or None.

from kafka import KafkaProducer

producer = KafkaProducer(bootstrap_servers=['localhost:9092'], compression_type='gzip')
future = producer.send('my_topic', key=b'key_3', value=b'value_3', partition=0)
future.get(timeout=10)

  • Sending msgpack messages

msgpack is short for MessagePack, an efficient binary serialization library that is more compact than JSON.

from kafka import KafkaProducer
import msgpack

producer = KafkaProducer(value_serializer=msgpack.dumps)
producer.send('msgpack-topic', {'key': 'value'})
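
The matching consumer side, sketched under the assumption that the msgpack package is installed, uses msgpack.loads to turn each raw value back into Python objects:

from kafka import KafkaConsumer
import msgpack

consumer = KafkaConsumer('msgpack-topic', group_id='group2',
                         bootstrap_servers=['localhost:9092'],
                         value_deserializer=msgpack.loads)
for msg in consumer:
    print(msg.value)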

References

  • kafka-python 1.4.4.dev documentation

