內存有限的情況下 Spark 如何處理 T 級別的數據?
作為內存計算模型,我們的內存很難達到T級別,而需要使用的日誌數據很容易就到達這個量級,Spark如何處理這種情況?
UPDATE 1 簡單起見,下述答案僅就無shuffle的單stage Spark作業做了概要解釋。對於多stage任務而言,在內存的使用上還有很多其他重要問題沒有覆蓋。部分內容請參考評論中 @邵賽賽 給出的補充。Spark確實擅長內存計算,內存容量不足時也可以回退。
UPDATE 2 Spark被稱為「內存計算引擎」是因為它可以做內存計算,而不是它只能做內存計算。早年因為在使用內存cache的情況下ML演算法效率提升特別明顯(1-2數量級),因此造成了一些誤傳,使得很多初學者都認為Spark只能做內存計算,數據集放不進內存就沒轍了。實際上,內存cache對於Spark來說僅僅只是一個優化,即便完全關閉,效率仍然比MapReduce要來得高。去年Spark拿下Sort Benchmark的冠軍也很能說明問題(sort過程全程不使用內存cache)。詳情參見:Sort Benchmark Home Page
首先需要解開的一個誤區是,對於Spark這類內存計算系統,並不是說要處理多大規模的數據就需要多大規模的內存。Spark相對Hadoop MR有大幅性能提升的一個前提就是大量大數據作業同一時刻需要載入進內存的數據只是整體數據的一個子集,且大部分情況下可以完全放入內存,正如Shark(Spark上的Hive兼容的data warehouse)論文1.1節所述:
In fact, one study [1] analyzed the access
patterns in the Hive warehouses at Facebook and discovered
that for the vast majority (96%) of jobs, the entire inputs
could fit into a fraction of the cluster』s total memory.[1] G. Ananthanarayanan, A. Ghodsi, S. Shenker, and
I. Stoica. Disk-locality in datacenter computing
considered irrelevant. In HotOS 』11, 2011.
至於數據子集仍然無法放入集群物理內存的情況,Spark仍然可以妥善處理,下文還會詳述。
在Spark內部,單個executor進程內RDD的分片數據是用Iterator流式訪問的,Iterator的hasNext方法和next方法是由RDD lineage上各個transformation攜帶的閉包函數複合而成的。該複合Iterator每訪問一個元素,就對該元素應用相應的複合函數,得到的結果再流式地落地(對於shuffle stage是落地到本地文件系統留待後續stage訪問,對於result stage是落地到HDFS或送回driver端等等,視選用的action而定)。如果用戶沒有要求Spark cache該RDD的結果,那麼這個過程佔用的內存是很小的,一個元素處理完畢後就落地或扔掉了(概念上如此,實現上有buffer),並不會長久地佔用內存。只有在用戶要求Spark cache該RDD,且storage level要求在內存中cache時,Iterator計算出的結果才會被保留,通過cache manager放入內存池。
簡單起見,暫不考慮帶shuffle的多stage情況和流水線優化。這裡拿最經典的log處理的例子來具體說明一下(取出所有以ERROR開頭的日誌行,按空格分隔並取第2列):
val lines = spark.textFile("hdfs://&")
val errors = lines.filter(_.startsWith("ERROR"))
val messages = errors.map(_.split(" ")(1))
messages.saveAsTextFile("hdfs://&
按傳統單機immutable FP的觀點來看,上述代碼運行起來好像是:
- 把HDFS上的日誌文件全部拉入內存形成一個巨大的字元串數組,
- Filter一遍再生成一個略小的新的字元串數組,
- 再map一遍又生成另一個字元串數組。
真這麼玩兒的話Spark早就不用混了……
如前所述,Spark在運行時動態構造了一個複合Iterator。就上述示例來說,構造出來的Iterator的邏輯概念上大致長這樣:
new Iterator[String] {
private var head: String = _
private var headDefined: Boolean = false
def hasNext: Boolean = headDefined || {
do {
try head = readOneLineFromHDFS(...) // (1) read from HDFS
catch {
case _: EOFException =&> return false
}
} while (!head.startsWith("ERROR")) // (2) filter closure
true
}
def next: String = if (hasNext) {
headDefined = false
head.split(" ")(1) // (3) map closure
} else {
throw new NoSuchElementException("...")
}
}
上面這段代碼是我按照Spark中FilteredRDD、MappedRDD的定義和Scala Iterator的filter、map方法的框架寫的偽碼,並且省略了從cache或checkpoint中讀取現成結果的邏輯。1、2、3三處便是RDD lineage DAG中相應邏輯嵌入複合出的Iterator的大致方式。每種RDD變換嵌入複合Iterator的具體方式是由不同的RDD以及Scala Iterator的相關方法定義的。可以看到,用這個Iterator訪問整個數據集,空間複雜度是O(1)。可見,Spark RDD的immutable語義並不會造成大數據內存計算任務的龐大內存開銷。
然後來看加cache的情況。我們假設errors這個RDD比較有用,除了拿出空格分隔的第二列以外,可能在同一個application中我們還會再頻繁用它干別的事情,於是選擇將它cache住:
val lines = spark.textFile("hdfs://&")
val errors = lines.filter(_.startsWith("ERROR")).cache() // &<-- !!!
val messages = errors.map(_.split(" ")(1))
messages.saveAsTextFile("hdfs://&
加了cache之後有什麼變化呢?實際上相當於在上述複合Iterator偽碼的(2)處,將filter出來的文本行逐一追加到了內存中的一個ArrayBuffer[String]里存起來形成一個block,然後通過cache manager扔進受block manager管理的內存池。注意這裡僅僅cache了filter出來的結果,HDFS讀出的原始數據沒有被cache,對errors做map操作後得到的messages RDD也沒有被cache。這樣一來,後續任務復用errors這個RDD時,直接從內存中取就好,就不用重新計算了。
1. 序列化
2. 外排
Spark其實也可以不in memory的,只不過要慢很多...
TB級 也要看多大的成本處理TB級,以下是我們的一些思路
基於spark排序的一種更廉價的實現方案-附基於spark的性能測試
排序可以說是很多日誌系統的硬指標(如按照時間逆序排序),如果一個大數據系統不能進行排序,基本上是這個系統屬於不可用狀態,排序算得上是大數據系統的一個「剛需」,無論大數據採用的是hadoop,還是spark,還是impala,hive,總之排序是必不可少的,排序的性能測試也是必不可少的。
有著計算奧運會之稱的Sort Benchmark全球排序每年都會舉行一次,每年巨頭都會在排序上進行巨大的投入,可見排序速度的高低有多麼重要!但是對於大多數企業來說,動輒上億的硬體投入,實在划不來、甚至遠遠超出了企業的項目預算。相比大數據領域的暴力排序有沒有一種更廉價的實現方式?
在這裡,我們為大家介紹一種新的廉價排序方法,我們稱為blockSort。
500G的數據300億條數據,只使用4台 16核,32G內存,千兆網卡的虛擬機即可實現 2~15秒的 排序 (可以全表排序,也可以與任意篩選條件篩選後排序)。
一、基本的思想是這樣的,如下圖所示:
1.將數據按照大小預先劃分好,如劃分成 大、中、小三個塊(block)。
2.如果想找最大的數據,那麼只需要在最大的那個塊里去找就可以了。
3.這個快還是有層級結構的,如果每個塊內的數據量很多,可以到下面的子快內進行繼續查找,可以分多個層進行排序。
4.採用這種方法,一個億萬億級別的數據(如long類型),最壞最壞的極端情況也就進行2048次文件seek就可以篩選到結果。
怎麼樣,原理是不是非常簡單,這樣數據量即使特別多,那麼排序與查找的次數是固定的。
二、這個是我們之前基於spark做的性能測試,供大家參考
在排序上,YDB具有絕對優勢,無論是全表,還是基於任意條件組合過濾,基本秒殺Spark任何格式。
測試結果(時間單位為秒)
三、當然除了排序上,我們的其他性能也是遠遠高於spark,這塊大家也可以了解一下
1、與Spark txt在檢索上的性能對比測試。
注釋:備忘。下圖的這塊,其實沒什麼特別的,只不過由於YDB本身索引的特性,不想spark那樣暴力,才會導致在掃描上的性能遠高於spark,性能高百倍不足為奇。
下圖為ydb相對於spark txt提升的倍數
2、這些是與 Parquet 格式對比(單位為秒)
3、與ORACLE性能對比
跟傳統資料庫的對比,已經沒啥意義,Oracle不適合大數據,任意一個大數據工具都遠超oracle 性能。
4.稽查布控場景性能測試
四、YDB是怎麼樣讓spark加速的?
基於Hadoop分散式架構下的實時的、多維的、互動式的查詢、統計、分析引擎,具有萬億數據規模下的秒級性能表現,並具備企業級的穩定可靠表現。
YDB是一個細粒度的索引,精確粒度的索引。數據即時導入,索引即時生成,通過索引高效定位到相關數據。YDB與Spark深度集成,Spark對YDB檢索結果集直接分析計算,同樣場景讓Spark性能加快百倍。
五、哪些用戶適合使用YDB?
1.傳統關係型數據,已經無法容納更多的數據,查詢效率嚴重受到影響的用戶。
2.目前在使用SOLR、ES做全文檢索,覺得solr與ES提供的分析功能太少,無法完成複雜的業務邏輯,或者數據量變多後SOLR與ES變得不穩定,在掉片與均衡中不斷惡性循環,不能自動恢復服務,運維人員需經常半夜起來重啟集群的情況。
3.基於對海量數據的分析,但是苦於現有的離線計算平台的速度和響應時間無滿足業務要求的用戶。
4.需要對用戶畫像行為類數據做多維定向分析的用戶。
5.需要對大量的UGC(User Generate Content)數據進行檢索的用戶。
6.當你需要在大數據集上面進行快速的,互動式的查詢時。
7.當你需要進行數據分析,而不只是簡單的鍵值對存儲時。
8.當你想要分析實時產生的數據時。
ps: 說了一大堆,說白了最適合的還是蹤跡分析因為數據量大,數據還要求實時,查詢還要求快。這才是關鍵。
視頻地址 (看不清的同學可以進入騰訊視頻 高清播放)
https://v.qq.com/x/page/q0371wjj8fb.html
https://v.qq.com/x/page/n0371l0ytji.html
感興趣的讀者也可以閱讀YDB編程指南 http://url.cn/42R4CG8 。也可以參考該書自己安裝延雲YDB進行測試。
感覺各位都沒給出樓主一個很明確的回答,個人在項目中利用SPARK處理T級別的複雜數據處理,例如多次SHUFFLE,對集群的壓力很大,但是經過調優以後還是可以運行的,關鍵幾點在這裡大概說一下:
1、對SHUFFLE過程的內存及配置信息的調整,SHUFFLE過程的內存比例容量,嘗試次數,等待時長等等的設置可以在對SHUFFLE處理過程比較多的業務場景提高程序的穩定性。
2、在讀取文件或者進行函數操作時,可以增加PERSIST函數對處理結果進行選擇,SPARK提供了內存,內存+磁碟,內存+磁碟+副本等方式供開發者選擇,在數據集很大的情況下,建議使用內存+磁碟+副本的方法,在某一個EXCUTOR宕機時,可以利用副本到其他節點上繼續運行。
3、就是對TASK,MEMORY等根據集群的物理機情況進行調配,達到程序運行的最佳狀態。
如果用戶沒有要求Spark cache該RDD的結果,那麼這個過程佔用的內存是很小的,一個元素處理完畢後就落地或扔掉了(概念上如此,實現上有buffer),並不會長久地佔用內存。只有在用戶要求Spark cache該RDD,且storage level要求在內存中cache時,Iterator計算出的結果才會被保留,通過cache manager放入內存池。
內存有限硬碟來湊。
媽的我現在看到spark 就腦袋疼。。。這t級的數據應該不是都需要裝進內存才能算的吧
具體技術方面硬碟湊,減少序列化成本,或者直接不系列化按byte保存和傳輸,或者用direct byte buffer之類的,但是這些都只是用硬碟湊時候做的性能優化。
從數據流程上梳理可能更有效,梳理數據依賴關係,從而減少重複計算(復用計算結果),減少網路傳輸,把高頻數據放內存盡量避免放硬碟,低頻數據扔硬碟題主也可以結合KryoSerializer的序列化方式試試,而不用默認的JavaSerialization。
如果將所有的數據都載入到內存肯定是不行的,但是spark在載入數據的時候,會將多餘的數據塊刷到磁碟上
推薦閱讀: