分散式計算框架 Spark Streaming原理介紹
1、Spark Streaming簡介
1.1 概述
Spark Streaming 是Spark核心API的一個擴展,可以實現高吞吐量的、具備容錯機制的實時流數據的處理。支持從多種數據源獲取數據,包括Kafk、Flume、Twitter、ZeroMQ、Kinesis 以及TCP sockets,從數據源獲取數據之後,可以使用諸如map、reduce、join和window等高級函數進行複雜演算法的處理。最後還可以將處理結果存儲到文件系統,資料庫和現場儀錶盤。在「One Stack rule them all」的基礎上,還可以使用Spark的其他子框架,如集群學習、圖計算等,對流數據進行處理。
Spark Streaming處理的數據流圖:
Spark的各個子框架,都是基於核心Spark的,Spark Streaming在內部的處理機制是,接收實時流的數據,並根據一定的時間間隔拆分成一批批的數據,然後通過Spark Engine處理這些批數據,最終得到處理後的一批批結果數據。
對應的批數據,在Spark內核對應一個RDD實例,因此,對應流數據的DStream可以看成是一組RDDs,即RDD的一個序列。通俗點理解的話,在流數據分成一批一批後,通過一個先進先出的隊列,然後 Spark Engine從該隊列中依次取出一個個批數據,把批數據封裝成一個RDD,然後進行處理,這是一個典型的生產者消費者模型,對應的就有生產者消費者模型的問題,即如何協調生產速率和消費速率。
1.2 術語定義
l離散流(discretized stream)或DStream:這是Spark Streaming對內部持續的實時數據流的抽象描述,即我們處理的一個實時數據流,在Spark Streaming中對應於一個DStream 實例。
l批數據(batch data):這是化整為零的第一步,將實時流數據以時間片為單位進行分批,將流處理轉化為時間片數據的批處理。隨著持續時間的推移,這些處理結果就形成了對應的結果數據流了。
l時間片或批處理時間間隔( batch interval):這是人為地對流數據進行定量的標準,以時間片作為我們拆分流數據的依據。一個時間片的數據對應一個RDD實例。
l窗口長度(window length):一個窗口覆蓋的流數據的時間長度。必須是批處理時間間隔的倍數,
l滑動時間間隔:前一個窗口到後一個窗口所經過的時間長度。必須是批處理時間間隔的倍數
lInput DStream :一個input DStream是一個特殊的DStream,將Spark Streaming連接到一個外部數據源來讀取數據。
1.3 Storm與Spark Streming比較
l處理模型以及延遲
雖然兩框架都提供了可擴展性(scalability)和可容錯性(fault tolerance),但是它們的處理模型從根本上說是不一樣的。Storm可以實現亞秒級時延的處理,而每次只處理一條event,而Spark Streaming可以在一個短暫的時間窗口裡面處理多條(batches)Event。所以說Storm可以實現亞秒級時延的處理,而Spark Streaming則有一定的時延。
l容錯和數據保證
然而兩者的代價都是容錯時候的數據保證,Spark Streaming的容錯為有狀態的計算提供了更好的支持。在Storm中,每條記錄在系統的移動過程中都需要被標記跟蹤,所以Storm只能保證每條記錄最少被處理一次,但是允許從錯誤狀態恢復時被處理多次。這就意味著可變更的狀態可能被更新兩次從而導致結果不正確。
任一方面,Spark Streaming僅僅需要在批處理級別對記錄進行追蹤,所以他能保證每個批處理記錄僅僅被處理一次,即使是node節點掛掉。雖然說Storm的 Trident library可以保證一條記錄被處理一次,但是它依賴於事務更新狀態,而這個過程是很慢的,並且需要由用戶去實現。
l實現和編程API
Storm主要是由Clojure語言實現,Spark Streaming是由Scala實現。如果你想看看這兩個框架是如何實現的或者你想自定義一些東西你就得記住這一點。Storm是由BackType和 Twitter開發,而Spark Streaming是在UC Berkeley開發的。
Storm提供了Java API,同時也支持其他語言的API。 Spark Streaming支持Scala和Java語言(其實也支持Python)。
l批處理框架集成
Spark Streaming的一個很棒的特性就是它是在Spark框架上運行的。這樣你就可以想使用其他批處理代碼一樣來寫Spark Streaming程序,或者是在Spark中交互查詢。這就減少了單獨編寫流批量處理程序和歷史數據處理程序。
l生產支持
Storm已經出現好多年了,而且自從2011年開始就在Twitter內部生產環境中使用,還有其他一些公司。而Spark Streaming是一個新的項目,並且在2013年僅僅被Sharethrough使用(據作者了解)。
Storm是 Hortonworks Hadoop數據平台中流處理的解決方案,而Spark Streaming出現在 MapR的分散式平台和Cloudera的企業數據平台中。除此之外,Databricks是為Spark提供技術支持的公司,包括了Spark Streaming。
雖然說兩者都可以在各自的集群框架中運行,但是Storm可以在Mesos上運行, 而Spark Streaming可以在YARN和Mesos上運行。
2、運行原理
2.1 Streaming架構
SparkStreaming是一個對實時數據流進行高通量、容錯處理的流式處理系統,可以對多種數據源(如Kdfka、Flume、Twitter、Zero和TCP 套接字)進行類似Map、Reduce和Join等複雜操作,並將結果保存到外部文件系統、資料庫或應用到實時儀錶盤。
l計算流程:Spark Streaming是將流式計算分解成一系列短小的批處理作業。這裡的批處理引擎是Spark Core,也就是把Spark Streaming的輸入數據按照batch size(如1秒)分成一段一段的數據(Discretized Stream),每一段數據都轉換成Spark中的RDD(Resilient Distributed Dataset),然後將Spark Streaming中對DStream的Transformation操作變為針對Spark中對RDD的Transformation操作,將RDD經過操作變成中間結果保存在內存中。整個流式計算根據業務的需求可以對中間的結果進行疊加或者存儲到外部設備。下圖顯示了Spark Streaming的整個流程。
圖Spark Streaming構架
l容錯性:對於流式計算來說,容錯性至關重要。首先我們要明確一下Spark中RDD的容錯機制。每一個RDD都是一個不可變的分散式可重算的數據集,其記錄著確定性的操作繼承關係(lineage),所以只要輸入數據是可容錯的,那麼任意一個RDD的分區(Partition)出錯或不可用,都是可以利用原始輸入數據通過轉換操作而重新算出的。
對於Spark Streaming來說,其RDD的傳承關係如下圖所示,圖中的每一個橢圓形表示一個RDD,橢圓形中的每個圓形代表一個RDD中的一個Partition,圖中的每一列的多個RDD表示一個DStream(圖中有三個DStream),而每一行最後一個RDD則表示每一個Batch Size所產生的中間結果RDD。我們可以看到圖中的每一個RDD都是通過lineage相連接的,由於Spark Streaming輸入數據可以來自於磁碟,例如HDFS(多份拷貝)或是來自於網路的數據流(Spark Streaming會將網路輸入數據的每一個數據流拷貝兩份到其他的機器)都能保證容錯性,所以RDD中任意的Partition出錯,都可以並行地在其他機器上將缺失的Partition計算出來。這個容錯恢復方式比連續計算模型(如Storm)的效率更高。
Spark Streaming中RDD的lineage關係圖
l實時性:對於實時性的討論,會牽涉到流式處理框架的應用場景。Spark Streaming將流式計算分解成多個Spark Job,對於每一段數據的處理都會經過Spark DAG圖分解以及Spark的任務集的調度過程。對於目前版本的Spark Streaming而言,其最小的Batch Size的選取在0.5~2秒鐘之間(Storm目前最小的延遲是100ms左右),所以Spark Streaming能夠滿足除對實時性要求非常高(如高頻實時交易)之外的所有流式准實時計算場景。
l擴展性與吞吐量:Spark目前在EC2上已能夠線性擴展到100個節點(每個節點4Core),可以以數秒的延遲處理6GB/s的數據量(60M records/s),其吞吐量也比流行的Storm高2~5倍,圖4是Berkeley利用WordCount和Grep兩個用例所做的測試,在Grep這個測試中,Spark Streaming中的每個節點的吞吐量是670k records/s,而Storm是115k records/s。
Spark Streaming與Storm吞吐量比較圖
2.2 編程模型
DStream(Discretized Stream)作為Spark Streaming的基礎抽象,它代表持續性的數據流。這些數據流既可以通過外部輸入源賴獲取,也可以通過現有的Dstream的transformation操作來獲得。在內部實現上,DStream由一組時間序列上連續的RDD來表示。每個RDD都包含了自己特定時間間隔內的數據流。如圖7-3所示。
圖7-3 DStream中在時間軸下生成離散的RDD序列
對DStream中數據的各種操作也是映射到內部的RDD上來進行的,如圖7-4所示,對Dtream的操作可以通過RDD的transformation生成新的DStream。這裡的執行引擎是Spark。
2.2.1 如何使用Spark Streaming
作為構建於Spark之上的應用框架,Spark Streaming承襲了Spark的編程風格,對於已經了解Spark的用戶來說能夠快速地上手。接下來以Spark Streaming官方提供的WordCount代碼為例來介紹Spark Streaming的使用方式。
import org.apache.spark._
import org.apache.spark.streaming._
import org.apache.spark.streaming.StreamingContext._
// Create a local StreamingContext with two working thread and batch interval of 1 second.
// The master requires 2 cores to prevent from a starvation scenario.
val conf = new SparkConf().setMaster("local[2]").setAppName("NetworkWordCount")
val ssc = new StreamingContext(conf, Seconds(1))
// Create a DStream that will connect to hostname:port, like localhost:9999
val lines = ssc.socketTextStream("localhost", 9999)
// Split each line into words
val words = lines.flatMap(_.split(" "))
import org.apache.spark.streaming.StreamingContext._
// Count each word in each batch
val pairs = words.map(word => (word, 1))
val wordCounts = pairs.reduceByKey(_ + _)
// Print the first ten elements of each RDD generated in this DStream to the console
wordCounts.print()
ssc.start() // Start the computation
ssc.awaitTermination() // Wait for the computation to terminate
1.創建StreamingContext對象 同Spark初始化需要創建SparkContext對象一樣,使用Spark Streaming就需要創建StreamingContext對象。創建StreamingContext對象所需的參數與SparkContext基本一致,包括指明Master,設定名稱(如NetworkWordCount)。需要注意的是參數Seconds(1),Spark Streaming需要指定處理數據的時間間隔,如上例所示的1s,那麼Spark Streaming會以1s為時間窗口進行數據處理。此參數需要根據用戶的需求和集群的處理能力進行適當的設置;
2.創建InputDStream如同Storm的Spout,Spark Streaming需要指明數據源。如上例所示的socketTextStream,Spark Streaming以socket連接作為數據源讀取數據。當然Spark Streaming支持多種不同的數據源,包括Kafka、 Flume、HDFS/S3、Kinesis和Twitter等數據源;
3.操作DStream對於從數據源得到的DStream,用戶可以在其基礎上進行各種操作,如上例所示的操作就是一個典型的WordCount執行流程:對於當前時間窗口內從數據源得到的數據首先進行分割,然後利用Map和ReduceByKey方法進行計算,當然最後還有使用print()方法輸出結果;
4.啟動Spark Streaming之前所作的所有步驟只是創建了執行流程,程序沒有真正連接上數據源,也沒有對數據進行任何操作,只是設定好了所有的執行計劃,當ssc.start()啟動後程序才真正進行所有預期的操作。
至此對於Spark Streaming的如何使用有了一個大概的印象,在後面的章節我們會通過源代碼深入探究一下Spark Streaming的執行流程。
2.2.2 DStream的輸入源
在Spark Streaming中所有的操作都是基於流的,而輸入源是這一系列操作的起點。輸入 DStreams 和 DStreams 接收的流都代表輸入數據流的來源,在Spark Streaming 提供兩種內置數據流來源:
l 基礎來源 在 StreamingContext API 中直接可用的來源。例如:文件系統、Socket(套接字)連接和 Akka actors;
l 高級來源 如 Kafka、Flume、Kinesis、Twitter 等,可以通過額外的實用工具類創建。
2.2.2.1 基礎來源
在前面分析怎樣使用Spark Streaming的例子中我們已看到ssc.socketTextStream()方法,可以通過 TCP 套接字連接,從從文本數據中創建了一個 DStream。除了套接字,StreamingContext 的API還提供了方法從文件和 Akka actors 中創建 DStreams作為輸入源。
Spark Streaming提供了streamingContext.fileStream(dataDirectory)方法可以從任何文件系統(如:HDFS、S3、NFS 等)的文件中讀取數據,然後創建一個DStream。Spark Streaming 監控 dataDirectory 目錄和在該目錄下任何文件被創建處理(不支持在嵌套目錄下寫文件)。需要注意的是:讀取的必須是具有相同的數據格式的文件;創建的文件必須在 dataDirectory 目錄下,並通過自動移動或重命名成數據目錄;文件一旦移動就不能被改變,如果文件被不斷追加,新的數據將不會被閱讀。對於簡單的文本文,可以使用一個簡單的方法streamingContext.textFileStream(dataDirectory)來讀取數據。
Spark Streaming也可以基於自定義 Actors 的流創建DStream ,通過 Akka actors 接受數據流,使用方法streamingContext.actorStream(actorProps, actor-name)。Spark Streaming使用 streamingContext.queueStream(queueOfRDDs)方法可以創建基於 RDD 隊列的DStream,每個RDD 隊列將被視為 DStream 中一塊數據流進行加工處理。
2.2.2.2 高級來源
這一類的來源需要外部 non-Spark 庫的介面,其中一些有複雜的依賴關係(如 Kafka、Flume)。因此通過這些來源創建 DStreams 需要明確其依賴。例如,如果想創建一個使用 Twitter tweets 的數據的DStream 流,必須按以下步驟來做:
1)在 SBT 或 Maven工程里添加 spark-streaming-twitter_2.10 依賴。
2)開發:導入 TwitterUtils 包,通過 TwitterUtils.createStream 方法創建一個DStream。
3)部署:添加所有依賴的 jar 包(包括依賴的spark-streaming-twitter_2.10 及其依賴),然後部署應用程序。
需要注意的是,這些高級的來源一般在Spark Shell中不可用,因此基於這些高級來源的應用不能在Spark Shell中進行測試。如果你必須在Spark shell中使用它們,你需要下載相應的Maven工程的Jar依賴並添加到類路徑中。
其中一些高級來源如下:
lTwitter Spark Streaming的TwitterUtils工具類使用Twitter4j,Twitter4J 庫支持通過任何方法提供身份驗證信息,你可以得到公眾的流,或得到基於關鍵詞過濾流。
lFlume Spark Streaming可以從Flume中接受數據。
lKafka Spark Streaming可以從Kafka中接受數據。
lKinesis Spark Streaming可以從Kinesis中接受數據。
需要重申的一點是在開始編寫自己的 SparkStreaming 程序之前,一定要將高級來源依賴的Jar添加到SBT 或 Maven 項目相應的artifact中。常見的輸入源和其對應的Jar包如下圖所示。
另外,輸入DStream也可以創建自定義的數據源,需要做的就是實現一個用戶定義的接收器。
2.2.3 DStream的操作
與RDD類似,DStream也提供了自己的一系列操作方法,這些操作可以分成三類:普通的轉換操作、窗口轉換操作和輸出操作。
2.2.3.1 普通的轉換操作
普通的轉換操作如下表所示:
轉換
描述
map(func)
源 DStream的每個元素通過函數func返回一個新的DStream。
flatMap(func)
類似與map操作,不同的是每個輸入元素可以被映射出0或者更多的輸出元素。
filter(func)
在源DSTREAM上選擇Func函數返回僅為true的元素,最終返回一個新的DSTREAM 。
repartition(numPartitions)
通過輸入的參數numPartitions的值來改變DStream的分區大小。
union(otherStream)
返回一個包含源DStream與其他 DStream的元素合併後的新DSTREAM。
count()
對源DStream內部的所含有的RDD的元素數量進行計數,返回一個內部的RDD只包含一個元素的DStreaam。
reduce(func)
使用函數func(有兩個參數並返回一個結果)將源DStream 中每個RDD的元素進行聚 合操作,返回一個內部所包含的RDD只有一個元素的新DStream。
countByValue()
計算DStream中每個RDD內的元素出現的頻次並返回新的DStream[(K,Long)],其中K是RDD中元素的類型,Long是元素出現的頻次。
reduceByKey(func, [numTasks])
當一個類型為(K,V)鍵值對的DStream被調用的時候,返回類型為類型為(K,V)鍵值對的新 DStream,其中每個鍵的值V都是使用聚合函數func匯總。注意:默認情況下,使用 Spark的默認並行度提交任務(本地模式下並行度為2,集群模式下位8),可以通過配置numTasks設置不同的並行任務數。
join(otherStream, [numTasks])
當被調用類型分別為(K,V)和(K,W)鍵值對的2個DStream時,返回類型為(K,(V,W))鍵值對的一個新 DSTREAM。
cogroup(otherStream, [numTasks])
當被調用的兩個DStream分別含有(K, V) 和(K, W)鍵值對時,返回一個(K, Seq[V], Seq[W])類型的新的DStream。
transform(func)
通過對源DStream的每RDD應用RDD-to-RDD函數返回一個新的DStream,這可以用來在DStream做任意RDD操作。
updateStateByKey(func)
返回一個新狀態的DStream,其中每個鍵的狀態是根據鍵的前一個狀態和鍵的新值應用給定函數func後的更新。這個方法可以被用來維持每個鍵的任何狀態數據。
在上面列出的這些操作中,transform()方法和updateStateByKey()方法值得我們深入的探討一下:
l transform(func)操作
該transform操作(轉換操作)連同其其類似的 transformWith操作允許DStream 上應用任意RDD-to-RDD函數。它可以被應用於未在DStream API 中暴露任何的RDD操作。例如,在每批次的數據流與另一數據集的連接功能不直接暴露在DStream API 中,但可以輕鬆地使用transform操作來做到這一點,這使得DStream的功能非常強大。例如,你可以通過連接預先計算的垃圾郵件信息的輸入數據流(可能也有Spark生成的),然後基於此做實時數據清理的篩選,如下面官方提供的偽代碼所示。事實上,也可以在transform方法中使用機器學習和圖形計算的演算法。
l updateStateByKey操作
該 updateStateByKey 操作可以讓你保持任意狀態,同時不斷有新的信息進行更新。要使用此功能,必須進行兩個步驟 :
(1) 定義狀態 - 狀態可以是任意的數據類型。
(2) 定義狀態更新函數 - 用一個函數指定如何使用先前的狀態和從輸入流中獲取的新值 更新狀態。
讓我們用一個例子來說明,假設你要進行文本數據流中單詞計數。在這裡,正在運行的計數是狀態而且它是一個整數。我們定義了更新功能如下:
此函數應用於含有鍵值對的DStream中(如前面的示例中,在DStream中含有(word,1)鍵值對)。它會針對裡面的每個元素(如wordCount中的word)調用一下更新函數,newValues是最新的值,runningCount是之前的值。
2.2.3.2 窗口轉換操作
Spark Streaming 還提供了窗口的計算,它允許你通過滑動窗口對數據進行轉換,窗口轉換操作如下:
轉換
描述
window(windowLength, slideInterval)
返回一個基於源DStream的窗口批次計算後得到新的DStream。
countByWindow(windowLength,slideInterval)
返回基於滑動窗口的DStream中的元素的數量。
reduceByWindow(func, windowLength,slideInterval)
基於滑動窗口對源DStream中的元素進行聚合操作,得到一個新的DStream。
reduceByKeyAndWindow(func,windowLength,slideInterval, [numTasks])
基於滑動窗口對(K,V)鍵值對類型的DStream中的值按K使用聚合函數func進行聚合操作,得到一個新的DStream。
reduceByKeyAndWindow(func, invFunc,windowLength,slideInterval, [numTasks])
一個更高效的reduceByKkeyAndWindow()的實現版本,先對滑動窗口中新的時間間隔內數據增量聚合併移去最早的與新增數據量的時間間隔內的數據統計量。例如,計算t+4秒這個時刻過去5秒窗口的WordCount,那麼我們可以將t+3時刻過去5秒的統計量加上[t+3,t+4]的統計量,在減去[t-2,t-1]的統計量,這種方法可以復用中間三秒的統計量,提高統計的效率。
countByValueAndWindow(windowLength,slideInterval, [numTasks])
基於滑動窗口計算源DStream中每個RDD內每個元素出現的頻次並返回DStream[(K,Long)],其中K是RDD中元素的類型,Long是元素頻次。與countByValue一樣,reduce任務的數量可以通過一個可選參數進行配置。
批處理間隔示意圖
在Spark Streaming中,數據處理是按批進行的,而數據採集是逐條進行的,因此在Spark Streaming中會先設置好批處理間隔(batch duration),當超過批處理間隔的時候就會把採集到的數據匯總起來成為一批數據交給系統去處理。
對於窗口操作而言,在其窗口內部會有N個批處理數據,批處理數據的大小由窗口間隔(window duration)決定,而窗口間隔指的就是窗口的持續時間,在窗口操作中,只有窗口的長度滿足了才會觸發批數據的處理。除了窗口的長度,窗口操作還有另一個重要的參數就是滑動間隔(slide duration),它指的是經過多長時間窗口滑動一次形成新的窗口,滑動窗口默認情況下和批次間隔的相同,而窗口間隔一般設置的要比它們兩個大。在這裡必須注意的一點是滑動間隔和窗口間隔的大小一定得設置為批處理間隔的整數倍。
如批處理間隔示意圖所示,批處理間隔是1個時間單位,窗口間隔是3個時間單位,滑動間隔是2個時間單位。對於初始的窗口time 1-time 3,只有窗口間隔滿足了才觸發數據的處理。這裡需要注意的一點是,初始的窗口有可能流入的數據沒有撐滿,但是隨著時間的推進,窗口最終會被撐滿。當每個2個時間單位,窗口滑動一次後,會有新的數據流入窗口,這時窗口會移去最早的兩個時間單位的數據,而與最新的兩個時間單位的數據進行匯總形成新的窗口(time3-time5)。
對於窗口操作,批處理間隔、窗口間隔和滑動間隔是非常重要的三個時間概念,是理解窗口操作的關鍵所在。
2.2.3.3 輸出操作
Spark Streaming允許DStream的數據被輸出到外部系統,如資料庫或文件系統。由於輸出操作實際上使transformation操作後的數據可以通過外部系統被使用,同時輸出操作觸發所有DStream的transformation操作的實際執行(類似於RDD操作)。以下表列出了目前主要的輸出操作:
轉換
描述
print()
在Driver中列印出DStream中數據的前10個元素。
saveAsTextFiles(prefix, [suffix])
將DStream中的內容以文本的形式保存為文本文件,其中每次批處理間隔內產生的文件以prefix-TIME_IN_MS[.suffix]的方式命名。
saveAsObjectFiles(prefix, [suffix])
將DStream中的內容按對象序列化並且以SequenceFile的格式保存。其中每次批處理間隔內產生的文件以prefix-TIME_IN_MS[.suffix]的方式命名。
saveAsHadoopFiles(prefix, [suffix])
將DStream中的內容以文本的形式保存為Hadoop文件,其中每次批處理間隔內產生的文件以prefix-TIME_IN_MS[.suffix]的方式命名。
foreachRDD(func)
最基本的輸出操作,將func函數應用於DStream中的RDD上,這個操作會輸出數據到外部系統,比如保存RDD到文件或者網路資料庫等。需要注意的是func函數是在運行該streaming應用的Driver進程里執行的。
dstream.foreachRDD是一個非常強大的輸出操作,它允將許數據輸出到外部系統。但是 ,如何正確高效地使用這個操作是很重要的,下面展示了如何去避免一些常見的錯誤。
通常將數據寫入到外部系統需要創建一個連接對象(如 TCP連接到遠程伺服器),並用它來發送數據到遠程系統。出於這個目的,開發者可能在不經意間在Spark driver端創建了連接對象,並嘗試使用它保存RDD中的記錄到Spark worker上,如下面代碼:
這是不正確的,這需要連接對象進行序列化並從Driver端發送到Worker上。連接對象很少在不同機器間進行這種操作,此錯誤可能表現為序列化錯誤(連接對不可序列化),初始化錯誤(連接對象在需要在Worker 上進行需要初始化) 等等,正確的解決辦法是在 worker上創建的連接對象。
通常情況下,創建一個連接對象有時間和資源開銷。因此,創建和銷毀的每條記錄的連接對象可能招致不必要的資源開銷,並顯著降低系統整體的吞吐量 。一個更好的解決方案是使用rdd.foreachPartition方法創建一個單獨的連接對象,然後使用該連接對象輸出的所有RDD分區中的數據到外部系統。
這緩解了創建多條記錄連接的開銷。最後,還可以進一步通過在多個RDDs/ batches上重用連接對象進行優化。一個保持連接對象的靜態池可以重用在多個批處理的RDD上將其輸出到外部系統,從而進一步降低了開銷。
需要注意的是,在靜態池中的連接應該按需延遲創建,這樣可以更有效地把數據發送到外部系統。另外需要要注意的是:DStreams延遲執行的,就像RDD的操作是由actions觸發一樣。默認情況下,輸出操作會按照它們在Streaming應用程序中定義的順序一個個執行。
2.3 容錯、持久化和性能調優
2.3.1 容錯
DStream基於RDD組成,RDD的容錯性依舊有效,我們首先回憶一下SparkRDD的基本特性。
lRDD是一個不可變的、確定性的可重複計算的分散式數據集。RDD的某些partition丟失了,可以通過血統(lineage)信息重新計算恢復;
l如果RDD任何分區因worker節點故障而丟失,那麼這個分區可以從原來依賴的容錯數據集中恢復;
l由於Spark中所有的數據的轉換操作都是基於RDD的,即使集群出現故障,只要輸入數據集存在,所有的中間結果都是可以被計算的。
Spark Streaming是可以從HDFS和S3這樣的文件系統讀取數據的,這種情況下所有的數據都可以被重新計算,不用擔心數據的丟失。但是在大多數情況下,Spark Streaming是基於網路來接受數據的,此時為了實現相同的容錯處理,在接受網路的數據時會在集群的多個Worker節點間進行數據的複製(默認的複製數是2),這導致產生在出現故障時被處理的兩種類型的數據:
1)Data received and replicated :一旦一個Worker節點失效,系統會從另一份還存在的數據中重新計算。
2)Data received but buffered for replication :一旦數據丟失,可以通過RDD之間的依賴關係,從HDFS這樣的外部文件系統讀取數據。
此外,有兩種故障,我們應該關心:
(1)Worker節點失效:通過上面的講解我們知道,這時系統會根據出現故障的數據的類型,選擇是從另一個有複製過數據的工作節點上重新計算,還是直接從從外部文件系統讀取數據。
(2)Driver(驅動節點)失效 :如果運行 Spark Streaming應用時驅動節點出現故障,那麼很明顯的StreamingContext已經丟失,同時在內存中的數據全部丟失。對於這種情況,Spark Streaming應用程序在計算上有一個內在的結構——在每段micro-batch數據周期性地執行同樣的Spark計算。這種結構允許把應用的狀態(亦稱checkpoint)周期性地保存到可靠的存儲空間中,並在driver重新啟動時恢復該狀態。具體做法是在ssc.checkpoint(<checkpoint directory>)函數中進行設置,Spark Streaming就會定期把DStream的元信息寫入到HDFS中,一旦驅動節點失效,丟失的StreamingContext會通過已經保存的檢查點信息進行恢復。
最後我們談一下Spark Stream的容錯在Spark 1.2版本的一些改進:
實時流處理系統必須要能在24/7時間內工作,因此它需要具備從各種系統故障中恢復過來的能力。最開始,SparkStreaming就支持從driver和worker故障恢復的能力。然而有些數據源的輸入可能在故障恢復以後丟失數據。在Spark1.2版本中,Spark已經在SparkStreaming中對預寫日誌(也被稱為journaling)作了初步支持,改進了恢復機制,並使更多數據源的零數據丟失有了可靠。
對於文件這樣的源數據,driver恢復機制足以做到零數據丟失,因為所有的數據都保存在了像HDFS或S3這樣的容錯文件系統中了。但對於像Kafka和Flume等其它數據源,有些接收到的數據還只緩存在內存中,尚未被處理,它們就有可能會丟失。這是由於Spark應用的分布操作方式引起的。當driver進程失敗時,所有在standalone/yarn/mesos集群運行的executor,連同它們在內存中的所有數據,也同時被終止。對於Spark Streaming來說,從諸如Kafka和Flume的數據源接收到的所有數據,在它們處理完成之前,一直都緩存在executor的內存中。縱然driver重新啟動,這些緩存的數據也不能被恢復。為了避免這種數據損失,在Spark1.2發布版本中引進了預寫日誌(WriteAheadLogs)功能。
預寫日誌功能的流程是:1)一個SparkStreaming應用開始時(也就是driver開始時),相關的StreamingContext使用SparkContext啟動接收器成為長駐運行任務。這些接收器接收並保存流數據到Spark內存中以供處理。2)接收器通知driver。3)接收塊中的元數據(metadata)被發送到driver的StreamingContext。這個元數據包括:(a)定位其在executor內存中數據的塊referenceid,(b)塊數據在日誌中的偏移信息(如果啟用了)。
用戶傳送數據的生命周期如下圖所示。
類似Kafka這樣的系統可以通過複製數據保持可靠性。允許預寫日誌兩次高效地複製同樣的數據:一次由Kafka,而另一次由SparkStreaming。Spark未來版本將包含Kafka容錯機制的原生支持,從而避免第二個日誌。
2.3.2 持久化
與RDD一樣,DStream同樣也能通過persist()方法將數據流存放在內存中,默認的持久化方式是MEMORY_ONLY_SER,也就是在內存中存放數據同時序列化的方式,這樣做的好處是遇到需要多次迭代計算的程序時,速度優勢十分的明顯。而對於一些基於窗口的操作,如reduceByWindow、reduceByKeyAndWindow,以及基於狀態的操作,如updateStateBykey,其默認的持久化策略就是保存在內存中。
對於來自網路的數據源(Kafka、Flume、sockets等),默認的持久化策略是將數據保存在兩台機器上,這也是為了容錯性而設計的。
另外,對於窗口和有狀態的操作必須checkpoint,通過StreamingContext的checkpoint來指定目錄,通過 Dtream的checkpoint指定間隔時間,間隔必須是滑動間隔(slide interval)的倍數。
2.3.3 性能調優
1. 優化運行時間
l 增加並行度 確保使用整個集群的資源,而不是把任務集中在幾個特定的節點上。對於包含shuffle的操作,增加其並行度以確保更為充分地使用集群資源;
l 減少數據序列化,反序列化的負擔 Spark Streaming默認將接受到的數據序列化後存儲,以減少內存的使用。但是序列化和反序列話需要更多的CPU時間,因此更加高效的序列化方式(Kryo)和自定義的系列化介面可以更高效地使用CPU;
l 設置合理的batch duration(批處理時間間) 在Spark Streaming中,Job之間有可能存在依賴關係,後面的Job必須確保前面的作業執行結束後才能提交。若前面的Job執行的時間超出了批處理時間間隔,那麼後面的Job就無法按時提交,這樣就會進一步拖延接下來的Job,造成後續Job的阻塞。因此設置一個合理的批處理間隔以確保作業能夠在這個批處理間隔內結束時必須的;
l 減少因任務提交和分發所帶來的負擔 通常情況下,Akka框架能夠高效地確保任務及時分發,但是當批處理間隔非常小(500ms)時,提交和分發任務的延遲就變得不可接受了。使用Standalone和Coarse-grained Mesos模式通常會比使用Fine-grained Mesos模式有更小的延遲。
2. 優化內存使用
l控制batch size(批處理間隔內的數據量) Spark Streaming會把批處理間隔內接收到的所有數據存放在Spark內部的可用內存區域中,因此必須確保當前節點Spark的可用內存中少能容納這個批處理時間間隔內的所有數據,否則必須增加新的資源以提高集群的處理能力;
l及時清理不再使用的數據 前面講到Spark Streaming會將接受的數據全部存儲到內部可用內存區域中,因此對於處理過的不再需要的數據應及時清理,以確保Spark Streaming有富餘的可用內存空間。通過設置合理的spark.cleaner.ttl時長來及時清理超時的無用數據,這個參數需要小心設置以免後續操作中所需要的數據被超時錯誤處理;
l觀察及適當調整GC策略 GC會影響Job的正常運行,可能延長Job的執行時間,引起一系列不可預料的問題。觀察GC的運行情況,採用不同的GC策略以進一步減小內存回收對Job運行的影響。
推薦閱讀:
※雲計算是什麼意思?雲計算有什麼用?
※酷站推薦 - yuque.com - 語雀 | 輕鬆擁有在線知識庫 | 團隊文檔編輯
TAG:雲計算平台 |