1.5版本的Spark自己來管理內存而不是使用JVM,這不使用JVM而是自己來管理內存是咋回事?

1.5版本的Spark自己來管理內存而不是使用JVM,這樣可以避免JVM GC帶來的性能損失,這不使用JVM而是自己來管理內存是咋回事?


Spark的作者之一的一篇博文對此作了詳細解釋

Project Tungsten: Bringing Spark Closer to Bare Metal

如果英文不好可以看csdn上有人翻譯的:

Project Tungsten:讓Spark將硬體性能壓榨到極限-CSDN.NET

總的來說,就是spark運行過程中,jvm對象GC消耗的資源過大,為了減少這兩項佔用的時間,spark1.5採用了自己的內存管理器來直接控制二進位數據。但是,這些優化僅僅是針對DataFrame模塊的,如果你直接使用RDD,和以前並沒有什麼區別。


最近正好在專欄里寫了一篇相關的文章,主要介紹Hadoop生態圈以內存為中心的一些項目逃離JVM,自己進行內存管理的背景,進展以及未來發展的趨勢,鏈接如下:脫離JVM? Hadoop生態圈的掙扎與演化 - 大數據技術與實踐 - 知乎專欄,有興趣可以看看。

對於題主的問題:Spark如何自己管理內存?,大概需要用到的主要技術包括:

1. 定製的序列化工具。顯式內存管理的前提步驟就是序列化,將Java對象序列化成二進位數據存儲在off-heap上。

2. 顯式的內存管理。按塊申請釋放內存,通過UnSafe API存取二進位數據。

3. 緩存友好的數據結構和演算法。只將操作相關的數據連續存儲,可以最大化的利用L1/L2/L3緩存,減少Cache miss的概率,提升CPU計算的吞吐量。以排序為例,由於排序的主要操作是對Key進行對比,Spark將所有排序數據的Key與Value分開,對Key連續存儲,所以訪問Key時的Cache命中率會大大提高。


這個功能源於Databricks的Tungsten項目。

可以參考這篇博客:Project Tungsten: Bringing Spark Closer to Bare Metal

這裡簡單說明一下:

Tungsten項目是Spark執行引擎的巨大修改,聚焦在提高Spark應用的內存和CPU利用率。

實踐證據表明:CPU和內存逐漸成為Spark性能瓶頸,而不是網路IO。

為什麼說CPU成為了新的瓶頸?有多方面的原因。

  1. 硬體水平的提升,例如10Gbps網卡,高帶寬SSD硬碟等等。

  2. 軟體層面的優化,例如數據裁剪避免了大規模讀取數據的IO。對spark shuffle來說,序列化和哈希都是CPU密集型的。

綜上:Spark today is often constrained by CPU efficiency and memory pressure rather than IO.

Tungsten主要包括三方面內容:

  • 內存管理和二進位數據處理:可以徹底淘汰JVM的內存模型和垃圾回收機制。
  • 能夠感知緩存的計算:能夠充分利用數據存儲層次的演算法和數據結構。
  • Code Generation機制:充分利用現代編譯器和CPU資源。

下面分別介紹一下:

1. Memory Management and Binary Processing

大概的意思是說Java對象在內存中的存儲開銷很大。

例如:

string "abcd"在JVM的存儲,每個字元以UTF-16編碼,每個String對象還包含12位元組的頭信息和8位元組的hashcode。

於是形成了如下存儲結構:

java.lang.String object internals:
OFFSET SIZE TYPE DESCRIPTION VALUE
0 4 (object header) ...
4 4 (object header) ...
8 4 (object header) ...
12 4 char[] String.value []
16 4 int String.hash 0
20 4 int String.hash32 0
Instance size: 24 bytes (reported by Instrumentation API)

一個簡單的4位元組的string存儲到JVM管理的內存中竟然佔用了48位元組

當內存不夠用時,GC的開銷就會增大。

那麼:

Spark不是普通的Java應用,它能夠理解各個計算階段的數據流形式,

Spark比垃圾回收器更了解數據對象的生存周期。從而更高效地進行內存管理。

Tungsten引入了新的內存管理模型直接操作內存中的二進位數據而不是Java對象。

基於sun.misc.Unsafe類庫實現,該庫由JVM提供,用來實現C語言風格的內存訪問。

另外Unsafe庫中的方法都是內置的,意味著會調用時會直接被JIT編譯成一段機器指令序列。

Spark 1.4中的實現如下:

用一個hashmap直接訪問內存中Spark管理的二進位數據,性能提升很明顯。

  • Spark 1.4 該hashmap將用於SQL DataFrames中的聚合計算。
  • Spark 1.5 將用於更多計算,如sort 和 join。

