標籤:

Spark Streaming消費kafka使用及原理

作者 | 張偉

AI前線出品| ID:ai-front

本文主要介紹Spark Streaming(以下簡稱SS,版本1.6.3)的一些基本概念,以及SS消費kafka(版本0.8.2.1)數據的兩種方式的使用及其原理。我會對這兩種方案做詳細的解析,同時對比兩種方案優劣,以及針對Direct Approach (No Receivers)模式介紹其如何實現Exactly

Once Semantics,也就是保證接收到的數據只被處理一次,不丟,不重。

SS是Spark上的一個流式處理框架,可以面向海量數據實現高吞吐量、高容錯的實時計算。SS支持多種類型數據源,包括Kafka、Flume、twitter、zeroMQ、Kinesis以及TCP sockets等。SS實時接收數據流,並按照一定的時間間隔(下文稱為「批處理時間間隔」)將連續的數據流拆分成一批批離散的數據集;然後應用諸如map、reduce、join和window等豐富的API進行複雜的數據處理;最後提交給Spark引擎進行運算,得到批量結果數據,因此其也被稱為準實時處理系統。而結果也能保存在很多地方,如HDFS,資料庫等。另外SS也能和MLlib(機器學習)以及GraphX(圖計算)完美融合。

Spark Streaming支持多種類型數據源

Spark Streaming基礎概念

DStream Discretized Stream是SS的基礎抽象,代表持續性的數據流和經過各種Spark原語操作後的結果數據流。DStream本質上是一個以時間為鍵,RDD為值的哈希表,保存了按時間順序產生的RDD,而每個RDD封裝了批處理時間間隔內獲取到的數據。SS每次將新產生的RDD添加到哈希表中,而對於已經不再需要的RDD則會從這個哈希表中刪除,所以DStream也可以簡單地理解為以時間為鍵的RDD的動態序列。如下圖:

窗口時間間隔

窗口時間間隔又稱為窗口長度,它是一個抽象的時間概念,決定了SS對RDD序列進行處理的範圍與粒度,即用戶可以通過設置窗口長度來對一定時間範圍內的數據進行統計和分析。假如設置批處理時間間隔為1s,窗口時間間隔為3s。如下圖,DStream每1s會產生一個RDD,紅色邊框的矩形框就表示窗口時間間隔,一個窗口時間間隔內最多有3個RDD,Spark Streaming在一個窗口時間間隔內最多會對3個RDD中的數據進行統計和分析。

滑動時間間隔

滑動時間間隔決定了SS程序對數據進行統計和分析的頻率。它指的是經過多長時間窗口滑動一次形成新的窗口,滑動時間間隔默認情況下和批處理時間間隔相同,而窗口時間間隔一般設置的要比它們兩個大。在這裡必須注意的一點是滑動時間間隔和窗口時間間隔的大小一定得設置為批處理時間間隔的整數倍。

如下圖,批處理時間間隔是1個時間單位,窗口時間間隔是3個時間單位,滑動時間間隔是2個時間單位。對於初始的窗口time 1-time 3,只有窗口時間間隔滿足了才觸發數據的處理。這裡需要注意的一點是,初始的窗口有可能覆蓋的數據沒有3個時間單位,但是隨著時間的推進,窗口最終會覆蓋到3個時間單位的數據。當每個2個時間單位,窗口滑動一次後,會有新的數據流入窗口,這時窗口會移去最早的兩個時間單位的數據,而與最新的兩個時間單位的數據進行匯總形成新的窗口(time3-time5)。

Spark Streaming讀取kafka數據

Spark Streaming 與Kafka集成接收數據的方式有兩種:

1. Receiver-based Approach

2. Direct Approach (No Receivers)

Receiver-based Approach

這個方法使用了Receivers來接收數據。Receivers的實現使用到Kafka高級消費者API。對於所有的Receivers,接收到的數據將會保存在Spark executors中,然後由SS啟動的Job來處理這些數據。

然而,在默認的配置下,這種方法在失敗的情況下會丟失數據,為了保證零數據丟失,你可以在SS中使用WAL日誌,這是在Spark 1.2.0才引入的功能,這使得我們可以將接收到的數據保存到WAL中(WAL日誌可以存儲在HDFS上),所以在失敗的時候,我們可以從WAL中恢復,而不至於丟失數據。架構圖如下:

使用方式:

(1) 導入kafka的Spark Streaming整合包

