Spark源碼分析(3) RDD 的轉換
RDD 的轉換可以產生新的 RDD
如上圖,外圈是 RDD 的轉換,內圈紅色 RDD 是轉換產生的新 RDD。
按顏色區分轉換:
- 綠色是單 RDD 窄依賴轉換
- 黑色是多 RDD 窄依賴轉換
- 紫色是 KV 洗牌型轉換
- 黃色是重分區轉換
- 藍色是特例的轉換
單 RDD 窄依賴轉換
MapPartitionRDD
這個 RDD 在第一次分析中已經分析過。簡單複述一下:
- 依賴列表:一個窄依賴,依賴上游 RDD
- 分區列表:上游 RDD 的分區列表
- 計算流程:映射關係(輸入一個分區,返回一個迭代器)
- 分區器 :上游 RDD 的分區器
- 存儲位置:上游 RDD 的優先位置 可見除了計算流程,其他都是上游 RDD 的內容。
- map 傳入一個帶「值到值」轉化函數的迭代器(例如字元串到字元串長度)
- mapPartitions 傳入一個「迭代器到迭代器」的轉化函數,如果需要按分區做一些比較重的過程(例如資料庫連接等)
- flatMap 傳入一個「迭代器到迭代器的迭代器」的轉化函數(例如,統計字母,「字元串的迭代器」到「『字元的迭代器』的迭代器」)
- filter 傳入了一個帶「值到布爾值」篩選函數的迭代器
PartitionwiseSampledRDD
在分區中採樣的RDD
- 分區列表:在上游的分區的基礎上包裝一個採樣過程,形成一個新的分區PartitionwiseSampledRDDPartition
- 計算流程:採樣器返回的迭代器
- 其他成分:與上游 RDD 相同 PartitionwiseSampledRDD,有放回的採樣用泊松採樣器,無放回的採樣用伯努利採樣器,傳給分區器。
多 RDD 窄依賴
UnionRDD
- 依賴列表:每個上游 RDD 一個RangeDependency,每個RangeDependency依賴上游 RDD 的所有分區
- 分區列表:每個上游 RDD 一個UnionPartition,構成列表
- 計算流程:獲得目標分區的迭代器
- 分區器 :None
- 存儲位置:每個上游 RDD 的優先位置
CartesianRDD
笛卡爾積,是兩個 RDD 每個數據都進行一次關聯。下文中兩個 RDD 的關聯中,兩個 RDD 分別稱為 rdd1、rdd2。
- 依賴列表:兩個窄依賴組成的數組,分別依賴 rdd1、rdd2
- 分區列表:「rdd1的分區數 乘以 rdd2的分區數」個分區
- 計算流程:rdd1的一條記錄與 rdd2的一條記錄合成元組
- 分區器 :None
- 存儲位置:rdd1、rdd2的存儲位置的積
洗牌型轉換
洗牌型轉換,是多個 RDD 關聯的的轉換。
CoGroupedRDD
多個源 RDD 依據 key 關聯,key 相同的合併,形成最終的目標 RDD。
- 依賴列表:每個源 RDD 一個依賴,構成列表。如果源 RDD 的分區器與目標的分區器相同,則是1-to-1依賴,如果不同,則是洗牌依賴
- 分區列表:目標 RDD 分區器指定的分區數量個CoGroupPartition,每個分區記錄了數據來源分區。其中如果是洗牌依賴的數據源,需要洗牌過程,具體洗牌過程以後再分析
- 計算流程:返回一個迭代器,迭代對象是 key 和 key 對應源分區迭代器的數組 組成的元祖
- 分區器 :目標 RDD 的分區器
- 存儲位置:None
ShuffledRDD
同樣是多個源 RDD 依據 key 關聯,key 相同的做排序或聚合運算,形成最終的目標 RDD。
- 依賴列表:一個洗牌依賴,依賴所有上游 RDD
- 分區列表:目標 RDD 分區器指定的分區數量個ShuffledRDDPartition,每個分區只有一個編號(因為每個上游分區)
- 計算流程:洗牌過程,具體洗牌過程以後再分析
- 分區器 :目標 RDD 的分區器
- 存儲位置:None
除了這五個成員以外,還有另外幾個重要的成員:序列化器、key 排序器、聚合器、map 端合併器,他們都將用於洗牌
其他
- coalesce,是減少分區數量,可以在過濾之後,使數據更集中,以提高效率
- repartition,是重新分區,增加或減少分區數量,數據隨機重新分配,可以消除分區間的數據量差異
- pipe,是與外部程序管道關聯,從外部程序中獲取數據。
Scala語法
在 RDD.scala中,幾乎每一個轉換和操作函數都會有一個withScope,例如:
def map[U: ClassTag](f: T => U): RDD[U] = withScope { val cleanF = sc.clean(f) new MapPartitionsRDD[U, T](this, (context, pid, iter) => iter.map(cleanF))}def flatMap[U: ClassTag](f: T => TraversableOnce[U]): RDD[U] = withScope { val cleanF = sc.clean(f) new MapPartitionsRDD[U, T](this, (context, pid, iter) => iter.flatMap(cleanF))}
withScope是一個函數,調用了RDDOperationScope.withScope方法:
private[spark] def withScope[U](body: => U): U = RDDOperationScope.withScope[U](sc)(body)
withScope就像是一個 AOP(面向切面編程),嵌入到所有RDD 的轉換和操作的函數中,RDDOperationScope會把調用棧記錄下來,用於繪製Spark UI的 DAG(有向無環圖,可以理解為 Spark 的執行計劃)。
我們用下面的代碼簡單演示一下 Scala 用函數做 AOP:
object Day1 { def main(args: Array[String]) = { Range(1,5).foreach(twice) println() Array("China", "Beijing", "HelloWorld").foreach(length) } def twice(i: Int): Int = aopPrint { i * 2 } def length(s: String): Int = aopPrint { s.length } def aopPrint[U](i: => U): U = { print(i + " ") i }}
aopPrint的 入參是「一個返回類型為U的函數」。這段程序中aopPrint就是一個模擬的切面,作用是把所有的函數返回值列印出來。結果是:
2 4 6 8 5 7 10
從代碼上看,aopPrint並沒有降低代碼的可讀性。讀者依然能很清楚地讀懂twice和length函數。列印返回結果這個流程是獨立於函數之外的切面。
結論
- RDD 的轉換分圖上幾種
- RDD 的轉換可以看成是產生新的 RDD,而新的 RDD 記錄了每一個分區依賴上游的哪些分區、每個分區如何用上游分區計算而來
本文源碼
spark/core/rdd包下的部分 RDD 類spark/core/src/main/scala/org/apache/spark/rdd at master · apache/spark · GitHub
推薦閱讀:
※Scala快速入門系列:聲明變數、控制結構與函數、常用數組操作
※想研讀下spark的源碼,怎麼搭閱讀和調試的環境呢?
※矽谷之路54:深入淺出Spark(七)如何排序100TB
※如何用spark做矩陣計算?
TAG:Spark |