標籤:

Spark Streaming:大規模流式數據處理的新貴

噹噹噹噹~

俺老孫來也~

之前我們提到了過Spark的內核(感興趣的同學可以歷史回顧一下)

今天來講講大規模流式數據處理的新貴

Spark Streaming

提到Spark Streaming,我們不得不說一下BDAS(Berkeley Data Analytics Stack),這個伯克利大學提出的關於數據分析的軟體棧。從它的視角來看,目前的大數據處理可以分為如以下三個類型。

  • 複雜的批量數據處理(batch data processing),通常的時間跨度在數十分鐘到數小時之間。

  • 基於歷史數據的互動式查詢(interactive query),通常的時間跨度在數十秒到數分鐘之間。

  • 基於實時數據流的數據處理(streaming data processing),通常的時間跨度在數百毫秒到數秒之間。

對於這些問題而言,目前已有很多相對成熟的開源軟體來處理以上三種情景,我們可以利用MapReduce來進行批量數據處理,可以用Impala來進行互動式查詢,對於流式數據處理,我們可以採用Storm。對於大多數互聯網公司來說,一般都會同時遇到以上三種情景,那麼在使用的過程中這些公司可能會遇到如下的不便。

  • 三種情景的輸入輸出數據無法無縫共享,需要進行格式相互轉換。

  • 每一個開源軟體都需要一個開發和維護團隊,提高了成本。

  • 在同一個集群中對各個系統協調資源分配比較困難。

BDAS就是以Spark為基礎的一套軟體棧,利用基於內存的通用計算模型將以上三種情景一網打盡,同時支持Batch、Interactive、Streaming的處理,且兼容支持HDFS和S3等分散式文件系統,可以部署在YARN和Mesos等流行的集群資源管理器之上。BDAS的構架如圖1所示,其中Spark可以替代MapReduce進行批處理,利用其基於內存的特點,特別擅長迭代式和互動式數據處理;Shark處理大規模數據的SQL查詢,兼容Hive的HQL。本文要重點介紹的Spark Streaming,在整個BDAS中進行大規模流式處理。

BDAS軟體棧

Spark Streaming構架

  • 計算流程:Spark Streaming是將流式計算分解成一系列短小的批處理作業。這裡的批處理引擎是Spark,也就是把Spark Streaming的輸入數據按照batch size(如1秒)分成一段一段的數據(Discretized Stream),每一段數據都轉換成Spark中的RDD(Resilient Distributed Dataset),然後將Spark Streaming中對DStream的Transformation操作變為針對Spark中對RDD的Transformation操作,將RDD經過操作變成中間結果保存在內存中。整個流式計算根據業務的需求可以對中間的結果進行疊加,或者存儲到外部設備。下圖顯示了Spark Streaming的整個流程。

Spark Streaming構架圖

  • 容錯性:對於流式計算來說,容錯性至關重要。首先我們要明確一下Spark中RDD的容錯機制。每一個RDD都是一個不可變的分散式可重算的數據集,其記錄著確定性的操作繼承關係(lineage),所以只要輸入數據是可容錯的,那麼任意一個RDD的分區(Partition)出錯或不可用,都是可以利用原始輸入數據通過轉換操作而重新算出的。

Spark Streaming中RDD的lineage關係圖

  • 對於Spark Streaming來說,其RDD的傳承關係如圖片所示,圖中的每一個橢圓形表示一個RDD,橢圓形中的每個圓形代表一個RDD中的一個Partition,圖中的每一列的多個RDD表示一個DStream(圖中有三個DStream),而每一行最後一個RDD則表示每一個Batch Size所產生的中間結果RDD。我們可以看到圖中的每一個RDD都是通過lineage相連接的,由於Spark Streaming輸入數據可以來自於磁碟,例如HDFS(多份拷貝)或是來自於網路的數據流(Spark Streaming會將網路輸入數據的每一個數據流拷貝兩份到其他的機器)都能保證容錯性。所以RDD中任意的Partition出錯,都可以並行地在其他機器上將缺失的Partition計算出來。這個容錯恢復方式比連續計算模型(如Storm)的效率更高。

  • 實時性:對於實時性的討論,會牽涉到流式處理框架的應用場景。Spark Streaming將流式計算分解成多個Spark Job,對於每一段數據的處理都會經過Spark DAG圖分解,以及Spark的任務集的調度過程。對於目前版本的Spark Streaming而言,其最小的Batch Size的選取在0.5~2秒鐘之間(Storm目前最小的延遲是100ms左右),所以Spark Streaming能夠滿足除對實時性要求非常高(如高頻實時交易)之外的所有流式准實時計算場景。

  • 擴展性與吞吐量:Spark目前在EC2上已能夠線性擴展到100個節點(每個節點4Core),可以以數秒的延遲處理6GB/s的數據量(60M records/s),其吞吐量也比流行的Storm高2~5倍,圖4是Berkeley利用WordCount和Grep兩個用例所做的測試,在Grep這個測試中,Spark Streaming中的每個節點的吞吐量是670k records/s,而Storm是115k records/s。