(2) 創建DStream

需要注意的幾點:

1) kafka的topic和partition並不和SS生成的RDD的partition相對應,所以上面代碼中topicMap里增加threads只能增加使用一個receiver消費這個topic的線程數,它並不能增加Spark處理數據的並行數,因為每個input DStream在一個worker機器上只創建一個接受單個數據流的receiver。

2) 可以為不同topic和group創建多個DStream來使用多個receiver並行的接受數據。例如:一個單獨的kafka input DStream接受兩個topic的數據可以分為兩個kafka input DStream,每個只接受一個topic的數據,這樣可以並行的接受速度從而提高整體吞吐量。

3) 如果開啟了wal來保證數據不丟失話,需要設置checkpoint目錄,並且像上面代碼一樣指定數據序列化到hdfs上的方式(比如:StorageLevel.MEMORY_AND_DISK_SER)

4) 建議每個批處理時間間隔周期接受到的數據最好不要超過接受Executor的內存(Storage)的一半。

要描述清楚 Receiver-based Approach ,我們需要了解其接收流程,分析其內存使用,以及相關參數配置對內存的影響。

數據接收流程

當執行SS的start方法後,SS會標記StreamingContext為Active狀態,並且單獨起個線程通過ReceiverTracker將從ReceiverInputDStreams中獲取的receivers以並行集合的方式分發到worker節點,並運行他們。worker節點會啟動ReceiverSupervisor。接著按如下步驟處理:

1) ReceiverSupervisor會啟動對應的Receiver(這裡是KafkaReceiver)

2) KafkaReceiver 會根據配置啟動新的線程接受數據,在該線程中調用ReceiverSupervisor.pushSingle方法填充數據,注意,這裡是一條一條填充的。

3) ReceiverSupervisor 會調用 BlockGenerator.addData 進行數據填充。

到目前為止,整個過程不會有太多內存消耗,正常的一個線性調用。所有複雜的數據結構都隱含在BlockGenerator中。

BlockGenerator 存儲結構

BlockGenerator 會複雜些,重要的數據存儲結構有四個:

1) 維護了一個緩存 currentBuffer ,這是一個變長的數組的ArrayBuffer。currentBuffer並不會被複用,而是每個spark.streaming.blockInterval都會新建一個空的變長數據替換老的數據作為新的currentBuffer,然後把老的對象直接封裝成Block放入到blocksForPushing的隊列里,BlockGenerator會負責保證currentBuffer 只有一個。currentBuffer填充的速度是可以被限制的,以秒為單位,配置參數為spark.streaming.receiver.maxRate,是單個Receiver 每秒鐘允許添加的條數。這個是Spark內存控制的第一步,填充currentBuffer 是阻塞的,消費Kafka的線程直接做填充。

2) 維護了一個 blocksForPushing的阻塞隊列,size 默認為10個(1.6.3版本),可通過spark.streaming.blockQueueSize進行配置。該隊列主要用來實現生產-消費模式,每個元素其實是一個currentBuffer形成的block。

3) blockIntervalTimer 是一個定時器。其實是一個生產者,負責將當前currentBuffer的數據放到blocksForPushing 中,並新建一個currentBuffer。通過參數spark.streaming.blockInterval 設置,默認為200ms。放的方式很簡單,直接把currentBuffer做為Block的數據源。這就是為什麼currentBuffer不會被複用。

4) blockPushingThread 也是一個定時器,負責將Block從blocksForPushing取出來,然後交給BlockManagerBasedBlockHandler.storeBlock方法。10毫秒會取一次,不可配置。到這一步,才真的將數據放到了Spark的BlockManager中。

下面我們會詳細分析每一個存儲對象對內存的使用情況:

currentBuffer

首先自然要說下currentBuffer,它緩存的數據會被定時器每隔spark.streaming.blockInterval

(默認200ms)的時間拿走,這個緩存用的是Spark的運行時內存(我們使用的是靜態內存管理模式,默認應該是heap*0.2,如果是統一內存管理模式的話應該是heap*0.25),而不是storage內存。如果200ms期間你從Kafka接受的數據足夠大,則這部分內存很容易OOM或者進行大量的GC,導致receiver所在的Executor極容易掛掉或者處理速度也很慢。如果你在SparkUI發現Receiver掛掉了,考慮有沒有可能是這個問題。

blocksForPushing

