標籤:

Spark Streaming場景應用-Kafka數據讀取方式

Spark Streaming 支持多種實時輸入源數據的讀取,其中包括Kafka、flume、socket流等等。除了Kafka以外的實時輸入源,由於我們的業務場景沒有涉及,在此將不會討論。本篇文章主要著眼於我們目前的業務場景,只關注Spark Streaming讀取Kafka數據的方式。 Spark Streaming 官方提供了兩種方式讀取Kafka數據:

一是Receiver-based Approach。該種讀取模式官方最先支持,並在Spark 1.2提供了數據零丟失(zero-data loss)的支持;

一是Direct Approach (No Receivers)。該種讀取方式在Spark 1.3引入。

此兩種讀取方式存在很大的不同,當然也各有優劣。接下來就讓我們具體剖解這兩種數據讀取方式。

一、Receiver-based Approach

如前文所述,Spark官方最先提供了基於Receiver的Kafka數據消費模式。但會存在程序失敗丟失數據的可能,後在Spark 1.2時引入一個配置參數spark.streaming.receiver.writeAheadLog.enable以規避此風險。以下是官方的原話:

under default configuration, this approach can lose data under failures (see receiver reliability. To ensure zero-data loss, you have to additionally enable Write Ahead Logs in Spark Streaming (introduced in Spark 1.2). This synchronously saves all the received Kafka data into write ahead logs on a distributed file system (e.g HDFS), so that all the data can be recovered on failure.

Receiver-based 讀取方式

Receiver-based的Kafka讀取方式是基於Kafka高階(high-level) api來實現對Kafka數據的消費。在提交Spark Streaming任務後,Spark集群會划出指定的Receivers來專門、持續不斷、非同步讀取Kafka的數據,讀取時間間隔以及每次讀取offsets範圍可以由參數來配置。讀取的數據保存在Receiver中,具體StorageLevel方式由用戶指定,諸如MEMORY_ONLY等。當driver 觸發batch任務的時候,Receivers中的數據會轉移到剩餘的Executors中去執行。在執行完之後,Receivers會相應更新ZooKeeper的offsets。如要確保at least once的讀取方式,可以設置spark.streaming.receiver.writeAheadLog.enable為true。具體Receiver執行流程見下圖:

Receiver-based 讀取實現

Kafka的high-level數據讀取方式讓用戶可以專註於所讀數據,而不用關注或維護consumer的offsets,這減少用戶的工作量以及代碼量而且相對比較簡單。因此,在剛開始引入Spark Streaming計算引擎時,我們優先考慮採用此種方式來讀取數據,具體的代碼如下:

/*讀取kafka數據函數*/ def getKafkaInputStream(zookeeper: String, topic: String, groupId: String, numRecivers: Int, partition: Int, ssc: StreamingContext): DStream[String] = { val kafkaParams = Map( ("zookeeper.connect", zookeeper), ("auto.offset.reset", "largest"), ("zookeeper.connection.timeout.ms", "30000"), ("fetch.message.max.bytes", (1024 * 1024 * 50).toString), ("group.id", groupId) ) val topics = Map(topic -> partition / numRecivers) val kafkaDstreams = (1 to numRecivers).map { _ => KafkaUtils.createStream[String, String, StringDecoder, StringDecoder](ssc, kafkaParams, topics, StorageLevel.MEMORY_AND_DISK_SER).map(_._2) } ssc.union(kafkaDstreams) }

如上述代碼,函數getKafkaInputStream提供了zookeeper, topic, groupId, numReceivers, partition以及ssc,其傳入函數分別對應:

zookeeper: ZooKeeper連接信息

topic: Kafka中輸入的topic信息

groupId: consumer信息

numReceivers: 打算開啟的receiver個數, 並用來調整並發

partition: Kafka中對應topic的分區數

以上幾個參數主要用來連接Kafka並讀取Kafka數據。具體執行的步驟如下:

1、Kafka相關讀取參數配置,其中 zookeeper.connect即傳入進來的zookeeper參數;auto.offset.reset設置從topic的最新處開始讀取數據;zookeeper.connection.timeout.ms指zookeepr連接超時時間,以防止網路不穩定的情況;fetch.message.max.bytes則是指單次讀取數據的大小;group.id則是指定consumer。

2、指定topic的並發數,當指定receivers個數之後,但是由於receivers個數小於topic的partition個數,所以在每個receiver上面會起相應的線程來讀取不同的partition。

3、讀取Kafka數據,numReceivers的參數在此用於指定我們需要多少Executor來作為Receivers,開多個Receivers是為了提高應用吞吐量。

4、union用於將多個Receiver讀取的數據關聯起來

Receiver-based 讀取問題

採用Reveiver-based方式滿足我們的一些場景需求,並基於此抽象出了一些micro-batch、內存計算模型等。在具體的應用場景中,我們也對此種的方式做了一些優化:

1.防數據丟失。做checkpoint操作以及配置spark.streaming.receiver.writeAheadLog.enable參數;

2.提高receiver數據吞吐量。採用MEMORY_AND_DISK_SER方式讀取數據、提高單Receiver的內存或是調大並行度,將數據分散到多個Receiver中去。

以上處理方式在一定程度上滿足了我們的應用場景,諸如micro-batch以及內存計算模型等。但是同時因為這兩方面以及其他方面的一些因素,導致也會出現各種情況的問題:

配置spark.streaming.receiver.writeAheadLog.enable參數,每次處理之前需要將該batch內的日誌備份到checkpoint目錄中,這降低了數據處理效率,反過來又加重了Receiver端的壓力;另外由於數據備份機制,會受到負載影響,負載一高就會出現延遲的風險,導致應用崩潰。

採用MEMORY_AND_DISK_SER降低對內存的要求。但是在一定程度上影響計算的速度

單Receiver內存。由於receiver也是屬於Executor的一部分,那麼為了提高吞吐量,提高Receiver的內存。但是在每次batch計算中,參與計算的batch並不會使用到這麼多的內存,導致資源嚴重浪費。

提高並行度,採用多個Receiver來保存Kafka的數據。Receiver讀取數據是非同步的,並不參與計算。如果開較高的並行度來平衡吞吐量很不划算。

Receiver和計算的Executor的非同步的,那麼遇到網路等因素原因,導致計算出現延遲,計算隊列一直在增加,而Receiver則在一直接收數據,這非常容易導致程序崩潰。

在程序失敗恢復時,有可能出現數據部分落地,但是程序失敗,未更新offsets的情況,這導致數據重複消費。

為了回辟以上問題,降低資源使用,我們後來採用Direct Approach來讀取Kafka的數據,具體接下來細說。

二、Direct Approach (No Receivers)

區別於Receiver-based的數據消費方法,Spark 官方在Spark 1.3時引入了Direct方式的Kafka數據消費方式。相對於Receiver-based的方法,Direct方式具有以下方面的優勢:

簡化並行(Simplified Parallelism)。不現需要創建以及union多輸入源,Kafka topic的partition與RDD的partition一一對應,官方描述如下:

No need to create multiple input Kafka streams and union them. With directStream, Spark Streaming will create as many RDD partitions as there are Kafka partitions to consume, which will all read data from Kafka in parallel. So there is a one-to-one mapping between Kafka and RDD partitions, which is easier to understand and tune.

高效(Efficiency)。Receiver-based保證數據零丟失(zero-data loss)需要配置spark.streaming.receiver.writeAheadLog.enable,此種方式需要保存兩份數據,浪費存儲空間也影響效率。而Direct方式則不存在這個問題。

Achieving zero-data loss in the first approach required the data to be stored in a Write Ahead Log, which further replicated the data. This is actually inefficient as the data effectively gets replicated twice - once by Kafka, and a second time by the Write Ahead Log. This second approach eliminates the problem as there is no receiver, and hence no need for Write Ahead Logs. As long as you have sufficient Kafka retention, messages can be recovered from Kafka.

強一致語義(Exactly-once semantics)。High-level數據由Spark Streaming消費,但是Offsets則是由Zookeeper保存。通過參數配置,可以實現at-least once消費,此種情況有重複消費數據的可能。

The first approach uses Kafka』s high level API to store consumed offsets in Zookeeper. This is traditionally the way to consume data from Kafka. While this approach (in combination with write ahead logs) can ensure zero data loss (i.e. at-least once semantics), there is a small chance some records may get consumed twice under some failures. This occurs because of inconsistencies between data reliably received by Spark Streaming and offsets tracked by Zookeeper. Hence, in this second approach, we use simple Kafka API that does not use Zookeeper. Offsets are tracked by Spark Streaming within its checkpoints. This eliminates inconsistencies between Spark Streaming and Zookeeper/Kafka, and so each record is received by Spark Streaming effectively exactly once despite failures. In order to achieve exactly-once semantics for output of your results, your output operation that saves the data to an external data store must be either idempotent, or an atomic transaction that saves results and offsets (see Semantics of output operations in the main programming guide for further information).

Direct 讀取方式

Direct方式採用Kafka簡單的consumer api方式來讀取數據,無需經由ZooKeeper,此種方式不再需要專門Receiver來持續不斷讀取數據。當batch任務觸發時,由Executor讀取數據,並參與到其他Executor的數據計算過程中去。driver來決定讀取多少offsets,並將offsets交由checkpoints來維護。將觸發下次batch任務,再由Executor讀取Kafka數據並計算。從此過程我們可以發現Direct方式無需Receiver讀取數據,而是需要計算時再讀取數據,所以Direct方式的數據消費對內存的要求不高,只需要考慮批量計算所需要的內存即可;另外batch任務堆積時,也不會影響數據堆積。其具體讀取方式如下圖:

Direct 讀取實現

Spark Streaming提供了一些重載讀取Kafka數據的方法,本文中關注兩個基於Scala的方法,這在我們的應用場景中會用到,具體的方法代碼如下:

方法createDirectStream中,ssc是StreamingContext;kafkaParams的具體配置見Receiver-based之中的配置,與之一樣;這裡面需要指出的是fromOffsets ,其用來指定從什麼offset處開始讀取數據。

def createDirectStream[ K: ClassTag, V: ClassTag, KD <: Decoder[K]: ClassTag, VD <: Decoder[V]: ClassTag, R: ClassTag] ( ssc: StreamingContext, kafkaParams: Map[String, String], fromOffsets: Map[TopicAndPartition, Long], messageHandler: MessageAndMetadata[K, V] => R ): InputDStream[R] = { val cleanedHandler = ssc.sc.clean(messageHandler) new DirectKafkaInputDStream[K, V, KD, VD, R]( ssc, kafkaParams, fromOffsets, cleanedHandler) }

方法createDirectStream中,該方法只需要3個參數,其中kafkaParams還是一樣,並未有什麼變化,不過其中有個配置auto.offset.reset可以用來指定是從largest或者是smallest處開始讀取數據;topic是指Kafka中的topic,可以指定多個。具體提供的方法代碼如下:

def createDirectStream[ K: ClassTag, V: ClassTag, KD <: Decoder[K]: ClassTag, VD <: Decoder[V]: ClassTag] ( ssc: StreamingContext, kafkaParams: Map[String, String], topics: Set[String] ): InputDStream[(K, V)] = { val messageHandler = (mmd: MessageAndMetadata[K, V]) => (mmd.key, mmd.message) val kc = new KafkaCluster(kafkaParams) val fromOffsets = getFromOffsets(kc, kafkaParams, topics) new DirectKafkaInputDStream[K, V, KD, VD, (K, V)]( ssc, kafkaParams, fromOffsets, messageHandler) }

在實際的應用場景中,我們會將兩種方法結合起來使用,大體的方向分為兩個方面:

應用啟動。當程序開發並上線,還未消費Kafka數據,此時從largest處讀取數據,採用第二種方法;

應用重啟。因資源、網路等其他原因導致程序失敗重啟時,需要保證從上次的offsets處開始讀取數據,此時就需要採用第一種方法來保證我們的場景。

總體方向上,我們採用以上方法滿足我們的需要,當然具體的策略我們不在本篇中討論,後續會有專門的文章來介紹。從largest或者是smallest處讀Kafka數據代碼實現如下:

/** * 讀取kafka數據,從最新的offset開始讀 * * @param ssc : StreamingContext * @param kafkaParams : kafka參數 * @param topics : kafka topic * @return : 返迴流數據 */private def getDirectStream(ssc: StreamingContext, kafkaParams: Map[String, String], topics: Set[String]): DStream[String] = { val kafkaDStreams = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder]( ssc, kafkaParams, topics ) kafkaDStreams.map(_._2)}

