標籤:

怎樣理解spark中的partition和block的關係?

yarn模式中hdfs讀取數據的block和spark中一個概念么?那麼local模式呢,block怎麼理解?


hdfs中的block是分散式存儲的最小單元,類似於盛放文件的盒子,一個文件可能要佔多個盒子,但一個盒子里的內容只可能來自同一份文件。假設block設置為128M,你的文件是250M,那麼這份文件佔3個block(128+128+2)。這樣的設計雖然會有一部分磁碟空間的浪費,但是整齊的block大小,便於快速找到、讀取對應的內容。(p.s. 考慮到hdfs冗餘設計,默認三份拷貝,實際上3*3=9個block的物理空間。)

spark中的partition 是彈性分散式數據集RDD的最小單元,RDD是由分布在各個節點上的partition 組成的。partition 是指的spark在計算過程中,生成的數據在計算空間內最小單元,同一份數據(RDD)的partition 大小不一,數量不定,是根據application里的運算元和最初讀入的數據分塊數量決定的,這也是為什麼叫「彈性分散式」數據集的原因之一。

總結:

block位於存儲空間、partition 位於計算空間,

block的大小是固定的、partition 大小是不固定的,

block是有冗餘的、不會輕易丟失,partition(RDD)沒有冗餘設計、丟失之後重新計算得到


在storage模塊裡面所有的操作都是和block相關的,但是在RDD裡面所有的運算都是基於partition的,那麼partition是如何與block對應上的呢?

RDD計算的核心函數是iterator()函數:

final def iterator(split: Partition, context: TaskContext): Iterator[T] = {
if (storageLevel != StorageLevel.NONE) {
SparkEnv.get.cacheManager.getOrCompute(this, split, context, storageLevel)
} else {
computeOrReadCheckpoint(split, context)
}
}

如果當前RDD的storage level不是NONE的話,表示該RDD在BlockManager中有存儲,那麼調用CacheManager中的getOrCompute()函數計算RDD,在這個函數中partition和block發生了關係:

首先根據RDD id和partition index構造出block id (rdd_xx_xx),接著從BlockManager中取出相應的block。

  • 如果該block存在,表示此RDD在之前已經被計算過和存儲在BlockManager中,因此取出即可,無需再重新計算。
  • 如果該block不存在則需要調用RDD的computeOrReadCheckpoint()函數計算出新的block,並將其存儲到BlockManager中。

需要注意的是block的計算和存儲是阻塞的,若另一線程也需要用到此block則需等到該線程block的loading結束。

