Kafka的初步認識
來自專欄程序之心9 人贊了文章
什麼是消息系統?
早期兩個應用程序間進行消息傳遞需要保證兩個應用程序同時在線,並且耦合度很高。為了解決應用程序不在線的情況下業務正常運轉,就產生了消息系統,消費發送者(生產者)將消息發送至消息系統,消息接受者(消費者)從消息系統中獲取消息。
提到消息系統,不得不說一下JMS即Java消息服務(Java Message Service)應用程序介面。是一個Java平台中關於面向消息中間件的API。用於在兩個應用程序之間或分散式系統中發送消息,進行非同步通信。Java消息服務是一個與具體平台無關的API。
通常消息傳遞有兩種類型的消息模式可用一種是點對點queue隊列模式(p2p),另一種是topic發布-訂閱模式(public-subscribe)。
點對點消息系統
在點對點系統中,消息被保留在隊列中。 一個或多個消費者可以消耗隊列中的消息,但是特定消息只能由最多一個消費者消費。一旦消費者讀取隊列中的消息,它就從該隊列中消失。該系統的典型示例是訂單處理系統,其中每個訂單將由一個訂單處理器處理,但多個訂單處理器也可以同時工作。下圖描述了結構。
發布 - 訂閱消息系統
在發布-訂閱系統中,消息被保留在主題中。與點對點系統不同,消費者可以訂閱一個或多個主題並使用該主題中的所有消息。 在發布 - 訂閱系統中,消息生產者稱為發布者,消息使用者稱為訂閱者。一個現實生活的例子是Dish電視,它發布不同的渠道,如運動,電影,音樂等,任何人都可以訂閱自己的頻道集,並獲得他們訂閱的頻道時可用。
MQ消息隊列對比
下面針對RabbitMQ與kafka進行對比
應用場景上
RabbitMQ:遵循AMQP(Advanced Message Queuing Protocol)協議,由內在高並發的erlanng語言開發,用在實時的對可靠性要求比較高的消息傳遞上。
kafka:是Linkedin於2010年12月份開源的消息發布訂閱系統,它主要用於處理活躍的流式數據,大數據量的數據處理上。
在吞吐量上
RabbitMQ在吞吐量方面稍遜於kafka,他們的出發點不一樣,rabbitMQ支持對消息的可靠的傳遞,支持事務,不支持批量的操作;基於存儲的可靠性的要求存儲可以採用內存或者硬碟。
kafka具有高的吞吐量,內部採用消息的批量處理,數據的存儲和獲取是本地磁碟順序批量操作,消息處理的效率很高。
在集群負載均衡上
RabbitMQ的負載均衡需要單獨的loadbalancer進行支持。
kafka採用zookeeper對集群中的broker、consumer進行協調管理。
什麼是Kafka?
Apache Kafka是一個分散式發布-訂閱消息系統和一個強大的隊列,實際上就是JMS的一個變形,可以處理大量的數據,並使您能夠將消息從一個端點傳遞到另一個端點。Kafka適合離線和在線消息消費。Kafka消息保留在磁碟上,並在群集內複製以防止數據丟失。Kafka構建在ZooKeeper同步服務之上。
Kafka的特性
以下是Kafka的幾個好處
高吞吐量、低延遲:kafka每秒可以處理幾十萬條消息,它的延遲最低只有幾毫秒,每個topic可以分多個partition, consumer group 對partition進行消費操作。
可擴展性:kafka集群支持熱擴展
持久性、可靠性:消息被持久化到本地磁碟,並且支持數據備份防止數據丟失
容錯性:允許集群中節點失敗(若副本數量為n,則允許n-1個節點失敗)
高並發:支持數千個客戶端同時讀寫
應用場景
Kafka可以在許多用例中使用。 其中一些列出如下:
日誌收集:一個公司可以用Kafka可以收集各種服務的log,通過kafka以統一介面服務的方式開放給各種consumer,例如hadoop、Hbase、Solr等。
消息系統:解耦和生產者和消費者、緩存消息等。
用戶活動跟蹤:Kafka經常被用來記錄web用戶或者app用戶的各種活動,如瀏覽網頁、搜索、點擊等活動,這些活動信息被各個伺服器發布到kafka 的topic中,然後訂閱者通過訂閱這些topic來做實時的監控分析,或者裝載到hadoop、數據倉庫中做離線分析和挖掘。
運營指標:Kafka也經常用來記錄運營監控數據。包括收集各種分散式應用的數據,生產各種操作的集中反饋,比如報警和報告。
流式處理:比如spark streaming和storm
事件源
Kafka基本概念
Kafka中發布訂閱的對象是topic。我們可以為每類數據創建一個topic,把向topic發布消息的客戶端稱作producer,從topic訂閱消息的客戶端稱作consumer。Producers和consumers可以同時從多個topic讀寫數據。一個kafka集群由一個或多個broker伺服器組成,它負責持久化和備份具體的kafka消息。
Broker(經紀人):Kafka節點,一個Kafka節點就是一個broker,多個broker可以組成一個Kafka集群。
Topic(主題):一類消息,消息存放的目錄即主題,例如page view日誌、click日誌等都可以以topic的形式存在,Kafka集群能夠同時負責多個topic的分發。
Partition:topic物理上的分組,一個topic可以分為多個partition,每個partition是一個有序的隊列。
Segment:partition物理上由多個segment組成,每個Segment存著message信息。
offset:一條消息在消息系統中的偏移量。
Producer : 生產message發送到topic。
Consumer : 訂閱topic消費message, consumer作為一個線程來消費。
ConsumerGroup:一個ConsumerGroup包含多個consumer,這個是預先在配置文件中配置好的。各個consumer(consumer線程)可以組成一個組(Consumer group),partition中的每個message只能被組(Consumer group) 中的一個consumer(consumer 線程)消費,如果一個message可以被多個consumer(consumer 線程) 消費的話,那麼這些consumer必須在不同的組。Kafka不支持一個partition中的message由兩個或兩個以上的consumer thread來處理,即便是來自不同的consumer group的也不行。它不能像AMQ那樣可以多個BET作為consumer去處理message,這是因為多個BET去消費一個Queue中的數據的時候,由於要保證不能多個線程拿同一條message,所以就需要行級別悲觀鎖(for update),這就導致了consume的性能下降,吞吐量不夠。而kafka為了保證吞吐量,只允許一個consumer線程去訪問一個partition。如果覺得效率不高的時候,可以加partition的數量來橫向擴展,那麼再加新的consumer thread去消費。這樣沒有鎖競爭,充分發揮了橫向的擴展性,吞吐量極高。這也就形成了分散式消費的概念。
生產者和消費者
針對生產者和消費者,需要注意以下幾點
分區在producer端進行
一個分區只會由消費者組內的一個consumer消費,kafka會通過負載均衡機制自動分配
offset由consumer端進行維護,一般交給zookeeper進行維護
只能保證一個分區內的數據是有序的
二、 Apache Kafka - 安裝步驟
註:安裝kafka前需要提前安裝JDK與zookeeper
Step 1: 下載Kafka並解壓
> tar -xzfkafka_2.9.2-0.8.1.1.tgz
> cdkafka_2.9.2-0.8.1.1
Step 2: 配置環境變數(可選)
vi/etc/profile
KAFKA_HOME=/opt/kafka_2.9.2-0.8.1.1
PATH=$PATH:$KAFKA_HOME/bin
Step 3: 修改配置文件中的以下內容
cd /opt/kafka_2.9.2-0.8.1.1/config
viserver.properties
broker.id=0 //為依次增長的:0、1、2、3、4,集群中唯一id
log.dirs=/opt/kafka_2.9.2-0.8.1.1/logs //日誌地址
zookeeper.connect=localhost:2181 //zookeeperServers列表,各節點以逗號分開
cd /opt/kafka_2.9.2-0.8.1.1/config
vi zookeeper.properties
dataDir=/usr/local/kafka/zookeeper
dataLogDir=/usr/local/kafka/log/zookeeper
Step 4: 啟動單節點服務
在kafka的bin中存在很多sh文件,其中包含對zookeeper的啟動與停止。首先啟動zookeeper再啟動kafka的broker。
./bin/zookeeper-server-start.shconfig/zookeeper.properties &
./bin/kafka-server-start.shconfig/server.properties &
Step 5: 創建topic
./bin/kafka-topics.sh --create --zookeeper192.168.2.105:2181 --replication-factor 1 --partitions 1 --topic testlzy
列出所有topic
./bin/kafka-topics.sh --zookeeper 192.168.2.105:2181--list
Step 5: 創建生產者
./bin/kafka-console-producer.sh--broker-list 192.168.2.105:9093 --topic testlzy
Step 6: 創建消費者
./bin/kafka-console-consumer.sh --zookeeperlocalhost:2181 --topic testlzy --from-beginning
此時如果在生產者控制台中發布消息,消費者端能接收到,就算成功了。
kafka常用命令
以下是kafka常用命令行總結:
0. 查看有哪些主題:
./kafka-topics.sh --list --zookeeper192.168.0.201:2181
1. 查看topic的詳細信息
./kafka-topics.sh -zookeeper 127.0.0.1:2181-describe -topic testKJ1
2. 為topic增加副本
./kafka-reassign-partitions.sh -zookeeper127.0.0.1:2181 -reassignment-json-file json/partitions-to-move.json -execute
3. 創建topic
./kafka-topics.sh --create --zookeeperlocalhost:2181 --replication-factor 1 --partitions 1 --topic testKJ1
4. 為topic增加partition
./bin/kafka-topics.sh –zookeeper127.0.0.1:2181 –alter –partitions 20 –topic testKJ1
5. kafka生產者客戶端命令
./kafka-console-producer.sh --broker-listlocalhost:9092 --topic testKJ1
6. kafka消費者客戶端命令
./kafka-console-consumer.sh -zookeeperlocalhost:2181 --from-beginning --topic testKJ1
7. kafka服務啟動
./kafka-server-start.sh -daemon../config/server.properties
8. 下線broker
./kafka-run-class.shkafka.admin.ShutdownBroker --zookeeper 127.0.0.1:2181 --broker #brokerId#--num.retries 3 --retry.interval.ms 60
shutdown broker
9. 刪除topic
./kafka-run-class.shkafka.admin.DeleteTopicCommand --topic testKJ1 --zookeeper 127.0.0.1:2181
./kafka-topics.sh --zookeeperlocalhost:2181 --delete --topic testKJ1
10. 查看consumer組內消費的offset
./kafka-run-class.shkafka.tools.ConsumerOffsetChecker --zookeeper localhost:2181 --group test--topic testKJ1
./kafka-consumer-offset-checker.sh --zookeeper192.168.0.201:2181 --group group1 --topic group1
三、 Apache Kafka – 核心原理
負載均衡
負載均衡的兩種策略(消費端配置)
partition.assignment.strategy=range|round-robin
在kafka中partition分發消息給消費者不是已消費為力度進行分配的,是以消費者線程為力度進行分配的。
Range
kafka中的每個topic的分區是獨立進行分配的,topic間不受到任何影響。
topic中先是對partition進行數字排序,線程按照字典排序。
接下來用分區的數量除以線程數量就是每個線程能夠分到的消息數量
partition_per_thread= 分區數量/線程數量
如果整除了,那麼每個線程依次分配partition_per_thread個分區
如果不整除,低位的幾個thread會多消費分區
如果分區個數少於線程數量,就會出現線程空閑的時候,因為kafka會保證一個分區只能被一個消費者進行消費。所以建議在配置的時候分區數量和消費者線程數量相等最好。
Round-robin
在kafka中一個消費者組是可以訂閱多個topic的。當訂閱了多個topic後,他內部會把所有topic進行混亂以後再按照range策略走一遍,他會保證每個topic在consumer中的線程數量必須相等。
備註
一般應用range的比較多,如果consumer組中有個線程shutdown了,那麼kafka會自動的重新進行負載均衡的分配。這個負載均衡增加了下游的消費能力。而且非常方便的進行消費者的擴展。當然kafka也可以去除這樣的負載均衡策略,默認消費端分為high level的客戶端(啟用負載均衡機制)和simple的客戶端(不啟用負載均衡,需要自己決定消費哪個分區的消息)。
主從及副本分布
kafka的主從主要提供了分區容錯的能力,可以配置一個leader和若干follower,leader是處理消息,而follower只是leader的一個備份,平常的連接都是連在leader上的。當leader宕機以後,kafka會從follower中選舉一台leader來進行服務。
對於第R個副本,先隨機取一個broker放分區0,然後順序放其他分區。這樣保證了leader和follower均勻的分布在了每個broker上。
通過-topic命令可以查看指定topic的分區和副本的分布情況
topic:topic名稱
partition:分區名稱
leader:此分區的leader在哪個broker上
replicas:所有的副本分布在哪個broker上
isr:replicas中所有in-sync的節點
對於in-sync
節點必須可以維護和zookeeper的連接,zookeeper通過心跳機制檢查每個節點的連接。
如果節點是個follower。他必須能及時的同步leader的寫操作,延時不能太久。
設置方式
replica.lag.max.messages:落後的消息數量
replica.lag.time.max.ms:卡住的時間
kafka是通過這兩個參數去判斷是不是一個有效的副本follower。當leader宕機以後,是從這些有效副本中進行選舉的。無效的是不參加選舉的。
kafka的持久化
消息格式
kafka的消息格式如圖
文件系統
kafka會將消息組織到硬碟上,在broker的數據目錄中會有以topic名稱-分區號命名的文件夾,
在文件夾中存在成對出現的文件。kafka不是將所有消息放到一個大文件里,而是根據消息的offset進行了分段。每一個段內放多少消息是可以配置的。文件名字代表此文件中的第一個數據的offset。index為索引文件,log為數據文件,存放的消息格式見上圖。對於index文件維護的是一個稀疏索引,由消息的編號指向物理偏移,運行時會被載入到內存。
過期數據清理
kafka既然支持了持久化,他對磁碟空間是有要求的。對於刪除過期數據kafka提供了兩種策略
1、默認策略為直接刪除
l 超過指定的時間的消息:
log.retention.hours=168
l 超過指定大小後,刪除舊的消息:
log.retention.bytes=1073741824
2、壓縮(只在特定的業務場景下有意義)
全局:log.cleaner.enable=true
在特定的topic上:log.cleanup.policy=compact
保留每個key最後一個版本的信息,若最後一個版本消息內容為空,這個key被刪除
推薦閱讀:
※理解 RabbitMQ Exchange
※Python操作rabbitmq系列(一)
※rabbitmq-延遲隊列
※Rabbitmq系列之1--基礎概念
※kafka的高可靠性實現方案及保障機制