Kafka設計解析(六)- Kafka高性能架構之道
原創文章,轉載請務必將下面這段話置於文章開頭處。
本文轉發自技術世界,原文鏈接 http://www.jasongj.com/kafka/high_throughput/
摘要
上一篇文章《Kafka設計解析(五)- Kafka性能測試方法及Benchmark報告》從測試角度說明了Kafka的性能。本文從宏觀架構層面和具體實現層面分析了Kafka如何實現高性能。
宏觀架構層面
利用Partition實現並行處理
Partition提供並行處理的能力
Kafka是一個Pub-Sub的消息系統,無論是發布還是訂閱,都須指定Topic。如《Kafka設計解析(一)- Kafka背景及架構介紹》一文所述,Topic只是一個邏輯的概念。每個Topic都包含一個或多個Partition,不同Partition可位於不同節點。同時Partition在物理上對應一個本地文件夾,每個Partition包含一個或多個Segment,每個Segment包含一個數據文件和一個與之對應的索引文件。在邏輯上,可以把一個Partition當作一個非常長的數組,可通過這個「數組」的索引(offset)去訪問其數據。
一方面,由於不同Partition可位於不同機器,因此可以充分利用集群優勢,實現機器間的並行處理。另一方面,由於Partition在物理上對應一個文件夾,即使多個Partition位於同一個節點,也可通過配置讓同一節點上的不同Partition置於不同的disk drive上,從而實現磁碟間的並行處理,充分發揮多磁碟的優勢。
利用多磁碟的具體方法是,將不同磁碟mount到不同目錄,然後在server.properties中,將log.dirs
設置為多目錄(用逗號分隔)。Kafka會自動將所有Partition儘可能均勻分配到不同目錄也即不同目錄(也即不同disk)上。
註:雖然物理上最小單位是Segment,但Kafka並不提供同一Partition內不同Segment間的並行處理。因為對於寫而言,每次只會寫Partition內的一個Segment,而對於讀而言,也只會順序讀取同一Partition內的不同Segment。
Partition是最小並發粒度
如同《Kafka設計解析(四)- Kafka Consumer設計解析》一文所述,多Consumer消費同一個Topic時,同一條消息只會被同一Consumer Group內的一個Consumer所消費。而數據並非按消息為單位分配,而是以Partition為單位分配,也即同一個Partition的數據只會被一個Consumer所消費(在不考慮Rebalance的前提下)。
如果Consumer的個數多於Partition的個數,那麼會有部分Consumer無法消費該Topic的任何數據,也即當Consumer個數超過Partition後,增加Consumer並不能增加並行度。
簡而言之,Partition個數決定了可能的最大並行度。如下圖所示,由於Topic 2隻包含3個Partition,故group2中的Consumer 3、Consumer 4、Consumer 5 可分別消費1個Partition的數據,而Consumer 6消費不到Topic 2的任何數據。
以Spark消費Kafka數據為例,如果所消費的Topic的Partition數為N,則有效的Spark最大並行度也為N。即使將Spark的Executor數設置為N+M,最多也只有N個Executor可同時處理該Topic的數據。
ISR實現可用性與數據一致性的動態平衡
CAP理論
CAP理論是指,分散式系統中,一致性、可用性和分區容忍性最多只能同時滿足兩個。
一致性
- 通過某個節點的寫操作結果對後面通過其它節點的讀操作可見
- 如果更新數據後,並發訪問情況下後續讀操作可立即感知該更新,稱為強一致性
- 如果允許之後部分或者全部感知不到該更新,稱為弱一致性
- 若在之後的一段時間(通常該時間不固定)後,一定可以感知到該更新,稱為最終一致性
可用性
- 任何一個沒有發生故障的節點必須在有限的時間內返回合理的結果
分區容忍性
- 部分節點宕機或者無法與其它節點通信時,各分區間還可保持分散式系統的功能
一般而言,都要求保證分區容忍性。所以在CAP理論下,更多的是需要在可用性和一致性之間做權衡。
常用數據複製及一致性方案
Master-Slave
- RDBMS的讀寫分離即為典型的Master-Slave方案
- 同步複製可保證強一致性但會影響可用性
- 非同步複製可提供高可用性但會降低一致性
WNR
- 主要用於去中心化的分散式系統中。DynamoDB與Cassandra即採用此方案或其變種
- N代表總副本數,W代表每次寫操作要保證的最少寫成功的副本數,R代表每次讀至少要讀取的副本數
- 當W+R>N時,可保證每次讀取的數據至少有一個副本擁有最新的數據
- 多個寫操作的順序難以保證,可能導致多副本間的寫操作順序不一致。Dynamo通過向量時鐘保證最終一致性
Paxos及其變種
- Google的Chubby,Zookeeper的原子廣播協議(Zab),RAFT等
基於ISR的數據複製方案
如《 Kafka High Availability(上)》一文所述,Kafka的數據複製是以Partition為單位的。而多個備份間的數據複製,通過Follower向Leader拉取數據完成。從一這點來講,Kafka的數據複製方案接近於上文所講的Master-Slave方案。不同的是,Kafka既不是完全的同步複製,也不是完全的非同步複製,而是基於ISR的動態複製方案。ISR,也即In-sync Replica。每個Partition的Leader都會維護這樣一個列表,該列表中,包含了所有與之同步的Replica(包含Leader自己)。每次數據寫入時,只有ISR中的所有Replica都複製完,Leader才會將其置為Commit,它才能被Consumer所消費。
這種方案,與同步複製非常接近。但不同的是,這個ISR是由Leader動態維護的。如果Follower不能緊「跟上」Leader,它將被Leader從ISR中移除,待它又重新「跟上」Leader後,會被Leader再次加加ISR中。每次改變ISR後,Leader都會將最新的ISR持久化到Zookeeper中。
至於如何判斷某個Follower是否「跟上」Leader,不同版本的Kafka的策略稍微有些區別。
- 對於0.8.*版本,如果Follower在
replica.lag.time.max.ms
時間內未向Leader發送Fetch請求(也即數據複製請求),則Leader會將其從ISR中移除。如果某Follower持續向Leader發送Fetch請求,但是它與Leader的數據差距在replica.lag.max.messages
以上,也會被Leader從ISR中移除。 - 從0.9.0.0版本開始,
replica.lag.max.messages
被移除,故Leader不再考慮Follower落後的消息條數。另外,Leader不僅會判斷Follower是否在replica.lag.time.max.ms
時間內向其發送Fetch請求,同時還會考慮Follower是否在該時間內與之保持同步。 - 0.10.* 版本的策略與0.9.*版一致
對於0.8.*版本的replica.lag.max.messages
參數,很多讀者曾留言提問,既然只有ISR中的所有Replica複製完後的消息才被認為Commit,那為何會出現Follower與Leader差距過大的情況。原因在於,Leader並不需要等到前一條消息被Commit才接收後一條消息。事實上,Leader可以按順序接收大量消息,最新的一條消息的Offset被記為High Wartermark。而只有被ISR中所有Follower都複製過去的消息才會被Commit,Consumer只能消費被Commit的消息。由於Follower的複製是嚴格按順序的,所以被Commit的消息之前的消息肯定也已經被Commit過。換句話說,High Watermark標記的是Leader所保存的最新消息的offset,而Commit Offset標記的是最新的可被消費的(已同步到ISR中的Follower)消息。而Leader對數據的接收與Follower對數據的複製是非同步進行的,因此會出現Commit Offset與High Watermark存在一定差距的情況。0.8.*版本中replica.lag.max.messages
限定了Leader允許的該差距的最大值。
Kafka基於ISR的數據複製方案原理如下圖所示。
如上圖所示,在第一步中,Leader A總共收到3條消息,故其high watermark為3,但由於ISR中的Follower只同步了第1條消息(m1),故只有m1被Commit,也即只有m1可被Consumer消費。此時Follower B與Leader A的差距是1,而Follower C與Leader A的差距是2,均未超過默認的replica.lag.max.messages
,故得以保留在ISR中。在第二步中,由於舊的Leader A宕機,新的Leader B在replica.lag.time.max.ms
時間內未收到來自A的Fetch請求,故將A從ISR中移除,此時ISR={B,C}。同時,由於此時新的Leader B中只有2條消息,並未包含m3(m3從未被任何Leader所Commit),所以m3無法被Consumer消費。第四步中,Follower A恢復正常,它先將宕機前未Commit的所有消息全部刪除,然後從最後Commit過的消息的下一條消息開始追趕新的Leader B,直到它「趕上」新的Leader,才被重新加入新的ISR中。
使用ISR方案的原因
- 由於Leader可移除不能及時與之同步的Follower,故與同步複製相比可避免最慢的Follower拖慢整體速度,也即ISR提高了系統可用性。
- ISR中的所有Follower都包含了所有Commit過的消息,而只有Commit過的消息才會被Consumer消費,故從Consumer的角度而言,ISR中的所有Replica都始終處於同步狀態,從而與非同步複製方案相比提高了數據一致性。
- ISR可動態調整,極限情況下,可以只包含Leader,極大提高了可容忍的宕機的Follower的數量。與
Majority Quorum
方案相比,容忍相同個數的節點失敗,所要求的總節點數少了近一半。
ISR相關配置說明
- Broker的
min.insync.replicas
參數指定了Broker所要求的ISR最小長度,默認值為1。也即極限情況下ISR可以只包含Leader。但此時如果Leader宕機,則該Partition不可用,可用性得不到保證。 - 只有被ISR中所有Replica同步的消息才被Commit,但Producer發布數據時,Leader並不需要ISR中的所有Replica同步該數據才確認收到數據。Producer可以通過
acks
參數指定最少需要多少個Replica確認收到該消息才視為該消息發送成功。acks
的默認值是1,即Leader收到該消息後立即告訴Producer收到該消息,此時如果在ISR中的消息複製完該消息前Leader宕機,那該條消息會丟失。而如果將該值設置為0,則Producer發送完數據後,立即認為該數據發送成功,不作任何等待,而實際上該數據可能發送失敗,並且Producer的Retry機制將不生效。更推薦的做法是,將acks
設置為all
或者-1
,此時只有ISR中的所有Replica都收到該數據(也即該消息被Commit),Leader才會告訴Producer該消息發送成功,從而保證不會有未知的數據丟失。
具體實現層面
高效使用磁碟
順序寫磁碟
根據《一些場景下順序寫磁碟快於隨機寫內存》所述,將寫磁碟的過程變為順序寫,可極大提高對磁碟的利用率。
Kafka的整個設計中,Partition相當於一個非常長的數組,而Broker接收到的所有消息順序寫入這個大數組中。同時Consumer通過Offset順序消費這些數據,並且不刪除已經消費的數據,從而避免了隨機寫磁碟的過程。
由於磁碟有限,不可能保存所有數據,實際上作為消息系統Kafka也沒必要保存所有數據,需要刪除舊的數據。而這個刪除過程,並非通過使用「讀-寫」模式去修改文件,而是將Partition分為多個Segment,每個Segment對應一個物理文件,通過刪除整個文件的方式去刪除Partition內的數據。這種方式清除舊數據的方式,也避免了對文件的隨機寫操作。
通過如下代碼可知,Kafka刪除Segment的方式,是直接刪除Segment對應的整個log文件和整個index文件而非刪除文件中的部分內容。
/**n * Delete this log segment from the filesystem.n *n * @throws KafkaStorageException if the delete fails.n */ndef delete() {n val deletedLog = log.delete()n val deletedIndex = index.delete()n val deletedTimeIndex = timeIndex.delete()n if(!deletedLog && log.file.exists)n throw new KafkaStorageException("Delete of log " + log.file.getName + " failed.")n if(!deletedIndex && index.file.exists)n throw new KafkaStorageException("Delete of index " + index.file.getName + " failed.")n if(!deletedTimeIndex && timeIndex.file.exists)n throw new KafkaStorageException("Delete of time index " + timeIndex.file.getName + " failed.")n}n
充分利用Page Cache
使用Page Cache的好處如下
- I/O Scheduler會將連續的小塊寫組裝成大塊的物理寫從而提高性能
- I/O Scheduler會嘗試將一些寫操作重新按順序排好,從而減少磁碟頭的移動時間
- 充分利用所有空閑內存(非JVM內存)。如果使用應用層Cache(即JVM堆內存),會增加GC負擔
- 讀操作可直接在Page Cache內進行。如果消費和生產速度相當,甚至不需要通過物理磁碟(直接通過Page Cache)交換數據
- 如果進程重啟,JVM內的Cache會失效,但Page Cache仍然可用
Broker收到數據後,寫磁碟時只是將數據寫入Page Cache,並不保證數據一定完全寫入磁碟。從這一點看,可能會造成機器宕機時,Page Cache內的數據未寫入磁碟從而造成數據丟失。但是這種丟失只發生在機器斷電等造成操作系統不工作的場景,而這種場景完全可以由Kafka層面的Replication機制去解決。如果為了保證這種情況下數據不丟失而強制將Page Cache中的數據Flush到磁碟,反而會降低性能。也正因如此,Kafka雖然提供了flush.messages
和flush.ms
兩個參數將Page Cache中的數據強制Flush到磁碟,但是Kafka並不建議使用。
如果數據消費速度與生產速度相當,甚至不需要通過物理磁碟交換數據,而是直接通過Page Cache交換數據。同時,Follower從Leader Fetch數據時,也可通過Page Cache完成。下圖為某Partition的Leader節點的網路/磁碟讀寫信息。
從上圖可以看到,該Broker每秒通過網路從Producer接收約35MB數據,雖然有Follower從該Broker Fetch數據,但是該Broker基本無讀磁碟。這是因為該Broker直接從Page Cache中將數據取出返回給了Follower。
支持多Disk Drive
Broker的log.dirs
配置項,允許配置多個文件夾。如果機器上有多個Disk Drive,可將不同的Disk掛載到不同的目錄,然後將這些目錄都配置到log.dirs
里。Kafka會儘可能將不同的Partition分配到不同的目錄,也即不同的Disk上,從而充分利用了多Disk的優勢。
零拷貝
Kafka中存在大量的網路數據持久化到磁碟(Producer到Broker)和磁碟文件通過網路發送(Broker到Consumer)的過程。這一過程的性能直接影響Kafka的整體吞吐量。
傳統模式下的四次拷貝與四次上下文切換
以將磁碟文件通過網路發送為例。傳統模式下,一般使用如下偽代碼所示的方法先將文件數據讀入內存,然後通過Socket將內存中的數據發送出去。
buffer = File.readnSocket.send(buffer)n
這一過程實際上發生了四次數據拷貝。首先通過系統調用將文件數據讀入到內核態Buffer(DMA拷貝),然後應用程序將內存態Buffer數據讀入到用戶態Buffer(CPU拷貝),接著用戶程序通過Socket發送數據時將用戶態Buffer數據拷貝到內核態Buffer(CPU拷貝),最後通過DMA拷貝將數據拷貝到NIC Buffer。同時,還伴隨著四次上下文切換,如下圖所示。
sendfile和transferTo實現零拷貝
Linux 2.4+內核通過sendfile
系統調用,提供了零拷貝。數據通過DMA拷貝到內核態Buffer後,直接通過DMA拷貝到NIC Buffer,無需CPU拷貝。這也是零拷貝這一說法的來源。除了減少數據拷貝外,因為整個讀文件-網路發送由一個sendfile
調用完成,整個過程只有兩次上下文切換,因此大大提高了性能。零拷貝過程如下圖所示。
從具體實現來看,Kafka的數據傳輸通過TransportLayer來完成,其子類PlaintextTransportLayer
通過Java NIO的FileChannel的transferTo
和transferFrom
方法實現零拷貝,如下所示。
@Overridenpublic long transferFrom(FileChannel fileChannel, long position, long count) throws IOException {n return fileChannel.transferTo(position, count, socketChannel);n}n
註: transferTo
和transferFrom
並不保證一定能使用零拷貝。實際上是否能使用零拷貝與操作系統相關,如果操作系統提供sendfile
這樣的零拷貝系統調用,則這兩個方法會通過這樣的系統調用充分利用零拷貝的優勢,否則並不能通過這兩個方法本身實現零拷貝。
減少網路開銷
批處理
批處理是一種常用的用於提高I/O性能的方式。對Kafka而言,批處理既減少了網路傳輸的Overhead,又提高了寫磁碟的效率。
Kafka 0.8.1及以前的Producer區分同步Producer和非同步Producer。同步Producer的send方法主要分兩種形式。一種是接受一個KeyedMessage作為參數,一次發送一條消息。另一種是接受一批KeyedMessage作為參數,一次性發送多條消息。而對於非同步發送而言,無論是使用哪個send方法,實現上都不會立即將消息發送給Broker,而是先存到內部的隊列中,直到消息條數達到閾值或者達到指定的Timeout才真正的將消息發送出去,從而實現了消息的批量發送。
Kafka 0.8.2開始支持新的Producer API,將同步Producer和非同步Producer結合。雖然從send介面來看,一次只能發送一個ProducerRecord,而不能像之前版本的send方法一樣接受消息列表,但是send方法並非立即將消息發送出去,而是通過batch.size
和linger.ms
控制實際發送頻率,從而實現批量發送。
由於每次網路傳輸,除了傳輸消息本身以外,還要傳輸非常多的網路協議本身的一些內容(稱為Overhead),所以將多條消息合併到一起傳輸,可有效減少網路傳輸的Overhead,進而提高了傳輸效率。
從零拷貝章節的圖中可以看到,雖然Broker持續從網路接收數據,但是寫磁碟並非每秒都在發生,而是間隔一段時間寫一次磁碟,並且每次寫磁碟的數據量都非常大(最高達到718MB/S)。
數據壓縮降低網路負載
Kafka從0.7開始,即支持將數據壓縮後再傳輸給Broker。除了可以將每條消息單獨壓縮然後傳輸外,Kafka還支持在批量發送時,將整個Batch的消息一起壓縮後傳輸。數據壓縮的一個基本原理是,重複數據越多壓縮效果越好。因此將整個Batch的數據一起壓縮能更大幅度減小數據量,從而更大程度提高網路傳輸效率。
Broker接收消息後,並不直接解壓縮,而是直接將消息以壓縮後的形式持久化到磁碟。Consumer Fetch到數據後再解壓縮。因此Kafka的壓縮不僅減少了Producer到Broker的網路傳輸負載,同時也降低了Broker磁碟操作的負載,也降低了Consumer與Broker間的網路傳輸量,從而極大得提高了傳輸效率,提高了吞吐量。
高效的序列化方式
Kafka消息的Key和Payload(或者說Value)的類型可自定義,只需同時提供相應的序列化器和反序列化器即可。因此用戶可以通過使用快速且緊湊的序列化-反序列化方式(如Avro,Protocal Buffer)來減少實際網路傳輸和磁碟存儲的數據規模,從而提高吞吐率。這裡要注意,如果使用的序列化方法太慢,即使壓縮比非常高,最終的效率也不一定高。
Kafka系列文章
- Kafka設計解析(一)- Kafka背景及架構介紹
- Kafka設計解析(二)- Kafka High Availability (上)
- Kafka設計解析(三)- Kafka High Availability (下)
- Kafka設計解析(四)- Kafka Consumer設計解析
- Kafka設計解析(五)- Kafka性能測試方法及Benchmark報告
- Kafka設計解析(六)- Kafka高性能架構之道
- Kafka設計解析(七)- Kafka Stream
推薦閱讀:
※Kafka 2017技術峰會摘要(流計算分類)
※《Simplifying data pipelines with Apache Kafka》課程第四章Kafka Consumer問題集
※kafka解決了什麼問題?
※Kafka的offset retention
※Kafka猛然醒來,突然變成了資料庫