def getOrCompute[T](rdd: RDD[T], split: Partition, context: TaskContext, storageLevel: StorageLevel)
: Iterator[T] = {
val key = "rdd_%d_%d".format(rdd.id, split.index)
logDebug("Looking for partition " + key)
blockManager.get(key) match {
case Some(values) =&>
// Partition is already materialized, so just return its values
return values.asInstanceOf[Iterator[T]]
case None =&>
// Mark the split as loading (unless someone else marks it first)
loading.synchronized {
if (loading.contains(key)) {
logInfo("Another thread is loading %s, waiting for it to finish...".format (key))
while (loading.contains(key)) {
try {loading.wait()} catch {case _ : Throwable =&>}
}
logInfo("Finished waiting for %s".format(key))
// See whether someone else has successfully loaded it. The main way this would fail
// is for the RDD-level cache eviction policy if someone else has loaded the same RDD
// partition but we didn"t want to make space for it. However, that case is unlikely
// because it"s unlikely that two threads would work on the same RDD partition. One
// downside of the current code is that threads wait serially if this does happen.
blockManager.get(key) match {
case Some(values) =&>
return values.asInstanceOf[Iterator[T]]
case None =&>
logInfo("Whoever was loading %s failed; we"ll try it ourselves".format (key))
loading.add(key)
}
} else {
loading.add(key)
}
}
try {
// If we got here, we have to load the split
logInfo("Partition %s not found, computing it".format(key))
val computedValues = rdd.computeOrReadCheckpoint(split, context)
// Persist the result, so long as the task is not running locally
if (context.runningLocally) { return computedValues }
val elements = new ArrayBuffer[Any]
elements ++= computedValues
blockManager.put(key, elements, storageLevel, true)
return elements.iterator.asInstanceOf[Iterator[T]]
} finally {
loading.synchronized {
loading.remove(key)
loading.notifyAll()
}
}
}
}


不是同一個概念。hdfs 中的 block 是存儲的最小單元,spark 中的 block 是 rdd 在被 task 執行之前,其基本組成 partition 被 blockManage 映射 而來的一種抽象。


其實在spark中block和partition是有一點關係的,在一個rdd從sc.textFile讀進來的話它的partition個數等於原始文件的block數


val part = sc.textFile("hdfs://host35:9000/data/wiki120M/")

part.partitions.size

2 按理是1

val part = sc.textFile("hdfs://host35:9000/data/wiki240M/")

part.partitions.size

2 按理是2

val part = sc.textFile("hdfs://host35:9000/data/wiki10G/")

part.partitions.size

102

val part = sc.wholeTextFiles("hdfs://host35:9000/data/wiki120M/")

part.partitions.size 1

val part = sc.wholeTextFiles("hdfs://host35:9000/data/wiki240M/")

part.partitions.size 2

val part = sc.wholeTextFiles("hdfs://host35:9000/data/wiki10G/")

part.partitions.size 2 按理是102

看到資料說spark partitions的默認數是hdfs block數目。但是從上面的執行情況看,完全不是。。。HDFS Block Size=128MB。


初步探索之後發現partition的數量是與文件數量,文件大小,block設置額的大小有關:

如下有 14個文件,大小看左邊,此處的 hdfs block塊的大小設置為 dfs.blocksize=134217728

252.9 M 758.8 M /user/hive/warehouse/tmp_table.db/tmp20170420/000000_0

252.9 M 758.6 M /user/hive/warehouse/tmp_table.db/tmp20170420/000001_0

252.7 M 758.1 M /user/hive/warehouse/tmp_table.db/tmp20170420/000002_0

252.4 M 757.2 M /user/hive/warehouse/tmp_table.db/tmp20170420/000003_0

252.3 M 756.9 M /user/hive/warehouse/tmp_table.db/tmp20170420/000004_0

252.2 M 756.7 M /user/hive/warehouse/tmp_table.db/tmp20170420/000005_0

252.2 M 756.7 M /user/hive/warehouse/tmp_table.db/tmp20170420/000006_0

252.2 M 756.6 M /user/hive/warehouse/tmp_table.db/tmp20170420/000007_0

252.0 M 755.9 M /user/hive/warehouse/tmp_table.db/tmp20170420/000008_0

251.8 M 755.4 M /user/hive/warehouse/tmp_table.db/tmp20170420/000009_0

242.0 M 726.0 M /user/hive/warehouse/tmp_table.db/tmp20170420/000010_0

187.0 M 561.1 M /user/hive/warehouse/tmp_table.db/tmp20170420/000011_0

161.1 M 483.4 M /user/hive/warehouse/tmp_table.db/tmp20170420/000012_0

108.2 M 324.5 M /user/hive/warehouse/tmp_table.db/tmp20170420/000015_0

在HDFS設置block塊的大小為128M 但是一個130多MB的文件依然算是一個block

在spark中partition的數量同樣是做近似計算,一個137MB的文件依然被認為是一個partition的數據

所以上面的數據,在spark中的處理partition的數量為27,處理的task的數量也是27個 ,基本上和block塊的數量一致。


二者一一對應,和hdfs的數據塊沒關係


排名第一的答案講得很清楚了,block位於存儲空間,partition位於計算空間。這麼說吧,Spark雖說經常部署在Hadoop平台上,使用hdfs作為存儲,但是也部署在其他的平台的。


我在《Spark大數據處理: 技術、應用與性能優化》查到:物理上,rdd裡面的一個分區是對應一個block,而在邏輯上,RDD的每個分區叫一個Partition。不知道是否正確?請大家斧正!

ps:這裡的p1就是物理分區,rdd只是提取部分數據,這樣也符合樓上的第三點"部分管理"


推薦閱讀:

在Spark集群中,集群的節點個數、RDD分區個數、?cpu內核個數三者與並行度的關係??
1.5版本的Spark自己來管理內存而不是使用JVM,這不使用JVM而是自己來管理內存是咋回事?
還有必要學習Hadoop 么?
大家覺得目前 初學者學數據做hadoop時的集群配置是不是特別麻煩?有沒有一種便捷的方法?
如何在 Spark 機器學習中應用 scikit-learn?

TAG:Spark |