KAFKA的最佳實踐
來自專欄 Gemfield
背景
一個Python項目中要使用kafka去deliver log,1個producer,5個consumer。單條消息的大小是100KB ~ 500KB,producer要在1秒鐘之內能夠發送30個這樣的message。然後每個consumer也要在1秒鐘之內消費30個這樣的message。另外,消息不需要持久化(好像kafka沒法關閉數據的落盤),消息不需要設置replica。所以你可以看到,這個場景還是比較簡單的,非得說一些特點的話,那就是單條消息比較大(但是還沒有超越kafka默認的1MB)、實時性要求比較高、數據量比較大(1秒鐘平均可能產生200KB * 30 = 6MB的數據量,1秒鐘可能要消費6MB * 5 = 30MB的數據量)。
JVM級別的優化
因為kafka是java寫的,所以JVM還是要考慮一些參數設置的。
下面是一個推薦的配置,你可以自己斟酌:
-Xmx8g -Xms8g -XX:MetaspaceSize=96m -XX:+UseG1GC-XX:MaxGCPauseMillis=20 -XX:InitiatingHeapOccupancyPercent=35 -XX:G1HeapRegionSize=16M-XX:MinMetaspaceFreeRatio=50 -XX:MaxMetaspaceFreeRatio=80
Xmx8g:最大堆大小是8G,gemfield設置的是16G,有錢任性,哼;
Xms8g:起始堆大小是8G。
在Gemfield的機器上,啟動的服務帶的參數如下所示:
/usr/lib/jvm/java-1.8-openjdk/jre/bin/java -Xmx16G -Xms16G -server -XX:+UseG1GC -XX:MaxGCPauseMillis=20 -XX:InitiatingHeapOccupancyPercent=35 -XX:+ExplicitGCInvokesConcurrent -Djava.awt.headless=true -Xloggc:/opt/kafka/bin/../logs/kafkaServer-gc.log ......
OS級別的優化
1,磁碟的讀寫性能通常是個瓶頸,多個磁碟或者更快的磁碟是個更好的選擇。可以通過server.properties中的 log.dirs來配置多個硬碟。
2,增大socket buffer size,可以提高kafka的吞吐量:
gemfield@ubuntu:~$ cat /proc/sys/net/core/wmem_max212992gemfield@ubuntu:~$ cat /proc/sys/net/core/rmem_max212992
Gemfield沒有更改。更詳細的設置請參考:TCP Tune
Log flush管理
這個決定消息如何同步到磁碟上。
# The number of messages to accept before forcing a flush of data to disk #log.flush.interval.messages=10000 # The maximum amount of time a message can sit in a log before we force a flush #log.flush.interval.ms=1000
根據Gemfield的實踐,動這些配置沒有什麼用。還是省省吧。
Log Retention Policy 消息保留策略
也就是消息保留多久會被清除掉。
# The following configurations control the disposal of log segments. The policy can # be set to delete segments after a period of time, or after a given size has accumulated. # A segment will be deleted whenever *either* of these criteria are met. Deletion always happens # from the end of the log. # The minimum age of a log file to be eligible for deletion due to age #log.retention.hours=168log.retention.hours=1 # A size-based retention policy for logs. Segments are pruned from the log unless the remaining # segments drop below log.retention.bytes. Functions independently of log.retention.hours. #log.retention.bytes=1073741824 # The maximum size of a log segment file. When this size is reached a new log segment will be created. log.segment.bytes=1073741824 # The interval at which log segments are checked to see if they can be deleted according # to the retention policies log.retention.check.interval.ms=300000
gemfield將log.retention.hours設置為了1,並不需要將消息保留多久。
使用的文件系統
1,推薦EXT4 或者 XFS,XFS文件系統上Kafka會有更好的performance表現;
2,不要使用網路分區;
3,如果對持久化不做要求的話,可以使用內存文件系統,避免在磁碟IO上的浪費。比如gemfield就在某docker compose編排的系統中使用tmpfs文件系統(使用的是目前排名第一的wurstmeister/kafka做的docker鏡像):
version: 3services: zookeeper: image: wurstmeister/zookeeper restart: always ports: - "2181:2181" kafka: image: wurstmeister/kafka restart: always ports: - "9092:9092" environment: KAFKA_MESSAGE_MAX_BYTES: 5000000 KAFKA_AUTO_CREATE_TOPICS_ENABLE: false KAFKA_ADVERTISED_HOST_NAME: localhost KAFKA_CREATE_TOPICS: "football_live_1:1:1" KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181 KAFKA_LOG_DIRS: /kafka/gemfield tmpfs: - /kafka/gemfield volumes: - /var/run/docker.sock:/var/run/docker.sock
注意啊,tmpfs內存型文件系統上的數據在容器重啟後就丟了。
Zookeeper
不要把zookeeper部署到和kafka broker同一台機器上。
Topic/Partitions
1,增加partitions增加並行吞吐量;
2,但是增加partition會(輕微)增加延遲。
3,Message.max.bytes 設置單條消息的最大位元組,特別重要的是,如果你設置了replica,確保replica.fetch.max.bytes要大於或者等於message.max.bytes。
Producer
1,Batch.size ,多少個消息發送一次。
2,Linger.ms ,多長時間發送一次。
3,Compression.type,壓縮類型,這是producer端的一個主要工作。
4,Max.in.flight.requests.per.connection;
5,Acks;比方說0表示無需應答,所以速度很快(但是稍微不那麼可靠點)。
6,更大的消息體積可以提高吞吐量。
Consumer
好像沒啥需要設置的。Consumer的速度一般是producer的好幾倍,所以不是什麼瓶頸。
KAFKA的python客戶端
好了,重點來了。上面說了那麼多看似有道理的配置,但其實你只要知道,90%的情況下上面的配置對提高performance沒什麼卵用。比方說在Python社區最流行的3個kafka客戶端分別是pykafka、kafka-python、confluent-kafka-python。這裡有一篇文章Python Kafka Client Benchmarking,介紹了這三者之間的性能比較(PK的版本是pykafka 2.3.1、kafka-python 1.1.1和confluent-kafka-python 0.9.1)。
以三者的producer為例,confluent-kafka-python >> pykafka(librdkafka backend) >> pykafka > kafka-python;以三者的consumer為例,confluent-kafka-python >> pykafka(librdkafka backend) >> kafka-python > pykafka。
1,kafka-python
最初在調研的時候,因為kafka-python排在搜索結果的第一名,並且github上的star數量最多,所以首先選擇了kafka-python。在最新的kafka-python 1.4.2版本中,本來預期producer一秒能發送20MB的數據(基於本文開始背景中的上下文),但實際上只能發送3~4MB左右。這就導致一種情況,就是producer非同步發送的東西cache在一個buffer里,由於網路來不及將其發送出去,於是導致buffer越攢越多,當達到buffer_memory的限制時,數據會被清空(也就是還未發送就被扔掉了),並且producer會阻塞住好一會兒......
buffer_memory (int) – The total bytes of memory the producer should use to buffer records waiting to be sent to the server. If records are sent faster than they can be delivered to the server the producer will block up to max_block_ms, raising an exception on timeout. In the current implementation, this setting is an approximation. Default: 33554432 (32MB)
在kafka-python的github上找到了這麼一條issue:Slower producer throughput on 1.4.1 than on 1.3.5 · Issue #1412 · dpkp/kafka-python
這個問題在1.4.1中出現,有人解釋了原因是因為沒有crc32的硬解碼。
在最新版(截止2018年5月15日)的1.4.2中可以通過安裝crc32來解決這個performance問題:
pip install crc32
安裝了crc32之後,這個問題就解決了。然而Gemfield要轉向confluent了。
2,confluent-kafka-python
在最新版的confluent-kafka-python 上(反正2018年5月已經是了),pip install安裝confluent-kafka-python時使用的 manylinux wheels中已經嵌入了librdkafka的so庫了,比如:
gemfield@gemfield.org:~# find / -name "librdkafka*" /usr/local/lib/python2.7/dist-packages/confluent_kafka/.libs/librdkafka-6f63ed6f.so.1
在使用的時候,因為速度太他瞄的快了,所以很可能會看到「BufferError: Local: Queue full」這樣的錯誤,這就是本地的producer的buffer滿了,為什麼滿了?因為網路來不及將越攢越多的數據送出去。這個buffer是靠兩個config屬性來指定的:
gemfield_config = {bootstrap.servers: kafka, queue.buffering.max.kbytes: 2000000, queue.buffering.max.messages: 1000000}
queue.buffering.max.kbytes 來指定能夠緩存多大的信息,默認是400MB,最大能提高到2097151KB,也就是大約2GB(因為緩存不是100%你的消息,還有key或者其他meta信息,所以實際可用的到不了2GB)。
queue.buffering.max.messages來指定能夠緩存多少個信息,默認是10萬個吧,夠用了。
可以通過更改這兩個屬性和優化程序的流程式控制制來避免BufferError: Local: Queue full這樣的錯誤。
3,Gemfield自己的比較
下面是發送3000個200KB的消息的PK結果:
#這個使用的是confluent-kafka-pythongemfield@gemfield.org:~# time python kf_producer.py real 0m3.609suser 0m0.360ssys 0m0.684s#這個使用的是kafka-pythongemfield@gemfield.org:~# time python producer.py real 0m7.121suser 0m3.976ssys 0m2.392s
推薦閱讀:
※分散式系統論文筆記目錄
※zooKeeper基礎
※分散式系統測試的應用方法——場景注入測試
※OceanBase英雄貼
※PacificA 一致性協議解讀