2. Cache-aware Computation

Spark以內存計算著稱。基於緩存感知的計算能夠充分利用L1/ L2/L3 CPU cache。

通過對Spark用戶應用進行性能剖析,我們發現CPU計算時間的很大一部分比例用在等待數據從內存中取回。

因此Tungsten的一部分工作就是設計cache-friendly演算法和數據結構來提高緩存命中率。

例如對記錄進行排序,標準的排序流程是存儲一個指向記錄的指針數組來進行快排,交換指針而非記錄。

Sort一般有比較好的緩存命中率,因為對數據的掃描是連續的。

然而,對指針排序則緩存命中率很差。因為通過指針訪問的數據不是物理連續的。

那麼我們如何提升數據排序時數據訪問的局部性呢?

一個簡單的方法是:將每個記錄的key和記錄的指針存儲在一起。

例如:排序的key是int64,那麼我們每個記錄用int128來存儲(64位指針,64位key)

這樣再進行排序比較兩個記錄時將不再需要通過指針的方式去訪問key,從而提升緩存命中率。

上述原理在Spark中上述原理如何應用呢?

大多數分散式數據處理都可歸結為一系列的運算例如聚合排序或者join。

我們開發一個內置的cache-aware的sort運算元即可比之前的性能提升3倍以上,該排序將在基於sort的shuffle,高基數聚合以及sort-merge join中使用到。

後續大多數運算的底層演算法都將做到cache-aware。

3. Code Generation

所謂Code Generation是指:Spark在運行時動態生成用於表達式求值的bytecode而不是通過解釋器一步步地解釋執行。

Code Generation減少了對基礎數據類型的封裝,並且避免了虛方法調用的高開銷。

隨著Tungsten的推進,Code generation覆蓋了更多表達式的計算。

另外,我們將陸續Code generation從應用在一次計算一條記錄的表達式求值上,進而推廣到應用在向量化表達式求值上以及其他場景。

例如內存中的二進位格式數據轉化為shuffle階段用於傳輸的數據形式時,這一過程也將使用Code generation技術加速數據的序列化。

性能提升同Kryo對比:


嘖嘖,破爛spark裝犢子搞直接內存自己玩樂,你等Java9出來的unsafe都沒了, 看玩不死你,哈哈哈,爪哇碼農就乖乖的著眼於業務實現,至於性能交給jit吧。


就是使用unsafe裡面的native api直接在堆外來分配內存,這樣就能避免gc…

對於數據量較大的計算來說,spark會將好多數據都存放在內存,由於計算時間長,這些數據可能是map的結果等著做join,所以full gc可能並不能降低內存使用,所以會出現頻繁的gc,計算也就無法繼續進行,如果full gc時間太長甚至會導致心跳收不到,任務就掛了


其他人說的理論,這些patch供參考

SPARK-7076 Binary processing compact tuple representation

SPARK-7077 Binary processing hash table for aggregation

SPARK-7080 Binary processing based aggregate operator

SPARK-7081 Faster sort-based shuffle path using binary processing cache-aware sort

SPARK-7082 Binary processing external sort-merge join

SPARK-7083 Binary processing dimensional join

SPARK-7184 Investigate turning codegen on by default

SPARK-7190 UTF8String backed by binary data

SPARK-7251 Perform sequential scan when iterating over entries in BytesToBytesMap

SPARK-7288 Suppress compiler warnings due to use of sun.misc.Unsafe

SPARK-7293 Report memory used in aggregations and joins

SPARK-7311 Enable in-memory serialized map-side shuffle to work with SQL serializers

SPARK-7375 Avoid defensive copying in SQL exchange operator when sort-based shuffle buffers data in serialized form

SPARK-7440 Remove physical Distinct operator in favor of Aggregate

SPARK-7450 Use UNSAFE.getLong() to speed up BitSetMethods#anySet()

SPARK-7517 Rename unsafe module to managedmemory

SPARK-7691 Use type-specific row accessor functions in CatalystTypeConverters" toScala functions

SPARK-7698 Implement buffer pooling / re-use in ExecutorMemoryManager when using HeapAllocator

SPARK-7812 Speed up SQL code generation

SPARK-7813 Push code generation into expression definition

SPARK-7814 Turn code generation on by default

SPARK-7815 Enable UTF8String to work against memory address directly

SPARK-7887 Remove EvaluatedType from SQL Expression

SPARK-7956 Use Janino to compile SQL expression

SPARK-8117 Push codegen into Expression

SPARK-8149 Break ExpressionEvaluationSuite down to multiple files

SPARK-8154 Remove Term/Code type aliases in code generation

SPARK-9700 Pick default page size more intelligently

SPARK-9548 BytesToBytesMap could have a destructive iterator

