Kafka入門簡介

本文簡單的介紹下kafka,主要包含以下部分:

  • 什麼是Kafka
  • Kafka的基本概念
  • Kafka分散式架構
  • 配置單機版Kafka
  • 實驗一:kafka-python實現生產者消費者
  • 實驗二:消費組實現容錯性機制
  • 實驗三:offset管理

什麼是Kafka

Kafka是一個分散式流處理系統,流處理系統使它可以像消息隊列一樣publish或者subscribe消息,分散式提供了容錯性,並發處理消息的機制。

Kafka的基本概念

kafka運行在集群上,集群包含一個或多個伺服器。kafka把消息存在topic中,每一條消息包含鍵值(key),值(value)和時間戳(timestamp)。

kafka有以下一些基本概念:

Producer - 消息生產者,就是向kafka broker發消息的客戶端。

Consumer - 消息消費者,是消息的使用方,負責消費Kafka伺服器上的消息。

Topic - 主題,由用戶定義並配置在Kafka伺服器,用於建立Producer和Consumer之間的訂閱關係。生產者發送消息到指定的Topic下,消息者從這個Topic下消費消息。

Partition - 消息分區,一個topic可以分為多個 partition,每個

partition是一個有序的隊列。partition中的每條消息都會被分配一個有序的

id(offset)。

Broker - 一台kafka伺服器就是一個broker。一個集群由多個broker組成。一個broker可以容納多個topic。

Consumer Group - 消費者分組,用于歸組同類消費者。每個consumer屬於一個特定的consumer group,多個消費者可以共同消息一個Topic下的消息,每個消費者消費其中的部分消息,這些消費者就組成了一個分組,擁有同一個分組名稱,通常也被稱為消費者集群。

Offset - 消息在partition中的偏移量。每一條消息在partition都有唯一的偏移量,消息者可以指定偏移量來指定要消費的消息。

Kafka分散式架構

如上圖所示,kafka將topic中的消息存在不同的partition中。如果存在鍵值(key),消息按照鍵值(key)做分類存在不同的partiition中,如果不存在鍵值(key),消息按照輪詢(Round Robin)機制存在不同的partition中。默認情況下,鍵值(key)決定了一條消息會被存在哪個partition中。

partition中的消息序列是有序的消息序列。kafka在partition使用偏移量(offset)來指定消息的位置。一個topic的一個partition只能被一個consumer group中的一個consumer消費,多個consumer消費同一個partition中的數據是不允許的,但是一個consumer可以消費多個partition中的數據。

kafka將partition的數據複製到不同的broker,提供了partition數據的備份。每一個partition都有一個broker作為leader,若干個broker作為follower。所有的數據讀寫都通過leader所在的伺服器進行,並且leader在不同broker之間複製數據。

上圖中,對於Partition 0,broker 1是它的leader,broker 2和broker 3是follower。對於Partition 1,broker 2是它的leader,broker 1和broker 3是follower。

在上圖中,當有Client(也就是Producer)要寫入數據到Partition 0時,會寫入到leader Broker 1,Broker 1再將數據複製到follower Broker 2和Broker 3。

在上圖中,Client向Partition 1中寫入數據時,會寫入到Broker 2,因為Broker 2是Partition 1的Leader,然後Broker 2再將數據複製到follower Broker 1和Broker 3中。

上圖中的topic一共有3個partition,對每個partition的讀寫都由不同的broker處理,因此總的吞吐量得到了提升。

配置單機版Kafka

這裡我們使用kafka 0.10.0.0版本。

第一步:下載並解壓包

$ wget https://archive.apache.org/dist/kafka/0.10.0.0/kafka_2.11-0.10.0.0.tgzn$ tar -xzf kafka_2.11-0.10.0.0.tgzn$ cd kafka_2.11-0.10.0.0n

第二步:啟動Kafka

kafka需要用到zookeeper,所以需要先啟動zookeeper。我們這裡使用下載包里自帶的單機版zookeeper。

$ bin/zookeeper-server-start.sh config/zookeeper.propertiesn[2013-04-22 15:01:37,495] INFO Reading configuration from: config/zookeeper.properties (org.apache.zookeeper.server.quorum.QuorumPeerConfig)n...n

然後啟動kafka