blocksForPushing這個是作為currentBuffer 和BlockManager之間的中轉站。默認存儲的數據最大可以達到 10*currentBuffer 大小。一般不大可能有問題,除非你的spark.streaming

.blockInterval設置的比10ms 還小,官方推薦最小也要設置成 50ms,只要你不設置的過大,這塊不用太擔心。

blockPushingThread

blockPushingThread負責從 blocksForPushing 獲取數據,並且寫入BlockManager。blockPushingThread只寫他自己所在的Executor的 blockManager,也就是一個receiver每個批處理時間間隔周期的數據都會被一個Executor接收。這是導致內存被撐爆的最大風險,在數據量很大的情況下,會導致Receiver所在的Executor直接掛掉。 對應的解決方案在上面需要注意的建議4)有提到,也可以使用多個Receiver來消費同一個topic,降低每個receiver接收的數據量,使用類似下面的代碼

前面我們提到,SS的消費速度可以設置上限,其實SS也可以根據之前的周期處理情況來自動調整下一個周期處理的數據量。你可以通過將 spark.streaming.backpressure.enabled 設置為true打開該功能。演算法的論文可參考: Socc 2014: Adaptive Stream Processing using Dynamic Batch Sizing ,還是有用的,我現在也都開啟著。

另外,Spark里除了這個Dynamic,還有一個就是Dynamic Allocation,也就是Executor數量會根據資源使用情況,自動分配資源。具體見官網文檔。

Direct Approach (No Receivers)

和基於Receiver接收數據不一樣,這種方式定期地從Kafka的topic+partition中查詢最新的偏移量,再根據定義的偏移量範圍在每個批處理時間間隔裡面處理數據。當作業需要處理的數據來臨時,Spark通過調用Kafka的低級消費者API讀取一定範圍的數據。這個特性目前還處於試驗階段,而且僅僅在Scala和Java語言中提供相應的API。

和基於Receiver方式相比,這種方式主要有一些幾個優點:

(1)簡化並行。我們不需要創建多個Kafka輸入流,然後union他們。而使用DirectStream,SS將會創建和Kafka分區一樣的RDD分區個數,而且會從Kafka並行地讀取數據,也就是說Spark分區將會和Kafka分區有一一對應的關係,這對我們來說很容易理解和使用;

(2)高效。第一種實現零數據丟失是通過將數據預先保存在WAL中,這將會複製一遍數據,這種方式實際上很不高效,因為這導致了數據被拷貝兩次:一次是被Kafka複製;另一次是寫到WAL中。但是本方法因為沒有Receiver,從而消除了這個問題,所以不需要WAL日誌;

(3)恰好一次語義(Exactly-once semantics)。第一種實現中通過使用Kafka高層次的API把偏移量寫入Zookeeper中,這是讀取Kafka中數據的傳統方法。雖然這種方法可以保證零數據丟失,但是還是存在一些情況導致數據會丟失,因為在失敗情況下通過SS讀取偏移量和Zookeeper中存儲的偏移量可能不一致。而本文提到的方法是通過Kafka低層次的API,並沒有使用到Zookeeper,偏移量僅僅被SS保存在Checkpoint中。這就消除了SS和Zookeeper中偏移量的不一致,而且可以保證每個記錄僅僅被SS讀取一次,即使是出現故障。但是本方法唯一的壞處就是沒有更新Zookeeper中的偏移量,所以基於Zookeeper的Kafka監控工具將會無法顯示消費的狀況。但是你可以通過自己手動地將偏移量寫入到Zookeeper中。

架構圖如下:

使用方式:

個人認為,DirectApproach更符合Spark的思維。我們知道,RDD的概念是一個不變的,分區的數據集合。我們將kafka數據源包裹成了一個KafkaRDD,RDD里的partition 對應的數據源為kafka的partition。唯一的區別是數據在Kafka里而不是事先被放到Spark內存里。其實包括FileInputStream里也是把每個文件映射成一個RDD,比較好奇,為什麼一開始會有Receiver-based Approach,額外添加了Receiver這麼一個概念。

DirectKafkaInputDStream

SS通過Direct Approach接收數據的入口自然是KafkaUtils.createDirectStream 了。在調用該方法時,會先創建

protected val kc = new KafkaCluster(kafkaParams)

KafkaCluster

這個類是真實負責和Kafka 交互的類,該類會獲取Kafka的partition信息,接著會創建DirectKafkaInputDStream。此時會獲取每個Topic的每個partition的offset。 如果配置成smallest則拿到最早的offset,否則拿最近的offset。