Spark Streaming與Storm吞吐量比較圖

Spark Streaming的編程模型

Spark Streaming的編程和Spark的編程如出一轍,對於編程的理解也非常類似。對於Spark來說,編程就是對於RDD的操作;而對於Spark Streaming來說,就是對DStream的操作。下面將通過一個大家熟悉的WordCount的例子來說明Spark Streaming中的輸入操作、轉換操作和輸出操作。

  • Spark Streaming初始化:在開始進行DStream操作之前,需要對Spark Streaming進行初始化生成StreamingContext。參數中比較重要的是第一個和第三個,第一個參數是指定Spark Streaming運行的集群地址,而第三個參數是指定Spark Streaming運行時的batch窗口大小。在這個例子中就是將1秒鐘的輸入數據進行一次Spark Job處理。

val ssc = new StreamingContext(「Spark://…」, 「WordCount」, Seconds(1), [Homes], [Jars])

  • Spark Streaming的輸入操作:目前Spark Streaming已支持了豐富的輸入介面,大致分為兩類:一類是磁碟輸入,如以batch size作為時間間隔監控HDFS文件系統的某個目錄,將目錄中內容的變化作為Spark Streaming的輸入;另一類就是網路流的方式,目前支持Kafka、Flume、Twitter和TCP socket。在WordCount例子中,假定通過網路socket作為輸入流,監聽某個特定的埠,最後得出輸入DStream(lines)。

val lines = ssc.socketTextStream(「localhost」,8888)

  • Spark Streaming的轉換操作:與Spark RDD的操作極為類似,Spark Streaming也就是通過轉換操作將一個或多個DStream轉換成新的DStream。常用的操作包括map、filter、flatmap和join,以及需要進行shuffle操作的groupByKey/reduceByKey等。在WordCount例子中,我們首先需要將DStream(lines)切分成單詞,然後將相同單詞的數量進行疊加, 最終得到的wordCounts就是每一個batch size的(單詞,數量)中間結果。

val words = lines.flatMap(_.split(「 」))

val wordCounts = words.map(x => (x, 1)).reduceByKey(_ + _)

另外,Spark Streaming有特定的窗口操作,窗口操作涉及兩個參數:一個是滑動窗口的寬度(Window Duration);另一個是窗口滑動的頻率(Slide Duration),這兩個參數必須是batch size的倍數。例如以過去5秒鐘為一個輸入窗口,每1秒統計一下WordCount,那麼我們會將過去5秒鐘的每一秒鐘的WordCount都進行統計,然後進行疊加,得出這個窗口中的單詞統計。

val wordCounts = words.map(x => (x, 1)).reduceByKeyAndWindow(_ + _, Seconds(5s),seconds(1))

但上面這種方式還不夠高效。如果我們以增量的方式來計算就更加高效,例如,計算t+4秒這個時刻過去5秒窗口的WordCount,那麼我們可以將t+3時刻過去5秒的統計量加上[t+3,t+4]的統計量,在減去[t-2,t-1]的統計量(如圖5所示),這種方法可以復用中間三秒的統計量,提高統計的效率。

val wordCounts = words.map(x => (x, 1)).reduceByKeyAndWindow(_ + _, _ - _, Seconds(5s),seconds(1))

Spark Streaming中滑動窗口的疊加處理和增量處理

  • Spark Streaming的輸入操作:對於輸出操作,Spark提供了將數據列印到屏幕及輸入到文件中。在WordCount中我們將DStream wordCounts輸入到HDFS文件中。

wordCounts = saveAsHadoopFiles(「WordCount」)

  • Spark Streaming啟動:經過上述的操作,Spark Streaming還沒有進行工作,我們還需要調用Start操作,Spark Streaming才開始監聽相應的埠,然後收取數據,並進行統計。

ssc.start()

Spark Streaming案例分析

在互聯網應用中,網站流量統計作為一種常用的應用模式,需要在不同粒度上對不同數據進行統計,既有實時性的需求,又需要涉及到聚合、去重、連接等較為複雜的統計需求。傳統上,若是使用Hadoop MapReduce框架,雖然可以容易地實現較為複雜的統計需求,但實時性卻無法得到保證;反之若是採用Storm這樣的流式框架,實時性雖可以得到保證,但需求的實現複雜度也大大提高了。Spark Streaming在兩者之間找到了一個平衡點,能夠以准實時的方式容易地實現較為複雜的統計需求。下面介紹一下使用Kafka和Spark Streaming搭建實時流量統計框架。

  • 數據暫存:Kafka作為分散式消息隊列,既有非常優秀的吞吐量,又有較高的可靠性和擴展性,在這裡採用Kafka作為日誌傳遞中間件來接收日誌,抓取客戶端發送的流量日誌,同時接受Spark Streaming的請求,將流量日誌按序發送給Spark Streaming集群。

  • 數據處理:將Spark Streaming集群與Kafka集群對接,Spark Streaming從Kafka集群中獲取流量日誌並進行處理。Spark Streaming會實時地從Kafka集群中獲取數據並將其存儲在內部的可用內存空間中。當每一個batch窗口到來時,便對這些數據進行處理。

  • 結果存儲:為了便於前端展示和頁面請求,處理得到的結果將寫入到資料庫中。

相比於傳統的處理框架,Kafka+Spark Streaming的架構有以下幾個優點。

  • Spark框架的高效和低延遲保證了Spark Streaming操作的准實時性。

  • 利用Spark框架提供的豐富API和高靈活性,可以精簡地寫出較為複雜的演算法。

  • 編程模型的高度一致使得上手Spark Streaming相當容易,同時也可以保證業務邏輯在實時處理和批處理上的復用。

在基於Kafka+Spark Streaming的流量統計應用運行過程中,有時會遇到內存不足、GC阻塞等各種問題。下面介紹一下如何對Spark Streaming應用程序進行調優來減少甚至避免這些問題的影響。

性能調優

優化運行時間

  • 增加並行度。確保使用整個集群的資源,而不是把任務集中在幾個特定的節點上。對於包含shuffle的操作,增加其並行度以確保更為充分地使用集群資源。

  • 減少數據序列化、反序列化的負擔。Spark Streaming默認將接收到的數據序列化後存儲以減少內存的使用。但序列化和反序列化需要更多的CPU時間,因此更加高效的序列化方式(Kryo)和自定義的序列化介面可以更高效地使用CPU。

  • 設置合理的batch窗口。在Spark Streaming中,Job之間有可能存在著依賴關係,後面的Job必須確保前面的Job執行結束後才能提交。若前面的Job執行時間超出了設置的batch窗口,那麼後面的Job就無法按時提交,這樣就會進一步拖延接下來的Job,造成後續Job的阻塞。因此,設置一個合理的batch窗口確保Job能夠在這個batch窗口中結束是必須的。

  • 減少任務提交和分發所帶來的負擔。通常情況下Akka框架能夠高效地確保任務及時分發,但當batch窗口非常小(500ms)時,提交和分發任務的延遲就變得不可接受了。使用Standalone模式和Coarse-grained Mesos模式通常會比使用Fine-Grained Mesos模式有更小的延遲。

優化內存使用

  • 控制batch size。Spark Streaming會把batch窗口內接收到的所有數據存放在Spark內部的可用內存區域中,因此必須確保當前節點Spark的可用內存至少能夠容納這個batch窗口內所有的數據,否則必須增加新的資源以提高集群的處理能力。

  • 及時清理不再使用的數據。上面說到Spark Streaming會將接收到的數據全部存儲於內部的可用內存區域中,因此對於處理過的不再需要的數據應及時清理以確保Spark Streaming有富餘的可用內存空間。通過設置合理的spark.cleaner.ttl時長來及時清理超時的無用數據。

  • 觀察及適當調整GC策略。GC會影響Job的正常運行,延長Job的執行時間,引起一系列不可預料的問題。觀察GC的運行情況,採取不同的GC策略以進一步減小內存回收對Job運行的影響。

Spark Streaming提供了一套高效、可容錯的准實時大規模流式處理框架,它能和批處理及即時查詢放在同一個軟體棧中,降低學習成本。如果你學會了Spark編程,那麼也就學會了Spark Streaming編程,如果理解了Spark的調度和存儲,那麼Spark Streaming也類似。

好了

今天俺老孫就講到這裡

明天同學們可以期待驚喜一下

推薦閱讀:

Spark MLlib數據類型介紹
Spark比Hadoop的優勢有這麼大嗎?
為什麼Spark比MapReduce快?
Scala 在大數據處理方面有何優勢?
spark在那裡指定master URL呢?

TAG:Spark | 大数据 |