$ bin/kafka-server-start.sh config/server.propertiesn[2013-04-22 15:01:47,028] INFO Verifying properties (kafka.utils.VerifiableProperties)n[2013-04-22 15:01:47,051] INFO Property socket.send.buffer.bytes is overridden to 1048576 (kafka.utils.VerifiableProperties)n...n

第三步:創建topic

$ bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic testn

查看創建的topic

$ bin/kafka-topics.sh --list --zookeeper localhost:2181ntestn

第四步:向topic中發送消息

$ bin/kafka-console-producer.sh --broker-list localhost:9092 --topic testnThis is a messagenThis is another messagen

第五步:從topicc中消費消息

$ bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic test --from-beginningnThis is a messagenThis is another messagen

實驗一:kafka-python實現生產者消費者

kafka-python是一個python的Kafka客戶端,可以用來向kafka的topic發送消息、消費消息。

這個實驗會實現一個producer和一個consumer,producer向kafka發送消息,consumer從topic中消費消息。結構如下圖

producer代碼

# producer.pynimport timenfrom kafka import KafkaProducernnproducer = KafkaProducer(bootstrap_servers="localhost:9092")ni = 0nwhile True:n ts = int(time.time() * 1000)n producer.send(topic="test", value=str(i), key=str(i), timestamp_ms=ts)n producer.flush()n print in i += 1n time.sleep(1)n

consumer代碼

# consumer.pynfrom kafka import KafkaConsumernnconsumer = KafkaConsumer("test", bootstrap_servers=["localhost:9092"])nfor message in consumer:n print messagen

接下來創建test topic

$ bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic testnCreated topic "test".n

打開兩個窗口中,我們在window1中運行producer,如下

# window1n$ python producer.pyn0n1n2n3n4n5n...n

在window2中運行consumer,如下

# window2n$ python consumer.pynConsumerRecord(topic=utest, partition=0, offset=128, timestamp=1512554839806, timestamp_type=0, key=128, value=128, checksum=-1439508774, serialized_key_size=3, serialized_value_size=3)nConsumerRecord(topic=utest, partition=0, offset=129, timestamp=1512554840827, timestamp_type=0, key=129, value=129, checksum=1515993224, serialized_key_size=3, serialized_value_size=3)nConsumerRecord(topic=utest, partition=0, offset=130, timestamp=1512554841834, timestamp_type=0, key=130, value=130, checksum=453490213, serialized_key_size=3, serialized_value_size=3)nConsumerRecord(topic=utest, partition=0, offset=131, timestamp=1512554842841, timestamp_type=0, key=131, value=131, checksum=-632119731, serialized_key_size=3, serialized_value_size=3)n...n

可以看到window2中的consumer成功的讀到了producer寫入的數據

實驗二:消費組實現容錯性機制

這個實驗將展示消費組的容錯性的特點。這個實驗中將創建一個有2個partition的topic,和2個consumer,這2個consumer共同消費同一個topic中的數據。結構如下所示

producer部分代碼和實驗一相同,這裡不再重複。consumer需要指定所屬的consumer group,代碼如下

# consumer.pynfrom kafka import KafkaConsumernnconsumer = KafkaConsumer("test", bootstrap_servers=["localhost:9092"], group_id="testgoup")nfor message in consumer:n print messagen

接下來我們創建topic,名字test,設置partition數量為2

$ bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 2 --topic testnCreated topic "test".n

打開三個窗口,一個窗口運行producer,還有兩個窗口運行consumer。

運行consumer的兩個窗口的輸出如下:

# window1n$ python consumer.pynConsumerRecord(topic=utest, partition=0, offset=11, timestamp=1512556619298, timestamp_type=0, key=15, value=15, checksum=-1492440752, serialized_key_size=2, serialized_value_size=2)nConsumerRecord(topic=utest, partition=0, offset=12, timestamp=1512556621308, timestamp_type=0, key=17, value=17, checksum=-1029407634, serialized_key_size=2, serialized_value_size=2)nConsumerRecord(topic=utest, partition=0, offset=13, timestamp=1512556622316, timestamp_type=0, key=18, value=18, checksum=1544755853, serialized_key_size=2, serialized_value_size=2)nConsumerRecord(topic=utest, partition=0, offset=14, timestamp=1512556624326, timestamp_type=0, key=20, value=20, checksum=2130557725, serialized_key_size=2, serialized_value_size=2)n...nnn# window2n$ python consumer.pynConsumerRecord(topic=utest, partition=1, offset=6, timestamp=1512556617287, timestamp_type=0, key=13, value=13, checksum=-1494513008, serialized_key_size=2, serialized_value_size=2)nConsumerRecord(topic=utest, partition=1, offset=7, timestamp=1512556618293, timestamp_type=0, key=14, value=14, checksum=-1499251221, serialized_key_size=2, serialized_value_size=2)nConsumerRecord(topic=utest, partition=1, offset=8, timestamp=1512556620303, timestamp_type=0, key=16, value=16, checksum=-783427375, serialized_key_size=2, serialized_value_size=2)nConsumerRecord(topic=utest, partition=1, offset=9, timestamp=1512556623321, timestamp_type=0, key=19, value=19, checksum=-1902514040, serialized_key_size=2, serialized_value_size=2)nConsumerRecord(topic=utest, partition=1, offset=10, timestamp=1512556626337, timestamp_type=0, key=22, value=22, checksum=782849423, serialized_key_size=2, serialized_value_size=2)n...n

可以看到兩個consumer同時運行的情況下,它們分別消費不同partition中的數據。window1中的consumer消費partition 0中的數據,window2中的consumer消費parition 1中的數據。

我們嘗試關閉window1中的consumer,可以看到如下結果

# window2nnConsumerRecord(topic=utest, partition=1, offset=105, timestamp=1512557514410, timestamp_type=0, key=46, value=46, checksum=-1821060627, serialized_key_siz e=2, serialized_value_size=2)nConsumerRecord(topic=utest, partition=1, offset=106, timestamp=1512557518428, timestamp_type=0, key=50, value=50, checksum=281004575, serialized_key_size= 2, serialized_value_size=2)nConsumerRecord(topic=utest, partition=1, offset=107, timestamp=1512557521442, timestamp_type=0, key=53, value=53, checksum=1245067939, serialized_key_size =2, serialized_value_size=2)nConsumerRecord(topic=utest, partition=1, offset=108, timestamp=1512557525461, timestamp_type=0, key=57, value=57, checksum=-1003840299, serialized_key_siz e=2, serialized_value_size=2)nConsumerRecord(topic=utest, partition=0, offset=98, timestamp=1512557494325, t imestamp_type=0, key=26, value=26, checksum=-1576244323, serialized_key_size =2, serialized_value_size=2)nConsumerRecord(topic=utest, partition=0, offset=99, timestamp=1512557495329, t imestamp_type=0, key=27, value=27, checksum=510530536, serialized_key_size=2 , serialized_value_size=2)nConsumerRecord(topic=utest, partition=0, offset=100, timestamp=1512557502360, timestamp_type=0, key=34, value=34, checksum=1781705793, serialized_key_size =2, serialized_value_size=2)nConsumerRecord(topic=utest, partition=0, offset=101, timestamp=1512557504368, timestamp_type=0, key=36, value=36, checksum=2142677730, serialized_key_size =2, serialized_value_size=2)nConsumerRecord(topic=utest, partition=0, offset=102, timestamp=1512557505372, timestamp_type=0, key=37, value=37, checksum=-1376259357, serialized_key_siz e=2, serialized_value_size=2)n...n

剛開始window2中的consumer只消費partition1中的數據,當window1中的consumer退出後,window2中的consumer中也開始消費partition 0中的數據了。

實驗三:offset管理

kafka允許consumer將當前消費的消息的offset提交到kafka中,這樣如果consumer因異常退出後,下次啟動仍然可以從上次記錄的offset開始向後繼續消費消息。

這個實驗的結構和實驗一的結構是一樣的,使用一個producer,一個consumer,test topic的partition數量設為1。

producer的代碼和實驗一中的一樣,這裡不再重複。consumer的代碼稍作修改,這裡consumer中列印出下一個要被消費的消息的offset。consumer代碼如下

from kafka import KafkaConsumer, TopicPartitionnntp = TopicPartition("test", 0)nconsumer = KafkaConsumer(bootstrap_servers=["localhost:9092"], group_id="testgoup", auto_offset_reset="earliest", enable_auto_commit=False)nconsumer.assign([tp])nprint "starting offset is", consumer.position(tp)nfor message in consumer:n print messagen

在一個窗口中啟動producer,在另一個窗口並且啟動consumer。consumer的輸出如下