每個DirectKafkaInputDStream 也會持有一個KafkaCluster實例。

到了計算周期後,對應的DirectKafkaInputDStream .compute方法會被調用,此時做下面幾個操作:

1) 獲取對應Kafka Partition的untilOffset。這樣就確定了需要獲取數據的offset的範圍,同時也就知道了需要計算多少數據了

2) 構建一個KafkaRDD實例。這裡我們可以看到,每個計算周期里,DirectKafkaInputDStream和 KafkaRDD 是一一對應的

3) 將相關的offset信息報給InputInfoTracker

4) 返回該RDD

KafkaRDD 的組成結構

KafkaRDD

包含 N(N=Kafka的partition數目)個 KafkaRDDPartition,每個KafkaRDDPartition 其實只是包含一些信息,譬如topic,offset等,真正如果想要拉數據,是通過KafkaRDDIterator 來完成,一個KafkaRDDIterator對應一個 KafkaRDDPartition。整個過程都是延時過程,也就是說數據其實都還在Kafka里,直到有實際的action被觸發,才會主動去kafka拉數據。

限速

Direct Approach ( NoReceivers ) 的接收方式也是可以限制接受數據的量的。你可以通過設置 spark.streaming.kafka.maxRatePerPartition來完成對應的配置。需要注意的是,這裡是對每個Partition進行限速。所以你需要事先知道Kafka有多少個分區,才好評估系統的實際吞吐量,從而設置該值。

相應的,spark.streaming.backpressure.enabled

參數在Direct Approach 中也是繼續有效的。

Receiver-based Approach VS Direct Approach (No Receivers)

經過上面對兩種數據接收方案的介紹,我們發現, Receiver-based Approach 存在各種內存摺騰,對應的Direct Approach (No Receivers)則顯得比較純粹簡單些,這也給其帶來了較多的優勢,主要有如下幾點:

1) 因為按需要拉數據,所以不存在緩衝區,就不用擔心緩衝區把內存撐爆了。這個在Receiver-based Approach就比較麻煩,你需要通過spark.streaming.blockInterval等參數來調整。

2) 數據默認就被分布到了多個Executor上。Receiver-based Approach 你需要做特定的處理,才能讓Receiver分不到多個Executor上。

3) Receiver-based Approach 的方式,一旦你的Batch Processing 被delay了,或者被delay了很多個batch,那估計你的Spark Streaming程序離崩潰也就不遠了。 Direct Approach (No Receivers) 則完全不會存在類似問題。就算你delay了很多個batch time,你內存中的數據只有這次處理的。

4) Direct Approach (No Receivers) 直接維護了 Kafka offset,可以保證數據只有被執行成功了,才會被記錄下來,通過checkpoint機制。如果採用Receiver-based Approach,消費Kafka和數據處理是被分開的,這樣就很不好做容錯機制,比如系統宕掉了。所以你需要開啟WAL,但是開啟WAL帶來一個問題是,數據量很大,對HDFS是個很大的負擔,而且也會給實時程序帶來比較大延遲。

我原先以為Direct Approach 因為只有在計算的時候才拉取數據,可能會比Receiver-based Approach 的方式慢,但是經過我自己的實際測試,總體性能 Direct Approach會更快些,因為Receiver-based Approach可能會有較大的內存隱患,GC也會影響整體處理速度。

如何保證數據接收的可靠性

SS 自身可以做到 at least once 語義,具體方式是通過CheckPoint機制。

CheckPoint 機制

CheckPoint 會涉及到一些類,以及他們之間的關係:

DStreamGraph類負責生成任務執行圖,而JobGenerator則是任務真實的提交者。任務的數據源則來源於DirectKafkaInputDStream,checkPoint 一些相關信息則是由類DirectKafkaInputDStreamCheckpointData負責。好像涉及的類有點多,其實沒關係,我們完全可以不用關心他們。先看看checkpoint都幹了些啥,checkpoint 其實就序列化了一個類而已:org.apache.spark.streaming.Checkpoint

以下是其中的類成員:

其他的都比較容易理解,最重要的是 graph,該類全路徑名是: org.apache.spark.streaming.DStreamGraph

裡面有兩個核心的數據結構是:

private val inputStreams = new ArrayBuffer[InputDStream[_]]()

private val outputStreams = new ArrayBuffer[DStream[_]]()

inputStreams 對應的就是DirectKafkaInputDStream 了。

再進一步,DirectKafkaInputDStream 有一個重要的對象

protected[streaming] override val checkpointData =

new DirectKafkaInputDStreamCheckpointData

checkpointData 里則有一個data 對象,裡面存儲的內容也很簡單data.asInstanceOf[mutable.HashMap[Time,Array[OffsetRange.OffsetRangeTuple]]]

就是每個batch 的唯一標識time 對象,以及每個KafkaRDD對應的的Kafka偏移信息。

而 outputStreams 里則是RDD,如果你存儲的時候做了foreach操作,那麼應該就是forEachRDD了,他被序列化的時候是不包含數據的。

經過上面的分析,我們發現:

1) checkpoint 是非常高效的。沒有涉及到實際數據的存儲。一般大小只有幾十K,因為只存了Kafka的偏移量等信息。

2) checkpoint 採用的是序列化機制,尤其是DStreamGraph的引入,裡面包含了可能如ForeachRDD等,而ForeachRDD裡面的函數應該也會被序列化。如果採用了CheckPoint機制,而你的程序包做了做了變更,恢復後可能會有一定的問題(這個在測試過程中碰到過)。接著我們看看JobGenerator是怎麼提交一個真實的batch任務的,分析在什麼時間做checkpoint 操作,從而保證數據的高可用:

1) 產生jobs

2) 成功則提交jobs 然後非同步執行

3) 失敗則會發出一個失敗的事件

4) 無論成功或者失敗,都會發出一個 DoCheckpoint 事件。

5) 當任務運行完成後,還會再調用一次DoCheckpoint 事件。

只要任務運行完成後沒能順利執行完DoCheckpoint前crash,都會導致這次Batch被重新調度。也就說無論怎樣,不存在丟數據的問題,而這種穩定性是靠checkpoint機制以及Kafka的可回溯性來完成的。

那現在會產生一個問題,假設我們的業務邏輯會對每一條數據都處理,則

1) 我們沒有處理一條數據

2) 我們可能只處理了部分數據

3) 我們處理了全部數據

根據我們上面的分析,無論如何,這次失敗了,都會被重新調度,那麼我們可能會重複處理數據。有可能事最後失敗的那一批次數據的一部分,也可能是全部,但不會更多了。

業務需要做事務,保證 Exactly Once 語義

這裡業務場景被區分為兩個:

1) 冪等操作

2) 業務代碼需要自身添加事物操作

所謂冪等操作就是重複執行不會產生問題,如果是這種場景下,你不需要額外做任何工作。但如果你的應用場景是不允許數據被重複執行的,那隻能通過業務自身的邏輯代碼來解決了。

這個SS 倒是也給出了官方方案:

這代碼什麼含義呢?就是說針對每個partition的數據,產生一個uniqueId,只有這個partion的所有數據被完全消費,則算成功,否則算失敗,要回滾。下次重複執行這個uniqueId

時,如果已經被執行成功過的,則skip掉。這樣,就能保證數據Exactly Once 語義了。

總結

根據我的實際經驗,目前Direct Approach 穩定性個人感覺比 Receiver-based Approach 更好些,推薦使用 Direct Approach 方式和Kafka進行集成,並且開啟相應的checkpoint 功能,保證數據接收的穩定性,Direct Approach 模式本身可以保證數據 at least once語義,如果你需要Exactly Once 語義時,需要保證你的業務是冪等,或者保證了相應的事務。

作者介紹

張偉,TalkingData數據工程師,5年軟體開發經驗,3年大數據相關工作經驗,擅長離線計算、批計算、nosql資料庫等。


-全文完-

關注人工智慧的落地實踐,與企業一起探尋 AI 的邊界,AICon 全球人工智慧技術大會火熱售票中,8 折倒計時一周搶票,詳情點擊:

t.cn/Rl2MftP

《深入淺出TensorFlow》迷你書現已發布,關注公眾號「AI前線」,ID:ai-front,回復關鍵字:TF,獲取下載鏈接!


推薦閱讀:

在Spark集群中,集群的節點個數、RDD分區個數、?cpu內核個數三者與並行度的關係??
Scala 究竟好在那裡?
有什麼關於 Spark 的書推薦?
數據科學家必知必會的7款Python工具,你會幾個?
關於Spark有哪些大牛們的博客?

TAG:Spark | 大数据 |