Python操作分散式流處理系統Kafka

什麼是Kafka

Kafka是一個分散式流處理系統,流處理系統使它可以像消息隊列一樣publish或者subscribe消息,分散式提供了容錯性,並發處理消息的機制。

Kafka的基本概念

kafka運行在集群上,集群包含一個或多個伺服器。kafka把消息存在topic中,每一條消息包含鍵值(key),值(value)和時間戳(timestamp)。

kafka有以下一些基本概念:

Producer-消息生產者,就是向kafkabroker發消息的客戶端。

Consumer-消息消費者,是消息的使用方,負責消費Kafka伺服器上的消息。

Topic-主題,由用戶定義並配置在Kafka伺服器,用於建立Producer和Consumer之間的訂閱關係。生產者發送消息到指定的Topic下,消息者從這個Topic下消費消息。

Partition-消息分區,一個topic可以分為多個partition,每個partition是一個有序的隊列。partition中的每條消息都會被分配一個有序的id(offset)。

Broker-一台kafka伺服器就是一個broker。一個集群由多個broker組成。一個broker可以容納多個topic。

ConsumerGroup-消費者分組,用于歸組同類消費者。每個consumer屬於一個特定的consumergroup,多個消費者可以共同消息一個Topic下的消息,每個消費者消費其中的部分消息,這些消費者就組成了一個分組,擁有同一個分組名稱,通常也被稱為消費者集群。

Offset-消息在partition中的偏移量。每一條消息在partition都有唯一的偏移量,消息者可以指定偏移量來指定要消費的消息。

Kafka分散式架構

如上圖所示,kafka將topic中的消息存在不同的partition中。如果存在鍵值(key),消息按照鍵值(key)做分類存在不同的partiition中,如果不存在鍵值(key),消息按照輪詢(RoundRobin)機制存在不同的partition中。默認情況下,鍵值(key)決定了一條消息會被存在哪個partition中。

partition中的消息序列是有序的消息序列。kafka在partition使用偏移量(offset)來指定消息的位置。一個topic的一個partition只能被一個consumergroup中的一個consumer消費,多個consumer消費同一個partition中的數據是不允許的,但是一個consumer可以消費多個partition中的數據。

kafka將partition的數據複製到不同的broker,提供了partition數據的備份。每一個partition都有一個broker作為leader,若干個broker作為follower。所有的數據讀寫都通過leader所在的伺服器進行,並且leader在不同broker之間複製數據。

上圖中,對於Partition0,broker1是它的leader,broker2和broker3是follower。對於Partition1,broker2是它的leader,broker1和broker3是follower。

在上圖中,當有Client(也就是Producer)要寫入數據到Partition0時,會寫入到leaderBroker1,Broker1再將數據複製到followerBroker2和Broker3。

在上圖中,Client向Partition1中寫入數據時,會寫入到Broker2,因為Broker2是Partition1的Leader,然後Broker2再將數據複製到followerBroker1和Broker3中。

上圖中的topic一共有3個partition,對每個partition的讀寫都由不同的broker處理,因此總的吞吐量得到了提升。

實驗一:kafka-python實現生產者消費者

kafka-python是一個python的Kafka客戶端,可以用來向kafka的topic發送消息、消費消息。

這個實驗會實現一個producer和一個consumer,producer向kafka發送消息,consumer從topic中消費消息。結構如下圖

producer代碼

consumer代碼

接下來創建testtopic

打開兩個窗口中,我們在window1中運行producer,如下

在window2中運行consumer,如下

可以看到window2中的consumer成功的讀到了producer寫入的數據

實驗二:消費組實現容錯性機制

這個實驗將展示消費組的容錯性的特點。這個實驗中將創建一個有2個partition的topic,和2個consumer,這2個consumer共同消費同一個topic中的數據。結構如下所示

producer部分代碼和實驗一相同,這裡不再重複。consumer需要指定所屬的consumergroup,代碼如下

接下來我們創建topic,名字test,設置partition數量為2

打開三個窗口,一個窗口運行producer,還有兩個窗口運行consumer。

運行consumer的兩個窗口的輸出如下:

可以看到兩個consumer同時運行的情況下,它們分別消費不同partition中的數據。window1中的consumer消費partition0中的數據,window2中的consumer消費parition1中的數據。

我們嘗試關閉window1中的consumer,可以看到如下結果

剛開始window2中的consumer只消費partition1中的數據,當window1中的consumer退出後,window2中的consumer中也開始消費partition0中的數據了。

實驗三:offset管理

kafka允許consumer將當前消費的消息的offset提交到kafka中,這樣如果consumer因異常退出後,下次啟動仍然可以從上次記錄的offset開始向後繼續消費消息。

這個實驗的結構和實驗一的結構是一樣的,使用一個producer,一個consumer,testtopic的partition數量設為1。

producer的代碼和實驗一中的一樣,這裡不再重複。consumer的代碼稍作修改,這裡consumer中列印出下一個要被消費的消息的offset。consumer代碼如下

在一個窗口中啟動producer,在另一個窗口並且啟動consumer。consumer的輸出如下

可以嘗試退出consumer,再啟動consumer。每一次重新啟動,consumer都是從offset=98的消息開始消費的。

修改consumer的代碼如下,在consumer消費每一條消息後將offset提交回kafka

啟動consumer

可以看到consumer從offset=98的消息開始消費,到offset=829時,我們Ctrl+C退出consumer。

我們再次啟動consumer

可以看到重新啟動後,consumer從上一次記錄的offset開始繼續消費消息。之後每一次consumer重新啟動,consumer都會從上一次停止的地方繼續開始消費。

如想了解更多關於python語言的文章,請前往51Testing軟體測試網-中國軟體測試人的精神家園.

上文內容不用於商業目的,如涉及知識產權問題,請權利人聯繫博為峰小編(021-64471599-8017),我們將立即處理。

推薦閱讀:

一晚上糊出一個語言「前端」
Python 工匠:編寫條件分支代碼的技巧
阻擋你學會Haskell最大的兩個問題是什麼?
如何有步驟地實現一個解釋器?如果採用低級語言,如C之類的語言來實現像Lisp這樣的語言,需要什麼知識和工具?
測試的道理

TAG:Python | 測試 | 編程語言 |