$ python consumer.pynstart offset is 98nConsumerRecord(topic=utest, partition=0, offset=98, timestamp=1512558902904, timestamp_type=0, key=98, value=98, checksum=-588818519, serialized_key_size=2, serialized_value_size=2)nConsumerRecord(topic=utest, partition=0, offset=99, timestamp=1512558903909, timestamp_type=0, key=99, value=99, checksum=1042712647, serialized_key_size=2, serialized_value_size=2)nConsumerRecord(topic=utest, partition=0, offset=100, timestamp=1512558904915, timestamp_type=0, key=100, value=100, checksum=-838622723, serialized_key_size=3, serialized_value_size=3)nConsumerRecord(topic=utest, partition=0, offset=101, timestamp=1512558905920, timestamp_type=0, key=101, value=101, checksum=-2020362485, serialized_key_size=3, serialized_value_size=3)nConsumerRecord(topic=utest, partition=0, offset=102, timestamp=1512558906926, timestamp_type=0, key=102, value=102, checksum=-345378749, serialized_key_size=3, serialized_value_size=3)n...n

可以嘗試退出consumer,再啟動consumer。每一次重新啟動,consumer都是從offset=98的消息開始消費的。

修改consumer的代碼如下,在consumer消費每一條消息後將offset提交回kafka

from kafka import KafkaConsumer, TopicPartition, OffsetAndMetadatanntp = TopicPartition("test2", 0)nconsumer = KafkaConsumer(bootstrap_servers=["localhost:9092"], group_id="testgoup", auto_offset_reset="earliest", enable_auto_commit=False)nconsumer.assign([tp])nprint "start offset is ", consumer.position(tp)nfor message in consumer:n print messagen consumer.commit(message.offset + 1)n

啟動consumer

$ python consumer.pynstart offset is 98nConsumerRecord(topic=utest, partition=0, offset=98, timestamp=1512559632153, timestamp_type=0, key=824, value=824, checksum=828849435, serialized_key_size=3, serialized_value_size=3)n...nConsumerRecord(topic=utest, partition=0, offset=827, timestamp=1512559635164, timestamp_type=0, key=827, value=827, checksum=442222330, serialized_key_size=3, serialized_value_size=3)nConsumerRecord(topic=utest, partition=0, offset=828, timestamp=1512559636169, timestamp_type=0, key=828, value=828, checksum=-267344764, serialized_key_size=3, serialized_value_size=3)nConsumerRecord(topic=utest, partition=0, offset=829, timestamp=1512559637173, timestamp_type=0, key=829, value=829, checksum=1225853586, serialized_key_size=3, serialized_value_size=3)n

可以看到consumer從offset=98的消息開始消費,到offset=829時,我們Ctrl+C退出consumer。

我們再次啟動consumer

$ python consumer.pynstart offset is 830nConsumerRecord(topic=utest, partition=0, offset=830, timestamp=1512559638177, timestamp_type=0, key=830, value=830, checksum=1003305652, serialized_key_size=3, serialized_value_size=3)nConsumerRecord(topic=utest, partition=0, offset=831, timestamp=1512559639181, timestamp_type=0, key=831, value=831, checksum=-361607666, serialized_key_size=3, serialized_value_size=3)nConsumerRecord(topic=utest, partition=0, offset=832, timestamp=1512559640185, timestamp_type=0, key=832, value=832, checksum=-345891932, serialized_key_size=3, serialized_value_size=3)n...n

可以看到重新啟動後,consumer從上一次記錄的offset開始繼續消費消息。之後每一次consumer重新啟動,consumer都會從上一次停止的地方繼續開始消費。

總結

本文主要介紹了一下kafka的基本概念,並結合一些實驗幫助理解kafka中的一些難點,如多個consumer的容錯性機制,offset管理。

引用資料

kafka-python在線文檔 - kafka-python - kafka-python 1.3.6.dev documentation

kafka官方文檔 - Apache Kafka

推薦閱讀:

基於AMQP實現的golang消息隊列MaxQ
目前linux進程間通信的常用方法是什麼(pipe?信號量?消息隊列?)?
LocalMQ:從零構建類 RocketMQ 高性能消息隊列
Kafka,Mq,Redis作為消息隊列使用時的差異?

TAG:Kafka | Python | 消息队列 |