SPARK-9228 Combine unsafe and codegen into a single option

SPARK-9363 SortMergeJoin operator should support UnsafeRow

SPARK-9693 Reserve a page in all unsafe operators to avoid starving an operator

SPARK-9703 EnsureRequirements should not add unnecessary shuffles when only ordering requirements are unsatisfied

SPARK-8160 Tungsten style external aggregation

SPARK-9736 JoinedRow.anyNull should delegate to the underlying rows

SPARK-7165 Sort-merge Join for left/right outer joins

SPARK-8159 Improve expression function coverage (Spark 1.5)

SPARK-8189 Use 100ns precision for TimestampType

SPARK-8190 ExpressionEvalHelper.checkEvaluation should also run the optimizer version

SPARK-8286 Rewrite UTF8String in Java and move it into unsafe package.

SPARK-8301 Improve UTF8String substring/startsWith/endsWith/contains performance

SPARK-8303 Move DateUtils into unsafe package

SPARK-8305 Improve codegen

SPARK-8307 Improve timestamp from parquet

SPARK-8317 Do not push sort into shuffle in Exchange operator

SPARK-8319 Update logic related to key ordering in shuffle dependencies

SPARK-8354 Fix off-by-factor-of-8 error when allocating scratch space in UnsafeFixedWidthAggregationMap

SPARK-8446 Add helper functions for testing physical SparkPlan operators

SPARK-8498 Fix NullPointerException in error-handling path in UnsafeShuffleWriter

SPARK-8579 Support arbitrary object in UnsafeRow

SPARK-8713 Support codegen for not thread-safe expressions

SPARK-8850 Turn unsafe mode on by default

SPARK-8866 Use 1 microsecond (us) precision for TimestampType

SPARK-8876 Remove InternalRow type alias in expressions package

SPARK-8879 Remove EmptyRow class

SPARK-9022 UnsafeProject

SPARK-9023 UnsafeExchange

SPARK-9024 Unsafe HashJoin

SPARK-9050 Remove out-of-date code in Exchange that was obsoleted by SPARK-8317

SPARK-9054 Rename RowOrdering to InterpretedOrdering and use newOrdering to build orderings

SPARK-9143 Add planner rule for automatically inserting Unsafe Safe row format converters

SPARK-9247 Use BytesToBytesMap in unsafe broadcast join

SPARK-9258 Remove all semi join physical operator

SPARK-9266 Prevent "managed memory leak detected" exception from masking original exception

SPARK-9285 Remove InternalRow"s inheritance from Row

SPARK-9329 Bring UnsafeRow up to feature parity with other InternalRow implementations

SPARK-9331 Indent generated code properly when dumping them in debug code or exception mode

SPARK-9334 Remove UnsafeRowConverter in favor of UnsafeProjection

SPARK-9336 Remove all extra JoinedRows

SPARK-9373 Support StructType in Tungsten style Projection

SPARK-9389 Support ArrayType in Tungsten

SPARK-9394 CodeFormatter should handle parentheses

SPARK-9411 Make page size configurable

SPARK-9412 Support records larger than a page size

SPARK-9413 Support MapType in Tungsten

SPARK-9418 Use sort-merge join as the default shuffle join Improvement RESOLVED Reynold Xin

SPARK-9421 Fix null-handling bug in UnsafeRow.getDouble, getFloat(), and get(ordinal, dataType)

SPARK-9448 GenerateUnsafeProjection should not share expressions across instances

SPARK-9450 [INVALID] HashedRelation.get() could return an Iterator[Row] instead of Seq[Row]

SPARK-9457 Sorting improvements

SPARK-9464 Add property-based tests for UTF8String

SPARK-9738 remove FromUnsafe and add its codegen version to GenerateSafe

SPARK-9751 Audit operators to make sure they can support UnsafeRows

SPARK-9784 Exchange.isUnsafe should check whether codegen and unsafe are enabled

SPARK-9785 HashPartitioning compatibility should consider expression ordering

SPARK-9815 Rename PlatformDependent.UNSAFE -&> Platform

  更多關於Tungsten項目的進展,請關注SPARK-7075和SPARK-9697。

Spark - ASF JIRA


推薦閱讀:

還有必要學習Hadoop 么?
大家覺得目前 初學者學數據做hadoop時的集群配置是不是特別麻煩?有沒有一種便捷的方法?
如何在 Spark 機器學習中應用 scikit-learn?
Hadoop、spark、SaaS、PaaS、IaaS、雲計算概念區分?
如何用形象的比喻描述大數據的技術生態?Hadoop、Hive、Spark 之間是什麼關係?

TAG:Java | Java虛擬機JVM | Java程序員 | Spark |