程序失敗重啟的邏輯代碼如下:

/** * 如果已有offset,則從offset開始讀數據 * * @param ssc : StreamingContext * @param kafkaParams : kafkaParams配置參數 * @param fromOffsets : 已有的offsets * @return : 返迴流數據 */private def getDirectStreamWithOffsets(ssc: StreamingContext, kafkaParams: Map[String, String], fromOffsets: Map[TopicAndPartition, Long]): DStream[String] = { val kfkData = try { KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder, String]( ssc, kafkaParams, fromOffsets, (mmd: MessageAndMetadata[String, String]) => mmd.message() ) } catch { //offsets失效, 從最新的offsets讀。 case _: Exception => val topics = fromOffsets.map { case (tap, _) => tap.topic }.toSet getDirectStream(ssc, kafkaParams, topics) } kfkData}

代碼中的fromOffsets參數從外部存儲獲取並需要處理轉換,其代碼如下:

val fromOffsets = offsets.map { consumerInfo => TopicAndPartition(consumerInfo.topic, consumerInfo.part) -> consumerInfo.until_offset}.toMap

該方法提供了從指定offsets處讀取Kafka數據。如果發現讀取數據異常,我們認為是offsets失敗,此種情況去捕獲這個異常,然後從largest處讀取Kafka數據。

Direct 讀取問題

在實際的應用中,Direct Approach方式很好地滿足了我們的需要,與Receiver-based方式相比,有以下幾方面的優勢:

1.降低資源。Direct不需要Receivers,其申請的Executors全部參與到計算任務中;而Receiver-based則需要專門的Receivers來讀取Kafka數據且不參與計算。因此相同的資源申請,Direct 能夠支持更大的業務。

2.降低內存。Receiver-based的Receiver與其他Exectuor是非同步的,並持續不斷接收數據,對於小業務量的場景還好,如果遇到大業務量時,需要提高Receiver的內存,但是參與計算的Executor並無需那麼多的內存。而Direct 因為沒有Receiver,而是在計算時讀取數據,然後直接計算,所以對內存的要求很低。實際應用中我們可以把原先的10G降至現在的2-4G左右。

3.魯棒性更好。Receiver-based方法需要Receivers來非同步持續不斷的讀取數據,因此遇到網路、存儲負載等因素,導致實時任務出現堆積,但Receivers卻還在持續讀取數據,此種情況很容易導致計算崩潰。Direct 則沒有這種顧慮,其Driver在觸發batch 計算任務時,才會讀取數據並計算。隊列出現堆積並不會引起程序的失敗。

至於其他方面的優勢,比如 簡化並行(Simplified Parallelism)、高效(Efficiency)以及強一致語義(Exactly-once semantics)在之前已列出,在此不再介紹。雖然Direct 有以上這些優勢,但是也存在一些不足,具體如下:

1.提高成本。Direct需要用戶採用checkpoint或者第三方存儲來維護offsets,而不像Receiver-based那樣,通過ZooKeeper來維護Offsets,此提高了用戶的開發成本。

2.監控可視化。Receiver-based方式指定topic指定consumer的消費情況均能通過ZooKeeper來監控,而Direct則沒有這種便利,如果做到監控並可視化,則需要投入人力開發。

總結

本文介紹了基於Spark Streaming的Kafka數據讀取方式,包括Receiver-based以及Direct兩種方式。兩種方式各有優劣,但相對來說Direct 適用於更多的業務場景以及有更好的可護展性。至於如何選擇以上兩種方式,除了業務場景外也跟團隊相關,如果是應用初期,為了快速迭代應用,可以考慮採用第一種方式;如果要深入使用的話則建議採用第二種方式。本文只介紹了兩種讀取方式,並未涉及到讀取策略、優化等問題。這些會在後續的文章中詳細介紹。

文章來源《Spark Streaming場景應用-Kafka數據讀取方式》

推薦閱讀:

Dubbo 新編程模型之外部化配置
分散式服務治理的設計問題?
北京沙龍報名 | 關於Dubbo開源的那些事兒
有沒有人能對twitter的finagle和國內的dubbo做個對比?

TAG:dubbo | Spark | Kafka |