1.5版本的Spark自己來管理內存而不是使用JVM,這不使用JVM而是自己來管理內存是咋回事?
1.5版本的Spark自己來管理內存而不是使用JVM,這樣可以避免JVM GC帶來的性能損失,這不使用JVM而是自己來管理內存是咋回事?
Spark的作者之一的一篇博文對此作了詳細解釋
Project Tungsten: Bringing Spark Closer to Bare Metal如果英文不好可以看csdn上有人翻譯的:Project Tungsten:讓Spark將硬體性能壓榨到極限-CSDN.NET總的來說,就是spark運行過程中,jvm對象和GC消耗的資源過大,為了減少這兩項佔用的時間,spark1.5採用了自己的內存管理器來直接控制二進位數據。但是,這些優化僅僅是針對DataFrame模塊的,如果你直接使用RDD,和以前並沒有什麼區別。
最近正好在專欄里寫了一篇相關的文章,主要介紹Hadoop生態圈以內存為中心的一些項目逃離JVM,自己進行內存管理的背景,進展以及未來發展的趨勢,鏈接如下:脫離JVM? Hadoop生態圈的掙扎與演化 - 大數據技術與實踐 - 知乎專欄,有興趣可以看看。對於題主的問題:Spark如何自己管理內存?,大概需要用到的主要技術包括:
1. 定製的序列化工具。顯式內存管理的前提步驟就是序列化,將Java對象序列化成二進位數據存儲在off-heap上。
2. 顯式的內存管理。按塊申請釋放內存,通過UnSafe API存取二進位數據。
3. 緩存友好的數據結構和演算法。只將操作相關的數據連續存儲,可以最大化的利用L1/L2/L3緩存,減少Cache miss的概率,提升CPU計算的吞吐量。以排序為例,由於排序的主要操作是對Key進行對比,Spark將所有排序數據的Key與Value分開,對Key連續存儲,所以訪問Key時的Cache命中率會大大提高。
這個功能源於Databricks的Tungsten項目。可以參考這篇博客:Project Tungsten: Bringing Spark Closer to Bare Metal
Tungsten項目是Spark執行引擎的巨大修改,聚焦在提高Spark應用的內存和CPU利用率。
實踐證據表明:CPU和內存逐漸成為Spark性能瓶頸,而不是網路IO。
為什麼說CPU成為了新的瓶頸?有多方面的原因。
- 硬體水平的提升,例如10Gbps網卡,高帶寬SSD硬碟等等。
- 軟體層面的優化,例如數據裁剪避免了大規模讀取數據的IO。對spark shuffle來說,序列化和哈希都是CPU密集型的。
綜上:Spark today is often constrained by CPU efficiency and memory pressure rather than IO.
Tungsten主要包括三方面內容:
- 內存管理和二進位數據處理:可以徹底淘汰JVM的內存模型和垃圾回收機制。
- 能夠感知緩存的計算:能夠充分利用數據存儲層次的演算法和數據結構。
- Code Generation機制:充分利用現代編譯器和CPU資源。
大概的意思是說Java對象在內存中的存儲開銷很大。
例如:
string "abcd"在JVM的存儲,每個字元以UTF-16編碼,每個String對象還包含12位元組的頭信息和8位元組的hashcode。
於是形成了如下存儲結構:
java.lang.String object internals:
OFFSET SIZE TYPE DESCRIPTION VALUE
0 4 (object header) ...
4 4 (object header) ...
8 4 (object header) ...
12 4 char[] String.value []
16 4 int String.hash 0
20 4 int String.hash32 0
Instance size: 24 bytes (reported by Instrumentation API)
一個簡單的4位元組的string存儲到JVM管理的內存中竟然佔用了48位元組!
當內存不夠用時,GC的開銷就會增大。
那麼:
Spark不是普通的Java應用,它能夠理解各個計算階段的數據流形式,
Spark比垃圾回收器更了解數據對象的生存周期。從而更高效地進行內存管理。Tungsten引入了新的內存管理模型直接操作內存中的二進位數據而不是Java對象。
基於sun.misc.Unsafe類庫實現,該庫由JVM提供,用來實現C語言風格的內存訪問。
另外Unsafe庫中的方法都是內置的,意味著會調用時會直接被JIT編譯成一段機器指令序列。Spark 1.4中的實現如下:
用一個hashmap直接訪問內存中Spark管理的二進位數據,性能提升很明顯。
- Spark 1.4 該hashmap將用於SQL DataFrames中的聚合計算。
- Spark 1.5 將用於更多計算,如sort 和 join。
Spark以內存計算著稱。基於緩存感知的計算能夠充分利用L1/ L2/L3 CPU cache。
通過對Spark用戶應用進行性能剖析,我們發現CPU計算時間的很大一部分比例用在等待數據從內存中取回。
因此Tungsten的一部分工作就是設計cache-friendly演算法和數據結構來提高緩存命中率。
例如對記錄進行排序,標準的排序流程是存儲一個指向記錄的指針數組來進行快排,交換指針而非記錄。
Sort一般有比較好的緩存命中率,因為對數據的掃描是連續的。
然而,對指針排序則緩存命中率很差。因為通過指針訪問的數據不是物理連續的。那麼我們如何提升數據排序時數據訪問的局部性呢?
一個簡單的方法是:將每個記錄的key和記錄的指針存儲在一起。
例如:排序的key是int64,那麼我們每個記錄用int128來存儲(64位指針,64位key)
這樣再進行排序比較兩個記錄時將不再需要通過指針的方式去訪問key,從而提升緩存命中率。
上述原理在Spark中上述原理如何應用呢?
大多數分散式數據處理都可歸結為一系列的運算例如聚合排序或者join。
我們開發一個內置的cache-aware的sort運算元即可比之前的性能提升3倍以上,該排序將在基於sort的shuffle,高基數聚合以及sort-merge join中使用到。
後續大多數運算的底層演算法都將做到cache-aware。
3. Code Generation所謂Code Generation是指:Spark在運行時動態生成用於表達式求值的bytecode而不是通過解釋器一步步地解釋執行。
Code Generation減少了對基礎數據類型的封裝,並且避免了虛方法調用的高開銷。
隨著Tungsten的推進,Code generation覆蓋了更多表達式的計算。
另外,我們將陸續Code generation從應用在一次計算一條記錄的表達式求值上,進而推廣到應用在向量化表達式求值上以及其他場景。
例如內存中的二進位格式數據轉化為shuffle階段用於傳輸的數據形式時,這一過程也將使用Code generation技術加速數據的序列化。
性能提升同Kryo對比:
嘖嘖,破爛spark裝犢子搞直接內存自己玩樂,你等Java9出來的unsafe都沒了, 看玩不死你,哈哈哈,爪哇碼農就乖乖的著眼於業務實現,至於性能交給jit吧。
就是使用unsafe裡面的native api直接在堆外來分配內存,這樣就能避免gc…
對於數據量較大的計算來說,spark會將好多數據都存放在內存,由於計算時間長,這些數據可能是map的結果等著做join,所以full gc可能並不能降低內存使用,所以會出現頻繁的gc,計算也就無法繼續進行,如果full gc時間太長甚至會導致心跳收不到,任務就掛了其他人說的理論,這些patch供參考
SPARK-7076 Binary processing compact tuple representationSPARK-7077 Binary processing hash table for aggregationSPARK-7080 Binary processing based aggregate operatorSPARK-7081 Faster sort-based shuffle path using binary processing cache-aware sortSPARK-7082 Binary processing external sort-merge joinSPARK-7083 Binary processing dimensional joinSPARK-7184 Investigate turning codegen on by defaultSPARK-7190 UTF8String backed by binary dataSPARK-7251 Perform sequential scan when iterating over entries in BytesToBytesMapSPARK-7288 Suppress compiler warnings due to use of sun.misc.UnsafeSPARK-7293 Report memory used in aggregations and joinsSPARK-7311 Enable in-memory serialized map-side shuffle to work with SQL serializersSPARK-7375 Avoid defensive copying in SQL exchange operator when sort-based shuffle buffers data in serialized formSPARK-7440 Remove physical Distinct operator in favor of AggregateSPARK-7450 Use UNSAFE.getLong() to speed up BitSetMethods#anySet()SPARK-7517 Rename unsafe module to managedmemory SPARK-7691 Use type-specific row accessor functions in CatalystTypeConverters" toScala functionsSPARK-7698 Implement buffer pooling / re-use in ExecutorMemoryManager when using HeapAllocatorSPARK-7812 Speed up SQL code generationSPARK-7813 Push code generation into expression definition SPARK-7814 Turn code generation on by defaultSPARK-7815 Enable UTF8String to work against memory address directlySPARK-7887 Remove EvaluatedType from SQL ExpressionSPARK-7956 Use Janino to compile SQL expressionSPARK-8117 Push codegen into ExpressionSPARK-8149 Break ExpressionEvaluationSuite down to multiple filesSPARK-8154 Remove Term/Code type aliases in code generationSPARK-9700 Pick default page size more intelligently SPARK-9548 BytesToBytesMap could have a destructive iteratorSPARK-9228 Combine unsafe and codegen into a single optionSPARK-9363 SortMergeJoin operator should support UnsafeRow SPARK-9693 Reserve a page in all unsafe operators to avoid starving an operatorSPARK-9703 EnsureRequirements should not add unnecessary shuffles when only ordering requirements are unsatisfiedSPARK-8160 Tungsten style external aggregation SPARK-9736 JoinedRow.anyNull should delegate to the underlying rowsSPARK-7165 Sort-merge Join for left/right outer joins SPARK-8159 Improve expression function coverage (Spark 1.5)SPARK-8189 Use 100ns precision for TimestampTypeSPARK-8190 ExpressionEvalHelper.checkEvaluation should also run the optimizer versionSPARK-8286 Rewrite UTF8String in Java and move it into unsafe package.SPARK-8301 Improve UTF8String substring/startsWith/endsWith/contains performanceSPARK-8303 Move DateUtils into unsafe packageSPARK-8305 Improve codegenSPARK-8307 Improve timestamp from parquetSPARK-8317 Do not push sort into shuffle in Exchange operatorSPARK-8319 Update logic related to key ordering in shuffle dependenciesSPARK-8354 Fix off-by-factor-of-8 error when allocating scratch space in UnsafeFixedWidthAggregationMapSPARK-8446 Add helper functions for testing physical SparkPlan operators SPARK-8498 Fix NullPointerException in error-handling path in UnsafeShuffleWriterSPARK-8579 Support arbitrary object in UnsafeRow SPARK-8713 Support codegen for not thread-safe expressionsSPARK-8850 Turn unsafe mode on by defaultSPARK-8866 Use 1 microsecond (us) precision for TimestampTypeSPARK-8876 Remove InternalRow type alias in expressions packageSPARK-8879 Remove EmptyRow classSPARK-9022 UnsafeProjectSPARK-9023 UnsafeExchangeSPARK-9024 Unsafe HashJoinSPARK-9050 Remove out-of-date code in Exchange that was obsoleted by SPARK-8317SPARK-9054 Rename RowOrdering to InterpretedOrdering and use newOrdering to build orderingsSPARK-9143 Add planner rule for automatically inserting Unsafe Safe row format convertersSPARK-9247 Use BytesToBytesMap in unsafe broadcast joinSPARK-9258 Remove all semi join physical operatorSPARK-9266 Prevent "managed memory leak detected" exception from masking original exceptionSPARK-9285 Remove InternalRow"s inheritance from Row SPARK-9329 Bring UnsafeRow up to feature parity with other InternalRow implementationsSPARK-9331 Indent generated code properly when dumping them in debug code or exception modeSPARK-9334 Remove UnsafeRowConverter in favor of UnsafeProjectionSPARK-9336 Remove all extra JoinedRowsSPARK-9373 Support StructType in Tungsten style ProjectionSPARK-9389 Support ArrayType in TungstenSPARK-9394 CodeFormatter should handle parenthesesSPARK-9411 Make page size configurableSPARK-9412 Support records larger than a page sizeSPARK-9413 Support MapType in TungstenSPARK-9418 Use sort-merge join as the default shuffle join Improvement RESOLVED Reynold Xin SPARK-9421 Fix null-handling bug in UnsafeRow.getDouble, getFloat(), and get(ordinal, dataType)SPARK-9448 GenerateUnsafeProjection should not share expressions across instancesSPARK-9450 [INVALID] HashedRelation.get() could return an Iterator[Row] instead of Seq[Row]SPARK-9457 Sorting improvements SPARK-9464 Add property-based tests for UTF8String SPARK-9738 remove FromUnsafe and add its codegen version to GenerateSafeSPARK-9751 Audit operators to make sure they can support UnsafeRows SPARK-9784 Exchange.isUnsafe should check whether codegen and unsafe are enabledSPARK-9785 HashPartitioning compatibility should consider expression orderingSPARK-9815 Rename PlatformDependent.UNSAFE -&> Platform 更多關於Tungsten項目的進展,請關注SPARK-7075和SPARK-9697。Spark - ASF JIRA推薦閱讀:
※還有必要學習Hadoop 么?
※大家覺得目前 初學者學數據做hadoop時的集群配置是不是特別麻煩?有沒有一種便捷的方法?
※如何在 Spark 機器學習中應用 scikit-learn?
※Hadoop、spark、SaaS、PaaS、IaaS、雲計算概念區分?
※如何用形象的比喻描述大數據的技術生態?Hadoop、Hive、Spark 之間是什麼關係?
TAG:Java | Java虛擬機JVM | Java程序員 | Spark |